• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3533

20 Nov 2024 07:11AM UTC coverage: 58.848% (-1.9%) from 60.78%
#3533

push

travis-ci

web-flow
Merge pull request #28823 from taosdata/fix/3.0/TD-32587

fix:[TD-32587]fix stmt segmentation fault

115578 of 252434 branches covered (45.79%)

Branch coverage included in aggregate %.

1 of 4 new or added lines in 1 file covered. (25.0%)

8038 existing lines in 233 files now uncovered.

194926 of 275199 relevant lines covered (70.83%)

1494459.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.65
/source/dnode/mnode/impl/src/mndScheduler.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndScheduler.h"
17
#include "mndDb.h"
18
#include "mndMnode.h"
19
#include "mndSnode.h"
20
#include "mndVgroup.h"
21
#include "parser.h"
22
#include "tcompare.h"
23
#include "tmisce.h"
24
#include "tname.h"
25
#include "tuuid.h"
26

27
#define SINK_NODE_LEVEL (0)
28
extern bool tsDeployOnSnode;
29

30
// Depth-first search of the physical plan rooted at pNode.
// Returns true as soon as a QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT node is found,
// either pNode itself or any node reachable through pChildren; false otherwise.
static bool hasCountWindowNode(SPhysiNode* pNode) {
  if (nodeType(pNode) == QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT) {
    return true;
  }

  size_t  numOfChildren = LIST_LENGTH(pNode->pChildren);
  int32_t idx = 0;
  while (idx < numOfChildren) {
    SPhysiNode* pSub = (SPhysiNode*)nodesListGetNode(pNode->pChildren, idx);
    if (hasCountWindowNode(pSub)) {
      return true;
    }
    ++idx;
  }

  return false;
}
46

47
// Returns true when the physical plan attached to pPlan contains a stream count-window node.
static bool isCountWindowStreamTask(SSubplan* pPlan) {
  SPhysiNode* pRoot = (SPhysiNode*)pPlan->pNode;
  return hasCountWindowNode(pRoot);
}
50

51
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
8✔
52
                           int64_t watermark, int64_t deleteMark) {
53
  int32_t     code = 0;
8✔
54
  SNode*      pAst = NULL;
8✔
55
  SQueryPlan* pPlan = NULL;
8✔
56

57
  if (nodesStringToNode(ast, &pAst) < 0) {
8!
58
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
59
    goto END;
×
60
  }
61

62
  if (qSetSTableIdForRsma(pAst, uid) < 0) {
8!
63
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
64
    goto END;
×
65
  }
66

67
  SPlanContext cxt = {
8✔
68
      .pAstRoot = pAst,
69
      .topicQuery = false,
70
      .streamQuery = true,
71
      .rSmaQuery = true,
72
      .triggerType = triggerType,
73
      .watermark = watermark,
74
      .deleteMark = deleteMark,
75
  };
76

77
  if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
8!
78
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
79
    goto END;
×
80
  }
81

82
  int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
8!
83
  if (levelNum != 1) {
8!
84
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
85
    goto END;
×
86
  }
87
  SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
8✔
88

89
  int32_t opNum = LIST_LENGTH(inner->pNodeList);
8!
90
  if (opNum != 1) {
8!
91
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
92
    goto END;
×
93
  }
94

95
  SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
8✔
96
  if (qSubPlanToString(plan, pDst, pDstLen) < 0) {
8!
97
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
98
    goto END;
×
99
  }
100

101
END:
8✔
102
  if (pAst) nodesDestroyNode(pAst);
8!
103
  if (pPlan) nodesDestroyNode((SNode*)pPlan);
8!
104
  TAOS_RETURN(code);
8✔
105
}
106

107
// Fill in pTask->outputInfo so the task writes either to an sma or to the stream's target table.
//
// The sma output is chosen when the stream has a non-zero smaId and subTableWithoutMd5 is not 1.
// Otherwise the table output is chosen and the stream's output schema is cloned into the task.
//
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the schema clone fails.
int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) {
  STaskOutputInfo* pOutput = &pTask->outputInfo;

  mDebug("mndSetSinkTaskInfo to sma or table, taskId:%s", pTask->id.idStr);

  bool toSma = (pStream->smaId != 0) && (pStream->subTableWithoutMd5 != 1);
  if (toSma) {
    pOutput->type = TASK_OUTPUT__SMA;
    pOutput->smaSink.smaId = pStream->smaId;
    return 0;
  }

  pOutput->type = TASK_OUTPUT__TABLE;
  pOutput->tbSink.stbUid = pStream->targetStbUid;
  (void)memcpy(pOutput->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);

  pOutput->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
  if (pOutput->tbSink.pSchemaWrapper == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return 0;
}
127

128
int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SArray* pSinkNodeList,
1,206✔
129
                                        SStreamTask* pTask) {
130
  int32_t code = 0;
1,206✔
131
  bool isShuffle = false;
1,206✔
132

133
  if (pStream->fixedSinkVgId == 0) {
1,206✔
134
    SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
1,176✔
135
    if (pDb != NULL && pDb->cfg.numOfVgroups > 1) {
1,176!
136
      isShuffle = true;
1,123✔
137
      pTask->outputInfo.type = TASK_OUTPUT__SHUFFLE_DISPATCH;
1,123✔
138
      pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
1,123✔
139
      TAOS_CHECK_RETURN(mndExtractDbInfo(pMnode, pDb, &pTask->outputInfo.shuffleDispatcher.dbInfo, NULL));
1,123!
140
    }
141

142
    sdbRelease(pMnode->pSdb, pDb);
1,176✔
143
  }
144

145
  int32_t numOfSinkNodes = taosArrayGetSize(pSinkNodeList);
1,206✔
146

147
  if (isShuffle) {
1,206✔
148
    (void)memcpy(pTask->outputInfo.shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
1,123✔
149
    SArray* pVgs = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
1,123✔
150

151
    int32_t numOfVgroups = taosArrayGetSize(pVgs);
1,123✔
152
    for (int32_t i = 0; i < numOfVgroups; i++) {
3,941✔
153
      SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
2,818✔
154

155
      for (int32_t j = 0; j < numOfSinkNodes; j++) {
5,941!
156
        SStreamTask* pSinkTask = taosArrayGetP(pSinkNodeList, j);
5,941✔
157
        if (pSinkTask->info.nodeId == pVgInfo->vgId) {
5,941✔
158
          pVgInfo->taskId = pSinkTask->id.taskId;
2,818✔
159
          break;
2,818✔
160
        }
161
      }
162
    }
163
  } else {
164
    SStreamTask* pOneSinkTask = taosArrayGetP(pSinkNodeList, 0);
83✔
165
    streamTaskSetFixedDownstreamInfo(pTask, pOneSinkTask);
83✔
166
  }
167

168
  TAOS_RETURN(code);
1,206✔
169
}
170

171
// Place pTask on pVgroup: the task's node id and ep set are taken from the vgroup, copied into
// the sub-plan's exec node, and the sub-plan is then serialized into pTask->exec.qmsg.
// Returns whatever qSubPlanToString returns.
int32_t mndAssignStreamTaskToVgroup(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
  int32_t qmsgLen;  // length output required by qSubPlanToString; not used afterwards

  // task side
  pTask->info.nodeId = pVgroup->vgId;
  pTask->info.epSet = mndGetVgroupEpset(pMnode, pVgroup);

  // plan side mirrors the task
  plan->execNode.nodeId = pTask->info.nodeId;
  plan->execNode.epSet = pTask->info.epSet;

  int32_t code = qSubPlanToString(plan, &pTask->exec.qmsg, &qmsgLen);
  return code;
}
181

182
// Fetch the first snode object returned by the sdb iterator and close the iterator right away.
// Returns the fetched object, or NULL when the fetch produced none.
SSnodeObj* mndSchedFetchOneSnode(SMnode* pMnode) {
  SSdb*      pSdb = pMnode->pSdb;
  SSnodeObj* pSnode = NULL;

  // TODO random fetch
  void* pCursor = sdbFetch(pSdb, SDB_SNODE, NULL, (void**)&pSnode);
  sdbCancelFetch(pSdb, pCursor);

  return pSnode;
}
190

191
// Place pTask on the snode: the task and the sub-plan's exec node both get SNODE_HANDLE as node id
// and the ep set obtained from pSnode; the sub-plan is then serialized into pTask->exec.qmsg.
// Returns whatever qSubPlanToString returns.
int32_t mndAssignStreamTaskToSnode(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SSnodeObj* pSnode) {
  int32_t qmsgLen;  // length output required by qSubPlanToString; not used afterwards

  // task side
  pTask->info.nodeId = SNODE_HANDLE;
  pTask->info.epSet = mndAcquireEpFromSnode(pMnode, pSnode);

  // plan side mirrors the task
  plan->execNode.nodeId = SNODE_HANDLE;
  plan->execNode.epSet = pTask->info.epSet;

  mDebug("s-task:0x%x set the agg task to snode:%d", pTask->id.taskId, SNODE_HANDLE);

  int32_t code = qSubPlanToString(plan, &pTask->exec.qmsg, &qmsgLen);
  return code;
}
203

204
// random choose a node to do compute
205
SVgObj* mndSchedFetchOneVg(SMnode* pMnode, SStreamObj* pStream) {
126✔
206
  SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->sourceDb);
126✔
207
  if (pDbObj == NULL) {
126!
208
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
209
    return NULL;
×
210
  }
211

212
  if (pStream->indexForMultiAggBalance == -1) {
126✔
213
    taosSeedRand(taosSafeRand());
114✔
214
    pStream->indexForMultiAggBalance = taosRand() % pDbObj->cfg.numOfVgroups;
114✔
215
  }
216

217
  int32_t index = 0;
126✔
218
  void*   pIter = NULL;
126✔
219
  SVgObj* pVgroup = NULL;
126✔
220
  while (1) {
221
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
491✔
222
    if (pIter == NULL) break;
491!
223
    if (pVgroup->dbUid != pStream->sourceDbUid) {
491✔
224
      sdbRelease(pMnode->pSdb, pVgroup);
219✔
225
      continue;
219✔
226
    }
227
    if (index++ == pStream->indexForMultiAggBalance) {
272✔
228
      pStream->indexForMultiAggBalance++;
126✔
229
      pStream->indexForMultiAggBalance %= pDbObj->cfg.numOfVgroups;
126✔
230
      sdbCancelFetch(pMnode->pSdb, pIter);
126✔
231
      break;
126✔
232
    }
233
    sdbRelease(pMnode->pSdb, pVgroup);
146✔
234
  }
235
  sdbRelease(pMnode->pSdb, pDbObj);
126✔
236

237
  return pVgroup;
126✔
238
}
239

240
// Create one sink-level task on pVgroup and append it to the last task list of the stream
// (the fill-history list when isFillhistory is set, the normal list otherwise), then set up
// its output info. Returns the error from tNewStreamTask or mndSetSinkTaskInfo, 0 on success.
static int32_t doAddSinkTask(SStreamObj* pStream, SMnode* pMnode, SVgObj* pVgroup, SEpSet* pEpset, bool isFillhistory) {
  int64_t  uid;
  SArray** ppLevelTasks;

  if (isFillhistory) {
    uid = pStream->hTaskUid;
    ppLevelTasks = taosArrayGetLast(pStream->pHTasksList);
  } else {
    uid = pStream->uid;
    ppLevelTasks = taosArrayGetLast(pStream->tasks);
  }

  SStreamTask* pSinkTask = NULL;
  int32_t      code = tNewStreamTask(uid, TASK_LEVEL__SINK, pEpset, isFillhistory, 0, 0, *ppLevelTasks,
                                     pStream->conf.fillHistory, pStream->subTableWithoutMd5, &pSinkTask);
  if (code != 0) {
    return code;
  }

  mDebug("doAddSinkTask taskId:%s, %p vgId:%d, isFillHistory:%d", pSinkTask->id.idStr, pSinkTask, pVgroup->vgId,
         isFillhistory);

  pSinkTask->info.nodeId = pVgroup->vgId;
  pSinkTask->info.epSet = mndGetVgroupEpset(pMnode, pVgroup);

  code = mndSetSinkTaskInfo(pStream, pSinkTask);
  return code;
}
257

258
// Add the sink task for vgObj, plus its fill-history counterpart when the stream has
// fill-history enabled. Stops at and returns the first non-zero error code.
static int32_t doAddSinkTaskToVg(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset, SVgObj* vgObj) {
  int32_t code = doAddSinkTask(pStream, pMnode, vgObj, pEpset, false);

  if (code == 0 && pStream->conf.fillHistory) {
    code = doAddSinkTask(pStream, pMnode, vgObj, pEpset, true);
  }

  if (code != 0) {
    return code;
  }
  return TDB_CODE_SUCCESS;
}
271

272
// create sink node for each vgroup.
273
static int32_t doAddShuffleSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset) {
408✔
274
  SSdb* pSdb = pMnode->pSdb;
408✔
275
  void* pIter = NULL;
408✔
276

277
  while (1) {
1,529✔
278
    SVgObj* pVgroup = NULL;
1,937✔
279
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
1,937✔
280
    if (pIter == NULL) {
1,937✔
281
      break;
408✔
282
    }
283

284
    if (!mndVgroupInDb(pVgroup, pStream->targetDbUid)) {
1,529✔
285
      sdbRelease(pSdb, pVgroup);
571✔
286
      continue;
571✔
287
    }
288

289
    int32_t code = doAddSinkTaskToVg(pMnode, pStream, pEpset, pVgroup);
958✔
290
    if (code != 0) {
958!
291
      sdbRelease(pSdb, pVgroup);
×
292
      return code;
×
293
    }
294

295
    sdbRelease(pSdb, pVgroup);
958✔
296
  }
297

298
  return TDB_CODE_SUCCESS;
408✔
299
}
300

301
static int64_t getVgroupLastVer(const SArray* pList, int32_t vgId) {
1,657✔
302
  int32_t size = (int32_t) taosArrayGetSize(pList);
1,657✔
303
  for (int32_t i = 0; i < size; ++i) {
2,101✔
304
    SVgroupVer* pVer = taosArrayGet(pList, i);
848✔
305
    if (pVer->vgId == vgId) {
848✔
306
      return pVer->ver;
404✔
307
    }
308
  }
309

310
  mDebug("no data in vgId:%d for extract last version, set to be 0, total existed vgs:%d", vgId, size);
1,253✔
311
  return 1;
1,253✔
312
}
313

314
// Initialize pTask->dataRange (time window and version range) for a source task on vgroup vgId.
//
// The latest version of the vgroup is looked up in pVerList; a negative value is clamped to 0.
//  - fill-history task: window [INT64_MIN, skey - 1], versions [0, latestVer]
//  - normal task:       window [skey, INT64_MAX],     versions [latestVer + 1, INT64_MAX]
static void streamTaskSetDataRange(SStreamTask* pTask, int64_t skey, SArray* pVerList, int32_t vgId) {
  int64_t lastVer = getVgroupLastVer(pVerList, vgId);
  lastVer = (lastVer < 0) ? 0 : lastVer;

  // set the correct ts, which is the last key of queried table.
  SDataRange*  pDataRange = &pTask->dataRange;
  STimeWindow* pWin = &pDataRange->window;

  if (!pTask->info.fillHistory) {
    pWin->skey = skey;
    pWin->ekey = INT64_MAX;

    pDataRange->range.minVer = lastVer + 1;
    pDataRange->range.maxVer = INT64_MAX;

    mDebug("add source task 0x%x timeWindow:%" PRId64 "-%" PRId64 " verRange:%" PRId64 "-%" PRId64, pTask->id.taskId,
           pWin->skey, pWin->ekey, pDataRange->range.minVer, pDataRange->range.maxVer);
    return;
  }

  pWin->skey = INT64_MIN;
  pWin->ekey = skey - 1;

  pDataRange->range.minVer = 0;
  pDataRange->range.maxVer = lastVer;

  mDebug("add fill-history source task 0x%x timeWindow:%" PRId64 "-%" PRId64 " verRange:%" PRId64 "-%" PRId64,
         pTask->id.taskId, pWin->skey, pWin->ekey, pDataRange->range.minVer, pDataRange->range.maxVer);
}
343

344
// Set the task status to TASK_STATUS__HALT when the plan contains a count-window node and the
// task is not the fill-history task. The status is left untouched in every other case.
static void haltInitialTaskStatus(SStreamTask* pTask, SSubplan* pPlan, bool isFillhistoryTask) {
  bool isCountWindow = isCountWindowStreamTask(pPlan);
  if (!isCountWindow || isFillhistoryTask) {
    return;
  }

  SStreamStatus* pTaskStatus = &pTask->status;
  mDebug("s-task:0x%x status set %s from %s for count window agg task with fill-history option set",
         pTask->id.taskId, streamTaskGetStatusStr(TASK_STATUS__HALT), streamTaskGetStatusStr(pTaskStatus->taskStatus));
  pTaskStatus->taskStatus = TASK_STATUS__HALT;
}
354

355
// Create a source-level task and append it to the last task list of the stream (the fill-history
// list when isFillhistory is set). The stream's trigger param is passed on only when
// useTriggerParam is true, otherwise 0 is used. Returns the result of tNewStreamTask.
static int32_t buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillhistory, bool useTriggerParam,
                               SStreamTask** pTask) {
  uint64_t uid;
  SArray** ppLevelTasks;

  if (isFillhistory) {
    uid = pStream->hTaskUid;
    ppLevelTasks = taosArrayGetLast(pStream->pHTasksList);
  } else {
    uid = pStream->uid;
    ppLevelTasks = taosArrayGetLast(pStream->tasks);
  }

  return tNewStreamTask(uid, TASK_LEVEL__SOURCE, pEpset, isFillhistory, pStream->conf.trigger,
                        useTriggerParam ? pStream->conf.triggerParam : 0, *ppLevelTasks, pStream->conf.fillHistory,
                        pStream->subTableWithoutMd5, pTask);
}
364

365
// Open a new (empty) task level: push a fresh pointer array onto pStream->tasks and, when
// fill-history is enabled, another one onto pStream->pHTasksList. A failed push is only logged.
// NOTE(review): the taosArrayInit results are not checked and a list whose push failed is not
// freed here - confirm whether callers rely on that.
static void addNewTaskList(SStreamObj* pStream) {
  SArray* pLevel = taosArrayInit(0, POINTER_BYTES);
  void*   pSlot = taosArrayPush(pStream->tasks, &pLevel);
  if (pSlot == NULL) {
    mError("failed to put into array");
  }

  if (!pStream->conf.fillHistory) {
    return;
  }

  SArray* pHLevel = taosArrayInit(0, POINTER_BYTES);
  pSlot = taosArrayPush(pStream->pHTasksList, &pHLevel);
  if (pSlot == NULL) {
    mError("failed to put into array");
  }
}
378

379
// set the history task id
380
static void setHTasksId(SStreamObj* pStream) {
466✔
381
  SArray* pTaskList = *(SArray**)taosArrayGetLast(pStream->tasks);
466✔
382
  SArray* pHTaskList = *(SArray**)taosArrayGetLast(pStream->pHTasksList);
466✔
383

384
  for (int32_t i = 0; i < taosArrayGetSize(pTaskList); ++i) {
1,367✔
385
    SStreamTask** pStreamTask = taosArrayGet(pTaskList, i);
901✔
386
    SStreamTask** pHTask = taosArrayGet(pHTaskList, i);
901✔
387

388
    (*pStreamTask)->hTaskInfo.id.taskId = (*pHTask)->id.taskId;
901✔
389
    (*pStreamTask)->hTaskInfo.id.streamId = (*pHTask)->id.streamId;
901✔
390

391
    (*pHTask)->streamTaskId.taskId = (*pStreamTask)->id.taskId;
901✔
392
    (*pHTask)->streamTaskId.streamId = (*pStreamTask)->id.streamId;
901✔
393

394
    mDebug("s-task:0x%" PRIx64 "-0x%x related history task:0x%" PRIx64 "-0x%x, level:%d", (*pStreamTask)->id.streamId,
901✔
395
           (*pStreamTask)->id.taskId, (*pHTask)->id.streamId, (*pHTask)->id.taskId, (*pHTask)->info.taskLevel);
396
  }
397
}
466✔
398

399
static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, int64_t skey,
1,657✔
400
                               SArray* pVerList, SVgObj* pVgroup, bool isHistoryTask, bool useTriggerParam) {
401
  SStreamTask* pTask = NULL;
1,657✔
402
  int32_t code = buildSourceTask(pStream, pEpset, isHistoryTask, useTriggerParam, &pTask);
1,657✔
403
  if (code != TSDB_CODE_SUCCESS) {
1,657!
404
    return code;
×
405
  }
406

407
  mDebug("doAddSourceTask taskId:%s, %p vgId:%d, isFillHistory:%d", pTask->id.idStr, pTask, pVgroup->vgId,
1,657✔
408
         isHistoryTask);
409

410
  if (pStream->conf.fillHistory) {
1,657✔
411
    haltInitialTaskStatus(pTask, plan, isHistoryTask);
870✔
412
  }
413

414
  streamTaskSetDataRange(pTask, skey, pVerList, pVgroup->vgId);
1,657✔
415

416
  code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup);
1,657✔
417
  if (code != TSDB_CODE_SUCCESS) {
1,657!
418
    return code;
×
419
  }
420

421
  return TDB_CODE_SUCCESS;
1,657✔
422
}
423

424
static SSubplan* getScanSubPlan(const SQueryPlan* pPlan) {
581✔
425
  int32_t        numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
581!
426
  SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, numOfPlanLevel - 1);
581✔
427
  if (LIST_LENGTH(inner->pNodeList) != 1) {
581!
428
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
429
    return NULL;
×
430
  }
431

432
  SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
581✔
433
  if (plan->subplanType != SUBPLAN_TYPE_SCAN) {
581!
434
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
435
    return NULL;
×
436
  }
437
  return plan;
581✔
438
}
439

440
static SSubplan* getAggSubPlan(const SQueryPlan* pPlan, int index) {
221✔
441
  SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, index);
221✔
442
  if (LIST_LENGTH(inner->pNodeList) != 1) {
221!
443
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
444
    return NULL;
×
445
  }
446

447
  SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
221✔
448
  if (plan->subplanType != SUBPLAN_TYPE_MERGE) {
221!
449
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
450
    return NULL;
×
451
  }
452
  return plan;
221✔
453
}
454

455
// Open a new task level and create one source task (plus a fill-history source task, if enabled)
// on every vgroup of the stream's source db. When fill-history is enabled the normal and history
// tasks of the new level are cross-linked at the end.
//
// Returns TSDB_CODE_SUCCESS, or the first error reported by doAddSourceTask. On failure the
// current vgroup is released and the sdb iterator is cancelled, matching the other early exits
// from sdbFetch loops in this file. Tasks already added are not dropped (see todo).
static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset,
                             int64_t nextWindowSkey, SArray* pVerList, bool useTriggerParam) {
  void*   pIter = NULL;
  int32_t code = 0;
  SSdb*   pSdb = pMnode->pSdb;

  addNewTaskList(pStream);

  while (1) {
    SVgObj* pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, false, useTriggerParam);
    if (code == 0 && pStream->conf.fillHistory) {
      code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, true, useTriggerParam);
    }

    sdbRelease(pSdb, pVgroup);

    if (code != 0) {
      mError("failed to create stream task, code:%s", tstrerror(code));

      // todo drop the added source tasks.
      // leaving the loop before the iterator is exhausted: close it explicitly
      sdbCancelFetch(pSdb, pIter);
      return code;
    }
  }

  if (pStream->conf.fillHistory) {
    setHTasksId(pStream);
  }

  return TSDB_CODE_SUCCESS;
}
501

502
static int32_t buildAggTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillhistory, bool useTriggerParam,
174✔
503
                            SStreamTask** pAggTask) {
504
  *pAggTask = NULL;
174✔
505

506
  uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
174✔
507
  SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
174✔
508

509
  int32_t code = tNewStreamTask(uid, TASK_LEVEL__AGG, pEpset, isFillhistory, pStream->conf.trigger,
174✔
510
                                useTriggerParam ? pStream->conf.triggerParam : 0, *pTaskList, pStream->conf.fillHistory,
174✔
511
                                pStream->subTableWithoutMd5, pAggTask);
174✔
512
  return code;
174✔
513
}
514

515
static int32_t doAddAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset, SVgObj* pVgroup,
174✔
516
                            SSnodeObj* pSnode, bool isFillhistory, bool useTriggerParam) {
517
  int32_t      code = 0;
174✔
518
  SStreamTask* pTask = NULL;
174✔
519
  const char*  id = NULL;
174✔
520

521
  code = buildAggTask(pStream, pEpset, isFillhistory, useTriggerParam, &pTask);
174✔
522
  if (code != TSDB_CODE_SUCCESS) {
174!
523
    return code;
×
524
  }
525

526
  id = pTask->id.idStr;
174✔
527
  if (pSnode != NULL) {
174✔
528
    code = mndAssignStreamTaskToSnode(pMnode, pTask, plan, pSnode);
5✔
529
    mDebug("doAddAggTask taskId:%s, %p snode id:%d, isFillHistory:%d", id, pTask, pSnode->id, isFillhistory);
5✔
530
  } else {
531
    code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup);
169✔
532
    mDebug("doAddAggTask taskId:%s, %p vgId:%d, isFillHistory:%d", id, pTask, pVgroup->vgId, isFillhistory);
169!
533
  }
534
  return code;
174✔
535
}
536

537
// Add one agg task (plus its fill-history counterpart, if enabled) to the current last level.
//
// Placement: when tsDeployOnSnode is set an snode is tried first; if there is none, or the option
// is off, a vgroup of the source db is picked through mndSchedFetchOneVg.
//
// Returns 0 on success or the first error from doAddAggTask. When neither an snode nor a vgroup
// could be obtained, the function now fails with terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when
// terrno is unset) instead of passing a NULL vgroup on to be dereferenced. The snode or vgroup
// that was obtained is released before returning.
static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset, bool useTriggerParam) {
  SVgObj*    pVgroup = NULL;
  SSnodeObj* pSnode = NULL;
  int32_t    code = 0;

  if (tsDeployOnSnode) {
    pSnode = mndSchedFetchOneSnode(pMnode);
    if (pSnode == NULL) {
      pVgroup = mndSchedFetchOneVg(pMnode, pStream);
    }
  } else {
    pVgroup = mndSchedFetchOneVg(pMnode, pStream);
  }

  if (pSnode == NULL && pVgroup == NULL) {
    // nothing to deploy on; same error convention as the NULL sub-plan checks in doScheduleStream
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    mError("failed to fetch snode or vgroup for agg task since %s", tstrerror(code));
    TAOS_RETURN(code);
  }

  code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, false, useTriggerParam);
  if (code != 0) {
    goto END;
  }

  if (pStream->conf.fillHistory) {
    code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, true, useTriggerParam);
    if (code != 0) {
      goto END;
    }

    setHTasksId(pStream);
  }

END:
  if (pSnode != NULL) {
    sdbRelease(pMnode->pSdb, pSnode);
  } else {
    sdbRelease(pMnode->pSdb, pVgroup);
  }
  return code;
}
572

573
// Open a new task level and fill it with sink tasks: one on the fixed sink vgroup when the stream
// has one (fixedSinkVgId != 0), otherwise one per vgroup of the target db. When fill-history is
// enabled, the normal and history sink tasks are cross-linked afterwards.
// Returns the first error encountered, TDB_CODE_SUCCESS otherwise.
static int32_t addSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset) {
  int32_t code = 0;

  addNewTaskList(pStream);

  if (pStream->fixedSinkVgId != 0) {
    code = doAddSinkTaskToVg(pMnode, pStream, pEpset, &pStream->fixedSinkVg);
  } else {
    code = doAddShuffleSinkTask(pMnode, pStream, pEpset);
  }

  if (code != 0) {
    return code;
  }

  if (pStream->conf.fillHistory) {
    setHTasksId(pStream);
  }
  return TDB_CODE_SUCCESS;
}
594

595
// Connect `task` to every sink task in pSinkTaskList: a dispatcher towards the sinks is set up on
// `task`, and `task` is registered as upstream on each sink task. Failures are logged only; the
// function does not stop on error.
static void bindTaskToSinkTask(SStreamObj* pStream, SMnode* pMnode, SArray* pSinkTaskList, SStreamTask* task) {
  int32_t code = mndAddDispatcherForInternalTask(pMnode, pStream, pSinkTaskList, task);
  if (code != 0) {
    mError("failed bind task to sink task since %s", tstrerror(code));
  }

  int32_t idx = 0;
  while (idx < taosArrayGetSize(pSinkTaskList)) {
    SStreamTask* pSink = taosArrayGetP(pSinkTaskList, idx);

    code = streamTaskSetUpstreamInfo(pSink, task);
    if (code != 0) {
      mError("failed bind task to sink task since %s", tstrerror(code));
    }
    idx++;
  }

  mDebug("bindTaskToSinkTask taskId:%s to sink task list", task->id.idStr);
}
608

609
// Bind every task of the last level in `tasks` (the agg level) to the sink tasks stored at
// SINK_NODE_LEVEL.
static void bindAggSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks) {
  SArray*  pSinks = taosArrayGetP(tasks, SINK_NODE_LEVEL);
  SArray** ppAggLevel = taosArrayGetLast(tasks);

  int idx = 0;
  while (idx < taosArrayGetSize(*ppAggLevel)) {
    SStreamTask* pAgg = taosArrayGetP(*ppAggLevel, idx);

    bindTaskToSinkTask(pStream, pMnode, pSinks, pAgg);
    mDebug("bindAggSink taskId:%s to sink task list", pAgg->id.idStr);

    idx++;
  }
}
619

620
// Wire the source tasks to their output.
//
// With an extra sink level (hasExtraSink) the source tasks live one level above SINK_NODE_LEVEL
// and each is bound to the sink task list. Without it the source tasks sit at SINK_NODE_LEVEL
// and get the sink output info set on themselves. Failures are logged only.
static void bindSourceSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks, bool hasExtraSink) {
  int32_t code = 0;
  int32_t sourceLevel = hasExtraSink ? SINK_NODE_LEVEL + 1 : SINK_NODE_LEVEL;

  SArray* pSinks = taosArrayGetP(tasks, SINK_NODE_LEVEL);
  SArray* pSources = taosArrayGetP(tasks, sourceLevel);

  int idx = 0;
  while (idx < taosArrayGetSize(pSources)) {
    SStreamTask* pSource = taosArrayGetP(pSources, idx);
    mDebug("bindSourceSink taskId:%s to sink task list", pSource->id.idStr);

    if (!hasExtraSink) {
      code = mndSetSinkTaskInfo(pStream, pSource);
      if (code != 0) {
        mError("failed bind task to sink task since %s", tstrerror(code));
      }
    } else {
      bindTaskToSinkTask(pStream, pMnode, pSinks, pSource);
    }

    idx++;
  }
}
638

639
// Bind tasks [begin, end) of the second-to-last level (upstream) to the last task of the last
// level (downstream): each upstream task gets a child id relative to `begin` and the downstream
// task as its fixed downstream, and is registered as upstream on the downstream task.
//
// `end` is clamped to the size of the upstream level. Nothing is bound (an error is logged) when
// `tasks` holds fewer than two levels or when no downstream task is available; the latter guard
// avoids dereferencing a missing entry of the downstream level.
static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) {
  int32_t code = 0;
  size_t  size = taosArrayGetSize(tasks);
  if (size < 2) {
    mError("task list size is less than 2");
    return;
  }
  SArray* pDownTaskList = taosArrayGetP(tasks, size - 1);
  SArray* pUpTaskList = taosArrayGetP(tasks, size - 2);

  SStreamTask** pDownTask = taosArrayGetLast(pDownTaskList);
  if (pDownTask == NULL || *pDownTask == NULL) {
    mError("no downstream task in the last level, failed to bind two levels");
    return;
  }

  end = end > taosArrayGetSize(pUpTaskList) ? taosArrayGetSize(pUpTaskList) : end;
  for (int i = begin; i < end; i++) {
    SStreamTask* pUpTask = taosArrayGetP(pUpTaskList, i);
    pUpTask->info.selfChildId = i - begin;
    streamTaskSetFixedDownstreamInfo(pUpTask, *pDownTask);
    if ((code = streamTaskSetUpstreamInfo(*pDownTask, pUpTask)) != 0) {
      mError("failed bind task to sink task since %s", tstrerror(code));
    }
  }
  mDebug("bindTwoLevel task list(%d-%d) to taskId:%s", begin, end - 1, (*(pDownTask))->id.idStr);
}
661

662
static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, SEpSet* pEpset, int64_t skey,
581✔
663
                                SArray* pVerList) {
664
  int32_t code = 0;
581✔
665
  SSdb*   pSdb = pMnode->pSdb;
581✔
666
  int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
581!
667
  bool    hasExtraSink = false;
581✔
668
  bool    externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
581✔
669
  SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb);
581✔
670
  if (pDbObj == NULL) {
581!
671
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
672
    TAOS_RETURN(code);
×
673
  }
674

675
  bool multiTarget = (pDbObj->cfg.numOfVgroups > 1);
581✔
676
  sdbRelease(pSdb, pDbObj);
581✔
677

678
  mDebug("doScheduleStream numOfPlanLevel:%d, exDb:%d, multiTarget:%d, fix vgId:%d, physicalPlan:%s", numOfPlanLevel,
581✔
679
         externalTargetDB, multiTarget, pStream->fixedSinkVgId, pStream->physicalPlan);
680
  pStream->tasks = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES);
581✔
681
  pStream->pHTasksList = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES);
581✔
682

683
  if (numOfPlanLevel > 1 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) {
581✔
684
    // add extra sink
685
    hasExtraSink = true;
423✔
686
    code = addSinkTask(pMnode, pStream, pEpset);
423✔
687
    if (code != TSDB_CODE_SUCCESS) {
423!
688
      return code;
×
689
    }
690
  }
691

692
  pStream->totalLevel = numOfPlanLevel + hasExtraSink;
581✔
693

694
  SSubplan* plan = getScanSubPlan(pPlan);  // source plan
581✔
695
  if (plan == NULL) {
581!
696
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
697
    if (terrno != 0) code = terrno;
×
698
    TAOS_RETURN(code);
×
699
  }
700

701
  code = addSourceTask(pMnode, plan, pStream, pEpset, skey, pVerList, (numOfPlanLevel == 1));
581✔
702
  if (code != TSDB_CODE_SUCCESS) {
581!
703
    return code;
×
704
  }
705

706
  if (numOfPlanLevel == 1) {
581✔
707
    bindSourceSink(pStream, pMnode, pStream->tasks, hasExtraSink);
464✔
708
    if (pStream->conf.fillHistory) {
464✔
709
      bindSourceSink(pStream, pMnode, pStream->pHTasksList, hasExtraSink);
179✔
710
    }
711
    return TDB_CODE_SUCCESS;
464✔
712
  }
713

714
  if (numOfPlanLevel == 3) {
117✔
715
    plan = getAggSubPlan(pPlan, 1);  // middle agg plan
104✔
716
    if (plan == NULL) {
104!
717
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
718
      if (terrno != 0) code = terrno;
×
719
      TAOS_RETURN(code);
×
720
    }
721
    do {
5✔
722
      SArray** list = taosArrayGetLast(pStream->tasks);
109✔
723
      float    size = (float)taosArrayGetSize(*list);
109✔
724
      size_t   cnt = (size_t)ceil(size / tsStreamAggCnt);
109✔
725
      if (cnt <= 1) break;
109✔
726

727
      mDebug("doScheduleStream add middle agg, size:%d, cnt:%d", (int)size, (int)cnt);
5!
728
      addNewTaskList(pStream);
5✔
729

730
      for (int j = 0; j < cnt; j++) {
17✔
731
        code = addAggTask(pStream, pMnode, plan, pEpset, false);
12✔
732
        if (code != TSDB_CODE_SUCCESS) {
12!
733
          return code;
×
734
        }
735

736
        bindTwoLevel(pStream->tasks, j * tsStreamAggCnt, (j + 1) * tsStreamAggCnt);
12✔
737
        if (pStream->conf.fillHistory) {
12✔
738
          bindTwoLevel(pStream->pHTasksList, j * tsStreamAggCnt, (j + 1) * tsStreamAggCnt);
10✔
739
        }
740
      }
741
    } while (1);
742
  }
743

744
  plan = getAggSubPlan(pPlan, 0);
117✔
745
  if (plan == NULL) {
117!
746
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
747
    if (terrno != 0) code = terrno;
×
748
    TAOS_RETURN(code);
×
749
  }
750

751
  mDebug("doScheduleStream add final agg");
117✔
752
  SArray** list = taosArrayGetLast(pStream->tasks);
117✔
753
  size_t   size = taosArrayGetSize(*list);
117✔
754
  addNewTaskList(pStream);
117✔
755
  code = addAggTask(pStream, pMnode, plan, pEpset, true);
117✔
756
  if (code != TSDB_CODE_SUCCESS) {
117!
757
    TAOS_RETURN(code);
×
758
  }
759
  bindTwoLevel(pStream->tasks, 0, size);
117✔
760
  if (pStream->conf.fillHistory) {
117✔
761
    bindTwoLevel(pStream->pHTasksList, 0, size);
35✔
762
  }
763

764
  bindAggSink(pStream, pMnode, pStream->tasks);
117✔
765
  if (pStream->conf.fillHistory) {
117✔
766
    bindAggSink(pStream, pMnode, pStream->pHTasksList);
35✔
767
  }
768
  TAOS_RETURN(code);
117✔
769
}
770

771
// Entry point for scheduling a stream: the stream's physical plan string is parsed, the task
// topology is built with the mnode ep set, and the parsed plan is destroyed afterwards.
// Returns TSDB_CODE_QRY_INVALID_INPUT when the plan cannot be parsed, otherwise the result of
// doScheduleStream.
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream, int64_t skey, SArray* pVgVerList) {
  int32_t code = 0;

  SQueryPlan* pQueryPlan = qStringToQueryPlan(pStream->physicalPlan);
  if (pQueryPlan == NULL) {
    code = TSDB_CODE_QRY_INVALID_INPUT;
    TAOS_RETURN(code);
  }

  SEpSet epSet = {0};
  mndGetMnodeEpSet(pMnode, &epSet);

  code = doScheduleStream(pStream, pMnode, pQueryPlan, &epSet, skey, pVgVerList);
  qDestroyQueryPlan(pQueryPlan);

  TAOS_RETURN(code);
}
787

788
// Initialize a subscription for a topic: every vgroup of the topic's db is added to
// pSub->unassignedVgs, and the query message of the subscription is set.
//
// Plan handling:
//  - column topics parse the stored physical plan;
//  - table topics with an AST build a plan from that AST;
//  - other topics have no plan and get an empty qmsg.
// A plan must consist of exactly one level holding exactly one sub-plan, otherwise
// TSDB_CODE_MND_INVALID_TOPIC_QUERY is returned.
//
// Returns 0 on success or an error code. On an error inside the vgroup scan the current vgroup is
// released and the sdb iterator is cancelled. pSub->vgNum is incremented only after the vgroup
// entry has actually been stored, so it stays in step with unassignedVgs on failure.
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
  int32_t     code = 0;
  SSdb*       pSdb = pMnode->pSdb;
  SVgObj*     pVgroup = NULL;
  SQueryPlan* pPlan = NULL;
  SSubplan*   pSubplan = NULL;
  void*       pIter = NULL;

  if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) {
    pPlan = qStringToQueryPlan(pTopic->physicalPlan);
    if (pPlan == NULL) {
      return TSDB_CODE_QRY_INVALID_INPUT;
    }
  } else if (pTopic->subType == TOPIC_SUB_TYPE__TABLE && pTopic->ast != NULL) {
    SNode* pAst = NULL;
    code = nodesStringToNode(pTopic->ast, &pAst);
    if (code != 0) {
      mError("topic:%s, failed to create since %s", pTopic->name, terrstr());
      return code;
    }

    SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
    code = qCreateQueryPlan(&cxt, &pPlan, NULL);
    if (code != 0) {
      mError("failed to create topic:%s since %s", pTopic->name, terrstr());
      nodesDestroyNode(pAst);
      return code;
    }
    nodesDestroyNode(pAst);
  }

  if (pPlan) {
    int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
    if (levelNum != 1) {
      code = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
      goto END;
    }

    SNodeListNode* pNodeListNode = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
    if (pNodeListNode == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto END;
    }
    int32_t opNum = LIST_LENGTH(pNodeListNode->pNodeList);
    if (opNum != 1) {
      code = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
      goto END;
    }

    pSubplan = (SSubplan*)nodesListGetNode(pNodeListNode->pNodeList, 0);
  }

  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    SMqVgEp* pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
    if (pVgEp == NULL) {
      code = terrno;
      // abandoning the scan: drop the vgroup reference and close the iterator
      sdbRelease(pSdb, pVgroup);
      sdbCancelFetch(pSdb, pIter);
      goto END;
    }
    pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
    pVgEp->vgId = pVgroup->vgId;
    if (taosArrayPush(pSub->unassignedVgs, &pVgEp) == NULL) {
      code = terrno;
      taosMemoryFree(pVgEp);
      sdbRelease(pSdb, pVgroup);
      sdbCancelFetch(pSdb, pIter);
      goto END;
    }

    // count the vgroup only once it is stored in unassignedVgs
    pSub->vgNum++;

    mInfo("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, pVgEp->vgId);
    sdbRelease(pSdb, pVgroup);
  }

  if (pSubplan) {
    int32_t msgLen;

    if (qSubPlanToString(pSubplan, &pSub->qmsg, &msgLen) < 0) {
      code = TSDB_CODE_QRY_INVALID_INPUT;
      goto END;
    }
  } else {
    pSub->qmsg = taosStrdup("");
    if (pSub->qmsg == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto END;
    }
  }

END:
  qDestroyQueryPlan(pPlan);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc