• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3819

01 Apr 2025 09:27AM UTC coverage: 34.076% (+0.01%) from 34.065%
#3819

push

travis-ci

happyguoxy
test:alter gcda dir

148544 of 599532 branches covered (24.78%)

Branch coverage included in aggregate %.

222541 of 489451 relevant lines covered (45.47%)

763329.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

36.86
/source/dnode/mnode/impl/src/mndScheduler.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndScheduler.h"
17
#include "mndDb.h"
18
#include "mndMnode.h"
19
#include "mndSnode.h"
20
#include "mndVgroup.h"
21
#include "parser.h"
22
#include "tcompare.h"
23
#include "tmisce.h"
24
#include "tname.h"
25
#include "tuuid.h"
26

27
#define SINK_NODE_LEVEL (0)
28
extern bool tsDeployOnSnode;
29

30
static bool hasCountWindowNode(SPhysiNode* pNode) {
8✔
31
  if (nodeType(pNode) == QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT) {
8!
32
    return true;
×
33
  } else {
34
    size_t size = LIST_LENGTH(pNode->pChildren);
8✔
35

36
    for (int32_t i = 0; i < size; ++i) {
12✔
37
      SPhysiNode* pChild = (SPhysiNode*)nodesListGetNode(pNode->pChildren, i);
4✔
38
      if (hasCountWindowNode(pChild)) {
4!
39
        return true;
×
40
      }
41
    }
42

43
    return false;
8✔
44
  }
45
}
46

47
static bool isCountWindowStreamTask(SSubplan* pPlan) {
4✔
48
  return hasCountWindowNode((SPhysiNode*)pPlan->pNode);
4✔
49
}
50

51
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
×
52
                           int64_t watermark, int64_t deleteMark) {
53
  int32_t     code = 0;
×
54
  SNode*      pAst = NULL;
×
55
  SQueryPlan* pPlan = NULL;
×
56

57
  if (nodesStringToNode(ast, &pAst) < 0) {
×
58
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
59
    goto END;
×
60
  }
61

62
  if (qSetSTableIdForRsma(pAst, uid) < 0) {
×
63
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
64
    goto END;
×
65
  }
66

67
  SPlanContext cxt = {
×
68
      .pAstRoot = pAst,
69
      .topicQuery = false,
70
      .streamQuery = true,
71
      .rSmaQuery = true,
72
      .triggerType = triggerType,
73
      .watermark = watermark,
74
      .deleteMark = deleteMark,
75
  };
76

77
  if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
×
78
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
79
    goto END;
×
80
  }
81

82
  int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
×
83
  if (levelNum != 1) {
×
84
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
85
    goto END;
×
86
  }
87
  SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
×
88

89
  int32_t opNum = LIST_LENGTH(inner->pNodeList);
×
90
  if (opNum != 1) {
×
91
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
92
    goto END;
×
93
  }
94

95
  SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
×
96
  if (qSubPlanToString(plan, pDst, pDstLen) < 0) {
×
97
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
98
    goto END;
×
99
  }
100

101
END:
×
102
  if (pAst) nodesDestroyNode(pAst);
×
103
  if (pPlan) nodesDestroyNode((SNode*)pPlan);
×
104
  TAOS_RETURN(code);
×
105
}
106
#ifdef USE_STREAM
107
int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) {
9✔
108
  STaskOutputInfo* pInfo = &pTask->outputInfo;
9✔
109

110
  mDebug("mndSetSinkTaskInfo to sma or table, taskId:%s", pTask->id.idStr);
9!
111

112
  if (pStream->smaId != 0 && pStream->subTableWithoutMd5 != 1) {
9!
113
    pInfo->type = TASK_OUTPUT__SMA;
×
114
    pInfo->smaSink.smaId = pStream->smaId;
×
115
  } else {
116
    pInfo->type = TASK_OUTPUT__TABLE;
9✔
117
    pInfo->tbSink.stbUid = pStream->targetStbUid;
9✔
118
    (void)memcpy(pInfo->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
9✔
119
    pInfo->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
9!
120
    if (pInfo->tbSink.pSchemaWrapper == NULL) {
9!
121
      return TSDB_CODE_OUT_OF_MEMORY;
×
122
    }
123
  }
124

125
  return 0;
9✔
126
}
127

128
int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SArray* pSinkNodeList,
5✔
129
                                        SStreamTask* pTask) {
130
  int32_t code = 0;
5✔
131
  bool isShuffle = false;
5✔
132

133
  if (pStream->fixedSinkVgId == 0) {
5!
134
    SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
5✔
135
    if (pDb != NULL && pDb->cfg.numOfVgroups > 1) {
5!
136
      isShuffle = true;
5✔
137
      pTask->outputInfo.type = TASK_OUTPUT__SHUFFLE_DISPATCH;
5✔
138
      pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
5✔
139
      TAOS_CHECK_RETURN(mndExtractDbInfo(pMnode, pDb, &pTask->outputInfo.shuffleDispatcher.dbInfo, NULL));
5!
140
    }
141

142
    sdbRelease(pMnode->pSdb, pDb);
5✔
143
  }
144

145
  int32_t numOfSinkNodes = taosArrayGetSize(pSinkNodeList);
5✔
146

147
  if (isShuffle) {
5!
148
    (void)memcpy(pTask->outputInfo.shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
5✔
149
    SArray* pVgs = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
5✔
150

151
    int32_t numOfVgroups = taosArrayGetSize(pVgs);
5✔
152
    for (int32_t i = 0; i < numOfVgroups; i++) {
16✔
153
      SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
11✔
154

155
      for (int32_t j = 0; j < numOfSinkNodes; j++) {
18!
156
        SStreamTask* pSinkTask = taosArrayGetP(pSinkNodeList, j);
18✔
157
        if (pSinkTask->info.nodeId == pVgInfo->vgId) {
18✔
158
          pVgInfo->taskId = pSinkTask->id.taskId;
11✔
159
          break;
11✔
160
        }
161
      }
162
    }
163
  } else {
164
    SStreamTask* pOneSinkTask = taosArrayGetP(pSinkNodeList, 0);
×
165
    streamTaskSetFixedDownstreamInfo(pTask, pOneSinkTask);
×
166
  }
167

168
  TAOS_RETURN(code);
5✔
169
}
170

171
int32_t mndAssignStreamTaskToVgroup(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
10✔
172
  int32_t msgLen;
173

174
  pTask->info.nodeId = pVgroup->vgId;
10✔
175
  pTask->info.epSet = mndGetVgroupEpset(pMnode, pVgroup);
10✔
176

177
  plan->execNode.nodeId = pTask->info.nodeId;
10✔
178
  plan->execNode.epSet = pTask->info.epSet;
10✔
179
  return qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen);
10✔
180
}
181

182
SSnodeObj* mndSchedFetchOneSnode(SMnode* pMnode) {
2✔
183
  SSnodeObj* pObj = NULL;
2✔
184
  void*      pIter = NULL;
2✔
185
  // TODO random fetch
186
  pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void**)&pObj);
2✔
187
  sdbCancelFetch(pMnode->pSdb, pIter);
2✔
188
  return pObj;
2✔
189
}
190

191
int32_t mndAssignStreamTaskToSnode(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SSnodeObj* pSnode) {
2✔
192
  int32_t msgLen;
193

194
  pTask->info.nodeId = SNODE_HANDLE;
2✔
195
  pTask->info.epSet = mndAcquireEpFromSnode(pMnode, pSnode);
2✔
196

197
  plan->execNode.nodeId = SNODE_HANDLE;
2✔
198
  plan->execNode.epSet = pTask->info.epSet;
2✔
199
  mDebug("s-task:0x%x set the agg task to snode:%d", pTask->id.taskId, SNODE_HANDLE);
2!
200

201
  return qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen);
2✔
202
}
203

204
// random choose a node to do compute
205
SVgObj* mndSchedFetchOneVg(SMnode* pMnode, SStreamObj* pStream) {
1✔
206
  SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->sourceDb);
1✔
207
  if (pDbObj == NULL) {
1!
208
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
209
    return NULL;
×
210
  }
211

212
  if (pStream->indexForMultiAggBalance == -1) {
1!
213
    taosSeedRand(taosSafeRand());
1✔
214
    pStream->indexForMultiAggBalance = taosRand() % pDbObj->cfg.numOfVgroups;
1✔
215
  }
216

217
  int32_t index = 0;
1✔
218
  void*   pIter = NULL;
1✔
219
  SVgObj* pVgroup = NULL;
1✔
220
  while (1) {
221
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
3✔
222
    if (pIter == NULL) break;
3!
223
    if (pVgroup->dbUid != pStream->sourceDbUid) {
3✔
224
      sdbRelease(pMnode->pSdb, pVgroup);
2✔
225
      continue;
2✔
226
    }
227
    if (index++ == pStream->indexForMultiAggBalance) {
1!
228
      pStream->indexForMultiAggBalance++;
1✔
229
      pStream->indexForMultiAggBalance %= pDbObj->cfg.numOfVgroups;
1✔
230
      sdbCancelFetch(pMnode->pSdb, pIter);
1✔
231
      break;
1✔
232
    }
233
    sdbRelease(pMnode->pSdb, pVgroup);
×
234
  }
235
  sdbRelease(pMnode->pSdb, pDbObj);
1✔
236

237
  return pVgroup;
1✔
238
}
239

240
static void streamGetUidTaskList(SStreamObj* pStream, EStreamTaskType type, uint64_t* pUid, SArray*** pTaskList) {
21✔
241
  if (type == STREAM_NORMAL_TASK) {
21✔
242
    *pUid = pStream->uid;
16✔
243
    *pTaskList = taosArrayGetLast(pStream->pTaskList);
16✔
244
  } else if (type == STREAM_HISTORY_TASK || type == STREAM_RECALCUL_TASK) {
5!
245
    *pUid = pStream->hTaskUid;
5✔
246
    *pTaskList = taosArrayGetLast(pStream->pHTaskList);
5✔
247
  }
248
}
21✔
249

250
static int32_t doAddSinkTask(SStreamObj* pStream, SMnode* pMnode, SVgObj* pVgroup, SEpSet* pEpset, EStreamTaskType type) {
9✔
251
  uint64_t uid = 0;
9✔
252
  SArray** pTaskList = NULL;
9✔
253
  streamGetUidTaskList(pStream, type, &uid, &pTaskList);
9✔
254

255
  SStreamTask* pTask = NULL;
9✔
256
  int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, pEpset, type, pStream->conf.trigger, 0, *pTaskList, pStream->conf.fillHistory,
9✔
257
                                pStream->subTableWithoutMd5, 1, &pTask);
9✔
258
  if (code != 0) {
9!
259
    return code;
×
260
  }
261

262
  mDebug("doAddSinkTask taskId:%s, %p vgId:%d, isFillHistory:%d", pTask->id.idStr, pTask, pVgroup->vgId, (type == STREAM_HISTORY_TASK));
9!
263

264
  pTask->info.nodeId = pVgroup->vgId;
9✔
265
  pTask->info.epSet = mndGetVgroupEpset(pMnode, pVgroup);
9✔
266
  return mndSetSinkTaskInfo(pStream, pTask);
9✔
267
}
268

269
bool needHistoryTask(SStreamObj* pStream) {
44✔
270
  return (pStream->conf.fillHistory) || (pStream->conf.trigger == STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE);
44!
271
}
272

273
static int32_t doAddSinkTaskToVg(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset, SVgObj* vgObj) {
7✔
274
  int32_t code = doAddSinkTask(pStream, pMnode, vgObj, pEpset, STREAM_NORMAL_TASK);
7✔
275
  if (code != 0) {
7!
276
    return code;
×
277
  }
278

279
  if (needHistoryTask(pStream)) {
7✔
280
    EStreamTaskType type = (pStream->conf.trigger == STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE) ? STREAM_RECALCUL_TASK
4✔
281
                                                                                             : STREAM_HISTORY_TASK;
2!
282
    code = doAddSinkTask(pStream, pMnode, vgObj, pEpset, type);
2✔
283
    if (code != 0) {
2!
284
      return code;
×
285
    }
286
  }
287
  return TDB_CODE_SUCCESS;
7✔
288
}
289

290
// create sink node for each vgroup.
291
static int32_t doAddShuffleSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset) {
3✔
292
  SSdb* pSdb = pMnode->pSdb;
3✔
293
  void* pIter = NULL;
3✔
294

295
  while (1) {
9✔
296
    SVgObj* pVgroup = NULL;
12✔
297
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
12✔
298
    if (pIter == NULL) {
12✔
299
      break;
3✔
300
    }
301

302
    if (!mndVgroupInDb(pVgroup, pStream->targetDbUid)) {
9✔
303
      sdbRelease(pSdb, pVgroup);
2✔
304
      continue;
2✔
305
    }
306

307
    int32_t code = doAddSinkTaskToVg(pMnode, pStream, pEpset, pVgroup);
7✔
308
    if (code != 0) {
7!
309
      sdbRelease(pSdb, pVgroup);
×
310
      return code;
×
311
    }
312

313
    sdbRelease(pSdb, pVgroup);
7✔
314
  }
315

316
  return TDB_CODE_SUCCESS;
3✔
317
}
318

319
static int64_t getVgroupLastVer(const SArray* pList, int32_t vgId) {
9✔
320
  int32_t size = (int32_t) taosArrayGetSize(pList);
9✔
321
  for (int32_t i = 0; i < size; ++i) {
11✔
322
    SVgroupVer* pVer = taosArrayGet(pList, i);
6✔
323
    if (pVer->vgId == vgId) {
6✔
324
      return pVer->ver;
4✔
325
    }
326
  }
327

328
  mDebug("no data in vgId:%d for extract last version, set to be 0, total existed vgs:%d", vgId, size);
5!
329
  return 1;
5✔
330
}
331

332
static void streamTaskSetDataRange(SStreamTask* pTask, int64_t skey, SArray* pVerList, int32_t vgId) {
9✔
333
  int64_t latestVer = getVgroupLastVer(pVerList, vgId);
9✔
334
  if (latestVer < 0) {
9!
335
    latestVer = 0;
×
336
  }
337

338
  // set the correct ts, which is the last key of queried table.
339
  SDataRange*  pRange = &pTask->dataRange;
9✔
340
  STimeWindow* pWindow = &pRange->window;
9✔
341

342
  if (pTask->info.fillHistory == STREAM_HISTORY_TASK) {
9✔
343
    pWindow->skey = INT64_MIN;
2✔
344
    pWindow->ekey = skey - 1;
2✔
345

346
    pRange->range.minVer = 0;
2✔
347
    pRange->range.maxVer = latestVer;
2✔
348
    mDebug("add fill-history source task 0x%x timeWindow:%" PRId64 "-%" PRId64 " verRange:%" PRId64 "-%" PRId64,
2!
349
           pTask->id.taskId, pWindow->skey, pWindow->ekey, pRange->range.minVer, pRange->range.maxVer);
350
  } else {
351
    pWindow->skey = skey;
7✔
352
    pWindow->ekey = INT64_MAX;
7✔
353

354
    pRange->range.minVer = latestVer + 1;
7✔
355
    pRange->range.maxVer = INT64_MAX;
7✔
356

357
    mDebug("add source task 0x%x timeWindow:%" PRId64 "-%" PRId64 " verRange:%" PRId64 "-%" PRId64, pTask->id.taskId,
7!
358
           pWindow->skey, pWindow->ekey, pRange->range.minVer, pRange->range.maxVer);
359
  }
360
}
9✔
361

362
static void haltInitialTaskStatus(SStreamTask* pTask, SSubplan* pPlan, bool isFillhistoryTask) {
4✔
363
  bool hasCountWindowNode = isCountWindowStreamTask(pPlan);
4✔
364

365
  if (hasCountWindowNode && (!isFillhistoryTask)) {
4!
366
    SStreamStatus* pStatus = &pTask->status;
×
367
    mDebug("s-task:0x%x status set %s from %s for count window agg task with fill-history option set",
×
368
           pTask->id.taskId, streamTaskGetStatusStr(TASK_STATUS__HALT), streamTaskGetStatusStr(pStatus->taskStatus));
369
    pStatus->taskStatus = TASK_STATUS__HALT;
×
370
  }
371
}
4✔
372

373
static int32_t buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, EStreamTaskType type, bool useTriggerParam,
9✔
374
                               int8_t hasAggTasks, SStreamTask** pTask, SArray* pSourceTaskList) {
375
  uint64_t uid = 0;
9✔
376
  SArray** pTaskList = NULL;
9✔
377
  if (pSourceTaskList) {
9!
378
    uid = pStream->uid;
×
379
    pTaskList = &pSourceTaskList;
×
380
  } else {
381
    streamGetUidTaskList(pStream, type, &uid, &pTaskList);
9✔
382
  }
383

384
  int32_t trigger = 0;
9✔
385
  if (type == STREAM_RECALCUL_TASK) {
9!
386
    trigger = STREAM_TRIGGER_WINDOW_CLOSE;
×
387
  } else {
388
    trigger = pStream->conf.trigger;
9✔
389
  }
390

391
  int32_t triggerParam = useTriggerParam ? pStream->conf.triggerParam : 0;
9✔
392
  int32_t code =
393
      tNewStreamTask(uid, TASK_LEVEL__SOURCE, pEpset, type, trigger, triggerParam,
9✔
394
                     *pTaskList, pStream->conf.fillHistory, pStream->subTableWithoutMd5,
9✔
395
                     hasAggTasks, pTask);
396

397
  return code;
9✔
398
}
399

400
static int32_t addNewTaskList(SStreamObj* pStream) {
8✔
401
  SArray* pTaskList = taosArrayInit(0, POINTER_BYTES);
8✔
402
  if (pTaskList == NULL) {
8!
403
    mError("failed init task list, code:%s", tstrerror(terrno));
×
404
    return terrno;
×
405
  }
406

407
  if (taosArrayPush(pStream->pTaskList, &pTaskList) == NULL) {
16!
408
    mError("failed to put into array, code:%s", tstrerror(terrno));
×
409
    return terrno;
×
410
  }
411

412
  if (needHistoryTask(pStream)) {
8✔
413
    pTaskList = taosArrayInit(0, POINTER_BYTES);
3✔
414
    if (pTaskList == NULL) {
3!
415
      mError("failed init history task list, code:%s", tstrerror(terrno));
×
416
      return terrno;
×
417
    }
418

419
    if (taosArrayPush(pStream->pHTaskList, &pTaskList) == NULL) {
6!
420
      mError("failed to put into array, code:%s", tstrerror(terrno));
×
421
      return terrno;
×
422
    }
423
  }
424

425
  return TSDB_CODE_SUCCESS;
8✔
426
}
427

428
// set the history task id
429
static void setHTasksId(SStreamObj* pStream) {
3✔
430
  SArray* pTaskList = *(SArray**)taosArrayGetLast(pStream->pTaskList);
3✔
431
  SArray* pHTaskList = *(SArray**)taosArrayGetLast(pStream->pHTaskList);
3✔
432

433
  for (int32_t i = 0; i < taosArrayGetSize(pTaskList); ++i) {
8✔
434
    SStreamTask** pStreamTask = taosArrayGet(pTaskList, i);
5✔
435
    SStreamTask** pHTask = taosArrayGet(pHTaskList, i);
5✔
436

437
    (*pStreamTask)->hTaskInfo.id.taskId = (*pHTask)->id.taskId;
5✔
438
    (*pStreamTask)->hTaskInfo.id.streamId = (*pHTask)->id.streamId;
5✔
439

440
    (*pHTask)->streamTaskId.taskId = (*pStreamTask)->id.taskId;
5✔
441
    (*pHTask)->streamTaskId.streamId = (*pStreamTask)->id.streamId;
5✔
442

443
    mDebug("s-task:0x%" PRIx64 "-0x%x related history task:0x%" PRIx64 "-0x%x, level:%d", (*pStreamTask)->id.streamId,
5!
444
           (*pStreamTask)->id.taskId, (*pHTask)->id.streamId, (*pHTask)->id.taskId, (*pHTask)->info.taskLevel);
445
  }
446
}
3✔
447

448
static int32_t addSourceTaskVTableOutput(SStreamTask* pTask, SSHashObj* pVgTasks, SSHashObj* pVtables) {
×
449
  int32_t code = 0;
×
450
  int32_t lino = 0;
×
451
  int32_t taskNum = tSimpleHashGetSize(pVgTasks);
×
452
  int32_t tbNum = tSimpleHashGetSize(pVtables);
×
453

454
  SSHashObj *pTaskMap = tSimpleHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
×
455
  TSDB_CHECK_NULL(pTaskMap, code, lino, _end, terrno);
×
456

457
  pTask->outputInfo.type = TASK_OUTPUT__VTABLE_MAP;
×
458
  pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
×
459
  STaskDispatcherVtableMap *pDispatcher = &pTask->outputInfo.vtableMapDispatcher;
×
460
  pDispatcher->taskInfos = taosArrayInit(taskNum, sizeof(STaskDispatcherFixed));
×
461
  TSDB_CHECK_NULL(pDispatcher->taskInfos, code, lino, _end, terrno);
×
462
  pDispatcher->vtableMap = tSimpleHashInit(tbNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
×
463
  TSDB_CHECK_NULL(pDispatcher->vtableMap, code, lino, _end, terrno);
×
464

465
  int32_t               iter = 0, vgId = 0;
×
466
  uint64_t              uid = 0;
×
467
  void*                 p = NULL;
×
468
  while (NULL != (p = tSimpleHashIterate(pVtables, p, &iter))) {
×
469
    char* vgUid = tSimpleHashGetKey(p, NULL);
×
470
    vgId = *(int32_t*)vgUid;
×
471
    uid = *(uint64_t*)((int32_t*)vgUid + 1);
×
472
    
473
    void *px = tSimpleHashGet(pVgTasks, &vgId, sizeof(vgId));
×
474
    if (NULL == px) {
×
475
      mError("tSimpleHashGet vgId %d not found", vgId);
×
476
      return code;
×
477
    }
478
    SStreamTask* pMergeTask = *(SStreamTask**)px;
×
479
    if (pMergeTask == NULL) {
×
480
      mError("tSimpleHashGet pMergeTask %d not found", vgId);
×
481
      return code;
×
482
    }
483

484
    px = tSimpleHashGet(pTaskMap, &pMergeTask->id.taskId, sizeof(pMergeTask->id.taskId));
×
485
    int32_t idx = 0;
×
486
    if (px == NULL) {
×
487
      STaskDispatcherFixed addr = {
×
488
          .taskId = pMergeTask->id.taskId, .nodeId = pMergeTask->info.nodeId, .epSet = pMergeTask->info.epSet};
×
489
      px = taosArrayPush(pDispatcher->taskInfos, &addr);
×
490
      TSDB_CHECK_NULL(px, code, lino, _end, terrno);
×
491
      idx = taosArrayGetSize(pDispatcher->taskInfos) - 1;
×
492
      code = tSimpleHashPut(pTaskMap, &pMergeTask->id.taskId, sizeof(pMergeTask->id.taskId), &idx, sizeof(idx));
×
493
      if (code) {
×
494
        mError("tSimpleHashPut uid to task idx failed, error:%d", code);
×
495
        return code;
×
496
      }
497
    } else {
498
      idx = *(int32_t*)px;
×
499
    }
500

501
    code = tSimpleHashPut(pDispatcher->vtableMap, &uid, sizeof(int64_t), &idx, sizeof(int32_t));
×
502
    if (code) {
×
503
      mError("tSimpleHashPut uid to STaskDispatcherFixed failed, error:%d", code);
×
504
      return code;
×
505
    }
506

507
    code = streamTaskSetUpstreamInfo(pMergeTask, pTask);
×
508
    if (code != TSDB_CODE_SUCCESS) {
×
509
      mError("failed to set upstream info of merge task, error:%d", code);
×
510
      return code;
×
511
    }
512

513
    mDebug("source task[%s,vg:%d] add vtable output map, vuid %" PRIu64 " => [%d, vg:%d]", pTask->id.idStr,
×
514
           pTask->info.nodeId, uid, pMergeTask->id.taskId, pMergeTask->info.nodeId);
515
  }
516

517
_end:
×
518
  if (code != TSDB_CODE_SUCCESS) {
×
519
    mError("source task[%s,vg:%d] add vtable output map failed, lino:%d, error:%s", pTask->id.idStr, pTask->info.nodeId,
×
520
           lino, tstrerror(code));
521
  }
522
  if (pTaskMap != NULL) {
×
523
    tSimpleHashCleanup(pTaskMap);
×
524
  }
525
  return code;
×
526
}
527

528
static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, int64_t skey,
9✔
529
                               SArray* pVerList, SVgObj* pVgroup, EStreamTaskType type, bool useTriggerParam,
530
                               int8_t hasAggTasks, SSHashObj* pVgTasks, SArray* pSourceTaskList) {
531
  SStreamTask* pTask = NULL;
9✔
532
  int32_t code = buildSourceTask(pStream, pEpset, type, useTriggerParam, hasAggTasks, &pTask, pSourceTaskList);
9✔
533
  if (code != TSDB_CODE_SUCCESS) {
9!
534
    return code;
×
535
  }
536

537
  mDebug("doAddSourceTask taskId:%s, %p vgId:%d, historyTask:%d", pTask->id.idStr, pTask, pVgroup->vgId, (type == STREAM_HISTORY_TASK));
9!
538

539
  if (needHistoryTask(pStream)) {
9✔
540
    haltInitialTaskStatus(pTask, plan, (type == STREAM_HISTORY_TASK));
4✔
541
  }
542

543
  streamTaskSetDataRange(pTask, skey, pVerList, pVgroup->vgId);
9✔
544

545
  code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup);
9✔
546
  if (code != TSDB_CODE_SUCCESS) {
9!
547
    return code;
×
548
  }
549

550
  mTrace("souce task plan:%s", pTask->exec.qmsg);
9!
551

552
  if (pVgTasks) {
9!
553
    code = addSourceTaskVTableOutput(pTask, pVgTasks, plan->pVTables);
×
554
  }
555

556
  return code;
9✔
557
}
558

559
static SSubplan* getScanSubPlan(const SQueryPlan* pPlan) {
3✔
560
  int32_t        numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
3!
561
  SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, numOfPlanLevel - 1);
3✔
562
  if (LIST_LENGTH(inner->pNodeList) != 1) {
3!
563
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
564
    return NULL;
×
565
  }
566

567
  SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
3✔
568
  if (plan->subplanType != SUBPLAN_TYPE_SCAN) {
3!
569
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
570
    return NULL;
×
571
  }
572
  return plan;
3✔
573
}
574

575
static int32_t doAddMergeTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, SVgObj* pVgroup,
×
576
                              bool isHistoryTask, bool useTriggerParam, int8_t hasAggTasks, SArray* pVtables) {
577
  SStreamTask* pTask = NULL;
×
578
  SArray** pTaskList = taosArrayGetLast(pStream->pTaskList);
×
579

580
  int32_t code = tNewStreamTask(pStream->uid, TASK_LEVEL__MERGE, pEpset, isHistoryTask, pStream->conf.trigger,
×
581
                                useTriggerParam ? pStream->conf.triggerParam : 0, *pTaskList, pStream->conf.fillHistory,
×
582
                                pStream->subTableWithoutMd5, hasAggTasks, &pTask);
×
583
  if (code != TSDB_CODE_SUCCESS) {
×
584
    return code;
×
585
  }
586

587
  int32_t vtbNum = taosArrayGetSize(pVtables);
×
588
  pTask->pVTables = taosArrayInit(vtbNum, sizeof(SVCTableMergeInfo));
×
589
  if (NULL == pTask->pVTables) {
×
590
    code = terrno;
×
591
    mError("taosArrayInit %d SVCTableMergeInfo failed, error:%d", vtbNum, terrno);
×
592
    return code;
×
593
  }
594

595
  SVCTableMergeInfo tbInfo;
596
  for (int32_t i = 0; i < vtbNum; ++i) {
×
597
    SVCTableRefCols** pTb = taosArrayGet(pVtables, i);
×
598
    tbInfo.uid = (*pTb)->uid;
×
599
    tbInfo.numOfSrcTbls = (*pTb)->numOfSrcTbls;
×
600
    if (NULL == taosArrayPush(pTask->pVTables, &tbInfo)) {
×
601
      code = terrno;
×
602
      mError("taosArrayPush SVCTableMergeInfo failed, error:%d", terrno);
×
603
      return code;
×
604
    }
605

606
    mDebug("merge task[%s, vg:%d] add vtable info: vuid %" PRIu64 ", numOfSrcTbls:%d", 
×
607
        pTask->id.idStr, pVgroup->vgId, tbInfo.uid, tbInfo.numOfSrcTbls);
608
  }
609

610
  code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup);
×
611
  if (code != TSDB_CODE_SUCCESS) {
×
612
    return code;
×
613
  }
614

615
  return TDB_CODE_SUCCESS;
×
616
}
617

618
static SSubplan* getVTbScanSubPlan(const SQueryPlan* pPlan) {
×
619
  int32_t        numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
×
620
  SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, numOfPlanLevel - 2);
×
621
  if (LIST_LENGTH(inner->pNodeList) != 1) {
×
622
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
623
    return NULL;
×
624
  }
625

626
  SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
×
627
  if (plan->subplanType != SUBPLAN_TYPE_MERGE) {
×
628
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
629
    return NULL;
×
630
  }
631
  return plan;
×
632
}
633

634

635
static SSubplan* getAggSubPlan(const SQueryPlan* pPlan, int index) {
4✔
636
  SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, index);
4✔
637
  if (LIST_LENGTH(inner->pNodeList) != 1) {
4!
638
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
639
    return NULL;
×
640
  }
641

642
  SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
4✔
643
  if (plan->subplanType != SUBPLAN_TYPE_MERGE) {
4!
644
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
645
    return NULL;
×
646
  }
647
  return plan;
4✔
648
}
649

650
static int32_t addVTableMergeTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset,
×
651
                                  bool useTriggerParam, bool hasAggTasks, SCMCreateStreamReq* pCreate) {
652
  SVgObj* pVgroup = NULL;
×
653
  int32_t code = TSDB_CODE_SUCCESS;
×
654
  int32_t vgNum = taosArrayGetSize(pCreate->pVSubTables);
×
655
  
656
  for (int32_t i = 0; i < vgNum; ++i) {
×
657
    SVSubTablesRsp* pVg = (SVSubTablesRsp*)taosArrayGet(pCreate->pVSubTables, i);
×
658
    pVgroup = mndAcquireVgroup(pMnode, pVg->vgId);
×
659
    if (NULL == pVgroup) {
×
660
      mWarn("vnode %d in pVSubTables not found", pVg->vgId);
×
661
      continue;
×
662
    }
663

664
    code = doAddMergeTask(pMnode, plan, pStream, pEpset, pVgroup, false, useTriggerParam, hasAggTasks, pVg->pTables);
×
665
    if (code != 0) {
×
666
      mError("failed to create stream task, code:%s", tstrerror(code));
×
667

668
      mndReleaseVgroup(pMnode, pVgroup);
×
669
      return code;
×
670
    }
671

672
    mndReleaseVgroup(pMnode, pVgroup);
×
673
  }
674

675
  return code;
×
676
}
677

678
static int32_t buildMergeTaskHash(SArray* pMergeTaskList, SSHashObj** ppVgTasks) {
×
679
  int32_t code = 0;
×
680
  int32_t taskNum = taosArrayGetSize(pMergeTaskList);
×
681

682
  *ppVgTasks = tSimpleHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
×
683
  if (NULL == *ppVgTasks) {
×
684
    code = terrno;
×
685
    mError("tSimpleHashInit %d failed", taskNum);
×
686
    return code;
×
687
  }
688
  
689
  for (int32_t i = 0; i < taskNum; ++i) {
×
690
    SStreamTask* pTask = taosArrayGetP(pMergeTaskList, i);
×
691

692
    code = tSimpleHashPut(*ppVgTasks, &pTask->info.nodeId, sizeof(pTask->info.nodeId), &pTask, POINTER_BYTES);
×
693
    if (code) {
×
694
      mError("tSimpleHashPut %d STaskDispatcherFixed failed", i);
×
695
      return code;
×
696
    }
697
  }
698

699
  return code;
×
700
}
701

702
static int32_t addVTableSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset,
×
703
                                   int64_t nextWindowSkey, SArray* pVerList, bool useTriggerParam, bool hasAggTasks,
704
                                   SCMCreateStreamReq* pCreate, SSHashObj* pVTableMap, SArray* pSourceTaskList,
705
                                   SArray* pMergeTaskList) {
706
  int32_t code = 0;
×
707
  SSHashObj* pVgTasks = NULL;
×
708
  int32_t vgId = 0;
×
709
  int32_t iter = 0;
×
710
  SVgObj* pVgroup = NULL;
×
711
  void* p = NULL;
×
712

713
  code = buildMergeTaskHash(pMergeTaskList, &pVgTasks);
×
714
  if (code) {
×
715
    tSimpleHashCleanup(pVgTasks);
×
716
    return code;
×
717
  }
718
  
719
  while (NULL != (p = tSimpleHashIterate(pVTableMap, p, &iter))) {
×
720
    char* pDbVg = tSimpleHashGetKey(p, NULL);
×
721
    char* pVgStr = strrchr(pDbVg, '.');
×
722
    if (NULL == pVgStr) {
×
723
      mError("Invalid DbVg string: %s", pDbVg);
×
724
      tSimpleHashCleanup(pVgTasks);
×
725
      return TSDB_CODE_MND_INTERNAL_ERROR;
×
726
    }
727

728
    (void)taosStr2int32(pVgStr + 1, &vgId);
×
729
    
730
    pVgroup = mndAcquireVgroup(pMnode, vgId);
×
731
    if (NULL == pVgroup) {
×
732
      mWarn("vnode %d not found", vgId);
×
733
      continue;
×
734
    }
735

736
    plan->pVTables = *(SSHashObj**)p;
×
737
    code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, false, useTriggerParam,
×
738
                           hasAggTasks, pVgTasks, pSourceTaskList);
739
    plan->pVTables = NULL;
×
740
    if (code != 0) {
×
741
      mError("failed to create stream task, code:%s", tstrerror(code));
×
742

743
      mndReleaseVgroup(pMnode, pVgroup);
×
744
      tSimpleHashCleanup(pVgTasks);
×
745
      return code;
×
746
    }
747

748
    mndReleaseVgroup(pMnode, pVgroup);
×
749
  }
750

751
  tSimpleHashCleanup(pVgTasks);
×
752

753
  return TSDB_CODE_SUCCESS;
×
754
}
755

756
static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset,
3✔
757
                             int64_t nextWindowSkey, SArray* pVerList, bool useTriggerParam, bool hasAggTasks) {
758
  void*   pIter = NULL;
3✔
759
  SSdb*   pSdb = pMnode->pSdb;
3✔
760
  int32_t code = addNewTaskList(pStream);
3✔
761
  if (code) {
3!
762
    return code;
×
763
  }
764

765
  while (1) {
9✔
766
    SVgObj* pVgroup = NULL;
12✔
767
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
12✔
768
    if (pIter == NULL) {
12✔
769
      break;
3✔
770
    }
771

772
    if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) {
9✔
773
      sdbRelease(pSdb, pVgroup);
2✔
774
      continue;
2✔
775
    }
776

777
    code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, STREAM_NORMAL_TASK,
7✔
778
                           useTriggerParam, hasAggTasks, NULL, NULL);
779
    if (code != 0) {
7!
780
      mError("failed to create stream task, code:%s", tstrerror(code));
×
781

782
      mndReleaseVgroup(pMnode, pVgroup);
×
783
      return code;
×
784
    }
785

786
    if (needHistoryTask(pStream)) {
7✔
787
      EStreamTaskType type = 0;
2✔
788
      if (pStream->conf.trigger == STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE && (pStream->conf.fillHistory == 0)) {
2!
789
        type = STREAM_RECALCUL_TASK; // only the recalculating task
×
790
      } else {
791
        type = STREAM_HISTORY_TASK; // set the fill-history option
2✔
792
      }
793

794
      code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, type,
2✔
795
                             useTriggerParam, hasAggTasks, NULL, NULL);
796
      if (code != 0) {
2!
797
        sdbRelease(pSdb, pVgroup);
×
798
        return code;
×
799
      }
800
    }
801

802
    sdbRelease(pSdb, pVgroup);
7✔
803
  }
804

805
  if (needHistoryTask(pStream)) {
3✔
806
    setHTasksId(pStream);
1✔
807
  }
808

809
  return TSDB_CODE_SUCCESS;
3✔
810
}
811

812
static int32_t buildAggTask(SStreamObj* pStream, SEpSet* pEpset, EStreamTaskType type, bool useTriggerParam,
3✔
813
                            SStreamTask** pAggTask) {
814
  *pAggTask = NULL;
3✔
815

816
  uint64_t uid = 0;
3✔
817
  SArray** pTaskList = NULL;
3✔
818
  streamGetUidTaskList(pStream, type, &uid, &pTaskList);
3✔
819

820
  int64_t triggerParam = useTriggerParam? pStream->conf.triggerParam:0;
3!
821
  int32_t code = tNewStreamTask(uid, TASK_LEVEL__AGG, pEpset, type, pStream->conf.trigger,
3✔
822
                                triggerParam, *pTaskList, pStream->conf.fillHistory,
3✔
823
                                pStream->subTableWithoutMd5, 1, pAggTask);
3✔
824
  return code;
3✔
825
}
826

827
static int32_t doAddAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset, SVgObj* pVgroup,
3✔
828
                            SSnodeObj* pSnode, EStreamTaskType type, bool useTriggerParam) {
829
  int32_t      code = 0;
3✔
830
  SStreamTask* pTask = NULL;
3✔
831
  const char*  id = NULL;
3✔
832

833
  code = buildAggTask(pStream, pEpset, type, useTriggerParam, &pTask);
3✔
834
  if (code != TSDB_CODE_SUCCESS) {
3!
835
    return code;
×
836
  }
837

838
  id = pTask->id.idStr;
3✔
839
  if (pSnode != NULL) {
3✔
840
    code = mndAssignStreamTaskToSnode(pMnode, pTask, plan, pSnode);
2✔
841
    mDebug("doAddAggTask taskId:%s, %p snode id:%d, isFillHistory:%d", id, pTask, pSnode->id, (type == STREAM_HISTORY_TASK));
2!
842
  } else {
843
    code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup);
1✔
844
    mDebug("doAddAggTask taskId:%s, %p vgId:%d, isFillHistory:%d", id, pTask, pVgroup->vgId, (type == STREAM_HISTORY_TASK));
1!
845
  }
846
  return code;
3✔
847
}
848

849
static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset, bool useTriggerParam) {
2✔
850
  SVgObj*    pVgroup = NULL;
2✔
851
  SSnodeObj* pSnode = NULL;
2✔
852
  int32_t    code = 0;
2✔
853
  if (tsDeployOnSnode) {
2!
854
    pSnode = mndSchedFetchOneSnode(pMnode);
2✔
855
    if (pSnode == NULL) {
2✔
856
      pVgroup = mndSchedFetchOneVg(pMnode, pStream);
1✔
857
    }
858
  } else {
859
    pVgroup = mndSchedFetchOneVg(pMnode, pStream);
×
860
  }
861

862
  code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, STREAM_NORMAL_TASK, useTriggerParam);
2✔
863
  if (code != 0) {
2!
864
    goto END;
×
865
  }
866

867
  if (needHistoryTask(pStream)) {
2✔
868
    EStreamTaskType type = 0;
1✔
869
    if (pStream->conf.trigger == STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE && (pStream->conf.fillHistory == 0)) {
1!
870
      type = STREAM_RECALCUL_TASK;  // only the recalculating task
×
871
    } else {
872
      type = STREAM_HISTORY_TASK;  // set the fill-history option
1✔
873
    }
874
    code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, type, useTriggerParam);
1✔
875
    if (code != 0) {
1!
876
      goto END;
×
877
    }
878

879
    setHTasksId(pStream);
1✔
880
  }
881

882
END:
1✔
883
  if (pSnode != NULL) {
2✔
884
    sdbRelease(pMnode->pSdb, pSnode);
1✔
885
  } else {
886
    sdbRelease(pMnode->pSdb, pVgroup);
1✔
887
  }
888
  return code;
2✔
889
}
890

891
static int32_t addSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset) {
3✔
892
  int32_t code = addNewTaskList(pStream);
3✔
893
  if (code) {
3!
894
    return code;
×
895
  }
896

897
  if (pStream->fixedSinkVgId == 0) {
3!
898
    code = doAddShuffleSinkTask(pMnode, pStream, pEpset);
3✔
899
    if (code != 0) {
3!
900
      return code;
×
901
    }
902
  } else {
903
    code = doAddSinkTaskToVg(pMnode, pStream, pEpset, &pStream->fixedSinkVg);
×
904
    if (code != 0) {
×
905
      return code;
×
906
    }
907
  }
908

909
  if (needHistoryTask(pStream)) {
3✔
910
    setHTasksId(pStream);
1✔
911
  }
912

913
  return TDB_CODE_SUCCESS;
3✔
914
}
915

916
static void bindTaskToSinkTask(SStreamObj* pStream, SMnode* pMnode, SArray* pSinkTaskList, SStreamTask* task) {
5✔
917
  int32_t code = 0;
5✔
918
  if ((code = mndAddDispatcherForInternalTask(pMnode, pStream, pSinkTaskList, task)) != 0) {
5!
919
    mError("failed bind task to sink task since %s", tstrerror(code));
×
920
  }
921
  for (int32_t k = 0; k < taosArrayGetSize(pSinkTaskList); k++) {
16✔
922
    SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, k);
11✔
923
    if ((code = streamTaskSetUpstreamInfo(pSinkTask, task)) != 0) {
11!
924
      mError("failed bind task to sink task since %s", tstrerror(code));
×
925
    }
926
  }
927
  mDebug("bindTaskToSinkTask taskId:%s to sink task list", task->id.idStr);
5!
928
}
5✔
929

930
static void bindAggSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks) {
3✔
931
  SArray*  pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL);
3✔
932
  SArray** pAggTaskList = taosArrayGetLast(tasks);
3✔
933

934
  for (int i = 0; i < taosArrayGetSize(*pAggTaskList); i++) {
6✔
935
    SStreamTask* pAggTask = taosArrayGetP(*pAggTaskList, i);
3✔
936
    bindTaskToSinkTask(pStream, pMnode, pSinkTaskList, pAggTask);
3✔
937
    mDebug("bindAggSink taskId:%s to sink task list", pAggTask->id.idStr);
3!
938
  }
939
}
3✔
940

941
static void bindSourceSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks, bool hasExtraSink) {
1✔
942
  int32_t code = 0;
1✔
943
  SArray* pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL);
1✔
944
  SArray* pSourceTaskList = taosArrayGetP(tasks, hasExtraSink ? SINK_NODE_LEVEL + 1 : SINK_NODE_LEVEL);
1✔
945

946
  for (int i = 0; i < taosArrayGetSize(pSourceTaskList); i++) {
3✔
947
    SStreamTask* pSourceTask = taosArrayGetP(pSourceTaskList, i);
2✔
948
    mDebug("bindSourceSink taskId:%s to sink task list", pSourceTask->id.idStr);
2!
949

950
    if (hasExtraSink) {
2!
951
      bindTaskToSinkTask(pStream, pMnode, pSinkTaskList, pSourceTask);
2✔
952
    } else {
953
      if ((code = mndSetSinkTaskInfo(pStream, pSourceTask)) != 0) {
×
954
        mError("failed bind task to sink task since %s", tstrerror(code));
×
955
      }
956
    }
957
  }
958
}
1✔
959

960
static void bindVtableMergeSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks, bool hasExtraSink) {
×
961
  int32_t code = 0;
×
962
  SArray* pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL);
×
963
  SArray* pMergeTaskList = taosArrayGetP(tasks, hasExtraSink ? SINK_NODE_LEVEL + 2 : SINK_NODE_LEVEL + 1);
×
964

965
  for (int i = 0; i < taosArrayGetSize(pMergeTaskList); i++) {
×
966
    SStreamTask* pMergeTask = taosArrayGetP(pMergeTaskList, i);
×
967
    mDebug("bindVtableMergeSink taskId:%s to sink task list", pMergeTask->id.idStr);
×
968

969
    if (hasExtraSink) {
×
970
      bindTaskToSinkTask(pStream, pMnode, pSinkTaskList, pMergeTask);
×
971
    } else {
972
      if ((code = mndSetSinkTaskInfo(pStream, pMergeTask)) != 0) {
×
973
        mError("failed bind task to sink task since %s", tstrerror(code));
×
974
      }
975
    }
976
  }
977
}
×
978

979

980
static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) {
3✔
981
  int32_t code = 0;
3✔
982
  size_t size = taosArrayGetSize(tasks);
3✔
983
  if (size < 2) {
3!
984
    mError("task list size is less than 2");
×
985
    return;
×
986
  }
987
  SArray* pDownTaskList = taosArrayGetP(tasks, size - 1);
3✔
988
  SArray* pUpTaskList = taosArrayGetP(tasks, size - 2);
3✔
989

990
  SStreamTask** pDownTask = taosArrayGetLast(pDownTaskList);
3✔
991
  end = end > taosArrayGetSize(pUpTaskList) ? taosArrayGetSize(pUpTaskList) : end;
3!
992
  for (int i = begin; i < end; i++) {
10✔
993
    SStreamTask* pUpTask = taosArrayGetP(pUpTaskList, i);
7✔
994
    pUpTask->info.selfChildId = i - begin;
7✔
995
    streamTaskSetFixedDownstreamInfo(pUpTask, *pDownTask);
7✔
996
    if ((code = streamTaskSetUpstreamInfo(*pDownTask, pUpTask)) != 0) {
7!
997
      mError("failed bind task to sink task since %s", tstrerror(code));
×
998
    }
999
  }
1000
  mDebug("bindTwoLevel task list(%d-%d) to taskId:%s", begin, end - 1, (*(pDownTask))->id.idStr);
3!
1001
}
1002

1003
int32_t tableHashValueComp(void const* lp, void const* rp) {
×
1004
  uint32_t*    key = (uint32_t*)lp;
×
1005
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
×
1006

1007
  if (*key < pVg->hashBegin) {
×
1008
    return -1;
×
1009
  } else if (*key > pVg->hashEnd) {
×
1010
    return 1;
×
1011
  }
1012

1013
  return 0;
×
1014
}
1015

1016

1017
int dbVgInfoComp(const void* lp, const void* rp) {
×
1018
  SVGroupHashInfo* pLeft = (SVGroupHashInfo*)lp;
×
1019
  SVGroupHashInfo* pRight = (SVGroupHashInfo*)rp;
×
1020
  if (pLeft->hashBegin < pRight->hashBegin) {
×
1021
    return -1;
×
1022
  } else if (pLeft->hashBegin > pRight->hashBegin) {
×
1023
    return 1;
×
1024
  }
1025

1026
  return 0;
×
1027
}
1028

1029
int32_t getTableVgId(SDBVgHashInfo* dbInfo, int32_t acctId, char* dbFName, int32_t* vgId, char *tbName) {
×
1030
  int32_t code = 0;
×
1031
  int32_t lino = 0;
×
1032
  SVgroupInfo* vgInfo = NULL;
×
1033
  char         tbFullName[TSDB_TABLE_FNAME_LEN];
1034
  (void)snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbFName, tbName);
×
1035
  uint32_t hashValue = taosGetTbHashVal(tbFullName, (uint32_t)strlen(tbFullName), dbInfo->hashMethod,
×
1036
                                        dbInfo->hashPrefix, dbInfo->hashSuffix);
×
1037

1038
  if (!dbInfo->vgSorted) {
×
1039
    taosArraySort(dbInfo->vgArray, dbVgInfoComp);
×
1040
    dbInfo->vgSorted = true;
×
1041
  }
1042

1043
  vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, tableHashValueComp, TD_EQ);
×
1044
  if (NULL == vgInfo) {
×
1045
    qError("no hash range found for hash value [%u], dbFName:%s, numOfVgId:%d", hashValue, dbFName,
×
1046
             (int32_t)taosArrayGetSize(dbInfo->vgArray));
1047
    return TSDB_CODE_INVALID_PARA;
×
1048
  }
1049

1050
  *vgId = vgInfo->vgId;
×
1051

1052
_return:
×
1053

1054
  return code;
×
1055
}
1056

1057

1058
static void destroyVSubtableVtb(SSHashObj *pVtable) {
×
1059
  int32_t iter = 0;
×
1060
  void* p = NULL;
×
1061
  while (NULL != (p = tSimpleHashIterate(pVtable, p, &iter))) {
×
1062
    taosArrayDestroy(*(SArray**)p);
×
1063
  }
1064

1065
  tSimpleHashCleanup(pVtable);
×
1066
}
×
1067

1068
static void destroyVSubtableVgHash(SSHashObj *pVg) {
×
1069
  int32_t iter = 0;
×
1070
  SSHashObj** pVtable = NULL;
×
1071
  void* p = NULL;
×
1072
  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
×
1073
    pVtable = (SSHashObj**)p;
×
1074
    destroyVSubtableVtb(*pVtable);
×
1075
  }
1076

1077
  tSimpleHashCleanup(pVg);
×
1078
}
×
1079

1080
static void destroyDbVgroupsHash(SSHashObj *pDbVgs) {
×
1081
  int32_t iter = 0;
×
1082
  SDBVgHashInfo* pVg = NULL;
×
1083
  void* p = NULL;
×
1084
  while (NULL != (p = tSimpleHashIterate(pDbVgs, p, &iter))) {
×
1085
    pVg = (SDBVgHashInfo*)p;
×
1086
    taosArrayDestroy(pVg->vgArray);
×
1087
  }
1088
  
1089
  tSimpleHashCleanup(pDbVgs);
×
1090
}
×
1091

1092
static int32_t buildDBVgroupsMap(SMnode* pMnode, SSHashObj* pDbVgroup) {
×
1093
  void*   pIter = NULL;
×
1094
  SSdb*   pSdb = pMnode->pSdb;
×
1095
  int32_t code = TSDB_CODE_SUCCESS;
×
1096
  char    key[TSDB_DB_NAME_LEN + 32];
1097
  SArray* pTarget = NULL;
×
1098
  SArray* pNew = NULL;
×
1099
  SDbObj* pDb = NULL;
×
1100
  SDBVgHashInfo dbInfo = {0}, *pDbInfo = NULL;
×
1101

1102
  while (1) {
×
1103
    SVgObj* pVgroup = NULL;
×
1104
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
×
1105
    if (pIter == NULL) {
×
1106
      break;
×
1107
    }
1108

1109
    pDbInfo = (SDBVgHashInfo*)tSimpleHashGet(pDbVgroup, pVgroup->dbName, strlen(pVgroup->dbName) + 1);
×
1110
    if (NULL == pDbInfo) {
×
1111
      pNew = taosArrayInit(20, sizeof(SVGroupHashInfo));
×
1112
      if (NULL == pNew) {
×
1113
        code = terrno;
×
1114
        mError("taosArrayInit SVGroupHashInfo failed, code:%s", tstrerror(terrno));
×
1115
        sdbRelease(pSdb, pVgroup);
×
1116
        return code;
×
1117
      }      
1118

1119
      pDb = mndAcquireDb(pMnode, pVgroup->dbName);
×
1120
      if (pDb == NULL) {
×
1121
        code = terrno;
×
1122
        mError("mndAcquireDb %s failed, code:%s", pVgroup->dbName, tstrerror(terrno));
×
1123
        sdbRelease(pSdb, pVgroup);
×
1124
        return code;
×
1125
      }
1126

1127
      dbInfo.vgSorted = false;
×
1128
      dbInfo.hashMethod = pDb->cfg.hashMethod;
×
1129
      dbInfo.hashPrefix = pDb->cfg.hashPrefix;
×
1130
      dbInfo.hashSuffix = pDb->cfg.hashSuffix;
×
1131
      dbInfo.vgArray = pNew;
×
1132
      
1133
      mndReleaseDb(pMnode, pDb);
×
1134

1135
      pTarget = pNew;
×
1136
    } else {
1137
      pTarget = pDbInfo->vgArray;
×
1138
    }
1139

1140
    SVGroupHashInfo vgInfo = {.vgId = pVgroup->vgId, .hashBegin = pVgroup->hashBegin, .hashEnd = pVgroup->hashEnd};
×
1141
    if (NULL == taosArrayPush(pTarget, &vgInfo)) {
×
1142
      code = terrno;
×
1143
      mError("taosArrayPush SVGroupHashInfo failed, code:%s", tstrerror(terrno));
×
1144
      taosArrayDestroy(pNew);
×
1145
      sdbRelease(pSdb, pVgroup);
×
1146
      return code;
×
1147
    }
1148

1149
    if (NULL == pDbInfo) {
×
1150
      code = tSimpleHashPut(pDbVgroup, pVgroup->dbName, strlen(pVgroup->dbName) + 1, &dbInfo, sizeof(dbInfo));
×
1151
      if (code != 0) {
×
1152
        mError("tSimpleHashPut SDBVgHashInfo failed, code:%s", tstrerror(code));
×
1153
        taosArrayDestroy(pNew);
×
1154
        sdbRelease(pSdb, pVgroup);
×
1155
        return code;
×
1156
      }
1157
      
1158
      pNew = NULL;
×
1159
    }
1160

1161
    sdbRelease(pSdb, pVgroup);
×
1162
  }
1163

1164
  return code;
×
1165
}
1166

1167
static int32_t addVTableToVnode(SSHashObj* pVg, int32_t vvgId, uint64_t vuid, SRefColInfo* pCol, SStreamVBuildCtx* pCtx) {
×
1168
  int32_t code = TSDB_CODE_SUCCESS;
×
1169
  int32_t lino = 0;
×
1170
  SSHashObj* pNewVtable = NULL;
×
1171
  SArray* pNewOtable = NULL, *pTarOtable = NULL;
×
1172
  SColIdName col;
1173
  char vId[sizeof(int32_t) + sizeof(uint64_t)];
1174
  *(int32_t*)vId = vvgId;
×
1175
  *(uint64_t*)((int32_t*)vId + 1) = vuid;
×
1176

1177
  pCtx->lastUid = vuid;
×
1178

1179
  SSHashObj** pVtable = (SSHashObj**)tSimpleHashGet(pVg, vId, sizeof(vId));
×
1180
  if (NULL == pVtable) {
×
1181
    pNewVtable = (SSHashObj*)tSimpleHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
×
1182
    TSDB_CHECK_NULL(pNewVtable, code, lino, _return, terrno);
×
1183
    pNewOtable = taosArrayInit(4, sizeof(SColIdName));
×
1184
    TSDB_CHECK_NULL(pNewOtable, code, lino, _return, terrno);
×
1185
    tSimpleHashSetFreeFp(pNewVtable, tFreeStreamVtbOtbInfo);
×
1186
    col.colId = pCol->colId;
×
1187
    col.colName = taosStrdup(pCol->refColName);
×
1188
    TSDB_CHECK_NULL(col.colName, code, lino, _return, terrno);
×
1189
    TSDB_CHECK_NULL(taosArrayPush(pNewOtable, &col), code, lino, _return, terrno);
×
1190
    TSDB_CHECK_CODE(tSimpleHashPut(pNewVtable, pCol->refTableName, strlen(pCol->refTableName) + 1, &pNewOtable, POINTER_BYTES), lino, _return);
×
1191
    TSDB_CHECK_CODE(tSimpleHashPut(pVg, vId, sizeof(vId), &pNewVtable, POINTER_BYTES), lino, _return);
×
1192

1193
    pCtx->lastVtable = pNewVtable;
×
1194
    pCtx->lastOtable = pNewOtable;
×
1195

1196
    return code;
×
1197
  }
1198
  
1199
  SArray** pOtable = tSimpleHashGet(*pVtable, pCol->refTableName, strlen(pCol->refTableName) + 1);
×
1200
  if (NULL == pOtable) {
×
1201
    pNewOtable = taosArrayInit(4, sizeof(SColIdName));
×
1202
    TSDB_CHECK_NULL(pNewOtable, code, lino, _return, terrno);
×
1203
    pTarOtable = pNewOtable;
×
1204
  } else {
1205
    pTarOtable = *pOtable;
×
1206
  }
1207
  
1208
  col.colId = pCol->colId;
×
1209
  col.colName = taosStrdup(pCol->refColName);
×
1210
  TSDB_CHECK_NULL(col.colName, code, lino, _return, terrno);  
×
1211
  TSDB_CHECK_NULL(taosArrayPush(pTarOtable, &col), code, lino, _return, terrno);
×
1212
  if (NULL == pOtable) {
×
1213
    TSDB_CHECK_CODE(tSimpleHashPut(*pVtable, pCol->refTableName, strlen(pCol->refTableName) + 1, &pNewOtable, POINTER_BYTES), lino, _return);
×
1214
  }
1215

1216
  pCtx->lastVtable = *pVtable;
×
1217
  pCtx->lastOtable = pTarOtable;
×
1218

1219
_return:
×
1220

1221
  if (code) {
×
1222
    mError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1223
  }
1224

1225
  return code;
×
1226
}
1227

1228
static int32_t addVgroupToRes(char* fDBName, int32_t vvgId, uint64_t vuid, SRefColInfo* pCol, SDBVgHashInfo* pDb, SSHashObj* pRes, SStreamVBuildCtx* pCtx) {
×
1229
  int32_t code = TSDB_CODE_SUCCESS;
×
1230
  int32_t lino = 0;
×
1231
  int32_t vgId = 0;
×
1232
  char dbVgId[TSDB_DB_NAME_LEN + 32];
1233
  SSHashObj *pTarVg = NULL, *pNewVg = NULL;
×
1234
  
1235
  TSDB_CHECK_CODE(getTableVgId(pDb, 1, fDBName, &vgId, pCol->refTableName), lino, _return);
×
1236

1237
  snprintf(dbVgId, sizeof(dbVgId), "%s.%d", pCol->refDbName, vgId);
×
1238

1239
  SSHashObj** pVg = (SSHashObj**)tSimpleHashGet(pRes, dbVgId, strlen(dbVgId) + 1);
×
1240
  if (NULL == pVg) {
×
1241
    pNewVg = tSimpleHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
×
1242
    TSDB_CHECK_NULL(pNewVg, code, lino, _return, terrno);
×
1243
    tSimpleHashSetFreeFp(pNewVg, tFreeStreamVtbVtbInfo);
×
1244
    pTarVg = pNewVg;
×
1245
  } else {
1246
    pTarVg = *pVg;
×
1247
  }
1248

1249
  TSDB_CHECK_CODE(addVTableToVnode(pTarVg, vvgId, vuid, pCol, pCtx), lino, _return);
×
1250

1251
  if (NULL == pVg) {
×
1252
    TSDB_CHECK_CODE(tSimpleHashPut(pRes, dbVgId, strlen(dbVgId) + 1, &pNewVg, POINTER_BYTES), lino, _return);
×
1253
    pNewVg = NULL;
×
1254
  }
1255

1256
  pCtx->lastVg = pTarVg;
×
1257

1258
_return:
×
1259

1260
  if (code) {
×
1261
    mError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1262
  }
1263

1264
  destroyVSubtableVgHash(pNewVg);
×
1265

1266
  return code;
×
1267
}
1268

1269
static int32_t addRefColToMap(int32_t vvgId, uint64_t vuid, SRefColInfo* pCol, SSHashObj* pDbVgroups, SSHashObj* pRes, SStreamVBuildCtx* pCtx) {
×
1270
  int32_t code = TSDB_CODE_SUCCESS;
×
1271
  int32_t lino = 0;
×
1272
  bool isLastVtable = vuid == pCtx->lastUid;
×
1273
  SSHashObj* currOtable = NULL;
×
1274
  SColIdName col;
1275
  char fDBName[TSDB_DB_FNAME_LEN];
1276
  
1277
  if (pCtx->lastCol && pCtx->lastCol->refDbName[0] == pCol->refDbName[0] && pCtx->lastCol->refTableName[0] == pCol->refTableName[0] &&
×
1278
     0 == strcmp(pCtx->lastCol->refDbName, pCol->refDbName) && 0 == strcmp(pCtx->lastCol->refTableName, pCol->refTableName)) {
×
1279
    if (isLastVtable) {
×
1280
      col.colId = pCol->colId;
×
1281
      col.colName = taosStrdup(pCol->refColName);
×
1282
      TSDB_CHECK_NULL(col.colName, code, lino, _return, terrno);
×
1283
      TSDB_CHECK_NULL(taosArrayPush(pCtx->lastOtable, &col), code, lino, _return, terrno);
×
1284
      return code;
×
1285
    }
1286

1287
    TSDB_CHECK_CODE(addVTableToVnode(pCtx->lastVg, vvgId, vuid, pCol, pCtx), lino, _return);
×
1288
    return code;
×
1289
  }
1290

1291
  snprintf(fDBName, sizeof(fDBName), "1.%s", pCol->refDbName);
×
1292
  SDBVgHashInfo* pDb = (SDBVgHashInfo*)tSimpleHashGet(pDbVgroups, fDBName, strlen(fDBName) + 1);
×
1293
  if (NULL == pDb) {
×
1294
    mError("refDb %s does not exist", pCol->refDbName);
×
1295
    code = TSDB_CODE_MND_DB_NOT_EXIST;
×
1296
    goto _return;
×
1297
  }
1298

1299
  TSDB_CHECK_CODE(addVgroupToRes(fDBName, vvgId, vuid, pCol, pDb, pRes, pCtx), lino, _return);
×
1300

1301
  pCtx->lastCol = pCol;
×
1302

1303
_return:
×
1304

1305
  if (code) {
×
1306
    mError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1307
  }
1308

1309
  return code;
×
1310
}
1311

1312
static int32_t buildVSubtableMap(SMnode* pMnode, SArray* pVSubTables, SSHashObj** ppRes) {
×
1313
  int32_t code = 0;
×
1314
  int32_t lino = 0;
×
1315

1316
  SSHashObj* pDbVgroups = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
×
1317
  if (NULL == pDbVgroups) {
×
1318
    mError("tSimpleHashInit failed, error:%s", tstrerror(terrno));
×
1319
    return terrno;
×
1320
  }
1321
  
1322
  TAOS_CHECK_EXIT(buildDBVgroupsMap(pMnode, pDbVgroups));
×
1323

1324
  *ppRes = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
×
1325
  if (NULL == *ppRes) {
×
1326
    code = terrno;
×
1327
    mError("tSimpleHashInit failed, error:%s", tstrerror(terrno));
×
1328
    goto _exit;
×
1329
  }
1330
  tSimpleHashSetFreeFp(*ppRes, tFreeStreamVtbDbVgInfo);
×
1331

1332
  SStreamVBuildCtx ctx = {0};
×
1333
  int32_t vgNum = taosArrayGetSize(pVSubTables);
×
1334
  for (int32_t i = 0; i < vgNum; ++i) {
×
1335
    SVSubTablesRsp* pVgTbs = taosArrayGet(pVSubTables, i);
×
1336
    int32_t tbNum = taosArrayGetSize(pVgTbs->pTables);
×
1337
    for (int32_t n = 0; n < tbNum; ++n) {
×
1338
      SVCTableRefCols* pTb = (SVCTableRefCols*)taosArrayGetP(pVgTbs->pTables, n);
×
1339
      for (int32_t m = 0; m < pTb->numOfColRefs; ++m) {
×
1340
        SRefColInfo* pCol = pTb->refCols + m;
×
1341
        TAOS_CHECK_EXIT(addRefColToMap(pVgTbs->vgId, pTb->uid, pCol, pDbVgroups, *ppRes, &ctx));
×
1342
      }
1343
    }
1344
  }
1345

1346
_exit:
×
1347

1348
  if (code) {
×
1349
    mError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1350
  }
1351

1352
  destroyDbVgroupsHash(pDbVgroups);
×
1353

1354
  return code;
×
1355
}
1356

1357
static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, SEpSet* pEpset, SCMCreateStreamReq* pCreate) {
3✔
1358
  int32_t code = 0;
3✔
1359
  bool    isVTableStream = (NULL != pCreate->pVSubTables);
3✔
1360
  int64_t skey = pCreate->lastTs;
3✔
1361
  SArray* pVerList = pCreate->pVgroupVerList;
3✔
1362
  SSdb*   pSdb = pMnode->pSdb;
3✔
1363
  int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
3!
1364
  bool    hasExtraSink = false;
3✔
1365
  bool    externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
3✔
1366
  SSubplan* plan = NULL;
3✔
1367
  SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb);
3✔
1368

1369
  if (pDbObj == NULL) {
3!
1370
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
1371
    TAOS_RETURN(code);
×
1372
  }
1373

1374
  bool multiTarget = (pDbObj->cfg.numOfVgroups > 1);
3✔
1375
  sdbRelease(pSdb, pDbObj);
3✔
1376

1377
  mDebug("doScheduleStream numOfPlanLevel:%d, exDb:%d, multiTarget:%d, fix vgId:%d, physicalPlan:%s", numOfPlanLevel,
3!
1378
         externalTargetDB, multiTarget, pStream->fixedSinkVgId, pStream->physicalPlan);
1379

1380
  pStream->pTaskList = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES);
3✔
1381
  pStream->pHTaskList = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES);
3✔
1382
  if (pStream->pTaskList == NULL || pStream->pHTaskList == NULL) {
3!
1383
    mError("failed to create stream obj, code:%s", tstrerror(terrno));
×
1384
    return terrno;
×
1385
  }
1386

1387
  if (pCreate->pVSubTables) {
3!
1388
    code = buildVSubtableMap(pMnode, pCreate->pVSubTables, &pStream->pVTableMap);
×
1389
    if (TSDB_CODE_SUCCESS != code) {
×
1390
      mError("failed to buildVSubtableMap, code:%s", tstrerror(terrno));
×
1391
      return code;
×
1392
    }
1393
  }
1394

1395
  if ((numOfPlanLevel > 1 && !isVTableStream) || (numOfPlanLevel > 2 && isVTableStream) || externalTargetDB ||
3!
1396
      multiTarget || pStream->fixedSinkVgId) {
×
1397
    // add extra sink
1398
    hasExtraSink = true;
3✔
1399
    code = addSinkTask(pMnode, pStream, pEpset);
3✔
1400
    if (code != TSDB_CODE_SUCCESS) {
3!
1401
      return code;
×
1402
    }
1403
  }
1404

1405
  pStream->totalLevel = numOfPlanLevel + hasExtraSink;
3✔
1406

1407
  int8_t hasAggTasks = (numOfPlanLevel > 1) ? 1 : 0;  // task level is greater than 1, which means agg existing
3✔
1408
  if (pStream->pVTableMap) {
3!
1409
    code = addNewTaskList(pStream);
×
1410
    if (code) {
×
1411
      return code;
×
1412
    }
1413

1414
    plan = getVTbScanSubPlan(pPlan);
×
1415
    if (plan == NULL) {
×
1416
      mError("fail to get vtable scan plan");
×
1417
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1418
      if (terrno != 0) code = terrno;
×
1419
      TAOS_RETURN(code);
×
1420
    }
1421

1422
    SArray** pSourceTaskList = taosArrayGetLast(pStream->pTaskList);
×
1423

1424
    code = addNewTaskList(pStream);
×
1425
    if (code) {
×
1426
      return code;
×
1427
    }
1428
    code = addVTableMergeTask(pMnode, plan, pStream, pEpset, (numOfPlanLevel == 1), hasAggTasks, pCreate);
×
1429
    if (code) {
×
1430
      return code;
×
1431
    }
1432

1433
    plan = getScanSubPlan(pPlan);  // source plan
×
1434
    if (plan == NULL) {
×
1435
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1436
      if (terrno != 0) code = terrno;
×
1437
      TAOS_RETURN(code);
×
1438
    }
1439

1440
    SArray** pMergeTaskList = taosArrayGetLast(pStream->pTaskList);
×
1441
    code = addVTableSourceTask(pMnode, plan, pStream, pEpset, skey, pVerList, (numOfPlanLevel == 1), hasAggTasks,
×
1442
                               pCreate, pStream->pVTableMap, *pSourceTaskList, *pMergeTaskList);
1443
  } else {
1444
    plan = getScanSubPlan(pPlan);  // source plan
3✔
1445
    if (plan == NULL) {
3!
1446
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1447
      if (terrno != 0) code = terrno;
×
1448
      TAOS_RETURN(code);
×
1449
    }
1450

1451
    code = addSourceTask(pMnode, plan, pStream, pEpset, skey, pVerList, (numOfPlanLevel == 1), hasAggTasks);
3✔
1452
  }
1453
  if (code != TSDB_CODE_SUCCESS) {
3!
1454
    return code;
×
1455
  }
1456

1457
  if ((numOfPlanLevel == 1 && !isVTableStream)) {
3!
1458
    bindSourceSink(pStream, pMnode, pStream->pTaskList, hasExtraSink);
1✔
1459
    if (needHistoryTask(pStream)) {
1!
1460
      bindSourceSink(pStream, pMnode, pStream->pHTaskList, hasExtraSink);
×
1461
    }
1462
    return TDB_CODE_SUCCESS;
1✔
1463
  }
1464

1465
  if (numOfPlanLevel == 2 && isVTableStream) {
2!
1466
    bindVtableMergeSink(pStream, pMnode, pStream->pTaskList, hasExtraSink);
×
1467
    return TDB_CODE_SUCCESS;
×
1468
  }
1469

1470
  if ((numOfPlanLevel == 3 && !isVTableStream) || (numOfPlanLevel == 4 && isVTableStream)) {
2!
1471
    int32_t idx = isVTableStream ? 2 : 1;
2!
1472
    plan = getAggSubPlan(pPlan, idx);  // middle agg plan
2✔
1473
    if (plan == NULL) {
2!
1474
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1475
      if (terrno != 0) code = terrno;
×
1476
      TAOS_RETURN(code);
×
1477
    }
1478

1479
    do {
×
1480
      SArray** list = taosArrayGetLast(pStream->pTaskList);
2✔
1481
      float    size = (float)taosArrayGetSize(*list);
2✔
1482
      size_t   cnt = (size_t)ceil(size / tsStreamAggCnt);
2✔
1483
      if (cnt <= 1) break;
2!
1484

1485
      mDebug("doScheduleStream add middle agg, size:%d, cnt:%d", (int)size, (int)cnt);
×
1486
      code = addNewTaskList(pStream);
×
1487
      if (code) {
×
1488
        return code;
×
1489
      }
1490

1491
      for (int j = 0; j < cnt; j++) {
×
1492
        code = addAggTask(pStream, pMnode, plan, pEpset, false);
×
1493
        if (code != TSDB_CODE_SUCCESS) {
×
1494
          return code;
×
1495
        }
1496

1497
        bindTwoLevel(pStream->pTaskList, j * tsStreamAggCnt, (j + 1) * tsStreamAggCnt);
×
1498
        if (needHistoryTask(pStream)) {
×
1499
          bindTwoLevel(pStream->pHTaskList, j * tsStreamAggCnt, (j + 1) * tsStreamAggCnt);
×
1500
        }
1501
      }
1502
    } while (1);
1503
  }
1504

1505
  plan = getAggSubPlan(pPlan, 0);
2✔
1506
  if (plan == NULL) {
2!
1507
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1508
    if (terrno != 0) code = terrno;
×
1509
    TAOS_RETURN(code);
×
1510
  }
1511

1512
  mDebug("doScheduleStream add final agg");
2!
1513
  SArray** list = taosArrayGetLast(pStream->pTaskList);
2✔
1514
  size_t   size = taosArrayGetSize(*list);
2✔
1515

1516
  code = addNewTaskList(pStream);
2✔
1517
  if (code) {
2!
1518
    return code;
×
1519
  }
1520

1521
  code = addAggTask(pStream, pMnode, plan, pEpset, true);
2✔
1522
  if (code != TSDB_CODE_SUCCESS) {
2!
1523
    TAOS_RETURN(code);
×
1524
  }
1525
  bindTwoLevel(pStream->pTaskList, 0, size);
2✔
1526
  if (needHistoryTask(pStream)) {
2✔
1527
    bindTwoLevel(pStream->pHTaskList, 0, size);
1✔
1528
  }
1529

1530
  bindAggSink(pStream, pMnode, pStream->pTaskList);
2✔
1531
  if (needHistoryTask(pStream)) {
2✔
1532
    bindAggSink(pStream, pMnode, pStream->pHTaskList);
1✔
1533
  }
1534
  TAOS_RETURN(code);
2✔
1535
}
1536

1537
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream, SCMCreateStreamReq* pCreate) {
3✔
1538
  int32_t     code = 0;
3✔
1539
  pStream->pPlan = qStringToQueryPlan(pStream->physicalPlan);
3✔
1540
  if (pStream->pPlan == NULL) {
3!
1541
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
1542
    TAOS_RETURN(code);
×
1543
  }
1544

1545
  SEpSet mnodeEpset = {0};
3✔
1546
  mndGetMnodeEpSet(pMnode, &mnodeEpset);
3✔
1547

1548
  code = doScheduleStream(pStream, pMnode, pStream->pPlan, &mnodeEpset, pCreate);
3✔
1549

1550
  TAOS_RETURN(code);
3✔
1551
}
1552
#endif
1553

1554
#ifdef USE_TOPIC
1555
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
15✔
1556
  int32_t     code = 0;
15✔
1557
  SSdb*       pSdb = pMnode->pSdb;
15✔
1558
  SVgObj*     pVgroup = NULL;
15✔
1559
  SQueryPlan* pPlan = NULL;
15✔
1560
  SSubplan*   pSubplan = NULL;
15✔
1561

1562
  if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) {
15✔
1563
    pPlan = qStringToQueryPlan(pTopic->physicalPlan);
14✔
1564
    if (pPlan == NULL) {
14!
1565
      return TSDB_CODE_QRY_INVALID_INPUT;
×
1566
    }
1567
  } else if (pTopic->subType == TOPIC_SUB_TYPE__TABLE && pTopic->ast != NULL) {
1!
1568
    SNode* pAst = NULL;
×
1569
    code = nodesStringToNode(pTopic->ast, &pAst);
×
1570
    if (code != 0) {
×
1571
      mError("topic:%s, failed to create since %s", pTopic->name, terrstr());
×
1572
      return code;
×
1573
    }
1574

1575
    SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
×
1576
    code = qCreateQueryPlan(&cxt, &pPlan, NULL);
×
1577
    if (code != 0) {
×
1578
      mError("failed to create topic:%s since %s", pTopic->name, terrstr());
×
1579
      nodesDestroyNode(pAst);
×
1580
      return code;
×
1581
    }
1582
    nodesDestroyNode(pAst);
×
1583
  }
1584

1585
  if (pPlan) {
15✔
1586
    int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
14!
1587
    if (levelNum != 1) {
14!
1588
      code = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
×
1589
      goto END;
×
1590
    }
1591

1592
    SNodeListNode* pNodeListNode = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
14✔
1593
    if (pNodeListNode == NULL){
14!
1594
      code = TSDB_CODE_OUT_OF_MEMORY;
×
1595
      goto END;
×
1596
    }
1597
    int32_t opNum = LIST_LENGTH(pNodeListNode->pNodeList);
14!
1598
    if (opNum != 1) {
14!
1599
      code = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
×
1600
      goto END;
×
1601
    }
1602

1603
    pSubplan = (SSubplan*)nodesListGetNode(pNodeListNode->pNodeList, 0);
14✔
1604
  }
1605

1606
  void* pIter = NULL;
15✔
1607
  while (1) {
72✔
1608
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
87✔
1609
    if (pIter == NULL) {
87✔
1610
      break;
15✔
1611
    }
1612

1613
    if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) {
72✔
1614
      sdbRelease(pSdb, pVgroup);
25✔
1615
      continue;
25✔
1616
    }
1617

1618
    pSub->vgNum++;
47✔
1619

1620
    SMqVgEp* pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
47!
1621
    if (pVgEp == NULL){
47!
1622
      code = terrno;
×
1623
      goto END;
×
1624
    }
1625
    pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
47✔
1626
    pVgEp->vgId = pVgroup->vgId;
47✔
1627
    if (taosArrayPush(pSub->unassignedVgs, &pVgEp) == NULL){
94!
1628
      code = terrno;
×
1629
      taosMemoryFree(pVgEp);
×
1630
      goto END;
×
1631
    }
1632
    mInfo("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, pVgEp->vgId);
47!
1633
    sdbRelease(pSdb, pVgroup);
47✔
1634
  }
1635

1636
  if (pSubplan) {
15✔
1637
    int32_t msgLen;
1638

1639
    if (qSubPlanToString(pSubplan, &pSub->qmsg, &msgLen) < 0) {
14!
1640
      code = TSDB_CODE_QRY_INVALID_INPUT;
×
1641
      goto END;
×
1642
    }
1643
  } else {
1644
    pSub->qmsg = taosStrdup("");
1!
1645
  }
1646

1647
END:
15✔
1648
  qDestroyQueryPlan(pPlan);
15✔
1649
  return code;
15✔
1650
}
1651
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc