• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4720

08 Sep 2025 08:43AM UTC coverage: 58.139% (-0.6%) from 58.762%
#4720

push

travis-ci

web-flow
Merge pull request #32881 from taosdata/enh/add-new-windows-ci

fix(ci): update workflow reference to use new Windows CI YAML

133181 of 292179 branches covered (45.58%)

Branch coverage included in aggregate %.

201691 of 283811 relevant lines covered (71.07%)

5442780.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.74
/source/libs/planner/src/planSpliter.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "functionMgt.h"
17
#include "planInt.h"
18
#include "tglobal.h"
19

20
#define SPLIT_FLAG_MASK(n) (1 << n)
21

22
#define SPLIT_FLAG_STABLE_SPLIT SPLIT_FLAG_MASK(0)
23
#define SPLIT_FLAG_INSERT_SPLIT SPLIT_FLAG_MASK(1)
24

25
#define SPLIT_FLAG_SET_MASK(val, mask)  (val) |= (mask)
26
#define SPLIT_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0)
27

28
typedef struct SSplitContext {
29
  SPlanContext* pPlanCxt;
30
  uint64_t      queryId;
31
  int32_t       groupId;
32
  bool          split;
33
} SSplitContext;
34

35
typedef int32_t (*FSplit)(SSplitContext* pCxt, SLogicSubplan* pSubplan);
36

37
typedef struct SSplitRule {
38
  char*  pName;
39
  FSplit splitFunc;
40
} SSplitRule;
41

42
typedef struct SFindSplitNodeCtx {
43
  const SSplitContext* pSplitCtx;
44
  const SLogicSubplan* pSubplan;
45
} SFindSplitNodeCtx;
46

47
typedef bool (*FSplFindSplitNode)(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode, void* pInfo);
48

49
static int32_t cloneVgroups(SVgroupsInfo **pDst, SVgroupsInfo* pSrc) {
222✔
50
  if (pSrc == NULL) {
222!
51
    *pDst = NULL;
×
52
    return TSDB_CODE_SUCCESS;
×
53
  }
54
  int32_t len = VGROUPS_INFO_SIZE(pSrc);
222!
55
  *pDst = taosMemoryMalloc(len);
222!
56
  if (NULL == *pDst) {
222!
57
    return terrno;
×
58
  }
59
  memcpy(*pDst, pSrc, len);
222✔
60
  return TSDB_CODE_SUCCESS;
222✔
61
}
62

63
static int32_t stbSplCreateMergeKeys(SNodeList* pSortKeys, SNodeList* pTargets, SNodeList** pOutput);
64

65
static void splSetSubplanVgroups(SLogicSubplan* pSubplan, SLogicNode* pNode) {
22,562✔
66
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
22,562✔
67
    TSWAP(pSubplan->pVgroupList, ((SScanLogicNode*)pNode)->pVgroupList);
17,109✔
68
  } else if (QUERY_NODE_LOGIC_PLAN_VIRTUAL_TABLE_SCAN == nodeType(pNode)) {
5,453✔
69
    // do nothing, since virtual table scan node is SUBPLAN_TYPE_MERGE
70
  } else if (QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL == nodeType(pNode) && ((SDynQueryCtrlLogicNode *)pNode)->qType == DYN_QTYPE_VTB_SCAN) {
4,769!
71
    TSWAP(pSubplan->pVgroupList, ((SDynQueryCtrlLogicNode*)pNode)->vtbScan.pVgroupList);
×
72
  } else {
73
    if (1 == LIST_LENGTH(pNode->pChildren)) {
4,769!
74
      splSetSubplanVgroups(pSubplan, (SLogicNode*)nodesListGetNode(pNode->pChildren, 0));
4,769✔
75
    }
76
  }
77
}
22,562✔
78

79
static SLogicSubplan* splCreateScanSubplan(SSplitContext* pCxt, SLogicNode* pNode, int32_t flag) {
17,793✔
80
  SLogicSubplan* pSubplan = NULL;
17,793✔
81
  terrno = nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN, (SNode**)&pSubplan);
17,793✔
82
  if (NULL == pSubplan) {
17,793!
83
    return NULL;
×
84
  }
85
  pSubplan->id.queryId = pCxt->queryId;
17,793✔
86
  pSubplan->id.groupId = pCxt->groupId;
17,793✔
87
  // TODO(smj):refact here.
88
  pSubplan->subplanType = nodeType(pNode) == QUERY_NODE_LOGIC_PLAN_VIRTUAL_TABLE_SCAN ? SUBPLAN_TYPE_MERGE : SUBPLAN_TYPE_SCAN;
17,793✔
89
  pSubplan->pNode = pNode;
17,793✔
90
  pSubplan->pNode->pParent = NULL;
17,793✔
91
  splSetSubplanVgroups(pSubplan, pNode);
17,793✔
92
  SPLIT_FLAG_SET_MASK(pSubplan->splitFlag, flag);
17,793✔
93
  return pSubplan;
17,793✔
94
}
95

96
static bool splHasScan(SLogicNode* pNode) {
1,585✔
97
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
1,585!
98
    return true;
×
99
  }
100

101
  SNode* pChild = NULL;
1,585✔
102
  FOREACH(pChild, pNode->pChildren) {
1,585✔
103
    if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild)) {
509✔
104
      return true;
142✔
105
    }
106
    return splHasScan((SLogicNode*)pChild);
367✔
107
  }
108

109
  return false;
1,076✔
110
}
111

112
static void splSetSubplanType(SLogicSubplan* pSubplan) {
1,218✔
113
  pSubplan->subplanType = splHasScan(pSubplan->pNode) ? SUBPLAN_TYPE_SCAN : SUBPLAN_TYPE_MERGE;
1,218✔
114
}
1,218✔
115

116
static int32_t splCreateSubplan(SSplitContext* pCxt, SLogicNode* pNode, SLogicSubplan** ppSubplan) {
163✔
117
  SLogicSubplan* pSubplan = NULL;
163✔
118
  int32_t code = nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN, (SNode**)&pSubplan);
163✔
119
  if (NULL == pSubplan) {
163!
120
    return code;
×
121
  }
122
  pSubplan->id.queryId = pCxt->queryId;
163✔
123
  pSubplan->id.groupId = pCxt->groupId;
163✔
124
  pSubplan->pNode = pNode;
163✔
125
  pNode->pParent = NULL;
163✔
126
  splSetSubplanType(pSubplan);
163✔
127
  *ppSubplan = pSubplan;
163✔
128
  return code;
163✔
129
}
130

131
static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pChild, SExchangeLogicNode** pOutput) {
15,811✔
132
  SExchangeLogicNode* pExchange = NULL;
15,811✔
133
  int32_t code = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE, (SNode**)&pExchange);
15,811✔
134
  if (NULL == pExchange) {
15,812!
135
    return code;
×
136
  }
137

138
  pExchange->srcStartGroupId = pCxt->groupId;
15,812✔
139
  pExchange->srcEndGroupId = pCxt->groupId;
15,812✔
140
  pExchange->node.precision = pChild->precision;
15,812✔
141
  pExchange->node.dynamicOp = pChild->dynamicOp;
15,812✔
142
  pExchange->node.pTargets = NULL;
15,812✔
143
  code = nodesCloneList(pChild->pTargets, &pExchange->node.pTargets);
15,812✔
144
  if (NULL == pExchange->node.pTargets) {
15,812!
145
    nodesDestroyNode((SNode*)pExchange);
×
146
    return code;
×
147
  }
148
  if (NULL != pChild->pLimit) {
15,812✔
149
    pExchange->node.pLimit = NULL; 
392✔
150
    code = nodesCloneNode(pChild->pLimit, &pExchange->node.pLimit);
392✔
151
    if (NULL == pExchange->node.pLimit) {
392!
152
      nodesDestroyNode((SNode*)pExchange);
×
153
      return code;
×
154
    }
155
    if (((SLimitNode*)pChild->pLimit)->limit && ((SLimitNode*)pChild->pLimit)->offset) {
392!
156
      ((SLimitNode*)pChild->pLimit)->limit->datum.i += ((SLimitNode*)pChild->pLimit)->offset->datum.i;
3✔
157
    }
158
    if (((SLimitNode*)pChild->pLimit)->offset) {
392✔
159
      ((SLimitNode*)pChild->pLimit)->offset->datum.i = 0;
3✔
160
    }
161
  }
162

163
  *pOutput = pExchange;
15,812✔
164
  return TSDB_CODE_SUCCESS;
15,812✔
165
}
166

167
static int32_t splCreateExchangeNodeForSubplan(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode,
14,436✔
168
                                               ESubplanType subplanType, bool seqScan) {
169
  SExchangeLogicNode* pExchange = NULL;
14,436✔
170
  int32_t             code = splCreateExchangeNode(pCxt, pSplitNode, &pExchange);
14,436✔
171
  if (TSDB_CODE_SUCCESS == code) {
14,436!
172
    if (nodeType(pSplitNode) == QUERY_NODE_LOGIC_PLAN_SCAN) {
14,436✔
173
      pExchange->dynTbname = ((SScanLogicNode*)pSplitNode)->phTbnameScan;
11,312✔
174
    } else {
175
      pExchange->dynTbname = false;
3,124✔
176
    }
177
    pExchange->seqRecvData = seqScan;
14,436✔
178
    code = replaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pExchange);
14,436✔
179
  }
180
  if (TSDB_CODE_SUCCESS == code) {
14,436!
181
    pSubplan->subplanType = subplanType;
14,436✔
182
  } else {
183
    nodesDestroyNode((SNode*)pExchange);
×
184
  }
185
  return code;
14,436✔
186
}
187

188
static bool splIsChildSubplan(SLogicNode* pLogicNode, int32_t groupId) {
72✔
189
  if (QUERY_NODE_LOGIC_PLAN_EXCHANGE == nodeType(pLogicNode)) {
72✔
190
    return groupId >= ((SExchangeLogicNode*)pLogicNode)->srcStartGroupId &&
42!
191
           groupId <= ((SExchangeLogicNode*)pLogicNode)->srcEndGroupId;
21✔
192
  }
193

194
  if (QUERY_NODE_LOGIC_PLAN_MERGE == nodeType(pLogicNode)) {
51✔
195
    return ((SMergeLogicNode*)pLogicNode)->srcGroupId <= groupId &&
18!
196
           ((SMergeLogicNode*)pLogicNode)->srcEndGroupId >= groupId;
9✔
197
  }
198

199
  SNode* pChild;
200
  FOREACH(pChild, pLogicNode->pChildren) {
54✔
201
    bool isChild = splIsChildSubplan((SLogicNode*)pChild, groupId);
39✔
202
    if (isChild) {
39✔
203
      return isChild;
27✔
204
    }
205
  }
206
  return false;
15✔
207
}
208

209
static int32_t splMountSubplan(SLogicSubplan* pParent, SNodeList* pChildren) {
163✔
210
  SNode* pChild = NULL;
163✔
211
  WHERE_EACH(pChild, pChildren) {
196✔
212
    if (splIsChildSubplan(pParent->pNode, ((SLogicSubplan*)pChild)->id.groupId)) {
33✔
213
      int32_t code = nodesListMakeAppend(&pParent->pChildren, pChild);
24✔
214
      if (TSDB_CODE_SUCCESS == code) {
24!
215
        REPLACE_NODE(NULL);
24✔
216
        ERASE_NODE(pChildren);
24✔
217
        continue;
24✔
218
      } else {
219
        return code;
×
220
      }
221
    }
222
    WHERE_NEXT;
9✔
223
  }
224
  return TSDB_CODE_SUCCESS;
163✔
225
}
226

227
static bool splMatchByNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode, FSplFindSplitNode func,
1,258,334✔
228
                           void* pInfo) {
229
  if (!pNode->splitDone && func(pCxt, pSubplan, pNode, pInfo)) {
1,258,334✔
230
    return true;
10,992✔
231
  }
232
  SNode* pChild;
233
  FOREACH(pChild, pNode->pChildren) {
1,975,299✔
234
    if (splMatchByNode(pCxt, pSubplan, (SLogicNode*)pChild, func, pInfo)) {
743,930✔
235
      return true;
16,056✔
236
    }
237
  }
238
  return false;
1,231,369✔
239
}
240

241
static bool splMatch(SSplitContext* pCxt, SLogicSubplan* pSubplan, int32_t flag, FSplFindSplitNode func, void* pInfo) {
519,990✔
242
  if (!SPLIT_FLAG_TEST_MASK(pSubplan->splitFlag, flag)) {
519,990✔
243
    if (splMatchByNode(pCxt, pSubplan, pSubplan->pNode, func, pInfo)) {
514,863✔
244
      return true;
10,992✔
245
    }
246
  }
247
  SNode* pChild;
248
  FOREACH(pChild, pSubplan->pChildren) {
754,828✔
249
    if (splMatch(pCxt, (SLogicSubplan*)pChild, flag, func, pInfo)) {
246,682✔
250
      return true;
918✔
251
    }
252
  }
253
  return false;
508,146✔
254
}
255

256
static void splSetParent(SLogicNode* pNode) {
3,306✔
257
  SNode* pChild = NULL;
3,306✔
258
  FOREACH(pChild, pNode->pChildren) { ((SLogicNode*)pChild)->pParent = pNode; }
5,605✔
259
}
3,306✔
260

261
typedef struct SStableSplitInfo {
262
  SLogicNode*    pSplitNode;
263
  SLogicSubplan* pSubplan;
264
} SStableSplitInfo;
265

266
static bool stbSplHasGatherExecFunc(const SNodeList* pFuncs) {
15,701✔
267
  SNode* pFunc = NULL;
15,701✔
268
  FOREACH(pFunc, pFuncs) {
71,489✔
269
    if (!fmIsWindowPseudoColumnFunc(((SFunctionNode*)pFunc)->funcId) &&
56,461✔
270
        !fmIsDistExecFunc(((SFunctionNode*)pFunc)->funcId)) {
54,579✔
271
      return true;
659✔
272
    }
273
  }
274
  return false;
15,028✔
275
}
276

277
static bool stbSplIsMultiTbScan(SScanLogicNode* pScan) {
55,529✔
278
  return ((NULL != pScan->pVgroupList && pScan->pVgroupList->numOfVgroups > 1) || pScan->needSplit) &&
55,529!
279
         pScan->placeholderType != SP_PARTITION_TBNAME &&
5,135✔
280
         pScan->placeholderType != SP_PARTITION_ROWS &&
5,128!
281
         !pScan->phTbnameScan && !pScan->virtualStableScan;
111,058!
282
}
283

284
static bool stbSplHasMultiTbScan(SLogicNode* pNode) {
20,278✔
285
  if (1 != LIST_LENGTH(pNode->pChildren)) {
20,278!
286
    return false;
27✔
287
  }
288
  SNode* pChild = nodesListGetNode(pNode->pChildren, 0);
20,251✔
289
  if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pChild)) {
20,259✔
290
    if (1 != LIST_LENGTH(((SLogicNode*)pChild)->pChildren)) {
173!
291
      return false;
×
292
    }
293
    pChild = nodesListGetNode(((SLogicNode*)pChild)->pChildren, 0);
173✔
294
  }
295
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && stbSplIsMultiTbScan((SScanLogicNode*)pChild)) {
20,259✔
296
    return true;
3,073✔
297
  }
298

299
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild)) {
17,188✔
300
    if (QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) || (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode) &&
9,338✔
301
                                                         ((SWindowLogicNode*)pNode)->winType == WINDOW_TYPE_INTERVAL)) {
72✔
302
      return ((SScanLogicNode*)pChild)->needSplit;
9,196✔
303
    }
304
  }
305
  return false;
7,992✔
306
}
307

308
static bool stbSplIsMultiTbScanChild(SLogicNode* pNode) {
265✔
309
  if (1 != LIST_LENGTH(pNode->pChildren)) {
265!
310
    return false;
×
311
  }
312
  SNode* pChild = nodesListGetNode(pNode->pChildren, 0);
265✔
313
  return (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && stbSplIsMultiTbScan((SScanLogicNode*)pChild));
265✔
314
}
315

316
static bool stbSplNeedSplitWindow(SLogicNode* pNode) {
1,553✔
317
  SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode;
1,553✔
318
  if (WINDOW_TYPE_INTERVAL == pWindow->winType) {
1,553✔
319
    return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(pNode);
1,175!
320
  }
321

322
  if (WINDOW_TYPE_EXTERNAL == pWindow->winType) {
378!
323
    return pWindow->pFuncs && !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(pNode);
×
324
  }
325

326
  if (WINDOW_TYPE_SESSION == pWindow->winType) {
378✔
327
    return stbSplHasMultiTbScan(pNode);
114✔
328
  }
329

330
  if (WINDOW_TYPE_STATE == pWindow->winType || WINDOW_TYPE_COUNT == pWindow->winType) {
264✔
331
    return stbSplHasMultiTbScan(pNode);
155✔
332
  }
333

334
  return false;
109✔
335
}
336

337
static bool stbSplNeedSplitJoin(SJoinLogicNode* pJoin) {
6,201✔
338
  if (pJoin->isSingleTableJoin || JOIN_ALGO_HASH == pJoin->joinAlgo) {
6,201!
339
    return false;
5,360✔
340
  }
341
  SNode* pChild = NULL;
841✔
342
  FOREACH(pChild, pJoin->node.pChildren) {
861!
343
    if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pChild) && QUERY_NODE_LOGIC_PLAN_JOIN != nodeType(pChild)) {
851!
344
      return false;
831✔
345
    }
346
  }
347
  return true;
10✔
348
}
349

350
static bool stbSplIsTableCountQuery(SLogicNode* pNode) {
2,084✔
351
  if (1 != LIST_LENGTH(pNode->pChildren)) {
2,084!
352
    return false;
×
353
  }
354
  SNode* pChild = nodesListGetNode(pNode->pChildren, 0);
2,084✔
355
  if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pChild)) {
2,084!
356
    if (1 != LIST_LENGTH(((SLogicNode*)pChild)->pChildren)) {
×
357
      return false;
×
358
    }
359
    pChild = nodesListGetNode(((SLogicNode*)pChild)->pChildren, 0);
×
360
  }
361
  return QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && SCAN_TYPE_TABLE_COUNT == ((SScanLogicNode*)pChild)->scanType;
2,084!
362
}
363

364
static bool stbSplNeedSplit(SFindSplitNodeCtx* pCtx, SLogicNode* pNode) {
116,200✔
365
  switch (nodeType(pNode)) {
116,200✔
366
    case QUERY_NODE_LOGIC_PLAN_SCAN:
43,084✔
367
      return stbSplIsMultiTbScan((SScanLogicNode*)pNode);
43,084✔
368
    case QUERY_NODE_LOGIC_PLAN_JOIN:
6,201✔
369
      return stbSplNeedSplitJoin((SJoinLogicNode*)pNode);
6,201✔
370
    case QUERY_NODE_LOGIC_PLAN_PARTITION:
265✔
371
      return stbSplIsMultiTbScanChild(pNode);
265✔
372
    case QUERY_NODE_LOGIC_PLAN_AGG:
14,524✔
373
      return (!stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) ||
15,183✔
374
              isPartTableAgg((SAggLogicNode*)pNode)) &&
31,135✔
375
             (stbSplHasMultiTbScan(pNode) && !stbSplIsTableCountQuery(pNode));
16,113!
376
    case QUERY_NODE_LOGIC_PLAN_WINDOW:
1,553✔
377
      return stbSplNeedSplitWindow(pNode);
1,553✔
378
    case QUERY_NODE_LOGIC_PLAN_SORT:
4,811✔
379
      return stbSplHasMultiTbScan(pNode);
4,811✔
380

381
    default:
45,762✔
382
      break;
45,762✔
383
  }
384
  return false;
45,762✔
385
}
386

387
static bool stbSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
116,203✔
388
                                SStableSplitInfo* pInfo) {
389
  SFindSplitNodeCtx ctx = {.pSplitCtx = pCxt, .pSubplan = pSubplan};
116,203✔
390
  if (stbSplNeedSplit(&ctx, pNode)) {
116,203✔
391
    pInfo->pSplitNode = pNode;
5,138✔
392
    pInfo->pSubplan = pSubplan;
5,138✔
393
    return true;
5,138✔
394
  }
395
  return false;
111,049✔
396
}
397

398
static int32_t stbSplRewriteFuns(const SNodeList* pFuncs, SNodeList** pPartialFuncs, SNodeList** pMidFuncs, SNodeList** pMergeFuncs) {
2,245✔
399
  SNode* pNode = NULL;
2,245✔
400
  FOREACH(pNode, pFuncs) {
18,120✔
401
    SFunctionNode* pFunc = (SFunctionNode*)pNode;
15,874✔
402
    SFunctionNode* pPartFunc = NULL;
15,874✔
403
    SFunctionNode* pMidFunc = NULL;
15,874✔
404
    SFunctionNode* pMergeFunc = NULL;
15,874✔
405
    int32_t        code = TSDB_CODE_SUCCESS;
15,874✔
406
    if (fmIsWindowPseudoColumnFunc(pFunc->funcId) || fmIsPlaceHolderFunc(pFunc->funcId)) {
15,874!
407
      code = nodesCloneNode(pNode, (SNode**)&pPartFunc);
1,428✔
408
      if (TSDB_CODE_SUCCESS == code) {
1,428!
409
        code = nodesCloneNode(pNode, (SNode**)&pMergeFunc);
1,428✔
410
      }
411
      if(TSDB_CODE_SUCCESS == code && pMidFuncs != NULL){
1,428!
412
        code = nodesCloneNode(pNode, (SNode**)&pMidFunc);
×
413
        if (NULL == pMidFunc) {
×
414
          nodesDestroyNode((SNode*)pMidFunc);
×
415
        }
416
      }
417
    } else {
418
      code = fmGetDistMethod(pFunc, &pPartFunc, &pMidFunc, &pMergeFunc);
14,445✔
419
    }
420
    if (TSDB_CODE_SUCCESS == code) {
15,873!
421
      code = nodesListMakeStrictAppend(pPartialFuncs, (SNode*)pPartFunc);
15,873✔
422
    }
423
    if (TSDB_CODE_SUCCESS == code) {
15,875!
424
      if(pMidFuncs != NULL){
15,875!
425
        code = nodesListMakeStrictAppend(pMidFuncs, (SNode*)pMidFunc);
×
426
      }else{
427
        nodesDestroyNode((SNode*)pMidFunc);
15,875✔
428
      }
429
    }
430
    if (TSDB_CODE_SUCCESS == code) {
15,875!
431
      code = nodesListMakeStrictAppend(pMergeFuncs, (SNode*)pMergeFunc);
15,875✔
432
    }
433
    if (TSDB_CODE_SUCCESS != code) {
15,875!
434
      nodesDestroyNode((SNode*)pPartFunc);
×
435
      nodesDestroyNode((SNode*)pMidFunc);
×
436
      nodesDestroyNode((SNode*)pMergeFunc);
×
437
      return code;
×
438
    }
439
  }
440
  return TSDB_CODE_SUCCESS;
2,246✔
441
}
442

443
static int32_t stbSplAppendWStart(SNodeList* pFuncs, int32_t* pIndex, uint8_t precision) {
878✔
444
  int32_t index = 0;
878✔
445
  SNode*  pFunc = NULL;
878✔
446
  FOREACH(pFunc, pFuncs) {
1,732!
447
    if (FUNCTION_TYPE_WSTART == ((SFunctionNode*)pFunc)->funcType) {
1,364✔
448
      *pIndex = index;
510✔
449
      return TSDB_CODE_SUCCESS;
510✔
450
    }
451
    ++index;
854✔
452
  }
453

454
  SFunctionNode* pWStart = NULL;
368✔
455
  int32_t code = nodesMakeNode(QUERY_NODE_FUNCTION, (SNode**)&pWStart);
368✔
456
  if (NULL == pWStart) {
368!
457
    return code;
×
458
  }
459
  tstrncpy(pWStart->functionName, "_wstart", TSDB_FUNC_NAME_LEN);
368✔
460
  int64_t pointer = (int64_t)pWStart;
368✔
461
  char name[TSDB_COL_NAME_LEN + TSDB_POINTER_PRINT_BYTES + TSDB_NAME_DELIMITER_LEN + 1] = {0};
368✔
462
  int32_t len = tsnprintf(name, sizeof(name) - 1, "%s.%" PRId64, pWStart->functionName, pointer);
368✔
463
  (void)taosHashBinary(name, len);
464
  tstrncpy(pWStart->node.aliasName, name, TSDB_COL_NAME_LEN);
368✔
465
  pWStart->node.resType.precision = precision;
368✔
466

467
  code = fmGetFuncInfo(pWStart, NULL, 0);
368✔
468
  if (TSDB_CODE_SUCCESS == code) {
368!
469
    code = nodesListStrictAppend(pFuncs, (SNode*)pWStart);
368✔
470
  }
471
  *pIndex = index;
368✔
472
  return code;
368✔
473
}
474

475
static int32_t stbSplAppendWEnd(SWindowLogicNode* pWin, int32_t* pIndex) {
×
476
  int32_t index = 0;
×
477
  SNode*  pFunc = NULL;
×
478
  FOREACH(pFunc, pWin->pFuncs) {
×
479
    if (FUNCTION_TYPE_WEND == ((SFunctionNode*)pFunc)->funcType) {
×
480
      *pIndex = index;
×
481
      return TSDB_CODE_SUCCESS;
×
482
    }
483
    ++index;
×
484
  }
485

486
  SFunctionNode* pWEnd = NULL;
×
487
  int32_t code = nodesMakeNode(QUERY_NODE_FUNCTION, (SNode**)&pWEnd);
×
488
  if (NULL == pWEnd) {
×
489
    return code;
×
490
  }
491
  tstrncpy(pWEnd->functionName, "_wend", TSDB_FUNC_NAME_LEN);
×
492
  int64_t pointer = (int64_t)pWEnd;
×
493
  char name[TSDB_COL_NAME_LEN + TSDB_POINTER_PRINT_BYTES + TSDB_NAME_DELIMITER_LEN + 1] = {0};
×
494
  int32_t len = tsnprintf(name, sizeof(name) - 1, "%s.%" PRId64, pWEnd->functionName, pointer);
×
495
  (void)taosHashBinary(name, len);
496
  tstrncpy(pWEnd->node.aliasName, name, TSDB_COL_NAME_LEN);
×
497

498
  code = fmGetFuncInfo(pWEnd, NULL, 0);
×
499
  if (TSDB_CODE_SUCCESS == code) {
×
500
    code = nodesListStrictAppend(pWin->pFuncs, (SNode*)pWEnd);
×
501
  }
502
  *pIndex = index;
×
503
  if (TSDB_CODE_SUCCESS == code) {
×
504
    code = createColumnByRewriteExpr(nodesListGetNode(pWin->pFuncs, index), &pWin->node.pTargets);
×
505
  }
506
  return code;
×
507
}
508

509
static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogicNode** pPartWindow) {
878✔
510
  SNodeList* pFunc = pMergeWindow->pFuncs;
878✔
511
  pMergeWindow->pFuncs = NULL;
878✔
512
  SNodeList* pTargets = pMergeWindow->node.pTargets;
878✔
513
  pMergeWindow->node.pTargets = NULL;
878✔
514
  SNodeList* pChildren = pMergeWindow->node.pChildren;
878✔
515
  pMergeWindow->node.pChildren = NULL;
878✔
516
  SNode* pConditions = pMergeWindow->node.pConditions;
878✔
517
  pMergeWindow->node.pConditions = NULL;
878✔
518

519
  SWindowLogicNode* pPartWin = NULL;
878✔
520
  int32_t code = nodesCloneNode((SNode*)pMergeWindow, (SNode**)&pPartWin);
878✔
521
  if (NULL == pPartWin) {
878!
522
    return code;
×
523
  }
524

525
  pPartWin->node.groupAction = GROUP_ACTION_KEEP;
878✔
526
  pMergeWindow->node.pTargets = pTargets;
878✔
527
  pMergeWindow->node.pConditions = pConditions;
878✔
528
  pPartWin->node.pChildren = pChildren;
878✔
529
  splSetParent((SLogicNode*)pPartWin);
878✔
530

531
  int32_t index = 0;
878✔
532
  code = stbSplRewriteFuns(pFunc, &pPartWin->pFuncs, NULL, &pMergeWindow->pFuncs);
878✔
533
  if (TSDB_CODE_SUCCESS == code) {
878!
534
    code = stbSplAppendWStart(pPartWin->pFuncs, &index, ((SColumnNode*)pMergeWindow->pTspk)->node.resType.precision);
878✔
535
  }
536
  if (TSDB_CODE_SUCCESS == code) {
878!
537
    code = createColumnByRewriteExprs(pPartWin->pFuncs, &pPartWin->node.pTargets);
878✔
538
  }
539
  if (TSDB_CODE_SUCCESS == code) {
878!
540
    nodesDestroyNode(pMergeWindow->pTspk);
878✔
541
    pMergeWindow->pTspk = NULL;
878✔
542
    code = nodesCloneNode(nodesListGetNode(pPartWin->node.pTargets, index), &pMergeWindow->pTspk);
878✔
543
  }
544

545
  nodesDestroyList(pFunc);
878✔
546
  if (TSDB_CODE_SUCCESS == code) {
878!
547
    *pPartWindow = (SLogicNode*)pPartWin;
878✔
548
  } else {
549
    nodesDestroyNode((SNode*)pPartWin);
×
550
  }
551

552
  return code;
878✔
553
}
554

555
static int32_t stbSplCreatePartMidWindowNode(SWindowLogicNode* pMergeWindow, SLogicNode** pPartWindow, SLogicNode** pMidWindow) {
×
556
  SNodeList* pFunc = pMergeWindow->pFuncs;
×
557
  pMergeWindow->pFuncs = NULL;
×
558
  SNodeList* pTargets = pMergeWindow->node.pTargets;
×
559
  pMergeWindow->node.pTargets = NULL;
×
560
  SNodeList* pChildren = pMergeWindow->node.pChildren;
×
561
  pMergeWindow->node.pChildren = NULL;
×
562
  SNode* pConditions = pMergeWindow->node.pConditions;
×
563
  pMergeWindow->node.pConditions = NULL;
×
564

565
  SWindowLogicNode* pPartWin = NULL;
×
566
  int32_t code = nodesCloneNode((SNode*)pMergeWindow, (SNode**)&pPartWin);
×
567
  if (NULL == pPartWin) {
×
568
    return code;
×
569
  }
570

571
  SWindowLogicNode* pMidWin = NULL;
×
572
  code = nodesCloneNode((SNode*)pMergeWindow, (SNode**)&pMidWin);
×
573
  if (NULL == pMidWin) {
×
574
    nodesDestroyNode((SNode*)pPartWin);
×
575
    return code;
×
576
  }
577

578
  pPartWin->node.groupAction = GROUP_ACTION_KEEP;
×
579
  pMidWin->node.groupAction = GROUP_ACTION_KEEP;
×
580
  pMergeWindow->node.pTargets = pTargets;
×
581
  pMergeWindow->node.pConditions = pConditions;
×
582

583
  pPartWin->node.pChildren = pChildren;
×
584
  splSetParent((SLogicNode*)pPartWin);
×
585

586
  SNodeList* pFuncPart = NULL;
×
587
  SNodeList* pFuncMid = NULL;
×
588
  SNodeList* pFuncMerge = NULL;
×
589
  code = stbSplRewriteFuns(pFunc, &pFuncPart, &pFuncMid, &pFuncMerge);
×
590
  pPartWin->pFuncs = pFuncPart;
×
591
  pMidWin->pFuncs = pFuncMid;
×
592
  pMergeWindow->pFuncs = pFuncMerge;
×
593

594
  int32_t index = 0;
×
595
  if (TSDB_CODE_SUCCESS == code) {
×
596
    code = stbSplAppendWStart(pPartWin->pFuncs, &index, ((SColumnNode*)pMergeWindow->pTspk)->node.resType.precision);
×
597
  }
598
  if (TSDB_CODE_SUCCESS == code) {
×
599
    code = createColumnByRewriteExprs(pPartWin->pFuncs, &pPartWin->node.pTargets);
×
600
  }
601

602
  if (TSDB_CODE_SUCCESS == code) {
×
603
    nodesDestroyNode(pMidWin->pTspk);
×
604
    pMidWin->pTspk = NULL;
×
605
    code = nodesCloneNode(nodesListGetNode(pPartWin->node.pTargets, index), &pMidWin->pTspk);
×
606
  }
607

608
  if (TSDB_CODE_SUCCESS == code) {
×
609
    code = stbSplAppendWStart(pMidWin->pFuncs, &index, ((SColumnNode*)pMergeWindow->pTspk)->node.resType.precision);
×
610
  }
611
  if (TSDB_CODE_SUCCESS == code) {
×
612
    code = createColumnByRewriteExprs(pMidWin->pFuncs, &pMidWin->node.pTargets);
×
613
  }
614

615
  if (TSDB_CODE_SUCCESS == code) {
×
616
    nodesDestroyNode(pMergeWindow->pTspk);
×
617
    code = nodesCloneNode(nodesListGetNode(pMidWin->node.pTargets, index), &pMergeWindow->pTspk);
×
618
  }
619

620
  nodesDestroyList(pFunc);
×
621
  if (TSDB_CODE_SUCCESS == code) {
×
622
    *pPartWindow = (SLogicNode*)pPartWin;
×
623
    *pMidWindow = (SLogicNode*)pMidWin;
×
624
  } else {
625
    nodesDestroyNode((SNode*)pPartWin);
×
626
    nodesDestroyNode((SNode*)pMidWin);
×
627
  }
628

629
  return code;
×
630
}
631

632
static int32_t stbSplGetNumOfVgroups(SLogicNode* pNode) {
2,950✔
633
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
2,950✔
634
    return ((SScanLogicNode*)pNode)->pVgroupList->numOfVgroups;
2,000✔
635
  } else {
636
    if (1 == LIST_LENGTH(pNode->pChildren)) {
950!
637
      return stbSplGetNumOfVgroups((SLogicNode*)nodesListGetNode(pNode->pChildren, 0));
950✔
638
    }
639
  }
640
  return 0;
×
641
}
642

643
static int32_t stbSplRewriteFromMergeNode(SMergeLogicNode* pMerge, SLogicNode* pNode) {
2,000✔
644
  int32_t code = TSDB_CODE_SUCCESS;
2,000✔
645
  pMerge->node.inputTsOrder = pNode->outputTsOrder;
2,000✔
646
  pMerge->node.outputTsOrder = pNode->outputTsOrder;
2,000✔
647

648
  switch (nodeType(pNode)) {
2,000✔
649
    case QUERY_NODE_LOGIC_PLAN_PROJECT: {
6✔
650
      SProjectLogicNode *pLogicNode = (SProjectLogicNode*)pNode;
6✔
651
      if (pLogicNode->ignoreGroupId && (pMerge->node.pLimit || pMerge->node.pSlimit)) {
6!
652
        pMerge->ignoreGroupId = true;
×
653
        pLogicNode->ignoreGroupId = false;
×
654
      }
655
      break;
6✔
656
    }
657
    case QUERY_NODE_LOGIC_PLAN_WINDOW: {
878✔
658
      SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode;
878✔
659
      if (pMerge->node.pLimit) {
878✔
660
        nodesDestroyNode(pMerge->node.pLimit);
463✔
661
        pMerge->node.pLimit = NULL;
463✔
662
      }
663
      if (pMerge->node.pSlimit) {
878!
664
        nodesDestroyNode(pMerge->node.pSlimit);
×
665
        pMerge->node.pSlimit = NULL;
×
666
      }
667
      break;
878✔
668
    }
669
    case QUERY_NODE_LOGIC_PLAN_SORT: {
54✔
670
      SSortLogicNode* pSort = (SSortLogicNode*)pNode;
54✔
671
      if (pSort->calcGroupId) pMerge->inputWithGroupId = true;
54!
672
      break;
54✔
673
    }
674
    default:
1,062✔
675
      break;
1,062✔
676
  }
677

678
  return code;
2,000✔
679
}
680

681
static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode,
2,000✔
682
                                     SNodeList* pMergeKeys, SLogicNode* pPartChild, bool groupSort, bool needSort) {
683
  SMergeLogicNode* pMerge = NULL;
2,000✔
684
  int32_t code = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE, (SNode**)&pMerge);
2,000✔
685
  if (NULL == pMerge) {
2,000!
686
    return code;
×
687
  }
688
  pMerge->needSort = needSort;
2,000✔
689
  pMerge->numOfChannels = stbSplGetNumOfVgroups(pPartChild);
2,000✔
690
  pMerge->srcGroupId = pCxt->groupId;
2,000✔
691
  pMerge->srcEndGroupId = pCxt->groupId;
2,000✔
692
  pMerge->node.precision = pPartChild->precision;
2,000✔
693
  pMerge->pMergeKeys = pMergeKeys;
2,000✔
694
  pMerge->groupSort = groupSort;
2,000✔
695
  pMerge->numOfSubplans = 1;
2,000✔
696

697
  pMerge->pInputs = NULL;
2,000✔
698
  code = nodesCloneList(pPartChild->pTargets, &pMerge->pInputs);
2,000✔
699
  if (TSDB_CODE_SUCCESS == code) {
2,000!
700
    // NULL != pSubplan means 'merge node' replaces 'split node'.
701
    if (NULL == pSubplan) {
2,000✔
702
      code = nodesCloneList(pPartChild->pTargets, &pMerge->node.pTargets);
878✔
703
    } else {
704
      code = nodesCloneList(pSplitNode->pTargets, &pMerge->node.pTargets);
1,122✔
705
    }
706
  }
707
  if (TSDB_CODE_SUCCESS == code && NULL != pSplitNode->pLimit) {
2,000!
708
    pMerge->node.pLimit = NULL;
1,404✔
709
    code = nodesCloneNode(pSplitNode->pLimit, &pMerge->node.pLimit);
1,404✔
710
    if (((SLimitNode*)pSplitNode->pLimit)->limit && ((SLimitNode*)pSplitNode->pLimit)->offset) {
1,404!
711
      ((SLimitNode*)pSplitNode->pLimit)->limit->datum.i += ((SLimitNode*)pSplitNode->pLimit)->offset->datum.i;
5✔
712
    }
713
    if (((SLimitNode*)pSplitNode->pLimit)->offset) {
1,404✔
714
      ((SLimitNode*)pSplitNode->pLimit)->offset->datum.i = 0;
5✔
715
    }
716
  }
717
  if (TSDB_CODE_SUCCESS == code) {
2,000!
718
    code = stbSplRewriteFromMergeNode(pMerge, pSplitNode);
2,000✔
719
  }
720
  if (TSDB_CODE_SUCCESS == code) {
2,000!
721
    if (NULL == pSubplan) {
2,000✔
722
      code = nodesListMakeAppend(&pSplitNode->pChildren, (SNode*)pMerge);
878✔
723
    } else {
724
      code = replaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pMerge);
1,122✔
725
    }
726
  }
727
  if (TSDB_CODE_SUCCESS != code) {
2,000!
728
    nodesDestroyNode((SNode*)pMerge);
×
729
  }
730
  return code;
2,000✔
731
}
732

733
static int32_t stbSplCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pParent, SLogicNode* pPartChild) {
1,366✔
734
  SExchangeLogicNode* pExchange = NULL;
1,366✔
735
  int32_t             code = splCreateExchangeNode(pCxt, pPartChild, &pExchange);
1,366✔
736
  if (TSDB_CODE_SUCCESS == code) {
1,367!
737
    pExchange->node.pParent = pParent;
1,367✔
738
    code = nodesListMakeAppend(&pParent->pChildren, (SNode*)pExchange);
1,367✔
739
  }
740
  return code;
1,367✔
741
}
742

743
static int32_t stbSplCreateMergeKeysByExpr(SNode* pExpr, EOrder order, SNodeList** pMergeKeys) {
1,937✔
744
  SOrderByExprNode* pOrderByExpr = NULL;
1,937✔
745
  int32_t code = nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR, (SNode**)&pOrderByExpr);
1,937✔
746
  if (NULL == pOrderByExpr) {
1,937!
747
    return code;
×
748
  }
749
  pOrderByExpr->pExpr = NULL;
1,937✔
750
  code = nodesCloneNode(pExpr, &pOrderByExpr->pExpr);
1,937✔
751
  if (NULL == pOrderByExpr->pExpr) {
1,937!
752
    nodesDestroyNode((SNode*)pOrderByExpr);
×
753
    return code;
×
754
  }
755
  pOrderByExpr->order = order;
1,937✔
756
  pOrderByExpr->nullOrder = (order == ORDER_ASC) ? NULL_ORDER_FIRST : NULL_ORDER_LAST;
1,937✔
757
  return nodesListMakeStrictAppend(pMergeKeys, (SNode*)pOrderByExpr);
1,937✔
758
}
759

760
static int32_t stbSplCreateMergeKeysByPrimaryKey(SNode* pPrimaryKey, EOrder order, SNodeList** pMergeKeys) {
1,937✔
761
  return stbSplCreateMergeKeysByExpr(pPrimaryKey, order, pMergeKeys);
1,937✔
762
}
763

764
static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
878✔
765
  if (((SWindowLogicNode*)pInfo->pSplitNode)->winType == WINDOW_TYPE_EXTERNAL) {
878!
766
    if (!((SWindowLogicNode*)pInfo->pSplitNode)->pFuncs) {
×
767
      // only have projection in external window.
768
      return TSDB_CODE_SUCCESS;
×
769
    }
770
  }
771
  SLogicNode* pPartWindow = NULL;
878✔
772
  int32_t     code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
878✔
773
  if (TSDB_CODE_SUCCESS == code) {
878!
774
    ((SWindowLogicNode*)pPartWindow)->windowAlgo = ((SWindowLogicNode*)pInfo->pSplitNode)->winType == WINDOW_TYPE_INTERVAL ? INTERVAL_ALGO_HASH : EXTERNAL_ALGO_HASH;
878!
775
    ((SWindowLogicNode*)pInfo->pSplitNode)->windowAlgo = ((SWindowLogicNode*)pInfo->pSplitNode)->winType == WINDOW_TYPE_INTERVAL ? INTERVAL_ALGO_MERGE : EXTERNAL_ALGO_MERGE;
878!
776
    SNodeList* pMergeKeys = NULL;
878✔
777
    code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk,
878✔
778
                                             ((SWindowLogicNode*)pInfo->pSplitNode)->node.outputTsOrder, &pMergeKeys);
878✔
779
    if (TSDB_CODE_SUCCESS == code) {
878!
780
      code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow, true, true);
878✔
781
    }
782
    if (TSDB_CODE_SUCCESS != code) {
878!
783
      nodesDestroyList(pMergeKeys);
×
784
    }
785
  }
786
  SLogicSubplan* pSplitSubPlan = NULL;
878✔
787
  if (TSDB_CODE_SUCCESS == code) {
878!
788
    pSplitSubPlan = splCreateScanSubplan(pCxt, pPartWindow, SPLIT_FLAG_STABLE_SPLIT);
878✔
789
    if (!pSplitSubPlan) code = terrno;
878!
790
  }
791
  if (code == TSDB_CODE_SUCCESS) {
878!
792
    SNode* pNode;
793
    SMergeLogicNode* pMerge = (SMergeLogicNode*)pInfo->pSplitNode->pChildren->pHead->pNode;
878✔
794
    SWindowLogicNode* pWindow = (SWindowLogicNode*)pInfo->pSplitNode;
878✔
795
    if (LIST_LENGTH(pWindow->pTsmaSubplans) > 0) {
878!
796
      FOREACH(pNode, pWindow->pTsmaSubplans) {
×
797
        ++(pCxt->groupId);
×
798
        SLogicSubplan* pSubplan = (SLogicSubplan*)pNode;
×
799
        pSubplan->id.groupId = pCxt->groupId;
×
800
        pSubplan->id.queryId = pCxt->queryId;
×
801
        //pSubplan->splitFlag = SPLIT_FLAG_STABLE_SPLIT;
802
        splSetSubplanVgroups(pSubplan, pSubplan->pNode);
×
803
        code = stbSplCreatePartWindowNode((SWindowLogicNode*)pSubplan->pNode, &pPartWindow);
×
804
        if (TSDB_CODE_SUCCESS == code) {
×
805
          nodesDestroyNode((SNode*)pSubplan->pNode);
×
806
          pSubplan->pNode = pPartWindow;
×
807
        }
808
      }
809
      code = nodesListMakeStrictAppendList(&pInfo->pSubplan->pChildren, pWindow->pTsmaSubplans);
×
810
      pMerge->numOfSubplans = LIST_LENGTH(pInfo->pSubplan->pChildren) + 1;
×
811
    }
812
    pMerge->srcEndGroupId = pCxt->groupId;
878✔
813
  }
814
  if (code == TSDB_CODE_SUCCESS) {
878!
815
    code = nodesListMakePushFront(&pInfo->pSubplan->pChildren, (SNode*)pSplitSubPlan);
878✔
816
  }
817
  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
878✔
818
  ++(pCxt->groupId);
878✔
819
  return code;
878✔
820
}
821

822
static void stbSplSetTableMergeScan(SLogicNode* pNode) {
51✔
823
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
51✔
824
    SScanLogicNode* pScan = (SScanLogicNode*)pNode;
48✔
825
    pScan->scanType = SCAN_TYPE_TABLE_MERGE;
48✔
826
    pScan->filesetDelimited = true;
48✔
827
    if (NULL != pScan->pGroupTags) {
48!
828
      pScan->groupSort = true;
×
829
    }
830
  } else {
831
    if (1 == LIST_LENGTH(pNode->pChildren)) {
3!
832
      stbSplSetTableMergeScan((SLogicNode*)nodesListGetNode(pNode->pChildren, 0));
3✔
833
    }
834
  }
835
}
51✔
836

837
static int32_t stbSplSplitSessionOrStateForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
48✔
838
  SLogicNode* pWindow = pInfo->pSplitNode;
48✔
839
  SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pWindow->pChildren, 0);
48✔
840

841
  SNodeList* pMergeKeys = NULL;
48✔
842
  int32_t    code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pWindow)->pTspk,
48✔
843
                                                      ((SWindowLogicNode*)pWindow)->node.inputTsOrder, &pMergeKeys);
844

845
  if (TSDB_CODE_SUCCESS == code) {
48!
846
    code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pChild, pMergeKeys, (SLogicNode*)pChild, true, true);
48✔
847
  }
848

849
  if (TSDB_CODE_SUCCESS == code) {
48!
850
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
48✔
851
                                     (SNode*)splCreateScanSubplan(pCxt, pChild, SPLIT_FLAG_STABLE_SPLIT));
48✔
852
  }
853

854
  if (TSDB_CODE_SUCCESS == code) {
48!
855
    stbSplSetTableMergeScan(pChild);
48✔
856
    pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
48✔
857
    //SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT);
858
    ++(pCxt->groupId);
48✔
859
  } else {
860
    nodesDestroyList(pMergeKeys);
×
861
  }
862

863
  return code;
48✔
864
}
865

866
static int32_t stbSplSplitWindowForCrossTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
926✔
867
  switch (((SWindowLogicNode*)pInfo->pSplitNode)->winType) {
926!
868
    case WINDOW_TYPE_INTERVAL:
878✔
869
    case WINDOW_TYPE_EXTERNAL:
870
      return stbSplSplitIntervalForBatch(pCxt, pInfo);
878✔
871
    case WINDOW_TYPE_SESSION:
48✔
872
    case WINDOW_TYPE_STATE:
873
    case WINDOW_TYPE_EVENT:
874
    case WINDOW_TYPE_COUNT:
875
    case WINDOW_TYPE_ANOMALY:
876
      return stbSplSplitSessionOrStateForBatch(pCxt, pInfo);
48✔
877
    default:
×
878
      break;
×
879
  }
880
  return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
881
}
882

883
static bool stbSplNeedSeqRecvData(SLogicNode* pNode) {
18✔
884
  if (NULL == pNode) {
18✔
885
    return false;
9✔
886
  }
887

888
  if (NULL != pNode->pLimit || NULL != pNode->pSlimit) {
9!
889
    return true;
×
890
  }
891
  return stbSplNeedSeqRecvData(pNode->pParent);
9✔
892
}
893

894
static int32_t stbSplSplitWindowForPartTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
9✔
895
  if (NULL != pInfo->pSplitNode->pParent && QUERY_NODE_LOGIC_PLAN_FILL == nodeType(pInfo->pSplitNode->pParent)) {
9!
896
    pInfo->pSplitNode = pInfo->pSplitNode->pParent;
3✔
897
  }
898
  SExchangeLogicNode* pExchange = NULL;
9✔
899
  int32_t             code = splCreateExchangeNode(pCxt, pInfo->pSplitNode, &pExchange);
9✔
900
  if (TSDB_CODE_SUCCESS == code) {
9!
901
    code = replaceLogicNode(pInfo->pSubplan, pInfo->pSplitNode, (SLogicNode*)pExchange);
9✔
902
  }
903
  if (TSDB_CODE_SUCCESS == code) {
9!
904
    pExchange->seqRecvData = stbSplNeedSeqRecvData((SLogicNode*)pExchange);
9✔
905
    pExchange->dynTbname = false;
9✔
906
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
9✔
907
                                     (SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT));
9✔
908
  }
909
  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
9✔
910
  ++(pCxt->groupId);
9✔
911
  return code;
9✔
912
}
913

914
static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
935✔
915
  if (isPartTableWinodw((SWindowLogicNode*)pInfo->pSplitNode) &&
935✔
916
      (LIST_LENGTH(((SWindowLogicNode*)pInfo->pSplitNode)->pTsmaSubplans) == 0)) {
9!
917
    return stbSplSplitWindowForPartTable(pCxt, pInfo);
9✔
918
  } else {
919
    return stbSplSplitWindowForCrossTable(pCxt, pInfo);
926✔
920
  }
921
}
922

923
static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pOutput) {
1,366✔
924
  SNodeList* pFunc = pMergeAgg->pAggFuncs;
1,366✔
925
  pMergeAgg->pAggFuncs = NULL;
1,366✔
926
  SNodeList* pGroupKeys = pMergeAgg->pGroupKeys;
1,366✔
927
  pMergeAgg->pGroupKeys = NULL;
1,366✔
928
  SNodeList* pTargets = pMergeAgg->node.pTargets;
1,366✔
929
  pMergeAgg->node.pTargets = NULL;
1,366✔
930
  SNodeList* pChildren = pMergeAgg->node.pChildren;
1,366✔
931
  pMergeAgg->node.pChildren = NULL;
1,366✔
932
  SNode* pConditions = pMergeAgg->node.pConditions;
1,366✔
933
  pMergeAgg->node.pConditions = NULL;
1,366✔
934

935
  SAggLogicNode* pPartAgg = NULL;
1,366✔
936
  int32_t code = nodesCloneNode((SNode*)pMergeAgg, (SNode**)&pPartAgg);
1,366✔
937
  if (NULL == pPartAgg) {
1,366!
938
    return code;
×
939
  }
940

941
  pPartAgg->node.groupAction = GROUP_ACTION_KEEP;
1,366✔
942

943
  if (NULL != pGroupKeys) {
1,366✔
944
    pPartAgg->pGroupKeys = pGroupKeys;
39✔
945
    code = createColumnByRewriteExprs(pPartAgg->pGroupKeys, &pPartAgg->node.pTargets);
39✔
946
  }
947
  if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) {
1,366!
948
    pMergeAgg->pGroupKeys = NULL;
39✔
949
    code = nodesCloneList(pPartAgg->node.pTargets, &pMergeAgg->pGroupKeys);
39✔
950
  }
951
  if (TSDB_CODE_SUCCESS == code) {
1,366!
952
    pMergeAgg->node.pConditions = pConditions;
1,366✔
953
    pMergeAgg->node.pTargets = pTargets;
1,366✔
954
    pPartAgg->node.pChildren = pChildren;
1,366✔
955
    splSetParent((SLogicNode*)pPartAgg);
1,366✔
956

957
    code = stbSplRewriteFuns(pFunc, &pPartAgg->pAggFuncs, NULL, &pMergeAgg->pAggFuncs);
1,366✔
958
  }
959
  if (TSDB_CODE_SUCCESS == code) {
1,367!
960
    code = createColumnByRewriteExprs(pPartAgg->pAggFuncs, &pPartAgg->node.pTargets);
1,367✔
961
  }
962

963
  nodesDestroyList(pFunc);
1,367✔
964
  if (TSDB_CODE_SUCCESS == code) {
1,366!
965
    *pOutput = (SLogicNode*)pPartAgg;
1,366✔
966
  } else {
967
    nodesDestroyNode((SNode*)pPartAgg);
×
968
  }
969

970
  return code;
1,366✔
971
}
972

973
static int32_t stbSplSplitAggNodeForPartTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
719✔
974
  int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pInfo->pSplitNode, SUBPLAN_TYPE_MERGE, false);
719✔
975
  if (TSDB_CODE_SUCCESS == code) {
719!
976
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
719✔
977
                                     (SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT));
719✔
978
  }
979
  ++(pCxt->groupId);
719✔
980
  return code;
719✔
981
}
982

983

984
/**
985
 * @brief For pipelined agg node, add a SortMergeNode to merge result from vnodes.
986
 *        For agg + partition, results are sorted by group id, use group sort.
987
 *        For agg + sort for group, results are sorted by partition keys, not group id, merges keys should be the same
988
 *            as partition keys
989
 */
990
static int32_t stbSplAggNodeCreateMerge(SSplitContext* pCtx, SStableSplitInfo* pInfo, SLogicNode* pChildAgg) {
×
991
  bool       groupSort = true;
×
992
  SNodeList* pMergeKeys = NULL;
×
993
  int32_t    code = TSDB_CODE_SUCCESS;
×
994
  bool       sortForGroup = false;
×
995

996
  if (pChildAgg->pChildren->length != 1) return TSDB_CODE_TSC_INTERNAL_ERROR;
×
997

998
  SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pChildAgg->pChildren, 0);
×
999
  if (nodeType(pChild) == QUERY_NODE_LOGIC_PLAN_SORT) {
×
1000
    SSortLogicNode* pSort = (SSortLogicNode*)pChild;
×
1001
    if (pSort->calcGroupId) {
×
1002
      SNode *node, *node2;
1003
      groupSort = false;
×
1004
      sortForGroup = true;
×
1005
      SNodeList* extraAggFuncs = NULL;
×
1006
      uint32_t   originalLen = LIST_LENGTH(pSort->node.pTargets), idx = 0;
×
1007
      code = stbSplCreateMergeKeys(pSort->pSortKeys, pSort->node.pTargets, &pMergeKeys);
×
1008
      if (TSDB_CODE_SUCCESS != code) return code;
×
1009

1010
      // Create group_key func for all sort keys.
1011
      // We only need newly added nodes in pSort.node.pTargets when stbSplCreateMergeKeys
1012
      FOREACH(node, pSort->node.pTargets) {
×
1013
        if (idx++ < originalLen) continue;
×
1014
        SFunctionNode* pGroupKeyFunc = createGroupKeyAggFunc((SColumnNode*)node);
×
1015
        if (!pGroupKeyFunc) {
×
1016
          code = terrno;
×
1017
          break;
×
1018
        }
1019
        code = nodesListMakeStrictAppend(&extraAggFuncs, (SNode*)pGroupKeyFunc);
×
1020
        if (code != TSDB_CODE_SUCCESS) {
×
1021
          nodesDestroyNode((SNode*)pGroupKeyFunc);
×
1022
        }
1023
      }
1024

1025
      if (TSDB_CODE_SUCCESS == code) {
×
1026
        // add these extra group_key funcs into targets
1027
        code = createColumnByRewriteExprs(extraAggFuncs, &pChildAgg->pTargets);
×
1028
      }
1029
      if (code == TSDB_CODE_SUCCESS) {
×
1030
        code = nodesListAppendList(((SAggLogicNode*)pChildAgg)->pAggFuncs, extraAggFuncs);
×
1031
        extraAggFuncs = NULL;
×
1032
      }
1033

1034
      if (code == TSDB_CODE_SUCCESS) {
×
1035
        FOREACH(node, pMergeKeys) {
×
1036
          SOrderByExprNode* pOrder = (SOrderByExprNode*)node;
×
1037
          SColumnNode*      pCol = (SColumnNode*)pOrder->pExpr;
×
1038
          FOREACH(node2, ((SAggLogicNode*)pChildAgg)->pAggFuncs) {
×
1039
            SFunctionNode* pFunc = (SFunctionNode*)node2;
×
1040
            if (0 != strcmp(pFunc->functionName, "_group_key")) continue;
×
1041
            SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
×
1042
            if (!nodesEqualNode(pParam, (SNode*)pCol)) continue;
×
1043

1044
            // use the colName of group_key func to make sure finding the right slot id for merge keys.
1045
            tstrncpy(pCol->colName, pFunc->node.aliasName, TSDB_COL_NAME_LEN);
×
1046
            tstrncpy(pCol->node.aliasName, pFunc->node.aliasName, TSDB_COL_NAME_LEN);
×
1047
            memset(pCol->tableAlias, 0, TSDB_TABLE_NAME_LEN);
×
1048
            break;
×
1049
          }
1050
        }
1051
      }
1052
      if (TSDB_CODE_SUCCESS != code) {
×
1053
        nodesDestroyList(pMergeKeys);
×
1054
        nodesDestroyList(extraAggFuncs);
×
1055
      }
1056
    }
1057
  }
1058
  if (TSDB_CODE_SUCCESS == code) {
×
1059
    code = stbSplCreateMergeNode(pCtx, NULL, pInfo->pSplitNode, pMergeKeys, pChildAgg, groupSort, true);
×
1060
  }
1061
  if (TSDB_CODE_SUCCESS == code && sortForGroup) {
×
1062
    SMergeLogicNode* pMerge =
1063
        (SMergeLogicNode*)nodesListGetNode(pInfo->pSplitNode->pChildren, LIST_LENGTH(pInfo->pSplitNode->pChildren) - 1);
×
1064
    pMerge->inputWithGroupId = true;
×
1065
  }
1066
  return code;
×
1067
}
1068

1069
static int32_t stbSplSplitAggNodeForCrossTableMulSubplan(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
×
1070
  SLogicNode*      pPartAgg = NULL;
×
1071
  bool             hasExchange = false;
×
1072
  SMergeLogicNode* pMergeNode = NULL;
×
1073
  SLogicSubplan*   pFirstScanSubplan = NULL;
×
1074
  int32_t          code = stbSplCreatePartAggNode((SAggLogicNode*)pInfo->pSplitNode, &pPartAgg);
×
1075

1076
  if (TSDB_CODE_SUCCESS == code) {
×
1077
    if (pInfo->pSplitNode->forceCreateNonBlockingOptr) {
×
1078
      code = stbSplAggNodeCreateMerge(pCxt, pInfo, pPartAgg);
×
1079
    } else {
1080
      hasExchange = true;
×
1081
      code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, NULL, pPartAgg, false, false);
×
1082
    }
1083
    pMergeNode =
1084
        (SMergeLogicNode*)nodesListGetNode(pInfo->pSplitNode->pChildren, LIST_LENGTH(pInfo->pSplitNode->pChildren) - 1);
×
1085
  } else {
1086
    nodesDestroyNode((SNode*)pPartAgg);
×
1087
  }
1088

1089
  if (code == TSDB_CODE_SUCCESS) {
×
1090
    pFirstScanSubplan = splCreateScanSubplan(pCxt, pPartAgg, SPLIT_FLAG_STABLE_SPLIT);
×
1091
    if (!pFirstScanSubplan) code = terrno;
×
1092
  }
1093

1094
  if (code == TSDB_CODE_SUCCESS) {
×
1095
    SNode* pNode;
1096
    SAggLogicNode* pAgg = (SAggLogicNode*)pInfo->pSplitNode;
×
1097
    if (LIST_LENGTH(pAgg->pTsmaSubplans) > 0) {
×
1098
      FOREACH(pNode, pAgg->pTsmaSubplans) {
×
1099
        ++(pCxt->groupId);
×
1100
        SLogicSubplan* pSubplan = (SLogicSubplan*)pNode;
×
1101
        pSubplan->id.groupId = pCxt->groupId;
×
1102
        pSubplan->id.queryId = pCxt->queryId;
×
1103
        //pSubplan->splitFlag = SPLIT_FLAG_STABLE_SPLIT;
1104
        splSetSubplanVgroups(pSubplan, pSubplan->pNode);
×
1105
        code = stbSplCreatePartAggNode((SAggLogicNode*)pSubplan->pNode, &pPartAgg);
×
1106
        if (code) break;
×
1107
        nodesDestroyNode((SNode*)pSubplan->pNode);
×
1108
        pSubplan->pNode = pPartAgg;
×
1109
      }
1110
      code = nodesListMakeStrictAppendList(&pInfo->pSubplan->pChildren, pAgg->pTsmaSubplans);
×
1111
      pMergeNode->numOfSubplans = LIST_LENGTH(pInfo->pSubplan->pChildren) + 1;
×
1112
    }
1113
    pMergeNode->srcEndGroupId = pCxt->groupId;
×
1114
  }
1115

1116
  if (code == TSDB_CODE_SUCCESS) {
×
1117
    code = nodesListMakeAppend(&pInfo->pSubplan->pChildren, (SNode*)pFirstScanSubplan);
×
1118
  }
1119

1120
  if (code && pFirstScanSubplan) {
×
1121
    nodesDestroyNode((SNode*)pFirstScanSubplan);
×
1122
  }
1123

1124
  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
×
1125
  ++(pCxt->groupId);
×
1126
  return code;
×
1127
}
1128

1129
static int32_t stbSplSplitAggNodeForCrossTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
1,367✔
1130
  SLogicNode* pPartAgg = NULL;
1,367✔
1131
  int32_t     code = stbSplCreatePartAggNode((SAggLogicNode*)pInfo->pSplitNode, &pPartAgg);
1,367✔
1132
  if (TSDB_CODE_SUCCESS == code) {
1,366!
1133
    // if slimit was pushed down to agg, agg will be pipelined mode, add sort merge before parent agg
1134
    if (pInfo->pSplitNode->forceCreateNonBlockingOptr)
1,366!
1135
      code = stbSplAggNodeCreateMerge(pCxt, pInfo, pPartAgg);
×
1136
    else {
1137
      code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartAgg);
1,366✔
1138
    }
1139
  } else {
1140
    nodesDestroyNode((SNode*)pPartAgg);
×
1141
  }
1142

1143
  SLogicSubplan* pScanSubplan = NULL;
1,367✔
1144
  if (TSDB_CODE_SUCCESS == code) {
1,367!
1145
    pScanSubplan = splCreateScanSubplan(pCxt, pPartAgg, SPLIT_FLAG_STABLE_SPLIT);
1,367✔
1146
    if (!pScanSubplan) code = terrno;
1,367!
1147
  }
1148

1149
  if (code == TSDB_CODE_SUCCESS) {
1,367!
1150
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, (SNode*)pScanSubplan);
1,367✔
1151
  }
1152

1153
  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
1,367✔
1154
  ++(pCxt->groupId);
1,367✔
1155
  return code;
1,367✔
1156
}
1157

1158
static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
2,084✔
1159
  if (LIST_LENGTH(((SAggLogicNode*)pInfo->pSplitNode)->pTsmaSubplans) > 0) {
2,084!
1160
    return stbSplSplitAggNodeForCrossTableMulSubplan(pCxt, pInfo);
×
1161
  }
1162
  if (isPartTableAgg((SAggLogicNode*)pInfo->pSplitNode)) {
2,084✔
1163
    return stbSplSplitAggNodeForPartTable(pCxt, pInfo);
719✔
1164
  }
1165
  return stbSplSplitAggNodeForCrossTable(pCxt, pInfo);
1,367✔
1166
}
1167

1168
static int32_t stbSplCreateColumnNode(SExprNode* pExpr, SNode** ppNode) {
27✔
1169
  SColumnNode* pCol = NULL;
27✔
1170
  int32_t code = nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pCol);
27✔
1171
  if (NULL == pCol) {
27!
1172
    return code;
×
1173
  }
1174
  if (QUERY_NODE_COLUMN == nodeType(pExpr)) {
27✔
1175
    tstrncpy(pCol->dbName, ((SColumnNode*)pExpr)->dbName, TSDB_DB_NAME_LEN);
24✔
1176
    tstrncpy(pCol->tableName, ((SColumnNode*)pExpr)->tableName, TSDB_TABLE_NAME_LEN);
24✔
1177
    tstrncpy(pCol->tableAlias, ((SColumnNode*)pExpr)->tableAlias, TSDB_TABLE_NAME_LEN);
24✔
1178
    tstrncpy(pCol->colName, ((SColumnNode*)pExpr)->colName, TSDB_COL_NAME_LEN);
24✔
1179
  } else {
1180
    tstrncpy(pCol->colName, pExpr->aliasName, TSDB_COL_NAME_LEN);
3✔
1181
  }
1182
  tstrncpy(pCol->node.aliasName, pExpr->aliasName, TSDB_COL_NAME_LEN);
27✔
1183
  tstrncpy(pCol->node.userAlias, pExpr->userAlias, TSDB_COL_NAME_LEN);
27✔
1184
  pCol->node.resType = pExpr->resType;
27✔
1185
  *ppNode = (SNode*)pCol;
27✔
1186
  return code;
27✔
1187
}
1188

1189
static int32_t stbSplCreateOrderByExpr(SOrderByExprNode* pSortKey, SNode* pCol, SNode** ppNode) {
70✔
1190
  SOrderByExprNode* pOutput = NULL;
70✔
1191
  int32_t code = nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR, (SNode**)&pOutput);
70✔
1192
  if (NULL == pOutput) {
70!
1193
    return code;
×
1194
  }
1195
  pOutput->pExpr = NULL;
70✔
1196
  code = nodesCloneNode(pCol, &pOutput->pExpr);
70✔
1197
  if (NULL == pOutput->pExpr) {
70!
1198
    nodesDestroyNode((SNode*)pOutput);
×
1199
    return code;
×
1200
  }
1201
  pOutput->order = pSortKey->order;
70✔
1202
  pOutput->nullOrder = pSortKey->nullOrder;
70✔
1203
  *ppNode = (SNode*)pOutput;
70✔
1204
  return code;
70✔
1205
}
1206

1207
static int32_t stbSplCreateMergeKeys(SNodeList* pSortKeys, SNodeList* pTargets, SNodeList** pOutput) {
54✔
1208
  int32_t    code = TSDB_CODE_SUCCESS;
54✔
1209
  SNodeList* pMergeKeys = NULL;
54✔
1210
  SNode*     pNode = NULL;
54✔
1211
  FOREACH(pNode, pSortKeys) {
124!
1212
    SOrderByExprNode* pSortKey = (SOrderByExprNode*)pNode;
70✔
1213
    SExprNode*        pSortExpr = (SExprNode*)pSortKey->pExpr;
70✔
1214
    SNode*            pTarget = NULL;
70✔
1215
    bool              found = false;
70✔
1216
    FOREACH(pTarget, pTargets) {
309!
1217
      if ((QUERY_NODE_COLUMN == nodeType(pSortExpr) && nodesEqualNode((SNode*)pSortExpr, pTarget)) || 
239✔
1218
          (0 == strcmp(pSortExpr->aliasName, ((SColumnNode*)pTarget)->colName))) {
196!
1219
        SNode* pNew = NULL;
43✔
1220
        code = stbSplCreateOrderByExpr(pSortKey, pTarget, &pNew);
43✔
1221
        if (TSDB_CODE_SUCCESS == code) {
43!
1222
          code = nodesListMakeStrictAppend(&pMergeKeys, pNew);
43✔
1223
        }
1224
        if (TSDB_CODE_SUCCESS != code) {
43!
1225
          break;
×
1226
        }
1227
        found = true;
43✔
1228
      }
1229
    }
1230
    if (TSDB_CODE_SUCCESS == code && !found) {
70!
1231
      SNode* pCol = NULL;
27✔
1232
      code = stbSplCreateColumnNode(pSortExpr, &pCol);
27✔
1233
      if (TSDB_CODE_SUCCESS == code) {
27!
1234
        SNode* pNew = NULL;
27✔
1235
        code = stbSplCreateOrderByExpr(pSortKey, pCol, &pNew);
27✔
1236
        if (TSDB_CODE_SUCCESS == code) {
27!
1237
          code = nodesListMakeStrictAppend(&pMergeKeys, pNew);
27✔
1238
        }
1239
      }
1240
      if (TSDB_CODE_SUCCESS == code) {
27!
1241
        code = nodesListStrictAppend(pTargets, pCol);
27✔
1242
      } else {
1243
        nodesDestroyNode(pCol);
×
1244
      }
1245
    }
1246
    if (TSDB_CODE_SUCCESS != code) {
70!
1247
      break;
×
1248
    }
1249
  }
1250
  if (TSDB_CODE_SUCCESS == code) {
54!
1251
    *pOutput = pMergeKeys;
54✔
1252
  } else {
1253
    nodesDestroyList(pMergeKeys);
×
1254
  }
1255
  return code;
54✔
1256
}
1257

1258
static int32_t stbSplCreatePartSortNode(SSortLogicNode* pSort, SLogicNode** pOutputPartSort,
54✔
1259
                                        SNodeList** pOutputMergeKeys) {
1260
  SNodeList* pSortKeys = pSort->pSortKeys;
54✔
1261
  pSort->pSortKeys = NULL;
54✔
1262
  SNodeList* pChildren = pSort->node.pChildren;
54✔
1263
  pSort->node.pChildren = NULL;
54✔
1264

1265
  int32_t         code = TSDB_CODE_SUCCESS;
54✔
1266
  SSortLogicNode* pPartSort = NULL;
54✔
1267
  code = nodesCloneNode((SNode*)pSort, (SNode**)&pPartSort);
54✔
1268

1269
  SNodeList* pMergeKeys = NULL;
54✔
1270
  if (TSDB_CODE_SUCCESS == code) {
54!
1271
    pPartSort->node.pChildren = pChildren;
54✔
1272
    splSetParent((SLogicNode*)pPartSort);
54✔
1273
    pPartSort->pSortKeys = pSortKeys;
54✔
1274
    pPartSort->groupSort = pSort->groupSort;
54✔
1275
    code = stbSplCreateMergeKeys(pPartSort->pSortKeys, pPartSort->node.pTargets, &pMergeKeys);
54✔
1276
  }
1277

1278
  if (TSDB_CODE_SUCCESS == code) {
54!
1279
    *pOutputPartSort = (SLogicNode*)pPartSort;
54✔
1280
    *pOutputMergeKeys = pMergeKeys;
54✔
1281
  } else {
1282
    nodesDestroyNode((SNode*)pPartSort);
×
1283
    nodesDestroyList(pMergeKeys);
×
1284
  }
1285

1286
  return code;
54✔
1287
}
1288

1289
static void stbSplSetScanPartSort(SLogicNode* pNode) {
×
1290
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
×
1291
    SScanLogicNode* pScan = (SScanLogicNode*)pNode;
×
1292
    if (NULL != pScan->pGroupTags) {
×
1293
      pScan->groupSort = true;
×
1294
    }
1295
  } else {
1296
    if (1 == LIST_LENGTH(pNode->pChildren)) {
×
1297
      stbSplSetScanPartSort((SLogicNode*)nodesListGetNode(pNode->pChildren, 0));
×
1298
    }
1299
  }
1300
}
×
1301

1302
static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
54✔
1303
  SLogicNode* pPartSort = NULL;
54✔
1304
  SNodeList*  pMergeKeys = NULL;
54✔
1305
  bool        groupSort = ((SSortLogicNode*)pInfo->pSplitNode)->groupSort;
54✔
1306
  int32_t     code = stbSplCreatePartSortNode((SSortLogicNode*)pInfo->pSplitNode, &pPartSort, &pMergeKeys);
54✔
1307
  if (TSDB_CODE_SUCCESS == code) {
54!
1308
    code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, pMergeKeys, pPartSort, groupSort, true);
54✔
1309
  }
1310
  if (TSDB_CODE_SUCCESS == code) {
54!
1311
    nodesDestroyNode((SNode*)pInfo->pSplitNode);
54✔
1312
    pInfo->pSplitNode = NULL;
54✔
1313
    if (groupSort) {
54!
1314
      stbSplSetScanPartSort(pPartSort);
×
1315
    }
1316
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
54✔
1317
                                     (SNode*)splCreateScanSubplan(pCxt, pPartSort, SPLIT_FLAG_STABLE_SPLIT));
54✔
1318
  }
1319
  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
54✔
1320
  ++(pCxt->groupId);
54✔
1321
  return code;
54✔
1322
}
1323

1324
static int32_t stbSplGetSplitNodeForScan(SStableSplitInfo* pInfo, SLogicNode** pSplitNode) {
1,061✔
1325
  *pSplitNode = pInfo->pSplitNode;
1,061✔
1326
  if (NULL != pInfo->pSplitNode->pParent && 
1,061!
1327
      QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pInfo->pSplitNode->pParent) &&
1,061✔
1328
      NULL == pInfo->pSplitNode->pParent->pLimit && NULL == pInfo->pSplitNode->pParent->pSlimit && 
868!
1329
      !((SProjectLogicNode*)pInfo->pSplitNode->pParent)->inputIgnoreGroup) {
868✔
1330
    *pSplitNode = pInfo->pSplitNode->pParent;
850✔
1331
    if (NULL != pInfo->pSplitNode->pLimit) {
850✔
1332
      (*pSplitNode)->pLimit = NULL;
392✔
1333
      int32_t code = nodesCloneNode(pInfo->pSplitNode->pLimit, &(*pSplitNode)->pLimit);
392✔
1334
      if (NULL == (*pSplitNode)->pLimit) {
392!
1335
        return code;
×
1336
      }
1337
      if (((SLimitNode*)pInfo->pSplitNode->pLimit)->limit && ((SLimitNode*)pInfo->pSplitNode->pLimit)->offset) {
392!
1338
        ((SLimitNode*)pInfo->pSplitNode->pLimit)->limit->datum.i += ((SLimitNode*)pInfo->pSplitNode->pLimit)->offset->datum.i;
3✔
1339
      }
1340
      if (((SLimitNode*)pInfo->pSplitNode->pLimit)->offset) {
392✔
1341
        ((SLimitNode*)pInfo->pSplitNode->pLimit)->offset->datum.i = 0;
3✔
1342
      }
1343
    }
1344
  }
1345
  return TSDB_CODE_SUCCESS;
1,061✔
1346
}
1347

1348
static int32_t stbSplSplitScanNodeWithoutPartTags(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
1,055✔
1349
  SLogicNode* pSplitNode = NULL;
1,055✔
1350
  int32_t     code = stbSplGetSplitNodeForScan(pInfo, &pSplitNode);
1,055✔
1351
  if (TSDB_CODE_SUCCESS == code) {
1,055!
1352
    code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pSplitNode, pInfo->pSubplan->subplanType, false);
1,055✔
1353
  }
1354
  if (TSDB_CODE_SUCCESS == code) {
1,055!
1355
    splSetSubplanType(pInfo->pSubplan);
1,055✔
1356
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
1,055✔
1357
                                     (SNode*)splCreateScanSubplan(pCxt, pSplitNode, SPLIT_FLAG_STABLE_SPLIT));
1,055✔
1358
  }
1359
  ++(pCxt->groupId);
1,055✔
1360
  return code;
1,055✔
1361
}
1362

1363
static int32_t stbSplSplitScanNodeWithPartTags(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
6✔
1364
  SLogicNode* pSplitNode = NULL;
6✔
1365
  int32_t     code = stbSplGetSplitNodeForScan(pInfo, &pSplitNode);
6✔
1366
  if (TSDB_CODE_SUCCESS == code) {
6!
1367
    bool needSort = true;
6✔
1368
    if (QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pSplitNode) && !pSplitNode->pLimit && !pSplitNode->pSlimit) {
6!
1369
      needSort = !((SProjectLogicNode*)pSplitNode)->ignoreGroupId;
6✔
1370
    }
1371
    code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pSplitNode, NULL, pSplitNode, needSort, needSort);
6✔
1372
  }
1373
  if (TSDB_CODE_SUCCESS == code) {
6!
1374
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
6✔
1375
                                     (SNode*)splCreateScanSubplan(pCxt, pSplitNode, SPLIT_FLAG_STABLE_SPLIT));
6✔
1376
  }
1377
  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
6✔
1378
  ++(pCxt->groupId);
6✔
1379
  return code;
6✔
1380
}
1381

1382
static int32_t stbSplFindPrimaryKeyFromScan(SScanLogicNode* pScan, SNode** ppNode) {
1,011✔
1383
  bool   find = false;
1,011✔
1384
  SNode* pCol = NULL;
1,011✔
1385
  FOREACH(pCol, pScan->pScanCols) {
1,645!
1386
    if (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pCol)->colId) {
1,645✔
1387
      find = true;
1,011✔
1388
      break;
1,011✔
1389
    }
1390
  }
1391
  if (!find) {
1,011!
1392
    *ppNode = NULL;
×
1393
    return TSDB_CODE_SUCCESS;
×
1394
  }
1395
  SNode* pTarget = NULL;
1,011✔
1396
  FOREACH(pTarget, pScan->node.pTargets) {
1,645!
1397
    if (nodesEqualNode(pTarget, pCol)) {
1,645✔
1398
      *ppNode = pCol;
1,011✔
1399
      return TSDB_CODE_SUCCESS;
1,011✔
1400
    }
1401
  }
1402
  SNode* pNew = NULL;
×
1403
  int32_t code = nodesCloneNode(pCol, &pNew);
×
1404
  if (TSDB_CODE_SUCCESS == code) {
×
1405
    code = nodesListStrictAppend(pScan->node.pTargets, pNew);
×
1406
  }
1407
  if (TSDB_CODE_SUCCESS == code) {
×
1408
    *ppNode = pCol;
×
1409
  }
1410
  return code;
×
1411
}
1412

1413
static int32_t stbSplFindPkFromScan(SScanLogicNode* pScan, SNode** ppNode) {
1,008✔
1414
  int32_t code = 0;
1,008✔
1415
  bool   find = false;
1,008✔
1416
  SNode* pCol = NULL;
1,008✔
1417
  FOREACH(pCol, pScan->pScanCols) {
3,407!
1418
    if (((SColumnNode*)pCol)->isPk) {
2,399!
1419
      find = true;
×
1420
      break;
×
1421
    }
1422
  }
1423
  if (!find) {
1,008!
1424
    *ppNode = NULL;
1,008✔
1425
    return code;
1,008✔
1426
  }
1427
  SNode* pTarget = NULL;
×
1428
  FOREACH(pTarget, pScan->node.pTargets) {
×
1429
    if (nodesEqualNode(pTarget, pCol)) {
×
1430
      *ppNode = pCol;
×
1431
      return code;
×
1432
    }
1433
  }
1434
  SNode* pNew = NULL;
×
1435
  code = nodesCloneNode(pCol, &pNew);
×
1436
  if (TSDB_CODE_SUCCESS == code) {
×
1437
    code = nodesListStrictAppend(pScan->node.pTargets, pNew);
×
1438
  }
1439
  if (TSDB_CODE_SUCCESS == code) {
×
1440
    *ppNode = pCol;
×
1441
  }
1442
  return code;
×
1443
}
1444

1445
static int32_t stbSplCreateMergeScanNode(SScanLogicNode* pScan, SLogicNode** pOutputMergeScan,
1,008✔
1446
                                         SNodeList** pOutputMergeKeys) {
1447
  SNodeList* pChildren = pScan->node.pChildren;
1,008✔
1448
  pScan->node.pChildren = NULL;
1,008✔
1449

1450
  int32_t         code = TSDB_CODE_SUCCESS;
1,008✔
1451
  SScanLogicNode* pMergeScan = NULL;
1,008✔
1452
  code = nodesCloneNode((SNode*)pScan, (SNode**)&pMergeScan);
1,008✔
1453

1454
  SNodeList* pMergeKeys = NULL;
1,008✔
1455
  if (TSDB_CODE_SUCCESS == code) {
1,008!
1456
    pMergeScan->scanType = SCAN_TYPE_TABLE_MERGE;
1,008✔
1457
    pMergeScan->filesetDelimited = true;
1,008✔
1458
    pMergeScan->node.pChildren = pChildren;
1,008✔
1459
    splSetParent((SLogicNode*)pMergeScan);
1,008✔
1460

1461
    SNode* pTs = NULL;
1,008✔
1462
    code = stbSplFindPrimaryKeyFromScan(pMergeScan, &pTs);
1,008✔
1463
    if (TSDB_CODE_SUCCESS == code) {
1,008!
1464
      code = stbSplCreateMergeKeysByPrimaryKey(pTs, pMergeScan->scanSeq[0] > 0 ? ORDER_ASC : ORDER_DESC, &pMergeKeys);
1,008✔
1465
    }
1466
    SNode* pPk = NULL;
1,008✔
1467
    if (TSDB_CODE_SUCCESS == code) {
1,008!
1468
      code = stbSplFindPkFromScan(pMergeScan, &pPk);
1,008✔
1469
    }
1470
    if (TSDB_CODE_SUCCESS == code && NULL != pPk) {
1,008!
1471
      code = stbSplCreateMergeKeysByExpr(pPk, pMergeScan->scanSeq[0] > 0 ? ORDER_ASC : ORDER_DESC, &pMergeKeys);
×
1472
    }
1473
  }
1474

1475
  if (TSDB_CODE_SUCCESS == code) {
1,008!
1476
    *pOutputMergeScan = (SLogicNode*)pMergeScan;
1,008✔
1477
    *pOutputMergeKeys = pMergeKeys;
1,008✔
1478
  } else {
1479
    nodesDestroyNode((SNode*)pMergeScan);
×
1480
    nodesDestroyList(pMergeKeys);
×
1481
  }
1482

1483
  return code;
1,008✔
1484
}
1485

1486
static int32_t stbSplSplitMergeScanNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SScanLogicNode* pScan,
1,008✔
1487
                                        bool groupSort, SStableSplitInfo* pInfo) {
1488
  SLogicNode* pMergeScan = NULL;
1,008✔
1489
  SNodeList*  pMergeKeys = NULL;
1,008✔
1490
  int32_t     code = stbSplCreateMergeScanNode(pScan, &pMergeScan, &pMergeKeys);
1,008✔
1491
  if (TSDB_CODE_SUCCESS == code) {
1,008!
1492
    if (NULL != pMergeScan->pLimit) {
1,008✔
1493
      if (((SLimitNode*)pMergeScan->pLimit)->limit && ((SLimitNode*)pMergeScan->pLimit)->offset) {
937!
1494
        ((SLimitNode*)pMergeScan->pLimit)->limit->datum.i += ((SLimitNode*)pMergeScan->pLimit)->offset->datum.i;
3✔
1495
      }
1496
      if (((SLimitNode*)pMergeScan->pLimit)->offset) {
937✔
1497
        ((SLimitNode*)pMergeScan->pLimit)->offset->datum.i = 0;
3✔
1498
      }
1499
    }
1500
    code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, pMergeScan, groupSort, true);
1,008✔
1501
  }
1502
  if (TSDB_CODE_SUCCESS == code) {
1,008!
1503
    if ((void*)pInfo->pSplitNode == (void*)pScan) {
1,008✔
1504
      pInfo->pSplitNode = NULL;
988✔
1505
    }
1506
    nodesDestroyNode((SNode*)pScan);
1,008✔
1507
    code = nodesListMakeStrictAppend(&pSubplan->pChildren,
1,008✔
1508
                                     (SNode*)splCreateScanSubplan(pCxt, pMergeScan, SPLIT_FLAG_STABLE_SPLIT));
1,008✔
1509
  }
1510
  ++(pCxt->groupId);
1,008✔
1511
  return code;
1,008✔
1512
}
1513

1514
static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
2,049✔
1515
  SScanLogicNode* pScan = (SScanLogicNode*)pInfo->pSplitNode;
2,049✔
1516
  if (SCAN_TYPE_TABLE_MERGE == pScan->scanType) {
2,049✔
1517
    pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
988✔
1518
    return stbSplSplitMergeScanNode(pCxt, pInfo->pSubplan, pScan, true, pInfo);
988✔
1519
  }
1520
  if (NULL != pScan->pGroupTags) {
1,061✔
1521
    return stbSplSplitScanNodeWithPartTags(pCxt, pInfo);
6✔
1522
  }
1523
  return stbSplSplitScanNodeWithoutPartTags(pCxt, pInfo);
1,055✔
1524
}
1525

1526
static int32_t stbSplSplitJoinNodeImpl(SSplitContext* pCxt, SLogicSubplan* pSubplan, SJoinLogicNode* pJoin, SStableSplitInfo* pInfo) {
10✔
1527
  int32_t code = TSDB_CODE_SUCCESS;
10✔
1528
  SNode*  pChild = NULL;
10✔
1529
  FOREACH(pChild, pJoin->node.pChildren) {
30!
1530
    if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild)) {
20!
1531
      //if (pJoin->node.dynamicOp) {
1532
      //  code = TSDB_CODE_SUCCESS;
1533
      //} else {
1534
        code = stbSplSplitMergeScanNode(pCxt, pSubplan, (SScanLogicNode*)pChild, pJoin->grpJoin ? true : false, pInfo);
20✔
1535
      //}
1536
    } else if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pChild)) {
×
1537
      code = stbSplSplitJoinNodeImpl(pCxt, pSubplan, (SJoinLogicNode*)pChild, pInfo);
×
1538
    } else {
1539
      code = TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1540
    }
1541
    if (TSDB_CODE_SUCCESS != code) {
20!
1542
      break;
×
1543
    }
1544
  }
1545
  return code;
10✔
1546
}
1547

1548
static int32_t stbSplSplitJoinNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
10✔
1549
  int32_t code = stbSplSplitJoinNodeImpl(pCxt, pInfo->pSubplan, (SJoinLogicNode*)pInfo->pSplitNode, pInfo);
10✔
1550
  if (TSDB_CODE_SUCCESS == code) {
10!
1551
    //if (!pInfo->pSplitNode->dynamicOp) {
1552
      pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
10✔
1553
    //}
1554
    //SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT);
1555
    pInfo->pSplitNode->splitDone = true;
10✔
1556
  }
1557
  return code;
10✔
1558
}
1559

1560
static int32_t stbSplCreateMergeKeysForPartitionNode(SLogicNode* pPart, SNodeList** pMergeKeys) {
3✔
1561
  SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pPart->pChildren, 0);
3✔
1562
  SNode*          pPK = NULL;
3✔
1563
  SNode*          pPrimaryKey = NULL;
3✔
1564
  int32_t code = stbSplFindPrimaryKeyFromScan(pScan, &pPK);
3✔
1565
  if (TSDB_CODE_SUCCESS == code) {
3!
1566
    code = nodesCloneNode(pPK, &pPrimaryKey);
3✔
1567
  }
1568
  if (NULL == pPrimaryKey) {
3!
1569
    return code;
×
1570
  }
1571
  code = nodesListStrictAppend(pPart->pTargets, pPrimaryKey);
3✔
1572
  if (TSDB_CODE_SUCCESS == code) {
3!
1573
    code = stbSplCreateMergeKeysByPrimaryKey(pPrimaryKey, pScan->scanSeq[0] > 0 ? ORDER_ASC : ORDER_DESC, pMergeKeys);
3!
1574
  }
1575
  return code;
3✔
1576
}
1577

1578
static int32_t stbSplSplitPartitionNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
6✔
1579
  int32_t    code = TSDB_CODE_SUCCESS;
6✔
1580
  SNodeList* pMergeKeys = NULL;
6✔
1581
  if (pInfo->pSplitNode->requireDataOrder >= DATA_ORDER_LEVEL_IN_GROUP) {
6✔
1582
    code = stbSplCreateMergeKeysForPartitionNode(pInfo->pSplitNode, &pMergeKeys);
3✔
1583
  }
1584
  if (TSDB_CODE_SUCCESS == code) {
6!
1585
    code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, pMergeKeys, pInfo->pSplitNode, true, true);
6✔
1586
  }
1587
  if (TSDB_CODE_SUCCESS == code) {
6!
1588
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
6✔
1589
                                     (SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT));
6✔
1590
  }
1591
  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
6✔
1592
  ++(pCxt->groupId);
6✔
1593
  return code;
6✔
1594
}
1595

1596
static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
30,038✔
1597
  if (pCxt->pPlanCxt->rSmaQuery) {
30,038!
1598
    return TSDB_CODE_SUCCESS;
×
1599
  }
1600

1601
  SStableSplitInfo info = {0};
30,038✔
1602
  if (!splMatch(pCxt, pSubplan, SPLIT_FLAG_STABLE_SPLIT, (FSplFindSplitNode)stbSplFindSplitNode, &info)) {
30,038✔
1603
    return TSDB_CODE_SUCCESS;
24,928✔
1604
  }
1605

1606
  int32_t code = TSDB_CODE_SUCCESS;
5,128✔
1607
  switch (nodeType(info.pSplitNode)) {
5,128!
1608
    case QUERY_NODE_LOGIC_PLAN_SCAN:
2,049✔
1609
      code = stbSplSplitScanNode(pCxt, &info);
2,049✔
1610
      break;
2,049✔
1611
    case QUERY_NODE_LOGIC_PLAN_JOIN:
10✔
1612
      code = stbSplSplitJoinNode(pCxt, &info);
10✔
1613
      break;
10✔
1614
    case QUERY_NODE_LOGIC_PLAN_PARTITION:
6✔
1615
      code = stbSplSplitPartitionNode(pCxt, &info);
6✔
1616
      break;
6✔
1617
    case QUERY_NODE_LOGIC_PLAN_AGG:
2,084✔
1618
      code = stbSplSplitAggNode(pCxt, &info);
2,084✔
1619
      break;
2,086✔
1620
    case QUERY_NODE_LOGIC_PLAN_WINDOW:
935✔
1621
      code = stbSplSplitWindowNode(pCxt, &info);
935✔
1622
      break;
935✔
1623
    case QUERY_NODE_LOGIC_PLAN_SORT:
54✔
1624
      code = stbSplSplitSortNode(pCxt, &info);
54✔
1625
      break;
54✔
1626
    default:
×
1627
      break;
×
1628
  }
1629

1630
  if (info.pSplitNode && !pCxt->pPlanCxt->streamTriggerQuery && !pCxt->pPlanCxt->streamCalcQuery) {
5,130!
1631
    info.pSplitNode->splitDone = true;
3,876✔
1632
  }
1633
  pCxt->split = true;
5,130✔
1634
  return code;
5,130✔
1635
}
1636

1637
typedef struct SSigTbJoinSplitInfo {
1638
  SJoinLogicNode* pJoin;
1639
  SLogicNode*     pSplitNode;
1640
  SLogicSubplan*  pSubplan;
1641
} SSigTbJoinSplitInfo;
1642

1643
static bool sigTbJoinSplNeedSplit(SLogicNode* pNode) {
122,702✔
1644
  if (QUERY_NODE_LOGIC_PLAN_JOIN != nodeType(pNode)) {
122,702✔
1645
    return false;
116,521✔
1646
  }
1647

1648
  SJoinLogicNode* pJoin = (SJoinLogicNode*)pNode;
6,181✔
1649
  if (!pJoin->isSingleTableJoin) {
6,181✔
1650
    return false;
831✔
1651
  }
1652
  return QUERY_NODE_LOGIC_PLAN_EXCHANGE != nodeType(nodesListGetNode(pJoin->node.pChildren, 0)) &&
10,704!
1653
         QUERY_NODE_LOGIC_PLAN_EXCHANGE != nodeType(nodesListGetNode(pJoin->node.pChildren, 1));
5,354✔
1654
}
1655

1656
static bool sigTbJoinSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
122,692✔
1657
                                      SSigTbJoinSplitInfo* pInfo) {
1658
  if (sigTbJoinSplNeedSplit(pNode)) {
122,692✔
1659
    pInfo->pJoin = (SJoinLogicNode*)pNode;
2,333✔
1660
    pInfo->pSplitNode = (SLogicNode*)nodesListGetNode(pNode->pChildren, 1);
2,333✔
1661
    pInfo->pSubplan = pSubplan;
2,332✔
1662
    return true;
2,332✔
1663
  }
1664
  return false;
120,373✔
1665
}
1666

1667
static int32_t singleTableJoinSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
30,050✔
1668
  SSigTbJoinSplitInfo info = {0};
30,050✔
1669
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)sigTbJoinSplFindSplitNode, &info)) {
30,050✔
1670
    return TSDB_CODE_SUCCESS;
27,736✔
1671
  }
1672
  int32_t code = splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pSplitNode, info.pSubplan->subplanType, false);
2,332✔
1673
  if (TSDB_CODE_SUCCESS == code) {
2,332!
1674
    code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)splCreateScanSubplan(pCxt, info.pSplitNode, 0));
2,332✔
1675
  }
1676
  ++(pCxt->groupId);
2,332✔
1677
  pCxt->split = true;
2,332✔
1678
  return code;
2,332✔
1679
}
1680

1681
static int32_t unionSplitSubplan(SSplitContext* pCxt, SLogicSubplan* pUnionSubplan, SLogicNode* pSplitNode) {
69✔
1682
  SNodeList* pSubplanChildren = pUnionSubplan->pChildren;
69✔
1683
  pUnionSubplan->pChildren = NULL;
69✔
1684

1685
  int32_t code = TSDB_CODE_SUCCESS;
69✔
1686

1687
  SNode* pChild = NULL;
69✔
1688
  FOREACH(pChild, pSplitNode->pChildren) {
213!
1689
    SLogicSubplan* pNewSubplan = NULL;
144✔
1690
    code = splCreateSubplan(pCxt, (SLogicNode*)pChild, &pNewSubplan);
144✔
1691
    if (TSDB_CODE_SUCCESS == code) {
144!
1692
      code = nodesListMakeStrictAppend(&pUnionSubplan->pChildren, (SNode*)pNewSubplan);
144✔
1693
    }
1694
    if (TSDB_CODE_SUCCESS == code) {
144!
1695
      REPLACE_NODE(NULL);
144✔
1696
      code = splMountSubplan(pNewSubplan, pSubplanChildren);
144✔
1697
    }
1698
    if (TSDB_CODE_SUCCESS != code) {
144!
1699
      break;
×
1700
    }
1701
    ++(pCxt->groupId);
144✔
1702
  }
1703
  if (TSDB_CODE_SUCCESS == code) {
69!
1704
    if (NULL != pSubplanChildren) {
69✔
1705
      if (pSubplanChildren->length > 0) {
15!
1706
        code = nodesListMakeStrictAppendList(&pUnionSubplan->pChildren, pSubplanChildren);
×
1707
      } else {
1708
        nodesDestroyList(pSubplanChildren);
15✔
1709
      }
1710
    }
1711
    NODES_DESTORY_LIST(pSplitNode->pChildren);
69✔
1712
  }
1713
  return code;
69✔
1714
}
1715

1716
typedef struct SUnionAllSplitInfo {
1717
  SProjectLogicNode* pProject;
1718
  SLogicSubplan*     pSubplan;
1719
} SUnionAllSplitInfo;
1720

1721
static bool unAllSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
134,228✔
1722
                                  SUnionAllSplitInfo* pInfo) {
1723
  if (QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) {
134,228✔
1724
    pInfo->pProject = (SProjectLogicNode*)pNode;
39✔
1725
    pInfo->pSubplan = pSubplan;
39✔
1726
    return true;
39✔
1727
  }
1728
  return false;
134,189✔
1729
}
1730

1731
static int32_t unAllSplCreateExchangeNode(SSplitContext* pCxt, int32_t startGroupId, SLogicSubplan* pSubplan,
39✔
1732
                                          SProjectLogicNode* pProject) {
1733
  SExchangeLogicNode* pExchange = NULL;
39✔
1734
  int32_t code = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE, (SNode**)&pExchange);
39✔
1735
  if (NULL == pExchange) {
39!
1736
    return code;
×
1737
  }
1738
  pExchange->srcStartGroupId = startGroupId;
39✔
1739
  pExchange->srcEndGroupId = pCxt->groupId - 1;
39✔
1740
  pExchange->node.precision = pProject->node.precision;
39✔
1741
  pExchange->node.pTargets = NULL;
39✔
1742
  code = nodesCloneList(pProject->node.pTargets, &pExchange->node.pTargets);
39✔
1743
  if (TSDB_CODE_SUCCESS != code) {
39!
1744
    nodesDestroyNode((SNode*)pExchange);
×
1745
    return code;
×
1746
  }
1747
  pExchange->node.pConditions = NULL;
39✔
1748
  code = nodesCloneNode(pProject->node.pConditions, &pExchange->node.pConditions);
39✔
1749
  if (TSDB_CODE_SUCCESS != code) {
39!
1750
    nodesDestroyNode((SNode*)pExchange);
×
1751
    return code;
×
1752
  }
1753
  TSWAP(pExchange->node.pLimit, pProject->node.pLimit);
39✔
1754

1755
  pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
39✔
1756

1757
  if (NULL == pProject->node.pParent) {
39✔
1758
    pSubplan->pNode = (SLogicNode*)pExchange;
27✔
1759
    nodesDestroyNode((SNode*)pProject);
27✔
1760
    return TSDB_CODE_SUCCESS;
27✔
1761
  }
1762

1763
  SNode* pNode;
1764
  FOREACH(pNode, pProject->node.pParent->pChildren) {
12!
1765
    if (nodesEqualNode(pNode, (SNode*)pProject)) {
12!
1766
      REPLACE_NODE(pExchange);
12✔
1767
      nodesDestroyNode(pNode);
12✔
1768
      return TSDB_CODE_SUCCESS;
12✔
1769
    }
1770
  }
1771
  nodesDestroyNode((SNode*)pExchange);
×
1772
  return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1773
}
1774

1775
static int32_t unionAllSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
30,054✔
1776
  SUnionAllSplitInfo info = {0};
30,054✔
1777
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)unAllSplFindSplitNode, &info)) {
30,054✔
1778
    return TSDB_CODE_SUCCESS;
30,021✔
1779
  }
1780

1781
  int32_t startGroupId = pCxt->groupId;
39✔
1782
  int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pProject);
39✔
1783
  if (TSDB_CODE_SUCCESS == code) {
39!
1784
    code = unAllSplCreateExchangeNode(pCxt, startGroupId, info.pSubplan, info.pProject);
39✔
1785
  }
1786
  pCxt->split = true;
39✔
1787
  return code;
39✔
1788
}
1789

1790
typedef struct SUnionDistinctSplitInfo {
1791
  SAggLogicNode* pAgg;
1792
  SLogicSubplan* pSubplan;
1793
} SUnionDistinctSplitInfo;
1794

1795
static int32_t unDistSplCreateExchangeNode(SSplitContext* pCxt, int32_t startGroupId, SLogicSubplan* pSubplan,
30✔
1796
                                           SAggLogicNode* pAgg) {
1797
  SExchangeLogicNode* pExchange = NULL;
30✔
1798
  int32_t code = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE, (SNode**)&pExchange);
30✔
1799
  if (NULL == pExchange) {
30!
1800
    return code;
×
1801
  }
1802
  pExchange->srcStartGroupId = startGroupId;
30✔
1803
  pExchange->srcEndGroupId = pCxt->groupId - 1;
30✔
1804
  pExchange->node.precision = pAgg->node.precision;
30✔
1805
  pExchange->node.pTargets = NULL;
30✔
1806
  code = nodesCloneList(pAgg->pGroupKeys, &pExchange->node.pTargets);
30✔
1807
  if (NULL == pExchange->node.pTargets) {
30!
1808
    nodesDestroyNode((SNode*)pExchange);
×
1809
    return code;
×
1810
  }
1811

1812
  pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
30✔
1813

1814
  return nodesListMakeStrictAppend(&pAgg->node.pChildren, (SNode*)pExchange);
30✔
1815
}
1816

1817
static bool unDistSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
134,208✔
1818
                                   SUnionDistinctSplitInfo* pInfo) {
1819
  if (QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) {
134,208!
1820
    pInfo->pAgg = (SAggLogicNode*)pNode;
30✔
1821
    if (!pInfo->pAgg->pGroupKeys) return false;
30!
1822
    pInfo->pSubplan = pSubplan;
30✔
1823
    return true;
30✔
1824
  }
1825
  return false;
134,178✔
1826
}
1827

1828
static int32_t unionDistinctSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
30,049✔
1829
  SUnionDistinctSplitInfo info = {0};
30,049✔
1830
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)unDistSplFindSplitNode, &info)) {
30,049✔
1831
    return TSDB_CODE_SUCCESS;
30,026✔
1832
  }
1833

1834
  int32_t startGroupId = pCxt->groupId;
30✔
1835
  int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pAgg);
30✔
1836
  if (TSDB_CODE_SUCCESS == code) {
30!
1837
    code = unDistSplCreateExchangeNode(pCxt, startGroupId, info.pSubplan, info.pAgg);
30✔
1838
  }
1839
  pCxt->split = true;
30✔
1840
  return code;
30✔
1841
}
1842

1843
typedef struct SSmaIndexSplitInfo {
1844
  SMergeLogicNode* pMerge;
1845
  SLogicSubplan*   pSubplan;
1846
} SSmaIndexSplitInfo;
1847

1848
static bool smaIdxSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
134,399✔
1849
                                   SSmaIndexSplitInfo* pInfo) {
1850
  if (QUERY_NODE_LOGIC_PLAN_MERGE == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) {
134,399!
1851
    int32_t nodeType = nodeType(nodesListGetNode(pNode->pChildren, 0));
105✔
1852
    if (nodeType == QUERY_NODE_LOGIC_PLAN_EXCHANGE || nodeType == QUERY_NODE_LOGIC_PLAN_MERGE) {
105!
1853
      pInfo->pMerge = (SMergeLogicNode*)pNode;
×
1854
      pInfo->pSubplan = pSubplan;
×
1855
      return true;
×
1856
    }
1857
  }
1858
  return false;
134,399✔
1859
}
1860

1861
static int32_t smaIndexSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
30,042✔
1862
  SSmaIndexSplitInfo info = {0};
30,042✔
1863
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)smaIdxSplFindSplitNode, &info)) {
30,042!
1864
    return TSDB_CODE_SUCCESS;
30,060✔
1865
  }
1866

1867
  int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pMerge);
×
1868
  if (TSDB_CODE_SUCCESS == code) {
×
1869
    info.pMerge->srcGroupId = pCxt->groupId;
×
1870
  }
1871
  ++(pCxt->groupId);
×
1872
  pCxt->split = true;
×
1873
  return code;
×
1874
}
1875

1876
typedef struct SInsertSelectSplitInfo {
1877
  SLogicNode*    pQueryRoot;
1878
  SLogicSubplan* pSubplan;
1879
} SInsertSelectSplitInfo;
1880

1881
static bool insSelSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
134,308✔
1882
                                   SInsertSelectSplitInfo* pInfo) {
1883
  if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY == nodeType(pNode) && 1 == LIST_LENGTH(pNode->pChildren) &&
134,308!
1884
      MODIFY_TABLE_TYPE_INSERT == ((SVnodeModifyLogicNode*)pNode)->modifyType) {
19!
1885
    pInfo->pQueryRoot = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
19✔
1886
    pInfo->pSubplan = pSubplan;
19✔
1887
    return true;
19✔
1888
  }
1889
  return false;
134,289✔
1890
}
1891

1892
static int32_t insertSelectSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
30,043✔
1893
  SInsertSelectSplitInfo info = {0};
30,043✔
1894
  if (!splMatch(pCxt, pSubplan, SPLIT_FLAG_INSERT_SPLIT, (FSplFindSplitNode)insSelSplFindSplitNode, &info)) {
30,043✔
1895
    return TSDB_CODE_SUCCESS;
30,041✔
1896
  }
1897

1898
  SLogicSubplan* pNewSubplan = NULL;
19✔
1899
  SNodeList*     pSubplanChildren = info.pSubplan->pChildren;
19✔
1900
  int32_t        code = splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pQueryRoot, SUBPLAN_TYPE_MODIFY, false);
19✔
1901
  if (TSDB_CODE_SUCCESS == code) {
19!
1902
    code = splCreateSubplan(pCxt, info.pQueryRoot, &pNewSubplan);
19✔
1903
  }
1904
  if (TSDB_CODE_SUCCESS == code) {
19!
1905
    code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)pNewSubplan);
19✔
1906
  }
1907
  if (TSDB_CODE_SUCCESS == code) {
19!
1908
    code = splMountSubplan(pNewSubplan, pSubplanChildren);
19✔
1909
  }
1910

1911
  SPLIT_FLAG_SET_MASK(info.pSubplan->splitFlag, SPLIT_FLAG_INSERT_SPLIT);
19✔
1912
  ++(pCxt->groupId);
19✔
1913
  pCxt->split = true;
19✔
1914
  return code;
19✔
1915
}
1916

1917
typedef struct SVirtualTableSplitInfo {
1918
  SVirtualScanLogicNode *pVirtual;
1919
  SLogicSubplan          *pSubplan;
1920
} SVirtualTableSplitInfo;
1921

1922
static bool virtualTableFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
120,770✔
1923
                                      SVirtualTableSplitInfo* pInfo) {
1924
  if (QUERY_NODE_LOGIC_PLAN_VIRTUAL_TABLE_SCAN == nodeType(pNode) && 0 != LIST_LENGTH(pNode->pChildren) &&
120,770!
1925
      QUERY_NODE_LOGIC_PLAN_EXCHANGE != nodeType(nodesListGetNode(pNode->pChildren, 0))) {
5,392✔
1926
    pInfo->pVirtual = (SVirtualScanLogicNode*)pNode;
2,354✔
1927
    pInfo->pSubplan = pSubplan;
2,354✔
1928
    return true;
2,354✔
1929
  }
1930
  return false;
118,416✔
1931
}
1932

1933
static int32_t virtualTableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
30,047✔
1934
  int32_t                code = TSDB_CODE_SUCCESS;
30,047✔
1935
  SVirtualTableSplitInfo info = {0};
30,047✔
1936
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)virtualTableFindSplitNode, &info)) {
30,047✔
1937
    return TSDB_CODE_SUCCESS;
27,706✔
1938
  }
1939
  int32_t startGroupId = pCxt->groupId;
2,354✔
1940
  SNode*  pChild = NULL;
2,354✔
1941
  FOREACH(pChild, info.pVirtual->node.pChildren) {
11,585!
1942
    PLAN_ERR_JRET(splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, (SLogicNode*)pChild, info.pSubplan->subplanType, info.pVirtual->tableType == TSDB_SUPER_TABLE));
9,231!
1943
    SLogicSubplan *sub = splCreateScanSubplan(pCxt, (SLogicNode*)pChild, 0);
9,231✔
1944
    sub->processOneBlock = (info.pVirtual->tableType == TSDB_SUPER_TABLE);
9,231✔
1945
    PLAN_ERR_JRET(nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)sub));
9,231!
1946
    ++(pCxt->groupId);
9,231✔
1947
  }
1948
  info.pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
2,354✔
1949
_return:
2,354✔
1950
  pCxt->split = true;
2,354✔
1951
  return code;
2,354✔
1952
}
1953

1954
typedef struct SMergeAggColsSplitInfo {
1955
  SAggLogicNode   *pAgg;
1956
  SLogicNode      *pSplitNode;
1957
  SLogicSubplan   *pSubplan;
1958
} SMergeAggColsSplitInfo;
1959

1960
static bool mergeAggColsNeedSplit(SLogicNode* pNode) {
143,495✔
1961
  if (QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) && 1 == LIST_LENGTH(pNode->pChildren) &&
143,495!
1962
      NULL != pNode->pParent &&
15,421✔
1963
      QUERY_NODE_LOGIC_PLAN_MERGE == nodeType(pNode->pParent) &&
3,834✔
1964
      ((SMergeLogicNode *)pNode->pParent)->colsMerge &&
111!
1965
      QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(nodesListGetNode(pNode->pChildren, 0))) {
111✔
1966
    return true;
54✔
1967
  }
1968
  return false;
143,441✔
1969
}
1970

1971

1972
static bool mergeAggColsFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
143,494✔
1973
                                      SMergeAggColsSplitInfo* pInfo) {
1974
  if (mergeAggColsNeedSplit(pNode)) {
143,494✔
1975
    pInfo->pAgg = (SAggLogicNode *)pNode;
56✔
1976
    pInfo->pSplitNode = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
56✔
1977
    pInfo->pSubplan = pSubplan;
54✔
1978
    return true;
54✔
1979
  }
1980
  return false;
143,448✔
1981
}
1982

1983
static int32_t mergeAggColsSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
30,043✔
1984
  int32_t                code = TSDB_CODE_SUCCESS;
30,043✔
1985
  SMergeAggColsSplitInfo info = {0};
30,043✔
1986
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)mergeAggColsFindSplitNode, &info)) {
30,043✔
1987
    return TSDB_CODE_SUCCESS;
30,004✔
1988
  }
1989

1990
  PLAN_ERR_RET(splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pSplitNode, SUBPLAN_TYPE_MERGE, false));
54!
1991
  PLAN_ERR_RET(nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)splCreateScanSubplan(pCxt, info.pSplitNode, 0)));
54!
1992

1993
  ++(pCxt->groupId);
54✔
1994
  pCxt->split = true;
54✔
1995
  return code;
54✔
1996
}
1997

1998
typedef struct SQnodeSplitInfo {
1999
  SLogicNode*    pSplitNode;
2000
  SLogicSubplan* pSubplan;
2001
} SQnodeSplitInfo;
2002

2003
static bool qndSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
×
2004
                                SQnodeSplitInfo* pInfo) {
2005
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode) && NULL != pNode->pParent &&
×
2006
      QUERY_NODE_LOGIC_PLAN_INTERP_FUNC != nodeType(pNode->pParent) &&
×
2007
      QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC != nodeType(pNode->pParent) && ((SScanLogicNode*)pNode)->scanSeq[0] <= 1 &&
×
2008
      ((SScanLogicNode*)pNode)->scanSeq[1] <= 1) {
×
2009
    pInfo->pSplitNode = pNode;
×
2010
    pInfo->pSubplan = pSubplan;
×
2011
    return true;
×
2012
  }
2013
  return false;
×
2014
}
2015

2016
static int32_t qnodeSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
20,822✔
2017
  if (QUERY_POLICY_QNODE != tsQueryPolicy) {
20,822!
2018
    return TSDB_CODE_SUCCESS;
20,829✔
2019
  }
2020

2021
  SQnodeSplitInfo info = {0};
×
2022
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)qndSplFindSplitNode, &info)) {
×
2023
    return TSDB_CODE_SUCCESS;
×
2024
  }
2025
  ((SScanLogicNode*)info.pSplitNode)->dataRequired = FUNC_DATA_REQUIRED_DATA_LOAD;
×
2026
  int32_t code = splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pSplitNode, info.pSubplan->subplanType, false);
×
2027
  if (TSDB_CODE_SUCCESS == code) {
×
2028
    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, info.pSplitNode, 0);
×
2029
    if (NULL != pScanSubplan) {
×
2030
      if (NULL != info.pSubplan->pVgroupList) {
×
2031
        info.pSubplan->numOfComputeNodes = info.pSubplan->pVgroupList->numOfVgroups;
×
2032
        TSWAP(pScanSubplan->pVgroupList, info.pSubplan->pVgroupList);
×
2033
      } else {
2034
        info.pSubplan->numOfComputeNodes = 1;
×
2035
      }
2036
      code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)pScanSubplan);
×
2037
    } else {
2038
      code = terrno;
×
2039
    }
2040
  }
2041
  info.pSubplan->subplanType = SUBPLAN_TYPE_COMPUTE;
×
2042
  ++(pCxt->groupId);
×
2043
  pCxt->split = true;
×
2044
  return code;
×
2045
}
2046

2047
typedef struct SDynVirtualScanSplitInfo {
2048
  SScanLogicNode         *pDyn;
2049
  SLogicSubplan          *pSubplan;
2050
} SDynVirtualScanSplitInfo;
2051

2052
static bool dynVirtualScanFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
143,700✔
2053
                                        SDynVirtualScanSplitInfo* pInfo) {
2054
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode) && NULL != pNode->pParent &&
143,700✔
2055
      QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL == nodeType(pNode->pParent) &&
25,230!
2056
      ((SScanLogicNode *)pNode)->scanType == SCAN_TYPE_SYSTEM_TABLE) {
×
2057
    pInfo->pDyn = (SScanLogicNode*)pNode;
×
2058
    pInfo->pSubplan = pSubplan;
×
2059
    return true;
×
2060
  }
2061
  return false;
143,700✔
2062
}
2063

2064
static int32_t dynVirtualScanSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
30,040✔
2065
  int32_t                  code = TSDB_CODE_SUCCESS;
30,040✔
2066
  SDynVirtualScanSplitInfo info = {0};
30,040✔
2067
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)dynVirtualScanFindSplitNode, &info)) {
30,040!
2068
    return TSDB_CODE_SUCCESS;
30,055✔
2069
  }
2070
  splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pDyn, info.pSubplan->subplanType, false);
×
2071
  nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)splCreateScanSubplan(pCxt, (SLogicNode*)info.pDyn, 0));
×
2072

2073
  info.pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
×
2074
  ++(pCxt->groupId);
×
2075

2076
_return:
×
2077
  pCxt->split = true;
×
2078
  return code;
×
2079
}
2080

2081
typedef struct SStreamScanSplitInfo {
2082
  SLogicNode             *pSplitNode;
2083
  SLogicSubplan          *pSubplan;
2084
} SStreamScanSplitInfo;
2085

2086
static bool streamScanFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
6,966✔
2087
                                           SStreamScanSplitInfo* pInfo) {
2088
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode) && NULL != pNode->pParent) {
6,966✔
2089
    pInfo->pSplitNode = (SLogicNode *)pNode;
1,026✔
2090
    pInfo->pSubplan = pSubplan;
1,026✔
2091
    return true;
1,026✔
2092
  }
2093
  return false;
5,940✔
2094
}
2095

2096
static int32_t streamScanSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
20,827✔
2097
  int32_t                     code = TSDB_CODE_SUCCESS;
20,827✔
2098
  SStreamScanSplitInfo info = {0};
20,827✔
2099
  if (!pCxt->pPlanCxt->streamTriggerQuery && !pCxt->pPlanCxt->streamCalcQuery) {
20,827✔
2100
    return TSDB_CODE_SUCCESS;
18,829✔
2101
  }
2102
  while (splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)streamScanFindSplitNode, &info)) {
3,024✔
2103
    PLAN_ERR_RET(splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pSplitNode, info.pSubplan->subplanType, false));
1,026!
2104
    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, info.pSplitNode, 0);
1,026✔
2105
    if (NULL != pScanSubplan) {
1,026!
2106
      if (NULL != info.pSubplan->pVgroupList) {
1,026✔
2107
        info.pSubplan->numOfComputeNodes = info.pSubplan->pVgroupList->numOfVgroups;
222✔
2108
      } else {
2109
        info.pSubplan->numOfComputeNodes = 1;
804✔
2110
      }
2111
      if (!pScanSubplan->pVgroupList) {
1,026✔
2112
        PLAN_ERR_RET(cloneVgroups(&pScanSubplan->pVgroupList, info.pSubplan->pVgroupList));
222!
2113
      }
2114
      pScanSubplan->dynTbname = ((SScanLogicNode*)info.pSplitNode)->phTbnameScan;
1,026✔
2115
      PLAN_ERR_RET(nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)pScanSubplan));
1,026!
2116
    } else {
2117
      PLAN_ERR_RET(terrno);
×
2118
    }
2119
    info.pSubplan->subplanType = SUBPLAN_TYPE_COMPUTE;
1,026✔
2120
    ++(pCxt->groupId);
1,026✔
2121
    info.pSplitNode->splitDone = true;
1,026✔
2122
    pCxt->split = true;
1,026✔
2123
  }
2124

2125
  return code;
2,004✔
2126
}
2127

2128
// clang-format off
2129
static const SSplitRule splitRuleSet[] = {
2130
  {.pName = "SuperTableSplit",        .splitFunc = stableSplit},
2131
  {.pName = "SingleTableJoinSplit",   .splitFunc = singleTableJoinSplit},
2132
  {.pName = "UnionAllSplit",          .splitFunc = unionAllSplit},
2133
  {.pName = "UnionDistinctSplit",     .splitFunc = unionDistinctSplit},
2134
  {.pName = "SmaIndexSplit",          .splitFunc = smaIndexSplit}, // not used yet
2135
  {.pName = "InsertSelectSplit",      .splitFunc = insertSelectSplit},
2136
  {.pName = "VirtualtableSplit",      .splitFunc = virtualTableSplit},
2137
  {.pName = "MergeAggColsSplit",      .splitFunc = mergeAggColsSplit},
2138
  {.pName = "DynVirtualScanSplit",    .splitFunc = dynVirtualScanSplit},
2139
};
2140
// clang-format on
2141

2142
static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule));
2143

2144
static int32_t dumpLogicSubplan(const char* pRuleName, SLogicSubplan* pSubplan) {
30,803✔
2145
  int32_t code = 0;
30,803✔
2146
  if (!tsQueryPlannerTrace) {
30,803✔
2147
    return code;
29,597✔
2148
  }
2149
  char* pStr = NULL;
1,206✔
2150
  code = nodesNodeToString((SNode*)pSubplan, false, &pStr, NULL);
1,206✔
2151
  if (TSDB_CODE_SUCCESS == code) {
1,209!
2152
    if (NULL == pRuleName) {
1,209✔
2153
      qDebugL("before split, JsonPlan: %s", pStr);
813!
2154
    } else {
2155
      qDebugL("apply split %s rule, JsonPlan: %s", pRuleName, pStr);
396!
2156
    }
2157
    taosMemoryFree(pStr);
1,209!
2158
  }
2159
  return code;
1,209✔
2160
}
2161

2162
static int32_t applySplitRule(SPlanContext* pCxt, SLogicSubplan* pSubplan) {
20,839✔
2163
  SSplitContext cxt = {
20,839✔
2164
      .pPlanCxt = pCxt, .queryId = pSubplan->id.queryId, .groupId = pSubplan->id.groupId + 1, .split = false};
20,839✔
2165
  bool    split = false;
20,839✔
2166
  int32_t code =TSDB_CODE_SUCCESS;
20,839✔
2167
  PLAN_ERR_RET(dumpLogicSubplan(NULL, pSubplan));
20,839!
2168
  do {
2169
    split = false;
30,049✔
2170
    for (int32_t i = 0; i < splitRuleNum; ++i) {
300,386✔
2171
      cxt.split = false;
270,377✔
2172
      PLAN_ERR_RET(splitRuleSet[i].splitFunc(&cxt, pSubplan));
270,377!
2173
      if (cxt.split) {
270,291✔
2174
        split = true;
9,968✔
2175
        PLAN_ERR_RET(dumpLogicSubplan(splitRuleSet[i].pName, pSubplan));
9,968!
2176
      }
2177
    }
2178
  } while (split);
30,009✔
2179

2180
  PLAN_ERR_RET(streamScanSplit(&cxt, pSubplan));
20,831!
2181
  PLAN_RET(qnodeSplit(&cxt, pSubplan));
20,835!
2182
}
2183

2184
static void setVgroupsInfo(SLogicNode* pNode, SLogicSubplan* pSubplan) {
122,163✔
2185
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
122,163✔
2186
    TSWAP(((SScanLogicNode*)pNode)->pVgroupList, pSubplan->pVgroupList);
1,018✔
2187
    return;
1,018✔
2188
  } else if (QUERY_NODE_LOGIC_PLAN_VIRTUAL_TABLE_SCAN == nodeType(pNode)) {
121,145!
2189
    // do nothing, since virtual table scan node is SUBPLAN_TYPE_MERGE
2190
    return;
×
2191
  }
2192

2193
  SNode* pChild;
2194
  FOREACH(pChild, pNode->pChildren) { setVgroupsInfo((SLogicNode*)pChild, pSubplan); }
123,181✔
2195
}
2196

2197
static bool needSplitSubplan(SLogicSubplan* pLogicSubplan) {
140,914✔
2198
  if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY != nodeType(pLogicSubplan->pNode)) {
140,914✔
2199
    return true;
20,829✔
2200
  }
2201
  SVnodeModifyLogicNode* pModify = (SVnodeModifyLogicNode*)pLogicSubplan->pNode;
120,085✔
2202
  return (MODIFY_TABLE_TYPE_INSERT == pModify->modifyType && NULL != pModify->node.pChildren);
120,085✔
2203
}
2204

2205
int32_t splitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) {
140,870✔
2206
  if (!needSplitSubplan(pLogicSubplan)) {
140,870✔
2207
    setVgroupsInfo(pLogicSubplan->pNode, pLogicSubplan);
120,168✔
2208
    return TSDB_CODE_SUCCESS;
120,131✔
2209
  }
2210
  return applySplitRule(pCxt, pLogicSubplan);
20,805✔
2211
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc