• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5011

03 Apr 2026 03:59PM UTC coverage: 72.3% (+0.008%) from 72.292%
#5011

push

travis-ci

web-flow
merge: from main to 3.0 branch #35067

4053 of 5985 new or added lines in 68 files covered. (67.72%)

732 existing lines in 143 files now uncovered.

257430 of 356056 relevant lines covered (72.3%)

131834103.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.9
/source/libs/planner/src/planSpliter.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "functionMgt.h"
17
#include "planInt.h"
18
#include "taoserror.h"
19
#include "tglobal.h"
20

21
// Bit mask with only bit `n` set. The argument is parenthesized so that
// expression arguments (e.g. `a | b`) are shifted as a whole.
#define SPLIT_FLAG_MASK(n) (1 << (n))

#define SPLIT_FLAG_STABLE_SPLIT SPLIT_FLAG_MASK(0)
#define SPLIT_FLAG_INSERT_SPLIT SPLIT_FLAG_MASK(1)

// Set the bits of `mask` in the lvalue `val`. The expansion is fully
// parenthesized so it behaves as a single expression wherever it is used.
#define SPLIT_FLAG_SET_MASK(val, mask)  ((val) |= (mask))
// True when at least one bit of `mask` is set in `val`.
#define SPLIT_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0)
28

29
typedef struct SSplitContext {
30
  SPlanContext* pPlanCxt;
31
  uint64_t      queryId;
32
  int32_t       groupId;
33
  bool          split;
34
} SSplitContext;
35

36
typedef int32_t (*FSplit)(SSplitContext* pCxt, SLogicSubplan* pSubplan);
37

38
typedef struct SSplitRule {
39
  char*  pName;
40
  FSplit splitFunc;
41
} SSplitRule;
42

43
typedef struct SFindSplitNodeCtx {
44
  const SSplitContext* pSplitCtx;
45
  const SLogicSubplan* pSubplan;
46
} SFindSplitNodeCtx;
47

48
typedef bool (*FSplFindSplitNode)(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode, void* pInfo);
49

50
/**
 * Duplicate a vgroup info block with a flat byte copy.
 *
 * @param pDst receives the new copy; set to NULL when pSrc is NULL or the allocation fails
 * @param pSrc block to copy, may be NULL
 * @return TSDB_CODE_SUCCESS, or terrno when the allocation fails
 */
static int32_t cloneVgroups(SVgroupsInfo **pDst, SVgroupsInfo* pSrc) {
  if (NULL == pSrc) {
    *pDst = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t       bytes = VGROUPS_INFO_SIZE(pSrc);
  SVgroupsInfo* pCopy = taosMemoryMalloc(bytes);
  *pDst = pCopy;
  if (NULL == pCopy) {
    return terrno;
  }

  memcpy(pCopy, pSrc, bytes);
  return TSDB_CODE_SUCCESS;
}
63

64
static int32_t stbSplCreateMergeKeys(SNodeList* pSortKeys, SNodeList* pTargets, SNodeList** pOutput);
65
static int32_t stbSplCreateMergeKeysByExpr(SNode* pExpr, EOrder order, SNodeList** pMergeKeys);
66

67
/**
 * Swap the vgroup list of the subplan with the one held by the scan (or vtable-scan
 * dyn-query-ctrl) node found by following single-child links down from pNode.
 * Stops without doing anything at a virtual table scan node or at any node that
 * does not have exactly one child.
 */
static void splSetSubplanVgroups(SLogicSubplan* pSubplan, SLogicNode* pNode) {
  SLogicNode* pCur = pNode;
  for (;;) {
    if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pCur)) {
      TSWAP(pSubplan->pVgroupList, ((SScanLogicNode*)pCur)->pVgroupList);
      return;
    }
    if (QUERY_NODE_LOGIC_PLAN_VIRTUAL_TABLE_SCAN == nodeType(pCur)) {
      // do nothing, since virtual table scan node is SUBPLAN_TYPE_MERGE
      return;
    }
    if (QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL == nodeType(pCur) &&
        ((SDynQueryCtrlLogicNode*)pCur)->qType == DYN_QTYPE_VTB_SCAN) {
      TSWAP(pSubplan->pVgroupList, ((SDynQueryCtrlLogicNode*)pCur)->vtbScan.pVgroupList);
      return;
    }
    if (1 != LIST_LENGTH(pCur->pChildren)) {
      return;
    }
    pCur = (SLogicNode*)nodesListGetNode(pCur->pChildren, 0);
  }
}
80

81
/**
 * Create a subplan rooted at pNode and mark it with the given split flag.
 *
 * The subplan type is MERGE for a virtual table scan root and SCAN otherwise.
 * pNode is detached from its parent and the vgroup list is moved into the subplan.
 *
 * @return the new subplan, or NULL on allocation failure (terrno holds the code
 *         returned by nodesMakeNode)
 */
static SLogicSubplan* splCreateScanSubplan(SSplitContext* pCxt, SLogicNode* pNode, int32_t flag) {
  SLogicSubplan* pNew = NULL;
  terrno = nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN, (SNode**)&pNew);
  if (NULL == pNew) {
    return NULL;
  }

  pNew->id.queryId = pCxt->queryId;
  pNew->id.groupId = pCxt->groupId;
  // TODO(smj):refact here.
  if (QUERY_NODE_LOGIC_PLAN_VIRTUAL_TABLE_SCAN == nodeType(pNode)) {
    pNew->subplanType = SUBPLAN_TYPE_MERGE;
  } else {
    pNew->subplanType = SUBPLAN_TYPE_SCAN;
  }
  pNew->pNode = pNode;
  pNode->pParent = NULL;
  splSetSubplanVgroups(pNew, pNode);
  SPLIT_FLAG_SET_MASK(pNew->splitFlag, flag);
  return pNew;
}
97

98
/**
 * Report whether a scan node is reachable from pNode by repeatedly descending
 * into the first child.
 *
 * NOTE(review): the loop body always returns on its first iteration, so sibling
 * children after the first are never examined - confirm this is intended.
 */
static bool splHasScan(SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
    return true;
  }

  SNode* pFirst = NULL;
  FOREACH(pFirst, pNode->pChildren) {
    // Only the first child decides the answer.
    return QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pFirst) || splHasScan((SLogicNode*)pFirst);
  }

  return false;
}
113

114
// Classify the subplan as SCAN when splHasScan() finds a scan under its root, MERGE otherwise.
static void splSetSubplanType(SLogicSubplan* pSubplan) {
  if (splHasScan(pSubplan->pNode)) {
    pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
  } else {
    pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  }
}
117

118
/**
 * Create a subplan rooted at pNode, detach pNode from its parent and derive the
 * subplan type from the presence of a scan.
 *
 * @param ppSubplan receives the new subplan; left untouched on failure
 * @return the code returned by nodesMakeNode
 */
static int32_t splCreateSubplan(SSplitContext* pCxt, SLogicNode* pNode, SLogicSubplan** ppSubplan) {
  SLogicSubplan* pNew = NULL;
  int32_t        code = nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN, (SNode**)&pNew);
  if (NULL == pNew) {
    return code;
  }

  pNew->id.queryId = pCxt->queryId;
  pNew->id.groupId = pCxt->groupId;
  pNew->pNode = pNode;
  pNode->pParent = NULL;
  splSetSubplanType(pNew);

  *ppSubplan = pNew;
  return code;
}
132

133
/**
 * Build an exchange node that stands in for pChild in the parent subplan.
 *
 * The exchange copies precision, dynamicOp, targets and limit from pChild and
 * uses the current group id as both source start and end group.
 * When pChild has a limit, the child's own limit is rewritten in place after
 * cloning: offset is folded into limit and then reset to 0.
 *
 * @param pOutput receives the exchange node on success
 * @return TSDB_CODE_SUCCESS or the failing step's error code
 */
static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pChild, SExchangeLogicNode** pOutput) {
  int32_t             code = TSDB_CODE_SUCCESS;
  SExchangeLogicNode* pExchange = NULL;

  PLAN_ERR_JRET(nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE, (SNode**)&pExchange));

  pExchange->srcStartGroupId = pCxt->groupId;
  pExchange->srcEndGroupId = pCxt->groupId;
  pExchange->node.precision = pChild->precision;
  pExchange->node.dynamicOp = pChild->dynamicOp;
  pExchange->node.pTargets = NULL;
  PLAN_ERR_JRET(nodesCloneList(pChild->pTargets, &pExchange->node.pTargets));

  if (NULL != pChild->pLimit) {
    pExchange->node.pLimit = NULL;
    PLAN_ERR_JRET(nodesCloneNode(pChild->pLimit, &pExchange->node.pLimit));

    // The exchange keeps the original limit/offset; the child must produce limit+offset rows.
    SLimitNode* pChildLimit = (SLimitNode*)pChild->pLimit;
    if (pChildLimit->offset) {
      if (pChildLimit->limit) {
        pChildLimit->limit->datum.i += pChildLimit->offset->datum.i;
      }
      pChildLimit->offset->datum.i = 0;
    }
  }

  *pOutput = pExchange;
  return code;

_return:
  planError("failed to create exchange node, code:%d", code);
  nodesDestroyNode((SNode*)pExchange);
  return code;
}
165

166
static int32_t splCreateExchangeNodeForSubplan(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode,
78,088,521✔
167
                                               ESubplanType subplanType, bool seqScan) {
168
  SExchangeLogicNode* pExchange = NULL;
78,088,521✔
169
  int32_t             code = TSDB_CODE_SUCCESS;
78,088,521✔
170

171
  PLAN_ERR_JRET(splCreateExchangeNode(pCxt, pSplitNode, &pExchange));
78,088,521✔
172

173
  pExchange->dynTbname = nodeType(pSplitNode) == QUERY_NODE_LOGIC_PLAN_SCAN ? ((SScanLogicNode*)pSplitNode)->phTbnameScan : false;
78,092,584✔
174
  pExchange->seqRecvData = seqScan;
78,092,584✔
175

176
  PLAN_ERR_JRET(replaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pExchange));
78,092,584✔
177
  pSubplan->subplanType = subplanType;
78,098,906✔
178

179
  return code;
78,098,906✔
180

181
_return:
×
182
  planError("failed to create exchange node for subplan, code:%d", code);
×
183
  nodesDestroyNode((SNode*)pExchange);
×
184
  return code;
×
185
}
186

187
/**
 * Check whether groupId falls inside the source group range of an exchange or
 * merge node found in the tree under pLogicNode. An exchange or merge node
 * answers directly and its own children are not searched.
 */
static bool splIsChildSubplan(SLogicNode* pLogicNode, int32_t groupId) {
  switch (nodeType(pLogicNode)) {
    case QUERY_NODE_LOGIC_PLAN_EXCHANGE: {
      SExchangeLogicNode* pExchange = (SExchangeLogicNode*)pLogicNode;
      return groupId >= pExchange->srcStartGroupId && groupId <= pExchange->srcEndGroupId;
    }
    case QUERY_NODE_LOGIC_PLAN_MERGE: {
      SMergeLogicNode* pMerge = (SMergeLogicNode*)pLogicNode;
      return pMerge->srcGroupId <= groupId && pMerge->srcEndGroupId >= groupId;
    }
    default:
      break;
  }

  SNode* pChild;
  FOREACH(pChild, pLogicNode->pChildren) {
    if (splIsChildSubplan((SLogicNode*)pChild, groupId)) {
      return true;
    }
  }
  return false;
}
207

208
/**
 * Move every subplan in pChildren whose group id is referenced from pParent's
 * node tree into pParent->pChildren. Moved entries are removed from pChildren.
 *
 * @return TSDB_CODE_SUCCESS, or the append error (processing stops at the first failure)
 */
static int32_t splMountSubplan(SLogicSubplan* pParent, SNodeList* pChildren) {
  SNode* pChild = NULL;
  WHERE_EACH(pChild, pChildren) {
    if (!splIsChildSubplan(pParent->pNode, ((SLogicSubplan*)pChild)->id.groupId)) {
      WHERE_NEXT;
      continue;
    }

    int32_t code = nodesListMakeAppend(&pParent->pChildren, pChild);
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }
    // Clear the slot before erasing the cell so the moved subplan stays alive in its new list.
    REPLACE_NODE(NULL);
    ERASE_NODE(pChildren);
  }
  return TSDB_CODE_SUCCESS;
}
225

226
/**
 * Depth-first, pre-order search of the logic node tree for the first node that
 * is not marked splitDone and is accepted by func.
 *
 * @return true as soon as func accepts a node, false when nothing matched
 */
static bool splMatchByNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode, FSplFindSplitNode func,
                           void* pInfo) {
  bool matched = !pNode->splitDone && func(pCxt, pSubplan, pNode, pInfo);
  if (!matched) {
    SNode* pChild;
    FOREACH(pChild, pNode->pChildren) {
      if (splMatchByNode(pCxt, pSubplan, (SLogicNode*)pChild, func, pInfo)) {
        matched = true;
        break;
      }
    }
  }
  return matched;
}
239

240
/**
 * Search pSubplan and then its child subplans for a node accepted by func.
 * A subplan whose splitFlag already contains `flag` has its own node tree
 * skipped, but its child subplans are still searched.
 */
static bool splMatch(SSplitContext* pCxt, SLogicSubplan* pSubplan, int32_t flag, FSplFindSplitNode func, void* pInfo) {
  bool alreadySplit = SPLIT_FLAG_TEST_MASK(pSubplan->splitFlag, flag);
  if (!alreadySplit && splMatchByNode(pCxt, pSubplan, pSubplan->pNode, func, pInfo)) {
    return true;
  }

  bool   found = false;
  SNode* pChild;
  FOREACH(pChild, pSubplan->pChildren) {
    if (splMatch(pCxt, (SLogicSubplan*)pChild, flag, func, pInfo)) {
      found = true;
      break;
    }
  }
  return found;
}
254

255
// Point the pParent link of every direct child of pNode back at pNode.
static void splSetParent(SLogicNode* pNode) {
  SNode* pChild = NULL;
  FOREACH(pChild, pNode->pChildren) {
    SLogicNode* pLogicChild = (SLogicNode*)pChild;
    pLogicChild->pParent = pNode;
  }
}
259

260
typedef struct SStableSplitInfo {
261
  SLogicNode*    pSplitNode;
262
  SLogicSubplan* pSubplan;
263
} SStableSplitInfo;
264

265
static bool stbSplHasGatherExecFunc(const SNodeList* pFuncs) {
222,079,234✔
266
  SNode* pFunc = NULL;
222,079,234✔
267
  FOREACH(pFunc, pFuncs) {
518,023,869✔
268
    if (!fmIsWindowPseudoColumnFunc(((SFunctionNode*)pFunc)->funcId) &&
310,394,110✔
269
        !fmIsDistExecFunc(((SFunctionNode*)pFunc)->funcId)) {
306,196,708✔
270
      return true;
14,452,311✔
271
    }
272
  }
273
  return false;
207,629,363✔
274
}
275

276
/**
 * A scan counts as a multi-table scan when it spans more than one vgroup or is
 * flagged needSplit, and it is not a partition-tbname / partition-rows
 * placeholder scan, a placeholder tbname scan or a virtual stable scan.
 */
static bool stbSplIsMultiTbScan(SScanLogicNode* pScan) {
  bool multiVgroups = (NULL != pScan->pVgroupList && pScan->pVgroupList->numOfVgroups > 1);
  if (!multiVgroups && !pScan->needSplit) {
    return false;
  }
  if (SP_PARTITION_TBNAME == pScan->placeholderType || SP_PARTITION_ROWS == pScan->placeholderType) {
    return false;
  }
  return !pScan->phTbnameScan && !pScan->virtualStableScan;
}
282

283
/**
 * Decide whether pNode sits directly (or through one partition node) on top of
 * a multi-table scan.
 *
 * - pNode, and an intermediate partition node if present, must have exactly one child.
 * - A scan child that is not multi-table still qualifies through its needSplit
 *   flag when pNode is an aggregate or an interval window.
 * - An external window child is examined recursively.
 */
static bool stbSplHasMultiTbScan(SLogicNode* pNode) {
  if (1 != LIST_LENGTH(pNode->pChildren)) {
    return false;
  }

  SNode* pChild = nodesListGetNode(pNode->pChildren, 0);
  if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pChild)) {
    SNodeList* pPartChildren = ((SLogicNode*)pChild)->pChildren;
    if (1 != LIST_LENGTH(pPartChildren)) {
      return false;
    }
    pChild = nodesListGetNode(pPartChildren, 0);
  }

  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild)) {
    SScanLogicNode* pScan = (SScanLogicNode*)pChild;
    if (stbSplIsMultiTbScan(pScan)) {
      return true;
    }
    bool aggOrInterval =
        QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) ||
        (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode) && ((SWindowLogicNode*)pNode)->winType == WINDOW_TYPE_INTERVAL);
    if (aggOrInterval) {
      return pScan->needSplit;
    }
    return false;
  }

  if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pChild) &&
      ((SWindowLogicNode*)pChild)->winType == WINDOW_TYPE_EXTERNAL) {
    return stbSplHasMultiTbScan((SLogicNode*)pChild);
  }
  return false;
}
310

311
// True when pNode has exactly one child and that child is a multi-table scan.
static bool stbSplIsMultiTbScanChild(SLogicNode* pNode) {
  if (1 != LIST_LENGTH(pNode->pChildren)) {
    return false;
  }

  SNode* pOnlyChild = nodesListGetNode(pNode->pChildren, 0);
  if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pOnlyChild)) {
    return false;
  }
  return stbSplIsMultiTbScan((SScanLogicNode*)pOnlyChild);
}
318

319
/**
 * Decide whether a window node has to be split.
 *
 * - INTERVAL: no gather-exec function and a multi-table scan below.
 * - EXTERNAL: same, and additionally the function list must be non-NULL.
 * - SESSION / STATE / COUNT / EVENT: a multi-table scan below is sufficient.
 * - any other window type: never split.
 */
static bool stbSplNeedSplitWindow(SLogicNode* pNode) {
  SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode;

  switch (pWindow->winType) {
    case WINDOW_TYPE_INTERVAL:
      return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(pNode);
    case WINDOW_TYPE_EXTERNAL:
      return pWindow->pFuncs && !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(pNode);
    case WINDOW_TYPE_SESSION:
    case WINDOW_TYPE_STATE:
    case WINDOW_TYPE_COUNT:
    case WINDOW_TYPE_EVENT:
      return stbSplHasMultiTbScan(pNode);
    default:
      return false;
  }
}
335

336
/**
 * A join is split only when it is not a single-table join, does not use the
 * hash algorithm, and every child is either a scan or another join.
 */
static bool stbSplNeedSplitJoin(SJoinLogicNode* pJoin) {
  if (pJoin->isSingleTableJoin) {
    return false;
  }
  if (JOIN_ALGO_HASH == pJoin->joinAlgo) {
    return false;
  }

  SNode* pChild = NULL;
  FOREACH(pChild, pJoin->node.pChildren) {
    bool scanOrJoin =
        QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) || QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pChild);
    if (!scanOrJoin) {
      return false;
    }
  }
  return true;
}
348

349
/**
 * True when the single child of pNode (looking through one partition node, if
 * present) is a scan of type SCAN_TYPE_TABLE_COUNT.
 */
static bool stbSplIsTableCountQuery(SLogicNode* pNode) {
  if (1 != LIST_LENGTH(pNode->pChildren)) {
    return false;
  }

  SNode* pChild = nodesListGetNode(pNode->pChildren, 0);
  if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pChild)) {
    SNodeList* pPartChildren = ((SLogicNode*)pChild)->pChildren;
    if (1 != LIST_LENGTH(pPartChildren)) {
      return false;
    }
    pChild = nodesListGetNode(pPartChildren, 0);
  }

  if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pChild)) {
    return false;
  }
  return SCAN_TYPE_TABLE_COUNT == ((SScanLogicNode*)pChild)->scanType;
}
362

363
/**
 * Per-node-type dispatch that decides whether pNode is a super-table split point.
 * pCtx is accepted for interface uniformity; this function does not read it.
 */
static bool stbSplNeedSplit(SFindSplitNodeCtx* pCtx, SLogicNode* pNode) {
  switch (nodeType(pNode)) {
    case QUERY_NODE_LOGIC_PLAN_SCAN:
      return stbSplIsMultiTbScan((SScanLogicNode*)pNode);

    case QUERY_NODE_LOGIC_PLAN_JOIN:
      return stbSplNeedSplitJoin((SJoinLogicNode*)pNode);

    case QUERY_NODE_LOGIC_PLAN_PARTITION:
      return stbSplIsMultiTbScanChild(pNode);

    case QUERY_NODE_LOGIC_PLAN_AGG: {
      SAggLogicNode* pAgg = (SAggLogicNode*)pNode;
      // Gather-exec functions block the split unless this is a per-table aggregate.
      if (stbSplHasGatherExecFunc(pAgg->pAggFuncs) && !isPartTableAgg(pAgg)) {
        return false;
      }
      if (!stbSplHasMultiTbScan(pNode)) {
        return false;
      }
      return !stbSplIsTableCountQuery(pNode);
    }

    case QUERY_NODE_LOGIC_PLAN_WINDOW:
      return stbSplNeedSplitWindow(pNode);

    case QUERY_NODE_LOGIC_PLAN_SORT: {
      if (1 == LIST_LENGTH(pNode->pChildren)) {
        SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
        // A sort over a splittable external window without calcWithPartition is not split here.
        if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pChild) &&
            WINDOW_TYPE_EXTERNAL == ((SWindowLogicNode*)pChild)->winType &&
            !((SWindowLogicNode*)pChild)->calcWithPartition && stbSplNeedSplitWindow(pChild)) {
          return false;
        }
      }
      return stbSplHasMultiTbScan(pNode);
    }

    default:
      return false;
  }
}
394

395
static bool stbSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
1,618,781,273✔
396
                                SStableSplitInfo* pInfo) {
397
  SFindSplitNodeCtx ctx = {.pSplitCtx = pCxt, .pSubplan = pSubplan};
1,618,781,273✔
398
  if (stbSplNeedSplit(&ctx, pNode)) {
1,618,782,503✔
399
    pInfo->pSplitNode = pNode;
102,103,156✔
400
    pInfo->pSubplan = pSubplan;
102,103,156✔
401
    return true;
102,103,643✔
402
  }
403
  return false;
1,516,767,651✔
404
}
405

406
/**
 * Produce the partial / (optional) middle / merge variants of every function in pFuncs.
 *
 * Window pseudo column and placeholder functions are cloned as-is into every
 * requested output list; all other functions are decomposed by fmGetDistMethod().
 *
 * @param pFuncs        source function list; every entry must be a QUERY_NODE_FUNCTION
 * @param pPartialFuncs receives the partial functions (list is created on demand)
 * @param pMidFuncs     receives the middle functions, or NULL when the caller does
 *                      not want them (any middle function produced is then destroyed)
 * @param pMergeFuncs   receives the merge functions (list is created on demand)
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_PLAN_INTERNAL_ERROR for a non-function
 *         entry, or the error code of the failing step
 */
static int32_t stbSplRewriteFuns(const SNodeList* pFuncs, SNodeList** pPartialFuncs, SNodeList** pMidFuncs, SNodeList** pMergeFuncs) {
  SNode* pNode = NULL;
  FOREACH(pNode, pFuncs) {
    SFunctionNode* pPartFunc = NULL;
    SFunctionNode* pMidFunc = NULL;
    SFunctionNode* pMergeFunc = NULL;
    int32_t        code = TSDB_CODE_SUCCESS;

    if (nodeType(pNode) != QUERY_NODE_FUNCTION) {
      planError("%s failed, expect function node in function list, actual nodeType:%d", __FUNCTION__, nodeType(pNode));
      return TSDB_CODE_PLAN_INTERNAL_ERROR;
    }

    SFunctionNode* pFunc = (SFunctionNode*)pNode;
    if (fmIsWindowPseudoColumnFunc(pFunc->funcId) || fmIsPlaceHolderFunc(pFunc->funcId)) {
      code = nodesCloneNode(pNode, (SNode**)&pPartFunc);
      if (TSDB_CODE_SUCCESS == code) {
        code = nodesCloneNode(pNode, (SNode**)&pMergeFunc);
      }
      if (TSDB_CODE_SUCCESS == code && pMidFuncs != NULL) {
        // A failed clone is cleaned up by the common error path below.
        code = nodesCloneNode(pNode, (SNode**)&pMidFunc);
      }
    } else {
      code = fmGetDistMethod(pFunc, &pPartFunc, &pMidFunc, &pMergeFunc);
    }

    if (TSDB_CODE_SUCCESS == code) {
      code = nodesListMakeStrictAppend(pPartialFuncs, (SNode*)pPartFunc);
    }
    if (TSDB_CODE_SUCCESS == code) {
      if (pMidFuncs != NULL) {
        code = nodesListMakeStrictAppend(pMidFuncs, (SNode*)pMidFunc);
      } else {
        // Caller does not collect middle functions: drop whatever was produced.
        nodesDestroyNode((SNode*)pMidFunc);
      }
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = nodesListMakeStrictAppend(pMergeFuncs, (SNode*)pMergeFunc);
    }
    if (TSDB_CODE_SUCCESS != code) {
      // NOTE(review): this releases nodes that may already have been handed to a list
      // by the append calls above - confirm the ownership rules of nodesListMakeStrictAppend.
      nodesDestroyNode((SNode*)pPartFunc);
      nodesDestroyNode((SNode*)pMidFunc);
      nodesDestroyNode((SNode*)pMergeFunc);
      return code;
    }
  }
  return TSDB_CODE_SUCCESS;
}
457

458
/**
 * Make sure *pFuncs contains a _wstart function and report its position.
 *
 * If a function node of type FUNCTION_TYPE_WSTART already exists, its index is
 * returned. Otherwise a new "_wstart" function with a hashed alias derived from
 * its own address is created and appended at the end of the list.
 *
 * @param pFuncs    function list, created on demand
 * @param pIndex    receives the position of the _wstart function; written only on success
 * @param precision result precision assigned to a newly created function
 * @return TSDB_CODE_SUCCESS or the error code of the failing step
 */
static int32_t stbSplAppendWStart(SNodeList** pFuncs, int32_t* pIndex, uint8_t precision) {
  int32_t pos = 0;
  SNode*  pIter = NULL;
  FOREACH(pIter, *pFuncs) {
    if (nodeType(pIter) == QUERY_NODE_FUNCTION && FUNCTION_TYPE_WSTART == ((SFunctionNode*)pIter)->funcType) {
      *pIndex = pos;
      return TSDB_CODE_SUCCESS;
    }
    ++pos;
  }

  SFunctionNode* pWStart = NULL;
  int32_t        code = nodesMakeNode(QUERY_NODE_FUNCTION, (SNode**)&pWStart);
  if (NULL == pWStart) {
    return code;
  }

  tstrncpy(pWStart->functionName, "_wstart", TSDB_FUNC_NAME_LEN);
  // The alias is "<name>.<node address>" passed through taosHashBinary.
  char    name[TSDB_COL_NAME_LEN + TSDB_POINTER_PRINT_BYTES + TSDB_NAME_DELIMITER_LEN + 1] = {0};
  int64_t pointer = (int64_t)pWStart;
  int32_t len = snprintf(name, sizeof(name) - 1, "%s.%" PRId64, pWStart->functionName, pointer);
  (void)taosHashBinary(name, len, sizeof(name));
  tstrncpy(pWStart->node.aliasName, name, TSDB_COL_NAME_LEN);
  pWStart->node.resType.precision = precision;

  code = fmGetFuncInfo(pWStart, NULL, 0);
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(pFuncs, (SNode*)pWStart);
  }
  if (TSDB_CODE_SUCCESS != code) {
    // NOTE(review): if nodesListMakeStrictAppend already releases the node when it
    // fails, this is a second release on that path - confirm against its implementation.
    nodesDestroyNode((SNode*)pWStart);
    return code;
  }

  *pIndex = pos;
  return code;
}
493

494
/**
 * Make sure the window's function list contains a _wend function and report its position.
 *
 * If a function of type FUNCTION_TYPE_WEND already exists, its index is returned.
 * Otherwise a new "_wend" function with a hashed alias derived from its own
 * address is appended to pWin->pFuncs and a matching target column is created.
 *
 * @param pWin   window node whose pFuncs / pTargets are extended
 * @param pIndex receives the position of the _wend function (also written when
 *               creating the new function fails after allocation)
 * @return TSDB_CODE_SUCCESS or the error code of the failing step
 */
static int32_t stbSplAppendWEnd(SWindowLogicNode* pWin, int32_t* pIndex) {
  int32_t index = 0;
  SNode*  pFunc = NULL;
  FOREACH(pFunc, pWin->pFuncs) {
    if (FUNCTION_TYPE_WEND == ((SFunctionNode*)pFunc)->funcType) {
      *pIndex = index;
      return TSDB_CODE_SUCCESS;
    }
    ++index;
  }

  SFunctionNode* pWEnd = NULL;
  int32_t code = nodesMakeNode(QUERY_NODE_FUNCTION, (SNode**)&pWEnd);
  if (NULL == pWEnd) {
    return code;
  }
  tstrncpy(pWEnd->functionName, "_wend", TSDB_FUNC_NAME_LEN);
  // The alias is "<name>.<node address>" passed through taosHashBinary.
  int64_t pointer = (int64_t)pWEnd;
  char name[TSDB_COL_NAME_LEN + TSDB_POINTER_PRINT_BYTES + TSDB_NAME_DELIMITER_LEN + 1] = {0};
  int32_t len = snprintf(name, sizeof(name) - 1, "%s.%" PRId64, pWEnd->functionName, pointer);
  (void)taosHashBinary(name, len, sizeof(name));
  tstrncpy(pWEnd->node.aliasName, name, TSDB_COL_NAME_LEN);

  code = fmGetFuncInfo(pWEnd, NULL, 0);
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListStrictAppend(pWin->pFuncs, (SNode*)pWEnd);
  } else {
    // The node was never handed to the list, so it must be released here.
    nodesDestroyNode((SNode*)pWEnd);
    pWEnd = NULL;
  }
  *pIndex = index;
  if (TSDB_CODE_SUCCESS == code) {
    code = createColumnByRewriteExpr(nodesListGetNode(pWin->pFuncs, index), &pWin->node.pTargets);
  }
  return code;
}
527

528
/**
 * Make sure pFuncs contains a stream trigger placeholder function and report its position.
 *
 * An existing _twstart / _tprev_ts / _tprev_localtime / _tidlestart function is
 * reused. Otherwise a placeholder chosen by the trigger window type is created,
 * given a timestamp value 0 as its first parameter, and appended to pFuncs.
 *
 * @param pFuncs    function list to search and extend
 * @param pIndex    receives the position of the placeholder; written only on success
 * @param precision result precision assigned to a newly created placeholder
 * @param winType   trigger window node type selecting the placeholder name
 * @return TSDB_CODE_SUCCESS or the error code of the failing step
 */
static int32_t stbSplAppendPlaceHolder(SNodeList* pFuncs, int32_t* pIndex, uint8_t precision, ENodeType winType) {
  int32_t index = 0;
  SNode*  pFunc = NULL;
  FOREACH(pFunc, pFuncs) {
    if (FUNCTION_TYPE_TWSTART == ((SFunctionNode*)pFunc)->funcType ||
        FUNCTION_TYPE_TPREV_TS == ((SFunctionNode*)pFunc)->funcType ||
        FUNCTION_TYPE_TPREV_LOCALTIME == ((SFunctionNode*)pFunc)->funcType ||
        FUNCTION_TYPE_TIDLESTART == ((SFunctionNode*)pFunc)->funcType) {
      *pIndex = index;
      return TSDB_CODE_SUCCESS;
    }
    ++index;
  }

  int32_t        code = TSDB_CODE_SUCCESS;
  bool           needFreeExtra = false;
  SNode*         extraValue = NULL;
  SFunctionNode* pPlaceHolder = NULL;

  PLAN_ERR_JRET(nodesMakeNode(QUERY_NODE_FUNCTION, (SNode**)&pPlaceHolder));

  switch(winType) {
    case QUERY_NODE_SLIDING_WINDOW:
      tstrncpy(pPlaceHolder->functionName, "_tprev_ts", TSDB_FUNC_NAME_LEN);
      break;
    case QUERY_NODE_INTERVAL_WINDOW:
    case QUERY_NODE_STATE_WINDOW:
    case QUERY_NODE_EVENT_WINDOW:
    case QUERY_NODE_SESSION_WINDOW:
    case QUERY_NODE_COUNT_WINDOW:
      tstrncpy(pPlaceHolder->functionName, "_twstart", TSDB_FUNC_NAME_LEN);
      break;
    case QUERY_NODE_PERIOD_WINDOW:
      tstrncpy(pPlaceHolder->functionName, "_tprev_localtime", TSDB_FUNC_NAME_LEN);
      break;
    default:
      break;
  }

  // The alias is "<name>.<node address>" passed through taosHashBinary.
  int64_t pointer = (int64_t)pPlaceHolder;
  char name[TSDB_COL_NAME_LEN + TSDB_POINTER_PRINT_BYTES + TSDB_NAME_DELIMITER_LEN + 1] = {0};
  int32_t len = snprintf(name, sizeof(name) - 1, "%s.%" PRId64, pPlaceHolder->functionName, pointer);
  (void)taosHashBinary(name, len, sizeof(name));
  tstrncpy(pPlaceHolder->node.aliasName, name, TSDB_COL_NAME_LEN);
  pPlaceHolder->node.resType.precision = precision;

  PLAN_ERR_JRET(fmGetFuncInfo(pPlaceHolder, NULL, 0));
  // Jump to the cleanup label (instead of returning directly) so the placeholder
  // node allocated above is released when the value node cannot be created.
  PLAN_ERR_JRET(nodesMakeValueNodeFromTimestamp(0, &extraValue));
  needFreeExtra = true;
  ((SValueNode*)extraValue)->notReserved = true;
  PLAN_ERR_JRET(nodesListMakePushFront(&pPlaceHolder->pParameterList, extraValue));
  // From here on extraValue belongs to the placeholder's parameter list.
  needFreeExtra = false;
  PLAN_ERR_JRET(nodesListStrictAppend(pFuncs, (SNode*)pPlaceHolder));
  *pIndex = index;
  return code;
_return:
  nodesDestroyNode((SNode*)pPlaceHolder);
  if (needFreeExtra) {
    nodesDestroyNode(extraValue);
  }
  return code;
}
590

591
static int32_t stbSplCreatePartWindowNode(SSplitContext* pCxt, SWindowLogicNode* pMergeWindow,
3,707,785✔
592
                                          SLogicNode** pPartWindow, SNodeList** pMergeKeys) {
593
  int32_t    code = TSDB_CODE_SUCCESS;
3,707,785✔
594
  SNodeList* pFunc = pMergeWindow->pFuncs;
3,707,785✔
595
  pMergeWindow->pFuncs = NULL;
3,707,785✔
596
  SNodeList* pTargets = pMergeWindow->node.pTargets;
3,707,785✔
597
  pMergeWindow->node.pTargets = NULL;
3,707,785✔
598
  SNodeList* pChildren = pMergeWindow->node.pChildren;
3,707,785✔
599
  pMergeWindow->node.pChildren = NULL;
3,707,785✔
600
  SNode* pConditions = pMergeWindow->node.pConditions;
3,707,785✔
601
  pMergeWindow->node.pConditions = NULL;
3,707,785✔
602

603
  SWindowLogicNode* pPartWin = NULL;
3,707,785✔
604
  PLAN_ERR_JRET(nodesCloneNode((SNode*)pMergeWindow, (SNode**)&pPartWin));
3,707,785✔
605

606
  pPartWin->node.groupAction = GROUP_ACTION_KEEP;
3,709,003✔
607
  pMergeWindow->node.pTargets = pTargets;
3,709,003✔
608
  pMergeWindow->node.pConditions = pConditions;
3,709,003✔
609
  pPartWin->node.pChildren = pChildren;
3,709,003✔
610
  splSetParent((SLogicNode*)pPartWin);
3,709,003✔
611

612
  int32_t index = -1;
3,708,775✔
613
  int32_t indexExt = -1;
3,708,775✔
614
  const SColumnNode* pMergeTspk = (const SColumnNode*)pMergeWindow->pTspk;
3,708,775✔
615
  PLAN_ERR_JRET(stbSplRewriteFuns(pFunc, &pPartWin->pFuncs, NULL, &pMergeWindow->pFuncs));
3,708,775✔
616
  if (inStreamCalcClause(pCxt->pPlanCxt)) {
3,709,489✔
617
    /**
618
      For stream calc query, we need the _twstart or _tprev_ts placeholder
619
      in the part window to merge part results together.
620
    */
621
    PLAN_ERR_JRET(stbSplAppendPlaceHolder(pPartWin->pFuncs, &indexExt,
20,342✔
622
                                          pMergeTspk->node.resType.precision,
623
                                          pCxt->pPlanCxt->streamCxt.triggerWinType));
624
  }
625
  if (pMergeWindow->winType == WINDOW_TYPE_EXTERNAL && !inStreamCalcClause(pCxt->pPlanCxt)) {
3,709,717✔
626
    /**
627
      For external window query, we still need an explicit _wstart placeholder
628
      on the partial window output so merged external-window aggregation can
629
      bind pTspk to window-start, instead of accidentally using the first
630
      aggregate output column (e.g. count/sum).
631
    */
632
    PLAN_ERR_JRET(stbSplAppendWStart(&pPartWin->pFuncs, &index,
44,695✔
633
                                     pMergeTspk->node.resType.precision));
634
  } else if (!pCxt->pPlanCxt->streamCxt.hasExtWindow) {
3,665,022✔
635
    /**
636
      If the query is not an external window query, we need the _wstart
637
      placeholder for the merged INTERVAL window to do aggregation.
638
    */
639
    PLAN_ERR_JRET(stbSplAppendWStart(&pPartWin->pFuncs, &index,
3,649,078✔
640
                                     pMergeTspk->node.resType.precision));
641
  }
642
  if (index < 0 && indexExt < 0) {
3,709,948✔
643
    planError("%s failed since no pkts placeholder set", __FUNCTION__);
×
644
    code = TSDB_CODE_INTERNAL_ERROR;
×
645
    PLAN_ERR_JRET(code);
×
646
  }
647

648
  PLAN_ERR_JRET(createColumnByRewriteExprs(pPartWin->pFuncs, &pPartWin->node.pTargets));
3,709,948✔
649
  nodesDestroyNode(pMergeWindow->pTspk);
3,709,327✔
650
  pMergeWindow->pTspk = NULL;
3,709,624✔
651
  if (NULL != pMergeKeys) {
3,709,624✔
652
    /**
653
      Both _twstart and _wstart placeholders should be used as merge keys
654
      for INTERVAL window.
655
    */
656
    if (indexExt >= 0) {
3,644,385✔
657
      PLAN_ERR_JRET(stbSplCreateMergeKeysByExpr(nodesListGetNode(pPartWin->node.pTargets, indexExt),
20,342✔
658
                                                pMergeWindow->node.outputTsOrder, pMergeKeys));
659
    }
660
    if (index >= 0) {
3,644,385✔
661
      PLAN_ERR_JRET(stbSplCreateMergeKeysByExpr(nodesListGetNode(pPartWin->node.pTargets, index),
3,628,882✔
662
                                                pMergeWindow->node.outputTsOrder, pMergeKeys));
663
    }
664
  }
665

666
  int32_t indexPkts = index >= 0 ? index: indexExt;
3,709,465✔
667
  PLAN_ERR_JRET(nodesCloneNode(nodesListGetNode(pPartWin->node.pTargets, indexPkts),
3,709,465✔
668
                               &pMergeWindow->pTspk));
669

670
  nodesDestroyList(pFunc);
3,709,786✔
671
  *pPartWindow = (SLogicNode*)pPartWin;
3,709,855✔
672

673
  return code;
3,709,855✔
674
_return:
×
675
  nodesDestroyNode((SNode*)pPartWin);
×
676
  return code;
×
677
}
678

679
/**
 * Return the number of vgroups of the scan node reached from pNode by following
 * single-child links.
 *
 * @return the scan's numOfVgroups; 0 when no scan is reached through a
 *         single-child chain or when the scan has no vgroup list
 */
static int32_t stbSplGetNumOfVgroups(SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
    // A scan can qualify for splitting via needSplit without a vgroup list
    // (see stbSplIsMultiTbScan), so the list must be checked before use.
    const SScanLogicNode* pScan = (const SScanLogicNode*)pNode;
    return (NULL != pScan->pVgroupList) ? pScan->pVgroupList->numOfVgroups : 0;
  } else {
    if (1 == LIST_LENGTH(pNode->pChildren)) {
      return stbSplGetNumOfVgroups((SLogicNode*)nodesListGetNode(pNode->pChildren, 0));
    }
  }
  return 0;
}
689

690
/**
 * @brief Adjust a new merge node according to the type of the node it was split from.
 *
 * - Always: the merge node's input and output ts order follow pNode's output ts order.
 * - PROJECT: when the project ignores group id and the merge node carries a
 *   LIMIT or SLIMIT, the ignore-group-id flag is moved from the project to the merge node.
 * - WINDOW: any LIMIT / SLIMIT on the merge node is destroyed and cleared.
 * - SORT: when the sort calculates group ids, the merge node is flagged as
 *   receiving input with group ids.
 *
 * @param pMerge merge node to adjust
 * @param pNode  the split node
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t stbSplRewriteFromMergeNode(SMergeLogicNode* pMerge, SLogicNode* pNode) {
  pMerge->node.inputTsOrder = pNode->outputTsOrder;
  pMerge->node.outputTsOrder = pNode->outputTsOrder;

  if (QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pNode)) {
    SProjectLogicNode* pProject = (SProjectLogicNode*)pNode;
    bool               mergeHasLimit = (NULL != pMerge->node.pLimit) || (NULL != pMerge->node.pSlimit);
    if (pProject->ignoreGroupId && mergeHasLimit) {
      pMerge->ignoreGroupId = true;
      pProject->ignoreGroupId = false;
    }
  } else if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode)) {
    if (NULL != pMerge->node.pLimit) {
      nodesDestroyNode(pMerge->node.pLimit);
      pMerge->node.pLimit = NULL;
    }
    if (NULL != pMerge->node.pSlimit) {
      nodesDestroyNode(pMerge->node.pSlimit);
      pMerge->node.pSlimit = NULL;
    }
  } else if (QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pNode)) {
    if (((SSortLogicNode*)pNode)->calcGroupId) {
      pMerge->inputWithGroupId = true;
    }
  }

  return TSDB_CODE_SUCCESS;
}
727

728
static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode,
34,930,351✔
729
                                     SNodeList* pMergeKeys, SLogicNode* pPartChild, bool groupSort, bool needSort) {
730
  SMergeLogicNode* pMerge = NULL;
34,930,351✔
731
  int32_t code = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE, (SNode**)&pMerge);
34,930,351✔
732
  if (NULL == pMerge) {
34,935,284✔
733
    return code;
×
734
  }
735
  pMerge->needSort = needSort;
34,935,284✔
736
  pMerge->numOfChannels = stbSplGetNumOfVgroups(pPartChild);
34,935,284✔
737
  pMerge->srcGroupId = pCxt->groupId;
34,933,706✔
738
  pMerge->srcEndGroupId = pCxt->groupId;
34,933,706✔
739
  pMerge->node.precision = pPartChild->precision;
34,933,706✔
740
  pMerge->node.dynamicOp = pSplitNode->dynamicOp;
34,933,706✔
741
  if (!pMerge->node.dynamicOp && NULL != pSplitNode->pParent) {
34,933,706✔
742
    pMerge->node.dynamicOp = pSplitNode->pParent->dynamicOp;
28,586,443✔
743
  }
744
  pMerge->pMergeKeys = pMergeKeys;
34,933,706✔
745
  pMerge->groupSort = groupSort;
34,933,706✔
746
  pMerge->numOfSubplans = 1;
34,933,706✔
747

748
  pMerge->pInputs = NULL;
34,933,706✔
749
  code = nodesCloneList(pPartChild->pTargets, &pMerge->pInputs);
34,933,706✔
750
  if (TSDB_CODE_SUCCESS == code) {
34,935,884✔
751
    // NULL != pSubplan means 'merge node' replaces 'split node'.
752
    if (NULL == pSubplan) {
34,935,815✔
753
      code = nodesCloneList(pPartChild->pTargets, &pMerge->node.pTargets);
3,746,803✔
754
    } else {
755
      code = nodesCloneList(pSplitNode->pTargets, &pMerge->node.pTargets);
31,189,012✔
756
    }
757
  }
758
  if (TSDB_CODE_SUCCESS == code && NULL != pSplitNode->pLimit) {
34,936,250✔
759
    pMerge->node.pLimit = NULL;
3,376,168✔
760
    code = nodesCloneNode(pSplitNode->pLimit, &pMerge->node.pLimit);
3,376,168✔
761
    if (((SLimitNode*)pSplitNode->pLimit)->limit && ((SLimitNode*)pSplitNode->pLimit)->offset) {
3,376,168✔
762
      ((SLimitNode*)pSplitNode->pLimit)->limit->datum.i += ((SLimitNode*)pSplitNode->pLimit)->offset->datum.i;
1,141,187✔
763
    }
764
    if (((SLimitNode*)pSplitNode->pLimit)->offset) {
3,376,168✔
765
      ((SLimitNode*)pSplitNode->pLimit)->offset->datum.i = 0;
1,141,187✔
766
    }
767
  }
768
  if (TSDB_CODE_SUCCESS == code) {
34,936,250✔
769
    code = stbSplRewriteFromMergeNode(pMerge, pSplitNode);
34,934,108✔
770
  }
771
  if (TSDB_CODE_SUCCESS == code) {
34,936,274✔
772
    if (NULL == pSubplan) {
34,935,146✔
773
      code = nodesListMakeAppend(&pSplitNode->pChildren, (SNode*)pMerge);
3,746,158✔
774
    } else {
775
      code = replaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pMerge);
31,188,988✔
776
    }
777
  }
778
  if (TSDB_CODE_SUCCESS != code) {
34,936,325✔
779
    nodesDestroyNode((SNode*)pMerge);
×
780
  }
781
  return code;
34,935,194✔
782
}
783

784
/**
 * @brief Create an exchange node for pPartChild and append it as a child of pParent.
 *
 * @param pCxt       split context, forwarded to splCreateExchangeNode
 * @param pParent    node that receives the exchange node as a new child
 * @param pPartChild partial node the exchange node is created for
 * @return TSDB_CODE_SUCCESS or an error code; on failure no exchange node is left behind
 */
static int32_t stbSplCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pParent, SLogicNode* pPartChild) {
  SExchangeLogicNode* pExchange = NULL;
  int32_t             code = splCreateExchangeNode(pCxt, pPartChild, &pExchange);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  pExchange->node.pParent = pParent;
  code = nodesListMakeAppend(&pParent->pChildren, (SNode*)pExchange);
  if (TSDB_CODE_SUCCESS != code) {
    // The node was not linked into the plan; release it, as stbSplCreateMergeNode
    // does for its merge node after a failed nodesListMakeAppend.
    nodesDestroyNode((SNode*)pExchange);
  }
  return code;
}
793

794
/**
 * @brief Append an ORDER BY key built from a clone of pExpr to *pMergeKeys.
 *
 * The key sorts in the given order; NULLs come first for ascending order and
 * last otherwise.
 *
 * @param pExpr      expression to clone into the key
 * @param order      sort order of the key
 * @param pMergeKeys list to append to (created on demand by the append helper)
 * @return result of node creation / cloning / appending. If the clone yields
 *         no expression, the key is dropped and the clone's code is returned.
 */
static int32_t stbSplCreateMergeKeysByExpr(SNode* pExpr, EOrder order, SNodeList** pMergeKeys) {
  SOrderByExprNode* pKey = NULL;
  int32_t           code = nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR, (SNode**)&pKey);
  if (NULL == pKey) {
    return code;
  }

  pKey->pExpr = NULL;
  code = nodesCloneNode(pExpr, &pKey->pExpr);
  if (NULL != pKey->pExpr) {
    pKey->order = order;
    if (ORDER_ASC == order) {
      pKey->nullOrder = NULL_ORDER_FIRST;
    } else {
      pKey->nullOrder = NULL_ORDER_LAST;
    }
    return nodesListMakeStrictAppend(pMergeKeys, (SNode*)pKey);
  }

  // Nothing was cloned: discard the half-built key.
  nodesDestroyNode((SNode*)pKey);
  return code;
}
810

811
/**
 * @brief Append a merge key for the primary key expression.
 *
 * Thin wrapper around stbSplCreateMergeKeysByExpr.
 */
static int32_t stbSplCreateMergeKeysByPrimaryKey(SNode* pPrimaryKey, EOrder order, SNodeList** pMergeKeys) {
  int32_t code = stbSplCreateMergeKeysByExpr(pPrimaryKey, order, pMergeKeys);
  return code;
}
814

815
/**
 * @brief Split an INTERVAL / EXTERNAL window over a super table into a partial
 *        window (scan subplan) and a merge window fed by a merge node.
 *
 * Steps:
 *  1. build the partial window node and its merge keys;
 *  2. set the hash algo on the partial window and the merge algo on the split window;
 *  3. append a merge node under the split window;
 *  4. wrap the partial window into a scan subplan;
 *  5. give every TSMA subplan of the window its own group id and partial window
 *     and move them into the parent subplan's children;
 *  6. push the scan subplan to the front of the parent subplan's children.
 *
 * An EXTERNAL window without functions is left unsplit and reports success.
 * The parent subplan becomes a MERGE subplan and the context group id is
 * advanced regardless of the outcome.
 *
 * @param pCxt  split context
 * @param pInfo split info; pSplitNode must be a window logic node
 * @return TSDB_CODE_SUCCESS or the first error encountered
 */
static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SWindowLogicNode* pWindow = (SWindowLogicNode*)pInfo->pSplitNode;
  if (pWindow->winType == WINDOW_TYPE_EXTERNAL) {
    if (!pWindow->pFuncs) {
      // only have projection in external window.
      return TSDB_CODE_SUCCESS;
    }
  }
  SLogicNode* pPartWindow = NULL;
  SNodeList*  pMergeKeys = NULL;
  int32_t     code = stbSplCreatePartWindowNode(pCxt, pWindow, &pPartWindow, &pMergeKeys);
  if (TSDB_CODE_SUCCESS == code) {
    bool isInterval = (WINDOW_TYPE_INTERVAL == pWindow->winType);
    ((SWindowLogicNode*)pPartWindow)->windowAlgo = isInterval ? INTERVAL_ALGO_HASH : EXTERNAL_ALGO_HASH;
    pWindow->windowAlgo = isInterval ? INTERVAL_ALGO_MERGE : EXTERNAL_ALGO_MERGE;
    code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow, true, true);
    if (TSDB_CODE_SUCCESS != code) {
      nodesDestroyList(pMergeKeys);
    }
  }
  SLogicSubplan* pSplitSubPlan = NULL;
  if (TSDB_CODE_SUCCESS == code) {
    pSplitSubPlan = splCreateScanSubplan(pCxt, pPartWindow, SPLIT_FLAG_STABLE_SPLIT);
    if (!pSplitSubPlan) code = terrno;
  }
  if (code == TSDB_CODE_SUCCESS) {
    SNode* pNode;
    // The merge node created above is the first child of the split window.
    SMergeLogicNode* pMerge = (SMergeLogicNode*)pInfo->pSplitNode->pChildren->pHead->pNode;
    if (LIST_LENGTH(pWindow->pTsmaSubplans) > 0) {
      FOREACH(pNode, pWindow->pTsmaSubplans) {
        ++(pCxt->groupId);
        SLogicSubplan* pSubplan = (SLogicSubplan*)pNode;
        pSubplan->id.groupId = pCxt->groupId;
        pSubplan->id.queryId = pCxt->queryId;
        splSetSubplanVgroups(pSubplan, pSubplan->pNode);
        code = stbSplCreatePartWindowNode(pCxt, (SWindowLogicNode*)pSubplan->pNode, &pPartWindow, NULL);
        if (TSDB_CODE_SUCCESS != code) {
          // Stop at the first failure so that a later iteration cannot hide it.
          break;
        }
        nodesDestroyNode((SNode*)pSubplan->pNode);
        pSubplan->pNode = pPartWindow;
      }
      if (TSDB_CODE_SUCCESS == code) {
        // Only move the TSMA subplans once all of them were rewritten; this also
        // keeps a loop error from being overwritten by the append result.
        code = nodesListMakeStrictAppendList(&pInfo->pSubplan->pChildren, pWindow->pTsmaSubplans);
      }
      pMerge->numOfSubplans = LIST_LENGTH(pInfo->pSubplan->pChildren) + 1;
    }
    pMerge->srcEndGroupId = pCxt->groupId;
  }
  if (code == TSDB_CODE_SUCCESS) {
    code = nodesListMakePushFront(&pInfo->pSubplan->pChildren, (SNode*)pSplitSubPlan);
  }
  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}
870

871
/**
 * @brief Switch the scan below pNode to table-merge scanning.
 *
 * Follows single-child links down from pNode. When a scan node is reached it
 * is set to SCAN_TYPE_TABLE_MERGE with fileset delimiting, and to group sort
 * if it has group tags. Nothing happens if a non-scan node without exactly
 * one child is met first.
 */
static void stbSplSetTableMergeScan(SLogicNode* pNode) {
  SLogicNode* pCur = pNode;
  while (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pCur)) {
    if (1 != LIST_LENGTH(pCur->pChildren)) {
      return;
    }
    pCur = (SLogicNode*)nodesListGetNode(pCur->pChildren, 0);
  }

  SScanLogicNode* pScan = (SScanLogicNode*)pCur;
  pScan->scanType = SCAN_TYPE_TABLE_MERGE;
  pScan->filesetDelimited = true;
  if (NULL != pScan->pGroupTags) {
    pScan->groupSort = true;
  }
}
3,467,474✔
885

886
/**
 * @brief Split a session/state/event/count/anomaly window over a super table.
 *
 * The window's first child is replaced by a sort-merge node keyed on the
 * window's primary timestamp, the child itself becomes a scan subplan, and the
 * scan below it is switched to table-merge scanning.
 *
 * @param pCxt  split context; groupId is advanced on success
 * @param pInfo split info; pSplitNode must be a window logic node
 * @return TSDB_CODE_SUCCESS or an error code
 */
static int32_t stbSplSplitSessionOrStateForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode* pWindow = pInfo->pSplitNode;
  SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pWindow->pChildren, 0);

  SNodeList* pMergeKeys = NULL;
  int32_t    code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pWindow)->pTspk,
                                                      ((SWindowLogicNode*)pWindow)->node.inputTsOrder, &pMergeKeys);

  if (TSDB_CODE_SUCCESS == code) {
    code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pChild, pMergeKeys, (SLogicNode*)pChild, true, true);
    if (TSDB_CODE_SUCCESS == code) {
      // The merge node, now linked into the plan, references the key list.
      // Forget it here so the failure branch below cannot free it a second time.
      pMergeKeys = NULL;
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
                                     (SNode*)splCreateScanSubplan(pCxt, pChild, SPLIT_FLAG_STABLE_SPLIT));
  }

  if (TSDB_CODE_SUCCESS == code) {
    stbSplSetTableMergeScan(pChild);
    pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
    ++(pCxt->groupId);
  } else {
    // No-op when the list was never built or has been handed to the merge node.
    nodesDestroyList(pMergeKeys);
  }

  return code;
}
914

915
/**
 * @brief Split a window node whose windows span multiple child tables.
 *
 * Before dispatching:
 *  - an EXTERNAL window that is the first child of the split window is flagged
 *    as needing group sort;
 *  - an EXTERNAL split window is marked as split, and needs group sort exactly
 *    when it calculates with partition.
 *
 * INTERVAL and EXTERNAL windows go to stbSplSplitIntervalForBatch; SESSION,
 * STATE, EVENT, COUNT and ANOMALY windows go to stbSplSplitSessionOrStateForBatch.
 *
 * @return the handler's result, or TSDB_CODE_PLAN_INTERNAL_ERROR for any other window type
 */
static int32_t stbSplSplitWindowForCrossTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SWindowLogicNode* pWin = (SWindowLogicNode*)pInfo->pSplitNode;
  SNode*            pFirstChild = nodesListGetNode(pWin->node.pChildren, 0);

  if (NULL != pFirstChild && QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pFirstChild)) {
    SWindowLogicNode* pChildWin = (SWindowLogicNode*)pFirstChild;
    if (WINDOW_TYPE_EXTERNAL == pChildWin->winType) {
      pChildWin->needGroupSort = true;
    }
  }

  if (WINDOW_TYPE_EXTERNAL == pWin->winType) {
    pWin->extWinSplit = true;
    pWin->needGroupSort = pWin->calcWithPartition;
  }

  if (WINDOW_TYPE_INTERVAL == pWin->winType || WINDOW_TYPE_EXTERNAL == pWin->winType) {
    return stbSplSplitIntervalForBatch(pCxt, pInfo);
  }
  if (WINDOW_TYPE_SESSION == pWin->winType || WINDOW_TYPE_STATE == pWin->winType ||
      WINDOW_TYPE_EVENT == pWin->winType || WINDOW_TYPE_COUNT == pWin->winType ||
      WINDOW_TYPE_ANOMALY == pWin->winType) {
    return stbSplSplitSessionOrStateForBatch(pCxt, pInfo);
  }
  return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
944

945
/**
 * @brief Tell whether pNode or any of its ancestors carries a LIMIT or SLIMIT.
 *
 * @param pNode node to start from (may be NULL)
 * @return true if a node on the parent chain, starting with pNode itself,
 *         has pLimit or pSlimit set; false otherwise
 */
static bool stbSplNeedSeqRecvData(SLogicNode* pNode) {
  SLogicNode* pCur = pNode;
  while (NULL != pCur) {
    if (NULL != pCur->pLimit || NULL != pCur->pSlimit) {
      return true;
    }
    pCur = pCur->pParent;
  }
  return false;
}
955

956
/**
 * @brief Split a window node that is evaluated per child table.
 *
 * If the window's parent is a FILL node, the FILL node becomes the split node.
 * The split node is replaced in the subplan by an exchange node and moved into
 * a new scan subplan appended to the subplan's children.
 *
 * The subplan is marked as a MERGE subplan and the context group id is
 * advanced whether or not the split succeeded.
 *
 * @return TSDB_CODE_SUCCESS or an error code
 */
static int32_t stbSplSplitWindowForPartTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode* pParent = pInfo->pSplitNode->pParent;
  if (NULL != pParent && QUERY_NODE_LOGIC_PLAN_FILL == nodeType(pParent)) {
    pInfo->pSplitNode = pParent;
  }

  SExchangeLogicNode* pExchange = NULL;
  int32_t             code = splCreateExchangeNode(pCxt, pInfo->pSplitNode, &pExchange);
  if (TSDB_CODE_SUCCESS == code && NULL != pExchange) {
    code = replaceLogicNode(pInfo->pSubplan, pInfo->pSplitNode, (SLogicNode*)pExchange);
    if (TSDB_CODE_SUCCESS == code) {
      // Evaluated after the replacement, so the exchange node's new ancestors are inspected.
      pExchange->seqRecvData = stbSplNeedSeqRecvData((SLogicNode*)pExchange);
      pExchange->dynTbname = false;
      code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
                                       (SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT));
    }
  }

  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}
975

976
/**
 * @brief Pick the split strategy for a window node.
 *
 * A per-table window without TSMA subplans is split with
 * stbSplSplitWindowForPartTable; everything else with
 * stbSplSplitWindowForCrossTable.
 */
static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SWindowLogicNode* pWindow = (SWindowLogicNode*)pInfo->pSplitNode;
  if (!isPartTableWinodw(pWindow) || 0 != LIST_LENGTH(pWindow->pTsmaSubplans)) {
    return stbSplSplitWindowForCrossTable(pCxt, pInfo);
  }
  return stbSplSplitWindowForPartTable(pCxt, pInfo);
}
984

985
/**
 * @brief Derive the partial (per-vnode) aggregate from a merge aggregate.
 *
 * The aggregate functions, group keys, targets, children and conditions are
 * temporarily detached from pMergeAgg before it is cloned (presumably so the
 * clone does not copy them), then distributed:
 *  - group keys and children move to the partial aggregate; the merge
 *    aggregate groups by columns referring to the partial group keys;
 *  - conditions and targets go back to the merge aggregate;
 *  - the functions are rewritten by stbSplRewriteFuns into partial and merge
 *    versions, and the original function list is destroyed.
 *
 * On failure the partial aggregate is destroyed and every detached list that
 * had not yet been handed to one of the two nodes is re-attached to pMergeAgg,
 * so nothing is orphaned. The original function list is destroyed either way.
 *
 * @param pMergeAgg aggregate node being split; becomes the merge aggregate
 * @param pOutput   receives the partial aggregate on success; untouched on failure
 * @return TSDB_CODE_SUCCESS or an error code
 */
static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pOutput) {
  SNodeList* pFunc = pMergeAgg->pAggFuncs;
  pMergeAgg->pAggFuncs = NULL;
  SNodeList* pGroupKeys = pMergeAgg->pGroupKeys;
  pMergeAgg->pGroupKeys = NULL;
  SNodeList* pTargets = pMergeAgg->node.pTargets;
  pMergeAgg->node.pTargets = NULL;
  SNodeList* pChildren = pMergeAgg->node.pChildren;
  pMergeAgg->node.pChildren = NULL;
  SNode* pConditions = pMergeAgg->node.pConditions;
  pMergeAgg->node.pConditions = NULL;

  SAggLogicNode* pPartAgg = NULL;
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;

  PLAN_ERR_JRET(nodesCloneNode((SNode*)pMergeAgg, (SNode**)&pPartAgg));

  pPartAgg->node.groupAction = GROUP_ACTION_KEEP;

  if (NULL != pGroupKeys) {
    pPartAgg->pGroupKeys = pGroupKeys;
    pGroupKeys = NULL;  // now owned by pPartAgg
    PLAN_ERR_JRET(createColumnByRewriteExprs(pPartAgg->pGroupKeys, &pPartAgg->node.pTargets));
    pMergeAgg->pGroupKeys = NULL;
    PLAN_ERR_JRET(nodesCloneList(pPartAgg->node.pTargets, &pMergeAgg->pGroupKeys));
  }

  // Each local is cleared as soon as its value has a new owner, so the failure
  // path below only re-attaches what is still detached.
  pMergeAgg->node.pConditions = pConditions;
  pConditions = NULL;
  pMergeAgg->node.pTargets = pTargets;
  pTargets = NULL;
  pPartAgg->node.pChildren = pChildren;
  pChildren = NULL;
  splSetParent((SLogicNode*)pPartAgg);

  PLAN_ERR_JRET(stbSplRewriteFuns(pFunc, &pPartAgg->pAggFuncs, NULL, &pMergeAgg->pAggFuncs));

  PLAN_ERR_JRET(createColumnByRewriteExprs(pPartAgg->pAggFuncs, &pPartAgg->node.pTargets));

  nodesDestroyList(pFunc);

  *pOutput = (SLogicNode*)pPartAgg;

  return code;
_return:
  if (code) {
    planError("%s failed at line %d, code: %d", __func__, lino, code);
    nodesDestroyNode((SNode*)pPartAgg);
  }
  // Give back whatever is still detached; these fields of pMergeAgg are NULL
  // whenever the corresponding local is non-NULL.
  if (NULL != pGroupKeys) {
    pMergeAgg->pGroupKeys = pGroupKeys;
  }
  if (NULL != pTargets) {
    pMergeAgg->node.pTargets = pTargets;
  }
  if (NULL != pChildren) {
    pMergeAgg->node.pChildren = pChildren;
  }
  if (NULL != pConditions) {
    pMergeAgg->node.pConditions = pConditions;
  }
  nodesDestroyList(pFunc);
  return code;
}
1034

1035
/**
 * @brief Split an aggregate that is evaluated per child table.
 *
 * An exchange node is created for the split node in the subplan (merge subplan
 * type), and the split node is moved into a new scan subplan appended to the
 * subplan's children. The context group id is advanced in every case.
 *
 * @return TSDB_CODE_SUCCESS or an error code
 */
static int32_t stbSplSplitAggNodeForPartTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pInfo->pSplitNode, SUBPLAN_TYPE_MERGE, false);
  if (TSDB_CODE_SUCCESS == code) {
    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT);
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, (SNode*)pScanSubplan);
  }
  ++(pCxt->groupId);
  return code;
}
1044

1045

1046
/**
1047
 * @brief For pipelined agg node, add a SortMergeNode to merge result from vnodes.
1048
 *        For agg + partition, results are sorted by group id, use group sort.
1049
 *        For agg + sort for group, results are sorted by partition keys, not group id, merges keys should be the same
1050
 *            as partition keys
1051
 */
1052
static int32_t stbSplAggNodeCreateMerge(SSplitContext* pCtx, SStableSplitInfo* pInfo, SLogicNode* pChildAgg) {
15,958✔
1053
  bool       groupSort = true;
15,958✔
1054
  SNodeList* pMergeKeys = NULL;
15,958✔
1055
  int32_t    code = TSDB_CODE_SUCCESS;
15,958✔
1056
  bool       sortForGroup = false;
15,958✔
1057

1058
  if (pChildAgg->pChildren->length != 1) return TSDB_CODE_TSC_INTERNAL_ERROR;
15,958✔
1059

1060
  SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pChildAgg->pChildren, 0);
15,958✔
1061
  if (nodeType(pChild) == QUERY_NODE_LOGIC_PLAN_SORT) {
15,958✔
1062
    SSortLogicNode* pSort = (SSortLogicNode*)pChild;
×
1063
    if (pSort->calcGroupId) {
×
1064
      SNode *node, *node2;
1065
      groupSort = false;
×
1066
      sortForGroup = true;
×
1067
      SNodeList* extraAggFuncs = NULL;
×
1068
      uint32_t   originalLen = LIST_LENGTH(pSort->node.pTargets), idx = 0;
×
1069
      code = stbSplCreateMergeKeys(pSort->pSortKeys, pSort->node.pTargets, &pMergeKeys);
×
1070
      if (TSDB_CODE_SUCCESS != code) return code;
×
1071

1072
      // Create group_key func for all sort keys.
1073
      // We only need newly added nodes in pSort.node.pTargets when stbSplCreateMergeKeys
1074
      FOREACH(node, pSort->node.pTargets) {
×
1075
        if (idx++ < originalLen) continue;
×
1076
        SFunctionNode* pGroupKeyFunc = createGroupKeyAggFunc((SColumnNode*)node);
×
1077
        if (!pGroupKeyFunc) {
×
1078
          code = terrno;
×
1079
          break;
×
1080
        }
1081
        code = nodesListMakeStrictAppend(&extraAggFuncs, (SNode*)pGroupKeyFunc);
×
1082
        if (code != TSDB_CODE_SUCCESS) {
×
1083
          nodesDestroyNode((SNode*)pGroupKeyFunc);
×
1084
        }
1085
      }
1086

1087
      if (TSDB_CODE_SUCCESS == code) {
×
1088
        // add these extra group_key funcs into targets
1089
        code = createColumnByRewriteExprs(extraAggFuncs, &pChildAgg->pTargets);
×
1090
      }
1091
      if (code == TSDB_CODE_SUCCESS) {
×
1092
        code = nodesListAppendList(((SAggLogicNode*)pChildAgg)->pAggFuncs, extraAggFuncs);
×
1093
        extraAggFuncs = NULL;
×
1094
      }
1095

1096
      if (code == TSDB_CODE_SUCCESS) {
×
1097
        FOREACH(node, pMergeKeys) {
×
1098
          SOrderByExprNode* pOrder = (SOrderByExprNode*)node;
×
1099
          SColumnNode*      pCol = (SColumnNode*)pOrder->pExpr;
×
1100
          FOREACH(node2, ((SAggLogicNode*)pChildAgg)->pAggFuncs) {
×
1101
            SFunctionNode* pFunc = (SFunctionNode*)node2;
×
1102
            if (0 != strcmp(pFunc->functionName, "_group_key")) continue;
×
1103
            SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
×
1104
            if (!nodesEqualNode(pParam, (SNode*)pCol)) continue;
×
1105

1106
            // use the colName of group_key func to make sure finding the right slot id for merge keys.
1107
            tstrncpy(pCol->colName, pFunc->node.aliasName, TSDB_COL_NAME_LEN);
×
1108
            tstrncpy(pCol->node.aliasName, pFunc->node.aliasName, TSDB_COL_NAME_LEN);
×
1109
            memset(pCol->tableAlias, 0, TSDB_TABLE_NAME_LEN);
×
1110
            break;
×
1111
          }
1112
        }
1113
      }
1114
      if (TSDB_CODE_SUCCESS != code) {
×
1115
        nodesDestroyList(pMergeKeys);
×
1116
        nodesDestroyList(extraAggFuncs);
×
1117
      }
1118
    }
1119
  }
1120
  if (TSDB_CODE_SUCCESS == code) {
15,958✔
1121
    code = stbSplCreateMergeNode(pCtx, NULL, pInfo->pSplitNode, pMergeKeys, pChildAgg, groupSort, true);
15,958✔
1122
  }
1123
  if (TSDB_CODE_SUCCESS == code && sortForGroup) {
15,958✔
1124
    SMergeLogicNode* pMerge =
1125
        (SMergeLogicNode*)nodesListGetNode(pInfo->pSplitNode->pChildren, LIST_LENGTH(pInfo->pSplitNode->pChildren) - 1);
×
1126
    pMerge->inputWithGroupId = true;
×
1127
  }
1128
  return code;
15,958✔
1129
}
1130

1131
/**
 * @brief Split a cross-table aggregate that has TSMA subplans.
 *
 * The aggregate is split into a partial aggregate (wrapped into a scan subplan)
 * and a merge aggregate fed by a merge node. Every TSMA subplan gets its own
 * group id and has its aggregate replaced by a partial aggregate; all of them
 * are then moved into the parent subplan's children, followed by the scan subplan.
 *
 * The parent subplan is marked as a MERGE subplan and the context group id is
 * advanced whether or not the split succeeded.
 *
 * @param pCxt  split context
 * @param pInfo split info; pSplitNode must be an aggregate logic node
 * @return TSDB_CODE_SUCCESS or the first error encountered
 */
static int32_t stbSplSplitAggNodeForCrossTableMulSubplan(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode*      pPartAgg = NULL;
  SMergeLogicNode* pMergeNode = NULL;
  SLogicSubplan*   pFirstScanSubplan = NULL;
  int32_t          code = stbSplCreatePartAggNode((SAggLogicNode*)pInfo->pSplitNode, &pPartAgg);

  if (TSDB_CODE_SUCCESS == code) {
    if (pInfo->pSplitNode->forceCreateNonBlockingOptr) {
      code = stbSplAggNodeCreateMerge(pCxt, pInfo, pPartAgg);
    } else {
      code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, NULL, pPartAgg, false, false);
    }
    if (TSDB_CODE_SUCCESS == code) {
      // The merge node was appended last under the split node; it is only
      // looked up once it is known to exist.
      pMergeNode = (SMergeLogicNode*)nodesListGetNode(pInfo->pSplitNode->pChildren,
                                                      LIST_LENGTH(pInfo->pSplitNode->pChildren) - 1);
    }
  } else {
    nodesDestroyNode((SNode*)pPartAgg);
  }

  if (code == TSDB_CODE_SUCCESS) {
    pFirstScanSubplan = splCreateScanSubplan(pCxt, pPartAgg, SPLIT_FLAG_STABLE_SPLIT);
    if (!pFirstScanSubplan) code = terrno;
  }

  if (code == TSDB_CODE_SUCCESS) {
    SNode*         pNode;
    SAggLogicNode* pAgg = (SAggLogicNode*)pInfo->pSplitNode;
    if (LIST_LENGTH(pAgg->pTsmaSubplans) > 0) {
      FOREACH(pNode, pAgg->pTsmaSubplans) {
        ++(pCxt->groupId);
        SLogicSubplan* pSubplan = (SLogicSubplan*)pNode;
        pSubplan->id.groupId = pCxt->groupId;
        pSubplan->id.queryId = pCxt->queryId;
        splSetSubplanVgroups(pSubplan, pSubplan->pNode);
        code = stbSplCreatePartAggNode((SAggLogicNode*)pSubplan->pNode, &pPartAgg);
        if (code) break;
        nodesDestroyNode((SNode*)pSubplan->pNode);
        pSubplan->pNode = pPartAgg;
      }
      if (TSDB_CODE_SUCCESS == code) {
        // Skipped after a loop failure so the append result cannot mask that error.
        code = nodesListMakeStrictAppendList(&pInfo->pSubplan->pChildren, pAgg->pTsmaSubplans);
      }
      pMergeNode->numOfSubplans = LIST_LENGTH(pInfo->pSubplan->pChildren) + 1;
    }
    pMergeNode->srcEndGroupId = pCxt->groupId;
  }

  if (code == TSDB_CODE_SUCCESS) {
    code = nodesListMakeAppend(&pInfo->pSubplan->pChildren, (SNode*)pFirstScanSubplan);
  }

  if (code && pFirstScanSubplan) {
    nodesDestroyNode((SNode*)pFirstScanSubplan);
  }

  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}
1190

1191
/**
 * @brief Split a cross-table aggregate into a partial aggregate (scan subplan)
 *        and a merge aggregate.
 *
 * The partial aggregate is connected to the merge aggregate through a
 * sort-merge node when the split node is forced to be non-blocking, and through
 * an exchange node otherwise. The subplan is marked as a MERGE subplan and the
 * context group id is advanced whether or not the split succeeded.
 *
 * @return TSDB_CODE_SUCCESS or an error code
 */
static int32_t stbSplSplitAggNodeForCrossTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode* pPartAgg = NULL;
  int32_t     code = stbSplCreatePartAggNode((SAggLogicNode*)pInfo->pSplitNode, &pPartAgg);
  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode((SNode*)pPartAgg);
  } else if (pInfo->pSplitNode->forceCreateNonBlockingOptr) {
    // if slimit was pushed down to agg, agg will be pipelined mode, add sort merge before parent agg
    code = stbSplAggNodeCreateMerge(pCxt, pInfo, pPartAgg);
  } else {
    code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartAgg);
  }

  if (TSDB_CODE_SUCCESS == code) {
    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, pPartAgg, SPLIT_FLAG_STABLE_SPLIT);
    if (NULL == pScanSubplan) {
      code = terrno;
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, (SNode*)pScanSubplan);
    }
  }

  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}
1219

1220
/**
 * @brief Pick the split strategy for an aggregate node.
 *
 * Aggregates with TSMA subplans use the multi-subplan cross-table split.
 * Otherwise per-table aggregates use the part-table split and all others the
 * cross-table split.
 */
static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SAggLogicNode* pAgg = (SAggLogicNode*)pInfo->pSplitNode;
  if (LIST_LENGTH(pAgg->pTsmaSubplans) > 0) {
    return stbSplSplitAggNodeForCrossTableMulSubplan(pCxt, pInfo);
  }
  return isPartTableAgg(pAgg) ? stbSplSplitAggNodeForPartTable(pCxt, pInfo)
                              : stbSplSplitAggNodeForCrossTable(pCxt, pInfo);
}
1229

1230
/**
 * @brief Create a column node that refers to the given expression.
 *
 * If pExpr is itself a column, its db/table/alias/column names are copied;
 * otherwise the new column is named after the expression's alias. Alias, user
 * alias and result type are always taken from pExpr.
 *
 * @param pExpr  expression to refer to
 * @param ppNode receives the new column node on success
 * @return result of the node allocation
 */
static int32_t stbSplCreateColumnNode(SExprNode* pExpr, SNode** ppNode) {
  SColumnNode* pNewCol = NULL;
  int32_t      code = nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pNewCol);
  if (NULL == pNewCol) {
    return code;
  }

  if (QUERY_NODE_COLUMN != nodeType(pExpr)) {
    tstrncpy(pNewCol->colName, pExpr->aliasName, TSDB_COL_NAME_LEN);
  } else {
    SColumnNode* pSrcCol = (SColumnNode*)pExpr;
    tstrncpy(pNewCol->dbName, pSrcCol->dbName, TSDB_DB_NAME_LEN);
    tstrncpy(pNewCol->tableName, pSrcCol->tableName, TSDB_TABLE_NAME_LEN);
    tstrncpy(pNewCol->tableAlias, pSrcCol->tableAlias, TSDB_TABLE_NAME_LEN);
    tstrncpy(pNewCol->colName, pSrcCol->colName, TSDB_COL_NAME_LEN);
  }

  tstrncpy(pNewCol->node.aliasName, pExpr->aliasName, TSDB_COL_NAME_LEN);
  tstrncpy(pNewCol->node.userAlias, pExpr->userAlias, TSDB_COL_NAME_LEN);
  pNewCol->node.resType = pExpr->resType;

  *ppNode = (SNode*)pNewCol;
  return code;
}
1250

1251
/**
 * @brief Build an ORDER BY key on a clone of pCol, with the order and null
 *        order of pSortKey.
 *
 * @param pSortKey sort key supplying order and null order
 * @param pCol     expression to clone into the new key
 * @param ppNode   receives the new key; untouched if nothing was created
 * @return result of node creation / cloning. If the clone yields no
 *         expression, the key is dropped and the clone's code is returned.
 */
static int32_t stbSplCreateOrderByExpr(SOrderByExprNode* pSortKey, SNode* pCol, SNode** ppNode) {
  SOrderByExprNode* pKey = NULL;
  int32_t           code = nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR, (SNode**)&pKey);
  if (NULL == pKey) {
    return code;
  }

  pKey->pExpr = NULL;
  code = nodesCloneNode(pCol, &pKey->pExpr);
  if (NULL != pKey->pExpr) {
    pKey->order = pSortKey->order;
    pKey->nullOrder = pSortKey->nullOrder;
    *ppNode = (SNode*)pKey;
    return code;
  }

  // Nothing was cloned: discard the half-built key.
  nodesDestroyNode((SNode*)pKey);
  return code;
}
1268

1269
/**
 * @brief Translate sort keys into merge keys expressed over pTargets.
 *
 * For every sort key, each target that matches it produces one merge key. A
 * target matches when the sort expression is a column equal to the target, or
 * when the sort expression's alias equals the target's column name. Scanning
 * does not stop at the first match. If no target matches, a new column for the
 * sort expression is created, used as merge key and appended to pTargets.
 *
 * @param pSortKeys sort keys (SOrderByExprNode list)
 * @param pTargets  target list; may grow as described above
 * @param pOutput   receives the merge key list on success; untouched on failure
 * @return TSDB_CODE_SUCCESS or an error code (the partial key list is destroyed)
 */
static int32_t stbSplCreateMergeKeys(SNodeList* pSortKeys, SNodeList* pTargets, SNodeList** pOutput) {
  int32_t    code = TSDB_CODE_SUCCESS;
  SNodeList* pKeys = NULL;
  SNode*     pSortNode = NULL;

  FOREACH(pSortNode, pSortKeys) {
    SOrderByExprNode* pSortKey = (SOrderByExprNode*)pSortNode;
    SExprNode*        pSortExpr = (SExprNode*)pSortKey->pExpr;
    bool              matched = false;
    SNode*            pTarget = NULL;

    FOREACH(pTarget, pTargets) {
      bool sameColumn = (QUERY_NODE_COLUMN == nodeType(pSortExpr)) && nodesEqualNode((SNode*)pSortExpr, pTarget);
      if (!sameColumn && 0 != strcmp(pSortExpr->aliasName, ((SColumnNode*)pTarget)->colName)) {
        continue;
      }

      SNode* pKey = NULL;
      code = stbSplCreateOrderByExpr(pSortKey, pTarget, &pKey);
      if (TSDB_CODE_SUCCESS == code) {
        code = nodesListMakeStrictAppend(&pKeys, pKey);
      }
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
      matched = true;
    }

    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
    if (matched) {
      continue;
    }

    // No target carries this sort key: add a column for it.
    SNode* pCol = NULL;
    code = stbSplCreateColumnNode(pSortExpr, &pCol);
    if (TSDB_CODE_SUCCESS == code) {
      SNode* pKey = NULL;
      code = stbSplCreateOrderByExpr(pSortKey, pCol, &pKey);
      if (TSDB_CODE_SUCCESS == code) {
        code = nodesListMakeStrictAppend(&pKeys, pKey);
      }
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = nodesListStrictAppend(pTargets, pCol);
    } else {
      nodesDestroyNode(pCol);
    }
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyList(pKeys);
    return code;
  }
  *pOutput = pKeys;
  return code;
}
1319

1320
static int32_t stbSplCreatePartSortNode(SSortLogicNode* pSort, SLogicNode** pOutputPartSort,
8,169,285✔
1321
                                        SNodeList** pOutputMergeKeys) {
1322
  SNodeList* pSortKeys = pSort->pSortKeys;
8,169,285✔
1323
  pSort->pSortKeys = NULL;
8,169,285✔
1324
  SNodeList* pChildren = pSort->node.pChildren;
8,169,285✔
1325
  pSort->node.pChildren = NULL;
8,169,285✔
1326

1327
  int32_t         code = TSDB_CODE_SUCCESS;
8,169,285✔
1328
  SSortLogicNode* pPartSort = NULL;
8,169,285✔
1329
  code = nodesCloneNode((SNode*)pSort, (SNode**)&pPartSort);
8,169,285✔
1330

1331
  SNodeList* pMergeKeys = NULL;
8,171,987✔
1332
  if (TSDB_CODE_SUCCESS == code) {
8,171,987✔
1333
    pPartSort->node.pChildren = pChildren;
8,172,125✔
1334
    splSetParent((SLogicNode*)pPartSort);
8,172,125✔
1335
    pPartSort->pSortKeys = pSortKeys;
8,172,932✔
1336
    pPartSort->groupSort = pSort->groupSort;
8,172,932✔
1337
    code = stbSplCreateMergeKeys(pPartSort->pSortKeys, pPartSort->node.pTargets, &pMergeKeys);
8,172,932✔
1338
  }
1339

1340
  if (TSDB_CODE_SUCCESS == code) {
8,173,553✔
1341
    *pOutputPartSort = (SLogicNode*)pPartSort;
8,173,553✔
1342
    *pOutputMergeKeys = pMergeKeys;
8,173,553✔
1343
  } else {
1344
    nodesDestroyNode((SNode*)pPartSort);
×
1345
    nodesDestroyList(pMergeKeys);
×
1346
  }
1347

1348
  return code;
8,173,765✔
1349
}
1350

1351
// Follows the chain of single-child nodes starting at pNode. If the chain ends in a scan node
// that has group tags, that scan is flagged with groupSort = true. The walk stops without any
// change as soon as a non-scan node does not have exactly one child.
static void stbSplSetScanPartSort(SLogicNode* pNode) {
  while (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode)) {
    if (1 != LIST_LENGTH(pNode->pChildren)) {
      return;
    }
    pNode = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
  }

  SScanLogicNode* pScan = (SScanLogicNode*)pNode;
  if (NULL != pScan->pGroupTags) {
    pScan->groupSort = true;
  }
}
84,104✔
1363

1364
// Splits a sort node matched by the stable split rule.
//
// Steps: create the partial sort node and merge keys, create the merge node in place of the sort
// node, destroy the original sort node, and append a scan subplan rooted at the partial sort to
// the subplan's children. The subplan is marked SUBPLAN_TYPE_MERGE and the group id is advanced
// whether or not the split succeeded.
static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SSortLogicNode* pSort = (SSortLogicNode*)pInfo->pSplitNode;
  // Read before the sort node is destroyed below.
  const bool  groupSort = pSort->groupSort;
  SLogicNode* pPartSort = NULL;
  SNodeList*  pMergeKeys = NULL;

  int32_t code = stbSplCreatePartSortNode(pSort, &pPartSort, &pMergeKeys);
  if (TSDB_CODE_SUCCESS == code) {
    code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, pMergeKeys, pPartSort, groupSort, true);
  }
  if (TSDB_CODE_SUCCESS == code) {
    nodesDestroyNode((SNode*)pInfo->pSplitNode);
    // Cleared so the caller does not touch the destroyed node.
    pInfo->pSplitNode = NULL;
    if (groupSort) {
      stbSplSetScanPartSort(pPartSort);
    }
    SNode* pScanSubplan = (SNode*)splCreateScanSubplan(pCxt, pPartSort, SPLIT_FLAG_STABLE_SPLIT);
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, pScanSubplan);
  }

  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}
1385

1386
// Chooses the node at which a scan split is performed.
//
// By default this is the scan node itself (pInfo->pSplitNode). If the scan's parent is a project
// node without LIMIT/SLIMIT and without inputIgnoreGroup, the parent is chosen instead. In that
// case, when the scan has a limit, a copy of it is installed on the parent and the scan's own
// limit is rewritten to "limit + offset" with offset 0.
//
// Returns TSDB_CODE_SUCCESS, or the clone's return code when copying the limit produced no node.
static int32_t stbSplGetSplitNodeForScan(SStableSplitInfo* pInfo, SLogicNode** pSplitNode) {
  SLogicNode* pScanNode = pInfo->pSplitNode;
  SLogicNode* pParent = pScanNode->pParent;

  *pSplitNode = pScanNode;

  const bool splitAtParent = NULL != pParent &&
                             QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pParent) &&
                             NULL == pParent->pLimit && NULL == pParent->pSlimit &&
                             !((SProjectLogicNode*)pParent)->inputIgnoreGroup;
  if (!splitAtParent) {
    return TSDB_CODE_SUCCESS;
  }

  *pSplitNode = pParent;
  if (NULL == pScanNode->pLimit) {
    return TSDB_CODE_SUCCESS;
  }

  // The parent keeps the original limit/offset.
  pParent->pLimit = NULL;
  int32_t code = nodesCloneNode(pScanNode->pLimit, &pParent->pLimit);
  if (NULL == pParent->pLimit) {
    return code;
  }

  // The scan must deliver the skipped rows as well: fold the offset into the limit.
  SLimitNode* pScanLimit = (SLimitNode*)pScanNode->pLimit;
  if (pScanLimit->limit && pScanLimit->offset) {
    pScanLimit->limit->datum.i += pScanLimit->offset->datum.i;
  }
  if (pScanLimit->offset) {
    pScanLimit->offset->datum.i = 0;
  }
  return TSDB_CODE_SUCCESS;
}
1409

1410
// Splits a scan that has no group tags: the chosen split node is replaced by an exchange node
// and moved into a new scan subplan appended to the subplan's children.
// The group id is advanced whether or not the split succeeded.
static int32_t stbSplSplitScanNodeWithoutPartTags(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode* pSplitNode = NULL;
  int32_t     code = stbSplGetSplitNodeForScan(pInfo, &pSplitNode);
  if (TSDB_CODE_SUCCESS == code) {
    code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pSplitNode, pInfo->pSubplan->subplanType, false);
    if (TSDB_CODE_SUCCESS == code) {
      splSetSubplanType(pInfo->pSubplan);
      SNode* pScanSubplan = (SNode*)splCreateScanSubplan(pCxt, pSplitNode, SPLIT_FLAG_STABLE_SPLIT);
      code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, pScanSubplan);
    }
  }

  ++(pCxt->groupId);
  return code;
}
1424

1425
// Splits a scan that has group tags: a merge node without merge keys is created for the chosen
// split node, which is then moved into a new scan subplan.
//
// Sorting is requested from stbSplCreateMergeNode() unless the split node is a project without
// LIMIT/SLIMIT whose ignoreGroupId flag is set. The subplan is marked SUBPLAN_TYPE_MERGE and the
// group id is advanced whether or not the split succeeded.
static int32_t stbSplSplitScanNodeWithPartTags(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode* pSplitNode = NULL;
  int32_t     code = stbSplGetSplitNodeForScan(pInfo, &pSplitNode);
  if (TSDB_CODE_SUCCESS == code) {
    const bool isUnlimitedProject =
        QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pSplitNode) && !pSplitNode->pLimit && !pSplitNode->pSlimit;
    const bool needSort = isUnlimitedProject ? !((SProjectLogicNode*)pSplitNode)->ignoreGroupId : true;

    code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pSplitNode, NULL, pSplitNode, needSort, needSort);
    if (TSDB_CODE_SUCCESS == code) {
      SNode* pScanSubplan = (SNode*)splCreateScanSubplan(pCxt, pSplitNode, SPLIT_FLAG_STABLE_SPLIT);
      code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, pScanSubplan);
    }
  }

  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}
1443

1444
// Looks up the primary timestamp column (colId == PRIMARYKEY_TIMESTAMP_COL_ID) among the scan
// columns and makes sure an equal node is present in the scan's target list.
//
// *ppNode is set to NULL when the scan has no such column, otherwise to the scan-column node
// (not to the copy that may be appended to the targets). If appending fails, *ppNode is left
// untouched and the error code is returned.
static int32_t stbSplFindPrimaryKeyFromScan(SScanLogicNode* pScan, SNode** ppNode) {
  SNode* pTsCol = NULL;
  SNode* pScanCol = NULL;
  FOREACH(pScanCol, pScan->pScanCols) {
    if (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pScanCol)->colId) {
      pTsCol = pScanCol;
      break;
    }
  }
  if (NULL == pTsCol) {
    *ppNode = NULL;
    return TSDB_CODE_SUCCESS;
  }

  // Already an output of the scan: nothing to add.
  SNode* pTarget = NULL;
  FOREACH(pTarget, pScan->node.pTargets) {
    if (nodesEqualNode(pTarget, pTsCol)) {
      *ppNode = pTsCol;
      return TSDB_CODE_SUCCESS;
    }
  }

  // Not in the targets yet: append a copy.
  SNode*  pCopy = NULL;
  int32_t code = nodesCloneNode(pTsCol, &pCopy);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }
  code = nodesListStrictAppend(pScan->node.pTargets, pCopy);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }
  *ppNode = pTsCol;
  return code;
}
1474

1475
// Looks up the first scan column flagged isPk and makes sure an equal node is present in the
// scan's target list.
//
// *ppNode is set to NULL when no scan column is flagged isPk, otherwise to the scan-column node
// (not to the copy that may be appended to the targets). If appending fails, *ppNode is left
// untouched and the error code is returned.
static int32_t stbSplFindPkFromScan(SScanLogicNode* pScan, SNode** ppNode) {
  int32_t code = 0;

  SNode* pPkCol = NULL;
  SNode* pScanCol = NULL;
  FOREACH(pScanCol, pScan->pScanCols) {
    if (((SColumnNode*)pScanCol)->isPk) {
      pPkCol = pScanCol;
      break;
    }
  }
  if (NULL == pPkCol) {
    *ppNode = NULL;
    return code;
  }

  // Already an output of the scan: nothing to add.
  SNode* pTarget = NULL;
  FOREACH(pTarget, pScan->node.pTargets) {
    if (nodesEqualNode(pTarget, pPkCol)) {
      *ppNode = pPkCol;
      return code;
    }
  }

  // Not in the targets yet: append a copy.
  SNode* pCopy = NULL;
  code = nodesCloneNode(pPkCol, &pCopy);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }
  code = nodesListStrictAppend(pScan->node.pTargets, pCopy);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }
  *ppNode = pPkCol;
  return code;
}
1506

1507
static int32_t stbSplCreateMergeScanNode(SScanLogicNode* pScan, SLogicNode** pOutputMergeScan,
17,648,836✔
1508
                                         SNodeList** pOutputMergeKeys) {
1509
  SNodeList* pChildren = pScan->node.pChildren;
17,648,836✔
1510
  pScan->node.pChildren = NULL;
17,648,836✔
1511

1512
  int32_t         code = TSDB_CODE_SUCCESS;
17,648,836✔
1513
  SScanLogicNode* pMergeScan = NULL;
17,648,836✔
1514
  code = nodesCloneNode((SNode*)pScan, (SNode**)&pMergeScan);
17,648,836✔
1515

1516
  SNodeList* pMergeKeys = NULL;
17,649,160✔
1517
  if (TSDB_CODE_SUCCESS == code) {
17,649,160✔
1518
    pMergeScan->scanType = SCAN_TYPE_TABLE_MERGE;
17,648,998✔
1519
    pMergeScan->filesetDelimited = true;
17,648,998✔
1520
    pMergeScan->node.pChildren = pChildren;
17,648,998✔
1521
    splSetParent((SLogicNode*)pMergeScan);
17,648,998✔
1522

1523
    SNode* pTs = NULL;
17,649,091✔
1524
    code = stbSplFindPrimaryKeyFromScan(pMergeScan, &pTs);
17,649,091✔
1525
    if (TSDB_CODE_SUCCESS == code) {
17,649,388✔
1526
      code = stbSplCreateMergeKeysByPrimaryKey(pTs, pMergeScan->scanSeq[0] > 0 ? ORDER_ASC : ORDER_DESC, &pMergeKeys);
17,649,133✔
1527
    }
1528
    SNode* pPk = NULL;
17,649,619✔
1529
    if (TSDB_CODE_SUCCESS == code) {
17,649,619✔
1530
      code = stbSplFindPkFromScan(pMergeScan, &pPk);
17,649,457✔
1531
    }
1532
    if (TSDB_CODE_SUCCESS == code && NULL != pPk) {
17,649,619✔
1533
      code = stbSplCreateMergeKeysByExpr(pPk, pMergeScan->scanSeq[0] > 0 ? ORDER_ASC : ORDER_DESC, &pMergeKeys);
687,061✔
1534
    }
1535
  }
1536

1537
  if (TSDB_CODE_SUCCESS == code) {
17,649,319✔
1538
    *pOutputMergeScan = (SLogicNode*)pMergeScan;
17,649,319✔
1539
    *pOutputMergeKeys = pMergeKeys;
17,649,319✔
1540
  } else {
1541
    nodesDestroyNode((SNode*)pMergeScan);
×
1542
    nodesDestroyList(pMergeKeys);
×
1543
  }
1544

1545
  return code;
17,649,319✔
1546
}
1547

1548
static int32_t stbSplSplitMergeScanNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SScanLogicNode* pScan,
17,648,767✔
1549
                                        bool groupSort, SStableSplitInfo* pInfo) {
1550
  SLogicNode* pMergeScan = NULL;
17,648,767✔
1551
  SNodeList*  pMergeKeys = NULL;
17,648,767✔
1552
  int32_t     code = stbSplCreateMergeScanNode(pScan, &pMergeScan, &pMergeKeys);
17,648,767✔
1553
  if (TSDB_CODE_SUCCESS == code) {
17,649,319✔
1554
    if (NULL != pMergeScan->pLimit) {
17,649,319✔
1555
      if (((SLimitNode*)pMergeScan->pLimit)->limit && ((SLimitNode*)pMergeScan->pLimit)->offset) {
1,465,463✔
1556
        ((SLimitNode*)pMergeScan->pLimit)->limit->datum.i += ((SLimitNode*)pMergeScan->pLimit)->offset->datum.i;
691,838✔
1557
      }
1558
      if (((SLimitNode*)pMergeScan->pLimit)->offset) {
1,465,463✔
1559
        ((SLimitNode*)pMergeScan->pLimit)->offset->datum.i = 0;
691,838✔
1560
      }
1561
    }
1562
    code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, pMergeScan, groupSort, true);
17,649,319✔
1563
  }
1564
  if (TSDB_CODE_SUCCESS == code) {
17,649,178✔
1565
    if ((void*)pInfo->pSplitNode == (void*)pScan) {
17,649,109✔
1566
      pInfo->pSplitNode = NULL;
7,587,230✔
1567
    }
1568
    nodesDestroyNode((SNode*)pScan);
17,649,109✔
1569
    code = nodesListMakeStrictAppend(&pSubplan->pChildren,
17,649,388✔
1570
                                     (SNode*)splCreateScanSubplan(pCxt, pMergeScan, SPLIT_FLAG_STABLE_SPLIT));
17,649,250✔
1571
  }
1572
  ++(pCxt->groupId);
17,649,388✔
1573
  return code;
17,649,388✔
1574
}
1575

1576
// Dispatches the split of a scan node:
//   - table-merge scans become a merge subplan (group sort enabled),
//   - scans with group tags use the "with partition tags" split,
//   - all other scans use the "without partition tags" split.
static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SScanLogicNode* pScan = (SScanLogicNode*)pInfo->pSplitNode;

  if (SCAN_TYPE_TABLE_MERGE == pScan->scanType) {
    pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
    return stbSplSplitMergeScanNode(pCxt, pInfo->pSubplan, pScan, true, pInfo);
  }

  return (NULL != pScan->pGroupTags) ? stbSplSplitScanNodeWithPartTags(pCxt, pInfo)
                                     : stbSplSplitScanNodeWithoutPartTags(pCxt, pInfo);
}
1587

1588
// Splits every child of a join node: scan children are split into merge scans (with group sort
// when the join is a group join), nested joins are processed recursively. Any other child type
// yields TSDB_CODE_PLAN_INTERNAL_ERROR. Processing stops at the first failure.
static int32_t stbSplSplitJoinNodeImpl(SSplitContext* pCxt, SLogicSubplan* pSubplan, SJoinLogicNode* pJoin, SStableSplitInfo* pInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  SNode*  pChild = NULL;
  FOREACH(pChild, pJoin->node.pChildren) {
    switch (nodeType(pChild)) {
      case QUERY_NODE_LOGIC_PLAN_SCAN:
        code = stbSplSplitMergeScanNode(pCxt, pSubplan, (SScanLogicNode*)pChild, pJoin->grpJoin ? true : false, pInfo);
        break;
      case QUERY_NODE_LOGIC_PLAN_JOIN:
        code = stbSplSplitJoinNodeImpl(pCxt, pSubplan, (SJoinLogicNode*)pChild, pInfo);
        break;
      default:
        code = TSDB_CODE_PLAN_INTERNAL_ERROR;
        break;
    }
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
  }
  return code;
}
1609

1610
// Splits the join node recorded in pInfo. On success the subplan becomes SUBPLAN_TYPE_MERGE and
// the join node is flagged splitDone; on failure nothing is changed here.
static int32_t stbSplSplitJoinNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  int32_t code = stbSplSplitJoinNodeImpl(pCxt, pInfo->pSubplan, (SJoinLogicNode*)pInfo->pSplitNode, pInfo);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  pInfo->pSplitNode->splitDone = true;
  return code;
}
1621

1622
// Creates merge keys for a partition node from the primary timestamp column of its first child,
// which is treated as a scan node.
//
// A copy of the timestamp column is appended to the partition node's targets and then used to
// build the merge keys (ORDER_ASC when scanSeq[0] > 0, ORDER_DESC otherwise). If no copy could
// be obtained, the current code is returned and *pMergeKeys is not touched.
static int32_t stbSplCreateMergeKeysForPartitionNode(SLogicNode* pPart, SNodeList** pMergeKeys) {
  SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pPart->pChildren, 0);
  SNode*          pScanTs = NULL;
  SNode*          pTsCopy = NULL;

  int32_t code = stbSplFindPrimaryKeyFromScan(pScan, &pScanTs);
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesCloneNode(pScanTs, &pTsCopy);
  }
  if (NULL == pTsCopy) {
    return code;
  }

  code = nodesListStrictAppend(pPart->pTargets, pTsCopy);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }
  return stbSplCreateMergeKeysByPrimaryKey(pTsCopy, pScan->scanSeq[0] > 0 ? ORDER_ASC : ORDER_DESC, pMergeKeys);
}
1639

1640
// Splits a partition node: a merge node is created for it and the partition node is moved into
// a new scan subplan. Merge keys are only built when the node requires data ordered at least
// within each group (requireDataOrder >= DATA_ORDER_LEVEL_IN_GROUP). The subplan is marked
// SUBPLAN_TYPE_MERGE and the group id is advanced whether or not the split succeeded.
static int32_t stbSplSplitPartitionNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode* pPart = pInfo->pSplitNode;
  SNodeList*  pMergeKeys = NULL;
  int32_t     code = TSDB_CODE_SUCCESS;

  if (pPart->requireDataOrder >= DATA_ORDER_LEVEL_IN_GROUP) {
    code = stbSplCreateMergeKeysForPartitionNode(pPart, &pMergeKeys);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pPart, pMergeKeys, pPart, true, true);
  }
  if (TSDB_CODE_SUCCESS == code) {
    SNode* pScanSubplan = (SNode*)splCreateScanSubplan(pCxt, pPart, SPLIT_FLAG_STABLE_SPLIT);
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, pScanSubplan);
  }

  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}
1657

1658
// Split rule for super-table plans.
//
// Does nothing for rSma queries or when splMatch() finds no split node. Otherwise the matched
// node is handed to the splitter for its node type. Afterwards, if the split node still exists
// and the plan is neither in a stream trigger clause nor in a stream calc clause, the node is
// flagged splitDone. pCxt->split is set whenever a node was matched, even if splitting failed.
static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  if (pCxt->pPlanCxt->rSmaQuery) {
    return TSDB_CODE_SUCCESS;
  }

  SStableSplitInfo info = {0};
  const bool matched = splMatch(pCxt, pSubplan, SPLIT_FLAG_STABLE_SPLIT, (FSplFindSplitNode)stbSplFindSplitNode, &info);
  if (!matched) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  switch (nodeType(info.pSplitNode)) {
    case QUERY_NODE_LOGIC_PLAN_SCAN:
      code = stbSplSplitScanNode(pCxt, &info);
      break;
    case QUERY_NODE_LOGIC_PLAN_JOIN:
      code = stbSplSplitJoinNode(pCxt, &info);
      break;
    case QUERY_NODE_LOGIC_PLAN_PARTITION:
      code = stbSplSplitPartitionNode(pCxt, &info);
      break;
    case QUERY_NODE_LOGIC_PLAN_AGG:
      code = stbSplSplitAggNode(pCxt, &info);
      break;
    case QUERY_NODE_LOGIC_PLAN_WINDOW:
      code = stbSplSplitWindowNode(pCxt, &info);
      break;
    case QUERY_NODE_LOGIC_PLAN_SORT:
      code = stbSplSplitSortNode(pCxt, &info);
      break;
    default:
      break;
  }

  // Some splitters destroy the split node and clear info.pSplitNode.
  if (NULL != info.pSplitNode) {
    if (!inStreamTriggerClause(pCxt->pPlanCxt) && !inStreamCalcClause(pCxt->pPlanCxt)) {
      info.pSplitNode->splitDone = true;
    }
  }
  pCxt->split = true;
  return code;
}
1698

1699
typedef struct SSigTbJoinSplitInfo {
1700
  SJoinLogicNode* pJoin;
1701
  SLogicNode*     pSplitNode;
1702
  SLogicSubplan*  pSubplan;
1703
} SSigTbJoinSplitInfo;
1704

1705
// Returns true when pNode is a join flagged isSingleTableJoin and neither of its first two
// children is an exchange node.
static bool sigTbJoinSplNeedSplit(SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_JOIN != nodeType(pNode)) {
    return false;
  }

  SJoinLogicNode* pJoin = (SJoinLogicNode*)pNode;
  if (!pJoin->isSingleTableJoin) {
    return false;
  }

  if (QUERY_NODE_LOGIC_PLAN_EXCHANGE == nodeType(nodesListGetNode(pJoin->node.pChildren, 0))) {
    return false;
  }
  return QUERY_NODE_LOGIC_PLAN_EXCHANGE != nodeType(nodesListGetNode(pJoin->node.pChildren, 1));
}
1717

1718
static bool sigTbJoinSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
1,877,300,354✔
1719
                                      SSigTbJoinSplitInfo* pInfo) {
1720
  if (sigTbJoinSplNeedSplit(pNode)) {
1,877,300,354✔
1721
    pInfo->pJoin = (SJoinLogicNode*)pNode;
15,534,672✔
1722
    pInfo->pSplitNode = (SLogicNode*)nodesListGetNode(pNode->pChildren, 1);
15,534,081✔
1723
    pInfo->pSubplan = pSubplan;
15,553,626✔
1724
    return true;
15,553,626✔
1725
  }
1726
  return false;
1,861,846,595✔
1727
}
1728

1729
// Split rule for single-table joins: the join's child at index 1 is replaced by an exchange node
// and moved into a new scan subplan. The parent subplan type is SUBPLAN_TYPE_SCAN when
// checkScanLogicNode() holds for the join's first child, SUBPLAN_TYPE_MERGE otherwise.
// Once a join was matched, the group id is advanced and pCxt->split is set even on failure.
static int32_t singleTableJoinSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SSigTbJoinSplitInfo info = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)sigTbJoinSplFindSplitNode, &info)) {
    return TSDB_CODE_SUCCESS;
  }

  SLogicNode* pFirstChild = (SLogicNode*)nodesListGetNode(info.pJoin->node.pChildren, 0);
  const bool  hasScan = checkScanLogicNode(pFirstChild);

  int32_t code = splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pSplitNode,
                                                 hasScan ? SUBPLAN_TYPE_SCAN : SUBPLAN_TYPE_MERGE, false);
  if (TSDB_CODE_SUCCESS == code) {
    SNode* pScanSubplan = (SNode*)splCreateScanSubplan(pCxt, info.pSplitNode, 0);
    code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, pScanSubplan);
  }

  ++(pCxt->groupId);
  pCxt->split = true;
  return code;
}
1743

1744
// Turns every child of pSplitNode into its own subplan underneath pUnionSubplan.
//
// The existing children of pUnionSubplan are detached first. For each child of pSplitNode a new
// subplan is created, appended to pUnionSubplan, removed from pSplitNode's child list and offered
// the detached subplans through splMountSubplan(). The group id is advanced once per child.
// Detached subplans still left in the list afterwards are appended back to pUnionSubplan, and
// pSplitNode's (now emptied) child list is destroyed.
//
// On failure the detached list is destroyed here: it is no longer reachable from pUnionSubplan,
// so nobody else could release it.
// NOTE(review): this assumes the list still exclusively owns the entries left in it after a
// failed splMountSubplan() call - confirm against splMountSubplan().
//
// Returns TSDB_CODE_SUCCESS, or the error code of the first failing step.
static int32_t unionSplitSubplan(SSplitContext* pCxt, SLogicSubplan* pUnionSubplan, SLogicNode* pSplitNode) {
  SNodeList* pSubplanChildren = pUnionSubplan->pChildren;
  pUnionSubplan->pChildren = NULL;

  int32_t code = TSDB_CODE_SUCCESS;

  SNode* pChild = NULL;
  FOREACH(pChild, pSplitNode->pChildren) {
    SLogicSubplan* pNewSubplan = NULL;
    code = splCreateSubplan(pCxt, (SLogicNode*)pChild, &pNewSubplan);
    if (TSDB_CODE_SUCCESS == code) {
      code = nodesListMakeStrictAppend(&pUnionSubplan->pChildren, (SNode*)pNewSubplan);
    }
    if (TSDB_CODE_SUCCESS == code) {
      // The child now belongs to the new subplan; clear its slot in pSplitNode's child list.
      REPLACE_NODE(NULL);
      code = splMountSubplan(pNewSubplan, pSubplanChildren);
    }
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
    ++(pCxt->groupId);
  }

  if (TSDB_CODE_SUCCESS != code) {
    // Release the detached subplans instead of leaking them.
    nodesDestroyList(pSubplanChildren);
    return code;
  }

  if (NULL != pSubplanChildren) {
    if (pSubplanChildren->length > 0) {
      code = nodesListMakeStrictAppendList(&pUnionSubplan->pChildren, pSubplanChildren);
    } else {
      nodesDestroyList(pSubplanChildren);
    }
  }
  NODES_DESTORY_LIST(pSplitNode->pChildren);
  return code;
}
1778

1779
typedef struct SUnionAllSplitInfo {
1780
  SProjectLogicNode* pProject;
1781
  SLogicSubplan*     pSubplan;
1782
} SUnionAllSplitInfo;
1783

1784
static bool unAllSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
1,853,730,436✔
1785
                                  SUnionAllSplitInfo* pInfo) {
1786
  if (QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) {
1,853,730,436✔
1787
    pInfo->pProject = (SProjectLogicNode*)pNode;
15,631,932✔
1788
    pInfo->pSubplan = pSubplan;
15,631,932✔
1789
    return true;
15,631,932✔
1790
  }
1791
  return false;
1,838,097,964✔
1792
}
1793

1794
static int32_t unAllSplCreateExchangeNode(SSplitContext* pCxt, int32_t startGroupId, SLogicSubplan* pSubplan,
15,629,849✔
1795
                                          SProjectLogicNode* pProject) {
1796
  SExchangeLogicNode* pExchange = NULL;
15,629,849✔
1797
  int32_t code = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE, (SNode**)&pExchange);
15,629,849✔
1798
  if (NULL == pExchange) {
15,632,410✔
1799
    return code;
×
1800
  }
1801
  pExchange->srcStartGroupId = startGroupId;
15,632,410✔
1802
  pExchange->srcEndGroupId = pCxt->groupId - 1;
15,632,410✔
1803
  pExchange->node.precision = pProject->node.precision;
15,632,410✔
1804
  pExchange->node.pTargets = NULL;
15,632,410✔
1805
  code = nodesCloneList(pProject->node.pTargets, &pExchange->node.pTargets);
15,632,410✔
1806
  if (TSDB_CODE_SUCCESS != code) {
15,632,747✔
1807
    nodesDestroyNode((SNode*)pExchange);
×
1808
    return code;
×
1809
  }
1810
  pExchange->node.pConditions = NULL;
15,632,747✔
1811
  code = nodesCloneNode(pProject->node.pConditions, &pExchange->node.pConditions);
15,632,747✔
1812
  if (TSDB_CODE_SUCCESS != code) {
15,632,945✔
1813
    nodesDestroyNode((SNode*)pExchange);
2,168✔
1814
    return code;
×
1815
  }
1816
  TSWAP(pExchange->node.pLimit, pProject->node.pLimit);
15,630,777✔
1817

1818
  pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
15,630,777✔
1819

1820
  if (NULL == pProject->node.pParent) {
15,630,777✔
1821
    pSubplan->pNode = (SLogicNode*)pExchange;
9,424,415✔
1822
    nodesDestroyNode((SNode*)pProject);
9,424,415✔
1823
    return TSDB_CODE_SUCCESS;
9,425,720✔
1824
  }
1825

1826
  SNode* pNode;
1827
  FOREACH(pNode, pProject->node.pParent->pChildren) {
6,206,362✔
1828
    if (nodesEqualNode(pNode, (SNode*)pProject)) {
6,206,984✔
1829
      REPLACE_NODE(pExchange);
6,206,424✔
1830
      nodesDestroyNode(pNode);
6,206,424✔
1831
      return TSDB_CODE_SUCCESS;
6,207,320✔
1832
    }
1833
  }
1834
  nodesDestroyNode((SNode*)pExchange);
×
1835
  return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1836
}
1837

1838
// Split rule for UNION ALL: every child of the matched project node becomes its own subplan and
// the project node is replaced by an exchange node covering the group ids allocated during the
// split. pCxt->split is set once a node was matched, even if splitting failed.
static int32_t unionAllSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SUnionAllSplitInfo info = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)unAllSplFindSplitNode, &info)) {
    return TSDB_CODE_SUCCESS;
  }

  // Remember the first group id handed out to the new subplans.
  const int32_t firstGroupId = pCxt->groupId;

  int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pProject);
  if (TSDB_CODE_SUCCESS == code) {
    code = unAllSplCreateExchangeNode(pCxt, firstGroupId, info.pSubplan, info.pProject);
  }

  pCxt->split = true;
  return code;
}
1852

1853
typedef struct SUnionDistinctSplitInfo {
1854
  SAggLogicNode* pAgg;
1855
  SLogicSubplan* pSubplan;
1856
} SUnionDistinctSplitInfo;
1857

1858
static int32_t unDistSplCreateExchangeNode(SSplitContext* pCxt, int32_t startGroupId, SLogicSubplan* pSubplan,
12,696,351✔
1859
                                           SAggLogicNode* pAgg) {
1860
  SExchangeLogicNode* pExchange = NULL;
12,696,351✔
1861
  int32_t code = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE, (SNode**)&pExchange);
12,696,351✔
1862
  if (NULL == pExchange) {
12,698,435✔
1863
    return code;
×
1864
  }
1865
  pExchange->srcStartGroupId = startGroupId;
12,698,435✔
1866
  pExchange->srcEndGroupId = pCxt->groupId - 1;
12,698,435✔
1867
  pExchange->node.precision = pAgg->node.precision;
12,698,435✔
1868
  pExchange->node.pTargets = NULL;
12,698,435✔
1869
  code = nodesCloneList(pAgg->pGroupKeys, &pExchange->node.pTargets);
12,698,435✔
1870
  if (NULL == pExchange->node.pTargets) {
12,698,871✔
1871
    nodesDestroyNode((SNode*)pExchange);
×
1872
    return code;
×
1873
  }
1874

1875
  pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
12,698,871✔
1876

1877
  return nodesListMakeStrictAppend(&pAgg->node.pChildren, (SNode*)pExchange);
12,698,871✔
1878
}
1879

1880
static bool unDistSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
1,866,946,821✔
1881
                                   SUnionDistinctSplitInfo* pInfo) {
1882
  if (QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) {
1,866,946,821✔
1883
    pInfo->pAgg = (SAggLogicNode*)pNode;
12,697,464✔
1884
    if (!pInfo->pAgg->pGroupKeys) return false;
12,697,464✔
1885
    pInfo->pSubplan = pSubplan;
12,697,464✔
1886
    return true;
12,697,464✔
1887
  }
1888
  return false;
1,854,249,247✔
1889
}
1890

1891
// Split rule for UNION (distinct): every child of the matched aggregate node becomes its own
// subplan and an exchange node covering the group ids allocated during the split is added as the
// aggregate's child. pCxt->split is set once a node was matched, even if splitting failed.
static int32_t unionDistinctSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SUnionDistinctSplitInfo info = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)unDistSplFindSplitNode, &info)) {
    return TSDB_CODE_SUCCESS;
  }

  // Remember the first group id handed out to the new subplans.
  const int32_t firstGroupId = pCxt->groupId;

  int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pAgg);
  if (TSDB_CODE_SUCCESS == code) {
    code = unDistSplCreateExchangeNode(pCxt, firstGroupId, info.pSubplan, info.pAgg);
  }

  pCxt->split = true;
  return code;
}
1905

1906
typedef struct SSmaIndexSplitInfo {
1907
  SMergeLogicNode* pMerge;
1908
  SLogicSubplan*   pSubplan;
1909
} SSmaIndexSplitInfo;
1910

1911
static bool smaIdxSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
1,948,018,123✔
1912
                                   SSmaIndexSplitInfo* pInfo) {
1913
  if (QUERY_NODE_LOGIC_PLAN_MERGE == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) {
1,948,018,123✔
1914
    if (((SMergeLogicNode*)pNode)->node.dynamicOp) {
1,587,114✔
1915
      return false;
210,042✔
1916
    }
1917
    int32_t nodeType = nodeType(nodesListGetNode(pNode->pChildren, 0));
1,377,072✔
1918
    if (nodeType == QUERY_NODE_LOGIC_PLAN_EXCHANGE || nodeType == QUERY_NODE_LOGIC_PLAN_MERGE) {
1,377,072✔
1919
      pInfo->pMerge = (SMergeLogicNode*)pNode;
385✔
1920
      pInfo->pSubplan = pSubplan;
×
1921
      return true;
×
1922
    }
1923
  }
1924
  return false;
1,947,808,078✔
1925
}
1926

1927
// Split rule for the merge node matched by smaIdxSplFindSplitNode(): every child of the merge
// node becomes its own subplan. On success the merge node's srcGroupId is set to the current
// group id. Once a node was matched, the group id is advanced and pCxt->split is set even on
// failure.
static int32_t smaIndexSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SSmaIndexSplitInfo info = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)smaIdxSplFindSplitNode, &info)) {
    return TSDB_CODE_SUCCESS;
  }

  const int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pMerge);
  if (TSDB_CODE_SUCCESS == code) {
    info.pMerge->srcGroupId = pCxt->groupId;
  }

  ++(pCxt->groupId);
  pCxt->split = true;
  return code;
}
1941

1942
typedef struct SInsertSelectSplitInfo {
1943
  SLogicNode*    pQueryRoot;
1944
  SLogicSubplan* pSubplan;
1945
} SInsertSelectSplitInfo;
1946

1947
static bool insSelSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
1,945,401,390✔
1948
                                   SInsertSelectSplitInfo* pInfo) {
1949
  if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY == nodeType(pNode) && 1 == LIST_LENGTH(pNode->pChildren) &&
1,945,401,390✔
1950
      MODIFY_TABLE_TYPE_INSERT == ((SVnodeModifyLogicNode*)pNode)->modifyType) {
537,034✔
1951
    pInfo->pQueryRoot = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
537,034✔
1952
    pInfo->pSubplan = pSubplan;
537,034✔
1953
    return true;
537,034✔
1954
  }
1955
  return false;
1,944,864,693✔
1956
}
1957

1958
// Split rule for INSERT ... SELECT: the query below the insert node is replaced by an exchange
// node and moved into a new subplan, which is appended to the insert subplan's children and then
// offered the child list captured at entry through splMountSubplan().
// Once a node was matched, SPLIT_FLAG_INSERT_SPLIT is set on the subplan, the group id is
// advanced and pCxt->split is set, even on failure.
static int32_t insertSelectSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SInsertSelectSplitInfo info = {0};
  if (!splMatch(pCxt, pSubplan, SPLIT_FLAG_INSERT_SPLIT, (FSplFindSplitNode)insSelSplFindSplitNode, &info)) {
    return TSDB_CODE_SUCCESS;
  }

  SNodeList*     pPrevChildren = info.pSubplan->pChildren;
  SLogicSubplan* pQuerySubplan = NULL;

  int32_t code = splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pQueryRoot, SUBPLAN_TYPE_MODIFY, false);
  if (TSDB_CODE_SUCCESS == code) {
    code = splCreateSubplan(pCxt, info.pQueryRoot, &pQuerySubplan);
    if (TSDB_CODE_SUCCESS == code) {
      code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)pQuerySubplan);
      if (TSDB_CODE_SUCCESS == code) {
        code = splMountSubplan(pQuerySubplan, pPrevChildren);
      }
    }
  }

  SPLIT_FLAG_SET_MASK(info.pSubplan->splitFlag, SPLIT_FLAG_INSERT_SPLIT);
  ++(pCxt->groupId);
  pCxt->split = true;
  return code;
}
1982

1983
typedef struct SVirtualTableSplitInfo {
1984
  SVirtualScanLogicNode *pVirtual;
1985
  SLogicSubplan          *pSubplan;
1986
} SVirtualTableSplitInfo;
1987

1988
static bool virtualTableFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
1,927,119,596✔
1989
                                      SVirtualTableSplitInfo* pInfo) {
1990
  if (QUERY_NODE_LOGIC_PLAN_VIRTUAL_TABLE_SCAN == nodeType(pNode) && 0 != LIST_LENGTH(pNode->pChildren) &&
1,927,119,596✔
1991
      QUERY_NODE_LOGIC_PLAN_EXCHANGE != nodeType(nodesListGetNode(pNode->pChildren, 0))) {
10,038,164✔
1992
    pInfo->pVirtual = (SVirtualScanLogicNode*)pNode;
4,649,987✔
1993
    pInfo->pSubplan = pSubplan;
4,649,987✔
1994
    return true;
4,649,987✔
1995
  }
1996
  return false;
1,922,468,800✔
1997
}
1998

1999
// Returns true when the virtual scan's parent is a dynamic query control node.
static bool needProcessOneBlockEachTime(SVirtualScanLogicNode* pVirtual) {
  SLogicNode* pParent = pVirtual->node.pParent;
  return NULL != pParent && QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL == nodeType(pParent);
}
2005

2006
/*
 * Split every child of a matched virtual table scan node into its own scan subplan.
 *
 * For each child an exchange node is created and takes the child's slot (REPLACE_NODE); the child itself is
 * passed to splCreateScanSubplan() and the resulting subplan is appended to the matched subplan's children.
 *
 * @param pCxt Split context; groupId is advanced once per child that was split off.
 * @param pSubplan Subplan to search for a virtual table scan node.
 *
 * @return TSDB_CODE_SUCCESS when nothing matched or every child was split, otherwise error code.
 */
static int32_t virtualTableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  int32_t                code = TSDB_CODE_SUCCESS;
  SVirtualTableSplitInfo info = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)virtualTableFindSplitNode, &info)) {
    return TSDB_CODE_SUCCESS;
  }
  SNode* pChild = NULL;
  FOREACH(pChild, info.pVirtual->node.pChildren) {
    SExchangeLogicNode* pExchange = NULL;
    PLAN_ERR_JRET(splCreateExchangeNode(pCxt, (SLogicNode*)pChild, &pExchange));

    pExchange->dynTbname =
        nodeType((SLogicNode*)pChild) == QUERY_NODE_LOGIC_PLAN_SCAN ? ((SScanLogicNode*)pChild)->phTbnameScan : false;
    pExchange->seqRecvData = (info.pVirtual->tableType == TSDB_SUPER_TABLE);

    pExchange->node.stmtRoot = ((SLogicNode*)pChild)->stmtRoot;
    REPLACE_NODE(pExchange);
    pExchange->node.pParent = ((SLogicNode*)pChild)->pParent;

    SLogicSubplan* sub = splCreateScanSubplan(pCxt, (SLogicNode*)pChild, 0);
    if (NULL == sub) {
      // The subplan is dereferenced below, so a failed creation must be reported instead of crashing.
      code = (TSDB_CODE_SUCCESS != terrno) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      goto _return;
    }
    sub->processOneBlock = needProcessOneBlockEachTime(info.pVirtual);
    PLAN_ERR_JRET(nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)sub));
    ++(pCxt->groupId);
  }
  info.pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
_return:
  // Reached on success and on failure alike; the split flag is set in both cases, as before.
  pCxt->split = true;
  return code;
}
2034

2035
typedef struct SMergeAggColsSplitInfo {
2036
  SAggLogicNode   *pAgg;
2037
  SLogicNode      *pSplitNode;
2038
  SLogicSubplan   *pSubplan;
2039
} SMergeAggColsSplitInfo;
2040

2041
typedef struct SMergeTableScanSplitInfo {
2042
  SLogicNode    *pMerge;    // Dynamic merge node selected for split.
2043
  SLogicSubplan *pSubplan;  // Subplan that owns pMerge.
2044
} SMergeTableScanSplitInfo;
2045

2046
/*
2047
 * Find dynamic merge node that contains table-merge scan children.
2048
 *
2049
 * @param pCxt Split context.
2050
 * @param pSubplan Subplan currently visited.
2051
 * @param pNode Candidate logic node.
2052
 * @param pInfo Output split-node info.
2053
 *
2054
 * @return true when a split candidate is found, otherwise false.
2055
 */
2056
static bool mergeTableScanFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
1,963,666,269✔
2057
                                        SMergeTableScanSplitInfo* pInfo) {
2058
  if (QUERY_NODE_LOGIC_PLAN_MERGE != nodeType(pNode)) {
1,963,666,269✔
2059
    return false;
1,890,151,519✔
2060
  }
2061
  if (!pNode->dynamicOp) {
73,516,471✔
2062
    return false;
73,315,803✔
2063
  }
2064

2065
  SNode* pChild = NULL;
253,339✔
2066
  FOREACH(pChild, pNode->pChildren) {
926,851✔
2067
    if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) &&
743,526✔
2068
        ((SScanLogicNode*)pChild)->scanType == SCAN_TYPE_TABLE_MERGE &&
70,014✔
2069
        ((SLogicNode*)pChild)->dynamicOp) {
70,014✔
2070
      pInfo->pSubplan = pSubplan;
70,014✔
2071
      pInfo->pMerge = pNode;
70,014✔
2072
      return true;
70,014✔
2073
    }
2074
  }
2075
  return false;
183,325✔
2076
}
2077

2078
/*
2079
 * Split dynamic table-merge scan children into independent scan subplans.
2080
 *
2081
 * @param pCxt Split context.
2082
 * @param pSubplan Subplan to split.
2083
 *
2084
 * @return TSDB_CODE_SUCCESS when completed or no split needed, otherwise error code.
2085
 */
2086
static int32_t mergeTableScanSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
515,544,003✔
2087
  int32_t                   code = TSDB_CODE_SUCCESS;
515,544,003✔
2088
  SMergeTableScanSplitInfo  info = {0};
515,544,003✔
2089
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)mergeTableScanFindSplitNode, &info)) {
515,545,716✔
2090
    return TSDB_CODE_SUCCESS;
515,578,220✔
2091
  }
2092
  SMergeLogicNode* pMerge = (SMergeLogicNode*)info.pMerge;
72,033✔
2093
  // set group id range for merge node
2094
  pMerge->srcGroupId = pCxt->groupId;
72,033✔
2095

2096
  SNode*  pChild = NULL;
72,033✔
2097
  FOREACH(pChild, info.pMerge->pChildren) {
408,789✔
2098
    if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) &&
336,756✔
2099
        ((SScanLogicNode*)pChild)->scanType == SCAN_TYPE_TABLE_MERGE &&
336,756✔
2100
        ((SLogicNode*)pChild)->dynamicOp) {
336,756✔
2101
      PLAN_ERR_RET(splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, (SLogicNode*)pChild, SUBPLAN_TYPE_SCAN, false));
336,756✔
2102
      PLAN_ERR_RET(nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)splCreateScanSubplan(pCxt, (SLogicNode*)pChild, 0)));
336,756✔
2103
      ++(pCxt->groupId);
336,756✔
2104
    }
2105
  }
2106

2107
  pMerge->srcEndGroupId = pCxt->groupId - 1;
72,033✔
2108

2109
  pCxt->split = true;
72,033✔
2110
  return code;
72,033✔
2111
}
2112

2113
/*
 * Check whether a node is a single-child aggregate over a scan, placed directly under a column-merging merge node.
 *
 * @param pNode Candidate logic node.
 *
 * @return true when every condition holds, otherwise false.
 */
static bool mergeAggColsNeedSplit(SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren)) {
    return false;
  }
  if (NULL == pNode->pParent || QUERY_NODE_LOGIC_PLAN_MERGE != nodeType(pNode->pParent)) {
    return false;
  }
  if (!((SMergeLogicNode*)pNode->pParent)->colsMerge) {
    return false;
  }
  return QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(nodesListGetNode(pNode->pChildren, 0));
}
2123

2124
static bool mergeAggColsFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
1,964,370,700✔
2125
                                      SMergeAggColsSplitInfo* pInfo) {
2126
  if (mergeAggColsNeedSplit(pNode)) {
1,964,370,700✔
2127
    pInfo->pAgg = (SAggLogicNode *)pNode;
70,116✔
2128
    pInfo->pSplitNode = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
71,924✔
2129
    pInfo->pSubplan = pSubplan;
72,300✔
2130
    return true;
72,300✔
2131
  }
2132
  return false;
1,964,386,430✔
2133
}
2134

2135
/*
 * Split the scan below a matched aggregate node into its own scan subplan.
 *
 * @param pCxt Split context; groupId is advanced and split is set when a split happened.
 * @param pSubplan Subplan to split.
 *
 * @return TSDB_CODE_SUCCESS when completed or no split needed, otherwise error code.
 */
static int32_t mergeAggColsSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  int32_t                code = TSDB_CODE_SUCCESS;
  SMergeAggColsSplitInfo info = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)mergeAggColsFindSplitNode, &info)) {
    return TSDB_CODE_SUCCESS;
  }

  SLogicSubplan* pOwner = info.pSubplan;
  SLogicNode*    pScan = info.pSplitNode;

  PLAN_ERR_RET(splCreateExchangeNodeForSubplan(pCxt, pOwner, pScan, SUBPLAN_TYPE_MERGE, false));
  PLAN_ERR_RET(nodesListMakeStrictAppend(&pOwner->pChildren, (SNode*)splCreateScanSubplan(pCxt, pScan, 0)));

  pCxt->groupId += 1;
  pCxt->split = true;
  return code;
}
2149

2150
typedef struct SMergeExtWinSplitInfo {
2151
  SLogicNode      *pSplitNode;
2152
  SLogicSubplan   *pSubplan;
2153
} SMergeExtWinSplitInfo;
2154

2155
/*
 * Decide whether a scan node below a window node must be split off.
 *
 * The scan qualifies when its parent is a window node that itself has a parent, and one of these holds:
 *  - external window under a merge node (right part for a virtual normal/child table; it computes the final
 *    result and depends on the time range from the left part);
 *  - external window under a dynamic query control node of type DYN_QTYPE_VTB_WINDOW (virtual super table);
 *  - non-external window under a dynamic query control node (left part; it computes the window range).
 *
 * @param pNode Candidate logic node.
 *
 * @return true when the scan must be split, otherwise false.
 */
static bool mergeExtWinNeedSplit(SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode)) {
    return false;
  }

  SLogicNode* pWin = pNode->pParent;
  if (NULL == pWin || NULL == pWin->pParent || QUERY_NODE_LOGIC_PLAN_WINDOW != nodeType(pWin)) {
    return false;
  }

  SLogicNode* pOwner = pWin->pParent;
  bool        isExternal = (WINDOW_TYPE_EXTERNAL == ((SWindowLogicNode*)pWin)->winType);
  bool        underDynCtrl = (QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL == nodeType(pOwner));

  if (!isExternal) {
    // left part
    return underDynCtrl;
  }
  if (QUERY_NODE_LOGIC_PLAN_MERGE == nodeType(pOwner)) {
    // right part, virtual normal/child table
    return true;
  }
  // right part, virtual super table
  return underDynCtrl && DYN_QTYPE_VTB_WINDOW == ((SDynQueryCtrlLogicNode*)pOwner)->qType;
}
2181

2182
static bool mergeExtWinFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
1,965,551,988✔
2183
                                     SMergeExtWinSplitInfo* pInfo) {
2184
  if (mergeExtWinNeedSplit(pNode)) {
1,965,551,988✔
2185
    pInfo->pSplitNode = pNode;
1,440,005✔
2186
    pInfo->pSubplan = pSubplan;
1,439,510✔
2187
    return true;
1,439,510✔
2188
  }
2189
  return false;
1,964,204,035✔
2190
}
2191

2192
/*
 * Split a matched scan below a window node into its own scan subplan.
 *
 * The exchange is created with the owning subplan's current subplan type.
 *
 * @param pCxt Split context; groupId is advanced and split is set when a split happened.
 * @param pSubplan Subplan to split.
 *
 * @return TSDB_CODE_SUCCESS when completed or no split needed, otherwise error code.
 */
static int32_t mergeExtWinSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  int32_t               code = TSDB_CODE_SUCCESS;
  SMergeExtWinSplitInfo info = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)mergeExtWinFindSplitNode, &info)) {
    return TSDB_CODE_SUCCESS;
  }

  SLogicSubplan* pOwner = info.pSubplan;
  SLogicNode*    pScan = info.pSplitNode;

  PLAN_ERR_RET(splCreateExchangeNodeForSubplan(pCxt, pOwner, pScan, pOwner->subplanType, false));
  PLAN_ERR_RET(nodesListMakeStrictAppend(&pOwner->pChildren, (SNode*)splCreateScanSubplan(pCxt, pScan, 0)));

  pCxt->groupId += 1;
  pCxt->split = true;
  return code;
}
2206

2207
typedef struct SQnodeSplitInfo {
2208
  SLogicNode*    pSplitNode;
2209
  SLogicSubplan* pSubplan;
2210
} SQnodeSplitInfo;
2211

2212
static bool qndSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
2,647,087✔
2213
                                SQnodeSplitInfo* pInfo) {
2214
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode) && NULL != pNode->pParent &&
2,647,087✔
2215
      QUERY_NODE_LOGIC_PLAN_ANALYSIS_FUNC != nodeType(pNode->pParent) &&
1,018,934✔
2216
      QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC != nodeType(pNode->pParent) && ((SScanLogicNode*)pNode)->scanSeq[0] <= 1 &&
1,018,934✔
2217
      ((SScanLogicNode*)pNode)->scanSeq[1] <= 1) {
1,018,934✔
2218
    pInfo->pSplitNode = pNode;
1,018,934✔
2219
    pInfo->pSubplan = pSubplan;
1,018,934✔
2220
    return true;
1,018,934✔
2221
  }
2222
  return false;
1,628,153✔
2223
}
2224

2225
/*
 * Split a matched scan node into its own scan subplan when the query policy is QUERY_POLICY_QNODE.
 *
 * The owning subplan becomes SUBPLAN_TYPE_COMPUTE. Its vgroup list, when present, determines numOfComputeNodes
 * and is swapped into the new scan subplan; otherwise numOfComputeNodes is 1.
 *
 * @param pCxt Split context; groupId is advanced and split is set once a node matched.
 * @param pSubplan Subplan to split.
 *
 * @return TSDB_CODE_SUCCESS when completed or no split needed, otherwise error code.
 */
static int32_t qnodeSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  if (QUERY_POLICY_QNODE != tsQueryPolicy) {
    return TSDB_CODE_SUCCESS;
  }

  SQnodeSplitInfo info = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)qndSplFindSplitNode, &info)) {
    return TSDB_CODE_SUCCESS;
  }

  SLogicSubplan* pOwner = info.pSubplan;
  SLogicSubplan* pScanSubplan = NULL;

  ((SScanLogicNode*)info.pSplitNode)->dataRequired = FUNC_DATA_REQUIRED_DATA_LOAD;
  int32_t code = splCreateExchangeNodeForSubplan(pCxt, pOwner, info.pSplitNode, pOwner->subplanType, false);
  if (TSDB_CODE_SUCCESS == code) {
    pScanSubplan = splCreateScanSubplan(pCxt, info.pSplitNode, 0);
    if (NULL == pScanSubplan) {
      code = terrno;
    }
  }

  if (NULL != pScanSubplan) {
    if (NULL == pOwner->pVgroupList) {
      pOwner->numOfComputeNodes = 1;
    } else {
      pOwner->numOfComputeNodes = pOwner->pVgroupList->numOfVgroups;
      TSWAP(pScanSubplan->pVgroupList, pOwner->pVgroupList);
    }
    code = nodesListMakeStrictAppend(&pOwner->pChildren, (SNode*)pScanSubplan);
  }

  // The bookkeeping below runs whether or not the steps above succeeded.
  pOwner->subplanType = SUBPLAN_TYPE_COMPUTE;
  pCxt->groupId += 1;
  pCxt->split = true;
  return code;
}
2255

2256
typedef struct SDynVirtualScanSplitInfo {
2257
  SScanLogicNode         *pDyn;
2258
  SLogicSubplan          *pSubplan;
2259
} SDynVirtualScanSplitInfo;
2260

2261
static bool dynVirtualScanFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
1,942,135,025✔
2262
                                        SDynVirtualScanSplitInfo* pInfo) {
2263
  if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode) || NULL == pNode->pParent) {
1,942,135,025✔
2264
    return false;
1,508,879,426✔
2265
  }
2266

2267
  SLogicNode*   pParent = pNode->pParent;
433,259,509✔
2268
  EScanType     scanType = ((SScanLogicNode*)pNode)->scanType;
433,259,458✔
2269

2270
  // 1. split for system table scan under dynamic query control node(virtual stable scan)
2271
  if (QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL == nodeType(pParent) &&
433,259,161✔
2272
      (((SDynQueryCtrlLogicNode*)(pParent))->qType == DYN_QTYPE_VTB_SCAN ||
4,637,142✔
2273
       ((SDynQueryCtrlLogicNode*)(pParent))->qType == DYN_QTYPE_VTB_WINDOW ||
2,591,974✔
2274
       ((SDynQueryCtrlLogicNode*)(pParent))->qType == DYN_QTYPE_VTB_TS_SCAN) &&
4,637,142✔
2275
      scanType == SCAN_TYPE_SYSTEM_TABLE) {
2276
    pInfo->pDyn = (SScanLogicNode*)pNode;
2,499,265✔
2277
    pInfo->pSubplan = pSubplan;
2,499,265✔
2278
    return true;
2,499,265✔
2279
  }
2280

2281
  // 2. split for system table scan under dynamic query control node(virtual stable agg)
2282
  if (QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL == nodeType(pParent) &&
430,759,871✔
2283
      (((SDynQueryCtrlLogicNode*)(pParent))->qType == DYN_QTYPE_VTB_AGG ||
2,137,877✔
2284
       ((SDynQueryCtrlLogicNode*)(pParent))->qType == DYN_QTYPE_VTB_INTERVAL) &&
2,137,877✔
2285
      scanType == SCAN_TYPE_SYSTEM_TABLE) {
2286
    pInfo->pDyn = (SScanLogicNode*)pNode;
2,009,613✔
2287
    pInfo->pSubplan = pSubplan;
2,009,613✔
2288
    return true;
2,009,613✔
2289
  }
2290

2291
  // 3. split for tag scan under partition node under dynamic query control node(virtual stable agg)
2292
  if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pParent) && NULL != pParent->pParent &&
428,748,761✔
2293
      (((SDynQueryCtrlLogicNode*)(pParent->pParent))->qType == DYN_QTYPE_VTB_AGG ||
15,901,744✔
2294
       ((SDynQueryCtrlLogicNode*)(pParent->pParent))->qType == DYN_QTYPE_VTB_INTERVAL) &&
15,901,744✔
2295
      scanType == SCAN_TYPE_TAG) {
2296
    pInfo->pDyn = (SScanLogicNode*)pNode;
1,606,554✔
2297
    pInfo->pSubplan = pSubplan;
1,606,554✔
2298
    return true;
1,606,554✔
2299
  }
2300

2301
  // 4. split for tag scan under dynamic query control node(virtual stable agg)
2302
  if (QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL == nodeType(pParent) &&
427,142,119✔
2303
      (((SDynQueryCtrlLogicNode*)(pParent))->qType == DYN_QTYPE_VTB_AGG ||
128,264✔
2304
       ((SDynQueryCtrlLogicNode*)(pParent))->qType == DYN_QTYPE_VTB_INTERVAL) &&
128,264✔
2305
      scanType == SCAN_TYPE_TAG) {
2306
    pInfo->pDyn = (SScanLogicNode*)pNode;
128,264✔
2307
    pInfo->pSubplan = pSubplan;
128,264✔
2308
    return true;
128,264✔
2309
  }
2310

2311
  return false;
427,013,891✔
2312
}
2313

2314
/*
 * Split a matched system-table/tag scan into its own scan subplan and mark the owner as a merge subplan.
 *
 * @param pCxt Split context; groupId is advanced and split is set when a split happened.
 * @param pSubplan Subplan to split.
 *
 * @return TSDB_CODE_SUCCESS when completed or no split needed, otherwise error code.
 */
static int32_t dynVirtualScanSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  SDynVirtualScanSplitInfo info = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)dynVirtualScanFindSplitNode, &info)) {
    return TSDB_CODE_SUCCESS;
  }

  SLogicSubplan* pOwner = info.pSubplan;
  SLogicNode*    pScan = (SLogicNode*)info.pDyn;

  // The exchange is created with the owner's subplan type as it is before the switch to MERGE below.
  PLAN_ERR_RET(splCreateExchangeNodeForSubplan(pCxt, pOwner, pScan, pOwner->subplanType, false));
  PLAN_ERR_RET(nodesListMakeStrictAppend(&pOwner->pChildren, (SNode*)splCreateScanSubplan(pCxt, pScan, 0)));

  pOwner->subplanType = SUBPLAN_TYPE_MERGE;
  pCxt->groupId += 1;
  pCxt->split = true;
  return code;
}
2330

2331
typedef struct SVstbAggSplitInfo {
2332
  SLogicNode             *pAgg;
2333
  SLogicNode             *pDyn;
2334
  SLogicSubplan          *pSubplan;
2335
  bool                    needPartAgg;
2336
} SVstbAggSplitInfo;
2337

2338
static bool vstbAggFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
1,968,432,899✔
2339
                                 SVstbAggSplitInfo* pInfo) {
2340
  if (QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) && NULL != pNode->pParent && LIST_LENGTH(pNode->pChildren) == 1) {
1,968,432,899✔
2341
    if (QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL == nodeType(pNode->pParent) &&
80,416,687✔
2342
        (((SDynQueryCtrlLogicNode*)(pNode->pParent))->qType == DYN_QTYPE_VTB_AGG ||
4,260,642✔
2343
         ((SDynQueryCtrlLogicNode*)(pNode->pParent))->qType == DYN_QTYPE_VTB_INTERVAL) &&
×
2344
        QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(nodesListGetNode(pNode->pChildren, 0))) {
4,258,517✔
2345
      if (((SDynQueryCtrlLogicNode*)(pNode->pParent))->vtbScan.batchProcessChild) {
1,966,316✔
2346
        pInfo->pAgg = (SLogicNode *)pNode;
1,262,912✔
2347
        pInfo->pSubplan = pSubplan;
1,262,912✔
2348
        pInfo->needPartAgg = true;
1,262,912✔
2349
      } else {
2350
        pInfo->pAgg = (SLogicNode *)pNode;
703,404✔
2351
        pInfo->pDyn = pNode->pParent;
703,404✔
2352
        pInfo->pSubplan = pSubplan;
703,404✔
2353
        pInfo->needPartAgg = false;
703,404✔
2354
      }
2355
      return true;
1,966,316✔
2356
    }
2357
  }
2358
  return false;
1,966,504,653✔
2359

2360
}
2361

2362
/*
 * Split a matched virtual-stable aggregate into a scan subplan and mark the owner as a merge subplan.
 *
 * When needPartAgg is set, a partial aggregate node is created with stbSplCreatePartAggNode() and becomes the
 * root of the new scan subplan; otherwise the matched aggregate node itself is moved into the scan subplan.
 *
 * @param pCxt Split context; groupId is advanced and split is set when a split happened.
 * @param pSubplan Subplan to split.
 *
 * @return TSDB_CODE_SUCCESS when completed or no split needed, otherwise error code.
 */
static int32_t vstbAggSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  int32_t           code = TSDB_CODE_SUCCESS;
  int32_t           lino = 0;
  SVstbAggSplitInfo info = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)vstbAggFindSplitNode, &info)) {
    return TSDB_CODE_SUCCESS;
  }

  // Root node of the scan subplan that is created below.
  SLogicNode* pScanRoot = NULL;

  if (!info.needPartAgg) {
    PLAN_ERR_JRET(splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pAgg, info.pSubplan->subplanType, false));
    pScanRoot = info.pAgg;
  } else {
    PLAN_ERR_JRET(stbSplCreatePartAggNode((SAggLogicNode*)info.pAgg, &pScanRoot));
    PLAN_ERR_JRET(stbSplCreateExchangeNode(pCxt, info.pAgg, pScanRoot));
  }
  PLAN_ERR_JRET(
      nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)splCreateScanSubplan(pCxt, pScanRoot, 0)));

  info.pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  pCxt->groupId += 1;
  pCxt->split = true;
  return code;

_return:
  planError("%s failed, code: %d, line: %d", __func__, code, lino);
  return code;
}
2391

2392
typedef struct SVstbIntervalSplitInfo {
2393
  SLogicNode*    pWindow;   // Interval-window node selected for split.
2394
  SLogicSubplan* pSubplan;  // Subplan that owns pWindow.
2395
} SVstbIntervalSplitInfo;
2396

2397
/*
2398
 * Find virtual-stable interval window that can be split for batch processing.
2399
 *
2400
 * @param pCxt Split context.
2401
 * @param pSubplan Subplan currently visited.
2402
 * @param pNode Candidate logic node.
2403
 * @param pInfo Output split-node info.
2404
 *
2405
 * @return true when a split candidate is found, otherwise false.
2406
 */
2407
static bool vstbIntervalFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
1,970,799,195✔
2408
                                      SVstbIntervalSplitInfo* pInfo) {
2409
  (void)pCxt;
2410
  if (QUERY_NODE_LOGIC_PLAN_WINDOW != nodeType(pNode) || LIST_LENGTH(pNode->pChildren) != 1 ||
1,970,799,195✔
2411
      WINDOW_TYPE_INTERVAL != ((SWindowLogicNode*)pNode)->winType || NULL == pNode->pParent ||
42,215,611✔
2412
      QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL != nodeType(pNode->pParent) ||
4,212,412✔
2413
      DYN_QTYPE_VTB_INTERVAL != ((SDynQueryCtrlLogicNode*)pNode->pParent)->qType ||
43,297✔
2414
      !((SDynQueryCtrlLogicNode*)pNode->pParent)->vtbScan.batchProcessChild ||
43,297✔
2415
      QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0))) {
43,297✔
2416
    return false;
1,970,758,718✔
2417
  }
2418

2419
  pInfo->pWindow = pNode;
43,537✔
2420
  pInfo->pSubplan = pSubplan;
43,537✔
2421
  return true;
43,537✔
2422
}
2423

2424
/*
2425
 * Split a virtual-stable interval window subplan into per-group scan subplans.
2426
 *
2427
 * @param pCxt Split context.
2428
 * @param pSubplan Subplan to split.
2429
 *
2430
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
2431
 */
2432
static int32_t vstbIntervalSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
515,526,619✔
2433
  SVstbIntervalSplitInfo info = {0};
515,526,619✔
2434
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)vstbIntervalFindSplitNode, &info)) {
515,527,281✔
2435
    return TSDB_CODE_SUCCESS;
515,600,255✔
2436
  }
2437

2438
  SStableSplitInfo splitInfo = {
45,012✔
2439
    .pSplitNode = info.pWindow,
45,012✔
2440
    .pSubplan = info.pSubplan,
45,012✔
2441
  };
2442

2443
  int32_t code = stbSplSplitIntervalForBatch(pCxt, &splitInfo);
45,012✔
2444
  if (TSDB_CODE_SUCCESS == code) {
43,297✔
2445
    info.pWindow->splitDone = true;
43,297✔
2446
    pCxt->split = true;
43,297✔
2447
  }
2448
  return code;
43,297✔
2449
}
2450

2451
typedef struct SStreamScanSplitInfo {
2452
  SLogicNode             *pSplitNode;
2453
  SLogicSubplan          *pSubplan;
2454
} SStreamScanSplitInfo;
2455

2456
static bool streamScanFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
4,141,804✔
2457
                                           SStreamScanSplitInfo* pInfo) {
2458
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode) && NULL != pNode->pParent) {
4,141,804✔
2459
    pInfo->pSplitNode = (SLogicNode *)pNode;
629,190✔
2460
    pInfo->pSubplan = pSubplan;
629,190✔
2461
    return true;
629,190✔
2462
  }
2463
  return false;
3,512,614✔
2464
}
2465

2466
/*
 * Split every matching scan node into its own scan subplan while planning a stream calc/trigger clause.
 *
 * Does nothing unless the plan context is in a stream calc clause or a stream trigger clause. Matching is
 * repeated until splMatch() finds no further scan; each handled scan node is flagged splitDone.
 *
 * @param pCxt Split context; groupId is advanced once per scan that was split off.
 * @param pSubplan Subplan to split.
 *
 * @return TSDB_CODE_SUCCESS when completed or no split needed, otherwise error code.
 */
static int32_t streamScanSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  int32_t              code = TSDB_CODE_SUCCESS;
  SStreamScanSplitInfo info = {0};
  if (!inStreamCalcClause(pCxt->pPlanCxt) && !inStreamTriggerClause(pCxt->pPlanCxt)) {
    return TSDB_CODE_SUCCESS;
  }
  while (splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)streamScanFindSplitNode, &info)) {
    PLAN_ERR_RET(
        splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pSplitNode, info.pSubplan->subplanType, false));

    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, info.pSplitNode, 0);
    if (NULL == pScanSubplan) {
      // Always fail here: continuing without a scan subplan would record a split that never happened.
      PLAN_ERR_RET(TSDB_CODE_SUCCESS != terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY);
    }

    if (NULL != info.pSubplan->pVgroupList) {
      info.pSubplan->numOfComputeNodes = info.pSubplan->pVgroupList->numOfVgroups;
    } else {
      info.pSubplan->numOfComputeNodes = 1;
    }
    if (!pScanSubplan->pVgroupList) {
      code = cloneVgroups(&pScanSubplan->pVgroupList, info.pSubplan->pVgroupList);
      if (TSDB_CODE_SUCCESS != code) {
        // The scan subplan is not attached to anything yet, so it must be released before bailing out.
        nodesDestroyNode((SNode*)pScanSubplan);
        PLAN_ERR_RET(code);
      }
    }
    pScanSubplan->dynTbname = ((SScanLogicNode*)info.pSplitNode)->phTbnameScan;
    PLAN_ERR_RET(nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)pScanSubplan));

    info.pSubplan->subplanType = SUBPLAN_TYPE_COMPUTE;
    ++(pCxt->groupId);
    info.pSplitNode->splitDone = true;
    pCxt->split = true;
  }

  return code;
}
2497

2498
// clang-format off
2499
static const SSplitRule splitRuleSet[] = {
2500
  {.pName = "SuperTableSplit",        .splitFunc = stableSplit},
2501
  {.pName = "SingleTableJoinSplit",   .splitFunc = singleTableJoinSplit},
2502
  {.pName = "UnionAllSplit",          .splitFunc = unionAllSplit},
2503
  {.pName = "UnionDistinctSplit",     .splitFunc = unionDistinctSplit},
2504
  {.pName = "SmaIndexSplit",          .splitFunc = smaIndexSplit}, // not used yet
2505
  {.pName = "InsertSelectSplit",      .splitFunc = insertSelectSplit},
2506
  {.pName = "VirtualtableSplit",      .splitFunc = virtualTableSplit},
2507
  {.pName = "MergeTableScanSplit",    .splitFunc = mergeTableScanSplit},
2508
  {.pName = "MergeAggColsSplit",      .splitFunc = mergeAggColsSplit},
2509
  {.pName = "DynVirtualScanSplit",    .splitFunc = dynVirtualScanSplit},
2510
  {.pName = "VStbIntervalSplit",      .splitFunc = vstbIntervalSplit},
2511
  {.pName = "MergeExtWinSplit",       .splitFunc = mergeExtWinSplit},
2512
  {.pName = "VStbAggSplit",           .splitFunc = vstbAggSplit},
2513
};
2514
// clang-format on
2515

2516
// Number of entries in splitRuleSet.
static const int32_t splitRuleNum = (int32_t)(sizeof(splitRuleSet) / sizeof(splitRuleSet[0]));
2517

2518
/*
 * Log the subplan as a string when tsQueryPlannerTrace is enabled.
 *
 * @param pRuleName Name of the rule that was just applied, or NULL to log the plan before any split.
 * @param pSubplan Subplan to dump.
 *
 * @return TSDB_CODE_SUCCESS when tracing is off or the dump succeeded, otherwise the error code returned by
 *         nodesNodeToString().
 */
static int32_t dumpLogicSubplan(const char* pRuleName, SLogicSubplan* pSubplan) {
  if (!tsQueryPlannerTrace) {
    return TSDB_CODE_SUCCESS;
  }

  char*   pPlanStr = NULL;
  int32_t code = nodesNodeToString((SNode*)pSubplan, false, &pPlanStr, NULL);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  if (NULL != pRuleName) {
    qDebugL("apply split %s rule, JsonPlan: %s", pRuleName, pPlanStr);
  } else {
    qDebugL("before split, JsonPlan: %s", pPlanStr);
  }
  taosMemoryFree(pPlanStr);
  return code;
}
2535

2536
/*
 * Apply the split rules to a subplan until a full pass over splitRuleSet produces no split.
 *
 * After the rule loop settles, streamScanSplit() and qnodeSplit() are each run once. The plan context's
 * groupId is updated from the split context only when every step succeeded.
 *
 * @param pCxt Plan context.
 * @param pSubplan Subplan to split.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise the first error code encountered.
 */
static int32_t applySplitRule(SPlanContext* pCxt, SLogicSubplan* pSubplan) {
  SSplitContext cxt = {0};
  cxt.pPlanCxt = pCxt;
  cxt.queryId = pSubplan->id.queryId;
  cxt.groupId = pCxt->groupId + 1;
  cxt.split = false;

  bool    anyRuleFired = false;
  int32_t code = TSDB_CODE_SUCCESS;
  PLAN_ERR_RET(dumpLogicSubplan(NULL, pSubplan));

  do {
    anyRuleFired = false;
    for (int32_t ruleIdx = 0; ruleIdx < splitRuleNum; ++ruleIdx) {
      const SSplitRule* pRule = &splitRuleSet[ruleIdx];
      cxt.split = false;
      PLAN_ERR_RET(pRule->splitFunc(&cxt, pSubplan));
      if (cxt.split) {
        anyRuleFired = true;
        PLAN_ERR_RET(dumpLogicSubplan(pRule->pName, pSubplan));
      }
    }
  } while (anyRuleFired);

  PLAN_ERR_RET(streamScanSplit(&cxt, pSubplan));
  PLAN_ERR_RET(qnodeSplit(&cxt, pSubplan));

  pCxt->groupId = cxt.groupId + 1;

  PLAN_RET(code);
}
2561

2562
/*
 * Walk the logic tree depth-first and swap the vgroup list of each scan node reached with the subplan's list.
 *
 * The walk does not descend below scan nodes or virtual table scan nodes.
 *
 * @param pNode Root of the logic (sub)tree to walk.
 * @param pSubplan Subplan whose pVgroupList is swapped with the scan node's list.
 */
static void setVgroupsInfo(SLogicNode* pNode, SLogicSubplan* pSubplan) {
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
    TSWAP(((SScanLogicNode*)pNode)->pVgroupList, pSubplan->pVgroupList);
    return;
  }
  if (QUERY_NODE_LOGIC_PLAN_VIRTUAL_TABLE_SCAN == nodeType(pNode)) {
    // do nothing, since virtual table scan node is SUBPLAN_TYPE_MERGE
    return;
  }

  SNode* pSub;
  FOREACH(pSub, pNode->pChildren) {
    setVgroupsInfo((SLogicNode*)pSub, pSubplan);
  }
}
2574

2575
/*
 * Decide whether the split rules should run on a subplan.
 *
 * @param pLogicSubplan Subplan to inspect.
 *
 * @return true when the root is not a vnode-modify node, or when it is an insert modify node that has a
 *         children list; otherwise false.
 */
static bool needSplitSubplan(SLogicSubplan* pLogicSubplan) {
  SLogicNode* pRoot = pLogicSubplan->pNode;
  if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY == nodeType(pRoot)) {
    SVnodeModifyLogicNode* pModify = (SVnodeModifyLogicNode*)pRoot;
    return (MODIFY_TABLE_TYPE_INSERT == pModify->modifyType && NULL != pModify->node.pChildren);
  }
  return true;
}
2582

2583
/*
 * Entry point of the splitter: apply the split rules, or only set vgroup info when no split is needed.
 *
 * @param pCxt Plan context.
 * @param pLogicSubplan Logic subplan to split.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise error code from applySplitRule().
 */
int32_t splitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) {
  if (needSplitSubplan(pLogicSubplan)) {
    return applySplitRule(pCxt, pLogicSubplan);
  }

  setVgroupsInfo(pLogicSubplan->pNode, pLogicSubplan);
  return TSDB_CODE_SUCCESS;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc