• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3526

10 Nov 2024 03:50AM UTC coverage: 60.225% (-0.6%) from 60.818%
#3526

push

travis-ci

web-flow
Merge pull request #28709 from taosdata/main

merge: from main to 3.0 branch

117031 of 249004 branches covered (47.0%)

Branch coverage included in aggregate %.

130 of 169 new or added lines in 23 files covered. (76.92%)

4149 existing lines in 176 files now uncovered.

197577 of 273386 relevant lines covered (72.27%)

5840219.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.04
/source/dnode/mnode/impl/src/mndScheduler.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndScheduler.h"
17
#include "mndDb.h"
18
#include "mndMnode.h"
19
#include "mndSnode.h"
20
#include "mndVgroup.h"
21
#include "parser.h"
22
#include "tcompare.h"
23
#include "tmisce.h"
24
#include "tname.h"
25
#include "tuuid.h"
26

27
#define SINK_NODE_LEVEL (0)
28
extern bool tsDeployOnSnode;
29

30
// Returns true when pNode, or any node reachable through its pChildren lists,
// is a stream count-window physical plan node; false otherwise.
static bool hasCountWindowNode(SPhysiNode* pNode) {
  if (nodeType(pNode) == QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT) {
    return true;
  }

  // depth-first search over the children, stopping at the first match
  size_t  numOfChildren = LIST_LENGTH(pNode->pChildren);
  int32_t idx = 0;
  while (idx < numOfChildren) {
    SPhysiNode* pSub = (SPhysiNode*)nodesListGetNode(pNode->pChildren, idx);
    if (hasCountWindowNode(pSub)) {
      return true;
    }
    ++idx;
  }

  return false;
}
46

47
// Returns true when the physical plan tree rooted at pPlan->pNode contains a
// count-window node.
static bool isCountWindowStreamTask(SSubplan* pPlan) {
  SPhysiNode* pRoot = (SPhysiNode*)pPlan->pNode;
  return hasCountWindowNode(pRoot);
}
50

51
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
8✔
52
                           int64_t watermark, int64_t deleteMark) {
53
  int32_t     code = 0;
8✔
54
  SNode*      pAst = NULL;
8✔
55
  SQueryPlan* pPlan = NULL;
8✔
56

57
  if (nodesStringToNode(ast, &pAst) < 0) {
8!
58
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
59
    goto END;
×
60
  }
61

62
  if (qSetSTableIdForRsma(pAst, uid) < 0) {
8!
63
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
64
    goto END;
×
65
  }
66

67
  SPlanContext cxt = {
8✔
68
      .pAstRoot = pAst,
69
      .topicQuery = false,
70
      .streamQuery = true,
71
      .rSmaQuery = true,
72
      .triggerType = triggerType,
73
      .watermark = watermark,
74
      .deleteMark = deleteMark,
75
  };
76

77
  if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
8!
78
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
79
    goto END;
×
80
  }
81

82
  int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
8!
83
  if (levelNum != 1) {
8!
84
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
85
    goto END;
×
86
  }
87
  SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
8✔
88

89
  int32_t opNum = LIST_LENGTH(inner->pNodeList);
8!
90
  if (opNum != 1) {
8!
91
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
92
    goto END;
×
93
  }
94

95
  SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
8✔
96
  if (qSubPlanToString(plan, pDst, pDstLen) < 0) {
8!
97
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
98
    goto END;
×
99
  }
100

101
END:
8✔
102
  if (pAst) nodesDestroyNode(pAst);
8!
103
  if (pPlan) nodesDestroyNode((SNode*)pPlan);
8!
104
  TAOS_RETURN(code);
8✔
105
}
106

107
// Fills pTask->outputInfo so that the task writes either to an sma (when the
// stream has a smaId and subTableWithoutMd5 is not 1) or to the stream's
// target super table.
//
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the output schema cannot
// be cloned.
int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) {
  STaskOutputInfo* pOutput = &pTask->outputInfo;

  mDebug("mndSetSinkTaskInfo to sma or table, taskId:%s", pTask->id.idStr);

  bool sinkToSma = (pStream->smaId != 0) && (pStream->subTableWithoutMd5 != 1);
  if (sinkToSma) {
    pOutput->type = TASK_OUTPUT__SMA;
    pOutput->smaSink.smaId = pStream->smaId;
    return 0;
  }

  // table sink: record the target super table and a private copy of the schema
  pOutput->type = TASK_OUTPUT__TABLE;
  pOutput->tbSink.stbUid = pStream->targetStbUid;
  (void)memcpy(pOutput->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
  pOutput->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);

  return (pOutput->tbSink.pSchemaWrapper != NULL) ? 0 : TSDB_CODE_OUT_OF_MEMORY;
}
127

128
int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SArray* pSinkNodeList,
3,540✔
129
                                        SStreamTask* pTask) {
130
  int32_t code = 0;
3,540✔
131
  bool isShuffle = false;
3,540✔
132

133
  if (pStream->fixedSinkVgId == 0) {
3,540✔
134
    SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
3,480✔
135
    if (pDb != NULL && pDb->cfg.numOfVgroups > 1) {
3,480!
136
      isShuffle = true;
3,424✔
137
      pTask->outputInfo.type = TASK_OUTPUT__SHUFFLE_DISPATCH;
3,424✔
138
      pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
3,424✔
139
      TAOS_CHECK_RETURN(mndExtractDbInfo(pMnode, pDb, &pTask->outputInfo.shuffleDispatcher.dbInfo, NULL));
3,424!
140
    }
141

142
    sdbRelease(pMnode->pSdb, pDb);
3,480✔
143
  }
144

145
  int32_t numOfSinkNodes = taosArrayGetSize(pSinkNodeList);
3,540✔
146

147
  if (isShuffle) {
3,540✔
148
    (void)memcpy(pTask->outputInfo.shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
3,424✔
149
    SArray* pVgs = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
3,424✔
150

151
    int32_t numOfVgroups = taosArrayGetSize(pVgs);
3,424✔
152
    for (int32_t i = 0; i < numOfVgroups; i++) {
14,348✔
153
      SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
10,924✔
154

155
      for (int32_t j = 0; j < numOfSinkNodes; j++) {
25,109!
156
        SStreamTask* pSinkTask = taosArrayGetP(pSinkNodeList, j);
25,109✔
157
        if (pSinkTask->info.nodeId == pVgInfo->vgId) {
25,109✔
158
          pVgInfo->taskId = pSinkTask->id.taskId;
10,924✔
159
          break;
10,924✔
160
        }
161
      }
162
    }
163
  } else {
164
    SStreamTask* pOneSinkTask = taosArrayGetP(pSinkNodeList, 0);
116✔
165
    streamTaskSetFixedDownstreamInfo(pTask, pOneSinkTask);
116✔
166
  }
167

168
  TAOS_RETURN(code);
3,540✔
169
}
170

171
// Places pTask on the vnode group pVgroup: records the vgroup id and endpoint
// set on both the task and the subplan's exec node, then serializes the
// subplan into pTask->exec.qmsg.
//
// Returns the result of qSubPlanToString().
int32_t mndAssignStreamTaskToVgroup(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
  int32_t planLen;  // output of qSubPlanToString(); not used afterwards

  // task side
  pTask->info.nodeId = pVgroup->vgId;
  pTask->info.epSet = mndGetVgroupEpset(pMnode, pVgroup);

  // plan side mirrors the task
  plan->execNode.epSet = pTask->info.epSet;
  plan->execNode.nodeId = pTask->info.nodeId;

  return qSubPlanToString(plan, &pTask->exec.qmsg, &planLen);
}
181

182
// Fetches the first snode object from the sdb and returns it (NULL when the
// fetch yields nothing). The iterator is cancelled before returning; the
// returned object is not released here.
SSnodeObj* mndSchedFetchOneSnode(SMnode* pMnode) {
  SSdb*      pSdb = pMnode->pSdb;
  SSnodeObj* pSnode = NULL;

  // TODO random fetch
  void* pCursor = sdbFetch(pSdb, SDB_SNODE, NULL, (void**)&pSnode);
  sdbCancelFetch(pSdb, pCursor);

  return pSnode;
}
190

191
// Places pTask on the snode pSnode: marks both the task and the subplan's exec
// node with SNODE_HANDLE and the snode endpoint set, then serializes the
// subplan into pTask->exec.qmsg.
//
// Returns the result of qSubPlanToString().
int32_t mndAssignStreamTaskToSnode(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SSnodeObj* pSnode) {
  int32_t planLen;  // output of qSubPlanToString(); not used afterwards

  pTask->info.nodeId = SNODE_HANDLE;
  plan->execNode.nodeId = SNODE_HANDLE;

  pTask->info.epSet = mndAcquireEpFromSnode(pMnode, pSnode);
  plan->execNode.epSet = pTask->info.epSet;

  mDebug("s-task:0x%x set the agg task to snode:%d", pTask->id.taskId, SNODE_HANDLE);

  return qSubPlanToString(plan, &pTask->exec.qmsg, &planLen);
}
203

204
// random choose a node to do compute
205
SVgObj* mndSchedFetchOneVg(SMnode* pMnode, SStreamObj* pStream) {
162✔
206
  SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->sourceDb);
162✔
207
  if (pDbObj == NULL) {
162!
208
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
209
    return NULL;
×
210
  }
211

212
  if (pStream->indexForMultiAggBalance == -1) {
162✔
213
    taosSeedRand(taosSafeRand());
150✔
214
    pStream->indexForMultiAggBalance = taosRand() % pDbObj->cfg.numOfVgroups;
150✔
215
  }
216

217
  int32_t index = 0;
162✔
218
  void*   pIter = NULL;
162✔
219
  SVgObj* pVgroup = NULL;
162✔
220
  while (1) {
221
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
613✔
222
    if (pIter == NULL) break;
613!
223
    if (pVgroup->dbUid != pStream->sourceDbUid) {
613✔
224
      sdbRelease(pMnode->pSdb, pVgroup);
258✔
225
      continue;
258✔
226
    }
227
    if (index++ == pStream->indexForMultiAggBalance) {
355✔
228
      pStream->indexForMultiAggBalance++;
162✔
229
      pStream->indexForMultiAggBalance %= pDbObj->cfg.numOfVgroups;
162✔
230
      sdbCancelFetch(pMnode->pSdb, pIter);
162✔
231
      break;
162✔
232
    }
233
    sdbRelease(pMnode->pSdb, pVgroup);
193✔
234
  }
235
  sdbRelease(pMnode->pSdb, pDbObj);
162✔
236

237
  return pVgroup;
162✔
238
}
239

240
// Creates one sink-level task on pVgroup and appends it to the newest task
// level of the stream (the fill-history level list when isFillhistory is set).
//
// Returns the error from tNewStreamTask() if task creation fails, otherwise
// the result of mndSetSinkTaskInfo().
static int32_t doAddSinkTask(SStreamObj* pStream, SMnode* pMnode, SVgObj* pVgroup, SEpSet* pEpset, bool isFillhistory) {
  int64_t  uid;
  SArray** ppLevel;

  // fill-history tasks use their own uid and live in their own level list
  if (isFillhistory) {
    uid = pStream->hTaskUid;
    ppLevel = taosArrayGetLast(pStream->pHTasksList);
  } else {
    uid = pStream->uid;
    ppLevel = taosArrayGetLast(pStream->tasks);
  }

  SStreamTask* pSinkTask = NULL;
  int32_t      code = tNewStreamTask(uid, TASK_LEVEL__SINK, pEpset, isFillhistory, 0, 0, *ppLevel,
                                     pStream->conf.fillHistory, pStream->subTableWithoutMd5, &pSinkTask);
  if (code != 0) {
    return code;
  }

  mDebug("doAddSinkTask taskId:%s, %p vgId:%d, isFillHistory:%d", pSinkTask->id.idStr, pSinkTask, pVgroup->vgId,
         isFillhistory);

  pSinkTask->info.nodeId = pVgroup->vgId;
  pSinkTask->info.epSet = mndGetVgroupEpset(pMnode, pVgroup);

  return mndSetSinkTaskInfo(pStream, pSinkTask);
}
257

258
// Adds the sink task for vgObj and, when the stream has fill-history enabled,
// the matching fill-history sink task as well.
//
// Returns the first non-zero error from doAddSinkTask(), otherwise
// TDB_CODE_SUCCESS.
static int32_t doAddSinkTaskToVg(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset, SVgObj* vgObj) {
  int32_t code = doAddSinkTask(pStream, pMnode, vgObj, pEpset, false);

  // the fill-history twin is only attempted once the regular task exists
  if (code == 0 && pStream->conf.fillHistory) {
    code = doAddSinkTask(pStream, pMnode, vgObj, pEpset, true);
  }

  return (code != 0) ? code : TDB_CODE_SUCCESS;
}
271

272
// create sink node for each vgroup.
273
static int32_t doAddShuffleSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset) {
848✔
274
  SSdb* pSdb = pMnode->pSdb;
848✔
275
  void* pIter = NULL;
848✔
276

277
  while (1) {
3,343✔
278
    SVgObj* pVgroup = NULL;
4,191✔
279
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
4,191✔
280
    if (pIter == NULL) {
4,191✔
281
      break;
848✔
282
    }
283

284
    if (!mndVgroupInDb(pVgroup, pStream->targetDbUid)) {
3,343✔
285
      sdbRelease(pSdb, pVgroup);
1,048✔
286
      continue;
1,048✔
287
    }
288

289
    int32_t code = doAddSinkTaskToVg(pMnode, pStream, pEpset, pVgroup);
2,295✔
290
    if (code != 0) {
2,295!
291
      sdbRelease(pSdb, pVgroup);
×
292
      return code;
×
293
    }
294

295
    sdbRelease(pSdb, pVgroup);
2,295✔
296
  }
297

298
  return TDB_CODE_SUCCESS;
848✔
299
}
300

301
static int64_t getVgroupLastVer(const SArray* pList, int32_t vgId) {
4,091✔
302
  int32_t size = (int32_t) taosArrayGetSize(pList);
4,091✔
303
  for (int32_t i = 0; i < size; ++i) {
7,151✔
304
    SVgroupVer* pVer = taosArrayGet(pList, i);
5,248✔
305
    if (pVer->vgId == vgId) {
5,248✔
306
      return pVer->ver;
2,188✔
307
    }
308
  }
309

310
  mDebug("no data in vgId:%d for extract last version, set to be 0, total existed vgs:%d", vgId, size);
1,903!
311
  return 1;
1,903✔
312
}
313

314
// Initializes pTask->dataRange (time window and version range) for a source
// task on vgroup vgId.
//
//  - fill-history task: window [INT64_MIN, skey - 1], versions [0, latestVer]
//  - regular task:      window [skey, INT64_MAX],     versions [latestVer + 1, INT64_MAX]
//
// latestVer comes from getVgroupLastVer() and is clamped to 0 when negative.
static void streamTaskSetDataRange(SStreamTask* pTask, int64_t skey, SArray* pVerList, int32_t vgId) {
  int64_t lastVer = getVgroupLastVer(pVerList, vgId);
  if (lastVer < 0) {
    lastVer = 0;
  }

  // set the correct ts, which is the last key of queried table.
  SDataRange*  pDataRange = &pTask->dataRange;
  STimeWindow* pWin = &pDataRange->window;

  if (!pTask->info.fillHistory) {
    pWin->skey = skey;
    pWin->ekey = INT64_MAX;

    pDataRange->range.minVer = lastVer + 1;
    pDataRange->range.maxVer = INT64_MAX;

    mDebug("add source task 0x%x timeWindow:%" PRId64 "-%" PRId64 " verRange:%" PRId64 "-%" PRId64, pTask->id.taskId,
           pWin->skey, pWin->ekey, pDataRange->range.minVer, pDataRange->range.maxVer);
    return;
  }

  // fill-history task: everything strictly before skey, up to the last known version
  pWin->skey = INT64_MIN;
  pWin->ekey = skey - 1;

  pDataRange->range.minVer = 0;
  pDataRange->range.maxVer = lastVer;
  mDebug("add fill-history source task 0x%x timeWindow:%" PRId64 "-%" PRId64 " verRange:%" PRId64 "-%" PRId64,
         pTask->id.taskId, pWin->skey, pWin->ekey, pDataRange->range.minVer, pDataRange->range.maxVer);
}
4,091✔
343

344
// Sets the initial status of pTask to TASK_STATUS__HALT when its plan contains
// a count-window node and the task is not itself the fill-history task.
// In every other case the task is left untouched.
static void haltInitialTaskStatus(SStreamTask* pTask, SSubplan* pPlan, bool isFillhistoryTask) {
  bool isCountWindow = isCountWindowStreamTask(pPlan);
  if (!isCountWindow || isFillhistoryTask) {
    return;
  }

  SStreamStatus* pTaskStatus = &pTask->status;
  mDebug("s-task:0x%x status set %s from %s for count window agg task with fill-history option set",
         pTask->id.taskId, streamTaskGetStatusStr(TASK_STATUS__HALT), streamTaskGetStatusStr(pTaskStatus->taskStatus));
  pTaskStatus->taskStatus = TASK_STATUS__HALT;
}
2,958✔
354

355
// Creates a source-level task and appends it to the newest task level of the
// stream (the fill-history level list when isFillhistory is set). The stream's
// trigger parameter is passed on only when useTriggerParam is set; otherwise 0
// is used.
//
// Returns the result of tNewStreamTask(); the new task is stored in *pTask.
static int32_t buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillhistory, bool useTriggerParam, SStreamTask** pTask) {
  uint64_t uid;
  SArray** ppLevel;

  if (isFillhistory) {
    uid = pStream->hTaskUid;
    ppLevel = taosArrayGetLast(pStream->pHTasksList);
  } else {
    uid = pStream->uid;
    ppLevel = taosArrayGetLast(pStream->tasks);
  }

  return tNewStreamTask(uid, TASK_LEVEL__SOURCE, pEpset, isFillhistory, pStream->conf.trigger,
                        useTriggerParam ? pStream->conf.triggerParam : 0, *ppLevel, pStream->conf.fillHistory,
                        pStream->subTableWithoutMd5, pTask);
}
364

365
static void addNewTaskList(SStreamObj* pStream) {
2,086✔
366
  SArray* pTaskList = taosArrayInit(0, POINTER_BYTES);
2,086✔
367
  if (taosArrayPush(pStream->tasks, &pTaskList) == NULL) {
4,172!
368
    mError("failed to put into array");
×
369
  }
370

371
  if (pStream->conf.fillHistory) {
2,086✔
372
    pTaskList = taosArrayInit(0, POINTER_BYTES);
1,084✔
373
    if (taosArrayPush(pStream->pHTasksList, &pTaskList) == NULL) {
2,168!
374
      mError("failed to put into array");
×
375
    }
376
  }
377
}
2,086✔
378

379
// set the history task id
380
static void setHTasksId(SStreamObj* pStream) {
1,090✔
381
  SArray* pTaskList = *(SArray**)taosArrayGetLast(pStream->tasks);
1,090✔
382
  SArray* pHTaskList = *(SArray**)taosArrayGetLast(pStream->pHTasksList);
1,090✔
383

384
  for (int32_t i = 0; i < taosArrayGetSize(pTaskList); ++i) {
4,076✔
385
    SStreamTask** pStreamTask = taosArrayGet(pTaskList, i);
2,986✔
386
    SStreamTask** pHTask = taosArrayGet(pHTaskList, i);
2,986✔
387

388
    (*pStreamTask)->hTaskInfo.id.taskId = (*pHTask)->id.taskId;
2,986✔
389
    (*pStreamTask)->hTaskInfo.id.streamId = (*pHTask)->id.streamId;
2,986✔
390

391
    (*pHTask)->streamTaskId.taskId = (*pStreamTask)->id.taskId;
2,986✔
392
    (*pHTask)->streamTaskId.streamId = (*pStreamTask)->id.streamId;
2,986✔
393

394
    mDebug("s-task:0x%" PRIx64 "-0x%x related history task:0x%" PRIx64 "-0x%x, level:%d", (*pStreamTask)->id.streamId,
2,986!
395
           (*pStreamTask)->id.taskId, (*pHTask)->id.streamId, (*pHTask)->id.taskId, (*pHTask)->info.taskLevel);
396
  }
397
}
1,090✔
398

399
static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, int64_t skey,
4,091✔
400
                               SArray* pVerList, SVgObj* pVgroup, bool isHistoryTask, bool useTriggerParam) {
401
  SStreamTask* pTask = NULL;
4,091✔
402
  int32_t code = buildSourceTask(pStream, pEpset, isHistoryTask, useTriggerParam, &pTask);
4,091✔
403
  if (code != TSDB_CODE_SUCCESS) {
4,091!
404
    return code;
×
405
  }
406

407
  mDebug("doAddSourceTask taskId:%s, %p vgId:%d, isFillHistory:%d", pTask->id.idStr, pTask, pVgroup->vgId,
4,091!
408
         isHistoryTask);
409

410
  if (pStream->conf.fillHistory) {
4,091✔
411
    haltInitialTaskStatus(pTask, plan, isHistoryTask);
2,958✔
412
  }
413

414
  streamTaskSetDataRange(pTask, skey, pVerList, pVgroup->vgId);
4,091✔
415

416
  code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup);
4,091✔
417
  if (code != TSDB_CODE_SUCCESS) {
4,091!
418
    return code;
×
419
  }
420

421
  return TDB_CODE_SUCCESS;
4,091✔
422
}
423

424
static SSubplan* getScanSubPlan(const SQueryPlan* pPlan) {
1,050✔
425
  int32_t        numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
1,050!
426
  SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, numOfPlanLevel - 1);
1,050✔
427
  if (LIST_LENGTH(inner->pNodeList) != 1) {
1,050!
428
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
429
    return NULL;
×
430
  }
431

432
  SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
1,050✔
433
  if (plan->subplanType != SUBPLAN_TYPE_SCAN) {
1,050!
434
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
435
    return NULL;
×
436
  }
437
  return plan;
1,050✔
438
}
439

440
static SSubplan* getAggSubPlan(const SQueryPlan* pPlan, int index) {
291✔
441
  SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, index);
291✔
442
  if (LIST_LENGTH(inner->pNodeList) != 1) {
291!
443
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
444
    return NULL;
×
445
  }
446

447
  SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
291✔
448
  if (plan->subplanType != SUBPLAN_TYPE_MERGE) {
291!
449
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
450
    return NULL;
×
451
  }
452
  return plan;
291✔
453
}
454

455
// Opens a new task level and creates one source task (plus a fill-history
// source task, if enabled) for every vgroup of the stream's source db. When
// fill-history is enabled the regular and history tasks of the level are
// cross-linked afterwards.
//
// Returns TSDB_CODE_SUCCESS, or the first error from doAddSourceTask(). On
// error the current vgroup is released and the sdb iterator is cancelled
// before returning; tasks created so far are not dropped here.
static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset,
                             int64_t nextWindowSkey, SArray* pVerList, bool useTriggerParam) {
  void*   pIter = NULL;
  int32_t code = 0;
  SSdb*   pSdb = pMnode->pSdb;

  addNewTaskList(pStream);

  while (1) {
    SVgObj* pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    // skip vgroups of other databases
    if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, false, useTriggerParam);
    if (code != 0) {
      mError("failed to create stream task, code:%s", tstrerror(code));

      // todo drop the added source tasks.
      sdbRelease(pSdb, pVgroup);
      // leaving the scan early: the iterator must be cancelled explicitly
      sdbCancelFetch(pSdb, pIter);
      return code;
    }

    if (pStream->conf.fillHistory) {
      code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, true, useTriggerParam);
      if (code != 0) {
        sdbRelease(pSdb, pVgroup);
        sdbCancelFetch(pSdb, pIter);
        return code;
      }
    }

    sdbRelease(pSdb, pVgroup);
  }

  if (pStream->conf.fillHistory) {
    setHTasksId(pStream);
  }

  return TSDB_CODE_SUCCESS;
}
501

502
static int32_t buildAggTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillhistory, bool useTriggerParam,
224✔
503
                            SStreamTask** pAggTask) {
504
  *pAggTask = NULL;
224✔
505

506
  uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
224✔
507
  SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
224✔
508

509
  int32_t code = tNewStreamTask(uid, TASK_LEVEL__AGG, pEpset, isFillhistory, pStream->conf.trigger,
224✔
510
                                useTriggerParam ? pStream->conf.triggerParam : 0, *pTaskList, pStream->conf.fillHistory,
224✔
511
                                pStream->subTableWithoutMd5, pAggTask);
224✔
512
  return code;
224✔
513
}
514

515
static int32_t doAddAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset, SVgObj* pVgroup,
224✔
516
                            SSnodeObj* pSnode, bool isFillhistory, bool useTriggerParam) {
517
  int32_t      code = 0;
224✔
518
  SStreamTask* pTask = NULL;
224✔
519
  const char*  id = NULL;
224✔
520

521
  code = buildAggTask(pStream, pEpset, isFillhistory, useTriggerParam, &pTask);
224✔
522
  if (code != TSDB_CODE_SUCCESS) {
224!
523
    return code;
×
524
  }
525

526
  id = pTask->id.idStr;
224✔
527
  if (pSnode != NULL) {
224✔
528
    code = mndAssignStreamTaskToSnode(pMnode, pTask, plan, pSnode);
4✔
529
    mDebug("doAddAggTask taskId:%s, %p snode id:%d, isFillHistory:%d", id, pTask, pSnode->id, isFillhistory);
4!
530
  } else {
531
    code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup);
220✔
532
    mDebug("doAddAggTask taskId:%s, %p vgId:%d, isFillHistory:%d", id, pTask, pVgroup->vgId, isFillhistory);
220!
533
  }
534
  return code;
224✔
535
}
536

537
// Adds one agg task (plus its fill-history twin, if enabled) to the newest
// task level. The task runs on an snode when tsDeployOnSnode is set and an
// snode can be fetched; otherwise on a vgroup chosen by mndSchedFetchOneVg().
//
// Returns 0 on success, the error from doAddAggTask(), or - when neither an
// snode nor a vgroup is available - terrno if set, else
// TSDB_CODE_MND_RETURN_VALUE_NULL. The fetched snode/vgroup is released before
// returning.
static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset, bool useTriggerParam) {
  SVgObj*    pVgroup = NULL;
  SSnodeObj* pSnode = NULL;
  int32_t    code = 0;

  if (tsDeployOnSnode) {
    pSnode = mndSchedFetchOneSnode(pMnode);
  }

  if (pSnode == NULL) {
    pVgroup = mndSchedFetchOneVg(pMnode, pStream);
    if (pVgroup == NULL) {
      // nowhere to place the task; doAddAggTask() would dereference pVgroup
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      TAOS_RETURN(code);
    }
  }

  code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, false, useTriggerParam);
  if (code != 0) {
    goto END;
  }

  if (pStream->conf.fillHistory) {
    code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, true, useTriggerParam);
    if (code != 0) {
      goto END;
    }

    setHTasksId(pStream);
  }

END:
  if (pSnode != NULL) {
    sdbRelease(pMnode->pSdb, pSnode);
  } else {
    sdbRelease(pMnode->pSdb, pVgroup);
  }
  return code;
}
572

573
// Opens a new task level and fills it with sink tasks: one per vgroup of the
// target db when the stream has no fixed sink vgroup, otherwise a single one
// on pStream->fixedSinkVg. With fill-history enabled the regular and history
// sink tasks are cross-linked afterwards.
//
// Returns the first error encountered, otherwise TDB_CODE_SUCCESS.
static int32_t addSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset) {
  int32_t code;

  addNewTaskList(pStream);

  if (pStream->fixedSinkVgId != 0) {
    code = doAddSinkTaskToVg(pMnode, pStream, pEpset, &pStream->fixedSinkVg);
  } else {
    code = doAddShuffleSinkTask(pMnode, pStream, pEpset);
  }

  if (code != 0) {
    return code;
  }

  if (pStream->conf.fillHistory) {
    setHTasksId(pStream);
  }

  return TDB_CODE_SUCCESS;
}
594

595
// Connects `task` to every sink task in pSinkTaskList: first the dispatcher of
// `task` is configured, then `task` is registered as an upstream of each sink
// task. Failures are logged and do not stop the remaining bindings.
static void bindTaskToSinkTask(SStreamObj* pStream, SMnode* pMnode, SArray* pSinkTaskList, SStreamTask* task) {
  int32_t code = mndAddDispatcherForInternalTask(pMnode, pStream, pSinkTaskList, task);
  if (code != 0) {
    mError("failed bind task to sink task since %s", tstrerror(code));
  }

  size_t numOfSinks = taosArrayGetSize(pSinkTaskList);
  for (int32_t idx = 0; idx < numOfSinks; idx++) {
    SStreamTask* pSink = taosArrayGetP(pSinkTaskList, idx);

    code = streamTaskSetUpstreamInfo(pSink, task);
    if (code != 0) {
      mError("failed bind task to sink task since %s", tstrerror(code));
    }
  }

  mDebug("bindTaskToSinkTask taskId:%s to sink task list", task->id.idStr);
}
3,540✔
608

609
// Binds every task of the last level in `tasks` (the agg level) to the sink
// tasks stored at level SINK_NODE_LEVEL.
static void bindAggSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks) {
  SArray*  pSinks = taosArrayGetP(tasks, SINK_NODE_LEVEL);
  SArray** ppAggLevel = taosArrayGetLast(tasks);

  int idx = 0;
  while (idx < taosArrayGetSize(*ppAggLevel)) {
    SStreamTask* pAgg = taosArrayGetP(*ppAggLevel, idx);

    bindTaskToSinkTask(pStream, pMnode, pSinks, pAgg);
    mDebug("bindAggSink taskId:%s to sink task list", pAgg->id.idStr);

    idx++;
  }
}
202✔
619

620
// Wires the source tasks of a single-level plan to their output.
//
// With an extra sink level, the source tasks live one level above
// SINK_NODE_LEVEL and each is bound to the sink tasks. Without one, the source
// tasks are at SINK_NODE_LEVEL themselves and get the sink output info set
// directly. Failures are logged only.
static void bindSourceSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks, bool hasExtraSink) {
  SArray* pSinks = taosArrayGetP(tasks, SINK_NODE_LEVEL);
  SArray* pSources = taosArrayGetP(tasks, hasExtraSink ? SINK_NODE_LEVEL + 1 : SINK_NODE_LEVEL);

  for (int idx = 0; idx < taosArrayGetSize(pSources); idx++) {
    SStreamTask* pSource = taosArrayGetP(pSources, idx);
    mDebug("bindSourceSink taskId:%s to sink task list", pSource->id.idStr);

    if (hasExtraSink) {
      bindTaskToSinkTask(pStream, pMnode, pSinks, pSource);
      continue;
    }

    // no separate sink level: the source task writes the result itself
    int32_t code = mndSetSinkTaskInfo(pStream, pSource);
    if (code != 0) {
      mError("failed bind task to sink task since %s", tstrerror(code));
    }
  }
}
1,368✔
638

639
// Connects tasks [begin, end) of the second-to-last level in `tasks` to the
// last task of the last level: each upstream task gets a child id relative to
// `begin` and a fixed downstream, and is registered as an upstream of the
// downstream task. `end` is clamped to the size of the upstream level.
// Logs an error and does nothing when `tasks` has fewer than two levels.
static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) {
  size_t numOfLevels = taosArrayGetSize(tasks);
  if (numOfLevels < 2) {
    mError("task list size is less than 2");
    return;
  }

  SArray* pDownLevel = taosArrayGetP(tasks, numOfLevels - 1);
  SArray* pUpLevel = taosArrayGetP(tasks, numOfLevels - 2);

  SStreamTask** ppDown = taosArrayGetLast(pDownLevel);

  // never run past the end of the upstream level
  if (end > taosArrayGetSize(pUpLevel)) {
    end = taosArrayGetSize(pUpLevel);
  }

  for (int idx = begin; idx < end; idx++) {
    SStreamTask* pUp = taosArrayGetP(pUpLevel, idx);
    pUp->info.selfChildId = idx - begin;
    streamTaskSetFixedDownstreamInfo(pUp, *ppDown);

    int32_t code = streamTaskSetUpstreamInfo(*ppDown, pUp);
    if (code != 0) {
      mError("failed bind task to sink task since %s", tstrerror(code));
    }
  }

  mDebug("bindTwoLevel task list(%d-%d) to taskId:%s", begin, end - 1, (*(ppDown))->id.idStr);
}
661

662
static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, SEpSet* pEpset, int64_t skey,
1,050✔
663
                                SArray* pVerList) {
664
  int32_t code = 0;
1,050✔
665
  SSdb*   pSdb = pMnode->pSdb;
1,050✔
666
  int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
1,050!
667
  bool    hasExtraSink = false;
1,050✔
668
  bool    externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
1,050✔
669
  SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb);
1,050✔
670
  if (pDbObj == NULL) {
1,050!
671
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
672
    TAOS_RETURN(code);
×
673
  }
674

675
  bool multiTarget = (pDbObj->cfg.numOfVgroups > 1);
1,050✔
676
  sdbRelease(pSdb, pDbObj);
1,050✔
677

678
  mDebug("doScheduleStream numOfPlanLevel:%d, exDb:%d, multiTarget:%d, fix vgId:%d, physicalPlan:%s", numOfPlanLevel,
1,050!
679
         externalTargetDB, multiTarget, pStream->fixedSinkVgId, pStream->physicalPlan);
680
  pStream->tasks = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES);
1,050✔
681
  pStream->pHTasksList = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES);
1,050✔
682

683
  if (numOfPlanLevel > 1 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) {
1,050✔
684
    // add extra sink
685
    hasExtraSink = true;
878✔
686
    code = addSinkTask(pMnode, pStream, pEpset);
878✔
687
    if (code != TSDB_CODE_SUCCESS) {
878!
688
      return code;
×
689
    }
690
  }
691

692
  pStream->totalLevel = numOfPlanLevel + hasExtraSink;
1,050✔
693

694
  SSubplan* plan = getScanSubPlan(pPlan);  // source plan
1,050✔
695
  if (plan == NULL) {
1,050!
696
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
697
    if (terrno != 0) code = terrno;
×
698
    TAOS_RETURN(code);
×
699
  }
700

701
  code = addSourceTask(pMnode, plan, pStream, pEpset, skey, pVerList, (numOfPlanLevel == 1));
1,050✔
702
  if (code != TSDB_CODE_SUCCESS) {
1,050!
703
    return code;
×
704
  }
705

706
  if (numOfPlanLevel == 1) {
1,050✔
707
    bindSourceSink(pStream, pMnode, pStream->tasks, hasExtraSink);
897✔
708
    if (pStream->conf.fillHistory) {
897✔
709
      bindSourceSink(pStream, pMnode, pStream->pHTasksList, hasExtraSink);
471✔
710
    }
711
    return TDB_CODE_SUCCESS;
897✔
712
  }
713

714
  if (numOfPlanLevel == 3) {
153✔
715
    plan = getAggSubPlan(pPlan, 1);  // middle agg plan
138✔
716
    if (plan == NULL) {
138!
717
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
718
      if (terrno != 0) code = terrno;
×
719
      TAOS_RETURN(code);
×
720
    }
721
    do {
5✔
722
      SArray** list = taosArrayGetLast(pStream->tasks);
143✔
723
      float    size = (float)taosArrayGetSize(*list);
143✔
724
      size_t   cnt = (size_t)ceil(size / tsStreamAggCnt);
143✔
725
      if (cnt <= 1) break;
143✔
726

727
      mDebug("doScheduleStream add middle agg, size:%d, cnt:%d", (int)size, (int)cnt);
5!
728
      addNewTaskList(pStream);
5✔
729

730
      for (int j = 0; j < cnt; j++) {
17✔
731
        code = addAggTask(pStream, pMnode, plan, pEpset, false);
12✔
732
        if (code != TSDB_CODE_SUCCESS) {
12!
733
          return code;
×
734
        }
735

736
        bindTwoLevel(pStream->tasks, j * tsStreamAggCnt, (j + 1) * tsStreamAggCnt);
12✔
737
        if (pStream->conf.fillHistory) {
12✔
738
          bindTwoLevel(pStream->pHTasksList, j * tsStreamAggCnt, (j + 1) * tsStreamAggCnt);
10✔
739
        }
740
      }
741
    } while (1);
742
  }
743

744
  plan = getAggSubPlan(pPlan, 0);
153✔
745
  if (plan == NULL) {
153!
746
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
747
    if (terrno != 0) code = terrno;
×
748
    TAOS_RETURN(code);
×
749
  }
750

751
  mDebug("doScheduleStream add final agg");
153!
752
  SArray** list = taosArrayGetLast(pStream->tasks);
153✔
753
  size_t   size = taosArrayGetSize(*list);
153✔
754
  addNewTaskList(pStream);
153✔
755
  code = addAggTask(pStream, pMnode, plan, pEpset, true);
153✔
756
  if (code != TSDB_CODE_SUCCESS) {
153!
757
    TAOS_RETURN(code);
×
758
  }
759
  bindTwoLevel(pStream->tasks, 0, size);
153✔
760
  if (pStream->conf.fillHistory) {
153✔
761
    bindTwoLevel(pStream->pHTasksList, 0, size);
49✔
762
  }
763

764
  bindAggSink(pStream, pMnode, pStream->tasks);
153✔
765
  if (pStream->conf.fillHistory) {
153✔
766
    bindAggSink(pStream, pMnode, pStream->pHTasksList);
49✔
767
  }
768
  TAOS_RETURN(code);
153✔
769
}
770

771
// Entry point for scheduling a stream: parses pStream->physicalPlan, builds
// all stream tasks from it, and destroys the parsed plan afterwards.
//
// Returns TSDB_CODE_QRY_INVALID_INPUT when the plan cannot be parsed,
// otherwise the result of doScheduleStream().
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream, int64_t skey, SArray* pVgVerList) {
  SQueryPlan* pQueryPlan = qStringToQueryPlan(pStream->physicalPlan);
  if (pQueryPlan == NULL) {
    int32_t parseCode = TSDB_CODE_QRY_INVALID_INPUT;
    TAOS_RETURN(parseCode);
  }

  SEpSet epSet = {0};
  mndGetMnodeEpSet(pMnode, &epSet);

  int32_t code = doScheduleStream(pStream, pMnode, pQueryPlan, &epSet, skey, pVgVerList);
  qDestroyQueryPlan(pQueryPlan);

  TAOS_RETURN(code);
}
787

788
// Initializes a subscription for pTopic: every vgroup of the topic's db is
// added to pSub->unassignedVgs, and pSub->qmsg is set to the serialized
// subplan (column topics, and table topics with an AST) or to an empty string.
//
// The query plan, when there is one, must consist of exactly one level with
// exactly one subplan; otherwise TSDB_CODE_MND_INVALID_TOPIC_QUERY is
// returned. Returns 0 on success or an error code; on failure inside the
// vgroup scan the current vgroup is released and the iterator is cancelled.
// Entries already pushed to pSub->unassignedVgs are left in place.
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
  int32_t     code = 0;
  SSdb*       pSdb = pMnode->pSdb;
  SVgObj*     pVgroup = NULL;
  SQueryPlan* pPlan = NULL;
  SSubplan*   pSubplan = NULL;
  void*       pIter = NULL;

  if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) {
    pPlan = qStringToQueryPlan(pTopic->physicalPlan);
    if (pPlan == NULL) {
      return TSDB_CODE_QRY_INVALID_INPUT;
    }
  } else if (pTopic->subType == TOPIC_SUB_TYPE__TABLE && pTopic->ast != NULL) {
    SNode* pAst = NULL;
    code = nodesStringToNode(pTopic->ast, &pAst);
    if (code != 0) {
      mError("topic:%s, failed to create since %s", pTopic->name, terrstr());
      return code;
    }

    SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
    code = qCreateQueryPlan(&cxt, &pPlan, NULL);
    if (code != 0) {
      mError("failed to create topic:%s since %s", pTopic->name, terrstr());
      nodesDestroyNode(pAst);
      return code;
    }
    nodesDestroyNode(pAst);
  }

  if (pPlan) {
    int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
    if (levelNum != 1) {
      code = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
      goto END;
    }

    SNodeListNode* pNodeListNode = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
    if (pNodeListNode == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto END;
    }
    int32_t opNum = LIST_LENGTH(pNodeListNode->pNodeList);
    if (opNum != 1) {
      code = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
      goto END;
    }

    pSubplan = (SSubplan*)nodesListGetNode(pNodeListNode->pNodeList, 0);
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    // skip vgroups of other databases
    if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    SMqVgEp* pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
    if (pVgEp == NULL) {
      code = terrno;
      // leaving the scan early: drop the vgroup reference and the iterator
      sdbRelease(pSdb, pVgroup);
      sdbCancelFetch(pSdb, pIter);
      goto END;
    }
    pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
    pVgEp->vgId = pVgroup->vgId;
    if (taosArrayPush(pSub->unassignedVgs, &pVgEp) == NULL) {
      code = terrno;
      taosMemoryFree(pVgEp);
      sdbRelease(pSdb, pVgroup);
      sdbCancelFetch(pSdb, pIter);
      goto END;
    }

    // count the vgroup only once it is actually stored in unassignedVgs
    pSub->vgNum++;

    mInfo("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, pVgEp->vgId);
    sdbRelease(pSdb, pVgroup);
  }

  if (pSubplan) {
    int32_t msgLen;

    if (qSubPlanToString(pSubplan, &pSub->qmsg, &msgLen) < 0) {
      code = TSDB_CODE_QRY_INVALID_INPUT;
      goto END;
    }
  } else {
    pSub->qmsg = taosStrdup("");
    if (pSub->qmsg == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto END;
    }
  }

END:
  qDestroyQueryPlan(pPlan);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc