• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3548

04 Dec 2024 01:03PM UTC coverage: 59.846% (-0.8%) from 60.691%
#3548

push

travis-ci

web-flow
Merge pull request #29033 from taosdata/fix/calculate-vnode-memory-used

fix/calculate-vnode-memory-used

118484 of 254183 branches covered (46.61%)

Branch coverage included in aggregate %.

199691 of 277471 relevant lines covered (71.97%)

18794141.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.17
/source/dnode/mnode/impl/src/mndScheduler.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndScheduler.h"
17
#include "mndDb.h"
18
#include "mndMnode.h"
19
#include "mndSnode.h"
20
#include "mndVgroup.h"
21
#include "parser.h"
22
#include "tcompare.h"
23
#include "tmisce.h"
24
#include "tname.h"
25
#include "tuuid.h"
26

27
#define SINK_NODE_LEVEL (0)
28
extern bool tsDeployOnSnode;
29

30
static bool hasCountWindowNode(SPhysiNode* pNode) {
10,722✔
31
  if (nodeType(pNode) == QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT) {
10,722✔
32
    return true;
158✔
33
  } else {
34
    size_t size = LIST_LENGTH(pNode->pChildren);
10,564✔
35

36
    for (int32_t i = 0; i < size; ++i) {
16,576✔
37
      SPhysiNode* pChild = (SPhysiNode*)nodesListGetNode(pNode->pChildren, i);
6,156✔
38
      if (hasCountWindowNode(pChild)) {
6,156✔
39
        return true;
144✔
40
      }
41
    }
42

43
    return false;
10,420✔
44
  }
45
}
46

47
static bool isCountWindowStreamTask(SSubplan* pPlan) {
4,566✔
48
  return hasCountWindowNode((SPhysiNode*)pPlan->pNode);
4,566✔
49
}
50

51
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
8✔
52
                           int64_t watermark, int64_t deleteMark) {
53
  int32_t     code = 0;
8✔
54
  SNode*      pAst = NULL;
8✔
55
  SQueryPlan* pPlan = NULL;
8✔
56

57
  if (nodesStringToNode(ast, &pAst) < 0) {
8!
58
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
59
    goto END;
×
60
  }
61

62
  if (qSetSTableIdForRsma(pAst, uid) < 0) {
8!
63
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
64
    goto END;
×
65
  }
66

67
  SPlanContext cxt = {
8✔
68
      .pAstRoot = pAst,
69
      .topicQuery = false,
70
      .streamQuery = true,
71
      .rSmaQuery = true,
72
      .triggerType = triggerType,
73
      .watermark = watermark,
74
      .deleteMark = deleteMark,
75
  };
76

77
  if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
8!
78
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
79
    goto END;
×
80
  }
81

82
  int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
8!
83
  if (levelNum != 1) {
8!
84
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
85
    goto END;
×
86
  }
87
  SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
8✔
88

89
  int32_t opNum = LIST_LENGTH(inner->pNodeList);
8!
90
  if (opNum != 1) {
8!
91
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
92
    goto END;
×
93
  }
94

95
  SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
8✔
96
  if (qSubPlanToString(plan, pDst, pDstLen) < 0) {
8!
97
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
98
    goto END;
×
99
  }
100

101
END:
8✔
102
  if (pAst) nodesDestroyNode(pAst);
8!
103
  if (pPlan) nodesDestroyNode((SNode*)pPlan);
8!
104
  TAOS_RETURN(code);
8✔
105
}
106

107
int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) {
6,401✔
108
  STaskOutputInfo* pInfo = &pTask->outputInfo;
6,401✔
109

110
  mDebug("mndSetSinkTaskInfo to sma or table, taskId:%s", pTask->id.idStr);
6,401✔
111

112
  if (pStream->smaId != 0 && pStream->subTableWithoutMd5 != 1) {
6,401✔
113
    pInfo->type = TASK_OUTPUT__SMA;
58✔
114
    pInfo->smaSink.smaId = pStream->smaId;
58✔
115
  } else {
116
    pInfo->type = TASK_OUTPUT__TABLE;
6,343✔
117
    pInfo->tbSink.stbUid = pStream->targetStbUid;
6,343✔
118
    (void)memcpy(pInfo->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
6,343✔
119
    pInfo->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
6,343!
120
    if (pInfo->tbSink.pSchemaWrapper == NULL) {
6,343!
121
      return TSDB_CODE_OUT_OF_MEMORY;
×
122
    }
123
  }
124

125
  return 0;
6,401✔
126
}
127

128
int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SArray* pSinkNodeList,
5,837✔
129
                                        SStreamTask* pTask) {
130
  int32_t code = 0;
5,837✔
131
  bool isShuffle = false;
5,837✔
132

133
  if (pStream->fixedSinkVgId == 0) {
5,837✔
134
    SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
5,779✔
135
    if (pDb != NULL && pDb->cfg.numOfVgroups > 1) {
5,779!
136
      isShuffle = true;
5,723✔
137
      pTask->outputInfo.type = TASK_OUTPUT__SHUFFLE_DISPATCH;
5,723✔
138
      pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
5,723✔
139
      TAOS_CHECK_RETURN(mndExtractDbInfo(pMnode, pDb, &pTask->outputInfo.shuffleDispatcher.dbInfo, NULL));
5,723!
140
    }
141

142
    sdbRelease(pMnode->pSdb, pDb);
5,779✔
143
  }
144

145
  int32_t numOfSinkNodes = taosArrayGetSize(pSinkNodeList);
5,837✔
146

147
  if (isShuffle) {
5,837✔
148
    (void)memcpy(pTask->outputInfo.shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
5,723✔
149
    SArray* pVgs = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
5,723✔
150

151
    int32_t numOfVgroups = taosArrayGetSize(pVgs);
5,723✔
152
    for (int32_t i = 0; i < numOfVgroups; i++) {
23,440✔
153
      SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
17,717✔
154

155
      for (int32_t j = 0; j < numOfSinkNodes; j++) {
38,461!
156
        SStreamTask* pSinkTask = taosArrayGetP(pSinkNodeList, j);
38,461✔
157
        if (pSinkTask->info.nodeId == pVgInfo->vgId) {
38,461✔
158
          pVgInfo->taskId = pSinkTask->id.taskId;
17,717✔
159
          break;
17,717✔
160
        }
161
      }
162
    }
163
  } else {
164
    SStreamTask* pOneSinkTask = taosArrayGetP(pSinkNodeList, 0);
114✔
165
    streamTaskSetFixedDownstreamInfo(pTask, pOneSinkTask);
114✔
166
  }
167

168
  TAOS_RETURN(code);
5,837✔
169
}
170

171
int32_t mndAssignStreamTaskToVgroup(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
6,753✔
172
  int32_t msgLen;
173

174
  pTask->info.nodeId = pVgroup->vgId;
6,753✔
175
  pTask->info.epSet = mndGetVgroupEpset(pMnode, pVgroup);
6,753✔
176

177
  plan->execNode.nodeId = pTask->info.nodeId;
6,753✔
178
  plan->execNode.epSet = pTask->info.epSet;
6,753✔
179
  return qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen);
6,753✔
180
}
181

182
SSnodeObj* mndSchedFetchOneSnode(SMnode* pMnode) {
211✔
183
  SSnodeObj* pObj = NULL;
211✔
184
  void*      pIter = NULL;
211✔
185
  // TODO random fetch
186
  pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void**)&pObj);
211✔
187
  sdbCancelFetch(pMnode->pSdb, pIter);
211✔
188
  return pObj;
211✔
189
}
190

191
int32_t mndAssignStreamTaskToSnode(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SSnodeObj* pSnode) {
89✔
192
  int32_t msgLen;
193

194
  pTask->info.nodeId = SNODE_HANDLE;
89✔
195
  pTask->info.epSet = mndAcquireEpFromSnode(pMnode, pSnode);
89✔
196

197
  plan->execNode.nodeId = SNODE_HANDLE;
89✔
198
  plan->execNode.epSet = pTask->info.epSet;
89✔
199
  mDebug("s-task:0x%x set the agg task to snode:%d", pTask->id.taskId, SNODE_HANDLE);
89✔
200

201
  return qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen);
89✔
202
}
203

204
// random choose a node to do compute
205
SVgObj* mndSchedFetchOneVg(SMnode* pMnode, SStreamObj* pStream) {
154✔
206
  SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->sourceDb);
154✔
207
  if (pDbObj == NULL) {
154!
208
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
209
    return NULL;
×
210
  }
211

212
  if (pStream->indexForMultiAggBalance == -1) {
154✔
213
    taosSeedRand(taosSafeRand());
142✔
214
    pStream->indexForMultiAggBalance = taosRand() % pDbObj->cfg.numOfVgroups;
142✔
215
  }
216

217
  int32_t index = 0;
154✔
218
  void*   pIter = NULL;
154✔
219
  SVgObj* pVgroup = NULL;
154✔
220
  while (1) {
221
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
580✔
222
    if (pIter == NULL) break;
580!
223
    if (pVgroup->dbUid != pStream->sourceDbUid) {
580✔
224
      sdbRelease(pMnode->pSdb, pVgroup);
247✔
225
      continue;
247✔
226
    }
227
    if (index++ == pStream->indexForMultiAggBalance) {
333✔
228
      pStream->indexForMultiAggBalance++;
154✔
229
      pStream->indexForMultiAggBalance %= pDbObj->cfg.numOfVgroups;
154✔
230
      sdbCancelFetch(pMnode->pSdb, pIter);
154✔
231
      break;
154✔
232
    }
233
    sdbRelease(pMnode->pSdb, pVgroup);
179✔
234
  }
235
  sdbRelease(pMnode->pSdb, pDbObj);
154✔
236

237
  return pVgroup;
154✔
238
}
239

240
static int32_t doAddSinkTask(SStreamObj* pStream, SMnode* pMnode, SVgObj* pVgroup, SEpSet* pEpset, bool isFillhistory) {
6,216✔
241
  int64_t  uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
6,216✔
242
  SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
6,216✔
243

244
  SStreamTask* pTask = NULL;
6,216✔
245
  int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, pEpset, isFillhistory, 0, 0, *pTaskList, pStream->conf.fillHistory,
6,216✔
246
                                pStream->subTableWithoutMd5, &pTask);
6,216✔
247
  if (code != 0) {
6,216!
248
    return code;
×
249
  }
250

251
  mDebug("doAddSinkTask taskId:%s, %p vgId:%d, isFillHistory:%d", pTask->id.idStr, pTask, pVgroup->vgId, isFillhistory);
6,216✔
252

253
  pTask->info.nodeId = pVgroup->vgId;
6,216✔
254
  pTask->info.epSet = mndGetVgroupEpset(pMnode, pVgroup);
6,216✔
255
  return mndSetSinkTaskInfo(pStream, pTask);
6,216✔
256
}
257

258
static int32_t doAddSinkTaskToVg(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset, SVgObj* vgObj) {
3,972✔
259
  int32_t code = doAddSinkTask(pStream, pMnode, vgObj, pEpset, false);
3,972✔
260
  if (code != 0) {
3,972!
261
    return code;
×
262
  }
263
  if (pStream->conf.fillHistory) {
3,972✔
264
    code = doAddSinkTask(pStream, pMnode, vgObj, pEpset, true);
2,244✔
265
    if (code != 0) {
2,244!
266
      return code;
×
267
    }
268
  }
269
  return TDB_CODE_SUCCESS;
3,972✔
270
}
271

272
// create sink node for each vgroup.
273
static int32_t doAddShuffleSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset) {
1,396✔
274
  SSdb* pSdb = pMnode->pSdb;
1,396✔
275
  void* pIter = NULL;
1,396✔
276

277
  while (1) {
4,991✔
278
    SVgObj* pVgroup = NULL;
6,387✔
279
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
6,387✔
280
    if (pIter == NULL) {
6,387✔
281
      break;
1,396✔
282
    }
283

284
    if (!mndVgroupInDb(pVgroup, pStream->targetDbUid)) {
4,991✔
285
      sdbRelease(pSdb, pVgroup);
1,048✔
286
      continue;
1,048✔
287
    }
288

289
    int32_t code = doAddSinkTaskToVg(pMnode, pStream, pEpset, pVgroup);
3,943✔
290
    if (code != 0) {
3,943!
291
      sdbRelease(pSdb, pVgroup);
×
292
      return code;
×
293
    }
294

295
    sdbRelease(pSdb, pVgroup);
3,943✔
296
  }
297

298
  return TDB_CODE_SUCCESS;
1,396✔
299
}
300

301
static int64_t getVgroupLastVer(const SArray* pList, int32_t vgId) {
6,544✔
302
  int32_t size = (int32_t) taosArrayGetSize(pList);
6,544✔
303
  for (int32_t i = 0; i < size; ++i) {
10,538✔
304
    SVgroupVer* pVer = taosArrayGet(pList, i);
6,608✔
305
    if (pVer->vgId == vgId) {
6,608✔
306
      return pVer->ver;
2,614✔
307
    }
308
  }
309

310
  mDebug("no data in vgId:%d for extract last version, set to be 0, total existed vgs:%d", vgId, size);
3,930✔
311
  return 1;
3,930✔
312
}
313

314
static void streamTaskSetDataRange(SStreamTask* pTask, int64_t skey, SArray* pVerList, int32_t vgId) {
6,544✔
315
  int64_t latestVer = getVgroupLastVer(pVerList, vgId);
6,544✔
316
  if (latestVer < 0) {
6,544!
317
    latestVer = 0;
×
318
  }
319

320
  // set the correct ts, which is the last key of queried table.
321
  SDataRange*  pRange = &pTask->dataRange;
6,544✔
322
  STimeWindow* pWindow = &pRange->window;
6,544✔
323

324
  if (pTask->info.fillHistory) {
6,544✔
325
    pWindow->skey = INT64_MIN;
2,283✔
326
    pWindow->ekey = skey - 1;
2,283✔
327

328
    pRange->range.minVer = 0;
2,283✔
329
    pRange->range.maxVer = latestVer;
2,283✔
330
    mDebug("add fill-history source task 0x%x timeWindow:%" PRId64 "-%" PRId64 " verRange:%" PRId64 "-%" PRId64,
2,283✔
331
           pTask->id.taskId, pWindow->skey, pWindow->ekey, pRange->range.minVer, pRange->range.maxVer);
332
  } else {
333
    pWindow->skey = skey;
4,261✔
334
    pWindow->ekey = INT64_MAX;
4,261✔
335

336
    pRange->range.minVer = latestVer + 1;
4,261✔
337
    pRange->range.maxVer = INT64_MAX;
4,261✔
338

339
    mDebug("add source task 0x%x timeWindow:%" PRId64 "-%" PRId64 " verRange:%" PRId64 "-%" PRId64, pTask->id.taskId,
4,261✔
340
           pWindow->skey, pWindow->ekey, pRange->range.minVer, pRange->range.maxVer);
341
  }
342
}
6,544✔
343

344
static void haltInitialTaskStatus(SStreamTask* pTask, SSubplan* pPlan, bool isFillhistoryTask) {
4,566✔
345
  bool hasCountWindowNode = isCountWindowStreamTask(pPlan);
4,566✔
346

347
  if (hasCountWindowNode && (!isFillhistoryTask)) {
4,566✔
348
    SStreamStatus* pStatus = &pTask->status;
79✔
349
    mDebug("s-task:0x%x status set %s from %s for count window agg task with fill-history option set",
79✔
350
           pTask->id.taskId, streamTaskGetStatusStr(TASK_STATUS__HALT), streamTaskGetStatusStr(pStatus->taskStatus));
351
    pStatus->taskStatus = TASK_STATUS__HALT;
79✔
352
  }
353
}
4,566✔
354

355
static int32_t buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillhistory, bool useTriggerParam, SStreamTask** pTask) {
6,544✔
356
  uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
6,544✔
357
  SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
6,544✔
358

359
  int32_t code = tNewStreamTask(uid, TASK_LEVEL__SOURCE, pEpset, isFillhistory, pStream->conf.trigger,
6,544✔
360
                                useTriggerParam ? pStream->conf.triggerParam : 0, *pTaskList, pStream->conf.fillHistory,
6,544✔
361
                                pStream->subTableWithoutMd5, pTask);
6,544✔
362
  return code;
6,544✔
363
}
364

365
static int32_t addNewTaskList(SStreamObj* pStream) {
3,229✔
366
  SArray* pTaskList = taosArrayInit(0, POINTER_BYTES);
3,229✔
367
  if (pTaskList == NULL) {
3,229!
368
    mError("failed init task list, code:%s", tstrerror(terrno));
×
369
    return terrno;
×
370
  }
371

372
  if (taosArrayPush(pStream->tasks, &pTaskList) == NULL) {
6,458!
373
    mError("failed to put into array, code:%s", tstrerror(terrno));
×
374
    return terrno;
×
375
  }
376

377
  if (pStream->conf.fillHistory) {
3,229✔
378
    pTaskList = taosArrayInit(0, POINTER_BYTES);
1,661✔
379
    if (pTaskList == NULL) {
1,661!
380
      mError("failed init task list, code:%s", tstrerror(terrno));
×
381
      return terrno;
×
382
    }
383

384
    if (taosArrayPush(pStream->pHTasksList, &pTaskList) == NULL) {
3,322!
385
      mError("failed to put into array, code:%s", tstrerror(terrno));
×
386
      return terrno;
×
387
    }
388
  }
389

390
  return TSDB_CODE_SUCCESS;
3,229✔
391
}
392

393
// set the history task id
394
static void setHTasksId(SStreamObj* pStream) {
1,667✔
395
  SArray* pTaskList = *(SArray**)taosArrayGetLast(pStream->tasks);
1,667✔
396
  SArray* pHTaskList = *(SArray**)taosArrayGetLast(pStream->pHTasksList);
1,667✔
397

398
  for (int32_t i = 0; i < taosArrayGetSize(pTaskList); ++i) {
6,289✔
399
    SStreamTask** pStreamTask = taosArrayGet(pTaskList, i);
4,622✔
400
    SStreamTask** pHTask = taosArrayGet(pHTaskList, i);
4,622✔
401

402
    (*pStreamTask)->hTaskInfo.id.taskId = (*pHTask)->id.taskId;
4,622✔
403
    (*pStreamTask)->hTaskInfo.id.streamId = (*pHTask)->id.streamId;
4,622✔
404

405
    (*pHTask)->streamTaskId.taskId = (*pStreamTask)->id.taskId;
4,622✔
406
    (*pHTask)->streamTaskId.streamId = (*pStreamTask)->id.streamId;
4,622✔
407

408
    mDebug("s-task:0x%" PRIx64 "-0x%x related history task:0x%" PRIx64 "-0x%x, level:%d", (*pStreamTask)->id.streamId,
4,622✔
409
           (*pStreamTask)->id.taskId, (*pHTask)->id.streamId, (*pHTask)->id.taskId, (*pHTask)->info.taskLevel);
410
  }
411
}
1,667✔
412

413
static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, int64_t skey,
6,544✔
414
                               SArray* pVerList, SVgObj* pVgroup, bool isHistoryTask, bool useTriggerParam) {
415
  SStreamTask* pTask = NULL;
6,544✔
416
  int32_t code = buildSourceTask(pStream, pEpset, isHistoryTask, useTriggerParam, &pTask);
6,544✔
417
  if (code != TSDB_CODE_SUCCESS) {
6,544!
418
    return code;
×
419
  }
420

421
  mDebug("doAddSourceTask taskId:%s, %p vgId:%d, historyTask:%d", pTask->id.idStr, pTask, pVgroup->vgId, isHistoryTask);
6,544✔
422

423
  if (pStream->conf.fillHistory) {
6,544✔
424
    haltInitialTaskStatus(pTask, plan, isHistoryTask);
4,566✔
425
  }
426

427
  streamTaskSetDataRange(pTask, skey, pVerList, pVgroup->vgId);
6,544✔
428

429
  code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup);
6,544✔
430
  if (code != TSDB_CODE_SUCCESS) {
6,544!
431
    return code;
×
432
  }
433

434
  return TDB_CODE_SUCCESS;
6,544✔
435
}
436

437
static SSubplan* getScanSubPlan(const SQueryPlan* pPlan) {
1,600✔
438
  int32_t        numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
1,600!
439
  SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, numOfPlanLevel - 1);
1,600✔
440
  if (LIST_LENGTH(inner->pNodeList) != 1) {
1,600!
441
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
442
    return NULL;
×
443
  }
444

445
  SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
1,600✔
446
  if (plan->subplanType != SUBPLAN_TYPE_SCAN) {
1,600!
447
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
448
    return NULL;
×
449
  }
450
  return plan;
1,600✔
451
}
452

453
static SSubplan* getAggSubPlan(const SQueryPlan* pPlan, int index) {
369✔
454
  SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, index);
369✔
455
  if (LIST_LENGTH(inner->pNodeList) != 1) {
369!
456
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
457
    return NULL;
×
458
  }
459

460
  SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
369✔
461
  if (plan->subplanType != SUBPLAN_TYPE_MERGE) {
369!
462
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
463
    return NULL;
×
464
  }
465
  return plan;
369✔
466
}
467

468
static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset,
1,600✔
469
                             int64_t nextWindowSkey, SArray* pVerList, bool useTriggerParam) {
470
  void*   pIter = NULL;
1,600✔
471
  SSdb*   pSdb = pMnode->pSdb;
1,600✔
472
  int32_t code = addNewTaskList(pStream);
1,600✔
473
  if (code) {
1,600!
474
    return code;
×
475
  }
476

477
  while (1) {
5,481✔
478
    SVgObj* pVgroup = NULL;
7,081✔
479
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
7,081✔
480
    if (pIter == NULL) {
7,081✔
481
      break;
1,600✔
482
    }
483

484
    if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) {
5,481✔
485
      sdbRelease(pSdb, pVgroup);
1,220✔
486
      continue;
1,220✔
487
    }
488

489
    code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, false, useTriggerParam);
4,261✔
490
    if (code != 0) {
4,261!
491
      mError("failed to create stream task, code:%s", tstrerror(code));
×
492

493
      // todo drop the added source tasks.
494
      sdbRelease(pSdb, pVgroup);
×
495
      return code;
×
496
    }
497

498
    if (pStream->conf.fillHistory) {
4,261✔
499
      code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, true, useTriggerParam);
2,283✔
500
      if (code != 0) {
2,283!
501
        sdbRelease(pSdb, pVgroup);
×
502
        return code;
×
503
      }
504
    }
505

506
    sdbRelease(pSdb, pVgroup);
4,261✔
507
  }
508

509
  if (pStream->conf.fillHistory) {
1,600✔
510
    setHTasksId(pStream);
795✔
511
  }
512

513
  return TSDB_CODE_SUCCESS;
1,600✔
514
}
515

516
static int32_t buildAggTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillhistory, bool useTriggerParam,
298✔
517
                            SStreamTask** pAggTask) {
518
  *pAggTask = NULL;
298✔
519

520
  uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
298✔
521
  SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
298✔
522

523
  int32_t code = tNewStreamTask(uid, TASK_LEVEL__AGG, pEpset, isFillhistory, pStream->conf.trigger,
298✔
524
                                useTriggerParam ? pStream->conf.triggerParam : 0, *pTaskList, pStream->conf.fillHistory,
298✔
525
                                pStream->subTableWithoutMd5, pAggTask);
298✔
526
  return code;
298✔
527
}
528

529
static int32_t doAddAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset, SVgObj* pVgroup,
298✔
530
                            SSnodeObj* pSnode, bool isFillhistory, bool useTriggerParam) {
531
  int32_t      code = 0;
298✔
532
  SStreamTask* pTask = NULL;
298✔
533
  const char*  id = NULL;
298✔
534

535
  code = buildAggTask(pStream, pEpset, isFillhistory, useTriggerParam, &pTask);
298✔
536
  if (code != TSDB_CODE_SUCCESS) {
298!
537
    return code;
×
538
  }
539

540
  id = pTask->id.idStr;
298✔
541
  if (pSnode != NULL) {
298✔
542
    code = mndAssignStreamTaskToSnode(pMnode, pTask, plan, pSnode);
89✔
543
    mDebug("doAddAggTask taskId:%s, %p snode id:%d, isFillHistory:%d", id, pTask, pSnode->id, isFillhistory);
89✔
544
  } else {
545
    code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup);
209✔
546
    mDebug("doAddAggTask taskId:%s, %p vgId:%d, isFillHistory:%d", id, pTask, pVgroup->vgId, isFillhistory);
209!
547
  }
548
  return code;
298✔
549
}
550

551
static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset, bool useTriggerParam) {
211✔
552
  SVgObj*    pVgroup = NULL;
211✔
553
  SSnodeObj* pSnode = NULL;
211✔
554
  int32_t    code = 0;
211✔
555
  if (tsDeployOnSnode) {
211!
556
    pSnode = mndSchedFetchOneSnode(pMnode);
211✔
557
    if (pSnode == NULL) {
211✔
558
      pVgroup = mndSchedFetchOneVg(pMnode, pStream);
154✔
559
    }
560
  } else {
561
    pVgroup = mndSchedFetchOneVg(pMnode, pStream);
×
562
  }
563

564
  code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, false, useTriggerParam);
211✔
565
  if (code != 0) {
211!
566
    goto END;
×
567
  }
568

569
  if (pStream->conf.fillHistory) {
211✔
570
    code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, true, useTriggerParam);
87✔
571
    if (code != 0) {
87!
572
      goto END;
×
573
    }
574

575
    setHTasksId(pStream);
87✔
576
  }
577

578
END:
124✔
579
  if (pSnode != NULL) {
211✔
580
    sdbRelease(pMnode->pSdb, pSnode);
57✔
581
  } else {
582
    sdbRelease(pMnode->pSdb, pVgroup);
154✔
583
  }
584
  return code;
211✔
585
}
586

587
static int32_t addSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset) {
1,425✔
588
  int32_t code = addNewTaskList(pStream);
1,425✔
589
  if (code) {
1,425!
590
    return code;
×
591
  }
592

593
  if (pStream->fixedSinkVgId == 0) {
1,425✔
594
    code = doAddShuffleSinkTask(pMnode, pStream, pEpset);
1,396✔
595
    if (code != 0) {
1,396!
596
      return code;
×
597
    }
598
  } else {
599
    code = doAddSinkTaskToVg(pMnode, pStream, pEpset, &pStream->fixedSinkVg);
29✔
600
    if (code != 0) {
29!
601
      return code;
×
602
    }
603
  }
604

605
  if (pStream->conf.fillHistory) {
1,425✔
606
    setHTasksId(pStream);
785✔
607
  }
608
  return TDB_CODE_SUCCESS;
1,425✔
609
}
610

611
static void bindTaskToSinkTask(SStreamObj* pStream, SMnode* pMnode, SArray* pSinkTaskList, SStreamTask* task) {
5,837✔
612
  int32_t code = 0;
5,837✔
613
  if ((code = mndAddDispatcherForInternalTask(pMnode, pStream, pSinkTaskList, task)) != 0) {
5,837!
614
    mError("failed bind task to sink task since %s", tstrerror(code));
×
615
  }
616
  for (int32_t k = 0; k < taosArrayGetSize(pSinkTaskList); k++) {
23,668✔
617
    SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, k);
17,831✔
618
    if ((code = streamTaskSetUpstreamInfo(pSinkTask, task)) != 0) {
17,831!
619
      mError("failed bind task to sink task since %s", tstrerror(code));
×
620
    }
621
  }
622
  mDebug("bindTaskToSinkTask taskId:%s to sink task list", task->id.idStr);
5,837✔
623
}
5,837✔
624

625
static void bindAggSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks) {
276✔
626
  SArray*  pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL);
276✔
627
  SArray** pAggTaskList = taosArrayGetLast(tasks);
276✔
628

629
  for (int i = 0; i < taosArrayGetSize(*pAggTaskList); i++) {
552✔
630
    SStreamTask* pAggTask = taosArrayGetP(*pAggTaskList, i);
276✔
631
    bindTaskToSinkTask(pStream, pMnode, pSinkTaskList, pAggTask);
276✔
632
    mDebug("bindAggSink taskId:%s to sink task list", pAggTask->id.idStr);
276✔
633
  }
634
}
276✔
635

636
static void bindSourceSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks, bool hasExtraSink) {
2,119✔
637
  int32_t code = 0;
2,119✔
638
  SArray* pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL);
2,119✔
639
  SArray* pSourceTaskList = taosArrayGetP(tasks, hasExtraSink ? SINK_NODE_LEVEL + 1 : SINK_NODE_LEVEL);
2,119✔
640

641
  for (int i = 0; i < taosArrayGetSize(pSourceTaskList); i++) {
7,865✔
642
    SStreamTask* pSourceTask = taosArrayGetP(pSourceTaskList, i);
5,746✔
643
    mDebug("bindSourceSink taskId:%s to sink task list", pSourceTask->id.idStr);
5,746✔
644

645
    if (hasExtraSink) {
5,746✔
646
      bindTaskToSinkTask(pStream, pMnode, pSinkTaskList, pSourceTask);
5,561✔
647
    } else {
648
      if ((code = mndSetSinkTaskInfo(pStream, pSourceTask)) != 0) {
185!
649
        mError("failed bind task to sink task since %s", tstrerror(code));
×
650
      }
651
    }
652
  }
653
}
2,119✔
654

655
static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) {
298✔
656
  int32_t code = 0;
298✔
657
  size_t size = taosArrayGetSize(tasks);
298✔
658
  if (size < 2) {
298!
659
    mError("task list size is less than 2");
×
660
    return;
×
661
  }
662
  SArray* pDownTaskList = taosArrayGetP(tasks, size - 1);
298✔
663
  SArray* pUpTaskList = taosArrayGetP(tasks, size - 2);
298✔
664

665
  SStreamTask** pDownTask = taosArrayGetLast(pDownTaskList);
298✔
666
  end = end > taosArrayGetSize(pUpTaskList) ? taosArrayGetSize(pUpTaskList) : end;
298✔
667
  for (int i = begin; i < end; i++) {
1,118✔
668
    SStreamTask* pUpTask = taosArrayGetP(pUpTaskList, i);
820✔
669
    pUpTask->info.selfChildId = i - begin;
820✔
670
    streamTaskSetFixedDownstreamInfo(pUpTask, *pDownTask);
820✔
671
    if ((code = streamTaskSetUpstreamInfo(*pDownTask, pUpTask)) != 0) {
820!
672
      mError("failed bind task to sink task since %s", tstrerror(code));
×
673
    }
674
  }
675
  mDebug("bindTwoLevel task list(%d-%d) to taskId:%s", begin, end - 1, (*(pDownTask))->id.idStr);
298✔
676
}
677

678
static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, SEpSet* pEpset, int64_t skey,
1,600✔
679
                                SArray* pVerList) {
680
  int32_t code = 0;
1,600✔
681
  SSdb*   pSdb = pMnode->pSdb;
1,600✔
682
  int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
1,600!
683
  bool    hasExtraSink = false;
1,600✔
684
  bool    externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
1,600✔
685
  SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb);
1,600✔
686
  if (pDbObj == NULL) {
1,600!
687
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
688
    TAOS_RETURN(code);
×
689
  }
690

691
  bool multiTarget = (pDbObj->cfg.numOfVgroups > 1);
1,600✔
692
  sdbRelease(pSdb, pDbObj);
1,600✔
693

694
  mDebug("doScheduleStream numOfPlanLevel:%d, exDb:%d, multiTarget:%d, fix vgId:%d, physicalPlan:%s", numOfPlanLevel,
1,600✔
695
         externalTargetDB, multiTarget, pStream->fixedSinkVgId, pStream->physicalPlan);
696

697
  pStream->tasks = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES);
1,600✔
698
  pStream->pHTasksList = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES);
1,600✔
699
  if (pStream->tasks == NULL || pStream->pHTasksList == NULL) {
1,600!
700
    mError("failed to create stream obj, code:%s", tstrerror(terrno));
×
701
    return terrno;
×
702
  }
703

704
  if (numOfPlanLevel > 1 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) {
1,600✔
705
    // add extra sink
706
    hasExtraSink = true;
1,425✔
707
    code = addSinkTask(pMnode, pStream, pEpset);
1,425✔
708
    if (code != TSDB_CODE_SUCCESS) {
1,425!
709
      return code;
×
710
    }
711
  }
712

713
  pStream->totalLevel = numOfPlanLevel + hasExtraSink;
1,600✔
714

715
  SSubplan* plan = getScanSubPlan(pPlan);  // source plan
1,600✔
716
  if (plan == NULL) {
1,600!
717
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
718
    if (terrno != 0) code = terrno;
×
719
    TAOS_RETURN(code);
×
720
  }
721

722
  code = addSourceTask(pMnode, plan, pStream, pEpset, skey, pVerList, (numOfPlanLevel == 1));
1,600✔
723
  if (code != TSDB_CODE_SUCCESS) {
1,600!
724
    return code;
×
725
  }
726

727
  if (numOfPlanLevel == 1) {
1,600✔
728
    bindSourceSink(pStream, pMnode, pStream->tasks, hasExtraSink);
1,401✔
729
    if (pStream->conf.fillHistory) {
1,401✔
730
      bindSourceSink(pStream, pMnode, pStream->pHTasksList, hasExtraSink);
718✔
731
    }
732
    return TDB_CODE_SUCCESS;
1,401✔
733
  }
734

735
  if (numOfPlanLevel == 3) {
199✔
736
    plan = getAggSubPlan(pPlan, 1);  // middle agg plan
170✔
737
    if (plan == NULL) {
170!
738
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
739
      if (terrno != 0) code = terrno;
×
740
      TAOS_RETURN(code);
×
741
    }
742

743
    do {
5✔
744
      SArray** list = taosArrayGetLast(pStream->tasks);
175✔
745
      float    size = (float)taosArrayGetSize(*list);
175✔
746
      size_t   cnt = (size_t)ceil(size / tsStreamAggCnt);
175✔
747
      if (cnt <= 1) break;
175✔
748

749
      mDebug("doScheduleStream add middle agg, size:%d, cnt:%d", (int)size, (int)cnt);
5!
750
      code = addNewTaskList(pStream);
5✔
751
      if (code) {
5!
752
        return code;
×
753
      }
754

755
      for (int j = 0; j < cnt; j++) {
17✔
756
        code = addAggTask(pStream, pMnode, plan, pEpset, false);
12✔
757
        if (code != TSDB_CODE_SUCCESS) {
12!
758
          return code;
×
759
        }
760

761
        bindTwoLevel(pStream->tasks, j * tsStreamAggCnt, (j + 1) * tsStreamAggCnt);
12✔
762
        if (pStream->conf.fillHistory) {
12✔
763
          bindTwoLevel(pStream->pHTasksList, j * tsStreamAggCnt, (j + 1) * tsStreamAggCnt);
10✔
764
        }
765
      }
766
    } while (1);
767
  }
768

769
  plan = getAggSubPlan(pPlan, 0);
199✔
770
  if (plan == NULL) {
199!
771
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
772
    if (terrno != 0) code = terrno;
×
773
    TAOS_RETURN(code);
×
774
  }
775

776
  mDebug("doScheduleStream add final agg");
199✔
777
  SArray** list = taosArrayGetLast(pStream->tasks);
199✔
778
  size_t   size = taosArrayGetSize(*list);
199✔
779

780
  code = addNewTaskList(pStream);
199✔
781
  if (code) {
199!
782
    return code;
×
783
  }
784

785
  code = addAggTask(pStream, pMnode, plan, pEpset, true);
199✔
786
  if (code != TSDB_CODE_SUCCESS) {
199!
787
    TAOS_RETURN(code);
×
788
  }
789
  bindTwoLevel(pStream->tasks, 0, size);
199✔
790
  if (pStream->conf.fillHistory) {
199✔
791
    bindTwoLevel(pStream->pHTasksList, 0, size);
77✔
792
  }
793

794
  bindAggSink(pStream, pMnode, pStream->tasks);
199✔
795
  if (pStream->conf.fillHistory) {
199✔
796
    bindAggSink(pStream, pMnode, pStream->pHTasksList);
77✔
797
  }
798
  TAOS_RETURN(code);
199✔
799
}
800

801
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream, int64_t skey, SArray* pVgVerList) {
1,600✔
802
  int32_t     code = 0;
1,600✔
803
  SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
1,600✔
804
  if (pPlan == NULL) {
1,600!
805
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
806
    TAOS_RETURN(code);
×
807
  }
808

809
  SEpSet mnodeEpset = {0};
1,600✔
810
  mndGetMnodeEpSet(pMnode, &mnodeEpset);
1,600✔
811

812
  code = doScheduleStream(pStream, pMnode, pPlan, &mnodeEpset, skey, pVgVerList);
1,600✔
813
  qDestroyQueryPlan(pPlan);
1,600✔
814

815
  TAOS_RETURN(code);
1,600✔
816
}
817

818
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
478✔
819
  int32_t     code = 0;
478✔
820
  SSdb*       pSdb = pMnode->pSdb;
478✔
821
  SVgObj*     pVgroup = NULL;
478✔
822
  SQueryPlan* pPlan = NULL;
478✔
823
  SSubplan*   pSubplan = NULL;
478✔
824

825
  if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) {
478✔
826
    pPlan = qStringToQueryPlan(pTopic->physicalPlan);
397✔
827
    if (pPlan == NULL) {
397!
828
      return TSDB_CODE_QRY_INVALID_INPUT;
×
829
    }
830
  } else if (pTopic->subType == TOPIC_SUB_TYPE__TABLE && pTopic->ast != NULL) {
81✔
831
    SNode* pAst = NULL;
6✔
832
    code = nodesStringToNode(pTopic->ast, &pAst);
6✔
833
    if (code != 0) {
6!
834
      mError("topic:%s, failed to create since %s", pTopic->name, terrstr());
×
835
      return code;
×
836
    }
837

838
    SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
6✔
839
    code = qCreateQueryPlan(&cxt, &pPlan, NULL);
6✔
840
    if (code != 0) {
6!
841
      mError("failed to create topic:%s since %s", pTopic->name, terrstr());
×
842
      nodesDestroyNode(pAst);
×
843
      return code;
×
844
    }
845
    nodesDestroyNode(pAst);
6✔
846
  }
847

848
  if (pPlan) {
478✔
849
    int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
403!
850
    if (levelNum != 1) {
403!
851
      code = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
×
852
      goto END;
×
853
    }
854

855
    SNodeListNode* pNodeListNode = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
403✔
856
    if (pNodeListNode == NULL){
403!
857
      code = TSDB_CODE_OUT_OF_MEMORY;
×
858
      goto END;
×
859
    }
860
    int32_t opNum = LIST_LENGTH(pNodeListNode->pNodeList);
403!
861
    if (opNum != 1) {
403!
862
      code = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
×
863
      goto END;
×
864
    }
865

866
    pSubplan = (SSubplan*)nodesListGetNode(pNodeListNode->pNodeList, 0);
403✔
867
  }
868

869
  void* pIter = NULL;
478✔
870
  while (1) {
2,519✔
871
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
2,997✔
872
    if (pIter == NULL) {
2,997✔
873
      break;
478✔
874
    }
875

876
    if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) {
2,519✔
877
      sdbRelease(pSdb, pVgroup);
1,351✔
878
      continue;
1,351✔
879
    }
880

881
    pSub->vgNum++;
1,168✔
882

883
    SMqVgEp* pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
1,168✔
884
    if (pVgEp == NULL){
1,168!
885
      code = terrno;
×
886
      goto END;
×
887
    }
888
    pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
1,168✔
889
    pVgEp->vgId = pVgroup->vgId;
1,168✔
890
    if (taosArrayPush(pSub->unassignedVgs, &pVgEp) == NULL){
2,336!
891
      code = terrno;
×
892
      taosMemoryFree(pVgEp);
×
893
      goto END;
×
894
    }
895
    mInfo("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, pVgEp->vgId);
1,168!
896
    sdbRelease(pSdb, pVgroup);
1,168✔
897
  }
898

899
  if (pSubplan) {
478✔
900
    int32_t msgLen;
901

902
    if (qSubPlanToString(pSubplan, &pSub->qmsg, &msgLen) < 0) {
403!
903
      code = TSDB_CODE_QRY_INVALID_INPUT;
×
904
      goto END;
×
905
    }
906
  } else {
907
    pSub->qmsg = taosStrdup("");
75✔
908
  }
909

910
END:
478✔
911
  qDestroyQueryPlan(pPlan);
478✔
912
  return code;
478✔
913
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc