• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4720

08 Sep 2025 08:43AM UTC coverage: 58.139% (-0.6%) from 58.762%
#4720

push

travis-ci

web-flow
Merge pull request #32881 from taosdata/enh/add-new-windows-ci

fix(ci): update workflow reference to use new Windows CI YAML

133181 of 292179 branches covered (45.58%)

Branch coverage included in aggregate %.

201691 of 283811 relevant lines covered (71.07%)

5442780.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.77
/source/dnode/mnode/impl/src/mndStreamMgmt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndStream.h"
17
#include "mndDb.h"
18
#include "mndPrivilege.h"
19
#include "mndShow.h"
20
#include "mndStb.h"
21
#include "mndTrans.h"
22
#include "osMemory.h"
23
#include "parser.h"
24
#include "taoserror.h"
25
#include "tmisce.h"
26
#include "tname.h"
27
#include "mndDnode.h"
28
#include "mndVgroup.h"
29
#include "mndSnode.h"
30
#include "mndMnode.h"
31

32
void msmDestroyActionQ() {
3,422✔
33
  SStmQNode* pQNode = NULL;
3,422✔
34

35
  if (NULL == mStreamMgmt.actionQ) {
3,422✔
36
    return;
1,711✔
37
  }
38

39
  while (mndStreamActionDequeue(mStreamMgmt.actionQ, &pQNode)) {
1,715✔
40
  }
41

42
  taosMemoryFreeClear(mStreamMgmt.actionQ->head);
1,711!
43
  taosMemoryFreeClear(mStreamMgmt.actionQ);
1,711!
44
}
45

46
/*
 * Release the per-group deploy and action hash tables of one thread context.
 * The context structure itself is not freed here.
 */
void msmDestroySStmThreadCtx(SStmThreadCtx* pCtx) {
  int32_t grp = 0;

  while (grp < STREAM_MAX_GROUP_NUM) {
    taosHashCleanup(pCtx->deployStm[grp]);
    taosHashCleanup(pCtx->actionStm[grp]);
    ++grp;
  }
}
8,483✔
52

53
void msmDestroyThreadCtxs() {
3,422✔
54
  if (NULL == mStreamMgmt.tCtx) {
3,422✔
55
    return;
1,711✔
56
  }
57
  
58
  for (int32_t i = 0; i < mStreamMgmt.threadNum; ++i) {
10,194✔
59
    msmDestroySStmThreadCtx(mStreamMgmt.tCtx + i);
8,483✔
60
  }
61
  taosMemoryFreeClear(mStreamMgmt.tCtx);
1,711!
62
}
63

64

65
/*
 * Tear down the runtime state kept in the global mStreamMgmt: the action
 * queue, the thread contexts, every hash map, the pending-work counters and
 * the lastTs table. Each map pointer is reset to NULL after its cleanup.
 *
 * pMnode is not referenced by this function.
 */
void msmDestroyRuntimeInfo(SMnode *pMnode) {
  msmDestroyActionQ();
  msmDestroyThreadCtxs();

  // Pending-work maps, each followed by the reset of its counter.
  taosHashCleanup(mStreamMgmt.toUpdateScanMap);
  mStreamMgmt.toUpdateScanMap = NULL;
  mStreamMgmt.toUpdateScanNum = 0;

  taosHashCleanup(mStreamMgmt.toDeployVgMap);
  mStreamMgmt.toDeployVgMap = NULL;
  mStreamMgmt.toDeployVgTaskNum = 0;

  taosHashCleanup(mStreamMgmt.toDeploySnodeMap);
  mStreamMgmt.toDeploySnodeMap = NULL;
  mStreamMgmt.toDeploySnodeTaskNum = 0;

  // Status maps.
  taosHashCleanup(mStreamMgmt.dnodeMap);
  mStreamMgmt.dnodeMap = NULL;

  taosHashCleanup(mStreamMgmt.snodeMap);
  mStreamMgmt.snodeMap = NULL;

  taosHashCleanup(mStreamMgmt.vgroupMap);
  mStreamMgmt.vgroupMap = NULL;

  taosHashCleanup(mStreamMgmt.taskMap);
  mStreamMgmt.taskMap = NULL;

  taosHashCleanup(mStreamMgmt.streamMap);
  mStreamMgmt.streamMap = NULL;

  // Forget all recorded event timestamps.
  memset(mStreamMgmt.lastTs, 0, sizeof(mStreamMgmt.lastTs));

  mstInfo("mnode stream mgmt destroyed");
}
3,422✔
94

95

96
/*
 * Mark a stream as stopped because of a fatal error and schedule its retry.
 *
 * streamId - id of the stream (also used by the msts* log macros)
 * pStatus  - stream status; when NULL it is looked up in mStreamMgmt.streamMap
 * errCode  - error recorded as fatalError and stat.lastError
 * currTs   - current timestamp, base for the retry deadline
 *
 * Nothing is changed when the stream is missing from streamMap or its
 * `stopped` flag is already set.
 */
void msmStopStreamByError(int64_t streamId, SStmStatus* pStatus, int32_t errCode, int64_t currTs) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  SStmStatus* pStream = NULL;  // non-NULL only when acquired from streamMap here
  int8_t      stopped = 0;

  mstsInfo("try to stop stream for error: %s", tstrerror(errCode));

  if (NULL == pStatus) {
    pStream = (SStmStatus*)taosHashAcquire(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
    if (NULL == pStream) {
      mstsInfo("stream already not in streamMap, error:%s", tstrerror(terrno));
      goto _exit;
    }

    pStatus = pStream;
  }

  stopped = atomic_load_8(&pStatus->stopped);
  if (stopped) {
    mstsDebug("stream already stopped %d, ignore stop", stopped);
    goto _exit;
  }

  // A stream that has been running for more than twice the isolation
  // duration gets its retry counter reset before this failure is counted.
  if (pStatus->triggerTask) {
    if (pStatus->triggerTask->runningStartTs) {
      if ((currTs - pStatus->triggerTask->runningStartTs) > 2 * MST_ISOLATION_DURATION) {
        pStatus->fatalRetryTimes = 0;
        mstsDebug("reset stream retryTimes, running duation:%" PRId64 "ms", currTs - pStatus->triggerTask->runningStartTs);
      }
    }
  }

  pStatus->fatalRetryTimes++;
  pStatus->fatalError = errCode;

  // After more than 10 failures the longer retry interval is used.
  if (pStatus->fatalRetryTimes > 10) {
    pStatus->fatalRetryDuration = MST_MAX_RETRY_DURATION;
  } else {
    pStatus->fatalRetryDuration = MST_ISOLATION_DURATION;
  }
  pStatus->fatalRetryTs = currTs + pStatus->fatalRetryDuration;

  pStatus->stat.lastError = errCode;

  // Only the caller that flips stopped 0 -> 1 records the event timestamp.
  if (0 == atomic_val_compare_exchange_8(&pStatus->stopped, 0, 1)) {
    MND_STREAM_SET_LAST_TS(STM_EVENT_STM_TERR, currTs);
  }

_exit:

  // pStream is NULL when the caller supplied pStatus.
  taosHashRelease(mStreamMgmt.streamMap, pStream);

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }
}
17✔
143

144

145
static void msmSetInitRuntimeState(int8_t state) {
1,711✔
146
  switch (state) {
1,711!
147
    case MND_STM_STATE_WATCH:
×
148
      mStreamMgmt.watch.ending = 0;
×
149
      mStreamMgmt.watch.taskRemains = 0;
×
150
      mStreamMgmt.watch.processing = 0;
×
151
      mstInfo("switch to WATCH state");
×
152
      break;
×
153
    case MND_STM_STATE_NORMAL:
1,711✔
154
      MND_STREAM_SET_LAST_TS(STM_EVENT_NORMAL_BEGIN, taosGetTimestampMs());
3,396✔
155
      mstInfo("switch to NORMAL state");
1,711!
156
      break;
1,711✔
157
    default:
×
158
      return;
×
159
  }
160
  
161
  atomic_store_8(&mStreamMgmt.state, state);
1,711✔
162
}
163

164
void msmSTDeleteSnodeFromMap(int32_t snodeId) {
×
165
  int32_t code = taosHashRemove(mStreamMgmt.snodeMap, &snodeId, sizeof(snodeId));
×
166
  if (code) {
×
167
    mstWarn("remove snode %d from snodeMap failed, error:%s", snodeId, tstrerror(code));
×
168
  } else {
169
    mstInfo("snode %d removed from snodeMap", snodeId);
×
170
  }
171
}
×
172

173
/*
 * Walk all SDB_SNODE objects and insert an entry for each into
 * mStreamMgmt.snodeMap, stamped with the current time as lastUpTs.
 * TSDB_CODE_DUP_KEY from the insert is tolerated.
 *
 * Returns TSDB_CODE_SUCCESS, or the first other error from taosHashPut.
 */
static int32_t msmSTAddSnodesToMap(SMnode* pMnode) {
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         lino = 0;
  SStmSnodeStatus tasks = {0};
  SSnodeObj*      pSnode = NULL;
  void*           pIter = NULL;

  while (NULL != (pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pSnode))) {
    tasks.lastUpTs = taosGetTimestampMs();
    code = taosHashPut(mStreamMgmt.snodeMap, &pSnode->id, sizeof(pSnode->id), &tasks, sizeof(tasks));
    if (0 != code && TSDB_CODE_DUP_KEY != code) {
      // Give back the fetched object and stop the iteration before bailing out.
      sdbRelease(pMnode->pSdb, pSnode);
      sdbCancelFetch(pMnode->pSdb, pIter);
      pSnode = NULL;
      TAOS_CHECK_EXIT(code);
    }

    code = TSDB_CODE_SUCCESS;

    sdbRelease(pMnode->pSdb, pSnode);
  }

  pSnode = NULL;

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
209

210
/*
 * Walk all SDB_DNODE objects and insert an entry for each into
 * mStreamMgmt.dnodeMap with INT64_MIN as the initial timestamp value.
 * TSDB_CODE_DUP_KEY from the insert is tolerated.
 *
 * Returns TSDB_CODE_SUCCESS, or the first other error from taosHashPut.
 */
static int32_t msmSTAddDnodesToMap(SMnode* pMnode) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  int64_t    lastUpTs = INT64_MIN;
  SDnodeObj* pDnode = NULL;
  void*      pIter = NULL;

  while (NULL != (pIter = sdbFetch(pMnode->pSdb, SDB_DNODE, pIter, (void **)&pDnode))) {
    code = taosHashPut(mStreamMgmt.dnodeMap, &pDnode->id, sizeof(pDnode->id), &lastUpTs, sizeof(lastUpTs));
    if (0 != code && TSDB_CODE_DUP_KEY != code) {
      // Give back the fetched object and stop the iteration before bailing out.
      sdbRelease(pMnode->pSdb, pDnode);
      sdbCancelFetch(pMnode->pSdb, pIter);
      pDnode = NULL;
      TAOS_CHECK_EXIT(code);
    }

    code = TSDB_CODE_SUCCESS;
    sdbRelease(pMnode->pSdb, pDnode);
  }

  pDnode = NULL;

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
244

245

246

247
/*
 * Register task status pointers in mStreamMgmt.taskMap, keyed by
 * {streamId, taskId}.
 *
 * When pTask is non-NULL only that task is added; otherwise every element of
 * pTasks is added. The map stores the pointer to the status, not a copy.
 * pCtx is not referenced by this function.
 *
 * Returns TSDB_CODE_SUCCESS or the error from taosHashPut.
 */
static int32_t msmSTAddToTaskMap(SStmGrpCtx* pCtx, int64_t streamId, SArray* pTasks, SStmTaskStatus* pTask) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t key[2] = {streamId, 0};
  int32_t taskNum = 1;

  if (NULL == pTask) {
    taskNum = taosArrayGetSize(pTasks);
  }

  for (int32_t i = 0; i < taskNum; ++i) {
    SStmTaskStatus* pStatus = pTask;
    if (NULL == pStatus) {
      pStatus = taosArrayGet(pTasks, i);
    }

    key[1] = pStatus->id.taskId;
    TAOS_CHECK_EXIT(taosHashPut(mStreamMgmt.taskMap, key, sizeof(key), &pStatus, POINTER_BYTES));
    mstsDebug("task %" PRIx64" tidx %d added to taskMap", pStatus->id.taskId, pStatus->id.taskIdx);
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
268

269
/*
 * Record a reader task under its stream in a vgroup's stream hash.
 *
 * pHash      - per-vgroup hash keyed by streamId, values are SStmVgStreamStatus
 * streamId   - stream the task belongs to (also used by the msts* log macros)
 * pStatus    - task status; its pointer is appended, not a copy of the status
 * trigReader - true: append to trigReaders, false: append to calcReaders
 *
 * Returns TSDB_CODE_SUCCESS, or terrno / the taosHashPut error on failure.
 *
 * Fix: when a new entry is built and taosArrayPush or taosHashPut fails, the
 * freshly created reader array is now destroyed instead of being leaked.
 */
static int32_t msmSTAddToVgStreamHash(SHashObj* pHash, int64_t streamId, SStmTaskStatus* pStatus, bool trigReader) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SArray* pNewReaders = NULL;  // array built for a new entry; owned locally until the put succeeds

  SStmVgStreamStatus* pStream = taosHashGet(pHash, &streamId, sizeof(streamId));
  if (NULL == pStream) {
    SStmVgStreamStatus stream = {0};

    // Initial capacities match the original code: 1 for trigger readers, 2 for calc readers.
    pNewReaders = taosArrayInit(trigReader ? 1 : 2, POINTER_BYTES);
    TSDB_CHECK_NULL(pNewReaders, code, lino, _exit, terrno);
    TSDB_CHECK_NULL(taosArrayPush(pNewReaders, &pStatus), code, lino, _exit, terrno);

    if (trigReader) {
      stream.trigReaders = pNewReaders;
    } else {
      stream.calcReaders = pNewReaders;
    }

    TAOS_CHECK_EXIT(taosHashPut(pHash, &streamId, sizeof(streamId), &stream, sizeof(stream)));

    // The hash entry now references the array; it must not be destroyed below.
    pNewReaders = NULL;
    goto _exit;
  }

  if (trigReader) {
    if (NULL == pStream->trigReaders) {
      pStream->trigReaders = taosArrayInit(1, POINTER_BYTES);
      TSDB_CHECK_NULL(pStream->trigReaders, code, lino, _exit, terrno);
    }

    TSDB_CHECK_NULL(taosArrayPush(pStream->trigReaders, &pStatus), code, lino, _exit, terrno);
    goto _exit;
  }

  if (NULL == pStream->calcReaders) {
    pStream->calcReaders = taosArrayInit(1, POINTER_BYTES);
    TSDB_CHECK_NULL(pStream->calcReaders, code, lino, _exit, terrno);
  }

  TSDB_CHECK_NULL(taosArrayPush(pStream->calcReaders, &pStatus), code, lino, _exit, terrno);

_exit:

  if (code) {
    // Only set when building a new entry failed before the hash took it over.
    if (NULL != pNewReaders) {
      taosArrayDestroy(pNewReaders);
    }

    mstsError("%s task %" PRIx64 " SID:%" PRIx64 " failed to add to vgroup %d streamHash in %s at line %d, error:%s", 
        trigReader ? "trigReader" : "calcReader", pStatus->id.taskId, pStatus->id.seriousId, pStatus->id.nodeId, __FUNCTION__, lino, tstrerror(code));
  } else {
    mstsDebug("%s task %" PRIx64 " SID:%" PRIx64 " added to vgroup %d streamHash", 
        trigReader ? "trigReader" : "calcReader", pStatus->id.taskId, pStatus->id.seriousId, pStatus->id.nodeId);
  }

  return code;
}
317

318
/*
 * Attach a reader task to its vgroup in mStreamMgmt.vgroupMap, creating the
 * vgroup entry (with its own stream hash) when the vgroup is not yet known.
 *
 * On failure the locally built vgroup status is destroyed.
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t msmSTAddToVgroupMapImpl(int64_t streamId, SStmTaskStatus* pStatus, bool trigReader) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SStmVgroupStatus vg = {0};  // stays zeroed unless a new vgroup entry is built

  SStmVgroupStatus* pVg = taosHashGet(mStreamMgmt.vgroupMap, &pStatus->id.nodeId, sizeof(pStatus->id.nodeId));
  if (NULL != pVg) {
    // Known vgroup: just extend its stream hash.
    TAOS_CHECK_EXIT(msmSTAddToVgStreamHash(pVg->streamTasks, streamId, pStatus, trigReader));
  } else {
    // First task for this vgroup: build the entry locally, then publish it.
    vg.streamTasks = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
    TSDB_CHECK_NULL(vg.streamTasks, code, lino, _exit, terrno);
    taosHashSetFreeFp(vg.streamTasks, mstDestroySStmVgStreamStatus);

    vg.lastUpTs = taosGetTimestampMs();
    TAOS_CHECK_EXIT(msmSTAddToVgStreamHash(vg.streamTasks, streamId, pStatus, trigReader));
    TAOS_CHECK_EXIT(taosHashPut(mStreamMgmt.vgroupMap, &pStatus->id.nodeId, sizeof(pStatus->id.nodeId), &vg, sizeof(vg)));
  }

_exit:

  if (code) {
    mstDestroyVgroupStatus(&vg);
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  } else {
    mstsDebug("task %" PRIx64 " tidx %d added to vgroupMap %d", pStatus->id.taskId, pStatus->id.taskIdx, pStatus->id.nodeId);
  }

  return code;
}
347

348
/*
 * Queue one task for deployment on its vgroup.
 *
 * pVgMap   - hash keyed by vgroup id (task.nodeId), values are SStmVgTasksToDeploy
 * pDeploy  - deploy descriptor; copied by value into the queued entry
 * streamId - stream id (used by the mstt* log macros together with pTask)
 *
 * If the vgroup has no entry yet, one is created and inserted. When the insert
 * reports TSDB_CODE_DUP_KEY the lookup is retried and the task is appended to
 * the existing entry under that entry's write latch.
 * On success mStreamMgmt.toDeployVgTaskNum is incremented.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno / the taosHashPut error on failure.
 *
 * Fixes over the previous version:
 *  - the write latch is released when taosArrayInit fails while it is held;
 *  - the reference from taosHashAcquire is released on every failure path;
 *  - a task list built for a new entry is destroyed when it could not be
 *    inserted;
 *  - `code` is reset after a DUP_KEY retry, so a successful append no longer
 *    returns TSDB_CODE_DUP_KEY (and no longer skips the counter update).
 */
static int32_t msmTDAddToVgroupMap(SHashObj* pVgMap, SStmTaskDeploy* pDeploy, int64_t streamId) {
  int32_t              code = TSDB_CODE_SUCCESS;
  int32_t              lino = 0;
  SStmVgTasksToDeploy  vg = {0};
  SStmVgTasksToDeploy* pVg = NULL;      // acquired map entry, if any
  bool                 locked = false;  // true while pVg->lock is held
  SStreamTask*         pTask = &pDeploy->task;
  SStmTaskToDeployExt  ext = {0};
  ext.deploy = *pDeploy;

  while (true) {
    pVg = taosHashAcquire(pVgMap, &pDeploy->task.nodeId, sizeof(pDeploy->task.nodeId));
    if (NULL == pVg) {
      vg.taskList = taosArrayInit(20, sizeof(SStmTaskToDeployExt));
      TSDB_CHECK_NULL(vg.taskList, code, lino, _return, terrno);
      TSDB_CHECK_NULL(taosArrayPush(vg.taskList, &ext), code, lino, _return, terrno);

      code = taosHashPut(pVgMap, &pDeploy->task.nodeId, sizeof(pDeploy->task.nodeId), &vg, sizeof(SStmVgTasksToDeploy));
      if (TSDB_CODE_SUCCESS == code) {
        // The map entry now references the list.
        vg.taskList = NULL;
        goto _return;
      }

      if (TSDB_CODE_DUP_KEY != code) {
        lino = __LINE__;
        goto _return;
      }

      // Another entry for this vgroup appeared in the meantime: drop the local list and retry.
      taosArrayDestroy(vg.taskList);
      vg.taskList = NULL;
      code = TSDB_CODE_SUCCESS;
      continue;
    }

    taosWLockLatch(&pVg->lock);
    locked = true;

    if (NULL == pVg->taskList) {
      pVg->taskList = taosArrayInit(20, sizeof(SStmTaskToDeployExt));
      TSDB_CHECK_NULL(pVg->taskList, code, lino, _return, terrno);
    }

    TSDB_CHECK_NULL(taosArrayPush(pVg->taskList, &ext), code, lino, _return, terrno);
    break;
  }

_return:

  if (locked) {
    taosWUnLockLatch(&pVg->lock);
  }

  if (NULL != pVg) {
    taosHashRelease(pVgMap, pVg);
  }

  if (code) {
    // Non-NULL only when a new entry was being built and never reached the map.
    if (NULL != vg.taskList) {
      taosArrayDestroy(vg.taskList);
    }

    msttError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  } else {
    int32_t num = atomic_add_fetch_32(&mStreamMgmt.toDeployVgTaskNum, 1);
    msttDebug("task added to toDeployVgTaskNum, vgToDeployTaskNum:%d", num);
  }

  return code;
}
401

402

403
/*
 * Record a trigger or runner task under its stream in an snode's stream hash.
 *
 * pHash    - per-snode hash keyed by streamId, values are SStmSnodeStreamStatus
 * streamId - stream the task belongs to (also used by the msts* log macros)
 * pStatus  - task status; its pointer is stored, not a copy of the status
 * deployId - negative: pStatus is the trigger task; otherwise the index into
 *            runners[] whose array receives the task.
 *            NOTE(review): deployId is not range-checked against the size of
 *            runners[]; callers are assumed to pass a valid index - confirm.
 *
 * An already recorded trigger is replaced, with a warning.
 * Returns TSDB_CODE_SUCCESS, or terrno / the taosHashPut error on failure.
 *
 * Fix: when a new entry is built and taosArrayPush or taosHashPut fails, the
 * freshly created runner array is now destroyed instead of being leaked.
 */
static int32_t msmSTAddToSnodeStreamHash(SHashObj* pHash, int64_t streamId, SStmTaskStatus* pStatus, int32_t deployId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SArray* pNewRunners = NULL;  // array built for a new entry; owned locally until the put succeeds

  SStmSnodeStreamStatus* pStream = taosHashGet(pHash, &streamId, sizeof(streamId));
  if (NULL == pStream) {
    SStmSnodeStreamStatus stream = {0};
    if (deployId < 0) {
      stream.trigger = pStatus;
    } else {
      pNewRunners = taosArrayInit(2, POINTER_BYTES);
      TSDB_CHECK_NULL(pNewRunners, code, lino, _exit, terrno);
      TSDB_CHECK_NULL(taosArrayPush(pNewRunners, &pStatus), code, lino, _exit, terrno);
      stream.runners[deployId] = pNewRunners;
    }

    TAOS_CHECK_EXIT(taosHashPut(pHash, &streamId, sizeof(streamId), &stream, sizeof(stream)));

    // The hash entry now references the array; it must not be destroyed below.
    pNewRunners = NULL;
    goto _exit;
  }

  if (deployId < 0) {
    if (NULL != pStream->trigger) {
      mstsWarn("stream already got trigger task %" PRIx64 " SID:%" PRIx64 " in snode %d, replace it with task %" PRIx64 " SID:%" PRIx64, 
          pStream->trigger->id.taskId, pStream->trigger->id.seriousId, pStatus->id.nodeId, pStatus->id.taskId, pStatus->id.seriousId);
    }

    pStream->trigger = pStatus;
    goto _exit;
  }

  if (NULL == pStream->runners[deployId]) {
    pStream->runners[deployId] = taosArrayInit(2, POINTER_BYTES);
    TSDB_CHECK_NULL(pStream->runners[deployId], code, lino, _exit, terrno);
  }

  TSDB_CHECK_NULL(taosArrayPush(pStream->runners[deployId], &pStatus), code, lino, _exit, terrno);

_exit:

  if (code) {
    // Only set when building a new entry failed before the hash took it over.
    if (NULL != pNewRunners) {
      taosArrayDestroy(pNewRunners);
    }

    mstsError("%s task %" PRIx64 " SID:%" PRIx64 " failed to add to snode %d streamHash deployId:%d in %s at line %d, error:%s", 
        (deployId < 0) ? "trigger" : "runner", pStatus->id.taskId, pStatus->id.seriousId, pStatus->id.nodeId, deployId, __FUNCTION__, lino, tstrerror(code));
  } else {
    mstsDebug("%s task %" PRIx64 " SID:%" PRIx64 " added to snode %d streamHash deployId:%d", 
        (deployId < 0) ? "trigger" : "runner", pStatus->id.taskId, pStatus->id.seriousId, pStatus->id.nodeId, deployId);
  }

  return code;
}
450

451

452
/*
 * Attach a trigger (deployId < 0) or runner task to its snode entry in
 * mStreamMgmt.snodeMap. The snode's stream hash is created on first use.
 *
 * Returns TSDB_CODE_MND_STREAM_INTERNAL_ERROR when the snode is not in the
 * map, otherwise TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t msmSTAddToSnodeMapImpl(int64_t streamId, SStmTaskStatus* pStatus, int32_t deployId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SStmSnodeStatus* pSnode = taosHashGet(mStreamMgmt.snodeMap, &pStatus->id.nodeId, sizeof(pStatus->id.nodeId));
  if (NULL != pSnode) {
    if (NULL == pSnode->streamTasks) {
      pSnode->streamTasks = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
      TSDB_CHECK_NULL(pSnode->streamTasks, code, lino, _exit, terrno);
      taosHashSetFreeFp(pSnode->streamTasks, mstDestroySStmSnodeStreamStatus);
    }

    TAOS_CHECK_EXIT(msmSTAddToSnodeStreamHash(pSnode->streamTasks, streamId, pStatus, deployId));
  } else {
    mstsWarn("snode %d not exists in snodeMap anymore, may be dropped", pStatus->id.nodeId);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  } else {
    mstsDebug("%s task %" PRIx64 " tidx %d added to snodeMap, snodeId:%d", (deployId < 0) ? "trigger" : "runner", 
        pStatus->id.taskId, pStatus->id.taskIdx, pStatus->id.nodeId);
  }

  return code;
}
481

482

483

484
/*
 * Queue a trigger task for deployment on its snode in
 * mStreamMgmt.toDeploySnodeMap.
 *
 * pDeploy - deploy descriptor; copied by value into the queued entry
 * pStream - stream object; only pStream->pCreate->streamId is read (for logs)
 *
 * If the snode has no entry yet, one is created and inserted. When the insert
 * reports TSDB_CODE_DUP_KEY the lookup is retried and the task is appended to
 * the existing entry's triggerList under that entry's write latch.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno / the taosHashPut error on failure.
 *
 * Fixes over the previous version:
 *  - the reference from taosHashAcquire is released on every failure path;
 *  - a trigger list built for a new entry is destroyed when it could not be
 *    inserted;
 *  - `code` is reset after a DUP_KEY retry, so a successful append no longer
 *    returns TSDB_CODE_DUP_KEY;
 *  - `ext` is zero-initialized so no indeterminate bytes are copied into the
 *    list.
 */
static int32_t msmTDAddTriggerToSnodeMap(SStmTaskDeploy* pDeploy, SStreamObj* pStream) {
  int32_t               code = TSDB_CODE_SUCCESS;
  int32_t               lino = 0;
  int64_t               streamId = pStream->pCreate->streamId;
  SStmSnodeTasksDeploy  snode = {0};
  SStmSnodeTasksDeploy* pSnode = NULL;   // acquired map entry, if any
  bool                  locked = false;  // true while pSnode->lock is held
  SStmTaskToDeployExt   ext = {0};
  SStreamTask*          pTask = &pDeploy->task;

  ext.deploy = *pDeploy;
  ext.deployed = false;

  while (true) {
    pSnode = taosHashAcquire(mStreamMgmt.toDeploySnodeMap, &pDeploy->task.nodeId, sizeof(pDeploy->task.nodeId));
    if (NULL == pSnode) {
      snode.triggerList = taosArrayInit(10, sizeof(SStmTaskToDeployExt));
      TSDB_CHECK_NULL(snode.triggerList, code, lino, _return, terrno);
      TSDB_CHECK_NULL(taosArrayPush(snode.triggerList, &ext), code, lino, _return, terrno);

      code = taosHashPut(mStreamMgmt.toDeploySnodeMap, &pDeploy->task.nodeId, sizeof(pDeploy->task.nodeId), &snode, sizeof(snode));
      if (TSDB_CODE_SUCCESS == code) {
        // The map entry now references the list.
        snode.triggerList = NULL;
        goto _return;
      }

      if (TSDB_CODE_DUP_KEY != code) {
        lino = __LINE__;
        goto _return;
      }

      // Another entry for this snode appeared in the meantime: drop the local list and retry.
      taosArrayDestroy(snode.triggerList);
      snode.triggerList = NULL;
      code = TSDB_CODE_SUCCESS;
      continue;
    }

    taosWLockLatch(&pSnode->lock);
    locked = true;

    if (NULL == pSnode->triggerList) {
      pSnode->triggerList = taosArrayInit(10, sizeof(SStmTaskToDeployExt));
      TSDB_CHECK_NULL(pSnode->triggerList, code, lino, _return, terrno);
    }

    TSDB_CHECK_NULL(taosArrayPush(pSnode->triggerList, &ext), code, lino, _return, terrno);
    break;
  }

_return:

  if (locked) {
    taosWUnLockLatch(&pSnode->lock);
  }

  if (NULL != pSnode) {
    taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);
  }

  if (code) {
    // Non-NULL only when a new entry was being built and never reached the map.
    if (NULL != snode.triggerList) {
      taosArrayDestroy(snode.triggerList);
    }

    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  } else {
    msttDebug("trigger task added to toDeploySnodeMap, tidx:%d", pTask->taskIdx);
  }

  return code;
}
547

548
/*
 * Queue a runner task for deployment on its snode in
 * mStreamMgmt.toDeploySnodeMap.
 *
 * pDeploy - deploy descriptor; copied by value into the queued entry
 * pStream - stream object; only pStream->pCreate->streamId is read (for logs)
 *
 * If the snode has no entry yet, one is created and inserted. When the insert
 * reports TSDB_CODE_DUP_KEY the lookup is retried and the task is appended to
 * the existing entry's runnerList under that entry's write latch.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno / the taosHashPut error on failure.
 *
 * Fixes over the previous version:
 *  - the reference from taosHashAcquire is released on every failure path;
 *  - a runner list built for a new entry is destroyed when it could not be
 *    inserted;
 *  - `code` is reset after a DUP_KEY retry, so a successful append no longer
 *    returns TSDB_CODE_DUP_KEY;
 *  - `ext` is zero-initialized so no indeterminate bytes are copied into the
 *    list.
 */
static int32_t msmTDAddRunnerToSnodeMap(SStmTaskDeploy* pDeploy, SStreamObj* pStream) {
  int32_t               code = TSDB_CODE_SUCCESS;
  int32_t               lino = 0;
  int64_t               streamId = pStream->pCreate->streamId;
  SStmSnodeTasksDeploy  snode = {0};
  SStmSnodeTasksDeploy* pSnode = NULL;   // acquired map entry, if any
  bool                  locked = false;  // true while pSnode->lock is held
  SStmTaskToDeployExt   ext = {0};
  SStreamTask*          pTask = &pDeploy->task;

  ext.deploy = *pDeploy;
  ext.deployed = false;

  while (true) {
    pSnode = taosHashAcquire(mStreamMgmt.toDeploySnodeMap, &pDeploy->task.nodeId, sizeof(pDeploy->task.nodeId));
    if (NULL == pSnode) {
      snode.runnerList = taosArrayInit(10, sizeof(SStmTaskToDeployExt));
      TSDB_CHECK_NULL(snode.runnerList, code, lino, _return, terrno);
      TSDB_CHECK_NULL(taosArrayPush(snode.runnerList, &ext), code, lino, _return, terrno);

      code = taosHashPut(mStreamMgmt.toDeploySnodeMap, &pDeploy->task.nodeId, sizeof(pDeploy->task.nodeId), &snode, sizeof(snode));
      if (TSDB_CODE_SUCCESS == code) {
        // The map entry now references the list.
        snode.runnerList = NULL;
        goto _return;
      }

      if (TSDB_CODE_DUP_KEY != code) {
        lino = __LINE__;
        goto _return;
      }

      // Another entry for this snode appeared in the meantime: drop the local list and retry.
      taosArrayDestroy(snode.runnerList);
      snode.runnerList = NULL;
      code = TSDB_CODE_SUCCESS;
      continue;
    }

    taosWLockLatch(&pSnode->lock);
    locked = true;

    if (NULL == pSnode->runnerList) {
      pSnode->runnerList = taosArrayInit(10, sizeof(SStmTaskToDeployExt));
      TSDB_CHECK_NULL(pSnode->runnerList, code, lino, _return, terrno);
    }

    TSDB_CHECK_NULL(taosArrayPush(pSnode->runnerList, &ext), code, lino, _return, terrno);
    break;
  }

_return:

  if (locked) {
    taosWUnLockLatch(&pSnode->lock);
  }

  if (NULL != pSnode) {
    taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);
  }

  if (code) {
    // Non-NULL only when a new entry was being built and never reached the map.
    if (NULL != snode.runnerList) {
      taosArrayDestroy(snode.runnerList);
    }

    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  } else {
    msttDebug("task added to toDeploySnodeMap, tidx:%d", pTask->taskIdx);
  }

  return code;
}
611

612
/*
 * Queue every runner in runnerList for deployment on its snode and bump
 * mStreamMgmt.toDeploySnodeTaskNum once per queued runner.
 *
 * Stops at the first failure; runners queued before it stay queued.
 * Returns TSDB_CODE_SUCCESS or the error from msmTDAddRunnerToSnodeMap.
 */
static int32_t msmTDAddRunnersToSnodeMap(SArray* runnerList, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;  // used by the msts* log macros
  int32_t total = taosArrayGetSize(runnerList);
  int32_t idx = 0;

  while (idx < total) {
    SStmTaskDeploy* pRunner = taosArrayGet(runnerList, idx);

    TAOS_CHECK_EXIT(msmTDAddRunnerToSnodeMap(pRunner, pStream));

    (void)atomic_add_fetch_32(&mStreamMgmt.toDeploySnodeTaskNum, 1);
    ++idx;
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
635

636

637
/*
 * Refresh the snode named in the request (pCtx->pReq->snodeId): store its
 * runnerThreadNum and advance lastUpTs to pCtx->currTs if that is newer.
 *
 * When the snode is missing from mStreamMgmt.snodeMap, the map is refilled
 * from SDB once and the lookup repeated; a second miss yields
 * TSDB_CODE_MND_STREAM_NODE_NOT_EXISTS.
 *
 * lastUpTs is only ever moved forward, using a compare-and-swap retry loop.
 */
int32_t msmUpdateSnodeUpTs(SStmGrpCtx* pCtx) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SStmSnodeStatus* pStatus = NULL;
  bool             reloaded = false;  // set once the map has been refilled from SDB

  for (;;) {
    pStatus = taosHashGet(mStreamMgmt.snodeMap, &pCtx->pReq->snodeId, sizeof(pCtx->pReq->snodeId));
    if (NULL == pStatus) {
      if (reloaded) {
        mstWarn("snode %d not exists in snodeMap, may be dropped, ignore it", pCtx->pReq->snodeId);
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_NODE_NOT_EXISTS);
      }

      reloaded = true;
      TAOS_CHECK_EXIT(msmSTAddSnodesToMap(pCtx->pMnode));

      continue;
    }

    atomic_store_32(&pStatus->runnerThreadNum, pCtx->pReq->runnerThreadNum);

    for (;;) {
      int64_t seenTs = atomic_load_64(&pStatus->lastUpTs);
      if (pCtx->currTs <= seenTs) {
        // Stored timestamp is already at least as new; nothing to do.
        return code;
      }

      if (seenTs == atomic_val_compare_exchange_64(&pStatus->lastUpTs, seenTs, pCtx->currTs)) {
        mstDebug("snode %d lastUpTs updated", pCtx->pReq->snodeId);
        return code;
      }

      // Lost the race against another writer; reload and try again.
    }
  }

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
684

685
/*
 * Move the lastUpTs of vgroup `vgId` forward to pCtx->currTs.
 * Vgroups that are not present in vgroupMap are ignored (debug log only), and the
 * stored timestamp is left alone when it is already >= pCtx->currTs.
 */
void msmUpdateVgroupUpTs(SStmGrpCtx* pCtx, int32_t vgId) {
  SStmVgroupStatus* pVg = taosHashGet(mStreamMgmt.vgroupMap, &vgId, sizeof(vgId));
  if (NULL == pVg) {
    mstDebug("vgroup %d not exists in vgroupMap, ignore update upTs", vgId);
    return;
  }

  // retry the compare-and-swap while another writer keeps changing the value under us
  for (;;) {
    int64_t seenTs = atomic_load_64(&pVg->lastUpTs);
    if (pCtx->currTs <= seenTs) {
      return;
    }

    if (seenTs == atomic_val_compare_exchange_64(&pVg->lastUpTs, seenTs, pCtx->currTs)) {
      mstDebug("vgroup %d lastUpTs updated to %" PRId64, vgId, pCtx->currTs);
      return;
    }
  }
}
708

709
/*
 * Apply msmUpdateVgroupUpTs() to every vgroup id listed in pCtx->pReq->pVgLeaders.
 * Nothing in here can fail, so the result is always TSDB_CODE_SUCCESS.
 */
int32_t msmUpdateVgroupsUpTs(SStmGrpCtx* pCtx) {
  int32_t leaderNum = taosArrayGetSize(pCtx->pReq->pVgLeaders);

  mstDebug("start to update vgroups upTs");

  for (int32_t idx = 0; idx < leaderNum; ++idx) {
    int32_t* pVgId = taosArrayGet(pCtx->pReq->pVgLeaders, idx);
    msmUpdateVgroupUpTs(pCtx, *pVgId);
  }

  return TSDB_CODE_SUCCESS;
}
730

731

732

733
/*
 * Return the scanPlan of the first SStreamCalcScan entry in pList whose readFromCache
 * flag is set, or NULL when no entry has the flag (or the list is empty).
 */
void* msmSearchCalcCacheScanPlan(SArray* pList) {
  void*   cachePlan = NULL;
  int32_t total = taosArrayGetSize(pList);

  for (int32_t idx = 0; idx < total; ++idx) {
    SStreamCalcScan* pEntry = taosArrayGet(pList, idx);
    if (!pEntry->readFromCache) {
      continue;
    }

    cachePlan = pEntry->scanPlan;
    break;
  }

  return cachePlan;
}
744

745
/*
 * Fill the reader section (pDeploy->msg.reader) of a task deploy message.
 *
 *   triggerReader == false : store calcScanPlan and execReplica
 *                            (= runnerDeploys * runnerReplica); nothing else is touched.
 *   triggerReader == true  : copy the trigger table description and plans from
 *                            pInfo->pCreate; calcScanPlan is not used.
 *
 * Values are assigned directly (shallow copy). Always returns TSDB_CODE_SUCCESS.
 */
int32_t msmBuildReaderDeployInfo(SStmTaskDeploy* pDeploy, void* calcScanPlan, SStmStatus* pInfo, bool triggerReader) {
  SStreamReaderDeployMsg* pReaderMsg = &pDeploy->msg.reader;

  pReaderMsg->triggerReader = triggerReader;

  if (!triggerReader) {
    SStreamReaderDeployFromCalc* pFromCalc = &pReaderMsg->msg.calc;
    pFromCalc->execReplica = pInfo->runnerDeploys * pInfo->runnerReplica;
    pFromCalc->calcScanPlan = calcScanPlan;
    return TSDB_CODE_SUCCESS;
  }

  SStreamReaderDeployFromTrigger* pFromTrig = &pReaderMsg->msg.trigger;

  // trigger table identity
  pFromTrig->triggerTblName = pInfo->pCreate->triggerTblName;
  pFromTrig->triggerTblUid = pInfo->pCreate->triggerTblUid;
  pFromTrig->triggerTblType = pInfo->pCreate->triggerTblType;

  // delete handling options
  pFromTrig->deleteReCalc = pInfo->pCreate->deleteReCalc;
  pFromTrig->deleteOutTbl = pInfo->pCreate->deleteOutTbl;

  // columns and plans (triggerPrevFilter is intentionally not set for readers)
  pFromTrig->partitionCols = pInfo->pCreate->partitionCols;
  pFromTrig->triggerCols = pInfo->pCreate->triggerCols;
  pFromTrig->triggerScanPlan = pInfo->pCreate->triggerScanPlan;
  pFromTrig->calcCacheScanPlan = msmSearchCalcCacheScanPlan(pInfo->pCreate->calcScanPlanList);

  return TSDB_CODE_SUCCESS;
}
769

770
/*
 * Build the list of runner targets a trigger task sends its calculation requests to.
 *
 * For every runner deployment i in [0, pInfo->runnerDeploys) the LAST task of
 * pInfo->runners[i] is taken; it must carry the top-runner flag. Its task id, node id
 * and dnode epset plus pInfo->runnerReplica are appended to *ppRes.
 *
 * *ppRes is created here when runnerDeploys > 0 and is left untouched otherwise.
 * On failure *ppRes may already hold a partially filled array; it is not freed here.
 *
 * Returns TSDB_CODE_SUCCESS, or the error that stopped the build (previously the error
 * was only logged and TSDB_CODE_SUCCESS was returned, hiding failures from the caller).
 */
int32_t msmBuildTriggerRunnerTargets(SMnode* pMnode, SStmStatus* pInfo, int64_t streamId, SArray** ppRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (pInfo->runnerDeploys > 0) {
    *ppRes = taosArrayInit(pInfo->runnerDeploys, sizeof(SStreamRunnerTarget));
    TSDB_CHECK_NULL(*ppRes, code, lino, _exit, terrno);
  }

  for (int32_t i = 0; i < pInfo->runnerDeploys; ++i) {
    SStmTaskStatus* pStatus = taosArrayGetLast(pInfo->runners[i]);
    TSDB_CHECK_NULL(pStatus, code, lino, _exit, terrno);

    if (!STREAM_IS_TOP_RUNNER(pStatus->flags)) {
      mstsError("the last runner task %" PRIx64 " SID:%" PRId64 " tidx:%d in deploy %d is not top runner",
          pStatus->id.taskId, pStatus->id.seriousId, pStatus->id.taskIdx, i);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
    }

    // zero-initialise so that fields not set below never carry stack garbage into the array
    SStreamRunnerTarget runner = {0};
    runner.addr.taskId = pStatus->id.taskId;
    runner.addr.nodeId = pStatus->id.nodeId;
    runner.addr.epset = mndGetDnodeEpsetById(pMnode, pStatus->id.nodeId);
    runner.execReplica = pInfo->runnerReplica;
    TSDB_CHECK_NULL(taosArrayPush(*ppRes, &runner), code, lino, _exit, terrno);
    mstsDebug("the %dth runner target added to trigger's runnerList, TASK:%" PRIx64 , i, runner.addr.taskId);
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
806

807
/*
 * Fill pInfo with the leader snode of pStream (pStream->mainSnodeId), that snode's
 * replica id, and the dnode epsets looked up by those ids.
 * replicaEpSet is only written when GOT_SNODE() accepts the replica id.
 *
 * Returns TSDB_CODE_MND_STREAM_INTERNAL_ERROR when the leader snode cannot be acquired,
 * TSDB_CODE_SUCCESS otherwise.
 */
int32_t msmBuildStreamSnodeInfo(SMnode* pMnode, SStreamObj* pStream, SStreamSnodeInfo* pInfo) {
  int64_t    streamId = pStream->pCreate->streamId;
  int32_t    leaderId = atomic_load_32(&pStream->mainSnodeId);
  SSnodeObj* pLeader = mndAcquireSnode(pMnode, leaderId);

  if (NULL == pLeader) {
    mstsError("snode %d not longer exists, ignore build stream snode info", leaderId);
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }

  // copy what is needed from the snode object, then drop the reference right away
  pInfo->leaderSnodeId = leaderId;
  pInfo->replicaSnodeId = pLeader->replicaId;
  mndReleaseSnode(pMnode, pLeader);

  pInfo->leaderEpSet = mndGetDnodeEpsetById(pMnode, pInfo->leaderSnodeId);

  if (GOT_SNODE(pInfo->replicaSnodeId)) {
    pInfo->replicaEpSet = mndGetDnodeEpsetById(pMnode, pInfo->replicaSnodeId);
  }

  return TSDB_CODE_SUCCESS;
}
828

829
/*
 * Fill the trigger section (pDeploy->msg.trigger) of a task deploy message.
 *
 * Scalar options come from pStream->pCreate; pointer-like members (notify urls, trigger
 * definition, filters, plans) are taken from pInfo->pCreate. All assignments are shallow.
 * readerList is built from pInfo->trigReaders (one address per trigger reader, resolved
 * through the vgroup epset); runnerList is built by msmBuildTriggerRunnerTargets() unless
 * pInfo->runnerNum is 0.
 *
 * Returns TSDB_CODE_SUCCESS, or the error that stopped the build. The function used to
 * return TSDB_CODE_SUCCESS unconditionally at the end, which hid allocation/push failures
 * and runner-target errors from the caller.
 */
int32_t msmBuildTriggerDeployInfo(SMnode* pMnode, SStmStatus* pInfo, SStmTaskDeploy* pDeploy, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;
  SStreamTriggerDeployMsg* pMsg = &pDeploy->msg.trigger;

  // trigger behaviour switches
  pMsg->triggerType = pStream->pCreate->triggerType;
  pMsg->igDisorder = pStream->pCreate->igDisorder;
  pMsg->fillHistory = pStream->pCreate->fillHistory;
  pMsg->fillHistoryFirst = pStream->pCreate->fillHistoryFirst;
  pMsg->lowLatencyCalc = pStream->pCreate->lowLatencyCalc;
  pMsg->igNoDataTrigger = pStream->pCreate->igNoDataTrigger;
  pMsg->hasPartitionBy = (pStream->pCreate->partitionCols != NULL);
  pMsg->isTriggerTblVirt = STREAM_IS_VIRTUAL_TABLE(pStream->pCreate->triggerTblType, pStream->pCreate->flags);
  pMsg->triggerHasPF = pStream->pCreate->triggerHasPF;

  // notification settings
  pMsg->pNotifyAddrUrls = pInfo->pCreate->pNotifyAddrUrls;
  pMsg->notifyEventTypes = pStream->pCreate->notifyEventTypes;
  pMsg->notifyErrorHandle = pStream->pCreate->notifyErrorHandle;
  pMsg->notifyHistory = pStream->pCreate->notifyHistory;

  // timing options and the trigger definition itself
  pMsg->maxDelay = pStream->pCreate->maxDelay;
  pMsg->fillHistoryStartTime = pStream->pCreate->fillHistoryStartTime;
  pMsg->watermark = pStream->pCreate->watermark;
  pMsg->expiredTime = pStream->pCreate->expiredTime;
  pMsg->trigger = pInfo->pCreate->trigger;

  pMsg->eventTypes = pStream->pCreate->eventTypes;
  pMsg->placeHolderBitmap = pStream->pCreate->placeHolderBitmap;
  pMsg->calcTsSlotId = pStream->pCreate->calcTsSlotId;
  pMsg->triTsSlotId = pStream->pCreate->triTsSlotId;
  pMsg->triggerPrevFilter = pInfo->pCreate->triggerPrevFilter;

  // the scan plans are only attached to the trigger itself for virtual trigger tables
  if (STREAM_IS_VIRTUAL_TABLE(pStream->pCreate->triggerTblType, pStream->pCreate->flags)) {
    pMsg->triggerScanPlan = pInfo->pCreate->triggerScanPlan;
    pMsg->calcCacheScanPlan = msmSearchCalcCacheScanPlan(pInfo->pCreate->calcScanPlanList);
  }

  // zero-initialise: the struct is copied byte-wise into readerList
  SStreamTaskAddr addr = {0};
  int32_t triggerReaderNum = taosArrayGetSize(pInfo->trigReaders);
  if (triggerReaderNum > 0) {
    pMsg->readerList = taosArrayInit(triggerReaderNum, sizeof(SStreamTaskAddr));
    TSDB_CHECK_NULL(pMsg->readerList, code, lino, _exit, terrno);
  }

  for (int32_t i = 0; i < triggerReaderNum; ++i) {
    SStmTaskStatus* pStatus = taosArrayGet(pInfo->trigReaders, i);
    addr.taskId = pStatus->id.taskId;
    addr.nodeId = pStatus->id.nodeId;
    addr.epset = mndGetVgroupEpsetById(pMnode, pStatus->id.nodeId);
    TSDB_CHECK_NULL(taosArrayPush(pMsg->readerList, &addr), code, lino, _exit, terrno);
    mstsDebug("the %dth trigReader src added to trigger's readerList, TASK:%" PRIx64 " nodeId:%d", i, addr.taskId, addr.nodeId);
  }

  pMsg->leaderSnodeId = pStream->mainSnodeId;
  pMsg->streamName = pInfo->streamName;

  if (0 == pInfo->runnerNum) {
    mstsDebug("no runner task, skip set trigger's runner list, deployNum:%d", pInfo->runnerDeploys);
    return code;
  }

  TAOS_CHECK_EXIT(msmBuildTriggerRunnerTargets(pMnode, pInfo, streamId, &pMsg->runnerList));

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  } else {
    mstsDebug("trigger deploy info built, readerNum:%d, runnerNum:%d", (int32_t)taosArrayGetSize(pMsg->readerList), (int32_t)taosArrayGetSize(pMsg->runnerList));
  }

  return code;
}
902

903

904
/*
 * Fill the runner section (pDeploy->msg.runner) of a task deploy message.
 *
 * `plan` is stored as-is in pMsg->pPlan (it is neither cloned nor serialised here).
 * Output table and notification settings are copied from pInfo->pCreate and
 * pStream->pCreate; `topPlan` is recorded verbatim. All assignments are shallow.
 *
 * Nothing in here can fail: the result is always TSDB_CODE_SUCCESS.
 */
int32_t msmBuildRunnerDeployInfo(SStmTaskDeploy* pDeploy, SSubplan *plan, SStreamObj* pStream, SStmStatus* pInfo, bool topPlan) {
  SStreamRunnerDeployMsg* pRunnerMsg = &pDeploy->msg.runner;

  // execution context
  pRunnerMsg->execReplica = pInfo->runnerReplica;
  pRunnerMsg->streamName = pInfo->streamName;
  pRunnerMsg->pPlan = plan;
  pRunnerMsg->topPlan = topPlan;

  // output table description
  pRunnerMsg->outDBFName = pInfo->pCreate->outDB;
  pRunnerMsg->outTblName = pInfo->pCreate->outTblName;
  pRunnerMsg->outTblType = pStream->pCreate->outTblType;
  pRunnerMsg->outCols = pInfo->pCreate->outCols;
  pRunnerMsg->outTags = pInfo->pCreate->outTags;
  pRunnerMsg->outStbUid = pStream->pCreate->outStbUid;
  pRunnerMsg->outStbSversion = pStream->pCreate->outStbSversion;

  // notification settings
  pRunnerMsg->calcNotifyOnly = pStream->pCreate->calcNotifyOnly;
  pRunnerMsg->pNotifyAddrUrls = pInfo->pCreate->pNotifyAddrUrls;
  pRunnerMsg->notifyErrorHandle = pStream->pCreate->notifyErrorHandle;

  // expressions used when writing results
  pRunnerMsg->subTblNameExpr = pInfo->pCreate->subTblNameExpr;
  pRunnerMsg->tagValueExpr = pInfo->pCreate->tagValueExpr;
  pRunnerMsg->forceOutCols = pInfo->pCreate->forceOutCols;

  return TSDB_CODE_SUCCESS;
}
939

940

941
/*
 * Register reader task(s) through msmSTAddToVgroupMapImpl().
 * When pTask is non-NULL only that single task is registered and pTasks is ignored;
 * otherwise every element of pTasks is registered in order. Stops at the first failure.
 */
static int32_t msmSTAddToVgroupMap(SStmGrpCtx* pCtx, int64_t streamId, SArray* pTasks, SStmTaskStatus* pTask, bool trigReader) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (NULL != pTask) {
    TAOS_CHECK_EXIT(msmSTAddToVgroupMapImpl(streamId, pTask, trigReader));
  } else {
    int32_t total = taosArrayGetSize(pTasks);
    for (int32_t idx = 0; idx < total; ++idx) {
      SStmTaskStatus* pEntry = taosArrayGet(pTasks, idx);
      TAOS_CHECK_EXIT(msmSTAddToVgroupMapImpl(streamId, pEntry, trigReader));
    }
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
959

960

961
/*
 * Register snode task(s) through msmSTAddToSnodeMapImpl() with the given deployId.
 * When taskNum > 0, pTask is treated as a plain C array of taskNum entries and pTasks
 * is ignored; otherwise every element of the SArray pTasks is registered.
 * Stops at the first failure.
 */
static int32_t msmSTAddToSnodeMap(SStmGrpCtx* pCtx, int64_t streamId, SArray* pTasks, SStmTaskStatus* pTask, int32_t taskNum, int32_t deployId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (taskNum > 0) {
    for (int32_t idx = 0; idx < taskNum; ++idx) {
      TAOS_CHECK_EXIT(msmSTAddToSnodeMapImpl(streamId, &pTask[idx], deployId));
    }
  } else {
    int32_t total = taosArrayGetSize(pTasks);
    for (int32_t idx = 0; idx < total; ++idx) {
      SStmTaskStatus* pEntry = taosArrayGet(pTasks, idx);
      TAOS_CHECK_EXIT(msmSTAddToSnodeMapImpl(streamId, pEntry, deployId));
    }
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
980

981
int64_t msmAssignTaskId(void) {
1,744✔
982
  return atomic_fetch_add_64(&mStreamMgmt.lastTaskId, 1);
1,744✔
983
}
984

985
/* Produce a task "serious id"; the value is whatever taosGetTimestampNs() returns at call time. */
int64_t msmAssignTaskSeriousId(void) {
  int64_t nowNs = taosGetTimestampNs();
  return nowNs;
}
988

989

990
/*
 * Report through *alive whether snode `snodeId` is considered alive, i.e. whether its
 * snodeMap entry has runnerThreadNum >= 0.
 *
 * When the snode is not in snodeMap, msmSTAddSnodesToMap() is called once and the
 * lookup is retried; a second miss yields TSDB_CODE_MND_STREAM_INTERNAL_ERROR and
 * *alive is left unchanged.
 */
int32_t msmIsSnodeAlive(SMnode* pMnode, int32_t snodeId, int64_t streamId, bool* alive) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SStmSnodeStatus* pSnode = taosHashGet(mStreamMgmt.snodeMap, &snodeId, sizeof(snodeId));

  if (NULL == pSnode) {
    // first miss: reload the map, then look the snode up one more time
    TAOS_CHECK_EXIT(msmSTAddSnodesToMap(pMnode));

    pSnode = taosHashGet(mStreamMgmt.snodeMap, &snodeId, sizeof(snodeId));
    if (NULL == pSnode) {
      mstsError("snode %d not exists in snodeMap", snodeId);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
    }
  }

  *alive = (pSnode->runnerThreadNum >= 0);

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1022

1023
/*
 * Pick the snode that should host the stream's statically placed task.
 *
 * The stream's main snode (pStream->mainSnodeId) is tried first; if it is not alive,
 * the replicaId stored in the main snode object is tried next.
 *
 * Returns the chosen snode id, or 0 when no usable snode exists or a liveness check
 * failed (NOTE: the return value is a snode id, not an error code).
 */
int32_t msmRetrieveStaticSnodeId(SMnode* pMnode, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;
  int32_t mainSnodeId = atomic_load_32(&pStream->mainSnodeId);
  int32_t candidate = mainSnodeId;
  bool    alive = false;

  for (;;) {
    TAOS_CHECK_EXIT(msmIsSnodeAlive(pMnode, candidate, streamId, &alive));

    if (alive) {
      return candidate;
    }

    if (candidate != mainSnodeId) {
      // the replica was already tried and is not alive either
      mstsError("no available snode now, mainSnodeId:%d, followerSnodeId:%d", mainSnodeId, candidate);
      return 0;
    }

    // main snode is not alive: fall back to its replica, if it has one
    SSnodeObj* pMain = mndAcquireSnode(pMnode, candidate);
    if (NULL == pMain) {
      stsWarn("snode %d not longer exists, ignore assign snode", candidate);
      return 0;
    }

    if (pMain->replicaId <= 0) {
      mstsError("no available snode now, mainSnodeId:%d, replicaId:%d", mainSnodeId, pMain->replicaId);
      mndReleaseSnode(pMnode, pMain);
      return 0;
    }

    candidate = pMain->replicaId;
    mndReleaseSnode(pMnode, pMain);
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return 0;
}
1069

1070
/*
 * Pick a random alive snode.
 *
 * A target index in [0, snodeNum) is drawn with taosRand(); the SDB_SNODE table is then
 * walked, counting only snodes that msmIsSnodeAlive() reports as alive, until the
 * counter reaches the target. When the end of the table is reached first, the walk
 * restarts from the beginning provided the finished pass saw an alive snode; a pass
 * without any alive snode ends the search.
 *
 * Returns the chosen snode id. Returns 0 (and sets terrno to
 * TSDB_CODE_SNODE_NO_AVAILABLE_NODE) when there is no snode or no alive snode.
 * NOTE: when a liveness check fails, the id remembered from an earlier iteration
 * (possibly 0) is returned after the error has been logged.
 */
int32_t msmAssignRandomSnodeId(SMnode* pMnode, int64_t streamId) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  int32_t    aliveIdx = 0;   // number of alive snodes passed so far (across restarts)
  int32_t    pickedId = 0;   // id of the most recent alive snode seen in the current pass
  void*      pIter = NULL;
  SSnodeObj* pSnode = NULL;
  bool       alive = false;
  int32_t    snodeNum = sdbGetSize(pMnode->pSdb, SDB_SNODE);

  if (snodeNum <= 0) {
    mstsInfo("no available snode now, num:%d", snodeNum);
    goto _exit;
  }

  int32_t targetIdx = taosRand() % snodeNum;

  for (;;) {
    pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pSnode);
    if (NULL == pIter) {
      if (0 == pickedId) {
        mstsError("no alive snode now, snodeNum:%d", snodeNum);
        break;
      }

      // end of table: forget the remembered id and start another pass (pIter is NULL again)
      pickedId = 0;
      continue;
    }

    code = msmIsSnodeAlive(pMnode, pSnode->id, streamId, &alive);
    if (code) {
      sdbRelease(pMnode->pSdb, pSnode);
      sdbCancelFetch(pMnode->pSdb, pIter);
      pSnode = NULL;
      TAOS_CHECK_EXIT(code);
    }

    if (!alive) {
      sdbRelease(pMnode->pSdb, pSnode);
      continue;
    }

    pickedId = pSnode->id;

    bool reachedTarget = (aliveIdx == targetIdx);
    sdbRelease(pMnode->pSdb, pSnode);

    if (reachedTarget) {
      sdbCancelFetch(pMnode->pSdb, pIter);
      pSnode = NULL;
      goto _exit;
    }

    aliveIdx++;
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  if (0 == pickedId) {
    terrno = TSDB_CODE_SNODE_NO_AVAILABLE_NODE;
  }

  return pickedId;
}
1135

1136
/*
 * Choose the snode for a stream task: msmRetrieveStaticSnodeId() when isStatic is set,
 * msmAssignRandomSnodeId() otherwise.
 *
 * Returns the snode id, or 0 with terrno = TSDB_CODE_SNODE_NO_AVAILABLE_NODE when the
 * snode table is empty or the chosen strategy found nothing.
 */
int32_t msmAssignTaskSnodeId(SMnode* pMnode, SStreamObj* pStream, bool isStatic) {
  int64_t streamId = pStream->pCreate->streamId;
  int32_t snodeNum = sdbGetSize(pMnode->pSdb, SDB_SNODE);
  int32_t chosenId = 0;

  if (snodeNum <= 0) {
    mstsInfo("no available snode now, num:%d", snodeNum);
  } else if (isStatic) {
    chosenId = msmRetrieveStaticSnodeId(pMnode, pStream);
  } else {
    chosenId = msmAssignRandomSnodeId(pMnode, streamId);
  }

  if (0 == chosenId) {
    terrno = TSDB_CODE_SNODE_NO_AVAILABLE_NODE;
  }

  return chosenId;
}
1155

1156

1157
/*
 * Create the trigger task status of a stream and queue its deployment.
 *
 * Steps: allocate pInfo->triggerTask (zeroed), fill its identity from pCtx
 * (triggerTaskId / triggerNodeId) with a new serious id, build the deploy message,
 * add it to the to-deploy snode map (bumping toDeploySnodeTaskNum), then register the
 * task in the task map and the snode map (deployId -1).
 *
 * The allocated pInfo->triggerTask stays attached to pInfo on failure; this function
 * does not free it.
 */
static int32_t msmBuildTriggerTasks(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;

  SStmTaskStatus* pTrigger = taosMemoryCalloc(1, sizeof(SStmTaskStatus));
  pInfo->triggerTask = pTrigger;
  TSDB_CHECK_NULL(pTrigger, code, lino, _exit, terrno);

  // identity of the trigger task
  pTrigger->id.taskId = pCtx->triggerTaskId;
  pTrigger->id.deployId = 0;
  pTrigger->id.seriousId = msmAssignTaskSeriousId();
  pTrigger->id.nodeId = pCtx->triggerNodeId;
  pTrigger->id.taskIdx = 0;
  pTrigger->type = STREAM_TRIGGER_TASK;
  pTrigger->lastUpTs = pCtx->currTs;
  pTrigger->pStream = pInfo;

  // deploy message mirrors the identity above
  SStmTaskDeploy deploy = {0};
  deploy.task.streamId = streamId;
  deploy.task.type = pTrigger->type;
  deploy.task.taskId = pTrigger->id.taskId;
  deploy.task.seriousId = pTrigger->id.seriousId;
  deploy.task.nodeId = pTrigger->id.nodeId;
  deploy.task.taskIdx = pTrigger->id.taskIdx;

  TAOS_CHECK_EXIT(msmBuildTriggerDeployInfo(pCtx->pMnode, pInfo, &deploy, pStream));
  TAOS_CHECK_EXIT(msmTDAddTriggerToSnodeMap(&deploy, pStream));

  (void)atomic_add_fetch_32(&mStreamMgmt.toDeploySnodeTaskNum, 1);

  TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, NULL, pTrigger));
  TAOS_CHECK_EXIT(msmSTAddToSnodeMap(pCtx, streamId, NULL, pTrigger, 1, -1));

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1197

1198
/*
 * Initialise one trigger-reader task status (*pState) for vgroup `nodeId` and queue its
 * deployment in mStreamMgmt.toDeployVgMap.
 *
 * pState receives a new task id and serious id, the STREAM_FLAG_TRIGGER_READER flag and
 * status STREAM_STATUS_UNDEPLOYED. pStream is accepted but not used here.
 */
static int32_t msmTDAddSingleTrigReader(SStmGrpCtx* pCtx, SStmTaskStatus* pState, int32_t nodeId, SStmStatus* pInfo, SStreamObj* pStream, int64_t streamId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  // identity
  pState->id.taskId = msmAssignTaskId();
  pState->id.deployId = 0;
  pState->id.seriousId = msmAssignTaskSeriousId();
  pState->id.nodeId = nodeId;
  pState->id.taskIdx = 0;

  // kind and state
  pState->type = STREAM_READER_TASK;
  pState->flags = STREAM_FLAG_TRIGGER_READER;
  pState->status = STREAM_STATUS_UNDEPLOYED;
  pState->lastUpTs = pCtx->currTs;
  pState->pStream = pInfo;

  // deploy message mirrors the status filled above
  SStmTaskDeploy deploy = {0};
  deploy.task.streamId = streamId;
  deploy.task.type = pState->type;
  deploy.task.flags = pState->flags;
  deploy.task.taskId = pState->id.taskId;
  deploy.task.seriousId = pState->id.seriousId;
  deploy.task.nodeId = pState->id.nodeId;
  deploy.task.taskIdx = pState->id.taskIdx;

  TAOS_CHECK_EXIT(msmBuildReaderDeployInfo(&deploy, NULL, pInfo, true));
  TAOS_CHECK_EXIT(msmTDAddToVgroupMap(mStreamMgmt.toDeployVgMap, &deploy, streamId));

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1232

1233
/*
 * Create the trigger-reader task statuses of a stream (pInfo->trigReaders) and queue
 * their deployment.
 *
 *   normal / child / virtual tables : one reader on pStream->pCreate->triggerTblVgId.
 *   super table                     : one reader per non-TSMA vgroup whose dbUid matches
 *                                     the trigger database.
 *   any other table type            : nothing is created (debug log only).
 *
 * Fix over the previous version: the slot returned by taosArrayReserve() is checked
 * before it is written to, and the vgroup reference / sdb iterator are released on that
 * path as well. The unused per-iteration deploy struct was removed.
 */
static int32_t msmTDAddTrigReaderTasks(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;
  SSdb   *pSdb = pCtx->pMnode->pSdb;
  SStmTaskStatus* pState = NULL;
  SVgObj *pVgroup = NULL;
  SDbObj* pDb = NULL;

  switch (pStream->pCreate->triggerTblType) {
    case TSDB_NORMAL_TABLE:
    case TSDB_CHILD_TABLE:
    case TSDB_VIRTUAL_CHILD_TABLE:
    case TSDB_VIRTUAL_NORMAL_TABLE: {
      pInfo->trigReaders = taosArrayInit_s(sizeof(SStmTaskStatus), 1);
      TSDB_CHECK_NULL(pInfo->trigReaders, code, lino, _exit, terrno);
      pState = taosArrayGet(pInfo->trigReaders, 0);
      TSDB_CHECK_NULL(pState, code, lino, _exit, terrno);

      TAOS_CHECK_EXIT(msmTDAddSingleTrigReader(pCtx, pState, pStream->pCreate->triggerTblVgId, pInfo, pStream, streamId));
      break;
    }
    case TSDB_SUPER_TABLE: {
      pDb = mndAcquireDb(pCtx->pMnode, pStream->pCreate->triggerDB);
      if (NULL == pDb) {
        code = terrno;
        mstsError("failed to acquire db %s, error:%s", pStream->pCreate->triggerDB, terrstr());
        goto _exit;
      }

      pInfo->trigReaders = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SStmTaskStatus));
      TSDB_CHECK_NULL(pInfo->trigReaders, code, lino, _exit, terrno);

      void *pIter = NULL;
      while (1) {
        pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
        if (pIter == NULL) {
          break;
        }

        if (pVgroup->dbUid == pDb->uid && !pVgroup->isTsma) {
          pState = taosArrayReserve(pInfo->trigReaders, 1);
          if (NULL == pState) {
            // never report success for a failed reservation, even if terrno was not set
            code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
          } else {
            code = msmTDAddSingleTrigReader(pCtx, pState, pVgroup->vgId, pInfo, pStream, streamId);
          }

          if (code) {
            // leave the iteration cleanly before bailing out
            sdbRelease(pSdb, pVgroup);
            sdbCancelFetch(pSdb, pIter);
            pVgroup = NULL;
            lino = __LINE__;
            goto _exit;
          }
        }

        sdbRelease(pSdb, pVgroup);
      }
      break;
    }
    default:
      mstsDebug("%s ignore triggerTblType %d", __FUNCTION__, pStream->pCreate->triggerTblType);
      break;
  }

_exit:

  // pDb is NULL unless the super-table branch acquired it
  mndReleaseDb(pCtx->pMnode, pDb);

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1304

1305
/*
 * Record the address of a calc-reader task as a data source for the subplan described
 * by `scanPlan`.
 *
 * scanPlan is parsed only to obtain its groupId / subplanId; the entry is appended to
 * the array stored in mStreamMgmt.toUpdateScanMap under the key {streamId, subplanId}
 * (the array is created on first use) and toUpdateScanNum is incremented.
 *
 * vgId == MNODE_HANDLE resolves the epset from the mnode, vgId > MNODE_HANDLE from the
 * vgroup; any other value is rejected with TSDB_CODE_MND_STREAM_INTERNAL_ERROR.
 *
 * Fixes over the previous version: a newly created array is destroyed when pushing to
 * it or inserting it into the map fails (it used to leak), and `addr` is
 * zero-initialised before being copied into the array.
 */
int32_t msmUPAddScanTask(SStmGrpCtx* pCtx, SStreamObj* pStream, char* scanPlan, int32_t vgId, int64_t taskId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SSubplan* pSubplan = NULL;
  SArray*   pNewList = NULL;  // owned by this function until it has been stored in the map
  int64_t streamId = pStream->pCreate->streamId;
  int64_t key[2] = {streamId, 0};
  SStmTaskSrcAddr addr = {0};

  TAOS_CHECK_EXIT(nodesStringToNode(scanPlan, (SNode**)&pSubplan));
  addr.isFromCache = false;

  if (MNODE_HANDLE == vgId) {
    mndGetMnodeEpSet(pCtx->pMnode, &addr.epset);
  } else if (vgId > MNODE_HANDLE) {
    addr.epset = mndGetVgroupEpsetById(pCtx->pMnode, vgId);
  } else {
    mstsError("invalid vgId %d in scanPlan", vgId);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  addr.taskId = taskId;
  addr.vgId = vgId;
  addr.groupId = pSubplan->id.groupId;

  key[1] = pSubplan->id.subplanId;

  SArray** ppRes = taosHashGet(mStreamMgmt.toUpdateScanMap, key, sizeof(key));
  if (NULL == ppRes) {
    pNewList = taosArrayInit(1, sizeof(addr));
    TSDB_CHECK_NULL(pNewList, code, lino, _exit, terrno);
    TSDB_CHECK_NULL(taosArrayPush(pNewList, &addr), code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(taosHashPut(mStreamMgmt.toUpdateScanMap, key, sizeof(key), &pNewList, POINTER_BYTES));
    pNewList = NULL;  // the map holds the pointer now
  } else {
    TSDB_CHECK_NULL(taosArrayPush(*ppRes, &addr), code, lino, _exit, terrno);
  }

  mstsDebug("calcReader %" PRIx64 " added to toUpdateScan, vgId:%d, groupId:%d, subplanId:%d", taskId, vgId, pSubplan->id.groupId, pSubplan->id.subplanId);

  (void)atomic_add_fetch_32(&mStreamMgmt.toUpdateScanNum, 1);

_exit:

  // non-NULL only when creation succeeded but the list never reached the map
  taosArrayDestroy(pNewList);
  nodesDestroyNode((SNode*)pSubplan);

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1354

1355
/*
 * Record the trigger task itself as the (cache) data source for the subplan described
 * by pScan->scanPlan.
 *
 * The source address uses pCtx->triggerTaskId / pCtx->triggerNodeId and the dnode epset
 * of triggerNodeId, with isFromCache set. It is appended to the array stored in
 * mStreamMgmt.toUpdateScanMap under the key {streamId, subplanId} (created on first
 * use) and toUpdateScanNum is incremented.
 *
 * Fixes over the previous version: a newly created array is destroyed when pushing to
 * it or inserting it into the map fails (it used to leak), and `addr` is
 * zero-initialised before being copied into the array.
 */
int32_t msmUPAddCacheTask(SStmGrpCtx* pCtx, SStreamCalcScan* pScan, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SSubplan* pSubplan = NULL;
  SArray*   pNewList = NULL;  // owned by this function until it has been stored in the map
  int64_t streamId = pStream->pCreate->streamId;
  int64_t key[2] = {streamId, 0};
  SStmTaskSrcAddr addr = {0};

  TAOS_CHECK_EXIT(nodesStringToNode(pScan->scanPlan, (SNode**)&pSubplan));

  addr.isFromCache = true;
  addr.epset = mndGetDnodeEpsetById(pCtx->pMnode, pCtx->triggerNodeId);
  addr.taskId = pCtx->triggerTaskId;
  addr.vgId = pCtx->triggerNodeId;
  addr.groupId = pSubplan->id.groupId;

  key[1] = pSubplan->id.subplanId;
  SArray** ppRes = taosHashGet(mStreamMgmt.toUpdateScanMap, key, sizeof(key));
  if (NULL == ppRes) {
    pNewList = taosArrayInit(1, sizeof(addr));
    TSDB_CHECK_NULL(pNewList, code, lino, _exit, terrno);
    TSDB_CHECK_NULL(taosArrayPush(pNewList, &addr), code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(taosHashPut(mStreamMgmt.toUpdateScanMap, key, sizeof(key), &pNewList, POINTER_BYTES));
    pNewList = NULL;  // the map holds the pointer now
  } else {
    TSDB_CHECK_NULL(taosArrayPush(*ppRes, &addr), code, lino, _exit, terrno);
  }

  (void)atomic_add_fetch_32(&mStreamMgmt.toUpdateScanNum, 1);

_exit:

  // non-NULL only when creation succeeded but the list never reached the map
  taosArrayDestroy(pNewList);
  nodesDestroyNode((SNode*)pSubplan);

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1393

1394

1395
/*
 * Create the calc-reader task statuses of a stream (pInfo->calcReaders) and queue their
 * deployment.
 *
 * For every calc scan plan:
 *   - readFromCache plans create no reader; the trigger task is registered as the source
 *     through msmUPAddCacheTask();
 *   - otherwise one reader per vgroup in pScan->vgList is created, registered as a scan
 *     source (msmUPAddScanTask) and added to mStreamMgmt.toDeployVgMap.
 *
 * Fix over the previous version: the results of taosArrayGet() on the plan list and of
 * taosArrayReserve() are checked before being dereferenced.
 * NOTE(review): the plan count is read from pStream->pCreate while the entries are read
 * from pInfo->pCreate — presumably both refer to the same list; confirm.
 */
static int32_t msmTDAddCalcReaderTasks(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t calcTasksNum = taosArrayGetSize(pStream->pCreate->calcScanPlanList);
  int64_t streamId = pStream->pCreate->streamId;
  SStmTaskStatus* pState = NULL;

  pInfo->calcReaders = taosArrayInit(calcTasksNum, sizeof(SStmTaskStatus));
  TSDB_CHECK_NULL(pInfo->calcReaders, code, lino, _exit, terrno);

  for (int32_t i = 0; i < calcTasksNum; ++i) {
    SStreamCalcScan* pScan = taosArrayGet(pInfo->pCreate->calcScanPlanList, i);
    TSDB_CHECK_NULL(pScan, code, lino, _exit, terrno);

    if (pScan->readFromCache) {
      TAOS_CHECK_EXIT(msmUPAddCacheTask(pCtx, pScan, pStream));
      continue;
    }

    int32_t vgNum = taosArrayGetSize(pScan->vgList);
    for (int32_t m = 0; m < vgNum; ++m) {
      pState = taosArrayReserve(pInfo->calcReaders, 1);
      TSDB_CHECK_NULL(pState, code, lino, _exit, terrno);

      pState->id.taskId = msmAssignTaskId();
      pState->id.deployId = 0;
      pState->id.seriousId = msmAssignTaskSeriousId();
      pState->id.nodeId = *(int32_t*)taosArrayGet(pScan->vgList, m);
      pState->id.taskIdx = i;  // index of the calc scan plan this reader serves
      pState->type = STREAM_READER_TASK;
      pState->flags = 0;
      pState->status = STREAM_STATUS_UNDEPLOYED;
      pState->lastUpTs = pCtx->currTs;
      pState->pStream = pInfo;

      SStmTaskDeploy info = {0};
      info.task.type = pState->type;
      info.task.streamId = streamId;
      info.task.taskId = pState->id.taskId;
      info.task.flags = pState->flags;
      info.task.seriousId = pState->id.seriousId;
      info.task.nodeId = pState->id.nodeId;
      info.task.taskIdx = pState->id.taskIdx;
      TAOS_CHECK_EXIT(msmBuildReaderDeployInfo(&info, pScan->scanPlan, pInfo, false));
      TAOS_CHECK_EXIT(msmUPAddScanTask(pCtx, pStream, pScan->scanPlan, pState->id.nodeId, pState->id.taskId));
      TAOS_CHECK_EXIT(msmTDAddToVgroupMap(mStreamMgmt.toDeployVgMap, &info, streamId));
    }
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1448

1449

1450

1451
/*
 * Re-register the scan sources of an existing stream without creating new reader tasks.
 *
 * The calc scan plans are walked in order: readFromCache plans register the trigger task
 * (msmUPAddCacheTask); every other plan consumes one already existing entry of
 * pInfo->calcReaders per vgroup in pScan->vgList, in array order, and registers it with
 * msmUPAddScanTask().
 *
 * Fix over the previous version: readers are addressed by index and the index is
 * checked against the size of pInfo->calcReaders. The old code advanced a raw pointer
 * and could read past the end of the array (or dereference NULL for an empty array)
 * when the plans require more readers than exist; that case now fails with
 * TSDB_CODE_MND_STREAM_INTERNAL_ERROR.
 */
static int32_t msmUPPrepareReaderTasks(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;
  int32_t calcTasksNum = taosArrayGetSize(pStream->pCreate->calcScanPlanList);
  if (calcTasksNum <= 0) {
    mstsDebug("no calc scan plan, ignore parepare reader tasks, readerNum:%d", (int32_t)taosArrayGetSize(pInfo->calcReaders));
    return code;
  }

  int32_t readerNum = taosArrayGetSize(pInfo->calcReaders);
  int32_t readerIdx = 0;  // next calc reader to hand out

  for (int32_t i = 0; i < calcTasksNum; ++i) {
    SStreamCalcScan* pScan = taosArrayGet(pStream->pCreate->calcScanPlanList, i);
    if (pScan->readFromCache) {
      TAOS_CHECK_EXIT(msmUPAddCacheTask(pCtx, pScan, pStream));
      continue;
    }

    int32_t vgNum = taosArrayGetSize(pScan->vgList);
    for (int32_t m = 0; m < vgNum; ++m) {
      if (readerIdx >= readerNum) {
        mstsError("calc reader index %d out of range, readerNum:%d", readerIdx, readerNum);
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }

      SStmTaskStatus* pReader = taosArrayGet(pInfo->calcReaders, readerIdx);
      TAOS_CHECK_EXIT(msmUPAddScanTask(pCtx, pStream, pScan->scanPlan, pReader->id.nodeId, pReader->id.taskId));
      readerIdx++;
    }
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1485

1486
/*
 * Builds all reader tasks of a stream and registers them in the runtime maps.
 *
 * Steps, each aborting on the first error:
 *   1. msmTDAddTrigReaderTasks / msmTDAddCalcReaderTasks for the trigger and
 *      calc readers;
 *   2. msmSTAddToTaskMap for pInfo->trigReaders, then pInfo->calcReaders;
 *   3. msmSTAddToVgroupMap for the same two lists, passing `true` for the
 *      trigger readers and `false` for the calc readers.
 *
 * Returns TSDB_CODE_SUCCESS or the first helper error.
 */
static int32_t msmBuildReaderTasks(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;

  // step 1: create the reader tasks
  TAOS_CHECK_EXIT(msmTDAddTrigReaderTasks(pCtx, pInfo, pStream));
  TAOS_CHECK_EXIT(msmTDAddCalcReaderTasks(pCtx, pInfo, pStream));

  // step 2: task map registration (trigger readers first)
  TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, pInfo->trigReaders, NULL));
  TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, pInfo->calcReaders, NULL));

  // step 3: vgroup map registration; last argument is true only for trigger readers
  TAOS_CHECK_EXIT(msmSTAddToVgroupMap(pCtx, streamId, pInfo->trigReaders, NULL, true));
  TAOS_CHECK_EXIT(msmSTAddToVgroupMap(pCtx, streamId, pInfo->calcReaders, NULL, false));

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1508

1509
/*
 * Describes one upstream data source of `plan` and attaches it with
 * qSetSubplanExecutionNode().
 *
 * pTask        - task used for log context (presumably read by msttDebug)
 * streamId     - stream the plan belongs to (log context)
 * plan         - subplan that receives the source
 * clientId     - stored as the source's clientId
 * pSrc         - address of the upstream task (task id, vgId, epset, groupId)
 * msgType      - fetch message type stored in the source
 * srcSubplanId - id of the upstream subplan, used only for logging
 *
 * Returns whatever qSetSubplanExecutionNode() returns.
 */
int32_t msmUpdatePlanSourceAddr(SStreamTask* pTask, int64_t streamId, SSubplan* plan, int64_t clientId, SStmTaskSrcAddr* pSrc, int32_t msgType, int64_t srcSubplanId) {
  // members not listed here (sId, execId, localExec, ...) are zero-initialized
  SDownstreamSourceNode srcNode = {
      .type = QUERY_NODE_DOWNSTREAM_SOURCE,
      .clientId = clientId,
      .taskId = pSrc->taskId,
      .fetchMsgType = msgType,
  };

  srcNode.addr.nodeId = pSrc->vgId;
  srcNode.addr.epSet = pSrc->epset;

  msttDebug("try to update subplan %d grp %d sourceAddr from subplan %" PRId64 ", clientId:%" PRIx64 ", srcTaskId:%" PRIx64 ", srcNodeId:%d, msgType:%s", 
      plan->id.subplanId, pSrc->groupId, srcSubplanId, srcNode.clientId, srcNode.taskId, srcNode.addr.nodeId, TMSG_INFO(srcNode.fetchMsgType));

  return qSetSubplanExecutionNode(plan, pSrc->groupId, &srcNode);
}
1528

1529
/*
 * Searches pRunners, starting at beginIdx, for the deploy entry whose runner
 * plan has the given subplanId.
 *
 * On a match, *taskId receives the entry's task id and *ppParent points at
 * the entry's task; the function returns TSDB_CODE_SUCCESS.
 * If nothing matches, an error is logged and
 * TSDB_CODE_MND_STREAM_INTERNAL_ERROR is returned; the outputs are untouched.
 */
int32_t msmGetTaskIdFromSubplanId(SStreamObj* pStream, SArray* pRunners, int32_t beginIdx, int32_t subplanId, int64_t* taskId, SStreamTask** ppParent) {
  int64_t streamId = pStream->pCreate->streamId;
  int32_t total = taosArrayGetSize(pRunners);

  for (int32_t idx = beginIdx; idx < total; ++idx) {
    SStmTaskDeploy* pEntry = taosArrayGet(pRunners, idx);
    SSubplan*       pEntryPlan = pEntry->msg.runner.pPlan;

    if (pEntryPlan->id.subplanId != subplanId) {
      continue;
    }

    *taskId = pEntry->task.taskId;
    *ppParent = &pEntry->task;
    return TSDB_CODE_SUCCESS;
  }

  mstsError("subplanId %d not found in runner list", subplanId);

  return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
}
1546

1547
/*
 * Resolves the value-node children of a runner subplan into source addresses.
 *
 * Each child of pPlan that is a BIGINT value node carries an id from which
 * MND_GET_RUNNER_SUBPLANID() extracts a child subplan id. The pair
 * {streamId, childSubplanId} is looked up in mStreamMgmt.toUpdateScanMap and
 * every address found there is attached to pPlan through
 * msmUpdatePlanSourceAddr(). The fetch message type is
 * TDMT_STREAM_FETCH_FROM_CACHE for addresses marked isFromCache and
 * TDMT_STREAM_FETCH otherwise.
 *
 * Children that are not value nodes, or value nodes of another data type,
 * are skipped with a log message. A missing map entry fails with
 * TSDB_CODE_MND_STREAM_INTERNAL_ERROR.
 */
int32_t msmUpdateLowestPlanSourceAddr(SSubplan* pPlan, SStmTaskDeploy* pDeploy, int64_t streamId) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SStreamTask* pTask = &pDeploy->task;  // presumably consumed by the mstt* log macros
  SNode*       pNode = NULL;
  int64_t      scanKey[2] = {streamId, -1};  // {streamId, child subplan id}

  FOREACH(pNode, pPlan->pChildren) {
    if (QUERY_NODE_VALUE != nodeType(pNode)) {
      msttDebug("node type %d is not valueNode, skip it", nodeType(pNode));
      continue;
    }

    SValueNode* pValue = (SValueNode*)pNode;
    if (TSDB_DATA_TYPE_BIGINT != pValue->node.resType.type) {
      msttWarn("invalid value node data type %d for runner's child subplan", pValue->node.resType.type);
      continue;
    }

    scanKey[1] = MND_GET_RUNNER_SUBPLANID(pValue->datum.i);

    SArray** ppAddrs = taosHashGet(mStreamMgmt.toUpdateScanMap, scanKey, sizeof(scanKey));
    if (NULL == ppAddrs) {
      msttError("lowest runner subplan ID:%d,%d can't get its child ID:%" PRId64 " addr", pPlan->id.groupId, pPlan->id.subplanId, scanKey[1]);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
    }

    int32_t addrNum = taosArrayGetSize(*ppAddrs);
    for (int32_t a = 0; a < addrNum; ++a) {
      SStmTaskSrcAddr* pAddr = taosArrayGet(*ppAddrs, a);
      int32_t          fetchType = pAddr->isFromCache ? TDMT_STREAM_FETCH_FROM_CACHE : TDMT_STREAM_FETCH;

      TAOS_CHECK_EXIT(msmUpdatePlanSourceAddr(pTask, streamId, pPlan, pDeploy->task.taskId, pAddr, fetchType, scanKey[1]));
    }
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1588

1589
/*
 * Finalizes the subplan of one runner deploy entry.
 *
 *   1. msmUpdateLowestPlanSourceAddr() attaches the source addresses
 *      described by the plan's value-node children.
 *   2. Those value nodes are erased from pPlan->pChildren, the list is
 *      released with nodesClearList() and the pointer reset to NULL.
 *   3. If the plan has parents, this runner is registered as a source
 *      (TDMT_STREAM_FETCH_FROM_RUNNER) on every parent subplan. The parent's
 *      task is located in pRunners from beginIdx onward.
 *
 * Returns TSDB_CODE_SUCCESS or the first helper error.
 */
int32_t msmUpdateRunnerPlan(SStmGrpCtx* pCtx, SArray* pRunners, int32_t beginIdx, SStmTaskDeploy* pDeploy, SStreamObj* pStream) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  int64_t      streamId = pStream->pCreate->streamId;
  SSubplan*    pPlan = pDeploy->msg.runner.pPlan;
  SStreamTask* parentTask = NULL;

  TAOS_CHECK_EXIT(msmUpdateLowestPlanSourceAddr(pPlan, pDeploy, streamId));

  // drop the value-node placeholders now that they have been resolved
  SNode* pTmp = NULL;
  WHERE_EACH(pTmp, pPlan->pChildren) {
    if (QUERY_NODE_VALUE == nodeType(pTmp)) {
      ERASE_NODE(pPlan->pChildren);
      continue;
    }
    WHERE_NEXT;
  }
  nodesClearList(pPlan->pChildren);
  pPlan->pChildren = NULL;

  if (NULL != pPlan->pParents) {
    SNode*          pParent = NULL;
    int64_t         parentTaskId = 0;
    SStmTaskSrcAddr selfAddr = {0};

    // this runner, described as an upstream source for its parents
    selfAddr.taskId = pDeploy->task.taskId;
    selfAddr.vgId = pDeploy->task.nodeId;
    selfAddr.groupId = pPlan->id.groupId;
    selfAddr.epset = mndGetDnodeEpsetById(pCtx->pMnode, pDeploy->task.nodeId);

    FOREACH(pParent, pPlan->pParents) {
      SSubplan* pParentPlan = (SSubplan*)pParent;
      TAOS_CHECK_EXIT(msmGetTaskIdFromSubplanId(pStream, pRunners, beginIdx, pParentPlan->id.subplanId, &parentTaskId, &parentTask));
      TAOS_CHECK_EXIT(msmUpdatePlanSourceAddr(parentTask, streamId, pParentPlan, parentTaskId, &selfAddr, TDMT_STREAM_FETCH_FROM_RUNNER, pPlan->id.subplanId));
    }
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1635

1636
/*
 * Finalizes and serializes the plan of every runner deploy entry.
 *
 * For each entry, in array order:
 *   - msmUpdateRunnerPlan() resolves its sources, searching parents from the
 *     entry's own index onward;
 *   - nodesNodeToString() serializes the plan. The output pointer is the
 *     same msg.runner.pPlan field, so after this call the field holds the
 *     serialized string (char*) instead of the SSubplan node.
 *
 * Returns TSDB_CODE_SUCCESS or the first error.
 */
int32_t msmUpdateRunnerPlans(SStmGrpCtx* pCtx, SArray* pRunners, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;
  int32_t total = taosArrayGetSize(pRunners);

  for (int32_t idx = 0; idx < total; ++idx) {
    SStmTaskDeploy* pDeploy = taosArrayGet(pRunners, idx);
    SStreamTask*    pTask = &pDeploy->task;  // presumably consumed by msttDebugL below

    TAOS_CHECK_EXIT(msmUpdateRunnerPlan(pCtx, pRunners, idx, pDeploy, pStream));

    // in-place swap: the plan node pointer is overwritten by its string form
    TAOS_CHECK_EXIT(nodesNodeToString((SNode*)pDeploy->msg.runner.pPlan, false, (char**)&pDeploy->msg.runner.pPlan, NULL));

    msttDebugL("runner updated task plan:%s", (const char*)pDeploy->msg.runner.pPlan);
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1659

1660
/*
 * Creates the runner tasks of a stream for every runner deploy
 * (pInfo->runnerDeploys of them) from the calc plan pDag.
 *
 * pDag is validated first: positive subplan count equal to
 * pCreate->numOfCalcSubplan, at least one level, and exactly one subplan on
 * level 0. Then, per deploy:
 *   - a node id is requested from msmAssignTaskSnodeId();
 *   - levels are walked from the lowest (last) to level 0; each subplan gets
 *     a new SStmTaskStatus appended to pInfo->runners[deployId] and a
 *     matching SStmTaskDeploy slot in a scratch list;
 *   - msmUpdateRunnerPlans() and msmTDAddRunnersToSnodeMap() consume the
 *     scratch list;
 *   - pDag is destroyed and re-parsed from pCreate->calcPlan
 *     (NOTE(review): presumably because the plan nodes were modified while
 *     building the previous deploy - confirm).
 * Finally every runner list is registered with msmSTAddToTaskMap() and
 * msmSTAddToSnodeMap(), and pInfo->runnerNum is set to the per-deploy count.
 *
 * Ownership: this function always destroys pDag (including on error), so the
 * caller must not use or free it afterwards.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
int32_t msmBuildRunnerTasksImpl(SStmGrpCtx* pCtx, SQueryPlan* pDag, SStmStatus* pInfo, SStreamObj* pStream) {
  int32_t code = 0;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;
  SArray* deployTaskList = NULL;
  SArray* deployList = NULL;
  int32_t deployNodeId = 0;
  SStmTaskStatus* pState = NULL;
  int32_t taskIdx = 0;
  SNodeListNode *plans = NULL;
  int32_t        taskNum = 0;
  int32_t        totalTaskNum = 0;

  if (pDag->numOfSubplans <= 0) {
    mstsError("invalid subplan num:%d", pDag->numOfSubplans);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  if (pDag->numOfSubplans != pStream->pCreate->numOfCalcSubplan) {
    mstsError("numOfCalcSubplan %d mismatch with numOfSubplans %d", pStream->pCreate->numOfCalcSubplan, pDag->numOfSubplans);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  int32_t levelNum = (int32_t)LIST_LENGTH(pDag->pSubplans);
  if (levelNum <= 0) {
    mstsError("invalid level num:%d", levelNum);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  int32_t        lowestLevelIdx = levelNum - 1;

  // level 0 must be a node list holding exactly one subplan
  plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, 0);
  if (QUERY_NODE_NODE_LIST != nodeType(plans)) {
    mstsError("invalid level plan, level:0, planNodeType:%d", nodeType(plans));
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  taskNum = (int32_t)LIST_LENGTH(plans->pNodeList);
  if (taskNum != 1) {
    mstsError("invalid level plan number:%d, level:0", taskNum);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  // scratch list with one pre-sized slot per subplan, reused for every deploy
  deployTaskList = taosArrayInit_s(sizeof(SStmTaskDeploy), pDag->numOfSubplans);
  TSDB_CHECK_NULL(deployTaskList, code, lino, _exit, terrno);

  for (int32_t deployId = 0; deployId < pInfo->runnerDeploys; ++deployId) {
    totalTaskNum = 0;

    deployList = pInfo->runners[deployId];
    deployNodeId = msmAssignTaskSnodeId(pCtx->pMnode, pStream, 0 == deployId);
    if (!GOT_SNODE(deployNodeId)) {
      TAOS_CHECK_EXIT(terrno);
    }

    taskIdx = 0;

    for (int32_t i = lowestLevelIdx; i >= 0; --i) {
      plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i);
      if (NULL == plans) {
        mstsError("empty level plan, level:%d", i);
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }

      if (QUERY_NODE_NODE_LIST != nodeType(plans)) {
        mstsError("invalid level plan, level:%d, planNodeType:%d", i, nodeType(plans));
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }

      taskNum = (int32_t)LIST_LENGTH(plans->pNodeList);
      if (taskNum <= 0) {
        mstsError("invalid level plan number:%d, level:%d", taskNum, i);
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }

      // also guarantees that taskIdx stays inside deployTaskList below
      totalTaskNum += taskNum;
      if (totalTaskNum > pDag->numOfSubplans) {
        mstsError("current totalTaskNum %d is bigger than numOfSubplans %d, level:%d", totalTaskNum, pDag->numOfSubplans, i);
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }

      for (int32_t n = 0; n < taskNum; ++n) {
        SSubplan *plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n);

        // the reserve may fail; do not dereference a NULL slot
        pState = taosArrayReserve(deployList, 1);
        TSDB_CHECK_NULL(pState, code, lino, _exit, terrno);

        pState->id.taskId = msmAssignTaskId();
        pState->id.deployId = deployId;
        pState->id.seriousId = msmAssignTaskSeriousId();
        pState->id.nodeId = deployNodeId;
        pState->id.taskIdx = MND_SET_RUNNER_TASKIDX(i, n);
        pState->type = STREAM_RUNNER_TASK;
        pState->flags = (0 == i) ? STREAM_FLAG_TOP_RUNNER : 0;
        pState->status = STREAM_STATUS_UNDEPLOYED;
        pState->lastUpTs = pCtx->currTs;
        pState->pStream = pInfo;

        SStmTaskDeploy* pDeploy = taosArrayGet(deployTaskList, taskIdx++);
        pDeploy->task.type = pState->type;
        pDeploy->task.streamId = streamId;
        pDeploy->task.taskId = pState->id.taskId;
        pDeploy->task.flags = pState->flags;
        pDeploy->task.seriousId = pState->id.seriousId;
        pDeploy->task.deployId = pState->id.deployId;
        pDeploy->task.nodeId = pState->id.nodeId;
        pDeploy->task.taskIdx = pState->id.taskIdx;
        TAOS_CHECK_EXIT(msmBuildRunnerDeployInfo(pDeploy, plan, pStream, pInfo, 0 == i));

        SStreamTask* pTask = &pDeploy->task;
        msttDebug("runner task deploy built, subplan level:%d, taskIdx:%d, groupId:%d, subplanId:%d",
            i, pTask->taskIdx, plan->id.groupId, plan->id.subplanId);
      }

      mstsDebug("deploy %d level %d initialized, taskNum:%d", deployId, i, taskNum);
    }

    if (totalTaskNum != pDag->numOfSubplans) {
      mstsError("totalTaskNum %d mis-match with numOfSubplans %d", totalTaskNum, pDag->numOfSubplans);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
    }

    TAOS_CHECK_EXIT(msmUpdateRunnerPlans(pCtx, deployTaskList, pStream));

    TAOS_CHECK_EXIT(msmTDAddRunnersToSnodeMap(deployTaskList, pStream));

    // start the next deploy (and the final cleanup) from a freshly parsed plan
    nodesDestroyNode((SNode *)pDag);
    pDag = NULL;

    TAOS_CHECK_EXIT(nodesStringToNode(pStream->pCreate->calcPlan, (SNode**)&pDag));

    mstsDebug("total %d runner tasks added for deploy %d", totalTaskNum, deployId);
  }

  for (int32_t i = 0; i < pInfo->runnerDeploys; ++i) {
    TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, pInfo->runners[i], NULL));
    TAOS_CHECK_EXIT(msmSTAddToSnodeMap(pCtx, streamId, pInfo->runners[i], NULL, 0, i));
  }

  pInfo->runnerNum = totalTaskNum;

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  taosArrayDestroy(deployTaskList);
  nodesDestroyNode((SNode *)pDag);

  return code;
}
1810

1811
/*
 * Rebuilds the deploy messages for the runner deploys listed in pAction
 * (pAction->deployId[0 .. deployNum-1]), reusing the existing runner task
 * status entries in pInfo->runners[deployId].
 *
 * For each listed deploy a new node id is requested from
 * msmAssignTaskSnodeId(), the plan levels are walked from the lowest to
 * level 0, each existing runner is checked against the expected task index
 * (MND_SET_RUNNER_TASKIDX(level, n)), moved to the new node and copied into a
 * deploy slot. The slots are then passed to msmUpdateRunnerPlans(),
 * msmTDAddRunnersToSnodeMap() and msmSTAddToSnodeMap(), and pDag is
 * re-parsed from pCreate->calcPlan for the next round.
 *
 * Compared with the previous version this one
 *   - fills task.deployId, as msmBuildRunnerTasksImpl() does for new runners;
 *   - fails with TSDB_CODE_STREAM_INTERNAL_ERROR instead of dereferencing a
 *     missing level plan or walking past the runner / deploy arrays.
 *
 * Ownership: pDag is always destroyed here, including on error.
 */
int32_t msmReBuildRunnerTasks(SStmGrpCtx* pCtx, SQueryPlan* pDag, SStmStatus* pInfo, SStreamObj* pStream, SStmTaskAction* pAction) {
  int32_t code = 0;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;
  int32_t newNodeId = 0;
  int32_t levelNum = (int32_t)LIST_LENGTH(pDag->pSubplans);
  int32_t        lowestLevelIdx = levelNum - 1;
  SNodeListNode *plans = NULL;
  int32_t        taskNum = 0;
  int32_t        totalTaskNum = 0;
  int32_t        deployId = 0;
  int32_t        runnerNum = 0;
  SStmTaskStatus* pRunner = NULL;
  int32_t taskIdx = 0;
  SArray* deployTaskList = taosArrayInit_s(sizeof(SStmTaskDeploy), pDag->numOfSubplans);
  TSDB_CHECK_NULL(deployTaskList, code, lino, _exit, terrno);

  for (int32_t r = 0; r < pAction->deployNum; ++r) {
    deployId = pAction->deployId[r];

    runnerNum = (int32_t)taosArrayGetSize(pInfo->runners[deployId]);
    pRunner = taosArrayGet(pInfo->runners[deployId], 0);

    totalTaskNum = 0;

    newNodeId = msmAssignTaskSnodeId(pCtx->pMnode, pStream, 0 == r);
    if (!GOT_SNODE(newNodeId)) {
      TAOS_CHECK_EXIT(terrno);
    }

    taskIdx = 0;

    for (int32_t i = lowestLevelIdx; i >= 0; --i) {
      plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i);
      if (NULL == plans) {
        mstsError("empty level plan, level:%d", i);
        TAOS_CHECK_EXIT(TSDB_CODE_STREAM_INTERNAL_ERROR);
      }

      taskNum = (int32_t)LIST_LENGTH(plans->pNodeList);
      totalTaskNum += taskNum;

      // every task of this level needs an existing runner and a deploy slot
      if (totalTaskNum > runnerNum || totalTaskNum > pDag->numOfSubplans) {
        mstsError("deploy %d totalTaskNum %d exceeds runnerNum %d or numOfSubplans %d, level:%d",
            deployId, totalTaskNum, runnerNum, pDag->numOfSubplans, i);
        TAOS_CHECK_EXIT(TSDB_CODE_STREAM_INTERNAL_ERROR);
      }

      // NOTE(review): `&=` clears every flag except STREAM_FLAG_REDEPLOY_RUNNER
      // and is applied only to the first runner of each level; `|=` on every
      // runner may have been intended - confirm before changing.
      if (taskNum > 0) {
        pRunner->flags &= STREAM_FLAG_REDEPLOY_RUNNER;
      }

      for (int32_t n = 0; n < taskNum; ++n) {
        SSubplan *plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n);

        int32_t newTaskIdx = MND_SET_RUNNER_TASKIDX(i, n);
        if (pRunner->id.taskIdx != newTaskIdx) {
          mstsError("runner TASK:%" PRId64 " taskIdx %d mismatch with newTaskIdx:%d", pRunner->id.taskId, pRunner->id.taskIdx, newTaskIdx);
          TAOS_CHECK_EXIT(TSDB_CODE_STREAM_INTERNAL_ERROR);
        }

        pRunner->id.nodeId = newNodeId;

        SStmTaskDeploy* pDeploy = taosArrayGet(deployTaskList, taskIdx++);
        pDeploy->task.type = pRunner->type;
        pDeploy->task.streamId = streamId;
        pDeploy->task.taskId = pRunner->id.taskId;
        pDeploy->task.flags = pRunner->flags;
        pDeploy->task.seriousId = pRunner->id.seriousId;
        pDeploy->task.deployId = pRunner->id.deployId;
        pDeploy->task.nodeId = pRunner->id.nodeId;
        pDeploy->task.taskIdx = pRunner->id.taskIdx;
        TAOS_CHECK_EXIT(msmBuildRunnerDeployInfo(pDeploy, plan, pStream, pInfo, 0 == i));

        pRunner++;
      }

      mstsDebug("level %d initialized, taskNum:%d", i, taskNum);
    }

    TAOS_CHECK_EXIT(msmUpdateRunnerPlans(pCtx, deployTaskList, pStream));

    TAOS_CHECK_EXIT(msmTDAddRunnersToSnodeMap(deployTaskList, pStream));

    TAOS_CHECK_EXIT(msmSTAddToSnodeMap(pCtx, streamId, pInfo->runners[deployId], NULL, 0, deployId));

    // continue with a freshly parsed plan; the exit path frees whatever is left
    nodesDestroyNode((SNode *)pDag);
    pDag = NULL;

    TAOS_CHECK_EXIT(nodesStringToNode(pStream->pCreate->calcPlan, (SNode**)&pDag));
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  nodesDestroyNode((SNode *)pDag);
  taosArrayDestroy(deployTaskList);

  return code;
}
1900

1901

1902
/*
 * Fills in the runner deployment parameters of a stream status:
 * runnerDeploys is set to MND_STREAM_RUNNER_DEPLOY_NUM and runnerReplica
 * to 3. Both values are currently fixed (see STREAMTODO).
 *
 * streamId is not used at the moment; it is kept so callers do not change.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t msmSetStreamRunnerExecReplica(int64_t streamId, SStmStatus* pInfo) {
  //STREAMTODO

  pInfo->runnerDeploys = MND_STREAM_RUNNER_DEPLOY_NUM;
  pInfo->runnerReplica = 3;

  return TSDB_CODE_SUCCESS;
}
1918

1919

1920
/*
 * Builds the runner tasks of a stream from its serialized calc plan.
 *
 * A stream without a calc plan has no runners: the function returns
 * TSDB_CODE_SUCCESS right away. Otherwise the plan string is parsed, one
 * status array per runner deploy is allocated in pInfo->runners[], and the
 * actual work is delegated to msmBuildRunnerTasksImpl(). On success the
 * global toUpdateScanMap is cleared and its counter reset.
 *
 * Ownership: msmBuildRunnerTasksImpl() destroys the parsed plan itself, so
 * the local pointer is cleared right after the call. The destroy on the exit
 * path only matters for failures that happen before that call.
 */
static int32_t msmBuildRunnerTasks(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
  if (NULL == pStream->pCreate->calcPlan) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  int64_t     streamId = pStream->pCreate->streamId;
  SQueryPlan* pPlan = NULL;

  TAOS_CHECK_EXIT(nodesStringToNode(pStream->pCreate->calcPlan, (SNode**)&pPlan));

  for (int32_t d = 0; d < pInfo->runnerDeploys; ++d) {
    pInfo->runners[d] = taosArrayInit(pPlan->numOfSubplans, sizeof(SStmTaskStatus));
    TSDB_CHECK_NULL(pInfo->runners[d], code, lino, _exit, terrno);
  }

  code = msmBuildRunnerTasksImpl(pCtx, pPlan, pInfo, pStream);
  pPlan = NULL;  // already released by the callee

  TAOS_CHECK_EXIT(code);

  taosHashClear(mStreamMgmt.toUpdateScanMap);
  mStreamMgmt.toUpdateScanNum = 0;

_exit:

  nodesDestroyNode((SNode *)pPlan);

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1955

1956

1957
/*
 * Builds every task of a stream.
 *
 * A trigger task id and a trigger node id are assigned first and stored in
 * pCtx. If no node could be obtained (GOT_SNODE fails) the function exits
 * with terrno. The tasks are then built in a fixed order: readers, runners,
 * trigger.
 *
 * Returns TSDB_CODE_SUCCESS or the first error.
 */
static int32_t msmBuildStreamTasks(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;

  mstsInfo("start to deploy stream tasks, deployTimes:%" PRId64, pInfo->deployTimes);

  // the trigger identity is fixed up front and stored in the context
  pCtx->triggerTaskId = msmAssignTaskId();
  pCtx->triggerNodeId = msmAssignTaskSnodeId(pCtx->pMnode, pStream, true);
  if (!GOT_SNODE(pCtx->triggerNodeId)) {
    TAOS_CHECK_EXIT(terrno);
  }

  TAOS_CHECK_EXIT(msmBuildReaderTasks(pCtx, pInfo, pStream));
  TAOS_CHECK_EXIT(msmBuildRunnerTasks(pCtx, pInfo, pStream));
  TAOS_CHECK_EXIT(msmBuildTriggerTasks(pCtx, pInfo, pStream));

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1982

1983
/*
 * Allocates pInfo->trigReaders according to the trigger table type:
 *   - normal / child / virtual child / virtual normal table: one
 *     pre-sized element, trigReaderNum = 1;
 *   - super table: capacity and trigReaderNum equal to the trigger
 *     database's cfg.numOfVgroups (the db is acquired and released here);
 *   - any other type: no list is created, trigReaderNum = 0.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the database cannot be acquired
 * or an allocation fails. The database reference is released on every path.
 */
static int32_t msmInitTrigReaderList(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;
  SDbObj* pDb = NULL;

  switch (pStream->pCreate->triggerTblType) {
    case TSDB_NORMAL_TABLE:
    case TSDB_CHILD_TABLE:
    case TSDB_VIRTUAL_CHILD_TABLE:
    case TSDB_VIRTUAL_NORMAL_TABLE: {
      pInfo->trigReaders = taosArrayInit_s(sizeof(SStmTaskStatus), 1);
      TSDB_CHECK_NULL(pInfo->trigReaders, code, lino, _exit, terrno);
      pInfo->trigReaderNum = 1;
      break;
    }
    case TSDB_SUPER_TABLE: {
      pDb = mndAcquireDb(pCtx->pMnode, pStream->pCreate->triggerDB);
      if (NULL == pDb) {
        code = terrno;
        lino = __LINE__;  // so the failure log below names this spot instead of line 0
        mstsError("failed to acquire db %s, error:%s", pStream->pCreate->triggerDB, terrstr());
        goto _exit;
      }

      pInfo->trigReaders = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SStmTaskStatus));
      TSDB_CHECK_NULL(pInfo->trigReaders, code, lino, _exit, terrno);
      pInfo->trigReaderNum = pDb->cfg.numOfVgroups;
      mndReleaseDb(pCtx->pMnode, pDb);
      pDb = NULL;
      break;
    }
    default:
      pInfo->trigReaderNum = 0;
      mstsDebug("%s ignore triggerTblType %d", __FUNCTION__, pStream->pCreate->triggerTblType);
      break;
  }

_exit:

  if (code) {
    // pDb is still held only when the failure happened after the acquire
    mndReleaseDb(pCtx->pMnode, pDb);
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2031

2032

2033
/*
 * Initializes a stream status record from its stream object.
 *
 *   - lastActionTs is reset to INT64_MIN;
 *   - streamName is duplicated from pStream->name unless already set;
 *   - the create request is cloned into pStatus->pCreate via
 *     tCloneStreamCreateDeployPointers();
 *   - when the stream has calc subplans, runnerNum is set to their count and
 *     the runner deploy parameters are filled in;
 *   - when initList is true, the trigger reader list, the calc reader list
 *     (capacity = number of calc scan plans) and one runner list per deploy
 *     are allocated as well.
 *
 * Returns TSDB_CODE_SUCCESS or the first error. Fields set before a failure
 * are left in place.
 */
static int32_t msmInitStmStatus(SStmGrpCtx* pCtx, SStmStatus* pStatus, SStreamObj* pStream, bool initList) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;

  pStatus->lastActionTs = INT64_MIN;

  if (NULL == pStatus->streamName) {
    pStatus->streamName = taosStrdup(pStream->name);
    TSDB_CHECK_NULL(pStatus->streamName, code, lino, _exit, terrno);
  }

  TAOS_CHECK_EXIT(tCloneStreamCreateDeployPointers(pStream->pCreate, &pStatus->pCreate));

  if (pStream->pCreate->numOfCalcSubplan > 0) {
    pStatus->runnerNum = pStream->pCreate->numOfCalcSubplan;

    TAOS_CHECK_EXIT(msmSetStreamRunnerExecReplica(streamId, pStatus));
  }

  if (initList) {
    TAOS_CHECK_EXIT(msmInitTrigReaderList(pCtx, pStatus, pStream));

    // calc readers: one slot of capacity per calc scan plan
    int32_t scanPlanNum = taosArrayGetSize(pStream->pCreate->calcScanPlanList);
    if (scanPlanNum > 0) {
      pStatus->calcReaderNum = scanPlanNum;
      pStatus->calcReaders = taosArrayInit(scanPlanNum, sizeof(SStmTaskStatus));
      TSDB_CHECK_NULL(pStatus->calcReaders, code, lino, _exit, terrno);
    }

    // runners: one list per deploy, only when the stream has runners at all
    if (pStatus->runnerNum > 0) {
      for (int32_t d = 0; d < pStatus->runnerDeploys; ++d) {
        pStatus->runners[d] = taosArrayInit(pStatus->runnerNum, sizeof(SStmTaskStatus));
        TSDB_CHECK_NULL(pStatus->runners[d], code, lino, _exit, terrno);
      }
    }
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2079

2080
/*
 * Deploys all tasks of a stream.
 *
 * pStatus may be NULL for a stream that has no status record yet. In that
 * case a record is initialized, inserted into mStreamMgmt.streamMap (which
 * stores a copy) and looked up again so the rest of the function works on
 * the map's copy. The lookup result is checked before use.
 *
 * On any failure after a status record is available, the stream is stopped
 * through msmStopStreamByError() with the failing code.
 *
 * NOTE(review): if msmInitStmStatus() or taosHashPut() fails, whatever the
 * local record already owns (name copy, cloned create request) is not
 * released here - confirm whether a cleanup call is needed.
 *
 * Returns TSDB_CODE_SUCCESS or the first error.
 */
static int32_t msmDeployStreamTasks(SStmGrpCtx* pCtx, SStreamObj* pStream, SStmStatus* pStatus) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;
  SStmStatus info = {0};

  if (NULL == pStatus) {
    TAOS_CHECK_EXIT(msmInitStmStatus(pCtx, &info, pStream, false));

    TAOS_CHECK_EXIT(taosHashPut(mStreamMgmt.streamMap, &streamId, sizeof(streamId), &info, sizeof(info)));

    // continue with the copy owned by the map, not the local one
    pStatus = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
    TSDB_CHECK_NULL(pStatus, code, lino, _exit, TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  TAOS_CHECK_EXIT(msmBuildStreamTasks(pCtx, pStatus, pStream));

  mstLogSStmStatus("stream deployed", streamId, pStatus);

_exit:

  if (code) {
    if (NULL != pStatus) {
      msmStopStreamByError(streamId, pStatus, code, pCtx->currTs);
      mstsError("stream build error:%s, will try to stop current stream", tstrerror(code));
    }

    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2111

2112

2113
static int32_t msmSTRemoveStream(int64_t streamId, bool fromStreamMap) {
27✔
2114
  int32_t code = TSDB_CODE_SUCCESS;
27✔
2115
  void* pIter = NULL;
27✔
2116

2117
  while ((pIter = taosHashIterate(mStreamMgmt.toDeployVgMap, pIter))) {
37✔
2118
    SStmVgTasksToDeploy* pVg = (SStmVgTasksToDeploy*)pIter;
10✔
2119
    (void)mstWaitLock(&pVg->lock, true);
10✔
2120

2121
    int32_t taskNum = taosArrayGetSize(pVg->taskList);
10✔
2122
    if (atomic_load_32(&pVg->deployed) == taskNum) {
10!
2123
      taosRUnLockLatch(&pVg->lock);
×
2124
      continue;
×
2125
    }
2126

2127
    for (int32_t i = 0; i < taskNum; ++i) {
66✔
2128
      SStmTaskToDeployExt* pExt = taosArrayGet(pVg->taskList, i);
56✔
2129
      if (pExt->deployed || pExt->deploy.task.streamId != streamId) {
56✔
2130
        continue;
48✔
2131
      }
2132

2133
      mstDestroySStmTaskToDeployExt(pExt);
8✔
2134
      pExt->deployed = true;
8✔
2135
    }
2136
    
2137
    taosRUnLockLatch(&pVg->lock);
10✔
2138
  }
2139

2140
  while ((pIter = taosHashIterate(mStreamMgmt.toDeploySnodeMap, pIter))) {
53✔
2141
    SStmSnodeTasksDeploy* pSnode = (SStmSnodeTasksDeploy*)pIter;
26✔
2142
    (void)mstWaitLock(&pSnode->lock, true);
26✔
2143

2144
    int32_t taskNum = taosArrayGetSize(pSnode->triggerList);
26✔
2145
    if (atomic_load_32(&pSnode->triggerDeployed) != taskNum) {
26✔
2146
      for (int32_t i = 0; i < taskNum; ++i) {
33✔
2147
        SStmTaskToDeployExt* pExt = taosArrayGet(pSnode->triggerList, i);
22✔
2148
        if (pExt->deployed || pExt->deploy.task.streamId != streamId) {
22✔
2149
          continue;
19✔
2150
        }
2151
        
2152
        mstDestroySStmTaskToDeployExt(pExt);
3✔
2153
        pExt->deployed = true;
3✔
2154
      }
2155
    }
2156

2157
    taskNum = taosArrayGetSize(pSnode->runnerList);
26✔
2158
    if (atomic_load_32(&pSnode->runnerDeployed) != taskNum) {
26!
2159
      for (int32_t i = 0; i < taskNum; ++i) {
99✔
2160
        SStmTaskToDeployExt* pExt = taosArrayGet(pSnode->runnerList, i);
73✔
2161
        if (pExt->deployed || pExt->deploy.task.streamId != streamId) {
73✔
2162
          continue;
63✔
2163
        }
2164
        
2165
        mstDestroySStmTaskToDeployExt(pExt);
10✔
2166
        pExt->deployed = true;
10✔
2167
      }
2168
    }
2169

2170
    taosRUnLockLatch(&pSnode->lock);
26✔
2171
  }
2172

2173
  
2174
  while ((pIter = taosHashIterate(mStreamMgmt.snodeMap, pIter))) {
135✔
2175
    SStmSnodeStatus* pSnode = (SStmSnodeStatus*)pIter;
108✔
2176
    code = taosHashRemove(pSnode->streamTasks, &streamId, sizeof(streamId));
108✔
2177
    if (TSDB_CODE_SUCCESS == code) {
108✔
2178
      mstsDebug("stream removed from snodeMap %d, remainStreams:%d", *(int32_t*)taosHashGetKey(pIter, NULL), (int32_t)taosHashGetSize(pSnode->streamTasks));
38✔
2179
    }
2180
  }
2181

2182
  while ((pIter = taosHashIterate(mStreamMgmt.vgroupMap, pIter))) {
86✔
2183
    SStmVgroupStatus* pVg = (SStmVgroupStatus*)pIter;
59✔
2184
    code = taosHashRemove(pVg->streamTasks, &streamId, sizeof(streamId));
59✔
2185
    if (TSDB_CODE_SUCCESS == code) {
59✔
2186
      mstsDebug("stream removed from vgroupMap %d, remainStreams:%d", *(int32_t*)taosHashGetKey(pIter, NULL), (int32_t)taosHashGetSize(pVg->streamTasks));
39✔
2187
    }
2188
  }
2189

2190
  size_t keyLen = 0;
27✔
2191
  while ((pIter = taosHashIterate(mStreamMgmt.taskMap, pIter))) {
966✔
2192
    int64_t* pStreamId = taosHashGetKey(pIter, &keyLen);
939✔
2193
    if (*pStreamId == streamId) {
939✔
2194
      int64_t taskId = *(pStreamId + 1);
173✔
2195
      code = taosHashRemove(mStreamMgmt.taskMap, pStreamId, keyLen);
173✔
2196
      if (code) {
173!
2197
        mstsError("TASK:%" PRIx64 " remove from taskMap failed, error:%s", taskId, tstrerror(code));
×
2198
      } else {
2199
        mstsDebug("TASK:%" PRIx64 " removed from taskMap", taskId);
173✔
2200
      }
2201
    }
2202
  }
2203

2204
  if (fromStreamMap) {
27✔
2205
    code = taosHashRemove(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
7✔
2206
    if (code) {
7!
2207
      mstsError("stream remove from streamMap failed, error:%s", tstrerror(code));
×
2208
    } else {
2209
      mstsDebug("stream removed from streamMap, remains:%d", taosHashGetSize(mStreamMgmt.streamMap));
7✔
2210
    }
2211
  }
2212
  
2213
  return code;
27✔
2214
}
2215

2216
/*
 * Prepares an existing stream status record for another deployment: the
 * stream is removed from the runtime maps (but kept in streamMap), the
 * status record is reset with mstResetSStmStatus() and its deployTimes
 * counter is incremented. The result of the map removal is ignored.
 */
static void msmResetStreamForRedeploy(int64_t streamId, SStmStatus* pStatus) {
  mstsInfo("try to reset stream for redeploy, stopped:%d, current deployTimes:%" PRId64, atomic_load_8(&pStatus->stopped), pStatus->deployTimes);

  // false: the streamMap entry (pStatus itself) must survive
  (void)msmSTRemoveStream(streamId, false);

  mstResetSStmStatus(pStatus);

  pStatus->deployTimes += 1;
}
20✔
2225

2226
/*
 * Handles one "deploy stream" action.
 *
 * If the stream already has a status record:
 *   - not stopped: it is reset for redeploy;
 *   - stopped by the user and the action is not a user action: the deploy is
 *     ignored and TSDB_CODE_SUCCESS is returned;
 *   - otherwise the stopped flag is cleared with a compare-and-exchange and,
 *     only if that exchange saw the value read earlier, the stream is reset.
 *
 * The stream object is then acquired by name. A stream that no longer exists
 * is not an error. With an existing status record, a stream whose id differs
 * from the action's id marks the record as stopped (value 2) and fails with
 * TSDB_CODE_MND_STREAM_NOT_EXIST. A stream flagged userStopped or userDropped
 * is skipped. Otherwise msmDeployStreamTasks() is called.
 *
 * The acquired stream is released on every path that reaches the exit label.
 */
static int32_t msmLaunchStreamDeployAction(SStmGrpCtx* pCtx, SStmStreamAction* pAction) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  int64_t     streamId = pAction->streamId;
  char*       streamName = pAction->streamName;
  SStreamObj* pStream = NULL;
  int8_t      stopped = 0;

  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (pStatus) {
    stopped = atomic_load_8(&pStatus->stopped);
    if (0 == stopped) {
      mstsDebug("stream %s will try to reset and redeploy it", pAction->streamName);
      msmResetStreamForRedeploy(streamId, pStatus);
    } else if (MST_IS_USER_STOPPED(stopped) && !pAction->userAction) {
      mstsWarn("stream %s already stopped by user, stopped:%d, ignore deploy it", pAction->streamName, stopped);
      return code;
    } else if (stopped == atomic_val_compare_exchange_8(&pStatus->stopped, stopped, 0)) {
      // the exchange returned the value read above, so the flag is now 0
      mstsDebug("stream %s will try to reset and redeploy it from stopped %d", pAction->streamName, stopped);
      msmResetStreamForRedeploy(streamId, pStatus);
    }
  }

  code = mndAcquireStream(pCtx->pMnode, streamName, &pStream);
  if (TSDB_CODE_MND_STREAM_NOT_EXIST == code) {
    mstsWarn("stream %s no longer exists, ignore deploy", streamName);
    return TSDB_CODE_SUCCESS;
  }

  TAOS_CHECK_EXIT(code);

  // same name but another id than the one the action was created for
  if (pStatus && pStream->pCreate->streamId != streamId) {
    mstsWarn("stream %s already dropped by user, ignore deploy it", pAction->streamName);
    atomic_store_8(&pStatus->stopped, 2);
    mstsInfo("set stream %s stopped by user since streamId mismatch", streamName);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_NOT_EXIST);
  }

  int8_t userStopped = atomic_load_8(&pStream->userStopped);
  int8_t userDropped = atomic_load_8(&pStream->userDropped);
  if (userStopped || userDropped) {
    mstsWarn("stream %s is stopped %d or removing %d, ignore deploy", streamName, userStopped, userDropped);
    goto _exit;
  }

  TAOS_CHECK_EXIT(msmDeployStreamTasks(pCtx, pStream, pStatus));

_exit:

  mndReleaseStream(pCtx->pMnode, pStream);

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2287

2288
/*
 * Rebuild the deploy info of a single reader task and queue it in the to-deploy vgroup map.
 *
 * The task is looked up in mStreamMgmt.taskMap only to recover its seriousId; every other
 * identifier is copied from pAction. A non-trigger reader also needs the scan plan stored at
 * index pAction->id.taskIdx of pStatus->pCreate->calcScanPlanList, while a trigger reader
 * hands a NULL plan to msmBuildReaderDeployInfo().
 *
 * pStream is not referenced by this function.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_STREAM_INTERNAL_ERROR when the task or its scan plan
 * cannot be found, or the code returned by a failing helper.
 */
static int32_t msmReLaunchReaderTask(SStreamObj* pStream, SStmTaskAction* pAction, SStmStatus* pStatus) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pAction->streamId;

  // NOTE(review): the key starts at streamId and spans streamId + id.taskId, which presumably
  // relies on both fields being contiguous inside SStmTaskAction -- confirm against the struct.
  size_t           keyLen = sizeof(pAction->streamId) + sizeof(pAction->id.taskId);
  SStmTaskStatus** ppStatus = taosHashGet(mStreamMgmt.taskMap, &pAction->streamId, keyLen);
  if (NULL == ppStatus) {
    mstsError("TASK:%" PRId64 " not in taskMap, remain:%d", pAction->id.taskId, taosHashGetSize(mStreamMgmt.taskMap));
    TAOS_CHECK_EXIT(TSDB_CODE_STREAM_INTERNAL_ERROR);
  }

  SStmTaskDeploy info = {.task = {.type = pAction->type,
                                  .streamId = pAction->streamId,
                                  .taskId = pAction->id.taskId,
                                  .seriousId = (*ppStatus)->id.seriousId,
                                  .nodeId = pAction->id.nodeId,
                                  .taskIdx = pAction->id.taskIdx}};

  bool             forTrigger = STREAM_IS_TRIGGER_READER(pAction->flag);
  SStreamCalcScan* pScan = NULL;
  if (!forTrigger) {
    pScan = taosArrayGet(pStatus->pCreate->calcScanPlanList, pAction->id.taskIdx);
    if (NULL == pScan) {
      mstsError("fail to get TASK:%" PRId64 " scanPlan, taskIdx:%d, scanPlanNum:%zu",
                pAction->id.taskId, pAction->id.taskIdx, taosArrayGetSize(pStatus->pCreate->calcScanPlanList));
      TAOS_CHECK_EXIT(TSDB_CODE_STREAM_INTERNAL_ERROR);
    }
  }

  TAOS_CHECK_EXIT(msmBuildReaderDeployInfo(&info, pScan ? pScan->scanPlan : NULL, pStatus, forTrigger));
  TAOS_CHECK_EXIT(msmTDAddToVgroupMap(mStreamMgmt.toDeployVgMap, &info, pAction->streamId));

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2328

2329
/*
2330
static int32_t msmReLaunchTriggerTask(SStmGrpCtx* pCtx, SStreamObj* pStream, SStmTaskAction* pAction, SStmStatus* pStatus) {
2331
  int32_t code = TSDB_CODE_SUCCESS;
2332
  int32_t lino = 0;
2333
  int64_t streamId = pAction->streamId;
2334
  SStmTaskStatus** ppTask = taosHashGet(mStreamMgmt.taskMap, &pAction->streamId, sizeof(pAction->streamId) + sizeof(pAction->id.taskId));
2335
  if (NULL == ppTask) {
2336
    mstsError("TASK:%" PRId64 " not in taskMap, remain:%d", pAction->id.taskId, taosHashGetSize(mStreamMgmt.taskMap));
2337
    TAOS_CHECK_EXIT(TSDB_CODE_STREAM_INTERNAL_ERROR);
2338
  }
2339
  
2340
  (*ppTask)->id.nodeId = msmAssignTaskSnodeId(pCtx->pMnode, pStream, true);
2341
  if (!GOT_SNODE((*ppTask)->id.nodeId)) {
2342
    mstsError("no avaible snode for deploying trigger task, seriousId: %" PRId64, (*ppTask)->id.seriousId);
2343
    return TSDB_CODE_SUCCESS;
2344
  }
2345
  
2346
  SStmTaskDeploy info = {0};
2347
  info.task.type = pAction->type;
2348
  info.task.streamId = streamId;
2349
  info.task.taskId = pAction->id.taskId;
2350
  info.task.seriousId = (*ppTask)->id.seriousId;
2351
  info.task.nodeId = (*ppTask)->id.nodeId;
2352
  info.task.taskIdx = pAction->id.taskIdx;
2353
  
2354
  TAOS_CHECK_EXIT(msmBuildTriggerDeployInfo(pCtx->pMnode, pStatus, &info, pStream));
2355
  TAOS_CHECK_EXIT(msmTDAddTriggerToSnodeMap(&info, pStream));
2356
  TAOS_CHECK_EXIT(msmSTAddToSnodeMap(pCtx, streamId, NULL, *ppTask, 1, -1));
2357
  
2358
  atomic_add_fetch_32(&mStreamMgmt.toDeploySnodeTaskNum, 1);
2359

2360
_exit:
2361

2362
  if (code) {
2363
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
2364
  }
2365

2366
  return code;
2367
}
2368
*/
2369

2370
/*
 * Re-deploy the runner tasks of a stream.
 *
 * The trigger task id/node recorded in pStatus are copied into the group context, the reader
 * tasks are prepared, the calc plan string of the stream is parsed into a plan node and the
 * runner tasks are rebuilt from it. On success the pending scan-update map is cleared.
 *
 * Re-assigning the trigger task to another snode is currently disabled; the trigger task
 * already stored in pStatus is always used.
 *
 * Returns TSDB_CODE_SUCCESS or the code of the first failing step.
 */
static int32_t msmReLaunchRunnerDeploy(SStmGrpCtx* pCtx, SStreamObj* pStream, SStmTaskAction* pAction, SStmStatus* pStatus) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  int64_t     streamId = pAction->streamId;
  SQueryPlan* pCalcPlan = NULL;

  pCtx->triggerTaskId = pStatus->triggerTask->id.taskId;
  pCtx->triggerNodeId = pStatus->triggerTask->id.nodeId;

  TAOS_CHECK_EXIT(msmUPPrepareReaderTasks(pCtx, pStatus, pStream));
  TAOS_CHECK_EXIT(nodesStringToNode(pStream->pCreate->calcPlan, (SNode**)&pCalcPlan));
  TAOS_CHECK_EXIT(msmReBuildRunnerTasks(pCtx, pCalcPlan, pStatus, pStream, pAction));

  // the scan updates collected while rebuilding are no longer needed
  taosHashClear(mStreamMgmt.toUpdateScanMap);
  mStreamMgmt.toUpdateScanNum = 0;

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2427

2428

2429
/*
 * Handle one task-level deploy action taken from the action queue.
 *
 * The action is silently ignored (TSDB_CODE_SUCCESS) when the stream is no longer in
 * streamMap, is already marked stopped, no longer exists in the mnode, or is flagged as
 * user-stopped / user-dropped. Otherwise reader tasks are relaunched individually and runner
 * tasks are relaunched as a whole when the action is flagged multiRunner.
 *
 * On any failure after the status lookup the stream is stopped through
 * msmStopStreamByError() and the error code is returned.
 */
static int32_t msmLaunchTaskDeployAction(SStmGrpCtx* pCtx, SStmTaskAction* pAction) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  int64_t     streamId = pAction->streamId;
  int64_t     taskId = pAction->id.taskId;
  SStreamObj* pStream = NULL;
  SStmStatus* pStatus = NULL;
  int8_t      stopped = 0;
  int8_t      userStopped = 0;
  int8_t      userDropped = 0;

  mstsDebug("start to handle stream tasks action, action task type:%s", gStreamTaskTypeStr[pAction->type]);

  pStatus = taosHashGet(mStreamMgmt.streamMap, &pAction->streamId, sizeof(pAction->streamId));
  if (NULL == pStatus) {
    mstsWarn("stream not in streamMap, remain:%d", taosHashGetSize(mStreamMgmt.streamMap));
    return TSDB_CODE_SUCCESS;
  }

  stopped = atomic_load_8(&pStatus->stopped);
  if (stopped) {
    mstsWarn("stream %s is already stopped %d, ignore task deploy", pStatus->streamName, stopped);
    return TSDB_CODE_SUCCESS;
  }

  code = mndAcquireStream(pCtx->pMnode, pStatus->streamName, &pStream);
  if (TSDB_CODE_MND_STREAM_NOT_EXIST == code) {
    mstsWarn("stream %s no longer exists, ignore task deploy", pStatus->streamName);
    return TSDB_CODE_SUCCESS;
  }

  TAOS_CHECK_EXIT(code);

  userStopped = atomic_load_8(&pStream->userStopped);
  userDropped = atomic_load_8(&pStream->userDropped);
  if (userStopped || userDropped) {
    mstsWarn("stream %s is stopped %d or removing %d, ignore task deploy", pStatus->streamName, userStopped, userDropped);
    goto _exit;
  }

  if (STREAM_READER_TASK == pAction->type) {
    TAOS_CHECK_EXIT(msmReLaunchReaderTask(pStream, pAction, pStatus));
  } else if (STREAM_RUNNER_TASK == pAction->type) {
    if (!pAction->multiRunner) {
      // a single runner task is not relaunched on its own
      mstsError("runner TASK:%" PRId64 " requires relaunch", taskId);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
    } else {
      TAOS_CHECK_EXIT(msmReLaunchRunnerDeploy(pCtx, pStream, pAction, pStatus));
    }
  } else {
    mstsError("TASK:%" PRId64 " invalid task type:%d", taskId, pAction->type);
    TAOS_CHECK_EXIT(TSDB_CODE_STREAM_INTERNAL_ERROR);
  }

_exit:

  if (pStream) {
    mndReleaseStream(pCtx->pMnode, pStream);
  }

  if (code) {
    msmStopStreamByError(streamId, pStatus, code, pCtx->currTs);
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2501

2502
static int32_t msmTDRemoveStream(int64_t streamId) {
×
2503
  void* pIter = NULL;
×
2504
  
2505
  if (atomic_load_32(&mStreamMgmt.toDeployVgTaskNum) > 0) {
×
2506
    while ((pIter = taosHashIterate(mStreamMgmt.toDeployVgMap, pIter))) {
×
2507
      SStmVgTasksToDeploy* pVg = (SStmVgTasksToDeploy*)pIter;
×
2508
      int32_t taskNum = taosArrayGetSize(pVg->taskList);
×
2509
      if (atomic_load_32(&pVg->deployed) == taskNum) {
×
2510
        continue;
×
2511
      }
2512
      
2513
      for (int32_t i = 0; i < taskNum; ++i) {
×
2514
        SStmTaskToDeployExt* pExt = taosArrayGet(pVg->taskList, i);
×
2515
        if (pExt->deploy.task.streamId == streamId && !pExt->deployed) {
×
2516
          pExt->deployed = true;
×
2517
        }
2518
      }
2519
    }
2520
  }
2521

2522
  if (atomic_load_32(&mStreamMgmt.toDeploySnodeTaskNum) > 0) {
×
2523
    while ((pIter = taosHashIterate(mStreamMgmt.toDeploySnodeMap, pIter))) {
×
2524
      SStmSnodeTasksDeploy* pSnode = (SStmSnodeTasksDeploy*)pIter;
×
2525
      int32_t taskNum = taosArrayGetSize(pSnode->triggerList);
×
2526
      if (atomic_load_32(&pSnode->triggerDeployed) != taskNum) {
×
2527
        for (int32_t i = 0; i < taskNum; ++i) {
×
2528
          SStmTaskToDeployExt* pExt = taosArrayGet(pSnode->triggerList, i);
×
2529
          if (pExt->deploy.task.streamId == streamId && !pExt->deployed) {
×
2530
            pExt->deployed = true;
×
2531
          }
2532
        }
2533
      }
2534

2535
      taskNum = taosArrayGetSize(pSnode->runnerList);
×
2536
      if (atomic_load_32(&pSnode->runnerDeployed) != taskNum) {
×
2537
        for (int32_t i = 0; i < taskNum; ++i) {
×
2538
          SStmTaskToDeployExt* pExt = taosArrayGet(pSnode->runnerList, i);
×
2539
          if (pExt->deploy.task.streamId == streamId && !pExt->deployed) {
×
2540
            pExt->deployed = true;
×
2541
          }
2542
        }
2543
      }
2544
    }
2545
  }
2546

2547
  return TSDB_CODE_SUCCESS;
×
2548
}
2549

2550
/*
 * Remove a stream from the stream-management maps via msmSTRemoveStream(), logging the size
 * of streamMap before and after the removal.
 *
 * pMnode is not referenced by this function.
 *
 * Returns TSDB_CODE_SUCCESS or the code reported by msmSTRemoveStream().
 */
static int32_t msmRemoveStreamFromMaps(SMnode* pMnode, int64_t streamId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  mstsInfo("start to remove stream from maps, current stream num:%d", taosHashGetSize(mStreamMgmt.streamMap));

  TAOS_CHECK_EXIT(msmSTRemoveStream(streamId, true));

_exit:

  if (!code) {
    mstsInfo("end remove stream from maps, current stream num:%d", taosHashGetSize(mStreamMgmt.streamMap));
    return code;
  }

  mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  return code;
}
2568

2569
/*
 * Mark a stream as stopped by the user.
 *
 * Nothing is done when stream management is inactive or not in the normal state, or when the
 * stream is not present in streamMap. Otherwise the stream's `stopped` field is set to 2,
 * which the accompanying log message describes as "stopped by user".
 *
 * pMnode is not referenced by this function.
 */
void msmUndeployStream(SMnode* pMnode, int64_t streamId, char* streamName) {
  int8_t active = atomic_load_8(&mStreamMgmt.active);
  int8_t state = atomic_load_8(&mStreamMgmt.state);
  if (0 == active || MND_STM_STATE_NORMAL != state) {
    mstsError("stream mgmt not available since active:%d state:%d", active, state);
    return;
  }

  SStmStatus* pStream = (SStmStatus*)taosHashAcquire(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL != pStream) {
    atomic_store_8(&pStream->stopped, 2);

    mstsInfo("set stream %s stopped by user", streamName);
  } else {
    mstsInfo("stream %s already not in streamMap", streamName);
  }

  // called with NULL as well when the stream was not found, exactly as before
  taosHashRelease(mStreamMgmt.streamMap, pStream);
}
2599

2600
/*
 * Append a new recalculation time range to a running stream.
 *
 * Returns TSDB_CODE_MND_STREAM_NOT_AVAILABLE when stream management is inactive or not in
 * the normal state, TSDB_CODE_MND_STREAM_NOT_RUNNING when the stream is missing from
 * streamMap or not running, otherwise the result of mstAppendNewRecalcRange().
 *
 * pMnode is not referenced by this function.
 */
int32_t msmRecalcStream(SMnode* pMnode, int64_t streamId, STimeWindow* timeRange) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  int8_t active = atomic_load_8(&mStreamMgmt.active);
  int8_t state = atomic_load_8(&mStreamMgmt.state);
  if (0 == active || MND_STM_STATE_NORMAL != state) {
    mstsError("stream mgmt not available since active:%d state:%d", active, state);
    return TSDB_CODE_MND_STREAM_NOT_AVAILABLE;
  }

  SStmStatus* pStream = (SStmStatus*)taosHashAcquire(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL != pStream && STREAM_IS_RUNNING(pStream)) {
    TAOS_CHECK_EXIT(mstAppendNewRecalcRange(streamId, pStream, timeRange));
  } else {
    code = TSDB_CODE_MND_STREAM_NOT_RUNNING;
    mstsInfo("stream still not in streamMap, streamRemains:%d", taosHashGetSize(mStreamMgmt.streamMap));
  }

_exit:

  taosHashRelease(mStreamMgmt.streamMap, pStream);

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2629

2630
/*
 * Drain the stream action queue and dispatch every deploy action to the stream-level or
 * task-level handler, depending on the node's streamAct flag. Actions of any other type are
 * dequeued and ignored.
 *
 * Processing stops at the first handler failure; the error is only logged.
 */
static void msmHandleStreamActions(SStmGrpCtx* pCtx) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  SStmQNode* pQNode = NULL;

  while (mndStreamActionDequeue(mStreamMgmt.actionQ, &pQNode)) {
    if (STREAM_ACT_DEPLOY != pQNode->type) {
      continue;
    }

    if (!pQNode->streamAct) {
      mstDebug("start to handle task deploy action");
      TAOS_CHECK_EXIT(msmLaunchTaskDeployAction(pCtx, &pQNode->action.task));
    } else {
      mstDebug("start to handle stream deploy action");
      TAOS_CHECK_EXIT(msmLaunchStreamDeployAction(pCtx, &pQNode->action.stream));
    }
  }

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }
}
86✔
2657

2658
/*
 * Walk streamMap and set the `stopped` field of every stream to 4, logging errCode as the
 * reason. errCode is used for the log message only.
 */
void msmStopAllStreamsByGrant(int32_t errCode) {
  void*   pIter = NULL;
  int64_t streamId = 0;

  while ((pIter = taosHashIterate(mStreamMgmt.streamMap, pIter)) != NULL) {
    SStmStatus* pStatus = (SStmStatus*)pIter;

    // the hash key is the stream id; it is needed by the stream-scoped log macro below
    streamId = *(int64_t*)taosHashGetKey(pIter, NULL);
    atomic_store_8(&pStatus->stopped, 4);

    mstsInfo("set stream stopped since %s", tstrerror(errCode));
  }
}
×
2677

2678
/*
 * React to an expired stream grant: when stream management is active, stop all streams while
 * holding the runtime lock. errCode is passed through and returned unchanged in every case.
 *
 * pMnode is not referenced by this function.
 */
int32_t msmHandleGrantExpired(SMnode *pMnode, int32_t errCode) {
  mstInfo("stream grant expired");

  if (0 != atomic_load_8(&mStreamMgmt.active)) {
    (void)mstWaitLock(&mStreamMgmt.runtimeLock, true);

    msmStopAllStreamsByGrant(errCode);

    taosRUnLockLatch(&mStreamMgmt.runtimeLock);
  } else {
    mstWarn("mnode stream is NOT active, ignore handling");
  }

  return errCode;
}
2694

2695
/*
 * Add one task deploy record to a per-stream deploy container.
 *
 * Reader and runner tasks are appended (by value) to the matching array, which is created on
 * first use. A trigger task is copied into a freshly allocated SStmTaskDeploy stored in
 * pStream->triggerTask. pStream->streamId is set whenever an array is created or a trigger
 * task is stored. Other task types are ignored.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when an allocation or array push fails.
 */
static int32_t msmInitStreamDeploy(SStmStreamDeploy* pStream, SStmTaskDeploy* pDeploy) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  int64_t  streamId = pDeploy->task.streamId;
  SArray** ppTaskList = NULL;  // array slot receiving the task, NULL for non-array task types

  switch (pDeploy->task.type) {
    case STREAM_READER_TASK:
      ppTaskList = &pStream->readerTasks;
      break;
    case STREAM_RUNNER_TASK:
      ppTaskList = &pStream->runnerTasks;
      break;
    case STREAM_TRIGGER_TASK:
      pStream->streamId = streamId;
      pStream->triggerTask = taosMemoryMalloc(sizeof(SStmTaskDeploy));
      TSDB_CHECK_NULL(pStream->triggerTask, code, lino, _exit, terrno);
      memcpy(pStream->triggerTask, pDeploy, sizeof(SStmTaskDeploy));
      break;
    default:
      break;
  }

  if (NULL != ppTaskList) {
    if (NULL == *ppTaskList) {
      pStream->streamId = streamId;
      *ppTaskList = taosArrayInit(20, sizeof(SStmTaskDeploy));
      TSDB_CHECK_NULL(*ppTaskList, code, lino, _exit, terrno);
    }

    TSDB_CHECK_NULL(taosArrayPush(*ppTaskList, pDeploy), code, lino, _exit, terrno);
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2736

2737
/*
 * Add one task deploy record to the per-stream entry of a group deploy map.
 *
 * When the stream already has an entry, the task is added to it in place. Otherwise a new
 * entry holding only this task is built and inserted; if the insert reports
 * TSDB_CODE_DUP_KEY the temporary entry is freed and the lookup is retried.
 *
 * Returns TSDB_CODE_SUCCESS or the code of the failing step.
 */
static int32_t msmGrpAddDeployTask(SHashObj* pHash, SStmTaskDeploy* pDeploy) {
  int32_t           code = TSDB_CODE_SUCCESS;
  int32_t           lino = 0;
  int64_t           streamId = pDeploy->task.streamId;
  SStreamTask*      pTask = &pDeploy->task;
  SStmStreamDeploy  streamDeploy = {0};
  SStmStreamDeploy* pStream = NULL;

  for (;;) {
    pStream = taosHashAcquire(pHash, &streamId, sizeof(streamId));
    if (NULL != pStream) {
      TAOS_CHECK_EXIT(msmInitStreamDeploy(pStream, pDeploy));
      break;
    }

    TAOS_CHECK_EXIT(msmInitStreamDeploy(&streamDeploy, pDeploy));
    code = taosHashPut(pHash, &streamId, sizeof(streamId), &streamDeploy, sizeof(streamDeploy));
    if (TSDB_CODE_DUP_KEY != code) {
      // either inserted successfully or failed for a reason other than a duplicate key
      goto _exit;
    }

    // an entry for this stream appeared in the meantime: drop ours and retry the lookup
    tFreeSStmStreamDeploy(&streamDeploy);
  }

_exit:

  taosHashRelease(pHash, pStream);

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  } else {
    msttDebug("task added to GRP deployMap, taskIdx:%d", pTask->taskIdx);
  }

  return code;
}
2779

2780

2781
/*
 * Add every not-yet-deployed task of pTasks to the group deploy map pHash.
 *
 * Each task that is added is flagged as deployed and *deployed is atomically incremented.
 * Stops at the first failure and returns its code; tasks added before the failure stay
 * flagged.
 */
int32_t msmGrpAddDeployTasks(SHashObj* pHash, SArray* pTasks, int32_t* deployed) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t total = taosArrayGetSize(pTasks);

  for (int32_t idx = 0; idx < total; ++idx) {
    SStmTaskToDeployExt* pExt = taosArrayGet(pTasks, idx);
    if (!pExt->deployed) {
      TAOS_CHECK_EXIT(msmGrpAddDeployTask(pHash, &pExt->deploy));
      pExt->deployed = true;

      (void)atomic_add_fetch_32(deployed, 1);
    }
  }

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2806

2807
/*
 * Collect the pending to-deploy tasks of every vgroup leader listed in the heartbeat request
 * into the group's deploy map (pCtx->deployStm).
 *
 * For each vgroup id the up-timestamp is refreshed first. A vgroup is skipped when it has no
 * entry in toDeployVgMap, when its latch cannot be read-locked right away, or when its
 * deployed counter already equals the size of its task list.
 *
 * Every entry obtained with taosHashAcquire() is handed back with taosHashRelease() on all
 * paths (skip, success and error), matching what msmCleanDeployedVgTasks() does; previously
 * the reference was never released.
 *
 * Returns TSDB_CODE_SUCCESS or the code of the first failing msmGrpAddDeployTasks() call.
 */
int32_t msmGrpAddDeployVgTasks(SStmGrpCtx* pCtx) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t vgNum = taosArrayGetSize(pCtx->pReq->pVgLeaders);
  SStmVgTasksToDeploy* pVg = NULL;  // non-NULL only while an entry is acquired AND read-locked

  mstDebug("start to add stream vgroup tasks deploy");

  for (int32_t i = 0; i < vgNum; ++i) {
    int32_t* vgId = taosArrayGet(pCtx->pReq->pVgLeaders, i);

    msmUpdateVgroupUpTs(pCtx, *vgId);

    SStmVgTasksToDeploy* pEntry = taosHashAcquire(mStreamMgmt.toDeployVgMap, vgId, sizeof(*vgId));
    if (NULL == pEntry) {
      continue;
    }

    if (taosRTryLockLatch(&pEntry->lock)) {
      // latch is busy: give the reference back and try again on a later heartbeat
      taosHashRelease(mStreamMgmt.toDeployVgMap, pEntry);
      continue;
    }

    pVg = pEntry;

    if (atomic_load_32(&pVg->deployed) != taosArrayGetSize(pVg->taskList)) {
      TAOS_CHECK_EXIT(msmGrpAddDeployTasks(pCtx->deployStm, pVg->taskList, &pVg->deployed));
    }

    taosRUnLockLatch(&pVg->lock);
    taosHashRelease(mStreamMgmt.toDeployVgMap, pVg);
    pVg = NULL;
  }

_exit:

  if (pVg) {
    // only reached with pVg set when a deploy call failed while the latch was held
    taosRUnLockLatch(&pVg->lock);
    taosHashRelease(mStreamMgmt.toDeployVgMap, pVg);
  }

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2851

2852
/*
 * Collect the pending trigger and runner tasks of the snode named in the heartbeat request
 * into the group's deploy map (pCtx->deployStm).
 *
 * Nothing is done when the snode has no entry in toDeploySnodeMap. Otherwise the entry's
 * latch is taken via mstWaitLock(..., false) and each list whose deployed counter is below
 * its size is added.
 *
 * The entry obtained with taosHashAcquire() is unlocked and handed back with
 * taosHashRelease() on both the success and the error path; previously the reference was
 * never released.
 *
 * Returns TSDB_CODE_SUCCESS or the code of the first failing msmGrpAddDeployTasks() call.
 */
int32_t msmGrpAddDeploySnodeTasks(SStmGrpCtx* pCtx) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SStmSnodeTasksDeploy* pSnode = NULL;
  SStreamHbMsg* pReq = pCtx->pReq;

  mstDebug("start to add stream snode tasks deploy");

  pSnode = taosHashAcquire(mStreamMgmt.toDeploySnodeMap, &pReq->snodeId, sizeof(pReq->snodeId));
  if (NULL == pSnode) {
    return TSDB_CODE_SUCCESS;
  }

  (void)mstWaitLock(&pSnode->lock, false);

  if (atomic_load_32(&pSnode->triggerDeployed) < taosArrayGetSize(pSnode->triggerList)) {
    TAOS_CHECK_EXIT(msmGrpAddDeployTasks(pCtx->deployStm, pSnode->triggerList, &pSnode->triggerDeployed));
  }

  if (atomic_load_32(&pSnode->runnerDeployed) < taosArrayGetSize(pSnode->runnerList)) {
    TAOS_CHECK_EXIT(msmGrpAddDeployTasks(pCtx->deployStm, pSnode->runnerList, &pSnode->runnerDeployed));
  }

_exit:

  // pSnode is always acquired and locked here: the only earlier return is the NULL case above
  taosWUnLockLatch(&pSnode->lock);
  taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2889

2890
/*
 * Record currTs as the last-action timestamp of a stream. A stream that is no longer in
 * streamMap only produces a warning.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t msmUpdateStreamLastActTs(int64_t streamId, int64_t currTs) {
  SStmStatus* pStatus = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));

  if (NULL != pStatus) {
    pStatus->lastActionTs = currTs;
  } else {
    mstsWarn("stream already not exists in streamMap, mapSize:%d", taosHashGetSize(mStreamMgmt.streamMap));
  }

  return TSDB_CODE_SUCCESS;
}
2909

2910
/*
 * Move every per-stream deploy entry of the group deploy map into the heartbeat response.
 *
 * Each entry is pushed by value into pRsp->deploy.streamList, the map entry is then cleared
 * with mstClearSStmStreamDeploy(), and the stream's last-action timestamp is refreshed.
 *
 * Returns TSDB_CODE_SUCCESS, terrno when the list cannot be created or extended, or the code
 * of a failing timestamp update. An unfinished hash iteration is cancelled before returning.
 */
int32_t msmRspAddStreamsDeploy(SStmGrpCtx* pCtx) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  void*   pIter = NULL;
  int32_t streamNum = taosHashGetSize(pCtx->deployStm);

  mstDebug("start to add group %d deploy streams, streamNum:%d", pCtx->pReq->streamGId, taosHashGetSize(pCtx->deployStm));

  pCtx->pRsp->deploy.streamList = taosArrayInit(streamNum, sizeof(SStmStreamDeploy));
  TSDB_CHECK_NULL(pCtx->pRsp->deploy.streamList, code, lino, _exit, terrno);

  while ((pIter = taosHashIterate(pCtx->deployStm, pIter)) != NULL) {
    SStmStreamDeploy *pDeploy = (SStmStreamDeploy *)pIter;
    TSDB_CHECK_NULL(taosArrayPush(pCtx->pRsp->deploy.streamList, pDeploy), code, lino, _exit, terrno);

    int64_t streamId = pDeploy->streamId;
    mstsDebug("stream DEPLOY added to dnode %d hb rsp, readerTasks:%zu, triggerTask:%d, runnerTasks:%zu",
        pCtx->pReq->dnodeId, taosArrayGetSize(pDeploy->readerTasks), pDeploy->triggerTask ? 1 : 0, taosArrayGetSize(pDeploy->runnerTasks));

    // the response list now holds a copy of the entry; clear the map's copy
    mstClearSStmStreamDeploy(pDeploy);

    TAOS_CHECK_EXIT(msmUpdateStreamLastActTs(pDeploy->streamId, pCtx->currTs));
  }

_exit:

  if (pIter) {
    taosHashCancelIterate(pCtx->deployStm, pIter);
  }

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2951

2952
/*
 * Drop already-deployed tasks from the to-deploy vgroup map for the given vgroup leaders.
 *
 * A vgroup is skipped when it has no map entry, when its latch cannot be write-locked right
 * away, or when nothing has been deployed for it. When every task of the vgroup is deployed,
 * the whole task list is destroyed and the entry is removed from the map; otherwise only the
 * deployed tasks are removed from the list and the deployed counter is reset to 0.
 * mStreamMgmt.toDeployVgTaskNum is decreased by the number of tasks dropped.
 */
void msmCleanDeployedVgTasks(SArray* pVgLeaders) {
  int32_t vgNum = taosArrayGetSize(pVgLeaders);

  for (int32_t i = 0; i < vgNum; ++i) {
    int32_t*             vgId = taosArrayGet(pVgLeaders, i);
    SStmVgTasksToDeploy* pVg = taosHashAcquire(mStreamMgmt.toDeployVgMap, vgId, sizeof(*vgId));
    if (NULL == pVg) {
      continue;
    }

    if (taosWTryLockLatch(&pVg->lock)) {
      taosHashRelease(mStreamMgmt.toDeployVgMap, pVg);
      continue;
    }

    if (atomic_load_32(&pVg->deployed) > 0) {
      int32_t taskNum = taosArrayGetSize(pVg->taskList);

      if (atomic_load_32(&pVg->deployed) == taskNum) {
        // everything is deployed: drop the list and the map entry itself
        (void)atomic_sub_fetch_32(&mStreamMgmt.toDeployVgTaskNum, taskNum);
        taosArrayDestroyEx(pVg->taskList, mstDestroySStmTaskToDeployExt);
        pVg->taskList = NULL;
        TAOS_UNUSED(taosHashRemove(mStreamMgmt.toDeployVgMap, vgId, sizeof(*vgId)));
      } else {
        // walk backwards so that removing an element does not shift the ones still to visit
        for (int32_t m = taskNum - 1; m >= 0; --m) {
          SStmTaskToDeployExt* pExt = taosArrayGet(pVg->taskList, m);
          if (pExt->deployed) {
            mstDestroySStmTaskToDeployExt(pExt);

            taosArrayRemove(pVg->taskList, m);
            (void)atomic_sub_fetch_32(&mStreamMgmt.toDeployVgTaskNum, 1);
          }
        }
        atomic_store_32(&pVg->deployed, 0);
      }
    }

    taosWUnLockLatch(&pVg->lock);
    taosHashRelease(mStreamMgmt.toDeployVgMap, pVg);
  }
}
158✔
3013

3014
/*
 * Drop already-deployed trigger and runner tasks of one snode from the to-deploy snode map.
 *
 * Nothing is done when snodeId is not a valid snode, when the snode has no map entry, when
 * its latch cannot be write-locked right away, or when neither list has deployed tasks.
 *
 * A list whose deployed counter equals its size is destroyed as a whole; when both lists are
 * gone the entry is removed from the map. For a partially deployed list only the deployed
 * tasks are removed. mStreamMgmt.toDeploySnodeTaskNum is decreased by the number of tasks
 * dropped.
 *
 * The deployed counter is reset to 0 whenever tasks are dropped, including when a whole list
 * is destroyed. Previously a destroyed list left its counter at the old size while the entry
 * stayed in the map, so the `deployed < size` check in msmGrpAddDeploySnodeTasks() could be
 * false for a list re-created afterwards.
 * NOTE(review): the code that (re)creates these lists is not visible here -- confirm it does
 * not depend on the old counter value.
 */
void msmCleanDeployedSnodeTasks (int32_t snodeId) {
  if (!GOT_SNODE(snodeId)) {
    return;
  }

  SStmSnodeTasksDeploy* pSnode = taosHashAcquire(mStreamMgmt.toDeploySnodeMap, &snodeId, sizeof(snodeId));
  if (NULL == pSnode) {
    return;
  }

  if (taosWTryLockLatch(&pSnode->lock)) {
    taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);
    return;
  }

  int32_t triggerNum = taosArrayGetSize(pSnode->triggerList);
  int32_t runnerNum = taosArrayGetSize(pSnode->runnerList);

  if (atomic_load_32(&pSnode->triggerDeployed) <= 0 && atomic_load_32(&pSnode->runnerDeployed) <= 0) {
    taosWUnLockLatch(&pSnode->lock);
    taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);
    return;
  }

  if (atomic_load_32(&pSnode->triggerDeployed) == triggerNum) {
    (void)atomic_sub_fetch_32(&mStreamMgmt.toDeploySnodeTaskNum, triggerNum);
    taosArrayDestroyEx(pSnode->triggerList, mstDestroySStmTaskToDeployExt);
    pSnode->triggerList = NULL;
    // the list is gone, so nothing of it counts as deployed any more
    atomic_store_32(&pSnode->triggerDeployed, 0);
  }

  if (atomic_load_32(&pSnode->runnerDeployed) == runnerNum) {
    (void)atomic_sub_fetch_32(&mStreamMgmt.toDeploySnodeTaskNum, runnerNum);
    taosArrayDestroyEx(pSnode->runnerList, mstDestroySStmTaskToDeployExt);
    pSnode->runnerList = NULL;
    atomic_store_32(&pSnode->runnerDeployed, 0);
  }

  if (NULL == pSnode->triggerList && NULL == pSnode->runnerList) {
    TAOS_UNUSED(taosHashRemove(mStreamMgmt.toDeploySnodeMap, &snodeId, sizeof(snodeId)));
    taosWUnLockLatch(&pSnode->lock);
    taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);
    return;
  }

  if (atomic_load_32(&pSnode->triggerDeployed) > 0 && pSnode->triggerList) {
    // walk backwards so that removing an element does not shift the ones still to visit
    for (int32_t m = triggerNum - 1; m >= 0; --m) {
      SStmTaskToDeployExt* pExt = taosArrayGet(pSnode->triggerList, m);
      if (!pExt->deployed) {
        continue;
      }

      mstDestroySStmTaskToDeployExt(pExt);
      (void)atomic_sub_fetch_32(&mStreamMgmt.toDeploySnodeTaskNum, 1);
      taosArrayRemove(pSnode->triggerList, m);
    }

    atomic_store_32(&pSnode->triggerDeployed, 0);
  }

  if (atomic_load_32(&pSnode->runnerDeployed) > 0 && pSnode->runnerList) {
    for (int32_t m = runnerNum - 1; m >= 0; --m) {
      SStmTaskToDeployExt* pExt = taosArrayGet(pSnode->runnerList, m);
      if (!pExt->deployed) {
        continue;
      }

      mstDestroySStmTaskToDeployExt(pExt);
      (void)atomic_sub_fetch_32(&mStreamMgmt.toDeploySnodeTaskNum, 1);
      taosArrayRemove(pSnode->runnerList, m);
    }

    atomic_store_32(&pSnode->runnerDeployed, 0);
  }

  taosWUnLockLatch(&pSnode->lock);
  taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);
}
3091

3092
/*
 * After a heartbeat has been answered, purge the tasks that were handed out from the
 * to-deploy maps. Each map is only visited while its global pending-task counter is positive.
 */
void msmClearStreamToDeployMaps(SStreamHbMsg* pHb) {
  bool hasVgTasks = atomic_load_32(&mStreamMgmt.toDeployVgTaskNum) > 0;
  if (hasVgTasks) {
    msmCleanDeployedVgTasks(pHb->pVgLeaders);
  }

  bool hasSnodeTasks = atomic_load_32(&mStreamMgmt.toDeploySnodeTaskNum) > 0;
  if (hasSnodeTasks) {
    msmCleanDeployedSnodeTasks(pHb->snodeId);
  }
}
32,764✔
3101

3102
// Clear the action and deploy hash tables that belong to stream group pHb->streamGId
// inside the thread context selected by streamGetThreadIdx().
// Nothing is cleared when the thread-context array is NULL.
void msmCleanStreamGrpCtx(SStreamHbMsg* pHb) {
  int32_t tidx = streamGetThreadIdx(mStreamMgmt.threadNum, pHb->streamGId);

  if (NULL == mStreamMgmt.tCtx) {
    return;
  }

  taosHashClear(mStreamMgmt.tCtx[tidx].actionStm[pHb->streamGId]);
  taosHashClear(mStreamMgmt.tCtx[tidx].deployStm[pHb->streamGId]);
}
32,764✔
3109

3110
// Record a START action for streamId in the action hash pHash.
//   pHash    - hash keyed by stream id holding SStmAction values
//   streamId - stream to start
//   pId      - id of the trigger task; copied into the action
// If an action already exists for the stream the START bit is OR-ed into it and the
// trigger id is overwritten; otherwise a new zero-initialized action is inserted.
// Returns TSDB_CODE_SUCCESS or the error reported by taosHashPut.
int32_t msmGrpAddActionStart(SHashObj* pHash, int64_t streamId, SStmTaskId* pId) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  SStmAction* pExisting = taosHashGet(pHash, &streamId, sizeof(streamId));

  if (NULL == pExisting) {
    SStmAction fresh = {0};
    fresh.actions = STREAM_ACT_START;
    fresh.start.triggerId = *pId;
    TAOS_CHECK_EXIT(taosHashPut(pHash, &streamId, sizeof(streamId), &fresh, sizeof(fresh)));
    mstsDebug("stream add START action, actions:%x", fresh.actions);
  } else {
    pExisting->actions |= STREAM_ACT_START;
    pExisting->start.triggerId = *pId;
    mstsDebug("stream append START action, actions:%x", pExisting->actions);
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
3135

3136
// Record an UPDATE_TRIGGER action for streamId in the action hash pHash.
// An existing action for the stream gets the UPDATE_TRIGGER bit OR-ed in; otherwise
// a new zero-initialized action carrying only that bit is inserted.
// Returns TSDB_CODE_SUCCESS or the error reported by taosHashPut.
int32_t msmGrpAddActionUpdateTrigger(SHashObj* pHash, int64_t streamId) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  SStmAction* pExisting = taosHashGet(pHash, &streamId, sizeof(streamId));

  if (NULL == pExisting) {
    SStmAction fresh = {0};
    fresh.actions = STREAM_ACT_UPDATE_TRIGGER;
    TAOS_CHECK_EXIT(taosHashPut(pHash, &streamId, sizeof(streamId), &fresh, sizeof(fresh)));
    mstsDebug("stream add UPDATE_TRIGGER action, actions:%x", fresh.actions);
  } else {
    pExisting->actions |= STREAM_ACT_UPDATE_TRIGGER;
    mstsDebug("stream append UPDATE_TRIGGER action, actions:%x", pExisting->actions);
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
3160

3161

3162

3163
// Queue pTask for undeploy in the group's action hash (pCtx->actionStm).
//   pCtx     - group context; provides the mnode handle, the action hash and taskNum
//              (used as the initial capacity of a newly created task list)
//   streamId - stream the task belongs to
//   pTask    - task to undeploy; only the pointer is stored in the task list
// The checkpoint/cleanup flags depend on whether mstIsStreamDropped() reports the
// stream as dropped: a new action gets doCheckpoint = !dropped and doCleanup = dropped;
// an existing action only has doCheckpoint re-evaluated while it is still set and
// doCleanup re-evaluated while it is still clear.
// Returns TSDB_CODE_SUCCESS or the first error encountered.
int32_t msmGrpAddActionUndeploy(SStmGrpCtx* pCtx, int64_t streamId, SStreamTask* pTask) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  int32_t    action = STREAM_ACT_UNDEPLOY;
  bool       dropped = false;
  // Function scope so that the error path can release whatever it already owns.
  SStmAction newAction = {0};

  TAOS_CHECK_EXIT(mstIsStreamDropped(pCtx->pMnode, streamId, &dropped));
  mstsDebug("stream dropped: %d", dropped);
  
  SStmAction *pAction = taosHashGet(pCtx->actionStm, &streamId, sizeof(streamId));
  if (pAction) {
    pAction->actions |= action;
    if (NULL == pAction->undeploy.taskList) {
      pAction->undeploy.taskList = taosArrayInit(pCtx->taskNum, POINTER_BYTES);
      TSDB_CHECK_NULL(pAction->undeploy.taskList, code, lino, _exit, terrno);
    }

    TSDB_CHECK_NULL(taosArrayPush(pAction->undeploy.taskList, &pTask), code, lino, _exit, terrno);
    if (pAction->undeploy.doCheckpoint) {
      pAction->undeploy.doCheckpoint = dropped ? false : true;
    }
    if (!pAction->undeploy.doCleanup) {
      pAction->undeploy.doCleanup = dropped ? true : false;
    }
    
    msttDebug("task append UNDEPLOY action[%d,%d], actions:%x", pAction->undeploy.doCheckpoint, pAction->undeploy.doCleanup, pAction->actions);
  } else {
    newAction.actions = action;
    newAction.undeploy.doCheckpoint = dropped ? false : true;
    newAction.undeploy.doCleanup = dropped ? true : false;
    newAction.undeploy.taskList = taosArrayInit(pCtx->taskNum, POINTER_BYTES);
    TSDB_CHECK_NULL(newAction.undeploy.taskList, code, lino, _exit, terrno);
    TSDB_CHECK_NULL(taosArrayPush(newAction.undeploy.taskList, &pTask), code, lino, _exit, terrno);
    // On success the hash holds a copy of newAction; nothing after this point can fail,
    // so the cleanup below never runs for a list that the hash entry refers to.
    TAOS_CHECK_EXIT(taosHashPut(pCtx->actionStm, &streamId, sizeof(streamId), &newAction, sizeof(newAction)));
    
    msttDebug("task add UNDEPLOY action[%d,%d]", newAction.undeploy.doCheckpoint, newAction.undeploy.doCleanup);
  }

_exit:

  if (code) {
    // Release the task list of an action that never made it into the hash
    // (same cleanup msmGrpAddActionRecalc performs on its error path).
    mstDestroySStmAction(&newAction);
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
3210

3211
// Attach a RECALC action carrying recalcList to streamId in pCtx->actionStm.
// An existing action gets the RECALC bit OR-ed in and its recalc list pointer replaced;
// otherwise a new action is inserted. When the insert fails, the local action (which
// then references recalcList) is handed to mstDestroySStmAction().
// Returns TSDB_CODE_SUCCESS or the error reported by taosHashPut.
int32_t msmGrpAddActionRecalc(SStmGrpCtx* pCtx, int64_t streamId, SArray* recalcList) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  SStmAction  newAction = {0};
  SStmAction* pExisting = taosHashGet(pCtx->actionStm, &streamId, sizeof(streamId));

  if (NULL == pExisting) {
    newAction.actions = STREAM_ACT_RECALC;
    newAction.recalc.recalcList = recalcList;

    TAOS_CHECK_EXIT(taosHashPut(pCtx->actionStm, &streamId, sizeof(streamId), &newAction, sizeof(newAction)));

    mstsDebug("stream add recalc action, listSize:%d", (int32_t)taosArrayGetSize(recalcList));
  } else {
    pExisting->actions |= STREAM_ACT_RECALC;
    pExisting->recalc.recalcList = recalcList;

    mstsDebug("stream append recalc action, listSize:%d, actions:%x", (int32_t)taosArrayGetSize(recalcList), pExisting->actions);
  }

_exit:

  if (code) {
    mstDestroySStmAction(&newAction);
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
3241

3242
bool msmCheckStreamStartCond(int64_t streamId, int32_t snodeId) {
413✔
3243
  SStmStatus* pStream = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
413✔
3244
  if (NULL == pStream) {
413!
3245
    return false;
×
3246
  }
3247

3248
  if (pStream->triggerTask->id.nodeId != snodeId || STREAM_STATUS_INIT != pStream->triggerTask->status) {
413!
3249
    return false;
×
3250
  }
3251

3252
  int32_t readerNum = taosArrayGetSize(pStream->trigReaders);
413✔
3253
  for (int32_t i = 0; i < readerNum; ++i) {
920✔
3254
    SStmTaskStatus* pStatus = taosArrayGet(pStream->trigReaders, i);
507✔
3255
    if (STREAM_STATUS_INIT != pStatus->status && STREAM_STATUS_RUNNING != pStatus->status) {
507!
3256
      return false;
×
3257
    }
3258
  }
3259

3260
  readerNum = taosArrayGetSize(pStream->trigOReaders);
413✔
3261
  for (int32_t i = 0; i < readerNum; ++i) {
437✔
3262
    SStmTaskStatus* pStatus = taosArrayGet(pStream->trigOReaders, i);
24✔
3263
    if (STREAM_STATUS_INIT != pStatus->status && STREAM_STATUS_RUNNING != pStatus->status) {
24!
3264
      return false;
×
3265
    }
3266
  }
3267

3268
  readerNum = taosArrayGetSize(pStream->calcReaders);
413✔
3269
  for (int32_t i = 0; i < readerNum; ++i) {
616✔
3270
    SStmTaskStatus* pStatus = taosArrayGet(pStream->calcReaders, i);
203✔
3271
    if (STREAM_STATUS_INIT != pStatus->status && STREAM_STATUS_RUNNING != pStatus->status) {
203!
3272
      return false;
×
3273
    }
3274
  }
3275

3276
  for (int32_t i = 0; i < pStream->runnerDeploys; ++i) {
1,482✔
3277
    int32_t runnerNum = taosArrayGetSize(pStream->runners[i]);
1,126✔
3278
    for (int32_t m = 0; m < runnerNum; ++m) {
2,195✔
3279
      SStmTaskStatus* pStatus = taosArrayGet(pStream->runners[i], m);
1,126✔
3280
      if (STREAM_STATUS_INIT != pStatus->status && STREAM_STATUS_RUNNING != pStatus->status) {
1,126!
3281
        return false;
57✔
3282
      }
3283
    }
3284
  }
3285
  
3286
  return true;
356✔
3287
}
3288

3289

3290
// React to a heartbeat status entry (pMsg) whose status is not RUNNING.
//   - stream missing from streamMap, or flagged stopped: queue the task for undeploy
//   - FAILED: queue the task for undeploy
//   - INIT on a trigger task: queue a START action once the stream has had an action
//     (lastActionTs != INT64_MIN), at least STREAM_ACT_MIN_DELAY_MSEC has passed since
//     it, the stream is not already running, the heartbeat carries an snode id and
//     msmCheckStreamStartCond() agrees
//   - anything else: ignored
// Any failure is passed to msmStopStreamByError() and logged.
// pTaskStatus is not used by this function.
void msmHandleTaskAbnormalStatus(SStmGrpCtx* pCtx, SStmTaskStatusMsg* pMsg, SStmTaskStatus* pTaskStatus) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  int8_t       stopped = 0;
  int64_t      streamId = pMsg->streamId;
  SStreamTask* pTask = (SStreamTask*)pMsg;

  msttDebug("start to handle task abnormal status %d", pTask->status);

  SStmStatus* pStatus = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL == pStatus) {
    msttInfo("stream no longer exists in streamMap, try to undeploy current task, idx:%d", pMsg->taskIdx);
    TAOS_CHECK_EXIT(msmGrpAddActionUndeploy(pCtx, streamId, pTask));
    return;
  }

  stopped = atomic_load_8(&pStatus->stopped);
  if (stopped) {
    msttInfo("stream stopped %d, try to undeploy current task, idx:%d", stopped, pMsg->taskIdx);
    TAOS_CHECK_EXIT(msmGrpAddActionUndeploy(pCtx, streamId, pTask));
    return;
  }

  if (STREAM_STATUS_FAILED == pMsg->status) {
    // STREAMTODO: handle specific error codes
    msttInfo("task failed with error:%s, try to undeploy current task, idx:%d", tstrerror(pMsg->errorCode), pMsg->taskIdx);
    TAOS_CHECK_EXIT(msmGrpAddActionUndeploy(pCtx, streamId, pTask));
  } else if (STREAM_STATUS_INIT == pMsg->status) {
    if (STREAM_TRIGGER_TASK != pMsg->type) {
      msttTrace("task status is INIT and not trigger task, ignore it, currTs:%" PRId64 ", lastTs:%" PRId64, pCtx->currTs, pStatus->lastActionTs);
      return;
    }

    if (INT64_MIN == pStatus->lastActionTs) {
      msttDebug("task still not deployed, ignore it, currTs:%" PRId64 ", lastTs:%" PRId64, pCtx->currTs, pStatus->lastActionTs);
      return;
    }

    if ((pCtx->currTs - pStatus->lastActionTs) < STREAM_ACT_MIN_DELAY_MSEC) {
      msttDebug("task wait not enough between actions, currTs:%" PRId64 ", lastTs:%" PRId64, pCtx->currTs, pStatus->lastActionTs);
      return;
    }

    if (STREAM_IS_RUNNING(pStatus)) {
      msttDebug("stream already running, ignore status: %s", gStreamStatusStr[pTask->status]);
    } else if (GOT_SNODE(pCtx->pReq->snodeId) && msmCheckStreamStartCond(streamId, pCtx->pReq->snodeId)) {
      TAOS_CHECK_EXIT(msmGrpAddActionStart(pCtx->actionStm, streamId, &pStatus->triggerTask->id));
    }
  }

_exit:

  if (code) {
    msmStopStreamByError(streamId, pStatus, code, pCtx->currTs);
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }
}
3353

3354
// Handle a status entry that could not be matched to a live task.
// For STM_ERR_TASK_NOT_EXISTS and STM_ERR_STREAM_STOPPED the reporting task is queued
// for undeploy; other error types only produce the entry log line.
// A failure to queue the undeploy is logged and otherwise ignored.
void msmHandleStatusUpdateErr(SStmGrpCtx* pCtx, EStmErrType err, SStmTaskStatusMsg* pStatus) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  int64_t      streamId = pStatus->streamId;
  SStreamTask* pTask = (SStreamTask*)pStatus;

  msttInfo("start to handle task status update exception, type: %d", err);

  // STREAMTODO

  bool needUndeploy = (STM_ERR_TASK_NOT_EXISTS == err) || (STM_ERR_STREAM_STOPPED == err);
  if (needUndeploy) {
    TAOS_CHECK_EXIT(msmGrpAddActionUndeploy(pCtx, streamId, pTask));
  }

_exit:

  if (code) {
    // the stream is deliberately not stopped for this error
    msttError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }
}
147✔
3375

3376
// Per-heartbeat housekeeping for a trigger task.
//   pCtx    - group context (action hash, request message)
//   pTask   - trigger task status entry from the heartbeat
//   pStatus - the mnode-side status record of that task
// Steps:
//   1. if the stream's triggerNeedUpdate flag was set, clear it and queue UPDATE_TRIGGER
//   2. if a user recalc list is pending, detach it under userRecalcLock and queue RECALC
//   3. if the heartbeat carries a detail-status entry for this task, copy it into
//      pStatus->detailStatus (allocated on first use) under detailStatusLock
// Errors are logged only; the stream is not stopped.
void msmChkHandleTriggerOperations(SStmGrpCtx* pCtx, SStmTaskStatusMsg* pTask, SStmTaskStatus* pStatus) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SStmStatus* pStream = (SStmStatus*)pStatus->pStream;

  if (1 == atomic_val_compare_exchange_8(&pStream->triggerNeedUpdate, 1, 0)) {
    TAOS_CHECK_EXIT(msmGrpAddActionUpdateTrigger(pCtx->actionStm, pTask->streamId));
  }
  
  SArray* userRecalcList = NULL;
  if (atomic_load_ptr(&pStream->userRecalcList)) {
    // re-check under the lock: the list may have been taken since the unlocked peek
    taosWLockLatch(&pStream->userRecalcLock);
    if (pStream->userRecalcList) {
      userRecalcList = pStream->userRecalcList;
      pStream->userRecalcList = NULL;
    }
    taosWUnLockLatch(&pStream->userRecalcLock);
    
    if (userRecalcList) {
      TAOS_CHECK_EXIT(msmGrpAddActionRecalc(pCtx, pTask->streamId, userRecalcList));
    }
  }

  if (pTask->detailStatus >= 0 && pCtx->pReq->pTriggerStatus) {
    // detailStatus is an index supplied by the heartbeat message; resolve it before
    // touching any state so that an out-of-range index never reaches memcpy as NULL.
    void* pDetail = taosArrayGet(pCtx->pReq->pTriggerStatus, pTask->detailStatus);
    TSDB_CHECK_NULL(pDetail, code, lino, _exit, TSDB_CODE_MND_STREAM_INTERNAL_ERROR);

    (void)mstWaitLock(&pStatus->detailStatusLock, false);
    if (NULL == pStatus->detailStatus) {
      pStatus->detailStatus = taosMemoryCalloc(1, sizeof(SSTriggerRuntimeStatus));
      if (NULL == pStatus->detailStatus) {
        taosWUnLockLatch(&pStatus->detailStatusLock);
        TSDB_CHECK_NULL(pStatus->detailStatus, code, lino, _exit, terrno);
      }
    }
    
    memcpy(pStatus->detailStatus, pDetail, sizeof(SSTriggerRuntimeStatus));
    taosWUnLockLatch(&pStatus->detailStatusLock);
  }

_exit:

  if (code) {
    // IGNORE STOP STREAM BY ERROR
    msttError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }
}
3,447✔
3420

3421
// NORMAL-mode processing of every task status entry in the heartbeat request.
// For each entry:
//   - unknown task, stopped stream, or seriousId/nodeId mismatch with taskMap:
//     delegate to msmHandleStatusUpdateErr() and move on
//   - otherwise refresh the stored status/errCode/lastUpTs; on a status change either
//     stamp runningStartTs (now RUNNING) or, for a runner that is getting ready and was
//     flagged for redeploy, request a trigger update and clear the redeploy flag
//   - non-RUNNING entries go through msmHandleTaskAbnormalStatus()
//   - trigger tasks additionally go through msmChkHandleTriggerOperations()
// Always returns TSDB_CODE_SUCCESS as written (nothing assigns code).
int32_t msmNormalHandleStatusUpdate(SStmGrpCtx* pCtx) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t num = taosArrayGetSize(pCtx->pReq->pStreamStatus);

  mstDebug("NORMAL: start to handle stream group %d tasks status, taskNum:%d", pCtx->pReq->streamGId, num);

  for (int32_t i = 0; i < num; ++i) {
    SStmTaskStatusMsg* pTask = taosArrayGet(pCtx->pReq->pStreamStatus, i);
    msttDebug("task status %s got on dnode %d, taskIdx:%d", gStreamStatusStr[pTask->status], pCtx->pReq->dnodeId, pTask->taskIdx);

    // the hash key is the contiguous (streamId, taskId) pair inside the message
    SStmTaskStatus** ppStatus = taosHashGet(mStreamMgmt.taskMap, &pTask->streamId, sizeof(pTask->streamId) + sizeof(pTask->taskId));
    if (NULL == ppStatus) {
      msttWarn("task no longer exists in taskMap, will try to undeploy current task, taskIdx:%d", pTask->taskIdx);
      msmHandleStatusUpdateErr(pCtx, STM_ERR_TASK_NOT_EXISTS, pTask);
      continue;
    }

    SStmTaskStatus* pKnown = *ppStatus;
    SStmStatus*     pStream = (SStmStatus*)pKnown->pStream;

    int8_t stopped = atomic_load_8(&pStream->stopped);
    if (stopped) {
      msttWarn("stream already stopped %d, will try to undeploy current task, taskIdx:%d", stopped, pTask->taskIdx);
      msmHandleStatusUpdateErr(pCtx, STM_ERR_STREAM_STOPPED, pTask);
      continue;
    }

    if ((pTask->seriousId != pKnown->id.seriousId) || (pTask->nodeId != pKnown->id.nodeId)) {
      msttInfo("task mismatch with it in taskMap, will try to rm it, current seriousId:%" PRId64 ", nodeId:%d", 
          pKnown->id.seriousId, pKnown->id.nodeId);

      msmHandleStatusUpdateErr(pCtx, STM_ERR_TASK_NOT_EXISTS, pTask);
      continue;
    }

    bool statusChanged = (pKnown->status != pTask->status);
    if (statusChanged && STREAM_STATUS_RUNNING == pTask->status) {
      pKnown->runningStartTs = pCtx->currTs;
    } else if (statusChanged && MST_IS_RUNNER_GETTING_READY(pTask) && STREAM_IS_REDEPLOY_RUNNER(pKnown->flags)) {
      if (pStream->triggerTask) {
        atomic_store_8(&pStream->triggerNeedUpdate, 1);
      }

      STREAM_CLR_FLAG(pKnown->flags, STREAM_FLAG_REDEPLOY_RUNNER);
    }

    pKnown->errCode = pTask->errorCode;
    pKnown->status = pTask->status;
    pKnown->lastUpTs = pCtx->currTs;

    if (STREAM_STATUS_RUNNING != pTask->status) {
      msmHandleTaskAbnormalStatus(pCtx, pTask, pKnown);
    }

    if (STREAM_TRIGGER_TASK == pTask->type) {
      msmChkHandleTriggerOperations(pCtx, pTask, pKnown);
    }
  }

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
3488

3489
// WATCH-mode: register a task reported by a heartbeat that is not yet in taskMap.
//   pCtx  - group context (mnode handle, timestamps)
//   pTask - status entry of the unknown task
// If the owning stream is not in streamMap it is loaded via mndAcquireStreamById(),
// turned into an SStmStatus by msmInitStmStatus() and inserted; tasks of streams for
// which STREAM_IS_VIRTUAL_TABLE() holds are ignored (success is returned).
// The task is then stored according to its type (reader list, trigger slot, or the
// runner list of its deployId) and added to taskMap plus the vgroup/snode map.
// Returns TSDB_CODE_SUCCESS or the first error encountered.
int32_t msmWatchRecordNewTask(SStmGrpCtx* pCtx, SStmTaskStatusMsg* pTask) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pTask->streamId;
  // Non-NULL only while this function holds an acquired reference to the stream object.
  SStreamObj* pStream = NULL;

  SStmStatus* pStatus = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL == pStatus) {
    SStmStatus status = {0};
    TAOS_CHECK_EXIT(mndAcquireStreamById(pCtx->pMnode, streamId, &pStream));
    TSDB_CHECK_NULL(pStream, code, lino, _exit, TSDB_CODE_MND_STREAM_NOT_EXIST);
    if (STREAM_IS_VIRTUAL_TABLE(pStream->pCreate->triggerTblType, pStream->pCreate->flags)) {
      mndReleaseStream(pCtx->pMnode, pStream);
      msttDebug("virtual table task ignored, status:%s", gStreamStatusStr[pTask->status]);
      return code;
    }

    // On failure the reference is dropped at _exit instead of being leaked.
    TAOS_CHECK_EXIT(msmInitStmStatus(pCtx, &status, pStream, true));
    mndReleaseStream(pCtx->pMnode, pStream);
    pStream = NULL;

    // NOTE(review): if the insert below fails, whatever msmInitStmStatus() allocated
    // inside `status` is not released here - confirm whether a destroy helper exists.
    TAOS_CHECK_EXIT(taosHashPut(mStreamMgmt.streamMap, &streamId, sizeof(streamId), &status, sizeof(status)));
    pStatus = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
    TSDB_CHECK_NULL(pStatus, code, lino, _exit, terrno);
    msttDebug("stream added to streamMap cause of new task status:%s", gStreamStatusStr[pTask->status]);
  }

  SStmTaskStatus* pNewTask = NULL;
  switch (pTask->type) {
    case STREAM_READER_TASK: {
      SArray* pList = STREAM_IS_TRIGGER_READER(pTask->flags) ? pStatus->trigReaders : pStatus->calcReaders;
      if (NULL == pList) {
        mstsError("%sReader list is NULL", STREAM_IS_TRIGGER_READER(pTask->flags) ? "trig" : "calc");
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }
      int32_t readerSize = STREAM_IS_TRIGGER_READER(pTask->flags) ? pStatus->trigReaderNum : pStatus->calcReaderNum;
      if (taosArrayGetSize(pList) >= readerSize) {
        mstsError("%sReader list is already full, size:%d, expSize:%d", STREAM_IS_TRIGGER_READER(pTask->flags) ? "trig" : "calc",
            (int32_t)taosArrayGetSize(pList), readerSize);
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }
      
      SStmTaskStatus taskStatus = {0};
      taskStatus.pStream = pStatus;
      mstSetTaskStatusFromMsg(pCtx, &taskStatus, pTask);
      // the element stored in the list (not the local copy) is what gets indexed
      pNewTask = taosArrayPush(pList, &taskStatus);
      TSDB_CHECK_NULL(pNewTask, code, lino, _exit, terrno);

      TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, NULL, pNewTask));
      TAOS_CHECK_EXIT(msmSTAddToVgroupMapImpl(streamId, pNewTask, STREAM_IS_TRIGGER_READER(pTask->flags)));
      break;
    }
    case STREAM_TRIGGER_TASK: {
      // replace any previously recorded trigger task
      taosMemoryFreeClear(pStatus->triggerTask);
      pStatus->triggerTask = taosMemoryCalloc(1, sizeof(*pStatus->triggerTask));
      TSDB_CHECK_NULL(pStatus->triggerTask, code, lino, _exit, terrno);
      pStatus->triggerTask->pStream = pStatus;
      mstSetTaskStatusFromMsg(pCtx, pStatus->triggerTask, pTask);
      pNewTask = pStatus->triggerTask;

      TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, NULL, pNewTask));
      TAOS_CHECK_EXIT(msmSTAddToSnodeMapImpl(streamId, pNewTask, 0));
      break;
    }
    case STREAM_RUNNER_TASK:{
      if (NULL == pStatus->runners[pTask->deployId]) {
        mstsError("deploy %d runner list is NULL", pTask->deployId);
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }
      if (taosArrayGetSize(pStatus->runners[pTask->deployId]) >= pStatus->runnerNum) {
        mstsError("deploy %d runner list is already full, size:%d, expSize:%d", pTask->deployId, 
            (int32_t)taosArrayGetSize(pStatus->runners[pTask->deployId]), pStatus->runnerNum);
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }    
      
      SStmTaskStatus taskStatus = {0};
      taskStatus.pStream = pStatus;
      mstSetTaskStatusFromMsg(pCtx, &taskStatus, pTask);
      pNewTask = taosArrayPush(pStatus->runners[pTask->deployId], &taskStatus);
      TSDB_CHECK_NULL(pNewTask, code, lino, _exit, terrno);

      TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, NULL, pNewTask));
      TAOS_CHECK_EXIT(msmSTAddToSnodeMapImpl(streamId, pNewTask, pTask->deployId));
      break;
    }
    default: {
      msttError("invalid task type:%d in task status", pTask->type);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      break;
    }
  }

_exit:

  if (pStream) {
    // still holding the acquired stream: an error occurred before the normal release
    mndReleaseStream(pCtx->pMnode, pStream);
  }

  if (code) {
    msttError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  } else {
    msttDebug("new task recored to taskMap/streamMap, task status:%s", gStreamStatusStr[pTask->status]);
  }

  return code;
}
3590

3591
// WATCH-mode processing of every task status entry in the heartbeat request.
// Keeps mStreamMgmt.lastTaskId strictly above every task id seen, refreshes
// status/lastUpTs of tasks already in taskMap, and records unknown tasks through
// msmWatchRecordNewTask(). Stops at, and returns, the first recording error.
int32_t msmWatchHandleStatusUpdate(SStmGrpCtx* pCtx) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t num = taosArrayGetSize(pCtx->pReq->pStreamStatus);

  mstDebug("WATCH: start to handle stream group %d tasks status, taskNum:%d", pCtx->pReq->streamGId, num);

  for (int32_t i = 0; i < num; ++i) {
    SStmTaskStatusMsg* pTask = taosArrayGet(pCtx->pReq->pStreamStatus, i);
    msttDebug("task status %s got, taskIdx:%d", gStreamStatusStr[pTask->status], pTask->taskIdx);

    if (pTask->taskId >= mStreamMgmt.lastTaskId) {
      mStreamMgmt.lastTaskId = pTask->taskId + 1;
    }

    // the hash key is the contiguous (streamId, taskId) pair inside the message
    SStmTaskStatus** ppStatus = taosHashGet(mStreamMgmt.taskMap, &pTask->streamId, sizeof(pTask->streamId) + sizeof(pTask->taskId));
    if (NULL != ppStatus) {
      (*ppStatus)->status = pTask->status;
      (*ppStatus)->lastUpTs = pCtx->currTs;
      continue;
    }

    msttInfo("task still not in taskMap, will try to add it, taskIdx:%d", pTask->taskIdx);
    TAOS_CHECK_EXIT(msmWatchRecordNewTask(pCtx, pTask));
  }

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
3627

3628
// Append a START entry for the stream's trigger task to the heartbeat response.
//   streamId  - stream being started
//   pCtx      - group context owning the response (pCtx->pRsp)
//   streamNum - initial capacity used when the response start list must be created
//   pAction   - action whose start.triggerId identifies the trigger task
// After the entry is stored the stream's last-action timestamp is set to pCtx->currTs.
// Failures are logged; nothing is returned to the caller.
void msmRspAddStreamStart(int64_t streamId, SStmGrpCtx* pCtx, int32_t streamNum, SStmAction *pAction) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  SStmTaskId* pId = &pAction->start.triggerId;

  if (NULL == pCtx->pRsp->start.taskList) {
    pCtx->pRsp->start.taskList = taosArrayInit(streamNum, sizeof(SStreamTaskStart));
    TSDB_CHECK_NULL(pCtx->pRsp->start.taskList, code, lino, _exit, terrno);
  }

  SStreamTaskStart entry = {0};
  entry.task.type = STREAM_TRIGGER_TASK;
  entry.task.streamId = streamId;
  entry.task.taskId = pId->taskId;
  entry.task.seriousId = pId->seriousId;
  entry.task.nodeId = pId->nodeId;
  entry.task.taskIdx = pId->taskIdx;

  TSDB_CHECK_NULL(taosArrayPush(pCtx->pRsp->start.taskList, &entry), code, lino, _exit, terrno);
  TAOS_CHECK_EXIT(msmUpdateStreamLastActTs(streamId, pCtx->currTs));

  mstsDebug("stream START added to dnode %d hb rsp, triggerTaskId:%" PRIx64, pId->nodeId, pId->taskId);

  return;

_exit:

  // reached only through a failed check above
  mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
}
3656

3657

3658
// Append one UNDEPLOY entry per task in pAction->undeploy.taskList to the heartbeat
// response, each carrying the action's doCheckpoint/doCleanup flags.
//   streamId - stream the tasks belong to
//   pCtx     - group context owning the response (pCtx->pRsp)
//   pAction  - UNDEPLOY action collected for the stream
// After each stored entry the stream's last-action timestamp is set to pCtx->currTs.
// Processing stops at the first failure, which is logged; nothing is returned.
void msmRspAddStreamUndeploy(int64_t streamId, SStmGrpCtx* pCtx, SStmAction *pAction) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t dropNum = taosArrayGetSize(pAction->undeploy.taskList);
  if (NULL == pCtx->pRsp->undeploy.taskList) {
    pCtx->pRsp->undeploy.taskList = taosArrayInit(dropNum, sizeof(SStreamTaskUndeploy));
    TSDB_CHECK_NULL(pCtx->pRsp->undeploy.taskList, code, lino, _exit, terrno);
  }

  for (int32_t i = 0; i < dropNum; ++i) {
    SStreamTask* pTask = (SStreamTask*)taosArrayGetP(pAction->undeploy.taskList, i);

    // Zero-initialized so that members not assigned below are not copied into the
    // response with indeterminate values.
    SStreamTaskUndeploy undeploy = {0};
    undeploy.task = *pTask;
    undeploy.undeployMsg.doCheckpoint = pAction->undeploy.doCheckpoint;
    undeploy.undeployMsg.doCleanup = pAction->undeploy.doCleanup;

    TSDB_CHECK_NULL(taosArrayPush(pCtx->pRsp->undeploy.taskList, &undeploy), code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(msmUpdateStreamLastActTs(streamId, pCtx->currTs));

    msttDebug("task UNDEPLOY added to hb rsp, doCheckpoint:%d, doCleanup:%d", undeploy.undeployMsg.doCheckpoint, undeploy.undeployMsg.doCleanup);
  }

  return;

_exit:

  // reached only through a failed check above
  mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
}
3686

3687
// Append a STREAM_MSG_UPDATE_RUNNER management response for the stream's trigger task.
//   pMnode   - mnode handle passed to msmBuildTriggerRunnerTargets()
//   streamId - stream whose trigger must learn the current runner targets
//   pCtx     - group context owning the response (pCtx->pRsp)
//   pAction  - not used by this function
// Silently returns when the stream is gone from streamMap or has no trigger task.
// Failures are logged; nothing is returned to the caller.
// NOTE(review): if creating or appending to the response list fails after the runner
// list was built, rsp.cont.runnerList is not released here - confirm who owns it.
void msmRspAddTriggerUpdate(SMnode * pMnode, int64_t streamId, SStmGrpCtx* pCtx, SStmAction *pAction) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SStmStatus* pStmStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL == pStmStatus) {
    mstsDebug("stream already not exists in streamMap, ignore trigger update, streamRemain:%d", taosHashGetSize(mStreamMgmt.streamMap));
    return;
  }

  if (NULL == pStmStatus->triggerTask) {
    mstsWarn("no triggerTask exists, ignore trigger update, stopped:%d", atomic_load_8(&pStmStatus->stopped));
    return;
  }

  SStreamMgmtRsp rsp = {0};
  rsp.header.msgType = STREAM_MSG_UPDATE_RUNNER;
  rsp.reqId = INT64_MIN;
  rsp.task.taskId = pStmStatus->triggerTask->id.taskId;
  rsp.task.streamId = streamId;

  TAOS_CHECK_EXIT(msmBuildTriggerRunnerTargets(pMnode, pStmStatus, streamId, &rsp.cont.runnerList));

  if (NULL == pCtx->pRsp->rsps.rspList) {
    pCtx->pRsp->rsps.rspList = taosArrayInit(2, sizeof(SStreamMgmtRsp));
    TSDB_CHECK_NULL(pCtx->pRsp->rsps.rspList, code, lino, _exit, terrno);
  }

  TSDB_CHECK_NULL(taosArrayPush(pCtx->pRsp->rsps.rspList, &rsp), code, lino, _exit, terrno);

_exit:

  if (TSDB_CODE_SUCCESS == code) {
    mstsDebug("trigger update rsp added, runnerNum:%d", (int32_t)taosArrayGetSize(rsp.cont.runnerList));
  } else {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }
}
3725

3726
// Append a STREAM_MSG_USER_RECALC management response for the stream's trigger task.
//   pMnode   - not used by this function
//   streamId - stream the recalc request targets
//   pCtx     - group context owning the response (pCtx->pRsp)
//   pAction  - RECALC action; its recalc list is moved into the response on success
// Returns without touching pAction when the stream is gone from streamMap or has no
// trigger task. Failures are logged; nothing is returned to the caller.
void msmRspAddUserRecalc(SMnode * pMnode, int64_t streamId, SStmGrpCtx* pCtx, SStmAction *pAction) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SStmStatus* pStream = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL == pStream) {
    mstsDebug("stream already not exists in streamMap, ignore user recalc, streamRemain:%d", taosHashGetSize(mStreamMgmt.streamMap));
    return;
  }

  if (NULL == pStream->triggerTask) {
    mstsWarn("no triggerTask exists, ignore user recalc, stopped:%d", atomic_load_8(&pStream->stopped));
    return;
  }

  SStreamMgmtRsp rsp = {0};
  rsp.reqId = INT64_MIN;
  rsp.header.msgType = STREAM_MSG_USER_RECALC;
  rsp.task.streamId = streamId;
  rsp.task.taskId = pStream->triggerTask->id.taskId;
  // move the list out of the action; rsp.cont.recalcList was NULL, so the action's
  // pointer becomes NULL
  TSWAP(rsp.cont.recalcList, pAction->recalc.recalcList);

  if (NULL == pCtx->pRsp->rsps.rspList) {
    pCtx->pRsp->rsps.rspList = taosArrayInit(2, sizeof(SStreamMgmtRsp));
    TSDB_CHECK_NULL(pCtx->pRsp->rsps.rspList, code, lino, _exit, terrno);
  }

  TSDB_CHECK_NULL(taosArrayPush(pCtx->pRsp->rsps.rspList, &rsp), code, lino, _exit, terrno);

_exit:

  if (code) {
    // The response never reached the list, so nothing else references the recalc
    // list. Hand it back to the action rather than dropping the only pointer to it.
    // NOTE(review): presumably the action's teardown releases it - confirm.
    TSWAP(rsp.cont.recalcList, pAction->recalc.recalcList);
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  } else {
    mstsDebug("user recalc rsp added, recalcNum:%d", (int32_t)taosArrayGetSize(rsp.cont.recalcList));
  }
}
3763

3764

3765
// Turn every action collected in pCtx->actionStm into heartbeat response content.
// For each stream: an UNDEPLOY action is emitted alone (other bits are skipped);
// otherwise UPDATE_TRIGGER, RECALC and START are emitted in that order when set.
// The number of entries in the hash is passed on as the capacity hint for START.
// Always returns TSDB_CODE_SUCCESS as written (nothing assigns code).
int32_t msmHandleHbPostActions(SStmGrpCtx* pCtx) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  void*   pIter = NULL;
  int32_t streamNum = taosHashGetSize(pCtx->actionStm);

  mstDebug("start to handle stream group %d post actions", pCtx->pReq->streamGId);

  for (pIter = taosHashIterate(pCtx->actionStm, NULL); NULL != pIter; pIter = taosHashIterate(pCtx->actionStm, pIter)) {
    int64_t*    pStreamId = taosHashGetKey(pIter, NULL);
    SStmAction* pAction = (SStmAction*)pIter;

    if (pAction->actions & STREAM_ACT_UNDEPLOY) {
      msmRspAddStreamUndeploy(*pStreamId, pCtx, pAction);
      continue;
    }

    if (pAction->actions & STREAM_ACT_UPDATE_TRIGGER) {
      msmRspAddTriggerUpdate(pCtx->pMnode, *pStreamId, pCtx, pAction);
    }

    if (pAction->actions & STREAM_ACT_RECALC) {
      msmRspAddUserRecalc(pCtx->pMnode, *pStreamId, pCtx, pAction);
    }

    if (pAction->actions & STREAM_ACT_START) {
      msmRspAddStreamStart(*pStreamId, pCtx, streamNum, pAction);
    }
  }

_exit:

  if (pIter) {
    taosHashCancelIterate(pCtx->actionStm, pIter);
  }

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
3812

3813
// Raise the timestamp stored in dnodeMap for the heartbeat's dnode to pCtx->currTs.
// The stored value is only ever moved forward, using a compare-and-swap retry loop.
// When the dnode is missing from the map, msmSTAddDnodesToMap() is called once and the
// lookup repeated; a second miss returns TSDB_CODE_MND_STREAM_NODE_NOT_EXISTS.
// Returns TSDB_CODE_SUCCESS when the entry exists (updated or already newer).
int32_t msmCheckUpdateDnodeTs(SStmGrpCtx* pCtx) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  int64_t* lastTs = NULL;
  bool     reloaded = false;

  // phase 1: locate the dnode entry, refreshing the map at most once
  for (;;) {
    lastTs = taosHashGet(mStreamMgmt.dnodeMap, &pCtx->pReq->dnodeId, sizeof(pCtx->pReq->dnodeId));
    if (NULL != lastTs) {
      break;
    }

    if (reloaded) {
      mstWarn("Got unknown dnode %d hb msg, may be dropped", pCtx->pReq->dnodeId);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_NODE_NOT_EXISTS);
    }

    reloaded = true;
    TAOS_CHECK_EXIT(msmSTAddDnodesToMap(pCtx->pMnode));
  }

  // phase 2: publish currTs unless the stored value is already at least as new
  for (;;) {
    int64_t seenTs = atomic_load_64(lastTs);
    if (pCtx->currTs <= seenTs) {
      return code;
    }

    if (seenTs == atomic_val_compare_exchange_64(lastTs, seenTs, pCtx->currTs)) {
      mstDebug("dnode %d lastUpTs updated", pCtx->pReq->dnodeId);
      return code;
    }
    // the stored value changed under us: reload and try again
  }

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;  
}
3858

3859
/*
 * Walk mStreamMgmt.streamMap and stop every stream whose recorded tasks do not
 * match the counts stored in its SStmStatus.
 *
 * A stream is stopped through msmStopStreamByError() with
 * TSDB_CODE_MND_STREAM_TASK_LOST when any of these holds:
 *   - no trigger task is recorded,
 *   - the size of trigReaders differs from trigReaderNum,
 *   - the size of calcReaders differs from calcReaderNum,
 *   - the size of any runners[i] (i < runnerDeploys) differs from runnerNum.
 *
 * Each stream is stopped at most once per pass: the runner check leaves its
 * loop on the first mismatch instead of re-reporting every remaining deploy.
 */
void msmWatchCheckStreamMap(SStmGrpCtx* pCtx) {
  SStmStatus* pStatus = NULL;
  int32_t trigReaderNum = 0;
  int32_t calcReaderNum = 0;
  int32_t runnerNum = 0;
  int64_t streamId = 0;  // referenced implicitly by the msts* log macros
  void* pIter = NULL;

  while (true) {
    pIter = taosHashIterate(mStreamMgmt.streamMap, pIter);
    if (NULL == pIter) {
      return;
    }

    streamId = *(int64_t*)taosHashGetKey(pIter, NULL);
    pStatus = (SStmStatus*)pIter;

    if (NULL == pStatus->triggerTask) {
      mstsWarn("no trigger task recored, deployTimes:%" PRId64, pStatus->deployTimes);
      msmStopStreamByError(streamId, pStatus, TSDB_CODE_MND_STREAM_TASK_LOST, pCtx->currTs);
      continue;
    }

    trigReaderNum = taosArrayGetSize(pStatus->trigReaders);
    if (pStatus->trigReaderNum != trigReaderNum) {
      mstsWarn("trigReaderNum %d mis-match with expected %d", trigReaderNum, pStatus->trigReaderNum);
      msmStopStreamByError(streamId, pStatus, TSDB_CODE_MND_STREAM_TASK_LOST, pCtx->currTs);
      continue;
    }

    calcReaderNum = taosArrayGetSize(pStatus->calcReaders);
    if (pStatus->calcReaderNum != calcReaderNum) {
      mstsWarn("calcReaderNum %d mis-match with expected %d", calcReaderNum, pStatus->calcReaderNum);
      msmStopStreamByError(streamId, pStatus, TSDB_CODE_MND_STREAM_TASK_LOST, pCtx->currTs);
      continue;
    }

    for (int32_t i = 0; i < pStatus->runnerDeploys; ++i) {
      runnerNum = taosArrayGetSize(pStatus->runners[i]);
      if (runnerNum != pStatus->runnerNum) {
        mstsWarn("runner deploy %d runnerNum %d mis-match with expected %d", i, runnerNum, pStatus->runnerNum);
        msmStopStreamByError(streamId, pStatus, TSDB_CODE_MND_STREAM_TASK_LOST, pCtx->currTs);
        // a `continue` here would only advance this inner loop and stop the
        // same stream again for every further mismatching deploy
        break;
      }
    }
  }
}
3905

3906
/*
 * Finish the watch state and switch the runtime state to MND_STM_STATE_NORMAL.
 *
 * Only the first caller that flips watch.ending from 0 to 1 does the work;
 * any later caller returns immediately. The winner spins (sched_yield) until
 * watch.processing drops to the allowed level: 0 when watchError is true,
 * 1 otherwise.
 *
 * watchError == true : clear the vgroup/snode/task/stream maps.
 * watchError == false: validate the stream map unless no task was reported
 *                      during the watch state.
 *
 * In every case lastTaskId is bumped by 100000 before leaving watch state.
 * As written, this function always returns TSDB_CODE_SUCCESS.
 */
int32_t msmWatchHandleEnding(SStmGrpCtx* pCtx, bool watchError) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t allowedProcessing = watchError ? 0 : 1;

  if (0 != atomic_val_compare_exchange_8(&mStreamMgmt.watch.ending, 0, 1)) {
    // somebody else is already ending the watch state
    return code;
  }

  while (atomic_load_32(&mStreamMgmt.watch.processing) > allowedProcessing) {
    (void)sched_yield();
  }

  if (watchError) {
    taosHashClear(mStreamMgmt.vgroupMap);
    taosHashClear(mStreamMgmt.snodeMap);
    taosHashClear(mStreamMgmt.taskMap);
    taosHashClear(mStreamMgmt.streamMap);
    mstInfo("watch error happends, clear all maps");
  } else if (0 == atomic_load_8(&mStreamMgmt.watch.taskRemains)) {
    mstInfo("no stream tasks remain during watch state");
  } else {
    msmWatchCheckStreamMap(pCtx);
  }

  mStreamMgmt.lastTaskId += 100000;

  mstInfo("watch state end, new taskId begin from:%" PRIx64, mStreamMgmt.lastTaskId);

  msmSetInitRuntimeState(MND_STM_STATE_NORMAL);

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
3949

3950

3951
/*
 * Process one stream heartbeat while the manager is in the watch state.
 *
 * watch.processing is incremented on entry and decremented on exit.
 * If the watch state is already ending the heartbeat is ignored. Otherwise the
 * dnode (and snode, if the request carries one) timestamps are refreshed, any
 * reported task status is recorded, and the watch state is ended once more
 * than MST_ISOLATION_DURATION has elapsed since STM_EVENT_ACTIVE_BEGIN.
 *
 * On any failure the watch state is ended in error mode, and the failing
 * code is returned.
 */
int32_t msmWatchHandleHbMsg(SStmGrpCtx* pCtx) {
  int32_t       code = TSDB_CODE_SUCCESS;
  int32_t       lino = 0;
  SStreamHbMsg* pReq = pCtx->pReq;

  (void)atomic_add_fetch_32(&mStreamMgmt.watch.processing, 1);

  if (0 == atomic_load_8(&mStreamMgmt.watch.ending)) {
    TAOS_CHECK_EXIT(msmCheckUpdateDnodeTs(pCtx));

    if (GOT_SNODE(pReq->snodeId)) {
      TAOS_CHECK_EXIT(msmUpdateSnodeUpTs(pCtx));
    }

    if (taosArrayGetSize(pReq->pStreamStatus) > 0) {
      // remember that at least one task was reported during the watch state
      atomic_store_8(&mStreamMgmt.watch.taskRemains, 1);
      TAOS_CHECK_EXIT(msmWatchHandleStatusUpdate(pCtx));
    }

    if ((pCtx->currTs - MND_STREAM_GET_LAST_TS(STM_EVENT_ACTIVE_BEGIN)) > MST_ISOLATION_DURATION) {
      TAOS_CHECK_EXIT(msmWatchHandleEnding(pCtx, false));
    }
  }

_exit:

  (void)atomic_sub_fetch_32(&mStreamMgmt.watch.processing, 1);

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));

    // this call runs after our own processing slot has been released
    (void)msmWatchHandleEnding(pCtx, true);
  }

  return code;
}
3988

3989
/*
 * Make sure the stream has a trigger reader on vgroup `vgId`.
 *
 * If none of pStatus->trigReaders is placed on `vgId`, a new entry is appended
 * to pStatus->trigOReaders (created on demand with capacity `vgNum`), filled in
 * by msmTDAddSingleTrigReader() and registered in the task map and the vgroup
 * map.
 *
 * pTask  only its streamId is read here.
 * vgNum  initial capacity used when trigOReaders has to be created.
 *
 * Returns TSDB_CODE_SUCCESS, or the allocation / registration error.
 */
int32_t msmCheckDeployTrigReader(SStmGrpCtx* pCtx, SStmStatus* pStatus, SStmTaskStatusMsg* pTask, int32_t vgId, int32_t vgNum) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  bool    readerExists = false;
  int64_t streamId = pTask->streamId;

  int32_t readerNum = taosArrayGetSize(pStatus->trigReaders);
  for (int32_t i = 0; i < readerNum; ++i) {
    SStmTaskStatus* pReader = (SStmTaskStatus*)taosArrayGet(pStatus->trigReaders, i);
    if (pReader->id.nodeId == vgId) {
      readerExists = true;
      break;
    }
  }

  if (!readerExists) {
    if (NULL == pStatus->trigOReaders) {
      pStatus->trigOReaders = taosArrayInit(vgNum, sizeof(SStmTaskStatus));
      TSDB_CHECK_NULL(pStatus->trigOReaders, code, lino, _exit, terrno);
    }

    SStmTaskStatus* pState = taosArrayReserve(pStatus->trigOReaders, 1);
    // the reserve can fail; bail out rather than hand a NULL slot to the calls below
    TSDB_CHECK_NULL(pState, code, lino, _exit, terrno);

    TAOS_CHECK_EXIT(msmTDAddSingleTrigReader(pCtx, pState, vgId, pStatus, NULL, streamId));
    TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, NULL, pState));
    TAOS_CHECK_EXIT(msmSTAddToVgroupMap(pCtx, streamId, NULL, pState, true));
  }

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
4024

4025
/*
 * Serve a STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER request carried by `pTask`.
 *
 * The table names in the request are resolved to vgroup ids. For each distinct
 * vgroup a trigger reader is ensured through msmCheckDeployTrigReader(), and a
 * STREAM_MSG_ORIGTBL_READER_INFO response listing the per-table vgroup ids and
 * the addresses of the stream's trigOReaders is appended to
 * pCtx->pRsp->rsps.rspList.
 *
 * The management request is detached from pTask on entry and always freed
 * before returning. Nothing is deployed (and success is returned) when the
 * table list is empty or the stream already has trigOReaders.
 *
 * Returns TSDB_CODE_MND_STREAM_NOT_RUNNING if the stream is not in streamMap,
 * TSDB_CODE_MND_STREAM_STOPPED if it is stopped, or any error from the steps
 * above; on error the partially built response is destroyed.
 */
int32_t msmProcessDeployOrigReader(SStmGrpCtx* pCtx, SStmTaskStatusMsg* pTask) {
  int32_t             code = TSDB_CODE_SUCCESS;
  int32_t             lino = 0;
  int32_t             vgId = 0;
  int32_t             vgNum = 0;
  int32_t             oReaderNum = 0;
  int32_t             iter = 0;
  int8_t              stopped = 0;
  int64_t             streamId = pTask->streamId;
  SArray*             pTbs = pTask->pMgmtReq->cont.fullTableNames;
  int32_t             tbNum = taosArrayGetSize(pTbs);
  SStreamDbTableName* pName = NULL;
  SSHashObj*          pDbVgroups = NULL;
  SSHashObj*          pVgs = NULL;
  SStreamMgmtReq*     pMgmtReq = NULL;
  SStmStatus*         pStatus = NULL;
  void*               p = NULL;
  SStreamTaskAddr     addr;
  SStreamMgmtRsp      rsp = {0};

  rsp.reqId = pTask->pMgmtReq->reqId;
  rsp.header.msgType = STREAM_MSG_ORIGTBL_READER_INFO;

  // take the request away from the task; pTbs stays valid until it is freed at _exit
  TSWAP(pTask->pMgmtReq, pMgmtReq);
  rsp.task = *(SStreamTask*)pTask;

  pStatus = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL == pStatus) {
    mstsError("stream not deployed, remainStreams:%d", taosHashGetSize(mStreamMgmt.streamMap));
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_NOT_RUNNING);
  }

  stopped = atomic_load_8(&pStatus->stopped);
  if (stopped) {
    msttInfo("stream stopped %d, ignore deploy trigger reader, vgId:%d", stopped, vgId);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_STOPPED);
  }

  if (tbNum <= 0) {
    mstsWarn("empty table list in origReader req, array:%p", pTbs);
    goto _exit;
  }

  oReaderNum = taosArrayGetSize(pStatus->trigOReaders);
  if (oReaderNum > 0) {
    mstsWarn("origReaders already exits, num:%d", oReaderNum);
    goto _exit;
  }

  TAOS_CHECK_EXIT(mstBuildDBVgroupsMap(pCtx->pMnode, &pDbVgroups));

  rsp.cont.vgIds = taosArrayInit(tbNum, sizeof(int32_t));
  TSDB_CHECK_NULL(rsp.cont.vgIds, code, lino, _exit, terrno);

  pVgs = tSimpleHashInit(tbNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  TSDB_CHECK_NULL(pVgs, code, lino, _exit, terrno);

  // one vgId per table goes into the response; pVgs collects the distinct ids
  for (int32_t i = 0; i < tbNum; ++i) {
    pName = (SStreamDbTableName*)taosArrayGet(pTbs, i);
    TAOS_CHECK_EXIT(mstGetTableVgId(pDbVgroups, pName->dbFName, pName->tbName, &vgId));
    TSDB_CHECK_NULL(taosArrayPush(rsp.cont.vgIds, &vgId), code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(tSimpleHashPut(pVgs, &vgId, sizeof(vgId), &vgId, sizeof(vgId)));
  }

  vgNum = tSimpleHashGetSize(pVgs);
  for (p = tSimpleHashIterate(pVgs, p, &iter); NULL != p; p = tSimpleHashIterate(pVgs, p, &iter)) {
    TAOS_CHECK_EXIT(msmCheckDeployTrigReader(pCtx, pStatus, pTask, *(int32_t*)p, vgNum));
  }

  // from here on vgNum counts the readers that were actually recorded
  vgNum = taosArrayGetSize(pStatus->trigOReaders);
  rsp.cont.readerList = taosArrayInit(vgNum, sizeof(SStreamTaskAddr));
  TSDB_CHECK_NULL(rsp.cont.readerList, code, lino, _exit, terrno);

  for (int32_t i = 0; i < vgNum; ++i) {
    SStmTaskStatus* pOTask = taosArrayGet(pStatus->trigOReaders, i);
    addr.taskId = pOTask->id.taskId;
    addr.nodeId = pOTask->id.nodeId;
    addr.epset = mndGetVgroupEpsetById(pCtx->pMnode, pOTask->id.nodeId);
    TSDB_CHECK_NULL(taosArrayPush(rsp.cont.readerList, &addr), code, lino, _exit, terrno);
    mstsDebug("the %dth otrigReader src added to trigger's virtual orig readerList, TASK:%" PRIx64 " nodeId:%d", i, addr.taskId, addr.nodeId);
  }

  if (NULL == pCtx->pRsp->rsps.rspList) {
    pCtx->pRsp->rsps.rspList = taosArrayInit(2, sizeof(SStreamMgmtRsp));
    TSDB_CHECK_NULL(pCtx->pRsp->rsps.rspList, code, lino, _exit, terrno);
  }

  TSDB_CHECK_NULL(taosArrayPush(pCtx->pRsp->rsps.rspList, &rsp), code, lino, _exit, terrno);

_exit:

  tFreeSStreamMgmtReq(pMgmtReq);
  taosMemoryFree(pMgmtReq);

  tSimpleHashCleanup(pVgs);

  if (code) {
    mndStreamDestroySStreamMgmtRsp(&rsp);
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  mstDestroyDbVgroupsHash(pDbVgroups);

  return code;
}
4130

4131
/*
 * Dispatch the management request attached to `pTask` by its type.
 *
 * Only STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER is handled; any other type is
 * logged and reported as TSDB_CODE_MND_STREAM_INTERNAL_ERROR.
 * The caller must ensure pTask->pMgmtReq is non-NULL.
 */
int32_t msmHandleTaskMgmtReq(SStmGrpCtx* pCtx, SStmTaskStatusMsg* pTask) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER == pTask->pMgmtReq->type) {
    TAOS_CHECK_EXIT(msmProcessDeployOrigReader(pCtx, pTask));
  } else {
    msttError("Invalid mgmtReq type:%d", pTask->pMgmtReq->type);
    code = TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
4153

4154
/*
 * Process every management request referenced by the heartbeat.
 *
 * pReq->pStreamReq holds indexes into pReq->pStreamStatus; each referenced
 * task that carries a pMgmtReq is handed to msmHandleTaskMgmtReq(). Entries
 * that resolve to NULL or have no request are logged and skipped. The
 * response list is created up front (sized to the request count) when it does
 * not exist yet.
 *
 * Processing stops at the first failing request and its code is returned.
 */
int32_t msmHandleStreamRequests(SStmGrpCtx* pCtx) {
  int32_t            code = TSDB_CODE_SUCCESS;
  int32_t            lino = 0;
  SStreamHbMsg*      pReq = pCtx->pReq;
  SStmTaskStatusMsg* pTask = NULL;  // referenced implicitly by the mstt* log macros
  int32_t            reqNum = taosArrayGetSize(pReq->pStreamReq);

  if (reqNum > 0 && NULL == pCtx->pRsp->rsps.rspList) {
    pCtx->pRsp->rsps.rspList = taosArrayInit(reqNum, sizeof(SStreamMgmtRsp));
    TSDB_CHECK_NULL(pCtx->pRsp->rsps.rspList, code, lino, _exit, terrno);
  }

  for (int32_t i = 0; i < reqNum; ++i) {
    int32_t idx = *(int32_t*)taosArrayGet(pReq->pStreamReq, i);

    pTask = (SStmTaskStatusMsg*)taosArrayGet(pReq->pStreamStatus, idx);
    if (NULL == pTask) {
      mstError("idx %d is NULL, reqNum:%d", idx, reqNum);
    } else if (NULL == pTask->pMgmtReq) {
      msttError("idx %d without mgmtReq", idx);
    } else {
      TAOS_CHECK_EXIT(msmHandleTaskMgmtReq(pCtx, pTask));
    }
  }

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
4190

4191
/*
 * Process one stream heartbeat while the manager is in the normal state.
 *
 * Steps, in order (the first failing step aborts the rest):
 *   1. refresh the dnode timestamp, and the snode timestamp if one is reported;
 *   2. drain queued stream actions if any are pending and the action-queue
 *      latch can be taken without waiting;
 *   3. serve management requests carried by the heartbeat (under the same latch);
 *   4. add pending vgroup task deployments, or else refresh vgroup timestamps;
 *   5. add pending snode task deployments when the heartbeat carries an snode;
 *   6. add deploy responses for streams queued in pCtx->deployStm;
 *   7. apply reported task status updates;
 *   8. run post actions queued in pCtx->actionStm.
 */
int32_t msmNormalHandleHbMsg(SStmGrpCtx* pCtx) {
  int32_t       code = TSDB_CODE_SUCCESS;
  int32_t       lino = 0;
  SStreamHbMsg* pReq = pCtx->pReq;

  TAOS_CHECK_EXIT(msmCheckUpdateDnodeTs(pCtx));
  if (GOT_SNODE(pReq->snodeId)) {
    TAOS_CHECK_EXIT(msmUpdateSnodeUpTs(pCtx));
  }

  if (atomic_load_64(&mStreamMgmt.actionQ->qRemainNum) > 0) {
    // best effort: skip the queue this round if the latch is busy
    if (0 == taosWTryLockLatch(&mStreamMgmt.actionQLock)) {
      msmHandleStreamActions(pCtx);
      taosWUnLockLatch(&mStreamMgmt.actionQLock);
    }
  }

  if (taosArrayGetSize(pReq->pStreamReq) > 0) {
    if (mstWaitLock(&mStreamMgmt.actionQLock, false)) {
      code = msmHandleStreamRequests(pCtx);
      // release the latch before reacting to the result
      taosWUnLockLatch(&mStreamMgmt.actionQLock);
      TAOS_CHECK_EXIT(code);
    }
  }

  if (atomic_load_32(&mStreamMgmt.toDeployVgTaskNum) > 0) {
    TAOS_CHECK_EXIT(msmGrpAddDeployVgTasks(pCtx));
  } else {
    TAOS_CHECK_EXIT(msmUpdateVgroupsUpTs(pCtx));
  }

  if (atomic_load_32(&mStreamMgmt.toDeploySnodeTaskNum) > 0 && GOT_SNODE(pReq->snodeId)) {
    TAOS_CHECK_EXIT(msmGrpAddDeploySnodeTasks(pCtx));
  }

  if (taosHashGetSize(pCtx->deployStm) > 0) {
    TAOS_CHECK_EXIT(msmRspAddStreamsDeploy(pCtx));
  }

  if (taosArrayGetSize(pReq->pStreamStatus) > 0) {
    TAOS_CHECK_EXIT(msmNormalHandleStatusUpdate(pCtx));
  }

  if (taosHashGetSize(pCtx->actionStm) > 0) {
    TAOS_CHECK_EXIT(msmHandleHbPostActions(pCtx));
  }

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
4242

4243
/*
 * Fill `pMsg` with the RPC reply for a stream heartbeat.
 *
 * code      result of handling the heartbeat; when it is not success nothing
 *           is encoded and the code is passed through.
 * pRpcInfo  copied into pMsg->info.
 * pRsp      response to serialize after an SStreamMsgGrpHeader carrying
 *           pRsp->streamGId.
 *
 * pMsg->code and pMsg->info are always set. pMsg->pCont / pMsg->contLen are
 * set only on success; on failure they are left untouched and any buffer
 * allocated here has been released.
 */
void msmEncodeStreamHbRsp(int32_t code, SRpcHandleInfo *pRpcInfo, SMStreamHbRspMsg* pRsp, SRpcMsg* pMsg) {
  int32_t  lino = 0;
  int32_t  tlen = 0;
  void    *buf = NULL;
  void    *payload = NULL;
  SEncoder encoder;

  if (TSDB_CODE_SUCCESS != code) {
    goto _exit;
  }

  // first pass: compute the encoded size only
  tEncodeSize(tEncodeStreamHbRsp, pRsp, tlen, code);
  if (code < 0) {
    mstError("encode stream hb msg rsp failed, code:%s", tstrerror(code));
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  buf = rpcMallocCont(tlen + sizeof(SStreamMsgGrpHeader));
  if (NULL == buf) {
    mstError("encode stream hb msg rsp failed, code:%s", tstrerror(terrno));
    TAOS_CHECK_EXIT(terrno);
  }

  ((SStreamMsgGrpHeader *)buf)->streamGid = pRsp->streamGId;
  payload = POINTER_SHIFT(buf, sizeof(SStreamMsgGrpHeader));

  // second pass: encode into the space after the group header
  tEncoderInit(&encoder, payload, tlen);
  code = tEncodeStreamHbRsp(&encoder, pRsp);
  tEncoderClear(&encoder);
  if (code < 0) {
    rpcFreeCont(buf);
    buf = NULL;
    mstError("encode stream hb msg rsp failed, code:%s", tstrerror(code));
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

_exit:

  pMsg->code = code;
  pMsg->info = *pRpcInfo;
  if (TSDB_CODE_SUCCESS == code) {
    pMsg->contLen = tlen + sizeof(SStreamMsgGrpHeader);
    pMsg->pCont = buf;
  }
}
32,764✔
4287

4288

4289
/*
 * Entry point for a stream heartbeat received by the mnode.
 *
 * The call takes the runtime latch through mstWaitLock() and releases it with
 * taosRUnLockLatch(). If stream management is not active the heartbeat is
 * ignored; otherwise the per-group context selected by pHb->streamGId is
 * filled in and the heartbeat is handled according to the current runtime
 * state (watch or normal).
 *
 * In every case a reply is encoded into pRspMsg, the group context and the
 * to-deploy maps are cleaned, and the local response is freed.
 *
 * Returns the handling result, which is also passed to the reply encoder.
 */
int32_t msmHandleStreamHbMsg(SMnode* pMnode, int64_t currTs, SStreamHbMsg* pHb, SRpcMsg *pReq, SRpcMsg* pRspMsg) {
  int32_t          code = TSDB_CODE_SUCCESS;
  SMStreamHbRspMsg rsp = {0};
  rsp.streamGId = pHb->streamGId;

  (void)mstWaitLock(&mStreamMgmt.runtimeLock, true);

  if (0 == atomic_load_8(&mStreamMgmt.active)) {
    mstWarn("mnode stream become NOT active, ignore stream hb from dnode %d streamGid %d", pHb->dnodeId, pHb->streamGId);
  } else {
    int32_t     tidx = streamGetThreadIdx(mStreamMgmt.threadNum, pHb->streamGId);
    SStmGrpCtx* pCtx = &mStreamMgmt.tCtx[tidx].grpCtx[pHb->streamGId];

    pCtx->tidx = tidx;
    pCtx->pMnode = pMnode;
    pCtx->currTs = currTs;
    pCtx->pReq = pHb;
    pCtx->pRsp = &rsp;  // stack object, only valid for the duration of this call
    pCtx->deployStm = mStreamMgmt.tCtx[tidx].deployStm[pHb->streamGId];
    pCtx->actionStm = mStreamMgmt.tCtx[tidx].actionStm[pHb->streamGId];

    int8_t state = atomic_load_8(&mStreamMgmt.state);
    if (MND_STM_STATE_WATCH == state) {
      code = msmWatchHandleHbMsg(pCtx);
    } else if (MND_STM_STATE_NORMAL == state) {
      code = msmNormalHandleHbMsg(pCtx);
    } else {
      mstError("Invalid stream state: %d", mStreamMgmt.state);
      code = TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
    }
  }

  msmEncodeStreamHbRsp(code, &pReq->info, &rsp, pRspMsg);

  msmCleanStreamGrpCtx(pHb);
  msmClearStreamToDeployMaps(pHb);

  taosRUnLockLatch(&mStreamMgmt.runtimeLock);

  tFreeSMStreamHbRspMsg(&rsp);

  return code;
}
4338

4339
/*
 * React to this mnode becoming leader.
 *
 * Does nothing when tsDisableStream is set. Otherwise the mnode is registered
 * through streamAddVnodeLeader(MNODE_HANDLE), the runtime info is destroyed
 * and rebuilt under the runtime write latch, and stream management is marked
 * active only if the rebuild succeeded.
 */
void msmHandleBecomeLeader(SMnode *pMnode) {
  int32_t code = 0;

  if (tsDisableStream) {
    return;
  }

  mstInfo("start to process mnode become leader");

  streamAddVnodeLeader(MNODE_HANDLE);

  taosWLockLatch(&mStreamMgmt.runtimeLock);
  msmDestroyRuntimeInfo(pMnode);
  code = msmInitRuntimeInfo(pMnode);
  taosWUnLockLatch(&mStreamMgmt.runtimeLock);

  if (TSDB_CODE_SUCCESS == code) {
    atomic_store_8(&mStreamMgmt.active, 1);
  }

  mstInfo("mnode stream mgmt active:%d", atomic_load_8(&mStreamMgmt.active));
}
4360

4361
/*
 * React to this mnode losing leadership.
 *
 * Does nothing when tsDisableStream is set. Otherwise the mnode is
 * unregistered through streamRemoveVnodeLeader(MNODE_HANDLE); if stream
 * management was active it is flipped to inactive, and the runtime info is
 * destroyed under the runtime write latch while the inactive counter is bumped.
 */
void msmHandleBecomeNotLeader(SMnode *pMnode) {
  if (tsDisableStream) {
    return;
  }

  mstInfo("start to process mnode become not leader");

  streamRemoveVnodeLeader(MNODE_HANDLE);

  // only the caller that actually flips active 1 -> 0 tears the runtime down
  if (0 == atomic_val_compare_exchange_8(&mStreamMgmt.active, 1, 0)) {
    return;
  }

  taosWLockLatch(&mStreamMgmt.runtimeLock);
  msmDestroyRuntimeInfo(pMnode);
  mStreamMgmt.stat.inactiveTimes++;
  taosWUnLockLatch(&mStreamMgmt.runtimeLock);
}
4377

4378

4379
/*
 * Queue a redeploy for a stream whose `stopped` flag is exactly 1.
 *
 * The flag is reset to 0 with a compare-and-swap; the deploy action is posted
 * only when that swap succeeds. Any other flag value is just logged.
 */
static void msmRedeployStream(int64_t streamId, SStmStatus* pStatus) {
  if (1 != atomic_val_compare_exchange_8(&pStatus->stopped, 1, 0)) {
    mstsWarn("stream stopped %d already changed", atomic_load_8(&pStatus->stopped));
    return;
  }

  mstsInfo("try to reset and redeploy stream, deployTimes:%" PRId64, pStatus->deployTimes);
  mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStatus->streamName, NULL, false, STREAM_ACT_DEPLOY);
}
×
4387

4388
/*
 * sdbTraverse() callback: collect streams whose main snode is the given snode.
 *
 * pObj  the SStreamObj being visited.
 * p1    SSnodeObj* to match against pStream->mainSnodeId.
 * p2    SArray** result list of SStreamObj pointers, created on first match.
 * p3    int32_t* that receives the error code when an allocation fails.
 *
 * Returns true to keep traversing, false after a failure.
 */
static bool msmCheckStreamAssign(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  SStreamObj* pStream = pObj;
  SSnodeObj*  pSnode = p1;
  SArray**    ppRes = p2;

  if (pStream->mainSnodeId != pSnode->id) {
    return true;
  }

  if (NULL == *ppRes) {
    // size the list for the worst case: every stream assigned to this snode
    int32_t streamNum = sdbGetSize(pMnode->pSdb, SDB_STREAM);
    *ppRes = taosArrayInit(streamNum, POINTER_BYTES);
    TSDB_CHECK_NULL(*ppRes, code, lino, _exit, terrno);
  }

  TSDB_CHECK_NULL(taosArrayPush(*ppRes, &pStream), code, lino, _exit, terrno);

  return true;

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  *(int32_t*)p3 = code;

  return false;
}
4417

4418

4419
/*
 * Collect the streams whose main snode is `pSnode` into *ppRes.
 *
 * Fails with TSDB_CODE_MND_STREAM_SNODE_IN_USE when at least one stream is
 * assigned and the snode has no replica (replicaId == 0). Errors raised while
 * collecting are returned as-is. *ppRes may already hold entries on failure.
 */
int32_t msmCheckSnodeReassign(SMnode *pMnode, SSnodeObj* pSnode, SArray** ppRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t streamNum = 0;

  // the callback reports its failure through the last argument
  sdbTraverse(pMnode->pSdb, SDB_STREAM, msmCheckStreamAssign, pSnode, ppRes, &code);
  TAOS_CHECK_EXIT(code);

  streamNum = taosArrayGetSize(*ppRes);
  if (streamNum > 0 && 0 == pSnode->replicaId) {
    mstError("snode %d has no replica while %d streams assigned", pSnode->id, streamNum);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_SNODE_IN_USE);
  }

  //STREAMTODO CHECK REPLICA UPDATED OR NOT

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
4442

4443
/*
 * sdbTraverse() callback: reconcile one stream object with the runtime stream map.
 *
 * Stream not in streamMap:
 *   - user dropped/stopped             -> ignored;
 *   - static isolation time not passed -> ignored;
 *   - otherwise                        -> a deploy action is posted.
 * Stream in streamMap:
 *   - isolation time not passed        -> ignored;
 *   - user dropped/stopped (object or runtime flag) -> removed from the maps.
 *
 * Always returns true so the traversal continues.
 */
static bool msmCheckLoopStreamSdb(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SStreamObj*         pStream = pObj;
  int64_t             streamId = pStream->pCreate->streamId;
  SStmStatus*         pStatus = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  SStmCheckStatusCtx* pCtx = (SStmCheckStatusCtx*)p1;
  int8_t              userDropped = atomic_load_8(&pStream->userDropped);
  int8_t              userStopped = atomic_load_8(&pStream->userStopped);

  if (NULL == pStatus) {
    if (userDropped || userStopped) {
      mstsDebug("stream userDropped %d userStopped %d and not in streamMap, ignore it", userDropped, userStopped);
      return true;
    }

    if (!MST_STM_STATIC_PASS_ISOLATION(pStream)) {
      mstsDebug("stream not pass static isolation time, updateTime:%" PRId64 ", currentTs %" PRId64 ", ignore check it", 
          pStream->updateTime, mStreamMgmt.hCtx.currentTs);
      return true;
    }

    // known to sdb, old enough, and not running: deploy it
    mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStream->pCreate->name, NULL, false, STREAM_ACT_DEPLOY);
    return true;
  }

  if (!MST_STM_PASS_ISOLATION(pStream, pStatus)) {
    mstsDebug("stream not pass isolation time, updateTime:%" PRId64 ", lastActionTs:%" PRId64 ", currentTs %" PRId64 ", ignore check it", 
        pStream->updateTime, pStatus->lastActionTs, mStreamMgmt.hCtx.currentTs);
    return true;
  }

  if (userDropped || userStopped || MST_IS_USER_STOPPED(atomic_load_8(&pStatus->stopped))) {
    (void)msmRemoveStreamFromMaps(pMnode, streamId);
  }

  return true;
}
4479

4480
/*
 * Walk the runtime stream map and act on each stream's stopped state.
 *
 *   - stopped by user            -> removed from the maps;
 *   - no longer present in sdb   -> removed from the maps;
 *   - stopped by error           -> redeploy is posted once fatalRetryTs has
 *                                   been reached, otherwise the STM_TERR
 *                                   event timestamp is refreshed;
 *   - stopped by grant, and grantCheckExpire() now succeeds -> redeploy posted.
 */
void msmCheckLoopStreamMap(SMnode *pMnode) {
  void* pIter = NULL;

  while (NULL != (pIter = taosHashIterate(mStreamMgmt.streamMap, pIter))) {
    SStmStatus* pStatus = (SStmStatus*)pIter;
    int64_t     streamId = *(int64_t*)taosHashGetKey(pIter, NULL);
    int8_t      stopped = atomic_load_8(&pStatus->stopped);

    if (MST_IS_USER_STOPPED(stopped)) {
      mstsDebug("stream already stopped by user, deployTimes:%" PRId64, pStatus->deployTimes);
      (void)msmRemoveStreamFromMaps(pMnode, streamId);
      continue;
    }

    if (!sdbCheckExists(pMnode->pSdb, SDB_STREAM, pStatus->streamName)) {
      mstsDebug("stream already not exists, deployTimes:%" PRId64, pStatus->deployTimes);
      (void)msmRemoveStreamFromMaps(pMnode, streamId);
      continue;
    }

    if (MST_IS_ERROR_STOPPED(stopped)) {
      if (mStreamMgmt.hCtx.currentTs < pStatus->fatalRetryTs) {
        mstsDebug("stream already stopped by error %s, retried times:%" PRId64 ", next time not reached, currTs:%" PRId64 ", nextRetryTs:%" PRId64,
            tstrerror(pStatus->fatalError), pStatus->fatalRetryTimes, mStreamMgmt.hCtx.currentTs, pStatus->fatalRetryTs);

        MND_STREAM_SET_LAST_TS(STM_EVENT_STM_TERR, mStreamMgmt.hCtx.currentTs);
      } else {
        mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStatus->streamName, NULL, false, STREAM_ACT_DEPLOY);
      }
      continue;
    }

    if (MST_IS_GRANT_STOPPED(stopped) && TSDB_CODE_SUCCESS == grantCheckExpire(TSDB_GRANT_STREAMS)) {
      mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStatus->streamName, NULL, false, STREAM_ACT_DEPLOY);
    }
  }
}
104✔
4527

4528
/*
 * Periodic stream health check.
 *
 * Runs the sdb-driven pass (msmCheckLoopStreamSdb over every SDB_STREAM row)
 * and the map-driven pass (msmCheckLoopStreamMap) whenever their respective
 * MST_READY_FOR_*_LOOP() conditions hold, and records the time of each pass.
 */
void msmCheckStreamsStatus(SMnode *pMnode) {
  SStmCheckStatusCtx checkCtx = {0};

  mstDebug("start to check streams status, currTs:%" PRId64, mStreamMgmt.hCtx.currentTs);

  if (MST_READY_FOR_SDB_LOOP()) {
    mstDebug("ready to check sdb loop, lastLoopSdbTs:%" PRId64, mStreamMgmt.lastTs[STM_EVENT_LOOP_SDB].ts);
    sdbTraverse(pMnode->pSdb, SDB_STREAM, msmCheckLoopStreamSdb, &checkCtx, NULL, NULL);
    MND_STREAM_SET_LAST_TS(STM_EVENT_LOOP_SDB, mStreamMgmt.hCtx.currentTs);
  }

  if (MST_READY_FOR_MAP_LOOP()) {
    mstDebug("ready to check map loop, lastLoopMapTs:%" PRId64, mStreamMgmt.lastTs[STM_EVENT_LOOP_MAP].ts);
    msmCheckLoopStreamMap(pMnode);
    MND_STREAM_SET_LAST_TS(STM_EVENT_LOOP_MAP, mStreamMgmt.hCtx.currentTs);
  }
}
499✔
4545

4546
/*
 * Check `taskNum` tasks of one stream for missing status updates.
 *
 * pList points at an array of SStmTaskStatus pointers. Tasks whose stream is
 * stopped, or whose lastUpTs has not passed the isolation window, are skipped.
 * For a stale task:
 *   - runner or trigger task: the whole stream is stopped with
 *     TSDB_CODE_MND_STREAM_TASK_LOST and the scan ends;
 *   - any other task type: its seriousId is incremented and a per-task
 *     deploy action is posted.
 */
void msmCheckTaskListStatus(int64_t streamId, SStmTaskStatus** pList, int32_t taskNum) {
  for (int32_t i = 0; i < taskNum; ++i) {
    SStmTaskStatus* pTask = pList[i];

    if (atomic_load_8(&((SStmStatus*)pTask->pStream)->stopped)) {
      continue;
    }

    if (!MST_PASS_ISOLATION(pTask->lastUpTs, 1)) {
      continue;
    }

    int64_t noUpTs = mStreamMgmt.hCtx.currentTs - pTask->lastUpTs;
    bool    restartStream = (STREAM_RUNNER_TASK == pTask->type) || (STREAM_TRIGGER_TASK == pTask->type);

    if (restartStream) {
      mstsWarn("%s TASK:%" PRIx64 " status not updated for %" PRId64 "ms, will try to redeploy it", 
          gStreamTaskTypeStr[pTask->type], pTask->id.taskId, noUpTs);

      msmStopStreamByError(streamId, NULL, TSDB_CODE_MND_STREAM_TASK_LOST, mStreamMgmt.hCtx.currentTs);
      // the stream is being stopped as a whole, no need to look at further tasks
      return;
    }

    mstsInfo("%s TASK:%" PRIx64 " status not updated for %" PRId64 "ms, will try to redeploy it", 
        gStreamTaskTypeStr[pTask->type], pTask->id.taskId, noUpTs);

    int64_t newSid = atomic_add_fetch_64(&pTask->id.seriousId, 1);
    mstsDebug("task %" PRIx64 " SID updated to %" PRIx64, pTask->id.taskId, newSid);

    // the action carries the id with the already-incremented seriousId
    SStmTaskAction task = {0};
    task.type = pTask->type;
    task.flag = pTask->flags;
    task.id = pTask->id;
    task.streamId = streamId;

    mstPostTaskAction(mStreamMgmt.actionQ, &task, STREAM_ACT_DEPLOY);
  }
}
7,017✔
4582

4583
/*
 * For every stream recorded under one vgroup, check its trigger readers and
 * calc readers for stale status through msmCheckTaskListStatus().
 *
 * pStreams maps streamId -> SStmVgStreamStatus.
 */
void msmCheckVgroupStreamStatus(SHashObj* pStreams) {
  void* pIter = NULL;

  while (NULL != (pIter = taosHashIterate(pStreams, pIter))) {
    int64_t             streamId = *(int64_t*)taosHashGetKey(pIter, NULL);
    SStmVgStreamStatus* pVg = (SStmVgStreamStatus*)pIter;

    int32_t trigNum = taosArrayGetSize(pVg->trigReaders);
    if (trigNum > 0) {
      msmCheckTaskListStatus(streamId, taosArrayGet(pVg->trigReaders, 0), trigNum);
    }

    int32_t calcNum = taosArrayGetSize(pVg->calcReaders);
    if (calcNum > 0) {
      msmCheckTaskListStatus(streamId, taosArrayGet(pVg->calcReaders, 0), calcNum);
    }
  }
}
623✔
4608

4609
/*
 * Handle a vgroup that stopped reporting.
 *
 * While the vgroup is still inside the MST_PASS_ISOLATION(lastUpTs, 5) window
 * nothing is done. After that, every stream with tasks on the vgroup is
 * stopped with TSDB_CODE_MND_STREAM_VGROUP_LOST and the vgroup's stream-task
 * table is cleared.
 */
void msmHandleVgroupLost(SMnode *pMnode, int32_t vgId, SStmVgroupStatus* pVg) {
  void* pIter = NULL;

  if (!MST_PASS_ISOLATION(pVg->lastUpTs, 5)) {
    mstDebug("vgroup %d lost and still in watch time, lastUpTs:%" PRId64 ", streamNum:%d", vgId, pVg->lastUpTs, (int32_t)taosHashGetSize(pVg->streamTasks));
    return;
  }

  while (NULL != (pIter = taosHashIterate(pVg->streamTasks, pIter))) {
    int64_t streamId = *(int64_t*)taosHashGetKey(pIter, NULL);

    msmStopStreamByError(streamId, NULL, TSDB_CODE_MND_STREAM_VGROUP_LOST, mStreamMgmt.hCtx.currentTs);
  }

  taosHashClear(pVg->streamTasks);
}
4633

4634

4635
/*
 * Check the vgroups that fall into the current health-check slot.
 *
 * Only vgroups with vgId % MND_STREAM_ISOLATION_PERIOD_NUM equal to
 * mStreamMgmt.hCtx.slotIdx are examined in this round. A vgroup whose lastUpTs
 * passed the isolation window is treated as lost; otherwise the tasks of its
 * streams are checked.
 */
void msmCheckVgroupStatus(SMnode *pMnode) {
  void* pIter = NULL;

  while (NULL != (pIter = taosHashIterate(mStreamMgmt.vgroupMap, pIter))) {
    int32_t vgId = *(int32_t*)taosHashGetKey(pIter, NULL);
    if ((vgId % MND_STREAM_ISOLATION_PERIOD_NUM) != mStreamMgmt.hCtx.slotIdx) {
      continue;
    }

    SStmVgroupStatus* pVg = (SStmVgroupStatus*)pIter;

    if (!MST_PASS_ISOLATION(pVg->lastUpTs, 1)) {
      mstDebug("vgroup %d online, try to check tasks status, currTs:%" PRId64 ", lastUpTs:%" PRId64, vgId, mStreamMgmt.hCtx.currentTs, pVg->lastUpTs);

      msmCheckVgroupStreamStatus(pVg->streamTasks);
      continue;
    }

    mstWarn("vgroup %d lost, lastUpTs:%" PRId64 ", streamNum:%d", vgId, pVg->lastUpTs, (int32_t)taosHashGetSize(pVg->streamTasks));

    msmHandleVgroupLost(pMnode, vgId, pVg);
  }
}
499✔
4663

4664
/*
 * Prepare the runner deploys of one stream on a lost snode for redeployment.
 *
 * For every non-NULL pStream->runners[i] the seriousId of each task is
 * incremented and `i` is appended to deployId; *deployNum receives the number
 * of entries written. If any task belongs to a stopped stream, *deployNum is
 * reset to 0 and the function returns at once (seriousIds incremented before
 * that point are not rolled back).
 *
 * deployId must have room for MND_STREAM_RUNNER_DEPLOY_NUM entries.
 */
void msmHandleRunnerRedeploy(int64_t streamId, SStmSnodeStreamStatus* pStream, int32_t* deployNum, int32_t* deployId) {
  *deployNum = 0;

  for (int32_t i = 0; i < MND_STREAM_RUNNER_DEPLOY_NUM; ++i) {
    if (NULL == pStream->runners[i]) {
      continue;
    }

    int32_t taskNum = taosArrayGetSize(pStream->runners[i]);
    for (int32_t t = 0; t < taskNum; ++t) {
      SStmTaskStatus* pTask = taosArrayGetP(pStream->runners[i], t);
      int8_t          stopped = atomic_load_8(&((SStmStatus*)pTask->pStream)->stopped);

      if (stopped) {
        mstsDebug("stream already stopped %d, ignore it", stopped);
        *deployNum = 0;
        return;
      }

      int64_t newSid = atomic_add_fetch_64(&pTask->id.seriousId, 1);
      mstsDebug("task %" PRIx64 " SID updated to %" PRIx64, pTask->id.taskId, newSid);
    }

    deployId[*deployNum] = i;
    (*deployNum)++;
  }
}
4688

4689
// React to an snode that is considered lost.
//
// Marks the snode's runnerThreadNum as -1, refreshes the snode map through
// msmSTAddSnodesToMap (its result is deliberately ignored), then walks every
// stream recorded on the snode:
//   - stream with a trigger task here: unless already stopped, the stream is
//     stopped with TSDB_CODE_MND_STREAM_SNODE_LOST;
//   - stream with runner tasks only: the runner slots collected by
//     msmHandleRunnerRedeploy are posted to the action queue as a deploy action.
// Finally the snode's streamTasks hash is cleared.
void msmHandleSnodeLost(SMnode *pMnode, SStmSnodeStatus* pSnode) {
  SStmTaskAction task = {0};
  void*          pEntry = NULL;

  pSnode->runnerThreadNum = -1;

  (void)msmSTAddSnodesToMap(pMnode);

  while (NULL != (pEntry = taosHashIterate(pSnode->streamTasks, pEntry))) {
    // NOTE: the msts* log macros rely on a variable named streamId being in scope.
    int64_t                streamId = *(int64_t*)taosHashGetKey(pEntry, NULL);
    SStmSnodeStreamStatus* pStmStatus = (SStmSnodeStreamStatus*)pEntry;

    task.streamId = streamId;

    if (NULL == pStmStatus->trigger) {
      // Only runner tasks live on this snode: redeploy them elsewhere.
      msmHandleRunnerRedeploy(streamId, pStmStatus, &task.deployNum, task.deployId);
      if (task.deployNum <= 0) {
        continue;
      }

      task.multiRunner = true;
      task.type = STREAM_RUNNER_TASK;

      mstPostTaskAction(mStreamMgmt.actionQ, &task, STREAM_ACT_DEPLOY);

      mstsInfo("runner tasks %d redeploys added to actionQ", task.deployNum);
      continue;
    }

    // The trigger task lived on the lost snode.
    int8_t stopped = atomic_load_8(&((SStmStatus*)pStmStatus->trigger->pStream)->stopped);
    if (0 != stopped) {
      mstsDebug("stream already stopped %d, ignore it", stopped);
      continue;
    }

    mstsInfo("snode lost with trigger task %" PRIx64 ", will try to restart current stream", pStmStatus->trigger->id.taskId);

    msmStopStreamByError(streamId, NULL, TSDB_CODE_MND_STREAM_SNODE_LOST, mStreamMgmt.hCtx.currentTs);
  }

  taosHashClear(pSnode->streamTasks);
}
36✔
4738

4739

4740
// Check the task status of every stream recorded on one snode.
//
// pStreams is iterated as a hash keyed by stream id (int64_t) whose values are
// SStmSnodeStreamStatus entries. For each entry the trigger task (if any) and
// each non-empty runner task list are handed to msmCheckTaskListStatus.
void msmCheckSnodeStreamStatus(SHashObj* pStreams) {
  for (void* pEntry = taosHashIterate(pStreams, NULL); NULL != pEntry; pEntry = taosHashIterate(pStreams, pEntry)) {
    int64_t                streamId = *(int64_t*)taosHashGetKey(pEntry, NULL);
    SStmSnodeStreamStatus* pStmStatus = (SStmSnodeStreamStatus*)pEntry;

    if (pStmStatus->trigger != NULL) {
      msmCheckTaskListStatus(streamId, &pStmStatus->trigger, 1);
    }

    int32_t slot = 0;
    while (slot < MND_STREAM_RUNNER_DEPLOY_NUM) {
      int32_t runnerNum = taosArrayGetSize(pStmStatus->runners[slot]);
      if (runnerNum > 0) {
        // Pass the address of the first element together with the element count.
        msmCheckTaskListStatus(streamId, taosArrayGet(pStmStatus->runners[slot], 0), runnerNum);
      }
      ++slot;
    }
  }
}
243✔
4766

4767

4768
// Health-check the snodes that belong to the current check slot.
//
// Only snodes whose id modulo MND_STREAM_ISOLATION_PERIOD_NUM equals
// mStreamMgmt.hCtx.slotIdx are examined in this round. An snode without a
// streamTasks hash is skipped. An snode for which
// MST_PASS_ISOLATION(lastUpTs, 1) holds is treated as lost and handed to
// msmHandleSnodeLost; otherwise its stream tasks are status-checked.
void msmCheckSnodeStatus(SMnode *pMnode) {
  for (void* pEntry = taosHashIterate(mStreamMgmt.snodeMap, NULL); NULL != pEntry;
       pEntry = taosHashIterate(mStreamMgmt.snodeMap, pEntry)) {
    int32_t snodeId = *(int32_t*)taosHashGetKey(pEntry, NULL);
    if (mStreamMgmt.hCtx.slotIdx != (snodeId % MND_STREAM_ISOLATION_PERIOD_NUM)) {
      continue;
    }

    mstDebug("start to check snode %d status, currTs:%" PRId64, snodeId, mStreamMgmt.hCtx.currentTs);

    SStmSnodeStatus* pStatus = (SStmSnodeStatus*)pEntry;
    if (pStatus->streamTasks == NULL) {
      mstDebug("ignore snode %d health check since empty tasks", snodeId);
      continue;
    }

    if (!MST_PASS_ISOLATION(pStatus->lastUpTs, 1)) {
      mstDebug("snode %d online, try to check tasks status, currTs:%" PRId64 ", lastUpTs:%" PRId64, snodeId, mStreamMgmt.hCtx.currentTs, pStatus->lastUpTs);

      msmCheckSnodeStreamStatus(pStatus->streamTasks);
      continue;
    }

    mstInfo("snode %d lost, lastUpTs:%" PRId64 ", runnerThreadNum:%d, streamNum:%d", 
        snodeId, pStatus->lastUpTs, pStatus->runnerThreadNum, (int32_t)taosHashGetSize(pStatus->streamTasks));

    msmHandleSnodeLost(pMnode, pStatus);
  }
}
499✔
4803

4804

4805
// Run the per-node task status checks for the current health-check round:
// vgroups first, then snodes.
void msmCheckTasksStatus(SMnode *pMnode) {
  mstDebug("start to check tasks status, currTs:%" PRId64, mStreamMgmt.hCtx.currentTs);

  // Tasks hosted on vgroups.
  msmCheckVgroupStatus(pMnode);

  // Tasks hosted on snodes.
  msmCheckSnodeStatus(pMnode);
}
499✔
4811

4812
// Reconcile the runtime snode map with the snodes stored in the sdb.
//
// Does nothing unless MST_READY_FOR_SNODE_LOOP() holds. Otherwise, every
// snode in mStreamMgmt.snodeMap that no longer exists in the sdb is either
// removed from the map (when it has no streamTasks hash) or handed to
// msmHandleSnodeLost so the streams still recorded on it get handled.
void msmCheckSnodesState(SMnode *pMnode) {
  if (!MST_READY_FOR_SNODE_LOOP()) {
    return;
  }

  mstDebug("ready to check snode loop, lastTs:%" PRId64, mStreamMgmt.lastTs[STM_EVENT_LOOP_SNODE].ts);

  for (void* pEntry = taosHashIterate(mStreamMgmt.snodeMap, NULL); NULL != pEntry;
       pEntry = taosHashIterate(mStreamMgmt.snodeMap, pEntry)) {
    int32_t snodeId = *(int32_t*)taosHashGetKey(pEntry, NULL);
    if (sdbCheckExists(pMnode->pSdb, SDB_SNODE, &snodeId)) {
      // Still present in the sdb, nothing to reconcile.
      continue;
    }

    SStmSnodeStatus* pStatus = (SStmSnodeStatus*)pEntry;
    if (pStatus->streamTasks != NULL) {
      mstWarn("snode %d lost while streams remain, will redeploy all and rm it, lastUpTs:%" PRId64 ", runnerThreadNum:%d, streamNum:%d", 
          snodeId, pStatus->lastUpTs, pStatus->runnerThreadNum, (int32_t)taosHashGetSize(pStatus->streamTasks));

      msmHandleSnodeLost(pMnode, pStatus);
      continue;
    }

    mstDebug("snode %d already cleanup, try to rm it", snodeId);
    TAOS_UNUSED(taosHashRemove(mStreamMgmt.snodeMap, &snodeId, sizeof(snodeId)));
  }

  // NOTE(review): the gate and the log above refer to STM_EVENT_LOOP_SNODE, but
  // the timestamp recorded here is STM_EVENT_LOOP_MAP - confirm this is the
  // intended event before changing it.
  MND_STREAM_SET_LAST_TS(STM_EVENT_LOOP_MAP, mStreamMgmt.hCtx.currentTs);
}
4847

4848
// Decide whether a health-check round should run.
//
// Returns true only when stream management is active, its state is
// MND_STM_STATE_NORMAL, and the sdb holds at least one stream.
bool msmCheckNeedHealthCheck(SMnode *pMnode) {
  int8_t active = atomic_load_8(&mStreamMgmt.active);
  int8_t state = atomic_load_8(&mStreamMgmt.state);

  bool runnable = (0 != active) && (MND_STM_STATE_NORMAL == state);
  if (!runnable) {
    mstTrace("ignore health check since active:%d state:%d", active, state);
    return false;
  }

  if (sdbGetSize(pMnode->pSdb, SDB_STREAM) > 0) {
    return true;
  }

  mstTrace("ignore health check since no stream now");
  return false;
}
4862

4863
// Run one health-check round over streams, tasks and snodes.
//
// The precondition (msmCheckNeedHealthCheck) is evaluated once before waiting
// for mStreamMgmt.runtimeLock and once more after the lock has been obtained,
// because the state may have changed while waiting. Under the lock the check
// slot is advanced, the round timestamp is refreshed and the three check
// passes run in order: streams, tasks, snode state.
void msmHealthCheck(SMnode *pMnode) {
  if (!msmCheckNeedHealthCheck(pMnode)) {
    return;
  }

  mstDebug("start wait health check, currentTs:%" PRId64,  taosGetTimestampMs());

  // Released with taosWUnLockLatch below, i.e. presumably taken in write mode.
  (void)mstWaitLock(&mStreamMgmt.runtimeLock, false);
  if (!msmCheckNeedHealthCheck(pMnode)) {
    taosWUnLockLatch(&mStreamMgmt.runtimeLock);
    return;
  }

  // Rotate to the next slot; the node checks only look at ids matching slotIdx.
  mStreamMgmt.hCtx.slotIdx = (mStreamMgmt.hCtx.slotIdx + 1) % MND_STREAM_ISOLATION_PERIOD_NUM;
  mStreamMgmt.hCtx.currentTs = taosGetTimestampMs();

  mstDebug("start health check, slotIdx:%d, checkStartTs:%" PRId64, mStreamMgmt.hCtx.slotIdx, mStreamMgmt.hCtx.currentTs);

  msmCheckStreamsStatus(pMnode);
  msmCheckTasksStatus(pMnode);
  msmCheckSnodesState(pMnode);

  // Snapshot the round context while the lock is still held, so the closing
  // log reports this round's values even if hCtx is updated right after unlock.
  int32_t slotIdx = mStreamMgmt.hCtx.slotIdx;
  int64_t checkStartTs = mStreamMgmt.hCtx.currentTs;

  taosWUnLockLatch(&mStreamMgmt.runtimeLock);

  mstDebug("end health check, slotIdx:%d, checkStartTs:%" PRId64, slotIdx, checkStartTs);
}
4889

4890
// sdb traverse callback applied to each stream object.
//
// Streams flagged as user-dropped or user-stopped are left untouched. For any
// other stream, updateTime is set to the int64_t pointed to by p1 and the
// int32_t counter pointed to by p2 is incremented. pMnode and p3 are unused.
// Always returns true.
static bool msmUpdateProfileStreams(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SStreamObj *pStream = pObj;

  bool inactive = atomic_load_8(&pStream->userDropped) || atomic_load_8(&pStream->userStopped);
  if (!inactive) {
    int64_t *pUpdateTs = (int64_t*)p1;
    int32_t *pActiveNum = (int32_t*)p2;

    pStream->updateTime = *pUpdateTs;
    *pActiveNum += 1;
  }

  return true;
}
4902

4903
// Look up the address of a stream's trigger task.
//
// pMnode   : mnode used to resolve the dnode epset of the trigger task
// streamId : stream to look up in mStreamMgmt.streamMap
// pAddr    : [out] taskId, nodeId and epset; written only on success
//
// Returns 0 when the stream exists, is not stopped and its trigger task is in
// STREAM_STATUS_RUNNING; otherwise returns TSDB_CODE_MND_STREAM_NOT_RUNNING.
// The lookup is done between mstWaitLock(..., true) and taosRUnLockLatch on
// mStreamMgmt.runtimeLock.
int32_t msmGetTriggerTaskAddr(SMnode *pMnode, int64_t streamId, SStreamTaskAddr* pAddr) {
  int32_t code = 0;
  int8_t  stopped = 0;

  (void)mstWaitLock(&mStreamMgmt.runtimeLock, true);

  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL == pStatus) {
    // Cast keeps the argument in line with %d, as the other taosHashGetSize log calls do.
    mstsError("stream not exists in streamMap, streamRemains:%d", (int32_t)taosHashGetSize(mStreamMgmt.streamMap));
    code = TSDB_CODE_MND_STREAM_NOT_RUNNING;
    goto _exit;
  }

  stopped = atomic_load_8(&pStatus->stopped);
  if (stopped) {
    mstsError("stream already stopped, stopped:%d", stopped);
    code = TSDB_CODE_MND_STREAM_NOT_RUNNING;
    goto _exit;
  }

  if (pStatus->triggerTask && STREAM_STATUS_RUNNING == pStatus->triggerTask->status) {
    pAddr->taskId = pStatus->triggerTask->id.taskId;
    pAddr->nodeId = pStatus->triggerTask->id.nodeId;
    pAddr->epset = mndGetDnodeEpsetById(pMnode, pAddr->nodeId);
    mstsDebug("stream trigger task %" PRIx64 " got with nodeId %d", pAddr->taskId, pAddr->nodeId);
    goto _exit;
  }

  // Trigger task is missing or not running yet.
  mstsError("trigger task %p not running, status:%s", pStatus->triggerTask, pStatus->triggerTask ? gStreamStatusStr[pStatus->triggerTask->status] : "unknown");
  code = TSDB_CODE_MND_STREAM_NOT_RUNNING;

_exit:

  taosRUnLockLatch(&mStreamMgmt.runtimeLock);

  return code;
}
4940

4941
// Allocate and initialize the mnode stream-management runtime state.
//
// Sets up, in order: the per-thread contexts (with their deployStm/actionStm
// hashes), the action queue with its initial head node, and the global
// stream/task/vgroup/snode/dnode/to-deploy/to-update hashes. It then seeds the
// snode and dnode maps, resets lastTaskId, stamps every stream that is neither
// user-dropped nor user-stopped with the activation time, and selects the
// initial runtime state: MND_STM_STATE_WATCH when at least one such stream
// exists, MND_STM_STATE_NORMAL otherwise.
//
// Returns TSDB_CODE_SUCCESS on success. On any failure the partially built
// state is released through msmDestroyRuntimeInfo and the failing code is
// returned; `lino` records the source line of the failure for the final log.
int32_t msmInitRuntimeInfo(SMnode *pMnode) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t vnodeNum = sdbGetSize(pMnode->pSdb, SDB_VGROUP);
  int32_t snodeNum = sdbGetSize(pMnode->pSdb, SDB_SNODE);
  int32_t dnodeNum = sdbGetSize(pMnode->pSdb, SDB_DNODE);

  MND_STREAM_SET_LAST_TS(STM_EVENT_ACTIVE_BEGIN, taosGetTimestampMs());

  mStreamMgmt.stat.activeTimes++;
  mStreamMgmt.threadNum = tsNumOfMnodeStreamMgmtThreads;
  mStreamMgmt.tCtx = taosMemoryCalloc(mStreamMgmt.threadNum, sizeof(SStmThreadCtx));
  if (NULL == mStreamMgmt.tCtx) {
    code = terrno;
    lino = __LINE__;
    mstError("failed to initialize the stream runtime tCtx, threadNum:%d, error:%s", mStreamMgmt.threadNum, tstrerror(code));
    goto _exit;
  }

  mStreamMgmt.actionQ = taosMemoryCalloc(1, sizeof(SStmActionQ));
  if (mStreamMgmt.actionQ == NULL) {
    code = terrno;
    lino = __LINE__;
    mError("failed to initialize the stream runtime actionQ, error:%s", tstrerror(code));
    goto _exit;
  }

  // The queue starts with one pre-allocated node that is both head and tail.
  mStreamMgmt.actionQ->head = taosMemoryCalloc(1, sizeof(SStmQNode));
  TSDB_CHECK_NULL(mStreamMgmt.actionQ->head, code, lino, _exit, terrno);

  mStreamMgmt.actionQ->tail = mStreamMgmt.actionQ->head;

  for (int32_t i = 0; i < mStreamMgmt.threadNum; ++i) {
    SStmThreadCtx* pCtx = mStreamMgmt.tCtx + i;

    for (int32_t m = 0; m < STREAM_MAX_GROUP_NUM; ++m) {
      pCtx->deployStm[m] = taosHashInit(snodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
      if (pCtx->deployStm[m] == NULL) {
        code = terrno;
        lino = __LINE__;
        mError("failed to initialize the stream runtime deployStm[%d][%d], error:%s", i, m, tstrerror(code));
        goto _exit;
      }
      taosHashSetFreeFp(pCtx->deployStm[m], tDeepFreeSStmStreamDeploy);

      pCtx->actionStm[m] = taosHashInit(snodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
      if (pCtx->actionStm[m] == NULL) {
        code = terrno;
        lino = __LINE__;
        mError("failed to initialize the stream runtime actionStm[%d][%d], error:%s", i, m, tstrerror(code));
        goto _exit;
      }
      taosHashSetFreeFp(pCtx->actionStm[m], mstDestroySStmAction);
    }
  }

  mStreamMgmt.streamMap = taosHashInit(MND_STREAM_DEFAULT_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  if (mStreamMgmt.streamMap == NULL) {
    code = terrno;
    lino = __LINE__;
    mError("failed to initialize the stream runtime streamMap, error:%s", tstrerror(code));
    goto _exit;
  }
  taosHashSetFreeFp(mStreamMgmt.streamMap, mstDestroySStmStatus);

  // taskMap is the only entry-locked map created here without a free callback.
  mStreamMgmt.taskMap = taosHashInit(MND_STREAM_DEFAULT_TASK_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  if (mStreamMgmt.taskMap == NULL) {
    code = terrno;
    lino = __LINE__;
    mError("failed to initialize the stream runtime taskMap, error:%s", tstrerror(code));
    goto _exit;
  }

  mStreamMgmt.vgroupMap = taosHashInit(vnodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (mStreamMgmt.vgroupMap == NULL) {
    code = terrno;
    lino = __LINE__;
    mError("failed to initialize the stream runtime vgroupMap, error:%s", tstrerror(code));
    goto _exit;
  }
  taosHashSetFreeFp(mStreamMgmt.vgroupMap, mstDestroySStmVgroupStatus);

  mStreamMgmt.snodeMap = taosHashInit(snodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (mStreamMgmt.snodeMap == NULL) {
    code = terrno;
    lino = __LINE__;
    mError("failed to initialize the stream runtime snodeMap, error:%s", tstrerror(code));
    goto _exit;
  }
  taosHashSetFreeFp(mStreamMgmt.snodeMap, mstDestroySStmSnodeStatus);

  mStreamMgmt.dnodeMap = taosHashInit(dnodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (mStreamMgmt.dnodeMap == NULL) {
    code = terrno;
    lino = __LINE__;
    mError("failed to initialize the stream runtime dnodeMap, error:%s", tstrerror(code));
    goto _exit;
  }

  mStreamMgmt.toDeployVgMap = taosHashInit(vnodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (mStreamMgmt.toDeployVgMap == NULL) {
    code = terrno;
    lino = __LINE__;
    mError("failed to initialize the stream runtime toDeployVgMap, error:%s", tstrerror(code));
    goto _exit;
  }
  taosHashSetFreeFp(mStreamMgmt.toDeployVgMap, mstDestroySStmVgTasksToDeploy);

  mStreamMgmt.toDeploySnodeMap = taosHashInit(snodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (mStreamMgmt.toDeploySnodeMap == NULL) {
    code = terrno;
    lino = __LINE__;
    mError("failed to initialize the stream runtime toDeploySnodeMap, error:%s", tstrerror(code));
    goto _exit;
  }
  taosHashSetFreeFp(mStreamMgmt.toDeploySnodeMap, mstDestroySStmSnodeTasksDeploy);

  mStreamMgmt.toUpdateScanMap = taosHashInit(snodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (mStreamMgmt.toUpdateScanMap == NULL) {
    code = terrno;
    lino = __LINE__;
    mError("failed to initialize the stream runtime toUpdateScanMap, error:%s", tstrerror(code));
    goto _exit;
  }
  taosHashSetFreeFp(mStreamMgmt.toUpdateScanMap, mstDestroyScanAddrList);

  TAOS_CHECK_EXIT(msmSTAddSnodesToMap(pMnode));
  TAOS_CHECK_EXIT(msmSTAddDnodesToMap(pMnode));

  mStreamMgmt.lastTaskId = 1;

  // Stamp every stream that is neither dropped nor stopped with the activation
  // time and count them to choose the initial runtime state.
  int32_t activeStreamNum = 0;
  sdbTraverse(pMnode->pSdb, SDB_STREAM, msmUpdateProfileStreams, &MND_STREAM_GET_LAST_TS(STM_EVENT_ACTIVE_BEGIN), &activeStreamNum, NULL);

  if (activeStreamNum > 0) {
    msmSetInitRuntimeState(MND_STM_STATE_WATCH);
  } else {
    msmSetInitRuntimeState(MND_STM_STATE_NORMAL);
  }

_exit:

  if (code) {
    msmDestroyRuntimeInfo(pMnode);
    mstError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
  } else {
    mstInfo("mnode stream runtime init done");
  }

  return code;
}
5080

5081

5082

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc