• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4661

08 Aug 2025 08:36AM UTC coverage: 59.883% (-0.2%) from 60.053%
#4661

push

travis-ci

web-flow
test: update cases desc (#32498)

137331 of 291923 branches covered (47.04%)

Branch coverage included in aggregate %.

207730 of 284307 relevant lines covered (73.07%)

4552406.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.55
/source/dnode/mnode/impl/src/mndStreamUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndDb.h"
17
#include "mndStb.h"
18
#include "mndStream.h"
19
#include "mndTrans.h"
20
#include "mndVgroup.h"
21
#include "taoserror.h"
22
#include "tmisce.h"
23
#include "mndSnode.h"
24

25
bool mstWaitLock(SRWLatch* pLock, bool readLock) {
40,850✔
26
  if (readLock) {
40,850✔
27
    while (taosRTryLockLatch(pLock)) {
38,534!
28
      taosMsleep(1);
×
29
    }
30

31
    return true;
38,534✔
32
  }
33

34
  while (taosWTryLockLatch(pLock)) {
2,546✔
35
    taosMsleep(1);
230✔
36
  }
37

38
  return true;
2,316✔
39
}
40

41
void mndStreamDestroySStreamMgmtRsp(SStreamMgmtRsp* p) {
×
42
  taosArrayDestroy(p->cont.vgIds);
×
43
  taosArrayDestroy(p->cont.readerList);
×
44
}
×
45

46
void mstDestroySStmVgStreamStatus(void* p) { 
414✔
47
  SStmVgStreamStatus* pStatus = (SStmVgStreamStatus*)p;
414✔
48
  taosArrayDestroy(pStatus->trigReaders); 
414✔
49
  taosArrayDestroy(pStatus->calcReaders); 
414✔
50
}
414✔
51

52
void mstDestroySStmSnodeStreamStatus(void* p) { 
303✔
53
  SStmSnodeStreamStatus* pStatus = (SStmSnodeStreamStatus*)p;
303✔
54
  for (int32_t i = 0; i < MND_STREAM_RUNNER_DEPLOY_NUM; ++i) {
1,212✔
55
    taosArrayDestroy(pStatus->runners[i]);
909✔
56
    pStatus->runners[i] = NULL;
909✔
57
  }
58
}
303✔
59

60

61
void mstDestroyVgroupStatus(SStmVgroupStatus* pVgStatus) {
×
62
  taosHashCleanup(pVgStatus->streamTasks);
×
63
  pVgStatus->streamTasks = NULL;
×
64
}
×
65

66
void mstDestroySStmTaskToDeployExt(void* param) {
1,566✔
67
  SStmTaskToDeployExt* pExt = (SStmTaskToDeployExt*)param;
1,566✔
68
  if (pExt->deployed) {
1,566✔
69
    return;
1,564✔
70
  }
71
  
72
  switch (pExt->deploy.task.type) {
2!
73
    case STREAM_TRIGGER_TASK:
×
74
      taosArrayDestroy(pExt->deploy.msg.trigger.readerList);
×
75
      pExt->deploy.msg.trigger.readerList = NULL;
×
76
      taosArrayDestroy(pExt->deploy.msg.trigger.runnerList);
×
77
      pExt->deploy.msg.trigger.runnerList = NULL;
×
78
      break;
×
79
    case STREAM_RUNNER_TASK:
×
80
      taosMemoryFreeClear(pExt->deploy.msg.runner.pPlan);
×
81
      break;
×
82
    default:  
2✔
83
      break;;
2✔
84
  }
85
}
86

87
void mstDestroyScanAddrList(void* param) {
266✔
88
  if (NULL == param) {
266!
89
    return;
×
90
  }
91
  SArray* pList = *(SArray**)param;
266✔
92
  taosArrayDestroy(pList);
266✔
93
}
94

95
void mstDestroySStmSnodeTasksDeploy(void* param) {
101✔
96
  SStmSnodeTasksDeploy* pSnode = (SStmSnodeTasksDeploy*)param;
101✔
97
  taosArrayDestroyEx(pSnode->triggerList, mstDestroySStmTaskToDeployExt);
101✔
98
  taosArrayDestroyEx(pSnode->runnerList, mstDestroySStmTaskToDeployExt);
101✔
99
}
101✔
100

101
void mstDestroySStmVgTasksToDeploy(void* param) {
201✔
102
  SStmVgTasksToDeploy* pVg = (SStmVgTasksToDeploy*)param;
201✔
103
  taosArrayDestroyEx(pVg->taskList, mstDestroySStmTaskToDeployExt);
201✔
104
}
201✔
105

106
void mstDestroySStmSnodeStatus(void* param) {
85✔
107
  SStmSnodeStatus* pSnode = (SStmSnodeStatus*)param;
85✔
108
  taosHashCleanup(pSnode->streamTasks);
85✔
109
}
85✔
110

111
void mstDestroySStmVgroupStatus(void* param) {
167✔
112
  SStmVgroupStatus* pVg = (SStmVgroupStatus*)param;
167✔
113
  taosHashCleanup(pVg->streamTasks);
167✔
114
}
167✔
115

116
void mstResetSStmStatus(SStmStatus* pStatus) {
264✔
117
  taosArrayDestroy(pStatus->trigReaders);
264✔
118
  pStatus->trigReaders = NULL;
264✔
119
  taosArrayDestroy(pStatus->trigOReaders);
264✔
120
  pStatus->trigOReaders = NULL;
264✔
121
  taosArrayDestroy(pStatus->calcReaders);
264✔
122
  pStatus->calcReaders = NULL;
264✔
123
  if (pStatus->triggerTask) {
264!
124
    mstWaitLock(&pStatus->triggerTask->detailStatusLock, false);
264✔
125
    taosMemoryFreeClear(pStatus->triggerTask->detailStatus);
264!
126
    taosWUnLockLatch(&pStatus->triggerTask->detailStatusLock);
264✔
127
  }
128
  taosMemoryFreeClear(pStatus->triggerTask);
264!
129
  for (int32_t i = 0; i < MND_STREAM_RUNNER_DEPLOY_NUM; ++i) {
1,056✔
130
    taosArrayDestroy(pStatus->runners[i]);
792✔
131
    pStatus->runners[i] = NULL;
792✔
132
  }
133
}
264✔
134

135
void mstDestroySStmStatus(void* param) {
238✔
136
  SStmStatus* pStatus = (SStmStatus*)param;
238✔
137
  taosMemoryFreeClear(pStatus->streamName);
238!
138

139
  mstResetSStmStatus(pStatus);
238✔
140

141
  taosWLockLatch(&pStatus->userRecalcLock);
238✔
142
  taosArrayDestroy(pStatus->userRecalcList);
238✔
143
  taosWUnLockLatch(&pStatus->userRecalcLock);
238✔
144

145
  tFreeSCMCreateStreamReq(pStatus->pCreate);
238✔
146
  taosMemoryFreeClear(pStatus->pCreate);  
238!
147
}
238✔
148

149
void mstDestroySStmAction(void* param) {
367✔
150
  SStmAction* pAction = (SStmAction*)param;
367✔
151

152
  taosArrayDestroy(pAction->undeploy.taskList);
367✔
153
  taosArrayDestroy(pAction->recalc.recalcList);
367✔
154
}
367✔
155

156
void mstClearSStmStreamDeploy(SStmStreamDeploy* pDeploy) {
361✔
157
  pDeploy->readerTasks = NULL;
361✔
158
  pDeploy->triggerTask = NULL;
361✔
159
  pDeploy->runnerTasks = NULL;
361✔
160
}
361✔
161

162
int32_t mstIsStreamDropped(SMnode *pMnode, int64_t streamId, bool* dropped) {
156✔
163
  SSdb   *pSdb = pMnode->pSdb;
156✔
164
  void   *pIter = NULL;
156✔
165
  
166
  while (1) {
271✔
167
    SStreamObj *pStream = NULL;
427✔
168
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
427✔
169
    if (pIter == NULL) break;
427✔
170

171
    if (pStream->pCreate->streamId == streamId) {
395✔
172
      *dropped = pStream->userDropped ? true : false;
124✔
173
      sdbRelease(pSdb, pStream);
124✔
174
      sdbCancelFetch(pSdb, pIter);
124✔
175
      mstsDebug("stream found, dropped:%d", *dropped);
124!
176
      return TSDB_CODE_SUCCESS;
124✔
177
    }
178
    
179
    sdbRelease(pSdb, pStream);
271✔
180
  }
181

182
  *dropped = true;
32✔
183

184
  return TSDB_CODE_SUCCESS;
32✔
185
}
186

187
typedef struct SStmCheckDbInUseCtx {
188
  bool* dbStream;
189
  bool* vtableStream;
190
  bool  ignoreCurrDb;
191
} SStmCheckDbInUseCtx;
192

193
static bool mstChkSetDbInUse(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
×
194
  SStreamObj *pStream = pObj;
×
195
  if (atomic_load_8(&pStream->userDropped)) {
×
196
    return true;
×
197
  }
198

199
  SStmCheckDbInUseCtx* pCtx = (SStmCheckDbInUseCtx*)p2;
×
200
  if (pCtx->ignoreCurrDb && 0 == strcmp(pStream->pCreate->streamDB, p1)) {
×
201
    return true;
×
202
  }
203
  
204
  if (pStream->pCreate->triggerDB && 0 == strcmp(pStream->pCreate->triggerDB, p1)) {
×
205
    *pCtx->dbStream = true;
×
206
    return false;
×
207
  }
208

209
  int32_t calcDBNum = taosArrayGetSize(pStream->pCreate->calcDB);
×
210
  for (int32_t i = 0; i < calcDBNum; ++i) {
×
211
    char* calcDB = taosArrayGetP(pStream->pCreate->calcDB, i);
×
212
    if (0 == strcmp(calcDB, p1)) {
×
213
      *pCtx->dbStream = true;
×
214
      return false;
×
215
    }
216
  }
217

218
  if (pStream->pCreate->vtableCalc || STREAM_IS_VIRTUAL_TABLE(pStream->pCreate->triggerTblType, pStream->pCreate->flags)) {
×
219
    *pCtx->vtableStream = true;
×
220
    return true;
×
221
  }
222
  
223
  return true;
×
224
}
225

226
void mstCheckDbInUse(SMnode *pMnode, char *dbFName, bool *dbStream, bool *vtableStream, bool ignoreCurrDb) {
1,772✔
227
  int32_t streamNum = sdbGetSize(pMnode->pSdb, SDB_STREAM);
1,772✔
228
  if (streamNum <= 0) {
1,772!
229
    return;
1,772✔
230
  }
231

232
  SStmCheckDbInUseCtx ctx = {dbStream, vtableStream, ignoreCurrDb};
×
233
  sdbTraverse(pMnode->pSdb, SDB_STREAM, mstChkSetDbInUse, dbFName, &ctx, NULL);
×
234
}
235

236
static void mstShowStreamStatus(char *dst, int8_t status, int32_t bufLen) {
×
237
  if (status == STREAM_STATUS_INIT) {
×
238
    tstrncpy(dst, "init", bufLen);
×
239
  } else if (status == STREAM_STATUS_RUNNING) {
×
240
    tstrncpy(dst, "running", bufLen);
×
241
  } else if (status == STREAM_STATUS_STOPPED) {
×
242
    tstrncpy(dst, "stopped", bufLen);
×
243
  } else if (status == STREAM_STATUS_FAILED) {
×
244
    tstrncpy(dst, "failed", bufLen);
×
245
  }
246
}
×
247

248
int32_t mstCheckSnodeExists(SMnode *pMnode) {
×
249
  SSdb      *pSdb = pMnode->pSdb;
×
250
  void      *pIter = NULL;
×
251
  SSnodeObj *pObj = NULL;
×
252

253
  while (1) {
254
    pIter = sdbFetch(pSdb, SDB_SNODE, pIter, (void **)&pObj);
×
255
    if (pIter == NULL) {
×
256
      break;
×
257
    }
258

259
    sdbRelease(pSdb, pObj);
×
260
    sdbCancelFetch(pSdb, pIter);
×
261
    return TSDB_CODE_SUCCESS;
×
262
  }
263

264
  return TSDB_CODE_SNODE_NOT_DEPLOYED;
×
265
}
266

267
void mstSetTaskStatusFromMsg(SStmGrpCtx* pCtx, SStmTaskStatus* pTask, SStmTaskStatusMsg* pMsg) {
×
268
  pTask->id.taskId = pMsg->taskId;
×
269
  pTask->id.deployId = pMsg->deployId;
×
270
  pTask->id.seriousId = pMsg->seriousId;
×
271
  pTask->id.nodeId = pMsg->nodeId;
×
272
  pTask->id.taskIdx = pMsg->taskIdx;
×
273

274
  pTask->type = pMsg->type;
×
275
  pTask->flags = pMsg->flags;
×
276
  pTask->status = pMsg->status;
×
277
  pTask->lastUpTs = pCtx->currTs;
×
278
}
×
279

280
bool mndStreamActionDequeue(SStmActionQ* pQueue, SStmQNode **param) {
2,638✔
281
  while (0 == atomic_load_64(&pQueue->qRemainNum)) {
2,638✔
282
    return false;
2,338✔
283
  }
284

285
  SStmQNode *orig = pQueue->head;
300✔
286

287
  SStmQNode *node = pQueue->head->next;
300✔
288
  pQueue->head = pQueue->head->next;
300✔
289

290
  *param = node;
300✔
291

292
  taosMemoryFreeClear(orig);
300!
293

294
  atomic_sub_fetch_64(&pQueue->qRemainNum, 1);
300✔
295

296
  return true;
300✔
297
}
298

299
void mndStreamActionEnqueue(SStmActionQ* pQueue, SStmQNode* param) {
300✔
300
  taosWLockLatch(&pQueue->lock);
300✔
301
  pQueue->tail->next = param;
300✔
302
  pQueue->tail = param;
300✔
303
  taosWUnLockLatch(&pQueue->lock);
300✔
304

305
  atomic_add_fetch_64(&pQueue->qRemainNum, 1);
300✔
306
}
300✔
307

308
char* mstGetStreamActionString(int32_t action) {
175✔
309
  switch (action) {
175!
310
    case STREAM_ACT_DEPLOY:
175✔
311
      return "DEPLOY";
175✔
312
    case STREAM_ACT_UNDEPLOY:
×
313
      return "UNDEPLOY";
×
314
    case STREAM_ACT_START:
×
315
      return "START";
×
316
    case STREAM_ACT_UPDATE_TRIGGER:
×
317
      return "UPDATE TRIGGER";
×
318
    case STREAM_ACT_RECALC:
×
319
      return "USER RECALC";
×
320
    default:
×
321
      break;
×
322
  }
323

324
  return "UNKNOWN";
×
325
}
326

327
void mstPostStreamAction(SStmActionQ*       actionQ, int64_t streamId, char* streamName, void* param, bool userAction, int32_t action) {
269✔
328
  SStmQNode *pNode = taosMemoryMalloc(sizeof(SStmQNode));
269!
329
  if (NULL == pNode) {
269!
330
    taosMemoryFreeClear(param);
×
331
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, __LINE__, tstrerror(terrno));
×
332
    return;
×
333
  }
334

335
  pNode->type = action;
269✔
336
  pNode->streamAct = true;
269✔
337
  pNode->action.stream.streamId = streamId;
269✔
338
  TAOS_STRCPY(pNode->action.stream.streamName, streamName);
269✔
339
  pNode->action.stream.userAction = userAction;
269✔
340
  pNode->action.stream.actionParam = param;
269✔
341
  
342
  pNode->next = NULL;
269✔
343

344
  mndStreamActionEnqueue(actionQ, pNode);
269✔
345

346
  mstsDebug("stream action %s posted enqueue", mstGetStreamActionString(action));
269✔
347
}
348

349
void mstPostTaskAction(SStmActionQ*        actionQ, SStmTaskAction* pAction, int32_t action) {
31✔
350
  SStmQNode *pNode = taosMemoryMalloc(sizeof(SStmQNode));
31!
351
  if (NULL == pNode) {
31!
352
    int64_t streamId = pAction->streamId;
×
353
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, __LINE__, tstrerror(terrno));
×
354
    return;
×
355
  }
356

357
  pNode->type = action;
31✔
358
  pNode->streamAct = false;
31✔
359
  pNode->action.task = *pAction;
31✔
360
  
361
  pNode->next = NULL;
31✔
362

363
  mndStreamActionEnqueue(actionQ, pNode);
31✔
364
}
365

366
void mstDestroyDbVgroupsHash(SSHashObj *pDbVgs) {
43✔
367
  int32_t iter = 0;
43✔
368
  SDBVgHashInfo* pVg = NULL;
43✔
369
  void* p = NULL;
43✔
370
  while (NULL != (p = tSimpleHashIterate(pDbVgs, p, &iter))) {
323✔
371
    pVg = (SDBVgHashInfo*)p;
280✔
372
    taosArrayDestroy(pVg->vgArray);
280✔
373
  }
374
  
375
  tSimpleHashCleanup(pDbVgs);
43✔
376
}
43✔
377

378

379
int32_t mstBuildDBVgroupsMap(SMnode* pMnode, SSHashObj** ppRes) {
43✔
380
  void*   pIter = NULL;
43✔
381
  int32_t code = TSDB_CODE_SUCCESS;
43✔
382
  int32_t lino = 0;
43✔
383
  SArray* pTarget = NULL;
43✔
384
  SArray* pNew = NULL;
43✔
385
  SDbObj* pDb = NULL;
43✔
386
  SDBVgHashInfo dbInfo = {0}, *pDbInfo = NULL;
43✔
387
  SVgObj* pVgroup = NULL;
43✔
388

389
  SSHashObj* pDbVgroup = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
43✔
390
  TSDB_CHECK_NULL(pDbVgroup, code, lino, _exit, terrno);
43!
391

392
  while (1) {
314✔
393
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
357✔
394
    if (pIter == NULL) {
357✔
395
      break;
43✔
396
    }
397

398
    pDbInfo = (SDBVgHashInfo*)tSimpleHashGet(pDbVgroup, pVgroup->dbName, strlen(pVgroup->dbName) + 1);
314✔
399
    if (NULL == pDbInfo) {
314✔
400
      pNew = taosArrayInit(20, sizeof(SVGroupHashInfo));
280✔
401
      if (NULL == pNew) {
280!
402
        sdbRelease(pMnode->pSdb, pVgroup);
×
403
        sdbCancelFetch(pMnode->pSdb, pIter);
×
404
        pVgroup = NULL;
×
405
        TSDB_CHECK_NULL(pNew, code, lino, _exit, terrno);
×
406
      }
407
      
408
      pDb = mndAcquireDb(pMnode, pVgroup->dbName);
280✔
409
      if (NULL == pDb) {
280!
410
        sdbRelease(pMnode->pSdb, pVgroup);
×
411
        sdbCancelFetch(pMnode->pSdb, pIter);      
×
412
        pVgroup = NULL;
×
413
        TSDB_CHECK_NULL(pDb, code, lino, _exit, terrno);
×
414
      }
415
      dbInfo.vgSorted = false;
280✔
416
      dbInfo.hashMethod = pDb->cfg.hashMethod;
280✔
417
      dbInfo.hashPrefix = pDb->cfg.hashPrefix;
280✔
418
      dbInfo.hashSuffix = pDb->cfg.hashSuffix;
280✔
419
      dbInfo.vgArray = pNew;
280✔
420
      
421
      mndReleaseDb(pMnode, pDb);
280✔
422

423
      pTarget = pNew;
280✔
424
    } else {
425
      pTarget = pDbInfo->vgArray;
34✔
426
    }
427

428
    SVGroupHashInfo vgInfo = {.vgId = pVgroup->vgId, .hashBegin = pVgroup->hashBegin, .hashEnd = pVgroup->hashEnd};
314✔
429
    if (NULL == taosArrayPush(pTarget, &vgInfo)) {
314!
430
      sdbRelease(pMnode->pSdb, pVgroup);
×
431
      sdbCancelFetch(pMnode->pSdb, pIter);      
×
432
      pVgroup = NULL;
×
433
      TSDB_CHECK_NULL(NULL, code, lino, _exit, terrno);
×
434
    }
435

436
    if (NULL == pDbInfo) {
314✔
437
      code = tSimpleHashPut(pDbVgroup, pVgroup->dbName, strlen(pVgroup->dbName) + 1, &dbInfo, sizeof(dbInfo));
280✔
438
      if (code) {
280!
439
        sdbRelease(pMnode->pSdb, pVgroup);
×
440
        sdbCancelFetch(pMnode->pSdb, pIter);      
×
441
        pVgroup = NULL;
×
442
        TAOS_CHECK_EXIT(code);
×
443
      }
444
      pNew = NULL;
280✔
445
    }
446

447
    sdbRelease(pMnode->pSdb, pVgroup);
314✔
448
    pVgroup = NULL;
314✔
449
  }
450

451
  *ppRes = pDbVgroup;
43✔
452
  
453
_exit:
43✔
454

455
  taosArrayDestroy(pNew);
43✔
456

457
  if (code) {
43!
458
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
459
  }
460

461
  return code;
43✔
462
}
463

464
int mstDbVgInfoComp(const void* lp, const void* rp) {
17✔
465
  SVGroupHashInfo* pLeft = (SVGroupHashInfo*)lp;
17✔
466
  SVGroupHashInfo* pRight = (SVGroupHashInfo*)rp;
17✔
467
  if (pLeft->hashBegin < pRight->hashBegin) {
17!
468
    return -1;
17✔
469
  } else if (pLeft->hashBegin > pRight->hashBegin) {
×
470
    return 1;
×
471
  }
472

473
  return 0;
×
474
}
475

476
int32_t mstTableHashValueComp(void const* lp, void const* rp) {
108✔
477
  uint32_t*    key = (uint32_t*)lp;
108✔
478
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
108✔
479

480
  if (*key < pVg->hashBegin) {
108!
481
    return -1;
×
482
  } else if (*key > pVg->hashEnd) {
108✔
483
    return 1;
18✔
484
  }
485

486
  return 0;
90✔
487
}
488

489

490
int32_t mstGetTableVgId(SSHashObj* pDbVgroups, char* dbFName, char *tbName, int32_t* vgId) {
90✔
491
  int32_t code = 0;
90✔
492
  int32_t lino = 0;
90✔
493
  SVgroupInfo* vgInfo = NULL;
90✔
494
  char         tbFullName[TSDB_TABLE_FNAME_LEN];
495

496
  SDBVgHashInfo* dbInfo = (SDBVgHashInfo*)tSimpleHashGet(pDbVgroups, dbFName, strlen(dbFName) + 1);
90✔
497
  if (NULL == dbInfo) {
90!
498
    mstError("db %s does not exist", dbFName);
×
499
    TAOS_CHECK_EXIT(TSDB_CODE_MND_DB_NOT_EXIST);
×
500
  }
501
  
502
  (void)snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbFName, tbName);
90✔
503
  uint32_t hashValue = taosGetTbHashVal(tbFullName, (uint32_t)strlen(tbFullName), dbInfo->hashMethod,
180✔
504
                                        dbInfo->hashPrefix, dbInfo->hashSuffix);
90!
505

506
  if (!dbInfo->vgSorted) {
90✔
507
    taosArraySort(dbInfo->vgArray, mstDbVgInfoComp);
43✔
508
    dbInfo->vgSorted = true;
43✔
509
  }
510

511
  vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, mstTableHashValueComp, TD_EQ);
90✔
512
  if (NULL == vgInfo) {
90!
513
    mstError("no hash range found for hash value [%u], dbFName:%s, numOfVgId:%d", hashValue, dbFName,
×
514
             (int32_t)taosArrayGetSize(dbInfo->vgArray));
515
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
516
  }
517

518
  *vgId = vgInfo->vgId;
90✔
519

520
_exit:
90✔
521

522
  if (code) {
90!
523
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
524
  }
525

526
  return code;
90✔
527
}
528

529

530
void mstLogSStreamObj(char* tips, SStreamObj* p) {
243✔
531
  if (!(stDebugFlag & DEBUG_DEBUG)) {
243✔
532
    return;
94✔
533
  }
534
  
535
  if (NULL == p) {
149!
536
    mstDebug("%s: stream is NULL", tips);
×
537
    return;
×
538
  }
539

540
  mstDebug("%s: stream obj", tips);
149!
541
  mstDebug("name:%s mainSnodeId:%d userDropped:%d userStopped:%d createTime:%" PRId64 " updateTime:%" PRId64,
149!
542
      p->name, p->mainSnodeId, p->userDropped, p->userStopped, p->createTime, p->updateTime);
543

544
  SCMCreateStreamReq* q = p->pCreate;
149✔
545
  if (NULL == q) {
149!
546
    mstDebug("stream pCreate is NULL");
×
547
    return;
×
548
  }
549

550
  int64_t streamId = q->streamId;
149✔
551
  int32_t calcDBNum = taosArrayGetSize(q->calcDB);
149✔
552
  int32_t calcScanNum = taosArrayGetSize(q->calcScanPlanList);
149✔
553
  int32_t notifyUrlNum = taosArrayGetSize(q->pNotifyAddrUrls);
149✔
554
  int32_t outColNum = taosArrayGetSize(q->outCols);
149✔
555
  int32_t outTagNum = taosArrayGetSize(q->outTags);
149✔
556
  int32_t forceOutColNum = taosArrayGetSize(q->forceOutCols);
149✔
557

558
  mstsDebugL("create_info: name:%s sql:%s streamDB:%s triggerDB:%s outDB:%s calcDBNum:%d triggerTblName:%s outTblName:%s "
149!
559
      "igExists:%d triggerType:%d igDisorder:%d deleteReCalc:%d deleteOutTbl:%d fillHistory:%d fillHistroyFirst:%d "
560
      "calcNotifyOnly:%d lowLatencyCalc:%d igNoDataTrigger:%d notifyUrlNum:%d notifyEventTypes:%d notifyErrorHandle:%d notifyHistory:%d "
561
      "outColsNum:%d outTagsNum:%d maxDelay:%" PRId64 " fillHistoryStartTs:%" PRId64 " watermark:%" PRId64 " expiredTime:%" PRId64 " "
562
      "triggerTblType:%d triggerTblUid:%" PRIx64 " triggerTblSuid:%" PRIx64 " vtableCalc:%d outTblType:%d outStbExists:%d outStbUid:%" PRIu64 " outStbSversion:%d "
563
      "eventTypes:0x%" PRIx64 " flags:0x%" PRIx64 " tsmaId:0x%" PRIx64 " placeHolderBitmap:0x%" PRIx64 " calcTsSlotId:%d triTsSlotId:%d "
564
      "triggerTblVgId:%d outTblVgId:%d calcScanPlanNum:%d forceOutCols:%d",
565
      q->name, q->sql, q->streamDB, q->triggerDB, q->outDB, calcDBNum, q->triggerTblName, q->outTblName,
566
      q->igExists, q->triggerType, q->igDisorder, q->deleteReCalc, q->deleteOutTbl, q->fillHistory, q->fillHistoryFirst,
567
      q->calcNotifyOnly, q->lowLatencyCalc, q->igNoDataTrigger, notifyUrlNum, q->notifyEventTypes, q->notifyErrorHandle, q->notifyHistory,
568
      outColNum, outTagNum, q->maxDelay, q->fillHistoryStartTime, q->watermark, q->expiredTime,
569
      q->triggerTblType, q->triggerTblUid, q->triggerTblSuid, q->vtableCalc, q->outTblType, q->outStbExists, q->outStbUid, q->outStbSversion,
570
      q->eventTypes, q->flags, q->tsmaId, q->placeHolderBitmap, q->calcTsSlotId, q->triTsSlotId,
571
      q->triggerTblVgId, q->outTblVgId, calcScanNum, forceOutColNum);
572

573
  switch (q->triggerType) {
149!
574
    case WINDOW_TYPE_INTERVAL: {
83✔
575
      SSlidingTrigger* t = &q->trigger.sliding;
83✔
576
      mstsDebug("sliding trigger options, intervalUnit:%d, slidingUnit:%d, offsetUnit:%d, soffsetUnit:%d, precision:%d, interval:%" PRId64 ", offset:%" PRId64 ", sliding:%" PRId64 ", soffset:%" PRId64, 
83!
577
          t->intervalUnit, t->slidingUnit, t->offsetUnit, t->soffsetUnit, t->precision, t->interval, t->offset, t->sliding, t->soffset);
578
      break;
83✔
579
    }  
580
    case WINDOW_TYPE_SESSION: {
5✔
581
      SSessionTrigger* t = &q->trigger.session;
5✔
582
      mstsDebug("session trigger options, slotId:%d, sessionVal:%" PRId64, t->slotId, t->sessionVal);
5!
583
      break;
5✔
584
    }
585
    case WINDOW_TYPE_STATE: {
27✔
586
      SStateWinTrigger* t = &q->trigger.stateWin;
27✔
587
      mstsDebug("state trigger options, slotId:%d, trueForDuration:%" PRId64, t->slotId, t->trueForDuration);
27!
588
      break;
27✔
589
    }
590
    case WINDOW_TYPE_EVENT:{
16✔
591
      SEventTrigger* t = &q->trigger.event;
16✔
592
      mstsDebug("event trigger options, startCond:%s, endCond:%s, trueForDuration:%" PRId64, (char*)t->startCond, (char*)t->endCond, t->trueForDuration);
16!
593
      break;
16✔
594
    }
595
    case WINDOW_TYPE_COUNT: {
11✔
596
      SCountTrigger* t = &q->trigger.count;
11✔
597
      mstsDebug("count trigger options, countVal:%" PRId64 ", sliding:%" PRId64 ", condCols:%s", t->countVal, t->sliding, (char*)t->condCols);
11!
598
      break;
11✔
599
    }
600
    case WINDOW_TYPE_PERIOD: {
7✔
601
      SPeriodTrigger* t = &q->trigger.period;
7✔
602
      mstsDebug("period trigger options, periodUnit:%d, offsetUnit:%d, precision:%d, period:%" PRId64 ", offset:%" PRId64, 
7!
603
          t->periodUnit, t->offsetUnit, t->precision, t->period, t->offset);
604
      break;
7✔
605
    }
606
    default:
×
607
      mstsDebug("unknown triggerType:%d", q->triggerType);
×
608
      break;
×
609
  }
610

611
  mstsDebugL("create_info: triggerCols:[%s]", (char*)q->triggerCols);
149!
612

613
  mstsDebugL("create_info: partitionCols:[%s]", (char*)q->partitionCols);
149!
614

615
  mstsDebugL("create_info: triggerScanPlan:[%s]", (char*)q->triggerScanPlan);
149!
616

617
  mstsDebugL("create_info: calcPlan:[%s]", (char*)q->calcPlan);
149!
618

619
  mstsDebugL("create_info: subTblNameExpr:[%s]", (char*)q->subTblNameExpr);
149!
620

621
  mstsDebugL("create_info: tagValueExpr:[%s]", (char*)q->tagValueExpr);
149!
622

623

624
  for (int32_t i = 0; i < calcDBNum; ++i) {
297✔
625
    char* dbName = taosArrayGetP(q->calcDB, i);
148✔
626
    mstsDebug("create_info: calcDB[%d] - %s", i, dbName);
148!
627
  }
628

629
  for (int32_t i = 0; i < calcScanNum; ++i) {
299✔
630
    SStreamCalcScan* pScan = taosArrayGet(q->calcScanPlanList, i);
150✔
631
    int32_t vgNum = taosArrayGetSize(pScan->vgList);
150✔
632
    mstsDebugL("create_info: calcScanPlan[%d] - readFromCache:%d vgNum:%d scanPlan:[%s]", i, pScan->readFromCache, vgNum, (char*)pScan->scanPlan);
150!
633
    for (int32_t v = 0; v < vgNum; ++v) {
300✔
634
      mstsDebug("create_info: calcScanPlan[%d] vg[%d] - vgId:%d", i, v, *(int32_t*)taosArrayGet(pScan->vgList, v));
150!
635
    }
636
  }
637

638
  for (int32_t i = 0; i < notifyUrlNum; ++i) {
167✔
639
    char* url = taosArrayGetP(q->pNotifyAddrUrls, i);
18✔
640
    mstsDebug("create_info: notifyUrl[%d] - %s", i, url);
18!
641
  }
642

643
  for (int32_t i = 0; i < outColNum; ++i) {
664✔
644
    SFieldWithOptions* o = taosArrayGet(q->outCols, i);
515✔
645
    mstsDebug("create_info: outCol[%d] - name:%s type:%d flags:%d bytes:%d compress:%u typeMod:%d", 
515!
646
        i, o->name, o->type, o->flags, o->bytes, o->compress, o->typeMod);
647
  }
648
      
649
}
650

651
void mstLogSStmTaskStatus(char* name, int64_t streamId, SStmTaskStatus* pTask, int32_t idx) {
1,036✔
652
  mstsDebug("%s[%d]: task %" PRIx64 " deployId:%d SID:%" PRId64 " nodeId:%d tidx:%d type:%s flags:%" PRIx64 " status:%s lastUpTs:%" PRId64, 
1,036!
653
      name, idx, pTask->id.taskId, pTask->id.deployId, pTask->id.seriousId, pTask->id.nodeId, pTask->id.taskIdx,
654
      gStreamTaskTypeStr[pTask->type], pTask->flags, gStreamStatusStr[pTask->status], pTask->lastUpTs);
655
}
1,036✔
656

657
void mstLogSStmStatus(char* tips, int64_t streamId, SStmStatus* p) {
264✔
658
  if (!(stDebugFlag & DEBUG_DEBUG)) {
264✔
659
    return;
94✔
660
  }
661
  
662
  if (NULL == p) {
170!
663
    mstsDebug("%s: stream status is NULL", tips);
×
664
    return;
×
665
  }
666

667
  int32_t trigReaderNum = taosArrayGetSize(p->trigReaders);
170✔
668
  int32_t trigOReaderNum = taosArrayGetSize(p->trigOReaders);
170✔
669
  int32_t calcReaderNum = taosArrayGetSize(p->calcReaders);
170✔
670
  int32_t triggerNum = p->triggerTask ? 1 : 0;
170✔
671
  int32_t runnerNum = 0;
170✔
672

673
  for (int32_t i = 0; i < p->runnerDeploys; ++i) {
677✔
674
    runnerNum += taosArrayGetSize(p->runners[i]);
507✔
675
  }
676

677
  mstsDebug("%s: stream status", tips);
170!
678
  mstsDebug("name:%s runnerNum:%d runnerDeploys:%d runnerReplica:%d lastActionTs:%" PRId64
170!
679
           " trigReaders:%d trigOReaders:%d calcReaders:%d trigger:%d runners:%d",
680
      p->streamName, p->runnerNum, p->runnerDeploys, p->runnerReplica, p->lastActionTs,
681
      trigReaderNum, trigOReaderNum, calcReaderNum, triggerNum, runnerNum);
682

683
  SStmTaskStatus* pTask = NULL;
170✔
684
  for (int32_t i = 0; i < trigReaderNum; ++i) {
416✔
685
    pTask = taosArrayGet(p->trigReaders, i);
246✔
686
    mstLogSStmTaskStatus("trigReader task", streamId, pTask, i);
246✔
687
  }
688

689
  for (int32_t i = 0; i < trigOReaderNum; ++i) {
170!
690
    pTask = taosArrayGet(p->trigOReaders, i);
×
691
    mstLogSStmTaskStatus("trigOReader task", streamId, pTask, i);
×
692
  }
693

694
  for (int32_t i = 0; i < calcReaderNum; ++i) {
283✔
695
    pTask = taosArrayGet(p->calcReaders, i);
113✔
696
    mstLogSStmTaskStatus("calcReader task", streamId, pTask, i);
113✔
697
  }
698

699
  if (triggerNum > 0) {
170!
700
    mstLogSStmTaskStatus("trigger task", streamId, p->triggerTask, 0);
170✔
701
  }
702

703
  for (int32_t i = 0; i < p->runnerDeploys; ++i) {
677✔
704
    int32_t num = taosArrayGetSize(p->runners[i]);
507✔
705
    if (num <= 0) {
507!
706
      continue;
×
707
    }
708
    
709
    mstsDebug("the %dth deploy runners status", i);
507!
710
    for (int32_t m = 0; m < num; ++m) {
1,014✔
711
      pTask = taosArrayGet(p->runners[i], m);
507✔
712
      mstLogSStmTaskStatus("runner task", streamId, pTask, m);
507✔
713
    }
714
  }
715
      
716
}
717

718
bool mstEventPassIsolation(int32_t num, int32_t event) {
5,223✔
719
  bool ret = ((mStreamMgmt.lastTs[event].ts + num * MST_ISOLATION_DURATION) <= mStreamMgmt.hCtx.currentTs);
5,223✔
720
  if (ret) {
5,223✔
721
    mstDebug("event %s passed %d isolation, last:%" PRId64 ", curr:%" PRId64, 
3,883✔
722
        gMndStreamEvent[event], num, mStreamMgmt.lastTs[event].ts, mStreamMgmt.hCtx.currentTs);
723
  }
724

725
  return ret;
5,223✔
726
}
727

728
bool mstEventHandledChkSet(int32_t event) {
3,883✔
729
  if (0 == atomic_val_compare_exchange_8((int8_t*)&mStreamMgmt.lastTs[event].handled, 0, 1)) {
3,883✔
730
    mstDebug("event %s set handled", gMndStreamEvent[event]);
320✔
731
    return true;
320✔
732
  }
733
  return false;
3,563✔
734
}
735

736
int32_t mstGetStreamStatusStr(SStreamObj* pStream, char* status, int32_t statusSize, char* msg, int32_t msgSize) {
1,399✔
737
  int8_t active = atomic_load_8(&mStreamMgmt.active), state = atomic_load_8(&mStreamMgmt.state);
1,399✔
738
  if (0 == active || MND_STM_STATE_NORMAL != state) {
1,399!
739
    mstDebug("mnode streamMgmt not in active mode, active:%d, state:%d", active, state);
×
740
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_UNDEPLOYED], statusSize);
×
741
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "Mnode may be unstable, try again later", msgSize);
×
742
    return TSDB_CODE_SUCCESS;
×
743
  }
744

745
  if (atomic_load_8(&pStream->userDropped)) {
1,399!
746
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_DROPPING], statusSize);
×
747
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
×
748
    return TSDB_CODE_SUCCESS;
×
749
  }
750

751
  if (atomic_load_8(&pStream->userStopped)) {
1,399✔
752
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_STOPPED], statusSize);
22✔
753
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
22✔
754
    return TSDB_CODE_SUCCESS;
22✔
755
  }
756

757
  mstWaitLock(&mStreamMgmt.runtimeLock, true);
1,377✔
758
  
759
  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &pStream->pCreate->streamId, sizeof(pStream->pCreate->streamId));
1,377✔
760
  if (NULL == pStatus) {
1,377✔
761
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_UNDEPLOYED], statusSize);
189✔
762
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
189✔
763
    goto _exit;
189✔
764
  }
765

766
  char tmpBuf[256];
767
  int8_t stopped = atomic_load_8(&pStatus->stopped);
1,188✔
768
  switch (stopped) {
1,188!
769
    case 1:
117✔
770
      STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_FAILED], statusSize);
117✔
771
      snprintf(tmpBuf, sizeof(tmpBuf), "Last error: %s, Failed times: %" PRId64, tstrerror(pStatus->fatalError), pStatus->fatalRetryTimes);
117✔
772
      STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
117✔
773
      goto _exit;
117✔
774
      break;
775
    case 4:
×
776
      STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_FAILED], statusSize);
×
777
      snprintf(tmpBuf, sizeof(tmpBuf), "Error: %s", tstrerror(TSDB_CODE_GRANT_STREAM_EXPIRED));
×
778
      STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
×
779
      goto _exit;
×
780
      break;
781
    default:
1,071✔
782
      break;
1,071✔
783
  }
784

785
  if (pStatus->triggerTask && STREAM_STATUS_RUNNING == pStatus->triggerTask->status) {
1,071✔
786
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_RUNNING], statusSize);
500✔
787
    strcpy(tmpBuf, "Running start from: ");
500✔
788
    formatTimestampLocal(&tmpBuf[strlen(tmpBuf)], pStatus->triggerTask->runningStartTs, TSDB_TIME_PRECISION_MILLI);
500✔
789
    STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
500✔
790
    goto _exit;
500✔
791
  }
792

793
  STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_INIT], statusSize);
571✔
794
  snprintf(tmpBuf, sizeof(tmpBuf), "Current deploy times: %" PRId64, pStatus->deployTimes);
571✔
795
  STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
571✔
796
  goto _exit;
571✔
797

798
_exit:
1,377✔
799
  
800
  taosRUnLockLatch(&mStreamMgmt.runtimeLock);
1,377✔
801

802
  return TSDB_CODE_SUCCESS;
1,377✔
803
}
804

805
int32_t mstSetStreamAttrResBlock(SMnode *pMnode, SStreamObj* pStream, SSDataBlock* pBlock, int32_t numOfRows) {
1,399✔
806
  int32_t code = 0;
1,399✔
807
  int32_t cols = 0;
1,399✔
808
  int32_t lino = 0;
1,399✔
809

810
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
1,399✔
811
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetStableStr(pStream->name), sizeof(streamName));
1,399✔
812
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1,399✔
813
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
1,399!
814

815
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamName, false);
1,399✔
816
  TSDB_CHECK_CODE(code, lino, _end);
1,399!
817

818
  // db_name
819
  char streamDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
1,399✔
820
  STR_WITH_MAXSIZE_TO_VARSTR(streamDB, mndGetDbStr(pStream->pCreate->streamDB), sizeof(streamDB));
1,399✔
821
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1,399✔
822
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
1,399!
823
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&streamDB, false);
1,399✔
824
  TSDB_CHECK_CODE(code, lino, _end);
1,399!
825

826
  // create time
827
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1,399✔
828
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
1,399!
829
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pStream->createTime, false);
1,399✔
830
  TSDB_CHECK_CODE(code, lino, _end);
1,399!
831

832
  // stream id
833
  char streamId2[19] = {0};
1,399✔
834
  char streamId[19 + VARSTR_HEADER_SIZE] = {0};
1,399✔
835
  snprintf(streamId2, sizeof(streamId2), "%" PRIx64, pStream->pCreate->streamId);
1,399✔
836
  STR_WITH_MAXSIZE_TO_VARSTR(streamId, streamId2, sizeof(streamId));
1,399✔
837
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1,399✔
838
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
1,399!
839
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamId, false);
1,399✔
840
  TSDB_CHECK_CODE(code, lino, _end);
1,399!
841

842
  // sql
843
  char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
1,399✔
844
  STR_WITH_MAXSIZE_TO_VARSTR(sql, pStream->pCreate->sql, sizeof(sql));
1,399✔
845
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1,399✔
846
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
1,399!
847
  code = colDataSetVal(pColInfo, numOfRows, (const char*)sql, false);
1,399✔
848
  TSDB_CHECK_CODE(code, lino, _end);
1,399!
849

850
  // status
851
  char   status[20 + VARSTR_HEADER_SIZE] = {0};
1,399✔
852
  char msg[TSDB_RESERVE_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
1,399✔
853
  code = mstGetStreamStatusStr(pStream, status, sizeof(status), msg, sizeof(msg));
1,399✔
854
  TSDB_CHECK_CODE(code, lino, _end);
1,399!
855

856
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1,399✔
857
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
1,399!
858
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&status, false);
1,399✔
859
  TSDB_CHECK_CODE(code, lino, _end);
1,399!
860

861
  // snodeLeader
862
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1,399✔
863
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
1,399!
864
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pStream->mainSnodeId, false);
1,399✔
865
  TSDB_CHECK_CODE(code, lino, _end);
1,399!
866

867
  // snodeReplica
868
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1,399✔
869
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
1,399!
870
  SSnodeObj* pSnode = mndAcquireSnode(pMnode, pStream->mainSnodeId);
1,399✔
871
  int32_t replicaSnodeId = pSnode ? pSnode->replicaId : -1;
1,399!
872
  mndReleaseSnode(pMnode, pSnode);
1,399✔
873
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&replicaSnodeId, false);
1,399✔
874
  TSDB_CHECK_CODE(code, lino, _end);
1,399!
875

876
  // msg
877
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
1,399✔
878
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
1,399!
879
  code = colDataSetVal(pColInfo, numOfRows, (const char*)msg, false);
1,399✔
880

881
_end:
1,399✔
882
  if (code) {
1,399!
883
    mError("error happens when build stream attr result block, lino:%d, code:%s", lino, tstrerror(code));
×
884
  }
885
  return code;
1,399✔
886
}
887

888

889
int32_t mstGetTaskStatusStr(SStmTaskStatus* pTask, char* status, int32_t statusSize, char* msg, int32_t msgSize) {
12,183✔
890
  char tmpBuf[256];
891
  
892
  STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[pTask->status], statusSize);
12,183✔
893
  if (STREAM_STATUS_FAILED == pTask->status && pTask->errCode) {
12,183!
894
    snprintf(tmpBuf, sizeof(tmpBuf), "Last error: %s", tstrerror(pTask->errCode));
×
895
    STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
×
896
    return TSDB_CODE_SUCCESS;
×
897
  }
898

899
  if (STREAM_TRIGGER_TASK == pTask->type && mstWaitLock(&pTask->detailStatusLock, true)) {
12,183!
900
    if (pTask->detailStatus) {
2,258✔
901
      SSTriggerRuntimeStatus* pTrigger = (SSTriggerRuntimeStatus*)pTask->detailStatus;
1,590✔
902
      snprintf(tmpBuf, sizeof(tmpBuf), "Current RT/HI/RE session num: %d/%d/%d, histroy progress:%d%%, total AUTO/USER recalc num: %d/%d", 
1,590✔
903
          pTrigger->realtimeSessionNum, pTrigger->historySessionNum, pTrigger->recalcSessionNum, pTrigger->histroyProgress,
904
          pTrigger->autoRecalcNum, (int32_t)taosArrayGetSize(pTrigger->userRecalcs));
1,590✔
905
      taosRUnLockLatch(&pTask->detailStatusLock);
1,590✔
906
      return TSDB_CODE_SUCCESS;
1,590✔
907
    }
908

909
    taosRUnLockLatch(&pTask->detailStatusLock);    
668✔
910
  }
911
  
912
  STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
10,593✔
913
  
914
  return TSDB_CODE_SUCCESS;
10,593✔
915
}
916

917
int32_t mstGetTaskExtraStr(SStmTaskStatus* pTask, char* extraStr, int32_t extraSize) {
12,183✔
918
  switch (pTask->type) {
12,183✔
919
    case STREAM_READER_TASK:
3,179✔
920
      if (STREAM_IS_TRIGGER_READER(pTask->flags)) {
3,179✔
921
        STR_WITH_MAXSIZE_TO_VARSTR(extraStr, "trigReader", extraSize);
2,444✔
922
      } else {
923
        STR_WITH_MAXSIZE_TO_VARSTR(extraStr, "calcReader", extraSize);
735✔
924
      }
925
      return TSDB_CODE_SUCCESS;
3,179✔
926
    case STREAM_RUNNER_TASK:
6,746✔
927
      if (STREAM_IS_TOP_RUNNER(pTask->flags)) {
6,746!
928
        STR_WITH_MAXSIZE_TO_VARSTR(extraStr, "topRunner", extraSize);
6,746✔
929
        return TSDB_CODE_SUCCESS;
6,746✔
930
      }
931
      break;
×
932
    default:
2,258✔
933
      break;
2,258✔
934
  }
935

936
  STR_WITH_MAXSIZE_TO_VARSTR(extraStr, "", extraSize);
2,258✔
937
  return TSDB_CODE_SUCCESS;
2,258✔
938
}
939

940

941
int32_t mstSetStreamTaskResBlock(SStreamObj* pStream, SStmTaskStatus* pTask, SSDataBlock* pBlock, int32_t numOfRows) {
12,183✔
942
  int32_t code = 0;
12,183✔
943
  int32_t cols = 0;
12,183✔
944
  int32_t lino = 0;
12,183✔
945

946
  // stream_name
947
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
12,183✔
948
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetStableStr(pStream->name), sizeof(streamName));
12,183✔
949
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
12,183✔
950
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
12,183!
951

952
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamName, false);
12,183✔
953
  TSDB_CHECK_CODE(code, lino, _end);
12,183!
954

955
  // stream id
956
  char idstr[19 + VARSTR_HEADER_SIZE] = {0};
12,183✔
957
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pStream->pCreate->streamId);
12,183✔
958
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE])); 
12,183✔
959
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
12,183✔
960
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
12,183!
961
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
12,183✔
962
  TSDB_CHECK_CODE(code, lino, _end);
12,183!
963

964
  // task id
965
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pTask->id.taskId);
12,183✔
966
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
12,183✔
967
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
12,183✔
968
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
12,183!
969
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
12,183✔
970
  TSDB_CHECK_CODE(code, lino, _end);
12,183!
971

972
  // type
973
  char type[20 + VARSTR_HEADER_SIZE] = {0};
12,183✔
974
  STR_WITH_MAXSIZE_TO_VARSTR(type, (STREAM_READER_TASK == pTask->type) ? "Reader" : ((STREAM_TRIGGER_TASK == pTask->type) ? "Trigger" : "Runner"), sizeof(type));
12,183✔
975
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
12,183✔
976
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
12,183!
977
  code = colDataSetVal(pColInfo, numOfRows, (const char*)type, false);
12,183✔
978
  TSDB_CHECK_CODE(code, lino, _end);
12,183!
979

980
  // serious id
981
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pTask->id.seriousId);
12,183✔
982
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
12,183✔
983
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
12,183✔
984
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
12,183!
985
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
12,183✔
986
  TSDB_CHECK_CODE(code, lino, _end);
12,183!
987

988
  // deploy id
989
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
12,183✔
990
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
12,183!
991
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->id.deployId, false);
12,183✔
992
  TSDB_CHECK_CODE(code, lino, _end);
12,183!
993

994
  // node_type
995
  char nodeType[10 + VARSTR_HEADER_SIZE] = {0};
12,183✔
996
  STR_WITH_MAXSIZE_TO_VARSTR(nodeType, (STREAM_READER_TASK == pTask->type) ? "vnode" : "snode", sizeof(nodeType));
12,183✔
997
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
12,183✔
998
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
12,183!
999
  code = colDataSetVal(pColInfo, numOfRows, (const char*)nodeType, false);
12,183✔
1000
  TSDB_CHECK_CODE(code, lino, _end);
12,183!
1001

1002
  // node id
1003
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
12,183✔
1004
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
12,183!
1005
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->id.nodeId, false);
12,183✔
1006
  TSDB_CHECK_CODE(code, lino, _end);
12,183!
1007

1008
  // task idx
1009
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
12,183✔
1010
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
12,183!
1011
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->id.taskIdx, false);
12,183✔
1012
  TSDB_CHECK_CODE(code, lino, _end);
12,183!
1013

1014
  // status
1015
  char   status[20 + VARSTR_HEADER_SIZE] = {0};
12,183✔
1016
  char msg[TSDB_RESERVE_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
12,183✔
1017
  code = mstGetTaskStatusStr(pTask, status, sizeof(status), msg, sizeof(msg));
12,183✔
1018
  TSDB_CHECK_CODE(code, lino, _end);
12,183!
1019

1020
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
12,183✔
1021
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
12,183!
1022
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&status, false);
12,183✔
1023
  TSDB_CHECK_CODE(code, lino, _end);
12,183!
1024

1025
  // start time
1026
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
12,183✔
1027
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
12,183!
1028
  if (pTask->runningStartTs) {
12,183✔
1029
    code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->runningStartTs, false);
1,808✔
1030
  } else {
1031
    code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->runningStartTs, true);
10,375✔
1032
  }
1033
  TSDB_CHECK_CODE(code, lino, _end);
12,183!
1034

1035
  // last update
1036
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
12,183✔
1037
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
12,183!
1038
  if (pTask->lastUpTs) {
12,183!
1039
    code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->lastUpTs, false);
12,183✔
1040
  } else {
1041
    code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->lastUpTs, true);
×
1042
  }
1043
  TSDB_CHECK_CODE(code, lino, _end);
12,183!
1044

1045
  // extra info
1046
  char extra[64 + VARSTR_HEADER_SIZE] = {0};
12,183✔
1047
  code = mstGetTaskExtraStr(pTask, extra, sizeof(extra));
12,183✔
1048
  TSDB_CHECK_CODE(code, lino, _end);
12,183!
1049
  
1050
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
12,183✔
1051
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
12,183!
1052
  code = colDataSetVal(pColInfo, numOfRows, (const char*)extra, false);
12,183✔
1053
  TSDB_CHECK_CODE(code, lino, _end);
12,183!
1054

1055
  // msg
1056
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
12,183✔
1057
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
12,183!
1058
  code = colDataSetVal(pColInfo, numOfRows, (const char*)msg, false);
12,183✔
1059

1060
_end:
12,183✔
1061
  if (code) {
12,183!
1062
    mError("error happens when build stream attr result block, lino:%d, code:%s", lino, tstrerror(code));
×
1063
  }
1064
  return code;
12,183✔
1065
}
1066

1067
int32_t mstGetNumOfStreamTasks(SStmStatus* pStatus) {
2,260✔
1068
  int32_t num = taosArrayGetSize(pStatus->trigReaders) + taosArrayGetSize(pStatus->trigOReaders) + taosArrayGetSize(pStatus->calcReaders) + (pStatus->triggerTask ? 1 : 0);
2,260✔
1069
  for (int32_t i = 0; i < MND_STREAM_RUNNER_DEPLOY_NUM; ++i) {
9,040✔
1070
    num += taosArrayGetSize(pStatus->runners[i]);
6,780✔
1071
  }
1072

1073
  return num;
2,260✔
1074
}
1075

1076
int32_t mstSetStreamTasksResBlock(SStreamObj* pStream, SSDataBlock* pBlock, int32_t* numOfRows, int32_t rowsCapacity) {
2,520✔
1077
  int32_t code = 0;
2,520✔
1078
  int32_t lino = 0;
2,520✔
1079
  int64_t streamId = pStream->pCreate->streamId;
2,520✔
1080

1081
  mstWaitLock(&mStreamMgmt.runtimeLock, true);
2,520✔
1082

1083
  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
2,520✔
1084
  if (NULL == pStatus) {
2,520✔
1085
    mstsDebug("stream not in streamMap, ignore it, dropped:%d, stopped:%d", atomic_load_8(&pStream->userDropped), atomic_load_8(&pStream->userStopped));
260✔
1086
    goto _exit;
260✔
1087
  }
1088

1089
  int8_t stopped = atomic_load_8(&pStatus->stopped);
2,260✔
1090
  if (stopped) {
2,260!
1091
    mstsDebug("stream stopped %d, ignore it", stopped);
×
1092
    goto _exit;
×
1093
  }
1094
  
1095
  int32_t count = mstGetNumOfStreamTasks(pStatus);
2,260✔
1096

1097
  if (*numOfRows + count > rowsCapacity) {
2,260✔
1098
    code = blockDataEnsureCapacity(pBlock, *numOfRows + count);
51✔
1099
    if (code) {
51!
1100
      mstError("failed to prepare the result block buffer, rows:%d", *numOfRows + count);
×
1101
      TAOS_CHECK_EXIT(code);
×
1102
    }
1103
  }
1104

1105
  SStmTaskStatus* pTask = NULL;
2,260✔
1106
  int32_t trigReaderNum = taosArrayGetSize(pStatus->trigReaders);
2,260✔
1107
  for (int32_t i = 0; i < trigReaderNum; ++i) {
4,543✔
1108
    pTask = taosArrayGet(pStatus->trigReaders, i);
2,283✔
1109
  
1110
    code = mstSetStreamTaskResBlock(pStream, pTask, pBlock, *numOfRows);
2,283✔
1111
    if (code == TSDB_CODE_SUCCESS) {
2,283!
1112
      (*numOfRows)++;
2,283✔
1113
    }
1114
  }
1115

1116
  trigReaderNum = taosArrayGetSize(pStatus->trigOReaders);
2,260✔
1117
  for (int32_t i = 0; i < trigReaderNum; ++i) {
2,421✔
1118
    pTask = taosArrayGet(pStatus->trigOReaders, i);
161✔
1119
  
1120
    code = mstSetStreamTaskResBlock(pStream, pTask, pBlock, *numOfRows);
161✔
1121
    if (code == TSDB_CODE_SUCCESS) {
161!
1122
      (*numOfRows)++;
161✔
1123
    }
1124
  }
1125

1126

1127
  int32_t calcReaderNum = taosArrayGetSize(pStatus->calcReaders);
2,260✔
1128
  for (int32_t i = 0; i < calcReaderNum; ++i) {
2,995✔
1129
    pTask = taosArrayGet(pStatus->calcReaders, i);
735✔
1130
  
1131
    code = mstSetStreamTaskResBlock(pStream, pTask, pBlock, *numOfRows);
735✔
1132
    if (code == TSDB_CODE_SUCCESS) {
735!
1133
      (*numOfRows)++;
735✔
1134
    }
1135
  }
1136

1137
  if (pStatus->triggerTask) {
2,260✔
1138
    code = mstSetStreamTaskResBlock(pStream, pStatus->triggerTask, pBlock, *numOfRows);
2,258✔
1139
    if (code == TSDB_CODE_SUCCESS) {
2,258!
1140
      (*numOfRows)++;
2,258✔
1141
    }
1142
  }
1143

1144
  int32_t runnerNum = 0;
2,260✔
1145
  for (int32_t i = 0; i < MND_STREAM_RUNNER_DEPLOY_NUM; ++i) {
9,040✔
1146
    runnerNum = taosArrayGetSize(pStatus->runners[i]);
6,780✔
1147
    for (int32_t m = 0; m < runnerNum; ++m) {
13,526✔
1148
      pTask = taosArrayGet(pStatus->runners[i], m);
6,746✔
1149
    
1150
      code = mstSetStreamTaskResBlock(pStream, pTask, pBlock, *numOfRows);
6,746✔
1151
      if (code == TSDB_CODE_SUCCESS) {
6,746!
1152
        (*numOfRows)++;
6,746✔
1153
      }
1154
    }
1155
  }
1156
  
1157
  pBlock->info.rows = *numOfRows;
2,260✔
1158

1159
_exit:
2,520✔
1160
  
1161
  taosRUnLockLatch(&mStreamMgmt.runtimeLock);
2,520✔
1162

1163
  if (code) {
2,520!
1164
    mError("error happens when build stream tasks result block, lino:%d, code:%s", lino, tstrerror(code));
×
1165
  }
1166
  
1167
  return code;
2,520✔
1168
}
1169

1170

1171
int32_t mstAppendNewRecalcRange(int64_t streamId, SStmStatus *pStream, STimeWindow* pRange) {
1✔
1172
  int32_t code = 0;
1✔
1173
  int32_t lino = 0;
1✔
1174
  bool    locked = false;
1✔
1175
  SArray* userRecalcList = NULL;
1✔
1176

1177
  SStreamRecalcReq req = {.recalcId = 0, .start = pRange->skey, .end = pRange->ekey};
1✔
1178
  TAOS_CHECK_EXIT(taosGetSystemUUIDU64(&req.recalcId));
1!
1179
  
1180
  taosWLockLatch(&pStream->userRecalcLock);
1✔
1181
  locked = true;
1✔
1182
  
1183
  if (NULL == pStream->userRecalcList) {
1!
1184
    userRecalcList = taosArrayInit(2, sizeof(SStreamRecalcReq));
1✔
1185
    if (NULL == userRecalcList) {
1!
1186
      TAOS_CHECK_EXIT(terrno);
×
1187
    }
1188

1189
    TSDB_CHECK_NULL(taosArrayPush(userRecalcList, &req), code, lino, _exit, terrno);
1!
1190

1191
    atomic_store_ptr(&pStream->userRecalcList, userRecalcList);
1✔
1192
    userRecalcList = NULL;    
1✔
1193
  } else {
1194
    TSDB_CHECK_NULL(taosArrayPush(pStream->userRecalcList, &req), code, lino, _exit, terrno);
×
1195
  }
1196
  
1197
  mstsInfo("stream recalc ID:%" PRIx64 " range:%" PRId64 " - %" PRId64 " added", req.recalcId, pRange->skey, pRange->ekey);
1!
1198

1199
_exit:
×
1200

1201
  taosArrayDestroy(userRecalcList);
1✔
1202

1203
  if (locked) {
1!
1204
    taosWUnLockLatch(&pStream->userRecalcLock);
1✔
1205
  }
1206
  
1207
  if (code) {
1!
1208
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
1209
  }
1210
  
1211
  return code;
1✔
1212
}
1213

1214

1215

1216
int32_t mstSetStreamRecalculateResBlock(SStreamObj* pStream, SSTriggerRecalcProgress* pProgress, SSDataBlock* pBlock, int32_t numOfRows) {
×
1217
  int32_t code = 0;
×
1218
  int32_t cols = 0;
×
1219
  int32_t lino = 0;
×
1220

1221
  // stream_name
1222
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
×
1223
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetStableStr(pStream->name), sizeof(streamName));
×
1224
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
1225
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
×
1226

1227
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamName, false);
×
1228
  TSDB_CHECK_CODE(code, lino, _end);
×
1229

1230
  // stream id
1231
  char idstr[19 + VARSTR_HEADER_SIZE] = {0};
×
1232
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pStream->pCreate->streamId);
×
1233
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]) + VARSTR_HEADER_SIZE); 
×
1234
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
1235
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
×
1236
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
×
1237
  TSDB_CHECK_CODE(code, lino, _end);
×
1238

1239
  // recalc id
1240
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pProgress->recalcId);
×
1241
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]) + VARSTR_HEADER_SIZE);
×
1242
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
1243
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
×
1244
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
×
1245
  TSDB_CHECK_CODE(code, lino, _end);
×
1246

1247
  // start
1248
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
1249
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
×
1250
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pProgress->start, false);
×
1251
  TSDB_CHECK_CODE(code, lino, _end);
×
1252

1253
  // end
1254
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
1255
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
×
1256
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pProgress->end, false);
×
1257
  TSDB_CHECK_CODE(code, lino, _end);
×
1258

1259
  // progress
1260
  char progress[20 + VARSTR_HEADER_SIZE] = {0};
×
1261
  snprintf(&progress[VARSTR_HEADER_SIZE], sizeof(progress) - VARSTR_HEADER_SIZE, "%d%%", pProgress->progress);
×
1262
  varDataSetLen(progress, strlen(&progress[VARSTR_HEADER_SIZE]) + VARSTR_HEADER_SIZE);
×
1263
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
1264
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
×
1265
  code = colDataSetVal(pColInfo, numOfRows, (const char*)progress, false);
×
1266
  TSDB_CHECK_CODE(code, lino, _end);
×
1267

1268
_end:
×
1269
  if (code) {
×
1270
    mError("error happens when build stream attr result block, lino:%d, code:%s", lino, tstrerror(code));
×
1271
  }
1272
  return code;
×
1273
}
1274

1275

1276
int32_t mstSetStreamRecalculatesResBlock(SStreamObj* pStream, SSDataBlock* pBlock, int32_t* numOfRows, int32_t rowsCapacity) {
1✔
1277
  int32_t code = 0;
1✔
1278
  int32_t lino = 0;
1✔
1279
  int64_t streamId = pStream->pCreate->streamId;
1✔
1280

1281
  mstWaitLock(&mStreamMgmt.runtimeLock, true);
1✔
1282

1283
  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
1✔
1284
  if (NULL == pStatus) {
1!
1285
    mstsDebug("stream not in streamMap, ignore it, dropped:%d, stopped:%d", atomic_load_8(&pStream->userDropped), atomic_load_8(&pStream->userStopped));
×
1286
    goto _exit;
×
1287
  }
1288

1289
  int8_t stopped = atomic_load_8(&pStatus->stopped);
1✔
1290
  if (stopped) {
1!
1291
    mstsDebug("stream stopped %d, ignore it", stopped);
×
1292
    goto _exit;
×
1293
  }
1294

1295
  if (NULL == pStatus->triggerTask) {
1!
1296
    mstsDebug("no trigger task now, deployTimes:%" PRId64 ", ignore it", pStatus->deployTimes);
×
1297
    goto _exit;
×
1298
  }
1299

1300
  mstWaitLock(&pStatus->triggerTask->detailStatusLock, true);
1✔
1301
  if (NULL == pStatus->triggerTask->detailStatus) {
1!
1302
    mstsDebug("no trigger task now, deployTimes:%" PRId64 ", ignore it", pStatus->deployTimes);
×
1303
    taosRUnLockLatch(&pStatus->triggerTask->detailStatusLock);
×
1304
    goto _exit;
×
1305
  }
1306

1307
  SSTriggerRuntimeStatus* pTrigger = (SSTriggerRuntimeStatus*)pStatus->triggerTask->detailStatus;
1✔
1308
  int32_t count = taosArrayGetSize(pTrigger->userRecalcs);
1✔
1309

1310
  if (*numOfRows + count > rowsCapacity) {
1!
1311
    code = blockDataEnsureCapacity(pBlock, *numOfRows + count);
×
1312
    if (code) {
×
1313
      mstError("failed to prepare the result block buffer, rows:%d", *numOfRows + count);
×
1314
      taosRUnLockLatch(&pStatus->triggerTask->detailStatusLock);
×
1315
      TAOS_CHECK_EXIT(code);
×
1316
    }
1317
  }
1318

1319
  for (int32_t i = 0; i < count; ++i) {
1!
1320
    SSTriggerRecalcProgress* pProgress = taosArrayGet(pTrigger->userRecalcs, i);
×
1321
  
1322
    code = mstSetStreamRecalculateResBlock(pStream, pProgress, pBlock, *numOfRows);
×
1323
    if (code == TSDB_CODE_SUCCESS) {
×
1324
      (*numOfRows)++;
×
1325
    }
1326
  }
1327

1328
  taosRUnLockLatch(&pStatus->triggerTask->detailStatusLock);
1✔
1329
  
1330
  pBlock->info.rows = *numOfRows;
1✔
1331

1332
_exit:
1✔
1333
  
1334
  taosRUnLockLatch(&mStreamMgmt.runtimeLock);
1✔
1335

1336
  if (code) {
1!
1337
    mError("error happens when build stream recalculates result block, lino:%d, code:%s", lino, tstrerror(code));
×
1338
  }
1339
  
1340
  return code;
1✔
1341
}
1342

1343

1344

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc