• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4754

25 Sep 2025 05:58AM UTC coverage: 57.946% (-1.0%) from 58.977%
#4754

push

travis-ci

web-flow
enh: taos command line support '-uroot' on windows (#33055)

133189 of 293169 branches covered (45.43%)

Branch coverage included in aggregate %.

201677 of 284720 relevant lines covered (70.83%)

5398749.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.49
/source/dnode/mnode/impl/src/mndStreamMgmt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndStream.h"
17
#include "mndDb.h"
18
#include "mndPrivilege.h"
19
#include "mndShow.h"
20
#include "mndStb.h"
21
#include "mndTrans.h"
22
#include "osMemory.h"
23
#include "parser.h"
24
#include "taoserror.h"
25
#include "tmisce.h"
26
#include "tname.h"
27
#include "mndDnode.h"
28
#include "mndVgroup.h"
29
#include "mndSnode.h"
30
#include "mndMnode.h"
31

32
void msmDestroyActionQ() {
3,400✔
33
  SStmQNode* pQNode = NULL;
3,400✔
34

35
  if (NULL == mStreamMgmt.actionQ) {
3,400✔
36
    return;
1,700✔
37
  }
38

39
  while (mndStreamActionDequeue(mStreamMgmt.actionQ, &pQNode)) {
1,704✔
40
  }
41

42
  taosMemoryFreeClear(mStreamMgmt.actionQ->head);
1,700!
43
  taosMemoryFreeClear(mStreamMgmt.actionQ);
1,700!
44
}
45

46
/*
 * Clean up the per-group deploy and action hash tables held by one
 * thread context. The context structure itself is not freed here.
 */
void msmDestroySStmThreadCtx(SStmThreadCtx* pCtx) {
  int32_t grp = 0;

  while (grp < STREAM_MAX_GROUP_NUM) {
    taosHashCleanup(pCtx->deployStm[grp]);
    taosHashCleanup(pCtx->actionStm[grp]);
    ++grp;
  }
}
8,427✔
52

53
void msmDestroyThreadCtxs() {
3,400✔
54
  if (NULL == mStreamMgmt.tCtx) {
3,400✔
55
    return;
1,700✔
56
  }
57
  
58
  for (int32_t i = 0; i < mStreamMgmt.threadNum; ++i) {
10,127✔
59
    msmDestroySStmThreadCtx(mStreamMgmt.tCtx + i);
8,427✔
60
  }
61
  taosMemoryFreeClear(mStreamMgmt.tCtx);
1,700!
62
}
63

64

65
/* Run taosHashCleanup() on a runtime map and reset its handle to NULL. */
#define MSM_CLEANUP_HASH(hashField) \
  do {                              \
    taosHashCleanup(hashField);     \
    (hashField) = NULL;             \
  } while (0)

/*
 * Tear down all runtime state of the mnode stream manager: the action
 * queue, the thread contexts, every bookkeeping hash map together with
 * its pending counter, and the lastTs table.
 *
 * pMnode is not used by this function.
 */
void msmDestroyRuntimeInfo(SMnode *pMnode) {
  msmDestroyActionQ();
  msmDestroyThreadCtxs();

  // Pending-work maps; each counter is cleared right after its map.
  MSM_CLEANUP_HASH(mStreamMgmt.toUpdateScanMap);
  mStreamMgmt.toUpdateScanNum = 0;
  MSM_CLEANUP_HASH(mStreamMgmt.toDeployVgMap);
  mStreamMgmt.toDeployVgTaskNum = 0;
  MSM_CLEANUP_HASH(mStreamMgmt.toDeploySnodeMap);
  mStreamMgmt.toDeploySnodeTaskNum = 0;

  // Status maps.
  MSM_CLEANUP_HASH(mStreamMgmt.dnodeMap);
  MSM_CLEANUP_HASH(mStreamMgmt.snodeMap);
  MSM_CLEANUP_HASH(mStreamMgmt.vgroupMap);
  MSM_CLEANUP_HASH(mStreamMgmt.taskMap);
  MSM_CLEANUP_HASH(mStreamMgmt.streamMap);

  memset(mStreamMgmt.lastTs, 0, sizeof(mStreamMgmt.lastTs));

  mstInfo("mnode stream mgmt destroyed");  
}

#undef MSM_CLEANUP_HASH
3,400✔
94

95

96
/*
 * Mark a stream as stopped because of a fatal error and schedule its retry.
 *
 * streamId  id of the stream (also used by the msts* log macros)
 * pStatus   stream status; when NULL it is looked up in streamMap
 * errCode   error that caused the stop
 * currTs    current timestamp used for the retry bookkeeping
 *
 * Returns without changes when the stream is no longer in streamMap or is
 * already flagged as stopped.
 */
void msmStopStreamByError(int64_t streamId, SStmStatus* pStatus, int32_t errCode, int64_t currTs) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  SStmStatus* pStream = NULL;  // non-NULL only when acquired from streamMap here
  int8_t      stopped = 0;

  mstsInfo("try to stop stream for error: %s", tstrerror(errCode));

  if (NULL == pStatus) {
    pStream = (SStmStatus*)taosHashAcquire(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
    if (NULL == pStream) {
      mstsInfo("stream already not in streamMap, error:%s", tstrerror(terrno));
      goto _exit;
    }

    pStatus = pStream;
  }

  stopped = atomic_load_8(&pStatus->stopped);
  if (stopped) {
    mstsDebug("stream already stopped %d, ignore stop", stopped);
    goto _exit;
  }

  // A trigger task that ran longer than twice the isolation duration resets
  // the retry counter before this failure is counted.
  if (pStatus->triggerTask && pStatus->triggerTask->runningStartTs) {
    if ((currTs - pStatus->triggerTask->runningStartTs) > 2 * MST_ISOLATION_DURATION) {
      pStatus->fatalRetryTimes = 0;
      mstsDebug("reset stream retryTimes, running duation:%" PRId64 "ms", currTs - pStatus->triggerTask->runningStartTs);
    }
  }

  pStatus->fatalRetryTimes++;
  pStatus->fatalError = errCode;
  if (pStatus->fatalRetryTimes > 10) {
    pStatus->fatalRetryDuration = MST_MAX_RETRY_DURATION;
  } else {
    pStatus->fatalRetryDuration = MST_ISOLATION_DURATION;
  }
  pStatus->fatalRetryTs = currTs + pStatus->fatalRetryDuration;

  pStatus->stat.lastError = errCode;

  // Only the caller that flips stopped from 0 to 1 records the event time.
  if (0 == atomic_val_compare_exchange_8(&pStatus->stopped, 0, 1)) {
    MND_STREAM_SET_LAST_TS(STM_EVENT_STM_TERR, currTs);
  }

_exit:

  taosHashRelease(mStreamMgmt.streamMap, pStream);

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }
}
16✔
143

144

145
static void msmSetInitRuntimeState(int8_t state) {
1,700✔
146
  switch (state) {
1,700!
147
    case MND_STM_STATE_WATCH:
×
148
      mStreamMgmt.watch.ending = 0;
×
149
      mStreamMgmt.watch.taskRemains = 0;
×
150
      mStreamMgmt.watch.processing = 0;
×
151
      mstInfo("switch to WATCH state");
×
152
      break;
×
153
    case MND_STM_STATE_NORMAL:
1,700✔
154
      MND_STREAM_SET_LAST_TS(STM_EVENT_NORMAL_BEGIN, taosGetTimestampMs());
3,374✔
155
      mstInfo("switch to NORMAL state");
1,700!
156
      break;
1,700✔
157
    default:
×
158
      return;
×
159
  }
160
  
161
  atomic_store_8(&mStreamMgmt.state, state);
1,700✔
162
}
163

164
void msmSTDeleteSnodeFromMap(int32_t snodeId) {
×
165
  int32_t code = taosHashRemove(mStreamMgmt.snodeMap, &snodeId, sizeof(snodeId));
×
166
  if (code) {
×
167
    mstWarn("remove snode %d from snodeMap failed, error:%s", snodeId, tstrerror(code));
×
168
  } else {
169
    mstInfo("snode %d removed from snodeMap", snodeId);
×
170
  }
171
}
×
172

173
/*
 * Walk all snode objects in the sdb and insert a fresh status entry for
 * each one into snodeMap. An entry that is already present (DUP_KEY) is
 * not treated as an error.
 *
 * Returns TSDB_CODE_SUCCESS, or the hash insertion error.
 */
static int32_t msmSTAddSnodesToMap(SMnode* pMnode) {
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         lino = 0;
  SStmSnodeStatus tasks = {0};
  SSnodeObj*      pSnode = NULL;
  void*           pIter = NULL;

  while ((pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pSnode)) != NULL) {
    tasks.lastUpTs = taosGetTimestampMs();
    code = taosHashPut(mStreamMgmt.snodeMap, &pSnode->id, sizeof(pSnode->id), &tasks, sizeof(tasks));

    bool realFailure = (0 != code) && (TSDB_CODE_DUP_KEY != code);
    if (realFailure) {
      // Give back the fetched object and the iterator before bailing out.
      sdbRelease(pMnode->pSdb, pSnode);
      sdbCancelFetch(pMnode->pSdb, pIter);
      pSnode = NULL;
      TAOS_CHECK_EXIT(code);
    }

    code = TSDB_CODE_SUCCESS;

    sdbRelease(pMnode->pSdb, pSnode);
  }

  pSnode = NULL;

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
209

210
/*
 * Walk all dnode objects in the sdb and insert each dnode id into
 * dnodeMap with an initial lastUpTs of INT64_MIN. An id that is already
 * present (DUP_KEY) is not treated as an error.
 *
 * Returns TSDB_CODE_SUCCESS, or the hash insertion error.
 */
static int32_t msmSTAddDnodesToMap(SMnode* pMnode) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  int64_t    lastUpTs = INT64_MIN;
  SDnodeObj* pDnode = NULL;
  void*      pIter = NULL;

  while ((pIter = sdbFetch(pMnode->pSdb, SDB_DNODE, pIter, (void **)&pDnode)) != NULL) {
    code = taosHashPut(mStreamMgmt.dnodeMap, &pDnode->id, sizeof(pDnode->id), &lastUpTs, sizeof(lastUpTs));

    bool realFailure = (0 != code) && (TSDB_CODE_DUP_KEY != code);
    if (realFailure) {
      // Give back the fetched object and the iterator before bailing out.
      sdbRelease(pMnode->pSdb, pDnode);
      sdbCancelFetch(pMnode->pSdb, pIter);
      pDnode = NULL;
      TAOS_CHECK_EXIT(code);
    }

    code = TSDB_CODE_SUCCESS;
    sdbRelease(pMnode->pSdb, pDnode);
  }

  pDnode = NULL;

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
244

245

246

247
/*
 * Register task status pointers in taskMap under the key {streamId, taskId}.
 *
 * When pTask is non-NULL only that task is added; otherwise every element
 * of pTasks is added. pCtx is not used by this function.
 *
 * Returns TSDB_CODE_SUCCESS, or the first hash insertion error.
 */
static int32_t msmSTAddToTaskMap(SStmGrpCtx* pCtx, int64_t streamId, SArray* pTasks, SStmTaskStatus* pTask) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t key[2] = {streamId, 0};
  int32_t taskNum = 1;

  if (NULL == pTask) {
    taskNum = taosArrayGetSize(pTasks);
  }

  int32_t idx = 0;
  while (idx < taskNum) {
    SStmTaskStatus* pEntry = pTask;
    if (NULL == pEntry) {
      pEntry = taosArrayGet(pTasks, idx);
    }

    key[1] = pEntry->id.taskId;
    // The map stores the pointer value itself, not a copy of the status.
    TAOS_CHECK_EXIT(taosHashPut(mStreamMgmt.taskMap, key, sizeof(key), &pEntry, POINTER_BYTES));
    mstsDebug("task %" PRIx64" tidx %d added to taskMap", pEntry->id.taskId, pEntry->id.taskIdx);

    ++idx;
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
268

269
/*
 * Add a reader task status pointer to the per-vgroup stream hash.
 *
 * pHash       hash keyed by streamId holding SStmVgStreamStatus values
 * streamId    id of the stream the task belongs to
 * pStatus     task status whose pointer is appended to the reader list
 * trigReader  true  -> append to trigReaders
 *             false -> append to calcReaders
 *
 * When the stream has no entry yet, a new one is built and inserted. If
 * building or inserting that entry fails, the freshly allocated reader
 * array is destroyed here; previously it leaked because the hash never
 * took ownership of it.
 *
 * Returns TSDB_CODE_SUCCESS or the error reported through terrno /
 * taosHashPut.
 */
static int32_t msmSTAddToVgStreamHash(SHashObj* pHash, int64_t streamId, SStmTaskStatus* pStatus, bool trigReader) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  // Only populated on the "new entry" path; its arrays are owned locally
  // until taosHashPut succeeds.
  SStmVgStreamStatus  stream = {0};
  SStmVgStreamStatus* pStream = taosHashGet(pHash, &streamId, sizeof(streamId));
  if (NULL == pStream) {
    if (trigReader) {
      stream.trigReaders = taosArrayInit(1, POINTER_BYTES);
      TSDB_CHECK_NULL(stream.trigReaders, code, lino, _exit, terrno);
      TSDB_CHECK_NULL(taosArrayPush(stream.trigReaders, &pStatus), code, lino, _exit, terrno);
    } else {
      stream.calcReaders = taosArrayInit(2, POINTER_BYTES);
      TSDB_CHECK_NULL(stream.calcReaders, code, lino, _exit, terrno);
      TSDB_CHECK_NULL(taosArrayPush(stream.calcReaders, &pStatus), code, lino, _exit, terrno);
    }

    code = taosHashPut(pHash, &streamId, sizeof(streamId), &stream, sizeof(stream));
    if (TSDB_CODE_SUCCESS == code) {
      // The hash now holds the array pointers; drop the local references so
      // the cleanup below cannot free them.
      stream.trigReaders = NULL;
      stream.calcReaders = NULL;
    } else {
      lino = __LINE__;
    }
    goto _exit;
  }
  
  if (trigReader) {
    if (NULL == pStream->trigReaders) {
      pStream->trigReaders = taosArrayInit(1, POINTER_BYTES);
      TSDB_CHECK_NULL(pStream->trigReaders, code, lino, _exit, terrno);
    }
    
    TSDB_CHECK_NULL(taosArrayPush(pStream->trigReaders, &pStatus), code, lino, _exit, terrno);
    goto _exit;
  }
  
  if (NULL == pStream->calcReaders) {
    pStream->calcReaders = taosArrayInit(1, POINTER_BYTES);
    TSDB_CHECK_NULL(pStream->calcReaders, code, lino, _exit, terrno);
  }

  TSDB_CHECK_NULL(taosArrayPush(pStream->calcReaders, &pStatus), code, lino, _exit, terrno);

_exit:

  if (code) {
    // Release whatever the new-entry path allocated but could not hand over.
    // NOTE(review): relies on taosArrayDestroy accepting NULL - confirm.
    taosArrayDestroy(stream.trigReaders);
    taosArrayDestroy(stream.calcReaders);

    mstsError("%s task %" PRIx64 " SID:%" PRIx64 " failed to add to vgroup %d streamHash in %s at line %d, error:%s", 
        trigReader ? "trigReader" : "calcReader", pStatus->id.taskId, pStatus->id.seriousId, pStatus->id.nodeId, __FUNCTION__, lino, tstrerror(code));
  } else {
    mstsDebug("%s task %" PRIx64 " SID:%" PRIx64 " added to vgroup %d streamHash", 
        trigReader ? "trigReader" : "calcReader", pStatus->id.taskId, pStatus->id.seriousId, pStatus->id.nodeId);
  }

  return code;
}
317

318
/*
 * Register a reader task under its vgroup (pStatus->id.nodeId) in
 * vgroupMap, creating the vgroup entry and its stream hash on first use.
 *
 * trigReader selects the trigger-reader or calc-reader list.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered; on error the
 * locally built vgroup status is destroyed.
 */
static int32_t msmSTAddToVgroupMapImpl(int64_t streamId, SStmTaskStatus* pStatus, bool trigReader) {
  int32_t           code = TSDB_CODE_SUCCESS;
  int32_t           lino = 0;
  SStmVgroupStatus  vg = {0};
  SStmVgroupStatus* pVg = taosHashGet(mStreamMgmt.vgroupMap, &pStatus->id.nodeId, sizeof(pStatus->id.nodeId));

  if (pVg != NULL) {
    // Vgroup already tracked: only its stream hash needs updating.
    TAOS_CHECK_EXIT(msmSTAddToVgStreamHash(pVg->streamTasks, streamId, pStatus, trigReader));
    goto _exit;
  }

  // First task for this vgroup: build the entry locally, then publish it.
  vg.streamTasks = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  TSDB_CHECK_NULL(vg.streamTasks, code, lino, _exit, terrno);
  taosHashSetFreeFp(vg.streamTasks, mstDestroySStmVgStreamStatus);

  vg.lastUpTs = taosGetTimestampMs();
  TAOS_CHECK_EXIT(msmSTAddToVgStreamHash(vg.streamTasks, streamId, pStatus, trigReader));
  TAOS_CHECK_EXIT(taosHashPut(mStreamMgmt.vgroupMap, &pStatus->id.nodeId, sizeof(pStatus->id.nodeId), &vg, sizeof(vg)));

_exit:

  if (code) {
    mstDestroyVgroupStatus(&vg);
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  } else {
    mstsDebug("task %" PRIx64 " tidx %d added to vgroupMap %d", pStatus->id.taskId, pStatus->id.taskIdx, pStatus->id.nodeId);
  }

  return code;
}
347

348
/*
 * Queue one task deployment on the to-deploy list of its vgroup.
 *
 * pVgMap    hash keyed by vgroup id holding SStmVgTasksToDeploy values
 * pDeploy   deployment description; copied by value into the list
 * streamId  stream id (used by the logging macros)
 *
 * If the vgroup has no entry yet, a new list is created and inserted. When
 * the insert reports DUP_KEY the local list is dropped and the lookup is
 * retried against the entry that is now present.
 *
 * Fixes over the previous version:
 *  - the locally built task list is freed when the push or insert fails;
 *  - a DUP_KEY from the insert no longer survives as the return code after
 *    the retry succeeds;
 *  - the write latch is always released, including when the list
 *    allocation fails while the latch is held;
 *  - the acquired hash entry is released on error paths as well.
 *
 * On success the global toDeployVgTaskNum counter is incremented.
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t msmTDAddToVgroupMap(SHashObj* pVgMap, SStmTaskDeploy* pDeploy, int64_t streamId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SStmVgTasksToDeploy vg = {0};
  SStreamTask* pTask = &pDeploy->task;  // referenced by the mstt* log macros
  SStmTaskToDeployExt ext = {0};
  ext.deploy = *pDeploy;

  while (true) {
    SStmVgTasksToDeploy* pVg = taosHashAcquire(pVgMap, &pDeploy->task.nodeId, sizeof(pDeploy->task.nodeId));
    if (NULL == pVg) {
      vg.taskList = taosArrayInit(20, sizeof(SStmTaskToDeployExt));
      TSDB_CHECK_NULL(vg.taskList, code, lino, _return, terrno);

      if (NULL == taosArrayPush(vg.taskList, &ext)) {
        code = terrno;
        lino = __LINE__;
        taosArrayDestroy(vg.taskList);
        vg.taskList = NULL;
        goto _return;
      }

      code = taosHashPut(pVgMap, &pDeploy->task.nodeId, sizeof(pDeploy->task.nodeId), &vg, sizeof(SStmVgTasksToDeploy));
      if (TSDB_CODE_SUCCESS == code) {
        // The map now holds the list pointer.
        goto _return;
      }

      // The insert did not take the list, so it is still ours to free.
      taosArrayDestroy(vg.taskList);
      vg.taskList = NULL;

      if (TSDB_CODE_DUP_KEY != code) {
        lino = __LINE__;
        goto _return;
      }

      // Entry appeared in the meantime: clear the transient code and retry.
      code = TSDB_CODE_SUCCESS;
      continue;
    }

    taosWLockLatch(&pVg->lock);
    if (NULL == pVg->taskList) {
      pVg->taskList = taosArrayInit(20, sizeof(SStmTaskToDeployExt));
    }
    if (NULL == pVg->taskList || NULL == taosArrayPush(pVg->taskList, &ext)) {
      // Capture the error before unlocking; cleanup below is shared.
      code = terrno;
      lino = __LINE__;
    }
    taosWUnLockLatch(&pVg->lock);
    
    taosHashRelease(pVgMap, pVg);
    break;
  }
  
_return:

  if (code) {
    msttError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  } else {
    int32_t num = atomic_add_fetch_32(&mStreamMgmt.toDeployVgTaskNum, 1);
    msttDebug("task added to toDeployVgTaskNum, vgToDeployTaskNum:%d", num);
  }

  return code;
}
401

402

403
/*
 * Add a trigger or runner task status pointer to the per-snode stream hash.
 *
 * pHash     hash keyed by streamId holding SStmSnodeStreamStatus values
 * streamId  id of the stream the task belongs to
 * pStatus   task status to register
 * deployId  < 0  -> pStatus is the trigger task
 *           >= 0 -> pStatus is a runner; deployId indexes runners[]
 *                   (NOTE(review): deployId is not range-checked here -
 *                   callers must keep it within the runners[] bounds)
 *
 * When the stream has no entry yet, a new one is built and inserted. If
 * building or inserting that entry fails, the freshly allocated runner
 * array is destroyed here; previously it leaked because the hash never
 * took ownership of it.
 *
 * Returns TSDB_CODE_SUCCESS or the error reported through terrno /
 * taosHashPut.
 */
static int32_t msmSTAddToSnodeStreamHash(SHashObj* pHash, int64_t streamId, SStmTaskStatus* pStatus, int32_t deployId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SArray* pNewRunners = NULL;  // runner array owned locally until the hash takes it
  SStmSnodeStreamStatus* pStream = taosHashGet(pHash, &streamId, sizeof(streamId));
  if (NULL == pStream) {
    SStmSnodeStreamStatus stream = {0};
    if (deployId < 0) {
      stream.trigger = pStatus;
    } else {
      pNewRunners = taosArrayInit(2, POINTER_BYTES);
      stream.runners[deployId] = pNewRunners;
      TSDB_CHECK_NULL(pNewRunners, code, lino, _exit, terrno);
      TSDB_CHECK_NULL(taosArrayPush(pNewRunners, &pStatus), code, lino, _exit, terrno);
    }
    
    code = taosHashPut(pHash, &streamId, sizeof(streamId), &stream, sizeof(stream));
    if (TSDB_CODE_SUCCESS == code) {
      // The stored copy now references the array; nothing left to free here.
      pNewRunners = NULL;
    } else {
      lino = __LINE__;
    }
    goto _exit;
  }
  
  if (deployId < 0) {
    if (NULL != pStream->trigger) {
      mstsWarn("stream already got trigger task %" PRIx64 " SID:%" PRIx64 " in snode %d, replace it with task %" PRIx64 " SID:%" PRIx64, 
          pStream->trigger->id.taskId, pStream->trigger->id.seriousId, pStatus->id.nodeId, pStatus->id.taskId, pStatus->id.seriousId);
    }
    
    pStream->trigger = pStatus;
    goto _exit;
  }
  
  if (NULL == pStream->runners[deployId]) {
    pStream->runners[deployId] = taosArrayInit(2, POINTER_BYTES);
    TSDB_CHECK_NULL(pStream->runners[deployId], code, lino, _exit, terrno);
  }

  TSDB_CHECK_NULL(taosArrayPush(pStream->runners[deployId], &pStatus), code, lino, _exit, terrno);

_exit:

  if (code) {
    // Free the array built for a new entry that never reached the hash.
    // NOTE(review): relies on taosArrayDestroy accepting NULL - confirm.
    taosArrayDestroy(pNewRunners);

    mstsError("%s task %" PRIx64 " SID:%" PRIx64 " failed to add to snode %d streamHash deployId:%d in %s at line %d, error:%s", 
        (deployId < 0) ? "trigger" : "runner", pStatus->id.taskId, pStatus->id.seriousId, pStatus->id.nodeId, deployId, __FUNCTION__, lino, tstrerror(code));
  } else {
    mstsDebug("%s task %" PRIx64 " SID:%" PRIx64 " added to snode %d streamHash deployId:%d", 
        (deployId < 0) ? "trigger" : "runner", pStatus->id.taskId, pStatus->id.seriousId, pStatus->id.nodeId, deployId);
  }

  return code;
}
450

451

452
/*
 * Register a trigger (deployId < 0) or runner task under its snode
 * (pStatus->id.nodeId) in snodeMap. The snode's stream hash is created on
 * first use.
 *
 * Returns TSDB_CODE_MND_STREAM_INTERNAL_ERROR when the snode is not in
 * snodeMap, otherwise TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t msmSTAddToSnodeMapImpl(int64_t streamId, SStmTaskStatus* pStatus, int32_t deployId) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SStmSnodeStatus* pSnode = taosHashGet(mStreamMgmt.snodeMap, &pStatus->id.nodeId, sizeof(pStatus->id.nodeId));

  if (NULL == pSnode) {
    mstsWarn("snode %d not exists in snodeMap anymore, may be dropped", pStatus->id.nodeId);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
    goto _exit;
  }

  if (NULL == pSnode->streamTasks) {
    // Create the per-snode stream hash the first time a task lands here.
    pSnode->streamTasks = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
    TSDB_CHECK_NULL(pSnode->streamTasks, code, lino, _exit, terrno);
    taosHashSetFreeFp(pSnode->streamTasks, mstDestroySStmSnodeStreamStatus);
  }

  TAOS_CHECK_EXIT(msmSTAddToSnodeStreamHash(pSnode->streamTasks, streamId, pStatus, deployId));

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  } else {
    mstsDebug("%s task %" PRIx64 " tidx %d added to snodeMap, snodeId:%d", (deployId < 0) ? "trigger" : "runner", 
        pStatus->id.taskId, pStatus->id.taskIdx, pStatus->id.nodeId);
  }

  return code;
}
481

482

483

484
/*
 * Queue a trigger task deployment on the trigger list of its snode in
 * toDeploySnodeMap.
 *
 * pDeploy  deployment description; copied by value into the list
 * pStream  stream object, used here only to obtain the stream id
 *
 * If the snode has no entry yet, a new trigger list is created and
 * inserted. When the insert reports DUP_KEY the local list is dropped and
 * the lookup is retried against the entry that is now present.
 *
 * Fixes over the previous version:
 *  - the locally built trigger list is freed when the push or insert fails;
 *  - a DUP_KEY from the insert no longer survives as the return code after
 *    the retry succeeds;
 *  - the acquired hash entry is released on error paths as well;
 *  - the list element is zero-initialised before being filled in.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t msmTDAddTriggerToSnodeMap(SStmTaskDeploy* pDeploy, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;  // referenced by the log macros
  SStmSnodeTasksDeploy snode = {0};
  SStmTaskToDeployExt ext = {0};
  SStreamTask* pTask = &pDeploy->task;            // referenced by the log macros

  ext.deploy = *pDeploy;
  ext.deployed = false;

  while (true) {
    SStmSnodeTasksDeploy* pSnode = taosHashAcquire(mStreamMgmt.toDeploySnodeMap, &pDeploy->task.nodeId, sizeof(pDeploy->task.nodeId));
    if (NULL == pSnode) {
      snode.triggerList = taosArrayInit(10, sizeof(SStmTaskToDeployExt));
      TSDB_CHECK_NULL(snode.triggerList, code, lino, _return, terrno);

      if (NULL == taosArrayPush(snode.triggerList, &ext)) {
        code = terrno;
        lino = __LINE__;
        taosArrayDestroy(snode.triggerList);
        snode.triggerList = NULL;
        goto _return;
      }

      code = taosHashPut(mStreamMgmt.toDeploySnodeMap, &pDeploy->task.nodeId, sizeof(pDeploy->task.nodeId), &snode, sizeof(snode));
      if (TSDB_CODE_SUCCESS == code) {
        // The map now holds the list pointer.
        goto _return;
      }

      // The insert did not take the list, so it is still ours to free.
      taosArrayDestroy(snode.triggerList);
      snode.triggerList = NULL;

      if (TSDB_CODE_DUP_KEY != code) {
        lino = __LINE__;
        goto _return;
      }

      // Entry appeared in the meantime: clear the transient code and retry.
      code = TSDB_CODE_SUCCESS;
      continue;
    }
    
    taosWLockLatch(&pSnode->lock);
    if (NULL == pSnode->triggerList) {
      pSnode->triggerList = taosArrayInit(10, sizeof(SStmTaskToDeployExt));
    }
    if (NULL == pSnode->triggerList || NULL == taosArrayPush(pSnode->triggerList, &ext)) {
      // Capture the error before unlocking; cleanup below is shared.
      code = terrno;
      lino = __LINE__;
    }
    taosWUnLockLatch(&pSnode->lock);
    
    taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);
    break;
  }
  
_return:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  } else {
    msttDebug("trigger task added to toDeploySnodeMap, tidx:%d", pTask->taskIdx);
  }

  return code;
}
547

548
/*
 * Queue a runner task deployment on the runner list of its snode in
 * toDeploySnodeMap.
 *
 * pDeploy  deployment description; copied by value into the list
 * pStream  stream object, used here only to obtain the stream id
 *
 * If the snode has no entry yet, a new runner list is created and
 * inserted. When the insert reports DUP_KEY the local list is dropped and
 * the lookup is retried against the entry that is now present.
 *
 * Fixes over the previous version:
 *  - the locally built runner list is freed when the push or insert fails;
 *  - a DUP_KEY from the insert no longer survives as the return code after
 *    the retry succeeds;
 *  - the acquired hash entry is released on error paths as well;
 *  - the list element is zero-initialised before being filled in.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t msmTDAddRunnerToSnodeMap(SStmTaskDeploy* pDeploy, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;  // referenced by the log macros
  SStmSnodeTasksDeploy snode = {0};
  SStmTaskToDeployExt ext = {0};
  SStreamTask* pTask = &pDeploy->task;            // referenced by the log macros

  ext.deploy = *pDeploy;
  ext.deployed = false;

  while (true) {
    SStmSnodeTasksDeploy* pSnode = taosHashAcquire(mStreamMgmt.toDeploySnodeMap, &pDeploy->task.nodeId, sizeof(pDeploy->task.nodeId));
    if (NULL == pSnode) {
      snode.runnerList = taosArrayInit(10, sizeof(SStmTaskToDeployExt));
      TSDB_CHECK_NULL(snode.runnerList, code, lino, _return, terrno);

      if (NULL == taosArrayPush(snode.runnerList, &ext)) {
        code = terrno;
        lino = __LINE__;
        taosArrayDestroy(snode.runnerList);
        snode.runnerList = NULL;
        goto _return;
      }

      code = taosHashPut(mStreamMgmt.toDeploySnodeMap, &pDeploy->task.nodeId, sizeof(pDeploy->task.nodeId), &snode, sizeof(snode));
      if (TSDB_CODE_SUCCESS == code) {
        // The map now holds the list pointer.
        goto _return;
      }

      // The insert did not take the list, so it is still ours to free.
      taosArrayDestroy(snode.runnerList);
      snode.runnerList = NULL;

      if (TSDB_CODE_DUP_KEY != code) {
        lino = __LINE__;
        goto _return;
      }

      // Entry appeared in the meantime: clear the transient code and retry.
      code = TSDB_CODE_SUCCESS;
      continue;
    }
    
    taosWLockLatch(&pSnode->lock);
    if (NULL == pSnode->runnerList) {
      pSnode->runnerList = taosArrayInit(10, sizeof(SStmTaskToDeployExt));
    }
    if (NULL == pSnode->runnerList || NULL == taosArrayPush(pSnode->runnerList, &ext)) {
      // Capture the error before unlocking; cleanup below is shared.
      code = terrno;
      lino = __LINE__;
    }
    taosWUnLockLatch(&pSnode->lock);
    
    taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);
    break;
  }
  
_return:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  } else {
    msttDebug("task added to toDeploySnodeMap, tidx:%d", pTask->taskIdx);
  }

  return code;
}
611

612
/*
 * Queue every runner deployment in runnerList on its snode and bump the
 * global toDeploySnodeTaskNum counter once per queued runner.
 *
 * Stops at the first failure and returns its error code.
 */
static int32_t msmTDAddRunnersToSnodeMap(SArray* runnerList, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;  // referenced by the log macros
  int32_t runnerNum = taosArrayGetSize(runnerList);
  int32_t idx = 0;

  while (idx < runnerNum) {
    SStmTaskDeploy* pDeploy = taosArrayGet(runnerList, idx);

    TAOS_CHECK_EXIT(msmTDAddRunnerToSnodeMap(pDeploy, pStream));

    (void)atomic_add_fetch_32(&mStreamMgmt.toDeploySnodeTaskNum, 1);    
    ++idx;
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
635

636

637
/*
 * Refresh the runner thread count and the last-up timestamp of the snode
 * named in the request.
 *
 * If the snode is missing from snodeMap, the map is repopulated from the
 * sdb once and the lookup retried; a second miss yields
 * TSDB_CODE_MND_STREAM_NODE_NOT_EXISTS.
 *
 * lastUpTs is only ever moved forward: the compare-and-swap is retried
 * until it succeeds or the stored value is no longer older than currTs.
 */
int32_t msmUpdateSnodeUpTs(SStmGrpCtx* pCtx) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  bool             reloaded = false;
  SStmSnodeStatus* pStatus = taosHashGet(mStreamMgmt.snodeMap, &pCtx->pReq->snodeId, sizeof(pCtx->pReq->snodeId));

  while (NULL == pStatus) {
    if (reloaded) {
      mstWarn("snode %d not exists in snodeMap, may be dropped, ignore it", pCtx->pReq->snodeId);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_NODE_NOT_EXISTS);
    }

    reloaded = true;
    TAOS_CHECK_EXIT(msmSTAddSnodesToMap(pCtx->pMnode));

    pStatus = taosHashGet(mStreamMgmt.snodeMap, &pCtx->pReq->snodeId, sizeof(pCtx->pReq->snodeId));
  }

  atomic_store_32(&pStatus->runnerThreadNum, pCtx->pReq->runnerThreadNum);

  int64_t lastTsValue = atomic_load_64(&pStatus->lastUpTs);
  while (pCtx->currTs > lastTsValue) {
    if (lastTsValue == atomic_val_compare_exchange_64(&pStatus->lastUpTs, lastTsValue, pCtx->currTs)) {
      mstDebug("snode %d lastUpTs updated", pCtx->pReq->snodeId);
      return code;
    }

    // Lost the race against another writer: reload and re-evaluate.
    lastTsValue = atomic_load_64(&pStatus->lastUpTs);
  }

  return code;

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;  
}
684

685
/*
 * Advance the lastUpTs of vgroup `vgId` in mStreamMgmt.vgroupMap to
 * pCtx->currTs when currTs is newer. Vgroups that are not present in the
 * map are ignored (only a debug line is emitted).
 */
void msmUpdateVgroupUpTs(SStmGrpCtx* pCtx, int32_t vgId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SStmVgroupStatus* pStatus = taosHashGet(mStreamMgmt.vgroupMap, &vgId, sizeof(vgId));
  if (NULL == pStatus) {
    mstDebug("vgroup %d not exists in vgroupMap, ignore update upTs", vgId);
    return;
  }

  // Never move the timestamp backwards; reload and retry when the
  // compare-exchange did not observe the value that was loaded.
  int64_t lastTsValue = atomic_load_64(&pStatus->lastUpTs);
  while (pCtx->currTs > lastTsValue) {
    if (lastTsValue == atomic_val_compare_exchange_64(&pStatus->lastUpTs, lastTsValue, pCtx->currTs)) {
      mstDebug("vgroup %d lastUpTs updated to %" PRId64, vgId, pCtx->currTs);
      return;
    }

    lastTsValue = atomic_load_64(&pStatus->lastUpTs);
  }
}
708

709
/*
 * Call msmUpdateVgroupUpTs() for every vgroup id listed in
 * pCtx->pReq->pVgLeaders. Returns `code`, which nothing in this function
 * changes from TSDB_CODE_SUCCESS.
 */
int32_t msmUpdateVgroupsUpTs(SStmGrpCtx* pCtx) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t vgNum = taosArrayGetSize(pCtx->pReq->pVgLeaders);
  int32_t idx = 0;

  mstDebug("start to update vgroups upTs");

  while (idx < vgNum) {
    int32_t* pVgId = taosArrayGet(pCtx->pReq->pVgLeaders, idx);
    msmUpdateVgroupUpTs(pCtx, *pVgId);
    ++idx;
  }

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
730

731

732

733
/*
 * Return the scanPlan of the first SStreamCalcScan in pList whose
 * readFromCache flag is set, or NULL when no entry has the flag.
 */
void* msmSearchCalcCacheScanPlan(SArray* pList) {
  int32_t total = taosArrayGetSize(pList);

  for (int32_t idx = 0; idx < total; ++idx) {
    SStreamCalcScan* pEntry = taosArrayGet(pList, idx);
    if (!pEntry->readFromCache) {
      continue;
    }

    return pEntry->scanPlan;
  }

  return NULL;
}
744

745
/*
 * Fill the reader part of a task deploy message.
 *
 * pDeploy       - deploy descriptor whose msg.reader is populated
 * calcScanPlan  - scan plan stored for a calc reader (unused for trigger readers)
 * pInfo         - stream status supplying the create request and runner counts
 * triggerReader - true to fill the trigger variant, false for the calc variant
 *
 * All pointer-valued fields are copied by reference from pInfo->pCreate; no
 * copies are made here. Always returns TSDB_CODE_SUCCESS.
 */
int32_t msmBuildReaderDeployInfo(SStmTaskDeploy* pDeploy, void* calcScanPlan, SStmStatus* pInfo, bool triggerReader) {
  SStreamReaderDeployMsg* pMsg = &pDeploy->msg.reader;
  pMsg->triggerReader = triggerReader;

  if (!triggerReader) {
    SStreamReaderDeployFromCalc* pCalc = &pMsg->msg.calc;
    pCalc->execReplica = pInfo->runnerDeploys * pInfo->runnerReplica;
    pCalc->calcScanPlan = calcScanPlan;
    return TSDB_CODE_SUCCESS;
  }

  SStreamReaderDeployFromTrigger* pTrigger = &pMsg->msg.trigger;

  // Trigger table identity.
  pTrigger->triggerTblName = pInfo->pCreate->triggerTblName;
  pTrigger->triggerTblUid = pInfo->pCreate->triggerTblUid;
  pTrigger->triggerTblType = pInfo->pCreate->triggerTblType;

  // Delete handling options.
  pTrigger->deleteReCalc = pInfo->pCreate->deleteReCalc;
  pTrigger->deleteOutTbl = pInfo->pCreate->deleteOutTbl;

  // Columns and plans.
  pTrigger->partitionCols = pInfo->pCreate->partitionCols;
  pTrigger->triggerCols = pInfo->pCreate->triggerCols;
  // pTrigger->triggerPrevFilter = pStream->pCreate->triggerPrevFilter;
  pTrigger->triggerScanPlan = pInfo->pCreate->triggerScanPlan;
  pTrigger->calcCacheScanPlan = msmSearchCalcCacheScanPlan(pInfo->pCreate->calcScanPlanList);

  return TSDB_CODE_SUCCESS;
}
769

770
/*
 * Build the list of runner targets a trigger task sends its results to.
 *
 * For each of the pInfo->runnerDeploys deployments, the last task of
 * pInfo->runners[i] must be flagged as the top runner; its task id, node id
 * and dnode epset are appended to *ppRes as an SStreamRunnerTarget.
 *
 * pMnode   - mnode used to resolve dnode epsets
 * pInfo    - stream status holding the runner arrays
 * streamId - stream id (consumed by the msts* logging macros)
 * ppRes    - out: array of SStreamRunnerTarget, created here when
 *            runnerDeploys > 0; the array is left in *ppRes even on failure
 *
 * Returns TSDB_CODE_SUCCESS, or the failure code (allocation failure, missing
 * runner, or TSDB_CODE_MND_STREAM_INTERNAL_ERROR when the last runner is not
 * the top runner). Previously the failure was logged but success was
 * returned, so callers could not detect it.
 */
int32_t msmBuildTriggerRunnerTargets(SMnode* pMnode, SStmStatus* pInfo, int64_t streamId, SArray** ppRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (pInfo->runnerDeploys > 0) {
    *ppRes = taosArrayInit(pInfo->runnerDeploys, sizeof(SStreamRunnerTarget));
    TSDB_CHECK_NULL(*ppRes, code, lino, _exit, terrno);
  }

  for (int32_t i = 0; i < pInfo->runnerDeploys; ++i) {
    SStmTaskStatus* pStatus = taosArrayGetLast(pInfo->runners[i]);
    TSDB_CHECK_NULL(pStatus, code, lino, _exit, terrno);

    if (!STREAM_IS_TOP_RUNNER(pStatus->flags)) {
      mstsError("the last runner task %" PRIx64 " SID:%" PRId64 " tidx:%d in deploy %d is not top runner",
                pStatus->id.taskId, pStatus->id.seriousId, pStatus->id.taskIdx, i);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
    }

    // Zero-initialise so no indeterminate bytes are copied into the array.
    SStreamRunnerTarget runner = {0};
    runner.addr.taskId = pStatus->id.taskId;
    runner.addr.nodeId = pStatus->id.nodeId;
    runner.addr.epset = mndGetDnodeEpsetById(pMnode, pStatus->id.nodeId);
    runner.execReplica = pInfo->runnerReplica;
    TSDB_CHECK_NULL(taosArrayPush(*ppRes, &runner), code, lino, _exit, terrno);
    mstsDebug("the %dth runner target added to trigger's runnerList, TASK:%" PRIx64 , i, runner.addr.taskId);
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  // Propagate the failure instead of masking it with success.
  return code;
}
806

807
/*
 * Fill pInfo with the leader snode id of pStream (its mainSnodeId), that
 * snode's replicaId, and the dnode epsets resolved for each id. The replica
 * epset is only resolved when GOT_SNODE() accepts the replica id.
 *
 * Returns TSDB_CODE_MND_STREAM_INTERNAL_ERROR when the leader snode cannot
 * be acquired, TSDB_CODE_SUCCESS otherwise.
 */
int32_t msmBuildStreamSnodeInfo(SMnode* pMnode, SStreamObj* pStream, SStreamSnodeInfo* pInfo) {
  int64_t streamId = pStream->pCreate->streamId;
  int32_t leaderSnodeId = atomic_load_32(&pStream->mainSnodeId);

  SSnodeObj* pSnode = mndAcquireSnode(pMnode, leaderSnodeId);
  if (NULL == pSnode) {
    mstsError("snode %d not longer exists, ignore build stream snode info", leaderSnodeId);
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }

  pInfo->leaderSnodeId = leaderSnodeId;
  pInfo->replicaSnodeId = pSnode->replicaId;
  mndReleaseSnode(pMnode, pSnode);

  // The snode object is released above; only the ids copied into pInfo are used from here on.
  pInfo->leaderEpSet = mndGetDnodeEpsetById(pMnode, pInfo->leaderSnodeId);
  if (!GOT_SNODE(pInfo->replicaSnodeId)) {
    return TSDB_CODE_SUCCESS;
  }

  pInfo->replicaEpSet = mndGetDnodeEpsetById(pMnode, pInfo->replicaSnodeId);
  return TSDB_CODE_SUCCESS;
}
828

829
/*
 * Fill the trigger part of a task deploy message (pDeploy->msg.trigger).
 *
 * Scalar options are copied from pStream->pCreate; pointer-valued fields are
 * copied by reference from pInfo->pCreate. The reader list is built from
 * pInfo->trigReaders (task id, node id, vgroup epset) and, when the stream
 * has runner tasks, the runner list is built by msmBuildTriggerRunnerTargets().
 *
 * pMnode  - mnode used to resolve vgroup/dnode epsets
 * pInfo   - stream status (readers, runners, create request)
 * pDeploy - deploy descriptor to populate
 * pStream - stream object
 *
 * Returns TSDB_CODE_SUCCESS or the failure code. Previously a failure was
 * logged but TSDB_CODE_SUCCESS was returned, hiding allocation and
 * runner-target errors from the caller. Arrays already stored in the message
 * are left in place on failure.
 */
int32_t msmBuildTriggerDeployInfo(SMnode* pMnode, SStmStatus* pInfo, SStmTaskDeploy* pDeploy, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;
  SStreamTriggerDeployMsg* pMsg = &pDeploy->msg.trigger;

  // Trigger behaviour flags.
  pMsg->triggerType = pStream->pCreate->triggerType;
  pMsg->igDisorder = pStream->pCreate->igDisorder;
  pMsg->fillHistory = pStream->pCreate->fillHistory;
  pMsg->fillHistoryFirst = pStream->pCreate->fillHistoryFirst;
  pMsg->lowLatencyCalc = pStream->pCreate->lowLatencyCalc;
  pMsg->igNoDataTrigger = pStream->pCreate->igNoDataTrigger;
  pMsg->hasPartitionBy = (pStream->pCreate->partitionCols != NULL);
  pMsg->isTriggerTblVirt = STREAM_IS_VIRTUAL_TABLE(pStream->pCreate->triggerTblType, pStream->pCreate->flags);
  pMsg->triggerHasPF = pStream->pCreate->triggerHasPF;

  // Notification settings.
  pMsg->pNotifyAddrUrls = pInfo->pCreate->pNotifyAddrUrls;
  pMsg->notifyEventTypes = pStream->pCreate->notifyEventTypes;
  pMsg->notifyErrorHandle = pStream->pCreate->notifyErrorHandle;
  pMsg->notifyHistory = pStream->pCreate->notifyHistory;

  // Timing options.
  pMsg->maxDelay = pStream->pCreate->maxDelay;
  pMsg->fillHistoryStartTime = pStream->pCreate->fillHistoryStartTime;
  pMsg->watermark = pStream->pCreate->watermark;
  pMsg->expiredTime = pStream->pCreate->expiredTime;
  pMsg->trigger = pInfo->pCreate->trigger;

  pMsg->eventTypes = pStream->pCreate->eventTypes;
  pMsg->placeHolderBitmap = pStream->pCreate->placeHolderBitmap;
  pMsg->calcTsSlotId = pStream->pCreate->calcTsSlotId;
  pMsg->triTsSlotId = pStream->pCreate->triTsSlotId;
  pMsg->triggerPrevFilter = pInfo->pCreate->triggerPrevFilter;

  // Scan plans are only attached here for virtual trigger tables.
  if (STREAM_IS_VIRTUAL_TABLE(pStream->pCreate->triggerTblType, pStream->pCreate->flags)) {
    pMsg->triggerScanPlan = pInfo->pCreate->triggerScanPlan;
    pMsg->calcCacheScanPlan = msmSearchCalcCacheScanPlan(pInfo->pCreate->calcScanPlanList);
  }

  int32_t triggerReaderNum = taosArrayGetSize(pInfo->trigReaders);
  if (triggerReaderNum > 0) {
    pMsg->readerList = taosArrayInit(triggerReaderNum, sizeof(SStreamTaskAddr));
    TSDB_CHECK_NULL(pMsg->readerList, code, lino, _exit, terrno);
  }

  for (int32_t i = 0; i < triggerReaderNum; ++i) {
    SStmTaskStatus* pStatus = taosArrayGet(pInfo->trigReaders, i);

    // Zero-initialise so no indeterminate bytes are copied into the array.
    SStreamTaskAddr addr = {0};
    addr.taskId = pStatus->id.taskId;
    addr.nodeId = pStatus->id.nodeId;
    addr.epset = mndGetVgroupEpsetById(pMnode, pStatus->id.nodeId);
    TSDB_CHECK_NULL(taosArrayPush(pMsg->readerList, &addr), code, lino, _exit, terrno);
    mstsDebug("the %dth trigReader src added to trigger's readerList, TASK:%" PRIx64 " nodeId:%d", i, addr.taskId, addr.nodeId);
  }

  pMsg->leaderSnodeId = pStream->mainSnodeId;
  pMsg->streamName = pInfo->streamName;

  if (0 == pInfo->runnerNum) {
    mstsDebug("no runner task, skip set trigger's runner list, deployNum:%d", pInfo->runnerDeploys);
    return code;
  }

  TAOS_CHECK_EXIT(msmBuildTriggerRunnerTargets(pMnode, pInfo, streamId, &pMsg->runnerList));

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  } else {
    mstsDebug("trigger deploy info built, readerNum:%d, runnerNum:%d", (int32_t)taosArrayGetSize(pMsg->readerList), (int32_t)taosArrayGetSize(pMsg->runnerList));
  }

  // Propagate the failure instead of masking it with success.
  return code;
}
902

903

904
/*
 * Fill the runner part of a task deploy message (pDeploy->msg.runner).
 *
 * pDeploy - deploy descriptor to populate
 * plan    - subplan stored by reference in the message (not cloned here)
 * pStream - stream object supplying scalar output options
 * pInfo   - stream status supplying the stream name, replica count and the
 *           pointer-valued output descriptors
 * topPlan - stored as-is in the message's topPlan field
 *
 * Returns `code`, which nothing in this function changes from
 * TSDB_CODE_SUCCESS.
 */
int32_t msmBuildRunnerDeployInfo(SStmTaskDeploy* pDeploy, SSubplan *plan, SStreamObj* pStream, SStmStatus* pInfo, bool topPlan) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;
  SStreamRunnerDeployMsg* pMsg = &pDeploy->msg.runner;

  // Plan and execution settings.
  // TAOS_CHECK_EXIT(qSubPlanToString(plan, &pMsg->pPlan, NULL));
  // TAOS_CHECK_EXIT(nodesCloneNode((SNode*)plan, (SNode**)&pMsg->pPlan));
  pMsg->pPlan = plan;
  pMsg->topPlan = topPlan;
  pMsg->execReplica = pInfo->runnerReplica;
  pMsg->streamName = pInfo->streamName;

  // Output table description.
  pMsg->outDBFName = pInfo->pCreate->outDB;
  pMsg->outTblName = pInfo->pCreate->outTblName;
  pMsg->outTblType = pStream->pCreate->outTblType;
  pMsg->outCols = pInfo->pCreate->outCols;
  pMsg->outTags = pInfo->pCreate->outTags;
  pMsg->outStbUid = pStream->pCreate->outStbUid;
  pMsg->outStbSversion = pStream->pCreate->outStbSversion;

  // Notification settings.
  pMsg->calcNotifyOnly = pStream->pCreate->calcNotifyOnly;
  pMsg->pNotifyAddrUrls = pInfo->pCreate->pNotifyAddrUrls;
  pMsg->notifyErrorHandle = pStream->pCreate->notifyErrorHandle;

  // Sub-table naming and tag expressions.
  pMsg->subTblNameExpr = pInfo->pCreate->subTblNameExpr;
  pMsg->tagValueExpr = pInfo->pCreate->tagValueExpr;
  pMsg->forceOutCols = pInfo->pCreate->forceOutCols;

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
939

940

941
/*
 * Register reader task status entries via msmSTAddToVgroupMapImpl().
 *
 * When pTask is non-NULL only that single task is registered and pTasks is
 * ignored; otherwise every element of pTasks is registered in order.
 * Stops at the first failure and returns its code.
 */
static int32_t msmSTAddToVgroupMap(SStmGrpCtx* pCtx, int64_t streamId, SArray* pTasks, SStmTaskStatus* pTask, bool trigReader) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (pTask) {
    TAOS_CHECK_EXIT(msmSTAddToVgroupMapImpl(streamId, pTask, trigReader));
  } else {
    int32_t total = taosArrayGetSize(pTasks);
    for (int32_t idx = 0; idx < total; ++idx) {
      SStmTaskStatus* pEntry = taosArrayGet(pTasks, idx);
      TAOS_CHECK_EXIT(msmSTAddToVgroupMapImpl(streamId, pEntry, trigReader));
    }
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
959

960

961
/*
 * Register task status entries via msmSTAddToSnodeMapImpl().
 *
 * When taskNum > 0, pTask is treated as a contiguous C array of taskNum
 * entries; otherwise every element of pTasks is registered in order.
 * deployId is forwarded unchanged to the Impl function.
 * Stops at the first failure and returns its code.
 */
static int32_t msmSTAddToSnodeMap(SStmGrpCtx* pCtx, int64_t streamId, SArray* pTasks, SStmTaskStatus* pTask, int32_t taskNum, int32_t deployId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (taskNum > 0) {
    for (int32_t idx = 0; idx < taskNum; ++idx) {
      TAOS_CHECK_EXIT(msmSTAddToSnodeMapImpl(streamId, &pTask[idx], deployId));
    }
  } else {
    int32_t total = taosArrayGetSize(pTasks);
    for (int32_t idx = 0; idx < total; ++idx) {
      SStmTaskStatus* pEntry = taosArrayGet(pTasks, idx);
      TAOS_CHECK_EXIT(msmSTAddToSnodeMapImpl(streamId, pEntry, deployId));
    }
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
980

981
int64_t msmAssignTaskId(void) {
1,753✔
982
  return atomic_fetch_add_64(&mStreamMgmt.lastTaskId, 1);
1,753✔
983
}
984

985
/* Use the value of taosGetTimestampNs() as the task's serious id. */
int64_t msmAssignTaskSeriousId(void) {
  int64_t seriousId = taosGetTimestampNs();
  return seriousId;
}
988

989

990
/*
 * Report whether snode `snodeId` is considered alive, i.e. whether its
 * status entry in mStreamMgmt.snodeMap has runnerThreadNum >= 0.
 *
 * When the snode is missing from the map, msmSTAddSnodesToMap() is called
 * once and the lookup retried; a second miss fails with
 * TSDB_CODE_MND_STREAM_INTERNAL_ERROR.
 *
 * alive - out: set only when the lookup succeeds
 */
int32_t msmIsSnodeAlive(SMnode* pMnode, int32_t snodeId, int64_t streamId, bool* alive) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  bool             refreshed = false;
  SStmSnodeStatus* pStatus = NULL;

  for (;;) {
    pStatus = taosHashGet(mStreamMgmt.snodeMap, &snodeId, sizeof(snodeId));
    if (NULL != pStatus) {
      *alive = (pStatus->runnerThreadNum >= 0);
      break;
    }

    if (refreshed) {
      mstsError("snode %d not exists in snodeMap", snodeId);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
    }

    refreshed = true;
    TAOS_CHECK_EXIT(msmSTAddSnodesToMap(pMnode));
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1022

1023
/*
 * Pick the snode a stream is statically bound to.
 *
 * The stream's mainSnodeId is tried first; when it is not alive, the
 * replicaId recorded on that snode object is tried next.
 *
 * Returns the chosen snode id, or 0 when neither candidate is usable or the
 * liveness check itself fails (the failure is logged, not returned).
 */
int32_t msmRetrieveStaticSnodeId(SMnode* pMnode, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  bool    alive = false;
  int32_t mainSnodeId = atomic_load_32(&pStream->mainSnodeId);
  int32_t snodeId = mainSnodeId;
  int64_t streamId = pStream->pCreate->streamId;

  for (;;) {
    TAOS_CHECK_EXIT(msmIsSnodeAlive(pMnode, snodeId, streamId, &alive));

    if (alive) {
      return snodeId;
    }

    // Any id other than the main snode is the fallback candidate; nothing left to try.
    if (snodeId != mainSnodeId) {
      mstsError("no available snode now, mainSnodeId:%d, followerSnodeId:%d", mainSnodeId, snodeId);
      return 0;
    }

    SSnodeObj* pSnode = mndAcquireSnode(pMnode, snodeId);
    if (NULL == pSnode) {
      stsWarn("snode %d not longer exists, ignore assign snode", snodeId);
      return 0;
    }

    if (pSnode->replicaId <= 0) {
      mstsError("no available snode now, mainSnodeId:%d, replicaId:%d", mainSnodeId, pSnode->replicaId);
      mndReleaseSnode(pMnode, pSnode);
      return 0;
    }

    // Fall back to the replica and re-run the liveness check.
    snodeId = pSnode->replicaId;
    mndReleaseSnode(pMnode, pSnode);
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return 0;
}
1069

1070
/*
 * Pick an alive snode pseudo-randomly.
 *
 * A target index in [0, snodeNum) is drawn with taosRand(); the SDB snode
 * table is then walked, counting only alive snodes, until the count reaches
 * the target. When the end of the table is reached the walk restarts from
 * the beginning, unless no alive snode was seen during the pass just
 * finished, in which case the search gives up.
 *
 * Returns the chosen snode id, or 0 with terrno set to
 * TSDB_CODE_SNODE_NO_AVAILABLE_NODE when none is selected.
 */
int32_t msmAssignRandomSnodeId(SMnode* pMnode, int64_t streamId) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  int32_t    snodeIdx = 0;
  int32_t    snodeId = 0;
  int32_t    snodeTarget = 0;
  void*      pIter = NULL;
  SSnodeObj* pObj = NULL;
  bool       alive = false;
  SSdb*      pSdb = pMnode->pSdb;

  int32_t snodeNum = sdbGetSize(pSdb, SDB_SNODE);
  if (snodeNum <= 0) {
    mstsInfo("no available snode now, num:%d", snodeNum);
    goto _exit;
  }

  snodeTarget = taosRand() % snodeNum;

  for (;;) {
    pIter = sdbFetch(pSdb, SDB_SNODE, pIter, (void **)&pObj);
    if (pIter == NULL) {
      // End of table: stop if this pass saw no alive snode, else wrap around.
      if (0 == snodeId) {
        mstsError("no alive snode now, snodeNum:%d", snodeNum);
        break;
      }

      snodeId = 0;
      continue;
    }

    code = msmIsSnodeAlive(pMnode, pObj->id, streamId, &alive);
    if (code) {
      sdbRelease(pSdb, pObj);
      sdbCancelFetch(pSdb, pIter);
      pObj = NULL;
      TAOS_CHECK_EXIT(code);
    }

    if (!alive) {
      sdbRelease(pSdb, pObj);
      continue;
    }

    snodeId = pObj->id;
    sdbRelease(pSdb, pObj);

    if (snodeIdx == snodeTarget) {
      // Target reached: stop iterating and keep snodeId as the result.
      sdbCancelFetch(pSdb, pIter);
      pObj = NULL;
      break;
    }

    snodeIdx++;
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  if (0 == snodeId) {
    terrno = TSDB_CODE_SNODE_NO_AVAILABLE_NODE;
  }

  return snodeId;
}
1135

1136
/*
 * Choose the snode a task of pStream should run on.
 *
 * isStatic - true: use msmRetrieveStaticSnodeId() (the stream's bound snode);
 *            false: use msmAssignRandomSnodeId()
 *
 * Returns the snode id, or 0 with terrno set to
 * TSDB_CODE_SNODE_NO_AVAILABLE_NODE when no snode exists or none was chosen.
 */
int32_t msmAssignTaskSnodeId(SMnode* pMnode, SStreamObj* pStream, bool isStatic) {
  int64_t streamId = pStream->pCreate->streamId;
  int32_t snodeNum = sdbGetSize(pMnode->pSdb, SDB_SNODE);
  int32_t snodeId = 0;

  if (snodeNum > 0) {
    if (isStatic) {
      snodeId = msmRetrieveStaticSnodeId(pMnode, pStream);
    } else {
      snodeId = msmAssignRandomSnodeId(pMnode, streamId);
    }
  } else {
    mstsInfo("no available snode now, num:%d", snodeNum);
  }

  if (0 == snodeId) {
    terrno = TSDB_CODE_SNODE_NO_AVAILABLE_NODE;
  }

  return snodeId;
}
1155

1156

1157
/*
 * Create the stream's trigger task status (pInfo->triggerTask), build its
 * deploy message, queue it through msmTDAddTriggerToSnodeMap() and register
 * the status via msmSTAddToTaskMap() and msmSTAddToSnodeMap().
 *
 * The task id and node id come from pCtx (triggerTaskId / triggerNodeId).
 * mStreamMgmt.toDeploySnodeTaskNum is incremented once the deploy entry has
 * been queued. Stops at the first failure and returns its code.
 */
static int32_t msmBuildTriggerTasks(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;

  pInfo->triggerTask = taosMemoryCalloc(1, sizeof(SStmTaskStatus));
  TSDB_CHECK_NULL(pInfo->triggerTask, code, lino, _exit, terrno);

  // Status entry kept in the stream status.
  SStmTaskStatus* pTrigger = pInfo->triggerTask;
  pTrigger->id.taskId = pCtx->triggerTaskId;
  pTrigger->id.deployId = 0;
  pTrigger->id.seriousId = msmAssignTaskSeriousId();
  pTrigger->id.nodeId = pCtx->triggerNodeId;
  pTrigger->id.taskIdx = 0;
  pTrigger->type = STREAM_TRIGGER_TASK;
  pTrigger->lastUpTs = pCtx->currTs;
  pTrigger->pStream = pInfo;

  // Deploy descriptor mirroring the identity fields of the status entry.
  SStmTaskDeploy info = {0};
  info.task.type = pTrigger->type;
  info.task.streamId = streamId;
  info.task.taskId = pTrigger->id.taskId;
  info.task.seriousId = pTrigger->id.seriousId;
  info.task.nodeId = pTrigger->id.nodeId;
  info.task.taskIdx = pTrigger->id.taskIdx;

  TAOS_CHECK_EXIT(msmBuildTriggerDeployInfo(pCtx->pMnode, pInfo, &info, pStream));
  TAOS_CHECK_EXIT(msmTDAddTriggerToSnodeMap(&info, pStream));

  (void)atomic_add_fetch_32(&mStreamMgmt.toDeploySnodeTaskNum, 1);

  TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, NULL, pInfo->triggerTask));
  TAOS_CHECK_EXIT(msmSTAddToSnodeMap(pCtx, streamId, NULL, pInfo->triggerTask, 1, -1));

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1197

1198
/*
 * Initialise one trigger-reader task status (pState) for vgroup `nodeId`,
 * build the matching deploy message and queue it in
 * mStreamMgmt.toDeployVgMap.
 *
 * pState is filled with a fresh task id and serious id, flagged
 * STREAM_FLAG_TRIGGER_READER and left in STREAM_STATUS_UNDEPLOYED.
 * Returns the first failure code, or TSDB_CODE_SUCCESS.
 */
static int32_t msmTDAddSingleTrigReader(SStmGrpCtx* pCtx, SStmTaskStatus* pState, int32_t nodeId, SStmStatus* pInfo, SStreamObj* pStream, int64_t streamId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  // Identity.
  pState->id.taskId = msmAssignTaskId();
  pState->id.deployId = 0;
  pState->id.seriousId = msmAssignTaskSeriousId();
  pState->id.nodeId = nodeId;
  pState->id.taskIdx = 0;

  // Kind and lifecycle.
  pState->type = STREAM_READER_TASK;
  pState->flags = STREAM_FLAG_TRIGGER_READER;
  pState->status = STREAM_STATUS_UNDEPLOYED;
  pState->lastUpTs = pCtx->currTs;
  pState->pStream = pInfo;

  // Deploy descriptor mirroring the status entry.
  SStmTaskDeploy info = {0};
  info.task.streamId = streamId;
  info.task.type = pState->type;
  info.task.flags = pState->flags;
  info.task.taskId = pState->id.taskId;
  info.task.seriousId = pState->id.seriousId;
  info.task.nodeId = pState->id.nodeId;
  info.task.taskIdx = pState->id.taskIdx;

  TAOS_CHECK_EXIT(msmBuildReaderDeployInfo(&info, NULL, pInfo, true));
  TAOS_CHECK_EXIT(msmTDAddToVgroupMap(mStreamMgmt.toDeployVgMap, &info, streamId));

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1232

1233
/*
 * Create the trigger-reader tasks of a stream according to the trigger
 * table type:
 *   - normal / child / virtual child / virtual normal table: one reader on
 *     pStream->pCreate->triggerTblVgId;
 *   - super table: one reader per non-TSMA vgroup whose dbUid matches the
 *     trigger database;
 *   - any other type: nothing is created.
 *
 * The task status entries are stored in pInfo->trigReaders and each reader
 * is queued through msmTDAddSingleTrigReader().
 *
 * Returns TSDB_CODE_SUCCESS or the first failure code. A failed
 * taosArrayReserve() is now reported instead of being dereferenced, and any
 * non-zero code inside the vgroup walk leaves the loop directly so the
 * cancelled iterator is never reused.
 */
static int32_t msmTDAddTrigReaderTasks(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;
  SSdb   *pSdb = pCtx->pMnode->pSdb;
  SStmTaskStatus* pState = NULL;
  SVgObj *pVgroup = NULL;
  SDbObj* pDb = NULL;

  switch (pStream->pCreate->triggerTblType) {
    case TSDB_NORMAL_TABLE:
    case TSDB_CHILD_TABLE:
    case TSDB_VIRTUAL_CHILD_TABLE:
    case TSDB_VIRTUAL_NORMAL_TABLE: {
      pInfo->trigReaders = taosArrayInit_s(sizeof(SStmTaskStatus), 1);
      TSDB_CHECK_NULL(pInfo->trigReaders, code, lino, _exit, terrno);
      pState = taosArrayGet(pInfo->trigReaders, 0);

      TAOS_CHECK_EXIT(msmTDAddSingleTrigReader(pCtx, pState, pStream->pCreate->triggerTblVgId, pInfo, pStream, streamId));
      break;
    }
    case TSDB_SUPER_TABLE: {
      pDb = mndAcquireDb(pCtx->pMnode, pStream->pCreate->triggerDB);
      if (NULL == pDb) {
        code = terrno;
        mstsError("failed to acquire db %s, error:%s", pStream->pCreate->triggerDB, terrstr());
        goto _exit;
      }

      pInfo->trigReaders = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SStmTaskStatus));
      TSDB_CHECK_NULL(pInfo->trigReaders, code, lino, _exit, terrno);

      void *pIter = NULL;
      while (1) {
        pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
        if (pIter == NULL) {
          break;
        }

        if (pVgroup->dbUid == pDb->uid && !pVgroup->isTsma) {
          pState = taosArrayReserve(pInfo->trigReaders, 1);
          if (NULL == pState) {
            // Make sure a failure is reported even if terrno was left at 0.
            code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
          } else {
            code = msmTDAddSingleTrigReader(pCtx, pState, pVgroup->vgId, pInfo, pStream, streamId);
          }

          if (code) {
            // Drop the row reference and the iterator before leaving the walk.
            sdbRelease(pSdb, pVgroup);
            sdbCancelFetch(pSdb, pIter);
            pVgroup = NULL;
            lino = __LINE__;
            goto _exit;
          }
        }

        sdbRelease(pSdb, pVgroup);
      }
      break;
    }
    default:
      mstsDebug("%s ignore triggerTblType %d", __FUNCTION__, pStream->pCreate->triggerTblType);
      break;
  }

_exit:

  mndReleaseDb(pCtx->pMnode, pDb);

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1304

1305
/*
 * Record a calc-reader task as a data source for the subplan described by
 * `scanPlan`, so that runner plans can later be pointed at it.
 *
 * The plan string is parsed only to obtain its groupId and subplanId. The
 * source address (task id, vgId, epset) is appended to the array stored in
 * mStreamMgmt.toUpdateScanMap under the key {streamId, subplanId}; the array
 * is created on first use. mStreamMgmt.toUpdateScanNum is incremented on
 * success.
 *
 * vgId - MNODE_HANDLE selects the mnode epset, larger values a vgroup epset;
 *        anything smaller is rejected with TSDB_CODE_MND_STREAM_INTERNAL_ERROR
 *
 * Returns TSDB_CODE_SUCCESS or the first failure code. A newly created array
 * that could not be stored in the map is destroyed instead of leaked.
 */
int32_t msmUPAddScanTask(SStmGrpCtx* pCtx, SStreamObj* pStream, char* scanPlan, int32_t vgId, int64_t taskId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SSubplan* pSubplan = NULL;
  SArray* pNewList = NULL;  // owned here until the hash map takes it over
  int64_t streamId = pStream->pCreate->streamId;
  int64_t key[2] = {streamId, 0};
  SStmTaskSrcAddr addr = {0};  // zeroed so no indeterminate bytes are stored

  TAOS_CHECK_EXIT(nodesStringToNode(scanPlan, (SNode**)&pSubplan));
  addr.isFromCache = false;

  if (MNODE_HANDLE == vgId) {
    mndGetMnodeEpSet(pCtx->pMnode, &addr.epset);
  } else if (vgId > MNODE_HANDLE) {
    addr.epset = mndGetVgroupEpsetById(pCtx->pMnode, vgId);
  } else {
    mstsError("invalid vgId %d in scanPlan", vgId);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  addr.taskId = taskId;
  addr.vgId = vgId;
  addr.groupId = pSubplan->id.groupId;

  key[1] = pSubplan->id.subplanId;

  SArray** ppRes = taosHashGet(mStreamMgmt.toUpdateScanMap, key, sizeof(key));
  if (NULL == ppRes) {
    pNewList = taosArrayInit(1, sizeof(addr));
    TSDB_CHECK_NULL(pNewList, code, lino, _exit, terrno);
    TSDB_CHECK_NULL(taosArrayPush(pNewList, &addr), code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(taosHashPut(mStreamMgmt.toUpdateScanMap, key, sizeof(key), &pNewList, POINTER_BYTES));
    pNewList = NULL;  // the map now holds the only reference
  } else {
    TSDB_CHECK_NULL(taosArrayPush(*ppRes, &addr), code, lino, _exit, terrno);
  }

  mstsDebug("calcReader %" PRIx64 " added to toUpdateScan, vgId:%d, groupId:%d, subplanId:%d", taskId, vgId, pSubplan->id.groupId, pSubplan->id.subplanId);

  (void)atomic_add_fetch_32(&mStreamMgmt.toUpdateScanNum, 1);

_exit:

  // Non-NULL only when creating or inserting the new list failed.
  taosArrayDestroy(pNewList);
  nodesDestroyNode((SNode*)pSubplan);

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1354

1355
/*
 * Record the trigger task as the (cache) data source for the subplan
 * described by pScan->scanPlan.
 *
 * The plan string is parsed only to obtain its groupId and subplanId. The
 * source address uses pCtx->triggerTaskId / pCtx->triggerNodeId and the
 * dnode epset resolved for triggerNodeId, with isFromCache set. It is
 * appended to the array stored in mStreamMgmt.toUpdateScanMap under the key
 * {streamId, subplanId}; the array is created on first use.
 * mStreamMgmt.toUpdateScanNum is incremented on success.
 *
 * Returns TSDB_CODE_SUCCESS or the first failure code. A newly created array
 * that could not be stored in the map is destroyed instead of leaked.
 */
int32_t msmUPAddCacheTask(SStmGrpCtx* pCtx, SStreamCalcScan* pScan, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SSubplan* pSubplan = NULL;
  SArray* pNewList = NULL;  // owned here until the hash map takes it over
  int64_t streamId = pStream->pCreate->streamId;
  int64_t key[2] = {streamId, 0};
  SStmTaskSrcAddr addr = {0};  // zeroed so no indeterminate bytes are stored

  TAOS_CHECK_EXIT(nodesStringToNode(pScan->scanPlan, (SNode**)&pSubplan));

  addr.isFromCache = true;
  addr.epset = mndGetDnodeEpsetById(pCtx->pMnode, pCtx->triggerNodeId);
  addr.taskId = pCtx->triggerTaskId;
  addr.vgId = pCtx->triggerNodeId;
  addr.groupId = pSubplan->id.groupId;

  key[1] = pSubplan->id.subplanId;
  SArray** ppRes = taosHashGet(mStreamMgmt.toUpdateScanMap, key, sizeof(key));
  if (NULL == ppRes) {
    pNewList = taosArrayInit(1, sizeof(addr));
    TSDB_CHECK_NULL(pNewList, code, lino, _exit, terrno);
    TSDB_CHECK_NULL(taosArrayPush(pNewList, &addr), code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(taosHashPut(mStreamMgmt.toUpdateScanMap, key, sizeof(key), &pNewList, POINTER_BYTES));
    pNewList = NULL;  // the map now holds the only reference
  } else {
    TSDB_CHECK_NULL(taosArrayPush(*ppRes, &addr), code, lino, _exit, terrno);
  }

  (void)atomic_add_fetch_32(&mStreamMgmt.toUpdateScanNum, 1);

_exit:

  // Non-NULL only when creating or inserting the new list failed.
  taosArrayDestroy(pNewList);
  nodesDestroyNode((SNode*)pSubplan);

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1393

1394

1395
/*
 * Create the calc-reader tasks of a stream.
 *
 * For every entry of the calc scan plan list:
 *   - entries flagged readFromCache are registered through
 *     msmUPAddCacheTask() and get no reader task;
 *   - otherwise one reader task is created per vgroup id in the entry's
 *     vgList, recorded in pInfo->calcReaders, registered as a plan source via
 *     msmUPAddScanTask() and queued in mStreamMgmt.toDeployVgMap.
 *
 * The taskIdx of each reader is the index of its scan plan entry.
 *
 * Returns TSDB_CODE_SUCCESS or the first failure code. A failed
 * taosArrayReserve() is now reported instead of being dereferenced.
 */
static int32_t msmTDAddCalcReaderTasks(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t calcTasksNum = taosArrayGetSize(pStream->pCreate->calcScanPlanList);
  int64_t streamId = pStream->pCreate->streamId;
  SStmTaskStatus* pState = NULL;

  pInfo->calcReaders = taosArrayInit(calcTasksNum, sizeof(SStmTaskStatus));
  TSDB_CHECK_NULL(pInfo->calcReaders, code, lino, _exit, terrno);

  for (int32_t i = 0; i < calcTasksNum; ++i) {
    SStreamCalcScan* pScan = taosArrayGet(pInfo->pCreate->calcScanPlanList, i);
    if (pScan->readFromCache) {
      TAOS_CHECK_EXIT(msmUPAddCacheTask(pCtx, pScan, pStream));
      continue;
    }

    int32_t vgNum = taosArrayGetSize(pScan->vgList);
    for (int32_t m = 0; m < vgNum; ++m) {
      pState = taosArrayReserve(pInfo->calcReaders, 1);
      TSDB_CHECK_NULL(pState, code, lino, _exit, terrno);

      pState->id.taskId = msmAssignTaskId();
      pState->id.deployId = 0;
      pState->id.seriousId = msmAssignTaskSeriousId();
      pState->id.nodeId = *(int32_t*)taosArrayGet(pScan->vgList, m);
      pState->id.taskIdx = i;
      pState->type = STREAM_READER_TASK;
      pState->flags = 0;
      pState->status = STREAM_STATUS_UNDEPLOYED;
      pState->lastUpTs = pCtx->currTs;
      pState->pStream = pInfo;

      // Deploy descriptor mirroring the status entry.
      SStmTaskDeploy info = {0};
      info.task.type = pState->type;
      info.task.streamId = streamId;
      info.task.taskId = pState->id.taskId;
      info.task.flags = pState->flags;
      info.task.seriousId = pState->id.seriousId;
      info.task.nodeId = pState->id.nodeId;
      info.task.taskIdx = pState->id.taskIdx;
      TAOS_CHECK_EXIT(msmBuildReaderDeployInfo(&info, pScan->scanPlan, pInfo, false));
      TAOS_CHECK_EXIT(msmUPAddScanTask(pCtx, pStream, pScan->scanPlan, pState->id.nodeId, pState->id.taskId));
      TAOS_CHECK_EXIT(msmTDAddToVgroupMap(mStreamMgmt.toDeployVgMap, &info, streamId));
    }
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1448

1449

1450

1451
/*
 * Re-register the already existing calc readers of a stream as plan sources.
 *
 * The calc scan plan list is walked in order: cache entries go through
 * msmUPAddCacheTask(); for every vgroup of a non-cache entry the next
 * element of pInfo->calcReaders is registered via msmUPAddScanTask(). The
 * readers are consumed sequentially, so pInfo->calcReaders is expected to
 * hold them in the same order the plan list enumerates them.
 *
 * Returns TSDB_CODE_SUCCESS (also when there is no calc scan plan) or the
 * first failure code.
 */
static int32_t msmUPPrepareReaderTasks(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;
  int32_t calcTasksNum = taosArrayGetSize(pStream->pCreate->calcScanPlanList);

  if (calcTasksNum <= 0) {
    mstsDebug("no calc scan plan, ignore parepare reader tasks, readerNum:%d", (int32_t)taosArrayGetSize(pInfo->calcReaders));
    return code;
  }

  // Cursor over the reader status array, advanced once per registered vgroup.
  SStmTaskStatus* pReader = taosArrayGet(pInfo->calcReaders, 0);

  for (int32_t planIdx = 0; planIdx < calcTasksNum; ++planIdx) {
    SStreamCalcScan* pScan = taosArrayGet(pStream->pCreate->calcScanPlanList, planIdx);

    if (!pScan->readFromCache) {
      int32_t vgNum = taosArrayGetSize(pScan->vgList);
      for (int32_t vgIdx = 0; vgIdx < vgNum; ++vgIdx, ++pReader) {
        TAOS_CHECK_EXIT(msmUPAddScanTask(pCtx, pStream, pScan->scanPlan, pReader->id.nodeId, pReader->id.taskId));
      }
    } else {
      TAOS_CHECK_EXIT(msmUPAddCacheTask(pCtx, pScan, pStream));
    }
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1485

1486
// Builds the reader tasks of a stream and registers them in the task and vgroup maps.
//
// Steps, in order (the first failing step aborts the rest):
//   1. msmTDAddTrigReaderTasks, then msmTDAddCalcReaderTasks
//   2. msmSTAddToTaskMap for pInfo->trigReaders, then for pInfo->calcReaders
//   3. msmSTAddToVgroupMap for pInfo->trigReaders (last argument true), then for
//      pInfo->calcReaders (last argument false)
//
// Returns TSDB_CODE_SUCCESS or the error code of the failing step.
static int32_t msmBuildReaderTasks(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;

  TAOS_CHECK_EXIT(msmTDAddTrigReaderTasks(pCtx, pInfo, pStream));
  TAOS_CHECK_EXIT(msmTDAddCalcReaderTasks(pCtx, pInfo, pStream));

  // k == 0 handles the trigger readers, k == 1 the calc readers. The array pointer is read
  // from pInfo right before each call.
  for (int32_t k = 0; k < 2; ++k) {
    SArray* pReaders = (0 == k) ? pInfo->trigReaders : pInfo->calcReaders;
    TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, pReaders, NULL));
  }

  for (int32_t k = 0; k < 2; ++k) {
    SArray* pReaders = (0 == k) ? pInfo->trigReaders : pInfo->calcReaders;
    bool    trigList = (0 == k);
    TAOS_CHECK_EXIT(msmSTAddToVgroupMap(pCtx, streamId, pReaders, NULL, trigList));
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1508

1509
// Describes the source task pSrc as a downstream source node and attaches it to `plan` through
// qSetSubplanExecutionNode, using pSrc->groupId as the group.
//
//   pTask / streamId  only used for logging
//   clientId          stored as the clientId of the source node
//   msgType           stored as the fetchMsgType of the source node
//   srcSubplanId      only used for logging
//
// Returns the result of qSetSubplanExecutionNode.
int32_t msmUpdatePlanSourceAddr(SStreamTask* pTask, int64_t streamId, SSubplan* plan, int64_t clientId, SStmTaskSrcAddr* pSrc, int32_t msgType, int64_t srcSubplanId) {
  SDownstreamSourceNode source = {0};

  source.type = QUERY_NODE_DOWNSTREAM_SOURCE;
  source.clientId = clientId;
  source.taskId = pSrc->taskId;
  source.sId = 0;
  source.execId = 0;
  source.fetchMsgType = msgType;
  source.localExec = false;
  source.addr.nodeId = pSrc->vgId;
  source.addr.epSet = pSrc->epset;

  msttDebug("try to update subplan %d grp %d sourceAddr from subplan %" PRId64 ", clientId:%" PRIx64 ", srcTaskId:%" PRIx64 ", srcNodeId:%d, msgType:%s", 
      plan->id.subplanId, pSrc->groupId, srcSubplanId, source.clientId, source.taskId, source.addr.nodeId, TMSG_INFO(source.fetchMsgType));

  return qSetSubplanExecutionNode(plan, pSrc->groupId, &source);
}
1528

1529
// Searches pRunners, starting at index beginIdx, for the runner deploy entry whose plan has
// the given subplanId.
//
// On a match, *taskId receives the entry's task id, *ppParent receives a pointer to the
// entry's task, and TSDB_CODE_SUCCESS is returned. Without a match an error is logged and
// TSDB_CODE_MND_STREAM_INTERNAL_ERROR is returned; the output arguments are left untouched.
int32_t msmGetTaskIdFromSubplanId(SStreamObj* pStream, SArray* pRunners, int32_t beginIdx, int32_t subplanId, int64_t* taskId, SStreamTask** ppParent) {
  int64_t streamId = pStream->pCreate->streamId;
  int32_t total = taosArrayGetSize(pRunners);

  for (int32_t idx = beginIdx; idx < total; ++idx) {
    SStmTaskDeploy* pCandidate = taosArrayGet(pRunners, idx);
    SSubplan*       pCandidatePlan = pCandidate->msg.runner.pPlan;

    if (pCandidatePlan->id.subplanId != subplanId) {
      continue;
    }

    *taskId = pCandidate->task.taskId;
    *ppParent = &pCandidate->task;
    return TSDB_CODE_SUCCESS;
  }

  mstsError("subplanId %d not found in runner list", subplanId);

  return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
}
1546

1547
// Resolves the value-node children of a runner subplan into source addresses.
//
// Every child of pPlan that is a BIGINT value node is mapped (via MND_GET_RUNNER_SUBPLANID)
// to a subplan id; {streamId, subplanId} is looked up in mStreamMgmt.toUpdateScanMap and
// each address of the resulting array is attached to pPlan through msmUpdatePlanSourceAddr.
// Children that are not value nodes, or value nodes of another data type, are skipped.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_MND_STREAM_INTERNAL_ERROR when the map has no entry
// for a child, or the error of msmUpdatePlanSourceAddr.
int32_t msmUpdateLowestPlanSourceAddr(SSubplan* pPlan, SStmTaskDeploy* pDeploy, int64_t streamId) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SStreamTask* pTask = &pDeploy->task;
  SNode*       pChild = NULL;
  int64_t      scanKey[2] = {streamId, -1};  // [1] is filled in per child below

  FOREACH(pChild, pPlan->pChildren) {
    if (QUERY_NODE_VALUE != nodeType(pChild)) {
      msttDebug("node type %d is not valueNode, skip it", nodeType(pChild));
      continue;
    }

    SValueNode* pIdNode = (SValueNode*)pChild;
    if (TSDB_DATA_TYPE_BIGINT != pIdNode->node.resType.type) {
      msttWarn("invalid value node data type %d for runner's child subplan", pIdNode->node.resType.type);
      continue;
    }

    scanKey[1] = MND_GET_RUNNER_SUBPLANID(pIdNode->datum.i);

    SArray** ppAddrs = taosHashGet(mStreamMgmt.toUpdateScanMap, scanKey, sizeof(scanKey));
    if (NULL == ppAddrs) {
      msttError("lowest runner subplan ID:%d,%d can't get its child ID:%" PRId64 " addr", pPlan->id.groupId, pPlan->id.subplanId, scanKey[1]);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
    }

    int32_t addrNum = taosArrayGetSize(*ppAddrs);
    for (int32_t a = 0; a < addrNum; ++a) {
      SStmTaskSrcAddr* pAddr = taosArrayGet(*ppAddrs, a);
      int32_t          fetchType = pAddr->isFromCache ? TDMT_STREAM_FETCH_FROM_CACHE : TDMT_STREAM_FETCH;

      TAOS_CHECK_EXIT(msmUpdatePlanSourceAddr(pTask, streamId, pPlan, pDeploy->task.taskId, pAddr, fetchType, scanKey[1]));
    }
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1588

1589
// Finalizes the plan of one runner deploy entry.
//
//   1. Attaches the source addresses of the plan's value-node children
//      (msmUpdateLowestPlanSourceAddr).
//   2. Erases the value nodes from pPlan->pChildren, clears the remaining list with
//      nodesClearList and sets pPlan->pChildren to NULL.
//   3. If the plan has parents, registers this runner as a TDMT_STREAM_FETCH_FROM_RUNNER source
//      on each parent subplan. The parent's task is looked up in pRunners from beginIdx on.
//
// Returns TSDB_CODE_SUCCESS or the first error reported by a helper.
int32_t msmUpdateRunnerPlan(SStmGrpCtx* pCtx, SArray* pRunners, int32_t beginIdx, SStmTaskDeploy* pDeploy, SStreamObj* pStream) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  int64_t      streamId = pStream->pCreate->streamId;
  SSubplan*    pPlan = pDeploy->msg.runner.pPlan;
  SStreamTask* parentTask = NULL;

  TAOS_CHECK_EXIT(msmUpdateLowestPlanSourceAddr(pPlan, pDeploy, streamId));

  SNode* pChild = NULL;
  WHERE_EACH(pChild, pPlan->pChildren) {
    if (QUERY_NODE_VALUE != nodeType(pChild)) {
      WHERE_NEXT;
      continue;
    }

    ERASE_NODE(pPlan->pChildren);
  }
  nodesClearList(pPlan->pChildren);
  pPlan->pChildren = NULL;

  if (NULL != pPlan->pParents) {
    SNode*          pParent = NULL;
    int64_t         parentTaskId = 0;
    SStmTaskSrcAddr selfAddr = {0};

    // Address under which the parents fetch from this runner.
    selfAddr.taskId = pDeploy->task.taskId;
    selfAddr.vgId = pDeploy->task.nodeId;
    selfAddr.groupId = pPlan->id.groupId;
    selfAddr.epset = mndGetDnodeEpsetById(pCtx->pMnode, pDeploy->task.nodeId);

    FOREACH(pParent, pPlan->pParents) {
      SSubplan* pParentPlan = (SSubplan*)pParent;
      TAOS_CHECK_EXIT(msmGetTaskIdFromSubplanId(pStream, pRunners, beginIdx, pParentPlan->id.subplanId, &parentTaskId, &parentTask));
      TAOS_CHECK_EXIT(msmUpdatePlanSourceAddr(parentTask, streamId, pParentPlan, parentTaskId, &selfAddr, TDMT_STREAM_FETCH_FROM_RUNNER, pPlan->id.subplanId));
    }
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1635

1636
// Finalizes and serializes the plan of every entry in pRunners, in array order.
//
// For entry i the plan is first completed by msmUpdateRunnerPlan (with beginIdx == i) and then
// converted with nodesNodeToString; the resulting string pointer is written back into
// pDeploy->msg.runner.pPlan, replacing the plan-node pointer stored there.
//
// Returns TSDB_CODE_SUCCESS or the first error encountered; processing stops at that entry.
int32_t msmUpdateRunnerPlans(SStmGrpCtx* pCtx, SArray* pRunners, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;
  int32_t total = taosArrayGetSize(pRunners);
  int32_t idx = 0;

  while (idx < total) {
    SStmTaskDeploy* pDeploy = taosArrayGet(pRunners, idx);
    SStreamTask*    pTask = &pDeploy->task;

    TAOS_CHECK_EXIT(msmUpdateRunnerPlan(pCtx, pRunners, idx, pDeploy, pStream));
    // From here on pPlan holds the serialized plan text instead of the plan node.
    TAOS_CHECK_EXIT(nodesNodeToString((SNode*)pDeploy->msg.runner.pPlan, false, (char**)&pDeploy->msg.runner.pPlan, NULL));

    msttDebugL("runner updated task plan:%s", (const char*)pDeploy->msg.runner.pPlan);

    ++idx;
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1659

1660
// Builds the runner tasks of a stream for every runner deploy (pInfo->runnerDeploys of them).
//
// pDag is the parsed calc plan. This function takes it over: it is destroyed before returning
// (and re-parsed from pStream->pCreate->calcPlan between deploys, because
// msmUpdateRunnerPlans modifies the plan it is given).
//
// The DAG is validated first: numOfSubplans must be positive and equal to
// pStream->pCreate->numOfCalcSubplan, there must be at least one level, and level 0 must be a
// node list holding exactly one subplan.
//
// Per deploy, an snode is assigned and the levels are walked from the lowest (last) level up
// to level 0. For each subplan a SStmTaskStatus is appended to pInfo->runners[deployId] and a
// SStmTaskDeploy is filled in; tasks of level 0 carry STREAM_FLAG_TOP_RUNNER. The deploy
// entries are then finalized (msmUpdateRunnerPlans) and queued (msmTDAddRunnersToSnodeMap).
// After all deploys were built, every runner list is added to the task map and the snode map
// and pInfo->runnerNum is set to the number of tasks of one deploy.
//
// Returns 0 on success, TSDB_CODE_MND_STREAM_INTERNAL_ERROR for an inconsistent DAG, terrno
// for allocation/assignment failures, or the error of a failing helper.
int32_t msmBuildRunnerTasksImpl(SStmGrpCtx* pCtx, SQueryPlan* pDag, SStmStatus* pInfo, SStreamObj* pStream) {
  int32_t code = 0;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;
  SArray* deployTaskList = NULL;
  SArray* deployList = NULL;
  int32_t deployNodeId = 0;
  SStmTaskStatus* pState = NULL;
  int32_t taskIdx = 0;
  SNodeListNode *plans = NULL;
  int32_t        taskNum = 0;
  int32_t        totalTaskNum = 0;

  if (pDag->numOfSubplans <= 0) {
    mstsError("invalid subplan num:%d", pDag->numOfSubplans);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  if (pDag->numOfSubplans != pStream->pCreate->numOfCalcSubplan) {
    mstsError("numOfCalcSubplan %d mismatch with numOfSubplans %d", pStream->pCreate->numOfCalcSubplan, pDag->numOfSubplans);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  int32_t levelNum = (int32_t)LIST_LENGTH(pDag->pSubplans);
  if (levelNum <= 0) {
    mstsError("invalid level num:%d", levelNum);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  int32_t        lowestLevelIdx = levelNum - 1;

  plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, 0);
  if (QUERY_NODE_NODE_LIST != nodeType(plans)) {
    mstsError("invalid level plan, level:0, planNodeType:%d", nodeType(plans));
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  taskNum = (int32_t)LIST_LENGTH(plans->pNodeList);
  if (taskNum != 1) {
    mstsError("invalid level plan number:%d, level:0", taskNum);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  // One reusable slot per subplan; the slots are overwritten for every deploy.
  deployTaskList = taosArrayInit_s(sizeof(SStmTaskDeploy), pDag->numOfSubplans);
  TSDB_CHECK_NULL(deployTaskList, code, lino, _exit, terrno);

  for (int32_t deployId = 0; deployId < pInfo->runnerDeploys; ++deployId) {
    totalTaskNum = 0;

    deployList = pInfo->runners[deployId];
    deployNodeId = msmAssignTaskSnodeId(pCtx->pMnode, pStream, (0 == deployId) ? true : false);
    if (!GOT_SNODE(deployNodeId)) {
      TAOS_CHECK_EXIT(terrno);
    }

    taskIdx = 0;

    for (int32_t i = lowestLevelIdx; i >= 0; --i) {
      plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i);
      if (NULL == plans) {
        mstsError("empty level plan, level:%d", i);
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }

      if (QUERY_NODE_NODE_LIST != nodeType(plans)) {
        mstsError("invalid level plan, level:%d, planNodeType:%d", i, nodeType(plans));
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }

      taskNum = (int32_t)LIST_LENGTH(plans->pNodeList);
      if (taskNum <= 0) {
        mstsError("invalid level plan number:%d, level:%d", taskNum, i);
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }

      // This check also keeps taskIdx inside deployTaskList (numOfSubplans slots).
      totalTaskNum += taskNum;
      if (totalTaskNum > pDag->numOfSubplans) {
        mstsError("current totalTaskNum %d is bigger than numOfSubplans %d, level:%d", totalTaskNum, pDag->numOfSubplans, i);
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }

      for (int32_t n = 0; n < taskNum; ++n) {
        SSubplan *plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n);

        // Growing the runner list can fail; bail out instead of writing through NULL.
        pState = taosArrayReserve(deployList, 1);
        TSDB_CHECK_NULL(pState, code, lino, _exit, terrno);

        pState->id.taskId = msmAssignTaskId();
        pState->id.deployId = deployId;
        pState->id.seriousId = msmAssignTaskSeriousId();
        pState->id.nodeId = deployNodeId;
        pState->id.taskIdx = MND_SET_RUNNER_TASKIDX(i, n);
        pState->type = STREAM_RUNNER_TASK;
        pState->flags = (0 == i) ? STREAM_FLAG_TOP_RUNNER : 0;
        pState->status = STREAM_STATUS_UNDEPLOYED;
        pState->lastUpTs = pCtx->currTs;
        pState->pStream = pInfo;

        SStmTaskDeploy* pDeploy = taosArrayGet(deployTaskList, taskIdx++);
        pDeploy->task.type = pState->type;
        pDeploy->task.streamId = streamId;
        pDeploy->task.taskId = pState->id.taskId;
        pDeploy->task.flags = pState->flags;
        pDeploy->task.seriousId = pState->id.seriousId;
        pDeploy->task.deployId = pState->id.deployId;
        pDeploy->task.nodeId = pState->id.nodeId;
        pDeploy->task.taskIdx = pState->id.taskIdx;
        TAOS_CHECK_EXIT(msmBuildRunnerDeployInfo(pDeploy, plan, pStream, pInfo, 0 == i));

        SStreamTask* pTask = &pDeploy->task;
        msttDebug("runner task deploy built, subplan level:%d, taskIdx:%d, groupId:%d, subplanId:%d",
            i, pTask->taskIdx, plan->id.groupId, plan->id.subplanId);
      }

      mstsDebug("deploy %d level %d initialized, taskNum:%d", deployId, i, taskNum);
    }

    if (totalTaskNum != pDag->numOfSubplans) {
      mstsError("totalTaskNum %d mis-match with numOfSubplans %d", totalTaskNum, pDag->numOfSubplans);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
    }

    TAOS_CHECK_EXIT(msmUpdateRunnerPlans(pCtx, deployTaskList, pStream));

    TAOS_CHECK_EXIT(msmTDAddRunnersToSnodeMap(deployTaskList, pStream));

    // The plan was modified by msmUpdateRunnerPlans; drop it and parse a fresh copy for the
    // next deploy. Nothing reads the DAG after the last deploy, so no copy is parsed then.
    nodesDestroyNode((SNode *)pDag);
    pDag = NULL;

    if (deployId + 1 < pInfo->runnerDeploys) {
      TAOS_CHECK_EXIT(nodesStringToNode(pStream->pCreate->calcPlan, (SNode**)&pDag));
    }

    mstsDebug("total %d runner tasks added for deploy %d", totalTaskNum, deployId);
  }

  for (int32_t i = 0; i < pInfo->runnerDeploys; ++i) {
    TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, pInfo->runners[i], NULL));
    TAOS_CHECK_EXIT(msmSTAddToSnodeMap(pCtx, streamId, pInfo->runners[i], NULL, 0, i));
  }

  pInfo->runnerNum = totalTaskNum;

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  taosArrayDestroy(deployTaskList);
  nodesDestroyNode((SNode *)pDag);

  return code;
}
1810

1811
// Rebuilds the deploy information of already existing runner tasks for the deploys listed in
// pAction->deployId[0 .. deployNum-1].
//
// pDag is the parsed calc plan. This function takes it over: it is destroyed before returning
// (and re-parsed from pStream->pCreate->calcPlan between deploys, because
// msmUpdateRunnerPlans modifies the plan it is given).
//
// Per deploy a new snode is assigned and the levels are walked from the lowest (last) level up
// to level 0, in step with the entries of pInfo->runners[deployId]. Each runner keeps its task
// id, serious id and task index (the index must match the subplan position), gets the new node
// id, and a SStmTaskDeploy is filled in for it. The deploy entries are then finalized
// (msmUpdateRunnerPlans), queued (msmTDAddRunnersToSnodeMap) and the runner list is added to
// the snode map.
//
// Returns 0 on success, TSDB_CODE_STREAM_INTERNAL_ERROR when the stored runners do not match
// the plan, terrno for allocation/assignment failures, or the error of a failing helper.
int32_t msmReBuildRunnerTasks(SStmGrpCtx* pCtx, SQueryPlan* pDag, SStmStatus* pInfo, SStreamObj* pStream, SStmTaskAction* pAction) {
  int32_t code = 0;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;
  int32_t newNodeId = 0;
  int32_t levelNum = (int32_t)LIST_LENGTH(pDag->pSubplans);
  int32_t        lowestLevelIdx = levelNum - 1;
  SNodeListNode *plans = NULL;
  int32_t        taskNum = 0;
  int32_t        deployId = 0;
  int32_t        runnerNum = 0;
  SStmTaskStatus* pRunner = NULL;
  int32_t taskIdx = 0;
  // One reusable slot per subplan; the slots are overwritten for every deploy.
  SArray* deployTaskList = taosArrayInit_s(sizeof(SStmTaskDeploy), pDag->numOfSubplans);
  TSDB_CHECK_NULL(deployTaskList, code, lino, _exit, terrno);

  for (int32_t r = 0; r < pAction->deployNum; ++r) {
    deployId = pAction->deployId[r];

    runnerNum = (int32_t)taosArrayGetSize(pInfo->runners[deployId]);
    pRunner = taosArrayGet(pInfo->runners[deployId], 0);
    if (NULL == pRunner) {
      mstsError("no runner task found for deploy %d", deployId);
      TAOS_CHECK_EXIT(TSDB_CODE_STREAM_INTERNAL_ERROR);
    }

    newNodeId = msmAssignTaskSnodeId(pCtx->pMnode, pStream, (0 == r) ? true : false);
    if (!GOT_SNODE(newNodeId)) {
      TAOS_CHECK_EXIT(terrno);
    }

    taskIdx = 0;

    for (int32_t i = lowestLevelIdx; i >= 0; --i) {
      plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i);
      if (NULL == plans) {
        mstsError("empty level plan, level:%d", i);
        TAOS_CHECK_EXIT(TSDB_CODE_STREAM_INTERNAL_ERROR);
      }

      // pRunner is advanced once per subplan and deployTaskList has numOfSubplans slots, so
      // the level must fit into what is left of both before anything is dereferenced.
      taskNum = (int32_t)LIST_LENGTH(plans->pNodeList);
      if (taskNum <= 0 || taskNum > runnerNum - taskIdx || taskNum > pDag->numOfSubplans - taskIdx) {
        mstsError("level %d taskNum %d mismatch, handledTasks:%d, runnerNum:%d, numOfSubplans:%d",
            i, taskNum, taskIdx, runnerNum, pDag->numOfSubplans);
        TAOS_CHECK_EXIT(TSDB_CODE_STREAM_INTERNAL_ERROR);
      }

      // NOTE(review): this masks the flags of the first runner of the level only, and `&=`
      // clears every other flag - confirm that neither `|=` nor a per-runner update was meant.
      pRunner->flags &= STREAM_FLAG_REDEPLOY_RUNNER;

      for (int32_t n = 0; n < taskNum; ++n) {
        SSubplan *plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n);

        int32_t newTaskIdx = MND_SET_RUNNER_TASKIDX(i, n);
        if (pRunner->id.taskIdx != newTaskIdx) {
          mstsError("runner TASK:%" PRId64 " taskIdx %d mismatch with newTaskIdx:%d", pRunner->id.taskId, pRunner->id.taskIdx, newTaskIdx);
          TAOS_CHECK_EXIT(TSDB_CODE_STREAM_INTERNAL_ERROR);
        }

        pRunner->id.nodeId = newNodeId;

        SStmTaskDeploy* pDeploy = taosArrayGet(deployTaskList, taskIdx++);
        pDeploy->task.type = pRunner->type;
        pDeploy->task.streamId = streamId;
        pDeploy->task.taskId = pRunner->id.taskId;
        pDeploy->task.flags = pRunner->flags;
        pDeploy->task.seriousId = pRunner->id.seriousId;
        // Carried over like in msmBuildRunnerTasksImpl; without it the zero-initialized slot
        // reports deploy 0 for every rebuilt runner.
        pDeploy->task.deployId = pRunner->id.deployId;
        pDeploy->task.nodeId = pRunner->id.nodeId;
        pDeploy->task.taskIdx = pRunner->id.taskIdx;
        TAOS_CHECK_EXIT(msmBuildRunnerDeployInfo(pDeploy, plan, pStream, pInfo, 0 == i));

        pRunner++;
      }

      mstsDebug("level %d initialized, taskNum:%d", i, taskNum);
    }

    TAOS_CHECK_EXIT(msmUpdateRunnerPlans(pCtx, deployTaskList, pStream));

    TAOS_CHECK_EXIT(msmTDAddRunnersToSnodeMap(deployTaskList, pStream));

    TAOS_CHECK_EXIT(msmSTAddToSnodeMap(pCtx, streamId, pInfo->runners[deployId], NULL, 0, deployId));

    // The plan was modified by msmUpdateRunnerPlans; drop it and parse a fresh copy for the
    // next deploy. Nothing reads the DAG after the last deploy, so no copy is parsed then.
    nodesDestroyNode((SNode *)pDag);
    pDag = NULL;

    if (r + 1 < pAction->deployNum) {
      TAOS_CHECK_EXIT(nodesStringToNode(pStream->pCreate->calcPlan, (SNode**)&pDag));
    }
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  nodesDestroyNode((SNode *)pDag);
  taosArrayDestroy(deployTaskList);

  return code;
}
1900

1901

1902
// Sets the runner deployment parameters of a stream status: runnerDeploys becomes
// MND_STREAM_RUNNER_DEPLOY_NUM and runnerReplica becomes 3.
//
// Currently always returns TSDB_CODE_SUCCESS; streamId is only used for error logging.
int32_t msmSetStreamRunnerExecReplica(int64_t streamId, SStmStatus* pInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  //STREAMTODO

  pInfo->runnerDeploys = MND_STREAM_RUNNER_DEPLOY_NUM;
  pInfo->runnerReplica = 3;

  if (TSDB_CODE_SUCCESS != code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1918

1919

1920
// Builds the runner tasks of a stream from its serialized calc plan.
//
// Returns TSDB_CODE_SUCCESS right away when the stream has no calc plan. Otherwise the plan is
// parsed, one runner array per deploy is created in pInfo->runners[], and the work is done by
// msmBuildRunnerTasksImpl. The parsed plan is not destroyed here once it was passed to that
// function. On success mStreamMgmt.toUpdateScanMap is cleared and
// mStreamMgmt.toUpdateScanNum is reset to 0.
//
// Returns TSDB_CODE_SUCCESS or the first error encountered.
static int32_t msmBuildRunnerTasks(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
  if (NULL == pStream->pCreate->calcPlan) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  int64_t     streamId = pStream->pCreate->streamId;
  SQueryPlan* pCalcDag = NULL;

  TAOS_CHECK_EXIT(nodesStringToNode(pStream->pCreate->calcPlan, (SNode**)&pCalcDag));

  int32_t deployIdx = 0;
  while (deployIdx < pInfo->runnerDeploys) {
    pInfo->runners[deployIdx] = taosArrayInit(pCalcDag->numOfSubplans, sizeof(SStmTaskStatus));
    TSDB_CHECK_NULL(pInfo->runners[deployIdx], code, lino, _exit, terrno);
    ++deployIdx;
  }

  // The pointer is forgotten right after the call so that the cleanup below does not
  // destroy the plan a second time.
  code = msmBuildRunnerTasksImpl(pCtx, pCalcDag, pInfo, pStream);
  pCalcDag = NULL;

  TAOS_CHECK_EXIT(code);

  taosHashClear(mStreamMgmt.toUpdateScanMap);
  mStreamMgmt.toUpdateScanNum = 0;

_exit:

  nodesDestroyNode((SNode *)pCalcDag);

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1955

1956

1957
// Builds all tasks of a stream.
//
// A trigger task id and a trigger snode id are assigned and stored in pCtx first; then the
// reader, runner and trigger tasks are built in that order. The first failing step aborts
// the rest.
//
// Returns TSDB_CODE_SUCCESS or the error code of the failing step.
static int32_t msmBuildStreamTasks(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;

  mstsInfo("start to deploy stream tasks, deployTimes:%" PRId64, pInfo->deployTimes);

  // Trigger placement.
  pCtx->triggerTaskId = msmAssignTaskId();
  pCtx->triggerNodeId = msmAssignTaskSnodeId(pCtx->pMnode, pStream, true);
  if (!GOT_SNODE(pCtx->triggerNodeId)) {
    TAOS_CHECK_EXIT(terrno);
  }

  // Task construction: readers, then runners, then the trigger.
  TAOS_CHECK_EXIT(msmBuildReaderTasks(pCtx, pInfo, pStream));
  TAOS_CHECK_EXIT(msmBuildRunnerTasks(pCtx, pInfo, pStream));
  TAOS_CHECK_EXIT(msmBuildTriggerTasks(pCtx, pInfo, pStream));

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1982

1983
// Allocates pInfo->trigReaders and sets pInfo->trigReaderNum according to the trigger table
// type of the stream.
//
//   normal / child / virtual child / virtual normal table:
//       one zero-initialized reader entry, trigReaderNum = 1
//   super table:
//       an empty array with capacity for numOfVgroups entries of the trigger database,
//       trigReaderNum = numOfVgroups
//   any other type:
//       nothing is allocated, trigReaderNum = 0
//
// The trigger database is only acquired for the super table case and is always released
// before returning.
//
// Returns TSDB_CODE_SUCCESS, or terrno when the database cannot be acquired or an array cannot
// be allocated.
static int32_t msmInitTrigReaderList(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;
  SDbObj* pDb = NULL;

  switch (pStream->pCreate->triggerTblType) {
    case TSDB_NORMAL_TABLE:
    case TSDB_CHILD_TABLE:
    case TSDB_VIRTUAL_CHILD_TABLE:
    case TSDB_VIRTUAL_NORMAL_TABLE: {
      pInfo->trigReaders = taosArrayInit_s(sizeof(SStmTaskStatus), 1);
      TSDB_CHECK_NULL(pInfo->trigReaders, code, lino, _exit, terrno);
      pInfo->trigReaderNum = 1;
      break;
    }
    case TSDB_SUPER_TABLE: {
      pDb = mndAcquireDb(pCtx->pMnode, pStream->pCreate->triggerDB);
      if (NULL == pDb) {
        code = terrno;
        lino = __LINE__;  // so that the exit log does not report "line 0"
        mstsError("failed to acquire db %s, error:%s", pStream->pCreate->triggerDB, terrstr());
        goto _exit;
      }

      pInfo->trigReaders = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SStmTaskStatus));
      TSDB_CHECK_NULL(pInfo->trigReaders, code, lino, _exit, terrno);
      pInfo->trigReaderNum = pDb->cfg.numOfVgroups;
      break;
    }
    default:
      pInfo->trigReaderNum = 0;
      mstsDebug("%s ignore triggerTblType %d", __FUNCTION__, pStream->pCreate->triggerTblType);
      break;
  }

_exit:

  // Single release point for both the success and the failure path.
  if (NULL != pDb) {
    mndReleaseDb(pCtx->pMnode, pDb);
    pDb = NULL;
  }

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2031

2032

2033
// Initializes a stream status from a stream object.
//
//   - lastActionTs is set to INT64_MIN
//   - streamName is duplicated from pStream->name unless it is already set
//   - the create request is cloned into pStatus->pCreate
//     (tCloneStreamCreateDeployPointers)
//   - if the stream has calc subplans, runnerNum is set to their count and the runner
//     deploy/replica settings are filled in (msmSetStreamRunnerExecReplica)
//   - only when initList is true: the trigger reader list is initialized, an empty
//     calcReaders array is created when the stream has calc scan plans, and one empty runner
//     array per deploy is created when runnerNum is positive
//
// Returns TSDB_CODE_SUCCESS or the first error encountered. Fields set before the failure
// are left as they are.
static int32_t msmInitStmStatus(SStmGrpCtx* pCtx, SStmStatus* pStatus, SStreamObj* pStream, bool initList) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;

  pStatus->lastActionTs = INT64_MIN;

  if (NULL == pStatus->streamName) {
    pStatus->streamName = taosStrdup(pStream->name);
    TSDB_CHECK_NULL(pStatus->streamName, code, lino, _exit, terrno);
  }

  TAOS_CHECK_EXIT(tCloneStreamCreateDeployPointers(pStream->pCreate, &pStatus->pCreate));

  if (pStream->pCreate->numOfCalcSubplan > 0) {
    pStatus->runnerNum = pStream->pCreate->numOfCalcSubplan;

    TAOS_CHECK_EXIT(msmSetStreamRunnerExecReplica(streamId, pStatus));
  }

  if (!initList) {
    goto _exit;
  }

  TAOS_CHECK_EXIT(msmInitTrigReaderList(pCtx, pStatus, pStream));

  int32_t scanPlanNum = taosArrayGetSize(pStream->pCreate->calcScanPlanList);
  if (scanPlanNum > 0) {
    pStatus->calcReaderNum = scanPlanNum;
    pStatus->calcReaders = taosArrayInit(scanPlanNum, sizeof(SStmTaskStatus));
    TSDB_CHECK_NULL(pStatus->calcReaders, code, lino, _exit, terrno);
  }

  if (pStatus->runnerNum > 0) {
    int32_t deployIdx = 0;
    while (deployIdx < pStatus->runnerDeploys) {
      pStatus->runners[deployIdx] = taosArrayInit(pStatus->runnerNum, sizeof(SStmTaskStatus));
      TSDB_CHECK_NULL(pStatus->runners[deployIdx], code, lino, _exit, terrno);
      ++deployIdx;
    }
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2079

2080
// Deploys the tasks of a stream.
//
// When pStatus is NULL a new status is initialized (without task lists), stored in
// mStreamMgmt.streamMap under the stream id, and the stored copy is used from then on.
// The stream tasks are then built with msmBuildStreamTasks.
//
// On failure the stream is stopped through msmStopStreamByError if a status is available.
//
// Returns TSDB_CODE_SUCCESS, the first error reported by a helper, or
// TSDB_CODE_MND_STREAM_INTERNAL_ERROR when the freshly stored status cannot be read back.
static int32_t msmDeployStreamTasks(SStmGrpCtx* pCtx, SStreamObj* pStream, SStmStatus* pStatus) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;
  SStmStatus info = {0};

  if (NULL == pStatus) {
    // NOTE(review): if one of the next two calls fails, whatever msmInitStmStatus already
    // stored in `info` (streamName, pCreate) is not released here - confirm whether a
    // cleanup is needed.
    TAOS_CHECK_EXIT(msmInitStmStatus(pCtx, &info, pStream, false));

    TAOS_CHECK_EXIT(taosHashPut(mStreamMgmt.streamMap, &streamId, sizeof(streamId), &info, sizeof(info)));

    // Continue with the copy stored in the map; never hand a failed lookup to the builders.
    pStatus = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
    TSDB_CHECK_NULL(pStatus, code, lino, _exit, TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  TAOS_CHECK_EXIT(msmBuildStreamTasks(pCtx, pStatus, pStream));

  mstLogSStmStatus("stream deployed", streamId, pStatus);

_exit:

  if (code) {
    if (NULL != pStatus) {
      msmStopStreamByError(streamId, pStatus, code, pCtx->currTs);
      mstsError("stream build error:%s, will try to stop current stream", tstrerror(code));
    }

    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2111

2112

2113
static int32_t msmSTRemoveStream(int64_t streamId, bool fromStreamMap) {
36✔
2114
  int32_t code = TSDB_CODE_SUCCESS;
36✔
2115
  void* pIter = NULL;
36✔
2116

2117
  while ((pIter = taosHashIterate(mStreamMgmt.toDeployVgMap, pIter))) {
66✔
2118
    SStmVgTasksToDeploy* pVg = (SStmVgTasksToDeploy*)pIter;
30✔
2119
    (void)mstWaitLock(&pVg->lock, true);
30✔
2120

2121
    int32_t taskNum = taosArrayGetSize(pVg->taskList);
30✔
2122
    if (atomic_load_32(&pVg->deployed) == taskNum) {
30✔
2123
      taosRUnLockLatch(&pVg->lock);
1✔
2124
      continue;
1✔
2125
    }
2126

2127
    for (int32_t i = 0; i < taskNum; ++i) {
111✔
2128
      SStmTaskToDeployExt* pExt = taosArrayGet(pVg->taskList, i);
82✔
2129
      if (pExt->deployed || pExt->deploy.task.streamId != streamId) {
82✔
2130
        continue;
76✔
2131
      }
2132

2133
      mstDestroySStmTaskToDeployExt(pExt);
6✔
2134
      pExt->deployed = true;
6✔
2135
    }
2136
    
2137
    taosRUnLockLatch(&pVg->lock);
29✔
2138
  }
2139

2140
  while ((pIter = taosHashIterate(mStreamMgmt.toDeploySnodeMap, pIter))) {
69✔
2141
    SStmSnodeTasksDeploy* pSnode = (SStmSnodeTasksDeploy*)pIter;
33✔
2142
    (void)mstWaitLock(&pSnode->lock, true);
33✔
2143

2144
    int32_t taskNum = taosArrayGetSize(pSnode->triggerList);
33✔
2145
    if (atomic_load_32(&pSnode->triggerDeployed) != taskNum) {
33✔
2146
      for (int32_t i = 0; i < taskNum; ++i) {
62✔
2147
        SStmTaskToDeployExt* pExt = taosArrayGet(pSnode->triggerList, i);
38✔
2148
        if (pExt->deployed || pExt->deploy.task.streamId != streamId) {
38!
2149
          continue;
37✔
2150
        }
2151
        
2152
        mstDestroySStmTaskToDeployExt(pExt);
1✔
2153
        pExt->deployed = true;
1✔
2154
      }
2155
    }
2156

2157
    taskNum = taosArrayGetSize(pSnode->runnerList);
33✔
2158
    if (atomic_load_32(&pSnode->runnerDeployed) != taskNum) {
33!
2159
      for (int32_t i = 0; i < taskNum; ++i) {
140✔
2160
        SStmTaskToDeployExt* pExt = taosArrayGet(pSnode->runnerList, i);
107✔
2161
        if (pExt->deployed || pExt->deploy.task.streamId != streamId) {
107!
2162
          continue;
105✔
2163
        }
2164
        
2165
        mstDestroySStmTaskToDeployExt(pExt);
2✔
2166
        pExt->deployed = true;
2✔
2167
      }
2168
    }
2169

2170
    taosRUnLockLatch(&pSnode->lock);
33✔
2171
  }
2172

2173
  
2174
  while ((pIter = taosHashIterate(mStreamMgmt.snodeMap, pIter))) {
160✔
2175
    SStmSnodeStatus* pSnode = (SStmSnodeStatus*)pIter;
124✔
2176
    code = taosHashRemove(pSnode->streamTasks, &streamId, sizeof(streamId));
124✔
2177
    if (TSDB_CODE_SUCCESS == code) {
124✔
2178
      mstsDebug("stream removed from snodeMap %d, remainStreams:%d", *(int32_t*)taosHashGetKey(pIter, NULL), (int32_t)taosHashGetSize(pSnode->streamTasks));
54✔
2179
    }
2180
  }
2181

2182
  while ((pIter = taosHashIterate(mStreamMgmt.vgroupMap, pIter))) {
155✔
2183
    SStmVgroupStatus* pVg = (SStmVgroupStatus*)pIter;
119✔
2184
    code = taosHashRemove(pVg->streamTasks, &streamId, sizeof(streamId));
119✔
2185
    if (TSDB_CODE_SUCCESS == code) {
119✔
2186
      mstsDebug("stream removed from vgroupMap %d, remainStreams:%d", *(int32_t*)taosHashGetKey(pIter, NULL), (int32_t)taosHashGetSize(pVg->streamTasks));
49✔
2187
    }
2188
  }
2189

2190
  size_t keyLen = 0;
36✔
2191
  while ((pIter = taosHashIterate(mStreamMgmt.taskMap, pIter))) {
1,512✔
2192
    int64_t* pStreamId = taosHashGetKey(pIter, &keyLen);
1,476✔
2193
    if (*pStreamId == streamId) {
1,476✔
2194
      int64_t taskId = *(pStreamId + 1);
225✔
2195
      code = taosHashRemove(mStreamMgmt.taskMap, pStreamId, keyLen);
225✔
2196
      if (code) {
225!
2197
        mstsError("TASK:%" PRIx64 " remove from taskMap failed, error:%s", taskId, tstrerror(code));
×
2198
      } else {
2199
        mstsDebug("TASK:%" PRIx64 " removed from taskMap", taskId);
225✔
2200
      }
2201
    }
2202
  }
2203

2204
  if (fromStreamMap) {
36✔
2205
    code = taosHashRemove(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
9✔
2206
    if (code) {
9!
2207
      mstsError("stream remove from streamMap failed, error:%s", tstrerror(code));
×
2208
    } else {
2209
      mstsDebug("stream removed from streamMap, remains:%d", taosHashGetSize(mStreamMgmt.streamMap));
9✔
2210
    }
2211
  }
2212
  
2213
  return code;
36✔
2214
}
2215

2216
// Prepares an existing stream status for another deployment: the stream is removed from all
// maps except streamMap, the status is reset with mstResetSStmStatus, and deployTimes is
// incremented afterwards.
static void msmResetStreamForRedeploy(int64_t streamId, SStmStatus* pStatus) {
  mstsInfo("try to reset stream for redeploy, stopped:%d, current deployTimes:%" PRId64, atomic_load_8(&pStatus->stopped), pStatus->deployTimes);

  // The stream keeps its streamMap entry (pStatus lives there); the result is ignored.
  (void)msmSTRemoveStream(streamId, false);

  mstResetSStmStatus(pStatus);

  pStatus->deployTimes += 1;
}
27✔
2225

2226
/*
 * Handle a queued stream-level deploy action.
 *
 * When the stream is already tracked in streamMap it is first reset for
 * redeploy; a stream whose stop flag marks it as user-stopped is left alone
 * unless the action itself is a user action. The stream object is then
 * acquired from the mnode and, as long as it is neither user-stopped nor
 * being dropped, its tasks are deployed.
 *
 * @param pCtx     group context of the heartbeat being processed
 * @param pAction  deploy action (stream id, stream name, userAction flag)
 * @return TSDB_CODE_SUCCESS when the deploy was done or deliberately ignored,
 *         otherwise the error code of the failing step.
 */
static int32_t msmLaunchStreamDeployAction(SStmGrpCtx* pCtx, SStmStreamAction* pAction) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  int64_t     streamId = pAction->streamId;
  char*       streamName = pAction->streamName;
  SStreamObj* pStream = NULL;
  int8_t      stopped = 0;

  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (pStatus) {
    stopped = atomic_load_8(&pStatus->stopped);
    if (0 == stopped) {
      mstsDebug("stream %s will try to reset and redeploy it", pAction->streamName);
      msmResetStreamForRedeploy(streamId, pStatus);
    } else {
      if (MST_IS_USER_STOPPED(stopped) && !pAction->userAction) {
        mstsWarn("stream %s already stopped by user, stopped:%d, ignore deploy it", pAction->streamName, stopped);
        return code;
      }

      // Only the caller that wins the CAS (stop flag unchanged since the load) performs the reset.
      if (stopped == atomic_val_compare_exchange_8(&pStatus->stopped, stopped, 0)) {
        mstsDebug("stream %s will try to reset and redeploy it from stopped %d", pAction->streamName, stopped);
        msmResetStreamForRedeploy(streamId, pStatus);
      }
    }
  }

  code = mndAcquireStream(pCtx->pMnode, streamName, &pStream);
  if (TSDB_CODE_MND_STREAM_NOT_EXIST == code) {
    mstsWarn("stream %s no longer exists, ignore deploy", streamName);
    return TSDB_CODE_SUCCESS;
  }

  TAOS_CHECK_EXIT(code);

  // A tracked status whose id differs from the stored stream's id belongs to a stream that was dropped.
  if (pStatus && pStream->pCreate->streamId != streamId) {
    mstsWarn("stream %s already dropped by user, ignore deploy it", pAction->streamName);
    atomic_store_8(&pStatus->stopped, 2);
    mstsInfo("set stream %s stopped by user since streamId mismatch", streamName);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_NOT_EXIST);
  }

  int8_t userStopped = atomic_load_8(&pStream->userStopped);
  int8_t userDropped = atomic_load_8(&pStream->userDropped);
  if (userStopped || userDropped) {
    mstsWarn("stream %s is stopped %d or removing %d, ignore deploy", streamName, userStopped, userDropped);
    goto _exit;
  }

  TAOS_CHECK_EXIT(msmDeployStreamTasks(pCtx, pStream, pStatus));

_exit:

  // pStream is still NULL when the acquire itself failed; only release a stream that is really held,
  // matching the guard used by msmLaunchTaskDeployAction().
  if (pStream) {
    mndReleaseStream(pCtx->pMnode, pStream);
  }

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2287

2288
/*
 * Queue one reader task for redeployment on its vgroup.
 *
 * The task has to be present in taskMap already: its seriousId is taken from
 * the stored status while all other identity fields are copied from the
 * action. Readers that are not trigger readers additionally need the calc
 * scan plan stored at index id.taskIdx of the stream's calcScanPlanList.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_STREAM_INTERNAL_ERROR when the task or
 *         its scan plan cannot be found, or the error of a failing helper.
 */
static int32_t msmReLaunchReaderTask(SStreamObj* pStream, SStmTaskAction* pAction, SStmStatus* pStatus) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pAction->streamId;

  // The key is read as streamId-size + taskId-size bytes starting at pAction->streamId.
  // NOTE(review): this presumes id.taskId directly follows streamId inside SStmTaskAction - confirm.
  size_t           taskKeyLen = sizeof(pAction->streamId) + sizeof(pAction->id.taskId);
  SStmTaskStatus** ppTask = taosHashGet(mStreamMgmt.taskMap, &pAction->streamId, taskKeyLen);
  if (NULL == ppTask) {
    mstsError("TASK:%" PRId64 " not in taskMap, remain:%d", pAction->id.taskId, taosHashGetSize(mStreamMgmt.taskMap));
    TAOS_CHECK_EXIT(TSDB_CODE_STREAM_INTERNAL_ERROR);
  }

  SStmTaskDeploy info = {0};
  SStreamTask*   pNew = &info.task;
  pNew->type = pAction->type;
  pNew->streamId = pAction->streamId;
  pNew->taskId = pAction->id.taskId;
  pNew->seriousId = (*ppTask)->id.seriousId;
  pNew->nodeId = pAction->id.nodeId;
  pNew->taskIdx = pAction->id.taskIdx;

  SStreamCalcScan* scanPlan = NULL;
  bool             isTriggerReader = STREAM_IS_TRIGGER_READER(pAction->flag);
  if (!isTriggerReader) {
    scanPlan = taosArrayGet(pStatus->pCreate->calcScanPlanList, pAction->id.taskIdx);
  }

  if (!isTriggerReader && NULL == scanPlan) {
    mstsError("fail to get TASK:%" PRId64 " scanPlan, taskIdx:%d, scanPlanNum:%zu", 
        pAction->id.taskId, pAction->id.taskIdx, taosArrayGetSize(pStatus->pCreate->calcScanPlanList));
    TAOS_CHECK_EXIT(TSDB_CODE_STREAM_INTERNAL_ERROR);
  }

  // Trigger readers are built without a scan plan.
  TAOS_CHECK_EXIT(msmBuildReaderDeployInfo(&info, scanPlan ? scanPlan->scanPlan : NULL, pStatus, isTriggerReader));
  TAOS_CHECK_EXIT(msmTDAddToVgroupMap(mStreamMgmt.toDeployVgMap, &info, pAction->streamId));

_exit:

  if (TSDB_CODE_SUCCESS != code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2328

2329
/*
2330
static int32_t msmReLaunchTriggerTask(SStmGrpCtx* pCtx, SStreamObj* pStream, SStmTaskAction* pAction, SStmStatus* pStatus) {
2331
  int32_t code = TSDB_CODE_SUCCESS;
2332
  int32_t lino = 0;
2333
  int64_t streamId = pAction->streamId;
2334
  SStmTaskStatus** ppTask = taosHashGet(mStreamMgmt.taskMap, &pAction->streamId, sizeof(pAction->streamId) + sizeof(pAction->id.taskId));
2335
  if (NULL == ppTask) {
2336
    mstsError("TASK:%" PRId64 " not in taskMap, remain:%d", pAction->id.taskId, taosHashGetSize(mStreamMgmt.taskMap));
2337
    TAOS_CHECK_EXIT(TSDB_CODE_STREAM_INTERNAL_ERROR);
2338
  }
2339
  
2340
  (*ppTask)->id.nodeId = msmAssignTaskSnodeId(pCtx->pMnode, pStream, true);
2341
  if (!GOT_SNODE((*ppTask)->id.nodeId)) {
2342
    mstsError("no avaible snode for deploying trigger task, seriousId: %" PRId64, (*ppTask)->id.seriousId);
2343
    return TSDB_CODE_SUCCESS;
2344
  }
2345
  
2346
  SStmTaskDeploy info = {0};
2347
  info.task.type = pAction->type;
2348
  info.task.streamId = streamId;
2349
  info.task.taskId = pAction->id.taskId;
2350
  info.task.seriousId = (*ppTask)->id.seriousId;
2351
  info.task.nodeId = (*ppTask)->id.nodeId;
2352
  info.task.taskIdx = pAction->id.taskIdx;
2353
  
2354
  TAOS_CHECK_EXIT(msmBuildTriggerDeployInfo(pCtx->pMnode, pStatus, &info, pStream));
2355
  TAOS_CHECK_EXIT(msmTDAddTriggerToSnodeMap(&info, pStream));
2356
  TAOS_CHECK_EXIT(msmSTAddToSnodeMap(pCtx, streamId, NULL, *ppTask, 1, -1));
2357
  
2358
  atomic_add_fetch_32(&mStreamMgmt.toDeploySnodeTaskNum, 1);
2359

2360
_exit:
2361

2362
  if (code) {
2363
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
2364
  }
2365

2366
  return code;
2367
}
2368
*/
2369

2370
/*
 * Rebuild the runner tasks of a stream for redeployment.
 *
 * The existing trigger task of the stream is reused: its task id and node id
 * are copied into the group context (moving the trigger task to another snode
 * is currently disabled). Reader tasks are prepared first, then the calc plan
 * string is parsed into a query plan and handed to msmReBuildRunnerTasks().
 * On success the pending scan-update bookkeeping is cleared.
 */
static int32_t msmReLaunchRunnerDeploy(SStmGrpCtx* pCtx, SStreamObj* pStream, SStmTaskAction* pAction, SStmStatus* pStatus) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  int64_t     streamId = pAction->streamId;
  SQueryPlan* pPlan = NULL;

  pCtx->triggerTaskId = pStatus->triggerTask->id.taskId;
  pCtx->triggerNodeId = pStatus->triggerTask->id.nodeId;

  TAOS_CHECK_EXIT(msmUPPrepareReaderTasks(pCtx, pStatus, pStream));
  TAOS_CHECK_EXIT(nodesStringToNode(pStream->pCreate->calcPlan, (SNode**)&pPlan));
  TAOS_CHECK_EXIT(msmReBuildRunnerTasks(pCtx, pPlan, pStatus, pStream, pAction));

  // Reached only when every step above succeeded.
  taosHashClear(mStreamMgmt.toUpdateScanMap);
  mStreamMgmt.toUpdateScanNum = 0;

_exit:

  if (TSDB_CODE_SUCCESS != code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2427

2428

2429
/*
 * Handle a queued task-level deploy action.
 *
 * The action is silently ignored (TSDB_CODE_SUCCESS) when the stream is no
 * longer tracked, is already stopped, or no longer exists in the mnode.
 * Otherwise reader tasks are relaunched individually and runner tasks through
 * a full runner redeploy (only supported for multi-runner actions). Any
 * failure after the stream was found stops the stream via
 * msmStopStreamByError() before the error is returned.
 */
static int32_t msmLaunchTaskDeployAction(SStmGrpCtx* pCtx, SStmTaskAction* pAction) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  int64_t     streamId = pAction->streamId;
  int64_t     taskId = pAction->id.taskId;
  SStreamObj* pStream = NULL;

  mstsDebug("start to handle stream tasks action, action task type:%s", gStreamTaskTypeStr[pAction->type]);

  SStmStatus* pStatus = taosHashGet(mStreamMgmt.streamMap, &pAction->streamId, sizeof(pAction->streamId));
  if (NULL == pStatus) {
    mstsWarn("stream not in streamMap, remain:%d", taosHashGetSize(mStreamMgmt.streamMap));
    return TSDB_CODE_SUCCESS;
  }

  int8_t stopped = atomic_load_8(&pStatus->stopped);
  if (0 != stopped) {
    mstsWarn("stream %s is already stopped %d, ignore task deploy", pStatus->streamName, stopped);
    return TSDB_CODE_SUCCESS;
  }

  code = mndAcquireStream(pCtx->pMnode, pStatus->streamName, &pStream);
  if (TSDB_CODE_MND_STREAM_NOT_EXIST == code) {
    mstsWarn("stream %s no longer exists, ignore task deploy", pStatus->streamName);
    return TSDB_CODE_SUCCESS;
  }

  TAOS_CHECK_EXIT(code);

  int8_t userStopped = atomic_load_8(&pStream->userStopped);
  int8_t userDropped = atomic_load_8(&pStream->userDropped);
  if (userStopped || userDropped) {
    mstsWarn("stream %s is stopped %d or removing %d, ignore task deploy", pStatus->streamName, userStopped, userDropped);
    goto _exit;
  }

  // Trigger tasks are not relaunched individually; they end up in the invalid-type branch.
  if (STREAM_READER_TASK == pAction->type) {
    TAOS_CHECK_EXIT(msmReLaunchReaderTask(pStream, pAction, pStatus));
  } else if (STREAM_RUNNER_TASK == pAction->type) {
    if (pAction->multiRunner) {
      TAOS_CHECK_EXIT(msmReLaunchRunnerDeploy(pCtx, pStream, pAction, pStatus));
    } else {
      mstsError("runner TASK:%" PRId64 " requires relaunch", pAction->id.taskId);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
    }
  } else {
    mstsError("TASK:%" PRId64 " invalid task type:%d", pAction->id.taskId, pAction->type);
    TAOS_CHECK_EXIT(TSDB_CODE_STREAM_INTERNAL_ERROR);
  }

_exit:

  if (NULL != pStream) {
    mndReleaseStream(pCtx->pMnode, pStream);
  }

  if (TSDB_CODE_SUCCESS != code) {
    msmStopStreamByError(streamId, pStatus, code, pCtx->currTs);
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2501

2502
static int32_t msmTDRemoveStream(int64_t streamId) {
×
2503
  void* pIter = NULL;
×
2504
  
2505
  if (atomic_load_32(&mStreamMgmt.toDeployVgTaskNum) > 0) {
×
2506
    while ((pIter = taosHashIterate(mStreamMgmt.toDeployVgMap, pIter))) {
×
2507
      SStmVgTasksToDeploy* pVg = (SStmVgTasksToDeploy*)pIter;
×
2508
      int32_t taskNum = taosArrayGetSize(pVg->taskList);
×
2509
      if (atomic_load_32(&pVg->deployed) == taskNum) {
×
2510
        continue;
×
2511
      }
2512
      
2513
      for (int32_t i = 0; i < taskNum; ++i) {
×
2514
        SStmTaskToDeployExt* pExt = taosArrayGet(pVg->taskList, i);
×
2515
        if (pExt->deploy.task.streamId == streamId && !pExt->deployed) {
×
2516
          pExt->deployed = true;
×
2517
        }
2518
      }
2519
    }
2520
  }
2521

2522
  if (atomic_load_32(&mStreamMgmt.toDeploySnodeTaskNum) > 0) {
×
2523
    while ((pIter = taosHashIterate(mStreamMgmt.toDeploySnodeMap, pIter))) {
×
2524
      SStmSnodeTasksDeploy* pSnode = (SStmSnodeTasksDeploy*)pIter;
×
2525
      int32_t taskNum = taosArrayGetSize(pSnode->triggerList);
×
2526
      if (atomic_load_32(&pSnode->triggerDeployed) != taskNum) {
×
2527
        for (int32_t i = 0; i < taskNum; ++i) {
×
2528
          SStmTaskToDeployExt* pExt = taosArrayGet(pSnode->triggerList, i);
×
2529
          if (pExt->deploy.task.streamId == streamId && !pExt->deployed) {
×
2530
            pExt->deployed = true;
×
2531
          }
2532
        }
2533
      }
2534

2535
      taskNum = taosArrayGetSize(pSnode->runnerList);
×
2536
      if (atomic_load_32(&pSnode->runnerDeployed) != taskNum) {
×
2537
        for (int32_t i = 0; i < taskNum; ++i) {
×
2538
          SStmTaskToDeployExt* pExt = taosArrayGet(pSnode->runnerList, i);
×
2539
          if (pExt->deploy.task.streamId == streamId && !pExt->deployed) {
×
2540
            pExt->deployed = true;
×
2541
          }
2542
        }
2543
      }
2544
    }
2545
  }
2546

2547
  return TSDB_CODE_SUCCESS;
×
2548
}
2549

2550
/*
 * Remove a stream from all runtime maps, streamMap included.
 *
 * Thin wrapper around msmSTRemoveStream(streamId, true) that logs the size of
 * streamMap before and, on success, after the removal.
 */
static int32_t msmRemoveStreamFromMaps(SMnode* pMnode, int64_t streamId) {
  int32_t lino = 0;
  int32_t code = TSDB_CODE_SUCCESS;

  mstsInfo("start to remove stream from maps, current stream num:%d", taosHashGetSize(mStreamMgmt.streamMap));

  TAOS_CHECK_EXIT(msmSTRemoveStream(streamId, true));

_exit:

  if (TSDB_CODE_SUCCESS == code) {
    mstsInfo("end remove stream from maps, current stream num:%d", taosHashGetSize(mStreamMgmt.streamMap));
    return code;
  }

  mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  return code;
}
2568

2569
/*
 * Mark a tracked stream as stopped on user request.
 *
 * Does nothing (besides logging) when stream management is inactive or not in
 * the NORMAL state, or when the stream is not present in streamMap. Otherwise
 * the stream's stop flag is set to 2, the value logged as "stopped by user".
 */
void msmUndeployStream(SMnode* pMnode, int64_t streamId, char* streamName) {
  int8_t active = atomic_load_8(&mStreamMgmt.active);
  int8_t state = atomic_load_8(&mStreamMgmt.state);
  if (0 == active || MND_STM_STATE_NORMAL != state) {
    mstsError("stream mgmt not available since active:%d state:%d", active, state);
    return;
  }

  SStmStatus* pStream = (SStmStatus*)taosHashAcquire(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL != pStream) {
    atomic_store_8(&pStream->stopped, 2);
    mstsInfo("set stream %s stopped by user", streamName);
  } else {
    mstsInfo("stream %s already not in streamMap", streamName);
  }

  // Called for the not-found case as well (with a NULL entry).
  taosHashRelease(mStreamMgmt.streamMap, pStream);
}
2599

2600
/*
 * Register a new recalculation time range for a running stream.
 *
 * @return TSDB_CODE_MND_STREAM_NOT_AVAILABLE when stream management is
 *         inactive or not in the NORMAL state, TSDB_CODE_MND_STREAM_NOT_RUNNING
 *         when the stream is not tracked or not running, otherwise the result
 *         of mstAppendNewRecalcRange().
 */
int32_t msmRecalcStream(SMnode* pMnode, int64_t streamId, STimeWindow* timeRange) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  int8_t active = atomic_load_8(&mStreamMgmt.active);
  int8_t state = atomic_load_8(&mStreamMgmt.state);
  if (0 == active || MND_STM_STATE_NORMAL != state) {
    mstsError("stream mgmt not available since active:%d state:%d", active, state);
    return TSDB_CODE_MND_STREAM_NOT_AVAILABLE;
  }

  SStmStatus* pStream = (SStmStatus*)taosHashAcquire(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL != pStream && STREAM_IS_RUNNING(pStream)) {
    TAOS_CHECK_EXIT(mstAppendNewRecalcRange(streamId, pStream, timeRange));
  } else {
    code = TSDB_CODE_MND_STREAM_NOT_RUNNING;
    mstsInfo("stream still not in streamMap, streamRemains:%d", taosHashGetSize(mStreamMgmt.streamMap));
  }

_exit:

  // Runs on every path past the availability check, including the not-found case (NULL entry).
  taosHashRelease(mStreamMgmt.streamMap, pStream);

  if (TSDB_CODE_SUCCESS != code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2629

2630
/*
 * Drain the stream action queue.
 *
 * Every dequeued DEPLOY node is dispatched either as a stream-level or as a
 * task-level deploy; nodes of any other type are skipped. Processing stops at
 * the first action that fails, and the failure is logged.
 */
static void msmHandleStreamActions(SStmGrpCtx* pCtx) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  SStmQNode* pQNode = NULL;

  while (mndStreamActionDequeue(mStreamMgmt.actionQ, &pQNode)) {
    if (STREAM_ACT_DEPLOY != pQNode->type) {
      continue;
    }

    if (!pQNode->streamAct) {
      mstDebug("start to handle task deploy action");
      TAOS_CHECK_EXIT(msmLaunchTaskDeployAction(pCtx, &pQNode->action.task));
    } else {
      mstDebug("start to handle stream deploy action");
      TAOS_CHECK_EXIT(msmLaunchStreamDeployAction(pCtx, &pQNode->action.stream));
    }
  }

_exit:

  if (TSDB_CODE_SUCCESS != code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }
}
82✔
2657

2658
void msmStopAllStreamsByGrant(int32_t errCode) {
×
2659
  SStmStatus* pStatus = NULL;
×
2660
  void* pIter = NULL;
×
2661
  int64_t streamId = 0;
×
2662
  
2663
  while (true) {
2664
    pIter = taosHashIterate(mStreamMgmt.streamMap, pIter);
×
2665
    if (NULL == pIter) {
×
2666
      break;
×
2667
    }
2668

2669
    pStatus = (SStmStatus*)pIter;
×
2670

2671
    streamId = *(int64_t*)taosHashGetKey(pIter, NULL);
×
2672
    atomic_store_8(&pStatus->stopped, 4);
×
2673

2674
    mstsInfo("set stream stopped since %s", tstrerror(errCode));
×
2675
  }
2676
}
×
2677

2678
/*
 * React to an expired stream grant: while stream management is active, stop
 * all tracked streams under the runtime lock.
 *
 * @return errCode, unchanged, in every case
 */
int32_t msmHandleGrantExpired(SMnode *pMnode, int32_t errCode) {
  mstInfo("stream grant expired");

  if (0 != atomic_load_8(&mStreamMgmt.active)) {
    // Taken via mstWaitLock(..., true) and released with the read-unlock call.
    (void)mstWaitLock(&mStreamMgmt.runtimeLock, true);
    msmStopAllStreamsByGrant(errCode);
    taosRUnLockLatch(&mStreamMgmt.runtimeLock);
  } else {
    mstWarn("mnode stream is NOT active, ignore handling");
  }

  return errCode;
}
2694

2695
/*
 * Add one task deploy record to a per-stream deploy bundle.
 *
 * Reader and runner records are appended (by value) to the matching array,
 * which is created with an initial capacity of 20 on first use. A trigger
 * record is copied into a freshly allocated SStmTaskDeploy. The bundle's
 * streamId is set whenever an array is created or a trigger task is stored.
 * Records of any other task type are ignored.
 *
 * @return TSDB_CODE_SUCCESS, or terrno when an allocation or push fails
 */
static int32_t msmInitStreamDeploy(SStmStreamDeploy* pStream, SStmTaskDeploy* pDeploy) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  int64_t  streamId = pDeploy->task.streamId;
  SArray** ppList = NULL;

  if (STREAM_TRIGGER_TASK == pDeploy->task.type) {
    pStream->streamId = streamId;
    pStream->triggerTask = taosMemoryMalloc(sizeof(SStmTaskDeploy));
    TSDB_CHECK_NULL(pStream->triggerTask, code, lino, _exit, terrno);
    memcpy(pStream->triggerTask, pDeploy, sizeof(SStmTaskDeploy));
    goto _exit;
  }

  if (STREAM_READER_TASK == pDeploy->task.type) {
    ppList = &pStream->readerTasks;
  } else if (STREAM_RUNNER_TASK == pDeploy->task.type) {
    ppList = &pStream->runnerTasks;
  } else {
    goto _exit;
  }

  if (NULL == *ppList) {
    pStream->streamId = streamId;
    *ppList = taosArrayInit(20, sizeof(SStmTaskDeploy));
    TSDB_CHECK_NULL(*ppList, code, lino, _exit, terrno);
  }

  TSDB_CHECK_NULL(taosArrayPush(*ppList, pDeploy), code, lino, _exit, terrno);

_exit:

  if (TSDB_CODE_SUCCESS != code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2736

2737
/*
 * Add one task deploy record to the group's per-stream deploy map.
 *
 * If the stream already has a bundle in pHash the record is appended to it.
 * Otherwise a bundle is built locally and inserted; when the insert reports
 * TSDB_CODE_DUP_KEY the local bundle is discarded and the lookup is retried.
 *
 * The locally built bundle is owned by this function until taosHashPut()
 * succeeds, so it is freed with tFreeSStmStreamDeploy() on every failure path
 * (the same call the duplicate-key path uses) and re-zeroed afterwards so a
 * retry never sees stale pointers.
 *
 * @param pHash    stream id -> SStmStreamDeploy map of the current group
 * @param pDeploy  record to add
 * @return TSDB_CODE_SUCCESS or the error of the failing step
 */
static int32_t msmGrpAddDeployTask(SHashObj* pHash, SStmTaskDeploy* pDeploy) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pDeploy->task.streamId;
  SStreamTask* pTask = &pDeploy->task;
  SStmStreamDeploy streamDeploy = {0};
  SStmStreamDeploy* pStream = NULL;

  while (true) {
    pStream = taosHashAcquire(pHash, &streamId, sizeof(streamId));
    if (NULL != pStream) {
      TAOS_CHECK_EXIT(msmInitStreamDeploy(pStream, pDeploy));
      break;
    }

    code = msmInitStreamDeploy(&streamDeploy, pDeploy);
    if (TSDB_CODE_SUCCESS != code) {
      lino = __LINE__;
      // A partially initialised bundle may already own an array.
      tFreeSStmStreamDeploy(&streamDeploy);
      streamDeploy = (SStmStreamDeploy){0};
      goto _exit;
    }

    code = taosHashPut(pHash, &streamId, sizeof(streamId), &streamDeploy, sizeof(streamDeploy));
    if (TSDB_CODE_SUCCESS == code) {
      // The map now holds the copy that owns the bundle's allocations.
      goto _exit;
    }

    tFreeSStmStreamDeploy(&streamDeploy);
    streamDeploy = (SStmStreamDeploy){0};

    if (TSDB_CODE_DUP_KEY != code) {
      lino = __LINE__;
      goto _exit;
    }

    // Duplicate key: loop and append to the entry that is already in the map.
  }

_exit:

  taosHashRelease(pHash, pStream);

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  } else {
    msttDebug("task added to GRP deployMap, taskIdx:%d", pTask->taskIdx);
  }

  return code;
}
2779

2780

2781
/*
 * Move every not-yet-deployed entry of pTasks into the group's deploy map.
 *
 * Each entry that is added is flagged as deployed and *deployed is atomically
 * incremented by one. The walk stops at the first entry that fails to be added.
 */
int32_t msmGrpAddDeployTasks(SHashObj* pHash, SArray* pTasks, int32_t* deployed) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t total = taosArrayGetSize(pTasks);

  for (int32_t idx = 0; idx < total; ++idx) {
    SStmTaskToDeployExt* pExt = taosArrayGet(pTasks, idx);
    if (!pExt->deployed) {
      TAOS_CHECK_EXIT(msmGrpAddDeployTask(pHash, &pExt->deploy));
      pExt->deployed = true;

      (void)atomic_add_fetch_32(deployed, 1);
    }
  }

_exit:

  if (TSDB_CODE_SUCCESS != code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2806

2807
/*
 * Collect pending vgroup task deploys for the vgroup leaders reported in the
 * current heartbeat request.
 *
 * For each reported vgroup the up timestamp is refreshed, then its entry in
 * toDeployVgMap (if any) is acquired and read-locked with a try-lock; a busy
 * entry is skipped for this round. Entries whose deployed counter differs
 * from the list size have their pending tasks added to pCtx->deployStm.
 *
 * Every entry obtained through taosHashAcquire() is unlocked and handed back
 * with taosHashRelease() on all paths, as msmCleanDeployedVgTasks() does, so
 * the reference taken by the acquire is not leaked.
 *
 * @return TSDB_CODE_SUCCESS or the first error from msmGrpAddDeployTasks()
 */
int32_t msmGrpAddDeployVgTasks(SStmGrpCtx* pCtx) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t vgNum = taosArrayGetSize(pCtx->pReq->pVgLeaders);

  mstDebug("start to add stream vgroup tasks deploy");

  for (int32_t i = 0; i < vgNum; ++i) {
    int32_t* vgId = taosArrayGet(pCtx->pReq->pVgLeaders, i);

    msmUpdateVgroupUpTs(pCtx, *vgId);

    SStmVgTasksToDeploy* pVg = taosHashAcquire(mStreamMgmt.toDeployVgMap, vgId, sizeof(*vgId));
    if (NULL == pVg) {
      continue;
    }

    // A non-zero result means the latch could not be taken; retry on a later heartbeat.
    if (taosRTryLockLatch(&pVg->lock)) {
      taosHashRelease(mStreamMgmt.toDeployVgMap, pVg);
      continue;
    }

    if (atomic_load_32(&pVg->deployed) != taosArrayGetSize(pVg->taskList)) {
      code = msmGrpAddDeployTasks(pCtx->deployStm, pVg->taskList, &pVg->deployed);
    }

    // Unlock before releasing: the latch lives inside the acquired entry.
    taosRUnLockLatch(&pVg->lock);
    taosHashRelease(mStreamMgmt.toDeployVgMap, pVg);

    if (code) {
      lino = __LINE__;
      goto _exit;
    }
  }

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2851

2852
/*
 * Collect pending trigger and runner task deploys for the snode that sent the
 * current heartbeat request.
 *
 * Returns immediately when the snode has no entry in toDeploySnodeMap.
 * Otherwise the entry is locked, the trigger list and then the runner list
 * are moved into pCtx->deployStm as far as they still hold undeployed tasks,
 * and the entry is unlocked again.
 *
 * The entry obtained through taosHashAcquire() is handed back with
 * taosHashRelease() on both the success and the error path, as
 * msmCleanDeployedSnodeTasks() does, so the acquired reference is not leaked.
 *
 * @return TSDB_CODE_SUCCESS or the first error from msmGrpAddDeployTasks()
 */
int32_t msmGrpAddDeploySnodeTasks(SStmGrpCtx* pCtx) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SStmSnodeTasksDeploy* pSnode = NULL;
  SStreamHbMsg* pReq = pCtx->pReq;

  mstDebug("start to add stream snode tasks deploy");

  pSnode = taosHashAcquire(mStreamMgmt.toDeploySnodeMap, &pReq->snodeId, sizeof(pReq->snodeId));
  if (NULL == pSnode) {
    return TSDB_CODE_SUCCESS;
  }

  // Paired with the write-unlock below.
  (void)mstWaitLock(&pSnode->lock, false);

  if (atomic_load_32(&pSnode->triggerDeployed) < taosArrayGetSize(pSnode->triggerList)) {
    TAOS_CHECK_EXIT(msmGrpAddDeployTasks(pCtx->deployStm, pSnode->triggerList, &pSnode->triggerDeployed));
  }

  if (atomic_load_32(&pSnode->runnerDeployed) < taosArrayGetSize(pSnode->runnerList)) {
    TAOS_CHECK_EXIT(msmGrpAddDeployTasks(pCtx->deployStm, pSnode->runnerList, &pSnode->runnerDeployed));
  }

_exit:

  // pSnode is always non-NULL and locked here; unlock first because the latch lives inside the entry.
  taosWUnLockLatch(&pSnode->lock);
  taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2889

2890
int32_t msmUpdateStreamLastActTs(int64_t streamId, int64_t currTs) {
874✔
2891
  int32_t code = TSDB_CODE_SUCCESS;
874✔
2892
  int32_t lino = 0;
874✔
2893
  SStmStatus* pStatus = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
874✔
2894
  if (NULL == pStatus) {
874!
2895
    mstsWarn("stream already not exists in streamMap, mapSize:%d", taosHashGetSize(mStreamMgmt.streamMap));
×
2896
    return TSDB_CODE_SUCCESS;
×
2897
  }
2898
  
2899
  pStatus->lastActionTs = currTs;
874✔
2900

2901
_exit:
874✔
2902

2903
  if (code) {
874!
2904
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
2905
  }
2906

2907
  return code;
874✔
2908
}
2909

2910
/*
 * Copy all per-stream deploy bundles of the group into the heartbeat response.
 *
 * Each bundle in pCtx->deployStm is pushed (by value) onto the response's
 * deploy.streamList, the map entry is then cleared with
 * mstClearSStmStreamDeploy(), and the stream's last action timestamp is
 * refreshed. An iteration that is left early is cancelled before returning.
 */
int32_t msmRspAddStreamsDeploy(SStmGrpCtx* pCtx) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  void*   pIter = NULL;
  int32_t streamNum = taosHashGetSize(pCtx->deployStm);

  mstDebug("start to add group %d deploy streams, streamNum:%d", pCtx->pReq->streamGId, taosHashGetSize(pCtx->deployStm));

  pCtx->pRsp->deploy.streamList = taosArrayInit(streamNum, sizeof(SStmStreamDeploy));
  TSDB_CHECK_NULL(pCtx->pRsp->deploy.streamList, code, lino, _exit, terrno);

  while (NULL != (pIter = taosHashIterate(pCtx->deployStm, pIter))) {
    SStmStreamDeploy* pDeploy = (SStmStreamDeploy*)pIter;
    TSDB_CHECK_NULL(taosArrayPush(pCtx->pRsp->deploy.streamList, pDeploy), code, lino, _exit, terrno);

    int64_t streamId = pDeploy->streamId;
    mstsDebug("stream DEPLOY added to dnode %d hb rsp, readerTasks:%zu, triggerTask:%d, runnerTasks:%zu", 
        pCtx->pReq->dnodeId, taosArrayGetSize(pDeploy->readerTasks), pDeploy->triggerTask ? 1 : 0, taosArrayGetSize(pDeploy->runnerTasks));

    mstClearSStmStreamDeploy(pDeploy);

    TAOS_CHECK_EXIT(msmUpdateStreamLastActTs(pDeploy->streamId, pCtx->currTs));
  }

_exit:

  // pIter is only non-NULL here when the loop was left through an error.
  if (NULL != pIter) {
    taosHashCancelIterate(pCtx->deployStm, pIter);
  }

  if (TSDB_CODE_SUCCESS != code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2951

2952
/*
 * Drop already deployed tasks from the to-deploy vgroup map for the given
 * vgroup leaders.
 *
 * Entries that are missing, busy (write try-lock fails) or have nothing
 * deployed are skipped. If every task of a vgroup was deployed, the whole
 * task list is destroyed and the vgroup is removed from the map; otherwise
 * only the entries flagged as deployed are destroyed and removed, and the
 * vgroup's deployed counter is reset to 0. The global toDeployVgTaskNum
 * counter is decreased by the number of tasks dropped.
 */
void msmCleanDeployedVgTasks(SArray* pVgLeaders) {
  int32_t vgNum = taosArrayGetSize(pVgLeaders);

  for (int32_t i = 0; i < vgNum; ++i) {
    int32_t*             vgId = taosArrayGet(pVgLeaders, i);
    SStmVgTasksToDeploy* pVg = taosHashAcquire(mStreamMgmt.toDeployVgMap, vgId, sizeof(*vgId));
    if (NULL == pVg) {
      continue;
    }

    if (taosWTryLockLatch(&pVg->lock)) {
      taosHashRelease(mStreamMgmt.toDeployVgMap, pVg);
      continue;
    }

    if (atomic_load_32(&pVg->deployed) > 0) {
      int32_t taskNum = taosArrayGetSize(pVg->taskList);

      if (atomic_load_32(&pVg->deployed) == taskNum) {
        // Everything was deployed: drop the list and the map entry as a whole.
        (void)atomic_sub_fetch_32(&mStreamMgmt.toDeployVgTaskNum, taskNum);
        taosArrayDestroyEx(pVg->taskList, mstDestroySStmTaskToDeployExt);
        pVg->taskList = NULL;
        TAOS_UNUSED(taosHashRemove(mStreamMgmt.toDeployVgMap, vgId, sizeof(*vgId)));
      } else {
        // Partially deployed: walk backwards so removals do not shift unvisited slots.
        for (int32_t m = taskNum - 1; m >= 0; --m) {
          SStmTaskToDeployExt* pExt = taosArrayGet(pVg->taskList, m);
          if (pExt->deployed) {
            mstDestroySStmTaskToDeployExt(pExt);

            taosArrayRemove(pVg->taskList, m);
            (void)atomic_sub_fetch_32(&mStreamMgmt.toDeployVgTaskNum, 1);
          }
        }

        atomic_store_32(&pVg->deployed, 0);
      }
    }

    taosWUnLockLatch(&pVg->lock);
    taosHashRelease(mStreamMgmt.toDeployVgMap, pVg);
  }
}
161✔
3013

3014
void msmCleanDeployedSnodeTasks (int32_t snodeId) {
166✔
3015
  if (!GOT_SNODE(snodeId)) {
166✔
3016
    return;
21✔
3017
  }
3018
  
3019
  int32_t code = TSDB_CODE_SUCCESS;
145✔
3020
  SStmSnodeTasksDeploy* pSnode = taosHashAcquire(mStreamMgmt.toDeploySnodeMap, &snodeId, sizeof(snodeId));
145✔
3021
  if (NULL == pSnode) {
145✔
3022
    return;
53✔
3023
  }
3024

3025
  if (taosWTryLockLatch(&pSnode->lock)) {
92!
3026
    taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);
×
3027
    return;
×
3028
  }
3029

3030
  int32_t triggerNum = taosArrayGetSize(pSnode->triggerList);
92✔
3031
  int32_t runnerNum = taosArrayGetSize(pSnode->runnerList);
92✔
3032
  
3033
  if (atomic_load_32(&pSnode->triggerDeployed) <= 0 && atomic_load_32(&pSnode->runnerDeployed) <= 0) {
92!
3034
    taosWUnLockLatch(&pSnode->lock);
×
3035
    taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);
×
3036
    return;
×
3037
  }
3038

3039
  if (atomic_load_32(&pSnode->triggerDeployed) == triggerNum) {
92!
3040
    (void)atomic_sub_fetch_32(&mStreamMgmt.toDeploySnodeTaskNum, triggerNum);
92✔
3041
    taosArrayDestroyEx(pSnode->triggerList, mstDestroySStmTaskToDeployExt);
92✔
3042
    pSnode->triggerList = NULL;
92✔
3043
  }
3044

3045
  if (atomic_load_32(&pSnode->runnerDeployed) == runnerNum) {
92!
3046
    (void)atomic_sub_fetch_32(&mStreamMgmt.toDeploySnodeTaskNum, runnerNum);
92✔
3047
    taosArrayDestroyEx(pSnode->runnerList, mstDestroySStmTaskToDeployExt);
92✔
3048
    pSnode->runnerList = NULL;
92✔
3049
  }
3050

3051
  if (NULL == pSnode->triggerList && NULL == pSnode->runnerList) {
92!
3052
    TAOS_UNUSED(taosHashRemove(mStreamMgmt.toDeploySnodeMap, &snodeId, sizeof(snodeId)));
92✔
3053
    taosWUnLockLatch(&pSnode->lock);
92✔
3054
    taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);
92✔
3055
    return;
92✔
3056
  }
3057

3058
  if (atomic_load_32(&pSnode->triggerDeployed) > 0 && pSnode->triggerList) {
×
3059
    for (int32_t m = triggerNum - 1; m >= 0; --m) {
×
3060
      SStmTaskToDeployExt* pExt = taosArrayGet(pSnode->triggerList, m);
×
3061
      if (!pExt->deployed) {
×
3062
        continue;
×
3063
      }
3064

3065
      mstDestroySStmTaskToDeployExt(pExt);
×
3066
      (void)atomic_sub_fetch_32(&mStreamMgmt.toDeploySnodeTaskNum, 1);
×
3067
      taosArrayRemove(pSnode->triggerList, m);
×
3068
    }
3069
    
3070
    pSnode->triggerDeployed = 0;
×
3071
  }
3072

3073
  if (atomic_load_32(&pSnode->runnerDeployed) > 0 && pSnode->runnerList) {
×
3074
    for (int32_t m = runnerNum - 1; m >= 0; --m) {
×
3075
      SStmTaskToDeployExt* pExt = taosArrayGet(pSnode->runnerList, m);
×
3076
      if (!pExt->deployed) {
×
3077
        continue;
×
3078
      }
3079

3080
      mstDestroySStmTaskToDeployExt(pExt);
×
3081
      (void)atomic_sub_fetch_32(&mStreamMgmt.toDeploySnodeTaskNum, 1);
×
3082
      taosArrayRemove(pSnode->runnerList, m);
×
3083
    }
3084
    
3085
    pSnode->runnerDeployed = 0;
×
3086
  }
3087
  
3088
  taosWUnLockLatch(&pSnode->lock);
×
3089
  taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);
×
3090
}
3091

3092
/*
 * Prune the pending-deploy bookkeeping for one heartbeat message.
 * Each cleanup pass is invoked only while its global "to deploy" counter is
 * still positive; the snode counter is sampled after the vgroup pass has run.
 */
void msmClearStreamToDeployMaps(SStreamHbMsg* pHb) {
  const bool vgTasksPending = atomic_load_32(&mStreamMgmt.toDeployVgTaskNum) > 0;
  if (vgTasksPending) {
    msmCleanDeployedVgTasks(pHb->pVgLeaders);
  }

  const bool snodeTasksPending = atomic_load_32(&mStreamMgmt.toDeploySnodeTaskNum) > 0;
  if (snodeTasksPending) {
    msmCleanDeployedSnodeTasks(pHb->snodeId);
  }
}
34,163✔
3101

3102
/*
 * Empty the action and deploy hashes of the stream group named in the
 * heartbeat. Does nothing when the thread-context table is not set.
 */
void msmCleanStreamGrpCtx(SStreamHbMsg* pHb) {
  int32_t tidx = streamGetThreadIdx(mStreamMgmt.threadNum, pHb->streamGId);

  if (NULL == mStreamMgmt.tCtx) {
    return;
  }

  taosHashClear(mStreamMgmt.tCtx[tidx].actionStm[pHb->streamGId]);
  taosHashClear(mStreamMgmt.tCtx[tidx].deployStm[pHb->streamGId]);
}
34,163✔
3109

3110
/*
 * Register a START action for streamId in pHash.
 * If the stream already has an action entry, the START bit is OR-ed in and the
 * trigger id overwritten; otherwise a new zeroed entry is inserted.
 * Returns TSDB_CODE_SUCCESS or the error reported by taosHashPut.
 */
int32_t msmGrpAddActionStart(SHashObj* pHash, int64_t streamId, SStmTaskId* pId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t startBit = STREAM_ACT_START;

  SStmAction* pExisting = taosHashGet(pHash, &streamId, sizeof(streamId));
  if (NULL == pExisting) {
    SStmAction fresh = {0};
    fresh.actions = startBit;
    fresh.start.triggerId = *pId;
    TAOS_CHECK_EXIT(taosHashPut(pHash, &streamId, sizeof(streamId), &fresh, sizeof(fresh)));
    mstsDebug("stream add START action, actions:%x", fresh.actions);
  } else {
    pExisting->actions |= startBit;
    pExisting->start.triggerId = *pId;
    mstsDebug("stream append START action, actions:%x", pExisting->actions);
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
3135

3136
/*
 * Register an UPDATE_TRIGGER action for streamId in pHash.
 * An existing entry gets the bit OR-ed in; otherwise a new zeroed entry
 * carrying only this bit is inserted.
 * Returns TSDB_CODE_SUCCESS or the error reported by taosHashPut.
 */
int32_t msmGrpAddActionUpdateTrigger(SHashObj* pHash, int64_t streamId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t updateBit = STREAM_ACT_UPDATE_TRIGGER;

  SStmAction* pExisting = taosHashGet(pHash, &streamId, sizeof(streamId));
  if (NULL == pExisting) {
    SStmAction fresh = {0};
    fresh.actions = updateBit;
    TAOS_CHECK_EXIT(taosHashPut(pHash, &streamId, sizeof(streamId), &fresh, sizeof(fresh)));
    mstsDebug("stream add UPDATE_TRIGGER action, actions:%x", fresh.actions);
  } else {
    pExisting->actions |= updateBit;
    mstsDebug("stream append UPDATE_TRIGGER action, actions:%x", pExisting->actions);
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
3160

3161

3162

3163
/*
 * Queue pTask for undeploy under streamId in pCtx->actionStm.
 *
 * - If the stream already has an action entry, the UNDEPLOY bit is OR-ed in,
 *   the task pointer is appended to its task list (created on demand), and the
 *   checkpoint/cleanup flags are adjusted from the stream's "dropped" state.
 * - Otherwise a new entry is built and inserted into the hash.
 *
 * The task list stores the pTask pointer itself (POINTER_BYTES elements).
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
int32_t msmGrpAddActionUndeploy(SStmGrpCtx* pCtx, int64_t streamId, SStreamTask* pTask) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  int32_t    action = STREAM_ACT_UNDEPLOY;
  bool       dropped = false;
  SStmAction newAction = {0};  // function scope so the exit path can free a half-built entry

  TAOS_CHECK_EXIT(mstIsStreamDropped(pCtx->pMnode, streamId, &dropped));
  mstsDebug("stream dropped: %d", dropped);

  SStmAction *pAction = taosHashGet(pCtx->actionStm, &streamId, sizeof(streamId));
  if (pAction) {
    pAction->actions |= action;
    if (NULL == pAction->undeploy.taskList) {
      pAction->undeploy.taskList = taosArrayInit(pCtx->taskNum, POINTER_BYTES);
      TSDB_CHECK_NULL(pAction->undeploy.taskList, code, lino, _exit, terrno);
    }

    TSDB_CHECK_NULL(taosArrayPush(pAction->undeploy.taskList, &pTask), code, lino, _exit, terrno);
    // doCheckpoint can only be cleared here, doCleanup can only be raised.
    // NOTE(review): an entry created by another action type starts with doCheckpoint == 0
    // and therefore never gets it enabled on this path - confirm that is intended.
    if (pAction->undeploy.doCheckpoint) {
      pAction->undeploy.doCheckpoint = dropped ? false : true;
    }
    if (!pAction->undeploy.doCleanup) {
      pAction->undeploy.doCleanup = dropped ? true : false;
    }

    msttDebug("task append UNDEPLOY action[%d,%d], actions:%x", pAction->undeploy.doCheckpoint, pAction->undeploy.doCleanup, pAction->actions);
  } else {
    newAction.actions = action;
    newAction.undeploy.doCheckpoint = dropped ? false : true;
    newAction.undeploy.doCleanup = dropped ? true : false;
    newAction.undeploy.taskList = taosArrayInit(pCtx->taskNum, POINTER_BYTES);
    TSDB_CHECK_NULL(newAction.undeploy.taskList, code, lino, _exit, terrno);
    TSDB_CHECK_NULL(taosArrayPush(newAction.undeploy.taskList, &pTask), code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(taosHashPut(pCtx->actionStm, &streamId, sizeof(streamId), &newAction, sizeof(newAction)));

    // The hash entry now holds the list pointer; drop our reference so the exit path leaves it alone.
    newAction.undeploy.taskList = NULL;

    msttDebug("task add UNDEPLOY action[%d,%d]", newAction.undeploy.doCheckpoint, newAction.undeploy.doCleanup);
  }

_exit:

  // Non-NULL only when a new entry was being built and never reached the hash.
  taosArrayDestroy(newAction.undeploy.taskList);

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
3210

3211
/*
 * Attach a user recalc request list to the action entry of streamId in
 * pCtx->actionStm, creating the entry when it does not exist yet.
 * If inserting a new entry fails, the half-built action is passed to
 * mstDestroySStmAction before the error is returned.
 */
int32_t msmGrpAddActionRecalc(SStmGrpCtx* pCtx, int64_t streamId, SArray* recalcList) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  int32_t    recalcBit = STREAM_ACT_RECALC;
  SStmAction newAction = {0};

  SStmAction* pExisting = taosHashGet(pCtx->actionStm, &streamId, sizeof(streamId));
  if (NULL == pExisting) {
    newAction.actions = recalcBit;
    newAction.recalc.recalcList = recalcList;

    TAOS_CHECK_EXIT(taosHashPut(pCtx->actionStm, &streamId, sizeof(streamId), &newAction, sizeof(newAction)));

    mstsDebug("stream add recalc action, listSize:%d", (int32_t)taosArrayGetSize(recalcList));
  } else {
    pExisting->actions |= recalcBit;
    pExisting->recalc.recalcList = recalcList;

    mstsDebug("stream append recalc action, listSize:%d, actions:%x", (int32_t)taosArrayGetSize(recalcList), pExisting->actions);
  }

_exit:

  if (code) {
    mstDestroySStmAction(&newAction);
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
3241

3242
bool msmCheckStreamStartCond(int64_t streamId, int32_t snodeId) {
397✔
3243
  SStmStatus* pStream = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
397✔
3244
  if (NULL == pStream) {
397!
3245
    return false;
×
3246
  }
3247

3248
  if (pStream->triggerTask->id.nodeId != snodeId || STREAM_STATUS_INIT != pStream->triggerTask->status) {
397!
3249
    return false;
×
3250
  }
3251

3252
  int32_t readerNum = taosArrayGetSize(pStream->trigReaders);
397✔
3253
  for (int32_t i = 0; i < readerNum; ++i) {
893✔
3254
    SStmTaskStatus* pStatus = taosArrayGet(pStream->trigReaders, i);
496✔
3255
    if (STREAM_STATUS_INIT != pStatus->status && STREAM_STATUS_RUNNING != pStatus->status) {
496!
3256
      return false;
×
3257
    }
3258
  }
3259

3260
  readerNum = taosArrayGetSize(pStream->trigOReaders);
397✔
3261
  for (int32_t i = 0; i < readerNum; ++i) {
404✔
3262
    SStmTaskStatus* pStatus = taosArrayGet(pStream->trigOReaders, i);
7✔
3263
    if (STREAM_STATUS_INIT != pStatus->status && STREAM_STATUS_RUNNING != pStatus->status) {
7!
3264
      return false;
×
3265
    }
3266
  }
3267

3268
  readerNum = taosArrayGetSize(pStream->calcReaders);
397✔
3269
  for (int32_t i = 0; i < readerNum; ++i) {
624✔
3270
    SStmTaskStatus* pStatus = taosArrayGet(pStream->calcReaders, i);
227✔
3271
    if (STREAM_STATUS_INIT != pStatus->status && STREAM_STATUS_RUNNING != pStatus->status) {
227!
3272
      return false;
×
3273
    }
3274
  }
3275

3276
  for (int32_t i = 0; i < pStream->runnerDeploys; ++i) {
1,402✔
3277
    int32_t runnerNum = taosArrayGetSize(pStream->runners[i]);
1,066✔
3278
    for (int32_t m = 0; m < runnerNum; ++m) {
2,137✔
3279
      SStmTaskStatus* pStatus = taosArrayGet(pStream->runners[i], m);
1,132✔
3280
      if (STREAM_STATUS_INIT != pStatus->status && STREAM_STATUS_RUNNING != pStatus->status) {
1,132!
3281
        return false;
61✔
3282
      }
3283
    }
3284
  }
3285
  
3286
  return true;
336✔
3287
}
3288

3289

3290
/*
 * React to a task status report that is not RUNNING.
 *
 * - Stream missing from streamMap, or stream marked stopped: queue an
 *   UNDEPLOY action for the reporting task and return.
 * - INIT status: only trigger tasks are considered. Once the stream has a
 *   recorded last action time that is at least STREAM_ACT_MIN_DELAY_MSEC old,
 *   the stream is not already running, the request carries an snode and
 *   msmCheckStreamStartCond() passes, a START action is queued.
 * - FAILED status: queue an UNDEPLOY action for the task.
 * - Any other status is ignored.
 *
 * If queuing an action fails, the stream is stopped via msmStopStreamByError.
 * pTaskStatus is not used by this function.
 */
void msmHandleTaskAbnormalStatus(SStmGrpCtx* pCtx, SStmTaskStatusMsg* pMsg, SStmTaskStatus* pTaskStatus) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  int64_t      streamId = pMsg->streamId;
  SStreamTask* pTask = (SStreamTask*)pMsg;
  int8_t       stopped = 0;

  msttDebug("start to handle task abnormal status %d", pTask->status);

  SStmStatus* pStatus = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL == pStatus) {
    msttInfo("stream no longer exists in streamMap, try to undeploy current task, idx:%d", pMsg->taskIdx);
    TAOS_CHECK_EXIT(msmGrpAddActionUndeploy(pCtx, streamId, pTask));
    return;
  }

  stopped = atomic_load_8(&pStatus->stopped);
  if (stopped) {
    msttInfo("stream stopped %d, try to undeploy current task, idx:%d", stopped, pMsg->taskIdx);
    TAOS_CHECK_EXIT(msmGrpAddActionUndeploy(pCtx, streamId, pTask));
    return;
  }

  if (STREAM_STATUS_INIT == pMsg->status) {
    if (STREAM_TRIGGER_TASK != pMsg->type) {
      msttTrace("task status is INIT and not trigger task, ignore it, currTs:%" PRId64 ", lastTs:%" PRId64, pCtx->currTs, pStatus->lastActionTs);
      return;
    }

    if (INT64_MIN == pStatus->lastActionTs) {
      msttDebug("task still not deployed, ignore it, currTs:%" PRId64 ", lastTs:%" PRId64, pCtx->currTs, pStatus->lastActionTs);
      return;
    }

    if ((pCtx->currTs - pStatus->lastActionTs) < STREAM_ACT_MIN_DELAY_MSEC) {
      msttDebug("task wait not enough between actions, currTs:%" PRId64 ", lastTs:%" PRId64, pCtx->currTs, pStatus->lastActionTs);
      return;
    }

    if (STREAM_IS_RUNNING(pStatus)) {
      msttDebug("stream already running, ignore status: %s", gStreamStatusStr[pTask->status]);
    } else if (GOT_SNODE(pCtx->pReq->snodeId) && msmCheckStreamStartCond(streamId, pCtx->pReq->snodeId)) {
      TAOS_CHECK_EXIT(msmGrpAddActionStart(pCtx->actionStm, streamId, &pStatus->triggerTask->id));
    }
  } else if (STREAM_STATUS_FAILED == pMsg->status) {
    //STREAMTODO ADD ERRCODE HANDLE
    msttInfo("task failed with error:%s, try to undeploy current task, idx:%d", tstrerror(pMsg->errorCode), pMsg->taskIdx);
    TAOS_CHECK_EXIT(msmGrpAddActionUndeploy(pCtx, streamId, pTask));
  }

_exit:

  if (code) {
    msmStopStreamByError(streamId, pStatus, code, pCtx->currTs);
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }
}
3353

3354
/*
 * Handle an inconsistency found while applying a task status report.
 * For STM_ERR_TASK_NOT_EXISTS and STM_ERR_STREAM_STOPPED the reporting task is
 * queued for undeploy; any other error type is only logged.
 * A failure to queue the action is logged and otherwise ignored.
 */
void msmHandleStatusUpdateErr(SStmGrpCtx* pCtx, EStmErrType err, SStmTaskStatusMsg* pStatus) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  int64_t      streamId = pStatus->streamId;
  SStreamTask* pTask = (SStreamTask*)pStatus;

  msttInfo("start to handle task status update exception, type: %d", err);

  // STREAMTODO

  switch (err) {
    case STM_ERR_TASK_NOT_EXISTS:
    case STM_ERR_STREAM_STOPPED:
      TAOS_CHECK_EXIT(msmGrpAddActionUndeploy(pCtx, streamId, pTask));
      break;
    default:
      break;
  }

_exit:

  if (code) {
    // IGNORE STOP STREAM BY ERROR
    msttError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }
}
161✔
3375

3376
/*
 * Per-heartbeat follow-up work for a trigger task status report.
 *
 *  1. If the stream's triggerNeedUpdate flag is set, clear it atomically and
 *     queue an UPDATE_TRIGGER action.
 *  2. If the stream has a pending user recalc list, detach it under
 *     userRecalcLock and queue a RECALC action carrying it.
 *  3. If the report references a detail-status slot (detailStatus >= 0) and the
 *     request carries pTriggerStatus, copy that slot into pStatus->detailStatus
 *     (allocated on first use) under detailStatusLock.
 *
 * Errors are logged only; the stream is not stopped.
 */
void msmChkHandleTriggerOperations(SStmGrpCtx* pCtx, SStmTaskStatusMsg* pTask, SStmTaskStatus* pStatus) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  SStmStatus* pStream = (SStmStatus*)pStatus->pStream;
  SArray*     userRecalcList = NULL;

  if (1 == atomic_val_compare_exchange_8(&pStream->triggerNeedUpdate, 1, 0)) {
    TAOS_CHECK_EXIT(msmGrpAddActionUpdateTrigger(pCtx->actionStm, pTask->streamId));
  }

  // Cheap unlocked peek first; the list is re-checked and detached under the lock.
  if (atomic_load_ptr(&pStream->userRecalcList)) {
    taosWLockLatch(&pStream->userRecalcLock);
    if (pStream->userRecalcList) {
      userRecalcList = pStream->userRecalcList;
      pStream->userRecalcList = NULL;
    }
    taosWUnLockLatch(&pStream->userRecalcLock);

    if (userRecalcList) {
      TAOS_CHECK_EXIT(msmGrpAddActionRecalc(pCtx, pTask->streamId, userRecalcList));
    }
  }

  if (pTask->detailStatus >= 0 && pCtx->pReq->pTriggerStatus) {
    // detailStatus is an index taken from the heartbeat message; resolve it before
    // locking so an out-of-range value is rejected instead of reaching memcpy as NULL.
    void* pDetail = taosArrayGet(pCtx->pReq->pTriggerStatus, pTask->detailStatus);
    TSDB_CHECK_NULL(pDetail, code, lino, _exit, TSDB_CODE_MND_STREAM_INTERNAL_ERROR);

    (void)mstWaitLock(&pStatus->detailStatusLock, false);
    if (NULL == pStatus->detailStatus) {
      pStatus->detailStatus = taosMemoryCalloc(1, sizeof(SSTriggerRuntimeStatus));
      if (NULL == pStatus->detailStatus) {
        taosWUnLockLatch(&pStatus->detailStatusLock);
        TSDB_CHECK_NULL(pStatus->detailStatus, code, lino, _exit, terrno);
      }
    }

    memcpy(pStatus->detailStatus, pDetail, sizeof(SSTriggerRuntimeStatus));
    taosWUnLockLatch(&pStatus->detailStatusLock);
  }

_exit:

  if (code) {
    // IGNORE STOP STREAM BY ERROR
    msttError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }
}
3,446✔
3420

3421
/*
 * Apply every task status entry of a heartbeat request to the in-memory task
 * map (normal, non-watch mode).
 *
 * For each reported task:
 *  - unknown task, stopped stream, or seriousId/nodeId mismatch: hand off to
 *    msmHandleStatusUpdateErr() and move on;
 *  - on a status change to RUNNING, stamp runningStartTs; on a change where a
 *    redeployed runner is getting ready, flag the stream's trigger for update
 *    and clear the redeploy flag;
 *  - store errCode / status / lastUpTs;
 *  - non-RUNNING reports go through msmHandleTaskAbnormalStatus(), trigger
 *    tasks additionally through msmChkHandleTriggerOperations().
 *
 * The taskMap key is read starting at &pTask->streamId and spans
 * sizeof(streamId) + sizeof(taskId) bytes.
 */
int32_t msmNormalHandleStatusUpdate(SStmGrpCtx* pCtx) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t taskNum = taosArrayGetSize(pCtx->pReq->pStreamStatus);

  mstDebug("NORMAL: start to handle stream group %d tasks status, taskNum:%d", pCtx->pReq->streamGId, taskNum);

  for (int32_t idx = 0; idx < taskNum; ++idx) {
    SStmTaskStatusMsg* pTask = taosArrayGet(pCtx->pReq->pStreamStatus, idx);
    msttDebug("task status %s got on dnode %d, taskIdx:%d", gStreamStatusStr[pTask->status], pCtx->pReq->dnodeId, pTask->taskIdx);

    SStmTaskStatus** ppStatus = taosHashGet(mStreamMgmt.taskMap, &pTask->streamId, sizeof(pTask->streamId) + sizeof(pTask->taskId));
    if (NULL == ppStatus) {
      msttWarn("task no longer exists in taskMap, will try to undeploy current task, taskIdx:%d", pTask->taskIdx);
      msmHandleStatusUpdateErr(pCtx, STM_ERR_TASK_NOT_EXISTS, pTask);
      continue;
    }

    SStmStatus* pStream = (SStmStatus*)(*ppStatus)->pStream;
    int8_t      stopped = atomic_load_8(&pStream->stopped);
    if (stopped) {
      msttWarn("stream already stopped %d, will try to undeploy current task, taskIdx:%d", stopped, pTask->taskIdx);
      msmHandleStatusUpdateErr(pCtx, STM_ERR_STREAM_STOPPED, pTask);
      continue;
    }

    // The report must come from the same task incarnation on the same node.
    if (!((pTask->seriousId == (*ppStatus)->id.seriousId) && (pTask->nodeId == (*ppStatus)->id.nodeId))) {
      msttInfo("task mismatch with it in taskMap, will try to rm it, current seriousId:%" PRId64 ", nodeId:%d", 
          (*ppStatus)->id.seriousId, (*ppStatus)->id.nodeId);

      msmHandleStatusUpdateErr(pCtx, STM_ERR_TASK_NOT_EXISTS, pTask);
      continue;
    }

    const bool statusChanged = ((*ppStatus)->status != pTask->status);
    if (statusChanged && STREAM_STATUS_RUNNING == pTask->status) {
      (*ppStatus)->runningStartTs = pCtx->currTs;
    } else if (statusChanged && MST_IS_RUNNER_GETTING_READY(pTask) && STREAM_IS_REDEPLOY_RUNNER((*ppStatus)->flags)) {
      if (pStream->triggerTask) {
        atomic_store_8(&pStream->triggerNeedUpdate, 1);
      }

      STREAM_CLR_FLAG((*ppStatus)->flags, STREAM_FLAG_REDEPLOY_RUNNER);
    }

    (*ppStatus)->errCode = pTask->errorCode;
    (*ppStatus)->status = pTask->status;
    (*ppStatus)->lastUpTs = pCtx->currTs;

    if (STREAM_STATUS_RUNNING != pTask->status) {
      msmHandleTaskAbnormalStatus(pCtx, pTask, *ppStatus);
    }

    if (STREAM_TRIGGER_TASK == pTask->type) {
      msmChkHandleTriggerOperations(pCtx, pTask, *ppStatus);
    }
  }

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
3488

3489
/*
 * Record a task that showed up in a heartbeat but is not yet in taskMap.
 *
 * If the owning stream is not in streamMap either, the stream object is
 * acquired, a fresh SStmStatus is initialised from it and inserted (tasks of
 * virtual-table streams are ignored). The task is then added, according to its
 * type, to the stream's reader list / trigger slot / runner list and
 * registered in taskMap plus the vgroup or snode map.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
int32_t msmWatchRecordNewTask(SStmGrpCtx* pCtx, SStmTaskStatusMsg* pTask) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pTask->streamId;
  SStreamObj* pStream = NULL;

  SStmStatus* pStatus = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL == pStatus) {
    SStmStatus status = {0};
    TAOS_CHECK_EXIT(mndAcquireStreamById(pCtx->pMnode, streamId, &pStream));
    TSDB_CHECK_NULL(pStream, code, lino, _exit, TSDB_CODE_MND_STREAM_NOT_EXIST);
    if (STREAM_IS_VIRTUAL_TABLE(pStream->pCreate->triggerTblType, pStream->pCreate->flags)) {
      mndReleaseStream(pCtx->pMnode, pStream);
      msttDebug("virtual table task ignored, status:%s", gStreamStatusStr[pTask->status]);
      return code;
    }

    // Release the stream before looking at the result, so a failed init
    // does not leave the acquired stream object unreleased.
    code = msmInitStmStatus(pCtx, &status, pStream, true);
    mndReleaseStream(pCtx->pMnode, pStream);
    TAOS_CHECK_EXIT(code);

    // NOTE(review): if the put below fails, whatever msmInitStmStatus allocated inside
    // `status` is not torn down here - confirm whether a destroy helper should be called.
    TAOS_CHECK_EXIT(taosHashPut(mStreamMgmt.streamMap, &streamId, sizeof(streamId), &status, sizeof(status)));
    pStatus = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
    TSDB_CHECK_NULL(pStatus, code, lino, _exit, terrno);
    msttDebug("stream added to streamMap cause of new task status:%s", gStreamStatusStr[pTask->status]);
  }

  SStmTaskStatus* pNewTask = NULL;
  switch (pTask->type) {
    case STREAM_READER_TASK: {
      SArray* pList = STREAM_IS_TRIGGER_READER(pTask->flags) ? pStatus->trigReaders : pStatus->calcReaders;
      if (NULL == pList) {
        mstsError("%sReader list is NULL", STREAM_IS_TRIGGER_READER(pTask->flags) ? "trig" : "calc");
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }
      // Refuse to grow the list beyond the expected reader count.
      int32_t readerSize = STREAM_IS_TRIGGER_READER(pTask->flags) ? pStatus->trigReaderNum : pStatus->calcReaderNum;
      if (taosArrayGetSize(pList) >= readerSize) {
        mstsError("%sReader list is already full, size:%d, expSize:%d", STREAM_IS_TRIGGER_READER(pTask->flags) ? "trig" : "calc",
            (int32_t)taosArrayGetSize(pList), readerSize);
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }

      SStmTaskStatus taskStatus = {0};
      taskStatus.pStream = pStatus;
      mstSetTaskStatusFromMsg(pCtx, &taskStatus, pTask);
      pNewTask = taosArrayPush(pList, &taskStatus);
      TSDB_CHECK_NULL(pNewTask, code, lino, _exit, terrno);

      TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, NULL, pNewTask));
      TAOS_CHECK_EXIT(msmSTAddToVgroupMapImpl(streamId, pNewTask, STREAM_IS_TRIGGER_READER(pTask->flags)));
      break;
    }
    case STREAM_TRIGGER_TASK: {
      // Any previously recorded trigger task is replaced.
      taosMemoryFreeClear(pStatus->triggerTask);
      pStatus->triggerTask = taosMemoryCalloc(1, sizeof(*pStatus->triggerTask));
      TSDB_CHECK_NULL(pStatus->triggerTask, code, lino, _exit, terrno);
      pStatus->triggerTask->pStream = pStatus;
      mstSetTaskStatusFromMsg(pCtx, pStatus->triggerTask, pTask);
      pNewTask = pStatus->triggerTask;

      TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, NULL, pNewTask));
      TAOS_CHECK_EXIT(msmSTAddToSnodeMapImpl(streamId, pNewTask, 0));
      break;
    }
    case STREAM_RUNNER_TASK:{
      // NOTE(review): deployId comes from the heartbeat message and indexes runners[]
      // without a range check here - confirm it is validated upstream.
      if (NULL == pStatus->runners[pTask->deployId]) {
        mstsError("deploy %d runner list is NULL", pTask->deployId);
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }
      if (taosArrayGetSize(pStatus->runners[pTask->deployId]) >= pStatus->runnerNum) {
        mstsError("deploy %d runner list is already full, size:%d, expSize:%d", pTask->deployId, 
            (int32_t)taosArrayGetSize(pStatus->runners[pTask->deployId]), pStatus->runnerNum);
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }

      SStmTaskStatus taskStatus = {0};
      taskStatus.pStream = pStatus;
      mstSetTaskStatusFromMsg(pCtx, &taskStatus, pTask);
      pNewTask = taosArrayPush(pStatus->runners[pTask->deployId], &taskStatus);
      TSDB_CHECK_NULL(pNewTask, code, lino, _exit, terrno);

      TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, NULL, pNewTask));
      TAOS_CHECK_EXIT(msmSTAddToSnodeMapImpl(streamId, pNewTask, pTask->deployId));
      break;
    }
    default: {
      msttError("invalid task type:%d in task status", pTask->type);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      break;
    }
  }

_exit:

  if (code) {
    msttError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  } else {
    msttDebug("new task recored to taskMap/streamMap, task status:%s", gStreamStatusStr[pTask->status]);
  }

  return code;
}
3590

3591
/*
 * Apply the task status entries of a heartbeat request in watch mode.
 * Every reported task id advances mStreamMgmt.lastTaskId past it when needed.
 * Tasks missing from taskMap are recorded through msmWatchRecordNewTask();
 * known tasks just get their status and lastUpTs refreshed.
 * Stops at the first error from msmWatchRecordNewTask() and returns it.
 */
int32_t msmWatchHandleStatusUpdate(SStmGrpCtx* pCtx) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t taskNum = taosArrayGetSize(pCtx->pReq->pStreamStatus);

  mstDebug("WATCH: start to handle stream group %d tasks status, taskNum:%d", pCtx->pReq->streamGId, taskNum);

  for (int32_t idx = 0; idx < taskNum; ++idx) {
    SStmTaskStatusMsg* pTask = taosArrayGet(pCtx->pReq->pStreamStatus, idx);
    msttDebug("task status %s got, taskIdx:%d", gStreamStatusStr[pTask->status], pTask->taskIdx);

    if (pTask->taskId >= mStreamMgmt.lastTaskId) {
      mStreamMgmt.lastTaskId = pTask->taskId + 1;
    }

    SStmTaskStatus** ppKnown = taosHashGet(mStreamMgmt.taskMap, &pTask->streamId, sizeof(pTask->streamId) + sizeof(pTask->taskId));
    if (NULL != ppKnown) {
      (*ppKnown)->status = pTask->status;
      (*ppKnown)->lastUpTs = pCtx->currTs;
    } else {
      msttInfo("task still not in taskMap, will try to add it, taskIdx:%d", pTask->taskIdx);

      TAOS_CHECK_EXIT(msmWatchRecordNewTask(pCtx, pTask));
    }
  }

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
3627

3628
/*
 * Append a START entry for the stream's trigger task to the heartbeat
 * response and refresh the stream's last-action timestamp.
 * The response's start list is created on first use, sized by streamNum.
 * Failures are logged; nothing is returned to the caller.
 */
void msmRspAddStreamStart(int64_t streamId, SStmGrpCtx* pCtx, int32_t streamNum, SStmAction *pAction) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (NULL == pCtx->pRsp->start.taskList) {
    pCtx->pRsp->start.taskList = taosArrayInit(streamNum, sizeof(SStreamTaskStart));
    TSDB_CHECK_NULL(pCtx->pRsp->start.taskList, code, lino, _exit, terrno);
  }

  SStmTaskId* pId = &pAction->start.triggerId;

  // Everything not listed here stays zero.
  SStreamTaskStart start = {
      .task.type = STREAM_TRIGGER_TASK,
      .task.streamId = streamId,
      .task.taskId = pId->taskId,
      .task.seriousId = pId->seriousId,
      .task.nodeId = pId->nodeId,
      .task.taskIdx = pId->taskIdx,
  };

  TSDB_CHECK_NULL(taosArrayPush(pCtx->pRsp->start.taskList, &start), code, lino, _exit, terrno);
  TAOS_CHECK_EXIT(msmUpdateStreamLastActTs(streamId, pCtx->currTs));

  mstsDebug("stream START added to dnode %d hb rsp, triggerTaskId:%" PRIx64, pId->nodeId, pId->taskId);

  return;

_exit:

  mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
}
3656

3657

3658
/*
 * Append one UNDEPLOY entry per task queued in pAction->undeploy.taskList to
 * the heartbeat response, refreshing the stream's last-action timestamp for
 * each entry. The response's undeploy list is created on first use.
 * Processing stops at the first failure, which is logged.
 */
void msmRspAddStreamUndeploy(int64_t streamId, SStmGrpCtx* pCtx, SStmAction *pAction) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t dropNum = taosArrayGetSize(pAction->undeploy.taskList);
  if (NULL == pCtx->pRsp->undeploy.taskList) {
    pCtx->pRsp->undeploy.taskList = taosArrayInit(dropNum, sizeof(SStreamTaskUndeploy));
    TSDB_CHECK_NULL(pCtx->pRsp->undeploy.taskList, code, lino, _exit, terrno);
  }

  // Zero-initialise so any field not assigned below is never copied into the
  // response as indeterminate stack content.
  SStreamTaskUndeploy undeploy = {0};
  for (int32_t i = 0; i < dropNum; ++i) {
    SStreamTask* pTask = (SStreamTask*)taosArrayGetP(pAction->undeploy.taskList, i);
    undeploy.task = *pTask;
    undeploy.undeployMsg.doCheckpoint = pAction->undeploy.doCheckpoint;
    undeploy.undeployMsg.doCleanup = pAction->undeploy.doCleanup;

    TSDB_CHECK_NULL(taosArrayPush(pCtx->pRsp->undeploy.taskList, &undeploy), code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(msmUpdateStreamLastActTs(streamId, pCtx->currTs));

    msttDebug("task UNDEPLOY added to hb rsp, doCheckpoint:%d, doCleanup:%d", undeploy.undeployMsg.doCheckpoint, undeploy.undeployMsg.doCleanup);
  }

  return;

_exit:

  mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
}
3686

3687
/*
 * Append an UPDATE_RUNNER management response for the stream's trigger task
 * to the heartbeat response, carrying the runner targets built by
 * msmBuildTriggerRunnerTargets().
 * Silently skipped (with a log line) when the stream is no longer in
 * streamMap or has no trigger task. pAction is not read here.
 */
void msmRspAddTriggerUpdate(SMnode * pMnode, int64_t streamId, SStmGrpCtx* pCtx, SStmAction *pAction) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SStmStatus* pStmStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (!pStmStatus) {
    mstsDebug("stream already not exists in streamMap, ignore trigger update, streamRemain:%d", taosHashGetSize(mStreamMgmt.streamMap));
    return;
  }

  if (!pStmStatus->triggerTask) {
    mstsWarn("no triggerTask exists, ignore trigger update, stopped:%d", atomic_load_8(&pStmStatus->stopped));
    return;
  }

  SStreamMgmtRsp rsp = {0};
  rsp.header.msgType = STREAM_MSG_UPDATE_RUNNER;
  rsp.reqId = INT64_MIN;
  rsp.task.taskId = pStmStatus->triggerTask->id.taskId;
  rsp.task.streamId = streamId;

  // NOTE(review): if any step below fails, rsp.cont.runnerList is not released in
  // this function - confirm whether it needs an explicit destroy on the error path.
  TAOS_CHECK_EXIT(msmBuildTriggerRunnerTargets(pMnode, pStmStatus, streamId, &rsp.cont.runnerList));  

  if (NULL == pCtx->pRsp->rsps.rspList) {
    pCtx->pRsp->rsps.rspList = taosArrayInit(2, sizeof(SStreamMgmtRsp));
    TSDB_CHECK_NULL(pCtx->pRsp->rsps.rspList, code, lino, _exit, terrno);
  }

  TSDB_CHECK_NULL(taosArrayPush(pCtx->pRsp->rsps.rspList, &rsp), code, lino, _exit, terrno);

_exit:

  if (0 == code) {
    mstsDebug("trigger update rsp added, runnerNum:%d", (int32_t)taosArrayGetSize(rsp.cont.runnerList));
  } else {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }
}
3725

3726
/*
 * Append a USER_RECALC management response for the stream's trigger task to
 * the heartbeat response. The recalc list is moved out of pAction into the
 * response entry; if the entry cannot be stored, the list is moved back so
 * pAction remains its holder.
 * Silently skipped (with a log line) when the stream is no longer in
 * streamMap or has no trigger task. pMnode is not used here.
 */
void msmRspAddUserRecalc(SMnode * pMnode, int64_t streamId, SStmGrpCtx* pCtx, SStmAction *pAction) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  bool    handedOff = false;  // true once the response list holds a copy of rsp

  SStmStatus* pStream = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL == pStream) {
    mstsDebug("stream already not exists in streamMap, ignore user recalc, streamRemain:%d", taosHashGetSize(mStreamMgmt.streamMap));
    return;
  }

  if (NULL == pStream->triggerTask) {
    mstsWarn("no triggerTask exists, ignore user recalc, stopped:%d", atomic_load_8(&pStream->stopped));
    return;
  }

  SStreamMgmtRsp rsp = {0};
  rsp.reqId = INT64_MIN;
  rsp.header.msgType = STREAM_MSG_USER_RECALC;
  rsp.task.streamId = streamId;
  rsp.task.taskId = pStream->triggerTask->id.taskId;
  TSWAP(rsp.cont.recalcList, pAction->recalc.recalcList);

  if (NULL == pCtx->pRsp->rsps.rspList) {
    pCtx->pRsp->rsps.rspList = taosArrayInit(2, sizeof(SStreamMgmtRsp));
    TSDB_CHECK_NULL(pCtx->pRsp->rsps.rspList, code, lino, _exit, terrno);
  }

  TSDB_CHECK_NULL(taosArrayPush(pCtx->pRsp->rsps.rspList, &rsp), code, lino, _exit, terrno);
  handedOff = true;

_exit:

  if (!handedOff) {
    // Undo the swap: otherwise the list would be referenced by neither the action nor the response.
    TSWAP(rsp.cont.recalcList, pAction->recalc.recalcList);
  }

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  } else {
    mstsDebug("user recalc rsp added, recalcNum:%d", (int32_t)taosArrayGetSize(rsp.cont.recalcList));
  }
}
3763

3764

3765
/*
 * Turn the actions collected in pCtx->actionStm into heartbeat response
 * entries, one stream at a time.
 * A stream with the UNDEPLOY bit set gets only its undeploy entries; for all
 * other streams UPDATE_TRIGGER, RECALC and START are emitted in that order.
 */
int32_t msmHandleHbPostActions(SStmGrpCtx* pCtx) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  void*   pIter = NULL;
  int32_t streamNum = taosHashGetSize(pCtx->actionStm);

  mstDebug("start to handle stream group %d post actions", pCtx->pReq->streamGId);

  for (pIter = taosHashIterate(pCtx->actionStm, pIter); pIter != NULL;
       pIter = taosHashIterate(pCtx->actionStm, pIter)) {
    int64_t*    pStreamId = taosHashGetKey(pIter, NULL);
    SStmAction* pAction = (SStmAction*)pIter;

    // Undeploy takes precedence over every other action of the same stream.
    if (pAction->actions & STREAM_ACT_UNDEPLOY) {
      msmRspAddStreamUndeploy(*pStreamId, pCtx, pAction);
      continue;
    }

    if (pAction->actions & STREAM_ACT_UPDATE_TRIGGER) {
      msmRspAddTriggerUpdate(pCtx->pMnode, *pStreamId, pCtx, pAction);
    }

    if (pAction->actions & STREAM_ACT_RECALC) {
      msmRspAddUserRecalc(pCtx->pMnode, *pStreamId, pCtx, pAction);
    }

    if (pAction->actions & STREAM_ACT_START) {
      msmRspAddStreamStart(*pStreamId, pCtx, streamNum, pAction);
    }
  }

_exit:

  if (pIter) {
    taosHashCancelIterate(pCtx->actionStm, pIter);
  }

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
3812

3813
/*
 * Advance the recorded last-heartbeat timestamp of the requesting dnode.
 *
 * The dnode is looked up in dnodeMap; on a miss the map is refreshed once via
 * msmSTAddDnodesToMap() and the lookup retried. A second miss fails with
 * TSDB_CODE_MND_STREAM_NODE_NOT_EXISTS.
 * The stored timestamp is then raised to pCtx->currTs with a compare-and-swap
 * loop; it is left untouched when it is already >= currTs.
 */
int32_t msmCheckUpdateDnodeTs(SStmGrpCtx* pCtx) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  int64_t* pLastTs = NULL;
  bool     mapRefreshed = false;

  // Phase 1: locate the dnode entry, refreshing the map at most once.
  for (;;) {
    pLastTs = taosHashGet(mStreamMgmt.dnodeMap, &pCtx->pReq->dnodeId, sizeof(pCtx->pReq->dnodeId));
    if (NULL != pLastTs) {
      break;
    }

    if (mapRefreshed) {
      mstWarn("Got unknown dnode %d hb msg, may be dropped", pCtx->pReq->dnodeId);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_NODE_NOT_EXISTS);
    }

    mapRefreshed = true;
    TAOS_CHECK_EXIT(msmSTAddDnodesToMap(pCtx->pMnode));
  }

  // Phase 2: monotonic update; retry only when another writer raced the CAS.
  for (;;) {
    int64_t observedTs = atomic_load_64(pLastTs);
    if (pCtx->currTs <= observedTs) {
      return code;
    }

    if (observedTs == atomic_val_compare_exchange_64(pLastTs, observedTs, pCtx->currTs)) {
      mstDebug("dnode %d lastUpTs updated", pCtx->pReq->dnodeId);
      return code;
    }
  }

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;  
}
3858

3859
// Walk every entry of mStreamMgmt.streamMap and stop (with
// TSDB_CODE_MND_STREAM_TASK_LOST) each stream whose recorded tasks do not match the
// expected counts kept in its SStmStatus:
//   - no trigger task recorded,
//   - trigger reader array size != pStatus->trigReaderNum,
//   - calc reader array size    != pStatus->calcReaderNum,
//   - any runner deploy whose array size != pStatus->runnerNum.
//
// Each stream is stopped at most once per pass: as soon as one mismatch is found the
// remaining checks for that stream are skipped.
void msmWatchCheckStreamMap(SStmGrpCtx* pCtx) {
  SStmStatus* pStatus = NULL;
  int32_t     trigReaderNum = 0;
  int32_t     calcReaderNum = 0;
  int32_t     runnerNum = 0;
  int64_t     streamId = 0;
  void*       pIter = NULL;

  while (NULL != (pIter = taosHashIterate(mStreamMgmt.streamMap, pIter))) {
    streamId = *(int64_t*)taosHashGetKey(pIter, NULL);
    pStatus = (SStmStatus*)pIter;

    if (NULL == pStatus->triggerTask) {
      mstsWarn("no trigger task recored, deployTimes:%" PRId64, pStatus->deployTimes);
      msmStopStreamByError(streamId, pStatus, TSDB_CODE_MND_STREAM_TASK_LOST, pCtx->currTs);
      continue;
    }

    trigReaderNum = taosArrayGetSize(pStatus->trigReaders);
    if (pStatus->trigReaderNum != trigReaderNum) {
      mstsWarn("trigReaderNum %d mis-match with expected %d", trigReaderNum, pStatus->trigReaderNum);
      msmStopStreamByError(streamId, pStatus, TSDB_CODE_MND_STREAM_TASK_LOST, pCtx->currTs);
      continue;
    }

    calcReaderNum = taosArrayGetSize(pStatus->calcReaders);
    if (pStatus->calcReaderNum != calcReaderNum) {
      mstsWarn("calcReaderNum %d mis-match with expected %d", calcReaderNum, pStatus->calcReaderNum);
      msmStopStreamByError(streamId, pStatus, TSDB_CODE_MND_STREAM_TASK_LOST, pCtx->currTs);
      continue;
    }

    for (int32_t i = 0; i < pStatus->runnerDeploys; ++i) {
      runnerNum = taosArrayGetSize(pStatus->runners[i]);
      if (runnerNum != pStatus->runnerNum) {
        mstsWarn("runner deploy %d runnerNum %d mis-match with expected %d", i, runnerNum, pStatus->runnerNum);
        msmStopStreamByError(streamId, pStatus, TSDB_CODE_MND_STREAM_TASK_LOST, pCtx->currTs);
        // The stream is already being stopped; leave the runner loop so it is not
        // stopped again for every further mismatching deploy.
        break;
      }
    }
  }
}
3905

3906
// Leave the "watch" runtime state and switch to MND_STM_STATE_NORMAL.
//
// Only the first caller that flips mStreamMgmt.watch.ending from 0 to 1 does the work;
// every other caller returns immediately. The winner waits until the number of
// in-flight watch handlers (mStreamMgmt.watch.processing) has dropped to the allowed
// level: 1 when called from inside a handler (watchError == false), 0 otherwise.
//
//   watchError == true : the vgroup/snode/task/stream maps are cleared.
//   no tasks reported  : nothing to verify.
//   otherwise          : the stream map is cross-checked via msmWatchCheckStreamMap().
//
// In all three cases lastTaskId is advanced by 100000 before the state switch.
// Always returns TSDB_CODE_SUCCESS.
int32_t msmWatchHandleEnding(SStmGrpCtx* pCtx, bool watchError) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (0 != atomic_val_compare_exchange_8(&mStreamMgmt.watch.ending, 0, 1)) {
    return code;
  }

  const int32_t allowedInFlight = watchError ? 0 : 1;
  while (atomic_load_32(&mStreamMgmt.watch.processing) > allowedInFlight) {
    (void)sched_yield();
  }

  if (watchError) {
    taosHashClear(mStreamMgmt.vgroupMap);
    taosHashClear(mStreamMgmt.snodeMap);
    taosHashClear(mStreamMgmt.taskMap);
    taosHashClear(mStreamMgmt.streamMap);
    mstInfo("watch error happends, clear all maps");
  } else if (0 == atomic_load_8(&mStreamMgmt.watch.taskRemains)) {
    mstInfo("no stream tasks remain during watch state");
  } else {
    msmWatchCheckStreamMap(pCtx);
  }

  mStreamMgmt.lastTaskId += 100000;

  mstInfo("watch state end, new taskId begin from:%" PRIx64, mStreamMgmt.lastTaskId);

  msmSetInitRuntimeState(MND_STM_STATE_NORMAL);

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
3949

3950

3951
// Heartbeat handler used while the stream manager is in the watch state.
//
// The handler registers itself in mStreamMgmt.watch.processing for its whole duration
// so that msmWatchHandleEnding() can wait for in-flight handlers. If the watch state is
// already ending, the message is ignored. Otherwise the dnode (and, when present, the
// snode) timestamps are refreshed, reported task statuses are processed, and once the
// time since STM_EVENT_ACTIVE_BEGIN exceeds MST_ISOLATION_DURATION the watch state is
// ended normally.
//
// On any failure the watch state is ended in error mode after this handler has
// unregistered itself. Returns the first failing code, or TSDB_CODE_SUCCESS.
int32_t msmWatchHandleHbMsg(SStmGrpCtx* pCtx) {
  int32_t       code = TSDB_CODE_SUCCESS;
  int32_t       lino = 0;
  SStreamHbMsg* pHb = pCtx->pReq;

  (void)atomic_add_fetch_32(&mStreamMgmt.watch.processing, 1);

  if (0 == atomic_load_8(&mStreamMgmt.watch.ending)) {
    TAOS_CHECK_EXIT(msmCheckUpdateDnodeTs(pCtx));
    if (GOT_SNODE(pHb->snodeId)) {
      TAOS_CHECK_EXIT(msmUpdateSnodeUpTs(pCtx));
    }

    if (taosArrayGetSize(pHb->pStreamStatus) > 0) {
      // Remember that at least one task is still alive somewhere.
      atomic_store_8(&mStreamMgmt.watch.taskRemains, 1);
      TAOS_CHECK_EXIT(msmWatchHandleStatusUpdate(pCtx));
    }

    if ((pCtx->currTs - MND_STREAM_GET_LAST_TS(STM_EVENT_ACTIVE_BEGIN)) > MST_ISOLATION_DURATION) {
      TAOS_CHECK_EXIT(msmWatchHandleEnding(pCtx, false));
    }
  }

_exit:

  (void)atomic_sub_fetch_32(&mStreamMgmt.watch.processing, 1);

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));

    (void)msmWatchHandleEnding(pCtx, true);
  }

  return code;
}
3988

3989
// Make sure the stream has a trigger reader on vgroup `vgId`.
//
// If none of pStatus->trigReaders is located on `vgId`, a new entry is reserved in
// pStatus->trigOReaders (created on demand with capacity `vgNum`), filled in by
// msmTDAddSingleTrigReader() and registered in the task and vgroup maps.
//
// pCtx    - group context of the heartbeat being processed
// pStatus - runtime status of the stream
// pTask   - task that issued the request; only its streamId is read here
// vgId    - vgroup that must host a trigger reader
// vgNum   - initial capacity used when trigOReaders has to be created
//
// Returns TSDB_CODE_SUCCESS, or the code of the first failing step.
int32_t msmCheckDeployTrigReader(SStmGrpCtx* pCtx, SStmStatus* pStatus, SStmTaskStatusMsg* pTask, int32_t vgId, int32_t vgNum) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  bool    readerExists = false;
  int64_t streamId = pTask->streamId;

  int32_t readerNum = taosArrayGetSize(pStatus->trigReaders);
  for (int32_t i = 0; i < readerNum && !readerExists; ++i) {
    SStmTaskStatus* pReader = (SStmTaskStatus*)taosArrayGet(pStatus->trigReaders, i);
    readerExists = (pReader->id.nodeId == vgId);
  }

  if (readerExists) {
    goto _exit;
  }

  if (NULL == pStatus->trigOReaders) {
    pStatus->trigOReaders = taosArrayInit(vgNum, sizeof(SStmTaskStatus));
    TSDB_CHECK_NULL(pStatus->trigOReaders, code, lino, _exit, terrno);
  }

  SStmTaskStatus* pState = taosArrayReserve(pStatus->trigOReaders, 1);
  // The reservation can fail; do not hand a NULL slot to the helpers below.
  TSDB_CHECK_NULL(pState, code, lino, _exit, terrno);

  TAOS_CHECK_EXIT(msmTDAddSingleTrigReader(pCtx, pState, vgId, pStatus, NULL, streamId));
  TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, NULL, pState));
  TAOS_CHECK_EXIT(msmSTAddToVgroupMap(pCtx, streamId, NULL, pState, true));

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
4024

4025
// Handle a STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER request attached to `pTask`.
//
// The request carries a list of fully qualified table names. Each table is resolved to
// its vgroup, a trigger reader is deployed on every distinct vgroup that does not have
// one yet, and a STREAM_MSG_ORIGTBL_READER_INFO response is appended to
// pCtx->pRsp->rsps.rspList. The response holds the per-table vgIds and the address
// list built from pStatus->trigOReaders.
//
// Ownership: the request object is detached from pTask up front (pTask->pMgmtReq is
// NULL afterwards) and always freed here. On success the response is copied into
// rspList; on failure its content is destroyed locally.
//
// Nothing is appended, and TSDB_CODE_SUCCESS is returned, when the table list is empty
// or when the stream already has orig readers.
//
// Returns TSDB_CODE_SUCCESS or the first failing code (e.g.
// TSDB_CODE_MND_STREAM_NOT_RUNNING when the stream is not in streamMap,
// TSDB_CODE_MND_STREAM_STOPPED when it is stopped).
int32_t msmProcessDeployOrigReader(SStmGrpCtx* pCtx, SStmTaskStatusMsg* pTask) {
  int32_t             code = TSDB_CODE_SUCCESS;
  int32_t             lino = 0;
  int32_t             vgId = 0;
  int32_t             vgNum = 0;
  int32_t             oReaderNum = 0;
  int32_t             iter = 0;
  int8_t              stopped = 0;
  int64_t             streamId = pTask->streamId;
  SArray*             pTbs = pTask->pMgmtReq->cont.fullTableNames;
  int32_t             tbNum = taosArrayGetSize(pTbs);
  void*               p = NULL;
  SStreamDbTableName* pName = NULL;
  SSHashObj*          pDbVgroups = NULL;
  SSHashObj*          pVgs = NULL;
  SStreamMgmtReq*     pMgmtReq = NULL;
  SStmStatus*         pStatus = NULL;
  SStreamMgmtRsp      rsp = {0};

  rsp.reqId = pTask->pMgmtReq->reqId;
  rsp.header.msgType = STREAM_MSG_ORIGTBL_READER_INFO;

  // Detach the request before copying the task header so the copy in rsp.task does
  // not carry the request pointer that is freed at _exit.
  TSWAP(pTask->pMgmtReq, pMgmtReq);
  rsp.task = *(SStreamTask*)pTask;

  pStatus = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL == pStatus) {
    mstsError("stream not deployed, remainStreams:%d", taosHashGetSize(mStreamMgmt.streamMap));
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_NOT_RUNNING);
  }

  stopped = atomic_load_8(&pStatus->stopped);
  if (stopped) {
    msttInfo("stream stopped %d, ignore deploy trigger reader, vgId:%d", stopped, vgId);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_STOPPED);
  }

  if (tbNum <= 0) {
    mstsWarn("empty table list in origReader req, array:%p", pTbs);
    goto _exit;
  }

  oReaderNum = taosArrayGetSize(pStatus->trigOReaders);
  if (oReaderNum > 0) {
    mstsWarn("origReaders already exits, num:%d", oReaderNum);
    goto _exit;
  }

  TAOS_CHECK_EXIT(mstBuildDBVgroupsMap(pCtx->pMnode, &pDbVgroups));
  rsp.cont.vgIds = taosArrayInit(tbNum, sizeof(int32_t));
  TSDB_CHECK_NULL(rsp.cont.vgIds, code, lino, _exit, terrno);

  // pVgs is used as a set of the distinct vgroups that host the requested tables.
  pVgs = tSimpleHashInit(tbNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  TSDB_CHECK_NULL(pVgs, code, lino, _exit, terrno);

  for (int32_t i = 0; i < tbNum; ++i) {
    pName = (SStreamDbTableName*)taosArrayGet(pTbs, i);
    TAOS_CHECK_EXIT(mstGetTableVgId(pDbVgroups, pName->dbFName, pName->tbName, &vgId));
    TSDB_CHECK_NULL(taosArrayPush(rsp.cont.vgIds, &vgId), code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(tSimpleHashPut(pVgs, &vgId, sizeof(vgId), &vgId, sizeof(vgId)));
  }

  vgNum = tSimpleHashGetSize(pVgs);
  while (NULL != (p = tSimpleHashIterate(pVgs, p, &iter))) {
    TAOS_CHECK_EXIT(msmCheckDeployTrigReader(pCtx, pStatus, pTask, *(int32_t*)p, vgNum));
  }

  vgNum = taosArrayGetSize(pStatus->trigOReaders);
  rsp.cont.readerList = taosArrayInit(vgNum, sizeof(SStreamTaskAddr));
  TSDB_CHECK_NULL(rsp.cont.readerList, code, lino, _exit, terrno);

  for (int32_t i = 0; i < vgNum; ++i) {
    SStmTaskStatus* pOTask = taosArrayGet(pStatus->trigOReaders, i);

    // Start from a zeroed struct so no indeterminate bytes are copied into the response.
    SStreamTaskAddr addr = {0};
    addr.taskId = pOTask->id.taskId;
    addr.nodeId = pOTask->id.nodeId;
    addr.epset = mndGetVgroupEpsetById(pCtx->pMnode, pOTask->id.nodeId);
    TSDB_CHECK_NULL(taosArrayPush(rsp.cont.readerList, &addr), code, lino, _exit, terrno);
    mstsDebug("the %dth otrigReader src added to trigger's virtual orig readerList, TASK:%" PRIx64 " nodeId:%d", i, addr.taskId, addr.nodeId);
  }

  if (NULL == pCtx->pRsp->rsps.rspList) {
    pCtx->pRsp->rsps.rspList = taosArrayInit(2, sizeof(SStreamMgmtRsp));
    TSDB_CHECK_NULL(pCtx->pRsp->rsps.rspList, code, lino, _exit, terrno);
  }

  // Must stay the last fallible step: after the push the list owns rsp's arrays.
  TSDB_CHECK_NULL(taosArrayPush(pCtx->pRsp->rsps.rspList, &rsp), code, lino, _exit, terrno);

_exit:

  tFreeSStreamMgmtReq(pMgmtReq);
  taosMemoryFree(pMgmtReq);

  tSimpleHashCleanup(pVgs);

  if (code) {
    mndStreamDestroySStreamMgmtRsp(&rsp);
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  mstDestroyDbVgroupsHash(pDbVgroups);

  return code;
}
4130

4131
// Dispatch the management request attached to `pTask` according to its type.
// Only STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER is handled; any other type is logged
// and reported as TSDB_CODE_MND_STREAM_INTERNAL_ERROR.
// The caller must have verified that pTask->pMgmtReq is not NULL.
int32_t msmHandleTaskMgmtReq(SStmGrpCtx* pCtx, SStmTaskStatusMsg* pTask) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER == pTask->pMgmtReq->type) {
    TAOS_CHECK_EXIT(msmProcessDeployOrigReader(pCtx, pTask));
  } else {
    msttError("Invalid mgmtReq type:%d", pTask->pMgmtReq->type);
    code = TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
4153

4154
// Process every management request referenced by the heartbeat.
//
// pReq->pStreamReq holds indexes into pReq->pStreamStatus; each referenced task status
// is expected to carry a pMgmtReq. Entries that resolve to no task or to a task without
// a request are logged and skipped. The response list is created up front (sized to
// the number of requests) when it does not exist yet.
//
// Stops at, and returns, the first failure of msmHandleTaskMgmtReq().
int32_t msmHandleStreamRequests(SStmGrpCtx* pCtx) {
  int32_t            code = TSDB_CODE_SUCCESS;
  int32_t            lino = 0;
  SStreamHbMsg*      pHb = pCtx->pReq;
  SStmTaskStatusMsg* pTask = NULL;
  int32_t            reqNum = taosArrayGetSize(pHb->pStreamReq);

  if (NULL == pCtx->pRsp->rsps.rspList && reqNum > 0) {
    pCtx->pRsp->rsps.rspList = taosArrayInit(reqNum, sizeof(SStreamMgmtRsp));
    TSDB_CHECK_NULL(pCtx->pRsp->rsps.rspList, code, lino, _exit, terrno);
  }

  for (int32_t n = 0; n < reqNum; ++n) {
    int32_t idx = *(int32_t*)taosArrayGet(pHb->pStreamReq, n);

    pTask = (SStmTaskStatusMsg*)taosArrayGet(pHb->pStreamStatus, idx);
    if (NULL == pTask) {
      mstError("idx %d is NULL, reqNum:%d", idx, reqNum);
    } else if (NULL == pTask->pMgmtReq) {
      msttError("idx %d without mgmtReq", idx);
    } else {
      TAOS_CHECK_EXIT(msmHandleTaskMgmtReq(pCtx, pTask));
    }
  }

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
4190

4191
// Heartbeat handler used while the stream manager is in the normal state.
//
// Steps, in order (the first failing step aborts the rest):
//   1. refresh the dnode timestamp and, if the message carries one, the snode timestamp;
//   2. drain queued stream actions when any are pending and the action-queue latch can
//      be taken without waiting;
//   3. serve management requests carried by the heartbeat under the action-queue latch;
//   4. add pending vgroup task deployments, or otherwise refresh vgroup timestamps;
//   5. add pending snode task deployments when the message carries an snode;
//   6. emit stream deployments collected in pCtx->deployStm;
//   7. process reported task statuses;
//   8. run post actions collected in pCtx->actionStm.
int32_t msmNormalHandleHbMsg(SStmGrpCtx* pCtx) {
  int32_t       code = TSDB_CODE_SUCCESS;
  int32_t       lino = 0;
  SStreamHbMsg* pHb = pCtx->pReq;

  TAOS_CHECK_EXIT(msmCheckUpdateDnodeTs(pCtx));
  if (GOT_SNODE(pHb->snodeId)) {
    TAOS_CHECK_EXIT(msmUpdateSnodeUpTs(pCtx));
  }

  if (atomic_load_64(&mStreamMgmt.actionQ->qRemainNum) > 0) {
    // Best effort: skip this round if another handler holds the latch.
    if (0 == taosWTryLockLatch(&mStreamMgmt.actionQLock)) {
      msmHandleStreamActions(pCtx);
      taosWUnLockLatch(&mStreamMgmt.actionQLock);
    }
  }

  if (taosArrayGetSize(pHb->pStreamReq) > 0) {
    if (mstWaitLock(&mStreamMgmt.actionQLock, false)) {
      code = msmHandleStreamRequests(pCtx);
      // Release the latch before acting on the result.
      taosWUnLockLatch(&mStreamMgmt.actionQLock);
      TAOS_CHECK_EXIT(code);
    }
  }

  if (atomic_load_32(&mStreamMgmt.toDeployVgTaskNum) > 0) {
    TAOS_CHECK_EXIT(msmGrpAddDeployVgTasks(pCtx));
  } else {
    TAOS_CHECK_EXIT(msmUpdateVgroupsUpTs(pCtx));
  }

  if (atomic_load_32(&mStreamMgmt.toDeploySnodeTaskNum) > 0 && GOT_SNODE(pHb->snodeId)) {
    TAOS_CHECK_EXIT(msmGrpAddDeploySnodeTasks(pCtx));
  }

  if (taosHashGetSize(pCtx->deployStm) > 0) {
    TAOS_CHECK_EXIT(msmRspAddStreamsDeploy(pCtx));
  }

  if (taosArrayGetSize(pHb->pStreamStatus) > 0) {
    TAOS_CHECK_EXIT(msmNormalHandleStatusUpdate(pCtx));
  }

  if (taosHashGetSize(pCtx->actionStm) > 0) {
    TAOS_CHECK_EXIT(msmHandleHbPostActions(pCtx));
  }

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
4242

4243
// Fill `pMsg` with the heartbeat response.
//
// code     - result of handling the heartbeat; when it is not TSDB_CODE_SUCCESS no
//            body is encoded and only the code and rpc info are written
// pRpcInfo - rpc handle info copied into pMsg->info
// pRsp     - response to serialize
// pMsg     - output message
//
// On success the body is an SStreamMsgGrpHeader (streamGid taken from pRsp) followed
// by the encoded response, allocated with rpcMallocCont(). Sizing or encoding failures
// are reported as TSDB_CODE_MND_STREAM_INTERNAL_ERROR and an allocation failure as
// terrno; in those cases pMsg->pCont / pMsg->contLen are left untouched.
void msmEncodeStreamHbRsp(int32_t code, SRpcHandleInfo *pRpcInfo, SMStreamHbRspMsg* pRsp, SRpcMsg* pMsg) {
  int32_t lino = 0;
  int32_t tlen = 0;
  void   *buf = NULL;

  if (TSDB_CODE_SUCCESS == code) {
    // First pass: compute the encoded size.
    tEncodeSize(tEncodeStreamHbRsp, pRsp, tlen, code);
    if (code < 0) {
      mstError("encode stream hb msg rsp failed, code:%s", tstrerror(code));
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
    }

    buf = rpcMallocCont(tlen + sizeof(SStreamMsgGrpHeader));
    if (NULL == buf) {
      mstError("encode stream hb msg rsp failed, code:%s", tstrerror(terrno));
      TAOS_CHECK_EXIT(terrno);
    }

    ((SStreamMsgGrpHeader *)buf)->streamGid = pRsp->streamGId;
    void *payload = POINTER_SHIFT(buf, sizeof(SStreamMsgGrpHeader));

    // Second pass: encode right after the group header.
    SEncoder encoder;
    tEncoderInit(&encoder, payload, tlen);
    code = tEncodeStreamHbRsp(&encoder, pRsp);
    if (code < 0) {
      rpcFreeCont(buf);
      buf = NULL;
      tEncoderClear(&encoder);
      mstError("encode stream hb msg rsp failed, code:%s", tstrerror(code));
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
    }
    tEncoderClear(&encoder);
  }

_exit:

  pMsg->code = code;
  pMsg->info = *pRpcInfo;
  if (TSDB_CODE_SUCCESS == code) {
    pMsg->contLen = tlen + sizeof(SStreamMsgGrpHeader);
    pMsg->pCont = buf;
  }
}
34,163✔
4287

4288

4289
// Entry point for a stream heartbeat received by the mnode.
//
// pMnode  - mnode instance
// currTs  - timestamp to associate with this heartbeat
// pHb     - decoded heartbeat
// pReq    - original rpc request; its `info` is echoed into the response
// pRspMsg - response message to fill
//
// The runtime latch is held (acquired via mstWaitLock(..., true), released with
// taosRUnLockLatch) while the heartbeat is handled. When stream management is not
// active the heartbeat is ignored and an empty successful response is produced.
// Otherwise the per-group context is prepared and the message is dispatched to the
// watch or normal handler depending on mStreamMgmt.state.
//
// The response is always encoded and the per-group context / to-deploy maps are always
// cleaned, whatever the outcome. Returns the handler's result code.
int32_t msmHandleStreamHbMsg(SMnode* pMnode, int64_t currTs, SStreamHbMsg* pHb, SRpcMsg *pReq, SRpcMsg* pRspMsg) {
  int32_t          code = TSDB_CODE_SUCCESS;
  SMStreamHbRspMsg rsp = {0};
  rsp.streamGId = pHb->streamGId;

  (void)mstWaitLock(&mStreamMgmt.runtimeLock, true);

  if (0 == atomic_load_8(&mStreamMgmt.active)) {
    mstWarn("mnode stream become NOT active, ignore stream hb from dnode %d streamGid %d", pHb->dnodeId, pHb->streamGId);
  } else {
    int32_t     tidx = streamGetThreadIdx(mStreamMgmt.threadNum, pHb->streamGId);
    SStmGrpCtx* pCtx = &mStreamMgmt.tCtx[tidx].grpCtx[pHb->streamGId];

    pCtx->tidx = tidx;
    pCtx->pMnode = pMnode;
    pCtx->currTs = currTs;
    pCtx->pReq = pHb;
    pCtx->pRsp = &rsp;
    pCtx->deployStm = mStreamMgmt.tCtx[pCtx->tidx].deployStm[pHb->streamGId];
    pCtx->actionStm = mStreamMgmt.tCtx[pCtx->tidx].actionStm[pHb->streamGId];

    switch (atomic_load_8(&mStreamMgmt.state)) {
      case MND_STM_STATE_WATCH:
        code = msmWatchHandleHbMsg(pCtx);
        break;
      case MND_STM_STATE_NORMAL:
        code = msmNormalHandleHbMsg(pCtx);
        break;
      default:
        mstError("Invalid stream state: %d", mStreamMgmt.state);
        code = TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
        break;
    }
  }

  msmEncodeStreamHbRsp(code, &pReq->info, &rsp, pRspMsg);

  msmCleanStreamGrpCtx(pHb);
  msmClearStreamToDeployMaps(pHb);

  taosRUnLockLatch(&mStreamMgmt.runtimeLock);

  // rsp has been serialized into pRspMsg; its own content can be released now.
  tFreeSMStreamHbRspMsg(&rsp);

  return code;
}
4338

4339
// Called when this mnode becomes leader.
// Does nothing when streams are disabled (tsDisableStream). Otherwise the mnode is
// registered through streamAddVnodeLeader(MNODE_HANDLE), the runtime info is torn down
// and rebuilt under the runtime write latch, and stream management is marked active
// only if the rebuild succeeded.
void msmHandleBecomeLeader(SMnode *pMnode) {
  if (tsDisableStream) {
    return;
  }

  mstInfo("start to process mnode become leader");

  streamAddVnodeLeader(MNODE_HANDLE);

  taosWLockLatch(&mStreamMgmt.runtimeLock);
  msmDestroyRuntimeInfo(pMnode);
  int32_t initCode = msmInitRuntimeInfo(pMnode);
  taosWUnLockLatch(&mStreamMgmt.runtimeLock);

  if (TSDB_CODE_SUCCESS == initCode) {
    atomic_store_8(&mStreamMgmt.active, 1);
  }

  mstInfo("mnode stream mgmt active:%d", atomic_load_8(&mStreamMgmt.active));
}
4360

4361
// Called when this mnode stops being leader.
// Does nothing when streams are disabled (tsDisableStream). Otherwise the mnode is
// unregistered through streamRemoveVnodeLeader(MNODE_HANDLE). Only the caller that
// flips `active` from 1 to 0 destroys the runtime info (under the runtime write latch)
// and bumps the inactive counter.
void msmHandleBecomeNotLeader(SMnode *pMnode) {
  if (tsDisableStream) {
    return;
  }

  mstInfo("start to process mnode become not leader");

  streamRemoveVnodeLeader(MNODE_HANDLE);

  if (0 == atomic_val_compare_exchange_8(&mStreamMgmt.active, 1, 0)) {
    // Stream management was not active; nothing to tear down.
    return;
  }

  taosWLockLatch(&mStreamMgmt.runtimeLock);
  msmDestroyRuntimeInfo(pMnode);
  mStreamMgmt.stat.inactiveTimes++;
  taosWUnLockLatch(&mStreamMgmt.runtimeLock);
}
4377

4378

4379
// Clear the `stopped` flag of a stream (only when it is exactly 1) and queue a
// STREAM_ACT_DEPLOY action for it. If the flag holds any other value, the change is
// only logged and nothing is queued.
static void msmRedeployStream(int64_t streamId, SStmStatus* pStatus) {
  if (1 != atomic_val_compare_exchange_8(&pStatus->stopped, 1, 0)) {
    mstsWarn("stream stopped %d already changed", atomic_load_8(&pStatus->stopped));
    return;
  }

  mstsInfo("try to reset and redeploy stream, deployTimes:%" PRId64, pStatus->deployTimes);
  mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStatus->streamName, NULL, false, STREAM_ACT_DEPLOY);
}
×
4387

4388
// sdbTraverse() callback: collect the streams whose main snode is the given snode.
//
// pObj - SStreamObj being visited
// p1   - SSnodeObj to match against pStream->mainSnodeId
// p2   - SArray** result; created on first match (capacity = number of streams in
//        the sdb) and filled with SStreamObj pointers
// p3   - int32_t* that receives the error code when an allocation/push fails
//
// Returns true after a successful visit, false after an error.
static bool msmCheckStreamAssign(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  SStreamObj* pStream = pObj;
  SSnodeObj*  pSnode = p1;
  SArray**    ppRes = p2;

  if (pStream->mainSnodeId != pSnode->id) {
    return true;
  }

  if (NULL == *ppRes) {
    int32_t streamNum = sdbGetSize(pMnode->pSdb, SDB_STREAM);
    *ppRes = taosArrayInit(streamNum, POINTER_BYTES);
    TSDB_CHECK_NULL(*ppRes, code, lino, _exit, terrno);
  }

  TSDB_CHECK_NULL(taosArrayPush(*ppRes, &pStream), code, lino, _exit, terrno);

  return true;

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  *(int32_t*)p3 = code;

  return false;
}
4417

4418

4419
// Collect into *ppRes the streams whose main snode is `pSnode`.
// Fails with TSDB_CODE_MND_STREAM_SNODE_IN_USE when at least one stream is assigned
// and the snode has no replica (replicaId == 0). Traversal errors are returned as-is.
// *ppRes may have been allocated even when an error is returned.
int32_t msmCheckSnodeReassign(SMnode *pMnode, SSnodeObj* pSnode, SArray** ppRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  sdbTraverse(pMnode->pSdb, SDB_STREAM, msmCheckStreamAssign, pSnode, ppRes, &code);
  TAOS_CHECK_EXIT(code);

  int32_t assignedNum = taosArrayGetSize(*ppRes);
  if (0 == pSnode->replicaId && assignedNum > 0) {
    mstError("snode %d has no replica while %d streams assigned", pSnode->id, assignedNum);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_SNODE_IN_USE);
  }

  //STREAMTODO CHECK REPLICA UPDATED OR NOT

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
4442

4443
// sdbTraverse() callback for the periodic sdb check; always returns true so the
// traversal continues.
//
// Stream with a runtime status in streamMap:
//   - skipped while it has not passed the isolation time;
//   - otherwise removed from the runtime maps when the user dropped/stopped it or its
//     status is user-stopped.
// Stream without a runtime status:
//   - ignored when the user dropped/stopped it;
//   - skipped while it has not passed the static isolation time;
//   - otherwise a STREAM_ACT_DEPLOY action is queued for it.
static bool msmCheckLoopStreamSdb(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SStreamObj*         pStream = pObj;
  int64_t             streamId = pStream->pCreate->streamId;
  SStmStatus*         pStatus = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  SStmCheckStatusCtx* pCtx = (SStmCheckStatusCtx*)p1;
  int8_t              userDropped = atomic_load_8(&pStream->userDropped);
  int8_t              userStopped = atomic_load_8(&pStream->userStopped);

  if (NULL != pStatus) {
    if (!MST_STM_PASS_ISOLATION(pStream, pStatus)) {
      mstsDebug("stream not pass isolation time, updateTime:%" PRId64 ", lastActionTs:%" PRId64 ", currentTs %" PRId64 ", ignore check it", 
          pStream->updateTime, pStatus->lastActionTs, mStreamMgmt.hCtx.currentTs);
      return true;
    }

    if (userDropped || userStopped || MST_IS_USER_STOPPED(atomic_load_8(&pStatus->stopped))) {
      (void)msmRemoveStreamFromMaps(pMnode, streamId);
    }

    return true;
  }

  if (userDropped || userStopped) {
    mstsDebug("stream userDropped %d userStopped %d and not in streamMap, ignore it", userDropped, userStopped);
    return true;
  }

  if (!MST_STM_STATIC_PASS_ISOLATION(pStream)) {
    mstsDebug("stream not pass static isolation time, updateTime:%" PRId64 ", currentTs %" PRId64 ", ignore check it", 
        pStream->updateTime, mStreamMgmt.hCtx.currentTs);
    return true;
  }

  mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStream->pCreate->name, NULL, false, STREAM_ACT_DEPLOY);

  return true;
}
4479

4480
// Periodic check over every runtime stream status in mStreamMgmt.streamMap:
//   - user-stopped streams, and streams that no longer exist in the sdb, are removed
//     from the runtime maps;
//   - error-stopped streams are re-queued for deployment once their retry time
//     (fatalRetryTs) has been reached; until then only the STM_EVENT_STM_TERR
//     timestamp is refreshed;
//   - grant-stopped streams are re-queued when grantCheckExpire() reports success.
void msmCheckLoopStreamMap(SMnode *pMnode) {
  void* pIter = NULL;

  while (NULL != (pIter = taosHashIterate(mStreamMgmt.streamMap, pIter))) {
    SStmStatus* pStatus = (SStmStatus*)pIter;
    int64_t     streamId = *(int64_t*)taosHashGetKey(pIter, NULL);
    int8_t      stopped = atomic_load_8(&pStatus->stopped);

    if (MST_IS_USER_STOPPED(stopped)) {
      mstsDebug("stream already stopped by user, deployTimes:%" PRId64, pStatus->deployTimes);
      (void)msmRemoveStreamFromMaps(pMnode, streamId);
      continue;
    }

    if (!sdbCheckExists(pMnode->pSdb, SDB_STREAM, pStatus->streamName)) {
      mstsDebug("stream already not exists, deployTimes:%" PRId64, pStatus->deployTimes);
      (void)msmRemoveStreamFromMaps(pMnode, streamId);
      continue;
    }

    if (MST_IS_ERROR_STOPPED(stopped)) {
      if (mStreamMgmt.hCtx.currentTs >= pStatus->fatalRetryTs) {
        mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStatus->streamName, NULL, false, STREAM_ACT_DEPLOY);
        continue;
      }

      mstsDebug("stream already stopped by error %s, retried times:%" PRId64 ", next time not reached, currTs:%" PRId64 ", nextRetryTs:%" PRId64,
          tstrerror(pStatus->fatalError), pStatus->fatalRetryTimes, mStreamMgmt.hCtx.currentTs, pStatus->fatalRetryTs);

      MND_STREAM_SET_LAST_TS(STM_EVENT_STM_TERR, mStreamMgmt.hCtx.currentTs);
      continue;
    }

    if (MST_IS_GRANT_STOPPED(stopped) && TSDB_CODE_SUCCESS == grantCheckExpire(TSDB_GRANT_STREAMS)) {
      mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStatus->streamName, NULL, false, STREAM_ACT_DEPLOY);
    }
  }
}
97✔
4527

4528
// Run the two periodic stream checks when they are due:
//   - the sdb loop (msmCheckLoopStreamSdb over every SDB_STREAM object), gated by
//     MST_READY_FOR_SDB_LOOP();
//   - the runtime map loop (msmCheckLoopStreamMap), gated by MST_READY_FOR_MAP_LOOP().
// After each loop its "last run" timestamp is set to the current health-check time.
void msmCheckStreamsStatus(SMnode *pMnode) {
  SStmCheckStatusCtx checkCtx = {0};

  mstDebug("start to check streams status, currTs:%" PRId64, mStreamMgmt.hCtx.currentTs);

  if (MST_READY_FOR_SDB_LOOP()) {
    mstDebug("ready to check sdb loop, lastLoopSdbTs:%" PRId64, mStreamMgmt.lastTs[STM_EVENT_LOOP_SDB].ts);
    sdbTraverse(pMnode->pSdb, SDB_STREAM, msmCheckLoopStreamSdb, &checkCtx, NULL, NULL);
    MND_STREAM_SET_LAST_TS(STM_EVENT_LOOP_SDB, mStreamMgmt.hCtx.currentTs);
  }

  if (MST_READY_FOR_MAP_LOOP()) {
    mstDebug("ready to check map loop, lastLoopMapTs:%" PRId64, mStreamMgmt.lastTs[STM_EVENT_LOOP_MAP].ts);
    msmCheckLoopStreamMap(pMnode);
    MND_STREAM_SET_LAST_TS(STM_EVENT_LOOP_MAP, mStreamMgmt.hCtx.currentTs);
  }
}
483✔
4545

4546
// Check a list of task-status pointers of one stream for tasks whose status has not
// been refreshed for longer than the isolation time.
//
// Tasks of a stopped stream, and tasks still inside the isolation window, are skipped.
// For a stale runner or trigger task the whole stream is stopped with
// TSDB_CODE_MND_STREAM_TASK_LOST and the scan ends. For any other stale task the
// task's serious id is incremented and a STREAM_ACT_DEPLOY task action is queued.
//
// streamId - stream the tasks belong to
// pList    - array of `taskNum` SStmTaskStatus pointers
// taskNum  - number of entries in pList
void msmCheckTaskListStatus(int64_t streamId, SStmTaskStatus** pList, int32_t taskNum) {
  for (int32_t i = 0; i < taskNum; ++i) {
    SStmTaskStatus* pTask = pList[i];

    if (atomic_load_8(&((SStmStatus*)pTask->pStream)->stopped) || !MST_PASS_ISOLATION(pTask->lastUpTs, 1)) {
      continue;
    }

    int64_t silentMs = mStreamMgmt.hCtx.currentTs - pTask->lastUpTs;

    if (STREAM_RUNNER_TASK == pTask->type || STREAM_TRIGGER_TASK == pTask->type) {
      mstsWarn("%s TASK:%" PRIx64 " status not updated for %" PRId64 "ms, will try to redeploy it", 
          gStreamTaskTypeStr[pTask->type], pTask->id.taskId, silentMs);

      msmStopStreamByError(streamId, NULL, TSDB_CODE_MND_STREAM_TASK_LOST, mStreamMgmt.hCtx.currentTs);
      break;
    }

    mstsInfo("%s TASK:%" PRIx64 " status not updated for %" PRId64 "ms, will try to redeploy it", 
        gStreamTaskTypeStr[pTask->type], pTask->id.taskId, silentMs);

    int64_t newSid = atomic_add_fetch_64(&pTask->id.seriousId, 1);
    mstsDebug("task %" PRIx64 " SID updated to %" PRIx64, pTask->id.taskId, newSid);

    // Snapshot the task identity after the serious id bump so the action carries it.
    SStmTaskAction redeploy = {0};
    redeploy.streamId = streamId;
    redeploy.id = pTask->id;
    redeploy.flag = pTask->flags;
    redeploy.type = pTask->type;

    mstPostTaskAction(mStreamMgmt.actionQ, &redeploy, STREAM_ACT_DEPLOY);
  }
}
7,114✔
4582

4583
// For every stream entry of a vgroup (hash keyed by stream id, values are
// SStmVgStreamStatus), run the stale-task check on its trigger readers and then on its
// calc readers. Empty reader arrays are skipped.
void msmCheckVgroupStreamStatus(SHashObj* pStreams) {
  void* pIter = NULL;

  while (NULL != (pIter = taosHashIterate(pStreams, pIter))) {
    int64_t             streamId = *(int64_t*)taosHashGetKey(pIter, NULL);
    SStmVgStreamStatus* pVgStream = (SStmVgStreamStatus*)pIter;

    int32_t trigNum = taosArrayGetSize(pVgStream->trigReaders);
    if (trigNum > 0) {
      msmCheckTaskListStatus(streamId, taosArrayGet(pVgStream->trigReaders, 0), trigNum);
    }

    int32_t calcNum = taosArrayGetSize(pVgStream->calcReaders);
    if (calcNum > 0) {
      msmCheckTaskListStatus(streamId, taosArrayGet(pVgStream->calcReaders, 0), calcNum);
    }
  }
}
604✔
4608

4609
// React to a vgroup that stopped reporting.
// While the vgroup has not yet passed MST_PASS_ISOLATION(lastUpTs, 5) it is only
// logged. After that, every stream that has tasks on the vgroup is stopped with
// TSDB_CODE_MND_STREAM_VGROUP_LOST and the vgroup's stream-task table is cleared.
void msmHandleVgroupLost(SMnode *pMnode, int32_t vgId, SStmVgroupStatus* pVg) {
  if (!MST_PASS_ISOLATION(pVg->lastUpTs, 5)) {
    mstDebug("vgroup %d lost and still in watch time, lastUpTs:%" PRId64 ", streamNum:%d", vgId, pVg->lastUpTs, (int32_t)taosHashGetSize(pVg->streamTasks));
    return;
  }

  void* pIter = NULL;
  while (NULL != (pIter = taosHashIterate(pVg->streamTasks, pIter))) {
    int64_t streamId = *(int64_t*)taosHashGetKey(pIter, NULL);

    msmStopStreamByError(streamId, NULL, TSDB_CODE_MND_STREAM_VGROUP_LOST, mStreamMgmt.hCtx.currentTs);
  }

  taosHashClear(pVg->streamTasks);
}
4633

4634

4635
// Health check over mStreamMgmt.vgroupMap. Only the vgroups whose id falls into the
// current slot (vgId % MND_STREAM_ISOLATION_PERIOD_NUM == hCtx.slotIdx) are examined
// in one call. A vgroup whose last update passed the isolation time is treated as
// lost; otherwise the tasks it hosts are checked for staleness.
void msmCheckVgroupStatus(SMnode *pMnode) {
  void* pIter = NULL;

  while (NULL != (pIter = taosHashIterate(mStreamMgmt.vgroupMap, pIter))) {
    int32_t vgId = *(int32_t*)taosHashGetKey(pIter, NULL);
    if (mStreamMgmt.hCtx.slotIdx != (vgId % MND_STREAM_ISOLATION_PERIOD_NUM)) {
      continue;
    }

    SStmVgroupStatus* pVg = (SStmVgroupStatus*)pIter;

    if (!MST_PASS_ISOLATION(pVg->lastUpTs, 1)) {
      mstDebug("vgroup %d online, try to check tasks status, currTs:%" PRId64 ", lastUpTs:%" PRId64, vgId, mStreamMgmt.hCtx.currentTs, pVg->lastUpTs);

      msmCheckVgroupStreamStatus(pVg->streamTasks);
      continue;
    }

    mstWarn("vgroup %d lost, lastUpTs:%" PRId64 ", streamNum:%d", vgId, pVg->lastUpTs, (int32_t)taosHashGetSize(pVg->streamTasks));

    msmHandleVgroupLost(pMnode, vgId, pVg);
  }
}
483✔
4663

4664
// Prepare the runner deploys of one stream on a lost snode for redeployment.
//
// For every populated runner deploy slot the serious id of each runner task is
// incremented and the slot index is appended to `deployId`; `*deployNum` receives the
// number of slots recorded. If any task belongs to a stream that is already stopped,
// the function returns at once with `*deployNum` reset to 0.
//
// deployId must have room for MND_STREAM_RUNNER_DEPLOY_NUM entries.
void msmHandleRunnerRedeploy(int64_t streamId, SStmSnodeStreamStatus* pStream, int32_t* deployNum, int32_t* deployId) {
  *deployNum = 0;

  for (int32_t slot = 0; slot < MND_STREAM_RUNNER_DEPLOY_NUM; ++slot) {
    if (NULL == pStream->runners[slot]) {
      continue;
    }

    int32_t taskNum = taosArrayGetSize(pStream->runners[slot]);
    for (int32_t t = 0; t < taskNum; ++t) {
      SStmTaskStatus* pTask = taosArrayGetP(pStream->runners[slot], t);
      int8_t          stopped = atomic_load_8(&((SStmStatus*)pTask->pStream)->stopped);
      if (stopped) {
        mstsDebug("stream already stopped %d, ignore it", stopped);
        *deployNum = 0;
        return;
      }

      int64_t newSid = atomic_add_fetch_64(&pTask->id.seriousId, 1);
      mstsDebug("task %" PRIx64 " SID updated to %" PRIx64, pTask->id.taskId, newSid);
    }

    deployId[(*deployNum)++] = slot;
  }
}
4688

4689
/*
 * React to a lost snode.
 *
 * Marks the snode with runnerThreadNum = -1, refreshes the snode map through
 * msmSTAddSnodesToMap (its result is deliberately ignored), then walks every
 * stream recorded on the snode:
 *   - a stream with a trigger task on this snode is stopped with
 *     TSDB_CODE_MND_STREAM_SNODE_LOST, unless it is already stopped;
 *   - a stream with only runner tasks gets a STREAM_ACT_DEPLOY action posted
 *     for the slots reported by msmHandleRunnerRedeploy.
 * Finally the snode's streamTasks hash is cleared.
 */
void msmHandleSnodeLost(SMnode *pMnode, SStmSnodeStatus* pSnode) {
  pSnode->runnerThreadNum = -1;

  (void)msmSTAddSnodesToMap(pMnode);

  int64_t        streamId = 0;  // referenced implicitly by the msts* log macros
  SStmTaskAction task = {0};    // reused across iterations, exactly one instance

  for (void* pIter = taosHashIterate(pSnode->streamTasks, NULL); NULL != pIter;
       pIter = taosHashIterate(pSnode->streamTasks, pIter)) {
    SStmSnodeStreamStatus* pStream = (SStmSnodeStreamStatus*)pIter;

    streamId = *(int64_t*)taosHashGetKey(pIter, NULL);
    task.streamId = streamId;

    if (NULL == pStream->trigger) {
      // Only runner tasks live on this snode: redeploy them.
      msmHandleRunnerRedeploy(streamId, pStream, &task.deployNum, task.deployId);
      if (task.deployNum <= 0) {
        continue;
      }

      //task.triggerStatus = pStream->trigger;
      task.multiRunner = true;
      task.type = STREAM_RUNNER_TASK;

      mstPostTaskAction(mStreamMgmt.actionQ, &task, STREAM_ACT_DEPLOY);

      mstsInfo("runner tasks %d redeploys added to actionQ", task.deployNum);
      continue;
    }

    int8_t stopped = atomic_load_8(&((SStmStatus*)pStream->trigger->pStream)->stopped);
    if (stopped) {
      mstsDebug("stream already stopped %d, ignore it", stopped);
      continue;
    }

    mstsInfo("snode lost with trigger task %" PRIx64 ", will try to restart current stream", pStream->trigger->id.taskId);

    msmStopStreamByError(streamId, NULL, TSDB_CODE_MND_STREAM_SNODE_LOST, mStreamMgmt.hCtx.currentTs);
  }

  taosHashClear(pSnode->streamTasks);
}
38✔
4738

4739

4740
/*
 * Check the status of every task recorded for an online snode.
 *
 * pStreams maps streamId -> SStmSnodeStreamStatus. For each entry the trigger
 * task (when present) and each non-empty runner list are handed to
 * msmCheckTaskListStatus.
 */
void msmCheckSnodeStreamStatus(SHashObj* pStreams) {
  for (void* pIter = taosHashIterate(pStreams, NULL); NULL != pIter; pIter = taosHashIterate(pStreams, pIter)) {
    int64_t                streamId = *(int64_t*)taosHashGetKey(pIter, NULL);
    SStmSnodeStreamStatus* pEntry = (SStmSnodeStreamStatus*)pIter;

    if (pEntry->trigger != NULL) {
      // The single trigger pointer is passed as a one-element list.
      msmCheckTaskListStatus(streamId, &pEntry->trigger, 1);
    }

    for (int32_t slot = 0; slot < MND_STREAM_RUNNER_DEPLOY_NUM; ++slot) {
      int32_t runnerNum = taosArrayGetSize(pEntry->runners[slot]);
      if (runnerNum <= 0) {
        continue;
      }

      msmCheckTaskListStatus(streamId, taosArrayGet(pEntry->runners[slot], 0), runnerNum);
    }
  }
}
233✔
4766

4767

4768
/*
 * Health-check the snodes that belong to the current isolation slot.
 *
 * Only snodes whose id modulo MND_STREAM_ISOLATION_PERIOD_NUM equals
 * hCtx.slotIdx are examined in this round. A snode without a streamTasks hash
 * is skipped; one whose lastUpTs passes MST_PASS_ISOLATION(…, 1) is treated as
 * lost; every other snode has its tasks checked.
 */
void msmCheckSnodeStatus(SMnode *pMnode) {
  for (void* pIter = taosHashIterate(mStreamMgmt.snodeMap, NULL); NULL != pIter;
       pIter = taosHashIterate(mStreamMgmt.snodeMap, pIter)) {
    int32_t snodeId = *(int32_t*)taosHashGetKey(pIter, NULL);
    if (mStreamMgmt.hCtx.slotIdx != (snodeId % MND_STREAM_ISOLATION_PERIOD_NUM)) {
      continue;  // handled in another slot's round
    }

    mstDebug("start to check snode %d status, currTs:%" PRId64, snodeId, mStreamMgmt.hCtx.currentTs);

    SStmSnodeStatus* pSnode = (SStmSnodeStatus*)pIter;
    if (!pSnode->streamTasks) {
      mstDebug("ignore snode %d health check since empty tasks", snodeId);
      continue;
    }

    if (!MST_PASS_ISOLATION(pSnode->lastUpTs, 1)) {
      mstDebug("snode %d online, try to check tasks status, currTs:%" PRId64 ", lastUpTs:%" PRId64, snodeId, mStreamMgmt.hCtx.currentTs, pSnode->lastUpTs);

      msmCheckSnodeStreamStatus(pSnode->streamTasks);
      continue;
    }

    mstInfo("snode %d lost, lastUpTs:%" PRId64 ", runnerThreadNum:%d, streamNum:%d", 
        snodeId, pSnode->lastUpTs, pSnode->runnerThreadNum, (int32_t)taosHashGetSize(pSnode->streamTasks));

    msmHandleSnodeLost(pMnode, pSnode);
  }
}
483✔
4803

4804

4805
/*
 * Run the task-level part of a health-check round: vgroup-hosted tasks first,
 * then snode-hosted tasks.
 */
void msmCheckTasksStatus(SMnode *pMnode) {
  mstDebug("start to check tasks status, currTs:%" PRId64, mStreamMgmt.hCtx.currentTs);

  msmCheckVgroupStatus(pMnode);  // tasks running on vgroups
  msmCheckSnodeStatus(pMnode);   // tasks running on snodes
}
483✔
4811

4812
/*
 * Reconcile the runtime snode map with the snodes recorded in the sdb.
 *
 * Runs only when MST_READY_FOR_SNODE_LOOP() allows it. For every snode in
 * mStreamMgmt.snodeMap that no longer exists in the sdb:
 *   - if it has no streamTasks hash it is removed from the map;
 *   - otherwise it is handled as a lost snode via msmHandleSnodeLost.
 */
void msmCheckSnodesState(SMnode *pMnode) {
  if (!MST_READY_FOR_SNODE_LOOP()) {
    return;
  }

  mstDebug("ready to check snode loop, lastTs:%" PRId64, mStreamMgmt.lastTs[STM_EVENT_LOOP_SNODE].ts);

  int32_t snodeId = 0;
  for (void* pIter = taosHashIterate(mStreamMgmt.snodeMap, NULL); NULL != pIter;
       pIter = taosHashIterate(mStreamMgmt.snodeMap, pIter)) {
    snodeId = *(int32_t*)taosHashGetKey(pIter, NULL);
    if (sdbCheckExists(pMnode->pSdb, SDB_SNODE, &snodeId)) {
      continue;  // snode still registered, nothing to reconcile
    }

    SStmSnodeStatus* pSnode = (SStmSnodeStatus*)pIter;
    if (pSnode->streamTasks != NULL) {
      mstWarn("snode %d lost while streams remain, will redeploy all and rm it, lastUpTs:%" PRId64 ", runnerThreadNum:%d, streamNum:%d", 
          snodeId, pSnode->lastUpTs, pSnode->runnerThreadNum, (int32_t)taosHashGetSize(pSnode->streamTasks));

      msmHandleSnodeLost(pMnode, pSnode);
      continue;
    }

    mstDebug("snode %d already cleanup, try to rm it", snodeId);
    TAOS_UNUSED(taosHashRemove(mStreamMgmt.snodeMap, &snodeId, sizeof(snodeId)));
  }

  // NOTE(review): the debug log above reads STM_EVENT_LOOP_SNODE, but the
  // timestamp recorded here is STM_EVENT_LOOP_MAP — confirm this is intended.
  MND_STREAM_SET_LAST_TS(STM_EVENT_LOOP_MAP, mStreamMgmt.hCtx.currentTs);
}
4847

4848
/*
 * Decide whether a health-check round should run.
 *
 * Returns false when stream management is not active, when its state is not
 * MND_STM_STATE_NORMAL, or when the sdb holds no stream; true otherwise.
 */
bool msmCheckNeedHealthCheck(SMnode *pMnode) {
  int8_t active = atomic_load_8(&mStreamMgmt.active);
  int8_t state = atomic_load_8(&mStreamMgmt.state);

  bool runnable = (0 != active) && (MND_STM_STATE_NORMAL == state);
  if (!runnable) {
    mstTrace("ignore health check since active:%d state:%d", active, state);
    return false;
  }

  if (sdbGetSize(pMnode->pSdb, SDB_STREAM) > 0) {
    return true;
  }

  mstTrace("ignore health check since no stream now");
  return false;
}
4862

4863
/*
 * Run one health-check round.
 *
 * The need for a check is evaluated once before waiting for runtimeLock and
 * once more after it is acquired, since the state may change while waiting.
 * Each round advances hCtx.slotIdx by one (modulo
 * MND_STREAM_ISOLATION_PERIOD_NUM), refreshes hCtx.currentTs, and then checks
 * streams, tasks and snodes in that order. The lock is released with
 * taosWUnLockLatch.
 */
void msmHealthCheck(SMnode *pMnode) {
  if (!msmCheckNeedHealthCheck(pMnode)) {
    return;
  }

  mstDebug("start wait health check, currentTs:%" PRId64,  taosGetTimestampMs());

  (void)mstWaitLock(&mStreamMgmt.runtimeLock, false);

  bool stillNeeded = msmCheckNeedHealthCheck(pMnode);
  if (stillNeeded) {
    mStreamMgmt.hCtx.slotIdx = (mStreamMgmt.hCtx.slotIdx + 1) % MND_STREAM_ISOLATION_PERIOD_NUM;
    mStreamMgmt.hCtx.currentTs = taosGetTimestampMs();

    mstDebug("start health check, soltIdx:%d, checkStartTs:%" PRId64, mStreamMgmt.hCtx.slotIdx, mStreamMgmt.hCtx.currentTs);

    msmCheckStreamsStatus(pMnode);
    msmCheckTasksStatus(pMnode);
    msmCheckSnodesState(pMnode);
  }

  taosWUnLockLatch(&mStreamMgmt.runtimeLock);

  if (stillNeeded) {
    mstDebug("end health check, soltIdx:%d, checkStartTs:%" PRId64, mStreamMgmt.hCtx.slotIdx, mStreamMgmt.hCtx.currentTs);
  }
}
4889

4890
/*
 * Traversal callback applied to one stream object.
 *
 * Streams flagged userDropped or userStopped are left untouched. For any other
 * stream, updateTime is set to the timestamp behind p1 and the counter behind
 * p2 is incremented.
 *
 * pObj - the SStreamObj being visited
 * p1   - pointer to an int64_t timestamp
 * p2   - pointer to an int32_t counter of streams that were updated
 * p3   - unused
 *
 * Always returns true.
 */
static bool msmUpdateProfileStreams(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SStreamObj *pStream = pObj;

  bool inactive = atomic_load_8(&pStream->userDropped) || atomic_load_8(&pStream->userStopped);
  if (!inactive) {
    int64_t *pUpdateTs = (int64_t*)p1;
    int32_t *pActiveNum = (int32_t*)p2;

    pStream->updateTime = *pUpdateTs;
    ++(*pActiveNum);
  }

  return true;
}
4902

4903
/*
 * Look up the address of a stream's running trigger task.
 *
 * Holds runtimeLock (released with taosRUnLockLatch) for the whole lookup.
 *
 * streamId - stream to look up in mStreamMgmt.streamMap
 * pAddr    - out: taskId, nodeId and epset of the trigger task; written only
 *            on success
 *
 * Returns 0 on success, or TSDB_CODE_MND_STREAM_NOT_RUNNING when the stream is
 * not in the map, is stopped, or its trigger task is absent or not in
 * STREAM_STATUS_RUNNING.
 */
int32_t msmGetTriggerTaskAddr(SMnode *pMnode, int64_t streamId, SStreamTaskAddr* pAddr) {
  int32_t code = 0;

  (void)mstWaitLock(&mStreamMgmt.runtimeLock, true);

  SStmStatus* pStatus = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL == pStatus) {
    mstsError("stream not exists in streamMap, streamRemains:%d", taosHashGetSize(mStreamMgmt.streamMap));
    code = TSDB_CODE_MND_STREAM_NOT_RUNNING;
  } else {
    int8_t stopped = atomic_load_8(&pStatus->stopped);

    if (stopped) {
      mstsError("stream already stopped, stopped:%d", stopped);
      code = TSDB_CODE_MND_STREAM_NOT_RUNNING;
    } else if (pStatus->triggerTask && STREAM_STATUS_RUNNING == pStatus->triggerTask->status) {
      pAddr->taskId = pStatus->triggerTask->id.taskId;
      pAddr->nodeId = pStatus->triggerTask->id.nodeId;
      pAddr->epset = mndGetDnodeEpsetById(pMnode, pAddr->nodeId);
      mstsDebug("stream trigger task %" PRIx64 " got with nodeId %d", pAddr->taskId, pAddr->nodeId);
    } else {
      mstsError("trigger task %p not running, status:%s", pStatus->triggerTask, pStatus->triggerTask ? gStreamStatusStr[pStatus->triggerTask->status] : "unknown");
      code = TSDB_CODE_MND_STREAM_NOT_RUNNING;
    }
  }

  taosRUnLockLatch(&mStreamMgmt.runtimeLock);

  return code;
}
4940

4941
/*
 * Build the stream-management runtime state kept in mStreamMgmt.
 *
 * Allocates the per-thread contexts and the action queue, creates the runtime
 * hash maps (sized from the current vgroup/snode/dnode counts in the sdb),
 * registers the known snodes and dnodes, and finally picks the initial runtime
 * state: MND_STM_STATE_WATCH when msmUpdateProfileStreams counted at least one
 * stream, MND_STM_STATE_NORMAL otherwise.
 *
 * Every failure path records the failing source line in `lino`, so the final
 * "failed at line" log always points at the step that failed.
 *
 * Returns TSDB_CODE_SUCCESS, or the error code of the failing step; on failure
 * msmDestroyRuntimeInfo is called before returning.
 */
int32_t msmInitRuntimeInfo(SMnode *pMnode) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t activeStreamNum = 0;
  int32_t vnodeNum = sdbGetSize(pMnode->pSdb, SDB_VGROUP);
  int32_t snodeNum = sdbGetSize(pMnode->pSdb, SDB_SNODE);
  int32_t dnodeNum = sdbGetSize(pMnode->pSdb, SDB_DNODE);

  MND_STREAM_SET_LAST_TS(STM_EVENT_ACTIVE_BEGIN, taosGetTimestampMs());

  mStreamMgmt.stat.activeTimes++;
  mStreamMgmt.threadNum = tsNumOfMnodeStreamMgmtThreads;
  mStreamMgmt.tCtx = taosMemoryCalloc(mStreamMgmt.threadNum, sizeof(SStmThreadCtx));
  if (NULL == mStreamMgmt.tCtx) {
    code = terrno;
    lino = __LINE__;
    mstError("failed to initialize the stream runtime tCtx, threadNum:%d, error:%s", mStreamMgmt.threadNum, tstrerror(code));
    goto _exit;
  }

  mStreamMgmt.actionQ = taosMemoryCalloc(1, sizeof(SStmActionQ));
  if (mStreamMgmt.actionQ == NULL) {
    code = terrno;
    lino = __LINE__;
    mError("failed to initialize the stream runtime actionQ, error:%s", tstrerror(code));
    goto _exit;
  }

  // The queue starts with one pre-allocated node that is both head and tail.
  mStreamMgmt.actionQ->head = taosMemoryCalloc(1, sizeof(SStmQNode));
  TSDB_CHECK_NULL(mStreamMgmt.actionQ->head, code, lino, _exit, terrno);

  mStreamMgmt.actionQ->tail = mStreamMgmt.actionQ->head;

  // Per-thread, per-group deploy/action maps.
  for (int32_t i = 0; i < mStreamMgmt.threadNum; ++i) {
    SStmThreadCtx* pCtx = mStreamMgmt.tCtx + i;

    for (int32_t m = 0; m < STREAM_MAX_GROUP_NUM; ++m) {
      pCtx->deployStm[m] = taosHashInit(snodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
      if (pCtx->deployStm[m] == NULL) {
        code = terrno;
        lino = __LINE__;
        mError("failed to initialize the stream runtime deployStm[%d][%d], error:%s", i, m, tstrerror(code));
        goto _exit;
      }
      taosHashSetFreeFp(pCtx->deployStm[m], tDeepFreeSStmStreamDeploy);

      pCtx->actionStm[m] = taosHashInit(snodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
      if (pCtx->actionStm[m] == NULL) {
        code = terrno;
        lino = __LINE__;
        mError("failed to initialize the stream runtime actionStm[%d][%d], error:%s", i, m, tstrerror(code));
        goto _exit;
      }
      taosHashSetFreeFp(pCtx->actionStm[m], mstDestroySStmAction);
    }
  }

  mStreamMgmt.streamMap = taosHashInit(MND_STREAM_DEFAULT_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  if (mStreamMgmt.streamMap == NULL) {
    code = terrno;
    lino = __LINE__;
    mError("failed to initialize the stream runtime streamMap, error:%s", tstrerror(code));
    goto _exit;
  }
  taosHashSetFreeFp(mStreamMgmt.streamMap, mstDestroySStmStatus);

  mStreamMgmt.taskMap = taosHashInit(MND_STREAM_DEFAULT_TASK_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  if (mStreamMgmt.taskMap == NULL) {
    code = terrno;
    lino = __LINE__;
    mError("failed to initialize the stream runtime taskMap, error:%s", tstrerror(code));
    goto _exit;
  }

  mStreamMgmt.vgroupMap = taosHashInit(vnodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (mStreamMgmt.vgroupMap == NULL) {
    code = terrno;
    lino = __LINE__;
    mError("failed to initialize the stream runtime vgroupMap, error:%s", tstrerror(code));
    goto _exit;
  }
  taosHashSetFreeFp(mStreamMgmt.vgroupMap, mstDestroySStmVgroupStatus);

  mStreamMgmt.snodeMap = taosHashInit(snodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (mStreamMgmt.snodeMap == NULL) {
    code = terrno;
    lino = __LINE__;
    mError("failed to initialize the stream runtime snodeMap, error:%s", tstrerror(code));
    goto _exit;
  }
  taosHashSetFreeFp(mStreamMgmt.snodeMap, mstDestroySStmSnodeStatus);

  mStreamMgmt.dnodeMap = taosHashInit(dnodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (mStreamMgmt.dnodeMap == NULL) {
    code = terrno;
    lino = __LINE__;
    mError("failed to initialize the stream runtime dnodeMap, error:%s", tstrerror(code));
    goto _exit;
  }

  mStreamMgmt.toDeployVgMap = taosHashInit(vnodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (mStreamMgmt.toDeployVgMap == NULL) {
    code = terrno;
    lino = __LINE__;
    mError("failed to initialize the stream runtime toDeployVgMap, error:%s", tstrerror(code));
    goto _exit;
  }
  taosHashSetFreeFp(mStreamMgmt.toDeployVgMap, mstDestroySStmVgTasksToDeploy);

  mStreamMgmt.toDeploySnodeMap = taosHashInit(snodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (mStreamMgmt.toDeploySnodeMap == NULL) {
    code = terrno;
    lino = __LINE__;
    mError("failed to initialize the stream runtime toDeploySnodeMap, error:%s", tstrerror(code));
    goto _exit;
  }
  taosHashSetFreeFp(mStreamMgmt.toDeploySnodeMap, mstDestroySStmSnodeTasksDeploy);

  mStreamMgmt.toUpdateScanMap = taosHashInit(snodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (mStreamMgmt.toUpdateScanMap == NULL) {
    code = terrno;
    lino = __LINE__;
    mError("failed to initialize the stream runtime toUpdateScanMap, error:%s", tstrerror(code));
    goto _exit;
  }
  taosHashSetFreeFp(mStreamMgmt.toUpdateScanMap, mstDestroyScanAddrList);

  TAOS_CHECK_EXIT(msmSTAddSnodesToMap(pMnode));
  TAOS_CHECK_EXIT(msmSTAddDnodesToMap(pMnode));

  mStreamMgmt.lastTaskId = 1;

  // Stamp every stream that is neither user-dropped nor user-stopped and count them.
  sdbTraverse(pMnode->pSdb, SDB_STREAM, msmUpdateProfileStreams, &MND_STREAM_GET_LAST_TS(STM_EVENT_ACTIVE_BEGIN), &activeStreamNum, NULL);

  if (activeStreamNum > 0) {
    msmSetInitRuntimeState(MND_STM_STATE_WATCH);
  } else {
    msmSetInitRuntimeState(MND_STM_STATE_NORMAL);
  }

_exit:

  if (code) {
    msmDestroyRuntimeInfo(pMnode);
    mstError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
  } else {
    mstInfo("mnode stream runtime init done");
  }

  return code;
}
5080

5081

5082

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc