• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4548

22 Jul 2025 02:37AM UTC coverage: 54.273% (-3.0%) from 57.287%
#4548

push

travis-ci

GitHub
Merge pull request #32061 from taosdata/new_testcases

132738 of 315239 branches covered (42.11%)

Branch coverage included in aggregate %.

201371 of 300373 relevant lines covered (67.04%)

3475977.14 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

6.79
/source/dnode/mnode/impl/src/mndStreamMgmt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndStream.h"
17
#include "mndDb.h"
18
#include "mndPrivilege.h"
19
#include "mndShow.h"
20
#include "mndStb.h"
21
#include "mndTrans.h"
22
#include "osMemory.h"
23
#include "parser.h"
24
#include "taoserror.h"
25
#include "tmisce.h"
26
#include "tname.h"
27
#include "mndDnode.h"
28
#include "mndVgroup.h"
29
#include "mndSnode.h"
30
#include "mndMnode.h"
31

32
void msmDestroyActionQ() {
×
33
  SStmQNode* pQNode = NULL;
×
34

35
  if (NULL == mStreamMgmt.actionQ) {
×
36
    return;
×
37
  }
38

39
  while (mndStreamActionDequeue(mStreamMgmt.actionQ, &pQNode)) {
×
40
  }
41

42
  taosMemoryFree(mStreamMgmt.actionQ->head);
×
43
  taosMemoryFree(mStreamMgmt.actionQ);
×
44
}
45

46
/*
 * Clean up the per-group deploy and action hash tables of one thread context.
 * The SStmThreadCtx struct itself is not freed (it lives in an array owned by
 * the caller).
 */
void msmDestroySStmThreadCtx(SStmThreadCtx* pCtx) {
  int32_t grp = 0;
  while (grp < STREAM_MAX_GROUP_NUM) {
    taosHashCleanup(pCtx->deployStm[grp]);
    taosHashCleanup(pCtx->actionStm[grp]);
    grp++;
  }
}
52

53
void msmDestroyThreadCtxs() {
×
54
  if (NULL == mStreamMgmt.tCtx) {
×
55
    return;
×
56
  }
57
  
58
  for (int32_t i = 0; i < mStreamMgmt.threadNum; ++i) {
×
59
    msmDestroySStmThreadCtx(mStreamMgmt.tCtx + i);
×
60
  }
61
  taosMemoryFreeClear(mStreamMgmt.tCtx);
×
62
}
63

64

65
/*
 * Tear down the in-memory stream management runtime state.
 *
 * Frees the action queue, the thread contexts and every runtime hash map, then
 * zeroes mStreamMgmt. The inactive counter (stat.inactiveTimes) is incremented
 * and carried across the reset. Previously it was incremented right before the
 * memset, which erased the increment, so the counter could never advance.
 *
 * @param pMnode  unused here; kept for interface compatibility.
 */
void msmDestroyRuntimeInfo(SMnode *pMnode) {
  msmDestroyActionQ();
  msmDestroyThreadCtxs();

  taosHashCleanup(mStreamMgmt.toUpdateScanMap);
  taosHashCleanup(mStreamMgmt.toDeployVgMap);
  taosHashCleanup(mStreamMgmt.toDeploySnodeMap);

  taosHashCleanup(mStreamMgmt.dnodeMap);
  taosHashCleanup(mStreamMgmt.snodeMap);
  taosHashCleanup(mStreamMgmt.vgroupMap);
  taosHashCleanup(mStreamMgmt.taskMap);
  taosHashCleanup(mStreamMgmt.streamMap);

  // Save the bumped counter first: the memset below wipes all of mStreamMgmt,
  // including stat.
  int64_t inactiveTimes = (int64_t)mStreamMgmt.stat.inactiveTimes + 1;
  // STREAMTODO

  memset(&mStreamMgmt, 0, sizeof(mStreamMgmt));

  mStreamMgmt.stat.inactiveTimes = inactiveTimes;
}
84

85

86
/*
 * Mark a stream as stopped because of a fatal error and schedule its retry.
 *
 * @param streamId  stream to stop (also consumed by the msts* log macros).
 * @param pStatus   the stream's status; when NULL it is looked up (and a
 *                  reference acquired) in mStreamMgmt.streamMap.
 * @param errCode   error that caused the stop; recorded as fatalError/lastError.
 * @param currTs    current time in ms, used for the retry schedule.
 * @return TSDB_CODE_SUCCESS. A stream missing from streamMap, or one that is
 *         already stopped, is logged and ignored.
 *
 * If the trigger task has been running longer than 2 * MST_ISOLATION_DURATION,
 * the retry counter restarts. After more than 10 consecutive fatal errors the
 * retry back-off grows from MST_ISOLATION_DURATION to MST_MAX_RETRY_DURATION.
 */
int32_t msmStopStreamByError(int64_t streamId, SStmStatus* pStatus, int32_t errCode, int64_t currTs) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  SStmStatus* pAcquired = NULL;  // non-NULL only when this call took a streamMap reference

  mstsInfo("try to stop stream for error: %s", tstrerror(errCode));

  if (pStatus == NULL) {
    pAcquired = (SStmStatus*)taosHashAcquire(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
    if (pAcquired == NULL) {
      mstsInfo("stream already not in streamMap, error:%s", tstrerror(terrno));
      goto _exit;
    }
    pStatus = pAcquired;
  }

  int8_t stopped = atomic_load_8(&pStatus->stopped);
  if (stopped) {
    mstsDebug("stream already stopped %d, ignore stop", stopped);
    goto _exit;
  }

  // A trigger that has run long enough counts as recovered: restart the retry count.
  if (pStatus->triggerTask != NULL && pStatus->triggerTask->runningStartTs != 0 &&
      currTs - pStatus->triggerTask->runningStartTs > 2 * MST_ISOLATION_DURATION) {
    pStatus->fatalRetryTimes = 0;
    mstsDebug("reset stream retryTimes, running duation:%" PRId64 "ms", currTs - pStatus->triggerTask->runningStartTs);
  }

  pStatus->fatalRetryTimes++;
  pStatus->fatalError = errCode;
  if (pStatus->fatalRetryTimes > 10) {
    pStatus->fatalRetryDuration = MST_MAX_RETRY_DURATION;
  } else {
    pStatus->fatalRetryDuration = MST_ISOLATION_DURATION;
  }
  pStatus->fatalRetryTs = currTs + pStatus->fatalRetryDuration;

  pStatus->stat.lastError = errCode;

  // Only the caller that actually flips stopped 0 -> 1 records the event timestamp.
  if (0 == atomic_val_compare_exchange_8(&pStatus->stopped, 0, 1)) {
    MND_STREAM_SET_LAST_TS(STM_EVENT_STM_TERR, currTs);
  }

_exit:

  taosHashRelease(mStreamMgmt.streamMap, pAcquired);

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
135

136

137
/*
 * Enter the initial runtime state and publish it atomically in mStreamMgmt.state.
 *
 * WATCH resets the watch bookkeeping counters. NORMAL records the
 * STM_EVENT_NORMAL_BEGIN timestamp. Any other value is ignored, and the
 * current state is left unchanged.
 */
static void msmSetInitRuntimeState(int8_t state) {
  if (MND_STM_STATE_WATCH == state) {
    mStreamMgmt.watch.ending = 0;
    mStreamMgmt.watch.taskRemains = 0;
    mStreamMgmt.watch.processing = 0;
    mstInfo("switch to WATCH state");
  } else if (MND_STM_STATE_NORMAL == state) {
    MND_STREAM_SET_LAST_TS(STM_EVENT_NORMAL_BEGIN, taosGetTimestampMs());
    mstInfo("switch to NORMAL state");
  } else {
    return;
  }

  atomic_store_8(&mStreamMgmt.state, state);
}
155

156
/*
 * Remove an snode entry from mStreamMgmt.snodeMap. A failure is only logged
 * as a warning.
 */
void msmSTDeleteSnodeFromMap(int32_t snodeId) {
  int32_t code = taosHashRemove(mStreamMgmt.snodeMap, &snodeId, sizeof(snodeId));
  if (TSDB_CODE_SUCCESS == code) {
    mstInfo("snode %d removed from snodeMap", snodeId);
    return;
  }

  mstWarn("remove snode %d from snodeMap failed, error:%s", snodeId, tstrerror(code));
}
164

165
/*
 * Register every snode found in the SDB into mStreamMgmt.snodeMap.
 *
 * Each new entry is a zeroed SStmSnodeStatus with lastUpTs set to the time of
 * insertion. A TSDB_CODE_DUP_KEY result from taosHashPut (snode already
 * registered) is tolerated. Any other error aborts the scan; the SDB iterator
 * and the current row are released first.
 *
 * @return TSDB_CODE_SUCCESS or the first non-DUP_KEY taosHashPut error.
 */
static int32_t msmSTAddSnodesToMap(SMnode* pMnode) {
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         lino = 0;
  SStmSnodeStatus entry = {0};
  SSnodeObj*      pSnode = NULL;
  void*           pIter = NULL;

  for (pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, NULL, (void **)&pSnode); pIter != NULL;
       pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pSnode)) {
    entry.lastUpTs = taosGetTimestampMs();
    code = taosHashPut(mStreamMgmt.snodeMap, &pSnode->id, sizeof(pSnode->id), &entry, sizeof(entry));
    if (TSDB_CODE_SUCCESS != code && TSDB_CODE_DUP_KEY != code) {
      sdbRelease(pMnode->pSdb, pSnode);
      sdbCancelFetch(pMnode->pSdb, pIter);
      pSnode = NULL;
      TAOS_CHECK_EXIT(code);
    }

    // DUP_KEY is not an error here.
    code = TSDB_CODE_SUCCESS;
    sdbRelease(pMnode->pSdb, pSnode);
  }

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
201

202
/*
 * Register every dnode found in the SDB into mStreamMgmt.dnodeMap.
 *
 * The value stored per dnode is an int64_t "last up" timestamp initialised to
 * INT64_MIN. A TSDB_CODE_DUP_KEY result is tolerated. Any other taosHashPut
 * error aborts the scan after the SDB iterator and the current row are released.
 *
 * @return TSDB_CODE_SUCCESS or the first non-DUP_KEY taosHashPut error.
 */
static int32_t msmSTAddDnodesToMap(SMnode* pMnode) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  int64_t    initUpTs = INT64_MIN;
  SDnodeObj* pDnode = NULL;
  void*      pIter = NULL;

  for (pIter = sdbFetch(pMnode->pSdb, SDB_DNODE, NULL, (void **)&pDnode); pIter != NULL;
       pIter = sdbFetch(pMnode->pSdb, SDB_DNODE, pIter, (void **)&pDnode)) {
    code = taosHashPut(mStreamMgmt.dnodeMap, &pDnode->id, sizeof(pDnode->id), &initUpTs, sizeof(initUpTs));
    if (TSDB_CODE_SUCCESS != code && TSDB_CODE_DUP_KEY != code) {
      sdbRelease(pMnode->pSdb, pDnode);
      sdbCancelFetch(pMnode->pSdb, pIter);
      pDnode = NULL;
      TAOS_CHECK_EXIT(code);
    }

    // DUP_KEY is not an error here.
    code = TSDB_CODE_SUCCESS;
    sdbRelease(pMnode->pSdb, pDnode);
  }

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
236

237

238

239
/*
 * Index task status pointers in mStreamMgmt.taskMap under the key
 * {streamId, taskId}.
 *
 * When pTask is non-NULL only that task is added. Otherwise every element of
 * pTasks (an array of SStmTaskStatus) is added. The map stores the pointer
 * itself, not a copy. pCtx is unused here.
 *
 * @return TSDB_CODE_SUCCESS or the first taosHashPut error.
 */
static int32_t msmSTAddToTaskMap(SStmGrpCtx* pCtx, int64_t streamId, SArray* pTasks, SStmTaskStatus* pTask) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t key[2] = {streamId, 0};
  int32_t total = (NULL != pTask) ? 1 : (int32_t)taosArrayGetSize(pTasks);

  for (int32_t idx = 0; idx < total; idx++) {
    SStmTaskStatus* pStatus = (NULL != pTask) ? pTask : (SStmTaskStatus*)taosArrayGet(pTasks, idx);
    key[1] = pStatus->id.taskId;
    TAOS_CHECK_EXIT(taosHashPut(mStreamMgmt.taskMap, key, sizeof(key), &pStatus, POINTER_BYTES));
    mstsDebug("task %" PRIx64" tidx %d added to taskMap", pStatus->id.taskId, pStatus->id.taskIdx);
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
260

261
/*
 * Append a reader task status pointer to a vgroup's per-stream reader lists.
 *
 * @param pHash       vgroup's streamId -> SStmVgStreamStatus hash.
 * @param streamId    stream the reader belongs to.
 * @param pStatus     reader task status; the pointer is stored, not copied.
 * @param trigReader  true: append to trigReaders, false: append to calcReaders.
 * @return TSDB_CODE_SUCCESS or the allocation/hash error.
 *
 * When the stream has no entry yet, a new entry is built locally and inserted.
 * If building or inserting it fails, the arrays allocated for it are destroyed
 * here, because they were never handed to pHash. Previously they leaked.
 */
static int32_t msmSTAddToVgStreamHash(SHashObj* pHash, int64_t streamId, SStmTaskStatus* pStatus, bool trigReader) {
  int32_t             code = TSDB_CODE_SUCCESS;
  int32_t             lino = 0;
  SStmVgStreamStatus  stream = {0};  // new entry under construction; owned here until inserted
  SStmVgStreamStatus* pStream = taosHashGet(pHash, &streamId, sizeof(streamId));
  if (NULL == pStream) {
    if (trigReader) {
      stream.trigReaders = taosArrayInit(1, POINTER_BYTES);
      TSDB_CHECK_NULL(stream.trigReaders, code, lino, _exit, terrno);
      TSDB_CHECK_NULL(taosArrayPush(stream.trigReaders, &pStatus), code, lino, _exit, terrno);
    } else {
      stream.calcReaders = taosArrayInit(2, POINTER_BYTES);
      TSDB_CHECK_NULL(stream.calcReaders, code, lino, _exit, terrno);
      TSDB_CHECK_NULL(taosArrayPush(stream.calcReaders, &pStatus), code, lino, _exit, terrno);
    }
    TAOS_CHECK_EXIT(taosHashPut(pHash, &streamId, sizeof(streamId), &stream, sizeof(stream)));
    goto _exit;
  }

  if (trigReader) {
    if (NULL == pStream->trigReaders) {
      pStream->trigReaders = taosArrayInit(1, POINTER_BYTES);
      TSDB_CHECK_NULL(pStream->trigReaders, code, lino, _exit, terrno);
    }

    TSDB_CHECK_NULL(taosArrayPush(pStream->trigReaders, &pStatus), code, lino, _exit, terrno);
    goto _exit;
  }

  if (NULL == pStream->calcReaders) {
    pStream->calcReaders = taosArrayInit(1, POINTER_BYTES);
    TSDB_CHECK_NULL(pStream->calcReaders, code, lino, _exit, terrno);
  }

  TSDB_CHECK_NULL(taosArrayPush(pStream->calcReaders, &pStatus), code, lino, _exit, terrno);

_exit:

  if (code) {
    // On the existing-entry path `stream` is still zeroed, so these are no-ops.
    taosArrayDestroy(stream.trigReaders);
    taosArrayDestroy(stream.calcReaders);
    mstsError("%s task %" PRIx64 " SID:%" PRIx64 " failed to add to vgroup %d streamHash in %s at line %d, error:%s", 
        trigReader ? "trigReader" : "calcReader", pStatus->id.taskId, pStatus->id.seriousId, pStatus->id.nodeId, __FUNCTION__, lino, tstrerror(code));
  } else {
    mstsDebug("%s task %" PRIx64 " SID:%" PRIx64 " added to vgroup %d streamHash", 
        trigReader ? "trigReader" : "calcReader", pStatus->id.taskId, pStatus->id.seriousId, pStatus->id.nodeId);
  }

  return code;
}
309

310
/*
 * Record a reader task under its vgroup (pStatus->id.nodeId) in
 * mStreamMgmt.vgroupMap.
 *
 * If the vgroup is already known, the task goes into its existing streamTasks
 * hash. Otherwise a new SStmVgroupStatus is built with a fresh streamTasks hash
 * (values freed by mstDestroySStmVgStreamStatus) and lastUpTs = now, then
 * inserted. On failure the locally built vgroup status is destroyed.
 *
 * @return TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t msmSTAddToVgroupMapImpl(int64_t streamId, SStmTaskStatus* pStatus, bool trigReader) {
  int32_t           code = TSDB_CODE_SUCCESS;
  int32_t           lino = 0;
  SStmVgroupStatus  vg = {0};
  SStmVgroupStatus* pVg = taosHashGet(mStreamMgmt.vgroupMap, &pStatus->id.nodeId, sizeof(pStatus->id.nodeId));

  if (NULL != pVg) {
    TAOS_CHECK_EXIT(msmSTAddToVgStreamHash(pVg->streamTasks, streamId, pStatus, trigReader));
    goto _exit;
  }

  vg.streamTasks = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  TSDB_CHECK_NULL(vg.streamTasks, code, lino, _exit, terrno);
  taosHashSetFreeFp(vg.streamTasks, mstDestroySStmVgStreamStatus);

  vg.lastUpTs = taosGetTimestampMs();
  TAOS_CHECK_EXIT(msmSTAddToVgStreamHash(vg.streamTasks, streamId, pStatus, trigReader));
  TAOS_CHECK_EXIT(taosHashPut(mStreamMgmt.vgroupMap, &pStatus->id.nodeId, sizeof(pStatus->id.nodeId), &vg, sizeof(vg)));

_exit:

  if (code) {
    mstDestroyVgroupStatus(&vg);
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  } else {
    mstsDebug("task %" PRIx64 " tidx %d added to vgroupMap %d", pStatus->id.taskId, pStatus->id.taskIdx, pStatus->id.nodeId);
  }

  return code;
}
339

340
/*
 * Queue a task deployment for its vgroup (pDeploy->task.nodeId) in pVgMap.
 *
 * A new SStmVgTasksToDeploy entry is inserted when the vgroup has none. If
 * that insert reports TSDB_CODE_DUP_KEY, another inserter won, so the local
 * list is dropped and the loop retries through the existing entry. Appends to
 * an existing entry are done under its latch. On success
 * mStreamMgmt.toDeployVgTaskNum is incremented.
 *
 * Fixes over the previous version:
 *  - the DUP_KEY code is cleared before retrying, so a later successful append
 *    is no longer reported (and returned) as DUP_KEY;
 *  - the reference taken by taosHashAcquire is released on the failure path too;
 *  - the locally allocated task list is freed if it never reaches the hash;
 *  - lino is set for a non-DUP_KEY insert failure.
 *
 * @return TSDB_CODE_SUCCESS or the allocation/hash error.
 */
static int32_t msmTDAddToVgroupMap(SHashObj* pVgMap, SStmTaskDeploy* pDeploy, int64_t streamId) {
  int32_t             code = TSDB_CODE_SUCCESS;
  int32_t             lino = 0;
  SStmVgTasksToDeploy vg = {0};  // vg.taskList is owned here until inserted into pVgMap
  SStreamTask*        pTask = &pDeploy->task;  // used by the mstt* log macros
  SStmTaskToDeployExt ext = {0};
  ext.deploy = *pDeploy;

  while (true) {
    SStmVgTasksToDeploy* pVg = taosHashAcquire(pVgMap, &pDeploy->task.nodeId, sizeof(pDeploy->task.nodeId));
    if (NULL == pVg) {
      vg.taskList = taosArrayInit(20, sizeof(SStmTaskToDeployExt));
      TSDB_CHECK_NULL(vg.taskList, code, lino, _return, terrno);
      TSDB_CHECK_NULL(taosArrayPush(vg.taskList, &ext), code, lino, _return, terrno);
      code = taosHashPut(pVgMap, &pDeploy->task.nodeId, sizeof(pDeploy->task.nodeId), &vg, sizeof(SStmVgTasksToDeploy));
      if (TSDB_CODE_SUCCESS == code) {
        goto _return;  // vg.taskList now belongs to pVgMap
      }

      if (TSDB_CODE_DUP_KEY != code) {
        lino = __LINE__;
        goto _return;
      }

      // Another inserter created the entry first: drop our list, clear the
      // DUP_KEY code so it does not leak into the result, and append to theirs.
      taosArrayDestroy(vg.taskList);
      vg.taskList = NULL;
      code = TSDB_CODE_SUCCESS;
      continue;
    }

    taosWLockLatch(&pVg->lock);
    bool failed = (NULL == taosArrayPush(pVg->taskList, &ext));
    if (failed) {
      code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    }
    taosWUnLockLatch(&pVg->lock);

    // Always drop the acquire reference, including on failure.
    taosHashRelease(pVgMap, pVg);

    if (failed) {
      lino = __LINE__;
      goto _return;
    }
    break;
  }

_return:

  if (code) {
    taosArrayDestroy(vg.taskList);  // NULL unless we still own an un-inserted list
    msttError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  } else {
    int32_t num = atomic_add_fetch_32(&mStreamMgmt.toDeployVgTaskNum, 1);
    msttDebug("task added to toDeployVgTaskNum, vgToDeployTaskNum:%d", num);
  }

  return code;
}
389

390

391
/*
 * Record a trigger or runner task status pointer in an snode's per-stream hash.
 *
 * @param pHash     snode's streamId -> SStmSnodeStreamStatus hash.
 * @param streamId  stream the task belongs to.
 * @param pStatus   task status; the pointer is stored, not copied.
 * @param deployId  < 0: pStatus is the stream's trigger (replacing any previous
 *                  one, with a warning); >= 0: pStatus is appended to
 *                  runners[deployId].
 *                  NOTE(review): deployId is not bounds-checked against the
 *                  runners array here; callers must guarantee it is in range.
 * @return TSDB_CODE_SUCCESS or the allocation/hash error.
 *
 * If a brand-new stream entry cannot be completed or inserted, the runner array
 * allocated for it is destroyed here instead of leaking.
 */
static int32_t msmSTAddToSnodeStreamHash(SHashObj* pHash, int64_t streamId, SStmTaskStatus* pStatus, int32_t deployId) {
  int32_t                code = TSDB_CODE_SUCCESS;
  int32_t                lino = 0;
  SStmSnodeStreamStatus  stream = {0};  // new entry under construction; owned here until inserted
  SStmSnodeStreamStatus* pStream = taosHashGet(pHash, &streamId, sizeof(streamId));
  if (NULL == pStream) {
    if (deployId < 0) {
      stream.trigger = pStatus;
    } else {
      stream.runners[deployId] = taosArrayInit(2, POINTER_BYTES);
      TSDB_CHECK_NULL(stream.runners[deployId], code, lino, _exit, terrno);
      TSDB_CHECK_NULL(taosArrayPush(stream.runners[deployId], &pStatus), code, lino, _exit, terrno);
    }

    TAOS_CHECK_EXIT(taosHashPut(pHash, &streamId, sizeof(streamId), &stream, sizeof(stream)));
    goto _exit;
  }

  if (deployId < 0) {
    if (NULL != pStream->trigger) {
      mstsWarn("stream already got trigger task %" PRIx64 " SID:%" PRIx64 " in snode %d, replace it with task %" PRIx64 " SID:%" PRIx64, 
          pStream->trigger->id.taskId, pStream->trigger->id.seriousId, pStatus->id.nodeId, pStatus->id.taskId, pStatus->id.seriousId);
    }

    pStream->trigger = pStatus;
    goto _exit;
  }

  if (NULL == pStream->runners[deployId]) {
    pStream->runners[deployId] = taosArrayInit(2, POINTER_BYTES);
    TSDB_CHECK_NULL(pStream->runners[deployId], code, lino, _exit, terrno);
  }

  TSDB_CHECK_NULL(taosArrayPush(pStream->runners[deployId], &pStatus), code, lino, _exit, terrno);

_exit:

  if (code) {
    if (deployId >= 0) {
      // Non-NULL only for a new entry that never made it into pHash.
      taosArrayDestroy(stream.runners[deployId]);
    }
    mstsError("%s task %" PRIx64 " SID:%" PRIx64 " failed to add to snode %d streamHash deployId:%d in %s at line %d, error:%s", 
        (deployId < 0) ? "trigger" : "runner", pStatus->id.taskId, pStatus->id.seriousId, pStatus->id.nodeId, deployId, __FUNCTION__, lino, tstrerror(code));
  } else {
    mstsDebug("%s task %" PRIx64 " SID:%" PRIx64 " added to snode %d streamHash deployId:%d", 
        (deployId < 0) ? "trigger" : "runner", pStatus->id.taskId, pStatus->id.seriousId, pStatus->id.nodeId, deployId);
  }

  return code;
}
438

439

440
/*
 * Record a trigger (deployId < 0) or runner task under its snode
 * (pStatus->id.nodeId) in mStreamMgmt.snodeMap.
 *
 * The snode must already be registered, otherwise
 * TSDB_CODE_MND_STREAM_INTERNAL_ERROR is returned. The snode's streamTasks hash
 * is created on first use, with values freed by
 * mstDestroySStmSnodeStreamStatus.
 */
static int32_t msmSTAddToSnodeMapImpl(int64_t streamId, SStmTaskStatus* pStatus, int32_t deployId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SStmSnodeStatus* pSnode = taosHashGet(mStreamMgmt.snodeMap, &pStatus->id.nodeId, sizeof(pStatus->id.nodeId));
  if (NULL == pSnode) {
    mstsWarn("snode %d not exists in snodeMap anymore, may be dropped", pStatus->id.nodeId);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  if (NULL == pSnode->streamTasks) {
    pSnode->streamTasks = taosHashInit(2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
    TSDB_CHECK_NULL(pSnode->streamTasks, code, lino, _exit, terrno);
    taosHashSetFreeFp(pSnode->streamTasks, mstDestroySStmSnodeStreamStatus);
  }

  TAOS_CHECK_EXIT(msmSTAddToSnodeStreamHash(pSnode->streamTasks, streamId, pStatus, deployId));

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  } else {
    mstsDebug("%s task %" PRIx64 " tidx %d added to snodeMap, snodeId:%d", (deployId < 0) ? "trigger" : "runner", 
        pStatus->id.taskId, pStatus->id.taskIdx, pStatus->id.nodeId);
  }

  return code;
}
469

470

471

472
/*
 * Queue a trigger task deployment for its snode (pDeploy->task.nodeId) in
 * mStreamMgmt.toDeploySnodeMap.
 *
 * A new SStmSnodeTasksDeploy entry is inserted when the snode has none. On
 * TSDB_CODE_DUP_KEY another inserter won, and the loop retries through the
 * existing entry. Appends to an existing entry (creating its triggerList if
 * needed) happen under the entry's latch.
 *
 * Fixes over the previous version:
 *  - DUP_KEY is cleared before retrying, so a successful append is no longer
 *    reported (and returned) as DUP_KEY;
 *  - the taosHashAcquire reference is released on failure paths too;
 *  - the locally allocated trigger list is freed if it never reaches the map;
 *  - `ext` is zero-initialised, and lino is set on a non-DUP_KEY insert failure.
 *
 * @return TSDB_CODE_SUCCESS or the allocation/hash error.
 */
static int32_t msmTDAddTriggerToSnodeMap(SStmTaskDeploy* pDeploy, SStreamObj* pStream) {
  int32_t              code = TSDB_CODE_SUCCESS;
  int32_t              lino = 0;
  int64_t              streamId = pStream->pCreate->streamId;  // used by the msts*/mstt* log macros
  SStmSnodeTasksDeploy snode = {0};  // snode.triggerList is owned here until inserted
  SStmTaskToDeployExt  ext = {0};
  SStreamTask*         pTask = &pDeploy->task;

  ext.deploy = *pDeploy;
  ext.deployed = false;

  while (true) {
    SStmSnodeTasksDeploy* pSnode = taosHashAcquire(mStreamMgmt.toDeploySnodeMap, &pDeploy->task.nodeId, sizeof(pDeploy->task.nodeId));
    if (NULL == pSnode) {
      snode.triggerList = taosArrayInit(10, sizeof(SStmTaskToDeployExt));
      TSDB_CHECK_NULL(snode.triggerList, code, lino, _return, terrno);
      TSDB_CHECK_NULL(taosArrayPush(snode.triggerList, &ext), code, lino, _return, terrno);

      code = taosHashPut(mStreamMgmt.toDeploySnodeMap, &pDeploy->task.nodeId, sizeof(pDeploy->task.nodeId), &snode, sizeof(snode));
      if (TSDB_CODE_SUCCESS == code) {
        goto _return;  // snode.triggerList now belongs to the map
      }

      if (TSDB_CODE_DUP_KEY != code) {
        lino = __LINE__;
        goto _return;
      }

      // Another inserter created the entry first: drop our list, clear the
      // DUP_KEY code, and append to the existing entry instead.
      taosArrayDestroy(snode.triggerList);
      snode.triggerList = NULL;
      code = TSDB_CODE_SUCCESS;
      continue;
    }

    taosWLockLatch(&pSnode->lock);
    if (NULL == pSnode->triggerList) {
      pSnode->triggerList = taosArrayInit(10, sizeof(SStmTaskToDeployExt));
    }
    bool failed = (NULL == pSnode->triggerList) || (NULL == taosArrayPush(pSnode->triggerList, &ext));
    if (failed) {
      code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    }
    taosWUnLockLatch(&pSnode->lock);

    // Always drop the acquire reference, including on failure.
    taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);

    if (failed) {
      lino = __LINE__;
      goto _return;
    }
    break;
  }

_return:

  if (code) {
    taosArrayDestroy(snode.triggerList);  // NULL unless we still own an un-inserted list
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  } else {
    msttDebug("trigger task added to toDeploySnodeMap, tidx:%d", pTask->taskIdx);
  }

  return code;
}
535

536
/*
 * Queue a runner task deployment for its snode (pDeploy->task.nodeId) in
 * mStreamMgmt.toDeploySnodeMap.
 *
 * A new SStmSnodeTasksDeploy entry is inserted when the snode has none. On
 * TSDB_CODE_DUP_KEY another inserter won, and the loop retries through the
 * existing entry. Appends to an existing entry (creating its runnerList if
 * needed) happen under the entry's latch. The toDeploySnodeTaskNum counter is
 * maintained by the caller, msmTDAddRunnersToSnodeMap().
 *
 * Fixes over the previous version:
 *  - DUP_KEY is cleared before retrying, so a successful append is no longer
 *    reported (and returned) as DUP_KEY;
 *  - the taosHashAcquire reference is released on failure paths too;
 *  - the locally allocated runner list is freed if it never reaches the map;
 *  - `ext` is zero-initialised, and lino is set on a non-DUP_KEY insert failure.
 *
 * @return TSDB_CODE_SUCCESS or the allocation/hash error.
 */
static int32_t msmTDAddRunnerToSnodeMap(SStmTaskDeploy* pDeploy, SStreamObj* pStream) {
  int32_t              code = TSDB_CODE_SUCCESS;
  int32_t              lino = 0;
  int64_t              streamId = pStream->pCreate->streamId;  // used by the msts*/mstt* log macros
  SStmSnodeTasksDeploy snode = {0};  // snode.runnerList is owned here until inserted
  SStmTaskToDeployExt  ext = {0};
  SStreamTask*         pTask = &pDeploy->task;

  ext.deploy = *pDeploy;
  ext.deployed = false;

  while (true) {
    SStmSnodeTasksDeploy* pSnode = taosHashAcquire(mStreamMgmt.toDeploySnodeMap, &pDeploy->task.nodeId, sizeof(pDeploy->task.nodeId));
    if (NULL == pSnode) {
      snode.runnerList = taosArrayInit(10, sizeof(SStmTaskToDeployExt));
      TSDB_CHECK_NULL(snode.runnerList, code, lino, _return, terrno);
      TSDB_CHECK_NULL(taosArrayPush(snode.runnerList, &ext), code, lino, _return, terrno);

      code = taosHashPut(mStreamMgmt.toDeploySnodeMap, &pDeploy->task.nodeId, sizeof(pDeploy->task.nodeId), &snode, sizeof(snode));
      if (TSDB_CODE_SUCCESS == code) {
        goto _return;  // snode.runnerList now belongs to the map
      }

      if (TSDB_CODE_DUP_KEY != code) {
        lino = __LINE__;
        goto _return;
      }

      // Another inserter created the entry first: drop our list, clear the
      // DUP_KEY code, and append to the existing entry instead.
      taosArrayDestroy(snode.runnerList);
      snode.runnerList = NULL;
      code = TSDB_CODE_SUCCESS;
      continue;
    }

    taosWLockLatch(&pSnode->lock);
    if (NULL == pSnode->runnerList) {
      pSnode->runnerList = taosArrayInit(10, sizeof(SStmTaskToDeployExt));
    }
    bool failed = (NULL == pSnode->runnerList) || (NULL == taosArrayPush(pSnode->runnerList, &ext));
    if (failed) {
      code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    }
    taosWUnLockLatch(&pSnode->lock);

    // Always drop the acquire reference, including on failure.
    taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);

    if (failed) {
      lino = __LINE__;
      goto _return;
    }
    break;
  }

_return:

  if (code) {
    taosArrayDestroy(snode.runnerList);  // NULL unless we still own an un-inserted list
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  } else {
    msttDebug("task added to toDeploySnodeMap, tidx:%d", pTask->taskIdx);
  }

  return code;
}
599

600
/*
 * Queue every runner deployment in runnerList (an array of SStmTaskDeploy) on
 * its snode.
 *
 * mStreamMgmt.toDeploySnodeTaskNum is bumped once per runner that was queued
 * successfully. Stops at the first failure and returns its code.
 */
static int32_t msmTDAddRunnersToSnodeMap(SArray* runnerList, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;  // used by the msts* log macros
  int32_t runnerNum = taosArrayGetSize(runnerList);

  for (int32_t idx = 0; idx < runnerNum; idx++) {
    SStmTaskDeploy* pDeploy = taosArrayGet(runnerList, idx);
    TAOS_CHECK_EXIT(msmTDAddRunnerToSnodeMap(pDeploy, pStream));
    (void)atomic_add_fetch_32(&mStreamMgmt.toDeploySnodeTaskNum, 1);
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
623

624

625
/*
 * Refresh the reporting snode's status in mStreamMgmt.snodeMap from a heartbeat.
 *
 * If the snode is unknown, the map is repopulated from the SDB once and the
 * lookup retried. If it is still missing, TSDB_CODE_MND_STREAM_NODE_NOT_EXISTS
 * is returned. Otherwise runnerThreadNum is stored, and lastUpTs is advanced to
 * pCtx->currTs with a CAS loop. lastUpTs only moves forward; an older heartbeat
 * leaves it untouched.
 */
int32_t msmUpdateSnodeUpTs(SStmGrpCtx* pCtx) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SStmSnodeStatus* pStatus = taosHashGet(mStreamMgmt.snodeMap, &pCtx->pReq->snodeId, sizeof(pCtx->pReq->snodeId));

  if (NULL == pStatus) {
    // Unknown snode: reload all snodes from the SDB and look again, once.
    TAOS_CHECK_EXIT(msmSTAddSnodesToMap(pCtx->pMnode));
    pStatus = taosHashGet(mStreamMgmt.snodeMap, &pCtx->pReq->snodeId, sizeof(pCtx->pReq->snodeId));
    if (NULL == pStatus) {
      mstWarn("snode %d not exists in snodeMap, may be dropped, ignore it", pCtx->pReq->snodeId);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_NODE_NOT_EXISTS);
    }
  }

  atomic_store_32(&pStatus->runnerThreadNum, pCtx->pReq->runnerThreadNum);

  for (;;) {
    int64_t lastTsValue = atomic_load_64(&pStatus->lastUpTs);
    if (pCtx->currTs <= lastTsValue) {
      return code;
    }

    if (lastTsValue == atomic_val_compare_exchange_64(&pStatus->lastUpTs, lastTsValue, pCtx->currTs)) {
      mstDebug("snode %d lastUpTs updated", pCtx->pReq->snodeId);
      return code;
    }
    // Lost a race with a concurrent update: reload and retry.
  }

_exit:

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;  
}
672

673
/*
 * Advance vgroup vgId's lastUpTs in mStreamMgmt.vgroupMap to pCtx->currTs.
 *
 * Vgroups not tracked in the map are ignored. The update is a CAS loop that
 * only moves lastUpTs forward.
 */
void msmUpdateVgroupUpTs(SStmGrpCtx* pCtx, int32_t vgId) {
  SStmVgroupStatus* pStatus = taosHashGet(mStreamMgmt.vgroupMap, &vgId, sizeof(vgId));
  if (NULL == pStatus) {
    mstDebug("vgroup %d not exists in vgroupMap, ignore update upTs", vgId);
    return;
  }

  for (;;) {
    int64_t lastTsValue = atomic_load_64(&pStatus->lastUpTs);
    if (pCtx->currTs <= lastTsValue) {
      return;
    }

    if (lastTsValue == atomic_val_compare_exchange_64(&pStatus->lastUpTs, lastTsValue, pCtx->currTs)) {
      mstDebug("vgroup %d lastUpTs updated to %" PRId64, vgId, pCtx->currTs);
      return;
    }
  }
}
696

697
/*
 * Refresh lastUpTs for every vgroup id listed in pCtx->pReq->pVgLeaders
 * (an array of int32_t vgroup ids).
 *
 * @return always TSDB_CODE_SUCCESS; per-vgroup updates cannot fail.
 */
int32_t msmUpdateVgroupsUpTs(SStmGrpCtx* pCtx) {
  int32_t vgNum = taosArrayGetSize(pCtx->pReq->pVgLeaders);

  mstDebug("start to update vgroups upTs");

  for (int32_t idx = 0; idx < vgNum; idx++) {
    int32_t vgId = *(int32_t*)taosArrayGet(pCtx->pReq->pVgLeaders, idx);
    msmUpdateVgroupUpTs(pCtx, vgId);
  }

  return TSDB_CODE_SUCCESS;
}
718

719

720

721
/*
 * Return the scanPlan of the first SStreamCalcScan in pList whose
 * readFromCache flag is set, or NULL when none is.
 */
void* msmSearchCalcCacheScanPlan(SArray* pList) {
  int32_t scanNum = taosArrayGetSize(pList);
  int32_t idx = 0;

  while (idx < scanNum) {
    SStreamCalcScan* pScan = taosArrayGet(pList, idx);
    if (pScan->readFromCache) {
      return pScan->scanPlan;
    }
    ++idx;
  }

  return NULL;
}
732

733
/*
 * Fill the reader part of a task deploy message.
 *
 * Calc readers get their execReplica (runnerDeploys * runnerReplica) and
 * calcScanPlan. Trigger readers copy the trigger table and plan settings from
 * pStream->pCreate and pInfo->pCreate. When pStream is NULL, only the
 * triggerReader flag is set. All plans and column lists are shared by pointer,
 * not copied.
 *
 * @return always TSDB_CODE_SUCCESS.
 */
int32_t msmBuildReaderDeployInfo(SStmTaskDeploy* pDeploy, SStreamObj* pStream, void* calcScanPlan, SStmStatus* pInfo, bool triggerReader) {
  SStreamReaderDeployMsg* pMsg = &pDeploy->msg.reader;
  pMsg->triggerReader = triggerReader;

  if (!triggerReader) {
    SStreamReaderDeployFromCalc* pCalc = &pMsg->msg.calc;
    pCalc->execReplica = pInfo->runnerDeploys * pInfo->runnerReplica;
    pCalc->calcScanPlan = calcScanPlan;
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == pStream) {
    return TSDB_CODE_SUCCESS;
  }

  SStreamReaderDeployFromTrigger* pTrigger = &pMsg->msg.trigger;
  pTrigger->triggerTblName = pInfo->pCreate->triggerTblName;
  pTrigger->triggerTblUid = pStream->pCreate->triggerTblUid;
  pTrigger->triggerTblType = pStream->pCreate->triggerTblType;
  pTrigger->deleteReCalc = pStream->pCreate->deleteReCalc;
  pTrigger->deleteOutTbl = pStream->pCreate->deleteOutTbl;
  pTrigger->partitionCols = pInfo->pCreate->partitionCols;
  pTrigger->triggerCols = pInfo->pCreate->triggerCols;
  // triggerPrevFilter is not forwarded to trigger readers; the trigger deploy
  // message carries it instead.
  pTrigger->triggerScanPlan = pInfo->pCreate->triggerScanPlan;
  pTrigger->calcCacheScanPlan = msmSearchCalcCacheScanPlan(pInfo->pCreate->calcScanPlanList);

  return TSDB_CODE_SUCCESS;
}
761

762
/*
 * Build the list of runner targets (one per runner deploy) that the trigger
 * task sends work to.
 *
 * For each deploy i, the last task in pInfo->runners[i] must be the top runner.
 * Its task id, node id, dnode epset and the stream's runnerReplica become one
 * SStreamRunnerTarget.
 *
 * @param ppRes  out: newly allocated array of SStreamRunnerTarget. It is only
 *               written when pInfo->runnerDeploys > 0. On failure it is
 *               destroyed and reset to NULL.
 * @return TSDB_CODE_SUCCESS, or the error that stopped the build.
 *         Previously the function always returned TSDB_CODE_SUCCESS, even after
 *         logging a failure. An empty runner list is now reported as
 *         TSDB_CODE_MND_STREAM_INTERNAL_ERROR instead of relying on terrno.
 */
int32_t msmBuildTriggerRunnerTargets(SMnode* pMnode, SStmStatus* pInfo, int64_t streamId, SArray** ppRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (pInfo->runnerDeploys > 0) {
    *ppRes = taosArrayInit(pInfo->runnerDeploys, sizeof(SStreamRunnerTarget));
    TSDB_CHECK_NULL(*ppRes, code, lino, _exit, terrno);
  }

  for (int32_t i = 0; i < pInfo->runnerDeploys; ++i) {
    SStmTaskStatus* pStatus = taosArrayGetLast(pInfo->runners[i]);
    TSDB_CHECK_NULL(pStatus, code, lino, _exit, TSDB_CODE_MND_STREAM_INTERNAL_ERROR);

    if (!STREAM_IS_TOP_RUNNER(pStatus->flags)) {
      mstsError("the last runner task %" PRIx64 " SID:%" PRId64 " tidx:%d in deploy %d is not top runner", 
          pStatus->id.taskId, pStatus->id.seriousId, pStatus->id.taskIdx, i);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);    
    }

    SStreamRunnerTarget runner = {0};
    runner.addr.taskId = pStatus->id.taskId;
    runner.addr.nodeId = pStatus->id.nodeId;
    runner.addr.epset = mndGetDnodeEpsetById(pMnode, pStatus->id.nodeId);
    runner.execReplica = pInfo->runnerReplica; 
    TSDB_CHECK_NULL(taosArrayPush(*ppRes, &runner), code, lino, _exit, terrno);
    mstsDebug("the %dth runner target added to trigger's runnerList, TASK:%" PRIx64 , i, runner.addr.taskId);
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
    if (pInfo->runnerDeploys > 0) {
      // Do not hand a partially built list back to the caller.
      taosArrayDestroy(*ppRes);
      *ppRes = NULL;
    }
  }

  return code;
}
798

799
/*
 * Fill pInfo with the stream's leader snode and its replica snode, each with
 * its dnode epset.
 *
 * The leader is pStream->mainSnodeId. The replica id is read from the leader's
 * SSnodeObj. The replica epset is only filled when GOT_SNODE(replica id) holds.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_MND_STREAM_INTERNAL_ERROR if the
 *         leader snode can no longer be acquired.
 */
int32_t msmBuildStreamSnodeInfo(SMnode* pMnode, SStreamObj* pStream, SStreamSnodeInfo* pInfo) {
  int64_t    streamId = pStream->pCreate->streamId;  // used by the msts* log macros
  int32_t    leaderSnodeId = atomic_load_32(&pStream->mainSnodeId);
  SSnodeObj* pLeader = mndAcquireSnode(pMnode, leaderSnodeId);

  if (pLeader == NULL) {
    mstsError("snode %d not longer exists, ignore build stream snode info", leaderSnodeId);
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }

  pInfo->leaderSnodeId = leaderSnodeId;
  pInfo->replicaSnodeId = pLeader->replicaId;
  mndReleaseSnode(pMnode, pLeader);

  pInfo->leaderEpSet = mndGetDnodeEpsetById(pMnode, pInfo->leaderSnodeId);
  if (GOT_SNODE(pInfo->replicaSnodeId)) {
    pInfo->replicaEpSet = mndGetDnodeEpsetById(pMnode, pInfo->replicaSnodeId);
  }

  return TSDB_CODE_SUCCESS;
}
820

821
/*
 * Fill pDeploy->msg.trigger with everything a trigger task needs to be
 * deployed. Scalar settings are copied from the create request.
 * readerList is built from pInfo->trigReaders, with each reader's epset
 * resolved by vgroup id. runnerList is built through
 * msmBuildTriggerRunnerTargets, but only when the stream has runners.
 *
 * Pointer fields (notify URLs, trigger, plans, stream name) are stored by
 * reference; nothing is copied here.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered. Errors are
 * propagated so that a caller using TAOS_CHECK_EXIT does not deploy a
 * trigger whose reader/runner lists were only partially built.
 */
int32_t msmBuildTriggerDeployInfo(SMnode* pMnode, SStmStatus* pInfo, SStmTaskDeploy* pDeploy, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;
  SStreamTriggerDeployMsg* pMsg = &pDeploy->msg.trigger;

  pMsg->triggerType = pStream->pCreate->triggerType;
  pMsg->igDisorder = pStream->pCreate->igDisorder;
  pMsg->fillHistory = pStream->pCreate->fillHistory;
  pMsg->fillHistoryFirst = pStream->pCreate->fillHistoryFirst;
  pMsg->lowLatencyCalc = pStream->pCreate->lowLatencyCalc;
  pMsg->igNoDataTrigger = pStream->pCreate->igNoDataTrigger;
  pMsg->hasPartitionBy = (pStream->pCreate->partitionCols != NULL);
  pMsg->isTriggerTblVirt = STREAM_IS_VIRTUAL_TABLE(pStream->pCreate->triggerTblType, pStream->pCreate->flags);

  pMsg->pNotifyAddrUrls = pInfo->pCreate->pNotifyAddrUrls;
  pMsg->notifyEventTypes = pStream->pCreate->notifyEventTypes;
  pMsg->notifyErrorHandle = pStream->pCreate->notifyErrorHandle;
  pMsg->notifyHistory = pStream->pCreate->notifyHistory;

  pMsg->maxDelay = pStream->pCreate->maxDelay;
  pMsg->fillHistoryStartTime = pStream->pCreate->fillHistoryStartTime;
  pMsg->watermark = pStream->pCreate->watermark;
  pMsg->expiredTime = pStream->pCreate->expiredTime;
  pMsg->trigger = pInfo->pCreate->trigger;

  pMsg->eventTypes = pStream->pCreate->eventTypes;
  pMsg->placeHolderBitmap = pStream->pCreate->placeHolderBitmap;
  pMsg->calcTsSlotId = pStream->pCreate->calcTsSlotId;
  pMsg->triTsSlotId = pStream->pCreate->triTsSlotId;
  pMsg->triggerPrevFilter = pInfo->pCreate->triggerPrevFilter;
  if (STREAM_IS_VIRTUAL_TABLE(pStream->pCreate->triggerTblType, pStream->pCreate->flags)) {
    // Virtual-table triggers scan by themselves and may read from the calc cache.
    pMsg->triggerScanPlan = pInfo->pCreate->triggerScanPlan;
    pMsg->calcCacheScanPlan = msmSearchCalcCacheScanPlan(pInfo->pCreate->calcScanPlanList);
  }

  SStreamTaskAddr addr = {0};
  int32_t triggerReaderNum = taosArrayGetSize(pInfo->trigReaders);
  if (triggerReaderNum > 0) {
    pMsg->readerList = taosArrayInit(triggerReaderNum, sizeof(SStreamTaskAddr));
    TSDB_CHECK_NULL(pMsg->readerList, code, lino, _exit, terrno);
  }

  for (int32_t i = 0; i < triggerReaderNum; ++i) {
    SStmTaskStatus* pStatus = taosArrayGet(pInfo->trigReaders, i);
    addr.taskId = pStatus->id.taskId;
    addr.nodeId = pStatus->id.nodeId;
    addr.epset = mndGetVgroupEpsetById(pMnode, pStatus->id.nodeId);
    TSDB_CHECK_NULL(taosArrayPush(pMsg->readerList, &addr), code, lino, _exit, terrno);
    mstsDebug("the %dth trigReader src added to trigger's readerList, TASK:%" PRIx64 " nodeId:%d", i, addr.taskId, addr.nodeId);
  }

  pMsg->leaderSnodeId = pStream->mainSnodeId;
  pMsg->streamName = pInfo->streamName;

  if (0 == pInfo->runnerNum) {
    mstsDebug("no runner task, skip set trigger's runner list, deployNum:%d", pInfo->runnerDeploys);
    return code;
  }

  TAOS_CHECK_EXIT(msmBuildTriggerRunnerTargets(pMnode, pInfo, streamId, &pMsg->runnerList));

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  } else {
    mstsDebug("trigger deploy info built, readerNum:%d, runnerNum:%d", (int32_t)taosArrayGetSize(pMsg->readerList), (int32_t)taosArrayGetSize(pMsg->runnerList));
  }

  // Propagate the failure instead of masking it as success.
  return code;
}
893

894

895
/*
 * Fill pDeploy->msg.runner for a runner task that executes `plan`.
 *
 * All pointer fields, including the plan itself, are stored by reference;
 * nothing is cloned here. Fields are taken either from pInfo->pCreate or
 * from pStream->pCreate, exactly as listed below. Always returns
 * TSDB_CODE_SUCCESS.
 */
int32_t msmBuildRunnerDeployInfo(SStmTaskDeploy* pDeploy, SSubplan *plan, SStreamObj* pStream, SStmStatus* pInfo, bool topPlan) {
  SStreamRunnerDeployMsg* pRunnerMsg = &pDeploy->msg.runner;

  // Per-deployment settings.
  pRunnerMsg->pPlan = plan;
  pRunnerMsg->topPlan = topPlan;
  pRunnerMsg->execReplica = pInfo->runnerReplica;
  pRunnerMsg->streamName = pInfo->streamName;

  // Settings read from the status object's create request.
  pRunnerMsg->outDBFName = pInfo->pCreate->outDB;
  pRunnerMsg->outTblName = pInfo->pCreate->outTblName;
  pRunnerMsg->pNotifyAddrUrls = pInfo->pCreate->pNotifyAddrUrls;
  pRunnerMsg->outCols = pInfo->pCreate->outCols;
  pRunnerMsg->outTags = pInfo->pCreate->outTags;
  pRunnerMsg->subTblNameExpr = pInfo->pCreate->subTblNameExpr;
  pRunnerMsg->tagValueExpr = pInfo->pCreate->tagValueExpr;
  pRunnerMsg->forceOutCols = pInfo->pCreate->forceOutCols;

  // Settings read from the stream object's create request.
  pRunnerMsg->outTblType = pStream->pCreate->outTblType;
  pRunnerMsg->calcNotifyOnly = pStream->pCreate->calcNotifyOnly;
  pRunnerMsg->notifyErrorHandle = pStream->pCreate->notifyErrorHandle;
  pRunnerMsg->outStbUid = pStream->pCreate->outStbUid;
  pRunnerMsg->outStbSversion = pStream->pCreate->outStbSversion;

  return TSDB_CODE_SUCCESS;
}
930

931

932
/*
 * Register reader task(s) in the vgroup status map.
 *
 * When pTask is non-NULL only that single task is registered and pTasks is
 * ignored; otherwise every element of pTasks (an SArray of SStmTaskStatus)
 * is registered. Stops at and returns the first failure.
 */
static int32_t msmSTAddToVgroupMap(SStmGrpCtx* pCtx, int64_t streamId, SArray* pTasks, SStmTaskStatus* pTask, bool trigReader) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (pTask != NULL) {
    TAOS_CHECK_EXIT(msmSTAddToVgroupMapImpl(streamId, pTask, trigReader));
  } else {
    int32_t num = taosArrayGetSize(pTasks);
    for (int32_t idx = 0; idx < num; ++idx) {
      SStmTaskStatus* pEntry = taosArrayGet(pTasks, idx);
      TAOS_CHECK_EXIT(msmSTAddToVgroupMapImpl(streamId, pEntry, trigReader));
    }
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
950

951

952
/*
 * Register task(s) in the snode status map under deployment deployId.
 *
 * When taskNum > 0, pTask is treated as a plain C array of taskNum entries.
 * Otherwise every element of pTasks (an SArray of SStmTaskStatus) is used.
 * Stops at and returns the first failure.
 *
 * The unused local that derived a task type from deployId has been removed.
 */
static int32_t msmSTAddToSnodeMap(SStmGrpCtx* pCtx, int64_t streamId, SArray* pTasks, SStmTaskStatus* pTask, int32_t taskNum, int32_t deployId) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  bool    fromRawArray = (taskNum > 0);
  int32_t rtaskNum = fromRawArray ? taskNum : (int32_t)taosArrayGetSize(pTasks);

  for (int32_t i = 0; i < rtaskNum; ++i) {
    SStmTaskStatus* pStatus = fromRawArray ? (pTask + i) : taosArrayGet(pTasks, i);
    TAOS_CHECK_EXIT(msmSTAddToSnodeMapImpl(streamId, pStatus, deployId));
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
971

972
int64_t msmAssignTaskId(void) {
×
973
  return atomic_fetch_add_64(&mStreamMgmt.lastTaskId, 1);
×
974
}
975

976
/*
 * Produce a "serious id" for a task: the current timestamp as returned by
 * taosGetTimestampNs().
 */
int64_t msmAssignTaskSeriousId(void) {
  int64_t nowNs = taosGetTimestampNs();
  return nowNs;
}
979

980

981
/*
 * Report whether snode snodeId is alive according to mStreamMgmt.snodeMap.
 *
 * If the snode is missing from the map, the map is refreshed once with
 * msmSTAddSnodesToMap and the lookup is retried. A second miss is an
 * internal error. On success *alive is set to (runnerThreadNum >= 0); on
 * failure *alive is left untouched.
 */
int32_t msmIsSnodeAlive(SMnode* pMnode, int32_t snodeId, int64_t streamId, bool* alive) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SStmSnodeStatus* pStatus = taosHashGet(mStreamMgmt.snodeMap, &snodeId, sizeof(snodeId));

  if (pStatus == NULL) {
    // Not cached yet: reload all snodes into the map and look again.
    TAOS_CHECK_EXIT(msmSTAddSnodesToMap(pMnode));

    pStatus = taosHashGet(mStreamMgmt.snodeMap, &snodeId, sizeof(snodeId));
    if (pStatus == NULL) {
      mstsError("snode %d not exists in snodeMap", snodeId);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
    }
  }

  *alive = (pStatus->runnerThreadNum >= 0);

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1013

1014
/*
 * Pick the snode for a statically placed task: the stream's main snode if
 * it is alive, otherwise that snode's replica if it is alive.
 *
 * Returns the chosen snode id, or 0 when neither is usable, the main snode
 * object is gone, or an error occurs while checking liveness.
 *
 * The lookup is bounded to exactly two candidates. The previous loop
 * re-entered the "main snode" branch whenever replicaId == mainSnodeId and
 * could then spin indefinitely while that snode was down.
 */
int32_t msmRetrieveStaticSnodeId(SMnode* pMnode, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  bool    alive = false;
  int32_t mainSnodeId = atomic_load_32(&pStream->mainSnodeId);
  int32_t replicaId = 0;
  int64_t streamId = pStream->pCreate->streamId;

  // 1st candidate: the stream's main snode.
  TAOS_CHECK_EXIT(msmIsSnodeAlive(pMnode, mainSnodeId, streamId, &alive));
  if (alive) {
    return mainSnodeId;
  }

  SSnodeObj* pSnode = mndAcquireSnode(pMnode, mainSnodeId);
  if (NULL == pSnode) {
    stsWarn("snode %d not longer exists, ignore assign snode", mainSnodeId);
    return 0;
  }
  replicaId = pSnode->replicaId;
  mndReleaseSnode(pMnode, pSnode);

  // A missing replica, or a replica pointing back at the main snode, leaves
  // nothing else to try.
  if (replicaId <= 0 || replicaId == mainSnodeId) {
    mstsError("no available snode now, mainSnodeId:%d, replicaId:%d", mainSnodeId, replicaId);
    return 0;
  }

  // 2nd candidate: the main snode's replica.
  TAOS_CHECK_EXIT(msmIsSnodeAlive(pMnode, replicaId, streamId, &alive));
  if (alive) {
    return replicaId;
  }

  mstsError("no available snode now, mainSnodeId:%d, followerSnodeId:%d", mainSnodeId, replicaId);
  return 0;

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return 0;
}
1060

1061
/*
 * Pick a random alive snode for the stream.
 *
 * A target index in [0, snodeNum) is drawn with taosRand(). Snodes are then
 * walked in sdb iteration order, counting only alive ones, until the
 * target-th alive snode is reached. If a pass ends first, the walk restarts
 * from the beginning while the counter keeps growing, so it wraps around
 * when fewer snodes are alive than snodeNum. If a whole pass finds no alive
 * snode, the search gives up.
 *
 * Returns the chosen snode id. On a liveness-check error, the last alive
 * snode seen in the current pass (possibly 0) is returned. terrno is set to
 * TSDB_CODE_SNODE_NO_AVAILABLE_NODE whenever 0 is returned.
 */
int32_t msmAssignRandomSnodeId(SMnode* pMnode, int64_t streamId) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  SSdb*      pSdb = pMnode->pSdb;
  int32_t    aliveIdx = 0;
  int32_t    snodeId = 0;
  void*      pIter = NULL;
  SSnodeObj* pObj = NULL;
  bool       alive = false;

  int32_t snodeNum = sdbGetSize(pSdb, SDB_SNODE);
  if (snodeNum <= 0) {
    mstsInfo("no available snode now, num:%d", snodeNum);
    goto _exit;
  }

  int32_t snodeTarget = taosRand() % snodeNum;

  for (;;) {
    pIter = sdbFetch(pSdb, SDB_SNODE, pIter, (void **)&pObj);
    if (NULL == pIter) {
      // One full pass is done.
      if (0 == snodeId) {
        mstsError("no alive snode now, snodeNum:%d", snodeNum);
        break;
      }
      // At least one alive snode was seen: start another pass.
      snodeId = 0;
      continue;
    }

    code = msmIsSnodeAlive(pMnode, pObj->id, streamId, &alive);
    if (TSDB_CODE_SUCCESS != code) {
      sdbRelease(pSdb, pObj);
      sdbCancelFetch(pSdb, pIter);
      pObj = NULL;
      TAOS_CHECK_EXIT(code);
    }

    if (!alive) {
      sdbRelease(pSdb, pObj);
      continue;
    }

    snodeId = pObj->id;
    bool reached = (aliveIdx == snodeTarget);
    sdbRelease(pSdb, pObj);
    if (reached) {
      sdbCancelFetch(pSdb, pIter);
      pObj = NULL;
      goto _exit;
    }
    aliveIdx++;
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  if (0 == snodeId) {
    terrno = TSDB_CODE_SNODE_NO_AVAILABLE_NODE;
  }

  return snodeId;
}
1126

1127
/*
 * Choose an snode for a stream task.
 *
 * With isStatic the stream's main/replica snode is used
 * (msmRetrieveStaticSnodeId); otherwise a random alive snode is chosen
 * (msmAssignRandomSnodeId). Returns 0, with terrno set to
 * TSDB_CODE_SNODE_NO_AVAILABLE_NODE, when no snode exists or none could be
 * chosen.
 */
int32_t msmAssignTaskSnodeId(SMnode* pMnode, SStreamObj* pStream, bool isStatic) {
  int64_t streamId = pStream->pCreate->streamId;
  int32_t snodeNum = sdbGetSize(pMnode->pSdb, SDB_SNODE);
  int32_t snodeId = 0;

  if (snodeNum > 0) {
    if (isStatic) {
      snodeId = msmRetrieveStaticSnodeId(pMnode, pStream);
    } else {
      snodeId = msmAssignRandomSnodeId(pMnode, streamId);
    }
  } else {
    mstsInfo("no available snode now, num:%d", snodeNum);
  }

  if (0 == snodeId) {
    terrno = TSDB_CODE_SNODE_NO_AVAILABLE_NODE;
  }

  return snodeId;
}
1146

1147

1148
/*
 * Create the stream's single trigger task.
 *
 * Allocates pInfo->triggerTask, using the task id and node id from pCtx.
 * Builds its deploy message and queues it on the snode deploy map. Then
 * registers the task in the task map and the snode status map (deployId -1).
 * Returns the first failure.
 */
static int32_t msmBuildTriggerTasks(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         lino = 0;
  int64_t         streamId = pStream->pCreate->streamId;
  SStmTaskDeploy  deploy = {0};
  SStmTaskStatus* pTrigger = taosMemoryCalloc(1, sizeof(*pTrigger));

  pInfo->triggerTask = pTrigger;
  TSDB_CHECK_NULL(pTrigger, code, lino, _exit, terrno);

  // Status record kept by mnode for the trigger task.
  pTrigger->id.taskId = pCtx->triggerTaskId;
  pTrigger->id.deployId = 0;
  pTrigger->id.seriousId = msmAssignTaskSeriousId();
  pTrigger->id.nodeId = pCtx->triggerNodeId;
  pTrigger->id.taskIdx = 0;
  pTrigger->type = STREAM_TRIGGER_TASK;
  pTrigger->lastUpTs = pCtx->currTs;
  pTrigger->pStream = pInfo;

  // The deploy descriptor mirrors the status record's identity.
  deploy.task.type = pTrigger->type;
  deploy.task.streamId = streamId;
  deploy.task.taskId = pTrigger->id.taskId;
  deploy.task.seriousId = pTrigger->id.seriousId;
  deploy.task.nodeId = pTrigger->id.nodeId;
  deploy.task.taskIdx = pTrigger->id.taskIdx;

  TAOS_CHECK_EXIT(msmBuildTriggerDeployInfo(pCtx->pMnode, pInfo, &deploy, pStream));
  TAOS_CHECK_EXIT(msmTDAddTriggerToSnodeMap(&deploy, pStream));

  atomic_add_fetch_32(&mStreamMgmt.toDeploySnodeTaskNum, 1);

  TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, NULL, pTrigger));
  TAOS_CHECK_EXIT(msmSTAddToSnodeMap(pCtx, streamId, NULL, pTrigger, 1, -1));

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1188

1189
/*
 * Initialise pState as a trigger reader running on vgroup nodeId.
 *
 * Builds the reader's deploy message and queues it on the vgroup deploy map
 * (mStreamMgmt.toDeployVgMap). A fresh task id and serious id are assigned.
 * Returns the first failure.
 */
static int32_t msmTDAddSingleTrigReader(SStmGrpCtx* pCtx, SStmTaskStatus* pState, int32_t nodeId, SStmStatus* pInfo, SStreamObj* pStream, int64_t streamId) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SStmTaskDeploy deploy = {0};
  int64_t        newTaskId = msmAssignTaskId();
  int64_t        newSeriousId = msmAssignTaskSeriousId();

  pState->id.taskId = newTaskId;
  pState->id.deployId = 0;
  pState->id.seriousId = newSeriousId;
  pState->id.nodeId = nodeId;
  pState->id.taskIdx = 0;
  pState->type = STREAM_READER_TASK;
  pState->flags = STREAM_FLAG_TRIGGER_READER;
  pState->status = STREAM_STATUS_UNDEPLOYED;
  pState->lastUpTs = pCtx->currTs;
  pState->pStream = pInfo;

  deploy.task.type = STREAM_READER_TASK;
  deploy.task.streamId = streamId;
  deploy.task.taskId = newTaskId;
  deploy.task.flags = STREAM_FLAG_TRIGGER_READER;
  deploy.task.seriousId = newSeriousId;
  deploy.task.nodeId = nodeId;
  deploy.task.taskIdx = 0;

  TAOS_CHECK_EXIT(msmBuildReaderDeployInfo(&deploy, pStream, NULL, pInfo, true));
  TAOS_CHECK_EXIT(msmTDAddToVgroupMap(mStreamMgmt.toDeployVgMap, &deploy, streamId));

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1223

1224
/*
 * Create the trigger reader tasks for the stream's trigger table.
 *
 * Normal, child and virtual tables get one reader on triggerTblVgId. A super
 * table gets one reader per non-TSMA vgroup of the trigger database. Other
 * table types are ignored. Readers are stored in pInfo->trigReaders.
 *
 * The database reference taken with mndAcquireDb is released on every path;
 * previously it was never released. A failed taosArrayReserve is reported
 * instead of being dereferenced.
 */
static int32_t msmTDAddTrigReaderTasks(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         lino = 0;
  int64_t         streamId = pStream->pCreate->streamId;
  SSdb*           pSdb = pCtx->pMnode->pSdb;
  SStmTaskStatus* pState = NULL;
  SVgObj*         pVgroup = NULL;
  SDbObj*         pDb = NULL;

  switch (pStream->pCreate->triggerTblType) {
    case TSDB_NORMAL_TABLE:
    case TSDB_CHILD_TABLE:
    case TSDB_VIRTUAL_CHILD_TABLE:
    case TSDB_VIRTUAL_NORMAL_TABLE: {
      pInfo->trigReaders = taosArrayInit_s(sizeof(SStmTaskStatus), 1);
      TSDB_CHECK_NULL(pInfo->trigReaders, code, lino, _exit, terrno);
      pState = taosArrayGet(pInfo->trigReaders, 0);

      TAOS_CHECK_EXIT(msmTDAddSingleTrigReader(pCtx, pState, pStream->pCreate->triggerTblVgId, pInfo, pStream, streamId));
      break;
    }
    case TSDB_SUPER_TABLE: {
      pDb = mndAcquireDb(pCtx->pMnode, pStream->pCreate->triggerDB);
      if (NULL == pDb) {
        code = terrno;
        mstsError("failed to acquire db %s, error:%s", pStream->pCreate->triggerDB, terrstr());
        goto _exit;
      }

      pInfo->trigReaders = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SStmTaskStatus));
      TSDB_CHECK_NULL(pInfo->trigReaders, code, lino, _exit, terrno);

      void *pIter = NULL;
      while (1) {
        pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
        if (pIter == NULL) {
          break;
        }

        if (pVgroup->dbUid == pDb->uid && !pVgroup->isTsma) {
          pState = taosArrayReserve(pInfo->trigReaders, 1);
          code = (NULL == pState) ? terrno
                                  : msmTDAddSingleTrigReader(pCtx, pState, pVgroup->vgId, pInfo, pStream, streamId);
          if (code) {
            sdbRelease(pSdb, pVgroup);
            sdbCancelFetch(pSdb, pIter);
            pVgroup = NULL;
            TAOS_CHECK_EXIT(code);
          }
        }

        sdbRelease(pSdb, pVgroup);
      }
      break;
    }
    default:
      mstsDebug("%s ignore triggerTblType %d", __FUNCTION__, pStream->pCreate->triggerTblType);
      break;
  }

_exit:

  // Drop the db reference taken in the super-table branch.
  if (NULL != pDb) {
    mndReleaseDb(pCtx->pMnode, pDb);
  }

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1292

1293
/*
 * Record calc reader task taskId (on vgId, or on the mnode when vgId is
 * MNODE_HANDLE) as a source address for the subplan decoded from scanPlan.
 *
 * The address is appended to mStreamMgmt.toUpdateScanMap under the key
 * {streamId, subplanId}, creating the per-key SArray on first use. The
 * decoded subplan is freed before return.
 *
 * If the push into, or the insert of, a newly created per-key array fails,
 * the array is destroyed instead of being leaked.
 */
int32_t msmUPAddScanTask(SStmGrpCtx* pCtx, SStreamObj* pStream, char* scanPlan, int32_t vgId, int64_t taskId) {
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         lino = 0;
  SSubplan*       pSubplan = NULL;
  SArray*         pNewList = NULL;
  int64_t         streamId = pStream->pCreate->streamId;
  int64_t         key[2] = {streamId, 0};
  SStmTaskSrcAddr addr = {0};

  TAOS_CHECK_EXIT(nodesStringToNode(scanPlan, (SNode**)&pSubplan));
  addr.isFromCache = false;

  if (MNODE_HANDLE == vgId) {
    mndGetMnodeEpSet(pCtx->pMnode, &addr.epset);
  } else if (vgId > MNODE_HANDLE) {
    addr.epset = mndGetVgroupEpsetById(pCtx->pMnode, vgId);
  } else {
    mstsError("invalid vgId %d in scanPlan", vgId);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  addr.taskId = taskId;
  addr.vgId = vgId;
  addr.groupId = pSubplan->id.groupId;

  key[1] = pSubplan->id.subplanId;

  SArray** ppRes = taosHashGet(mStreamMgmt.toUpdateScanMap, key, sizeof(key));
  if (NULL == ppRes) {
    pNewList = taosArrayInit(1, sizeof(addr));
    TSDB_CHECK_NULL(pNewList, code, lino, _exit, terrno);
    TSDB_CHECK_NULL(taosArrayPush(pNewList, &addr), code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(taosHashPut(mStreamMgmt.toUpdateScanMap, key, sizeof(key), &pNewList, POINTER_BYTES));
    pNewList = NULL;  // the map now holds this array pointer
  } else {
    TSDB_CHECK_NULL(taosArrayPush(*ppRes, &addr), code, lino, _exit, terrno);
  }

  mstsDebug("calcReader %" PRIx64 " added to toUpdateScan, vgId:%d, groupId:%d, subplanId:%d", taskId, vgId, pSubplan->id.groupId, pSubplan->id.subplanId);

  atomic_add_fetch_32(&mStreamMgmt.toUpdateScanNum, 1);

_exit:

  // Non-NULL only if a freshly created array never made it into the map.
  taosArrayDestroy(pNewList);
  nodesDestroyNode((SNode*)pSubplan);

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1342

1343
/*
 * Record the trigger task (pCtx->triggerTaskId on pCtx->triggerNodeId) as a
 * from-cache source address for the subplan decoded from pScan->scanPlan.
 *
 * The address is appended to mStreamMgmt.toUpdateScanMap under the key
 * {streamId, subplanId}, creating the per-key SArray on first use. The
 * decoded subplan is freed before return.
 *
 * If the push into, or the insert of, a newly created per-key array fails,
 * the array is destroyed instead of being leaked.
 */
int32_t msmUPAddCacheTask(SStmGrpCtx* pCtx, SStreamCalcScan* pScan, SStreamObj* pStream) {
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         lino = 0;
  SSubplan*       pSubplan = NULL;
  SArray*         pNewList = NULL;
  int64_t         streamId = pStream->pCreate->streamId;
  int64_t         key[2] = {streamId, 0};
  SStmTaskSrcAddr addr = {0};

  TAOS_CHECK_EXIT(nodesStringToNode(pScan->scanPlan, (SNode**)&pSubplan));

  addr.isFromCache = true;
  addr.epset = mndGetDnodeEpsetById(pCtx->pMnode, pCtx->triggerNodeId);
  addr.taskId = pCtx->triggerTaskId;
  addr.vgId = pCtx->triggerNodeId;
  addr.groupId = pSubplan->id.groupId;

  key[1] = pSubplan->id.subplanId;
  SArray** ppRes = taosHashGet(mStreamMgmt.toUpdateScanMap, key, sizeof(key));
  if (NULL == ppRes) {
    pNewList = taosArrayInit(1, sizeof(addr));
    TSDB_CHECK_NULL(pNewList, code, lino, _exit, terrno);
    TSDB_CHECK_NULL(taosArrayPush(pNewList, &addr), code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(taosHashPut(mStreamMgmt.toUpdateScanMap, key, sizeof(key), &pNewList, POINTER_BYTES));
    pNewList = NULL;  // the map now holds this array pointer
  } else {
    TSDB_CHECK_NULL(taosArrayPush(*ppRes, &addr), code, lino, _exit, terrno);
  }

  atomic_add_fetch_32(&mStreamMgmt.toUpdateScanNum, 1);

_exit:

  // Non-NULL only if a freshly created array never made it into the map.
  taosArrayDestroy(pNewList);
  nodesDestroyNode((SNode*)pSubplan);

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1381

1382

1383
/*
 * Create the calc reader tasks for the stream.
 *
 * For each calc scan plan:
 * - Plans that read from the trigger cache only register a cache source
 *   (msmUPAddCacheTask).
 * - All other plans get one reader task per vgroup in pScan->vgList. Each
 *   such reader is stored in pInfo->calcReaders with taskIdx equal to the
 *   plan index, registered as a scan source, and queued on the vgroup
 *   deploy map.
 *
 * A failed taosArrayReserve is now reported instead of dereferenced.
 */
static int32_t msmTDAddCalcReaderTasks(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t calcTasksNum = taosArrayGetSize(pStream->pCreate->calcScanPlanList);
  int64_t streamId = pStream->pCreate->streamId;
  SStmTaskStatus* pState = NULL;
  pInfo->calcReaders = taosArrayInit(calcTasksNum, sizeof(SStmTaskStatus));
  TSDB_CHECK_NULL(pInfo->calcReaders, code, lino, _exit, terrno);

  for (int32_t i = 0; i < calcTasksNum; ++i) {
    SStreamCalcScan* pScan = taosArrayGet(pInfo->pCreate->calcScanPlanList, i);
    if (pScan->readFromCache) {
      TAOS_CHECK_EXIT(msmUPAddCacheTask(pCtx, pScan, pStream));
      continue;
    }

    int32_t vgNum = taosArrayGetSize(pScan->vgList);
    for (int32_t m = 0; m < vgNum; ++m) {
      pState = taosArrayReserve(pInfo->calcReaders, 1);
      TSDB_CHECK_NULL(pState, code, lino, _exit, terrno);

      pState->id.taskId = msmAssignTaskId();
      pState->id.deployId = 0;
      pState->id.seriousId = msmAssignTaskSeriousId();
      pState->id.nodeId = *(int32_t*)taosArrayGet(pScan->vgList, m);
      pState->id.taskIdx = i;
      pState->type = STREAM_READER_TASK;
      pState->flags = 0;
      pState->status = STREAM_STATUS_UNDEPLOYED;
      pState->lastUpTs = pCtx->currTs;
      pState->pStream = pInfo;

      SStmTaskDeploy info = {0};
      info.task.type = pState->type;
      info.task.streamId = streamId;
      info.task.taskId = pState->id.taskId;
      info.task.flags = pState->flags;
      info.task.seriousId = pState->id.seriousId;
      info.task.nodeId = pState->id.nodeId;
      info.task.taskIdx = pState->id.taskIdx;
      TAOS_CHECK_EXIT(msmBuildReaderDeployInfo(&info, pStream, pScan->scanPlan, pInfo, false));
      TAOS_CHECK_EXIT(msmUPAddScanTask(pCtx, pStream, pScan->scanPlan, pState->id.nodeId, pState->id.taskId));
      TAOS_CHECK_EXIT(msmTDAddToVgroupMap(mStreamMgmt.toDeployVgMap, &info, streamId));
    }
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1436

1437

1438

1439
/*
 * Re-register the scan source addresses of already existing calc readers.
 *
 * This walks the same calc scan plans as msmTDAddCalcReaderTasks:
 * - Cache-reading plans re-register the trigger as a cache source.
 * - Every other plan consumes the next entry of pInfo->calcReaders for each
 *   vgroup in its vgList.
 *
 * Readers are consumed by bounded index. If the plans require more readers
 * than pInfo->calcReaders holds, an internal error is returned instead of
 * reading past the end of the array.
 */
static int32_t msmUPPrepareReaderTasks(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;
  int32_t calcTasksNum = taosArrayGetSize(pStream->pCreate->calcScanPlanList);
  if (calcTasksNum <= 0) {
    mstsDebug("no calc scan plan, ignore parepare reader tasks, readerNum:%d", (int32_t)taosArrayGetSize(pInfo->calcReaders));
    return code;
  }

  int32_t readerNum = taosArrayGetSize(pInfo->calcReaders);
  int32_t readerIdx = 0;

  for (int32_t i = 0; i < calcTasksNum; ++i) {
    SStreamCalcScan* pScan = taosArrayGet(pStream->pCreate->calcScanPlanList, i);
    if (pScan->readFromCache) {
      TAOS_CHECK_EXIT(msmUPAddCacheTask(pCtx, pScan, pStream));
      continue;
    }

    int32_t vgNum = taosArrayGetSize(pScan->vgList);
    for (int32_t m = 0; m < vgNum; ++m) {
      if (readerIdx >= readerNum) {
        mstsError("calc reader index %d out of range, readerNum:%d, scanIdx:%d", readerIdx, readerNum, i);
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }

      SStmTaskStatus* pReader = taosArrayGet(pInfo->calcReaders, readerIdx);
      ++readerIdx;
      TAOS_CHECK_EXIT(msmUPAddScanTask(pCtx, pStream, pScan->scanPlan, pReader->id.nodeId, pReader->id.taskId));
    }
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1473

1474
/*
 * Create the trigger and calc reader tasks, then register both reader lists.
 * Both lists go into the task map first and the vgroup map second. Trigger
 * readers are flagged with trigReader = true in the vgroup map. Returns the
 * first failure.
 */
static int32_t msmBuildReaderTasks(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;

  TAOS_CHECK_EXIT(msmTDAddTrigReaderTasks(pCtx, pInfo, pStream));
  TAOS_CHECK_EXIT(msmTDAddCalcReaderTasks(pCtx, pInfo, pStream));

  // Index 0 holds the trigger readers, index 1 the calc readers.
  SArray* readerLists[2] = {pInfo->trigReaders, pInfo->calcReaders};

  for (int32_t k = 0; k < 2; ++k) {
    TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, readerLists[k], NULL));
  }

  for (int32_t k = 0; k < 2; ++k) {
    TAOS_CHECK_EXIT(msmSTAddToVgroupMap(pCtx, streamId, readerLists[k], NULL, 0 == k));
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1496

1497
/*
 * Point the exchange of group pSrc->groupId in `plan` at the source task
 * described by pSrc. The node id is pSrc->vgId and the endpoint is
 * pSrc->epset. Fetches are issued with message type msgType on behalf of
 * clientId. Returns the result of qSetSubplanExecutionNode.
 */
int32_t msmUpdatePlanSourceAddr(SStreamTask* pTask, int64_t streamId, SSubplan* plan, int64_t clientId, SStmTaskSrcAddr* pSrc, int32_t msgType, int64_t srcSubplanId) {
  SDownstreamSourceNode source = {
      .type = QUERY_NODE_DOWNSTREAM_SOURCE,
      .addr = {.nodeId = pSrc->vgId, .epSet = pSrc->epset},
      .clientId = clientId,
      .taskId = pSrc->taskId,
      .sId = 0,
      .execId = 0,
      .fetchMsgType = msgType,
      .localExec = false,
  };

  msttDebug("try to update subplan %d grp %d sourceAddr from subplan %" PRId64 ", clientId:%" PRIx64 ", srcTaskId:%" PRIx64 ", srcNodeId:%d, msgType:%s", 
      plan->id.subplanId, pSrc->groupId, srcSubplanId, source.clientId, source.taskId, source.addr.nodeId, TMSG_INFO(source.fetchMsgType));

  int32_t rc = qSetSubplanExecutionNode(plan, pSrc->groupId, &source);
  return rc;
}
1516

1517
/*
 * Find the first runner deploy at index >= beginIdx in pRunners whose plan
 * has id.subplanId == subplanId. On a match, write its task id to *taskId
 * and its task to *ppParent. Returns TSDB_CODE_MND_STREAM_INTERNAL_ERROR
 * (outputs untouched) if there is no match.
 */
int32_t msmGetTaskIdFromSubplanId(SStreamObj* pStream, SArray* pRunners, int32_t beginIdx, int32_t subplanId, int64_t* taskId, SStreamTask** ppParent) {
  int64_t         streamId = pStream->pCreate->streamId;
  int32_t         total = taosArrayGetSize(pRunners);
  SStmTaskDeploy* pMatch = NULL;

  for (int32_t idx = beginIdx; NULL == pMatch && idx < total; ++idx) {
    SStmTaskDeploy* pCand = taosArrayGet(pRunners, idx);
    SSubplan*       pCandPlan = pCand->msg.runner.pPlan;
    if (subplanId == pCandPlan->id.subplanId) {
      pMatch = pCand;
    }
  }

  if (NULL == pMatch) {
    mstsError("subplanId %d not found in runner list", subplanId);
    return TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }

  *taskId = pMatch->task.taskId;
  *ppParent = &pMatch->task;
  return TSDB_CODE_SUCCESS;
}
1534

1535
/*
 * Wire up the reader sources of a runner subplan.
 *
 * Each BIGINT value node in pPlan->pChildren names a child subplan. The id
 * is extracted with MND_GET_RUNNER_SUBPLANID. Every source address
 * registered for that child in mStreamMgmt.toUpdateScanMap is applied to
 * pPlan. Cache sources use TDMT_STREAM_FETCH_FROM_CACHE; all other sources
 * use TDMT_STREAM_FETCH. Non-value and non-BIGINT children are skipped. A
 * child with no registered address is an internal error.
 */
int32_t msmUpdateLowestPlanSourceAddr(SSubplan* pPlan, SStmTaskDeploy* pDeploy, int64_t streamId) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  int64_t      key[2] = {streamId, -1};
  SNode*       pChild = NULL;
  SStreamTask* pTask = &pDeploy->task;

  FOREACH(pChild, pPlan->pChildren) {
    if (nodeType(pChild) != QUERY_NODE_VALUE) {
      msttDebug("node type %d is not valueNode, skip it", nodeType(pChild));
      continue;
    }

    SValueNode* pValue = (SValueNode*)pChild;
    if (pValue->node.resType.type != TSDB_DATA_TYPE_BIGINT) {
      msttWarn("invalid value node data type %d for runner's child subplan", pValue->node.resType.type);
      continue;
    }

    key[1] = MND_GET_RUNNER_SUBPLANID(pValue->datum.i);

    SArray** ppAddrs = taosHashGet(mStreamMgmt.toUpdateScanMap, key, sizeof(key));
    if (ppAddrs == NULL) {
      msttError("lowest runner subplan ID:%d,%d can't get its child ID:%" PRId64 " addr", pPlan->id.groupId, pPlan->id.subplanId, key[1]);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
    }

    SArray* pAddrs = *ppAddrs;
    int32_t addrNum = taosArrayGetSize(pAddrs);
    for (int32_t n = 0; n < addrNum; ++n) {
      SStmTaskSrcAddr* pAddr = taosArrayGet(pAddrs, n);
      int32_t          fetchType = pAddr->isFromCache ? TDMT_STREAM_FETCH_FROM_CACHE : TDMT_STREAM_FETCH;
      TAOS_CHECK_EXIT(msmUpdatePlanSourceAddr(pTask, streamId, pPlan, pDeploy->task.taskId, pAddr, fetchType, key[1]));
    }
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1576

1577
/*
 * Patch one runner's plan and its parents' plans with source addresses.
 *
 * First the runner's own reader sources are filled in via
 * msmUpdateLowestPlanSourceAddr. Then, for every parent subplan, this
 * runner (task id, node id and the dnode epset for that node id) is set as
 * a TDMT_STREAM_FETCH_FROM_RUNNER source. The parent's task is looked up in
 * pRunners starting at beginIdx. Returns the first failure.
 */
int32_t msmUpdateRunnerPlan(SStmGrpCtx* pCtx, SArray* pRunners, int32_t beginIdx, SStmTaskDeploy* pDeploy, SStreamObj* pStream) {
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         lino = 0;
  int64_t         streamId = pStream->pCreate->streamId;
  SSubplan*       pPlan = pDeploy->msg.runner.pPlan;
  SStreamTask*    pParentTask = NULL;
  int64_t         parentTaskId = 0;
  SNode*          pParentNode = NULL;
  SStmTaskSrcAddr selfAddr = {0};

  TAOS_CHECK_EXIT(msmUpdateLowestPlanSourceAddr(pPlan, pDeploy, streamId));

  if (pPlan->pParents != NULL) {
    selfAddr.taskId = pDeploy->task.taskId;
    selfAddr.vgId = pDeploy->task.nodeId;
    selfAddr.groupId = pPlan->id.groupId;
    selfAddr.epset = mndGetDnodeEpsetById(pCtx->pMnode, pDeploy->task.nodeId);

    FOREACH(pParentNode, pPlan->pParents) {
      SSubplan* pParentPlan = (SSubplan*)pParentNode;
      TAOS_CHECK_EXIT(msmGetTaskIdFromSubplanId(pStream, pRunners, beginIdx, pParentPlan->id.subplanId, &parentTaskId, &pParentTask));
      TAOS_CHECK_EXIT(msmUpdatePlanSourceAddr(pParentTask, streamId, pParentPlan, parentTaskId, &selfAddr, TDMT_STREAM_FETCH_FROM_RUNNER, pPlan->id.subplanId));
    }
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1612

1613
/*
 * For each runner deploy in pRunners, in order:
 * - patch its plan's source addresses via msmUpdateRunnerPlan;
 * - serialise the plan with nodesNodeToString, writing the resulting string
 *   back into msg.runner.pPlan in place of the SSubplan pointer.
 * Returns the first failure.
 */
int32_t msmUpdateRunnerPlans(SStmGrpCtx* pCtx, SArray* pRunners, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;
  int32_t total = taosArrayGetSize(pRunners);

  for (int32_t idx = 0; idx < total; ++idx) {
    SStmTaskDeploy* pRunner = taosArrayGet(pRunners, idx);
    SStreamTask*    pTask = &pRunner->task;

    TAOS_CHECK_EXIT(msmUpdateRunnerPlan(pCtx, pRunners, idx, pRunner, pStream));

    SNode* pPlanNode = (SNode*)pRunner->msg.runner.pPlan;
    TAOS_CHECK_EXIT(nodesNodeToString(pPlanNode, false, (char**)&pRunner->msg.runner.pPlan, NULL));

    msttDebugL("runner updated task plan:%s", (const char*)pRunner->msg.runner.pPlan);
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1636

1637
/*
 * Build the runner task status entries and deploy messages for every runner
 * deploy group of a stream.
 *
 * Validation:
 *  - pDag must have a positive number of subplans, equal to
 *    pStream->pCreate->numOfCalcSubplan.
 *  - Level 0 must contain exactly one subplan; runners built from it get
 *    STREAM_FLAG_TOP_RUNNER.
 *
 * For each deploy group 0 .. pInfo->runnerDeploys-1:
 *  - an snode is assigned;
 *  - levels are walked from the lowest (last) to level 0, and one
 *    SStmTaskStatus is appended to pInfo->runners[deployId] per subplan;
 *  - the deploy messages are finalized (msmUpdateRunnerPlans) and queued
 *    (msmTDAddRunnersToSnodeMap).
 *
 * pDag is destroyed and re-parsed from pStream->pCreate->calcPlan after each
 * group, presumably because building the messages modifies the plan nodes.
 * After all groups are built, they are registered in the task and snode maps
 * and pInfo->runnerNum is set.
 *
 * Ownership: this function owns pDag and destroys it on every path.
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
int32_t msmBuildRunnerTasksImpl(SStmGrpCtx* pCtx, SQueryPlan* pDag, SStmStatus* pInfo, SStreamObj* pStream) {
  int32_t         code = 0;
  int32_t         lino = 0;
  int64_t         streamId = pStream->pCreate->streamId;
  SArray*         deployTaskList = NULL;
  SArray*         deployList = NULL;
  int32_t         deployNodeId = 0;
  SStmTaskStatus* pState = NULL;
  int32_t         taskIdx = 0;
  SNodeListNode*  plans = NULL;
  int32_t         taskNum = 0;
  int32_t         totalTaskNum = 0;

  if (pDag->numOfSubplans <= 0) {
    mstsError("invalid subplan num:%d", pDag->numOfSubplans);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  if (pDag->numOfSubplans != pStream->pCreate->numOfCalcSubplan) {
    mstsError("numOfCalcSubplan %d mismatch with numOfSubplans %d", pStream->pCreate->numOfCalcSubplan, pDag->numOfSubplans);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  int32_t levelNum = (int32_t)LIST_LENGTH(pDag->pSubplans);
  if (levelNum <= 0) {
    mstsError("invalid level num:%d", levelNum);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  int32_t lowestLevelIdx = levelNum - 1;

  // Level 0 holds the top runner plan and must contain exactly one subplan.
  plans = (SNodeListNode*)nodesListGetNode(pDag->pSubplans, 0);
  if (NULL == plans) {
    // nodeType() below would dereference a NULL level.
    mstsError("empty level plan, level:%d", 0);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  if (QUERY_NODE_NODE_LIST != nodeType(plans)) {
    mstsError("invalid level plan, level:0, planNodeType:%d", nodeType(plans));
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  taskNum = (int32_t)LIST_LENGTH(plans->pNodeList);
  if (taskNum != 1) {
    mstsError("invalid level plan number:%d, level:0", taskNum);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  // One slot per subplan; the slots are reused by every deploy group.
  deployTaskList = taosArrayInit_s(sizeof(SStmTaskDeploy), pDag->numOfSubplans);
  TSDB_CHECK_NULL(deployTaskList, code, lino, _exit, terrno);

  for (int32_t deployId = 0; deployId < pInfo->runnerDeploys; ++deployId) {
    totalTaskNum = 0;

    deployList = pInfo->runners[deployId];
    deployNodeId = msmAssignTaskSnodeId(pCtx->pMnode, pStream, (0 == deployId) ? true : false);
    if (!GOT_SNODE(deployNodeId)) {
      TAOS_CHECK_EXIT(terrno);
    }

    taskIdx = 0;

    for (int32_t i = lowestLevelIdx; i >= 0; --i) {
      plans = (SNodeListNode*)nodesListGetNode(pDag->pSubplans, i);
      if (NULL == plans) {
        mstsError("empty level plan, level:%d", i);
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }

      if (QUERY_NODE_NODE_LIST != nodeType(plans)) {
        mstsError("invalid level plan, level:%d, planNodeType:%d", i, nodeType(plans));
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }

      taskNum = (int32_t)LIST_LENGTH(plans->pNodeList);
      if (taskNum <= 0) {
        mstsError("invalid level plan number:%d, level:%d", taskNum, i);
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }

      // Guards the taosArrayGet(deployTaskList, taskIdx++) below.
      totalTaskNum += taskNum;
      if (totalTaskNum > pDag->numOfSubplans) {
        mstsError("current totalTaskNum %d is bigger than numOfSubplans %d, level:%d", totalTaskNum, pDag->numOfSubplans, i);
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }

      for (int32_t n = 0; n < taskNum; ++n) {
        SSubplan* plan = (SSubplan*)nodesListGetNode(plans->pNodeList, n);

        pState = taosArrayReserve(deployList, 1);
        // taosArrayReserve() returns NULL when the array cannot grow.
        TSDB_CHECK_NULL(pState, code, lino, _exit, terrno);

        pState->id.taskId = msmAssignTaskId();
        pState->id.deployId = deployId;
        pState->id.seriousId = msmAssignTaskSeriousId();
        pState->id.nodeId = deployNodeId;
        pState->id.taskIdx = MND_SET_RUNNER_TASKIDX(i, n);
        pState->type = STREAM_RUNNER_TASK;
        pState->flags = (0 == i) ? STREAM_FLAG_TOP_RUNNER : 0;
        pState->status = STREAM_STATUS_UNDEPLOYED;
        pState->lastUpTs = pCtx->currTs;
        pState->pStream = pInfo;

        SStmTaskDeploy* pDeploy = taosArrayGet(deployTaskList, taskIdx++);
        pDeploy->task.type = pState->type;
        pDeploy->task.streamId = streamId;
        pDeploy->task.taskId = pState->id.taskId;
        pDeploy->task.flags = pState->flags;
        pDeploy->task.seriousId = pState->id.seriousId;
        pDeploy->task.deployId = pState->id.deployId;
        pDeploy->task.nodeId = pState->id.nodeId;
        pDeploy->task.taskIdx = pState->id.taskIdx;
        TAOS_CHECK_EXIT(msmBuildRunnerDeployInfo(pDeploy, plan, pStream, pInfo, 0 == i));

        SStreamTask* pTask = &pDeploy->task;  // used by the mstt* log macros
        msttDebug("runner task deploy built, subplan level:%d, taskIdx:%d, groupId:%d, subplanId:%d",
            i, pTask->taskIdx, plan->id.groupId, plan->id.subplanId);
      }

      mstsDebug("deploy %d level %d initialized, taskNum:%d", deployId, i, taskNum);
    }

    if (totalTaskNum != pDag->numOfSubplans) {
      mstsError("totalTaskNum %d mis-match with numOfSubplans %d", totalTaskNum, pDag->numOfSubplans);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
    }

    TAOS_CHECK_EXIT(msmUpdateRunnerPlans(pCtx, deployTaskList, pStream));

    TAOS_CHECK_EXIT(msmTDAddRunnersToSnodeMap(deployTaskList, pStream));

    // Start the next deploy group from a freshly parsed plan.
    nodesDestroyNode((SNode*)pDag);
    pDag = NULL;

    TAOS_CHECK_EXIT(nodesStringToNode(pStream->pCreate->calcPlan, (SNode**)&pDag));

    mstsDebug("total %d runner tasks added for deploy %d", totalTaskNum, deployId);
  }

  for (int32_t i = 0; i < pInfo->runnerDeploys; ++i) {
    TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, pInfo->runners[i], NULL));
    TAOS_CHECK_EXIT(msmSTAddToSnodeMap(pCtx, streamId, pInfo->runners[i], NULL, 0, i));
  }

  pInfo->runnerNum = totalTaskNum;

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  taosArrayDestroy(deployTaskList);
  nodesDestroyNode((SNode*)pDag);

  return code;
}
1787

1788
/*
 * Rebuild the runner deploy messages for the deploy groups listed in
 * pAction->deployId[0 .. deployNum-1], reusing the runner task status entries
 * already stored in pInfo->runners[deployId].
 *
 * Each group is moved to a newly assigned snode. Task ids, serious ids, deploy
 * ids and task indexes are kept. The existing entries are walked in the same
 * order they were created (lowest level first, subplans in list order), and
 * each one is checked against the task index of the subplan it is matched with.
 *
 * Ownership: this function owns pDag and destroys it on every path; it is
 * re-parsed from pStream->pCreate->calcPlan between groups.
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
int32_t msmReBuildRunnerTasks(SStmGrpCtx* pCtx, SQueryPlan* pDag, SStmStatus* pInfo, SStreamObj* pStream, SStmTaskAction* pAction) {
  int32_t         code = 0;
  int32_t         lino = 0;
  int64_t         streamId = pStream->pCreate->streamId;
  int32_t         newNodeId = 0;
  int32_t         levelNum = (int32_t)LIST_LENGTH(pDag->pSubplans);
  int32_t         lowestLevelIdx = levelNum - 1;
  SNodeListNode*  plans = NULL;
  int32_t         taskNum = 0;
  int32_t         totalTaskNum = 0;
  int32_t         runnerCount = 0;
  int32_t         deployId = 0;
  SStmTaskStatus* pRunner = NULL;
  int32_t         taskIdx = 0;
  SArray*         deployTaskList = taosArrayInit_s(sizeof(SStmTaskDeploy), pDag->numOfSubplans);
  TSDB_CHECK_NULL(deployTaskList, code, lino, _exit, terrno);

  for (int32_t r = 0; r < pAction->deployNum; ++r) {
    deployId = pAction->deployId[r];
    // pInfo->runners is only populated for indexes below runnerDeploys.
    if (deployId < 0 || deployId >= pInfo->runnerDeploys) {
      mstsError("invalid runner deployId:%d, runnerDeploys:%d", deployId, (int32_t)pInfo->runnerDeploys);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
    }

    runnerCount = (int32_t)taosArrayGetSize(pInfo->runners[deployId]);
    pRunner = taosArrayGet(pInfo->runners[deployId], 0);
    if (NULL == pRunner) {
      mstsError("no runner task found in deploy %d", deployId);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
    }

    totalTaskNum = 0;

    newNodeId = msmAssignTaskSnodeId(pCtx->pMnode, pStream, (0 == r) ? true : false);
    if (!GOT_SNODE(newNodeId)) {
      TAOS_CHECK_EXIT(terrno);
    }

    taskIdx = 0;

    for (int32_t i = lowestLevelIdx; i >= 0; --i) {
      plans = (SNodeListNode*)nodesListGetNode(pDag->pSubplans, i);
      if (NULL == plans) {
        mstsError("empty level plan, level:%d", i);
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }

      taskNum = (int32_t)LIST_LENGTH(plans->pNodeList);
      if (taskNum <= 0) {
        mstsError("invalid level plan number:%d, level:%d", taskNum, i);
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }

      // pRunner advances once per subplan, and each subplan fills one slot of
      // deployTaskList: keep both inside their arrays.
      totalTaskNum += taskNum;
      if (totalTaskNum > runnerCount || totalTaskNum > pDag->numOfSubplans) {
        mstsError("current totalTaskNum %d exceeds runner num %d or numOfSubplans %d, level:%d", totalTaskNum,
                  runnerCount, pDag->numOfSubplans, i);
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }

      // NOTE(review): `&=` clears every flag except STREAM_FLAG_REDEPLOY_RUNNER.
      // That includes STREAM_FLAG_TOP_RUNNER, which msmBuildRunnerTasksImpl sets
      // on level-0 runners. It is also applied only to the first runner of each
      // level. If the intent is to mark runners as redeployed, `|=` inside the
      // per-task loop looks more plausible -- confirm before changing.
      pRunner->flags &= STREAM_FLAG_REDEPLOY_RUNNER;

      for (int32_t n = 0; n < taskNum; ++n) {
        SSubplan* plan = (SSubplan*)nodesListGetNode(plans->pNodeList, n);

        int32_t newTaskIdx = MND_SET_RUNNER_TASKIDX(i, n);
        if (pRunner->id.taskIdx != newTaskIdx) {
          mstsError("runner TASK:%" PRId64 " taskIdx %d mismatch with newTaskIdx:%d", pRunner->id.taskId, pRunner->id.taskIdx, newTaskIdx);
          TAOS_CHECK_EXIT(TSDB_CODE_STREAM_INTERNAL_ERROR);
        }

        pRunner->id.nodeId = newNodeId;

        SStmTaskDeploy* pDeploy = taosArrayGet(deployTaskList, taskIdx++);
        pDeploy->task.type = pRunner->type;
        pDeploy->task.streamId = streamId;
        pDeploy->task.taskId = pRunner->id.taskId;
        pDeploy->task.flags = pRunner->flags;
        pDeploy->task.seriousId = pRunner->id.seriousId;
        // Without this the slot keeps the zero from taosArrayInit_s() (or a value
        // left by a previous group), unlike msmBuildRunnerTasksImpl.
        pDeploy->task.deployId = pRunner->id.deployId;
        pDeploy->task.nodeId = pRunner->id.nodeId;
        pDeploy->task.taskIdx = pRunner->id.taskIdx;
        TAOS_CHECK_EXIT(msmBuildRunnerDeployInfo(pDeploy, plan, pStream, pInfo, 0 == i));

        pRunner++;
      }

      mstsDebug("level %d initialized, taskNum:%d", i, taskNum);
    }

    TAOS_CHECK_EXIT(msmUpdateRunnerPlans(pCtx, deployTaskList, pStream));

    TAOS_CHECK_EXIT(msmTDAddRunnersToSnodeMap(deployTaskList, pStream));

    TAOS_CHECK_EXIT(msmSTAddToSnodeMap(pCtx, streamId, pInfo->runners[deployId], NULL, 0, deployId));

    // Start the next deploy group from a freshly parsed plan.
    nodesDestroyNode((SNode*)pDag);
    pDag = NULL;

    TAOS_CHECK_EXIT(nodesStringToNode(pStream->pCreate->calcPlan, (SNode**)&pDag));
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  nodesDestroyNode((SNode*)pDag);
  taosArrayDestroy(deployTaskList);

  return code;
}
1877

1878

1879
/*
 * Set the runner execution layout of a stream in pInfo:
 *  - runnerDeploys = MND_STREAM_RUNNER_DEPLOY_NUM;
 *  - runnerReplica = 3.
 *
 * Both values are currently hard-coded, and streamId is not consulted.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t msmSetStreamRunnerExecReplica(int64_t streamId, SStmStatus* pInfo) {
  // STREAMTODO: values are hard-coded for now.
  (void)streamId;

  pInfo->runnerDeploys = MND_STREAM_RUNNER_DEPLOY_NUM;
  pInfo->runnerReplica = 3;

  return TSDB_CODE_SUCCESS;
}
1895

1896

1897
/*
 * Build all runner tasks of a stream from its calc plan.
 *
 * Does nothing if the stream has no calc plan. Otherwise:
 *  - the plan is parsed;
 *  - one runner status array is allocated per deploy group
 *    (pInfo->runners[0 .. runnerDeploys-1]);
 *  - the actual build is delegated to msmBuildRunnerTasksImpl().
 *
 * mStreamMgmt.toUpdateScanMap / toUpdateScanNum are reset when this returns,
 * on success and on failure alike.
 */
static int32_t msmBuildRunnerTasks(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
  if (NULL == pStream->pCreate->calcPlan) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  int64_t     streamId = pStream->pCreate->streamId;
  SQueryPlan* pPlan = NULL;

  TAOS_CHECK_EXIT(nodesStringToNode(pStream->pCreate->calcPlan, (SNode**)&pPlan));

  for (int32_t i = 0; i < pInfo->runnerDeploys; ++i) {
    pInfo->runners[i] = taosArrayInit(pPlan->numOfSubplans, sizeof(SStmTaskStatus));
    TSDB_CHECK_NULL(pInfo->runners[i], code, lino, _exit, terrno);
  }

  // msmBuildRunnerTasksImpl() takes ownership of pPlan and destroys it on every path.
  code = msmBuildRunnerTasksImpl(pCtx, pPlan, pInfo, pStream);
  pPlan = NULL;

  TAOS_CHECK_EXIT(code);

_exit:

  // toUpdateScanMap appears to be per-build scratch state (it is emptied after
  // every successful build). Empty it on failure too, so entries left by a
  // failed build cannot be applied to the next stream's runner plans.
  taosHashClear(mStreamMgmt.toUpdateScanMap);
  mStreamMgmt.toUpdateScanNum = 0;

  nodesDestroyNode((SNode*)pPlan);

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1932

1933

1934
/*
 * Build every task of a stream deployment.
 *
 * First a task id and an snode are assigned for the trigger task in pCtx.
 * Then reader, runner and trigger tasks are built, in that order.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t msmBuildStreamTasks(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;

  mstsInfo("start to deploy stream tasks, deployTimes:%" PRId64, pInfo->deployTimes);

  pCtx->triggerTaskId = msmAssignTaskId();
  pCtx->triggerNodeId = msmAssignTaskSnodeId(pCtx->pMnode, pStream, true);
  if (!GOT_SNODE(pCtx->triggerNodeId)) {
    TAOS_CHECK_EXIT(terrno);
  }

  TAOS_CHECK_EXIT(msmBuildReaderTasks(pCtx, pInfo, pStream));
  TAOS_CHECK_EXIT(msmBuildRunnerTasks(pCtx, pInfo, pStream));
  TAOS_CHECK_EXIT(msmBuildTriggerTasks(pCtx, pInfo, pStream));

_exit:

  if (code) {
    // toUpdateScanMap appears to be per-build scratch state (presumably filled
    // while building reader tasks and consumed by runner plans). Drop whatever
    // this failed build left in it so the next stream cannot pick it up.
    taosHashClear(mStreamMgmt.toUpdateScanMap);
    mStreamMgmt.toUpdateScanNum = 0;

    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
1959

1960
/*
 * Allocate pInfo->trigReaders and set pInfo->trigReaderNum according to the
 * stream's trigger table type:
 *  - normal/child/virtual tables: an array already holding one element
 *    (taosArrayInit_s), with trigReaderNum = 1;
 *  - super tables: an empty array with capacity for one entry per vgroup of
 *    the trigger db, with trigReaderNum = numOfVgroups;
 *  - any other type: nothing is allocated, and trigReaderNum = 0.
 */
static int32_t msmInitTrigReaderList(SStmGrpCtx* pCtx, SStmStatus* pInfo, SStreamObj* pStream) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;

  switch (pStream->pCreate->triggerTblType) {
    case TSDB_NORMAL_TABLE:
    case TSDB_CHILD_TABLE:
    case TSDB_VIRTUAL_CHILD_TABLE:
    case TSDB_VIRTUAL_NORMAL_TABLE: {
      pInfo->trigReaders = taosArrayInit_s(sizeof(SStmTaskStatus), 1);
      TSDB_CHECK_NULL(pInfo->trigReaders, code, lino, _exit, terrno);
      pInfo->trigReaderNum = 1;
      break;
    }
    case TSDB_SUPER_TABLE: {
      SDbObj* pDb = mndAcquireDb(pCtx->pMnode, pStream->pCreate->triggerDB);
      if (NULL == pDb) {
        code = terrno;
        mstsError("failed to acquire db %s, error:%s", pStream->pCreate->triggerDB, terrstr());
        goto _exit;
      }

      // Only the vgroup count is needed. Copy it and release the reference taken
      // by mndAcquireDb() right away; it was previously never released.
      int32_t vgNum = pDb->cfg.numOfVgroups;
      mndReleaseDb(pCtx->pMnode, pDb);

      pInfo->trigReaders = taosArrayInit(vgNum, sizeof(SStmTaskStatus));
      TSDB_CHECK_NULL(pInfo->trigReaders, code, lino, _exit, terrno);
      pInfo->trigReaderNum = vgNum;
      break;
    }
    default:
      pInfo->trigReaderNum = 0;
      mstsDebug("%s ignore triggerTblType %d", __FUNCTION__, pStream->pCreate->triggerTblType);
      break;
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2004

2005

2006
/*
 * Initialize a stream status record from its SStreamObj.
 *
 * Always:
 *  - resets lastActionTs;
 *  - duplicates the stream name if it is not set yet;
 *  - clones the create-request pointers.
 *
 * When the stream has calc subplans, the runner count and the runner
 * deploy/replica layout are set.
 *
 * With initList, the trigger reader list, the calc reader list (one entry per
 * calc scan plan) and one runner array per deploy group are also allocated.
 */
static int32_t msmInitStmStatus(SStmGrpCtx* pCtx, SStmStatus* pStatus, SStreamObj* pStream, bool initList) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pStream->pCreate->streamId;

  pStatus->lastActionTs = INT64_MIN;

  if (pStatus->streamName == NULL) {
    pStatus->streamName = taosStrdup(pStream->name);
    TSDB_CHECK_NULL(pStatus->streamName, code, lino, _exit, terrno);
  }

  TAOS_CHECK_EXIT(tCloneStreamCreateDeployPointers(pStream->pCreate, &pStatus->pCreate));

  if (pStream->pCreate->numOfCalcSubplan > 0) {
    pStatus->runnerNum = pStream->pCreate->numOfCalcSubplan;
    TAOS_CHECK_EXIT(msmSetStreamRunnerExecReplica(streamId, pStatus));
  }

  if (!initList) {
    goto _exit;  // code is still TSDB_CODE_SUCCESS here
  }

  TAOS_CHECK_EXIT(msmInitTrigReaderList(pCtx, pStatus, pStream));

  int32_t calcReaderNum = taosArrayGetSize(pStream->pCreate->calcScanPlanList);
  if (calcReaderNum > 0) {
    pStatus->calcReaderNum = calcReaderNum;
    pStatus->calcReaders = taosArrayInit(calcReaderNum, sizeof(SStmTaskStatus));
    TSDB_CHECK_NULL(pStatus->calcReaders, code, lino, _exit, terrno);
  }

  // One runner array per deploy group, only when the stream has runners.
  for (int32_t d = 0; pStatus->runnerNum > 0 && d < pStatus->runnerDeploys; ++d) {
    pStatus->runners[d] = taosArrayInit(pStatus->runnerNum, sizeof(SStmTaskStatus));
    TSDB_CHECK_NULL(pStatus->runners[d], code, lino, _exit, terrno);
  }

_exit:
  if (TSDB_CODE_SUCCESS != code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2052

2053
/*
 * Deploy all tasks of a stream.
 *
 * If pStatus is NULL, this is the stream's first deployment:
 *  - a status record is initialized (without task lists);
 *  - it is inserted into mStreamMgmt.streamMap;
 *  - the copy stored in the map is used from then on.
 *
 * The tasks are then built with msmBuildStreamTasks(). On failure, a stream
 * that has a status is stopped through msmStopStreamByError().
 */
static int32_t msmDeployStreamTasks(SStmGrpCtx* pCtx, SStreamObj* pStream, SStmStatus* pStatus) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  int64_t    streamId = pStream->pCreate->streamId;
  SStmStatus info = {0};

  if (pStatus == NULL) {
    // NOTE(review): if msmInitStmStatus() or taosHashPut() fails, whatever was
    // already stored in `info` (e.g. the duplicated streamName) is not released
    // here -- confirm whether this leaks.
    TAOS_CHECK_EXIT(msmInitStmStatus(pCtx, &info, pStream, false));
    TAOS_CHECK_EXIT(taosHashPut(mStreamMgmt.streamMap, &streamId, sizeof(streamId), &info, sizeof(info)));
    pStatus = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  }

  TAOS_CHECK_EXIT(msmBuildStreamTasks(pCtx, pStatus, pStream));
  mstLogSStmStatus("stream deployed", streamId, pStatus);

_exit:
  if (TSDB_CODE_SUCCESS == code) {
    return code;
  }

  if (pStatus != NULL) {
    msmStopStreamByError(streamId, pStatus, code, pCtx->currTs);
    mstsError("stream build error:%s, will try to stop current stream", tstrerror(code));
  }

  mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  return code;
}
2084

2085

2086
/*
 * Remove a stream from the mnode stream-management state.
 *
 *  1. Tasks of the stream still pending in toDeployVgMap and toDeploySnodeMap
 *     (both trigger and runner lists) are flagged as deployed while each
 *     entry's lock is held -- presumably so the deploy senders skip them.
 *  2. The stream is removed from every snode's and every vgroup's streamTasks.
 *  3. Every taskMap entry whose key starts with streamId is removed.
 *  4. If fromStreamMap is true, the stream's streamMap entry is removed too.
 *
 * The returned code is whatever the last taosHashRemove() call returned. It can
 * therefore be non-zero just because the stream was absent from the last map
 * visited. NOTE(review): this assumes taosHashRemove() reports a missing key as
 * an error.
 */
static int32_t msmSTRemoveStream(int64_t streamId, bool fromStreamMap) {
  int32_t code = TSDB_CODE_SUCCESS;
  void*   pIter = NULL;

  // 1a. Pending vgroup deployments.
  for (pIter = taosHashIterate(mStreamMgmt.toDeployVgMap, NULL); NULL != pIter;
       pIter = taosHashIterate(mStreamMgmt.toDeployVgMap, pIter)) {
    SStmVgTasksToDeploy* pVgDeploy = (SStmVgTasksToDeploy*)pIter;
    mstWaitLock(&pVgDeploy->lock, true);

    int32_t listSize = taosArrayGetSize(pVgDeploy->taskList);
    if (listSize != atomic_load_32(&pVgDeploy->deployed)) {
      for (int32_t k = 0; k < listSize; ++k) {
        SStmTaskToDeployExt* pExt = taosArrayGet(pVgDeploy->taskList, k);
        if (!pExt->deployed && streamId == pExt->deploy.task.streamId) {
          pExt->deployed = true;
        }
      }
    }

    taosRUnLockLatch(&pVgDeploy->lock);
  }

  // 1b. Pending snode deployments (trigger and runner lists).
  for (pIter = taosHashIterate(mStreamMgmt.toDeploySnodeMap, NULL); NULL != pIter;
       pIter = taosHashIterate(mStreamMgmt.toDeploySnodeMap, pIter)) {
    SStmSnodeTasksDeploy* pSnodeDeploy = (SStmSnodeTasksDeploy*)pIter;
    mstWaitLock(&pSnodeDeploy->lock, true);

    int32_t listSize = taosArrayGetSize(pSnodeDeploy->triggerList);
    if (listSize != atomic_load_32(&pSnodeDeploy->triggerDeployed)) {
      for (int32_t k = 0; k < listSize; ++k) {
        SStmTaskToDeployExt* pExt = taosArrayGet(pSnodeDeploy->triggerList, k);
        if (!pExt->deployed && streamId == pExt->deploy.task.streamId) {
          pExt->deployed = true;
        }
      }
    }

    listSize = taosArrayGetSize(pSnodeDeploy->runnerList);
    if (listSize != atomic_load_32(&pSnodeDeploy->runnerDeployed)) {
      for (int32_t k = 0; k < listSize; ++k) {
        SStmTaskToDeployExt* pExt = taosArrayGet(pSnodeDeploy->runnerList, k);
        if (!pExt->deployed && streamId == pExt->deploy.task.streamId) {
          pExt->deployed = true;
        }
      }
    }

    taosRUnLockLatch(&pSnodeDeploy->lock);
  }

  // 2a. Per-snode stream sets.
  for (pIter = taosHashIterate(mStreamMgmt.snodeMap, NULL); NULL != pIter;
       pIter = taosHashIterate(mStreamMgmt.snodeMap, pIter)) {
    SStmSnodeStatus* pSnodeStatus = (SStmSnodeStatus*)pIter;
    code = taosHashRemove(pSnodeStatus->streamTasks, &streamId, sizeof(streamId));
    if (TSDB_CODE_SUCCESS == code) {
      mstsDebug("stream removed from snodeMap %d, remainStreams:%d", *(int32_t*)taosHashGetKey(pIter, NULL), (int32_t)taosHashGetSize(pSnodeStatus->streamTasks));
    }
  }

  // 2b. Per-vgroup stream sets.
  for (pIter = taosHashIterate(mStreamMgmt.vgroupMap, NULL); NULL != pIter;
       pIter = taosHashIterate(mStreamMgmt.vgroupMap, pIter)) {
    SStmVgroupStatus* pVgStatus = (SStmVgroupStatus*)pIter;
    code = taosHashRemove(pVgStatus->streamTasks, &streamId, sizeof(streamId));
    if (TSDB_CODE_SUCCESS == code) {
      mstsDebug("stream removed from vgroupMap %d, remainStreams:%d", *(int32_t*)taosHashGetKey(pIter, NULL), (int32_t)taosHashGetSize(pVgStatus->streamTasks));
    }
  }

  // 3. taskMap entries, keyed by {streamId, taskId}.
  size_t keyLen = 0;
  for (pIter = taosHashIterate(mStreamMgmt.taskMap, NULL); NULL != pIter;
       pIter = taosHashIterate(mStreamMgmt.taskMap, pIter)) {
    int64_t* pKey = taosHashGetKey(pIter, &keyLen);
    if (streamId != pKey[0]) {
      continue;
    }

    int64_t taskId = pKey[1];
    code = taosHashRemove(mStreamMgmt.taskMap, pKey, keyLen);
    if (TSDB_CODE_SUCCESS != code) {
      mstsError("TASK:%" PRId64 " remove from taskMap failed, error:%s", taskId, tstrerror(code));
    } else {
      mstsDebug("TASK:%" PRId64 " removed from taskMap", taskId);
    }
  }

  // 4. The stream record itself, on request.
  if (!fromStreamMap) {
    return code;
  }

  code = taosHashRemove(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (TSDB_CODE_SUCCESS != code) {
    mstsError("stream remove from streamMap failed, error:%s", tstrerror(code));
  } else {
    mstsDebug("stream removed from streamMap, remains:%d", taosHashGetSize(mStreamMgmt.streamMap));
  }

  return code;
}
2185

2186
/*
 * Reset a stream's management state so it can be deployed again:
 *  - its tasks are purged from the management maps, while its streamMap entry
 *    is kept (msmSTRemoveStream(..., false));
 *  - its status is reset with mstResetSStmStatus();
 *  - deployTimes is incremented.
 */
static void msmResetStreamForRedeploy(int64_t streamId, SStmStatus* pStatus) {
  mstsInfo("try to reset stream for redeploy, stopped:%d, current deployTimes:%" PRId64,
           atomic_load_8(&pStatus->stopped), pStatus->deployTimes);

  // The removal result is not checked here.
  (void)msmSTRemoveStream(streamId, false);

  mstResetSStmStatus(pStatus);

  pStatus->deployTimes += 1;
}
×
2195

2196
/*
 * Handle a stream-level deploy action.
 *
 * If the stream is already in streamMap:
 *  - a running stream (stopped == 0) is left alone;
 *  - a user-stopped stream is only redeployed by a user action;
 *  - otherwise the caller that atomically clears `stopped` resets the stream
 *    for redeploy.
 *
 * The stream object is then acquired. A stream that no longer exists, or that
 * is being stopped or dropped by the user, is ignored. Otherwise its tasks are
 * deployed.
 */
static int32_t msmLaunchStreamDeployAction(SStmGrpCtx* pCtx, SStmStreamAction* pAction) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  int64_t     streamId = pAction->streamId;
  char*       streamName = pAction->streamName;
  SStreamObj* pStream = NULL;

  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL != pStatus) {
    int8_t stopped = atomic_load_8(&pStatus->stopped);
    if (stopped == 0) {
      mstsDebug("stream %s already running and in streamMap, ignore deploy it", pAction->streamName);
      return code;
    }

    if (MST_IS_USER_STOPPED(stopped) && !pAction->userAction) {
      mstsWarn("stream %s already stopped by user, stopped:%d, ignore deploy it", pAction->streamName, stopped);
      return code;
    }

    // Only the caller whose compare-and-swap clears `stopped` performs the reset.
    if (atomic_val_compare_exchange_8(&pStatus->stopped, stopped, 0) == stopped) {
      mstsDebug("stream %s will try to reset and redeploy it", pAction->streamName);
      msmResetStreamForRedeploy(streamId, pStatus);
    }
  }

  code = mndAcquireStream(pCtx->pMnode, streamName, &pStream);
  if (code == TSDB_CODE_MND_STREAM_NOT_EXIST) {
    mstsWarn("stream %s no longer exists, ignore deploy", streamName);
    return TSDB_CODE_SUCCESS;
  }
  TAOS_CHECK_EXIT(code);

  int8_t userStopped = atomic_load_8(&pStream->userStopped);
  int8_t userDropped = atomic_load_8(&pStream->userDropped);
  if (userStopped || userDropped) {
    mstsWarn("stream %s is stopped %d or removing %d, ignore deploy", streamName, userStopped, userDropped);
    goto _exit;
  }

  TAOS_CHECK_EXIT(msmDeployStreamTasks(pCtx, pStream, pStatus));

_exit:
  mndReleaseStream(pCtx->pMnode, pStream);

  if (TSDB_CODE_SUCCESS != code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2250

2251
/*
 * Relaunch a single reader task described by pAction.
 *
 * The task's current status is looked up in taskMap to keep its serious id.
 * For a calc (non-trigger) reader, its scan plan is fetched from
 * pStatus->pCreate->calcScanPlanList using the task index. The reader deploy
 * message is then rebuilt and queued in toDeployVgMap.
 */
static int32_t msmReLaunchReaderTask(SStreamObj* pStream, SStmTaskAction* pAction, SStmStatus* pStatus) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pAction->streamId;

  // taskMap is keyed by the pair {streamId, taskId}. Build the key explicitly
  // instead of reading 16 bytes starting at &pAction->streamId, which relied on
  // id.taskId being laid out right after streamId.
  int64_t          taskKey[2] = {pAction->streamId, pAction->id.taskId};
  SStmTaskStatus** ppTask = taosHashGet(mStreamMgmt.taskMap, taskKey, sizeof(taskKey));
  if (NULL == ppTask) {
    mstsError("TASK:%" PRId64 " not in taskMap, remain:%d", pAction->id.taskId, taosHashGetSize(mStreamMgmt.taskMap));
    TAOS_CHECK_EXIT(TSDB_CODE_STREAM_INTERNAL_ERROR);
  }

  SStmTaskDeploy info = {0};
  info.task.type = pAction->type;
  info.task.streamId = pAction->streamId;
  info.task.taskId = pAction->id.taskId;
  info.task.seriousId = (*ppTask)->id.seriousId;
  info.task.nodeId = pAction->id.nodeId;
  info.task.taskIdx = pAction->id.taskIdx;

  bool             isTriggerReader = STREAM_IS_TRIGGER_READER(pAction->flag);
  SStreamCalcScan* scanPlan = NULL;
  if (!isTriggerReader) {
    scanPlan = taosArrayGet(pStatus->pCreate->calcScanPlanList, pAction->id.taskIdx);
    if (NULL == scanPlan) {
      mstsError("fail to get TASK:%" PRId64 " scanPlan, taskIdx:%d, scanPlanNum:%zu",
          pAction->id.taskId, pAction->id.taskIdx, taosArrayGetSize(pStatus->pCreate->calcScanPlanList));
      TAOS_CHECK_EXIT(TSDB_CODE_STREAM_INTERNAL_ERROR);
    }
  }

  TAOS_CHECK_EXIT(msmBuildReaderDeployInfo(&info, pStream, scanPlan ? scanPlan->scanPlan : NULL, pStatus, isTriggerReader));
  TAOS_CHECK_EXIT(msmTDAddToVgroupMap(mStreamMgmt.toDeployVgMap, &info, pAction->streamId));

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2291

2292
/*
2293
static int32_t msmReLaunchTriggerTask(SStmGrpCtx* pCtx, SStreamObj* pStream, SStmTaskAction* pAction, SStmStatus* pStatus) {
2294
  int32_t code = TSDB_CODE_SUCCESS;
2295
  int32_t lino = 0;
2296
  int64_t streamId = pAction->streamId;
2297
  SStmTaskStatus** ppTask = taosHashGet(mStreamMgmt.taskMap, &pAction->streamId, sizeof(pAction->streamId) + sizeof(pAction->id.taskId));
2298
  if (NULL == ppTask) {
2299
    mstsError("TASK:%" PRId64 " not in taskMap, remain:%d", pAction->id.taskId, taosHashGetSize(mStreamMgmt.taskMap));
2300
    TAOS_CHECK_EXIT(TSDB_CODE_STREAM_INTERNAL_ERROR);
2301
  }
2302
  
2303
  (*ppTask)->id.nodeId = msmAssignTaskSnodeId(pCtx->pMnode, pStream, true);
2304
  if (!GOT_SNODE((*ppTask)->id.nodeId)) {
2305
    mstsError("no avaible snode for deploying trigger task, seriousId: %" PRId64, (*ppTask)->id.seriousId);
2306
    return TSDB_CODE_SUCCESS;
2307
  }
2308
  
2309
  SStmTaskDeploy info = {0};
2310
  info.task.type = pAction->type;
2311
  info.task.streamId = streamId;
2312
  info.task.taskId = pAction->id.taskId;
2313
  info.task.seriousId = (*ppTask)->id.seriousId;
2314
  info.task.nodeId = (*ppTask)->id.nodeId;
2315
  info.task.taskIdx = pAction->id.taskIdx;
2316
  
2317
  TAOS_CHECK_EXIT(msmBuildTriggerDeployInfo(pCtx->pMnode, pStatus, &info, pStream));
2318
  TAOS_CHECK_EXIT(msmTDAddTriggerToSnodeMap(&info, pStream));
2319
  TAOS_CHECK_EXIT(msmSTAddToSnodeMap(pCtx, streamId, NULL, *ppTask, 1, -1));
2320
  
2321
  atomic_add_fetch_32(&mStreamMgmt.toDeploySnodeTaskNum, 1);
2322

2323
_exit:
2324

2325
  if (code) {
2326
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
2327
  }
2328

2329
  return code;
2330
}
2331
*/
2332

2333
/*
 * Relaunch whole runner deploy groups of a stream, as listed in pAction.
 *
 * Runners are rebuilt against the stream's existing trigger task; the trigger
 * itself is not relaunched here.
 *
 * Steps:
 *  - reader tasks are prepared with msmUPPrepareReaderTasks();
 *  - the calc plan is parsed;
 *  - the listed runner groups are rebuilt with msmReBuildRunnerTasks().
 *
 * mStreamMgmt.toUpdateScanMap / toUpdateScanNum are reset when this returns,
 * on success and on failure alike.
 */
static int32_t msmReLaunchRunnerDeploy(SStmGrpCtx* pCtx, SStreamObj* pStream, SStmTaskAction* pAction, SStmStatus* pStatus) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  int64_t     streamId = pAction->streamId;
  SQueryPlan* pPlan = NULL;

  // The trigger's id and node are copied into pCtx below; don't dereference a
  // missing trigger task.
  if (NULL == pStatus->triggerTask) {
    mstsError("trigger task is NULL, can't relaunch runner deploy");
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  pCtx->triggerTaskId = pStatus->triggerTask->id.taskId;
  pCtx->triggerNodeId = pStatus->triggerTask->id.nodeId;

  TAOS_CHECK_EXIT(msmUPPrepareReaderTasks(pCtx, pStatus, pStream));

  TAOS_CHECK_EXIT(nodesStringToNode(pStream->pCreate->calcPlan, (SNode**)&pPlan));

  // msmReBuildRunnerTasks() takes ownership of pPlan and destroys it on every path.
  code = msmReBuildRunnerTasks(pCtx, pPlan, pStatus, pStream, pAction);
  pPlan = NULL;
  TAOS_CHECK_EXIT(code);

_exit:

  // Per-build scratch state: reset it on failure as well, so stale scan-plan
  // updates are not applied to a later build.
  taosHashClear(mStreamMgmt.toUpdateScanMap);
  mStreamMgmt.toUpdateScanNum = 0;

  // Only non-NULL if parsing succeeded but ownership was not handed over.
  nodesDestroyNode((SNode*)pPlan);

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2390

2391

2392
/*
 * Handle a task-level redeploy action.
 *
 * The action is ignored (returns TSDB_CODE_SUCCESS) when the stream:
 *  - is no longer in streamMap;
 *  - is already stopped;
 *  - no longer exists;
 *  - or has been stopped or dropped by the user.
 *
 * Otherwise:
 *  - reader tasks are relaunched one by one;
 *  - runner tasks are relaunched as whole deploy groups (pAction->multiRunner);
 *  - any other task type (including trigger tasks, whose relaunch is currently
 *    disabled) is rejected.
 *
 * On failure the stream is stopped via msmStopStreamByError().
 */
static int32_t msmLaunchTaskDeployAction(SStmGrpCtx* pCtx, SStmTaskAction* pAction) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  int64_t     streamId = pAction->streamId;
  int64_t     taskId = pAction->id.taskId;
  SStreamObj* pStream = NULL;

  // Index the type-name table only with a known task type. Unknown types are
  // rejected by the switch below, but this log line runs before that check.
  bool knownType = (STREAM_READER_TASK == pAction->type || STREAM_TRIGGER_TASK == pAction->type ||
                    STREAM_RUNNER_TASK == pAction->type);
  mstsDebug("start to handle stream tasks action, action task type:%s",
            knownType ? gStreamTaskTypeStr[pAction->type] : "unknown");

  SStmStatus* pStatus = taosHashGet(mStreamMgmt.streamMap, &pAction->streamId, sizeof(pAction->streamId));
  if (NULL == pStatus) {
    mstsWarn("stream not in streamMap, remain:%d", taosHashGetSize(mStreamMgmt.streamMap));
    return TSDB_CODE_SUCCESS;
  }

  int8_t stopped = atomic_load_8(&pStatus->stopped);
  if (stopped) {
    mstsWarn("stream %s is already stopped %d, ignore task deploy", pStatus->streamName, stopped);
    return TSDB_CODE_SUCCESS;
  }

  code = mndAcquireStream(pCtx->pMnode, pStatus->streamName, &pStream);
  if (TSDB_CODE_MND_STREAM_NOT_EXIST == code) {
    mstsWarn("stream %s no longer exists, ignore task deploy", pStatus->streamName);
    return TSDB_CODE_SUCCESS;
  }

  TAOS_CHECK_EXIT(code);

  int8_t userStopped = atomic_load_8(&pStream->userStopped);
  int8_t userDropped = atomic_load_8(&pStream->userDropped);
  if (userStopped || userDropped) {
    mstsWarn("stream %s is stopped %d or removing %d, ignore task deploy", pStatus->streamName, userStopped, userDropped);
    goto _exit;
  }

  switch (pAction->type) {
    case STREAM_READER_TASK:
      TAOS_CHECK_EXIT(msmReLaunchReaderTask(pStream, pAction, pStatus));
      break;
    case STREAM_RUNNER_TASK:
      if (pAction->multiRunner) {
        TAOS_CHECK_EXIT(msmReLaunchRunnerDeploy(pCtx, pStream, pAction, pStatus));
      } else {
        mstsError("runner TASK:%" PRId64 " requires relaunch", pAction->id.taskId);
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }
      break;
    default:
      // Trigger task relaunch is currently disabled and lands here as well.
      mstsError("TASK:%" PRId64 " invalid task type:%d", pAction->id.taskId, pAction->type);
      TAOS_CHECK_EXIT(TSDB_CODE_STREAM_INTERNAL_ERROR);
      break;
  }

_exit:

  if (pStream) {
    mndReleaseStream(pCtx->pMnode, pStream);
  }

  if (code) {
    msmStopStreamByError(streamId, pStatus, code, pCtx->currTs);
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2464

2465
static int32_t msmTDRemoveStream(int64_t streamId) {
×
2466
  void* pIter = NULL;
×
2467
  
2468
  if (atomic_load_32(&mStreamMgmt.toDeployVgTaskNum) > 0) {
×
2469
    while ((pIter = taosHashIterate(mStreamMgmt.toDeployVgMap, pIter))) {
×
2470
      SStmVgTasksToDeploy* pVg = (SStmVgTasksToDeploy*)pIter;
×
2471
      int32_t taskNum = taosArrayGetSize(pVg->taskList);
×
2472
      if (atomic_load_32(&pVg->deployed) == taskNum) {
×
2473
        continue;
×
2474
      }
2475
      
2476
      for (int32_t i = 0; i < taskNum; ++i) {
×
2477
        SStmTaskToDeployExt* pExt = taosArrayGet(pVg->taskList, i);
×
2478
        if (pExt->deploy.task.streamId == streamId && !pExt->deployed) {
×
2479
          pExt->deployed = true;
×
2480
        }
2481
      }
2482
    }
2483
  }
2484

2485
  if (atomic_load_32(&mStreamMgmt.toDeploySnodeTaskNum) > 0) {
×
2486
    while ((pIter = taosHashIterate(mStreamMgmt.toDeploySnodeMap, pIter))) {
×
2487
      SStmSnodeTasksDeploy* pSnode = (SStmSnodeTasksDeploy*)pIter;
×
2488
      int32_t taskNum = taosArrayGetSize(pSnode->triggerList);
×
2489
      if (atomic_load_32(&pSnode->triggerDeployed) != taskNum) {
×
2490
        for (int32_t i = 0; i < taskNum; ++i) {
×
2491
          SStmTaskToDeployExt* pExt = taosArrayGet(pSnode->triggerList, i);
×
2492
          if (pExt->deploy.task.streamId == streamId && !pExt->deployed) {
×
2493
            pExt->deployed = true;
×
2494
          }
2495
        }
2496
      }
2497

2498
      taskNum = taosArrayGetSize(pSnode->runnerList);
×
2499
      if (atomic_load_32(&pSnode->runnerDeployed) != taskNum) {
×
2500
        for (int32_t i = 0; i < taskNum; ++i) {
×
2501
          SStmTaskToDeployExt* pExt = taosArrayGet(pSnode->runnerList, i);
×
2502
          if (pExt->deploy.task.streamId == streamId && !pExt->deployed) {
×
2503
            pExt->deployed = true;
×
2504
          }
2505
        }
2506
      }
2507
    }
2508
  }
2509

2510
  return TSDB_CODE_SUCCESS;
×
2511
}
2512

2513
/*
 * Remove streamId from the stream-management maps by delegating to
 * msmSTRemoveStream(streamId, true). pMnode is currently unused.
 *
 * Returns the error code of msmSTRemoveStream(), logging start/end (or the
 * failure) around the call.
 */
static int32_t msmRemoveStreamFromMaps(SMnode* pMnode, int64_t streamId) {
  int32_t lino = 0;

  mstsInfo("start to remove stream from maps, current stream num:%d", taosHashGetSize(mStreamMgmt.streamMap));

  int32_t code = msmSTRemoveStream(streamId, true);
  if (TSDB_CODE_SUCCESS != code) {
    lino = __LINE__;
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
    return code;
  }

  mstsInfo("end remove stream from maps, current stream num:%d", taosHashGetSize(mStreamMgmt.streamMap));
  return code;
}
2531

2532
/*
 * Flag a stream as stopped by the user so later heartbeat processing tears
 * its tasks down.
 *
 * Returns TSDB_CODE_MND_STREAM_NOT_AVAILABLE when stream management is not
 * active or not in MND_STM_STATE_NORMAL; otherwise TSDB_CODE_SUCCESS, whether
 * or not the stream was still present in streamMap.
 */
int32_t msmUndeployStream(SMnode* pMnode, int64_t streamId, char* streamName) {
  int8_t active = atomic_load_8(&mStreamMgmt.active);
  int8_t state = atomic_load_8(&mStreamMgmt.state);

  if (active == 0 || state != MND_STM_STATE_NORMAL) {
    mstsError("stream mgmt not available since active:%d state:%d", active, state);
    return TSDB_CODE_MND_STREAM_NOT_AVAILABLE;
  }

  SStmStatus* pStatus = (SStmStatus*)taosHashAcquire(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (pStatus != NULL) {
    // stopped == 2 is the value written for a user-initiated stop (see log below).
    atomic_store_8(&pStatus->stopped, 2);
    mstsInfo("stream %s stopped by user", streamName);
  } else {
    mstsInfo("stream %s already not in streamMap", streamName);
  }

  // Releasing a NULL entry is a no-op, so this is done unconditionally.
  taosHashRelease(mStreamMgmt.streamMap, pStatus);

  return TSDB_CODE_SUCCESS;
}
2562

2563
/*
 * Queue a user-requested recalculation of timeRange for a running stream.
 *
 * Returns TSDB_CODE_MND_STREAM_NOT_AVAILABLE when stream management is not
 * active/normal, TSDB_CODE_MND_STREAM_NOT_RUNNING when the stream is unknown
 * or not running, otherwise the result of mstAppendNewRecalcRange().
 */
int32_t msmRecalcStream(SMnode* pMnode, int64_t streamId, STimeWindow* timeRange) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  int8_t active = atomic_load_8(&mStreamMgmt.active), state = atomic_load_8(&mStreamMgmt.state);
  if (0 == active || MND_STM_STATE_NORMAL != state) {
    mstsError("stream mgmt not available since active:%d state:%d", active, state);
    return TSDB_CODE_MND_STREAM_NOT_AVAILABLE;
  }

  SStmStatus* pStream = (SStmStatus*)taosHashAcquire(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL == pStream) {
    code = TSDB_CODE_MND_STREAM_NOT_RUNNING;
    mstsInfo("stream still not in streamMap, streamRemains:%d", taosHashGetSize(mStreamMgmt.streamMap));
    goto _exit;
  }

  // The stream is known but not running: report that accurately instead of
  // claiming it is missing from streamMap.
  if (!STREAM_IS_RUNNING(pStream)) {
    code = TSDB_CODE_MND_STREAM_NOT_RUNNING;
    mstsInfo("stream in streamMap but not running, ignore recalc, streamRemains:%d",
             taosHashGetSize(mStreamMgmt.streamMap));
    goto _exit;
  }

  TAOS_CHECK_EXIT(mstAppendNewRecalcRange(streamId, pStream, timeRange));

_exit:

  // NULL-safe: also reached when the stream was not found.
  taosHashRelease(mStreamMgmt.streamMap, pStream);

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2592

2593
/*
 * Drain mStreamMgmt.actionQ and launch every queued deploy action
 * (stream-level or task-level). Non-deploy nodes are dequeued and ignored.
 *
 * Launch results are not propagated; the function always returns
 * TSDB_CODE_SUCCESS.
 */
static int32_t msmHandleStreamActions(SStmGrpCtx* pCtx) {
  SStmQNode* pNode = NULL;

  while (mndStreamActionDequeue(mStreamMgmt.actionQ, &pNode)) {
    if (STREAM_ACT_DEPLOY != pNode->type) {
      continue;
    }

    if (!pNode->streamAct) {
      mstDebug("start to handle task deploy action");
      msmLaunchTaskDeployAction(pCtx, &pNode->action.task);
    } else {
      mstDebug("start to handle stream deploy action");
      msmLaunchStreamDeployAction(pCtx, &pNode->action.stream);
    }
  }

  return TSDB_CODE_SUCCESS;
}
2622

2623
/*
 * Hook invoked when the grant expires. Not implemented yet (STREAMTODO);
 * always reports success.
 */
int32_t msmHandleGrantExpired(SMnode *pMnode) {
  (void)pMnode;
  return TSDB_CODE_SUCCESS;
}
2627

2628
/*
 * Add one task deploy descriptor to a per-stream deploy bundle.
 *
 * Reader and runner tasks are appended (by value) to lazily created arrays.
 * The trigger task is stored in a single heap slot; a later trigger task for
 * the same bundle overwrites the previous one in place instead of leaking it.
 * Unknown task types are ignored.
 *
 * Returns TSDB_CODE_SUCCESS or the allocation error reported via terrno.
 */
static int32_t msmInitStreamDeploy(SStmStreamDeploy* pStream, SStmTaskDeploy* pDeploy) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t streamId = pDeploy->task.streamId;

  switch (pDeploy->task.type) {
    case STREAM_READER_TASK:
      if (NULL == pStream->readerTasks) {
        pStream->streamId = streamId;
        pStream->readerTasks = taosArrayInit(20, sizeof(SStmTaskDeploy));  // initial capacity only
        TSDB_CHECK_NULL(pStream->readerTasks, code, lino, _exit, terrno);
      }

      TSDB_CHECK_NULL(taosArrayPush(pStream->readerTasks, pDeploy), code, lino, _exit, terrno);
      break;
    case STREAM_TRIGGER_TASK:
      pStream->streamId = streamId;
      // Reuse an existing slot: unconditionally allocating here leaked the
      // previous buffer whenever a trigger task was added twice.
      if (NULL == pStream->triggerTask) {
        pStream->triggerTask = taosMemoryMalloc(sizeof(SStmTaskDeploy));
        TSDB_CHECK_NULL(pStream->triggerTask, code, lino, _exit, terrno);
      }
      memcpy(pStream->triggerTask, pDeploy, sizeof(SStmTaskDeploy));
      break;
    case STREAM_RUNNER_TASK:
      if (NULL == pStream->runnerTasks) {
        pStream->streamId = streamId;
        pStream->runnerTasks = taosArrayInit(20, sizeof(SStmTaskDeploy));  // initial capacity only
        TSDB_CHECK_NULL(pStream->runnerTasks, code, lino, _exit, terrno);
      }

      TSDB_CHECK_NULL(taosArrayPush(pStream->runnerTasks, pDeploy), code, lino, _exit, terrno);
      break;
    default:
      break;
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2669

2670
/*
 * Merge one task deploy descriptor into the per-group deploy map pHash
 * (keyed by streamId).
 *
 * If no bundle exists for the stream yet, one is built locally and published
 * with taosHashPut(). On success the hash owns the memory the local copy
 * points to. On any failure (init error, or a put error other than
 * TSDB_CODE_DUP_KEY) the local bundle is freed so nothing leaks. On
 * TSDB_CODE_DUP_KEY another writer won the race: the local bundle is freed,
 * reset, and the loop retries by merging into the existing entry.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t msmGrpAddDeployTask(SHashObj* pHash, SStmTaskDeploy* pDeploy) {
  int32_t           code = TSDB_CODE_SUCCESS;
  int32_t           lino = 0;
  int64_t           streamId = pDeploy->task.streamId;
  SStreamTask*      pTask = &pDeploy->task;
  SStmStreamDeploy  streamDeploy = {0};
  SStmStreamDeploy* pStream = NULL;

  while (true) {
    pStream = taosHashAcquire(pHash, &streamId, sizeof(streamId));
    if (NULL != pStream) {
      TAOS_CHECK_EXIT(msmInitStreamDeploy(pStream, pDeploy));
      break;
    }

    code = msmInitStreamDeploy(&streamDeploy, pDeploy);
    if (TSDB_CODE_SUCCESS != code) {
      // Partially-built local bundle is still ours to release.
      tFreeSStmStreamDeploy(&streamDeploy);
      lino = __LINE__;
      goto _exit;
    }

    code = taosHashPut(pHash, &streamId, sizeof(streamId), &streamDeploy, sizeof(streamDeploy));
    if (TSDB_CODE_SUCCESS == code) {
      break;  // ownership transferred to the hash
    }

    tFreeSStmStreamDeploy(&streamDeploy);
    // Reset so a retry never sees pointers to memory that was just freed.
    memset(&streamDeploy, 0, sizeof(streamDeploy));

    if (TSDB_CODE_DUP_KEY != code) {
      lino = __LINE__;
      goto _exit;
    }
    // DUP_KEY: a concurrent insert won; loop to merge into that entry.
  }

_exit:

  // NULL-safe: pStream is NULL when the entry was created locally.
  taosHashRelease(pHash, pStream);

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  } else {
    msttDebug("task added to GRP deployMap, taskIdx:%d", pTask->taskIdx);
  }

  return code;
}
2712

2713

2714
/*
 * Move every not-yet-deployed SStmTaskToDeployExt in pTasks into the group
 * deploy map pHash, flagging each one and atomically bumping *deployed.
 *
 * Stops at the first failure and returns its code; tasks handled before the
 * failure stay flagged and counted.
 */
int32_t msmGrpAddDeployTasks(SHashObj* pHash, SArray* pTasks, int32_t* deployed) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t total = taosArrayGetSize(pTasks);

  for (int32_t idx = 0; idx < total; ++idx) {
    SStmTaskToDeployExt* pEntry = taosArrayGet(pTasks, idx);
    if (!pEntry->deployed) {
      code = msmGrpAddDeployTask(pHash, &pEntry->deploy);
      if (TSDB_CODE_SUCCESS != code) {
        lino = __LINE__;
        mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
        return code;
      }

      pEntry->deployed = true;
      atomic_add_fetch_32(deployed, 1);
    }
  }

  return code;
}
2739

2740
/*
 * For every vgroup the heartbeat reports as leader: refresh its up-timestamp
 * and copy its pending deploy tasks into the group deploy map
 * (pCtx->deployStm).
 *
 * Each entry is taken with taosHashAcquire() and released with
 * taosHashRelease() on every path (the original leaked that reference). A
 * vgroup whose latch is busy is skipped and retried on a later heartbeat.
 *
 * Returns TSDB_CODE_SUCCESS or the first msmGrpAddDeployTasks() error.
 */
int32_t msmGrpAddDeployVgTasks(SStmGrpCtx* pCtx) {
  int32_t              code = TSDB_CODE_SUCCESS;
  int32_t              lino = 0;
  int32_t              vgNum = taosArrayGetSize(pCtx->pReq->pVgLeaders);
  SStmVgTasksToDeploy* pVg = NULL;  // non-NULL only while its read latch and hash ref are held

  mstDebug("start to add stream vgroup tasks deploy");

  for (int32_t i = 0; i < vgNum; ++i) {
    int32_t* vgId = taosArrayGet(pCtx->pReq->pVgLeaders, i);

    msmUpdateVgroupUpTs(pCtx, *vgId);

    SStmVgTasksToDeploy* pCur = taosHashAcquire(mStreamMgmt.toDeployVgMap, vgId, sizeof(*vgId));
    if (NULL == pCur) {
      continue;
    }

    if (taosRTryLockLatch(&pCur->lock)) {
      taosHashRelease(mStreamMgmt.toDeployVgMap, pCur);
      continue;
    }
    pVg = pCur;

    if (atomic_load_32(&pVg->deployed) != taosArrayGetSize(pVg->taskList)) {
      TAOS_CHECK_EXIT(msmGrpAddDeployTasks(pCtx->deployStm, pVg->taskList, &pVg->deployed));
    }

    taosRUnLockLatch(&pVg->lock);
    taosHashRelease(mStreamMgmt.toDeployVgMap, pVg);
    pVg = NULL;
  }

_exit:

  if (code) {
    if (pVg) {
      taosRUnLockLatch(&pVg->lock);
      taosHashRelease(mStreamMgmt.toDeployVgMap, pVg);
    }

    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2784

2785
/*
 * Copy the heartbeat snode's pending trigger and runner deploy tasks into the
 * group deploy map (pCtx->deployStm).
 *
 * The snode entry is taken with taosHashAcquire() and locked via
 * mstWaitLock(..., false). Both the latch (taosWUnLockLatch) and the hash
 * reference are released on a single exit path for success and failure
 * alike; the original never released the reference.
 *
 * Returns TSDB_CODE_SUCCESS (also when the snode has nothing pending) or the
 * first msmGrpAddDeployTasks() error.
 */
int32_t msmGrpAddDeploySnodeTasks(SStmGrpCtx* pCtx) {
  int32_t               code = TSDB_CODE_SUCCESS;
  int32_t               lino = 0;
  SStmSnodeTasksDeploy* pSnode = NULL;
  SStreamHbMsg*         pReq = pCtx->pReq;

  mstDebug("start to add stream snode tasks deploy");

  pSnode = taosHashAcquire(mStreamMgmt.toDeploySnodeMap, &pReq->snodeId, sizeof(pReq->snodeId));
  if (NULL == pSnode) {
    return TSDB_CODE_SUCCESS;
  }

  mstWaitLock(&pSnode->lock, false);

  if (atomic_load_32(&pSnode->triggerDeployed) < taosArrayGetSize(pSnode->triggerList)) {
    TAOS_CHECK_EXIT(msmGrpAddDeployTasks(pCtx->deployStm, pSnode->triggerList, &pSnode->triggerDeployed));
  }

  if (atomic_load_32(&pSnode->runnerDeployed) < taosArrayGetSize(pSnode->runnerList)) {
    TAOS_CHECK_EXIT(msmGrpAddDeployTasks(pCtx->deployStm, pSnode->runnerList, &pSnode->runnerDeployed));
  }

_exit:

  taosWUnLockLatch(&pSnode->lock);
  taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2822

2823
int32_t msmUpdateStreamLastActTs(int64_t streamId, int64_t currTs) {
×
2824
  int32_t code = TSDB_CODE_SUCCESS;
×
2825
  int32_t lino = 0;
×
2826
  SStmStatus* pStatus = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
×
2827
  if (NULL == pStatus) {
×
2828
    mstsWarn("stream already not exists in streamMap, mapSize:%d", taosHashGetSize(mStreamMgmt.streamMap));
×
2829
    return TSDB_CODE_SUCCESS;
×
2830
  }
2831
  
2832
  pStatus->lastActionTs = currTs;
×
2833

2834
_exit:
×
2835

2836
  if (code) {
×
2837
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
2838
  }
2839

2840
  return code;
×
2841
}
2842

2843
/*
 * Move every per-stream deploy bundle collected in pCtx->deployStm into the
 * heartbeat response (pCtx->pRsp->deploy.streamList) and stamp each stream's
 * lastActionTs with pCtx->currTs.
 *
 * Each bundle is shallow-copied into the response and then cleared in the
 * map with mstClearSStmStreamDeploy(). The stream id is captured before the
 * clear and used afterwards, so it does not depend on which fields the clear
 * resets.
 *
 * Returns TSDB_CODE_SUCCESS or the first allocation/update error.
 */
int32_t msmRspAddStreamsDeploy(SStmGrpCtx* pCtx) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t streamNum = taosHashGetSize(pCtx->deployStm);
  void*   pIter = NULL;

  mstDebug("start to add group %d deploy streams, streamNum:%d", pCtx->pReq->streamGId, streamNum);

  pCtx->pRsp->deploy.streamList = taosArrayInit(streamNum, sizeof(SStmStreamDeploy));
  TSDB_CHECK_NULL(pCtx->pRsp->deploy.streamList, code, lino, _exit, terrno);

  while (NULL != (pIter = taosHashIterate(pCtx->deployStm, pIter))) {
    SStmStreamDeploy* pDeploy = (SStmStreamDeploy*)pIter;
    TSDB_CHECK_NULL(taosArrayPush(pCtx->pRsp->deploy.streamList, pDeploy), code, lino, _exit, terrno);

    int64_t streamId = pDeploy->streamId;
    mstsDebug("stream DEPLOY added to dnode %d hb rsp, readerTasks:%zu, triggerTask:%d, runnerTasks:%zu",
              pCtx->pReq->dnodeId, taosArrayGetSize(pDeploy->readerTasks), pDeploy->triggerTask ? 1 : 0,
              taosArrayGetSize(pDeploy->runnerTasks));

    mstClearSStmStreamDeploy(pDeploy);

    // Use the id captured above; pDeploy has just been cleared.
    TAOS_CHECK_EXIT(msmUpdateStreamLastActTs(streamId, pCtx->currTs));
  }

_exit:

  if (pIter) {
    taosHashCancelIterate(pCtx->deployStm, pIter);
  }

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
2884

2885
/*
 * Reclaim deploy tasks already handed out for the given leader vgroups.
 *
 * For each vgroup entry in toDeployVgMap, best effort:
 * - latch busy: skip (retried on a later heartbeat);
 * - nothing deployed: leave untouched;
 * - everything deployed: destroy the task list and remove the map entry;
 * - otherwise: remove only the deployed tasks and reset the counter.
 * mStreamMgmt.toDeployVgTaskNum is decremented for every task freed.
 */
void msmCleanDeployedVgTasks(SArray* pVgLeaders) {
  int32_t leaderNum = taosArrayGetSize(pVgLeaders);

  for (int32_t idx = 0; idx < leaderNum; ++idx) {
    int32_t*             vgId = taosArrayGet(pVgLeaders, idx);
    SStmVgTasksToDeploy* pEntry = taosHashAcquire(mStreamMgmt.toDeployVgMap, vgId, sizeof(*vgId));
    if (pEntry == NULL) {
      continue;
    }

    if (taosWTryLockLatch(&pEntry->lock)) {
      taosHashRelease(mStreamMgmt.toDeployVgMap, pEntry);
      continue;
    }

    if (atomic_load_32(&pEntry->deployed) > 0) {
      int32_t listSize = taosArrayGetSize(pEntry->taskList);

      if (atomic_load_32(&pEntry->deployed) == listSize) {
        // Every task was shipped: drop the whole vgroup entry.
        atomic_sub_fetch_32(&mStreamMgmt.toDeployVgTaskNum, listSize);
        taosArrayDestroyEx(pEntry->taskList, mstDestroySStmTaskToDeployExt);
        pEntry->taskList = NULL;
        taosHashRemove(mStreamMgmt.toDeployVgMap, vgId, sizeof(*vgId));
      } else {
        // Partially shipped: remove deployed tasks back to front so the
        // indices of not-yet-visited elements stay valid.
        int32_t pos = listSize;
        while (pos-- > 0) {
          SStmTaskToDeployExt* pTaskExt = taosArrayGet(pEntry->taskList, pos);
          if (pTaskExt->deployed) {
            mstDestroySStmTaskToDeployExt(pTaskExt);
            taosArrayRemove(pEntry->taskList, pos);
            atomic_sub_fetch_32(&mStreamMgmt.toDeployVgTaskNum, 1);
          }
        }
        atomic_store_32(&pEntry->deployed, 0);
      }
    }

    taosWUnLockLatch(&pEntry->lock);
    taosHashRelease(mStreamMgmt.toDeployVgMap, pEntry);
  }
}
×
2946

2947
void msmCleanDeployedSnodeTasks (int32_t snodeId) {
×
2948
  if (!GOT_SNODE(snodeId)) {
×
2949
    return;
×
2950
  }
2951
  
2952
  int32_t code = TSDB_CODE_SUCCESS;
×
2953
  SStmSnodeTasksDeploy* pSnode = taosHashAcquire(mStreamMgmt.toDeploySnodeMap, &snodeId, sizeof(snodeId));
×
2954
  if (NULL == pSnode) {
×
2955
    return;
×
2956
  }
2957

2958
  if (taosWTryLockLatch(&pSnode->lock)) {
×
2959
    taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);
×
2960
    return;
×
2961
  }
2962

2963
  int32_t triggerNum = taosArrayGetSize(pSnode->triggerList);
×
2964
  int32_t runnerNum = taosArrayGetSize(pSnode->runnerList);
×
2965
  
2966
  if (atomic_load_32(&pSnode->triggerDeployed) <= 0 && atomic_load_32(&pSnode->runnerDeployed) <= 0) {
×
2967
    taosWUnLockLatch(&pSnode->lock);
×
2968
    taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);
×
2969
    return;
×
2970
  }
2971

2972
  if (atomic_load_32(&pSnode->triggerDeployed) == triggerNum) {
×
2973
    atomic_sub_fetch_32(&mStreamMgmt.toDeploySnodeTaskNum, triggerNum);
×
2974
    taosArrayDestroyEx(pSnode->triggerList, mstDestroySStmTaskToDeployExt);
×
2975
    pSnode->triggerList = NULL;
×
2976
  }
2977

2978
  if (atomic_load_32(&pSnode->runnerDeployed) == runnerNum) {
×
2979
    atomic_sub_fetch_32(&mStreamMgmt.toDeploySnodeTaskNum, runnerNum);
×
2980
    taosArrayDestroyEx(pSnode->runnerList, mstDestroySStmTaskToDeployExt);
×
2981
    pSnode->runnerList = NULL;
×
2982
  }
2983

2984
  if (NULL == pSnode->triggerList && NULL == pSnode->runnerList) {
×
2985
    taosHashRemove(mStreamMgmt.toDeploySnodeMap, &snodeId, sizeof(snodeId));
×
2986
    taosWUnLockLatch(&pSnode->lock);
×
2987
    taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);
×
2988
    return;
×
2989
  }
2990

2991
  if (atomic_load_32(&pSnode->triggerDeployed) > 0 && pSnode->triggerList) {
×
2992
    for (int32_t m = triggerNum - 1; m >= 0; --m) {
×
2993
      SStmTaskToDeployExt* pExt = taosArrayGet(pSnode->triggerList, m);
×
2994
      if (!pExt->deployed) {
×
2995
        continue;
×
2996
      }
2997

2998
      mstDestroySStmTaskToDeployExt(pExt);
×
2999
      atomic_sub_fetch_32(&mStreamMgmt.toDeploySnodeTaskNum, 1);
×
3000
      taosArrayRemove(pSnode->triggerList, m);
×
3001
    }
3002
    
3003
    pSnode->triggerDeployed = 0;
×
3004
  }
3005

3006
  if (atomic_load_32(&pSnode->runnerDeployed) > 0 && pSnode->runnerList) {
×
3007
    for (int32_t m = runnerNum - 1; m >= 0; --m) {
×
3008
      SStmTaskToDeployExt* pExt = taosArrayGet(pSnode->runnerList, m);
×
3009
      if (!pExt->deployed) {
×
3010
        continue;
×
3011
      }
3012

3013
      mstDestroySStmTaskToDeployExt(pExt);
×
3014
      atomic_sub_fetch_32(&mStreamMgmt.toDeploySnodeTaskNum, 1);
×
3015
      taosArrayRemove(pSnode->runnerList, m);
×
3016
    }
3017
    
3018
    pSnode->runnerDeployed = 0;
×
3019
  }
3020
  
3021
  taosWUnLockLatch(&pSnode->lock);
×
3022
  taosHashRelease(mStreamMgmt.toDeploySnodeMap, pSnode);
×
3023
}
3024

3025
/*
 * After a heartbeat has been answered, reclaim the vgroup/snode deploy tasks
 * that were shipped to the reporting dnode. Does nothing while stream
 * management is inactive or when there is nothing pending of a given kind.
 */
void msmClearStreamToDeployMaps(SStreamHbMsg* pHb) {
  if (atomic_load_8(&mStreamMgmt.active) != 0) {
    bool vgPending = atomic_load_32(&mStreamMgmt.toDeployVgTaskNum) > 0;
    if (vgPending) {
      msmCleanDeployedVgTasks(pHb->pVgLeaders);
    }

    bool snodePending = atomic_load_32(&mStreamMgmt.toDeploySnodeTaskNum) > 0;
    if (snodePending) {
      msmCleanDeployedSnodeTasks(pHb->snodeId);
    }
  }
}
3038

3039
/*
 * Reset the per-group action and deploy maps of the thread context that owns
 * pHb->streamGId. Does nothing while stream management is inactive.
 */
void msmCleanStreamGrpCtx(SStreamHbMsg* pHb) {
  if (atomic_load_8(&mStreamMgmt.active) != 0) {
    int32_t tIdx = streamGetThreadIdx(mStreamMgmt.threadNum, pHb->streamGId);

    taosHashClear(mStreamMgmt.tCtx[tIdx].actionStm[pHb->streamGId]);
    taosHashClear(mStreamMgmt.tCtx[tIdx].deployStm[pHb->streamGId]);
  }
}
3048

3049
/*
 * Record a START action for streamId in the group action map pHash, storing
 * *pId as the trigger task id. An existing entry gets the START bit OR-ed in.
 *
 * Returns TSDB_CODE_SUCCESS or the taosHashPut() error for a new entry.
 */
int32_t msmGrpAddActionStart(SHashObj* pHash, int64_t streamId, SStmTaskId* pId) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  SStmAction* pExisting = taosHashGet(pHash, &streamId, sizeof(streamId));

  if (NULL == pExisting) {
    SStmAction fresh = {0};
    fresh.actions = STREAM_ACT_START;
    fresh.start.triggerId = *pId;
    TAOS_CHECK_EXIT(taosHashPut(pHash, &streamId, sizeof(streamId), &fresh, sizeof(fresh)));
    mstsDebug("stream add START action, actions:%x", fresh.actions);
  } else {
    pExisting->actions |= STREAM_ACT_START;
    pExisting->start.triggerId = *pId;
    mstsDebug("stream append START action, actions:%x", pExisting->actions);
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
3074

3075
/*
 * Record an UPDATE_TRIGGER action for streamId in the group action map
 * pHash, OR-ing the bit into an existing entry or inserting a new one.
 *
 * Returns TSDB_CODE_SUCCESS or the taosHashPut() error for a new entry.
 */
int32_t msmGrpAddActionUpdateTrigger(SHashObj* pHash, int64_t streamId) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  SStmAction* pExisting = taosHashGet(pHash, &streamId, sizeof(streamId));

  if (NULL == pExisting) {
    SStmAction fresh = {0};
    fresh.actions = STREAM_ACT_UPDATE_TRIGGER;
    TAOS_CHECK_EXIT(taosHashPut(pHash, &streamId, sizeof(streamId), &fresh, sizeof(fresh)));
    mstsDebug("stream add UPDATE_TRIGGER action, actions:%x", fresh.actions);
  } else {
    pExisting->actions |= STREAM_ACT_UPDATE_TRIGGER;
    mstsDebug("stream append UPDATE_TRIGGER action, actions:%x", pExisting->actions);
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
3099

3100

3101

3102
/*
 * Record an UNDEPLOY action for pTask of streamId in the group action map.
 *
 * The task pointer is appended to the entry's undeploy.taskList.
 * doCheckpoint/doCleanup follow whether the stream has been dropped
 * (mstIsStreamDropped): checkpoint only while no undeploy sees a dropped
 * stream, clean up once any does.
 *
 * When the entry already exists but was created by another action type
 * (START/UPDATE_TRIGGER/RECALC zero-initialize the undeploy part), the first
 * undeploy seeds the flags exactly as a brand-new entry would. Otherwise
 * doCheckpoint could never become true.
 *
 * On failure while building a new entry, its task list is destroyed instead
 * of leaked.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
int32_t msmGrpAddActionUndeploy(SStmGrpCtx* pCtx, int64_t streamId, SStreamTask* pTask) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t action = STREAM_ACT_UNDEPLOY;
  bool    dropped = false;

  TAOS_CHECK_EXIT(mstIsStreamDropped(pCtx->pMnode, streamId, &dropped));
  mstsDebug("stream dropped: %d", dropped);

  SStmAction *pAction = taosHashGet(pCtx->actionStm, &streamId, sizeof(streamId));
  if (pAction) {
    pAction->actions |= action;
    if (NULL == pAction->undeploy.taskList) {
      pAction->undeploy.taskList = taosArrayInit(pCtx->taskNum, POINTER_BYTES);
      TSDB_CHECK_NULL(pAction->undeploy.taskList, code, lino, _exit, terrno);
      // First undeploy for this entry: seed flags like a new UNDEPLOY entry.
      pAction->undeploy.doCheckpoint = dropped ? false : true;
      pAction->undeploy.doCleanup = dropped ? true : false;
    }

    TSDB_CHECK_NULL(taosArrayPush(pAction->undeploy.taskList, &pTask), code, lino, _exit, terrno);
    if (pAction->undeploy.doCheckpoint) {
      pAction->undeploy.doCheckpoint = dropped ? false : true;
    }
    if (!pAction->undeploy.doCleanup) {
      pAction->undeploy.doCleanup = dropped ? true : false;
    }

    msttDebug("task append UNDEPLOY action[%d,%d], actions:%x", pAction->undeploy.doCheckpoint, pAction->undeploy.doCleanup, pAction->actions);
  } else {
    SStmAction newAction = {0};
    newAction.actions = action;
    newAction.undeploy.doCheckpoint = dropped ? false : true;
    newAction.undeploy.doCleanup = dropped ? true : false;
    newAction.undeploy.taskList = taosArrayInit(pCtx->taskNum, POINTER_BYTES);
    TSDB_CHECK_NULL(newAction.undeploy.taskList, code, lino, _exit, terrno);

    if (NULL == taosArrayPush(newAction.undeploy.taskList, &pTask)) {
      code = terrno;
      lino = __LINE__;
    } else {
      code = taosHashPut(pCtx->actionStm, &streamId, sizeof(streamId), &newAction, sizeof(newAction));
      if (code) {
        lino = __LINE__;
      }
    }

    if (code) {
      // The entry never reached actionStm, so the list is still ours.
      taosArrayDestroy(newAction.undeploy.taskList);
      goto _exit;
    }

    msttDebug("task add UNDEPLOY action[%d,%d]", newAction.undeploy.doCheckpoint, newAction.undeploy.doCleanup);
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
3149

3150
/*
 * Record a RECALC action for streamId in the group action map, attaching
 * recalcList to it (the pointer is stored, not copied).
 *
 * NOTE(review): an existing entry's recalcList is overwritten without being
 * freed, and on taosHashPut() failure nobody keeps recalcList; confirm a
 * stream can receive at most one recalc list per heartbeat.
 *
 * Returns TSDB_CODE_SUCCESS or the taosHashPut() error for a new entry.
 */
int32_t msmGrpAddActionRecalc(SStmGrpCtx* pCtx, int64_t streamId, SArray* recalcList) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  SStmAction* pExisting = taosHashGet(pCtx->actionStm, &streamId, sizeof(streamId));

  if (NULL == pExisting) {
    SStmAction fresh = {0};
    fresh.actions = STREAM_ACT_RECALC;
    fresh.recalc.recalcList = recalcList;

    TAOS_CHECK_EXIT(taosHashPut(pCtx->actionStm, &streamId, sizeof(streamId), &fresh, sizeof(fresh)));

    mstsDebug("stream add recalc action, listSize:%d", (int32_t)taosArrayGetSize(recalcList));
  } else {
    pExisting->actions |= STREAM_ACT_RECALC;
    pExisting->recalc.recalcList = recalcList;

    mstsDebug("stream append recalc action, listSize:%d, actions:%x", (int32_t)taosArrayGetSize(recalcList), pExisting->actions);
  }

_exit:

  if (code) {
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
3179

3180
bool msmCheckStreamStartCond(int64_t streamId, int32_t snodeId) {
×
3181
  SStmStatus* pStream = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
×
3182
  if (NULL == pStream) {
×
3183
    return false;
×
3184
  }
3185

3186
  if (pStream->triggerTask->id.nodeId != snodeId || STREAM_STATUS_INIT != pStream->triggerTask->status) {
×
3187
    return false;
×
3188
  }
3189

3190
  int32_t readerNum = taosArrayGetSize(pStream->trigReaders);
×
3191
  for (int32_t i = 0; i < readerNum; ++i) {
×
3192
    SStmTaskStatus* pStatus = taosArrayGet(pStream->trigReaders, i);
×
3193
    if (STREAM_STATUS_INIT != pStatus->status && STREAM_STATUS_RUNNING != pStatus->status) {
×
3194
      return false;
×
3195
    }
3196
  }
3197

3198
  readerNum = taosArrayGetSize(pStream->trigOReaders);
×
3199
  for (int32_t i = 0; i < readerNum; ++i) {
×
3200
    SStmTaskStatus* pStatus = taosArrayGet(pStream->trigOReaders, i);
×
3201
    if (STREAM_STATUS_INIT != pStatus->status && STREAM_STATUS_RUNNING != pStatus->status) {
×
3202
      return false;
×
3203
    }
3204
  }
3205

3206
  readerNum = taosArrayGetSize(pStream->calcReaders);
×
3207
  for (int32_t i = 0; i < readerNum; ++i) {
×
3208
    SStmTaskStatus* pStatus = taosArrayGet(pStream->calcReaders, i);
×
3209
    if (STREAM_STATUS_INIT != pStatus->status && STREAM_STATUS_RUNNING != pStatus->status) {
×
3210
      return false;
×
3211
    }
3212
  }
3213

3214
  for (int32_t i = 0; i < pStream->runnerDeploys; ++i) {
×
3215
    int32_t runnerNum = taosArrayGetSize(pStream->runners[i]);
×
3216
    for (int32_t m = 0; m < runnerNum; ++m) {
×
3217
      SStmTaskStatus* pStatus = taosArrayGet(pStream->runners[i], m);
×
3218
      if (STREAM_STATUS_INIT != pStatus->status && STREAM_STATUS_RUNNING != pStatus->status) {
×
3219
        return false;
×
3220
      }
3221
    }
3222
  }
3223
  
3224
  return true;
×
3225
}
3226

3227

3228
/*
 * React to a task status reported in a heartbeat that needs mnode action.
 *
 * - Stream gone from streamMap, or stopped: queue an UNDEPLOY for the task.
 * - INIT trigger task: once the stream has been deployed (lastActionTs set)
 *   and at least STREAM_ACT_MIN_DELAY_MSEC has passed since the last action,
 *   queue a START if the stream is not yet running and
 *   msmCheckStreamStartCond() passes for the reporting snode.
 * - FAILED: queue an UNDEPLOY for the task.
 * Other statuses are ignored.
 *
 * On error the stream is stopped via msmStopStreamByError().
 * NOTE(review): in the "stream gone" branch pStatus is NULL when that call is
 * made; confirm msmStopStreamByError() tolerates it.
 */
void msmHandleTaskAbnormalStatus(SStmGrpCtx* pCtx, SStmTaskStatusMsg* pMsg, SStmTaskStatus* pTaskStatus) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  int64_t      streamId = pMsg->streamId;
  SStreamTask* pTask = (SStreamTask*)pMsg;

  msttDebug("start to handle task abnormal status %d", pTask->status);

  SStmStatus* pStatus = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (pStatus == NULL) {
    msttInfo("stream no longer exists in streamMap, try to undeploy current task, idx:%d", pMsg->taskIdx);
    TAOS_CHECK_EXIT(msmGrpAddActionUndeploy(pCtx, streamId, pTask));
    return;
  }

  if (atomic_load_8(&pStatus->stopped)) {
    msttInfo("stream stopped, try to undeploy current task, idx:%d", pMsg->taskIdx);
    TAOS_CHECK_EXIT(msmGrpAddActionUndeploy(pCtx, streamId, pTask));
    return;
  }

  if (STREAM_STATUS_FAILED == pMsg->status) {
    //STREAMTODO ADD ERRCODE HANDLE
    msttInfo("task failed with error:%s, try to undeploy current task, idx:%d", tstrerror(pMsg->errorCode), pMsg->taskIdx);
    TAOS_CHECK_EXIT(msmGrpAddActionUndeploy(pCtx, streamId, pTask));
  } else if (STREAM_STATUS_INIT == pMsg->status) {
    if (STREAM_TRIGGER_TASK != pMsg->type) {
      msttTrace("task status is INIT and not trigger task, ignore it, currTs:%" PRId64 ", lastTs:%" PRId64, pCtx->currTs, pStatus->lastActionTs);
      return;
    }

    if (INT64_MIN == pStatus->lastActionTs) {
      msttDebug("task still not deployed, ignore it, currTs:%" PRId64 ", lastTs:%" PRId64, pCtx->currTs, pStatus->lastActionTs);
      return;
    }

    if ((pCtx->currTs - pStatus->lastActionTs) < STREAM_ACT_MIN_DELAY_MSEC) {
      msttDebug("task wait not enough between actions, currTs:%" PRId64 ", lastTs:%" PRId64, pCtx->currTs, pStatus->lastActionTs);
      return;
    }

    if (STREAM_IS_RUNNING(pStatus)) {
      msttDebug("stream already running, ignore status: %s", gStreamStatusStr[pTask->status]);
    } else if (GOT_SNODE(pCtx->pReq->snodeId) && msmCheckStreamStartCond(streamId, pCtx->pReq->snodeId)) {
      TAOS_CHECK_EXIT(msmGrpAddActionStart(pCtx->actionStm, streamId, &pStatus->triggerTask->id));
    }
  }

_exit:

  if (code) {
    msmStopStreamByError(streamId, pStatus, code, pCtx->currTs);
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }
}
3289

3290
/*
 * Handle an error found while applying a reported task status.
 *
 * For STM_ERR_TASK_NOT_EXISTS and STM_ERR_STREAM_STOPPED the task is queued
 * for UNDEPLOY; other error types are not handled yet (STREAMTODO). A failure
 * to queue is only logged; the stream is deliberately not stopped here.
 */
void msmHandleStatusUpdateErr(SStmGrpCtx* pCtx, EStmErrType err, SStmTaskStatusMsg* pStatus) {
  int32_t      lino = 0;
  SStreamTask* pTask = (SStreamTask*)pStatus;
  int64_t      streamId = pStatus->streamId;

  msttInfo("start to handle task status update error: %d", err);

  bool needUndeploy = (STM_ERR_TASK_NOT_EXISTS == err) || (STM_ERR_STREAM_STOPPED == err);
  if (!needUndeploy) {
    return;
  }

  int32_t code = msmGrpAddActionUndeploy(pCtx, streamId, pTask);
  if (TSDB_CODE_SUCCESS != code) {
    lino = __LINE__;
    // IGNORE STOP STREAM BY ERROR
    msttError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }
}
×
3311

3312
/*
 * Trigger-task bookkeeping for one status entry of a stream hb.
 *
 * - If pStream->triggerNeedUpdate is 1, it is atomically reset to 0 and an UPDATE_TRIGGER
 *   action is queued for the stream.
 * - If a user recalc list is pending, it is detached from the stream under userRecalcLock
 *   and handed to msmGrpAddActionRecalc.
 * - If the hb carries trigger runtime status (pTask->detailStatus is an index into
 *   pCtx->pReq->pTriggerStatus), that entry is copied into pStatus->detailStatus under
 *   detailStatusLock, allocating the destination on first use.
 *
 * Errors are logged only; the stream is not stopped from here.
 */
void msmChkHandleTriggerOperations(SStmGrpCtx* pCtx, SStmTaskStatusMsg* pTask, SStmTaskStatus* pStatus) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  SStmStatus* pStream = (SStmStatus*)pStatus->pStream;

  if (1 == atomic_val_compare_exchange_8(&pStream->triggerNeedUpdate, 1, 0)) {
    TAOS_CHECK_EXIT(msmGrpAddActionUpdateTrigger(pCtx->actionStm, pTask->streamId));
  }

  // Unlocked peek first; the list is detached under the latch so only one handler takes it.
  SArray* userRecalcList = NULL;
  if (atomic_load_ptr(&pStream->userRecalcList)) {
    taosWLockLatch(&pStream->userRecalcLock);
    if (pStream->userRecalcList) {
      userRecalcList = pStream->userRecalcList;
      pStream->userRecalcList = NULL;
    }
    taosWUnLockLatch(&pStream->userRecalcLock);

    if (userRecalcList) {
      TAOS_CHECK_EXIT(msmGrpAddActionRecalc(pCtx, pTask->streamId, userRecalcList));
    }
  }

  if (pTask->detailStatus >= 0 && pCtx->pReq->pTriggerStatus) {
    // The index comes from the dnode: validate it before taking the latch so that no
    // error path below has to release the latch.
    void* pSrcStatus = taosArrayGet(pCtx->pReq->pTriggerStatus, pTask->detailStatus);
    if (NULL == pSrcStatus) {
      msttError("invalid trigger detailStatus idx:%d, triggerStatusNum:%d", (int32_t)pTask->detailStatus,
                (int32_t)taosArrayGetSize(pCtx->pReq->pTriggerStatus));
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
    }

    mstWaitLock(&pStatus->detailStatusLock, false);
    if (NULL == pStatus->detailStatus) {
      pStatus->detailStatus = taosMemoryCalloc(1, sizeof(SSTriggerRuntimeStatus));
      if (NULL == pStatus->detailStatus) {
        // Capture the error while still holding the latch. Re-checking the shared pointer after
        // unlocking could see another handler's allocation and fall through to an unlocked
        // memcpy plus a second unlock.
        int32_t allocCode = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
        taosWUnLockLatch(&pStatus->detailStatusLock);
        TAOS_CHECK_EXIT(allocCode);
      }
    }

    memcpy(pStatus->detailStatus, pSrcStatus, sizeof(SSTriggerRuntimeStatus));
    taosWUnLockLatch(&pStatus->detailStatusLock);
  }

_exit:
  if (code) {
    // IGNORE STOP STREAM BY ERROR
    msttError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }
}
×
3356

3357
/*
 * NORMAL state: apply every task status entry of a stream hb (pCtx->pReq->pStreamStatus).
 *
 * Entries whose task is unknown, whose stream is stopped, or whose (seriousId, nodeId) differs
 * from the recorded instance are routed to msmHandleStatusUpdateErr. For matching tasks the
 * recorded status, error code and last-up timestamp are refreshed, status transitions update
 * runningStartTs / the redeploy-runner flag, non-running tasks go through
 * msmHandleTaskAbnormalStatus, and trigger tasks additionally go through
 * msmChkHandleTriggerOperations.
 *
 * @return always TSDB_CODE_SUCCESS (per-task problems are handled and logged individually).
 */
int32_t msmNormalHandleStatusUpdate(SStmGrpCtx* pCtx) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SArray* pStatusList = pCtx->pReq->pStreamStatus;
  int32_t num = taosArrayGetSize(pStatusList);

  mstDebug("NORMAL: start to handle stream group %d tasks status, taskNum:%d", pCtx->pReq->streamGId, num);

  for (int32_t i = 0; i < num; ++i) {
    SStmTaskStatusMsg* pTask = taosArrayGet(pStatusList, i);
    msttDebug("task status %s got on dnode %d, taskIdx:%d", gStreamStatusStr[pTask->status], pCtx->pReq->dnodeId, pTask->taskIdx);

    // The taskMap key is the (streamId, taskId) pair, read straight from the message layout.
    SStmTaskStatus** ppStatus = taosHashGet(mStreamMgmt.taskMap, &pTask->streamId, sizeof(pTask->streamId) + sizeof(pTask->taskId));
    if (NULL == ppStatus) {
      msttWarn("task no longer exists in taskMap, will try to undeploy current task, taskIdx:%d", pTask->taskIdx);
      msmHandleStatusUpdateErr(pCtx, STM_ERR_TASK_NOT_EXISTS, pTask);
      continue;
    }

    SStmTaskStatus* pStatus = *ppStatus;
    SStmStatus*     pStream = (SStmStatus*)pStatus->pStream;
    int8_t          stopped = atomic_load_8(&pStream->stopped);
    if (stopped) {
      msttWarn("stream already stopped %d, will try to undeploy current task, taskIdx:%d", stopped, pTask->taskIdx);
      msmHandleStatusUpdateErr(pCtx, STM_ERR_STREAM_STOPPED, pTask);
      continue;
    }

    // A task is only accepted if it is the exact instance recorded in taskMap.
    bool sameInstance = (pTask->seriousId == pStatus->id.seriousId) && (pTask->nodeId == pStatus->id.nodeId);
    if (!sameInstance) {
      msttInfo("task mismatch with it in taskMap, will try to rm it, current seriousId:%" PRId64 ", nodeId:%d", 
          pStatus->id.seriousId, pStatus->id.nodeId);
      msmHandleStatusUpdateErr(pCtx, STM_ERR_TASK_NOT_EXISTS, pTask);
      continue;
    }

    if (pStatus->status != pTask->status) {
      if (STREAM_STATUS_RUNNING == pTask->status) {
        pStatus->runningStartTs = pCtx->currTs;
      } else if (MST_IS_RUNNER_GETTING_READY(pTask) && STREAM_IS_REDEPLOY_RUNNER(pStatus->flags)) {
        // A redeployed runner is getting ready: the trigger must learn the new runner set.
        if (pStream->triggerTask) {
          atomic_store_8(&pStream->triggerNeedUpdate, 1);
        }
        STREAM_CLR_FLAG(pStatus->flags, STREAM_FLAG_REDEPLOY_RUNNER);
      }
    }

    pStatus->errCode = pTask->errorCode;
    pStatus->status = pTask->status;
    pStatus->lastUpTs = pCtx->currTs;

    if (STREAM_STATUS_RUNNING != pTask->status) {
      msmHandleTaskAbnormalStatus(pCtx, pTask, pStatus);
    }

    if (STREAM_TRIGGER_TASK == pTask->type) {
      msmChkHandleTriggerOperations(pCtx, pTask, pStatus);
    }
  }

_exit:
  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
3424

3425
/*
 * WATCH state: record a task reported by a dnode that is not yet in mStreamMgmt.taskMap.
 *
 * If the stream has no SStmStatus yet, one is built from the stream object
 * (msmInitStmStatus) and inserted into mStreamMgmt.streamMap. Tasks of streams for which
 * STREAM_IS_VIRTUAL_TABLE holds are ignored. The task status is then appended to the
 * matching container of that SStmStatus (trigger/calc readers, the trigger task, or the
 * runner list of pTask->deployId) and registered in taskMap plus the vgroup or snode map.
 *
 * @return TSDB_CODE_SUCCESS, or an error code (logged) when the stream cannot be acquired,
 *         the target list is missing, full or out of range, or a map insertion fails.
 */
int32_t msmWatchRecordNewTask(SStmGrpCtx* pCtx, SStmTaskStatusMsg* pTask) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  int64_t     streamId = pTask->streamId;
  SStreamObj* pStream = NULL;

  SStmStatus* pStatus = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL == pStatus) {
    SStmStatus status = {0};
    TAOS_CHECK_EXIT(mndAcquireStreamById(pCtx->pMnode, streamId, &pStream));
    TSDB_CHECK_NULL(pStream, code, lino, _exit, TSDB_CODE_MND_STREAM_NOT_EXIST);
    if (STREAM_IS_VIRTUAL_TABLE(pStream->pCreate->triggerTblType, pStream->pCreate->flags)) {
      mndReleaseStream(pCtx->pMnode, pStream);
      msttDebug("virtual table task ignored, status:%s", gStreamStatusStr[pTask->status]);
      return code;
    }

    // Release the acquired stream whether or not the init succeeds, so that a failing
    // msmInitStmStatus does not leak the reference.
    code = msmInitStmStatus(pCtx, &status, pStream, true);
    mndReleaseStream(pCtx->pMnode, pStream);
    pStream = NULL;
    TAOS_CHECK_EXIT(code);

    TAOS_CHECK_EXIT(taosHashPut(mStreamMgmt.streamMap, &streamId, sizeof(streamId), &status, sizeof(status)));
    pStatus = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
    TSDB_CHECK_NULL(pStatus, code, lino, _exit, terrno);
    msttDebug("stream added to streamMap cause of new task status:%s", gStreamStatusStr[pTask->status]);
  }

  SStmTaskStatus* pNewTask = NULL;
  switch (pTask->type) {
    case STREAM_READER_TASK: {
      SArray* pList = STREAM_IS_TRIGGER_READER(pTask->flags) ? pStatus->trigReaders : pStatus->calcReaders;
      if (NULL == pList) {
        mstsError("%sReader list is NULL", STREAM_IS_TRIGGER_READER(pTask->flags) ? "trig" : "calc");
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }
      // NOTE(review): the pushed element's address is stored in taskMap/vgroupMap. The size
      // check presumably keeps the push within capacity preallocated by msmInitStmStatus, so
      // no realloc invalidates earlier pointers — confirm.
      int32_t readerSize = STREAM_IS_TRIGGER_READER(pTask->flags) ? pStatus->trigReaderNum : pStatus->calcReaderNum;
      if (taosArrayGetSize(pList) >= readerSize) {
        mstsError("%sReader list is already full, size:%d, expSize:%d", STREAM_IS_TRIGGER_READER(pTask->flags) ? "trig" : "calc",
            (int32_t)taosArrayGetSize(pList), readerSize);
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }

      SStmTaskStatus taskStatus = {0};
      taskStatus.pStream = pStatus;
      mstSetTaskStatusFromMsg(pCtx, &taskStatus, pTask);
      pNewTask = taosArrayPush(pList, &taskStatus);
      TSDB_CHECK_NULL(pNewTask, code, lino, _exit, terrno);

      TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, NULL, pNewTask));
      TAOS_CHECK_EXIT(msmSTAddToVgroupMapImpl(streamId, pNewTask, STREAM_IS_TRIGGER_READER(pTask->flags)));
      break;
    }
    case STREAM_TRIGGER_TASK: {
      // NOTE(review): if an older triggerTask was already registered in taskMap under a
      // different taskId, that entry would dangle after this free — confirm this cannot
      // happen in WATCH state.
      taosMemoryFreeClear(pStatus->triggerTask);
      pStatus->triggerTask = taosMemoryCalloc(1, sizeof(*pStatus->triggerTask));
      TSDB_CHECK_NULL(pStatus->triggerTask, code, lino, _exit, terrno);
      pStatus->triggerTask->pStream = pStatus;
      mstSetTaskStatusFromMsg(pCtx, pStatus->triggerTask, pTask);
      pNewTask = pStatus->triggerTask;

      TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, NULL, pNewTask));
      TAOS_CHECK_EXIT(msmSTAddToSnodeMapImpl(streamId, pNewTask, 0));
      break;
    }
    case STREAM_RUNNER_TASK: {
      // deployId comes from the dnode and indexes pStatus->runners[] directly.
      if (pTask->deployId < 0 || pTask->deployId >= pStatus->runnerDeploys) {
        mstsError("invalid runner deployId %d, runnerDeploys:%d", pTask->deployId, (int32_t)pStatus->runnerDeploys);
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }
      if (NULL == pStatus->runners[pTask->deployId]) {
        mstsError("deploy %d runner list is NULL", pTask->deployId);
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }
      if (taosArrayGetSize(pStatus->runners[pTask->deployId]) >= pStatus->runnerNum) {
        mstsError("deploy %d runner list is already full, size:%d, expSize:%d", pTask->deployId, 
            (int32_t)taosArrayGetSize(pStatus->runners[pTask->deployId]), pStatus->runnerNum);
        TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      }

      SStmTaskStatus taskStatus = {0};
      taskStatus.pStream = pStatus;
      mstSetTaskStatusFromMsg(pCtx, &taskStatus, pTask);
      pNewTask = taosArrayPush(pStatus->runners[pTask->deployId], &taskStatus);
      TSDB_CHECK_NULL(pNewTask, code, lino, _exit, terrno);

      TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, NULL, pNewTask));
      TAOS_CHECK_EXIT(msmSTAddToSnodeMapImpl(streamId, pNewTask, pTask->deployId));
      break;
    }
    default: {
      msttError("invalid task type:%d in task status", pTask->type);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      break;
    }
  }

_exit:
  if (code) {
    msttError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  } else {
    msttDebug("new task recored to taskMap/streamMap, task status:%s", gStreamStatusStr[pTask->status]);
  }

  return code;
}
3526

3527
/*
 * WATCH state: absorb the task status list of a stream hb.
 *
 * Every reported taskId advances mStreamMgmt.lastTaskId past itself. Tasks already in
 * mStreamMgmt.taskMap get their status and last-up timestamp refreshed; unknown tasks are
 * recorded through msmWatchRecordNewTask.
 *
 * @return TSDB_CODE_SUCCESS, or the first msmWatchRecordNewTask error (logged).
 */
int32_t msmWatchHandleStatusUpdate(SStmGrpCtx* pCtx) {
  int32_t       code = TSDB_CODE_SUCCESS;
  int32_t       lino = 0;
  SStreamHbMsg* pReq = pCtx->pReq;
  int32_t       num = taosArrayGetSize(pReq->pStreamStatus);

  mstDebug("WATCH: start to handle stream group %d tasks status, taskNum:%d", pReq->streamGId, num);

  for (int32_t i = 0; i < num; ++i) {
    SStmTaskStatusMsg* pTask = taosArrayGet(pReq->pStreamStatus, i);
    msttDebug("task status %s got, taskIdx:%d", gStreamStatusStr[pTask->status], pTask->taskIdx);

    // Keep lastTaskId strictly above every taskId observed while watching.
    if (pTask->taskId >= mStreamMgmt.lastTaskId) {
      mStreamMgmt.lastTaskId = pTask->taskId + 1;
    }

    SStmTaskStatus** ppStatus = taosHashGet(mStreamMgmt.taskMap, &pTask->streamId, sizeof(pTask->streamId) + sizeof(pTask->taskId));
    if (NULL != ppStatus) {
      (*ppStatus)->status = pTask->status;
      (*ppStatus)->lastUpTs = pCtx->currTs;
      continue;
    }

    msttInfo("task still not in taskMap, will try to add it, taskIdx:%d", pTask->taskIdx);
    TAOS_CHECK_EXIT(msmWatchRecordNewTask(pCtx, pTask));
  }

_exit:
  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
3563

3564
/*
 * Append a START entry for the stream's trigger task (pAction->start.triggerId) to the hb
 * response, creating pCtx->pRsp->start.taskList with capacity streamNum on first use, and
 * record the action time via msmUpdateStreamLastActTs. Failures are logged only.
 */
void msmRspAddStreamStart(int64_t streamId, SStmGrpCtx* pCtx, int32_t streamNum, SStmAction *pAction) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  SStmTaskId* pId = &pAction->start.triggerId;

  if (NULL == pCtx->pRsp->start.taskList) {
    pCtx->pRsp->start.taskList = taosArrayInit(streamNum, sizeof(SStreamTaskStart));
    TSDB_CHECK_NULL(pCtx->pRsp->start.taskList, code, lino, _exit, terrno);
  }

  SStreamTaskStart start = {
      .task.type = STREAM_TRIGGER_TASK,
      .task.streamId = streamId,
      .task.taskId = pId->taskId,
      .task.seriousId = pId->seriousId,
      .task.nodeId = pId->nodeId,
      .task.taskIdx = pId->taskIdx,
  };

  TSDB_CHECK_NULL(taosArrayPush(pCtx->pRsp->start.taskList, &start), code, lino, _exit, terrno);
  TAOS_CHECK_EXIT(msmUpdateStreamLastActTs(streamId, pCtx->currTs));

  mstsDebug("stream START added to dnode %d hb rsp, triggerTaskId:%" PRIx64, pId->nodeId, pId->taskId);
  return;

_exit:
  mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
}
3592

3593

3594
/*
 * Append one UNDEPLOY entry per task in pAction->undeploy.taskList to the hb response,
 * creating pCtx->pRsp->undeploy.taskList on first use. Each entry carries the action's
 * doCheckpoint/doCleanup flags, and the stream's last action time is refreshed through
 * msmUpdateStreamLastActTs. Failures are logged only.
 */
void msmRspAddStreamUndeploy(int64_t streamId, SStmGrpCtx* pCtx, SStmAction *pAction) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t dropNum = taosArrayGetSize(pAction->undeploy.taskList);
  if (NULL == pCtx->pRsp->undeploy.taskList) {
    pCtx->pRsp->undeploy.taskList = taosArrayInit(dropNum, sizeof(SStreamTaskUndeploy));
    TSDB_CHECK_NULL(pCtx->pRsp->undeploy.taskList, code, lino, _exit, terrno);
  }

  for (int32_t i = 0; i < dropNum; ++i) {
    SStreamTask* pTask = (SStreamTask*)taosArrayGetP(pAction->undeploy.taskList, i);
    // Zero-initialize so that no field left unassigned below (including padding or any
    // further undeployMsg members) is copied into the response as stack garbage.
    SStreamTaskUndeploy undeploy = {0};

    undeploy.task = *pTask;
    undeploy.undeployMsg.doCheckpoint = pAction->undeploy.doCheckpoint;
    undeploy.undeployMsg.doCleanup = pAction->undeploy.doCleanup;

    TSDB_CHECK_NULL(taosArrayPush(pCtx->pRsp->undeploy.taskList, &undeploy), code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(msmUpdateStreamLastActTs(streamId, pCtx->currTs));

    msttDebug("task UNDEPLOY added to hb rsp, doCheckpoint:%d, doCleanup:%d", undeploy.undeployMsg.doCheckpoint, undeploy.undeployMsg.doCleanup);
  }

  return;

_exit:
  mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
}
3622

3623
/*
 * Append a STREAM_MSG_UPDATE_RUNNER response for the stream's trigger task, carrying the
 * current runner targets built by msmBuildTriggerRunnerTargets. The call is skipped (with a
 * log) when the stream is no longer in streamMap or has no trigger task. Failures are
 * logged only; on failure the partially built response is released here, because ownership
 * only passes to pCtx->pRsp->rsps.rspList when the push succeeds.
 */
void msmRspAddTriggerUpdate(SMnode * pMnode, int64_t streamId, SStmGrpCtx* pCtx, SStmAction *pAction) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SStmStatus* pStream = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL == pStream) {
    mstsDebug("stream already not exists in streamMap, ignore trigger update, streamRemain:%d", taosHashGetSize(mStreamMgmt.streamMap));
    return;
  }

  if (NULL == pStream->triggerTask) {
    mstsWarn("no triggerTask exists, ignore trigger update, stopped:%d", atomic_load_8(&pStream->stopped));
    return;
  }

  SStreamMgmtRsp rsp = {0};
  rsp.reqId = INT64_MIN;
  rsp.header.msgType = STREAM_MSG_UPDATE_RUNNER;
  rsp.task.streamId = streamId;
  rsp.task.taskId = pStream->triggerTask->id.taskId;

  TAOS_CHECK_EXIT(msmBuildTriggerRunnerTargets(pMnode, pStream, streamId, &rsp.cont.runnerList));

  if (NULL == pCtx->pRsp->rsps.rspList) {
    pCtx->pRsp->rsps.rspList = taosArrayInit(2, sizeof(SStreamMgmtRsp));
    TSDB_CHECK_NULL(pCtx->pRsp->rsps.rspList, code, lino, _exit, terrno);
  }

  TSDB_CHECK_NULL(taosArrayPush(pCtx->pRsp->rsps.rspList, &rsp), code, lino, _exit, terrno);

_exit:
  if (code) {
    // rsp never reached rspList, so the runner list built above is still owned here.
    mndStreamDestroySStreamMgmtRsp(&rsp);
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  } else {
    mstsDebug("trigger update rsp added, runnerNum:%d", (int32_t)taosArrayGetSize(rsp.cont.runnerList));
  }
}
3661

3662
/*
 * Append a STREAM_MSG_USER_RECALC response for the stream's trigger task. The recalc list is
 * moved out of pAction into the response. The call is skipped (with a log) when the stream is
 * no longer in streamMap or has no trigger task. If the response cannot be queued, the list is
 * moved back into pAction so that it stays with the action's owner instead of being dropped.
 */
void msmRspAddUserRecalc(SMnode * pMnode, int64_t streamId, SStmGrpCtx* pCtx, SStmAction *pAction) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SStmStatus* pStream = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL == pStream) {
    mstsDebug("stream already not exists in streamMap, ignore user recalc, streamRemain:%d", taosHashGetSize(mStreamMgmt.streamMap));
    return;
  }

  if (NULL == pStream->triggerTask) {
    mstsWarn("no triggerTask exists, ignore user recalc, stopped:%d", atomic_load_8(&pStream->stopped));
    return;
  }

  SStreamMgmtRsp rsp = {0};
  rsp.reqId = INT64_MIN;
  rsp.header.msgType = STREAM_MSG_USER_RECALC;
  rsp.task.streamId = streamId;
  rsp.task.taskId = pStream->triggerTask->id.taskId;
  TSWAP(rsp.cont.recalcList, pAction->recalc.recalcList);

  if (NULL == pCtx->pRsp->rsps.rspList) {
    pCtx->pRsp->rsps.rspList = taosArrayInit(2, sizeof(SStreamMgmtRsp));
    TSDB_CHECK_NULL(pCtx->pRsp->rsps.rspList, code, lino, _exit, terrno);
  }

  TSDB_CHECK_NULL(taosArrayPush(pCtx->pRsp->rsps.rspList, &rsp), code, lino, _exit, terrno);

_exit:
  if (code) {
    // The response was not queued: return the list to the action rather than losing it
    // together with the local rsp.
    TSWAP(rsp.cont.recalcList, pAction->recalc.recalcList);
    mstsError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  } else {
    mstsDebug("user recalc rsp added, recalcNum:%d", (int32_t)taosArrayGetSize(rsp.cont.recalcList));
  }
}
3699

3700

3701
/*
 * Convert the per-stream actions collected in pCtx->actionStm during this hb into response
 * entries. An UNDEPLOY action excludes all other actions for that stream. Otherwise trigger
 * update, user recalc and start are emitted, in that order, for whichever bits are set.
 *
 * @return always TSDB_CODE_SUCCESS (the response helpers log their own failures).
 */
int32_t msmHandleHbPostActions(SStmGrpCtx* pCtx) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  void*   pIter = NULL;
  int32_t streamNum = taosHashGetSize(pCtx->actionStm);

  mstDebug("start to handle stream group %d post actions", pCtx->pReq->streamGId);

  for (pIter = taosHashIterate(pCtx->actionStm, NULL); NULL != pIter; pIter = taosHashIterate(pCtx->actionStm, pIter)) {
    int64_t     streamId = *(int64_t*)taosHashGetKey(pIter, NULL);
    SStmAction* pAction = (SStmAction*)pIter;

    if (pAction->actions & STREAM_ACT_UNDEPLOY) {
      msmRspAddStreamUndeploy(streamId, pCtx, pAction);
      continue;
    }

    if (pAction->actions & STREAM_ACT_UPDATE_TRIGGER) {
      msmRspAddTriggerUpdate(pCtx->pMnode, streamId, pCtx, pAction);
    }

    if (pAction->actions & STREAM_ACT_RECALC) {
      msmRspAddUserRecalc(pCtx->pMnode, streamId, pCtx, pAction);
    }

    if (pAction->actions & STREAM_ACT_START) {
      msmRspAddStreamStart(streamId, pCtx, streamNum, pAction);
    }
  }

_exit:
  if (pIter) {
    taosHashCancelIterate(pCtx->actionStm, pIter);
  }

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
3748

3749
/*
 * Refresh the last-up timestamp of the hb's dnode in mStreamMgmt.dnodeMap.
 *
 * An unknown dnode triggers one reload of the dnode list (msmSTAddDnodesToMap). If the dnode
 * is still missing, TSDB_CODE_MND_STREAM_NODE_NOT_EXISTS is returned. The timestamp is only
 * moved forward: a CAS loop installs pCtx->currTs unless the stored value is already >= it.
 */
int32_t msmCheckUpdateDnodeTs(SStmGrpCtx* pCtx) {
  int32_t       code = TSDB_CODE_SUCCESS;
  int32_t       lino = 0;
  SStreamHbMsg* pReq = pCtx->pReq;
  int64_t*      lastTs = taosHashGet(mStreamMgmt.dnodeMap, &pReq->dnodeId, sizeof(pReq->dnodeId));

  if (NULL == lastTs) {
    TAOS_CHECK_EXIT(msmSTAddDnodesToMap(pCtx->pMnode));

    lastTs = taosHashGet(mStreamMgmt.dnodeMap, &pReq->dnodeId, sizeof(pReq->dnodeId));
    if (NULL == lastTs) {
      mstWarn("Got unknown dnode %d hb msg, may be dropped", pReq->dnodeId);
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_NODE_NOT_EXISTS);
    }
  }

  for (;;) {
    int64_t lastTsValue = atomic_load_64(lastTs);
    if (pCtx->currTs <= lastTsValue) {
      break;
    }
    // Another hb handler may race us; retry until our value is installed or superseded.
    if (lastTsValue == atomic_val_compare_exchange_64(lastTs, lastTsValue, pCtx->currTs)) {
      mstDebug("dnode %d lastUpTs updated", pReq->dnodeId);
      break;
    }
  }

_exit:
  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
3794

3795
/*
 * At the end of the WATCH state, verify that every stream in mStreamMgmt.streamMap has
 * collected its full task set: a trigger task, the expected trigger/calc reader counts, and
 * runnerNum runners in each of its runnerDeploys runner lists. A stream failing any check is
 * stopped once with TSDB_CODE_MND_STREAM_TASK_LOST.
 */
void msmWatchCheckStreamMap(SStmGrpCtx* pCtx) {
  SStmStatus* pStatus = NULL;
  int32_t trigReaderNum = 0;
  int32_t calcReaderNum = 0;
  int32_t runnerNum = 0;
  int64_t streamId = 0;
  void* pIter = NULL;
  while (true) {
    pIter = taosHashIterate(mStreamMgmt.streamMap, pIter);
    if (NULL == pIter) {
      return;
    }

    streamId = *(int64_t*)taosHashGetKey(pIter, NULL);
    pStatus = (SStmStatus*)pIter;

    if (NULL == pStatus->triggerTask) {
      mstsWarn("no trigger task recored, deployTimes:%" PRId64, pStatus->deployTimes);
      msmStopStreamByError(streamId, pStatus, TSDB_CODE_MND_STREAM_TASK_LOST, pCtx->currTs);
      continue;
    }

    trigReaderNum = taosArrayGetSize(pStatus->trigReaders);
    if (pStatus->trigReaderNum != trigReaderNum) {
      mstsWarn("trigReaderNum %d mis-match with expected %d", trigReaderNum, pStatus->trigReaderNum);
      msmStopStreamByError(streamId, pStatus, TSDB_CODE_MND_STREAM_TASK_LOST, pCtx->currTs);
      continue;
    }

    calcReaderNum = taosArrayGetSize(pStatus->calcReaders);
    if (pStatus->calcReaderNum != calcReaderNum) {
      mstsWarn("calcReaderNum %d mis-match with expected %d", calcReaderNum, pStatus->calcReaderNum);
      msmStopStreamByError(streamId, pStatus, TSDB_CODE_MND_STREAM_TASK_LOST, pCtx->currTs);
      continue;
    }

    for (int32_t i = 0; i < pStatus->runnerDeploys; ++i) {
      runnerNum = taosArrayGetSize(pStatus->runners[i]);
      if (runnerNum != pStatus->runnerNum) {
        mstsWarn("runner deploy %d runnerNum %d mis-match with expected %d", i, runnerNum, pStatus->runnerNum);
        msmStopStreamByError(streamId, pStatus, TSDB_CODE_MND_STREAM_TASK_LOST, pCtx->currTs);
        // Stop the stream once; a `continue` here would only advance this inner loop and could
        // stop the same stream again for every further mismatching deploy.
        break;
      }
    }
  }
}
3841

3842
/*
 * Leave the WATCH state and switch the stream runtime to MND_STM_STATE_NORMAL.
 *
 * Only the first caller that flips mStreamMgmt.watch.ending from 0 to 1 does the work; later
 * callers return immediately. The winner spins until watch.processing drops to 1 (presumably
 * itself — callers are expected to still be counted), then:
 *  - on watchError, clears the vgroup/snode/task/stream maps;
 *  - otherwise, if any task was reported while watching, validates streamMap;
 * and finally bumps lastTaskId by 100000 before switching state.
 *
 * @return always TSDB_CODE_SUCCESS.
 */
int32_t msmWatchHandleEnding(SStmGrpCtx* pCtx, bool watchError) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (atomic_val_compare_exchange_8(&mStreamMgmt.watch.ending, 0, 1) != 0) {
    return code;
  }

  while (atomic_load_32(&mStreamMgmt.watch.processing) > 1) {
    (void)sched_yield();
  }

  if (watchError) {
    taosHashClear(mStreamMgmt.vgroupMap);
    taosHashClear(mStreamMgmt.snodeMap);
    taosHashClear(mStreamMgmt.taskMap);
    taosHashClear(mStreamMgmt.streamMap);
    mstInfo("watch error happends, clear all maps");
  } else if (0 == atomic_load_8(&mStreamMgmt.watch.taskRemains)) {
    mstInfo("no stream tasks remain during watch state");
  } else {
    msmWatchCheckStreamMap(pCtx);
  }

  // Leave a gap above every taskId seen while watching before NORMAL state allocates new ids.
  mStreamMgmt.lastTaskId += 100000;

  mstInfo("watch state end, new taskId begin from:%" PRIx64, mStreamMgmt.lastTaskId);

  msmSetInitRuntimeState(MND_STM_STATE_NORMAL);

  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
3884

3885

3886
/*
 * Handle one stream hb while the mnode stream module is in the WATCH state.
 *
 * The handler is counted in mStreamMgmt.watch.processing for its whole duration. Unless the
 * watch is already ending, it refreshes dnode/snode up-timestamps, records reported task
 * statuses, and ends the watch once MST_ISOLATION_DURATION has passed since
 * STM_EVENT_ACTIVE_BEGIN. On any error the watch is ended in error mode (maps cleared).
 *
 * @return TSDB_CODE_SUCCESS or the first error encountered.
 */
int32_t msmWatchHandleHbMsg(SStmGrpCtx* pCtx) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SStreamHbMsg* pReq = pCtx->pReq;

  (void)atomic_add_fetch_32(&mStreamMgmt.watch.processing, 1);

  if (atomic_load_8(&mStreamMgmt.watch.ending)) {
    goto _exit;
  }

  TAOS_CHECK_EXIT(msmCheckUpdateDnodeTs(pCtx));
  if (GOT_SNODE(pReq->snodeId)) {
    TAOS_CHECK_EXIT(msmUpdateSnodeUpTs(pCtx));
  }

  if (taosArrayGetSize(pReq->pStreamStatus) > 0) {
    atomic_store_8(&mStreamMgmt.watch.taskRemains, 1);
    TAOS_CHECK_EXIT(msmWatchHandleStatusUpdate(pCtx));
  }

  if ((pCtx->currTs - MND_STREAM_GET_LAST_TS(STM_EVENT_ACTIVE_BEGIN)) > MST_ISOLATION_DURATION) {
    TAOS_CHECK_EXIT(msmWatchHandleEnding(pCtx, false));
  }

_exit:
  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));

    // End the watch while this handler is still counted in watch.processing, exactly like the
    // non-error call above. msmWatchHandleEnding waits only while processing > 1, so
    // decrementing first would let it clear the maps while another handler still uses them.
    (void)msmWatchHandleEnding(pCtx, true);
  }

  (void)atomic_sub_fetch_32(&mStreamMgmt.watch.processing, 1);

  return code;
}
3923

3924
/*
 * Ensure the stream has a trigger reader on vgroup vgId.
 *
 * If no entry of pStatus->trigReaders sits on vgId, a new reader status is reserved in
 * pStatus->trigOReaders (created with capacity vgNum on first use), initialized by
 * msmTDAddSingleTrigReader, and registered in the task map and the vgroup map.
 *
 * @param vgNum capacity hint used when trigOReaders is first created.
 * @return TSDB_CODE_SUCCESS or the first error encountered (logged).
 */
int32_t msmCheckDeployTrigReader(SStmGrpCtx* pCtx, SStmStatus* pStatus, SStmTaskStatusMsg* pTask, int32_t vgId, int32_t vgNum) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  bool    readerExists = false;
  int64_t streamId = pTask->streamId;

  int32_t readerNum = taosArrayGetSize(pStatus->trigReaders);
  for (int32_t i = 0; i < readerNum; ++i) {
    SStmTaskStatus* pReader = (SStmTaskStatus*)taosArrayGet(pStatus->trigReaders, i);
    if (pReader->id.nodeId == vgId) {
      readerExists = true;
      break;
    }
  }

  if (!readerExists) {
    if (NULL == pStatus->trigOReaders) {
      pStatus->trigOReaders = taosArrayInit(vgNum, sizeof(SStmTaskStatus));
      TSDB_CHECK_NULL(pStatus->trigOReaders, code, lino, _exit, terrno);
    }

    // NOTE(review): pState's address is stored in taskMap/vgroupMap; this presumably relies on
    // the vgNum capacity so that later reserves do not realloc the array — confirm.
    SStmTaskStatus* pState = taosArrayReserve(pStatus->trigOReaders, 1);
    // taosArrayReserve returns NULL when the array cannot grow; never pass that on.
    TSDB_CHECK_NULL(pState, code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(msmTDAddSingleTrigReader(pCtx, pState, vgId, pStatus, NULL, streamId));
    TAOS_CHECK_EXIT(msmSTAddToTaskMap(pCtx, streamId, NULL, pState));
    TAOS_CHECK_EXIT(msmSTAddToVgroupMap(pCtx, streamId, NULL, pState, true));
  }

_exit:
  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
3959

3960
/*
 * Serve a STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER request from a trigger task.
 *
 * The request is taken over from pTask (pTask->pMgmtReq becomes NULL) and freed on every path.
 * Each requested table is resolved to its vgroup. A trigger reader is ensured on every distinct
 * vgroup through msmCheckDeployTrigReader, and a STREAM_MSG_ORIGTBL_READER_INFO response
 * carrying the per-table vgIds and the addresses of all pStatus->trigOReaders is appended to
 * pCtx->pRsp->rsps.rspList. No response is produced when the table list is empty or
 * trigOReaders already has entries.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_MND_STREAM_NOT_RUNNING / _STOPPED, or another error (logged).
 */
int32_t msmProcessDeployOrigReader(SStmGrpCtx* pCtx, SStmTaskStatusMsg* pTask) {
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         lino = 0;
  int32_t         vgId = 0;
  int64_t         streamId = pTask->streamId;
  SSHashObj*      pDbVgroups = NULL;
  SSHashObj*      pVgs = NULL;
  SStreamMgmtReq* pMgmtReq = NULL;
  SStreamMgmtRsp  rsp = {0};

  // Take ownership of the request; it is released at _exit on every path.
  TSWAP(pTask->pMgmtReq, pMgmtReq);

  SArray* pTbs = pMgmtReq->cont.fullTableNames;
  int32_t tbNum = taosArrayGetSize(pTbs);

  rsp.reqId = pMgmtReq->reqId;
  rsp.header.msgType = STREAM_MSG_ORIGTBL_READER_INFO;
  rsp.task = *(SStreamTask*)pTask;

  SStmStatus* pStatus = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL == pStatus) {
    mstsError("stream not deployed, remainStreams:%d", taosHashGetSize(mStreamMgmt.streamMap));
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_NOT_RUNNING);
  }

  if (atomic_load_8(&pStatus->stopped)) {
    msttInfo("stream stopped, ignore deploy trigger reader, vgId:%d", vgId);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_STOPPED);
  }

  if (tbNum <= 0) {
    mstsWarn("empty table list in origReader req, array:%p", pTbs);
    goto _exit;
  }

  int32_t oReaderNum = taosArrayGetSize(pStatus->trigOReaders);
  if (oReaderNum > 0) {
    mstsWarn("origReaders already exits, num:%d", oReaderNum);
    goto _exit;
  }

  // Resolve every table to its vgroup: rsp.cont.vgIds keeps one entry per table (in request
  // order) while pVgs collects the distinct vgroups that need a reader.
  TAOS_CHECK_EXIT(mstBuildDBVgroupsMap(pCtx->pMnode, &pDbVgroups));
  rsp.cont.vgIds = taosArrayInit(tbNum, sizeof(int32_t));
  TSDB_CHECK_NULL(rsp.cont.vgIds, code, lino, _exit, terrno);

  pVgs = tSimpleHashInit(tbNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  TSDB_CHECK_NULL(pVgs, code, lino, _exit, terrno);

  for (int32_t i = 0; i < tbNum; ++i) {
    SStreamDbTableName* pName = (SStreamDbTableName*)taosArrayGet(pTbs, i);
    TAOS_CHECK_EXIT(mstGetTableVgId(pDbVgroups, pName->dbFName, pName->tbName, &vgId));
    TSDB_CHECK_NULL(taosArrayPush(rsp.cont.vgIds, &vgId), code, lino, _exit, terrno);
    TAOS_CHECK_EXIT(tSimpleHashPut(pVgs, &vgId, sizeof(vgId), &vgId, sizeof(vgId)));
  }

  int32_t uniqVgNum = tSimpleHashGetSize(pVgs);
  int32_t hashIter = 0;
  for (void* pVg = tSimpleHashIterate(pVgs, NULL, &hashIter); NULL != pVg; pVg = tSimpleHashIterate(pVgs, pVg, &hashIter)) {
    TAOS_CHECK_EXIT(msmCheckDeployTrigReader(pCtx, pStatus, pTask, *(int32_t*)pVg, uniqVgNum));
  }

  int32_t oReaderCnt = taosArrayGetSize(pStatus->trigOReaders);
  rsp.cont.readerList = taosArrayInit(oReaderCnt, sizeof(SStreamTaskAddr));
  TSDB_CHECK_NULL(rsp.cont.readerList, code, lino, _exit, terrno);

  for (int32_t i = 0; i < oReaderCnt; ++i) {
    SStmTaskStatus* pOTask = taosArrayGet(pStatus->trigOReaders, i);
    SStreamTaskAddr addr;

    addr.taskId = pOTask->id.taskId;
    addr.nodeId = pOTask->id.nodeId;
    addr.epset = mndGetVgroupEpsetById(pCtx->pMnode, pOTask->id.nodeId);
    TSDB_CHECK_NULL(taosArrayPush(rsp.cont.readerList, &addr), code, lino, _exit, terrno);
    mstsDebug("the %dth otrigReader src added to trigger's virtual orig readerList, TASK:%" PRIx64 " nodeId:%d", i, addr.taskId, addr.nodeId);
  }

  if (NULL == pCtx->pRsp->rsps.rspList) {
    pCtx->pRsp->rsps.rspList = taosArrayInit(2, sizeof(SStreamMgmtRsp));
    TSDB_CHECK_NULL(pCtx->pRsp->rsps.rspList, code, lino, _exit, terrno);
  }

  // On success rspList takes ownership of rsp's arrays.
  TSDB_CHECK_NULL(taosArrayPush(pCtx->pRsp->rsps.rspList, &rsp), code, lino, _exit, terrno);

_exit:
  tFreeSStreamMgmtReq(pMgmtReq);
  taosMemoryFree(pMgmtReq);

  tSimpleHashCleanup(pVgs);

  if (code) {
    mndStreamDestroySStreamMgmtRsp(&rsp);
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  mstDestroyDbVgroupsHash(pDbVgroups);

  return code;
}
4063

4064
/*
 * Dispatch the management request attached to a task status entry (pTask->pMgmtReq) by type.
 * Only STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER is supported; any other type fails with
 * TSDB_CODE_MND_STREAM_INTERNAL_ERROR.
 *
 * @return TSDB_CODE_SUCCESS or the handler/dispatch error (logged with its source line).
 */
int32_t msmHandleTaskMgmtReq(SStmGrpCtx* pCtx, SStmTaskStatusMsg* pTask) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  switch (pTask->pMgmtReq->type) {
    case STREAM_MGMT_REQ_TRIGGER_ORIGTBL_READER:
      TAOS_CHECK_EXIT(msmProcessDeployOrigReader(pCtx, pTask));
      break;
    default:
      msttError("Invalid mgmtReq type:%d", pTask->pMgmtReq->type);
      // Use TAOS_CHECK_EXIT so that lino is recorded; a bare assignment left the log at line 0.
      TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
      break;
  }

_exit:
  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
4086

4087
/*
 * Serve the management requests carried by a stream hb.
 *
 * pReq->pStreamReq holds int32 indexes into pReq->pStreamStatus. Each referenced status entry
 * that carries a pMgmtReq is dispatched to msmHandleTaskMgmtReq. Invalid indexes and entries
 * without a request are logged and skipped. The response list is pre-created, sized to the
 * request count, when it does not exist yet.
 *
 * @return TSDB_CODE_SUCCESS or the first dispatch/allocation error (logged).
 */
int32_t msmHandleStreamRequests(SStmGrpCtx* pCtx) {
  int32_t            code = TSDB_CODE_SUCCESS;
  int32_t            lino = 0;
  SStreamHbMsg*      pReq = pCtx->pReq;
  SStmTaskStatusMsg* pTask = NULL;
  int32_t            reqNum = taosArrayGetSize(pReq->pStreamReq);

  if (reqNum > 0 && NULL == pCtx->pRsp->rsps.rspList) {
    pCtx->pRsp->rsps.rspList = taosArrayInit(reqNum, sizeof(SStreamMgmtRsp));
    TSDB_CHECK_NULL(pCtx->pRsp->rsps.rspList, code, lino, _exit, terrno);
  }

  for (int32_t i = 0; i < reqNum; ++i) {
    const int32_t* pIdx = (const int32_t*)taosArrayGet(pReq->pStreamReq, i);
    int32_t        idx = *pIdx;

    pTask = (SStmTaskStatusMsg*)taosArrayGet(pReq->pStreamStatus, idx);
    if (NULL == pTask) {
      mstError("idx %d is NULL, reqNum:%d", idx, reqNum);
    } else if (NULL == pTask->pMgmtReq) {
      msttError("idx %d without mgmtReq", idx);
    } else {
      TAOS_CHECK_EXIT(msmHandleTaskMgmtReq(pCtx, pTask));
    }
  }

_exit:
  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
4123

4124
/*
 * Handle one stream hb while the mnode stream module is in the NORMAL state.
 *
 * Steps, in order; the first failure aborts the rest and is logged:
 *  1. refresh the dnode (and, when the hb reports an snode, the snode) up-timestamps;
 *  2. if stream actions are queued and actionQLock can be taken without waiting, run
 *     msmHandleStreamActions;
 *  3. if the hb carries stream requests, wait for actionQLock and run msmHandleStreamRequests;
 *  4. add pending vgroup-task deploys, or otherwise refresh vgroup up-timestamps;
 *  5. add pending snode-task deploys when the hb reports an snode;
 *  6. add the deploys collected in pCtx->deployStm to the response;
 *  7. process the reported task statuses;
 *  8. turn the actions collected in pCtx->actionStm into response entries.
 *
 * @return TSDB_CODE_SUCCESS or the first error encountered.
 */
int32_t msmNormalHandleHbMsg(SStmGrpCtx* pCtx) {
  int32_t       code = TSDB_CODE_SUCCESS;
  int32_t       lino = 0;
  SStreamHbMsg* pReq = pCtx->pReq;

  TAOS_CHECK_EXIT(msmCheckUpdateDnodeTs(pCtx));
  if (GOT_SNODE(pReq->snodeId)) {
    TAOS_CHECK_EXIT(msmUpdateSnodeUpTs(pCtx));
  }

  // Queued actions are opportunistic: skip them if another handler holds the latch.
  if (atomic_load_64(&mStreamMgmt.actionQ->qRemainNum) > 0) {
    if (0 == taosWTryLockLatch(&mStreamMgmt.actionQLock)) {
      code = msmHandleStreamActions(pCtx);
      taosWUnLockLatch(&mStreamMgmt.actionQLock);
      TAOS_CHECK_EXIT(code);
    }
  }

  // For stream requests the latch is waited for rather than tried.
  if (taosArrayGetSize(pReq->pStreamReq) > 0) {
    if (mstWaitLock(&mStreamMgmt.actionQLock, false)) {
      code = msmHandleStreamRequests(pCtx);
      taosWUnLockLatch(&mStreamMgmt.actionQLock);
      TAOS_CHECK_EXIT(code);
    }
  }

  code = (atomic_load_32(&mStreamMgmt.toDeployVgTaskNum) > 0) ? msmGrpAddDeployVgTasks(pCtx)
                                                              : msmUpdateVgroupsUpTs(pCtx);
  TAOS_CHECK_EXIT(code);

  if (atomic_load_32(&mStreamMgmt.toDeploySnodeTaskNum) > 0 && GOT_SNODE(pReq->snodeId)) {
    TAOS_CHECK_EXIT(msmGrpAddDeploySnodeTasks(pCtx));
  }

  if (taosHashGetSize(pCtx->deployStm) > 0) {
    TAOS_CHECK_EXIT(msmRspAddStreamsDeploy(pCtx));
  }

  if (taosArrayGetSize(pReq->pStreamStatus) > 0) {
    TAOS_CHECK_EXIT(msmNormalHandleStatusUpdate(pCtx));
  }

  if (taosHashGetSize(pCtx->actionStm) > 0) {
    TAOS_CHECK_EXIT(msmHandleHbPostActions(pCtx));
  }

_exit:
  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
4176

4177
/*
 * Serialize a stream heartbeat response into pMsg.
 *
 * `code` is the outcome of heartbeat processing. When it is non-zero nothing is
 * encoded. Otherwise the response body is encoded right behind an
 * SStreamMsgGrpHeader that carries pRsp->streamGId.
 *
 * pMsg->code and pMsg->info are always set. pMsg->pCont / pMsg->contLen are set
 * only when the final code is TSDB_CODE_SUCCESS, and the rpc buffer then belongs
 * to pMsg. On an encode failure the buffer is freed here.
 */
void msmEncodeStreamHbRsp(int32_t code, SRpcHandleInfo *pRpcInfo, SMStreamHbRspMsg* pRsp, SRpcMsg* pMsg) {
  int32_t  lino = 0;
  int32_t  tlen = 0;
  void    *buf = NULL;
  SEncoder encoder;

  if (code != TSDB_CODE_SUCCESS) {
    goto _exit;
  }

  /* first pass: size of the encoded body */
  tEncodeSize(tEncodeStreamHbRsp, pRsp, tlen, code);
  if (code < 0) {
    mstError("encode stream hb msg rsp failed, code:%s", tstrerror(code));
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }

  buf = rpcMallocCont(tlen + sizeof(SStreamMsgGrpHeader));
  if (NULL == buf) {
    mstError("encode stream hb msg rsp failed, code:%s", tstrerror(terrno));
    TAOS_CHECK_EXIT(terrno);
  }

  SStreamMsgGrpHeader *pHeader = (SStreamMsgGrpHeader *)buf;
  pHeader->streamGid = pRsp->streamGId;

  /* second pass: encode the body directly after the group header */
  tEncoderInit(&encoder, POINTER_SHIFT(buf, sizeof(SStreamMsgGrpHeader)), tlen);
  code = tEncodeStreamHbRsp(&encoder, pRsp);
  if (code < 0) {
    rpcFreeCont(buf);
    buf = NULL;
    tEncoderClear(&encoder);
    mstError("encode stream hb msg rsp failed, code:%s", tstrerror(code));
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_INTERNAL_ERROR);
  }
  tEncoderClear(&encoder);

_exit:
  pMsg->code = code;
  pMsg->info = *pRpcInfo;
  if (code != TSDB_CODE_SUCCESS) {
    return;
  }

  pMsg->contLen = tlen + sizeof(SStreamMsgGrpHeader);
  pMsg->pCont = buf;
}
4221

4222

4223
/*
 * Entry point for a stream heartbeat sent by a dnode.
 *
 * While stream management is inactive the hb is only logged and success is
 * returned; pRspMsg is not touched. Otherwise, under the runtime read latch:
 *   - the per-group context for pHb->streamGId is bound to this hb,
 *   - the hb is dispatched according to the manager state (WATCH / NORMAL),
 *   - the response is encoded into pRspMsg,
 *   - the per-group scratch state is cleaned up.
 *
 * Returns the dispatch result. An unknown state yields TSDB_CODE_MND_STREAM_INTERNAL_ERROR.
 */
int32_t msmHandleStreamHbMsg(SMnode* pMnode, int64_t currTs, SStreamHbMsg* pHb, SRpcMsg *pReq, SRpcMsg* pRspMsg) {
  int32_t code = TSDB_CODE_SUCCESS;

  if (!atomic_load_8(&mStreamMgmt.active)) {
    mstWarn("mnode stream is NOT active, ignore stream hb from dnode %d streamGid %d", pHb->dnodeId, pHb->streamGId);
    return code;
  }

  mstWaitLock(&mStreamMgmt.runtimeLock, true);

  SMStreamHbRspMsg rsp = {0};

  /* NOTE(review): streamGId indexes grpCtx/deployStm/actionStm without a bounds check here;
   * presumably validated when the hb is decoded - confirm. */
  int32_t     tidx = streamGetThreadIdx(mStreamMgmt.threadNum, pHb->streamGId);
  SStmGrpCtx* pCtx = &mStreamMgmt.tCtx[tidx].grpCtx[pHb->streamGId];

  pCtx->tidx = tidx;
  pCtx->pMnode = pMnode;
  pCtx->currTs = currTs;
  pCtx->pReq = pHb;
  pCtx->pRsp = &rsp;
  pCtx->deployStm = mStreamMgmt.tCtx[tidx].deployStm[pHb->streamGId];
  pCtx->actionStm = mStreamMgmt.tCtx[tidx].actionStm[pHb->streamGId];

  int8_t state = atomic_load_8(&mStreamMgmt.state);
  if (MND_STM_STATE_NORMAL == state) {
    code = msmNormalHandleHbMsg(pCtx);
  } else if (MND_STM_STATE_WATCH == state) {
    code = msmWatchHandleHbMsg(pCtx);
  } else {
    mstError("Invalid stream state: %d", mStreamMgmt.state);
    code = TSDB_CODE_MND_STREAM_INTERNAL_ERROR;
  }

  msmEncodeStreamHbRsp(code, &pReq->info, &rsp, pRspMsg);

  msmCleanStreamGrpCtx(pHb);
  msmClearStreamToDeployMaps(pHb);

  taosRUnLockLatch(&mStreamMgmt.runtimeLock);

  tFreeSMStreamHbRspMsg(&rsp);

  return code;
}
4269

4270
/*
 * Called when this mnode becomes leader.
 *
 * Registers the mnode as a vnode leader for streams and rebuilds the stream
 * runtime state under the runtime write latch. Stream management is marked
 * active only if that rebuild succeeded.
 *
 * On failure msmInitRuntimeInfo() has already torn the partial state down
 * (msmDestroyRuntimeInfo). msmHandleStreamHbMsg() gates only on `active`
 * before indexing mStreamMgmt.tCtx, so setting the flag after a failed init
 * would expose the destroyed state to heartbeats.
 */
void msmHandleBecomeLeader(SMnode *pMnode) {
  streamAddVnodeLeader(MNODE_HANDLE);

  taosWLockLatch(&mStreamMgmt.runtimeLock);
  int32_t code = msmInitRuntimeInfo(pMnode);
  taosWUnLockLatch(&mStreamMgmt.runtimeLock);

  if (code) {
    /* stay inactive: the runtime state was not (fully) built */
    return;
  }

  atomic_store_8(&mStreamMgmt.active, 1);
}
4278

4279
/*
 * Called when this mnode stops being leader.
 *
 * Unregisters the mnode as a stream vnode leader. Then it atomically swaps
 * `active` from 1 to 0. The runtime state is destroyed under the write latch
 * only when the previous value was non-zero, i.e. when stream management was
 * active.
 */
void msmHandleBecomeNotLeader(SMnode *pMnode) {
  streamRemoveVnodeLeader(MNODE_HANDLE);

  if (0 == atomic_val_compare_exchange_8(&mStreamMgmt.active, 1, 0)) {
    return;  /* was not active: nothing to tear down */
  }

  taosWLockLatch(&mStreamMgmt.runtimeLock);
  msmDestroyRuntimeInfo(pMnode);
  taosWUnLockLatch(&mStreamMgmt.runtimeLock);
}
4288

4289

4290
/*
 * Queue a redeploy of a stopped stream.
 *
 * Only the caller that flips pStatus->stopped from 1 to 0 posts the
 * STREAM_ACT_DEPLOY action. If `stopped` held any other value, nothing is
 * posted and a warning with the current value is logged.
 */
static void msmRedeployStream(int64_t streamId, SStmStatus* pStatus) {
  if (1 != atomic_val_compare_exchange_8(&pStatus->stopped, 1, 0)) {
    mstsWarn("stream stopped %d already changed", atomic_load_8(&pStatus->stopped));
    return;
  }

  mstsInfo("try to reset and redeploy stream, deployTimes:%" PRId64, pStatus->deployTimes);
  mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStatus->streamName, NULL, false, STREAM_ACT_DEPLOY);
}
4298

4299
/*
 * sdbTraverse callback used by msmCheckSnodeReassign().
 *
 *   p1: SSnodeObj* - the snode being checked
 *   p2: SArray**   - receives pointers to every stream whose mainSnodeId is that
 *                    snode; allocated on the first match, sized to the stream count
 *   p3: int32_t*   - receives the error code when collecting fails
 *
 * Returns true to continue the traversal, or false to stop it after an error.
 */
static bool msmCheckStreamAssign(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  SStreamObj* pStream = pObj;
  SSnodeObj*  pSnode = p1;
  SArray**    ppRes = p2;

  if (pStream->mainSnodeId != pSnode->id) {
    return true;
  }

  if (NULL == *ppRes) {
    int32_t streamNum = sdbGetSize(pMnode->pSdb, SDB_STREAM);
    *ppRes = taosArrayInit(streamNum, POINTER_BYTES);
    TSDB_CHECK_NULL(*ppRes, code, lino, _exit, terrno);
  }

  TSDB_CHECK_NULL(taosArrayPush(*ppRes, &pStream), code, lino, _exit, terrno);
  return true;

_exit:
  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  *(int32_t*)p3 = code;
  return false;
}
4328

4329

4330
/*
 * Collect into *ppRes every stream whose main snode is pSnode. Traversal is
 * done by msmCheckStreamAssign().
 *
 * Fails with TSDB_CODE_MND_STREAM_SNODE_IN_USE when such streams exist but the
 * snode has no replica (replicaId == 0). *ppRes may have been allocated even on
 * error and is left for the caller to release.
 */
int32_t msmCheckSnodeReassign(SMnode *pMnode, SSnodeObj* pSnode, SArray** ppRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  /* errors raised inside the callback come back through `code` */
  sdbTraverse(pMnode->pSdb, SDB_STREAM, msmCheckStreamAssign, pSnode, ppRes, &code);
  TAOS_CHECK_EXIT(code);

  int32_t assignedNum = taosArrayGetSize(*ppRes);
  bool    hasNoReplica = (0 == pSnode->replicaId);
  if (assignedNum > 0 && hasNoReplica) {
    mstError("snode %d has no replica while %d streams assigned", pSnode->id, assignedNum);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_STREAM_SNODE_IN_USE);
  }

  //STREAMTODO CHECK REPLICA UPDATED OR NOT

_exit:
  if (code) {
    mstError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
  }

  return code;
}
4353

4354
/*
 * sdbTraverse callback: reconcile one sdb stream with the runtime streamMap.
 *
 * Stream not in streamMap:
 *   - dropped/stopped by the user      -> ignored;
 *   - static isolation not yet passed  -> ignored;
 *   - otherwise                        -> a STREAM_ACT_DEPLOY action is posted.
 * Stream in streamMap:
 *   - isolation not yet passed         -> ignored;
 *   - dropped/stopped by the user, or its runtime status is user-stopped
 *                                      -> removed from the runtime maps.
 *
 * Always returns true so the traversal visits every stream.
 */
static bool msmCheckLoopStreamSdb(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SStreamObj* pStream = pObj;
  int64_t     streamId = pStream->pCreate->streamId;
  SStmStatus* pStatus = taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  int8_t      userDropped = atomic_load_8(&pStream->userDropped);
  int8_t      userStopped = atomic_load_8(&pStream->userStopped);
  bool        userHalted = userDropped || userStopped;

  if (NULL == pStatus) {
    if (userHalted) {
      mstsDebug("stream userDropped %d userStopped %d and not in streamMap, ignore it", userDropped, userStopped);
      return true;
    }

    if (!MST_STM_STATIC_PASS_ISOLATION(pStream)) {
      mstsDebug("stream not pass static isolation time, updateTime:%" PRId64 ", currentTs %" PRId64 ", ignore check it", 
          pStream->updateTime, mStreamMgmt.hCtx.currentTs);
      return true;
    }

    mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStream->pCreate->name, NULL, false, STREAM_ACT_DEPLOY);
    return true;
  }

  if (!MST_STM_PASS_ISOLATION(pStream, pStatus)) {
    mstsDebug("stream not pass isolation time, updateTime:%" PRId64 ", lastActionTs:%" PRId64 ", currentTs %" PRId64 ", ignore check it", 
        pStream->updateTime, pStatus->lastActionTs, mStreamMgmt.hCtx.currentTs);
    return true;
  }

  if (userHalted || MST_IS_USER_STOPPED(atomic_load_8(&pStatus->stopped))) {
    (void)msmRemoveStreamFromMaps(pMnode, streamId);
  }

  return true;
}
4390

4391
/*
 * Walk the runtime streamMap and reconcile each entry:
 *   - user-stopped streams, and streams no longer in the sdb, are removed from the maps;
 *   - error-stopped streams are re-posted for deploy once hCtx.currentTs reaches
 *     their fatalRetryTs. Before that, the STM_EVENT_STM_TERR timestamp is refreshed.
 * Running streams are left untouched.
 */
void msmCheckLoopStreamMap(SMnode *pMnode) {
  for (void* pIter = taosHashIterate(mStreamMgmt.streamMap, NULL); NULL != pIter;
       pIter = taosHashIterate(mStreamMgmt.streamMap, pIter)) {
    SStmStatus* pStatus = (SStmStatus*)pIter;
    int64_t     streamId = *(int64_t*)taosHashGetKey(pIter, NULL);
    int8_t      stopped = atomic_load_8(&pStatus->stopped);

    if (MST_IS_USER_STOPPED(stopped)) {
      mstsDebug("stream already stopped by user, deployTimes:%" PRId64, pStatus->deployTimes);
      (void)msmRemoveStreamFromMaps(pMnode, streamId);
      continue;
    }

    if (!sdbCheckExists(pMnode->pSdb, SDB_STREAM, pStatus->streamName)) {
      mstsDebug("stream already not exists, deployTimes:%" PRId64, pStatus->deployTimes);
      (void)msmRemoveStreamFromMaps(pMnode, streamId);
      continue;
    }

    if (!MST_IS_ERROR_STOPPED(stopped)) {
      continue;
    }

    if (mStreamMgmt.hCtx.currentTs < pStatus->fatalRetryTs) {
      mstsDebug("stream already stopped by error %s, retried times:%" PRId64 ", next time not reached, currTs:%" PRId64 ", nextRetryTs:%" PRId64,
          tstrerror(pStatus->fatalError), pStatus->fatalRetryTimes, mStreamMgmt.hCtx.currentTs, pStatus->fatalRetryTs);

      MND_STREAM_SET_LAST_TS(STM_EVENT_STM_TERR, mStreamMgmt.hCtx.currentTs);
      continue;
    }

    mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStatus->streamName, NULL, false, STREAM_ACT_DEPLOY);
  }
}
4432

4433
/*
 * Stream-level part of the health check. Two independent periodic loops run,
 * each when its readiness macro allows it:
 *   - sdb loop: every sdb stream goes through msmCheckLoopStreamSdb();
 *   - map loop: every runtime streamMap entry goes through msmCheckLoopStreamMap().
 * After a loop runs, its last-run timestamp is set to hCtx.currentTs.
 */
void msmCheckStreamsStatus(SMnode *pMnode) {
  SStmCheckStatusCtx checkCtx = {0};

  mstDebug("start to check streams status, currTs:%" PRId64, mStreamMgmt.hCtx.currentTs);

  if (MST_READY_FOR_SDB_LOOP()) {
    mstDebug("ready to check sdb loop, lastLoopSdbTs:%" PRId64, mStreamMgmt.lastTs[STM_EVENT_LOOP_SDB].ts);

    sdbTraverse(pMnode->pSdb, SDB_STREAM, msmCheckLoopStreamSdb, &checkCtx, NULL, NULL);
    MND_STREAM_SET_LAST_TS(STM_EVENT_LOOP_SDB, mStreamMgmt.hCtx.currentTs);
  }

  if (MST_READY_FOR_MAP_LOOP()) {
    mstDebug("ready to check map loop, lastLoopMapTs:%" PRId64, mStreamMgmt.lastTs[STM_EVENT_LOOP_MAP].ts);

    msmCheckLoopStreamMap(pMnode);
    MND_STREAM_SET_LAST_TS(STM_EVENT_LOOP_MAP, mStreamMgmt.hCtx.currentTs);
  }
}
4450

4451
/*
 * Check the liveness of taskNum tasks (pList is an array of task pointers)
 * belonging to streamId.
 *
 * Tasks of a stopped stream, and tasks updated within the isolation window, are
 * skipped. For a stale task:
 *   - runner/trigger task: the whole stream is stopped with
 *     TSDB_CODE_MND_STREAM_TASK_LOST and the scan of this list ends;
 *   - any other task type: its serious id is bumped and a per-task
 *     STREAM_ACT_DEPLOY action is posted.
 */
void msmCheckTaskListStatus(int64_t streamId, SStmTaskStatus** pList, int32_t taskNum) {
  for (int32_t idx = 0; idx < taskNum; idx++) {
    SStmTaskStatus* pTask = pList[idx];
    SStmStatus*     pOwner = (SStmStatus*)pTask->pStream;

    if (atomic_load_8(&pOwner->stopped) || !MST_PASS_ISOLATION(pTask->lastUpTs, 1)) {
      continue;
    }

    int64_t noUpTs = mStreamMgmt.hCtx.currentTs - pTask->lastUpTs;
    bool    isRunnerOrTrigger = (STREAM_RUNNER_TASK == pTask->type || STREAM_TRIGGER_TASK == pTask->type);

    if (isRunnerOrTrigger) {
      mstsWarn("%s TASK:%" PRIx64 " status not updated for %" PRId64 "ms, will try to redeploy it", 
          gStreamTaskTypeStr[pTask->type], pTask->id.taskId, noUpTs);

      msmStopStreamByError(streamId, NULL, TSDB_CODE_MND_STREAM_TASK_LOST, mStreamMgmt.hCtx.currentTs);
      break;
    }

    mstsInfo("%s TASK:%" PRIx64 " status not updated for %" PRId64 "ms, will try to redeploy it", 
        gStreamTaskTypeStr[pTask->type], pTask->id.taskId, noUpTs);

    int64_t newSid = atomic_add_fetch_64(&pTask->id.seriousId, 1);
    mstsDebug("task %" PRIx64 " SID updated to %" PRIx64, pTask->id.taskId, newSid);

    /* pTask->id is copied after the SID bump, so the action carries the new serious id */
    SStmTaskAction task = {
        .streamId = streamId,
        .id = pTask->id,
        .flag = pTask->flags,
        .type = pTask->type,
    };

    mstPostTaskAction(mStreamMgmt.actionQ, &task, STREAM_ACT_DEPLOY);
  }
}
4487

4488
/*
 * For each stream hosted on one vgroup, run the task liveness check on its
 * trigger-reader tasks and then on its calc-reader tasks.
 */
void msmCheckVgroupStreamStatus(SHashObj* pStreams) {
  for (void* pIter = taosHashIterate(pStreams, NULL); NULL != pIter; pIter = taosHashIterate(pStreams, pIter)) {
    int64_t             streamId = *(int64_t*)taosHashGetKey(pIter, NULL);
    SStmVgStreamStatus* pVg = (SStmVgStreamStatus*)pIter;

    int32_t trigNum = taosArrayGetSize(pVg->trigReaders);
    if (trigNum > 0) {
      msmCheckTaskListStatus(streamId, taosArrayGet(pVg->trigReaders, 0), trigNum);
    }

    int32_t calcNum = taosArrayGetSize(pVg->calcReaders);
    if (calcNum > 0) {
      msmCheckTaskListStatus(streamId, taosArrayGet(pVg->calcReaders, 0), calcNum);
    }
  }
}
4513

4514
/*
 * Handle a vgroup whose heartbeat is overdue.
 *
 * While the vgroup is still inside the longer (5-period) isolation window,
 * nothing happens. After that, every stream with tasks on it is stopped with
 * TSDB_CODE_MND_STREAM_VGROUP_LOST, and the vgroup's stream task map is cleared.
 */
void msmHandleVgroupLost(SMnode *pMnode, int32_t vgId, SStmVgroupStatus* pVg) {
  if (!MST_PASS_ISOLATION(pVg->lastUpTs, 5)) {
    mstDebug("vgroup %d lost and still in watch time, lastUpTs:%" PRId64 ", streamNum:%d", vgId, pVg->lastUpTs, (int32_t)taosHashGetSize(pVg->streamTasks));
    return;
  }

  for (void* pIter = taosHashIterate(pVg->streamTasks, NULL); NULL != pIter;
       pIter = taosHashIterate(pVg->streamTasks, pIter)) {
    int64_t streamId = *(int64_t*)taosHashGetKey(pIter, NULL);
    msmStopStreamByError(streamId, NULL, TSDB_CODE_MND_STREAM_VGROUP_LOST, mStreamMgmt.hCtx.currentTs);
  }

  taosHashClear(pVg->streamTasks);
}
4538

4539

4540
/*
 * Check the vgroups that fall into the current isolation slot
 * (vgId % MND_STREAM_ISOLATION_PERIOD_NUM == hCtx.slotIdx).
 *
 * A vgroup whose last heartbeat is older than one isolation period goes to
 * msmHandleVgroupLost(). Otherwise its streams' tasks are checked.
 */
void msmCheckVgroupStatus(SMnode *pMnode) {
  for (void* pIter = taosHashIterate(mStreamMgmt.vgroupMap, NULL); NULL != pIter;
       pIter = taosHashIterate(mStreamMgmt.vgroupMap, pIter)) {
    int32_t vgId = *(int32_t*)taosHashGetKey(pIter, NULL);
    if (mStreamMgmt.hCtx.slotIdx != (vgId % MND_STREAM_ISOLATION_PERIOD_NUM)) {
      continue;
    }

    SStmVgroupStatus* pVg = (SStmVgroupStatus*)pIter;

    if (!MST_PASS_ISOLATION(pVg->lastUpTs, 1)) {
      mstDebug("vgroup %d online, try to check tasks status, currTs:%" PRId64 ", lastUpTs:%" PRId64, vgId, mStreamMgmt.hCtx.currentTs, pVg->lastUpTs);
      msmCheckVgroupStreamStatus(pVg->streamTasks);
      continue;
    }

    mstWarn("vgroup %d lost, lastUpTs:%" PRId64 ", streamNum:%d", vgId, pVg->lastUpTs, (int32_t)taosHashGetSize(pVg->streamTasks));
    msmHandleVgroupLost(pMnode, vgId, pVg);
  }
}
4568

4569
/*
 * Prepare a redeploy of a stream's runner tasks.
 *
 * Every task of every non-NULL runner deploy group has its serious id bumped,
 * and the group index is appended to deployId. *deployNum receives the number
 * of groups written.
 *
 * If any runner belongs to a stopped stream, *deployNum is set to 0 and the
 * function returns at once. SIDs already bumped before that point stay bumped.
 */
void msmHandleRunnerRedeploy(int64_t streamId, SStmSnodeStreamStatus* pStream, int32_t* deployNum, int32_t* deployId) {
  int32_t groupNum = 0;

  for (int32_t deployIdx = 0; deployIdx < MND_STREAM_RUNNER_DEPLOY_NUM; ++deployIdx) {
    if (NULL == pStream->runners[deployIdx]) {
      continue;
    }

    int32_t taskNum = taosArrayGetSize(pStream->runners[deployIdx]);
    for (int32_t t = 0; t < taskNum; ++t) {
      SStmTaskStatus* pTask = taosArrayGetP(pStream->runners[deployIdx], t);
      int8_t          stopped = atomic_load_8(&((SStmStatus*)pTask->pStream)->stopped);
      if (stopped) {
        mstsDebug("stream already stopped %d, ignore it", stopped);
        *deployNum = 0;
        return;
      }

      int64_t newSid = atomic_add_fetch_64(&pTask->id.seriousId, 1);
      mstsDebug("task %" PRIx64 " SID updated to %" PRIx64, pTask->id.taskId, newSid);
    }

    deployId[groupNum++] = deployIdx;
  }

  *deployNum = groupNum;
}
4593

4594
/*
 * Handle an snode considered lost.
 *
 * Sets runnerThreadNum to -1 and refreshes the snode map. Then, for each
 * stream with tasks on the snode:
 *   - it hosts the stream's trigger: the stream is stopped with
 *     TSDB_CODE_MND_STREAM_SNODE_LOST, unless it is already stopped;
 *   - it hosts only runners: one multi-runner STREAM_ACT_DEPLOY action is posted
 *     for the runner groups prepared by msmHandleRunnerRedeploy().
 * Finally the snode's stream task map is cleared (the map itself is kept).
 *
 * `task` is reused across streams. Only streamId, deployNum/deployId,
 * multiRunner and type are (re)written before each post.
 */
void msmHandleSnodeLost(SMnode *pMnode, SStmSnodeStatus* pSnode) {
  SStmTaskAction task = {0};
  int64_t        streamId = 0;
  void*          pIter = NULL;

  pSnode->runnerThreadNum = -1;

  (void)msmSTAddSnodesToMap(pMnode);

  while (NULL != (pIter = taosHashIterate(pSnode->streamTasks, pIter))) {
    SStmSnodeStreamStatus* pStream = (SStmSnodeStreamStatus*)pIter;

    streamId = *(int64_t*)taosHashGetKey(pIter, NULL);
    task.streamId = streamId;

    if (NULL == pStream->trigger) {
      msmHandleRunnerRedeploy(streamId, pStream, &task.deployNum, task.deployId);
      if (task.deployNum > 0) {
        task.multiRunner = true;
        task.type = STREAM_RUNNER_TASK;

        mstPostTaskAction(mStreamMgmt.actionQ, &task, STREAM_ACT_DEPLOY);

        mstsInfo("runner tasks %d redeploys added to actionQ", task.deployNum);
      }
      continue;
    }

    int8_t stopped = atomic_load_8(&((SStmStatus*)pStream->trigger->pStream)->stopped);
    if (stopped) {
      mstsDebug("stream already stopped %d, ignore it", stopped);
      continue;
    }

    mstsInfo("snode lost with trigger task %" PRIx64 ", will try to restart current stream", pStream->trigger->id.taskId);

    msmStopStreamByError(streamId, NULL, TSDB_CODE_MND_STREAM_SNODE_LOST, mStreamMgmt.hCtx.currentTs);
  }

  taosHashClear(pSnode->streamTasks);
}
4643

4644

4645
/*
 * For each stream hosted on one snode, run the task liveness check on its
 * trigger task (if any) and then on every runner deploy group.
 */
void msmCheckSnodeStreamStatus(SHashObj* pStreams) {
  for (void* pIter = taosHashIterate(pStreams, NULL); NULL != pIter; pIter = taosHashIterate(pStreams, pIter)) {
    int64_t                streamId = *(int64_t*)taosHashGetKey(pIter, NULL);
    SStmSnodeStreamStatus* pSnode = (SStmSnodeStreamStatus*)pIter;

    if (pSnode->trigger) {
      /* a single trigger is checked as a one-element task list */
      msmCheckTaskListStatus(streamId, &pSnode->trigger, 1);
    }

    for (int32_t deployIdx = 0; deployIdx < MND_STREAM_RUNNER_DEPLOY_NUM; ++deployIdx) {
      int32_t runnerNum = taosArrayGetSize(pSnode->runners[deployIdx]);
      if (runnerNum > 0) {
        msmCheckTaskListStatus(streamId, taosArrayGet(pSnode->runners[deployIdx], 0), runnerNum);
      }
    }
  }
}
4671

4672

4673
/*
 * Check the snodes that fall into the current isolation slot
 * (snodeId % MND_STREAM_ISOLATION_PERIOD_NUM == hCtx.slotIdx).
 *
 * Snodes without a stream task map are skipped. An snode whose last heartbeat
 * is older than one isolation period goes to msmHandleSnodeLost(). Otherwise
 * its streams' tasks are checked.
 */
void msmCheckSnodeStatus(SMnode *pMnode) {
  for (void* pIter = taosHashIterate(mStreamMgmt.snodeMap, NULL); NULL != pIter;
       pIter = taosHashIterate(mStreamMgmt.snodeMap, pIter)) {
    int32_t snodeId = *(int32_t*)taosHashGetKey(pIter, NULL);
    if (mStreamMgmt.hCtx.slotIdx != (snodeId % MND_STREAM_ISOLATION_PERIOD_NUM)) {
      continue;
    }

    mstDebug("start to check snode %d status, currTs:%" PRId64, snodeId, mStreamMgmt.hCtx.currentTs);

    SStmSnodeStatus* pSnode = (SStmSnodeStatus*)pIter;
    if (NULL == pSnode->streamTasks) {
      mstDebug("ignore snode %d health check since empty tasks", snodeId);
      continue;
    }

    if (!MST_PASS_ISOLATION(pSnode->lastUpTs, 1)) {
      mstDebug("snode %d online, try to check tasks status, currTs:%" PRId64 ", lastUpTs:%" PRId64, snodeId, mStreamMgmt.hCtx.currentTs, pSnode->lastUpTs);
      msmCheckSnodeStreamStatus(pSnode->streamTasks);
      continue;
    }

    mstInfo("snode %d lost, lastUpTs:%" PRId64 ", runnerThreadNum:%d, streamNum:%d", 
        snodeId, pSnode->lastUpTs, pSnode->runnerThreadNum, (int32_t)taosHashGetSize(pSnode->streamTasks));

    msmHandleSnodeLost(pMnode, pSnode);
  }
}
4708

4709

4710
/*
 * Task-level part of the health check. It checks vgroup-hosted tasks first,
 * then snode-hosted tasks, each limited to the current isolation slot.
 */
void msmCheckTasksStatus(SMnode *pMnode) {
  mstDebug("start to check tasks status, currTs:%" PRId64, mStreamMgmt.hCtx.currentTs);

  msmCheckVgroupStatus(pMnode);  /* reader tasks on vgroups */
  msmCheckSnodeStatus(pMnode);   /* trigger/runner tasks on snodes */
}
4716

4717
/*
 * Periodic reconciliation of the runtime snodeMap with SDB_SNODE. It runs only
 * when MST_READY_FOR_SNODE_LOOP() allows it.
 *
 * For every snode in snodeMap that no longer exists in the sdb:
 *   - if its stream task map is NULL, the snodeMap entry is removed;
 *   - otherwise it is handled as lost via msmHandleSnodeLost().
 *
 * The run is recorded under STM_EVENT_LOOP_SNODE, the slot this loop's
 * readiness check and log refer to. STM_EVENT_LOOP_MAP belongs to the stream
 * map loop in msmCheckStreamsStatus() and must not be stamped here.
 */
void msmCheckSnodesState(SMnode *pMnode) {
  if (!MST_READY_FOR_SNODE_LOOP()) {
    return;
  }

  mstDebug("ready to check snode loop, lastTs:%" PRId64, mStreamMgmt.lastTs[STM_EVENT_LOOP_SNODE].ts);

  void*   pIter = NULL;
  int32_t snodeId = 0;
  while (true) {
    pIter = taosHashIterate(mStreamMgmt.snodeMap, pIter);
    if (NULL == pIter) {
      break;
    }

    snodeId = *(int32_t*)taosHashGetKey(pIter, NULL);
    if (sdbCheckExists(pMnode->pSdb, SDB_SNODE, &snodeId)) {
      continue;  /* snode still defined in the sdb */
    }

    SStmSnodeStatus* pSnode = (SStmSnodeStatus*)pIter;
    if (NULL == pSnode->streamTasks) {
      mstDebug("snode %d already cleanup, try to rm it", snodeId);
      (void)taosHashRemove(mStreamMgmt.snodeMap, &snodeId, sizeof(snodeId));
      continue;
    }

    mstWarn("snode %d lost while streams remain, will redeploy all and rm it, lastUpTs:%" PRId64 ", runnerThreadNum:%d, streamNum:%d", 
        snodeId, pSnode->lastUpTs, pSnode->runnerThreadNum, (int32_t)taosHashGetSize(pSnode->streamTasks));

    /* NOTE(review): msmHandleSnodeLost() clears streamTasks but does not NULL it, so the
     * entry is not removed by this loop and this branch repeats on later runs - confirm intended. */
    msmHandleSnodeLost(pMnode, pSnode);
  }

  MND_STREAM_SET_LAST_TS(STM_EVENT_LOOP_SNODE, mStreamMgmt.hCtx.currentTs);
}
4752

4753
/*
 * Periodic stream health check.
 *
 * Skipped unless stream management is active, in NORMAL state, and at least
 * one stream exists in the sdb. Each run:
 *   - advances hCtx.slotIdx modulo MND_STREAM_ISOLATION_PERIOD_NUM;
 *   - stamps hCtx.currentTs;
 *   - under the runtime write latch, checks streams, tasks and snodes.
 */
void msmHealthCheck(SMnode *pMnode) {
  int8_t active = atomic_load_8(&mStreamMgmt.active);
  int8_t state = atomic_load_8(&mStreamMgmt.state);

  if (!active || MND_STM_STATE_NORMAL != state) {
    mstTrace("ignore health check since active:%d state:%d", active, state);
    return;
  }

  if (sdbGetSize(pMnode->pSdb, SDB_STREAM) <= 0) {
    mstTrace("ignore health check since no stream now");
    return;
  }

  mStreamMgmt.hCtx.slotIdx = (mStreamMgmt.hCtx.slotIdx + 1) % MND_STREAM_ISOLATION_PERIOD_NUM;
  mStreamMgmt.hCtx.currentTs = taosGetTimestampMs();

  mstDebug("start health check, soltIdx:%d, checkStartTs:%" PRId64, mStreamMgmt.hCtx.slotIdx, mStreamMgmt.hCtx.currentTs);

  mstWaitLock(&mStreamMgmt.runtimeLock, false);
  {
    msmCheckStreamsStatus(pMnode);
    msmCheckTasksStatus(pMnode);
    msmCheckSnodesState(pMnode);
  }
  taosWUnLockLatch(&mStreamMgmt.runtimeLock);

  mstDebug("end health check, soltIdx:%d, checkStartTs:%" PRId64, mStreamMgmt.hCtx.slotIdx, mStreamMgmt.hCtx.currentTs);
}
4780

4781
/*
 * sdbTraverse callback used at runtime init.
 *
 * For every stream that was neither dropped nor stopped by the user, it sets
 * updateTime to *(int64_t*)p1 and increments the counter *(int32_t*)p2.
 * Always returns true so every stream is visited.
 */
static bool msmUpdateProfileStreams(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
  SStreamObj *pStream = pObj;
  int64_t    *pActiveTs = (int64_t*)p1;
  int32_t    *pActiveNum = (int32_t*)p2;

  if (!atomic_load_8(&pStream->userDropped) && !atomic_load_8(&pStream->userStopped)) {
    pStream->updateTime = *pActiveTs;
    (*pActiveNum)++;
  }

  return true;
}
4793

4794
/*
 * Look up the address (task id, node id, dnode epset) of a stream's trigger task.
 *
 * Succeeds (returns 0 and fills pAddr) only when the stream is in the runtime
 * streamMap, is not stopped, and its trigger task is in STREAM_STATUS_RUNNING.
 * Every other case is logged and returns TSDB_CODE_MND_STREAM_NOT_RUNNING.
 * Runs under the runtime read latch.
 */
int32_t msmGetTriggerTaskAddr(SMnode *pMnode, int64_t streamId, SStreamTaskAddr* pAddr) {
  int32_t code = TSDB_CODE_MND_STREAM_NOT_RUNNING;

  mstWaitLock(&mStreamMgmt.runtimeLock, true);

  SStmStatus* pStatus = (SStmStatus*)taosHashGet(mStreamMgmt.streamMap, &streamId, sizeof(streamId));
  if (NULL == pStatus) {
    mstsError("stream not exists in streamMap, streamRemains:%d", taosHashGetSize(mStreamMgmt.streamMap));
  } else if (atomic_load_8(&pStatus->stopped)) {
    mstsError("stream already stopped, stopped:%d", atomic_load_8(&pStatus->stopped));
  } else if (pStatus->triggerTask && STREAM_STATUS_RUNNING == pStatus->triggerTask->status) {
    pAddr->taskId = pStatus->triggerTask->id.taskId;
    pAddr->nodeId = pStatus->triggerTask->id.nodeId;
    pAddr->epset = mndGetDnodeEpsetById(pMnode, pAddr->nodeId);
    mstsDebug("stream trigger task %" PRIx64 " got with nodeId %d", pAddr->taskId, pAddr->nodeId);
    code = 0;
  } else {
    mstsError("trigger task %p not running, status:%s", pStatus->triggerTask, pStatus->triggerTask ? gStreamStatusStr[pStatus->triggerTask->status] : "unknown");
  }

  taosRUnLockLatch(&mStreamMgmt.runtimeLock);

  return code;
}
4829

4830
int32_t msmInitRuntimeInfo(SMnode *pMnode) {
1,813✔
4831
  int32_t code = TSDB_CODE_SUCCESS;
1,813✔
4832
  int32_t lino = 0;
1,813✔
4833
  int32_t vnodeNum = sdbGetSize(pMnode->pSdb, SDB_VGROUP);
1,813✔
4834
  int32_t snodeNum = sdbGetSize(pMnode->pSdb, SDB_SNODE);
1,813✔
4835
  int32_t dnodeNum = sdbGetSize(pMnode->pSdb, SDB_DNODE);
1,813✔
4836

4837
  MND_STREAM_SET_LAST_TS(STM_EVENT_ACTIVE_BEGIN, taosGetTimestampMs());
3,605✔
4838

4839
  mStreamMgmt.stat.activeTimes++;
1,813✔
4840
  mStreamMgmt.threadNum = tsNumOfMnodeStreamMgmtThreads;
1,813✔
4841
  mStreamMgmt.tCtx = taosMemoryCalloc(mStreamMgmt.threadNum, sizeof(SStmThreadCtx));
1,813!
4842
  if (NULL == mStreamMgmt.tCtx) {
1,813!
4843
    code = terrno;
×
4844
    mstError("failed to initialize the stream runtime tCtx, threadNum:%d, error:%s", mStreamMgmt.threadNum, tstrerror(code));
×
4845
    goto _exit;
×
4846
  }
4847

4848
  mStreamMgmt.actionQ = taosMemoryCalloc(1, sizeof(SStmActionQ));
1,813!
4849
  if (mStreamMgmt.actionQ == NULL) {
1,813!
4850
    code = terrno;
×
4851
    mError("failed to initialize the stream runtime actionQ, error:%s", tstrerror(code));
×
4852
    goto _exit;
×
4853
  }
4854
  
4855
  mStreamMgmt.actionQ->head = taosMemoryCalloc(1, sizeof(SStmQNode));
1,813!
4856
  TSDB_CHECK_NULL(mStreamMgmt.actionQ->head, code, lino, _exit, terrno);
1,813!
4857
  
4858
  mStreamMgmt.actionQ->tail = mStreamMgmt.actionQ->head;
1,813✔
4859
  
4860
  for (int32_t i = 0; i < mStreamMgmt.threadNum; ++i) {
10,854✔
4861
    SStmThreadCtx* pCtx = mStreamMgmt.tCtx + i;
9,041✔
4862

4863
    for (int32_t m = 0; m < STREAM_MAX_GROUP_NUM; ++m) {
54,246✔
4864
      pCtx->deployStm[m] = taosHashInit(snodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
45,205✔
4865
      if (pCtx->deployStm[m] == NULL) {
45,205!
4866
        code = terrno;
×
4867
        mError("failed to initialize the stream runtime deployStm[%d][%d], error:%s", i, m, tstrerror(code));
×
4868
        goto _exit;
×
4869
      }
4870
      taosHashSetFreeFp(pCtx->deployStm[m], tDeepFreeSStmStreamDeploy);
45,205✔
4871
      
4872
      pCtx->actionStm[m] = taosHashInit(snodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
45,205✔
4873
      if (pCtx->actionStm[m] == NULL) {
45,205!
4874
        code = terrno;
×
4875
        mError("failed to initialize the stream runtime actionStm[%d][%d], error:%s", i, m, tstrerror(code));
×
4876
        goto _exit;
×
4877
      }
4878
      taosHashSetFreeFp(pCtx->actionStm[m], mstDestroySStmAction);
45,205✔
4879
    }
4880
  }
4881
  
4882
  mStreamMgmt.streamMap = taosHashInit(MND_STREAM_DEFAULT_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
1,813✔
4883
  if (mStreamMgmt.streamMap == NULL) {
1,813!
4884
    code = terrno;
×
4885
    mError("failed to initialize the stream runtime streamMap, error:%s", tstrerror(code));
×
4886
    goto _exit;
×
4887
  }
4888
  taosHashSetFreeFp(mStreamMgmt.streamMap, mstDestroySStmStatus);
1,813✔
4889
  
4890
  mStreamMgmt.taskMap = taosHashInit(MND_STREAM_DEFAULT_TASK_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
1,813✔
4891
  if (mStreamMgmt.taskMap == NULL) {
1,813!
4892
    code = terrno;
×
4893
    mError("failed to initialize the stream runtime taskMap, error:%s", tstrerror(code));
×
4894
    goto _exit;
×
4895
  }
4896
  
4897
  mStreamMgmt.vgroupMap = taosHashInit(vnodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
1,813✔
4898
  if (mStreamMgmt.vgroupMap == NULL) {
1,813!
4899
    code = terrno;
×
4900
    mError("failed to initialize the stream runtime vgroupMap, error:%s", tstrerror(code));
×
4901
    goto _exit;
×
4902
  }
4903
  taosHashSetFreeFp(mStreamMgmt.vgroupMap, mstDestroySStmVgroupStatus);
1,813✔
4904

4905
  mStreamMgmt.snodeMap = taosHashInit(snodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
1,813✔
4906
  if (mStreamMgmt.snodeMap == NULL) {
1,813!
4907
    code = terrno;
×
4908
    mError("failed to initialize the stream runtime snodeMap, error:%s", tstrerror(code));
×
4909
    goto _exit;
×
4910
  }
4911
  taosHashSetFreeFp(mStreamMgmt.snodeMap, mstDestroySStmSnodeStatus);
1,813✔
4912
  
4913
  mStreamMgmt.dnodeMap = taosHashInit(dnodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
1,813✔
4914
  if (mStreamMgmt.dnodeMap == NULL) {
1,813!
4915
    code = terrno;
×
4916
    mError("failed to initialize the stream runtime dnodeMap, error:%s", tstrerror(code));
×
4917
    goto _exit;
×
4918
  }
4919

4920
  mStreamMgmt.toDeployVgMap = taosHashInit(vnodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
1,813✔
4921
  if (mStreamMgmt.toDeployVgMap == NULL) {
1,813!
4922
    code = terrno;
×
4923
    mError("failed to initialize the stream runtime toDeployVgMap, error:%s", tstrerror(code));
×
4924
    goto _exit;
×
4925
  }
4926
  taosHashSetFreeFp(mStreamMgmt.toDeployVgMap, mstDestroySStmVgTasksToDeploy);
1,813✔
4927
  
4928
  mStreamMgmt.toDeploySnodeMap = taosHashInit(snodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
1,813✔
4929
  if (mStreamMgmt.toDeploySnodeMap == NULL) {
1,813!
4930
    code = terrno;
×
4931
    mError("failed to initialize the stream runtime toDeploySnodeMap, error:%s", tstrerror(code));
×
4932
    goto _exit;
×
4933
  }
4934
  taosHashSetFreeFp(mStreamMgmt.toDeploySnodeMap, mstDestroySStmSnodeTasksDeploy);
1,813✔
4935

4936
  mStreamMgmt.toUpdateScanMap = taosHashInit(snodeNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
1,813✔
4937
  if (mStreamMgmt.toUpdateScanMap == NULL) {
1,813!
4938
    code = terrno;
×
4939
    mError("failed to initialize the stream runtime toUpdateScanMap, error:%s", tstrerror(code));
×
4940
    goto _exit;
×
4941
  }
4942
  taosHashSetFreeFp(mStreamMgmt.toUpdateScanMap, mstDestroyScanAddrList);
1,813✔
4943

4944
  TAOS_CHECK_EXIT(msmSTAddSnodesToMap(pMnode));
1,813!
4945
  TAOS_CHECK_EXIT(msmSTAddDnodesToMap(pMnode));
1,813!
4946

4947
  mStreamMgmt.lastTaskId = 1;
1,813✔
4948

4949
  int32_t activeStreamNum = 0;
1,813✔
4950
  sdbTraverse(pMnode->pSdb, SDB_STREAM, msmUpdateProfileStreams, &MND_STREAM_GET_LAST_TS(STM_EVENT_ACTIVE_BEGIN), &activeStreamNum, NULL);
1,813✔
4951

4952
  if (activeStreamNum > 0) {
1,813!
4953
    msmSetInitRuntimeState(MND_STM_STATE_WATCH);
×
4954
  } else {
4955
    msmSetInitRuntimeState(MND_STM_STATE_NORMAL);
1,813✔
4956
  }
4957

4958
_exit:
1,813✔
4959

4960
  if (code) {
1,813!
4961
    msmDestroyRuntimeInfo(pMnode);
×
4962
  }
4963

4964
  return code;
1,813✔
4965
}
4966

4967

4968

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc