• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4781

09 Oct 2025 07:30AM UTC coverage: 57.663% (-0.1%) from 57.808%
#4781

push

travis-ci

web-flow
Merge pull request #33183 from taosdata/fix/sort-release-note

fix: replace DocCardList with SortedDocCardList in release notes

136563 of 302745 branches covered (45.11%)

Branch coverage included in aggregate %.

207769 of 294403 relevant lines covered (70.57%)

4216536.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.47
/source/dnode/mnode/impl/src/mndStreamUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndDb.h"
17
#include "mndStb.h"
18
#include "mndStream.h"
19
#include "mndTrans.h"
20
#include "mndVgroup.h"
21
#include "taoserror.h"
22
#include "tmisce.h"
23
#include "mndSnode.h"
24

25
// Blocks until pLock has been acquired, then returns true (there is no failure path).
//   readLock == true : retries taosRTryLockLatch(), calling taosMsleep(1) between attempts
//   readLock == false: takes the write lock with taosWWaitLockLatch()
bool mstWaitLock(SRWLatch* pLock, bool readLock) {
  if (!readLock) {
    taosWWaitLockLatch(pLock);
    return true;
  }

  // a non-zero result from taosRTryLockLatch() means the latch was not obtained yet
  for (;;) {
    if (!taosRTryLockLatch(pLock)) {
      break;
    }
    taosMsleep(1);
  }

  return true;
}
38

39
void mstDestroySStmVgStreamStatus(void* p) { 
872✔
40
  SStmVgStreamStatus* pStatus = (SStmVgStreamStatus*)p;
872✔
41
  taosArrayDestroy(pStatus->trigReaders); 
872✔
42
  taosArrayDestroy(pStatus->calcReaders); 
872✔
43
}
872✔
44

45
void mstDestroySStmSnodeStreamStatus(void* p) { 
442✔
46
  SStmSnodeStreamStatus* pStatus = (SStmSnodeStreamStatus*)p;
442✔
47
  for (int32_t i = 0; i < MND_STREAM_RUNNER_DEPLOY_NUM; ++i) {
1,768✔
48
    taosArrayDestroy(pStatus->runners[i]);
1,326✔
49
    pStatus->runners[i] = NULL;
1,326✔
50
  }
51
}
442✔
52

53

54
// Releases the streamTasks hash of a vgroup status and clears the field so the
// stale handle cannot be reused. pVgStatus itself is left allocated.
void mstDestroyVgroupStatus(SStmVgroupStatus* pVgStatus) {
  taosHashCleanup(pVgStatus->streamTasks);

  pVgStatus->streamTasks = NULL;
}
×
58

59
void mstDestroySStmTaskToDeployExt(void* param) {
2,822✔
60
  SStmTaskToDeployExt* pExt = (SStmTaskToDeployExt*)param;
2,822✔
61
  if (pExt->deployed) {
2,822!
62
    return;
2,822✔
63
  }
64
  
65
  switch (pExt->deploy.task.type) {
×
66
    case STREAM_TRIGGER_TASK:
×
67
      taosArrayDestroy(pExt->deploy.msg.trigger.readerList);
×
68
      pExt->deploy.msg.trigger.readerList = NULL;
×
69
      taosArrayDestroy(pExt->deploy.msg.trigger.runnerList);
×
70
      pExt->deploy.msg.trigger.runnerList = NULL;
×
71
      break;
×
72
    case STREAM_RUNNER_TASK:
×
73
      taosMemoryFreeClear(pExt->deploy.msg.runner.pPlan);
×
74
      break;
×
75
    default:  
×
76
      break;;
×
77
  }
78
}
79

80
void mstDestroyScanAddrList(void* param) {
686✔
81
  if (NULL == param) {
686!
82
    return;
×
83
  }
84
  SArray* pList = *(SArray**)param;
686✔
85
  taosArrayDestroy(pList);
686✔
86
}
87

88
void mstDestroySStmSnodeTasksDeploy(void* param) {
105✔
89
  SStmSnodeTasksDeploy* pSnode = (SStmSnodeTasksDeploy*)param;
105✔
90
  taosArrayDestroyEx(pSnode->triggerList, mstDestroySStmTaskToDeployExt);
105✔
91
  taosArrayDestroyEx(pSnode->runnerList, mstDestroySStmTaskToDeployExt);
105✔
92
}
105✔
93

94
void mstDestroySStmVgTasksToDeploy(void* param) {
293✔
95
  SStmVgTasksToDeploy* pVg = (SStmVgTasksToDeploy*)param;
293✔
96
  taosArrayDestroyEx(pVg->taskList, mstDestroySStmTaskToDeployExt);
293✔
97
}
293✔
98

99
void mstDestroySStmSnodeStatus(void* param) {
96✔
100
  SStmSnodeStatus* pSnode = (SStmSnodeStatus*)param;
96✔
101
  taosHashCleanup(pSnode->streamTasks);
96✔
102
}
96✔
103

104
void mstDestroySStmVgroupStatus(void* param) {
220✔
105
  SStmVgroupStatus* pVg = (SStmVgroupStatus*)param;
220✔
106
  taosHashCleanup(pVg->streamTasks);
220✔
107
}
220✔
108

109
// Drops all task bookkeeping held by pStatus while leaving pStatus itself usable:
// the trigger reader arrays, the calc reader list, the trigger task (including
// its detailStatus buffer) and every runner array are released and their fields
// cleared.
void mstResetSStmStatus(SStmStatus* pStatus) {
  taosArrayDestroy(pStatus->trigReaders);
  pStatus->trigReaders = NULL;

  taosArrayDestroy(pStatus->trigOReaders);
  pStatus->trigOReaders = NULL;

  // the field is overwritten with whatever tdListFree() returns
  pStatus->calcReaders = tdListFree(pStatus->calcReaders);

  if (NULL != pStatus->triggerTask) {
    SRWLatch* pDetailLock = &pStatus->triggerTask->detailStatusLock;

    // detailStatus is freed while holding the write side of its latch
    (void)mstWaitLock(pDetailLock, false);
    taosMemoryFreeClear(pStatus->triggerTask->detailStatus);
    taosWUnLockLatch(pDetailLock);
  }
  taosMemoryFreeClear(pStatus->triggerTask);

  int32_t slot = 0;
  while (slot < MND_STREAM_RUNNER_DEPLOY_NUM) {
    taosArrayDestroy(pStatus->runners[slot]);
    pStatus->runners[slot] = NULL;
    ++slot;
  }
}
412✔
126

127
void mstDestroySStmStatus(void* param) {
390✔
128
  SStmStatus* pStatus = (SStmStatus*)param;
390✔
129
  taosMemoryFreeClear(pStatus->streamName);
390!
130

131
  mstResetSStmStatus(pStatus);
390✔
132

133
  taosWLockLatch(&pStatus->userRecalcLock);
390✔
134
  taosArrayDestroy(pStatus->userRecalcList);
390✔
135
  taosWUnLockLatch(&pStatus->userRecalcLock);
390✔
136

137
  tFreeSCMCreateStreamReq(pStatus->pCreate);
390✔
138
  taosMemoryFreeClear(pStatus->pCreate);  
390!
139
}
390✔
140

141
void mstDestroySStmAction(void* param) {
631✔
142
  SStmAction* pAction = (SStmAction*)param;
631✔
143

144
  taosArrayDestroy(pAction->undeploy.taskList);
631✔
145
  taosArrayDestroy(pAction->recalc.recalcList);
631✔
146
}
631✔
147

148
// Forgets the reader, trigger and runner task references of pDeploy.
// Nothing is freed here; the three fields are only set to NULL.
void mstClearSStmStreamDeploy(SStmStreamDeploy* pDeploy) {
  pDeploy->runnerTasks = NULL;
  pDeploy->triggerTask = NULL;
  pDeploy->readerTasks = NULL;
}
551✔
153

154
// Scans the SDB_STREAM table for the stream whose create request carries streamId.
//   found     -> *dropped mirrors the stream's userDropped flag
//   not found -> *dropped = true
// Always returns TSDB_CODE_SUCCESS.
int32_t mstIsStreamDropped(SMnode *pMnode, int64_t streamId, bool* dropped) {
  SSdb   *pSdb = pMnode->pSdb;
  void   *pIter = NULL;
  
  while (1) {
    SStreamObj *pStream = NULL;
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) break;

    if (pStream->pCreate->streamId == streamId) {
      // userDropped is read with atomic_load_8() elsewhere in this file; do the same here
      *dropped = (0 != atomic_load_8(&pStream->userDropped));
      sdbRelease(pSdb, pStream);
      // the iteration stops early, so the fetch cursor is cancelled explicitly
      sdbCancelFetch(pSdb, pIter);
      mstsDebug("stream found, dropped:%d", *dropped);
      return TSDB_CODE_SUCCESS;
    }
    
    sdbRelease(pSdb, pStream);
  }

  // no stream with this id is left in the sdb
  *dropped = true;

  return TSDB_CODE_SUCCESS;
}
178

179
// Context handed to mstChkSetDbInUse() through sdbTraverse() by mstCheckDbInUse().
typedef struct SStmCheckDbInUseCtx {
  bool* dbStream;      // out: set to true when a stream's trigger DB or one of its calc DBs equals the checked db
  bool* vtableStream;  // out: set to true when a visited stream uses vtableCalc or a virtual trigger table
  bool  ignoreCurrDb;  // in: when true, streams whose streamDB equals the checked db are skipped
} SStmCheckDbInUseCtx;
184

185
// sdbTraverse() callback used by mstCheckDbInUse().
//   pObj : the visited SStreamObj
//   p1   : db full name being checked (C string)
//   p2   : SStmCheckDbInUseCtx receiving the results
//   p3   : unused
// Returns false as soon as the db is found to be a trigger or calc DB of a
// stream (after setting *dbStream), true in every other case.
// NOTE(review): presumably a false return stops the traversal - confirm against sdbTraverse.
static bool mstChkSetDbInUse(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SStreamObj *pStream = pObj;

  // streams already dropped by the user are not considered
  if (atomic_load_8(&pStream->userDropped)) {
    return true;
  }

  SStmCheckDbInUseCtx *pCtx = p2;
  SCMCreateStreamReq  *pReq = pStream->pCreate;

  if (pCtx->ignoreCurrDb && 0 == strcmp(pReq->streamDB, p1)) {
    return true;
  }

  if (pReq->triggerDB && 0 == strcmp(pReq->triggerDB, p1)) {
    *pCtx->dbStream = true;
    return false;
  }

  int32_t numCalcDb = taosArrayGetSize(pReq->calcDB);
  for (int32_t idx = 0; idx < numCalcDb; ++idx) {
    char *name = taosArrayGetP(pReq->calcDB, idx);
    if (0 != strcmp(name, p1)) {
      continue;
    }
    *pCtx->dbStream = true;
    return false;
  }

  // only records the flag; the return value is true either way
  if (pReq->vtableCalc || STREAM_IS_VIRTUAL_TABLE(pReq->triggerTblType, pReq->flags)) {
    *pCtx->vtableStream = true;
  }

  return true;
}
217

218
// Reports how existing streams relate to database dbFName by visiting every
// SDB_STREAM object with mstChkSetDbInUse(). *dbStream / *vtableStream are only
// ever set to true by that callback; when there are no streams they are left untouched.
void mstCheckDbInUse(SMnode *pMnode, char *dbFName, bool *dbStream, bool *vtableStream, bool ignoreCurrDb) {
  int32_t numStreams = sdbGetSize(pMnode->pSdb, SDB_STREAM);

  if (numStreams > 0) {
    SStmCheckDbInUseCtx chkCtx = {dbStream, vtableStream, ignoreCurrDb};
    sdbTraverse(pMnode->pSdb, SDB_STREAM, mstChkSetDbInUse, dbFName, &chkCtx, NULL);
  }
}
227

228
static void mstShowStreamStatus(char *dst, int8_t status, int32_t bufLen) {
×
229
  if (status == STREAM_STATUS_INIT) {
×
230
    tstrncpy(dst, "init", bufLen);
×
231
  } else if (status == STREAM_STATUS_RUNNING) {
×
232
    tstrncpy(dst, "running", bufLen);
×
233
  } else if (status == STREAM_STATUS_STOPPED) {
×
234
    tstrncpy(dst, "stopped", bufLen);
×
235
  } else if (status == STREAM_STATUS_FAILED) {
×
236
    tstrncpy(dst, "failed", bufLen);
×
237
  }
238
}
×
239

240
// Returns TSDB_CODE_SUCCESS when the SDB_SNODE table holds at least one object,
// TSDB_CODE_SNODE_NOT_DEPLOYED otherwise. Only the first entry is fetched; it is
// released and the fetch cursor cancelled immediately.
int32_t mstCheckSnodeExists(SMnode *pMnode) {
  SSdb      *pSdb = pMnode->pSdb;
  SSnodeObj *pSnode = NULL;
  void      *pIter = sdbFetch(pSdb, SDB_SNODE, NULL, (void **)&pSnode);

  if (NULL == pIter) {
    return TSDB_CODE_SNODE_NOT_DEPLOYED;
  }

  sdbRelease(pSdb, pSnode);
  sdbCancelFetch(pSdb, pIter);
  return TSDB_CODE_SUCCESS;
}
258

259
// Copies the identity and state fields of a task status message into pTask and
// stamps pTask->lastUpTs with the timestamp carried by the group context.
void mstSetTaskStatusFromMsg(SStmGrpCtx* pCtx, SStmTaskStatus* pTask, SStmTaskStatusMsg* pMsg) {
  // state reported by the message
  pTask->type = pMsg->type;
  pTask->flags = pMsg->flags;
  pTask->status = pMsg->status;

  // task identity
  pTask->id.taskId = pMsg->taskId;
  pTask->id.deployId = pMsg->deployId;
  pTask->id.seriousId = pMsg->seriousId;
  pTask->id.nodeId = pMsg->nodeId;
  pTask->id.taskIdx = pMsg->taskIdx;

  pTask->lastUpTs = pCtx->currTs;
}
×
271

272
// Pops one node from the action queue.
// Returns false (and leaves *param untouched) when qRemainNum is 0; otherwise
// stores the dequeued node in *param, decrements qRemainNum and returns true.
//
// The queue keeps a placeholder at its head: the old head is freed and its
// successor becomes both the new head and the node handed to the caller. The
// returned node therefore stays linked in the queue and is freed by the next
// successful dequeue.
// NOTE(review): unlike mndStreamActionEnqueue() this path does not take
// pQueue->lock - presumably there is a single consumer; confirm.
bool mndStreamActionDequeue(SStmActionQ* pQueue, SStmQNode **param) {
  // plain emptiness test (the original spelled this as a `while` that always returned)
  if (0 == atomic_load_64(&pQueue->qRemainNum)) {
    return false;
  }

  SStmQNode *orig = pQueue->head;
  SStmQNode *node = orig->next;

  pQueue->head = node;
  *param = node;

  taosMemoryFreeClear(orig);

  (void)atomic_sub_fetch_64(&pQueue->qRemainNum, 1);

  return true;
}
290

291
// Appends param to the tail of the action queue under the queue's write latch,
// then bumps qRemainNum. The counter is raised only after the node is linked.
void mndStreamActionEnqueue(SStmActionQ* pQueue, SStmQNode* param) {
  taosWLockLatch(&pQueue->lock);
  {
    SStmQNode* pOldTail = pQueue->tail;
    pOldTail->next = param;
    pQueue->tail = param;
  }
  taosWUnLockLatch(&pQueue->lock);

  (void)atomic_add_fetch_64(&pQueue->qRemainNum, 1);
}
424✔
299

300
char* mstGetStreamActionString(int32_t action) {
233✔
301
  switch (action) {
233!
302
    case STREAM_ACT_DEPLOY:
233✔
303
      return "DEPLOY";
233✔
304
    case STREAM_ACT_UNDEPLOY:
×
305
      return "UNDEPLOY";
×
306
    case STREAM_ACT_START:
×
307
      return "START";
×
308
    case STREAM_ACT_UPDATE_TRIGGER:
×
309
      return "UPDATE TRIGGER";
×
310
    case STREAM_ACT_RECALC:
×
311
      return "USER RECALC";
×
312
    default:
×
313
      break;
×
314
  }
315

316
  return "UNKNOWN";
×
317
}
318

319
// Allocates a stream-level queue node describing `action` for the given stream
// and appends it to actionQ.
//   param      : action parameter stored in the node; if the node cannot be
//                allocated, param is freed here and nothing is enqueued
//   userAction : copied into the node as-is
void mstPostStreamAction(SStmActionQ*       actionQ, int64_t streamId, char* streamName, void* param, bool userAction, int32_t action) {
  SStmQNode *pNode = taosMemoryMalloc(sizeof(SStmQNode));
  if (NULL == pNode) {
    taosMemoryFreeClear(param);
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, __LINE__, tstrerror(terrno));
    return;
  }

  pNode->type = action;
  pNode->streamAct = true;
  pNode->action.stream.streamId = streamId;
  // Bounded copy: the name is stored inside the freshly allocated node, so an
  // over-long streamName must be truncated instead of overrunning the field.
  // (assumes streamName is an embedded char array in the node, which the
  // previous unbounded copy into it also required)
  tstrncpy(pNode->action.stream.streamName, streamName, sizeof(pNode->action.stream.streamName));
  pNode->action.stream.userAction = userAction;
  pNode->action.stream.actionParam = param;
  
  pNode->next = NULL;

  mndStreamActionEnqueue(actionQ, pNode);

  mstsDebug("stream action %s posted enqueue", mstGetStreamActionString(action));
}
340

341
// Allocates a task-level queue node holding a copy of *pAction and appends it
// to actionQ. On allocation failure the error is logged and nothing is enqueued.
void mstPostTaskAction(SStmActionQ*        actionQ, SStmTaskAction* pAction, int32_t action) {
  SStmQNode *pEntry = taosMemoryMalloc(sizeof(SStmQNode));

  if (NULL != pEntry) {
    pEntry->type = action;
    pEntry->streamAct = false;
    pEntry->action.task = *pAction;
    pEntry->next = NULL;

    mndStreamActionEnqueue(actionQ, pEntry);
    return;
  }

  // declared only so that a streamId is in scope for the mstsError() call below
  int64_t streamId = pAction->streamId;
  mstsError("%s failed at line %d, error:%s", __FUNCTION__, __LINE__, tstrerror(terrno));
}
357

358
// Destroys a db -> vgroups hash: first the vgArray owned by every SDBVgHashInfo
// entry, then the hash itself.
void mstDestroyDbVgroupsHash(SSHashObj *pDbVgs) {
  int32_t cursor = 0;

  for (void* pEntry = tSimpleHashIterate(pDbVgs, NULL, &cursor); NULL != pEntry;
       pEntry = tSimpleHashIterate(pDbVgs, pEntry, &cursor)) {
    SDBVgHashInfo* pInfo = (SDBVgHashInfo*)pEntry;
    taosArrayDestroy(pInfo->vgArray);
  }

  tSimpleHashCleanup(pDbVgs);
}
133✔
369

370

371
// Builds a hash keyed by db name (terminating NUL included in the key) whose
// values are SDBVgHashInfo records: the db's hash settings plus an array with
// one SVGroupHashInfo {vgId, hashBegin, hashEnd} per vgroup found in SDB_VGROUP.
//
// On success *ppRes receives the hash and the caller owns it (see
// mstDestroyDbVgroupsHash). On failure an error code is returned, *ppRes is not
// written, and everything built so far is released.
int32_t mstBuildDBVgroupsMap(SMnode* pMnode, SSHashObj** ppRes) {
  void*   pIter = NULL;
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SArray* pTarget = NULL;
  SArray* pNew = NULL;  // array created for a new db that is not yet owned by the hash
  SDbObj* pDb = NULL;
  SDBVgHashInfo dbInfo = {0}, *pDbInfo = NULL;
  SVgObj* pVgroup = NULL;

  SSHashObj* pDbVgroup = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
  TSDB_CHECK_NULL(pDbVgroup, code, lino, _exit, terrno);

  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    pDbInfo = (SDBVgHashInfo*)tSimpleHashGet(pDbVgroup, pVgroup->dbName, strlen(pVgroup->dbName) + 1);
    if (NULL == pDbInfo) {
      // first vgroup seen for this db: prepare a new record
      pNew = taosArrayInit(20, sizeof(SVGroupHashInfo));
      if (NULL == pNew) {
        sdbRelease(pMnode->pSdb, pVgroup);
        sdbCancelFetch(pMnode->pSdb, pIter);
        pVgroup = NULL;
        TSDB_CHECK_NULL(pNew, code, lino, _exit, terrno);
      }
      
      pDb = mndAcquireDb(pMnode, pVgroup->dbName);
      if (NULL == pDb) {
        sdbRelease(pMnode->pSdb, pVgroup);
        sdbCancelFetch(pMnode->pSdb, pIter);      
        pVgroup = NULL;
        TSDB_CHECK_NULL(pDb, code, lino, _exit, terrno);
      }
      dbInfo.vgSorted = false;
      dbInfo.hashMethod = pDb->cfg.hashMethod;
      dbInfo.hashPrefix = pDb->cfg.hashPrefix;
      dbInfo.hashSuffix = pDb->cfg.hashSuffix;
      dbInfo.vgArray = pNew;
      
      mndReleaseDb(pMnode, pDb);

      pTarget = pNew;
    } else {
      pTarget = pDbInfo->vgArray;
    }

    SVGroupHashInfo vgInfo = {.vgId = pVgroup->vgId, .hashBegin = pVgroup->hashBegin, .hashEnd = pVgroup->hashEnd};
    if (NULL == taosArrayPush(pTarget, &vgInfo)) {
      sdbRelease(pMnode->pSdb, pVgroup);
      sdbCancelFetch(pMnode->pSdb, pIter);      
      pVgroup = NULL;
      TSDB_CHECK_NULL(NULL, code, lino, _exit, terrno);
    }

    if (NULL == pDbInfo) {
      code = tSimpleHashPut(pDbVgroup, pVgroup->dbName, strlen(pVgroup->dbName) + 1, &dbInfo, sizeof(dbInfo));
      if (code) {
        sdbRelease(pMnode->pSdb, pVgroup);
        sdbCancelFetch(pMnode->pSdb, pIter);      
        pVgroup = NULL;
        TAOS_CHECK_EXIT(code);
      }
      // the hash now owns the array through the stored record
      pNew = NULL;
    }

    sdbRelease(pMnode->pSdb, pVgroup);
    pVgroup = NULL;
  }

  *ppRes = pDbVgroup;
  
_exit:

  // non-NULL only when a new array was created but never stored in the hash
  taosArrayDestroy(pNew);

  if (code) {
    // Nobody else holds the partially built map on failure, so release it here;
    // pNew is never part of the hash at this point, hence no double free.
    if (NULL != pDbVgroup) {
      mstDestroyDbVgroupsHash(pDbVgroup);
    }
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
455

456
int mstDbVgInfoComp(const void* lp, const void* rp) {
95✔
457
  SVGroupHashInfo* pLeft = (SVGroupHashInfo*)lp;
95✔
458
  SVGroupHashInfo* pRight = (SVGroupHashInfo*)rp;
95✔
459
  if (pLeft->hashBegin < pRight->hashBegin) {
95!
460
    return -1;
95✔
461
  } else if (pLeft->hashBegin > pRight->hashBegin) {
×
462
    return 1;
×
463
  }
464

465
  return 0;
×
466
}
467

468
int32_t mstTableHashValueComp(void const* lp, void const* rp) {
446✔
469
  uint32_t*    key = (uint32_t*)lp;
446✔
470
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
446✔
471

472
  if (*key < pVg->hashBegin) {
446!
473
    return -1;
×
474
  } else if (*key > pVg->hashEnd) {
446✔
475
    return 1;
107✔
476
  }
477

478
  return 0;
339✔
479
}
480

481

482
// Resolves the vgroup id responsible for table tbName of database dbFName.
//   pDbVgroups : map produced by mstBuildDBVgroupsMap()
//   vgId       : out, written only on success
// Returns 0 on success, TSDB_CODE_MND_DB_NOT_EXIST when the db has no entry in
// the map, TSDB_CODE_MND_STREAM_INTERNAL_ERROR when no vgroup range covers the
// table's hash value.
// Side effect: the db's vgArray is sorted by hashBegin on first use and flagged vgSorted.
int32_t mstGetTableVgId(SSHashObj* pDbVgroups, char* dbFName, char *tbName, int32_t* vgId) {
  int32_t code = 0;
  int32_t lino = 0;
  // vgArray elements are SVGroupHashInfo records (see mstBuildDBVgroupsMap),
  // so the search result is read as that type
  SVGroupHashInfo* vgInfo = NULL;
  char             tbFullName[TSDB_TABLE_FNAME_LEN];

  SDBVgHashInfo* dbInfo = (SDBVgHashInfo*)tSimpleHashGet(pDbVgroups, dbFName, strlen(dbFName) + 1);
  if (NULL == dbInfo) {
    mstError("db %s does not exist", dbFName);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_DB_NOT_EXIST);
  }
  
  // the hash is computed over "<dbFName>.<tbName>"
  (void)snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbFName, tbName);
  uint32_t hashValue = taosGetTbHashVal(tbFullName, (uint32_t)strlen(tbFullName), dbInfo->hashMethod,
                                        dbInfo->hashPrefix, dbInfo->hashSuffix);

  // the array must be ordered by hashBegin before it is searched
  if (!dbInfo->vgSorted) {
    taosArraySort(dbInfo->vgArray, mstDbVgInfoComp);
    dbInfo->vgSorted = true;
  }

  vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, mstTableHashValueComp, TD_EQ);
  if (NULL == vgInfo) {
    mstError("no hash range found for hash value [%u], dbFName:%s, numOfVgId:%d", hashValue, dbFName,
             (int32_t)taosArrayGetSize(dbInfo->vgArray));
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  *vgId = vgInfo->vgId;

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
520

521

522
void mstLogSStreamObj(char* tips, SStreamObj* p) {
395✔
523
  if (!(stDebugFlag & DEBUG_DEBUG)) {
395✔
524
    return;
180✔
525
  }
526
  
527
  if (NULL == p) {
215!
528
    mstDebug("%s: stream is NULL", tips);
×
529
    return;
×
530
  }
531

532
  mstDebug("%s: stream obj", tips);
215!
533
  mstDebug("name:%s mainSnodeId:%d userDropped:%d userStopped:%d createTime:%" PRId64 " updateTime:%" PRId64,
215!
534
      p->name, p->mainSnodeId, p->userDropped, p->userStopped, p->createTime, p->updateTime);
535

536
  SCMCreateStreamReq* q = p->pCreate;
215✔
537
  if (NULL == q) {
215!
538
    mstDebug("stream pCreate is NULL");
×
539
    return;
×
540
  }
541

542
  int64_t streamId = q->streamId;
215✔
543
  int32_t calcDBNum = taosArrayGetSize(q->calcDB);
215✔
544
  int32_t calcScanNum = taosArrayGetSize(q->calcScanPlanList);
215✔
545
  int32_t notifyUrlNum = taosArrayGetSize(q->pNotifyAddrUrls);
215✔
546
  int32_t outColNum = taosArrayGetSize(q->outCols);
215✔
547
  int32_t outTagNum = taosArrayGetSize(q->outTags);
215✔
548
  int32_t forceOutColNum = taosArrayGetSize(q->forceOutCols);
215✔
549

550
  mstsDebugL("create_info: name:%s sql:%s streamDB:%s triggerDB:%s outDB:%s calcDBNum:%d triggerTblName:%s outTblName:%s "
215!
551
      "igExists:%d triggerType:%d igDisorder:%d deleteReCalc:%d deleteOutTbl:%d fillHistory:%d fillHistroyFirst:%d "
552
      "calcNotifyOnly:%d lowLatencyCalc:%d igNoDataTrigger:%d notifyUrlNum:%d notifyEventTypes:%d addOptions:%d notifyHistory:%d "
553
      "outColsNum:%d outTagsNum:%d maxDelay:%" PRId64 " fillHistoryStartTs:%" PRId64 " watermark:%" PRId64 " expiredTime:%" PRId64 " "
554
      "triggerTblType:%d triggerTblUid:%" PRIx64 " triggerTblSuid:%" PRIx64 " vtableCalc:%d outTblType:%d outStbExists:%d outStbUid:%" PRIu64 " outStbSversion:%d "
555
      "eventTypes:0x%" PRIx64 " flags:0x%" PRIx64 " tsmaId:0x%" PRIx64 " placeHolderBitmap:0x%" PRIx64 " calcTsSlotId:%d triTsSlotId:%d "
556
      "triggerTblVgId:%d outTblVgId:%d calcScanPlanNum:%d forceOutCols:%d",
557
      q->name, q->sql, q->streamDB, q->triggerDB, q->outDB, calcDBNum, q->triggerTblName, q->outTblName,
558
      q->igExists, q->triggerType, q->igDisorder, q->deleteReCalc, q->deleteOutTbl, q->fillHistory, q->fillHistoryFirst,
559
      q->calcNotifyOnly, q->lowLatencyCalc, q->igNoDataTrigger, notifyUrlNum, q->notifyEventTypes, q->addOptions, q->notifyHistory,
560
      outColNum, outTagNum, q->maxDelay, q->fillHistoryStartTime, q->watermark, q->expiredTime,
561
      q->triggerTblType, q->triggerTblUid, q->triggerTblSuid, q->vtableCalc, q->outTblType, q->outStbExists, q->outStbUid, q->outStbSversion,
562
      q->eventTypes, q->flags, q->tsmaId, q->placeHolderBitmap, q->calcTsSlotId, q->triTsSlotId,
563
      q->triggerTblVgId, q->outTblVgId, calcScanNum, forceOutColNum);
564

565
  switch (q->triggerType) {
215!
566
    case WINDOW_TYPE_INTERVAL: {
125✔
567
      SSlidingTrigger* t = &q->trigger.sliding;
125✔
568
      mstsDebug("sliding trigger options, intervalUnit:%d, slidingUnit:%d, offsetUnit:%d, soffsetUnit:%d, precision:%d, interval:%" PRId64 ", offset:%" PRId64 ", sliding:%" PRId64 ", soffset:%" PRId64, 
125!
569
          t->intervalUnit, t->slidingUnit, t->offsetUnit, t->soffsetUnit, t->precision, t->interval, t->offset, t->sliding, t->soffset);
570
      break;
125✔
571
    }  
572
    case WINDOW_TYPE_SESSION: {
8✔
573
      SSessionTrigger* t = &q->trigger.session;
8✔
574
      mstsDebug("session trigger options, slotId:%d, sessionVal:%" PRId64, t->slotId, t->sessionVal);
8!
575
      break;
8✔
576
    }
577
    case WINDOW_TYPE_STATE: {
39✔
578
      SStateWinTrigger* t = &q->trigger.stateWin;
39✔
579
      mstsDebug("state trigger options, slotId:%d, expr:%s, extend:%d, trueForDuration:%" PRId64, t->slotId, (char *)t->expr, t->extend, t->trueForDuration);
39!
580
      break;
39✔
581
    }
582
    case WINDOW_TYPE_EVENT:{
21✔
583
      SEventTrigger* t = &q->trigger.event;
21✔
584
      mstsDebug("event trigger options, startCond:%s, endCond:%s, trueForDuration:%" PRId64, (char*)t->startCond, (char*)t->endCond, t->trueForDuration);
21!
585
      break;
21✔
586
    }
587
    case WINDOW_TYPE_COUNT: {
13✔
588
      SCountTrigger* t = &q->trigger.count;
13✔
589
      mstsDebug("count trigger options, countVal:%" PRId64 ", sliding:%" PRId64 ", condCols:%s", t->countVal, t->sliding, (char*)t->condCols);
13!
590
      break;
13✔
591
    }
592
    case WINDOW_TYPE_PERIOD: {
9✔
593
      SPeriodTrigger* t = &q->trigger.period;
9✔
594
      mstsDebug("period trigger options, periodUnit:%d, offsetUnit:%d, precision:%d, period:%" PRId64 ", offset:%" PRId64, 
9!
595
          t->periodUnit, t->offsetUnit, t->precision, t->period, t->offset);
596
      break;
9✔
597
    }
598
    default:
×
599
      mstsDebug("unknown triggerType:%d", q->triggerType);
×
600
      break;
×
601
  }
602

603
  mstsDebugL("create_info: triggerCols:[%s]", (char*)q->triggerCols);
215!
604

605
  mstsDebugL("create_info: partitionCols:[%s]", (char*)q->partitionCols);
215!
606

607
  mstsDebugL("create_info: triggerScanPlan:[%s]", (char*)q->triggerScanPlan);
215!
608

609
  mstsDebugL("create_info: calcPlan:[%s]", (char*)q->calcPlan);
215!
610

611
  mstsDebugL("create_info: subTblNameExpr:[%s]", (char*)q->subTblNameExpr);
215!
612

613
  mstsDebugL("create_info: tagValueExpr:[%s]", (char*)q->tagValueExpr);
215!
614

615

616
  for (int32_t i = 0; i < calcDBNum; ++i) {
429✔
617
    char* dbName = taosArrayGetP(q->calcDB, i);
214✔
618
    mstsDebug("create_info: calcDB[%d] - %s", i, dbName);
214!
619
  }
620

621
  for (int32_t i = 0; i < calcScanNum; ++i) {
501✔
622
    SStreamCalcScan* pScan = taosArrayGet(q->calcScanPlanList, i);
286✔
623
    int32_t vgNum = taosArrayGetSize(pScan->vgList);
286✔
624
    mstsDebugL("create_info: calcScanPlan[%d] - readFromCache:%d vgNum:%d scanPlan:[%s]", i, pScan->readFromCache, vgNum, (char*)pScan->scanPlan);
286!
625
    for (int32_t v = 0; v < vgNum; ++v) {
572✔
626
      mstsDebug("create_info: calcScanPlan[%d] vg[%d] - vgId:%d", i, v, *(int32_t*)taosArrayGet(pScan->vgList, v));
286!
627
    }
628
  }
629

630
  for (int32_t i = 0; i < notifyUrlNum; ++i) {
240✔
631
    char* url = taosArrayGetP(q->pNotifyAddrUrls, i);
25✔
632
    mstsDebug("create_info: notifyUrl[%d] - %s", i, url);
25!
633
  }
634

635
  for (int32_t i = 0; i < outColNum; ++i) {
1,026✔
636
    SFieldWithOptions* o = taosArrayGet(q->outCols, i);
811✔
637
    mstsDebug("create_info: outCol[%d] - name:%s type:%d flags:%d bytes:%d compress:%u typeMod:%d", 
811!
638
        i, o->name, o->type, o->flags, o->bytes, o->compress, o->typeMod);
639
  }
640
      
641
}
642

643
// Emits one debug line describing pTask: its id fields, type, flags, status and
// last update timestamp. `name` and `idx` form the "<name>[<idx>]" prefix;
// streamId is consumed implicitly by the mstsDebug() macro.
void mstLogSStmTaskStatus(char* name, int64_t streamId, SStmTaskStatus* pTask, int32_t idx) {
  mstsDebug("%s[%d]: task %" PRIx64 " deployId:%d SID:%" PRId64 " nodeId:%d tidx:%d type:%s flags:%" PRIx64 " status:%s lastUpTs:%" PRId64, 
      name, idx,
      pTask->id.taskId, pTask->id.deployId, pTask->id.seriousId, pTask->id.nodeId, pTask->id.taskIdx,
      gStreamTaskTypeStr[pTask->type],
      pTask->flags,
      gStreamStatusStr[pTask->status],
      pTask->lastUpTs);
}
1,517✔
648

649
// Debug dump of a stream's runtime status, prefixed with `tips`: a summary line
// followed by one line per trigger reader, calc reader, trigger task and runner
// task. Does nothing unless DEBUG_DEBUG is set in stDebugFlag; a NULL status is
// reported and ends the dump.
void mstLogSStmStatus(char* tips, int64_t streamId, SStmStatus* p) {
  if (!(stDebugFlag & DEBUG_DEBUG)) {
    return;
  }
  
  if (NULL == p) {
    mstsDebug("%s: stream status is NULL", tips);
    return;
  }

  int32_t trigReaderNum = taosArrayGetSize(p->trigReaders);
  int32_t trigOReaderNum = taosArrayGetSize(p->trigOReaders);
  // calcReaders may be NULL: mstResetSStmStatus() overwrites it with the result
  // of tdListFree(). Treat a missing list as empty instead of dereferencing it.
  int32_t calcReaderNum = (NULL != p->calcReaders) ? MST_LIST_SIZE(p->calcReaders) : 0;
  int32_t triggerNum = p->triggerTask ? 1 : 0;
  int32_t runnerNum = 0;

  for (int32_t i = 0; i < p->runnerDeploys; ++i) {
    runnerNum += taosArrayGetSize(p->runners[i]);
  }

  mstsDebug("%s: stream status", tips);
  mstsDebug("name:%s runnerNum:%d runnerDeploys:%d runnerReplica:%d lastActionTs:%" PRId64
           " trigReaders:%d trigOReaders:%d calcReaders:%d trigger:%d runners:%d",
      p->streamName, p->runnerNum, p->runnerDeploys, p->runnerReplica, p->lastActionTs,
      trigReaderNum, trigOReaderNum, calcReaderNum, triggerNum, runnerNum);

  SStmTaskStatus* pTask = NULL;
  for (int32_t i = 0; i < trigReaderNum; ++i) {
    pTask = taosArrayGet(p->trigReaders, i);
    mstLogSStmTaskStatus("trigReader task", streamId, pTask, i);
  }

  for (int32_t i = 0; i < trigOReaderNum; ++i) {
    pTask = taosArrayGet(p->trigOReaders, i);
    mstLogSStmTaskStatus("trigOReader task", streamId, pTask, i);
  }

  // only touch the list head when there is a list with entries to walk
  SListNode* pNode = (calcReaderNum > 0) ? listHead(p->calcReaders) : NULL;
  for (int32_t i = 0; i < calcReaderNum && NULL != pNode; ++i) {
    pTask = (SStmTaskStatus*)pNode->data;
    mstLogSStmTaskStatus("calcReader task", streamId, pTask, i);
    pNode = TD_DLIST_NODE_NEXT(pNode);
  }

  if (triggerNum > 0) {
    mstLogSStmTaskStatus("trigger task", streamId, p->triggerTask, 0);
  }

  for (int32_t i = 0; i < p->runnerDeploys; ++i) {
    int32_t num = taosArrayGetSize(p->runners[i]);
    if (num <= 0) {
      continue;
    }
    
    mstsDebug("the %dth deploy runners status", i);
    for (int32_t m = 0; m < num; ++m) {
      pTask = taosArrayGet(p->runners[i], m);
      mstLogSStmTaskStatus("runner task", streamId, pTask, m);
    }
  }
}
711

712
bool mstEventPassIsolation(int32_t num, int32_t event) {
9,287✔
713
  bool ret = ((mStreamMgmt.lastTs[event].ts + num * MST_SHORT_ISOLATION_DURATION) <= mStreamMgmt.hCtx.currentTs);
9,287✔
714
  if (ret) {
9,287✔
715
    mstDebug("event %s passed %d isolation, last:%" PRId64 ", curr:%" PRId64, 
6,850✔
716
        gMndStreamEvent[event], num, mStreamMgmt.lastTs[event].ts, mStreamMgmt.hCtx.currentTs);
717
  }
718

719
  return ret;
9,287✔
720
}
721

722
bool mstEventHandledChkSet(int32_t event) {
6,850✔
723
  if (0 == atomic_val_compare_exchange_8((int8_t*)&mStreamMgmt.lastTs[event].handled, 0, 1)) {
6,850✔
724
    mstDebug("event %s set handled", gMndStreamEvent[event]);
420✔
725
    return true;
420✔
726
  }
727
  return false;
6,430✔
728
}
729

730
/*
 * Produce the status string and the accompanying message of a stream.
 *
 * Both outputs are written through STR_WITH_MAXSIZE_TO_VARSTR, bounded by
 * statusSize / msgSize respectively.
 *
 * Decision order:
 *   1. stream management inactive or not in MND_STM_STATE_NORMAL -> UNDEPLOYED + hint
 *   2. pStream->userDropped set                                  -> DROPPING
 *   3. pStream->userStopped set                                  -> STOPPED
 *   4. (under the runtime read lock) stream missing in streamMap -> UNDEPLOYED
 *   5. runtime `stopped` == 1                                    -> FAILED + last error / retry count
 *   6. runtime `stopped` == 4                                    -> FAILED + grant-expired error
 *   7. trigger task exists and is RUNNING                        -> RUNNING + start time
 *   8. otherwise                                                 -> INIT + deploy count
 *
 * @return always TSDB_CODE_SUCCESS
 */
int32_t mstGetStreamStatusStr(SStreamObj* pStream, char* status, int32_t statusSize, char* msg, int32_t msgSize) {
  char tmpBuf[256];

  int8_t active = atomic_load_8(&mStreamMgmt.active);
  int8_t state = atomic_load_8(&mStreamMgmt.state);
  if (0 == active || MND_STM_STATE_NORMAL != state) {
    mstDebug("mnode streamMgmt not in active mode, active:%d, state:%d", active, state);
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_UNDEPLOYED], statusSize);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "Mnode may be unstable, try again later", msgSize);
    return TSDB_CODE_SUCCESS;
  }

  if (atomic_load_8(&pStream->userDropped)) {
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_DROPPING], statusSize);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
    return TSDB_CODE_SUCCESS;
  }

  if (atomic_load_8(&pStream->userStopped)) {
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_STOPPED], statusSize);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
    return TSDB_CODE_SUCCESS;
  }

  // Everything below inspects the runtime status entry, so hold the read lock
  // until the single unlock at the bottom.
  (void)mstWaitLock(&mStreamMgmt.runtimeLock, true);

  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &pStream->pCreate->streamId, sizeof(pStream->pCreate->streamId));
  if (NULL == pStatus) {
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_UNDEPLOYED], statusSize);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
  } else {
    int8_t stopped = atomic_load_8(&pStatus->stopped);

    if (1 == stopped) {
      STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_FAILED], statusSize);
      snprintf(tmpBuf, sizeof(tmpBuf), "Last error: %s, Failed times: %" PRId64, tstrerror(pStatus->fatalError), pStatus->fatalRetryTimes);
      STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
    } else if (4 == stopped) {
      STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_FAILED], statusSize);
      snprintf(tmpBuf, sizeof(tmpBuf), "Error: %s", tstrerror(TSDB_CODE_GRANT_STREAM_EXPIRED));
      STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
    } else if (pStatus->triggerTask && STREAM_STATUS_RUNNING == pStatus->triggerTask->status) {
      STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_RUNNING], statusSize);
      // Write the fixed prefix, then let the formatter append the timestamp right behind it.
      int32_t prefixLen = snprintf(tmpBuf, sizeof(tmpBuf), "Running start from: ");
      (void)formatTimestampLocal(&tmpBuf[prefixLen], pStatus->triggerTask->runningStartTs, TSDB_TIME_PRECISION_MILLI);
      STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
    } else {
      STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_INIT], statusSize);
      snprintf(tmpBuf, sizeof(tmpBuf), "Current deploy times: %" PRId64, pStatus->deployTimes);
      STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
    }
  }

  taosRUnLockLatch(&mStreamMgmt.runtimeLock);

  return TSDB_CODE_SUCCESS;
}
798

799
/*
 * Fill row `numOfRows` of a stream-attribute result block.
 *
 * Columns are written in order: stream name, db name, create time, stream id
 * (hex), sql, status, main snode id, replica snode id, message.
 *
 * @param pMnode     mnode handle, used to look up the main snode
 * @param pStream    stream whose attributes are reported
 * @param pBlock     destination block; its column array must hold all columns above
 * @param numOfRows  row index to write
 * @return 0 on success; otherwise the first error hit (a missing column yields terrno)
 */
int32_t mstSetStreamAttrResBlock(SMnode *pMnode, SStreamObj* pStream, SSDataBlock* pBlock, int32_t numOfRows) {
  int32_t code = 0;
  int32_t cols = 0;
  int32_t lino = 0;

  // stream_name
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetStableStr(pStream->name), sizeof(streamName));
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamName, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // db_name
  char streamDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamDB, mndGetDbStr(pStream->pCreate->streamDB), sizeof(streamDB));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamDB, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // create time
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pStream->createTime, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stream id, rendered as hexadecimal text
  char streamId2[19] = {0};
  char streamId[19 + VARSTR_HEADER_SIZE] = {0};
  snprintf(streamId2, sizeof(streamId2), "%" PRIx64, pStream->pCreate->streamId);
  STR_WITH_MAXSIZE_TO_VARSTR(streamId, streamId2, sizeof(streamId));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // sql
  char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(sql, pStream->pCreate->sql, sizeof(sql));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)sql, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // status (msg is computed here as well and written as the last column)
  char status[20 + VARSTR_HEADER_SIZE] = {0};
  char msg[TSDB_RESERVE_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
  code = mstGetStreamStatusStr(pStream, status, sizeof(status), msg, sizeof(msg));
  TSDB_CHECK_CODE(code, lino, _end);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)status, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // snodeLeader
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pStream->mainSnodeId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // snodeReplica: -1 when the main snode cannot be acquired
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  SSnodeObj* pSnode = mndAcquireSnode(pMnode, pStream->mainSnodeId);
  int32_t replicaSnodeId = pSnode ? pSnode->replicaId : -1;
  mndReleaseSnode(pMnode, pSnode);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&replicaSnodeId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // msg
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)msg, false);
  // Checked like every other column so a failure here is logged with a real line number.
  TSDB_CHECK_CODE(code, lino, _end);

_end:
  if (code) {
    mError("error happens when build stream attr result block, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
881

882

883
/*
 * Produce the status string and the accompanying message of one stream task.
 *
 * `status` always receives gStreamStatusStr[pTask->status]. `msg` receives:
 *   - "Last error: ..." for a FAILED task with a non-zero errCode;
 *   - a runtime summary (session counts, history progress, recalc counts) for
 *     a trigger task that has a detailStatus attached;
 *   - an empty string otherwise.
 *
 * Both outputs are written through STR_WITH_MAXSIZE_TO_VARSTR, bounded by
 * statusSize / msgSize respectively.
 *
 * @return always TSDB_CODE_SUCCESS
 */
int32_t mstGetTaskStatusStr(SStmTaskStatus* pTask, char* status, int32_t statusSize, char* msg, int32_t msgSize) {
  char tmpBuf[256];
  
  STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[pTask->status], statusSize);
  if (STREAM_STATUS_FAILED == pTask->status && pTask->errCode) {
    snprintf(tmpBuf, sizeof(tmpBuf), "Last error: %s", tstrerror(pTask->errCode));
    STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
    return TSDB_CODE_SUCCESS;
  }

  if (STREAM_TRIGGER_TASK == pTask->type && mstWaitLock(&pTask->detailStatusLock, true)) {
    bool hasDetail = (NULL != pTask->detailStatus);
    if (hasDetail) {
      // Format while the read lock is held: the fields belong to detailStatus.
      SSTriggerRuntimeStatus* pTrigger = (SSTriggerRuntimeStatus*)pTask->detailStatus;
      snprintf(tmpBuf, sizeof(tmpBuf), "Current RT/HI/RE session num: %d/%d/%d, histroy progress:%d%%, total AUTO/USER recalc num: %d/%d", 
          pTrigger->realtimeSessionNum, pTrigger->historySessionNum, pTrigger->recalcSessionNum, pTrigger->histroyProgress,
          pTrigger->autoRecalcNum, (int32_t)taosArrayGetSize(pTrigger->userRecalcs));
    }

    taosRUnLockLatch(&pTask->detailStatusLock);

    if (hasDetail) {
      // Previously the formatted summary was dropped here and msg was left
      // untouched; hand it to the caller like the other branches do.
      STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
      return TSDB_CODE_SUCCESS;
    }
  }
  
  STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
  
  return TSDB_CODE_SUCCESS;
}
910

911
/*
 * Produce the "extra info" label of a stream task.
 *
 *   reader task  -> "trigReader" when STREAM_IS_TRIGGER_READER(flags), else "calcReader"
 *   runner task  -> "topRunner" when STREAM_IS_TOP_RUNNER(flags), else ""
 *   anything else -> ""
 *
 * The label is written through STR_WITH_MAXSIZE_TO_VARSTR, bounded by extraSize.
 *
 * @return always TSDB_CODE_SUCCESS
 */
int32_t mstGetTaskExtraStr(SStmTaskStatus* pTask, char* extraStr, int32_t extraSize) {
  const char* label = "";

  if (STREAM_READER_TASK == pTask->type) {
    label = STREAM_IS_TRIGGER_READER(pTask->flags) ? "trigReader" : "calcReader";
  } else if (STREAM_RUNNER_TASK == pTask->type) {
    if (STREAM_IS_TOP_RUNNER(pTask->flags)) {
      label = "topRunner";
    }
  }

  STR_WITH_MAXSIZE_TO_VARSTR(extraStr, label, extraSize);
  return TSDB_CODE_SUCCESS;
}
933

934

935
/*
 * Fill row `numOfRows` of a stream-task result block for one task.
 *
 * Columns are written in order: stream name, stream id (hex), task id (hex),
 * type, serious id (hex), deploy id, node type, node id, task idx, status,
 * start time (NULL when runningStartTs is 0), last update (NULL when lastUpTs
 * is 0), extra info, message.
 *
 * @param pStream    stream owning the task
 * @param pTask      task to report
 * @param pBlock     destination block; its column array must hold all columns above
 * @param numOfRows  row index to write
 * @return 0 on success; otherwise the first error hit (a missing column yields terrno)
 */
int32_t mstSetStreamTaskResBlock(SStreamObj* pStream, SStmTaskStatus* pTask, SSDataBlock* pBlock, int32_t numOfRows) {
  int32_t code = 0;
  int32_t cols = 0;
  int32_t lino = 0;

  // stream_name
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetStableStr(pStream->name), sizeof(streamName));
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamName, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stream id; idstr is reused for every hexadecimal id column below
  char idstr[19 + VARSTR_HEADER_SIZE] = {0};
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pStream->pCreate->streamId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // task id
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pTask->id.taskId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // type
  const char* typeStr = "Runner";
  if (STREAM_READER_TASK == pTask->type) {
    typeStr = "Reader";
  } else if (STREAM_TRIGGER_TASK == pTask->type) {
    typeStr = "Trigger";
  }
  char type[20 + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(type, typeStr, sizeof(type));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)type, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // serious id
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pTask->id.seriousId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // deploy id
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->id.deployId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // node_type: reader tasks are reported as "vnode", all others as "snode"
  char nodeType[10 + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(nodeType, (STREAM_READER_TASK == pTask->type) ? "vnode" : "snode", sizeof(nodeType));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)nodeType, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // node id
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->id.nodeId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // task idx
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->id.taskIdx, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // status (msg is computed here as well and written as the last column)
  char status[20 + VARSTR_HEADER_SIZE] = {0};
  char msg[TSDB_RESERVE_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
  code = mstGetTaskStatusStr(pTask, status, sizeof(status), msg, sizeof(msg));
  TSDB_CHECK_CODE(code, lino, _end);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)status, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // start time: a zero timestamp is stored as NULL
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->runningStartTs, 0 == pTask->runningStartTs);
  TSDB_CHECK_CODE(code, lino, _end);

  // last update: a zero timestamp is stored as NULL
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->lastUpTs, 0 == pTask->lastUpTs);
  TSDB_CHECK_CODE(code, lino, _end);

  // extra info
  char extra[64 + VARSTR_HEADER_SIZE] = {0};
  code = mstGetTaskExtraStr(pTask, extra, sizeof(extra));
  TSDB_CHECK_CODE(code, lino, _end);
  
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)extra, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // msg
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)msg, false);
  // Checked like every other column so a failure here is logged with a real line number.
  TSDB_CHECK_CODE(code, lino, _end);

_end:
  if (code) {
    mError("error happens when build stream task result block, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
1060

1061
/*
 * Count every task tracked in a stream's runtime status: trigger readers,
 * trigOReaders, calc readers, the trigger task (if any) and the runners of
 * all MND_STREAM_RUNNER_DEPLOY_NUM deployments.
 *
 * @param pStatus  runtime status of the stream
 * @return total number of tasks
 */
int32_t mstGetNumOfStreamTasks(SStmStatus* pStatus) {
  int32_t total = pStatus->triggerTask ? 1 : 0;

  total += taosArrayGetSize(pStatus->trigReaders);
  total += taosArrayGetSize(pStatus->trigOReaders);
  total += MST_LIST_SIZE(pStatus->calcReaders);

  for (int32_t deploy = 0; deploy < MND_STREAM_RUNNER_DEPLOY_NUM; ++deploy) {
    total += taosArrayGetSize(pStatus->runners[deploy]);
  }

  return total;
}
1069

1070
/*
 * Append one result row per task of `pStream` to `pBlock`.
 *
 * The runtime read lock is held for the whole call. Streams that are missing
 * from streamMap, or whose runtime `stopped` flag is non-zero, contribute no
 * rows. The block is grown when *numOfRows plus the task count exceeds
 * rowsCapacity. Rows are emitted in this order: trigger readers,
 * trigOReaders, calc readers, trigger task, runners of each deployment.
 *
 * A row that fails to build is not counted and the remaining tasks are still
 * processed; the returned code is the one of the last row attempted (or the
 * capacity error).
 *
 * @param pStream       stream to report
 * @param pBlock        destination block
 * @param numOfRows     in/out: rows already in the block, advanced per row written
 * @param rowsCapacity  capacity the caller has already reserved in pBlock
 * @return 0 on success, otherwise an error code as described above
 */
int32_t mstSetStreamTasksResBlock(SStreamObj* pStream, SSDataBlock* pBlock, int32_t* numOfRows, int32_t rowsCapacity) {
  int32_t code = 0;
  int32_t lino = 0;
  // NOTE(review): the msts* log macros presumably pick up this local by name - keep it called streamId.
  int64_t streamId = pStream->pCreate->streamId;

  (void)mstWaitLock(&mStreamMgmt.runtimeLock, true);

  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL == pStatus) {
    mstsDebug("stream not in streamMap, ignore it, dropped:%d, stopped:%d", atomic_load_8(&pStream->userDropped), atomic_load_8(&pStream->userStopped));
    goto _exit;
  }

  int8_t stopped = atomic_load_8(&pStatus->stopped);
  if (stopped) {
    mstsDebug("stream stopped %d, ignore it", stopped);
    goto _exit;
  }
  
  int32_t count = mstGetNumOfStreamTasks(pStatus);

  if (*numOfRows + count > rowsCapacity) {
    code = blockDataEnsureCapacity(pBlock, *numOfRows + count);
    if (code) {
      mstError("failed to prepare the result block buffer, rows:%d", *numOfRows + count);
      TAOS_CHECK_EXIT(code);
    }
  }

  SStmTaskStatus* pTask = NULL;

  // trigger readers
  int32_t trigReaderNum = taosArrayGetSize(pStatus->trigReaders);
  for (int32_t i = 0; i < trigReaderNum; ++i) {
    pTask = taosArrayGet(pStatus->trigReaders, i);
  
    code = mstSetStreamTaskResBlock(pStream, pTask, pBlock, *numOfRows);
    if (code == TSDB_CODE_SUCCESS) {
      (*numOfRows)++;
    }
  }

  // trigOReaders
  int32_t trigOReaderNum = taosArrayGetSize(pStatus->trigOReaders);
  for (int32_t i = 0; i < trigOReaderNum; ++i) {
    pTask = taosArrayGet(pStatus->trigOReaders, i);
  
    code = mstSetStreamTaskResBlock(pStream, pTask, pBlock, *numOfRows);
    if (code == TSDB_CODE_SUCCESS) {
      (*numOfRows)++;
    }
  }

  // calc readers (linked list)
  int32_t calcReaderNum = MST_LIST_SIZE(pStatus->calcReaders);
  // Only touch the list when it reports elements: MST_LIST_SIZE appears to
  // tolerate a NULL list, whereas listHead would dereference it.
  SListNode* pNode = (calcReaderNum > 0) ? listHead(pStatus->calcReaders) : NULL;
  for (int32_t i = 0; i < calcReaderNum && NULL != pNode; ++i) {
    pTask = (SStmTaskStatus*)pNode->data;
  
    code = mstSetStreamTaskResBlock(pStream, pTask, pBlock, *numOfRows);
    if (code == TSDB_CODE_SUCCESS) {
      (*numOfRows)++;
    }
    pNode = TD_DLIST_NODE_NEXT(pNode);
  }

  // trigger task
  if (pStatus->triggerTask) {
    code = mstSetStreamTaskResBlock(pStream, pStatus->triggerTask, pBlock, *numOfRows);
    if (code == TSDB_CODE_SUCCESS) {
      (*numOfRows)++;
    }
  }

  // runners of every deployment
  for (int32_t i = 0; i < MND_STREAM_RUNNER_DEPLOY_NUM; ++i) {
    int32_t runnerNum = taosArrayGetSize(pStatus->runners[i]);
    for (int32_t m = 0; m < runnerNum; ++m) {
      pTask = taosArrayGet(pStatus->runners[i], m);
    
      code = mstSetStreamTaskResBlock(pStream, pTask, pBlock, *numOfRows);
      if (code == TSDB_CODE_SUCCESS) {
        (*numOfRows)++;
      }
    }
  }
  
  pBlock->info.rows = *numOfRows;

_exit:
  
  taosRUnLockLatch(&mStreamMgmt.runtimeLock);

  if (code) {
    mError("error happens when build stream tasks result block, lino:%d, code:%s", lino, tstrerror(code));
  }
  
  return code;
}
1165

1166

1167
/*
 * Queue a user recalculation request covering `pRange` on a stream.
 *
 * A fresh recalc id is obtained from taosGetSystemUUIDU64, then, under the
 * stream's userRecalcLock write lock, the request is pushed onto
 * pStream->userRecalcList. The list is created and published on first use.
 *
 * @param streamId  id of the stream (used for logging)
 * @param pStream   runtime status that owns the recalc list
 * @param pRange    time window to recalculate (skey .. ekey)
 * @return 0 on success, otherwise the error from id generation or allocation
 */
int32_t mstAppendNewRecalcRange(int64_t streamId, SStmStatus *pStream, STimeWindow* pRange) {
  int32_t          code = 0;
  int32_t          lino = 0;
  bool             locked = false;
  SArray*          userRecalcList = NULL;  // only non-NULL while a newly created list is still unpublished
  SStreamRecalcReq req = {0};

  req.start = pRange->skey;
  req.end = pRange->ekey;
  TAOS_CHECK_EXIT(taosGetSystemUUIDU64(&req.recalcId));
  
  taosWLockLatch(&pStream->userRecalcLock);
  locked = true;
  
  if (NULL != pStream->userRecalcList) {
    TSDB_CHECK_NULL(taosArrayPush(pStream->userRecalcList, &req), code, lino, _exit, terrno);
  } else {
    userRecalcList = taosArrayInit(2, sizeof(SStreamRecalcReq));
    if (NULL == userRecalcList) {
      TAOS_CHECK_EXIT(terrno);
    }

    TSDB_CHECK_NULL(taosArrayPush(userRecalcList, &req), code, lino, _exit, terrno);

    // Publish the list; clearing the local keeps the cleanup below from freeing it.
    atomic_store_ptr(&pStream->userRecalcList, userRecalcList);
    userRecalcList = NULL;
  }
  
  mstsInfo("stream recalc ID:%" PRIx64 " range:%" PRId64 " - %" PRId64 " added", req.recalcId, pRange->skey, pRange->ekey);

_exit:

  taosArrayDestroy(userRecalcList);

  if (locked) {
    taosWUnLockLatch(&pStream->userRecalcLock);
  }
  
  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }
  
  return code;
}
1209

1210

1211

1212
/*
 * Fill row `numOfRows` of a stream-recalculate result block for one recalc
 * progress entry.
 *
 * Columns are written in order: stream name, stream id (hex), recalc id
 * (hex), start, end, progress ("<n>%").
 *
 * @param pStream    stream owning the recalculation
 * @param pProgress  progress entry to report
 * @param pBlock     destination block; its column array must hold all columns above
 * @param numOfRows  row index to write
 * @return 0 on success; otherwise the first error hit (a missing column yields terrno)
 */
int32_t mstSetStreamRecalculateResBlock(SStreamObj* pStream, SSTriggerRecalcProgress* pProgress, SSDataBlock* pBlock, int32_t numOfRows) {
  int32_t code = 0;
  int32_t cols = 0;
  int32_t lino = 0;

  // stream_name
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetStableStr(pStream->name), sizeof(streamName));
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamName, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stream id
  // The length stored in the header is the payload length only: adding
  // VARSTR_HEADER_SIZE would make the value extend past the text just
  // written and expose the terminator / stale bytes of the reused buffer.
  char idstr[19 + VARSTR_HEADER_SIZE] = {0};
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pStream->pCreate->streamId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // recalc id
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pProgress->recalcId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // start
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pProgress->start, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // end
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pProgress->end, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // progress
  char progress[20 + VARSTR_HEADER_SIZE] = {0};
  snprintf(&progress[VARSTR_HEADER_SIZE], sizeof(progress) - VARSTR_HEADER_SIZE, "%d%%", pProgress->progress);
  varDataSetLen(progress, strlen(&progress[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)progress, false);
  TSDB_CHECK_CODE(code, lino, _end);

_end:
  if (code) {
    mError("error happens when build stream recalculate result block, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
1270

1271

1272
/*
 * Append one result row per user recalculation of `pStream` to `pBlock`.
 *
 * The runtime read lock is held for the whole call, and the trigger task's
 * detailStatusLock read lock while its detail status is inspected. Nothing is
 * written when the stream is missing from streamMap, its runtime `stopped`
 * flag is non-zero, it has no trigger task, or the trigger task has no
 * detail status. The block is grown when *numOfRows plus the recalc count
 * exceeds rowsCapacity.
 *
 * A row that fails to build is not counted and the remaining entries are
 * still processed; the returned code is the one of the last row attempted
 * (or the capacity error).
 *
 * @param pStream       stream to report
 * @param pBlock        destination block
 * @param numOfRows     in/out: rows already in the block, advanced per row written
 * @param rowsCapacity  capacity the caller has already reserved in pBlock
 * @return 0 on success, otherwise an error code as described above
 */
int32_t mstSetStreamRecalculatesResBlock(SStreamObj* pStream, SSDataBlock* pBlock, int32_t* numOfRows, int32_t rowsCapacity) {
  int32_t code = 0;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;

  (void)mstWaitLock(&mStreamMgmt.runtimeLock, true);

  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL == pStatus) {
    mstsDebug("stream not in streamMap, ignore it, dropped:%d, stopped:%d", atomic_load_8(&pStream->userDropped), atomic_load_8(&pStream->userStopped));
    goto _exit;
  }

  int8_t stopped = atomic_load_8(&pStatus->stopped);
  if (stopped) {
    mstsDebug("stream stopped %d, ignore it", stopped);
    goto _exit;
  }

  if (NULL == pStatus->triggerTask) {
    mstsDebug("no trigger task now, deployTimes:%" PRId64 ", ignore it", pStatus->deployTimes);
    goto _exit;
  }

  // From here on the detail lock must be released on every path before _exit.
  (void)mstWaitLock(&pStatus->triggerTask->detailStatusLock, true);

  SSTriggerRuntimeStatus* pTrigger = (SSTriggerRuntimeStatus*)pStatus->triggerTask->detailStatus;
  if (NULL == pTrigger) {
    mstsDebug("no trigger task now, deployTimes:%" PRId64 ", ignore it", pStatus->deployTimes);
    taosRUnLockLatch(&pStatus->triggerTask->detailStatusLock);
    goto _exit;
  }

  int32_t count = taosArrayGetSize(pTrigger->userRecalcs);
  int32_t needed = *numOfRows + count;

  if (needed > rowsCapacity) {
    code = blockDataEnsureCapacity(pBlock, needed);
    if (code) {
      mstError("failed to prepare the result block buffer, rows:%d", *numOfRows + count);
      taosRUnLockLatch(&pStatus->triggerTask->detailStatusLock);
      TAOS_CHECK_EXIT(code);
    }
  }

  for (int32_t idx = 0; idx < count; ++idx) {
    SSTriggerRecalcProgress* pProgress = taosArrayGet(pTrigger->userRecalcs, idx);

    code = mstSetStreamRecalculateResBlock(pStream, pProgress, pBlock, *numOfRows);
    if (TSDB_CODE_SUCCESS == code) {
      ++(*numOfRows);
    }
  }

  taosRUnLockLatch(&pStatus->triggerTask->detailStatusLock);

  pBlock->info.rows = *numOfRows;

_exit:

  taosRUnLockLatch(&mStreamMgmt.runtimeLock);

  if (code) {
    mError("error happens when build stream recalculates result block, lino:%d, code:%s", lino, tstrerror(code));
  }

  return code;
}
1338

1339
int32_t mstGetScanUidFromPlan(int64_t streamId, void* scanPlan, int64_t* uid) {
505✔
1340
  SSubplan* pSubplan = NULL;
505✔
1341
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
505✔
1342
  
1343
  TAOS_CHECK_EXIT(nodesStringToNode(scanPlan, (SNode**)&pSubplan));
505!
1344

1345
  if (pSubplan->pNode && nodeType(pSubplan->pNode) == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
505!
1346
    SScanPhysiNode* pScanNode = (SScanPhysiNode*)pSubplan->pNode;
217✔
1347
    *uid = pScanNode->uid;
217✔
1348
  }
1349
  
1350
_exit:
288✔
1351

1352
  if (code) {
505!
1353
    mstsError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1354
  }
1355

1356
  nodesDestroyNode((SNode *)pSubplan);
505✔
1357

1358
  return code;
505✔
1359
}
1360

1361

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc