• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5034

24 Apr 2026 11:25AM UTC coverage: 73.058%. Remained the same
#5034

push

travis-ci

web-flow
merge: from main to 3.0 branch #35224

merge: from main to 3.0 branch[manual-only]

1336 of 1975 new or added lines in 48 files covered. (67.65%)

14149 existing lines in 164 files now uncovered.

275896 of 377640 relevant lines covered (73.06%)

132944440.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.72
/source/libs/planner/src/planSpliter.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "functionMgt.h"
17
#include "planInt.h"
18
#include "taoserror.h"
19
#include "tglobal.h"
20

21
/*
 * Bit-flag helpers for a subplan's splitFlag field.
 *
 * Every macro argument is parenthesized so that compound expressions
 * (e.g. SPLIT_FLAG_MASK(a | b)) expand with the intended precedence, and
 * SPLIT_FLAG_SET_MASK is wrapped so it behaves as a single expression.
 */
#define SPLIT_FLAG_MASK(n) (1 << (n))

#define SPLIT_FLAG_STABLE_SPLIT SPLIT_FLAG_MASK(0)
#define SPLIT_FLAG_INSERT_SPLIT SPLIT_FLAG_MASK(1)

#define SPLIT_FLAG_SET_MASK(val, mask)  ((val) |= (mask))
#define SPLIT_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0)
28

29
// State shared by the split helpers in this file.
typedef struct SSplitContext {
30
  SPlanContext* pPlanCxt;  // planner context; consulted for stream-calc specific handling
31
  uint64_t      queryId;   // copied into the id of every subplan created by the helpers
32
  int32_t       groupId;   // copied into new subplan ids and into exchange source group ranges
33
  bool          split;     // NOTE(review): presumably records that a rule performed a split; not written in the visible code
34
} SSplitContext;
35

36
// Signature of a split function: receives the split context and a logic subplan, returns an int32_t status code.
typedef int32_t (*FSplit)(SSplitContext* pCxt, SLogicSubplan* pSubplan);
37

38
// Pairs a rule name with the function implementing it.
typedef struct SSplitRule {
39
  char*  pName;      // rule name
40
  FSplit splitFunc;  // function that applies the rule
41
} SSplitRule;
42

43
// Read-only bundle of the split context and the subplan being examined while looking for a split node.
typedef struct SFindSplitNodeCtx {
44
  const SSplitContext* pSplitCtx;  // split context of the current pass
45
  const SLogicSubplan* pSubplan;   // subplan that owns the node being tested
46
} SFindSplitNodeCtx;
47

48
// Predicate used by splMatchByNode: called on each node that is not yet marked splitDone;
// returning true stops the search. pInfo is an opaque pointer forwarded from the caller.
typedef bool (*FSplFindSplitNode)(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode, void* pInfo);
49

50
/*
 * Make a byte-for-byte copy of a vgroup list.
 *
 * pDst receives the newly allocated copy, or NULL when pSrc is NULL or the
 * allocation fails. Returns TSDB_CODE_SUCCESS, or terrno when the allocation
 * fails.
 */
static int32_t cloneVgroups(SVgroupsInfo **pDst, SVgroupsInfo* pSrc) {
  if (NULL != pSrc) {
    int32_t       nBytes = VGROUPS_INFO_SIZE(pSrc);
    SVgroupsInfo* pCopy = taosMemoryMalloc(nBytes);

    *pDst = pCopy;
    if (NULL == pCopy) {
      return terrno;
    }
    memcpy(pCopy, pSrc, nBytes);
    return TSDB_CODE_SUCCESS;
  }

  // Nothing to copy.
  *pDst = NULL;
  return TSDB_CODE_SUCCESS;
}
63

64
static int32_t stbSplCreateMergeKeys(SNodeList* pSortKeys, SNodeList* pTargets, SNodeList** pOutput);
65
static int32_t stbSplCreateMergeKeysByExpr(SNode* pExpr, EOrder order, SNodeList** pMergeKeys);
66

67
/*
 * Swap the vgroup list of the scan reachable from pNode into pSubplan.
 *
 * Walks down through nodes that have exactly one child until it meets a scan
 * node or a VTB_SCAN dynamic query control node (whose vgroup list is swapped
 * with pSubplan->pVgroupList), a virtual table scan (left untouched), or a
 * node that does not have exactly one child (nothing is done).
 */
static void splSetSubplanVgroups(SLogicSubplan* pSubplan, SLogicNode* pNode) {
  SLogicNode* pCur = pNode;

  for (;;) {
    if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pCur)) {
      TSWAP(pSubplan->pVgroupList, ((SScanLogicNode*)pCur)->pVgroupList);
      return;
    }

    if (QUERY_NODE_LOGIC_PLAN_VIRTUAL_TABLE_SCAN == nodeType(pCur)) {
      // do nothing, since virtual table scan node is SUBPLAN_TYPE_MERGE
      return;
    }

    if (QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL == nodeType(pCur) &&
        ((SDynQueryCtrlLogicNode*)pCur)->qType == DYN_QTYPE_VTB_SCAN) {
      TSWAP(pSubplan->pVgroupList, ((SDynQueryCtrlLogicNode*)pCur)->vtbScan.pVgroupList);
      return;
    }

    if (1 != LIST_LENGTH(pCur->pChildren)) {
      return;
    }
    pCur = (SLogicNode*)nodesListGetNode(pCur->pChildren, 0);
  }
}
248,115,851✔
80

81
/*
 * Create a subplan rooted at pNode and mark it with the given split flag.
 *
 * The result code of the node allocation is stored in terrno. Returns the new
 * subplan, or NULL when the allocation produced no node.
 */
static SLogicSubplan* splCreateScanSubplan(SSplitContext* pCxt, SLogicNode* pNode, int32_t flag) {
  SLogicSubplan* pNew = NULL;

  terrno = nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN, (SNode**)&pNew);
  if (NULL != pNew) {
    pNew->id.queryId = pCxt->queryId;
    pNew->id.groupId = pCxt->groupId;

    // TODO(smj):refact here.
    if (QUERY_NODE_LOGIC_PLAN_VIRTUAL_TABLE_SCAN == nodeType(pNode)) {
      pNew->subplanType = SUBPLAN_TYPE_MERGE;
    } else {
      pNew->subplanType = SUBPLAN_TYPE_SCAN;
    }

    // The node becomes the root of the new subplan.
    pNew->pNode = pNode;
    pNode->pParent = NULL;

    splSetSubplanVgroups(pNew, pNode);
    SPLIT_FLAG_SET_MASK(pNew->splitFlag, flag);
  }
  return pNew;
}
97

98
/*
 * Report whether a scan node is found on the first-child chain below pNode
 * (pNode itself included).
 *
 * NOTE(review): only the FIRST child of each node is followed, because the
 * loop body returns on its first iteration. Scans reachable only through a
 * second or later child are not detected - confirm this is intended.
 */
static bool splHasScan(SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
    return true;
  }

  SNode* pFirst = NULL;
  FOREACH(pFirst, pNode->pChildren) {
    // The recursive call performs the scan-type test on the child itself.
    return splHasScan((SLogicNode*)pFirst);
  }

  return false;
}
113

114
/* Classify the subplan as SCAN when splHasScan finds a scan under its root, otherwise as MERGE. */
static void splSetSubplanType(SLogicSubplan* pSubplan) {
  if (splHasScan(pSubplan->pNode)) {
    pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
  } else {
    pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  }
}
102,415,029✔
117

118
/*
 * Create a subplan rooted at pNode and derive its type from the node tree.
 *
 * On success *ppSubplan receives the new subplan. When the allocation yields
 * no node, *ppSubplan is left untouched and the allocation's code is returned.
 */
static int32_t splCreateSubplan(SSplitContext* pCxt, SLogicNode* pNode, SLogicSubplan** ppSubplan) {
  SLogicSubplan* pNew = NULL;
  int32_t        code = nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN, (SNode**)&pNew);

  if (NULL != pNew) {
    pNew->id.queryId = pCxt->queryId;
    pNew->id.groupId = pCxt->groupId;

    // Detach the node from its former parent; it is now a subplan root.
    pNew->pNode = pNode;
    pNode->pParent = NULL;

    splSetSubplanType(pNew);
    *ppSubplan = pNew;
  }
  return code;
}
132

133
/*
 * Build an exchange node that stands in for pChild in the parent subplan.
 *
 * The exchange copies the child's precision, dynamicOp flag, targets and limit
 * clause. When the child has a limit, the child's own limit is then widened
 * to (limit + offset) with offset reset to 0, while the exchange keeps the
 * cloned original limit/offset.
 *
 * On success *pOutput receives the exchange node. On failure the partially
 * built node is destroyed and the error code is returned.
 */
static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pChild, SExchangeLogicNode** pOutput) {
  int32_t             code = TSDB_CODE_SUCCESS;
  SExchangeLogicNode* pExchange = NULL;

  PLAN_ERR_JRET(nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE, (SNode**)&pExchange));

  // The exchange reads from exactly the current group.
  pExchange->srcStartGroupId = pCxt->groupId;
  pExchange->srcEndGroupId = pCxt->groupId;
  pExchange->node.precision = pChild->precision;
  pExchange->node.dynamicOp = pChild->dynamicOp;
  pExchange->node.pTargets = NULL;
  PLAN_ERR_JRET(nodesCloneList(pChild->pTargets, &pExchange->node.pTargets));

  if (NULL != pChild->pLimit) {
    pExchange->node.pLimit = NULL;
    PLAN_ERR_JRET(nodesCloneNode(pChild->pLimit, &pExchange->node.pLimit));

    // Fold the offset into the child's limit; the clone above keeps the original values.
    SLimitNode* pChildLimit = (SLimitNode*)pChild->pLimit;
    if (pChildLimit->offset) {
      if (pChildLimit->limit) {
        pChildLimit->limit->datum.i += pChildLimit->offset->datum.i;
      }
      pChildLimit->offset->datum.i = 0;
    }
  }

  *pOutput = pExchange;
  return code;

_return:
  planError("failed to create exchange node, code:%d", code);
  nodesDestroyNode((SNode*)pExchange);
  return code;
}
165

166
static int32_t splCreateExchangeNodeForSubplan(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode,
82,241,745✔
167
                                               ESubplanType subplanType, bool seqScan) {
168
  SExchangeLogicNode* pExchange = NULL;
82,241,745✔
169
  int32_t             code = TSDB_CODE_SUCCESS;
82,241,745✔
170

171
  PLAN_ERR_JRET(splCreateExchangeNode(pCxt, pSplitNode, &pExchange));
82,241,745✔
172

173
  pExchange->dynTbname = nodeType(pSplitNode) == QUERY_NODE_LOGIC_PLAN_SCAN ? ((SScanLogicNode*)pSplitNode)->phTbnameScan : false;
82,246,550✔
174
  pExchange->seqRecvData = seqScan;
82,246,550✔
175

176
  PLAN_ERR_JRET(replaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pExchange));
82,246,550✔
177
  pSubplan->subplanType = subplanType;
82,250,831✔
178

179
  return code;
82,250,831✔
180

181
_return:
×
182
  planError("failed to create exchange node for subplan, code:%d", code);
×
183
  nodesDestroyNode((SNode*)pExchange);
×
184
  return code;
×
185
}
186

187
/*
 * Decide whether the logic tree under pLogicNode consumes data from the
 * subplan group groupId.
 *
 * Exchange and merge nodes answer directly from their source group range;
 * any other node answers true as soon as one of its children does.
 */
static bool splIsChildSubplan(SLogicNode* pLogicNode, int32_t groupId) {
  switch (nodeType(pLogicNode)) {
    case QUERY_NODE_LOGIC_PLAN_EXCHANGE: {
      SExchangeLogicNode* pExchange = (SExchangeLogicNode*)pLogicNode;
      return groupId >= pExchange->srcStartGroupId && groupId <= pExchange->srcEndGroupId;
    }
    case QUERY_NODE_LOGIC_PLAN_MERGE: {
      SMergeLogicNode* pMerge = (SMergeLogicNode*)pLogicNode;
      return pMerge->srcGroupId <= groupId && pMerge->srcEndGroupId >= groupId;
    }
    default:
      break;
  }

  SNode* pSub = NULL;
  FOREACH(pSub, pLogicNode->pChildren) {
    if (splIsChildSubplan((SLogicNode*)pSub, groupId)) {
      return true;
    }
  }
  return false;
}
207

208
/*
 * Move every subplan in pChildren that feeds pParent (per splIsChildSubplan)
 * into pParent->pChildren.
 *
 * A moved entry is first detached from its list cell (so erasing the cell does
 * not touch the subplan) and the cell is then erased from pChildren.
 * Returns the first append error, or TSDB_CODE_SUCCESS.
 */
static int32_t splMountSubplan(SLogicSubplan* pParent, SNodeList* pChildren) {
  SNode* pCandidate = NULL;
  WHERE_EACH(pCandidate, pChildren) {
    if (!splIsChildSubplan(pParent->pNode, ((SLogicSubplan*)pCandidate)->id.groupId)) {
      WHERE_NEXT;
      continue;
    }

    int32_t code = nodesListMakeAppend(&pParent->pChildren, pCandidate);
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }
    REPLACE_NODE(NULL);
    ERASE_NODE(pChildren);
  }
  return TSDB_CODE_SUCCESS;
}
225

226
/*
 * Pre-order search of the logic tree under pNode for a node accepted by func.
 *
 * Nodes already marked splitDone are not offered to func, but their children
 * are still searched. Returns true as soon as func accepts a node.
 */
static bool splMatchByNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode, FSplFindSplitNode func,
                           void* pInfo) {
  bool matched = !pNode->splitDone && func(pCxt, pSubplan, pNode, pInfo);

  if (!matched) {
    SNode* pSub = NULL;
    FOREACH(pSub, pNode->pChildren) {
      if (splMatchByNode(pCxt, pSubplan, (SLogicNode*)pSub, func, pInfo)) {
        matched = true;
        break;
      }
    }
  }
  return matched;
}
239

240
/*
 * Search pSubplan and, recursively, its child subplans for a node accepted by
 * func.
 *
 * A subplan whose splitFlag already carries `flag` has its own logic tree
 * skipped, but its child subplans are still visited.
 */
static bool splMatch(SSplitContext* pCxt, SLogicSubplan* pSubplan, int32_t flag, FSplFindSplitNode func, void* pInfo) {
  bool alreadySplit = SPLIT_FLAG_TEST_MASK(pSubplan->splitFlag, flag);

  if (!alreadySplit && splMatchByNode(pCxt, pSubplan, pSubplan->pNode, func, pInfo)) {
    return true;
  }

  SNode* pSub = NULL;
  FOREACH(pSub, pSubplan->pChildren) {
    if (splMatch(pCxt, (SLogicSubplan*)pSub, flag, func, pInfo)) {
      return true;
    }
  }
  return false;
}
254

255
/* Point the pParent link of every direct child of pNode back at pNode. */
static void splSetParent(SLogicNode* pNode) {
  SNode* pSub = NULL;
  FOREACH(pSub, pNode->pChildren) {
    SLogicNode* pLogicChild = (SLogicNode*)pSub;
    pLogicChild->pParent = pNode;
  }
}
54,838,858✔
259

260
// Result of the super-table split search; filled in by stbSplFindSplitNode when a node qualifies.
typedef struct SStableSplitInfo {
261
  SLogicNode*    pSplitNode;  // node at which the plan should be split
262
  SLogicSubplan* pSubplan;    // subplan that contains pSplitNode
263
} SStableSplitInfo;
264

265
static bool stbSplHasGatherExecFunc(const SNodeList* pFuncs) {
235,021,510✔
266
  SNode* pFunc = NULL;
235,021,510✔
267
  FOREACH(pFunc, pFuncs) {
548,084,881✔
268
    if (!fmIsWindowPseudoColumnFunc(((SFunctionNode*)pFunc)->funcId) &&
328,290,860✔
269
        !fmIsDistExecFunc(((SFunctionNode*)pFunc)->funcId)) {
323,724,478✔
270
      return true;
15,233,562✔
271
    }
272
  }
273
  return false;
219,794,073✔
274
}
275

276
/*
 * A scan qualifies for a super-table split when it either covers more than
 * one vgroup or is flagged needSplit, and none of the exclusions apply:
 * partition-tbname / partition-rows placeholders, placeholder tbname scans,
 * or virtual super-table scans.
 */
static bool stbSplIsMultiTbScan(SScanLogicNode* pScan) {
  bool spansVgroups = (NULL != pScan->pVgroupList) && (pScan->pVgroupList->numOfVgroups > 1);

  if (!spansVgroups && !pScan->needSplit) {
    return false;
  }
  if (SP_PARTITION_TBNAME == pScan->placeholderType || SP_PARTITION_ROWS == pScan->placeholderType) {
    return false;
  }
  return !pScan->phTbnameScan && !pScan->virtualStableScan;
}
282

283
/*
 * Check whether pNode sits directly on top of a splittable scan.
 *
 * pNode must have exactly one child. A single partition node between pNode
 * and the scan is looked through (it too must have exactly one child).
 *  - child is a scan accepted by stbSplIsMultiTbScan -> true
 *  - child is any other scan -> the scan's needSplit flag when pNode is an
 *    aggregate or an interval window, otherwise false
 *  - child is an external window -> the answer for that window node
 *  - anything else -> false
 */
static bool stbSplHasMultiTbScan(SLogicNode* pNode) {
  if (1 != LIST_LENGTH(pNode->pChildren)) {
    return false;
  }

  SNode* pBelow = nodesListGetNode(pNode->pChildren, 0);
  if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pBelow)) {
    SNodeList* pPartChildren = ((SLogicNode*)pBelow)->pChildren;
    if (1 != LIST_LENGTH(pPartChildren)) {
      return false;
    }
    pBelow = nodesListGetNode(pPartChildren, 0);
  }

  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pBelow)) {
    SScanLogicNode* pScan = (SScanLogicNode*)pBelow;
    if (stbSplIsMultiTbScan(pScan)) {
      return true;
    }

    bool aggOrInterval = QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) ||
                         (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode) &&
                          ((SWindowLogicNode*)pNode)->winType == WINDOW_TYPE_INTERVAL);
    if (aggOrInterval) {
      return pScan->needSplit;
    }
    return false;
  }

  if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pBelow) &&
      ((SWindowLogicNode*)pBelow)->winType == WINDOW_TYPE_EXTERNAL) {
    return stbSplHasMultiTbScan((SLogicNode*)pBelow);
  }
  return false;
}
310

311
/* True when pNode has exactly one child and that child is a scan accepted by stbSplIsMultiTbScan. */
static bool stbSplIsMultiTbScanChild(SLogicNode* pNode) {
  if (1 == LIST_LENGTH(pNode->pChildren)) {
    SNode* pOnly = nodesListGetNode(pNode->pChildren, 0);
    if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pOnly)) {
      return stbSplIsMultiTbScan((SScanLogicNode*)pOnly);
    }
  }
  return false;
}
318

319
/*
 * Decide whether a window node should be split.
 *
 *  - INTERVAL: no gather-only function and a splittable scan below
 *  - EXTERNAL: additionally requires a non-NULL function list
 *  - SESSION / STATE / COUNT / EVENT: a splittable scan below
 *  - any other window type: never
 */
static bool stbSplNeedSplitWindow(SLogicNode* pNode) {
  SWindowLogicNode* pWin = (SWindowLogicNode*)pNode;

  switch (pWin->winType) {
    case WINDOW_TYPE_INTERVAL:
      return !stbSplHasGatherExecFunc(pWin->pFuncs) && stbSplHasMultiTbScan(pNode);
    case WINDOW_TYPE_EXTERNAL:
      return pWin->pFuncs && !stbSplHasGatherExecFunc(pWin->pFuncs) && stbSplHasMultiTbScan(pNode);
    case WINDOW_TYPE_SESSION:
    case WINDOW_TYPE_STATE:
    case WINDOW_TYPE_COUNT:
    case WINDOW_TYPE_EVENT:
      return stbSplHasMultiTbScan(pNode);
    default:
      return false;
  }
}
335

336
/*
 * A join is split only when it is not a single-table join, does not use the
 * hash algorithm, and every direct child is either a scan or another join.
 */
static bool stbSplNeedSplitJoin(SJoinLogicNode* pJoin) {
  if (pJoin->isSingleTableJoin) {
    return false;
  }
  if (JOIN_ALGO_HASH == pJoin->joinAlgo) {
    return false;
  }

  SNode* pInput = NULL;
  FOREACH(pInput, pJoin->node.pChildren) {
    bool scanOrJoin =
        QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pInput) || QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pInput);
    if (!scanOrJoin) {
      return false;
    }
  }
  return true;
}
348

349
/*
 * True when pNode's only child - optionally looking through one partition
 * node that itself has exactly one child - is a scan of type
 * SCAN_TYPE_TABLE_COUNT.
 */
static bool stbSplIsTableCountQuery(SLogicNode* pNode) {
  if (1 != LIST_LENGTH(pNode->pChildren)) {
    return false;
  }

  SNode* pBelow = nodesListGetNode(pNode->pChildren, 0);
  if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pBelow)) {
    SNodeList* pPartChildren = ((SLogicNode*)pBelow)->pChildren;
    if (1 != LIST_LENGTH(pPartChildren)) {
      return false;
    }
    pBelow = nodesListGetNode(pPartChildren, 0);
  }

  if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pBelow)) {
    return false;
  }
  return SCAN_TYPE_TABLE_COUNT == ((SScanLogicNode*)pBelow)->scanType;
}
362

363
/*
 * Per-node-type decision on whether pNode is a super-table split point.
 * pCtx is accepted for the caller's convenience and is not read here.
 */
static bool stbSplNeedSplit(SFindSplitNodeCtx* pCtx, SLogicNode* pNode) {
  switch (nodeType(pNode)) {
    case QUERY_NODE_LOGIC_PLAN_SCAN:
      return stbSplIsMultiTbScan((SScanLogicNode*)pNode);

    case QUERY_NODE_LOGIC_PLAN_JOIN:
      return stbSplNeedSplitJoin((SJoinLogicNode*)pNode);

    case QUERY_NODE_LOGIC_PLAN_PARTITION:
      return stbSplIsMultiTbScanChild(pNode);

    case QUERY_NODE_LOGIC_PLAN_AGG: {
      SAggLogicNode* pAgg = (SAggLogicNode*)pNode;
      // Gather-only functions block the split unless the aggregate is a partition-table aggregate.
      if (stbSplHasGatherExecFunc(pAgg->pAggFuncs) && !isPartTableAgg(pAgg)) {
        return false;
      }
      if (!stbSplHasMultiTbScan(pNode)) {
        return false;
      }
      return !stbSplIsTableCountQuery(pNode);
    }

    case QUERY_NODE_LOGIC_PLAN_WINDOW:
      return stbSplNeedSplitWindow(pNode);

    case QUERY_NODE_LOGIC_PLAN_SORT: {
      if (1 == LIST_LENGTH(pNode->pChildren)) {
        SLogicNode*       pBelow = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
        SWindowLogicNode* pWin = (SWindowLogicNode*)pBelow;
        // A sort above a splittable external window (without partition calculation) is not split itself.
        if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pBelow) && WINDOW_TYPE_EXTERNAL == pWin->winType &&
            !pWin->calcWithPartition && stbSplNeedSplitWindow(pBelow)) {
          return false;
        }
      }
      return stbSplHasMultiTbScan(pNode);
    }

    default:
      return false;
  }
}
394

395
static bool stbSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
1,705,757,123✔
396
                                SStableSplitInfo* pInfo) {
397
  SFindSplitNodeCtx ctx = {.pSplitCtx = pCxt, .pSubplan = pSubplan};
1,705,757,123✔
398
  if (stbSplNeedSplit(&ctx, pNode)) {
1,705,757,723✔
399
    pInfo->pSplitNode = pNode;
110,033,235✔
400
    pInfo->pSubplan = pSubplan;
110,033,235✔
401
    return true;
110,032,684✔
402
  }
403
  return false;
1,595,796,228✔
404
}
405

406
/*
 * Split every function in pFuncs into its distributed parts and append them
 * to the output lists.
 *
 * pFuncs        input function list; every entry must be a function node
 * pPartialFuncs receives the partial-stage functions
 * pMidFuncs     optional; when NULL any middle-stage function is discarded
 * pMergeFuncs   receives the merge-stage functions
 *
 * Window pseudo columns and placeholder functions are cloned unchanged into
 * each stage; every other function is decomposed by fmGetDistMethod.
 * Returns TSDB_CODE_PLAN_INTERNAL_ERROR when a non-function node is found,
 * otherwise the first error met or TSDB_CODE_SUCCESS. Lists may already hold
 * the functions processed before a failure.
 */
static int32_t stbSplRewriteFuns(const SNodeList* pFuncs, SNodeList** pPartialFuncs, SNodeList** pMidFuncs, SNodeList** pMergeFuncs) {
  SNode* pNode = NULL;
  FOREACH(pNode, pFuncs) {
    SFunctionNode* pPartFunc = NULL;
    SFunctionNode* pMidFunc = NULL;
    SFunctionNode* pMergeFunc = NULL;
    int32_t        code = TSDB_CODE_SUCCESS;

    if (nodeType(pNode) != QUERY_NODE_FUNCTION) {
      planError("%s failed, expect function node in function list, actual nodeType:%d", __FUNCTION__, nodeType(pNode));
      return TSDB_CODE_PLAN_INTERNAL_ERROR;
    }

    SFunctionNode* pFunc = (SFunctionNode*)pNode;
    if (fmIsWindowPseudoColumnFunc(pFunc->funcId) || fmIsPlaceHolderFunc(pFunc->funcId)) {
      code = nodesCloneNode(pNode, (SNode**)&pPartFunc);
      if (TSDB_CODE_SUCCESS == code) {
        code = nodesCloneNode(pNode, (SNode**)&pMergeFunc);
      }
      // The middle-stage clone is only needed when the caller collects middle functions.
      if (TSDB_CODE_SUCCESS == code && pMidFuncs != NULL) {
        code = nodesCloneNode(pNode, (SNode**)&pMidFunc);
      }
    } else {
      code = fmGetDistMethod(pFunc, &pPartFunc, &pMidFunc, &pMergeFunc);
    }

    if (TSDB_CODE_SUCCESS == code) {
      code = nodesListMakeStrictAppend(pPartialFuncs, (SNode*)pPartFunc);
    }
    if (TSDB_CODE_SUCCESS == code) {
      if (pMidFuncs != NULL) {
        code = nodesListMakeStrictAppend(pMidFuncs, (SNode*)pMidFunc);
      } else {
        nodesDestroyNode((SNode*)pMidFunc);
      }
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = nodesListMakeStrictAppend(pMergeFuncs, (SNode*)pMergeFunc);
    }
    if (TSDB_CODE_SUCCESS != code) {
      // NOTE(review): if the strict-append helpers already free the node they fail to append,
      // the node involved is released twice here - verify against their contract.
      nodesDestroyNode((SNode*)pPartFunc);
      nodesDestroyNode((SNode*)pMidFunc);
      nodesDestroyNode((SNode*)pMergeFunc);
      return code;
    }
  }
  return TSDB_CODE_SUCCESS;
}
457

458
/*
 * Make sure *pFuncs contains a _wstart function and report its position.
 *
 * When a function of type FUNCTION_TYPE_WSTART is already present, its index
 * is stored in *pIndex. Otherwise a new "_wstart" function is created with an
 * alias derived from its own address, given the requested precision, and
 * appended; *pIndex then receives the index of the appended entry.
 * *pIndex is only written on success.
 */
static int32_t stbSplAppendWStart(SNodeList** pFuncs, int32_t* pIndex, uint8_t precision) {
  int32_t pos = 0;
  SNode*  pItem = NULL;
  FOREACH(pItem, *pFuncs) {
    if (QUERY_NODE_FUNCTION == nodeType(pItem) && FUNCTION_TYPE_WSTART == ((SFunctionNode*)pItem)->funcType) {
      *pIndex = pos;
      return TSDB_CODE_SUCCESS;
    }
    pos++;
  }

  SFunctionNode* pWStart = NULL;
  int32_t        code = nodesMakeNode(QUERY_NODE_FUNCTION, (SNode**)&pWStart);
  if (NULL == pWStart) {
    return code;
  }

  tstrncpy(pWStart->functionName, "_wstart", TSDB_FUNC_NAME_LEN);

  // Alias: "<function name>.<node address>", passed through taosHashBinary.
  char    name[TSDB_COL_NAME_LEN + TSDB_POINTER_PRINT_BYTES + TSDB_NAME_DELIMITER_LEN + 1] = {0};
  int32_t len = snprintf(name, sizeof(name) - 1, "%s.%" PRId64, pWStart->functionName, (int64_t)pWStart);
  (void)taosHashBinary(name, len, sizeof(name));
  tstrncpy(pWStart->node.aliasName, name, TSDB_COL_NAME_LEN);
  pWStart->node.resType.precision = precision;

  code = fmGetFuncInfo(pWStart, NULL, 0);
  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode((SNode*)pWStart);
    return code;
  }

  code = nodesListMakeStrictAppend(pFuncs, (SNode*)pWStart);
  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode((SNode*)pWStart);
    return code;
  }

  *pIndex = pos;
  return code;
}
493

494
/*
 * Make sure the window's function list contains a _wend function and report
 * its position in *pIndex.
 *
 * When a FUNCTION_TYPE_WEND function is already present, only *pIndex is set.
 * Otherwise a new "_wend" function (aliased by its own address) is appended
 * to pWin->pFuncs and a matching column is added to the window's targets.
 *
 * Returns TSDB_CODE_SUCCESS or the first error met. If resolving the function
 * info fails, the new node is released here because no list owns it yet.
 */
static int32_t stbSplAppendWEnd(SWindowLogicNode* pWin, int32_t* pIndex) {
  int32_t index = 0;
  SNode*  pFunc = NULL;
  FOREACH(pFunc, pWin->pFuncs) {
    // Check the node type before reading function-only fields, as stbSplAppendWStart does.
    if (nodeType(pFunc) == QUERY_NODE_FUNCTION && FUNCTION_TYPE_WEND == ((SFunctionNode*)pFunc)->funcType) {
      *pIndex = index;
      return TSDB_CODE_SUCCESS;
    }
    ++index;
  }

  SFunctionNode* pWEnd = NULL;
  int32_t code = nodesMakeNode(QUERY_NODE_FUNCTION, (SNode**)&pWEnd);
  if (NULL == pWEnd) {
    return code;
  }
  tstrncpy(pWEnd->functionName, "_wend", TSDB_FUNC_NAME_LEN);
  int64_t pointer = (int64_t)pWEnd;
  char name[TSDB_COL_NAME_LEN + TSDB_POINTER_PRINT_BYTES + TSDB_NAME_DELIMITER_LEN + 1] = {0};
  int32_t len = snprintf(name, sizeof(name) - 1, "%s.%" PRId64, pWEnd->functionName, pointer);
  (void)taosHashBinary(name, len, sizeof(name));
  tstrncpy(pWEnd->node.aliasName, name, TSDB_COL_NAME_LEN);

  code = fmGetFuncInfo(pWEnd, NULL, 0);
  if (TSDB_CODE_SUCCESS != code) {
    // The node was never handed to a list, so it must be freed here to avoid a leak.
    nodesDestroyNode((SNode*)pWEnd);
    *pIndex = index;
    return code;
  }

  code = nodesListStrictAppend(pWin->pFuncs, (SNode*)pWEnd);
  *pIndex = index;
  if (TSDB_CODE_SUCCESS == code) {
    code = createColumnByRewriteExpr(nodesListGetNode(pWin->pFuncs, index), &pWin->node.pTargets);
  }
  return code;
}
527

528
/*
 * Make sure pFuncs contains a stream trigger-time placeholder function and
 * report its position in *pIndex.
 *
 * An existing _twstart / _tprev_ts / _tprev_localtime / _tidlestart entry is
 * reused. Otherwise a new placeholder is created whose name depends on the
 * trigger window type:
 *   sliding window                                   -> "_tprev_ts"
 *   interval / state / event / session / count window -> "_twstart"
 *   period window                                    -> "_tprev_localtime"
 * A timestamp value node (0, marked notReserved) is pushed to the front of
 * its parameter list before the function is appended to pFuncs.
 *
 * Every failure after the placeholder has been allocated goes through the
 * _return label so the placeholder (and the extra value, while it is still
 * unowned) is released.
 */
static int32_t stbSplAppendPlaceHolder(SNodeList* pFuncs, int32_t* pIndex, uint8_t precision, ENodeType winType) {
  int32_t index = 0;
  SNode*  pFunc = NULL;
  FOREACH(pFunc, pFuncs) {
    if (FUNCTION_TYPE_TWSTART == ((SFunctionNode*)pFunc)->funcType ||
        FUNCTION_TYPE_TPREV_TS == ((SFunctionNode*)pFunc)->funcType ||
        FUNCTION_TYPE_TPREV_LOCALTIME == ((SFunctionNode*)pFunc)->funcType ||
        FUNCTION_TYPE_TIDLESTART == ((SFunctionNode*)pFunc)->funcType) {
      *pIndex = index;
      return TSDB_CODE_SUCCESS;
    }
    ++index;
  }

  int32_t        code = TSDB_CODE_SUCCESS;
  bool           needFreeExtra = false;
  SNode*         extraValue = NULL;
  SFunctionNode* pPlaceHolder = NULL;

  PLAN_ERR_JRET(nodesMakeNode(QUERY_NODE_FUNCTION, (SNode**)&pPlaceHolder));

  switch(winType) {
    case QUERY_NODE_SLIDING_WINDOW:
      tstrncpy(pPlaceHolder->functionName, "_tprev_ts", TSDB_FUNC_NAME_LEN);
      break;
    case QUERY_NODE_INTERVAL_WINDOW:
    case QUERY_NODE_STATE_WINDOW:
    case QUERY_NODE_EVENT_WINDOW:
    case QUERY_NODE_SESSION_WINDOW:
    case QUERY_NODE_COUNT_WINDOW:
      tstrncpy(pPlaceHolder->functionName, "_twstart", TSDB_FUNC_NAME_LEN);
      break;
    case QUERY_NODE_PERIOD_WINDOW:
      tstrncpy(pPlaceHolder->functionName, "_tprev_localtime", TSDB_FUNC_NAME_LEN);
      break;
    default:
      // Other window types leave the function name unset.
      break;
  }

  int64_t pointer = (int64_t)pPlaceHolder;
  char name[TSDB_COL_NAME_LEN + TSDB_POINTER_PRINT_BYTES + TSDB_NAME_DELIMITER_LEN + 1] = {0};
  int32_t len = snprintf(name, sizeof(name) - 1, "%s.%" PRId64, pPlaceHolder->functionName, pointer);
  (void)taosHashBinary(name, len, sizeof(name));
  tstrncpy(pPlaceHolder->node.aliasName, name, TSDB_COL_NAME_LEN);
  pPlaceHolder->node.resType.precision = precision;

  PLAN_ERR_JRET(fmGetFuncInfo(pPlaceHolder, NULL, 0));
  // Jump to the cleanup label (instead of returning directly) so pPlaceHolder is not leaked.
  PLAN_ERR_JRET(nodesMakeValueNodeFromTimestamp(0, &extraValue));
  needFreeExtra = true;
  ((SValueNode*)extraValue)->notReserved = true;
  PLAN_ERR_JRET(nodesListMakePushFront(&pPlaceHolder->pParameterList, extraValue));
  // From here on the value is owned by the placeholder's parameter list.
  needFreeExtra = false;
  PLAN_ERR_JRET(nodesListStrictAppend(pFuncs, (SNode*)pPlaceHolder));
  *pIndex = index;
  return code;
_return:
  nodesDestroyNode((SNode*)pPlaceHolder);
  if (needFreeExtra) {
    nodesDestroyNode(extraValue);
  }
  return code;
}
590

591
static int32_t stbSplCreatePartWindowNode(SSplitContext* pCxt, SWindowLogicNode* pMergeWindow,
4,153,214✔
592
                                          SLogicNode** pPartWindow, SNodeList** pMergeKeys) {
593
  int32_t    code = TSDB_CODE_SUCCESS;
4,153,214✔
594
  SNodeList* pFunc = pMergeWindow->pFuncs;
4,153,214✔
595
  pMergeWindow->pFuncs = NULL;
4,153,214✔
596
  SNodeList* pTargets = pMergeWindow->node.pTargets;
4,153,214✔
597
  pMergeWindow->node.pTargets = NULL;
4,153,214✔
598
  SNodeList* pChildren = pMergeWindow->node.pChildren;
4,153,214✔
599
  pMergeWindow->node.pChildren = NULL;
4,153,214✔
600
  SNode* pConditions = pMergeWindow->node.pConditions;
4,153,214✔
601
  pMergeWindow->node.pConditions = NULL;
4,153,214✔
602

603
  SWindowLogicNode* pPartWin = NULL;
4,153,214✔
604
  PLAN_ERR_JRET(nodesCloneNode((SNode*)pMergeWindow, (SNode**)&pPartWin));
4,153,214✔
605

606
  pPartWin->node.groupAction = GROUP_ACTION_KEEP;
4,154,555✔
607
  pMergeWindow->node.pTargets = pTargets;
4,154,555✔
608
  pMergeWindow->node.pConditions = pConditions;
4,154,555✔
609
  pPartWin->node.pChildren = pChildren;
4,154,555✔
610
  splSetParent((SLogicNode*)pPartWin);
4,154,555✔
611

612
  int32_t index = -1;
4,153,639✔
613
  int32_t indexExt = -1;
4,153,639✔
614
  const SColumnNode* pMergeTspk = (const SColumnNode*)pMergeWindow->pTspk;
4,153,639✔
615
  PLAN_ERR_JRET(stbSplRewriteFuns(pFunc, &pPartWin->pFuncs, NULL, &pMergeWindow->pFuncs));
4,153,639✔
616
  if (inStreamCalcClause(pCxt->pPlanCxt)) {
4,154,610✔
617
    /**
618
      For stream calc query, we need the _twstart or _tprev_ts placeholder
619
      in the part window to merge part results together.
620
    */
621
    PLAN_ERR_JRET(stbSplAppendPlaceHolder(pPartWin->pFuncs, &indexExt,
19,196✔
622
                                          pMergeTspk->node.resType.precision,
623
                                          pCxt->pPlanCxt->streamCxt.triggerWinType));
624
  }
625
  if (pMergeWindow->winType == WINDOW_TYPE_EXTERNAL && !inStreamCalcClause(pCxt->pPlanCxt)) {
4,154,535✔
626
    /**
627
      For external window query, we still need an explicit _wstart placeholder
628
      on the partial window output so merged external-window aggregation can
629
      bind pTspk to window-start, instead of accidentally using the first
630
      aggregate output column (e.g. count/sum).
631
    */
632
    PLAN_ERR_JRET(stbSplAppendWStart(&pPartWin->pFuncs, &index,
56,643✔
633
                                     pMergeTspk->node.resType.precision));
634
  } else if (!pCxt->pPlanCxt->streamCxt.hasExtWindow) {
4,097,892✔
635
    /**
636
      If the query is not an external window query, we need the _wstart
637
      placeholder for the merged INTERVAL window to do aggregation.
638
    */
639
    PLAN_ERR_JRET(stbSplAppendWStart(&pPartWin->pFuncs, &index,
4,082,540✔
640
                                     pMergeTspk->node.resType.precision));
641
  }
642
  if (index < 0 && indexExt < 0) {
4,155,320✔
643
    planError("%s failed since no pkts placeholder set", __FUNCTION__);
×
644
    code = TSDB_CODE_INTERNAL_ERROR;
×
645
    PLAN_ERR_JRET(code);
×
646
  }
647

648
  PLAN_ERR_JRET(createColumnByRewriteExprs(pPartWin->pFuncs, &pPartWin->node.pTargets));
4,155,320✔
649
  nodesDestroyNode(pMergeWindow->pTspk);
4,154,280✔
650
  pMergeWindow->pTspk = NULL;
4,154,885✔
651
  if (NULL != pMergeKeys) {
4,154,885✔
652
    /**
653
      Both _twstart and _wstart placeholders should be used as merge keys
654
      for INTERVAL window.
655
    */
656
    if (indexExt >= 0) {
4,071,103✔
657
      PLAN_ERR_JRET(stbSplCreateMergeKeysByExpr(nodesListGetNode(pPartWin->node.pTargets, indexExt),
19,196✔
658
                                                pMergeWindow->node.outputTsOrder, pMergeKeys));
659
    }
660
    if (index >= 0) {
4,071,103✔
661
      PLAN_ERR_JRET(stbSplCreateMergeKeysByExpr(nodesListGetNode(pPartWin->node.pTargets, index),
4,055,506✔
662
                                                pMergeWindow->node.outputTsOrder, pMergeKeys));
663
    }
664
  }
665

666
  int32_t indexPkts = index >= 0 ? index: indexExt;
4,154,355✔
667
  PLAN_ERR_JRET(nodesCloneNode(nodesListGetNode(pPartWin->node.pTargets, indexPkts),
4,154,355✔
668
                               &pMergeWindow->pTspk));
669

670
  nodesDestroyList(pFunc);
4,155,235✔
671
  *pPartWindow = (SLogicNode*)pPartWin;
4,155,235✔
672

673
  return code;
4,155,235✔
674
_return:
×
675
  nodesDestroyNode((SNode*)pPartWin);
×
676
  return code;
×
677
}
678

679
/*
 * Return the number of vgroups of the scan reached by following single-child
 * links down from pNode.
 *
 * Returns 0 when no scan is reached that way, or when the scan has no vgroup
 * list (stbSplIsMultiTbScan treats the list as optional, so it is not
 * dereferenced blindly here).
 */
static int32_t stbSplGetNumOfVgroups(SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
    SScanLogicNode* pScan = (SScanLogicNode*)pNode;
    if (NULL == pScan->pVgroupList) {
      return 0;
    }
    return pScan->pVgroupList->numOfVgroups;
  } else {
    if (1 == LIST_LENGTH(pNode->pChildren)) {
      return stbSplGetNumOfVgroups((SLogicNode*)nodesListGetNode(pNode->pChildren, 0));
    }
  }
  return 0;
}
689

690
/**
 * @brief Adjust a freshly created merge node according to the type of the
 *        node it was split from.
 *
 * The merge node always inherits the split node's output timestamp order as
 * both its input and output order. In addition:
 *  - PROJECT: when the project ignores group id and the merge node carries a
 *    limit or slimit, the flag is moved from the project to the merge node.
 *  - WINDOW:  any limit/slimit on the merge node is destroyed and cleared.
 *  - SORT:    inputWithGroupId is set when the sort calculates group ids.
 *
 * @param pMerge  Merge node to adjust.
 * @param pNode   Node the merge node was derived from.
 * @return Always TSDB_CODE_SUCCESS.
 */
static int32_t stbSplRewriteFromMergeNode(SMergeLogicNode* pMerge, SLogicNode* pNode) {
  pMerge->node.inputTsOrder = pNode->outputTsOrder;
  pMerge->node.outputTsOrder = pNode->outputTsOrder;

  if (QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pNode)) {
    SProjectLogicNode* pProject = (SProjectLogicNode*)pNode;
    bool               mergeHasLimit = (pMerge->node.pLimit || pMerge->node.pSlimit);
    if (pProject->ignoreGroupId && mergeHasLimit) {
      pMerge->ignoreGroupId = true;
      pProject->ignoreGroupId = false;
    }
  } else if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode)) {
    if (pMerge->node.pLimit) {
      nodesDestroyNode(pMerge->node.pLimit);
      pMerge->node.pLimit = NULL;
    }
    if (pMerge->node.pSlimit) {
      nodesDestroyNode(pMerge->node.pSlimit);
      pMerge->node.pSlimit = NULL;
    }
  } else if (QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pNode)) {
    if (((SSortLogicNode*)pNode)->calcGroupId) {
      pMerge->inputWithGroupId = true;
    }
  }

  return TSDB_CODE_SUCCESS;
}
727

728
static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode,
37,824,661✔
729
                                     SNodeList* pMergeKeys, SLogicNode* pPartChild, bool groupSort, bool needSort) {
730
  SMergeLogicNode* pMerge = NULL;
37,824,661✔
731
  int32_t code = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE, (SNode**)&pMerge);
37,824,661✔
732
  if (NULL == pMerge) {
37,831,655✔
733
    return code;
×
734
  }
735
  pMerge->needSort = needSort;
37,831,655✔
736
  pMerge->numOfChannels = stbSplGetNumOfVgroups(pPartChild);
37,831,655✔
737
  pMerge->srcGroupId = pCxt->groupId;
37,830,221✔
738
  pMerge->srcEndGroupId = pCxt->groupId;
37,830,221✔
739
  pMerge->node.precision = pPartChild->precision;
37,830,221✔
740
  pMerge->node.dynamicOp = pSplitNode->dynamicOp;
37,830,221✔
741
  if (!pMerge->node.dynamicOp && NULL != pSplitNode->pParent) {
37,830,221✔
742
    pMerge->node.dynamicOp = pSplitNode->pParent->dynamicOp;
30,778,371✔
743
  }
744
  pMerge->pMergeKeys = pMergeKeys;
37,830,221✔
745
  pMerge->groupSort = groupSort;
37,830,221✔
746
  pMerge->numOfSubplans = 1;
37,830,221✔
747

748
  pMerge->pInputs = NULL;
37,830,221✔
749
  code = nodesCloneList(pPartChild->pTargets, &pMerge->pInputs);
37,830,221✔
750
  if (TSDB_CODE_SUCCESS == code) {
37,832,177✔
751
    // NULL != pSubplan means 'merge node' replaces 'split node'.
752
    if (NULL == pSubplan) {
37,832,262✔
753
      code = nodesCloneList(pPartChild->pTargets, &pMerge->node.pTargets);
4,200,387✔
754
    } else {
755
      code = nodesCloneList(pSplitNode->pTargets, &pMerge->node.pTargets);
33,631,875✔
756
    }
757
  }
758
  if (TSDB_CODE_SUCCESS == code && NULL != pSplitNode->pLimit) {
37,832,462✔
759
    pMerge->node.pLimit = NULL;
3,639,426✔
760
    code = nodesCloneNode(pSplitNode->pLimit, &pMerge->node.pLimit);
3,639,426✔
761
    if (((SLimitNode*)pSplitNode->pLimit)->limit && ((SLimitNode*)pSplitNode->pLimit)->offset) {
3,639,426✔
762
      ((SLimitNode*)pSplitNode->pLimit)->limit->datum.i += ((SLimitNode*)pSplitNode->pLimit)->offset->datum.i;
1,227,432✔
763
    }
764
    if (((SLimitNode*)pSplitNode->pLimit)->offset) {
3,639,426✔
765
      ((SLimitNode*)pSplitNode->pLimit)->offset->datum.i = 0;
1,227,432✔
766
    }
767
  }
768
  if (TSDB_CODE_SUCCESS == code) {
37,832,462✔
769
    code = stbSplRewriteFromMergeNode(pMerge, pSplitNode);
37,830,542✔
770
  }
771
  if (TSDB_CODE_SUCCESS == code) {
37,830,900✔
772
    if (NULL == pSubplan) {
37,829,745✔
773
      code = nodesListMakeAppend(&pSplitNode->pChildren, (SNode*)pMerge);
4,199,517✔
774
    } else {
775
      code = replaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pMerge);
33,630,228✔
776
    }
777
  }
778
  if (TSDB_CODE_SUCCESS != code) {
37,831,831✔
779
    nodesDestroyNode((SNode*)pMerge);
×
780
  }
781
  return code;
37,831,206✔
782
}
783

784
/**
 * @brief Create an exchange node for pPartChild and append it to pParent's
 *        children, setting pParent as the exchange node's parent.
 *
 * @param pCxt        Split context forwarded to splCreateExchangeNode.
 * @param pParent     Node that receives the exchange node as a child.
 * @param pPartChild  Node the exchange node is created from.
 * @return TSDB_CODE_SUCCESS or the error from creation / list append.
 */
static int32_t stbSplCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pParent, SLogicNode* pPartChild) {
  SExchangeLogicNode* pExchange = NULL;
  int32_t             code = splCreateExchangeNode(pCxt, pPartChild, &pExchange);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }
  pExchange->node.pParent = pParent;
  return nodesListMakeAppend(&pParent->pChildren, (SNode*)pExchange);
}
793

794
/**
 * @brief Append one ORDER BY expression, built from a clone of pExpr, to the
 *        merge key list.
 *
 * Null ordering follows the sort direction: NULL_ORDER_FIRST for ORDER_ASC,
 * NULL_ORDER_LAST otherwise.
 *
 * @param pExpr       Expression to clone as the key expression.
 * @param order       Sort direction of the key.
 * @param pMergeKeys  In/out list the key is appended to (created on demand
 *                    by nodesListMakeStrictAppend).
 * @return TSDB_CODE_SUCCESS or the error from node creation/clone/append.
 */
static int32_t stbSplCreateMergeKeysByExpr(SNode* pExpr, EOrder order, SNodeList** pMergeKeys) {
  SOrderByExprNode* pKey = NULL;
  int32_t           code = nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR, (SNode**)&pKey);
  if (NULL == pKey) {
    return code;
  }

  pKey->pExpr = NULL;
  code = nodesCloneNode(pExpr, &pKey->pExpr);
  if (NULL != pKey->pExpr) {
    pKey->order = order;
    if (ORDER_ASC == order) {
      pKey->nullOrder = NULL_ORDER_FIRST;
    } else {
      pKey->nullOrder = NULL_ORDER_LAST;
    }
    return nodesListMakeStrictAppend(pMergeKeys, (SNode*)pKey);
  }

  // Clone produced nothing: drop the half-built key and report the clone result.
  nodesDestroyNode((SNode*)pKey);
  return code;
}
810

811
/**
 * @brief Append a merge key built from the primary key expression.
 *
 * Thin wrapper that forwards all arguments to stbSplCreateMergeKeysByExpr.
 */
static int32_t stbSplCreateMergeKeysByPrimaryKey(SNode* pPrimaryKey, EOrder order, SNodeList** pMergeKeys) {
  int32_t code = stbSplCreateMergeKeysByExpr(pPrimaryKey, order, pMergeKeys);
  return code;
}
814

815
/**
 * @brief Split an INTERVAL or EXTERNAL window node into a partial window
 *        (hash algorithm) that runs in its own scan subplan and the original
 *        window (merge algorithm) that reads it through a merge node.
 *
 * Steps:
 *  1. Build the partial window node and the merge keys.
 *  2. Create a merge node as a child of the split node.
 *  3. Wrap the partial window in a scan subplan.
 *  4. For every TSMA subplan attached to the window, assign a new group id,
 *     replace its root with a partial window node and finally append all of
 *     them to the parent subplan's children.
 *  5. Push the scan subplan to the front of the parent subplan's children.
 *
 * An EXTERNAL window without functions is left untouched and reports success.
 *
 * A failure while rewriting a TSMA subplan stops the loop and is returned to
 * the caller; it is never overwritten by a later iteration or by the list
 * append.
 *
 * @param pCxt   Split context; groupId is advanced for each TSMA subplan and
 *               once more before returning.
 * @param pInfo  Split info holding the window node and its subplan.
 * @return TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SWindowLogicNode* pWindow = (SWindowLogicNode*)pInfo->pSplitNode;
  if (pWindow->winType == WINDOW_TYPE_EXTERNAL) {
    if (!pWindow->pFuncs) {
      // only have projection in external window.
      return TSDB_CODE_SUCCESS;
    }
  }
  SLogicNode* pPartWindow = NULL;
  SNodeList*  pMergeKeys = NULL;
  int32_t     code = stbSplCreatePartWindowNode(pCxt, pWindow, &pPartWindow, &pMergeKeys);
  if (TSDB_CODE_SUCCESS == code) {
    bool isInterval = (pWindow->winType == WINDOW_TYPE_INTERVAL);
    ((SWindowLogicNode*)pPartWindow)->windowAlgo = isInterval ? INTERVAL_ALGO_HASH : EXTERNAL_ALGO_HASH;
    pWindow->windowAlgo = isInterval ? INTERVAL_ALGO_MERGE : EXTERNAL_ALGO_MERGE;
    code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow, true, true);
    if (TSDB_CODE_SUCCESS != code) {
      nodesDestroyList(pMergeKeys);
    }
  }
  SLogicSubplan* pSplitSubPlan = NULL;
  if (TSDB_CODE_SUCCESS == code) {
    pSplitSubPlan = splCreateScanSubplan(pCxt, pPartWindow, SPLIT_FLAG_STABLE_SPLIT);
    if (!pSplitSubPlan) code = terrno;
  }
  if (code == TSDB_CODE_SUCCESS) {
    SNode*           pNode;
    SMergeLogicNode* pMerge = (SMergeLogicNode*)pInfo->pSplitNode->pChildren->pHead->pNode;
    if (LIST_LENGTH(pWindow->pTsmaSubplans) > 0) {
      FOREACH(pNode, pWindow->pTsmaSubplans) {
        ++(pCxt->groupId);
        SLogicSubplan* pSubplan = (SLogicSubplan*)pNode;
        pSubplan->id.groupId = pCxt->groupId;
        pSubplan->id.queryId = pCxt->queryId;
        splSetSubplanVgroups(pSubplan, pSubplan->pNode);
        code = stbSplCreatePartWindowNode(pCxt, (SWindowLogicNode*)pSubplan->pNode, &pPartWindow, NULL);
        if (TSDB_CODE_SUCCESS != code) {
          // Stop here so the failure is not masked by the next iteration.
          break;
        }
        nodesDestroyNode((SNode*)pSubplan->pNode);
        pSubplan->pNode = pPartWindow;
      }
      if (TSDB_CODE_SUCCESS == code) {
        code = nodesListMakeStrictAppendList(&pInfo->pSubplan->pChildren, pWindow->pTsmaSubplans);
      }
      pMerge->numOfSubplans = LIST_LENGTH(pInfo->pSubplan->pChildren) + 1;
    }
    pMerge->srcEndGroupId = pCxt->groupId;
  }
  if (code == TSDB_CODE_SUCCESS) {
    code = nodesListMakePushFront(&pInfo->pSubplan->pChildren, (SNode*)pSplitSubPlan);
  }
  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}
870

871
/**
 * @brief Follow a single-child chain down to a scan node and switch it to
 *        table-merge scanning.
 *
 * On the scan node: scanType becomes SCAN_TYPE_TABLE_MERGE, filesetDelimited
 * is set, and groupSort is set when the scan has group tags. Nothing happens
 * if a non-scan node with a child count other than one is met first.
 *
 * @param pNode  Root of the chain to inspect.
 */
static void stbSplSetTableMergeScan(SLogicNode* pNode) {
  SLogicNode* pCur = pNode;
  while (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pCur)) {
    if (1 != LIST_LENGTH(pCur->pChildren)) {
      return;
    }
    pCur = (SLogicNode*)nodesListGetNode(pCur->pChildren, 0);
  }

  SScanLogicNode* pScan = (SScanLogicNode*)pCur;
  pScan->scanType = SCAN_TYPE_TABLE_MERGE;
  pScan->filesetDelimited = true;
  if (NULL != pScan->pGroupTags) {
    pScan->groupSort = true;
  }
}
3,868,400✔
885

886
/**
 * @brief Split below a session/state-style window: the window's first child
 *        is replaced in the subplan by a merge node keyed on the window's
 *        primary timestamp, and the child itself moves into a new scan
 *        subplan that is switched to table-merge scanning.
 *
 * @param pCxt   Split context; groupId is advanced on success.
 * @param pInfo  Split info holding the window node and its subplan.
 * @return TSDB_CODE_SUCCESS or the first error encountered. On any failure
 *         the merge key list is destroyed here.
 */
static int32_t stbSplSplitSessionOrStateForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SWindowLogicNode* pWin = (SWindowLogicNode*)pInfo->pSplitNode;
  SLogicNode*       pChild = (SLogicNode*)nodesListGetNode(pWin->node.pChildren, 0);
  SNodeList*        pMergeKeys = NULL;

  int32_t code = stbSplCreateMergeKeysByPrimaryKey(pWin->pTspk, pWin->node.inputTsOrder, &pMergeKeys);
  if (TSDB_CODE_SUCCESS == code) {
    code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pChild, pMergeKeys, pChild, true, true);
  }
  if (TSDB_CODE_SUCCESS == code) {
    SNode* pScanSubplan = (SNode*)splCreateScanSubplan(pCxt, pChild, SPLIT_FLAG_STABLE_SPLIT);
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, pScanSubplan);
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyList(pMergeKeys);
    return code;
  }

  stbSplSetTableMergeScan(pChild);
  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}
914

915
/**
 * @brief Dispatch the split of a window node to the handler for its window
 *        type.
 *
 * Before dispatching:
 *  - if the window's first child is itself an EXTERNAL window, that child is
 *    marked as needing group sort;
 *  - if the window is EXTERNAL, it is marked as split and its needGroupSort
 *    flag is copied from calcWithPartition.
 *
 * INTERVAL and EXTERNAL windows go to stbSplSplitIntervalForBatch; SESSION,
 * STATE, EVENT, COUNT and ANOMALY windows go to
 * stbSplSplitSessionOrStateForBatch.
 *
 * @return Result of the selected handler, or TSDB_CODE_PLAN_INTERNAL_ERROR
 *         for any other window type.
 */
static int32_t stbSplSplitWindowForCrossTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SWindowLogicNode* pWin = (SWindowLogicNode*)pInfo->pSplitNode;
  SNode*            pChild = nodesListGetNode(pWin->node.pChildren, 0);

  if (pChild && nodeType(pChild) == QUERY_NODE_LOGIC_PLAN_WINDOW) {
    SWindowLogicNode* pChildWin = (SWindowLogicNode*)pChild;
    if (pChildWin->winType == WINDOW_TYPE_EXTERNAL) {
      pChildWin->needGroupSort = true;
    }
  }

  if (pWin->winType == WINDOW_TYPE_EXTERNAL) {
    pWin->extWinSplit = true;
    pWin->needGroupSort = pWin->calcWithPartition;
  }

  int32_t code = TSDB_CODE_PLAN_INTERNAL_ERROR;
  switch (pWin->winType) {
    case WINDOW_TYPE_INTERVAL:
    case WINDOW_TYPE_EXTERNAL:
      code = stbSplSplitIntervalForBatch(pCxt, pInfo);
      break;
    case WINDOW_TYPE_SESSION:
    case WINDOW_TYPE_STATE:
    case WINDOW_TYPE_EVENT:
    case WINDOW_TYPE_COUNT:
    case WINDOW_TYPE_ANOMALY:
      code = stbSplSplitSessionOrStateForBatch(pCxt, pInfo);
      break;
    default:
      break;
  }
  return code;
}
944

945
/**
 * @brief Tell whether pNode or any of its ancestors carries a limit or
 *        slimit.
 *
 * @param pNode  Starting node; NULL yields false.
 * @return true as soon as a node with pLimit or pSlimit is found on the
 *         parent chain, false when the chain ends without one.
 */
static bool stbSplNeedSeqRecvData(SLogicNode* pNode) {
  for (SLogicNode* pCur = pNode; NULL != pCur; pCur = pCur->pParent) {
    if (NULL != pCur->pLimit || NULL != pCur->pSlimit) {
      return true;
    }
  }
  return false;
}
955

956
/**
 * @brief Split a window node by replacing it with an exchange node and moving
 *        it into its own scan subplan.
 *
 * When the window's parent is a FILL node, the FILL node becomes the split
 * node instead, so it moves together with the window.
 *
 * @param pCxt   Split context; groupId is advanced before returning.
 * @param pInfo  Split info; pSplitNode may be redirected to the FILL parent.
 * @return TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t stbSplSplitWindowForPartTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode* pParent = pInfo->pSplitNode->pParent;
  if (NULL != pParent && QUERY_NODE_LOGIC_PLAN_FILL == nodeType(pParent)) {
    pInfo->pSplitNode = pParent;
  }

  SExchangeLogicNode* pExchange = NULL;
  int32_t             code = splCreateExchangeNode(pCxt, pInfo->pSplitNode, &pExchange);
  if (TSDB_CODE_SUCCESS == code && pExchange) {
    code = replaceLogicNode(pInfo->pSubplan, pInfo->pSplitNode, (SLogicNode*)pExchange);
    if (TSDB_CODE_SUCCESS == code) {
      pExchange->seqRecvData = stbSplNeedSeqRecvData((SLogicNode*)pExchange);
      pExchange->dynTbname = false;
      SNode* pScanSubplan = (SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT);
      code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, pScanSubplan);
    }
  }

  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}
975

976
/**
 * @brief Choose the split strategy for a window node.
 *
 * The per-table strategy is used only when isPartTableWinodw() accepts the
 * window and it has no TSMA subplans; every other window uses the
 * cross-table strategy.
 */
static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SWindowLogicNode* pWindow = (SWindowLogicNode*)pInfo->pSplitNode;
  bool              partTable = isPartTableWinodw(pWindow) && (LIST_LENGTH(pWindow->pTsmaSubplans) == 0);
  if (partTable) {
    return stbSplSplitWindowForPartTable(pCxt, pInfo);
  }
  return stbSplSplitWindowForCrossTable(pCxt, pInfo);
}
984

985
/**
 * @brief Derive a partial aggregate node from pMergeAgg and turn pMergeAgg
 *        into the merging aggregate.
 *
 * The function lists, group keys, targets, children and conditions are
 * detached from pMergeAgg first, so the clone that becomes the partial node
 * starts without them. Afterwards:
 *  - the partial node takes over the original group keys and children;
 *  - pMergeAgg gets its targets and conditions back, and its group keys
 *    become clones of the columns the partial node outputs for them;
 *  - the aggregate functions are rewritten by stbSplRewriteFuns into a
 *    partial list (partial node) and a merge list (pMergeAgg).
 *
 * @param pMergeAgg  Aggregate node to split; modified in place.
 * @param pOutput    Receives the partial aggregate node on success.
 * @return TSDB_CODE_SUCCESS or the first error encountered; on error the
 *         partial node is destroyed.
 */
static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pOutput) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SAggLogicNode* pPartAgg = NULL;

  // Detach everything that must not be duplicated by the clone below.
  SNodeList* pOrigFuncs = pMergeAgg->pAggFuncs;
  SNodeList* pOrigGroupKeys = pMergeAgg->pGroupKeys;
  SNodeList* pOrigTargets = pMergeAgg->node.pTargets;
  SNodeList* pOrigChildren = pMergeAgg->node.pChildren;
  SNode*     pOrigConditions = pMergeAgg->node.pConditions;
  pMergeAgg->pAggFuncs = NULL;
  pMergeAgg->pGroupKeys = NULL;
  pMergeAgg->node.pTargets = NULL;
  pMergeAgg->node.pChildren = NULL;
  pMergeAgg->node.pConditions = NULL;

  PLAN_ERR_JRET(nodesCloneNode((SNode*)pMergeAgg, (SNode**)&pPartAgg));

  pPartAgg->node.groupAction = GROUP_ACTION_KEEP;

  if (NULL != pOrigGroupKeys) {
    pPartAgg->pGroupKeys = pOrigGroupKeys;
    PLAN_ERR_JRET(createColumnByRewriteExprs(pPartAgg->pGroupKeys, &pPartAgg->node.pTargets));
    pMergeAgg->pGroupKeys = NULL;
    PLAN_ERR_JRET(nodesCloneList(pPartAgg->node.pTargets, &pMergeAgg->pGroupKeys));
  }

  pMergeAgg->node.pConditions = pOrigConditions;
  pMergeAgg->node.pTargets = pOrigTargets;
  pPartAgg->node.pChildren = pOrigChildren;
  splSetParent((SLogicNode*)pPartAgg);

  PLAN_ERR_JRET(stbSplRewriteFuns(pOrigFuncs, &pPartAgg->pAggFuncs, NULL, &pMergeAgg->pAggFuncs));

  PLAN_ERR_JRET(createColumnByRewriteExprs(pPartAgg->pAggFuncs, &pPartAgg->node.pTargets));

  nodesDestroyList(pOrigFuncs);

  *pOutput = (SLogicNode*)pPartAgg;

  return code;
_return:
  if (code) {
    planError("%s failed at line %d, code: %d", __func__, lino, code);
    nodesDestroyNode((SNode*)pPartAgg);
  }
  nodesDestroyList(pOrigFuncs);
  return code;
}
1034

1035
/**
 * @brief Split an aggregate node by putting an exchange node in its place
 *        and moving the aggregate into its own scan subplan.
 *
 * @param pCxt   Split context; groupId is advanced before returning.
 * @param pInfo  Split info holding the aggregate node and its subplan.
 * @return TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t stbSplSplitAggNodeForPartTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pInfo->pSplitNode, SUBPLAN_TYPE_MERGE, false);
  if (TSDB_CODE_SUCCESS == code) {
    SNode* pScanSubplan = (SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT);
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, pScanSubplan);
  }
  ++(pCxt->groupId);
  return code;
}
1044

1045

1046
/**
1047
 * @brief For pipelined agg node, add a SortMergeNode to merge result from vnodes.
1048
 *        For agg + partition, results are sorted by group id, use group sort.
1049
 *        For agg + sort for group, results are sorted by partition keys, not group id, merges keys should be the same
1050
 *            as partition keys
1051
 */
1052
static int32_t stbSplAggNodeCreateMerge(SSplitContext* pCtx, SStableSplitInfo* pInfo, SLogicNode* pChildAgg) {
16,318✔
1053
  bool       groupSort = true;
16,318✔
1054
  SNodeList* pMergeKeys = NULL;
16,318✔
1055
  int32_t    code = TSDB_CODE_SUCCESS;
16,318✔
1056
  bool       sortForGroup = false;
16,318✔
1057

1058
  if (pChildAgg->pChildren->length != 1) return TSDB_CODE_TSC_INTERNAL_ERROR;
16,318✔
1059

1060
  SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pChildAgg->pChildren, 0);
16,318✔
1061
  if (nodeType(pChild) == QUERY_NODE_LOGIC_PLAN_SORT) {
16,318✔
1062
    SSortLogicNode* pSort = (SSortLogicNode*)pChild;
×
1063
    if (pSort->calcGroupId) {
×
1064
      SNode *node, *node2;
1065
      groupSort = false;
×
1066
      sortForGroup = true;
×
1067
      SNodeList* extraAggFuncs = NULL;
×
1068
      uint32_t   originalLen = LIST_LENGTH(pSort->node.pTargets), idx = 0;
×
1069
      code = stbSplCreateMergeKeys(pSort->pSortKeys, pSort->node.pTargets, &pMergeKeys);
×
1070
      if (TSDB_CODE_SUCCESS != code) return code;
×
1071

1072
      // Create group_key func for all sort keys.
1073
      // We only need newly added nodes in pSort.node.pTargets when stbSplCreateMergeKeys
1074
      FOREACH(node, pSort->node.pTargets) {
×
1075
        if (idx++ < originalLen) continue;
×
1076
        SFunctionNode* pGroupKeyFunc = createGroupKeyAggFunc((SColumnNode*)node);
×
1077
        if (!pGroupKeyFunc) {
×
1078
          code = terrno;
×
1079
          break;
×
1080
        }
1081
        code = nodesListMakeStrictAppend(&extraAggFuncs, (SNode*)pGroupKeyFunc);
×
1082
        if (code != TSDB_CODE_SUCCESS) {
×
1083
          nodesDestroyNode((SNode*)pGroupKeyFunc);
×
1084
        }
1085
      }
1086

1087
      if (TSDB_CODE_SUCCESS == code) {
×
1088
        // add these extra group_key funcs into targets
1089
        code = createColumnByRewriteExprs(extraAggFuncs, &pChildAgg->pTargets);
×
1090
      }
1091
      if (code == TSDB_CODE_SUCCESS) {
×
1092
        code = nodesListAppendList(((SAggLogicNode*)pChildAgg)->pAggFuncs, extraAggFuncs);
×
1093
        extraAggFuncs = NULL;
×
1094
      }
1095

1096
      if (code == TSDB_CODE_SUCCESS) {
×
1097
        FOREACH(node, pMergeKeys) {
×
1098
          SOrderByExprNode* pOrder = (SOrderByExprNode*)node;
×
1099
          SColumnNode*      pCol = (SColumnNode*)pOrder->pExpr;
×
1100
          FOREACH(node2, ((SAggLogicNode*)pChildAgg)->pAggFuncs) {
×
1101
            SFunctionNode* pFunc = (SFunctionNode*)node2;
×
1102
            if (0 != strcmp(pFunc->functionName, "_group_key")) continue;
×
1103
            SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
×
1104
            if (!nodesEqualNode(pParam, (SNode*)pCol)) continue;
×
1105

1106
            // use the colName of group_key func to make sure finding the right slot id for merge keys.
1107
            tstrncpy(pCol->colName, pFunc->node.aliasName, TSDB_COL_NAME_LEN);
×
1108
            tstrncpy(pCol->node.aliasName, pFunc->node.aliasName, TSDB_COL_NAME_LEN);
×
1109
            memset(pCol->tableAlias, 0, TSDB_TABLE_NAME_LEN);
×
1110
            break;
×
1111
          }
1112
        }
1113
      }
1114
      if (TSDB_CODE_SUCCESS != code) {
×
1115
        nodesDestroyList(pMergeKeys);
×
1116
        nodesDestroyList(extraAggFuncs);
×
1117
      }
1118
    }
1119
  }
1120
  if (TSDB_CODE_SUCCESS == code) {
16,318✔
1121
    code = stbSplCreateMergeNode(pCtx, NULL, pInfo->pSplitNode, pMergeKeys, pChildAgg, groupSort, true);
16,318✔
1122
  }
1123
  if (TSDB_CODE_SUCCESS == code && sortForGroup) {
16,318✔
1124
    SMergeLogicNode* pMerge =
1125
        (SMergeLogicNode*)nodesListGetNode(pInfo->pSplitNode->pChildren, LIST_LENGTH(pInfo->pSplitNode->pChildren) - 1);
×
1126
    pMerge->inputWithGroupId = true;
×
1127
  }
1128
  return code;
16,318✔
1129
}
1130

1131
/**
 * @brief Split an aggregate node that has TSMA subplans attached.
 *
 * The aggregate is split into a partial aggregate (moved into a scan
 * subplan) and the merging aggregate, which reads through a merge node
 * appended under it. Every TSMA subplan gets its own group id and has its
 * root replaced by a partial aggregate; all of them, followed by the scan
 * subplan, are appended to the parent subplan's children. The merge node
 * records the resulting number of subplans and the last source group id.
 *
 * A failure while rewriting a TSMA subplan stops the loop and is returned to
 * the caller; it is not overwritten by the subsequent list append.
 *
 * @param pCxt   Split context; groupId is advanced for each TSMA subplan and
 *               once more before returning.
 * @param pInfo  Split info holding the aggregate node and its subplan.
 * @return TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t stbSplSplitAggNodeForCrossTableMulSubplan(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode*      pPartAgg = NULL;
  SMergeLogicNode* pMergeNode = NULL;
  SLogicSubplan*   pFirstScanSubplan = NULL;
  int32_t          code = stbSplCreatePartAggNode((SAggLogicNode*)pInfo->pSplitNode, &pPartAgg);

  if (TSDB_CODE_SUCCESS == code) {
    if (pInfo->pSplitNode->forceCreateNonBlockingOptr) {
      code = stbSplAggNodeCreateMerge(pCxt, pInfo, pPartAgg);
    } else {
      code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, NULL, pPartAgg, false, false);
    }
    if (TSDB_CODE_SUCCESS == code) {
      // The merge node was appended last under the split node.
      pMergeNode = (SMergeLogicNode*)nodesListGetNode(pInfo->pSplitNode->pChildren,
                                                      LIST_LENGTH(pInfo->pSplitNode->pChildren) - 1);
    }
  } else {
    nodesDestroyNode((SNode*)pPartAgg);
  }

  if (code == TSDB_CODE_SUCCESS) {
    pFirstScanSubplan = splCreateScanSubplan(pCxt, pPartAgg, SPLIT_FLAG_STABLE_SPLIT);
    if (!pFirstScanSubplan) code = terrno;
  }

  if (code == TSDB_CODE_SUCCESS) {
    SNode*         pNode;
    SAggLogicNode* pAgg = (SAggLogicNode*)pInfo->pSplitNode;
    if (LIST_LENGTH(pAgg->pTsmaSubplans) > 0) {
      FOREACH(pNode, pAgg->pTsmaSubplans) {
        ++(pCxt->groupId);
        SLogicSubplan* pSubplan = (SLogicSubplan*)pNode;
        pSubplan->id.groupId = pCxt->groupId;
        pSubplan->id.queryId = pCxt->queryId;
        splSetSubplanVgroups(pSubplan, pSubplan->pNode);
        code = stbSplCreatePartAggNode((SAggLogicNode*)pSubplan->pNode, &pPartAgg);
        if (code) break;
        nodesDestroyNode((SNode*)pSubplan->pNode);
        pSubplan->pNode = pPartAgg;
      }
      if (TSDB_CODE_SUCCESS == code) {
        // Only hand the TSMA subplans over when all of them were rewritten.
        code = nodesListMakeStrictAppendList(&pInfo->pSubplan->pChildren, pAgg->pTsmaSubplans);
      }
      pMergeNode->numOfSubplans = LIST_LENGTH(pInfo->pSubplan->pChildren) + 1;
    }
    pMergeNode->srcEndGroupId = pCxt->groupId;
  }

  if (code == TSDB_CODE_SUCCESS) {
    code = nodesListMakeAppend(&pInfo->pSubplan->pChildren, (SNode*)pFirstScanSubplan);
  }

  if (code && pFirstScanSubplan) {
    nodesDestroyNode((SNode*)pFirstScanSubplan);
  }

  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}
1190

1191
/**
 * @brief Split an aggregate node into a partial aggregate, moved into a scan
 *        subplan, and the merging aggregate left in place.
 *
 * The merging aggregate reads the partial results through a merge node when
 * forceCreateNonBlockingOptr is set on the split node, and through an
 * exchange node otherwise.
 *
 * @param pCxt   Split context; groupId is advanced before returning.
 * @param pInfo  Split info holding the aggregate node and its subplan.
 * @return TSDB_CODE_SUCCESS or the first error encountered; a missing scan
 *         subplan reports terrno.
 */
static int32_t stbSplSplitAggNodeForCrossTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode*    pPartAgg = NULL;
  SLogicSubplan* pScanSubplan = NULL;
  int32_t        code = stbSplCreatePartAggNode((SAggLogicNode*)pInfo->pSplitNode, &pPartAgg);

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode((SNode*)pPartAgg);
  } else if (pInfo->pSplitNode->forceCreateNonBlockingOptr) {
    // if slimit was pushed down to agg, agg will be pipelined mode, add sort merge before parent agg
    code = stbSplAggNodeCreateMerge(pCxt, pInfo, pPartAgg);
  } else {
    code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartAgg);
  }

  if (TSDB_CODE_SUCCESS == code) {
    pScanSubplan = splCreateScanSubplan(pCxt, pPartAgg, SPLIT_FLAG_STABLE_SPLIT);
    if (!pScanSubplan) {
      code = terrno;
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, (SNode*)pScanSubplan);
  }

  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}
1219

1220
/**
 * @brief Choose the split strategy for an aggregate node.
 *
 * Aggregates with TSMA subplans use the multi-subplan strategy; otherwise
 * isPartTableAgg() decides between the per-table and cross-table strategies.
 */
static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SAggLogicNode* pAgg = (SAggLogicNode*)pInfo->pSplitNode;
  if (LIST_LENGTH(pAgg->pTsmaSubplans) > 0) {
    return stbSplSplitAggNodeForCrossTableMulSubplan(pCxt, pInfo);
  }
  return isPartTableAgg(pAgg) ? stbSplSplitAggNodeForPartTable(pCxt, pInfo)
                              : stbSplSplitAggNodeForCrossTable(pCxt, pInfo);
}
1229

1230
/**
 * @brief Create a column node that refers to the result of pExpr.
 *
 * If pExpr is itself a column, its db/table/alias/column names are copied;
 * otherwise the new column is named after pExpr's alias. Alias, user alias
 * and result type are always taken from pExpr.
 *
 * @param pExpr   Expression to reference.
 * @param ppNode  Receives the new column node on success.
 * @return The result of nodesMakeNode.
 */
static int32_t stbSplCreateColumnNode(SExprNode* pExpr, SNode** ppNode) {
  SColumnNode* pCol = NULL;
  int32_t      code = nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pCol);
  if (NULL == pCol) {
    return code;
  }

  if (QUERY_NODE_COLUMN != nodeType(pExpr)) {
    tstrncpy(pCol->colName, pExpr->aliasName, TSDB_COL_NAME_LEN);
  } else {
    SColumnNode* pSrcCol = (SColumnNode*)pExpr;
    tstrncpy(pCol->dbName, pSrcCol->dbName, TSDB_DB_NAME_LEN);
    tstrncpy(pCol->tableName, pSrcCol->tableName, TSDB_TABLE_NAME_LEN);
    tstrncpy(pCol->tableAlias, pSrcCol->tableAlias, TSDB_TABLE_NAME_LEN);
    tstrncpy(pCol->colName, pSrcCol->colName, TSDB_COL_NAME_LEN);
  }

  tstrncpy(pCol->node.aliasName, pExpr->aliasName, TSDB_COL_NAME_LEN);
  tstrncpy(pCol->node.userAlias, pExpr->userAlias, TSDB_COL_NAME_LEN);
  pCol->node.resType = pExpr->resType;

  *ppNode = (SNode*)pCol;
  return code;
}
1250

1251
/**
 * @brief Create an ORDER BY expression over a clone of pCol that copies the
 *        direction and null ordering of pSortKey.
 *
 * @param pSortKey  Sort key supplying order and nullOrder.
 * @param pCol      Expression to clone as the key expression.
 * @param ppNode    Receives the new ORDER BY node on success.
 * @return Result of node creation / clone; the new node is destroyed when
 *         the clone produced nothing.
 */
static int32_t stbSplCreateOrderByExpr(SOrderByExprNode* pSortKey, SNode* pCol, SNode** ppNode) {
  SOrderByExprNode* pKey = NULL;
  int32_t           code = nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR, (SNode**)&pKey);
  if (NULL == pKey) {
    return code;
  }

  pKey->pExpr = NULL;
  code = nodesCloneNode(pCol, &pKey->pExpr);
  if (NULL != pKey->pExpr) {
    pKey->order = pSortKey->order;
    pKey->nullOrder = pSortKey->nullOrder;
    *ppNode = (SNode*)pKey;
    return code;
  }

  nodesDestroyNode((SNode*)pKey);
  return code;
}
1268

1269
/**
 * @brief Build the merge key list that corresponds to a list of sort keys.
 *
 * For each sort key, every target that matches it yields one merge key. A
 * target matches when the sort expression is a column equal to the target,
 * or when the sort expression's alias equals the target's column name. If no
 * target matches, a new column referring to the sort expression is created,
 * used for the merge key and appended to pTargets.
 *
 * @param pSortKeys  Sort keys to translate.
 * @param pTargets   Target list searched for matches; may grow.
 * @param pOutput    Receives the merge key list on success.
 * @return TSDB_CODE_SUCCESS or the first error encountered; on error the
 *         partially built merge key list is destroyed.
 */
static int32_t stbSplCreateMergeKeys(SNodeList* pSortKeys, SNodeList* pTargets, SNodeList** pOutput) {
  int32_t    code = TSDB_CODE_SUCCESS;
  SNodeList* pKeys = NULL;
  SNode*     pKeyNode = NULL;

  FOREACH(pKeyNode, pSortKeys) {
    SOrderByExprNode* pSortKey = (SOrderByExprNode*)pKeyNode;
    SExprNode*        pSortExpr = (SExprNode*)pSortKey->pExpr;
    SNode*            pTarget = NULL;
    bool              matched = false;

    FOREACH(pTarget, pTargets) {
      bool sameColumn = (QUERY_NODE_COLUMN == nodeType(pSortExpr)) && nodesEqualNode((SNode*)pSortExpr, pTarget);
      if (!sameColumn && 0 != strcmp(pSortExpr->aliasName, ((SColumnNode*)pTarget)->colName)) {
        continue;
      }
      SNode* pNew = NULL;
      code = stbSplCreateOrderByExpr(pSortKey, pTarget, &pNew);
      if (TSDB_CODE_SUCCESS == code) {
        code = nodesListMakeStrictAppend(&pKeys, pNew);
      }
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
      matched = true;
    }
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }

    if (!matched) {
      // No existing target carries this sort key: add a column for it.
      SNode* pCol = NULL;
      code = stbSplCreateColumnNode(pSortExpr, &pCol);
      if (TSDB_CODE_SUCCESS == code) {
        SNode* pNew = NULL;
        code = stbSplCreateOrderByExpr(pSortKey, pCol, &pNew);
        if (TSDB_CODE_SUCCESS == code) {
          code = nodesListMakeStrictAppend(&pKeys, pNew);
        }
      }
      if (TSDB_CODE_SUCCESS == code) {
        code = nodesListStrictAppend(pTargets, pCol);
      } else {
        nodesDestroyNode(pCol);
      }
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
    }
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyList(pKeys);
    return code;
  }
  *pOutput = pKeys;
  return code;
}
1319

1320
static int32_t stbSplCreatePartSortNode(SSortLogicNode* pSort, SLogicNode** pOutputPartSort,
9,017,172✔
1321
                                        SNodeList** pOutputMergeKeys) {
1322
  SNodeList* pSortKeys = pSort->pSortKeys;
9,017,172✔
1323
  pSort->pSortKeys = NULL;
9,017,172✔
1324
  SNodeList* pChildren = pSort->node.pChildren;
9,017,172✔
1325
  pSort->node.pChildren = NULL;
9,017,172✔
1326

1327
  int32_t         code = TSDB_CODE_SUCCESS;
9,017,172✔
1328
  SSortLogicNode* pPartSort = NULL;
9,017,172✔
1329
  code = nodesCloneNode((SNode*)pSort, (SNode**)&pPartSort);
9,017,172✔
1330

1331
  SNodeList* pMergeKeys = NULL;
9,020,484✔
1332
  if (TSDB_CODE_SUCCESS == code) {
9,020,484✔
1333
    pPartSort->node.pChildren = pChildren;
9,020,569✔
1334
    splSetParent((SLogicNode*)pPartSort);
9,020,569✔
1335
    pPartSort->pSortKeys = pSortKeys;
9,021,275✔
1336
    pPartSort->groupSort = pSort->groupSort;
9,021,275✔
1337
    code = stbSplCreateMergeKeys(pPartSort->pSortKeys, pPartSort->node.pTargets, &pMergeKeys);
9,021,275✔
1338
  }
1339

1340
  if (TSDB_CODE_SUCCESS == code) {
9,021,700✔
1341
    *pOutputPartSort = (SLogicNode*)pPartSort;
9,021,700✔
1342
    *pOutputMergeKeys = pMergeKeys;
9,021,700✔
1343
  } else {
1344
    nodesDestroyNode((SNode*)pPartSort);
×
1345
    nodesDestroyList(pMergeKeys);
×
1346
  }
1347

1348
  return code;
9,021,704✔
1349
}
1350

1351
/*
 * Walks down a chain of single-child nodes starting at pNode. If the chain
 * ends in a scan node that has group tags, that scan's groupSort flag is set.
 * The walk stops without any change as soon as a non-scan node does not have
 * exactly one child.
 */
static void stbSplSetScanPartSort(SLogicNode* pNode) {
  SLogicNode* pCur = pNode;
  while (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pCur)) {
    if (1 != LIST_LENGTH(pCur->pChildren)) {
      return;
    }
    pCur = (SLogicNode*)nodesListGetNode(pCur->pChildren, 0);
  }

  SScanLogicNode* pScan = (SScanLogicNode*)pCur;
  if (NULL != pScan->pGroupTags) {
    pScan->groupSort = true;
  }
}
87,694✔
1363

1364
/*
 * Splits a sort node over a super table: a partial sort node is created from
 * the original, a merge node is created via stbSplCreateMergeNode(), the
 * original sort node is destroyed, and the partial sort is placed in a new
 * scan subplan appended to the children of pInfo->pSubplan.
 *
 * The subplan type is set to SUBPLAN_TYPE_MERGE and pCxt->groupId is
 * incremented whether or not the split succeeds.
 * Returns TSDB_CODE_SUCCESS or the error code of the first failing step.
 */
static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SSortLogicNode* pSort = (SSortLogicNode*)pInfo->pSplitNode;
  // Read the flag up front: the sort node is destroyed further down.
  const bool  isGroupSort = pSort->groupSort;
  SLogicNode* pPartial = NULL;
  SNodeList*  pKeys = NULL;

  int32_t code = stbSplCreatePartSortNode(pSort, &pPartial, &pKeys);
  if (TSDB_CODE_SUCCESS == code) {
    code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, pKeys, pPartial, isGroupSort, true);
  }
  if (TSDB_CODE_SUCCESS == code) {
    nodesDestroyNode((SNode*)pInfo->pSplitNode);
    pInfo->pSplitNode = NULL;
    if (isGroupSort) {
      stbSplSetScanPartSort(pPartial);
    }
    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, pPartial, SPLIT_FLAG_STABLE_SPLIT);
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, (SNode*)pScanSubplan);
  }

  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}
1385

1386
/*
 * Chooses the node at which a scan split is performed.
 *
 * By default this is the scan node itself (pInfo->pSplitNode). The parent is
 * chosen instead when it is a project node that has neither a limit nor an
 * slimit and does not have inputIgnoreGroup set. In that case, if the scan
 * carries a limit, a clone of it is attached to the parent, and the scan's own
 * limit is rewritten to (limit + offset) with offset 0.
 *
 * pInfo      - split info whose pSplitNode is the scan node.
 * pSplitNode - receives the chosen node.
 *
 * Returns TSDB_CODE_SUCCESS, or the code reported by nodesCloneNode() when the
 * cloned limit ends up NULL.
 */
static int32_t stbSplGetSplitNodeForScan(SStableSplitInfo* pInfo, SLogicNode** pSplitNode) {
  SLogicNode* pScanNode = pInfo->pSplitNode;
  SLogicNode* pParent = pScanNode->pParent;
  *pSplitNode = pScanNode;

  if (NULL == pParent || QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pParent)) {
    return TSDB_CODE_SUCCESS;
  }
  if (NULL != pParent->pLimit || NULL != pParent->pSlimit || ((SProjectLogicNode*)pParent)->inputIgnoreGroup) {
    return TSDB_CODE_SUCCESS;
  }

  *pSplitNode = pParent;
  if (NULL == pScanNode->pLimit) {
    return TSDB_CODE_SUCCESS;
  }

  pParent->pLimit = NULL;
  int32_t code = nodesCloneNode(pScanNode->pLimit, &pParent->pLimit);
  if (NULL == pParent->pLimit) {
    return code;
  }

  SLimitNode* pScanLimit = (SLimitNode*)pScanNode->pLimit;
  if (pScanLimit->offset) {
    if (pScanLimit->limit) {
      pScanLimit->limit->datum.i += pScanLimit->offset->datum.i;
    }
    pScanLimit->offset->datum.i = 0;
  }
  return TSDB_CODE_SUCCESS;
}
1409

1410
/*
 * Splits a super-table scan that has no group tags. The node chosen by
 * stbSplGetSplitNodeForScan() is handed to splCreateExchangeNodeForSubplan()
 * and then placed in a new scan subplan that is appended to the children of
 * pInfo->pSubplan.
 *
 * pCxt->groupId is incremented whether or not the split succeeds.
 * Returns TSDB_CODE_SUCCESS or the error code of the first failing step.
 */
static int32_t stbSplSplitScanNodeWithoutPartTags(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicSubplan* pOwner = pInfo->pSubplan;
  SLogicNode*    pTarget = NULL;

  int32_t code = stbSplGetSplitNodeForScan(pInfo, &pTarget);
  if (TSDB_CODE_SUCCESS == code) {
    code = splCreateExchangeNodeForSubplan(pCxt, pOwner, pTarget, pOwner->subplanType, false);
    if (TSDB_CODE_SUCCESS == code) {
      splSetSubplanType(pOwner);
      SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, pTarget, SPLIT_FLAG_STABLE_SPLIT);
      code = nodesListMakeStrictAppend(&pOwner->pChildren, (SNode*)pScanSubplan);
    }
  }

  ++(pCxt->groupId);
  return code;
}
1424

1425
/*
 * Splits a super-table scan that has group tags. A merge node is created for
 * the node chosen by stbSplGetSplitNodeForScan(), which is then placed in a
 * new scan subplan appended to the children of pInfo->pSubplan.
 *
 * The two boolean arguments passed to stbSplCreateMergeNode() are true unless
 * the chosen node is a project without limit/slimit whose ignoreGroupId flag
 * is set.
 *
 * The subplan type is set to SUBPLAN_TYPE_MERGE and pCxt->groupId is
 * incremented whether or not the split succeeds.
 * Returns TSDB_CODE_SUCCESS or the error code of the first failing step.
 */
static int32_t stbSplSplitScanNodeWithPartTags(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode* pTarget = NULL;
  int32_t     code = stbSplGetSplitNodeForScan(pInfo, &pTarget);

  if (TSDB_CODE_SUCCESS == code) {
    const bool isPlainProject =
        QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pTarget) && !pTarget->pLimit && !pTarget->pSlimit;
    const bool needSort = isPlainProject ? !((SProjectLogicNode*)pTarget)->ignoreGroupId : true;
    code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pTarget, NULL, pTarget, needSort, needSort);
  }
  if (TSDB_CODE_SUCCESS == code) {
    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, pTarget, SPLIT_FLAG_STABLE_SPLIT);
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, (SNode*)pScanSubplan);
  }

  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}
1443

1444
/*
 * Looks up the primary timestamp column (colId ==
 * PRIMARYKEY_TIMESTAMP_COL_ID) among the scan columns of pScan.
 *
 * If the column is found but no equal node is present in the scan's targets,
 * a clone of it is appended to the targets.
 *
 * pScan  - scan node to inspect; its target list may be extended.
 * ppNode - receives the scan column (not the clone), or NULL when the scan
 *          has no such column. Left untouched if cloning/appending fails.
 *
 * Returns TSDB_CODE_SUCCESS, or the error code from cloning/appending.
 */
static int32_t stbSplFindPrimaryKeyFromScan(SScanLogicNode* pScan, SNode** ppNode) {
  SNode* pTsCol = NULL;
  SNode* pIter = NULL;
  FOREACH(pIter, pScan->pScanCols) {
    if (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pIter)->colId) {
      pTsCol = pIter;
      break;
    }
  }
  if (NULL == pTsCol) {
    *ppNode = NULL;
    return TSDB_CODE_SUCCESS;
  }

  // Already projected by the scan: nothing to add.
  SNode* pTarget = NULL;
  FOREACH(pTarget, pScan->node.pTargets) {
    if (nodesEqualNode(pTarget, pTsCol)) {
      *ppNode = pTsCol;
      return TSDB_CODE_SUCCESS;
    }
  }

  SNode*  pClone = NULL;
  int32_t code = nodesCloneNode(pTsCol, &pClone);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }
  code = nodesListStrictAppend(pScan->node.pTargets, pClone);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }
  *ppNode = pTsCol;
  return code;
}
1474

1475
/*
 * Looks up the first scan column of pScan whose isPk flag is set.
 *
 * If such a column is found but no equal node is present in the scan's
 * targets, a clone of it is appended to the targets.
 *
 * pScan  - scan node to inspect; its target list may be extended.
 * ppNode - receives the scan column (not the clone), or NULL when no column
 *          is flagged. Left untouched if cloning/appending fails.
 *
 * Returns 0 on the lookup-only paths, otherwise the result of
 * cloning/appending.
 */
static int32_t stbSplFindPkFromScan(SScanLogicNode* pScan, SNode** ppNode) {
  int32_t code = 0;
  SNode*  pPkCol = NULL;
  SNode*  pIter = NULL;
  FOREACH(pIter, pScan->pScanCols) {
    if (((SColumnNode*)pIter)->isPk) {
      pPkCol = pIter;
      break;
    }
  }
  if (NULL == pPkCol) {
    *ppNode = NULL;
    return code;
  }

  // Already projected by the scan: nothing to add.
  SNode* pTarget = NULL;
  FOREACH(pTarget, pScan->node.pTargets) {
    if (nodesEqualNode(pTarget, pPkCol)) {
      *ppNode = pPkCol;
      return code;
    }
  }

  SNode* pClone = NULL;
  code = nodesCloneNode(pPkCol, &pClone);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }
  code = nodesListStrictAppend(pScan->node.pTargets, pClone);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }
  *ppNode = pPkCol;
  return code;
}
1506

1507
static int32_t stbSplCreateMergeScanNode(SScanLogicNode* pScan, SLogicNode** pOutputMergeScan,
18,650,369✔
1508
                                         SNodeList** pOutputMergeKeys) {
1509
  SNodeList* pChildren = pScan->node.pChildren;
18,650,369✔
1510
  pScan->node.pChildren = NULL;
18,650,369✔
1511

1512
  int32_t         code = TSDB_CODE_SUCCESS;
18,650,369✔
1513
  SScanLogicNode* pMergeScan = NULL;
18,650,369✔
1514
  code = nodesCloneNode((SNode*)pScan, (SNode**)&pMergeScan);
18,650,369✔
1515

1516
  SNodeList* pMergeKeys = NULL;
18,651,524✔
1517
  if (TSDB_CODE_SUCCESS == code) {
18,651,524✔
1518
    pMergeScan->scanType = SCAN_TYPE_TABLE_MERGE;
18,651,269✔
1519
    pMergeScan->filesetDelimited = true;
18,651,269✔
1520
    pMergeScan->node.pChildren = pChildren;
18,651,269✔
1521
    splSetParent((SLogicNode*)pMergeScan);
18,651,269✔
1522

1523
    SNode* pTs = NULL;
18,651,609✔
1524
    code = stbSplFindPrimaryKeyFromScan(pMergeScan, &pTs);
18,651,609✔
1525
    if (TSDB_CODE_SUCCESS == code) {
18,651,603✔
1526
      code = stbSplCreateMergeKeysByPrimaryKey(pTs, pMergeScan->scanSeq[0] > 0 ? ORDER_ASC : ORDER_DESC, &pMergeKeys);
18,650,978✔
1527
    }
1528
    SNode* pPk = NULL;
18,652,054✔
1529
    if (TSDB_CODE_SUCCESS == code) {
18,652,054✔
1530
      code = stbSplFindPkFromScan(pMergeScan, &pPk);
18,651,429✔
1531
    }
1532
    if (TSDB_CODE_SUCCESS == code && NULL != pPk) {
18,652,319✔
1533
      code = stbSplCreateMergeKeysByExpr(pPk, pMergeScan->scanSeq[0] > 0 ? ORDER_ASC : ORDER_DESC, &pMergeKeys);
709,476✔
1534
    }
1535
  }
1536

1537
  if (TSDB_CODE_SUCCESS == code) {
18,651,239✔
1538
    *pOutputMergeScan = (SLogicNode*)pMergeScan;
18,651,239✔
1539
    *pOutputMergeKeys = pMergeKeys;
18,651,239✔
1540
  } else {
1541
    nodesDestroyNode((SNode*)pMergeScan);
×
1542
    nodesDestroyList(pMergeKeys);
×
1543
  }
1544

1545
  return code;
18,651,334✔
1546
}
1547

1548
static int32_t stbSplSplitMergeScanNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SScanLogicNode* pScan,
18,650,369✔
1549
                                        bool groupSort, SStableSplitInfo* pInfo) {
1550
  SLogicNode* pMergeScan = NULL;
18,650,369✔
1551
  SNodeList*  pMergeKeys = NULL;
18,650,369✔
1552
  int32_t     code = stbSplCreateMergeScanNode(pScan, &pMergeScan, &pMergeKeys);
18,650,369✔
1553
  if (TSDB_CODE_SUCCESS == code) {
18,651,429✔
1554
    if (NULL != pMergeScan->pLimit) {
18,651,524✔
1555
      if (((SLimitNode*)pMergeScan->pLimit)->limit && ((SLimitNode*)pMergeScan->pLimit)->offset) {
1,520,165✔
1556
        ((SLimitNode*)pMergeScan->pLimit)->limit->datum.i += ((SLimitNode*)pMergeScan->pLimit)->offset->datum.i;
738,130✔
1557
      }
1558
      if (((SLimitNode*)pMergeScan->pLimit)->offset) {
1,520,165✔
1559
        ((SLimitNode*)pMergeScan->pLimit)->offset->datum.i = 0;
738,130✔
1560
      }
1561
    }
1562
    code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, pMergeScan, groupSort, true);
18,651,524✔
1563
  }
1564
  if (TSDB_CODE_SUCCESS == code) {
18,651,144✔
1565
    if ((void*)pInfo->pSplitNode == (void*)pScan) {
18,651,239✔
1566
      pInfo->pSplitNode = NULL;
7,932,309✔
1567
    }
1568
    nodesDestroyNode((SNode*)pScan);
18,651,239✔
1569
    code = nodesListMakeStrictAppend(&pSubplan->pChildren,
18,651,344✔
1570
                                     (SNode*)splCreateScanSubplan(pCxt, pMergeScan, SPLIT_FLAG_STABLE_SPLIT));
18,651,609✔
1571
  }
1572
  ++(pCxt->groupId);
18,651,609✔
1573
  return code;
18,651,609✔
1574
}
1575

1576
/*
 * Dispatches the split of a scan node:
 *   - table-merge scans set the subplan type to SUBPLAN_TYPE_MERGE and go
 *     through stbSplSplitMergeScanNode() with groupSort = true;
 *   - other scans with group tags go through
 *     stbSplSplitScanNodeWithPartTags();
 *   - all remaining scans go through stbSplSplitScanNodeWithoutPartTags().
 * Returns the result of the selected routine.
 */
static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SScanLogicNode* pScan = (SScanLogicNode*)pInfo->pSplitNode;

  if (SCAN_TYPE_TABLE_MERGE != pScan->scanType) {
    return (NULL != pScan->pGroupTags) ? stbSplSplitScanNodeWithPartTags(pCxt, pInfo)
                                       : stbSplSplitScanNodeWithoutPartTags(pCxt, pInfo);
  }

  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  return stbSplSplitMergeScanNode(pCxt, pInfo->pSubplan, pScan, true, pInfo);
}
1587

1588
/*
 * Splits every child of a join node: scan children are split with
 * stbSplSplitMergeScanNode() (groupSort follows pJoin->grpJoin), join children
 * are handled recursively, and any other child type yields
 * TSDB_CODE_PLAN_INTERNAL_ERROR.
 *
 * Processing stops at the first failure.
 * Returns TSDB_CODE_SUCCESS or the first error code encountered.
 */
static int32_t stbSplSplitJoinNodeImpl(SSplitContext* pCxt, SLogicSubplan* pSubplan, SJoinLogicNode* pJoin, SStableSplitInfo* pInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  SNode*  pChild = NULL;

  FOREACH(pChild, pJoin->node.pChildren) {
    // The type is read once: a scan child is destroyed by the split below.
    switch (nodeType(pChild)) {
      case QUERY_NODE_LOGIC_PLAN_SCAN:
        code = stbSplSplitMergeScanNode(pCxt, pSubplan, (SScanLogicNode*)pChild, pJoin->grpJoin ? true : false, pInfo);
        break;
      case QUERY_NODE_LOGIC_PLAN_JOIN:
        code = stbSplSplitJoinNodeImpl(pCxt, pSubplan, (SJoinLogicNode*)pChild, pInfo);
        break;
      default:
        code = TSDB_CODE_PLAN_INTERNAL_ERROR;
        break;
    }
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
  }
  return code;
}
1609

1610
/*
 * Splits the join node in pInfo->pSplitNode. On success the owning subplan
 * becomes SUBPLAN_TYPE_MERGE and the join node is marked splitDone.
 * Returns the result of stbSplSplitJoinNodeImpl().
 */
static int32_t stbSplSplitJoinNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  int32_t code = stbSplSplitJoinNodeImpl(pCxt, pInfo->pSubplan, (SJoinLogicNode*)pInfo->pSplitNode, pInfo);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  pInfo->pSplitNode->splitDone = true;
  return code;
}
1621

1622
/*
 * Builds merge keys for a partition node from the primary timestamp column of
 * its first child, which is treated as a scan node.
 *
 * A clone of that column is appended to the partition node's targets and then
 * used to create the merge keys; the order is ascending when the scan's
 * scanSeq[0] > 0 and descending otherwise.
 *
 * pPart      - partition node; its target list is extended.
 * pMergeKeys - receives the merge keys.
 *
 * Returns early with the current code (pMergeKeys untouched) when no cloned
 * primary key is available; otherwise the result of the append / key creation.
 */
static int32_t stbSplCreateMergeKeysForPartitionNode(SLogicNode* pPart, SNodeList** pMergeKeys) {
  SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pPart->pChildren, 0);
  SNode*          pScanTs = NULL;
  SNode*          pTsClone = NULL;

  int32_t code = stbSplFindPrimaryKeyFromScan(pScan, &pScanTs);
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesCloneNode(pScanTs, &pTsClone);
  }
  if (NULL == pTsClone) {
    return code;
  }

  code = nodesListStrictAppend(pPart->pTargets, pTsClone);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }
  return stbSplCreateMergeKeysByPrimaryKey(pTsClone, pScan->scanSeq[0] > 0 ? ORDER_ASC : ORDER_DESC, pMergeKeys);
}
1639

1640
/*
 * Splits a partition node. Merge keys are built only when the node's
 * requireDataOrder is at least DATA_ORDER_LEVEL_IN_GROUP; a merge node is then
 * created and the partition node is placed in a new scan subplan appended to
 * the children of pInfo->pSubplan.
 *
 * The subplan type is set to SUBPLAN_TYPE_MERGE and pCxt->groupId is
 * incremented whether or not the split succeeds.
 * Returns TSDB_CODE_SUCCESS or the error code of the first failing step.
 */
static int32_t stbSplSplitPartitionNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SNodeList* pKeys = NULL;
  int32_t    code = TSDB_CODE_SUCCESS;

  const bool needOrder = pInfo->pSplitNode->requireDataOrder >= DATA_ORDER_LEVEL_IN_GROUP;
  if (needOrder) {
    code = stbSplCreateMergeKeysForPartitionNode(pInfo->pSplitNode, &pKeys);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, pKeys, pInfo->pSplitNode, true, true);
  }
  if (TSDB_CODE_SUCCESS == code) {
    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT);
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, (SNode*)pScanSubplan);
  }

  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}
1657

1658
/*
 * Split rule for super-table queries.
 *
 * Does nothing for rSma queries or when splMatch() finds no split node.
 * Otherwise the matched node is dispatched by type (scan, join, partition,
 * agg, window, sort) to its split routine; other node types are left as they
 * are. Afterwards, if a split node is still recorded and the plan is in
 * neither a stream trigger clause nor a stream calc clause, that node is
 * marked splitDone. pCxt->split is set whenever a node was matched, even if
 * the split routine failed.
 *
 * Returns TSDB_CODE_SUCCESS or the error code of the split routine.
 */
static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  if (pCxt->pPlanCxt->rSmaQuery) {
    return TSDB_CODE_SUCCESS;
  }

  SStableSplitInfo info = {0};
  const bool matched = splMatch(pCxt, pSubplan, SPLIT_FLAG_STABLE_SPLIT, (FSplFindSplitNode)stbSplFindSplitNode, &info);
  if (!matched) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  switch (nodeType(info.pSplitNode)) {
    case QUERY_NODE_LOGIC_PLAN_SCAN: {
      code = stbSplSplitScanNode(pCxt, &info);
      break;
    }
    case QUERY_NODE_LOGIC_PLAN_JOIN: {
      code = stbSplSplitJoinNode(pCxt, &info);
      break;
    }
    case QUERY_NODE_LOGIC_PLAN_PARTITION: {
      code = stbSplSplitPartitionNode(pCxt, &info);
      break;
    }
    case QUERY_NODE_LOGIC_PLAN_AGG: {
      code = stbSplSplitAggNode(pCxt, &info);
      break;
    }
    case QUERY_NODE_LOGIC_PLAN_WINDOW: {
      code = stbSplSplitWindowNode(pCxt, &info);
      break;
    }
    case QUERY_NODE_LOGIC_PLAN_SORT: {
      code = stbSplSplitSortNode(pCxt, &info);
      break;
    }
    default:
      break;
  }

  // Some split routines clear info.pSplitNode after destroying the node.
  if (NULL != info.pSplitNode) {
    if (!(inStreamTriggerClause(pCxt->pPlanCxt) || inStreamCalcClause(pCxt->pPlanCxt))) {
      info.pSplitNode->splitDone = true;
    }
  }
  pCxt->split = true;
  return code;
}
1698

1699
typedef struct SSigTbJoinSplitInfo {
1700
  SJoinLogicNode* pJoin;
1701
  SLogicNode*     pSplitNode;
1702
  SLogicSubplan*  pSubplan;
1703
} SSigTbJoinSplitInfo;
1704

1705
/*
 * Returns true when pNode is a join flagged isSingleTableJoin and neither of
 * its first two children is an exchange node.
 */
static bool sigTbJoinSplNeedSplit(SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pNode)) {
    SJoinLogicNode* pJoin = (SJoinLogicNode*)pNode;
    if (pJoin->isSingleTableJoin) {
      SNodeList* pKids = pJoin->node.pChildren;
      if (QUERY_NODE_LOGIC_PLAN_EXCHANGE == nodeType(nodesListGetNode(pKids, 0))) {
        return false;
      }
      return QUERY_NODE_LOGIC_PLAN_EXCHANGE != nodeType(nodesListGetNode(pKids, 1));
    }
  }
  return false;
}
1717

1718
static bool sigTbJoinSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
1,986,532,455✔
1719
                                      SSigTbJoinSplitInfo* pInfo) {
1720
  if (sigTbJoinSplNeedSplit(pNode)) {
1,986,532,455✔
1721
    pInfo->pJoin = (SJoinLogicNode*)pNode;
16,231,427✔
1722
    pInfo->pSplitNode = (SLogicNode*)nodesListGetNode(pNode->pChildren, 1);
16,230,664✔
1723
    pInfo->pSubplan = pSubplan;
16,246,842✔
1724
    return true;
16,246,842✔
1725
  }
1726
  return false;
1,970,380,937✔
1727
}
1728

1729
/*
 * Split rule for single-table joins. The join's child at index 1 is handed to
 * splCreateExchangeNodeForSubplan() and placed in a new scan subplan appended
 * to the owning subplan's children. The subplan type passed on is
 * SUBPLAN_TYPE_SCAN when checkScanLogicNode() accepts the join's child at
 * index 0, otherwise SUBPLAN_TYPE_MERGE.
 *
 * Once a join is matched, pCxt->groupId is incremented and pCxt->split is set
 * whether or not the split succeeds.
 * Returns TSDB_CODE_SUCCESS or the error code of the first failing step.
 */
static int32_t singleTableJoinSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SSigTbJoinSplitInfo info = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)sigTbJoinSplFindSplitNode, &info)) {
    return TSDB_CODE_SUCCESS;
  }

  SLogicNode* pLeft = (SLogicNode*)nodesListGetNode(info.pJoin->node.pChildren, 0);
  const bool  leftHasScan = checkScanLogicNode(pLeft);

  int32_t code = splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pSplitNode,
                                                 leftHasScan ? SUBPLAN_TYPE_SCAN : SUBPLAN_TYPE_MERGE, false);
  if (TSDB_CODE_SUCCESS == code) {
    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, info.pSplitNode, 0);
    code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)pScanSubplan);
  }

  ++(pCxt->groupId);
  pCxt->split = true;
  return code;
}
1743

1744
/*
 * Turns every child of pSplitNode into its own subplan under pUnionSubplan.
 *
 * The previous children of pUnionSubplan are set aside. For each child of
 * pSplitNode a new subplan is created and appended to pUnionSubplan, the
 * child's slot in pSplitNode's list is cleared, the set-aside subplans are
 * offered to the new subplan through splMountSubplan(), and pCxt->groupId is
 * incremented. After all children are processed, whatever remains in the
 * set-aside list is appended back to pUnionSubplan (or the empty list is
 * destroyed), and pSplitNode's child list is destroyed.
 *
 * Returns TSDB_CODE_SUCCESS, or the first error code; on error the loop stops
 * and the trailing clean-up is skipped.
 */
static int32_t unionSplitSubplan(SSplitContext* pCxt, SLogicSubplan* pUnionSubplan, SLogicNode* pSplitNode) {
  SNodeList* pOldChildren = pUnionSubplan->pChildren;
  pUnionSubplan->pChildren = NULL;

  int32_t code = TSDB_CODE_SUCCESS;
  SNode*  pBranch = NULL;
  FOREACH(pBranch, pSplitNode->pChildren) {
    SLogicSubplan* pBranchSubplan = NULL;
    code = splCreateSubplan(pCxt, (SLogicNode*)pBranch, &pBranchSubplan);
    if (TSDB_CODE_SUCCESS == code) {
      code = nodesListMakeStrictAppend(&pUnionSubplan->pChildren, (SNode*)pBranchSubplan);
    }
    if (TSDB_CODE_SUCCESS == code) {
      // The branch now lives in its own subplan; clear its slot in the list.
      REPLACE_NODE(NULL);
      code = splMountSubplan(pBranchSubplan, pOldChildren);
    }
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
    ++(pCxt->groupId);
  }
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  if (NULL != pOldChildren) {
    if (pOldChildren->length > 0) {
      code = nodesListMakeStrictAppendList(&pUnionSubplan->pChildren, pOldChildren);
    } else {
      nodesDestroyList(pOldChildren);
    }
  }
  NODES_DESTORY_LIST(pSplitNode->pChildren);
  return code;
}
1778

1779
typedef struct SUnionAllSplitInfo {
1780
  SProjectLogicNode* pProject;
1781
  SLogicSubplan*     pSubplan;
1782
} SUnionAllSplitInfo;
1783

1784
static bool unAllSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
1,961,472,979✔
1785
                                  SUnionAllSplitInfo* pInfo) {
1786
  if (QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) {
1,961,472,979✔
1787
    pInfo->pProject = (SProjectLogicNode*)pNode;
16,451,761✔
1788
    pInfo->pSubplan = pSubplan;
16,451,761✔
1789
    return true;
16,451,761✔
1790
  }
1791
  return false;
1,945,022,159✔
1792
}
1793

1794
static int32_t unAllSplCreateExchangeNode(SSplitContext* pCxt, int32_t startGroupId, SLogicSubplan* pSubplan,
16,449,986✔
1795
                                          SProjectLogicNode* pProject) {
1796
  SExchangeLogicNode* pExchange = NULL;
16,449,986✔
1797
  int32_t code = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE, (SNode**)&pExchange);
16,449,986✔
1798
  if (NULL == pExchange) {
16,451,934✔
1799
    return code;
×
1800
  }
1801
  pExchange->srcStartGroupId = startGroupId;
16,451,934✔
1802
  pExchange->srcEndGroupId = pCxt->groupId - 1;
16,451,934✔
1803
  pExchange->node.precision = pProject->node.precision;
16,451,934✔
1804
  pExchange->node.pTargets = NULL;
16,451,934✔
1805
  code = nodesCloneList(pProject->node.pTargets, &pExchange->node.pTargets);
16,451,934✔
1806
  if (TSDB_CODE_SUCCESS != code) {
16,452,402✔
1807
    nodesDestroyNode((SNode*)pExchange);
×
1808
    return code;
×
1809
  }
1810
  pExchange->node.pConditions = NULL;
16,452,402✔
1811
  code = nodesCloneNode(pProject->node.pConditions, &pExchange->node.pConditions);
16,452,402✔
1812
  if (TSDB_CODE_SUCCESS != code) {
16,452,402✔
1813
    nodesDestroyNode((SNode*)pExchange);
1,671✔
1814
    return code;
×
1815
  }
1816
  TSWAP(pExchange->node.pLimit, pProject->node.pLimit);
16,450,731✔
1817

1818
  pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
16,450,731✔
1819

1820
  if (NULL == pProject->node.pParent) {
16,450,731✔
1821
    pSubplan->pNode = (SLogicNode*)pExchange;
9,908,200✔
1822
    nodesDestroyNode((SNode*)pProject);
9,908,200✔
1823
    return TSDB_CODE_SUCCESS;
9,908,762✔
1824
  }
1825

1826
  SNode* pNode;
1827
  FOREACH(pNode, pProject->node.pParent->pChildren) {
6,542,531✔
1828
    if (nodesEqualNode(pNode, (SNode*)pProject)) {
6,542,814✔
1829
      REPLACE_NODE(pExchange);
6,542,600✔
1830
      nodesDestroyNode(pNode);
6,542,600✔
1831
      return TSDB_CODE_SUCCESS;
6,543,545✔
1832
    }
1833
  }
UNCOV
1834
  nodesDestroyNode((SNode*)pExchange);
×
1835
  return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1836
}
1837

1838
/*
 * Split rule for UNION ALL: each child of the matched project node becomes
 * its own subplan, then the project is replaced by an exchange node covering
 * the group ids consumed by those subplans.
 *
 * pCxt->split is set whenever a project node was matched, even on failure.
 * Returns TSDB_CODE_SUCCESS or the error code of the first failing step.
 */
static int32_t unionAllSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SUnionAllSplitInfo info = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)unAllSplFindSplitNode, &info)) {
    return TSDB_CODE_SUCCESS;
  }

  // Remember the first group id before the branch subplans consume ids.
  const int32_t firstGroupId = pCxt->groupId;

  int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pProject);
  if (TSDB_CODE_SUCCESS == code) {
    code = unAllSplCreateExchangeNode(pCxt, firstGroupId, info.pSubplan, info.pProject);
  }

  pCxt->split = true;
  return code;
}
1852

1853
typedef struct SUnionDistinctSplitInfo {
1854
  SAggLogicNode* pAgg;
1855
  SLogicSubplan* pSubplan;
1856
} SUnionDistinctSplitInfo;
1857

1858
static int32_t unDistSplCreateExchangeNode(SSplitContext* pCxt, int32_t startGroupId, SLogicSubplan* pSubplan,
13,541,772✔
1859
                                           SAggLogicNode* pAgg) {
1860
  SExchangeLogicNode* pExchange = NULL;
13,541,772✔
1861
  int32_t code = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE, (SNode**)&pExchange);
13,541,772✔
1862
  if (NULL == pExchange) {
13,544,052✔
1863
    return code;
×
1864
  }
1865
  pExchange->srcStartGroupId = startGroupId;
13,544,052✔
1866
  pExchange->srcEndGroupId = pCxt->groupId - 1;
13,544,052✔
1867
  pExchange->node.precision = pAgg->node.precision;
13,544,052✔
1868
  pExchange->node.pTargets = NULL;
13,544,052✔
1869
  code = nodesCloneList(pAgg->pGroupKeys, &pExchange->node.pTargets);
13,544,052✔
1870
  if (NULL == pExchange->node.pTargets) {
13,544,231✔
1871
    nodesDestroyNode((SNode*)pExchange);
×
1872
    return code;
×
1873
  }
1874

1875
  pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
13,544,231✔
1876

1877
  return nodesListMakeStrictAppend(&pAgg->node.pChildren, (SNode*)pExchange);
13,544,231✔
1878
}
1879

1880
static bool unDistSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
1,974,657,177✔
1881
                                   SUnionDistinctSplitInfo* pInfo) {
1882
  if (QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) {
1,974,657,177✔
1883
    pInfo->pAgg = (SAggLogicNode*)pNode;
13,543,323✔
1884
    if (!pInfo->pAgg->pGroupKeys) return false;
13,543,323✔
1885
    pInfo->pSubplan = pSubplan;
13,543,323✔
1886
    return true;
13,543,323✔
1887
  }
1888
  return false;
1,961,113,647✔
1889
}
1890

1891
/*
 * Split rule for UNION (distinct): each child of the matched agg node becomes
 * its own subplan, then an exchange node covering the group ids consumed by
 * those subplans is added as the agg's child.
 *
 * pCxt->split is set whenever an agg node was matched, even on failure.
 * Returns TSDB_CODE_SUCCESS or the error code of the first failing step.
 */
static int32_t unionDistinctSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SUnionDistinctSplitInfo info = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)unDistSplFindSplitNode, &info)) {
    return TSDB_CODE_SUCCESS;
  }

  // Remember the first group id before the branch subplans consume ids.
  const int32_t firstGroupId = pCxt->groupId;

  int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pAgg);
  if (TSDB_CODE_SUCCESS == code) {
    code = unDistSplCreateExchangeNode(pCxt, firstGroupId, info.pSubplan, info.pAgg);
  }

  pCxt->split = true;
  return code;
}
1905

1906
typedef struct SSmaIndexSplitInfo {
1907
  SMergeLogicNode* pMerge;
1908
  SLogicSubplan*   pSubplan;
1909
} SSmaIndexSplitInfo;
1910

1911
static bool smaIdxSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
2,060,708,217✔
1912
                                   SSmaIndexSplitInfo* pInfo) {
1913
  if (QUERY_NODE_LOGIC_PLAN_MERGE == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) {
2,060,708,217✔
1914
    if (((SMergeLogicNode*)pNode)->node.dynamicOp) {
1,592,986✔
1915
      return false;
169,512✔
1916
    }
1917
    int32_t nodeType = nodeType(nodesListGetNode(pNode->pChildren, 0));
1,423,474✔
1918
    if (nodeType == QUERY_NODE_LOGIC_PLAN_EXCHANGE || nodeType == QUERY_NODE_LOGIC_PLAN_MERGE) {
1,423,474✔
1919
      pInfo->pMerge = (SMergeLogicNode*)pNode;
1,206✔
1920
      pInfo->pSubplan = pSubplan;
×
1921
      return true;
×
1922
    }
1923
  }
1924
  return false;
2,060,539,926✔
1925
}
1926

1927
/*
 * Split rule for SMA index plans: each child of the matched merge node becomes
 * its own subplan; on success the merge node's srcGroupId is set to the group
 * id reached after those subplans were created.
 *
 * Once a merge node is matched, pCxt->groupId is incremented and pCxt->split
 * is set whether or not the split succeeds.
 * Returns TSDB_CODE_SUCCESS or the error code of unionSplitSubplan().
 */
static int32_t smaIndexSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SSmaIndexSplitInfo info = {0};
  const bool matched = splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)smaIdxSplFindSplitNode, &info);
  if (!matched) {
    return TSDB_CODE_SUCCESS;
  }

  const int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pMerge);
  if (TSDB_CODE_SUCCESS == code) {
    info.pMerge->srcGroupId = pCxt->groupId;
  }

  ++(pCxt->groupId);
  pCxt->split = true;
  return code;
}
1941

1942
typedef struct SInsertSelectSplitInfo {
1943
  SLogicNode*    pQueryRoot;
1944
  SLogicSubplan* pSubplan;
1945
} SInsertSelectSplitInfo;
1946

1947
static bool insSelSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
2,058,205,273✔
1948
                                   SInsertSelectSplitInfo* pInfo) {
1949
  if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY == nodeType(pNode) && 1 == LIST_LENGTH(pNode->pChildren) &&
2,058,205,273✔
1950
      MODIFY_TABLE_TYPE_INSERT == ((SVnodeModifyLogicNode*)pNode)->modifyType) {
514,372✔
1951
    pInfo->pQueryRoot = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
514,372✔
1952
    pInfo->pSubplan = pSubplan;
514,372✔
1953
    return true;
514,372✔
1954
  }
1955
  return false;
2,057,691,004✔
1956
}
1957

1958
/*
 * Split rule for INSERT ... SELECT: the query root under the insert node is
 * handed to splCreateExchangeNodeForSubplan() (subplan type
 * SUBPLAN_TYPE_MODIFY) and moved into a new subplan, which is appended to the
 * owning subplan's children and offered the children that subplan had before
 * the split through splMountSubplan().
 *
 * Once an insert node is matched, SPLIT_FLAG_INSERT_SPLIT is set on the owning
 * subplan, pCxt->groupId is incremented and pCxt->split is set whether or not
 * the split succeeds.
 * Returns TSDB_CODE_SUCCESS or the error code of the first failing step.
 */
static int32_t insertSelectSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SInsertSelectSplitInfo info = {0};
  if (!splMatch(pCxt, pSubplan, SPLIT_FLAG_INSERT_SPLIT, (FSplFindSplitNode)insSelSplFindSplitNode, &info)) {
    return TSDB_CODE_SUCCESS;
  }

  SLogicSubplan* pQuerySubplan = NULL;
  // Captured before the exchange node is created and the new subplan appended.
  SNodeList* pPrevChildren = info.pSubplan->pChildren;

  int32_t code = splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pQueryRoot, SUBPLAN_TYPE_MODIFY, false);
  if (TSDB_CODE_SUCCESS == code) {
    code = splCreateSubplan(pCxt, info.pQueryRoot, &pQuerySubplan);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)pQuerySubplan);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = splMountSubplan(pQuerySubplan, pPrevChildren);
  }

  SPLIT_FLAG_SET_MASK(info.pSubplan->splitFlag, SPLIT_FLAG_INSERT_SPLIT);
  ++(pCxt->groupId);
  pCxt->split = true;
  return code;
}
1982

1983
typedef struct SVirtualTableSplitInfo {
1984
  SVirtualScanLogicNode *pVirtual;
1985
  SLogicSubplan          *pSubplan;
1986
} SVirtualTableSplitInfo;
1987

1988
static bool virtualTableFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
2,039,363,665✔
1989
                                      SVirtualTableSplitInfo* pInfo) {
1990
  if (QUERY_NODE_LOGIC_PLAN_VIRTUAL_TABLE_SCAN == nodeType(pNode) && 0 != LIST_LENGTH(pNode->pChildren) &&
2,039,363,665✔
1991
      QUERY_NODE_LOGIC_PLAN_EXCHANGE != nodeType(nodesListGetNode(pNode->pChildren, 0))) {
10,154,283✔
1992
    pInfo->pVirtual = (SVirtualScanLogicNode*)pNode;
4,732,351✔
1993
    pInfo->pSubplan = pSubplan;
4,732,351✔
1994
    return true;
4,732,351✔
1995
  }
1996
  return false;
2,034,632,413✔
1997
}
1998

1999
/*
 * Returns true when the virtual scan node has a parent and that parent is a
 * dynamic query control node.
 */
static bool needProcessOneBlockEachTime(SVirtualScanLogicNode* pVirtual) {
  SLogicNode* pParent = pVirtual->node.pParent;
  return NULL != pParent && QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL == nodeType(pParent);
}
2005

2006
/*
 * Split every child of a virtual-table scan node into its own scan subplan.
 *
 * For each child of the matched virtual scan node an exchange node is created and put in the child's
 * place, and the child itself is handed to splCreateScanSubplan; the resulting subplan is appended to the
 * owning subplan's children. When all children are processed the owning subplan becomes SUBPLAN_TYPE_MERGE.
 *
 * @param pCxt Split context; groupId is advanced once per child, split is set whenever a node matched.
 * @param pSubplan Subplan to split.
 *
 * @return TSDB_CODE_SUCCESS when completed or no split needed, otherwise error code.
 */
static int32_t virtualTableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  int32_t                code = TSDB_CODE_SUCCESS;
  SVirtualTableSplitInfo info = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)virtualTableFindSplitNode, &info)) {
    return TSDB_CODE_SUCCESS;
  }

  SNode* pChild = NULL;
  FOREACH(pChild, info.pVirtual->node.pChildren) {
    SLogicNode*         pChildNode = (SLogicNode*)pChild;
    SExchangeLogicNode* pExchange = NULL;
    PLAN_ERR_JRET(splCreateExchangeNode(pCxt, pChildNode, &pExchange));

    // Only plain scan children carry the placeholder-tbname flag.
    pExchange->dynTbname =
        (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChildNode)) ? ((SScanLogicNode*)pChild)->phTbnameScan : false;
    pExchange->seqRecvData = (info.pVirtual->tableType == TSDB_SUPER_TABLE);

    pExchange->node.stmtRoot = pChildNode->stmtRoot;
    REPLACE_NODE(pExchange);
    pExchange->node.pParent = pChildNode->pParent;

    SLogicSubplan* sub = splCreateScanSubplan(pCxt, pChildNode, 0);
    if (NULL == sub) {
      // Creation failed: report the error instead of dereferencing NULL below
      // (same convention as qnodeSplit / streamScanSplit).
      code = terrno;
      goto _return;
    }
    sub->processOneBlock = needProcessOneBlockEachTime(info.pVirtual);
    PLAN_ERR_JRET(nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)sub));
    ++(pCxt->groupId);
  }
  info.pSubplan->subplanType = SUBPLAN_TYPE_MERGE;

_return:
  pCxt->split = true;
  return code;
}
2034

2035
typedef struct SMergeAggColsSplitInfo {
2036
  SAggLogicNode   *pAgg;
2037
  SLogicNode      *pSplitNode;
2038
  SLogicSubplan   *pSubplan;
2039
} SMergeAggColsSplitInfo;
2040

2041
typedef struct SMergeTableScanSplitInfo {
2042
  SLogicNode    *pMerge;    // Dynamic merge node selected for split.
2043
  SLogicSubplan *pSubplan;  // Subplan that owns pMerge.
2044
} SMergeTableScanSplitInfo;
2045

2046
/*
2047
 * Find dynamic merge node that contains table-merge scan children.
2048
 *
2049
 * @param pCxt Split context.
2050
 * @param pSubplan Subplan currently visited.
2051
 * @param pNode Candidate logic node.
2052
 * @param pInfo Output split-node info.
2053
 *
2054
 * @return true when a split candidate is found, otherwise false.
2055
 */
2056
static bool mergeTableScanFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
2,076,965,792✔
2057
                                        SMergeTableScanSplitInfo* pInfo) {
2058
  if (QUERY_NODE_LOGIC_PLAN_MERGE != nodeType(pNode)) {
2,076,965,792✔
2059
    return false;
1,997,571,939✔
2060
  }
2061
  if (!pNode->dynamicOp) {
79,395,786✔
2062
    return false;
79,259,142✔
2063
  }
2064

2065
  SNode* pChild = NULL;
202,393✔
2066
  FOREACH(pChild, pNode->pChildren) {
646,137✔
2067
    if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) &&
500,248✔
2068
        ((SScanLogicNode*)pChild)->scanType == SCAN_TYPE_TABLE_MERGE &&
56,504✔
2069
        ((SLogicNode*)pChild)->dynamicOp) {
56,504✔
2070
      pInfo->pSubplan = pSubplan;
56,504✔
2071
      pInfo->pMerge = pNode;
56,504✔
2072
      return true;
56,504✔
2073
    }
2074
  }
2075
  return false;
145,889✔
2076
}
2077

2078
/*
2079
 * Split dynamic table-merge scan children into independent scan subplans.
2080
 *
2081
 * @param pCxt Split context.
2082
 * @param pSubplan Subplan to split.
2083
 *
2084
 * @return TSDB_CODE_SUCCESS when completed or no split needed, otherwise error code.
2085
 */
2086
static int32_t mergeTableScanSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
548,144,910✔
2087
  int32_t                   code = TSDB_CODE_SUCCESS;
548,144,910✔
2088
  SMergeTableScanSplitInfo  info = {0};
548,144,910✔
2089
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)mergeTableScanFindSplitNode, &info)) {
548,145,580✔
2090
    return TSDB_CODE_SUCCESS;
548,187,310✔
2091
  }
2092
  SMergeLogicNode* pMerge = (SMergeLogicNode*)info.pMerge;
56,504✔
2093
  // set group id range for merge node
2094
  pMerge->srcGroupId = pCxt->groupId;
56,504✔
2095

2096
  SNode*  pChild = NULL;
56,504✔
2097
  FOREACH(pChild, info.pMerge->pChildren) {
278,376✔
2098
    if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) &&
221,872✔
2099
        ((SScanLogicNode*)pChild)->scanType == SCAN_TYPE_TABLE_MERGE &&
221,872✔
2100
        ((SLogicNode*)pChild)->dynamicOp) {
221,872✔
2101
      PLAN_ERR_RET(splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, (SLogicNode*)pChild, SUBPLAN_TYPE_SCAN, false));
221,872✔
2102
      PLAN_ERR_RET(nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)splCreateScanSubplan(pCxt, (SLogicNode*)pChild, 0)));
221,872✔
2103
      ++(pCxt->groupId);
221,872✔
2104
    }
2105
  }
2106

2107
  pMerge->srcEndGroupId = pCxt->groupId - 1;
56,504✔
2108

2109
  pCxt->split = true;
56,504✔
2110
  return code;
56,504✔
2111
}
2112

2113
/*
 * Check whether pNode is a single-child aggregate, directly under a cols-merge node, whose child is a scan.
 *
 * @param pNode Candidate logic node.
 *
 * @return true when all of the above hold, otherwise false.
 */
static bool mergeAggColsNeedSplit(SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren)) {
    return false;
  }

  SLogicNode* pUpper = pNode->pParent;
  if (NULL == pUpper || QUERY_NODE_LOGIC_PLAN_MERGE != nodeType(pUpper)) {
    return false;
  }
  if (!((SMergeLogicNode*)pUpper)->colsMerge) {
    return false;
  }

  return QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(nodesListGetNode(pNode->pChildren, 0));
}
2123

2124
static bool mergeAggColsFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
2,077,432,929✔
2125
                                      SMergeAggColsSplitInfo* pInfo) {
2126
  if (mergeAggColsNeedSplit(pNode)) {
2,077,432,929✔
2127
    pInfo->pAgg = (SAggLogicNode *)pNode;
74,526✔
2128
    pInfo->pSplitNode = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
75,572✔
2129
    pInfo->pSubplan = pSubplan;
75,952✔
2130
    return true;
75,952✔
2131
  }
2132
  return false;
2,077,451,580✔
2133
}
2134

2135
/*
 * Split the scan child of an aggregate under a cols-merge node into its own scan subplan.
 *
 * @param pCxt Split context; groupId is advanced and split is set on success.
 * @param pSubplan Subplan to split.
 *
 * @return TSDB_CODE_SUCCESS when completed or no split needed, otherwise error code.
 */
static int32_t mergeAggColsSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SMergeAggColsSplitInfo info = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)mergeAggColsFindSplitNode, &info)) {
    return TSDB_CODE_SUCCESS;
  }

  SLogicSubplan* pOwner = info.pSubplan;
  SLogicNode*    pScan = info.pSplitNode;

  PLAN_ERR_RET(splCreateExchangeNodeForSubplan(pCxt, pOwner, pScan, SUBPLAN_TYPE_MERGE, false));
  PLAN_ERR_RET(nodesListMakeStrictAppend(&pOwner->pChildren, (SNode*)splCreateScanSubplan(pCxt, pScan, 0)));

  ++(pCxt->groupId);
  pCxt->split = true;
  return TSDB_CODE_SUCCESS;
}
2149

2150
typedef struct SMergeExtWinSplitInfo {
2151
  SLogicNode      *pSplitNode;
2152
  SLogicSubplan   *pSubplan;
2153
} SMergeExtWinSplitInfo;
2154

2155
/*
 * Check whether a scan node under a window node must be split off for external-window processing.
 *
 * The node qualifies when it is a scan whose parent is a window node with a grandparent, and either:
 *  - the window is external and the grandparent is a merge node (virtual normal/child table), or
 *  - the window is external and the grandparent is a DYN_QTYPE_VTB_WINDOW control node (virtual super table), or
 *  - the window is not external and the grandparent is a dynamic query control node (the part that
 *    calculates the window range).
 *
 * @param pNode Candidate logic node.
 *
 * @return true when the node must be split, otherwise false.
 */
static bool mergeExtWinNeedSplit(SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode)) {
    return false;
  }

  SLogicNode* pWin = pNode->pParent;
  if (NULL == pWin || NULL == pWin->pParent || QUERY_NODE_LOGIC_PLAN_WINDOW != nodeType(pWin)) {
    return false;
  }

  SLogicNode* pGrand = pWin->pParent;
  bool        grandIsDynCtrl = (QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL == nodeType(pGrand));

  if (((SWindowLogicNode*)pWin)->winType != WINDOW_TYPE_EXTERNAL) {
    // left part, which calculates the window range
    return grandIsDynCtrl;
  }

  // right part, whose final result depends on the time range from the left part
  if (QUERY_NODE_LOGIC_PLAN_MERGE == nodeType(pGrand)) {
    // virtual normal/child table
    return true;
  }
  // virtual super table
  return grandIsDynCtrl && ((SDynQueryCtrlLogicNode*)pGrand)->qType == DYN_QTYPE_VTB_WINDOW;
}
2181

2182
static bool mergeExtWinFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
2,078,887,395✔
2183
                                     SMergeExtWinSplitInfo* pInfo) {
2184
  if (mergeExtWinNeedSplit(pNode)) {
2,078,887,395✔
2185
    pInfo->pSplitNode = pNode;
1,390,339✔
2186
    pInfo->pSubplan = pSubplan;
1,391,331✔
2187
    return true;
1,391,331✔
2188
  }
2189
  return false;
2,077,571,422✔
2190
}
2191

2192
/*
 * Split the scan node found by mergeExtWinFindSplitNode into its own scan subplan.
 *
 * The exchange node is created with the owning subplan's current subplan type.
 *
 * @param pCxt Split context; groupId is advanced and split is set on success.
 * @param pSubplan Subplan to split.
 *
 * @return TSDB_CODE_SUCCESS when completed or no split needed, otherwise error code.
 */
static int32_t mergeExtWinSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SMergeExtWinSplitInfo info = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)mergeExtWinFindSplitNode, &info)) {
    return TSDB_CODE_SUCCESS;
  }

  SLogicSubplan* pOwner = info.pSubplan;
  SLogicNode*    pScan = info.pSplitNode;

  PLAN_ERR_RET(splCreateExchangeNodeForSubplan(pCxt, pOwner, pScan, pOwner->subplanType, false));
  PLAN_ERR_RET(nodesListMakeStrictAppend(&pOwner->pChildren, (SNode*)splCreateScanSubplan(pCxt, pScan, 0)));

  ++(pCxt->groupId);
  pCxt->split = true;
  return TSDB_CODE_SUCCESS;
}
2206

2207
typedef struct SQnodeSplitInfo {
2208
  SLogicNode*    pSplitNode;
2209
  SLogicSubplan* pSubplan;
2210
} SQnodeSplitInfo;
2211

2212
static bool qndSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
2,790,429✔
2213
                                SQnodeSplitInfo* pInfo) {
2214
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode) && NULL != pNode->pParent &&
2,790,429✔
2215
      QUERY_NODE_LOGIC_PLAN_ANALYSIS_FUNC != nodeType(pNode->pParent) &&
1,074,766✔
2216
      QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC != nodeType(pNode->pParent) && ((SScanLogicNode*)pNode)->scanSeq[0] <= 1 &&
1,074,766✔
2217
      ((SScanLogicNode*)pNode)->scanSeq[1] <= 1) {
1,074,766✔
2218
    pInfo->pSplitNode = pNode;
1,074,766✔
2219
    pInfo->pSubplan = pSubplan;
1,074,766✔
2220
    return true;
1,074,766✔
2221
  }
2222
  return false;
1,715,663✔
2223
}
2224

2225
/*
 * Under the QNODE query policy, split a scan node off into a scan subplan and turn the owning
 * subplan into a compute subplan.
 *
 * The owning subplan's vgroup list, when present, is swapped into the new scan subplan and its size
 * becomes numOfComputeNodes; otherwise numOfComputeNodes is 1.
 *
 * @param pCxt Split context; once a node matched, groupId is advanced and split is set even on error.
 * @param pSubplan Subplan to split.
 *
 * @return TSDB_CODE_SUCCESS when completed or no split needed, otherwise error code.
 */
static int32_t qnodeSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  if (QUERY_POLICY_QNODE != tsQueryPolicy) {
    return TSDB_CODE_SUCCESS;
  }

  SQnodeSplitInfo info = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)qndSplFindSplitNode, &info)) {
    return TSDB_CODE_SUCCESS;
  }

  SLogicSubplan* pOwner = info.pSubplan;
  ((SScanLogicNode*)info.pSplitNode)->dataRequired = FUNC_DATA_REQUIRED_DATA_LOAD;

  int32_t code = splCreateExchangeNodeForSubplan(pCxt, pOwner, info.pSplitNode, pOwner->subplanType, false);
  if (TSDB_CODE_SUCCESS == code) {
    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, info.pSplitNode, 0);
    if (NULL == pScanSubplan) {
      code = terrno;
    } else {
      if (NULL == pOwner->pVgroupList) {
        pOwner->numOfComputeNodes = 1;
      } else {
        pOwner->numOfComputeNodes = pOwner->pVgroupList->numOfVgroups;
        TSWAP(pScanSubplan->pVgroupList, pOwner->pVgroupList);
      }
      code = nodesListMakeStrictAppend(&pOwner->pChildren, (SNode*)pScanSubplan);
    }
  }

  // The bookkeeping below runs regardless of the outcome above.
  pOwner->subplanType = SUBPLAN_TYPE_COMPUTE;
  ++(pCxt->groupId);
  pCxt->split = true;
  return code;
}
2255

2256
typedef struct SDynVirtualScanSplitInfo {
2257
  SScanLogicNode         *pDyn;
2258
  SLogicSubplan          *pSubplan;
2259
} SDynVirtualScanSplitInfo;
2260

2261
static bool dynVirtualScanFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
2,055,002,269✔
2262
                                        SDynVirtualScanSplitInfo* pInfo) {
2263
  if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode) || NULL == pNode->pParent) {
2,055,002,269✔
2264
    return false;
1,595,948,275✔
2265
  }
2266

2267
  SLogicNode*   pParent = pNode->pParent;
459,056,678✔
2268
  EScanType     scanType = ((SScanLogicNode*)pNode)->scanType;
459,056,678✔
2269

2270
  // 1. split for system table scan under dynamic query control node(virtual stable scan)
2271
  if (QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL == nodeType(pParent) &&
459,055,256✔
2272
      (((SDynQueryCtrlLogicNode*)(pParent))->qType == DYN_QTYPE_VTB_SCAN ||
4,659,569✔
2273
       ((SDynQueryCtrlLogicNode*)(pParent))->qType == DYN_QTYPE_VTB_WINDOW ||
2,638,550✔
2274
       ((SDynQueryCtrlLogicNode*)(pParent))->qType == DYN_QTYPE_VTB_TS_SCAN) &&
4,659,569✔
2275
      scanType == SCAN_TYPE_SYSTEM_TABLE) {
2276
    pInfo->pDyn = (SScanLogicNode*)pNode;
2,379,650✔
2277
    pInfo->pSubplan = pSubplan;
2,379,650✔
2278
    return true;
2,379,650✔
2279
  }
2280

2281
  // 2. split for system table scan under dynamic query control node(virtual stable agg)
2282
  if (QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL == nodeType(pParent) &&
456,675,346✔
2283
      (((SDynQueryCtrlLogicNode*)(pParent))->qType == DYN_QTYPE_VTB_AGG ||
2,279,919✔
2284
       ((SDynQueryCtrlLogicNode*)(pParent))->qType == DYN_QTYPE_VTB_INTERVAL) &&
2,279,919✔
2285
      scanType == SCAN_TYPE_SYSTEM_TABLE) {
2286
    pInfo->pDyn = (SScanLogicNode*)pNode;
2,142,323✔
2287
    pInfo->pSubplan = pSubplan;
2,142,323✔
2288
    return true;
2,142,323✔
2289
  }
2290

2291
  // 3. split for tag scan under partition node under dynamic query control node(virtual stable agg)
2292
  if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pParent) && NULL != pParent->pParent &&
454,533,361✔
2293
      (((SDynQueryCtrlLogicNode*)(pParent->pParent))->qType == DYN_QTYPE_VTB_AGG ||
17,021,129✔
2294
       ((SDynQueryCtrlLogicNode*)(pParent->pParent))->qType == DYN_QTYPE_VTB_INTERVAL) &&
17,021,129✔
2295
      scanType == SCAN_TYPE_TAG) {
2296
    pInfo->pDyn = (SScanLogicNode*)pNode;
1,723,588✔
2297
    pInfo->pSubplan = pSubplan;
1,723,588✔
2298
    return true;
1,723,588✔
2299
  }
2300

2301
  // 4. split for tag scan under dynamic query control node(virtual stable agg)
2302
  if (QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL == nodeType(pParent) &&
452,809,773✔
2303
      (((SDynQueryCtrlLogicNode*)(pParent))->qType == DYN_QTYPE_VTB_AGG ||
137,596✔
2304
       ((SDynQueryCtrlLogicNode*)(pParent))->qType == DYN_QTYPE_VTB_INTERVAL) &&
137,596✔
2305
      scanType == SCAN_TYPE_TAG) {
2306
    pInfo->pDyn = (SScanLogicNode*)pNode;
137,596✔
2307
    pInfo->pSubplan = pSubplan;
137,596✔
2308
    return true;
137,596✔
2309
  }
2310

2311
  return false;
452,671,847✔
2312
}
2313

2314
/*
 * Split the scan found by dynVirtualScanFindSplitNode into its own scan subplan and mark the owning
 * subplan as SUBPLAN_TYPE_MERGE.
 *
 * @param pCxt Split context; groupId is advanced and split is set on success.
 * @param pSubplan Subplan to split.
 *
 * @return TSDB_CODE_SUCCESS when completed or no split needed, otherwise error code.
 */
static int32_t dynVirtualScanSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SDynVirtualScanSplitInfo info = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)dynVirtualScanFindSplitNode, &info)) {
    return TSDB_CODE_SUCCESS;
  }

  SLogicSubplan* pOwner = info.pSubplan;
  SLogicNode*    pScan = (SLogicNode*)info.pDyn;

  PLAN_ERR_RET(splCreateExchangeNodeForSubplan(pCxt, pOwner, pScan, pOwner->subplanType, false));
  PLAN_ERR_RET(nodesListMakeStrictAppend(&pOwner->pChildren, (SNode*)splCreateScanSubplan(pCxt, pScan, 0)));

  pOwner->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);

  pCxt->split = true;
  return TSDB_CODE_SUCCESS;
}
2330

2331
typedef struct SVstbAggSplitInfo {
2332
  SLogicNode             *pAgg;
2333
  SLogicNode             *pDyn;
2334
  SLogicSubplan          *pSubplan;
2335
  bool                    needPartAgg;
2336
} SVstbAggSplitInfo;
2337

2338
static bool vstbAggFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
2,081,320,764✔
2339
                                 SVstbAggSplitInfo* pInfo) {
2340
  if (QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) && NULL != pNode->pParent && LIST_LENGTH(pNode->pChildren) == 1) {
2,081,320,764✔
2341
    if (QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL == nodeType(pNode->pParent) &&
84,370,302✔
2342
        (((SDynQueryCtrlLogicNode*)(pNode->pParent))->qType == DYN_QTYPE_VTB_AGG ||
4,570,884✔
2343
         ((SDynQueryCtrlLogicNode*)(pNode->pParent))->qType == DYN_QTYPE_VTB_INTERVAL) &&
×
2344
        QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(nodesListGetNode(pNode->pChildren, 0))) {
4,569,585✔
2345
      if (((SDynQueryCtrlLogicNode*)(pNode->pParent))->vtbScan.batchProcessChild) {
2,109,442✔
2346
        pInfo->pAgg = (SLogicNode *)pNode;
1,354,850✔
2347
        pInfo->pSubplan = pSubplan;
1,354,850✔
2348
        pInfo->needPartAgg = true;
1,354,850✔
2349
      } else {
2350
        pInfo->pAgg = (SLogicNode *)pNode;
754,592✔
2351
        pInfo->pDyn = pNode->pParent;
754,592✔
2352
        pInfo->pSubplan = pSubplan;
754,592✔
2353
        pInfo->needPartAgg = false;
754,592✔
2354
      }
2355
      return true;
2,109,442✔
2356
    }
2357
  }
2358
  return false;
2,079,247,108✔
2359

2360
}
2361

2362
/*
 * Split an aggregate under a virtual-super-table control node into a separate scan subplan.
 *
 * When the control node batch-processes its children, a partial aggregate node is created and it is
 * the partial aggregate that goes into the new scan subplan; otherwise the aggregate node itself is
 * split off behind an exchange node. The owning subplan becomes SUBPLAN_TYPE_MERGE.
 *
 * @param pCxt Split context; groupId is advanced and split is set on success.
 * @param pSubplan Subplan to split.
 *
 * @return TSDB_CODE_SUCCESS when completed or no split needed, otherwise error code.
 */
static int32_t vstbAggSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  struct SVstbAggSplitInfo info = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)vstbAggFindSplitNode, &info)) {
    return TSDB_CODE_SUCCESS;
  }

  SLogicNode* pPartAgg = NULL;

  // lino is recorded before each step so the error log below names the failing step
  // (it was previously never assigned and always logged as 0).
  if (info.needPartAgg) {
    lino = __LINE__;
    PLAN_ERR_JRET(stbSplCreatePartAggNode((SAggLogicNode*)info.pAgg, &pPartAgg));
    lino = __LINE__;
    PLAN_ERR_JRET(stbSplCreateExchangeNode(pCxt, info.pAgg, pPartAgg));
    lino = __LINE__;
    PLAN_ERR_JRET(nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)splCreateScanSubplan(pCxt, pPartAgg, 0)));
  } else {
    lino = __LINE__;
    PLAN_ERR_JRET(splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pAgg, info.pSubplan->subplanType, false));
    lino = __LINE__;
    PLAN_ERR_JRET(nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)splCreateScanSubplan(pCxt, (SLogicNode*)info.pAgg, 0)));
  }

  info.pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);

  pCxt->split = true;
  return code;
_return:
  planError("%s failed, code: %d, line: %d", __func__, code, lino);
  return code;
}
2391

2392
typedef struct SVstbIntervalSplitInfo {
2393
  SLogicNode*    pWindow;   // Interval-window node selected for split.
2394
  SLogicSubplan* pSubplan;  // Subplan that owns pWindow.
2395
} SVstbIntervalSplitInfo;
2396

2397
/*
2398
 * Find virtual-stable interval window that can be split for batch processing.
2399
 *
2400
 * @param pCxt Split context.
2401
 * @param pSubplan Subplan currently visited.
2402
 * @param pNode Candidate logic node.
2403
 * @param pInfo Output split-node info.
2404
 *
2405
 * @return true when a split candidate is found, otherwise false.
2406
 */
2407
static bool vstbIntervalFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
2,084,017,625✔
2408
                                      SVstbIntervalSplitInfo* pInfo) {
2409
  (void)pCxt;
2410
  if (QUERY_NODE_LOGIC_PLAN_WINDOW != nodeType(pNode) || LIST_LENGTH(pNode->pChildren) != 1 ||
2,084,017,625✔
2411
      WINDOW_TYPE_INTERVAL != ((SWindowLogicNode*)pNode)->winType || NULL == pNode->pParent ||
44,181,107✔
2412
      ((SWindowLogicNode*)pNode)->indefRowsFunc ||
4,516,375✔
2413
      QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL != nodeType(pNode->pParent) ||
4,446,938✔
2414
      DYN_QTYPE_VTB_INTERVAL != ((SDynQueryCtrlLogicNode*)pNode->pParent)->qType ||
32,881✔
2415
      !((SDynQueryCtrlLogicNode*)pNode->pParent)->vtbScan.batchProcessChild ||
32,881✔
2416
      QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0))) {
32,881✔
2417
    return false;
2,084,007,054✔
2418
  }
2419

2420
  pInfo->pWindow = pNode;
32,881✔
2421
  pInfo->pSubplan = pSubplan;
32,881✔
2422

2423
  return true;
32,881✔
2424
}
2425

2426
/*
2427
 * Split a virtual-stable interval window subplan into per-group scan subplans.
2428
 *
2429
 * @param pCxt Split context.
2430
 * @param pSubplan Subplan to split.
2431
 *
2432
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
2433
 */
2434
static int32_t vstbIntervalSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
548,124,796✔
2435
  SVstbIntervalSplitInfo info = {0};
548,124,796✔
2436
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)vstbIntervalFindSplitNode, &info)) {
548,124,874✔
2437
    return TSDB_CODE_SUCCESS;
548,209,236✔
2438
  }
2439

2440
  SStableSplitInfo splitInfo = {
32,882✔
2441
    .pSplitNode = info.pWindow,
32,882✔
2442
    .pSubplan = info.pSubplan,
32,882✔
2443
  };
2444

2445
  int32_t code = stbSplSplitIntervalForBatch(pCxt, &splitInfo);
32,882✔
2446
  if (TSDB_CODE_SUCCESS == code) {
32,881✔
2447
    info.pWindow->splitDone = true;
32,881✔
2448
    pCxt->split = true;
32,881✔
2449
  }
2450
  return code;
32,881✔
2451
}
2452

2453
typedef struct SStreamScanSplitInfo {
2454
  SLogicNode             *pSplitNode;
2455
  SLogicSubplan          *pSubplan;
2456
} SStreamScanSplitInfo;
2457

2458
static bool streamScanFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
4,592,567✔
2459
                                           SStreamScanSplitInfo* pInfo) {
2460
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode) && NULL != pNode->pParent) {
4,592,567✔
2461
    pInfo->pSplitNode = (SLogicNode *)pNode;
677,686✔
2462
    pInfo->pSubplan = pSubplan;
677,686✔
2463
    return true;
677,686✔
2464
  }
2465
  return false;
3,914,881✔
2466
}
2467

2468
/*
 * Inside a stream calc or trigger clause, split every non-root scan node into its own scan subplan
 * and turn the owning subplan into a compute subplan.
 *
 * Matching is repeated until splMatch finds no further candidate; each processed scan is flagged
 * splitDone. A scan subplan without its own vgroup list receives a clone of the owner's list.
 *
 * @param pCxt Split context; groupId is advanced once per split scan.
 * @param pSubplan Subplan to split.
 *
 * @return TSDB_CODE_SUCCESS when completed or no split needed, otherwise error code.
 */
static int32_t streamScanSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  if (!inStreamCalcClause(pCxt->pPlanCxt) && !inStreamTriggerClause(pCxt->pPlanCxt)) {
    return TSDB_CODE_SUCCESS;
  }

  SStreamScanSplitInfo info = {0};
  while (splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)streamScanFindSplitNode, &info)) {
    PLAN_ERR_RET(splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pSplitNode, info.pSubplan->subplanType, false));

    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, info.pSplitNode, 0);
    if (NULL == pScanSubplan) {
      PLAN_ERR_RET(terrno);
    } else {
      info.pSubplan->numOfComputeNodes =
          (NULL != info.pSubplan->pVgroupList) ? info.pSubplan->pVgroupList->numOfVgroups : 1;

      if (NULL == pScanSubplan->pVgroupList) {
        PLAN_ERR_RET(cloneVgroups(&pScanSubplan->pVgroupList, info.pSubplan->pVgroupList));
      }
      pScanSubplan->dynTbname = ((SScanLogicNode*)info.pSplitNode)->phTbnameScan;
      PLAN_ERR_RET(nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)pScanSubplan));
    }

    info.pSubplan->subplanType = SUBPLAN_TYPE_COMPUTE;
    ++(pCxt->groupId);
    info.pSplitNode->splitDone = true;
    pCxt->split = true;
  }

  return TSDB_CODE_SUCCESS;
}
2499

2500
// clang-format off
2501
static const SSplitRule splitRuleSet[] = {
2502
  {.pName = "SuperTableSplit",        .splitFunc = stableSplit},
2503
  {.pName = "SingleTableJoinSplit",   .splitFunc = singleTableJoinSplit},
2504
  {.pName = "UnionAllSplit",          .splitFunc = unionAllSplit},
2505
  {.pName = "UnionDistinctSplit",     .splitFunc = unionDistinctSplit},
2506
  {.pName = "SmaIndexSplit",          .splitFunc = smaIndexSplit}, // not used yet
2507
  {.pName = "InsertSelectSplit",      .splitFunc = insertSelectSplit},
2508
  {.pName = "VirtualtableSplit",      .splitFunc = virtualTableSplit},
2509
  {.pName = "MergeTableScanSplit",    .splitFunc = mergeTableScanSplit},
2510
  {.pName = "MergeAggColsSplit",      .splitFunc = mergeAggColsSplit},
2511
  {.pName = "DynVirtualScanSplit",    .splitFunc = dynVirtualScanSplit},
2512
  {.pName = "VStbIntervalSplit",      .splitFunc = vstbIntervalSplit},
2513
  {.pName = "MergeExtWinSplit",       .splitFunc = mergeExtWinSplit},
2514
  {.pName = "VStbAggSplit",           .splitFunc = vstbAggSplit},
2515
};
2516
// clang-format on
2517

2518
static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule));
2519

2520
/*
 * Log the subplan as a string when planner tracing (tsQueryPlannerTrace) is enabled.
 *
 * @param pRuleName Name of the rule that was just applied, or NULL for the pre-split dump.
 * @param pSubplan Subplan to dump.
 *
 * @return 0 when tracing is disabled, otherwise the result of nodesNodeToString.
 */
static int32_t dumpLogicSubplan(const char* pRuleName, SLogicSubplan* pSubplan) {
  if (!tsQueryPlannerTrace) {
    return 0;
  }

  char*   pPlanText = NULL;
  int32_t code = nodesNodeToString((SNode*)pSubplan, false, &pPlanText, NULL);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  if (NULL != pRuleName) {
    qDebugL("apply split %s rule, JsonPlan: %s", pRuleName, pPlanText);
  } else {
    qDebugL("before split, JsonPlan: %s", pPlanText);
  }
  taosMemoryFree(pPlanText);
  return code;
}
2537

2538
/*
 * Run every rule in splitRuleSet against the subplan until a full pass performs no split, then apply
 * streamScanSplit and qnodeSplit once each.
 *
 * The working group id starts at pCxt->groupId + 1; on success pCxt->groupId is set to one past the
 * final working group id.
 *
 * @param pCxt Planner context.
 * @param pSubplan Subplan to split.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise the first error reported by a rule or a dump.
 */
static int32_t applySplitRule(SPlanContext* pCxt, SLogicSubplan* pSubplan) {
  int32_t       code = TSDB_CODE_SUCCESS;
  SSplitContext cxt = {0};
  cxt.pPlanCxt = pCxt;
  cxt.queryId = pSubplan->id.queryId;
  cxt.groupId = pCxt->groupId + 1;
  cxt.split = false;

  PLAN_ERR_RET(dumpLogicSubplan(NULL, pSubplan));

  bool progressed = true;
  while (progressed) {
    progressed = false;
    for (int32_t ruleIdx = 0; ruleIdx < splitRuleNum; ++ruleIdx) {
      const SSplitRule* pRule = &splitRuleSet[ruleIdx];
      cxt.split = false;
      PLAN_ERR_RET(pRule->splitFunc(&cxt, pSubplan));
      if (!cxt.split) {
        continue;
      }
      progressed = true;
      PLAN_ERR_RET(dumpLogicSubplan(pRule->pName, pSubplan));
    }
  }

  PLAN_ERR_RET(streamScanSplit(&cxt, pSubplan));
  PLAN_ERR_RET(qnodeSplit(&cxt, pSubplan));

  pCxt->groupId = cxt.groupId + 1;

  PLAN_RET(code);
}
2563

2564
/*
 * Walk the logic tree and swap each scan node's vgroup list with the subplan's vgroup list.
 *
 * Recursion stops at scan nodes and at virtual-table scan nodes.
 *
 * @param pNode Root of the (sub)tree to visit.
 * @param pSubplan Subplan whose pVgroupList is swapped with the scan node's.
 */
static void setVgroupsInfo(SLogicNode* pNode, SLogicSubplan* pSubplan) {
  switch (nodeType(pNode)) {
    case QUERY_NODE_LOGIC_PLAN_SCAN:
      TSWAP(((SScanLogicNode*)pNode)->pVgroupList, pSubplan->pVgroupList);
      return;
    case QUERY_NODE_LOGIC_PLAN_VIRTUAL_TABLE_SCAN:
      // do nothing, since virtual table scan node is SUBPLAN_TYPE_MERGE
      return;
    default:
      break;
  }

  SNode* pSub = NULL;
  FOREACH(pSub, pNode->pChildren) {
    setVgroupsInfo((SLogicNode*)pSub, pSubplan);
  }
}
2576

2577
/*
 * Decide whether the split rules should run on this subplan.
 *
 * @param pLogicSubplan Subplan to inspect.
 *
 * @return true when the root is not a vnode-modify node, or when it is an INSERT modify node that has
 *         a children list; otherwise false.
 */
static bool needSplitSubplan(SLogicSubplan* pLogicSubplan) {
  SLogicNode* pRoot = pLogicSubplan->pNode;
  if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY != nodeType(pRoot)) {
    return true;
  }

  SVnodeModifyLogicNode* pModify = (SVnodeModifyLogicNode*)pRoot;
  if (MODIFY_TABLE_TYPE_INSERT != pModify->modifyType) {
    return false;
  }
  return NULL != pModify->node.pChildren;
}
2584

2585
/*
 * Entry point of the splitter: apply the split rules to the subplan, or, when no split is needed,
 * only move vgroup information between its scan nodes and the subplan.
 *
 * @param pCxt Planner context.
 * @param pLogicSubplan Subplan to split.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise error code from applySplitRule.
 */
int32_t splitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) {
  if (needSplitSubplan(pLogicSubplan)) {
    return applySplitRule(pCxt, pLogicSubplan);
  }

  setVgroupsInfo(pLogicSubplan->pNode, pLogicSubplan);
  return TSDB_CODE_SUCCESS;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc