• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4945

30 Jan 2026 06:19AM UTC coverage: 66.87% (+0.02%) from 66.849%
#4945

push

travis-ci

web-flow
merge: from main to 3.0 #34453

1126 of 2018 new or added lines in 72 files covered. (55.8%)

13708 existing lines in 159 files now uncovered.

205277 of 306978 relevant lines covered (66.87%)

126353544.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.2
/source/dnode/mnode/impl/src/mndStreamUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndDb.h"
17
#include "mndStb.h"
18
#include "mndStream.h"
19
#include "mndTrans.h"
20
#include "mndVgroup.h"
21
#include "taoserror.h"
22
#include "tmisce.h"
23
#include "mndSnode.h"
24

25
// Acquires pLock and only returns once it is held.
//   readLock == true : retries taosRTryLockLatch(), calling taosMsleep(1) after every failed attempt.
//   readLock == false: delegates to taosWWaitLockLatch().
// Always returns true.
bool mstWaitLock(SRWLatch* pLock, bool readLock) {
  if (!readLock) {
    taosWWaitLockLatch(pLock);
    return true;
  }

  for (;;) {
    if (!taosRTryLockLatch(pLock)) {
      break;
    }
    taosMsleep(1);
  }

  return true;
}
38

39
void mstDestroySStmVgStreamStatus(void* p) { 
364,280✔
40
  SStmVgStreamStatus* pStatus = (SStmVgStreamStatus*)p;
364,280✔
41
  taosArrayDestroy(pStatus->trigReaders); 
364,280✔
42
  taosArrayDestroy(pStatus->calcReaders); 
364,280✔
43
}
364,280✔
44

45
void mstDestroySStmSnodeStreamStatus(void* p) { 
164,493✔
46
  SStmSnodeStreamStatus* pStatus = (SStmSnodeStreamStatus*)p;
164,493✔
47
  for (int32_t i = 0; i < MND_STREAM_RUNNER_DEPLOY_NUM; ++i) {
657,972✔
48
    taosArrayDestroy(pStatus->runners[i]);
493,479✔
49
    pStatus->runners[i] = NULL;
493,479✔
50
  }
51
}
164,493✔
52

53

54
// Cleans up the streamTasks hash of a vgroup status and resets the member to NULL.
void mstDestroyVgroupStatus(SStmVgroupStatus* pVgStatus) {
  SHashObj* pTasks = pVgStatus->streamTasks;

  pVgStatus->streamTasks = NULL;
  taosHashCleanup(pTasks);
}
×
58

59
void mstDestroySStmTaskToDeployExt(void* param) {
1,139,834✔
60
  SStmTaskToDeployExt* pExt = (SStmTaskToDeployExt*)param;
1,139,834✔
61
  if (pExt->deployed) {
1,139,834✔
62
    return;
1,136,624✔
63
  }
64
  
65
  switch (pExt->deploy.task.type) {
3,210✔
66
    case STREAM_TRIGGER_TASK:
498✔
67
      taosArrayDestroy(pExt->deploy.msg.trigger.readerList);
498✔
68
      pExt->deploy.msg.trigger.readerList = NULL;
498✔
69
      taosArrayDestroy(pExt->deploy.msg.trigger.runnerList);
498✔
70
      pExt->deploy.msg.trigger.runnerList = NULL;
498✔
71
      break;
498✔
72
    case STREAM_RUNNER_TASK:
1,716✔
73
      taosMemoryFreeClear(pExt->deploy.msg.runner.pPlan);
1,716✔
74
      break;
1,716✔
75
    default:  
996✔
76
      break;;
996✔
77
  }
78
}
79

80
void mstDestroyScanAddrList(void* param) {
310,962✔
81
  if (NULL == param) {
310,962✔
82
    return;
×
83
  }
84
  SArray* pList = *(SArray**)param;
310,962✔
85
  taosArrayDestroy(pList);
310,962✔
86
}
87

88
void mstDestroySStmSnodeTasksDeploy(void* param) {
34,643✔
89
  SStmSnodeTasksDeploy* pSnode = (SStmSnodeTasksDeploy*)param;
34,643✔
90
  taosArrayDestroyEx(pSnode->triggerList, mstDestroySStmTaskToDeployExt);
34,643✔
91
  taosArrayDestroyEx(pSnode->runnerList, mstDestroySStmTaskToDeployExt);
34,643✔
92
}
34,643✔
93

94
void mstDestroySStmVgTasksToDeploy(void* param) {
109,917✔
95
  SStmVgTasksToDeploy* pVg = (SStmVgTasksToDeploy*)param;
109,917✔
96
  taosArrayDestroyEx(pVg->taskList, mstDestroySStmTaskToDeployExt);
109,917✔
97
}
109,917✔
98

99
void mstDestroySStmSnodeStatus(void* param) {
25,975✔
100
  SStmSnodeStatus* pSnode = (SStmSnodeStatus*)param;
25,975✔
101
  taosHashCleanup(pSnode->streamTasks);
25,975✔
102
}
25,975✔
103

104
void mstDestroySStmVgroupStatus(void* param) {
58,985✔
105
  SStmVgroupStatus* pVg = (SStmVgroupStatus*)param;
58,985✔
106
  taosHashCleanup(pVg->streamTasks);
58,985✔
107
}
58,985✔
108

109
void mstFreeTrigOReaderList(void* param) {
48,912✔
110
  SArray** ppList = (SArray**)param;
48,912✔
111
  taosArrayDestroy(*ppList);
48,912✔
112
}
48,912✔
113

114
// Releases all task bookkeeping of a stream status while holding resetLock in write mode:
// trigReaders, trigOReaders (with their nested lists), calcReaders, the trigger task
// (its detailStatus is freed under detailStatusLock first) and every runners[] array.
// lastTrigMgmtReqId is reset to 0. Other members of pStatus are not touched here.
void mstResetSStmStatus(SStmStatus* pStatus) {
  (void)mstWaitLock(&pStatus->resetLock, false);

  taosArrayDestroy(pStatus->trigReaders);
  pStatus->trigReaders = NULL;

  taosArrayDestroyEx(pStatus->trigOReaders, mstFreeTrigOReaderList);
  pStatus->trigOReaders = NULL;

  pStatus->calcReaders = tdListFree(pStatus->calcReaders);

  if (NULL != pStatus->triggerTask) {
    // detailStatus is guarded by its own latch, so take it before freeing
    (void)mstWaitLock(&pStatus->triggerTask->detailStatusLock, false);
    taosMemoryFreeClear(pStatus->triggerTask->detailStatus);
    taosWUnLockLatch(&pStatus->triggerTask->detailStatusLock);
  }
  taosMemoryFreeClear(pStatus->triggerTask);

  int32_t slot = 0;
  while (slot < MND_STREAM_RUNNER_DEPLOY_NUM) {
    taosArrayDestroy(pStatus->runners[slot]);
    pStatus->runners[slot] = NULL;
    ++slot;
  }

  pStatus->lastTrigMgmtReqId = 0;

  taosWUnLockLatch(&pStatus->resetLock);
}
159,671✔
136

137
void mstDestroySStmStatus(void* param) {
154,510✔
138
  SStmStatus* pStatus = (SStmStatus*)param;
154,510✔
139
  taosMemoryFreeClear(pStatus->streamName);
154,510✔
140

141
  mstResetSStmStatus(pStatus);
154,510✔
142

143
  taosWLockLatch(&pStatus->userRecalcLock);
154,510✔
144
  taosArrayDestroy(pStatus->userRecalcList);
154,510✔
145
  taosWUnLockLatch(&pStatus->userRecalcLock);
154,510✔
146

147
  tFreeSCMCreateStreamReq(pStatus->pCreate);
154,510✔
148
  taosMemoryFreeClear(pStatus->pCreate);  
154,510✔
149
}
154,510✔
150

151
void mstDestroySStmAction(void* param) {
247,566✔
152
  SStmAction* pAction = (SStmAction*)param;
247,566✔
153

154
  taosArrayDestroy(pAction->undeploy.taskList);
247,566✔
155
  taosArrayDestroy(pAction->recalc.recalcList);
247,566✔
156
}
247,566✔
157

158
// Sets the three task members of a stream deploy descriptor to NULL.
// Nothing is freed here; the function only drops the references.
void mstClearSStmStreamDeploy(SStmStreamDeploy* pDeploy) {
  pDeploy->runnerTasks = NULL;
  pDeploy->triggerTask = NULL;
  pDeploy->readerTasks = NULL;
}
230,085✔
163

164
// Scans all SDB_STREAM objects for the one whose create request carries streamId.
//   found    : *dropped reflects the object's userDropped flag
//   not found: *dropped is set to true
// Always returns TSDB_CODE_SUCCESS.
int32_t mstIsStreamDropped(SMnode *pMnode, int64_t streamId, bool* dropped) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  for (;;) {
    SStreamObj *pObj = NULL;
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pObj);
    if (NULL == pIter) {
      // iteration finished without a match
      *dropped = true;
      return TSDB_CODE_SUCCESS;
    }

    if (pObj->pCreate->streamId != streamId) {
      sdbRelease(pSdb, pObj);
      continue;
    }

    *dropped = pObj->userDropped ? true : false;
    sdbRelease(pSdb, pObj);
    sdbCancelFetch(pSdb, pIter);
    mstsDebug("stream found, dropped:%d", *dropped);
    return TSDB_CODE_SUCCESS;
  }
}
188

189
typedef struct SStmCheckDbInUseCtx {
190
  bool* dbStream;
191
  bool* vtableStream;
192
  bool  ignoreCurrDb;
193
} SStmCheckDbInUseCtx;
194

195
// sdbTraverse() callback used by mstCheckDbInUse().
//   pObj: the SStreamObj being visited
//   p1  : db name to look for (compared with strcmp)
//   p2  : SStmCheckDbInUseCtx with the output flags
// Streams flagged userDropped are skipped, as are streams living in the checked db when
// ignoreCurrDb is set. If the db is the stream's trigger db or one of its calc dbs,
// *dbStream is set and false is returned; in every other case true is returned.
// NOTE(review): false presumably tells sdbTraverse to stop iterating - confirm in sdb.
static bool mstChkSetDbInUse(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SStreamObj *pStream = pObj;
  const char *dbName = p1;

  if (atomic_load_8(&pStream->userDropped)) {
    return true;
  }

  SStmCheckDbInUseCtx *pCtx = p2;
  SCMCreateStreamReq  *pReq = pStream->pCreate;

  if (pCtx->ignoreCurrDb && 0 == strcmp(pReq->streamDB, dbName)) {
    return true;
  }

  if (NULL != pReq->triggerDB && 0 == strcmp(pReq->triggerDB, dbName)) {
    *pCtx->dbStream = true;
    return false;
  }

  int32_t calcDbCnt = taosArrayGetSize(pReq->calcDB);
  int32_t idx = 0;
  while (idx < calcDbCnt) {
    char *calcDb = taosArrayGetP(pReq->calcDB, idx);
    if (0 == strcmp(calcDb, dbName)) {
      *pCtx->dbStream = true;
      return false;
    }
    ++idx;
  }

  if (pReq->vtableCalc || STREAM_IS_VIRTUAL_TABLE(pReq->triggerTblType, pReq->flags)) {
    *pCtx->vtableStream = true;
  }

  return true;
}
227

228
// Walks all SDB_STREAM objects with mstChkSetDbInUse() to find out whether dbFName is
// referenced by a stream (*dbStream) and whether a virtual-table stream was seen
// (*vtableStream). The output flags are only ever set to true here, never cleared, and
// nothing is traversed when sdb reports no stream objects.
void mstCheckDbInUse(SMnode *pMnode, char *dbFName, bool *dbStream, bool *vtableStream, bool ignoreCurrDb) {
  int32_t streamCnt = sdbGetSize(pMnode->pSdb, SDB_STREAM);

  if (streamCnt > 0) {
    SStmCheckDbInUseCtx ctx = {dbStream, vtableStream, ignoreCurrDb};
    sdbTraverse(pMnode->pSdb, SDB_STREAM, mstChkSetDbInUse, dbFName, &ctx, NULL);
  }
}
237

238
// Copies the textual name of a stream status into dst (at most bufLen bytes via tstrncpy).
// Only INIT, RUNNING, STOPPED and FAILED are translated; for any other value dst is
// left untouched.
static void mstShowStreamStatus(char *dst, int8_t status, int32_t bufLen) {
  const char *text = NULL;

  if (STREAM_STATUS_INIT == status) {
    text = "init";
  } else if (STREAM_STATUS_RUNNING == status) {
    text = "running";
  } else if (STREAM_STATUS_STOPPED == status) {
    text = "stopped";
  } else if (STREAM_STATUS_FAILED == status) {
    text = "failed";
  }

  if (NULL != text) {
    tstrncpy(dst, text, bufLen);
  }
}
×
249

250
// Reports whether at least one SDB_SNODE object exists.
// Returns TSDB_CODE_SUCCESS as soon as the first snode is fetched (the object is released
// and the fetch cancelled), TSDB_CODE_SNODE_NOT_DEPLOYED when there is none.
int32_t mstCheckSnodeExists(SMnode *pMnode) {
  SSdb      *pSdb = pMnode->pSdb;
  SSnodeObj *pSnode = NULL;

  // a single fetch is enough: we only care whether the first one exists
  void *pIter = sdbFetch(pSdb, SDB_SNODE, NULL, (void **)&pSnode);
  if (NULL == pIter) {
    return TSDB_CODE_SNODE_NOT_DEPLOYED;
  }

  sdbRelease(pSdb, pSnode);
  sdbCancelFetch(pSdb, pIter);
  return TSDB_CODE_SUCCESS;
}
268

269
// Fills a task status record from a reported status message: the five id fields, type,
// flags and status are copied from pMsg, and lastUpTs is taken from pCtx->currTs.
void mstSetTaskStatusFromMsg(SStmGrpCtx* pCtx, SStmTaskStatus* pTask, SStmTaskStatusMsg* pMsg) {
  // identity
  pTask->id.taskId    = pMsg->taskId;
  pTask->id.deployId  = pMsg->deployId;
  pTask->id.seriousId = pMsg->seriousId;
  pTask->id.nodeId    = pMsg->nodeId;
  pTask->id.taskIdx   = pMsg->taskIdx;

  // state
  pTask->type     = pMsg->type;
  pTask->flags    = pMsg->flags;
  pTask->status   = pMsg->status;
  pTask->lastUpTs = pCtx->currTs;
}
×
281

282
// Pops one node from the action queue.
// Returns false (leaving *param untouched) when qRemainNum is 0. Otherwise the node after
// the current head is handed out through *param AND becomes the new head; the old head
// node is freed and qRemainNum is decremented.
// NOTE(review): since the returned node stays in the queue as its head, the caller
// presumably must not free it - confirm against the consumers of this queue.
bool mndStreamActionDequeue(SStmActionQ* pQueue, SStmQNode **param) {
  if (0 == atomic_load_64(&pQueue->qRemainNum)) {
    return false;
  }

  SStmQNode *pOldHead = pQueue->head;
  SStmQNode *pNext = pOldHead->next;

  pQueue->head = pNext;
  *param = pNext;

  taosMemoryFreeClear(pOldHead);

  (void)atomic_sub_fetch_64(&pQueue->qRemainNum, 1);

  return true;
}
300

301
// Appends param to the tail of the action queue while holding the queue latch in write
// mode, then increments qRemainNum (after the latch has been released).
void mndStreamActionEnqueue(SStmActionQ* pQueue, SStmQNode* param) {
  taosWLockLatch(&pQueue->lock);
  SStmQNode* pPrevTail = pQueue->tail;
  pPrevTail->next = param;
  pQueue->tail = param;
  taosWUnLockLatch(&pQueue->lock);

  (void)atomic_add_fetch_64(&pQueue->qRemainNum, 1);
}
164,003✔
309

310
// Maps a STREAM_ACT_* value to its display name; unrecognized values yield "UNKNOWN".
// The returned pointer refers to a string literal and must not be modified or freed.
char* mstGetStreamActionString(int32_t action) {
  char* name = "UNKNOWN";

  switch (action) {
    case STREAM_ACT_DEPLOY:
      name = "DEPLOY";
      break;
    case STREAM_ACT_UNDEPLOY:
      name = "UNDEPLOY";
      break;
    case STREAM_ACT_START:
      name = "START";
      break;
    case STREAM_ACT_UPDATE_TRIGGER:
      name = "UPDATE TRIGGER";
      break;
    case STREAM_ACT_RECALC:
      name = "USER RECALC";
      break;
    default:
      break;
  }

  return name;
}
328

329
// Allocates a queue node describing a stream-level action and appends it to actionQ.
// The node stores the param pointer as-is in action.stream.actionParam. If the node
// cannot be allocated, param is freed, an error is logged and nothing is enqueued.
void mstPostStreamAction(SStmActionQ*       actionQ, int64_t streamId, char* streamName, void* param, bool userAction, int32_t action) {
  SStmQNode *pItem = taosMemoryMalloc(sizeof(SStmQNode));
  if (NULL == pItem) {
    taosMemoryFreeClear(param);
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, __LINE__, tstrerror(terrno));
    return;
  }

  pItem->next = NULL;
  pItem->type = action;
  pItem->streamAct = true;

  pItem->action.stream.streamId = streamId;
  pItem->action.stream.userAction = userAction;
  pItem->action.stream.actionParam = param;
  TAOS_STRCPY(pItem->action.stream.streamName, streamName);

  mndStreamActionEnqueue(actionQ, pItem);

  mstsDebug("stream action %s posted enqueue", mstGetStreamActionString(action));
}
350

351
// Allocates a queue node describing a task-level action and appends it to actionQ.
// *pAction is copied by value into the node. If the node cannot be allocated an error
// is logged and nothing is enqueued.
void mstPostTaskAction(SStmActionQ*        actionQ, SStmTaskAction* pAction, int32_t action) {
  SStmQNode *pItem = taosMemoryMalloc(sizeof(SStmQNode));
  if (NULL == pItem) {
    int64_t streamId = pAction->streamId;  // consumed by the mstsError macro
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, __LINE__, tstrerror(terrno));
    return;
  }

  pItem->next = NULL;
  pItem->type = action;
  pItem->streamAct = false;
  pItem->action.task = *pAction;

  mndStreamActionEnqueue(actionQ, pItem);
}
367

368
// Destroys a db -> SDBVgHashInfo map: the vgArray of every entry is destroyed first,
// then the hash itself is cleaned up.
void mstDestroyDbVgroupsHash(SSHashObj *pDbVgs) {
  int32_t iter = 0;

  for (void* pEntry = tSimpleHashIterate(pDbVgs, NULL, &iter); NULL != pEntry;
       pEntry = tSimpleHashIterate(pDbVgs, pEntry, &iter)) {
    SDBVgHashInfo* pInfo = (SDBVgHashInfo*)pEntry;
    taosArrayDestroy(pInfo->vgArray);
  }

  tSimpleHashCleanup(pDbVgs);
}
57,770✔
379

380

381
// Builds a map keyed by vgroup dbName (key length includes the terminating NUL) whose
// values are SDBVgHashInfo records: the db's hash settings plus an array holding one
// SVGroupHashInfo {vgId, hashBegin, hashEnd} per vgroup of that db. The arrays are left
// unsorted (vgSorted = false).
//
// On success the map is handed over through *ppRes and the caller releases it with
// mstDestroyDbVgroupsHash(). On failure *ppRes is not written, and the partially built map,
// including every vgroup array already stored in it, is destroyed here; previously it
// was leaked on every error path.
//
// Returns TSDB_CODE_SUCCESS or the error code that aborted the build.
int32_t mstBuildDBVgroupsMap(SMnode* pMnode, SSHashObj** ppRes) {
  void*   pIter = NULL;
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SArray* pTarget = NULL;
  SArray* pNew = NULL;  // array created for a new db, owned here until it is stored in the map
  SDbObj* pDb = NULL;
  SDBVgHashInfo dbInfo = {0}, *pDbInfo = NULL;
  SVgObj* pVgroup = NULL;

  SSHashObj* pDbVgroup = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
  TSDB_CHECK_NULL(pDbVgroup, code, lino, _exit, terrno);

  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    pDbInfo = (SDBVgHashInfo*)tSimpleHashGet(pDbVgroup, pVgroup->dbName, strlen(pVgroup->dbName) + 1);
    if (NULL == pDbInfo) {
      // first vgroup seen for this db: prepare a fresh entry
      pNew = taosArrayInit(20, sizeof(SVGroupHashInfo));
      if (NULL == pNew) {
        sdbRelease(pMnode->pSdb, pVgroup);
        sdbCancelFetch(pMnode->pSdb, pIter);
        pVgroup = NULL;
        TSDB_CHECK_NULL(pNew, code, lino, _exit, terrno);
      }

      pDb = mndAcquireDb(pMnode, pVgroup->dbName);
      if (NULL == pDb) {
        sdbRelease(pMnode->pSdb, pVgroup);
        sdbCancelFetch(pMnode->pSdb, pIter);
        pVgroup = NULL;
        TSDB_CHECK_NULL(pDb, code, lino, _exit, terrno);
      }
      dbInfo.vgSorted = false;
      dbInfo.hashMethod = pDb->cfg.hashMethod;
      dbInfo.hashPrefix = pDb->cfg.hashPrefix;
      dbInfo.hashSuffix = pDb->cfg.hashSuffix;
      dbInfo.vgArray = pNew;

      mndReleaseDb(pMnode, pDb);

      pTarget = pNew;
    } else {
      pTarget = pDbInfo->vgArray;
    }

    SVGroupHashInfo vgInfo = {.vgId = pVgroup->vgId, .hashBegin = pVgroup->hashBegin, .hashEnd = pVgroup->hashEnd};
    if (NULL == taosArrayPush(pTarget, &vgInfo)) {
      sdbRelease(pMnode->pSdb, pVgroup);
      sdbCancelFetch(pMnode->pSdb, pIter);
      pVgroup = NULL;
      TSDB_CHECK_NULL(NULL, code, lino, _exit, terrno);
    }

    if (NULL == pDbInfo) {
      code = tSimpleHashPut(pDbVgroup, pVgroup->dbName, strlen(pVgroup->dbName) + 1, &dbInfo, sizeof(dbInfo));
      if (code) {
        sdbRelease(pMnode->pSdb, pVgroup);
        sdbCancelFetch(pMnode->pSdb, pIter);
        pVgroup = NULL;
        TAOS_CHECK_EXIT(code);
      }
      // the map now references the array; it must not be destroyed through pNew anymore
      pNew = NULL;
    }

    sdbRelease(pMnode->pSdb, pVgroup);
    pVgroup = NULL;
  }

  // hand ownership of the finished map to the caller
  *ppRes = pDbVgroup;
  pDbVgroup = NULL;

_exit:

  // non-NULL only when a new array was created but never stored in the map
  taosArrayDestroy(pNew);

  // still non-NULL only when the build was aborted before the hand-over above
  if (NULL != pDbVgroup) {
    mstDestroyDbVgroupsHash(pDbVgroup);
  }

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
465

466
int mstDbVgInfoComp(const void* lp, const void* rp) {
99,976✔
467
  SVGroupHashInfo* pLeft = (SVGroupHashInfo*)lp;
99,976✔
468
  SVGroupHashInfo* pRight = (SVGroupHashInfo*)rp;
99,976✔
469
  if (pLeft->hashBegin < pRight->hashBegin) {
99,976✔
470
    return -1;
99,976✔
471
  } else if (pLeft->hashBegin > pRight->hashBegin) {
×
472
    return 1;
×
473
  }
474

475
  return 0;
×
476
}
477

478
int32_t mstTableHashValueComp(void const* lp, void const* rp) {
237,013✔
479
  uint32_t*    key = (uint32_t*)lp;
237,013✔
480
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
237,013✔
481

482
  if (*key < pVg->hashBegin) {
237,013✔
483
    return -1;
×
484
  } else if (*key > pVg->hashEnd) {
237,013✔
485
    return 1;
64,758✔
486
  }
487

488
  return 0;
172,255✔
489
}
490

491

492
// Resolves the vgroup that owns table tbName of database dbFName.
//   pDbVgroups: map produced by mstBuildDBVgroupsMap(), looked up with dbFName
//               (key length includes the terminating NUL)
//   vgId      : out, written only on success
// The table's hash value is computed from "<dbFName>.<tbName>" with the db's hash
// settings. The db's vgroup array is sorted by hashBegin on first use (tracked by
// vgSorted) and then searched for the range containing the hash value.
//
// Returns 0 on success, TSDB_CODE_MND_DB_NOT_EXIST when the db is not in the map, or
// TSDB_CODE_MND_STREAM_INTERNAL_ERROR when no vgroup range covers the hash value.
//
// The search result is an element of vgArray, which stores SVGroupHashInfo records, so
// it is now typed as such instead of being read through an SVgroupInfo pointer.
int32_t mstGetTableVgId(SSHashObj* pDbVgroups, char* dbFName, char *tbName, int32_t* vgId) {
  int32_t          code = 0;
  int32_t          lino = 0;
  SVGroupHashInfo* vgInfo = NULL;
  char             tbFullName[TSDB_TABLE_FNAME_LEN];

  SDBVgHashInfo* dbInfo = (SDBVgHashInfo*)tSimpleHashGet(pDbVgroups, dbFName, strlen(dbFName) + 1);
  if (NULL == dbInfo) {
    mstError("db %s does not exist", dbFName);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_DB_NOT_EXIST);
  }

  (void)snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbFName, tbName);
  uint32_t hashValue = taosGetTbHashVal(tbFullName, (uint32_t)strlen(tbFullName), dbInfo->hashMethod,
                                        dbInfo->hashPrefix, dbInfo->hashSuffix);

  // the range search below requires the array to be ordered by hashBegin
  if (!dbInfo->vgSorted) {
    taosArraySort(dbInfo->vgArray, mstDbVgInfoComp);
    dbInfo->vgSorted = true;
  }

  vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, mstTableHashValueComp, TD_EQ);
  if (NULL == vgInfo) {
    mstError("no hash range found for hash value [%u], dbFName:%s, numOfVgId:%d", hashValue, dbFName,
             (int32_t)taosArrayGetSize(dbInfo->vgArray));
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  *vgId = vgInfo->vgId;

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
530

531

532
void mstLogSStreamObj(char* tips, SStreamObj* p) {
157,112✔
533
  if (!(stDebugFlag & DEBUG_DEBUG)) {
157,112✔
534
    return;
24,846✔
535
  }
536
  
537
  if (NULL == p) {
132,266✔
538
    mstDebug("%s: stream is NULL", tips);
×
539
    return;
×
540
  }
541

542
  mstDebug("%s: stream obj", tips);
132,266✔
543
  mstDebug("name:%s mainSnodeId:%d userDropped:%d userStopped:%d createTime:%" PRId64 " updateTime:%" PRId64,
132,266✔
544
      p->name, p->mainSnodeId, p->userDropped, p->userStopped, p->createTime, p->updateTime);
545

546
  SCMCreateStreamReq* q = p->pCreate;
132,266✔
547
  if (NULL == q) {
132,266✔
548
    mstDebug("stream pCreate is NULL");
×
549
    return;
×
550
  }
551

552
  int64_t streamId = q->streamId;
132,266✔
553
  int32_t calcDBNum = taosArrayGetSize(q->calcDB);
132,266✔
554
  int32_t calcScanNum = taosArrayGetSize(q->calcScanPlanList);
132,266✔
555
  int32_t notifyUrlNum = taosArrayGetSize(q->pNotifyAddrUrls);
132,266✔
556
  int32_t outColNum = taosArrayGetSize(q->outCols);
132,266✔
557
  int32_t outTagNum = taosArrayGetSize(q->outTags);
132,266✔
558
  int32_t forceOutColNum = taosArrayGetSize(q->forceOutCols);
132,266✔
559

560
  mstsDebugL("create_info: name:%s sql:%s streamDB:%s triggerDB:%s outDB:%s calcDBNum:%d triggerTblName:%s outTblName:%s "
132,266✔
561
      "igExists:%d triggerType:%d igDisorder:%d deleteReCalc:%d deleteOutTbl:%d fillHistory:%d fillHistroyFirst:%d "
562
      "calcNotifyOnly:%d lowLatencyCalc:%d igNoDataTrigger:%d notifyUrlNum:%d notifyEventTypes:%d addOptions:%d notifyHistory:%d "
563
      "outColsNum:%d outTagsNum:%d maxDelay:%" PRId64 " fillHistoryStartTs:%" PRId64 " watermark:%" PRId64 " expiredTime:%" PRId64 " "
564
      "triggerTblType:%d triggerTblUid:%" PRIx64 " triggerTblSuid:%" PRIx64 " vtableCalc:%d outTblType:%d outStbExists:%d outStbUid:%" PRIu64 " outStbSversion:%d "
565
      "eventTypes:0x%" PRIx64 " flags:0x%" PRIx64 " tsmaId:0x%" PRIx64 " placeHolderBitmap:0x%" PRIx64 " calcTsSlotId:%d triTsSlotId:%d "
566
      "triggerTblVgId:%d outTblVgId:%d calcScanPlanNum:%d forceOutCols:%d",
567
      q->name, q->sql, q->streamDB, q->triggerDB, q->outDB, calcDBNum, q->triggerTblName, q->outTblName,
568
      q->igExists, q->triggerType, q->igDisorder, q->deleteReCalc, q->deleteOutTbl, q->fillHistory, q->fillHistoryFirst,
569
      q->calcNotifyOnly, q->lowLatencyCalc, q->igNoDataTrigger, notifyUrlNum, q->notifyEventTypes, q->addOptions, q->notifyHistory,
570
      outColNum, outTagNum, q->maxDelay, q->fillHistoryStartTime, q->watermark, q->expiredTime,
571
      q->triggerTblType, q->triggerTblUid, q->triggerTblSuid, q->vtableCalc, q->outTblType, q->outStbExists, q->outStbUid, q->outStbSversion,
572
      q->eventTypes, q->flags, q->tsmaId, q->placeHolderBitmap, q->calcTsSlotId, q->triTsSlotId,
573
      q->triggerTblVgId, q->outTblVgId, calcScanNum, forceOutColNum);
574

575
  switch (q->triggerType) {
132,266✔
576
    case WINDOW_TYPE_INTERVAL: {
51,074✔
577
      SSlidingTrigger* t = &q->trigger.sliding;
51,074✔
578
      mstsDebug("sliding trigger options, intervalUnit:%d, slidingUnit:%d, offsetUnit:%d, soffsetUnit:%d, precision:%d, interval:%" PRId64 ", offset:%" PRId64 ", sliding:%" PRId64 ", soffset:%" PRId64, 
51,074✔
579
          t->intervalUnit, t->slidingUnit, t->offsetUnit, t->soffsetUnit, t->precision, t->interval, t->offset, t->sliding, t->soffset);
580
      break;
51,074✔
581
    }  
582
    case WINDOW_TYPE_SESSION: {
7,500✔
583
      SSessionTrigger* t = &q->trigger.session;
7,500✔
584
      mstsDebug("session trigger options, slotId:%d, sessionVal:%" PRId64, t->slotId, t->sessionVal);
7,500✔
585
      break;
7,500✔
586
    }
587
    case WINDOW_TYPE_STATE: {
36,676✔
588
      SStateWinTrigger* t = &q->trigger.stateWin;
36,676✔
589
      mstsDebug(
36,676✔
590
          "state trigger options, slotId:%d, expr:%s, extend:%d, zeroth:%s, trueForType: %d, trueForCount: %d, "
591
          "trueForDuration:%" PRId64,
592
          t->slotId, (char*)t->expr, t->extend, (char*)t->zeroth, t->trueForType, t->trueForCount, t->trueForDuration);
593
      break;
36,676✔
594
    }
595
    case WINDOW_TYPE_EVENT:{
19,775✔
596
      SEventTrigger* t = &q->trigger.event;
19,775✔
597
      mstsDebug(
19,775✔
598
          "event trigger options, startCond:%s, endCond:%s, trueForType: %d, trueForCount: %d, "
599
          "trueForDuration:%" PRId64,
600
          (char*)t->startCond, (char*)t->endCond, t->trueForType, t->trueForCount, t->trueForDuration);
601
      break;
19,775✔
602
    }
603
    case WINDOW_TYPE_COUNT: {
10,398✔
604
      SCountTrigger* t = &q->trigger.count;
10,398✔
605
      mstsDebug("count trigger options, countVal:%" PRId64 ", sliding:%" PRId64 ", condCols:%s", t->countVal, t->sliding, (char*)t->condCols);
10,398✔
606
      break;
10,398✔
607
    }
608
    case WINDOW_TYPE_PERIOD: {
6,843✔
609
      SPeriodTrigger* t = &q->trigger.period;
6,843✔
610
      mstsDebug("period trigger options, periodUnit:%d, offsetUnit:%d, precision:%d, period:%" PRId64 ", offset:%" PRId64, 
6,843✔
611
          t->periodUnit, t->offsetUnit, t->precision, t->period, t->offset);
612
      break;
6,843✔
613
    }
UNCOV
614
    default:
×
UNCOV
615
      mstsDebug("unknown triggerType:%d", q->triggerType);
×
UNCOV
616
      break;
×
617
  }
618

619
  mstsDebugL("create_info: triggerCols:[%s]", (char*)q->triggerCols);
132,266✔
620

621
  mstsDebugL("create_info: partitionCols:[%s]", (char*)q->partitionCols);
132,266✔
622

623
  mstsDebugL("create_info: triggerScanPlan:[%s]", (char*)q->triggerScanPlan);
132,266✔
624

625
  mstsDebugL("create_info: calcPlan:[%s]", (char*)q->calcPlan);
132,266✔
626

627
  mstsDebugL("create_info: subTblNameExpr:[%s]", (char*)q->subTblNameExpr);
132,266✔
628

629
  mstsDebugL("create_info: tagValueExpr:[%s]", (char*)q->tagValueExpr);
132,266✔
630

631

632
  for (int32_t i = 0; i < calcDBNum; ++i) {
261,455✔
633
    char* dbName = taosArrayGetP(q->calcDB, i);
129,189✔
634
    mstsDebug("create_info: calcDB[%d] - %s", i, dbName);
129,189✔
635
  }
636

637
  for (int32_t i = 0; i < calcScanNum; ++i) {
410,366✔
638
    SStreamCalcScan* pScan = taosArrayGet(q->calcScanPlanList, i);
278,100✔
639
    int32_t vgNum = taosArrayGetSize(pScan->vgList);
278,100✔
640
    mstsDebugL("create_info: calcScanPlan[%d] - readFromCache:%d vgNum:%d scanPlan:[%s]", i, pScan->readFromCache, vgNum, (char*)pScan->scanPlan);
278,100✔
641
    for (int32_t v = 0; v < vgNum; ++v) {
556,200✔
642
      mstsDebug("create_info: calcScanPlan[%d] vg[%d] - vgId:%d", i, v, *(int32_t*)taosArrayGet(pScan->vgList, v));
278,100✔
643
    }
644
  }
645

646
  for (int32_t i = 0; i < notifyUrlNum; ++i) {
181,829✔
647
    char* url = taosArrayGetP(q->pNotifyAddrUrls, i);
49,563✔
648
    mstsDebug("create_info: notifyUrl[%d] - %s", i, url);
49,563✔
649
  }
650

651
  for (int32_t i = 0; i < outColNum; ++i) {
674,888✔
652
    SFieldWithOptions* o = taosArrayGet(q->outCols, i);
542,622✔
653
    mstsDebug("create_info: outCol[%d] - name:%s type:%d flags:%d bytes:%d compress:%u typeMod:%d", 
542,622✔
654
        i, o->name, o->type, o->flags, o->bytes, o->compress, o->typeMod);
655
  }
656
      
657
}
658

659
// Writes one debug line describing a task status record: its id fields, type and status
// (looked up in gStreamTaskTypeStr / gStreamStatusStr), flags and lastUpTs.
// name and idx only label the line; streamId is consumed by the mstsDebug macro.
void mstLogSStmTaskStatus(char* name, int64_t streamId, SStmTaskStatus* pTask, int32_t idx) {
  mstsDebug("%s[%d]: task %" PRIx64 " deployId:%d SID:%" PRId64 " nodeId:%d tidx:%d type:%s flags:%" PRIx64 " status:%s lastUpTs:%" PRId64,
      name, idx,
      pTask->id.taskId, pTask->id.deployId, pTask->id.seriousId, pTask->id.nodeId, pTask->id.taskIdx,
      gStreamTaskTypeStr[pTask->type], pTask->flags,
      gStreamStatusStr[pTask->status], pTask->lastUpTs);
}
938,252✔
664

665
// Dumps the runtime status of one stream to the debug log, prefixed by tips.
// Does nothing unless DEBUG_DEBUG is set in stDebugFlag; a NULL status is reported and
// ends the dump. Logged in order: a summary line with the task counts, then every
// trigger reader, trigger "O" reader, calc reader, the trigger task (if any) and the
// runner tasks of each deploy slot (empty slots are skipped).
void mstLogSStmStatus(char* tips, int64_t streamId, SStmStatus* p) {
  if (!(stDebugFlag & DEBUG_DEBUG)) {
    return;
  }

  if (NULL == p) {
    mstsDebug("%s: stream status is NULL", tips);
    return;
  }

  int32_t trigReaderNum = taosArrayGetSize(p->trigReaders);
  int32_t trigOReaderNum = msmGetTrigOReaderSize(p->trigOReaders);
  int32_t calcReaderNum = MST_LIST_SIZE(p->calcReaders);
  int32_t triggerNum = p->triggerTask ? 1 : 0;
  int32_t runnerNum = 0;

  for (int32_t i = 0; i < p->runnerDeploys; ++i) {
    runnerNum += taosArrayGetSize(p->runners[i]);
  }

  mstsDebug("%s: stream status", tips);
  mstsDebug("name:%s runnerNum:%d runnerDeploys:%d runnerReplica:%d lastActionTs:%" PRId64
           " trigReaders:%d trigOReaders:%d calcReaders:%d trigger:%d runners:%d",
      p->streamName, p->runnerNum, p->runnerDeploys, p->runnerReplica, p->lastActionTs,
      trigReaderNum, trigOReaderNum, calcReaderNum, triggerNum, runnerNum);

  SStmTaskStatus* pTask = NULL;
  for (int32_t i = 0; i < trigReaderNum; ++i) {
    pTask = taosArrayGet(p->trigReaders, i);
    mstLogSStmTaskStatus("trigReader task", streamId, pTask, i);
  }

  for (int32_t i = 0; i < trigOReaderNum; ++i) {
    pTask = msmGetTrigOReader(p->trigOReaders, i);
    mstLogSStmTaskStatus("trigOReader task", streamId, pTask, i);
  }

  // Only look at the list head when the list reports elements. calcReaders is replaced
  // by the result of tdListFree() in mstResetSStmStatus(), so it may not refer to a
  // live list here (presumably NULL after a reset - TODO confirm), and listHead() on
  // such a pointer would dereference it.
  SListNode* pNode = (calcReaderNum > 0) ? listHead(p->calcReaders) : NULL;
  for (int32_t i = 0; i < calcReaderNum && NULL != pNode; ++i) {
    pTask = (SStmTaskStatus*)pNode->data;
    mstLogSStmTaskStatus("calcReader task", streamId, pTask, i);
    pNode = TD_DLIST_NODE_NEXT(pNode);
  }

  if (triggerNum > 0) {
    mstLogSStmTaskStatus("trigger task", streamId, p->triggerTask, 0);
  }

  for (int32_t i = 0; i < p->runnerDeploys; ++i) {
    int32_t num = taosArrayGetSize(p->runners[i]);
    if (num <= 0) {
      continue;
    }

    mstsDebug("the %dth deploy runners status", i);
    for (int32_t m = 0; m < num; ++m) {
      pTask = taosArrayGet(p->runners[i], m);
      mstLogSStmTaskStatus("runner task", streamId, pTask, m);
    }
  }
}
727

728
bool mstEventPassIsolation(int32_t num, int32_t event) {
2,817,403✔
729
  bool ret = ((mStreamMgmt.lastTs[event].ts + num * MST_SHORT_ISOLATION_DURATION) <= mStreamMgmt.hCtx.currentTs);
2,817,403✔
730
  if (ret) {
2,817,403✔
731
    mstDebug("event %s passed %d isolation, last:%" PRId64 ", curr:%" PRId64, 
2,100,231✔
732
        gMndStreamEvent[event], num, mStreamMgmt.lastTs[event].ts, mStreamMgmt.hCtx.currentTs);
733
  }
734

735
  return ret;
2,817,403✔
736
}
737

738
bool mstEventHandledChkSet(int32_t event) {
2,100,231✔
739
  if (0 == atomic_val_compare_exchange_8((int8_t*)&mStreamMgmt.lastTs[event].handled, 0, 1)) {
2,100,231✔
740
    mstDebug("event %s set handled", gMndStreamEvent[event]);
118,164✔
741
    return true;
118,164✔
742
  }
743
  return false;
1,982,067✔
744
}
745

746
// Produces the VARSTR-encoded status name and status message for one stream.
//
// Decision order:
//   1. stream management inactive or not in normal state -> UNDEPLOYED + hint
//   2. user dropped the stream                            -> DROPPING
//   3. user stopped the stream                            -> STOPPED
//   4. (under the runtime read lock) no entry in streamMap -> UNDEPLOYED
//   5. runtime `stopped` flag is 1 -> FAILED with last error and retry count
//   6. runtime `stopped` flag is 4 -> FAILED with the grant-expired error text
//   7. trigger task exists and is RUNNING -> RUNNING with the start timestamp
//   8. otherwise -> INIT with the current deploy count
//
// `status`/`statusSize` and `msg`/`msgSize` are the output buffers and their
// total sizes. Always returns TSDB_CODE_SUCCESS.
int32_t mstGetStreamStatusStr(SStreamObj* pStream, char* status, int32_t statusSize, char* msg, int32_t msgSize) {
  char   tmpBuf[256];
  int8_t active = atomic_load_8(&mStreamMgmt.active);
  int8_t state = atomic_load_8(&mStreamMgmt.state);

  if (0 == active || MND_STM_STATE_NORMAL != state) {
    mstDebug("mnode streamMgmt not in active mode, active:%d, state:%d", active, state);
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_UNDEPLOYED], statusSize);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "Mnode may be unstable, try again later", msgSize);
    return TSDB_CODE_SUCCESS;
  }

  if (atomic_load_8(&pStream->userDropped)) {
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_DROPPING], statusSize);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
    return TSDB_CODE_SUCCESS;
  }

  if (atomic_load_8(&pStream->userStopped)) {
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_STOPPED], statusSize);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
    return TSDB_CODE_SUCCESS;
  }

  // Everything below reads the runtime status map, so hold the read lock.
  (void)mstWaitLock(&mStreamMgmt.runtimeLock, true);

  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &pStream->pCreate->streamId, sizeof(pStream->pCreate->streamId));
  if (NULL == pStatus) {
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_UNDEPLOYED], statusSize);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
  } else {
    int8_t stopped = atomic_load_8(&pStatus->stopped);

    if (1 == stopped) {
      STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_FAILED], statusSize);
      snprintf(tmpBuf, sizeof(tmpBuf), "Last error: %s, Failed times: %" PRId64, tstrerror(pStatus->fatalError), pStatus->fatalRetryTimes);
      STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
    } else if (4 == stopped) {
      STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_FAILED], statusSize);
      snprintf(tmpBuf, sizeof(tmpBuf), "Error: %s", tstrerror(TSDB_CODE_GRANT_STREAM_EXPIRED));
      STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
    } else if (pStatus->triggerTask && STREAM_STATUS_RUNNING == pStatus->triggerTask->status) {
      STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_RUNNING], statusSize);
      // Write the fixed prefix, then let the formatter append the timestamp
      // directly behind it.
      int32_t prefixLen = snprintf(tmpBuf, sizeof(tmpBuf), "Running start from: ");
      (void)formatTimestampLocal(&tmpBuf[prefixLen], pStatus->triggerTask->runningStartTs, TSDB_TIME_PRECISION_MILLI);
      STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
    } else {
      STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_INIT], statusSize);
      snprintf(tmpBuf, sizeof(tmpBuf), "Current deploy times: %" PRId64, pStatus->deployTimes);
      STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
    }
  }

  taosRUnLockLatch(&mStreamMgmt.runtimeLock);

  return TSDB_CODE_SUCCESS;
}
814

815
// Writes the attributes of one stream into row `numOfRows` of `pBlock`.
//
// Columns are filled in block order: stream name, db name, create time,
// stream id (hex), sql, status, main snode id, replica snode id, message.
// The replica snode id is looked up through the main snode object and is -1
// when that snode cannot be acquired.
//
// Returns 0 on success, or the first error code encountered; on error the
// failing source line is logged.
int32_t mstSetStreamAttrResBlock(SMnode *pMnode, SStreamObj* pStream, SSDataBlock* pBlock, int32_t numOfRows) {
  int32_t code = 0;
  int32_t cols = 0;
  int32_t lino = 0;

  // stream_name
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetStableStr(pStream->name), sizeof(streamName));
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamName, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // db_name
  char streamDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamDB, mndGetDbStr(pStream->pCreate->streamDB), sizeof(streamDB));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamDB, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // create time
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pStream->createTime, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stream id, rendered as lowercase hex
  char streamId2[19] = {0};
  char streamId[19 + VARSTR_HEADER_SIZE] = {0};
  snprintf(streamId2, sizeof(streamId2), "%" PRIx64, pStream->pCreate->streamId);
  STR_WITH_MAXSIZE_TO_VARSTR(streamId, streamId2, sizeof(streamId));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // sql
  char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(sql, pStream->pCreate->sql, sizeof(sql));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)sql, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // status (the message is computed here too and emitted as the last column)
  char status[20 + VARSTR_HEADER_SIZE] = {0};
  char msg[TSDB_RESERVE_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
  code = mstGetStreamStatusStr(pStream, status, sizeof(status), msg, sizeof(msg));
  TSDB_CHECK_CODE(code, lino, _end);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)status, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // snodeLeader
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pStream->mainSnodeId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // snodeReplica
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  SSnodeObj* pSnode = mndAcquireSnode(pMnode, pStream->mainSnodeId);
  int32_t replicaSnodeId = pSnode ? pSnode->replicaId : -1;
  mndReleaseSnode(pMnode, pSnode);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&replicaSnodeId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // msg
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)msg, false);
  // Checked like every other column so a failure here is logged with its line.
  TSDB_CHECK_CODE(code, lino, _end);

_end:
  if (code) {
    mError("error happens when build stream attr result block, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
897

898

899
// Produces the VARSTR-encoded status name and status message for one task.
//
// `status` always receives the name looked up from the task's status value.
// `msg` receives, in priority order:
//   - "Last error: ..." when the task is FAILED and has a non-zero error code;
//   - the trigger runtime counters when the task is a trigger task and its
//     detail status is available (read under the detail status read lock);
//   - an empty string otherwise.
//
// `statusSize` and `msgSize` are the total sizes of the output buffers.
// Always returns TSDB_CODE_SUCCESS.
int32_t mstGetTaskStatusStr(SStmTaskStatus* pTask, char* status, int32_t statusSize, char* msg, int32_t msgSize) {
  char tmpBuf[256];
  
  STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[pTask->status], statusSize);
  if (STREAM_STATUS_FAILED == pTask->status && pTask->errCode) {
    snprintf(tmpBuf, sizeof(tmpBuf), "Last error: %s", tstrerror(pTask->errCode));
    STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
    return TSDB_CODE_SUCCESS;
  }

  if (STREAM_TRIGGER_TASK == pTask->type && mstWaitLock(&pTask->detailStatusLock, true)) {
    if (pTask->detailStatus) {
      SSTriggerRuntimeStatus* pTrigger = (SSTriggerRuntimeStatus*)pTask->detailStatus;
      snprintf(tmpBuf, sizeof(tmpBuf), "Current RT/HI/RE session num: %d/%d/%d, histroy progress:%d%%, total AUTO/USER recalc num: %d/%d", 
          pTrigger->realtimeSessionNum, pTrigger->historySessionNum, pTrigger->recalcSessionNum, pTrigger->histroyProgress,
          pTrigger->autoRecalcNum, (int32_t)taosArrayGetSize(pTrigger->userRecalcs));
      taosRUnLockLatch(&pTask->detailStatusLock);

      // The formatted text lives in a local buffer, so it can be published to
      // the caller after the lock is released. Without this copy the runtime
      // summary would be built and then thrown away.
      STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
      return TSDB_CODE_SUCCESS;
    }

    taosRUnLockLatch(&pTask->detailStatusLock);    
  }
  
  STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
  
  return TSDB_CODE_SUCCESS;
}
926

927
// Writes a short VARSTR-encoded role label for the task into `extraStr`
// (`extraSize` is the total buffer size):
//   - reader task flagged as trigger reader -> "trigReader"
//   - any other reader task                 -> "calcReader"
//   - runner task flagged as top runner     -> "topRunner"
//   - everything else                       -> ""
// Always returns TSDB_CODE_SUCCESS.
int32_t mstGetTaskExtraStr(SStmTaskStatus* pTask, char* extraStr, int32_t extraSize) {
  const char* label = "";

  if (STREAM_READER_TASK == pTask->type) {
    label = STREAM_IS_TRIGGER_READER(pTask->flags) ? "trigReader" : "calcReader";
  } else if (STREAM_RUNNER_TASK == pTask->type && STREAM_IS_TOP_RUNNER(pTask->flags)) {
    label = "topRunner";
  }

  STR_WITH_MAXSIZE_TO_VARSTR(extraStr, label, extraSize);
  return TSDB_CODE_SUCCESS;
}
949

950

951
// Writes one stream task into row `numOfRows` of `pBlock`.
//
// Columns are filled in block order: stream name, stream id (hex), task id
// (hex), task type, serious id (hex), deploy id, node type, node id, task
// index, status, start time, last update time, extra info, message.
// Start time and last update time are written as NULL when their value is 0.
//
// Returns 0 on success, or the first error code encountered; on error the
// failing source line is logged.
int32_t mstSetStreamTaskResBlock(SStreamObj* pStream, SStmTaskStatus* pTask, SSDataBlock* pBlock, int32_t numOfRows) {
  int32_t code = 0;
  int32_t cols = 0;
  int32_t lino = 0;

  // stream_name
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetStableStr(pStream->name), sizeof(streamName));
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamName, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stream id; idstr is reused for every hex id column below
  char idstr[19 + VARSTR_HEADER_SIZE] = {0};
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pStream->pCreate->streamId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE])); 
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // task id
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pTask->id.taskId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // type
  char type[20 + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(type, (STREAM_READER_TASK == pTask->type) ? "Reader" : ((STREAM_TRIGGER_TASK == pTask->type) ? "Trigger" : "Runner"), sizeof(type));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)type, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // serious id
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pTask->id.seriousId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // deploy id
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->id.deployId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // node_type: reader tasks are reported as "vnode", all others as "snode"
  char nodeType[10 + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(nodeType, (STREAM_READER_TASK == pTask->type) ? "vnode" : "snode", sizeof(nodeType));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)nodeType, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // node id
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->id.nodeId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // task idx
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->id.taskIdx, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // status (the message is computed here too and emitted as the last column)
  char status[20 + VARSTR_HEADER_SIZE] = {0};
  char msg[TSDB_RESERVE_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
  code = mstGetTaskStatusStr(pTask, status, sizeof(status), msg, sizeof(msg));
  TSDB_CHECK_CODE(code, lino, _end);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)status, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // start time: NULL when the task has no running start timestamp
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  if (pTask->runningStartTs) {
    code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->runningStartTs, false);
  } else {
    code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->runningStartTs, true);
  }
  TSDB_CHECK_CODE(code, lino, _end);

  // last update: NULL when no update timestamp has been recorded
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  if (pTask->lastUpTs) {
    code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->lastUpTs, false);
  } else {
    code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->lastUpTs, true);
  }
  TSDB_CHECK_CODE(code, lino, _end);

  // extra info
  char extra[64 + VARSTR_HEADER_SIZE] = {0};
  code = mstGetTaskExtraStr(pTask, extra, sizeof(extra));
  TSDB_CHECK_CODE(code, lino, _end);
  
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)extra, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // msg
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)msg, false);
  // Checked like every other column so a failure here is logged with its line.
  TSDB_CHECK_CODE(code, lino, _end);

_end:
  if (code) {
    mError("error happens when build stream task result block, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
1076

1077
// Counts every task tracked in `pStatus`: trigger readers, the additional
// trigger readers held in `trigOReaders`, calc readers, the trigger task
// (if present) and the runners of every deploy slot.
int32_t mstGetNumOfStreamTasks(SStmStatus* pStatus) {
  int32_t total = taosArrayGetSize(pStatus->trigReaders) + msmGetTrigOReaderSize(pStatus->trigOReaders) + MST_LIST_SIZE(pStatus->calcReaders) + (pStatus->triggerTask ? 1 : 0);

  int32_t deploy = 0;
  while (deploy < MND_STREAM_RUNNER_DEPLOY_NUM) {
    total += taosArrayGetSize(pStatus->runners[deploy]);
    ++deploy;
  }

  return total;
}
1085

1086
int32_t mstSetStreamTasksResBlock(SStreamObj* pStream, SSDataBlock* pBlock, int32_t* numOfRows, int32_t rowsCapacity) {
2,301,170✔
1087
  int32_t code = 0;
2,301,170✔
1088
  int32_t lino = 0;
2,301,170✔
1089
  int64_t streamId = pStream->pCreate->streamId;
2,301,170✔
1090
  bool    statusLocked = false;
2,301,170✔
1091

1092
  (void)mstWaitLock(&mStreamMgmt.runtimeLock, true);
2,301,170✔
1093

1094
  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
2,301,170✔
1095
  if (NULL == pStatus) {
2,301,170✔
1096
    mstsDebug("stream not in streamMap, ignore it, dropped:%d, stopped:%d", atomic_load_8(&pStream->userDropped), atomic_load_8(&pStream->userStopped));
113,714✔
1097
    goto _exit;
113,714✔
1098
  }
1099

1100
  int8_t stopped = atomic_load_8(&pStatus->stopped);
2,187,456✔
1101
  if (stopped) {
2,187,456✔
1102
    mstsDebug("stream stopped %d, ignore it", stopped);
46,122✔
1103
    goto _exit;
46,122✔
1104
  }
1105

1106
  (void)mstWaitLock(&pStatus->resetLock, true);
2,141,334✔
1107
  statusLocked = true;
2,141,334✔
1108
  
1109
  int32_t count = mstGetNumOfStreamTasks(pStatus);
2,141,334✔
1110

1111
  if (*numOfRows + count > rowsCapacity) {
2,141,334✔
1112
    code = blockDataEnsureCapacity(pBlock, *numOfRows + count);
70,692✔
1113
    if (code) {
70,692✔
UNCOV
1114
      mstError("failed to prepare the result block buffer, rows:%d", *numOfRows + count);
×
UNCOV
1115
      TAOS_CHECK_EXIT(code);
×
1116
    }
1117
  }
1118

1119
  SStmTaskStatus* pTask = NULL;
2,141,334✔
1120
  int32_t trigReaderNum = taosArrayGetSize(pStatus->trigReaders);
2,141,334✔
1121
  for (int32_t i = 0; i < trigReaderNum; ++i) {
4,514,347✔
1122
    pTask = taosArrayGet(pStatus->trigReaders, i);
2,373,013✔
1123
  
1124
    code = mstSetStreamTaskResBlock(pStream, pTask, pBlock, *numOfRows);
2,373,013✔
1125
    if (code == TSDB_CODE_SUCCESS) {
2,373,013✔
1126
      (*numOfRows)++;
2,373,013✔
1127
    }
1128
  }
1129

1130
  trigReaderNum = msmGetTrigOReaderSize(pStatus->trigOReaders);
2,141,334✔
1131
  for (int32_t i = 0; i < trigReaderNum; ++i) {
2,668,408✔
1132
    pTask = msmGetTrigOReader(pStatus->trigOReaders, i);
527,074✔
1133
  
1134
    code = mstSetStreamTaskResBlock(pStream, pTask, pBlock, *numOfRows);
527,074✔
1135
    if (code == TSDB_CODE_SUCCESS) {
527,074✔
1136
      (*numOfRows)++;
527,074✔
1137
    }
1138
  }
1139

1140
  if (pStatus->calcReaders) {
2,141,334✔
1141
    int32_t calcReaderNum = MST_LIST_SIZE(pStatus->calcReaders);
2,141,334✔
1142
    SListNode* pNode = listHead(pStatus->calcReaders);
2,141,334✔
1143
    for (int32_t i = 0; i < calcReaderNum; ++i) {
5,646,330✔
1144
      pTask = (SStmTaskStatus*)pNode->data;
3,504,996✔
1145
    
1146
      code = mstSetStreamTaskResBlock(pStream, pTask, pBlock, *numOfRows);
3,504,996✔
1147
      if (code == TSDB_CODE_SUCCESS) {
3,504,996✔
1148
        (*numOfRows)++;
3,504,996✔
1149
      }
1150
      pNode = TD_DLIST_NODE_NEXT(pNode);
3,504,996✔
1151
    }
1152
  }
1153

1154
  if (pStatus->triggerTask) {
2,141,334✔
1155
    code = mstSetStreamTaskResBlock(pStream, pStatus->triggerTask, pBlock, *numOfRows);
2,140,341✔
1156
    if (code == TSDB_CODE_SUCCESS) {
2,140,341✔
1157
      (*numOfRows)++;
2,140,341✔
1158
    }
1159
  }
1160

1161
  int32_t runnerNum = 0;
2,141,334✔
1162
  for (int32_t i = 0; i < MND_STREAM_RUNNER_DEPLOY_NUM; ++i) {
8,565,336✔
1163
    runnerNum = taosArrayGetSize(pStatus->runners[i]);
6,424,002✔
1164
    for (int32_t m = 0; m < runnerNum; ++m) {
12,888,362✔
1165
      pTask = taosArrayGet(pStatus->runners[i], m);
6,464,360✔
1166
    
1167
      code = mstSetStreamTaskResBlock(pStream, pTask, pBlock, *numOfRows);
6,464,360✔
1168
      if (code == TSDB_CODE_SUCCESS) {
6,464,360✔
1169
        (*numOfRows)++;
6,464,360✔
1170
      }
1171
    }
1172
  }
1173
  
1174
  pBlock->info.rows = *numOfRows;
2,141,334✔
1175

1176
_exit:
2,301,170✔
1177

1178
  if (statusLocked) {
2,301,170✔
1179
    taosRUnLockLatch(&pStatus->resetLock);
2,141,334✔
1180
  }
1181
  
1182
  taosRUnLockLatch(&mStreamMgmt.runtimeLock);
2,301,170✔
1183

1184
  if (code) {
2,301,170✔
UNCOV
1185
    mError("error happens when build stream tasks result block, lino:%d, code:%s", lino, tstrerror(code));
×
1186
  }
1187
  
1188
  return code;
2,301,170✔
1189
}
1190

1191

1192
// Queues a user recalculation request covering `pRange` on `pStream`.
//
// A new recalc id is generated for the request. Under the user recalc write
// lock the request is appended to the stream's existing list, or, when no
// list exists yet, a fresh list holding the request is created and published
// with an atomic pointer store.
//
// Returns 0 on success, otherwise the error code of the failing step.
int32_t mstAppendNewRecalcRange(int64_t streamId, SStmStatus *pStream, STimeWindow* pRange) {
  int32_t          code = 0;
  int32_t          lino = 0;
  bool             locked = false;
  SArray*          pFreshList = NULL;  // owned here until published to pStream
  SStreamRecalcReq req = {.recalcId = 0, .start = pRange->skey, .end = pRange->ekey};

  TAOS_CHECK_EXIT(taosGetSystemUUIDU64(&req.recalcId));

  taosWLockLatch(&pStream->userRecalcLock);
  locked = true;

  if (NULL != pStream->userRecalcList) {
    TSDB_CHECK_NULL(taosArrayPush(pStream->userRecalcList, &req), code, lino, _exit, terrno);
  } else {
    pFreshList = taosArrayInit(2, sizeof(SStreamRecalcReq));
    if (NULL == pFreshList) {
      TAOS_CHECK_EXIT(terrno);
    }

    TSDB_CHECK_NULL(taosArrayPush(pFreshList, &req), code, lino, _exit, terrno);

    atomic_store_ptr(&pStream->userRecalcList, pFreshList);
    // Ownership moved to pStream; keep the cleanup below from freeing it.
    pFreshList = NULL;
  }

  mstsInfo("stream recalc ID:%" PRIx64 " range:%" PRId64 " - %" PRId64 " added", req.recalcId, pRange->skey, pRange->ekey);

_exit:

  // Only non-NULL when the list was created but never published.
  taosArrayDestroy(pFreshList);

  if (locked) {
    taosWUnLockLatch(&pStream->userRecalcLock);
  }

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1234

1235

1236

UNCOV
1237
// Writes one user recalculation progress entry into row `numOfRows` of
// `pBlock`.
//
// Columns are filled in block order: stream name, stream id (hex), recalc id
// (hex), range start, range end, progress rendered as "<n>%".
//
// Returns 0 on success, or the first error code encountered; on error the
// failing source line is logged.
int32_t mstSetStreamRecalculateResBlock(SStreamObj* pStream, SSTriggerRecalcProgress* pProgress, SSDataBlock* pBlock, int32_t numOfRows) {
  int32_t code = 0;
  int32_t cols = 0;
  int32_t lino = 0;

  // stream_name
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetStableStr(pStream->name), sizeof(streamName));
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamName, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stream id
  // The stored length is the payload length only, matching how the task
  // result block sets it; adding VARSTR_HEADER_SIZE would make the value
  // two header bytes too long.
  char idstr[19 + VARSTR_HEADER_SIZE] = {0};
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pStream->pCreate->streamId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // recalc id
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pProgress->recalcId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // start
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pProgress->start, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // end
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pProgress->end, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // progress
  char progress[20 + VARSTR_HEADER_SIZE] = {0};
  snprintf(&progress[VARSTR_HEADER_SIZE], sizeof(progress) - VARSTR_HEADER_SIZE, "%d%%", pProgress->progress);
  varDataSetLen(progress, strlen(&progress[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)progress, false);
  TSDB_CHECK_CODE(code, lino, _end);

_end:
  if (code) {
    mError("error happens when build stream recalculate result block, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
1295

1296

1297
// Appends one row per user recalculation of `pStream` to `pBlock`, starting
// at row `*numOfRows` and updating it as rows are added.
//
// Nothing is appended when the stream has no runtime status entry, its
// runtime `stopped` flag is set, it has no trigger task, or the trigger task
// has no detail status yet. The block is grown first when `rowsCapacity`
// cannot hold all entries. An entry whose row cannot be written is skipped
// and the loop continues; the returned code is that of the last write
// attempted.
//
// Holds the runtime read lock for the whole call and the trigger task's
// detail status read lock while the recalculation list is being read. The
// detail lock is released in exactly one place (`_exit`), so no exit path can
// release it twice or leave it held.
int32_t mstSetStreamRecalculatesResBlock(SStreamObj* pStream, SSDataBlock* pBlock, int32_t* numOfRows, int32_t rowsCapacity) {
  int32_t     code = 0;
  int32_t     lino = 0;
  int64_t     streamId = pStream->pCreate->streamId;
  bool        detailLocked = false;
  SStmStatus* pStatus = NULL;

  (void)mstWaitLock(&mStreamMgmt.runtimeLock, true);

  pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL == pStatus) {
    mstsDebug("stream not in streamMap, ignore it, dropped:%d, stopped:%d", atomic_load_8(&pStream->userDropped), atomic_load_8(&pStream->userStopped));
    goto _exit;
  }

  int8_t stopped = atomic_load_8(&pStatus->stopped);
  if (stopped) {
    mstsDebug("stream stopped %d, ignore it", stopped);
    goto _exit;
  }

  if (NULL == pStatus->triggerTask) {
    mstsDebug("no trigger task now, deployTimes:%" PRId64 ", ignore it", pStatus->deployTimes);
    goto _exit;
  }

  (void)mstWaitLock(&pStatus->triggerTask->detailStatusLock, true);
  detailLocked = true;

  if (NULL == pStatus->triggerTask->detailStatus) {
    // Distinct text so this case is not mistaken for a missing trigger task.
    mstsDebug("no trigger task detail status now, deployTimes:%" PRId64 ", ignore it", pStatus->deployTimes);
    goto _exit;
  }

  SSTriggerRuntimeStatus* pTrigger = (SSTriggerRuntimeStatus*)pStatus->triggerTask->detailStatus;
  int32_t count = taosArrayGetSize(pTrigger->userRecalcs);

  if (*numOfRows + count > rowsCapacity) {
    code = blockDataEnsureCapacity(pBlock, *numOfRows + count);
    if (code) {
      mstError("failed to prepare the result block buffer, rows:%d", *numOfRows + count);
      TAOS_CHECK_EXIT(code);
    }
  }

  for (int32_t i = 0; i < count; ++i) {
    SSTriggerRecalcProgress* pProgress = taosArrayGet(pTrigger->userRecalcs, i);
  
    code = mstSetStreamRecalculateResBlock(pStream, pProgress, pBlock, *numOfRows);
    if (code == TSDB_CODE_SUCCESS) {
      (*numOfRows)++;
    }
  }

  pBlock->info.rows = *numOfRows;

_exit:

  // Release the inner lock before the outer one.
  if (detailLocked) {
    taosRUnLockLatch(&pStatus->triggerTask->detailStatusLock);
  }

  taosRUnLockLatch(&mStreamMgmt.runtimeLock);

  if (code) {
    mError("error happens when build stream recalculates result block, lino:%d, code:%s", lino, tstrerror(code));
  }
  
  return code;
}
1363

1364
int32_t mstGetScanUidFromPlan(int64_t streamId, void* scanPlan, int64_t* uid) {
240,751✔
1365
  SSubplan* pSubplan = NULL;
240,751✔
1366
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
240,751✔
1367
  
1368
  TAOS_CHECK_EXIT(nodesStringToNode(scanPlan, (SNode**)&pSubplan));
240,751✔
1369

1370
  if (pSubplan->pNode && nodeType(pSubplan->pNode) == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
240,751✔
1371
    SScanPhysiNode* pScanNode = (SScanPhysiNode*)pSubplan->pNode;
90,801✔
1372
    *uid = pScanNode->uid;
90,801✔
1373
  }
1374
  
1375
_exit:
240,401✔
1376

1377
  if (code) {
240,751✔
UNCOV
1378
    mstsError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1379
  }
1380

1381
  nodesDestroyNode((SNode *)pSubplan);
240,751✔
1382

1383
  return code;
240,751✔
1384
}
1385

1386

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc