• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4513

17 Jul 2025 02:02AM UTC coverage: 31.359% (-31.1%) from 62.446%
#4513

push

travis-ci

web-flow
Merge pull request #31914 from taosdata/fix/3.0/compare-ans-failed

fix:Convert line endings from LF to CRLF for ans file

68541 of 301034 branches covered (22.77%)

Branch coverage included in aggregate %.

117356 of 291771 relevant lines covered (40.22%)

602262.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.81
/source/dnode/mnode/impl/src/mndStreamUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndDb.h"
17
#include "mndStb.h"
18
#include "mndStream.h"
19
#include "mndTrans.h"
20
#include "mndVgroup.h"
21
#include "taoserror.h"
22
#include "tmisce.h"
23
#include "mndSnode.h"
24

25
/*
 * Block until the requested side of pLock has been taken.
 *
 * pLock    - latch to acquire
 * readLock - true: use taosRTryLockLatch; false: use taosWTryLockLatch
 *
 * The try-lock call is repeated, sleeping 1ms (taosMsleep(1)) between
 * attempts, for as long as it returns non-zero. Always returns true.
 */
bool mstWaitLock(SRWLatch* pLock, bool readLock) {
  if (readLock) {
    for (;;) {
      if (!taosRTryLockLatch(pLock)) {
        break;
      }
      taosMsleep(1);
    }
  } else {
    for (;;) {
      if (!taosWTryLockLatch(pLock)) {
        break;
      }
      taosMsleep(1);
    }
  }

  return true;
}
40

41
/*
 * Destroy the vgIds and readerList arrays held in p->cont.
 * The SStreamMgmtRsp object itself is not freed here.
 */
void mndStreamDestroySStreamMgmtRsp(SStreamMgmtRsp* p) {
  SArray* pVgIds = p->cont.vgIds;
  SArray* pReaders = p->cont.readerList;

  taosArrayDestroy(pVgIds);
  taosArrayDestroy(pReaders);
}
×
45

46
void mstDestroySStmVgStreamStatus(void* p) { 
×
47
  SStmVgStreamStatus* pStatus = (SStmVgStreamStatus*)p;
×
48
  taosArrayDestroy(pStatus->trigReaders); 
×
49
  taosArrayDestroy(pStatus->calcReaders); 
×
50
}
×
51

52
void mstDestroySStmSnodeStreamStatus(void* p) { 
×
53
  SStmSnodeStreamStatus* pStatus = (SStmSnodeStreamStatus*)p;
×
54
  for (int32_t i = 0; i < MND_STREAM_RUNNER_DEPLOY_NUM; ++i) {
×
55
    taosArrayDestroy(pStatus->runners[i]);
×
56
    pStatus->runners[i] = NULL;
×
57
  }
58
}
×
59

60

61
/*
 * Release the streamTasks hash owned by pVgStatus and clear the pointer so
 * the struct can be reused or destroyed again safely.
 */
void mstDestroyVgroupStatus(SStmVgroupStatus* pVgStatus) {
  SHashObj* pTasks = pVgStatus->streamTasks;

  pVgStatus->streamTasks = NULL;
  taosHashCleanup(pTasks);
}
×
65

66
void mstDestroySStmTaskToDeployExt(void* param) {
×
67
  SStmTaskToDeployExt* pExt = (SStmTaskToDeployExt*)param;
×
68
  if (pExt->deployed) {
×
69
    return;
×
70
  }
71
  
72
  switch (pExt->deploy.task.type) {
×
73
    case STREAM_TRIGGER_TASK:
×
74
      taosArrayDestroy(pExt->deploy.msg.trigger.readerList);
×
75
      taosArrayDestroy(pExt->deploy.msg.trigger.runnerList);
×
76
      break;
×
77
    case STREAM_RUNNER_TASK:
×
78
      taosMemoryFree(pExt->deploy.msg.runner.pPlan);
×
79
      break;
×
80
    default:  
×
81
      break;;
×
82
  }
83
}
84

85
void mstDestroyScanAddrList(void* param) {
×
86
  if (NULL == param) {
×
87
    return;
×
88
  }
89
  SArray* pList = *(SArray**)param;
×
90
  taosArrayDestroy(pList);
×
91
}
92

93
void mstDestroySStmSnodeTasksDeploy(void* param) {
×
94
  SStmSnodeTasksDeploy* pSnode = (SStmSnodeTasksDeploy*)param;
×
95
  taosArrayDestroyEx(pSnode->triggerList, mstDestroySStmTaskToDeployExt);
×
96
  taosArrayDestroyEx(pSnode->runnerList, mstDestroySStmTaskToDeployExt);
×
97
}
×
98

99
void mstDestroySStmVgTasksToDeploy(void* param) {
×
100
  SStmVgTasksToDeploy* pVg = (SStmVgTasksToDeploy*)param;
×
101
  taosArrayDestroyEx(pVg->taskList, mstDestroySStmTaskToDeployExt);
×
102
}
×
103

104
void mstDestroySStmSnodeStatus(void* param) {
×
105
  SStmSnodeStatus* pSnode = (SStmSnodeStatus*)param;
×
106
  taosHashCleanup(pSnode->streamTasks);
×
107
}
×
108

109
void mstDestroySStmVgroupStatus(void* param) {
×
110
  SStmVgroupStatus* pVg = (SStmVgroupStatus*)param;
×
111
  taosHashCleanup(pVg->streamTasks);
×
112
}
×
113

114
/*
 * Drop all per-task runtime state held by pStatus, leaving every released
 * pointer set to NULL:
 *   - the trigReaders / trigOReaders / calcReaders arrays
 *   - the trigger task (its detailStatus is freed while holding the write
 *     side of detailStatusLock, then the task itself is freed)
 *   - every runners[] array
 * streamName and pCreate are not touched here.
 */
void mstResetSStmStatus(SStmStatus* pStatus) {
  taosArrayDestroy(pStatus->trigReaders);
  pStatus->trigReaders = NULL;

  taosArrayDestroy(pStatus->trigOReaders);
  pStatus->trigOReaders = NULL;

  taosArrayDestroy(pStatus->calcReaders);
  pStatus->calcReaders = NULL;

  if (NULL != pStatus->triggerTask) {
    // take the write lock so no reader is looking at detailStatus while it is freed
    mstWaitLock(&pStatus->triggerTask->detailStatusLock, false);
    taosMemoryFreeClear(pStatus->triggerTask->detailStatus);
    taosWUnLockLatch(&pStatus->triggerTask->detailStatusLock);
  }
  taosMemoryFreeClear(pStatus->triggerTask);

  int32_t deployIdx = 0;
  while (deployIdx < MND_STREAM_RUNNER_DEPLOY_NUM) {
    taosArrayDestroy(pStatus->runners[deployIdx]);
    pStatus->runners[deployIdx] = NULL;
    ++deployIdx;
  }
}
×
132

133
void mstDestroySStmStatus(void* param) {
×
134
  SStmStatus* pStatus = (SStmStatus*)param;
×
135
  taosMemoryFreeClear(pStatus->streamName);
×
136

137
  mstResetSStmStatus(pStatus);
×
138

139
  tFreeSCMCreateStreamReq(pStatus->pCreate);
×
140
  taosMemoryFreeClear(pStatus->pCreate);  
×
141
}
×
142

143
void mstDestroySStmAction(void* param) {
×
144
  SStmAction* pAction = (SStmAction*)param;
×
145

146
  taosArrayDestroy(pAction->undeploy.taskList);
×
147
  taosArrayDestroy(pAction->recalc.recalcList);
×
148
}
×
149

150
/*
 * Forget the task references held in pDeploy by setting readerTasks,
 * triggerTask and runnerTasks to NULL. Nothing is freed here.
 */
void mstClearSStmStreamDeploy(SStmStreamDeploy* pDeploy) {
  pDeploy->runnerTasks = NULL;
  pDeploy->triggerTask = NULL;
  pDeploy->readerTasks = NULL;
}
×
155

156
/*
 * Report whether the stream identified by streamId is (being) dropped.
 *
 * pMnode   - mnode whose sdb is scanned (SDB_STREAM table)
 * streamId - id compared against pStream->pCreate->streamId
 * dropped  - out: the stream's userDropped flag if the stream is found,
 *            true if no stream with that id exists
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t mstIsStreamDropped(SMnode *pMnode, int64_t streamId, bool* dropped) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  while (1) {
    SStreamObj *pStream = NULL;
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) break;

    if (pStream->pCreate->streamId != streamId) {
      sdbRelease(pSdb, pStream);
      continue;
    }

    // userDropped is read atomically elsewhere in this file; do the same here
    *dropped = atomic_load_8(&pStream->userDropped) ? true : false;
    sdbRelease(pSdb, pStream);
    sdbCancelFetch(pSdb, pIter);  // iteration stops early, so the fetch must be cancelled
    mstsDebug("stream found, dropped:%d", *dropped);
    return TSDB_CODE_SUCCESS;
  }

  // not present in sdb at all: treat as dropped
  *dropped = true;

  return TSDB_CODE_SUCCESS;
}
180

181
/* Context handed to mstChkSetDbInUse through sdbTraverse. */
typedef struct SStmCheckDbInUseCtx {
  bool* dbStream;      // out: set to true when a stream uses the db as trigger or calc db
  bool* vtableStream;  // out: set to true when a scanned stream involves a virtual table
  bool  ignoreCurrDb;  // skip streams whose own streamDB is the db being checked
} SStmCheckDbInUseCtx;
186

187
/*
 * sdbTraverse callback used by mstCheckDbInUse.
 *
 * pObj - the SStreamObj being visited
 * p1   - full name of the db being checked (char*)
 * p2   - SStmCheckDbInUseCtx* receiving the results
 * p3   - unused
 *
 * Returns false (after setting *dbStream) as soon as a stream whose trigger
 * db or one of its calc dbs equals p1 is found; returns true in all other
 * cases. Streams flagged userDropped are skipped, and so are streams living
 * in the checked db when ignoreCurrDb is set. A visited stream that is a
 * virtual-table stream sets *vtableStream.
 */
static bool mstChkSetDbInUse(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SStreamObj          *pStream = pObj;
  const char          *dbFName = p1;
  SStmCheckDbInUseCtx *pCtx = NULL;
  SCMCreateStreamReq  *pReq = NULL;

  if (atomic_load_8(&pStream->userDropped)) {
    return true;
  }

  pCtx = (SStmCheckDbInUseCtx *)p2;
  pReq = pStream->pCreate;

  if (pCtx->ignoreCurrDb && 0 == strcmp(pReq->streamDB, dbFName)) {
    return true;
  }

  if (pReq->triggerDB && 0 == strcmp(pReq->triggerDB, dbFName)) {
    *pCtx->dbStream = true;
    return false;
  }

  int32_t calcDBNum = taosArrayGetSize(pReq->calcDB);
  int32_t idx = 0;
  while (idx < calcDBNum) {
    char *calcDB = taosArrayGetP(pReq->calcDB, idx);
    if (0 == strcmp(calcDB, dbFName)) {
      *pCtx->dbStream = true;
      return false;
    }
    ++idx;
  }

  if (pReq->vtableCalc || STREAM_IS_VIRTUAL_TABLE(pReq->triggerTblType, pReq->flags)) {
    *pCtx->vtableStream = true;
  }

  return true;
}
219

220
/*
 * Scan all streams in sdb and report how they relate to db dbFName.
 *
 * dbStream     - out: set to true if some stream uses dbFName as trigger/calc db
 * vtableStream - out: set to true if a visited stream is a virtual-table stream
 * ignoreCurrDb - when true, streams created inside dbFName are not considered
 *
 * Neither output is written when sdb holds no streams; outputs are only ever
 * set to true, never cleared, so the caller must initialise them.
 */
void mstCheckDbInUse(SMnode *pMnode, char *dbFName, bool *dbStream, bool *vtableStream, bool ignoreCurrDb) {
  int32_t streamNum = sdbGetSize(pMnode->pSdb, SDB_STREAM);

  if (streamNum > 0) {
    SStmCheckDbInUseCtx ctx = {.dbStream = dbStream, .vtableStream = vtableStream, .ignoreCurrDb = ignoreCurrDb};
    sdbTraverse(pMnode->pSdb, SDB_STREAM, mstChkSetDbInUse, dbFName, &ctx, NULL);
  }
}
229

230
static void mstShowStreamStatus(char *dst, int8_t status, int32_t bufLen) {
×
231
  if (status == STREAM_STATUS_INIT) {
×
232
    tstrncpy(dst, "init", bufLen);
×
233
  } else if (status == STREAM_STATUS_RUNNING) {
×
234
    tstrncpy(dst, "running", bufLen);
×
235
  } else if (status == STREAM_STATUS_STOPPED) {
×
236
    tstrncpy(dst, "stopped", bufLen);
×
237
  } else if (status == STREAM_STATUS_FAILED) {
×
238
    tstrncpy(dst, "failed", bufLen);
×
239
  }
240
}
×
241

242
/*
 * Build one row (index numOfRows) of the stream attribute result block for
 * pStream.
 *
 * Currently a stub: nothing is written to pBlock and 0 is always returned.
 *
 * STREAMTODO: the previous row builder is disabled and has to be reworked
 * against the current SStreamObj layout. It filled, in column order:
 * stream name, create time, stream id (hex), related fill-history id (hex or
 * NULL), sql, status, source db, target db, target stable (or NULL),
 * watermark, trigger, sink quota, one column that was fetched but never set,
 * checkpoint backup type ("none"), history scan idle ("100a"), and a message
 * column (the reserve text when the stream had failed, " " otherwise).
 * Failures were logged as
 * "error happens when build stream attr result block, lino:%d, code:%s".
 */
int32_t mstGenerateResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_t numOfRows) {
  int32_t code = 0;

  return code;
}
408

409
/*
 * Check whether at least one snode is registered in sdb.
 *
 * Returns TSDB_CODE_SUCCESS if the first sdbFetch on SDB_SNODE yields an
 * object (which is released and the fetch cancelled right away), otherwise
 * TSDB_CODE_SNODE_NOT_DEPLOYED.
 */
int32_t mstCheckSnodeExists(SMnode *pMnode) {
  SSdb      *pSdb = pMnode->pSdb;
  SSnodeObj *pSnode = NULL;
  void      *pIter = sdbFetch(pSdb, SDB_SNODE, NULL, (void **)&pSnode);

  if (NULL == pIter) {
    return TSDB_CODE_SNODE_NOT_DEPLOYED;
  }

  sdbRelease(pSdb, pSnode);
  sdbCancelFetch(pSdb, pIter);
  return TSDB_CODE_SUCCESS;
}
427

428
/*
 * Copy a reported task status message into the mnode-side task record.
 *
 * pCtx  - supplies currTs, stored as the task's lastUpTs
 * pTask - record to overwrite
 * pMsg  - source of the id fields (taskId, deployId, seriousId, nodeId,
 *         taskIdx) and of type, flags and status
 */
void mstSetTaskStatusFromMsg(SStmGrpCtx* pCtx, SStmTaskStatus* pTask, SStmTaskStatusMsg* pMsg) {
  // identity
  pTask->id.taskId    = pMsg->taskId;
  pTask->id.deployId  = pMsg->deployId;
  pTask->id.seriousId = pMsg->seriousId;
  pTask->id.nodeId    = pMsg->nodeId;
  pTask->id.taskIdx   = pMsg->taskIdx;

  // state
  pTask->type   = pMsg->type;
  pTask->flags  = pMsg->flags;
  pTask->status = pMsg->status;

  // refresh time taken from the group context, not from the message
  pTask->lastUpTs = pCtx->currTs;
}
×
440

441
/*
 * Take the next node off the action queue.
 *
 * Returns false without touching *param when qRemainNum is 0. Otherwise the
 * current head node is freed, head advances to its successor, that
 * successor is handed back through *param (it remains linked as the new
 * head), qRemainNum is decremented and true is returned.
 *
 * NOTE(review): unlike mndStreamActionEnqueue this path does not take
 * pQueue->lock - presumably there is a single consumer; confirm with callers.
 */
bool mndStreamActionDequeue(SStmActionQ* pQueue, SStmQNode **param) {
  if (0 == atomic_load_64(&pQueue->qRemainNum)) {
    return false;
  }

  SStmQNode *pOldHead = pQueue->head;
  SStmQNode *pNext = pOldHead->next;

  pQueue->head = pNext;
  *param = pNext;

  taosMemoryFreeClear(pOldHead);

  atomic_sub_fetch_64(&pQueue->qRemainNum, 1);

  return true;
}
459

460
/*
 * Append param to the tail of the action queue.
 * The link update is done under the queue's write latch; qRemainNum is
 * incremented atomically after the latch has been released.
 */
void mndStreamActionEnqueue(SStmActionQ* pQueue, SStmQNode* param) {
  taosWLockLatch(&pQueue->lock);
  SStmQNode* pOldTail = pQueue->tail;
  pOldTail->next = param;
  pQueue->tail = param;
  taosWUnLockLatch(&pQueue->lock);

  atomic_add_fetch_64(&pQueue->qRemainNum, 1);
}
×
468

469
char* mstGetStreamActionString(int32_t action) {
×
470
  switch (action) {
×
471
    case STREAM_ACT_DEPLOY:
×
472
      return "DEPLOY";
×
473
    case STREAM_ACT_UNDEPLOY:
×
474
      return "UNDEPLOY";
×
475
    case STREAM_ACT_START:
×
476
      return "START";
×
477
    case STREAM_ACT_UPDATE_TRIGGER:
×
478
      return "UPDATE TRIGGER";
×
479
    case STREAM_ACT_RECALC:
×
480
      return "USER RECALC";
×
481
    default:
×
482
      break;
×
483
  }
484

485
  return "UNKNOWN";
×
486
}
487

488
/*
 * Queue a stream-level action.
 *
 * actionQ    - destination queue
 * streamId   - id of the target stream (also used by the log macros)
 * streamName - name copied into the queue node
 * param      - action specific payload; stored in the node on success, freed
 *              here if the node cannot be allocated
 * userAction - stored in the node as-is
 * action     - STREAM_ACT_* value stored as the node type
 *
 * Allocation failure is only logged; the action is dropped.
 */
void mstPostStreamAction(SStmActionQ*       actionQ, int64_t streamId, char* streamName, void* param, bool userAction, int32_t action) {
  SStmQNode *pNode = taosMemoryMalloc(sizeof(SStmQNode));
  if (NULL == pNode) {
    taosMemoryFreeClear(param);
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, __LINE__, tstrerror(terrno));
    return;
  }

  pNode->type = action;
  pNode->streamAct = true;
  pNode->action.stream.streamId = streamId;
  // bounded copy: an over-long name is truncated instead of overrunning the node buffer
  tstrncpy(pNode->action.stream.streamName, streamName, sizeof(pNode->action.stream.streamName));
  pNode->action.stream.userAction = userAction;
  pNode->action.stream.actionParam = param;

  pNode->next = NULL;

  mndStreamActionEnqueue(actionQ, pNode);

  mstsDebug("stream action %s posted enqueue", mstGetStreamActionString(action));
}
509

510
/*
 * Queue a task-level action.
 *
 * actionQ - destination queue
 * pAction - task action description; copied by value into the queue node
 * action  - STREAM_ACT_* value stored as the node type
 *
 * Allocation failure is only logged; the action is dropped.
 */
void mstPostTaskAction(SStmActionQ*        actionQ, SStmTaskAction* pAction, int32_t action) {
  SStmQNode *pQNode = taosMemoryMalloc(sizeof(SStmQNode));

  if (pQNode != NULL) {
    pQNode->next = NULL;
    pQNode->type = action;
    pQNode->streamAct = false;
    pQNode->action.task = *pAction;

    mndStreamActionEnqueue(actionQ, pQNode);
    return;
  }

  // streamId is picked up by the mstsError macro
  int64_t streamId = pAction->streamId;
  mstsError("%s failed at line %d, error:%s", __FUNCTION__, __LINE__, tstrerror(terrno));
}
526

527
/*
 * Destroy a db -> SDBVgHashInfo map as built by mstBuildDBVgroupsMap:
 * the vgArray of every entry is destroyed first, then the hash itself.
 */
void mstDestroyDbVgroupsHash(SSHashObj *pDbVgs) {
  int32_t iter = 0;
  void*   pEntry = tSimpleHashIterate(pDbVgs, NULL, &iter);

  while (NULL != pEntry) {
    taosArrayDestroy(((SDBVgHashInfo*)pEntry)->vgArray);
    pEntry = tSimpleHashIterate(pDbVgs, pEntry, &iter);
  }

  tSimpleHashCleanup(pDbVgs);
}
×
538

539

540
/*
 * Build a map from db name to the hash ranges of its vgroups.
 *
 * pMnode - mnode whose SDB_VGROUP table is scanned
 * ppRes  - out: on success receives the new hash (key: db name including the
 *          terminating NUL, value: SDBVgHashInfo). Ownership passes to the
 *          caller, who releases it with mstDestroyDbVgroupsHash. Not written
 *          on failure.
 *
 * Each SDBVgHashInfo carries the db's hash settings and an unsorted array of
 * SVGroupHashInfo {vgId, hashBegin, hashEnd} (vgSorted is false).
 *
 * Returns TSDB_CODE_SUCCESS or the error code of the failing step. On failure
 * the partially built map, including all arrays already stored in it, is
 * destroyed here.
 */
int32_t mstBuildDBVgroupsMap(SMnode* pMnode, SSHashObj** ppRes) {
  void*   pIter = NULL;
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SArray* pTarget = NULL;
  SArray* pNew = NULL;  // array created for a new db that is not yet owned by the hash
  SDbObj* pDb = NULL;
  SDBVgHashInfo dbInfo = {0}, *pDbInfo = NULL;
  SVgObj* pVgroup = NULL;

  SSHashObj* pDbVgroup = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
  TSDB_CHECK_NULL(pDbVgroup, code, lino, _exit, terrno);

  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    pDbInfo = (SDBVgHashInfo*)tSimpleHashGet(pDbVgroup, pVgroup->dbName, strlen(pVgroup->dbName) + 1);
    if (NULL == pDbInfo) {
      // first vgroup seen for this db: prepare a fresh entry
      pNew = taosArrayInit(20, sizeof(SVGroupHashInfo));
      if (NULL == pNew) {
        sdbRelease(pMnode->pSdb, pVgroup);
        sdbCancelFetch(pMnode->pSdb, pIter);
        pVgroup = NULL;
        TSDB_CHECK_NULL(pNew, code, lino, _exit, terrno);
      }

      pDb = mndAcquireDb(pMnode, pVgroup->dbName);
      if (NULL == pDb) {
        sdbRelease(pMnode->pSdb, pVgroup);
        sdbCancelFetch(pMnode->pSdb, pIter);
        pVgroup = NULL;
        TSDB_CHECK_NULL(pDb, code, lino, _exit, terrno);
      }
      dbInfo.vgSorted = false;
      dbInfo.hashMethod = pDb->cfg.hashMethod;
      dbInfo.hashPrefix = pDb->cfg.hashPrefix;
      dbInfo.hashSuffix = pDb->cfg.hashSuffix;
      dbInfo.vgArray = pNew;

      mndReleaseDb(pMnode, pDb);

      pTarget = pNew;
    } else {
      pTarget = pDbInfo->vgArray;
    }

    SVGroupHashInfo vgInfo = {.vgId = pVgroup->vgId, .hashBegin = pVgroup->hashBegin, .hashEnd = pVgroup->hashEnd};
    if (NULL == taosArrayPush(pTarget, &vgInfo)) {
      sdbRelease(pMnode->pSdb, pVgroup);
      sdbCancelFetch(pMnode->pSdb, pIter);
      pVgroup = NULL;
      TSDB_CHECK_NULL(NULL, code, lino, _exit, terrno);
    }

    if (NULL == pDbInfo) {
      code = tSimpleHashPut(pDbVgroup, pVgroup->dbName, strlen(pVgroup->dbName) + 1, &dbInfo, sizeof(dbInfo));
      if (code) {
        sdbRelease(pMnode->pSdb, pVgroup);
        sdbCancelFetch(pMnode->pSdb, pIter);
        pVgroup = NULL;
        TAOS_CHECK_EXIT(code);
      }
      pNew = NULL;  // the array now belongs to the hash entry
    }

    sdbRelease(pMnode->pSdb, pVgroup);
    pVgroup = NULL;
  }

  *ppRes = pDbVgroup;
  pDbVgroup = NULL;  // ownership handed to the caller

_exit:

  // only non-NULL when a new array was created but never stored in the hash
  taosArrayDestroy(pNew);

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  if (NULL != pDbVgroup) {
    // the map was not handed out: release it together with the arrays it owns
    mstDestroyDbVgroupsHash(pDbVgroup);
  }

  return code;
}
624

625
int mstDbVgInfoComp(const void* lp, const void* rp) {
×
626
  SVGroupHashInfo* pLeft = (SVGroupHashInfo*)lp;
×
627
  SVGroupHashInfo* pRight = (SVGroupHashInfo*)rp;
×
628
  if (pLeft->hashBegin < pRight->hashBegin) {
×
629
    return -1;
×
630
  } else if (pLeft->hashBegin > pRight->hashBegin) {
×
631
    return 1;
×
632
  }
633

634
  return 0;
×
635
}
636

637
int32_t mstTableHashValueComp(void const* lp, void const* rp) {
×
638
  uint32_t*    key = (uint32_t*)lp;
×
639
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
×
640

641
  if (*key < pVg->hashBegin) {
×
642
    return -1;
×
643
  } else if (*key > pVg->hashEnd) {
×
644
    return 1;
×
645
  }
646

647
  return 0;
×
648
}
649

650

651
/*
 * Resolve the vgroup that owns table tbName of db dbFName.
 *
 * pDbVgroups - map built by mstBuildDBVgroupsMap
 * dbFName    - full db name, used as map key and as table name prefix
 * tbName     - table name; hashed as "<dbFName>.<tbName>"
 * vgId       - out: id of the matching vgroup, written on success only
 *
 * The db's vgroup array is sorted by hashBegin on first use (vgSorted is then
 * set), so the map entry is modified by this call.
 *
 * Returns 0 on success, TSDB_CODE_MND_DB_NOT_EXIST if the db is not in the
 * map, or TSDB_CODE_MND_STREAM_INTERNAL_ERROR if no vgroup range covers the
 * hash value.
 */
int32_t mstGetTableVgId(SSHashObj* pDbVgroups, char* dbFName, char *tbName, int32_t* vgId) {
  int32_t code = 0;
  int32_t lino = 0;
  // the array elements are SVGroupHashInfo, so the search result must be read as such
  SVGroupHashInfo* vgInfo = NULL;
  char             tbFullName[TSDB_TABLE_FNAME_LEN];

  SDBVgHashInfo* dbInfo = (SDBVgHashInfo*)tSimpleHashGet(pDbVgroups, dbFName, strlen(dbFName) + 1);
  if (NULL == dbInfo) {
    mstError("db %s does not exist", dbFName);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_DB_NOT_EXIST);
  }

  (void)snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbFName, tbName);
  uint32_t hashValue = taosGetTbHashVal(tbFullName, (uint32_t)strlen(tbFullName), dbInfo->hashMethod,
                                        dbInfo->hashPrefix, dbInfo->hashSuffix);

  if (!dbInfo->vgSorted) {
    // the search below requires the ranges to be ordered by hashBegin
    taosArraySort(dbInfo->vgArray, mstDbVgInfoComp);
    dbInfo->vgSorted = true;
  }

  vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, mstTableHashValueComp, TD_EQ);
  if (NULL == vgInfo) {
    mstError("no hash range found for hash value [%u], dbFName:%s, numOfVgId:%d", hashValue, dbFName,
             (int32_t)taosArrayGetSize(dbInfo->vgArray));
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  *vgId = vgInfo->vgId;

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
689

690

691
/*
 * Dump a stream object and its create request to the debug log.
 *
 * tips - prefix identifying the call site
 * p    - stream to dump; NULL is reported and tolerated
 *
 * Does nothing unless DEBUG_DEBUG is set in stDebugFlag. Logs the object
 * header, then (if pCreate is present) the create request summary, the plan
 * and expression strings, and one line per calc db, calc scan plan (with its
 * vgroup ids), notify url and output column.
 */
void mstLogSStreamObj(char* tips, SStreamObj* p) {
  if (0 == (stDebugFlag & DEBUG_DEBUG)) {
    return;
  }

  if (p == NULL) {
    mstDebug("%s: stream is NULL", tips);
    return;
  }

  mstDebug("%s: stream obj", tips);
  mstDebug("name:%s mainSnodeId:%d userDropped:%d userStopped:%d createTime:%" PRId64 " updateTime:%" PRId64,
      p->name, p->mainSnodeId, p->userDropped, p->userStopped, p->createTime, p->updateTime);

  SCMCreateStreamReq* q = p->pCreate;
  if (q == NULL) {
    mstDebug("stream pCreate is NULL");
    return;
  }

  // streamId is picked up implicitly by the mstsDebug/mstsDebugL macros
  int64_t streamId = q->streamId;
  int32_t calcDBNum = taosArrayGetSize(q->calcDB);
  int32_t calcScanNum = taosArrayGetSize(q->calcScanPlanList);
  int32_t notifyUrlNum = taosArrayGetSize(q->pNotifyAddrUrls);
  int32_t outColNum = taosArrayGetSize(q->outCols);
  int32_t outTagNum = taosArrayGetSize(q->outTags);
  int32_t forceOutColNum = taosArrayGetSize(q->forceOutCols);

  mstsDebugL("create_info: name:%s sql:%s streamDB:%s triggerDB:%s outDB:%s calcDBNum:%d triggerTblName:%s outTblName:%s "
      "igExists:%d triggerType:%d igDisorder:%d deleteReCalc:%d deleteOutTbl:%d fillHistory:%d fillHistroyFirst:%d "
      "calcNotifyOnly:%d lowLatencyCalc:%d igNoDataTrigger:%d notifyUrlNum:%d notifyEventTypes:%d notifyErrorHandle:%d notifyHistory:%d "
      "outColsNum:%d outTagsNum:%d maxDelay:%" PRId64 " fillHistoryStartTs:%" PRId64 " watermark:%" PRId64 " expiredTime:%" PRId64 " "
      "triggerTblType:%d triggerTblUid:%" PRIx64 " triggerTblSuid:%" PRIx64 " vtableCalc:%d outTblType:%d outStbExists:%d outStbUid:%" PRIu64 " outStbSversion:%d "
      "eventTypes:0x%" PRIx64 " flags:0x%" PRIx64 " tsmaId:0x%" PRIx64 " placeHolderBitmap:0x%" PRIx64 " calcTsSlotId:%d triTsSlotId:%d "
      "triggerTblVgId:%d outTblVgId:%d calcScanPlanNum:%d forceOutCols:%d",
      q->name, q->sql, q->streamDB, q->triggerDB, q->outDB, calcDBNum, q->triggerTblName, q->outTblName,
      q->igExists, q->triggerType, q->igDisorder, q->deleteReCalc, q->deleteOutTbl, q->fillHistory, q->fillHistoryFirst,
      q->calcNotifyOnly, q->lowLatencyCalc, q->igNoDataTrigger, notifyUrlNum, q->notifyEventTypes, q->notifyErrorHandle, q->notifyHistory,
      outColNum, outTagNum, q->maxDelay, q->fillHistoryStartTime, q->watermark, q->expiredTime,
      q->triggerTblType, q->triggerTblUid, q->triggerTblSuid, q->vtableCalc, q->outTblType, q->outStbExists, q->outStbUid, q->outStbSversion,
      q->eventTypes, q->flags, q->tsmaId, q->placeHolderBitmap, q->calcTsSlotId, q->triTsSlotId,
      q->triggerTblVgId, q->outTblVgId, calcScanNum, forceOutColNum);

  // long text members, one log line each
  mstsDebugL("create_info: triggerCols:[%s]", (char*)q->triggerCols);
  mstsDebugL("create_info: partitionCols:[%s]", (char*)q->partitionCols);
  mstsDebugL("create_info: triggerScanPlan:[%s]", (char*)q->triggerScanPlan);
  mstsDebugL("create_info: calcPlan:[%s]", (char*)q->calcPlan);
  mstsDebugL("create_info: subTblNameExpr:[%s]", (char*)q->subTblNameExpr);
  mstsDebugL("create_info: tagValueExpr:[%s]", (char*)q->tagValueExpr);

  int32_t idx = 0;

  for (idx = 0; idx < calcDBNum; ++idx) {
    char* dbName = taosArrayGetP(q->calcDB, idx);
    mstsDebug("create_info: calcDB[%d] - %s", idx, dbName);
  }

  for (idx = 0; idx < calcScanNum; ++idx) {
    SStreamCalcScan* pScan = taosArrayGet(q->calcScanPlanList, idx);
    int32_t          vgNum = taosArrayGetSize(pScan->vgList);
    mstsDebugL("create_info: calcScanPlan[%d] - readFromCache:%d vgNum:%d scanPlan:[%s]", idx, pScan->readFromCache, vgNum, (char*)pScan->scanPlan);

    int32_t v = 0;
    while (v < vgNum) {
      mstsDebug("create_info: calcScanPlan[%d] vg[%d] - vgId:%d", idx, v, *(int32_t*)taosArrayGet(pScan->vgList, v));
      ++v;
    }
  }

  for (idx = 0; idx < notifyUrlNum; ++idx) {
    char* url = taosArrayGetP(q->pNotifyAddrUrls, idx);
    mstsDebug("create_info: notifyUrl[%d] - %s", idx, url);
  }

  for (idx = 0; idx < outColNum; ++idx) {
    SFieldWithOptions* o = taosArrayGet(q->outCols, idx);
    mstsDebug("create_info: outCol[%d] - name:%s type:%d flags:%d bytes:%d compress:%u typeMod:%d", 
        idx, o->name, o->type, o->flags, o->bytes, o->compress, o->typeMod);
  }
}
773

774
/*
 * Write one debug log line describing a task status record.
 *
 * name     - label printed in front of the index (e.g. the task role)
 * streamId - stream the task belongs to (consumed by the mstsDebug macro)
 * pTask    - record to print; its type and status index the
 *            gStreamTaskTypeStr / gStreamStatusStr tables
 * idx      - position of the task in its list
 */
void mstLogSStmTaskStatus(char* name, int64_t streamId, SStmTaskStatus* pTask, int32_t idx) {
  const char* typeStr = gStreamTaskTypeStr[pTask->type];
  const char* statusStr = gStreamStatusStr[pTask->status];

  mstsDebug("%s[%d]: task %" PRIx64 " deployId:%d SID:%" PRId64 " nodeId:%d tidx:%d type:%s flags:%" PRIx64 " status:%s lastUpTs:%" PRId64, 
      name, idx, pTask->id.taskId, pTask->id.deployId, pTask->id.seriousId, pTask->id.nodeId, pTask->id.taskIdx,
      typeStr, pTask->flags, statusStr, pTask->lastUpTs);
}
×
779

780
void mstLogSStmStatus(char* tips, int64_t streamId, SStmStatus* p) {
×
781
  if (!(stDebugFlag & DEBUG_DEBUG)) {
×
782
    return;
×
783
  }
784
  
785
  if (NULL == p) {
×
786
    mstsDebug("%s: stream status is NULL", tips);
×
787
    return;
×
788
  }
789

790
  int32_t trigReaderNum = taosArrayGetSize(p->trigReaders);
×
791
  int32_t trigOReaderNum = taosArrayGetSize(p->trigOReaders);
×
792
  int32_t calcReaderNum = taosArrayGetSize(p->calcReaders);
×
793
  int32_t triggerNum = p->triggerTask ? 1 : 0;
×
794
  int32_t runnerNum = 0;
×
795

796
  for (int32_t i = 0; i < p->runnerDeploys; ++i) {
×
797
    runnerNum += taosArrayGetSize(p->runners[i]);
×
798
  }
799

800
  mstsDebug("%s: stream status", tips);
×
801
  mstsDebug("name:%s runnerNum:%d runnerDeploys:%d runnerReplica:%d lastActionTs:%" PRId64
×
802
           " trigReaders:%d trigOReaders:%d calcReaders:%d trigger:%d runners:%d",
803
      p->streamName, p->runnerNum, p->runnerDeploys, p->runnerReplica, p->lastActionTs,
804
      trigReaderNum, trigOReaderNum, calcReaderNum, triggerNum, runnerNum);
805

806
  SStmTaskStatus* pTask = NULL;
×
807
  for (int32_t i = 0; i < trigReaderNum; ++i) {
×
808
    pTask = taosArrayGet(p->trigReaders, i);
×
809
    mstLogSStmTaskStatus("trigReader task", streamId, pTask, i);
×
810
  }
811

812
  for (int32_t i = 0; i < trigOReaderNum; ++i) {
×
813
    pTask = taosArrayGet(p->trigOReaders, i);
×
814
    mstLogSStmTaskStatus("trigOReader task", streamId, pTask, i);
×
815
  }
816

817
  for (int32_t i = 0; i < calcReaderNum; ++i) {
×
818
    pTask = taosArrayGet(p->calcReaders, i);
×
819
    mstLogSStmTaskStatus("calcReader task", streamId, pTask, i);
×
820
  }
821

822
  if (triggerNum > 0) {
×
823
    mstLogSStmTaskStatus("trigger task", streamId, p->triggerTask, 0);
×
824
  }
825

826
  for (int32_t i = 0; i < p->runnerDeploys; ++i) {
×
827
    int32_t num = taosArrayGetSize(p->runners[i]);
×
828
    if (num <= 0) {
×
829
      continue;
×
830
    }
831
    
832
    mstsDebug("the %dth deploy runners status", i);
×
833
    for (int32_t m = 0; m < num; ++m) {
×
834
      pTask = taosArrayGet(p->runners[i], m);
×
835
      mstLogSStmTaskStatus("runner task", streamId, pTask, m);
×
836
    }
837
  }
838
      
839
}
840

841
bool mstEventPassIsolation(int32_t num, int32_t event) {
×
842
  bool ret = ((mStreamMgmt.lastTs[event].ts + num * MST_ISOLATION_DURATION) <= mStreamMgmt.hCtx.currentTs);
×
843
  if (ret) {
×
844
    mstDebug("event %s passed %d isolation, last:%" PRId64 ", curr:%" PRId64, 
×
845
        gMndStreamEvent[event], num, mStreamMgmt.lastTs[event].ts, mStreamMgmt.hCtx.currentTs);
846
  }
847

848
  return ret;
×
849
}
850

851
bool mstEventHandledChkSet(int32_t event) {
×
852
  if (0 == atomic_val_compare_exchange_8((int8_t*)&mStreamMgmt.lastTs[event].handled, 0, 1)) {
×
853
    mstDebug("event %s set handled", gMndStreamEvent[event]);
×
854
    return true;
×
855
  }
856
  return false;
×
857
}
858

859
/*
 * Produces the status and message var-strings shown for a stream.
 *
 * Decision order:
 *   1. stream management inactive or not in NORMAL state -> UNDEPLOYED + hint
 *   2. user dropped the stream                           -> DROPPING
 *   3. user stopped the stream                           -> STOPPED
 *   4. (under the runtime read lock) no runtime entry    -> UNDEPLOYED
 *   5. runtime entry marked stopped                      -> FAILED + last error
 *   6. trigger task exists and is RUNNING                -> RUNNING + start time
 *   7. anything else                                     -> INIT + deploy count
 *
 * status/msg receive var-strings bounded by statusSize/msgSize.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t mstGetStreamStatusStr(SStreamObj* pStream, char* status, int32_t statusSize, char* msg, int32_t msgSize) {
  int8_t mgmtActive = atomic_load_8(&mStreamMgmt.active);
  int8_t mgmtState = atomic_load_8(&mStreamMgmt.state);
  if (0 == mgmtActive || MND_STM_STATE_NORMAL != mgmtState) {
    mstDebug("mnode streamMgmt not in active mode, active:%d, state:%d", mgmtActive, mgmtState);
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_UNDEPLOYED], statusSize);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "Mnode may be unstable, try again later", msgSize);
    return TSDB_CODE_SUCCESS;
  }

  if (atomic_load_8(&pStream->userDropped)) {
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_DROPPING], statusSize);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
    return TSDB_CODE_SUCCESS;
  }

  if (atomic_load_8(&pStream->userStopped)) {
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_STOPPED], statusSize);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
    return TSDB_CODE_SUCCESS;
  }

  // Everything below reads the runtime map, so hold the read lock until done.
  mstWaitLock(&mStreamMgmt.runtimeLock, true);

  char        detail[256];
  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &pStream->pCreate->streamId, sizeof(pStream->pCreate->streamId));

  if (NULL == pStatus) {
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_UNDEPLOYED], statusSize);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
  } else if (1 == atomic_load_8(&pStatus->stopped)) {
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_FAILED], statusSize);
    snprintf(detail, sizeof(detail), "Last error: %s, Failed times: %" PRId64, tstrerror(pStatus->fatalError), pStatus->fatalRetryTimes);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, detail, msgSize);
  } else if (pStatus->triggerTask && STREAM_STATUS_RUNNING == pStatus->triggerTask->status) {
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_RUNNING], statusSize);
    // The formatted timestamp is appended right after the fixed prefix.
    strcpy(detail, "Running start from: ");
    formatTimestampLocal(&detail[strlen(detail)], pStatus->triggerTask->runningStartTs, TSDB_TIME_PRECISION_MILLI);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, detail, msgSize);
  } else {
    STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[STREAM_STATUS_INIT], statusSize);
    snprintf(detail, sizeof(detail), "Current deploy times: %" PRId64, pStatus->deployTimes);
    STR_WITH_MAXSIZE_TO_VARSTR(msg, detail, msgSize);
  }

  taosRUnLockLatch(&mStreamMgmt.runtimeLock);

  return TSDB_CODE_SUCCESS;
}
916

917
/*
 * Fills row `numOfRows` of pBlock with the attributes of one stream.
 *
 * Columns are written in this order: stream name, db name, create time,
 * stream id (hex string), sql, status, main snode id, replica snode id, msg.
 * The replica snode id is -1 when the main snode cannot be acquired.
 *
 * Returns 0 on success. On the first failing step the function stops, logs
 * the code together with the source line, and returns that code (terrno when
 * a column cannot be fetched from pBlock->pDataBlock).
 */
int32_t mstSetStreamAttrResBlock(SMnode *pMnode, SStreamObj* pStream, SSDataBlock* pBlock, int32_t numOfRows) {
  int32_t code = 0;
  int32_t cols = 0;
  int32_t lino = 0;

  // stream_name
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetStableStr(pStream->name), sizeof(streamName));
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamName, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // db_name
  char streamDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamDB, mndGetDbStr(pStream->pCreate->streamDB), sizeof(streamDB));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamDB, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // create time
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pStream->createTime, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stream id, rendered as hex text and then wrapped into a var-string
  char streamId2[19] = {0};
  char streamId[19 + VARSTR_HEADER_SIZE] = {0};
  snprintf(streamId2, sizeof(streamId2), "%" PRIx64, pStream->pCreate->streamId);
  STR_WITH_MAXSIZE_TO_VARSTR(streamId, streamId2, sizeof(streamId));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // sql
  char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(sql, pStream->pCreate->sql, sizeof(sql));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)sql, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // status (msg is produced by the same call but written as the last column)
  char status[20 + VARSTR_HEADER_SIZE] = {0};
  char msg[TSDB_RESERVE_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
  code = mstGetStreamStatusStr(pStream, status, sizeof(status), msg, sizeof(msg));
  TSDB_CHECK_CODE(code, lino, _end);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)status, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // snodeLeader
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pStream->mainSnodeId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // snodeReplica
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  SSnodeObj* pSnode = mndAcquireSnode(pMnode, pStream->mainSnodeId);
  int32_t replicaSnodeId = pSnode ? pSnode->replicaId : -1;
  mndReleaseSnode(pMnode, pSnode);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&replicaSnodeId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // msg
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)msg, false);
  // Checked like every other column so a failure is logged with its line.
  TSDB_CHECK_CODE(code, lino, _end);

_end:
  if (code) {
    mError("error happens when build stream attr result block, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
999

1000

1001
/*
 * Produces the status and message var-strings for a single stream task.
 *
 * status: the name looked up in gStreamStatusStr by pTask->status.
 * msg:
 *   - failed task with a non-zero errCode: "Last error: <text>"
 *   - trigger task that has a detailStatus: session / recalc counters
 *   - otherwise: empty string
 *
 * status/msg are bounded by statusSize/msgSize.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t mstGetTaskStatusStr(SStmTaskStatus* pTask, char* status, int32_t statusSize, char* msg, int32_t msgSize) {
  char tmpBuf[256];
  
  STR_WITH_MAXSIZE_TO_VARSTR(status, gStreamStatusStr[pTask->status], statusSize);
  if (STREAM_STATUS_FAILED == pTask->status && pTask->errCode) {
    snprintf(tmpBuf, sizeof(tmpBuf), "Last error: %s", tstrerror(pTask->errCode));
    STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
    return TSDB_CODE_SUCCESS;
  }

  if (STREAM_TRIGGER_TASK == pTask->type && mstWaitLock(&pTask->detailStatusLock, true)) {
    if (pTask->detailStatus) {
      SSTriggerRuntimeStatus* pTrigger = (SSTriggerRuntimeStatus*)pTask->detailStatus;
      snprintf(tmpBuf, sizeof(tmpBuf), "Current RT/HI/RE session num: %d/%d/%d, histroy progress:%d%%, total AUTO/USER recalc num: %d/%d", 
          pTrigger->realtimeSessionNum, pTrigger->historySessionNum, pTrigger->recalcSessionNum, pTrigger->histroyProgress,
          pTrigger->autoRecalcNum, (int32_t)taosArrayGetSize(pTrigger->userRecalcs));
      taosRUnLockLatch(&pTask->detailStatusLock);

      // tmpBuf is a private copy, so it can be published after the unlock.
      // Without this copy the formatted detail would never reach the caller.
      STR_WITH_MAXSIZE_TO_VARSTR(msg, tmpBuf, msgSize);
      return TSDB_CODE_SUCCESS;
    }

    taosRUnLockLatch(&pTask->detailStatusLock);
  }
  
  STR_WITH_MAXSIZE_TO_VARSTR(msg, "", msgSize);
  
  return TSDB_CODE_SUCCESS;
}
1028

1029
/*
 * Writes the "extra info" var-string for a task into extraStr (bounded by
 * extraSize):
 *   reader task  -> "trigReader" when flagged as trigger reader, else "calcReader"
 *   runner task  -> "topRunner" when flagged as top runner, else ""
 *   other types  -> ""
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t mstGetTaskExtraStr(SStmTaskStatus* pTask, char* extraStr, int32_t extraSize) {
  const char* label = "";

  if (STREAM_READER_TASK == pTask->type) {
    label = STREAM_IS_TRIGGER_READER(pTask->flags) ? "trigReader" : "calcReader";
  } else if (STREAM_RUNNER_TASK == pTask->type) {
    if (STREAM_IS_TOP_RUNNER(pTask->flags)) {
      label = "topRunner";
    }
  }

  STR_WITH_MAXSIZE_TO_VARSTR(extraStr, label, extraSize);
  return TSDB_CODE_SUCCESS;
}
1051

1052

1053
/*
 * Fills row `numOfRows` of pBlock with the attributes of one stream task.
 *
 * Columns are written in this order: stream name, stream id, task id, type,
 * serious id, deploy id, node type, node id, task idx, status, start time,
 * last update, extra info, msg. The three ids are rendered as hex strings;
 * start time / last update are written as NULL when their value is 0.
 *
 * Returns 0 on success. On the first failing step the function stops, logs
 * the code together with the source line, and returns that code (terrno when
 * a column cannot be fetched from pBlock->pDataBlock).
 */
int32_t mstSetStreamTaskResBlock(SStreamObj* pStream, SStmTaskStatus* pTask, SSDataBlock* pBlock, int32_t numOfRows) {
  int32_t code = 0;
  int32_t cols = 0;
  int32_t lino = 0;

  // stream_name
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetStableStr(pStream->name), sizeof(streamName));
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamName, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stream id
  // idstr is reused for every hex id below. The var-string length must be the
  // payload length only (no header), otherwise bytes past the terminator,
  // possibly left over from a previously formatted id, become part of the value.
  char idstr[19 + VARSTR_HEADER_SIZE] = {0};
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pStream->pCreate->streamId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // task id
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pTask->id.taskId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // type
  char type[20 + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(type, (STREAM_READER_TASK == pTask->type) ? "Reader" : ((STREAM_TRIGGER_TASK == pTask->type) ? "Trigger" : "Runner"), sizeof(type));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)type, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // serious id
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pTask->id.seriousId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // deploy id
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->id.deployId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // node_type: reader tasks are reported as "vnode", everything else as "snode"
  char nodeType[10 + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(nodeType, (STREAM_READER_TASK == pTask->type) ? "vnode" : "snode", sizeof(nodeType));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)nodeType, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // node id
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->id.nodeId, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // task idx
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->id.taskIdx, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // status (msg is produced by the same call but written as the last column)
  char status[20 + VARSTR_HEADER_SIZE] = {0};
  char msg[TSDB_RESERVE_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
  code = mstGetTaskStatusStr(pTask, status, sizeof(status), msg, sizeof(msg));
  TSDB_CHECK_CODE(code, lino, _end);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)status, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // start time (NULL while the task has no start timestamp)
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->runningStartTs, 0 == pTask->runningStartTs);
  TSDB_CHECK_CODE(code, lino, _end);

  // last update (NULL while the task has no update timestamp)
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pTask->lastUpTs, 0 == pTask->lastUpTs);
  TSDB_CHECK_CODE(code, lino, _end);

  // extra info
  char extra[64 + VARSTR_HEADER_SIZE] = {0};
  code = mstGetTaskExtraStr(pTask, extra, sizeof(extra));
  TSDB_CHECK_CODE(code, lino, _end);
  
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)extra, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // msg
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)msg, false);
  // Checked like every other column so a failure is logged with its line.
  TSDB_CHECK_CODE(code, lino, _end);

_end:
  if (code) {
    mError("error happens when build stream task result block, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
1178

1179
/*
 * Counts every task tracked in pStatus: trigger readers, the second
 * trigger-reader list (trigOReaders), calc readers, the trigger task (if
 * present) and the runners of all MND_STREAM_RUNNER_DEPLOY_NUM deploy slots.
 */
int32_t mstGetNumOfStreamTasks(SStmStatus* pStatus) {
  int32_t total = (pStatus->triggerTask ? 1 : 0);

  total += taosArrayGetSize(pStatus->trigReaders);
  total += taosArrayGetSize(pStatus->trigOReaders);
  total += taosArrayGetSize(pStatus->calcReaders);

  for (int32_t deploy = 0; deploy < MND_STREAM_RUNNER_DEPLOY_NUM; ++deploy) {
    total += taosArrayGetSize(pStatus->runners[deploy]);
  }

  return total;
}
1187

1188
int32_t mstSetStreamTasksResBlock(SStreamObj* pStream, SSDataBlock* pBlock, int32_t* numOfRows, int32_t rowsCapacity) {
×
1189
  int32_t code = 0;
×
1190
  int32_t lino = 0;
×
1191
  int64_t streamId = pStream->pCreate->streamId;
×
1192

1193
  mstWaitLock(&mStreamMgmt.runtimeLock, true);
×
1194

1195
  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
×
1196
  if (NULL == pStatus) {
×
1197
    mstsDebug("stream not in streamMap, ignore it, dropped:%d, stopped:%d", atomic_load_8(&pStream->userDropped), atomic_load_8(&pStream->userStopped));
×
1198
    goto _exit;
×
1199
  }
1200

1201
  int8_t stopped = atomic_load_8(&pStatus->stopped);
×
1202
  if (stopped) {
×
1203
    mstsDebug("stream stopped %d, ignore it", stopped);
×
1204
    goto _exit;
×
1205
  }
1206
  
1207
  int32_t count = mstGetNumOfStreamTasks(pStatus);
×
1208

1209
  if (*numOfRows + count > rowsCapacity) {
×
1210
    code = blockDataEnsureCapacity(pBlock, *numOfRows + count);
×
1211
    if (code) {
×
1212
      mstError("failed to prepare the result block buffer, rows:%d", *numOfRows + count);
×
1213
      TAOS_CHECK_EXIT(code);
×
1214
    }
1215
  }
1216

1217
  SStmTaskStatus* pTask = NULL;
×
1218
  int32_t trigReaderNum = taosArrayGetSize(pStatus->trigReaders);
×
1219
  for (int32_t i = 0; i < trigReaderNum; ++i) {
×
1220
    pTask = taosArrayGet(pStatus->trigReaders, i);
×
1221
  
1222
    code = mstSetStreamTaskResBlock(pStream, pTask, pBlock, *numOfRows);
×
1223
    if (code == TSDB_CODE_SUCCESS) {
×
1224
      (*numOfRows)++;
×
1225
    }
1226
  }
1227

1228
  trigReaderNum = taosArrayGetSize(pStatus->trigOReaders);
×
1229
  for (int32_t i = 0; i < trigReaderNum; ++i) {
×
1230
    pTask = taosArrayGet(pStatus->trigOReaders, i);
×
1231
  
1232
    code = mstSetStreamTaskResBlock(pStream, pTask, pBlock, *numOfRows);
×
1233
    if (code == TSDB_CODE_SUCCESS) {
×
1234
      (*numOfRows)++;
×
1235
    }
1236
  }
1237

1238

1239
  int32_t calcReaderNum = taosArrayGetSize(pStatus->calcReaders);
×
1240
  for (int32_t i = 0; i < calcReaderNum; ++i) {
×
1241
    pTask = taosArrayGet(pStatus->calcReaders, i);
×
1242
  
1243
    code = mstSetStreamTaskResBlock(pStream, pTask, pBlock, *numOfRows);
×
1244
    if (code == TSDB_CODE_SUCCESS) {
×
1245
      (*numOfRows)++;
×
1246
    }
1247
  }
1248

1249
  if (pStatus->triggerTask) {
×
1250
    code = mstSetStreamTaskResBlock(pStream, pStatus->triggerTask, pBlock, *numOfRows);
×
1251
    if (code == TSDB_CODE_SUCCESS) {
×
1252
      (*numOfRows)++;
×
1253
    }
1254
  }
1255

1256
  int32_t runnerNum = 0;
×
1257
  for (int32_t i = 0; i < MND_STREAM_RUNNER_DEPLOY_NUM; ++i) {
×
1258
    runnerNum = taosArrayGetSize(pStatus->runners[i]);
×
1259
    for (int32_t m = 0; m < runnerNum; ++m) {
×
1260
      pTask = taosArrayGet(pStatus->runners[i], m);
×
1261
    
1262
      code = mstSetStreamTaskResBlock(pStream, pTask, pBlock, *numOfRows);
×
1263
      if (code == TSDB_CODE_SUCCESS) {
×
1264
        (*numOfRows)++;
×
1265
      }
1266
    }
1267
  }
1268
  
1269
  pBlock->info.rows = *numOfRows;
×
1270

1271
_exit:
×
1272
  
1273
  taosRUnLockLatch(&mStreamMgmt.runtimeLock);
×
1274

1275
  if (code) {
×
1276
    mError("error happens when build stream tasks result block, lino:%d, code:%s", lino, tstrerror(code));
×
1277
  }
1278
  
1279
  return code;
×
1280
}
1281

1282

1283
/*
 * Queues a user recalculation request covering pRange for a stream.
 *
 * A new recalc id is generated first; the request is then appended to
 * pStream->userRecalcList under userRecalcLock (write mode). The list is
 * created on first use and published with an atomic pointer store only
 * after the request has been pushed into it.
 *
 * Returns 0 on success, otherwise the failing step's code (also logged).
 * A list created here is destroyed again if the push into it fails, so no
 * allocation is left behind on the error path.
 */
int32_t mstAppendNewRecalcRange(int64_t streamId, SStmStatus *pStream, STimeWindow* pRange) {
  int32_t code = 0;
  int32_t lino = 0;
  bool    locked = false;

  SStreamRecalcReq req = {.recalcId = 0, .start = pRange->skey, .end = pRange->ekey};
  TAOS_CHECK_EXIT(taosGetSystemUUIDU64(&req.recalcId));
  
  taosWLockLatch(&pStream->userRecalcLock);
  locked = true;
  
  if (NULL == pStream->userRecalcList) {
    SArray* userRecalcList = taosArrayInit(2, sizeof(SStreamRecalcReq));
    if (NULL == userRecalcList) {
      TAOS_CHECK_EXIT(terrno);
    }

    if (NULL == taosArrayPush(userRecalcList, &req)) {
      // Capture the error before cleanup; the list is not yet visible to
      // anyone else, so it must be released here.
      code = terrno;
      lino = __LINE__;
      taosArrayDestroy(userRecalcList);
      goto _exit;
    }

    atomic_store_ptr(&pStream->userRecalcList, userRecalcList);
  } else {
    TSDB_CHECK_NULL(taosArrayPush(pStream->userRecalcList, &req), code, lino, _exit, terrno);
  }
  
  mstsInfo("stream recalc ID:%" PRIx64 " range:%" PRId64 " - %" PRId64 " added", req.recalcId, pRange->skey, pRange->ekey);

_exit:

  if (locked) {
    taosWUnLockLatch(&pStream->userRecalcLock);
  }
  
  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }
  
  return code;
}
1321

1322

1323

1324
/*
 * Fills row `numOfRows` of pBlock with one recalculation progress entry.
 *
 * Columns are written in this order: stream name, stream id (hex), recalc id
 * (hex), start, end, progress ("<n>%").
 *
 * Returns 0 on success. On the first failing step the function stops, logs
 * the code together with the source line, and returns that code (terrno when
 * a column cannot be fetched from pBlock->pDataBlock).
 */
int32_t mstSetStreamRecalculateResBlock(SStreamObj* pStream, SSTriggerRecalcProgress* pProgress, SSDataBlock* pBlock, int32_t numOfRows) {
  int32_t code = 0;
  int32_t cols = 0;
  int32_t lino = 0;

  // stream_name
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetStableStr(pStream->name), sizeof(streamName));
  SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  code = colDataSetVal(pColInfo, numOfRows, (const char*)streamName, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stream id
  // idstr is reused for the recalc id below. The var-string length must be
  // the payload length only (no header), otherwise bytes past the terminator,
  // possibly left over from the previous id, become part of the value.
  char idstr[19 + VARSTR_HEADER_SIZE] = {0};
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pStream->pCreate->streamId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // recalc id
  snprintf(&idstr[VARSTR_HEADER_SIZE], sizeof(idstr) - VARSTR_HEADER_SIZE, "%" PRIx64, pProgress->recalcId);
  varDataSetLen(idstr, strlen(&idstr[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)idstr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // start
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pProgress->start, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // end
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)&pProgress->end, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // progress
  char progress[20 + VARSTR_HEADER_SIZE] = {0};
  snprintf(&progress[VARSTR_HEADER_SIZE], sizeof(progress) - VARSTR_HEADER_SIZE, "%d%%", pProgress->progress);
  varDataSetLen(progress, strlen(&progress[VARSTR_HEADER_SIZE]));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char*)progress, false);
  TSDB_CHECK_CODE(code, lino, _end);

_end:
  if (code) {
    mError("error happens when build stream recalculate result block, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
1382

1383

1384
/*
 * Appends one row per user recalculation of pStream to pBlock.
 *
 * Runs under the runtime read lock; the trigger task's detailStatusLock is
 * additionally held (read mode) while its detail status is being read.
 * Nothing is appended when the stream has no runtime entry, is stopped, has
 * no trigger task, or the trigger task has no detail status yet. The block
 * is grown first when the extra rows would exceed rowsCapacity.
 *
 * An entry whose row fails to build is skipped; the returned code is the one
 * produced by the last step executed.
 */
int32_t mstSetStreamRecalculatesResBlock(SStreamObj* pStream, SSDataBlock* pBlock, int32_t* numOfRows, int32_t rowsCapacity) {
  int32_t code = 0;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;

  mstWaitLock(&mStreamMgmt.runtimeLock, true);

  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL == pStatus) {
    mstsDebug("stream not in streamMap, ignore it, dropped:%d, stopped:%d", atomic_load_8(&pStream->userDropped), atomic_load_8(&pStream->userStopped));
    goto _exit;
  }

  int8_t stopped = atomic_load_8(&pStatus->stopped);
  if (stopped) {
    mstsDebug("stream stopped %d, ignore it", stopped);
    goto _exit;
  }

  if (NULL == pStatus->triggerTask) {
    mstsDebug("no trigger task now, deployTimes:%" PRId64 ", ignore it", pStatus->deployTimes);
    goto _exit;
  }

  // From here on the detail lock is held; every exit path below releases it
  // before reaching _exit, which only drops the runtime lock.
  mstWaitLock(&pStatus->triggerTask->detailStatusLock, true);
  if (NULL == pStatus->triggerTask->detailStatus) {
    mstsDebug("no trigger task now, deployTimes:%" PRId64 ", ignore it", pStatus->deployTimes);
    taosRUnLockLatch(&pStatus->triggerTask->detailStatusLock);
    goto _exit;
  }

  SSTriggerRuntimeStatus* pTrigger = (SSTriggerRuntimeStatus*)pStatus->triggerTask->detailStatus;
  int32_t recalcNum = taosArrayGetSize(pTrigger->userRecalcs);

  if (*numOfRows + recalcNum > rowsCapacity) {
    code = blockDataEnsureCapacity(pBlock, *numOfRows + recalcNum);
    if (code) {
      mstError("failed to prepare the result block buffer, rows:%d", *numOfRows + recalcNum);
      taosRUnLockLatch(&pStatus->triggerTask->detailStatusLock);
      TAOS_CHECK_EXIT(code);
    }
  }

  for (int32_t idx = 0; idx < recalcNum; ++idx) {
    SSTriggerRecalcProgress* pEntry = taosArrayGet(pTrigger->userRecalcs, idx);

    code = mstSetStreamRecalculateResBlock(pStream, pEntry, pBlock, *numOfRows);
    if (TSDB_CODE_SUCCESS == code) {
      (*numOfRows)++;
    }
  }

  taosRUnLockLatch(&pStatus->triggerTask->detailStatusLock);

  pBlock->info.rows = *numOfRows;

_exit:

  taosRUnLockLatch(&mStreamMgmt.runtimeLock);

  if (code) {
    mError("error happens when build stream recalculates result block, lino:%d, code:%s", lino, tstrerror(code));
  }

  return code;
}
1450

1451

1452

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc