• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4791

13 Oct 2025 06:50AM UTC coverage: 57.628% (-0.8%) from 58.476%
#4791

push

travis-ci

web-flow
Merge pull request #33213 from taosdata/fix/huoh/timemoe_model_directory

fix: fix tdgpt timemoe model directory

136628 of 303332 branches covered (45.04%)

Branch coverage included in aggregate %.

208121 of 294900 relevant lines covered (70.57%)

4250784.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.72
/source/dnode/mnode/impl/src/mndStreamMgmt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndStream.h"
17
#include "mndDb.h"
18
#include "mndPrivilege.h"
19
#include "mndShow.h"
20
#include "mndStb.h"
21
#include "mndTrans.h"
22
#include "osMemory.h"
23
#include "parser.h"
24
#include "taoserror.h"
25
#include "tmisce.h"
26
#include "tname.h"
27
#include "mndDnode.h"
28
#include "mndVgroup.h"
29
#include "mndSnode.h"
30
#include "mndMnode.h"
31
#include "cmdnodes.h"
32

33
void msmDestroyActionQ() {
2,236✔
34
  SStmQNode* pQNode = NULL;
2,236✔
35

36
  if (NULL == mStreamMgmt.actionQ) {
2,236✔
37
    return;
1,118✔
38
  }
39

40
  while (mndStreamActionDequeue(mStreamMgmt.actionQ, &pQNode)) {
1,123✔
41
  }
42

43
  taosMemoryFreeClear(mStreamMgmt.actionQ->head);
1,118!
44
  taosMemoryFreeClear(mStreamMgmt.actionQ);
1,118!
45
}
46

47
void msmDestroySStmThreadCtx(SStmThreadCtx* pCtx) {
5,517✔
48
  for (int32_t m = 0; m < STREAM_MAX_GROUP_NUM; ++m) {
33,102✔
49
    taosHashCleanup(pCtx->deployStm[m]);
27,585✔
50
    taosHashCleanup(pCtx->actionStm[m]);
27,585✔
51
  }
52
}
5,517✔
53

54
void msmDestroyThreadCtxs() {
2,236✔
55
  if (NULL == mStreamMgmt.tCtx) {
2,236✔
56
    return;
1,118✔
57
  }
58
  
59
  for (int32_t i = 0; i < mStreamMgmt.threadNum; ++i) {
6,635✔
60
    msmDestroySStmThreadCtx(mStreamMgmt.tCtx + i);
5,517✔
61
  }
62
  taosMemoryFreeClear(mStreamMgmt.tCtx);
1,118!
63
}
64

65

66
void msmDestroyRuntimeInfo(SMnode *pMnode) {
2,236✔
67
  msmDestroyActionQ();
2,236✔
68
  msmDestroyThreadCtxs();
2,236✔
69

70
  taosHashCleanup(mStreamMgmt.toUpdateScanMap);
2,236✔
71
  mStreamMgmt.toUpdateScanMap = NULL;
2,236✔
72
  mStreamMgmt.toUpdateScanNum = 0;
2,236✔
73
  taosHashCleanup(mStreamMgmt.toDeployVgMap);
2,236✔
74
  mStreamMgmt.toDeployVgMap = NULL;
2,236✔
75
  mStreamMgmt.toDeployVgTaskNum = 0;
2,236✔
76
  taosHashCleanup(mStreamMgmt.toDeploySnodeMap);
2,236✔
77
  mStreamMgmt.toDeploySnodeMap = NULL;
2,236✔
78
  mStreamMgmt.toDeploySnodeTaskNum = 0;
2,236✔
79

80
  taosHashCleanup(mStreamMgmt.dnodeMap);
2,236✔
81
  mStreamMgmt.dnodeMap = NULL;
2,236✔
82
  taosHashCleanup(mStreamMgmt.snodeMap);
2,236✔
83
  mStreamMgmt.snodeMap = NULL;
2,236✔
84
  taosHashCleanup(mStreamMgmt.vgroupMap);
2,236✔
85
  mStreamMgmt.vgroupMap = NULL;
2,236✔
86
  taosHashCleanup(mStreamMgmt.taskMap);
2,236✔
87
  mStreamMgmt.taskMap = NULL;
2,236✔
88
  taosHashCleanup(mStreamMgmt.streamMap);
2,236✔
89
  mStreamMgmt.streamMap = NULL;
2,236✔
90

91
  memset(mStreamMgmt.lastTs, 0, sizeof(mStreamMgmt.lastTs));
2,236✔
92

93
  mstInfo("mnode stream mgmt destroyed");  
2,236!
94
}
2,236✔
95

96

97
void msmStopStreamByError(int64_t streamId, SStmStatus* pStatus, int32_t errCode, int64_t currTs) {
10✔
98
  int32_t code = TSDB_CODE_SUCCESS;
10✔
99
  int32_t lino = 0;
10✔
100
  SStmStatus* pStream = NULL;
10✔
101

102
  mstsInfo("try to stop stream for error: %s", tstrerror(errCode));
10!
103

104
  if (NULL == pStatus) {
10!
105
    pStream = (SStmStatus*)taosHashAcquire(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
10✔
106
    if (NULL == pStream) {
10!
107
      mstsInfo("stream already not in streamMap, error:%s", tstrerror(terrno));
×
108
      goto _exit;
×
109
    }
110

111
    pStatus = pStream;
10✔
112
  }
113

114
  int8_t stopped = atomic_load_8(&pStatus->stopped);
10✔
115
  if (stopped) {
10!
116
    mstsDebug("stream already stopped %d, ignore stop", stopped);
×
117
    goto _exit;
×
118
  }
119

120
  if (pStatus->triggerTask && pStatus->triggerTask->runningStartTs && (currTs - pStatus->triggerTask->runningStartTs) > 2 * MST_ISOLATION_DURATION) {
10!
121
    pStatus->fatalRetryTimes = 0;
7✔
122
    mstsDebug("reset stream retryTimes, running duation:%" PRId64 "ms", currTs - pStatus->triggerTask->runningStartTs);
7✔
123
  }
124

125
  pStatus->fatalRetryTimes++;
10✔
126
  pStatus->fatalError = errCode;
10✔
127
  pStatus->fatalRetryDuration = (pStatus->fatalRetryTimes > 10) ? MST_MAX_RETRY_DURATION : MST_ISOLATION_DURATION;
10!
128
  pStatus->fatalRetryTs = currTs + pStatus->fatalRetryDuration;
10✔
129

130
  pStatus->stat.lastError = errCode;
10✔
131
    
132
  if (0 == atomic_val_compare_exchange_8(&pStatus->stopped, 0, 1)) {
10!
133
    MND_STREAM_SET_LAST_TS(STM_EVENT_STM_TERR, currTs);
10✔
134
  }
135

136
_exit:
5✔
137

138
  taosHashRelease(mStreamMgmt.streamMap, pStream);
10✔
139

140
  if (code) {
10!
141
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
142
  }
143
}
10✔
144

145

146
static void msmSetInitRuntimeState(int8_t state) {
1,118✔
147
  switch (state) {
1,118!
148
    case MND_STM_STATE_WATCH:
×
149
      mStreamMgmt.watch.ending = 0;
×
150
      mStreamMgmt.watch.taskRemains = 0;
×
151
      mStreamMgmt.watch.processing = 0;
×
152
      mstInfo("switch to WATCH state");
×
153
      break;
×
154
    case MND_STM_STATE_NORMAL:
1,118✔
155
      MND_STREAM_SET_LAST_TS(STM_EVENT_NORMAL_BEGIN, taosGetTimestampMs());
2,210✔
156
      mstInfo("switch to NORMAL state");
1,118!
157
      break;
1,118✔
158
    default:
×
159
      return;
×
160
  }
161
  
162
  atomic_store_8(&mStreamMgmt.state, state);
1,118✔
163
}
164

165
void msmSTDeleteSnodeFromMap(int32_t snodeId) {
×
166
  int32_t code = taosHashRemove(mStreamMgmt.snodeMap, &snodeId, sizeof(snodeId));
×
167
  if (code) {
×
168
    mstWarn("remove snode %d from snodeMap failed, error:%s", snodeId, tstrerror(code));
×
169
  } else {
170
    mstInfo("snode %d removed from snodeMap", snodeId);
×
171
  }
172
}
×
173

174
static int32_t msmSTAddSnodesToMap(SMnode* pMnode) {
1,183✔
175
  int32_t code = TSDB_CODE_SUCCESS;
1,183✔
176
  int32_t lino = 0;
1,183✔
177
  SStmSnodeStatus tasks = {0};
1,183✔
178
  SSnodeObj *pSnode = NULL;
1,183✔
179
  void *pIter = NULL;
1,183✔
180
  while (1) {
181
    pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pSnode);
1,285✔
182
    if (pIter == NULL) {
1,285✔
183
      break;
1,183✔
184
    }
185

186
    tasks.lastUpTs = taosGetTimestampMs();
102✔
187
    code = taosHashPut(mStreamMgmt.snodeMap, &pSnode->id, sizeof(pSnode->id), &tasks, sizeof(tasks));
102✔
188
    if (code && TSDB_CODE_DUP_KEY != code) {
102!
189
      sdbRelease(pMnode->pSdb, pSnode);
×
190
      sdbCancelFetch(pMnode->pSdb, pIter);
×
191
      pSnode = NULL;
×
192
      TAOS_CHECK_EXIT(code);
×
193
    }
194

195
    code = TSDB_CODE_SUCCESS;
102✔
196
  
197
    sdbRelease(pMnode->pSdb, pSnode);
102✔
198
  }
199

200
  pSnode = NULL;
1,183✔
201

202
_exit:
1,183✔
203

204
  if (code) {
1,183!
205
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
206
  }
207

208
  return code;
1,183✔
209
}
210

211
static int32_t msmSTAddDnodesToMap(SMnode* pMnode) {
2,329✔
212
  int32_t code = TSDB_CODE_SUCCESS;
2,329✔
213
  int32_t lino = 0;
2,329✔
214
  int64_t lastUpTs = INT64_MIN;
2,329✔
215
  SDnodeObj *pDnode = NULL;
2,329✔
216
  void *pIter = NULL;
2,329✔
217
  while (1) {
218
    pIter = sdbFetch(pMnode->pSdb, SDB_DNODE, pIter, (void **)&pDnode);
5,596✔
219
    if (pIter == NULL) {
5,596✔
220
      break;
2,329✔
221
    }
222

223
    code = taosHashPut(mStreamMgmt.dnodeMap, &pDnode->id, sizeof(pDnode->id), &lastUpTs, sizeof(lastUpTs));
3,267✔
224
    if (code && TSDB_CODE_DUP_KEY != code) {
3,267!
225
      sdbRelease(pMnode->pSdb, pDnode);
×
226
      sdbCancelFetch(pMnode->pSdb, pIter);
×
227
      pDnode = NULL;
×
228
      TAOS_CHECK_EXIT(code);
×
229
    }
230

231
    code = TSDB_CODE_SUCCESS;
3,267✔
232
    sdbRelease(pMnode->pSdb, pDnode);
3,267✔
233
  }
234

235
  pDnode = NULL;
2,329✔
236

237
_exit:
2,329✔
238

239
  if (code) {
2,329!
240
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
241
  }
242

243
  return code;
2,329✔
244
}
245

246

247

248
static int32_t msmSTAddToTaskMap(SStmGrpCtx* pCtx, int64_t streamId, SArray* pTasks, SList* pList, SStmTaskStatus* pTask) {
2,084✔
249
  int32_t code = TSDB_CODE_SUCCESS;
2,084✔
250
  int32_t lino = 0;
2,084✔
251
  int32_t taskNum = pTask ? 1 : (pList ? MST_LIST_SIZE(pList) : taosArrayGetSize(pTasks));
2,084!
252
  int64_t key[2] = {streamId, 0};
2,084✔
253
  SListNode* pNode = pList ? listHead(pList) : NULL;
2,084✔
254
  
255
  for (int32_t i = 0; i < taskNum; ++i) {
4,261✔
256
    SStmTaskStatus* pStatus = pTask ? pTask : (pList ? (SStmTaskStatus*)pNode->data : taosArrayGet(pTasks, i));
2,177✔
257
    key[1] = pStatus->id.taskId;
2,177✔
258
    TAOS_CHECK_EXIT(taosHashPut(mStreamMgmt.taskMap, key, sizeof(key), &pStatus, POINTER_BYTES));
2,177!
259
    mstsDebug("task %" PRIx64" tidx %d added to taskMap", pStatus->id.taskId, pStatus->id.taskIdx);
2,177✔
260
    if (pNode) {
2,177✔
261
      pNode = TD_DLIST_NODE_NEXT(pNode);
272✔
262
    }
263
  }
264
  
265
_exit:
2,084✔
266

267
  if (code) {
2,084!
268
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
269
  }
270
  
271
  return code;
2,084✔
272
}
273

274
static int32_t msmSTAddToVgStreamHash(SHashObj* pHash, int64_t streamId, SStmTaskStatus* pStatus, bool trigReader) {
750✔
275
  int32_t code = TSDB_CODE_SUCCESS;
750✔
276
  int32_t lino = 0;
750✔
277
  SStmVgStreamStatus* pStream = taosHashGet(pHash, &streamId, sizeof(streamId));
750✔
278
  if (NULL == pStream) {
750✔
279
    SStmVgStreamStatus stream = {0};
600✔
280
    if (trigReader) {
600✔
281
      stream.trigReaders = taosArrayInit(1, POINTER_BYTES);
463✔
282
      TSDB_CHECK_NULL(stream.trigReaders, code, lino, _exit, terrno);
463!
283
      TSDB_CHECK_NULL(taosArrayPush(stream.trigReaders, &pStatus), code, lino, _exit, terrno);
926!
284
    } else {
285
      stream.calcReaders = taosArrayInit(2, POINTER_BYTES);
137✔
286
      TSDB_CHECK_NULL(stream.calcReaders, code, lino, _exit, terrno);
137!
287
      TSDB_CHECK_NULL(taosArrayPush(stream.calcReaders, &pStatus), code, lino, _exit, terrno);
274!
288
    }
289
    TAOS_CHECK_EXIT(taosHashPut(pHash, &streamId, sizeof(streamId), &stream, sizeof(stream)));
600!
290
    goto _exit;
600✔
291
  }
292
  
293
  if (trigReader) {
150✔
294
    if (NULL == pStream->trigReaders) {
4!
295
      pStream->trigReaders = taosArrayInit(1, POINTER_BYTES);
4✔
296
      TSDB_CHECK_NULL(pStream->trigReaders, code, lino, _exit, terrno);
4!
297
    }
298
    
299
    TSDB_CHECK_NULL(taosArrayPush(pStream->trigReaders, &pStatus), code, lino, _exit, terrno);
8!
300
    goto _exit;
4✔
301
  }
302
  
303
  if (NULL == pStream->calcReaders) {
146✔
304
    pStream->calcReaders = taosArrayInit(1, POINTER_BYTES);
120✔
305
    TSDB_CHECK_NULL(pStream->calcReaders, code, lino, _exit, terrno);
120!
306
  }
307

308
  TSDB_CHECK_NULL(taosArrayPush(pStream->calcReaders, &pStatus), code, lino, _exit, terrno);
292!
309

310
_exit:
146✔
311

312
  if (code) {
750!
313
    mstsError("%s task %" PRIx64 " SID:%" PRIx64 " failed to add to vgroup %d streamHash in %s at line %d, error:%s", 
×
314
        trigReader ? "trigReader" : "calcReader", pStatus->id.taskId, pStatus->id.seriousId, pStatus->id.nodeId, __FUNCTION__, lino, tstrerror(code));
315
  } else {
316
    mstsDebug("%s task %" PRIx64 " SID:%" PRIx64 " added to vgroup %d streamHash", 
750✔
317
        trigReader ? "trigReader" : "calcReader", pStatus->id.taskId, pStatus->id.seriousId, pStatus->id.nodeId);
318
  }
319

320
  return code;
750✔
321
}
322

323
static int32_t msmSTAddToVgroupMapImpl(int64_t streamId, SStmTaskStatus* pStatus, bool trigReader) {
750✔
324
  int32_t code = TSDB_CODE_SUCCESS;
750✔
325
  int32_t lino = 0;
750✔
326
  SStmVgroupStatus vg = {0};
750✔
327

328
  SStmVgroupStatus* pVg = taosHashGet(mStreamMgmt.vgroupMap, &pStatus->id.nodeId, sizeof(pStatus->id.nodeId));
750✔
329
  if (NULL == pVg) {
750✔
330
    vg.streamTasks = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
207✔
331
    TSDB_CHECK_NULL(vg.streamTasks, code, lino, _exit, terrno);
207!
332
    taosHashSetFreeFp(vg.streamTasks, mstDestroySStmVgStreamStatus);
207✔
333

334
    vg.lastUpTs = taosGetTimestampMs();
207✔
335
    TAOS_CHECK_EXIT(msmSTAddToVgStreamHash(vg.streamTasks, streamId, pStatus, trigReader));
207!
336
    TAOS_CHECK_EXIT(taosHashPut(mStreamMgmt.vgroupMap, &pStatus->id.nodeId, sizeof(pStatus->id.nodeId), &vg, sizeof(vg)));
207!
337
  } else {
338
    TAOS_CHECK_EXIT(msmSTAddToVgStreamHash(pVg->streamTasks, streamId, pStatus, trigReader));
543!
339
  }
340
  
341
_exit:
543✔
342

343
  if (code) {
750!
344
    mstDestroyVgroupStatus(&vg);
×
345
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
346
  } else {
347
    mstsDebug("task %" PRIx64 " tidx %d added to vgroupMap %d", pStatus->id.taskId, pStatus->id.taskIdx, pStatus->id.nodeId);
750✔
348
  }
349

350
  return code;
750✔
351
}
352

353
static int32_t msmTDAddToVgroupMap(SHashObj* pVgMap, SStmTaskDeploy* pDeploy, int64_t streamId) {
758✔
354
  int32_t code = TSDB_CODE_SUCCESS;
758✔
355
  int32_t lino = 0;
758✔
356
  SStmVgTasksToDeploy vg = {0};
758✔
357
  SStreamTask* pTask = &pDeploy->task;
758✔
358
  SStmTaskToDeployExt ext = {0};
758✔
359
  ext.deploy = *pDeploy;
758✔
360

361
  while (true) {
×
362
    SStmVgTasksToDeploy* pVg = taosHashAcquire(pVgMap, &pDeploy->task.nodeId, sizeof(pDeploy->task.nodeId));
758✔
363
    if (NULL == pVg) {
758✔
364
      vg.taskList = taosArrayInit(20, sizeof(SStmTaskToDeployExt));
277✔
365
      TSDB_CHECK_NULL(vg.taskList, code, lino, _return, terrno);
277!
366
      TSDB_CHECK_NULL(taosArrayPush(vg.taskList, &ext), code, lino, _return, terrno);
554!
367
      code = taosHashPut(pVgMap, &pDeploy->task.nodeId, sizeof(pDeploy->task.nodeId), &vg, sizeof(SStmVgTasksToDeploy));
277✔
368
      if (TSDB_CODE_SUCCESS == code) {
277!
369
        goto _return;
277✔
370
      }
371

372
      if (TSDB_CODE_DUP_KEY != code) {
×
373
        goto _return;
×
374
      }    
375

376
      taosArrayDestroy(vg.taskList);
×
377
      continue;
×
378
    }
379

380
    taosWLockLatch(&pVg->lock);
481✔
381
    if (NULL == pVg->taskList) {
481!
382
      pVg->taskList = taosArrayInit(20, sizeof(SStmTaskToDeployExt));
×
383
      TSDB_CHECK_NULL(pVg->taskList, code, lino, _return, terrno);
×
384
    }
385
    if (NULL == taosArrayPush(pVg->taskList, &ext)) {
962!
386
      taosWUnLockLatch(&pVg->lock);
×
387
      TSDB_CHECK_NULL(NULL, code, lino, _return, terrno);
×
388
    }
389
    taosWUnLockLatch(&pVg->lock);
481✔
390
    
391
    taosHashRelease(pVgMap, pVg);
481✔
392
    break;
481✔
393
  }
394
  
395
_return:
758✔
396

397
  if (code) {
758!
398
    msttError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
399
  } else {
400
    int32_t num = atomic_add_fetch_32(&mStreamMgmt.toDeployVgTaskNum, 1);
758✔
401
    msttDebug("task added to toDeployVgTaskNum, vgToDeployTaskNum:%d", num);
758✔
402
  }
403

404
  return code;
758✔
405
}
406

407

408
static int32_t msmSTAddToSnodeStreamHash(SHashObj* pHash, int64_t streamId, SStmTaskStatus* pStatus, int32_t deployId) {
1,429✔
409
  int32_t code = TSDB_CODE_SUCCESS;
1,429✔
410
  int32_t lino = 0;
1,429✔
411
  SStmSnodeStreamStatus* pStream = taosHashGet(pHash, &streamId, sizeof(streamId));
1,429✔
412
  if (NULL == pStream) {
1,429✔
413
    SStmSnodeStreamStatus stream = {0};
358✔
414
    if (deployId < 0) {
358✔
415
      stream.trigger = pStatus;
1✔
416
    } else {
417
      stream.runners[deployId] = taosArrayInit(2, POINTER_BYTES);
357✔
418
      TSDB_CHECK_NULL(stream.runners[deployId], code, lino, _exit, terrno);
357!
419
      TSDB_CHECK_NULL(taosArrayPush(stream.runners[deployId], &pStatus), code, lino, _exit, terrno);
714!
420
    }
421
    
422
    TAOS_CHECK_EXIT(taosHashPut(pHash, &streamId, sizeof(streamId), &stream, sizeof(stream)));
358!
423
    goto _exit;
358✔
424
  }
425
  
426
  if (deployId < 0) {
1,071✔
427
    if (NULL != pStream->trigger) {
340!
428
      mstsWarn("stream already got trigger task %" PRIx64 " SID:%" PRIx64 " in snode %d, replace it with task %" PRIx64 " SID:%" PRIx64, 
×
429
          pStream->trigger->id.taskId, pStream->trigger->id.seriousId, pStatus->id.nodeId, pStatus->id.taskId, pStatus->id.seriousId);
430
    }
431
    
432
    pStream->trigger = pStatus;
340✔
433
    goto _exit;
340✔
434
  }
435
  
436
  if (NULL == pStream->runners[deployId]) {
731✔
437
    pStream->runners[deployId] = taosArrayInit(2, POINTER_BYTES);
665✔
438
    TSDB_CHECK_NULL(pStream->runners[deployId], code, lino, _exit, terrno);
665!
439
  }
440

441
  TSDB_CHECK_NULL(taosArrayPush(pStream->runners[deployId], &pStatus), code, lino, _exit, terrno);
1,462!
442

443
_exit:
731✔
444

445
  if (code) {
1,429!
446
    mstsError("%s task %" PRIx64 " SID:%" PRIx64 " failed to add to snode %d streamHash deployId:%d in %s at line %d, error:%s", 
×
447
        (deployId < 0) ? "trigger" : "runner", pStatus->id.taskId, pStatus->id.seriousId, pStatus->id.nodeId, deployId, __FUNCTION__, lino, tstrerror(code));
448
  } else {
449
    mstsDebug("%s task %" PRIx64 " SID:%" PRIx64 " added to snode %d streamHash deployId:%d", 
1,429✔
450
        (deployId < 0) ? "trigger" : "runner", pStatus->id.taskId, pStatus->id.seriousId, pStatus->id.nodeId, deployId);
451
  }
452

453
  return code;
1,429✔
454
}
455

456

457
static int32_t msmSTAddToSnodeMapImpl(int64_t streamId, SStmTaskStatus* pStatus, int32_t deployId) {
1,429✔
458
  int32_t code = TSDB_CODE_SUCCESS;
1,429✔
459
  int32_t lino = 0;
1,429✔
460

461
  SStmSnodeStatus* pSnode = taosHashGet(mStreamMgmt.snodeMap, &pStatus->id.nodeId, sizeof(pStatus->id.nodeId));
1,429✔
462
  if (NULL == pSnode) {
1,429!
463
    mstsWarn("snode %d not exists in snodeMap anymore, may be dropped", pStatus->id.nodeId);
×
464
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
465
  } else {
466
    if (NULL == pSnode->streamTasks) {
1,429✔
467
      pSnode->streamTasks = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
55✔
468
      TSDB_CHECK_NULL(pSnode->streamTasks, code, lino, _exit, terrno);
55!
469
      taosHashSetFreeFp(pSnode->streamTasks, mstDestroySStmSnodeStreamStatus);
55✔
470
    }
471
    
472
    TAOS_CHECK_EXIT(msmSTAddToSnodeStreamHash(pSnode->streamTasks, streamId, pStatus, deployId));
1,429!
473
  }
474
  
475
_exit:
1,429✔
476

477
  if (code) {
1,429!
478
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
479
  } else {
480
    mstsDebug("%s task %" PRIx64 " tidx %d added to snodeMap, snodeId:%d", (deployId < 0) ? "trigger" : "runner", 
1,429✔
481
        pStatus->id.taskId, pStatus->id.taskIdx, pStatus->id.nodeId);
482
  }
483

484
  return code;
1,429✔
485
}
486

487

488

489
static int32_t msmTDAddTriggerToSnodeMap(SStmTaskDeploy* pDeploy, SStreamObj* pStream) {
341✔
490
  int32_t code = TSDB_CODE_SUCCESS;
341✔
491
  int32_t lino = 0;
341✔
492
  int64_t streamId = pStream->pCreate->streamId;
341✔
493
  SStmSnodeTasksDeploy snode = {0};
341✔
494
  SStmTaskToDeployExt ext;
495
  SStreamTask* pTask = &pDeploy->task;
341✔
496

497
  while (true) {
×
498
    SStmSnodeTasksDeploy* pSnode = taosHashAcquire(mStreamMgmt.toDeploySnodeMap, &pDeploy->task.nodeId, sizeof(pDeploy->task.nodeId));
341✔
499
    if (NULL == pSnode) {
341✔
500
      snode.triggerList = taosArrayInit(10, sizeof(SStmTaskToDeployExt));
1✔
501
      TSDB_CHECK_NULL(snode.triggerList, code, lino, _return, terrno);
1!
502

503
      ext.deploy = *pDeploy;
1✔
504
      ext.deployed = false;
1✔
505
      TSDB_CHECK_NULL(taosArrayPush(snode.triggerList, &ext), code, lino, _return, terrno);
2!
506

507
      code = taosHashPut(mStreamMgmt.toDeploySnodeMap, &pDeploy->task.nodeId, sizeof(pDeploy->task.nodeId), &snode, sizeof(snode));
1✔
508
      if (TSDB_CODE_SUCCESS == code) {
1!
509
        goto _return;
1✔
510
      }
511

512
      if (TSDB_CODE_DUP_KEY != code) {
×
513
        goto _return;
×
514
      }    
515

516
      taosArrayDestroy(snode.triggerList);
×
517
      continue;
×
518
    }
519
    
520
    taosWLockLatch(&pSnode->lock);
340✔
521
    if (NULL == pSnode->triggerList) {
340✔
522
      pSnode->triggerList = taosArrayInit(10, sizeof(SStmTaskToDeployExt));
90✔
523
      if (NULL == pSnode->triggerList) {
90!
524
        taosWUnLockLatch(&pSnode->lock);
×
525
        TSDB_CHECK_NULL(pSnode->triggerList, code, lino, _return, terrno);
×
526
      }
527
    }
528
    
529
    ext.deploy = *pDeploy;
340✔
530
    ext.deployed = false;
340✔
531
    
532
    if (NULL == taosArrayPush(pSnode->triggerList, &ext)) {
680!
533
      taosWUnLockLatch(&pSnode->lock);
×
534
      TSDB_CHECK_NULL(NULL, code, lino, _return, terrno);
×
535
    }
536
    taosWUnLockLatch(&pSnode->lock);
340✔
537
    
538
    taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);
340✔
539
    break;
340✔
540
  }
541
  
542
_return:
341✔
543

544
  if (code) {
341!
545
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
546
  } else {
547
    msttDebug("trigger task added to toDeploySnodeMap, tidx:%d", pTask->taskIdx);
341✔
548
  }
549

550
  return code;
341✔
551
}
552

553
static int32_t msmTDAddRunnerToSnodeMap(SStmTaskDeploy* pDeploy, SStreamObj* pStream) {
1,088✔
554
  int32_t code = TSDB_CODE_SUCCESS;
1,088✔
555
  int32_t lino = 0;
1,088✔
556
  int64_t streamId = pStream->pCreate->streamId;
1,088✔
557
  SStmSnodeTasksDeploy snode = {0};
1,088✔
558
  SStmTaskToDeployExt ext;
559
  SStreamTask* pTask = &pDeploy->task;
1,088✔
560

561
  while (true) {
×
562
    SStmSnodeTasksDeploy* pSnode = taosHashAcquire(mStreamMgmt.toDeploySnodeMap, &pDeploy->task.nodeId, sizeof(pDeploy->task.nodeId));
1,088✔
563
    if (NULL == pSnode) {
1,088✔
564
      snode.runnerList = taosArrayInit(10, sizeof(SStmTaskToDeployExt));
94✔
565
      TSDB_CHECK_NULL(snode.runnerList, code, lino, _return, terrno);
94!
566

567
      ext.deploy = *pDeploy;
94✔
568
      ext.deployed = false;
94✔
569
      TSDB_CHECK_NULL(taosArrayPush(snode.runnerList, &ext), code, lino, _return, terrno);
188!
570

571
      code = taosHashPut(mStreamMgmt.toDeploySnodeMap, &pDeploy->task.nodeId, sizeof(pDeploy->task.nodeId), &snode, sizeof(snode));
94✔
572
      if (TSDB_CODE_SUCCESS == code) {
94!
573
        goto _return;
94✔
574
      }
575

576
      if (TSDB_CODE_DUP_KEY != code) {
×
577
        goto _return;
×
578
      }    
579

580
      taosArrayDestroy(snode.runnerList);
×
581
      continue;
×
582
    }
583
    
584
    taosWLockLatch(&pSnode->lock);
994✔
585
    if (NULL == pSnode->runnerList) {
994✔
586
      pSnode->runnerList = taosArrayInit(10, sizeof(SStmTaskToDeployExt));
1✔
587
      if (NULL == pSnode->runnerList) {
1!
588
        taosWUnLockLatch(&pSnode->lock);
×
589
        TSDB_CHECK_NULL(pSnode->runnerList, code, lino, _return, terrno);
×
590
      }
591
    }
592
    
593
    ext.deploy = *pDeploy;
994✔
594
    ext.deployed = false;
994✔
595
    
596
    if (NULL == taosArrayPush(pSnode->runnerList, &ext)) {
1,988!
597
      taosWUnLockLatch(&pSnode->lock);
×
598
      TSDB_CHECK_NULL(NULL, code, lino, _return, terrno);
×
599
    }
600
    taosWUnLockLatch(&pSnode->lock);
994✔
601
    
602
    taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);
994✔
603
    break;
994✔
604
  }
605
  
606
_return:
1,088✔
607

608
  if (code) {
1,088!
609
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
610
  } else {
611
    msttDebug("task added to toDeploySnodeMap, tidx:%d", pTask->taskIdx);
1,088✔
612
  }
613

614
  return code;
1,088✔
615
}
616

617
static int32_t msmTDAddRunnersToSnodeMap(SArray* runnerList, SStreamObj* pStream) {
1,022✔
618
  int32_t code = TSDB_CODE_SUCCESS;
1,022✔
619
  int32_t lino = 0;
1,022✔
620
  int32_t runnerNum = taosArrayGetSize(runnerList);
1,022✔
621
  SStmTaskDeploy* pDeploy = NULL;
1,022✔
622
  int64_t streamId = pStream->pCreate->streamId;
1,022✔
623

624
  for (int32_t i = 0; i < runnerNum; ++i) {
2,110✔
625
    pDeploy = taosArrayGet(runnerList, i);
1,088✔
626
    
627
    TAOS_CHECK_EXIT(msmTDAddRunnerToSnodeMap(pDeploy, pStream));
1,088!
628
    
629
    (void)atomic_add_fetch_32(&mStreamMgmt.toDeploySnodeTaskNum, 1);    
1,088✔
630
  }
631

632
_exit:
1,022✔
633

634
  if (code) {
1,022!
635
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
636
  }
637

638
  return code;
1,022✔
639
}
640

641

642
int32_t msmUpdateSnodeUpTs(SStmGrpCtx* pCtx) {
2,820✔
643
  int32_t  code = TSDB_CODE_SUCCESS;
2,820✔
644
  int32_t  lino = 0;
2,820✔
645
  SStmSnodeStatus* pStatus = NULL;
2,820✔
646
  bool     noExists = false;
2,820✔
647

648
  while (true) {
649
    pStatus = taosHashGet(mStreamMgmt.snodeMap, &pCtx->pReq->snodeId, sizeof(pCtx->pReq->snodeId));
2,843✔
650
    if (NULL == pStatus) {
2,843✔
651
      if (noExists) {
23!
652
        mstWarn("snode %d not exists in snodeMap, may be dropped, ignore it", pCtx->pReq->snodeId);
×
653
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_NODE_NOT_EXISTS);
×
654
      }
655

656
      noExists = true;
23✔
657
      TAOS_CHECK_EXIT(msmSTAddSnodesToMap(pCtx->pMnode));
23!
658
      
659
      continue;
23✔
660
    }
661

662
    atomic_store_32(&pStatus->runnerThreadNum, pCtx->pReq->runnerThreadNum);
2,820✔
663
    
664
    while (true) {
×
665
      int64_t lastTsValue = atomic_load_64(&pStatus->lastUpTs);
2,820✔
666
      if (pCtx->currTs > lastTsValue) {
2,820✔
667
        if (lastTsValue == atomic_val_compare_exchange_64(&pStatus->lastUpTs, lastTsValue, pCtx->currTs)) {
2,797!
668
          mstDebug("snode %d lastUpTs updated", pCtx->pReq->snodeId);
2,797✔
669
          return code;
2,797✔
670
        }
671

672
        continue;
×
673
      }
674

675
      return code;
23✔
676
    }
677

678
    break;
679
  }
680

681
_exit:
×
682

683
  if (code) {
×
684
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
685
  }
686

687
  return code;  
×
688
}
689

690
void msmUpdateVgroupUpTs(SStmGrpCtx* pCtx, int32_t vgId) {
74,605✔
691
  int32_t  code = TSDB_CODE_SUCCESS;
74,605✔
692
  int32_t  lino = 0;
74,605✔
693
  SStmVgroupStatus* pStatus = taosHashGet(mStreamMgmt.vgroupMap, &vgId, sizeof(vgId));
74,605✔
694
  if (NULL == pStatus) {
74,605✔
695
    mstDebug("vgroup %d not exists in vgroupMap, ignore update upTs", vgId);
63,452✔
696
    return;
63,452✔
697
  }
698

699
  while (true) {
×
700
    int64_t lastTsValue = atomic_load_64(&pStatus->lastUpTs);
11,153✔
701
    if (pCtx->currTs > lastTsValue) {
11,153✔
702
      if (lastTsValue == atomic_val_compare_exchange_64(&pStatus->lastUpTs, lastTsValue, pCtx->currTs)) {
10,946!
703
        mstDebug("vgroup %d lastUpTs updated to %" PRId64, vgId, pCtx->currTs);
10,946✔
704
        return;
10,946✔
705
      }
706

707
      continue;
×
708
    }
709

710
    return;
207✔
711
  }  
712
}
713

714
int32_t msmUpdateVgroupsUpTs(SStmGrpCtx* pCtx) {
28,982✔
715
  int32_t code = TSDB_CODE_SUCCESS;
28,982✔
716
  int32_t lino = 0;
28,982✔
717
  int32_t vgNum = taosArrayGetSize(pCtx->pReq->pVgLeaders);
28,982✔
718

719
  mstDebug("start to update vgroups upTs");
28,982✔
720
  
721
  for (int32_t i = 0; i < vgNum; ++i) {
102,942✔
722
    int32_t* vgId = taosArrayGet(pCtx->pReq->pVgLeaders, i);
73,960✔
723

724
    msmUpdateVgroupUpTs(pCtx, *vgId);
73,960✔
725
  }
726

727
_exit:
28,982✔
728

729
  if (code) {
28,982!
730
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
731
  }
732

733
  return code;
28,982✔
734
}
735

736

737

738
void* msmSearchCalcCacheScanPlan(SArray* pList) {
544✔
739
  int32_t num = taosArrayGetSize(pList);
544✔
740
  for (int32_t i = 0; i < num; ++i) {
961✔
741
    SStreamCalcScan* pScan = taosArrayGet(pList, i);
743✔
742
    if (pScan->readFromCache) {
743✔
743
      return pScan->scanPlan;
326✔
744
    }
745
  }
746

747
  return NULL;
218✔
748
}
749

750
int32_t msmBuildReaderDeployInfo(SStmTaskDeploy* pDeploy, void* calcScanPlan, SStmStatus* pInfo, bool triggerReader) {
758✔
751
  SStreamReaderDeployMsg* pMsg = &pDeploy->msg.reader;
758✔
752
  pMsg->triggerReader = triggerReader;
758✔
753
  
754
  if (triggerReader) {
758✔
755
    SStreamReaderDeployFromTrigger* pTrigger = &pMsg->msg.trigger;
471✔
756
    pTrigger->triggerTblName = pInfo->pCreate->triggerTblName;
471✔
757
    pTrigger->triggerTblUid = pInfo->pCreate->triggerTblUid;
471✔
758
    pTrigger->triggerTblSuid = pInfo->pCreate->triggerTblSuid;
471✔
759
    pTrigger->triggerTblType = pInfo->pCreate->triggerTblType;
471✔
760
    pTrigger->deleteReCalc = pInfo->pCreate->deleteReCalc;
471✔
761
    pTrigger->deleteOutTbl = pInfo->pCreate->deleteOutTbl;
471✔
762
    pTrigger->partitionCols = pInfo->pCreate->partitionCols;
471✔
763
    pTrigger->triggerCols = pInfo->pCreate->triggerCols;
471✔
764
    //pTrigger->triggerPrevFilter = pStream->pCreate->triggerPrevFilter;
765
    pTrigger->triggerScanPlan = pInfo->pCreate->triggerScanPlan;
471✔
766
    pTrigger->calcCacheScanPlan = msmSearchCalcCacheScanPlan(pInfo->pCreate->calcScanPlanList);
471✔
767
  } else {
768
    SStreamReaderDeployFromCalc* pCalc = &pMsg->msg.calc;
287✔
769
    pCalc->execReplica = pInfo->runnerDeploys * pInfo->runnerReplica;
287✔
770
    pCalc->calcScanPlan = calcScanPlan;
287✔
771
  }
772

773
  return TSDB_CODE_SUCCESS;
758✔
774
}
775

776
int32_t msmBuildTriggerRunnerTargets(SMnode* pMnode, SStmStatus* pInfo, int64_t streamId, SArray** ppRes) {
340✔
777
  int32_t code = TSDB_CODE_SUCCESS;
340✔
778
  int32_t lino = 0;
340✔
779

780
  if (pInfo->runnerDeploys > 0) {
340!
781
    *ppRes = taosArrayInit(pInfo->runnerDeploys, sizeof(SStreamRunnerTarget));
340✔
782
    TSDB_CHECK_NULL(*ppRes, code, lino, _exit, terrno);
340!
783
  }
784
  
785
  for (int32_t i = 0; i < pInfo->runnerDeploys; ++i) {
1,360✔
786
    SStmTaskStatus* pStatus = taosArrayGetLast(pInfo->runners[i]);
1,020✔
787
    TSDB_CHECK_NULL(pStatus, code, lino, _exit, terrno);
1,020!
788

789
    if (!STREAM_IS_TOP_RUNNER(pStatus->flags)) {
1,020!
790
      mstsError("the last runner task %" PRIx64 " SID:%" PRId64 " tidx:%d in deploy %d is not top runner", 
×
791
          pStatus->id.taskId, pStatus->id.seriousId, pStatus->id.taskIdx, i);
792
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);    
×
793
    }
794
    
795
    SStreamRunnerTarget runner;
796
    runner.addr.taskId = pStatus->id.taskId;
1,020✔
797
    runner.addr.nodeId = pStatus->id.nodeId;
1,020✔
798
    runner.addr.epset = mndGetDnodeEpsetById(pMnode, pStatus->id.nodeId);
1,020✔
799
    runner.execReplica = pInfo->runnerReplica; 
1,020✔
800
    TSDB_CHECK_NULL(taosArrayPush(*ppRes, &runner), code, lino, _exit, terrno);
2,040!
801
    mstsDebug("the %dth runner target added to trigger's runnerList, TASK:%" PRIx64 , i, runner.addr.taskId);
1,020✔
802
  }
803

804
_exit:
340✔
805

806
  if (code) {
340!
807
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
808
  }
809
  
810
  return TSDB_CODE_SUCCESS;
340✔
811
}
812

813
int32_t msmBuildStreamSnodeInfo(SMnode* pMnode, SStreamObj* pStream, SStreamSnodeInfo* pInfo) {
×
814
  int64_t streamId = pStream->pCreate->streamId;
×
815
  int32_t leaderSnodeId = atomic_load_32(&pStream->mainSnodeId);
×
816
  SSnodeObj* pSnode = mndAcquireSnode(pMnode, leaderSnodeId);
×
817
  if (NULL == pSnode) {
×
818
    mstsError("snode %d not longer exists, ignore build stream snode info", leaderSnodeId);
×
819
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
×
820
  }
821
  
822
  pInfo->leaderSnodeId = leaderSnodeId;
×
823
  pInfo->replicaSnodeId = pSnode->replicaId;
×
824

825
  mndReleaseSnode(pMnode, pSnode);
×
826

827
  pInfo->leaderEpSet = mndGetDnodeEpsetById(pMnode, pInfo->leaderSnodeId);
×
828
  if (GOT_SNODE(pInfo->replicaSnodeId)) {
×
829
    pInfo->replicaEpSet = mndGetDnodeEpsetById(pMnode, pInfo->replicaSnodeId);
×
830
  }
831

832
  return TSDB_CODE_SUCCESS;
×
833
}
834

835
int32_t msmBuildTriggerDeployInfo(SMnode* pMnode, SStmStatus* pInfo, SStmTaskDeploy* pDeploy, SStreamObj* pStream) {
341✔
836
  int32_t code = TSDB_CODE_SUCCESS;
341✔
837
  int32_t lino = 0;
341✔
838
  int64_t streamId = pStream->pCreate->streamId;
341✔
839
  SStreamTriggerDeployMsg* pMsg = &pDeploy->msg.trigger;
341✔
840
  
841
  pMsg->triggerType = pStream->pCreate->triggerType;
341✔
842
  pMsg->igDisorder = pStream->pCreate->igDisorder;
341✔
843
  pMsg->fillHistory = pStream->pCreate->fillHistory;
341✔
844
  pMsg->fillHistoryFirst = pStream->pCreate->fillHistoryFirst;
341✔
845
  pMsg->lowLatencyCalc = pStream->pCreate->lowLatencyCalc;
341✔
846
  pMsg->igNoDataTrigger = pStream->pCreate->igNoDataTrigger;
341✔
847
  pMsg->isTriggerTblVirt = STREAM_IS_VIRTUAL_TABLE(pStream->pCreate->triggerTblType, pStream->pCreate->flags);
341✔
848
  pMsg->triggerHasPF = pStream->pCreate->triggerHasPF;
341✔
849
  pMsg->isTriggerTblStb = (pStream->pCreate->triggerTblType == TSDB_SUPER_TABLE);
341✔
850
  pMsg->partitionCols = pStream->pCreate->partitionCols;
341✔
851

852
  pMsg->pNotifyAddrUrls = pInfo->pCreate->pNotifyAddrUrls;
341✔
853
  pMsg->notifyEventTypes = pStream->pCreate->notifyEventTypes;
341✔
854
  pMsg->addOptions = pStream->pCreate->addOptions;
341✔
855
  pMsg->notifyHistory = pStream->pCreate->notifyHistory;
341✔
856

857
  pMsg->maxDelay = pStream->pCreate->maxDelay;
341✔
858
  pMsg->fillHistoryStartTime = pStream->pCreate->fillHistoryStartTime;
341✔
859
  pMsg->watermark = pStream->pCreate->watermark;
341✔
860
  pMsg->expiredTime = pStream->pCreate->expiredTime;
341✔
861
  pMsg->trigger = pInfo->pCreate->trigger;
341✔
862

863
  pMsg->eventTypes = pStream->pCreate->eventTypes;
341✔
864
  pMsg->placeHolderBitmap = pStream->pCreate->placeHolderBitmap;
341✔
865
  pMsg->calcTsSlotId = pStream->pCreate->calcTsSlotId;
341✔
866
  pMsg->triTsSlotId = pStream->pCreate->triTsSlotId;
341✔
867
  pMsg->triggerPrevFilter = pInfo->pCreate->triggerPrevFilter;
341✔
868
  if (STREAM_IS_VIRTUAL_TABLE(pStream->pCreate->triggerTblType, pStream->pCreate->flags)) {
341✔
869
    pMsg->triggerScanPlan = pInfo->pCreate->triggerScanPlan;
73✔
870
    pMsg->calcCacheScanPlan = msmSearchCalcCacheScanPlan(pInfo->pCreate->calcScanPlanList);
73✔
871
  }
872

873
  SStreamTaskAddr addr;
874
  int32_t triggerReaderNum = taosArrayGetSize(pInfo->trigReaders);
341✔
875
  if (triggerReaderNum > 0) {
341✔
876
    pMsg->readerList = taosArrayInit(triggerReaderNum, sizeof(SStreamTaskAddr));
340✔
877
    TSDB_CHECK_NULL(pMsg->readerList, code, lino, _exit, terrno);
340!
878
  }
879
  
880
  for (int32_t i = 0; i < triggerReaderNum; ++i) {
778✔
881
    SStmTaskStatus* pStatus = taosArrayGet(pInfo->trigReaders, i);
437✔
882
    addr.taskId = pStatus->id.taskId;
437✔
883
    addr.nodeId = pStatus->id.nodeId;
437✔
884
    addr.epset = mndGetVgroupEpsetById(pMnode, pStatus->id.nodeId);
437✔
885
    TSDB_CHECK_NULL(taosArrayPush(pMsg->readerList, &addr), code, lino, _exit, terrno);
874!
886
    mstsDebug("the %dth trigReader src added to trigger's readerList, TASK:%" PRIx64 " nodeId:%d", i, addr.taskId, addr.nodeId);
437✔
887
  }
888

889
  pMsg->leaderSnodeId = pStream->mainSnodeId;
341✔
890
  pMsg->streamName = pInfo->streamName;
341✔
891

892
  if (0 == pInfo->runnerNum) {
341✔
893
    mstsDebug("no runner task, skip set trigger's runner list, deployNum:%d", pInfo->runnerDeploys);
1!
894
    return code;
1✔
895
  }
896

897
  TAOS_CHECK_EXIT(msmBuildTriggerRunnerTargets(pMnode, pInfo, streamId, &pMsg->runnerList));
340!
898

899
_exit:
340✔
900

901
  if (code) {
340!
902
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
903
  } else {
904
    mstsDebug("trigger deploy info built, readerNum:%d, runnerNum:%d", (int32_t)taosArrayGetSize(pMsg->readerList), (int32_t)taosArrayGetSize(pMsg->runnerList));
340✔
905
  }
906
  
907
  return TSDB_CODE_SUCCESS;
340✔
908
}
909

910

911
int32_t msmBuildRunnerDeployInfo(SStmTaskDeploy* pDeploy, SSubplan *plan, SStreamObj* pStream, SStmStatus* pInfo, bool topPlan) {
1,088✔
912
  int32_t code = TSDB_CODE_SUCCESS;
1,088✔
913
  int32_t lino = 0;
1,088✔
914
  int64_t streamId = pStream->pCreate->streamId;
1,088✔
915
  SStreamRunnerDeployMsg* pMsg = &pDeploy->msg.runner;
1,088✔
916
  //TAOS_CHECK_EXIT(qSubPlanToString(plan, &pMsg->pPlan, NULL));
917

918
  pMsg->execReplica = pInfo->runnerReplica;
1,088✔
919
  pMsg->streamName = pInfo->streamName;
1,088✔
920
  //TAOS_CHECK_EXIT(nodesCloneNode((SNode*)plan, (SNode**)&pMsg->pPlan));
921
  pMsg->pPlan = plan;
1,088✔
922
  pMsg->outDBFName = pInfo->pCreate->outDB;
1,088✔
923
  pMsg->outTblName = pInfo->pCreate->outTblName;
1,088✔
924
  pMsg->outTblType = pStream->pCreate->outTblType;
1,088✔
925
  pMsg->calcNotifyOnly = pStream->pCreate->calcNotifyOnly;
1,088✔
926
  pMsg->topPlan = topPlan;
1,088✔
927
  pMsg->pNotifyAddrUrls = pInfo->pCreate->pNotifyAddrUrls;
1,088✔
928
  pMsg->addOptions = pStream->pCreate->addOptions;
1,088✔
929
  if(pStream->pCreate->trigger.sliding.overlap) {
1,088✔
930
    pMsg->addOptions |= CALC_SLIDING_OVERLAP;
6✔
931
  }
932
  pMsg->outCols = pInfo->pCreate->outCols;
1,088✔
933
  pMsg->outTags = pInfo->pCreate->outTags;
1,088✔
934
  pMsg->outStbUid = pStream->pCreate->outStbUid;
1,088✔
935
  pMsg->outStbSversion = pStream->pCreate->outStbSversion;
1,088✔
936
  
937
  pMsg->subTblNameExpr = pInfo->pCreate->subTblNameExpr;
1,088✔
938
  pMsg->tagValueExpr = pInfo->pCreate->tagValueExpr;
1,088✔
939
  pMsg->forceOutCols = pInfo->pCreate->forceOutCols;
1,088✔
940

941
_exit:
1,088✔
942

943
  if (code) {
1,088!
944
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
945
  }
946
  
947
  return code;
1,088✔
948
}
949

950

951
static int32_t msmSTAddToVgroupMap(SStmGrpCtx* pCtx, int64_t streamId, SArray* pTasks, SList* pList, SStmTaskStatus* pTask, bool trigReader) {
723✔
952
  int32_t code = TSDB_CODE_SUCCESS;
723✔
953
  int32_t lino = 0;
723✔
954
  int32_t taskNum = pTask ? 1 : (pList ? MST_LIST_SIZE(pList) :taosArrayGetSize(pTasks));
723!
955
  SListNode* pNode = pList ? listHead(pList) : NULL;
723✔
956
  
957
  for (int32_t i = 0; i < taskNum; ++i) {
1,473✔
958
    SStmTaskStatus* pStatus = pTask ? pTask : (pNode ? (SStmTaskStatus*)pNode->data : taosArrayGet(pTasks, i));
750✔
959
    TAOS_CHECK_EXIT(msmSTAddToVgroupMapImpl(streamId, pStatus, trigReader));
750!
960
    if (pNode) {
750✔
961
      pNode = TD_DLIST_NODE_NEXT(pNode);
272✔
962
    }
963
  }
964
  
965
_exit:
723✔
966

967
  if (code) {
723!
968
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
969
  }
970
  
971
  return code;
723✔
972
}
973

974

975
static int32_t msmSTAddToSnodeMap(SStmGrpCtx* pCtx, int64_t streamId, SArray* pTasks, SStmTaskStatus* pTask, int32_t taskNum, int32_t deployId) {
1,363✔
976
  int32_t code = TSDB_CODE_SUCCESS;
1,363✔
977
  int32_t lino = 0;
1,363✔
978
  int32_t rtaskNum = (taskNum > 0) ? taskNum : taosArrayGetSize(pTasks);
1,363✔
979
  int32_t taskType = (deployId < 0) ? STREAM_TRIGGER_TASK : STREAM_RUNNER_TASK;
1,363✔
980
  
981
  for (int32_t i = 0; i < rtaskNum; ++i) {
2,792✔
982
    SStmTaskStatus* pStatus = (taskNum > 0) ? (pTask + i) : taosArrayGet(pTasks, i);
1,429✔
983
    TAOS_CHECK_EXIT(msmSTAddToSnodeMapImpl(streamId, pStatus, deployId));
1,429!
984
  }
985
  
986
_exit:
1,363✔
987

988
  if (code) {
1,363!
989
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
990
  }
991

992
  return code;
1,363✔
993
}
994

995
int64_t msmAssignTaskId(void) {
2,177✔
996
  return atomic_fetch_add_64(&mStreamMgmt.lastTaskId, 1);
2,177✔
997
}
998

999
int64_t msmAssignTaskSeriousId(void) {
2,177✔
1000
  return taosGetTimestampNs();
2,177✔
1001
}
1002

1003

1004
int32_t msmIsSnodeAlive(SMnode* pMnode, int32_t snodeId, int64_t streamId, bool* alive) {
1,787✔
1005
  int32_t code = TSDB_CODE_SUCCESS;
1,787✔
1006
  int32_t lino = 0;
1,787✔
1007
  bool     noExists = false;
1,787✔
1008
  SStmSnodeStatus* pStatus = NULL;
1,787✔
1009

1010
  while (true) {
1011
    pStatus = taosHashGet(mStreamMgmt.snodeMap, &snodeId, sizeof(snodeId));
1,824✔
1012
    if (NULL == pStatus) {
1,824✔
1013
      if (noExists) {
37!
1014
        mstsError("snode %d not exists in snodeMap", snodeId);
×
1015
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
1016
      }
1017

1018
      noExists = true;
37✔
1019
      TAOS_CHECK_EXIT(msmSTAddSnodesToMap(pMnode));
37!
1020
      
1021
      continue;
37✔
1022
    }
1023

1024
    *alive = (pStatus->runnerThreadNum >= 0);
1,787✔
1025
    break;
1,787✔
1026
  }
1027

1028
_exit:
1,787✔
1029

1030
  if (code) {
1,787!
1031
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
1032
  }
1033

1034
  return code;
1,787✔
1035
}
1036

1037
int32_t msmRetrieveStaticSnodeId(SMnode* pMnode, SStreamObj* pStream) {
683✔
1038
  int32_t code = TSDB_CODE_SUCCESS;
683✔
1039
  int32_t lino = 0;
683✔
1040
  bool alive = false;
683✔
1041
  int32_t mainSnodeId = atomic_load_32(&pStream->mainSnodeId);
683✔
1042
  int32_t snodeId = mainSnodeId;
683✔
1043
  int64_t streamId = pStream->pCreate->streamId;
683✔
1044
  
1045
  while (true) {
1046
    TAOS_CHECK_EXIT(msmIsSnodeAlive(pMnode, snodeId, streamId, &alive));
683!
1047

1048
    if (alive) {
683!
1049
      return snodeId;
683✔
1050
    }
1051
    
1052
    if (snodeId == mainSnodeId) {
×
1053
      SSnodeObj* pSnode = mndAcquireSnode(pMnode, snodeId);
×
1054
      if (NULL == pSnode) {
×
1055
        stsWarn("snode %d not longer exists, ignore assign snode", snodeId);
×
1056
        return 0;
×
1057
      }
1058
      
1059
      if (pSnode->replicaId <= 0) {
×
1060
        mstsError("no available snode now, mainSnodeId:%d, replicaId:%d", mainSnodeId, pSnode->replicaId);
×
1061
        mndReleaseSnode(pMnode, pSnode);
×
1062
        return 0;
×
1063
      }
1064

1065
      snodeId = pSnode->replicaId;
×
1066
      mndReleaseSnode(pMnode, pSnode);
×
1067
      
1068
      continue;
×
1069
    }
1070

1071
    mstsError("no available snode now, mainSnodeId:%d, followerSnodeId:%d", mainSnodeId, snodeId);
×
1072
    return 0;
×
1073
  }
1074

1075
_exit:
×
1076

1077
  if (code) {
×
1078
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
1079
  }
1080

1081
  return 0;
×
1082
}
1083

1084
int32_t msmAssignRandomSnodeId(SMnode* pMnode, int64_t streamId) {
1,070✔
1085
  int32_t code = TSDB_CODE_SUCCESS;
1,070✔
1086
  int32_t lino = 0;
1,070✔
1087
  int32_t snodeIdx = 0;
1,070✔
1088
  int32_t snodeId = 0;
1,070✔
1089
  void      *pIter = NULL;
1,070✔
1090
  SSnodeObj *pObj = NULL;
1,070✔
1091
  bool alive = false;
1,070✔
1092
  int32_t snodeNum = sdbGetSize(pMnode->pSdb, SDB_SNODE);
1,070✔
1093
  if (snodeNum <= 0) {
1,070✔
1094
    mstsInfo("no available snode now, num:%d", snodeNum);
10!
1095
    goto _exit;
10✔
1096
  }
1097

1098
  int32_t snodeTarget = taosRand() % snodeNum;
1,060✔
1099

1100
  while (1) {
1101
    pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj);
1,104✔
1102
    if (pIter == NULL) {
1,104!
1103
      if (0 == snodeId) {
×
1104
        mstsError("no alive snode now, snodeNum:%d", snodeNum);
×
1105
        break;
×
1106
      }
1107
      
1108
      snodeId = 0;
×
1109
      continue;
×
1110
    }
1111

1112
    code = msmIsSnodeAlive(pMnode, pObj->id, streamId, &alive);
1,104✔
1113
    if (code) {
1,104!
1114
      sdbRelease(pMnode->pSdb, pObj);
×
1115
      sdbCancelFetch(pMnode->pSdb, pIter);
×
1116
      pObj = NULL;
×
1117
      TAOS_CHECK_EXIT(code);
×
1118
    }
1119
    
1120
    if (!alive) {
1,104!
1121
      sdbRelease(pMnode->pSdb, pObj);
×
1122
      continue;
×
1123
    }
1124

1125
    snodeId = pObj->id;
1,104✔
1126
    if (snodeIdx == snodeTarget) {
1,104✔
1127
      sdbRelease(pMnode->pSdb, pObj);
1,060✔
1128
      sdbCancelFetch(pMnode->pSdb, pIter);
1,060✔
1129
      pObj = NULL;
1,060✔
1130
      goto _exit;
1,060✔
1131
    }
1132

1133
    sdbRelease(pMnode->pSdb, pObj);
44✔
1134
    snodeIdx++;
44✔
1135
  }
1136

1137
_exit:
1,070✔
1138

1139
  if (code) {
1,070!
1140
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
1141
  }
1142

1143
  if (0 == snodeId) {
1,070✔
1144
    terrno = TSDB_CODE_SNODE_NO_AVAILABLE_NODE;
10✔
1145
  }
1146

1147
  return snodeId;
1,070✔
1148
}
1149

1150
int32_t msmAssignTaskSnodeId(SMnode* pMnode, SStreamObj* pStream, bool isStatic) {
1,363✔
1151
  int64_t streamId = pStream->pCreate->streamId;
1,363✔
1152
  int32_t snodeNum = sdbGetSize(pMnode->pSdb, SDB_SNODE);
1,363✔
1153
  int32_t snodeId = 0;
1,363✔
1154
  if (snodeNum <= 0) {
1,363!
1155
    mstsInfo("no available snode now, num:%d", snodeNum);
×
1156
    goto _exit;
×
1157
  }
1158

1159
  snodeId = isStatic ? msmRetrieveStaticSnodeId(pMnode, pStream) : msmAssignRandomSnodeId(pMnode, streamId);
1,363✔
1160

1161
_exit:
1,363✔
1162

1163
  if (0 == snodeId) {
1,363!
1164
    terrno = TSDB_CODE_SNODE_NO_AVAILABLE_NODE;
×
1165
  }
1166

1167
  return snodeId;
1,363✔
1168
}
1169

1170

1171
static int32_t msmBuildTriggerTasks(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
341✔
1172
  int32_t code = TSDB_CODE_SUCCESS;
341✔
1173
  int32_t lino = 0;
341✔
1174
  int64_t streamId = pStream->pCreate->streamId;
341✔
1175

1176
  pInfo->triggerTask = taosMemoryCalloc(1, sizeof(SStmTaskStatus));
341!
1177
  TSDB_CHECK_NULL(pInfo->triggerTask, code, lino, _exit, terrno);
341!
1178

1179
  pInfo->triggerTask->id.taskId = pCtx->triggerTaskId;
341✔
1180
  pInfo->triggerTask->id.deployId = 0;
341✔
1181
  pInfo->triggerTask->id.seriousId = msmAssignTaskSeriousId();
341✔
1182
  pInfo->triggerTask->id.nodeId = pCtx->triggerNodeId;
341✔
1183
  pInfo->triggerTask->id.taskIdx = 0;
341✔
1184
  pInfo->triggerTask->type = STREAM_TRIGGER_TASK;
341✔
1185
  pInfo->triggerTask->lastUpTs = pCtx->currTs;
341✔
1186
  pInfo->triggerTask->pStream = pInfo;
341✔
1187

1188
  SStmTaskDeploy info = {0};
341✔
1189
  info.task.type = pInfo->triggerTask->type;
341✔
1190
  info.task.streamId = streamId;
341✔
1191
  info.task.taskId =  pInfo->triggerTask->id.taskId;
341✔
1192
  info.task.seriousId = pInfo->triggerTask->id.seriousId;
341✔
1193
  info.task.nodeId =  pInfo->triggerTask->id.nodeId;
341✔
1194
  info.task.taskIdx =  pInfo->triggerTask->id.taskIdx;
341✔
1195
  TAOS_CHECK_EXIT(msmBuildTriggerDeployInfo(pCtx->pMnode, pInfo, &info, pStream));
341!
1196
  TAOS_CHECK_EXIT(msmTDAddTriggerToSnodeMap(&info, pStream));
341!
1197
  
1198
  (void)atomic_add_fetch_32(&mStreamMgmt.toDeploySnodeTaskNum, 1);
341✔
1199

1200
  TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, NULL, NULL, pInfo->triggerTask));
341!
1201
  TAOS_CHECK_EXIT(msmSTAddToSnodeMap(pCtx, streamId, NULL, pInfo->triggerTask, 1, -1));
341!
1202

1203
_exit:
341✔
1204

1205
  if (code) {
341!
1206
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
1207
  }
1208

1209
  return code;
341✔
1210
}
1211

1212
static int32_t msmTDAddSingleTrigReader(SStmGrpCtx* pCtx, SStmTaskStatus* pState, int32_t nodeId, SStmStatus* pInfo, int64_t streamId) {
467✔
1213
  int32_t code = TSDB_CODE_SUCCESS;
467✔
1214
  int32_t lino = 0;
467✔
1215

1216
  pState->id.taskId = msmAssignTaskId();
467✔
1217
  pState->id.deployId = 0;
467✔
1218
  pState->id.seriousId = msmAssignTaskSeriousId();
467✔
1219
  pState->id.nodeId = nodeId;
467✔
1220
  pState->id.taskIdx = 0;
467✔
1221
  pState->type = STREAM_READER_TASK;
467✔
1222
  pState->flags = STREAM_FLAG_TRIGGER_READER;
467✔
1223
  pState->status = STREAM_STATUS_UNDEPLOYED;
467✔
1224
  pState->lastUpTs = pCtx->currTs;
467✔
1225
  pState->pStream = pInfo;
467✔
1226
  
1227
  SStmTaskDeploy info = {0};
467✔
1228
  info.task.type = pState->type;
467✔
1229
  info.task.streamId = streamId;
467✔
1230
  info.task.taskId = pState->id.taskId;
467✔
1231
  info.task.flags = pState->flags;
467✔
1232
  info.task.seriousId = pState->id.seriousId;
467✔
1233
  info.task.nodeId = pState->id.nodeId;
467✔
1234
  info.task.taskIdx = pState->id.taskIdx;
467✔
1235
  TAOS_CHECK_EXIT(msmBuildReaderDeployInfo(&info, NULL, pInfo, true));
467!
1236
  TAOS_CHECK_EXIT(msmTDAddToVgroupMap(mStreamMgmt.toDeployVgMap, &info, streamId));
467!
1237

1238
_exit:
467✔
1239

1240
  if (code) {
467!
1241
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
1242
  }
1243

1244
  return code;
467✔
1245
}
1246

1247
static int32_t msmTDAddTrigReaderTasks(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
341✔
1248
  int32_t code = TSDB_CODE_SUCCESS;
341✔
1249
  int32_t lino = 0;
341✔
1250
  int64_t streamId = pStream->pCreate->streamId;
341✔
1251
  SSdb   *pSdb = pCtx->pMnode->pSdb;
341✔
1252
  SStmTaskStatus* pState = NULL;
341✔
1253
  SVgObj *pVgroup = NULL;
341✔
1254
  SDbObj* pDb = NULL;
341✔
1255
  
1256
  switch (pStream->pCreate->triggerTblType) {
341✔
1257
    case TSDB_NORMAL_TABLE:
127✔
1258
    case TSDB_CHILD_TABLE:
1259
    case TSDB_VIRTUAL_CHILD_TABLE:
1260
    case TSDB_VIRTUAL_NORMAL_TABLE: {
1261
      pInfo->trigReaders = taosArrayInit_s(sizeof(SStmTaskStatus), 1);
127✔
1262
      TSDB_CHECK_NULL(pInfo->trigReaders, code, lino, _exit, terrno);
127!
1263
      pState = taosArrayGet(pInfo->trigReaders, 0);
127✔
1264
      
1265
      TAOS_CHECK_EXIT(msmTDAddSingleTrigReader(pCtx, pState, pStream->pCreate->triggerTblVgId, pInfo, streamId));
127!
1266
      break;
127✔
1267
    }
1268
    case TSDB_SUPER_TABLE: {
213✔
1269
      pDb = mndAcquireDb(pCtx->pMnode, pStream->pCreate->triggerDB);
213✔
1270
      if (NULL == pDb) {
213!
1271
        code = terrno;
×
1272
        mstsError("failed to acquire db %s, error:%s", pStream->pCreate->triggerDB, terrstr());
×
1273
        goto _exit;
×
1274
      }
1275

1276
      pInfo->trigReaders = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SStmTaskStatus));
213✔
1277
      TSDB_CHECK_NULL(pInfo->trigReaders, code, lino, _exit, terrno);
213!
1278
      
1279
      void *pIter = NULL;
213✔
1280
      while (1) {
1,249✔
1281
        SStmTaskDeploy info = {0};
1,462✔
1282
        pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
1,462✔
1283
        if (pIter == NULL) {
1,462✔
1284
          break;
213✔
1285
        }
1286
      
1287
        if (pVgroup->dbUid == pDb->uid && !pVgroup->isTsma) {
1,249!
1288
          pState = taosArrayReserve(pInfo->trigReaders, 1);
310✔
1289

1290
          code = msmTDAddSingleTrigReader(pCtx, pState, pVgroup->vgId, pInfo, streamId);
310✔
1291
          if (code) {
310!
1292
            sdbRelease(pSdb, pVgroup);
×
1293
            sdbCancelFetch(pSdb, pIter);
×
1294
            pVgroup = NULL;
×
1295
            TAOS_CHECK_EXIT(code);
×
1296
          }
1297
        }
1298

1299
        sdbRelease(pSdb, pVgroup);
1,249✔
1300
      }
1301
      break;
213✔
1302
    }
1303
    default:
1✔
1304
      mstsDebug("%s ignore triggerTblType %d", __FUNCTION__, pStream->pCreate->triggerTblType);
1!
1305
      break;
1✔
1306
  }
1307

1308
_exit:
341✔
1309

1310
  mndReleaseDb(pCtx->pMnode, pDb);
341✔
1311

1312
  if (code) {
341!
1313
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
1314
  }
1315

1316
  return code;
341✔
1317
}
1318

1319
int32_t msmUPAddScanTask(SStmGrpCtx* pCtx, SStreamObj* pStream, char* scanPlan, int32_t vgId, int64_t taskId) {
274✔
1320
  int32_t code = TSDB_CODE_SUCCESS;
274✔
1321
  int32_t lino = 0;
274✔
1322
  SSubplan* pSubplan = NULL;
274✔
1323
  int64_t streamId = pStream->pCreate->streamId;
274✔
1324
  int64_t key[2] = {streamId, 0};
274✔
1325
  SStmTaskSrcAddr addr;
1326
  TAOS_CHECK_EXIT(nodesStringToNode(scanPlan, (SNode**)&pSubplan));
274!
1327
  addr.isFromCache = false;
274✔
1328
  
1329
  if (MNODE_HANDLE == vgId) {
274!
1330
    mndGetMnodeEpSet(pCtx->pMnode, &addr.epset);
×
1331
  } else if (vgId > MNODE_HANDLE) {
274!
1332
    addr.epset = mndGetVgroupEpsetById(pCtx->pMnode, vgId);
274✔
1333
  } else {
1334
    mstsError("invalid vgId %d in scanPlan", vgId);
×
1335
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
1336
  }
1337
  
1338
  addr.taskId = taskId;
274✔
1339
  addr.vgId = vgId;
274✔
1340
  addr.groupId = pSubplan->id.groupId;
274✔
1341

1342
  key[1] = pSubplan->id.subplanId;
274✔
1343

1344
  SArray** ppRes = taosHashGet(mStreamMgmt.toUpdateScanMap, key, sizeof(key));
274✔
1345
  if (NULL == ppRes) {
274!
1346
    SArray* pRes = taosArrayInit(1, sizeof(addr));
274✔
1347
    TSDB_CHECK_NULL(pRes, code, lino, _exit, terrno);
274!
1348
    TSDB_CHECK_NULL(taosArrayPush(pRes, &addr), code, lino, _exit, terrno);
548!
1349
    TAOS_CHECK_EXIT(taosHashPut(mStreamMgmt.toUpdateScanMap, key, sizeof(key), &pRes, POINTER_BYTES));
274!
1350
  } else {
1351
    TSDB_CHECK_NULL(taosArrayPush(*ppRes, &addr), code, lino, _exit, terrno);
×
1352
  }
1353

1354
  mstsDebug("calcReader %" PRIx64 " added to toUpdateScan, vgId:%d, groupId:%d, subplanId:%d", taskId, vgId, pSubplan->id.groupId, pSubplan->id.subplanId);
274✔
1355
  
1356
  (void)atomic_add_fetch_32(&mStreamMgmt.toUpdateScanNum, 1);
274✔
1357
  
1358
_exit:
274✔
1359

1360
  nodesDestroyNode((SNode*)pSubplan);
274✔
1361

1362
  if (code) {
274!
1363
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
1364
  }
1365

1366
  return code;
274✔
1367
}
1368

1369
int32_t msmUPAddCacheTask(SStmGrpCtx* pCtx, SStreamCalcScan* pScan, SStreamObj* pStream) {
173✔
1370
  int32_t code = TSDB_CODE_SUCCESS;
173✔
1371
  int32_t lino = 0;
173✔
1372
  SSubplan* pSubplan = NULL;
173✔
1373
  int64_t streamId = pStream->pCreate->streamId;
173✔
1374
  int64_t key[2] = {streamId, 0};
173✔
1375
  TAOS_CHECK_EXIT(nodesStringToNode(pScan->scanPlan, (SNode**)&pSubplan));
173!
1376

1377
  SStmTaskSrcAddr addr;
1378
  addr.isFromCache = true;
173✔
1379
  addr.epset = mndGetDnodeEpsetById(pCtx->pMnode, pCtx->triggerNodeId);
173✔
1380
  addr.taskId = pCtx->triggerTaskId;
173✔
1381
  addr.vgId = pCtx->triggerNodeId;
173✔
1382
  addr.groupId = pSubplan->id.groupId;
173✔
1383

1384
  key[1] = pSubplan->id.subplanId;
173✔
1385
  SArray** ppRes = taosHashGet(mStreamMgmt.toUpdateScanMap, key, sizeof(key));
173✔
1386
  if (NULL == ppRes) {
173!
1387
    SArray* pRes = taosArrayInit(1, sizeof(addr));
173✔
1388
    TSDB_CHECK_NULL(pRes, code, lino, _exit, terrno);
173!
1389
    TSDB_CHECK_NULL(taosArrayPush(pRes, &addr), code, lino, _exit, terrno);
346!
1390
    TAOS_CHECK_EXIT(taosHashPut(mStreamMgmt.toUpdateScanMap, key, sizeof(key), &pRes, POINTER_BYTES));
173!
1391
  } else {
1392
    TSDB_CHECK_NULL(taosArrayPush(*ppRes, &addr), code, lino, _exit, terrno);
×
1393
  }
1394
  
1395
  (void)atomic_add_fetch_32(&mStreamMgmt.toUpdateScanNum, 1);
173✔
1396
  
1397
_exit:
173✔
1398

1399
  nodesDestroyNode((SNode*)pSubplan);
173✔
1400
  
1401
  if (code) {
173!
1402
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
1403
  }
1404

1405
  return code;
173✔
1406
}
1407

1408

1409
static int32_t msmTDAddSingleCalcReader(SStmGrpCtx* pCtx, SStmTaskStatus* pState, int32_t taskIdx, int32_t nodeId, void* calcScanPlan, SStmStatus* pInfo, int64_t streamId) {
283✔
1410
  int32_t code = TSDB_CODE_SUCCESS;
283✔
1411
  int32_t lino = 0;
283✔
1412

1413
  TAOS_CHECK_EXIT(mstGetScanUidFromPlan(streamId, calcScanPlan, &pState->id.uid));
283!
1414

1415
  pState->id.taskId = msmAssignTaskId();
283✔
1416
  pState->id.deployId = 0;
283✔
1417
  pState->id.seriousId = msmAssignTaskSeriousId();
283✔
1418
  pState->id.nodeId = nodeId;
283✔
1419
  pState->id.taskIdx = taskIdx;
283✔
1420
  pState->type = STREAM_READER_TASK;
283✔
1421
  pState->flags = 0;
283✔
1422
  pState->status = STREAM_STATUS_UNDEPLOYED;
283✔
1423
  pState->lastUpTs = pCtx->currTs;
283✔
1424
  pState->pStream = pInfo;
283✔
1425
  
1426
  SStmTaskDeploy info = {0};
283✔
1427
  info.task.type = pState->type;
283✔
1428
  info.task.streamId = streamId;
283✔
1429
  info.task.taskId = pState->id.taskId;
283✔
1430
  info.task.flags = pState->flags;
283✔
1431
  info.task.seriousId = pState->id.seriousId;
283✔
1432
  info.task.nodeId = pState->id.nodeId;
283✔
1433
  info.task.taskIdx = pState->id.taskIdx;
283✔
1434
  TAOS_CHECK_EXIT(msmBuildReaderDeployInfo(&info, calcScanPlan, pInfo, false));
283!
1435
  TAOS_CHECK_EXIT(msmTDAddToVgroupMap(mStreamMgmt.toDeployVgMap, &info, streamId));
283!
1436

1437
_exit:
283✔
1438

1439
  if (code) {
283!
1440
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
1441
  }
1442

1443
  return code;
283✔
1444
}
1445

1446

1447
static int32_t msmTDAddCalcReaderTasks(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
341✔
1448
  int32_t code = TSDB_CODE_SUCCESS;
341✔
1449
  int32_t lino = 0;
341✔
1450
  int32_t calcTasksNum = taosArrayGetSize(pStream->pCreate->calcScanPlanList);
341✔
1451
  int64_t streamId = pStream->pCreate->streamId;
341✔
1452
  SStmTaskStatus* pState = NULL;
341✔
1453
  pInfo->calcReaders = tdListNew(sizeof(SStmTaskStatus));
341✔
1454
  TSDB_CHECK_NULL(pInfo->calcReaders, code, lino, _exit, terrno);
341!
1455

1456
  
1457
  for (int32_t i = 0; i < calcTasksNum; ++i) {
786✔
1458
    SStreamCalcScan* pScan = taosArrayGet(pInfo->pCreate->calcScanPlanList, i);
445✔
1459
    if (pScan->readFromCache) {
445✔
1460
      TAOS_CHECK_EXIT(msmUPAddCacheTask(pCtx, pScan, pStream));
173!
1461
      continue;
173✔
1462
    }
1463
    
1464
    int32_t vgNum = taosArrayGetSize(pScan->vgList);
272✔
1465
    for (int32_t m = 0; m < vgNum; ++m) {
544✔
1466
      pState = tdListReserve(pInfo->calcReaders);
272✔
1467
      TSDB_CHECK_NULL(pState, code, lino, _exit, terrno);
272!
1468

1469
      TAOS_CHECK_EXIT(msmTDAddSingleCalcReader(pCtx, pState, i, *(int32_t*)taosArrayGet(pScan->vgList, m), pScan->scanPlan, pInfo, streamId));
272!
1470
      TAOS_CHECK_EXIT(msmUPAddScanTask(pCtx, pStream, pScan->scanPlan, pState->id.nodeId, pState->id.taskId));
272!
1471
    }
1472
  }
1473

1474
_exit:
341✔
1475

1476
  if (code) {
341!
1477
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
1478
  }
1479

1480
  return code;
341✔
1481
}
1482

1483

1484

1485
static int32_t msmUPPrepareReaderTasks(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
2✔
1486
  int32_t code = TSDB_CODE_SUCCESS;
2✔
1487
  int32_t lino = 0;
2✔
1488
  int64_t streamId = pStream->pCreate->streamId;
2✔
1489
  int32_t calcTasksNum = taosArrayGetSize(pStream->pCreate->calcScanPlanList);
2✔
1490
  if (calcTasksNum <= 0) {
2!
1491
    mstsDebug("no calc scan plan, ignore parepare reader tasks, readerNum:%d", (int32_t)MST_LIST_SIZE(pInfo->calcReaders));
×
1492
    return code;    
×
1493
  }
1494
  
1495
  SListNode* pNode = listHead(pInfo->calcReaders);
2✔
1496
  
1497
  for (int32_t i = 0; i < calcTasksNum; ++i) {
4✔
1498
    SStreamCalcScan* pScan = taosArrayGet(pStream->pCreate->calcScanPlanList, i);
2✔
1499
    if (pScan->readFromCache) {
2!
1500
      TAOS_CHECK_EXIT(msmUPAddCacheTask(pCtx, pScan, pStream));
×
1501
      continue;
×
1502
    }
1503
    
1504
    int32_t vgNum = taosArrayGetSize(pScan->vgList);
2✔
1505
    for (int32_t m = 0; m < vgNum; ++m) {
4✔
1506
      SStmTaskStatus* pReader = (SStmTaskStatus*)pNode->data;
2✔
1507
      TAOS_CHECK_EXIT(msmUPAddScanTask(pCtx, pStream, pScan->scanPlan, pReader->id.nodeId, pReader->id.taskId));
2!
1508
      pNode = TD_DLIST_NODE_NEXT(pNode);
2✔
1509
    }
1510
  }
1511

1512
_exit:
2✔
1513

1514
  if (code) {
2!
1515
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
1516
  }
1517

1518
  return code;
2✔
1519
}
1520

1521
static int32_t msmBuildReaderTasks(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
341✔
1522
  int32_t code = TSDB_CODE_SUCCESS;
341✔
1523
  int32_t lino = 0;
341✔
1524
  int64_t streamId = pStream->pCreate->streamId;
341✔
1525
  
1526
  TAOS_CHECK_EXIT(msmTDAddTrigReaderTasks(pCtx, pInfo, pStream));
341!
1527
  TAOS_CHECK_EXIT(msmTDAddCalcReaderTasks(pCtx, pInfo, pStream));
341!
1528

1529
  TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, pInfo->trigReaders, NULL, NULL));
341!
1530
  TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, NULL, pInfo->calcReaders, NULL));
341!
1531
  
1532
  TAOS_CHECK_EXIT(msmSTAddToVgroupMap(pCtx, streamId, pInfo->trigReaders, NULL, NULL, true));
341!
1533
  TAOS_CHECK_EXIT(msmSTAddToVgroupMap(pCtx, streamId, NULL, pInfo->calcReaders, NULL, false));
341!
1534
  
1535
_exit:
341✔
1536

1537
  if (code) {
341!
1538
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
1539
  }
1540
  
1541
  return code;
341✔
1542
}
1543

1544
int32_t msmUpdatePlanSourceAddr(SStreamTask* pTask, int64_t streamId, SSubplan* plan, int64_t clientId, SStmTaskSrcAddr* pSrc, int32_t msgType, int64_t srcSubplanId) {
1,403✔
1545
  SDownstreamSourceNode source = {
1,403✔
1546
      .type = QUERY_NODE_DOWNSTREAM_SOURCE,
1547
      .clientId = clientId,
1548
      .taskId = pSrc->taskId,
1,403✔
1549
      .sId = 0,
1550
      .execId = 0,
1551
      .fetchMsgType = msgType,
1552
      .localExec = false,
1553
  };
1554

1555
  source.addr.epSet = pSrc->epset;
1,403✔
1556
  source.addr.nodeId = pSrc->vgId;
1,403✔
1557

1558
  msttDebug("try to update subplan %d grp %d sourceAddr from subplan %" PRId64 ", clientId:%" PRIx64 ", srcTaskId:%" PRIx64 ", srcNodeId:%d, msgType:%s", 
1,403!
1559
      plan->id.subplanId, pSrc->groupId, srcSubplanId, source.clientId, source.taskId, source.addr.nodeId, TMSG_INFO(source.fetchMsgType));
1560
  
1561
  return qSetSubplanExecutionNode(plan, pSrc->groupId, &source);
1,403✔
1562
}
1563

1564
int32_t msmGetTaskIdFromSubplanId(SStreamObj* pStream, SArray* pRunners, int32_t beginIdx, int32_t subplanId, int64_t* taskId, SStreamTask** ppParent) {
66✔
1565
  int64_t streamId = pStream->pCreate->streamId;
66✔
1566
  int32_t runnerNum = taosArrayGetSize(pRunners);
66✔
1567
  for (int32_t i = beginIdx; i < runnerNum; ++i) {
165!
1568
    SStmTaskDeploy* pDeploy = taosArrayGet(pRunners, i);
165✔
1569
    SSubplan* pPlan = pDeploy->msg.runner.pPlan;
165✔
1570
    if (pPlan->id.subplanId == subplanId) {
165✔
1571
      *taskId = pDeploy->task.taskId;
66✔
1572
      *ppParent = &pDeploy->task;
66✔
1573
      return TSDB_CODE_SUCCESS;
66✔
1574
    }
1575
  }
1576

1577
  mstsError("subplanId %d not found in runner list", subplanId);
×
1578

1579
  return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
×
1580
}
1581

1582
int32_t msmUpdateLowestPlanSourceAddr(SSubplan* pPlan, SStmTaskDeploy* pDeploy, int64_t streamId) {
1,088✔
1583
  int32_t code = TSDB_CODE_SUCCESS;
1,088✔
1584
  int32_t lino = 0;
1,088✔
1585
  int64_t key[2] = {streamId, -1};
1,088✔
1586
  SNode* pNode = NULL;
1,088✔
1587
  SStreamTask* pTask = &pDeploy->task;
1,088✔
1588
  FOREACH(pNode, pPlan->pChildren) {
2,491!
1589
    if (QUERY_NODE_VALUE != nodeType(pNode)) {
1,403✔
1590
      msttDebug("node type %d is not valueNode, skip it", nodeType(pNode));
66!
1591
      continue;
66✔
1592
    }
1593
    
1594
    SValueNode* pVal = (SValueNode*)pNode;
1,337✔
1595
    if (TSDB_DATA_TYPE_BIGINT != pVal->node.resType.type) {
1,337!
1596
      msttWarn("invalid value node data type %d for runner's child subplan", pVal->node.resType.type);
×
1597
      continue;
×
1598
    }
1599

1600
    key[1] = MND_GET_RUNNER_SUBPLANID(pVal->datum.i);
1,337✔
1601

1602
    SArray** ppRes = taosHashGet(mStreamMgmt.toUpdateScanMap, key, sizeof(key));
1,337✔
1603
    if (NULL == ppRes) {
1,337!
1604
      msttError("lowest runner subplan ID:%d,%d can't get its child ID:%" PRId64 " addr", pPlan->id.groupId, pPlan->id.subplanId, key[1]);
×
1605
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
1606
    }
1607

1608
    int32_t childrenNum = taosArrayGetSize(*ppRes);
1,337✔
1609
    for (int32_t i = 0; i < childrenNum; ++i) {
2,674✔
1610
      SStmTaskSrcAddr* pAddr = taosArrayGet(*ppRes, i);
1,337✔
1611
      TAOS_CHECK_EXIT(msmUpdatePlanSourceAddr(pTask, streamId, pPlan, pDeploy->task.taskId, pAddr, pAddr->isFromCache ? TDMT_STREAM_FETCH_FROM_CACHE : TDMT_STREAM_FETCH, key[1]));
1,337!
1612
    }
1613
  }
1614

1615
_exit:
1,088✔
1616

1617
  if (code) {
1,088!
1618
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
1619
  }
1620

1621
  return code;
1,088✔
1622
}
1623

1624
int32_t msmUpdateRunnerPlan(SStmGrpCtx* pCtx, SArray* pRunners, int32_t beginIdx, SStmTaskDeploy* pDeploy, SStreamObj* pStream) {
1,088✔
1625
  int32_t code = TSDB_CODE_SUCCESS;
1,088✔
1626
  int32_t lino = 0;
1,088✔
1627
  SSubplan* pPlan = pDeploy->msg.runner.pPlan;
1,088✔
1628
  SStreamTask* pTask = &pDeploy->task;
1,088✔
1629
  SStreamTask* parentTask = NULL;
1,088✔
1630
  int64_t streamId = pStream->pCreate->streamId;
1,088✔
1631

1632
  TAOS_CHECK_EXIT(msmUpdateLowestPlanSourceAddr(pPlan, pDeploy, streamId));
1,088!
1633

1634
  SNode* pTmp = NULL;
1,088✔
1635
  WHERE_EACH(pTmp, pPlan->pChildren) {
2,491!
1636
    if (QUERY_NODE_VALUE == nodeType(pTmp)) {
1,403✔
1637
      ERASE_NODE(pPlan->pChildren);
1,337✔
1638
      continue;
1,337✔
1639
    }
1640
    WHERE_NEXT;
66✔
1641
  }
1642
  nodesClearList(pPlan->pChildren);
1,088✔
1643
  pPlan->pChildren = NULL;
1,088✔
1644

1645
  if (NULL == pPlan->pParents) {
1,088✔
1646
    goto _exit;
1,022✔
1647
  }
1648

1649
  SNode* pNode = NULL;
66✔
1650
  int64_t parentTaskId = 0;
66✔
1651
  SStmTaskSrcAddr addr = {0};
66✔
1652
  addr.taskId = pDeploy->task.taskId;
66✔
1653
  addr.vgId = pDeploy->task.nodeId;
66✔
1654
  addr.groupId = pPlan->id.groupId;
66✔
1655
  addr.epset = mndGetDnodeEpsetById(pCtx->pMnode, pDeploy->task.nodeId);
66✔
1656
  FOREACH(pNode, pPlan->pParents) {
132!
1657
    SSubplan* pSubplan = (SSubplan*)pNode;
66✔
1658
    TAOS_CHECK_EXIT(msmGetTaskIdFromSubplanId(pStream, pRunners, beginIdx, pSubplan->id.subplanId, &parentTaskId, &parentTask));
66!
1659
    TAOS_CHECK_EXIT(msmUpdatePlanSourceAddr(parentTask, streamId, pSubplan, parentTaskId, &addr, TDMT_STREAM_FETCH_FROM_RUNNER, pPlan->id.subplanId));
66!
1660
  }
1661
  
1662
_exit:
66✔
1663

1664
  if (code) {
1,088!
1665
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
1666
  }
1667

1668
  return code;
1,088✔
1669
}
1670

1671
int32_t msmUpdateRunnerPlans(SStmGrpCtx* pCtx, SArray* pRunners, SStreamObj* pStream) {
1,022✔
1672
  int32_t code = TSDB_CODE_SUCCESS;
1,022✔
1673
  int32_t lino = 0;
1,022✔
1674
  int64_t streamId = pStream->pCreate->streamId;
1,022✔
1675
  int32_t runnerNum = taosArrayGetSize(pRunners);
1,022✔
1676
  
1677
  for (int32_t i = 0; i < runnerNum; ++i) {
2,110✔
1678
    SStmTaskDeploy* pDeploy = taosArrayGet(pRunners, i);
1,088✔
1679
    TAOS_CHECK_EXIT(msmUpdateRunnerPlan(pCtx, pRunners, i, pDeploy, pStream));
1,088!
1680
    TAOS_CHECK_EXIT(nodesNodeToString((SNode*)pDeploy->msg.runner.pPlan, false, (char**)&pDeploy->msg.runner.pPlan, NULL));
1,088!
1681

1682
    SStreamTask* pTask = &pDeploy->task;
1,088✔
1683
    msttDebugL("runner updated task plan:%s", (const char*)pDeploy->msg.runner.pPlan);
1,088✔
1684
  }
1685

1686
_exit:
1,022✔
1687

1688
  if (code) {
1,022!
1689
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
1690
  }
1691

1692
  return code;
1,022✔
1693
}
1694

1695
int32_t msmBuildRunnerTasksImpl(SStmGrpCtx* pCtx, SQueryPlan* pDag, SStmStatus* pInfo, SStreamObj* pStream) {
340✔
1696
  int32_t code = 0;
340✔
1697
  int32_t lino = 0;
340✔
1698
  int64_t streamId = pStream->pCreate->streamId;
340✔
1699
  SArray* deployTaskList = NULL;
340✔
1700
  SArray* deployList = NULL;
340✔
1701
  int32_t deployNodeId = 0;
340✔
1702
  SStmTaskStatus* pState = NULL;
340✔
1703
  int32_t taskIdx = 0;
340✔
1704
  SNodeListNode *plans = NULL;
340✔
1705
  int32_t        taskNum = 0;
340✔
1706
  int32_t        totalTaskNum = 0;
340✔
1707

1708
  if (pDag->numOfSubplans <= 0) {
340!
1709
    mstsError("invalid subplan num:%d", pDag->numOfSubplans);
×
1710
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
1711
  }
1712

1713
  if (pDag->numOfSubplans != pStream->pCreate->numOfCalcSubplan) {
340!
1714
    mstsError("numOfCalcSubplan %d mismatch with numOfSubplans %d", pStream->pCreate->numOfCalcSubplan, pDag->numOfSubplans);
×
1715
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
1716
  }
1717

1718
  int32_t levelNum = (int32_t)LIST_LENGTH(pDag->pSubplans);
340!
1719
  if (levelNum <= 0) {
340!
1720
    mstsError("invalid level num:%d", levelNum);
×
1721
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
1722
  }
1723

1724
  int32_t        lowestLevelIdx = levelNum - 1;
340✔
1725
  
1726
  plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, 0);
340✔
1727
  if (QUERY_NODE_NODE_LIST != nodeType(plans)) {
340!
1728
    mstsError("invalid level plan, level:0, planNodeType:%d", nodeType(plans));
×
1729
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
1730
  }
1731
  
1732
  taskNum = (int32_t)LIST_LENGTH(plans->pNodeList);
340!
1733
  if (taskNum != 1) {
340!
1734
    mstsError("invalid level plan number:%d, level:0", taskNum);
×
1735
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
1736
  }
1737

1738
  deployTaskList = taosArrayInit_s(sizeof(SStmTaskDeploy), pDag->numOfSubplans);
340✔
1739
  TSDB_CHECK_NULL(deployTaskList, code, lino, _exit, terrno);
340!
1740
  
1741
  for (int32_t deployId = 0; deployId < pInfo->runnerDeploys; ++deployId) {
1,360✔
1742
    totalTaskNum = 0;
1,020✔
1743

1744
    deployList = pInfo->runners[deployId];
1,020✔
1745
    deployNodeId = msmAssignTaskSnodeId(pCtx->pMnode, pStream, (0 == deployId) ? true : false);
1,020✔
1746
    if (!GOT_SNODE(deployNodeId)) {
1,020!
1747
      TAOS_CHECK_EXIT(terrno);
×
1748
    }
1749

1750
    taskIdx = 0;
1,020✔
1751
    
1752
    for (int32_t i = lowestLevelIdx; i >= 0; --i) {
2,073✔
1753
      plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i);
1,053✔
1754
      if (NULL == plans) {
1,053!
1755
        mstsError("empty level plan, level:%d", i);
×
1756
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
1757
      }
1758

1759
      if (QUERY_NODE_NODE_LIST != nodeType(plans)) {
1,053!
1760
        mstsError("invalid level plan, level:%d, planNodeType:%d", i, nodeType(plans));
×
1761
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
1762
      }
1763

1764
      taskNum = (int32_t)LIST_LENGTH(plans->pNodeList);
1,053!
1765
      if (taskNum <= 0) {
1,053!
1766
        mstsError("invalid level plan number:%d, level:%d", taskNum, i);
×
1767
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
1768
      }
1769

1770
      totalTaskNum += taskNum;
1,053✔
1771
      if (totalTaskNum > pDag->numOfSubplans) {
1,053!
1772
        mstsError("current totalTaskNum %d is bigger than numOfSubplans %d, level:%d", totalTaskNum, pDag->numOfSubplans, i);
×
1773
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
1774
      }
1775

1776
      for (int32_t n = 0; n < taskNum; ++n) {
2,139✔
1777
        SSubplan *plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n);
1,086✔
1778
        pState = taosArrayReserve(deployList, 1);
1,086✔
1779

1780
        pState->id.taskId = msmAssignTaskId();
1,086✔
1781
        pState->id.deployId = deployId;
1,086✔
1782
        pState->id.seriousId = msmAssignTaskSeriousId();
1,086✔
1783
        pState->id.nodeId = deployNodeId;
1,086✔
1784
        pState->id.taskIdx = MND_SET_RUNNER_TASKIDX(i, n);
1,086✔
1785
        pState->type = STREAM_RUNNER_TASK;
1,086✔
1786
        pState->flags = (0 == i) ? STREAM_FLAG_TOP_RUNNER : 0;
1,086✔
1787
        pState->status = STREAM_STATUS_UNDEPLOYED;
1,086✔
1788
        pState->lastUpTs = pCtx->currTs;
1,086✔
1789
        pState->pStream = pInfo;
1,086✔
1790

1791
        SStmTaskDeploy* pDeploy = taosArrayGet(deployTaskList, taskIdx++);
1,086✔
1792
        pDeploy->task.type = pState->type;
1,086✔
1793
        pDeploy->task.streamId = streamId;
1,086✔
1794
        pDeploy->task.taskId = pState->id.taskId;
1,086✔
1795
        pDeploy->task.flags = pState->flags;
1,086✔
1796
        pDeploy->task.seriousId = pState->id.seriousId;
1,086✔
1797
        pDeploy->task.deployId = pState->id.deployId;
1,086✔
1798
        pDeploy->task.nodeId = pState->id.nodeId;
1,086✔
1799
        pDeploy->task.taskIdx = pState->id.taskIdx;
1,086✔
1800
        TAOS_CHECK_EXIT(msmBuildRunnerDeployInfo(pDeploy, plan, pStream, pInfo, 0 == i));
1,086!
1801

1802
        SStreamTask* pTask = &pDeploy->task;
1,086✔
1803
        msttDebug("runner task deploy built, subplan level:%d, taskIdx:%d, groupId:%d, subplanId:%d",
1,086✔
1804
            i, pTask->taskIdx, plan->id.groupId, plan->id.subplanId);
1805
      }
1806

1807
      mstsDebug("deploy %d level %d initialized, taskNum:%d", deployId, i, taskNum);
1,053✔
1808
    }
1809

1810
    if (totalTaskNum != pDag->numOfSubplans) {
1,020!
1811
      mstsError("totalTaskNum %d mis-match with numOfSubplans %d", totalTaskNum, pDag->numOfSubplans);
×
1812
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
1813
    }
1814

1815
    TAOS_CHECK_EXIT(msmUpdateRunnerPlans(pCtx, deployTaskList, pStream));
1,020!
1816

1817
    TAOS_CHECK_EXIT(msmTDAddRunnersToSnodeMap(deployTaskList, pStream));
1,020!
1818

1819
    nodesDestroyNode((SNode *)pDag);
1,020✔
1820
    pDag = NULL;
1,020✔
1821
    
1822
    TAOS_CHECK_EXIT(nodesStringToNode(pStream->pCreate->calcPlan, (SNode**)&pDag));
1,020!
1823

1824
    mstsDebug("total %d runner tasks added for deploy %d", totalTaskNum, deployId);
1,020✔
1825
  }
1826

1827
  for (int32_t i = 0; i < pInfo->runnerDeploys; ++i) {
1,360✔
1828
    TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, pInfo->runners[i], NULL, NULL));
1,020!
1829
    TAOS_CHECK_EXIT(msmSTAddToSnodeMap(pCtx, streamId, pInfo->runners[i], NULL, 0, i));
1,020!
1830
  }
1831
  
1832
  pInfo->runnerNum = totalTaskNum;
340✔
1833
  
1834
_exit:
340✔
1835

1836
  if (code) {
340!
1837
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
1838
  }
1839

1840
  taosArrayDestroy(deployTaskList);
340✔
1841
  nodesDestroyNode((SNode *)pDag);
340✔
1842

1843
  return code;
340✔
1844
}
1845

1846
int32_t msmReBuildRunnerTasks(SStmGrpCtx* pCtx, SQueryPlan* pDag, SStmStatus* pInfo, SStreamObj* pStream, SStmTaskAction* pAction) {
2✔
1847
  int32_t code = 0;
2✔
1848
  int32_t lino = 0;
2✔
1849
  int64_t streamId = pStream->pCreate->streamId;
2✔
1850
  int32_t newNodeId = 0;
2✔
1851
  int32_t levelNum = (int32_t)LIST_LENGTH(pDag->pSubplans);
2!
1852
  int32_t        lowestLevelIdx = levelNum - 1;
2✔
1853
  SNodeListNode *plans = NULL;
2✔
1854
  int32_t        taskNum = 0;
2✔
1855
  int32_t        totalTaskNum = 0;
2✔
1856
  int32_t        deployId = 0;
2✔
1857
  SStmTaskStatus* pRunner = NULL;
2✔
1858
  SStmTaskStatus* pStartRunner = NULL;
2✔
1859
  int32_t taskIdx = 0;
2✔
1860
  SArray* deployTaskList = taosArrayInit_s(sizeof(SStmTaskDeploy), pDag->numOfSubplans);
2✔
1861
  TSDB_CHECK_NULL(deployTaskList, code, lino, _exit, terrno);
2!
1862

1863
  for (int32_t r = 0; r < pAction->deployNum; ++r) {
4✔
1864
    deployId = pAction->deployId[r];
2✔
1865

1866
    pRunner = taosArrayGet(pInfo->runners[deployId], 0);
2✔
1867

1868
    pStartRunner = pRunner;
2✔
1869
    totalTaskNum = 0;
2✔
1870

1871
    newNodeId = msmAssignTaskSnodeId(pCtx->pMnode, pStream, (0 == r) ? true : false);
2✔
1872
    if (!GOT_SNODE(newNodeId)) {
2!
1873
      TAOS_CHECK_EXIT(terrno);
×
1874
    }
1875

1876
    taskIdx = 0;
2✔
1877
    
1878
    for (int32_t i = lowestLevelIdx; i >= 0; --i) {
4✔
1879
      plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i);
2✔
1880
      taskNum = (int32_t)LIST_LENGTH(plans->pNodeList);
2!
1881
      totalTaskNum += taskNum;
2✔
1882

1883
      pRunner->flags &= STREAM_FLAG_REDEPLOY_RUNNER;
2✔
1884
      
1885
      for (int32_t n = 0; n < taskNum; ++n) {
4✔
1886
        SSubplan *plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n);
2✔
1887

1888
        int32_t newTaskIdx = MND_SET_RUNNER_TASKIDX(i, n);
2✔
1889
        if (pRunner->id.taskIdx != newTaskIdx) {
2!
1890
          mstsError("runner TASK:%" PRId64 " taskIdx %d mismatch with newTaskIdx:%d", pRunner->id.taskId, pRunner->id.taskIdx, newTaskIdx);
×
1891
          TAOS_CHECK_EXIT(TSDB_CODE_STREAM_INTERNAL_ERROR);
×
1892
        }
1893

1894
        pRunner->id.nodeId = newNodeId;
2✔
1895

1896
        SStmTaskDeploy* pDeploy = taosArrayGet(deployTaskList, taskIdx++);
2✔
1897
        pDeploy->task.type = pRunner->type;
2✔
1898
        pDeploy->task.streamId = streamId;
2✔
1899
        pDeploy->task.taskId = pRunner->id.taskId;
2✔
1900
        pDeploy->task.flags = pRunner->flags;
2✔
1901
        pDeploy->task.seriousId = pRunner->id.seriousId;
2✔
1902
        pDeploy->task.nodeId = pRunner->id.nodeId;
2✔
1903
        pDeploy->task.taskIdx = pRunner->id.taskIdx;
2✔
1904
        TAOS_CHECK_EXIT(msmBuildRunnerDeployInfo(pDeploy, plan, pStream, pInfo, 0 == i));
2!
1905

1906
        pRunner++;
2✔
1907
      }
1908

1909
      mstsDebug("level %d initialized, taskNum:%d", i, taskNum);
2!
1910
    }
1911

1912
    TAOS_CHECK_EXIT(msmUpdateRunnerPlans(pCtx, deployTaskList, pStream));
2!
1913

1914
    TAOS_CHECK_EXIT(msmTDAddRunnersToSnodeMap(deployTaskList, pStream));
2!
1915

1916
    TAOS_CHECK_EXIT(msmSTAddToSnodeMap(pCtx, streamId, pInfo->runners[deployId], NULL, 0, deployId));
2!
1917

1918
    nodesDestroyNode((SNode *)pDag);
2✔
1919
    pDag = NULL;
2✔
1920

1921
    TAOS_CHECK_EXIT(nodesStringToNode(pStream->pCreate->calcPlan, (SNode**)&pDag));
2!
1922
  }
1923

1924
_exit:
2✔
1925

1926
  if (code) {
2!
1927
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
1928
  }
1929

1930
  nodesDestroyNode((SNode *)pDag);
2✔
1931
  taosArrayDestroy(deployTaskList);
2✔
1932

1933
  return code;
2✔
1934
}
1935

1936

1937
int32_t msmSetStreamRunnerExecReplica(int64_t streamId, SStmStatus* pInfo) {
319✔
1938
  int32_t code = TSDB_CODE_SUCCESS;
319✔
1939
  int32_t lino = 0;
319✔
1940
  //STREAMTODO 
1941
  
1942
  pInfo->runnerDeploys = MND_STREAM_RUNNER_DEPLOY_NUM;
319✔
1943
  pInfo->runnerReplica = MND_STREAM_RUNNER_REPLICA_NUM;
319✔
1944

1945
_exit:
319✔
1946

1947
  if (code) {
319!
1948
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
1949
  }
1950

1951
  return code;
319✔
1952
}
1953

1954

1955
static int32_t msmBuildRunnerTasks(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
341✔
1956
  if (NULL == pStream->pCreate->calcPlan) {
341✔
1957
    return TSDB_CODE_SUCCESS;
1✔
1958
  }
1959
  
1960
  int32_t code = TSDB_CODE_SUCCESS;
340✔
1961
  int32_t lino = 0;
340✔
1962
  int64_t streamId = pStream->pCreate->streamId;
340✔
1963
  SQueryPlan* pPlan = NULL;
340✔
1964

1965
  TAOS_CHECK_EXIT(nodesStringToNode(pStream->pCreate->calcPlan, (SNode**)&pPlan));
340!
1966

1967
  for (int32_t i = 0; i < pInfo->runnerDeploys; ++i) {
1,360✔
1968
    pInfo->runners[i] = taosArrayInit(pPlan->numOfSubplans, sizeof(SStmTaskStatus));
1,020✔
1969
    TSDB_CHECK_NULL(pInfo->runners[i], code, lino, _exit, terrno);
1,020!
1970
  }
1971

1972
  code = msmBuildRunnerTasksImpl(pCtx, pPlan, pInfo, pStream);
340✔
1973
  pPlan = NULL;
340✔
1974
  
1975
  TAOS_CHECK_EXIT(code);
340!
1976

1977
  taosHashClear(mStreamMgmt.toUpdateScanMap);
340✔
1978
  mStreamMgmt.toUpdateScanNum = 0;
340✔
1979

1980
_exit:
340✔
1981

1982
  nodesDestroyNode((SNode *)pPlan);
340✔
1983

1984
  if (code) {
340!
1985
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
1986
  }
1987

1988
  return code;
340✔
1989
}
1990

1991

1992
static int32_t msmBuildStreamTasks(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
341✔
1993
  int32_t code = TSDB_CODE_SUCCESS;
341✔
1994
  int32_t lino = 0;
341✔
1995
  int64_t streamId = pStream->pCreate->streamId;
341✔
1996

1997
  mstsInfo("start to deploy stream tasks, deployTimes:%" PRId64, pInfo->deployTimes);
341!
1998

1999
  pCtx->triggerTaskId = msmAssignTaskId();
341✔
2000
  pCtx->triggerNodeId = msmAssignTaskSnodeId(pCtx->pMnode, pStream, true);
341✔
2001
  if (!GOT_SNODE(pCtx->triggerNodeId)) {
341!
2002
    TAOS_CHECK_EXIT(terrno);
×
2003
  }
2004

2005
  TAOS_CHECK_EXIT(msmBuildReaderTasks(pCtx, pInfo, pStream));
341!
2006
  TAOS_CHECK_EXIT(msmBuildRunnerTasks(pCtx, pInfo, pStream));
341!
2007
  TAOS_CHECK_EXIT(msmBuildTriggerTasks(pCtx, pInfo, pStream));
341!
2008
  
2009
_exit:
341✔
2010

2011
  if (code) {
341!
2012
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
2013
  }
2014

2015
  return code;
341✔
2016
}
2017

2018
static int32_t msmInitTrigReaderList(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
×
2019
  int32_t code = TSDB_CODE_SUCCESS;
×
2020
  int32_t lino = 0;
×
2021
  int64_t streamId = pStream->pCreate->streamId;
×
2022
  SSdb   *pSdb = pCtx->pMnode->pSdb;
×
2023
  SStmTaskStatus* pState = NULL;
×
2024
  SDbObj* pDb = NULL;
×
2025
  
2026
  switch (pStream->pCreate->triggerTblType) {
×
2027
    case TSDB_NORMAL_TABLE:
×
2028
    case TSDB_CHILD_TABLE:
2029
    case TSDB_VIRTUAL_CHILD_TABLE:
2030
    case TSDB_VIRTUAL_NORMAL_TABLE: {
2031
      pInfo->trigReaders = taosArrayInit_s(sizeof(SStmTaskStatus), 1);
×
2032
      TSDB_CHECK_NULL(pInfo->trigReaders, code, lino, _exit, terrno);
×
2033
      pInfo->trigReaderNum = 1;
×
2034
      break;
×
2035
    }
2036
    case TSDB_SUPER_TABLE: {
×
2037
      pDb = mndAcquireDb(pCtx->pMnode, pStream->pCreate->triggerDB);
×
2038
      if (NULL == pDb) {
×
2039
        code = terrno;
×
2040
        mstsError("failed to acquire db %s, error:%s", pStream->pCreate->triggerDB, terrstr());
×
2041
        goto _exit;
×
2042
      }
2043

2044
      pInfo->trigReaders = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SStmTaskStatus));
×
2045
      TSDB_CHECK_NULL(pInfo->trigReaders, code, lino, _exit, terrno);
×
2046
      pInfo->trigReaderNum = pDb->cfg.numOfVgroups;
×
2047
      mndReleaseDb(pCtx->pMnode, pDb);
×
2048
      pDb = NULL;
×
2049
      break;
×
2050
    }
2051
    default:
×
2052
      pInfo->trigReaderNum = 0;
×
2053
      mstsDebug("%s ignore triggerTblType %d", __FUNCTION__, pStream->pCreate->triggerTblType);
×
2054
      break;
×
2055
  }
2056

2057
_exit:
×
2058

2059
  if (code) {
×
2060
    mndReleaseDb(pCtx->pMnode, pDb);
×
2061
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
2062
  }
2063

2064
  return code;
×
2065
}
2066

2067

2068
static int32_t msmInitStmStatus(SStmGrpCtx* pCtx, SStmStatus* pStatus, SStreamObj* pStream, bool initList) {
320✔
2069
  int32_t code = TSDB_CODE_SUCCESS;
320✔
2070
  int32_t lino = 0;
320✔
2071
  int64_t streamId = pStream->pCreate->streamId;
320✔
2072

2073
  pStatus->lastActionTs = INT64_MIN;
320✔
2074

2075
  if (NULL == pStatus->streamName) {
320!
2076
    pStatus->streamName = taosStrdup(pStream->name);
320!
2077
    TSDB_CHECK_NULL(pStatus->streamName, code, lino, _exit, terrno);
320!
2078
  }
2079

2080
  TAOS_CHECK_EXIT(tCloneStreamCreateDeployPointers(pStream->pCreate, &pStatus->pCreate));
320!
2081
  
2082
  if (pStream->pCreate->numOfCalcSubplan > 0) {
320✔
2083
    pStatus->runnerNum = pStream->pCreate->numOfCalcSubplan;
319✔
2084
    
2085
    TAOS_CHECK_EXIT(msmSetStreamRunnerExecReplica(streamId, pStatus));
319!
2086
  }
2087

2088
  if (initList) {
320!
2089
    TAOS_CHECK_EXIT(msmInitTrigReaderList(pCtx, pStatus, pStream));
×
2090

2091
    int32_t subPlanNum = taosArrayGetSize(pStream->pCreate->calcScanPlanList);
×
2092
    if (subPlanNum > 0) {
×
2093
      pStatus->calcReaderNum = subPlanNum;
×
2094
      pStatus->calcReaders = tdListNew(sizeof(SStmTaskStatus));
×
2095
      TSDB_CHECK_NULL(pStatus->calcReaders, code, lino, _exit, terrno);
×
2096
    }
2097

2098
    if (pStatus->runnerNum > 0) {
×
2099
      for (int32_t i = 0; i < pStatus->runnerDeploys; ++i) {
×
2100
        pStatus->runners[i] = taosArrayInit(pStatus->runnerNum, sizeof(SStmTaskStatus));
×
2101
        TSDB_CHECK_NULL(pStatus->runners[i], code, lino, _exit, terrno);
×
2102
      }
2103
    }
2104
  }
2105
  
2106
_exit:
320✔
2107

2108
  if (code) {
320!
2109
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
2110
  }
2111

2112
  return code;
320✔
2113
}
2114

2115
static int32_t msmDeployStreamTasks(SStmGrpCtx* pCtx, SStreamObj* pStream, SStmStatus* pStatus) {
341✔
2116
  int32_t code = TSDB_CODE_SUCCESS;
341✔
2117
  int32_t lino = 0;
341✔
2118
  int64_t streamId = pStream->pCreate->streamId;
341✔
2119
  SStmStatus info = {0};
341✔
2120

2121
  if (NULL == pStatus) {
341✔
2122
    TAOS_CHECK_EXIT(msmInitStmStatus(pCtx, &info, pStream, false));
320!
2123

2124
    TAOS_CHECK_EXIT(taosHashPut(mStreamMgmt.streamMap, &streamId, sizeof(streamId), &info, sizeof(info)));
320!
2125

2126
    pStatus = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
320✔
2127
  }
2128
  
2129
  TAOS_CHECK_EXIT(msmBuildStreamTasks(pCtx, pStatus, pStream));
341!
2130

2131
  mstLogSStmStatus("stream deployed", streamId, pStatus);
341✔
2132

2133
_exit:
341✔
2134

2135
  if (code) {
341!
2136
    if (NULL != pStatus) {
×
2137
      msmStopStreamByError(streamId, pStatus, code, pCtx->currTs);
×
2138
      mstsError("stream build error:%s, will try to stop current stream", tstrerror(code));
×
2139
    }
2140
    
2141
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
2142
  }
2143

2144
  return code;
341✔
2145
}
2146

2147

2148
static int32_t msmSTRemoveStream(int64_t streamId, bool fromStreamMap) {
23✔
2149
  int32_t code = TSDB_CODE_SUCCESS;
23✔
2150
  void* pIter = NULL;
23✔
2151

2152
  while ((pIter = taosHashIterate(mStreamMgmt.toDeployVgMap, pIter))) {
39✔
2153
    SStmVgTasksToDeploy* pVg = (SStmVgTasksToDeploy*)pIter;
16✔
2154
    (void)mstWaitLock(&pVg->lock, true);
16✔
2155

2156
    int32_t taskNum = taosArrayGetSize(pVg->taskList);
16✔
2157
    if (atomic_load_32(&pVg->deployed) == taskNum) {
16!
2158
      taosRUnLockLatch(&pVg->lock);
×
2159
      continue;
×
2160
    }
2161

2162
    for (int32_t i = 0; i < taskNum; ++i) {
78✔
2163
      SStmTaskToDeployExt* pExt = taosArrayGet(pVg->taskList, i);
62✔
2164
      if (pExt->deployed || pExt->deploy.task.streamId != streamId) {
62✔
2165
        continue;
56✔
2166
      }
2167

2168
      mstDestroySStmTaskToDeployExt(pExt);
6✔
2169
      pExt->deployed = true;
6✔
2170
    }
2171
    
2172
    taosRUnLockLatch(&pVg->lock);
16✔
2173
  }
2174

2175
  while ((pIter = taosHashIterate(mStreamMgmt.toDeploySnodeMap, pIter))) {
52✔
2176
    SStmSnodeTasksDeploy* pSnode = (SStmSnodeTasksDeploy*)pIter;
29✔
2177
    (void)mstWaitLock(&pSnode->lock, true);
29✔
2178

2179
    int32_t taskNum = taosArrayGetSize(pSnode->triggerList);
29✔
2180
    if (atomic_load_32(&pSnode->triggerDeployed) != taskNum) {
29✔
2181
      for (int32_t i = 0; i < taskNum; ++i) {
52✔
2182
        SStmTaskToDeployExt* pExt = taosArrayGet(pSnode->triggerList, i);
32✔
2183
        if (pExt->deployed || pExt->deploy.task.streamId != streamId) {
32✔
2184
          continue;
29✔
2185
        }
2186
        
2187
        mstDestroySStmTaskToDeployExt(pExt);
3✔
2188
        pExt->deployed = true;
3✔
2189
      }
2190
    }
2191

2192
    taskNum = taosArrayGetSize(pSnode->runnerList);
29✔
2193
    if (atomic_load_32(&pSnode->runnerDeployed) != taskNum) {
29!
2194
      for (int32_t i = 0; i < taskNum; ++i) {
125✔
2195
        SStmTaskToDeployExt* pExt = taosArrayGet(pSnode->runnerList, i);
96✔
2196
        if (pExt->deployed || pExt->deploy.task.streamId != streamId) {
96✔
2197
          continue;
87✔
2198
        }
2199
        
2200
        mstDestroySStmTaskToDeployExt(pExt);
9✔
2201
        pExt->deployed = true;
9✔
2202
      }
2203
    }
2204

2205
    taosRUnLockLatch(&pSnode->lock);
29✔
2206
  }
2207

2208
  
2209
  while ((pIter = taosHashIterate(mStreamMgmt.snodeMap, pIter))) {
79✔
2210
    SStmSnodeStatus* pSnode = (SStmSnodeStatus*)pIter;
56✔
2211
    code = taosHashRemove(pSnode->streamTasks, &streamId, sizeof(streamId));
56✔
2212
    if (TSDB_CODE_SUCCESS == code) {
56✔
2213
      mstsDebug("stream removed from snodeMap %d, remainStreams:%d", *(int32_t*)taosHashGetKey(pIter, NULL), (int32_t)taosHashGetSize(pSnode->streamTasks));
33✔
2214
    }
2215
  }
2216

2217
  while ((pIter = taosHashIterate(mStreamMgmt.vgroupMap, pIter))) {
101✔
2218
    SStmVgroupStatus* pVg = (SStmVgroupStatus*)pIter;
78✔
2219
    code = taosHashRemove(pVg->streamTasks, &streamId, sizeof(streamId));
78✔
2220
    if (TSDB_CODE_SUCCESS == code) {
78✔
2221
      mstsDebug("stream removed from vgroupMap %d, remainStreams:%d", *(int32_t*)taosHashGetKey(pIter, NULL), (int32_t)taosHashGetSize(pVg->streamTasks));
34✔
2222
    }
2223
  }
2224

2225
  size_t keyLen = 0;
23✔
2226
  while ((pIter = taosHashIterate(mStreamMgmt.taskMap, pIter))) {
1,248✔
2227
    int64_t* pStreamId = taosHashGetKey(pIter, &keyLen);
1,225✔
2228
    if (*pStreamId == streamId) {
1,225✔
2229
      int64_t taskId = *(pStreamId + 1);
147✔
2230
      code = taosHashRemove(mStreamMgmt.taskMap, pStreamId, keyLen);
147✔
2231
      if (code) {
147!
2232
        mstsError("TASK:%" PRIx64 " remove from taskMap failed, error:%s", taskId, tstrerror(code));
×
2233
      } else {
2234
        mstsDebug("TASK:%" PRIx64 " removed from taskMap", taskId);
147✔
2235
      }
2236
    }
2237
  }
2238

2239
  if (fromStreamMap) {
23✔
2240
    code = taosHashRemove(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
2✔
2241
    if (code) {
2!
2242
      mstsError("stream remove from streamMap failed, error:%s", tstrerror(code));
×
2243
    } else {
2244
      mstsDebug("stream removed from streamMap, remains:%d", taosHashGetSize(mStreamMgmt.streamMap));
2✔
2245
    }
2246
  }
2247
  
2248
  return code;
23✔
2249
}
2250

2251
static void msmResetStreamForRedeploy(int64_t streamId, SStmStatus* pStatus) {
21✔
2252
  mstsInfo("try to reset stream for redeploy, stopped:%d, current deployTimes:%" PRId64, atomic_load_8(&pStatus->stopped), pStatus->deployTimes);
21!
2253
  
2254
  (void)msmSTRemoveStream(streamId, false);  
21✔
2255

2256
  mstResetSStmStatus(pStatus);
21✔
2257

2258
  pStatus->deployTimes++;
21✔
2259
}
21✔
2260

2261
static int32_t msmLaunchStreamDeployAction(SStmGrpCtx* pCtx, SStmStreamAction* pAction) {
343✔
2262
  int32_t code = TSDB_CODE_SUCCESS;
343✔
2263
  int32_t lino = 0;
343✔
2264
  int64_t streamId = pAction->streamId;
343✔
2265
  char* streamName = pAction->streamName;
343✔
2266
  SStreamObj* pStream = NULL;
343✔
2267
  int8_t stopped = 0;
343✔
2268

2269
  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
343✔
2270
  if (pStatus) {
343✔
2271
    stopped = atomic_load_8(&pStatus->stopped);
21✔
2272
    if (0 == stopped) {
21✔
2273
      mstsDebug("stream %s will try to reset and redeploy it", pAction->streamName);
3!
2274
      msmResetStreamForRedeploy(streamId, pStatus);
3✔
2275
    } else {
2276
      if (MST_IS_USER_STOPPED(stopped) && !pAction->userAction) {
18!
2277
        mstsWarn("stream %s already stopped by user, stopped:%d, ignore deploy it", pAction->streamName, stopped);
×
2278
        return code;
×
2279
      }
2280
      
2281
      if (stopped == atomic_val_compare_exchange_8(&pStatus->stopped, stopped, 0)) {
18!
2282
        mstsDebug("stream %s will try to reset and redeploy it from stopped %d", pAction->streamName, stopped);
18✔
2283
        msmResetStreamForRedeploy(streamId, pStatus);
18✔
2284
      }
2285
    }
2286
  }
2287

2288
  code = mndAcquireStream(pCtx->pMnode, streamName, &pStream);
343✔
2289
  if (TSDB_CODE_MND_STREAM_NOT_EXIST == code) {
343✔
2290
    mstsWarn("stream %s no longer exists, ignore deploy", streamName);
2!
2291
    return TSDB_CODE_SUCCESS;
2✔
2292
  }
2293

2294
  TAOS_CHECK_EXIT(code);
341!
2295

2296
  if (pStatus && pStream->pCreate->streamId != streamId) {
341!
2297
    mstsWarn("stream %s already dropped by user, ignore deploy it", pAction->streamName);
×
2298
    atomic_store_8(&pStatus->stopped, 2);
×
2299
    mstsInfo("set stream %s stopped by user since streamId mismatch", streamName);
×
2300
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_NOT_EXIST);
×
2301
  }
2302

2303
  int8_t userStopped = atomic_load_8(&pStream->userStopped);
341✔
2304
  int8_t userDropped = atomic_load_8(&pStream->userDropped);
341✔
2305
  if (userStopped || userDropped) {
341!
2306
    mstsWarn("stream %s is stopped %d or removing %d, ignore deploy", streamName, userStopped, userDropped);
×
2307
    goto _exit;
×
2308
  }
2309
  
2310
  TAOS_CHECK_EXIT(msmDeployStreamTasks(pCtx, pStream, pStatus));
341!
2311

2312
_exit:
341✔
2313

2314
  mndReleaseStream(pCtx->pMnode, pStream);
341✔
2315

2316
  if (code) {
341!
2317
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
2318
  }
2319

2320
  return code;
341✔
2321
}
2322

2323
static int32_t msmReLaunchReaderTask(SStreamObj* pStream, SStmTaskAction* pAction, SStmStatus* pStatus) {
8✔
2324
  int32_t code = TSDB_CODE_SUCCESS;
8✔
2325
  int32_t lino = 0;
8✔
2326
  int64_t streamId = pAction->streamId;
8✔
2327
  SStmTaskStatus** ppTask = taosHashGet(mStreamMgmt.taskMap, &pAction->streamId, sizeof(pAction->streamId) + sizeof(pAction->id.taskId));
8✔
2328
  if (NULL == ppTask) {
8!
2329
    mstsError("TASK:%" PRId64 " not in taskMap, remain:%d", pAction->id.taskId, taosHashGetSize(mStreamMgmt.taskMap));
×
2330
    TAOS_CHECK_EXIT(TSDB_CODE_STREAM_INTERNAL_ERROR);
×
2331
  }
2332
  
2333
  SStmTaskDeploy info = {0};
8✔
2334
  info.task.type = pAction->type;
8✔
2335
  info.task.streamId = pAction->streamId;
8✔
2336
  info.task.taskId = pAction->id.taskId;
8✔
2337
  info.task.seriousId = (*ppTask)->id.seriousId;
8✔
2338
  info.task.nodeId = pAction->id.nodeId;
8✔
2339
  info.task.taskIdx = pAction->id.taskIdx;
8✔
2340
  
2341
  bool isTriggerReader = STREAM_IS_TRIGGER_READER(pAction->flag);
8✔
2342
  SStreamCalcScan* scanPlan = NULL;
8✔
2343
  if (!isTriggerReader) {
8✔
2344
    scanPlan = taosArrayGet(pStatus->pCreate->calcScanPlanList, pAction->id.taskIdx);
4✔
2345
    if (NULL == scanPlan) {
4!
2346
      mstsError("fail to get TASK:%" PRId64 " scanPlan, taskIdx:%d, scanPlanNum:%zu", 
×
2347
          pAction->id.taskId, pAction->id.taskIdx, taosArrayGetSize(pStatus->pCreate->calcScanPlanList));
2348
      TAOS_CHECK_EXIT(TSDB_CODE_STREAM_INTERNAL_ERROR);
×
2349
    }
2350
  }
2351
  
2352
  TAOS_CHECK_EXIT(msmBuildReaderDeployInfo(&info, scanPlan ? scanPlan->scanPlan : NULL, pStatus, isTriggerReader));
8!
2353
  TAOS_CHECK_EXIT(msmTDAddToVgroupMap(mStreamMgmt.toDeployVgMap, &info, pAction->streamId));
8!
2354

2355
_exit:
8✔
2356

2357
  if (code) {
8!
2358
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
2359
  }
2360

2361
  return code;
8✔
2362
}
2363

2364
/*
2365
static int32_t msmReLaunchTriggerTask(SStmGrpCtx* pCtx, SStreamObj* pStream, SStmTaskAction* pAction, SStmStatus* pStatus) {
2366
  int32_t code = TSDB_CODE_SUCCESS;
2367
  int32_t lino = 0;
2368
  int64_t streamId = pAction->streamId;
2369
  SStmTaskStatus** ppTask = taosHashGet(mStreamMgmt.taskMap, &pAction->streamId, sizeof(pAction->streamId) + sizeof(pAction->id.taskId));
2370
  if (NULL == ppTask) {
2371
    mstsError("TASK:%" PRId64 " not in taskMap, remain:%d", pAction->id.taskId, taosHashGetSize(mStreamMgmt.taskMap));
2372
    TAOS_CHECK_EXIT(TSDB_CODE_STREAM_INTERNAL_ERROR);
2373
  }
2374
  
2375
  (*ppTask)->id.nodeId = msmAssignTaskSnodeId(pCtx->pMnode, pStream, true);
2376
  if (!GOT_SNODE((*ppTask)->id.nodeId)) {
2377
    mstsError("no avaible snode for deploying trigger task, seriousId: %" PRId64, (*ppTask)->id.seriousId);
2378
    return TSDB_CODE_SUCCESS;
2379
  }
2380
  
2381
  SStmTaskDeploy info = {0};
2382
  info.task.type = pAction->type;
2383
  info.task.streamId = streamId;
2384
  info.task.taskId = pAction->id.taskId;
2385
  info.task.seriousId = (*ppTask)->id.seriousId;
2386
  info.task.nodeId = (*ppTask)->id.nodeId;
2387
  info.task.taskIdx = pAction->id.taskIdx;
2388
  
2389
  TAOS_CHECK_EXIT(msmBuildTriggerDeployInfo(pCtx->pMnode, pStatus, &info, pStream));
2390
  TAOS_CHECK_EXIT(msmTDAddTriggerToSnodeMap(&info, pStream));
2391
  TAOS_CHECK_EXIT(msmSTAddToSnodeMap(pCtx, streamId, NULL, *ppTask, 1, -1));
2392
  
2393
  atomic_add_fetch_32(&mStreamMgmt.toDeploySnodeTaskNum, 1);
2394

2395
_exit:
2396

2397
  if (code) {
2398
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
2399
  }
2400

2401
  return code;
2402
}
2403
*/
2404

2405
static int32_t msmReLaunchRunnerDeploy(SStmGrpCtx* pCtx, SStreamObj* pStream, SStmTaskAction* pAction, SStmStatus* pStatus) {
2✔
2406
  int32_t code = TSDB_CODE_SUCCESS;
2✔
2407
  int32_t lino = 0;
2✔
2408
  int64_t streamId = pAction->streamId;
2✔
2409
  
2410
/*
2411
  if (pAction->triggerStatus) {
2412
    pCtx->triggerTaskId = pAction->triggerStatus->id.taskId;
2413
    pAction->triggerStatus->id.nodeId = msmAssignTaskSnodeId(pCtx->pMnode, pStream, true);
2414
    if (!GOT_SNODE(pAction->triggerStatus->id.nodeId)) {
2415
      mstsError("no avaible snode for deploying trigger task, seriousId:%" PRId64, pAction->triggerStatus->id.seriousId);
2416
      return TSDB_CODE_SUCCESS;
2417
    }
2418
  
2419
    pCtx->triggerNodeId = pAction->triggerStatus->id.nodeId;
2420
  } else {
2421
*/
2422
  pCtx->triggerTaskId = pStatus->triggerTask->id.taskId;
2✔
2423
  pCtx->triggerNodeId = pStatus->triggerTask->id.nodeId;
2✔
2424
//  }
2425
  
2426
  TAOS_CHECK_EXIT(msmUPPrepareReaderTasks(pCtx, pStatus, pStream));
2!
2427
  
2428
  SQueryPlan* pPlan = NULL;
2✔
2429
  TAOS_CHECK_EXIT(nodesStringToNode(pStream->pCreate->calcPlan, (SNode**)&pPlan));
2!
2430
  
2431
  TAOS_CHECK_EXIT(msmReBuildRunnerTasks(pCtx, pPlan, pStatus, pStream, pAction));
2!
2432
  
2433
  taosHashClear(mStreamMgmt.toUpdateScanMap);
2✔
2434
  mStreamMgmt.toUpdateScanNum = 0;
2✔
2435
  
2436
/*
2437
  if (pAction->triggerStatus) {
2438
    SStmTaskDeploy info = {0};
2439
    info.task.type = STREAM_TRIGGER_TASK;
2440
    info.task.streamId = streamId;
2441
    info.task.taskId = pCtx->triggerTaskId;
2442
    info.task.seriousId = pAction->triggerStatus->id.seriousId;
2443
    info.task.nodeId = pCtx->triggerNodeId;
2444
    info.task.taskIdx = 0;
2445
  
2446
    TAOS_CHECK_EXIT(msmBuildTriggerDeployInfo(pCtx->pMnode, pStatus, &info, pStream));
2447
    TAOS_CHECK_EXIT(msmTDAddTriggerToSnodeMap(&info, pStream));
2448
    TAOS_CHECK_EXIT(msmSTAddToSnodeMap(pCtx, streamId, NULL, pAction->triggerStatus, 1, -1));
2449
    
2450
    atomic_add_fetch_32(&mStreamMgmt.toDeploySnodeTaskNum, 1);
2451
  }
2452
*/
2453

2454
_exit:
2✔
2455

2456
  if (code) {
2!
2457
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
2458
  }
2459

2460
  return code;
2✔
2461
}
2462

2463

2464
static int32_t msmLaunchTaskDeployAction(SStmGrpCtx* pCtx, SStmTaskAction* pAction) {
10✔
2465
  int32_t code = TSDB_CODE_SUCCESS;
10✔
2466
  int32_t lino = 0;
10✔
2467
  int64_t streamId = pAction->streamId;
10✔
2468
  int64_t taskId = pAction->id.taskId;
10✔
2469
  SStreamObj* pStream = NULL;
10✔
2470

2471
  mstsDebug("start to handle stream tasks action, action task type:%s", gStreamTaskTypeStr[pAction->type]);
10!
2472

2473
  SStmStatus* pStatus = taosHashGet(mStreamMgmt.streamMap, &pAction->streamId, sizeof(pAction->streamId));
10✔
2474
  if (NULL == pStatus) {
10!
2475
    mstsWarn("stream not in streamMap, remain:%d", taosHashGetSize(mStreamMgmt.streamMap));
×
2476
    return TSDB_CODE_SUCCESS;
×
2477
  }
2478

2479
  int8_t stopped = atomic_load_8(&pStatus->stopped);
10✔
2480
  if (stopped) {
10!
2481
    mstsWarn("stream %s is already stopped %d, ignore task deploy", pStatus->streamName, stopped);
×
2482
    return TSDB_CODE_SUCCESS;
×
2483
  }
2484

2485
  code = mndAcquireStream(pCtx->pMnode, pStatus->streamName, &pStream);
10✔
2486
  if (TSDB_CODE_MND_STREAM_NOT_EXIST == code) {
10!
2487
    mstsWarn("stream %s no longer exists, ignore task deploy", pStatus->streamName);
×
2488
    return TSDB_CODE_SUCCESS;
×
2489
  }
2490

2491
  TAOS_CHECK_EXIT(code);
10!
2492

2493
  int8_t userStopped = atomic_load_8(&pStream->userStopped);
10✔
2494
  int8_t userDropped = atomic_load_8(&pStream->userDropped);
10✔
2495
  if (userStopped || userDropped) {
10!
2496
    mstsWarn("stream %s is stopped %d or removing %d, ignore task deploy", pStatus->streamName, userStopped, userDropped);
×
2497
    goto _exit;
×
2498
  }
2499

2500
  switch (pAction->type) {
10!
2501
    case STREAM_READER_TASK:
8✔
2502
      TAOS_CHECK_EXIT(msmReLaunchReaderTask(pStream, pAction, pStatus));
8!
2503
      break;
8✔
2504
/*
2505
    case STREAM_TRIGGER_TASK:
2506
      TAOS_CHECK_EXIT(msmReLaunchTriggerTask(pCtx, pStream, pAction, pStatus));
2507
      break;
2508
*/
2509
    case STREAM_RUNNER_TASK:
2✔
2510
      if (pAction->multiRunner) {
2!
2511
        TAOS_CHECK_EXIT(msmReLaunchRunnerDeploy(pCtx, pStream, pAction, pStatus));
2!
2512
      } else {
2513
        mstsError("runner TASK:%" PRId64 " requires relaunch", pAction->id.taskId);
×
2514
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
2515
      }
2516
      break;
2✔
2517
    default:
×
2518
      mstsError("TASK:%" PRId64 " invalid task type:%d", pAction->id.taskId, pAction->type);
×
2519
      TAOS_CHECK_EXIT(TSDB_CODE_STREAM_INTERNAL_ERROR);
×
2520
      break;
×
2521
  }
2522

2523
_exit:
10✔
2524

2525
  if (pStream) {
10!
2526
    mndReleaseStream(pCtx->pMnode, pStream);
10✔
2527
  }
2528

2529
  if (code) {
10!
2530
    msmStopStreamByError(streamId, pStatus, code, pCtx->currTs);
×
2531
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
2532
  }
2533

2534
  return code;
10✔
2535
}
2536

2537
static int32_t msmTDRemoveStream(int64_t streamId) {
×
2538
  void* pIter = NULL;
×
2539
  
2540
  if (atomic_load_32(&mStreamMgmt.toDeployVgTaskNum) > 0) {
×
2541
    while ((pIter = taosHashIterate(mStreamMgmt.toDeployVgMap, pIter))) {
×
2542
      SStmVgTasksToDeploy* pVg = (SStmVgTasksToDeploy*)pIter;
×
2543
      int32_t taskNum = taosArrayGetSize(pVg->taskList);
×
2544
      if (atomic_load_32(&pVg->deployed) == taskNum) {
×
2545
        continue;
×
2546
      }
2547
      
2548
      for (int32_t i = 0; i < taskNum; ++i) {
×
2549
        SStmTaskToDeployExt* pExt = taosArrayGet(pVg->taskList, i);
×
2550
        if (pExt->deploy.task.streamId == streamId && !pExt->deployed) {
×
2551
          pExt->deployed = true;
×
2552
        }
2553
      }
2554
    }
2555
  }
2556

2557
  if (atomic_load_32(&mStreamMgmt.toDeploySnodeTaskNum) > 0) {
×
2558
    while ((pIter = taosHashIterate(mStreamMgmt.toDeploySnodeMap, pIter))) {
×
2559
      SStmSnodeTasksDeploy* pSnode = (SStmSnodeTasksDeploy*)pIter;
×
2560
      int32_t taskNum = taosArrayGetSize(pSnode->triggerList);
×
2561
      if (atomic_load_32(&pSnode->triggerDeployed) != taskNum) {
×
2562
        for (int32_t i = 0; i < taskNum; ++i) {
×
2563
          SStmTaskToDeployExt* pExt = taosArrayGet(pSnode->triggerList, i);
×
2564
          if (pExt->deploy.task.streamId == streamId && !pExt->deployed) {
×
2565
            pExt->deployed = true;
×
2566
          }
2567
        }
2568
      }
2569

2570
      taskNum = taosArrayGetSize(pSnode->runnerList);
×
2571
      if (atomic_load_32(&pSnode->runnerDeployed) != taskNum) {
×
2572
        for (int32_t i = 0; i < taskNum; ++i) {
×
2573
          SStmTaskToDeployExt* pExt = taosArrayGet(pSnode->runnerList, i);
×
2574
          if (pExt->deploy.task.streamId == streamId && !pExt->deployed) {
×
2575
            pExt->deployed = true;
×
2576
          }
2577
        }
2578
      }
2579
    }
2580
  }
2581

2582
  return TSDB_CODE_SUCCESS;
×
2583
}
2584

2585
static int32_t msmRemoveStreamFromMaps(SMnode* pMnode, int64_t streamId) {
2✔
2586
  int32_t code = TSDB_CODE_SUCCESS;
2✔
2587
  int32_t lino = 0;
2✔
2588

2589
  mstsInfo("start to remove stream from maps, current stream num:%d", taosHashGetSize(mStreamMgmt.streamMap));
2!
2590

2591
  TAOS_CHECK_EXIT(msmSTRemoveStream(streamId, true));
2!
2592

2593
_exit:
2✔
2594

2595
  if (code) {
2!
2596
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
2597
  } else {
2598
    mstsInfo("end remove stream from maps, current stream num:%d", taosHashGetSize(mStreamMgmt.streamMap));
2!
2599
  }
2600

2601
  return code;
2✔
2602
}
2603

2604
void msmUndeployStream(SMnode* pMnode, int64_t streamId, char* streamName) {
35✔
2605
  int32_t code = TSDB_CODE_SUCCESS;
35✔
2606
  int32_t lino = 0;
35✔
2607

2608
  int8_t active = atomic_load_8(&mStreamMgmt.active), state = atomic_load_8(&mStreamMgmt.state);
35✔
2609
  if (0 == active || MND_STM_STATE_NORMAL != state) {
35!
2610
    mstsError("stream mgmt not available since active:%d state:%d", active, state);
×
2611
    return;
×
2612
  }
2613

2614
  SStmStatus* pStream = (SStmStatus*)taosHashAcquire(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
35✔
2615
  if (NULL == pStream) {
35✔
2616
    mstsInfo("stream %s already not in streamMap", streamName);
2!
2617
    goto _exit;
2✔
2618
  }
2619

2620
  atomic_store_8(&pStream->stopped, 2);
33✔
2621

2622
  mstsInfo("set stream %s stopped by user", streamName);
33!
2623

2624
_exit:
×
2625

2626
  taosHashRelease(mStreamMgmt.streamMap, pStream);
35✔
2627

2628
  if (code) {
35!
2629
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
2630
  }
2631

2632
  return;
35✔
2633
}
2634

2635
int32_t msmRecalcStream(SMnode* pMnode, int64_t streamId, STimeWindow* timeRange) {
17✔
2636
  int32_t code = TSDB_CODE_SUCCESS;
17✔
2637
  int32_t lino = 0;
17✔
2638

2639
  int8_t active = atomic_load_8(&mStreamMgmt.active), state = atomic_load_8(&mStreamMgmt.state);
17✔
2640
  if (0 == active || MND_STM_STATE_NORMAL != state) {
17!
2641
    mstsError("stream mgmt not available since active:%d state:%d", active, state);
×
2642
    return TSDB_CODE_MND_STREAM_NOT_AVAILABLE;
×
2643
  }
2644

2645
  SStmStatus* pStream = (SStmStatus*)taosHashAcquire(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
17✔
2646
  if (NULL == pStream || !STREAM_IS_RUNNING(pStream)) {
17!
2647
    code = TSDB_CODE_MND_STREAM_NOT_RUNNING;
×
2648
    mstsInfo("stream still not in streamMap, streamRemains:%d", taosHashGetSize(mStreamMgmt.streamMap));
×
2649
    goto _exit;
×
2650
  }
2651

2652
  TAOS_CHECK_EXIT(mstAppendNewRecalcRange(streamId, pStream, timeRange));
17!
2653

2654
_exit:
17✔
2655

2656
  taosHashRelease(mStreamMgmt.streamMap, pStream);
17✔
2657

2658
  if (code) {
17!
2659
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
2660
  }
2661

2662
  return code;
17✔
2663
}
2664

2665
static void msmHandleStreamActions(SStmGrpCtx* pCtx) {
89✔
2666
  int32_t code = TSDB_CODE_SUCCESS;
89✔
2667
  int32_t lino = 0;
89✔
2668
  SStmQNode* pQNode = NULL;
89✔
2669

2670
  while (mndStreamActionDequeue(mStreamMgmt.actionQ, &pQNode)) {
442✔
2671
    switch (pQNode->type) {
353!
2672
      case STREAM_ACT_DEPLOY:
353✔
2673
        if (pQNode->streamAct) {
353✔
2674
          mstDebug("start to handle stream deploy action");
343✔
2675
          TAOS_CHECK_EXIT(msmLaunchStreamDeployAction(pCtx, &pQNode->action.stream));
343!
2676
        } else {
2677
          mstDebug("start to handle task deploy action");
10!
2678
          TAOS_CHECK_EXIT(msmLaunchTaskDeployAction(pCtx, &pQNode->action.task));
10!
2679
        }
2680
        break;
353✔
2681
      default:
×
2682
        break;
×
2683
    }
2684
  }
2685

2686
_exit:
89✔
2687

2688
  if (code) {
89!
2689
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
2690
  }
2691
}
89✔
2692

2693
void msmStopAllStreamsByGrant(int32_t errCode) {
×
2694
  SStmStatus* pStatus = NULL;
×
2695
  void* pIter = NULL;
×
2696
  int64_t streamId = 0;
×
2697
  
2698
  while (true) {
2699
    pIter = taosHashIterate(mStreamMgmt.streamMap, pIter);
×
2700
    if (NULL == pIter) {
×
2701
      break;
×
2702
    }
2703

2704
    pStatus = (SStmStatus*)pIter;
×
2705

2706
    streamId = *(int64_t*)taosHashGetKey(pIter, NULL);
×
2707
    atomic_store_8(&pStatus->stopped, 4);
×
2708

2709
    mstsInfo("set stream stopped since %s", tstrerror(errCode));
×
2710
  }
2711
}
×
2712

2713
int32_t msmHandleGrantExpired(SMnode *pMnode, int32_t errCode) {
×
2714
  mstInfo("stream grant expired");
×
2715

2716
  if (0 == atomic_load_8(&mStreamMgmt.active)) {
×
2717
    mstWarn("mnode stream is NOT active, ignore handling");
×
2718
    return errCode;
×
2719
  }
2720

2721
  (void)mstWaitLock(&mStreamMgmt.runtimeLock, true);
×
2722

2723
  msmStopAllStreamsByGrant(errCode);
×
2724

2725
  taosRUnLockLatch(&mStreamMgmt.runtimeLock);
×
2726
  
2727
  return errCode;
×
2728
}
2729

2730
static int32_t msmInitStreamDeploy(SStmStreamDeploy* pStream, SStmTaskDeploy* pDeploy) {
2,169✔
2731
  int32_t code = TSDB_CODE_SUCCESS;
2,169✔
2732
  int32_t lino = 0;
2,169✔
2733
  int64_t streamId = pDeploy->task.streamId;
2,169✔
2734
  
2735
  switch (pDeploy->task.type) {
2,169!
2736
    case STREAM_READER_TASK:
752✔
2737
      if (NULL == pStream->readerTasks) {
752✔
2738
        pStream->streamId = streamId;
382✔
2739
        pStream->readerTasks = taosArrayInit(20, sizeof(SStmTaskDeploy));
382✔
2740
        TSDB_CHECK_NULL(pStream->readerTasks, code, lino, _exit, terrno);
382!
2741
      }
2742
      
2743
      TSDB_CHECK_NULL(taosArrayPush(pStream->readerTasks, pDeploy), code, lino, _exit, terrno);
1,504!
2744
      break;
752✔
2745
    case STREAM_TRIGGER_TASK:
338✔
2746
      pStream->streamId = streamId;
338✔
2747
      pStream->triggerTask = taosMemoryMalloc(sizeof(SStmTaskDeploy));
338!
2748
      TSDB_CHECK_NULL(pStream->triggerTask, code, lino, _exit, terrno);
338!
2749
      memcpy(pStream->triggerTask, pDeploy, sizeof(SStmTaskDeploy));
338✔
2750
      break;
338✔
2751
    case STREAM_RUNNER_TASK:
1,079✔
2752
      if (NULL == pStream->runnerTasks) {
1,079✔
2753
        pStream->streamId = streamId;
354✔
2754
        pStream->runnerTasks = taosArrayInit(20, sizeof(SStmTaskDeploy));
354✔
2755
        TSDB_CHECK_NULL(pStream->runnerTasks, code, lino, _exit, terrno);
354!
2756
      }      
2757
      TSDB_CHECK_NULL(taosArrayPush(pStream->runnerTasks, pDeploy), code, lino, _exit, terrno);
2,158!
2758
      break;
1,079✔
2759
    default:
×
2760
      break;
×
2761
  }
2762

2763
_exit:
2,169✔
2764

2765
  if (code) {
2,169!
2766
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
2767
  }
2768

2769
  return code;
2,169✔
2770
}
2771

2772
static int32_t msmGrpAddDeployTask(SHashObj* pHash, SStmTaskDeploy* pDeploy) {
2,169✔
2773
  int32_t code = TSDB_CODE_SUCCESS;
2,169✔
2774
  int32_t lino = 0;
2,169✔
2775
  int64_t streamId = pDeploy->task.streamId;
2,169✔
2776
  SStreamTask* pTask = &pDeploy->task;
2,169✔
2777
  SStmStreamDeploy streamDeploy = {0};
2,169✔
2778
  SStmStreamDeploy* pStream = NULL;
2,169✔
2779
   
2780
  while (true) {
2781
    pStream = taosHashAcquire(pHash, &streamId, sizeof(streamId));
2,169✔
2782
    if (NULL == pStream) {
2,169✔
2783
      TAOS_CHECK_EXIT(msmInitStreamDeploy(&streamDeploy, pDeploy));
403!
2784
      code = taosHashPut(pHash, &streamId, sizeof(streamId), &streamDeploy, sizeof(streamDeploy));
403✔
2785
      if (TSDB_CODE_SUCCESS == code) {
403!
2786
        goto _exit;
403✔
2787
      }
2788

2789
      if (TSDB_CODE_DUP_KEY != code) {
×
2790
        goto _exit;
×
2791
      }    
2792

2793
      tFreeSStmStreamDeploy(&streamDeploy);
×
2794
      continue;
×
2795
    }
2796

2797
    TAOS_CHECK_EXIT(msmInitStreamDeploy(pStream, pDeploy));
1,766!
2798
    
2799
    break;
1,766✔
2800
  }
2801
  
2802
_exit:
2,169✔
2803

2804
  taosHashRelease(pHash, pStream);
2,169✔
2805

2806
  if (code) {
2,169!
2807
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
2808
  } else {
2809
    msttDebug("task added to GRP deployMap, taskIdx:%d", pTask->taskIdx);
2,169✔
2810
  }
2811

2812
  return code;
2,169✔
2813
}
2814

2815

2816
int32_t msmGrpAddDeployTasks(SHashObj* pHash, SArray* pTasks, int32_t* deployed) {
463✔
2817
  int32_t code = TSDB_CODE_SUCCESS;
463✔
2818
  int32_t lino = 0;
463✔
2819
  int32_t taskNum = taosArrayGetSize(pTasks);
463✔
2820

2821
  for (int32_t i = 0; i < taskNum; ++i) {
2,650✔
2822
    SStmTaskToDeployExt* pExt = taosArrayGet(pTasks, i);
2,187✔
2823
    if (pExt->deployed) {
2,187✔
2824
      continue;
18✔
2825
    }
2826

2827
    TAOS_CHECK_EXIT(msmGrpAddDeployTask(pHash, &pExt->deploy));
2,169!
2828
    pExt->deployed = true;
2,169✔
2829

2830
    (void)atomic_add_fetch_32(deployed, 1);
2,169✔
2831
  }
2832

2833
_exit:
463✔
2834

2835
  if (code) {
463!
2836
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
2837
  }
2838

2839
  return code;
463✔
2840
}
2841

2842
int32_t msmGrpAddDeployVgTasks(SStmGrpCtx* pCtx) {
118✔
2843
  int32_t code = TSDB_CODE_SUCCESS;
118✔
2844
  int32_t lino = 0;
118✔
2845
  int32_t vgNum = taosArrayGetSize(pCtx->pReq->pVgLeaders);
118✔
2846
  SStmVgTasksToDeploy* pVg = NULL;
118✔
2847
  //int32_t tidx = streamGetThreadIdx(mStreamMgmt.threadNum, pCtx->pReq->streamGId);
2848

2849
  mstDebug("start to add stream vgroup tasks deploy");
118✔
2850
  
2851
  for (int32_t i = 0; i < vgNum; ++i) {
763✔
2852
    int32_t* vgId = taosArrayGet(pCtx->pReq->pVgLeaders, i);
645✔
2853

2854
    msmUpdateVgroupUpTs(pCtx, *vgId);
645✔
2855

2856
    pVg = taosHashAcquire(mStreamMgmt.toDeployVgMap, vgId, sizeof(*vgId));
645✔
2857
    if (NULL == pVg) {
645✔
2858
      continue;
368✔
2859
    }
2860

2861
    if (taosRTryLockLatch(&pVg->lock)) {
277!
2862
      continue;
×
2863
    }
2864
    
2865
    if (atomic_load_32(&pVg->deployed) == taosArrayGetSize(pVg->taskList)) {
277!
2866
      taosRUnLockLatch(&pVg->lock);
×
2867
      continue;
×
2868
    }
2869
    
2870
    TAOS_CHECK_EXIT(msmGrpAddDeployTasks(pCtx->deployStm, pVg->taskList, &pVg->deployed));
277!
2871
    taosRUnLockLatch(&pVg->lock);
277✔
2872
  }
2873

2874
_exit:
118✔
2875

2876
  if (code) {
118!
2877
    if (pVg) {
×
2878
      taosRUnLockLatch(&pVg->lock);
×
2879
    }
2880

2881
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
2882
  }
2883

2884
  return code;
118✔
2885
}
2886

2887
int32_t msmGrpAddDeploySnodeTasks(SStmGrpCtx* pCtx) {
104✔
2888
  int32_t code = TSDB_CODE_SUCCESS;
104✔
2889
  int32_t lino = 0;
104✔
2890
  SStmSnodeTasksDeploy* pSnode = NULL;
104✔
2891
  SStreamHbMsg* pReq = pCtx->pReq;
104✔
2892

2893
  mstDebug("start to add stream snode tasks deploy");
104✔
2894
  
2895
  pSnode = taosHashAcquire(mStreamMgmt.toDeploySnodeMap, &pReq->snodeId, sizeof(pReq->snodeId));
104✔
2896
  if (NULL == pSnode) {
104✔
2897
    return TSDB_CODE_SUCCESS;
8✔
2898
  }
2899

2900
  (void)mstWaitLock(&pSnode->lock, false);
96✔
2901
  
2902
  if (atomic_load_32(&pSnode->triggerDeployed) < taosArrayGetSize(pSnode->triggerList)) {
96✔
2903
    TAOS_CHECK_EXIT(msmGrpAddDeployTasks(pCtx->deployStm, pSnode->triggerList, &pSnode->triggerDeployed));
91!
2904
  }
2905

2906
  if (atomic_load_32(&pSnode->runnerDeployed) < taosArrayGetSize(pSnode->runnerList)) {
96✔
2907
    TAOS_CHECK_EXIT(msmGrpAddDeployTasks(pCtx->deployStm, pSnode->runnerList, &pSnode->runnerDeployed));
95!
2908
  }
2909
  
2910
  taosWUnLockLatch(&pSnode->lock);
96✔
2911

2912
_exit:
96✔
2913

2914
  if (code) {
96!
2915
    if (pSnode) {
×
2916
      taosWUnLockLatch(&pSnode->lock);
×
2917
    }
2918

2919
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
2920
  }
2921

2922
  return code;
96✔
2923
}
2924

2925
int32_t msmUpdateStreamLastActTs(int64_t streamId, int64_t currTs) {
969✔
2926
  int32_t code = TSDB_CODE_SUCCESS;
969✔
2927
  int32_t lino = 0;
969✔
2928
  SStmStatus* pStatus = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
969✔
2929
  if (NULL == pStatus) {
969!
2930
    mstsWarn("stream already not exists in streamMap, mapSize:%d", taosHashGetSize(mStreamMgmt.streamMap));
×
2931
    return TSDB_CODE_SUCCESS;
×
2932
  }
2933
  
2934
  pStatus->lastActionTs = currTs;
969✔
2935

2936
_exit:
969✔
2937

2938
  if (code) {
969!
2939
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
2940
  }
2941

2942
  return code;
969✔
2943
}
2944

2945
int32_t msmRspAddStreamsDeploy(SStmGrpCtx* pCtx) {
114✔
2946
  int32_t code = TSDB_CODE_SUCCESS;
114✔
2947
  int32_t lino = 0;
114✔
2948
  int32_t streamNum = taosHashGetSize(pCtx->deployStm);
114✔
2949
  void* pIter = NULL;
114✔
2950

2951
  mstDebug("start to add group %d deploy streams, streamNum:%d", pCtx->pReq->streamGId, taosHashGetSize(pCtx->deployStm));
114✔
2952
  
2953
  pCtx->pRsp->deploy.streamList = taosArrayInit(streamNum, sizeof(SStmStreamDeploy));
114✔
2954
  TSDB_CHECK_NULL(pCtx->pRsp->deploy.streamList, code, lino, _exit, terrno);
114!
2955

2956
  while (1) {
403✔
2957
    pIter = taosHashIterate(pCtx->deployStm, pIter);
517✔
2958
    if (pIter == NULL) {
517✔
2959
      break;
114✔
2960
    }
2961
    
2962
    SStmStreamDeploy *pDeploy = (SStmStreamDeploy *)pIter;
403✔
2963
    TSDB_CHECK_NULL(taosArrayPush(pCtx->pRsp->deploy.streamList, pDeploy), code, lino, _exit, terrno);
806!
2964

2965
    int64_t streamId = pDeploy->streamId;
403✔
2966
    mstsDebug("stream DEPLOY added to dnode %d hb rsp, readerTasks:%zu, triggerTask:%d, runnerTasks:%zu", 
403✔
2967
        pCtx->pReq->dnodeId, taosArrayGetSize(pDeploy->readerTasks), pDeploy->triggerTask ? 1 : 0, taosArrayGetSize(pDeploy->runnerTasks));
2968

2969
    mstClearSStmStreamDeploy(pDeploy);
403✔
2970
    
2971
    TAOS_CHECK_EXIT(msmUpdateStreamLastActTs(pDeploy->streamId, pCtx->currTs));
403!
2972
  }
2973
  
2974
_exit:
114✔
2975

2976
  if (pIter) {
114!
2977
    taosHashCancelIterate(pCtx->deployStm, pIter);
×
2978
  }
2979

2980
  if (code) {
114!
2981
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
2982
  }
2983
  
2984
  return code;
114✔
2985
}
2986

2987
void msmCleanDeployedVgTasks(SArray* pVgLeaders) {
118✔
2988
  int32_t code = TSDB_CODE_SUCCESS;
118✔
2989
  int32_t lino = 0;
118✔
2990
  int32_t vgNum = taosArrayGetSize(pVgLeaders);
118✔
2991
  SStmVgTasksToDeploy* pVg = NULL;
118✔
2992
  
2993
  for (int32_t i = 0; i < vgNum; ++i) {
763✔
2994
    int32_t* vgId = taosArrayGet(pVgLeaders, i);
645✔
2995
    pVg = taosHashAcquire(mStreamMgmt.toDeployVgMap, vgId, sizeof(*vgId));
645✔
2996
    if (NULL == pVg) {
645✔
2997
      continue;
368✔
2998
    }
2999

3000
    if (taosWTryLockLatch(&pVg->lock)) {
277!
3001
      taosHashRelease(mStreamMgmt.toDeployVgMap, pVg);
×
3002
      continue;
×
3003
    }
3004
    
3005
    if (atomic_load_32(&pVg->deployed) <= 0) {
277!
3006
      taosWUnLockLatch(&pVg->lock);
×
3007
      taosHashRelease(mStreamMgmt.toDeployVgMap, pVg);
×
3008
      continue;
×
3009
    }
3010

3011
    int32_t taskNum = taosArrayGetSize(pVg->taskList);
277✔
3012
    if (atomic_load_32(&pVg->deployed) == taskNum) {
277✔
3013
      (void)atomic_sub_fetch_32(&mStreamMgmt.toDeployVgTaskNum, taskNum);
276✔
3014
      taosArrayDestroyEx(pVg->taskList, mstDestroySStmTaskToDeployExt);
276✔
3015
      pVg->taskList = NULL;
276✔
3016
      TAOS_UNUSED(taosHashRemove(mStreamMgmt.toDeployVgMap, vgId, sizeof(*vgId)));
276✔
3017
      taosWUnLockLatch(&pVg->lock);
276✔
3018
      taosHashRelease(mStreamMgmt.toDeployVgMap, pVg);
276✔
3019
      continue;
276✔
3020
    }
3021

3022
    for (int32_t m = taskNum - 1; m >= 0; --m) {
13✔
3023
      SStmTaskToDeployExt* pExt = taosArrayGet(pVg->taskList, m);
12✔
3024
      if (!pExt->deployed) {
12!
3025
        continue;
×
3026
      }
3027

3028
      mstDestroySStmTaskToDeployExt(pExt);
12✔
3029

3030
      taosArrayRemove(pVg->taskList, m);
12✔
3031
      (void)atomic_sub_fetch_32(&mStreamMgmt.toDeployVgTaskNum, 1);
12✔
3032
    }
3033
    atomic_store_32(&pVg->deployed, 0);
1✔
3034
    taosWUnLockLatch(&pVg->lock);
1✔
3035
    taosHashRelease(mStreamMgmt.toDeployVgMap, pVg);
1✔
3036
  }
3037

3038
_exit:
118✔
3039

3040
  if (code) {
118!
3041
    if (pVg) {
×
3042
      taosWUnLockLatch(&pVg->lock);
×
3043
    }
3044

3045
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
3046
  }
3047
}
118✔
3048

3049
void msmCleanDeployedSnodeTasks (int32_t snodeId) {
104✔
3050
  if (!GOT_SNODE(snodeId)) {
104!
3051
    return;
×
3052
  }
3053
  
3054
  int32_t code = TSDB_CODE_SUCCESS;
104✔
3055
  SStmSnodeTasksDeploy* pSnode = taosHashAcquire(mStreamMgmt.toDeploySnodeMap, &snodeId, sizeof(snodeId));
104✔
3056
  if (NULL == pSnode) {
104✔
3057
    return;
8✔
3058
  }
3059

3060
  if (taosWTryLockLatch(&pSnode->lock)) {
96!
3061
    taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);
×
3062
    return;
×
3063
  }
3064

3065
  int32_t triggerNum = taosArrayGetSize(pSnode->triggerList);
96✔
3066
  int32_t runnerNum = taosArrayGetSize(pSnode->runnerList);
96✔
3067
  
3068
  if (atomic_load_32(&pSnode->triggerDeployed) <= 0 && atomic_load_32(&pSnode->runnerDeployed) <= 0) {
96✔
3069
    taosWUnLockLatch(&pSnode->lock);
1✔
3070
    taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);
1✔
3071
    return;
1✔
3072
  }
3073

3074
  if (atomic_load_32(&pSnode->triggerDeployed) == triggerNum) {
95✔
3075
    (void)atomic_sub_fetch_32(&mStreamMgmt.toDeploySnodeTaskNum, triggerNum);
93✔
3076
    taosArrayDestroyEx(pSnode->triggerList, mstDestroySStmTaskToDeployExt);
93✔
3077
    pSnode->triggerList = NULL;
93✔
3078
  }
3079

3080
  if (atomic_load_32(&pSnode->runnerDeployed) == runnerNum) {
95✔
3081
    (void)atomic_sub_fetch_32(&mStreamMgmt.toDeploySnodeTaskNum, runnerNum);
92✔
3082
    taosArrayDestroyEx(pSnode->runnerList, mstDestroySStmTaskToDeployExt);
92✔
3083
    pSnode->runnerList = NULL;
92✔
3084
  }
3085

3086
  if (NULL == pSnode->triggerList && NULL == pSnode->runnerList) {
95✔
3087
    TAOS_UNUSED(taosHashRemove(mStreamMgmt.toDeploySnodeMap, &snodeId, sizeof(snodeId)));
92✔
3088
    taosWUnLockLatch(&pSnode->lock);
92✔
3089
    taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);
92✔
3090
    return;
92✔
3091
  }
3092

3093
  if (atomic_load_32(&pSnode->triggerDeployed) > 0 && pSnode->triggerList) {
3!
3094
    for (int32_t m = triggerNum - 1; m >= 0; --m) {
8✔
3095
      SStmTaskToDeployExt* pExt = taosArrayGet(pSnode->triggerList, m);
6✔
3096
      if (!pExt->deployed) {
6!
3097
        continue;
×
3098
      }
3099

3100
      mstDestroySStmTaskToDeployExt(pExt);
6✔
3101
      (void)atomic_sub_fetch_32(&mStreamMgmt.toDeploySnodeTaskNum, 1);
6✔
3102
      taosArrayRemove(pSnode->triggerList, m);
6✔
3103
    }
3104
    
3105
    pSnode->triggerDeployed = 0;
2✔
3106
  }
3107

3108
  if (atomic_load_32(&pSnode->runnerDeployed) > 0 && pSnode->runnerList) {
3!
3109
    for (int32_t m = runnerNum - 1; m >= 0; --m) {
21✔
3110
      SStmTaskToDeployExt* pExt = taosArrayGet(pSnode->runnerList, m);
18✔
3111
      if (!pExt->deployed) {
18!
3112
        continue;
×
3113
      }
3114

3115
      mstDestroySStmTaskToDeployExt(pExt);
18✔
3116
      (void)atomic_sub_fetch_32(&mStreamMgmt.toDeploySnodeTaskNum, 1);
18✔
3117
      taosArrayRemove(pSnode->runnerList, m);
18✔
3118
    }
3119
    
3120
    pSnode->runnerDeployed = 0;
3✔
3121
  }
3122
  
3123
  taosWUnLockLatch(&pSnode->lock);
3✔
3124
  taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);
3✔
3125
}
3126

3127
void msmClearStreamToDeployMaps(SStreamHbMsg* pHb) {
30,112✔
3128
  if (atomic_load_32(&mStreamMgmt.toDeployVgTaskNum) > 0) {
30,112✔
3129
    msmCleanDeployedVgTasks(pHb->pVgLeaders);
118✔
3130
  }
3131

3132
  if (atomic_load_32(&mStreamMgmt.toDeploySnodeTaskNum) > 0) {
30,112✔
3133
    msmCleanDeployedSnodeTasks(pHb->snodeId);
104✔
3134
  }
3135
}
30,112✔
3136

3137
void msmCleanStreamGrpCtx(SStreamHbMsg* pHb) {
30,112✔
3138
  int32_t tidx = streamGetThreadIdx(mStreamMgmt.threadNum, pHb->streamGId);
30,112✔
3139
  if (mStreamMgmt.tCtx) {
30,112✔
3140
    taosHashClear(mStreamMgmt.tCtx[tidx].actionStm[pHb->streamGId]);
29,409✔
3141
    taosHashClear(mStreamMgmt.tCtx[tidx].deployStm[pHb->streamGId]);
29,409✔
3142
  }
3143
}
30,112✔
3144

3145
int32_t msmGrpAddActionStart(SHashObj* pHash, int64_t streamId, SStmTaskId* pId) {
407✔
3146
  int32_t code = TSDB_CODE_SUCCESS;
407✔
3147
  int32_t lino = 0;
407✔
3148
  int32_t action = STREAM_ACT_START;
407✔
3149
  SStmAction *pAction = taosHashGet(pHash, &streamId, sizeof(streamId));
407✔
3150
  if (pAction) {
407!
3151
    pAction->actions |= action;
×
3152
    pAction->start.triggerId = *pId;
×
3153
    mstsDebug("stream append START action, actions:%x", pAction->actions);
×
3154
  } else {
3155
    SStmAction newAction = {0};
407✔
3156
    newAction.actions = action;
407✔
3157
    newAction.start.triggerId = *pId;
407✔
3158
    TAOS_CHECK_EXIT(taosHashPut(pHash, &streamId, sizeof(streamId), &newAction, sizeof(newAction)));
407!
3159
    mstsDebug("stream add START action, actions:%x", newAction.actions);
407✔
3160
  }
3161

3162
_exit:
407✔
3163

3164
  if (code) {
407!
3165
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
3166
  }
3167

3168
  return code;
407✔
3169
}
3170

3171
int32_t msmGrpAddActionUpdateTrigger(SHashObj* pHash, int64_t streamId) {
×
3172
  int32_t code = TSDB_CODE_SUCCESS;
×
3173
  int32_t lino = 0;
×
3174
  int32_t action = STREAM_ACT_UPDATE_TRIGGER;
×
3175
  
3176
  SStmAction *pAction = taosHashGet(pHash, &streamId, sizeof(streamId));
×
3177
  if (pAction) {
×
3178
    pAction->actions |= action;
×
3179
    mstsDebug("stream append UPDATE_TRIGGER action, actions:%x", pAction->actions);
×
3180
  } else {
3181
    SStmAction newAction = {0};
×
3182
    newAction.actions = action;
×
3183
    TAOS_CHECK_EXIT(taosHashPut(pHash, &streamId, sizeof(streamId), &newAction, sizeof(newAction)));
×
3184
    mstsDebug("stream add UPDATE_TRIGGER action, actions:%x", newAction.actions);
×
3185
  }
3186

3187
_exit:
×
3188

3189
  if (code) {
×
3190
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
3191
  }
3192

3193
  return code;
×
3194
}
3195

3196

3197

3198
int32_t msmGrpAddActionUndeploy(SStmGrpCtx* pCtx, int64_t streamId, SStreamTask* pTask) {
159✔
3199
  int32_t code = TSDB_CODE_SUCCESS;
159✔
3200
  int32_t lino = 0;
159✔
3201
  int32_t action = STREAM_ACT_UNDEPLOY;
159✔
3202
  bool    dropped = false;
159✔
3203

3204
  TAOS_CHECK_EXIT(mstIsStreamDropped(pCtx->pMnode, streamId, &dropped));
159!
3205
  mstsDebug("stream dropped: %d", dropped);
159✔
3206
  
3207
  SStmAction *pAction = taosHashGet(pCtx->actionStm, &streamId, sizeof(streamId));
159✔
3208
  if (pAction) {
159✔
3209
    pAction->actions |= action;
103✔
3210
    if (NULL == pAction->undeploy.taskList) {
103!
3211
      pAction->undeploy.taskList = taosArrayInit(pCtx->taskNum, POINTER_BYTES);
×
3212
      TSDB_CHECK_NULL(pAction->undeploy.taskList, code, lino, _exit, terrno);
×
3213
    }
3214

3215
    TSDB_CHECK_NULL(taosArrayPush(pAction->undeploy.taskList, &pTask), code, lino, _exit, terrno);
206!
3216
    if (pAction->undeploy.doCheckpoint) {
103✔
3217
      pAction->undeploy.doCheckpoint = dropped ? false : true;
63✔
3218
    }
3219
    if (!pAction->undeploy.doCleanup) {
103✔
3220
      pAction->undeploy.doCleanup = dropped ? true : false;
63✔
3221
    }
3222
    
3223
    msttDebug("task append UNDEPLOY action[%d,%d], actions:%x", pAction->undeploy.doCheckpoint, pAction->undeploy.doCleanup, pAction->actions);
103✔
3224
  } else {
3225
    SStmAction newAction = {0};
56✔
3226
    newAction.actions = action;
56✔
3227
    newAction.undeploy.doCheckpoint = dropped ? false : true;
56✔
3228
    newAction.undeploy.doCleanup = dropped ? true : false;
56✔
3229
    newAction.undeploy.taskList = taosArrayInit(pCtx->taskNum, POINTER_BYTES);
56✔
3230
    TSDB_CHECK_NULL(newAction.undeploy.taskList, code, lino, _exit, terrno);
56!
3231
    TSDB_CHECK_NULL(taosArrayPush(newAction.undeploy.taskList, &pTask), code, lino, _exit, terrno);
112!
3232
    TAOS_CHECK_EXIT(taosHashPut(pCtx->actionStm, &streamId, sizeof(streamId), &newAction, sizeof(newAction)));
56!
3233
    
3234
    msttDebug("task add UNDEPLOY action[%d,%d]", newAction.undeploy.doCheckpoint, newAction.undeploy.doCleanup);
56✔
3235
  }
3236

3237
_exit:
159✔
3238

3239
  if (code) {
159!
3240
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
3241
  }
3242

3243
  return code;
159✔
3244
}
3245

3246
int32_t msmGrpAddActionRecalc(SStmGrpCtx* pCtx, int64_t streamId, SArray* recalcList) {
16✔
3247
  int32_t code = TSDB_CODE_SUCCESS;
16✔
3248
  int32_t lino = 0;
16✔
3249
  int32_t action = STREAM_ACT_RECALC;
16✔
3250
  SStmAction newAction = {0};
16✔
3251
  
3252
  SStmAction *pAction = taosHashGet(pCtx->actionStm, &streamId, sizeof(streamId));
16✔
3253
  if (pAction) {
16!
3254
    pAction->actions |= action;
×
3255
    pAction->recalc.recalcList = recalcList;
×
3256

3257
    mstsDebug("stream append recalc action, listSize:%d, actions:%x", (int32_t)taosArrayGetSize(recalcList), pAction->actions);
×
3258
  } else {
3259
    newAction.actions = action;
16✔
3260
    newAction.recalc.recalcList = recalcList;
16✔
3261
    
3262
    TAOS_CHECK_EXIT(taosHashPut(pCtx->actionStm, &streamId, sizeof(streamId), &newAction, sizeof(newAction)));
16!
3263
    
3264
    mstsDebug("stream add recalc action, listSize:%d", (int32_t)taosArrayGetSize(recalcList));
16!
3265
  }
3266

3267
_exit:
×
3268

3269
  if (code) {
16!
3270
    mstDestroySStmAction(&newAction);
×
3271
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
3272
  }
3273

3274
  return code;
16✔
3275
}
3276

3277
bool msmCheckStreamStartCond(int64_t streamId, int32_t snodeId) {
465✔
3278
  SStmStatus* pStream = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
465✔
3279
  if (NULL == pStream) {
465!
3280
    return false;
×
3281
  }
3282

3283
  if (pStream->triggerTask->id.nodeId != snodeId || STREAM_STATUS_INIT != pStream->triggerTask->status) {
465!
3284
    return false;
×
3285
  }
3286

3287
  int32_t readerNum = taosArrayGetSize(pStream->trigReaders);
465✔
3288
  for (int32_t i = 0; i < readerNum; ++i) {
1,038✔
3289
    SStmTaskStatus* pStatus = taosArrayGet(pStream->trigReaders, i);
573✔
3290
    if (STREAM_STATUS_INIT != pStatus->status && STREAM_STATUS_RUNNING != pStatus->status) {
573!
3291
      return false;
×
3292
    }
3293
  }
3294

3295
  readerNum = taosArrayGetSize(pStream->trigOReaders);
465✔
3296
  for (int32_t i = 0; i < readerNum; ++i) {
495✔
3297
    SStmTaskStatus* pStatus = taosArrayGet(pStream->trigOReaders, i);
30✔
3298
    if (STREAM_STATUS_INIT != pStatus->status && STREAM_STATUS_RUNNING != pStatus->status) {
30!
3299
      return false;
×
3300
    }
3301
  }
3302

3303
  readerNum = MST_LIST_SIZE(pStream->calcReaders);
465!
3304
  SListNode* pNode = listHead(pStream->calcReaders);
465✔
3305
  for (int32_t i = 0; i < readerNum; ++i) {
881✔
3306
    SStmTaskStatus* pStatus = (SStmTaskStatus*)pNode->data;
416✔
3307
    if (STREAM_STATUS_INIT != pStatus->status && STREAM_STATUS_RUNNING != pStatus->status) {
416!
3308
      return false;
×
3309
    }
3310
    pNode = TD_DLIST_NODE_NEXT(pNode);
416✔
3311
  }
3312

3313
  for (int32_t i = 0; i < pStream->runnerDeploys; ++i) {
1,683✔
3314
    int32_t runnerNum = taosArrayGetSize(pStream->runners[i]);
1,276✔
3315
    for (int32_t m = 0; m < runnerNum; ++m) {
2,560✔
3316
      SStmTaskStatus* pStatus = taosArrayGet(pStream->runners[i], m);
1,342✔
3317
      if (STREAM_STATUS_INIT != pStatus->status && STREAM_STATUS_RUNNING != pStatus->status) {
1,342!
3318
        return false;
58✔
3319
      }
3320
    }
3321
  }
3322
  
3323
  return true;
407✔
3324
}
3325

3326

3327
void msmHandleTaskAbnormalStatus(SStmGrpCtx* pCtx, SStmTaskStatusMsg* pMsg, SStmTaskStatus* pTaskStatus) {
15,236✔
3328
  int32_t code = TSDB_CODE_SUCCESS;
15,236✔
3329
  int32_t lino = 0;
15,236✔
3330
  int32_t action = 0;
15,236✔
3331
  int64_t streamId = pMsg->streamId;
15,236✔
3332
  SStreamTask* pTask = (SStreamTask*)pMsg;
15,236✔
3333
  int8_t  stopped = 0;
15,236✔
3334

3335
  msttDebug("start to handle task abnormal status %d", pTask->status);
15,236✔
3336
  
3337
  SStmStatus* pStatus = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
15,236✔
3338
  if (NULL == pStatus) {
15,236!
3339
    msttInfo("stream no longer exists in streamMap, try to undeploy current task, idx:%d", pMsg->taskIdx);
×
3340
    TAOS_CHECK_EXIT(msmGrpAddActionUndeploy(pCtx, streamId, pTask));
×
3341
    return;
14,749✔
3342
  }
3343

3344
  stopped = atomic_load_8(&pStatus->stopped);
15,236✔
3345
  if (stopped) {
15,236!
3346
    msttInfo("stream stopped %d, try to undeploy current task, idx:%d", stopped, pMsg->taskIdx);
×
3347
    TAOS_CHECK_EXIT(msmGrpAddActionUndeploy(pCtx, streamId, pTask));
×
3348
    return;
×
3349
  }
3350
  
3351
  switch (pMsg->status) {
15,236!
3352
    case STREAM_STATUS_INIT:      
15,214✔
3353
      if (STREAM_TRIGGER_TASK != pMsg->type) {
15,214✔
3354
        msttTrace("task status is INIT and not trigger task, ignore it, currTs:%" PRId64 ", lastTs:%" PRId64, pCtx->currTs, pStatus->lastActionTs);
14,439!
3355
        return;
14,439✔
3356
      }
3357
      
3358
      if (INT64_MIN == pStatus->lastActionTs) {
775!
3359
        msttDebug("task still not deployed, ignore it, currTs:%" PRId64 ", lastTs:%" PRId64, pCtx->currTs, pStatus->lastActionTs);
×
3360
        return;
×
3361
      }
3362
      
3363
      if ((pCtx->currTs - pStatus->lastActionTs) < STREAM_ACT_MIN_DELAY_MSEC) {
775✔
3364
        msttDebug("task wait not enough between actions, currTs:%" PRId64 ", lastTs:%" PRId64, pCtx->currTs, pStatus->lastActionTs);
310✔
3365
        return;
310✔
3366
      }
3367

3368
      if (STREAM_IS_RUNNING(pStatus)) {
465!
3369
        msttDebug("stream already running, ignore status: %s", gStreamStatusStr[pTask->status]);
×
3370
      } else if (GOT_SNODE(pCtx->pReq->snodeId) && msmCheckStreamStartCond(streamId, pCtx->pReq->snodeId)) {
465!
3371
        TAOS_CHECK_EXIT(msmGrpAddActionStart(pCtx->actionStm, streamId, &pStatus->triggerTask->id));
407!
3372
      }
3373
      break;
465✔
3374
    case STREAM_STATUS_FAILED:
22✔
3375
      //STREAMTODO ADD ERRCODE HANDLE
3376
      msttInfo("task failed with error:%s, try to undeploy current task, idx:%d", tstrerror(pMsg->errorCode), pMsg->taskIdx);
22!
3377
      TAOS_CHECK_EXIT(msmGrpAddActionUndeploy(pCtx, streamId, pTask));
22!
3378
      break;
22✔
3379
    default:
×
3380
      break;
×
3381
  }
3382

3383
_exit:
487✔
3384

3385
  if (code) {
487!
3386
    msmStopStreamByError(streamId, pStatus, code, pCtx->currTs);
×
3387
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
3388
  }
3389
}
3390

3391
void msmHandleStatusUpdateErr(SStmGrpCtx* pCtx, EStmErrType err, SStmTaskStatusMsg* pStatus) {
137✔
3392
  int32_t code = TSDB_CODE_SUCCESS;
137✔
3393
  int32_t lino = 0;
137✔
3394
  SStreamTask* pTask = (SStreamTask*)pStatus;
137✔
3395
  int64_t streamId = pStatus->streamId;
137✔
3396

3397
  msttInfo("start to handle task status update exception, type: %d", err);
137!
3398
  
3399
  // STREAMTODO
3400

3401
  if (STM_ERR_TASK_NOT_EXISTS == err || STM_ERR_STREAM_STOPPED == err) {
137!
3402
    TAOS_CHECK_EXIT(msmGrpAddActionUndeploy(pCtx, streamId, pTask));
137!
3403
  }
3404

3405
_exit:
137✔
3406

3407
  if (code) {
137!
3408
    // IGNORE STOP STREAM BY ERROR  
3409
    msttError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
3410
  }
3411
}
137✔
3412

3413
void msmChkHandleTriggerOperations(SStmGrpCtx* pCtx, SStmTaskStatusMsg* pTask, SStmTaskStatus* pStatus) {
6,080✔
3414
  int32_t code = TSDB_CODE_SUCCESS;
6,080✔
3415
  int32_t lino = 0;
6,080✔
3416
  SStmStatus* pStream = (SStmStatus*)pStatus->pStream;
6,080✔
3417

3418
  if (1 == atomic_val_compare_exchange_8(&pStream->triggerNeedUpdate, 1, 0)) {
6,080!
3419
    TAOS_CHECK_EXIT(msmGrpAddActionUpdateTrigger(pCtx->actionStm, pTask->streamId));
×
3420
  }
3421
  
3422
  SArray* userRecalcList = NULL;
6,080✔
3423
  if (atomic_load_ptr(&pStream->userRecalcList)) {
6,080✔
3424
    taosWLockLatch(&pStream->userRecalcLock);
16✔
3425
    if (pStream->userRecalcList) {
16!
3426
      userRecalcList = pStream->userRecalcList;
16✔
3427
      pStream->userRecalcList = NULL;
16✔
3428
    }
3429
    taosWUnLockLatch(&pStream->userRecalcLock);
16✔
3430
    
3431
    if (userRecalcList) {
16!
3432
      TAOS_CHECK_EXIT(msmGrpAddActionRecalc(pCtx, pTask->streamId, userRecalcList));
16!
3433
    }
3434
  }
3435

3436
  if (pTask->detailStatus >= 0 && pCtx->pReq->pTriggerStatus) {
6,080!
3437
    (void)mstWaitLock(&pStatus->detailStatusLock, false);
3,014✔
3438
    if (NULL == pStatus->detailStatus) {
3,014✔
3439
      pStatus->detailStatus = taosMemoryCalloc(1, sizeof(SSTriggerRuntimeStatus));
336!
3440
      if (NULL == pStatus->detailStatus) {
336!
3441
        taosWUnLockLatch(&pStatus->detailStatusLock);
×
3442
        TSDB_CHECK_NULL(pStatus->detailStatus, code, lino, _exit, terrno);
×
3443
      }
3444
    }
3445
    
3446
    memcpy(pStatus->detailStatus, taosArrayGet(pCtx->pReq->pTriggerStatus, pTask->detailStatus), sizeof(SSTriggerRuntimeStatus));
3,014✔
3447
    taosWUnLockLatch(&pStatus->detailStatusLock);
3,014✔
3448
  }
3449

3450
_exit:
3,066✔
3451

3452
  if (code) {
6,080!
3453
    // IGNORE STOP STREAM BY ERROR
3454
    msttError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
3455
  }
3456
}
6,080✔
3457

3458
int32_t msmNormalHandleStatusUpdate(SStmGrpCtx* pCtx) {
2,628✔
3459
  int32_t code = TSDB_CODE_SUCCESS;
2,628✔
3460
  int32_t lino = 0;
2,628✔
3461
  int32_t num = taosArrayGetSize(pCtx->pReq->pStreamStatus);
2,628✔
3462

3463
  mstDebug("NORMAL: start to handle stream group %d tasks status, taskNum:%d", pCtx->pReq->streamGId, num);
2,628✔
3464

3465
  for (int32_t i = 0; i < num; ++i) {
40,734✔
3466
    SStmTaskStatusMsg* pTask = taosArrayGet(pCtx->pReq->pStreamStatus, i);
38,106✔
3467
    msttDebug("task status %s got on dnode %d, taskIdx:%d", gStreamStatusStr[pTask->status], pCtx->pReq->dnodeId, pTask->taskIdx);
38,106✔
3468
    
3469
    SStmTaskStatus** ppStatus = taosHashGet(mStreamMgmt.taskMap, &pTask->streamId, sizeof(pTask->streamId) + sizeof(pTask->taskId));
38,106✔
3470
    if (NULL == ppStatus) {
38,106✔
3471
      msttWarn("task no longer exists in taskMap, will try to undeploy current task, taskIdx:%d", pTask->taskIdx);
34!
3472
      msmHandleStatusUpdateErr(pCtx, STM_ERR_TASK_NOT_EXISTS, pTask);
34✔
3473
      continue;
34✔
3474
    }
3475

3476
    SStmStatus* pStream = (SStmStatus*)(*ppStatus)->pStream;
38,072✔
3477
    int8_t stopped = atomic_load_8(&pStream->stopped);
38,072✔
3478
    if (stopped) {
38,072✔
3479
      msttWarn("stream already stopped %d, will try to undeploy current task, taskIdx:%d", stopped, pTask->taskIdx);
103!
3480
      msmHandleStatusUpdateErr(pCtx, STM_ERR_STREAM_STOPPED, pTask);
103✔
3481
      continue;
103✔
3482
    }
3483

3484
    if ((pTask->seriousId != (*ppStatus)->id.seriousId) || (pTask->nodeId != (*ppStatus)->id.nodeId)) {
37,969!
3485
      msttInfo("task mismatch with it in taskMap, will try to rm it, current seriousId:%" PRId64 ", nodeId:%d", 
×
3486
          (*ppStatus)->id.seriousId, (*ppStatus)->id.nodeId);
3487
          
3488
      msmHandleStatusUpdateErr(pCtx, STM_ERR_TASK_NOT_EXISTS, pTask);
×
3489
      continue;
×
3490
    }
3491

3492
    if ((*ppStatus)->status != pTask->status) {
37,969✔
3493
      if (STREAM_STATUS_RUNNING == pTask->status) {
3,706✔
3494
        (*ppStatus)->runningStartTs = pCtx->currTs;
1,544✔
3495
      } else if (MST_IS_RUNNER_GETTING_READY(pTask) && STREAM_IS_REDEPLOY_RUNNER((*ppStatus)->flags)) {
2,162!
3496
        if (pStream->triggerTask) {
×
3497
          atomic_store_8(&pStream->triggerNeedUpdate, 1);
×
3498
        }
3499
        
3500
        STREAM_CLR_FLAG((*ppStatus)->flags, STREAM_FLAG_REDEPLOY_RUNNER);
×
3501
      }
3502
    }
3503
    
3504
    (*ppStatus)->errCode = pTask->errorCode;
37,969✔
3505
    (*ppStatus)->status = pTask->status;
37,969✔
3506
    (*ppStatus)->lastUpTs = pCtx->currTs;
37,969✔
3507
    
3508
    if (STREAM_STATUS_RUNNING != pTask->status) {
37,969✔
3509
      msmHandleTaskAbnormalStatus(pCtx, pTask, *ppStatus);
15,236✔
3510
    }
3511
    
3512
    if (STREAM_TRIGGER_TASK == pTask->type) {
37,969✔
3513
      msmChkHandleTriggerOperations(pCtx, pTask, *ppStatus);
6,080✔
3514
    }
3515
  }
3516

3517
_exit:
2,628✔
3518

3519
  if (code) {
2,628!
3520
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
3521
  }
3522

3523
  return code;
2,628✔
3524
}
3525

3526
int32_t msmWatchRecordNewTask(SStmGrpCtx* pCtx, SStmTaskStatusMsg* pTask) {
×
3527
  int32_t code = TSDB_CODE_SUCCESS;
×
3528
  int32_t lino = 0;
×
3529
  int64_t streamId = pTask->streamId;
×
3530
  SStreamObj* pStream = NULL;
×
3531

3532
  SStmStatus* pStatus = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
×
3533
  if (NULL == pStatus) {
×
3534
    SStmStatus status = {0};
×
3535
    TAOS_CHECK_EXIT(mndAcquireStreamById(pCtx->pMnode, streamId, &pStream));
×
3536
    TSDB_CHECK_NULL(pStream, code, lino, _exit, TSDB_CODE_MND_STREAM_NOT_EXIST);
×
3537
    if (STREAM_IS_VIRTUAL_TABLE(pStream->pCreate->triggerTblType, pStream->pCreate->flags) || pStream->pCreate->vtableCalc) {
×
3538
      mndReleaseStream(pCtx->pMnode, pStream);
×
3539
      msttDebug("virtual table task ignored, triggerTblType:%d, vtableCalc:%dstatus:%s", 
×
3540
          pStream->pCreate->triggerTblType, pStream->pCreate->vtableCalc, gStreamStatusStr[pTask->status]);
3541
      return code;
×
3542
    }
3543

3544
    TAOS_CHECK_EXIT(msmInitStmStatus(pCtx, &status, pStream, true));
×
3545
    mndReleaseStream(pCtx->pMnode, pStream);
×
3546

3547
    TAOS_CHECK_EXIT(taosHashPut(mStreamMgmt.streamMap, &streamId, sizeof(streamId), &status, sizeof(status)));
×
3548
    pStatus = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
×
3549
    TSDB_CHECK_NULL(pStatus, code, lino, _exit, terrno);
×
3550
    msttDebug("stream added to streamMap cause of new task status:%s", gStreamStatusStr[pTask->status]);
×
3551
  }
3552

3553
  SStmTaskStatus* pNewTask = NULL;
×
3554
  switch (pTask->type) {
×
3555
    case STREAM_READER_TASK: {
×
3556
      void* pList = STREAM_IS_TRIGGER_READER(pTask->flags) ? (void*)pStatus->trigReaders : (void*)pStatus->calcReaders;
×
3557
      if (NULL == pList) {
×
3558
        mstsError("%sReader list is NULL", STREAM_IS_TRIGGER_READER(pTask->flags) ? "trig" : "calc");
×
3559
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
3560
      }
3561
      int32_t readerSize = STREAM_IS_TRIGGER_READER(pTask->flags) ? pStatus->trigReaderNum : pStatus->calcReaderNum;
×
3562
      if ((STREAM_IS_TRIGGER_READER(pTask->flags) && taosArrayGetSize(pList) >= readerSize) ||
×
3563
          MST_LIST_SIZE((SList*)pList) >= readerSize){
×
3564
        mstsError("%sReader list is already full, size:%d, expSize:%d", STREAM_IS_TRIGGER_READER(pTask->flags) ? "trig" : "calc",
×
3565
            (int32_t)taosArrayGetSize(pList), readerSize);
3566
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
3567
      }
3568
      
3569
      SStmTaskStatus taskStatus = {0};
×
3570
      taskStatus.pStream = pStatus;
×
3571
      mstSetTaskStatusFromMsg(pCtx, &taskStatus, pTask);
×
3572
      if (STREAM_IS_TRIGGER_READER(pTask->flags)) {
×
3573
        pNewTask = taosArrayPush(pList, &taskStatus);
×
3574
        TSDB_CHECK_NULL(pNewTask, code, lino, _exit, terrno);
×
3575
      } else {
3576
        TAOS_CHECK_EXIT(tdListAppend(pStatus->calcReaders, &taskStatus));
×
3577
      }
3578
      
3579
      TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, NULL, NULL, pNewTask));
×
3580
      TAOS_CHECK_EXIT(msmSTAddToVgroupMapImpl(streamId, pNewTask, STREAM_IS_TRIGGER_READER(pTask->flags)));
×
3581
      break;
×
3582
    }
3583
    case STREAM_TRIGGER_TASK: {
×
3584
      taosMemoryFreeClear(pStatus->triggerTask);
×
3585
      pStatus->triggerTask = taosMemoryCalloc(1, sizeof(*pStatus->triggerTask));
×
3586
      TSDB_CHECK_NULL(pStatus->triggerTask, code, lino, _exit, terrno);
×
3587
      pStatus->triggerTask->pStream = pStatus;
×
3588
      mstSetTaskStatusFromMsg(pCtx, pStatus->triggerTask, pTask);
×
3589
      pNewTask = pStatus->triggerTask;
×
3590

3591
      TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, NULL, NULL, pNewTask));
×
3592
      TAOS_CHECK_EXIT(msmSTAddToSnodeMapImpl(streamId, pNewTask, 0));
×
3593
      break;
×
3594
    }
3595
    case STREAM_RUNNER_TASK:{
×
3596
      if (NULL == pStatus->runners[pTask->deployId]) {
×
3597
        mstsError("deploy %d runner list is NULL", pTask->deployId);
×
3598
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
3599
      }
3600
      if (taosArrayGetSize(pStatus->runners[pTask->deployId]) >= pStatus->runnerNum) {
×
3601
        mstsError("deploy %d runner list is already full, size:%d, expSize:%d", pTask->deployId, 
×
3602
            (int32_t)taosArrayGetSize(pStatus->runners[pTask->deployId]), pStatus->runnerNum);
3603
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
3604
      }    
3605
      
3606
      SStmTaskStatus taskStatus = {0};
×
3607
      taskStatus.pStream = pStatus;
×
3608
      mstSetTaskStatusFromMsg(pCtx, &taskStatus, pTask);
×
3609
      pNewTask = taosArrayPush(pStatus->runners[pTask->deployId], &taskStatus);
×
3610
      TSDB_CHECK_NULL(pNewTask, code, lino, _exit, terrno);
×
3611

3612
      TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, NULL, NULL, pNewTask));
×
3613
      TAOS_CHECK_EXIT(msmSTAddToSnodeMapImpl(streamId, pNewTask, pTask->deployId));
×
3614
      break;
×
3615
    }
3616
    default: {
×
3617
      msttError("invalid task type:%d in task status", pTask->type);
×
3618
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
×
3619
      break;
×
3620
    }
3621
  }
3622

3623
_exit:
×
3624

3625
  if (code) {
×
3626
    msttError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
3627
  } else {
3628
    msttDebug("new task recored to taskMap/streamMap, task status:%s", gStreamStatusStr[pTask->status]);
×
3629
  }
3630

3631
  return code;
×
3632
}
3633

3634
int32_t msmWatchHandleStatusUpdate(SStmGrpCtx* pCtx) {
×
3635
  int32_t code = TSDB_CODE_SUCCESS;
×
3636
  int32_t lino = 0;
×
3637
  int32_t num = taosArrayGetSize(pCtx->pReq->pStreamStatus);
×
3638

3639
  mstDebug("WATCH: start to handle stream group %d tasks status, taskNum:%d", pCtx->pReq->streamGId, num);
×
3640

3641
  for (int32_t i = 0; i < num; ++i) {
×
3642
    SStmTaskStatusMsg* pTask = taosArrayGet(pCtx->pReq->pStreamStatus, i);
×
3643
    msttDebug("task status %s got, taskIdx:%d", gStreamStatusStr[pTask->status], pTask->taskIdx);
×
3644

3645
    if (pTask->taskId >= mStreamMgmt.lastTaskId) {
×
3646
      mStreamMgmt.lastTaskId = pTask->taskId + 1;
×
3647
    }
3648
    
3649
    SStmTaskStatus** ppStatus = taosHashGet(mStreamMgmt.taskMap, &pTask->streamId, sizeof(pTask->streamId) + sizeof(pTask->taskId));
×
3650
    if (NULL == ppStatus) {
×
3651
      msttInfo("task still not in taskMap, will try to add it, taskIdx:%d", pTask->taskIdx);
×
3652
      
3653
      TAOS_CHECK_EXIT(msmWatchRecordNewTask(pCtx, pTask));
×
3654
      
3655
      continue;
×
3656
    }
3657
    
3658
    (*ppStatus)->status = pTask->status;
×
3659
    (*ppStatus)->lastUpTs = pCtx->currTs;
×
3660
  }
3661

3662
_exit:
×
3663

3664
  if (code) {
×
3665
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
3666
  }
3667

3668
  return code;
×
3669
}
3670

3671
void msmRspAddStreamStart(int64_t streamId, SStmGrpCtx* pCtx, int32_t streamNum, SStmAction *pAction) {
407✔
3672
  int32_t code = TSDB_CODE_SUCCESS;
407✔
3673
  int32_t lino = 0;
407✔
3674
  if (NULL == pCtx->pRsp->start.taskList) {
407✔
3675
    pCtx->pRsp->start.taskList = taosArrayInit(streamNum, sizeof(SStreamTaskStart));
198✔
3676
    TSDB_CHECK_NULL(pCtx->pRsp->start.taskList, code, lino, _exit, terrno);
198!
3677
  }
3678

3679
  SStmTaskId* pId = &pAction->start.triggerId;
407✔
3680
  SStreamTaskStart start = {0};
407✔
3681
  start.task.type = STREAM_TRIGGER_TASK;
407✔
3682
  start.task.streamId = streamId;
407✔
3683
  start.task.taskId = pId->taskId;
407✔
3684
  start.task.seriousId = pId->seriousId;
407✔
3685
  start.task.nodeId = pId->nodeId;
407✔
3686
  start.task.taskIdx = pId->taskIdx;
407✔
3687

3688
  TSDB_CHECK_NULL(taosArrayPush(pCtx->pRsp->start.taskList, &start), code, lino, _exit, terrno);
814!
3689
  TAOS_CHECK_EXIT(msmUpdateStreamLastActTs(streamId, pCtx->currTs));
407!
3690

3691
  mstsDebug("stream START added to dnode %d hb rsp, triggerTaskId:%" PRIx64, pId->nodeId, pId->taskId);
407✔
3692

3693
  return;
407✔
3694

3695
_exit:
×
3696

3697
  mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
3698
}
3699

3700

3701
void msmRspAddStreamUndeploy(int64_t streamId, SStmGrpCtx* pCtx, SStmAction *pAction) {
56✔
3702
  int32_t code = TSDB_CODE_SUCCESS;
56✔
3703
  int32_t lino = 0;
56✔
3704
  int32_t dropNum = taosArrayGetSize(pAction->undeploy.taskList);
56✔
3705
  if (NULL == pCtx->pRsp->undeploy.taskList) {
56✔
3706
    pCtx->pRsp->undeploy.taskList = taosArrayInit(dropNum, sizeof(SStreamTaskUndeploy));
48✔
3707
    TSDB_CHECK_NULL(pCtx->pRsp->undeploy.taskList, code, lino, _exit, terrno);
48!
3708
  }
3709

3710
  SStreamTaskUndeploy undeploy;
3711
  for (int32_t i = 0; i < dropNum; ++i) {
215✔
3712
    SStreamTask* pTask = (SStreamTask*)taosArrayGetP(pAction->undeploy.taskList, i);
159✔
3713
    undeploy.task = *pTask;
159✔
3714
    undeploy.undeployMsg.doCheckpoint = pAction->undeploy.doCheckpoint;
159✔
3715
    undeploy.undeployMsg.doCleanup = pAction->undeploy.doCleanup;
159✔
3716

3717
    TSDB_CHECK_NULL(taosArrayPush(pCtx->pRsp->undeploy.taskList, &undeploy), code, lino, _exit, terrno);
318!
3718
    TAOS_CHECK_EXIT(msmUpdateStreamLastActTs(streamId, pCtx->currTs));
159!
3719

3720
    msttDebug("task UNDEPLOY added to hb rsp, doCheckpoint:%d, doCleanup:%d", undeploy.undeployMsg.doCheckpoint, undeploy.undeployMsg.doCleanup);
159✔
3721
  }
3722

3723
  return;
56✔
3724

3725
_exit:
×
3726

3727
  mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
3728
}
3729

3730
void msmRspAddTriggerUpdate(SMnode * pMnode, int64_t streamId, SStmGrpCtx* pCtx, SStmAction *pAction) {
×
3731
  int32_t code = TSDB_CODE_SUCCESS;
×
3732
  int32_t lino = 0;
×
3733

3734
  SStmStatus* pStream = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
×
3735
  if (NULL == pStream) {
×
3736
    mstsDebug("stream already not exists in streamMap, ignore trigger update, streamRemain:%d", taosHashGetSize(mStreamMgmt.streamMap));
×
3737
    return;
×
3738
  }
3739

3740
  if (NULL == pStream->triggerTask) {
×
3741
    mstsWarn("no triggerTask exists, ignore trigger update, stopped:%d", atomic_load_8(&pStream->stopped));
×
3742
    return;
×
3743
  }
3744

3745
  SStreamMgmtRsp rsp = {0};
×
3746
  rsp.reqId = INT64_MIN;
×
3747
  rsp.header.msgType = STREAM_MSG_UPDATE_RUNNER;
×
3748
  rsp.task.streamId = streamId;
×
3749
  rsp.task.taskId = pStream->triggerTask->id.taskId;
×
3750

3751
  TAOS_CHECK_EXIT(msmBuildTriggerRunnerTargets(pMnode, pStream, streamId, &rsp.cont.runnerList));  
×
3752

3753
  if (NULL == pCtx->pRsp->rsps.rspList) {
×
3754
    pCtx->pRsp->rsps.rspList = taosArrayInit(2, sizeof(SStreamMgmtRsp));
×
3755
    TSDB_CHECK_NULL(pCtx->pRsp->rsps.rspList, code, lino, _exit, terrno);
×
3756
  }
3757

3758
  TSDB_CHECK_NULL(taosArrayPush(pCtx->pRsp->rsps.rspList, &rsp), code, lino, _exit, terrno);
×
3759

3760
_exit:
×
3761

3762
  if (code) {
×
3763
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
3764
  } else {
3765
    mstsDebug("trigger update rsp added, runnerNum:%d", (int32_t)taosArrayGetSize(rsp.cont.runnerList));
×
3766
  }
3767
}
3768

3769
void msmRspAddUserRecalc(SMnode * pMnode, int64_t streamId, SStmGrpCtx* pCtx, SStmAction *pAction) {
16✔
3770
  int32_t code = TSDB_CODE_SUCCESS;
16✔
3771
  int32_t lino = 0;
16✔
3772

3773
  SStmStatus* pStream = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
16✔
3774
  if (NULL == pStream) {
16!
3775
    mstsDebug("stream already not exists in streamMap, ignore trigger update, streamRemain:%d", taosHashGetSize(mStreamMgmt.streamMap));
×
3776
    return;
×
3777
  }
3778

3779
  if (NULL == pStream->triggerTask) {
16!
3780
    mstsWarn("no triggerTask exists, ignore trigger update, stopped:%d", atomic_load_8(&pStream->stopped));
×
3781
    return;
×
3782
  }
3783

3784
  SStreamMgmtRsp rsp = {0};
16✔
3785
  rsp.reqId = INT64_MIN;
16✔
3786
  rsp.header.msgType = STREAM_MSG_USER_RECALC;
16✔
3787
  rsp.task.streamId = streamId;
16✔
3788
  rsp.task.taskId = pStream->triggerTask->id.taskId;
16✔
3789
  TSWAP(rsp.cont.recalcList, pAction->recalc.recalcList);
16✔
3790

3791
  if (NULL == pCtx->pRsp->rsps.rspList) {
16✔
3792
    pCtx->pRsp->rsps.rspList = taosArrayInit(2, sizeof(SStreamMgmtRsp));
14✔
3793
    TSDB_CHECK_NULL(pCtx->pRsp->rsps.rspList, code, lino, _exit, terrno);
14!
3794
  }
3795

3796
  TSDB_CHECK_NULL(taosArrayPush(pCtx->pRsp->rsps.rspList, &rsp), code, lino, _exit, terrno);
32!
3797

3798
_exit:
16✔
3799

3800
  if (code) {
16!
3801
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
3802
  } else {
3803
    mstsDebug("user recalc rsp added, recalcNum:%d", (int32_t)taosArrayGetSize(rsp.cont.recalcList));
16!
3804
  }
3805
}
3806

3807

3808
int32_t msmHandleHbPostActions(SStmGrpCtx* pCtx) {
260✔
3809
  int32_t code = TSDB_CODE_SUCCESS;
260✔
3810
  int32_t lino = 0;
260✔
3811
  void* pIter = NULL;
260✔
3812
  int32_t streamNum = taosHashGetSize(pCtx->actionStm);
260✔
3813

3814
  mstDebug("start to handle stream group %d post actions", pCtx->pReq->streamGId);
260✔
3815

3816
  while (1) {
479✔
3817
    pIter = taosHashIterate(pCtx->actionStm, pIter);
739✔
3818
    if (pIter == NULL) {
739✔
3819
      break;
260✔
3820
    }
3821

3822
    int64_t* pStreamId = taosHashGetKey(pIter, NULL);
479✔
3823
    SStmAction *pAction = (SStmAction *)pIter;
479✔
3824
    
3825
    if (STREAM_ACT_UNDEPLOY & pAction->actions) {
479✔
3826
      msmRspAddStreamUndeploy(*pStreamId, pCtx, pAction);
56✔
3827
      continue;
56✔
3828
    }
3829

3830
    if (STREAM_ACT_UPDATE_TRIGGER & pAction->actions) {
423!
3831
      msmRspAddTriggerUpdate(pCtx->pMnode, *pStreamId, pCtx, pAction);
×
3832
    }
3833

3834
    if (STREAM_ACT_RECALC & pAction->actions) {
423✔
3835
      msmRspAddUserRecalc(pCtx->pMnode, *pStreamId, pCtx, pAction);
16✔
3836
    }
3837

3838
    if (STREAM_ACT_START & pAction->actions) {
423✔
3839
      msmRspAddStreamStart(*pStreamId, pCtx, streamNum, pAction);
407✔
3840
    }
3841
  }
3842
  
3843
_exit:
260✔
3844

3845
  if (pIter) {
260!
3846
    taosHashCancelIterate(pCtx->actionStm, pIter);
×
3847
  }
3848

3849
  if (code) {
260!
3850
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
3851
  }
3852

3853
  return code;
260✔
3854
}
3855

3856
int32_t msmCheckUpdateDnodeTs(SStmGrpCtx* pCtx) {
29,409✔
3857
  int32_t  code = TSDB_CODE_SUCCESS;
29,409✔
3858
  int32_t  lino = 0;
29,409✔
3859
  int64_t* lastTs = NULL;
29,409✔
3860
  bool     noExists = false;
29,409✔
3861

3862
  while (true) {
3863
    lastTs = taosHashGet(mStreamMgmt.dnodeMap, &pCtx->pReq->dnodeId, sizeof(pCtx->pReq->dnodeId));
30,620✔
3864
    if (NULL == lastTs) {
30,620✔
3865
      if (noExists) {
1,520✔
3866
        mstWarn("Got unknown dnode %d hb msg, may be dropped", pCtx->pReq->dnodeId);
309!
3867
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_NODE_NOT_EXISTS);
309!
3868
      }
3869

3870
      noExists = true;
1,211✔
3871
      TAOS_CHECK_EXIT(msmSTAddDnodesToMap(pCtx->pMnode));
1,211!
3872
      
3873
      continue;
1,211✔
3874
    }
3875

3876
    while (true) {
×
3877
      int64_t lastTsValue = atomic_load_64(lastTs);
29,100✔
3878
      if (pCtx->currTs > lastTsValue) {
29,100✔
3879
        if (lastTsValue == atomic_val_compare_exchange_64(lastTs, lastTsValue, pCtx->currTs)) {
29,095!
3880
          mstDebug("dnode %d lastUpTs updated", pCtx->pReq->dnodeId);
29,095✔
3881
          return code;
29,095✔
3882
        }
3883

3884
        continue;
×
3885
      }
3886

3887
      return code;
5✔
3888
    }
3889

3890
    break;
3891
  }
3892

3893
_exit:
309✔
3894

3895
  if (code) {
309!
3896
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
309!
3897
  }
3898

3899
  return code;  
309✔
3900
}
3901

3902
void msmWatchCheckStreamMap(SStmGrpCtx* pCtx) {
×
3903
  SStmStatus* pStatus = NULL;
×
3904
  int32_t trigReaderNum = 0;
×
3905
  int32_t calcReaderNum = 0;
×
3906
  int32_t runnerNum = 0;
×
3907
  int64_t streamId = 0;
×
3908
  void* pIter = NULL;
×
3909
  while (true) {
3910
    pIter = taosHashIterate(mStreamMgmt.streamMap, pIter);
×
3911
    if (NULL == pIter) {
×
3912
      return;
×
3913
    }
3914

3915
    streamId = *(int64_t*)taosHashGetKey(pIter, NULL);
×
3916
    pStatus = (SStmStatus*)pIter;
×
3917

3918
    if (NULL == pStatus->triggerTask) {
×
3919
      mstsWarn("no trigger task recored, deployTimes:%" PRId64, pStatus->deployTimes);
×
3920
      msmStopStreamByError(streamId, pStatus, TSDB_CODE_MND_STREAM_TASK_LOST, pCtx->currTs);
×
3921
      continue;
×
3922
    }
3923
    
3924
    trigReaderNum = taosArrayGetSize(pStatus->trigReaders);
×
3925
    if (pStatus->trigReaderNum != trigReaderNum) {
×
3926
      mstsWarn("trigReaderNum %d mis-match with expected %d", trigReaderNum, pStatus->trigReaderNum);
×
3927
      msmStopStreamByError(streamId, pStatus, TSDB_CODE_MND_STREAM_TASK_LOST, pCtx->currTs);
×
3928
      continue;
×
3929
    }
3930

3931
    calcReaderNum = MST_LIST_SIZE(pStatus->calcReaders);
×
3932
    if (pStatus->calcReaderNum != calcReaderNum) {
×
3933
      mstsWarn("calcReaderNum %d mis-match with expected %d", calcReaderNum, pStatus->calcReaderNum);
×
3934
      msmStopStreamByError(streamId, pStatus, TSDB_CODE_MND_STREAM_TASK_LOST, pCtx->currTs);
×
3935
      continue;
×
3936
    }
3937

3938
    for (int32_t i = 0; i < pStatus->runnerDeploys; ++i) {
×
3939
      runnerNum = taosArrayGetSize(pStatus->runners[i]);
×
3940
      if (runnerNum != pStatus->runnerNum) {
×
3941
        mstsWarn("runner deploy %d runnerNum %d mis-match with expected %d", i, runnerNum, pStatus->runnerNum);
×
3942
        msmStopStreamByError(streamId, pStatus, TSDB_CODE_MND_STREAM_TASK_LOST, pCtx->currTs);
×
3943
        continue;
×
3944
      }
3945
    }
3946
  }
3947
}
3948

3949
int32_t msmWatchHandleEnding(SStmGrpCtx* pCtx, bool watchError) {
×
3950
  int32_t code = TSDB_CODE_SUCCESS;
×
3951
  int32_t lino = 0;
×
3952
  int32_t minVal = watchError ? 0 : 1;
×
3953

3954
  if (0 != atomic_val_compare_exchange_8(&mStreamMgmt.watch.ending, 0, 1)) {
×
3955
    return code;
×
3956
  }
3957

3958
  while (atomic_load_32(&mStreamMgmt.watch.processing) > minVal) {
×
3959
    (void)sched_yield();
×
3960
  }
3961

3962
  if (watchError) {
×
3963
    taosHashClear(mStreamMgmt.vgroupMap);
×
3964
    taosHashClear(mStreamMgmt.snodeMap);
×
3965
    taosHashClear(mStreamMgmt.taskMap);
×
3966
    taosHashClear(mStreamMgmt.streamMap);
×
3967
    mstInfo("watch error happends, clear all maps");
×
3968
    goto _exit;
×
3969
  }
3970

3971
  if (0 == atomic_load_8(&mStreamMgmt.watch.taskRemains)) {
×
3972
    mstInfo("no stream tasks remain during watch state");
×
3973
    goto _exit;
×
3974
  }
3975

3976
  msmWatchCheckStreamMap(pCtx);
×
3977

3978
_exit:
×
3979

3980
  mStreamMgmt.lastTaskId += 100000;
×
3981

3982
  mstInfo("watch state end, new taskId begin from:%" PRIx64, mStreamMgmt.lastTaskId);
×
3983

3984
  msmSetInitRuntimeState(MND_STM_STATE_NORMAL);
×
3985

3986
  if (code) {
×
3987
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
3988
  }
3989

3990
  return code;
×
3991
}
3992

3993

3994
int32_t msmWatchHandleHbMsg(SStmGrpCtx* pCtx) {
×
3995
  int32_t code = TSDB_CODE_SUCCESS;
×
3996
  int32_t lino = 0;
×
3997
  SStreamHbMsg* pReq = pCtx->pReq;
×
3998

3999
  (void)atomic_add_fetch_32(&mStreamMgmt.watch.processing, 1);
×
4000
  
4001
  if (atomic_load_8(&mStreamMgmt.watch.ending)) {
×
4002
    goto _exit;
×
4003
  }
4004

4005
  TAOS_CHECK_EXIT(msmCheckUpdateDnodeTs(pCtx));
×
4006
  if (GOT_SNODE(pReq->snodeId)) {
×
4007
    TAOS_CHECK_EXIT(msmUpdateSnodeUpTs(pCtx));
×
4008
  }
4009

4010
  if (taosArrayGetSize(pReq->pStreamStatus) > 0) {
×
4011
    atomic_store_8(&mStreamMgmt.watch.taskRemains, 1);
×
4012
    TAOS_CHECK_EXIT(msmWatchHandleStatusUpdate(pCtx));
×
4013
  }
4014

4015
  if ((pCtx->currTs - MND_STREAM_GET_LAST_TS(STM_EVENT_ACTIVE_BEGIN)) > MST_SHORT_ISOLATION_DURATION) {
×
4016
    TAOS_CHECK_EXIT(msmWatchHandleEnding(pCtx, false));
×
4017
  }
4018

4019
_exit:
×
4020

4021
  atomic_sub_fetch_32(&mStreamMgmt.watch.processing, 1);
×
4022
  
4023
  if (code) {
×
4024
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
4025

4026
    (void)msmWatchHandleEnding(pCtx, true);
×
4027
  }
4028

4029
  return code;
×
4030
}
4031

4032
int32_t msmCheckDeployTrigReader(SStmGrpCtx* pCtx, SStmStatus* pStatus, SStmTaskStatusMsg* pTask, int32_t vgId, int32_t vgNum) {
76✔
4033
  int32_t code = TSDB_CODE_SUCCESS;
76✔
4034
  int32_t lino = 0;
76✔
4035
  bool    readerExists = false;
76✔
4036
  int64_t streamId = pTask->streamId;
76✔
4037

4038
  int32_t readerNum = taosArrayGetSize(pStatus->trigReaders);
76✔
4039
  for (int32_t i = 0; i < readerNum; ++i) {
110✔
4040
    SStmTaskStatus* pReader = (SStmTaskStatus*)taosArrayGet(pStatus->trigReaders, i);
80✔
4041
    if (pReader->id.nodeId == vgId) {
80✔
4042
      readerExists = true;
46✔
4043
      break;
46✔
4044
    }
4045
  }
4046

4047
  if (!readerExists) {
76✔
4048
    if (NULL == pStatus->trigOReaders) {
30✔
4049
      pStatus->trigOReaders = taosArrayInit(vgNum, sizeof(SStmTaskStatus));
29✔
4050
      TSDB_CHECK_NULL(pStatus->trigOReaders, code, lino, _exit, terrno);
29!
4051
    }
4052
    
4053
    SStmTaskStatus* pState = taosArrayReserve(pStatus->trigOReaders, 1);
30✔
4054
    TAOS_CHECK_EXIT(msmTDAddSingleTrigReader(pCtx, pState, vgId, pStatus, streamId));
30!
4055
    TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, NULL, NULL, pState));
30!
4056
    TAOS_CHECK_EXIT(msmSTAddToVgroupMap(pCtx, streamId, NULL, NULL, pState, true));
30!
4057
  }
4058

4059
_exit:
76✔
4060

4061
  if (code) {
76!
4062
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
4063
  }
4064

4065
  return code;
76✔
4066
}
4067

4068
int32_t msmDeployTriggerOrigReader(SStmGrpCtx* pCtx, SStmTaskStatusMsg* pTask) {
73✔
4069
  int32_t code = TSDB_CODE_SUCCESS;
73✔
4070
  int32_t lino = 0;
73✔
4071
  int32_t vgId = 0;
73✔
4072
  int64_t streamId = pTask->streamId;
73✔
4073
  SArray* pTbs = pTask->pMgmtReq->cont.pReqs;
73✔
4074
  int32_t tbNum = taosArrayGetSize(pTbs);
73✔
4075
  SStreamDbTableName* pName = NULL;
73✔
4076
  SSHashObj* pDbVgroups = NULL;
73✔
4077
  SStreamMgmtRsp rsp = {0};
73✔
4078
  rsp.reqId = pTask->pMgmtReq->reqId;
73✔
4079
  rsp.header.msgType = STREAM_MSG_ORIGTBL_READER_INFO;
73✔
4080
  int32_t iter = 0;
73✔
4081
  void* p = NULL;
73✔
4082
  SSHashObj* pVgs = NULL;
73✔
4083
  SStreamMgmtReq* pMgmtReq = NULL;
73✔
4084
  int8_t stopped = 0;
73✔
4085
  
4086
  TSWAP(pTask->pMgmtReq, pMgmtReq);
73✔
4087
  rsp.task = *(SStreamTask*)pTask;
73✔
4088

4089
  SStmStatus* pStatus = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
73✔
4090
  if (NULL == pStatus) {
73!
4091
    mstsError("stream not deployed, remainStreams:%d", taosHashGetSize(mStreamMgmt.streamMap));
×
4092
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_NOT_RUNNING);
×
4093
  }
4094

4095
  stopped = atomic_load_8(&pStatus->stopped);
73✔
4096
  if (stopped) {
73!
4097
    msttInfo("stream stopped %d, ignore deploy trigger reader, vgId:%d", stopped, vgId);
×
4098
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_STOPPED);
×
4099
  }
4100

4101
  if (tbNum <= 0) {
73!
4102
    mstsWarn("empty table list in origReader req, array:%p", pTbs);
×
4103
    goto _exit;
×
4104
  }
4105

4106
  int32_t oReaderNum = taosArrayGetSize(pStatus->trigOReaders);
73✔
4107
  if (oReaderNum > 0) {
73!
4108
    mstsWarn("origReaders already exits, num:%d", oReaderNum);
×
4109
    goto _exit;
×
4110
  }
4111

4112
  TAOS_CHECK_EXIT(mstBuildDBVgroupsMap(pCtx->pMnode, &pDbVgroups));
73!
4113
  rsp.cont.vgIds = taosArrayInit(tbNum, sizeof(int32_t));
73✔
4114
  TSDB_CHECK_NULL(rsp.cont.vgIds, code, lino, _exit, terrno);
73!
4115

4116
  pVgs = tSimpleHashInit(tbNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
73✔
4117
  TSDB_CHECK_NULL(pVgs, code, lino, _exit, terrno);
73!
4118
  
4119
  for (int32_t i = 0; i < tbNum; ++i) {
212✔
4120
    pName = (SStreamDbTableName*)taosArrayGet(pTbs, i);
139✔
4121
    TAOS_CHECK_EXIT(mstGetTableVgId(pDbVgroups, pName->dbFName, pName->tbName, &vgId));
139!
4122
    TSDB_CHECK_NULL(taosArrayPush(rsp.cont.vgIds, &vgId), code, lino, _exit, terrno);
278!
4123
    TAOS_CHECK_EXIT(tSimpleHashPut(pVgs, &vgId, sizeof(vgId), &vgId, sizeof(vgId)));
139!
4124
  }
4125

4126
  int32_t vgNum = tSimpleHashGetSize(pVgs);
73✔
4127
  while (true) {
4128
    p = tSimpleHashIterate(pVgs, p, &iter);
149✔
4129
    if (NULL == p) {
149✔
4130
      break;
73✔
4131
    }
4132
    
4133
    TAOS_CHECK_EXIT(msmCheckDeployTrigReader(pCtx, pStatus, pTask, *(int32_t*)p, vgNum));
76!
4134
  }
4135
  
4136
  vgNum = taosArrayGetSize(pStatus->trigOReaders);
73✔
4137
  rsp.cont.readerList = taosArrayInit(vgNum, sizeof(SStreamTaskAddr));
73✔
4138
  TSDB_CHECK_NULL(rsp.cont.readerList, code, lino, _exit, terrno);
73!
4139

4140
  SStreamTaskAddr addr;
4141
  for (int32_t i = 0; i < vgNum; ++i) {
103✔
4142
    SStmTaskStatus* pOTask = taosArrayGet(pStatus->trigOReaders, i);
30✔
4143
    addr.taskId = pOTask->id.taskId;
30✔
4144
    addr.nodeId = pOTask->id.nodeId;
30✔
4145
    addr.epset = mndGetVgroupEpsetById(pCtx->pMnode, pOTask->id.nodeId);
30✔
4146
    TSDB_CHECK_NULL(taosArrayPush(rsp.cont.readerList, &addr), code, lino, _exit, terrno);
60!
4147
    mstsDebug("the %dth otrigReader src added to trigger's virtual orig readerList, TASK:%" PRIx64 " nodeId:%d", i, addr.taskId, addr.nodeId);
30!
4148
  }
4149

4150
  if (NULL == pCtx->pRsp->rsps.rspList) {
73!
4151
    pCtx->pRsp->rsps.rspList = taosArrayInit(2, sizeof(SStreamMgmtRsp));
×
4152
    TSDB_CHECK_NULL(pCtx->pRsp->rsps.rspList, code, lino, _exit, terrno);
×
4153
  }
4154

4155
  TSDB_CHECK_NULL(taosArrayPush(pCtx->pRsp->rsps.rspList, &rsp), code, lino, _exit, terrno);
146!
4156

4157
_exit:
73✔
4158

4159
  tFreeSStreamMgmtReq(pMgmtReq);
73✔
4160
  taosMemoryFree(pMgmtReq);
73!
4161

4162
  tSimpleHashCleanup(pVgs);
73✔
4163

4164
  if (code) {
73!
4165
    tFreeSStreamMgmtRsp(&rsp);
×
4166
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
4167
  }
4168

4169
  mstDestroyDbVgroupsHash(pDbVgroups);
73✔
4170

4171
  return code;
73✔
4172
}
4173

4174
int32_t msmGetCalcScanFromList(int64_t streamId, SArray* pList, int64_t uid, SStreamCalcScan** ppRes) {
11✔
4175
  int32_t num = taosArrayGetSize(pList);
11✔
4176
  SStreamCalcScan* pScan = NULL;
11✔
4177
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;
11✔
4178
  int64_t planUid = 0;
11✔
4179
  for (int32_t i = 0; i < num; ++i) {
11!
4180
    pScan = (SStreamCalcScan*)taosArrayGet(pList, i);
11✔
4181
    TAOS_CHECK_EXIT(mstGetScanUidFromPlan(streamId, pScan->scanPlan, &planUid));
11!
4182
    if (0 != planUid && planUid == uid) {
11!
4183
      *ppRes = pScan;
11✔
4184
      break;
11✔
4185
    }
4186
  }
4187

4188
_exit:
×
4189

4190
  if (code) {
11!
4191
    mstsError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
4192
  }
4193

4194
  return code;
11✔
4195
}
4196

4197
int32_t msmCheckDeployCalcReader(SStmGrpCtx* pCtx, SStmStatus* pStatus, SStmTaskStatusMsg* pTask, int32_t vgId, int64_t uid, SStreamTaskAddr* pAddr) {
20✔
4198
  int32_t code = TSDB_CODE_SUCCESS;
20✔
4199
  int32_t lino = 0;
20✔
4200
  bool    readerExists = false;
20✔
4201
  int64_t streamId = pTask->streamId;
20✔
4202
  SListNode* pNode = listHead(pStatus->calcReaders);
20✔
4203
  SStmTaskStatus* pReader = NULL;
20✔
4204
  int32_t taskIdx = 0;
20✔
4205

4206
  int32_t readerNum = MST_LIST_SIZE(pStatus->calcReaders);
20!
4207
  for (int32_t i = 0; i < readerNum; ++i, pNode = TD_DLIST_NODE_NEXT(pNode)) {
180✔
4208
    pReader = (SStmTaskStatus*)pNode->data;
169✔
4209
    if (pReader->id.nodeId == vgId && pReader->id.uid == uid) {
169✔
4210
      readerExists = true;
9✔
4211
      pAddr->taskId = pReader->id.taskId;
9✔
4212
      break;
9✔
4213
    }
4214
  }
4215

4216
  if (!readerExists) {
20✔
4217
    if (NULL == pStatus->calcReaders) {
11!
4218
      pStatus->calcReaders = tdListNew(sizeof(SStmTaskStatus));
×
4219
      TSDB_CHECK_NULL(pStatus->calcReaders, code, lino, _exit, terrno);
×
4220
      taskIdx = 0;
×
4221
    } else {
4222
      pNode = listTail(pStatus->calcReaders);
11✔
4223
      pReader = (SStmTaskStatus*)pNode->data;
11✔
4224
      taskIdx = pReader->id.taskIdx + 1;
11✔
4225
    }
4226

4227
    SStreamCalcScan* pScan = NULL;
11✔
4228
    TAOS_CHECK_EXIT(msmGetCalcScanFromList(streamId, pStatus->pCreate->calcScanPlanList, uid, &pScan));
11!
4229
    TSDB_CHECK_NULL(pScan, code, lino, _exit, TSDB_CODE_STREAM_INTERNAL_ERROR);
11!
4230
    pReader = tdListReserve(pStatus->calcReaders);
11✔
4231
    TAOS_CHECK_EXIT(msmTDAddSingleCalcReader(pCtx, pReader, taskIdx, vgId, pScan->scanPlan, pStatus, streamId));
11!
4232
    TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, NULL, NULL, pReader));
11!
4233
    TAOS_CHECK_EXIT(msmSTAddToVgroupMap(pCtx, streamId, NULL, NULL, pReader, false));
11!
4234
    pAddr->taskId = pReader->id.taskId;
11✔
4235
  }
4236

4237
  pAddr->epset = mndGetVgroupEpsetById(pCtx->pMnode, vgId);
20✔
4238
  pAddr->nodeId = vgId;
20✔
4239

4240
_exit:
20✔
4241

4242
  if (code) {
20!
4243
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
4244
  }
4245

4246
  return code;
20✔
4247
}
4248

4249

4250
int32_t msmDeployRunnerOrigReader(SStmGrpCtx* pCtx, SStmTaskStatusMsg* pTask) {
17✔
4251
  int32_t code = TSDB_CODE_SUCCESS;
17✔
4252
  int32_t lino = 0;
17✔
4253
  int32_t vgId = 0;
17✔
4254
  int64_t streamId = pTask->streamId;
17✔
4255
  SArray* pReqs = pTask->pMgmtReq->cont.pReqs;
17✔
4256
  int32_t reqNum = taosArrayGetSize(pReqs);
17✔
4257
  SStreamOReaderDeployReq* pReq = NULL;
17✔
4258
  SStreamOReaderDeployRsp* pRsp = NULL;
17✔
4259
  SStreamMgmtRsp rsp = {0};
17✔
4260
  rsp.reqId = pTask->pMgmtReq->reqId;
17✔
4261
  rsp.header.msgType = STREAM_MSG_RUNNER_ORIGTBL_READER;
17✔
4262
  SStreamMgmtReq* pMgmtReq = NULL;
17✔
4263
  int8_t stopped = 0;
17✔
4264
  int32_t vgNum = 0;
17✔
4265
  SStreamTaskAddr* pAddr = NULL;
17✔
4266
  
4267
  TSWAP(pTask->pMgmtReq, pMgmtReq);
17✔
4268
  rsp.task = *(SStreamTask*)pTask;
17✔
4269

4270
  SStmStatus* pStatus = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
17✔
4271
  if (NULL == pStatus) {
17!
4272
    mstsError("stream not deployed, remainStreams:%d", taosHashGetSize(mStreamMgmt.streamMap));
×
4273
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_NOT_RUNNING);
×
4274
  }
4275

4276
  stopped = atomic_load_8(&pStatus->stopped);
17✔
4277
  if (stopped) {
17!
4278
    msttInfo("stream stopped %d, ignore deploy trigger reader, vgId:%d", stopped, vgId);
×
4279
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_STOPPED);
×
4280
  }
4281

4282
  if (reqNum <= 0) {
17!
4283
    mstsWarn("empty req list in origReader req, array:%p", pReqs);
×
4284
    goto _exit;
×
4285
  }
4286

4287
  rsp.cont.execRspList = taosArrayInit_s(sizeof(SStreamOReaderDeployRsp), reqNum);
17✔
4288
  TSDB_CHECK_NULL(rsp.cont.execRspList, code, lino, _exit, terrno);
17!
4289

4290
  for (int32_t i = 0; i < reqNum; ++i) {
34✔
4291
    pReq = (SStreamOReaderDeployReq*)taosArrayGet(pReqs, i);
17✔
4292
    pRsp = (SStreamOReaderDeployRsp*)taosArrayGet(rsp.cont.execRspList, i);
17✔
4293
    pRsp->execId = pReq->execId;
17✔
4294
    vgNum = taosArrayGetSize(pReq->vgIds);
17✔
4295
    pRsp->vgList = taosArrayInit_s(sizeof(SStreamTaskAddr), vgNum);
17✔
4296
    TSDB_CHECK_NULL(pRsp->vgList, code, lino, _exit, terrno);
17!
4297
    
4298
    for (int32_t n = 0; n < vgNum; ++n) {
37✔
4299
      vgId = *(int32_t*)taosArrayGet(pReq->vgIds, n);
20✔
4300
      pAddr = taosArrayGet(pRsp->vgList, n);
20✔
4301
      TAOS_CHECK_EXIT(msmCheckDeployCalcReader(pCtx, pStatus, pTask, vgId, pReq->uid, pAddr));
20!
4302
    }
4303
  }
4304

4305
  if (NULL == pCtx->pRsp->rsps.rspList) {
17!
4306
    pCtx->pRsp->rsps.rspList = taosArrayInit(2, sizeof(SStreamMgmtRsp));
×
4307
    TSDB_CHECK_NULL(pCtx->pRsp->rsps.rspList, code, lino, _exit, terrno);
×
4308
  }
4309

4310
  TSDB_CHECK_NULL(taosArrayPush(pCtx->pRsp->rsps.rspList, &rsp), code, lino, _exit, terrno);
34!
4311

4312
_exit:
17✔
4313

4314
  tFreeSStreamMgmtReq(pMgmtReq);
17✔
4315
  taosMemoryFree(pMgmtReq);
17!
4316

4317
  if (code) {
17!
4318
    tFreeSStreamMgmtRsp(&rsp);
×
4319
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
4320
  }
4321

4322
  return code;
17✔
4323
}
4324

4325

4326
int32_t msmHandleTaskMgmtReq(SStmGrpCtx* pCtx, SStmTaskStatusMsg* pTask) {
90✔
4327
  int32_t code = TSDB_CODE_SUCCESS;
90✔
4328
  int32_t lino = 0;
90✔
4329

4330
  switch (pTask->pMgmtReq->type) {
90!
4331
    case STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER:
73✔
4332
      TAOS_CHECK_EXIT(msmDeployTriggerOrigReader(pCtx, pTask));
73!
4333
      break;
73✔
4334
    case STREAM_MGMT_REQ_RUNNER_ORIGTBL_READER:
17✔
4335
      TAOS_CHECK_EXIT(msmDeployRunnerOrigReader(pCtx, pTask));
17!
4336
      break;
17✔
4337
    default:
×
4338
      msttError("Invalid mgmtReq type:%d", pTask->pMgmtReq->type);
×
4339
      code = TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
×
4340
      break;
×
4341
  }
4342

4343
_exit:
90✔
4344

4345
  if (code) {
90!
4346
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
4347
  }
4348

4349
  return code;
90✔
4350
}
4351

4352
int32_t msmHandleStreamRequests(SStmGrpCtx* pCtx) {
43✔
4353
  int32_t code = TSDB_CODE_SUCCESS;
43✔
4354
  int32_t lino = 0;
43✔
4355
  SStreamHbMsg* pReq = pCtx->pReq;
43✔
4356
  SStmTaskStatusMsg* pTask = NULL;
43✔
4357
  
4358
  int32_t reqNum = taosArrayGetSize(pReq->pStreamReq);
43✔
4359
  if (reqNum > 0 && NULL == pCtx->pRsp->rsps.rspList) {
43!
4360
    pCtx->pRsp->rsps.rspList = taosArrayInit(reqNum, sizeof(SStreamMgmtRsp));
43✔
4361
    TSDB_CHECK_NULL(pCtx->pRsp->rsps.rspList, code, lino, _exit, terrno);
43!
4362
  }
4363
  
4364
  for (int32_t i = 0; i < reqNum; ++i) {
133✔
4365
    int32_t idx = *(int32_t*)taosArrayGet(pReq->pStreamReq, i);
90✔
4366
    pTask = (SStmTaskStatusMsg*)taosArrayGet(pReq->pStreamStatus, idx);
90✔
4367
    if (NULL == pTask) {
90!
4368
      mstError("idx %d is NULL, reqNum:%d", idx, reqNum);
×
4369
      continue;
×
4370
    }
4371

4372
    if (NULL == pTask->pMgmtReq) {
90!
4373
      msttError("idx %d without mgmtReq", idx);
×
4374
      continue;
×
4375
    }
4376

4377
    TAOS_CHECK_EXIT(msmHandleTaskMgmtReq(pCtx, pTask));
90!
4378
  }
4379

4380
_exit:
43✔
4381

4382
  if (code) {
43!
4383
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
4384
  }
4385

4386
  return code;
43✔
4387
}
4388

4389
int32_t msmNormalHandleHbMsg(SStmGrpCtx* pCtx) {
29,409✔
4390
  int32_t code = TSDB_CODE_SUCCESS;
29,409✔
4391
  int32_t lino = 0;
29,409✔
4392
  SStreamHbMsg* pReq = pCtx->pReq;
29,409✔
4393

4394
  TAOS_CHECK_EXIT(msmCheckUpdateDnodeTs(pCtx));
29,409✔
4395
  if (GOT_SNODE(pReq->snodeId)) {
29,100✔
4396
    TAOS_CHECK_EXIT(msmUpdateSnodeUpTs(pCtx));
2,820!
4397
  }
4398
  
4399
  if (atomic_load_64(&mStreamMgmt.actionQ->qRemainNum) > 0 && 0 == taosWTryLockLatch(&mStreamMgmt.actionQLock)) {
29,100!
4400
    msmHandleStreamActions(pCtx);
89✔
4401
    taosWUnLockLatch(&mStreamMgmt.actionQLock);
89✔
4402
  }
4403

4404
  if (taosArrayGetSize(pReq->pStreamReq) > 0 && mstWaitLock(&mStreamMgmt.actionQLock, false)) {
29,100!
4405
    code = msmHandleStreamRequests(pCtx);
43✔
4406
    taosWUnLockLatch(&mStreamMgmt.actionQLock);
43✔
4407
    TAOS_CHECK_EXIT(code);
43!
4408
  }
4409

4410
  if (atomic_load_32(&mStreamMgmt.toDeployVgTaskNum) > 0) {
29,100✔
4411
    TAOS_CHECK_EXIT(msmGrpAddDeployVgTasks(pCtx));
118!
4412
  } else {
4413
    TAOS_CHECK_EXIT(msmUpdateVgroupsUpTs(pCtx));
28,982!
4414
  }
4415

4416
  if (atomic_load_32(&mStreamMgmt.toDeploySnodeTaskNum) > 0 && GOT_SNODE(pReq->snodeId)) {
29,100!
4417
    TAOS_CHECK_EXIT(msmGrpAddDeploySnodeTasks(pCtx));
104!
4418
  }
4419

4420
  if (taosHashGetSize(pCtx->deployStm) > 0) {
29,100✔
4421
    TAOS_CHECK_EXIT(msmRspAddStreamsDeploy(pCtx));
114!
4422
  }
4423

4424
  if (taosArrayGetSize(pReq->pStreamStatus) > 0) {
29,100✔
4425
    TAOS_CHECK_EXIT(msmNormalHandleStatusUpdate(pCtx));
2,628!
4426
  }
4427

4428
  if (taosHashGetSize(pCtx->actionStm) > 0) {
29,100✔
4429
    TAOS_CHECK_EXIT(msmHandleHbPostActions(pCtx));
260!
4430
  }
4431

4432
_exit:
29,100✔
4433

4434
  if (code) {
29,409✔
4435
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
309!
4436
  }
4437

4438
  return code;
29,409✔
4439
}
4440

4441
void msmEncodeStreamHbRsp(int32_t code, SRpcHandleInfo *pRpcInfo, SMStreamHbRspMsg* pRsp, SRpcMsg* pMsg) {
30,112✔
4442
  int32_t lino = 0;
30,112✔
4443
  int32_t tlen = 0;
30,112✔
4444
  void   *buf = NULL;
30,112✔
4445

4446
  if (TSDB_CODE_SUCCESS != code) {
30,112✔
4447
    goto _exit;
309✔
4448
  }
4449

4450
  tEncodeSize(tEncodeStreamHbRsp, pRsp, tlen, code);
29,803!
4451
  if (code < 0) {
29,803!
4452
    mstError("encode stream hb msg rsp failed, code:%s", tstrerror(code));
×
4453
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);    
×
4454
  }
4455

4456
  buf = rpcMallocCont(tlen + sizeof(SStreamMsgGrpHeader));
29,803✔
4457
  if (buf == NULL) {
29,803!
4458
    mstError("encode stream hb msg rsp failed, code:%s", tstrerror(terrno));
×
4459
    TAOS_CHECK_EXIT(terrno);    
×
4460
  }
4461

4462
  ((SStreamMsgGrpHeader *)buf)->streamGid = pRsp->streamGId;
29,803✔
4463
  void *abuf = POINTER_SHIFT(buf, sizeof(SStreamMsgGrpHeader));
29,803✔
4464

4465
  SEncoder encoder;
4466
  tEncoderInit(&encoder, abuf, tlen);
29,803✔
4467
  if ((code = tEncodeStreamHbRsp(&encoder, pRsp)) < 0) {
29,803!
4468
    rpcFreeCont(buf);
×
4469
    buf = NULL;
×
4470
    tEncoderClear(&encoder);
×
4471
    mstError("encode stream hb msg rsp failed, code:%s", tstrerror(code));
×
4472
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);    
×
4473
  }
4474
  tEncoderClear(&encoder);
29,803✔
4475

4476
_exit:
30,112✔
4477

4478
  pMsg->code = code;
30,112✔
4479
  pMsg->info = *pRpcInfo;
30,112✔
4480
  if (TSDB_CODE_SUCCESS == code) {
30,112✔
4481
    pMsg->contLen = tlen + sizeof(SStreamMsgGrpHeader);
29,803✔
4482
    pMsg->pCont = buf;
29,803✔
4483
  }
4484
}
30,112✔
4485

4486

4487
int32_t msmHandleStreamHbMsg(SMnode* pMnode, int64_t currTs, SStreamHbMsg* pHb, SRpcMsg *pReq, SRpcMsg* pRspMsg) {
30,112✔
4488
  int32_t code = TSDB_CODE_SUCCESS;
30,112✔
4489
  SMStreamHbRspMsg rsp = {0};
30,112✔
4490
  rsp.streamGId = pHb->streamGId;
30,112✔
4491

4492
  (void)mstWaitLock(&mStreamMgmt.runtimeLock, true);
30,112✔
4493

4494
  if (0 == atomic_load_8(&mStreamMgmt.active)) {
30,112✔
4495
    mstWarn("mnode stream become NOT active, ignore stream hb from dnode %d streamGid %d", pHb->dnodeId, pHb->streamGId);
703!
4496
    goto _exit;
703✔
4497
  }
4498

4499
  int32_t tidx = streamGetThreadIdx(mStreamMgmt.threadNum, pHb->streamGId);
29,409✔
4500
  SStmGrpCtx* pCtx = &mStreamMgmt.tCtx[tidx].grpCtx[pHb->streamGId];
29,409✔
4501

4502
  pCtx->tidx = tidx;
29,409✔
4503
  pCtx->pMnode = pMnode;
29,409✔
4504
  pCtx->currTs = currTs;
29,409✔
4505
  pCtx->pReq = pHb;
29,409✔
4506
  pCtx->pRsp = &rsp;
29,409✔
4507
  pCtx->deployStm = mStreamMgmt.tCtx[pCtx->tidx].deployStm[pHb->streamGId];
29,409✔
4508
  pCtx->actionStm = mStreamMgmt.tCtx[pCtx->tidx].actionStm[pHb->streamGId];
29,409✔
4509
  
4510
  switch (atomic_load_8(&mStreamMgmt.state)) {
29,409!
4511
    case MND_STM_STATE_WATCH:
×
4512
      code = msmWatchHandleHbMsg(pCtx);
×
4513
      break;
×
4514
    case MND_STM_STATE_NORMAL:
29,409✔
4515
      code = msmNormalHandleHbMsg(pCtx);
29,409✔
4516
      break;
29,409✔
4517
    default:
×
4518
      mstError("Invalid stream state: %d", mStreamMgmt.state);
×
4519
      code = TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
×
4520
      break;
×
4521
  }
4522

4523
_exit:
30,112✔
4524

4525
  msmEncodeStreamHbRsp(code, &pReq->info, &rsp, pRspMsg);
30,112✔
4526

4527
  msmCleanStreamGrpCtx(pHb);
30,112✔
4528
  msmClearStreamToDeployMaps(pHb);
30,112✔
4529

4530
  taosRUnLockLatch(&mStreamMgmt.runtimeLock);
30,112✔
4531
  
4532
  tFreeSMStreamHbRspMsg(&rsp);
30,112✔
4533

4534
  return code;
30,112✔
4535
}
4536

4537
void msmHandleBecomeLeader(SMnode *pMnode) {
1,118✔
4538
  if (tsDisableStream) {
1,118!
4539
    return;
×
4540
  }
4541

4542
  mstInfo("start to process mnode become leader");
1,118!
4543

4544
  int32_t code = 0;
1,118✔
4545
  streamAddVnodeLeader(MNODE_HANDLE);
1,118✔
4546
  
4547
  taosWLockLatch(&mStreamMgmt.runtimeLock);
1,118✔
4548
  msmDestroyRuntimeInfo(pMnode);
1,118✔
4549
  code = msmInitRuntimeInfo(pMnode);
1,118✔
4550
  taosWUnLockLatch(&mStreamMgmt.runtimeLock);
1,118✔
4551

4552
  if (TSDB_CODE_SUCCESS == code) {
1,118!
4553
    atomic_store_8(&mStreamMgmt.active, 1);
1,118✔
4554
  }
4555

4556
  mstInfo("mnode stream mgmt active:%d", atomic_load_8(&mStreamMgmt.active));
1,118!
4557
}
4558

4559
void msmHandleBecomeNotLeader(SMnode *pMnode) {  
1,749✔
4560
  if (tsDisableStream) {
1,749!
4561
    return;
×
4562
  }
4563

4564
  mstInfo("start to process mnode become not leader");
1,749!
4565

4566
  streamRemoveVnodeLeader(MNODE_HANDLE);
1,749✔
4567

4568
  if (atomic_val_compare_exchange_8(&mStreamMgmt.active, 1, 0)) {
1,749✔
4569
    taosWLockLatch(&mStreamMgmt.runtimeLock);
1,118✔
4570
    msmDestroyRuntimeInfo(pMnode);
1,118✔
4571
    mStreamMgmt.stat.inactiveTimes++;
1,118✔
4572
    taosWUnLockLatch(&mStreamMgmt.runtimeLock);
1,118✔
4573
  }
4574
}
4575

4576

4577
static void msmRedeployStream(int64_t streamId, SStmStatus* pStatus) {
×
4578
  if (1 == atomic_val_compare_exchange_8(&pStatus->stopped, 1, 0)) {
×
4579
    mstsInfo("try to reset and redeploy stream, deployTimes:%" PRId64, pStatus->deployTimes);
×
4580
    mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStatus->streamName, NULL, false, STREAM_ACT_DEPLOY);
×
4581
  } else {
4582
    mstsWarn("stream stopped %d already changed", atomic_load_8(&pStatus->stopped));
×
4583
  }
4584
}
×
4585

4586
static bool msmCheckStreamAssign(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
20✔
4587
  int32_t code = TSDB_CODE_SUCCESS;
20✔
4588
  int32_t lino = 0;
20✔
4589
  SStreamObj* pStream = pObj;
20✔
4590
  SSnodeObj* pSnode = p1;
20✔
4591
  SArray** ppRes = p2;
20✔
4592

4593
  if (pStream->mainSnodeId == pSnode->id) {
20✔
4594
    if (NULL == *ppRes) {
17✔
4595
      int32_t streamNum = sdbGetSize(pMnode->pSdb, SDB_STREAM);
2✔
4596
      *ppRes = taosArrayInit(streamNum, POINTER_BYTES);
2✔
4597
      TSDB_CHECK_NULL(*ppRes, code, lino, _exit, terrno);
2!
4598
    }
4599

4600
    TSDB_CHECK_NULL(taosArrayPush(*ppRes, &pStream), code, lino, _exit, terrno);
34!
4601
  }
4602

4603
  return true;
20✔
4604

4605
_exit:
×
4606

4607
  if (code) {
×
4608
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
4609
  }  
4610

4611
  *(int32_t*)p3 = code;
×
4612

4613
  return false;
×
4614
}
4615

4616

4617
int32_t msmCheckSnodeReassign(SMnode *pMnode, SSnodeObj* pSnode, SArray** ppRes) {
133✔
4618
  int32_t code = TSDB_CODE_SUCCESS;
133✔
4619
  int32_t lino = 0;
133✔
4620
  
4621
  sdbTraverse(pMnode->pSdb, SDB_STREAM, msmCheckStreamAssign, pSnode, ppRes, &code);
133✔
4622
  TAOS_CHECK_EXIT(code);
133!
4623

4624
  int32_t streamNum = taosArrayGetSize(*ppRes);
133✔
4625
  if (streamNum > 0 && 0 == pSnode->replicaId) {
133✔
4626
    mstError("snode %d has no replica while %d streams assigned", pSnode->id, streamNum);
1!
4627
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_SNODE_IN_USE);
1!
4628
  }
4629

4630
  //STREAMTODO CHECK REPLICA UPDATED OR NOT
4631

4632
_exit:
132✔
4633

4634
  if (code) {
133✔
4635
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
1!
4636
  }  
4637

4638
  return code;
133✔
4639
}
4640

4641
static bool msmCheckLoopStreamSdb(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
1,016✔
4642
  SStreamObj* pStream = pObj;
1,016✔
4643
  int64_t streamId = pStream->pCreate->streamId;
1,016✔
4644
  SStmStatus* pStatus = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
1,016✔
4645
  SStmCheckStatusCtx* pCtx = (SStmCheckStatusCtx*)p1;
1,016✔
4646
  int8_t userDropped = atomic_load_8(&pStream->userDropped), userStopped = atomic_load_8(&pStream->userStopped);
1,016✔
4647
  
4648
  if ((userDropped || userStopped) && (NULL == pStatus)) {
1,016!
4649
    mstsDebug("stream userDropped %d userStopped %d and not in streamMap, ignore it", userDropped, userStopped);
1!
4650
    return true;
1✔
4651
  }
4652
  
4653
  if (pStatus && !MST_STM_PASS_ISOLATION(pStream, pStatus)) {
1,015✔
4654
    mstsDebug("stream not pass isolation time, updateTime:%" PRId64 ", lastActionTs:%" PRId64 ", currentTs %" PRId64 ", ignore check it", 
788✔
4655
        pStream->updateTime, pStatus->lastActionTs, mStreamMgmt.hCtx.currentTs);
4656
    return true;
788✔
4657
  }
4658

4659
  if (NULL == pStatus && !MST_STM_STATIC_PASS_SHORT_ISOLATION(pStream)) {
227✔
4660
    mstsDebug("stream not pass static isolation time, updateTime:%" PRId64 ", currentTs %" PRId64 ", ignore check it", 
71✔
4661
        pStream->updateTime, mStreamMgmt.hCtx.currentTs);
4662
    return true;
71✔
4663
  }  
4664

4665
  if (pStatus) {
156✔
4666
    if (userDropped || userStopped || MST_IS_USER_STOPPED(atomic_load_8(&pStatus->stopped))) {
155!
4667
      (void)msmRemoveStreamFromMaps(pMnode, streamId);
1✔
4668
    }
4669

4670
    return true;
155✔
4671
  }
4672

4673
  mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStream->pCreate->name, NULL, false, STREAM_ACT_DEPLOY);
1✔
4674

4675
  return true;
1✔
4676
}
4677

4678
void msmCheckLoopStreamMap(SMnode *pMnode) {
120✔
4679
  SStmStatus* pStatus = NULL;
120✔
4680
  void* pIter = NULL;
120✔
4681
  int8_t stopped = 0;
120✔
4682
  int64_t streamId = 0;
120✔
4683
  
4684
  while (true) {
4685
    pIter = taosHashIterate(mStreamMgmt.streamMap, pIter);
811✔
4686
    if (NULL == pIter) {
811✔
4687
      break;
120✔
4688
    }
4689

4690
    pStatus = (SStmStatus*)pIter;
691✔
4691

4692
    streamId = *(int64_t*)taosHashGetKey(pIter, NULL);
691✔
4693
    stopped = atomic_load_8(&pStatus->stopped);
691✔
4694
    if (MST_IS_USER_STOPPED(stopped)) {
691!
4695
      mstsDebug("stream already stopped by user, deployTimes:%" PRId64, pStatus->deployTimes);
1!
4696
      (void)msmRemoveStreamFromMaps(pMnode, streamId);
1✔
4697
      continue;
1✔
4698
    }
4699

4700
    if (!sdbCheckExists(pMnode->pSdb, SDB_STREAM, pStatus->streamName)) {
690!
4701
      mstsDebug("stream already not exists, deployTimes:%" PRId64, pStatus->deployTimes);
×
4702
      (void)msmRemoveStreamFromMaps(pMnode, *(int64_t*)taosHashGetKey(pIter, NULL));
×
4703
      continue;
×
4704
    }
4705

4706
    if (MST_IS_ERROR_STOPPED(stopped)) {
690✔
4707
      if (mStreamMgmt.hCtx.currentTs < pStatus->fatalRetryTs) {
30✔
4708
        mstsDebug("stream already stopped by error %s, retried times:%" PRId64 ", next time not reached, currTs:%" PRId64 ", nextRetryTs:%" PRId64,
20✔
4709
            tstrerror(pStatus->fatalError), pStatus->fatalRetryTimes, mStreamMgmt.hCtx.currentTs, pStatus->fatalRetryTs);
4710
            
4711
        MND_STREAM_SET_LAST_TS(STM_EVENT_STM_TERR, mStreamMgmt.hCtx.currentTs);
20✔
4712
        continue;
20✔
4713
      }
4714

4715
      mstPostStreamAction(mStreamMgmt.actionQ, *(int64_t*)taosHashGetKey(pIter, NULL), pStatus->streamName, NULL, false, STREAM_ACT_DEPLOY);
10✔
4716
      continue;
10✔
4717
    }
4718

4719
    if (MST_IS_GRANT_STOPPED(stopped) && TSDB_CODE_SUCCESS == grantCheckExpire(TSDB_GRANT_STREAMS)) {
660!
4720
      mstPostStreamAction(mStreamMgmt.actionQ, *(int64_t*)taosHashGetKey(pIter, NULL), pStatus->streamName, NULL, false, STREAM_ACT_DEPLOY);
×
4721
      continue;
×
4722
    }
4723
  }
4724
}
120✔
4725

4726
void msmCheckStreamsStatus(SMnode *pMnode) {
828✔
4727
  SStmCheckStatusCtx ctx = {0};
828✔
4728

4729
  mstDebug("start to check streams status, currTs:%" PRId64, mStreamMgmt.hCtx.currentTs);
828✔
4730
  
4731
  if (MST_READY_FOR_SDB_LOOP()) {
828!
4732
    mstDebug("ready to check sdb loop, lastLoopSdbTs:%" PRId64, mStreamMgmt.lastTs[STM_EVENT_LOOP_SDB].ts);
170✔
4733
    sdbTraverse(pMnode->pSdb, SDB_STREAM, msmCheckLoopStreamSdb, &ctx, NULL, NULL);
170✔
4734
    MND_STREAM_SET_LAST_TS(STM_EVENT_LOOP_SDB, mStreamMgmt.hCtx.currentTs);
170✔
4735
  }
4736

4737
  if (MST_READY_FOR_MAP_LOOP()) {
828!
4738
    mstDebug("ready to check map loop, lastLoopMapTs:%" PRId64, mStreamMgmt.lastTs[STM_EVENT_LOOP_MAP].ts);
120✔
4739
    msmCheckLoopStreamMap(pMnode);
120✔
4740
    MND_STREAM_SET_LAST_TS(STM_EVENT_LOOP_MAP, mStreamMgmt.hCtx.currentTs);
120✔
4741
  }
4742
}
828✔
4743

4744
void msmCheckTaskListStatus(int64_t streamId, SStmTaskStatus** pList, int32_t taskNum) {
4,132✔
4745
  for (int32_t i = 0; i < taskNum; ++i) {
8,428✔
4746
    SStmTaskStatus* pTask = *(pList + i);
4,305✔
4747

4748
    if (atomic_load_8(&((SStmStatus*)pTask->pStream)->stopped)) {
4,305✔
4749
      continue;
4,288✔
4750
    }
4751
    
4752
    if (!MST_PASS_ISOLATION(pTask->lastUpTs, 1)) {
4,128✔
4753
      continue;
4,111✔
4754
    }
4755

4756
    int64_t noUpTs = mStreamMgmt.hCtx.currentTs - pTask->lastUpTs;
17✔
4757
    if (STREAM_RUNNER_TASK == pTask->type || STREAM_TRIGGER_TASK == pTask->type) {
17✔
4758
      mstsWarn("%s TASK:%" PRIx64 " status not updated for %" PRId64 "ms, will try to redeploy it", 
9!
4759
          gStreamTaskTypeStr[pTask->type], pTask->id.taskId, noUpTs);
4760
          
4761
      msmStopStreamByError(streamId, NULL, TSDB_CODE_MND_STREAM_TASK_LOST, mStreamMgmt.hCtx.currentTs);
9✔
4762
      break;
9✔
4763
    }
4764

4765
    mstsInfo("%s TASK:%" PRIx64 " status not updated for %" PRId64 "ms, will try to redeploy it", 
8!
4766
        gStreamTaskTypeStr[pTask->type], pTask->id.taskId, noUpTs);
4767

4768
    int64_t newSid = atomic_add_fetch_64(&pTask->id.seriousId, 1);
8✔
4769
    mstsDebug("task %" PRIx64 " SID updated to %" PRIx64, pTask->id.taskId, newSid);
8!
4770

4771
    SStmTaskAction task = {0};
8✔
4772
    task.streamId = streamId;
8✔
4773
    task.id = pTask->id;
8✔
4774
    task.flag = pTask->flags;
8✔
4775
    task.type = pTask->type;
8✔
4776
    
4777
    mstPostTaskAction(mStreamMgmt.actionQ, &task, STREAM_ACT_DEPLOY);
8✔
4778
  }
4779
}
4,132✔
4780

4781
void msmCheckVgroupStreamStatus(SHashObj* pStreams) {
364✔
4782
  void* pIter = NULL;
364✔
4783
  SStmVgStreamStatus* pVg = NULL;
364✔
4784
  int64_t streamId = 0;
364✔
4785
  
4786
  while (true) {
1,223✔
4787
    pIter = taosHashIterate(pStreams, pIter);
1,587✔
4788
    if (NULL == pIter) {
1,587✔
4789
      break;
364✔
4790
    }
4791

4792
    streamId = *(int64_t*)taosHashGetKey(pIter, NULL);
1,223✔
4793
    pVg = (SStmVgStreamStatus*)pIter;
1,223✔
4794

4795
    int32_t taskNum = taosArrayGetSize(pVg->trigReaders);
1,223✔
4796
    if (taskNum > 0) {
1,223✔
4797
      msmCheckTaskListStatus(streamId, taosArrayGet(pVg->trigReaders, 0), taskNum);
866✔
4798
    }
4799

4800
    taskNum = taosArrayGetSize(pVg->calcReaders);
1,223✔
4801
    if (taskNum > 0) {
1,223✔
4802
      msmCheckTaskListStatus(streamId, taosArrayGet(pVg->calcReaders, 0), taskNum);
562✔
4803
    }
4804
  }
4805
}
364✔
4806

4807
void msmHandleVgroupLost(SMnode *pMnode, int32_t vgId, SStmVgroupStatus* pVg) {
×
4808
  int64_t streamId = 0;
×
4809
  void* pIter = NULL;
×
4810
  SStmVgStreamStatus* pStream = NULL;
×
4811

4812
  if (!MST_PASS_ISOLATION(pVg->lastUpTs, 5)) {
×
4813
    mstDebug("vgroup %d lost and still in watch time, lastUpTs:%" PRId64 ", streamNum:%d", vgId, pVg->lastUpTs, (int32_t)taosHashGetSize(pVg->streamTasks));
×
4814
    return;
×
4815
  }
4816

4817
  
4818
  while (true) {
4819
    pIter = taosHashIterate(pVg->streamTasks, pIter);
×
4820
    if (NULL == pIter) {
×
4821
      break;
×
4822
    }
4823

4824
    streamId = *(int64_t*)taosHashGetKey(pIter, NULL);
×
4825
    
4826
    msmStopStreamByError(streamId, NULL, TSDB_CODE_MND_STREAM_VGROUP_LOST, mStreamMgmt.hCtx.currentTs);
×
4827
  }
4828

4829
  taosHashClear(pVg->streamTasks);
×
4830
}
4831

4832

4833
void msmCheckVgroupStatus(SMnode *pMnode) {
828✔
4834
  void* pIter = NULL;
828✔
4835
  int32_t code = 0;
828✔
4836
  
4837
  while (true) {
3,415✔
4838
    pIter = taosHashIterate(mStreamMgmt.vgroupMap, pIter);
4,243✔
4839
    if (NULL == pIter) {
4,243✔
4840
      break;
828✔
4841
    }
4842

4843
    int32_t vgId = *(int32_t*)taosHashGetKey(pIter, NULL);
3,415✔
4844
    if ((vgId % MND_STREAM_ISOLATION_PERIOD_NUM) != mStreamMgmt.hCtx.slotIdx) {
3,415✔
4845
      continue;
3,051✔
4846
    }
4847
    
4848
    SStmVgroupStatus* pVg = (SStmVgroupStatus*)pIter;
370✔
4849

4850
    if (MST_PASS_ISOLATION(pVg->lastUpTs, 1)) {
370✔
4851
      SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
6✔
4852
      if (NULL == pVgroup) {
6!
4853
        mstDebug("vgroup %d no longer exits, will remove all %d tasks in it", vgId, (int32_t)taosHashGetSize(pVg->streamTasks));
6✔
4854
        code = taosHashRemove(mStreamMgmt.vgroupMap, &vgId, sizeof(vgId));
6✔
4855
        if (code) {
6!
4856
          mstWarn("remove vgroup %d from vgroupMap failed since %s", vgId, tstrerror(code));
×
4857
        }
4858
        continue;
6✔
4859
      }
4860
      mndReleaseVgroup(pMnode, pVgroup);
×
4861
      
4862
      mstWarn("vgroup %d lost, lastUpTs:%" PRId64 ", streamNum:%d", vgId, pVg->lastUpTs, (int32_t)taosHashGetSize(pVg->streamTasks));
×
4863
      
4864
      msmHandleVgroupLost(pMnode, vgId, pVg);
×
4865
      continue;
×
4866
    }
4867

4868
    mstDebug("vgroup %d online, try to check tasks status, currTs:%" PRId64 ", lastUpTs:%" PRId64, vgId, mStreamMgmt.hCtx.currentTs, pVg->lastUpTs);
364✔
4869

4870
    msmCheckVgroupStreamStatus(pVg->streamTasks);
364✔
4871
  }
4872
}
828✔
4873

4874
void msmHandleRunnerRedeploy(int64_t streamId, SStmSnodeStreamStatus* pStream, int32_t* deployNum, int32_t* deployId) {
2✔
4875
  *deployNum = 0;
2✔
4876
  
4877
  for (int32_t i = 0; i < MND_STREAM_RUNNER_DEPLOY_NUM; ++i) {
8✔
4878
    if (pStream->runners[i]) {
6✔
4879
      int32_t taskNum = taosArrayGetSize(pStream->runners[i]);
2✔
4880
      for (int32_t t = 0; t < taskNum; ++t) {
4✔
4881
        SStmTaskStatus* pTask = taosArrayGetP(pStream->runners[i], t);
2✔
4882
        int8_t stopped = atomic_load_8(&((SStmStatus*)pTask->pStream)->stopped);
2✔
4883
        if (stopped) {
2!
4884
          mstsDebug("stream already stopped %d, ignore it", stopped);
×
4885
          *deployNum = 0;
×
4886
          return;
×
4887
        }
4888

4889
        int64_t newSid = atomic_add_fetch_64(&pTask->id.seriousId, 1);
2✔
4890
        mstsDebug("task %" PRIx64 " SID updated to %" PRIx64, pTask->id.taskId, newSid);
2!
4891
      }
4892
      
4893
      deployId[*deployNum] = i;
2✔
4894
      (*deployNum)++;
2✔
4895
    }
4896
  }
4897
}
4898

4899
void msmHandleSnodeLost(SMnode *pMnode, SStmSnodeStatus* pSnode) {
5✔
4900
  pSnode->runnerThreadNum = -1;
5✔
4901

4902
  (void)msmSTAddSnodesToMap(pMnode);
5✔
4903

4904
  int64_t streamId = 0;
5✔
4905
  void* pIter = NULL;
5✔
4906
  SStmSnodeStreamStatus* pStream = NULL;
5✔
4907
  int32_t deployNum = 0;
5✔
4908
  SStmTaskAction task = {0};
5✔
4909
  
4910
  while (true) {
4911
    pIter = taosHashIterate(pSnode->streamTasks, pIter);
8✔
4912
    if (NULL == pIter) {
8✔
4913
      break;
5✔
4914
    }
4915

4916
    streamId = *(int64_t*)taosHashGetKey(pIter, NULL);
3✔
4917
    
4918
    task.streamId = streamId;
3✔
4919
    
4920
    pStream = (SStmSnodeStreamStatus*)pIter;
3✔
4921
    if (pStream->trigger) {
3✔
4922
      int8_t stopped = atomic_load_8(&((SStmStatus*)pStream->trigger->pStream)->stopped);
1✔
4923
      if (stopped) {
1!
4924
        mstsDebug("stream already stopped %d, ignore it", stopped);
×
4925
        continue;
×
4926
      }
4927

4928
      mstsInfo("snode lost with trigger task %" PRIx64 ", will try to restart current stream", pStream->trigger->id.taskId);
1!
4929
      
4930
      msmStopStreamByError(streamId, NULL, TSDB_CODE_MND_STREAM_SNODE_LOST, mStreamMgmt.hCtx.currentTs);
1✔
4931
    } else {
4932
      msmHandleRunnerRedeploy(streamId, pStream, &task.deployNum, task.deployId);
2✔
4933
      
4934
      if (task.deployNum > 0) {
2!
4935
        //task.triggerStatus = pStream->trigger;
4936
        task.multiRunner = true;
2✔
4937
        task.type = STREAM_RUNNER_TASK;
2✔
4938
        
4939
        mstPostTaskAction(mStreamMgmt.actionQ, &task, STREAM_ACT_DEPLOY);
2✔
4940
        
4941
        mstsInfo("runner tasks %d redeploys added to actionQ", task.deployNum);
2!
4942
      }
4943
    }
4944
  }
4945

4946
  taosHashClear(pSnode->streamTasks);
5✔
4947
}
5✔
4948

4949

4950
void msmCheckSnodeStreamStatus(SHashObj* pStreams) {
104✔
4951
  void* pIter = NULL;
104✔
4952
  SStmSnodeStreamStatus* pSnode = NULL;
104✔
4953
  int64_t streamId = 0;
104✔
4954
  
4955
  while (true) {
4956
    pIter = taosHashIterate(pStreams, pIter);
810✔
4957
    if (NULL == pIter) {
810✔
4958
      break;
104✔
4959
    }
4960

4961
    streamId = *(int64_t*)taosHashGetKey(pIter, NULL);
706✔
4962
    pSnode = (SStmSnodeStreamStatus*)pIter;
706✔
4963

4964
    if (NULL != pSnode->trigger) {
706✔
4965
      msmCheckTaskListStatus(streamId, &pSnode->trigger, 1);
678✔
4966
    }
4967

4968
    for (int32_t i = 0; i < MND_STREAM_RUNNER_DEPLOY_NUM; ++i) {
2,824✔
4969
      int32_t taskNum = taosArrayGetSize(pSnode->runners[i]);
2,118✔
4970
      if (taskNum > 0) {
2,118✔
4971
        msmCheckTaskListStatus(streamId, taosArrayGet(pSnode->runners[i], 0), taskNum);
2,026✔
4972
      }
4973
    }
4974
  }
4975
}
104✔
4976

4977

4978
void msmCheckSnodeStatus(SMnode *pMnode) {
828✔
4979
  void* pIter = NULL;
828✔
4980
  
4981
  while (true) {
1,011✔
4982
    pIter = taosHashIterate(mStreamMgmt.snodeMap, pIter);
1,839✔
4983
    if (NULL == pIter) {
1,839✔
4984
      break;
828✔
4985
    }
4986

4987
    int32_t snodeId = *(int32_t*)taosHashGetKey(pIter, NULL);
1,011✔
4988
    if ((snodeId % MND_STREAM_ISOLATION_PERIOD_NUM) != mStreamMgmt.hCtx.slotIdx) {
1,011✔
4989
      continue;
888✔
4990
    }
4991

4992
    mstDebug("start to check snode %d status, currTs:%" PRId64, snodeId, mStreamMgmt.hCtx.currentTs);
123✔
4993
    
4994
    SStmSnodeStatus* pSnode = (SStmSnodeStatus*)pIter;
123✔
4995
    if (NULL == pSnode->streamTasks) {
123✔
4996
      mstDebug("ignore snode %d health check since empty tasks", snodeId);
15!
4997
      continue;
15✔
4998
    }
4999
    
5000
    if (MST_PASS_ISOLATION(pSnode->lastUpTs, 1)) {
108✔
5001
      mstInfo("snode %d lost, lastUpTs:%" PRId64 ", runnerThreadNum:%d, streamNum:%d", 
4!
5002
          snodeId, pSnode->lastUpTs, pSnode->runnerThreadNum, (int32_t)taosHashGetSize(pSnode->streamTasks));
5003
      
5004
      msmHandleSnodeLost(pMnode, pSnode);
4✔
5005
      continue;
4✔
5006
    }
5007
    
5008
    mstDebug("snode %d online, try to check tasks status, currTs:%" PRId64 ", lastUpTs:%" PRId64, snodeId, mStreamMgmt.hCtx.currentTs, pSnode->lastUpTs);
104✔
5009

5010
    msmCheckSnodeStreamStatus(pSnode->streamTasks);
104✔
5011
  }
5012
}
828✔
5013

5014

5015
void msmCheckTasksStatus(SMnode *pMnode) {
828✔
5016
  mstDebug("start to check tasks status, currTs:%" PRId64, mStreamMgmt.hCtx.currentTs);
828✔
5017

5018
  msmCheckVgroupStatus(pMnode);
828✔
5019
  msmCheckSnodeStatus(pMnode);
828✔
5020
}
828✔
5021

5022
void msmCheckSnodesState(SMnode *pMnode) {
828✔
5023
  if (!MST_READY_FOR_SNODE_LOOP()) {
828!
5024
    return;
723✔
5025
  }
5026

5027
  mstDebug("ready to check snode loop, lastTs:%" PRId64, mStreamMgmt.lastTs[STM_EVENT_LOOP_SNODE].ts);
105✔
5028

5029
  void* pIter = NULL;
105✔
5030
  int32_t snodeId = 0;
105✔
5031
  while (true) {
114✔
5032
    pIter = taosHashIterate(mStreamMgmt.snodeMap, pIter);
219✔
5033
    if (NULL == pIter) {
219✔
5034
      break;
105✔
5035
    }
5036

5037
    snodeId = *(int32_t*)taosHashGetKey(pIter, NULL);
114✔
5038
    if (sdbCheckExists(pMnode->pSdb, SDB_SNODE, &snodeId)) {
114✔
5039
      continue;
113✔
5040
    }
5041

5042
    SStmSnodeStatus* pSnode = (SStmSnodeStatus*)pIter;
1✔
5043
    if (NULL == pSnode->streamTasks) {
1!
5044
      mstDebug("snode %d already cleanup, try to rm it", snodeId);
×
5045
      TAOS_UNUSED(taosHashRemove(mStreamMgmt.snodeMap, &snodeId, sizeof(snodeId)));
×
5046
      continue;
×
5047
    }
5048
    
5049
    mstWarn("snode %d lost while streams remain, will redeploy all and rm it, lastUpTs:%" PRId64 ", runnerThreadNum:%d, streamNum:%d", 
1!
5050
        snodeId, pSnode->lastUpTs, pSnode->runnerThreadNum, (int32_t)taosHashGetSize(pSnode->streamTasks));
5051
    
5052
    msmHandleSnodeLost(pMnode, pSnode);
1✔
5053
  }
5054

5055
  MND_STREAM_SET_LAST_TS(STM_EVENT_LOOP_MAP, mStreamMgmt.hCtx.currentTs);
105✔
5056
}
5057

5058
bool msmCheckNeedHealthCheck(SMnode *pMnode) {
13,681✔
5059
  int8_t active = atomic_load_8(&mStreamMgmt.active), state = atomic_load_8(&mStreamMgmt.state);
13,681✔
5060
  if (0 == active || MND_STM_STATE_NORMAL != state) {
13,681!
5061
    mstTrace("ignore health check since active:%d state:%d", active, state);
×
5062
    return false;
×
5063
  }
5064

5065
  if (sdbGetSize(pMnode->pSdb, SDB_STREAM) <= 0) {
13,681✔
5066
    mstTrace("ignore health check since no stream now");
12,025✔
5067
    return false;
12,025✔
5068
  }
5069

5070
  return true;
1,656✔
5071
}
5072

5073
void msmHealthCheck(SMnode *pMnode) {
12,853✔
5074
  if (!msmCheckNeedHealthCheck(pMnode)) {
12,853✔
5075
    return;
12,025✔
5076
  }
5077

5078
  mstDebug("start wait health check, currentTs:%" PRId64,  taosGetTimestampMs());
1,521✔
5079
  
5080
  (void)mstWaitLock(&mStreamMgmt.runtimeLock, false);
828✔
5081
  if (!msmCheckNeedHealthCheck(pMnode)) {
828!
5082
    taosWUnLockLatch(&mStreamMgmt.runtimeLock);
×
5083
    return;
×
5084
  }
5085
  
5086
  mStreamMgmt.hCtx.slotIdx = (mStreamMgmt.hCtx.slotIdx + 1) % MND_STREAM_ISOLATION_PERIOD_NUM;
828✔
5087
  mStreamMgmt.hCtx.currentTs = taosGetTimestampMs();
828✔
5088

5089
  mstDebug("start health check, soltIdx:%d, checkStartTs:%" PRId64, mStreamMgmt.hCtx.slotIdx, mStreamMgmt.hCtx.currentTs);
828✔
5090
  
5091
  msmCheckStreamsStatus(pMnode);
828✔
5092
  msmCheckTasksStatus(pMnode);
828✔
5093
  msmCheckSnodesState(pMnode);
828✔
5094

5095
  taosWUnLockLatch(&mStreamMgmt.runtimeLock);
828✔
5096

5097
  mstDebug("end health check, soltIdx:%d, checkStartTs:%" PRId64, mStreamMgmt.hCtx.slotIdx, mStreamMgmt.hCtx.currentTs);
828✔
5098
}
5099

5100
static bool msmUpdateProfileStreams(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
×
5101
  SStreamObj *pStream = pObj;
×
5102
  if (atomic_load_8(&pStream->userDropped) || atomic_load_8(&pStream->userStopped)) {
×
5103
    return true;
×
5104
  }
5105
  
5106
  pStream->updateTime = *(int64_t*)p1;
×
5107
  
5108
  (*(int32_t*)p2)++;
×
5109
  
5110
  return true;
×
5111
}
5112

5113
int32_t msmGetTriggerTaskAddr(SMnode *pMnode, int64_t streamId, SStreamTaskAddr* pAddr) {
×
5114
  int32_t code = 0;
×
5115
  int8_t  stopped = 0;
×
5116
  
5117
  (void)mstWaitLock(&mStreamMgmt.runtimeLock, true);
×
5118
  
5119
  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
×
5120
  if (NULL == pStatus) {
×
5121
    mstsError("stream not exists in streamMap, streamRemains:%d", taosHashGetSize(mStreamMgmt.streamMap));
×
5122
    code = TSDB_CODE_MND_STREAM_NOT_RUNNING;
×
5123
    goto _exit;
×
5124
  }
5125

5126
  stopped = atomic_load_8(&pStatus->stopped);
×
5127
  if (stopped) {
×
5128
    mstsError("stream already stopped, stopped:%d", stopped);
×
5129
    code = TSDB_CODE_MND_STREAM_NOT_RUNNING;
×
5130
    goto _exit;
×
5131
  }
5132

5133
  if (pStatus->triggerTask && STREAM_STATUS_RUNNING == pStatus->triggerTask->status) {
×
5134
    pAddr->taskId = pStatus->triggerTask->id.taskId;
×
5135
    pAddr->nodeId = pStatus->triggerTask->id.nodeId;
×
5136
    pAddr->epset = mndGetDnodeEpsetById(pMnode, pAddr->nodeId);
×
5137
    mstsDebug("stream trigger task %" PRIx64 " got with nodeId %d", pAddr->taskId, pAddr->nodeId);
×
5138
    goto _exit;
×
5139
  }
5140

5141
  mstsError("trigger task %p not running, status:%s", pStatus->triggerTask, pStatus->triggerTask ? gStreamStatusStr[pStatus->triggerTask->status] : "unknown");
×
5142
  code = TSDB_CODE_MND_STREAM_NOT_RUNNING;
×
5143

5144
_exit:
×
5145
  
5146
  taosRUnLockLatch(&mStreamMgmt.runtimeLock);
×
5147

5148
  return code;
×
5149
}
5150

5151
int32_t msmInitRuntimeInfo(SMnode *pMnode) {
1,118✔
5152
  int32_t code = TSDB_CODE_SUCCESS;
1,118✔
5153
  int32_t lino = 0;
1,118✔
5154
  int32_t vnodeNum = sdbGetSize(pMnode->pSdb, SDB_VGROUP);
1,118✔
5155
  int32_t snodeNum = sdbGetSize(pMnode->pSdb, SDB_SNODE);
1,118✔
5156
  int32_t dnodeNum = sdbGetSize(pMnode->pSdb, SDB_DNODE);
1,118✔
5157

5158
  MND_STREAM_SET_LAST_TS(STM_EVENT_ACTIVE_BEGIN, taosGetTimestampMs());
2,210✔
5159

5160
  mStreamMgmt.stat.activeTimes++;
1,118✔
5161
  mStreamMgmt.threadNum = tsNumOfMnodeStreamMgmtThreads;
1,118✔
5162
  mStreamMgmt.tCtx = taosMemoryCalloc(mStreamMgmt.threadNum, sizeof(SStmThreadCtx));
1,118!
5163
  if (NULL == mStreamMgmt.tCtx) {
1,118!
5164
    code = terrno;
×
5165
    mstError("failed to initialize the stream runtime tCtx, threadNum:%d, error:%s", mStreamMgmt.threadNum, tstrerror(code));
×
5166
    goto _exit;
×
5167
  }
5168

5169
  mStreamMgmt.actionQ = taosMemoryCalloc(1, sizeof(SStmActionQ));
1,118!
5170
  if (mStreamMgmt.actionQ == NULL) {
1,118!
5171
    code = terrno;
×
5172
    mError("failed to initialize the stream runtime actionQ, error:%s", tstrerror(code));
×
5173
    goto _exit;
×
5174
  }
5175
  
5176
  mStreamMgmt.actionQ->head = taosMemoryCalloc(1, sizeof(SStmQNode));
1,118!
5177
  TSDB_CHECK_NULL(mStreamMgmt.actionQ->head, code, lino, _exit, terrno);
1,118!
5178
  
5179
  mStreamMgmt.actionQ->tail = mStreamMgmt.actionQ->head;
1,118✔
5180
  
5181
  for (int32_t i = 0; i < mStreamMgmt.threadNum; ++i) {
6,635✔
5182
    SStmThreadCtx* pCtx = mStreamMgmt.tCtx + i;
5,517✔
5183

5184
    for (int32_t m = 0; m < STREAM_MAX_GROUP_NUM; ++m) {
33,102✔
5185
      pCtx->deployStm[m] = taosHashInit(snodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
27,585✔
5186
      if (pCtx->deployStm[m] == NULL) {
27,585!
5187
        code = terrno;
×
5188
        mError("failed to initialize the stream runtime deployStm[%d][%d], error:%s", i, m, tstrerror(code));
×
5189
        goto _exit;
×
5190
      }
5191
      taosHashSetFreeFp(pCtx->deployStm[m], tDeepFreeSStmStreamDeploy);
27,585✔
5192
      
5193
      pCtx->actionStm[m] = taosHashInit(snodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
27,585✔
5194
      if (pCtx->actionStm[m] == NULL) {
27,585!
5195
        code = terrno;
×
5196
        mError("failed to initialize the stream runtime actionStm[%d][%d], error:%s", i, m, tstrerror(code));
×
5197
        goto _exit;
×
5198
      }
5199
      taosHashSetFreeFp(pCtx->actionStm[m], mstDestroySStmAction);
27,585✔
5200
    }
5201
  }
5202
  
5203
  mStreamMgmt.streamMap = taosHashInit(MND_STREAM_DEFAULT_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
1,118✔
5204
  if (mStreamMgmt.streamMap == NULL) {
1,118!
5205
    code = terrno;
×
5206
    mError("failed to initialize the stream runtime streamMap, error:%s", tstrerror(code));
×
5207
    goto _exit;
×
5208
  }
5209
  taosHashSetFreeFp(mStreamMgmt.streamMap, mstDestroySStmStatus);
1,118✔
5210
  
5211
  mStreamMgmt.taskMap = taosHashInit(MND_STREAM_DEFAULT_TASK_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
1,118✔
5212
  if (mStreamMgmt.taskMap == NULL) {
1,118!
5213
    code = terrno;
×
5214
    mError("failed to initialize the stream runtime taskMap, error:%s", tstrerror(code));
×
5215
    goto _exit;
×
5216
  }
5217
  
5218
  mStreamMgmt.vgroupMap = taosHashInit(vnodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
1,118✔
5219
  if (mStreamMgmt.vgroupMap == NULL) {
1,118!
5220
    code = terrno;
×
5221
    mError("failed to initialize the stream runtime vgroupMap, error:%s", tstrerror(code));
×
5222
    goto _exit;
×
5223
  }
5224
  taosHashSetFreeFp(mStreamMgmt.vgroupMap, mstDestroySStmVgroupStatus);
1,118✔
5225

5226
  mStreamMgmt.snodeMap = taosHashInit(snodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
1,118✔
5227
  if (mStreamMgmt.snodeMap == NULL) {
1,118!
5228
    code = terrno;
×
5229
    mError("failed to initialize the stream runtime snodeMap, error:%s", tstrerror(code));
×
5230
    goto _exit;
×
5231
  }
5232
  taosHashSetFreeFp(mStreamMgmt.snodeMap, mstDestroySStmSnodeStatus);
1,118✔
5233
  
5234
  mStreamMgmt.dnodeMap = taosHashInit(dnodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
1,118✔
5235
  if (mStreamMgmt.dnodeMap == NULL) {
1,118!
5236
    code = terrno;
×
5237
    mError("failed to initialize the stream runtime dnodeMap, error:%s", tstrerror(code));
×
5238
    goto _exit;
×
5239
  }
5240

5241
  mStreamMgmt.toDeployVgMap = taosHashInit(vnodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
1,118✔
5242
  if (mStreamMgmt.toDeployVgMap == NULL) {
1,118!
5243
    code = terrno;
×
5244
    mError("failed to initialize the stream runtime toDeployVgMap, error:%s", tstrerror(code));
×
5245
    goto _exit;
×
5246
  }
5247
  taosHashSetFreeFp(mStreamMgmt.toDeployVgMap, mstDestroySStmVgTasksToDeploy);
1,118✔
5248
  
5249
  mStreamMgmt.toDeploySnodeMap = taosHashInit(snodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
1,118✔
5250
  if (mStreamMgmt.toDeploySnodeMap == NULL) {
1,118!
5251
    code = terrno;
×
5252
    mError("failed to initialize the stream runtime toDeploySnodeMap, error:%s", tstrerror(code));
×
5253
    goto _exit;
×
5254
  }
5255
  taosHashSetFreeFp(mStreamMgmt.toDeploySnodeMap, mstDestroySStmSnodeTasksDeploy);
1,118✔
5256

5257
  mStreamMgmt.toUpdateScanMap = taosHashInit(snodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
1,118✔
5258
  if (mStreamMgmt.toUpdateScanMap == NULL) {
1,118!
5259
    code = terrno;
×
5260
    mError("failed to initialize the stream runtime toUpdateScanMap, error:%s", tstrerror(code));
×
5261
    goto _exit;
×
5262
  }
5263
  taosHashSetFreeFp(mStreamMgmt.toUpdateScanMap, mstDestroyScanAddrList);
1,118✔
5264

5265
  TAOS_CHECK_EXIT(msmSTAddSnodesToMap(pMnode));
1,118!
5266
  TAOS_CHECK_EXIT(msmSTAddDnodesToMap(pMnode));
1,118!
5267

5268
  mStreamMgmt.lastTaskId = 1;
1,118✔
5269

5270
  int32_t activeStreamNum = 0;
1,118✔
5271
  sdbTraverse(pMnode->pSdb, SDB_STREAM, msmUpdateProfileStreams, &MND_STREAM_GET_LAST_TS(STM_EVENT_ACTIVE_BEGIN), &activeStreamNum, NULL);
1,118✔
5272

5273
  if (activeStreamNum > 0) {
1,118!
5274
    msmSetInitRuntimeState(MND_STM_STATE_WATCH);
×
5275
  } else {
5276
    msmSetInitRuntimeState(MND_STM_STATE_NORMAL);
1,118✔
5277
  }
5278

5279
_exit:
1,118✔
5280

5281
  if (code) {
1,118!
5282
    msmDestroyRuntimeInfo(pMnode);
×
5283
    mstError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
×
5284
  } else {
5285
    mstInfo("mnode stream runtime init done");
1,118!
5286
  }
5287

5288
  return code;
1,118✔
5289
}
5290

5291

5292

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc