• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing. Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

44.38
/source/dnode/mnode/impl/src/mndScheduler.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndScheduler.h"
17
#include "mndDb.h"
18
#include "mndMnode.h"
19
#include "mndSnode.h"
20
#include "mndVgroup.h"
21
#include "parser.h"
22
#include "tcompare.h"
23
#include "tmisce.h"
24
#include "tname.h"
25
#include "tuuid.h"
26

27
#define SINK_NODE_LEVEL (0)
28
extern bool tsDeployOnSnode;
29

30
// Depth-first search of a physical plan tree: returns true as soon as pNode itself or any
// descendant is a QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT node, false if none is found.
static bool hasCountWindowNode(SPhysiNode* pNode) {
  if (nodeType(pNode) == QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT) {
    return true;
  }

  size_t numOfChildren = LIST_LENGTH(pNode->pChildren);
  for (int32_t idx = 0; idx < numOfChildren; ++idx) {
    SPhysiNode* pSub = (SPhysiNode*)nodesListGetNode(pNode->pChildren, idx);
    if (hasCountWindowNode(pSub)) {
      return true;
    }
  }

  return false;
}
46

47
// Reports whether the physical plan rooted at pPlan->pNode contains a stream count-window node.
static bool isCountWindowStreamTask(SSubplan* pPlan) {
  SPhysiNode* pRoot = (SPhysiNode*)pPlan->pNode;
  return hasCountWindowNode(pRoot);
}
50

51
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
8✔
52
                           int64_t watermark, int64_t deleteMark) {
53
  int32_t     code = 0;
8✔
54
  SNode*      pAst = NULL;
8✔
55
  SQueryPlan* pPlan = NULL;
8✔
56

57
  if (nodesStringToNode(ast, &pAst) < 0) {
8!
58
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
59
    goto END;
×
60
  }
61

62
  if (qSetSTableIdForRsma(pAst, uid) < 0) {
8!
63
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
64
    goto END;
×
65
  }
66

67
  SPlanContext cxt = {
8✔
68
      .pAstRoot = pAst,
69
      .topicQuery = false,
70
      .streamQuery = true,
71
      .rSmaQuery = true,
72
      .triggerType = triggerType,
73
      .watermark = watermark,
74
      .deleteMark = deleteMark,
75
  };
76

77
  if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
8!
78
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
79
    goto END;
×
80
  }
81

82
  int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
8!
83
  if (levelNum != 1) {
8!
84
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
85
    goto END;
×
86
  }
87
  SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
8✔
88

89
  int32_t opNum = LIST_LENGTH(inner->pNodeList);
8!
90
  if (opNum != 1) {
8!
91
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
92
    goto END;
×
93
  }
94

95
  SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
8✔
96
  if (qSubPlanToString(plan, pDst, pDstLen) < 0) {
8!
97
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
98
    goto END;
×
99
  }
100

101
END:
8✔
102
  if (pAst) nodesDestroyNode(pAst);
8!
103
  if (pPlan) nodesDestroyNode((SNode*)pPlan);
8!
104
  TAOS_RETURN(code);
8✔
105
}
106
#ifdef USE_STREAM
107
// Fill in the output info of a sink task.
// When the stream has a non-zero smaId and subTableWithoutMd5 is not 1, the task outputs to
// the sma; otherwise it outputs to the target super table, and a clone of the stream's
// output schema is attached to the task.
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if the schema clone is NULL.
int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) {
  STaskOutputInfo* pOutput = &pTask->outputInfo;

  mDebug("mndSetSinkTaskInfo to sma or table, taskId:%s", pTask->id.idStr);

  bool toSma = (pStream->smaId != 0) && (pStream->subTableWithoutMd5 != 1);
  if (toSma) {
    pOutput->type = TASK_OUTPUT__SMA;
    pOutput->smaSink.smaId = pStream->smaId;
    return 0;
  }

  pOutput->type = TASK_OUTPUT__TABLE;
  pOutput->tbSink.stbUid = pStream->targetStbUid;
  (void)memcpy(pOutput->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);

  pOutput->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
  return (pOutput->tbSink.pSchemaWrapper == NULL) ? TSDB_CODE_OUT_OF_MEMORY : 0;
}
127

128
int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SArray* pSinkNodeList,
6,268✔
129
                                        SStreamTask* pTask) {
130
  int32_t code = 0;
6,268✔
131
  bool isShuffle = false;
6,268✔
132

133
  if (pStream->fixedSinkVgId == 0) {
6,268!
134
    SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
6,268✔
135
    if (pDb != NULL && pDb->cfg.numOfVgroups > 1) {
6,268!
136
      isShuffle = true;
6,212✔
137
      pTask->outputInfo.type = TASK_OUTPUT__SHUFFLE_DISPATCH;
6,212✔
138
      pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
6,212✔
139
      TAOS_CHECK_RETURN(mndExtractDbInfo(pMnode, pDb, &pTask->outputInfo.shuffleDispatcher.dbInfo, NULL));
6,212!
140
    }
141

142
    sdbRelease(pMnode->pSdb, pDb);
6,268✔
143
  }
144

145
  int32_t numOfSinkNodes = taosArrayGetSize(pSinkNodeList);
6,268✔
146

147
  if (isShuffle) {
6,268✔
148
    (void)memcpy(pTask->outputInfo.shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
6,212✔
149
    SArray* pVgs = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
6,212✔
150

151
    int32_t numOfVgroups = taosArrayGetSize(pVgs);
6,212✔
152
    for (int32_t i = 0; i < numOfVgroups; i++) {
25,533✔
153
      SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
19,321✔
154

155
      for (int32_t j = 0; j < numOfSinkNodes; j++) {
42,132!
156
        SStreamTask* pSinkTask = taosArrayGetP(pSinkNodeList, j);
42,132✔
157
        if (pSinkTask->info.nodeId == pVgInfo->vgId) {
42,132✔
158
          pVgInfo->taskId = pSinkTask->id.taskId;
19,321✔
159
          break;
19,321✔
160
        }
161
      }
162
    }
163
  } else {
164
    SStreamTask* pOneSinkTask = taosArrayGetP(pSinkNodeList, 0);
56✔
165
    streamTaskSetFixedDownstreamInfo(pTask, pOneSinkTask);
56✔
166
  }
167

168
  TAOS_RETURN(code);
6,268✔
169
}
170

171
// Bind a stream task and its subplan to a vgroup: both receive the vgroup id and the vgroup
// epset, then the subplan is serialized into pTask->exec.qmsg.
// Returns the result of qSubPlanToString.
int32_t mndAssignStreamTaskToVgroup(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
  int32_t planLen = 0;  // required by qSubPlanToString, not used afterwards

  pTask->info.nodeId = pVgroup->vgId;
  plan->execNode.nodeId = pTask->info.nodeId;

  pTask->info.epSet = mndGetVgroupEpset(pMnode, pVgroup);
  plan->execNode.epSet = pTask->info.epSet;

  return qSubPlanToString(plan, &pTask->exec.qmsg, &planLen);
}
181

182
// Fetch the first snode returned by the sdb iterator and cancel the iteration right away.
// Returns the fetched snode object, or whatever sdbFetch left in the output pointer
// (initialized to NULL) when nothing was fetched.
SSnodeObj* mndSchedFetchOneSnode(SMnode* pMnode) {
  SSnodeObj* pSnode = NULL;

  // TODO random fetch
  void* pCursor = sdbFetch(pMnode->pSdb, SDB_SNODE, NULL, (void**)&pSnode);
  sdbCancelFetch(pMnode->pSdb, pCursor);

  return pSnode;
}
190

191
// Bind a stream task and its subplan to an snode: both get SNODE_HANDLE as node id and the
// epset obtained from the snode, then the subplan is serialized into pTask->exec.qmsg.
// Returns the result of qSubPlanToString.
int32_t mndAssignStreamTaskToSnode(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SSnodeObj* pSnode) {
  int32_t planLen = 0;  // required by qSubPlanToString, not used afterwards

  pTask->info.nodeId = SNODE_HANDLE;
  plan->execNode.nodeId = SNODE_HANDLE;

  pTask->info.epSet = mndAcquireEpFromSnode(pMnode, pSnode);
  plan->execNode.epSet = pTask->info.epSet;

  mDebug("s-task:0x%x set the agg task to snode:%d", pTask->id.taskId, SNODE_HANDLE);

  return qSubPlanToString(plan, &pTask->exec.qmsg, &planLen);
}
203

204
// random choose a node to do compute
205
SVgObj* mndSchedFetchOneVg(SMnode* pMnode, SStreamObj* pStream) {
150✔
206
  SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->sourceDb);
150✔
207
  if (pDbObj == NULL) {
150!
208
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
209
    return NULL;
×
210
  }
211

212
  if (pStream->indexForMultiAggBalance == -1) {
150✔
213
    taosSeedRand(taosSafeRand());
138✔
214
    pStream->indexForMultiAggBalance = taosRand() % pDbObj->cfg.numOfVgroups;
138✔
215
  }
216

217
  int32_t index = 0;
150✔
218
  void*   pIter = NULL;
150✔
219
  SVgObj* pVgroup = NULL;
150✔
220
  while (1) {
221
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
611✔
222
    if (pIter == NULL) break;
611!
223
    if (pVgroup->dbUid != pStream->sourceDbUid) {
611✔
224
      sdbRelease(pMnode->pSdb, pVgroup);
287✔
225
      continue;
287✔
226
    }
227
    if (index++ == pStream->indexForMultiAggBalance) {
324✔
228
      pStream->indexForMultiAggBalance++;
150✔
229
      pStream->indexForMultiAggBalance %= pDbObj->cfg.numOfVgroups;
150✔
230
      sdbCancelFetch(pMnode->pSdb, pIter);
150✔
231
      break;
150✔
232
    }
233
    sdbRelease(pMnode->pSdb, pVgroup);
174✔
234
  }
235
  sdbRelease(pMnode->pSdb, pDbObj);
150✔
236

237
  return pVgroup;
150✔
238
}
239

240
// Select the stream uid and the most recently added task list matching the task type:
//   STREAM_NORMAL_TASK                         -> pStream->uid      / last entry of pTaskList
//   STREAM_HISTORY_TASK, STREAM_RECALCUL_TASK  -> pStream->hTaskUid / last entry of pHTaskList
// For any other type both outputs are left untouched.
static void streamGetUidTaskList(SStreamObj* pStream, EStreamTaskType type, uint64_t* pUid, SArray*** pTaskList) {
  switch (type) {
    case STREAM_NORMAL_TASK:
      *pUid = pStream->uid;
      *pTaskList = taosArrayGetLast(pStream->pTaskList);
      break;
    case STREAM_HISTORY_TASK:
    case STREAM_RECALCUL_TASK:
      *pUid = pStream->hTaskUid;
      *pTaskList = taosArrayGetLast(pStream->pHTaskList);
      break;
    default:
      break;
  }
}
14,041✔
249

250
// Create one sink-level task of the given type on pVgroup and fill in its sink output info.
// The new task is handed to tNewStreamTask together with the task list selected by
// streamGetUidTaskList for this type.
// Returns the tNewStreamTask error if creation fails, otherwise the mndSetSinkTaskInfo result.
static int32_t doAddSinkTask(SStreamObj* pStream, SMnode* pMnode, SVgObj* pVgroup, SEpSet* pEpset, EStreamTaskType type) {
  uint64_t streamUid = 0;
  SArray** ppTasks = NULL;
  streamGetUidTaskList(pStream, type, &streamUid, &ppTasks);

  SStreamTask* pSink = NULL;
  int32_t      code = tNewStreamTask(streamUid, TASK_LEVEL__SINK, pEpset, type, pStream->conf.trigger, 0, *ppTasks,
                                     pStream->conf.fillHistory, pStream->subTableWithoutMd5, 1, &pSink);
  if (code == 0) {
    mDebug("doAddSinkTask taskId:%s, %p vgId:%d, isFillHistory:%d", pSink->id.idStr, pSink, pVgroup->vgId, (type == STREAM_HISTORY_TASK));

    pSink->info.nodeId = pVgroup->vgId;
    pSink->info.epSet = mndGetVgroupEpset(pMnode, pVgroup);
    code = mndSetSinkTaskInfo(pStream, pSink);
  }

  return code;
}
268

269
// A stream needs a companion (history/recalculate) task when fill-history is enabled or when
// its trigger mode is STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE.
bool needHistoryTask(SStreamObj* pStream) {
  if (pStream->conf.fillHistory) {
    return true;
  }
  return pStream->conf.trigger == STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE;
}
272

273
// Add the sink task(s) for one vgroup: always a normal sink task, plus a companion task when
// needHistoryTask() says so. The companion is a recalculate task for the
// continuous-window-close trigger and a history task otherwise.
// Returns the first non-zero code from doAddSinkTask, or TDB_CODE_SUCCESS.
static int32_t doAddSinkTaskToVg(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset, SVgObj* vgObj) {
  int32_t code = doAddSinkTask(pStream, pMnode, vgObj, pEpset, STREAM_NORMAL_TASK);
  if (code != 0) {
    return code;
  }

  if (!needHistoryTask(pStream)) {
    return TDB_CODE_SUCCESS;
  }

  EStreamTaskType companionType = STREAM_HISTORY_TASK;
  if (pStream->conf.trigger == STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE) {
    companionType = STREAM_RECALCUL_TASK;
  }

  code = doAddSinkTask(pStream, pMnode, vgObj, pEpset, companionType);
  if (code != 0) {
    return code;
  }

  return TDB_CODE_SUCCESS;
}
289

290
// create sink node for each vgroup.
291
static int32_t doAddShuffleSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset) {
1,535✔
292
  SSdb* pSdb = pMnode->pSdb;
1,535✔
293
  void* pIter = NULL;
1,535✔
294

295
  while (1) {
5,716✔
296
    SVgObj* pVgroup = NULL;
7,251✔
297
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
7,251✔
298
    if (pIter == NULL) {
7,251✔
299
      break;
1,535✔
300
    }
301

302
    if (!mndVgroupInDb(pVgroup, pStream->targetDbUid)) {
5,716✔
303
      sdbRelease(pSdb, pVgroup);
1,371✔
304
      continue;
1,371✔
305
    }
306

307
    int32_t code = doAddSinkTaskToVg(pMnode, pStream, pEpset, pVgroup);
4,345✔
308
    if (code != 0) {
4,345!
UNCOV
309
      sdbRelease(pSdb, pVgroup);
×
UNCOV
310
      return code;
×
311
    }
312

313
    sdbRelease(pSdb, pVgroup);
4,345✔
314
  }
315

316
  return TDB_CODE_SUCCESS;
1,535✔
317
}
318

319
static int64_t getVgroupLastVer(const SArray* pList, int32_t vgId) {
7,058✔
320
  int32_t size = (int32_t) taosArrayGetSize(pList);
7,058✔
321
  for (int32_t i = 0; i < size; ++i) {
11,340✔
322
    SVgroupVer* pVer = taosArrayGet(pList, i);
7,122✔
323
    if (pVer->vgId == vgId) {
7,122✔
324
      return pVer->ver;
2,840✔
325
    }
326
  }
327

328
  mDebug("no data in vgId:%d for extract last version, set to be 0, total existed vgs:%d", vgId, size);
4,218✔
329
  return 1;
4,218✔
330
}
331

332
// Set the time window and version range a source task is responsible for.
//   skey     : boundary timestamp between history data and new data
//   pVerList : per-vgroup versions, searched by getVgroupLastVer
// History tasks cover [INT64_MIN, skey - 1] and versions [0, latest]; all other tasks cover
// [skey, INT64_MAX] and versions [latest + 1, INT64_MAX]. A negative latest version is
// treated as 0.
static void streamTaskSetDataRange(SStreamTask* pTask, int64_t skey, SArray* pVerList, int32_t vgId) {
  int64_t lastVer = getVgroupLastVer(pVerList, vgId);
  if (lastVer < 0) {
    lastVer = 0;
  }

  // set the correct ts, which is the last key of queried table.
  SDataRange*  pDataRange = &pTask->dataRange;
  STimeWindow* pWin = &pDataRange->window;

  if (pTask->info.fillHistory != STREAM_HISTORY_TASK) {
    pWin->skey = skey;
    pWin->ekey = INT64_MAX;

    pDataRange->range.minVer = lastVer + 1;
    pDataRange->range.maxVer = INT64_MAX;

    mDebug("add source task 0x%x timeWindow:%" PRId64 "-%" PRId64 " verRange:%" PRId64 "-%" PRId64, pTask->id.taskId,
           pWin->skey, pWin->ekey, pDataRange->range.minVer, pDataRange->range.maxVer);
    return;
  }

  pWin->skey = INT64_MIN;
  pWin->ekey = skey - 1;

  pDataRange->range.minVer = 0;
  pDataRange->range.maxVer = lastVer;

  mDebug("add fill-history source task 0x%x timeWindow:%" PRId64 "-%" PRId64 " verRange:%" PRId64 "-%" PRId64,
         pTask->id.taskId, pWin->skey, pWin->ekey, pDataRange->range.minVer, pDataRange->range.maxVer);
}
7,058✔
361

362
// Put a task into TASK_STATUS__HALT when its plan contains a count-window node and the task
// is not the fill-history task; in every other case the status is left unchanged.
static void haltInitialTaskStatus(SStreamTask* pTask, SSubplan* pPlan, bool isFillhistoryTask) {
  bool isCountWindow = isCountWindowStreamTask(pPlan);
  if (!isCountWindow || isFillhistoryTask) {
    return;
  }

  SStreamStatus* pTaskStatus = &pTask->status;
  mDebug("s-task:0x%x status set %s from %s for count window agg task with fill-history option set",
         pTask->id.taskId, streamTaskGetStatusStr(TASK_STATUS__HALT), streamTaskGetStatusStr(pTaskStatus->taskStatus));
  pTaskStatus->taskStatus = TASK_STATUS__HALT;
}
4,760✔
372

373
// Create a source-level stream task.
//   type            : task type; a recalculate task always uses STREAM_TRIGGER_WINDOW_CLOSE,
//                     every other type uses the stream's configured trigger
//   useTriggerParam : when false, 0 is passed instead of the stream's triggerParam
//   pTask           : receives the new task
//   pSourceTaskList : optional; when non-NULL it is handed to tNewStreamTask as the task list
//                     in place of the stream's own list for this type
// Returns the result of tNewStreamTask.
static int32_t buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, EStreamTaskType type, bool useTriggerParam,
                               int8_t hasAggTasks, SStreamTask** pTask, SArray* pSourceTaskList) {
  uint64_t uid = 0;
  SArray** pTaskList = NULL;
  if (pSourceTaskList) {
    // A caller-supplied list previously left uid at 0, so the task was created with stream
    // id 0. Use the stream uid, as doAddMergeTask does for merge tasks of the same stream.
    // NOTE(review): the only visible caller passes a normal task type here — confirm that
    // pStream->uid (not hTaskUid) is right should history tasks ever take this path.
    uid = pStream->uid;
    pTaskList = &pSourceTaskList;
  } else {
    streamGetUidTaskList(pStream, type, &uid, &pTaskList);
  }

  int32_t trigger = 0;
  if (type == STREAM_RECALCUL_TASK) {
    trigger = STREAM_TRIGGER_WINDOW_CLOSE;
  } else {
    trigger = pStream->conf.trigger;
  }

  int32_t triggerParam = useTriggerParam ? pStream->conf.triggerParam : 0;
  int32_t code =
      tNewStreamTask(uid, TASK_LEVEL__SOURCE, pEpset, type, trigger, triggerParam,
                     *pTaskList, pStream->conf.fillHistory, pStream->subTableWithoutMd5,
                     hasAggTasks, pTask);

  return code;
}
398

399
// Append a fresh, empty task list (array of pointers) to pStream->pTaskList and, when the
// stream needs a companion history/recalculate task, another one to pStream->pHTaskList.
// Returns TSDB_CODE_SUCCESS, or the terrno value observed when an array could not be created
// or appended.
static int32_t addNewTaskList(SStreamObj* pStream) {
  SArray* pTaskList = taosArrayInit(0, POINTER_BYTES);
  if (pTaskList == NULL) {
    mError("failed init task list, code:%s", tstrerror(terrno));
    return terrno;
  }

  if (taosArrayPush(pStream->pTaskList, &pTaskList) == NULL) {
    int32_t code = terrno;  // capture before any further call can overwrite it
    mError("failed to put into array, code:%s", tstrerror(code));
    // the list never made it into the stream, so nobody else will free it
    taosArrayDestroy(pTaskList);
    return code;
  }

  if (needHistoryTask(pStream)) {
    pTaskList = taosArrayInit(0, POINTER_BYTES);
    if (pTaskList == NULL) {
      mError("failed init history task list, code:%s", tstrerror(terrno));
      return terrno;
    }

    if (taosArrayPush(pStream->pHTaskList, &pTaskList) == NULL) {
      int32_t code = terrno;
      mError("failed to put into array, code:%s", tstrerror(code));
      taosArrayDestroy(pTaskList);
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
426

427
// set the history task id
428
static void setHTasksId(SStreamObj* pStream) {
1,701✔
429
  SArray* pTaskList = *(SArray**)taosArrayGetLast(pStream->pTaskList);
1,701✔
430
  SArray* pHTaskList = *(SArray**)taosArrayGetLast(pStream->pHTaskList);
1,701✔
431

432
  for (int32_t i = 0; i < taosArrayGetSize(pTaskList); ++i) {
6,517✔
433
    SStreamTask** pStreamTask = taosArrayGet(pTaskList, i);
4,816✔
434
    SStreamTask** pHTask = taosArrayGet(pHTaskList, i);
4,816✔
435

436
    (*pStreamTask)->hTaskInfo.id.taskId = (*pHTask)->id.taskId;
4,816✔
437
    (*pStreamTask)->hTaskInfo.id.streamId = (*pHTask)->id.streamId;
4,816✔
438

439
    (*pHTask)->streamTaskId.taskId = (*pStreamTask)->id.taskId;
4,816✔
440
    (*pHTask)->streamTaskId.streamId = (*pStreamTask)->id.streamId;
4,816✔
441

442
    mDebug("s-task:0x%" PRIx64 "-0x%x related history task:0x%" PRIx64 "-0x%x, level:%d", (*pStreamTask)->id.streamId,
4,816✔
443
           (*pStreamTask)->id.taskId, (*pHTask)->id.streamId, (*pHTask)->id.taskId, (*pHTask)->info.taskLevel);
444
  }
445
}
1,701✔
446

NEW
447
// Configure a source task to dispatch by virtual table (TASK_OUTPUT__VTABLE_MAP).
//   pVgTasks : hash vgId -> STaskDispatcherFixed (see buildMergeTaskHash)
//   pVtables : hash whose keys are read as an int32 vgId immediately followed by a uint64
//              virtual table uid
// For each key the downstream task of that vgId is looked up, appended once to
// pDispatcher->taskInfos, and the uid is mapped to that task's index in
// pDispatcher->vtableMap.
// Returns 0 on success. On failure an error code is returned; the temporary taskId -> index
// map is released on every path.
static int32_t addSourceTaskVTableOutput(SStreamTask* pTask, SSHashObj* pVgTasks, SSHashObj* pVtables) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t taskNum = tSimpleHashGetSize(pVgTasks);
  int32_t tbNum = tSimpleHashGetSize(pVtables);

  int32_t               iter = 0;
  int32_t               vgId = 0;
  uint64_t              uid = 0;
  STaskDispatcherFixed* pAddr = NULL;
  void*                 p = NULL;

  // taskId -> index into pDispatcher->taskInfos, used to add each downstream task only once
  SSHashObj* pTaskMap = tSimpleHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  TSDB_CHECK_NULL(pTaskMap, code, lino, _end, terrno);

  pTask->outputInfo.type = TASK_OUTPUT__VTABLE_MAP;
  STaskDispatcherVtableMap* pDispatcher = &pTask->outputInfo.vtableMapDispatcher;
  pDispatcher->taskInfos = taosArrayInit(taskNum, sizeof(STaskDispatcherFixed));
  TSDB_CHECK_NULL(pDispatcher->taskInfos, code, lino, _end, terrno);
  pDispatcher->vtableMap = tSimpleHashInit(tbNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
  TSDB_CHECK_NULL(pDispatcher->vtableMap, code, lino, _end, terrno);

  while (NULL != (p = tSimpleHashIterate(pVtables, p, &iter))) {
    const char* pKey = tSimpleHashGetKey(p, NULL);

    // copy the two key parts out instead of dereferencing casted pointers: the uid sits at a
    // 4-byte offset inside the key and is not guaranteed to be suitably aligned
    (void)memcpy(&vgId, pKey, sizeof(vgId));
    (void)memcpy(&uid, pKey + sizeof(int32_t), sizeof(uid));

    pAddr = tSimpleHashGet(pVgTasks, &vgId, sizeof(vgId));
    if (NULL == pAddr) {
      mError("tSimpleHashGet vgId %d not found", vgId);
      // this used to return the still-zero code, i.e. report success for a failed lookup
      code = TSDB_CODE_MND_INTERNAL_ERROR;
      lino = __LINE__;
      goto _end;
    }

    void*   px = tSimpleHashGet(pTaskMap, &pAddr->taskId, sizeof(int32_t));
    int32_t idx = 0;
    if (px == NULL) {
      // first time this downstream task is seen: append it and remember its index
      px = taosArrayPush(pDispatcher->taskInfos, pAddr);
      TSDB_CHECK_NULL(px, code, lino, _end, terrno);
      idx = taosArrayGetSize(pDispatcher->taskInfos) - 1;
      code = tSimpleHashPut(pTaskMap, &pAddr->taskId, sizeof(int32_t), &idx, sizeof(int32_t));
      if (code) {
        mError("tSimpleHashPut uid to task idx failed, error:%d", code);
        lino = __LINE__;
        goto _end;
      }
    } else {
      idx = *(int32_t*)px;
    }

    code = tSimpleHashPut(pDispatcher->vtableMap, &uid, sizeof(int64_t), &idx, sizeof(int32_t));
    if (code) {
      mError("tSimpleHashPut uid to STaskDispatcherFixed failed, error:%d", code);
      lino = __LINE__;
      goto _end;
    }

    mDebug("source task[%s,vg:%d] add vtable output map, vuid %" PRIu64 " => [%d, vg:%d]",
        pTask->id.idStr, pTask->info.nodeId, uid, pAddr->taskId, pAddr->nodeId);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    mError("source task[%s,vg:%d] add vtable output map failed, lino:%d, error:%s", pTask->id.idStr, pTask->info.nodeId,
           lino, tstrerror(code));
  }
  if (pTaskMap != NULL) {
    tSimpleHashCleanup(pTaskMap);
  }
  return code;
}
513

514
// Create one source task of the given type on pVgroup and finish its setup:
//   1. build the task (buildSourceTask)
//   2. if the stream needs a companion history task, possibly halt the task's initial status
//   3. set the time window / version range the task covers
//   4. bind task and plan to the vgroup and serialize the plan
//   5. when pVgTasks is given, configure the per-virtual-table output map
// Returns the first non-success code from these steps, or the result of the last step.
static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, int64_t skey,
                               SArray* pVerList, SVgObj* pVgroup, EStreamTaskType type, bool useTriggerParam,
                               int8_t hasAggTasks, SSHashObj* pVgTasks, SArray* pSourceTaskList) {
  SStreamTask* pSrcTask = NULL;

  int32_t rc = buildSourceTask(pStream, pEpset, type, useTriggerParam, hasAggTasks, &pSrcTask, pSourceTaskList);
  if (rc != TSDB_CODE_SUCCESS) {
    return rc;
  }

  mDebug("doAddSourceTask taskId:%s, %p vgId:%d, historyTask:%d", pSrcTask->id.idStr, pSrcTask, pVgroup->vgId, (type == STREAM_HISTORY_TASK));

  if (needHistoryTask(pStream)) {
    haltInitialTaskStatus(pSrcTask, plan, (type == STREAM_HISTORY_TASK));
  }

  streamTaskSetDataRange(pSrcTask, skey, pVerList, pVgroup->vgId);

  rc = mndAssignStreamTaskToVgroup(pMnode, pSrcTask, plan, pVgroup);
  if (rc != TSDB_CODE_SUCCESS) {
    return rc;
  }

  mTrace("souce task plan:%s", pSrcTask->exec.qmsg);

  if (pVgTasks == NULL) {
    return rc;
  }
  return addSourceTaskVTableOutput(pSrcTask, pVgTasks, plan->pVTables);
}
544

545
static SSubplan* getScanSubPlan(const SQueryPlan* pPlan) {
1,780✔
546
  int32_t        numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
1,780!
547
  SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, numOfPlanLevel - 1);
1,780✔
548
  if (LIST_LENGTH(inner->pNodeList) != 1) {
1,780!
UNCOV
549
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
UNCOV
550
    return NULL;
×
551
  }
552

553
  SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
1,780✔
554
  if (plan->subplanType != SUBPLAN_TYPE_SCAN) {
1,780!
UNCOV
555
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
UNCOV
556
    return NULL;
×
557
  }
558
  return plan;
1,780✔
559
}
560

NEW
561
// Create one merge-level task on pVgroup for the virtual tables in pVtables.
//   pVtables : array of SVCTableRefCols*; uid and numOfSrcTbls of each entry are copied into
//              the task's pVTables array
// The task is then bound to the vgroup and the plan is serialized into it.
// Returns TDB_CODE_SUCCESS, or the error from tNewStreamTask, the array operations (terrno),
// or mndAssignStreamTaskToVgroup.
static int32_t doAddMergeTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, SVgObj* pVgroup,
                              bool isHistoryTask, bool useTriggerParam, int8_t hasAggTasks, SArray* pVtables) {
  SStreamTask* pTask = NULL;
  SArray**     pTaskList = taosArrayGetLast(pStream->pTaskList);

  int32_t code = tNewStreamTask(pStream->uid, TASK_LEVEL__MERGE, pEpset, isHistoryTask, pStream->conf.trigger,
                                useTriggerParam ? pStream->conf.triggerParam : 0, *pTaskList, pStream->conf.fillHistory,
                                pStream->subTableWithoutMd5, hasAggTasks, &pTask);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int32_t vtbNum = taosArrayGetSize(pVtables);
  pTask->pVTables = taosArrayInit(vtbNum, sizeof(SVCTableMergeInfo));
  if (NULL == pTask->pVTables) {
    code = terrno;
    mError("taosArrayInit %d SVCTableMergeInfo failed, error:%d", vtbNum, terrno);
    return code;
  }

  // zero-initialized so that anything not assigned below (other members, padding) is copied
  // into the array with a deterministic value instead of stack garbage
  SVCTableMergeInfo tbInfo = {0};
  for (int32_t i = 0; i < vtbNum; ++i) {
    SVCTableRefCols** pTb = taosArrayGet(pVtables, i);
    tbInfo.uid = (*pTb)->uid;
    tbInfo.numOfSrcTbls = (*pTb)->numOfSrcTbls;
    if (NULL == taosArrayPush(pTask->pVTables, &tbInfo)) {
      code = terrno;
      mError("taosArrayPush SVCTableMergeInfo failed, error:%d", terrno);
      return code;
    }

    mDebug("merge task[%s, vg:%d] add vtable info: vuid %" PRIu64 ", numOfSrcTbls:%d",
        pTask->id.idStr, pVgroup->vgId, tbInfo.uid, tbInfo.numOfSrcTbls);
  }

  code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  return TDB_CODE_SUCCESS;
}
603

NEW
604
static SSubplan* getVTbScanSubPlan(const SQueryPlan* pPlan) {
×
NEW
605
  int32_t        numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
×
NEW
606
  SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, numOfPlanLevel - 2);
×
NEW
607
  if (LIST_LENGTH(inner->pNodeList) != 1) {
×
NEW
608
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
NEW
609
    return NULL;
×
610
  }
611

NEW
612
  SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
×
NEW
613
  if (plan->subplanType != SUBPLAN_TYPE_MERGE) {
×
NEW
614
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
NEW
615
    return NULL;
×
616
  }
NEW
617
  return plan;
×
618
}
619

620

621
static SSubplan* getAggSubPlan(const SQueryPlan* pPlan, int index) {
364✔
622
  SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, index);
364✔
623
  if (LIST_LENGTH(inner->pNodeList) != 1) {
364!
UNCOV
624
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
UNCOV
625
    return NULL;
×
626
  }
627

628
  SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
364✔
629
  if (plan->subplanType != SUBPLAN_TYPE_MERGE) {
364!
UNCOV
630
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
UNCOV
631
    return NULL;
×
632
  }
633
  return plan;
364✔
634
}
635

NEW
636
// Create one virtual-table merge task per entry of pCreate->pVSubTables, on the vgroup named
// by that entry. Entries whose vgroup cannot be acquired are skipped with a warning.
// Returns the first error from doAddMergeTask, otherwise the code of the last task added
// (TSDB_CODE_SUCCESS when nothing was added).
static int32_t addVTableMergeTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset,
                                  bool useTriggerParam, bool hasAggTasks, SCMCreateStreamReq* pCreate) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t numOfVgs = taosArrayGetSize(pCreate->pVSubTables);

  for (int32_t i = 0; i < numOfVgs; ++i) {
    SVSubTablesRsp* pRsp = (SVSubTablesRsp*)taosArrayGet(pCreate->pVSubTables, i);
    SVgObj*         pVg = mndAcquireVgroup(pMnode, pRsp->vgId);

    if (pVg != NULL) {
      code = doAddMergeTask(pMnode, plan, pStream, pEpset, pVg, false, useTriggerParam, hasAggTasks, pRsp->pTables);
      if (code != 0) {
        mError("failed to create stream task, code:%s", tstrerror(code));

        mndReleaseVgroup(pMnode, pVg);
        return code;
      }

      mndReleaseVgroup(pMnode, pVg);
    } else {
      mWarn("vnode %d in pVSubTables not found", pRsp->vgId);
    }
  }

  return code;
}
663

NEW
664
// Build a hash from node id to dispatch address (taskId, nodeId, epSet) for every task in
// pMergeTaskList (an array of SStreamTask*).
// The hash is stored in *ppVgTasks even when a later insertion fails, so the caller is
// responsible for cleaning it up in both cases.
// Returns 0 on success, terrno if the hash cannot be created, or the tSimpleHashPut error.
static int32_t buildMergeTaskHash(SArray* pMergeTaskList, SSHashObj** ppVgTasks) {
  int32_t numOfTasks = taosArrayGetSize(pMergeTaskList);
  int32_t code = 0;

  *ppVgTasks = tSimpleHashInit(numOfTasks, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  if (NULL == *ppVgTasks) {
    code = terrno;
    mError("tSimpleHashInit %d failed", numOfTasks);
    return code;
  }

  for (int32_t i = 0; i < numOfTasks; ++i) {
    SStreamTask* pMerge = taosArrayGetP(pMergeTaskList, i);

    STaskDispatcherFixed addr = {
        .taskId = pMerge->id.taskId,
        .nodeId = pMerge->info.nodeId,
        .epSet = pMerge->info.epSet,
    };

    code = tSimpleHashPut(*ppVgTasks, &addr.nodeId, sizeof(addr.nodeId), &addr, sizeof(addr));
    if (code) {
      mError("tSimpleHashPut %d STaskDispatcherFixed failed", i);
      return code;
    }
  }

  return code;
}
692

NEW
693
static int32_t addVTableSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset,
×
694
                                   int64_t nextWindowSkey, SArray* pVerList, bool useTriggerParam, bool hasAggTasks,
695
                                   SCMCreateStreamReq* pCreate, SSHashObj* pVTableMap, SArray* pSourceTaskList,
696
                                   SArray* pMergeTaskList) {
NEW
697
  int32_t code = 0;
×
NEW
698
  SSHashObj* pVgTasks = NULL;
×
NEW
699
  int32_t vgId = 0;
×
NEW
700
  int32_t iter = 0;
×
NEW
701
  SVgObj* pVgroup = NULL;
×
NEW
702
  void* p = NULL;
×
703

NEW
704
  code = buildMergeTaskHash(pMergeTaskList, &pVgTasks);
×
NEW
705
  if (code) {
×
NEW
706
    tSimpleHashCleanup(pVgTasks);
×
NEW
707
    return code;
×
708
  }
709
  
NEW
710
  while (NULL != (p = tSimpleHashIterate(pVTableMap, p, &iter))) {
×
NEW
711
    char* pDbVg = tSimpleHashGetKey(p, NULL);
×
NEW
712
    char* pVgStr = strrchr(pDbVg, '.');
×
NEW
713
    if (NULL == pVgStr) {
×
NEW
714
      mError("Invalid DbVg string: %s", pDbVg);
×
NEW
715
      tSimpleHashCleanup(pVgTasks);
×
NEW
716
      return TSDB_CODE_MND_INTERNAL_ERROR;
×
717
    }
718

NEW
719
    (void)taosStr2int32(pVgStr + 1, &vgId);
×
720
    
NEW
721
    pVgroup = mndAcquireVgroup(pMnode, vgId);
×
NEW
722
    if (NULL == pVgroup) {
×
NEW
723
      mWarn("vnode %d not found", vgId);
×
NEW
724
      continue;
×
725
    }
726

NEW
727
    plan->pVTables = *(SSHashObj**)p;
×
NEW
728
    *(SSHashObj**)p = NULL;
×
729

NEW
730
    code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, false, useTriggerParam,
×
731
                           hasAggTasks, pVgTasks, pSourceTaskList);
NEW
732
    if (code != 0) {
×
NEW
733
      mError("failed to create stream task, code:%s", tstrerror(code));
×
734

NEW
735
      mndReleaseVgroup(pMnode, pVgroup);
×
NEW
736
      tSimpleHashCleanup(pVgTasks);
×
NEW
737
      return code;
×
738
    }
739

NEW
740
    mndReleaseVgroup(pMnode, pVgroup);
×
741
  }
742

NEW
743
  tSimpleHashCleanup(pVgTasks);
×
744

NEW
745
  return TSDB_CODE_SUCCESS;
×
746
}
747

748
/*
 * Append a new task level to the stream and create source tasks on every
 * vgroup for which mndVgroupInDb(pVgroup, pStream->sourceDbUid) holds.
 *
 * For each such vgroup a STREAM_NORMAL_TASK is added; when needHistoryTask()
 * reports true, a second task is added whose type is STREAM_RECALCUL_TASK for
 * a continuous-window-close trigger without fill-history, otherwise
 * STREAM_HISTORY_TASK.
 *
 * Returns TSDB_CODE_SUCCESS, or the first non-zero code reported by
 * addNewTaskList()/doAddSourceTask().
 *
 * NOTE(review): the early error returns leave the sdbFetch() iteration
 * unfinished without cancelling the iterator -- confirm whether that is
 * acceptable for the sdb API.
 */
static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset,
                             int64_t nextWindowSkey, SArray* pVerList, bool useTriggerParam, bool hasAggTasks) {
  SSdb* pSdb = pMnode->pSdb;
  void* pIter = NULL;

  int32_t code = addNewTaskList(pStream);
  if (code) {
    return code;
  }

  for (;;) {
    SVgObj* pVg = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVg);
    if (NULL == pIter) {
      break;
    }

    if (mndVgroupInDb(pVg, pStream->sourceDbUid)) {
      code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVg, STREAM_NORMAL_TASK,
                             useTriggerParam, hasAggTasks, NULL, NULL);
      if (code != 0) {
        mError("failed to create stream task, code:%s", tstrerror(code));

        mndReleaseVgroup(pMnode, pVg);
        return code;
      }

      if (needHistoryTask(pStream)) {
        // only the recalculating task is needed when fill-history is off for a continuous-window-close trigger
        bool recalcOnly =
            (pStream->conf.trigger == STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE) && (pStream->conf.fillHistory == 0);
        EStreamTaskType type = recalcOnly ? STREAM_RECALCUL_TASK : STREAM_HISTORY_TASK;

        code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVg, type, useTriggerParam,
                               hasAggTasks, NULL, NULL);
        if (code != 0) {
          sdbRelease(pSdb, pVg);
          return code;
        }
      }
    }

    // vgroups outside the source db are released untouched
    sdbRelease(pSdb, pVg);
  }

  if (needHistoryTask(pStream)) {
    setHTasksId(pStream);
  }

  return TSDB_CODE_SUCCESS;
}
803

804
static int32_t buildAggTask(SStreamObj* pStream, SEpSet* pEpset, EStreamTaskType type, bool useTriggerParam,
283✔
805
                            SStreamTask** pAggTask) {
806
  *pAggTask = NULL;
283✔
807

808
  uint64_t uid = 0;
283✔
809
  SArray** pTaskList = NULL;
283✔
810
  streamGetUidTaskList(pStream, type, &uid, &pTaskList);
283✔
811

812
  int64_t triggerParam = useTriggerParam? pStream->conf.triggerParam:0;
283✔
813
  int32_t code = tNewStreamTask(uid, TASK_LEVEL__AGG, pEpset, type, pStream->conf.trigger,
283✔
814
                                triggerParam, *pTaskList, pStream->conf.fillHistory,
283✔
815
                                pStream->subTableWithoutMd5, 1, pAggTask);
283✔
816
  return code;
283✔
817
}
818

819
static int32_t doAddAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset, SVgObj* pVgroup,
283✔
820
                            SSnodeObj* pSnode, EStreamTaskType type, bool useTriggerParam) {
821
  int32_t      code = 0;
283✔
822
  SStreamTask* pTask = NULL;
283✔
823
  const char*  id = NULL;
283✔
824

825
  code = buildAggTask(pStream, pEpset, type, useTriggerParam, &pTask);
283✔
826
  if (code != TSDB_CODE_SUCCESS) {
283!
UNCOV
827
    return code;
×
828
  }
829

830
  id = pTask->id.idStr;
283✔
831
  if (pSnode != NULL) {
283✔
832
    code = mndAssignStreamTaskToSnode(pMnode, pTask, plan, pSnode);
91✔
833
    mDebug("doAddAggTask taskId:%s, %p snode id:%d, isFillHistory:%d", id, pTask, pSnode->id, (type == STREAM_HISTORY_TASK));
91✔
834
  } else {
835
    code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup);
192✔
836
    mDebug("doAddAggTask taskId:%s, %p vgId:%d, isFillHistory:%d", id, pTask, pVgroup->vgId, (type == STREAM_HISTORY_TASK));
192!
837
  }
838
  return code;
283✔
839
}
840

841
/*
 * Add an aggregate task (and, if needHistoryTask() says so, its history or
 * recalculation counterpart) to the stream.
 *
 * Placement: an snode is tried first when tsDeployOnSnode is set; if no snode
 * was obtained, a vgroup is fetched via mndSchedFetchOneVg(). Whichever object
 * was fetched is released before returning.
 *
 * Returns 0 on success or the first non-zero code from doAddAggTask().
 */
static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset, bool useTriggerParam) {
  SSnodeObj* pSnode = NULL;
  SVgObj*    pVgroup = NULL;

  if (tsDeployOnSnode) {
    pSnode = mndSchedFetchOneSnode(pMnode);
  }
  if (pSnode == NULL) {
    // either snode deployment is disabled or no snode was available
    pVgroup = mndSchedFetchOneVg(pMnode, pStream);
  }

  int32_t code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, STREAM_NORMAL_TASK, useTriggerParam);

  if (code == 0 && needHistoryTask(pStream)) {
    EStreamTaskType type = STREAM_HISTORY_TASK;
    if (pStream->conf.trigger == STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE) {
      type = STREAM_RECALCUL_TASK;
    }

    code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, type, useTriggerParam);
    if (code == 0) {
      setHTasksId(pStream);
    }
  }

  if (pSnode == NULL) {
    sdbRelease(pMnode->pSdb, pVgroup);
  } else {
    sdbRelease(pMnode->pSdb, pSnode);
  }
  return code;
}
878

879
/*
 * Append a new task level and populate it with sink tasks.
 *
 * With no fixed sink vgroup (fixedSinkVgId == 0) the sink tasks are created by
 * doAddShuffleSinkTask(); otherwise a single sink is added on
 * pStream->fixedSinkVg. When a history task is needed, setHTasksId() is
 * called afterwards.
 *
 * Returns the first non-zero code encountered, else TDB_CODE_SUCCESS.
 */
static int32_t addSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset) {
  int32_t code = addNewTaskList(pStream);
  if (code) {
    return code;
  }

  code = (pStream->fixedSinkVgId == 0) ? doAddShuffleSinkTask(pMnode, pStream, pEpset)
                                       : doAddSinkTaskToVg(pMnode, pStream, pEpset, &pStream->fixedSinkVg);
  if (code != 0) {
    return code;
  }

  if (needHistoryTask(pStream)) {
    setHTasksId(pStream);
  }

  return TDB_CODE_SUCCESS;
}
903

904
/*
 * Connect one upstream task to every task in the sink list: a dispatcher is
 * added on the upstream task, and the task is registered as upstream of each
 * sink task. Failures are logged only; nothing is returned to the caller.
 */
static void bindTaskToSinkTask(SStreamObj* pStream, SMnode* pMnode, SArray* pSinkTaskList, SStreamTask* task) {
  int32_t code = mndAddDispatcherForInternalTask(pMnode, pStream, pSinkTaskList, task);
  if (code != 0) {
    mError("failed bind task to sink task since %s", tstrerror(code));
  }

  int32_t k = 0;
  while (k < taosArrayGetSize(pSinkTaskList)) {
    SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, k);

    code = streamTaskSetUpstreamInfo(pSinkTask, task);
    if (code != 0) {
      mError("failed bind task to sink task since %s", tstrerror(code));
    }
    k++;
  }

  mDebug("bindTaskToSinkTask taskId:%s to sink task list", task->id.idStr);
}
6,268✔
917

918
/*
 * Bind every task of the last level in `tasks` (the aggregate level) to the
 * sink level stored at index SINK_NODE_LEVEL.
 */
static void bindAggSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks) {
  SArray** ppAggLevel = taosArrayGetLast(tasks);
  SArray*  pSinkLevel = taosArrayGetP(tasks, SINK_NODE_LEVEL);

  int idx = 0;
  while (idx < taosArrayGetSize(*ppAggLevel)) {
    SStreamTask* pAggTask = taosArrayGetP(*ppAggLevel, idx);

    bindTaskToSinkTask(pStream, pMnode, pSinkLevel, pAggTask);
    mDebug("bindAggSink taskId:%s to sink task list", pAggTask->id.idStr);
    idx++;
  }
}
261✔
928

929
/*
 * Attach the source tasks to their sink.
 *
 * With an extra sink level, the source tasks live one level after
 * SINK_NODE_LEVEL and each is bound to the sink task list. Without it, the
 * source level sits at SINK_NODE_LEVEL and each source task gets its sink
 * info set directly via mndSetSinkTaskInfo(). Errors are logged only.
 */
static void bindSourceSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks, bool hasExtraSink) {
  int32_t sourceLevel = hasExtraSink ? SINK_NODE_LEVEL + 1 : SINK_NODE_LEVEL;
  SArray* pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL);
  SArray* pSourceTaskList = taosArrayGetP(tasks, sourceLevel);

  int idx = 0;
  while (idx < taosArrayGetSize(pSourceTaskList)) {
    SStreamTask* pSourceTask = taosArrayGetP(pSourceTaskList, idx++);
    mDebug("bindSourceSink taskId:%s to sink task list", pSourceTask->id.idStr);

    if (!hasExtraSink) {
      int32_t code = mndSetSinkTaskInfo(pStream, pSourceTask);
      if (code != 0) {
        mError("failed bind task to sink task since %s", tstrerror(code));
      }
      continue;
    }

    bindTaskToSinkTask(pStream, pMnode, pSinkTaskList, pSourceTask);
  }
}
2,344✔
947

NEW
948
/*
 * Attach the virtual-table merge tasks to their sink.
 *
 * The merge level is read from index SINK_NODE_LEVEL + 2 when an extra sink
 * level exists, else SINK_NODE_LEVEL + 1. With an extra sink each merge task
 * is bound to the sink task list; otherwise its sink info is set directly via
 * mndSetSinkTaskInfo(). Errors are logged only.
 */
static void bindVtableMergeSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks, bool hasExtraSink) {
  int32_t mergeLevel = hasExtraSink ? SINK_NODE_LEVEL + 2 : SINK_NODE_LEVEL + 1;
  SArray* pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL);
  SArray* pMergeTaskList = taosArrayGetP(tasks, mergeLevel);

  int idx = 0;
  while (idx < taosArrayGetSize(pMergeTaskList)) {
    SStreamTask* pMergeTask = taosArrayGetP(pMergeTaskList, idx++);
    mDebug("bindVtableMergeSink taskId:%s to sink task list", pMergeTask->id.idStr);

    if (!hasExtraSink) {
      int32_t code = mndSetSinkTaskInfo(pStream, pMergeTask);
      if (code != 0) {
        mError("failed bind task to sink task since %s", tstrerror(code));
      }
      continue;
    }

    bindTaskToSinkTask(pStream, pMnode, pSinkTaskList, pMergeTask);
  }
}
×
966

967

968
/*
 * Wire tasks [begin, end) of the second-to-last level in `tasks` to the LAST
 * task of the last level: each upstream task gets a child id relative to
 * `begin` and a fixed downstream, and is registered as upstream of that
 * downstream task. `end` is clamped to the size of the upstream level.
 *
 * Does nothing (besides logging) when fewer than two levels exist.
 */
static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) {
  size_t levels = taosArrayGetSize(tasks);
  if (levels < 2) {
    mError("task list size is less than 2");
    return;
  }

  SArray*       pDownLevel = taosArrayGetP(tasks, levels - 1);
  SArray*       pUpLevel = taosArrayGetP(tasks, levels - 2);
  SStreamTask** ppDown = taosArrayGetLast(pDownLevel);

  if (end > taosArrayGetSize(pUpLevel)) {
    end = taosArrayGetSize(pUpLevel);
  }

  int idx = begin;
  while (idx < end) {
    SStreamTask* pUp = taosArrayGetP(pUpLevel, idx);
    pUp->info.selfChildId = idx - begin;
    streamTaskSetFixedDownstreamInfo(pUp, *ppDown);

    int32_t code = streamTaskSetUpstreamInfo(*ppDown, pUp);
    if (code != 0) {
      mError("failed bind task to sink task since %s", tstrerror(code));
    }
    idx++;
  }

  mDebug("bindTwoLevel task list(%d-%d) to taskId:%s", begin, end - 1, (*(ppDown))->id.idStr);
}
990

NEW
991
int32_t tableHashValueComp(void const* lp, void const* rp) {
×
NEW
992
  uint32_t*    key = (uint32_t*)lp;
×
NEW
993
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
×
994

NEW
995
  if (*key < pVg->hashBegin) {
×
NEW
996
    return -1;
×
NEW
997
  } else if (*key > pVg->hashEnd) {
×
NEW
998
    return 1;
×
999
  }
1000

NEW
1001
  return 0;
×
1002
}
1003

1004

NEW
1005
int dbVgInfoComp(const void* lp, const void* rp) {
×
NEW
1006
  SVGroupHashInfo* pLeft = (SVGroupHashInfo*)lp;
×
NEW
1007
  SVGroupHashInfo* pRight = (SVGroupHashInfo*)rp;
×
NEW
1008
  if (pLeft->hashBegin < pRight->hashBegin) {
×
NEW
1009
    return -1;
×
NEW
1010
  } else if (pLeft->hashBegin > pRight->hashBegin) {
×
NEW
1011
    return 1;
×
1012
  }
1013

NEW
1014
  return 0;
×
1015
}
1016

NEW
1017
/*
 * Resolve the vgroup id that owns table `tbName` of database `dbFName`.
 *
 * The full name "<dbFName>.<tbName>" is hashed with the database's hash
 * settings, the vgroup array is sorted by hashBegin on first use (tracked by
 * dbInfo->vgSorted), and the entry whose hash range contains the value is
 * searched for.
 *
 * dbInfo  per-database hash settings plus the vgroup array (sorted in place)
 * acctId  not used by this function
 * vgId    receives the owning vgroup id on success
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA when no range matches.
 */
int32_t getTableVgId(SDBVgHashInfo* dbInfo, int32_t acctId, char* dbFName, int32_t* vgId, char *tbName) {
  (void)acctId;

  char tbFullName[TSDB_TABLE_FNAME_LEN];
  (void)snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbFName, tbName);
  uint32_t hashValue = taosGetTbHashVal(tbFullName, (uint32_t)strlen(tbFullName), dbInfo->hashMethod,
                                        dbInfo->hashPrefix, dbInfo->hashSuffix);

  if (!dbInfo->vgSorted) {
    taosArraySort(dbInfo->vgArray, dbVgInfoComp);
    dbInfo->vgSorted = true;
  }

  // the array elements are SVGroupHashInfo (see dbVgInfoComp), so the hit is read through that type
  SVGroupHashInfo* vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, tableHashValueComp, TD_EQ);
  if (NULL == vgInfo) {
    qError("no hash range found for hash value [%u], dbFName:%s, numOfVgId:%d", hashValue, dbFName,
             (int32_t)taosArrayGetSize(dbInfo->vgArray));
    return TSDB_CODE_INVALID_PARA;
  }

  *vgId = vgInfo->vgId;

  return 0;
}
1044

1045

NEW
1046
/*
 * Destroy every SArray stored as a value in the hash, then clean up the hash
 * itself.
 */
static void destroyVSubtableVtb(SSHashObj *pVtable) {
  int32_t iter = 0;

  for (void* pEntry = tSimpleHashIterate(pVtable, NULL, &iter); pEntry != NULL;
       pEntry = tSimpleHashIterate(pVtable, pEntry, &iter)) {
    taosArrayDestroy(*(SArray**)pEntry);
  }

  tSimpleHashCleanup(pVtable);
}
×
1055

NEW
1056
/*
 * Destroy every nested hash stored as a value in pVg (via
 * destroyVSubtableVtb), then clean up pVg itself.
 */
static void destroyVSubtableVgHash(SSHashObj *pVg) {
  int32_t iter = 0;

  for (void* pEntry = tSimpleHashIterate(pVg, NULL, &iter); pEntry != NULL;
       pEntry = tSimpleHashIterate(pVg, pEntry, &iter)) {
    SSHashObj** ppVtable = (SSHashObj**)pEntry;
    destroyVSubtableVtb(*ppVtable);
  }

  tSimpleHashCleanup(pVg);
}
×
1067

NEW
1068
/*
 * Destroy the vgArray of every SDBVgHashInfo value in the hash, then clean up
 * the hash itself.
 */
static void destroyDbVgroupsHash(SSHashObj *pDbVgs) {
  int32_t iter = 0;

  for (void* pEntry = tSimpleHashIterate(pDbVgs, NULL, &iter); pEntry != NULL;
       pEntry = tSimpleHashIterate(pDbVgs, pEntry, &iter)) {
    SDBVgHashInfo* pInfo = (SDBVgHashInfo*)pEntry;
    taosArrayDestroy(pInfo->vgArray);
  }

  tSimpleHashCleanup(pDbVgs);
}
×
1079

NEW
1080
/*
 * Fill pDbVgroup with one SDBVgHashInfo per database, keyed by the vgroup's
 * dbName (including the terminating NUL), by walking all SDB_VGROUP rows.
 *
 * For the first vgroup seen of a database, a new SVGroupHashInfo array is
 * created and the database's hash settings are copied from its SDbObj; every
 * vgroup then contributes one {vgId, hashBegin, hashEnd} element to the
 * array of its database.
 *
 * Returns TSDB_CODE_SUCCESS or the first error code. Entries already stored
 * in pDbVgroup stay there on failure.
 *
 * NOTE(review): the error returns leave the sdbFetch() iteration unfinished
 * without cancelling the iterator -- confirm whether that is acceptable.
 */
static int32_t buildDBVgroupsMap(SMnode* pMnode, SSHashObj* pDbVgroup) {
  void*          pIter = NULL;
  SSdb*          pSdb = pMnode->pSdb;
  int32_t        code = TSDB_CODE_SUCCESS;
  SArray*        pTarget = NULL;
  SArray*        pNew = NULL;  // array not yet owned by pDbVgroup; NULL once handed over
  SDbObj*        pDb = NULL;
  SDBVgHashInfo  dbInfo = {0};
  SDBVgHashInfo* pDbInfo = NULL;

  while (1) {
    SVgObj* pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    pDbInfo = (SDBVgHashInfo*)tSimpleHashGet(pDbVgroup, pVgroup->dbName, strlen(pVgroup->dbName) + 1);
    if (NULL == pDbInfo) {
      pNew = taosArrayInit(20, sizeof(SVGroupHashInfo));
      if (NULL == pNew) {
        code = terrno;
        mError("taosArrayInit SVGroupHashInfo failed, code:%s", tstrerror(terrno));
        sdbRelease(pSdb, pVgroup);
        return code;
      }

      pDb = mndAcquireDb(pMnode, pVgroup->dbName);
      if (pDb == NULL) {
        code = terrno;
        mError("mndAcquireDb %s failed, code:%s", pVgroup->dbName, tstrerror(terrno));
        // the freshly created array has no owner yet; free it so it does not leak
        taosArrayDestroy(pNew);
        sdbRelease(pSdb, pVgroup);
        return code;
      }

      dbInfo.vgSorted = false;
      dbInfo.hashMethod = pDb->cfg.hashMethod;
      dbInfo.hashPrefix = pDb->cfg.hashPrefix;
      dbInfo.hashSuffix = pDb->cfg.hashSuffix;
      dbInfo.vgArray = pNew;

      mndReleaseDb(pMnode, pDb);

      pTarget = pNew;
    } else {
      pTarget = pDbInfo->vgArray;
    }

    SVGroupHashInfo vgInfo = {.vgId = pVgroup->vgId, .hashBegin = pVgroup->hashBegin, .hashEnd = pVgroup->hashEnd};
    if (NULL == taosArrayPush(pTarget, &vgInfo)) {
      code = terrno;
      mError("taosArrayPush SVGroupHashInfo failed, code:%s", tstrerror(terrno));
      taosArrayDestroy(pNew);
      sdbRelease(pSdb, pVgroup);
      return code;
    }

    if (NULL == pDbInfo) {
      code = tSimpleHashPut(pDbVgroup, pVgroup->dbName, strlen(pVgroup->dbName) + 1, &dbInfo, sizeof(dbInfo));
      if (code != 0) {
        mError("tSimpleHashPut SDBVgHashInfo failed, code:%s", tstrerror(code));
        taosArrayDestroy(pNew);
        sdbRelease(pSdb, pVgroup);
        return code;
      }

      // the hash entry now references the array
      pNew = NULL;
    }

    sdbRelease(pSdb, pVgroup);
  }

  return code;
}
1154

NEW
1155
/*
 * Record one referenced column of a virtual table inside a per-vnode hash.
 *
 * pVg maps a 12-byte key (vvgId followed by vuid) to a hash of
 * "original table name -> SArray<SColIdName>". Missing levels are created on
 * demand, and a {colId, strdup(refColName)} entry is appended to the array of
 * pCol->refTableName.
 *
 * pCtx->lastUid is always updated; pCtx->lastVtable / pCtx->lastOtable are
 * updated on success so the caller can append further columns cheaply.
 *
 * Returns TSDB_CODE_SUCCESS or the failing call's error code.
 *
 * NOTE(review): objects created here before a failing step (new hash, new
 * array, duplicated column name) are not released on the error paths --
 * confirm ownership and add cleanup if they are indeed leaked.
 */
static int32_t addVTableToVnode(SSHashObj* pVg, int32_t vvgId, uint64_t vuid, SRefColInfo* pCol, SStreamVBuildCtx* pCtx) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SSHashObj* pNewVtable = NULL;
  SArray* pNewOtable = NULL, *pTarOtable = NULL;
  SColIdName col;

  // Build the key with memcpy: the char buffer has no alignment guarantee, so
  // storing the integers through casted pointers would be a misaligned,
  // aliasing-violating access. The resulting bytes are the same.
  char vId[sizeof(int32_t) + sizeof(uint64_t)];
  memcpy(vId, &vvgId, sizeof(vvgId));
  memcpy(vId + sizeof(int32_t), &vuid, sizeof(vuid));

  pCtx->lastUid = vuid;

  SSHashObj** pVtable = (SSHashObj**)tSimpleHashGet(pVg, vId, sizeof(vId));
  if (NULL == pVtable) {
    // first column seen for this virtual table: create both nested containers
    pNewVtable = (SSHashObj*)tSimpleHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
    TSDB_CHECK_NULL(pNewVtable, code, lino, _return, terrno);
    pNewOtable = taosArrayInit(4, sizeof(SColIdName));
    TSDB_CHECK_NULL(pNewOtable, code, lino, _return, terrno);
    tSimpleHashSetFreeFp(pNewVtable, tFreeStreamVtbOtbInfo);
    col.colId = pCol->colId;
    col.colName = taosStrdup(pCol->refColName);
    TSDB_CHECK_NULL(col.colName, code, lino, _return, terrno);
    TSDB_CHECK_NULL(taosArrayPush(pNewOtable, &col), code, lino, _return, terrno);
    TSDB_CHECK_CODE(tSimpleHashPut(pNewVtable, pCol->refTableName, strlen(pCol->refTableName) + 1, &pNewOtable, POINTER_BYTES), lino, _return);
    TSDB_CHECK_CODE(tSimpleHashPut(pVg, vId, sizeof(vId), &pNewVtable, POINTER_BYTES), lino, _return);

    pCtx->lastVtable = pNewVtable;
    pCtx->lastOtable = pNewOtable;

    return code;
  }

  // the virtual table is known; find or create the column list of the original table
  SArray** pOtable = tSimpleHashGet(*pVtable, pCol->refTableName, strlen(pCol->refTableName) + 1);
  if (NULL == pOtable) {
    pNewOtable = taosArrayInit(4, sizeof(SColIdName));
    TSDB_CHECK_NULL(pNewOtable, code, lino, _return, terrno);
    pTarOtable = pNewOtable;
  } else {
    pTarOtable = *pOtable;
  }

  col.colId = pCol->colId;
  col.colName = taosStrdup(pCol->refColName);
  TSDB_CHECK_NULL(col.colName, code, lino, _return, terrno);
  TSDB_CHECK_NULL(taosArrayPush(pTarOtable, &col), code, lino, _return, terrno);
  if (NULL == pOtable) {
    TSDB_CHECK_CODE(tSimpleHashPut(*pVtable, pCol->refTableName, strlen(pCol->refTableName) + 1, &pNewOtable, POINTER_BYTES), lino, _return);
  }

  pCtx->lastVtable = *pVtable;
  pCtx->lastOtable = pTarOtable;

_return:

  if (code) {
    mError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
1215

NEW
1216
/*
 * Register a referenced column under the vgroup that hosts its original
 * table.
 *
 * The owning vgroup is looked up from the ORIGINAL TABLE name
 * (pCol->refTableName), since getTableVgId() hashes "<db>.<table>". The
 * result map pRes is keyed by "<refDbName>.<vgId>"; the per-vgroup hash is
 * created on first use and the column is then added through
 * addVTableToVnode().
 *
 * On success pCtx->lastVg points to the per-vgroup hash that was used.
 * Returns TSDB_CODE_SUCCESS or the failing call's error code.
 */
static int32_t addVgroupToRes(char* fDBName, int32_t vvgId, uint64_t vuid, SRefColInfo* pCol, SDBVgHashInfo* pDb, SSHashObj* pRes, SStreamVBuildCtx* pCtx) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t vgId = 0;
  char dbVgId[TSDB_DB_NAME_LEN + 32];
  SSHashObj *pTarVg = NULL, *pNewVg = NULL;

  // the vgroup is determined by the table name, not by the column name
  TSDB_CHECK_CODE(getTableVgId(pDb, 1, fDBName, &vgId, pCol->refTableName), lino, _return);

  snprintf(dbVgId, sizeof(dbVgId), "%s.%d", pCol->refDbName, vgId);

  SSHashObj** pVg = (SSHashObj**)tSimpleHashGet(pRes, dbVgId, strlen(dbVgId) + 1);
  if (NULL == pVg) {
    pNewVg = tSimpleHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    TSDB_CHECK_NULL(pNewVg, code, lino, _return, terrno);
    tSimpleHashSetFreeFp(pNewVg, tFreeStreamVtbVtbInfo);
    pTarVg = pNewVg;
  } else {
    pTarVg = *pVg;
  }

  TSDB_CHECK_CODE(addVTableToVnode(pTarVg, vvgId, vuid, pCol, pCtx), lino, _return);

  if (NULL == pVg) {
    TSDB_CHECK_CODE(tSimpleHashPut(pRes, dbVgId, strlen(dbVgId) + 1, &pNewVg, POINTER_BYTES), lino, _return);
    // pRes references the new hash now; keep the cleanup below from touching it
    pNewVg = NULL;
  }

  pCtx->lastVg = pTarVg;

_return:

  if (code) {
    mError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  // only non-NULL when a newly created hash was never stored in pRes
  destroyVSubtableVgHash(pNewVg);

  return code;
}
1256

NEW
1257
/*
 * Add one referenced column of virtual table `vuid` to the result map.
 *
 * Fast path: when the column refers to the same db/table as the previously
 * processed column (pCtx->lastCol), the cached containers in pCtx are reused:
 *   - same virtual table as the last call: append to pCtx->lastOtable;
 *   - different virtual table: add it under the cached vgroup hash.
 * Otherwise the database is looked up in pDbVgroups under the key
 * "1.<refDbName>" and the column is added through addVgroupToRes(), after
 * which pCtx->lastCol is updated.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_MND_DB_NOT_EXIST when the database is
 * unknown, or the failing call's error code.
 */
static int32_t addRefColToMap(int32_t vvgId, uint64_t vuid, SRefColInfo* pCol, SSHashObj* pDbVgroups, SSHashObj* pRes, SStreamVBuildCtx* pCtx) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  bool       sameVtable = (vuid == pCtx->lastUid);
  SColIdName col;
  char       fDBName[TSDB_DB_FNAME_LEN];

  // first-character comparisons act as a cheap pre-filter before the strcmp calls
  bool sameOrigTable = pCtx->lastCol && pCtx->lastCol->refDbName[0] == pCol->refDbName[0] &&
                       pCtx->lastCol->refTableName[0] == pCol->refTableName[0] &&
                       0 == strcmp(pCtx->lastCol->refDbName, pCol->refDbName) &&
                       0 == strcmp(pCtx->lastCol->refTableName, pCol->refTableName);

  if (sameOrigTable && sameVtable) {
    col.colId = pCol->colId;
    col.colName = taosStrdup(pCol->refColName);
    TSDB_CHECK_NULL(col.colName, code, lino, _return, terrno);
    TSDB_CHECK_NULL(taosArrayPush(pCtx->lastOtable, &col), code, lino, _return, terrno);
    return code;
  }

  if (sameOrigTable) {
    TSDB_CHECK_CODE(addVTableToVnode(pCtx->lastVg, vvgId, vuid, pCol, pCtx), lino, _return);
    return code;
  }

  snprintf(fDBName, sizeof(fDBName), "1.%s", pCol->refDbName);
  SDBVgHashInfo* pDb = (SDBVgHashInfo*)tSimpleHashGet(pDbVgroups, fDBName, strlen(fDBName) + 1);
  if (NULL == pDb) {
    mError("refDb %s does not exist", pCol->refDbName);
    code = TSDB_CODE_MND_DB_NOT_EXIST;
    goto _return;
  }

  TSDB_CHECK_CODE(addVgroupToRes(fDBName, vvgId, vuid, pCol, pDb, pRes, pCtx), lino, _return);

  pCtx->lastCol = pCol;

_return:

  if (code) {
    mError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return code;
}
1299

NEW
1300
/*
 * Build the virtual-table reference map for a stream.
 *
 * A temporary per-database vgroup map is built first; then every referenced
 * column of every table in every SVSubTablesRsp of pVSubTables is fed to
 * addRefColToMap(), which fills *ppRes. The temporary map is always destroyed
 * before returning.
 *
 * *ppRes is created here; on failure after its creation it is left in place
 * for the caller.
 *
 * Returns 0 on success or the first error code.
 */
static int32_t buildVSubtableMap(SMnode* pMnode, SArray* pVSubTables, SSHashObj** ppRes) {
  int32_t          code = 0;
  int32_t          lino = 0;
  SStreamVBuildCtx ctx = {0};

  SSHashObj* pDbVgroups = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
  if (NULL == pDbVgroups) {
    mError("tSimpleHashInit failed, error:%s", tstrerror(terrno));
    return terrno;
  }

  TAOS_CHECK_EXIT(buildDBVgroupsMap(pMnode, pDbVgroups));

  *ppRes = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
  if (NULL == *ppRes) {
    code = terrno;
    mError("tSimpleHashInit failed, error:%s", tstrerror(terrno));
    goto _exit;
  }
  tSimpleHashSetFreeFp(*ppRes, tFreeStreamVtbDbVgInfo);

  int32_t numOfRsp = taosArrayGetSize(pVSubTables);
  for (int32_t rspIdx = 0; rspIdx < numOfRsp; ++rspIdx) {
    SVSubTablesRsp* pVgTbs = taosArrayGet(pVSubTables, rspIdx);
    int32_t         numOfTables = taosArrayGetSize(pVgTbs->pTables);

    for (int32_t tbIdx = 0; tbIdx < numOfTables; ++tbIdx) {
      SVCTableRefCols* pTb = (SVCTableRefCols*)taosArrayGetP(pVgTbs->pTables, tbIdx);

      for (int32_t colIdx = 0; colIdx < pTb->numOfColRefs; ++colIdx) {
        SRefColInfo* pCol = &pTb->refCols[colIdx];
        TAOS_CHECK_EXIT(addRefColToMap(pVgTbs->vgId, pTb->uid, pCol, pDbVgroups, *ppRes, &ctx));
      }
    }
  }

_exit:

  if (code) {
    mError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  destroyDbVgroupsHash(pDbVgroups);

  return code;
}
1344

1345
/*
 * Lay out all task levels of a stream according to its physical plan.
 *
 * Steps, in order:
 *   1. Allocate the normal and history task-list arrays.
 *   2. For a virtual-table stream (pCreate->pVSubTables set) build
 *      pStream->pVTableMap.
 *   3. Add an extra sink level when the plan has aggregate levels, the target
 *      database differs from the source, the target has several vgroups, or a
 *      fixed sink vgroup is configured.
 *   4. Add the source level (for virtual-table streams: source level plus
 *      merge level).
 *   5. Single-level plans (two-level for virtual tables) are bound directly
 *      to the sink and the function returns.
 *   6. For 3-level plans (4-level for virtual tables) insert intermediate
 *      aggregate levels until one final aggregate can absorb the fan-in,
 *      grouping tsStreamAggCnt upstream tasks per aggregate task.
 *   7. Add the final aggregate task, bind it to the previous level and to the
 *      sink.
 *
 * Returns 0 on success or the first error code.
 */
static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, SEpSet* pEpset, SCMCreateStreamReq* pCreate) {
  int32_t code = 0;
  bool    isVTableStream = (NULL != pCreate->pVSubTables);
  int64_t skey = pCreate->lastTs;
  SArray* pVerList = pCreate->pVgroupVerList;
  SSdb*   pSdb = pMnode->pSdb;
  int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
  bool    hasExtraSink = false;
  bool    externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
  SSubplan* plan = NULL;
  SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb);

  if (pDbObj == NULL) {
    code = TSDB_CODE_QRY_INVALID_INPUT;
    TAOS_RETURN(code);
  }

  bool multiTarget = (pDbObj->cfg.numOfVgroups > 1);
  sdbRelease(pSdb, pDbObj);

  mDebug("doScheduleStream numOfPlanLevel:%d, exDb:%d, multiTarget:%d, fix vgId:%d, physicalPlan:%s", numOfPlanLevel,
         externalTargetDB, multiTarget, pStream->fixedSinkVgId, pStream->physicalPlan);

  pStream->pTaskList = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES);
  pStream->pHTaskList = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES);
  if (pStream->pTaskList == NULL || pStream->pHTaskList == NULL) {
    mError("failed to create stream obj, code:%s", tstrerror(terrno));
    return terrno;
  }

  if (pCreate->pVSubTables) {
    code = buildVSubtableMap(pMnode, pCreate->pVSubTables, &pStream->pVTableMap);
    if (TSDB_CODE_SUCCESS != code) {
      // report the code that is actually returned rather than whatever terrno holds
      mError("failed to buildVSubtableMap, code:%s", tstrerror(code));
      return code;
    }
  }

  if ((numOfPlanLevel > 1 && !isVTableStream) || (numOfPlanLevel > 2 && isVTableStream) || externalTargetDB ||
      multiTarget || pStream->fixedSinkVgId) {
    // add extra sink
    hasExtraSink = true;
    code = addSinkTask(pMnode, pStream, pEpset);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  pStream->totalLevel = numOfPlanLevel + hasExtraSink;

  int8_t hasAggTasks = (numOfPlanLevel > 1) ? 1 : 0;  // task level is greater than 1, which means agg existing
  if (pStream->pVTableMap) {
    code = addNewTaskList(pStream);
    if (code) {
      return code;
    }

    plan = getVTbScanSubPlan(pPlan);
    if (plan == NULL) {
      mError("fail to get vtable scan plan");
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      TAOS_RETURN(code);
    }

    // Copy the list pointer out right away. The element address returned by
    // taosArrayGetLast() points into pTaskList's storage, which the next
    // addNewTaskList() presumably appends to and may therefore move.
    SArray* pSourceTaskList = *(SArray**)taosArrayGetLast(pStream->pTaskList);

    code = addNewTaskList(pStream);
    if (code) {
      return code;
    }
    code = addVTableMergeTask(pMnode, plan, pStream, pEpset, (numOfPlanLevel == 1), hasAggTasks, pCreate);
    if (code) {
      return code;
    }

    plan = getScanSubPlan(pPlan);  // source plan
    if (plan == NULL) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      TAOS_RETURN(code);
    }

    SArray* pMergeTaskList = *(SArray**)taosArrayGetLast(pStream->pTaskList);
    code = addVTableSourceTask(pMnode, plan, pStream, pEpset, skey, pVerList, (numOfPlanLevel == 1), hasAggTasks,
                               pCreate, pStream->pVTableMap, pSourceTaskList, pMergeTaskList);
  } else {
    plan = getScanSubPlan(pPlan);  // source plan
    if (plan == NULL) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      TAOS_RETURN(code);
    }

    code = addSourceTask(pMnode, plan, pStream, pEpset, skey, pVerList, (numOfPlanLevel == 1), hasAggTasks);
  }
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  if ((numOfPlanLevel == 1 && !isVTableStream)) {
    bindSourceSink(pStream, pMnode, pStream->pTaskList, hasExtraSink);
    if (needHistoryTask(pStream)) {
      bindSourceSink(pStream, pMnode, pStream->pHTaskList, hasExtraSink);
    }
    return TDB_CODE_SUCCESS;
  }

  if (numOfPlanLevel == 2 && isVTableStream) {
    bindVtableMergeSink(pStream, pMnode, pStream->pTaskList, hasExtraSink);
    return TDB_CODE_SUCCESS;
  }

  if ((numOfPlanLevel == 3 && !isVTableStream) || (numOfPlanLevel == 4 && isVTableStream)) {
    int32_t idx = isVTableStream ? 2 : 1;
    plan = getAggSubPlan(pPlan, idx);  // middle agg plan
    if (plan == NULL) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      TAOS_RETURN(code);
    }

    // keep stacking intermediate agg levels until the last level needs at most one aggregator
    do {
      SArray** list = taosArrayGetLast(pStream->pTaskList);
      float    size = (float)taosArrayGetSize(*list);
      size_t   cnt = (size_t)ceil(size / tsStreamAggCnt);
      if (cnt <= 1) break;

      mDebug("doScheduleStream add middle agg, size:%d, cnt:%d", (int)size, (int)cnt);
      code = addNewTaskList(pStream);
      if (code) {
        return code;
      }

      for (int j = 0; j < cnt; j++) {
        code = addAggTask(pStream, pMnode, plan, pEpset, false);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }

        bindTwoLevel(pStream->pTaskList, j * tsStreamAggCnt, (j + 1) * tsStreamAggCnt);
        if (needHistoryTask(pStream)) {
          bindTwoLevel(pStream->pHTaskList, j * tsStreamAggCnt, (j + 1) * tsStreamAggCnt);
        }
      }
    } while (1);
  }

  plan = getAggSubPlan(pPlan, 0);
  if (plan == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  mDebug("doScheduleStream add final agg");
  // the upstream fan-in is read before the final agg level is appended
  SArray** list = taosArrayGetLast(pStream->pTaskList);
  size_t   size = taosArrayGetSize(*list);

  code = addNewTaskList(pStream);
  if (code) {
    return code;
  }

  code = addAggTask(pStream, pMnode, plan, pEpset, true);
  if (code != TSDB_CODE_SUCCESS) {
    TAOS_RETURN(code);
  }
  bindTwoLevel(pStream->pTaskList, 0, size);
  if (needHistoryTask(pStream)) {
    bindTwoLevel(pStream->pHTaskList, 0, size);
  }

  bindAggSink(pStream, pMnode, pStream->pTaskList);
  if (needHistoryTask(pStream)) {
    bindAggSink(pStream, pMnode, pStream->pHTaskList);
  }
  TAOS_RETURN(code);
}
1524

1525
/*
 * Entry point for scheduling a stream: parse the stream's physical plan
 * string into pStream->pPlan, obtain the mnode endpoint set, and hand over to
 * doScheduleStream().
 *
 * Returns TSDB_CODE_QRY_INVALID_INPUT when the plan cannot be parsed,
 * otherwise the result of doScheduleStream().
 */
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream, SCMCreateStreamReq* pCreate) {
  int32_t code = 0;
  SEpSet  mnodeEpset = {0};

  pStream->pPlan = qStringToQueryPlan(pStream->physicalPlan);
  if (NULL == pStream->pPlan) {
    code = TSDB_CODE_QRY_INVALID_INPUT;
    TAOS_RETURN(code);
  }

  mndGetMnodeEpSet(pMnode, &mnodeEpset);
  code = doScheduleStream(pStream, pMnode, pStream->pPlan, &mnodeEpset, pCreate);

  TAOS_RETURN(code);
}
1540
#endif
1541

1542
#ifdef USE_TOPIC
1543
/**
 * Initialise a subscription for a topic: collect the vgroups of the topic's
 * database into pSub->unassignedVgs and fill pSub->qmsg.
 *
 * Plan handling:
 *  - TOPIC_SUB_TYPE__COLUMN: the plan is parsed from pTopic->physicalPlan.
 *  - TOPIC_SUB_TYPE__TABLE with a non-NULL ast: the plan is created from the
 *    AST via qCreateQueryPlan().
 *  - otherwise no plan is used and pSub->qmsg becomes an empty string.
 * When a plan exists it must contain exactly one level holding exactly one
 * subplan; that subplan is serialized into pSub->qmsg.
 *
 * @param pMnode  mnode handle (provides the SDB and vgroup endpoint lookup)
 * @param pTopic  topic being subscribed to (read only)
 * @param pSub    subscription object to fill; vgNum, unassignedVgs and qmsg
 *                are modified
 * @return 0 on success, otherwise an error code. On failure pSub may have
 *         been partially filled (entries already pushed stay in place).
 */
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
  int32_t     code = 0;
  SSdb*       pSdb = pMnode->pSdb;
  SVgObj*     pVgroup = NULL;
  SQueryPlan* pPlan = NULL;
  SSubplan*   pSubplan = NULL;

  if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) {
    pPlan = qStringToQueryPlan(pTopic->physicalPlan);
    if (pPlan == NULL) {
      return TSDB_CODE_QRY_INVALID_INPUT;
    }
  } else if (pTopic->subType == TOPIC_SUB_TYPE__TABLE && pTopic->ast != NULL) {
    SNode* pAst = NULL;
    code = nodesStringToNode(pTopic->ast, &pAst);
    if (code != 0) {
      mError("topic:%s, failed to create since %s", pTopic->name, terrstr());
      return code;
    }

    SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
    code = qCreateQueryPlan(&cxt, &pPlan, NULL);
    if (code != 0) {
      mError("failed to create topic:%s since %s", pTopic->name, terrstr());
      nodesDestroyNode(pAst);
      return code;
    }
    // The AST is only needed to build the plan.
    nodesDestroyNode(pAst);
  }

  if (pPlan) {
    // A topic query must be a single level with a single subplan.
    int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
    if (levelNum != 1) {
      code = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
      goto END;
    }

    SNodeListNode* pNodeListNode = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
    if (pNodeListNode == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto END;
    }
    int32_t opNum = LIST_LENGTH(pNodeListNode->pNodeList);
    if (opNum != 1) {
      code = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
      goto END;
    }

    pSubplan = (SSubplan*)nodesListGetNode(pNodeListNode->pNodeList, 0);
  }

  void* pIter = NULL;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    // Only vgroups belonging to the topic's database take part.
    if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    SMqVgEp* pVgEp = taosMemoryMalloc(sizeof(*pVgEp));
    if (pVgEp == NULL) {
      code = terrno;  // capture before the cleanup calls below can touch terrno
      // Leaving the loop early: give back the fetched row and stop the iteration.
      sdbRelease(pSdb, pVgroup);
      sdbCancelFetch(pSdb, pIter);
      goto END;
    }
    pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
    pVgEp->vgId = pVgroup->vgId;
    if (taosArrayPush(pSub->unassignedVgs, &pVgEp) == NULL) {
      code = terrno;
      taosMemoryFree(pVgEp);
      sdbRelease(pSdb, pVgroup);
      sdbCancelFetch(pSdb, pIter);
      goto END;
    }

    // Count the vgroup only once it is actually stored, so vgNum never
    // exceeds the number of entries pushed by this function.
    pSub->vgNum++;

    mInfo("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, pVgEp->vgId);
    sdbRelease(pSdb, pVgroup);
  }

  if (pSubplan) {
    int32_t msgLen = 0;

    if (qSubPlanToString(pSubplan, &pSub->qmsg, &msgLen) < 0) {
      code = TSDB_CODE_QRY_INVALID_INPUT;
      goto END;
    }
  } else {
    pSub->qmsg = taosStrdup("");
    if (pSub->qmsg == NULL) {
      // Do not report success with a NULL qmsg.
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      goto END;
    }
  }

END:
  qDestroyQueryPlan(pPlan);
  return code;
}
1639
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc