• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4774

01 Oct 2025 04:06AM UTC coverage: 58.357% (-0.3%) from 58.689%
#4774

push

travis-ci

web-flow
Merge pull request #33171 from taosdata/merge/3.3.6tomain

merge: from 3.3.6 to main branch

138553 of 302743 branches covered (45.77%)

Branch coverage included in aggregate %.

15 of 20 new or added lines in 2 files covered. (75.0%)

2558 existing lines in 138 files now uncovered.

209925 of 294403 relevant lines covered (71.31%)

5595496.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.86
/source/dnode/mnode/impl/src/mndStreamUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndDb.h"
17
#include "mndStb.h"
18
#include "mndStream.h"
19
#include "mndTrans.h"
20
#include "mndVgroup.h"
21
#include "taoserror.h"
22
#include "tmisce.h"
23
#include "mndSnode.h"
24

25
bool mstWaitLock(SRWLatch* pLock, bool readLock) {
52,894✔
26
  if (readLock) {
52,894✔
27
    while (taosRTryLockLatch(pLock)) {
48,307✔
28
      taosMsleep(1);
57✔
29
    }
30

31
    return true;
48,250✔
32
  }
33

34
  taosWWaitLockLatch(pLock);
4,644✔
35

36
  return true;
4,644✔
37
}
38

39
void mstDestroySStmVgStreamStatus(void* p) { 
662✔
40
  SStmVgStreamStatus* pStatus = (SStmVgStreamStatus*)p;
662✔
41
  taosArrayDestroy(pStatus->trigReaders); 
662✔
42
  taosArrayDestroy(pStatus->calcReaders); 
662✔
43
}
662✔
44

45
void mstDestroySStmSnodeStreamStatus(void* p) { 
420✔
46
  SStmSnodeStreamStatus* pStatus = (SStmSnodeStreamStatus*)p;
420✔
47
  for (int32_t i = 0; i < MND_STREAM_RUNNER_DEPLOY_NUM; ++i) {
1,680✔
48
    taosArrayDestroy(pStatus->runners[i]);
1,260✔
49
    pStatus->runners[i] = NULL;
1,260✔
50
  }
51
}
420✔
52

53

54
void mstDestroyVgroupStatus(SStmVgroupStatus* pVgStatus) {
×
55
  taosHashCleanup(pVgStatus->streamTasks);
×
56
  pVgStatus->streamTasks = NULL;
×
57
}
×
58

59
void mstDestroySStmTaskToDeployExt(void* param) {
2,571✔
60
  SStmTaskToDeployExt* pExt = (SStmTaskToDeployExt*)param;
2,571✔
61
  if (pExt->deployed) {
2,571✔
62
    return;
2,553✔
63
  }
64
  
65
  switch (pExt->deploy.task.type) {
18✔
66
    case STREAM_TRIGGER_TASK:
3✔
67
      taosArrayDestroy(pExt->deploy.msg.trigger.readerList);
3✔
68
      pExt->deploy.msg.trigger.readerList = NULL;
3✔
69
      taosArrayDestroy(pExt->deploy.msg.trigger.runnerList);
3✔
70
      pExt->deploy.msg.trigger.runnerList = NULL;
3✔
71
      break;
3✔
72
    case STREAM_RUNNER_TASK:
9✔
73
      taosMemoryFreeClear(pExt->deploy.msg.runner.pPlan);
9!
74
      break;
9✔
75
    default:  
6✔
76
      break;;
6✔
77
  }
78
}
79

80
void mstDestroyScanAddrList(void* param) {
523✔
81
  if (NULL == param) {
523!
82
    return;
×
83
  }
84
  SArray* pList = *(SArray**)param;
523✔
85
  taosArrayDestroy(pList);
523✔
86
}
87

88
void mstDestroySStmSnodeTasksDeploy(void* param) {
124✔
89
  SStmSnodeTasksDeploy* pSnode = (SStmSnodeTasksDeploy*)param;
124✔
90
  taosArrayDestroyEx(pSnode->triggerList, mstDestroySStmTaskToDeployExt);
124✔
91
  taosArrayDestroyEx(pSnode->runnerList, mstDestroySStmTaskToDeployExt);
124✔
92
}
124✔
93

94
void mstDestroySStmVgTasksToDeploy(void* param) {
294✔
95
  SStmVgTasksToDeploy* pVg = (SStmVgTasksToDeploy*)param;
294✔
96
  taosArrayDestroyEx(pVg->taskList, mstDestroySStmTaskToDeployExt);
294✔
97
}
294✔
98

99
void mstDestroySStmSnodeStatus(void* param) {
102✔
100
  SStmSnodeStatus* pSnode = (SStmSnodeStatus*)param;
102✔
101
  taosHashCleanup(pSnode->streamTasks);
102✔
102
}
102✔
103

104
void mstDestroySStmVgroupStatus(void* param) {
234✔
105
  SStmVgroupStatus* pVg = (SStmVgroupStatus*)param;
234✔
106
  taosHashCleanup(pVg->streamTasks);
234✔
107
}
234✔
108

109
void mstResetSStmStatus(SStmStatus* pStatus) {
392✔
110
  taosArrayDestroy(pStatus->trigReaders);
392✔
111
  pStatus->trigReaders = NULL;
392✔
112
  taosArrayDestroy(pStatus->trigOReaders);
392✔
113
  pStatus->trigOReaders = NULL;
392✔
114
  pStatus->calcReaders = tdListFree(pStatus->calcReaders);
392✔
115
  if (pStatus->triggerTask) {
392!
116
    (void)mstWaitLock(&pStatus->triggerTask->detailStatusLock, false);
392✔
117
    taosMemoryFreeClear(pStatus->triggerTask->detailStatus);
392!
118
    taosWUnLockLatch(&pStatus->triggerTask->detailStatusLock);
392✔
119
  }
120
  taosMemoryFreeClear(pStatus->triggerTask);
392!
121
  for (int32_t i = 0; i < MND_STREAM_RUNNER_DEPLOY_NUM; ++i) {
1,568✔
122
    taosArrayDestroy(pStatus->runners[i]);
1,176✔
123
    pStatus->runners[i] = NULL;
1,176✔
124
  }
125
}
392✔
126

127
void mstDestroySStmStatus(void* param) {
371✔
128
  SStmStatus* pStatus = (SStmStatus*)param;
371✔
129
  taosMemoryFreeClear(pStatus->streamName);
371!
130

131
  mstResetSStmStatus(pStatus);
371✔
132

133
  taosWLockLatch(&pStatus->userRecalcLock);
371✔
134
  taosArrayDestroy(pStatus->userRecalcList);
371✔
135
  taosWUnLockLatch(&pStatus->userRecalcLock);
371✔
136

137
  tFreeSCMCreateStreamReq(pStatus->pCreate);
371✔
138
  taosMemoryFreeClear(pStatus->pCreate);  
371!
139
}
371✔
140

141
void mstDestroySStmAction(void* param) {
529✔
142
  SStmAction* pAction = (SStmAction*)param;
529✔
143

144
  taosArrayDestroy(pAction->undeploy.taskList);
529✔
145
  taosArrayDestroy(pAction->recalc.recalcList);
529✔
146
}
529✔
147

148
void mstClearSStmStreamDeploy(SStmStreamDeploy* pDeploy) {
447✔
149
  pDeploy->readerTasks = NULL;
447✔
150
  pDeploy->triggerTask = NULL;
447✔
151
  pDeploy->runnerTasks = NULL;
447✔
152
}
447✔
153

154
int32_t mstIsStreamDropped(SMnode *pMnode, int64_t streamId, bool* dropped) {
220✔
155
  SSdb   *pSdb = pMnode->pSdb;
220✔
156
  void   *pIter = NULL;
220✔
157
  
158
  while (1) {
711✔
159
    SStreamObj *pStream = NULL;
931✔
160
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
931✔
161
    if (pIter == NULL) break;
931✔
162

163
    if (pStream->pCreate->streamId == streamId) {
817✔
164
      *dropped = pStream->userDropped ? true : false;
106✔
165
      sdbRelease(pSdb, pStream);
106✔
166
      sdbCancelFetch(pSdb, pIter);
106✔
167
      mstsDebug("stream found, dropped:%d", *dropped);
106✔
168
      return TSDB_CODE_SUCCESS;
106✔
169
    }
170
    
171
    sdbRelease(pSdb, pStream);
711✔
172
  }
173

174
  *dropped = true;
114✔
175

176
  return TSDB_CODE_SUCCESS;
114✔
177
}
178

179
typedef struct SStmCheckDbInUseCtx {
180
  bool* dbStream;
181
  bool* vtableStream;
182
  bool  ignoreCurrDb;
183
} SStmCheckDbInUseCtx;
184

185
static bool mstChkSetDbInUse(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
195✔
186
  SStreamObj *pStream = pObj;
195✔
187
  if (atomic_load_8(&pStream->userDropped)) {
195!
188
    return true;
×
189
  }
190

191
  SStmCheckDbInUseCtx* pCtx = (SStmCheckDbInUseCtx*)p2;
195✔
192
  if (pCtx->ignoreCurrDb && 0 == strcmp(pStream->pCreate->streamDB, p1)) {
195✔
193
    return true;
22✔
194
  }
195
  
196
  if (pStream->pCreate->triggerDB && 0 == strcmp(pStream->pCreate->triggerDB, p1)) {
173!
197
    *pCtx->dbStream = true;
4✔
198
    return false;
4✔
199
  }
200

201
  int32_t calcDBNum = taosArrayGetSize(pStream->pCreate->calcDB);
169✔
202
  for (int32_t i = 0; i < calcDBNum; ++i) {
335✔
203
    char* calcDB = taosArrayGetP(pStream->pCreate->calcDB, i);
169✔
204
    if (0 == strcmp(calcDB, p1)) {
169✔
205
      *pCtx->dbStream = true;
3✔
206
      return false;
3✔
207
    }
208
  }
209

210
  if (pStream->pCreate->vtableCalc || STREAM_IS_VIRTUAL_TABLE(pStream->pCreate->triggerTblType, pStream->pCreate->flags)) {
166✔
211
    *pCtx->vtableStream = true;
44✔
212
    return true;
44✔
213
  }
214
  
215
  return true;
122✔
216
}
217

218
void mstCheckDbInUse(SMnode *pMnode, char *dbFName, bool *dbStream, bool *vtableStream, bool ignoreCurrDb) {
1,995✔
219
  int32_t streamNum = sdbGetSize(pMnode->pSdb, SDB_STREAM);
1,995✔
220
  if (streamNum <= 0) {
1,995✔
221
    return;
1,952✔
222
  }
223

224
  SStmCheckDbInUseCtx ctx = {dbStream, vtableStream, ignoreCurrDb};
43✔
225
  sdbTraverse(pMnode->pSdb, SDB_STREAM, mstChkSetDbInUse, dbFName, &ctx, NULL);
43✔
226
}
227

228
static void mstShowStreamStatus(char *dst, int8_t status, int32_t bufLen) {
×
229
  if (status == STREAM_STATUS_INIT) {
×
230
    tstrncpy(dst, "init", bufLen);
×
231
  } else if (status == STREAM_STATUS_RUNNING) {
×
232
    tstrncpy(dst, "running", bufLen);
×
233
  } else if (status == STREAM_STATUS_STOPPED) {
×
234
    tstrncpy(dst, "stopped", bufLen);
×
235
  } else if (status == STREAM_STATUS_FAILED) {
×
236
    tstrncpy(dst, "failed", bufLen);
×
237
  }
238
}
×
239

240
int32_t mstCheckSnodeExists(SMnode *pMnode) {
×
241
  SSdb      *pSdb = pMnode->pSdb;
×
242
  void      *pIter = NULL;
×
243
  SSnodeObj *pObj = NULL;
×
244

245
  while (1) {
246
    pIter = sdbFetch(pSdb, SDB_SNODE, pIter, (void **)&pObj);
×
247
    if (pIter == NULL) {
×
248
      break;
×
249
    }
250

251
    sdbRelease(pSdb, pObj);
×
252
    sdbCancelFetch(pSdb, pIter);
×
253
    return TSDB_CODE_SUCCESS;
×
254
  }
255

256
  return TSDB_CODE_SNODE_NOT_DEPLOYED;
×
257
}
258

259
void mstSetTaskStatusFromMsg(SStmGrpCtx* pCtx, SStmTaskStatus* pTask, SStmTaskStatusMsg* pMsg) {
×
260
  pTask->id.taskId = pMsg->taskId;
×
261
  pTask->id.deployId = pMsg->deployId;
×
262
  pTask->id.seriousId = pMsg->seriousId;
×
263
  pTask->id.nodeId = pMsg->nodeId;
×
264
  pTask->id.taskIdx = pMsg->taskIdx;
×
265

266
  pTask->type = pMsg->type;
×
267
  pTask->flags = pMsg->flags;
×
268
  pTask->status = pMsg->status;
×
269
  pTask->lastUpTs = pCtx->currTs;
×
270
}
×
271

272
bool mndStreamActionDequeue(SStmActionQ* pQueue, SStmQNode **param) {
2,250✔
273
  while (0 == atomic_load_64(&pQueue->qRemainNum)) {
2,250✔
274
    return false;
1,843✔
275
  }
276

277
  SStmQNode *orig = pQueue->head;
407✔
278

279
  SStmQNode *node = pQueue->head->next;
407✔
280
  pQueue->head = pQueue->head->next;
407✔
281

282
  *param = node;
407✔
283

284
  taosMemoryFreeClear(orig);
407!
285

286
  (void)atomic_sub_fetch_64(&pQueue->qRemainNum, 1);
407✔
287

288
  return true;
407✔
289
}
290

291
void mndStreamActionEnqueue(SStmActionQ* pQueue, SStmQNode* param) {
407✔
292
  taosWLockLatch(&pQueue->lock);
407✔
293
  pQueue->tail->next = param;
407✔
294
  pQueue->tail = param;
407✔
295
  taosWUnLockLatch(&pQueue->lock);
407✔
296

297
  (void)atomic_add_fetch_64(&pQueue->qRemainNum, 1);
407✔
298
}
407✔
299

300
char* mstGetStreamActionString(int32_t action) {
276✔
301
  switch (action) {
276!
302
    case STREAM_ACT_DEPLOY:
276✔
303
      return "DEPLOY";
276✔
304
    case STREAM_ACT_UNDEPLOY:
×
305
      return "UNDEPLOY";
×
306
    case STREAM_ACT_START:
×
307
      return "START";
×
308
    case STREAM_ACT_UPDATE_TRIGGER:
×
309
      return "UPDATE TRIGGER";
×
310
    case STREAM_ACT_RECALC:
×
311
      return "USER RECALC";
×
312
    default:
×
313
      break;
×
314
  }
315

316
  return "UNKNOWN";
×
317
}
318

319
void mstPostStreamAction(SStmActionQ*       actionQ, int64_t streamId, char* streamName, void* param, bool userAction, int32_t action) {
398✔
320
  SStmQNode *pNode = taosMemoryMalloc(sizeof(SStmQNode));
398!
321
  if (NULL == pNode) {
398!
322
    taosMemoryFreeClear(param);
×
323
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, __LINE__, tstrerror(terrno));
×
324
    return;
×
325
  }
326

327
  pNode->type = action;
398✔
328
  pNode->streamAct = true;
398✔
329
  pNode->action.stream.streamId = streamId;
398✔
330
  TAOS_STRCPY(pNode->action.stream.streamName, streamName);
398✔
331
  pNode->action.stream.userAction = userAction;
398✔
332
  pNode->action.stream.actionParam = param;
398✔
333
  
334
  pNode->next = NULL;
398✔
335

336
  mndStreamActionEnqueue(actionQ, pNode);
398✔
337

338
  mstsDebug("stream action %s posted enqueue", mstGetStreamActionString(action));
398✔
339
}
340

341
void mstPostTaskAction(SStmActionQ*        actionQ, SStmTaskAction* pAction, int32_t action) {
9✔
342
  SStmQNode *pNode = taosMemoryMalloc(sizeof(SStmQNode));
9!
343
  if (NULL == pNode) {
9!
344
    int64_t streamId = pAction->streamId;
×
345
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, __LINE__, tstrerror(terrno));
×
346
    return;
×
347
  }
348

349
  pNode->type = action;
9✔
350
  pNode->streamAct = false;
9✔
351
  pNode->action.task = *pAction;
9✔
352
  
353
  pNode->next = NULL;
9✔
354

355
  mndStreamActionEnqueue(actionQ, pNode);
9✔
356
}
357

358
void mstDestroyDbVgroupsHash(SSHashObj *pDbVgs) {
50✔
359
  int32_t iter = 0;
50✔
360
  SDBVgHashInfo* pVg = NULL;
50✔
361
  void* p = NULL;
50✔
362
  while (NULL != (p = tSimpleHashIterate(pDbVgs, p, &iter))) {
411✔
363
    pVg = (SDBVgHashInfo*)p;
361✔
364
    taosArrayDestroy(pVg->vgArray);
361✔
365
  }
366
  
367
  tSimpleHashCleanup(pDbVgs);
50✔
368
}
50✔
369

370

371
int32_t mstBuildDBVgroupsMap(SMnode* pMnode, SSHashObj** ppRes) {
50✔
372
  void*   pIter = NULL;
50✔
373
  int32_t code = TSDB_CODE_SUCCESS;
50✔
374
  int32_t lino = 0;
50✔
375
  SArray* pTarget = NULL;
50✔
376
  SArray* pNew = NULL;
50✔
377
  SDbObj* pDb = NULL;
50✔
378
  SDBVgHashInfo dbInfo = {0}, *pDbInfo = NULL;
50✔
379
  SVgObj* pVgroup = NULL;
50✔
380

381
  SSHashObj* pDbVgroup = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
50✔
382
  TSDB_CHECK_NULL(pDbVgroup, code, lino, _exit, terrno);
50!
383

384
  while (1) {
386✔
385
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
436✔
386
    if (pIter == NULL) {
436✔
387
      break;
50✔
388
    }
389

390
    pDbInfo = (SDBVgHashInfo*)tSimpleHashGet(pDbVgroup, pVgroup->dbName, strlen(pVgroup->dbName) + 1);
386✔
391
    if (NULL == pDbInfo) {
386✔
392
      pNew = taosArrayInit(20, sizeof(SVGroupHashInfo));
361✔
393
      if (NULL == pNew) {
361!
394
        sdbRelease(pMnode->pSdb, pVgroup);
×
395
        sdbCancelFetch(pMnode->pSdb, pIter);
×
396
        pVgroup = NULL;
×
397
        TSDB_CHECK_NULL(pNew, code, lino, _exit, terrno);
×
398
      }
399
      
400
      pDb = mndAcquireDb(pMnode, pVgroup->dbName);
361✔
401
      if (NULL == pDb) {
361!
402
        sdbRelease(pMnode->pSdb, pVgroup);
×
403
        sdbCancelFetch(pMnode->pSdb, pIter);      
×
404
        pVgroup = NULL;
×
405
        TSDB_CHECK_NULL(pDb, code, lino, _exit, terrno);
×
406
      }
407
      dbInfo.vgSorted = false;
361✔
408
      dbInfo.hashMethod = pDb->cfg.hashMethod;
361✔
409
      dbInfo.hashPrefix = pDb->cfg.hashPrefix;
361✔
410
      dbInfo.hashSuffix = pDb->cfg.hashSuffix;
361✔
411
      dbInfo.vgArray = pNew;
361✔
412
      
413
      mndReleaseDb(pMnode, pDb);
361✔
414

415
      pTarget = pNew;
361✔
416
    } else {
417
      pTarget = pDbInfo->vgArray;
25✔
418
    }
419

420
    SVGroupHashInfo vgInfo = {.vgId = pVgroup->vgId, .hashBegin = pVgroup->hashBegin, .hashEnd = pVgroup->hashEnd};
386✔
421
    if (NULL == taosArrayPush(pTarget, &vgInfo)) {
386!
422
      sdbRelease(pMnode->pSdb, pVgroup);
×
423
      sdbCancelFetch(pMnode->pSdb, pIter);      
×
424
      pVgroup = NULL;
×
425
      TSDB_CHECK_NULL(NULL, code, lino, _exit, terrno);
×
426
    }
427

428
    if (NULL == pDbInfo) {
386✔
429
      code = tSimpleHashPut(pDbVgroup, pVgroup->dbName, strlen(pVgroup->dbName) + 1, &dbInfo, sizeof(dbInfo));
361✔
430
      if (code) {
361!
431
        sdbRelease(pMnode->pSdb, pVgroup);
×
432
        sdbCancelFetch(pMnode->pSdb, pIter);      
×
433
        pVgroup = NULL;
×
434
        TAOS_CHECK_EXIT(code);
×
435
      }
436
      pNew = NULL;
361✔
437
    }
438

439
    sdbRelease(pMnode->pSdb, pVgroup);
386✔
440
    pVgroup = NULL;
386✔
441
  }
442

443
  *ppRes = pDbVgroup;
50✔
444
  
445
_exit:
50✔
446

447
  taosArrayDestroy(pNew);
50✔
448

449
  if (code) {
50!
450
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
451
  }
452

453
  return code;
50✔
454
}
455

456
int mstDbVgInfoComp(const void* lp, const void* rp) {
13✔
457
  SVGroupHashInfo* pLeft = (SVGroupHashInfo*)lp;
13✔
458
  SVGroupHashInfo* pRight = (SVGroupHashInfo*)rp;
13✔
459
  if (pLeft->hashBegin < pRight->hashBegin) {
13!
460
    return -1;
13✔
461
  } else if (pLeft->hashBegin > pRight->hashBegin) {
×
462
    return 1;
×
463
  }
464

465
  return 0;
×
466
}
467

468
int32_t mstTableHashValueComp(void const* lp, void const* rp) {
109✔
469
  uint32_t*    key = (uint32_t*)lp;
109✔
470
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
109✔
471

472
  if (*key < pVg->hashBegin) {
109!
473
    return -1;
×
474
  } else if (*key > pVg->hashEnd) {
109✔
475
    return 1;
7✔
476
  }
477

478
  return 0;
102✔
479
}
480

481

482
int32_t mstGetTableVgId(SSHashObj* pDbVgroups, char* dbFName, char *tbName, int32_t* vgId) {
102✔
483
  int32_t code = 0;
102✔
484
  int32_t lino = 0;
102✔
485
  SVgroupInfo* vgInfo = NULL;
102✔
486
  char         tbFullName[TSDB_TABLE_FNAME_LEN];
487

488
  SDBVgHashInfo* dbInfo = (SDBVgHashInfo*)tSimpleHashGet(pDbVgroups, dbFName, strlen(dbFName) + 1);
102✔
489
  if (NULL == dbInfo) {
102!
490
    mstError("db %s does not exist", dbFName);
×
491
    TAOS_CHECK_EXIT(TSDB_CODE_MND_DB_NOT_EXIST);
×
492
  }
493
  
494
  (void)snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbFName, tbName);
102✔
495
  uint32_t hashValue = taosGetTbHashVal(tbFullName, (uint32_t)strlen(tbFullName), dbInfo->hashMethod,
204✔
496
                                        dbInfo->hashPrefix, dbInfo->hashSuffix);
102!
497

498
  if (!dbInfo->vgSorted) {
102✔
499
    taosArraySort(dbInfo->vgArray, mstDbVgInfoComp);
50✔
500
    dbInfo->vgSorted = true;
50✔
501
  }
502

503
  vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, mstTableHashValueComp, TD_EQ);
102✔
504
  if (NULL == vgInfo) {
102!
505
    mstError("no hash range found for hash value [%u], dbFName:%s, numOfVgId:%d", hashValue, dbFName,
×
506
             (int32_t)taosArrayGetSize(dbInfo->vgArray));
507
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
508
  }
509

510
  *vgId = vgInfo->vgId;
102✔
511

512
_exit:
102✔
513

514
  if (code) {
102!
515
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
516
  }
517

518
  return code;
102✔
519
}
520

521

522
void mstLogSStreamObj(char* tips, SStreamObj* p) {
370✔
523
  if (!(stDebugFlag & DEBUG_DEBUG)) {
370✔
524
    return;
118✔
525
  }
526
  
527
  if (NULL == p) {
252!
528
    mstDebug("%s: stream is NULL", tips);
×
529
    return;
×
530
  }
531

532
  mstDebug("%s: stream obj", tips);
252!
533
  mstDebug("name:%s mainSnodeId:%d userDropped:%d userStopped:%d createTime:%" PRId64 " updateTime:%" PRId64,
252!
534
      p->name, p->mainSnodeId, p->userDropped, p->userStopped, p->createTime, p->updateTime);
535

536
  SCMCreateStreamReq* q = p->pCreate;
252✔
537
  if (NULL == q) {
252!
538
    mstDebug("stream pCreate is NULL");
×
539
    return;
×
540
  }
541

542
  int64_t streamId = q->streamId;
252✔
543
  int32_t calcDBNum = taosArrayGetSize(q->calcDB);
252✔
544
  int32_t calcScanNum = taosArrayGetSize(q->calcScanPlanList);
252✔
545
  int32_t notifyUrlNum = taosArrayGetSize(q->pNotifyAddrUrls);
252✔
546
  int32_t outColNum = taosArrayGetSize(q->outCols);
252✔
547
  int32_t outTagNum = taosArrayGetSize(q->outTags);
252✔
548
  int32_t forceOutColNum = taosArrayGetSize(q->forceOutCols);
252✔
549

550
  mstsDebugL("create_info: name:%s sql:%s streamDB:%s triggerDB:%s outDB:%s calcDBNum:%d triggerTblName:%s outTblName:%s "
252!
551
      "igExists:%d triggerType:%d igDisorder:%d deleteReCalc:%d deleteOutTbl:%d fillHistory:%d fillHistroyFirst:%d "
552
      "calcNotifyOnly:%d lowLatencyCalc:%d igNoDataTrigger:%d notifyUrlNum:%d notifyEventTypes:%d addOptions:%d notifyHistory:%d "
553
      "outColsNum:%d outTagsNum:%d maxDelay:%" PRId64 " fillHistoryStartTs:%" PRId64 " watermark:%" PRId64 " expiredTime:%" PRId64 " "
554
      "triggerTblType:%d triggerTblUid:%" PRIx64 " triggerTblSuid:%" PRIx64 " vtableCalc:%d outTblType:%d outStbExists:%d outStbUid:%" PRIu64 " outStbSversion:%d "
555
      "eventTypes:0x%" PRIx64 " flags:0x%" PRIx64 " tsmaId:0x%" PRIx64 " placeHolderBitmap:0x%" PRIx64 " calcTsSlotId:%d triTsSlotId:%d "
556
      "triggerTblVgId:%d outTblVgId:%d calcScanPlanNum:%d forceOutCols:%d",
557
      q->name, q->sql, q->streamDB, q->triggerDB, q->outDB, calcDBNum, q->triggerTblName, q->outTblName,
558
      q->igExists, q->triggerType, q->igDisorder, q->deleteReCalc, q->deleteOutTbl, q->fillHistory, q->fillHistoryFirst,
559
      q->calcNotifyOnly, q->lowLatencyCalc, q->igNoDataTrigger, notifyUrlNum, q->notifyEventTypes, q->addOptions, q->notifyHistory,
560
      outColNum, outTagNum, q->maxDelay, q->fillHistoryStartTime, q->watermark, q->expiredTime,
561
      q->triggerTblType, q->triggerTblUid, q->triggerTblSuid, q->vtableCalc, q->outTblType, q->outStbExists, q->outStbUid, q->outStbSversion,
562
      q->eventTypes, q->flags, q->tsmaId, q->placeHolderBitmap, q->calcTsSlotId, q->triTsSlotId,
563
      q->triggerTblVgId, q->outTblVgId, calcScanNum, forceOutColNum);
564

565
  switch (q->triggerType) {
252!
566
    case WINDOW_TYPE_INTERVAL: {
140✔
567
      SSlidingTrigger* t = &q->trigger.sliding;
140✔
568
      mstsDebug("sliding trigger options, intervalUnit:%d, slidingUnit:%d, offsetUnit:%d, soffsetUnit:%d, precision:%d, interval:%" PRId64 ", offset:%" PRId64 ", sliding:%" PRId64 ", soffset:%" PRId64, 
140!
569
          t->intervalUnit, t->slidingUnit, t->offsetUnit, t->soffsetUnit, t->precision, t->interval, t->offset, t->sliding, t->soffset);
570
      break;
140✔
571
    }  
572
    case WINDOW_TYPE_SESSION: {
10✔
573
      SSessionTrigger* t = &q->trigger.session;
10✔
574
      mstsDebug("session trigger options, slotId:%d, sessionVal:%" PRId64, t->slotId, t->sessionVal);
10!
575
      break;
10✔
576
    }
577
    case WINDOW_TYPE_STATE: {
58✔
578
      SStateWinTrigger* t = &q->trigger.stateWin;
58✔
579
      mstsDebug("state trigger options, slotId:%d, expr:%s, extend:%d, trueForDuration:%" PRId64, t->slotId, (char *)t->expr, t->extend, t->trueForDuration);
58!
580
      break;
58✔
581
    }
582
    case WINDOW_TYPE_EVENT:{
13✔
583
      SEventTrigger* t = &q->trigger.event;
13✔
584
      mstsDebug("event trigger options, startCond:%s, endCond:%s, trueForDuration:%" PRId64, (char*)t->startCond, (char*)t->endCond, t->trueForDuration);
13!
585
      break;
13✔
586
    }
587
    case WINDOW_TYPE_COUNT: {
22✔
588
      SCountTrigger* t = &q->trigger.count;
22✔
589
      mstsDebug("count trigger options, countVal:%" PRId64 ", sliding:%" PRId64 ", condCols:%s", t->countVal, t->sliding, (char*)t->condCols);
22!
590
      break;
22✔
591
    }
592
    case WINDOW_TYPE_PERIOD: {
9✔
593
      SPeriodTrigger* t = &q->trigger.period;
9✔
594
      mstsDebug("period trigger options, periodUnit:%d, offsetUnit:%d, precision:%d, period:%" PRId64 ", offset:%" PRId64, 
9!
595
          t->periodUnit, t->offsetUnit, t->precision, t->period, t->offset);
596
      break;
9✔
597
    }
598
    default:
×
599
      mstsDebug("unknown triggerType:%d", q->triggerType);
×
600
      break;
×
601
  }
602

603
  mstsDebugL("create_info: triggerCols:[%s]", (char*)q->triggerCols);
252!
604

605
  mstsDebugL("create_info: partitionCols:[%s]", (char*)q->partitionCols);
252!
606

607
  mstsDebugL("create_info: triggerScanPlan:[%s]", (char*)q->triggerScanPlan);
252!
608

609
  mstsDebugL("create_info: calcPlan:[%s]", (char*)q->calcPlan);
252!
610

611
  mstsDebugL("create_info: subTblNameExpr:[%s]", (char*)q->subTblNameExpr);
252!
612

613
  mstsDebugL("create_info: tagValueExpr:[%s]", (char*)q->tagValueExpr);
252!
614

615

616
  for (int32_t i = 0; i < calcDBNum; ++i) {
503✔
617
    char* dbName = taosArrayGetP(q->calcDB, i);
251✔
618
    mstsDebug("create_info: calcDB[%d] - %s", i, dbName);
251!
619
  }
620

621
  for (int32_t i = 0; i < calcScanNum; ++i) {
601✔
622
    SStreamCalcScan* pScan = taosArrayGet(q->calcScanPlanList, i);
349✔
623
    int32_t vgNum = taosArrayGetSize(pScan->vgList);
349✔
624
    mstsDebugL("create_info: calcScanPlan[%d] - readFromCache:%d vgNum:%d scanPlan:[%s]", i, pScan->readFromCache, vgNum, (char*)pScan->scanPlan);
349!
625
    for (int32_t v = 0; v < vgNum; ++v) {
698✔
626
      mstsDebug("create_info: calcScanPlan[%d] vg[%d] - vgId:%d", i, v, *(int32_t*)taosArrayGet(pScan->vgList, v));
349!
627
    }
628
  }
629

630
  for (int32_t i = 0; i < notifyUrlNum; ++i) {
256✔
631
    char* url = taosArrayGetP(q->pNotifyAddrUrls, i);
4✔
632
    mstsDebug("create_info: notifyUrl[%d] - %s", i, url);
4!
633
  }
634

635
  for (int32_t i = 0; i < outColNum; ++i) {
1,374✔
636
    SFieldWithOptions* o = taosArrayGet(q->outCols, i);
1,122✔
637
    mstsDebug("create_info: outCol[%d] - name:%s type:%d flags:%d bytes:%d compress:%u typeMod:%d", 
1,122!
638
        i, o->name, o->type, o->flags, o->bytes, o->compress, o->typeMod);
639
  }
640
      
641
}
642

643
void mstLogSStmTaskStatus(char* name, int64_t streamId, SStmTaskStatus* pTask, int32_t idx) {
1,854✔
644
  mstsDebug("%s[%d]: task %" PRIx64 " deployId:%d SID:%" PRId64 " nodeId:%d tidx:%d type:%s flags:%" PRIx64 " status:%s lastUpTs:%" PRId64, 
1,854!
645
      name, idx, pTask->id.taskId, pTask->id.deployId, pTask->id.seriousId, pTask->id.nodeId, pTask->id.taskIdx,
646
      gStreamTaskTypeStr[pTask->type], pTask->flags, gStreamStatusStr[pTask->status], pTask->lastUpTs);
647
}
1,854✔
648

649
void mstLogSStmStatus(char* tips, int64_t streamId, SStmStatus* p) {
392✔
650
  if (!(stDebugFlag & DEBUG_DEBUG)) {
392✔
651
    return;
122✔
652
  }
653
  
654
  if (NULL == p) {
270!
655
    mstsDebug("%s: stream status is NULL", tips);
×
656
    return;
×
657
  }
658

659
  int32_t trigReaderNum = taosArrayGetSize(p->trigReaders);
270✔
660
  int32_t trigOReaderNum = taosArrayGetSize(p->trigOReaders);
270✔
661
  int32_t calcReaderNum = MST_LIST_SIZE(p->calcReaders);
270!
662
  int32_t triggerNum = p->triggerTask ? 1 : 0;
270✔
663
  int32_t runnerNum = 0;
270✔
664

665
  for (int32_t i = 0; i < p->runnerDeploys; ++i) {
1,077✔
666
    runnerNum += taosArrayGetSize(p->runners[i]);
807✔
667
  }
668

669
  mstsDebug("%s: stream status", tips);
270!
670
  mstsDebug("name:%s runnerNum:%d runnerDeploys:%d runnerReplica:%d lastActionTs:%" PRId64
270!
671
           " trigReaders:%d trigOReaders:%d calcReaders:%d trigger:%d runners:%d",
672
      p->streamName, p->runnerNum, p->runnerDeploys, p->runnerReplica, p->lastActionTs,
673
      trigReaderNum, trigOReaderNum, calcReaderNum, triggerNum, runnerNum);
674

675
  SStmTaskStatus* pTask = NULL;
270✔
676
  for (int32_t i = 0; i < trigReaderNum; ++i) {
674✔
677
    pTask = taosArrayGet(p->trigReaders, i);
404✔
678
    mstLogSStmTaskStatus("trigReader task", streamId, pTask, i);
404✔
679
  }
680

681
  for (int32_t i = 0; i < trigOReaderNum; ++i) {
270!
682
    pTask = taosArrayGet(p->trigOReaders, i);
×
683
    mstLogSStmTaskStatus("trigOReader task", streamId, pTask, i);
×
684
  }
685

686
  SListNode* pNode = listHead(p->calcReaders);
270✔
687
  for (int32_t i = 0; i < calcReaderNum; ++i) {
577✔
688
    pTask = (SStmTaskStatus*)pNode->data;
307✔
689
    mstLogSStmTaskStatus("calcReader task", streamId, pTask, i);
307✔
690
    pNode = TD_DLIST_NODE_NEXT(pNode);
307✔
691
  }
692

693
  if (triggerNum > 0) {
270!
694
    mstLogSStmTaskStatus("trigger task", streamId, p->triggerTask, 0);
270✔
695
  }
696

697
  for (int32_t i = 0; i < p->runnerDeploys; ++i) {
1,077✔
698
    int32_t num = taosArrayGetSize(p->runners[i]);
807✔
699
    if (num <= 0) {
807!
700
      continue;
×
701
    }
702
    
703
    mstsDebug("the %dth deploy runners status", i);
807!
704
    for (int32_t m = 0; m < num; ++m) {
1,680✔
705
      pTask = taosArrayGet(p->runners[i], m);
873✔
706
      mstLogSStmTaskStatus("runner task", streamId, pTask, m);
873✔
707
    }
708
  }
709
      
710
}
711

712
bool mstEventPassIsolation(int32_t num, int32_t event) {
10,424✔
713
  bool ret = ((mStreamMgmt.lastTs[event].ts + num * MST_SHORT_ISOLATION_DURATION) <= mStreamMgmt.hCtx.currentTs);
10,424✔
714
  if (ret) {
10,424✔
715
    mstDebug("event %s passed %d isolation, last:%" PRId64 ", curr:%" PRId64, 
7,659✔
716
        gMndStreamEvent[event], num, mStreamMgmt.lastTs[event].ts, mStreamMgmt.hCtx.currentTs);
717
  }
718

719
  return ret;
10,424✔
720
}
721

722
bool mstEventHandledChkSet(int32_t event) {
7,659✔
723
  if (0 == atomic_val_compare_exchange_8((int8_t*)&mStreamMgmt.lastTs[event].handled, 0, 1)) {
7,659✔
724
    mstDebug("event %s set handled", gMndStreamEvent[event]);
455✔
725
    return true;
455✔
726
  }
727
  return false;
7,204✔
728
}
729

730
int32_t mstGetStreamStatusStr(SStreamObj* pStream, char* status, int32_t statusSize, char* msg, int32_t msgSize) {
2,000✔
731
  int8_t active = atomic_load_8(&mStreamMgmt.active), state = atomic_load_8(&mStreamMgmt.state);
2,000✔
732
  if (0 == active || MND_STM_STATE_NORMAL != state) {
2,000!
733
    mstDebug("mnode streamMgmt not in active mode, active:%d, state:%d", active, state);
6!
734
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_UNDEPLOYED], statusSize);
6✔
735
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "Mnode may be unstable, try again later", msgSize);
6✔
736
    return TSDB_CODE_SUCCESS;
6✔
737
  }
738

739
  if (atomic_load_8(&pStream->userDropped)) {
1,994!
740
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_DROPPING], statusSize);
×
741
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
×
742
    return TSDB_CODE_SUCCESS;
×
743
  }
744

745
  if (atomic_load_8(&pStream->userStopped)) {
1,994✔
746
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_STOPPED], statusSize);
26✔
747
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
26✔
748
    return TSDB_CODE_SUCCESS;
26✔
749
  }
750

751
  (void)mstWaitLock(&mStreamMgmt.runtimeLock, true);
1,968✔
752
  
753
  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &pStream->pCreate->streamId, sizeof(pStream->pCreate->streamId));
1,968✔
754
  if (NULL == pStatus) {
1,968✔
755
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_UNDEPLOYED], statusSize);
299✔
756
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
299✔
757
    goto _exit;
299✔
758
  }
759

760
  char tmpBuf[256];
761
  int8_t stopped = atomic_load_8(&pStatus->stopped);
1,669✔
762
  switch (stopped) {
1,669!
763
    case 1:
114✔
764
      STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_FAILED], statusSize);
114✔
765
      snprintf(tmpBuf, sizeof(tmpBuf), "Last error: %s, Failed times: %" PRId64, tstrerror(pStatus->fatalError), pStatus->fatalRetryTimes);
114✔
766
      STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
114✔
767
      goto _exit;
114✔
768
      break;
769
    case 4:
×
770
      STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_FAILED], statusSize);
×
771
      snprintf(tmpBuf, sizeof(tmpBuf), "Error: %s", tstrerror(TSDB_CODE_GRANT_STREAM_EXPIRED));
×
772
      STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
×
773
      goto _exit;
×
774
      break;
775
    default:
1,555✔
776
      break;
1,555✔
777
  }
778

779
  if (pStatus->triggerTask && STREAM_STATUS_RUNNING == pStatus->triggerTask->status) {
1,555✔
780
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_RUNNING], statusSize);
940✔
781
    strcpy(tmpBuf, "Running start from: ");
940✔
782
    (void)formatTimestampLocal(&tmpBuf[strlen(tmpBuf)], pStatus->triggerTask->runningStartTs, TSDB_TIME_PRECISION_MILLI);
940✔
783
    STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
940✔
784
    goto _exit;
940✔
785
  }
786

787
  STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_INIT], statusSize);
615✔
788
  snprintf(tmpBuf, sizeof(tmpBuf), "Current deploy times: %" PRId64, pStatus->deployTimes);
615✔
789
  STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
615✔
790
  goto _exit;
615✔
791

792
_exit:
1,968✔
793
  
794
  taosRUnLockLatch(&mStreamMgmt.runtimeLock);
1,968✔
795

796
  return TSDB_CODE_SUCCESS;
1,968✔
797
}
798

799
int32_t mstSetStreamAttrResBlock(SMnode *pMnode, SStreamObj* pStream, SSDataBlock* pBlock, int32_t numOfRows) {
2,000✔
800
  int32_t code = 0;
2,000✔
801
  int32_t cols = 0;
2,000✔
802
  int32_t lino = 0;
2,000✔
803

804
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
2,000✔
805
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetStableStr(pStream->name), sizeof(streamName));
2,000✔
806
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
2,000✔
807
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
2,000!
808

809
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamName, false);
2,000✔
810
  TSDB_CHECK_CODE(code, lino, _end);
2,000!
811

812
  // db_name
813
  char streamDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
2,000✔
814
  STR_WITH_MAXSIZE_TO_VARSTR(streamDB, mndGetDbStr(pStream->pCreate->streamDB), sizeof(streamDB));
2,000✔
815
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
2,000✔
816
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
2,000!
817
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&streamDB, false);
2,000✔
818
  TSDB_CHECK_CODE(code, lino, _end);
2,000!
819

820
  // create time
821
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
2,000✔
822
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
2,000!
823
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pStream->createTime, false);
2,000✔
824
  TSDB_CHECK_CODE(code, lino, _end);
2,000!
825

826
  // stream id
827
  char streamId2[19] = {0};
2,000✔
828
  char streamId[19 + VARSTR_HEADER_SIZE] = {0};
2,000✔
829
  snprintf(streamId2, sizeof(streamId2), "%" PRIx64, pStream->pCreate->streamId);
2,000✔
830
  STR_WITH_MAXSIZE_TO_VARSTR(streamId, streamId2, sizeof(streamId));
2,000✔
831
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
2,000✔
832
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
2,000!
833
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamId, false);
2,000✔
834
  TSDB_CHECK_CODE(code, lino, _end);
2,000!
835

836
  // sql
837
  char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
2,000✔
838
  STR_WITH_MAXSIZE_TO_VARSTR(sql, pStream->pCreate->sql, sizeof(sql));
2,000✔
839
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
2,000✔
840
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
2,000!
841
  code = colDataSetVal(pColInfo, numOfRows, (const char*)sql, false);
2,000✔
842
  TSDB_CHECK_CODE(code, lino, _end);
2,000!
843

844
  // status
845
  char   status[20 + VARSTR_HEADER_SIZE] = {0};
2,000✔
846
  char msg[TSDB_RESERVE_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
2,000✔
847
  code = mstGetStreamStatusStr(pStream, status, sizeof(status), msg, sizeof(msg));
2,000✔
848
  TSDB_CHECK_CODE(code, lino, _end);
2,000!
849

850
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
2,000✔
851
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
2,000!
852
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&status, false);
2,000✔
853
  TSDB_CHECK_CODE(code, lino, _end);
2,000!
854

855
  // snodeLeader
856
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
2,000✔
857
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
2,000!
858
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pStream->mainSnodeId, false);
2,000✔
859
  TSDB_CHECK_CODE(code, lino, _end);
2,000!
860

861
  // snodeReplica
862
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
2,000✔
863
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
2,000!
864
  SSnodeObj* pSnode = mndAcquireSnode(pMnode, pStream->mainSnodeId);
2,000✔
865
  int32_t replicaSnodeId = pSnode ? pSnode->replicaId : -1;
2,000!
866
  mndReleaseSnode(pMnode, pSnode);
2,000✔
867
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&replicaSnodeId, false);
2,000✔
868
  TSDB_CHECK_CODE(code, lino, _end);
2,000!
869

870
  // msg
871
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
2,000✔
872
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
2,000!
873
  code = colDataSetVal(pColInfo, numOfRows, (const char*)msg, false);
2,000✔
874

875
_end:
2,000✔
876
  if (code) {
2,000!
877
    mError("error happens when build stream attr result block, lino:%d, code:%s", lino, tstrerror(code));
×
878
  }
879
  return code;
2,000✔
880
}
881

882

883
int32_t mstGetTaskStatusStr(SStmTaskStatus* pTask, char* status, int32_t statusSize, char* msg, int32_t msgSize) {
28,241✔
884
  char tmpBuf[256];
885
  
886
  STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[pTask->status], statusSize);
28,241✔
887
  if (STREAM_STATUS_FAILED == pTask->status && pTask->errCode) {
28,241!
UNCOV
888
    snprintf(tmpBuf, sizeof(tmpBuf), "Last error: %s", tstrerror(pTask->errCode));
×
UNCOV
889
    STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
×
UNCOV
890
    return TSDB_CODE_SUCCESS;
×
891
  }
892

893
  if (STREAM_TRIGGER_TASK == pTask->type && mstWaitLock(&pTask->detailStatusLock, true)) {
28,241!
894
    if (pTask->detailStatus) {
4,439✔
895
      SSTriggerRuntimeStatus* pTrigger = (SSTriggerRuntimeStatus*)pTask->detailStatus;
3,225✔
896
      snprintf(tmpBuf, sizeof(tmpBuf), "Current RT/HI/RE session num: %d/%d/%d, histroy progress:%d%%, total AUTO/USER recalc num: %d/%d", 
3,225✔
897
          pTrigger->realtimeSessionNum, pTrigger->historySessionNum, pTrigger->recalcSessionNum, pTrigger->histroyProgress,
898
          pTrigger->autoRecalcNum, (int32_t)taosArrayGetSize(pTrigger->userRecalcs));
3,225✔
899
      taosRUnLockLatch(&pTask->detailStatusLock);
3,225✔
900
      return TSDB_CODE_SUCCESS;
3,225✔
901
    }
902

903
    taosRUnLockLatch(&pTask->detailStatusLock);    
1,214✔
904
  }
905
  
906
  STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
25,016✔
907
  
908
  return TSDB_CODE_SUCCESS;
25,016✔
909
}
910

911
int32_t mstGetTaskExtraStr(SStmTaskStatus* pTask, char* extraStr, int32_t extraSize) {
28,241✔
912
  switch (pTask->type) {
28,241✔
913
    case STREAM_READER_TASK:
9,849✔
914
      if (STREAM_IS_TRIGGER_READER(pTask->flags)) {
9,849✔
915
        STR_WITH_MAXSIZE_TO_VARSTR(extraStr, "trigReader", extraSize);
5,395✔
916
      } else {
917
        STR_WITH_MAXSIZE_TO_VARSTR(extraStr, "calcReader", extraSize);
4,454✔
918
      }
919
      return TSDB_CODE_SUCCESS;
9,849✔
920
    case STREAM_RUNNER_TASK:
13,953✔
921
      if (STREAM_IS_TOP_RUNNER(pTask->flags)) {
13,953✔
922
        STR_WITH_MAXSIZE_TO_VARSTR(extraStr, "topRunner", extraSize);
13,293✔
923
        return TSDB_CODE_SUCCESS;
13,293✔
924
      }
925
      break;
660✔
926
    default:
4,439✔
927
      break;
4,439✔
928
  }
929

930
  STR_WITH_MAXSIZE_TO_VARSTR(extraStr, "", extraSize);
5,099✔
931
  return TSDB_CODE_SUCCESS;
5,099✔
932
}
933

934

935
int32_t mstSetStreamTaskResBlock(SStreamObj* pStream, SStmTaskStatus* pTask, SSDataBlock* pBlock, int32_t numOfRows) {
28,241✔
936
  int32_t code = 0;
28,241✔
937
  int32_t cols = 0;
28,241✔
938
  int32_t lino = 0;
28,241✔
939

940
  // stream_name
941
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
28,241✔
942
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetStableStr(pStream->name), sizeof(streamName));
28,241✔
943
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
28,241✔
944
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
28,241!
945

946
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamName, false);
28,241✔
947
  TSDB_CHECK_CODE(code, lino, _end);
28,241!
948

949
  // stream id
950
  char idstr[19 + VARSTR_HEADER_SIZE] = {0};
28,241✔
951
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pStream->pCreate->streamId);
28,241✔
952
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE])); 
28,241✔
953
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
28,241✔
954
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
28,241!
955
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
28,241✔
956
  TSDB_CHECK_CODE(code, lino, _end);
28,241!
957

958
  // task id
959
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pTask->id.taskId);
28,241✔
960
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
28,241✔
961
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
28,241✔
962
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
28,241!
963
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
28,241✔
964
  TSDB_CHECK_CODE(code, lino, _end);
28,241!
965

966
  // type
967
  char type[20 + VARSTR_HEADER_SIZE] = {0};
28,241✔
968
  STR_WITH_MAXSIZE_TO_VARSTR(type, (STREAM_READER_TASK == pTask->type) ? "Reader" : ((STREAM_TRIGGER_TASK == pTask->type) ? "Trigger" : "Runner"), sizeof(type));
28,241✔
969
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
28,241✔
970
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
28,241!
971
  code = colDataSetVal(pColInfo, numOfRows, (const char*)type, false);
28,241✔
972
  TSDB_CHECK_CODE(code, lino, _end);
28,241!
973

974
  // serious id
975
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pTask->id.seriousId);
28,241✔
976
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
28,241✔
977
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
28,241✔
978
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
28,241!
979
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
28,241✔
980
  TSDB_CHECK_CODE(code, lino, _end);
28,241!
981

982
  // deploy id
983
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
28,241✔
984
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
28,241!
985
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->id.deployId, false);
28,241✔
986
  TSDB_CHECK_CODE(code, lino, _end);
28,241!
987

988
  // node_type
989
  char nodeType[10 + VARSTR_HEADER_SIZE] = {0};
28,241✔
990
  STR_WITH_MAXSIZE_TO_VARSTR(nodeType, (STREAM_READER_TASK == pTask->type) ? "vnode" : "snode", sizeof(nodeType));
28,241✔
991
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
28,241✔
992
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
28,241!
993
  code = colDataSetVal(pColInfo, numOfRows, (const char*)nodeType, false);
28,241✔
994
  TSDB_CHECK_CODE(code, lino, _end);
28,241!
995

996
  // node id
997
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
28,241✔
998
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
28,241!
999
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->id.nodeId, false);
28,241✔
1000
  TSDB_CHECK_CODE(code, lino, _end);
28,241!
1001

1002
  // task idx
1003
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
28,241✔
1004
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
28,241!
1005
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->id.taskIdx, false);
28,241✔
1006
  TSDB_CHECK_CODE(code, lino, _end);
28,241!
1007

1008
  // status
1009
  char   status[20 + VARSTR_HEADER_SIZE] = {0};
28,241✔
1010
  char msg[TSDB_RESERVE_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
28,241✔
1011
  code = mstGetTaskStatusStr(pTask, status, sizeof(status), msg, sizeof(msg));
28,241✔
1012
  TSDB_CHECK_CODE(code, lino, _end);
28,241!
1013

1014
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
28,241✔
1015
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
28,241!
1016
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&status, false);
28,241✔
1017
  TSDB_CHECK_CODE(code, lino, _end);
28,241!
1018

1019
  // start time
1020
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
28,241✔
1021
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
28,241!
1022
  if (pTask->runningStartTs) {
28,241✔
1023
    code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->runningStartTs, false);
5,968✔
1024
  } else {
1025
    code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->runningStartTs, true);
22,273✔
1026
  }
1027
  TSDB_CHECK_CODE(code, lino, _end);
28,241!
1028

1029
  // last update
1030
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
28,241✔
1031
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
28,241!
1032
  if (pTask->lastUpTs) {
28,241!
1033
    code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->lastUpTs, false);
28,241✔
1034
  } else {
1035
    code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->lastUpTs, true);
×
1036
  }
1037
  TSDB_CHECK_CODE(code, lino, _end);
28,241!
1038

1039
  // extra info
1040
  char extra[64 + VARSTR_HEADER_SIZE] = {0};
28,241✔
1041
  code = mstGetTaskExtraStr(pTask, extra, sizeof(extra));
28,241✔
1042
  TSDB_CHECK_CODE(code, lino, _end);
28,241!
1043
  
1044
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
28,241✔
1045
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
28,241!
1046
  code = colDataSetVal(pColInfo, numOfRows, (const char*)extra, false);
28,241✔
1047
  TSDB_CHECK_CODE(code, lino, _end);
28,241!
1048

1049
  // msg
1050
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
28,241✔
1051
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
28,241!
1052
  code = colDataSetVal(pColInfo, numOfRows, (const char*)msg, false);
28,241✔
1053

1054
_end:
28,241✔
1055
  if (code) {
28,241!
1056
    mError("error happens when build stream attr result block, lino:%d, code:%s", lino, tstrerror(code));
×
1057
  }
1058
  return code;
28,241✔
1059
}
1060

1061
int32_t mstGetNumOfStreamTasks(SStmStatus* pStatus) {
4,443✔
1062
  int32_t num = taosArrayGetSize(pStatus->trigReaders) + taosArrayGetSize(pStatus->trigOReaders) + MST_LIST_SIZE(pStatus->calcReaders) + (pStatus->triggerTask ? 1 : 0);
4,443!
1063
  for (int32_t i = 0; i < MND_STREAM_RUNNER_DEPLOY_NUM; ++i) {
17,772✔
1064
    num += taosArrayGetSize(pStatus->runners[i]);
13,329✔
1065
  }
1066

1067
  return num;
4,443✔
1068
}
1069

1070
int32_t mstSetStreamTasksResBlock(SStreamObj* pStream, SSDataBlock* pBlock, int32_t* numOfRows, int32_t rowsCapacity) {
4,898✔
1071
  int32_t code = 0;
4,898✔
1072
  int32_t lino = 0;
4,898✔
1073
  int64_t streamId = pStream->pCreate->streamId;
4,898✔
1074

1075
  (void)mstWaitLock(&mStreamMgmt.runtimeLock, true);
4,898✔
1076

1077
  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
4,898✔
1078
  if (NULL == pStatus) {
4,898✔
1079
    mstsDebug("stream not in streamMap, ignore it, dropped:%d, stopped:%d", atomic_load_8(&pStream->userDropped), atomic_load_8(&pStream->userStopped));
454✔
1080
    goto _exit;
454✔
1081
  }
1082

1083
  int8_t stopped = atomic_load_8(&pStatus->stopped);
4,444✔
1084
  if (stopped) {
4,444✔
1085
    mstsDebug("stream stopped %d, ignore it", stopped);
1!
1086
    goto _exit;
1✔
1087
  }
1088
  
1089
  int32_t count = mstGetNumOfStreamTasks(pStatus);
4,443✔
1090

1091
  if (*numOfRows + count > rowsCapacity) {
4,443✔
1092
    code = blockDataEnsureCapacity(pBlock, *numOfRows + count);
81✔
1093
    if (code) {
81!
1094
      mstError("failed to prepare the result block buffer, rows:%d", *numOfRows + count);
×
1095
      TAOS_CHECK_EXIT(code);
×
1096
    }
1097
  }
1098

1099
  SStmTaskStatus* pTask = NULL;
4,443✔
1100
  int32_t trigReaderNum = taosArrayGetSize(pStatus->trigReaders);
4,443✔
1101
  for (int32_t i = 0; i < trigReaderNum; ++i) {
9,779✔
1102
    pTask = taosArrayGet(pStatus->trigReaders, i);
5,336✔
1103
  
1104
    code = mstSetStreamTaskResBlock(pStream, pTask, pBlock, *numOfRows);
5,336✔
1105
    if (code == TSDB_CODE_SUCCESS) {
5,336!
1106
      (*numOfRows)++;
5,336✔
1107
    }
1108
  }
1109

1110
  trigReaderNum = taosArrayGetSize(pStatus->trigOReaders);
4,443✔
1111
  for (int32_t i = 0; i < trigReaderNum; ++i) {
4,502✔
1112
    pTask = taosArrayGet(pStatus->trigOReaders, i);
59✔
1113
  
1114
    code = mstSetStreamTaskResBlock(pStream, pTask, pBlock, *numOfRows);
59✔
1115
    if (code == TSDB_CODE_SUCCESS) {
59!
1116
      (*numOfRows)++;
59✔
1117
    }
1118
  }
1119

1120

1121
  int32_t calcReaderNum = MST_LIST_SIZE(pStatus->calcReaders);
4,443!
1122
  SListNode* pNode = listHead(pStatus->calcReaders);
4,443✔
1123
  for (int32_t i = 0; i < calcReaderNum; ++i) {
8,897✔
1124
    pTask = (SStmTaskStatus*)pNode->data;
4,454✔
1125
  
1126
    code = mstSetStreamTaskResBlock(pStream, pTask, pBlock, *numOfRows);
4,454✔
1127
    if (code == TSDB_CODE_SUCCESS) {
4,454!
1128
      (*numOfRows)++;
4,454✔
1129
    }
1130
    pNode = TD_DLIST_NODE_NEXT(pNode);
4,454✔
1131
  }
1132

1133
  if (pStatus->triggerTask) {
4,443✔
1134
    code = mstSetStreamTaskResBlock(pStream, pStatus->triggerTask, pBlock, *numOfRows);
4,439✔
1135
    if (code == TSDB_CODE_SUCCESS) {
4,439!
1136
      (*numOfRows)++;
4,439✔
1137
    }
1138
  }
1139

1140
  int32_t runnerNum = 0;
4,443✔
1141
  for (int32_t i = 0; i < MND_STREAM_RUNNER_DEPLOY_NUM; ++i) {
17,772✔
1142
    runnerNum = taosArrayGetSize(pStatus->runners[i]);
13,329✔
1143
    for (int32_t m = 0; m < runnerNum; ++m) {
27,282✔
1144
      pTask = taosArrayGet(pStatus->runners[i], m);
13,953✔
1145
    
1146
      code = mstSetStreamTaskResBlock(pStream, pTask, pBlock, *numOfRows);
13,953✔
1147
      if (code == TSDB_CODE_SUCCESS) {
13,953!
1148
        (*numOfRows)++;
13,953✔
1149
      }
1150
    }
1151
  }
1152
  
1153
  pBlock->info.rows = *numOfRows;
4,443✔
1154

1155
_exit:
4,898✔
1156
  
1157
  taosRUnLockLatch(&mStreamMgmt.runtimeLock);
4,898✔
1158

1159
  if (code) {
4,898!
1160
    mError("error happens when build stream tasks result block, lino:%d, code:%s", lino, tstrerror(code));
×
1161
  }
1162
  
1163
  return code;
4,898✔
1164
}
1165

1166

1167
int32_t mstAppendNewRecalcRange(int64_t streamId, SStmStatus *pStream, STimeWindow* pRange) {
17✔
1168
  int32_t code = 0;
17✔
1169
  int32_t lino = 0;
17✔
1170
  bool    locked = false;
17✔
1171
  SArray* userRecalcList = NULL;
17✔
1172

1173
  SStreamRecalcReq req = {.recalcId = 0, .start = pRange->skey, .end = pRange->ekey};
17✔
1174
  TAOS_CHECK_EXIT(taosGetSystemUUIDU64(&req.recalcId));
17!
1175
  
1176
  taosWLockLatch(&pStream->userRecalcLock);
17✔
1177
  locked = true;
17✔
1178
  
1179
  if (NULL == pStream->userRecalcList) {
17!
1180
    userRecalcList = taosArrayInit(2, sizeof(SStreamRecalcReq));
17✔
1181
    if (NULL == userRecalcList) {
17!
1182
      TAOS_CHECK_EXIT(terrno);
×
1183
    }
1184

1185
    TSDB_CHECK_NULL(taosArrayPush(userRecalcList, &req), code, lino, _exit, terrno);
17!
1186

1187
    atomic_store_ptr(&pStream->userRecalcList, userRecalcList);
17✔
1188
    userRecalcList = NULL;    
17✔
1189
  } else {
1190
    TSDB_CHECK_NULL(taosArrayPush(pStream->userRecalcList, &req), code, lino, _exit, terrno);
×
1191
  }
1192
  
1193
  mstsInfo("stream recalc ID:%" PRIx64 " range:%" PRId64 " - %" PRId64 " added", req.recalcId, pRange->skey, pRange->ekey);
17!
1194

1195
_exit:
×
1196

1197
  taosArrayDestroy(userRecalcList);
17✔
1198

1199
  if (locked) {
17!
1200
    taosWUnLockLatch(&pStream->userRecalcLock);
17✔
1201
  }
1202
  
1203
  if (code) {
17!
1204
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
1205
  }
1206
  
1207
  return code;
17✔
1208
}
1209

1210

1211

1212
int32_t mstSetStreamRecalculateResBlock(SStreamObj* pStream, SSTriggerRecalcProgress* pProgress, SSDataBlock* pBlock, int32_t numOfRows) {
×
1213
  int32_t code = 0;
×
1214
  int32_t cols = 0;
×
1215
  int32_t lino = 0;
×
1216

1217
  // stream_name
1218
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
×
1219
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetStableStr(pStream->name), sizeof(streamName));
×
1220
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
1221
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
×
1222

1223
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamName, false);
×
1224
  TSDB_CHECK_CODE(code, lino, _end);
×
1225

1226
  // stream id
1227
  char idstr[19 + VARSTR_HEADER_SIZE] = {0};
×
1228
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pStream->pCreate->streamId);
×
1229
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]) + VARSTR_HEADER_SIZE); 
×
1230
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
1231
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
×
1232
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
×
1233
  TSDB_CHECK_CODE(code, lino, _end);
×
1234

1235
  // recalc id
1236
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pProgress->recalcId);
×
1237
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]) + VARSTR_HEADER_SIZE);
×
1238
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
1239
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
×
1240
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
×
1241
  TSDB_CHECK_CODE(code, lino, _end);
×
1242

1243
  // start
1244
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
1245
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
×
1246
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pProgress->start, false);
×
1247
  TSDB_CHECK_CODE(code, lino, _end);
×
1248

1249
  // end
1250
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
1251
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
×
1252
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pProgress->end, false);
×
1253
  TSDB_CHECK_CODE(code, lino, _end);
×
1254

1255
  // progress
1256
  char progress[20 + VARSTR_HEADER_SIZE] = {0};
×
1257
  snprintf(&progress[VARSTR_HEADER_SIZE], sizeof(progress) - VARSTR_HEADER_SIZE, "%d%%", pProgress->progress);
×
1258
  varDataSetLen(progress, strlen(&progress[VARSTR_HEADER_SIZE]) + VARSTR_HEADER_SIZE);
×
1259
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
1260
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
×
1261
  code = colDataSetVal(pColInfo, numOfRows, (const char*)progress, false);
×
1262
  TSDB_CHECK_CODE(code, lino, _end);
×
1263

1264
_end:
×
1265
  if (code) {
×
1266
    mError("error happens when build stream attr result block, lino:%d, code:%s", lino, tstrerror(code));
×
1267
  }
1268
  return code;
×
1269
}
1270

1271

1272
int32_t mstSetStreamRecalculatesResBlock(SStreamObj* pStream, SSDataBlock* pBlock, int32_t* numOfRows, int32_t rowsCapacity) {
1✔
1273
  int32_t code = 0;
1✔
1274
  int32_t lino = 0;
1✔
1275
  int64_t streamId = pStream->pCreate->streamId;
1✔
1276

1277
  (void)mstWaitLock(&mStreamMgmt.runtimeLock, true);
1✔
1278

1279
  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
1✔
1280
  if (NULL == pStatus) {
1!
1281
    mstsDebug("stream not in streamMap, ignore it, dropped:%d, stopped:%d", atomic_load_8(&pStream->userDropped), atomic_load_8(&pStream->userStopped));
×
1282
    goto _exit;
×
1283
  }
1284

1285
  int8_t stopped = atomic_load_8(&pStatus->stopped);
1✔
1286
  if (stopped) {
1!
1287
    mstsDebug("stream stopped %d, ignore it", stopped);
×
1288
    goto _exit;
×
1289
  }
1290

1291
  if (NULL == pStatus->triggerTask) {
1!
1292
    mstsDebug("no trigger task now, deployTimes:%" PRId64 ", ignore it", pStatus->deployTimes);
×
1293
    goto _exit;
×
1294
  }
1295

1296
  (void)mstWaitLock(&pStatus->triggerTask->detailStatusLock, true);
1✔
1297
  if (NULL == pStatus->triggerTask->detailStatus) {
1!
1298
    mstsDebug("no trigger task now, deployTimes:%" PRId64 ", ignore it", pStatus->deployTimes);
×
1299
    taosRUnLockLatch(&pStatus->triggerTask->detailStatusLock);
×
1300
    goto _exit;
×
1301
  }
1302

1303
  SSTriggerRuntimeStatus* pTrigger = (SSTriggerRuntimeStatus*)pStatus->triggerTask->detailStatus;
1✔
1304
  int32_t count = taosArrayGetSize(pTrigger->userRecalcs);
1✔
1305

1306
  if (*numOfRows + count > rowsCapacity) {
1!
1307
    code = blockDataEnsureCapacity(pBlock, *numOfRows + count);
×
1308
    if (code) {
×
1309
      mstError("failed to prepare the result block buffer, rows:%d", *numOfRows + count);
×
1310
      taosRUnLockLatch(&pStatus->triggerTask->detailStatusLock);
×
1311
      TAOS_CHECK_EXIT(code);
×
1312
    }
1313
  }
1314

1315
  for (int32_t i = 0; i < count; ++i) {
1!
1316
    SSTriggerRecalcProgress* pProgress = taosArrayGet(pTrigger->userRecalcs, i);
×
1317
  
1318
    code = mstSetStreamRecalculateResBlock(pStream, pProgress, pBlock, *numOfRows);
×
1319
    if (code == TSDB_CODE_SUCCESS) {
×
1320
      (*numOfRows)++;
×
1321
    }
1322
  }
1323

1324
  taosRUnLockLatch(&pStatus->triggerTask->detailStatusLock);
1✔
1325
  
1326
  pBlock->info.rows = *numOfRows;
1✔
1327

1328
_exit:
1✔
1329
  
1330
  taosRUnLockLatch(&mStreamMgmt.runtimeLock);
1✔
1331

1332
  if (code) {
1!
1333
    mError("error happens when build stream recalculates result block, lino:%d, code:%s", lino, tstrerror(code));
×
1334
  }
1335
  
1336
  return code;
1✔
1337
}
1338

1339
int32_t mstGetScanUidFromPlan(int64_t streamId, void* scanPlan, int64_t* uid) {
390✔
1340
  SSubplan* pSubplan = NULL;
390✔
1341
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
390✔
1342
  
1343
  TAOS_CHECK_EXIT(nodesStringToNode(scanPlan, (SNode**)&pSubplan));
390!
1344

1345
  if (pSubplan->pNode && nodeType(pSubplan->pNode) == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
390!
1346
    SScanPhysiNode* pScanNode = (SScanPhysiNode*)pSubplan->pNode;
283✔
1347
    *uid = pScanNode->uid;
283✔
1348
  }
1349
  
1350
_exit:
107✔
1351

1352
  if (code) {
390!
1353
    mstsError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1354
  }
1355

1356
  nodesDestroyNode((SNode *)pSubplan);
390✔
1357

1358
  return code;
390✔
1359
}
1360

1361

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc