• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4488

12 Jul 2025 07:47AM UTC coverage: 62.207% (-0.7%) from 62.948%
#4488

push

travis-ci

web-flow
docs: update stream docs (#31822)

157961 of 324087 branches covered (48.74%)

Branch coverage included in aggregate %.

244465 of 322830 relevant lines covered (75.73%)

6561668.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

42.27
/source/dnode/mnode/impl/src/mndScheduler.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndScheduler.h"
17
#include "mndDb.h"
18
#include "mndMnode.h"
19
#include "mndSnode.h"
20
#include "mndVgroup.h"
21
#include "parser.h"
22
#include "tcompare.h"
23
#include "tmisce.h"
24
#include "tname.h"
25
#include "tuuid.h"
26

27
#define SINK_NODE_LEVEL (0)
28
extern bool tsDeployOnSnode;
29

30
// Depth-first search of a physical plan tree: returns true as soon as pNode
// itself, or any node reachable through pChildren, is a stream count-window
// node (QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT); false when none is found.
static bool hasCountWindowNode(SPhysiNode* pNode) {
  if (nodeType(pNode) == QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT) {
    return true;
  }

  size_t  numOfChildren = LIST_LENGTH(pNode->pChildren);
  int32_t idx = 0;
  while (idx < numOfChildren) {
    SPhysiNode* pSub = (SPhysiNode*)nodesListGetNode(pNode->pChildren, idx);
    if (hasCountWindowNode(pSub)) {
      return true;
    }
    ++idx;
  }

  return false;
}
46

47
// True when the physical plan rooted at pPlan->pNode contains a count-window node.
static bool isCountWindowStreamTask(SSubplan* pPlan) {
  SPhysiNode* pRoot = (SPhysiNode*)pPlan->pNode;
  return hasCountWindowNode(pRoot);
}
50

51
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
10✔
52
                           int64_t watermark, int64_t deleteMark) {
53
  int32_t     code = 0;
10✔
54
  SNode*      pAst = NULL;
10✔
55
  SQueryPlan* pPlan = NULL;
10✔
56

57
  if (nodesStringToNode(ast, &pAst) < 0) {
10!
58
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
59
    goto END;
×
60
  }
61

62
  if (qSetSTableIdForRsma(pAst, uid) < 0) {
10!
63
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
64
    goto END;
×
65
  }
66

67
  SPlanContext cxt = {
10✔
68
      .pAstRoot = pAst,
69
      .topicQuery = false,
70
      .streamQuery = true,
71
      .rSmaQuery = true,
72
      .triggerType = triggerType,
73
      .watermark = watermark,
74
      .deleteMark = deleteMark,
75
  };
76

77
  if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
10!
78
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
79
    goto END;
×
80
  }
81

82
  int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
10!
83
  if (levelNum != 1) {
10!
84
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
85
    goto END;
×
86
  }
87
  SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
10✔
88

89
  int32_t opNum = LIST_LENGTH(inner->pNodeList);
10!
90
  if (opNum != 1) {
10!
91
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
92
    goto END;
×
93
  }
94

95
  SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
10✔
96
  if (qSubPlanToString(plan, pDst, pDstLen) < 0) {
10!
97
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
98
    goto END;
×
99
  }
100

101
END:
10✔
102
  if (pAst) nodesDestroyNode(pAst);
10!
103
  if (pPlan) nodesDestroyNode((SNode*)pPlan);
10!
104
  TAOS_RETURN(code);
10✔
105
}
106
#ifdef USE_STREAM
107
// Fill in the output descriptor of a sink task.
//
// When the stream has an smaId and subTableWithoutMd5 is not 1, the task writes
// to that SMA. Otherwise it writes to the target super table: uid, full name and
// a clone of the stream's output schema are stored in the table sink.
//
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the schema clone fails.
int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) {
  STaskOutputInfo* pOutput = &pTask->outputInfo;

  mDebug("mndSetSinkTaskInfo to sma or table, taskId:%s", pTask->id.idStr);

  bool sinkToSma = (pStream->smaId != 0) && (pStream->subTableWithoutMd5 != 1);
  if (sinkToSma) {
    pOutput->type = TASK_OUTPUT__SMA;
    pOutput->smaSink.smaId = pStream->smaId;
    return 0;
  }

  pOutput->type = TASK_OUTPUT__TABLE;
  pOutput->tbSink.stbUid = pStream->targetStbUid;
  (void)memcpy(pOutput->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);

  pOutput->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
  if (pOutput->tbSink.pSchemaWrapper == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return 0;
}
127

128
int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SArray* pSinkNodeList,
2,689✔
129
                                        SStreamTask* pTask) {
130
  int32_t code = 0;
2,689✔
131
  bool isShuffle = false;
2,689✔
132

133
  if (pStream->fixedSinkVgId == 0) {
2,689!
134
    SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
2,689✔
135
    if (pDb != NULL && pDb->cfg.numOfVgroups > 1) {
2,689!
136
      isShuffle = true;
2,633✔
137
      pTask->outputInfo.type = TASK_OUTPUT__SHUFFLE_DISPATCH;
2,633✔
138
      pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
2,633✔
139
      TAOS_CHECK_RETURN(mndExtractDbInfo(pMnode, pDb, &pTask->outputInfo.shuffleDispatcher.dbInfo, NULL));
2,633!
140
    }
141

142
    sdbRelease(pMnode->pSdb, pDb);
2,689✔
143
  }
144

145
  int32_t numOfSinkNodes = taosArrayGetSize(pSinkNodeList);
2,689✔
146

147
  if (isShuffle) {
2,689✔
148
    (void)memcpy(pTask->outputInfo.shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
2,633✔
149
    SArray* pVgs = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
2,633✔
150

151
    int32_t numOfVgroups = taosArrayGetSize(pVgs);
2,633✔
152
    for (int32_t i = 0; i < numOfVgroups; i++) {
10,496✔
153
      SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
7,863✔
154

155
      for (int32_t j = 0; j < numOfSinkNodes; j++) {
17,563!
156
        SStreamTask* pSinkTask = taosArrayGetP(pSinkNodeList, j);
17,563✔
157
        if (pSinkTask->info.nodeId == pVgInfo->vgId) {
17,563✔
158
          pVgInfo->taskId = pSinkTask->id.taskId;
7,863✔
159
          break;
7,863✔
160
        }
161
      }
162
    }
163
  } else {
164
    SStreamTask* pOneSinkTask = taosArrayGetP(pSinkNodeList, 0);
56✔
165
    streamTaskSetFixedDownstreamInfo(pTask, pOneSinkTask);
56✔
166
  }
167

168
  TAOS_RETURN(code);
2,689✔
169
}
170

171
// Bind a stream task and its subplan to a vgroup: both receive the vgroup id and
// the vgroup's endpoint set, then the subplan is serialized into pTask->exec.qmsg.
// Returns whatever qSubPlanToString returns.
int32_t mndAssignStreamTaskToVgroup(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
  int32_t planLen;  // length output of the serializer; not used afterwards

  pTask->info.epSet = mndGetVgroupEpset(pMnode, pVgroup);
  pTask->info.nodeId = pVgroup->vgId;

  // the execution node of the plan mirrors the task placement
  plan->execNode.epSet = pTask->info.epSet;
  plan->execNode.nodeId = pTask->info.nodeId;

  return qSubPlanToString(plan, &pTask->exec.qmsg, &planLen);
}
181

182
// Fetch the first snode object the sdb iterator yields and immediately cancel
// the iteration. Returns the fetched object, or NULL if the fetch produced none.
SSnodeObj* mndSchedFetchOneSnode(SMnode* pMnode) {
  SSnodeObj* pSnode = NULL;

  // TODO random fetch
  void* pCursor = sdbFetch(pMnode->pSdb, SDB_SNODE, NULL, (void**)&pSnode);
  sdbCancelFetch(pMnode->pSdb, pCursor);

  return pSnode;
}
190

191
// Bind a stream task and its subplan to the snode: node id becomes SNODE_HANDLE,
// the endpoint set is taken from pSnode, and the subplan is serialized into
// pTask->exec.qmsg. Returns whatever qSubPlanToString returns.
int32_t mndAssignStreamTaskToSnode(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SSnodeObj* pSnode) {
  int32_t planLen;  // length output of the serializer; not used afterwards

  pTask->info.epSet = mndAcquireEpFromSnode(pMnode, pSnode);
  pTask->info.nodeId = SNODE_HANDLE;

  plan->execNode.epSet = pTask->info.epSet;
  plan->execNode.nodeId = SNODE_HANDLE;

  mDebug("s-task:0x%x set the agg task to snode:%d", pTask->id.taskId, SNODE_HANDLE);

  return qSubPlanToString(plan, &pTask->exec.qmsg, &planLen);
}
203

204
// random choose a node to do compute
205
SVgObj* mndSchedFetchOneVg(SMnode* pMnode, SStreamObj* pStream) {
144✔
206
  SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->sourceDb);
144✔
207
  if (pDbObj == NULL) {
144!
208
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
209
    return NULL;
×
210
  }
211

212
  if (pStream->indexForMultiAggBalance == -1) {
144✔
213
    taosSeedRand(taosSafeRand());
132✔
214
    pStream->indexForMultiAggBalance = taosRand() % pDbObj->cfg.numOfVgroups;
132✔
215
  }
216

217
  int32_t index = 0;
144✔
218
  void*   pIter = NULL;
144✔
219
  SVgObj* pVgroup = NULL;
144✔
220
  while (1) {
221
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
560✔
222
    if (pIter == NULL) break;
560!
223
    if (pVgroup->mountVgId) {
560!
224
      sdbRelease(pMnode->pSdb, pVgroup);
×
225
      continue;
×
226
    }
227
    if (pVgroup->dbUid != pStream->sourceDbUid) {
560✔
228
      sdbRelease(pMnode->pSdb, pVgroup);
257✔
229
      continue;
257✔
230
    }
231
    if (index++ == pStream->indexForMultiAggBalance) {
303✔
232
      pStream->indexForMultiAggBalance++;
144✔
233
      pStream->indexForMultiAggBalance %= pDbObj->cfg.numOfVgroups;
144✔
234
      sdbCancelFetch(pMnode->pSdb, pIter);
144✔
235
      break;
144✔
236
    }
237
    sdbRelease(pMnode->pSdb, pVgroup);
159✔
238
  }
239
  sdbRelease(pMnode->pSdb, pDbObj);
144✔
240

241
  return pVgroup;
144✔
242
}
243

244
// Select the uid and the most recently added task list that match a task type.
//   STREAM_NORMAL_TASK                         -> pStream->uid,      last entry of pTaskList
//   STREAM_HISTORY_TASK / STREAM_RECALCUL_TASK -> pStream->hTaskUid, last entry of pHTaskList
// For any other type the two outputs are left untouched.
static void streamGetUidTaskList(SStreamObj* pStream, EStreamTaskType type, uint64_t* pUid, SArray*** pTaskList) {
  if (type == STREAM_NORMAL_TASK) {
    *pUid = pStream->uid;
    *pTaskList = taosArrayGetLast(pStream->pTaskList);
    return;
  }

  bool usesHistoryList = (type == STREAM_HISTORY_TASK) || (type == STREAM_RECALCUL_TASK);
  if (!usesHistoryList) {
    return;
  }

  *pUid = pStream->hTaskUid;
  *pTaskList = taosArrayGetLast(pStream->pHTaskList);
}
6,451✔
253

254
// Create one sink-level task of the given type, place it on pVgroup and fill in
// its sink output info. The task is created against the uid/task list chosen by
// streamGetUidTaskList for `type`.
// Returns 0 on success, otherwise the error from tNewStreamTask or mndSetSinkTaskInfo.
static int32_t doAddSinkTask(SStreamObj* pStream, SMnode* pMnode, SVgObj* pVgroup, SEpSet* pEpset, EStreamTaskType type) {
  uint64_t     streamUid = 0;
  SArray**     ppTasks = NULL;
  SStreamTask* pSink = NULL;

  streamGetUidTaskList(pStream, type, &streamUid, &ppTasks);

  int32_t code = tNewStreamTask(streamUid, TASK_LEVEL__SINK, pEpset, type, pStream->conf.trigger, 0, *ppTasks,
                                pStream->conf.fillHistory, pStream->subTableWithoutMd5, 1, &pSink);
  if (code != 0) {
    return code;
  }

  mDebug("doAddSinkTask taskId:%s, %p vgId:%d, isFillHistory:%d", pSink->id.idStr, pSink, pVgroup->vgId, (type == STREAM_HISTORY_TASK));

  pSink->info.epSet = mndGetVgroupEpset(pMnode, pVgroup);
  pSink->info.nodeId = pVgroup->vgId;

  return mndSetSinkTaskInfo(pStream, pSink);
}
272

273
// A stream needs a companion history/recalculate task when fill-history is
// enabled or when its trigger is STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE.
bool needHistoryTask(SStreamObj* pStream) {
  if (pStream->conf.fillHistory) {
    return true;
  }
  return pStream->conf.trigger == STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE;
}
276

277
// Add the sink task(s) for one vgroup: always a normal sink task, plus - when the
// stream needs a history task - a second one typed STREAM_RECALCUL_TASK for the
// continuous-window-close trigger, or STREAM_HISTORY_TASK otherwise.
// Returns the first non-zero error from doAddSinkTask, else TDB_CODE_SUCCESS.
static int32_t doAddSinkTaskToVg(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset, SVgObj* vgObj) {
  int32_t code = doAddSinkTask(pStream, pMnode, vgObj, pEpset, STREAM_NORMAL_TASK);
  if (code != 0) {
    return code;
  }

  if (!needHistoryTask(pStream)) {
    return TDB_CODE_SUCCESS;
  }

  EStreamTaskType companionType = STREAM_HISTORY_TASK;
  if (pStream->conf.trigger == STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE) {
    companionType = STREAM_RECALCUL_TASK;
  }

  code = doAddSinkTask(pStream, pMnode, vgObj, pEpset, companionType);
  if (code != 0) {
    return code;
  }

  return TDB_CODE_SUCCESS;
}
293

294
// create sink node for each vgroup.
295
static int32_t doAddShuffleSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset) {
804✔
296
  SSdb* pSdb = pMnode->pSdb;
804✔
297
  void* pIter = NULL;
804✔
298

299
  while (1) {
3,132✔
300
    SVgObj* pVgroup = NULL;
3,936✔
301
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
3,936✔
302
    if (pIter == NULL) {
3,936✔
303
      break;
804✔
304
    }
305
    if (pVgroup->mountVgId) {
3,132!
306
      sdbRelease(pSdb, pVgroup);
×
307
      continue;
1,054✔
308
    }
309

310
    if (!mndVgroupInDb(pVgroup, pStream->targetDbUid)) {
3,132✔
311
      sdbRelease(pSdb, pVgroup);
1,054✔
312
      continue;
1,054✔
313
    }
314

315
    int32_t code = doAddSinkTaskToVg(pMnode, pStream, pEpset, pVgroup);
2,078✔
316
    if (code != 0) {
2,078!
317
      sdbRelease(pSdb, pVgroup);
×
318
      return code;
×
319
    }
320

321
    sdbRelease(pSdb, pVgroup);
2,078✔
322
  }
323

324
  return TDB_CODE_SUCCESS;
804✔
325
}
326

327
static int64_t getVgroupLastVer(const SArray* pList, int32_t vgId) {
3,308✔
328
  int32_t size = (int32_t) taosArrayGetSize(pList);
3,308✔
329
  for (int32_t i = 0; i < size; ++i) {
5,026✔
330
    SVgroupVer* pVer = taosArrayGet(pList, i);
3,026✔
331
    if (pVer->vgId == vgId) {
3,026✔
332
      return pVer->ver;
1,308✔
333
    }
334
  }
335

336
  mDebug("no data in vgId:%d for extract last version, set to be 0, total existed vgs:%d", vgId, size);
2,000!
337
  return 1;
2,000✔
338
}
339

340
// Initialise the time window and WAL version range a source task has to cover.
//
// latest = version recorded for vgId in pVerList (negative values clamp to 0).
//   history task (info.fillHistory == STREAM_HISTORY_TASK):
//       window  = [INT64_MIN, skey - 1],  versions = [0, latest]
//   any other task:
//       window  = [skey, INT64_MAX],      versions = [latest + 1, INT64_MAX]
static void streamTaskSetDataRange(SStreamTask* pTask, int64_t skey, SArray* pVerList, int32_t vgId) {
  int64_t lastVer = getVgroupLastVer(pVerList, vgId);
  if (lastVer < 0) {
    lastVer = 0;
  }

  // set the correct ts, which is the last key of queried table.
  SDataRange*  pDataRange = &pTask->dataRange;
  STimeWindow* pWin = &pDataRange->window;
  bool         isHistory = (pTask->info.fillHistory == STREAM_HISTORY_TASK);

  pWin->skey = isHistory ? INT64_MIN : skey;
  pWin->ekey = isHistory ? (skey - 1) : INT64_MAX;

  pDataRange->range.minVer = isHistory ? 0 : (lastVer + 1);
  pDataRange->range.maxVer = isHistory ? lastVer : INT64_MAX;

  if (isHistory) {
    mDebug("add fill-history source task 0x%x timeWindow:%" PRId64 "-%" PRId64 " verRange:%" PRId64 "-%" PRId64,
           pTask->id.taskId, pWin->skey, pWin->ekey, pDataRange->range.minVer, pDataRange->range.maxVer);
  } else {
    mDebug("add source task 0x%x timeWindow:%" PRId64 "-%" PRId64 " verRange:%" PRId64 "-%" PRId64, pTask->id.taskId,
           pWin->skey, pWin->ekey, pDataRange->range.minVer, pDataRange->range.maxVer);
  }
}
3,308✔
369

370
// Put a freshly built task into TASK_STATUS__HALT when its plan contains a
// count-window node and the task is not the fill-history task; otherwise the
// task status is left as it is.
static void haltInitialTaskStatus(SStreamTask* pTask, SSubplan* pPlan, bool isFillhistoryTask) {
  bool planHasCountWindow = isCountWindowStreamTask(pPlan);
  if (isFillhistoryTask || !planHasCountWindow) {
    return;
  }

  SStreamStatus* pTaskStatus = &pTask->status;
  mDebug("s-task:0x%x status set %s from %s for count window agg task with fill-history option set",
         pTask->id.taskId, streamTaskGetStatusStr(TASK_STATUS__HALT), streamTaskGetStatusStr(pTaskStatus->taskStatus));
  pTaskStatus->taskStatus = TASK_STATUS__HALT;
}
1,784✔
380

381
// Create a source-level stream task.
//
// When pSourceTaskList is supplied the task is created with the stream's uid and
// registered in that list; otherwise uid and list are chosen by task type via
// streamGetUidTaskList. A STREAM_RECALCUL_TASK always uses the
// STREAM_TRIGGER_WINDOW_CLOSE trigger, every other type uses the stream's
// configured trigger. The trigger parameter is forwarded only if useTriggerParam.
//
// The new task is returned through pTask; the return value is tNewStreamTask's.
static int32_t buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, EStreamTaskType type, bool useTriggerParam,
                               int8_t hasAggTasks, SStreamTask** pTask, SArray* pSourceTaskList) {
  uint64_t streamUid = 0;
  SArray** ppTasks = NULL;

  if (pSourceTaskList == NULL) {
    streamGetUidTaskList(pStream, type, &streamUid, &ppTasks);
  } else {
    streamUid = pStream->uid;
    ppTasks = &pSourceTaskList;
  }

  int32_t trigger = (type == STREAM_RECALCUL_TASK) ? STREAM_TRIGGER_WINDOW_CLOSE : pStream->conf.trigger;
  int32_t triggerParam = useTriggerParam ? pStream->conf.triggerParam : 0;

  return tNewStreamTask(streamUid, TASK_LEVEL__SOURCE, pEpset, type, trigger, triggerParam, *ppTasks,
                        pStream->conf.fillHistory, pStream->subTableWithoutMd5, hasAggTasks, pTask);
}
407

408
// Append a new, empty task list (array of task pointers) to pStream->pTaskList
// and, when the stream needs a history task, another one to pStream->pHTaskList.
//
// Returns TSDB_CODE_SUCCESS, or the terrno left by the failing array call. A list
// that could not be appended is destroyed instead of being leaked.
static int32_t addNewTaskList(SStreamObj* pStream) {
  SArray* pTaskList = taosArrayInit(0, POINTER_BYTES);
  if (pTaskList == NULL) {
    mError("failed init task list, code:%s", tstrerror(terrno));
    return terrno;
  }

  if (taosArrayPush(pStream->pTaskList, &pTaskList) == NULL) {
    int32_t code = terrno;  // capture before any further call can touch terrno
    mError("failed to put into array, code:%s", tstrerror(code));
    taosArrayDestroy(pTaskList);
    return code;
  }

  if (needHistoryTask(pStream)) {
    pTaskList = taosArrayInit(0, POINTER_BYTES);
    if (pTaskList == NULL) {
      mError("failed init history task list, code:%s", tstrerror(terrno));
      return terrno;
    }

    if (taosArrayPush(pStream->pHTaskList, &pTaskList) == NULL) {
      int32_t code = terrno;
      mError("failed to put into array, code:%s", tstrerror(code));
      taosArrayDestroy(pTaskList);
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
435

436
// set the history task id
437
static void setHTasksId(SStreamObj* pStream) {
709✔
438
  SArray* pTaskList = *(SArray**)taosArrayGetLast(pStream->pTaskList);
709✔
439
  SArray* pHTaskList = *(SArray**)taosArrayGetLast(pStream->pHTaskList);
709✔
440

441
  for (int32_t i = 0; i < taosArrayGetSize(pTaskList); ++i) {
2,521✔
442
    SStreamTask** pStreamTask = taosArrayGet(pTaskList, i);
1,812✔
443
    SStreamTask** pHTask = taosArrayGet(pHTaskList, i);
1,812✔
444

445
    (*pStreamTask)->hTaskInfo.id.taskId = (*pHTask)->id.taskId;
1,812✔
446
    (*pStreamTask)->hTaskInfo.id.streamId = (*pHTask)->id.streamId;
1,812✔
447

448
    (*pHTask)->streamTaskId.taskId = (*pStreamTask)->id.taskId;
1,812✔
449
    (*pHTask)->streamTaskId.streamId = (*pStreamTask)->id.streamId;
1,812✔
450

451
    mDebug("s-task:0x%" PRIx64 "-0x%x related history task:0x%" PRIx64 "-0x%x, level:%d", (*pStreamTask)->id.streamId,
1,812!
452
           (*pStreamTask)->id.taskId, (*pHTask)->id.streamId, (*pHTask)->id.taskId, (*pHTask)->info.taskLevel);
453
  }
454
}
709✔
455

456
// Configure a source task to dispatch by virtual table (TASK_OUTPUT__VTABLE_MAP).
//
// pVgTasks maps a vgroup id to the merge task (SStreamTask*) on that vgroup.
// Each key of pVtables is read as an int32 vgroup id immediately followed by a
// uint64 virtual-table uid. For every such key the merge task of the vgroup is
// looked up, appended once to the dispatcher's taskInfos array, and the uid is
// mapped to that array index in the dispatcher's vtableMap. The source task is
// also registered as an upstream of the merge task.
//
// Returns 0 on success or the first error encountered. The temporary
// taskId -> index map is released on every exit path.
static int32_t addSourceTaskVTableOutput(SStreamTask* pTask, SSHashObj* pVgTasks, SSHashObj* pVtables) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t taskNum = tSimpleHashGetSize(pVgTasks);
  int32_t tbNum = tSimpleHashGetSize(pVtables);
  int32_t iter = 0;
  void*   p = NULL;

  STaskDispatcherVtableMap* pDispatcher = &pTask->outputInfo.vtableMapDispatcher;

  // merge taskId -> index into pDispatcher->taskInfos; only needed while building
  SSHashObj* pTaskMap = tSimpleHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  TSDB_CHECK_NULL(pTaskMap, code, lino, _end, terrno);

  pTask->outputInfo.type = TASK_OUTPUT__VTABLE_MAP;
  pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
  pDispatcher->taskInfos = taosArrayInit(taskNum, sizeof(STaskDispatcherFixed));
  TSDB_CHECK_NULL(pDispatcher->taskInfos, code, lino, _end, terrno);
  pDispatcher->vtableMap = tSimpleHashInit(tbNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
  TSDB_CHECK_NULL(pDispatcher->vtableMap, code, lino, _end, terrno);

  while (NULL != (p = tSimpleHashIterate(pVtables, p, &iter))) {
    int32_t     vgId = 0;
    uint64_t    uid = 0;
    const char* pKey = tSimpleHashGetKey(p, NULL);

    // the uid sits 4 bytes into the key, so copy it out instead of dereferencing
    // a possibly misaligned uint64_t pointer
    (void)memcpy(&vgId, pKey, sizeof(vgId));
    (void)memcpy(&uid, pKey + sizeof(vgId), sizeof(uid));

    void* px = tSimpleHashGet(pVgTasks, &vgId, sizeof(vgId));
    if (NULL == px) {
      // NOTE(review): logged as an error but `code` is still 0 here, so the
      // caller sees success (unchanged from the previous behaviour) - confirm.
      mError("tSimpleHashGet vgId %d not found", vgId);
      goto _end;
    }
    SStreamTask* pMergeTask = *(SStreamTask**)px;
    if (pMergeTask == NULL) {
      mError("tSimpleHashGet pMergeTask %d not found", vgId);
      goto _end;
    }

    int32_t idx = 0;
    px = tSimpleHashGet(pTaskMap, &pMergeTask->id.taskId, sizeof(pMergeTask->id.taskId));
    if (px == NULL) {
      // first time this merge task is seen: append its address and remember the slot
      STaskDispatcherFixed addr = {
          .taskId = pMergeTask->id.taskId, .nodeId = pMergeTask->info.nodeId, .epSet = pMergeTask->info.epSet};
      px = taosArrayPush(pDispatcher->taskInfos, &addr);
      TSDB_CHECK_NULL(px, code, lino, _end, terrno);
      idx = taosArrayGetSize(pDispatcher->taskInfos) - 1;
      code = tSimpleHashPut(pTaskMap, &pMergeTask->id.taskId, sizeof(pMergeTask->id.taskId), &idx, sizeof(idx));
      if (code) {
        mError("tSimpleHashPut uid to task idx failed, error:%d", code);
        goto _end;
      }
    } else {
      idx = *(int32_t*)px;
    }

    code = tSimpleHashPut(pDispatcher->vtableMap, &uid, sizeof(int64_t), &idx, sizeof(int32_t));
    if (code) {
      mError("tSimpleHashPut uid to STaskDispatcherFixed failed, error:%d", code);
      goto _end;
    }

    code = streamTaskSetUpstreamInfo(pMergeTask, pTask);
    if (code != TSDB_CODE_SUCCESS) {
      mError("failed to set upstream info of merge task, error:%d", code);
      goto _end;
    }

    mDebug("source task[%s,vg:%d] add vtable output map, vuid %" PRIu64 " => [%d, vg:%d]", pTask->id.idStr,
           pTask->info.nodeId, uid, pMergeTask->id.taskId, pMergeTask->info.nodeId);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    mError("source task[%s,vg:%d] add vtable output map failed, lino:%d, error:%s", pTask->id.idStr, pTask->info.nodeId,
           lino, tstrerror(code));
  }
  if (pTaskMap != NULL) {
    tSimpleHashCleanup(pTaskMap);
  }
  return code;
}
535

536
// Build one source task for pVgroup and finish its setup:
//   1. create the task (buildSourceTask),
//   2. for streams that need a history task, possibly halt it (count-window plans),
//   3. set its time window / version range,
//   4. bind task and plan to the vgroup and serialize the plan,
//   5. when pVgTasks is given, configure the virtual-table output map.
// Returns the first non-success code of these steps, otherwise the result of
// the last step executed.
static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, int64_t skey,
                               SArray* pVerList, SVgObj* pVgroup, EStreamTaskType type, bool useTriggerParam,
                               int8_t hasAggTasks, SSHashObj* pVgTasks, SArray* pSourceTaskList) {
  SStreamTask* pNewTask = NULL;
  bool         isHistory = (type == STREAM_HISTORY_TASK);

  int32_t code = buildSourceTask(pStream, pEpset, type, useTriggerParam, hasAggTasks, &pNewTask, pSourceTaskList);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  mDebug("doAddSourceTask taskId:%s, %p vgId:%d, historyTask:%d", pNewTask->id.idStr, pNewTask, pVgroup->vgId, isHistory);

  if (needHistoryTask(pStream)) {
    haltInitialTaskStatus(pNewTask, plan, isHistory);
  }

  streamTaskSetDataRange(pNewTask, skey, pVerList, pVgroup->vgId);

  code = mndAssignStreamTaskToVgroup(pMnode, pNewTask, plan, pVgroup);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  mTrace("souce task plan:%s", pNewTask->exec.qmsg);

  if (pVgTasks == NULL) {
    return code;
  }
  return addSourceTaskVTableOutput(pNewTask, pVgTasks, plan->pVTables);
}
566

567
static SSubplan* getScanSubPlan(const SQueryPlan* pPlan) {
1,054✔
568
  int32_t        numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
1,054!
569
  SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, numOfPlanLevel - 1);
1,054✔
570
  if (LIST_LENGTH(inner->pNodeList) != 1) {
1,054!
571
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
572
    return NULL;
×
573
  }
574

575
  SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
1,054✔
576
  if (plan->subplanType != SUBPLAN_TYPE_SCAN) {
1,054!
577
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
578
    return NULL;
×
579
  }
580
  return plan;
1,054✔
581
}
582

583
// Create a merge-level task on pVgroup for a set of virtual tables.
//
// The task is added to the stream's latest task list. For each entry of
// pVtables (array of SVCTableRefCols*), its uid and numOfSrcTbls are recorded in
// pTask->pVTables. Finally task and plan are bound to the vgroup and the plan is
// serialized.
//
// Returns TDB_CODE_SUCCESS or the first error from task creation, array
// allocation/push, or mndAssignStreamTaskToVgroup.
static int32_t doAddMergeTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, SVgObj* pVgroup,
                              bool isHistoryTask, bool useTriggerParam, int8_t hasAggTasks, SArray* pVtables) {
  SStreamTask* pTask = NULL;
  SArray**     pTaskList = taosArrayGetLast(pStream->pTaskList);

  int32_t code = tNewStreamTask(pStream->uid, TASK_LEVEL__MERGE, pEpset, isHistoryTask, pStream->conf.trigger,
                                useTriggerParam ? pStream->conf.triggerParam : 0, *pTaskList, pStream->conf.fillHistory,
                                pStream->subTableWithoutMd5, hasAggTasks, &pTask);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int32_t vtbNum = taosArrayGetSize(pVtables);
  pTask->pVTables = taosArrayInit(vtbNum, sizeof(SVCTableMergeInfo));
  if (NULL == pTask->pVTables) {
    code = terrno;
    mError("taosArrayInit %d SVCTableMergeInfo failed, error:%d", vtbNum, code);
    return code;
  }

  // zero the whole record so members not assigned below (and padding) are
  // never copied into the array uninitialized
  SVCTableMergeInfo tbInfo = {0};
  for (int32_t i = 0; i < vtbNum; ++i) {
    SVCTableRefCols** pTb = taosArrayGet(pVtables, i);
    tbInfo.uid = (*pTb)->uid;
    tbInfo.numOfSrcTbls = (*pTb)->numOfSrcTbls;
    if (NULL == taosArrayPush(pTask->pVTables, &tbInfo)) {
      code = terrno;
      mError("taosArrayPush SVCTableMergeInfo failed, error:%d", code);
      return code;
    }

    mDebug("merge task[%s, vg:%d] add vtable info: vuid %" PRIu64 ", numOfSrcTbls:%d",
           pTask->id.idStr, pVgroup->vgId, tbInfo.uid, tbInfo.numOfSrcTbls);
  }

  code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  return TDB_CODE_SUCCESS;
}
625

626
static SSubplan* getVTbScanSubPlan(const SQueryPlan* pPlan) {
×
627
  int32_t        numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
×
628
  SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, numOfPlanLevel - 2);
×
629
  if (LIST_LENGTH(inner->pNodeList) != 1) {
×
630
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
631
    return NULL;
×
632
  }
633

634
  SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
×
635
  if (plan->subplanType != SUBPLAN_TYPE_MERGE) {
×
636
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
637
    return NULL;
×
638
  }
639
  return plan;
×
640
}
641

642

643
static SSubplan* getAggSubPlan(const SQueryPlan* pPlan, int index) {
264✔
644
  SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, index);
264✔
645
  if (LIST_LENGTH(inner->pNodeList) != 1) {
264!
646
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
647
    return NULL;
×
648
  }
649

650
  SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
264✔
651
  if (plan->subplanType != SUBPLAN_TYPE_MERGE) {
264!
652
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
653
    return NULL;
×
654
  }
655
  return plan;
264✔
656
}
657

658
// Create one merge task per entry of pCreate->pVSubTables, on the vgroup named
// by that entry. Entries whose vgroup cannot be acquired are skipped with a
// warning. Every acquired vgroup is released again before moving on.
// Returns TSDB_CODE_SUCCESS, or the first error reported by doAddMergeTask.
static int32_t addVTableMergeTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset,
                                  bool useTriggerParam, bool hasAggTasks, SCMCreateStreamReq* pCreate) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t numOfEntries = taosArrayGetSize(pCreate->pVSubTables);

  for (int32_t idx = 0; idx < numOfEntries; ++idx) {
    SVSubTablesRsp* pEntry = (SVSubTablesRsp*)taosArrayGet(pCreate->pVSubTables, idx);

    SVgObj* pVgroup = mndAcquireVgroup(pMnode, pEntry->vgId);
    if (NULL == pVgroup) {
      mWarn("vnode %d in pVSubTables not found", pEntry->vgId);
      continue;
    }

    code = doAddMergeTask(pMnode, plan, pStream, pEpset, pVgroup, false, useTriggerParam, hasAggTasks, pEntry->pTables);
    if (code != 0) {
      mError("failed to create stream task, code:%s", tstrerror(code));
    }

    // the vgroup is released whether or not the task could be created
    mndReleaseVgroup(pMnode, pVgroup);

    if (code != 0) {
      return code;
    }
  }

  return code;
}
685

686
// Build a hash that maps the nodeId of every task in pMergeTaskList to the task
// pointer itself. The hash is handed back through *ppVgTasks (also when a later
// insert fails, so the caller is expected to clean it up).
// Returns 0 on success, terrno if the hash cannot be created, or the error of
// the failing tSimpleHashPut.
static int32_t buildMergeTaskHash(SArray* pMergeTaskList, SSHashObj** ppVgTasks) {
  int32_t numOfTasks = taosArrayGetSize(pMergeTaskList);

  SSHashObj* pHash = tSimpleHashInit(numOfTasks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  *ppVgTasks = pHash;
  if (NULL == pHash) {
    int32_t err = terrno;
    mError("tSimpleHashInit %d failed", numOfTasks);
    return err;
  }

  for (int32_t idx = 0; idx < numOfTasks; ++idx) {
    SStreamTask* pMerge = taosArrayGetP(pMergeTaskList, idx);

    int32_t code = tSimpleHashPut(pHash, &pMerge->info.nodeId, sizeof(pMerge->info.nodeId), &pMerge, POINTER_BYTES);
    if (code) {
      mError("tSimpleHashPut %d STaskDispatcherFixed failed", idx);
      return code;
    }
  }

  return 0;
}
709

710
static int32_t addVTableSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset,
×
711
                                   int64_t nextWindowSkey, SArray* pVerList, bool useTriggerParam, bool hasAggTasks,
712
                                   SCMCreateStreamReq* pCreate, SSHashObj* pVTableMap, SArray* pSourceTaskList,
713
                                   SArray* pMergeTaskList) {
714
  int32_t code = 0;
×
715
  SSHashObj* pVgTasks = NULL;
×
716
  int32_t vgId = 0;
×
717
  int32_t iter = 0;
×
718
  SVgObj* pVgroup = NULL;
×
719
  void* p = NULL;
×
720

721
  code = buildMergeTaskHash(pMergeTaskList, &pVgTasks);
×
722
  if (code) {
×
723
    tSimpleHashCleanup(pVgTasks);
×
724
    return code;
×
725
  }
726
  
727
  while (NULL != (p = tSimpleHashIterate(pVTableMap, p, &iter))) {
×
728
    char* pDbVg = tSimpleHashGetKey(p, NULL);
×
729
    char* pVgStr = strrchr(pDbVg, '.');
×
730
    if (NULL == pVgStr) {
×
731
      mError("Invalid DbVg string: %s", pDbVg);
×
732
      tSimpleHashCleanup(pVgTasks);
×
733
      return TSDB_CODE_MND_INTERNAL_ERROR;
×
734
    }
735

736
    (void)taosStr2int32(pVgStr + 1, &vgId);
×
737
    
738
    pVgroup = mndAcquireVgroup(pMnode, vgId);
×
739
    if (NULL == pVgroup) {
×
740
      mWarn("vnode %d not found", vgId);
×
741
      continue;
×
742
    }
743

744
    plan->pVTables = *(SSHashObj**)p;
×
745
    code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, false, useTriggerParam,
×
746
                           hasAggTasks, pVgTasks, pSourceTaskList);
747
    plan->pVTables = NULL;
×
748
    if (code != 0) {
×
749
      mError("failed to create stream task, code:%s", tstrerror(code));
×
750

751
      mndReleaseVgroup(pMnode, pVgroup);
×
752
      tSimpleHashCleanup(pVgTasks);
×
753
      return code;
×
754
    }
755

756
    mndReleaseVgroup(pMnode, pVgroup);
×
757
  }
758

759
  tSimpleHashCleanup(pVgTasks);
×
760

761
  return TSDB_CODE_SUCCESS;
×
762
}
763

764
// Appends a new task level to the stream and fills it with one source task per vgroup of the
// stream's source database (mount vgroups are skipped). When needHistoryTask() holds, a second
// task of type STREAM_RECALCUL_TASK or STREAM_HISTORY_TASK is created for the same vgroup and
// setHTasksId() is called once all vgroups have been visited.
//
// Returns TSDB_CODE_SUCCESS or the first error reported by addNewTaskList()/doAddSourceTask().
// NOTE(review): the early error returns leave the sdbFetch() iterator as-is; confirm whether the
// sdb API expects the iteration to be cancelled explicitly.
static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset,
                             int64_t nextWindowSkey, SArray* pVerList, bool useTriggerParam, bool hasAggTasks) {
  SSdb* pSdb = pMnode->pSdb;
  void* pIter = NULL;

  int32_t code = addNewTaskList(pStream);
  if (code) {
    return code;
  }

  for (;;) {
    SVgObj* pVgroup = NULL;

    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    // Ignore mount vgroups and vgroups that belong to another database.
    if (pVgroup->mountVgId || !mndVgroupInDb(pVgroup, pStream->sourceDbUid)) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, STREAM_NORMAL_TASK,
                           useTriggerParam, hasAggTasks, NULL, NULL);
    if (code != 0) {
      mError("failed to create stream task, code:%s", tstrerror(code));

      mndReleaseVgroup(pMnode, pVgroup);
      return code;
    }

    if (needHistoryTask(pStream)) {
      // Continuous-window-close streams without fill-history only need the recalculating task.
      bool            recalcOnly = (pStream->conf.trigger == STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE) &&
                                   (pStream->conf.fillHistory == 0);
      EStreamTaskType type = recalcOnly ? STREAM_RECALCUL_TASK : STREAM_HISTORY_TASK;

      code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, type, useTriggerParam,
                             hasAggTasks, NULL, NULL);
      if (code != 0) {
        sdbRelease(pSdb, pVgroup);
        return code;
      }
    }

    sdbRelease(pSdb, pVgroup);
  }

  if (needHistoryTask(pStream)) {
    setHTasksId(pStream);
  }

  return TSDB_CODE_SUCCESS;
}
823

824
static int32_t buildAggTask(SStreamObj* pStream, SEpSet* pEpset, EStreamTaskType type, bool useTriggerParam,
193✔
825
                            SStreamTask** pAggTask) {
826
  *pAggTask = NULL;
193✔
827

828
  uint64_t uid = 0;
193✔
829
  SArray** pTaskList = NULL;
193✔
830
  streamGetUidTaskList(pStream, type, &uid, &pTaskList);
193✔
831

832
  int64_t triggerParam = useTriggerParam? pStream->conf.triggerParam:0;
193✔
833
  int32_t code = tNewStreamTask(uid, TASK_LEVEL__AGG, pEpset, type, pStream->conf.trigger,
193✔
834
                                triggerParam, *pTaskList, pStream->conf.fillHistory,
193✔
835
                                pStream->subTableWithoutMd5, 1, pAggTask);
193✔
836
  return code;
193✔
837
}
838

839
static int32_t doAddAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset, SVgObj* pVgroup,
193✔
840
                            SSnodeObj* pSnode, EStreamTaskType type, bool useTriggerParam) {
841
  int32_t      code = 0;
193✔
842
  SStreamTask* pTask = NULL;
193✔
843
  const char*  id = NULL;
193✔
844

845
  code = buildAggTask(pStream, pEpset, type, useTriggerParam, &pTask);
193✔
846
  if (code != TSDB_CODE_SUCCESS) {
193!
847
    return code;
×
848
  }
849

850
  id = pTask->id.idStr;
193✔
851
  if (pSnode != NULL) {
193✔
852
    code = mndAssignStreamTaskToSnode(pMnode, pTask, plan, pSnode);
12✔
853
    mDebug("doAddAggTask taskId:%s, %p snode id:%d, isFillHistory:%d", id, pTask, pSnode->id, (type == STREAM_HISTORY_TASK));
12!
854
  } else {
855
    code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup);
181✔
856
    mDebug("doAddAggTask taskId:%s, %p vgId:%d, isFillHistory:%d", id, pTask, pVgroup->vgId, (type == STREAM_HISTORY_TASK));
181!
857
  }
858
  return code;
193✔
859
}
860

861
// Adds an aggregation task (and, when needHistoryTask() holds, its history/recalculate twin)
// for the given sub-plan.
//
// Placement: when tsDeployOnSnode is set an snode is tried first and a vgroup is used only if no
// snode is available; otherwise a vgroup is always used. The acquired snode or vgroup is
// released before returning.
//
// Returns 0 on success, the doAddAggTask() error on failure, or an error when neither an snode
// nor a vgroup could be obtained.
static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset, bool useTriggerParam) {
  SVgObj*    pVgroup = NULL;
  SSnodeObj* pSnode = NULL;
  int32_t    code = 0;

  if (tsDeployOnSnode) {
    pSnode = mndSchedFetchOneSnode(pMnode);
    if (pSnode == NULL) {
      pVgroup = mndSchedFetchOneVg(pMnode, pStream);
    }
  } else {
    pVgroup = mndSchedFetchOneVg(pMnode, pStream);
  }

  // doAddAggTask() dereferences pVgroup whenever pSnode is NULL, so bail out if there is no
  // placement target at all. Nothing has been acquired in that case, hence the direct return.
  if (pSnode == NULL && pVgroup == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    mError("failed to add agg task since no snode or vgroup is available, code:%s", tstrerror(code));
    return code;
  }

  code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, STREAM_NORMAL_TASK, useTriggerParam);
  if (code != 0) {
    goto END;
  }

  if (needHistoryTask(pStream)) {
    EStreamTaskType type = 0;
    if (pStream->conf.trigger == STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE && (pStream->conf.fillHistory == 0)) {
      type = STREAM_RECALCUL_TASK;  // only the recalculating task
    } else {
      type = STREAM_HISTORY_TASK;  // set the fill-history option
    }
    code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, type, useTriggerParam);
    if (code != 0) {
      goto END;
    }

    setHTasksId(pStream);
  }

END:
  if (pSnode != NULL) {
    sdbRelease(pMnode->pSdb, pSnode);
  } else {
    sdbRelease(pMnode->pSdb, pVgroup);
  }
  return code;
}
902

903
// Appends a new task level and fills it with sink tasks: shuffle sink tasks when the stream has
// no fixed sink vgroup (fixedSinkVgId == 0), otherwise a single sink task on fixedSinkVg.
// Calls setHTasksId() afterwards when needHistoryTask() holds.
// Returns the first non-zero code from the helpers, or TDB_CODE_SUCCESS.
static int32_t addSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset) {
  int32_t code = addNewTaskList(pStream);
  if (code) {
    return code;
  }

  bool shuffleSink = (pStream->fixedSinkVgId == 0);
  code = shuffleSink ? doAddShuffleSinkTask(pMnode, pStream, pEpset)
                     : doAddSinkTaskToVg(pMnode, pStream, pEpset, &pStream->fixedSinkVg);
  if (code != 0) {
    return code;
  }

  if (needHistoryTask(pStream)) {
    setHTasksId(pStream);
  }

  return TDB_CODE_SUCCESS;
}
927

928
// Connects `task` to every task in pSinkTaskList: a dispatcher is added on `task`, then `task` is
// registered as an upstream of each sink task. Failures are logged and otherwise ignored.
static void bindTaskToSinkTask(SStreamObj* pStream, SMnode* pMnode, SArray* pSinkTaskList, SStreamTask* task) {
  int32_t code = mndAddDispatcherForInternalTask(pMnode, pStream, pSinkTaskList, task);
  if (code != 0) {
    mError("failed bind task to sink task since %s", tstrerror(code));
  }

  int32_t idx = 0;
  while (idx < taosArrayGetSize(pSinkTaskList)) {
    SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, idx);

    code = streamTaskSetUpstreamInfo(pSinkTask, task);
    if (code != 0) {
      mError("failed bind task to sink task since %s", tstrerror(code));
    }
    idx++;
  }

  mDebug("bindTaskToSinkTask taskId:%s to sink task list", task->id.idStr);
}
2,689✔
941

942
// Binds every task of the last level in `tasks` (the agg level) to the sink level stored at
// index SINK_NODE_LEVEL.
static void bindAggSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks) {
  SArray** pAggTaskList = taosArrayGetLast(tasks);
  SArray*  pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL);

  int idx = 0;
  while (idx < taosArrayGetSize(*pAggTaskList)) {
    SStreamTask* pAggTask = taosArrayGetP(*pAggTaskList, idx);

    bindTaskToSinkTask(pStream, pMnode, pSinkTaskList, pAggTask);
    mDebug("bindAggSink taskId:%s to sink task list", pAggTask->id.idStr);
    idx++;
  }
}
171✔
952

953
// Wires the source tasks to their sink.
// With an extra sink level the source tasks live one level above SINK_NODE_LEVEL and are bound to
// the sink task list; without it the source level is SINK_NODE_LEVEL itself and each task gets
// its sink info set directly through mndSetSinkTaskInfo(). Failures are only logged.
static void bindSourceSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks, bool hasExtraSink) {
  int32_t sourceLevel = hasExtraSink ? SINK_NODE_LEVEL + 1 : SINK_NODE_LEVEL;
  SArray* pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL);
  SArray* pSourceTaskList = taosArrayGetP(tasks, sourceLevel);

  for (int idx = 0; idx < taosArrayGetSize(pSourceTaskList); idx++) {
    SStreamTask* pSourceTask = taosArrayGetP(pSourceTaskList, idx);
    mDebug("bindSourceSink taskId:%s to sink task list", pSourceTask->id.idStr);

    if (hasExtraSink) {
      bindTaskToSinkTask(pStream, pMnode, pSinkTaskList, pSourceTask);
      continue;
    }

    int32_t code = mndSetSinkTaskInfo(pStream, pSourceTask);
    if (code != 0) {
      mError("failed bind task to sink task since %s", tstrerror(code));
    }
  }
}
1,226✔
971

972
// Wires the virtual-table merge tasks to their sink.
// The merge level sits at SINK_NODE_LEVEL + 2 when an extra sink level exists, else at
// SINK_NODE_LEVEL + 1. With an extra sink each merge task is bound to the sink task list;
// otherwise its sink info is set directly. Failures are only logged.
static void bindVtableMergeSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks, bool hasExtraSink) {
  int32_t mergeLevel = hasExtraSink ? SINK_NODE_LEVEL + 2 : SINK_NODE_LEVEL + 1;
  SArray* pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL);
  SArray* pMergeTaskList = taosArrayGetP(tasks, mergeLevel);

  for (int idx = 0; idx < taosArrayGetSize(pMergeTaskList); idx++) {
    SStreamTask* pMergeTask = taosArrayGetP(pMergeTaskList, idx);
    mDebug("bindVtableMergeSink taskId:%s to sink task list", pMergeTask->id.idStr);

    if (hasExtraSink) {
      bindTaskToSinkTask(pStream, pMnode, pSinkTaskList, pMergeTask);
      continue;
    }

    int32_t code = mndSetSinkTaskInfo(pStream, pMergeTask);
    if (code != 0) {
      mError("failed bind task to sink task since %s", tstrerror(code));
    }
  }
}
×
990

991

992
// Connects tasks [begin, end) of the second-to-last level in `tasks` to the LAST task of the last
// level: each upstream task gets selfChildId = (index - begin), a fixed downstream pointing at
// that task, and is registered as its upstream. `end` is clamped to the upstream level size.
// Does nothing (besides logging) when `tasks` has fewer than two levels.
static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) {
  size_t numOfLevels = taosArrayGetSize(tasks);
  if (numOfLevels < 2) {
    mError("task list size is less than 2");
    return;
  }

  SArray*       pDownTaskList = taosArrayGetP(tasks, numOfLevels - 1);
  SArray*       pUpTaskList = taosArrayGetP(tasks, numOfLevels - 2);
  SStreamTask** pDownTask = taosArrayGetLast(pDownTaskList);

  if (end > taosArrayGetSize(pUpTaskList)) {
    end = taosArrayGetSize(pUpTaskList);
  }

  int idx = begin;
  while (idx < end) {
    SStreamTask* pUpTask = taosArrayGetP(pUpTaskList, idx);

    pUpTask->info.selfChildId = idx - begin;
    streamTaskSetFixedDownstreamInfo(pUpTask, *pDownTask);

    int32_t code = streamTaskSetUpstreamInfo(*pDownTask, pUpTask);
    if (code != 0) {
      mError("failed bind task to sink task since %s", tstrerror(code));
    }
    idx++;
  }

  mDebug("bindTwoLevel task list(%d-%d) to taskId:%s", begin, end - 1, (*(pDownTask))->id.idStr);
}
1014

1015
int32_t tableHashValueComp(void const* lp, void const* rp) {
×
1016
  uint32_t*    key = (uint32_t*)lp;
×
1017
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
×
1018

1019
  if (*key < pVg->hashBegin) {
×
1020
    return -1;
×
1021
  } else if (*key > pVg->hashEnd) {
×
1022
    return 1;
×
1023
  }
1024

1025
  return 0;
×
1026
}
1027

1028

1029
int dbVgInfoComp(const void* lp, const void* rp) {
×
1030
  SVGroupHashInfo* pLeft = (SVGroupHashInfo*)lp;
×
1031
  SVGroupHashInfo* pRight = (SVGroupHashInfo*)rp;
×
1032
  if (pLeft->hashBegin < pRight->hashBegin) {
×
1033
    return -1;
×
1034
  } else if (pLeft->hashBegin > pRight->hashBegin) {
×
1035
    return 1;
×
1036
  }
1037

1038
  return 0;
×
1039
}
1040

1041
// Resolves the vgroup id that owns table `tbName` of database `dbFName`.
//
// The table's hash is computed over "<dbFName>.<tbName>" with the database's hash settings, the
// vgroup array is sorted by hashBegin on first use (dbInfo->vgSorted caches that), and the array
// is then searched for the element whose hash range contains the value.
//
// acctId is not used by this implementation.
// Returns 0 and stores the id in *vgId, or TSDB_CODE_INVALID_PARA when no range matches.
int32_t getTableVgId(SDBVgHashInfo* dbInfo, int32_t acctId, char* dbFName, int32_t* vgId, char *tbName) {
  int32_t          code = 0;
  SVGroupHashInfo* vgInfo = NULL;  // element type of dbInfo->vgArray as built in this file
  char             tbFullName[TSDB_TABLE_FNAME_LEN];

  (void)snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbFName, tbName);
  uint32_t hashValue = taosGetTbHashVal(tbFullName, (uint32_t)strlen(tbFullName), dbInfo->hashMethod,
                                        dbInfo->hashPrefix, dbInfo->hashSuffix);

  if (!dbInfo->vgSorted) {
    taosArraySort(dbInfo->vgArray, dbVgInfoComp);
    dbInfo->vgSorted = true;
  }

  vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, tableHashValueComp, TD_EQ);
  if (NULL == vgInfo) {
    qError("no hash range found for hash value [%u], dbFName:%s, numOfVgId:%d", hashValue, dbFName,
             (int32_t)taosArrayGetSize(dbInfo->vgArray));
    return TSDB_CODE_INVALID_PARA;
  }

  *vgId = vgInfo->vgId;

  return code;
}
1068

1069

1070
// Destroys every SArray stored as a value in pVtable, then releases the hash itself.
static void destroyVSubtableVtb(SSHashObj *pVtable) {
  int32_t iter = 0;

  for (void* pEntry = tSimpleHashIterate(pVtable, NULL, &iter); pEntry != NULL;
       pEntry = tSimpleHashIterate(pVtable, pEntry, &iter)) {
    taosArrayDestroy(*(SArray**)pEntry);
  }

  tSimpleHashCleanup(pVtable);
}
×
1079

1080
// Destroys every nested vtable hash stored as a value in pVg (via destroyVSubtableVtb), then
// releases pVg itself.
static void destroyVSubtableVgHash(SSHashObj *pVg) {
  int32_t iter = 0;

  for (void* pEntry = tSimpleHashIterate(pVg, NULL, &iter); pEntry != NULL;
       pEntry = tSimpleHashIterate(pVg, pEntry, &iter)) {
    SSHashObj** ppVtable = (SSHashObj**)pEntry;
    destroyVSubtableVtb(*ppVtable);
  }

  tSimpleHashCleanup(pVg);
}
×
1091

1092
// Destroys the vgArray of every SDBVgHashInfo stored in pDbVgs, then releases the hash itself.
static void destroyDbVgroupsHash(SSHashObj *pDbVgs) {
  int32_t iter = 0;

  for (void* pEntry = tSimpleHashIterate(pDbVgs, NULL, &iter); pEntry != NULL;
       pEntry = tSimpleHashIterate(pDbVgs, pEntry, &iter)) {
    SDBVgHashInfo* pInfo = (SDBVgHashInfo*)pEntry;
    taosArrayDestroy(pInfo->vgArray);
  }

  tSimpleHashCleanup(pDbVgs);
}
×
1103

1104
// Fills pDbVgroup with one SDBVgHashInfo per database, keyed by the vgroup's dbName (including
// the terminating NUL). Each entry carries the database hash settings and an array of
// SVGroupHashInfo {vgId, hashBegin, hashEnd} for all of its non-mount vgroups.
//
// Returns TSDB_CODE_SUCCESS or the first allocation / lookup / insert error. Entries already
// stored in pDbVgroup stay there on failure; the caller owns and releases the hash.
// NOTE(review): the early error returns leave the sdbFetch() iterator as-is; confirm whether the
// sdb API expects the iteration to be cancelled explicitly.
static int32_t buildDBVgroupsMap(SMnode* pMnode, SSHashObj* pDbVgroup) {
  void*         pIter = NULL;
  SSdb*         pSdb = pMnode->pSdb;
  int32_t       code = TSDB_CODE_SUCCESS;
  SArray*       pTarget = NULL;
  SArray*       pNew = NULL;  // array created for a new database, owned here until stored in the hash
  SDbObj*       pDb = NULL;
  SDBVgHashInfo dbInfo = {0}, *pDbInfo = NULL;

  while (1) {
    SVgObj* pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) {
      break;
    }
    if (pVgroup->mountVgId) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    pDbInfo = (SDBVgHashInfo*)tSimpleHashGet(pDbVgroup, pVgroup->dbName, strlen(pVgroup->dbName) + 1);
    if (NULL == pDbInfo) {
      // First vgroup seen for this database: create its entry.
      pNew = taosArrayInit(20, sizeof(SVGroupHashInfo));
      if (NULL == pNew) {
        code = terrno;
        mError("taosArrayInit SVGroupHashInfo failed, code:%s", tstrerror(terrno));
        sdbRelease(pSdb, pVgroup);
        return code;
      }

      pDb = mndAcquireDb(pMnode, pVgroup->dbName);
      if (pDb == NULL) {
        code = terrno;
        mError("mndAcquireDb %s failed, code:%s", pVgroup->dbName, tstrerror(terrno));
        taosArrayDestroy(pNew);  // not yet stored anywhere, release it here
        sdbRelease(pSdb, pVgroup);
        return code;
      }

      dbInfo.vgSorted = false;
      dbInfo.hashMethod = pDb->cfg.hashMethod;
      dbInfo.hashPrefix = pDb->cfg.hashPrefix;
      dbInfo.hashSuffix = pDb->cfg.hashSuffix;
      dbInfo.vgArray = pNew;

      mndReleaseDb(pMnode, pDb);

      pTarget = pNew;
    } else {
      pTarget = pDbInfo->vgArray;
    }

    SVGroupHashInfo vgInfo = {.vgId = pVgroup->vgId, .hashBegin = pVgroup->hashBegin, .hashEnd = pVgroup->hashEnd};
    if (NULL == taosArrayPush(pTarget, &vgInfo)) {
      code = terrno;
      mError("taosArrayPush SVGroupHashInfo failed, code:%s", tstrerror(terrno));
      taosArrayDestroy(pNew);
      sdbRelease(pSdb, pVgroup);
      return code;
    }

    if (NULL == pDbInfo) {
      code = tSimpleHashPut(pDbVgroup, pVgroup->dbName, strlen(pVgroup->dbName) + 1, &dbInfo, sizeof(dbInfo));
      if (code != 0) {
        mError("tSimpleHashPut SDBVgHashInfo failed, code:%s", tstrerror(code));
        taosArrayDestroy(pNew);
        sdbRelease(pSdb, pVgroup);
        return code;
      }

      // The array is now reachable through the hash entry.
      pNew = NULL;
    }

    sdbRelease(pSdb, pVgroup);
  }

  return code;
}
1182

1183
// Records one referenced column (pCol) of virtual table (vvgId, vuid) inside pVg.
//
// pVg maps a 12-byte key {int32 vvgId, uint64 vuid} to an SSHashObj*; that inner hash maps the
// referenced table name to an SArray of SColIdName. Missing levels are created on demand.
// On success pCtx->lastUid / lastVtable / lastOtable describe the containers that were used.
//
// Returns TSDB_CODE_SUCCESS or the first allocation / insert error. Containers created here
// that were not yet stored in their parent are released on failure.
// NOTE(review): a colName that was duplicated but not stored (or stored in an array that is then
// destroyed here) is not freed by this function.
static int32_t addVTableToVnode(SSHashObj* pVg, int32_t vvgId, uint64_t vuid, SRefColInfo* pCol, SStreamVBuildCtx* pCtx) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  SSHashObj* pNewVtable = NULL;  // non-NULL only while this function still owns it
  SArray*    pNewOtable = NULL;  // non-NULL only while this function still owns it
  SSHashObj* pTarVtable = NULL;
  SArray*    pTarOtable = NULL;
  SColIdName col;
  char       vId[sizeof(int32_t) + sizeof(uint64_t)];

  // Build the composite key byte-wise; storing through casted pointers into a char buffer is
  // neither alignment- nor aliasing-safe.
  memcpy(vId, &vvgId, sizeof(vvgId));
  memcpy(vId + sizeof(vvgId), &vuid, sizeof(vuid));

  pCtx->lastUid = vuid;

  SSHashObj** pVtable = (SSHashObj**)tSimpleHashGet(pVg, vId, sizeof(vId));
  if (NULL == pVtable) {
    // Unknown virtual table: create both the vtable hash and the first column array.
    pNewVtable = (SSHashObj*)tSimpleHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
    TSDB_CHECK_NULL(pNewVtable, code, lino, _return, terrno);
    pNewOtable = taosArrayInit(4, sizeof(SColIdName));
    TSDB_CHECK_NULL(pNewOtable, code, lino, _return, terrno);
    tSimpleHashSetFreeFp(pNewVtable, tFreeStreamVtbOtbInfo);

    col.colId = pCol->colId;
    col.colName = taosStrdup(pCol->refColName);
    TSDB_CHECK_NULL(col.colName, code, lino, _return, terrno);
    TSDB_CHECK_NULL(taosArrayPush(pNewOtable, &col), code, lino, _return, terrno);

    // The result is assigned explicitly so that a failure is returned to the caller no matter
    // how the check macro treats its first argument.
    pTarOtable = pNewOtable;
    code = tSimpleHashPut(pNewVtable, pCol->refTableName, strlen(pCol->refTableName) + 1, &pNewOtable, POINTER_BYTES);
    TSDB_CHECK_CODE(code, lino, _return);
    pNewOtable = NULL;  // now reachable through pNewVtable

    pTarVtable = pNewVtable;
    code = tSimpleHashPut(pVg, vId, sizeof(vId), &pNewVtable, POINTER_BYTES);
    TSDB_CHECK_CODE(code, lino, _return);
    pNewVtable = NULL;  // now reachable through pVg
  } else {
    pTarVtable = *pVtable;

    SArray** pOtable = tSimpleHashGet(pTarVtable, pCol->refTableName, strlen(pCol->refTableName) + 1);
    if (NULL == pOtable) {
      pNewOtable = taosArrayInit(4, sizeof(SColIdName));
      TSDB_CHECK_NULL(pNewOtable, code, lino, _return, terrno);
      pTarOtable = pNewOtable;
    } else {
      pTarOtable = *pOtable;
    }

    col.colId = pCol->colId;
    col.colName = taosStrdup(pCol->refColName);
    TSDB_CHECK_NULL(col.colName, code, lino, _return, terrno);
    TSDB_CHECK_NULL(taosArrayPush(pTarOtable, &col), code, lino, _return, terrno);

    if (NULL == pOtable) {
      code = tSimpleHashPut(pTarVtable, pCol->refTableName, strlen(pCol->refTableName) + 1, &pNewOtable, POINTER_BYTES);
      TSDB_CHECK_CODE(code, lino, _return);
      pNewOtable = NULL;  // now reachable through the existing vtable hash
    }
  }

  pCtx->lastVtable = pTarVtable;
  pCtx->lastOtable = pTarOtable;

_return:

  if (code) {
    mError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  // Both pointers are NULL on the success path; anything left here was never handed over.
  if (pNewOtable != NULL) {
    taosArrayDestroy(pNewOtable);
  }
  if (pNewVtable != NULL) {
    tSimpleHashCleanup(pNewVtable);
  }

  return code;
}
1243

1244
// Records referenced column pCol under the result-map entry "<refDbName>.<vgId>", where vgId is
// the vgroup that owns pCol->refTableName according to getTableVgId(). The per-vgroup hash is
// created on first use and stored in pRes after the column was added to it.
// On success pCtx->lastVg points at the per-vgroup hash that was used.
//
// Returns TSDB_CODE_SUCCESS or the first error from the lookup, allocation or insert steps.
// NOTE(review): on failure a freshly created per-vgroup hash is torn down with
// destroyVSubtableVgHash() although a free callback was registered on it; confirm the two do
// not release the nested hashes twice.
static int32_t addVgroupToRes(char* fDBName, int32_t vvgId, uint64_t vuid, SRefColInfo* pCol, SDBVgHashInfo* pDb, SSHashObj* pRes, SStreamVBuildCtx* pCtx) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  int32_t    vgId = 0;
  char       dbVgId[TSDB_DB_NAME_LEN + 32];
  SSHashObj *pTarVg = NULL, *pNewVg = NULL;

  // Results are assigned to `code` explicitly so that a failure is returned to the caller no
  // matter how the check macro treats its first argument.
  code = getTableVgId(pDb, 1, fDBName, &vgId, pCol->refTableName);
  TSDB_CHECK_CODE(code, lino, _return);

  (void)snprintf(dbVgId, sizeof(dbVgId), "%s.%d", pCol->refDbName, vgId);

  SSHashObj** pVg = (SSHashObj**)tSimpleHashGet(pRes, dbVgId, strlen(dbVgId) + 1);
  if (NULL == pVg) {
    pNewVg = tSimpleHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    TSDB_CHECK_NULL(pNewVg, code, lino, _return, terrno);
    tSimpleHashSetFreeFp(pNewVg, tFreeStreamVtbVtbInfo);
    pTarVg = pNewVg;
  } else {
    pTarVg = *pVg;
  }

  code = addVTableToVnode(pTarVg, vvgId, vuid, pCol, pCtx);
  TSDB_CHECK_CODE(code, lino, _return);

  if (NULL == pVg) {
    code = tSimpleHashPut(pRes, dbVgId, strlen(dbVgId) + 1, &pNewVg, POINTER_BYTES);
    TSDB_CHECK_CODE(code, lino, _return);
    pNewVg = NULL;  // now reachable through pRes
  }

  pCtx->lastVg = pTarVg;

_return:

  if (code) {
    mError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  // pNewVg is non-NULL only if it was created here and never stored in pRes.
  destroyVSubtableVgHash(pNewVg);

  return code;
}
1284

1285
// Adds one referenced column of virtual table (vvgId, vuid) to the result map pRes.
//
// Fast path: when pCol refers to the same db and table as the previously handled column
// (pCtx->lastCol), the cached containers are reused - the column is appended to
// pCtx->lastOtable if the virtual table is also unchanged (vuid == pCtx->lastUid), otherwise it
// is added under pCtx->lastVg.
// Slow path: the database entry "1.<refDbName>" is looked up in pDbVgroups and the column is
// routed through addVgroupToRes(); pCtx->lastCol is then updated.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_MND_DB_NOT_EXIST when the referenced database is unknown,
// or the first allocation / insert error.
static int32_t addRefColToMap(int32_t vvgId, uint64_t vuid, SRefColInfo* pCol, SSHashObj* pDbVgroups, SSHashObj* pRes, SStreamVBuildCtx* pCtx) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  bool       isLastVtable = vuid == pCtx->lastUid;
  SColIdName col;
  char       fDBName[TSDB_DB_FNAME_LEN];

  // The first-character comparisons are a cheap pre-filter before the full strcmp().
  if (pCtx->lastCol && pCtx->lastCol->refDbName[0] == pCol->refDbName[0] && pCtx->lastCol->refTableName[0] == pCol->refTableName[0] &&
     0 == strcmp(pCtx->lastCol->refDbName, pCol->refDbName) && 0 == strcmp(pCtx->lastCol->refTableName, pCol->refTableName)) {
    if (isLastVtable) {
      col.colId = pCol->colId;
      col.colName = taosStrdup(pCol->refColName);
      TSDB_CHECK_NULL(col.colName, code, lino, _return, terrno);
      TSDB_CHECK_NULL(taosArrayPush(pCtx->lastOtable, &col), code, lino, _return, terrno);
      return code;
    }

    // Results are assigned to `code` explicitly so that a failure is returned to the caller no
    // matter how the check macro treats its first argument.
    code = addVTableToVnode(pCtx->lastVg, vvgId, vuid, pCol, pCtx);
    TSDB_CHECK_CODE(code, lino, _return);
    return code;
  }

  snprintf(fDBName, sizeof(fDBName), "1.%s", pCol->refDbName);
  SDBVgHashInfo* pDb = (SDBVgHashInfo*)tSimpleHashGet(pDbVgroups, fDBName, strlen(fDBName) + 1);
  if (NULL == pDb) {
    mError("refDb %s does not exist", pCol->refDbName);
    code = TSDB_CODE_MND_DB_NOT_EXIST;
    goto _return;
  }

  code = addVgroupToRes(fDBName, vvgId, vuid, pCol, pDb, pRes, pCtx);
  TSDB_CHECK_CODE(code, lino, _return);

  pCtx->lastCol = pCol;

_return:

  if (code) {
    mError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
1327

1328
// Builds *ppRes from the virtual sub-table responses in pVSubTables.
//
// A temporary per-database vgroup map is built first (buildDBVgroupsMap) and released before
// returning. Every referenced column of every table in every response is then passed to
// addRefColToMap(), which fills *ppRes.
//
// Returns 0 on success or the first error encountered. *ppRes, once created, is left to the
// caller on both paths.
static int32_t buildVSubtableMap(SMnode* pMnode, SArray* pVSubTables, SSHashObj** ppRes) {
  int32_t          code = 0;
  int32_t          lino = 0;
  SStreamVBuildCtx ctx = {0};

  SSHashObj* pDbVgroups = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
  if (NULL == pDbVgroups) {
    mError("tSimpleHashInit failed, error:%s", tstrerror(terrno));
    return terrno;
  }

  TAOS_CHECK_EXIT(buildDBVgroupsMap(pMnode, pDbVgroups));

  *ppRes = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
  if (NULL == *ppRes) {
    code = terrno;
    mError("tSimpleHashInit failed, error:%s", tstrerror(terrno));
    goto _exit;
  }
  tSimpleHashSetFreeFp(*ppRes, tFreeStreamVtbDbVgInfo);

  int32_t vgNum = taosArrayGetSize(pVSubTables);
  for (int32_t vgIdx = 0; vgIdx < vgNum; ++vgIdx) {
    SVSubTablesRsp* pVgTbs = taosArrayGet(pVSubTables, vgIdx);
    int32_t         tbNum = taosArrayGetSize(pVgTbs->pTables);

    for (int32_t tbIdx = 0; tbIdx < tbNum; ++tbIdx) {
      SVCTableRefCols* pTb = (SVCTableRefCols*)taosArrayGetP(pVgTbs->pTables, tbIdx);

      for (int32_t colIdx = 0; colIdx < pTb->numOfColRefs; ++colIdx) {
        SRefColInfo* pCol = &pTb->refCols[colIdx];
        TAOS_CHECK_EXIT(addRefColToMap(pVgTbs->vgId, pTb->uid, pCol, pDbVgroups, *ppRes, &ctx));
      }
    }
  }

_exit:

  if (code) {
    mError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  destroyDbVgroupsHash(pDbVgroups);

  return code;
}
1372

1373
// Builds the complete task topology of a stream from its physical plan.
//
// Steps, in order:
//   1. Look up the target database to learn whether it has more than one vgroup.
//   2. Create the (normal and history) task-level arrays on pStream.
//   3. For virtual-table streams, build pStream->pVTableMap from pCreate->pVSubTables.
//   4. Add an extra sink level when the plan has more levels than the source side alone, the
//      target db differs from the source db, the target has several vgroups, or a fixed sink
//      vgroup is configured.
//   5. Add the source level (plus the vtable merge level for virtual-table streams).
//   6. Single-level plans (two-level for virtual tables) are bound straight to the sink and the
//      function returns.
//   7. Otherwise optional middle agg levels are inserted while the last level needs more than one
//      group of tsStreamAggCnt tasks, then the final agg task is added and bound to the sink.
//
// Returns 0 on success or the first error reported by any step.
static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, SEpSet* pEpset, SCMCreateStreamReq* pCreate) {
  int32_t code = 0;
  bool    isVTableStream = (NULL != pCreate->pVSubTables);
  int64_t skey = pCreate->lastTs;
  SArray* pVerList = pCreate->pVgroupVerList;
  SSdb*   pSdb = pMnode->pSdb;
  int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
  bool    hasExtraSink = false;
  bool    externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
  SSubplan* plan = NULL;
  SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb);

  if (pDbObj == NULL) {
    code = TSDB_CODE_QRY_INVALID_INPUT;
    TAOS_RETURN(code);
  }

  bool multiTarget = (pDbObj->cfg.numOfVgroups > 1);
  sdbRelease(pSdb, pDbObj);

  mDebug("doScheduleStream numOfPlanLevel:%d, exDb:%d, multiTarget:%d, fix vgId:%d, physicalPlan:%s", numOfPlanLevel,
         externalTargetDB, multiTarget, pStream->fixedSinkVgId, pStream->physicalPlan);

  pStream->pTaskList = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES);
  pStream->pHTaskList = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES);
  if (pStream->pTaskList == NULL || pStream->pHTaskList == NULL) {
    mError("failed to create stream obj, code:%s", tstrerror(terrno));
    return terrno;
  }

  if (pCreate->pVSubTables) {
    code = buildVSubtableMap(pMnode, pCreate->pVSubTables, &pStream->pVTableMap);
    if (TSDB_CODE_SUCCESS != code) {
      // Report the code that is actually returned rather than whatever terrno holds.
      mError("failed to buildVSubtableMap, code:%s", tstrerror(code));
      return code;
    }
  }

  if ((numOfPlanLevel > 1 && !isVTableStream) || (numOfPlanLevel > 2 && isVTableStream) || externalTargetDB ||
      multiTarget || pStream->fixedSinkVgId) {
    // add extra sink
    hasExtraSink = true;
    code = addSinkTask(pMnode, pStream, pEpset);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  pStream->totalLevel = numOfPlanLevel + hasExtraSink;

  int8_t hasAggTasks = (numOfPlanLevel > 1) ? 1 : 0;  // task level is greater than 1, which means agg existing
  if (pStream->pVTableMap) {
    // Level for the vtable source tasks; it is filled after the merge tasks exist.
    code = addNewTaskList(pStream);
    if (code) {
      return code;
    }

    plan = getVTbScanSubPlan(pPlan);
    if (plan == NULL) {
      mError("fail to get vtable scan plan");
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      TAOS_RETURN(code);
    }

    SArray** pSourceTaskList = taosArrayGetLast(pStream->pTaskList);

    // Level for the vtable merge tasks.
    code = addNewTaskList(pStream);
    if (code) {
      return code;
    }
    code = addVTableMergeTask(pMnode, plan, pStream, pEpset, (numOfPlanLevel == 1), hasAggTasks, pCreate);
    if (code) {
      return code;
    }

    plan = getScanSubPlan(pPlan);  // source plan
    if (plan == NULL) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      TAOS_RETURN(code);
    }

    SArray** pMergeTaskList = taosArrayGetLast(pStream->pTaskList);
    code = addVTableSourceTask(pMnode, plan, pStream, pEpset, skey, pVerList, (numOfPlanLevel == 1), hasAggTasks,
                               pCreate, pStream->pVTableMap, *pSourceTaskList, *pMergeTaskList);
  } else {
    plan = getScanSubPlan(pPlan);  // source plan
    if (plan == NULL) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      TAOS_RETURN(code);
    }

    code = addSourceTask(pMnode, plan, pStream, pEpset, skey, pVerList, (numOfPlanLevel == 1), hasAggTasks);
  }
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  if ((numOfPlanLevel == 1 && !isVTableStream)) {
    bindSourceSink(pStream, pMnode, pStream->pTaskList, hasExtraSink);
    if (needHistoryTask(pStream)) {
      bindSourceSink(pStream, pMnode, pStream->pHTaskList, hasExtraSink);
    }
    return TDB_CODE_SUCCESS;
  }

  if (numOfPlanLevel == 2 && isVTableStream) {
    bindVtableMergeSink(pStream, pMnode, pStream->pTaskList, hasExtraSink);
    return TDB_CODE_SUCCESS;
  }

  if ((numOfPlanLevel == 3 && !isVTableStream) || (numOfPlanLevel == 4 && isVTableStream)) {
    int32_t idx = isVTableStream ? 2 : 1;
    plan = getAggSubPlan(pPlan, idx);  // middle agg plan
    if (plan == NULL) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      TAOS_RETURN(code);
    }

    // Keep stacking middle agg levels until the last level fits into a single group of
    // tsStreamAggCnt tasks.
    do {
      SArray** list = taosArrayGetLast(pStream->pTaskList);
      float    size = (float)taosArrayGetSize(*list);
      size_t   cnt = (size_t)ceil(size / tsStreamAggCnt);
      if (cnt <= 1) break;

      mDebug("doScheduleStream add middle agg, size:%d, cnt:%d", (int)size, (int)cnt);
      code = addNewTaskList(pStream);
      if (code) {
        return code;
      }

      for (int j = 0; j < cnt; j++) {
        code = addAggTask(pStream, pMnode, plan, pEpset, false);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }

        bindTwoLevel(pStream->pTaskList, j * tsStreamAggCnt, (j + 1) * tsStreamAggCnt);
        if (needHistoryTask(pStream)) {
          bindTwoLevel(pStream->pHTaskList, j * tsStreamAggCnt, (j + 1) * tsStreamAggCnt);
        }
      }
    } while (1);
  }

  plan = getAggSubPlan(pPlan, 0);
  if (plan == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  mDebug("doScheduleStream add final agg");
  SArray** list = taosArrayGetLast(pStream->pTaskList);
  size_t   size = taosArrayGetSize(*list);

  code = addNewTaskList(pStream);
  if (code) {
    return code;
  }

  code = addAggTask(pStream, pMnode, plan, pEpset, true);
  if (code != TSDB_CODE_SUCCESS) {
    TAOS_RETURN(code);
  }
  bindTwoLevel(pStream->pTaskList, 0, size);
  if (needHistoryTask(pStream)) {
    bindTwoLevel(pStream->pHTaskList, 0, size);
  }

  bindAggSink(pStream, pMnode, pStream->pTaskList);
  if (needHistoryTask(pStream)) {
    bindAggSink(pStream, pMnode, pStream->pHTaskList);
  }
  TAOS_RETURN(code);
}
1552

1553
// Entry point for scheduling a stream: parses pStream->physicalPlan into pStream->pPlan, obtains
// the mnode ep set and delegates to doScheduleStream().
// Returns TSDB_CODE_QRY_INVALID_INPUT when the plan string cannot be parsed, otherwise the
// result of doScheduleStream().
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream, SCMCreateStreamReq* pCreate) {
  int32_t code = 0;
  SEpSet  mnodeEpset = {0};

  pStream->pPlan = qStringToQueryPlan(pStream->physicalPlan);
  if (NULL == pStream->pPlan) {
    code = TSDB_CODE_QRY_INVALID_INPUT;
    TAOS_RETURN(code);
  }

  mndGetMnodeEpSet(pMnode, &mnodeEpset);
  code = doScheduleStream(pStream, pMnode, pStream->pPlan, &mnodeEpset, pCreate);

  TAOS_RETURN(code);
}
1568
#endif
1569

1570
#ifdef USE_TOPIC
1571
/*
 * Initialise the endpoint information of a subscription for a topic.
 *
 * Steps:
 *   1. Obtain a query plan for the topic, if it has one:
 *        - TOPIC_SUB_TYPE__COLUMN: parsed from pTopic->physicalPlan;
 *        - TOPIC_SUB_TYPE__TABLE with a non-NULL ast: built from pTopic->ast;
 *        - anything else: no plan.
 *      A plan must consist of exactly one level holding exactly one subplan,
 *      otherwise TSDB_CODE_MND_INVALID_TOPIC_QUERY is returned.
 *   2. Walk every SDB_VGROUP row; for each vgroup that has no mountVgId set
 *      and for which mndVgroupInDb(pVgroup, pTopic->dbUid) holds, append a
 *      heap-allocated SMqVgEp to pSub->unassignedVgs and bump pSub->vgNum.
 *   3. Store the serialized subplan in pSub->qmsg, or an empty string when
 *      there is no subplan.
 *
 * @param pMnode  mnode handle (provides the SDB and vgroup epsets).
 * @param pTopic  topic being subscribed to; not modified.
 * @param pSub    subscription object that is filled in.
 * @return 0 on success, otherwise an error code. On failure pSub may have
 *         been partially populated; entries already pushed are left in place.
 */
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
  int32_t     code = 0;
  SSdb*       pSdb = pMnode->pSdb;
  SVgObj*     pVgroup = NULL;
  SQueryPlan* pPlan = NULL;
  SSubplan*   pSubplan = NULL;

  if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) {
    pPlan = qStringToQueryPlan(pTopic->physicalPlan);
    if (pPlan == NULL) {
      return TSDB_CODE_QRY_INVALID_INPUT;
    }
  } else if (pTopic->subType == TOPIC_SUB_TYPE__TABLE && pTopic->ast != NULL) {
    SNode* pAst = NULL;
    code = nodesStringToNode(pTopic->ast, &pAst);
    if (code != 0) {
      mError("topic:%s, failed to create since %s", pTopic->name, terrstr());
      return code;
    }

    SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
    code = qCreateQueryPlan(&cxt, &pPlan, NULL);
    if (code != 0) {
      mError("failed to create topic:%s since %s", pTopic->name, terrstr());
      nodesDestroyNode(pAst);
      return code;
    }
    // The AST is only needed to build the plan.
    nodesDestroyNode(pAst);
  }

  if (pPlan) {
    // A topic query is expected to be a single level with a single subplan.
    int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
    if (levelNum != 1) {
      code = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
      goto END;
    }

    SNodeListNode* pNodeListNode = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
    if (pNodeListNode == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto END;
    }
    int32_t opNum = LIST_LENGTH(pNodeListNode->pNodeList);
    if (opNum != 1) {
      code = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
      goto END;
    }

    pSubplan = (SSubplan*)nodesListGetNode(pNodeListNode->pNodeList, 0);
  }

  void* pIter = NULL;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) {
      break;
    }
    // Vgroups with a mountVgId are skipped.
    if (pVgroup->mountVgId) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    SMqVgEp* pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
    if (pVgEp == NULL) {
      code = terrno;
      // Leaving the loop early: drop the row reference and the live iterator.
      sdbRelease(pSdb, pVgroup);
      sdbCancelFetch(pSdb, pIter);
      goto END;
    }
    pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
    pVgEp->vgId = pVgroup->vgId;
    if (taosArrayPush(pSub->unassignedVgs, &pVgEp) == NULL) {
      code = terrno;
      taosMemoryFree(pVgEp);
      // Leaving the loop early: drop the row reference and the live iterator.
      sdbRelease(pSdb, pVgroup);
      sdbCancelFetch(pSdb, pIter);
      goto END;
    }
    // Count the vgroup only once it is really stored, so vgNum always
    // matches the number of entries added to unassignedVgs.
    pSub->vgNum++;

    mInfo("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, pVgEp->vgId);
    sdbRelease(pSdb, pVgroup);
  }

  if (pSubplan) {
    int32_t msgLen = 0;

    if (qSubPlanToString(pSubplan, &pSub->qmsg, &msgLen) < 0) {
      code = TSDB_CODE_QRY_INVALID_INPUT;
      goto END;
    }
  } else {
    pSub->qmsg = taosStrdup("");
    if (pSub->qmsg == NULL) {
      // Report the allocation failure instead of returning success with a NULL qmsg.
      code = terrno;
    }
  }

END:
  qDestroyQueryPlan(pPlan);
  return code;
}
1671
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc