• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3524

08 Nov 2024 04:27AM UTC coverage: 60.898% (+5.0%) from 55.861%
#3524

push

travis-ci

web-flow
Merge pull request #28647 from taosdata/fix/3.0/TD-32519_drop_ctb

fix TD-32519 drop child table with tsma caused crash

118687 of 248552 branches covered (47.75%)

Branch coverage included in aggregate %.

286 of 337 new or added lines in 18 files covered. (84.87%)

9647 existing lines in 190 files now uncovered.

199106 of 273291 relevant lines covered (72.85%)

15236719.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.1
/source/dnode/mnode/impl/src/mndScheduler.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndScheduler.h"
17
#include "mndDb.h"
18
#include "mndMnode.h"
19
#include "mndSnode.h"
20
#include "mndVgroup.h"
21
#include "parser.h"
22
#include "tcompare.h"
23
#include "tmisce.h"
24
#include "tname.h"
25
#include "tuuid.h"
26

27
#define SINK_NODE_LEVEL (0)
28
extern bool tsDeployOnSnode;
29

30
// Returns true when pNode itself, or any node reachable through its pChildren lists,
// is a QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT node. The search is depth-first.
static bool hasCountWindowNode(SPhysiNode* pNode) {
  if (nodeType(pNode) == QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT) {
    return true;
  }

  size_t  numOfChildren = LIST_LENGTH(pNode->pChildren);
  int32_t idx = 0;
  while (idx < numOfChildren) {
    SPhysiNode* pSub = (SPhysiNode*)nodesListGetNode(pNode->pChildren, idx);
    if (hasCountWindowNode(pSub)) {
      return true;
    }
    ++idx;
  }

  return false;
}
46

47
// Tells whether the physical plan tree rooted at pPlan->pNode contains a count-window node.
static bool isCountWindowStreamTask(SSubplan* pPlan) {
  SPhysiNode* pRoot = (SPhysiNode*)pPlan->pNode;
  return hasCountWindowNode(pRoot);
}
50

51
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
8✔
52
                           int64_t watermark, int64_t deleteMark) {
53
  int32_t     code = 0;
8✔
54
  SNode*      pAst = NULL;
8✔
55
  SQueryPlan* pPlan = NULL;
8✔
56

57
  if (nodesStringToNode(ast, &pAst) < 0) {
8!
58
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
59
    goto END;
×
60
  }
61

62
  if (qSetSTableIdForRsma(pAst, uid) < 0) {
8!
63
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
64
    goto END;
×
65
  }
66

67
  SPlanContext cxt = {
8✔
68
      .pAstRoot = pAst,
69
      .topicQuery = false,
70
      .streamQuery = true,
71
      .rSmaQuery = true,
72
      .triggerType = triggerType,
73
      .watermark = watermark,
74
      .deleteMark = deleteMark,
75
  };
76

77
  if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
8!
78
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
79
    goto END;
×
80
  }
81

82
  int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
8!
83
  if (levelNum != 1) {
8!
84
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
85
    goto END;
×
86
  }
87
  SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
8✔
88

89
  int32_t opNum = LIST_LENGTH(inner->pNodeList);
8!
90
  if (opNum != 1) {
8!
91
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
92
    goto END;
×
93
  }
94

95
  SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
8✔
96
  if (qSubPlanToString(plan, pDst, pDstLen) < 0) {
8!
97
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
98
    goto END;
×
99
  }
100

101
END:
8✔
102
  if (pAst) nodesDestroyNode(pAst);
8!
103
  if (pPlan) nodesDestroyNode((SNode*)pPlan);
8!
104
  TAOS_RETURN(code);
8✔
105
}
106

107
// Fills pTask->outputInfo so the task writes either to an sma or to the stream's target super table.
//
// The sma output is chosen when the stream has a non-zero smaId and subTableWithoutMd5 is not 1.
// Otherwise the table output is configured with a clone of the stream's output schema.
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the schema clone yields NULL.
int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) {
  STaskOutputInfo* pOutput = &pTask->outputInfo;

  mDebug("mndSetSinkTaskInfo to sma or table, taskId:%s", pTask->id.idStr);

  bool writeToSma = (pStream->smaId != 0) && (pStream->subTableWithoutMd5 != 1);
  if (writeToSma) {
    pOutput->type = TASK_OUTPUT__SMA;
    pOutput->smaSink.smaId = pStream->smaId;
    return 0;
  }

  pOutput->type = TASK_OUTPUT__TABLE;
  pOutput->tbSink.stbUid = pStream->targetStbUid;
  (void)memcpy(pOutput->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);

  pOutput->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
  if (pOutput->tbSink.pSchemaWrapper != NULL) {
    return 0;
  }

  return TSDB_CODE_OUT_OF_MEMORY;
}
127

128
int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SArray* pSinkNodeList,
6,022✔
129
                                        SStreamTask* pTask) {
130
  int32_t code = 0;
6,022✔
131
  bool isShuffle = false;
6,022✔
132

133
  if (pStream->fixedSinkVgId == 0) {
6,022✔
134
    SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
5,960✔
135
    if (pDb != NULL && pDb->cfg.numOfVgroups > 1) {
5,960!
136
      isShuffle = true;
5,904✔
137
      pTask->outputInfo.type = TASK_OUTPUT__SHUFFLE_DISPATCH;
5,904✔
138
      pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
5,904✔
139
      TAOS_CHECK_RETURN(mndExtractDbInfo(pMnode, pDb, &pTask->outputInfo.shuffleDispatcher.dbInfo, NULL));
5,904!
140
    }
141

142
    sdbRelease(pMnode->pSdb, pDb);
5,960✔
143
  }
144

145
  int32_t numOfSinkNodes = taosArrayGetSize(pSinkNodeList);
6,022✔
146

147
  if (isShuffle) {
6,022✔
148
    (void)memcpy(pTask->outputInfo.shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
5,904✔
149
    SArray* pVgs = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
5,904✔
150

151
    int32_t numOfVgroups = taosArrayGetSize(pVgs);
5,904✔
152
    for (int32_t i = 0; i < numOfVgroups; i++) {
24,260✔
153
      SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
18,356✔
154

155
      for (int32_t j = 0; j < numOfSinkNodes; j++) {
39,975!
156
        SStreamTask* pSinkTask = taosArrayGetP(pSinkNodeList, j);
39,975✔
157
        if (pSinkTask->info.nodeId == pVgInfo->vgId) {
39,975✔
158
          pVgInfo->taskId = pSinkTask->id.taskId;
18,356✔
159
          break;
18,356✔
160
        }
161
      }
162
    }
163
  } else {
164
    SStreamTask* pOneSinkTask = taosArrayGetP(pSinkNodeList, 0);
118✔
165
    streamTaskSetFixedDownstreamInfo(pTask, pOneSinkTask);
118✔
166
  }
167

168
  TAOS_RETURN(code);
6,022✔
169
}
170

171
// Binds pTask and its subplan to the vnode group pVgroup, then serializes the subplan into
// pTask->exec.qmsg. Returns the result of qSubPlanToString.
int32_t mndAssignStreamTaskToVgroup(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
  int32_t qmsgLen;  // output slot for qSubPlanToString; not used afterwards

  // the task runs on the vgroup ...
  pTask->info.nodeId = pVgroup->vgId;
  pTask->info.epSet = mndGetVgroupEpset(pMnode, pVgroup);

  // ... and the plan must carry the same execution node
  plan->execNode.nodeId = pTask->info.nodeId;
  plan->execNode.epSet = pTask->info.epSet;

  int32_t code = qSubPlanToString(plan, &pTask->exec.qmsg, &qmsgLen);
  return code;
}
181

182
// Fetches the first snode object that the sdb iteration yields and closes the iterator.
// Returns whatever sdbFetch stored in the output pointer, which stays NULL if it stored nothing.
SSnodeObj* mndSchedFetchOneSnode(SMnode* pMnode) {
  SSdb*      pSdb = pMnode->pSdb;
  SSnodeObj* pSnode = NULL;

  // TODO random fetch
  void* pCursor = sdbFetch(pSdb, SDB_SNODE, NULL, (void**)&pSnode);
  sdbCancelFetch(pSdb, pCursor);

  return pSnode;
}
190

191
// Binds pTask and its subplan to the snode (node id SNODE_HANDLE), then serializes the subplan into
// pTask->exec.qmsg. Returns the result of qSubPlanToString.
int32_t mndAssignStreamTaskToSnode(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SSnodeObj* pSnode) {
  int32_t qmsgLen;  // output slot for qSubPlanToString; not used afterwards

  pTask->info.nodeId = SNODE_HANDLE;
  pTask->info.epSet = mndAcquireEpFromSnode(pMnode, pSnode);

  plan->execNode.nodeId = SNODE_HANDLE;
  plan->execNode.epSet = pTask->info.epSet;

  mDebug("s-task:0x%x set the agg task to snode:%d", pTask->id.taskId, SNODE_HANDLE);

  int32_t code = qSubPlanToString(plan, &pTask->exec.qmsg, &qmsgLen);
  return code;
}
203

204
// random choose a node to do compute
205
SVgObj* mndSchedFetchOneVg(SMnode* pMnode, SStreamObj* pStream) {
162✔
206
  SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->sourceDb);
162✔
207
  if (pDbObj == NULL) {
162!
208
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
209
    return NULL;
×
210
  }
211

212
  if (pStream->indexForMultiAggBalance == -1) {
162✔
213
    taosSeedRand(taosSafeRand());
150✔
214
    pStream->indexForMultiAggBalance = taosRand() % pDbObj->cfg.numOfVgroups;
150✔
215
  }
216

217
  int32_t index = 0;
162✔
218
  void*   pIter = NULL;
162✔
219
  SVgObj* pVgroup = NULL;
162✔
220
  while (1) {
221
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
605✔
222
    if (pIter == NULL) break;
605!
223
    if (pVgroup->dbUid != pStream->sourceDbUid) {
605✔
224
      sdbRelease(pMnode->pSdb, pVgroup);
258✔
225
      continue;
258✔
226
    }
227
    if (index++ == pStream->indexForMultiAggBalance) {
347✔
228
      pStream->indexForMultiAggBalance++;
162✔
229
      pStream->indexForMultiAggBalance %= pDbObj->cfg.numOfVgroups;
162✔
230
      sdbCancelFetch(pMnode->pSdb, pIter);
162✔
231
      break;
162✔
232
    }
233
    sdbRelease(pMnode->pSdb, pVgroup);
185✔
234
  }
235
  sdbRelease(pMnode->pSdb, pDbObj);
162✔
236

237
  return pVgroup;
162✔
238
}
239

240
// Creates one sink-level task on pVgroup and appends it to the last task list of the stream
// (the fill-history list when isFillhistory is set, the regular list otherwise).
// Returns the error of tNewStreamTask, or the result of mndSetSinkTaskInfo.
static int32_t doAddSinkTask(SStreamObj* pStream, SMnode* pMnode, SVgObj* pVgroup, SEpSet* pEpset, bool isFillhistory) {
  int64_t  uid;
  SArray** ppLevelTasks;

  if (isFillhistory) {
    uid = pStream->hTaskUid;
    ppLevelTasks = taosArrayGetLast(pStream->pHTasksList);
  } else {
    uid = pStream->uid;
    ppLevelTasks = taosArrayGetLast(pStream->tasks);
  }

  SStreamTask* pSinkTask = NULL;
  int32_t      code = tNewStreamTask(uid, TASK_LEVEL__SINK, pEpset, isFillhistory, 0, 0, *ppLevelTasks,
                                     pStream->conf.fillHistory, pStream->subTableWithoutMd5, &pSinkTask);
  if (code != 0) {
    return code;
  }

  mDebug("doAddSinkTask taskId:%s, %p vgId:%d, isFillHistory:%d", pSinkTask->id.idStr, pSinkTask, pVgroup->vgId,
         isFillhistory);

  pSinkTask->info.nodeId = pVgroup->vgId;
  pSinkTask->info.epSet = mndGetVgroupEpset(pMnode, pVgroup);

  return mndSetSinkTaskInfo(pStream, pSinkTask);
}
257

258
// Adds the regular sink task for vgObj and, when the stream has fill-history enabled, the matching
// fill-history sink task as well. Returns the first error encountered, TDB_CODE_SUCCESS otherwise.
static int32_t doAddSinkTaskToVg(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset, SVgObj* vgObj) {
  int32_t code = doAddSinkTask(pStream, pMnode, vgObj, pEpset, false);

  if (code == 0 && pStream->conf.fillHistory) {
    code = doAddSinkTask(pStream, pMnode, vgObj, pEpset, true);
  }

  if (code != 0) {
    return code;
  }
  return TDB_CODE_SUCCESS;
}
271

272
// create sink node for each vgroup.
273
static int32_t doAddShuffleSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset) {
1,424✔
274
  SSdb* pSdb = pMnode->pSdb;
1,424✔
275
  void* pIter = NULL;
1,424✔
276

277
  while (1) {
5,161✔
278
    SVgObj* pVgroup = NULL;
6,585✔
279
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
6,585✔
280
    if (pIter == NULL) {
6,585✔
281
      break;
1,424✔
282
    }
283

284
    if (!mndVgroupInDb(pVgroup, pStream->targetDbUid)) {
5,161✔
285
      sdbRelease(pSdb, pVgroup);
1,139✔
286
      continue;
1,139✔
287
    }
288

289
    int32_t code = doAddSinkTaskToVg(pMnode, pStream, pEpset, pVgroup);
4,022✔
290
    if (code != 0) {
4,022!
291
      sdbRelease(pSdb, pVgroup);
×
292
      return code;
×
293
    }
294

295
    sdbRelease(pSdb, pVgroup);
4,022✔
296
  }
297

298
  return TDB_CODE_SUCCESS;
1,424✔
299
}
300

301
static int64_t getVgroupLastVer(const SArray* pList, int32_t vgId) {
6,735✔
302
  int32_t size = (int32_t) taosArrayGetSize(pList);
6,735✔
303
  for (int32_t i = 0; i < size; ++i) {
10,987✔
304
    SVgroupVer* pVer = taosArrayGet(pList, i);
7,040✔
305
    if (pVer->vgId == vgId) {
7,040✔
306
      return pVer->ver;
2,788✔
307
    }
308
  }
309

310
  mDebug("no data in vgId:%d for extract last version, set to be 0, total existed vgs:%d", vgId, size);
3,947✔
311
  return 1;
3,947✔
312
}
313

314
// Initializes pTask->dataRange (time window and version range) for a source task on vgroup vgId.
//
// A fill-history task covers [INT64_MIN, skey - 1] and versions [0, latest].
// A regular task covers [skey, INT64_MAX] and versions [latest + 1, INT64_MAX].
// "latest" is the value from getVgroupLastVer, clamped to be non-negative.
static void streamTaskSetDataRange(SStreamTask* pTask, int64_t skey, SArray* pVerList, int32_t vgId) {
  int64_t lastVer = getVgroupLastVer(pVerList, vgId);
  lastVer = (lastVer < 0) ? 0 : lastVer;

  // set the correct ts, which is the last key of queried table.
  SDataRange*  pRange = &pTask->dataRange;
  STimeWindow* pWin = &pRange->window;

  if (!pTask->info.fillHistory) {
    pWin->skey = skey;
    pWin->ekey = INT64_MAX;

    pRange->range.minVer = lastVer + 1;
    pRange->range.maxVer = INT64_MAX;

    mDebug("add source task 0x%x timeWindow:%" PRId64 "-%" PRId64 " verRange:%" PRId64 "-%" PRId64, pTask->id.taskId,
           pWin->skey, pWin->ekey, pRange->range.minVer, pRange->range.maxVer);
    return;
  }

  pWin->skey = INT64_MIN;
  pWin->ekey = skey - 1;

  pRange->range.minVer = 0;
  pRange->range.maxVer = lastVer;

  mDebug("add fill-history source task 0x%x timeWindow:%" PRId64 "-%" PRId64 " verRange:%" PRId64 "-%" PRId64,
         pTask->id.taskId, pWin->skey, pWin->ekey, pRange->range.minVer, pRange->range.maxVer);
}
6,735✔
343

344
// Puts a regular (non fill-history) task into TASK_STATUS__HALT when its plan contains a
// count-window node. Fill-history tasks and plans without a count window are left untouched.
static void haltInitialTaskStatus(SStreamTask* pTask, SSubplan* pPlan, bool isFillhistoryTask) {
  bool withCountWindow = isCountWindowStreamTask(pPlan);
  if (!withCountWindow || isFillhistoryTask) {
    return;
  }

  SStreamStatus* pTaskStatus = &pTask->status;
  mDebug("s-task:0x%x status set %s from %s for count window agg task with fill-history option set",
         pTask->id.taskId, streamTaskGetStatusStr(TASK_STATUS__HALT), streamTaskGetStatusStr(pTaskStatus->taskStatus));
  pTaskStatus->taskStatus = TASK_STATUS__HALT;
}
4,782✔
354

355
// Creates a source-level task and appends it to the last task list of the stream (the fill-history
// list when isFillhistory is set). The stream's trigger param is passed on only when useTriggerParam
// is set, otherwise 0 is used. Returns the result of tNewStreamTask; the task is stored in *pTask.
static int32_t buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillhistory, bool useTriggerParam, SStreamTask** pTask) {
  uint64_t uid;
  SArray** ppLevelTasks;

  if (isFillhistory) {
    uid = pStream->hTaskUid;
    ppLevelTasks = taosArrayGetLast(pStream->pHTasksList);
  } else {
    uid = pStream->uid;
    ppLevelTasks = taosArrayGetLast(pStream->tasks);
  }

  return tNewStreamTask(uid, TASK_LEVEL__SOURCE, pEpset, isFillhistory, pStream->conf.trigger,
                        useTriggerParam ? pStream->conf.triggerParam : 0, *ppLevelTasks, pStream->conf.fillHistory,
                        pStream->subTableWithoutMd5, pTask);
}
364

365
// Opens a new (empty) task level: pushes a fresh task array onto pStream->tasks and, when
// fill-history is enabled, another one onto pStream->pHTasksList. Push failures are only logged.
static void addNewTaskList(SStreamObj* pStream) {
  SArray* pLevel = taosArrayInit(0, POINTER_BYTES);
  if (taosArrayPush(pStream->tasks, &pLevel) == NULL) {
    mError("failed to put into array");
  }

  if (!pStream->conf.fillHistory) {
    return;
  }

  SArray* pHistoryLevel = taosArrayInit(0, POINTER_BYTES);
  if (taosArrayPush(pStream->pHTasksList, &pHistoryLevel) == NULL) {
    mError("failed to put into array");
  }
}
3,293✔
378

379
// set the history task id
380
static void setHTasksId(SStreamObj* pStream) {
1,730✔
381
  SArray* pTaskList = *(SArray**)taosArrayGetLast(pStream->tasks);
1,730✔
382
  SArray* pHTaskList = *(SArray**)taosArrayGetLast(pStream->pHTasksList);
1,730✔
383

384
  for (int32_t i = 0; i < taosArrayGetSize(pTaskList); ++i) {
6,565✔
385
    SStreamTask** pStreamTask = taosArrayGet(pTaskList, i);
4,835✔
386
    SStreamTask** pHTask = taosArrayGet(pHTaskList, i);
4,835✔
387

388
    (*pStreamTask)->hTaskInfo.id.taskId = (*pHTask)->id.taskId;
4,835✔
389
    (*pStreamTask)->hTaskInfo.id.streamId = (*pHTask)->id.streamId;
4,835✔
390

391
    (*pHTask)->streamTaskId.taskId = (*pStreamTask)->id.taskId;
4,835✔
392
    (*pHTask)->streamTaskId.streamId = (*pStreamTask)->id.streamId;
4,835✔
393

394
    mDebug("s-task:0x%" PRIx64 "-0x%x related history task:0x%" PRIx64 "-0x%x, level:%d", (*pStreamTask)->id.streamId,
4,835✔
395
           (*pStreamTask)->id.taskId, (*pHTask)->id.streamId, (*pHTask)->id.taskId, (*pHTask)->info.taskLevel);
396
  }
397
}
1,730✔
398

399
static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, int64_t skey,
6,735✔
400
                               SArray* pVerList, SVgObj* pVgroup, bool isHistoryTask, bool useTriggerParam) {
401
  SStreamTask* pTask = NULL;
6,735✔
402
  int32_t code = buildSourceTask(pStream, pEpset, isHistoryTask, useTriggerParam, &pTask);
6,735✔
403
  if (code != TSDB_CODE_SUCCESS) {
6,735!
UNCOV
404
    return code;
×
405
  }
406

407
  mDebug("doAddSourceTask taskId:%s, %p vgId:%d, isFillHistory:%d", pTask->id.idStr, pTask, pVgroup->vgId,
6,735✔
408
         isHistoryTask);
409

410
  if (pStream->conf.fillHistory) {
6,735✔
411
    haltInitialTaskStatus(pTask, plan, isHistoryTask);
4,782✔
412
  }
413

414
  streamTaskSetDataRange(pTask, skey, pVerList, pVgroup->vgId);
6,735✔
415

416
  code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup);
6,735✔
417
  if (code != TSDB_CODE_SUCCESS) {
6,735!
UNCOV
418
    return code;
×
419
  }
420

421
  return TDB_CODE_SUCCESS;
6,735✔
422
}
423

424
static SSubplan* getScanSubPlan(const SQueryPlan* pPlan) {
1,628✔
425
  int32_t        numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
1,628!
426
  SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, numOfPlanLevel - 1);
1,628✔
427
  if (LIST_LENGTH(inner->pNodeList) != 1) {
1,628!
428
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
UNCOV
429
    return NULL;
×
430
  }
431

432
  SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
1,628✔
433
  if (plan->subplanType != SUBPLAN_TYPE_SCAN) {
1,628!
434
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
UNCOV
435
    return NULL;
×
436
  }
437
  return plan;
1,628✔
438
}
439

440
static SSubplan* getAggSubPlan(const SQueryPlan* pPlan, int index) {
381✔
441
  SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, index);
381✔
442
  if (LIST_LENGTH(inner->pNodeList) != 1) {
381!
443
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
UNCOV
444
    return NULL;
×
445
  }
446

447
  SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
381✔
448
  if (plan->subplanType != SUBPLAN_TYPE_MERGE) {
381!
449
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
UNCOV
450
    return NULL;
×
451
  }
452
  return plan;
381✔
453
}
454

455
// Opens a new task level and creates a source task (plus a fill-history source task when enabled)
// on every vgroup of the stream's source db. After all tasks are created, regular and history
// tasks are cross-linked when fill-history is enabled.
//
// Returns TSDB_CODE_SUCCESS, or the first error from doAddSourceTask. Tasks created before a
// failure are not rolled back.
static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset,
                             int64_t nextWindowSkey, SArray* pVerList, bool useTriggerParam) {
  void*   pIter = NULL;
  int32_t code = 0;
  SSdb*   pSdb = pMnode->pSdb;

  addNewTaskList(pStream);

  while (1) {
    SVgObj* pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, false, useTriggerParam);
    if (code != 0) {
      mError("failed to create stream task, code:%s", tstrerror(code));

      // todo drop the added source tasks.
      sdbRelease(pSdb, pVgroup);
      // Leaving before the iteration is exhausted: close the iterator as well.
      sdbCancelFetch(pSdb, pIter);
      return code;
    }

    if (pStream->conf.fillHistory) {
      code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, true, useTriggerParam);
      if (code != 0) {
        sdbRelease(pSdb, pVgroup);
        sdbCancelFetch(pSdb, pIter);
        return code;
      }
    }

    sdbRelease(pSdb, pVgroup);
  }

  if (pStream->conf.fillHistory) {
    setHTasksId(pStream);
  }

  return TSDB_CODE_SUCCESS;
}
501

502
static int32_t buildAggTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillhistory, bool useTriggerParam,
305✔
503
                            SStreamTask** pAggTask) {
504
  *pAggTask = NULL;
305✔
505

506
  uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
305✔
507
  SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
305✔
508

509
  int32_t code = tNewStreamTask(uid, TASK_LEVEL__AGG, pEpset, isFillhistory, pStream->conf.trigger,
305✔
510
                                useTriggerParam ? pStream->conf.triggerParam : 0, *pTaskList, pStream->conf.fillHistory,
305✔
511
                                pStream->subTableWithoutMd5, pAggTask);
305✔
512
  return code;
305✔
513
}
514

515
static int32_t doAddAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset, SVgObj* pVgroup,
305✔
516
                            SSnodeObj* pSnode, bool isFillhistory, bool useTriggerParam) {
517
  int32_t      code = 0;
305✔
518
  SStreamTask* pTask = NULL;
305✔
519
  const char*  id = NULL;
305✔
520

521
  code = buildAggTask(pStream, pEpset, isFillhistory, useTriggerParam, &pTask);
305✔
522
  if (code != TSDB_CODE_SUCCESS) {
305!
UNCOV
523
    return code;
×
524
  }
525

526
  id = pTask->id.idStr;
305✔
527
  if (pSnode != NULL) {
305✔
528
    code = mndAssignStreamTaskToSnode(pMnode, pTask, plan, pSnode);
85✔
529
    mDebug("doAddAggTask taskId:%s, %p snode id:%d, isFillHistory:%d", id, pTask, pSnode->id, isFillhistory);
85✔
530
  } else {
531
    code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup);
220✔
532
    mDebug("doAddAggTask taskId:%s, %p vgId:%d, isFillHistory:%d", id, pTask, pVgroup->vgId, isFillhistory);
220!
533
  }
534
  return code;
305✔
535
}
536

537
// Adds the agg task (and its fill-history twin when enabled) for one plan level.
//
// The task is deployed on an snode when tsDeployOnSnode is set and an snode can be fetched;
// otherwise on a vgroup chosen by mndSchedFetchOneVg. The fetched snode/vgroup is released before
// returning. Returns 0 on success, the error from doAddAggTask, or an error code when no node is
// available to host the task.
static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset, bool useTriggerParam) {
  SVgObj*    pVgroup = NULL;
  SSnodeObj* pSnode = NULL;
  int32_t    code = 0;

  if (tsDeployOnSnode) {
    pSnode = mndSchedFetchOneSnode(pMnode);
  }

  if (pSnode == NULL) {
    pVgroup = mndSchedFetchOneVg(pMnode, pStream);
    if (pVgroup == NULL) {
      // Neither an snode nor a vgroup is available; doAddAggTask would dereference the NULL vgroup.
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      return code;
    }
  }

  code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, false, useTriggerParam);
  if (code != 0) {
    goto END;
  }

  if (pStream->conf.fillHistory) {
    code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, true, useTriggerParam);
    if (code != 0) {
      goto END;
    }

    setHTasksId(pStream);
  }

END:
  if (pSnode != NULL) {
    sdbRelease(pMnode->pSdb, pSnode);
  } else {
    sdbRelease(pMnode->pSdb, pVgroup);
  }
  return code;
}
572

573
// Opens a new task level and fills it with sink tasks: one per target-db vgroup when the stream has
// no fixed sink vgroup, otherwise only on pStream->fixedSinkVg. With fill-history enabled the regular
// and history sink tasks are cross-linked afterwards.
// Returns the first error from the helper used, TDB_CODE_SUCCESS otherwise.
static int32_t addSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset) {
  int32_t code = 0;

  addNewTaskList(pStream);

  if (pStream->fixedSinkVgId != 0) {
    code = doAddSinkTaskToVg(pMnode, pStream, pEpset, &pStream->fixedSinkVg);
  } else {
    code = doAddShuffleSinkTask(pMnode, pStream, pEpset);
  }

  if (code != 0) {
    return code;
  }

  if (pStream->conf.fillHistory) {
    setHTasksId(pStream);
  }

  return TDB_CODE_SUCCESS;
}
594

595
// Connects `task` to all sink tasks: sets up the dispatcher on `task`, then registers `task` as an
// upstream of every task in pSinkTaskList. Failures are logged and do not stop the binding.
static void bindTaskToSinkTask(SStreamObj* pStream, SMnode* pMnode, SArray* pSinkTaskList, SStreamTask* task) {
  int32_t code = mndAddDispatcherForInternalTask(pMnode, pStream, pSinkTaskList, task);
  if (code != 0) {
    mError("failed bind task to sink task since %s", tstrerror(code));
  }

  int32_t idx = 0;
  while (idx < taosArrayGetSize(pSinkTaskList)) {
    SStreamTask* pSink = taosArrayGetP(pSinkTaskList, idx);

    code = streamTaskSetUpstreamInfo(pSink, task);
    if (code != 0) {
      mError("failed bind task to sink task since %s", tstrerror(code));
    }

    idx++;
  }

  mDebug("bindTaskToSinkTask taskId:%s to sink task list", task->id.idStr);
}
6,022✔
608

609
// Binds every task of the last level in `tasks` (the agg level) to the sink level at index
// SINK_NODE_LEVEL.
static void bindAggSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks) {
  SArray*  pSinks = taosArrayGetP(tasks, SINK_NODE_LEVEL);
  SArray** ppAggs = taosArrayGetLast(tasks);

  int idx = 0;
  while (idx < taosArrayGetSize(*ppAggs)) {
    SStreamTask* pAgg = taosArrayGetP(*ppAggs, idx);

    bindTaskToSinkTask(pStream, pMnode, pSinks, pAgg);
    mDebug("bindAggSink taskId:%s to sink task list", pAgg->id.idStr);

    idx++;
  }
}
283✔
619

620
// Wires the source tasks to their output. With an extra sink level, each source task (level
// SINK_NODE_LEVEL + 1) is bound to the sink tasks at SINK_NODE_LEVEL. Without one, the source tasks
// live at SINK_NODE_LEVEL themselves and get the sink output info set directly; failures there are
// only logged.
static void bindSourceSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks, bool hasExtraSink) {
  int32_t code = 0;
  int32_t sourceLevel = hasExtraSink ? SINK_NODE_LEVEL + 1 : SINK_NODE_LEVEL;

  SArray* pSinks = taosArrayGetP(tasks, SINK_NODE_LEVEL);
  SArray* pSources = taosArrayGetP(tasks, sourceLevel);

  int idx = 0;
  while (idx < taosArrayGetSize(pSources)) {
    SStreamTask* pSource = taosArrayGetP(pSources, idx);
    mDebug("bindSourceSink taskId:%s to sink task list", pSource->id.idStr);

    if (!hasExtraSink) {
      code = mndSetSinkTaskInfo(pStream, pSource);
      if (code != 0) {
        mError("failed bind task to sink task since %s", tstrerror(code));
      }
    } else {
      bindTaskToSinkTask(pStream, pMnode, pSinks, pSource);
    }

    idx++;
  }
}
2,171✔
638

639
// Connects tasks [begin, end) of the second-to-last level in `tasks` to the last task of the last
// level: each upstream task gets a child id (its offset from `begin`) and a fixed downstream, and is
// registered as an upstream of that task. `end` is clamped to the size of the upstream level.
// Does nothing but log when `tasks` has fewer than two levels.
static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) {
  int32_t code = 0;
  size_t  numOfLevels = taosArrayGetSize(tasks);
  if (numOfLevels < 2) {
    mError("task list size is less than 2");
    return;
  }

  SArray* pDownLevel = taosArrayGetP(tasks, numOfLevels - 1);
  SArray* pUpLevel = taosArrayGetP(tasks, numOfLevels - 2);

  SStreamTask** ppDown = taosArrayGetLast(pDownLevel);

  if (end > taosArrayGetSize(pUpLevel)) {
    end = taosArrayGetSize(pUpLevel);
  }

  int idx = begin;
  while (idx < end) {
    SStreamTask* pUp = taosArrayGetP(pUpLevel, idx);
    pUp->info.selfChildId = idx - begin;
    streamTaskSetFixedDownstreamInfo(pUp, *ppDown);

    code = streamTaskSetUpstreamInfo(*ppDown, pUp);
    if (code != 0) {
      mError("failed bind task to sink task since %s", tstrerror(code));
    }

    idx++;
  }

  mDebug("bindTwoLevel task list(%d-%d) to taskId:%s", begin, end - 1, (*(ppDown))->id.idStr);
}
661

662
static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, SEpSet* pEpset, int64_t skey,
1,628✔
663
                                SArray* pVerList) {
664
  int32_t code = 0;
1,628✔
665
  SSdb*   pSdb = pMnode->pSdb;
1,628✔
666
  int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
1,628!
667
  bool    hasExtraSink = false;
1,628✔
668
  bool    externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
1,628✔
669
  SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb);
1,628✔
670
  if (pDbObj == NULL) {
1,628!
UNCOV
671
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
UNCOV
672
    TAOS_RETURN(code);
×
673
  }
674

675
  bool multiTarget = (pDbObj->cfg.numOfVgroups > 1);
1,628✔
676
  sdbRelease(pSdb, pDbObj);
1,628✔
677

678
  mDebug("doScheduleStream numOfPlanLevel:%d, exDb:%d, multiTarget:%d, fix vgId:%d, physicalPlan:%s", numOfPlanLevel,
1,628✔
679
         externalTargetDB, multiTarget, pStream->fixedSinkVgId, pStream->physicalPlan);
680
  pStream->tasks = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES);
1,628✔
681
  pStream->pHTasksList = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES);
1,628✔
682

683
  if (numOfPlanLevel > 1 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) {
1,628✔
684
    // add extra sink
685
    hasExtraSink = true;
1,455✔
686
    code = addSinkTask(pMnode, pStream, pEpset);
1,455✔
687
    if (code != TSDB_CODE_SUCCESS) {
1,455!
UNCOV
688
      return code;
×
689
    }
690
  }
691

692
  pStream->totalLevel = numOfPlanLevel + hasExtraSink;
1,628✔
693

694
  SSubplan* plan = getScanSubPlan(pPlan);  // source plan
1,628✔
695
  if (plan == NULL) {
1,628!
696
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
697
    if (terrno != 0) code = terrno;
×
UNCOV
698
    TAOS_RETURN(code);
×
699
  }
700

701
  code = addSourceTask(pMnode, plan, pStream, pEpset, skey, pVerList, (numOfPlanLevel == 1));
1,628✔
702
  if (code != TSDB_CODE_SUCCESS) {
1,628!
UNCOV
703
    return code;
×
704
  }
705

706
  if (numOfPlanLevel == 1) {
1,628✔
707
    bindSourceSink(pStream, pMnode, pStream->tasks, hasExtraSink);
1,423✔
708
    if (pStream->conf.fillHistory) {
1,423✔
709
      bindSourceSink(pStream, pMnode, pStream->pHTasksList, hasExtraSink);
748✔
710
    }
711
    return TDB_CODE_SUCCESS;
1,423✔
712
  }
713

714
  if (numOfPlanLevel == 3) {
205✔
715
    plan = getAggSubPlan(pPlan, 1);  // middle agg plan
176✔
716
    if (plan == NULL) {
176!
UNCOV
717
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
718
      if (terrno != 0) code = terrno;
×
UNCOV
719
      TAOS_RETURN(code);
×
720
    }
721
    do {
5✔
722
      SArray** list = taosArrayGetLast(pStream->tasks);
181✔
723
      float    size = (float)taosArrayGetSize(*list);
181✔
724
      size_t   cnt = (size_t)ceil(size / tsStreamAggCnt);
181✔
725
      if (cnt <= 1) break;
181✔
726

727
      mDebug("doScheduleStream add middle agg, size:%d, cnt:%d", (int)size, (int)cnt);
5!
728
      addNewTaskList(pStream);
5✔
729

730
      for (int j = 0; j < cnt; j++) {
17✔
731
        code = addAggTask(pStream, pMnode, plan, pEpset, false);
12✔
732
        if (code != TSDB_CODE_SUCCESS) {
12!
733
          return code;
×
734
        }
735

736
        bindTwoLevel(pStream->tasks, j * tsStreamAggCnt, (j + 1) * tsStreamAggCnt);
12✔
737
        if (pStream->conf.fillHistory) {
12✔
738
          bindTwoLevel(pStream->pHTasksList, j * tsStreamAggCnt, (j + 1) * tsStreamAggCnt);
10✔
739
        }
740
      }
741
    } while (1);
742
  }
743

744
  plan = getAggSubPlan(pPlan, 0);
205✔
745
  if (plan == NULL) {
205!
UNCOV
746
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
747
    if (terrno != 0) code = terrno;
×
UNCOV
748
    TAOS_RETURN(code);
×
749
  }
750

751
  mDebug("doScheduleStream add final agg");
205✔
752
  SArray** list = taosArrayGetLast(pStream->tasks);
205✔
753
  size_t   size = taosArrayGetSize(*list);
205✔
754
  addNewTaskList(pStream);
205✔
755
  code = addAggTask(pStream, pMnode, plan, pEpset, true);
205✔
756
  if (code != TSDB_CODE_SUCCESS) {
205!
UNCOV
757
    TAOS_RETURN(code);
×
758
  }
759
  bindTwoLevel(pStream->tasks, 0, size);
205✔
760
  if (pStream->conf.fillHistory) {
205✔
761
    bindTwoLevel(pStream->pHTasksList, 0, size);
78✔
762
  }
763

764
  bindAggSink(pStream, pMnode, pStream->tasks);
205✔
765
  if (pStream->conf.fillHistory) {
205✔
766
    bindAggSink(pStream, pMnode, pStream->pHTasksList);
78✔
767
  }
768
  TAOS_RETURN(code);
205✔
769
}
770

771
/*
 * Schedule a stream: parse its serialized physical plan and create/bind its tasks.
 *
 * The plan is parsed from pStream->physicalPlan; if parsing yields no plan the call fails
 * with TSDB_CODE_QRY_INVALID_INPUT. Otherwise the mnode ep set is fetched and the actual
 * work is delegated to doScheduleStream(). The parsed plan is destroyed before returning,
 * whatever the outcome of the scheduling.
 *
 * Returns the code produced by doScheduleStream(), or TSDB_CODE_QRY_INVALID_INPUT.
 */
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream, int64_t skey, SArray* pVgVerList) {
  int32_t code = TSDB_CODE_QRY_INVALID_INPUT;

  SQueryPlan* pQueryPlan = qStringToQueryPlan(pStream->physicalPlan);
  if (NULL == pQueryPlan) {
    TAOS_RETURN(code);
  }

  SEpSet epSet = {0};
  mndGetMnodeEpSet(pMnode, &epSet);

  code = doScheduleStream(pStream, pMnode, pQueryPlan, &epSet, skey, pVgVerList);

  // the plan is only needed while the tasks are being built
  qDestroyQueryPlan(pQueryPlan);

  TAOS_RETURN(code);
}
787

788
/*
 * Initialise a subscription from a topic: collect the vgroups to consume from and the
 * serialized subplan to run on them.
 *
 * Plan source:
 *  - TOPIC_SUB_TYPE__COLUMN: parsed from pTopic->physicalPlan.
 *  - TOPIC_SUB_TYPE__TABLE with a non-NULL ast: rebuilt from pTopic->ast.
 *  - anything else: no plan.
 * When a plan exists it must have exactly one level holding exactly one subplan, otherwise
 * TSDB_CODE_MND_INVALID_TOPIC_QUERY is returned.
 *
 * Every vgroup for which mndVgroupInDb(pVgroup, pTopic->dbUid) holds is counted in
 * pSub->vgNum and appended, as a newly allocated SMqVgEp, to pSub->unassignedVgs.
 * pSub->qmsg receives the serialized subplan, or an empty string when there is no plan.
 *
 * Returns 0 on success, an error code otherwise. The query plan is always destroyed before
 * returning. When the vgroup scan is aborted by an error, the vgroup currently held is
 * released and the fetch iterator is cancelled first.
 */
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
  int32_t     code = 0;
  SSdb*       pSdb = pMnode->pSdb;
  SVgObj*     pVgroup = NULL;
  SQueryPlan* pPlan = NULL;
  SSubplan*   pSubplan = NULL;

  if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) {
    pPlan = qStringToQueryPlan(pTopic->physicalPlan);
    if (pPlan == NULL) {
      return TSDB_CODE_QRY_INVALID_INPUT;
    }
  } else if (pTopic->subType == TOPIC_SUB_TYPE__TABLE && pTopic->ast != NULL) {
    SNode* pAst = NULL;
    code = nodesStringToNode(pTopic->ast, &pAst);
    if (code != 0) {
      mError("topic:%s, failed to create since %s", pTopic->name, terrstr());
      return code;
    }

    SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
    code = qCreateQueryPlan(&cxt, &pPlan, NULL);
    if (code != 0) {
      mError("failed to create topic:%s since %s", pTopic->name, terrstr());
      nodesDestroyNode(pAst);
      return code;
    }
    // the ast is only needed to build the plan
    nodesDestroyNode(pAst);
  }

  if (pPlan) {
    // a topic query must boil down to a single subplan
    int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
    if (levelNum != 1) {
      code = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
      goto END;
    }

    SNodeListNode* pNodeListNode = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
    if (pNodeListNode == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto END;
    }
    int32_t opNum = LIST_LENGTH(pNodeListNode->pNodeList);
    if (opNum != 1) {
      code = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
      goto END;
    }

    pSubplan = (SSubplan*)nodesListGetNode(pNodeListNode->pNodeList, 0);
  }

  void* pIter = NULL;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) {
      break;
    }

    if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    pSub->vgNum++;

    SMqVgEp* pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
    if (pVgEp == NULL) {
      code = terrno;
      // leaving the scan early: drop the vgroup we hold and close the iterator
      sdbRelease(pSdb, pVgroup);
      sdbCancelFetch(pSdb, pIter);
      goto END;
    }
    pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
    pVgEp->vgId = pVgroup->vgId;
    if (taosArrayPush(pSub->unassignedVgs, &pVgEp) == NULL) {
      code = terrno;
      taosMemoryFree(pVgEp);
      // leaving the scan early: drop the vgroup we hold and close the iterator
      sdbRelease(pSdb, pVgroup);
      sdbCancelFetch(pSdb, pIter);
      goto END;
    }
    mInfo("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, pVgEp->vgId);
    sdbRelease(pSdb, pVgroup);
  }

  if (pSubplan) {
    int32_t msgLen;

    if (qSubPlanToString(pSubplan, &pSub->qmsg, &msgLen) < 0) {
      code = TSDB_CODE_QRY_INVALID_INPUT;
      goto END;
    }
  } else {
    pSub->qmsg = taosStrdup("");
    if (pSub->qmsg == NULL) {
      // do not report success with a missing qmsg
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

END:
  qDestroyQueryPlan(pPlan);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc