• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3523

06 Nov 2024 02:29AM UTC coverage: 55.861% (-2.4%) from 58.216%
#3523

push

travis-ci

web-flow
Merge pull request #28551 from taosdata/feat/TS-5215-2

test(blob): testing & fixes for blob

106075 of 245834 branches covered (43.15%)

Branch coverage included in aggregate %.

0 of 15 new or added lines in 2 files covered. (0.0%)

17003 existing lines in 254 files now uncovered.

181910 of 269703 relevant lines covered (67.45%)

1527639.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.35
/source/dnode/mnode/impl/src/mndScheduler.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndScheduler.h"
17
#include "mndDb.h"
18
#include "mndMnode.h"
19
#include "mndSnode.h"
20
#include "mndVgroup.h"
21
#include "parser.h"
22
#include "tcompare.h"
23
#include "tmisce.h"
24
#include "tname.h"
25
#include "tuuid.h"
26

27
#define SINK_NODE_LEVEL (0)
28
extern bool tsDeployOnSnode;
29

30
static bool hasCountWindowNode(SPhysiNode* pNode) {
3,482✔
31
  if (nodeType(pNode) == QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT) {
3,482✔
32
    return true;
50✔
33
  } else {
34
    size_t size = LIST_LENGTH(pNode->pChildren);
3,432✔
35

36
    for (int32_t i = 0; i < size; ++i) {
5,458✔
37
      SPhysiNode* pChild = (SPhysiNode*)nodesListGetNode(pNode->pChildren, i);
2,074✔
38
      if (hasCountWindowNode(pChild)) {
2,074✔
39
        return true;
48✔
40
      }
41
    }
42

43
    return false;
3,384✔
44
  }
45
}
46

47
static bool isCountWindowStreamTask(SSubplan* pPlan) {
1,408✔
48
  return hasCountWindowNode((SPhysiNode*)pPlan->pNode);
1,408✔
49
}
50

51
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
8✔
52
                           int64_t watermark, int64_t deleteMark) {
53
  int32_t     code = 0;
8✔
54
  SNode*      pAst = NULL;
8✔
55
  SQueryPlan* pPlan = NULL;
8✔
56

57
  if (nodesStringToNode(ast, &pAst) < 0) {
8!
58
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
59
    goto END;
×
60
  }
61

62
  if (qSetSTableIdForRsma(pAst, uid) < 0) {
8!
63
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
64
    goto END;
×
65
  }
66

67
  SPlanContext cxt = {
8✔
68
      .pAstRoot = pAst,
69
      .topicQuery = false,
70
      .streamQuery = true,
71
      .rSmaQuery = true,
72
      .triggerType = triggerType,
73
      .watermark = watermark,
74
      .deleteMark = deleteMark,
75
  };
76

77
  if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
8!
78
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
79
    goto END;
×
80
  }
81

82
  int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
8!
83
  if (levelNum != 1) {
8!
84
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
85
    goto END;
×
86
  }
87
  SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
8✔
88

89
  int32_t opNum = LIST_LENGTH(inner->pNodeList);
8!
90
  if (opNum != 1) {
8!
91
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
92
    goto END;
×
93
  }
94

95
  SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
8✔
96
  if (qSubPlanToString(plan, pDst, pDstLen) < 0) {
8!
97
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
98
    goto END;
×
99
  }
100

101
END:
8✔
102
  if (pAst) nodesDestroyNode(pAst);
8!
103
  if (pPlan) nodesDestroyNode((SNode*)pPlan);
8!
104
  TAOS_RETURN(code);
8✔
105
}
106

107
int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) {
2,623✔
108
  STaskOutputInfo* pInfo = &pTask->outputInfo;
2,623✔
109

110
  mDebug("mndSetSinkTaskInfo to sma or table, taskId:%s", pTask->id.idStr);
2,623✔
111

112
  if (pStream->smaId != 0 && pStream->subTableWithoutMd5 != 1) {
2,623!
113
    pInfo->type = TASK_OUTPUT__SMA;
44✔
114
    pInfo->smaSink.smaId = pStream->smaId;
44✔
115
  } else {
116
    pInfo->type = TASK_OUTPUT__TABLE;
2,579✔
117
    pInfo->tbSink.stbUid = pStream->targetStbUid;
2,579✔
118
    (void)memcpy(pInfo->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
2,579✔
119
    pInfo->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
2,579!
120
    if (pInfo->tbSink.pSchemaWrapper == NULL) {
2,579!
121
      return TSDB_CODE_OUT_OF_MEMORY;
×
122
    }
123
  }
124

125
  return 0;
2,623✔
126
}
127

128
int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SArray* pSinkNodeList,
2,309✔
129
                                        SStreamTask* pTask) {
130
  int32_t code = 0;
2,309✔
131
  bool isShuffle = false;
2,309✔
132

133
  if (pStream->fixedSinkVgId == 0) {
2,309✔
134
    SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
2,265✔
135
    if (pDb != NULL && pDb->cfg.numOfVgroups > 1) {
2,265!
136
      isShuffle = true;
2,215✔
137
      pTask->outputInfo.type = TASK_OUTPUT__SHUFFLE_DISPATCH;
2,215✔
138
      pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
2,215✔
139
      TAOS_CHECK_RETURN(mndExtractDbInfo(pMnode, pDb, &pTask->outputInfo.shuffleDispatcher.dbInfo, NULL));
2,215!
140
    }
141

142
    sdbRelease(pMnode->pSdb, pDb);
2,265✔
143
  }
144

145
  int32_t numOfSinkNodes = taosArrayGetSize(pSinkNodeList);
2,309✔
146

147
  if (isShuffle) {
2,309✔
148
    (void)memcpy(pTask->outputInfo.shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
2,215✔
149
    SArray* pVgs = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
2,215✔
150

151
    int32_t numOfVgroups = taosArrayGetSize(pVgs);
2,215✔
152
    for (int32_t i = 0; i < numOfVgroups; i++) {
9,142✔
153
      SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
6,927✔
154

155
      for (int32_t j = 0; j < numOfSinkNodes; j++) {
14,962!
156
        SStreamTask* pSinkTask = taosArrayGetP(pSinkNodeList, j);
14,962✔
157
        if (pSinkTask->info.nodeId == pVgInfo->vgId) {
14,962✔
158
          pVgInfo->taskId = pSinkTask->id.taskId;
6,927✔
159
          break;
6,927✔
160
        }
161
      }
162
    }
163
  } else {
164
    SStreamTask* pOneSinkTask = taosArrayGetP(pSinkNodeList, 0);
94✔
165
    streamTaskSetFixedDownstreamInfo(pTask, pOneSinkTask);
94✔
166
  }
167

168
  TAOS_RETURN(code);
2,309✔
169
}
170

171
int32_t mndAssignStreamTaskToVgroup(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
2,825✔
172
  int32_t msgLen;
173

174
  pTask->info.nodeId = pVgroup->vgId;
2,825✔
175
  pTask->info.epSet = mndGetVgroupEpset(pMnode, pVgroup);
2,825✔
176

177
  plan->execNode.nodeId = pTask->info.nodeId;
2,825✔
178
  plan->execNode.epSet = pTask->info.epSet;
2,825✔
179
  return qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen);
2,825✔
180
}
181

182
SSnodeObj* mndSchedFetchOneSnode(SMnode* pMnode) {
100✔
183
  SSnodeObj* pObj = NULL;
100✔
184
  void*      pIter = NULL;
100✔
185
  // TODO random fetch
186
  pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void**)&pObj);
100✔
187
  sdbCancelFetch(pMnode->pSdb, pIter);
100✔
188
  return pObj;
100✔
189
}
190

191
int32_t mndAssignStreamTaskToSnode(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SSnodeObj* pSnode) {
40✔
192
  int32_t msgLen;
193

194
  pTask->info.nodeId = SNODE_HANDLE;
40✔
195
  pTask->info.epSet = mndAcquireEpFromSnode(pMnode, pSnode);
40✔
196

197
  plan->execNode.nodeId = SNODE_HANDLE;
40✔
198
  plan->execNode.epSet = pTask->info.epSet;
40✔
199
  mDebug("s-task:0x%x set the agg task to snode:%d", pTask->id.taskId, SNODE_HANDLE);
40!
200

201
  return qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen);
40✔
202
}
203

204
// random choose a node to do compute
205
SVgObj* mndSchedFetchOneVg(SMnode* pMnode, SStreamObj* pStream) {
70✔
206
  SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->sourceDb);
70✔
207
  if (pDbObj == NULL) {
70!
208
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
209
    return NULL;
×
210
  }
211

212
  if (pStream->indexForMultiAggBalance == -1) {
70!
213
    taosSeedRand(taosSafeRand());
70✔
214
    pStream->indexForMultiAggBalance = taosRand() % pDbObj->cfg.numOfVgroups;
70✔
215
  }
216

217
  int32_t index = 0;
70✔
218
  void*   pIter = NULL;
70✔
219
  SVgObj* pVgroup = NULL;
70✔
220
  while (1) {
221
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
361✔
222
    if (pIter == NULL) break;
361!
223
    if (pVgroup->dbUid != pStream->sourceDbUid) {
361✔
224
      sdbRelease(pMnode->pSdb, pVgroup);
208✔
225
      continue;
208✔
226
    }
227
    if (index++ == pStream->indexForMultiAggBalance) {
153✔
228
      pStream->indexForMultiAggBalance++;
70✔
229
      pStream->indexForMultiAggBalance %= pDbObj->cfg.numOfVgroups;
70✔
230
      sdbCancelFetch(pMnode->pSdb, pIter);
70✔
231
      break;
70✔
232
    }
233
    sdbRelease(pMnode->pSdb, pVgroup);
83✔
234
  }
235
  sdbRelease(pMnode->pSdb, pDbObj);
70✔
236

237
  return pVgroup;
70✔
238
}
239

240
static int32_t doAddSinkTask(SStreamObj* pStream, SMnode* pMnode, SVgObj* pVgroup, SEpSet* pEpset, bool isFillhistory) {
2,476✔
241
  int64_t  uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
2,476✔
242
  SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
2,476✔
243

244
  SStreamTask* pTask = NULL;
2,476✔
245
  int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, pEpset, isFillhistory, 0, *pTaskList, pStream->conf.fillHistory,
2,476✔
246
                                pStream->subTableWithoutMd5, &pTask);
2,476✔
247
  if (code != 0) {
2,476!
248
    return code;
×
249
  }
250

251
  mDebug("doAddSinkTask taskId:%s, %p vgId:%d, isFillHistory:%d", pTask->id.idStr, pTask, pVgroup->vgId, isFillhistory);
2,476✔
252

253
  pTask->info.nodeId = pVgroup->vgId;
2,476✔
254
  pTask->info.epSet = mndGetVgroupEpset(pMnode, pVgroup);
2,476✔
255
  return mndSetSinkTaskInfo(pStream, pTask);
2,476✔
256
}
257

258
static int32_t doAddSinkTaskToVg(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset, SVgObj* vgObj) {
1,799✔
259
  int32_t code = doAddSinkTask(pStream, pMnode, vgObj, pEpset, false);
1,799✔
260
  if (code != 0) {
1,799!
261
    return code;
×
262
  }
263
  if (pStream->conf.fillHistory) {
1,799✔
264
    code = doAddSinkTask(pStream, pMnode, vgObj, pEpset, true);
677✔
265
    if (code != 0) {
677!
266
      return code;
×
267
    }
268
  }
269
  return TDB_CODE_SUCCESS;
1,799✔
270
}
271

272
// create sink node for each vgroup.
273
static int32_t doAddShuffleSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset) {
589✔
274
  SSdb* pSdb = pMnode->pSdb;
589✔
275
  void* pIter = NULL;
589✔
276

277
  while (1) {
2,427✔
278
    SVgObj* pVgroup = NULL;
3,016✔
279
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
3,016✔
280
    if (pIter == NULL) {
3,016✔
281
      break;
589✔
282
    }
283

284
    if (!mndVgroupInDb(pVgroup, pStream->targetDbUid)) {
2,427✔
285
      sdbRelease(pSdb, pVgroup);
650✔
286
      continue;
650✔
287
    }
288

289
    int32_t code = doAddSinkTaskToVg(pMnode, pStream, pEpset, pVgroup);
1,777✔
290
    if (code != 0) {
1,777!
291
      sdbRelease(pSdb, pVgroup);
×
292
      return code;
×
293
    }
294

295
    sdbRelease(pSdb, pVgroup);
1,777✔
296
  }
297

298
  return TDB_CODE_SUCCESS;
589✔
299
}
300

301
static int64_t getVgroupLastVer(const SArray* pList, int32_t vgId) {
2,736✔
302
  int32_t size = (int32_t) taosArrayGetSize(pList);
2,736✔
303
  for (int32_t i = 0; i < size; ++i) {
3,618✔
304
    SVgroupVer* pVer = taosArrayGet(pList, i);
1,328✔
305
    if (pVer->vgId == vgId) {
1,328✔
306
      return pVer->ver;
446✔
307
    }
308
  }
309

310
  mDebug("no data in vgId:%d for extract last version, set to be 0, total existed vgs:%d", vgId, size);
2,290✔
311
  return 1;
2,290✔
312
}
313

314
static void streamTaskSetDataRange(SStreamTask* pTask, int64_t skey, SArray* pVerList, int32_t vgId) {
2,736✔
315
  int64_t latestVer = getVgroupLastVer(pVerList, vgId);
2,736✔
316
  if (latestVer < 0) {
2,736!
317
    latestVer = 0;
×
318
  }
319

320
  // set the correct ts, which is the last key of queried table.
321
  SDataRange*  pRange = &pTask->dataRange;
2,736✔
322
  STimeWindow* pWindow = &pRange->window;
2,736✔
323

324
  if (pTask->info.fillHistory) {
2,736✔
325
    pWindow->skey = INT64_MIN;
704✔
326
    pWindow->ekey = skey - 1;
704✔
327

328
    pRange->range.minVer = 0;
704✔
329
    pRange->range.maxVer = latestVer;
704✔
330
    mDebug("add fill-history source task 0x%x timeWindow:%" PRId64 "-%" PRId64 " verRange:%" PRId64 "-%" PRId64,
704✔
331
           pTask->id.taskId, pWindow->skey, pWindow->ekey, pRange->range.minVer, pRange->range.maxVer);
332
  } else {
333
    pWindow->skey = skey;
2,032✔
334
    pWindow->ekey = INT64_MAX;
2,032✔
335

336
    pRange->range.minVer = latestVer + 1;
2,032✔
337
    pRange->range.maxVer = INT64_MAX;
2,032✔
338

339
    mDebug("add source task 0x%x timeWindow:%" PRId64 "-%" PRId64 " verRange:%" PRId64 "-%" PRId64, pTask->id.taskId,
2,032✔
340
           pWindow->skey, pWindow->ekey, pRange->range.minVer, pRange->range.maxVer);
341
  }
342
}
2,736✔
343

344
static void haltInitialTaskStatus(SStreamTask* pTask, SSubplan* pPlan, bool isFillhistoryTask) {
1,408✔
345
  bool hasCountWindowNode = isCountWindowStreamTask(pPlan);
1,408✔
346

347
  if (hasCountWindowNode && (!isFillhistoryTask)) {
1,408✔
348
    SStreamStatus* pStatus = &pTask->status;
25✔
349
    mDebug("s-task:0x%x status set %s from %s for count window agg task with fill-history option set",
25✔
350
           pTask->id.taskId, streamTaskGetStatusStr(TASK_STATUS__HALT), streamTaskGetStatusStr(pStatus->taskStatus));
351
    pStatus->taskStatus = TASK_STATUS__HALT;
25✔
352
  }
353
}
1,408✔
354

355
static int32_t buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillhistory, bool useTriggerParam, SStreamTask** pTask) {
2,736✔
356
  uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
2,736✔
357
  SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
2,736✔
358

359
  int32_t code = tNewStreamTask(uid, TASK_LEVEL__SOURCE, pEpset, isFillhistory, useTriggerParam ? pStream->conf.triggerParam : 0,
2,736✔
360
                     *pTaskList, pStream->conf.fillHistory, pStream->subTableWithoutMd5, pTask);
2,736✔
361
  return code;
2,736✔
362
}
363

364
static void addNewTaskList(SStreamObj* pStream) {
1,464✔
365
  SArray* pTaskList = taosArrayInit(0, POINTER_BYTES);
1,464✔
366
  if (taosArrayPush(pStream->tasks, &pTaskList) == NULL) {
2,928!
UNCOV
367
    mError("failed to put into array");
×
368
  }
369

370
  if (pStream->conf.fillHistory) {
1,464✔
371
    pTaskList = taosArrayInit(0, POINTER_BYTES);
518✔
372
    if (taosArrayPush(pStream->pHTasksList, &pTaskList) == NULL) {
1,036!
UNCOV
373
      mError("failed to put into array");
×
374
    }
375
  }
376
}
1,464✔
377

378
// set the history task id
379
static void setHTasksId(SStreamObj* pStream) {
518✔
380
  SArray* pTaskList = *(SArray**)taosArrayGetLast(pStream->tasks);
518✔
381
  SArray* pHTaskList = *(SArray**)taosArrayGetLast(pStream->pHTasksList);
518✔
382

383
  for (int32_t i = 0; i < taosArrayGetSize(pTaskList); ++i) {
1,928✔
384
    SStreamTask** pStreamTask = taosArrayGet(pTaskList, i);
1,410✔
385
    SStreamTask** pHTask = taosArrayGet(pHTaskList, i);
1,410✔
386

387
    (*pStreamTask)->hTaskInfo.id.taskId = (*pHTask)->id.taskId;
1,410✔
388
    (*pStreamTask)->hTaskInfo.id.streamId = (*pHTask)->id.streamId;
1,410✔
389

390
    (*pHTask)->streamTaskId.taskId = (*pStreamTask)->id.taskId;
1,410✔
391
    (*pHTask)->streamTaskId.streamId = (*pStreamTask)->id.streamId;
1,410✔
392

393
    mDebug("s-task:0x%" PRIx64 "-0x%x related history task:0x%" PRIx64 "-0x%x, level:%d", (*pStreamTask)->id.streamId,
1,410✔
394
           (*pStreamTask)->id.taskId, (*pHTask)->id.streamId, (*pHTask)->id.taskId, (*pHTask)->info.taskLevel);
395
  }
396
}
518✔
397

398
static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, int64_t skey,
2,736✔
399
                               SArray* pVerList, SVgObj* pVgroup, bool isFillhistory, bool useTriggerParam) {
400
  SStreamTask* pTask = NULL;
2,736✔
401
  int32_t code = buildSourceTask(pStream, pEpset, isFillhistory, useTriggerParam, &pTask);
2,736✔
402
  if (code != TSDB_CODE_SUCCESS) {
2,736!
UNCOV
403
    return code;
×
404
  }
405

406
  mDebug("doAddSourceTask taskId:%s, %p vgId:%d, isFillHistory:%d", pTask->id.idStr, pTask, pVgroup->vgId,
2,736✔
407
         isFillhistory);
408

409
  if (pStream->conf.fillHistory) {
2,736✔
410
    haltInitialTaskStatus(pTask, plan, isFillhistory);
1,408✔
411
  }
412

413
  streamTaskSetDataRange(pTask, skey, pVerList, pVgroup->vgId);
2,736✔
414

415
  code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup);
2,736✔
416
  if (code != TSDB_CODE_SUCCESS) {
2,736!
UNCOV
417
    return code;
×
418
  }
419

420
  return TDB_CODE_SUCCESS;
2,736✔
421
}
422

423
static SSubplan* getScanSubPlan(const SQueryPlan* pPlan) {
753✔
424
  int32_t        numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
753!
425
  SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, numOfPlanLevel - 1);
753✔
426
  if (LIST_LENGTH(inner->pNodeList) != 1) {
753!
UNCOV
427
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
428
    return NULL;
×
429
  }
430

431
  SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
753✔
432
  if (plan->subplanType != SUBPLAN_TYPE_SCAN) {
753!
UNCOV
433
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
434
    return NULL;
×
435
  }
436
  return plan;
753✔
437
}
438

439
static SSubplan* getAggSubPlan(const SQueryPlan* pPlan, int index) {
174✔
440
  SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, index);
174✔
441
  if (LIST_LENGTH(inner->pNodeList) != 1) {
174!
UNCOV
442
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
443
    return NULL;
×
444
  }
445

446
  SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
174✔
447
  if (plan->subplanType != SUBPLAN_TYPE_MERGE) {
174!
UNCOV
448
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
449
    return NULL;
×
450
  }
451
  return plan;
174✔
452
}
453

454
static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset,
753✔
455
                             int64_t nextWindowSkey, SArray* pVerList, bool useTriggerParam) {
456
  addNewTaskList(pStream);
753✔
457

458
  void* pIter = NULL;
753✔
459
  SSdb* pSdb = pMnode->pSdb;
753✔
460
  while (1) {
2,810✔
461
    SVgObj* pVgroup;
462
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
3,563✔
463
    if (pIter == NULL) {
3,563✔
464
      break;
753✔
465
    }
466

467
    if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) {
2,810✔
468
      sdbRelease(pSdb, pVgroup);
778✔
469
      continue;
778✔
470
    }
471

472
    int code =
473
        doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, false, useTriggerParam);
2,032✔
474
    if (code != 0) {
2,032!
UNCOV
475
      mError("create stream task, code:%s", tstrerror(code));
×
476

477
      // todo drop the added source tasks.
UNCOV
478
      sdbRelease(pSdb, pVgroup);
×
UNCOV
479
      return code;
×
480
    }
481

482
    if (pStream->conf.fillHistory) {
2,032✔
483
      code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, true, useTriggerParam);
704✔
484
      if (code != 0) {
704!
UNCOV
485
        sdbRelease(pSdb, pVgroup);
×
UNCOV
486
        return code;
×
487
      }
488
    }
489

490
    sdbRelease(pSdb, pVgroup);
2,032✔
491
  }
492

493
  if (pStream->conf.fillHistory) {
753✔
494
    setHTasksId(pStream);
247✔
495
  }
496

497
  return TSDB_CODE_SUCCESS;
753✔
498
}
499

500
static int32_t buildAggTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillhistory, bool useTriggerParam,
129✔
501
                            SStreamTask** pAggTask) {
502
  *pAggTask = NULL;
129✔
503

504
  uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
129✔
505
  SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
129✔
506

507
  int32_t code =
508
      tNewStreamTask(uid, TASK_LEVEL__AGG, pEpset, isFillhistory, useTriggerParam ? pStream->conf.triggerParam : 0,
129✔
509
                     *pTaskList, pStream->conf.fillHistory, pStream->subTableWithoutMd5, pAggTask);
129!
510
  return code;
129✔
511
}
512

513
static int32_t doAddAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset, SVgObj* pVgroup,
129✔
514
                            SSnodeObj* pSnode, bool isFillhistory, bool useTriggerParam) {
515
  int32_t      code = 0;
129✔
516
  SStreamTask* pTask = NULL;
129✔
517
  const char*  id = NULL;
129✔
518

519
  code = buildAggTask(pStream, pEpset, isFillhistory, useTriggerParam, &pTask);
129✔
520
  if (code != TSDB_CODE_SUCCESS) {
129!
UNCOV
521
    return code;
×
522
  }
523

524
  id = pTask->id.idStr;
129✔
525
  if (pSnode != NULL) {
129✔
526
    code = mndAssignStreamTaskToSnode(pMnode, pTask, plan, pSnode);
40✔
527
    mDebug("doAddAggTask taskId:%s, %p snode id:%d, isFillHistory:%d", id, pTask, pSnode->id, isFillhistory);
40!
528
  } else {
529
    code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup);
89✔
530
    mDebug("doAddAggTask taskId:%s, %p vgId:%d, isFillHistory:%d", id, pTask, pVgroup->vgId, isFillhistory);
89!
531
  }
532
  return code;
129✔
533
}
534

535
static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset, bool useTriggerParam) {
100✔
536
  SVgObj*    pVgroup = NULL;
100✔
537
  SSnodeObj* pSnode = NULL;
100✔
538
  int32_t    code = 0;
100✔
539
  if (tsDeployOnSnode) {
100!
540
    pSnode = mndSchedFetchOneSnode(pMnode);
100✔
541
    if (pSnode == NULL) {
100✔
542
      pVgroup = mndSchedFetchOneVg(pMnode, pStream);
70✔
543
    }
544
  } else {
UNCOV
545
    pVgroup = mndSchedFetchOneVg(pMnode, pStream);
×
546
  }
547

548
  code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, false, useTriggerParam);
100✔
549
  if (code != 0) {
100!
UNCOV
550
    goto END;
×
551
  }
552

553
  if (pStream->conf.fillHistory) {
100✔
554
    code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, true, useTriggerParam);
29✔
555
    if (code != 0) {
29!
UNCOV
556
      goto END;
×
557
    }
558

559
    setHTasksId(pStream);
29✔
560
  }
561

562
END:
71✔
563
  if (pSnode != NULL) {
100✔
564
    sdbRelease(pMnode->pSdb, pSnode);
30✔
565
  } else {
566
    sdbRelease(pMnode->pSdb, pVgroup);
70✔
567
  }
568
  return code;
100✔
569
}
570

571
static int32_t addSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset) {
611✔
572
  int32_t code = 0;
611✔
573
  addNewTaskList(pStream);
611✔
574

575
  if (pStream->fixedSinkVgId == 0) {
611✔
576
    code = doAddShuffleSinkTask(pMnode, pStream, pEpset);
589✔
577
    if (code != 0) {
589!
UNCOV
578
      return code;
×
579
    }
580
  } else {
581
    code = doAddSinkTaskToVg(pMnode, pStream, pEpset, &pStream->fixedSinkVg);
22✔
582
    if (code != 0) {
22!
UNCOV
583
      return code;
×
584
    }
585
  }
586

587
  if (pStream->conf.fillHistory) {
611✔
588
    setHTasksId(pStream);
242✔
589
  }
590
  return TDB_CODE_SUCCESS;
611✔
591
}
592

593
static void bindTaskToSinkTask(SStreamObj* pStream, SMnode* pMnode, SArray* pSinkTaskList, SStreamTask* task) {
2,309✔
594
  int32_t code = 0;
2,309✔
595
  if ((code = mndAddDispatcherForInternalTask(pMnode, pStream, pSinkTaskList, task)) != 0) {
2,309!
UNCOV
596
    mError("failed bind task to sink task since %s", tstrerror(code));
×
597
  }
598
  for (int32_t k = 0; k < taosArrayGetSize(pSinkTaskList); k++) {
9,330✔
599
    SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, k);
7,021✔
600
    if ((code = streamTaskSetUpstreamInfo(pSinkTask, task)) != 0) {
7,021!
UNCOV
601
      mError("failed bind task to sink task since %s", tstrerror(code));
×
602
    }
603
  }
604
  mDebug("bindTaskToSinkTask taskId:%s to sink task list", task->id.idStr);
2,309✔
605
}
2,309✔
606

607
static void bindAggSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks) {
129✔
608
  SArray*  pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL);
129✔
609
  SArray** pAggTaskList = taosArrayGetLast(tasks);
129✔
610

611
  for (int i = 0; i < taosArrayGetSize(*pAggTaskList); i++) {
258✔
612
    SStreamTask* pAggTask = taosArrayGetP(*pAggTaskList, i);
129✔
613
    bindTaskToSinkTask(pStream, pMnode, pSinkTaskList, pAggTask);
129✔
614
    mDebug("bindAggSink taskId:%s to sink task list", pAggTask->id.idStr);
129✔
615
  }
616
}
129✔
617

618
static void bindSourceSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks, bool hasExtraSink) {
871✔
619
  int32_t code = 0;
871✔
620
  SArray* pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL);
871✔
621
  SArray* pSourceTaskList = taosArrayGetP(tasks, hasExtraSink ? SINK_NODE_LEVEL + 1 : SINK_NODE_LEVEL);
871✔
622

623
  for (int i = 0; i < taosArrayGetSize(pSourceTaskList); i++) {
3,198✔
624
    SStreamTask* pSourceTask = taosArrayGetP(pSourceTaskList, i);
2,327✔
625
    mDebug("bindSourceSink taskId:%s to sink task list", pSourceTask->id.idStr);
2,327✔
626

627
    if (hasExtraSink) {
2,327✔
628
      bindTaskToSinkTask(pStream, pMnode, pSinkTaskList, pSourceTask);
2,180✔
629
    } else {
630
      if ((code = mndSetSinkTaskInfo(pStream, pSourceTask)) != 0) {
147!
UNCOV
631
        mError("failed bind task to sink task since %s", tstrerror(code));
×
632
      }
633
    }
634
  }
635
}
871✔
636

637
static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) {
129✔
638
  int32_t code = 0;
129✔
639
  size_t size = taosArrayGetSize(tasks);
129✔
640
  if (size < 2) {
129!
UNCOV
641
    mError("task list size is less than 2");
×
UNCOV
642
    return;
×
643
  }
644
  SArray* pDownTaskList = taosArrayGetP(tasks, size - 1);
129✔
645
  SArray* pUpTaskList = taosArrayGetP(tasks, size - 2);
129✔
646

647
  SStreamTask** pDownTask = taosArrayGetLast(pDownTaskList);
129✔
648
  end = end > taosArrayGetSize(pUpTaskList) ? taosArrayGetSize(pUpTaskList) : end;
129!
649
  for (int i = begin; i < end; i++) {
538✔
650
    SStreamTask* pUpTask = taosArrayGetP(pUpTaskList, i);
409✔
651
    pUpTask->info.selfChildId = i - begin;
409✔
652
    streamTaskSetFixedDownstreamInfo(pUpTask, *pDownTask);
409✔
653
    if ((code = streamTaskSetUpstreamInfo(*pDownTask, pUpTask)) != 0) {
409!
UNCOV
654
      mError("failed bind task to sink task since %s", tstrerror(code));
×
655
    }
656
  }
657
  mDebug("bindTwoLevel task list(%d-%d) to taskId:%s", begin, end - 1, (*(pDownTask))->id.idStr);
129✔
658
}
659

660
static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, SEpSet* pEpset, int64_t skey,
753✔
661
                                SArray* pVerList) {
662
  int32_t code = 0;
753✔
663
  SSdb*   pSdb = pMnode->pSdb;
753✔
664
  int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
753!
665
  bool    hasExtraSink = false;
753✔
666
  bool    externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
753✔
667
  SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb);
753✔
668
  if (pDbObj == NULL) {
753!
UNCOV
669
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
UNCOV
670
    TAOS_RETURN(code);
×
671
  }
672

673
  bool multiTarget = (pDbObj->cfg.numOfVgroups > 1);
753✔
674
  sdbRelease(pSdb, pDbObj);
753✔
675

676
  mDebug("doScheduleStream numOfPlanLevel:%d, exDb:%d, multiTarget:%d, fix vgId:%d, physicalPlan:%s", numOfPlanLevel,
753✔
677
         externalTargetDB, multiTarget, pStream->fixedSinkVgId, pStream->physicalPlan);
678
  pStream->tasks = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES);
753✔
679
  pStream->pHTasksList = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES);
753✔
680

681
  if (numOfPlanLevel > 1 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) {
753✔
682
    // add extra sink
683
    hasExtraSink = true;
611✔
684
    code = addSinkTask(pMnode, pStream, pEpset);
611✔
685
    if (code != TSDB_CODE_SUCCESS) {
611!
UNCOV
686
      return code;
×
687
    }
688
  }
689

690
  pStream->totalLevel = numOfPlanLevel + hasExtraSink;
753✔
691

692
  SSubplan* plan = getScanSubPlan(pPlan);  // source plan
753✔
693
  if (plan == NULL) {
753!
UNCOV
694
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
695
    if (terrno != 0) code = terrno;
×
696
    TAOS_RETURN(code);
×
697
  }
698
  code = addSourceTask(pMnode, plan, pStream, pEpset, skey, pVerList, numOfPlanLevel == 1);
753✔
699
  if (code != TSDB_CODE_SUCCESS) {
753!
UNCOV
700
    return code;
×
701
  }
702

703
  if (numOfPlanLevel == 1) {
753✔
704
    bindSourceSink(pStream, pMnode, pStream->tasks, hasExtraSink);
653✔
705
    if (pStream->conf.fillHistory) {
653✔
706
      bindSourceSink(pStream, pMnode, pStream->pHTasksList, hasExtraSink);
218✔
707
    }
708
    return TDB_CODE_SUCCESS;
653✔
709
  }
710

711
  if (numOfPlanLevel == 3) {
100✔
712
    plan = getAggSubPlan(pPlan, 1);  // middle agg plan
74✔
713
    if (plan == NULL) {
74!
UNCOV
714
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
715
      if (terrno != 0) code = terrno;
×
UNCOV
716
      TAOS_RETURN(code);
×
717
    }
718
    do {
×
719
      SArray** list = taosArrayGetLast(pStream->tasks);
74✔
720
      float    size = (float)taosArrayGetSize(*list);
74✔
721
      size_t   cnt = (size_t)ceil(size / tsStreamAggCnt);
74✔
722
      if (cnt <= 1) break;
74!
723

UNCOV
724
      mDebug("doScheduleStream add middle agg, size:%d, cnt:%d", (int)size, (int)cnt);
×
UNCOV
725
      addNewTaskList(pStream);
×
726

UNCOV
727
      for (int j = 0; j < cnt; j++) {
×
UNCOV
728
        code = addAggTask(pStream, pMnode, plan, pEpset, false);
×
UNCOV
729
        if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
730
          return code;
×
731
        }
732

733
        bindTwoLevel(pStream->tasks, j * tsStreamAggCnt, (j + 1) * tsStreamAggCnt);
×
UNCOV
734
        if (pStream->conf.fillHistory) {
×
UNCOV
735
          bindTwoLevel(pStream->pHTasksList, j * tsStreamAggCnt, (j + 1) * tsStreamAggCnt);
×
736
        }
737
      }
738
    } while (1);
739
  }
740

741
  plan = getAggSubPlan(pPlan, 0);
100✔
742
  if (plan == NULL) {
100!
UNCOV
743
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
744
    if (terrno != 0) code = terrno;
×
UNCOV
745
    TAOS_RETURN(code);
×
746
  }
747

748
  mDebug("doScheduleStream add final agg");
100✔
749
  SArray** list = taosArrayGetLast(pStream->tasks);
100✔
750
  size_t   size = taosArrayGetSize(*list);
100✔
751
  addNewTaskList(pStream);
100✔
752
  code = addAggTask(pStream, pMnode, plan, pEpset, true);
100✔
753
  if (code != TSDB_CODE_SUCCESS) {
100!
UNCOV
754
    TAOS_RETURN(code);
×
755
  }
756
  bindTwoLevel(pStream->tasks, 0, size);
100✔
757
  if (pStream->conf.fillHistory) {
100✔
758
    bindTwoLevel(pStream->pHTasksList, 0, size);
29✔
759
  }
760

761
  bindAggSink(pStream, pMnode, pStream->tasks);
100✔
762
  if (pStream->conf.fillHistory) {
100✔
763
    bindAggSink(pStream, pMnode, pStream->pHTasksList);
29✔
764
  }
765
  TAOS_RETURN(code);
100✔
766
}
767

768
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream, int64_t skey, SArray* pVgVerList) {
753✔
769
  int32_t     code = 0;
753✔
770
  SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
753✔
771
  if (pPlan == NULL) {
753!
UNCOV
772
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
UNCOV
773
    TAOS_RETURN(code);
×
774
  }
775

776
  SEpSet mnodeEpset = {0};
753✔
777
  mndGetMnodeEpSet(pMnode, &mnodeEpset);
753✔
778

779
  code = doScheduleStream(pStream, pMnode, pPlan, &mnodeEpset, skey, pVgVerList);
753✔
780
  qDestroyQueryPlan(pPlan);
753✔
781

782
  TAOS_RETURN(code);
753✔
783
}
784

785
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
66✔
786
  int32_t     code = 0;
66✔
787
  SSdb*       pSdb = pMnode->pSdb;
66✔
788
  SVgObj*     pVgroup = NULL;
66✔
789
  SQueryPlan* pPlan = NULL;
66✔
790
  SSubplan*   pSubplan = NULL;
66✔
791

792
  if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) {
66✔
793
    pPlan = qStringToQueryPlan(pTopic->physicalPlan);
58✔
794
    if (pPlan == NULL) {
58!
UNCOV
795
      return TSDB_CODE_QRY_INVALID_INPUT;
×
796
    }
797
  } else if (pTopic->subType == TOPIC_SUB_TYPE__TABLE && pTopic->ast != NULL) {
8!
798
    SNode* pAst = NULL;
×
UNCOV
799
    code = nodesStringToNode(pTopic->ast, &pAst);
×
UNCOV
800
    if (code != 0) {
×
801
      mError("topic:%s, failed to create since %s", pTopic->name, terrstr());
×
802
      return code;
×
803
    }
804

805
    SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
×
UNCOV
806
    code = qCreateQueryPlan(&cxt, &pPlan, NULL);
×
UNCOV
807
    if (code != 0) {
×
808
      mError("failed to create topic:%s since %s", pTopic->name, terrstr());
×
809
      nodesDestroyNode(pAst);
×
810
      return code;
×
811
    }
812
    nodesDestroyNode(pAst);
×
813
  }
814

815
  if (pPlan) {
66✔
816
    int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
58!
817
    if (levelNum != 1) {
58!
UNCOV
818
      code = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
×
UNCOV
819
      goto END;
×
820
    }
821

822
    SNodeListNode* pNodeListNode = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
58✔
823
    if (pNodeListNode == NULL){
58!
UNCOV
824
      code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
825
      goto END;
×
826
    }
827
    int32_t opNum = LIST_LENGTH(pNodeListNode->pNodeList);
58!
828
    if (opNum != 1) {
58!
UNCOV
829
      code = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
×
UNCOV
830
      goto END;
×
831
    }
832

833
    pSubplan = (SSubplan*)nodesListGetNode(pNodeListNode->pNodeList, 0);
58✔
834
  }
835

836
  void* pIter = NULL;
66✔
837
  while (1) {
251✔
838
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
317✔
839
    if (pIter == NULL) {
317✔
840
      break;
66✔
841
    }
842

843
    if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) {
251✔
844
      sdbRelease(pSdb, pVgroup);
120✔
845
      continue;
120✔
846
    }
847

848
    pSub->vgNum++;
131✔
849

850
    SMqVgEp* pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
131✔
851
    if (pVgEp == NULL){
131!
UNCOV
852
      code = terrno;
×
UNCOV
853
      goto END;
×
854
    }
855
    pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
131✔
856
    pVgEp->vgId = pVgroup->vgId;
131✔
857
    if (taosArrayPush(pSub->unassignedVgs, &pVgEp) == NULL){
262!
UNCOV
858
      code = terrno;
×
UNCOV
859
      taosMemoryFree(pVgEp);
×
UNCOV
860
      goto END;
×
861
    }
862
    mInfo("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, pVgEp->vgId);
131!
863
    sdbRelease(pSdb, pVgroup);
131✔
864
  }
865

866
  if (pSubplan) {
66✔
867
    int32_t msgLen;
868

869
    if (qSubPlanToString(pSubplan, &pSub->qmsg, &msgLen) < 0) {
58!
UNCOV
870
      code = TSDB_CODE_QRY_INVALID_INPUT;
×
UNCOV
871
      goto END;
×
872
    }
873
  } else {
874
    pSub->qmsg = taosStrdup("");
8✔
875
  }
876

877
END:
66✔
878
  qDestroyQueryPlan(pPlan);
66✔
879
  return code;
66✔
880
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc