• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4874

04 Dec 2025 01:55AM UTC coverage: 64.623% (+0.07%) from 64.558%
#4874

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

865 of 2219 new or added lines in 36 files covered. (38.98%)

6317 existing lines in 143 files now uncovered.

159543 of 246882 relevant lines covered (64.62%)

106415537.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.98
/source/dnode/mnode/impl/src/mndStreamUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndDb.h"
17
#include "mndStb.h"
18
#include "mndStream.h"
19
#include "mndTrans.h"
20
#include "mndVgroup.h"
21
#include "taoserror.h"
22
#include "tmisce.h"
23
#include "mndSnode.h"
24

25
/*
 * Block until pLock is held by the caller.
 *
 * readLock == true : keep retrying taosRTryLockLatch(), sleeping 1ms after
 *                    every attempt that reports a non-zero result.
 * readLock == false: delegate to taosWWaitLockLatch().
 *
 * Always returns true.
 */
bool mstWaitLock(SRWLatch* pLock, bool readLock) {
  if (!readLock) {
    taosWWaitLockLatch(pLock);
    return true;
  }

  for (;;) {
    if (0 == taosRTryLockLatch(pLock)) {
      break;
    }
    taosMsleep(1);
  }

  return true;
}
38

39
void mstDestroySStmVgStreamStatus(void* p) { 
472,137✔
40
  SStmVgStreamStatus* pStatus = (SStmVgStreamStatus*)p;
472,137✔
41
  taosArrayDestroy(pStatus->trigReaders); 
472,137✔
42
  taosArrayDestroy(pStatus->calcReaders); 
472,137✔
43
}
472,137✔
44

45
void mstDestroySStmSnodeStreamStatus(void* p) { 
220,984✔
46
  SStmSnodeStreamStatus* pStatus = (SStmSnodeStreamStatus*)p;
220,984✔
47
  for (int32_t i = 0; i < MND_STREAM_RUNNER_DEPLOY_NUM; ++i) {
883,936✔
48
    taosArrayDestroy(pStatus->runners[i]);
662,952✔
49
    pStatus->runners[i] = NULL;
662,952✔
50
  }
51
}
220,984✔
52

53

54
/*
 * Release the stream-task hash owned by a vgroup status and clear the field
 * so the status does not keep a stale handle.
 */
void mstDestroyVgroupStatus(SStmVgroupStatus* pVgStatus) {
  SStmVgroupStatus* pStatus = pVgStatus;

  taosHashCleanup(pStatus->streamTasks);
  pStatus->streamTasks = NULL;
}
58

59
void mstDestroySStmTaskToDeployExt(void* param) {
1,501,140✔
60
  SStmTaskToDeployExt* pExt = (SStmTaskToDeployExt*)param;
1,501,140✔
61
  if (pExt->deployed) {
1,501,140✔
62
    return;
1,492,294✔
63
  }
64
  
65
  switch (pExt->deploy.task.type) {
8,846✔
66
    case STREAM_TRIGGER_TASK:
1,579✔
67
      taosArrayDestroy(pExt->deploy.msg.trigger.readerList);
1,579✔
68
      pExt->deploy.msg.trigger.readerList = NULL;
1,579✔
69
      taosArrayDestroy(pExt->deploy.msg.trigger.runnerList);
1,579✔
70
      pExt->deploy.msg.trigger.runnerList = NULL;
1,579✔
71
      break;
1,579✔
72
    case STREAM_RUNNER_TASK:
4,737✔
73
      taosMemoryFreeClear(pExt->deploy.msg.runner.pPlan);
4,737✔
74
      break;
4,737✔
75
    default:  
2,530✔
76
      break;;
2,530✔
77
  }
78
}
79

80
void mstDestroyScanAddrList(void* param) {
389,054✔
81
  if (NULL == param) {
389,054✔
82
    return;
×
83
  }
84
  SArray* pList = *(SArray**)param;
389,054✔
85
  taosArrayDestroy(pList);
389,054✔
86
}
87

88
void mstDestroySStmSnodeTasksDeploy(void* param) {
55,061✔
89
  SStmSnodeTasksDeploy* pSnode = (SStmSnodeTasksDeploy*)param;
55,061✔
90
  taosArrayDestroyEx(pSnode->triggerList, mstDestroySStmTaskToDeployExt);
55,061✔
91
  taosArrayDestroyEx(pSnode->runnerList, mstDestroySStmTaskToDeployExt);
55,061✔
92
}
55,061✔
93

94
void mstDestroySStmVgTasksToDeploy(void* param) {
155,461✔
95
  SStmVgTasksToDeploy* pVg = (SStmVgTasksToDeploy*)param;
155,461✔
96
  taosArrayDestroyEx(pVg->taskList, mstDestroySStmTaskToDeployExt);
155,461✔
97
}
155,461✔
98

99
void mstDestroySStmSnodeStatus(void* param) {
41,754✔
100
  SStmSnodeStatus* pSnode = (SStmSnodeStatus*)param;
41,754✔
101
  taosHashCleanup(pSnode->streamTasks);
41,754✔
102
}
41,754✔
103

104
void mstDestroySStmVgroupStatus(void* param) {
99,854✔
105
  SStmVgroupStatus* pVg = (SStmVgroupStatus*)param;
99,854✔
106
  taosHashCleanup(pVg->streamTasks);
99,854✔
107
}
99,854✔
108

109
/*
 * Drop all task bookkeeping of a stream status while holding its reset lock
 * in write mode.
 *
 * Released, in order: trigger readers, trigger O-readers, the calc reader
 * list, the trigger task (its detailStatus is freed under the task's own
 * detailStatusLock first) and every runner array. Pointers are cleared after
 * release so the status can be reused.
 */
void mstResetSStmStatus(SStmStatus* pStatus) {
  (void)mstWaitLock(&pStatus->resetLock, false);

  /* reader tasks */
  taosArrayDestroy(pStatus->trigReaders);
  pStatus->trigReaders = NULL;

  taosArrayDestroy(pStatus->trigOReaders);
  pStatus->trigOReaders = NULL;

  pStatus->calcReaders = tdListFree(pStatus->calcReaders);

  /* trigger task: free the detail status under its lock before the task itself */
  if (NULL != pStatus->triggerTask) {
    SRWLatch* pDetailLock = &pStatus->triggerTask->detailStatusLock;

    (void)mstWaitLock(pDetailLock, false);
    taosMemoryFreeClear(pStatus->triggerTask->detailStatus);
    taosWUnLockLatch(pDetailLock);
  }
  taosMemoryFreeClear(pStatus->triggerTask);

  /* runner tasks of every deploy slot */
  int32_t slot = 0;
  while (slot < MND_STREAM_RUNNER_DEPLOY_NUM) {
    taosArrayDestroy(pStatus->runners[slot]);
    pStatus->runners[slot] = NULL;
    ++slot;
  }

  taosWUnLockLatch(&pStatus->resetLock);
}
130

131
void mstDestroySStmStatus(void* param) {
201,435✔
132
  SStmStatus* pStatus = (SStmStatus*)param;
201,435✔
133
  taosMemoryFreeClear(pStatus->streamName);
201,435✔
134

135
  mstResetSStmStatus(pStatus);
201,435✔
136

137
  taosWLockLatch(&pStatus->userRecalcLock);
201,435✔
138
  taosArrayDestroy(pStatus->userRecalcList);
201,435✔
139
  taosWUnLockLatch(&pStatus->userRecalcLock);
201,435✔
140

141
  tFreeSCMCreateStreamReq(pStatus->pCreate);
201,435✔
142
  taosMemoryFreeClear(pStatus->pCreate);  
201,435✔
143
}
201,435✔
144

145
void mstDestroySStmAction(void* param) {
319,290✔
146
  SStmAction* pAction = (SStmAction*)param;
319,290✔
147

148
  taosArrayDestroy(pAction->undeploy.taskList);
319,290✔
149
  taosArrayDestroy(pAction->recalc.recalcList);
319,290✔
150
}
319,290✔
151

152
/*
 * Detach the reader, trigger and runner task references from a stream deploy
 * descriptor. Only the pointers are cleared; nothing is freed here.
 */
void mstClearSStmStreamDeploy(SStmStreamDeploy* pDeploy) {
  pDeploy->runnerTasks = NULL;
  pDeploy->triggerTask = NULL;
  pDeploy->readerTasks = NULL;
}
157

158
/*
 * Report whether the stream identified by streamId counts as dropped.
 *
 * Walks every SDB_STREAM object. When a stream with a matching id is found,
 * *dropped reflects its userDropped flag; when no stream matches, *dropped is
 * set to true. Always returns TSDB_CODE_SUCCESS.
 */
int32_t mstIsStreamDropped(SMnode *pMnode, int64_t streamId, bool* dropped) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;

  while (NULL != (pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream))) {
    bool matched = (pStream->pCreate->streamId == streamId);
    if (matched) {
      *dropped = pStream->userDropped ? true : false;
    }

    sdbRelease(pSdb, pStream);

    if (matched) {
      /* leaving the iteration early, so the fetch must be cancelled */
      sdbCancelFetch(pSdb, pIter);
      mstsDebug("stream found, dropped:%d", *dropped);
      return TSDB_CODE_SUCCESS;
    }

    pStream = NULL;
  }

  /* no such stream in sdb */
  *dropped = true;

  return TSDB_CODE_SUCCESS;
}
182

183
/*
 * Context handed to mstChkSetDbInUse() through sdbTraverse().
 * Field order matters: mstCheckDbInUse() fills it positionally.
 */
typedef struct SStmCheckDbInUseCtx {
  bool* dbStream;      /* out: set to true when a stream uses the db as trigger or calc db */
  bool* vtableStream;  /* out: set to true when a virtual-table stream is encountered */
  bool  ignoreCurrDb;  /* in: skip streams whose streamDB equals the checked db */
} SStmCheckDbInUseCtx;
188

189
/*
 * sdbTraverse() callback: inspects one stream object and records in the
 * context (p2) whether the database named by p1 is used by it.
 *
 * Returns false once the db is found as the trigger db or among the calc dbs
 * of the stream (after setting *dbStream), true in all other cases. The
 * meaning of the return value to sdbTraverse is not visible here; presumably
 * false stops the traversal.
 * pMnode and p3 are unused.
 */
static bool mstChkSetDbInUse(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SStreamObj          *pStream = pObj;
  SStmCheckDbInUseCtx *pCtx = (SStmCheckDbInUseCtx *)p2;
  const char          *dbFName = p1;

  /* streams already dropped by the user do not count */
  if (atomic_load_8(&pStream->userDropped)) {
    return true;
  }

  /* optionally skip streams that live in the checked db itself */
  if (pCtx->ignoreCurrDb && 0 == strcmp(pStream->pCreate->streamDB, dbFName)) {
    return true;
  }

  /* db is the trigger db of this stream */
  if (pStream->pCreate->triggerDB && 0 == strcmp(pStream->pCreate->triggerDB, dbFName)) {
    *pCtx->dbStream = true;
    return false;
  }

  /* db is one of the calc dbs of this stream */
  int32_t nCalcDb = taosArrayGetSize(pStream->pCreate->calcDB);
  int32_t idx = 0;
  while (idx < nCalcDb) {
    char *name = taosArrayGetP(pStream->pCreate->calcDB, idx);
    if (0 == strcmp(name, dbFName)) {
      *pCtx->dbStream = true;
      return false;
    }
    ++idx;
  }

  /* remember that at least one virtual-table stream exists */
  if (pStream->pCreate->vtableCalc ||
      STREAM_IS_VIRTUAL_TABLE(pStream->pCreate->triggerTblType, pStream->pCreate->flags)) {
    *pCtx->vtableStream = true;
  }

  return true;
}
221

222
/*
 * Check whether database dbFName is referenced by any stream.
 *
 * Traverses all SDB_STREAM objects with mstChkSetDbInUse(); results are
 * written through dbStream / vtableStream. When there are no streams at all
 * the outputs are left untouched.
 */
void mstCheckDbInUse(SMnode *pMnode, char *dbFName, bool *dbStream, bool *vtableStream, bool ignoreCurrDb) {
  int32_t nStreams = sdbGetSize(pMnode->pSdb, SDB_STREAM);

  if (nStreams > 0) {
    SStmCheckDbInUseCtx ctx = {.dbStream = dbStream, .vtableStream = vtableStream, .ignoreCurrDb = ignoreCurrDb};
    sdbTraverse(pMnode->pSdb, SDB_STREAM, mstChkSetDbInUse, dbFName, &ctx, NULL);
  }
}
231

232
static void mstShowStreamStatus(char *dst, int8_t status, int32_t bufLen) {
×
233
  if (status == STREAM_STATUS_INIT) {
×
234
    tstrncpy(dst, "init", bufLen);
×
235
  } else if (status == STREAM_STATUS_RUNNING) {
×
236
    tstrncpy(dst, "running", bufLen);
×
237
  } else if (status == STREAM_STATUS_STOPPED) {
×
238
    tstrncpy(dst, "stopped", bufLen);
×
239
  } else if (status == STREAM_STATUS_FAILED) {
×
240
    tstrncpy(dst, "failed", bufLen);
×
241
  }
242
}
×
243

244
/*
 * Check that at least one snode object exists in sdb.
 *
 * Only the first SDB_SNODE entry is fetched: if there is one it is released,
 * the fetch is cancelled and TSDB_CODE_SUCCESS is returned; otherwise
 * TSDB_CODE_SNODE_NOT_DEPLOYED is returned.
 */
int32_t mstCheckSnodeExists(SMnode *pMnode) {
  SSdb      *pSdb = pMnode->pSdb;
  SSnodeObj *pSnode = NULL;
  void      *pIter = sdbFetch(pSdb, SDB_SNODE, NULL, (void **)&pSnode);

  if (NULL == pIter) {
    return TSDB_CODE_SNODE_NOT_DEPLOYED;
  }

  sdbRelease(pSdb, pSnode);
  sdbCancelFetch(pSdb, pIter);
  return TSDB_CODE_SUCCESS;
}
262

263
/*
 * Populate a task status record from a task status message: the task
 * identity, type, flags and status are copied from pMsg, and lastUpTs is
 * stamped with the group context's current timestamp.
 */
void mstSetTaskStatusFromMsg(SStmGrpCtx* pCtx, SStmTaskStatus* pTask, SStmTaskStatusMsg* pMsg) {
  /* runtime state */
  pTask->status = pMsg->status;
  pTask->flags = pMsg->flags;
  pTask->type = pMsg->type;
  pTask->lastUpTs = pCtx->currTs;

  /* identity */
  pTask->id.taskIdx = pMsg->taskIdx;
  pTask->id.nodeId = pMsg->nodeId;
  pTask->id.seriousId = pMsg->seriousId;
  pTask->id.deployId = pMsg->deployId;
  pTask->id.taskId = pMsg->taskId;
}
275

276
/*
 * Pop the next node from the action queue.
 *
 * Returns false (leaving *param untouched) when qRemainNum is 0. Otherwise
 * the current head node is freed, the node after it becomes the new head and
 * is handed back through *param, and qRemainNum is decremented.
 * No latch is taken here; presumably there is a single consumer - verify
 * against the callers.
 */
bool mndStreamActionDequeue(SStmActionQ* pQueue, SStmQNode **param) {
  if (0 == atomic_load_64(&pQueue->qRemainNum)) {
    return false;
  }

  SStmQNode *pOldHead = pQueue->head;
  SStmQNode *pNext = pOldHead->next;

  pQueue->head = pNext;
  *param = pNext;

  taosMemoryFreeClear(pOldHead);

  (void)atomic_sub_fetch_64(&pQueue->qRemainNum, 1);

  return true;
}
294

295
/*
 * Append a node to the tail of the action queue.
 * The tail link is updated under the queue's write latch; the remaining-node
 * counter is bumped afterwards, outside the latch.
 */
void mndStreamActionEnqueue(SStmActionQ* pQueue, SStmQNode* param) {
  SStmQNode* pNode = param;

  taosWLockLatch(&pQueue->lock);
  pQueue->tail->next = pNode;
  pQueue->tail = pNode;
  taosWUnLockLatch(&pQueue->lock);

  (void)atomic_add_fetch_64(&pQueue->qRemainNum, 1);
}
303

304
char* mstGetStreamActionString(int32_t action) {
164,551✔
305
  switch (action) {
164,551✔
306
    case STREAM_ACT_DEPLOY:
164,551✔
307
      return "DEPLOY";
164,551✔
308
    case STREAM_ACT_UNDEPLOY:
×
309
      return "UNDEPLOY";
×
310
    case STREAM_ACT_START:
×
311
      return "START";
×
312
    case STREAM_ACT_UPDATE_TRIGGER:
×
313
      return "UPDATE TRIGGER";
×
314
    case STREAM_ACT_RECALC:
×
315
      return "USER RECALC";
×
316
    default:
×
317
      break;
×
318
  }
319

320
  return "UNKNOWN";
×
321
}
322

323
/*
 * Queue a stream-level action.
 *
 * actionQ    - queue to append to
 * streamId   - id of the stream (also used by the mstsXxx log macros)
 * streamName - stream name, copied into the queue node
 * param      - action parameter; stored in the node, or freed here when the
 *              node cannot be allocated
 * userAction - whether the action was requested by a user
 * action     - STREAM_ACT_* value
 *
 * On allocation failure the action is dropped after logging an error.
 */
void mstPostStreamAction(SStmActionQ*       actionQ, int64_t streamId, char* streamName, void* param, bool userAction, int32_t action) {
  SStmQNode *pNode = taosMemoryMalloc(sizeof(SStmQNode));
  if (NULL == pNode) {
    taosMemoryFreeClear(param);
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, __LINE__, tstrerror(terrno));
    return;
  }

  pNode->type = action;
  pNode->streamAct = true;
  pNode->action.stream.streamId = streamId;
  // Bounded copy: the name lives in a buffer embedded in the node, so never write past it.
  // NOTE(review): relies on streamName being a char array member, as the original unbounded copy implies.
  tstrncpy(pNode->action.stream.streamName, streamName, sizeof(pNode->action.stream.streamName));
  pNode->action.stream.userAction = userAction;
  pNode->action.stream.actionParam = param;

  pNode->next = NULL;

  mndStreamActionEnqueue(actionQ, pNode);

  mstsDebug("stream action %s posted enqueue", mstGetStreamActionString(action));
}
344

345
/*
 * Queue a task-level action. The content of *pAction is copied into the new
 * queue node. On allocation failure the action is dropped after logging an
 * error.
 */
void mstPostTaskAction(SStmActionQ*        actionQ, SStmTaskAction* pAction, int32_t action) {
  SStmQNode *pQNode = taosMemoryMalloc(sizeof(SStmQNode));

  if (NULL != pQNode) {
    pQNode->next = NULL;
    pQNode->streamAct = false;
    pQNode->type = action;
    pQNode->action.task = *pAction;

    mndStreamActionEnqueue(actionQ, pQNode);
    return;
  }

  /* streamId is picked up by the mstsError macro */
  int64_t streamId = pAction->streamId;
  mstsError("%s failed at line %d, error:%s", __FUNCTION__, __LINE__, tstrerror(terrno));
}
361

362
/*
 * Destroy a db -> vgroups hash: releases the vgroup array of every
 * SDBVgHashInfo entry, then the hash itself.
 */
void mstDestroyDbVgroupsHash(SSHashObj *pDbVgs) {
  int32_t iter = 0;

  for (void* pEntry = tSimpleHashIterate(pDbVgs, NULL, &iter); NULL != pEntry;
       pEntry = tSimpleHashIterate(pDbVgs, pEntry, &iter)) {
    SDBVgHashInfo* pDbInfo = (SDBVgHashInfo*)pEntry;
    taosArrayDestroy(pDbInfo->vgArray);
  }

  tSimpleHashCleanup(pDbVgs);
}
373

374

375
/*
 * Build a hash that maps every db name (key includes the trailing '\0') to an
 * SDBVgHashInfo holding the db's hash settings and the list of its vgroups
 * (vgId plus hash range).
 *
 * pMnode - mnode whose sdb is scanned (SDB_VGROUP objects)
 * ppRes  - out: the new hash; written only on success. Release it with
 *          mstDestroyDbVgroupsHash().
 *
 * Returns TSDB_CODE_SUCCESS or an error code. On failure everything built so
 * far is released here, because the caller never receives the hash.
 */
int32_t mstBuildDBVgroupsMap(SMnode* pMnode, SSHashObj** ppRes) {
  void*   pIter = NULL;
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SArray* pTarget = NULL;
  SArray* pNew = NULL;  // array created for a db that is not in the hash yet; owned here until the put succeeds
  SDbObj* pDb = NULL;
  SDBVgHashInfo dbInfo = {0}, *pDbInfo = NULL;
  SVgObj* pVgroup = NULL;

  SSHashObj* pDbVgroup = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
  TSDB_CHECK_NULL(pDbVgroup, code, lino, _exit, terrno);

  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    pDbInfo = (SDBVgHashInfo*)tSimpleHashGet(pDbVgroup, pVgroup->dbName, strlen(pVgroup->dbName) + 1);
    if (NULL == pDbInfo) {
      // first vgroup seen for this db: prepare a new entry
      pNew = taosArrayInit(20, sizeof(SVGroupHashInfo));
      if (NULL == pNew) {
        sdbRelease(pMnode->pSdb, pVgroup);
        sdbCancelFetch(pMnode->pSdb, pIter);
        pVgroup = NULL;
        TSDB_CHECK_NULL(pNew, code, lino, _exit, terrno);
      }

      pDb = mndAcquireDb(pMnode, pVgroup->dbName);
      if (NULL == pDb) {
        sdbRelease(pMnode->pSdb, pVgroup);
        sdbCancelFetch(pMnode->pSdb, pIter);
        pVgroup = NULL;
        TSDB_CHECK_NULL(pDb, code, lino, _exit, terrno);
      }
      dbInfo.vgSorted = false;
      dbInfo.hashMethod = pDb->cfg.hashMethod;
      dbInfo.hashPrefix = pDb->cfg.hashPrefix;
      dbInfo.hashSuffix = pDb->cfg.hashSuffix;
      dbInfo.vgArray = pNew;

      mndReleaseDb(pMnode, pDb);

      pTarget = pNew;
    } else {
      pTarget = pDbInfo->vgArray;
    }

    SVGroupHashInfo vgInfo = {.vgId = pVgroup->vgId, .hashBegin = pVgroup->hashBegin, .hashEnd = pVgroup->hashEnd};
    if (NULL == taosArrayPush(pTarget, &vgInfo)) {
      sdbRelease(pMnode->pSdb, pVgroup);
      sdbCancelFetch(pMnode->pSdb, pIter);
      pVgroup = NULL;
      TSDB_CHECK_NULL(NULL, code, lino, _exit, terrno);
    }

    if (NULL == pDbInfo) {
      code = tSimpleHashPut(pDbVgroup, pVgroup->dbName, strlen(pVgroup->dbName) + 1, &dbInfo, sizeof(dbInfo));
      if (code) {
        sdbRelease(pMnode->pSdb, pVgroup);
        sdbCancelFetch(pMnode->pSdb, pIter);
        pVgroup = NULL;
        TAOS_CHECK_EXIT(code);
      }
      // the hash entry owns the array from now on
      pNew = NULL;
    }

    sdbRelease(pMnode->pSdb, pVgroup);
    pVgroup = NULL;
  }

  *ppRes = pDbVgroup;

_exit:

  // only non-NULL when an error happened before the array was stored in the hash
  taosArrayDestroy(pNew);

  if (code) {
    // the hash is never handed to the caller on failure, so release it (and the arrays it owns) here
    if (NULL != pDbVgroup) {
      mstDestroyDbVgroupsHash(pDbVgroup);
    }
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
459

460
int mstDbVgInfoComp(const void* lp, const void* rp) {
121,934✔
461
  SVGroupHashInfo* pLeft = (SVGroupHashInfo*)lp;
121,934✔
462
  SVGroupHashInfo* pRight = (SVGroupHashInfo*)rp;
121,934✔
463
  if (pLeft->hashBegin < pRight->hashBegin) {
121,934✔
464
    return -1;
121,934✔
465
  } else if (pLeft->hashBegin > pRight->hashBegin) {
×
466
    return 1;
×
467
  }
468

469
  return 0;
×
470
}
471

472
int32_t mstTableHashValueComp(void const* lp, void const* rp) {
287,009✔
473
  uint32_t*    key = (uint32_t*)lp;
287,009✔
474
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
287,009✔
475

476
  if (*key < pVg->hashBegin) {
287,009✔
477
    return -1;
×
478
  } else if (*key > pVg->hashEnd) {
287,009✔
479
    return 1;
75,180✔
480
  }
481

482
  return 0;
211,829✔
483
}
484

485

486
/*
 * Resolve the vgroup id that owns table `tbName` of database `dbFName`.
 *
 * pDbVgroups - db -> SDBVgHashInfo hash
 * dbFName    - database name used as hash key (key includes the trailing '\0')
 * tbName     - table name; "<dbFName>.<tbName>" is hashed
 * vgId       - out: id of the matching vgroup, written only on success
 *
 * The db's vgroup array is sorted by hashBegin on first use (tracked by
 * vgSorted) and then searched for the range that contains the hash value.
 *
 * Returns 0 on success, TSDB_CODE_MND_DB_NOT_EXIST when the db is not in the
 * hash, or TSDB_CODE_MND_STREAM_INTERNAL_ERROR when no range matches.
 */
int32_t mstGetTableVgId(SSHashObj* pDbVgroups, char* dbFName, char *tbName, int32_t* vgId) {
  int32_t code = 0;
  int32_t lino = 0;
  // vgArray elements are SVGroupHashInfo, so read the search result through that type
  SVGroupHashInfo* vgInfo = NULL;
  char             tbFullName[TSDB_TABLE_FNAME_LEN];

  SDBVgHashInfo* dbInfo = (SDBVgHashInfo*)tSimpleHashGet(pDbVgroups, dbFName, strlen(dbFName) + 1);
  if (NULL == dbInfo) {
    mstError("db %s does not exist", dbFName);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_DB_NOT_EXIST);
  }

  (void)snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbFName, tbName);
  uint32_t hashValue = taosGetTbHashVal(tbFullName, (uint32_t)strlen(tbFullName), dbInfo->hashMethod,
                                        dbInfo->hashPrefix, dbInfo->hashSuffix);

  // the range search below needs the vgroups ordered by hashBegin; sort once per db
  if (!dbInfo->vgSorted) {
    taosArraySort(dbInfo->vgArray, mstDbVgInfoComp);
    dbInfo->vgSorted = true;
  }

  vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, mstTableHashValueComp, TD_EQ);
  if (NULL == vgInfo) {
    mstError("no hash range found for hash value [%u], dbFName:%s, numOfVgId:%d", hashValue, dbFName,
             (int32_t)taosArrayGetSize(dbInfo->vgArray));
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  *vgId = vgInfo->vgId;

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
524

525

526
/*
 * Dump a stream object and its create request to the debug log.
 *
 * Does nothing unless DEBUG_DEBUG is set in stDebugFlag. Logs, in order: the
 * object header, the flat create-request fields, the trigger options for the
 * request's triggerType, the plan/expression strings, and then the calc dbs,
 * calc scan plans (with their vgroups), notify urls and output columns.
 * `tips` is a prefix for the first log lines.
 */
void mstLogSStreamObj(char* tips, SStreamObj* p) {
  if (0 == (stDebugFlag & DEBUG_DEBUG)) {
    return;
  }

  if (NULL == p) {
    mstDebug("%s: stream is NULL", tips);
    return;
  }

  mstDebug("%s: stream obj", tips);
  mstDebug("name:%s mainSnodeId:%d userDropped:%d userStopped:%d createTime:%" PRId64 " updateTime:%" PRId64,
      p->name, p->mainSnodeId, p->userDropped, p->userStopped, p->createTime, p->updateTime);

  SCMCreateStreamReq* pReq = p->pCreate;
  if (NULL == pReq) {
    mstDebug("stream pCreate is NULL");
    return;
  }

  /* streamId is referenced implicitly by the mstsXxx log macros */
  int64_t streamId = pReq->streamId;
  int32_t nCalcDb = taosArrayGetSize(pReq->calcDB);
  int32_t nCalcScan = taosArrayGetSize(pReq->calcScanPlanList);
  int32_t nNotifyUrl = taosArrayGetSize(pReq->pNotifyAddrUrls);
  int32_t nOutCol = taosArrayGetSize(pReq->outCols);
  int32_t nOutTag = taosArrayGetSize(pReq->outTags);
  int32_t nForceOutCol = taosArrayGetSize(pReq->forceOutCols);

  mstsDebugL("create_info: name:%s sql:%s streamDB:%s triggerDB:%s outDB:%s calcDBNum:%d triggerTblName:%s outTblName:%s "
      "igExists:%d triggerType:%d igDisorder:%d deleteReCalc:%d deleteOutTbl:%d fillHistory:%d fillHistroyFirst:%d "
      "calcNotifyOnly:%d lowLatencyCalc:%d igNoDataTrigger:%d notifyUrlNum:%d notifyEventTypes:%d addOptions:%d notifyHistory:%d "
      "outColsNum:%d outTagsNum:%d maxDelay:%" PRId64 " fillHistoryStartTs:%" PRId64 " watermark:%" PRId64 " expiredTime:%" PRId64 " "
      "triggerTblType:%d triggerTblUid:%" PRIx64 " triggerTblSuid:%" PRIx64 " vtableCalc:%d outTblType:%d outStbExists:%d outStbUid:%" PRIu64 " outStbSversion:%d "
      "eventTypes:0x%" PRIx64 " flags:0x%" PRIx64 " tsmaId:0x%" PRIx64 " placeHolderBitmap:0x%" PRIx64 " calcTsSlotId:%d triTsSlotId:%d "
      "triggerTblVgId:%d outTblVgId:%d calcScanPlanNum:%d forceOutCols:%d",
      pReq->name, pReq->sql, pReq->streamDB, pReq->triggerDB, pReq->outDB, nCalcDb, pReq->triggerTblName, pReq->outTblName,
      pReq->igExists, pReq->triggerType, pReq->igDisorder, pReq->deleteReCalc, pReq->deleteOutTbl, pReq->fillHistory, pReq->fillHistoryFirst,
      pReq->calcNotifyOnly, pReq->lowLatencyCalc, pReq->igNoDataTrigger, nNotifyUrl, pReq->notifyEventTypes, pReq->addOptions, pReq->notifyHistory,
      nOutCol, nOutTag, pReq->maxDelay, pReq->fillHistoryStartTime, pReq->watermark, pReq->expiredTime,
      pReq->triggerTblType, pReq->triggerTblUid, pReq->triggerTblSuid, pReq->vtableCalc, pReq->outTblType, pReq->outStbExists, pReq->outStbUid, pReq->outStbSversion,
      pReq->eventTypes, pReq->flags, pReq->tsmaId, pReq->placeHolderBitmap, pReq->calcTsSlotId, pReq->triTsSlotId,
      pReq->triggerTblVgId, pReq->outTblVgId, nCalcScan, nForceOutCol);

  /* trigger options, depending on the window type */
  switch (pReq->triggerType) {
    case WINDOW_TYPE_INTERVAL: {
      SSlidingTrigger* pSliding = &pReq->trigger.sliding;
      mstsDebug("sliding trigger options, intervalUnit:%d, slidingUnit:%d, offsetUnit:%d, soffsetUnit:%d, precision:%d, interval:%" PRId64 ", offset:%" PRId64 ", sliding:%" PRId64 ", soffset:%" PRId64, 
          pSliding->intervalUnit, pSliding->slidingUnit, pSliding->offsetUnit, pSliding->soffsetUnit, pSliding->precision,
          pSliding->interval, pSliding->offset, pSliding->sliding, pSliding->soffset);
      break;
    }
    case WINDOW_TYPE_SESSION: {
      SSessionTrigger* pSession = &pReq->trigger.session;
      mstsDebug("session trigger options, slotId:%d, sessionVal:%" PRId64, pSession->slotId, pSession->sessionVal);
      break;
    }
    case WINDOW_TYPE_STATE: {
      SStateWinTrigger* pState = &pReq->trigger.stateWin;
      mstsDebug("state trigger options, slotId:%d, expr:%s, extend:%d, zeroth:%s, trueForDuration:%" PRId64,
          pState->slotId, (char *)pState->expr, pState->extend, (char *)pState->zeroth, pState->trueForDuration);
      break;
    }
    case WINDOW_TYPE_EVENT: {
      SEventTrigger* pEvent = &pReq->trigger.event;
      mstsDebug("event trigger options, startCond:%s, endCond:%s, trueForDuration:%" PRId64,
          (char*)pEvent->startCond, (char*)pEvent->endCond, pEvent->trueForDuration);
      break;
    }
    case WINDOW_TYPE_COUNT: {
      SCountTrigger* pCount = &pReq->trigger.count;
      mstsDebug("count trigger options, countVal:%" PRId64 ", sliding:%" PRId64 ", condCols:%s",
          pCount->countVal, pCount->sliding, (char*)pCount->condCols);
      break;
    }
    case WINDOW_TYPE_PERIOD: {
      SPeriodTrigger* pPeriod = &pReq->trigger.period;
      mstsDebug("period trigger options, periodUnit:%d, offsetUnit:%d, precision:%d, period:%" PRId64 ", offset:%" PRId64, 
          pPeriod->periodUnit, pPeriod->offsetUnit, pPeriod->precision, pPeriod->period, pPeriod->offset);
      break;
    }
    default:
      mstsDebug("unknown triggerType:%d", pReq->triggerType);
      break;
  }

  /* plan / expression strings */
  mstsDebugL("create_info: triggerCols:[%s]", (char*)pReq->triggerCols);
  mstsDebugL("create_info: partitionCols:[%s]", (char*)pReq->partitionCols);
  mstsDebugL("create_info: triggerScanPlan:[%s]", (char*)pReq->triggerScanPlan);
  mstsDebugL("create_info: calcPlan:[%s]", (char*)pReq->calcPlan);
  mstsDebugL("create_info: subTblNameExpr:[%s]", (char*)pReq->subTblNameExpr);
  mstsDebugL("create_info: tagValueExpr:[%s]", (char*)pReq->tagValueExpr);

  /* calc databases */
  for (int32_t dbIdx = 0; dbIdx < nCalcDb; ++dbIdx) {
    char* dbName = taosArrayGetP(pReq->calcDB, dbIdx);
    mstsDebug("create_info: calcDB[%d] - %s", dbIdx, dbName);
  }

  /* calc scan plans and the vgroups each one reads from */
  for (int32_t scanIdx = 0; scanIdx < nCalcScan; ++scanIdx) {
    SStreamCalcScan* pScan = taosArrayGet(pReq->calcScanPlanList, scanIdx);
    int32_t          nVg = taosArrayGetSize(pScan->vgList);
    mstsDebugL("create_info: calcScanPlan[%d] - readFromCache:%d vgNum:%d scanPlan:[%s]", scanIdx, pScan->readFromCache, nVg, (char*)pScan->scanPlan);
    for (int32_t vgIdx = 0; vgIdx < nVg; ++vgIdx) {
      mstsDebug("create_info: calcScanPlan[%d] vg[%d] - vgId:%d", scanIdx, vgIdx, *(int32_t*)taosArrayGet(pScan->vgList, vgIdx));
    }
  }

  /* notification urls */
  for (int32_t urlIdx = 0; urlIdx < nNotifyUrl; ++urlIdx) {
    char* url = taosArrayGetP(pReq->pNotifyAddrUrls, urlIdx);
    mstsDebug("create_info: notifyUrl[%d] - %s", urlIdx, url);
  }

  /* output columns */
  for (int32_t colIdx = 0; colIdx < nOutCol; ++colIdx) {
    SFieldWithOptions* pCol = taosArrayGet(pReq->outCols, colIdx);
    mstsDebug("create_info: outCol[%d] - name:%s type:%d flags:%d bytes:%d compress:%u typeMod:%d", 
        colIdx, pCol->name, pCol->type, pCol->flags, pCol->bytes, pCol->compress, pCol->typeMod);
  }
}
646

647
/*
 * Log one task status record at debug level.
 * `name` and `idx` label the record; streamId is consumed by the mstsDebug
 * macro. The task type and status are printed through the
 * gStreamTaskTypeStr / gStreamStatusStr tables.
 */
void mstLogSStmTaskStatus(char* name, int64_t streamId, SStmTaskStatus* pTask, int32_t idx) {
  mstsDebug("%s[%d]: task %" PRIx64 " deployId:%d SID:%" PRId64 " nodeId:%d tidx:%d type:%s flags:%" PRIx64 " status:%s lastUpTs:%" PRId64, 
      name,
      idx,
      pTask->id.taskId,
      pTask->id.deployId,
      pTask->id.seriousId,
      pTask->id.nodeId,
      pTask->id.taskIdx,
      gStreamTaskTypeStr[pTask->type],
      pTask->flags,
      gStreamStatusStr[pTask->status],
      pTask->lastUpTs);
}
652

653
/*
 * Dump the runtime status of a stream to the debug log.
 *
 * Does nothing unless DEBUG_DEBUG is set in stDebugFlag. Logs a summary line
 * followed by one line per task: trigger readers, trigger O-readers, calc
 * readers, the trigger task and the runner tasks of every deploy slot.
 *
 * tips     - prefix for the first log lines
 * streamId - stream id, consumed by the mstsDebug macro
 * p        - status to dump; NULL is reported and ignored
 */
void mstLogSStmStatus(char* tips, int64_t streamId, SStmStatus* p) {
  if (!(stDebugFlag & DEBUG_DEBUG)) {
    return;
  }

  if (NULL == p) {
    mstsDebug("%s: stream status is NULL", tips);
    return;
  }

  int32_t trigReaderNum = taosArrayGetSize(p->trigReaders);
  int32_t trigOReaderNum = taosArrayGetSize(p->trigOReaders);
  int32_t calcReaderNum = MST_LIST_SIZE(p->calcReaders);
  int32_t triggerNum = p->triggerTask ? 1 : 0;
  int32_t runnerNum = 0;

  for (int32_t i = 0; i < p->runnerDeploys; ++i) {
    runnerNum += taosArrayGetSize(p->runners[i]);
  }

  mstsDebug("%s: stream status", tips);
  mstsDebug("name:%s runnerNum:%d runnerDeploys:%d runnerReplica:%d lastActionTs:%" PRId64
           " trigReaders:%d trigOReaders:%d calcReaders:%d trigger:%d runners:%d",
      p->streamName, p->runnerNum, p->runnerDeploys, p->runnerReplica, p->lastActionTs,
      trigReaderNum, trigOReaderNum, calcReaderNum, triggerNum, runnerNum);

  SStmTaskStatus* pTask = NULL;
  for (int32_t i = 0; i < trigReaderNum; ++i) {
    pTask = taosArrayGet(p->trigReaders, i);
    mstLogSStmTaskStatus("trigReader task", streamId, pTask, i);
  }

  for (int32_t i = 0; i < trigOReaderNum; ++i) {
    pTask = taosArrayGet(p->trigOReaders, i);
    mstLogSStmTaskStatus("trigOReader task", streamId, pTask, i);
  }

  // Only touch the list head when there is something to walk: calcReaders can
  // be reset to whatever tdListFree() returns (presumably NULL - confirm), and
  // reading the head of such a list must be avoided.
  if (calcReaderNum > 0) {
    SListNode* pNode = listHead(p->calcReaders);
    for (int32_t i = 0; i < calcReaderNum; ++i) {
      pTask = (SStmTaskStatus*)pNode->data;
      mstLogSStmTaskStatus("calcReader task", streamId, pTask, i);
      pNode = TD_DLIST_NODE_NEXT(pNode);
    }
  }

  if (triggerNum > 0) {
    mstLogSStmTaskStatus("trigger task", streamId, p->triggerTask, 0);
  }

  for (int32_t i = 0; i < p->runnerDeploys; ++i) {
    int32_t num = taosArrayGetSize(p->runners[i]);
    if (num <= 0) {
      continue;
    }

    mstsDebug("the %dth deploy runners status", i);
    for (int32_t m = 0; m < num; ++m) {
      pTask = taosArrayGet(p->runners[i], m);
      mstLogSStmTaskStatus("runner task", streamId, pTask, m);
    }
  }
}
715

716
bool mstEventPassIsolation(int32_t num, int32_t event) {
4,431,935✔
717
  bool ret = ((mStreamMgmt.lastTs[event].ts + num * MST_SHORT_ISOLATION_DURATION) <= mStreamMgmt.hCtx.currentTs);
4,431,935✔
718
  if (ret) {
4,431,935✔
719
    mstDebug("event %s passed %d isolation, last:%" PRId64 ", curr:%" PRId64, 
3,299,551✔
720
        gMndStreamEvent[event], num, mStreamMgmt.lastTs[event].ts, mStreamMgmt.hCtx.currentTs);
721
  }
722

723
  return ret;
4,431,935✔
724
}
725

726
bool mstEventHandledChkSet(int32_t event) {
3,299,551✔
727
  if (0 == atomic_val_compare_exchange_8((int8_t*)&mStreamMgmt.lastTs[event].handled, 0, 1)) {
3,299,551✔
728
    mstDebug("event %s set handled", gMndStreamEvent[event]);
193,437✔
729
    return true;
193,437✔
730
  }
731
  return false;
3,106,114✔
732
}
733

734
/*
 * Produce the status and message varstrings displayed for a stream.
 *
 * Checks are made in this order:
 *   1. stream management not active / not in normal state -> UNDEPLOYED + hint message
 *   2. pStream->userDropped set                           -> DROPPING, empty message
 *   3. pStream->userStopped set                           -> STOPPED, empty message
 *   4. (under the runtime read lock) stream not in streamMap -> UNDEPLOYED, empty message
 *   5. runtime `stopped` flag is 1 -> FAILED with last fatal error and retry count
 *   6. runtime `stopped` flag is 4 -> FAILED with the grant-expired error text
 *   7. trigger task exists and is RUNNING -> RUNNING with its start time
 *   8. otherwise -> INIT with the current deploy count
 *
 * @param pStream     stream to describe
 * @param status      out: varstr buffer for the status text
 * @param statusSize  size of `status` in bytes, including the varstr header
 * @param msg         out: varstr buffer for the message text
 * @param msgSize     size of `msg` in bytes, including the varstr header
 * @return always TSDB_CODE_SUCCESS
 */
int32_t mstGetStreamStatusStr(SStreamObj* pStream, char* status, int32_t statusSize, char* msg, int32_t msgSize) {
  char   detail[256];
  int8_t mgmtActive = atomic_load_8(&mStreamMgmt.active);
  int8_t mgmtState = atomic_load_8(&mStreamMgmt.state);

  if (0 == mgmtActive || MND_STM_STATE_NORMAL != mgmtState) {
    mstDebug("mnode streamMgmt not in active mode, active:%d, state:%d", mgmtActive, mgmtState);
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_UNDEPLOYED], statusSize);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "Mnode may be unstable, try again later", msgSize);
    return TSDB_CODE_SUCCESS;
  }

  // User-requested drop takes precedence over user-requested stop.
  if (atomic_load_8(&pStream->userDropped)) {
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_DROPPING], statusSize);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
    return TSDB_CODE_SUCCESS;
  }

  if (atomic_load_8(&pStream->userStopped)) {
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_STOPPED], statusSize);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
    return TSDB_CODE_SUCCESS;
  }

  // Everything below reads runtime state and is done under the runtime read lock.
  (void)mstWaitLock(&mStreamMgmt.runtimeLock, true);

  SStmStatus* pRuntime = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &pStream->pCreate->streamId, sizeof(pStream->pCreate->streamId));
  if (NULL == pRuntime) {
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_UNDEPLOYED], statusSize);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
  } else {
    int8_t stopFlag = atomic_load_8(&pRuntime->stopped);

    if (1 == stopFlag) {
      STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_FAILED], statusSize);
      snprintf(detail, sizeof(detail), "Last error: %s, Failed times: %" PRId64, tstrerror(pRuntime->fatalError), pRuntime->fatalRetryTimes);
      STR_WITH_MAXSIZE_TO_VARSTR(msg, detail, msgSize);
    } else if (4 == stopFlag) {
      STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_FAILED], statusSize);
      snprintf(detail, sizeof(detail), "Error: %s", tstrerror(TSDB_CODE_GRANT_STREAM_EXPIRED));
      STR_WITH_MAXSIZE_TO_VARSTR(msg, detail, msgSize);
    } else if (pRuntime->triggerTask && STREAM_STATUS_RUNNING == pRuntime->triggerTask->status) {
      STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_RUNNING], statusSize);
      // The formatted timestamp is appended right after the fixed prefix.
      strcpy(detail, "Running start from: ");
      (void)formatTimestampLocal(detail + strlen(detail), pRuntime->triggerTask->runningStartTs, TSDB_TIME_PRECISION_MILLI);
      STR_WITH_MAXSIZE_TO_VARSTR(msg, detail, msgSize);
    } else {
      STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_INIT], statusSize);
      snprintf(detail, sizeof(detail), "Current deploy times: %" PRId64, pRuntime->deployTimes);
      STR_WITH_MAXSIZE_TO_VARSTR(msg, detail, msgSize);
    }
  }

  taosRUnLockLatch(&mStreamMgmt.runtimeLock);

  return TSDB_CODE_SUCCESS;
}
802

803
/*
 * Fill one row of the stream attribute result block for pStream.
 *
 * Columns are written in this fixed order: stream name, db name, create time,
 * stream id (hex string), sql, status, main snode id, replica snode id, msg.
 *
 * @param pMnode     mnode handle, used to look up the main snode and read its replicaId
 * @param pStream    stream object to describe
 * @param pBlock     result block; columns are taken from pBlock->pDataBlock by index
 * @param numOfRows  row index to write
 * @return 0 on success; otherwise the first error from a column lookup (terrno),
 *         a column write, or mstGetStreamStatusStr
 */
int32_t mstSetStreamAttrResBlock(SMnode *pMnode, SStreamObj* pStream, SSDataBlock* pBlock, int32_t numOfRows) {
  int32_t code = 0;
  int32_t cols = 0;
  int32_t lino = 0;

  // stream_name
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetStableStr(pStream->name), sizeof(streamName));
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamName, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // db_name
  char streamDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamDB, mndGetDbStr(pStream->pCreate->streamDB), sizeof(streamDB));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamDB, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // create time
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pStream->createTime, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stream id, rendered as a hex string
  char streamId2[19] = {0};
  char streamId[19 + VARSTR_HEADER_SIZE] = {0};
  snprintf(streamId2, sizeof(streamId2), "%" PRIx64, pStream->pCreate->streamId);
  STR_WITH_MAXSIZE_TO_VARSTR(streamId, streamId2, sizeof(streamId));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // sql
  char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(sql, pStream->pCreate->sql, sizeof(sql));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)sql, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // status (the msg text is produced here too and written as the last column)
  char status[20 + VARSTR_HEADER_SIZE] = {0};
  char msg[TSDB_RESERVE_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
  code = mstGetStreamStatusStr(pStream, status, sizeof(status), msg, sizeof(msg));
  TSDB_CHECK_CODE(code, lino, _end);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)status, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // snodeLeader
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pStream->mainSnodeId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // snodeReplica: -1 when the main snode cannot be acquired
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  SSnodeObj* pSnode = mndAcquireSnode(pMnode, pStream->mainSnodeId);
  int32_t replicaSnodeId = pSnode ? pSnode->replicaId : -1;
  mndReleaseSnode(pMnode, pSnode);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&replicaSnodeId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // msg
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)msg, false);
  // Checked like every other column so a failure here records its line number.
  TSDB_CHECK_CODE(code, lino, _end);

_end:
  if (code) {
    mError("error happens when build stream attr result block, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
885

886

887
/*
 * Produce the status and message varstrings displayed for a single stream task.
 *
 * `status` is always set from gStreamStatusStr[pTask->status]. `msg` is:
 *   - "Last error: ..." when the task is FAILED and has a non-zero errCode;
 *   - a runtime summary (session counts, history progress, recalc counts) when
 *     the task is a trigger task with a non-NULL detailStatus;
 *   - an empty string otherwise.
 *
 * @param pTask       task to describe
 * @param status      out: varstr buffer for the status text
 * @param statusSize  size of `status` in bytes, including the varstr header
 * @param msg         out: varstr buffer for the message text
 * @param msgSize     size of `msg` in bytes, including the varstr header
 * @return always TSDB_CODE_SUCCESS
 */
int32_t mstGetTaskStatusStr(SStmTaskStatus* pTask, char* status, int32_t statusSize, char* msg, int32_t msgSize) {
  char tmpBuf[256];

  STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[pTask->status], statusSize);
  if (STREAM_STATUS_FAILED == pTask->status && pTask->errCode) {
    snprintf(tmpBuf, sizeof(tmpBuf), "Last error: %s", tstrerror(pTask->errCode));
    STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
    return TSDB_CODE_SUCCESS;
  }

  if (STREAM_TRIGGER_TASK == pTask->type && mstWaitLock(&pTask->detailStatusLock, true)) {
    if (pTask->detailStatus) {
      SSTriggerRuntimeStatus* pTrigger = (SSTriggerRuntimeStatus*)pTask->detailStatus;
      // Format while holding the read lock; tmpBuf is local, so the copy can happen after unlock.
      snprintf(tmpBuf, sizeof(tmpBuf), "Current RT/HI/RE session num: %d/%d/%d, histroy progress:%d%%, total AUTO/USER recalc num: %d/%d", 
          pTrigger->realtimeSessionNum, pTrigger->historySessionNum, pTrigger->recalcSessionNum, pTrigger->histroyProgress,
          pTrigger->autoRecalcNum, (int32_t)taosArrayGetSize(pTrigger->userRecalcs));
      taosRUnLockLatch(&pTask->detailStatusLock);

      // Previously the summary was built but never stored, leaving msg untouched.
      STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
      return TSDB_CODE_SUCCESS;
    }

    taosRUnLockLatch(&pTask->detailStatusLock);
  }

  STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);

  return TSDB_CODE_SUCCESS;
}
914

915
/*
 * Write a short role label for a task into `extraStr` as a varstr.
 *
 * Reader tasks yield "trigReader" or "calcReader" depending on
 * STREAM_IS_TRIGGER_READER(flags); runner tasks flagged by STREAM_IS_TOP_RUNNER
 * yield "topRunner"; every other task yields an empty string.
 *
 * @param pTask      task whose type and flags are inspected
 * @param extraStr   out: varstr buffer for the label
 * @param extraSize  size of `extraStr` in bytes, including the varstr header
 * @return always TSDB_CODE_SUCCESS
 */
int32_t mstGetTaskExtraStr(SStmTaskStatus* pTask, char* extraStr, int32_t extraSize) {
  const char* label = "";

  if (STREAM_READER_TASK == pTask->type) {
    label = STREAM_IS_TRIGGER_READER(pTask->flags) ? "trigReader" : "calcReader";
  } else if (STREAM_RUNNER_TASK == pTask->type && STREAM_IS_TOP_RUNNER(pTask->flags)) {
    label = "topRunner";
  }

  STR_WITH_MAXSIZE_TO_VARSTR(extraStr, label, extraSize);
  return TSDB_CODE_SUCCESS;
}
937

938

939
/*
 * Fill one row of the stream task result block for pTask.
 *
 * Columns are written in this fixed order: stream name, stream id, task id,
 * type, serious id, deploy id, node type, node id, task idx, status,
 * start time, last update, extra info, msg. The three ids rendered as text
 * (stream id, task id, serious id) are hex strings.
 *
 * @param pStream    stream owning the task (supplies name and stream id)
 * @param pTask      task to describe
 * @param pBlock     result block; columns are taken from pBlock->pDataBlock by index
 * @param numOfRows  row index to write
 * @return 0 on success; otherwise the first error from a column lookup (terrno),
 *         a column write, or one of the string helpers
 */
int32_t mstSetStreamTaskResBlock(SStreamObj* pStream, SStmTaskStatus* pTask, SSDataBlock* pBlock, int32_t numOfRows) {
  int32_t code = 0;
  int32_t cols = 0;
  int32_t lino = 0;

  // stream_name
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetStableStr(pStream->name), sizeof(streamName));
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamName, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stream id (idstr is reused for task id and serious id below)
  char idstr[19 + VARSTR_HEADER_SIZE] = {0};
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pStream->pCreate->streamId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // task id
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pTask->id.taskId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // type
  char type[20 + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(type, (STREAM_READER_TASK == pTask->type) ? "Reader" : ((STREAM_TRIGGER_TASK == pTask->type) ? "Trigger" : "Runner"), sizeof(type));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)type, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // serious id
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pTask->id.seriousId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // deploy id
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->id.deployId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // node_type: reader tasks are reported as "vnode", all others as "snode"
  char nodeType[10 + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(nodeType, (STREAM_READER_TASK == pTask->type) ? "vnode" : "snode", sizeof(nodeType));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)nodeType, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // node id
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->id.nodeId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // task idx
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->id.taskIdx, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // status (the msg text is produced here too and written as the last column)
  char status[20 + VARSTR_HEADER_SIZE] = {0};
  char msg[TSDB_RESERVE_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
  code = mstGetTaskStatusStr(pTask, status, sizeof(status), msg, sizeof(msg));
  TSDB_CHECK_CODE(code, lino, _end);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)status, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // start time: written as NULL while runningStartTs is 0
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->runningStartTs, 0 == pTask->runningStartTs);
  TSDB_CHECK_CODE(code, lino, _end);

  // last update: written as NULL while lastUpTs is 0
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->lastUpTs, 0 == pTask->lastUpTs);
  TSDB_CHECK_CODE(code, lino, _end);

  // extra info
  char extra[64 + VARSTR_HEADER_SIZE] = {0};
  code = mstGetTaskExtraStr(pTask, extra, sizeof(extra));
  TSDB_CHECK_CODE(code, lino, _end);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)extra, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // msg
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)msg, false);
  // Checked like every other column so a failure here records its line number.
  TSDB_CHECK_CODE(code, lino, _end);

_end:
  if (code) {
    mError("error happens when build stream task result block, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
1064

1065
/*
 * Count the tasks tracked in a stream's runtime status.
 *
 * The total is the sizes of trigReaders, trigOReaders and calcReaders, plus
 * one if a trigger task is present, plus the size of each of the
 * MND_STREAM_RUNNER_DEPLOY_NUM runner arrays.
 *
 * @param pStatus  runtime status of the stream
 * @return total number of tasks
 */
int32_t mstGetNumOfStreamTasks(SStmStatus* pStatus) {
  int32_t total = taosArrayGetSize(pStatus->trigReaders) + taosArrayGetSize(pStatus->trigOReaders) + MST_LIST_SIZE(pStatus->calcReaders) + (pStatus->triggerTask ? 1 : 0);

  int32_t deployIdx = 0;
  while (deployIdx < MND_STREAM_RUNNER_DEPLOY_NUM) {
    total += taosArrayGetSize(pStatus->runners[deployIdx]);
    ++deployIdx;
  }

  return total;
}
1073

1074
/*
 * Append one result row per task of pStream to pBlock.
 *
 * Runs under the stream-management runtime read lock and, once the stream's
 * runtime status has been found and is not stopped, under that status'
 * resetLock (read). Tasks are emitted in this order: trigReaders,
 * trigOReaders, calcReaders, the trigger task, then every runner array.
 * A row counter is only advanced for tasks whose row was written successfully.
 *
 * @param pStream       stream whose tasks are listed
 * @param pBlock        result block to fill; info.rows is updated on the normal path
 * @param numOfRows     in/out: next row index to write / rows written so far
 * @param rowsCapacity  current row capacity of pBlock as known by the caller
 * @return 0 when the stream is absent or stopped; otherwise the code left by
 *         the capacity check or by the last task row written
 */
int32_t mstSetStreamTasksResBlock(SStreamObj* pStream, SSDataBlock* pBlock, int32_t* numOfRows, int32_t rowsCapacity) {
  int32_t     code = 0;
  int32_t     lino = 0;
  int64_t     streamId = pStream->pCreate->streamId;
  bool        resetLocked = false;
  SStmStatus* pStatus = NULL;

  (void)mstWaitLock(&mStreamMgmt.runtimeLock, true);

  pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL == pStatus) {
    mstsDebug("stream not in streamMap, ignore it, dropped:%d, stopped:%d", atomic_load_8(&pStream->userDropped), atomic_load_8(&pStream->userStopped));
    goto _exit;
  }

  int8_t stopFlag = atomic_load_8(&pStatus->stopped);
  if (stopFlag) {
    mstsDebug("stream stopped %d, ignore it", stopFlag);
    goto _exit;
  }

  (void)mstWaitLock(&pStatus->resetLock, true);
  resetLocked = true;

  // Grow the block up front so every task of this stream fits.
  int32_t needed = *numOfRows + mstGetNumOfStreamTasks(pStatus);
  if (needed > rowsCapacity) {
    code = blockDataEnsureCapacity(pBlock, needed);
    if (code) {
      mstError("failed to prepare the result block buffer, rows:%d", needed);
      TAOS_CHECK_EXIT(code);
    }
  }

  // trigger readers
  int32_t taskNum = taosArrayGetSize(pStatus->trigReaders);
  for (int32_t idx = 0; idx < taskNum; ++idx) {
    SStmTaskStatus* pReader = taosArrayGet(pStatus->trigReaders, idx);
    code = mstSetStreamTaskResBlock(pStream, pReader, pBlock, *numOfRows);
    if (TSDB_CODE_SUCCESS == code) {
      ++(*numOfRows);
    }
  }

  // trigger readers kept in trigOReaders
  taskNum = taosArrayGetSize(pStatus->trigOReaders);
  for (int32_t idx = 0; idx < taskNum; ++idx) {
    SStmTaskStatus* pReader = taosArrayGet(pStatus->trigOReaders, idx);
    code = mstSetStreamTaskResBlock(pStream, pReader, pBlock, *numOfRows);
    if (TSDB_CODE_SUCCESS == code) {
      ++(*numOfRows);
    }
  }

  // calc readers live in a list rather than an array
  if (NULL != pStatus->calcReaders) {
    taskNum = MST_LIST_SIZE(pStatus->calcReaders);
    SListNode* pNode = listHead(pStatus->calcReaders);
    for (int32_t idx = 0; idx < taskNum; ++idx, pNode = TD_DLIST_NODE_NEXT(pNode)) {
      code = mstSetStreamTaskResBlock(pStream, (SStmTaskStatus*)pNode->data, pBlock, *numOfRows);
      if (TSDB_CODE_SUCCESS == code) {
        ++(*numOfRows);
      }
    }
  }

  // trigger task
  if (NULL != pStatus->triggerTask) {
    code = mstSetStreamTaskResBlock(pStream, pStatus->triggerTask, pBlock, *numOfRows);
    if (TSDB_CODE_SUCCESS == code) {
      ++(*numOfRows);
    }
  }

  // runners, one array per deploy slot
  for (int32_t deployIdx = 0; deployIdx < MND_STREAM_RUNNER_DEPLOY_NUM; ++deployIdx) {
    taskNum = taosArrayGetSize(pStatus->runners[deployIdx]);
    for (int32_t idx = 0; idx < taskNum; ++idx) {
      SStmTaskStatus* pRunner = taosArrayGet(pStatus->runners[deployIdx], idx);
      code = mstSetStreamTaskResBlock(pStream, pRunner, pBlock, *numOfRows);
      if (TSDB_CODE_SUCCESS == code) {
        ++(*numOfRows);
      }
    }
  }

  pBlock->info.rows = *numOfRows;

_exit:

  // Release in reverse order of acquisition.
  if (resetLocked) {
    taosRUnLockLatch(&pStatus->resetLock);
  }

  taosRUnLockLatch(&mStreamMgmt.runtimeLock);

  if (code) {
    mError("error happens when build stream tasks result block, lino:%d, code:%s", lino, tstrerror(code));
  }

  return code;
}
1178

1179

1180
/*
 * Queue a user recalculation request covering pRange on a stream.
 *
 * A fresh recalc id is obtained from taosGetSystemUUIDU64, then the request is
 * appended to pStream->userRecalcList under userRecalcLock (write). If the
 * list does not exist yet it is created, filled, and only then published with
 * atomic_store_ptr.
 *
 * @param streamId  stream id, used for logging
 * @param pStream   runtime status of the stream that owns the recalc list
 * @param pRange    time window (skey..ekey) to recalculate
 * @return 0 on success, otherwise the error from id generation or array allocation/push
 */
int32_t mstAppendNewRecalcRange(int64_t streamId, SStmStatus *pStream, STimeWindow* pRange) {
  int32_t          code = 0;
  int32_t          lino = 0;
  bool             wLocked = false;
  SArray*          pFreshList = NULL;  // non-NULL only while a new list is not yet published
  SStreamRecalcReq req = {.recalcId = 0, .start = pRange->skey, .end = pRange->ekey};

  TAOS_CHECK_EXIT(taosGetSystemUUIDU64(&req.recalcId));

  taosWLockLatch(&pStream->userRecalcLock);
  wLocked = true;

  if (NULL != pStream->userRecalcList) {
    TSDB_CHECK_NULL(taosArrayPush(pStream->userRecalcList, &req), code, lino, _exit, terrno);
  } else {
    pFreshList = taosArrayInit(2, sizeof(SStreamRecalcReq));
    if (NULL == pFreshList) {
      TAOS_CHECK_EXIT(terrno);
    }

    TSDB_CHECK_NULL(taosArrayPush(pFreshList, &req), code, lino, _exit, terrno);

    // Ownership moves to pStream; clear the local so _exit does not free it.
    atomic_store_ptr(&pStream->userRecalcList, pFreshList);
    pFreshList = NULL;
  }

  mstsInfo("stream recalc ID:%" PRIx64 " range:%" PRId64 " - %" PRId64 " added", req.recalcId, pRange->skey, pRange->ekey);

_exit:

  taosArrayDestroy(pFreshList);

  if (wLocked) {
    taosWUnLockLatch(&pStream->userRecalcLock);
  }

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1222

1223

1224

UNCOV
1225
/*
 * Fill one row of the stream recalculation result block for pProgress.
 *
 * Columns are written in this fixed order: stream name, stream id (hex),
 * recalc id (hex), start, end, progress ("<n>%").
 *
 * @param pStream    stream owning the recalculation (supplies name and stream id)
 * @param pProgress  recalculation progress entry to describe
 * @param pBlock     result block; columns are taken from pBlock->pDataBlock by index
 * @param numOfRows  row index to write
 * @return 0 on success; otherwise the first error from a column lookup (terrno)
 *         or a column write
 */
int32_t mstSetStreamRecalculateResBlock(SStreamObj* pStream, SSTriggerRecalcProgress* pProgress, SSDataBlock* pBlock, int32_t numOfRows) {
  int32_t code = 0;
  int32_t cols = 0;
  int32_t lino = 0;

  // stream_name
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetStableStr(pStream->name), sizeof(streamName));
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamName, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stream id
  // The varstr length is the payload length only; the header must not be
  // counted, matching how mstSetStreamTaskResBlock builds its id strings.
  char idstr[19 + VARSTR_HEADER_SIZE] = {0};
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pStream->pCreate->streamId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // recalc id
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pProgress->recalcId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // start
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pProgress->start, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // end
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pProgress->end, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // progress
  char progress[20 + VARSTR_HEADER_SIZE] = {0};
  snprintf(&progress[VARSTR_HEADER_SIZE], sizeof(progress) - VARSTR_HEADER_SIZE, "%d%%", pProgress->progress);
  varDataSetLen(progress, strlen(&progress[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)progress, false);
  TSDB_CHECK_CODE(code, lino, _end);

_end:
  if (code) {
    mError("error happens when build stream recalculate result block, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
1283

1284

1285
/*
 * Append one result row per user recalculation of pStream to pBlock.
 *
 * Runs under the stream-management runtime read lock. The recalculation list
 * is read from the trigger task's detailStatus while holding that task's
 * detailStatusLock (read). Nothing is written when the stream is not in the
 * stream map, is stopped, has no trigger task, or has no detail status yet.
 *
 * @param pStream       stream whose recalculations are listed
 * @param pBlock        result block to fill; info.rows is updated on the normal path
 * @param numOfRows     in/out: next row index to write / rows written so far
 * @param rowsCapacity  current row capacity of pBlock as known by the caller
 * @return 0 when there is nothing to report; otherwise the code left by the
 *         capacity check or by the last row written
 */
int32_t mstSetStreamRecalculatesResBlock(SStreamObj* pStream, SSDataBlock* pBlock, int32_t* numOfRows, int32_t rowsCapacity) {
  int32_t code = 0;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;

  (void)mstWaitLock(&mStreamMgmt.runtimeLock, true);

  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL == pStatus) {
    mstsDebug("stream not in streamMap, ignore it, dropped:%d, stopped:%d", atomic_load_8(&pStream->userDropped), atomic_load_8(&pStream->userStopped));
    goto _exit;
  }

  int8_t stopped = atomic_load_8(&pStatus->stopped);
  if (stopped) {
    mstsDebug("stream stopped %d, ignore it", stopped);
    goto _exit;
  }

  if (NULL == pStatus->triggerTask) {
    mstsDebug("no trigger task now, deployTimes:%" PRId64 ", ignore it", pStatus->deployTimes);
    goto _exit;
  }

  (void)mstWaitLock(&pStatus->triggerTask->detailStatusLock, true);
  if (NULL == pStatus->triggerTask->detailStatus) {
    // Distinct message: the trigger task exists here, only its detail status is missing.
    mstsDebug("no trigger task detail status now, deployTimes:%" PRId64 ", ignore it", pStatus->deployTimes);
    taosRUnLockLatch(&pStatus->triggerTask->detailStatusLock);
    goto _exit;
  }

  SSTriggerRuntimeStatus* pTrigger = (SSTriggerRuntimeStatus*)pStatus->triggerTask->detailStatus;
  int32_t count = taosArrayGetSize(pTrigger->userRecalcs);

  // Grow the block up front so every recalculation row fits.
  if (*numOfRows + count > rowsCapacity) {
    code = blockDataEnsureCapacity(pBlock, *numOfRows + count);
    if (code) {
      mstError("failed to prepare the result block buffer, rows:%d", *numOfRows + count);
      // The detail lock must be dropped before leaving through _exit, which
      // only releases the runtime lock.
      taosRUnLockLatch(&pStatus->triggerTask->detailStatusLock);
      TAOS_CHECK_EXIT(code);
    }
  }

  for (int32_t i = 0; i < count; ++i) {
    SSTriggerRecalcProgress* pProgress = taosArrayGet(pTrigger->userRecalcs, i);

    code = mstSetStreamRecalculateResBlock(pStream, pProgress, pBlock, *numOfRows);
    if (code == TSDB_CODE_SUCCESS) {
      (*numOfRows)++;
    }
  }

  taosRUnLockLatch(&pStatus->triggerTask->detailStatusLock);

  pBlock->info.rows = *numOfRows;

_exit:

  taosRUnLockLatch(&mStreamMgmt.runtimeLock);

  if (code) {
    mError("error happens when build stream recalculates result block, lino:%d, code:%s", lino, tstrerror(code));
  }

  return code;
}
1351

1352
int32_t mstGetScanUidFromPlan(int64_t streamId, void* scanPlan, int64_t* uid) {
294,008✔
1353
  SSubplan* pSubplan = NULL;
294,008✔
1354
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
294,008✔
1355
  
1356
  TAOS_CHECK_EXIT(nodesStringToNode(scanPlan, (SNode**)&pSubplan));
294,008✔
1357

1358
  if (pSubplan->pNode && nodeType(pSubplan->pNode) == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
294,008✔
1359
    SScanPhysiNode* pScanNode = (SScanPhysiNode*)pSubplan->pNode;
112,174✔
1360
    *uid = pScanNode->uid;
112,174✔
1361
  }
1362
  
1363
_exit:
289,535✔
1364

1365
  if (code) {
294,008✔
UNCOV
1366
    mstsError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1367
  }
1368

1369
  nodesDestroyNode((SNode *)pSubplan);
294,008✔
1370

1371
  return code;
294,008✔
1372
}
1373

1374

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc