• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4473

08 Jul 2025 09:38AM UTC coverage: 62.922% (+0.7%) from 62.22%
#4473

push

travis-ci

web-flow
Merge pull request #31712 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

158525 of 321496 branches covered (49.31%)

Branch coverage included in aggregate %.

56 of 60 new or added lines in 13 files covered. (93.33%)

1333 existing lines in 67 files now uncovered.

245526 of 320647 relevant lines covered (76.57%)

17689640.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.27
/source/libs/planner/src/planner.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "planner.h"
17

18
#include "planInt.h"
19
#include "scalar.h"
20
#include "tglobal.h"
21

22
/**
 * Debug helper: serializes a node with nodesNodeToString and prints the
 * resulting text, followed by a newline, to stdout.
 *
 * @param pNode node to serialize and print.
 * @return the code returned by nodesNodeToString; nothing is printed when
 *         serialization does not succeed.
 */
static int32_t debugPrintNode(SNode* pNode) {
  char*         pText = NULL;
  const int32_t rc = nodesNodeToString(pNode, false, &pText, NULL);
  if (TSDB_CODE_SUCCESS != rc) {
    return rc;
  }

  (void)printf("%s\n", pText);
  // The serialized text is released here once it has been printed.
  taosMemoryFree(pText);
  return rc;
}
31

32
/**
 * Writes the serialized form of a query plan to the planner debug log.
 *
 * Does nothing (and returns 0) unless tsQueryPlannerTrace is set.
 *
 * @param pPlan plan to serialize; its queryId is included in the log line.
 * @return 0 when tracing is off, otherwise the code returned by
 *         nodesNodeToString.
 */
static int32_t dumpQueryPlan(SQueryPlan* pPlan) {
  if (tsQueryPlannerTrace) {
    char*   pJson = NULL;
    int32_t rc = nodesNodeToString((SNode*)pPlan, false, &pJson, NULL);
    if (TSDB_CODE_SUCCESS == rc) {
      planDebugL("QID:0x%" PRIx64 ", Query Plan, JsonPlan: %s", pPlan->queryId, pJson);
      taosMemoryFree(pJson);
    }
    return rc;
  }

  return 0;
}
45

46
/**
 * Prints the type name of a logic plan node to stdout, indented by four
 * spaces per level, then recurses into each child one level deeper.
 *
 * @param pNode logic node to print; its pChildren list is walked.
 * @param level indentation depth for this node.
 */
static void printPlanNode(SLogicNode *pNode, int32_t level) {
  const char *pTypeName = nodesNodeName(nodeType((pNode)));

  // Emit one four-space indent unit per level.
  int32_t remaining = level;
  while (remaining > 0) {
    printf("    ");
    --remaining;
  }
  printf("%s\n", pTypeName);

  SNode *pChild = NULL;
  FOREACH(pChild, pNode->pChildren) {
    printPlanNode((SLogicNode *)pChild, level + 1);
  }
}
59

60
/**
 * Prints a logic subplan tree to stdout for debugging.
 *
 * Emits an indented "Sub Plan:" header, prints the subplan's root node tree
 * (when present) via printPlanNode, then recurses into every child subplan
 * with the level increased by two.
 *
 * @param pLogicSubplan subplan to dump.
 * @param level         indentation depth (four spaces per level).
 */
static void dumpLogicPlan(SLogicSubplan* pLogicSubplan, int32_t level) {
  int32_t remaining = level;
  while (remaining > 0) {
    printf("    ");
    --remaining;
  }
  printf("Sub Plan:\n");

  if (pLogicSubplan->pNode) {
    printPlanNode(pLogicSubplan->pNode, level);
  }

  SNode *pSub = NULL;
  FOREACH(pSub, pLogicSubplan->pChildren) {
    dumpLogicPlan((SLogicSubplan *)pSub, level + 2);
  }
}
74

75
/**
 * Builds a query plan from a planner context.
 *
 * Runs the planner stages in order, stopping at the first stage that does not
 * return TSDB_CODE_SUCCESS: acquire the node allocator, create the logic plan,
 * optimize it, split it, scale it out, create the physical plan, validate it,
 * and finally dump it to the debug log.
 *
 * Regardless of the outcome the allocator is released, both intermediate
 * logic-plan objects are destroyed, and terrno is set to the returned code.
 *
 * @param pCxt          planner context; pCxt->allocatorId selects the allocator.
 * @param pPlan         receives the plan produced by createPhysiPlan.
 * @param pExecNodeList forwarded unchanged to createPhysiPlan.
 * @return TSDB_CODE_SUCCESS, or the code of the first failing stage.
 */
int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNodeList) {
  SLogicSubplan*   pRootSubplan = NULL;
  SQueryLogicPlan* pScaledPlan = NULL;
  int32_t          code = nodesAcquireAllocator(pCxt->allocatorId);

  // Single-pass block: each stage runs only if all previous ones succeeded.
  do {
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
    code = createLogicPlan(pCxt, &pRootSubplan);
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
    code = optimizeLogicPlan(pCxt, pRootSubplan);
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
    code = splitLogicPlan(pCxt, pRootSubplan);
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
    code = scaleOutLogicPlan(pCxt, pRootSubplan, &pScaledPlan);
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
    // Debugging hint: dumpLogicPlan() can be called here on the first top
    // subplan of the scaled-out plan to print the logic plan tree.
    code = createPhysiPlan(pCxt, pScaledPlan, pPlan, pExecNodeList);
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
    code = validateQueryPlan(pCxt, *pPlan);
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
    code = dumpQueryPlan(*pPlan);
  } while (0);

  // Cleanup runs on every path, including a failed allocator acquire.
  (void)nodesReleaseAllocator(pCxt->allocatorId);
  nodesDestroyNode((SNode*)pRootSubplan);
  nodesDestroyNode((SNode*)pScaledPlan);

  terrno = code;
  return code;
}
109

110
static int32_t setSubplanExecutionNode(SPhysiNode* pNode, int32_t groupId, SDownstreamSourceNode* pSource) {
4,596,120✔
111
  int32_t code = 0;
4,596,120✔
112
  if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pNode)) {
4,596,120✔
113
    SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pNode;
1,601,919✔
114
    if (groupId >= pExchange->srcStartGroupId && groupId <= pExchange->srcEndGroupId) {
1,601,919✔
115
      SNode* pNew = NULL;
1,004,450✔
116
      code = nodesCloneNode((SNode*)pSource, &pNew);
1,004,450✔
117
      if (TSDB_CODE_SUCCESS == code) {
1,004,443!
118
        return nodesListMakeStrictAppend(&pExchange->pSrcEndPoints, pNew);
1,004,443✔
119
      }
120
    }
121
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == nodeType(pNode)) {
2,994,201✔
122
    SMergePhysiNode* pMerge = (SMergePhysiNode*)pNode;
317,251✔
123
    if (pMerge->srcGroupId <= groupId && pMerge->srcEndGroupId >= groupId) {
317,251✔
124
      SExchangePhysiNode* pExchange =
125
          (SExchangePhysiNode*)nodesListGetNode(pMerge->node.pChildren, pMerge->numOfChannels - 1);
243,340✔
126
      if (1 == pMerge->numOfChannels) {
243,340✔
127
        pMerge->numOfChannels = LIST_LENGTH(pMerge->node.pChildren);
94,018!
128
      } else {
129
        --(pMerge->numOfChannels);
149,322✔
130
      }
131
      SNode* pNew = NULL;
243,340✔
132
      code = nodesCloneNode((SNode*)pSource, &pNew);
243,340✔
133
      if (TSDB_CODE_SUCCESS == code) {
243,340!
134
        return nodesListMakeStrictAppend(&pExchange->pSrcEndPoints, pNew);
243,340✔
135
      }
136
    }
137
  }
138

139
  SNode* pChild = NULL;
3,348,330✔
140
  if (TSDB_CODE_SUCCESS == code) {
3,348,330!
141
    FOREACH(pChild, pNode->pChildren) {
6,696,605✔
142
      if (TSDB_CODE_SUCCESS != (code = setSubplanExecutionNode((SPhysiNode*)pChild, groupId, pSource))) {
3,348,262!
143
        return code;
×
144
      }
145
    }
146
  }
147
  return code;
3,348,327✔
148
}
149

150
int32_t qContinuePlanPostQuery(void* pPostPlan) {
438✔
151
  // TODO
152
  return TSDB_CODE_SUCCESS;
438✔
153
}
154

155
/**
 * Public entry point: logs the request and registers pSource on the matching
 * exchange node found under the subplan's root physical node.
 *
 * @param subplan subplan whose pNode tree is searched.
 * @param groupId group id of the source subplan.
 * @param pSource downstream source to register.
 * @return the result of setSubplanExecutionNode.
 */
int32_t qSetSubplanExecutionNode(SSubplan* subplan, int32_t groupId, SDownstreamSourceNode* pSource) {
  planDebug("QID:0x%" PRIx64 ", set subplan execution node, groupId:%d", subplan->id.queryId, groupId);

  int32_t rc = setSubplanExecutionNode(subplan->pNode, groupId, pSource);
  return rc;
}
159

160
/**
 * Recursively removes the source end points registered under a physical node.
 *
 * - Exchange node: its pSrcEndPoints list is destroyed.
 * - Merge node: numOfChannels is reset to the number of children and the
 *   pSrcEndPoints list of every child (cast to an exchange node) is destroyed.
 *
 * In all cases the children of pNode are then processed recursively.
 *
 * @param pNode physical plan node to clean.
 */
static void clearSubplanExecutionNode(SPhysiNode* pNode) {
  SNode* pIter = NULL;

  switch (nodeType(pNode)) {
    case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: {
      SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pNode;
      NODES_DESTORY_LIST(pExchange->pSrcEndPoints);
      break;
    }
    case QUERY_NODE_PHYSICAL_PLAN_MERGE: {
      SMergePhysiNode* pMerge = (SMergePhysiNode*)pNode;
      pMerge->numOfChannels = LIST_LENGTH(pMerge->node.pChildren);
      FOREACH(pIter, pMerge->node.pChildren) {
        NODES_DESTORY_LIST(((SExchangePhysiNode*)pIter)->pSrcEndPoints);
      }
      break;
    }
    default:
      break;
  }

  FOREACH(pIter, pNode->pChildren) {
    clearSubplanExecutionNode((SPhysiNode*)pIter);
  }
}
15,239✔
174

175
/**
 * Public entry point: logs the request and clears every source end point
 * registered under the subplan's root physical node.
 *
 * @param pSubplan subplan to clean; its id is used for the log line.
 */
void qClearSubplanExecutionNode(SSubplan* pSubplan) {
  planDebug("QID:0x%" PRIx64 ", clear subplan execution node, groupId:%d", pSubplan->id.queryId,
            pSubplan->id.groupId);

  SPhysiNode* pRoot = pSubplan->pNode;
  clearSubplanExecutionNode(pRoot);
}
8,086✔
179

180
int32_t qSubPlanToString(const SSubplan* pSubplan, char** pStr, int32_t* pLen) {
8,879✔
181
  if (SUBPLAN_TYPE_MODIFY == pSubplan->subplanType && NULL == pSubplan->pNode) {
8,879!
182
    SDataInserterNode* insert = (SDataInserterNode*)pSubplan->pDataSink;
×
183
    *pLen = insert->size;
×
184
    *pStr = insert->pData;
×
185
    insert->pData = NULL;
×
186
    return TSDB_CODE_SUCCESS;
×
187
  }
188
  return nodesNodeToString((const SNode*)pSubplan, false, pStr, pLen);
8,879✔
189
}
190

191
/**
 * Rebuilds a subplan from its string form by delegating to nodesStringToNode.
 *
 * @param pStr     serialized subplan text.
 * @param pSubplan receives the resulting subplan.
 * @return the code returned by nodesStringToNode.
 */
int32_t qStringToSubplan(const char* pStr, SSubplan** pSubplan) {
  SNode** ppNode = (SNode**)pSubplan;
  return nodesStringToNode(pStr, ppNode);
}
8,408✔
192

193
int32_t qSubPlanToMsg(const SSubplan* pSubplan, char** pStr, int32_t* pLen) {
11,774,678✔
194
  if (NULL == pSubplan) {
11,774,678✔
195
    return terrno = TSDB_CODE_INVALID_PARA;
10✔
196
  }
197
  
198
  if (SUBPLAN_TYPE_MODIFY == pSubplan->subplanType && NULL == pSubplan->pNode) {
11,774,668✔
199
    SDataInserterNode* insert = (SDataInserterNode*)pSubplan->pDataSink;
9,815,356✔
200
    *pLen = insert->size;
9,815,356✔
201
    *pStr = insert->pData;
9,815,356✔
202
    insert->pData = NULL;
9,815,356✔
203
    return TSDB_CODE_SUCCESS;
9,815,356✔
204
  }
205
  return nodesNodeToMsg((const SNode*)pSubplan, pStr, pLen);
1,959,312✔
206
}
207

208
/**
 * Rebuilds a subplan from a message buffer by delegating to nodesMsgToNode.
 *
 * @param pStr     message buffer.
 * @param len      length passed through to nodesMsgToNode.
 * @param pSubplan receives the resulting subplan.
 * @return the code returned by nodesMsgToNode.
 */
int32_t qMsgToSubplan(const char* pStr, int32_t len, SSubplan** pSubplan) {
  SNode** ppNode = (SNode**)pSubplan;
  return nodesMsgToNode(pStr, len, ppNode);
}
211

212
SQueryPlan* qStringToQueryPlan(const char* pStr) {
2,000✔
213
  SQueryPlan* pPlan = NULL;
2,000✔
214
  if (TSDB_CODE_SUCCESS != nodesStringToNode(pStr, (SNode**)&pPlan)) {
2,000!
215
    return NULL;
×
216
  }
217
  return pPlan;
2,000✔
218
}
219

220
/**
 * Destroys a query plan by passing it to nodesDestroyNode.
 *
 * @param pPlan plan to destroy.
 */
void qDestroyQueryPlan(SQueryPlan* pPlan) {
  SNode* pAsNode = (SNode*)pPlan;
  nodesDestroyNode(pAsNode);
}
10,825,150✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc