• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4661

08 Aug 2025 08:36AM UTC coverage: 59.883% (-0.2%) from 60.053%
#4661

push

travis-ci

web-flow
test: update cases desc (#32498)

137331 of 291923 branches covered (47.04%)

Branch coverage included in aggregate %.

207730 of 284307 relevant lines covered (73.07%)

4552406.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.53
/source/dnode/mnode/impl/src/mndStreamMgmt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndStream.h"
17
#include "mndDb.h"
18
#include "mndPrivilege.h"
19
#include "mndShow.h"
20
#include "mndStb.h"
21
#include "mndTrans.h"
22
#include "osMemory.h"
23
#include "parser.h"
24
#include "taoserror.h"
25
#include "tmisce.h"
26
#include "tname.h"
27
#include "mndDnode.h"
28
#include "mndVgroup.h"
29
#include "mndSnode.h"
30
#include "mndMnode.h"
31

32
void msmDestroyActionQ() {
4,774✔
33
  SStmQNode* pQNode = NULL;
4,774✔
34

35
  if (NULL == mStreamMgmt.actionQ) {
4,774✔
36
    return;
2,515✔
37
  }
38

39
  while (mndStreamActionDequeue(mStreamMgmt.actionQ, &pQNode)) {
2,263✔
40
  }
41

42
  taosMemoryFreeClear(mStreamMgmt.actionQ->head);
2,259!
43
  taosMemoryFreeClear(mStreamMgmt.actionQ);
2,259!
44
}
45

46
/*
 * Clean up the per-group deploy and action hash tables of one thread context.
 * The context object itself is not freed here.
 */
void msmDestroySStmThreadCtx(SStmThreadCtx* pCtx) {
  int32_t grp = 0;

  while (grp < STREAM_MAX_GROUP_NUM) {
    taosHashCleanup(pCtx->deployStm[grp]);
    taosHashCleanup(pCtx->actionStm[grp]);
    ++grp;
  }
}
52

53
void msmDestroyThreadCtxs() {
4,774✔
54
  if (NULL == mStreamMgmt.tCtx) {
4,774✔
55
    return;
2,515✔
56
  }
57
  
58
  for (int32_t i = 0; i < mStreamMgmt.threadNum; ++i) {
13,530✔
59
    msmDestroySStmThreadCtx(mStreamMgmt.tCtx + i);
11,271✔
60
  }
61
  taosMemoryFreeClear(mStreamMgmt.tCtx);
2,259!
62
}
63

64

65
/*
 * Release all runtime state held in mStreamMgmt: the action queue, the thread
 * contexts, every bookkeeping hash table (handles are reset to NULL and the
 * pending counters to 0) and the lastTs array. pMnode is not used.
 */
void msmDestroyRuntimeInfo(SMnode *pMnode) {
/* Clean up one hash-table member of mStreamMgmt and null its handle. */
#define MSM_DROP_HASH(field)            \
  do {                                  \
    taosHashCleanup(mStreamMgmt.field); \
    mStreamMgmt.field = NULL;           \
  } while (0)

  msmDestroyActionQ();
  msmDestroyThreadCtxs();

  // Pending work maps, each paired with its counter.
  MSM_DROP_HASH(toUpdateScanMap);
  mStreamMgmt.toUpdateScanNum = 0;
  MSM_DROP_HASH(toDeployVgMap);
  mStreamMgmt.toDeployVgTaskNum = 0;
  MSM_DROP_HASH(toDeploySnodeMap);
  mStreamMgmt.toDeploySnodeTaskNum = 0;

  // Status maps.
  MSM_DROP_HASH(dnodeMap);
  MSM_DROP_HASH(snodeMap);
  MSM_DROP_HASH(vgroupMap);
  MSM_DROP_HASH(taskMap);
  MSM_DROP_HASH(streamMap);

  memset(mStreamMgmt.lastTs, 0, sizeof(mStreamMgmt.lastTs));

  mstInfo("mnode stream mgmt destroyed");  

#undef MSM_DROP_HASH
}
94

95

96
/*
 * Mark a stream as stopped because of a fatal error and schedule its retry.
 *
 * streamId  stream identifier, also the lookup key in streamMap
 * pStatus   stream status; when NULL it is acquired from streamMap by streamId
 * errCode   error that caused the stop
 * currTs    current timestamp used for the retry schedule
 *
 * Returns `code`, which this function never changes from TSDB_CODE_SUCCESS.
 */
int32_t msmStopStreamByError(int64_t streamId, SStmStatus* pStatus, int32_t errCode, int64_t currTs) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  SStmStatus* pAcquired = NULL;
  int8_t      alreadyStopped = 0;

  mstsInfo("try to stop stream for error: %s", tstrerror(errCode));

  if (NULL == pStatus) {
    pAcquired = (SStmStatus*)taosHashAcquire(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
    if (NULL == pAcquired) {
      mstsInfo("stream already not in streamMap, error:%s", tstrerror(terrno));
      goto _exit;
    }

    pStatus = pAcquired;
  }

  alreadyStopped = atomic_load_8(&pStatus->stopped);
  if (alreadyStopped) {
    mstsDebug("stream already stopped %d, ignore stop", alreadyStopped);
    goto _exit;
  }

  // A trigger task that ran longer than twice the isolation duration resets the retry counter.
  if (pStatus->triggerTask && pStatus->triggerTask->runningStartTs) {
    if ((currTs - pStatus->triggerTask->runningStartTs) > 2 * MST_ISOLATION_DURATION) {
      pStatus->fatalRetryTimes = 0;
      mstsDebug("reset stream retryTimes, running duation:%" PRId64 "ms", currTs - pStatus->triggerTask->runningStartTs);
    }
  }

  pStatus->fatalRetryTimes++;
  pStatus->fatalError = errCode;
  if (pStatus->fatalRetryTimes > 10) {
    pStatus->fatalRetryDuration = MST_MAX_RETRY_DURATION;
  } else {
    pStatus->fatalRetryDuration = MST_ISOLATION_DURATION;
  }
  pStatus->fatalRetryTs = currTs + pStatus->fatalRetryDuration;

  pStatus->stat.lastError = errCode;

  // Only the caller that flips stopped from 0 to 1 records the event timestamp.
  if (0 == atomic_val_compare_exchange_8(&pStatus->stopped, 0, 1)) {
    MND_STREAM_SET_LAST_TS(STM_EVENT_STM_TERR, currTs);
  }

_exit:

  taosHashRelease(mStreamMgmt.streamMap, pAcquired);

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
145

146

147
static void msmSetInitRuntimeState(int8_t state) {
2,259✔
148
  switch (state) {
2,259!
149
    case MND_STM_STATE_WATCH:
×
150
      mStreamMgmt.watch.ending = 0;
×
151
      mStreamMgmt.watch.taskRemains = 0;
×
152
      mStreamMgmt.watch.processing = 0;
×
153
      mstInfo("switch to WATCH state");
×
154
      break;
×
155
    case MND_STM_STATE_NORMAL:
2,259✔
156
      MND_STREAM_SET_LAST_TS(STM_EVENT_NORMAL_BEGIN, taosGetTimestampMs());
4,497✔
157
      mstInfo("switch to NORMAL state");
2,259!
158
      break;
2,259✔
159
    default:
×
160
      return;
×
161
  }
162
  
163
  atomic_store_8(&mStreamMgmt.state, state);
2,259✔
164
}
165

166
void msmSTDeleteSnodeFromMap(int32_t snodeId) {
×
167
  int32_t code = taosHashRemove(mStreamMgmt.snodeMap, &snodeId, sizeof(snodeId));
×
168
  if (code) {
×
169
    mstWarn("remove snode %d from snodeMap failed, error:%s", snodeId, tstrerror(code));
×
170
  } else {
171
    mstInfo("snode %d removed from snodeMap", snodeId);
×
172
  }
173
}
×
174

175
/*
 * Walk every SDB_SNODE object in the mnode's sdb and insert a zeroed status
 * record (lastUpTs set to the current time) into snodeMap under the snode id.
 * A TSDB_CODE_DUP_KEY result from the insert is treated as success.
 *
 * Returns TSDB_CODE_SUCCESS, or the hash insert error that stopped the walk.
 */
static int32_t msmSTAddSnodesToMap(SMnode* pMnode) {
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         lino = 0;
  SStmSnodeStatus tasks = {0};
  SSnodeObj*      pSnode = NULL;
  void*           pIter = NULL;

  while (NULL != (pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pSnode))) {
    tasks.lastUpTs = taosGetTimestampMs();
    code = taosHashPut(mStreamMgmt.snodeMap, &pSnode->id, sizeof(pSnode->id), &tasks, sizeof(tasks));
    if (0 != code && TSDB_CODE_DUP_KEY != code) {
      // Hard failure: drop the fetched object and cancel the iteration before leaving.
      sdbRelease(pMnode->pSdb, pSnode);
      sdbCancelFetch(pMnode->pSdb, pIter);
      pSnode = NULL;
      TAOS_CHECK_EXIT(code);
    }

    code = TSDB_CODE_SUCCESS;

    sdbRelease(pMnode->pSdb, pSnode);
  }

  pSnode = NULL;

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
211

212
/*
 * Walk every SDB_DNODE object in the mnode's sdb and insert an entry into
 * dnodeMap keyed by dnode id, with INT64_MIN as the initial value.
 * A TSDB_CODE_DUP_KEY result from the insert is treated as success.
 *
 * Returns TSDB_CODE_SUCCESS, or the hash insert error that stopped the walk.
 */
static int32_t msmSTAddDnodesToMap(SMnode* pMnode) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  int64_t    lastUpTs = INT64_MIN;
  SDnodeObj* pDnode = NULL;
  void*      pIter = NULL;

  while (NULL != (pIter = sdbFetch(pMnode->pSdb, SDB_DNODE, pIter, (void **)&pDnode))) {
    code = taosHashPut(mStreamMgmt.dnodeMap, &pDnode->id, sizeof(pDnode->id), &lastUpTs, sizeof(lastUpTs));
    if (0 != code && TSDB_CODE_DUP_KEY != code) {
      // Hard failure: drop the fetched object and cancel the iteration before leaving.
      sdbRelease(pMnode->pSdb, pDnode);
      sdbCancelFetch(pMnode->pSdb, pIter);
      pDnode = NULL;
      TAOS_CHECK_EXIT(code);
    }

    code = TSDB_CODE_SUCCESS;
    sdbRelease(pMnode->pSdb, pDnode);
  }

  pDnode = NULL;

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
246

247

248

249
/*
 * Register task status records in taskMap under the key {streamId, taskId}.
 * The stored value is the pointer to the status record, not a copy of it.
 *
 * When pTask is non-NULL only that task is registered; otherwise every element
 * of pTasks is. pCtx is not used.
 */
static int32_t msmSTAddToTaskMap(SStmGrpCtx* pCtx, int64_t streamId, SArray* pTasks, SStmTaskStatus* pTask) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t key[2] = {streamId, 0};
  int32_t taskNum = pTask ? 1 : taosArrayGetSize(pTasks);
  int32_t i = 0;

  while (i < taskNum) {
    SStmTaskStatus* pStatus = NULL;
    if (pTask) {
      pStatus = pTask;
    } else {
      pStatus = taosArrayGet(pTasks, i);
    }

    key[1] = pStatus->id.taskId;
    TAOS_CHECK_EXIT(taosHashPut(mStreamMgmt.taskMap, key, sizeof(key), &pStatus, POINTER_BYTES));
    mstsDebug("task %" PRIx64" tidx %d added to taskMap", pStatus->id.taskId, pStatus->id.taskIdx);

    ++i;
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
270

271
/*
 * Record a reader task for a stream inside one vgroup's per-stream hash.
 *
 * pHash       per-vgroup hash keyed by streamId
 * streamId    stream the task belongs to
 * pStatus     task status; its pointer value is what gets stored in the list
 * trigReader  true  -> append to the stream's trigReaders list
 *             false -> append to the stream's calcReaders list
 *
 * Returns TSDB_CODE_SUCCESS or the error reported by the array/hash helpers.
 *
 * Fix: when the stream had no entry yet, the original leaked the freshly
 * allocated list if pushing to it or inserting the new entry into pHash
 * failed. The list is now tracked locally and destroyed on that error path.
 */
static int32_t msmSTAddToVgStreamHash(SHashObj* pHash, int64_t streamId, SStmTaskStatus* pStatus, bool trigReader) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SArray* pNewList = NULL;  // owned by this function until the new entry is stored in pHash

  SStmVgStreamStatus* pStream = taosHashGet(pHash, &streamId, sizeof(streamId));
  if (NULL == pStream) {
    SStmVgStreamStatus stream = {0};

    // Same initial capacities as before: 1 for trigger readers, 2 for calc readers.
    pNewList = taosArrayInit(trigReader ? 1 : 2, POINTER_BYTES);
    TSDB_CHECK_NULL(pNewList, code, lino, _exit, terrno);
    TSDB_CHECK_NULL(taosArrayPush(pNewList, &pStatus), code, lino, _exit, terrno);

    if (trigReader) {
      stream.trigReaders = pNewList;
    } else {
      stream.calcReaders = pNewList;
    }

    TAOS_CHECK_EXIT(taosHashPut(pHash, &streamId, sizeof(streamId), &stream, sizeof(stream)));
    pNewList = NULL;  // the stored entry now refers to the list
    goto _exit;
  }

  if (trigReader) {
    if (NULL == pStream->trigReaders) {
      pStream->trigReaders = taosArrayInit(1, POINTER_BYTES);
      TSDB_CHECK_NULL(pStream->trigReaders, code, lino, _exit, terrno);
    }

    TSDB_CHECK_NULL(taosArrayPush(pStream->trigReaders, &pStatus), code, lino, _exit, terrno);
    goto _exit;
  }

  if (NULL == pStream->calcReaders) {
    pStream->calcReaders = taosArrayInit(1, POINTER_BYTES);
    TSDB_CHECK_NULL(pStream->calcReaders, code, lino, _exit, terrno);
  }

  TSDB_CHECK_NULL(taosArrayPush(pStream->calcReaders, &pStatus), code, lino, _exit, terrno);

_exit:

  if (code) {
    if (NULL != pNewList) {
      taosArrayDestroy(pNewList);
    }
    mstsError("%s task %" PRIx64 " SID:%" PRIx64 " failed to add to vgroup %d streamHash in %s at line %d, error:%s", 
        trigReader ? "trigReader" : "calcReader", pStatus->id.taskId, pStatus->id.seriousId, pStatus->id.nodeId, __FUNCTION__, lino, tstrerror(code));
  } else {
    mstsDebug("%s task %" PRIx64 " SID:%" PRIx64 " added to vgroup %d streamHash", 
        trigReader ? "trigReader" : "calcReader", pStatus->id.taskId, pStatus->id.seriousId, pStatus->id.nodeId);
  }

  return code;
}
319

320
/*
 * Attach a reader task to the vgroup entry in vgroupMap keyed by the task's
 * nodeId, creating that entry (with its per-stream hash) when it is missing.
 *
 * On failure the locally built vgroup status is destroyed before returning.
 */
static int32_t msmSTAddToVgroupMapImpl(int64_t streamId, SStmTaskStatus* pStatus, bool trigReader) {
  int32_t           code = TSDB_CODE_SUCCESS;
  int32_t           lino = 0;
  SStmVgroupStatus  vg = {0};
  SStmVgroupStatus* pVg = taosHashGet(mStreamMgmt.vgroupMap, &pStatus->id.nodeId, sizeof(pStatus->id.nodeId));

  if (NULL != pVg) {
    // The vgroup is already tracked: only the per-stream hash needs the task.
    TAOS_CHECK_EXIT(msmSTAddToVgStreamHash(pVg->streamTasks, streamId, pStatus, trigReader));
    goto _exit;
  }

  vg.streamTasks = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  TSDB_CHECK_NULL(vg.streamTasks, code, lino, _exit, terrno);
  taosHashSetFreeFp(vg.streamTasks, mstDestroySStmVgStreamStatus);

  vg.lastUpTs = taosGetTimestampMs();
  TAOS_CHECK_EXIT(msmSTAddToVgStreamHash(vg.streamTasks, streamId, pStatus, trigReader));
  TAOS_CHECK_EXIT(taosHashPut(mStreamMgmt.vgroupMap, &pStatus->id.nodeId, sizeof(pStatus->id.nodeId), &vg, sizeof(vg)));

_exit:

  if (code) {
    mstDestroyVgroupStatus(&vg);
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  } else {
    mstsDebug("task %" PRIx64 " tidx %d added to vgroupMap %d", pStatus->id.taskId, pStatus->id.taskIdx, pStatus->id.nodeId);
  }

  return code;
}
349

350
/*
 * Queue a task deployment on the to-deploy list of the vgroup identified by
 * pDeploy->task.nodeId, creating the vgroup entry in pVgMap when needed.
 * On success the global toDeployVgTaskNum counter is incremented.
 *
 * pVgMap    hash of SStmVgTasksToDeploy keyed by vgroup id
 * pDeploy   deployment descriptor; copied by value into the list
 * streamId  stream identifier (used by the logging macros)
 *
 * Returns TSDB_CODE_SUCCESS or the error reported by the array/hash helpers.
 *
 * Fixes relative to the original:
 *  - after losing an insert race (TSDB_CODE_DUP_KEY) and retrying successfully,
 *    the stale DUP_KEY code was returned and the counter was not incremented;
 *  - the write latch stayed held when the list allocation failed;
 *  - the entry obtained via taosHashAcquire was not released on error paths;
 *  - the locally created list leaked when the push or a non-DUP_KEY insert
 *    failed.
 */
static int32_t msmTDAddToVgroupMap(SHashObj* pVgMap, SStmTaskDeploy* pDeploy, int64_t streamId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SStmVgTasksToDeploy vg = {0};
  SStreamTask* pTask = &pDeploy->task;
  SStmTaskToDeployExt ext = {0};
  ext.deploy = *pDeploy;

  while (true) {
    SStmVgTasksToDeploy* pVg = taosHashAcquire(pVgMap, &pDeploy->task.nodeId, sizeof(pDeploy->task.nodeId));
    if (NULL == pVg) {
      vg.taskList = taosArrayInit(20, sizeof(SStmTaskToDeployExt));
      TSDB_CHECK_NULL(vg.taskList, code, lino, _return, terrno);
      TSDB_CHECK_NULL(taosArrayPush(vg.taskList, &ext), code, lino, _return, terrno);
      code = taosHashPut(pVgMap, &pDeploy->task.nodeId, sizeof(pDeploy->task.nodeId), &vg, sizeof(SStmVgTasksToDeploy));
      if (TSDB_CODE_SUCCESS == code) {
        // The stored entry now refers to vg.taskList; it is not destroyed below because code is 0.
        goto _return;
      }

      if (TSDB_CODE_DUP_KEY != code) {
        lino = __LINE__;
        goto _return;
      }

      // Someone else inserted the entry first: drop our list and retry through the acquire path.
      taosArrayDestroy(vg.taskList);
      vg.taskList = NULL;
      code = TSDB_CODE_SUCCESS;
      continue;
    }

    taosWLockLatch(&pVg->lock);
    if (NULL == pVg->taskList) {
      pVg->taskList = taosArrayInit(20, sizeof(SStmTaskToDeployExt));
      if (NULL == pVg->taskList) {
        taosWUnLockLatch(&pVg->lock);
        taosHashRelease(pVgMap, pVg);
        TSDB_CHECK_NULL(NULL, code, lino, _return, terrno);
      }
    }
    if (NULL == taosArrayPush(pVg->taskList, &ext)) {
      taosWUnLockLatch(&pVg->lock);
      taosHashRelease(pVgMap, pVg);
      TSDB_CHECK_NULL(NULL, code, lino, _return, terrno);
    }
    taosWUnLockLatch(&pVg->lock);

    taosHashRelease(pVgMap, pVg);
    break;
  }

_return:

  if (code) {
    // Only non-NULL when the new-entry path failed before the map took the list.
    if (NULL != vg.taskList) {
      taosArrayDestroy(vg.taskList);
      vg.taskList = NULL;
    }
    msttError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  } else {
    int32_t num = atomic_add_fetch_32(&mStreamMgmt.toDeployVgTaskNum, 1);
    msttDebug("task added to toDeployVgTaskNum, vgToDeployTaskNum:%d", num);
  }

  return code;
}
403

404

405
/*
 * Record a trigger or runner task for a stream inside one snode's per-stream
 * hash.
 *
 * pHash     per-snode hash keyed by streamId
 * streamId  stream the task belongs to
 * pStatus   task status; its pointer value is what gets stored
 * deployId  < 0  -> pStatus becomes the stream's trigger task (an existing
 *                   trigger is replaced, with a warning)
 *           >= 0 -> pStatus is appended to runners[deployId]; the caller is
 *                   responsible for deployId being a valid index
 *
 * Returns TSDB_CODE_SUCCESS or the error reported by the array/hash helpers.
 *
 * Fix: when the stream had no entry yet, the original leaked the freshly
 * allocated runner list if pushing to it or inserting the new entry into
 * pHash failed. The list is now tracked locally and destroyed on that path.
 */
static int32_t msmSTAddToSnodeStreamHash(SHashObj* pHash, int64_t streamId, SStmTaskStatus* pStatus, int32_t deployId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SArray* pNewList = NULL;  // owned by this function until the new entry is stored in pHash

  SStmSnodeStreamStatus* pStream = taosHashGet(pHash, &streamId, sizeof(streamId));
  if (NULL == pStream) {
    SStmSnodeStreamStatus stream = {0};
    if (deployId < 0) {
      stream.trigger = pStatus;
    } else {
      pNewList = taosArrayInit(2, POINTER_BYTES);
      TSDB_CHECK_NULL(pNewList, code, lino, _exit, terrno);
      TSDB_CHECK_NULL(taosArrayPush(pNewList, &pStatus), code, lino, _exit, terrno);
      stream.runners[deployId] = pNewList;
    }

    TAOS_CHECK_EXIT(taosHashPut(pHash, &streamId, sizeof(streamId), &stream, sizeof(stream)));
    pNewList = NULL;  // the stored entry now refers to the list
    goto _exit;
  }

  if (deployId < 0) {
    if (NULL != pStream->trigger) {
      mstsWarn("stream already got trigger task %" PRIx64 " SID:%" PRIx64 " in snode %d, replace it with task %" PRIx64 " SID:%" PRIx64, 
          pStream->trigger->id.taskId, pStream->trigger->id.seriousId, pStatus->id.nodeId, pStatus->id.taskId, pStatus->id.seriousId);
    }

    pStream->trigger = pStatus;
    goto _exit;
  }

  if (NULL == pStream->runners[deployId]) {
    pStream->runners[deployId] = taosArrayInit(2, POINTER_BYTES);
    TSDB_CHECK_NULL(pStream->runners[deployId], code, lino, _exit, terrno);
  }

  TSDB_CHECK_NULL(taosArrayPush(pStream->runners[deployId], &pStatus), code, lino, _exit, terrno);

_exit:

  if (code) {
    if (NULL != pNewList) {
      taosArrayDestroy(pNewList);
    }
    mstsError("%s task %" PRIx64 " SID:%" PRIx64 " failed to add to snode %d streamHash deployId:%d in %s at line %d, error:%s", 
        (deployId < 0) ? "trigger" : "runner", pStatus->id.taskId, pStatus->id.seriousId, pStatus->id.nodeId, deployId, __FUNCTION__, lino, tstrerror(code));
  } else {
    mstsDebug("%s task %" PRIx64 " SID:%" PRIx64 " added to snode %d streamHash deployId:%d", 
        (deployId < 0) ? "trigger" : "runner", pStatus->id.taskId, pStatus->id.seriousId, pStatus->id.nodeId, deployId);
  }

  return code;
}
452

453

454
/*
 * Attach a trigger (deployId < 0) or runner task to the snode entry in
 * snodeMap keyed by the task's nodeId. The snode's per-stream hash is created
 * on first use. A snode missing from snodeMap is reported as
 * TSDB_CODE_MND_STREAM_INTERNAL_ERROR.
 */
static int32_t msmSTAddToSnodeMapImpl(int64_t streamId, SStmTaskStatus* pStatus, int32_t deployId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SStmSnodeStatus* pSnode = taosHashGet(mStreamMgmt.snodeMap, &pStatus->id.nodeId, sizeof(pStatus->id.nodeId));
  if (NULL == pSnode) {
    mstsWarn("snode %d not exists in snodeMap anymore, may be dropped", pStatus->id.nodeId);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
    goto _exit;
  }

  // Create the per-stream hash on first use for this snode.
  if (NULL == pSnode->streamTasks) {
    pSnode->streamTasks = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
    TSDB_CHECK_NULL(pSnode->streamTasks, code, lino, _exit, terrno);
    taosHashSetFreeFp(pSnode->streamTasks, mstDestroySStmSnodeStreamStatus);
  }

  TAOS_CHECK_EXIT(msmSTAddToSnodeStreamHash(pSnode->streamTasks, streamId, pStatus, deployId));

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  } else {
    mstsDebug("%s task %" PRIx64 " tidx %d added to snodeMap, snodeId:%d", (deployId < 0) ? "trigger" : "runner", 
        pStatus->id.taskId, pStatus->id.taskIdx, pStatus->id.nodeId);
  }

  return code;
}
483

484

485

486
/*
 * Queue a trigger task deployment on the triggerList of the snode identified
 * by pDeploy->task.nodeId in toDeploySnodeMap, creating the snode entry when
 * needed.
 *
 * pDeploy  deployment descriptor; copied by value into the list
 * pStream  stream object; only pCreate->streamId is read (for logging)
 *
 * Returns TSDB_CODE_SUCCESS or the error reported by the array/hash helpers.
 *
 * Fixes relative to the original:
 *  - after losing an insert race (TSDB_CODE_DUP_KEY) and retrying successfully,
 *    the stale DUP_KEY code was returned to the caller;
 *  - the entry obtained via taosHashAcquire was not released on error paths;
 *  - the locally created list leaked when the push or a non-DUP_KEY insert
 *    failed;
 *  - `ext` is zero-initialised before being copied into the list, matching
 *    msmTDAddToVgroupMap.
 */
static int32_t msmTDAddTriggerToSnodeMap(SStmTaskDeploy* pDeploy, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;
  SStmSnodeTasksDeploy snode = {0};
  SStmTaskToDeployExt ext = {0};
  SStreamTask* pTask = &pDeploy->task;

  ext.deploy = *pDeploy;
  ext.deployed = false;

  while (true) {
    SStmSnodeTasksDeploy* pSnode = taosHashAcquire(mStreamMgmt.toDeploySnodeMap, &pDeploy->task.nodeId, sizeof(pDeploy->task.nodeId));
    if (NULL == pSnode) {
      snode.triggerList = taosArrayInit(10, sizeof(SStmTaskToDeployExt));
      TSDB_CHECK_NULL(snode.triggerList, code, lino, _return, terrno);
      TSDB_CHECK_NULL(taosArrayPush(snode.triggerList, &ext), code, lino, _return, terrno);

      code = taosHashPut(mStreamMgmt.toDeploySnodeMap, &pDeploy->task.nodeId, sizeof(pDeploy->task.nodeId), &snode, sizeof(snode));
      if (TSDB_CODE_SUCCESS == code) {
        // The stored entry now refers to snode.triggerList; it is not destroyed below because code is 0.
        goto _return;
      }

      if (TSDB_CODE_DUP_KEY != code) {
        lino = __LINE__;
        goto _return;
      }

      // Someone else inserted the entry first: drop our list and retry through the acquire path.
      taosArrayDestroy(snode.triggerList);
      snode.triggerList = NULL;
      code = TSDB_CODE_SUCCESS;
      continue;
    }

    taosWLockLatch(&pSnode->lock);
    if (NULL == pSnode->triggerList) {
      pSnode->triggerList = taosArrayInit(10, sizeof(SStmTaskToDeployExt));
      if (NULL == pSnode->triggerList) {
        taosWUnLockLatch(&pSnode->lock);
        taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);
        TSDB_CHECK_NULL(NULL, code, lino, _return, terrno);
      }
    }

    if (NULL == taosArrayPush(pSnode->triggerList, &ext)) {
      taosWUnLockLatch(&pSnode->lock);
      taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);
      TSDB_CHECK_NULL(NULL, code, lino, _return, terrno);
    }
    taosWUnLockLatch(&pSnode->lock);

    taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);
    break;
  }

_return:

  if (code) {
    // Only non-NULL when the new-entry path failed before the map took the list.
    if (NULL != snode.triggerList) {
      taosArrayDestroy(snode.triggerList);
      snode.triggerList = NULL;
    }
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  } else {
    msttDebug("trigger task added to toDeploySnodeMap, tidx:%d", pTask->taskIdx);
  }

  return code;
}
549

550
/*
 * Queue a runner task deployment on the runnerList of the snode identified by
 * pDeploy->task.nodeId in toDeploySnodeMap, creating the snode entry when
 * needed.
 *
 * pDeploy  deployment descriptor; copied by value into the list
 * pStream  stream object; only pCreate->streamId is read (for logging)
 *
 * Returns TSDB_CODE_SUCCESS or the error reported by the array/hash helpers.
 *
 * Fixes relative to the original:
 *  - after losing an insert race (TSDB_CODE_DUP_KEY) and retrying successfully,
 *    the stale DUP_KEY code was returned to the caller;
 *  - the entry obtained via taosHashAcquire was not released on error paths;
 *  - the locally created list leaked when the push or a non-DUP_KEY insert
 *    failed;
 *  - `ext` is zero-initialised before being copied into the list, matching
 *    msmTDAddToVgroupMap.
 */
static int32_t msmTDAddRunnerToSnodeMap(SStmTaskDeploy* pDeploy, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;
  SStmSnodeTasksDeploy snode = {0};
  SStmTaskToDeployExt ext = {0};
  SStreamTask* pTask = &pDeploy->task;

  ext.deploy = *pDeploy;
  ext.deployed = false;

  while (true) {
    SStmSnodeTasksDeploy* pSnode = taosHashAcquire(mStreamMgmt.toDeploySnodeMap, &pDeploy->task.nodeId, sizeof(pDeploy->task.nodeId));
    if (NULL == pSnode) {
      snode.runnerList = taosArrayInit(10, sizeof(SStmTaskToDeployExt));
      TSDB_CHECK_NULL(snode.runnerList, code, lino, _return, terrno);
      TSDB_CHECK_NULL(taosArrayPush(snode.runnerList, &ext), code, lino, _return, terrno);

      code = taosHashPut(mStreamMgmt.toDeploySnodeMap, &pDeploy->task.nodeId, sizeof(pDeploy->task.nodeId), &snode, sizeof(snode));
      if (TSDB_CODE_SUCCESS == code) {
        // The stored entry now refers to snode.runnerList; it is not destroyed below because code is 0.
        goto _return;
      }

      if (TSDB_CODE_DUP_KEY != code) {
        lino = __LINE__;
        goto _return;
      }

      // Someone else inserted the entry first: drop our list and retry through the acquire path.
      taosArrayDestroy(snode.runnerList);
      snode.runnerList = NULL;
      code = TSDB_CODE_SUCCESS;
      continue;
    }

    taosWLockLatch(&pSnode->lock);
    if (NULL == pSnode->runnerList) {
      pSnode->runnerList = taosArrayInit(10, sizeof(SStmTaskToDeployExt));
      if (NULL == pSnode->runnerList) {
        taosWUnLockLatch(&pSnode->lock);
        taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);
        TSDB_CHECK_NULL(NULL, code, lino, _return, terrno);
      }
    }

    if (NULL == taosArrayPush(pSnode->runnerList, &ext)) {
      taosWUnLockLatch(&pSnode->lock);
      taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);
      TSDB_CHECK_NULL(NULL, code, lino, _return, terrno);
    }
    taosWUnLockLatch(&pSnode->lock);

    taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);
    break;
  }

_return:

  if (code) {
    // Only non-NULL when the new-entry path failed before the map took the list.
    if (NULL != snode.runnerList) {
      taosArrayDestroy(snode.runnerList);
      snode.runnerList = NULL;
    }
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  } else {
    msttDebug("task added to toDeploySnodeMap, tidx:%d", pTask->taskIdx);
  }

  return code;
}
613

614
/*
 * Queue every runner deployment in runnerList via msmTDAddRunnerToSnodeMap(),
 * incrementing toDeploySnodeTaskNum once per queued runner. Stops at the first
 * failure and returns its error code.
 */
static int32_t msmTDAddRunnersToSnodeMap(SArray* runnerList, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;
  int32_t runnerNum = taosArrayGetSize(runnerList);
  int32_t idx = 0;

  while (idx < runnerNum) {
    SStmTaskDeploy* pDeploy = taosArrayGet(runnerList, idx);

    TAOS_CHECK_EXIT(msmTDAddRunnerToSnodeMap(pDeploy, pStream));

    atomic_add_fetch_32(&mStreamMgmt.toDeploySnodeTaskNum, 1);    
    ++idx;
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
637

638

639
/*
 * Refresh the status of the snode named in pCtx->pReq: store its reported
 * runnerThreadNum and advance lastUpTs to pCtx->currTs when that is newer.
 *
 * When the snode is missing from snodeMap, the map is repopulated once via
 * msmSTAddSnodesToMap() and the lookup is retried; a second miss yields
 * TSDB_CODE_MND_STREAM_NODE_NOT_EXISTS.
 */
int32_t msmUpdateSnodeUpTs(SStmGrpCtx* pCtx) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SStmSnodeStatus* pStatus = NULL;
  bool             refreshed = false;

  // Look the snode up, repopulating snodeMap at most once.
  for (;;) {
    pStatus = taosHashGet(mStreamMgmt.snodeMap, &pCtx->pReq->snodeId, sizeof(pCtx->pReq->snodeId));
    if (NULL != pStatus) {
      break;
    }

    if (refreshed) {
      mstWarn("snode %d not exists in snodeMap, may be dropped, ignore it", pCtx->pReq->snodeId);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_NODE_NOT_EXISTS);
    }

    refreshed = true;
    TAOS_CHECK_EXIT(msmSTAddSnodesToMap(pCtx->pMnode));
  }

  atomic_store_32(&pStatus->runnerThreadNum, pCtx->pReq->runnerThreadNum);

  // Compare-and-swap loop: only ever move lastUpTs forward.
  for (;;) {
    int64_t seenTs = atomic_load_64(&pStatus->lastUpTs);
    if (pCtx->currTs <= seenTs) {
      return code;
    }

    if (seenTs == atomic_val_compare_exchange_64(&pStatus->lastUpTs, seenTs, pCtx->currTs)) {
      mstDebug("snode %d lastUpTs updated", pCtx->pReq->snodeId);
      return code;
    }
  }

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;  
}
686

687
/*
 * Advance the lastUpTs of vgroup `vgId` to pCtx->currTs when currTs is newer.
 * Vgroups that are not present in vgroupMap are ignored (debug log only).
 */
void msmUpdateVgroupUpTs(SStmGrpCtx* pCtx, int32_t vgId) {
  int32_t           code = TSDB_CODE_SUCCESS;
  int32_t           lino = 0;
  SStmVgroupStatus* pVg = taosHashGet(mStreamMgmt.vgroupMap, &vgId, sizeof(vgId));
  if (NULL == pVg) {
    mstDebug("vgroup %d not exists in vgroupMap, ignore update upTs", vgId);
    return;
  }

  // Compare-and-swap loop: retry while another writer wins and currTs is still newer.
  int64_t seenTs = atomic_load_64(&pVg->lastUpTs);
  while (pCtx->currTs > seenTs) {
    if (seenTs == atomic_val_compare_exchange_64(&pVg->lastUpTs, seenTs, pCtx->currTs)) {
      mstDebug("vgroup %d lastUpTs updated to %" PRId64, vgId, pCtx->currTs);
      return;
    }

    seenTs = atomic_load_64(&pVg->lastUpTs);
  }
}
710

711
/*
 * Apply msmUpdateVgroupUpTs() to every vgroup id listed in
 * pCtx->pReq->pVgLeaders. Always returns TSDB_CODE_SUCCESS as written.
 */
int32_t msmUpdateVgroupsUpTs(SStmGrpCtx* pCtx) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t leaderNum = taosArrayGetSize(pCtx->pReq->pVgLeaders);
  int32_t idx = 0;

  mstDebug("start to update vgroups upTs");

  while (idx < leaderNum) {
    int32_t* pVgId = taosArrayGet(pCtx->pReq->pVgLeaders, idx);
    msmUpdateVgroupUpTs(pCtx, *pVgId);
    ++idx;
  }

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
732

733

734

735
/*
 * Return the scanPlan of the first SStreamCalcScan in pList whose
 * readFromCache flag is set, or NULL when no entry has it set.
 */
void* msmSearchCalcCacheScanPlan(SArray* pList) {
  int32_t total = taosArrayGetSize(pList);
  int32_t idx = 0;

  while (idx < total) {
    SStreamCalcScan* pEntry = taosArrayGet(pList, idx);
    if (pEntry->readFromCache) {
      return pEntry->scanPlan;
    }
    ++idx;
  }

  return NULL;
}
746

747
/*
 * Fill the reader section of a task deploy message.
 *
 * triggerReader == false: only the calc part is filled (execReplica and the
 *                         supplied calcScanPlan).
 * triggerReader == true : the trigger part is filled from pInfo->pCreate;
 *                         calcScanPlan is not used.
 *
 * Pointers are copied as-is, not duplicated. Always returns TSDB_CODE_SUCCESS.
 */
int32_t msmBuildReaderDeployInfo(SStmTaskDeploy* pDeploy, void* calcScanPlan, SStmStatus* pInfo, bool triggerReader) {
  SStreamReaderDeployMsg* pReader = &pDeploy->msg.reader;
  pReader->triggerReader = triggerReader;

  if (!triggerReader) {
    SStreamReaderDeployFromCalc* pFromCalc = &pReader->msg.calc;
    pFromCalc->execReplica = pInfo->runnerDeploys * pInfo->runnerReplica;
    pFromCalc->calcScanPlan = calcScanPlan;
    return TSDB_CODE_SUCCESS;
  }

  SStreamReaderDeployFromTrigger* pFromTrig = &pReader->msg.trigger;

  // Trigger table identity.
  pFromTrig->triggerTblName = pInfo->pCreate->triggerTblName;
  pFromTrig->triggerTblUid = pInfo->pCreate->triggerTblUid;
  pFromTrig->triggerTblType = pInfo->pCreate->triggerTblType;

  // Delete handling options.
  pFromTrig->deleteReCalc = pInfo->pCreate->deleteReCalc;
  pFromTrig->deleteOutTbl = pInfo->pCreate->deleteOutTbl;

  // Column sets and scan plans; triggerPrevFilter is intentionally not set here.
  pFromTrig->partitionCols = pInfo->pCreate->partitionCols;
  pFromTrig->triggerCols = pInfo->pCreate->triggerCols;
  pFromTrig->triggerScanPlan = pInfo->pCreate->triggerScanPlan;
  pFromTrig->calcCacheScanPlan = msmSearchCalcCacheScanPlan(pInfo->pCreate->calcScanPlanList);

  return TSDB_CODE_SUCCESS;
}
771

772
/*
 * Build the list of runner targets a trigger task sends its calc requests to.
 *
 * For every runner deployment of the stream the LAST task of that deployment
 * (which must be flagged as the top runner) is appended to *ppRes as an
 * SStreamRunnerTarget. *ppRes is only allocated when runnerDeploys > 0.
 *
 * Returns TSDB_CODE_SUCCESS on success, otherwise the error code of the step
 * that failed (allocation failure, missing runner, or a non-top last runner).
 * On failure *ppRes may hold a partially filled array that the caller still owns.
 */
int32_t msmBuildTriggerRunnerTargets(SMnode* pMnode, SStmStatus* pInfo, int64_t streamId, SArray** ppRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (pInfo->runnerDeploys > 0) {
    *ppRes = taosArrayInit(pInfo->runnerDeploys, sizeof(SStreamRunnerTarget));
    TSDB_CHECK_NULL(*ppRes, code, lino, _exit, terrno);
  }

  for (int32_t i = 0; i < pInfo->runnerDeploys; ++i) {
    SStmTaskStatus* pStatus = taosArrayGetLast(pInfo->runners[i]);
    TSDB_CHECK_NULL(pStatus, code, lino, _exit, terrno);

    if (!STREAM_IS_TOP_RUNNER(pStatus->flags)) {
      mstsError("the last runner task %" PRIx64 " SID:%" PRId64 " tidx:%d in deploy %d is not top runner",
          pStatus->id.taskId, pStatus->id.seriousId, pStatus->id.taskIdx, i);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
    }

    // Zero-initialise so that padding and any field not set below is deterministic
    // before the struct is byte-copied into the array.
    SStreamRunnerTarget runner = {0};
    runner.addr.taskId = pStatus->id.taskId;
    runner.addr.nodeId = pStatus->id.nodeId;
    runner.addr.epset = mndGetDnodeEpsetById(pMnode, pStatus->id.nodeId);
    runner.execReplica = pInfo->runnerReplica;
    TSDB_CHECK_NULL(taosArrayPush(*ppRes, &runner), code, lino, _exit, terrno);
    mstsDebug("the %dth runner target added to trigger's runnerList, TASK:%" PRIx64 , i, runner.addr.taskId);
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  // Propagate the failure; returning success here would hide it from the caller's TAOS_CHECK_EXIT.
  return code;
}
808

809
/*
 * Fill pInfo with the stream's leader snode id, that snode's replica id, and
 * the endpoint sets of both (the replica epset only when GOT_SNODE() accepts
 * the replica id).
 *
 * Returns TSDB_CODE_MND_STREAM_INTERNAL_ERROR when the leader snode can no
 * longer be acquired, TSDB_CODE_SUCCESS otherwise.
 */
int32_t msmBuildStreamSnodeInfo(SMnode* pMnode, SStreamObj* pStream, SStreamSnodeInfo* pInfo) {
  int64_t    streamId = pStream->pCreate->streamId;
  int32_t    leaderId = atomic_load_32(&pStream->mainSnodeId);
  SSnodeObj* pLeader = mndAcquireSnode(pMnode, leaderId);

  if (NULL == pLeader) {
    mstsError("snode %d not longer exists, ignore build stream snode info", leaderId);
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }

  // Copy what is needed from the snode object, then drop the reference right away.
  pInfo->leaderSnodeId = leaderId;
  pInfo->replicaSnodeId = pLeader->replicaId;
  mndReleaseSnode(pMnode, pLeader);

  pInfo->leaderEpSet = mndGetDnodeEpsetById(pMnode, pInfo->leaderSnodeId);

  if (GOT_SNODE(pInfo->replicaSnodeId)) {
    pInfo->replicaEpSet = mndGetDnodeEpsetById(pMnode, pInfo->replicaSnodeId);
  }

  return TSDB_CODE_SUCCESS;
}
830

831
/*
 * Fill the trigger section of a task deploy message for pStream.
 *
 * Scalar options are copied from pStream->pCreate, pointer members are taken
 * from pInfo->pCreate (shared, not duplicated). The message additionally gets:
 *   - readerList: one SStreamTaskAddr per entry of pInfo->trigReaders
 *   - runnerList: built by msmBuildTriggerRunnerTargets() unless runnerNum == 0
 *
 * Returns TSDB_CODE_SUCCESS on success, otherwise the error code of the
 * failing step.
 */
int32_t msmBuildTriggerDeployInfo(SMnode* pMnode, SStmStatus* pInfo, SStmTaskDeploy* pDeploy, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;
  SStreamTriggerDeployMsg* pMsg = &pDeploy->msg.trigger;

  pMsg->triggerType = pStream->pCreate->triggerType;
  pMsg->igDisorder = pStream->pCreate->igDisorder;
  pMsg->fillHistory = pStream->pCreate->fillHistory;
  pMsg->fillHistoryFirst = pStream->pCreate->fillHistoryFirst;
  pMsg->lowLatencyCalc = pStream->pCreate->lowLatencyCalc;
  pMsg->igNoDataTrigger = pStream->pCreate->igNoDataTrigger;
  pMsg->hasPartitionBy = (pStream->pCreate->partitionCols != NULL);
  pMsg->isTriggerTblVirt = STREAM_IS_VIRTUAL_TABLE(pStream->pCreate->triggerTblType, pStream->pCreate->flags);
  pMsg->triggerHasPF = pStream->pCreate->triggerHasPF;

  pMsg->pNotifyAddrUrls = pInfo->pCreate->pNotifyAddrUrls;
  pMsg->notifyEventTypes = pStream->pCreate->notifyEventTypes;
  pMsg->notifyErrorHandle = pStream->pCreate->notifyErrorHandle;
  pMsg->notifyHistory = pStream->pCreate->notifyHistory;

  pMsg->maxDelay = pStream->pCreate->maxDelay;
  pMsg->fillHistoryStartTime = pStream->pCreate->fillHistoryStartTime;
  pMsg->watermark = pStream->pCreate->watermark;
  pMsg->expiredTime = pStream->pCreate->expiredTime;
  pMsg->trigger = pInfo->pCreate->trigger;

  pMsg->eventTypes = pStream->pCreate->eventTypes;
  pMsg->placeHolderBitmap = pStream->pCreate->placeHolderBitmap;
  pMsg->calcTsSlotId = pStream->pCreate->calcTsSlotId;
  pMsg->triTsSlotId = pStream->pCreate->triTsSlotId;
  pMsg->triggerPrevFilter = pInfo->pCreate->triggerPrevFilter;

  // Scan plans are only attached to the trigger itself for virtual trigger tables.
  if (STREAM_IS_VIRTUAL_TABLE(pStream->pCreate->triggerTblType, pStream->pCreate->flags)) {
    pMsg->triggerScanPlan = pInfo->pCreate->triggerScanPlan;
    pMsg->calcCacheScanPlan = msmSearchCalcCacheScanPlan(pInfo->pCreate->calcScanPlanList);
  }

  int32_t triggerReaderNum = taosArrayGetSize(pInfo->trigReaders);
  if (triggerReaderNum > 0) {
    pMsg->readerList = taosArrayInit(triggerReaderNum, sizeof(SStreamTaskAddr));
    TSDB_CHECK_NULL(pMsg->readerList, code, lino, _exit, terrno);
  }

  for (int32_t i = 0; i < triggerReaderNum; ++i) {
    SStmTaskStatus* pStatus = taosArrayGet(pInfo->trigReaders, i);
    TSDB_CHECK_NULL(pStatus, code, lino, _exit, terrno);

    // Zero-initialise: the struct is byte-copied into the array below.
    SStreamTaskAddr addr = {0};
    addr.taskId = pStatus->id.taskId;
    addr.nodeId = pStatus->id.nodeId;
    addr.epset = mndGetVgroupEpsetById(pMnode, pStatus->id.nodeId);
    TSDB_CHECK_NULL(taosArrayPush(pMsg->readerList, &addr), code, lino, _exit, terrno);
    mstsDebug("the %dth trigReader src added to trigger's readerList, TASK:%" PRIx64 " nodeId:%d", i, addr.taskId, addr.nodeId);
  }

  // mainSnodeId is read atomically elsewhere in this file; do the same here.
  pMsg->leaderSnodeId = atomic_load_32(&pStream->mainSnodeId);
  pMsg->streamName = pInfo->streamName;

  if (0 == pInfo->runnerNum) {
    mstsDebug("no runner task, skip set trigger's runner list, deployNum:%d", pInfo->runnerDeploys);
    return code;
  }

  TAOS_CHECK_EXIT(msmBuildTriggerRunnerTargets(pMnode, pInfo, streamId, &pMsg->runnerList));

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  } else {
    mstsDebug("trigger deploy info built, readerNum:%d, runnerNum:%d", (int32_t)taosArrayGetSize(pMsg->readerList), (int32_t)taosArrayGetSize(pMsg->runnerList));
  }

  // Propagate the failure instead of unconditionally reporting success.
  return code;
}
904

905

906
/*
 * Fill the runner section of a task deploy message.
 *
 * `plan` is stored by reference (neither cloned nor serialised here). Output
 * table description and expressions come from pInfo->pCreate; scalar options
 * come from pStream->pCreate. Always returns TSDB_CODE_SUCCESS as written.
 */
int32_t msmBuildRunnerDeployInfo(SStmTaskDeploy* pDeploy, SSubplan *plan, SStreamObj* pStream, SStmStatus* pInfo, bool topPlan) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;
  SStreamRunnerDeployMsg* pRunner = &pDeploy->msg.runner;

  // Execution plan and placement.
  pRunner->pPlan = plan;
  pRunner->topPlan = topPlan;
  pRunner->execReplica = pInfo->runnerReplica;
  pRunner->streamName = pInfo->streamName;

  // Output table description.
  pRunner->outDBFName = pInfo->pCreate->outDB;
  pRunner->outTblName = pInfo->pCreate->outTblName;
  pRunner->outTblType = pStream->pCreate->outTblType;
  pRunner->outCols = pInfo->pCreate->outCols;
  pRunner->outTags = pInfo->pCreate->outTags;
  pRunner->outStbUid = pStream->pCreate->outStbUid;
  pRunner->outStbSversion = pStream->pCreate->outStbSversion;

  // Notification settings.
  pRunner->calcNotifyOnly = pStream->pCreate->calcNotifyOnly;
  pRunner->pNotifyAddrUrls = pInfo->pCreate->pNotifyAddrUrls;
  pRunner->notifyErrorHandle = pStream->pCreate->notifyErrorHandle;

  // Expressions used to derive sub-table names, tag values and forced output columns.
  pRunner->subTblNameExpr = pInfo->pCreate->subTblNameExpr;
  pRunner->tagValueExpr = pInfo->pCreate->tagValueExpr;
  pRunner->forceOutCols = pInfo->pCreate->forceOutCols;

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
941

942

943
/*
 * Register reader task(s) through msmSTAddToVgroupMapImpl().
 *
 * When pTask is non-NULL only that single task is registered and pTasks is
 * ignored; otherwise every element of pTasks is registered in order.
 * Stops at, and returns, the first error.
 */
static int32_t msmSTAddToVgroupMap(SStmGrpCtx* pCtx, int64_t streamId, SArray* pTasks, SStmTaskStatus* pTask, bool trigReader) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (pTask) {
    TAOS_CHECK_EXIT(msmSTAddToVgroupMapImpl(streamId, pTask, trigReader));
  } else {
    int32_t total = taosArrayGetSize(pTasks);
    for (int32_t idx = 0; idx < total; ++idx) {
      SStmTaskStatus* pEntry = taosArrayGet(pTasks, idx);
      TAOS_CHECK_EXIT(msmSTAddToVgroupMapImpl(streamId, pEntry, trigReader));
    }
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
961

962

963
/*
 * Register snode-hosted task(s) through msmSTAddToSnodeMapImpl().
 *
 * taskNum > 0 : pTask points at a contiguous run of taskNum task statuses.
 * taskNum <= 0: every element of the pTasks array is registered instead.
 * deployId is forwarded unchanged to the Impl function.
 * Stops at, and returns, the first error.
 */
static int32_t msmSTAddToSnodeMap(SStmGrpCtx* pCtx, int64_t streamId, SArray* pTasks, SStmTaskStatus* pTask, int32_t taskNum, int32_t deployId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (taskNum > 0) {
    SStmTaskStatus* pEnd = pTask + taskNum;
    for (SStmTaskStatus* pCur = pTask; pCur < pEnd; ++pCur) {
      TAOS_CHECK_EXIT(msmSTAddToSnodeMapImpl(streamId, pCur, deployId));
    }
  } else {
    int32_t total = taosArrayGetSize(pTasks);
    for (int32_t idx = 0; idx < total; ++idx) {
      SStmTaskStatus* pEntry = taosArrayGet(pTasks, idx);
      TAOS_CHECK_EXIT(msmSTAddToSnodeMapImpl(streamId, pEntry, deployId));
    }
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
982

983
int64_t msmAssignTaskId(void) {
1,533✔
984
  return atomic_fetch_add_64(&mStreamMgmt.lastTaskId, 1);
1,533✔
985
}
986

987
/*
 * Produce a task "serious id": the value returned by taosGetTimestampNs()
 * at the time of the call.
 */
int64_t msmAssignTaskSeriousId(void) {
  int64_t seriousId = taosGetTimestampNs();
  return seriousId;
}
990

991

992
/*
 * Report whether snode `snodeId` is considered alive.
 *
 * An snode counts as alive when its recorded runnerThreadNum is >= 0. If the
 * snode is missing from snodeMap, the map is refreshed once through
 * msmSTAddSnodesToMap(); a second miss fails with
 * TSDB_CODE_MND_STREAM_INTERNAL_ERROR.
 *
 * *alive is written only on success. Returns TSDB_CODE_SUCCESS or the error.
 */
int32_t msmIsSnodeAlive(SMnode* pMnode, int32_t snodeId, int64_t streamId, bool* alive) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  bool     noExists = false;
  SStmSnodeStatus* pStatus = NULL;

  while (true) {
    pStatus = taosHashGet(mStreamMgmt.snodeMap, &snodeId, sizeof(snodeId));
    if (NULL == pStatus) {
      if (noExists) {
        mstsError("snode %d not exists in snodeMap", snodeId);
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }

      noExists = true;
      TAOS_CHECK_EXIT(msmSTAddSnodesToMap(pMnode));

      continue;
    }

    // runnerThreadNum is written with atomic_store_32() by the heartbeat path,
    // so pair it with an atomic load instead of a plain read.
    *alive = (atomic_load_32(&pStatus->runnerThreadNum) >= 0);
    break;
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1024

1025
/*
 * Pick the snode that should host the stream's statically placed tasks.
 *
 * The stream's main snode is preferred; when it is not alive, the replica
 * configured on the main snode object is tried next.
 *
 * Returns the chosen snode id, or 0 when neither candidate is usable (main
 * snode object gone, no replica configured, replica not alive, or the
 * liveness check itself failed).
 */
int32_t msmRetrieveStaticSnodeId(SMnode* pMnode, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  bool    alive = false;
  int32_t mainSnodeId = atomic_load_32(&pStream->mainSnodeId);
  int32_t candidate = mainSnodeId;
  int64_t streamId = pStream->pCreate->streamId;

  for (;;) {
    TAOS_CHECK_EXIT(msmIsSnodeAlive(pMnode, candidate, streamId, &alive));

    if (alive) {
      return candidate;
    }

    // A dead candidate other than the main snode means the replica was already tried.
    if (candidate != mainSnodeId) {
      mstsError("no available snode now, mainSnodeId:%d, followerSnodeId:%d", mainSnodeId, candidate);
      return 0;
    }

    SSnodeObj* pMain = mndAcquireSnode(pMnode, candidate);
    if (NULL == pMain) {
      stsWarn("snode %d not longer exists, ignore assign snode", candidate);
      return 0;
    }

    int32_t replicaId = pMain->replicaId;
    if (replicaId <= 0) {
      mstsError("no available snode now, mainSnodeId:%d, replicaId:%d", mainSnodeId, replicaId);
      mndReleaseSnode(pMnode, pMain);
      return 0;
    }

    // Fall back to the replica and re-run the liveness check.
    candidate = replicaId;
    mndReleaseSnode(pMnode, pMain);
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return 0;
}
1071

1072
/*
 * Choose an alive snode pseudo-randomly.
 *
 * A target index in [0, snodeNum) is drawn with taosRand(). The SDB_SNODE
 * table is then iterated, counting only alive snodes, until the counter
 * reaches the target. When the iterator is exhausted after at least one alive
 * snode was seen in that pass, iteration starts over (the counter keeps its
 * value); a pass that sees no alive snode ends the search.
 *
 * Returns the selected snode id, or 0 with terrno set to
 * TSDB_CODE_SNODE_NO_AVAILABLE_NODE when nothing could be selected.
 */
int32_t msmAssignRandomSnodeId(SMnode* pMnode, int64_t streamId) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  int32_t    aliveSeen = 0;
  int32_t    snodeId = 0;
  int32_t    snodeTarget = 0;
  void      *pIter = NULL;
  SSnodeObj *pObj = NULL;
  bool       alive = false;
  int32_t    snodeNum = sdbGetSize(pMnode->pSdb, SDB_SNODE);

  if (snodeNum <= 0) {
    mstsInfo("no available snode now, num:%d", snodeNum);
    goto _exit;
  }

  snodeTarget = taosRand() % snodeNum;

  for (;;) {
    pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj);
    if (pIter == NULL) {
      // End of a pass: give up if this pass found no alive snode, else wrap around.
      if (0 == snodeId) {
        mstsError("no alive snode now, snodeNum:%d", snodeNum);
        break;
      }

      snodeId = 0;
      continue;
    }

    code = msmIsSnodeAlive(pMnode, pObj->id, streamId, &alive);
    if (code) {
      sdbRelease(pMnode->pSdb, pObj);
      sdbCancelFetch(pMnode->pSdb, pIter);
      pObj = NULL;
      TAOS_CHECK_EXIT(code);
    }

    if (!alive) {
      sdbRelease(pMnode->pSdb, pObj);
      continue;
    }

    snodeId = pObj->id;

    bool reachedTarget = (aliveSeen == snodeTarget);
    sdbRelease(pMnode->pSdb, pObj);

    if (reachedTarget) {
      // Stop early: the iterator must be cancelled because it was not run to the end.
      sdbCancelFetch(pMnode->pSdb, pIter);
      pObj = NULL;
      break;
    }

    aliveSeen++;
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  if (0 == snodeId) {
    terrno = TSDB_CODE_SNODE_NO_AVAILABLE_NODE;
  }

  return snodeId;
}
1137

1138
/*
 * Select the snode a task of pStream should run on.
 *
 * isStatic == true  -> msmRetrieveStaticSnodeId() (main snode or its replica)
 * isStatic == false -> msmAssignRandomSnodeId()
 *
 * Returns the snode id, or 0 with terrno set to
 * TSDB_CODE_SNODE_NO_AVAILABLE_NODE when there is no snode to use.
 */
int32_t msmAssignTaskSnodeId(SMnode* pMnode, SStreamObj* pStream, bool isStatic) {
  int64_t streamId = pStream->pCreate->streamId;
  int32_t snodeNum = sdbGetSize(pMnode->pSdb, SDB_SNODE);
  int32_t chosen = 0;

  if (snodeNum > 0) {
    if (isStatic) {
      chosen = msmRetrieveStaticSnodeId(pMnode, pStream);
    } else {
      chosen = msmAssignRandomSnodeId(pMnode, streamId);
    }
  } else {
    mstsInfo("no available snode now, num:%d", snodeNum);
  }

  if (0 == chosen) {
    terrno = TSDB_CODE_SNODE_NO_AVAILABLE_NODE;
  }

  return chosen;
}
1157

1158

1159
/*
 * Create the stream's single trigger task and queue it for deployment.
 *
 * Steps: allocate pInfo->triggerTask and fill its identity from pCtx, build
 * the trigger deploy message, add it to the to-deploy snode map (bumping
 * toDeploySnodeTaskNum), then register the task in the task map and the snode
 * map. Returns the first error encountered, or TSDB_CODE_SUCCESS.
 */
static int32_t msmBuildTriggerTasks(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  int64_t        streamId = pStream->pCreate->streamId;
  SStmTaskDeploy deploy = {0};

  SStmTaskStatus* pTrig = taosMemoryCalloc(1, sizeof(SStmTaskStatus));
  pInfo->triggerTask = pTrig;
  TSDB_CHECK_NULL(pTrig, code, lino, _exit, terrno);

  // Task id and node id were decided earlier and are carried in the group context.
  pTrig->id.taskId = pCtx->triggerTaskId;
  pTrig->id.deployId = 0;
  pTrig->id.seriousId = msmAssignTaskSeriousId();
  pTrig->id.nodeId = pCtx->triggerNodeId;
  pTrig->id.taskIdx = 0;
  pTrig->type = STREAM_TRIGGER_TASK;
  pTrig->lastUpTs = pCtx->currTs;
  pTrig->pStream = pInfo;

  // Mirror the identity into the deploy descriptor.
  deploy.task.type = pTrig->type;
  deploy.task.streamId = streamId;
  deploy.task.taskId = pTrig->id.taskId;
  deploy.task.seriousId = pTrig->id.seriousId;
  deploy.task.nodeId = pTrig->id.nodeId;
  deploy.task.taskIdx = pTrig->id.taskIdx;

  TAOS_CHECK_EXIT(msmBuildTriggerDeployInfo(pCtx->pMnode, pInfo, &deploy, pStream));
  TAOS_CHECK_EXIT(msmTDAddTriggerToSnodeMap(&deploy, pStream));

  atomic_add_fetch_32(&mStreamMgmt.toDeploySnodeTaskNum, 1);

  TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, NULL, pInfo->triggerTask));
  // deployId -1 is passed for the trigger task.
  TAOS_CHECK_EXIT(msmSTAddToSnodeMap(pCtx, streamId, NULL, pInfo->triggerTask, 1, -1));

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1199

1200
/*
 * Initialise one trigger-reader task status in *pState for vgroup `nodeId`
 * and queue its deploy descriptor in mStreamMgmt.toDeployVgMap.
 *
 * The task gets a fresh task id and serious id, type STREAM_READER_TASK,
 * flag STREAM_FLAG_TRIGGER_READER and status STREAM_STATUS_UNDEPLOYED.
 * Returns the first error encountered, or TSDB_CODE_SUCCESS.
 */
static int32_t msmTDAddSingleTrigReader(SStmGrpCtx* pCtx, SStmTaskStatus* pState, int32_t nodeId, SStmStatus* pInfo, SStreamObj* pStream, int64_t streamId) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SStmTaskDeploy deploy = {0};

  // Runtime status kept by the mnode.
  pState->id.taskId = msmAssignTaskId();
  pState->id.deployId = 0;
  pState->id.seriousId = msmAssignTaskSeriousId();
  pState->id.nodeId = nodeId;
  pState->id.taskIdx = 0;
  pState->type = STREAM_READER_TASK;
  pState->flags = STREAM_FLAG_TRIGGER_READER;
  pState->status = STREAM_STATUS_UNDEPLOYED;
  pState->lastUpTs = pCtx->currTs;
  pState->pStream = pInfo;

  // Deploy descriptor carrying the same identity.
  deploy.task.streamId = streamId;
  deploy.task.type = pState->type;
  deploy.task.flags = pState->flags;
  deploy.task.taskId = pState->id.taskId;
  deploy.task.seriousId = pState->id.seriousId;
  deploy.task.nodeId = pState->id.nodeId;
  deploy.task.taskIdx = pState->id.taskIdx;

  TAOS_CHECK_EXIT(msmBuildReaderDeployInfo(&deploy, NULL, pInfo, true));
  TAOS_CHECK_EXIT(msmTDAddToVgroupMap(mStreamMgmt.toDeployVgMap, &deploy, streamId));

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1234

1235
/*
 * Create the trigger-reader tasks of a stream according to the type of its
 * trigger table.
 *
 *   normal / child / virtual tables : exactly one reader, on triggerTblVgId
 *   super table                     : one reader per non-TSMA vgroup of the
 *                                     trigger database
 *   anything else                   : no reader (debug log only)
 *
 * The readers are stored in pInfo->trigReaders. Returns the first error
 * encountered, or TSDB_CODE_SUCCESS.
 */
static int32_t msmTDAddTrigReaderTasks(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;
  SSdb   *pSdb = pCtx->pMnode->pSdb;
  SStmTaskStatus* pState = NULL;
  SVgObj *pVgroup = NULL;
  SDbObj* pDb = NULL;

  switch (pStream->pCreate->triggerTblType) {
    case TSDB_NORMAL_TABLE:
    case TSDB_CHILD_TABLE:
    case TSDB_VIRTUAL_CHILD_TABLE:
    case TSDB_VIRTUAL_NORMAL_TABLE: {
      pInfo->trigReaders = taosArrayInit_s(sizeof(SStmTaskStatus), 1);
      TSDB_CHECK_NULL(pInfo->trigReaders, code, lino, _exit, terrno);
      pState = taosArrayGet(pInfo->trigReaders, 0);
      TSDB_CHECK_NULL(pState, code, lino, _exit, terrno);

      TAOS_CHECK_EXIT(msmTDAddSingleTrigReader(pCtx, pState, pStream->pCreate->triggerTblVgId, pInfo, pStream, streamId));
      break;
    }
    case TSDB_SUPER_TABLE: {
      pDb = mndAcquireDb(pCtx->pMnode, pStream->pCreate->triggerDB);
      if (NULL == pDb) {
        code = terrno;
        mstsError("failed to acquire db %s, error:%s", pStream->pCreate->triggerDB, terrstr());
        goto _exit;
      }

      pInfo->trigReaders = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SStmTaskStatus));
      TSDB_CHECK_NULL(pInfo->trigReaders, code, lino, _exit, terrno);

      void *pIter = NULL;
      while (1) {
        pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
        if (pIter == NULL) {
          break;
        }

        if (pVgroup->dbUid == pDb->uid && !pVgroup->isTsma) {
          pState = taosArrayReserve(pInfo->trigReaders, 1);
          if (NULL == pState) {
            // Capture terrno before the sdb calls below get a chance to change it.
            code = terrno;
            lino = __LINE__;
          } else {
            code = msmTDAddSingleTrigReader(pCtx, pState, pVgroup->vgId, pInfo, pStream, streamId);
          }

          if (code) {
            // Leaving the loop early: drop the row reference and cancel the iterator.
            sdbRelease(pSdb, pVgroup);
            sdbCancelFetch(pSdb, pIter);
            pVgroup = NULL;
            TAOS_CHECK_EXIT(code);
          }
        }

        sdbRelease(pSdb, pVgroup);
      }
      break;
    }
    default:
      mstsDebug("%s ignore triggerTblType %d", __FUNCTION__, pStream->pCreate->triggerTblType);
      break;
  }

_exit:

  // pDb is NULL unless the super-table branch acquired it.
  mndReleaseDb(pCtx->pMnode, pDb);

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1306

1307
/*
 * Record the address of a calc-reader task so that runner plans consuming its
 * subplan can later be pointed at it.
 *
 * scanPlan is parsed to obtain its groupId / subplanId. The address (epset of
 * the mnode when vgId == MNODE_HANDLE, of the vgroup when vgId is larger) is
 * appended to the list stored in mStreamMgmt.toUpdateScanMap under the key
 * {streamId, subplanId}; the list is created on first use. toUpdateScanNum is
 * incremented on success.
 *
 * Returns TSDB_CODE_SUCCESS or the error code of the failing step.
 */
int32_t msmUPAddScanTask(SStmGrpCtx* pCtx, SStreamObj* pStream, char* scanPlan, int32_t vgId, int64_t taskId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SSubplan* pSubplan = NULL;
  SArray* pNewList = NULL;  // non-NULL only while a freshly created list is still owned here
  int64_t streamId = pStream->pCreate->streamId;
  int64_t key[2] = {streamId, 0};
  SStmTaskSrcAddr addr = {0};  // zeroed: the struct is byte-copied into the list
  TAOS_CHECK_EXIT(nodesStringToNode(scanPlan, (SNode**)&pSubplan));
  addr.isFromCache = false;

  if (MNODE_HANDLE == vgId) {
    mndGetMnodeEpSet(pCtx->pMnode, &addr.epset);
  } else if (vgId > MNODE_HANDLE) {
    addr.epset = mndGetVgroupEpsetById(pCtx->pMnode, vgId);
  } else {
    mstsError("invalid vgId %d in scanPlan", vgId);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  addr.taskId = taskId;
  addr.vgId = vgId;
  addr.groupId = pSubplan->id.groupId;

  key[1] = pSubplan->id.subplanId;

  SArray** ppRes = taosHashGet(mStreamMgmt.toUpdateScanMap, key, sizeof(key));
  if (NULL == ppRes) {
    pNewList = taosArrayInit(1, sizeof(addr));
    TSDB_CHECK_NULL(pNewList, code, lino, _exit, terrno);
    TSDB_CHECK_NULL(taosArrayPush(pNewList, &addr), code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(taosHashPut(mStreamMgmt.toUpdateScanMap, key, sizeof(key), &pNewList, POINTER_BYTES));
    pNewList = NULL;  // ownership moved into the map
  } else {
    TSDB_CHECK_NULL(taosArrayPush(*ppRes, &addr), code, lino, _exit, terrno);
  }

  mstsDebug("calcReader %" PRIx64 " added to toUpdateScan, vgId:%d, groupId:%d, subplanId:%d", taskId, vgId, pSubplan->id.groupId, pSubplan->id.subplanId);

  atomic_add_fetch_32(&mStreamMgmt.toUpdateScanNum, 1);

_exit:

  // Frees the list only when it was created above but never reached the map.
  taosArrayDestroy(pNewList);
  nodesDestroyNode((SNode*)pSubplan);

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1356

1357
/*
 * Record the trigger task as the data source of a calc scan that reads from
 * the trigger's cache (pScan->readFromCache).
 *
 * pScan->scanPlan is parsed to obtain its groupId / subplanId. The address
 * (trigger task id, trigger node id and that node's epset) is appended to the
 * list stored in mStreamMgmt.toUpdateScanMap under {streamId, subplanId}; the
 * list is created on first use. toUpdateScanNum is incremented on success.
 *
 * Returns TSDB_CODE_SUCCESS or the error code of the failing step.
 */
int32_t msmUPAddCacheTask(SStmGrpCtx* pCtx, SStreamCalcScan* pScan, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SSubplan* pSubplan = NULL;
  SArray* pNewList = NULL;  // non-NULL only while a freshly created list is still owned here
  int64_t streamId = pStream->pCreate->streamId;
  int64_t key[2] = {streamId, 0};
  TAOS_CHECK_EXIT(nodesStringToNode(pScan->scanPlan, (SNode**)&pSubplan));

  SStmTaskSrcAddr addr = {0};  // zeroed: the struct is byte-copied into the list
  addr.isFromCache = true;
  addr.epset = mndGetDnodeEpsetById(pCtx->pMnode, pCtx->triggerNodeId);
  addr.taskId = pCtx->triggerTaskId;
  addr.vgId = pCtx->triggerNodeId;
  addr.groupId = pSubplan->id.groupId;

  key[1] = pSubplan->id.subplanId;
  SArray** ppRes = taosHashGet(mStreamMgmt.toUpdateScanMap, key, sizeof(key));
  if (NULL == ppRes) {
    pNewList = taosArrayInit(1, sizeof(addr));
    TSDB_CHECK_NULL(pNewList, code, lino, _exit, terrno);
    TSDB_CHECK_NULL(taosArrayPush(pNewList, &addr), code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(taosHashPut(mStreamMgmt.toUpdateScanMap, key, sizeof(key), &pNewList, POINTER_BYTES));
    pNewList = NULL;  // ownership moved into the map
  } else {
    TSDB_CHECK_NULL(taosArrayPush(*ppRes, &addr), code, lino, _exit, terrno);
  }

  atomic_add_fetch_32(&mStreamMgmt.toUpdateScanNum, 1);

_exit:

  // Frees the list only when it was created above but never reached the map.
  taosArrayDestroy(pNewList);
  nodesDestroyNode((SNode*)pSubplan);

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1395

1396

1397
/*
 * Create the calc-reader tasks of a stream.
 *
 * For every calc scan plan:
 *   - readFromCache: no reader task is created; the trigger task is recorded
 *     as the source through msmUPAddCacheTask().
 *   - otherwise: one reader task is created per vgroup in pScan->vgList,
 *     appended to pInfo->calcReaders, recorded as a scan source and queued in
 *     mStreamMgmt.toDeployVgMap.
 *
 * Returns the first error encountered, or TSDB_CODE_SUCCESS.
 */
static int32_t msmTDAddCalcReaderTasks(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t calcTasksNum = taosArrayGetSize(pStream->pCreate->calcScanPlanList);
  int64_t streamId = pStream->pCreate->streamId;
  SStmTaskStatus* pState = NULL;
  pInfo->calcReaders = taosArrayInit(calcTasksNum, sizeof(SStmTaskStatus));
  TSDB_CHECK_NULL(pInfo->calcReaders, code, lino, _exit, terrno);

  for (int32_t i = 0; i < calcTasksNum; ++i) {
    SStreamCalcScan* pScan = taosArrayGet(pInfo->pCreate->calcScanPlanList, i);
    TSDB_CHECK_NULL(pScan, code, lino, _exit, terrno);

    if (pScan->readFromCache) {
      TAOS_CHECK_EXIT(msmUPAddCacheTask(pCtx, pScan, pStream));
      continue;
    }

    int32_t vgNum = taosArrayGetSize(pScan->vgList);
    for (int32_t m = 0; m < vgNum; ++m) {
      // Resolve the vgroup id first so a failure does not leave a half-filled slot behind.
      int32_t* pVgId = taosArrayGet(pScan->vgList, m);
      TSDB_CHECK_NULL(pVgId, code, lino, _exit, terrno);

      pState = taosArrayReserve(pInfo->calcReaders, 1);
      TSDB_CHECK_NULL(pState, code, lino, _exit, terrno);

      pState->id.taskId = msmAssignTaskId();
      pState->id.deployId = 0;
      pState->id.seriousId = msmAssignTaskSeriousId();
      pState->id.nodeId = *pVgId;
      pState->id.taskIdx = i;
      pState->type = STREAM_READER_TASK;
      pState->flags = 0;
      pState->status = STREAM_STATUS_UNDEPLOYED;
      pState->lastUpTs = pCtx->currTs;
      pState->pStream = pInfo;

      SStmTaskDeploy info = {0};
      info.task.type = pState->type;
      info.task.streamId = streamId;
      info.task.taskId = pState->id.taskId;
      info.task.flags = pState->flags;
      info.task.seriousId = pState->id.seriousId;
      info.task.nodeId = pState->id.nodeId;
      info.task.taskIdx = pState->id.taskIdx;
      TAOS_CHECK_EXIT(msmBuildReaderDeployInfo(&info, pScan->scanPlan, pInfo, false));
      TAOS_CHECK_EXIT(msmUPAddScanTask(pCtx, pStream, pScan->scanPlan, pState->id.nodeId, pState->id.taskId));
      TAOS_CHECK_EXIT(msmTDAddToVgroupMap(mStreamMgmt.toDeployVgMap, &info, streamId));
    }
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1450

1451

1452

1453
/*
 * Re-register the scan sources of an existing stream without creating new
 * reader tasks.
 *
 * Walks the calc scan plans in order. Cache-backed scans are recorded through
 * msmUPAddCacheTask(); for every vgroup of a normal scan the next entry of
 * pInfo->calcReaders is recorded through msmUPAddScanTask(). The existing
 * readers are therefore consumed in the same order in which
 * msmTDAddCalcReaderTasks() appends them.
 *
 * Returns TSDB_CODE_MND_STREAM_INTERNAL_ERROR when calcReaders holds fewer
 * entries than the plans require, the first other error encountered, or
 * TSDB_CODE_SUCCESS.
 */
static int32_t msmUPPrepareReaderTasks(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;
  int32_t calcTasksNum = taosArrayGetSize(pStream->pCreate->calcScanPlanList);
  if (calcTasksNum <= 0) {
    mstsDebug("no calc scan plan, ignore parepare reader tasks, readerNum:%d", (int32_t)taosArrayGetSize(pInfo->calcReaders));
    return code;
  }

  int32_t readerNum = taosArrayGetSize(pInfo->calcReaders);
  int32_t readerIdx = 0;

  for (int32_t i = 0; i < calcTasksNum; ++i) {
    SStreamCalcScan* pScan = taosArrayGet(pStream->pCreate->calcScanPlanList, i);
    if (pScan->readFromCache) {
      TAOS_CHECK_EXIT(msmUPAddCacheTask(pCtx, pScan, pStream));
      continue;
    }

    int32_t vgNum = taosArrayGetSize(pScan->vgList);
    for (int32_t m = 0; m < vgNum; ++m) {
      // Never walk past the end of calcReaders, even if plans and readers disagree.
      if (readerIdx >= readerNum) {
        mstsError("calc reader index %d out of range, readerNum:%d", readerIdx, readerNum);
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }

      SStmTaskStatus* pReader = taosArrayGet(pInfo->calcReaders, readerIdx);
      TSDB_CHECK_NULL(pReader, code, lino, _exit, terrno);

      TAOS_CHECK_EXIT(msmUPAddScanTask(pCtx, pStream, pScan->scanPlan, pReader->id.nodeId, pReader->id.taskId));
      readerIdx++;
    }
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1487

1488
/**
 * Builds the reader side of a stream deployment.
 *
 * Runs, in this order and stopping at the first failure:
 *   1. msmTDAddTrigReaderTasks() and msmTDAddCalcReaderTasks(),
 *   2. msmSTAddToTaskMap() for pInfo->trigReaders and pInfo->calcReaders,
 *   3. msmSTAddToVgroupMap() for both arrays (last argument true for the trigger
 *      readers, false for the calc readers).
 *
 * @return TSDB_CODE_SUCCESS or the error code of the failing step.
 */
static int32_t msmBuildReaderTasks(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;

  // step 1: add the reader tasks
  TAOS_CHECK_EXIT(msmTDAddTrigReaderTasks(pCtx, pInfo, pStream));
  TAOS_CHECK_EXIT(msmTDAddCalcReaderTasks(pCtx, pInfo, pStream));

  // step 2: task map registration
  TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, pInfo->trigReaders, NULL));
  TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, pInfo->calcReaders, NULL));

  // step 3: vgroup map registration
  TAOS_CHECK_EXIT(msmSTAddToVgroupMap(pCtx, streamId, pInfo->trigReaders, NULL, true));
  TAOS_CHECK_EXIT(msmSTAddToVgroupMap(pCtx, streamId, pInfo->calcReaders, NULL, false));

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1510

1511
/**
 * Attaches one downstream source address to a subplan.
 *
 * Fills an SDownstreamSourceNode from pSrc (task id, vgId as node id, epset) plus the
 * caller-supplied clientId and fetch message type, then passes it to
 * qSetSubplanExecutionNode() for group pSrc->groupId of `plan`.
 *
 * @param pTask        task used by the msttDebug() log macro
 * @param streamId     stream id used by the log macro
 * @param plan         subplan to update
 * @param clientId     value stored in source.clientId
 * @param pSrc         source task address
 * @param msgType      value stored in source.fetchMsgType
 * @param srcSubplanId only used for logging
 * @return result of qSetSubplanExecutionNode()
 */
int32_t msmUpdatePlanSourceAddr(SStreamTask* pTask, int64_t streamId, SSubplan* plan, int64_t clientId, SStmTaskSrcAddr* pSrc, int32_t msgType, int64_t srcSubplanId) {
  // zero-init leaves sId and execId at 0, as required
  SDownstreamSourceNode source = {0};

  source.type = QUERY_NODE_DOWNSTREAM_SOURCE;
  source.clientId = clientId;
  source.taskId = pSrc->taskId;
  source.fetchMsgType = msgType;
  source.localExec = false;
  source.addr.nodeId = pSrc->vgId;
  source.addr.epSet = pSrc->epset;

  msttDebug("try to update subplan %d grp %d sourceAddr from subplan %" PRId64 ", clientId:%" PRIx64 ", srcTaskId:%" PRIx64 ", srcNodeId:%d, msgType:%s",
      plan->id.subplanId, pSrc->groupId, srcSubplanId, source.clientId, source.taskId, source.addr.nodeId, TMSG_INFO(source.fetchMsgType));

  return qSetSubplanExecutionNode(plan, pSrc->groupId, &source);
}
1530

1531
/**
 * Looks up the runner deploy entry whose plan has the given subplan id.
 *
 * Scans pRunners (array of SStmTaskDeploy) from beginIdx to the end. On the first match
 * *taskId receives the entry's task id and *ppParent a pointer to its task.
 *
 * @return TSDB_CODE_SUCCESS on a match, TSDB_CODE_MND_STREAM_INTERNAL_ERROR otherwise
 *         (outputs are left untouched in that case).
 */
int32_t msmGetTaskIdFromSubplanId(SStreamObj* pStream, SArray* pRunners, int32_t beginIdx, int32_t subplanId, int64_t* taskId, SStreamTask** ppParent) {
  int64_t streamId = pStream->pCreate->streamId;
  int32_t total = taosArrayGetSize(pRunners);

  for (int32_t idx = beginIdx; idx < total; ++idx) {
    SStmTaskDeploy* pCandidate = taosArrayGet(pRunners, idx);
    SSubplan*       pCandPlan = pCandidate->msg.runner.pPlan;
    if (pCandPlan->id.subplanId != subplanId) {
      continue;
    }

    *taskId = pCandidate->task.taskId;
    *ppParent = &pCandidate->task;
    return TSDB_CODE_SUCCESS;
  }

  mstsError("subplanId %d not found in runner list", subplanId);

  return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
}
1548

1549
/**
 * Resolves the value-node children of a runner subplan into source addresses.
 *
 * Every BIGINT SValueNode in pPlan->pChildren is mapped through MND_GET_RUNNER_SUBPLANID()
 * to a child subplan id; the pair {streamId, childSubplanId} is looked up in
 * mStreamMgmt.toUpdateScanMap and each SStmTaskSrcAddr found there is applied to pPlan via
 * msmUpdatePlanSourceAddr(). Children that are not value nodes, or value nodes of another
 * data type, are skipped with a log message.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_MND_STREAM_INTERNAL_ERROR when a child id has no
 *         entry in toUpdateScanMap, or the error from msmUpdatePlanSourceAddr().
 */
int32_t msmUpdateLowestPlanSourceAddr(SSubplan* pPlan, SStmTaskDeploy* pDeploy, int64_t streamId) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SStreamTask* pTask = &pDeploy->task;
  SNode*       pNode = NULL;
  int64_t      key[2] = {streamId, -1};  // key[1] is filled per child below

  FOREACH(pNode, pPlan->pChildren) {
    if (QUERY_NODE_VALUE != nodeType(pNode)) {
      msttDebug("node type %d is not valueNode, skip it", nodeType(pNode));
      continue;
    }

    SValueNode* pVal = (SValueNode*)pNode;
    if (TSDB_DATA_TYPE_BIGINT != pVal->node.resType.type) {
      msttWarn("invalid value node data type %d for runner's child subplan", pVal->node.resType.type);
      continue;
    }

    key[1] = MND_GET_RUNNER_SUBPLANID(pVal->datum.i);

    SArray** ppAddrs = taosHashGet(mStreamMgmt.toUpdateScanMap, key, sizeof(key));
    if (NULL == ppAddrs) {
      msttError("lowest runner subplan ID:%d,%d can't get its child ID:%" PRId64 " addr", pPlan->id.groupId, pPlan->id.subplanId, key[1]);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
    }

    int32_t addrNum = taosArrayGetSize(*ppAddrs);
    for (int32_t i = 0; i < addrNum; ++i) {
      SStmTaskSrcAddr* pAddr = taosArrayGet(*ppAddrs, i);
      TAOS_CHECK_EXIT(msmUpdatePlanSourceAddr(pTask, streamId, pPlan, pDeploy->task.taskId, pAddr, pAddr->isFromCache ? TDMT_STREAM_FETCH_FROM_CACHE : TDMT_STREAM_FETCH, key[1]));
    }
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1590

1591
/**
 * Finalizes the plan of one runner deploy entry.
 *
 * 1. msmUpdateLowestPlanSourceAddr() resolves the plan's value-node children.
 * 2. All value nodes are erased from pPlan->pChildren, the list is cleared with
 *    nodesClearList() and the pointer is set to NULL.
 * 3. If the plan has parents, this task's address (task id, node id, group id, dnode
 *    epset) is registered as a TDMT_STREAM_FETCH_FROM_RUNNER source on every parent
 *    subplan; the parent task is searched in pRunners starting at beginIdx.
 *
 * @return TSDB_CODE_SUCCESS or the first error encountered.
 */
int32_t msmUpdateRunnerPlan(SStmGrpCtx* pCtx, SArray* pRunners, int32_t beginIdx, SStmTaskDeploy* pDeploy, SStreamObj* pStream) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  int64_t      streamId = pStream->pCreate->streamId;
  SSubplan*    pPlan = pDeploy->msg.runner.pPlan;
  SStreamTask* pTask = &pDeploy->task;
  SStreamTask* parentTask = NULL;

  TAOS_CHECK_EXIT(msmUpdateLowestPlanSourceAddr(pPlan, pDeploy, streamId));

  // drop the placeholder value nodes, then release the children list itself
  SNode* pTmp = NULL;
  WHERE_EACH(pTmp, pPlan->pChildren) {
    if (QUERY_NODE_VALUE != nodeType(pTmp)) {
      WHERE_NEXT;
      continue;
    }
    ERASE_NODE(pPlan->pChildren);
  }
  nodesClearList(pPlan->pChildren);
  pPlan->pChildren = NULL;

  if (NULL != pPlan->pParents) {
    SNode*          pNode = NULL;
    int64_t         parentTaskId = 0;
    SStmTaskSrcAddr addr = {0};

    addr.taskId = pDeploy->task.taskId;
    addr.vgId = pDeploy->task.nodeId;
    addr.groupId = pPlan->id.groupId;
    addr.epset = mndGetDnodeEpsetById(pCtx->pMnode, pDeploy->task.nodeId);

    FOREACH(pNode, pPlan->pParents) {
      SSubplan* pSubplan = (SSubplan*)pNode;
      TAOS_CHECK_EXIT(msmGetTaskIdFromSubplanId(pStream, pRunners, beginIdx, pSubplan->id.subplanId, &parentTaskId, &parentTask));
      TAOS_CHECK_EXIT(msmUpdatePlanSourceAddr(parentTask, streamId, pSubplan, parentTaskId, &addr, TDMT_STREAM_FETCH_FROM_RUNNER, pPlan->id.subplanId));
    }
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1637

1638
/**
 * Finalizes and serializes the plans of all runner deploy entries.
 *
 * For each SStmTaskDeploy in pRunners, msmUpdateRunnerPlan() is applied and the plan is
 * then converted with nodesNodeToString(); the resulting string pointer is written back
 * into the same msg.runner.pPlan field, so after this call that field holds a char*
 * instead of a plan node.
 *
 * @return TSDB_CODE_SUCCESS or the first error encountered.
 */
int32_t msmUpdateRunnerPlans(SStmGrpCtx* pCtx, SArray* pRunners, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;
  int32_t total = taosArrayGetSize(pRunners);
  int32_t idx = 0;

  while (idx < total) {
    SStmTaskDeploy* pDeploy = taosArrayGet(pRunners, idx);
    SStreamTask*    pTask = &pDeploy->task;

    TAOS_CHECK_EXIT(msmUpdateRunnerPlan(pCtx, pRunners, idx, pDeploy, pStream));
    // pPlan is both the input node and the output string slot
    TAOS_CHECK_EXIT(nodesNodeToString((SNode*)pDeploy->msg.runner.pPlan, false, (char**)&pDeploy->msg.runner.pPlan, NULL));

    msttDebugL("runner updated task plan:%s", (const char*)pDeploy->msg.runner.pPlan);

    ++idx;
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1661

1662
/**
 * Creates the runner task status entries and deploy messages for every runner deploy of
 * a stream.
 *
 * The calc plan DAG is validated first (positive subplan count matching
 * pStream->pCreate->numOfCalcSubplan, at least one level, level 0 being a node list with
 * exactly one subplan). Then, for each of the pInfo->runnerDeploys deploys:
 *   - an snode id is assigned with msmAssignTaskSnodeId(),
 *   - levels are walked from the lowest (last) level up to level 0; for each subplan a
 *     new SStmTaskStatus is appended to pInfo->runners[deployId] and a matching
 *     SStmTaskDeploy is filled in a temporary list,
 *   - the temporary list is passed to msmUpdateRunnerPlans() and
 *     msmTDAddRunnersToSnodeMap(),
 *   - pDag is destroyed and re-created from pStream->pCreate->calcPlan.
 * Finally all runner arrays are added to the task map and the snode map and
 * pInfo->runnerNum is set to the number of tasks of one deploy.
 *
 * Ownership: pDag is destroyed by this function on every path.
 *
 * @return TSDB_CODE_SUCCESS or the first error encountered.
 */
int32_t msmBuildRunnerTasksImpl(SStmGrpCtx* pCtx, SQueryPlan* pDag, SStmStatus* pInfo, SStreamObj* pStream) {
  int32_t code = 0;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;
  SArray* deployTaskList = NULL;
  SArray* deployList = NULL;
  int32_t deployNodeId = 0;
  SStmTaskStatus* pState = NULL;
  int32_t taskIdx = 0;
  SNodeListNode *plans = NULL;
  int32_t        taskNum = 0;
  int32_t        totalTaskNum = 0;

  if (pDag->numOfSubplans <= 0) {
    mstsError("invalid subplan num:%d", pDag->numOfSubplans);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  if (pDag->numOfSubplans != pStream->pCreate->numOfCalcSubplan) {
    mstsError("numOfCalcSubplan %d mismatch with numOfSubplans %d", pStream->pCreate->numOfCalcSubplan, pDag->numOfSubplans);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  int32_t levelNum = (int32_t)LIST_LENGTH(pDag->pSubplans);
  if (levelNum <= 0) {
    mstsError("invalid level num:%d", levelNum);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  int32_t        lowestLevelIdx = levelNum - 1;

  plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, 0);
  if (QUERY_NODE_NODE_LIST != nodeType(plans)) {
    mstsError("invalid level plan, level:0, planNodeType:%d", nodeType(plans));
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  taskNum = (int32_t)LIST_LENGTH(plans->pNodeList);
  if (taskNum != 1) {
    mstsError("invalid level plan number:%d, level:0", taskNum);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  // one zero-initialized deploy slot per subplan; the slots are reused for every deploy
  deployTaskList = taosArrayInit_s(sizeof(SStmTaskDeploy), pDag->numOfSubplans);
  TSDB_CHECK_NULL(deployTaskList, code, lino, _exit, terrno);

  for (int32_t deployId = 0; deployId < pInfo->runnerDeploys; ++deployId) {
    totalTaskNum = 0;

    deployList = pInfo->runners[deployId];
    deployNodeId = msmAssignTaskSnodeId(pCtx->pMnode, pStream, (0 == deployId) ? true : false);
    if (!GOT_SNODE(deployNodeId)) {
      TAOS_CHECK_EXIT(terrno);
    }

    taskIdx = 0;

    for (int32_t i = lowestLevelIdx; i >= 0; --i) {
      plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i);
      if (NULL == plans) {
        mstsError("empty level plan, level:%d", i);
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }

      if (QUERY_NODE_NODE_LIST != nodeType(plans)) {
        mstsError("invalid level plan, level:%d, planNodeType:%d", i, nodeType(plans));
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }

      taskNum = (int32_t)LIST_LENGTH(plans->pNodeList);
      if (taskNum <= 0) {
        mstsError("invalid level plan number:%d, level:%d", taskNum, i);
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }

      // checked before the slots are touched so taskIdx never exceeds the list size
      totalTaskNum += taskNum;
      if (totalTaskNum > pDag->numOfSubplans) {
        mstsError("current totalTaskNum %d is bigger than numOfSubplans %d, level:%d", totalTaskNum, pDag->numOfSubplans, i);
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }

      for (int32_t n = 0; n < taskNum; ++n) {
        SSubplan *plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n);
        if (NULL == plan) {
          mstsError("empty subplan, level:%d, idx:%d", i, n);
          TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
        }

        // the reserve can fail; bail out instead of writing through a NULL pointer
        pState = taosArrayReserve(deployList, 1);
        TSDB_CHECK_NULL(pState, code, lino, _exit, terrno);

        pState->id.taskId = msmAssignTaskId();
        pState->id.deployId = deployId;
        pState->id.seriousId = msmAssignTaskSeriousId();
        pState->id.nodeId = deployNodeId;
        pState->id.taskIdx = MND_SET_RUNNER_TASKIDX(i, n);
        pState->type = STREAM_RUNNER_TASK;
        pState->flags = (0 == i) ? STREAM_FLAG_TOP_RUNNER : 0;  // only level 0 is the top runner
        pState->status = STREAM_STATUS_UNDEPLOYED;
        pState->lastUpTs = pCtx->currTs;
        pState->pStream = pInfo;

        SStmTaskDeploy* pDeploy = taosArrayGet(deployTaskList, taskIdx++);
        TSDB_CHECK_NULL(pDeploy, code, lino, _exit, TSDB_CODE_MND_STREAM_INTERNAL_ERROR);

        pDeploy->task.type = pState->type;
        pDeploy->task.streamId = streamId;
        pDeploy->task.taskId = pState->id.taskId;
        pDeploy->task.flags = pState->flags;
        pDeploy->task.seriousId = pState->id.seriousId;
        pDeploy->task.deployId = pState->id.deployId;
        pDeploy->task.nodeId = pState->id.nodeId;
        pDeploy->task.taskIdx = pState->id.taskIdx;
        TAOS_CHECK_EXIT(msmBuildRunnerDeployInfo(pDeploy, plan, pStream, pInfo, 0 == i));

        SStreamTask* pTask = &pDeploy->task;
        msttDebug("runner task deploy built, subplan level:%d, taskIdx:%d, groupId:%d, subplanId:%d",
            i, pTask->taskIdx, plan->id.groupId, plan->id.subplanId);
      }

      mstsDebug("deploy %d level %d initialized, taskNum:%d", deployId, i, taskNum);
    }

    if (totalTaskNum != pDag->numOfSubplans) {
      mstsError("totalTaskNum %d mis-match with numOfSubplans %d", totalTaskNum, pDag->numOfSubplans);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
    }

    TAOS_CHECK_EXIT(msmUpdateRunnerPlans(pCtx, deployTaskList, pStream));

    TAOS_CHECK_EXIT(msmTDAddRunnersToSnodeMap(deployTaskList, pStream));

    // the DAG was modified while building this deploy; rebuild a fresh one from the
    // serialized calc plan for the next deploy
    nodesDestroyNode((SNode *)pDag);
    pDag = NULL;

    TAOS_CHECK_EXIT(nodesStringToNode(pStream->pCreate->calcPlan, (SNode**)&pDag));

    mstsDebug("total %d runner tasks added for deploy %d", totalTaskNum, deployId);
  }

  for (int32_t i = 0; i < pInfo->runnerDeploys; ++i) {
    TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, pInfo->runners[i], NULL));
    TAOS_CHECK_EXIT(msmSTAddToSnodeMap(pCtx, streamId, pInfo->runners[i], NULL, 0, i));
  }

  pInfo->runnerNum = totalTaskNum;

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  taosArrayDestroy(deployTaskList);
  nodesDestroyNode((SNode *)pDag);

  return code;
}
1812

1813
/**
 * Re-creates the deploy messages of already existing runner tasks on a newly assigned
 * snode.
 *
 * For each deploy id listed in pAction->deployId[0..deployNum): a new snode id is
 * assigned, the existing SStmTaskStatus entries of pInfo->runners[deployId] are matched
 * one by one (lowest level first) against the subplans of pDag via their taskIdx, their
 * nodeId is updated, and a SStmTaskDeploy is filled for each. The list is then passed to
 * msmUpdateRunnerPlans(), msmTDAddRunnersToSnodeMap() and msmSTAddToSnodeMap(), after
 * which pDag is destroyed and re-created from pStream->pCreate->calcPlan.
 *
 * Ownership: pDag is destroyed by this function on every path.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_STREAM_INTERNAL_ERROR on a runner/plan mismatch,
 *         or the first helper error.
 */
int32_t msmReBuildRunnerTasks(SStmGrpCtx* pCtx, SQueryPlan* pDag, SStmStatus* pInfo, SStreamObj* pStream, SStmTaskAction* pAction) {
  int32_t         code = 0;
  int32_t         lino = 0;
  int64_t         streamId = pStream->pCreate->streamId;
  int32_t         newNodeId = 0;
  int32_t         levelNum = (int32_t)LIST_LENGTH(pDag->pSubplans);
  int32_t         lowestLevelIdx = levelNum - 1;
  SNodeListNode  *plans = NULL;
  int32_t         taskNum = 0;
  int32_t         deployId = 0;
  SArray*         pRunners = NULL;
  SStmTaskStatus* pRunner = NULL;
  int32_t         runnerIdx = 0;
  int32_t         taskIdx = 0;
  SArray*         deployTaskList = taosArrayInit_s(sizeof(SStmTaskDeploy), pDag->numOfSubplans);
  TSDB_CHECK_NULL(deployTaskList, code, lino, _exit, terrno);

  for (int32_t r = 0; r < pAction->deployNum; ++r) {
    deployId = pAction->deployId[r];
    pRunners = pInfo->runners[deployId];
    runnerIdx = 0;

    newNodeId = msmAssignTaskSnodeId(pCtx->pMnode, pStream, (0 == r) ? true : false);
    if (!GOT_SNODE(newNodeId)) {
      TAOS_CHECK_EXIT(terrno);
    }

    taskIdx = 0;

    for (int32_t i = lowestLevelIdx; i >= 0; --i) {
      plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i);
      if (NULL == plans) {
        mstsError("empty level plan, level:%d", i);
        TAOS_CHECK_EXIT(TSDB_CODE_STREAM_INTERNAL_ERROR);
      }

      taskNum = (int32_t)LIST_LENGTH(plans->pNodeList);

      // runners are fetched by index so a short runner list is reported, not overrun
      pRunner = taosArrayGet(pRunners, runnerIdx);
      if (NULL == pRunner) {
        mstsError("runner idx %d out of range in deploy %d, runnerNum:%d", runnerIdx, deployId, (int32_t)taosArrayGetSize(pRunners));
        TAOS_CHECK_EXIT(TSDB_CODE_STREAM_INTERNAL_ERROR);
      }

      // NOTE(review): this masks the flags of the first runner of the level only and
      // uses '&=' (clears every other flag) rather than '|=' - behaviour kept as is,
      // please confirm that this is the intended way to mark a redeployed runner.
      pRunner->flags &= STREAM_FLAG_REDEPLOY_RUNNER;

      for (int32_t n = 0; n < taskNum; ++n) {
        SSubplan *plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n);

        pRunner = taosArrayGet(pRunners, runnerIdx);
        if (NULL == pRunner) {
          mstsError("runner idx %d out of range in deploy %d, runnerNum:%d", runnerIdx, deployId, (int32_t)taosArrayGetSize(pRunners));
          TAOS_CHECK_EXIT(TSDB_CODE_STREAM_INTERNAL_ERROR);
        }

        int32_t newTaskIdx = MND_SET_RUNNER_TASKIDX(i, n);
        if (pRunner->id.taskIdx != newTaskIdx) {
          mstsError("runner TASK:%" PRId64 " taskIdx %d mismatch with newTaskIdx:%d", pRunner->id.taskId, pRunner->id.taskIdx, newTaskIdx);
          TAOS_CHECK_EXIT(TSDB_CODE_STREAM_INTERNAL_ERROR);
        }

        pRunner->id.nodeId = newNodeId;

        SStmTaskDeploy* pDeploy = taosArrayGet(deployTaskList, taskIdx++);
        TSDB_CHECK_NULL(pDeploy, code, lino, _exit, TSDB_CODE_STREAM_INTERNAL_ERROR);

        pDeploy->task.type = pRunner->type;
        pDeploy->task.streamId = streamId;
        pDeploy->task.taskId = pRunner->id.taskId;
        pDeploy->task.flags = pRunner->flags;
        pDeploy->task.seriousId = pRunner->id.seriousId;
        // carried over like in the initial build path; previously left at 0 for every deploy
        pDeploy->task.deployId = pRunner->id.deployId;
        pDeploy->task.nodeId = pRunner->id.nodeId;
        pDeploy->task.taskIdx = pRunner->id.taskIdx;
        TAOS_CHECK_EXIT(msmBuildRunnerDeployInfo(pDeploy, plan, pStream, pInfo, 0 == i));

        runnerIdx++;
      }

      mstsDebug("level %d initialized, taskNum:%d", i, taskNum);
    }

    TAOS_CHECK_EXIT(msmUpdateRunnerPlans(pCtx, deployTaskList, pStream));

    TAOS_CHECK_EXIT(msmTDAddRunnersToSnodeMap(deployTaskList, pStream));

    TAOS_CHECK_EXIT(msmSTAddToSnodeMap(pCtx, streamId, pInfo->runners[deployId], NULL, 0, deployId));

    // start the next deploy from a fresh copy of the calc plan
    nodesDestroyNode((SNode *)pDag);
    pDag = NULL;

    TAOS_CHECK_EXIT(nodesStringToNode(pStream->pCreate->calcPlan, (SNode**)&pDag));
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  nodesDestroyNode((SNode *)pDag);
  taosArrayDestroy(deployTaskList);

  return code;
}
1902

1903

1904
/**
 * Sets the runner deploy count and runner replica count of a stream status.
 *
 * runnerDeploys is set to MND_STREAM_RUNNER_DEPLOY_NUM and runnerReplica to the fixed
 * value 3. streamId is currently not needed for that.
 *
 * @return always TSDB_CODE_SUCCESS
 */
int32_t msmSetStreamRunnerExecReplica(int64_t streamId, SStmStatus* pInfo) {
  //STREAMTODO
  (void)streamId;

  pInfo->runnerDeploys = MND_STREAM_RUNNER_DEPLOY_NUM;
  pInfo->runnerReplica = 3;

  return TSDB_CODE_SUCCESS;
}
1920

1921

1922
/**
 * Builds the runner tasks of a stream from its serialized calc plan.
 *
 * Returns immediately with success when the stream has no calc plan. Otherwise the plan
 * string is parsed, one SStmTaskStatus array per runner deploy is allocated in
 * pInfo->runners[], and msmBuildRunnerTasksImpl() does the actual work. On success
 * mStreamMgmt.toUpdateScanMap is cleared and toUpdateScanNum reset to 0.
 *
 * @return TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t msmBuildRunnerTasks(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
  if (NULL == pStream->pCreate->calcPlan) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  int64_t     streamId = pStream->pCreate->streamId;
  SQueryPlan* pPlan = NULL;

  TAOS_CHECK_EXIT(nodesStringToNode(pStream->pCreate->calcPlan, (SNode**)&pPlan));

  for (int32_t deployIdx = 0; deployIdx < pInfo->runnerDeploys; ++deployIdx) {
    SArray* pRunnerArr = taosArrayInit(pPlan->numOfSubplans, sizeof(SStmTaskStatus));
    pInfo->runners[deployIdx] = pRunnerArr;
    TSDB_CHECK_NULL(pInfo->runners[deployIdx], code, lino, _exit, terrno);
  }

  code = msmBuildRunnerTasksImpl(pCtx, pPlan, pInfo, pStream);
  // the pointer is dropped right after the call so the cleanup below never touches it again
  pPlan = NULL;

  TAOS_CHECK_EXIT(code);

  taosHashClear(mStreamMgmt.toUpdateScanMap);
  mStreamMgmt.toUpdateScanNum = 0;

_exit:

  nodesDestroyNode((SNode *)pPlan);

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1957

1958

1959
/**
 * Builds all tasks of one stream.
 *
 * Assigns the trigger task id and the trigger snode id in pCtx, then builds the reader
 * tasks, the runner tasks and the trigger tasks, in that order.
 *
 * @return TSDB_CODE_SUCCESS, terrno when no snode could be assigned, or the error of the
 *         first failing build step.
 */
static int32_t msmBuildStreamTasks(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;

  mstsInfo("start to deploy stream tasks, deployTimes:%" PRId64, pInfo->deployTimes);

  // trigger placement is decided first
  pCtx->triggerTaskId = msmAssignTaskId();
  pCtx->triggerNodeId = msmAssignTaskSnodeId(pCtx->pMnode, pStream, true);
  if (!GOT_SNODE(pCtx->triggerNodeId)) {
    TAOS_CHECK_EXIT(terrno);
  }

  // readers -> runners -> trigger
  TAOS_CHECK_EXIT(msmBuildReaderTasks(pCtx, pInfo, pStream));
  TAOS_CHECK_EXIT(msmBuildRunnerTasks(pCtx, pInfo, pStream));
  TAOS_CHECK_EXIT(msmBuildTriggerTasks(pCtx, pInfo, pStream));

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1984

1985
/**
 * Allocates pInfo->trigReaders according to the trigger table type of the stream.
 *
 *  - normal / child / virtual child / virtual normal table: one zero-initialized entry,
 *    trigReaderNum = 1;
 *  - super table: empty array with capacity pDb->cfg.numOfVgroups of the trigger
 *    database, trigReaderNum = numOfVgroups;
 *  - anything else: no array, trigReaderNum = 0.
 *
 * @return TSDB_CODE_SUCCESS, or terrno when the trigger database cannot be acquired or
 *         an allocation fails.
 */
static int32_t msmInitTrigReaderList(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;
  SDbObj* pDb = NULL;

  switch (pStream->pCreate->triggerTblType) {
    case TSDB_NORMAL_TABLE:
    case TSDB_CHILD_TABLE:
    case TSDB_VIRTUAL_CHILD_TABLE:
    case TSDB_VIRTUAL_NORMAL_TABLE:
      pInfo->trigReaders = taosArrayInit_s(sizeof(SStmTaskStatus), 1);
      TSDB_CHECK_NULL(pInfo->trigReaders, code, lino, _exit, terrno);
      pInfo->trigReaderNum = 1;
      break;

    case TSDB_SUPER_TABLE:
      pDb = mndAcquireDb(pCtx->pMnode, pStream->pCreate->triggerDB);
      if (NULL == pDb) {
        code = terrno;
        mstsError("failed to acquire db %s, error:%s", pStream->pCreate->triggerDB, terrstr());
        goto _exit;
      }

      pInfo->trigReaders = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SStmTaskStatus));
      TSDB_CHECK_NULL(pInfo->trigReaders, code, lino, _exit, terrno);
      pInfo->trigReaderNum = pDb->cfg.numOfVgroups;

      // released here on success; the error path below releases it otherwise
      mndReleaseDb(pCtx->pMnode, pDb);
      pDb = NULL;
      break;

    default:
      pInfo->trigReaderNum = 0;
      mstsDebug("%s ignore triggerTblType %d", __FUNCTION__, pStream->pCreate->triggerTblType);
      break;
  }

_exit:

  if (code) {
    mndReleaseDb(pCtx->pMnode, pDb);
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2033

2034

2035
/**
 * Initializes a stream status object from a stream object.
 *
 * Sets lastActionTs to INT64_MIN, duplicates the stream name if none is set yet, clones
 * the create request pointers into pStatus->pCreate and, when the stream has calc
 * subplans, records runnerNum and the runner deploy/replica settings. With initList set,
 * the trigger reader list, the calc reader array (capacity = number of calc scan plans)
 * and one runner array per deploy are allocated as well.
 *
 * @return TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t msmInitStmStatus(SStmGrpCtx* pCtx, SStmStatus* pStatus, SStreamObj* pStream, bool initList) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;
  int32_t calcPlanNum = 0;

  pStatus->lastActionTs = INT64_MIN;

  if (NULL == pStatus->streamName) {
    pStatus->streamName = taosStrdup(pStream->name);
    TSDB_CHECK_NULL(pStatus->streamName, code, lino, _exit, terrno);
  }

  TAOS_CHECK_EXIT(tCloneStreamCreateDeployPointers(pStream->pCreate, &pStatus->pCreate));

  if (pStream->pCreate->numOfCalcSubplan > 0) {
    pStatus->runnerNum = pStream->pCreate->numOfCalcSubplan;

    TAOS_CHECK_EXIT(msmSetStreamRunnerExecReplica(streamId, pStatus));
  }

  if (initList) {
    TAOS_CHECK_EXIT(msmInitTrigReaderList(pCtx, pStatus, pStream));

    calcPlanNum = taosArrayGetSize(pStream->pCreate->calcScanPlanList);
    if (calcPlanNum > 0) {
      pStatus->calcReaderNum = calcPlanNum;
      pStatus->calcReaders = taosArrayInit(calcPlanNum, sizeof(SStmTaskStatus));
      TSDB_CHECK_NULL(pStatus->calcReaders, code, lino, _exit, terrno);
    }

    if (pStatus->runnerNum > 0) {
      for (int32_t deployIdx = 0; deployIdx < pStatus->runnerDeploys; ++deployIdx) {
        pStatus->runners[deployIdx] = taosArrayInit(pStatus->runnerNum, sizeof(SStmTaskStatus));
        TSDB_CHECK_NULL(pStatus->runners[deployIdx], code, lino, _exit, terrno);
      }
    }
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2081

2082
/**
 * Deploys the tasks of one stream.
 *
 * When pStatus is NULL a new SStmStatus is initialized, copied by value into
 * mStreamMgmt.streamMap under streamId, and the stored copy is used from then on.
 * msmBuildStreamTasks() is then run on the status. On failure, if a status is available,
 * the stream is stopped through msmStopStreamByError().
 *
 * @param pStatus existing status of the stream, or NULL to create one
 * @return TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t msmDeployStreamTasks(SStmGrpCtx* pCtx, SStreamObj* pStream, SStmStatus* pStatus) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;
  SStmStatus info = {0};

  if (NULL == pStatus) {
    // NOTE(review): if either call below fails, whatever msmInitStmStatus() already
    // allocated inside `info` is not released here - confirm who owns it in that case.
    TAOS_CHECK_EXIT(msmInitStmStatus(pCtx, &info, pStream, false));

    TAOS_CHECK_EXIT(taosHashPut(mStreamMgmt.streamMap, &streamId, sizeof(streamId), &info, sizeof(info)));

    // work on the copy owned by the map, not on the local `info`
    pStatus = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
    if (NULL == pStatus) {
      mstsError("stream status not found in streamMap right after insertion, mapSize:%d", (int32_t)taosHashGetSize(mStreamMgmt.streamMap));
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
    }
  }

  TAOS_CHECK_EXIT(msmBuildStreamTasks(pCtx, pStatus, pStream));

  mstLogSStmStatus("stream deployed", streamId, pStatus);

_exit:

  if (code) {
    if (NULL != pStatus) {
      msmStopStreamByError(streamId, pStatus, code, pCtx->currTs);
      mstsError("stream build error:%s, will try to stop current stream", tstrerror(code));
    }

    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2113

2114

2115
static int32_t msmSTRemoveStream(int64_t streamId, bool fromStreamMap) {
27✔
2116
  int32_t code = TSDB_CODE_SUCCESS;
27✔
2117
  void* pIter = NULL;
27✔
2118

2119
  while ((pIter = taosHashIterate(mStreamMgmt.toDeployVgMap, pIter))) {
44✔
2120
    SStmVgTasksToDeploy* pVg = (SStmVgTasksToDeploy*)pIter;
17✔
2121
    mstWaitLock(&pVg->lock, true);
17✔
2122

2123
    int32_t taskNum = taosArrayGetSize(pVg->taskList);
17✔
2124
    if (atomic_load_32(&pVg->deployed) == taskNum) {
17✔
2125
      taosRUnLockLatch(&pVg->lock);
1✔
2126
      continue;
1✔
2127
    }
2128

2129
    for (int32_t i = 0; i < taskNum; ++i) {
72✔
2130
      SStmTaskToDeployExt* pExt = taosArrayGet(pVg->taskList, i);
56✔
2131
      if (pExt->deployed || pExt->deploy.task.streamId != streamId) {
56✔
2132
        continue;
54✔
2133
      }
2134

2135
      mstDestroySStmTaskToDeployExt(pExt);
2✔
2136
      pExt->deployed = true;
2✔
2137
    }
2138
    
2139
    taosRUnLockLatch(&pVg->lock);
16✔
2140
  }
2141

2142
  while ((pIter = taosHashIterate(mStreamMgmt.toDeploySnodeMap, pIter))) {
63✔
2143
    SStmSnodeTasksDeploy* pSnode = (SStmSnodeTasksDeploy*)pIter;
36✔
2144
    mstWaitLock(&pSnode->lock, true);
36✔
2145

2146
    int32_t taskNum = taosArrayGetSize(pSnode->triggerList);
36✔
2147
    if (atomic_load_32(&pSnode->triggerDeployed) != taskNum) {
36✔
2148
      for (int32_t i = 0; i < taskNum; ++i) {
41✔
2149
        SStmTaskToDeployExt* pExt = taosArrayGet(pSnode->triggerList, i);
25✔
2150
        if (pExt->deployed || pExt->deploy.task.streamId != streamId) {
25!
2151
          continue;
25✔
2152
        }
2153
        
2154
        mstDestroySStmTaskToDeployExt(pExt);
×
2155
        pExt->deployed = true;
×
2156
      }
2157
    }
2158

2159
    taskNum = taosArrayGetSize(pSnode->runnerList);
36✔
2160
    if (atomic_load_32(&pSnode->runnerDeployed) != taskNum) {
36!
2161
      for (int32_t i = 0; i < taskNum; ++i) {
111✔
2162
        SStmTaskToDeployExt* pExt = taosArrayGet(pSnode->runnerList, i);
75✔
2163
        if (pExt->deployed || pExt->deploy.task.streamId != streamId) {
75!
2164
          continue;
75✔
2165
        }
2166
        
2167
        mstDestroySStmTaskToDeployExt(pExt);
×
2168
        pExt->deployed = true;
×
2169
      }
2170
    }
2171

2172
    taosRUnLockLatch(&pSnode->lock);
36✔
2173
  }
2174

2175
  
2176
  while ((pIter = taosHashIterate(mStreamMgmt.snodeMap, pIter))) {
137✔
2177
    SStmSnodeStatus* pSnode = (SStmSnodeStatus*)pIter;
110✔
2178
    code = taosHashRemove(pSnode->streamTasks, &streamId, sizeof(streamId));
110✔
2179
    if (TSDB_CODE_SUCCESS == code) {
110✔
2180
      mstsDebug("stream removed from snodeMap %d, remainStreams:%d", *(int32_t*)taosHashGetKey(pIter, NULL), (int32_t)taosHashGetSize(pSnode->streamTasks));
45!
2181
    }
2182
  }
2183

2184
  while ((pIter = taosHashIterate(mStreamMgmt.vgroupMap, pIter))) {
55✔
2185
    SStmVgroupStatus* pVg = (SStmVgroupStatus*)pIter;
28✔
2186
    code = taosHashRemove(pVg->streamTasks, &streamId, sizeof(streamId));
28✔
2187
    if (TSDB_CODE_SUCCESS == code) {
28!
2188
      mstsDebug("stream removed from vgroupMap %d, remainStreams:%d", *(int32_t*)taosHashGetKey(pIter, NULL), (int32_t)taosHashGetSize(pVg->streamTasks));
28!
2189
    }
2190
  }
2191

2192
  size_t keyLen = 0;
27✔
2193
  while ((pIter = taosHashIterate(mStreamMgmt.taskMap, pIter))) {
749✔
2194
    int64_t* pStreamId = taosHashGetKey(pIter, &keyLen);
722✔
2195
    if (*pStreamId == streamId) {
722✔
2196
      int64_t taskId = *(pStreamId + 1);
162✔
2197
      code = taosHashRemove(mStreamMgmt.taskMap, pStreamId, keyLen);
162✔
2198
      if (code) {
162!
2199
        mstsError("TASK:%" PRIx64 " remove from taskMap failed, error:%s", taskId, tstrerror(code));
×
2200
      } else {
2201
        mstsDebug("TASK:%" PRIx64 " removed from taskMap", taskId);
162!
2202
      }
2203
    }
2204
  }
2205

2206
  if (fromStreamMap) {
27✔
2207
    code = taosHashRemove(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
1✔
2208
    if (code) {
1!
2209
      mstsError("stream remove from streamMap failed, error:%s", tstrerror(code));
×
2210
    } else {
2211
      mstsDebug("stream removed from streamMap, remains:%d", taosHashGetSize(mStreamMgmt.streamMap));
1!
2212
    }
2213
  }
2214
  
2215
  return code;
27✔
2216
}
2217

2218
/**
 * Prepares an existing stream status for another deployment.
 *
 * The stream is removed from all management maps except streamMap, the status is reset
 * with mstResetSStmStatus() and its deployTimes counter is incremented.
 */
static void msmResetStreamForRedeploy(int64_t streamId, SStmStatus* pStatus) {
  mstsInfo("try to reset stream for redeploy, stopped:%d, current deployTimes:%" PRId64, atomic_load_8(&pStatus->stopped), pStatus->deployTimes);

  // the streamMap entry (pStatus itself) is kept; the result is intentionally ignored
  (void)msmSTRemoveStream(streamId, false);

  mstResetSStmStatus(pStatus);

  pStatus->deployTimes += 1;
}
26✔
2227

2228
/**
 * Executes a "deploy stream" action.
 *
 * If the stream already has a status entry it is reset for redeploy first: directly when
 * it is not stopped, or after atomically clearing its stopped value otherwise. A stream
 * stopped by the user is only redeployed for a user action. The stream object is then
 * acquired by name; a stream that no longer exists is silently ignored, and a stream
 * whose id differs from the action's id is marked stopped (value 2) and reported as
 * TSDB_CODE_MND_STREAM_NOT_EXIST. Streams flagged userStopped/userDropped are skipped.
 * Otherwise msmDeployStreamTasks() is run.
 *
 * @return TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t msmLaunchStreamDeployAction(SStmGrpCtx* pCtx, SStmStreamAction* pAction) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  int64_t     streamId = pAction->streamId;
  char*       streamName = pAction->streamName;
  SStreamObj* pStream = NULL;
  int8_t      stopped = 0;
  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));

  if (NULL != pStatus) {
    stopped = atomic_load_8(&pStatus->stopped);
    if (0 != stopped) {
      if (MST_IS_USER_STOPPED(stopped) && !pAction->userAction) {
        mstsWarn("stream %s already stopped by user, stopped:%d, ignore deploy it", pAction->streamName, stopped);
        return code;
      }

      // only the caller that wins the compare-exchange performs the reset
      if (stopped == atomic_val_compare_exchange_8(&pStatus->stopped, stopped, 0)) {
        mstsDebug("stream %s will try to reset and redeploy it from stopped %d", pAction->streamName, stopped);
        msmResetStreamForRedeploy(streamId, pStatus);
      }
    } else {
      mstsDebug("stream %s will try to reset and redeploy it", pAction->streamName);
      msmResetStreamForRedeploy(streamId, pStatus);
    }
  }

  code = mndAcquireStream(pCtx->pMnode, streamName, &pStream);
  if (TSDB_CODE_MND_STREAM_NOT_EXIST == code) {
    mstsWarn("stream %s no longer exists, ignore deploy", streamName);
    return TSDB_CODE_SUCCESS;
  }

  TAOS_CHECK_EXIT(code);

  // same name but different id: the stream the action refers to is gone
  if (pStatus && pStream->pCreate->streamId != streamId) {
    mstsWarn("stream %s already dropped by user, ignore deploy it", pAction->streamName);
    atomic_store_8(&pStatus->stopped, 2);
    mstsInfo("set stream %s stopped by user since streamId mismatch", streamName);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_NOT_EXIST);
  }

  int8_t userStopped = atomic_load_8(&pStream->userStopped);
  int8_t userDropped = atomic_load_8(&pStream->userDropped);
  if (userStopped || userDropped) {
    mstsWarn("stream %s is stopped %d or removing %d, ignore deploy", streamName, userStopped, userDropped);
    goto _exit;
  }

  TAOS_CHECK_EXIT(msmDeployStreamTasks(pCtx, pStream, pStatus));

_exit:

  mndReleaseStream(pCtx->pMnode, pStream);

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2289

2290
/*
 * Queue one reader task for (re)deployment.
 *
 * The task is looked up in mStreamMgmt.taskMap, a deploy descriptor is filled from
 * the action and the stored task status, and the descriptor is added to
 * mStreamMgmt.toDeployVgMap. For readers that are not trigger readers the calc
 * scan plan at pAction->id.taskIdx is passed to the deploy-info builder.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_STREAM_INTERNAL_ERROR when the task or its
 * scan plan is missing, or whatever the build/add helpers return.
 */
static int32_t msmReLaunchReaderTask(SStreamObj* pStream, SStmTaskAction* pAction, SStmStatus* pStatus) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  int64_t          streamId = pAction->streamId;
  SStmTaskDeploy   info = {0};
  SStreamCalcScan* pCalcScan = NULL;
  bool             triggerReader = false;
  SStmTaskStatus** ppTask = NULL;

  // NOTE(review): the key starts at streamId and is streamId + taskId bytes long;
  // presumably id.taskId is laid out right behind streamId in SStmTaskAction -- confirm.
  ppTask = taosHashGet(mStreamMgmt.taskMap, &pAction->streamId,
                       sizeof(pAction->streamId) + sizeof(pAction->id.taskId));
  if (NULL == ppTask) {
    mstsError("TASK:%" PRId64 " not in taskMap, remain:%d", pAction->id.taskId, taosHashGetSize(mStreamMgmt.taskMap));
    TAOS_CHECK_EXIT(TSDB_CODE_STREAM_INTERNAL_ERROR);
  }

  // Everything except seriousId comes from the action; seriousId comes from the stored status.
  info.task.seriousId = (*ppTask)->id.seriousId;
  info.task.streamId = pAction->streamId;
  info.task.type = pAction->type;
  info.task.taskId = pAction->id.taskId;
  info.task.taskIdx = pAction->id.taskIdx;
  info.task.nodeId = pAction->id.nodeId;

  triggerReader = STREAM_IS_TRIGGER_READER(pAction->flag);
  if (!triggerReader) {
    pCalcScan = taosArrayGet(pStatus->pCreate->calcScanPlanList, pAction->id.taskIdx);
    if (NULL == pCalcScan) {
      mstsError("fail to get TASK:%" PRId64 " scanPlan, taskIdx:%d, scanPlanNum:%zu", 
          pAction->id.taskId, pAction->id.taskIdx, taosArrayGetSize(pStatus->pCreate->calcScanPlanList));
      TAOS_CHECK_EXIT(TSDB_CODE_STREAM_INTERNAL_ERROR);
    }
  }

  TAOS_CHECK_EXIT(msmBuildReaderDeployInfo(&info, pCalcScan ? pCalcScan->scanPlan : NULL, pStatus, triggerReader));
  TAOS_CHECK_EXIT(msmTDAddToVgroupMap(mStreamMgmt.toDeployVgMap, &info, pAction->streamId));

_exit:

  if (TSDB_CODE_SUCCESS != code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2330

2331
/*
2332
static int32_t msmReLaunchTriggerTask(SStmGrpCtx* pCtx, SStreamObj* pStream, SStmTaskAction* pAction, SStmStatus* pStatus) {
2333
  int32_t code = TSDB_CODE_SUCCESS;
2334
  int32_t lino = 0;
2335
  int64_t streamId = pAction->streamId;
2336
  SStmTaskStatus** ppTask = taosHashGet(mStreamMgmt.taskMap, &pAction->streamId, sizeof(pAction->streamId) + sizeof(pAction->id.taskId));
2337
  if (NULL == ppTask) {
2338
    mstsError("TASK:%" PRId64 " not in taskMap, remain:%d", pAction->id.taskId, taosHashGetSize(mStreamMgmt.taskMap));
2339
    TAOS_CHECK_EXIT(TSDB_CODE_STREAM_INTERNAL_ERROR);
2340
  }
2341
  
2342
  (*ppTask)->id.nodeId = msmAssignTaskSnodeId(pCtx->pMnode, pStream, true);
2343
  if (!GOT_SNODE((*ppTask)->id.nodeId)) {
2344
    mstsError("no avaible snode for deploying trigger task, seriousId: %" PRId64, (*ppTask)->id.seriousId);
2345
    return TSDB_CODE_SUCCESS;
2346
  }
2347
  
2348
  SStmTaskDeploy info = {0};
2349
  info.task.type = pAction->type;
2350
  info.task.streamId = streamId;
2351
  info.task.taskId = pAction->id.taskId;
2352
  info.task.seriousId = (*ppTask)->id.seriousId;
2353
  info.task.nodeId = (*ppTask)->id.nodeId;
2354
  info.task.taskIdx = pAction->id.taskIdx;
2355
  
2356
  TAOS_CHECK_EXIT(msmBuildTriggerDeployInfo(pCtx->pMnode, pStatus, &info, pStream));
2357
  TAOS_CHECK_EXIT(msmTDAddTriggerToSnodeMap(&info, pStream));
2358
  TAOS_CHECK_EXIT(msmSTAddToSnodeMap(pCtx, streamId, NULL, *ppTask, 1, -1));
2359
  
2360
  atomic_add_fetch_32(&mStreamMgmt.toDeploySnodeTaskNum, 1);
2361

2362
_exit:
2363

2364
  if (code) {
2365
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
2366
  }
2367

2368
  return code;
2369
}
2370
*/
2371

2372
/*
 * Rebuild and re-queue the runner tasks of a stream.
 *
 * The group context is pointed at the stream's current trigger task (task id and
 * node id), the reader tasks are prepared, the calc plan string of the stream is
 * parsed into a query plan and the runner tasks are rebuilt from it. On success
 * the pending scan-update bookkeeping (toUpdateScanMap / toUpdateScanNum) is reset.
 *
 * Redeploying the trigger task itself is not handled here.
 *
 * Returns TSDB_CODE_SUCCESS or the first error reported by a helper.
 */
static int32_t msmReLaunchRunnerDeploy(SStmGrpCtx* pCtx, SStreamObj* pStream, SStmTaskAction* pAction, SStmStatus* pStatus) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  int64_t     streamId = pAction->streamId;
  SQueryPlan* pPlan = NULL;

  pCtx->triggerTaskId = pStatus->triggerTask->id.taskId;
  pCtx->triggerNodeId = pStatus->triggerTask->id.nodeId;

  TAOS_CHECK_EXIT(msmUPPrepareReaderTasks(pCtx, pStatus, pStream));

  // NOTE(review): pPlan is not destroyed in this function -- confirm that
  // msmReBuildRunnerTasks (or the deploy info it builds) takes ownership.
  TAOS_CHECK_EXIT(nodesStringToNode(pStream->pCreate->calcPlan, (SNode**)&pPlan));
  TAOS_CHECK_EXIT(msmReBuildRunnerTasks(pCtx, pPlan, pStatus, pStream, pAction));

  taosHashClear(mStreamMgmt.toUpdateScanMap);
  mStreamMgmt.toUpdateScanNum = 0;

_exit:

  if (TSDB_CODE_SUCCESS != code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2429

2430

2431
/*
 * Handle a queued task-level deploy action.
 *
 * The action is ignored (TSDB_CODE_SUCCESS) when the stream is no longer in
 * streamMap, is marked stopped, no longer exists in the mnode, or is flagged as
 * user-stopped / user-dropped. Otherwise the action is dispatched by task type:
 * reader tasks are re-queued, runner tasks are rebuilt when pAction->multiRunner
 * is set. Any other combination is an internal error.
 *
 * On failure the stream is stopped through msmStopStreamByError and the error
 * code is returned.
 */
static int32_t msmLaunchTaskDeployAction(SStmGrpCtx* pCtx, SStmTaskAction* pAction) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  int64_t     streamId = pAction->streamId;
  int64_t     taskId = pAction->id.taskId;
  SStreamObj* pStream = NULL;
  SStmStatus* pStatus = NULL;
  int8_t      stopped = 0;
  int8_t      userStopped = 0;
  int8_t      userDropped = 0;

  mstsDebug("start to handle stream tasks action, action task type:%s", gStreamTaskTypeStr[pAction->type]);

  pStatus = taosHashGet(mStreamMgmt.streamMap, &pAction->streamId, sizeof(pAction->streamId));
  if (NULL == pStatus) {
    mstsWarn("stream not in streamMap, remain:%d", taosHashGetSize(mStreamMgmt.streamMap));
    return TSDB_CODE_SUCCESS;
  }

  stopped = atomic_load_8(&pStatus->stopped);
  if (stopped) {
    mstsWarn("stream %s is already stopped %d, ignore task deploy", pStatus->streamName, stopped);
    return TSDB_CODE_SUCCESS;
  }

  code = mndAcquireStream(pCtx->pMnode, pStatus->streamName, &pStream);
  if (TSDB_CODE_MND_STREAM_NOT_EXIST == code) {
    mstsWarn("stream %s no longer exists, ignore task deploy", pStatus->streamName);
    return TSDB_CODE_SUCCESS;
  }

  TAOS_CHECK_EXIT(code);

  userStopped = atomic_load_8(&pStream->userStopped);
  userDropped = atomic_load_8(&pStream->userDropped);
  if (userStopped || userDropped) {
    mstsWarn("stream %s is stopped %d or removing %d, ignore task deploy", pStatus->streamName, userStopped, userDropped);
    goto _exit;
  }

  if (STREAM_READER_TASK == pAction->type) {
    TAOS_CHECK_EXIT(msmReLaunchReaderTask(pStream, pAction, pStatus));
  } else if (STREAM_RUNNER_TASK == pAction->type) {
    if (pAction->multiRunner) {
      TAOS_CHECK_EXIT(msmReLaunchRunnerDeploy(pCtx, pStream, pAction, pStatus));
    } else {
      mstsError("runner TASK:%" PRId64 " requires relaunch", pAction->id.taskId);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
    }
  } else {
    mstsError("TASK:%" PRId64 " invalid task type:%d", pAction->id.taskId, pAction->type);
    TAOS_CHECK_EXIT(TSDB_CODE_STREAM_INTERNAL_ERROR);
  }

_exit:

  // pStream stays NULL when mndAcquireStream itself failed.
  if (NULL != pStream) {
    mndReleaseStream(pCtx->pMnode, pStream);
  }

  if (TSDB_CODE_SUCCESS != code) {
    msmStopStreamByError(streamId, pStatus, code, pCtx->currTs);
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2503

2504
static int32_t msmTDRemoveStream(int64_t streamId) {
×
2505
  void* pIter = NULL;
×
2506
  
2507
  if (atomic_load_32(&mStreamMgmt.toDeployVgTaskNum) > 0) {
×
2508
    while ((pIter = taosHashIterate(mStreamMgmt.toDeployVgMap, pIter))) {
×
2509
      SStmVgTasksToDeploy* pVg = (SStmVgTasksToDeploy*)pIter;
×
2510
      int32_t taskNum = taosArrayGetSize(pVg->taskList);
×
2511
      if (atomic_load_32(&pVg->deployed) == taskNum) {
×
2512
        continue;
×
2513
      }
2514
      
2515
      for (int32_t i = 0; i < taskNum; ++i) {
×
2516
        SStmTaskToDeployExt* pExt = taosArrayGet(pVg->taskList, i);
×
2517
        if (pExt->deploy.task.streamId == streamId && !pExt->deployed) {
×
2518
          pExt->deployed = true;
×
2519
        }
2520
      }
2521
    }
2522
  }
2523

2524
  if (atomic_load_32(&mStreamMgmt.toDeploySnodeTaskNum) > 0) {
×
2525
    while ((pIter = taosHashIterate(mStreamMgmt.toDeploySnodeMap, pIter))) {
×
2526
      SStmSnodeTasksDeploy* pSnode = (SStmSnodeTasksDeploy*)pIter;
×
2527
      int32_t taskNum = taosArrayGetSize(pSnode->triggerList);
×
2528
      if (atomic_load_32(&pSnode->triggerDeployed) != taskNum) {
×
2529
        for (int32_t i = 0; i < taskNum; ++i) {
×
2530
          SStmTaskToDeployExt* pExt = taosArrayGet(pSnode->triggerList, i);
×
2531
          if (pExt->deploy.task.streamId == streamId && !pExt->deployed) {
×
2532
            pExt->deployed = true;
×
2533
          }
2534
        }
2535
      }
2536

2537
      taskNum = taosArrayGetSize(pSnode->runnerList);
×
2538
      if (atomic_load_32(&pSnode->runnerDeployed) != taskNum) {
×
2539
        for (int32_t i = 0; i < taskNum; ++i) {
×
2540
          SStmTaskToDeployExt* pExt = taosArrayGet(pSnode->runnerList, i);
×
2541
          if (pExt->deploy.task.streamId == streamId && !pExt->deployed) {
×
2542
            pExt->deployed = true;
×
2543
          }
2544
        }
2545
      }
2546
    }
2547
  }
2548

2549
  return TSDB_CODE_SUCCESS;
×
2550
}
2551

2552
/*
 * Remove a stream from the runtime maps via msmSTRemoveStream(streamId, true),
 * logging the size of streamMap before and after.
 *
 * Returns TSDB_CODE_SUCCESS or the error reported by msmSTRemoveStream.
 */
static int32_t msmRemoveStreamFromMaps(SMnode* pMnode, int64_t streamId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  mstsInfo("start to remove stream from maps, current stream num:%d", taosHashGetSize(mStreamMgmt.streamMap));

  TAOS_CHECK_EXIT(msmSTRemoveStream(streamId, true));

_exit:

  if (TSDB_CODE_SUCCESS == code) {
    mstsInfo("end remove stream from maps, current stream num:%d", taosHashGetSize(mStreamMgmt.streamMap));
  } else {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2570

2571
/*
 * Mark a stream as stopped by the user (stopped = 2) in streamMap.
 *
 * Returns TSDB_CODE_MND_STREAM_NOT_AVAILABLE when stream management is inactive
 * or not in the normal state. A stream that is not in streamMap is only logged;
 * the call still returns TSDB_CODE_SUCCESS.
 */
int32_t msmUndeployStream(SMnode* pMnode, int64_t streamId, char* streamName) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int8_t  active = atomic_load_8(&mStreamMgmt.active);
  int8_t  state = atomic_load_8(&mStreamMgmt.state);

  if (0 == active || MND_STM_STATE_NORMAL != state) {
    mstsError("stream mgmt not available since active:%d state:%d", active, state);
    return TSDB_CODE_MND_STREAM_NOT_AVAILABLE;
  }

  SStmStatus* pStream = (SStmStatus*)taosHashAcquire(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL != pStream) {
    atomic_store_8(&pStream->stopped, 2);

    mstsInfo("set stream %s stopped by user", streamName);
  } else {
    mstsInfo("stream %s already not in streamMap", streamName);
  }

  // Called for the not-found case as well, i.e. with a NULL entry.
  taosHashRelease(mStreamMgmt.streamMap, pStream);

  if (TSDB_CODE_SUCCESS != code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2601

2602
/*
 * Append a new recalculation time range to a running stream.
 *
 * Returns TSDB_CODE_MND_STREAM_NOT_AVAILABLE when stream management is inactive
 * or not in the normal state, TSDB_CODE_MND_STREAM_NOT_RUNNING when the stream is
 * missing from streamMap or is not running, otherwise the result of
 * mstAppendNewRecalcRange.
 */
int32_t msmRecalcStream(SMnode* pMnode, int64_t streamId, STimeWindow* timeRange) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  int8_t active = atomic_load_8(&mStreamMgmt.active), state = atomic_load_8(&mStreamMgmt.state);
  if (0 == active || MND_STM_STATE_NORMAL != state) {
    mstsError("stream mgmt not available since active:%d state:%d", active, state);
    return TSDB_CODE_MND_STREAM_NOT_AVAILABLE;
  }

  SStmStatus* pStream = (SStmStatus*)taosHashAcquire(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL == pStream) {
    code = TSDB_CODE_MND_STREAM_NOT_RUNNING;
    lino = __LINE__;
    mstsInfo("stream still not in streamMap, streamRemains:%d", taosHashGetSize(mStreamMgmt.streamMap));
    goto _exit;
  }

  // Reported separately from the not-found case so the log says what really happened.
  if (!STREAM_IS_RUNNING(pStream)) {
    code = TSDB_CODE_MND_STREAM_NOT_RUNNING;
    lino = __LINE__;
    mstsInfo("stream is in streamMap but not running, streamRemains:%d", taosHashGetSize(mStreamMgmt.streamMap));
    goto _exit;
  }

  TAOS_CHECK_EXIT(mstAppendNewRecalcRange(streamId, pStream, timeRange));

_exit:

  // Also reached with a NULL entry when the stream was not found.
  taosHashRelease(mStreamMgmt.streamMap, pStream);

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2631

2632
/*
 * Drain the stream action queue.
 *
 * Only STREAM_ACT_DEPLOY nodes are processed: stream-level actions go to
 * msmLaunchStreamDeployAction, task-level ones to msmLaunchTaskDeployAction.
 * The results of the launch calls are not inspected, so this function always
 * returns TSDB_CODE_SUCCESS.
 */
static int32_t msmHandleStreamActions(SStmGrpCtx* pCtx) {
  SStmQNode* pQNode = NULL;

  while (mndStreamActionDequeue(mStreamMgmt.actionQ, &pQNode)) {
    if (STREAM_ACT_DEPLOY != pQNode->type) {
      continue;
    }

    if (pQNode->streamAct) {
      mstDebug("start to handle stream deploy action");
      msmLaunchStreamDeployAction(pCtx, &pQNode->action.stream);
    } else {
      mstDebug("start to handle task deploy action");
      msmLaunchTaskDeployAction(pCtx, &pQNode->action.task);
    }
  }

  return TSDB_CODE_SUCCESS;
}
2661

2662
/*
 * Set stopped = 4 on every stream in streamMap and log the reason (errCode).
 */
void msmStopAllStreamsByGrant(int32_t errCode) {
  void* pIter = NULL;

  while (NULL != (pIter = taosHashIterate(mStreamMgmt.streamMap, pIter))) {
    SStmStatus* pStatus = (SStmStatus*)pIter;
    // streamId is read from the hash key; it is used by the msts log macro below.
    int64_t     streamId = *(int64_t*)taosHashGetKey(pIter, NULL);

    atomic_store_8(&pStatus->stopped, 4);

    mstsInfo("set stream stopped since %s", tstrerror(errCode));
  }
}
×
2681

2682
/*
 * React to an expired stream grant: when stream management is active, stop all
 * streams while holding mStreamMgmt.runtimeLock.
 *
 * Always returns errCode unchanged.
 */
int32_t msmHandleGrantExpired(SMnode *pMnode, int32_t errCode) {
  mstInfo("stream grant expired");

  if (0 != atomic_load_8(&mStreamMgmt.active)) {
    // Taken via mstWaitLock(..., true) and given back with the read-unlock call.
    mstWaitLock(&mStreamMgmt.runtimeLock, true);

    msmStopAllStreamsByGrant(errCode);

    taosRUnLockLatch(&mStreamMgmt.runtimeLock);
  } else {
    mstWarn("mnode stream is NOT active, ignore handling");
  }

  return errCode;
}
2698

2699
/*
 * Add one task deploy descriptor to a per-stream deploy record.
 *
 * Reader and runner descriptors are appended (by value) to the matching task
 * array, which is created on first use. A trigger descriptor is copied into a
 * freshly allocated pStream->triggerTask. Other task types are ignored.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when an allocation or push fails.
 */
static int32_t msmInitStreamDeploy(SStmStreamDeploy* pStream, SStmTaskDeploy* pDeploy) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  int64_t  streamId = pDeploy->task.streamId;
  SArray** ppTasks = NULL;

  if (STREAM_TRIGGER_TASK == pDeploy->task.type) {
    pStream->streamId = streamId;
    pStream->triggerTask = taosMemoryMalloc(sizeof(SStmTaskDeploy));
    TSDB_CHECK_NULL(pStream->triggerTask, code, lino, _exit, terrno);
    memcpy(pStream->triggerTask, pDeploy, sizeof(SStmTaskDeploy));
    goto _exit;
  }

  if (STREAM_READER_TASK == pDeploy->task.type) {
    ppTasks = &pStream->readerTasks;
  } else if (STREAM_RUNNER_TASK == pDeploy->task.type) {
    ppTasks = &pStream->runnerTasks;
  } else {
    goto _exit;
  }

  if (NULL == *ppTasks) {
    pStream->streamId = streamId;

    SArray* pNewList = taosArrayInit(20, sizeof(SStmTaskDeploy));
    TSDB_CHECK_NULL(pNewList, code, lino, _exit, terrno);
    *ppTasks = pNewList;
  }

  void* pPushed = taosArrayPush(*ppTasks, pDeploy);
  TSDB_CHECK_NULL(pPushed, code, lino, _exit, terrno);

_exit:

  if (TSDB_CODE_SUCCESS != code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2740

2741
/*
 * Add one task deploy descriptor to the group deploy map (keyed by stream id).
 *
 * When the stream already has an entry the descriptor is merged into it.
 * Otherwise a new entry is built locally and inserted; if the insert reports
 * TSDB_CODE_DUP_KEY the local entry is freed and the lookup is retried.
 *
 * Returns TSDB_CODE_SUCCESS or the error of msmInitStreamDeploy / taosHashPut.
 */
static int32_t msmGrpAddDeployTask(SHashObj* pHash, SStmTaskDeploy* pDeploy) {
  int32_t           code = TSDB_CODE_SUCCESS;
  int32_t           lino = 0;
  int64_t           streamId = pDeploy->task.streamId;
  SStreamTask*      pTask = &pDeploy->task;
  SStmStreamDeploy  newEntry = {0};
  SStmStreamDeploy* pStream = NULL;

  for (;;) {
    pStream = taosHashAcquire(pHash, &streamId, sizeof(streamId));
    if (NULL != pStream) {
      TAOS_CHECK_EXIT(msmInitStreamDeploy(pStream, pDeploy));
      break;
    }

    TAOS_CHECK_EXIT(msmInitStreamDeploy(&newEntry, pDeploy));

    code = taosHashPut(pHash, &streamId, sizeof(streamId), &newEntry, sizeof(newEntry));
    if (TSDB_CODE_DUP_KEY != code) {
      // Either inserted, or failed for a reason other than an existing key.
      break;
    }

    // An entry for this stream already exists in the map: drop ours and look it up again.
    tFreeSStmStreamDeploy(&newEntry);
  }

_exit:

  taosHashRelease(pHash, pStream);

  if (TSDB_CODE_SUCCESS == code) {
    msttDebug("task added to GRP deployMap, taskIdx:%d", pTask->taskIdx);
  } else {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2783

2784

2785
/*
 * Add every not-yet-deployed entry of pTasks to the group deploy map.
 *
 * Each entry that is added is flagged as deployed and *deployed is atomically
 * incremented. Processing stops at the first failure.
 *
 * Returns TSDB_CODE_SUCCESS or the error of msmGrpAddDeployTask.
 */
int32_t msmGrpAddDeployTasks(SHashObj* pHash, SArray* pTasks, int32_t* deployed) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t total = taosArrayGetSize(pTasks);

  for (int32_t idx = 0; idx < total; ++idx) {
    SStmTaskToDeployExt* pExt = taosArrayGet(pTasks, idx);
    if (!pExt->deployed) {
      TAOS_CHECK_EXIT(msmGrpAddDeployTask(pHash, &pExt->deploy));
      pExt->deployed = true;

      atomic_add_fetch_32(deployed, 1);
    }
  }

_exit:

  if (TSDB_CODE_SUCCESS != code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2810

2811
/*
 * Collect the pending vgroup tasks of every vgroup leader reported in the
 * heartbeat into the group deploy map.
 *
 * For each leader the vgroup up-timestamp is refreshed. A vgroup with pending
 * tasks is processed under its read latch; vgroups whose latch cannot be taken
 * right now, or whose tasks are already all deployed, are skipped.
 *
 * Every entry obtained with taosHashAcquire is handed back with taosHashRelease
 * on all paths, so the entry's reference is not leaked.
 *
 * Returns TSDB_CODE_SUCCESS or the error of msmGrpAddDeployTasks.
 */
int32_t msmGrpAddDeployVgTasks(SStmGrpCtx* pCtx) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t vgNum = taosArrayGetSize(pCtx->pReq->pVgLeaders);
  SStmVgTasksToDeploy* pVg = NULL;

  mstDebug("start to add stream vgroup tasks deploy");
  
  for (int32_t i = 0; i < vgNum; ++i) {
    int32_t* vgId = taosArrayGet(pCtx->pReq->pVgLeaders, i);

    msmUpdateVgroupUpTs(pCtx, *vgId);

    pVg = taosHashAcquire(mStreamMgmt.toDeployVgMap, vgId, sizeof(*vgId));
    if (NULL == pVg) {
      continue;
    }

    if (taosRTryLockLatch(&pVg->lock)) {
      taosHashRelease(mStreamMgmt.toDeployVgMap, pVg);
      pVg = NULL;
      continue;
    }

    if (atomic_load_32(&pVg->deployed) != taosArrayGetSize(pVg->taskList)) {
      // On failure we jump to _exit with pVg still set: latch held, entry acquired.
      TAOS_CHECK_EXIT(msmGrpAddDeployTasks(pCtx->deployStm, pVg->taskList, &pVg->deployed));
    }

    taosRUnLockLatch(&pVg->lock);
    taosHashRelease(mStreamMgmt.toDeployVgMap, pVg);
    pVg = NULL;
  }

_exit:

  // pVg is only non-NULL here when the add above failed while the read latch was held.
  if (pVg) {
    taosRUnLockLatch(&pVg->lock);
    taosHashRelease(mStreamMgmt.toDeployVgMap, pVg);
  }

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2855

2856
/*
 * Collect the pending trigger and runner tasks of the heartbeat's snode into the
 * group deploy map.
 *
 * Nothing is done when the snode has no entry in toDeploySnodeMap. Otherwise the
 * entry's lock is taken with mstWaitLock(..., false), the lists that still have
 * undeployed tasks are processed, and the lock is dropped with the write-unlock
 * call. The acquired hash entry is released on every path, so its reference is
 * not leaked.
 *
 * Returns TSDB_CODE_SUCCESS or the error of msmGrpAddDeployTasks.
 */
int32_t msmGrpAddDeploySnodeTasks(SStmGrpCtx* pCtx) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SStmSnodeTasksDeploy* pSnode = NULL;
  SStreamHbMsg* pReq = pCtx->pReq;

  mstDebug("start to add stream snode tasks deploy");
  
  pSnode = taosHashAcquire(mStreamMgmt.toDeploySnodeMap, &pReq->snodeId, sizeof(pReq->snodeId));
  if (NULL == pSnode) {
    return TSDB_CODE_SUCCESS;
  }

  mstWaitLock(&pSnode->lock, false);
  
  if (atomic_load_32(&pSnode->triggerDeployed) < taosArrayGetSize(pSnode->triggerList)) {
    TAOS_CHECK_EXIT(msmGrpAddDeployTasks(pCtx->deployStm, pSnode->triggerList, &pSnode->triggerDeployed));
  }

  if (atomic_load_32(&pSnode->runnerDeployed) < taosArrayGetSize(pSnode->runnerList)) {
    TAOS_CHECK_EXIT(msmGrpAddDeployTasks(pCtx->deployStm, pSnode->runnerList, &pSnode->runnerDeployed));
  }

_exit:

  // Reached with the lock held and the entry acquired, on success and on failure alike.
  taosWUnLockLatch(&pSnode->lock);
  taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2893

2894
/*
 * Record currTs as the last action timestamp of a stream.
 *
 * A stream that is not in streamMap is only logged. Always returns
 * TSDB_CODE_SUCCESS.
 */
int32_t msmUpdateStreamLastActTs(int64_t streamId, int64_t currTs) {
  SStmStatus* pStatus = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));

  if (NULL != pStatus) {
    pStatus->lastActionTs = currTs;
  } else {
    mstsWarn("stream already not exists in streamMap, mapSize:%d", taosHashGetSize(mStreamMgmt.streamMap));
  }

  return TSDB_CODE_SUCCESS;
}
2913

2914
/*
 * Move all per-stream deploy records of the group deploy map into the heartbeat
 * response (pCtx->pRsp->deploy.streamList).
 *
 * Each record is pushed by value into the response list, the map entry is then
 * cleared with mstClearSStmStreamDeploy, and the stream's last action timestamp
 * is set to pCtx->currTs.
 *
 * The stream id is captured before the entry is cleared and that copy is used
 * afterwards, so the timestamp update does not depend on which fields the clear
 * helper resets.
 *
 * Returns TSDB_CODE_SUCCESS, terrno on allocation/push failure, or the error of
 * msmUpdateStreamLastActTs.
 */
int32_t msmRspAddStreamsDeploy(SStmGrpCtx* pCtx) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t streamNum = taosHashGetSize(pCtx->deployStm);
  void* pIter = NULL;

  mstDebug("start to add group %d deploy streams, streamNum:%d", pCtx->pReq->streamGId, streamNum);
  
  pCtx->pRsp->deploy.streamList = taosArrayInit(streamNum, sizeof(SStmStreamDeploy));
  TSDB_CHECK_NULL(pCtx->pRsp->deploy.streamList, code, lino, _exit, terrno);

  while (1) {
    pIter = taosHashIterate(pCtx->deployStm, pIter);
    if (pIter == NULL) {
      break;
    }
    
    SStmStreamDeploy *pDeploy = (SStmStreamDeploy *)pIter;
    TSDB_CHECK_NULL(taosArrayPush(pCtx->pRsp->deploy.streamList, pDeploy), code, lino, _exit, terrno);

    int64_t streamId = pDeploy->streamId;
    mstsDebug("stream DEPLOY added to dnode %d hb rsp, readerTasks:%zu, triggerTask:%d, runnerTasks:%zu", 
        pCtx->pReq->dnodeId, taosArrayGetSize(pDeploy->readerTasks), pDeploy->triggerTask ? 1 : 0, taosArrayGetSize(pDeploy->runnerTasks));

    mstClearSStmStreamDeploy(pDeploy);
    
    // Use the id captured above; pDeploy has just been cleared.
    TAOS_CHECK_EXIT(msmUpdateStreamLastActTs(streamId, pCtx->currTs));
  }
  
_exit:

  // Non-NULL only when the loop was left early through an error.
  if (pIter) {
    taosHashCancelIterate(pCtx->deployStm, pIter);
  }

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }
  
  return code;
}
2955

2956
/*
 * Drop already-deployed tasks from the to-deploy vgroup map for the given
 * vgroup leaders.
 *
 * Per vgroup, under its write latch (vgroups whose latch is busy are skipped):
 *  - nothing deployed: left untouched;
 *  - everything deployed: the task list is destroyed and the entry is removed
 *    from the map;
 *  - partly deployed: only the deployed entries are removed from the list and
 *    the deployed counter is reset to 0.
 * mStreamMgmt.toDeployVgTaskNum is decremented by the number of removed tasks.
 */
void msmCleanDeployedVgTasks(SArray* pVgLeaders) {
  int32_t vgNum = taosArrayGetSize(pVgLeaders);

  for (int32_t i = 0; i < vgNum; ++i) {
    int32_t*             vgId = taosArrayGet(pVgLeaders, i);
    SStmVgTasksToDeploy* pVg = taosHashAcquire(mStreamMgmt.toDeployVgMap, vgId, sizeof(*vgId));
    if (NULL == pVg) {
      continue;
    }

    if (taosWTryLockLatch(&pVg->lock)) {
      taosHashRelease(mStreamMgmt.toDeployVgMap, pVg);
      continue;
    }

    if (atomic_load_32(&pVg->deployed) > 0) {
      int32_t taskNum = taosArrayGetSize(pVg->taskList);

      if (atomic_load_32(&pVg->deployed) == taskNum) {
        atomic_sub_fetch_32(&mStreamMgmt.toDeployVgTaskNum, taskNum);
        taosArrayDestroyEx(pVg->taskList, mstDestroySStmTaskToDeployExt);
        pVg->taskList = NULL;
        // pVg is still unlocked and released below, after the map entry is removed.
        taosHashRemove(mStreamMgmt.toDeployVgMap, vgId, sizeof(*vgId));
      } else {
        // Walk backwards so removing an element does not shift the ones still to visit.
        for (int32_t m = taskNum - 1; m >= 0; --m) {
          SStmTaskToDeployExt* pExt = taosArrayGet(pVg->taskList, m);
          if (pExt->deployed) {
            mstDestroySStmTaskToDeployExt(pExt);

            taosArrayRemove(pVg->taskList, m);
            atomic_sub_fetch_32(&mStreamMgmt.toDeployVgTaskNum, 1);
          }
        }

        atomic_store_32(&pVg->deployed, 0);
      }
    }

    taosWUnLockLatch(&pVg->lock);
    taosHashRelease(mStreamMgmt.toDeployVgMap, pVg);
  }
}
140✔
3017

3018
void msmCleanDeployedSnodeTasks (int32_t snodeId) {
151✔
3019
  if (!GOT_SNODE(snodeId)) {
151✔
3020
    return;
18✔
3021
  }
3022
  
3023
  int32_t code = TSDB_CODE_SUCCESS;
133✔
3024
  SStmSnodeTasksDeploy* pSnode = taosHashAcquire(mStreamMgmt.toDeploySnodeMap, &snodeId, sizeof(snodeId));
133✔
3025
  if (NULL == pSnode) {
133✔
3026
    return;
32✔
3027
  }
3028

3029
  if (taosWTryLockLatch(&pSnode->lock)) {
101!
3030
    taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);
×
3031
    return;
×
3032
  }
3033

3034
  int32_t triggerNum = taosArrayGetSize(pSnode->triggerList);
101✔
3035
  int32_t runnerNum = taosArrayGetSize(pSnode->runnerList);
101✔
3036
  
3037
  if (atomic_load_32(&pSnode->triggerDeployed) <= 0 && atomic_load_32(&pSnode->runnerDeployed) <= 0) {
101!
3038
    taosWUnLockLatch(&pSnode->lock);
×
3039
    taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);
×
3040
    return;
×
3041
  }
3042

3043
  if (atomic_load_32(&pSnode->triggerDeployed) == triggerNum) {
101!
3044
    atomic_sub_fetch_32(&mStreamMgmt.toDeploySnodeTaskNum, triggerNum);
101✔
3045
    taosArrayDestroyEx(pSnode->triggerList, mstDestroySStmTaskToDeployExt);
101✔
3046
    pSnode->triggerList = NULL;
101✔
3047
  }
3048

3049
  if (atomic_load_32(&pSnode->runnerDeployed) == runnerNum) {
101!
3050
    atomic_sub_fetch_32(&mStreamMgmt.toDeploySnodeTaskNum, runnerNum);
101✔
3051
    taosArrayDestroyEx(pSnode->runnerList, mstDestroySStmTaskToDeployExt);
101✔
3052
    pSnode->runnerList = NULL;
101✔
3053
  }
3054

3055
  if (NULL == pSnode->triggerList && NULL == pSnode->runnerList) {
101!
3056
    taosHashRemove(mStreamMgmt.toDeploySnodeMap, &snodeId, sizeof(snodeId));
101✔
3057
    taosWUnLockLatch(&pSnode->lock);
101✔
3058
    taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);
101✔
3059
    return;
101✔
3060
  }
3061

3062
  if (atomic_load_32(&pSnode->triggerDeployed) > 0 && pSnode->triggerList) {
×
3063
    for (int32_t m = triggerNum - 1; m >= 0; --m) {
×
3064
      SStmTaskToDeployExt* pExt = taosArrayGet(pSnode->triggerList, m);
×
3065
      if (!pExt->deployed) {
×
3066
        continue;
×
3067
      }
3068

3069
      mstDestroySStmTaskToDeployExt(pExt);
×
3070
      atomic_sub_fetch_32(&mStreamMgmt.toDeploySnodeTaskNum, 1);
×
3071
      taosArrayRemove(pSnode->triggerList, m);
×
3072
    }
3073
    
3074
    pSnode->triggerDeployed = 0;
×
3075
  }
3076

3077
  if (atomic_load_32(&pSnode->runnerDeployed) > 0 && pSnode->runnerList) {
×
3078
    for (int32_t m = runnerNum - 1; m >= 0; --m) {
×
3079
      SStmTaskToDeployExt* pExt = taosArrayGet(pSnode->runnerList, m);
×
3080
      if (!pExt->deployed) {
×
3081
        continue;
×
3082
      }
3083

3084
      mstDestroySStmTaskToDeployExt(pExt);
×
3085
      atomic_sub_fetch_32(&mStreamMgmt.toDeploySnodeTaskNum, 1);
×
3086
      taosArrayRemove(pSnode->runnerList, m);
×
3087
    }
3088
    
3089
    pSnode->runnerDeployed = 0;
×
3090
  }
3091
  
3092
  taosWUnLockLatch(&pSnode->lock);
×
3093
  taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);
×
3094
}
3095

3096
/*
 * After a heartbeat has been answered, clean the to-deploy maps of tasks that
 * were deployed: the vgroup map for the reported vgroup leaders and the snode
 * map for the reported snode. Each map is only visited when its pending-task
 * counter is positive. No-op while stream management is inactive.
 */
void msmClearStreamToDeployMaps(SStreamHbMsg* pHb) {
  if (0 != atomic_load_8(&mStreamMgmt.active)) {
    if (atomic_load_32(&mStreamMgmt.toDeployVgTaskNum) > 0) {
      msmCleanDeployedVgTasks(pHb->pVgLeaders);
    }

    if (atomic_load_32(&mStreamMgmt.toDeploySnodeTaskNum) > 0) {
      msmCleanDeployedSnodeTasks(pHb->snodeId);
    }
  }
}
3109

3110
/*
 * Clears the per-group action and deploy hash tables of the thread context
 * that serves the heartbeat's stream group. No-op while mStreamMgmt.active is 0.
 */
void msmCleanStreamGrpCtx(SStreamHbMsg* pHb) {
  if (atomic_load_8(&mStreamMgmt.active) == 0) {
    return;
  }

  // The thread slot is derived from the configured thread count and the group id.
  const int32_t threadIdx = streamGetThreadIdx(mStreamMgmt.threadNum, pHb->streamGId);

  taosHashClear(mStreamMgmt.tCtx[threadIdx].actionStm[pHb->streamGId]);
  taosHashClear(mStreamMgmt.tCtx[threadIdx].deployStm[pHb->streamGId]);
}
3119

3120
/*
 * Records a START action for `streamId` in `pHash`.
 *
 * If the stream already has an action entry, STREAM_ACT_START is OR-ed into it
 * and its start.triggerId is overwritten with *pId; otherwise a zeroed entry
 * carrying only the START action is inserted.
 *
 * Returns TSDB_CODE_SUCCESS, or the error reported by taosHashPut().
 */
int32_t msmGrpAddActionStart(SHashObj* pHash, int64_t streamId, SStmTaskId* pId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SStmAction* pExisting = taosHashGet(pHash, &streamId, sizeof(streamId));
  if (NULL == pExisting) {
    SStmAction fresh = {0};
    fresh.actions = STREAM_ACT_START;
    fresh.start.triggerId = *pId;

    TAOS_CHECK_EXIT(taosHashPut(pHash, &streamId, sizeof(streamId), &fresh, sizeof(fresh)));
    mstsDebug("stream add START action, actions:%x", fresh.actions);
  } else {
    pExisting->actions |= STREAM_ACT_START;
    pExisting->start.triggerId = *pId;
    mstsDebug("stream append START action, actions:%x", pExisting->actions);
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
3145

3146
/*
 * Records an UPDATE_TRIGGER action for `streamId` in `pHash`.
 *
 * An existing entry gets STREAM_ACT_UPDATE_TRIGGER OR-ed into its action mask;
 * otherwise a zeroed entry carrying only that action is inserted.
 *
 * Returns TSDB_CODE_SUCCESS, or the error reported by taosHashPut().
 */
int32_t msmGrpAddActionUpdateTrigger(SHashObj* pHash, int64_t streamId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SStmAction* pExisting = taosHashGet(pHash, &streamId, sizeof(streamId));
  if (NULL == pExisting) {
    SStmAction fresh = {0};
    fresh.actions = STREAM_ACT_UPDATE_TRIGGER;

    TAOS_CHECK_EXIT(taosHashPut(pHash, &streamId, sizeof(streamId), &fresh, sizeof(fresh)));
    mstsDebug("stream add UPDATE_TRIGGER action, actions:%x", fresh.actions);
  } else {
    pExisting->actions |= STREAM_ACT_UPDATE_TRIGGER;
    mstsDebug("stream append UPDATE_TRIGGER action, actions:%x", pExisting->actions);
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
3170

3171

3172

3173
/*
 * Queues `pTask` for undeploy under the stream's action entry in pCtx->actionStm.
 *
 * The entry's flags aggregate over all queued tasks of the stream:
 *   - doCheckpoint is true only while the stream is not dropped,
 *   - doCleanup becomes true once the stream is found dropped.
 *
 * The task list stores the `pTask` pointer itself (POINTER_BYTES elements); the
 * pointed-to task is not copied here.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered. On failure no
 * locally created task list is left behind.
 */
int32_t msmGrpAddActionUndeploy(SStmGrpCtx* pCtx, int64_t streamId, SStreamTask* pTask) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t action = STREAM_ACT_UNDEPLOY;
  bool    dropped = false;
  SArray* pNewList = NULL;  // list created for a brand-new entry; owned here until the hash put succeeds

  TAOS_CHECK_EXIT(mstIsStreamDropped(pCtx->pMnode, streamId, &dropped));
  mstsDebug("stream dropped: %d", dropped);

  SStmAction *pAction = taosHashGet(pCtx->actionStm, &streamId, sizeof(streamId));
  if (pAction) {
    pAction->actions |= action;
    if (NULL == pAction->undeploy.taskList) {
      pAction->undeploy.taskList = taosArrayInit(pCtx->taskNum, POINTER_BYTES);
      TSDB_CHECK_NULL(pAction->undeploy.taskList, code, lino, _exit, terrno);

      // The entry was created by another action type, so its undeploy flags are
      // still zeroed. Seed them exactly like a brand-new UNDEPLOY entry; otherwise
      // doCheckpoint could never become true for a stream that is not dropped.
      pAction->undeploy.doCheckpoint = !dropped;
      pAction->undeploy.doCleanup = dropped;
    }

    TSDB_CHECK_NULL(taosArrayPush(pAction->undeploy.taskList, &pTask), code, lino, _exit, terrno);

    // Merge with what earlier tasks of this stream decided: a dropped stream
    // disables the checkpoint and forces the cleanup.
    if (dropped) {
      pAction->undeploy.doCheckpoint = false;
      pAction->undeploy.doCleanup = true;
    }

    msttDebug("task append UNDEPLOY action[%d,%d], actions:%x", pAction->undeploy.doCheckpoint, pAction->undeploy.doCleanup, pAction->actions);
  } else {
    SStmAction newAction = {0};
    newAction.actions = action;
    newAction.undeploy.doCheckpoint = dropped ? false : true;
    newAction.undeploy.doCleanup = dropped ? true : false;

    pNewList = taosArrayInit(pCtx->taskNum, POINTER_BYTES);
    TSDB_CHECK_NULL(pNewList, code, lino, _exit, terrno);
    newAction.undeploy.taskList = pNewList;

    TSDB_CHECK_NULL(taosArrayPush(pNewList, &pTask), code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(taosHashPut(pCtx->actionStm, &streamId, sizeof(streamId), &newAction, sizeof(newAction)));
    pNewList = NULL;  // the entry stored in the hash references the list from now on

    msttDebug("task add UNDEPLOY action[%d,%d]", newAction.undeploy.doCheckpoint, newAction.undeploy.doCleanup);
  }

_exit:

  // Non-NULL only when the new-entry path failed after creating the list.
  taosArrayDestroy(pNewList);

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
3220

3221
/*
 * Attaches `recalcList` to the stream's action entry in pCtx->actionStm and
 * flags the entry with STREAM_ACT_RECALC.
 *
 * An existing entry has its recalc.recalcList pointer replaced; otherwise a new
 * entry is inserted. If the insert fails, the locally built entry (which holds
 * `recalcList`) is passed to mstDestroySStmAction().
 *
 * NOTE(review): replacing the pointer on an existing entry does not release a
 * previously attached list; presumably at most one list is attached per
 * heartbeat - confirm against the callers.
 */
int32_t msmGrpAddActionRecalc(SStmGrpCtx* pCtx, int64_t streamId, SArray* recalcList) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  SStmAction newAction = {0};

  SStmAction* pExisting = taosHashGet(pCtx->actionStm, &streamId, sizeof(streamId));
  if (NULL == pExisting) {
    newAction.actions = STREAM_ACT_RECALC;
    newAction.recalc.recalcList = recalcList;

    TAOS_CHECK_EXIT(taosHashPut(pCtx->actionStm, &streamId, sizeof(streamId), &newAction, sizeof(newAction)));

    mstsDebug("stream add recalc action, listSize:%d", (int32_t)taosArrayGetSize(recalcList));
  } else {
    pExisting->actions |= STREAM_ACT_RECALC;
    pExisting->recalc.recalcList = recalcList;

    mstsDebug("stream append recalc action, listSize:%d, actions:%x", (int32_t)taosArrayGetSize(recalcList), pExisting->actions);
  }

_exit:

  if (code) {
    mstDestroySStmAction(&newAction);
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
3251

3252
bool msmCheckStreamStartCond(int64_t streamId, int32_t snodeId) {
350✔
3253
  SStmStatus* pStream = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
350✔
3254
  if (NULL == pStream) {
350!
3255
    return false;
×
3256
  }
3257

3258
  if (pStream->triggerTask->id.nodeId != snodeId || STREAM_STATUS_INIT != pStream->triggerTask->status) {
350!
3259
    return false;
×
3260
  }
3261

3262
  int32_t readerNum = taosArrayGetSize(pStream->trigReaders);
350✔
3263
  for (int32_t i = 0; i < readerNum; ++i) {
789✔
3264
    SStmTaskStatus* pStatus = taosArrayGet(pStream->trigReaders, i);
439✔
3265
    if (STREAM_STATUS_INIT != pStatus->status && STREAM_STATUS_RUNNING != pStatus->status) {
439!
3266
      return false;
×
3267
    }
3268
  }
3269

3270
  readerNum = taosArrayGetSize(pStream->trigOReaders);
350✔
3271
  for (int32_t i = 0; i < readerNum; ++i) {
369✔
3272
    SStmTaskStatus* pStatus = taosArrayGet(pStream->trigOReaders, i);
19✔
3273
    if (STREAM_STATUS_INIT != pStatus->status && STREAM_STATUS_RUNNING != pStatus->status) {
19!
3274
      return false;
×
3275
    }
3276
  }
3277

3278
  readerNum = taosArrayGetSize(pStream->calcReaders);
350✔
3279
  for (int32_t i = 0; i < readerNum; ++i) {
488✔
3280
    SStmTaskStatus* pStatus = taosArrayGet(pStream->calcReaders, i);
138✔
3281
    if (STREAM_STATUS_INIT != pStatus->status && STREAM_STATUS_RUNNING != pStatus->status) {
138!
3282
      return false;
×
3283
    }
3284
  }
3285

3286
  for (int32_t i = 0; i < pStream->runnerDeploys; ++i) {
1,232✔
3287
    int32_t runnerNum = taosArrayGetSize(pStream->runners[i]);
937✔
3288
    for (int32_t m = 0; m < runnerNum; ++m) {
1,819✔
3289
      SStmTaskStatus* pStatus = taosArrayGet(pStream->runners[i], m);
937✔
3290
      if (STREAM_STATUS_INIT != pStatus->status && STREAM_STATUS_RUNNING != pStatus->status) {
937!
3291
        return false;
55✔
3292
      }
3293
    }
3294
  }
3295
  
3296
  return true;
295✔
3297
}
3298

3299

3300
/*
 * Reacts to a task whose reported status is not RUNNING.
 *
 *   - Stream missing from streamMap, or stream flagged stopped: queue the task
 *     for undeploy.
 *   - Status FAILED: queue the task for undeploy.
 *   - Status INIT: only trigger tasks are considered. Nothing happens while the
 *     stream's lastActionTs is INT64_MIN or less than STREAM_ACT_MIN_DELAY_MSEC
 *     has passed since it. Otherwise, if the stream is not running yet, the
 *     heartbeat carries an snode and msmCheckStreamStartCond() holds, a START
 *     action is queued.
 *   - Any other status: ignored.
 *
 * When queuing an action fails, msmStopStreamByError() is called with the error.
 * `pTaskStatus` is not used by this function.
 */
void msmHandleTaskAbnormalStatus(SStmGrpCtx* pCtx, SStmTaskStatusMsg* pMsg, SStmTaskStatus* pTaskStatus) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  int64_t      streamId = pMsg->streamId;
  SStreamTask* pTask = (SStreamTask*)pMsg;
  int8_t       stopped = 0;
  SStmStatus*  pStatus = NULL;

  msttDebug("start to handle task abnormal status %d", pTask->status);

  pStatus = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL == pStatus) {
    msttInfo("stream no longer exists in streamMap, try to undeploy current task, idx:%d", pMsg->taskIdx);
    TAOS_CHECK_EXIT(msmGrpAddActionUndeploy(pCtx, streamId, pTask));
    return;
  }

  stopped = atomic_load_8(&pStatus->stopped);
  if (stopped) {
    msttInfo("stream stopped %d, try to undeploy current task, idx:%d", stopped, pMsg->taskIdx);
    TAOS_CHECK_EXIT(msmGrpAddActionUndeploy(pCtx, streamId, pTask));
    return;
  }

  if (STREAM_STATUS_FAILED == pMsg->status) {
    //STREAMTODO ADD ERRCODE HANDLE
    msttInfo("task failed with error:%s, try to undeploy current task, idx:%d", tstrerror(pMsg->errorCode), pMsg->taskIdx);
    TAOS_CHECK_EXIT(msmGrpAddActionUndeploy(pCtx, streamId, pTask));
  } else if (STREAM_STATUS_INIT == pMsg->status) {
    // Only the trigger task drives the start decision.
    if (STREAM_TRIGGER_TASK != pMsg->type) {
      msttTrace("task status is INIT and not trigger task, ignore it, currTs:%" PRId64 ", lastTs:%" PRId64, pCtx->currTs, pStatus->lastActionTs);
      return;
    }

    if (INT64_MIN == pStatus->lastActionTs) {
      msttDebug("task still not deployed, ignore it, currTs:%" PRId64 ", lastTs:%" PRId64, pCtx->currTs, pStatus->lastActionTs);
      return;
    }

    // Rate limit: leave a minimum gap between two actions on the same stream.
    if ((pCtx->currTs - pStatus->lastActionTs) < STREAM_ACT_MIN_DELAY_MSEC) {
      msttDebug("task wait not enough between actions, currTs:%" PRId64 ", lastTs:%" PRId64, pCtx->currTs, pStatus->lastActionTs);
      return;
    }

    if (STREAM_IS_RUNNING(pStatus)) {
      msttDebug("stream already running, ignore status: %s", gStreamStatusStr[pTask->status]);
    } else if (GOT_SNODE(pCtx->pReq->snodeId) && msmCheckStreamStartCond(streamId, pCtx->pReq->snodeId)) {
      TAOS_CHECK_EXIT(msmGrpAddActionStart(pCtx->actionStm, streamId, &pStatus->triggerTask->id));
    }
  }

_exit:

  if (code) {
    msmStopStreamByError(streamId, pStatus, code, pCtx->currTs);
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }
}
3363

3364
/*
 * Handles an exception detected while applying a reported task status.
 *
 * For STM_ERR_TASK_NOT_EXISTS and STM_ERR_STREAM_STOPPED the reporting task is
 * queued for undeploy; other error types are only logged. A failure to queue
 * the undeploy is logged and otherwise ignored (the stream is not stopped).
 */
void msmHandleStatusUpdateErr(SStmGrpCtx* pCtx, EStmErrType err, SStmTaskStatusMsg* pStatus) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  int64_t      streamId = pStatus->streamId;
  SStreamTask* pTask = (SStreamTask*)pStatus;

  msttInfo("start to handle task status update exception, type: %d", err);

  // STREAMTODO

  const bool needUndeploy = (STM_ERR_TASK_NOT_EXISTS == err) || (STM_ERR_STREAM_STOPPED == err);
  if (needUndeploy) {
    TAOS_CHECK_EXIT(msmGrpAddActionUndeploy(pCtx, streamId, pTask));
  }

_exit:

  if (code) {
    // IGNORE STOP STREAM BY ERROR
    msttError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }
}
153✔
3385

3386
/*
 * Per-heartbeat bookkeeping for a trigger task.
 *
 *   1. If the stream's triggerNeedUpdate flag was set (it is atomically reset
 *      to 0 here), queue an UPDATE_TRIGGER action.
 *   2. If a user recalc list is pending on the stream, detach it under
 *      userRecalcLock and queue it as a RECALC action.
 *   3. If the task reports a detail status index (>= 0) and the request carries
 *      trigger statuses, copy that SSTriggerRuntimeStatus into
 *      pStatus->detailStatus (allocated on first use) under detailStatusLock.
 *
 * Errors are logged only; the stream is not stopped.
 */
void msmChkHandleTriggerOperations(SStmGrpCtx* pCtx, SStmTaskStatusMsg* pTask, SStmTaskStatus* pStatus) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SStmStatus* pStream = (SStmStatus*)pStatus->pStream;

  if (1 == atomic_val_compare_exchange_8(&pStream->triggerNeedUpdate, 1, 0)) {
    TAOS_CHECK_EXIT(msmGrpAddActionUpdateTrigger(pCtx->actionStm, pTask->streamId));
  }

  SArray* userRecalcList = NULL;
  if (atomic_load_ptr(&pStream->userRecalcList)) {
    // Re-check under the lock: the unlocked load above is only a cheap hint.
    taosWLockLatch(&pStream->userRecalcLock);
    if (pStream->userRecalcList) {
      userRecalcList = pStream->userRecalcList;
      pStream->userRecalcList = NULL;
    }
    taosWUnLockLatch(&pStream->userRecalcLock);

    if (userRecalcList) {
      TAOS_CHECK_EXIT(msmGrpAddActionRecalc(pCtx, pTask->streamId, userRecalcList));
    }
  }

  if (pTask->detailStatus >= 0 && pCtx->pReq->pTriggerStatus) {
    // The index comes from the reported status message. Resolve it before taking
    // the lock so that an index with no matching element is reported as an error
    // instead of being handed to memcpy() as a NULL source.
    void* pReported = taosArrayGet(pCtx->pReq->pTriggerStatus, pTask->detailStatus);
    TSDB_CHECK_NULL(pReported, code, lino, _exit, terrno);

    mstWaitLock(&pStatus->detailStatusLock, false);
    if (NULL == pStatus->detailStatus) {
      pStatus->detailStatus = taosMemoryCalloc(1, sizeof(SSTriggerRuntimeStatus));
      if (NULL == pStatus->detailStatus) {
        taosWUnLockLatch(&pStatus->detailStatusLock);
        TSDB_CHECK_NULL(pStatus->detailStatus, code, lino, _exit, terrno);
      }
    }

    memcpy(pStatus->detailStatus, pReported, sizeof(SSTriggerRuntimeStatus));
    taosWUnLockLatch(&pStatus->detailStatusLock);
  }

_exit:

  if (code) {
    // IGNORE STOP STREAM BY ERROR
    msttError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }
}
2,910✔
3430

3431
/*
 * NORMAL mode: applies every task status carried by the heartbeat request.
 *
 * For each reported task:
 *   - not found in taskMap, or its seriousId/nodeId differ from the recorded
 *     ones: handled as STM_ERR_TASK_NOT_EXISTS;
 *   - owning stream flagged stopped: handled as STM_ERR_STREAM_STOPPED;
 *   - otherwise the recorded errCode/status/lastUpTs are refreshed. On a status
 *     change to RUNNING the running start timestamp is set; on a change matching
 *     MST_IS_RUNNER_GETTING_READY with the REDEPLOY_RUNNER flag set, the flag is
 *     cleared and (when a trigger task exists) triggerNeedUpdate is raised.
 *     Non-RUNNING tasks go through msmHandleTaskAbnormalStatus(); trigger tasks
 *     additionally go through msmChkHandleTriggerOperations().
 *
 * Always returns TSDB_CODE_SUCCESS in the code visible here.
 */
int32_t msmNormalHandleStatusUpdate(SStmGrpCtx* pCtx) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t num = taosArrayGetSize(pCtx->pReq->pStreamStatus);

  mstDebug("NORMAL: start to handle stream group %d tasks status, taskNum:%d", pCtx->pReq->streamGId, num);

  for (int32_t idx = 0; idx < num; ++idx) {
    SStmTaskStatusMsg* pTask = taosArrayGet(pCtx->pReq->pStreamStatus, idx);
    msttDebug("task status %s got on dnode %d, taskIdx:%d", gStreamStatusStr[pTask->status], pCtx->pReq->dnodeId, pTask->taskIdx);

    // The lookup key spans streamId and the taskId that follows it in the message.
    SStmTaskStatus** ppStatus = taosHashGet(mStreamMgmt.taskMap, &pTask->streamId, sizeof(pTask->streamId) + sizeof(pTask->taskId));
    if (NULL == ppStatus) {
      msttWarn("task no longer exists in taskMap, will try to undeploy current task, taskIdx:%d", pTask->taskIdx);
      msmHandleStatusUpdateErr(pCtx, STM_ERR_TASK_NOT_EXISTS, pTask);
      continue;
    }

    SStmTaskStatus* pRecorded = *ppStatus;
    SStmStatus*     pStream = (SStmStatus*)pRecorded->pStream;

    int8_t stopped = atomic_load_8(&pStream->stopped);
    if (stopped) {
      msttWarn("stream already stopped %d, will try to undeploy current task, taskIdx:%d", stopped, pTask->taskIdx);
      msmHandleStatusUpdateErr(pCtx, STM_ERR_STREAM_STOPPED, pTask);
      continue;
    }

    const bool sameInstance = (pTask->seriousId == pRecorded->id.seriousId) && (pTask->nodeId == pRecorded->id.nodeId);
    if (!sameInstance) {
      msttInfo("task mismatch with it in taskMap, will try to rm it, current seriousId:%" PRId64 ", nodeId:%d", 
          pRecorded->id.seriousId, pRecorded->id.nodeId);

      msmHandleStatusUpdateErr(pCtx, STM_ERR_TASK_NOT_EXISTS, pTask);
      continue;
    }

    if (pRecorded->status != pTask->status) {
      if (STREAM_STATUS_RUNNING == pTask->status) {
        pRecorded->runningStartTs = pCtx->currTs;
      } else if (MST_IS_RUNNER_GETTING_READY(pTask) && STREAM_IS_REDEPLOY_RUNNER(pRecorded->flags)) {
        if (pStream->triggerTask) {
          atomic_store_8(&pStream->triggerNeedUpdate, 1);
        }

        STREAM_CLR_FLAG(pRecorded->flags, STREAM_FLAG_REDEPLOY_RUNNER);
      }
    }

    pRecorded->errCode = pTask->errorCode;
    pRecorded->status = pTask->status;
    pRecorded->lastUpTs = pCtx->currTs;

    if (STREAM_STATUS_RUNNING != pTask->status) {
      msmHandleTaskAbnormalStatus(pCtx, pTask, *ppStatus);
    }

    if (STREAM_TRIGGER_TASK == pTask->type) {
      msmChkHandleTriggerOperations(pCtx, pTask, *ppStatus);
    }
  }

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
3498

3499
/*
 * WATCH mode: records a task that was reported by a heartbeat but is not yet
 * present in taskMap.
 *
 * If the owning stream is not in streamMap, its object is acquired, a fresh
 * SStmStatus is initialised from it and inserted. Tasks of virtual-table
 * streams are ignored (returns success). The task is then stored according to
 * its type:
 *   - reader:  appended to trigReaders or calcReaders and added to the
 *              task/vgroup maps,
 *   - trigger: replaces pStatus->triggerTask and is added to the task/snode maps,
 *   - runner:  appended to runners[deployId] and added to the task/snode maps.
 * A missing or already full list is reported as
 * TSDB_CODE_MND_STREAM_INTERNAL_ERROR.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
int32_t msmWatchRecordNewTask(SStmGrpCtx* pCtx, SStmTaskStatusMsg* pTask) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pTask->streamId;
  SStreamObj* pStream = NULL;

  SStmStatus* pStatus = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL == pStatus) {
    SStmStatus status = {0};
    TAOS_CHECK_EXIT(mndAcquireStreamById(pCtx->pMnode, streamId, &pStream));
    TSDB_CHECK_NULL(pStream, code, lino, _exit, TSDB_CODE_MND_STREAM_NOT_EXIST);
    if (STREAM_IS_VIRTUAL_TABLE(pStream->pCreate->triggerTblType, pStream->pCreate->flags)) {
      mndReleaseStream(pCtx->pMnode, pStream);
      msttDebug("virtual table task ignored, status:%s", gStreamStatusStr[pTask->status]);
      return code;
    }

    // Release the acquired stream whether or not the initialisation succeeded;
    // checking the result first would skip the release on the error path.
    code = msmInitStmStatus(pCtx, &status, pStream, true);
    mndReleaseStream(pCtx->pMnode, pStream);
    pStream = NULL;
    TAOS_CHECK_EXIT(code);

    TAOS_CHECK_EXIT(taosHashPut(mStreamMgmt.streamMap, &streamId, sizeof(streamId), &status, sizeof(status)));
    // Re-read the entry: the hash stores its own copy of `status`.
    pStatus = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
    TSDB_CHECK_NULL(pStatus, code, lino, _exit, terrno);
    msttDebug("stream added to streamMap cause of new task status:%s", gStreamStatusStr[pTask->status]);
  }

  SStmTaskStatus* pNewTask = NULL;
  switch (pTask->type) {
    case STREAM_READER_TASK: {
      SArray* pList = STREAM_IS_TRIGGER_READER(pTask->flags) ? pStatus->trigReaders : pStatus->calcReaders;
      if (NULL == pList) {
        mstsError("%sReader list is NULL", STREAM_IS_TRIGGER_READER(pTask->flags) ? "trig" : "calc");
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }
      int32_t readerSize = STREAM_IS_TRIGGER_READER(pTask->flags) ? pStatus->trigReaderNum : pStatus->calcReaderNum;
      if (taosArrayGetSize(pList) >= readerSize) {
        mstsError("%sReader list is already full, size:%d, expSize:%d", STREAM_IS_TRIGGER_READER(pTask->flags) ? "trig" : "calc",
            (int32_t)taosArrayGetSize(pList), readerSize);
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }

      SStmTaskStatus taskStatus = {0};
      taskStatus.pStream = pStatus;
      mstSetTaskStatusFromMsg(pCtx, &taskStatus, pTask);
      // Register the element stored in the list, not the stack copy.
      pNewTask = taosArrayPush(pList, &taskStatus);
      TSDB_CHECK_NULL(pNewTask, code, lino, _exit, terrno);

      TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, NULL, pNewTask));
      TAOS_CHECK_EXIT(msmSTAddToVgroupMapImpl(streamId, pNewTask, STREAM_IS_TRIGGER_READER(pTask->flags)));
      break;
    }
    case STREAM_TRIGGER_TASK: {
      taosMemoryFreeClear(pStatus->triggerTask);
      pStatus->triggerTask = taosMemoryCalloc(1, sizeof(*pStatus->triggerTask));
      TSDB_CHECK_NULL(pStatus->triggerTask, code, lino, _exit, terrno);
      pStatus->triggerTask->pStream = pStatus;
      mstSetTaskStatusFromMsg(pCtx, pStatus->triggerTask, pTask);
      pNewTask = pStatus->triggerTask;

      TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, NULL, pNewTask));
      TAOS_CHECK_EXIT(msmSTAddToSnodeMapImpl(streamId, pNewTask, 0));
      break;
    }
    case STREAM_RUNNER_TASK:{
      // NOTE(review): deployId comes from the reported message and indexes
      // runners[] without a range check here - confirm it is validated upstream.
      if (NULL == pStatus->runners[pTask->deployId]) {
        mstsError("deploy %d runner list is NULL", pTask->deployId);
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }
      if (taosArrayGetSize(pStatus->runners[pTask->deployId]) >= pStatus->runnerNum) {
        mstsError("deploy %d runner list is already full, size:%d, expSize:%d", pTask->deployId, 
            (int32_t)taosArrayGetSize(pStatus->runners[pTask->deployId]), pStatus->runnerNum);
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }    

      SStmTaskStatus taskStatus = {0};
      taskStatus.pStream = pStatus;
      mstSetTaskStatusFromMsg(pCtx, &taskStatus, pTask);
      pNewTask = taosArrayPush(pStatus->runners[pTask->deployId], &taskStatus);
      TSDB_CHECK_NULL(pNewTask, code, lino, _exit, terrno);

      TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, NULL, pNewTask));
      TAOS_CHECK_EXIT(msmSTAddToSnodeMapImpl(streamId, pNewTask, pTask->deployId));
      break;
    }
    default: {
      msttError("invalid task type:%d in task status", pTask->type);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      break;
    }
  }

_exit:

  if (code) {
    msttError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  } else {
    msttDebug("new task recored to taskMap/streamMap, task status:%s", gStreamStatusStr[pTask->status]);
  }

  return code;
}
3600

3601
/*
 * WATCH mode: walks the task statuses carried by the heartbeat request.
 *
 * mStreamMgmt.lastTaskId is raised to one past any reported task id that is
 * not below it. Tasks already in taskMap get their status and lastUpTs
 * refreshed; unknown tasks are recorded through msmWatchRecordNewTask(), whose
 * first failure aborts the walk and is returned.
 */
int32_t msmWatchHandleStatusUpdate(SStmGrpCtx* pCtx) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t num = taosArrayGetSize(pCtx->pReq->pStreamStatus);

  mstDebug("WATCH: start to handle stream group %d tasks status, taskNum:%d", pCtx->pReq->streamGId, num);

  for (int32_t idx = 0; idx < num; ++idx) {
    SStmTaskStatusMsg* pTask = taosArrayGet(pCtx->pReq->pStreamStatus, idx);
    msttDebug("task status %s got, taskIdx:%d", gStreamStatusStr[pTask->status], pTask->taskIdx);

    if (pTask->taskId >= mStreamMgmt.lastTaskId) {
      mStreamMgmt.lastTaskId = pTask->taskId + 1;
    }

    // The lookup key spans streamId and the taskId that follows it in the message.
    SStmTaskStatus** ppStatus = taosHashGet(mStreamMgmt.taskMap, &pTask->streamId, sizeof(pTask->streamId) + sizeof(pTask->taskId));
    if (NULL != ppStatus) {
      (*ppStatus)->status = pTask->status;
      (*ppStatus)->lastUpTs = pCtx->currTs;
      continue;
    }

    msttInfo("task still not in taskMap, will try to add it, taskIdx:%d", pTask->taskIdx);

    TAOS_CHECK_EXIT(msmWatchRecordNewTask(pCtx, pTask));
  }

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
3637

3638
/*
 * Appends a START entry for the stream's trigger task to the heartbeat
 * response and refreshes the stream's last action timestamp.
 *
 * The response's start list is created on first use with room for `streamNum`
 * entries. The entry is filled from pAction->start.triggerId. Failures are
 * logged; nothing is returned to the caller.
 */
void msmRspAddStreamStart(int64_t streamId, SStmGrpCtx* pCtx, int32_t streamNum, SStmAction *pAction) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (NULL == pCtx->pRsp->start.taskList) {
    pCtx->pRsp->start.taskList = taosArrayInit(streamNum, sizeof(SStreamTaskStart));
    TSDB_CHECK_NULL(pCtx->pRsp->start.taskList, code, lino, _exit, terrno);
  }

  SStmTaskId* pId = &pAction->start.triggerId;

  // Members not named below stay zero, as with a `{0}` initialiser.
  SStreamTaskStart start = {
      .task = {
          .type = STREAM_TRIGGER_TASK,
          .streamId = streamId,
          .taskId = pId->taskId,
          .seriousId = pId->seriousId,
          .nodeId = pId->nodeId,
          .taskIdx = pId->taskIdx,
      },
  };

  TSDB_CHECK_NULL(taosArrayPush(pCtx->pRsp->start.taskList, &start), code, lino, _exit, terrno);
  TAOS_CHECK_EXIT(msmUpdateStreamLastActTs(streamId, pCtx->currTs));

  mstsDebug("stream START added to dnode %d hb rsp, triggerTaskId:%" PRIx64, pId->nodeId, pId->taskId);

  return;

_exit:

  mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
}
3666

3667

3668
/*
 * Appends one UNDEPLOY entry per task queued in pAction->undeploy.taskList to
 * the heartbeat response, all sharing the action's doCheckpoint/doCleanup
 * flags, and refreshes the stream's last action timestamp after each entry.
 *
 * The response's undeploy list is created on first use. Failures are logged
 * and abort the remaining entries; nothing is returned to the caller.
 */
void msmRspAddStreamUndeploy(int64_t streamId, SStmGrpCtx* pCtx, SStmAction *pAction) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t dropNum = taosArrayGetSize(pAction->undeploy.taskList);
  if (NULL == pCtx->pRsp->undeploy.taskList) {
    pCtx->pRsp->undeploy.taskList = taosArrayInit(dropNum, sizeof(SStreamTaskUndeploy));
    TSDB_CHECK_NULL(pCtx->pRsp->undeploy.taskList, code, lino, _exit, terrno);
  }

  for (int32_t i = 0; i < dropNum; ++i) {
    SStreamTask* pTask = (SStreamTask*)taosArrayGetP(pAction->undeploy.taskList, i);

    // Zero-initialise like the START entry builder does, so no indeterminate
    // member or padding byte is copied into the response array.
    SStreamTaskUndeploy undeploy = {0};
    undeploy.task = *pTask;
    undeploy.undeployMsg.doCheckpoint = pAction->undeploy.doCheckpoint;
    undeploy.undeployMsg.doCleanup = pAction->undeploy.doCleanup;

    TSDB_CHECK_NULL(taosArrayPush(pCtx->pRsp->undeploy.taskList, &undeploy), code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(msmUpdateStreamLastActTs(streamId, pCtx->currTs));

    msttDebug("task UNDEPLOY added to hb rsp, doCheckpoint:%d, doCleanup:%d", undeploy.undeployMsg.doCheckpoint, undeploy.undeployMsg.doCleanup);
  }

  return;

_exit:

  mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
}
3696

3697
/*
 * Queues a STREAM_MSG_UPDATE_RUNNER response for the stream's trigger task.
 *
 * Silently ignored when the stream is gone from streamMap or has no trigger
 * task. The runner targets are produced by msmBuildTriggerRunnerTargets() and
 * the response list is created on first use. `pAction` is not read here.
 *
 * NOTE(review): if creating the response list or the push fails,
 * rsp.cont.runnerList is not released on this path - confirm who owns it.
 */
void msmRspAddTriggerUpdate(SMnode * pMnode, int64_t streamId, SStmGrpCtx* pCtx, SStmAction *pAction) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SStmStatus* pStream = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL == pStream) {
    mstsDebug("stream already not exists in streamMap, ignore trigger update, streamRemain:%d", taosHashGetSize(mStreamMgmt.streamMap));
    return;
  }

  if (NULL == pStream->triggerTask) {
    mstsWarn("no triggerTask exists, ignore trigger update, stopped:%d", atomic_load_8(&pStream->stopped));
    return;
  }

  SStreamMgmtRsp rsp = {0};
  rsp.task.taskId = pStream->triggerTask->id.taskId;
  rsp.task.streamId = streamId;
  rsp.header.msgType = STREAM_MSG_UPDATE_RUNNER;
  rsp.reqId = INT64_MIN;

  TAOS_CHECK_EXIT(msmBuildTriggerRunnerTargets(pMnode, pStream, streamId, &rsp.cont.runnerList));  

  if (NULL == pCtx->pRsp->rsps.rspList) {
    pCtx->pRsp->rsps.rspList = taosArrayInit(2, sizeof(SStreamMgmtRsp));
    TSDB_CHECK_NULL(pCtx->pRsp->rsps.rspList, code, lino, _exit, terrno);
  }

  TSDB_CHECK_NULL(taosArrayPush(pCtx->pRsp->rsps.rspList, &rsp), code, lino, _exit, terrno);

_exit:

  if (0 == code) {
    mstsDebug("trigger update rsp added, runnerNum:%d", (int32_t)taosArrayGetSize(rsp.cont.runnerList));
  } else {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }
}
3735

3736
/*
 * Queues a STREAM_MSG_USER_RECALC response for the stream's trigger task.
 *
 * Silently ignored when the stream is gone from streamMap or has no trigger
 * task. On success the recalc list moves from the action into the queued
 * response. If the response cannot be queued, the list is handed back to the
 * action, so it ends up where it would have stayed on the early-return paths.
 * `pMnode` is not used here.
 */
void msmRspAddUserRecalc(SMnode * pMnode, int64_t streamId, SStmGrpCtx* pCtx, SStmAction *pAction) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  bool    queued = false;  // true once the response (and with it the list) is in the response list

  SStmStatus* pStream = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL == pStream) {
    mstsDebug("stream already not exists in streamMap, ignore user recalc, streamRemain:%d", taosHashGetSize(mStreamMgmt.streamMap));
    return;
  }

  if (NULL == pStream->triggerTask) {
    mstsWarn("no triggerTask exists, ignore user recalc, stopped:%d", atomic_load_8(&pStream->stopped));
    return;
  }

  SStreamMgmtRsp rsp = {0};
  rsp.reqId = INT64_MIN;
  rsp.header.msgType = STREAM_MSG_USER_RECALC;
  rsp.task.streamId = streamId;
  rsp.task.taskId = pStream->triggerTask->id.taskId;
  // Move the list into the response; the action is left with the zeroed pointer.
  TSWAP(rsp.cont.recalcList, pAction->recalc.recalcList);

  if (NULL == pCtx->pRsp->rsps.rspList) {
    pCtx->pRsp->rsps.rspList = taosArrayInit(2, sizeof(SStreamMgmtRsp));
    TSDB_CHECK_NULL(pCtx->pRsp->rsps.rspList, code, lino, _exit, terrno);
  }

  TSDB_CHECK_NULL(taosArrayPush(pCtx->pRsp->rsps.rspList, &rsp), code, lino, _exit, terrno);
  queued = true;

_exit:

  if (!queued) {
    // Undo the move: otherwise the only reference to the list would be the
    // local `rsp`, which goes out of scope on return.
    TSWAP(rsp.cont.recalcList, pAction->recalc.recalcList);
  }

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  } else {
    mstsDebug("user recalc rsp added, recalcNum:%d", (int32_t)taosArrayGetSize(rsp.cont.recalcList));
  }
}
3773

3774

3775
/*
 * Turns every action queued in pCtx->actionStm into heartbeat response content.
 *
 * Per stream, UNDEPLOY is exclusive: when it is set, no other action of that
 * entry is processed. Otherwise UPDATE_TRIGGER, RECALC and START are handled
 * in that order, each only if its bit is set.
 *
 * Always returns TSDB_CODE_SUCCESS in the code visible here; the response
 * builders it calls return nothing.
 */
int32_t msmHandleHbPostActions(SStmGrpCtx* pCtx) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  void*   pIter = NULL;
  int32_t streamNum = taosHashGetSize(pCtx->actionStm);

  mstDebug("start to handle stream group %d post actions", pCtx->pReq->streamGId);

  for (pIter = taosHashIterate(pCtx->actionStm, pIter); NULL != pIter;
       pIter = taosHashIterate(pCtx->actionStm, pIter)) {
    // The iterator points at the stored value; its key is the stream id.
    SStmAction* pAction = (SStmAction*)pIter;
    int64_t*    pStreamId = taosHashGetKey(pIter, NULL);

    if (pAction->actions & STREAM_ACT_UNDEPLOY) {
      msmRspAddStreamUndeploy(*pStreamId, pCtx, pAction);
      continue;
    }

    if (pAction->actions & STREAM_ACT_UPDATE_TRIGGER) {
      msmRspAddTriggerUpdate(pCtx->pMnode, *pStreamId, pCtx, pAction);
    }

    if (pAction->actions & STREAM_ACT_RECALC) {
      msmRspAddUserRecalc(pCtx->pMnode, *pStreamId, pCtx, pAction);
    }

    if (pAction->actions & STREAM_ACT_START) {
      msmRspAddStreamStart(*pStreamId, pCtx, streamNum, pAction);
    }
  }

_exit:

  if (pIter) {
    taosHashCancelIterate(pCtx->actionStm, pIter);
  }

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
3822

3823
/*
 * Advances the last-heartbeat timestamp recorded for the reporting dnode.
 *
 * If the dnode is missing from dnodeMap, msmSTAddDnodesToMap() is called once
 * and the lookup is repeated; a dnode still missing afterwards yields
 * TSDB_CODE_MND_STREAM_NODE_NOT_EXISTS. The stored timestamp is replaced by
 * pCtx->currTs with a compare-and-swap, retried until it succeeds or the
 * stored value is no longer older than currTs.
 */
int32_t msmCheckUpdateDnodeTs(SStmGrpCtx* pCtx) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  int64_t* lastTs = NULL;
  bool     mapRefreshed = false;

  for (;;) {
    lastTs = taosHashGet(mStreamMgmt.dnodeMap, &pCtx->pReq->dnodeId, sizeof(pCtx->pReq->dnodeId));
    if (NULL != lastTs) {
      break;
    }

    if (mapRefreshed) {
      mstWarn("Got unknown dnode %d hb msg, may be dropped", pCtx->pReq->dnodeId);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_NODE_NOT_EXISTS);
    }

    mapRefreshed = true;
    TAOS_CHECK_EXIT(msmSTAddDnodesToMap(pCtx->pMnode));
  }

  for (;;) {
    const int64_t seenTs = atomic_load_64(lastTs);
    if (pCtx->currTs <= seenTs) {
      // The stored value is already as new as this heartbeat; nothing to write.
      break;
    }

    if (seenTs == atomic_val_compare_exchange_64(lastTs, seenTs, pCtx->currTs)) {
      mstDebug("dnode %d lastUpTs updated", pCtx->pReq->dnodeId);
      break;
    }
    // The stored value changed between the load and the swap; sample it again.
  }

  return code;

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;  
}
3868

3869
/*
 * Walk every entry of mStreamMgmt.streamMap and compare the tasks recorded for
 * the stream with the counts stored in its SStmStatus. A stream that has no
 * trigger task, or whose trigger-reader / calc-reader / runner count differs
 * from the expected value, is stopped through msmStopStreamByError() with
 * TSDB_CODE_MND_STREAM_TASK_LOST.
 *
 * pCtx supplies the timestamp (currTs) handed to msmStopStreamByError().
 */
void msmWatchCheckStreamMap(SStmGrpCtx* pCtx) {
  SStmStatus* pStatus = NULL;
  int32_t     trigReaderNum = 0;
  int32_t     calcReaderNum = 0;
  int32_t     runnerNum = 0;
  int64_t     streamId = 0;
  void*       pIter = NULL;

  while (true) {
    pIter = taosHashIterate(mStreamMgmt.streamMap, pIter);
    if (NULL == pIter) {
      return;
    }

    streamId = *(int64_t*)taosHashGetKey(pIter, NULL);
    pStatus = (SStmStatus*)pIter;

    if (NULL == pStatus->triggerTask) {
      mstsWarn("no trigger task recored, deployTimes:%" PRId64, pStatus->deployTimes);
      msmStopStreamByError(streamId, pStatus, TSDB_CODE_MND_STREAM_TASK_LOST, pCtx->currTs);
      continue;
    }

    trigReaderNum = taosArrayGetSize(pStatus->trigReaders);
    if (pStatus->trigReaderNum != trigReaderNum) {
      mstsWarn("trigReaderNum %d mis-match with expected %d", trigReaderNum, pStatus->trigReaderNum);
      msmStopStreamByError(streamId, pStatus, TSDB_CODE_MND_STREAM_TASK_LOST, pCtx->currTs);
      continue;
    }

    calcReaderNum = taosArrayGetSize(pStatus->calcReaders);
    if (pStatus->calcReaderNum != calcReaderNum) {
      mstsWarn("calcReaderNum %d mis-match with expected %d", calcReaderNum, pStatus->calcReaderNum);
      msmStopStreamByError(streamId, pStatus, TSDB_CODE_MND_STREAM_TASK_LOST, pCtx->currTs);
      continue;
    }

    for (int32_t i = 0; i < pStatus->runnerDeploys; ++i) {
      runnerNum = taosArrayGetSize(pStatus->runners[i]);
      if (runnerNum != pStatus->runnerNum) {
        mstsWarn("runner deploy %d runnerNum %d mis-match with expected %d", i, runnerNum, pStatus->runnerNum);
        msmStopStreamByError(streamId, pStatus, TSDB_CODE_MND_STREAM_TASK_LOST, pCtx->currTs);
        // One mismatch is enough: leave the runner loop so the stream is
        // stopped only once, then move on to the next stream.
        break;
      }
    }
  }
}
3915

3916
/*
 * Finish the watch state and switch the runtime state to MND_STM_STATE_NORMAL.
 *
 * Only the first caller that flips watch.ending from 0 to 1 does the work;
 * every other caller returns immediately. That caller spins (sched_yield)
 * until watch.processing is at most 1, then:
 *   - watchError == true : clears the vgroup, snode, task and stream maps;
 *   - no task reported during the watch state: nothing to verify;
 *   - otherwise          : verifies the stream map with msmWatchCheckStreamMap().
 * Finally lastTaskId is advanced by 100000.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t msmWatchHandleEnding(SStmGrpCtx* pCtx, bool watchError) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  // somebody else already started (or finished) the ending procedure
  if (0 != atomic_val_compare_exchange_8(&mStreamMgmt.watch.ending, 0, 1)) {
    return code;
  }

  // wait for the other in-flight watch handlers to drain
  while (atomic_load_32(&mStreamMgmt.watch.processing) > 1) {
    (void)sched_yield();
  }

  if (watchError) {
    taosHashClear(mStreamMgmt.vgroupMap);
    taosHashClear(mStreamMgmt.snodeMap);
    taosHashClear(mStreamMgmt.taskMap);
    taosHashClear(mStreamMgmt.streamMap);
    mstInfo("watch error happends, clear all maps");
  } else if (0 == atomic_load_8(&mStreamMgmt.watch.taskRemains)) {
    mstInfo("no stream tasks remain during watch state");
  } else {
    msmWatchCheckStreamMap(pCtx);
  }

  mStreamMgmt.lastTaskId += 100000;
  mstInfo("watch state end, new taskId begin from:%" PRIx64, mStreamMgmt.lastTaskId);

  msmSetInitRuntimeState(MND_STM_STATE_NORMAL);

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
3958

3959

3960
/*
 * Handle one stream heartbeat while the stream manager is in the watch state.
 *
 * The handler counts itself in watch.processing for its whole duration. If the
 * ending procedure has already started (watch.ending set) the message is
 * ignored. Otherwise the dnode / snode timestamps are refreshed, reported task
 * status is recorded, and once the isolation duration since
 * STM_EVENT_ACTIVE_BEGIN has elapsed the watch state is ended normally.
 *
 * On any failure the watch state is ended in error mode
 * (msmWatchHandleEnding(pCtx, true)).
 *
 * Returns TSDB_CODE_SUCCESS or the error code of the failing step.
 */
int32_t msmWatchHandleHbMsg(SStmGrpCtx* pCtx) {
  int32_t       code = TSDB_CODE_SUCCESS;
  int32_t       lino = 0;
  SStreamHbMsg* pReq = pCtx->pReq;

  atomic_add_fetch_32(&mStreamMgmt.watch.processing, 1);

  if (atomic_load_8(&mStreamMgmt.watch.ending)) {
    goto _exit;
  }

  TAOS_CHECK_EXIT(msmCheckUpdateDnodeTs(pCtx));
  if (GOT_SNODE(pReq->snodeId)) {
    TAOS_CHECK_EXIT(msmUpdateSnodeUpTs(pCtx));
  }

  if (taosArrayGetSize(pReq->pStreamStatus) > 0) {
    atomic_store_8(&mStreamMgmt.watch.taskRemains, 1);
    TAOS_CHECK_EXIT(msmWatchHandleStatusUpdate(pCtx));
  }

  if ((pCtx->currTs - MND_STREAM_GET_LAST_TS(STM_EVENT_ACTIVE_BEGIN)) > MST_ISOLATION_DURATION) {
    TAOS_CHECK_EXIT(msmWatchHandleEnding(pCtx, false));
  }

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));

    // Run the error ending while this handler is still counted in
    // watch.processing: msmWatchHandleEnding() waits for "processing > 1" to
    // clear, i.e. it expects its caller to be the one remaining handler. Calling
    // it after the decrement would let it clear the maps while another handler
    // is still running.
    (void)msmWatchHandleEnding(pCtx, true);
  }

  atomic_sub_fetch_32(&mStreamMgmt.watch.processing, 1);

  return code;
}
3997

3998
/*
 * Make sure a trigger reader exists for vgroup vgId of the stream described by
 * pStatus.
 *
 * If one of pStatus->trigReaders already lives on vgId nothing is done.
 * Otherwise a new entry is appended to pStatus->trigOReaders (the array is
 * created on demand with initial capacity vgNum), filled in by
 * msmTDAddSingleTrigReader() and registered through msmSTAddToTaskMap() and
 * msmSTAddToVgroupMap().
 *
 * Returns TSDB_CODE_SUCCESS, or terrno / the failing callee's code on error.
 */
int32_t msmCheckDeployTrigReader(SStmGrpCtx* pCtx, SStmStatus* pStatus, SStmTaskStatusMsg* pTask, int32_t vgId, int32_t vgNum) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  bool    readerExists = false;
  int64_t streamId = pTask->streamId;

  int32_t readerNum = taosArrayGetSize(pStatus->trigReaders);
  for (int32_t i = 0; i < readerNum; ++i) {
    SStmTaskStatus* pReader = (SStmTaskStatus*)taosArrayGet(pStatus->trigReaders, i);
    if (pReader->id.nodeId == vgId) {
      readerExists = true;
      break;
    }
  }

  if (!readerExists) {
    if (NULL == pStatus->trigOReaders) {
      pStatus->trigOReaders = taosArrayInit(vgNum, sizeof(SStmTaskStatus));
      TSDB_CHECK_NULL(pStatus->trigOReaders, code, lino, _exit, terrno);
    }

    SStmTaskStatus* pState = taosArrayReserve(pStatus->trigOReaders, 1);
    // the array may fail to grow; do not hand a NULL slot to the callees below
    TSDB_CHECK_NULL(pState, code, lino, _exit, terrno);

    TAOS_CHECK_EXIT(msmTDAddSingleTrigReader(pCtx, pState, vgId, pStatus, NULL, streamId));
    TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, NULL, pState));
    TAOS_CHECK_EXIT(msmSTAddToVgroupMap(pCtx, streamId, NULL, pState, true));
  }

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
4033

4034
/*
 * Serve a STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER request carried by pTask.
 *
 * The request object is detached from pTask (pTask->pMgmtReq becomes NULL) and
 * is always freed before returning. For every table named in the request the
 * owning vgroup id is resolved; each distinct vgroup gets a trigger reader via
 * msmCheckDeployTrigReader(). The response (per-table vgroup ids plus the
 * address of every entry in pStatus->trigOReaders) is appended to
 * pCtx->pRsp->rsps.rspList.
 *
 * No response is produced when the table list is empty or when
 * pStatus->trigOReaders already has entries; both cases return success.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_MND_STREAM_NOT_RUNNING when the stream is
 * not in the stream map, TSDB_CODE_MND_STREAM_STOPPED when it is stopped, or the
 * code of the failing step. On failure the partially built response is destroyed.
 */
int32_t msmProcessDeployOrigReader(SStmGrpCtx* pCtx, SStmTaskStatusMsg* pTask) {
  int32_t             code = TSDB_CODE_SUCCESS;
  int32_t             lino = 0;
  int32_t             vgId = 0;
  int32_t             iter = 0;
  int32_t             oReaderNum = 0;
  int32_t             vgNum = 0;
  int32_t             readerCnt = 0;
  int8_t              stopped = 0;
  int64_t             streamId = pTask->streamId;
  SArray*             pTbs = pTask->pMgmtReq->cont.fullTableNames;
  int32_t             tbNum = taosArrayGetSize(pTbs);
  void*               p = NULL;
  SStreamDbTableName* pName = NULL;
  SSHashObj*          pDbVgroups = NULL;
  SSHashObj*          pVgs = NULL;
  SStreamMgmtReq*     pMgmtReq = NULL;
  SStmStatus*         pStatus = NULL;
  SStreamTaskAddr     addr;
  SStreamMgmtRsp      rsp = {0};

  rsp.reqId = pTask->pMgmtReq->reqId;
  rsp.header.msgType = STREAM_MSG_ORIGTBL_READER_INFO;

  // take the request away from the task; it is released at _exit
  TSWAP(pTask->pMgmtReq, pMgmtReq);
  rsp.task = *(SStreamTask*)pTask;

  pStatus = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL == pStatus) {
    mstsError("stream not deployed, remainStreams:%d", taosHashGetSize(mStreamMgmt.streamMap));
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_NOT_RUNNING);
  }

  stopped = atomic_load_8(&pStatus->stopped);
  if (stopped) {
    msttInfo("stream stopped %d, ignore deploy trigger reader, vgId:%d", stopped, vgId);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_STOPPED);
  }

  if (tbNum <= 0) {
    mstsWarn("empty table list in origReader req, array:%p", pTbs);
    goto _exit;
  }

  oReaderNum = taosArrayGetSize(pStatus->trigOReaders);
  if (oReaderNum > 0) {
    mstsWarn("origReaders already exits, num:%d", oReaderNum);
    goto _exit;
  }

  TAOS_CHECK_EXIT(mstBuildDBVgroupsMap(pCtx->pMnode, &pDbVgroups));

  rsp.cont.vgIds = taosArrayInit(tbNum, sizeof(int32_t));
  TSDB_CHECK_NULL(rsp.cont.vgIds, code, lino, _exit, terrno);

  pVgs = tSimpleHashInit(tbNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  TSDB_CHECK_NULL(pVgs, code, lino, _exit, terrno);

  // resolve every table to its vgroup; pVgs collects the distinct vgroup ids
  for (int32_t i = 0; i < tbNum; ++i) {
    pName = (SStreamDbTableName*)taosArrayGet(pTbs, i);
    TAOS_CHECK_EXIT(mstGetTableVgId(pDbVgroups, pName->dbFName, pName->tbName, &vgId));
    TSDB_CHECK_NULL(taosArrayPush(rsp.cont.vgIds, &vgId), code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(tSimpleHashPut(pVgs, &vgId, sizeof(vgId), &vgId, sizeof(vgId)));
  }

  // one trigger reader per distinct vgroup
  vgNum = tSimpleHashGetSize(pVgs);
  while (NULL != (p = tSimpleHashIterate(pVgs, p, &iter))) {
    TAOS_CHECK_EXIT(msmCheckDeployTrigReader(pCtx, pStatus, pTask, *(int32_t*)p, vgNum));
  }

  // publish the address of every entry now present in trigOReaders
  readerCnt = taosArrayGetSize(pStatus->trigOReaders);
  rsp.cont.readerList = taosArrayInit(readerCnt, sizeof(SStreamTaskAddr));
  TSDB_CHECK_NULL(rsp.cont.readerList, code, lino, _exit, terrno);

  for (int32_t i = 0; i < readerCnt; ++i) {
    SStmTaskStatus* pOTask = taosArrayGet(pStatus->trigOReaders, i);
    addr.taskId = pOTask->id.taskId;
    addr.nodeId = pOTask->id.nodeId;
    addr.epset = mndGetVgroupEpsetById(pCtx->pMnode, pOTask->id.nodeId);
    TSDB_CHECK_NULL(taosArrayPush(rsp.cont.readerList, &addr), code, lino, _exit, terrno);
    mstsDebug("the %dth otrigReader src added to trigger's virtual orig readerList, TASK:%" PRIx64 " nodeId:%d", i, addr.taskId, addr.nodeId);
  }

  if (NULL == pCtx->pRsp->rsps.rspList) {
    pCtx->pRsp->rsps.rspList = taosArrayInit(2, sizeof(SStreamMgmtRsp));
    TSDB_CHECK_NULL(pCtx->pRsp->rsps.rspList, code, lino, _exit, terrno);
  }

  TSDB_CHECK_NULL(taosArrayPush(pCtx->pRsp->rsps.rspList, &rsp), code, lino, _exit, terrno);

_exit:

  tFreeSStreamMgmtReq(pMgmtReq);
  taosMemoryFree(pMgmtReq);

  tSimpleHashCleanup(pVgs);

  if (code) {
    // the response was not handed over to rspList, release what was built
    mndStreamDestroySStreamMgmtRsp(&rsp);
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  mstDestroyDbVgroupsHash(pDbVgroups);

  return code;
}
4139

4140
/*
 * Dispatch the management request attached to pTask according to its type.
 * STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER is handled by
 * msmProcessDeployOrigReader(); any other type is reported and rejected with
 * TSDB_CODE_MND_STREAM_INTERNAL_ERROR.
 *
 * The caller must make sure pTask->pMgmtReq is not NULL.
 */
int32_t msmHandleTaskMgmtReq(SStmGrpCtx* pCtx, SStmTaskStatusMsg* pTask) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER == pTask->pMgmtReq->type) {
    TAOS_CHECK_EXIT(msmProcessDeployOrigReader(pCtx, pTask));
  } else {
    msttError("Invalid mgmtReq type:%d", pTask->pMgmtReq->type);
    code = TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
4162

4163
/*
 * Process every management request referenced by the heartbeat message.
 *
 * pReq->pStreamReq holds indexes into pReq->pStreamStatus; each referenced
 * task status is passed to msmHandleTaskMgmtReq(). Entries whose task status
 * is missing or carries no request are logged and skipped. The response list
 * is created up front (sized to the request count) if it does not exist yet.
 *
 * Stops at the first failing request and returns its error code.
 */
int32_t msmHandleStreamRequests(SStmGrpCtx* pCtx) {
  int32_t            code = TSDB_CODE_SUCCESS;
  int32_t            lino = 0;
  SStreamHbMsg*      pReq = pCtx->pReq;
  SStmTaskStatusMsg* pTask = NULL;
  int32_t            reqNum = taosArrayGetSize(pReq->pStreamReq);

  if (reqNum > 0) {
    if (NULL == pCtx->pRsp->rsps.rspList) {
      pCtx->pRsp->rsps.rspList = taosArrayInit(reqNum, sizeof(SStreamMgmtRsp));
      TSDB_CHECK_NULL(pCtx->pRsp->rsps.rspList, code, lino, _exit, terrno);
    }
  }

  for (int32_t n = 0; n < reqNum; ++n) {
    int32_t idx = *(int32_t*)taosArrayGet(pReq->pStreamReq, n);

    pTask = (SStmTaskStatusMsg*)taosArrayGet(pReq->pStreamStatus, idx);
    if (NULL == pTask) {
      mstError("idx %d is NULL, reqNum:%d", idx, reqNum);
    } else if (NULL == pTask->pMgmtReq) {
      msttError("idx %d without mgmtReq", idx);
    } else {
      TAOS_CHECK_EXIT(msmHandleTaskMgmtReq(pCtx, pTask));
    }
  }

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
4199

4200
/*
 * Handle one stream heartbeat while the stream manager is in the normal state.
 *
 * Steps, in order, each aborting the whole handler on failure:
 *   1. refresh the dnode (and, if the sender has one, snode) timestamps;
 *   2. if the action queue is not empty and its latch can be taken without
 *      waiting, process the queued stream actions;
 *   3. if the message carries management requests, take the action-queue latch
 *      through mstWaitLock() and process them;
 *   4. add pending vgroup task deployments, or else refresh vgroup timestamps;
 *   5. add pending snode task deployments when the sender has an snode;
 *   6. add stream deployments collected in pCtx->deployStm to the response;
 *   7. process the reported task status;
 *   8. run the post actions collected in pCtx->actionStm.
 */
int32_t msmNormalHandleHbMsg(SStmGrpCtx* pCtx) {
  int32_t       code = TSDB_CODE_SUCCESS;
  int32_t       lino = 0;
  SStreamHbMsg* pReq = pCtx->pReq;

  TAOS_CHECK_EXIT(msmCheckUpdateDnodeTs(pCtx));
  if (GOT_SNODE(pReq->snodeId)) {
    TAOS_CHECK_EXIT(msmUpdateSnodeUpTs(pCtx));
  }

  // queued actions: best effort, skipped when the latch is busy
  if (atomic_load_64(&mStreamMgmt.actionQ->qRemainNum) > 0) {
    if (0 == taosWTryLockLatch(&mStreamMgmt.actionQLock)) {
      code = msmHandleStreamActions(pCtx);
      taosWUnLockLatch(&mStreamMgmt.actionQLock);
      TAOS_CHECK_EXIT(code);
    }
  }

  // management requests sent by the tasks themselves
  if (taosArrayGetSize(pReq->pStreamReq) > 0) {
    if (mstWaitLock(&mStreamMgmt.actionQLock, false)) {
      code = msmHandleStreamRequests(pCtx);
      taosWUnLockLatch(&mStreamMgmt.actionQLock);
      TAOS_CHECK_EXIT(code);
    }
  }

  if (atomic_load_32(&mStreamMgmt.toDeployVgTaskNum) <= 0) {
    TAOS_CHECK_EXIT(msmUpdateVgroupsUpTs(pCtx));
  } else {
    TAOS_CHECK_EXIT(msmGrpAddDeployVgTasks(pCtx));
  }

  if (atomic_load_32(&mStreamMgmt.toDeploySnodeTaskNum) > 0) {
    if (GOT_SNODE(pReq->snodeId)) {
      TAOS_CHECK_EXIT(msmGrpAddDeploySnodeTasks(pCtx));
    }
  }

  if (taosHashGetSize(pCtx->deployStm) > 0) {
    TAOS_CHECK_EXIT(msmRspAddStreamsDeploy(pCtx));
  }

  if (taosArrayGetSize(pReq->pStreamStatus) > 0) {
    TAOS_CHECK_EXIT(msmNormalHandleStatusUpdate(pCtx));
  }

  if (taosHashGetSize(pCtx->actionStm) > 0) {
    TAOS_CHECK_EXIT(msmHandleHbPostActions(pCtx));
  }

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
4252

4253
/*
 * Fill pMsg as the RPC reply for a stream heartbeat.
 *
 * pMsg->code and pMsg->info are always set. When the incoming code is success,
 * pRsp is serialized behind an SStreamMsgGrpHeader (carrying pRsp->streamGId)
 * into a buffer from rpcMallocCont(); on success pMsg->pCont / pMsg->contLen
 * receive that buffer and its length. If sizing, allocation or encoding fails
 * the buffer is released and only the error code is reported.
 */
void msmEncodeStreamHbRsp(int32_t code, SRpcHandleInfo *pRpcInfo, SMStreamHbRspMsg* pRsp, SRpcMsg* pMsg) {
  int32_t  lino = 0;
  int32_t  tlen = 0;
  void    *pCont = NULL;
  void    *pBody = NULL;
  SEncoder encoder;

  // the handler already failed: reply with the error only
  if (TSDB_CODE_SUCCESS != code) {
    goto _exit;
  }

  // pass 1: compute the encoded size
  tEncodeSize(tEncodeStreamHbRsp, pRsp, tlen, code);
  if (code < 0) {
    mstError("encode stream hb msg rsp failed, code:%s", tstrerror(code));
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  pCont = rpcMallocCont(tlen + sizeof(SStreamMsgGrpHeader));
  if (NULL == pCont) {
    mstError("encode stream hb msg rsp failed, code:%s", tstrerror(terrno));
    TAOS_CHECK_EXIT(terrno);
  }

  ((SStreamMsgGrpHeader *)pCont)->streamGid = pRsp->streamGId;
  pBody = POINTER_SHIFT(pCont, sizeof(SStreamMsgGrpHeader));

  // pass 2: encode right after the group header
  tEncoderInit(&encoder, pBody, tlen);
  code = tEncodeStreamHbRsp(&encoder, pRsp);
  if (code < 0) {
    rpcFreeCont(pCont);
    pCont = NULL;
    tEncoderClear(&encoder);
    mstError("encode stream hb msg rsp failed, code:%s", tstrerror(code));
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }
  tEncoderClear(&encoder);

_exit:

  pMsg->code = code;
  pMsg->info = *pRpcInfo;
  if (TSDB_CODE_SUCCESS == code) {
    pMsg->contLen = tlen + sizeof(SStreamMsgGrpHeader);
    pMsg->pCont = pCont;
  }
}
32,324✔
4297

4298

4299
/*
 * Entry point for a stream heartbeat message on the mnode.
 *
 * When the stream manager is not active the message is ignored and success is
 * returned without touching pRspMsg. Otherwise, with the runtime latch held in
 * read mode, the group context selected by pHb->streamGId is prepared, the
 * message is routed to the watch-state or normal-state handler, the reply is
 * encoded into pRspMsg, and the per-message context and to-deploy maps are
 * cleaned up.
 *
 * Returns the handler's result code (TSDB_CODE_MND_STREAM_INTERNAL_ERROR for an
 * unknown runtime state).
 */
int32_t msmHandleStreamHbMsg(SMnode* pMnode, int64_t currTs, SStreamHbMsg* pHb, SRpcMsg *pReq, SRpcMsg* pRspMsg) {
  int32_t code = TSDB_CODE_SUCCESS;

  if (0 == atomic_load_8(&mStreamMgmt.active)) {
    mstWarn("mnode stream is NOT active, ignore stream hb from dnode %d streamGid %d", pHb->dnodeId, pHb->streamGId);
    return code;
  }

  mstWaitLock(&mStreamMgmt.runtimeLock, true);

  SMStreamHbRspMsg rsp = {0};
  int32_t          tidx = streamGetThreadIdx(mStreamMgmt.threadNum, pHb->streamGId);
  SStmGrpCtx*      pCtx = &mStreamMgmt.tCtx[tidx].grpCtx[pHb->streamGId];

  // bind this message to the group context
  pCtx->tidx = tidx;
  pCtx->pMnode = pMnode;
  pCtx->currTs = currTs;
  pCtx->pReq = pHb;
  pCtx->pRsp = &rsp;
  pCtx->deployStm = mStreamMgmt.tCtx[tidx].deployStm[pHb->streamGId];
  pCtx->actionStm = mStreamMgmt.tCtx[tidx].actionStm[pHb->streamGId];

  int8_t state = atomic_load_8(&mStreamMgmt.state);
  if (MND_STM_STATE_NORMAL == state) {
    code = msmNormalHandleHbMsg(pCtx);
  } else if (MND_STM_STATE_WATCH == state) {
    code = msmWatchHandleHbMsg(pCtx);
  } else {
    mstError("Invalid stream state: %d", mStreamMgmt.state);
    code = TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }

  msmEncodeStreamHbRsp(code, &pReq->info, &rsp, pRspMsg);

  msmCleanStreamGrpCtx(pHb);
  msmClearStreamToDeployMaps(pHb);

  taosRUnLockLatch(&mStreamMgmt.runtimeLock);

  // rsp has been serialized into pRspMsg above; release the in-memory form
  tFreeSMStreamHbRspMsg(&rsp);

  return code;
}
4345

4346
/*
 * Called when this mnode becomes leader. Unless streams are disabled
 * (tsDisableStream), the runtime information is destroyed and rebuilt under the
 * runtime write latch, and the stream manager is then marked active.
 */
void msmHandleBecomeLeader(SMnode *pMnode) {
  if (!tsDisableStream) {
    mstInfo("start to process mnode become leader");

    streamAddVnodeLeader(MNODE_HANDLE);

    // rebuild the runtime state from scratch while no reader can observe it
    taosWLockLatch(&mStreamMgmt.runtimeLock);
    msmDestroyRuntimeInfo(pMnode);
    msmInitRuntimeInfo(pMnode);
    taosWUnLockLatch(&mStreamMgmt.runtimeLock);

    atomic_store_8(&mStreamMgmt.active, 1);
  }
}
4362

4363
/*
 * Called when this mnode stops being leader. Unless streams are disabled
 * (tsDisableStream), the active flag is switched from 1 to 0 and, if the
 * compare-exchange reports a non-zero previous value, the runtime information
 * is destroyed under the runtime write latch and stat.inactiveTimes is bumped.
 */
void msmHandleBecomeNotLeader(SMnode *pMnode) {
  if (!tsDisableStream) {
    mstInfo("start to process mnode become not leader");

    streamRemoveVnodeLeader(MNODE_HANDLE);

    int8_t prevActive = atomic_val_compare_exchange_8(&mStreamMgmt.active, 1, 0);
    if (prevActive) {
      taosWLockLatch(&mStreamMgmt.runtimeLock);
      msmDestroyRuntimeInfo(pMnode);
      mStreamMgmt.stat.inactiveTimes++;
      taosWUnLockLatch(&mStreamMgmt.runtimeLock);
    }
  }
}
4379

4380

4381
/*
 * Request a redeploy of a stopped stream: if pStatus->stopped can be switched
 * from 1 to 0, a STREAM_ACT_DEPLOY action for the stream is posted to the action
 * queue; otherwise the flag's current value is logged and nothing is posted.
 */
static void msmRedeployStream(int64_t streamId, SStmStatus* pStatus) {
  if (1 != atomic_val_compare_exchange_8(&pStatus->stopped, 1, 0)) {
    mstsWarn("stream stopped %d already changed", atomic_load_8(&pStatus->stopped));
    return;
  }

  mstsInfo("try to reset and redeploy stream, deployTimes:%" PRId64, pStatus->deployTimes);
  mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStatus->streamName, NULL, false, STREAM_ACT_DEPLOY);
}
×
4389

4390
/*
 * sdbTraverse() callback: collect the streams whose main snode is the given
 * snode.
 *
 *   pObj - SStreamObj* being visited
 *   p1   - SSnodeObj*  snode to match against pStream->mainSnodeId
 *   p2   - SArray**    result array of SStreamObj*, created on first match
 *   p3   - int32_t*    receives the error code when the callback fails
 *
 * Returns true to keep traversing, false after an allocation failure.
 */
static bool msmCheckStreamAssign(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  SStreamObj* pStream = pObj;
  SSnodeObj*  pSnode = p1;
  SArray**    ppRes = p2;

  // stream belongs to another snode, nothing to record
  if (pStream->mainSnodeId != pSnode->id) {
    return true;
  }

  if (NULL == *ppRes) {
    int32_t streamNum = sdbGetSize(pMnode->pSdb, SDB_STREAM);
    *ppRes = taosArrayInit(streamNum, POINTER_BYTES);
    TSDB_CHECK_NULL(*ppRes, code, lino, _exit, terrno);
  }

  TSDB_CHECK_NULL(taosArrayPush(*ppRes, &pStream), code, lino, _exit, terrno);

  return true;

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  *(int32_t*)p3 = code;

  return false;
}
4419

4420

4421
/*
 * Collect into *ppRes every stream whose main snode is pSnode and verify that
 * those streams can be reassigned.
 *
 * Returns TSDB_CODE_MND_STREAM_SNODE_IN_USE when at least one stream is
 * assigned but pSnode->replicaId is 0, the traversal's error code when
 * collecting fails, and TSDB_CODE_SUCCESS otherwise.
 */
int32_t msmCheckSnodeReassign(SMnode *pMnode, SSnodeObj* pSnode, SArray** ppRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t streamNum = 0;

  sdbTraverse(pMnode->pSdb, SDB_STREAM, msmCheckStreamAssign, pSnode, ppRes, &code);
  TAOS_CHECK_EXIT(code);

  streamNum = taosArrayGetSize(*ppRes);
  if (streamNum > 0) {
    if (0 == pSnode->replicaId) {
      mstError("snode %d has no replica while %d streams assigned", pSnode->id, streamNum);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_SNODE_IN_USE);
    }
  }

  //STREAMTODO CHECK REPLICA UPDATED OR NOT

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
4444

4445
/*
 * sdbTraverse() callback used by the periodic status check: reconcile one
 * stream object from the sdb with mStreamMgmt.streamMap.
 *
 * Stream NOT in the map:
 *   - dropped/stopped by the user          -> ignored;
 *   - static isolation time not yet passed -> ignored;
 *   - otherwise                            -> a STREAM_ACT_DEPLOY action is posted.
 * Stream in the map:
 *   - isolation time not yet passed        -> ignored;
 *   - dropped/stopped by the user (either flag on the stream object or the
 *     user-stopped state in the map entry) -> removed from the maps.
 *
 * Always returns true so the traversal visits every stream.
 */
static bool msmCheckLoopStreamSdb(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SStreamObj*         pStream = pObj;
  int64_t             streamId = pStream->pCreate->streamId;
  SStmStatus*         pStatus = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  SStmCheckStatusCtx* pCtx = (SStmCheckStatusCtx*)p1;
  int8_t              userDropped = atomic_load_8(&pStream->userDropped);
  int8_t              userStopped = atomic_load_8(&pStream->userStopped);

  if (NULL == pStatus) {
    if (userDropped || userStopped) {
      mstsDebug("stream userDropped %d userStopped %d and not in streamMap, ignore it", userDropped, userStopped);
      return true;
    }

    if (!MST_STM_STATIC_PASS_ISOLATION(pStream)) {
      mstsDebug("stream not pass static isolation time, updateTime:%" PRId64 ", currentTs %" PRId64 ", ignore check it", 
          pStream->updateTime, mStreamMgmt.hCtx.currentTs);
      return true;
    }

    // known in the sdb but not running here: ask for a deploy
    mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStream->pCreate->name, NULL, false, STREAM_ACT_DEPLOY);
    return true;
  }

  if (!MST_STM_PASS_ISOLATION(pStream, pStatus)) {
    mstsDebug("stream not pass isolation time, updateTime:%" PRId64 ", lastActionTs:%" PRId64 ", currentTs %" PRId64 ", ignore check it", 
        pStream->updateTime, pStatus->lastActionTs, mStreamMgmt.hCtx.currentTs);
    return true;
  }

  if (userDropped || userStopped || MST_IS_USER_STOPPED(atomic_load_8(&pStatus->stopped))) {
    (void)msmRemoveStreamFromMaps(pMnode, streamId);
  }

  return true;
}
4481

4482
/*
 * Periodic pass over mStreamMgmt.streamMap. For every stream entry:
 *   - stopped by the user            -> removed from the maps;
 *   - no longer present in the sdb   -> removed from the maps;
 *   - stopped by an error            -> a STREAM_ACT_DEPLOY action is posted
 *                                       once fatalRetryTs has been reached;
 *                                       before that only the STM_EVENT_STM_TERR
 *                                       timestamp is refreshed;
 *   - stopped by grant and grantCheckExpire(TSDB_GRANT_STREAMS) succeeds
 *                                    -> a STREAM_ACT_DEPLOY action is posted.
 */
void msmCheckLoopStreamMap(SMnode *pMnode) {
  SStmStatus* pStatus = NULL;
  void*       pIter = NULL;
  int8_t      stopped = 0;
  int64_t     streamId = 0;

  while (NULL != (pIter = taosHashIterate(mStreamMgmt.streamMap, pIter))) {
    pStatus = (SStmStatus*)pIter;
    streamId = *(int64_t*)taosHashGetKey(pIter, NULL);
    stopped = atomic_load_8(&pStatus->stopped);

    if (MST_IS_USER_STOPPED(stopped)) {
      mstsDebug("stream already stopped by user, deployTimes:%" PRId64, pStatus->deployTimes);
      (void)msmRemoveStreamFromMaps(pMnode, streamId);
      continue;
    }

    if (!sdbCheckExists(pMnode->pSdb, SDB_STREAM, pStatus->streamName)) {
      mstsDebug("stream already not exists, deployTimes:%" PRId64, pStatus->deployTimes);
      (void)msmRemoveStreamFromMaps(pMnode, streamId);
      continue;
    }

    if (MST_IS_ERROR_STOPPED(stopped)) {
      if (mStreamMgmt.hCtx.currentTs >= pStatus->fatalRetryTs) {
        // retry time reached: try to deploy the stream again
        mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStatus->streamName, NULL, false, STREAM_ACT_DEPLOY);
        continue;
      }

      mstsDebug("stream already stopped by error %s, retried times:%" PRId64 ", next time not reached, currTs:%" PRId64 ", nextRetryTs:%" PRId64,
          tstrerror(pStatus->fatalError), pStatus->fatalRetryTimes, mStreamMgmt.hCtx.currentTs, pStatus->fatalRetryTs);

      MND_STREAM_SET_LAST_TS(STM_EVENT_STM_TERR, mStreamMgmt.hCtx.currentTs);
      continue;
    }

    if (MST_IS_GRANT_STOPPED(stopped) && TSDB_CODE_SUCCESS == grantCheckExpire(TSDB_GRANT_STREAMS)) {
      mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStatus->streamName, NULL, false, STREAM_ACT_DEPLOY);
    }
  }
}
94✔
4529

4530
/*
 * Run the two periodic stream consistency loops when they are due:
 *   - the sdb loop (MST_READY_FOR_SDB_LOOP) visits every stream object in the
 *     sdb with msmCheckLoopStreamSdb();
 *   - the map loop (MST_READY_FOR_MAP_LOOP) visits every entry of the stream
 *     map with msmCheckLoopStreamMap().
 * After each loop its last-run timestamp is set to hCtx.currentTs.
 */
void msmCheckStreamsStatus(SMnode *pMnode) {
  SStmCheckStatusCtx ctx = {0};
  bool               sdbLoopDue = false;
  bool               mapLoopDue = false;

  mstDebug("start to check streams status, currTs:%" PRId64, mStreamMgmt.hCtx.currentTs);

  sdbLoopDue = MST_READY_FOR_SDB_LOOP();
  if (sdbLoopDue) {
    mstDebug("ready to check sdb loop, lastLoopSdbTs:%" PRId64, mStreamMgmt.lastTs[STM_EVENT_LOOP_SDB].ts);
    sdbTraverse(pMnode->pSdb, SDB_STREAM, msmCheckLoopStreamSdb, &ctx, NULL, NULL);
    MND_STREAM_SET_LAST_TS(STM_EVENT_LOOP_SDB, mStreamMgmt.hCtx.currentTs);
  }

  mapLoopDue = MST_READY_FOR_MAP_LOOP();
  if (mapLoopDue) {
    mstDebug("ready to check map loop, lastLoopMapTs:%" PRId64, mStreamMgmt.lastTs[STM_EVENT_LOOP_MAP].ts);
    msmCheckLoopStreamMap(pMnode);
    MND_STREAM_SET_LAST_TS(STM_EVENT_LOOP_MAP, mStreamMgmt.hCtx.currentTs);
  }
}
494✔
4547

4548
/*
 * Check a list of taskNum task-status pointers belonging to stream streamId for
 * tasks whose status has not been updated within the isolation time.
 *
 * Tasks of a stopped stream and tasks that have not passed the isolation time
 * are skipped. For a silent runner or trigger task the whole stream is stopped
 * with TSDB_CODE_MND_STREAM_TASK_LOST and the scan ends. For any other silent
 * task the serious id is incremented and a STREAM_ACT_DEPLOY task action is
 * posted to the action queue.
 */
void msmCheckTaskListStatus(int64_t streamId, SStmTaskStatus** pList, int32_t taskNum) {
  for (int32_t idx = 0; idx < taskNum; ++idx) {
    SStmTaskStatus* pTask = pList[idx];

    bool streamStopped = atomic_load_8(&((SStmStatus*)pTask->pStream)->stopped);
    if (streamStopped || !MST_PASS_ISOLATION(pTask->lastUpTs, 1)) {
      continue;
    }

    int64_t noUpTs = mStreamMgmt.hCtx.currentTs - pTask->lastUpTs;

    if (STREAM_RUNNER_TASK == pTask->type || STREAM_TRIGGER_TASK == pTask->type) {
      mstsWarn("%s TASK:%" PRIx64 " status not updated for %" PRId64 "ms, will try to redeploy it", 
          gStreamTaskTypeStr[pTask->type], pTask->id.taskId, noUpTs);

      // losing a runner/trigger task takes the whole stream down
      msmStopStreamByError(streamId, NULL, TSDB_CODE_MND_STREAM_TASK_LOST, mStreamMgmt.hCtx.currentTs);
      return;
    }

    mstsInfo("%s TASK:%" PRIx64 " status not updated for %" PRId64 "ms, will try to redeploy it", 
        gStreamTaskTypeStr[pTask->type], pTask->id.taskId, noUpTs);

    int64_t newSid = atomic_add_fetch_64(&pTask->id.seriousId, 1);
    mstsDebug("task %" PRIx64 " SID updated to %" PRIx64, pTask->id.taskId, newSid);

    // redeploy only this task
    SStmTaskAction task = {0};
    task.streamId = streamId;
    task.id = pTask->id;
    task.flag = pTask->flags;
    task.type = pTask->type;

    mstPostTaskAction(mStreamMgmt.actionQ, &task, STREAM_ACT_DEPLOY);
  }
}
5,919✔
4584

4585
/*
 * For every stream entry of a vgroup's stream-task hash, run
 * msmCheckTaskListStatus() on its trigger readers and on its calc readers
 * (each only when the respective array is not empty).
 */
void msmCheckVgroupStreamStatus(SHashObj* pStreams) {
  void* pIter = NULL;

  while (NULL != (pIter = taosHashIterate(pStreams, pIter))) {
    int64_t             streamId = *(int64_t*)taosHashGetKey(pIter, NULL);
    SStmVgStreamStatus* pVg = (SStmVgStreamStatus*)pIter;

    int32_t trigNum = taosArrayGetSize(pVg->trigReaders);
    if (trigNum > 0) {
      msmCheckTaskListStatus(streamId, taosArrayGet(pVg->trigReaders, 0), trigNum);
    }

    int32_t calcNum = taosArrayGetSize(pVg->calcReaders);
    if (calcNum > 0) {
      msmCheckTaskListStatus(streamId, taosArrayGet(pVg->calcReaders, 0), calcNum);
    }
  }
}
618✔
4610

4611
/*
 * React to a vgroup that stopped reporting. While the vgroup has not passed
 * five isolation periods since pVg->lastUpTs nothing is done. After that every
 * stream with tasks on the vgroup is stopped with
 * TSDB_CODE_MND_STREAM_VGROUP_LOST and the vgroup's stream-task hash is cleared.
 */
void msmHandleVgroupLost(SMnode *pMnode, int32_t vgId, SStmVgroupStatus* pVg) {
  void* pIter = NULL;

  if (!MST_PASS_ISOLATION(pVg->lastUpTs, 5)) {
    mstDebug("vgroup %d lost and still in watch time, lastUpTs:%" PRId64 ", streamNum:%d", vgId, pVg->lastUpTs, (int32_t)taosHashGetSize(pVg->streamTasks));
    return;
  }

  while (NULL != (pIter = taosHashIterate(pVg->streamTasks, pIter))) {
    int64_t streamId = *(int64_t*)taosHashGetKey(pIter, NULL);

    msmStopStreamByError(streamId, NULL, TSDB_CODE_MND_STREAM_VGROUP_LOST, mStreamMgmt.hCtx.currentTs);
  }

  taosHashClear(pVg->streamTasks);
}
4635

4636

4637
/*
 * Check the vgroups of mStreamMgmt.vgroupMap that fall into the current slot
 * (vgId % MND_STREAM_ISOLATION_PERIOD_NUM == hCtx.slotIdx). A vgroup that has
 * passed one isolation period since its lastUpTs is treated as lost; for all
 * others the status of their stream tasks is checked.
 */
void msmCheckVgroupStatus(SMnode *pMnode) {
  void* pIter = NULL;

  while (NULL != (pIter = taosHashIterate(mStreamMgmt.vgroupMap, pIter))) {
    int32_t vgId = *(int32_t*)taosHashGetKey(pIter, NULL);

    // only the vgroups of the current slot are inspected in this round
    if ((vgId % MND_STREAM_ISOLATION_PERIOD_NUM) != mStreamMgmt.hCtx.slotIdx) {
      continue;
    }

    SStmVgroupStatus* pVg = (SStmVgroupStatus*)pIter;

    if (!MST_PASS_ISOLATION(pVg->lastUpTs, 1)) {
      mstDebug("vgroup %d online, try to check tasks status, currTs:%" PRId64 ", lastUpTs:%" PRId64, vgId, mStreamMgmt.hCtx.currentTs, pVg->lastUpTs);

      msmCheckVgroupStreamStatus(pVg->streamTasks);
      continue;
    }

    mstWarn("vgroup %d lost, lastUpTs:%" PRId64 ", streamNum:%d", vgId, pVg->lastUpTs, (int32_t)taosHashGetSize(pVg->streamTasks));

    msmHandleVgroupLost(pMnode, vgId, pVg);
  }
}
494✔
4665

4666
/*
 * Prepare the runner deployments of one stream on a lost snode for redeploy.
 *
 * For every non-empty slot of pStream->runners the serious id of each runner
 * task is incremented and the slot index is appended to deployId; *deployNum
 * receives the number of indexes written. If any task's stream turns out to be
 * stopped, *deployNum is reset to 0 and the function returns immediately
 * (serious ids incremented so far stay incremented).
 *
 * deployId must have room for MND_STREAM_RUNNER_DEPLOY_NUM entries.
 */
void msmHandleRunnerRedeploy(int64_t streamId, SStmSnodeStreamStatus* pStream, int32_t* deployNum, int32_t* deployId) {
  *deployNum = 0;

  for (int32_t slot = 0; slot < MND_STREAM_RUNNER_DEPLOY_NUM; ++slot) {
    if (!pStream->runners[slot]) {
      continue;
    }

    int32_t taskNum = taosArrayGetSize(pStream->runners[slot]);
    for (int32_t t = 0; t < taskNum; ++t) {
      SStmTaskStatus* pTask = taosArrayGetP(pStream->runners[slot], t);
      int8_t          stopped = atomic_load_8(&((SStmStatus*)pTask->pStream)->stopped);

      if (stopped) {
        mstsDebug("stream already stopped %d, ignore it", stopped);
        *deployNum = 0;
        return;
      }

      int64_t newSid = atomic_add_fetch_64(&pTask->id.seriousId, 1);
      mstsDebug("task %" PRIx64 " SID updated to %" PRIx64, pTask->id.taskId, newSid);
    }

    deployId[*deployNum] = slot;
    (*deployNum)++;
  }
}
4690

4691
/*
 * React to a lost snode: mark it as having no runner threads, refresh the
 * snode map, then walk every stream that had tasks on the snode.
 *   - stream with a trigger task here: stop the stream with
 *     TSDB_CODE_MND_STREAM_SNODE_LOST (skipped when it is already stopped);
 *   - stream with only runner tasks here: post a STREAM_ACT_DEPLOY action for
 *     the runner slots reported by msmHandleRunnerRedeploy().
 * Finally the snode's streamTasks hash is cleared.
 */
void msmHandleSnodeLost(SMnode *pMnode, SStmSnodeStatus* pSnode) {
  SStmTaskAction task = {0};

  pSnode->runnerThreadNum = -1;

  // best effort: the return code is deliberately ignored
  (void)msmSTAddSnodesToMap(pMnode);

  for (void* pIter = taosHashIterate(pSnode->streamTasks, NULL); NULL != pIter;
       pIter = taosHashIterate(pSnode->streamTasks, pIter)) {
    int64_t                streamId = *(int64_t*)taosHashGetKey(pIter, NULL);
    SStmSnodeStreamStatus* pStream = (SStmSnodeStreamStatus*)pIter;

    task.streamId = streamId;

    if (!pStream->trigger) {
      // only runner tasks lived on this snode
      msmHandleRunnerRedeploy(streamId, pStream, &task.deployNum, task.deployId);
      if (task.deployNum <= 0) {
        continue;
      }

      //task.triggerStatus = pStream->trigger;
      task.multiRunner = true;
      task.type = STREAM_RUNNER_TASK;

      mstPostTaskAction(mStreamMgmt.actionQ, &task, STREAM_ACT_DEPLOY);

      mstsInfo("runner tasks %d redeploys added to actionQ", task.deployNum);
      continue;
    }

    int8_t stopped = atomic_load_8(&((SStmStatus*)pStream->trigger->pStream)->stopped);
    if (stopped) {
      mstsDebug("stream already stopped %d, ignore it", stopped);
      continue;
    }

    mstsInfo("snode lost with trigger task %" PRIx64 ", will try to restart current stream", pStream->trigger->id.taskId);

    msmStopStreamByError(streamId, NULL, TSDB_CODE_MND_STREAM_SNODE_LOST, mStreamMgmt.hCtx.currentTs);
  }

  taosHashClear(pSnode->streamTasks);
}
33✔
4740

4741

4742
/*
 * Check the task status of every stream recorded for one snode.
 *
 * pStreams - hash keyed by streamId (int64_t) whose values are
 *            SStmSnodeStreamStatus entries.
 * For each entry the trigger task (if any) and every non-empty runner task
 * list are handed to msmCheckTaskListStatus().
 */
void msmCheckSnodeStreamStatus(SHashObj* pStreams) {
  for (void* pIter = taosHashIterate(pStreams, NULL); NULL != pIter; pIter = taosHashIterate(pStreams, pIter)) {
    int64_t                streamId = *(int64_t*)taosHashGetKey(pIter, NULL);
    SStmSnodeStreamStatus* pEntry = (SStmSnodeStreamStatus*)pIter;

    if (pEntry->trigger != NULL) {
      // a single trigger task is passed as a one-element list
      msmCheckTaskListStatus(streamId, &pEntry->trigger, 1);
    }

    for (int32_t slot = 0; slot < MND_STREAM_RUNNER_DEPLOY_NUM; ++slot) {
      int32_t runnerNum = taosArrayGetSize(pEntry->runners[slot]);
      if (runnerNum <= 0) {
        continue;
      }

      msmCheckTaskListStatus(streamId, taosArrayGet(pEntry->runners[slot], 0), runnerNum);
    }
  }
}
245✔
4768

4769

4770
/*
 * Health-check the snodes that fall into the current check slot.
 *
 * Only snodes whose id modulo MND_STREAM_ISOLATION_PERIOD_NUM equals
 * mStreamMgmt.hCtx.slotIdx are examined in this round. A snode without a
 * streamTasks hash is skipped; a snode for which MST_PASS_ISOLATION() holds on
 * its lastUpTs is treated as lost, otherwise its stream tasks are checked.
 */
void msmCheckSnodeStatus(SMnode *pMnode) {
  for (void* pIter = taosHashIterate(mStreamMgmt.snodeMap, NULL); NULL != pIter;
       pIter = taosHashIterate(mStreamMgmt.snodeMap, pIter)) {
    int32_t snodeId = *(int32_t*)taosHashGetKey(pIter, NULL);
    if ((snodeId % MND_STREAM_ISOLATION_PERIOD_NUM) != mStreamMgmt.hCtx.slotIdx) {
      continue;  // not this round's slot
    }

    mstDebug("start to check snode %d status, currTs:%" PRId64, snodeId, mStreamMgmt.hCtx.currentTs);

    SStmSnodeStatus* pSnode = (SStmSnodeStatus*)pIter;
    if (NULL == pSnode->streamTasks) {
      mstDebug("ignore snode %d health check since empty tasks", snodeId);
      continue;
    }

    if (MST_PASS_ISOLATION(pSnode->lastUpTs, 1)) {
      mstInfo("snode %d lost, lastUpTs:%" PRId64 ", runnerThreadNum:%d, streamNum:%d", 
          snodeId, pSnode->lastUpTs, pSnode->runnerThreadNum, (int32_t)taosHashGetSize(pSnode->streamTasks));

      msmHandleSnodeLost(pMnode, pSnode);
    } else {
      mstDebug("snode %d online, try to check tasks status, currTs:%" PRId64 ", lastUpTs:%" PRId64, snodeId, mStreamMgmt.hCtx.currentTs, pSnode->lastUpTs);

      msmCheckSnodeStreamStatus(pSnode->streamTasks);
    }
  }
}
494✔
4805

4806

4807
/*
 * Run the per-node task status checks for one health-check round:
 * vgroups first, then snodes.
 */
void msmCheckTasksStatus(SMnode *pMnode) {
  mstDebug("start to check tasks status, currTs:%" PRId64, mStreamMgmt.hCtx.currentTs);

  // tasks hosted on vgroups
  msmCheckVgroupStatus(pMnode);

  // tasks hosted on snodes
  msmCheckSnodeStatus(pMnode);
}
494✔
4813

4814
/*
 * Reconcile the runtime snode map with the snodes stored in the SDB.
 *
 * Runs only when MST_READY_FOR_SNODE_LOOP() holds. Every snode in
 * mStreamMgmt.snodeMap that no longer exists in SDB_SNODE is either removed
 * from the map (when it has no streamTasks hash) or handled as a lost snode.
 */
void msmCheckSnodesState(SMnode *pMnode) {
  if (!MST_READY_FOR_SNODE_LOOP()) {
    return;
  }

  mstDebug("ready to check snode loop, lastTs:%" PRId64, mStreamMgmt.lastTs[STM_EVENT_LOOP_SNODE].ts);

  for (void* pIter = taosHashIterate(mStreamMgmt.snodeMap, NULL); NULL != pIter;
       pIter = taosHashIterate(mStreamMgmt.snodeMap, pIter)) {
    int32_t snodeId = *(int32_t*)taosHashGetKey(pIter, NULL);
    if (sdbCheckExists(pMnode->pSdb, SDB_SNODE, &snodeId)) {
      continue;  // snode still registered, nothing to do
    }

    SStmSnodeStatus* pSnode = (SStmSnodeStatus*)pIter;
    if (NULL == pSnode->streamTasks) {
      mstDebug("snode %d already cleanup, try to rm it", snodeId);
      taosHashRemove(mStreamMgmt.snodeMap, &snodeId, sizeof(snodeId));
      continue;
    }

    mstWarn("snode %d lost while streams remain, will redeploy all and rm it, lastUpTs:%" PRId64 ", runnerThreadNum:%d, streamNum:%d", 
        snodeId, pSnode->lastUpTs, pSnode->runnerThreadNum, (int32_t)taosHashGetSize(pSnode->streamTasks));

    msmHandleSnodeLost(pMnode, pSnode);
  }

  // NOTE(review): the gate and the debug log above refer to STM_EVENT_LOOP_SNODE,
  // but the timestamp recorded here is STM_EVENT_LOOP_MAP - confirm this is intended.
  MND_STREAM_SET_LAST_TS(STM_EVENT_LOOP_MAP, mStreamMgmt.hCtx.currentTs);
}
4849

4850
/*
 * Periodic stream health check entry point.
 *
 * Does nothing unless stream management is active, in MND_STM_STATE_NORMAL,
 * and at least one stream exists in the SDB. Otherwise it advances the check
 * slot, stamps the current time, and runs the stream, task and snode checks
 * while holding mStreamMgmt.runtimeLock (released with taosWUnLockLatch).
 */
void msmHealthCheck(SMnode *pMnode) {
  int8_t active = atomic_load_8(&mStreamMgmt.active);
  int8_t state = atomic_load_8(&mStreamMgmt.state);

  if (0 == active || MND_STM_STATE_NORMAL != state) {
    mstTrace("ignore health check since active:%d state:%d", active, state);
    return;
  }

  if (sdbGetSize(pMnode->pSdb, SDB_STREAM) <= 0) {
    mstTrace("ignore health check since no stream now");
    return;
  }

  // rotate to the next slot so each round covers a different subset of nodes
  mStreamMgmt.hCtx.slotIdx = (mStreamMgmt.hCtx.slotIdx + 1) % MND_STREAM_ISOLATION_PERIOD_NUM;
  mStreamMgmt.hCtx.currentTs = taosGetTimestampMs();

  mstDebug("start health check, soltIdx:%d, checkStartTs:%" PRId64, mStreamMgmt.hCtx.slotIdx, mStreamMgmt.hCtx.currentTs);

  mstWaitLock(&mStreamMgmt.runtimeLock, false);

  msmCheckStreamsStatus(pMnode);
  msmCheckTasksStatus(pMnode);
  msmCheckSnodesState(pMnode);

  taosWUnLockLatch(&mStreamMgmt.runtimeLock);

  mstDebug("end health check, soltIdx:%d, checkStartTs:%" PRId64, mStreamMgmt.hCtx.slotIdx, mStreamMgmt.hCtx.currentTs);
}
4877

4878
/*
 * sdbTraverse() callback: stamp the update time on every stream that is
 * neither user-dropped nor user-stopped, and count such streams.
 *
 * pObj - the SStreamObj being visited
 * p1   - int64_t*: timestamp to store into pStream->updateTime
 * p2   - int32_t*: counter incremented for every stream that is stamped
 * pMnode and p3 are unused.
 * Always returns true.
 */
static bool msmUpdateProfileStreams(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SStreamObj *pStream = pObj;
  int32_t    *pActiveNum = (int32_t*)p2;

  bool inactive = atomic_load_8(&pStream->userDropped) || atomic_load_8(&pStream->userStopped);
  if (!inactive) {
    pStream->updateTime = *(int64_t*)p1;
    ++(*pActiveNum);
  }

  return true;
}
4890

4891
/*
 * Look up the address of a stream's running trigger task.
 *
 * pMnode   - mnode handle, used to resolve the dnode epset
 * streamId - stream to look up in mStreamMgmt.streamMap
 * pAddr    - out: taskId, nodeId and epset of the trigger task (written only
 *            on success)
 *
 * Returns 0 on success, or TSDB_CODE_MND_STREAM_NOT_RUNNING when the stream is
 * missing from the map, is stopped, or its trigger task is absent or not in
 * STREAM_STATUS_RUNNING. mStreamMgmt.runtimeLock is taken for the lookup and
 * released with taosRUnLockLatch before returning.
 */
int32_t msmGetTriggerTaskAddr(SMnode *pMnode, int64_t streamId, SStreamTaskAddr* pAddr) {
  int32_t code = 0;
  int8_t  stopped = 0;

  mstWaitLock(&mStreamMgmt.runtimeLock, true);

  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL == pStatus) {
    mstsError("stream not exists in streamMap, streamRemains:%d", taosHashGetSize(mStreamMgmt.streamMap));
    code = TSDB_CODE_MND_STREAM_NOT_RUNNING;
  } else {
    stopped = atomic_load_8(&pStatus->stopped);
    if (stopped) {
      mstsError("stream already stopped, stopped:%d", stopped);
      code = TSDB_CODE_MND_STREAM_NOT_RUNNING;
    } else if (pStatus->triggerTask && STREAM_STATUS_RUNNING == pStatus->triggerTask->status) {
      pAddr->taskId = pStatus->triggerTask->id.taskId;
      pAddr->nodeId = pStatus->triggerTask->id.nodeId;
      pAddr->epset = mndGetDnodeEpsetById(pMnode, pAddr->nodeId);
      mstsDebug("stream trigger task %" PRIx64 " got with nodeId %d", pAddr->taskId, pAddr->nodeId);
    } else {
      mstsError("trigger task %p not running, status:%s", pStatus->triggerTask, pStatus->triggerTask ? gStreamStatusStr[pStatus->triggerTask->status] : "unknown");
      code = TSDB_CODE_MND_STREAM_NOT_RUNNING;
    }
  }

  taosRUnLockLatch(&mStreamMgmt.runtimeLock);

  return code;
}
4928

4929
/*
 * Allocate and initialise the mnode stream runtime state in mStreamMgmt:
 * per-thread contexts, the action queue, and the stream/task/vgroup/snode/
 * dnode/to-deploy/to-update hash maps. Snodes and dnodes are then loaded into
 * their maps, and the initial runtime state is chosen: MND_STM_STATE_WATCH
 * when at least one stream is neither user-dropped nor user-stopped,
 * otherwise MND_STM_STATE_NORMAL.
 *
 * Returns TSDB_CODE_SUCCESS, or the failing error code after tearing down the
 * partially built state via msmDestroyRuntimeInfo().
 *
 * Every failure path records its source line in `lino`, so the final
 * "failed at line" log identifies the failing step instead of printing 0.
 */
int32_t msmInitRuntimeInfo(SMnode *pMnode) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t activeStreamNum = 0;
  int32_t vnodeNum = sdbGetSize(pMnode->pSdb, SDB_VGROUP);
  int32_t snodeNum = sdbGetSize(pMnode->pSdb, SDB_SNODE);
  int32_t dnodeNum = sdbGetSize(pMnode->pSdb, SDB_DNODE);

  MND_STREAM_SET_LAST_TS(STM_EVENT_ACTIVE_BEGIN, taosGetTimestampMs());

  mStreamMgmt.stat.activeTimes++;
  mStreamMgmt.threadNum = tsNumOfMnodeStreamMgmtThreads;
  mStreamMgmt.tCtx = taosMemoryCalloc(mStreamMgmt.threadNum, sizeof(SStmThreadCtx));
  if (NULL == mStreamMgmt.tCtx) {
    code = terrno;
    lino = __LINE__;
    mstError("failed to initialize the stream runtime tCtx, threadNum:%d, error:%s", mStreamMgmt.threadNum, tstrerror(code));
    goto _exit;
  }

  mStreamMgmt.actionQ = taosMemoryCalloc(1, sizeof(SStmActionQ));
  if (mStreamMgmt.actionQ == NULL) {
    code = terrno;
    lino = __LINE__;
    mError("failed to initialize the stream runtime actionQ, error:%s", tstrerror(code));
    goto _exit;
  }

  // the queue starts with a single node that is both head and tail
  mStreamMgmt.actionQ->head = taosMemoryCalloc(1, sizeof(SStmQNode));
  TSDB_CHECK_NULL(mStreamMgmt.actionQ->head, code, lino, _exit, terrno);

  mStreamMgmt.actionQ->tail = mStreamMgmt.actionQ->head;

  // per-thread, per-group deploy/action maps
  for (int32_t i = 0; i < mStreamMgmt.threadNum; ++i) {
    SStmThreadCtx* pCtx = mStreamMgmt.tCtx + i;

    for (int32_t m = 0; m < STREAM_MAX_GROUP_NUM; ++m) {
      pCtx->deployStm[m] = taosHashInit(snodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
      if (pCtx->deployStm[m] == NULL) {
        code = terrno;
        lino = __LINE__;
        mError("failed to initialize the stream runtime deployStm[%d][%d], error:%s", i, m, tstrerror(code));
        goto _exit;
      }
      taosHashSetFreeFp(pCtx->deployStm[m], tDeepFreeSStmStreamDeploy);

      pCtx->actionStm[m] = taosHashInit(snodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
      if (pCtx->actionStm[m] == NULL) {
        code = terrno;
        lino = __LINE__;
        mError("failed to initialize the stream runtime actionStm[%d][%d], error:%s", i, m, tstrerror(code));
        goto _exit;
      }
      taosHashSetFreeFp(pCtx->actionStm[m], mstDestroySStmAction);
    }
  }

  mStreamMgmt.streamMap = taosHashInit(MND_STREAM_DEFAULT_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  if (mStreamMgmt.streamMap == NULL) {
    code = terrno;
    lino = __LINE__;
    mError("failed to initialize the stream runtime streamMap, error:%s", tstrerror(code));
    goto _exit;
  }
  taosHashSetFreeFp(mStreamMgmt.streamMap, mstDestroySStmStatus);

  // taskMap has no free callback registered here
  mStreamMgmt.taskMap = taosHashInit(MND_STREAM_DEFAULT_TASK_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  if (mStreamMgmt.taskMap == NULL) {
    code = terrno;
    lino = __LINE__;
    mError("failed to initialize the stream runtime taskMap, error:%s", tstrerror(code));
    goto _exit;
  }

  mStreamMgmt.vgroupMap = taosHashInit(vnodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (mStreamMgmt.vgroupMap == NULL) {
    code = terrno;
    lino = __LINE__;
    mError("failed to initialize the stream runtime vgroupMap, error:%s", tstrerror(code));
    goto _exit;
  }
  taosHashSetFreeFp(mStreamMgmt.vgroupMap, mstDestroySStmVgroupStatus);

  mStreamMgmt.snodeMap = taosHashInit(snodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (mStreamMgmt.snodeMap == NULL) {
    code = terrno;
    lino = __LINE__;
    mError("failed to initialize the stream runtime snodeMap, error:%s", tstrerror(code));
    goto _exit;
  }
  taosHashSetFreeFp(mStreamMgmt.snodeMap, mstDestroySStmSnodeStatus);

  // dnodeMap has no free callback registered here
  mStreamMgmt.dnodeMap = taosHashInit(dnodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (mStreamMgmt.dnodeMap == NULL) {
    code = terrno;
    lino = __LINE__;
    mError("failed to initialize the stream runtime dnodeMap, error:%s", tstrerror(code));
    goto _exit;
  }

  mStreamMgmt.toDeployVgMap = taosHashInit(vnodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (mStreamMgmt.toDeployVgMap == NULL) {
    code = terrno;
    lino = __LINE__;
    mError("failed to initialize the stream runtime toDeployVgMap, error:%s", tstrerror(code));
    goto _exit;
  }
  taosHashSetFreeFp(mStreamMgmt.toDeployVgMap, mstDestroySStmVgTasksToDeploy);

  mStreamMgmt.toDeploySnodeMap = taosHashInit(snodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
  if (mStreamMgmt.toDeploySnodeMap == NULL) {
    code = terrno;
    lino = __LINE__;
    mError("failed to initialize the stream runtime toDeploySnodeMap, error:%s", tstrerror(code));
    goto _exit;
  }
  taosHashSetFreeFp(mStreamMgmt.toDeploySnodeMap, mstDestroySStmSnodeTasksDeploy);

  mStreamMgmt.toUpdateScanMap = taosHashInit(snodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (mStreamMgmt.toUpdateScanMap == NULL) {
    code = terrno;
    lino = __LINE__;
    mError("failed to initialize the stream runtime toUpdateScanMap, error:%s", tstrerror(code));
    goto _exit;
  }
  taosHashSetFreeFp(mStreamMgmt.toUpdateScanMap, mstDestroyScanAddrList);

  TAOS_CHECK_EXIT(msmSTAddSnodesToMap(pMnode));
  TAOS_CHECK_EXIT(msmSTAddDnodesToMap(pMnode));

  mStreamMgmt.lastTaskId = 1;

  // stamp every stream that is neither dropped nor stopped by the user, and count them
  sdbTraverse(pMnode->pSdb, SDB_STREAM, msmUpdateProfileStreams, &MND_STREAM_GET_LAST_TS(STM_EVENT_ACTIVE_BEGIN), &activeStreamNum, NULL);

  if (activeStreamNum > 0) {
    msmSetInitRuntimeState(MND_STM_STATE_WATCH);
  } else {
    msmSetInitRuntimeState(MND_STM_STATE_NORMAL);
  }

_exit:

  if (code) {
    msmDestroyRuntimeInfo(pMnode);
    mstError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
  } else {
    mstInfo("mnode stream runtime init done");
  }

  return code;
}
5068

5069

5070

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc