• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5060

17 May 2026 01:15AM UTC coverage: 73.425% (-0.02%) from 73.443%
#5060

push

travis-ci

web-flow
feat (TDgpt): Dynamic Model Synchronization Enhancements (#35344)

* refactor: do some internal refactor.

* fix: fix multiprocess sync issue.

* feat: add dynamic anomaly detection and forecasting services

* fix: log error message for undeploying model in exception handling

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* fix: handle undeploy when model exists only on disk

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/286aafa0-c3ce-4c27-b803-2707571e9dc1

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: guard dynamic registry concurrent access

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: tighten service list locking scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: restore prophet support and update tests per review feedback

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: improve test name and move copy inside lock scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* Potential fix for pull request finding

Co-au... (continued)

281800 of 383795 relevant lines covered (73.42%)

134332207.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.47
/source/dnode/mnode/impl/src/mndStreamUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndDb.h"
17
#include "mndStb.h"
18
#include "mndStream.h"
19
#include "mndTrans.h"
20
#include "mndVgroup.h"
21
#include "taoserror.h"
22
#include "tmisce.h"
23
#include "mndSnode.h"
24

25
/**
 * Block until the requested side of a latch has been acquired.
 *
 * The write side is taken with taosWWaitLockLatch(). The read side is taken by
 * polling taosRTryLockLatch() and sleeping 1 ms (taosMsleep(1)) after every
 * attempt that returns non-zero.
 *
 * @param pLock     latch to acquire
 * @param readLock  true to take the read side, false to take the write side
 * @return always true
 */
bool mstWaitLock(SRWLatch* pLock, bool readLock) {
  if (!readLock) {
    taosWWaitLockLatch(pLock);
    return true;
  }

  for (;;) {
    // a zero result ends the polling loop
    if (!taosRTryLockLatch(pLock)) {
      break;
    }
    taosMsleep(1);
  }

  return true;
}
38

39
void mstDestroySStmVgStreamStatus(void* p) { 
597,336✔
40
  SStmVgStreamStatus* pStatus = (SStmVgStreamStatus*)p;
597,336✔
41
  taosArrayDestroy(pStatus->trigReaders); 
597,336✔
42
  taosArrayDestroy(pStatus->calcReaders); 
597,336✔
43
}
597,336✔
44

45
void mstDestroySStmSnodeStreamStatus(void* p) { 
268,970✔
46
  SStmSnodeStreamStatus* pStatus = (SStmSnodeStreamStatus*)p;
268,970✔
47
  for (int32_t i = 0; i < MND_STREAM_RUNNER_DEPLOY_NUM; ++i) {
1,075,880✔
48
    taosArrayDestroy(pStatus->runners[i]);
806,910✔
49
    pStatus->runners[i] = NULL;
806,910✔
50
  }
51
}
268,970✔
52

53

54
/**
 * Clean up the stream-task hash of a vgroup status and reset the field to NULL.
 *
 * @param pVgStatus  vgroup status whose streamTasks hash is released
 */
void mstDestroyVgroupStatus(SStmVgroupStatus* pVgStatus) {
  taosHashCleanup(pVgStatus->streamTasks);

  // leave no dangling handle behind
  pVgStatus->streamTasks = NULL;
}
×
58

59
void mstDestroySStmTaskToDeployExt(void* param) {
2,014,337✔
60
  SStmTaskToDeployExt* pExt = (SStmTaskToDeployExt*)param;
2,014,337✔
61
  if (pExt->deployed) {
2,014,337✔
62
    return;
2,011,307✔
63
  }
64
  
65
  switch (pExt->deploy.task.type) {
3,030✔
66
    case STREAM_TRIGGER_TASK:
721✔
67
      taosArrayDestroy(pExt->deploy.msg.trigger.readerList);
721✔
68
      pExt->deploy.msg.trigger.readerList = NULL;
721✔
69
      taosArrayDestroy(pExt->deploy.msg.trigger.runnerList);
721✔
70
      pExt->deploy.msg.trigger.runnerList = NULL;
721✔
71
      break;
721✔
72
    case STREAM_RUNNER_TASK:
1,527✔
73
      taosMemoryFreeClear(pExt->deploy.msg.runner.pPlan);
1,527✔
74
      break;
1,527✔
75
    case STREAM_READER_TASK:
782✔
76
      if (!pExt->deploy.msg.reader.triggerReader) {
782✔
77
        SStreamReaderDeployFromCalc* pCalcReaderDeploy = &pExt->deploy.msg.reader.msg.calc;
307✔
78
        if (pCalcReaderDeploy->freeScanPlan) {
307✔
79
          taosMemoryFreeClear(pCalcReaderDeploy->calcScanPlan);
307✔
80
        }
81
      }
82
      break;
782✔
83
    default:  
×
84
      break;
×
85
  }
86
}
87

88
void mstDestroyScanAddrList(void* param) {
541,231✔
89
  if (NULL == param) {
541,231✔
90
    return;
×
91
  }
92
  SArray* pList = *(SArray**)param;
541,231✔
93
  taosArrayDestroy(pList);
541,231✔
94
}
95

96
void mstDestroySStmSnodeTasksDeploy(void* param) {
86,300✔
97
  SStmSnodeTasksDeploy* pSnode = (SStmSnodeTasksDeploy*)param;
86,300✔
98
  taosArrayDestroyEx(pSnode->triggerList, mstDestroySStmTaskToDeployExt);
86,300✔
99
  taosArrayDestroyEx(pSnode->runnerList, mstDestroySStmTaskToDeployExt);
86,300✔
100
}
86,300✔
101

102
void mstDestroySStmVgTasksToDeploy(void* param) {
248,161✔
103
  SStmVgTasksToDeploy* pVg = (SStmVgTasksToDeploy*)param;
248,161✔
104
  taosArrayDestroyEx(pVg->taskList, mstDestroySStmTaskToDeployExt);
248,161✔
105
}
248,161✔
106

107
void mstDestroySStmSnodeStatus(void* param) {
44,773✔
108
  SStmSnodeStatus* pSnode = (SStmSnodeStatus*)param;
44,773✔
109
  taosHashCleanup(pSnode->streamTasks);
44,773✔
110
}
44,773✔
111

112
void mstDestroySStmVgroupStatus(void* param) {
110,111✔
113
  SStmVgroupStatus* pVg = (SStmVgroupStatus*)param;
110,111✔
114
  taosHashCleanup(pVg->streamTasks);
110,111✔
115
}
110,111✔
116

117
void mstFreeTrigOReaderList(void* param) {
58,875✔
118
  SArray** ppList = (SArray**)param;
58,875✔
119
  taosArrayDestroy(*ppList);
58,875✔
120
}
58,875✔
121

122
/**
 * Drop all task bookkeeping of a stream status while holding its resetLock
 * in write mode.
 *
 * Released and cleared: trigReaders, trigOReaders (with their nested lists),
 * calcReaders, the trigger task (its detailStatus is freed under the task's
 * detailStatusLock first) and every runner array. lastTrigMgmtReqId is reset
 * to 0.
 *
 * @param pStatus  stream status to reset
 */
void mstResetSStmStatus(SStmStatus* pStatus) {
  (void)mstWaitLock(&pStatus->resetLock, false);

  taosArrayDestroy(pStatus->trigReaders);
  pStatus->trigReaders = NULL;

  taosArrayDestroyEx(pStatus->trigOReaders, mstFreeTrigOReaderList);
  pStatus->trigOReaders = NULL;

  pStatus->calcReaders = tdListFree(pStatus->calcReaders);

  if (NULL != pStatus->triggerTask) {
    // detailStatus is protected by its own latch
    (void)mstWaitLock(&pStatus->triggerTask->detailStatusLock, false);
    taosMemoryFreeClear(pStatus->triggerTask->detailStatus);
    taosWUnLockLatch(&pStatus->triggerTask->detailStatusLock);
  }
  taosMemoryFreeClear(pStatus->triggerTask);

  int32_t deployIdx = 0;
  while (deployIdx < MND_STREAM_RUNNER_DEPLOY_NUM) {
    taosArrayDestroy(pStatus->runners[deployIdx]);
    pStatus->runners[deployIdx] = NULL;
    ++deployIdx;
  }

  pStatus->lastTrigMgmtReqId = 0;

  taosWUnLockLatch(&pStatus->resetLock);
}
260,098✔
144

145
void mstDestroySStmStatus(void* param) {
248,174✔
146
  SStmStatus* pStatus = (SStmStatus*)param;
248,174✔
147
  taosMemoryFreeClear(pStatus->streamName);
248,174✔
148

149
  mstResetSStmStatus(pStatus);
248,174✔
150

151
  taosWLockLatch(&pStatus->userRecalcLock);
248,174✔
152
  taosArrayDestroy(pStatus->userRecalcList);
248,174✔
153
  taosWUnLockLatch(&pStatus->userRecalcLock);
248,174✔
154

155
  tFreeSCMCreateStreamReq(pStatus->pCreate);
248,174✔
156
  taosMemoryFreeClear(pStatus->pCreate);  
248,174✔
157
}
248,174✔
158

159
void mstDestroySStmAction(void* param) {
318,606✔
160
  SStmAction* pAction = (SStmAction*)param;
318,606✔
161

162
  taosArrayDestroy(pAction->undeploy.taskList);
318,606✔
163
  taosArrayDestroy(pAction->recalc.recalcList);
318,606✔
164
}
318,606✔
165

166
/**
 * Set the reader, trigger and runner task pointers of a stream deploy to NULL.
 * Nothing is freed here; the pointers are only detached.
 *
 * @param pDeploy  stream deploy descriptor to clear
 */
void mstClearSStmStreamDeploy(SStmStreamDeploy* pDeploy) {
  pDeploy->runnerTasks = NULL;
  pDeploy->triggerTask = NULL;
  pDeploy->readerTasks = NULL;
}
371,224✔
171

172
/**
 * Tell whether the stream with the given id is dropped.
 *
 * Walks all SDB_STREAM objects. When a stream with a matching id is found,
 * *dropped reflects its userDropped flag. When no stream matches, *dropped is
 * set to true.
 *
 * @param pMnode    mnode whose sdb is scanned
 * @param streamId  id of the stream to look for
 * @param dropped   [out] result flag
 * @return always TSDB_CODE_SUCCESS
 */
int32_t mstIsStreamDropped(SMnode *pMnode, int64_t streamId, bool* dropped) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  for (;;) {
    SStreamObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pObj);
    if (NULL == pIter) {
      break;
    }

    if (pObj->pCreate->streamId != streamId) {
      sdbRelease(pSdb, pObj);
      continue;
    }

    *dropped = pObj->userDropped ? true : false;
    sdbRelease(pSdb, pObj);
    // the scan stops early, so the iterator has to be cancelled explicitly
    sdbCancelFetch(pSdb, pIter);
    mstsDebug("stream found, dropped:%d", *dropped);
    return TSDB_CODE_SUCCESS;
  }

  // no stream object carries this id
  *dropped = true;
  return TSDB_CODE_SUCCESS;
}
196

197
// Context passed by mstCheckDbInUse to the mstChkSetDbInUse traversal callback.
// NOTE: mstCheckDbInUse fills this with a positional initializer, so the field
// order must not change.
typedef struct SStmCheckDbInUseCtx {
198
  bool* dbStream;      // [out] set to true when a stream uses the db as trigger or calc db
199
  bool* vtableStream;  // [out] set to true when a virtual-table stream is seen
200
  bool  ignoreCurrDb;  // when true, streams whose streamDB equals the db are skipped
201
} SStmCheckDbInUseCtx;
202

203
/**
 * sdbTraverse callback: record whether one stream object uses a database.
 *
 * Streams flagged userDropped are skipped, as are streams living in the
 * database itself when ctx->ignoreCurrDb is set. If the database is the
 * stream's trigger db or one of its calc dbs, *ctx->dbStream is set and false
 * is returned. Otherwise, for a virtual-table stream, *ctx->vtableStream is
 * set and true is returned.
 *
 * @param pMnode  unused
 * @param pObj    the SStreamObj being visited
 * @param p1      database full name (char*)
 * @param p2      SStmCheckDbInUseCtx*
 * @param p3      unused
 * @return false once the db is known to be used by a stream, true otherwise
 *         (presumably false stops the traversal - confirm against sdbTraverse)
 */
static bool mstChkSetDbInUse(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SStreamObj *pStreamObj = pObj;
  if (atomic_load_8(&pStreamObj->userDropped)) {
    return true;
  }

  SStmCheckDbInUseCtx* pUseCtx = (SStmCheckDbInUseCtx*)p2;
  if (pUseCtx->ignoreCurrDb && 0 == strcmp(pStreamObj->pCreate->streamDB, p1)) {
    return true;
  }

  bool usedByStream = (pStreamObj->pCreate->triggerDB && 0 == strcmp(pStreamObj->pCreate->triggerDB, p1));

  if (!usedByStream) {
    int32_t calcDbCnt = taosArrayGetSize(pStreamObj->pCreate->calcDB);
    for (int32_t idx = 0; idx < calcDbCnt && !usedByStream; ++idx) {
      char* calcDbName = taosArrayGetP(pStreamObj->pCreate->calcDB, idx);
      usedByStream = (0 == strcmp(calcDbName, p1));
    }
  }

  if (usedByStream) {
    *pUseCtx->dbStream = true;
    return false;
  }

  if (pStreamObj->pCreate->vtableCalc || STREAM_IS_VIRTUAL_TABLE(pStreamObj->pCreate->triggerTblType, pStreamObj->pCreate->flags)) {
    *pUseCtx->vtableStream = true;
  }

  return true;
}
235

236
/**
 * Check whether any stream uses the given database.
 *
 * The output flags are only ever set to true by the traversal callback
 * (mstChkSetDbInUse); this function never clears them, and it leaves them
 * untouched when there are no streams at all.
 *
 * @param pMnode        mnode whose streams are inspected
 * @param dbFName       database full name
 * @param dbStream      [out] set when a stream uses the db as trigger/calc db
 * @param vtableStream  [out] set when a virtual-table stream is encountered
 * @param ignoreCurrDb  skip streams that live in dbFName itself
 */
void mstCheckDbInUse(SMnode *pMnode, char *dbFName, bool *dbStream, bool *vtableStream, bool ignoreCurrDb) {
  if (sdbGetSize(pMnode->pSdb, SDB_STREAM) > 0) {
    SStmCheckDbInUseCtx useCtx = {
        .dbStream = dbStream,
        .vtableStream = vtableStream,
        .ignoreCurrDb = ignoreCurrDb,
    };
    sdbTraverse(pMnode->pSdb, SDB_STREAM, mstChkSetDbInUse, dbFName, &useCtx, NULL);
  }
}
245

246
static void mstShowStreamStatus(char *dst, int8_t status, int32_t bufLen) {
×
247
  if (status == STREAM_STATUS_INIT) {
×
248
    tstrncpy(dst, "init", bufLen);
×
249
  } else if (status == STREAM_STATUS_RUNNING) {
×
250
    tstrncpy(dst, "running", bufLen);
×
251
  } else if (status == STREAM_STATUS_STOPPED) {
×
252
    tstrncpy(dst, "stopped", bufLen);
×
253
  } else if (status == STREAM_STATUS_FAILED) {
×
254
    tstrncpy(dst, "failed", bufLen);
×
255
  }
256
}
×
257

258
/**
 * Check whether at least one snode object exists in the sdb.
 *
 * Only the first SDB_SNODE entry is fetched; it is released and the iterator
 * cancelled right away.
 *
 * @param pMnode  mnode whose sdb is queried
 * @return TSDB_CODE_SUCCESS if an snode exists, TSDB_CODE_SNODE_NOT_DEPLOYED otherwise
 */
int32_t mstCheckSnodeExists(SMnode *pMnode) {
  SSdb      *pSdb = pMnode->pSdb;
  SSnodeObj *pSnode = NULL;
  void      *pIter = sdbFetch(pSdb, SDB_SNODE, NULL, (void **)&pSnode);

  if (NULL == pIter) {
    return TSDB_CODE_SNODE_NOT_DEPLOYED;
  }

  sdbRelease(pSdb, pSnode);
  sdbCancelFetch(pSdb, pIter);
  return TSDB_CODE_SUCCESS;
}
276

277
/**
 * Copy the identity and state fields of a task status message into a task
 * status record and stamp it with the context's current timestamp.
 *
 * @param pCtx   group context providing currTs
 * @param pTask  [out] task status record to fill
 * @param pMsg   status message to copy from
 */
void mstSetTaskStatusFromMsg(SStmGrpCtx* pCtx, SStmTaskStatus* pTask, SStmTaskStatusMsg* pMsg) {
  // task identity
  pTask->id.taskId    = pMsg->taskId;
  pTask->id.deployId  = pMsg->deployId;
  pTask->id.seriousId = pMsg->seriousId;
  pTask->id.nodeId    = pMsg->nodeId;
  pTask->id.taskIdx   = pMsg->taskIdx;

  // reported state
  pTask->type   = pMsg->type;
  pTask->flags  = pMsg->flags;
  pTask->status = pMsg->status;

  // time of this update, taken from the context rather than the message
  pTask->lastUpTs = pCtx->currTs;
}
×
289

290
/**
 * Take the next node off the action queue.
 *
 * The current head node is freed and its successor is both returned through
 * *param and installed as the new head. The returned node therefore stays
 * linked in the queue and is freed by the next successful dequeue.
 *
 * @param pQueue  action queue
 * @param param   [out] the dequeued node; untouched when the queue is empty
 * @return false when qRemainNum is 0, true when a node was dequeued
 */
bool mndStreamActionDequeue(SStmActionQ* pQueue, SStmQNode **param) {
  if (0 == atomic_load_64(&pQueue->qRemainNum)) {
    return false;
  }

  SStmQNode *pOldHead = pQueue->head;
  SStmQNode *pNext = pOldHead->next;

  pQueue->head = pNext;
  *param = pNext;

  taosMemoryFreeClear(pOldHead);

  (void)atomic_sub_fetch_64(&pQueue->qRemainNum, 1);

  return true;
}
308

309
/**
 * Append a node to the tail of the action queue.
 *
 * The tail link is updated under the queue's write latch; qRemainNum is
 * incremented atomically after the latch has been released.
 *
 * @param pQueue  action queue
 * @param param   node to append
 */
void mndStreamActionEnqueue(SStmActionQ* pQueue, SStmQNode* param) {
  taosWLockLatch(&pQueue->lock);
  {
    SStmQNode* pTail = pQueue->tail;
    pTail->next = param;
    pQueue->tail = param;
  }
  taosWUnLockLatch(&pQueue->lock);

  (void)atomic_add_fetch_64(&pQueue->qRemainNum, 1);
}
276,591✔
317

318
char* mstGetStreamActionString(int32_t action) {
243,200✔
319
  switch (action) {
243,200✔
320
    case STREAM_ACT_DEPLOY:
243,200✔
321
      return "DEPLOY";
243,200✔
322
    case STREAM_ACT_UNDEPLOY:
×
323
      return "UNDEPLOY";
×
324
    case STREAM_ACT_START:
×
325
      return "START";
×
326
    case STREAM_ACT_UPDATE_TRIGGER:
×
327
      return "UPDATE TRIGGER";
×
328
    case STREAM_ACT_RECALC:
×
329
      return "USER RECALC";
×
330
    default:
×
331
      break;
×
332
  }
333

334
  return "UNKNOWN";
×
335
}
336

337
/**
 * Queue a stream-level action.
 *
 * A queue node is allocated, filled and appended with mndStreamActionEnqueue.
 * If the allocation fails, `param` is freed, the error is logged and nothing
 * is queued.
 *
 * @param actionQ     target action queue
 * @param streamId    id of the stream the action applies to
 * @param streamName  stream name, copied into the node
 * @param param       action parameter stored in the node
 * @param userAction  whether the action was triggered by a user
 * @param action      action code (see mstGetStreamActionString)
 */
void mstPostStreamAction(SStmActionQ*       actionQ, int64_t streamId, char* streamName, void* param, bool userAction, int32_t action) {
  SStmQNode *pQNode = taosMemoryMalloc(sizeof(SStmQNode));
  if (pQNode == NULL) {
    taosMemoryFreeClear(param);
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, __LINE__, tstrerror(terrno));
    return;
  }

  pQNode->next = NULL;
  pQNode->type = action;
  pQNode->streamAct = true;

  pQNode->action.stream.streamId = streamId;
  tstrncpy(pQNode->action.stream.streamName, streamName, sizeof(pQNode->action.stream.streamName));
  pQNode->action.stream.userAction = userAction;
  pQNode->action.stream.actionParam = param;

  mndStreamActionEnqueue(actionQ, pQNode);

  mstsDebug("stream action %s posted enqueue", mstGetStreamActionString(action));
}
358

359
/**
 * Queue a task-level action.
 *
 * The action descriptor is copied by value into a newly allocated queue node.
 * If the allocation fails, the error is logged and nothing is queued.
 *
 * @param actionQ  target action queue
 * @param pAction  task action descriptor to copy
 * @param action   action code
 */
void mstPostTaskAction(SStmActionQ*        actionQ, SStmTaskAction* pAction, int32_t action) {
  SStmQNode *pQNode = taosMemoryMalloc(sizeof(SStmQNode));
  if (pQNode == NULL) {
    // the logging macro reads a local named streamId
    int64_t streamId = pAction->streamId;
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, __LINE__, tstrerror(terrno));
    return;
  }

  pQNode->next = NULL;
  pQNode->type = action;
  pQNode->streamAct = false;
  pQNode->action.task = *pAction;

  mndStreamActionEnqueue(actionQ, pQNode);
}
375

376
/**
 * Destroy a db-name -> SDBVgHashInfo map built by mstBuildDBVgroupsMap.
 *
 * The vgArray of every entry is destroyed first, then the hash itself is
 * cleaned up.
 *
 * @param pDbVgs  map to destroy
 */
void mstDestroyDbVgroupsHash(SSHashObj *pDbVgs) {
  int32_t iterPos = 0;
  void*   pEntry = tSimpleHashIterate(pDbVgs, NULL, &iterPos);

  while (pEntry != NULL) {
    SDBVgHashInfo* pDbInfo = (SDBVgHashInfo*)pEntry;
    taosArrayDestroy(pDbInfo->vgArray);
    pEntry = tSimpleHashIterate(pDbVgs, pEntry, &iterPos);
  }

  tSimpleHashCleanup(pDbVgs);
}
71,061✔
387

388

389
/**
 * Build a map from database name to the hash ranges of all of its vgroups.
 *
 * Every SDB_VGROUP object is visited. The first vgroup seen for a database
 * creates a SDBVgHashInfo entry (hash method/prefix/suffix copied from the db
 * object, vgSorted = false); each vgroup then contributes one SVGroupHashInfo
 * {vgId, hashBegin, hashEnd} to that entry's vgArray. Map keys are the db name
 * including the terminating NUL.
 *
 * On success the map is handed to the caller through *ppRes and must be
 * released with mstDestroyDbVgroupsHash(). On failure *ppRes is left untouched
 * and the partially built map is destroyed here, since the caller has no
 * handle to it.
 *
 * @param pMnode  mnode whose sdb is scanned
 * @param ppRes   [out] the resulting map, set only on success
 * @return TSDB_CODE_SUCCESS or the error code of the failing step
 */
int32_t mstBuildDBVgroupsMap(SMnode* pMnode, SSHashObj** ppRes) {
  void*   pIter = NULL;
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SArray* pTarget = NULL;
  SArray* pNew = NULL;  // array created for a new db entry, owned here until it is stored in the map
  SDbObj* pDb = NULL;
  SDBVgHashInfo dbInfo = {0}, *pDbInfo = NULL;
  SVgObj* pVgroup = NULL;  // non-NULL at _exit means the scan was left early

  SSHashObj* pDbVgroup = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
  TSDB_CHECK_NULL(pDbVgroup, code, lino, _exit, terrno);

  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    pDbInfo = (SDBVgHashInfo*)tSimpleHashGet(pDbVgroup, pVgroup->dbName, strlen(pVgroup->dbName) + 1);
    if (NULL == pDbInfo) {
      // first vgroup of this db: prepare a fresh entry
      pNew = taosArrayInit(20, sizeof(SVGroupHashInfo));
      TSDB_CHECK_NULL(pNew, code, lino, _exit, terrno);

      pDb = mndAcquireDb(pMnode, pVgroup->dbName);
      TSDB_CHECK_NULL(pDb, code, lino, _exit, terrno);

      dbInfo.vgSorted = false;
      dbInfo.hashMethod = pDb->cfg.hashMethod;
      dbInfo.hashPrefix = pDb->cfg.hashPrefix;
      dbInfo.hashSuffix = pDb->cfg.hashSuffix;
      dbInfo.vgArray = pNew;

      mndReleaseDb(pMnode, pDb);

      pTarget = pNew;
    } else {
      pTarget = pDbInfo->vgArray;
    }

    SVGroupHashInfo vgInfo = {.vgId = pVgroup->vgId, .hashBegin = pVgroup->hashBegin, .hashEnd = pVgroup->hashEnd};
    if (NULL == taosArrayPush(pTarget, &vgInfo)) {
      TSDB_CHECK_NULL(NULL, code, lino, _exit, terrno);
    }

    if (NULL == pDbInfo) {
      code = tSimpleHashPut(pDbVgroup, pVgroup->dbName, strlen(pVgroup->dbName) + 1, &dbInfo, sizeof(dbInfo));
      if (code) {
        lino = __LINE__;
        goto _exit;
      }
      // the map entry now references the array
      pNew = NULL;
    }

    sdbRelease(pMnode->pSdb, pVgroup);
    pVgroup = NULL;
  }

  // ownership of the map moves to the caller
  *ppRes = pDbVgroup;
  pDbVgroup = NULL;

_exit:

  if (pVgroup != NULL) {
    // the scan was aborted: drop the current object and the open iterator
    sdbRelease(pMnode->pSdb, pVgroup);
    sdbCancelFetch(pMnode->pSdb, pIter);
  }

  taosArrayDestroy(pNew);

  if (pDbVgroup != NULL) {
    // still owned here, i.e. not handed over: release the map and its arrays
    mstDestroyDbVgroupsHash(pDbVgroup);
  }

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
473

474
int mstDbVgInfoComp(const void* lp, const void* rp) {
121,272✔
475
  SVGroupHashInfo* pLeft = (SVGroupHashInfo*)lp;
121,272✔
476
  SVGroupHashInfo* pRight = (SVGroupHashInfo*)rp;
121,272✔
477
  if (pLeft->hashBegin < pRight->hashBegin) {
121,272✔
478
    return -1;
121,272✔
479
  } else if (pLeft->hashBegin > pRight->hashBegin) {
×
480
    return 1;
×
481
  }
482

483
  return 0;
×
484
}
485

486
int32_t mstTableHashValueComp(void const* lp, void const* rp) {
281,673✔
487
  uint32_t*    key = (uint32_t*)lp;
281,673✔
488
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
281,673✔
489

490
  if (*key < pVg->hashBegin) {
281,673✔
491
    return -1;
×
492
  } else if (*key > pVg->hashEnd) {
281,673✔
493
    return 1;
75,762✔
494
  }
495

496
  return 0;
205,911✔
497
}
498

499

500
/**
 * Resolve the vgroup id that a table hashes to.
 *
 * The table's full name ("<dbFName>.<tbName>") is hashed with the database's
 * hash method/prefix/suffix and looked up in the db's vgroup range array. The
 * array is sorted by hashBegin on first use and marked with vgSorted.
 *
 * @param pDbVgroups  map built by mstBuildDBVgroupsMap
 * @param dbFName     database full name (map key)
 * @param tbName      table name
 * @param vgId        [out] id of the owning vgroup, set only on success
 * @return 0 on success, TSDB_CODE_MND_DB_NOT_EXIST if the db is not in the
 *         map, TSDB_CODE_MND_STREAM_INTERNAL_ERROR if no range covers the hash
 */
int32_t mstGetTableVgId(SSHashObj* pDbVgroups, char* dbFName, char *tbName, int32_t* vgId) {
  int32_t code = 0;
  int32_t lino = 0;
  // vgArray elements are SVGroupHashInfo (see mstBuildDBVgroupsMap)
  SVGroupHashInfo* vgInfo = NULL;
  char             tbFullName[TSDB_TABLE_FNAME_LEN];

  SDBVgHashInfo* dbInfo = (SDBVgHashInfo*)tSimpleHashGet(pDbVgroups, dbFName, strlen(dbFName) + 1);
  if (NULL == dbInfo) {
    mstError("db %s does not exist", dbFName);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_DB_NOT_EXIST);
  }

  (void)snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbFName, tbName);
  uint32_t hashValue = taosGetTbHashVal(tbFullName, (uint32_t)strlen(tbFullName), dbInfo->hashMethod,
                                        dbInfo->hashPrefix, dbInfo->hashSuffix);

  // the range search below needs the array ordered by hashBegin
  if (!dbInfo->vgSorted) {
    taosArraySort(dbInfo->vgArray, mstDbVgInfoComp);
    dbInfo->vgSorted = true;
  }

  vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, mstTableHashValueComp, TD_EQ);
  if (NULL == vgInfo) {
    mstError("no hash range found for hash value [%u], dbFName:%s, numOfVgId:%d", hashValue, dbFName,
             (int32_t)taosArrayGetSize(dbInfo->vgArray));
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  *vgId = vgInfo->vgId;

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
538

539

540
/**
 * Dump a stream object and its create request to the debug log.
 *
 * Nothing is done unless DEBUG_DEBUG is set in stDebugFlag. The dump covers
 * the object header, the scalar fields of the create request, the options of
 * the active trigger type, the serialized plans/expressions and the calcDB,
 * calcScanPlanList, pNotifyAddrUrls and outCols arrays.
 *
 * @param tips  prefix printed in front of the dump
 * @param p     stream object to log; may be NULL
 */
void mstLogSStreamObj(char* tips, SStreamObj* p) {
  if (0 == (stDebugFlag & DEBUG_DEBUG)) {
    return;
  }

  if (p == NULL) {
    mstDebug("%s: stream is NULL", tips);
    return;
  }

  mstDebug("%s: stream obj", tips);
  mstDebug("name:%s mainSnodeId:%d userDropped:%d userStopped:%d createTime:%" PRId64 " updateTime:%" PRId64,
      p->name, p->mainSnodeId, p->userDropped, p->userStopped, p->createTime, p->updateTime);

  SCMCreateStreamReq* pReq = p->pCreate;
  if (pReq == NULL) {
    mstDebug("stream pCreate is NULL");
    return;
  }

  // the mstsDebug* macros read a local named streamId
  int64_t streamId = pReq->streamId;
  int32_t calcDBNum = taosArrayGetSize(pReq->calcDB);
  int32_t calcScanNum = taosArrayGetSize(pReq->calcScanPlanList);
  int32_t notifyUrlNum = taosArrayGetSize(pReq->pNotifyAddrUrls);
  int32_t outColNum = taosArrayGetSize(pReq->outCols);
  int32_t outTagNum = taosArrayGetSize(pReq->outTags);
  int32_t forceOutColNum = taosArrayGetSize(pReq->forceOutCols);

  mstsDebugL("create_info: name:%s sql:%s streamDB:%s triggerDB:%s outDB:%s calcDBNum:%d triggerTblName:%s outTblName:%s "
      "igExists:%d triggerType:%d igDisorder:%d deleteReCalc:%d deleteOutTbl:%d fillHistory:%d fillHistroyFirst:%d "
      "calcNotifyOnly:%d lowLatencyCalc:%d igNoDataTrigger:%d enableMultiGroupCalc:%d notifyUrlNum:%d notifyEventTypes:%d addOptions:%d notifyHistory:%d "
      "outColsNum:%d outTagsNum:%d maxDelay:%" PRId64 " fillHistoryStartTs:%" PRId64 " watermark:%" PRId64 " expiredTime:%" PRId64 " "
      "triggerTblType:%d triggerTblUid:%" PRIx64 " triggerTblSuid:%" PRIx64 " vtableCalc:%d outTblType:%d outStbExists:%d outStbUid:%" PRIu64 " outStbSversion:%d "
      "eventTypes:0x%" PRIx64 " flags:0x%" PRIx64 " tsmaId:0x%" PRIx64 " placeHolderBitmap:0x%" PRIx64 " calcTsSlotId:%d triTsSlotId:%d calcPkSlotId:%d triPkSlotId:%d "
      "triggerTblVgId:%d outTblVgId:%d calcScanPlanNum:%d forceOutCols:%d idleTimeoutMs:%" PRId64,
      pReq->name, pReq->sql, pReq->streamDB, pReq->triggerDB, pReq->outDB, calcDBNum, pReq->triggerTblName, pReq->outTblName,
      pReq->igExists, pReq->triggerType, pReq->igDisorder, pReq->deleteReCalc, pReq->deleteOutTbl, pReq->fillHistory, pReq->fillHistoryFirst,
      pReq->calcNotifyOnly, pReq->lowLatencyCalc, pReq->igNoDataTrigger, pReq->enableMultiGroupCalc, notifyUrlNum, pReq->notifyEventTypes, pReq->addOptions, pReq->notifyHistory,
      outColNum, outTagNum, pReq->maxDelay, pReq->fillHistoryStartTime, pReq->watermark, pReq->expiredTime,
      pReq->triggerTblType, pReq->triggerTblUid, pReq->triggerTblSuid, pReq->vtableCalc, pReq->outTblType, pReq->outStbExists, pReq->outStbUid, pReq->outStbSversion,
      pReq->eventTypes, pReq->flags, pReq->tsmaId, pReq->placeHolderBitmap, pReq->calcTsSlotId, pReq->triTsSlotId, pReq->calcPkSlotId, pReq->triPkSlotId,
      pReq->triggerTblVgId, pReq->outTblVgId, calcScanNum, forceOutColNum, pReq->idleTimeoutMs);

  // trigger-type specific options
  switch (pReq->triggerType) {
    case WINDOW_TYPE_INTERVAL: {
      SSlidingTrigger* pSliding = &pReq->trigger.sliding;
      mstsDebug("sliding trigger options, intervalUnit:%d, slidingUnit:%d, offsetUnit:%d, soffsetUnit:%d, precision:%d, interval:%" PRId64 ", offset:%" PRId64 ", sliding:%" PRId64 ", soffset:%" PRId64, 
          pSliding->intervalUnit, pSliding->slidingUnit, pSliding->offsetUnit, pSliding->soffsetUnit, pSliding->precision, pSliding->interval, pSliding->offset, pSliding->sliding, pSliding->soffset);
      break;
    }
    case WINDOW_TYPE_SESSION: {
      SSessionTrigger* pSession = &pReq->trigger.session;
      mstsDebug("session trigger options, slotId:%d, sessionVal:%" PRId64, pSession->slotId, pSession->sessionVal);
      break;
    }
    case WINDOW_TYPE_STATE: {
      SStateWinTrigger* pState = &pReq->trigger.stateWin;
      int32_t           slotIdNum = 0;
      int16_t           firstSlotId = -1;
      if (pState->pSlotIds) {
        slotIdNum = taosArrayGetSize(pState->pSlotIds);
      }
      if (slotIdNum > 0) {
        firstSlotId = *(int16_t*)taosArrayGet(pState->pSlotIds, 0);
      }
      mstsDebug(
          "state trigger options, slotIdNum:%d, firstSlotId:%d, expr:%s, extend:%d, zeroth:%s, trueForType: %d, trueForCount: %d, "
          "trueForDuration:%" PRId64,
          slotIdNum, firstSlotId, (char*)pState->expr, pState->extend, (char*)pState->zeroth, pState->trueForType, pState->trueForCount,
          pState->trueForDuration);
      break;
    }
    case WINDOW_TYPE_EVENT: {
      SEventTrigger* pEvent = &pReq->trigger.event;
      mstsDebug(
          "event trigger options, startCond:%s, endCond:%s, trueForType: %d, trueForCount: %d, "
          "trueForDuration:%" PRId64,
          (char*)pEvent->startCond, (char*)pEvent->endCond, pEvent->trueForType, pEvent->trueForCount, pEvent->trueForDuration);
      break;
    }
    case WINDOW_TYPE_COUNT: {
      SCountTrigger* pCount = &pReq->trigger.count;
      mstsDebug("count trigger options, countVal:%" PRId64 ", sliding:%" PRId64 ", condCols:%s", pCount->countVal, pCount->sliding, (char*)pCount->condCols);
      break;
    }
    case WINDOW_TYPE_PERIOD: {
      SPeriodTrigger* pPeriod = &pReq->trigger.period;
      mstsDebug("period trigger options, periodUnit:%d, offsetUnit:%d, precision:%d, period:%" PRId64 ", offset:%" PRId64, 
          pPeriod->periodUnit, pPeriod->offsetUnit, pPeriod->precision, pPeriod->period, pPeriod->offset);
      break;
    }
    default:
      mstsDebug("unknown triggerType:%d", pReq->triggerType);
      break;
  }

  // serialized plans and expressions
  mstsDebugL("create_info: triggerCols:[%s]", (char*)pReq->triggerCols);
  mstsDebugL("create_info: partitionCols:[%s]", (char*)pReq->partitionCols);
  mstsDebugL("create_info: triggerScanPlan:[%s]", (char*)pReq->triggerScanPlan);
  mstsDebugL("create_info: calcPlan:[%s]", (char*)pReq->calcPlan);
  mstsDebugL("create_info: subTblNameExpr:[%s]", (char*)pReq->subTblNameExpr);
  mstsDebugL("create_info: tagValueExpr:[%s]", (char*)pReq->tagValueExpr);

  for (int32_t dbIdx = 0; dbIdx < calcDBNum; ++dbIdx) {
    char* dbName = taosArrayGetP(pReq->calcDB, dbIdx);
    mstsDebug("create_info: calcDB[%d] - %s", dbIdx, dbName);
  }

  for (int32_t scanIdx = 0; scanIdx < calcScanNum; ++scanIdx) {
    SStreamCalcScan* pScan = taosArrayGet(pReq->calcScanPlanList, scanIdx);
    int32_t          vgNum = taosArrayGetSize(pScan->vgList);
    mstsDebugL("create_info: calcScanPlan[%d] - readFromCache:%d vgNum:%d scanPlan:[%s]", scanIdx, pScan->readFromCache, vgNum, (char*)pScan->scanPlan);
    for (int32_t vgIdx = 0; vgIdx < vgNum; ++vgIdx) {
      mstsDebug("create_info: calcScanPlan[%d] vg[%d] - vgId:%d", scanIdx, vgIdx, *(int32_t*)taosArrayGet(pScan->vgList, vgIdx));
    }
  }

  for (int32_t urlIdx = 0; urlIdx < notifyUrlNum; ++urlIdx) {
    char* url = taosArrayGetP(pReq->pNotifyAddrUrls, urlIdx);
    mstsDebug("create_info: notifyUrl[%d] - %s", urlIdx, url);
  }

  for (int32_t colIdx = 0; colIdx < outColNum; ++colIdx) {
    SFieldWithOptions* pCol = taosArrayGet(pReq->outCols, colIdx);
    mstsDebug("create_info: outCol[%d] - name:%s type:%d flags:%d bytes:%d compress:%u typeMod:%d", 
        colIdx, pCol->name, pCol->type, pCol->flags, pCol->bytes, pCol->compress, pCol->typeMod);
  }
}
669

670
/**
 * Log one task status record at debug level.
 *
 * The task type and status are printed by name via the gStreamTaskTypeStr and
 * gStreamStatusStr tables.
 *
 * @param name      label printed in front of the record
 * @param streamId  id of the owning stream (read by the logging macro)
 * @param pTask     task status to print
 * @param idx       index printed next to the label
 */
void mstLogSStmTaskStatus(char* name, int64_t streamId, SStmTaskStatus* pTask, int32_t idx) {
  mstsDebug("%s[%d]: task %" PRIx64 " deployId:%d SID:%" PRId64 " nodeId:%d tidx:%d type:%s flags:%" PRIx64 " status:%s lastUpTs:%" PRId64, 
      name, idx,
      pTask->id.taskId, pTask->id.deployId, pTask->id.seriousId, pTask->id.nodeId, pTask->id.taskIdx,
      gStreamTaskTypeStr[pTask->type],
      pTask->flags,
      gStreamStatusStr[pTask->status],
      pTask->lastUpTs);
}
1,798,374✔
675

676
/**
 * Dump the runtime status of a stream to the debug log.
 *
 * Nothing is done unless DEBUG_DEBUG is set in stDebugFlag. A summary line is
 * printed first, followed by one line per trigger reader, trigger O-reader,
 * calc reader, the trigger task (if any) and every runner of each deploy.
 *
 * @param tips      prefix printed in front of the dump
 * @param streamId  id of the stream (read by the logging macros)
 * @param p         stream status to log; may be NULL
 */
void mstLogSStmStatus(char* tips, int64_t streamId, SStmStatus* p) {
  if (!(stDebugFlag & DEBUG_DEBUG)) {
    return;
  }
  
  if (NULL == p) {
    mstsDebug("%s: stream status is NULL", tips);
    return;
  }

  int32_t trigReaderNum = taosArrayGetSize(p->trigReaders);
  int32_t trigOReaderNum = msmGetTrigOReaderSize(p->trigOReaders);
  int32_t calcReaderNum = MST_LIST_SIZE(p->calcReaders);
  int32_t triggerNum = p->triggerTask ? 1 : 0;
  int32_t runnerNum = 0;

  for (int32_t i = 0; i < p->runnerDeploys; ++i) {
    runnerNum += taosArrayGetSize(p->runners[i]);
  }

  mstsDebug("%s: stream status", tips);
  mstsDebug("name:%s runnerNum:%d runnerDeploys:%d runnerReplica:%d lastActionTs:%" PRId64
           " trigReaders:%d trigOReaders:%d calcReaders:%d trigger:%d runners:%d",
      p->streamName, p->runnerNum, p->runnerDeploys, p->runnerReplica, p->lastActionTs,
      trigReaderNum, trigOReaderNum, calcReaderNum, triggerNum, runnerNum);

  SStmTaskStatus* pTask = NULL;
  for (int32_t i = 0; i < trigReaderNum; ++i) {
    pTask = taosArrayGet(p->trigReaders, i);
    mstLogSStmTaskStatus("trigReader task", streamId, pTask, i);
  }

  for (int32_t i = 0; i < trigOReaderNum; ++i) {
    pTask = msmGetTrigOReader(p->trigOReaders, i);
    mstLogSStmTaskStatus("trigOReader task", streamId, pTask, i);
  }

  // Only touch the list head when there is something to walk.
  // NOTE(review): calcReaders is reassigned from tdListFree() in
  // mstResetSStmStatus and its size is read through MST_LIST_SIZE, so it is
  // presumably allowed to be NULL here - confirm.
  SListNode* pNode = (calcReaderNum > 0) ? listHead(p->calcReaders) : NULL;
  for (int32_t i = 0; i < calcReaderNum; ++i) {
    pTask = (SStmTaskStatus*)pNode->data;
    mstLogSStmTaskStatus("calcReader task", streamId, pTask, i);
    pNode = TD_DLIST_NODE_NEXT(pNode);
  }

  if (triggerNum > 0) {
    mstLogSStmTaskStatus("trigger task", streamId, p->triggerTask, 0);
  }

  for (int32_t i = 0; i < p->runnerDeploys; ++i) {
    int32_t num = taosArrayGetSize(p->runners[i]);
    if (num <= 0) {
      continue;
    }
    
    mstsDebug("the %dth deploy runners status", i);
    for (int32_t m = 0; m < num; ++m) {
      pTask = taosArrayGet(p->runners[i], m);
      mstLogSStmTaskStatus("runner task", streamId, pTask, m);
    }
  }
}
738

739
bool mstEventPassIsolation(int32_t num, int32_t event) {
6,709,447✔
740
  bool ret = ((mStreamMgmt.lastTs[event].ts + num * MST_SHORT_ISOLATION_DURATION) <= mStreamMgmt.hCtx.currentTs);
6,709,447✔
741
  if (ret) {
6,709,447✔
742
    mstDebug("event %s passed %d isolation, last:%" PRId64 ", curr:%" PRId64, 
5,234,588✔
743
        gMndStreamEvent[event], num, mStreamMgmt.lastTs[event].ts, mStreamMgmt.hCtx.currentTs);
744
  }
745

746
  return ret;
6,709,447✔
747
}
748

749
bool mstEventHandledChkSet(int32_t event) {
5,234,588✔
750
  if (0 == atomic_val_compare_exchange_8((int8_t*)&mStreamMgmt.lastTs[event].handled, 0, 1)) {
5,234,588✔
751
    mstDebug("event %s set handled", gMndStreamEvent[event]);
269,217✔
752
    return true;
269,217✔
753
  }
754
  return false;
4,965,371✔
755
}
756

757
// Fills the `status` and `msg` VARSTR buffers describing one stream.
//
// Decision order, as implemented below:
//   1. stream management inactive or not in normal state -> Undeployed + hint message
//   2. pStream->userDropped set                          -> Dropping
//   3. pStream->userStopped set                          -> Stopped
//   4. stream missing from mStreamMgmt.streamMap         -> Undeployed
//   5. runtime `stopped` == 1                            -> Failed + last fatal error and retry count
//   6. runtime `stopped` == 4                            -> Failed + TSDB_CODE_GRANT_STREAM_EXPIRED text
//   7. trigger task present and running                  -> Running + formatted start time
//   8. otherwise                                         -> Init + current deploy times
//
// Steps 4-8 run with mStreamMgmt.runtimeLock read-locked. Always returns TSDB_CODE_SUCCESS.
int32_t mstGetStreamStatusStr(SStreamObj* pStream, char* status, int32_t statusSize, char* msg, int32_t msgSize) {
  char   tmpBuf[256];
  int8_t active = atomic_load_8(&mStreamMgmt.active);
  int8_t state = atomic_load_8(&mStreamMgmt.state);

  if (0 == active || MND_STM_STATE_NORMAL != state) {
    mstDebug("mnode streamMgmt not in active mode, active:%d, state:%d", active, state);
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_UNDEPLOYED], statusSize);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "Mnode may be unstable, try again later", msgSize);
    return TSDB_CODE_SUCCESS;
  }

  // user-requested drop is checked before user-requested stop
  if (atomic_load_8(&pStream->userDropped)) {
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_DROPPING], statusSize);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
    return TSDB_CODE_SUCCESS;
  }

  if (atomic_load_8(&pStream->userStopped)) {
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_STOPPED], statusSize);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
    return TSDB_CODE_SUCCESS;
  }

  (void)mstWaitLock(&mStreamMgmt.runtimeLock, true);

  int64_t     streamId = pStream->pCreate->streamId;
  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL == pStatus) {
    mstDebug("return Undeployed: stream not in streamMap (show streams)");
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_UNDEPLOYED], statusSize);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
    goto _exit;
  }

  int8_t stopped = atomic_load_8(&pStatus->stopped);
  if (1 == stopped) {
    // stopped with a recorded fatal error: report it with the retry count
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_FAILED], statusSize);
    snprintf(tmpBuf, sizeof(tmpBuf), "Last error: %s, Failed times: %" PRId64, tstrerror(pStatus->fatalError), pStatus->fatalRetryTimes);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
    goto _exit;
  }

  if (4 == stopped) {
    // this stop value is reported as a stream grant expiration
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_FAILED], statusSize);
    snprintf(tmpBuf, sizeof(tmpBuf), "Error: %s", tstrerror(TSDB_CODE_GRANT_STREAM_EXPIRED));
    STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
    goto _exit;
  }

  if (pStatus->triggerTask && STREAM_STATUS_RUNNING == pStatus->triggerTask->status) {
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_RUNNING], statusSize);
    tstrncpy(tmpBuf, "Running start from: ", sizeof(tmpBuf));
    int32_t offset = strlen(tmpBuf);
    // append the local-time rendering of the trigger start timestamp after the prefix
    (void)formatTimestampLocal(tmpBuf + offset, sizeof(tmpBuf) - offset, pStatus->triggerTask->runningStartTs,
                               TSDB_TIME_PRECISION_MILLI);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
  } else {
    mstsDebug("return Idle: pStatus=%p triggerTask=%p triggerStatus=%d (show streams view)",
        (void*)pStatus, (void*)pStatus->triggerTask,
        pStatus->triggerTask ? (int)pStatus->triggerTask->status : -1);
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_INIT], statusSize);
    snprintf(tmpBuf, sizeof(tmpBuf), "Current deploy times: %" PRId64, pStatus->deployTimes);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
  }

_exit:

  taosRUnLockLatch(&mStreamMgmt.runtimeLock);

  return TSDB_CODE_SUCCESS;
}
832

833
// Writes one result row (index `numOfRows`) describing `pStream` into `pBlock`.
// Columns are filled in block order: stream name, db name, create time,
// stream id (hex), sql, status, main snode id, replica snode id, message.
// Returns 0 on success; on the first failing column the error is logged and returned.
int32_t mstSetStreamAttrResBlock(SMnode *pMnode, SStreamObj* pStream, SSDataBlock* pBlock, int32_t numOfRows) {
  int32_t          code = 0;
  int32_t          lino = 0;
  int32_t          colIdx = 0;
  SColumnInfoData* pCol = NULL;

  // stream_name
  char nameVar[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(nameVar, mndGetStableStr(pStream->name), sizeof(nameVar));
  pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
  TSDB_CHECK_NULL(pCol, code, lino, _end, terrno);
  code = colDataSetVal(pCol, numOfRows, (const char*)nameVar, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // db_name
  char dbVar[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(dbVar, mndGetDbStr(pStream->pCreate->streamDB), sizeof(dbVar));
  pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
  TSDB_CHECK_NULL(pCol, code, lino, _end, terrno);
  code = colDataSetVal(pCol, numOfRows, (const char*)dbVar, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // create time
  pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
  TSDB_CHECK_NULL(pCol, code, lino, _end, terrno);
  code = colDataSetVal(pCol, numOfRows, (const char*)&pStream->createTime, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stream id, rendered as hex text and then wrapped as a VARSTR
  char idHex[19] = {0};
  char idVar[19 + VARSTR_HEADER_SIZE] = {0};
  snprintf(idHex, sizeof(idHex), "%" PRIx64, pStream->pCreate->streamId);
  STR_WITH_MAXSIZE_TO_VARSTR(idVar, idHex, sizeof(idVar));
  pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
  TSDB_CHECK_NULL(pCol, code, lino, _end, terrno);
  code = colDataSetVal(pCol, numOfRows, (const char*)idVar, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // sql
  char sqlVar[TSDB_INS_STREAM_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(sqlVar, pStream->pCreate->sql, sizeof(sqlVar));
  pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
  TSDB_CHECK_NULL(pCol, code, lino, _end, terrno);
  code = colDataSetVal(pCol, numOfRows, (const char*)sqlVar, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // status (the message produced alongside it is written as the last column)
  char statusVar[20 + VARSTR_HEADER_SIZE] = {0};
  char msgVar[TSDB_RESERVE_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
  code = mstGetStreamStatusStr(pStream, statusVar, sizeof(statusVar), msgVar, sizeof(msgVar));
  TSDB_CHECK_CODE(code, lino, _end);

  pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
  TSDB_CHECK_NULL(pCol, code, lino, _end, terrno);
  code = colDataSetVal(pCol, numOfRows, (const char*)statusVar, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // snodeLeader
  pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
  TSDB_CHECK_NULL(pCol, code, lino, _end, terrno);
  code = colDataSetVal(pCol, numOfRows, (const char*)&pStream->mainSnodeId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // snodeReplica: -1 when the main snode cannot be acquired
  pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
  TSDB_CHECK_NULL(pCol, code, lino, _end, terrno);
  int32_t    replicaSnodeId = -1;
  SSnodeObj* pMainSnode = mndAcquireSnode(pMnode, pStream->mainSnodeId);
  if (NULL != pMainSnode) {
    replicaSnodeId = pMainSnode->replicaId;
  }
  mndReleaseSnode(pMnode, pMainSnode);
  code = colDataSetVal(pCol, numOfRows, (const char*)&replicaSnodeId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // msg
  pCol = taosArrayGet(pBlock->pDataBlock, colIdx++);
  TSDB_CHECK_NULL(pCol, code, lino, _end, terrno);
  code = colDataSetVal(pCol, numOfRows, (const char*)msgVar, false);

_end:
  if (code) {
    mError("error happens when build stream attr result block, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
915

916

917
// Fills the `status` and `msg` VARSTR buffers for a single stream task.
//
//   status : always the text of gStreamStatusStr[pTask->status]
//   msg    : - "Last error: ..." when the task is FAILED with a non-zero errCode
//            - a session/recalc summary when the task is a trigger task that
//              carries a detail status
//            - empty otherwise
//
// Always returns TSDB_CODE_SUCCESS.
int32_t mstGetTaskStatusStr(SStmTaskStatus* pTask, char* status, int32_t statusSize, char* msg, int32_t msgSize) {
  char tmpBuf[256];
  
  STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[pTask->status], statusSize);
  if (STREAM_STATUS_FAILED == pTask->status && pTask->errCode) {
    snprintf(tmpBuf, sizeof(tmpBuf), "Last error: %s", tstrerror(pTask->errCode));
    STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
    return TSDB_CODE_SUCCESS;
  }

  if (STREAM_TRIGGER_TASK == pTask->type && mstWaitLock(&pTask->detailStatusLock, true)) {
    if (pTask->detailStatus) {
      SSTriggerRuntimeStatus* pTrigger = (SSTriggerRuntimeStatus*)pTask->detailStatus;
      // format into the local buffer while the detail status is read-locked
      snprintf(tmpBuf, sizeof(tmpBuf), "Current RT/HI/RE session num: %d/%d/%d, histroy progress:%d%%, total AUTO/USER recalc num: %d/%d", 
          pTrigger->realtimeSessionNum, pTrigger->historySessionNum, pTrigger->recalcSessionNum, pTrigger->histroyProgress,
          pTrigger->autoRecalcNum, (int32_t)taosArrayGetSize(pTrigger->userRecalcs));
      taosRUnLockLatch(&pTask->detailStatusLock);

      // publish the summary; previously it was formatted but never copied into msg
      STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
      return TSDB_CODE_SUCCESS;
    }

    taosRUnLockLatch(&pTask->detailStatusLock);    
  }
  
  STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
  
  return TSDB_CODE_SUCCESS;
}
944

945
// Writes the "extra info" VARSTR for a task into `extraStr`:
//   reader task -> "trigReader" or "calcReader" depending on its flags
//   runner task -> "topRunner" when flagged as top runner
//   anything else (including non-top runners) -> empty string
// Always returns TSDB_CODE_SUCCESS.
int32_t mstGetTaskExtraStr(SStmTaskStatus* pTask, char* extraStr, int32_t extraSize) {
  const char* label = "";

  if (STREAM_READER_TASK == pTask->type) {
    label = STREAM_IS_TRIGGER_READER(pTask->flags) ? "trigReader" : "calcReader";
  } else if (STREAM_RUNNER_TASK == pTask->type) {
    if (STREAM_IS_TOP_RUNNER(pTask->flags)) {
      label = "topRunner";
    }
  }

  STR_WITH_MAXSIZE_TO_VARSTR(extraStr, label, extraSize);
  return TSDB_CODE_SUCCESS;
}
967

968

969
// Writes one result row (index `numOfRows`) describing `pTask` of `pStream` into `pBlock`.
// Columns are filled in block order: stream name, stream id, task id, type,
// serious id, deploy id, node type, node id, task idx, status, start time,
// last update, extra info, message.
// Returns 0 on success; on the first failing column the error is logged
// (with the failing line) and returned.
int32_t mstSetStreamTaskResBlock(SStreamObj* pStream, SStmTaskStatus* pTask, SSDataBlock* pBlock, int32_t numOfRows) {
  int32_t code = 0;
  int32_t cols = 0;
  int32_t lino = 0;

  // stream_name
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetStableStr(pStream->name), sizeof(streamName));
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamName, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stream id: hex text written straight into the VARSTR payload area
  char idstr[19 + VARSTR_HEADER_SIZE] = {0};
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pStream->pCreate->streamId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE])); 
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // task id (reuses idstr)
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pTask->id.taskId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // type: anything that is neither reader nor trigger is shown as "Runner"
  char type[20 + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(type, (STREAM_READER_TASK == pTask->type) ? "Reader" : ((STREAM_TRIGGER_TASK == pTask->type) ? "Trigger" : "Runner"), sizeof(type));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)type, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // serious id (reuses idstr)
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pTask->id.seriousId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // deploy id
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->id.deployId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // node_type: reader tasks are shown as "vnode", all other tasks as "snode"
  char nodeType[10 + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(nodeType, (STREAM_READER_TASK == pTask->type) ? "vnode" : "snode", sizeof(nodeType));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)nodeType, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // node id
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->id.nodeId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // task idx
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->id.taskIdx, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // status (the message produced alongside it is written as the last column)
  char   status[20 + VARSTR_HEADER_SIZE] = {0};
  char msg[TSDB_RESERVE_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
  code = mstGetTaskStatusStr(pTask, status, sizeof(status), msg, sizeof(msg));
  TSDB_CHECK_CODE(code, lino, _end);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)status, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // start time: stored as NULL while the timestamp is still zero
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  if (pTask->runningStartTs) {
    code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->runningStartTs, false);
  } else {
    code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->runningStartTs, true);
  }
  TSDB_CHECK_CODE(code, lino, _end);

  // last update: stored as NULL while the timestamp is still zero
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  if (pTask->lastUpTs) {
    code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->lastUpTs, false);
  } else {
    code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->lastUpTs, true);
  }
  TSDB_CHECK_CODE(code, lino, _end);

  // extra info
  char extra[64 + VARSTR_HEADER_SIZE] = {0};
  code = mstGetTaskExtraStr(pTask, extra, sizeof(extra));
  TSDB_CHECK_CODE(code, lino, _end);
  
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)extra, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // msg
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)msg, false);
  // checked like every other column so a failure here is logged with its line number
  TSDB_CHECK_CODE(code, lino, _end);

_end:
  if (code) {
    // names the task builder; the text previously said "attr", copied from the attr builder
    mError("error happens when build stream task result block, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
1094

1095
// Counts every task tracked by `pStatus`: trigger readers, extra trigger
// readers (trigOReaders), calc readers, the trigger task if present, and the
// runners of every deploy slot.
int32_t mstGetNumOfStreamTasks(SStmStatus* pStatus) {
  int32_t total = taosArrayGetSize(pStatus->trigReaders) + msmGetTrigOReaderSize(pStatus->trigOReaders) +
                  MST_LIST_SIZE(pStatus->calcReaders) + (pStatus->triggerTask ? 1 : 0);

  int32_t slot = 0;
  while (slot < MND_STREAM_RUNNER_DEPLOY_NUM) {
    total += taosArrayGetSize(pStatus->runners[slot]);
    ++slot;
  }

  return total;
}
1103

1104
int32_t mstSetStreamTasksResBlock(SStreamObj* pStream, SSDataBlock* pBlock, int32_t* numOfRows, int32_t rowsCapacity) {
8,894,367✔
1105
  int32_t code = 0;
8,894,367✔
1106
  int32_t lino = 0;
8,894,367✔
1107
  int64_t streamId = pStream->pCreate->streamId;
8,894,367✔
1108
  bool    statusLocked = false;
8,894,367✔
1109

1110
  (void)mstWaitLock(&mStreamMgmt.runtimeLock, true);
8,894,367✔
1111

1112
  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
8,894,367✔
1113
  if (NULL == pStatus) {
8,894,367✔
1114
    mstsDebug("stream not in streamMap, ignore it, dropped:%d, stopped:%d", atomic_load_8(&pStream->userDropped), atomic_load_8(&pStream->userStopped));
230,686✔
1115
    goto _exit;
230,686✔
1116
  }
1117

1118
  int8_t stopped = atomic_load_8(&pStatus->stopped);
8,663,681✔
1119
  if (stopped) {
8,663,681✔
1120
    mstsDebug("stream stopped %d, ignore it", stopped);
150,368✔
1121
    goto _exit;
150,368✔
1122
  }
1123

1124
  (void)mstWaitLock(&pStatus->resetLock, true);
8,513,313✔
1125
  statusLocked = true;
8,513,313✔
1126
  
1127
  int32_t count = mstGetNumOfStreamTasks(pStatus);
8,513,313✔
1128

1129
  if (*numOfRows + count > rowsCapacity) {
8,513,313✔
1130
    code = blockDataEnsureCapacity(pBlock, *numOfRows + count);
316,228✔
1131
    if (code) {
316,228✔
1132
      mstError("failed to prepare the result block buffer, rows:%d", *numOfRows + count);
×
1133
      TAOS_CHECK_EXIT(code);
×
1134
    }
1135
  }
1136

1137
  SStmTaskStatus* pTask = NULL;
8,513,313✔
1138
  int32_t trigReaderNum = taosArrayGetSize(pStatus->trigReaders);
8,513,313✔
1139
  for (int32_t i = 0; i < trigReaderNum; ++i) {
18,002,156✔
1140
    pTask = taosArrayGet(pStatus->trigReaders, i);
9,488,843✔
1141
  
1142
    code = mstSetStreamTaskResBlock(pStream, pTask, pBlock, *numOfRows);
9,488,843✔
1143
    if (code == TSDB_CODE_SUCCESS) {
9,488,843✔
1144
      (*numOfRows)++;
9,488,843✔
1145
    }
1146
  }
1147

1148
  trigReaderNum = msmGetTrigOReaderSize(pStatus->trigOReaders);
8,513,313✔
1149
  for (int32_t i = 0; i < trigReaderNum; ++i) {
13,229,378✔
1150
    pTask = msmGetTrigOReader(pStatus->trigOReaders, i);
4,716,065✔
1151
  
1152
    code = mstSetStreamTaskResBlock(pStream, pTask, pBlock, *numOfRows);
4,716,065✔
1153
    if (code == TSDB_CODE_SUCCESS) {
4,716,065✔
1154
      (*numOfRows)++;
4,716,065✔
1155
    }
1156
  }
1157

1158
  if (pStatus->calcReaders) {
8,513,313✔
1159
    int32_t calcReaderNum = MST_LIST_SIZE(pStatus->calcReaders);
8,513,313✔
1160
    SListNode* pNode = listHead(pStatus->calcReaders);
8,513,313✔
1161
    for (int32_t i = 0; i < calcReaderNum; ++i) {
25,648,867✔
1162
      pTask = (SStmTaskStatus*)pNode->data;
17,135,554✔
1163
    
1164
      code = mstSetStreamTaskResBlock(pStream, pTask, pBlock, *numOfRows);
17,135,554✔
1165
      if (code == TSDB_CODE_SUCCESS) {
17,135,554✔
1166
        (*numOfRows)++;
17,135,554✔
1167
      }
1168
      pNode = TD_DLIST_NODE_NEXT(pNode);
17,135,554✔
1169
    }
1170
  }
1171

1172
  if (pStatus->triggerTask) {
8,513,313✔
1173
    code = mstSetStreamTaskResBlock(pStream, pStatus->triggerTask, pBlock, *numOfRows);
8,507,300✔
1174
    if (code == TSDB_CODE_SUCCESS) {
8,507,300✔
1175
      (*numOfRows)++;
8,507,300✔
1176
    }
1177
  }
1178

1179
  int32_t runnerNum = 0;
8,513,313✔
1180
  for (int32_t i = 0; i < MND_STREAM_RUNNER_DEPLOY_NUM; ++i) {
34,053,252✔
1181
    runnerNum = taosArrayGetSize(pStatus->runners[i]);
25,539,939✔
1182
    for (int32_t m = 0; m < runnerNum; ++m) {
50,685,136✔
1183
      pTask = taosArrayGet(pStatus->runners[i], m);
25,145,197✔
1184
    
1185
      code = mstSetStreamTaskResBlock(pStream, pTask, pBlock, *numOfRows);
25,145,197✔
1186
      if (code == TSDB_CODE_SUCCESS) {
25,145,197✔
1187
        (*numOfRows)++;
25,145,197✔
1188
      }
1189
    }
1190
  }
1191
  
1192
  pBlock->info.rows = *numOfRows;
8,513,313✔
1193

1194
_exit:
8,894,367✔
1195

1196
  if (statusLocked) {
8,894,367✔
1197
    taosRUnLockLatch(&pStatus->resetLock);
8,513,313✔
1198
  }
1199
  
1200
  taosRUnLockLatch(&mStreamMgmt.runtimeLock);
8,894,367✔
1201

1202
  if (code) {
8,894,367✔
1203
    mError("error happens when build stream tasks result block, lino:%d, code:%s", lino, tstrerror(code));
×
1204
  }
1205
  
1206
  return code;
8,894,367✔
1207
}
1208

1209

1210
// Queues a user recalculation request covering `pRange` on `pStream`.
// A fresh recalc id is generated first; the request is then appended to the
// stream's userRecalcList under the userRecalcLock write lock, creating the
// list when it does not exist yet.
// Returns 0 on success, otherwise the failing error code (also logged).
int32_t mstAppendNewRecalcRange(int64_t streamId, SStmStatus *pStream, STimeWindow* pRange) {
  int32_t code = 0;
  int32_t lino = 0;
  bool    wLocked = false;
  SArray* pNewList = NULL;  // owned here until it is published on the stream

  SStreamRecalcReq req = {.recalcId = 0, .start = pRange->skey, .end = pRange->ekey};
  TAOS_CHECK_EXIT(taosGetSystemUUIDU64(&req.recalcId));

  taosWLockLatch(&pStream->userRecalcLock);
  wLocked = true;

  if (NULL != pStream->userRecalcList) {
    TSDB_CHECK_NULL(taosArrayPush(pStream->userRecalcList, &req), code, lino, _exit, terrno);
  } else {
    pNewList = taosArrayInit(2, sizeof(SStreamRecalcReq));
    if (NULL == pNewList) {
      TAOS_CHECK_EXIT(terrno);
    }

    TSDB_CHECK_NULL(taosArrayPush(pNewList, &req), code, lino, _exit, terrno);

    // publish the list and drop local ownership so the cleanup below skips it
    atomic_store_ptr(&pStream->userRecalcList, pNewList);
    pNewList = NULL;
  }

  mstsInfo("stream recalc ID:%" PRIx64 " range:%" PRId64 " - %" PRId64 " added", req.recalcId, pRange->skey, pRange->ekey);

_exit:

  // only non-NULL when the list was created but never published
  taosArrayDestroy(pNewList);

  if (wLocked) {
    taosWUnLockLatch(&pStream->userRecalcLock);
  }

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1252

1253

1254

1255
// Writes one result row (index `numOfRows`) describing a user recalculation
// `pProgress` of `pStream` into `pBlock`.
// Columns are filled in block order: stream name, stream id (hex),
// recalc id (hex), start, end, progress ("N%").
// Returns 0 on success; on the first failing column the error is logged
// (with the failing line) and returned.
int32_t mstSetStreamRecalculateResBlock(SStreamObj* pStream, SSTriggerRecalcProgress* pProgress, SSDataBlock* pBlock, int32_t numOfRows) {
  int32_t code = 0;
  int32_t cols = 0;
  int32_t lino = 0;

  // stream_name
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetStableStr(pStream->name), sizeof(streamName));
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamName, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stream id: hex text written straight into the VARSTR payload area.
  // The stored length is the payload length only, matching how
  // mstSetStreamTaskResBlock sets the length of its id columns.
  char idstr[19 + VARSTR_HEADER_SIZE] = {0};
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pStream->pCreate->streamId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // recalc id (reuses idstr)
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pProgress->recalcId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // start
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pProgress->start, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // end
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pProgress->end, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // progress, rendered as a percentage string
  char progress[20 + VARSTR_HEADER_SIZE] = {0};
  snprintf(&progress[VARSTR_HEADER_SIZE], sizeof(progress) - VARSTR_HEADER_SIZE, "%d%%", pProgress->progress);
  varDataSetLen(progress, strlen(&progress[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)progress, false);
  TSDB_CHECK_CODE(code, lino, _end);

_end:
  if (code) {
    // names the recalculate builder; the text previously said "attr", copied from the attr builder
    mError("error happens when build stream recalculate result block, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
1313

1314

1315
// Appends one result row per user recalculation of `pStream` to `pBlock`,
// starting at row *numOfRows, growing the block when the rows would exceed
// `rowsCapacity`.
// No rows are added when the stream is missing from the runtime map, is
// marked stopped, has no trigger task, or its trigger task carries no detail status.
// Runs under the runtime read lock; the trigger task's detail status is read
// under its own read lock.
int32_t mstSetStreamRecalculatesResBlock(SStreamObj* pStream, SSDataBlock* pBlock, int32_t* numOfRows, int32_t rowsCapacity) {
  int32_t     code = 0;
  int32_t     lino = 0;
  bool        detailLocked = false;
  int64_t     streamId = pStream->pCreate->streamId;
  SStmStatus* pStatus = NULL;

  (void)mstWaitLock(&mStreamMgmt.runtimeLock, true);

  pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL == pStatus) {
    mstsDebug("stream not in streamMap, ignore it, dropped:%d, stopped:%d", atomic_load_8(&pStream->userDropped), atomic_load_8(&pStream->userStopped));
    goto _exit;
  }

  int8_t stopped = atomic_load_8(&pStatus->stopped);
  if (stopped) {
    mstsDebug("stream stopped %d, ignore it", stopped);
    goto _exit;
  }

  if (NULL == pStatus->triggerTask) {
    mstsDebug("no trigger task now, deployTimes:%" PRId64 ", ignore it", pStatus->deployTimes);
    goto _exit;
  }

  (void)mstWaitLock(&pStatus->triggerTask->detailStatusLock, true);
  detailLocked = true;

  if (NULL == pStatus->triggerTask->detailStatus) {
    mstsDebug("no trigger task now, deployTimes:%" PRId64 ", ignore it", pStatus->deployTimes);
    goto _exit;
  }

  SSTriggerRuntimeStatus* pTrigger = (SSTriggerRuntimeStatus*)pStatus->triggerTask->detailStatus;
  int32_t count = taosArrayGetSize(pTrigger->userRecalcs);

  // make sure the block can hold every recalc row before writing any of them
  if (*numOfRows + count > rowsCapacity) {
    code = blockDataEnsureCapacity(pBlock, *numOfRows + count);
    if (code) {
      mstError("failed to prepare the result block buffer, rows:%d", *numOfRows + count);
      TAOS_CHECK_EXIT(code);
    }
  }

  for (int32_t i = 0; i < count; ++i) {
    SSTriggerRecalcProgress* pProgress = taosArrayGet(pTrigger->userRecalcs, i);

    code = mstSetStreamRecalculateResBlock(pStream, pProgress, pBlock, *numOfRows);
    if (TSDB_CODE_SUCCESS == code) {
      // the row counter advances only for rows that were written
      (*numOfRows)++;
    }
  }

  pBlock->info.rows = *numOfRows;

_exit:

  // release in reverse acquisition order
  if (detailLocked) {
    taosRUnLockLatch(&pStatus->triggerTask->detailStatusLock);
  }

  taosRUnLockLatch(&mStreamMgmt.runtimeLock);

  if (code) {
    mError("error happens when build stream recalculates result block, lino:%d, code:%s", lino, tstrerror(code));
  }

  return code;
}
1381

1382
int32_t mstGetScanUidFromPlan(int64_t streamId, void* scanPlan, int64_t* uid) {
459,379✔
1383
  SSubplan* pSubplan = NULL;
459,379✔
1384
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
459,379✔
1385
  
1386
  TAOS_CHECK_EXIT(nodesStringToNode(scanPlan, (SNode**)&pSubplan));
459,379✔
1387

1388
  if (pSubplan->pNode && nodeType(pSubplan->pNode) == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
459,379✔
1389
    SScanPhysiNode* pScanNode = (SScanPhysiNode*)pSubplan->pNode;
198,141✔
1390
    *uid = pScanNode->uid;
198,141✔
1391
  }
1392
  
1393
_exit:
458,858✔
1394

1395
  if (code) {
459,379✔
1396
    mstsError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1397
  }
1398

1399
  nodesDestroyNode((SNode *)pSubplan);
459,379✔
1400

1401
  return code;
459,379✔
1402
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc