• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3903

24 Apr 2025 11:36AM UTC coverage: 55.307% (+0.09%) from 55.213%
#3903

push

travis-ci

happyguoxy
Sync branches at 2025-04-24 19:35

175024 of 316459 relevant lines covered (55.31%)

1151858.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.87
/source/libs/planner/src/planScaleOut.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "planInt.h"
17

18
typedef struct SScaleOutContext {
19
  SPlanContext* pPlanCxt;
20
  int32_t       subplanId;
21
} SScaleOutContext;
22

23
static SLogicSubplan* singleCloneSubLogicPlan(SScaleOutContext* pCxt, SLogicSubplan* pSrc, int32_t level) {
166,038✔
24
  SLogicSubplan* pDst = NULL;
166,038✔
25
  int32_t code = nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN, (SNode**)&pDst);
166,038✔
26
  if (NULL == pDst) {
166,203✔
27
    terrno = code;
×
28
    return NULL;
×
29
  }
30
  pDst->pNode = NULL;
166,203✔
31
  code = nodesCloneNode((SNode*)pSrc->pNode, (SNode**)&pDst->pNode);
166,203✔
32
  if (NULL == pDst->pNode) {
166,130✔
33
    terrno = code;
×
34
    nodesDestroyNode((SNode*)pDst);
×
35
    return NULL;
×
36
  }
37
  pDst->subplanType = pSrc->subplanType;
166,186✔
38
  pDst->level = level;
166,186✔
39
  pDst->id.queryId = pSrc->id.queryId;
166,186✔
40
  pDst->id.groupId = pSrc->id.groupId;
166,186✔
41
  pDst->id.subplanId = pCxt->subplanId++;
166,186✔
42
  pDst->processOneBlock = pSrc->processOneBlock;
166,186✔
43
  return pDst;
166,186✔
44
}
45

46
static int32_t doSetScanVgroup(SLogicNode* pNode, const SVgroupInfo* pVgroup, bool* pFound) {
5,473✔
47
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
5,473✔
48
    SScanLogicNode* pScan = (SScanLogicNode*)pNode;
2,862✔
49
    pScan->pVgroupList = taosMemoryCalloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo));
2,862✔
50
    if (NULL == pScan->pVgroupList) {
2,862✔
51
      return terrno;
×
52
    }
53
    memcpy(pScan->pVgroupList->vgroups, pVgroup, sizeof(SVgroupInfo));
2,862✔
54
    *pFound = true;
2,862✔
55
    return TSDB_CODE_SUCCESS;
2,862✔
56
  } else if (QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL == nodeType(pNode)) {
2,611✔
57
    SDynQueryCtrlLogicNode* pCtrl = (SDynQueryCtrlLogicNode*)pNode;
×
58
    if (DYN_QTYPE_VTB_SCAN == pCtrl->qType) {
×
59
      pCtrl->vtbScan.pVgroupList = taosMemoryCalloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo));
×
60
      if (NULL == pCtrl->vtbScan.pVgroupList) {
×
61
        return terrno;
×
62
      }
63
      memcpy(pCtrl->vtbScan.pVgroupList->vgroups, pVgroup, sizeof(SVgroupInfo));
×
64
      *pFound = true;
×
65
      return TSDB_CODE_SUCCESS;
×
66
    }
67
  }
68
  SNode* pChild = NULL;
2,611✔
69
  FOREACH(pChild, pNode->pChildren) {
2,611✔
70
    int32_t code = doSetScanVgroup((SLogicNode*)pChild, pVgroup, pFound);
2,611✔
71
    if (TSDB_CODE_SUCCESS != code || *pFound) {
2,611✔
72
      return code;
2,611✔
73
    }
74
  }
75
  return TSDB_CODE_SUCCESS;
×
76
}
77

78
static int32_t setScanVgroup(SLogicNode* pNode, const SVgroupInfo* pVgroup) {
2,862✔
79
  bool found = false;
2,862✔
80
  return doSetScanVgroup(pNode, pVgroup, &found);
2,862✔
81
}
82

83
static int32_t scaleOutByVgroups(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
1,393✔
84
  int32_t code = TSDB_CODE_SUCCESS;
1,393✔
85
  for (int32_t i = 0; i < pSubplan->pVgroupList->numOfVgroups; ++i) {
4,255✔
86
    SLogicSubplan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level);
2,862✔
87
    if (NULL == pNewSubplan) {
2,862✔
88
      return terrno;
×
89
    }
90
    code = setScanVgroup(pNewSubplan->pNode, pSubplan->pVgroupList->vgroups + i);
2,862✔
91
    if (TSDB_CODE_SUCCESS == code) {
2,862✔
92
      code = nodesListStrictAppend(pGroup, (SNode*)pNewSubplan);
2,862✔
93
    }
94
    if (TSDB_CODE_SUCCESS != code) {
2,862✔
95
      break;
×
96
    }
97
  }
98
  return code;
1,393✔
99
}
100

101
static int32_t scaleOutForMerge(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
6,897✔
102
  if (QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL == nodeType(pSubplan->pNode) &&
6,897✔
103
      ((SDynQueryCtrlLogicNode*)pSubplan->pNode)->qType == DYN_QTYPE_VTB_SCAN) {
×
104
    return scaleOutByVgroups(pCxt, pSubplan, level, pGroup);
×
105
  } else {
106
    return nodesListStrictAppend(pGroup, (SNode*)singleCloneSubLogicPlan(pCxt, pSubplan, level));
6,897✔
107
  }
108
}
109

110
static int32_t scaleOutForInsertValues(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level,
155,000✔
111
                                       SNodeList* pGroup) {
112
  SVnodeModifyLogicNode* pNode = (SVnodeModifyLogicNode*)pSubplan->pNode;
155,000✔
113
  size_t                 numOfVgroups = taosArrayGetSize(pNode->pDataBlocks);
155,000✔
114
  int32_t code = 0;
155,073✔
115
  for (int32_t i = 0; i < numOfVgroups; ++i) {
311,485✔
116
    SLogicSubplan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level);
156,263✔
117
    if (NULL == pNewSubplan) {
156,352✔
118
      return terrno;
×
119
    }
120
    ((SVnodeModifyLogicNode*)pNewSubplan->pNode)->pVgDataBlocks = (SVgDataBlocks*)taosArrayGetP(pNode->pDataBlocks, i);
156,352✔
121
    if (TSDB_CODE_SUCCESS != (code = nodesListStrictAppend(pGroup, (SNode*)pNewSubplan))) {
156,396✔
122
      return code;
×
123
    }
124
  }
125
  return TSDB_CODE_SUCCESS;
155,222✔
126
}
127

128
static int32_t scaleOutForInsert(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
155,015✔
129
  SVnodeModifyLogicNode* pNode = (SVnodeModifyLogicNode*)pSubplan->pNode;
155,015✔
130
  if (NULL == pNode->node.pChildren) {
155,015✔
131
    return scaleOutForInsertValues(pCxt, pSubplan, level, pGroup);
155,044✔
132
  }
133
  return scaleOutForMerge(pCxt, pSubplan, level, pGroup);
×
134
}
135

136
static int32_t scaleOutForModify(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
155,243✔
137
  SVnodeModifyLogicNode* pNode = (SVnodeModifyLogicNode*)pSubplan->pNode;
155,243✔
138
  if (MODIFY_TABLE_TYPE_DELETE == pNode->modifyType) {
155,243✔
139
    return scaleOutByVgroups(pCxt, pSubplan, level, pGroup);
203✔
140
  }
141
  return scaleOutForInsert(pCxt, pSubplan, level, pGroup);
155,040✔
142
}
143

144
static int32_t scaleOutForScan(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
6,896✔
145
  if (pSubplan->pVgroupList && !pCxt->pPlanCxt->streamQuery) {
6,896✔
146
    return scaleOutByVgroups(pCxt, pSubplan, level, pGroup);
1,190✔
147
  } else {
148
    return scaleOutForMerge(pCxt, pSubplan, level, pGroup);
5,706✔
149
  }
150
}
151

152
static int32_t scaleOutForCompute(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
13✔
153
  int32_t code = TSDB_CODE_SUCCESS;
13✔
154
  for (int32_t i = 0; i < pSubplan->numOfComputeNodes; ++i) {
62✔
155
    SLogicSubplan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level);
49✔
156
    if (NULL == pNewSubplan) {
49✔
157
      return terrno;
×
158
    }
159
    code = nodesListStrictAppend(pGroup, (SNode*)pNewSubplan);
49✔
160
    if (TSDB_CODE_SUCCESS != code) {
49✔
161
      break;
×
162
    }
163
  }
164
  return code;
13✔
165
}
166

167
static int32_t pushHierarchicalPlanForCompute(SNodeList* pParentsGroup, SNodeList* pCurrentGroup) {
13✔
168
  SNode*  pChild = NULL;
13✔
169
  SNode*  pParent = NULL;
13✔
170
  int32_t code = TSDB_CODE_SUCCESS;
13✔
171
  if (pParentsGroup->length == pCurrentGroup->length) {  
13✔
172
    FORBOTH(pChild, pCurrentGroup, pParent, pParentsGroup) {
62✔
173
      code = nodesListMakeAppend(&(((SLogicSubplan*)pParent)->pChildren), pChild);
49✔
174
      if (TSDB_CODE_SUCCESS == code) {
49✔
175
        code = nodesListMakeAppend(&(((SLogicSubplan*)pChild)->pParents), pParent);
49✔
176
      }
177
      if (TSDB_CODE_SUCCESS != code) {
49✔
178
        break;
×
179
      }
180
    }
181
  } else {
182
    FOREACH(pChild, pCurrentGroup) {
×
183
      SNode* pParent = NULL;
×
184
      FOREACH(pParent, pParentsGroup) {
×
185
        code = nodesListMakeAppend(&(((SLogicSubplan*)pParent)->pChildren), pChild);
×
186
        if (TSDB_CODE_SUCCESS == code) {
×
187
          code = nodesListMakeAppend(&(((SLogicSubplan*)pChild)->pParents), pParent);
×
188
        }
189
      }
190
    }
191
  }
192
  
193
  return code;
13✔
194
}
195

196
static bool isComputeGroup(SNodeList* pGroup) {
163,312✔
197
  if (0 == LIST_LENGTH(pGroup)) {
163,312✔
198
    return false;
162,018✔
199
  }
200
  return SUBPLAN_TYPE_COMPUTE == ((SLogicSubplan*)nodesListGetNode(pGroup, 0))->subplanType;
1,294✔
201
}
202

203
static int32_t pushHierarchicalPlanForNormal(SNodeList* pParentsGroup, SNodeList* pCurrentGroup) {
163,317✔
204
  int32_t code = TSDB_CODE_SUCCESS;
163,317✔
205
  bool    topLevel = (0 == LIST_LENGTH(pParentsGroup));
163,317✔
206
  SNode*  pChild = NULL;
163,317✔
207
  FOREACH(pChild, pCurrentGroup) {
329,426✔
208
    if (topLevel) {
166,051✔
209
      code = nodesListAppend(pParentsGroup, pChild);
163,404✔
210
    } else {
211
      SNode* pParent = NULL;
2,647✔
212
      FOREACH(pParent, pParentsGroup) {
5,294✔
213
        code = nodesListMakeAppend(&(((SLogicSubplan*)pParent)->pChildren), pChild);
2,647✔
214
        if (TSDB_CODE_SUCCESS == code) {
2,647✔
215
          code = nodesListMakeAppend(&(((SLogicSubplan*)pChild)->pParents), pParent);
2,647✔
216
        }
217
      }
218
    }
219
    if (TSDB_CODE_SUCCESS != code) {
166,109✔
220
      break;
×
221
    }
222
  }
223
  return code;
163,375✔
224
}
225

226
static int32_t pushHierarchicalPlan(SNodeList* pParentsGroup, SNodeList* pCurrentGroup) {
163,320✔
227
  if (isComputeGroup(pParentsGroup)) {
163,320✔
228
    return pushHierarchicalPlanForCompute(pParentsGroup, pCurrentGroup);
13✔
229
  }
230
  return pushHierarchicalPlanForNormal(pParentsGroup, pCurrentGroup);
163,337✔
231
}
232

233
static int32_t doScaleOut(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pParentsGroup) {
163,403✔
234
  SNodeList* pCurrentGroup = NULL;
163,403✔
235
  int32_t code = nodesMakeList(&pCurrentGroup);
163,403✔
236
  if (NULL == pCurrentGroup) {
163,483✔
237
    return code;
×
238
  }
239

240
  switch (pSubplan->subplanType) {
163,483✔
241
    case SUBPLAN_TYPE_MERGE:
1,187✔
242
      code = scaleOutForMerge(pCxt, pSubplan, level, pCurrentGroup);
1,187✔
243
      break;
1,187✔
244
    case SUBPLAN_TYPE_SCAN:
6,908✔
245
      code = scaleOutForScan(pCxt, pSubplan, level, pCurrentGroup);
6,908✔
246
      break;
6,910✔
247
    case SUBPLAN_TYPE_MODIFY:
155,254✔
248
      code = scaleOutForModify(pCxt, pSubplan, level, pCurrentGroup);
155,254✔
249
      break;
155,342✔
250
    case SUBPLAN_TYPE_COMPUTE:
13✔
251
      code = scaleOutForCompute(pCxt, pSubplan, level, pCurrentGroup);
13✔
252
      break;
13✔
253
    default:
121✔
254
      break;
121✔
255
  }
256

257
  if (TSDB_CODE_SUCCESS == code) {
163,573✔
258
    code = pushHierarchicalPlan(pParentsGroup, pCurrentGroup);
163,456✔
259
  }
260

261
  if (TSDB_CODE_SUCCESS == code) {
163,364✔
262
    SNode* pChild;
263
    FOREACH(pChild, pSubplan->pChildren) {
164,596✔
264
      code = doScaleOut(pCxt, (SLogicSubplan*)pChild, level + 1, pCurrentGroup);
1,294✔
265
      if (TSDB_CODE_SUCCESS != code) {
1,294✔
266
        break;
×
267
      }
268
    }
269
  }
270

271
  if (TSDB_CODE_SUCCESS != code) {
163,364✔
272
    nodesDestroyList(pCurrentGroup);
×
273
  } else {
274
    nodesClearList(pCurrentGroup);
163,364✔
275
  }
276

277
  return code;
163,459✔
278
}
279

280
static SQueryLogicPlan* makeQueryLogicPlan() {
162,031✔
281
  SQueryLogicPlan* pLogicPlan = NULL;
162,031✔
282
  int32_t code = nodesMakeNode(QUERY_NODE_LOGIC_PLAN, (SNode**)&pLogicPlan);
162,031✔
283
  if (NULL == pLogicPlan) {
162,157✔
284
    terrno = code;
×
285
    return NULL;
×
286
  }
287
  pLogicPlan->pTopSubplans = NULL;
162,157✔
288
  code = nodesMakeList(&pLogicPlan->pTopSubplans);
162,157✔
289
  if (NULL == pLogicPlan->pTopSubplans) {
162,122✔
290
    nodesDestroyNode((SNode*)pLogicPlan);
×
291
    terrno = code;
×
292
    return NULL;
26✔
293
  }
294
  return pLogicPlan;
162,157✔
295
}
296

297
int32_t scaleOutLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SQueryLogicPlan** pLogicPlan) {
162,032✔
298
  SQueryLogicPlan* pPlan = makeQueryLogicPlan();
162,032✔
299
  if (NULL == pPlan) {
162,185✔
300
    return terrno;
×
301
  }
302

303
  SScaleOutContext cxt = {.pPlanCxt = pCxt, .subplanId = 1};
162,185✔
304
  int32_t          code = doScaleOut(&cxt, pLogicSubplan, 0, pPlan->pTopSubplans);
162,185✔
305
  if (TSDB_CODE_SUCCESS == code) {
162,169✔
306
    *pLogicPlan = pPlan;
162,184✔
307
  } else {
308
    nodesDestroyNode((SNode*)pPlan);
×
309
  }
310

311
  return code;
162,188✔
312
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc