• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3562

20 Dec 2024 09:57AM UTC coverage: 26.655% (-32.2%) from 58.812%
#3562

push

travis-ci

web-flow
Merge pull request #29229 from taosdata/enh/TS-5749-3.0

enh: seperate tsdb async tasks to different thread pools

21498 of 109421 branches covered (19.65%)

Branch coverage included in aggregate %.

66 of 96 new or added lines in 7 files covered. (68.75%)

39441 existing lines in 157 files now uncovered.

35007 of 102566 relevant lines covered (34.13%)

53922.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/mnode/impl/src/mndScheduler.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndScheduler.h"
17
#include "mndDb.h"
18
#include "mndMnode.h"
19
#include "mndSnode.h"
20
#include "mndVgroup.h"
21
#include "parser.h"
22
#include "tcompare.h"
23
#include "tmisce.h"
24
#include "tname.h"
25
#include "tuuid.h"
26

27
#define SINK_NODE_LEVEL (0)
28
extern bool tsDeployOnSnode;
29

UNCOV
30
static bool hasCountWindowNode(SPhysiNode* pNode) {
×
UNCOV
31
  if (nodeType(pNode) == QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT) {
×
UNCOV
32
    return true;
×
33
  } else {
UNCOV
34
    size_t size = LIST_LENGTH(pNode->pChildren);
×
35

UNCOV
36
    for (int32_t i = 0; i < size; ++i) {
×
UNCOV
37
      SPhysiNode* pChild = (SPhysiNode*)nodesListGetNode(pNode->pChildren, i);
×
UNCOV
38
      if (hasCountWindowNode(pChild)) {
×
UNCOV
39
        return true;
×
40
      }
41
    }
42

UNCOV
43
    return false;
×
44
  }
45
}
46

UNCOV
47
static bool isCountWindowStreamTask(SSubplan* pPlan) {
×
UNCOV
48
  return hasCountWindowNode((SPhysiNode*)pPlan->pNode);
×
49
}
50

UNCOV
51
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
×
52
                           int64_t watermark, int64_t deleteMark) {
UNCOV
53
  int32_t     code = 0;
×
UNCOV
54
  SNode*      pAst = NULL;
×
UNCOV
55
  SQueryPlan* pPlan = NULL;
×
56

UNCOV
57
  if (nodesStringToNode(ast, &pAst) < 0) {
×
58
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
59
    goto END;
×
60
  }
61

UNCOV
62
  if (qSetSTableIdForRsma(pAst, uid) < 0) {
×
63
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
64
    goto END;
×
65
  }
66

UNCOV
67
  SPlanContext cxt = {
×
68
      .pAstRoot = pAst,
69
      .topicQuery = false,
70
      .streamQuery = true,
71
      .rSmaQuery = true,
72
      .triggerType = triggerType,
73
      .watermark = watermark,
74
      .deleteMark = deleteMark,
75
  };
76

UNCOV
77
  if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
×
78
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
79
    goto END;
×
80
  }
81

UNCOV
82
  int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
×
UNCOV
83
  if (levelNum != 1) {
×
84
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
85
    goto END;
×
86
  }
UNCOV
87
  SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
×
88

UNCOV
89
  int32_t opNum = LIST_LENGTH(inner->pNodeList);
×
UNCOV
90
  if (opNum != 1) {
×
91
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
92
    goto END;
×
93
  }
94

UNCOV
95
  SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
×
UNCOV
96
  if (qSubPlanToString(plan, pDst, pDstLen) < 0) {
×
97
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
98
    goto END;
×
99
  }
100

UNCOV
101
END:
×
UNCOV
102
  if (pAst) nodesDestroyNode(pAst);
×
UNCOV
103
  if (pPlan) nodesDestroyNode((SNode*)pPlan);
×
UNCOV
104
  TAOS_RETURN(code);
×
105
}
106

UNCOV
107
int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) {
×
UNCOV
108
  STaskOutputInfo* pInfo = &pTask->outputInfo;
×
109

UNCOV
110
  mDebug("mndSetSinkTaskInfo to sma or table, taskId:%s", pTask->id.idStr);
×
111

UNCOV
112
  if (pStream->smaId != 0 && pStream->subTableWithoutMd5 != 1) {
×
UNCOV
113
    pInfo->type = TASK_OUTPUT__SMA;
×
UNCOV
114
    pInfo->smaSink.smaId = pStream->smaId;
×
115
  } else {
UNCOV
116
    pInfo->type = TASK_OUTPUT__TABLE;
×
UNCOV
117
    pInfo->tbSink.stbUid = pStream->targetStbUid;
×
UNCOV
118
    (void)memcpy(pInfo->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
×
UNCOV
119
    pInfo->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
×
UNCOV
120
    if (pInfo->tbSink.pSchemaWrapper == NULL) {
×
121
      return TSDB_CODE_OUT_OF_MEMORY;
×
122
    }
123
  }
124

UNCOV
125
  return 0;
×
126
}
127

UNCOV
128
int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SArray* pSinkNodeList,
×
129
                                        SStreamTask* pTask) {
UNCOV
130
  int32_t code = 0;
×
UNCOV
131
  bool isShuffle = false;
×
132

UNCOV
133
  if (pStream->fixedSinkVgId == 0) {
×
UNCOV
134
    SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
×
UNCOV
135
    if (pDb != NULL && pDb->cfg.numOfVgroups > 1) {
×
UNCOV
136
      isShuffle = true;
×
UNCOV
137
      pTask->outputInfo.type = TASK_OUTPUT__SHUFFLE_DISPATCH;
×
UNCOV
138
      pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
×
UNCOV
139
      TAOS_CHECK_RETURN(mndExtractDbInfo(pMnode, pDb, &pTask->outputInfo.shuffleDispatcher.dbInfo, NULL));
×
140
    }
141

UNCOV
142
    sdbRelease(pMnode->pSdb, pDb);
×
143
  }
144

UNCOV
145
  int32_t numOfSinkNodes = taosArrayGetSize(pSinkNodeList);
×
146

UNCOV
147
  if (isShuffle) {
×
UNCOV
148
    (void)memcpy(pTask->outputInfo.shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
×
UNCOV
149
    SArray* pVgs = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
×
150

UNCOV
151
    int32_t numOfVgroups = taosArrayGetSize(pVgs);
×
UNCOV
152
    for (int32_t i = 0; i < numOfVgroups; i++) {
×
UNCOV
153
      SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
×
154

UNCOV
155
      for (int32_t j = 0; j < numOfSinkNodes; j++) {
×
UNCOV
156
        SStreamTask* pSinkTask = taosArrayGetP(pSinkNodeList, j);
×
UNCOV
157
        if (pSinkTask->info.nodeId == pVgInfo->vgId) {
×
UNCOV
158
          pVgInfo->taskId = pSinkTask->id.taskId;
×
UNCOV
159
          break;
×
160
        }
161
      }
162
    }
163
  } else {
UNCOV
164
    SStreamTask* pOneSinkTask = taosArrayGetP(pSinkNodeList, 0);
×
UNCOV
165
    streamTaskSetFixedDownstreamInfo(pTask, pOneSinkTask);
×
166
  }
167

UNCOV
168
  TAOS_RETURN(code);
×
169
}
170

UNCOV
171
int32_t mndAssignStreamTaskToVgroup(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
×
172
  int32_t msgLen;
173

UNCOV
174
  pTask->info.nodeId = pVgroup->vgId;
×
UNCOV
175
  pTask->info.epSet = mndGetVgroupEpset(pMnode, pVgroup);
×
176

UNCOV
177
  plan->execNode.nodeId = pTask->info.nodeId;
×
UNCOV
178
  plan->execNode.epSet = pTask->info.epSet;
×
UNCOV
179
  return qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen);
×
180
}
181

UNCOV
182
SSnodeObj* mndSchedFetchOneSnode(SMnode* pMnode) {
×
UNCOV
183
  SSnodeObj* pObj = NULL;
×
UNCOV
184
  void*      pIter = NULL;
×
185
  // TODO random fetch
UNCOV
186
  pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void**)&pObj);
×
UNCOV
187
  sdbCancelFetch(pMnode->pSdb, pIter);
×
UNCOV
188
  return pObj;
×
189
}
190

UNCOV
191
int32_t mndAssignStreamTaskToSnode(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SSnodeObj* pSnode) {
×
192
  int32_t msgLen;
193

UNCOV
194
  pTask->info.nodeId = SNODE_HANDLE;
×
UNCOV
195
  pTask->info.epSet = mndAcquireEpFromSnode(pMnode, pSnode);
×
196

UNCOV
197
  plan->execNode.nodeId = SNODE_HANDLE;
×
UNCOV
198
  plan->execNode.epSet = pTask->info.epSet;
×
UNCOV
199
  mDebug("s-task:0x%x set the agg task to snode:%d", pTask->id.taskId, SNODE_HANDLE);
×
200

UNCOV
201
  return qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen);
×
202
}
203

204
// random choose a node to do compute
UNCOV
205
SVgObj* mndSchedFetchOneVg(SMnode* pMnode, SStreamObj* pStream) {
×
UNCOV
206
  SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->sourceDb);
×
UNCOV
207
  if (pDbObj == NULL) {
×
208
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
209
    return NULL;
×
210
  }
211

UNCOV
212
  if (pStream->indexForMultiAggBalance == -1) {
×
UNCOV
213
    taosSeedRand(taosSafeRand());
×
UNCOV
214
    pStream->indexForMultiAggBalance = taosRand() % pDbObj->cfg.numOfVgroups;
×
215
  }
216

UNCOV
217
  int32_t index = 0;
×
UNCOV
218
  void*   pIter = NULL;
×
UNCOV
219
  SVgObj* pVgroup = NULL;
×
220
  while (1) {
UNCOV
221
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
×
UNCOV
222
    if (pIter == NULL) break;
×
UNCOV
223
    if (pVgroup->dbUid != pStream->sourceDbUid) {
×
UNCOV
224
      sdbRelease(pMnode->pSdb, pVgroup);
×
UNCOV
225
      continue;
×
226
    }
UNCOV
227
    if (index++ == pStream->indexForMultiAggBalance) {
×
UNCOV
228
      pStream->indexForMultiAggBalance++;
×
UNCOV
229
      pStream->indexForMultiAggBalance %= pDbObj->cfg.numOfVgroups;
×
UNCOV
230
      sdbCancelFetch(pMnode->pSdb, pIter);
×
UNCOV
231
      break;
×
232
    }
UNCOV
233
    sdbRelease(pMnode->pSdb, pVgroup);
×
234
  }
UNCOV
235
  sdbRelease(pMnode->pSdb, pDbObj);
×
236

UNCOV
237
  return pVgroup;
×
238
}
239

UNCOV
240
static int32_t doAddSinkTask(SStreamObj* pStream, SMnode* pMnode, SVgObj* pVgroup, SEpSet* pEpset, bool isFillhistory) {
×
UNCOV
241
  int64_t  uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
×
UNCOV
242
  SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
×
243

UNCOV
244
  SStreamTask* pTask = NULL;
×
UNCOV
245
  int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, pEpset, isFillhistory, 0, 0, *pTaskList, pStream->conf.fillHistory,
×
UNCOV
246
                                pStream->subTableWithoutMd5, &pTask);
×
UNCOV
247
  if (code != 0) {
×
248
    return code;
×
249
  }
250

UNCOV
251
  mDebug("doAddSinkTask taskId:%s, %p vgId:%d, isFillHistory:%d", pTask->id.idStr, pTask, pVgroup->vgId, isFillhistory);
×
252

UNCOV
253
  pTask->info.nodeId = pVgroup->vgId;
×
UNCOV
254
  pTask->info.epSet = mndGetVgroupEpset(pMnode, pVgroup);
×
UNCOV
255
  return mndSetSinkTaskInfo(pStream, pTask);
×
256
}
257

UNCOV
258
static int32_t doAddSinkTaskToVg(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset, SVgObj* vgObj) {
×
UNCOV
259
  int32_t code = doAddSinkTask(pStream, pMnode, vgObj, pEpset, false);
×
UNCOV
260
  if (code != 0) {
×
261
    return code;
×
262
  }
UNCOV
263
  if (pStream->conf.fillHistory) {
×
UNCOV
264
    code = doAddSinkTask(pStream, pMnode, vgObj, pEpset, true);
×
UNCOV
265
    if (code != 0) {
×
266
      return code;
×
267
    }
268
  }
UNCOV
269
  return TDB_CODE_SUCCESS;
×
270
}
271

272
// create sink node for each vgroup.
UNCOV
273
static int32_t doAddShuffleSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset) {
×
UNCOV
274
  SSdb* pSdb = pMnode->pSdb;
×
UNCOV
275
  void* pIter = NULL;
×
276

UNCOV
277
  while (1) {
×
UNCOV
278
    SVgObj* pVgroup = NULL;
×
UNCOV
279
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
×
UNCOV
280
    if (pIter == NULL) {
×
UNCOV
281
      break;
×
282
    }
283

UNCOV
284
    if (!mndVgroupInDb(pVgroup, pStream->targetDbUid)) {
×
UNCOV
285
      sdbRelease(pSdb, pVgroup);
×
UNCOV
286
      continue;
×
287
    }
288

UNCOV
289
    int32_t code = doAddSinkTaskToVg(pMnode, pStream, pEpset, pVgroup);
×
UNCOV
290
    if (code != 0) {
×
291
      sdbRelease(pSdb, pVgroup);
×
292
      return code;
×
293
    }
294

UNCOV
295
    sdbRelease(pSdb, pVgroup);
×
296
  }
297

UNCOV
298
  return TDB_CODE_SUCCESS;
×
299
}
300

UNCOV
301
static int64_t getVgroupLastVer(const SArray* pList, int32_t vgId) {
×
UNCOV
302
  int32_t size = (int32_t) taosArrayGetSize(pList);
×
UNCOV
303
  for (int32_t i = 0; i < size; ++i) {
×
UNCOV
304
    SVgroupVer* pVer = taosArrayGet(pList, i);
×
UNCOV
305
    if (pVer->vgId == vgId) {
×
UNCOV
306
      return pVer->ver;
×
307
    }
308
  }
309

UNCOV
310
  mDebug("no data in vgId:%d for extract last version, set to be 0, total existed vgs:%d", vgId, size);
×
UNCOV
311
  return 1;
×
312
}
313

UNCOV
314
static void streamTaskSetDataRange(SStreamTask* pTask, int64_t skey, SArray* pVerList, int32_t vgId) {
×
UNCOV
315
  int64_t latestVer = getVgroupLastVer(pVerList, vgId);
×
UNCOV
316
  if (latestVer < 0) {
×
317
    latestVer = 0;
×
318
  }
319

320
  // set the correct ts, which is the last key of queried table.
UNCOV
321
  SDataRange*  pRange = &pTask->dataRange;
×
UNCOV
322
  STimeWindow* pWindow = &pRange->window;
×
323

UNCOV
324
  if (pTask->info.fillHistory) {
×
UNCOV
325
    pWindow->skey = INT64_MIN;
×
UNCOV
326
    pWindow->ekey = skey - 1;
×
327

UNCOV
328
    pRange->range.minVer = 0;
×
UNCOV
329
    pRange->range.maxVer = latestVer;
×
UNCOV
330
    mDebug("add fill-history source task 0x%x timeWindow:%" PRId64 "-%" PRId64 " verRange:%" PRId64 "-%" PRId64,
×
331
           pTask->id.taskId, pWindow->skey, pWindow->ekey, pRange->range.minVer, pRange->range.maxVer);
332
  } else {
UNCOV
333
    pWindow->skey = skey;
×
UNCOV
334
    pWindow->ekey = INT64_MAX;
×
335

UNCOV
336
    pRange->range.minVer = latestVer + 1;
×
UNCOV
337
    pRange->range.maxVer = INT64_MAX;
×
338

UNCOV
339
    mDebug("add source task 0x%x timeWindow:%" PRId64 "-%" PRId64 " verRange:%" PRId64 "-%" PRId64, pTask->id.taskId,
×
340
           pWindow->skey, pWindow->ekey, pRange->range.minVer, pRange->range.maxVer);
341
  }
UNCOV
342
}
×
343

UNCOV
344
static void haltInitialTaskStatus(SStreamTask* pTask, SSubplan* pPlan, bool isFillhistoryTask) {
×
UNCOV
345
  bool hasCountWindowNode = isCountWindowStreamTask(pPlan);
×
346

UNCOV
347
  if (hasCountWindowNode && (!isFillhistoryTask)) {
×
UNCOV
348
    SStreamStatus* pStatus = &pTask->status;
×
UNCOV
349
    mDebug("s-task:0x%x status set %s from %s for count window agg task with fill-history option set",
×
350
           pTask->id.taskId, streamTaskGetStatusStr(TASK_STATUS__HALT), streamTaskGetStatusStr(pStatus->taskStatus));
UNCOV
351
    pStatus->taskStatus = TASK_STATUS__HALT;
×
352
  }
UNCOV
353
}
×
354

UNCOV
355
static int32_t buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillhistory, bool useTriggerParam, SStreamTask** pTask) {
×
UNCOV
356
  uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
×
UNCOV
357
  SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
×
358

UNCOV
359
  int32_t code = tNewStreamTask(uid, TASK_LEVEL__SOURCE, pEpset, isFillhistory, pStream->conf.trigger,
×
UNCOV
360
                                useTriggerParam ? pStream->conf.triggerParam : 0, *pTaskList, pStream->conf.fillHistory,
×
UNCOV
361
                                pStream->subTableWithoutMd5, pTask);
×
UNCOV
362
  return code;
×
363
}
364

UNCOV
365
static int32_t addNewTaskList(SStreamObj* pStream) {
×
UNCOV
366
  SArray* pTaskList = taosArrayInit(0, POINTER_BYTES);
×
UNCOV
367
  if (pTaskList == NULL) {
×
368
    mError("failed init task list, code:%s", tstrerror(terrno));
×
369
    return terrno;
×
370
  }
371

UNCOV
372
  if (taosArrayPush(pStream->tasks, &pTaskList) == NULL) {
×
373
    mError("failed to put into array, code:%s", tstrerror(terrno));
×
374
    return terrno;
×
375
  }
376

UNCOV
377
  if (pStream->conf.fillHistory) {
×
UNCOV
378
    pTaskList = taosArrayInit(0, POINTER_BYTES);
×
UNCOV
379
    if (pTaskList == NULL) {
×
380
      mError("failed init task list, code:%s", tstrerror(terrno));
×
381
      return terrno;
×
382
    }
383

UNCOV
384
    if (taosArrayPush(pStream->pHTasksList, &pTaskList) == NULL) {
×
385
      mError("failed to put into array, code:%s", tstrerror(terrno));
×
386
      return terrno;
×
387
    }
388
  }
389

UNCOV
390
  return TSDB_CODE_SUCCESS;
×
391
}
392

393
// set the history task id
UNCOV
394
static void setHTasksId(SStreamObj* pStream) {
×
UNCOV
395
  SArray* pTaskList = *(SArray**)taosArrayGetLast(pStream->tasks);
×
UNCOV
396
  SArray* pHTaskList = *(SArray**)taosArrayGetLast(pStream->pHTasksList);
×
397

UNCOV
398
  for (int32_t i = 0; i < taosArrayGetSize(pTaskList); ++i) {
×
UNCOV
399
    SStreamTask** pStreamTask = taosArrayGet(pTaskList, i);
×
UNCOV
400
    SStreamTask** pHTask = taosArrayGet(pHTaskList, i);
×
401

UNCOV
402
    (*pStreamTask)->hTaskInfo.id.taskId = (*pHTask)->id.taskId;
×
UNCOV
403
    (*pStreamTask)->hTaskInfo.id.streamId = (*pHTask)->id.streamId;
×
404

UNCOV
405
    (*pHTask)->streamTaskId.taskId = (*pStreamTask)->id.taskId;
×
UNCOV
406
    (*pHTask)->streamTaskId.streamId = (*pStreamTask)->id.streamId;
×
407

UNCOV
408
    mDebug("s-task:0x%" PRIx64 "-0x%x related history task:0x%" PRIx64 "-0x%x, level:%d", (*pStreamTask)->id.streamId,
×
409
           (*pStreamTask)->id.taskId, (*pHTask)->id.streamId, (*pHTask)->id.taskId, (*pHTask)->info.taskLevel);
410
  }
UNCOV
411
}
×
412

UNCOV
413
static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, int64_t skey,
×
414
                               SArray* pVerList, SVgObj* pVgroup, bool isHistoryTask, bool useTriggerParam) {
UNCOV
415
  SStreamTask* pTask = NULL;
×
UNCOV
416
  int32_t code = buildSourceTask(pStream, pEpset, isHistoryTask, useTriggerParam, &pTask);
×
UNCOV
417
  if (code != TSDB_CODE_SUCCESS) {
×
418
    return code;
×
419
  }
420

UNCOV
421
  mDebug("doAddSourceTask taskId:%s, %p vgId:%d, historyTask:%d", pTask->id.idStr, pTask, pVgroup->vgId, isHistoryTask);
×
422

UNCOV
423
  if (pStream->conf.fillHistory) {
×
UNCOV
424
    haltInitialTaskStatus(pTask, plan, isHistoryTask);
×
425
  }
426

UNCOV
427
  streamTaskSetDataRange(pTask, skey, pVerList, pVgroup->vgId);
×
428

UNCOV
429
  code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup);
×
UNCOV
430
  if (code != TSDB_CODE_SUCCESS) {
×
431
    return code;
×
432
  }
433

UNCOV
434
  return TDB_CODE_SUCCESS;
×
435
}
436

UNCOV
437
static SSubplan* getScanSubPlan(const SQueryPlan* pPlan) {
×
UNCOV
438
  int32_t        numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
×
UNCOV
439
  SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, numOfPlanLevel - 1);
×
UNCOV
440
  if (LIST_LENGTH(inner->pNodeList) != 1) {
×
441
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
442
    return NULL;
×
443
  }
444

UNCOV
445
  SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
×
UNCOV
446
  if (plan->subplanType != SUBPLAN_TYPE_SCAN) {
×
447
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
448
    return NULL;
×
449
  }
UNCOV
450
  return plan;
×
451
}
452

UNCOV
453
static SSubplan* getAggSubPlan(const SQueryPlan* pPlan, int index) {
×
UNCOV
454
  SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, index);
×
UNCOV
455
  if (LIST_LENGTH(inner->pNodeList) != 1) {
×
456
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
457
    return NULL;
×
458
  }
459

UNCOV
460
  SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
×
UNCOV
461
  if (plan->subplanType != SUBPLAN_TYPE_MERGE) {
×
462
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
×
463
    return NULL;
×
464
  }
UNCOV
465
  return plan;
×
466
}
467

UNCOV
468
static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset,
×
469
                             int64_t nextWindowSkey, SArray* pVerList, bool useTriggerParam) {
UNCOV
470
  void*   pIter = NULL;
×
UNCOV
471
  SSdb*   pSdb = pMnode->pSdb;
×
UNCOV
472
  int32_t code = addNewTaskList(pStream);
×
UNCOV
473
  if (code) {
×
474
    return code;
×
475
  }
476

UNCOV
477
  while (1) {
×
UNCOV
478
    SVgObj* pVgroup = NULL;
×
UNCOV
479
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
×
UNCOV
480
    if (pIter == NULL) {
×
UNCOV
481
      break;
×
482
    }
483

UNCOV
484
    if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) {
×
UNCOV
485
      sdbRelease(pSdb, pVgroup);
×
UNCOV
486
      continue;
×
487
    }
488

UNCOV
489
    code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, false, useTriggerParam);
×
UNCOV
490
    if (code != 0) {
×
491
      mError("failed to create stream task, code:%s", tstrerror(code));
×
492

493
      // todo drop the added source tasks.
494
      sdbRelease(pSdb, pVgroup);
×
495
      return code;
×
496
    }
497

UNCOV
498
    if (pStream->conf.fillHistory) {
×
UNCOV
499
      code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, true, useTriggerParam);
×
UNCOV
500
      if (code != 0) {
×
501
        sdbRelease(pSdb, pVgroup);
×
502
        return code;
×
503
      }
504
    }
505

UNCOV
506
    sdbRelease(pSdb, pVgroup);
×
507
  }
508

UNCOV
509
  if (pStream->conf.fillHistory) {
×
UNCOV
510
    setHTasksId(pStream);
×
511
  }
512

UNCOV
513
  return TSDB_CODE_SUCCESS;
×
514
}
515

UNCOV
516
static int32_t buildAggTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillhistory, bool useTriggerParam,
×
517
                            SStreamTask** pAggTask) {
UNCOV
518
  *pAggTask = NULL;
×
519

UNCOV
520
  uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
×
UNCOV
521
  SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
×
522

UNCOV
523
  int32_t code = tNewStreamTask(uid, TASK_LEVEL__AGG, pEpset, isFillhistory, pStream->conf.trigger,
×
UNCOV
524
                                useTriggerParam ? pStream->conf.triggerParam : 0, *pTaskList, pStream->conf.fillHistory,
×
UNCOV
525
                                pStream->subTableWithoutMd5, pAggTask);
×
UNCOV
526
  return code;
×
527
}
528

UNCOV
529
static int32_t doAddAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset, SVgObj* pVgroup,
×
530
                            SSnodeObj* pSnode, bool isFillhistory, bool useTriggerParam) {
UNCOV
531
  int32_t      code = 0;
×
UNCOV
532
  SStreamTask* pTask = NULL;
×
UNCOV
533
  const char*  id = NULL;
×
534

UNCOV
535
  code = buildAggTask(pStream, pEpset, isFillhistory, useTriggerParam, &pTask);
×
UNCOV
536
  if (code != TSDB_CODE_SUCCESS) {
×
537
    return code;
×
538
  }
539

UNCOV
540
  id = pTask->id.idStr;
×
UNCOV
541
  if (pSnode != NULL) {
×
UNCOV
542
    code = mndAssignStreamTaskToSnode(pMnode, pTask, plan, pSnode);
×
UNCOV
543
    mDebug("doAddAggTask taskId:%s, %p snode id:%d, isFillHistory:%d", id, pTask, pSnode->id, isFillhistory);
×
544
  } else {
UNCOV
545
    code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup);
×
UNCOV
546
    mDebug("doAddAggTask taskId:%s, %p vgId:%d, isFillHistory:%d", id, pTask, pVgroup->vgId, isFillhistory);
×
547
  }
UNCOV
548
  return code;
×
549
}
550

UNCOV
551
static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset, bool useTriggerParam) {
×
UNCOV
552
  SVgObj*    pVgroup = NULL;
×
UNCOV
553
  SSnodeObj* pSnode = NULL;
×
UNCOV
554
  int32_t    code = 0;
×
UNCOV
555
  if (tsDeployOnSnode) {
×
UNCOV
556
    pSnode = mndSchedFetchOneSnode(pMnode);
×
UNCOV
557
    if (pSnode == NULL) {
×
UNCOV
558
      pVgroup = mndSchedFetchOneVg(pMnode, pStream);
×
559
    }
560
  } else {
561
    pVgroup = mndSchedFetchOneVg(pMnode, pStream);
×
562
  }
563

UNCOV
564
  code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, false, useTriggerParam);
×
UNCOV
565
  if (code != 0) {
×
566
    goto END;
×
567
  }
568

UNCOV
569
  if (pStream->conf.fillHistory) {
×
UNCOV
570
    code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, true, useTriggerParam);
×
UNCOV
571
    if (code != 0) {
×
572
      goto END;
×
573
    }
574

UNCOV
575
    setHTasksId(pStream);
×
576
  }
577

UNCOV
578
END:
×
UNCOV
579
  if (pSnode != NULL) {
×
UNCOV
580
    sdbRelease(pMnode->pSdb, pSnode);
×
581
  } else {
UNCOV
582
    sdbRelease(pMnode->pSdb, pVgroup);
×
583
  }
UNCOV
584
  return code;
×
585
}
586

UNCOV
587
static int32_t addSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset) {
×
UNCOV
588
  int32_t code = addNewTaskList(pStream);
×
UNCOV
589
  if (code) {
×
590
    return code;
×
591
  }
592

UNCOV
593
  if (pStream->fixedSinkVgId == 0) {
×
UNCOV
594
    code = doAddShuffleSinkTask(pMnode, pStream, pEpset);
×
UNCOV
595
    if (code != 0) {
×
596
      return code;
×
597
    }
598
  } else {
UNCOV
599
    code = doAddSinkTaskToVg(pMnode, pStream, pEpset, &pStream->fixedSinkVg);
×
UNCOV
600
    if (code != 0) {
×
601
      return code;
×
602
    }
603
  }
604

UNCOV
605
  if (pStream->conf.fillHistory) {
×
UNCOV
606
    setHTasksId(pStream);
×
607
  }
UNCOV
608
  return TDB_CODE_SUCCESS;
×
609
}
610

UNCOV
611
static void bindTaskToSinkTask(SStreamObj* pStream, SMnode* pMnode, SArray* pSinkTaskList, SStreamTask* task) {
×
UNCOV
612
  int32_t code = 0;
×
UNCOV
613
  if ((code = mndAddDispatcherForInternalTask(pMnode, pStream, pSinkTaskList, task)) != 0) {
×
614
    mError("failed bind task to sink task since %s", tstrerror(code));
×
615
  }
UNCOV
616
  for (int32_t k = 0; k < taosArrayGetSize(pSinkTaskList); k++) {
×
UNCOV
617
    SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, k);
×
UNCOV
618
    if ((code = streamTaskSetUpstreamInfo(pSinkTask, task)) != 0) {
×
619
      mError("failed bind task to sink task since %s", tstrerror(code));
×
620
    }
621
  }
UNCOV
622
  mDebug("bindTaskToSinkTask taskId:%s to sink task list", task->id.idStr);
×
UNCOV
623
}
×
624

UNCOV
625
static void bindAggSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks) {
×
UNCOV
626
  SArray*  pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL);
×
UNCOV
627
  SArray** pAggTaskList = taosArrayGetLast(tasks);
×
628

UNCOV
629
  for (int i = 0; i < taosArrayGetSize(*pAggTaskList); i++) {
×
UNCOV
630
    SStreamTask* pAggTask = taosArrayGetP(*pAggTaskList, i);
×
UNCOV
631
    bindTaskToSinkTask(pStream, pMnode, pSinkTaskList, pAggTask);
×
UNCOV
632
    mDebug("bindAggSink taskId:%s to sink task list", pAggTask->id.idStr);
×
633
  }
UNCOV
634
}
×
635

UNCOV
636
static void bindSourceSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks, bool hasExtraSink) {
×
UNCOV
637
  int32_t code = 0;
×
UNCOV
638
  SArray* pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL);
×
UNCOV
639
  SArray* pSourceTaskList = taosArrayGetP(tasks, hasExtraSink ? SINK_NODE_LEVEL + 1 : SINK_NODE_LEVEL);
×
640

UNCOV
641
  for (int i = 0; i < taosArrayGetSize(pSourceTaskList); i++) {
×
UNCOV
642
    SStreamTask* pSourceTask = taosArrayGetP(pSourceTaskList, i);
×
UNCOV
643
    mDebug("bindSourceSink taskId:%s to sink task list", pSourceTask->id.idStr);
×
644

UNCOV
645
    if (hasExtraSink) {
×
UNCOV
646
      bindTaskToSinkTask(pStream, pMnode, pSinkTaskList, pSourceTask);
×
647
    } else {
UNCOV
648
      if ((code = mndSetSinkTaskInfo(pStream, pSourceTask)) != 0) {
×
649
        mError("failed bind task to sink task since %s", tstrerror(code));
×
650
      }
651
    }
652
  }
UNCOV
653
}
×
654

UNCOV
655
static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) {
×
UNCOV
656
  int32_t code = 0;
×
UNCOV
657
  size_t size = taosArrayGetSize(tasks);
×
UNCOV
658
  if (size < 2) {
×
659
    mError("task list size is less than 2");
×
660
    return;
×
661
  }
UNCOV
662
  SArray* pDownTaskList = taosArrayGetP(tasks, size - 1);
×
UNCOV
663
  SArray* pUpTaskList = taosArrayGetP(tasks, size - 2);
×
664

UNCOV
665
  SStreamTask** pDownTask = taosArrayGetLast(pDownTaskList);
×
UNCOV
666
  end = end > taosArrayGetSize(pUpTaskList) ? taosArrayGetSize(pUpTaskList) : end;
×
UNCOV
667
  for (int i = begin; i < end; i++) {
×
UNCOV
668
    SStreamTask* pUpTask = taosArrayGetP(pUpTaskList, i);
×
UNCOV
669
    pUpTask->info.selfChildId = i - begin;
×
UNCOV
670
    streamTaskSetFixedDownstreamInfo(pUpTask, *pDownTask);
×
UNCOV
671
    if ((code = streamTaskSetUpstreamInfo(*pDownTask, pUpTask)) != 0) {
×
672
      mError("failed bind task to sink task since %s", tstrerror(code));
×
673
    }
674
  }
UNCOV
675
  mDebug("bindTwoLevel task list(%d-%d) to taskId:%s", begin, end - 1, (*(pDownTask))->id.idStr);
×
676
}
677

UNCOV
678
static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, SEpSet* pEpset, int64_t skey,
×
679
                                SArray* pVerList) {
UNCOV
680
  int32_t code = 0;
×
UNCOV
681
  SSdb*   pSdb = pMnode->pSdb;
×
UNCOV
682
  int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
×
UNCOV
683
  bool    hasExtraSink = false;
×
UNCOV
684
  bool    externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
×
UNCOV
685
  SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb);
×
UNCOV
686
  if (pDbObj == NULL) {
×
687
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
688
    TAOS_RETURN(code);
×
689
  }
690

UNCOV
691
  bool multiTarget = (pDbObj->cfg.numOfVgroups > 1);
×
UNCOV
692
  sdbRelease(pSdb, pDbObj);
×
693

UNCOV
694
  mDebug("doScheduleStream numOfPlanLevel:%d, exDb:%d, multiTarget:%d, fix vgId:%d, physicalPlan:%s", numOfPlanLevel,
×
695
         externalTargetDB, multiTarget, pStream->fixedSinkVgId, pStream->physicalPlan);
696

UNCOV
697
  pStream->tasks = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES);
×
UNCOV
698
  pStream->pHTasksList = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES);
×
UNCOV
699
  if (pStream->tasks == NULL || pStream->pHTasksList == NULL) {
×
700
    mError("failed to create stream obj, code:%s", tstrerror(terrno));
×
701
    return terrno;
×
702
  }
703

UNCOV
704
  if (numOfPlanLevel > 1 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) {
×
705
    // add extra sink
UNCOV
706
    hasExtraSink = true;
×
UNCOV
707
    code = addSinkTask(pMnode, pStream, pEpset);
×
UNCOV
708
    if (code != TSDB_CODE_SUCCESS) {
×
709
      return code;
×
710
    }
711
  }
712

UNCOV
713
  pStream->totalLevel = numOfPlanLevel + hasExtraSink;
×
714

UNCOV
715
  SSubplan* plan = getScanSubPlan(pPlan);  // source plan
×
UNCOV
716
  if (plan == NULL) {
×
717
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
718
    if (terrno != 0) code = terrno;
×
719
    TAOS_RETURN(code);
×
720
  }
721

UNCOV
722
  code = addSourceTask(pMnode, plan, pStream, pEpset, skey, pVerList, (numOfPlanLevel == 1));
×
UNCOV
723
  if (code != TSDB_CODE_SUCCESS) {
×
724
    return code;
×
725
  }
726

UNCOV
727
  if (numOfPlanLevel == 1) {
×
UNCOV
728
    bindSourceSink(pStream, pMnode, pStream->tasks, hasExtraSink);
×
UNCOV
729
    if (pStream->conf.fillHistory) {
×
UNCOV
730
      bindSourceSink(pStream, pMnode, pStream->pHTasksList, hasExtraSink);
×
731
    }
UNCOV
732
    return TDB_CODE_SUCCESS;
×
733
  }
734

UNCOV
735
  if (numOfPlanLevel == 3) {
×
UNCOV
736
    plan = getAggSubPlan(pPlan, 1);  // middle agg plan
×
UNCOV
737
    if (plan == NULL) {
×
738
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
739
      if (terrno != 0) code = terrno;
×
740
      TAOS_RETURN(code);
×
741
    }
742

UNCOV
743
    do {
×
UNCOV
744
      SArray** list = taosArrayGetLast(pStream->tasks);
×
UNCOV
745
      float    size = (float)taosArrayGetSize(*list);
×
UNCOV
746
      size_t   cnt = (size_t)ceil(size / tsStreamAggCnt);
×
UNCOV
747
      if (cnt <= 1) break;
×
748

UNCOV
749
      mDebug("doScheduleStream add middle agg, size:%d, cnt:%d", (int)size, (int)cnt);
×
UNCOV
750
      code = addNewTaskList(pStream);
×
UNCOV
751
      if (code) {
×
752
        return code;
×
753
      }
754

UNCOV
755
      for (int j = 0; j < cnt; j++) {
×
UNCOV
756
        code = addAggTask(pStream, pMnode, plan, pEpset, false);
×
UNCOV
757
        if (code != TSDB_CODE_SUCCESS) {
×
758
          return code;
×
759
        }
760

UNCOV
761
        bindTwoLevel(pStream->tasks, j * tsStreamAggCnt, (j + 1) * tsStreamAggCnt);
×
UNCOV
762
        if (pStream->conf.fillHistory) {
×
UNCOV
763
          bindTwoLevel(pStream->pHTasksList, j * tsStreamAggCnt, (j + 1) * tsStreamAggCnt);
×
764
        }
765
      }
766
    } while (1);
767
  }
768

UNCOV
769
  plan = getAggSubPlan(pPlan, 0);
×
UNCOV
770
  if (plan == NULL) {
×
771
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
772
    if (terrno != 0) code = terrno;
×
773
    TAOS_RETURN(code);
×
774
  }
775

UNCOV
776
  mDebug("doScheduleStream add final agg");
×
UNCOV
777
  SArray** list = taosArrayGetLast(pStream->tasks);
×
UNCOV
778
  size_t   size = taosArrayGetSize(*list);
×
779

UNCOV
780
  code = addNewTaskList(pStream);
×
UNCOV
781
  if (code) {
×
782
    return code;
×
783
  }
784

UNCOV
785
  code = addAggTask(pStream, pMnode, plan, pEpset, true);
×
UNCOV
786
  if (code != TSDB_CODE_SUCCESS) {
×
787
    TAOS_RETURN(code);
×
788
  }
UNCOV
789
  bindTwoLevel(pStream->tasks, 0, size);
×
UNCOV
790
  if (pStream->conf.fillHistory) {
×
UNCOV
791
    bindTwoLevel(pStream->pHTasksList, 0, size);
×
792
  }
793

UNCOV
794
  bindAggSink(pStream, pMnode, pStream->tasks);
×
UNCOV
795
  if (pStream->conf.fillHistory) {
×
UNCOV
796
    bindAggSink(pStream, pMnode, pStream->pHTasksList);
×
797
  }
UNCOV
798
  TAOS_RETURN(code);
×
799
}
800

UNCOV
801
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream, int64_t skey, SArray* pVgVerList) {
×
UNCOV
802
  int32_t     code = 0;
×
UNCOV
803
  SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
×
UNCOV
804
  if (pPlan == NULL) {
×
805
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
806
    TAOS_RETURN(code);
×
807
  }
808

UNCOV
809
  SEpSet mnodeEpset = {0};
×
UNCOV
810
  mndGetMnodeEpSet(pMnode, &mnodeEpset);
×
811

UNCOV
812
  code = doScheduleStream(pStream, pMnode, pPlan, &mnodeEpset, skey, pVgVerList);
×
UNCOV
813
  qDestroyQueryPlan(pPlan);
×
814

UNCOV
815
  TAOS_RETURN(code);
×
816
}
817

UNCOV
818
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
×
UNCOV
819
  int32_t     code = 0;
×
UNCOV
820
  SSdb*       pSdb = pMnode->pSdb;
×
UNCOV
821
  SVgObj*     pVgroup = NULL;
×
UNCOV
822
  SQueryPlan* pPlan = NULL;
×
UNCOV
823
  SSubplan*   pSubplan = NULL;
×
824

UNCOV
825
  if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) {
×
UNCOV
826
    pPlan = qStringToQueryPlan(pTopic->physicalPlan);
×
UNCOV
827
    if (pPlan == NULL) {
×
828
      return TSDB_CODE_QRY_INVALID_INPUT;
×
829
    }
UNCOV
830
  } else if (pTopic->subType == TOPIC_SUB_TYPE__TABLE && pTopic->ast != NULL) {
×
UNCOV
831
    SNode* pAst = NULL;
×
UNCOV
832
    code = nodesStringToNode(pTopic->ast, &pAst);
×
UNCOV
833
    if (code != 0) {
×
834
      mError("topic:%s, failed to create since %s", pTopic->name, terrstr());
×
835
      return code;
×
836
    }
837

UNCOV
838
    SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
×
UNCOV
839
    code = qCreateQueryPlan(&cxt, &pPlan, NULL);
×
UNCOV
840
    if (code != 0) {
×
841
      mError("failed to create topic:%s since %s", pTopic->name, terrstr());
×
842
      nodesDestroyNode(pAst);
×
843
      return code;
×
844
    }
UNCOV
845
    nodesDestroyNode(pAst);
×
846
  }
847

UNCOV
848
  if (pPlan) {
×
UNCOV
849
    int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
×
UNCOV
850
    if (levelNum != 1) {
×
851
      code = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
×
852
      goto END;
×
853
    }
854

UNCOV
855
    SNodeListNode* pNodeListNode = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
×
UNCOV
856
    if (pNodeListNode == NULL){
×
857
      code = TSDB_CODE_OUT_OF_MEMORY;
×
858
      goto END;
×
859
    }
UNCOV
860
    int32_t opNum = LIST_LENGTH(pNodeListNode->pNodeList);
×
UNCOV
861
    if (opNum != 1) {
×
862
      code = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
×
863
      goto END;
×
864
    }
865

UNCOV
866
    pSubplan = (SSubplan*)nodesListGetNode(pNodeListNode->pNodeList, 0);
×
867
  }
868

UNCOV
869
  void* pIter = NULL;
×
UNCOV
870
  while (1) {
×
UNCOV
871
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
×
UNCOV
872
    if (pIter == NULL) {
×
UNCOV
873
      break;
×
874
    }
875

UNCOV
876
    if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) {
×
UNCOV
877
      sdbRelease(pSdb, pVgroup);
×
UNCOV
878
      continue;
×
879
    }
880

UNCOV
881
    pSub->vgNum++;
×
882

UNCOV
883
    SMqVgEp* pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
×
UNCOV
884
    if (pVgEp == NULL){
×
885
      code = terrno;
×
886
      goto END;
×
887
    }
UNCOV
888
    pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
×
UNCOV
889
    pVgEp->vgId = pVgroup->vgId;
×
UNCOV
890
    if (taosArrayPush(pSub->unassignedVgs, &pVgEp) == NULL){
×
891
      code = terrno;
×
892
      taosMemoryFree(pVgEp);
×
893
      goto END;
×
894
    }
UNCOV
895
    mInfo("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, pVgEp->vgId);
×
UNCOV
896
    sdbRelease(pSdb, pVgroup);
×
897
  }
898

UNCOV
899
  if (pSubplan) {
×
900
    int32_t msgLen;
901

UNCOV
902
    if (qSubPlanToString(pSubplan, &pSub->qmsg, &msgLen) < 0) {
×
903
      code = TSDB_CODE_QRY_INVALID_INPUT;
×
904
      goto END;
×
905
    }
906
  } else {
UNCOV
907
    pSub->qmsg = taosStrdup("");
×
908
  }
909

UNCOV
910
END:
×
UNCOV
911
  qDestroyQueryPlan(pPlan);
×
UNCOV
912
  return code;
×
913
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc