• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3768

28 Mar 2025 10:15AM UTC coverage: 33.726% (-0.3%) from 33.993%
#3768

push

travis-ci

happyguoxy
test:alter lcov result

144891 of 592084 branches covered (24.47%)

Branch coverage included in aggregate %.

218795 of 486283 relevant lines covered (44.99%)

765715.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.89
/source/libs/planner/src/planScaleOut.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "planInt.h"
17

18
/* Mutable state threaded through every scale-out helper in this file. */
typedef struct SScaleOutContext {
19
  SPlanContext* pPlanCxt;   // planner context; scaleOutForScan reads its streamQuery flag
20
  int32_t       subplanId;  // id given to the next cloned subplan; post-incremented on each clone
21
} SScaleOutContext;
22

23
/*
 * Create one new logic subplan that copies pSrc's plan tree and identity.
 *
 * The plan tree (pSrc->pNode) is cloned with nodesCloneNode. subplanType,
 * queryId, groupId and processOneBlock are copied from pSrc, `level` comes
 * from the argument, and the subplan id is taken from pCxt->subplanId, which
 * is post-incremented only when the clone succeeds.
 *
 * Returns the new subplan, or NULL on failure with terrno set to the code
 * returned by the failing nodes* call.
 */
static SLogicSubplan* singleCloneSubLogicPlan(SScaleOutContext* pCxt, SLogicSubplan* pSrc, int32_t level) {
  SLogicSubplan* pDst = NULL;
  int32_t        code = nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN, (SNode**)&pDst);
  if (NULL == pDst) {
    terrno = code;
    return NULL;
  }

  pDst->pNode = NULL;
  code = nodesCloneNode((SNode*)pSrc->pNode, (SNode**)&pDst->pNode);
  if (NULL == pDst->pNode) {
    // Release the half-built subplan first and publish the error afterwards,
    // so that nothing done during cleanup can overwrite terrno. This is the
    // same order makeQueryLogicPlan uses on its failure path.
    nodesDestroyNode((SNode*)pDst);
    terrno = code;
    return NULL;
  }

  pDst->subplanType = pSrc->subplanType;
  pDst->level = level;
  pDst->id.queryId = pSrc->id.queryId;
  pDst->id.groupId = pSrc->id.groupId;
  pDst->id.subplanId = pCxt->subplanId++;
  pDst->processOneBlock = pSrc->processOneBlock;
  return pDst;
}
45

46
/*
 * Depth-first search of the logic plan rooted at pNode for the first node that
 * takes a vgroup list: either a scan node, or a dynamic-query-control node
 * whose qType is DYN_QTYPE_VTB_SCAN. That node receives a freshly
 * zero-allocated list with room for one entry, into which *pVgroup is copied.
 *
 * *pFound is set to true once a node has been updated; the search stops there.
 * Returns TSDB_CODE_SUCCESS, or terrno when the allocation fails.
 */
static int32_t doSetScanVgroup(SLogicNode* pNode, const SVgroupInfo* pVgroup, bool* pFound) {
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
    SScanLogicNode* pScanNode = (SScanLogicNode*)pNode;
    pScanNode->pVgroupList = taosMemoryCalloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo));
    if (NULL == pScanNode->pVgroupList) {
      return terrno;
    }
    memcpy(pScanNode->pVgroupList->vgroups, pVgroup, sizeof(SVgroupInfo));
    *pFound = true;
    return TSDB_CODE_SUCCESS;
  }

  if (QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL == nodeType(pNode) &&
      DYN_QTYPE_VTB_SCAN == ((SDynQueryCtrlLogicNode*)pNode)->qType) {
    SDynQueryCtrlLogicNode* pDynCtrl = (SDynQueryCtrlLogicNode*)pNode;
    pDynCtrl->vtbScan.pVgroupList = taosMemoryCalloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo));
    if (NULL == pDynCtrl->vtbScan.pVgroupList) {
      return terrno;
    }
    memcpy(pDynCtrl->vtbScan.pVgroupList->vgroups, pVgroup, sizeof(SVgroupInfo));
    *pFound = true;
    return TSDB_CODE_SUCCESS;
  }

  // Not a target itself: descend, stopping at the first error or the first hit.
  SNode* pSub = NULL;
  FOREACH(pSub, pNode->pChildren) {
    int32_t rc = doSetScanVgroup((SLogicNode*)pSub, pVgroup, pFound);
    if (*pFound || TSDB_CODE_SUCCESS != rc) {
      return rc;
    }
  }
  return TSDB_CODE_SUCCESS;
}
77

78
/*
 * Entry point for doSetScanVgroup: attach *pVgroup to the first eligible node
 * under pNode. Whether a node was actually found is not reported to the
 * caller; only the status code is returned.
 */
static int32_t setScanVgroup(SLogicNode* pNode, const SVgroupInfo* pVgroup) {
  bool    assigned = false;
  int32_t rc = doSetScanVgroup(pNode, pVgroup, &assigned);
  return rc;
}
82

83
/*
 * Emit one clone of pSubplan per entry in pSubplan->pVgroupList, binding each
 * clone to its vgroup via setScanVgroup and appending it to pGroup.
 *
 * pSubplan->pVgroupList is dereferenced unconditionally.
 * NOTE(review): callers appear to be responsible for it being non-NULL; confirm.
 *
 * Returns TSDB_CODE_SUCCESS, terrno if cloning fails, or the first error from
 * setScanVgroup / nodesListStrictAppend. Clones already appended to pGroup
 * stay there on failure; disposing of pGroup is left to the caller.
 */
static int32_t scaleOutByVgroups(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
  int32_t code = TSDB_CODE_SUCCESS;
  for (int32_t i = 0; i < pSubplan->pVgroupList->numOfVgroups; ++i) {
    SLogicSubplan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level);
    if (NULL == pNewSubplan) {
      return terrno;
    }

    code = setScanVgroup(pNewSubplan->pNode, pSubplan->pVgroupList->vgroups + i);
    if (TSDB_CODE_SUCCESS != code) {
      // The clone has not been handed to pGroup yet, so nothing else owns it:
      // destroy it here, otherwise it is leaked.
      nodesDestroyNode((SNode*)pNewSubplan);
      break;
    }

    // NOTE(review): nodesListStrictAppend presumably disposes of the node
    // itself when the append fails, so no cleanup is done here; confirm.
    code = nodesListStrictAppend(pGroup, (SNode*)pNewSubplan);
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
  }
  return code;
}
100

101
/*
 * Scale out a subplan that normally needs only a single instance.
 *
 * If the subplan root is a dynamic-query-control node of type
 * DYN_QTYPE_VTB_SCAN, it is fanned out per vgroup instead. Otherwise exactly
 * one clone is appended to pGroup; a NULL clone is passed straight to
 * nodesListStrictAppend, whose result is returned.
 */
static int32_t scaleOutForMerge(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
  bool isVtbScan = QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL == nodeType(pSubplan->pNode) &&
                   ((SDynQueryCtrlLogicNode*)pSubplan->pNode)->qType == DYN_QTYPE_VTB_SCAN;
  if (isVtbScan) {
    return scaleOutByVgroups(pCxt, pSubplan, level, pGroup);
  }

  SLogicSubplan* pClone = singleCloneSubLogicPlan(pCxt, pSubplan, level);
  return nodesListStrictAppend(pGroup, (SNode*)pClone);
}
109

110
static int32_t scaleOutForInsertValues(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level,
105,430✔
111
                                       SNodeList* pGroup) {
112
  SVnodeModifyLogicNode* pNode = (SVnodeModifyLogicNode*)pSubplan->pNode;
105,430✔
113
  size_t                 numOfVgroups = taosArrayGetSize(pNode->pDataBlocks);
105,430✔
114
  int32_t code = 0;
105,427✔
115
  for (int32_t i = 0; i < numOfVgroups; ++i) {
212,134✔
116
    SLogicSubplan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level);
106,686✔
117
    if (NULL == pNewSubplan) {
106,697!
118
      return terrno;
×
119
    }
120
    ((SVnodeModifyLogicNode*)pNewSubplan->pNode)->pVgDataBlocks = (SVgDataBlocks*)taosArrayGetP(pNode->pDataBlocks, i);
106,697✔
121
    if (TSDB_CODE_SUCCESS != (code = nodesListStrictAppend(pGroup, (SNode*)pNewSubplan))) {
106,699!
122
      return code;
×
123
    }
124
  }
125
  return TSDB_CODE_SUCCESS;
105,448✔
126
}
127

128
/*
 * Scale out an insert subplan. A modify node with a children list is handled
 * like a merge subplan; one without children takes the INSERT ... VALUES path.
 */
static int32_t scaleOutForInsert(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
  SVnodeModifyLogicNode* pModify = (SVnodeModifyLogicNode*)pSubplan->pNode;
  if (NULL != pModify->node.pChildren) {
    return scaleOutForMerge(pCxt, pSubplan, level, pGroup);
  }
  return scaleOutForInsertValues(pCxt, pSubplan, level, pGroup);
}
135

136
/*
 * Scale out a MODIFY subplan: deletes are fanned out per vgroup, every other
 * modify type is routed through scaleOutForInsert.
 */
static int32_t scaleOutForModify(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
  SVnodeModifyLogicNode* pModify = (SVnodeModifyLogicNode*)pSubplan->pNode;
  if (MODIFY_TABLE_TYPE_DELETE != pModify->modifyType) {
    return scaleOutForInsert(pCxt, pSubplan, level, pGroup);
  }
  return scaleOutByVgroups(pCxt, pSubplan, level, pGroup);
}
143

144
/*
 * Scale out a SCAN subplan. It is fanned out per vgroup only when it has a
 * vgroup list and the planner context is not a stream query; in every other
 * case it goes through scaleOutForMerge.
 */
static int32_t scaleOutForScan(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
  if (!pSubplan->pVgroupList || pCxt->pPlanCxt->streamQuery) {
    return scaleOutForMerge(pCxt, pSubplan, level, pGroup);
  }
  return scaleOutByVgroups(pCxt, pSubplan, level, pGroup);
}
151

152
/*
 * Scale out a COMPUTE subplan: append pSubplan->numOfComputeNodes clones to
 * pGroup.
 *
 * Returns TSDB_CODE_SUCCESS, terrno if cloning fails, or the error from
 * nodesListStrictAppend.
 */
static int32_t scaleOutForCompute(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
  for (int32_t idx = 0; idx < pSubplan->numOfComputeNodes; ++idx) {
    SLogicSubplan* pClone = singleCloneSubLogicPlan(pCxt, pSubplan, level);
    if (NULL == pClone) {
      return terrno;
    }

    int32_t rc = nodesListStrictAppend(pGroup, (SNode*)pClone);
    if (TSDB_CODE_SUCCESS != rc) {
      return rc;
    }
  }
  return TSDB_CODE_SUCCESS;
}
166

167
/*
 * Link the subplans of pCurrentGroup underneath a group of compute parents.
 *
 * When both groups have the same length, they are paired positionally: the
 * i-th child is linked to the i-th parent. Otherwise every child is linked to
 * every parent. A link is two appends: the child goes into the parent's
 * pChildren list and the parent into the child's pParents list.
 *
 * Returns TSDB_CODE_SUCCESS, or the first error from nodesListMakeAppend.
 * Linking stops at that first error; links already made are left in place.
 */
static int32_t pushHierarchicalPlanForCompute(SNodeList* pParentsGroup, SNodeList* pCurrentGroup) {
  SNode*  pChild = NULL;
  SNode*  pParent = NULL;
  int32_t code = TSDB_CODE_SUCCESS;

  if (pParentsGroup->length == pCurrentGroup->length) {
    FORBOTH(pChild, pCurrentGroup, pParent, pParentsGroup) {
      code = nodesListMakeAppend(&(((SLogicSubplan*)pParent)->pChildren), pChild);
      if (TSDB_CODE_SUCCESS == code) {
        code = nodesListMakeAppend(&(((SLogicSubplan*)pChild)->pParents), pParent);
      }
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
    }
  } else {
    FOREACH(pChild, pCurrentGroup) {
      FOREACH(pParent, pParentsGroup) {
        code = nodesListMakeAppend(&(((SLogicSubplan*)pParent)->pChildren), pChild);
        if (TSDB_CODE_SUCCESS == code) {
          code = nodesListMakeAppend(&(((SLogicSubplan*)pChild)->pParents), pParent);
        }
        // Stop at the first failure; otherwise the next iteration would
        // overwrite `code` and the error would be silently lost.
        if (TSDB_CODE_SUCCESS != code) {
          break;
        }
      }
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
    }
  }

  return code;
}
195

196
/*
 * Report whether pGroup is a group of compute subplans. Only the first element
 * is inspected; an empty group is never a compute group.
 */
static bool isComputeGroup(SNodeList* pGroup) {
  if (0 == LIST_LENGTH(pGroup)) {
    return false;
  }
  SLogicSubplan* pFirst = (SLogicSubplan*)nodesListGetNode(pGroup, 0);
  return SUBPLAN_TYPE_COMPUTE == pFirst->subplanType;
}
202

203
/*
 * Attach the subplans of pCurrentGroup to the hierarchy.
 *
 * If pParentsGroup is empty on entry, this is the top level and each subplan
 * is appended directly to pParentsGroup. That decision is made once, before
 * the loop, so it is not affected by the appends themselves. Otherwise every
 * subplan is linked to every parent: the child goes into the parent's
 * pChildren list and the parent into the child's pParents list.
 *
 * Returns TSDB_CODE_SUCCESS, or the first error from the append calls.
 * Processing stops at that first error.
 */
static int32_t pushHierarchicalPlanForNormal(SNodeList* pParentsGroup, SNodeList* pCurrentGroup) {
  int32_t code = TSDB_CODE_SUCCESS;
  bool    topLevel = (0 == LIST_LENGTH(pParentsGroup));
  SNode*  pChild = NULL;
  FOREACH(pChild, pCurrentGroup) {
    if (topLevel) {
      code = nodesListAppend(pParentsGroup, pChild);
    } else {
      SNode* pParent = NULL;
      FOREACH(pParent, pParentsGroup) {
        code = nodesListMakeAppend(&(((SLogicSubplan*)pParent)->pChildren), pChild);
        if (TSDB_CODE_SUCCESS == code) {
          code = nodesListMakeAppend(&(((SLogicSubplan*)pChild)->pParents), pParent);
        }
        // Stop at the first failure; otherwise linking to the next parent
        // would overwrite `code` and mask the error.
        if (TSDB_CODE_SUCCESS != code) {
          break;
        }
      }
    }
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
  }
  return code;
}
225

226
/*
 * Hook pCurrentGroup into the hierarchy below pParentsGroup, choosing the
 * compute-specific linking when the parents are compute subplans and the
 * normal linking otherwise.
 */
static int32_t pushHierarchicalPlan(SNodeList* pParentsGroup, SNodeList* pCurrentGroup) {
  if (!isComputeGroup(pParentsGroup)) {
    return pushHierarchicalPlanForNormal(pParentsGroup, pCurrentGroup);
  }
  return pushHierarchicalPlanForCompute(pParentsGroup, pCurrentGroup);
}
232

233
/*
 * Recursively scale out pSubplan and its descendants.
 *
 * Steps, each executed only while the previous one succeeded:
 *   1. build the group of clones for pSubplan according to its subplanType
 *      (unknown types yield an empty group and no error);
 *   2. link that group under pParentsGroup;
 *   3. recurse into pSubplan->pChildren with level + 1, using the new group as
 *      the parents group.
 *
 * The temporary group list is released with nodesClearList on success and
 * nodesDestroyList on failure.
 * NOTE(review): presumably nodesClearList frees only the list itself while
 * nodesDestroyList also destroys the contained subplans; confirm.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t doScaleOut(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pParentsGroup) {
  SNodeList* pCurrentGroup = NULL;
  int32_t    code = nodesMakeList(&pCurrentGroup);
  if (NULL == pCurrentGroup) {
    return code;
  }

  if (SUBPLAN_TYPE_MERGE == pSubplan->subplanType) {
    code = scaleOutForMerge(pCxt, pSubplan, level, pCurrentGroup);
  } else if (SUBPLAN_TYPE_SCAN == pSubplan->subplanType) {
    code = scaleOutForScan(pCxt, pSubplan, level, pCurrentGroup);
  } else if (SUBPLAN_TYPE_MODIFY == pSubplan->subplanType) {
    code = scaleOutForModify(pCxt, pSubplan, level, pCurrentGroup);
  } else if (SUBPLAN_TYPE_COMPUTE == pSubplan->subplanType) {
    code = scaleOutForCompute(pCxt, pSubplan, level, pCurrentGroup);
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = pushHierarchicalPlan(pParentsGroup, pCurrentGroup);
  }

  if (TSDB_CODE_SUCCESS == code) {
    SNode* pSub;
    FOREACH(pSub, pSubplan->pChildren) {
      code = doScaleOut(pCxt, (SLogicSubplan*)pSub, level + 1, pCurrentGroup);
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    nodesClearList(pCurrentGroup);
  } else {
    nodesDestroyList(pCurrentGroup);
  }

  return code;
}
279

280
static SQueryLogicPlan* makeQueryLogicPlan() {
112,444✔
281
  SQueryLogicPlan* pLogicPlan = NULL;
112,444✔
282
  int32_t code = nodesMakeNode(QUERY_NODE_LOGIC_PLAN, (SNode**)&pLogicPlan);
112,444✔
283
  if (NULL == pLogicPlan) {
112,460!
284
    terrno = code;
×
285
    return NULL;
×
286
  }
287
  pLogicPlan->pTopSubplans = NULL;
112,460✔
288
  code = nodesMakeList(&pLogicPlan->pTopSubplans);
112,460✔
289
  if (NULL == pLogicPlan->pTopSubplans) {
112,465!
290
    nodesDestroyNode((SNode*)pLogicPlan);
×
291
    terrno = code;
×
292
    return NULL;
×
293
  }
294
  return pLogicPlan;
112,465✔
295
}
296

297
/*
 * Build the scaled-out query logic plan for pLogicSubplan.
 *
 * Subplan ids are numbered from 1 and the root subplan is placed at level 0;
 * the resulting top-level subplans are collected in the plan's pTopSubplans.
 *
 * On success *pLogicPlan receives the new plan and TSDB_CODE_SUCCESS is
 * returned. On failure the partially built plan is destroyed, *pLogicPlan is
 * left untouched, and the error code is returned (terrno if the plan itself
 * could not be allocated).
 */
int32_t scaleOutLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SQueryLogicPlan** pLogicPlan) {
  SQueryLogicPlan* pPlan = makeQueryLogicPlan();
  if (NULL == pPlan) {
    return terrno;
  }

  SScaleOutContext cxt = {.pPlanCxt = pCxt, .subplanId = 1};
  int32_t          code = doScaleOut(&cxt, pLogicSubplan, 0, pPlan->pTopSubplans);
  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode((SNode*)pPlan);
    return code;
  }

  *pLogicPlan = pPlan;
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc