• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3531

19 Nov 2024 10:42AM UTC coverage: 60.213% (-0.006%) from 60.219%
#3531

push

travis-ci

web-flow
Merge pull request #28777 from taosdata/fix/3.0/TD-32366

fix:TD-32366/stmt add geometry datatype check

118529 of 252344 branches covered (46.97%)

Branch coverage included in aggregate %.

7 of 48 new or added lines in 3 files covered. (14.58%)

2282 existing lines in 115 files now uncovered.

199096 of 275161 relevant lines covered (72.36%)

6067577.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.32
/source/libs/planner/src/planScaleOut.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "planInt.h"
17

18
typedef struct SScaleOutContext {
19
  SPlanContext* pPlanCxt;
20
  int32_t       subplanId;
21
} SScaleOutContext;
22

23
/**
 * Build one new logic subplan whose plan tree is a clone of pSrc->pNode.
 *
 * subplanType, id.queryId and id.groupId are copied from pSrc, level comes from
 * the argument, and id.subplanId is taken from pCxt->subplanId, which is
 * incremented afterwards.
 *
 * @param pCxt   scale-out context supplying the next subplan id
 * @param pSrc   subplan to copy from
 * @param level  level recorded in the new subplan
 * @return the new subplan, or NULL with terrno set to the code returned by the
 *         failing nodesMakeNode / nodesCloneNode call
 */
static SLogicSubplan* singleCloneSubLogicPlan(SScaleOutContext* pCxt, SLogicSubplan* pSrc, int32_t level) {
  SLogicSubplan* pDst = NULL;
  int32_t        code = nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN, (SNode**)&pDst);
  if (NULL == pDst) {
    terrno = code;
    return NULL;
  }

  pDst->pNode = NULL;
  code = nodesCloneNode((SNode*)pSrc->pNode, (SNode**)&pDst->pNode);
  if (NULL == pDst->pNode) {
    // Destroy the half-built subplan first and publish the error last, so the
    // cleanup cannot overwrite the code the caller reads from terrno.
    nodesDestroyNode((SNode*)pDst);
    terrno = code;
    return NULL;
  }

  pDst->subplanType = pSrc->subplanType;
  pDst->level = level;
  pDst->id.queryId = pSrc->id.queryId;
  pDst->id.groupId = pSrc->id.groupId;
  pDst->id.subplanId = pCxt->subplanId++;
  return pDst;
}
44

45
/**
 * Depth-first search for a scan node under pNode; on the first one found,
 * install a freshly allocated vgroup list holding a copy of *pVgroup and set
 * *pFound to true.
 *
 * @param pNode    root of the logic plan (sub)tree to search
 * @param pVgroup  vgroup description copied into the scan node
 * @param pFound   set to true once a scan node has been updated
 * @return TSDB_CODE_SUCCESS, or terrno when the allocation fails
 */
static int32_t doSetScanVgroup(SLogicNode* pNode, const SVgroupInfo* pVgroup, bool* pFound) {
  if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode)) {
    // Not a scan: descend, stopping at the first error or the first hit.
    SNode* pSub = NULL;
    FOREACH(pSub, pNode->pChildren) {
      int32_t rc = doSetScanVgroup((SLogicNode*)pSub, pVgroup, pFound);
      if (*pFound || TSDB_CODE_SUCCESS != rc) {
        return rc;
      }
    }
    return TSDB_CODE_SUCCESS;
  }

  SScanLogicNode* pScanNode = (SScanLogicNode*)pNode;
  // Room for the list header plus exactly one vgroup entry.
  pScanNode->pVgroupList = taosMemoryCalloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo));
  if (NULL == pScanNode->pVgroupList) {
    return terrno;
  }
  memcpy(pScanNode->pVgroupList->vgroups, pVgroup, sizeof(SVgroupInfo));
  *pFound = true;
  return TSDB_CODE_SUCCESS;
}
65

66
// Bind pVgroup to the first scan node found under pNode; returns the helper's status code.
static int32_t setScanVgroup(SLogicNode* pNode, const SVgroupInfo* pVgroup) {
  bool    scanSeen = false;  // required by the helper; its final value is not used here
  int32_t code = doSetScanVgroup(pNode, pVgroup, &scanSeen);
  return code;
}
70

71
/**
 * Clone pSubplan once per entry in pSubplan->pVgroupList, bind each clone's
 * scan node to that vgroup, and append the clone to pGroup.
 *
 * @param pCxt      scale-out context (source of subplan ids)
 * @param pSubplan  subplan to replicate; its pVgroupList is dereferenced
 * @param level     level assigned to every clone
 * @param pGroup    list receiving the clones
 * @return TSDB_CODE_SUCCESS, terrno when cloning fails, or the first error
 *         returned by setScanVgroup / nodesListStrictAppend
 */
static int32_t scaleOutByVgroups(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
  int32_t code = TSDB_CODE_SUCCESS;
  for (int32_t i = 0; i < pSubplan->pVgroupList->numOfVgroups; ++i) {
    SLogicSubplan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level);
    if (NULL == pNewSubplan) {
      return terrno;
    }

    code = setScanVgroup(pNewSubplan->pNode, pSubplan->pVgroupList->vgroups + i);
    if (TSDB_CODE_SUCCESS != code) {
      // The clone has not been handed to pGroup yet, so nothing else would
      // release it; destroy it here instead of leaking it.
      nodesDestroyNode((SNode*)pNewSubplan);
      break;
    }

    code = nodesListStrictAppend(pGroup, (SNode*)pNewSubplan);
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
  }
  return code;
}
88

89
// Produce exactly one clone of pSubplan and append it to pGroup.
static int32_t scaleOutForMerge(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
  SLogicSubplan* pClone = singleCloneSubLogicPlan(pCxt, pSubplan, level);
  // NOTE(review): a NULL clone is passed straight through; this relies on
  // nodesListStrictAppend rejecting NULL - confirm against its implementation.
  return nodesListStrictAppend(pGroup, (SNode*)pClone);
}
92

93
static int32_t scaleOutForInsertValues(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level,
1,696,622✔
94
                                       SNodeList* pGroup) {
95
  SVnodeModifyLogicNode* pNode = (SVnodeModifyLogicNode*)pSubplan->pNode;
1,696,622✔
96
  size_t                 numOfVgroups = taosArrayGetSize(pNode->pDataBlocks);
1,696,622✔
97
  int32_t code = 0;
1,696,620✔
98
  for (int32_t i = 0; i < numOfVgroups; ++i) {
3,446,824✔
99
    SLogicSubplan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level);
1,750,202✔
100
    if (NULL == pNewSubplan) {
1,750,205!
101
      return terrno;
×
102
    }
103
    ((SVnodeModifyLogicNode*)pNewSubplan->pNode)->pVgDataBlocks = (SVgDataBlocks*)taosArrayGetP(pNode->pDataBlocks, i);
1,750,205✔
104
    if (TSDB_CODE_SUCCESS != (code = nodesListStrictAppend(pGroup, (SNode*)pNewSubplan))) {
1,750,200!
105
      return code;
×
106
    }
107
  }
108
  return TSDB_CODE_SUCCESS;
1,696,622✔
109
}
110

111
// Insert subplans: a modify node that has children is cloned once (merge
// style); one without children is cloned per data block.
static int32_t scaleOutForInsert(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
  const SVnodeModifyLogicNode* pModify = (const SVnodeModifyLogicNode*)pSubplan->pNode;
  if (NULL != pModify->node.pChildren) {
    return scaleOutForMerge(pCxt, pSubplan, level, pGroup);
  }
  return scaleOutForInsertValues(pCxt, pSubplan, level, pGroup);
}
118

119
// Modify subplans: deletes are replicated per vgroup, every other modify type
// goes through the insert path.
static int32_t scaleOutForModify(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
  const SVnodeModifyLogicNode* pModify = (const SVnodeModifyLogicNode*)pSubplan->pNode;
  if (MODIFY_TABLE_TYPE_DELETE != pModify->modifyType) {
    return scaleOutForInsert(pCxt, pSubplan, level, pGroup);
  }
  return scaleOutByVgroups(pCxt, pSubplan, level, pGroup);
}
126

127
// Scan subplans: replicate per vgroup only when a vgroup list is present and
// the plan context is not flagged as a stream query; otherwise clone once.
static int32_t scaleOutForScan(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
  if (NULL == pSubplan->pVgroupList || pCxt->pPlanCxt->streamQuery) {
    return scaleOutForMerge(pCxt, pSubplan, level, pGroup);
  }
  return scaleOutByVgroups(pCxt, pSubplan, level, pGroup);
}
134

135
/**
 * Clone pSubplan pSubplan->numOfComputeNodes times, appending each clone to
 * pGroup.
 *
 * @return TSDB_CODE_SUCCESS, terrno when cloning fails, or the first error
 *         returned by nodesListStrictAppend
 */
static int32_t scaleOutForCompute(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
  for (int32_t idx = 0; idx < pSubplan->numOfComputeNodes; ++idx) {
    SLogicSubplan* pClone = singleCloneSubLogicPlan(pCxt, pSubplan, level);
    if (NULL == pClone) {
      return terrno;
    }

    int32_t rc = nodesListStrictAppend(pGroup, (SNode*)pClone);
    if (TSDB_CODE_SUCCESS != rc) {
      return rc;
    }
  }
  return TSDB_CODE_SUCCESS;
}
149

150
/**
 * Walk pCurrentGroup and pParentsGroup in lockstep and link each pair: the
 * child is appended to the parent's pChildren and the parent to the child's
 * pParents. Stops at the first failing append.
 *
 * @return TSDB_CODE_SUCCESS or the first error from nodesListMakeAppend
 */
static int32_t pushHierarchicalPlanForCompute(SNodeList* pParentsGroup, SNodeList* pCurrentGroup) {
  int32_t code = TSDB_CODE_SUCCESS;
  SNode*  pParent = NULL;
  SNode*  pChild = NULL;

  FORBOTH(pChild, pCurrentGroup, pParent, pParentsGroup) {
    SLogicSubplan* pParentPlan = (SLogicSubplan*)pParent;
    SLogicSubplan* pChildPlan = (SLogicSubplan*)pChild;

    code = nodesListMakeAppend(&pParentPlan->pChildren, pChild);
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
    code = nodesListMakeAppend(&pChildPlan->pParents, pParent);
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
  }
  return code;
}
165

166
// True when pGroup is non-empty and its first subplan is of type SUBPLAN_TYPE_COMPUTE.
static bool isComputeGroup(SNodeList* pGroup) {
  if (0 != LIST_LENGTH(pGroup)) {
    const SLogicSubplan* pFirst = (const SLogicSubplan*)nodesListGetNode(pGroup, 0);
    return SUBPLAN_TYPE_COMPUTE == pFirst->subplanType;
  }
  return false;
}
172

173
/**
 * Attach every subplan in pCurrentGroup to the parent level.
 *
 * If pParentsGroup is empty on entry (the top level), the subplans themselves
 * are appended to pParentsGroup. Otherwise every child is linked to every
 * parent: the child goes into the parent's pChildren and the parent into the
 * child's pParents.
 *
 * @param pParentsGroup  subplans of the level above, or the top-level list
 * @param pCurrentGroup  subplans produced for the current level
 * @return TSDB_CODE_SUCCESS or the first error reported by an append
 */
static int32_t pushHierarchicalPlanForNormal(SNodeList* pParentsGroup, SNodeList* pCurrentGroup) {
  int32_t code = TSDB_CODE_SUCCESS;
  // Sampled once up front: the top-level branch grows pParentsGroup as it runs.
  bool    topLevel = (0 == LIST_LENGTH(pParentsGroup));
  SNode*  pChild = NULL;
  FOREACH(pChild, pCurrentGroup) {
    if (topLevel) {
      code = nodesListAppend(pParentsGroup, pChild);
    } else {
      SNode* pParent = NULL;
      FOREACH(pParent, pParentsGroup) {
        code = nodesListMakeAppend(&(((SLogicSubplan*)pParent)->pChildren), pChild);
        if (TSDB_CODE_SUCCESS == code) {
          code = nodesListMakeAppend(&(((SLogicSubplan*)pChild)->pParents), pParent);
        }
        if (TSDB_CODE_SUCCESS != code) {
          // Stop at the first failure; continuing would let a later successful
          // iteration overwrite the error code.
          break;
        }
      }
    }
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
  }
  return code;
}
195

196
// Link pCurrentGroup under pParentsGroup: pairwise when the parents form a
// compute group, otherwise through the normal (all-parents) path.
static int32_t pushHierarchicalPlan(SNodeList* pParentsGroup, SNodeList* pCurrentGroup) {
  return isComputeGroup(pParentsGroup) ? pushHierarchicalPlanForCompute(pParentsGroup, pCurrentGroup)
                                       : pushHierarchicalPlanForNormal(pParentsGroup, pCurrentGroup);
}
202

203
/**
 * Scale out pSubplan and, recursively, all of its children.
 *
 * The subplan is replicated into a temporary group according to its
 * subplanType, that group is linked under pParentsGroup, and each child
 * subplan is then processed at level + 1 with the temporary group as its
 * parents.
 *
 * @param pCxt           scale-out context (source of subplan ids)
 * @param pSubplan       subplan to process
 * @param level          level assigned to the clones of pSubplan
 * @param pParentsGroup  clones of the level above, or the top-level list
 * @return TSDB_CODE_SUCCESS or the first error encountered
 */
static int32_t doScaleOut(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pParentsGroup) {
  SNodeList* pGroup = NULL;
  int32_t    code = nodesMakeList(&pGroup);
  if (NULL == pGroup) {
    return code;
  }

  // Replicate this subplan; an unrecognised type leaves the group empty.
  switch (pSubplan->subplanType) {
    case SUBPLAN_TYPE_MERGE:
      code = scaleOutForMerge(pCxt, pSubplan, level, pGroup);
      break;
    case SUBPLAN_TYPE_SCAN:
      code = scaleOutForScan(pCxt, pSubplan, level, pGroup);
      break;
    case SUBPLAN_TYPE_MODIFY:
      code = scaleOutForModify(pCxt, pSubplan, level, pGroup);
      break;
    case SUBPLAN_TYPE_COMPUTE:
      code = scaleOutForCompute(pCxt, pSubplan, level, pGroup);
      break;
    default:
      break;
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = pushHierarchicalPlan(pParentsGroup, pGroup);
    if (TSDB_CODE_SUCCESS == code) {
      SNode* pSub = NULL;
      FOREACH(pSub, pSubplan->pChildren) {
        code = doScaleOut(pCxt, (SLogicSubplan*)pSub, level + 1, pGroup);
        if (TSDB_CODE_SUCCESS != code) {
          break;
        }
      }
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    nodesClearList(pGroup);
  } else {
    // NOTE(review): if the failure happened after pushHierarchicalPlan linked
    // these subplans into the parents, they may still be referenced from
    // there when destroyed here - confirm ownership with the nodes API.
    nodesDestroyList(pGroup);
  }
  return code;
}
249

250
static SQueryLogicPlan* makeQueryLogicPlan() {
2,456,745✔
251
  SQueryLogicPlan* pLogicPlan = NULL;
2,456,745✔
252
  int32_t code = nodesMakeNode(QUERY_NODE_LOGIC_PLAN, (SNode**)&pLogicPlan);
2,456,745✔
253
  if (NULL == pLogicPlan) {
2,456,751!
254
    terrno = code;
×
255
    return NULL;
×
256
  }
257
  pLogicPlan->pTopSubplans = NULL;
2,456,751✔
258
  code = nodesMakeList(&pLogicPlan->pTopSubplans);
2,456,751✔
259
  if (NULL == pLogicPlan->pTopSubplans) {
2,456,754!
260
    nodesDestroyNode((SNode*)pLogicPlan);
×
261
    terrno = code;
×
262
    return NULL;
1✔
263
  }
264
  return pLogicPlan;
2,456,755✔
265
}
266

267
/**
 * Entry point of the scale-out step: build a query logic plan from the
 * subplan tree rooted at pLogicSubplan.
 *
 * Subplan ids are handed out starting at 1 and the root is processed at
 * level 0.
 *
 * @param pCxt           planner context
 * @param pLogicSubplan  root of the subplan tree to scale out
 * @param pLogicPlan     receives the resulting plan on success; left untouched
 *                       on failure
 * @return TSDB_CODE_SUCCESS, terrno when the plan cannot be allocated, or the
 *         error returned by doScaleOut
 */
int32_t scaleOutLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SQueryLogicPlan** pLogicPlan) {
  SQueryLogicPlan* pResult = makeQueryLogicPlan();
  if (NULL == pResult) {
    return terrno;
  }

  SScaleOutContext scaleCxt = {.pPlanCxt = pCxt, .subplanId = 1};
  int32_t          code = doScaleOut(&scaleCxt, pLogicSubplan, 0, pResult->pTopSubplans);
  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode((SNode*)pResult);
    return code;
  }

  *pLogicPlan = pResult;
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc