• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4740

18 Sep 2025 04:31AM UTC coverage: 58.139% (-0.9%) from 59.028%
#4740

push

travis-ci

web-flow
fix: clear parse csv error syntax error msg (#33000)

133663 of 293099 branches covered (45.6%)

Branch coverage included in aggregate %.

0 of 2 new or added lines in 1 file covered. (0.0%)

4143 existing lines in 175 files now uncovered.

202241 of 284660 relevant lines covered (71.05%)

5584206.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.98
/source/dnode/mnode/impl/src/mndStreamUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndDb.h"
17
#include "mndStb.h"
18
#include "mndStream.h"
19
#include "mndTrans.h"
20
#include "mndVgroup.h"
21
#include "taoserror.h"
22
#include "tmisce.h"
23
#include "mndSnode.h"
24

25
/**
 * Block until the requested side of a read/write latch is held.
 *
 * @param pLock    latch to acquire
 * @param readLock true to acquire for reading, false to acquire for writing
 * @return always true
 */
bool mstWaitLock(SRWLatch* pLock, bool readLock) {
  if (!readLock) {
    taosWWaitLockLatch(pLock);
    return true;
  }

  // Retry the read try-lock until it reports zero, calling taosMsleep(1)
  // between attempts.
  for (;;) {
    if (0 == taosRTryLockLatch(pLock)) {
      break;
    }
    taosMsleep(1);
  }

  return true;
}
38

39
/**
 * Release the arrays held inside a stream management response.
 * The response object itself is not freed here.
 *
 * @param p response whose cont.vgIds and cont.readerList arrays are destroyed
 */
void mndStreamDestroySStreamMgmtRsp(SStreamMgmtRsp* p) {
  taosArrayDestroy((p->cont).vgIds);
  taosArrayDestroy((p->cont).readerList);
}
×
43

44
void mstDestroySStmVgStreamStatus(void* p) { 
650✔
45
  SStmVgStreamStatus* pStatus = (SStmVgStreamStatus*)p;
650✔
46
  taosArrayDestroy(pStatus->trigReaders); 
650✔
47
  taosArrayDestroy(pStatus->calcReaders); 
650✔
48
}
650✔
49

50
void mstDestroySStmSnodeStreamStatus(void* p) { 
409✔
51
  SStmSnodeStreamStatus* pStatus = (SStmSnodeStreamStatus*)p;
409✔
52
  for (int32_t i = 0; i < MND_STREAM_RUNNER_DEPLOY_NUM; ++i) {
1,636✔
53
    taosArrayDestroy(pStatus->runners[i]);
1,227✔
54
    pStatus->runners[i] = NULL;
1,227✔
55
  }
56
}
409✔
57

58

59
/**
 * Clean up the stream-task hash of a vgroup status and clear the field so
 * the status no longer refers to the released hash.
 */
void mstDestroyVgroupStatus(SStmVgroupStatus* pVgStatus) {
  SHashObj** ppTasks = &pVgStatus->streamTasks;

  taosHashCleanup(*ppTasks);
  *ppTasks = NULL;
}
×
63

64
void mstDestroySStmTaskToDeployExt(void* param) {
2,368✔
65
  SStmTaskToDeployExt* pExt = (SStmTaskToDeployExt*)param;
2,368✔
66
  if (pExt->deployed) {
2,368✔
67
    return;
2,349✔
68
  }
69
  
70
  switch (pExt->deploy.task.type) {
19✔
71
    case STREAM_TRIGGER_TASK:
3✔
72
      taosArrayDestroy(pExt->deploy.msg.trigger.readerList);
3✔
73
      pExt->deploy.msg.trigger.readerList = NULL;
3✔
74
      taosArrayDestroy(pExt->deploy.msg.trigger.runnerList);
3✔
75
      pExt->deploy.msg.trigger.runnerList = NULL;
3✔
76
      break;
3✔
77
    case STREAM_RUNNER_TASK:
9✔
78
      taosMemoryFreeClear(pExt->deploy.msg.runner.pPlan);
9!
79
      break;
9✔
80
    default:  
7✔
81
      break;;
7✔
82
  }
83
}
84

85
void mstDestroyScanAddrList(void* param) {
407✔
86
  if (NULL == param) {
407!
87
    return;
×
88
  }
89
  SArray* pList = *(SArray**)param;
407✔
90
  taosArrayDestroy(pList);
407✔
91
}
92

93
void mstDestroySStmSnodeTasksDeploy(void* param) {
103✔
94
  SStmSnodeTasksDeploy* pSnode = (SStmSnodeTasksDeploy*)param;
103✔
95
  taosArrayDestroyEx(pSnode->triggerList, mstDestroySStmTaskToDeployExt);
103✔
96
  taosArrayDestroyEx(pSnode->runnerList, mstDestroySStmTaskToDeployExt);
103✔
97
}
103✔
98

99
void mstDestroySStmVgTasksToDeploy(void* param) {
251✔
100
  SStmVgTasksToDeploy* pVg = (SStmVgTasksToDeploy*)param;
251✔
101
  taosArrayDestroyEx(pVg->taskList, mstDestroySStmTaskToDeployExt);
251✔
102
}
251✔
103

104
void mstDestroySStmSnodeStatus(void* param) {
87✔
105
  SStmSnodeStatus* pSnode = (SStmSnodeStatus*)param;
87✔
106
  taosHashCleanup(pSnode->streamTasks);
87✔
107
}
87✔
108

109
void mstDestroySStmVgroupStatus(void* param) {
185✔
110
  SStmVgroupStatus* pVg = (SStmVgroupStatus*)param;
185✔
111
  taosHashCleanup(pVg->streamTasks);
185✔
112
}
185✔
113

114
/**
 * Release all per-task runtime state of a stream status and NULL the
 * corresponding fields: the three reader arrays, the trigger task (with its
 * detailStatus buffer) and every runner array slot. Other fields of the
 * status are not touched.
 */
void mstResetSStmStatus(SStmStatus* pStatus) {
  taosArrayDestroy(pStatus->trigReaders);
  pStatus->trigReaders = NULL;

  taosArrayDestroy(pStatus->trigOReaders);
  pStatus->trigOReaders = NULL;

  taosArrayDestroy(pStatus->calcReaders);
  pStatus->calcReaders = NULL;

  if (NULL != pStatus->triggerTask) {
    // detailStatus is released while holding its write latch.
    (void)mstWaitLock(&pStatus->triggerTask->detailStatusLock, false);
    taosMemoryFreeClear(pStatus->triggerTask->detailStatus);
    taosWUnLockLatch(&pStatus->triggerTask->detailStatusLock);

    taosMemoryFreeClear(pStatus->triggerTask);
  }

  int32_t slot = 0;
  while (slot < MND_STREAM_RUNNER_DEPLOY_NUM) {
    taosArrayDestroy(pStatus->runners[slot]);
    pStatus->runners[slot] = NULL;
    ++slot;
  }
}
371✔
132

133
void mstDestroySStmStatus(void* param) {
347✔
134
  SStmStatus* pStatus = (SStmStatus*)param;
347✔
135
  taosMemoryFreeClear(pStatus->streamName);
347!
136

137
  mstResetSStmStatus(pStatus);
347✔
138

139
  taosWLockLatch(&pStatus->userRecalcLock);
347✔
140
  taosArrayDestroy(pStatus->userRecalcList);
347✔
141
  taosWUnLockLatch(&pStatus->userRecalcLock);
347✔
142

143
  tFreeSCMCreateStreamReq(pStatus->pCreate);
347✔
144
  taosMemoryFreeClear(pStatus->pCreate);  
347!
145
}
347✔
146

147
void mstDestroySStmAction(void* param) {
570✔
148
  SStmAction* pAction = (SStmAction*)param;
570✔
149

150
  taosArrayDestroy(pAction->undeploy.taskList);
570✔
151
  taosArrayDestroy(pAction->recalc.recalcList);
570✔
152
}
570✔
153

154
/**
 * Forget the reader, trigger and runner task references of a stream deploy
 * descriptor. The fields are only set to NULL; nothing is freed here.
 */
void mstClearSStmStreamDeploy(SStmStreamDeploy* pDeploy) {
  pDeploy->runnerTasks = NULL;
  pDeploy->triggerTask = NULL;
  pDeploy->readerTasks = NULL;
}
527✔
159

160
/**
 * Report whether a stream should be treated as dropped.
 *
 * Walks all SDB_STREAM objects looking for one whose create request carries
 * the given stream id. If found, *dropped reflects that stream's userDropped
 * flag; if no such stream exists, *dropped is set to true.
 *
 * @param pMnode   mnode whose sdb is searched
 * @param streamId id of the stream to look up
 * @param dropped  out: result as described above
 * @return always TSDB_CODE_SUCCESS
 */
int32_t mstIsStreamDropped(SMnode *pMnode, int64_t streamId, bool* dropped) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  while (1) {
    SStreamObj *pStream = NULL;
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) break;

    if (pStream->pCreate->streamId != streamId) {
      sdbRelease(pSdb, pStream);
      continue;
    }

    // Read the flag atomically, as the other readers of userDropped in this
    // file do.
    *dropped = (0 != atomic_load_8(&pStream->userDropped));
    sdbRelease(pSdb, pStream);
    // The iteration ends early, so the fetch must be cancelled.
    sdbCancelFetch(pSdb, pIter);
    mstsDebug("stream found, dropped:%d", *dropped);
    return TSDB_CODE_SUCCESS;
  }

  // No stream with this id exists any more.
  *dropped = true;

  return TSDB_CODE_SUCCESS;
}
184

185
typedef struct SStmCheckDbInUseCtx {
186
  bool* dbStream;
187
  bool* vtableStream;
188
  bool  ignoreCurrDb;
189
} SStmCheckDbInUseCtx;
190

191
/**
 * Traversal callback: record whether a stream uses the database named by p1.
 *
 * @param pMnode unused
 * @param pObj   the SStreamObj being visited
 * @param p1     full name of the database being checked (C string)
 * @param p2     SStmCheckDbInUseCtx with the output flags
 * @param p3     unused
 * @return false once the stream's trigger db or one of its calc dbs matches
 *         (dbStream has been set); true in every other case.
 *         NOTE(review): presumably false stops sdbTraverse - confirm.
 */
static bool mstChkSetDbInUse(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SStreamObj *pStream = pObj;

  // Streams already dropped by the user do not count.
  if (atomic_load_8(&pStream->userDropped)) {
    return true;
  }

  SStmCheckDbInUseCtx *pCtx = p2;
  const char          *dbFName = p1;
  SCMCreateStreamReq  *pReq = pStream->pCreate;

  if (pCtx->ignoreCurrDb && 0 == strcmp(pReq->streamDB, dbFName)) {
    return true;
  }

  if (pReq->triggerDB && 0 == strcmp(pReq->triggerDB, dbFName)) {
    *pCtx->dbStream = true;
    return false;
  }

  int32_t calcDBNum = taosArrayGetSize(pReq->calcDB);
  int32_t idx = 0;
  while (idx < calcDBNum) {
    char *calcDB = taosArrayGetP(pReq->calcDB, idx);
    if (0 == strcmp(calcDB, dbFName)) {
      *pCtx->dbStream = true;
      return false;
    }
    ++idx;
  }

  // No direct match; virtual-table streams are still flagged for the caller.
  if (pReq->vtableCalc || STREAM_IS_VIRTUAL_TABLE(pReq->triggerTblType, pReq->flags)) {
    *pCtx->vtableStream = true;
  }

  return true;
}
223

224
/**
 * Check every stream for use of the given database.
 *
 * @param pMnode       mnode whose streams are traversed
 * @param dbFName      full name of the database to check
 * @param dbStream     out: set to true by the traversal on a trigger/calc db match
 * @param vtableStream out: set to true by the traversal for virtual-table streams
 * @param ignoreCurrDb when true, streams whose own db is dbFName are skipped
 *
 * Both outputs are left untouched when there are no streams at all.
 */
void mstCheckDbInUse(SMnode *pMnode, char *dbFName, bool *dbStream, bool *vtableStream, bool ignoreCurrDb) {
  if (sdbGetSize(pMnode->pSdb, SDB_STREAM) <= 0) {
    return;
  }

  SStmCheckDbInUseCtx ctx = {
      .dbStream = dbStream,
      .vtableStream = vtableStream,
      .ignoreCurrDb = ignoreCurrDb,
  };
  sdbTraverse(pMnode->pSdb, SDB_STREAM, mstChkSetDbInUse, dbFName, &ctx, NULL);
}
233

234
/**
 * Write the display name of a stream status into dst.
 *
 * @param dst    destination buffer
 * @param status one of STREAM_STATUS_INIT / RUNNING / STOPPED / FAILED
 * @param bufLen size passed to tstrncpy for dst
 *
 * dst is left unmodified for any other status value.
 */
static void mstShowStreamStatus(char *dst, int8_t status, int32_t bufLen) {
  const char *text = NULL;

  if (status == STREAM_STATUS_INIT) {
    text = "init";
  } else if (status == STREAM_STATUS_RUNNING) {
    text = "running";
  } else if (status == STREAM_STATUS_STOPPED) {
    text = "stopped";
  } else if (status == STREAM_STATUS_FAILED) {
    text = "failed";
  }

  if (NULL != text) {
    tstrncpy(dst, text, bufLen);
  }
}
×
245

246
/**
 * Check whether at least one snode object exists in the sdb.
 *
 * @return TSDB_CODE_SUCCESS if the first fetch yields an snode,
 *         TSDB_CODE_SNODE_NOT_DEPLOYED otherwise
 */
int32_t mstCheckSnodeExists(SMnode *pMnode) {
  SSdb      *pSdb = pMnode->pSdb;
  SSnodeObj *pSnode = NULL;

  // A single fetch is enough: only existence matters.
  void *pIter = sdbFetch(pSdb, SDB_SNODE, NULL, (void **)&pSnode);
  if (NULL == pIter) {
    return TSDB_CODE_SNODE_NOT_DEPLOYED;
  }

  sdbRelease(pSdb, pSnode);
  sdbCancelFetch(pSdb, pIter);
  return TSDB_CODE_SUCCESS;
}
264

265
/**
 * Populate a task status record from a reported task status message.
 *
 * Copies the identity fields (taskId, deployId, seriousId, nodeId, taskIdx)
 * and the type/flags/status fields from pMsg, and stamps lastUpTs with the
 * current timestamp of the group context.
 */
void mstSetTaskStatusFromMsg(SStmGrpCtx* pCtx, SStmTaskStatus* pTask, SStmTaskStatusMsg* pMsg) {
  // identity
  pTask->id.taskId    = pMsg->taskId;
  pTask->id.deployId  = pMsg->deployId;
  pTask->id.seriousId = pMsg->seriousId;
  pTask->id.nodeId    = pMsg->nodeId;
  pTask->id.taskIdx   = pMsg->taskIdx;

  // state
  pTask->type     = pMsg->type;
  pTask->flags    = pMsg->flags;
  pTask->status   = pMsg->status;
  pTask->lastUpTs = pCtx->currTs;
}
×
277

278
/**
 * Pop the next node from the action queue.
 *
 * The queue keeps a placeholder node at its head: the node returned is
 * head->next, which then becomes the new head, and the previous head node is
 * freed. The returned node therefore stays linked as the queue head.
 *
 * @param pQueue queue to pop from
 * @param param  out: the dequeued node; untouched when the queue is empty
 * @return true if a node was dequeued, false if qRemainNum is zero
 *
 * NOTE(review): unlike mndStreamActionEnqueue this does not take
 * pQueue->lock; presumably there is a single consumer - confirm.
 */
bool mndStreamActionDequeue(SStmActionQ* pQueue, SStmQNode **param) {
  // A plain emptiness test; the original spelled this as a `while` whose body
  // returned unconditionally.
  if (0 == atomic_load_64(&pQueue->qRemainNum)) {
    return false;
  }

  SStmQNode *orig = pQueue->head;
  SStmQNode *node = orig->next;

  pQueue->head = node;
  *param = node;

  taosMemoryFreeClear(orig);

  (void)atomic_sub_fetch_64(&pQueue->qRemainNum, 1);

  return true;
}
296

297
/**
 * Append a node to the tail of the action queue.
 *
 * The tail link is updated under the queue's write latch; the remaining-node
 * counter is incremented only after the node has been linked in.
 */
void mndStreamActionEnqueue(SStmActionQ* pQueue, SStmQNode* param) {
  taosWLockLatch(&pQueue->lock);
  SStmQNode *pPrevTail = pQueue->tail;
  pPrevTail->next = param;
  pQueue->tail = param;
  taosWUnLockLatch(&pQueue->lock);

  (void)atomic_add_fetch_64(&pQueue->qRemainNum, 1);
}
418✔
305

306
/**
 * Map a stream action code to its display name.
 *
 * @return a string literal; "UNKNOWN" for any code not listed below
 */
char* mstGetStreamActionString(int32_t action) {
  char* name = "UNKNOWN";

  switch (action) {
    case STREAM_ACT_DEPLOY:
      name = "DEPLOY";
      break;
    case STREAM_ACT_UNDEPLOY:
      name = "UNDEPLOY";
      break;
    case STREAM_ACT_START:
      name = "START";
      break;
    case STREAM_ACT_UPDATE_TRIGGER:
      name = "UPDATE TRIGGER";
      break;
    case STREAM_ACT_RECALC:
      name = "USER RECALC";
      break;
    default:
      break;
  }

  return name;
}
324

325
/**
 * Allocate a stream-level action node and append it to the action queue.
 *
 * @param actionQ    queue receiving the node
 * @param streamId   id of the stream the action applies to
 * @param streamName name of the stream; copied into the node
 * @param param      action parameter stored in the node as actionParam.
 *                   If the node cannot be allocated, param is freed here.
 * @param userAction stored in the node as userAction
 * @param action     action code, stored as the node type
 *
 * On allocation failure the error is logged and nothing is enqueued.
 */
void mstPostStreamAction(SStmActionQ*       actionQ, int64_t streamId, char* streamName, void* param, bool userAction, int32_t action) {
  SStmQNode *pNode = taosMemoryMalloc(sizeof(SStmQNode));
  if (NULL == pNode) {
    taosMemoryFreeClear(param);
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, __LINE__, tstrerror(terrno));
    return;
  }

  pNode->type = action;
  pNode->streamAct = true;
  pNode->action.stream.streamId = streamId;
  // Bounded copy: the destination is a fixed-size buffer inside the node, so
  // an over-long name must be truncated rather than overflow it.
  tstrncpy(pNode->action.stream.streamName, streamName, sizeof(pNode->action.stream.streamName));
  pNode->action.stream.userAction = userAction;
  pNode->action.stream.actionParam = param;

  pNode->next = NULL;

  mndStreamActionEnqueue(actionQ, pNode);

  mstsDebug("stream action %s posted enqueue", mstGetStreamActionString(action));
}
346

347
/**
 * Allocate a task-level action node holding a copy of *pAction and append it
 * to the action queue.
 *
 * @param actionQ queue receiving the node
 * @param pAction task action description; copied by value into the node
 * @param action  action code, stored as the node type
 *
 * On allocation failure the error is logged and nothing is enqueued.
 */
void mstPostTaskAction(SStmActionQ*        actionQ, SStmTaskAction* pAction, int32_t action) {
  SStmQNode *pNode = taosMemoryMalloc(sizeof(SStmQNode));

  if (NULL != pNode) {
    pNode->type = action;
    pNode->streamAct = false;
    pNode->action.task = *pAction;
    pNode->next = NULL;

    mndStreamActionEnqueue(actionQ, pNode);
    return;
  }

  // The logging macro reads a local named streamId.
  int64_t streamId = pAction->streamId;
  mstsError("%s failed at line %d, error:%s", __FUNCTION__, __LINE__, tstrerror(terrno));
}
363

364
/**
 * Destroy a db-name -> SDBVgHashInfo map: first the vgroup array owned by
 * each entry, then the hash itself.
 */
void mstDestroyDbVgroupsHash(SSHashObj *pDbVgs) {
  int32_t iter = 0;
  void*   pEntry = NULL;

  for (pEntry = tSimpleHashIterate(pDbVgs, NULL, &iter); NULL != pEntry;
       pEntry = tSimpleHashIterate(pDbVgs, pEntry, &iter)) {
    SDBVgHashInfo* pDbInfo = (SDBVgHashInfo*)pEntry;
    taosArrayDestroy(pDbInfo->vgArray);
  }

  tSimpleHashCleanup(pDbVgs);
}
140✔
375

376

377
/**
 * Build a map from database name to that database's vgroup hash ranges.
 *
 * Iterates over every SDB_VGROUP object. The first vgroup seen for a db
 * creates an SDBVgHashInfo entry (hash method/prefix/suffix taken from the
 * db config, vgSorted = false); every vgroup adds one SVGroupHashInfo
 * {vgId, hashBegin, hashEnd} to the entry's array. Map keys are the db name
 * including its terminating NUL.
 *
 * @param pMnode mnode whose sdb is read
 * @param ppRes  out: the new map, written only on success. The caller owns it
 *               and releases it with mstDestroyDbVgroupsHash.
 * @return TSDB_CODE_SUCCESS, or the error code of the failing step. On
 *         failure the partially built map is destroyed here instead of being
 *         leaked.
 */
int32_t mstBuildDBVgroupsMap(SMnode* pMnode, SSHashObj** ppRes) {
  void*   pIter = NULL;
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SArray* pTarget = NULL;
  SArray* pNew = NULL;  // array not yet owned by the map; freed at _exit
  SDbObj* pDb = NULL;
  SDBVgHashInfo dbInfo = {0}, *pDbInfo = NULL;
  SVgObj* pVgroup = NULL;

  SSHashObj* pDbVgroup = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
  TSDB_CHECK_NULL(pDbVgroup, code, lino, _exit, terrno);

  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    pDbInfo = (SDBVgHashInfo*)tSimpleHashGet(pDbVgroup, pVgroup->dbName, strlen(pVgroup->dbName) + 1);
    if (NULL == pDbInfo) {
      // First vgroup of this db: prepare a new entry.
      pNew = taosArrayInit(20, sizeof(SVGroupHashInfo));
      if (NULL == pNew) {
        sdbRelease(pMnode->pSdb, pVgroup);
        sdbCancelFetch(pMnode->pSdb, pIter);
        pVgroup = NULL;
        TSDB_CHECK_NULL(pNew, code, lino, _exit, terrno);
      }

      pDb = mndAcquireDb(pMnode, pVgroup->dbName);
      if (NULL == pDb) {
        sdbRelease(pMnode->pSdb, pVgroup);
        sdbCancelFetch(pMnode->pSdb, pIter);
        pVgroup = NULL;
        TSDB_CHECK_NULL(pDb, code, lino, _exit, terrno);
      }
      dbInfo.vgSorted = false;
      dbInfo.hashMethod = pDb->cfg.hashMethod;
      dbInfo.hashPrefix = pDb->cfg.hashPrefix;
      dbInfo.hashSuffix = pDb->cfg.hashSuffix;
      dbInfo.vgArray = pNew;

      mndReleaseDb(pMnode, pDb);

      pTarget = pNew;
    } else {
      pTarget = pDbInfo->vgArray;
    }

    SVGroupHashInfo vgInfo = {.vgId = pVgroup->vgId, .hashBegin = pVgroup->hashBegin, .hashEnd = pVgroup->hashEnd};
    if (NULL == taosArrayPush(pTarget, &vgInfo)) {
      sdbRelease(pMnode->pSdb, pVgroup);
      sdbCancelFetch(pMnode->pSdb, pIter);
      pVgroup = NULL;
      TSDB_CHECK_NULL(NULL, code, lino, _exit, terrno);
    }

    if (NULL == pDbInfo) {
      code = tSimpleHashPut(pDbVgroup, pVgroup->dbName, strlen(pVgroup->dbName) + 1, &dbInfo, sizeof(dbInfo));
      if (code) {
        sdbRelease(pMnode->pSdb, pVgroup);
        sdbCancelFetch(pMnode->pSdb, pIter);
        pVgroup = NULL;
        TAOS_CHECK_EXIT(code);
      }
      // The map entry now owns the array.
      pNew = NULL;
    }

    sdbRelease(pMnode->pSdb, pVgroup);
    pVgroup = NULL;
  }

  // Hand the finished map to the caller; clearing the local keeps the
  // cleanup below from destroying it.
  *ppRes = pDbVgroup;
  pDbVgroup = NULL;

_exit:

  taosArrayDestroy(pNew);

  // Still set only when the map was never handed over: release it together
  // with the arrays already stored in it.
  if (NULL != pDbVgroup) {
    mstDestroyDbVgroupsHash(pDbVgroup);
  }

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
461

462
int mstDbVgInfoComp(const void* lp, const void* rp) {
98✔
463
  SVGroupHashInfo* pLeft = (SVGroupHashInfo*)lp;
98✔
464
  SVGroupHashInfo* pRight = (SVGroupHashInfo*)rp;
98✔
465
  if (pLeft->hashBegin < pRight->hashBegin) {
98!
466
    return -1;
98✔
467
  } else if (pLeft->hashBegin > pRight->hashBegin) {
×
468
    return 1;
×
469
  }
470

471
  return 0;
×
472
}
473

474
int32_t mstTableHashValueComp(void const* lp, void const* rp) {
463✔
475
  uint32_t*    key = (uint32_t*)lp;
463✔
476
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
463✔
477

478
  if (*key < pVg->hashBegin) {
463!
479
    return -1;
×
480
  } else if (*key > pVg->hashEnd) {
463✔
481
    return 1;
109✔
482
  }
483

484
  return 0;
354✔
485
}
486

487

488
/**
 * Resolve the vgroup that holds a table, using a prebuilt db -> vgroups map.
 *
 * Hashes "<dbFName>.<tbName>" with the database's hash settings and searches
 * the database's vgroup array for the hash range containing that value. The
 * array is sorted by hashBegin on first use and marked as sorted in the map
 * entry.
 *
 * @param pDbVgroups map produced by mstBuildDBVgroupsMap
 * @param dbFName    full database name, used as the map key
 * @param tbName     table name
 * @param vgId       out: id of the matching vgroup; written only on success
 * @return 0 on success, TSDB_CODE_MND_DB_NOT_EXIST if the database is not in
 *         the map, TSDB_CODE_MND_STREAM_INTERNAL_ERROR if no range matches
 */
int32_t mstGetTableVgId(SSHashObj* pDbVgroups, char* dbFName, char *tbName, int32_t* vgId) {
  int32_t code = 0;
  int32_t lino = 0;
  // Elements of vgArray are SVGroupHashInfo (see mstBuildDBVgroupsMap), so
  // the search result must be read through that type, not SVgroupInfo.
  SVGroupHashInfo* vgInfo = NULL;
  char             tbFullName[TSDB_TABLE_FNAME_LEN];

  SDBVgHashInfo* dbInfo = (SDBVgHashInfo*)tSimpleHashGet(pDbVgroups, dbFName, strlen(dbFName) + 1);
  if (NULL == dbInfo) {
    mstError("db %s does not exist", dbFName);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_DB_NOT_EXIST);
  }

  (void)snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbFName, tbName);
  uint32_t hashValue = taosGetTbHashVal(tbFullName, (uint32_t)strlen(tbFullName), dbInfo->hashMethod,
                                        dbInfo->hashPrefix, dbInfo->hashSuffix);

  // Sort lazily: the search below relies on ranges ordered by hashBegin.
  if (!dbInfo->vgSorted) {
    taosArraySort(dbInfo->vgArray, mstDbVgInfoComp);
    dbInfo->vgSorted = true;
  }

  vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, mstTableHashValueComp, TD_EQ);
  if (NULL == vgInfo) {
    mstError("no hash range found for hash value [%u], dbFName:%s, numOfVgId:%d", hashValue, dbFName,
             (int32_t)taosArrayGetSize(dbInfo->vgArray));
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  *vgId = vgInfo->vgId;

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
526

527

528
/**
 * Dump a stream object and its create request to the debug log.
 *
 * Does nothing unless DEBUG_DEBUG is set in stDebugFlag. Logs the object
 * header, the scalar fields of the create request, the options of the active
 * trigger type, the serialized plan/expression strings, and then the calc
 * databases, calc scan plans (with their vgroup ids), notify urls and output
 * columns one entry per line.
 *
 * @param tips prefix identifying the call site in the log
 * @param p    stream object to dump; NULL is reported and tolerated
 */
void mstLogSStreamObj(char* tips, SStreamObj* p) {
  if (0 == (stDebugFlag & DEBUG_DEBUG)) {
    return;
  }

  if (NULL == p) {
    mstDebug("%s: stream is NULL", tips);
    return;
  }

  mstDebug("%s: stream obj", tips);
  mstDebug("name:%s mainSnodeId:%d userDropped:%d userStopped:%d createTime:%" PRId64 " updateTime:%" PRId64,
      p->name, p->mainSnodeId, p->userDropped, p->userStopped, p->createTime, p->updateTime);

  SCMCreateStreamReq* pReq = p->pCreate;
  if (NULL == pReq) {
    mstDebug("stream pCreate is NULL");
    return;
  }

  // The mstsDebug* macros below read a local named streamId.
  int64_t streamId = pReq->streamId;
  int32_t nCalcDb = taosArrayGetSize(pReq->calcDB);
  int32_t nCalcScan = taosArrayGetSize(pReq->calcScanPlanList);
  int32_t nNotifyUrl = taosArrayGetSize(pReq->pNotifyAddrUrls);
  int32_t nOutCol = taosArrayGetSize(pReq->outCols);
  int32_t nOutTag = taosArrayGetSize(pReq->outTags);
  int32_t nForceOutCol = taosArrayGetSize(pReq->forceOutCols);

  mstsDebugL("create_info: name:%s sql:%s streamDB:%s triggerDB:%s outDB:%s calcDBNum:%d triggerTblName:%s outTblName:%s "
      "igExists:%d triggerType:%d igDisorder:%d deleteReCalc:%d deleteOutTbl:%d fillHistory:%d fillHistroyFirst:%d "
      "calcNotifyOnly:%d lowLatencyCalc:%d igNoDataTrigger:%d notifyUrlNum:%d notifyEventTypes:%d notifyErrorHandle:%d notifyHistory:%d "
      "outColsNum:%d outTagsNum:%d maxDelay:%" PRId64 " fillHistoryStartTs:%" PRId64 " watermark:%" PRId64 " expiredTime:%" PRId64 " "
      "triggerTblType:%d triggerTblUid:%" PRIx64 " triggerTblSuid:%" PRIx64 " vtableCalc:%d outTblType:%d outStbExists:%d outStbUid:%" PRIu64 " outStbSversion:%d "
      "eventTypes:0x%" PRIx64 " flags:0x%" PRIx64 " tsmaId:0x%" PRIx64 " placeHolderBitmap:0x%" PRIx64 " calcTsSlotId:%d triTsSlotId:%d "
      "triggerTblVgId:%d outTblVgId:%d calcScanPlanNum:%d forceOutCols:%d",
      pReq->name, pReq->sql, pReq->streamDB, pReq->triggerDB, pReq->outDB, nCalcDb, pReq->triggerTblName, pReq->outTblName,
      pReq->igExists, pReq->triggerType, pReq->igDisorder, pReq->deleteReCalc, pReq->deleteOutTbl, pReq->fillHistory, pReq->fillHistoryFirst,
      pReq->calcNotifyOnly, pReq->lowLatencyCalc, pReq->igNoDataTrigger, nNotifyUrl, pReq->notifyEventTypes, pReq->notifyErrorHandle, pReq->notifyHistory,
      nOutCol, nOutTag, pReq->maxDelay, pReq->fillHistoryStartTime, pReq->watermark, pReq->expiredTime,
      pReq->triggerTblType, pReq->triggerTblUid, pReq->triggerTblSuid, pReq->vtableCalc, pReq->outTblType, pReq->outStbExists, pReq->outStbUid, pReq->outStbSversion,
      pReq->eventTypes, pReq->flags, pReq->tsmaId, pReq->placeHolderBitmap, pReq->calcTsSlotId, pReq->triTsSlotId,
      pReq->triggerTblVgId, pReq->outTblVgId, nCalcScan, nForceOutCol);

  // Options of the trigger variant selected by triggerType.
  switch (pReq->triggerType) {
    case WINDOW_TYPE_INTERVAL: {
      SSlidingTrigger* pOpt = &pReq->trigger.sliding;
      mstsDebug("sliding trigger options, intervalUnit:%d, slidingUnit:%d, offsetUnit:%d, soffsetUnit:%d, precision:%d, interval:%" PRId64 ", offset:%" PRId64 ", sliding:%" PRId64 ", soffset:%" PRId64, 
          pOpt->intervalUnit, pOpt->slidingUnit, pOpt->offsetUnit, pOpt->soffsetUnit, pOpt->precision, pOpt->interval, pOpt->offset, pOpt->sliding, pOpt->soffset);
      break;
    }
    case WINDOW_TYPE_SESSION: {
      SSessionTrigger* pOpt = &pReq->trigger.session;
      mstsDebug("session trigger options, slotId:%d, sessionVal:%" PRId64, pOpt->slotId, pOpt->sessionVal);
      break;
    }
    case WINDOW_TYPE_STATE: {
      SStateWinTrigger* pOpt = &pReq->trigger.stateWin;
      mstsDebug("state trigger options, slotId:%d, trueForDuration:%" PRId64, pOpt->slotId, pOpt->trueForDuration);
      break;
    }
    case WINDOW_TYPE_EVENT: {
      SEventTrigger* pOpt = &pReq->trigger.event;
      mstsDebug("event trigger options, startCond:%s, endCond:%s, trueForDuration:%" PRId64, (char*)pOpt->startCond, (char*)pOpt->endCond, pOpt->trueForDuration);
      break;
    }
    case WINDOW_TYPE_COUNT: {
      SCountTrigger* pOpt = &pReq->trigger.count;
      mstsDebug("count trigger options, countVal:%" PRId64 ", sliding:%" PRId64 ", condCols:%s", pOpt->countVal, pOpt->sliding, (char*)pOpt->condCols);
      break;
    }
    case WINDOW_TYPE_PERIOD: {
      SPeriodTrigger* pOpt = &pReq->trigger.period;
      mstsDebug("period trigger options, periodUnit:%d, offsetUnit:%d, precision:%d, period:%" PRId64 ", offset:%" PRId64, 
          pOpt->periodUnit, pOpt->offsetUnit, pOpt->precision, pOpt->period, pOpt->offset);
      break;
    }
    default:
      mstsDebug("unknown triggerType:%d", pReq->triggerType);
      break;
  }

  mstsDebugL("create_info: triggerCols:[%s]", (char*)pReq->triggerCols);
  mstsDebugL("create_info: partitionCols:[%s]", (char*)pReq->partitionCols);
  mstsDebugL("create_info: triggerScanPlan:[%s]", (char*)pReq->triggerScanPlan);
  mstsDebugL("create_info: calcPlan:[%s]", (char*)pReq->calcPlan);
  mstsDebugL("create_info: subTblNameExpr:[%s]", (char*)pReq->subTblNameExpr);
  mstsDebugL("create_info: tagValueExpr:[%s]", (char*)pReq->tagValueExpr);

  for (int32_t dbIdx = 0; dbIdx < nCalcDb; ++dbIdx) {
    char* dbName = taosArrayGetP(pReq->calcDB, dbIdx);
    mstsDebug("create_info: calcDB[%d] - %s", dbIdx, dbName);
  }

  for (int32_t scanIdx = 0; scanIdx < nCalcScan; ++scanIdx) {
    SStreamCalcScan* pScan = taosArrayGet(pReq->calcScanPlanList, scanIdx);
    int32_t          vgNum = taosArrayGetSize(pScan->vgList);
    mstsDebugL("create_info: calcScanPlan[%d] - readFromCache:%d vgNum:%d scanPlan:[%s]", scanIdx, pScan->readFromCache, vgNum, (char*)pScan->scanPlan);
    for (int32_t vgIdx = 0; vgIdx < vgNum; ++vgIdx) {
      mstsDebug("create_info: calcScanPlan[%d] vg[%d] - vgId:%d", scanIdx, vgIdx, *(int32_t*)taosArrayGet(pScan->vgList, vgIdx));
    }
  }

  for (int32_t urlIdx = 0; urlIdx < nNotifyUrl; ++urlIdx) {
    char* url = taosArrayGetP(pReq->pNotifyAddrUrls, urlIdx);
    mstsDebug("create_info: notifyUrl[%d] - %s", urlIdx, url);
  }

  for (int32_t colIdx = 0; colIdx < nOutCol; ++colIdx) {
    SFieldWithOptions* pCol = taosArrayGet(pReq->outCols, colIdx);
    mstsDebug("create_info: outCol[%d] - name:%s type:%d flags:%d bytes:%d compress:%u typeMod:%d", 
        colIdx, pCol->name, pCol->type, pCol->flags, pCol->bytes, pCol->compress, pCol->typeMod);
  }
}
648

649
/**
 * Log one task status entry at debug level.
 *
 * @param name     label for the kind of task being logged
 * @param streamId id of the owning stream (read by the logging macro)
 * @param pTask    task status to print
 * @param idx      index of the task within its list, printed after the label
 *
 * The task type and status are printed through the gStreamTaskTypeStr and
 * gStreamStatusStr tables, indexed directly by the stored values.
 */
void mstLogSStmTaskStatus(char* name, int64_t streamId, SStmTaskStatus* pTask, int32_t idx) {
  mstsDebug("%s[%d]: task %" PRIx64 " deployId:%d SID:%" PRId64 " nodeId:%d tidx:%d type:%s flags:%" PRIx64 " status:%s lastUpTs:%" PRId64, 
      name, idx,
      pTask->id.taskId, pTask->id.deployId, pTask->id.seriousId, pTask->id.nodeId, pTask->id.taskIdx,
      gStreamTaskTypeStr[pTask->type], pTask->flags,
      gStreamStatusStr[pTask->status], pTask->lastUpTs);
}
1,625✔
654

655
/**
 * Dump the runtime status of a stream and all of its tasks to the debug log.
 *
 * Does nothing unless DEBUG_DEBUG is set in stDebugFlag. Prints a summary
 * line with the task counts, then one line per trigger reader, trigger
 * O-reader, calc reader, the trigger task (if any) and every runner task of
 * each non-empty runner deploy.
 *
 * @param tips     prefix identifying the call site in the log
 * @param streamId id of the stream (read by the logging macros)
 * @param p        status to dump; NULL is reported and tolerated
 */
void mstLogSStmStatus(char* tips, int64_t streamId, SStmStatus* p) {
  if (0 == (stDebugFlag & DEBUG_DEBUG)) {
    return;
  }

  if (NULL == p) {
    mstsDebug("%s: stream status is NULL", tips);
    return;
  }

  int32_t nTrigReader = taosArrayGetSize(p->trigReaders);
  int32_t nTrigOReader = taosArrayGetSize(p->trigOReaders);
  int32_t nCalcReader = taosArrayGetSize(p->calcReaders);
  int32_t nTrigger = (p->triggerTask) ? 1 : 0;

  // Total number of runner tasks across all deploys.
  int32_t nRunner = 0;
  for (int32_t d = 0; d < p->runnerDeploys; ++d) {
    nRunner += taosArrayGetSize(p->runners[d]);
  }

  mstsDebug("%s: stream status", tips);
  mstsDebug("name:%s runnerNum:%d runnerDeploys:%d runnerReplica:%d lastActionTs:%" PRId64
           " trigReaders:%d trigOReaders:%d calcReaders:%d trigger:%d runners:%d",
      p->streamName, p->runnerNum, p->runnerDeploys, p->runnerReplica, p->lastActionTs,
      nTrigReader, nTrigOReader, nCalcReader, nTrigger, nRunner);

  SStmTaskStatus* pEntry = NULL;

  for (int32_t k = 0; k < nTrigReader; ++k) {
    pEntry = taosArrayGet(p->trigReaders, k);
    mstLogSStmTaskStatus("trigReader task", streamId, pEntry, k);
  }

  for (int32_t k = 0; k < nTrigOReader; ++k) {
    pEntry = taosArrayGet(p->trigOReaders, k);
    mstLogSStmTaskStatus("trigOReader task", streamId, pEntry, k);
  }

  for (int32_t k = 0; k < nCalcReader; ++k) {
    pEntry = taosArrayGet(p->calcReaders, k);
    mstLogSStmTaskStatus("calcReader task", streamId, pEntry, k);
  }

  if (nTrigger > 0) {
    mstLogSStmTaskStatus("trigger task", streamId, p->triggerTask, 0);
  }

  for (int32_t d = 0; d < p->runnerDeploys; ++d) {
    int32_t nInDeploy = taosArrayGetSize(p->runners[d]);
    if (nInDeploy > 0) {
      mstsDebug("the %dth deploy runners status", d);
      for (int32_t k = 0; k < nInDeploy; ++k) {
        pEntry = taosArrayGet(p->runners[d], k);
        mstLogSStmTaskStatus("runner task", streamId, pEntry, k);
      }
    }
  }
}
715

716
bool mstEventPassIsolation(int32_t num, int32_t event) {
5,387✔
717
  bool ret = ((mStreamMgmt.lastTs[event].ts + num * MST_ISOLATION_DURATION) <= mStreamMgmt.hCtx.currentTs);
5,387✔
718
  if (ret) {
5,387✔
719
    mstDebug("event %s passed %d isolation, last:%" PRId64 ", curr:%" PRId64, 
3,963✔
720
        gMndStreamEvent[event], num, mStreamMgmt.lastTs[event].ts, mStreamMgmt.hCtx.currentTs);
721
  }
722

723
  return ret;
5,387✔
724
}
725

726
bool mstEventHandledChkSet(int32_t event) {
3,963✔
727
  if (0 == atomic_val_compare_exchange_8((int8_t*)&mStreamMgmt.lastTs[event].handled, 0, 1)) {
3,963✔
728
    mstDebug("event %s set handled", gMndStreamEvent[event]);
354✔
729
    return true;
354✔
730
  }
731
  return false;
3,609✔
732
}
733

734
// Build the status string and the accompanying message for one stream.
//
// Both `status` and `msg` are written as varstr values through
// STR_WITH_MAXSIZE_TO_VARSTR, bounded by `statusSize` / `msgSize`.
// The checks are applied in this order:
//   1. stream management inactive or not in MND_STM_STATE_NORMAL -> UNDEPLOYED
//   2. pStream->userDropped set                                  -> DROPPING
//   3. pStream->userStopped set                                  -> STOPPED
//   4. stream missing from mStreamMgmt.streamMap                 -> UNDEPLOYED
//   5. runtime `stopped` flag equal to 1 or 4                    -> FAILED
//   6. trigger task present and RUNNING                          -> RUNNING
//   7. anything else                                             -> INIT
// Steps 4-7 run while holding mStreamMgmt.runtimeLock for reading.
//
// Always returns TSDB_CODE_SUCCESS.
int32_t mstGetStreamStatusStr(SStreamObj* pStream, char* status, int32_t statusSize, char* msg, int32_t msgSize) {
  char   tmpBuf[256];
  int8_t active = atomic_load_8(&mStreamMgmt.active);
  int8_t state = atomic_load_8(&mStreamMgmt.state);

  if (0 == active || MND_STM_STATE_NORMAL != state) {
    mstDebug("mnode streamMgmt not in active mode, active:%d, state:%d", active, state);
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_UNDEPLOYED], statusSize);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "Mnode may be unstable, try again later", msgSize);
    return TSDB_CODE_SUCCESS;
  }

  if (atomic_load_8(&pStream->userDropped)) {
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_DROPPING], statusSize);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
    return TSDB_CODE_SUCCESS;
  }

  if (atomic_load_8(&pStream->userStopped)) {
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_STOPPED], statusSize);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
    return TSDB_CODE_SUCCESS;
  }

  // Everything below inspects the runtime status entry, so hold the read lock.
  (void)mstWaitLock(&mStreamMgmt.runtimeLock, true);

  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &pStream->pCreate->streamId, sizeof(pStream->pCreate->streamId));
  if (NULL == pStatus) {
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_UNDEPLOYED], statusSize);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
  } else {
    int8_t stopped = atomic_load_8(&pStatus->stopped);

    if (1 == stopped) {
      // stopped == 1: report the last fatal error and the retry counter.
      STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_FAILED], statusSize);
      snprintf(tmpBuf, sizeof(tmpBuf), "Last error: %s, Failed times: %" PRId64, tstrerror(pStatus->fatalError), pStatus->fatalRetryTimes);
      STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
    } else if (4 == stopped) {
      // stopped == 4: reported as a stream grant expiration.
      STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_FAILED], statusSize);
      snprintf(tmpBuf, sizeof(tmpBuf), "Error: %s", tstrerror(TSDB_CODE_GRANT_STREAM_EXPIRED));
      STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
    } else if (pStatus->triggerTask && STREAM_STATUS_RUNNING == pStatus->triggerTask->status) {
      STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_RUNNING], statusSize);
      // Write the fixed prefix, then let the formatter append the timestamp.
      int32_t prefixLen = snprintf(tmpBuf, sizeof(tmpBuf), "Running start from: ");
      (void)formatTimestampLocal(tmpBuf + prefixLen, pStatus->triggerTask->runningStartTs, TSDB_TIME_PRECISION_MILLI);
      STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
    } else {
      STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_INIT], statusSize);
      snprintf(tmpBuf, sizeof(tmpBuf), "Current deploy times: %" PRId64, pStatus->deployTimes);
      STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
    }
  }

  taosRUnLockLatch(&mStreamMgmt.runtimeLock);

  return TSDB_CODE_SUCCESS;
}
802

803
// Fill row `numOfRows` of `pBlock` with the attributes of `pStream`.
//
// Columns are written in this fixed order, one per entry of
// pBlock->pDataBlock: stream name, db name, create time, stream id (hex),
// sql, status, main snode id, replica snode id (-1 when the main snode cannot
// be acquired), status message.
//
// Returns 0 on success. On the first failure the function stops, logs the
// failing source line (`lino`) together with the error and returns the code.
int32_t mstSetStreamAttrResBlock(SMnode *pMnode, SStreamObj* pStream, SSDataBlock* pBlock, int32_t numOfRows) {
  int32_t code = 0;
  int32_t cols = 0;
  int32_t lino = 0;

  // stream name
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetStableStr(pStream->name), sizeof(streamName));
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamName, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // db_name
  char streamDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamDB, mndGetDbStr(pStream->pCreate->streamDB), sizeof(streamDB));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamDB, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // create time
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pStream->createTime, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stream id, rendered as hexadecimal text
  char streamId2[19] = {0};
  char streamId[19 + VARSTR_HEADER_SIZE] = {0};
  snprintf(streamId2, sizeof(streamId2), "%" PRIx64, pStream->pCreate->streamId);
  STR_WITH_MAXSIZE_TO_VARSTR(streamId, streamId2, sizeof(streamId));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // sql
  char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(sql, pStream->pCreate->sql, sizeof(sql));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)sql, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // status (the message is produced here but written as the last column)
  char status[20 + VARSTR_HEADER_SIZE] = {0};
  char msg[TSDB_RESERVE_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
  code = mstGetStreamStatusStr(pStream, status, sizeof(status), msg, sizeof(msg));
  TSDB_CHECK_CODE(code, lino, _end);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)status, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // snodeLeader
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pStream->mainSnodeId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // snodeReplica: taken from the main snode object, -1 if it is not available
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  SSnodeObj* pSnode = mndAcquireSnode(pMnode, pStream->mainSnodeId);
  int32_t replicaSnodeId = pSnode ? pSnode->replicaId : -1;
  mndReleaseSnode(pMnode, pSnode);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&replicaSnodeId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // msg
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)msg, false);
  // Checked like every other column so that a failure here records its line.
  TSDB_CHECK_CODE(code, lino, _end);

_end:
  if (code) {
    mError("error happens when build stream attr result block, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
885

886

887
// Build the status string and the message for one stream task.
//
// `status` always receives gStreamStatusStr[pTask->status]. `msg` receives:
//   - "Last error: <text>" when the task is FAILED and carries an error code;
//   - a runtime summary (session counts, history progress, recalc counts) when
//     the task is a trigger task with a detail status attached;
//   - an empty string otherwise.
// Both outputs are varstr values bounded by `statusSize` / `msgSize`.
//
// Always returns TSDB_CODE_SUCCESS.
int32_t mstGetTaskStatusStr(SStmTaskStatus* pTask, char* status, int32_t statusSize, char* msg, int32_t msgSize) {
  char tmpBuf[256];

  STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[pTask->status], statusSize);
  if (STREAM_STATUS_FAILED == pTask->status && pTask->errCode) {
    snprintf(tmpBuf, sizeof(tmpBuf), "Last error: %s", tstrerror(pTask->errCode));
    STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
    return TSDB_CODE_SUCCESS;
  }

  // The detail status is only consulted for trigger tasks; the read lock is
  // taken as part of the condition and released on both branches below.
  if (STREAM_TRIGGER_TASK == pTask->type && mstWaitLock(&pTask->detailStatusLock, true)) {
    if (pTask->detailStatus) {
      SSTriggerRuntimeStatus* pTrigger = (SSTriggerRuntimeStatus*)pTask->detailStatus;
      snprintf(tmpBuf, sizeof(tmpBuf), "Current RT/HI/RE session num: %d/%d/%d, histroy progress:%d%%, total AUTO/USER recalc num: %d/%d",
          pTrigger->realtimeSessionNum, pTrigger->historySessionNum, pTrigger->recalcSessionNum, pTrigger->histroyProgress,
          pTrigger->autoRecalcNum, (int32_t)taosArrayGetSize(pTrigger->userRecalcs));
      taosRUnLockLatch(&pTask->detailStatusLock);

      // Hand the formatted summary to the caller; without this copy the text
      // built above would be discarded and `msg` left untouched.
      STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
      return TSDB_CODE_SUCCESS;
    }

    taosRUnLockLatch(&pTask->detailStatusLock);
  }

  STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);

  return TSDB_CODE_SUCCESS;
}
914

915
// Produce the "extra info" label of a task as a varstr in `extraStr`
// (bounded by `extraSize`):
//   reader task -> "trigReader" if flagged as trigger reader, else "calcReader"
//   runner task -> "topRunner" if flagged as top runner, else ""
//   other types -> ""
//
// Always returns TSDB_CODE_SUCCESS.
int32_t mstGetTaskExtraStr(SStmTaskStatus* pTask, char* extraStr, int32_t extraSize) {
  const char* label = "";

  if (STREAM_READER_TASK == pTask->type) {
    label = STREAM_IS_TRIGGER_READER(pTask->flags) ? "trigReader" : "calcReader";
  } else if (STREAM_RUNNER_TASK == pTask->type && STREAM_IS_TOP_RUNNER(pTask->flags)) {
    label = "topRunner";
  }

  STR_WITH_MAXSIZE_TO_VARSTR(extraStr, label, extraSize);
  return TSDB_CODE_SUCCESS;
}
937

938

939
// Fill row `numOfRows` of `pBlock` with the description of one stream task.
//
// Columns are written in this fixed order, one per entry of
// pBlock->pDataBlock: stream name, stream id (hex), task id (hex), type,
// serious id (hex), deploy id, node type, node id, task idx, status,
// start time (NULL when zero), last update (NULL when zero), extra info,
// status message.
//
// Returns 0 on success. On the first failure the function stops, logs the
// failing source line (`lino`) together with the error and returns the code.
int32_t mstSetStreamTaskResBlock(SStreamObj* pStream, SStmTaskStatus* pTask, SSDataBlock* pBlock, int32_t numOfRows) {
  int32_t code = 0;
  int32_t cols = 0;
  int32_t lino = 0;

  // stream_name
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetStableStr(pStream->name), sizeof(streamName));
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamName, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stream id; `idstr` is reused for every hex id column below
  char idstr[19 + VARSTR_HEADER_SIZE] = {0};
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pStream->pCreate->streamId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // task id
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pTask->id.taskId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // type
  char type[20 + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(type, (STREAM_READER_TASK == pTask->type) ? "Reader" : ((STREAM_TRIGGER_TASK == pTask->type) ? "Trigger" : "Runner"), sizeof(type));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)type, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // serious id
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pTask->id.seriousId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // deploy id
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->id.deployId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // node_type: reader tasks are reported as "vnode", all others as "snode"
  char nodeType[10 + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(nodeType, (STREAM_READER_TASK == pTask->type) ? "vnode" : "snode", sizeof(nodeType));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)nodeType, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // node id
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->id.nodeId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // task idx
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->id.taskIdx, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // status (the message is produced here but written as the last column)
  char status[20 + VARSTR_HEADER_SIZE] = {0};
  char msg[TSDB_RESERVE_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
  code = mstGetTaskStatusStr(pTask, status, sizeof(status), msg, sizeof(msg));
  TSDB_CHECK_CODE(code, lino, _end);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)status, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // start time: a zero timestamp is stored as NULL
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->runningStartTs, 0 == pTask->runningStartTs);
  TSDB_CHECK_CODE(code, lino, _end);

  // last update: a zero timestamp is stored as NULL
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->lastUpTs, 0 == pTask->lastUpTs);
  TSDB_CHECK_CODE(code, lino, _end);

  // extra info
  char extra[64 + VARSTR_HEADER_SIZE] = {0};
  code = mstGetTaskExtraStr(pTask, extra, sizeof(extra));
  TSDB_CHECK_CODE(code, lino, _end);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)extra, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // msg
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)msg, false);
  // Checked like every other column so that a failure here records its line.
  TSDB_CHECK_CODE(code, lino, _end);

_end:
  if (code) {
    mError("error happens when build stream task result block, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
1064

1065
// Count every task tracked by `pStatus`: the three reader arrays, the trigger
// task (counted once when present) and all MND_STREAM_RUNNER_DEPLOY_NUM runner
// arrays.
int32_t mstGetNumOfStreamTasks(SStmStatus* pStatus) {
  int32_t total = 0;

  total += taosArrayGetSize(pStatus->trigReaders);
  total += taosArrayGetSize(pStatus->trigOReaders);
  total += taosArrayGetSize(pStatus->calcReaders);
  if (pStatus->triggerTask) {
    total += 1;
  }

  for (int32_t deployIdx = 0; deployIdx < MND_STREAM_RUNNER_DEPLOY_NUM; ++deployIdx) {
    total += taosArrayGetSize(pStatus->runners[deployIdx]);
  }

  return total;
}
1073

1074
// Append one result row per task of `pStream` to `pBlock`.
//
// Runs under the read side of mStreamMgmt.runtimeLock. Streams that are not
// in mStreamMgmt.streamMap, or whose runtime `stopped` flag is non-zero, are
// skipped without adding rows. The block is grown when the stream's task count
// would exceed `rowsCapacity`. Rows are emitted in the order: trigger readers,
// trigOReaders, calc readers, the trigger task, then each runner deployment.
//
// *numOfRows is advanced only for rows that were written successfully, and
// pBlock->info.rows is updated to match. The returned code is the result of
// the last row written (or of the capacity growth when that fails).
int32_t mstSetStreamTasksResBlock(SStreamObj* pStream, SSDataBlock* pBlock, int32_t* numOfRows, int32_t rowsCapacity) {
  int32_t code = 0;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;

  (void)mstWaitLock(&mStreamMgmt.runtimeLock, true);

  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL == pStatus) {
    mstsDebug("stream not in streamMap, ignore it, dropped:%d, stopped:%d", atomic_load_8(&pStream->userDropped), atomic_load_8(&pStream->userStopped));
    goto _exit;
  }

  int8_t stopFlag = atomic_load_8(&pStatus->stopped);
  if (0 != stopFlag) {
    mstsDebug("stream stopped %d, ignore it", stopFlag);
    goto _exit;
  }

  // Make sure the block can hold every row this stream may add.
  int32_t needRows = *numOfRows + mstGetNumOfStreamTasks(pStatus);
  if (needRows > rowsCapacity) {
    code = blockDataEnsureCapacity(pBlock, needRows);
    if (code) {
      mstError("failed to prepare the result block buffer, rows:%d", needRows);
      TAOS_CHECK_EXIT(code);
    }
  }

  // Reader tasks: the three arrays are handled identically, in this order.
  SArray* readerLists[] = {pStatus->trigReaders, pStatus->trigOReaders, pStatus->calcReaders};
  int32_t listNum = (int32_t)(sizeof(readerLists) / sizeof(readerLists[0]));
  for (int32_t k = 0; k < listNum; ++k) {
    int32_t taskNum = taosArrayGetSize(readerLists[k]);
    for (int32_t i = 0; i < taskNum; ++i) {
      SStmTaskStatus* pReader = taosArrayGet(readerLists[k], i);

      code = mstSetStreamTaskResBlock(pStream, pReader, pBlock, *numOfRows);
      if (TSDB_CODE_SUCCESS == code) {
        (*numOfRows)++;
      }
    }
  }

  // Trigger task, if one is deployed.
  if (pStatus->triggerTask) {
    code = mstSetStreamTaskResBlock(pStream, pStatus->triggerTask, pBlock, *numOfRows);
    if (TSDB_CODE_SUCCESS == code) {
      (*numOfRows)++;
    }
  }

  // Runner tasks of every deployment slot.
  for (int32_t deployIdx = 0; deployIdx < MND_STREAM_RUNNER_DEPLOY_NUM; ++deployIdx) {
    int32_t runnerNum = taosArrayGetSize(pStatus->runners[deployIdx]);
    for (int32_t m = 0; m < runnerNum; ++m) {
      SStmTaskStatus* pRunner = taosArrayGet(pStatus->runners[deployIdx], m);

      code = mstSetStreamTaskResBlock(pStream, pRunner, pBlock, *numOfRows);
      if (TSDB_CODE_SUCCESS == code) {
        (*numOfRows)++;
      }
    }
  }

  pBlock->info.rows = *numOfRows;

_exit:

  taosRUnLockLatch(&mStreamMgmt.runtimeLock);

  if (code) {
    mError("error happens when build stream tasks result block, lino:%d, code:%s", lino, tstrerror(code));
  }

  return code;
}
1167

1168

1169
// Queue a user recalculation request covering `pRange` on `pStream`.
//
// A fresh recalc id is obtained from taosGetSystemUUIDU64 first. The request
// is then appended to pStream->userRecalcList while holding
// pStream->userRecalcLock for writing; when the list does not exist yet, a new
// array is built, filled and published with atomic_store_ptr.
//
// Returns 0 on success, otherwise the error code (also logged together with
// the failing line).
int32_t mstAppendNewRecalcRange(int64_t streamId, SStmStatus *pStream, STimeWindow* pRange) {
  int32_t code = 0;
  int32_t lino = 0;
  bool    locked = false;
  SArray* pNewList = NULL;  // owned here until it is published on pStream

  SStreamRecalcReq req = {.recalcId = 0, .start = pRange->skey, .end = pRange->ekey};
  TAOS_CHECK_EXIT(taosGetSystemUUIDU64(&req.recalcId));

  taosWLockLatch(&pStream->userRecalcLock);
  locked = true;

  if (NULL != pStream->userRecalcList) {
    TSDB_CHECK_NULL(taosArrayPush(pStream->userRecalcList, &req), code, lino, _exit, terrno);
  } else {
    pNewList = taosArrayInit(2, sizeof(SStreamRecalcReq));
    if (NULL == pNewList) {
      TAOS_CHECK_EXIT(terrno);
    }

    TSDB_CHECK_NULL(taosArrayPush(pNewList, &req), code, lino, _exit, terrno);

    // Ownership moves to pStream; clear the local so it is not destroyed below.
    atomic_store_ptr(&pStream->userRecalcList, pNewList);
    pNewList = NULL;
  }

  mstsInfo("stream recalc ID:%" PRIx64 " range:%" PRId64 " - %" PRId64 " added", req.recalcId, pRange->skey, pRange->ekey);

_exit:

  // Only non-NULL when the new list was created but never published.
  taosArrayDestroy(pNewList);

  if (locked) {
    taosWUnLockLatch(&pStream->userRecalcLock);
  }

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1211

1212

1213

1214
// Fill row `numOfRows` of `pBlock` with one recalculation progress entry.
//
// Columns are written in this fixed order, one per entry of
// pBlock->pDataBlock: stream name, stream id (hex), recalc id (hex), start,
// end, progress ("<n>%").
//
// Returns 0 on success. On the first failure the function stops, logs the
// failing source line (`lino`) together with the error and returns the code.
int32_t mstSetStreamRecalculateResBlock(SStreamObj* pStream, SSTriggerRecalcProgress* pProgress, SSDataBlock* pBlock, int32_t numOfRows) {
  int32_t code = 0;
  int32_t cols = 0;
  int32_t lino = 0;

  // stream_name
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetStableStr(pStream->name), sizeof(streamName));
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamName, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stream id
  // The varstr length is the payload length only; the header bytes must not
  // be counted, otherwise the value carries trailing bytes past the text.
  char idstr[19 + VARSTR_HEADER_SIZE] = {0};
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pStream->pCreate->streamId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // recalc id (reuses `idstr`)
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pProgress->recalcId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // start
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pProgress->start, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // end
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pProgress->end, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // progress
  char progress[20 + VARSTR_HEADER_SIZE] = {0};
  snprintf(&progress[VARSTR_HEADER_SIZE], sizeof(progress) - VARSTR_HEADER_SIZE, "%d%%", pProgress->progress);
  varDataSetLen(progress, strlen(&progress[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)progress, false);
  TSDB_CHECK_CODE(code, lino, _end);

_end:
  if (code) {
    mError("error happens when build stream recalculate result block, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
1272

1273

1274
// Append one result row per user recalculation of `pStream` to `pBlock`.
//
// Runs under the read side of mStreamMgmt.runtimeLock and, while the trigger
// task's detail status is being read, under the read side of its
// detailStatusLock. Nothing is added when the stream is not in
// mStreamMgmt.streamMap, its runtime `stopped` flag is non-zero, it has no
// trigger task, or the trigger task has no detail status. The block is grown
// when the new rows would exceed `rowsCapacity`.
//
// *numOfRows is advanced only for rows that were written successfully, and
// pBlock->info.rows is updated to match. The returned code is the result of
// the last row written (or of the capacity growth when that fails).
int32_t mstSetStreamRecalculatesResBlock(SStreamObj* pStream, SSDataBlock* pBlock, int32_t* numOfRows, int32_t rowsCapacity) {
  int32_t code = 0;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;

  (void)mstWaitLock(&mStreamMgmt.runtimeLock, true);

  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL == pStatus) {
    mstsDebug("stream not in streamMap, ignore it, dropped:%d, stopped:%d", atomic_load_8(&pStream->userDropped), atomic_load_8(&pStream->userStopped));
    goto _exit;
  }

  int8_t stopFlag = atomic_load_8(&pStatus->stopped);
  if (0 != stopFlag) {
    mstsDebug("stream stopped %d, ignore it", stopFlag);
    goto _exit;
  }

  if (NULL == pStatus->triggerTask) {
    mstsDebug("no trigger task now, deployTimes:%" PRId64 ", ignore it", pStatus->deployTimes);
    goto _exit;
  }

  // From here on the detail lock is held; every exit path below releases it.
  (void)mstWaitLock(&pStatus->triggerTask->detailStatusLock, true);
  if (NULL == pStatus->triggerTask->detailStatus) {
    mstsDebug("no trigger task now, deployTimes:%" PRId64 ", ignore it", pStatus->deployTimes);
    taosRUnLockLatch(&pStatus->triggerTask->detailStatusLock);
    goto _exit;
  }

  SSTriggerRuntimeStatus* pRuntime = (SSTriggerRuntimeStatus*)pStatus->triggerTask->detailStatus;
  int32_t recalcNum = taosArrayGetSize(pRuntime->userRecalcs);
  int32_t needRows = *numOfRows + recalcNum;

  if (needRows > rowsCapacity) {
    code = blockDataEnsureCapacity(pBlock, needRows);
    if (code) {
      mstError("failed to prepare the result block buffer, rows:%d", needRows);
      taosRUnLockLatch(&pStatus->triggerTask->detailStatusLock);
      TAOS_CHECK_EXIT(code);
    }
  }

  int32_t idx = 0;
  while (idx < recalcNum) {
    SSTriggerRecalcProgress* pEntry = taosArrayGet(pRuntime->userRecalcs, idx);

    code = mstSetStreamRecalculateResBlock(pStream, pEntry, pBlock, *numOfRows);
    if (TSDB_CODE_SUCCESS == code) {
      (*numOfRows)++;
    }
    ++idx;
  }

  taosRUnLockLatch(&pStatus->triggerTask->detailStatusLock);

  pBlock->info.rows = *numOfRows;

_exit:

  taosRUnLockLatch(&mStreamMgmt.runtimeLock);

  if (code) {
    mError("error happens when build stream recalculates result block, lino:%d, code:%s", lino, tstrerror(code));
  }

  return code;
}
1340

1341

1342

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc