• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4791

13 Oct 2025 06:50AM UTC coverage: 57.628% (-0.8%) from 58.476%
#4791

push

travis-ci

web-flow
Merge pull request #33213 from taosdata/fix/huoh/timemoe_model_directory

fix: fix tdgpt timemoe model directory

136628 of 303332 branches covered (45.04%)

Branch coverage included in aggregate %.

208121 of 294900 relevant lines covered (70.57%)

4250784.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.58
/source/dnode/mnode/impl/src/mndStreamUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndDb.h"
17
#include "mndStb.h"
18
#include "mndStream.h"
19
#include "mndTrans.h"
20
#include "mndVgroup.h"
21
#include "taoserror.h"
22
#include "tmisce.h"
23
#include "mndSnode.h"
24

25
/**
 * Acquire pLock and only return once it is held.
 *
 * @param pLock    latch to acquire
 * @param readLock true to take the latch in read mode, false for write mode
 * @return always true
 */
bool mstWaitLock(SRWLatch* pLock, bool readLock) {
  if (!readLock) {
    taosWWaitLockLatch(pLock);
    return true;
  }

  // Read mode: keep retrying while the try-lock reports a non-zero result,
  // sleeping 1ms between attempts.
  for (;;) {
    if (!taosRTryLockLatch(pLock)) {
      break;
    }
    taosMsleep(1);
  }

  return true;
}
38

39
void mstDestroySStmVgStreamStatus(void* p) { 
600✔
40
  SStmVgStreamStatus* pStatus = (SStmVgStreamStatus*)p;
600✔
41
  taosArrayDestroy(pStatus->trigReaders); 
600✔
42
  taosArrayDestroy(pStatus->calcReaders); 
600✔
43
}
600✔
44

45
void mstDestroySStmSnodeStreamStatus(void* p) { 
358✔
46
  SStmSnodeStreamStatus* pStatus = (SStmSnodeStreamStatus*)p;
358✔
47
  for (int32_t i = 0; i < MND_STREAM_RUNNER_DEPLOY_NUM; ++i) {
1,432✔
48
    taosArrayDestroy(pStatus->runners[i]);
1,074✔
49
    pStatus->runners[i] = NULL;
1,074✔
50
  }
51
}
358✔
52

53

54
/**
 * Release the streamTasks hash of a vgroup status and clear the pointer so
 * it cannot be cleaned up a second time.
 *
 * @param pVgStatus vgroup status whose hash is released
 */
void mstDestroyVgroupStatus(SStmVgroupStatus* pVgStatus) {
  SHashObj* pTasks = pVgStatus->streamTasks;

  pVgStatus->streamTasks = NULL;
  taosHashCleanup(pTasks);
}
×
58

59
void mstDestroySStmTaskToDeployExt(void* param) {
2,205✔
60
  SStmTaskToDeployExt* pExt = (SStmTaskToDeployExt*)param;
2,205✔
61
  if (pExt->deployed) {
2,205✔
62
    return;
2,187✔
63
  }
64
  
65
  switch (pExt->deploy.task.type) {
18✔
66
    case STREAM_TRIGGER_TASK:
3✔
67
      taosArrayDestroy(pExt->deploy.msg.trigger.readerList);
3✔
68
      pExt->deploy.msg.trigger.readerList = NULL;
3✔
69
      taosArrayDestroy(pExt->deploy.msg.trigger.runnerList);
3✔
70
      pExt->deploy.msg.trigger.runnerList = NULL;
3✔
71
      break;
3✔
72
    case STREAM_RUNNER_TASK:
9✔
73
      taosMemoryFreeClear(pExt->deploy.msg.runner.pPlan);
9!
74
      break;
9✔
75
    default:  
6✔
76
      break;;
6✔
77
  }
78
}
79

80
void mstDestroyScanAddrList(void* param) {
447✔
81
  if (NULL == param) {
447!
82
    return;
×
83
  }
84
  SArray* pList = *(SArray**)param;
447✔
85
  taosArrayDestroy(pList);
447✔
86
}
87

88
void mstDestroySStmSnodeTasksDeploy(void* param) {
95✔
89
  SStmSnodeTasksDeploy* pSnode = (SStmSnodeTasksDeploy*)param;
95✔
90
  taosArrayDestroyEx(pSnode->triggerList, mstDestroySStmTaskToDeployExt);
95✔
91
  taosArrayDestroyEx(pSnode->runnerList, mstDestroySStmTaskToDeployExt);
95✔
92
}
95✔
93

94
void mstDestroySStmVgTasksToDeploy(void* param) {
277✔
95
  SStmVgTasksToDeploy* pVg = (SStmVgTasksToDeploy*)param;
277✔
96
  taosArrayDestroyEx(pVg->taskList, mstDestroySStmTaskToDeployExt);
277✔
97
}
277✔
98

99
void mstDestroySStmSnodeStatus(void* param) {
87✔
100
  SStmSnodeStatus* pSnode = (SStmSnodeStatus*)param;
87✔
101
  taosHashCleanup(pSnode->streamTasks);
87✔
102
}
87✔
103

104
void mstDestroySStmVgroupStatus(void* param) {
207✔
105
  SStmVgroupStatus* pVg = (SStmVgroupStatus*)param;
207✔
106
  taosHashCleanup(pVg->streamTasks);
207✔
107
}
207✔
108

109
/**
 * Drop all task bookkeeping held by a stream status so it can be rebuilt.
 *
 * Releases the trigger reader arrays, the calc reader list, the trigger task
 * (including its detailStatus, freed while holding detailStatusLock in write
 * mode) and every runner array. Each released pointer is reset to NULL.
 *
 * @param pStatus stream status to reset
 */
void mstResetSStmStatus(SStmStatus* pStatus) {
  taosArrayDestroy(pStatus->trigReaders);
  pStatus->trigReaders = NULL;

  taosArrayDestroy(pStatus->trigOReaders);
  pStatus->trigOReaders = NULL;

  pStatus->calcReaders = tdListFree(pStatus->calcReaders);

  SStmTaskStatus* pTrigger = pStatus->triggerTask;
  if (NULL != pTrigger) {
    // detailStatus is freed under the write lock of detailStatusLock.
    (void)mstWaitLock(&pTrigger->detailStatusLock, false);
    taosMemoryFreeClear(pTrigger->detailStatus);
    taosWUnLockLatch(&pTrigger->detailStatusLock);
  }
  taosMemoryFreeClear(pStatus->triggerTask);

  int32_t slot = 0;
  while (slot < MND_STREAM_RUNNER_DEPLOY_NUM) {
    taosArrayDestroy(pStatus->runners[slot]);
    pStatus->runners[slot] = NULL;
    ++slot;
  }
}
341✔
126

127
void mstDestroySStmStatus(void* param) {
320✔
128
  SStmStatus* pStatus = (SStmStatus*)param;
320✔
129
  taosMemoryFreeClear(pStatus->streamName);
320!
130

131
  mstResetSStmStatus(pStatus);
320✔
132

133
  taosWLockLatch(&pStatus->userRecalcLock);
320✔
134
  taosArrayDestroy(pStatus->userRecalcList);
320✔
135
  taosWUnLockLatch(&pStatus->userRecalcLock);
320✔
136

137
  tFreeSCMCreateStreamReq(pStatus->pCreate);
320✔
138
  taosMemoryFreeClear(pStatus->pCreate);  
320!
139
}
320✔
140

141
void mstDestroySStmAction(void* param) {
479✔
142
  SStmAction* pAction = (SStmAction*)param;
479✔
143

144
  taosArrayDestroy(pAction->undeploy.taskList);
479✔
145
  taosArrayDestroy(pAction->recalc.recalcList);
479✔
146
}
479✔
147

148
/**
 * Detach all task references from a stream deploy descriptor by setting the
 * reader, trigger and runner task pointers to NULL. Nothing is freed here.
 *
 * @param pDeploy deploy descriptor to clear
 */
void mstClearSStmStreamDeploy(SStmStreamDeploy* pDeploy) {
  pDeploy->runnerTasks = NULL;
  pDeploy->triggerTask = NULL;
  pDeploy->readerTasks = NULL;
}
403✔
153

154
/**
 * Look up a stream by id in the sdb stream table and report whether it is
 * dropped.
 *
 * @param pMnode   mnode whose sdb is scanned
 * @param streamId id compared against each stream's pCreate->streamId
 * @param dropped  out: the stream's userDropped flag when the stream is
 *                 found; true when no stream with that id exists
 * @return always TSDB_CODE_SUCCESS
 */
int32_t mstIsStreamDropped(SMnode *pMnode, int64_t streamId, bool* dropped) {
  SSdb *pSdb = pMnode->pSdb;
  void *pCursor = NULL;

  for (;;) {
    SStreamObj *pCurr = NULL;
    pCursor = sdbFetch(pSdb, SDB_STREAM, pCursor, (void **)&pCurr);
    if (NULL == pCursor) {
      break;
    }

    if (pCurr->pCreate->streamId != streamId) {
      sdbRelease(pSdb, pCurr);
      continue;
    }

    // Match: report its flag and stop the iteration early.
    *dropped = pCurr->userDropped ? true : false;
    sdbRelease(pSdb, pCurr);
    sdbCancelFetch(pSdb, pCursor);
    mstsDebug("stream found, dropped:%d", *dropped);
    return TSDB_CODE_SUCCESS;
  }

  // No stream object with this id is left in sdb.
  *dropped = true;

  return TSDB_CODE_SUCCESS;
}
178

179
/* Traversal context handed to mstChkSetDbInUse through sdbTraverse. */
typedef struct SStmCheckDbInUseCtx {
  bool* dbStream;      // out: set to true when a stream uses the db as trigger or calc db
  bool* vtableStream;  // out: set to true when a virtual-table stream is encountered
  bool  ignoreCurrDb;  // when true, streams whose streamDB equals the checked db are skipped
} SStmCheckDbInUseCtx;
184

185
/**
 * sdbTraverse callback: checks whether one stream references a database.
 *
 * @param pMnode unused
 * @param pObj   the SStreamObj being visited
 * @param p1     full name of the database being checked (C string)
 * @param p2     SStmCheckDbInUseCtx with the output flags and options
 * @param p3     unused
 * @return false once the stream's trigger db or one of its calc dbs matches
 *         (dbStream is set to true); true in every other case
 */
static bool mstChkSetDbInUse(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SStreamObj *pStream = pObj;

  // Streams the user already dropped do not count.
  if (atomic_load_8(&pStream->userDropped)) {
    return true;
  }

  const char          *dbFName = p1;
  SStmCheckDbInUseCtx *pCtx = (SStmCheckDbInUseCtx *)p2;
  SCMCreateStreamReq  *pReq = pStream->pCreate;

  if (pCtx->ignoreCurrDb && 0 == strcmp(pReq->streamDB, dbFName)) {
    return true;
  }

  if (pReq->triggerDB && 0 == strcmp(pReq->triggerDB, dbFName)) {
    *pCtx->dbStream = true;
    return false;
  }

  int32_t calcDBNum = taosArrayGetSize(pReq->calcDB);
  for (int32_t idx = 0; idx < calcDBNum; ++idx) {
    char *calcDB = taosArrayGetP(pReq->calcDB, idx);
    if (0 == strcmp(calcDB, dbFName)) {
      *pCtx->dbStream = true;
      return false;
    }
  }

  // No direct db match: only record whether this is a virtual-table stream.
  if (pReq->vtableCalc || STREAM_IS_VIRTUAL_TABLE(pReq->triggerTblType, pReq->flags)) {
    *pCtx->vtableStream = true;
  }

  return true;
}
217

218
/**
 * Scan all streams in sdb and report how they relate to a database.
 *
 * Nothing is written to the output flags when sdb holds no streams.
 *
 * @param pMnode       mnode whose sdb is scanned
 * @param dbFName      full name of the database to check
 * @param dbStream     out: set to true by the traversal callback when a stream
 *                     uses the db as trigger db or calc db
 * @param vtableStream out: set to true by the traversal callback when a
 *                     virtual-table stream is encountered
 * @param ignoreCurrDb when true, streams created in dbFName itself are skipped
 */
void mstCheckDbInUse(SMnode *pMnode, char *dbFName, bool *dbStream, bool *vtableStream, bool ignoreCurrDb) {
  if (sdbGetSize(pMnode->pSdb, SDB_STREAM) > 0) {
    SStmCheckDbInUseCtx ctx = {
        .dbStream = dbStream,
        .vtableStream = vtableStream,
        .ignoreCurrDb = ignoreCurrDb,
    };
    sdbTraverse(pMnode->pSdb, SDB_STREAM, mstChkSetDbInUse, dbFName, &ctx, NULL);
  }
}
227

228
static void mstShowStreamStatus(char *dst, int8_t status, int32_t bufLen) {
×
229
  if (status == STREAM_STATUS_INIT) {
×
230
    tstrncpy(dst, "init", bufLen);
×
231
  } else if (status == STREAM_STATUS_RUNNING) {
×
232
    tstrncpy(dst, "running", bufLen);
×
233
  } else if (status == STREAM_STATUS_STOPPED) {
×
234
    tstrncpy(dst, "stopped", bufLen);
×
235
  } else if (status == STREAM_STATUS_FAILED) {
×
236
    tstrncpy(dst, "failed", bufLen);
×
237
  }
238
}
×
239

240
/**
 * Check whether at least one snode object exists in sdb.
 *
 * Only the first fetch result matters; the iteration is cancelled right away.
 *
 * @param pMnode mnode whose sdb is queried
 * @return TSDB_CODE_SUCCESS when an snode was fetched,
 *         TSDB_CODE_SNODE_NOT_DEPLOYED otherwise
 */
int32_t mstCheckSnodeExists(SMnode *pMnode) {
  SSdb      *pSdb = pMnode->pSdb;
  SSnodeObj *pSnode = NULL;
  void      *pCursor = sdbFetch(pSdb, SDB_SNODE, NULL, (void **)&pSnode);

  if (NULL == pCursor) {
    return TSDB_CODE_SNODE_NOT_DEPLOYED;
  }

  sdbRelease(pSdb, pSnode);
  sdbCancelFetch(pSdb, pCursor);
  return TSDB_CODE_SUCCESS;
}
258

259
/**
 * Copy the identity and state carried by a task status message into a task
 * status record and stamp the record with the context's current timestamp.
 *
 * @param pCtx  group context; only currTs is read
 * @param pTask record to fill
 * @param pMsg  message to copy from
 */
void mstSetTaskStatusFromMsg(SStmGrpCtx* pCtx, SStmTaskStatus* pTask, SStmTaskStatusMsg* pMsg) {
  // Identity of the task.
  pTask->id.taskId    = pMsg->taskId;
  pTask->id.deployId  = pMsg->deployId;
  pTask->id.seriousId = pMsg->seriousId;
  pTask->id.nodeId    = pMsg->nodeId;
  pTask->id.taskIdx   = pMsg->taskIdx;

  // Reported state.
  pTask->type   = pMsg->type;
  pTask->flags  = pMsg->flags;
  pTask->status = pMsg->status;

  pTask->lastUpTs = pCtx->currTs;
}
×
271

272
/**
 * Take the next node off the action queue.
 *
 * The current head node is freed and its successor becomes both the new head
 * and the node handed back to the caller, so the returned node stays linked
 * as the queue head after this call.
 *
 * @param pQueue queue to read from
 * @param param  out: the dequeued node; untouched when the queue is empty
 * @return false when qRemainNum is 0, true when a node was returned
 */
bool mndStreamActionDequeue(SStmActionQ* pQueue, SStmQNode **param) {
  if (0 == atomic_load_64(&pQueue->qRemainNum)) {
    return false;
  }

  SStmQNode *pOldHead = pQueue->head;
  SStmQNode *pNext = pOldHead->next;

  pQueue->head = pNext;
  *param = pNext;

  taosMemoryFreeClear(pOldHead);

  (void)atomic_sub_fetch_64(&pQueue->qRemainNum, 1);

  return true;
}
290

291
/**
 * Append a node to the tail of the action queue.
 *
 * The link update happens under the queue's write lock; qRemainNum is
 * incremented atomically afterwards, outside the lock.
 *
 * @param pQueue queue to append to
 * @param param  node to append
 */
void mndStreamActionEnqueue(SStmActionQ* pQueue, SStmQNode* param) {
  taosWLockLatch(&pQueue->lock);
  SStmQNode* pLast = pQueue->tail;
  pLast->next = param;
  pQueue->tail = param;
  taosWUnLockLatch(&pQueue->lock);

  (void)atomic_add_fetch_64(&pQueue->qRemainNum, 1);
}
358✔
299

300
char* mstGetStreamActionString(int32_t action) {
224✔
301
  switch (action) {
224!
302
    case STREAM_ACT_DEPLOY:
224✔
303
      return "DEPLOY";
224✔
304
    case STREAM_ACT_UNDEPLOY:
×
305
      return "UNDEPLOY";
×
306
    case STREAM_ACT_START:
×
307
      return "START";
×
308
    case STREAM_ACT_UPDATE_TRIGGER:
×
309
      return "UPDATE TRIGGER";
×
310
    case STREAM_ACT_RECALC:
×
311
      return "USER RECALC";
×
312
    default:
×
313
      break;
×
314
  }
315

316
  return "UNKNOWN";
×
317
}
318

319
/**
 * Allocate a stream-level action node and append it to the action queue.
 *
 * If the node cannot be allocated, param is freed, the failure is logged and
 * nothing is queued.
 *
 * @param actionQ    queue receiving the node
 * @param streamId   id of the stream the action targets
 * @param streamName name of the stream; copied into the node and truncated
 *                   if it does not fit the node's streamName buffer
 * @param param      action parameter stored in the node as actionParam
 * @param userAction stored in the node as the userAction flag
 * @param action     action code stored as the node type
 */
void mstPostStreamAction(SStmActionQ*       actionQ, int64_t streamId, char* streamName, void* param, bool userAction, int32_t action) {
  SStmQNode *pNode = taosMemoryMalloc(sizeof(SStmQNode));
  if (NULL == pNode) {
    taosMemoryFreeClear(param);
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, __LINE__, tstrerror(terrno));
    return;
  }

  pNode->type = action;
  pNode->streamAct = true;
  pNode->action.stream.streamId = streamId;
  // Bounded copy: streamName is an in-node buffer, so never write past it.
  tstrncpy(pNode->action.stream.streamName, streamName, sizeof(pNode->action.stream.streamName));
  pNode->action.stream.userAction = userAction;
  pNode->action.stream.actionParam = param;

  pNode->next = NULL;

  mndStreamActionEnqueue(actionQ, pNode);

  mstsDebug("stream action %s posted enqueue", mstGetStreamActionString(action));
}
340

341
/**
 * Allocate a task-level action node, copy *pAction into it and append it to
 * the action queue. On allocation failure the error is logged and nothing is
 * queued.
 *
 * @param actionQ queue receiving the node
 * @param pAction task action copied by value into the node
 * @param action  action code stored as the node type
 */
void mstPostTaskAction(SStmActionQ*        actionQ, SStmTaskAction* pAction, int32_t action) {
  SStmQNode *pItem = taosMemoryMalloc(sizeof(SStmQNode));
  if (pItem != NULL) {
    pItem->type = action;
    pItem->streamAct = false;
    pItem->action.task = *pAction;
    pItem->next = NULL;

    mndStreamActionEnqueue(actionQ, pItem);
    return;
  }

  // streamId is declared only so the mstsError log call below has it in scope.
  int64_t streamId = pAction->streamId;
  mstsError("%s failed at line %d, error:%s", __FUNCTION__, __LINE__, tstrerror(terrno));
}
357

358
/**
 * Destroy a db -> vgroup map: the vgArray of every SDBVgHashInfo entry is
 * destroyed first, then the hash itself is cleaned up.
 *
 * @param pDbVgs the map to destroy
 */
void mstDestroyDbVgroupsHash(SSHashObj *pDbVgs) {
  int32_t iter = 0;
  void*   pEntry = tSimpleHashIterate(pDbVgs, NULL, &iter);

  while (NULL != pEntry) {
    SDBVgHashInfo* pDbInfo = (SDBVgHashInfo*)pEntry;
    taosArrayDestroy(pDbInfo->vgArray);
    pEntry = tSimpleHashIterate(pDbVgs, pEntry, &iter);
  }

  tSimpleHashCleanup(pDbVgs);
}
73✔
369

370

371
/**
 * Build a map from db name to the hash ranges of all its vgroups.
 *
 * Every vgroup in sdb is visited. The first vgroup of a db creates a new
 * SDBVgHashInfo entry (hash settings taken from the db config, vgSorted
 * false); each vgroup appends an SVGroupHashInfo {vgId, hashBegin, hashEnd}
 * to its db's vgArray. Keys are the db name including the terminating NUL.
 *
 * @param pMnode mnode whose sdb is scanned
 * @param ppRes  out: the new map on success (to be released with
 *               mstDestroyDbVgroupsHash); left untouched on failure
 * @return TSDB_CODE_SUCCESS, or the error that stopped the build. On failure
 *         the partially built map and its arrays are released before
 *         returning.
 */
int32_t mstBuildDBVgroupsMap(SMnode* pMnode, SSHashObj** ppRes) {
  void*   pIter = NULL;
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SArray* pTarget = NULL;
  SArray* pNew = NULL;  // array not yet owned by the map
  SDbObj* pDb = NULL;
  SDBVgHashInfo dbInfo = {0}, *pDbInfo = NULL;
  SVgObj* pVgroup = NULL;

  SSHashObj* pDbVgroup = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
  TSDB_CHECK_NULL(pDbVgroup, code, lino, _exit, terrno);

  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    pDbInfo = (SDBVgHashInfo*)tSimpleHashGet(pDbVgroup, pVgroup->dbName, strlen(pVgroup->dbName) + 1);
    if (NULL == pDbInfo) {
      // First vgroup seen for this db: prepare a new entry.
      pNew = taosArrayInit(20, sizeof(SVGroupHashInfo));
      if (NULL == pNew) {
        sdbRelease(pMnode->pSdb, pVgroup);
        sdbCancelFetch(pMnode->pSdb, pIter);
        pVgroup = NULL;
        TSDB_CHECK_NULL(pNew, code, lino, _exit, terrno);
      }

      pDb = mndAcquireDb(pMnode, pVgroup->dbName);
      if (NULL == pDb) {
        sdbRelease(pMnode->pSdb, pVgroup);
        sdbCancelFetch(pMnode->pSdb, pIter);
        pVgroup = NULL;
        TSDB_CHECK_NULL(pDb, code, lino, _exit, terrno);
      }
      dbInfo.vgSorted = false;
      dbInfo.hashMethod = pDb->cfg.hashMethod;
      dbInfo.hashPrefix = pDb->cfg.hashPrefix;
      dbInfo.hashSuffix = pDb->cfg.hashSuffix;
      dbInfo.vgArray = pNew;

      mndReleaseDb(pMnode, pDb);

      pTarget = pNew;
    } else {
      pTarget = pDbInfo->vgArray;
    }

    SVGroupHashInfo vgInfo = {.vgId = pVgroup->vgId, .hashBegin = pVgroup->hashBegin, .hashEnd = pVgroup->hashEnd};
    if (NULL == taosArrayPush(pTarget, &vgInfo)) {
      sdbRelease(pMnode->pSdb, pVgroup);
      sdbCancelFetch(pMnode->pSdb, pIter);
      pVgroup = NULL;
      TSDB_CHECK_NULL(NULL, code, lino, _exit, terrno);
    }

    if (NULL == pDbInfo) {
      code = tSimpleHashPut(pDbVgroup, pVgroup->dbName, strlen(pVgroup->dbName) + 1, &dbInfo, sizeof(dbInfo));
      if (code) {
        sdbRelease(pMnode->pSdb, pVgroup);
        sdbCancelFetch(pMnode->pSdb, pIter);
        pVgroup = NULL;
        TAOS_CHECK_EXIT(code);
      }
      // The map entry now references the array; it must not be freed below.
      pNew = NULL;
    }

    sdbRelease(pMnode->pSdb, pVgroup);
    pVgroup = NULL;
  }

  // Success: hand the map over to the caller.
  *ppRes = pDbVgroup;
  pDbVgroup = NULL;

_exit:

  taosArrayDestroy(pNew);

  // Still non-NULL only on failure: release the partially built map together
  // with the arrays it already owns, so an aborted build does not leak them.
  if (NULL != pDbVgroup) {
    mstDestroyDbVgroupsHash(pDbVgroup);
  }

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
455

456
int mstDbVgInfoComp(const void* lp, const void* rp) {
35✔
457
  SVGroupHashInfo* pLeft = (SVGroupHashInfo*)lp;
35✔
458
  SVGroupHashInfo* pRight = (SVGroupHashInfo*)rp;
35✔
459
  if (pLeft->hashBegin < pRight->hashBegin) {
35!
460
    return -1;
35✔
461
  } else if (pLeft->hashBegin > pRight->hashBegin) {
×
462
    return 1;
×
463
  }
464

465
  return 0;
×
466
}
467

468
int32_t mstTableHashValueComp(void const* lp, void const* rp) {
163✔
469
  uint32_t*    key = (uint32_t*)lp;
163✔
470
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
163✔
471

472
  if (*key < pVg->hashBegin) {
163!
473
    return -1;
×
474
  } else if (*key > pVg->hashEnd) {
163✔
475
    return 1;
24✔
476
  }
477

478
  return 0;
139✔
479
}
480

481

482
/**
 * Resolve the vgroup a table belongs to, using a db -> vgroup map.
 *
 * The table's hash value is computed from "<dbFName>.<tbName>" with the db's
 * hash settings, the db's vgroup list is sorted by hashBegin on first use
 * (tracked by vgSorted), and the list is searched for the range containing
 * the hash value.
 *
 * @param pDbVgroups map produced by mstBuildDBVgroupsMap
 * @param dbFName    full db name; used as map key including the trailing NUL
 * @param tbName     table name
 * @param vgId       out: id of the owning vgroup; written only on success
 * @return 0 on success, TSDB_CODE_MND_DB_NOT_EXIST when the db is not in the
 *         map, TSDB_CODE_MND_STREAM_INTERNAL_ERROR when no range matches
 */
int32_t mstGetTableVgId(SSHashObj* pDbVgroups, char* dbFName, char *tbName, int32_t* vgId) {
  int32_t code = 0;
  int32_t lino = 0;
  // vgArray elements are SVGroupHashInfo, so the search result is one too.
  SVGroupHashInfo* vgInfo = NULL;
  char             tbFullName[TSDB_TABLE_FNAME_LEN];

  SDBVgHashInfo* dbInfo = (SDBVgHashInfo*)tSimpleHashGet(pDbVgroups, dbFName, strlen(dbFName) + 1);
  if (NULL == dbInfo) {
    mstError("db %s does not exist", dbFName);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_DB_NOT_EXIST);
  }

  (void)snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbFName, tbName);
  uint32_t hashValue = taosGetTbHashVal(tbFullName, (uint32_t)strlen(tbFullName), dbInfo->hashMethod,
                                        dbInfo->hashPrefix, dbInfo->hashSuffix);

  // Sort lazily, once per db entry.
  if (!dbInfo->vgSorted) {
    taosArraySort(dbInfo->vgArray, mstDbVgInfoComp);
    dbInfo->vgSorted = true;
  }

  vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, mstTableHashValueComp, TD_EQ);
  if (NULL == vgInfo) {
    mstError("no hash range found for hash value [%u], dbFName:%s, numOfVgId:%d", hashValue, dbFName,
             (int32_t)taosArrayGetSize(dbInfo->vgArray));
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  *vgId = vgInfo->vgId;

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
520

521

522
void mstLogSStreamObj(char* tips, SStreamObj* p) {
325✔
523
  if (!(stDebugFlag & DEBUG_DEBUG)) {
325✔
524
    return;
118✔
525
  }
526
  
527
  if (NULL == p) {
207!
528
    mstDebug("%s: stream is NULL", tips);
×
529
    return;
×
530
  }
531

532
  mstDebug("%s: stream obj", tips);
207!
533
  mstDebug("name:%s mainSnodeId:%d userDropped:%d userStopped:%d createTime:%" PRId64 " updateTime:%" PRId64,
207!
534
      p->name, p->mainSnodeId, p->userDropped, p->userStopped, p->createTime, p->updateTime);
535

536
  SCMCreateStreamReq* q = p->pCreate;
207✔
537
  if (NULL == q) {
207!
538
    mstDebug("stream pCreate is NULL");
×
539
    return;
×
540
  }
541

542
  int64_t streamId = q->streamId;
207✔
543
  int32_t calcDBNum = taosArrayGetSize(q->calcDB);
207✔
544
  int32_t calcScanNum = taosArrayGetSize(q->calcScanPlanList);
207✔
545
  int32_t notifyUrlNum = taosArrayGetSize(q->pNotifyAddrUrls);
207✔
546
  int32_t outColNum = taosArrayGetSize(q->outCols);
207✔
547
  int32_t outTagNum = taosArrayGetSize(q->outTags);
207✔
548
  int32_t forceOutColNum = taosArrayGetSize(q->forceOutCols);
207✔
549

550
  mstsDebugL("create_info: name:%s sql:%s streamDB:%s triggerDB:%s outDB:%s calcDBNum:%d triggerTblName:%s outTblName:%s "
207!
551
      "igExists:%d triggerType:%d igDisorder:%d deleteReCalc:%d deleteOutTbl:%d fillHistory:%d fillHistroyFirst:%d "
552
      "calcNotifyOnly:%d lowLatencyCalc:%d igNoDataTrigger:%d notifyUrlNum:%d notifyEventTypes:%d addOptions:%d notifyHistory:%d "
553
      "outColsNum:%d outTagsNum:%d maxDelay:%" PRId64 " fillHistoryStartTs:%" PRId64 " watermark:%" PRId64 " expiredTime:%" PRId64 " "
554
      "triggerTblType:%d triggerTblUid:%" PRIx64 " triggerTblSuid:%" PRIx64 " vtableCalc:%d outTblType:%d outStbExists:%d outStbUid:%" PRIu64 " outStbSversion:%d "
555
      "eventTypes:0x%" PRIx64 " flags:0x%" PRIx64 " tsmaId:0x%" PRIx64 " placeHolderBitmap:0x%" PRIx64 " calcTsSlotId:%d triTsSlotId:%d "
556
      "triggerTblVgId:%d outTblVgId:%d calcScanPlanNum:%d forceOutCols:%d",
557
      q->name, q->sql, q->streamDB, q->triggerDB, q->outDB, calcDBNum, q->triggerTblName, q->outTblName,
558
      q->igExists, q->triggerType, q->igDisorder, q->deleteReCalc, q->deleteOutTbl, q->fillHistory, q->fillHistoryFirst,
559
      q->calcNotifyOnly, q->lowLatencyCalc, q->igNoDataTrigger, notifyUrlNum, q->notifyEventTypes, q->addOptions, q->notifyHistory,
560
      outColNum, outTagNum, q->maxDelay, q->fillHistoryStartTime, q->watermark, q->expiredTime,
561
      q->triggerTblType, q->triggerTblUid, q->triggerTblSuid, q->vtableCalc, q->outTblType, q->outStbExists, q->outStbUid, q->outStbSversion,
562
      q->eventTypes, q->flags, q->tsmaId, q->placeHolderBitmap, q->calcTsSlotId, q->triTsSlotId,
563
      q->triggerTblVgId, q->outTblVgId, calcScanNum, forceOutColNum);
564

565
  switch (q->triggerType) {
207!
566
    case WINDOW_TYPE_INTERVAL: {
117✔
567
      SSlidingTrigger* t = &q->trigger.sliding;
117✔
568
      mstsDebug("sliding trigger options, intervalUnit:%d, slidingUnit:%d, offsetUnit:%d, soffsetUnit:%d, precision:%d, interval:%" PRId64 ", offset:%" PRId64 ", sliding:%" PRId64 ", soffset:%" PRId64, 
117!
569
          t->intervalUnit, t->slidingUnit, t->offsetUnit, t->soffsetUnit, t->precision, t->interval, t->offset, t->sliding, t->soffset);
570
      break;
117✔
571
    }  
572
    case WINDOW_TYPE_SESSION: {
8✔
573
      SSessionTrigger* t = &q->trigger.session;
8✔
574
      mstsDebug("session trigger options, slotId:%d, sessionVal:%" PRId64, t->slotId, t->sessionVal);
8!
575
      break;
8✔
576
    }
577
    case WINDOW_TYPE_STATE: {
39✔
578
      SStateWinTrigger* t = &q->trigger.stateWin;
39✔
579
      mstsDebug("state trigger options, slotId:%d, expr:%s, extend:%d, trueForDuration:%" PRId64, t->slotId, (char *)t->expr, t->extend, t->trueForDuration);
39!
580
      break;
39✔
581
    }
582
    case WINDOW_TYPE_EVENT:{
21✔
583
      SEventTrigger* t = &q->trigger.event;
21✔
584
      mstsDebug("event trigger options, startCond:%s, endCond:%s, trueForDuration:%" PRId64, (char*)t->startCond, (char*)t->endCond, t->trueForDuration);
21!
585
      break;
21✔
586
    }
587
    case WINDOW_TYPE_COUNT: {
13✔
588
      SCountTrigger* t = &q->trigger.count;
13✔
589
      mstsDebug("count trigger options, countVal:%" PRId64 ", sliding:%" PRId64 ", condCols:%s", t->countVal, t->sliding, (char*)t->condCols);
13!
590
      break;
13✔
591
    }
592
    case WINDOW_TYPE_PERIOD: {
9✔
593
      SPeriodTrigger* t = &q->trigger.period;
9✔
594
      mstsDebug("period trigger options, periodUnit:%d, offsetUnit:%d, precision:%d, period:%" PRId64 ", offset:%" PRId64, 
9!
595
          t->periodUnit, t->offsetUnit, t->precision, t->period, t->offset);
596
      break;
9✔
597
    }
598
    default:
×
599
      mstsDebug("unknown triggerType:%d", q->triggerType);
×
600
      break;
×
601
  }
602

603
  mstsDebugL("create_info: triggerCols:[%s]", (char*)q->triggerCols);
207!
604

605
  mstsDebugL("create_info: partitionCols:[%s]", (char*)q->partitionCols);
207!
606

607
  mstsDebugL("create_info: triggerScanPlan:[%s]", (char*)q->triggerScanPlan);
207!
608

609
  mstsDebugL("create_info: calcPlan:[%s]", (char*)q->calcPlan);
207!
610

611
  mstsDebugL("create_info: subTblNameExpr:[%s]", (char*)q->subTblNameExpr);
207!
612

613
  mstsDebugL("create_info: tagValueExpr:[%s]", (char*)q->tagValueExpr);
207!
614

615

616
  for (int32_t i = 0; i < calcDBNum; ++i) {
413✔
617
    char* dbName = taosArrayGetP(q->calcDB, i);
206✔
618
    mstsDebug("create_info: calcDB[%d] - %s", i, dbName);
206!
619
  }
620

621
  for (int32_t i = 0; i < calcScanNum; ++i) {
485✔
622
    SStreamCalcScan* pScan = taosArrayGet(q->calcScanPlanList, i);
278✔
623
    int32_t vgNum = taosArrayGetSize(pScan->vgList);
278✔
624
    mstsDebugL("create_info: calcScanPlan[%d] - readFromCache:%d vgNum:%d scanPlan:[%s]", i, pScan->readFromCache, vgNum, (char*)pScan->scanPlan);
278!
625
    for (int32_t v = 0; v < vgNum; ++v) {
556✔
626
      mstsDebug("create_info: calcScanPlan[%d] vg[%d] - vgId:%d", i, v, *(int32_t*)taosArrayGet(pScan->vgList, v));
278!
627
    }
628
  }
629

630
  for (int32_t i = 0; i < notifyUrlNum; ++i) {
232✔
631
    char* url = taosArrayGetP(q->pNotifyAddrUrls, i);
25✔
632
    mstsDebug("create_info: notifyUrl[%d] - %s", i, url);
25!
633
  }
634

635
  for (int32_t i = 0; i < outColNum; ++i) {
1,001✔
636
    SFieldWithOptions* o = taosArrayGet(q->outCols, i);
794✔
637
    mstsDebug("create_info: outCol[%d] - name:%s type:%d flags:%d bytes:%d compress:%u typeMod:%d", 
794!
638
        i, o->name, o->type, o->flags, o->bytes, o->compress, o->typeMod);
639
  }
640
      
641
}
642

643
/**
 * Write one debug log line describing a task status record.
 *
 * @param name     label printed in front of the record
 * @param streamId id of the owning stream (must be in scope for mstsDebug)
 * @param pTask    record to print; type and status are rendered through
 *                 gStreamTaskTypeStr and gStreamStatusStr
 * @param idx      index printed next to the label
 */
void mstLogSStmTaskStatus(char* name, int64_t streamId, SStmTaskStatus* pTask, int32_t idx) {
  mstsDebug("%s[%d]: task %" PRIx64 " deployId:%d SID:%" PRId64 " nodeId:%d tidx:%d type:%s flags:%" PRIx64 " status:%s lastUpTs:%" PRId64, 
      name, idx,
      pTask->id.taskId, pTask->id.deployId, pTask->id.seriousId, pTask->id.nodeId, pTask->id.taskIdx,
      gStreamTaskTypeStr[pTask->type], pTask->flags,
      gStreamStatusStr[pTask->status], pTask->lastUpTs);
}
1,459✔
648

649
/**
 * Dump the runtime status of a stream and all of its tasks to the debug log.
 *
 * Returns immediately unless DEBUG_DEBUG is set in stDebugFlag. A NULL status
 * is reported with a single log line.
 *
 * @param tips     prefix printed in front of the dump
 * @param streamId id of the stream (must be in scope for mstsDebug)
 * @param p        status to dump (may be NULL)
 */
void mstLogSStmStatus(char* tips, int64_t streamId, SStmStatus* p) {
  if (!(stDebugFlag & DEBUG_DEBUG)) {
    return;
  }

  if (NULL == p) {
    mstsDebug("%s: stream status is NULL", tips);
    return;
  }

  int32_t trigReaderNum = taosArrayGetSize(p->trigReaders);
  int32_t trigOReaderNum = taosArrayGetSize(p->trigOReaders);
  int32_t calcReaderNum = MST_LIST_SIZE(p->calcReaders);
  int32_t triggerNum = p->triggerTask ? 1 : 0;
  int32_t runnerNum = 0;

  for (int32_t i = 0; i < p->runnerDeploys; ++i) {
    runnerNum += taosArrayGetSize(p->runners[i]);
  }

  mstsDebug("%s: stream status", tips);
  mstsDebug("name:%s runnerNum:%d runnerDeploys:%d runnerReplica:%d lastActionTs:%" PRId64
           " trigReaders:%d trigOReaders:%d calcReaders:%d trigger:%d runners:%d",
      p->streamName, p->runnerNum, p->runnerDeploys, p->runnerReplica, p->lastActionTs,
      trigReaderNum, trigOReaderNum, calcReaderNum, triggerNum, runnerNum);

  SStmTaskStatus* pTask = NULL;
  for (int32_t i = 0; i < trigReaderNum; ++i) {
    pTask = taosArrayGet(p->trigReaders, i);
    mstLogSStmTaskStatus("trigReader task", streamId, pTask, i);
  }

  for (int32_t i = 0; i < trigOReaderNum; ++i) {
    pTask = taosArrayGet(p->trigOReaders, i);
    mstLogSStmTaskStatus("trigOReader task", streamId, pTask, i);
  }

  // mstResetSStmStatus can leave calcReaders NULL, so only touch the list
  // head when there is something to walk.
  // NOTE(review): MST_LIST_SIZE appears to tolerate a NULL list while
  // listHead does not; confirm against the macro definitions.
  SListNode* pNode = (calcReaderNum > 0) ? listHead(p->calcReaders) : NULL;
  for (int32_t i = 0; i < calcReaderNum && NULL != pNode; ++i) {
    pTask = (SStmTaskStatus*)pNode->data;
    mstLogSStmTaskStatus("calcReader task", streamId, pTask, i);
    pNode = TD_DLIST_NODE_NEXT(pNode);
  }

  if (triggerNum > 0) {
    mstLogSStmTaskStatus("trigger task", streamId, p->triggerTask, 0);
  }

  for (int32_t i = 0; i < p->runnerDeploys; ++i) {
    int32_t num = taosArrayGetSize(p->runners[i]);
    if (num <= 0) {
      continue;
    }

    mstsDebug("the %dth deploy runners status", i);
    for (int32_t m = 0; m < num; ++m) {
      pTask = taosArrayGet(p->runners[i], m);
      mstLogSStmTaskStatus("runner task", streamId, pTask, m);
    }
  }
}
711

712
bool mstEventPassIsolation(int32_t num, int32_t event) {
9,060✔
713
  bool ret = ((mStreamMgmt.lastTs[event].ts + num * MST_SHORT_ISOLATION_DURATION) <= mStreamMgmt.hCtx.currentTs);
9,060✔
714
  if (ret) {
9,060✔
715
    mstDebug("event %s passed %d isolation, last:%" PRId64 ", curr:%" PRId64, 
6,739✔
716
        gMndStreamEvent[event], num, mStreamMgmt.lastTs[event].ts, mStreamMgmt.hCtx.currentTs);
717
  }
718

719
  return ret;
9,060✔
720
}
721

722
bool mstEventHandledChkSet(int32_t event) {
6,739✔
723
  if (0 == atomic_val_compare_exchange_8((int8_t*)&mStreamMgmt.lastTs[event].handled, 0, 1)) {
6,739✔
724
    mstDebug("event %s set handled", gMndStreamEvent[event]);
395✔
725
    return true;
395✔
726
  }
727
  return false;
6,344✔
728
}
729

730
/*
 * Build the status and message varstrs shown for a stream.
 *
 * Decision order:
 *   1. stream management inactive or not in NORMAL state -> UNDEPLOYED + hint message
 *   2. pStream->userDropped set                          -> DROPPING, empty message
 *   3. pStream->userStopped set                          -> STOPPED, empty message
 *   4. (under the runtime read lock) no entry in streamMap -> UNDEPLOYED, empty message
 *   5. runtime status stopped == 1 -> FAILED with last fatal error and retry count
 *   6. runtime status stopped == 4 -> FAILED with TSDB_CODE_GRANT_STREAM_EXPIRED text
 *   7. trigger task present and RUNNING -> RUNNING with the running start time
 *   8. otherwise -> INIT with the current deploy count
 *
 * @param pStream     stream object to describe
 * @param status      output varstr buffer for the status name
 * @param statusSize  size of status in bytes
 * @param msg         output varstr buffer for the detail message
 * @param msgSize     size of msg in bytes
 * @return always TSDB_CODE_SUCCESS
 */
int32_t mstGetStreamStatusStr(SStreamObj* pStream, char* status, int32_t statusSize, char* msg, int32_t msgSize) {
  int8_t active = atomic_load_8(&mStreamMgmt.active);
  int8_t state = atomic_load_8(&mStreamMgmt.state);

  if (MND_STM_STATE_NORMAL != state || 0 == active) {
    mstDebug("mnode streamMgmt not in active mode, active:%d, state:%d", active, state);
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_UNDEPLOYED], statusSize);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "Mnode may be unstable, try again later", msgSize);
    return TSDB_CODE_SUCCESS;
  }

  if (atomic_load_8(&pStream->userDropped)) {
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_DROPPING], statusSize);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
    return TSDB_CODE_SUCCESS;
  }

  if (atomic_load_8(&pStream->userStopped)) {
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_STOPPED], statusSize);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
    return TSDB_CODE_SUCCESS;
  }

  char tmpBuf[256];

  // everything below reads the runtime status map, so hold the runtime read lock
  (void)mstWaitLock(&mStreamMgmt.runtimeLock, true);

  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &pStream->pCreate->streamId, sizeof(pStream->pCreate->streamId));
  if (NULL == pStatus) {
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_UNDEPLOYED], statusSize);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
  } else {
    int8_t stopped = atomic_load_8(&pStatus->stopped);

    if (1 == stopped) {
      STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_FAILED], statusSize);
      snprintf(tmpBuf, sizeof(tmpBuf), "Last error: %s, Failed times: %" PRId64, tstrerror(pStatus->fatalError), pStatus->fatalRetryTimes);
      STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
    } else if (4 == stopped) {
      STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_FAILED], statusSize);
      snprintf(tmpBuf, sizeof(tmpBuf), "Error: %s", tstrerror(TSDB_CODE_GRANT_STREAM_EXPIRED));
      STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
    } else if (pStatus->triggerTask && STREAM_STATUS_RUNNING == pStatus->triggerTask->status) {
      STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_RUNNING], statusSize);
      // write the fixed prefix, then let the timestamp formatter append right after it
      int32_t prefixLen = snprintf(tmpBuf, sizeof(tmpBuf), "Running start from: ");
      (void)formatTimestampLocal(&tmpBuf[prefixLen], pStatus->triggerTask->runningStartTs, TSDB_TIME_PRECISION_MILLI);
      STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
    } else {
      STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_INIT], statusSize);
      snprintf(tmpBuf, sizeof(tmpBuf), "Current deploy times: %" PRId64, pStatus->deployTimes);
      STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
    }
  }

  taosRUnLockLatch(&mStreamMgmt.runtimeLock);

  return TSDB_CODE_SUCCESS;
}
798

799
/*
 * Fill one row of the stream attribute result block.
 *
 * Columns are written in this order: stream name, db name, create time,
 * stream id (hex string), sql, status, main snode id, replica snode id
 * (-1 when the main snode cannot be acquired), message.
 *
 * @param pMnode     mnode handle used to look up the main snode
 * @param pStream    stream object supplying the values
 * @param pBlock     result block whose columns are written
 * @param numOfRows  row index to write
 * @return 0 on success, otherwise the first error code hit (also logged with
 *         the source line where it was detected)
 */
int32_t mstSetStreamAttrResBlock(SMnode *pMnode, SStreamObj* pStream, SSDataBlock* pBlock, int32_t numOfRows) {
  int32_t code = 0;
  int32_t cols = 0;
  int32_t lino = 0;

  // stream_name
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetStableStr(pStream->name), sizeof(streamName));
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamName, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // db_name
  char streamDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamDB, mndGetDbStr(pStream->pCreate->streamDB), sizeof(streamDB));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamDB, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // create time
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pStream->createTime, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stream id, rendered as a hex string
  char streamId2[19] = {0};
  char streamId[19 + VARSTR_HEADER_SIZE] = {0};
  snprintf(streamId2, sizeof(streamId2), "%" PRIx64, pStream->pCreate->streamId);
  STR_WITH_MAXSIZE_TO_VARSTR(streamId, streamId2, sizeof(streamId));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // sql
  char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(sql, pStream->pCreate->sql, sizeof(sql));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)sql, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // status (the message is produced here too and emitted as the last column)
  char status[20 + VARSTR_HEADER_SIZE] = {0};
  char msg[TSDB_RESERVE_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
  code = mstGetStreamStatusStr(pStream, status, sizeof(status), msg, sizeof(msg));
  TSDB_CHECK_CODE(code, lino, _end);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)status, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // snodeLeader
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pStream->mainSnodeId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // snodeReplica
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  SSnodeObj* pSnode = mndAcquireSnode(pMnode, pStream->mainSnodeId);
  // copy the replica id out before the snode reference is released
  int32_t replicaSnodeId = pSnode ? pSnode->replicaId : -1;
  mndReleaseSnode(pMnode, pSnode);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&replicaSnodeId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // msg
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)msg, false);
  // checked like every other column so that a failure here is logged with its line number
  TSDB_CHECK_CODE(code, lino, _end);

_end:
  if (code) {
    mError("error happens when build stream attr result block, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
881

882

883
/*
 * Build the status and message varstrs shown for a single stream task.
 *
 * status always receives gStreamStatusStr[pTask->status]. msg receives:
 *   - "Last error: ..." when the task is FAILED and has a non-zero errCode;
 *   - the trigger runtime summary when the task is a trigger task that has a
 *     detailStatus attached (read under detailStatusLock);
 *   - an empty string otherwise.
 *
 * @param pTask       task to describe
 * @param status      output varstr buffer for the status name
 * @param statusSize  size of status in bytes
 * @param msg         output varstr buffer for the detail message
 * @param msgSize     size of msg in bytes
 * @return always TSDB_CODE_SUCCESS
 */
int32_t mstGetTaskStatusStr(SStmTaskStatus* pTask, char* status, int32_t statusSize, char* msg, int32_t msgSize) {
  char tmpBuf[256];
  
  STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[pTask->status], statusSize);
  if (STREAM_STATUS_FAILED == pTask->status && pTask->errCode) {
    snprintf(tmpBuf, sizeof(tmpBuf), "Last error: %s", tstrerror(pTask->errCode));
    STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
    return TSDB_CODE_SUCCESS;
  }

  if (STREAM_TRIGGER_TASK == pTask->type && mstWaitLock(&pTask->detailStatusLock, true)) {
    if (pTask->detailStatus) {
      SSTriggerRuntimeStatus* pTrigger = (SSTriggerRuntimeStatus*)pTask->detailStatus;
      snprintf(tmpBuf, sizeof(tmpBuf), "Current RT/HI/RE session num: %d/%d/%d, histroy progress:%d%%, total AUTO/USER recalc num: %d/%d", 
          pTrigger->realtimeSessionNum, pTrigger->historySessionNum, pTrigger->recalcSessionNum, pTrigger->histroyProgress,
          pTrigger->autoRecalcNum, (int32_t)taosArrayGetSize(pTrigger->userRecalcs));
      taosRUnLockLatch(&pTask->detailStatusLock);

      // the summary was previously formatted but never handed to the caller;
      // tmpBuf is a local copy, so publishing it after the unlock is safe
      STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
      return TSDB_CODE_SUCCESS;
    }

    taosRUnLockLatch(&pTask->detailStatusLock);    
  }
  
  STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
  
  return TSDB_CODE_SUCCESS;
}
910

911
/*
 * Produce the "extra info" varstr of a task.
 *
 *   reader task  -> "trigReader" or "calcReader", depending on STREAM_IS_TRIGGER_READER
 *   runner task  -> "topRunner" when STREAM_IS_TOP_RUNNER holds
 *   anything else -> empty string
 *
 * @param pTask      task whose type and flags are inspected
 * @param extraStr   output varstr buffer
 * @param extraSize  size of extraStr in bytes
 * @return always TSDB_CODE_SUCCESS
 */
int32_t mstGetTaskExtraStr(SStmTaskStatus* pTask, char* extraStr, int32_t extraSize) {
  if (STREAM_READER_TASK == pTask->type) {
    if (STREAM_IS_TRIGGER_READER(pTask->flags)) {
      STR_WITH_MAXSIZE_TO_VARSTR(extraStr, "trigReader", extraSize);
    } else {
      STR_WITH_MAXSIZE_TO_VARSTR(extraStr, "calcReader", extraSize);
    }
    return TSDB_CODE_SUCCESS;
  }

  if (STREAM_RUNNER_TASK == pTask->type && STREAM_IS_TOP_RUNNER(pTask->flags)) {
    STR_WITH_MAXSIZE_TO_VARSTR(extraStr, "topRunner", extraSize);
    return TSDB_CODE_SUCCESS;
  }

  // non-top runners and every other task type carry no extra info
  STR_WITH_MAXSIZE_TO_VARSTR(extraStr, "", extraSize);
  return TSDB_CODE_SUCCESS;
}
933

934

935
/*
 * Fill one row of the stream task result block.
 *
 * Columns are written in this order: stream name, stream id (hex), task id
 * (hex), type ("Reader"/"Trigger"/"Runner"), serious id (hex), deploy id,
 * node type ("vnode" for reader tasks, "snode" otherwise), node id, task
 * index, status, start time (NULL when runningStartTs is 0), last update
 * (NULL when lastUpTs is 0), extra info, message.
 *
 * @param pStream    stream the task belongs to
 * @param pTask      task supplying the values
 * @param pBlock     result block whose columns are written
 * @param numOfRows  row index to write
 * @return 0 on success, otherwise the first error code hit (also logged with
 *         the source line where it was detected)
 */
int32_t mstSetStreamTaskResBlock(SStreamObj* pStream, SStmTaskStatus* pTask, SSDataBlock* pBlock, int32_t numOfRows) {
  int32_t code = 0;
  int32_t cols = 0;
  int32_t lino = 0;

  // stream_name
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetStableStr(pStream->name), sizeof(streamName));
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamName, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stream id; idstr is reused below for the other hex id columns
  char idstr[19 + VARSTR_HEADER_SIZE] = {0};
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pStream->pCreate->streamId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE])); 
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // task id
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pTask->id.taskId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // type
  char type[20 + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(type, (STREAM_READER_TASK == pTask->type) ? "Reader" : ((STREAM_TRIGGER_TASK == pTask->type) ? "Trigger" : "Runner"), sizeof(type));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)type, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // serious id
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pTask->id.seriousId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // deploy id
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->id.deployId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // node_type
  char nodeType[10 + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(nodeType, (STREAM_READER_TASK == pTask->type) ? "vnode" : "snode", sizeof(nodeType));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)nodeType, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // node id
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->id.nodeId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // task idx
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->id.taskIdx, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // status (the message is produced here too and emitted as the last column)
  char status[20 + VARSTR_HEADER_SIZE] = {0};
  char msg[TSDB_RESERVE_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
  code = mstGetTaskStatusStr(pTask, status, sizeof(status), msg, sizeof(msg));
  TSDB_CHECK_CODE(code, lino, _end);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)status, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // start time: a zero timestamp is emitted as NULL
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->runningStartTs, 0 == pTask->runningStartTs);
  TSDB_CHECK_CODE(code, lino, _end);

  // last update: a zero timestamp is emitted as NULL
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->lastUpTs, 0 == pTask->lastUpTs);
  TSDB_CHECK_CODE(code, lino, _end);

  // extra info
  char extra[64 + VARSTR_HEADER_SIZE] = {0};
  code = mstGetTaskExtraStr(pTask, extra, sizeof(extra));
  TSDB_CHECK_CODE(code, lino, _end);
  
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)extra, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // msg
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)msg, false);
  // checked like every other column so that a failure here is logged with its line number
  TSDB_CHECK_CODE(code, lino, _end);

_end:
  if (code) {
    mError("error happens when build stream task result block, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
1060

1061
/*
 * Count every task recorded in a stream's runtime status: trigger readers,
 * trigOReaders, calc readers, the trigger task (if any) and the runners of
 * all MND_STREAM_RUNNER_DEPLOY_NUM deploy slots.
 *
 * @param pStatus  runtime status of the stream
 * @return total number of tasks
 */
int32_t mstGetNumOfStreamTasks(SStmStatus* pStatus) {
  int32_t total = (pStatus->triggerTask ? 1 : 0);

  total += taosArrayGetSize(pStatus->trigReaders);
  total += taosArrayGetSize(pStatus->trigOReaders);
  total += MST_LIST_SIZE(pStatus->calcReaders);

  for (int32_t deploy = 0; deploy < MND_STREAM_RUNNER_DEPLOY_NUM; ++deploy) {
    total += taosArrayGetSize(pStatus->runners[deploy]);
  }

  return total;
}
1069

1070
int32_t mstSetStreamTasksResBlock(SStreamObj* pStream, SSDataBlock* pBlock, int32_t* numOfRows, int32_t rowsCapacity) {
4,387✔
1071
  int32_t code = 0;
4,387✔
1072
  int32_t lino = 0;
4,387✔
1073
  int64_t streamId = pStream->pCreate->streamId;
4,387✔
1074

1075
  (void)mstWaitLock(&mStreamMgmt.runtimeLock, true);
4,387✔
1076

1077
  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
4,387✔
1078
  if (NULL == pStatus) {
4,387✔
1079
    mstsDebug("stream not in streamMap, ignore it, dropped:%d, stopped:%d", atomic_load_8(&pStream->userDropped), atomic_load_8(&pStream->userStopped));
411✔
1080
    goto _exit;
411✔
1081
  }
1082

1083
  int8_t stopped = atomic_load_8(&pStatus->stopped);
3,976✔
1084
  if (stopped) {
3,976✔
1085
    mstsDebug("stream stopped %d, ignore it", stopped);
1!
1086
    goto _exit;
1✔
1087
  }
1088
  
1089
  int32_t count = mstGetNumOfStreamTasks(pStatus);
3,975✔
1090

1091
  if (*numOfRows + count > rowsCapacity) {
3,975✔
1092
    code = blockDataEnsureCapacity(pBlock, *numOfRows + count);
82✔
1093
    if (code) {
82!
1094
      mstError("failed to prepare the result block buffer, rows:%d", *numOfRows + count);
×
1095
      TAOS_CHECK_EXIT(code);
×
1096
    }
1097
  }
1098

1099
  SStmTaskStatus* pTask = NULL;
3,975✔
1100
  int32_t trigReaderNum = taosArrayGetSize(pStatus->trigReaders);
3,975✔
1101
  for (int32_t i = 0; i < trigReaderNum; ++i) {
8,343✔
1102
    pTask = taosArrayGet(pStatus->trigReaders, i);
4,368✔
1103
  
1104
    code = mstSetStreamTaskResBlock(pStream, pTask, pBlock, *numOfRows);
4,368✔
1105
    if (code == TSDB_CODE_SUCCESS) {
4,368!
1106
      (*numOfRows)++;
4,368✔
1107
    }
1108
  }
1109

1110
  trigReaderNum = taosArrayGetSize(pStatus->trigOReaders);
3,975✔
1111
  for (int32_t i = 0; i < trigReaderNum; ++i) {
4,226✔
1112
    pTask = taosArrayGet(pStatus->trigOReaders, i);
251✔
1113
  
1114
    code = mstSetStreamTaskResBlock(pStream, pTask, pBlock, *numOfRows);
251✔
1115
    if (code == TSDB_CODE_SUCCESS) {
251!
1116
      (*numOfRows)++;
251✔
1117
    }
1118
  }
1119

1120

1121
  int32_t calcReaderNum = MST_LIST_SIZE(pStatus->calcReaders);
3,975!
1122
  SListNode* pNode = listHead(pStatus->calcReaders);
3,975✔
1123
  for (int32_t i = 0; i < calcReaderNum; ++i) {
7,712✔
1124
    pTask = (SStmTaskStatus*)pNode->data;
3,737✔
1125
  
1126
    code = mstSetStreamTaskResBlock(pStream, pTask, pBlock, *numOfRows);
3,737✔
1127
    if (code == TSDB_CODE_SUCCESS) {
3,737!
1128
      (*numOfRows)++;
3,737✔
1129
    }
1130
    pNode = TD_DLIST_NODE_NEXT(pNode);
3,737✔
1131
  }
1132

1133
  if (pStatus->triggerTask) {
3,975!
1134
    code = mstSetStreamTaskResBlock(pStream, pStatus->triggerTask, pBlock, *numOfRows);
3,975✔
1135
    if (code == TSDB_CODE_SUCCESS) {
3,975!
1136
      (*numOfRows)++;
3,975✔
1137
    }
1138
  }
1139

1140
  int32_t runnerNum = 0;
3,975✔
1141
  for (int32_t i = 0; i < MND_STREAM_RUNNER_DEPLOY_NUM; ++i) {
15,900✔
1142
    runnerNum = taosArrayGetSize(pStatus->runners[i]);
11,925✔
1143
    for (int32_t m = 0; m < runnerNum; ++m) {
24,522✔
1144
      pTask = taosArrayGet(pStatus->runners[i], m);
12,597✔
1145
    
1146
      code = mstSetStreamTaskResBlock(pStream, pTask, pBlock, *numOfRows);
12,597✔
1147
      if (code == TSDB_CODE_SUCCESS) {
12,597!
1148
        (*numOfRows)++;
12,597✔
1149
      }
1150
    }
1151
  }
1152
  
1153
  pBlock->info.rows = *numOfRows;
3,975✔
1154

1155
_exit:
4,387✔
1156
  
1157
  taosRUnLockLatch(&mStreamMgmt.runtimeLock);
4,387✔
1158

1159
  if (code) {
4,387!
1160
    mError("error happens when build stream tasks result block, lino:%d, code:%s", lino, tstrerror(code));
×
1161
  }
1162
  
1163
  return code;
4,387✔
1164
}
1165

1166

1167
/*
 * Queue a user recalculation request covering pRange on a stream.
 *
 * A fresh recalc id is obtained from taosGetSystemUUIDU64, then the request is
 * appended to pStream->userRecalcList under userRecalcLock (write). When the
 * list does not exist yet, a new array is created, filled and published with
 * an atomic pointer store.
 *
 * @param streamId  stream id (used by the stream-scoped log macros)
 * @param pStream   runtime status that owns the recalc list
 * @param pRange    time window to recalculate (skey .. ekey)
 * @return 0 on success, otherwise an error code (also logged)
 */
int32_t mstAppendNewRecalcRange(int64_t streamId, SStmStatus *pStream, STimeWindow* pRange) {
  int32_t          code = 0;
  int32_t          lino = 0;
  bool             locked = false;
  SArray*          pNewList = NULL;  // only non-NULL while a freshly created list is still unpublished
  SStreamRecalcReq req = {.recalcId = 0, .start = pRange->skey, .end = pRange->ekey};

  TAOS_CHECK_EXIT(taosGetSystemUUIDU64(&req.recalcId));

  taosWLockLatch(&pStream->userRecalcLock);
  locked = true;

  if (NULL != pStream->userRecalcList) {
    TSDB_CHECK_NULL(taosArrayPush(pStream->userRecalcList, &req), code, lino, _exit, terrno);
  } else {
    pNewList = taosArrayInit(2, sizeof(SStreamRecalcReq));
    if (NULL == pNewList) {
      TAOS_CHECK_EXIT(terrno);
    }

    TSDB_CHECK_NULL(taosArrayPush(pNewList, &req), code, lino, _exit, terrno);

    // ownership moves to the stream; clear the local so the cleanup below leaves it alone
    atomic_store_ptr(&pStream->userRecalcList, pNewList);
    pNewList = NULL;
  }

  mstsInfo("stream recalc ID:%" PRIx64 " range:%" PRId64 " - %" PRId64 " added", req.recalcId, pRange->skey, pRange->ekey);

_exit:

  taosArrayDestroy(pNewList);

  if (locked) {
    taosWUnLockLatch(&pStream->userRecalcLock);
  }

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1209

1210

1211

1212
/*
 * Fill one row of the stream recalculation result block.
 *
 * Columns are written in this order: stream name, stream id (hex), recalc id
 * (hex), start, end, progress ("<n>%").
 *
 * @param pStream    stream the recalculation belongs to
 * @param pProgress  recalculation progress record supplying the values
 * @param pBlock     result block whose columns are written
 * @param numOfRows  row index to write
 * @return 0 on success, otherwise the first error code hit (also logged with
 *         the source line where it was detected)
 */
int32_t mstSetStreamRecalculateResBlock(SStreamObj* pStream, SSTriggerRecalcProgress* pProgress, SSDataBlock* pBlock, int32_t numOfRows) {
  int32_t code = 0;
  int32_t cols = 0;
  int32_t lino = 0;

  // stream_name
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetStableStr(pStream->name), sizeof(streamName));
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamName, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stream id
  // NOTE: the varstr length is the payload length only (same as in
  // mstSetStreamTaskResBlock); the header must not be counted in it.
  char idstr[19 + VARSTR_HEADER_SIZE] = {0};
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pStream->pCreate->streamId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // recalc id
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pProgress->recalcId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // start
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pProgress->start, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // end
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pProgress->end, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // progress
  char progress[20 + VARSTR_HEADER_SIZE] = {0};
  snprintf(&progress[VARSTR_HEADER_SIZE], sizeof(progress) - VARSTR_HEADER_SIZE, "%d%%", pProgress->progress);
  varDataSetLen(progress, strlen(&progress[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)progress, false);
  TSDB_CHECK_CODE(code, lino, _end);

_end:
  if (code) {
    mError("error happens when build stream recalculate result block, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
1270

1271

1272
/*
 * Append one result row per user recalculation recorded in the trigger task's
 * runtime detail status of a stream.
 *
 * Runs under the runtime read lock; the trigger task's detailStatusLock is
 * additionally held (read) while its detail status is inspected. Streams with
 * no runtime status, a stopped runtime status, no trigger task, or no detail
 * status contribute no rows. A row that fails to build is skipped and the
 * remaining entries are still processed.
 *
 * @param pStream       stream whose recalculations are listed
 * @param pBlock        result block to append to
 * @param numOfRows     in/out: rows already in the block; advanced per row written
 * @param rowsCapacity  current block capacity; the block is grown when too small
 * @return 0 on success, otherwise an error code (also logged)
 */
int32_t mstSetStreamRecalculatesResBlock(SStreamObj* pStream, SSDataBlock* pBlock, int32_t* numOfRows, int32_t rowsCapacity) {
  int32_t code = 0;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;

  (void)mstWaitLock(&mStreamMgmt.runtimeLock, true);

  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL == pStatus) {
    mstsDebug("stream not in streamMap, ignore it, dropped:%d, stopped:%d", atomic_load_8(&pStream->userDropped), atomic_load_8(&pStream->userStopped));
    goto _exit;
  }

  int8_t stopped = atomic_load_8(&pStatus->stopped);
  if (stopped) {
    mstsDebug("stream stopped %d, ignore it", stopped);
    goto _exit;
  }

  if (NULL == pStatus->triggerTask) {
    mstsDebug("no trigger task now, deployTimes:%" PRId64 ", ignore it", pStatus->deployTimes);
    goto _exit;
  }

  SRWLatch* pDetailLock = &pStatus->triggerTask->detailStatusLock;
  (void)mstWaitLock(pDetailLock, true);

  SSTriggerRuntimeStatus* pTrigger = (SSTriggerRuntimeStatus*)pStatus->triggerTask->detailStatus;
  if (NULL == pTrigger) {
    mstsDebug("no trigger task now, deployTimes:%" PRId64 ", ignore it", pStatus->deployTimes);
    taosRUnLockLatch(pDetailLock);
    goto _exit;
  }

  int32_t recalcNum = taosArrayGetSize(pTrigger->userRecalcs);
  int32_t needRows = *numOfRows + recalcNum;

  if (needRows > rowsCapacity) {
    code = blockDataEnsureCapacity(pBlock, needRows);
    if (code) {
      mstError("failed to prepare the result block buffer, rows:%d", needRows);
      // the detail lock is released here because the _exit path only drops the runtime lock
      taosRUnLockLatch(pDetailLock);
      TAOS_CHECK_EXIT(code);
    }
  }

  for (int32_t idx = 0; idx < recalcNum; ++idx) {
    code = mstSetStreamRecalculateResBlock(pStream, taosArrayGet(pTrigger->userRecalcs, idx), pBlock, *numOfRows);
    if (TSDB_CODE_SUCCESS == code) {
      ++(*numOfRows);
    }
  }

  taosRUnLockLatch(pDetailLock);

  pBlock->info.rows = *numOfRows;

_exit:

  taosRUnLockLatch(&mStreamMgmt.runtimeLock);

  if (code) {
    mError("error happens when build stream recalculates result block, lino:%d, code:%s", lino, tstrerror(code));
  }

  return code;
}
1338

1339
int32_t mstGetScanUidFromPlan(int64_t streamId, void* scanPlan, int64_t* uid) {
294✔
1340
  SSubplan* pSubplan = NULL;
294✔
1341
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
294✔
1342
  
1343
  TAOS_CHECK_EXIT(nodesStringToNode(scanPlan, (SNode**)&pSubplan));
294!
1344

1345
  if (pSubplan->pNode && nodeType(pSubplan->pNode) == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
294!
1346
    SScanPhysiNode* pScanNode = (SScanPhysiNode*)pSubplan->pNode;
188✔
1347
    *uid = pScanNode->uid;
188✔
1348
  }
1349
  
1350
_exit:
106✔
1351

1352
  if (code) {
294!
1353
    mstsError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1354
  }
1355

1356
  nodesDestroyNode((SNode *)pSubplan);
294✔
1357

1358
  return code;
294✔
1359
}
1360

1361

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc