• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3903

24 Apr 2025 11:36AM UTC coverage: 55.307% (+0.09%) from 55.213%
#3903

push

travis-ci

happyguoxy
Sync branches at 2025-04-24 19:35

175024 of 316459 relevant lines covered (55.31%)

1151858.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.16
/source/libs/planner/src/planSpliter.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "functionMgt.h"
17
#include "planInt.h"
18
#include "tglobal.h"
19

20
// Builds a single-bit mask. The argument is parenthesized so that expressions
// containing operators with lower precedence than '<<' expand as intended.
#define SPLIT_FLAG_MASK(n) (1 << (n))

// Bits kept in a subplan's splitFlag field.
#define SPLIT_FLAG_STABLE_SPLIT SPLIT_FLAG_MASK(0)
#define SPLIT_FLAG_INSERT_SPLIT SPLIT_FLAG_MASK(1)

// Set / test helpers; both expand to a single fully parenthesized expression.
#define SPLIT_FLAG_SET_MASK(val, mask)  ((val) |= (mask))
#define SPLIT_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0)
27

28
typedef struct SSplitContext {
29
  SPlanContext* pPlanCxt;
30
  uint64_t      queryId;
31
  int32_t       groupId;
32
  bool          split;
33
} SSplitContext;
34

35
typedef int32_t (*FSplit)(SSplitContext* pCxt, SLogicSubplan* pSubplan);
36

37
typedef struct SSplitRule {
38
  char*  pName;
39
  FSplit splitFunc;
40
} SSplitRule;
41

42
typedef struct SFindSplitNodeCtx {
43
  const SSplitContext* pSplitCtx;
44
  const SLogicSubplan* pSubplan;
45
} SFindSplitNodeCtx;
46

47
typedef bool (*FSplFindSplitNode)(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode, void* pInfo);
48

49
static int32_t stbSplCreateMergeKeys(SNodeList* pSortKeys, SNodeList* pTargets, SNodeList** pOutput);
50

51
// Moves the vgroup list of the scan found under pNode into pSubplan by swapping the two
// pointers. Only a chain of single-child nodes is followed; nothing happens when the
// chain forks or ends without a matching node.
static void splSetSubplanVgroups(SLogicSubplan* pSubplan, SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
    TSWAP(pSubplan->pVgroupList, ((SScanLogicNode*)pNode)->pVgroupList);
    return;
  }

  if (QUERY_NODE_LOGIC_PLAN_VIRTUAL_TABLE_SCAN == nodeType(pNode)) {
    // do nothing, since virtual table scan node is SUBPLAN_TYPE_MERGE
    return;
  }

  if (QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL == nodeType(pNode) &&
      DYN_QTYPE_VTB_SCAN == ((SDynQueryCtrlLogicNode*)pNode)->qType) {
    TSWAP(pSubplan->pVgroupList, ((SDynQueryCtrlLogicNode*)pNode)->vtbScan.pVgroupList);
    return;
  }

  // Any other node: descend only when there is exactly one child.
  if (1 == LIST_LENGTH(pNode->pChildren)) {
    SLogicNode* pOnlyChild = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
    splSetSubplanVgroups(pSubplan, pOnlyChild);
  }
}
2,112✔
64

65
// Creates a subplan rooted at pNode and marks it with `flag` in splitFlag.
// Returns NULL when the subplan node cannot be created; the status returned by
// nodesMakeNode() is stored in terrno in every case.
static SLogicSubplan* splCreateScanSubplan(SSplitContext* pCxt, SLogicNode* pNode, int32_t flag) {
  SLogicSubplan* pNew = NULL;
  terrno = nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN, (SNode**)&pNew);
  if (NULL == pNew) {
    return NULL;
  }

  pNew->id.queryId = pCxt->queryId;
  pNew->id.groupId = pCxt->groupId;

  // TODO(smj):refact here.
  if (QUERY_NODE_LOGIC_PLAN_VIRTUAL_TABLE_SCAN == nodeType(pNode)) {
    pNew->subplanType = SUBPLAN_TYPE_MERGE;
  } else {
    pNew->subplanType = SUBPLAN_TYPE_SCAN;
  }

  // pNode becomes the root of the new subplan, so it no longer has a parent.
  pNew->pNode = pNode;
  pNode->pParent = NULL;

  splSetSubplanVgroups(pNew, pNode);
  SPLIT_FLAG_SET_MASK(pNew->splitFlag, flag);
  return pNew;
}
81

82
// Returns true when pNode, or a node reached by repeatedly taking the FIRST child,
// is a scan node.
// NOTE(review): the loop returns during its first iteration, so siblings after the first
// child are never inspected - confirm this is intended for multi-child nodes.
static bool splHasScan(SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
    return true;
  }

  SNode* pFirst = NULL;
  FOREACH(pFirst, pNode->pChildren) {
    // The recursive call reports a scan child itself, so no separate type test is needed here.
    return splHasScan((SLogicNode*)pFirst);
  }

  // No children at all.
  return false;
}
97

98
// Classifies the subplan as SCAN when splHasScan() finds a scan under its root, MERGE otherwise.
static void splSetSubplanType(SLogicSubplan* pSubplan) {
  if (splHasScan(pSubplan->pNode)) {
    pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
  } else {
    pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  }
}
718✔
101

102
// Creates a subplan rooted at pNode and returns it through ppSubplan.
// The subplan type is derived from the node tree via splSetSubplanType().
// When the subplan node cannot be created, the status from nodesMakeNode() is returned
// and *ppSubplan is left untouched.
static int32_t splCreateSubplan(SSplitContext* pCxt, SLogicNode* pNode, SLogicSubplan** ppSubplan) {
  SLogicSubplan* pNew = NULL;
  int32_t        code = nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN, (SNode**)&pNew);
  if (NULL == pNew) {
    return code;
  }

  pNew->id.queryId = pCxt->queryId;
  pNew->id.groupId = pCxt->groupId;

  // pNode becomes the subplan root and is detached from its former parent.
  pNode->pParent = NULL;
  pNew->pNode = pNode;
  splSetSubplanType(pNew);

  *ppSubplan = pNew;
  return code;
}
116

117
// Creates an exchange node that stands in for pChild: it copies pChild's precision,
// dynamicOp flag, targets and (if present) limit, and uses the current group id as
// both ends of its source group range.
//
// Side effect on pChild: when pChild has a limit with an offset, the offset is folded
// into the limit value and then reset to 0; the exchange node keeps a clone of the
// original limit/offset taken before that rewrite.
//
// On success the node is returned through pOutput. On failure the partially built
// node is destroyed and *pOutput is not written.
static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pChild, SExchangeLogicNode** pOutput) {
  SExchangeLogicNode* pNew = NULL;
  int32_t             code = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE, (SNode**)&pNew);
  if (NULL == pNew) {
    return code;
  }

  pNew->srcStartGroupId = pCxt->groupId;
  pNew->srcEndGroupId = pCxt->groupId;
  pNew->node.precision = pChild->precision;
  pNew->node.dynamicOp = pChild->dynamicOp;

  pNew->node.pTargets = NULL;
  code = nodesCloneList(pChild->pTargets, &pNew->node.pTargets);
  if (NULL == pNew->node.pTargets) {
    nodesDestroyNode((SNode*)pNew);
    return code;
  }

  if (NULL != pChild->pLimit) {
    pNew->node.pLimit = NULL;
    code = nodesCloneNode(pChild->pLimit, &pNew->node.pLimit);
    if (NULL == pNew->node.pLimit) {
      nodesDestroyNode((SNode*)pNew);
      return code;
    }

    // Rewrite the child's limit only after the clone above has captured the original values.
    SLimitNode* pChildLimit = (SLimitNode*)pChild->pLimit;
    if (pChildLimit->offset) {
      if (pChildLimit->limit) {
        pChildLimit->limit->datum.i += pChildLimit->offset->datum.i;
      }
      pChildLimit->offset->datum.i = 0;
    }
  }

  *pOutput = pNew;
  return TSDB_CODE_SUCCESS;
}
152

153
static int32_t splCreateExchangeNodeForSubplan(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode,
742✔
154
                                               ESubplanType subplanType, bool seqScan) {
155
  SExchangeLogicNode* pExchange = NULL;
742✔
156
  int32_t             code = splCreateExchangeNode(pCxt, pSplitNode, &pExchange);
742✔
157
  if (TSDB_CODE_SUCCESS == code) {
742✔
158
    pExchange->seqRecvData = seqScan;
742✔
159
    code = replaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pExchange);
742✔
160
  }
161
  if (TSDB_CODE_SUCCESS == code) {
742✔
162
    pSubplan->subplanType = subplanType;
742✔
163
  } else {
164
    nodesDestroyNode((SNode*)pExchange);
×
165
  }
166
  return code;
742✔
167
}
168

169
// Returns true when the tree under pLogicNode contains an exchange or merge node whose
// source group id range includes groupId. The search stops at the first exchange/merge
// node met on each path; their own children are not examined.
static bool splIsChildSubplan(SLogicNode* pLogicNode, int32_t groupId) {
  if (QUERY_NODE_LOGIC_PLAN_EXCHANGE == nodeType(pLogicNode)) {
    SExchangeLogicNode* pExchange = (SExchangeLogicNode*)pLogicNode;
    return groupId >= pExchange->srcStartGroupId && groupId <= pExchange->srcEndGroupId;
  }

  if (QUERY_NODE_LOGIC_PLAN_MERGE == nodeType(pLogicNode)) {
    SMergeLogicNode* pMerge = (SMergeLogicNode*)pLogicNode;
    return pMerge->srcGroupId <= groupId && pMerge->srcEndGroupId >= groupId;
  }

  SNode* pSub;
  FOREACH(pSub, pLogicNode->pChildren) {
    if (splIsChildSubplan((SLogicNode*)pSub, groupId)) {
      return true;
    }
  }
  return false;
}
189

190
// Moves every subplan in pChildren that feeds pParent (according to splIsChildSubplan())
// from pChildren into pParent->pChildren. Entries that do not belong to pParent stay in
// pChildren. Returns the first append failure, otherwise TSDB_CODE_SUCCESS.
static int32_t splMountSubplan(SLogicSubplan* pParent, SNodeList* pChildren) {
  SNode* pChild = NULL;
  WHERE_EACH(pChild, pChildren) {
    if (!splIsChildSubplan(pParent->pNode, ((SLogicSubplan*)pChild)->id.groupId)) {
      WHERE_NEXT;
      continue;
    }

    int32_t code = nodesListMakeAppend(&pParent->pChildren, pChild);
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }

    // The subplan now lives in pParent's list: clear the cell's node pointer before
    // erasing the cell from pChildren.
    REPLACE_NODE(NULL);
    ERASE_NODE(pChildren);
  }
  return TSDB_CODE_SUCCESS;
}
207

208
// Pre-order search of the node tree under pNode. Returns true as soon as `func` accepts a
// node; nodes already flagged splitDone are not offered to `func`, but their children
// are still visited.
static bool splMatchByNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode, FSplFindSplitNode func,
                           void* pInfo) {
  if (!pNode->splitDone) {
    if (func(pCxt, pSubplan, pNode, pInfo)) {
      return true;
    }
  }

  SNode* pSub;
  FOREACH(pSub, pNode->pChildren) {
    bool found = splMatchByNode(pCxt, pSubplan, (SLogicNode*)pSub, func, pInfo);
    if (found) {
      return true;
    }
  }
  return false;
}
221

222
// Searches pSubplan and, recursively, its child subplans for a node accepted by `func`.
// A subplan whose splitFlag already has `flag` set is not searched itself, but its child
// subplans still are.
static bool splMatch(SSplitContext* pCxt, SLogicSubplan* pSubplan, int32_t flag, FSplFindSplitNode func, void* pInfo) {
  if (!SPLIT_FLAG_TEST_MASK(pSubplan->splitFlag, flag) &&
      splMatchByNode(pCxt, pSubplan, pSubplan->pNode, func, pInfo)) {
    return true;
  }

  SNode* pSub;
  FOREACH(pSub, pSubplan->pChildren) {
    bool found = splMatch(pCxt, (SLogicSubplan*)pSub, flag, func, pInfo);
    if (found) {
      return true;
    }
  }
  return false;
}
236

237
// Points the pParent field of every direct child of pNode back at pNode.
static void splSetParent(SLogicNode* pNode) {
  SNode* pIter = NULL;
  FOREACH(pIter, pNode->pChildren) {
    SLogicNode* pChildNode = (SLogicNode*)pIter;
    pChildNode->pParent = pNode;
  }
}
449✔
241

242
typedef struct SStableSplitInfo {
243
  SLogicNode*    pSplitNode;
244
  SLogicSubplan* pSubplan;
245
} SStableSplitInfo;
246

247
static bool stbSplHasGatherExecFunc(const SNodeList* pFuncs) {
5,066✔
248
  SNode* pFunc = NULL;
5,066✔
249
  FOREACH(pFunc, pFuncs) {
22,074✔
250
    if (!fmIsWindowPseudoColumnFunc(((SFunctionNode*)pFunc)->funcId) &&
17,426✔
251
        !fmIsDistExecFunc(((SFunctionNode*)pFunc)->funcId)) {
17,384✔
252
      return true;
423✔
253
    }
254
  }
255
  return false;
4,648✔
256
}
257

258
// A scan needs splitting when it covers more than one vgroup or when its needSplit
// flag is set. streamQuery is accepted for signature symmetry but not used.
static bool stbSplIsMultiTbScan(bool streamQuery, SScanLogicNode* pScan) {
  if (NULL != pScan->pVgroupList && pScan->pVgroupList->numOfVgroups > 1) {
    return true;
  }
  return pScan->needSplit;
}
261

262
// Returns true when pNode has exactly one child which is (optionally behind a single
// partition node with exactly one child) a scan that stbSplIsMultiTbScan() accepts.
// For aggregate nodes and interval windows the scan's needSplit flag is returned when
// the multi-table test did not already succeed.
static bool stbSplHasMultiTbScan(bool streamQuery, SLogicNode* pNode) {
  if (1 != LIST_LENGTH(pNode->pChildren)) {
    return false;
  }

  SNode* pChild = nodesListGetNode(pNode->pChildren, 0);
  if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pChild)) {
    // Look through the partition node at its only child.
    SLogicNode* pPartition = (SLogicNode*)pChild;
    if (1 != LIST_LENGTH(pPartition->pChildren)) {
      return false;
    }
    pChild = nodesListGetNode(pPartition->pChildren, 0);
  }

  if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pChild)) {
    return false;
  }

  SScanLogicNode* pScan = (SScanLogicNode*)pChild;
  if (stbSplIsMultiTbScan(streamQuery, pScan)) {
    return true;
  }

  bool isAgg = (QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode));
  bool isIntervalWindow = (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode) &&
                           WINDOW_TYPE_INTERVAL == ((SWindowLogicNode*)pNode)->winType);
  if (isAgg || isIntervalWindow) {
    return pScan->needSplit;
  }
  return false;
}
285

286
// Returns true when pNode's only child is a scan that stbSplIsMultiTbScan() accepts.
static bool stbSplIsMultiTbScanChild(bool streamQuery, SLogicNode* pNode) {
  if (1 != LIST_LENGTH(pNode->pChildren)) {
    return false;
  }

  SNode* pOnlyChild = nodesListGetNode(pNode->pChildren, 0);
  if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pOnlyChild)) {
    return false;
  }
  return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pOnlyChild);
}
293

294
// Decides whether a window node must be split, depending on its window type:
//   interval       - only when no gather-exec function is present and a multi-table scan feeds it
//   session        - batch query: multi-table scan; stream query: same rule as interval
//   state / count  - batch query: multi-table scan; stream query: never
//   anything else  - never
static bool stbSplNeedSplitWindow(bool streamQuery, SLogicNode* pNode) {
  SWindowLogicNode* pWin = (SWindowLogicNode*)pNode;

  switch (pWin->winType) {
    case WINDOW_TYPE_INTERVAL:
      if (stbSplHasGatherExecFunc(pWin->pFuncs)) {
        return false;
      }
      return stbSplHasMultiTbScan(streamQuery, pNode);

    case WINDOW_TYPE_SESSION:
      // Only stream queries care about gather-exec functions here.
      if (streamQuery && stbSplHasGatherExecFunc(pWin->pFuncs)) {
        return false;
      }
      return stbSplHasMultiTbScan(streamQuery, pNode);

    case WINDOW_TYPE_STATE:
    case WINDOW_TYPE_COUNT:
      return !streamQuery && stbSplHasMultiTbScan(streamQuery, pNode);

    default:
      return false;
  }
}
318

319
// A join is split only when it is neither a single-table join nor a hash join and every
// direct child is a scan or another join. streamQuery is not used.
static bool stbSplNeedSplitJoin(bool streamQuery, SJoinLogicNode* pJoin) {
  if (pJoin->isSingleTableJoin) {
    return false;
  }
  if (JOIN_ALGO_HASH == pJoin->joinAlgo) {
    return false;
  }

  SNode* pIter = NULL;
  FOREACH(pIter, pJoin->node.pChildren) {
    bool isScan = (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pIter));
    bool isJoin = (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pIter));
    if (!isScan && !isJoin) {
      return false;
    }
  }
  return true;
}
331

332
// Returns true when pNode's only child is (optionally behind a single partition node
// with exactly one child) a scan whose scanType is SCAN_TYPE_TABLE_COUNT.
static bool stbSplIsTableCountQuery(SLogicNode* pNode) {
  if (1 != LIST_LENGTH(pNode->pChildren)) {
    return false;
  }

  SNode* pChild = nodesListGetNode(pNode->pChildren, 0);
  if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pChild)) {
    SLogicNode* pPartition = (SLogicNode*)pChild;
    if (1 != LIST_LENGTH(pPartition->pChildren)) {
      return false;
    }
    pChild = nodesListGetNode(pPartition->pChildren, 0);
  }

  if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pChild)) {
    return false;
  }
  return SCAN_TYPE_TABLE_COUNT == ((SScanLogicNode*)pChild)->scanType;
}
345

346
// Top-level predicate of the super-table split rule: tells whether pNode should be split.
// Nothing is split for virtual super-table queries. Scan and partition nodes are never
// split for stream queries.
static bool stbSplNeedSplit(SFindSplitNodeCtx* pCtx, SLogicNode* pNode) {
  const SPlanContext* pPlanCxt = pCtx->pSplitCtx->pPlanCxt;
  if (pPlanCxt->virtualStableQuery) {
    return false;
  }

  bool streamQuery = pPlanCxt->streamQuery;
  switch (nodeType(pNode)) {
    case QUERY_NODE_LOGIC_PLAN_SCAN:
      return !streamQuery && stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode);

    case QUERY_NODE_LOGIC_PLAN_JOIN:
      return stbSplNeedSplitJoin(streamQuery, (SJoinLogicNode*)pNode);

    case QUERY_NODE_LOGIC_PLAN_PARTITION:
      return !streamQuery && stbSplIsMultiTbScanChild(streamQuery, pNode);

    case QUERY_NODE_LOGIC_PLAN_AGG: {
      SAggLogicNode* pAgg = (SAggLogicNode*)pNode;
      // A gather-exec function blocks the split unless this is a partitioned-table aggregate.
      if (stbSplHasGatherExecFunc(pAgg->pAggFuncs) && !isPartTableAgg(pAgg)) {
        return false;
      }
      if (!stbSplHasMultiTbScan(streamQuery, pNode)) {
        return false;
      }
      return !stbSplIsTableCountQuery(pNode);
    }

    case QUERY_NODE_LOGIC_PLAN_WINDOW:
      return stbSplNeedSplitWindow(streamQuery, pNode);

    case QUERY_NODE_LOGIC_PLAN_SORT:
      return stbSplHasMultiTbScan(streamQuery, pNode);

    default:
      return false;
  }
}
371

372
static bool stbSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
14,690✔
373
                                SStableSplitInfo* pInfo) {
374
  SFindSplitNodeCtx ctx = {.pSplitCtx = pCxt, .pSubplan = pSubplan};
14,690✔
375
  if (stbSplNeedSplit(&ctx, pNode)) {
14,690✔
376
    pInfo->pSplitNode = pNode;
1,144✔
377
    pInfo->pSubplan = pSubplan;
1,144✔
378
    return true;
1,144✔
379
  }
380
  return false;
13,547✔
381
}
382

383
// Splits every function in pFuncs into its distributed parts and appends them to the
// output lists:
//   *pPartialFuncs - the partial function
//   *pMidFuncs     - the middle function (only when pMidFuncs is not NULL)
//   *pMergeFuncs   - the merge function
// Window pseudo column functions are cloned as-is into each requested list; all other
// functions are decomposed by fmGetDistMethod().
// Returns the first failing status, or TSDB_CODE_SUCCESS. Entries appended before a
// failure remain in the output lists.
static int32_t stbSplRewriteFuns(const SNodeList* pFuncs, SNodeList** pPartialFuncs, SNodeList** pMidFuncs, SNodeList** pMergeFuncs) {
  SNode* pNode = NULL;
  FOREACH(pNode, pFuncs) {
    SFunctionNode* pFunc = (SFunctionNode*)pNode;
    SFunctionNode* pPartFunc = NULL;
    SFunctionNode* pMidFunc = NULL;
    SFunctionNode* pMergeFunc = NULL;
    int32_t        code = TSDB_CODE_SUCCESS;
    if (fmIsWindowPseudoColumnFunc(pFunc->funcId)) {
      code = nodesCloneNode(pNode, (SNode**)&pPartFunc);
      if (TSDB_CODE_SUCCESS == code) {
        code = nodesCloneNode(pNode, (SNode**)&pMergeFunc);
      }
      if (TSDB_CODE_SUCCESS == code && pMidFuncs != NULL) {
        // A failed clone is reported through `code` and handled by the common cleanup
        // below; there is nothing to release here because pMidFunc is still NULL.
        code = nodesCloneNode(pNode, (SNode**)&pMidFunc);
      }
    } else {
      code = fmGetDistMethod(pFunc, &pPartFunc, &pMidFunc, &pMergeFunc);
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = nodesListMakeStrictAppend(pPartialFuncs, (SNode*)pPartFunc);
    }
    if (TSDB_CODE_SUCCESS == code) {
      if (pMidFuncs != NULL) {
        code = nodesListMakeStrictAppend(pMidFuncs, (SNode*)pMidFunc);
      } else {
        // Caller does not want a middle stage: drop whatever fmGetDistMethod() produced.
        nodesDestroyNode((SNode*)pMidFunc);
      }
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = nodesListMakeStrictAppend(pMergeFuncs, (SNode*)pMergeFunc);
    }
    if (TSDB_CODE_SUCCESS != code) {
      // NOTE(review): nodes that were already handed to a *StrictAppend call are destroyed
      // here as well - confirm the ownership rules of nodesListMakeStrictAppend().
      nodesDestroyNode((SNode*)pPartFunc);
      nodesDestroyNode((SNode*)pMidFunc);
      nodesDestroyNode((SNode*)pMergeFunc);
      return code;
    }
  }
  return TSDB_CODE_SUCCESS;
}
427

428
// Makes sure pFuncs contains a _wstart function and reports its position through pIndex.
// When one is already present its index is returned and the list is not modified.
// Otherwise a new _wstart function node with the given precision is created, given a
// hashed alias derived from its name and address, and appended at the end of pFuncs.
//
// Returns TSDB_CODE_SUCCESS or the first failing status. Once the new node has been
// created, *pIndex is written even when a later step fails.
static int32_t stbSplAppendWStart(SNodeList* pFuncs, int32_t* pIndex, uint8_t precision) {
  int32_t index = 0;
  SNode*  pFunc = NULL;
  FOREACH(pFunc, pFuncs) {
    if (FUNCTION_TYPE_WSTART == ((SFunctionNode*)pFunc)->funcType) {
      *pIndex = index;
      return TSDB_CODE_SUCCESS;
    }
    ++index;
  }

  // Not found: `index` now equals the list length, i.e. the position of the node appended below.
  SFunctionNode* pWStart = NULL;
  int32_t code = nodesMakeNode(QUERY_NODE_FUNCTION, (SNode**)&pWStart);
  if (NULL == pWStart) {
    return code;
  }
  tstrncpy(pWStart->functionName, "_wstart", TSDB_FUNC_NAME_LEN);
  int64_t pointer = (int64_t)pWStart;
  char name[TSDB_COL_NAME_LEN + TSDB_POINTER_PRINT_BYTES + TSDB_NAME_DELIMITER_LEN + 1] = {0};
  int32_t len = tsnprintf(name, sizeof(name) - 1, "%s.%" PRId64, pWStart->functionName, pointer);
  (void)taosHashBinary(name, len);
  tstrncpy(pWStart->node.aliasName, name, TSDB_COL_NAME_LEN);
  pWStart->node.resType.precision = precision;
  *pIndex = index;

  code = fmGetFuncInfo(pWStart, NULL, 0);
  if (TSDB_CODE_SUCCESS != code) {
    // The node was never handed to the list, so it must be released here.
    nodesDestroyNode((SNode*)pWStart);
    return code;
  }
  // NOTE(review): presumably nodesListStrictAppend() releases the node itself on failure - confirm.
  return nodesListStrictAppend(pFuncs, (SNode*)pWStart);
}
459

460
// Makes sure pWin->pFuncs contains a _wend function and reports its position through pIndex.
// When one is already present its index is returned and nothing is modified.
// Otherwise a new _wend function node is created, given a hashed alias derived from its
// name and address, appended to pWin->pFuncs, and a matching column is added to the
// window's targets.
//
// Returns TSDB_CODE_SUCCESS or the first failing status. Once the new node has been
// created, *pIndex is written even when a later step fails.
static int32_t stbSplAppendWEnd(SWindowLogicNode* pWin, int32_t* pIndex) {
  int32_t index = 0;
  SNode*  pFunc = NULL;
  FOREACH(pFunc, pWin->pFuncs) {
    if (FUNCTION_TYPE_WEND == ((SFunctionNode*)pFunc)->funcType) {
      *pIndex = index;
      return TSDB_CODE_SUCCESS;
    }
    ++index;
  }

  // Not found: `index` now equals the list length, i.e. the position of the node appended below.
  SFunctionNode* pWEnd = NULL;
  int32_t code = nodesMakeNode(QUERY_NODE_FUNCTION, (SNode**)&pWEnd);
  if (NULL == pWEnd) {
    return code;
  }
  tstrncpy(pWEnd->functionName, "_wend", TSDB_FUNC_NAME_LEN);
  int64_t pointer = (int64_t)pWEnd;
  char name[TSDB_COL_NAME_LEN + TSDB_POINTER_PRINT_BYTES + TSDB_NAME_DELIMITER_LEN + 1] = {0};
  int32_t len = tsnprintf(name, sizeof(name) - 1, "%s.%" PRId64, pWEnd->functionName, pointer);
  (void)taosHashBinary(name, len);
  tstrncpy(pWEnd->node.aliasName, name, TSDB_COL_NAME_LEN);
  *pIndex = index;

  code = fmGetFuncInfo(pWEnd, NULL, 0);
  if (TSDB_CODE_SUCCESS != code) {
    // The node was never handed to the list, so it must be released here.
    nodesDestroyNode((SNode*)pWEnd);
    return code;
  }
  // NOTE(review): presumably nodesListStrictAppend() releases the node itself on failure - confirm.
  code = nodesListStrictAppend(pWin->pFuncs, (SNode*)pWEnd);
  if (TSDB_CODE_SUCCESS == code) {
    code = createColumnByRewriteExpr(nodesListGetNode(pWin->pFuncs, index), &pWin->node.pTargets);
  }
  return code;
}
493

494
// Splits a window node into a partial window (returned through pPartWindow) and a merge
// window (pMergeWindow itself, rewritten in place):
//   - the partial window is a clone of pMergeWindow without functions, targets, children
//     and conditions; it then takes over the original children and the partial functions;
//   - pMergeWindow keeps its targets and conditions, receives the merge functions, and its
//     pTspk is replaced by a clone of the partial window's _wstart target column.
//
// If the initial clone fails, pMergeWindow is restored to its original state. Returns
// TSDB_CODE_SUCCESS or the first failing status; on later failures the partial window
// (together with the children moved into it) is destroyed.
static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogicNode** pPartWindow) {
  // Temporarily detach the parts that must not be cloned into the partial window.
  SNodeList* pFunc = pMergeWindow->pFuncs;
  pMergeWindow->pFuncs = NULL;
  SNodeList* pTargets = pMergeWindow->node.pTargets;
  pMergeWindow->node.pTargets = NULL;
  SNodeList* pChildren = pMergeWindow->node.pChildren;
  pMergeWindow->node.pChildren = NULL;
  SNode* pConditions = pMergeWindow->node.pConditions;
  pMergeWindow->node.pConditions = NULL;

  SWindowLogicNode* pPartWin = NULL;
  int32_t code = nodesCloneNode((SNode*)pMergeWindow, (SNode**)&pPartWin);
  if (NULL == pPartWin) {
    // Nothing has been built yet: re-attach everything so the caller's node is unchanged
    // and the detached lists are not lost.
    pMergeWindow->pFuncs = pFunc;
    pMergeWindow->node.pTargets = pTargets;
    pMergeWindow->node.pChildren = pChildren;
    pMergeWindow->node.pConditions = pConditions;
    return code;
  }

  pPartWin->node.groupAction = GROUP_ACTION_KEEP;
  pMergeWindow->node.pTargets = pTargets;
  pMergeWindow->node.pConditions = pConditions;
  pPartWin->node.pChildren = pChildren;
  splSetParent((SLogicNode*)pPartWin);

  int32_t index = 0;
  code = stbSplRewriteFuns(pFunc, &pPartWin->pFuncs, NULL, &pMergeWindow->pFuncs);
  if (TSDB_CODE_SUCCESS == code) {
    code = stbSplAppendWStart(pPartWin->pFuncs, &index, ((SColumnNode*)pMergeWindow->pTspk)->node.resType.precision);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = createColumnByRewriteExprs(pPartWin->pFuncs, &pPartWin->node.pTargets);
  }
  if (TSDB_CODE_SUCCESS == code) {
    // The merge window's primary timestamp becomes the partial window's _wstart output column.
    nodesDestroyNode(pMergeWindow->pTspk);
    pMergeWindow->pTspk = NULL;
    code = nodesCloneNode(nodesListGetNode(pPartWin->node.pTargets, index), &pMergeWindow->pTspk);
  }

  // The original function list has been replaced by the partial/merge lists.
  nodesDestroyList(pFunc);
  if (TSDB_CODE_SUCCESS == code) {
    *pPartWindow = (SLogicNode*)pPartWin;
  } else {
    nodesDestroyNode((SNode*)pPartWin);
  }

  return code;
}
539

540
// Splits a window node into three stages: a partial window (pPartWindow), a middle window
// (pMidWindow) and the merge window (pMergeWindow itself, rewritten in place).
//   - partial and middle windows are clones of pMergeWindow without functions, targets,
//     children and conditions; the partial window takes over the original children;
//   - each stage receives its share of the functions from stbSplRewriteFuns();
//   - the middle window's pTspk is the partial window's _wstart target column, and the
//     merge window's pTspk is the middle window's _wstart target column.
//
// If either initial clone fails, pMergeWindow is restored to its original state. Returns
// TSDB_CODE_SUCCESS or the first failing status; on later failures both new windows are
// destroyed.
static int32_t stbSplCreatePartMidWindowNode(SWindowLogicNode* pMergeWindow, SLogicNode** pPartWindow, SLogicNode** pMidWindow) {
  // Temporarily detach the parts that must not be cloned into the new windows.
  SNodeList* pFunc = pMergeWindow->pFuncs;
  pMergeWindow->pFuncs = NULL;
  SNodeList* pTargets = pMergeWindow->node.pTargets;
  pMergeWindow->node.pTargets = NULL;
  SNodeList* pChildren = pMergeWindow->node.pChildren;
  pMergeWindow->node.pChildren = NULL;
  SNode* pConditions = pMergeWindow->node.pConditions;
  pMergeWindow->node.pConditions = NULL;

  SWindowLogicNode* pPartWin = NULL;
  SWindowLogicNode* pMidWin = NULL;
  int32_t code = nodesCloneNode((SNode*)pMergeWindow, (SNode**)&pPartWin);
  if (NULL != pPartWin) {
    code = nodesCloneNode((SNode*)pMergeWindow, (SNode**)&pMidWin);
  }
  if (NULL == pPartWin || NULL == pMidWin) {
    // Nothing has been moved yet: drop the clone (if any) and re-attach everything so the
    // caller's node is unchanged and the detached lists are not lost.
    nodesDestroyNode((SNode*)pPartWin);
    pMergeWindow->pFuncs = pFunc;
    pMergeWindow->node.pTargets = pTargets;
    pMergeWindow->node.pChildren = pChildren;
    pMergeWindow->node.pConditions = pConditions;
    return code;
  }

  pPartWin->node.groupAction = GROUP_ACTION_KEEP;
  pMidWin->node.groupAction = GROUP_ACTION_KEEP;
  pMergeWindow->node.pTargets = pTargets;
  pMergeWindow->node.pConditions = pConditions;

  pPartWin->node.pChildren = pChildren;
  splSetParent((SLogicNode*)pPartWin);

  SNodeList* pFuncPart = NULL;
  SNodeList* pFuncMid = NULL;
  SNodeList* pFuncMerge = NULL;
  code = stbSplRewriteFuns(pFunc, &pFuncPart, &pFuncMid, &pFuncMerge);
  // The lists are attached even on failure so that they are released with their windows.
  pPartWin->pFuncs = pFuncPart;
  pMidWin->pFuncs = pFuncMid;
  pMergeWindow->pFuncs = pFuncMerge;

  int32_t index = 0;
  if (TSDB_CODE_SUCCESS == code) {
    code = stbSplAppendWStart(pPartWin->pFuncs, &index, ((SColumnNode*)pMergeWindow->pTspk)->node.resType.precision);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = createColumnByRewriteExprs(pPartWin->pFuncs, &pPartWin->node.pTargets);
  }

  if (TSDB_CODE_SUCCESS == code) {
    // Middle window reads its timestamp from the partial window's _wstart column.
    nodesDestroyNode(pMidWin->pTspk);
    pMidWin->pTspk = NULL;
    code = nodesCloneNode(nodesListGetNode(pPartWin->node.pTargets, index), &pMidWin->pTspk);
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = stbSplAppendWStart(pMidWin->pFuncs, &index, ((SColumnNode*)pMergeWindow->pTspk)->node.resType.precision);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = createColumnByRewriteExprs(pMidWin->pFuncs, &pMidWin->node.pTargets);
  }

  if (TSDB_CODE_SUCCESS == code) {
    // Merge window reads its timestamp from the middle window's _wstart column.
    // The pointer is cleared first so it never dangles if the clone fails.
    nodesDestroyNode(pMergeWindow->pTspk);
    pMergeWindow->pTspk = NULL;
    code = nodesCloneNode(nodesListGetNode(pMidWin->node.pTargets, index), &pMergeWindow->pTspk);
  }

  // The original function list has been replaced by the per-stage lists.
  nodesDestroyList(pFunc);
  if (TSDB_CODE_SUCCESS == code) {
    *pPartWindow = (SLogicNode*)pPartWin;
    *pMidWindow = (SLogicNode*)pMidWin;
  } else {
    nodesDestroyNode((SNode*)pPartWin);
    nodesDestroyNode((SNode*)pMidWin);
  }

  return code;
}
616

617
// Returns the number of vgroups of the scan reached from pNode through a chain of
// single-child nodes. Returns 0 when no scan is reached that way or when the scan has
// no vgroup list.
static int32_t stbSplGetNumOfVgroups(SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
    const SScanLogicNode* pScan = (const SScanLogicNode*)pNode;
    // stbSplIsMultiTbScan() treats a NULL vgroup list as a legal state, so guard it here too.
    if (NULL == pScan->pVgroupList) {
      return 0;
    }
    return pScan->pVgroupList->numOfVgroups;
  }

  if (1 == LIST_LENGTH(pNode->pChildren)) {
    return stbSplGetNumOfVgroups((SLogicNode*)nodesListGetNode(pNode->pChildren, 0));
  }
  return 0;
}
627

628
// Adjusts a freshly built merge node according to the kind of node it was derived from:
//   - always: input and output timestamp order follow pNode's output order;
//   - project: ignoreGroupId moves from the project node to the merge node when the merge
//     node carries a limit or slimit;
//   - window: the merge node's limit and slimit are dropped;
//   - sort: inputWithGroupId is set when the sort calculates group ids.
// Always returns TSDB_CODE_SUCCESS.
static int32_t stbSplRewriteFromMergeNode(SMergeLogicNode* pMerge, SLogicNode* pNode) {
  pMerge->node.inputTsOrder = pNode->outputTsOrder;
  pMerge->node.outputTsOrder = pNode->outputTsOrder;

  switch (nodeType(pNode)) {
    case QUERY_NODE_LOGIC_PLAN_PROJECT: {
      SProjectLogicNode* pProject = (SProjectLogicNode*)pNode;
      bool mergeHasLimit = (pMerge->node.pLimit || pMerge->node.pSlimit);
      if (pProject->ignoreGroupId && mergeHasLimit) {
        pMerge->ignoreGroupId = true;
        pProject->ignoreGroupId = false;
      }
      break;
    }
    case QUERY_NODE_LOGIC_PLAN_WINDOW: {
      if (pMerge->node.pLimit) {
        nodesDestroyNode(pMerge->node.pLimit);
        pMerge->node.pLimit = NULL;
      }
      if (pMerge->node.pSlimit) {
        nodesDestroyNode(pMerge->node.pSlimit);
        pMerge->node.pSlimit = NULL;
      }
      break;
    }
    case QUERY_NODE_LOGIC_PLAN_SORT: {
      if (((SSortLogicNode*)pNode)->calcGroupId) {
        pMerge->inputWithGroupId = true;
      }
      break;
    }
    default:
      break;
  }

  return TSDB_CODE_SUCCESS;
}
665

666
static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode,
124✔
667
                                     SNodeList* pMergeKeys, SLogicNode* pPartChild, bool groupSort, bool needSort) {
668
  SMergeLogicNode* pMerge = NULL;
124✔
669
  int32_t code = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE, (SNode**)&pMerge);
124✔
670
  if (NULL == pMerge) {
124✔
671
    return code;
×
672
  }
673
  pMerge->needSort = needSort;
124✔
674
  pMerge->numOfChannels = stbSplGetNumOfVgroups(pPartChild);
124✔
675
  pMerge->srcGroupId = pCxt->groupId;
124✔
676
  pMerge->srcEndGroupId = pCxt->groupId;
124✔
677
  pMerge->node.precision = pPartChild->precision;
124✔
678
  pMerge->pMergeKeys = pMergeKeys;
124✔
679
  pMerge->groupSort = groupSort;
124✔
680
  pMerge->numOfSubplans = 1;
124✔
681

682
  pMerge->pInputs = NULL;
124✔
683
  code = nodesCloneList(pPartChild->pTargets, &pMerge->pInputs);
124✔
684
  if (TSDB_CODE_SUCCESS == code) {
124✔
685
    // NULL != pSubplan means 'merge node' replaces 'split node'.
686
    if (NULL == pSubplan) {
124✔
687
      code = nodesCloneList(pPartChild->pTargets, &pMerge->node.pTargets);
47✔
688
    } else {
689
      code = nodesCloneList(pSplitNode->pTargets, &pMerge->node.pTargets);
77✔
690
    }
691
  }
692
  if (TSDB_CODE_SUCCESS == code && NULL != pSplitNode->pLimit) {
124✔
693
    pMerge->node.pLimit = NULL;
12✔
694
    code = nodesCloneNode(pSplitNode->pLimit, &pMerge->node.pLimit);
12✔
695
    if (((SLimitNode*)pSplitNode->pLimit)->limit && ((SLimitNode*)pSplitNode->pLimit)->offset) {
12✔
696
      ((SLimitNode*)pSplitNode->pLimit)->limit->datum.i += ((SLimitNode*)pSplitNode->pLimit)->offset->datum.i;
2✔
697
    }
698
    if (((SLimitNode*)pSplitNode->pLimit)->offset) {
12✔
699
      ((SLimitNode*)pSplitNode->pLimit)->offset->datum.i = 0;
2✔
700
    }
701
  }
702
  if (TSDB_CODE_SUCCESS == code) {
124✔
703
    code = stbSplRewriteFromMergeNode(pMerge, pSplitNode);
124✔
704
  }
705
  if (TSDB_CODE_SUCCESS == code) {
124✔
706
    if (NULL == pSubplan) {
124✔
707
      code = nodesListMakeAppend(&pSplitNode->pChildren, (SNode*)pMerge);
47✔
708
    } else {
709
      code = replaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pMerge);
77✔
710
    }
711
  }
712
  if (TSDB_CODE_SUCCESS != code) {
124✔
713
    nodesDestroyNode((SNode*)pMerge);
×
714
  }
715
  return code;
124✔
716
}
717

718
// Creates an exchange node describing pPartChild's output and appends it as a child of
// pParent. Returns TSDB_CODE_SUCCESS or the first failing status; when the append fails
// the new exchange node is destroyed so it does not leak.
static int32_t stbSplCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pParent, SLogicNode* pPartChild) {
  SExchangeLogicNode* pExchange = NULL;
  int32_t             code = splCreateExchangeNode(pCxt, pPartChild, &pExchange);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  pExchange->node.pParent = pParent;
  code = nodesListMakeAppend(&pParent->pChildren, (SNode*)pExchange);
  if (TSDB_CODE_SUCCESS != code) {
    // Same handling as stbSplCreateMergeNode(): a node that could not be linked into the
    // tree is released here.
    nodesDestroyNode((SNode*)pExchange);
  }
  return code;
}
727

728
// Appends an ORDER BY entry for a clone of pExpr to *pMergeKeys (creating the list when
// needed). Ascending keys put NULLs first, all other orders put NULLs last.
// Returns the status of the failing step, or the result of the final append.
static int32_t stbSplCreateMergeKeysByExpr(SNode* pExpr, EOrder order, SNodeList** pMergeKeys) {
  SOrderByExprNode* pKey = NULL;
  int32_t           code = nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR, (SNode**)&pKey);
  if (NULL == pKey) {
    return code;
  }

  pKey->pExpr = NULL;
  code = nodesCloneNode(pExpr, &pKey->pExpr);
  if (NULL == pKey->pExpr) {
    nodesDestroyNode((SNode*)pKey);
    return code;
  }

  pKey->order = order;
  if (ORDER_ASC == order) {
    pKey->nullOrder = NULL_ORDER_FIRST;
  } else {
    pKey->nullOrder = NULL_ORDER_LAST;
  }
  return nodesListMakeStrictAppend(pMergeKeys, (SNode*)pKey);
}
744

745
// Appends a merge key for the primary key column; thin wrapper around
// stbSplCreateMergeKeysByExpr().
static int32_t stbSplCreateMergeKeysByPrimaryKey(SNode* pPrimaryKey, EOrder order, SNodeList** pMergeKeys) {
  int32_t code = stbSplCreateMergeKeysByExpr(pPrimaryKey, order, pMergeKeys);
  return code;
}
748

749
/**
 * Splits an interval window node for a batch (non-stream) query.
 *
 * Steps, each executed only while `code` is still TSDB_CODE_SUCCESS:
 *   1. create the partial window node (hash algo) and switch the split node to the merge algo;
 *   2. create merge keys from the window's pTspk and insert a merge node under the split node;
 *   3. wrap the partial window in a scan subplan;
 *   4. for every TSMA subplan attached to the window: assign a new group id, set its vgroups and
 *      replace its root with a partial window node, then append the TSMA subplans to the children
 *      of pInfo->pSubplan;
 *   5. push the scan subplan to the front of the children of pInfo->pSubplan.
 *
 * pInfo->pSubplan->subplanType and pCxt->groupId are updated regardless of the result.
 *
 * @return TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode* pPartWindow = NULL;
  int32_t     code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
  if (TSDB_CODE_SUCCESS == code) {
    ((SWindowLogicNode*)pPartWindow)->windowAlgo = INTERVAL_ALGO_HASH;
    ((SWindowLogicNode*)pInfo->pSplitNode)->windowAlgo = INTERVAL_ALGO_MERGE;
    SNodeList* pMergeKeys = NULL;
    code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk,
                                             ((SWindowLogicNode*)pInfo->pSplitNode)->node.outputTsOrder, &pMergeKeys);
    if (TSDB_CODE_SUCCESS == code) {
      code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow, true, true);
    }
    if (TSDB_CODE_SUCCESS != code) {
      nodesDestroyList(pMergeKeys);
    }
  }
  SLogicSubplan* pSplitSubPlan = NULL;
  if (TSDB_CODE_SUCCESS == code) {
    pSplitSubPlan = splCreateScanSubplan(pCxt, pPartWindow, SPLIT_FLAG_STABLE_SPLIT);
    if (!pSplitSubPlan) code = terrno;
  }
  if (TSDB_CODE_SUCCESS == code) {
    SNode*            pNode = NULL;
    SMergeLogicNode*  pMerge = (SMergeLogicNode*)pInfo->pSplitNode->pChildren->pHead->pNode;
    SWindowLogicNode* pWindow = (SWindowLogicNode*)pInfo->pSplitNode;
    if (LIST_LENGTH(pWindow->pTsmaSubplans) > 0) {
      FOREACH(pNode, pWindow->pTsmaSubplans) {
        ++(pCxt->groupId);
        SLogicSubplan* pSubplan = (SLogicSubplan*)pNode;
        SLogicNode*    pTsmaPartWindow = NULL;
        pSubplan->id.groupId = pCxt->groupId;
        pSubplan->id.queryId = pCxt->queryId;
        splSetSubplanVgroups(pSubplan, pSubplan->pNode);
        code = stbSplCreatePartWindowNode((SWindowLogicNode*)pSubplan->pNode, &pTsmaPartWindow);
        if (TSDB_CODE_SUCCESS != code) {
          // Stop at the first failure so the error is not overwritten by a later iteration.
          break;
        }
        nodesDestroyNode((SNode*)pSubplan->pNode);
        pSubplan->pNode = pTsmaPartWindow;
      }
      // Only hand the TSMA subplans over when every one of them was rewritten successfully.
      if (TSDB_CODE_SUCCESS == code) {
        code = nodesListMakeStrictAppendList(&pInfo->pSubplan->pChildren, pWindow->pTsmaSubplans);
        pMerge->numOfSubplans = LIST_LENGTH(pInfo->pSubplan->pChildren) + 1;
      }
    }
    pMerge->srcEndGroupId = pCxt->groupId;
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakePushFront(&pInfo->pSubplan->pChildren, (SNode*)pSplitSubPlan);
  }
  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}
800

801
/**
 * Splits an interval window node for a stream query into three levels: partial (semi) window,
 * mid window and the original node acting as the final window. Exchange nodes are placed
 * between the levels, the mid window becomes its own merge subplan with the scan subplan as
 * its child, and that merge subplan is appended to the children of pInfo->pSubplan.
 *
 * pInfo->pSubplan->subplanType and pCxt->groupId are updated regardless of the result.
 */
static int32_t stbSplSplitIntervalForStreamMultiAgg(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode* pPartWindow = NULL;
  SLogicNode* pMidWindow = NULL;
  int32_t     code = stbSplCreatePartMidWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow, &pMidWindow);

  if (TSDB_CODE_SUCCESS == code) {
    SWindowLogicNode* pFinalWin = (SWindowLogicNode*)pInfo->pSplitNode;
    SWindowLogicNode* pSemiWin = (SWindowLogicNode*)pPartWindow;

    ((SWindowLogicNode*)pMidWindow)->windowAlgo = INTERVAL_ALGO_STREAM_MID;
    if (STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE == pCxt->pPlanCxt->triggerType) {
      pFinalWin->windowAlgo = INTERVAL_ALGO_STREAM_CONTINUE_FINAL;
      pSemiWin->windowAlgo = INTERVAL_ALGO_STREAM_CONTINUE_SEMI;
    } else {
      pFinalWin->windowAlgo = INTERVAL_ALGO_STREAM_FINAL;
      pSemiWin->windowAlgo = INTERVAL_ALGO_STREAM_SEMI;
    }

    // final <- exchange <- mid <- exchange <- semi
    code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pMidWindow);
    if (TSDB_CODE_SUCCESS == code) {
      code = stbSplCreateExchangeNode(pCxt, pMidWindow, pPartWindow);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    SLogicSubplan* pMidSubplan = NULL;
    code = splCreateSubplan(pCxt, pMidWindow, &pMidSubplan);
    if (TSDB_CODE_SUCCESS == code) {
      pMidSubplan->subplanType = SUBPLAN_TYPE_MERGE;
      code = nodesListMakeStrictAppend(&pMidSubplan->pChildren,
                                       (SNode*)splCreateScanSubplan(pCxt, pPartWindow, SPLIT_FLAG_STABLE_SPLIT));
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, (SNode*)pMidSubplan);
    }
  }

  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}
838

839
// Dispatches the interval-window split to the stream or the batch implementation.
static int32_t stbSplSplitInterval(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  return pCxt->pPlanCxt->streamQuery ? stbSplSplitIntervalForStreamMultiAgg(pCxt, pInfo)
                                     : stbSplSplitIntervalForBatch(pCxt, pInfo);
}
846

847
/**
 * Splits a session window node for a stream query into a semi (partial) window and a final
 * window (the original split node), connected through an exchange node.
 *
 * The window-end target is appended to the partial window and a clone of it replaces the
 * final window's pTsEnd. The partial window is then wrapped in a scan subplan that is appended
 * to the children of pInfo->pSubplan.
 *
 * pInfo->pSubplan->subplanType and pCxt->groupId are updated regardless of the result.
 *
 * @return TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t stbSplSplitSessionForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode* pPartWindow = NULL;
  int32_t     code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
  if (TSDB_CODE_SUCCESS == code) {
    SWindowLogicNode* pPartWin = (SWindowLogicNode*)pPartWindow;
    SWindowLogicNode* pMergeWin = (SWindowLogicNode*)pInfo->pSplitNode;
    if (pCxt->pPlanCxt->triggerType == STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE) {
      pPartWin->windowAlgo = SESSION_ALGO_STREAM_CONTINUE_SEMI;
      pMergeWin->windowAlgo = SESSION_ALGO_STREAM_CONTINUE_FINAL;
    } else {
      pPartWin->windowAlgo = SESSION_ALGO_STREAM_SEMI;
      pMergeWin->windowAlgo = SESSION_ALGO_STREAM_FINAL;
    }

    int32_t index = 0;
    // Assign to the function-level `code`; a block-local redeclaration here would hide
    // failures of the following steps from the caller.
    code = stbSplAppendWEnd(pPartWin, &index);
    if (TSDB_CODE_SUCCESS == code) {
      nodesDestroyNode(pMergeWin->pTsEnd);
      pMergeWin->pTsEnd = NULL;
      code = nodesCloneNode(nodesListGetNode(pPartWin->node.pTargets, index), &pMergeWin->pTsEnd);
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartWindow);
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
                                     (SNode*)splCreateScanSubplan(pCxt, pPartWindow, SPLIT_FLAG_STABLE_SPLIT));
  }
  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}
879

880
/**
 * Walks down a chain of single-child nodes starting at pNode. If a scan node is reached it is
 * switched to SCAN_TYPE_TABLE_MERGE with filesetDelimited set, and groupSort is enabled when the
 * scan has group tags. The walk stops without changes at any node whose child count is not 1.
 */
static void stbSplSetTableMergeScan(SLogicNode* pNode) {
  SLogicNode* pCur = pNode;
  while (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pCur)) {
    if (1 != LIST_LENGTH(pCur->pChildren)) {
      return;
    }
    pCur = (SLogicNode*)nodesListGetNode(pCur->pChildren, 0);
  }

  SScanLogicNode* pScan = (SScanLogicNode*)pCur;
  pScan->scanType = SCAN_TYPE_TABLE_MERGE;
  pScan->filesetDelimited = true;
  if (NULL != pScan->pGroupTags) {
    pScan->groupSort = true;
  }
}
6✔
894

895
/**
 * Batch split shared by the session/state/event/count/anomaly window dispatchers: the window
 * itself stays in the merge subplan, while its first child is replaced by a merge node keyed
 * on the window's pTspk (ordered by inputTsOrder) and moved into a new scan subplan.
 *
 * On success the child's scan is switched to table-merge mode, the subplan type becomes
 * SUBPLAN_TYPE_MERGE and pCxt->groupId is advanced; on failure the merge keys are destroyed.
 */
static int32_t stbSplSplitSessionOrStateForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode*       pWindow = pInfo->pSplitNode;
  SWindowLogicNode* pWin = (SWindowLogicNode*)pWindow;
  SLogicNode*       pChild = (SLogicNode*)nodesListGetNode(pWindow->pChildren, 0);
  SNodeList*        pMergeKeys = NULL;

  int32_t code = stbSplCreateMergeKeysByPrimaryKey(pWin->pTspk, pWin->node.inputTsOrder, &pMergeKeys);
  if (TSDB_CODE_SUCCESS == code) {
    code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pChild, pMergeKeys, (SLogicNode*)pChild, true, true);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
                                     (SNode*)splCreateScanSubplan(pCxt, pChild, SPLIT_FLAG_STABLE_SPLIT));
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyList(pMergeKeys);
    return code;
  }

  stbSplSetTableMergeScan(pChild);
  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}
923

924
// Dispatches the session-window split to the stream or the batch implementation.
static int32_t stbSplSplitSession(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  if (!pCxt->pPlanCxt->streamQuery) {
    return stbSplSplitSessionOrStateForBatch(pCxt, pInfo);
  }
  return stbSplSplitSessionForStream(pCxt, pInfo);
}
931

932
// Stream variant of the state-window split: performs no split and always returns
// TSDB_CODE_PLAN_INTERNAL_ERROR.
static int32_t stbSplSplitStateForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  (void)pCxt;
  (void)pInfo;
  return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
935

936
// Dispatches the state-window split to the stream or the batch implementation.
static int32_t stbSplSplitState(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  return pCxt->pPlanCxt->streamQuery ? stbSplSplitStateForStream(pCxt, pInfo)
                                     : stbSplSplitSessionOrStateForBatch(pCxt, pInfo);
}
943

944
// Stream variant of the event-window split: performs no split and always returns
// TSDB_CODE_PLAN_INTERNAL_ERROR.
static int32_t stbSplSplitEventForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  (void)pCxt;
  (void)pInfo;
  return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
947

948
// Dispatches the event-window split to the stream or the batch implementation.
static int32_t stbSplSplitEvent(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  if (!pCxt->pPlanCxt->streamQuery) {
    return stbSplSplitSessionOrStateForBatch(pCxt, pInfo);
  }
  return stbSplSplitEventForStream(pCxt, pInfo);
}
955

956
// Stream variant of the count-window split: performs no split and always returns
// TSDB_CODE_PLAN_INTERNAL_ERROR.
static int32_t stbSplSplitCountForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  (void)pCxt;
  (void)pInfo;
  return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
959

960
// Dispatches the count-window split to the stream or the batch implementation.
static int32_t stbSplSplitCount(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  return pCxt->pPlanCxt->streamQuery ? stbSplSplitCountForStream(pCxt, pInfo)
                                     : stbSplSplitSessionOrStateForBatch(pCxt, pInfo);
}
967

968
// Stream variant of the anomaly-window split: performs no split and always returns
// TSDB_CODE_PLAN_INTERNAL_ERROR.
static int32_t stbSplSplitAnomalyForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  (void)pCxt;
  (void)pInfo;
  return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
971

972
// Dispatches the anomaly-window split to the stream or the batch implementation.
static int32_t stbSplSplitAnomaly(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  if (!pCxt->pPlanCxt->streamQuery) {
    return stbSplSplitSessionOrStateForBatch(pCxt, pInfo);
  }
  return stbSplSplitAnomalyForStream(pCxt, pInfo);
}
979

980
/**
 * Selects the split routine matching the window type of the split node.
 *
 * @return the result of the selected routine, or TSDB_CODE_PLAN_INTERNAL_ERROR for a window
 *         type that has no case below.
 */
static int32_t stbSplSplitWindowForCrossTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  int32_t code = TSDB_CODE_PLAN_INTERNAL_ERROR;

  switch (((SWindowLogicNode*)pInfo->pSplitNode)->winType) {
    case WINDOW_TYPE_INTERVAL:
      code = stbSplSplitInterval(pCxt, pInfo);
      break;
    case WINDOW_TYPE_SESSION:
      code = stbSplSplitSession(pCxt, pInfo);
      break;
    case WINDOW_TYPE_STATE:
      code = stbSplSplitState(pCxt, pInfo);
      break;
    case WINDOW_TYPE_EVENT:
      code = stbSplSplitEvent(pCxt, pInfo);
      break;
    case WINDOW_TYPE_COUNT:
      code = stbSplSplitCount(pCxt, pInfo);
      break;
    case WINDOW_TYPE_ANOMALY:
      code = stbSplSplitAnomaly(pCxt, pInfo);
      break;
    default:
      break;
  }
  return code;
}
999

1000
// Returns true when pNode or any of its ancestors (followed through pParent) has a LIMIT or
// SLIMIT attached; returns false when the chain ends without finding one.
static bool stbSplNeedSeqRecvData(SLogicNode* pNode) {
  for (SLogicNode* pCur = pNode; NULL != pCur; pCur = pCur->pParent) {
    if (NULL != pCur->pLimit || NULL != pCur->pSlimit) {
      return true;
    }
  }
  return false;
}
1010

1011
/**
 * Splits a window node that is evaluated per table partition.
 *
 * For stream queries only the SPLIT_FLAG_STABLE_SPLIT flag is set on the subplan. Otherwise the
 * split node (or its parent, when the parent is a fill node) is replaced by an exchange node in
 * pInfo->pSubplan and moved into a new scan subplan. The exchange receives data sequentially
 * when it or one of its ancestors carries a LIMIT/SLIMIT.
 */
static int32_t stbSplSplitWindowForPartTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  if (pCxt->pPlanCxt->streamQuery) {
    SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT);
    return TSDB_CODE_SUCCESS;
  }

  SLogicNode* pParent = pInfo->pSplitNode->pParent;
  if (NULL != pParent && QUERY_NODE_LOGIC_PLAN_FILL == nodeType(pParent)) {
    // Split above the fill node so that it travels together with the window.
    pInfo->pSplitNode = pParent;
  }

  SExchangeLogicNode* pExchange = NULL;
  int32_t             code = splCreateExchangeNode(pCxt, pInfo->pSplitNode, &pExchange);
  if (TSDB_CODE_SUCCESS == code) {
    code = replaceLogicNode(pInfo->pSubplan, pInfo->pSplitNode, (SLogicNode*)pExchange);
  }
  if (TSDB_CODE_SUCCESS == code) {
    pExchange->seqRecvData = stbSplNeedSeqRecvData((SLogicNode*)pExchange);
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
                                     (SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT));
  }

  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}
1034

1035
// Uses the per-partition split when the window is a partition-table window without TSMA
// subplans; every other window goes through the cross-table split.
static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SWindowLogicNode* pWindow = (SWindowLogicNode*)pInfo->pSplitNode;
  bool              partTableSplit = isPartTableWinodw(pWindow) && (0 == LIST_LENGTH(pWindow->pTsmaSubplans));

  if (!partTableSplit) {
    return stbSplSplitWindowForCrossTable(pCxt, pInfo);
  }
  return stbSplSplitWindowForPartTable(pCxt, pInfo);
}
1043

1044
/**
 * Derives a partial aggregate node from pMergeAgg.
 *
 * The agg functions, group keys, targets, children and conditions are temporarily detached from
 * pMergeAgg so that the clone starts without them. On success:
 *   - the partial node owns the original group keys and children, and gets targets built from
 *     the group keys and the partial agg functions;
 *   - pMergeAgg keeps its conditions and targets, gets group keys cloned from the partial node's
 *     targets, and receives the merge-side functions produced by stbSplRewriteFuns.
 *
 * On failure the partial node is destroyed, and everything that was detached but not yet handed
 * over is put back on pMergeAgg so that it is neither leaked nor lost.
 *
 * @param pMergeAgg aggregate node to split; modified in place.
 * @param pOutput   receives the partial aggregate node on success; untouched on failure.
 * @return TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pOutput) {
  SNodeList* pFunc = pMergeAgg->pAggFuncs;
  pMergeAgg->pAggFuncs = NULL;
  SNodeList* pGroupKeys = pMergeAgg->pGroupKeys;
  pMergeAgg->pGroupKeys = NULL;
  SNodeList* pTargets = pMergeAgg->node.pTargets;
  pMergeAgg->node.pTargets = NULL;
  SNodeList* pChildren = pMergeAgg->node.pChildren;
  pMergeAgg->node.pChildren = NULL;
  SNode* pConditions = pMergeAgg->node.pConditions;
  pMergeAgg->node.pConditions = NULL;

  SAggLogicNode* pPartAgg = NULL;
  int32_t        code = nodesCloneNode((SNode*)pMergeAgg, (SNode**)&pPartAgg);
  if (NULL == pPartAgg) {
    // Nothing was handed over yet: restore the node exactly as it was received.
    pMergeAgg->pAggFuncs = pFunc;
    pMergeAgg->pGroupKeys = pGroupKeys;
    pMergeAgg->node.pTargets = pTargets;
    pMergeAgg->node.pChildren = pChildren;
    pMergeAgg->node.pConditions = pConditions;
    return code;
  }

  // Tracks whether conditions/targets/children have been re-attached to one of the two nodes.
  bool handedOver = false;

  pPartAgg->node.groupAction = GROUP_ACTION_KEEP;

  if (NULL != pGroupKeys) {
    pPartAgg->pGroupKeys = pGroupKeys;
    code = createColumnByRewriteExprs(pPartAgg->pGroupKeys, &pPartAgg->node.pTargets);
  }
  if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) {
    pMergeAgg->pGroupKeys = NULL;
    code = nodesCloneList(pPartAgg->node.pTargets, &pMergeAgg->pGroupKeys);
  }
  if (TSDB_CODE_SUCCESS == code) {
    pMergeAgg->node.pConditions = pConditions;
    pMergeAgg->node.pTargets = pTargets;
    pPartAgg->node.pChildren = pChildren;
    handedOver = true;
    splSetParent((SLogicNode*)pPartAgg);

    code = stbSplRewriteFuns(pFunc, &pPartAgg->pAggFuncs, NULL, &pMergeAgg->pAggFuncs);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = createColumnByRewriteExprs(pPartAgg->pAggFuncs, &pPartAgg->node.pTargets);
  }

  nodesDestroyList(pFunc);
  if (TSDB_CODE_SUCCESS == code) {
    *pOutput = (SLogicNode*)pPartAgg;
  } else {
    if (!handedOver) {
      // These are still referenced only by locals; give them back so they are not leaked.
      pMergeAgg->node.pConditions = pConditions;
      pMergeAgg->node.pTargets = pTargets;
      pMergeAgg->node.pChildren = pChildren;
    }
    nodesDestroyNode((SNode*)pPartAgg);
  }

  return code;
}
1093

1094
/**
 * Splits an aggregate that is evaluated per table partition: the split node is replaced by an
 * exchange node in pInfo->pSubplan (which becomes a merge subplan) and moved as a whole into a
 * new scan subplan. pCxt->groupId is advanced regardless of the result.
 */
static int32_t stbSplSplitAggNodeForPartTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pInfo->pSplitNode, SUBPLAN_TYPE_MERGE, false);
  if (TSDB_CODE_SUCCESS == code) {
    SNode* pScanSubplan = (SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT);
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, pScanSubplan);
  }
  ++(pCxt->groupId);
  return code;
}
1103

1104

1105
/**
1106
 * @brief For pipelined agg node, add a SortMergeNode to merge result from vnodes.
1107
 *        For agg + partition, results are sorted by group id, use group sort.
1108
 *        For agg + sort for group, results are sorted by partition keys, not group id, merges keys should be the same
1109
 *            as partition keys
1110
 */
1111
static int32_t stbSplAggNodeCreateMerge(SSplitContext* pCtx, SStableSplitInfo* pInfo, SLogicNode* pChildAgg) {
×
1112
  bool       groupSort = true;
×
1113
  SNodeList* pMergeKeys = NULL;
×
1114
  int32_t    code = TSDB_CODE_SUCCESS;
×
1115
  bool       sortForGroup = false;
×
1116

1117
  if (pChildAgg->pChildren->length != 1) return TSDB_CODE_TSC_INTERNAL_ERROR;
×
1118

1119
  SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pChildAgg->pChildren, 0);
×
1120
  if (nodeType(pChild) == QUERY_NODE_LOGIC_PLAN_SORT) {
×
1121
    SSortLogicNode* pSort = (SSortLogicNode*)pChild;
×
1122
    if (pSort->calcGroupId) {
×
1123
      SNode *node, *node2;
1124
      groupSort = false;
×
1125
      sortForGroup = true;
×
1126
      SNodeList* extraAggFuncs = NULL;
×
1127
      uint32_t   originalLen = LIST_LENGTH(pSort->node.pTargets), idx = 0;
×
1128
      code = stbSplCreateMergeKeys(pSort->pSortKeys, pSort->node.pTargets, &pMergeKeys);
×
1129
      if (TSDB_CODE_SUCCESS != code) return code;
×
1130

1131
      // Create group_key func for all sort keys.
1132
      // We only need newly added nodes in pSort.node.pTargets when stbSplCreateMergeKeys
1133
      FOREACH(node, pSort->node.pTargets) {
×
1134
        if (idx++ < originalLen) continue;
×
1135
        SFunctionNode* pGroupKeyFunc = createGroupKeyAggFunc((SColumnNode*)node);
×
1136
        if (!pGroupKeyFunc) {
×
1137
          code = terrno;
×
1138
          break;
×
1139
        }
1140
        code = nodesListMakeStrictAppend(&extraAggFuncs, (SNode*)pGroupKeyFunc);
×
1141
        if (code != TSDB_CODE_SUCCESS) {
×
1142
          nodesDestroyNode((SNode*)pGroupKeyFunc);
×
1143
        }
1144
      }
1145

1146
      if (TSDB_CODE_SUCCESS == code) {
×
1147
        // add these extra group_key funcs into targets
1148
        code = createColumnByRewriteExprs(extraAggFuncs, &pChildAgg->pTargets);
×
1149
      }
1150
      if (code == TSDB_CODE_SUCCESS) {
×
1151
        code = nodesListAppendList(((SAggLogicNode*)pChildAgg)->pAggFuncs, extraAggFuncs);
×
1152
        extraAggFuncs = NULL;
×
1153
      }
1154

1155
      if (code == TSDB_CODE_SUCCESS) {
×
1156
        FOREACH(node, pMergeKeys) {
×
1157
          SOrderByExprNode* pOrder = (SOrderByExprNode*)node;
×
1158
          SColumnNode*      pCol = (SColumnNode*)pOrder->pExpr;
×
1159
          FOREACH(node2, ((SAggLogicNode*)pChildAgg)->pAggFuncs) {
×
1160
            SFunctionNode* pFunc = (SFunctionNode*)node2;
×
1161
            if (0 != strcmp(pFunc->functionName, "_group_key")) continue;
×
1162
            SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
×
1163
            if (!nodesEqualNode(pParam, (SNode*)pCol)) continue;
×
1164

1165
            // use the colName of group_key func to make sure finding the right slot id for merge keys.
1166
            tstrncpy(pCol->colName, pFunc->node.aliasName, TSDB_COL_NAME_LEN);
×
1167
            tstrncpy(pCol->node.aliasName, pFunc->node.aliasName, TSDB_COL_NAME_LEN);
×
1168
            memset(pCol->tableAlias, 0, TSDB_TABLE_NAME_LEN);
×
1169
            break;
×
1170
          }
1171
        }
1172
      }
1173
      if (TSDB_CODE_SUCCESS != code) {
×
1174
        nodesDestroyList(pMergeKeys);
×
1175
        nodesDestroyList(extraAggFuncs);
×
1176
      }
1177
    }
1178
  }
1179
  if (TSDB_CODE_SUCCESS == code) {
×
1180
    code = stbSplCreateMergeNode(pCtx, NULL, pInfo->pSplitNode, pMergeKeys, pChildAgg, groupSort, true);
×
1181
  }
1182
  if (TSDB_CODE_SUCCESS == code && sortForGroup) {
×
1183
    SMergeLogicNode* pMerge =
1184
        (SMergeLogicNode*)nodesListGetNode(pInfo->pSplitNode->pChildren, LIST_LENGTH(pInfo->pSplitNode->pChildren) - 1);
×
1185
    pMerge->inputWithGroupId = true;
×
1186
  }
1187
  return code;
×
1188
}
1189

1190
/**
 * Splits a cross-table aggregate that has TSMA subplans attached.
 *
 * The aggregate is divided into a partial and a merge aggregate with a merge node between them.
 * The partial aggregate is wrapped in a scan subplan. Every TSMA subplan gets a new group id,
 * its vgroups set and its root replaced by a partial aggregate. The TSMA subplans and then the
 * first scan subplan are appended to the children of pInfo->pSubplan.
 *
 * pInfo->pSubplan->subplanType and pCxt->groupId are updated regardless of the result.
 *
 * @return TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t stbSplSplitAggNodeForCrossTableMulSubplan(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode*      pPartAgg = NULL;
  SMergeLogicNode* pMergeNode = NULL;
  SLogicSubplan*   pFirstScanSubplan = NULL;
  int32_t          code = stbSplCreatePartAggNode((SAggLogicNode*)pInfo->pSplitNode, &pPartAgg);

  if (TSDB_CODE_SUCCESS == code) {
    if (pInfo->pSplitNode->forceCreateNonBlockingOptr) {
      code = stbSplAggNodeCreateMerge(pCxt, pInfo, pPartAgg);
    } else {
      code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, NULL, pPartAgg, false, false);
    }
    if (TSDB_CODE_SUCCESS == code) {
      // Only look up the merge node once it is known to have been appended.
      pMergeNode = (SMergeLogicNode*)nodesListGetNode(pInfo->pSplitNode->pChildren,
                                                      LIST_LENGTH(pInfo->pSplitNode->pChildren) - 1);
    }
  } else {
    nodesDestroyNode((SNode*)pPartAgg);
  }

  if (code == TSDB_CODE_SUCCESS) {
    pFirstScanSubplan = splCreateScanSubplan(pCxt, pPartAgg, SPLIT_FLAG_STABLE_SPLIT);
    if (!pFirstScanSubplan) code = terrno;
  }

  if (code == TSDB_CODE_SUCCESS) {
    SNode*         pNode = NULL;
    SAggLogicNode* pAgg = (SAggLogicNode*)pInfo->pSplitNode;
    if (LIST_LENGTH(pAgg->pTsmaSubplans) > 0) {
      FOREACH(pNode, pAgg->pTsmaSubplans) {
        ++(pCxt->groupId);
        SLogicSubplan* pSubplan = (SLogicSubplan*)pNode;
        SLogicNode*    pTsmaPartAgg = NULL;
        pSubplan->id.groupId = pCxt->groupId;
        pSubplan->id.queryId = pCxt->queryId;
        splSetSubplanVgroups(pSubplan, pSubplan->pNode);
        code = stbSplCreatePartAggNode((SAggLogicNode*)pSubplan->pNode, &pTsmaPartAgg);
        if (code) break;
        nodesDestroyNode((SNode*)pSubplan->pNode);
        pSubplan->pNode = pTsmaPartAgg;
      }
      // Keep the loop's error: append the TSMA subplans only when all of them were rewritten.
      if (TSDB_CODE_SUCCESS == code) {
        code = nodesListMakeStrictAppendList(&pInfo->pSubplan->pChildren, pAgg->pTsmaSubplans);
        pMergeNode->numOfSubplans = LIST_LENGTH(pInfo->pSubplan->pChildren) + 1;
      }
    }
    pMergeNode->srcEndGroupId = pCxt->groupId;
  }

  if (code == TSDB_CODE_SUCCESS) {
    code = nodesListMakeAppend(&pInfo->pSubplan->pChildren, (SNode*)pFirstScanSubplan);
  }

  if (code && pFirstScanSubplan) {
    nodesDestroyNode((SNode*)pFirstScanSubplan);
  }

  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}
1249

1250
/**
 * Splits a cross-table aggregate into a partial aggregate (moved into a new scan subplan) and
 * the original node acting as the merge aggregate. The two are linked by a merge node when the
 * split node forces a non-blocking operator, otherwise by an exchange node.
 *
 * pInfo->pSubplan->subplanType and pCxt->groupId are updated regardless of the result.
 */
static int32_t stbSplSplitAggNodeForCrossTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode*    pPartAgg = NULL;
  SLogicSubplan* pScanSubplan = NULL;
  int32_t        code = stbSplCreatePartAggNode((SAggLogicNode*)pInfo->pSplitNode, &pPartAgg);

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode((SNode*)pPartAgg);
  } else if (pInfo->pSplitNode->forceCreateNonBlockingOptr) {
    // if slimit was pushed down to agg, agg will be pipelined mode, add sort merge before parent agg
    code = stbSplAggNodeCreateMerge(pCxt, pInfo, pPartAgg);
  } else {
    code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartAgg);
  }

  if (TSDB_CODE_SUCCESS == code) {
    pScanSubplan = splCreateScanSubplan(pCxt, pPartAgg, SPLIT_FLAG_STABLE_SPLIT);
    if (NULL == pScanSubplan) {
      code = terrno;
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, (SNode*)pScanSubplan);
  }

  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}
1278

1279
// Picks the aggregate split strategy: TSMA multi-subplan split first, then the per-partition
// split, and the plain cross-table split as the fallback.
static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SAggLogicNode* pAgg = (SAggLogicNode*)pInfo->pSplitNode;

  if (LIST_LENGTH(pAgg->pTsmaSubplans) > 0) {
    return stbSplSplitAggNodeForCrossTableMulSubplan(pCxt, pInfo);
  } else if (isPartTableAgg(pAgg)) {
    return stbSplSplitAggNodeForPartTable(pCxt, pInfo);
  } else {
    return stbSplSplitAggNodeForCrossTable(pCxt, pInfo);
  }
}
1288

1289
/**
 * Creates a column node describing pExpr.
 * When pExpr is itself a column, its db/table/alias/column names are copied; otherwise the
 * expression's alias is used as the column name. Alias, user alias and result type are always
 * taken from pExpr.
 *
 * @param ppNode receives the new column node on success.
 * @return the code returned by nodesMakeNode.
 */
static int32_t stbSplCreateColumnNode(SExprNode* pExpr, SNode** ppNode) {
  SColumnNode* pNewCol = NULL;
  int32_t      code = nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pNewCol);
  if (NULL == pNewCol) {
    return code;
  }

  if (QUERY_NODE_COLUMN != nodeType(pExpr)) {
    tstrncpy(pNewCol->colName, pExpr->aliasName, TSDB_COL_NAME_LEN);
  } else {
    SColumnNode* pSrcCol = (SColumnNode*)pExpr;
    tstrncpy(pNewCol->dbName, pSrcCol->dbName, TSDB_DB_NAME_LEN);
    tstrncpy(pNewCol->tableName, pSrcCol->tableName, TSDB_TABLE_NAME_LEN);
    tstrncpy(pNewCol->tableAlias, pSrcCol->tableAlias, TSDB_TABLE_NAME_LEN);
    tstrncpy(pNewCol->colName, pSrcCol->colName, TSDB_COL_NAME_LEN);
  }
  tstrncpy(pNewCol->node.aliasName, pExpr->aliasName, TSDB_COL_NAME_LEN);
  tstrncpy(pNewCol->node.userAlias, pExpr->userAlias, TSDB_COL_NAME_LEN);
  pNewCol->node.resType = pExpr->resType;

  *ppNode = (SNode*)pNewCol;
  return code;
}
1309

1310
/**
 * Creates an ORDER BY node whose expression is a clone of pCol and whose order / null order are
 * copied from pSortKey.
 *
 * @param ppNode receives the new node on success.
 * @return the code of the failing make/clone step, or the code returned by the clone on success.
 */
static int32_t stbSplCreateOrderByExpr(SOrderByExprNode* pSortKey, SNode* pCol, SNode** ppNode) {
  SOrderByExprNode* pOrderBy = NULL;
  int32_t           code = nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR, (SNode**)&pOrderBy);
  if (NULL == pOrderBy) {
    return code;
  }

  pOrderBy->pExpr = NULL;
  code = nodesCloneNode(pCol, &pOrderBy->pExpr);
  if (NULL == pOrderBy->pExpr) {
    nodesDestroyNode((SNode*)pOrderBy);
    return code;
  }

  pOrderBy->order = pSortKey->order;
  pOrderBy->nullOrder = pSortKey->nullOrder;
  *ppNode = (SNode*)pOrderBy;
  return code;
}
1327

1328
/**
 * Builds the merge-key list that corresponds to pSortKeys.
 *
 * For each sort key, every target in pTargets that either equals the sort expression (when it is
 * a column) or whose column name equals the sort expression's alias yields one merge key. When
 * no target matches, a new column node is created for the sort expression, used for the merge
 * key and appended to pTargets.
 *
 * @param pOutput receives the merge-key list on success; on failure the partial list is destroyed.
 */
static int32_t stbSplCreateMergeKeys(SNodeList* pSortKeys, SNodeList* pTargets, SNodeList** pOutput) {
  int32_t    code = TSDB_CODE_SUCCESS;
  SNodeList* pKeys = NULL;
  SNode*     pSortNode = NULL;

  FOREACH(pSortNode, pSortKeys) {
    SOrderByExprNode* pSortKey = (SOrderByExprNode*)pSortNode;
    SExprNode*        pSortExpr = (SExprNode*)pSortKey->pExpr;
    SNode*            pTarget = NULL;
    bool              matchedAny = false;

    FOREACH(pTarget, pTargets) {
      bool matches = (QUERY_NODE_COLUMN == nodeType(pSortExpr) && nodesEqualNode((SNode*)pSortExpr, pTarget)) ||
                     (0 == strcmp(pSortExpr->aliasName, ((SColumnNode*)pTarget)->colName));
      if (!matches) {
        continue;
      }

      SNode* pKey = NULL;
      code = stbSplCreateOrderByExpr(pSortKey, pTarget, &pKey);
      if (TSDB_CODE_SUCCESS == code) {
        code = nodesListMakeStrictAppend(&pKeys, pKey);
      }
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
      matchedAny = true;
    }

    if (TSDB_CODE_SUCCESS == code && !matchedAny) {
      // The sort expression is not among the targets yet: add a column for it.
      SNode* pCol = NULL;
      code = stbSplCreateColumnNode(pSortExpr, &pCol);
      if (TSDB_CODE_SUCCESS == code) {
        SNode* pKey = NULL;
        code = stbSplCreateOrderByExpr(pSortKey, pCol, &pKey);
        if (TSDB_CODE_SUCCESS == code) {
          code = nodesListMakeStrictAppend(&pKeys, pKey);
        }
      }
      if (TSDB_CODE_SUCCESS == code) {
        code = nodesListStrictAppend(pTargets, pCol);
      } else {
        nodesDestroyNode(pCol);
      }
    }

    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyList(pKeys);
    return code;
  }
  *pOutput = pKeys;
  return code;
}
1378

1379
static int32_t stbSplCreatePartSortNode(SSortLogicNode* pSort, SLogicNode** pOutputPartSort,
14✔
1380
                                        SNodeList** pOutputMergeKeys) {
1381
  SNodeList* pSortKeys = pSort->pSortKeys;
14✔
1382
  pSort->pSortKeys = NULL;
14✔
1383
  SNodeList* pChildren = pSort->node.pChildren;
14✔
1384
  pSort->node.pChildren = NULL;
14✔
1385

1386
  int32_t         code = TSDB_CODE_SUCCESS;
14✔
1387
  SSortLogicNode* pPartSort = NULL;
14✔
1388
  code = nodesCloneNode((SNode*)pSort, (SNode**)&pPartSort);
14✔
1389

1390
  SNodeList* pMergeKeys = NULL;
14✔
1391
  if (TSDB_CODE_SUCCESS == code) {
14✔
1392
    pPartSort->node.pChildren = pChildren;
14✔
1393
    splSetParent((SLogicNode*)pPartSort);
14✔
1394
    pPartSort->pSortKeys = pSortKeys;
14✔
1395
    pPartSort->groupSort = pSort->groupSort;
14✔
1396
    code = stbSplCreateMergeKeys(pPartSort->pSortKeys, pPartSort->node.pTargets, &pMergeKeys);
14✔
1397
  }
1398

1399
  if (TSDB_CODE_SUCCESS == code) {
14✔
1400
    *pOutputPartSort = (SLogicNode*)pPartSort;
14✔
1401
    *pOutputMergeKeys = pMergeKeys;
14✔
1402
  } else {
1403
    nodesDestroyNode((SNode*)pPartSort);
×
1404
    nodesDestroyList(pMergeKeys);
×
1405
  }
1406

1407
  return code;
14✔
1408
}
1409

1410
// Walks down a chain of single-child nodes starting at pNode; if a scan node is reached and it
// has group tags, its groupSort flag is set. Stops at any node whose child count is not 1.
static void stbSplSetScanPartSort(SLogicNode* pNode) {
  SLogicNode* pCur = pNode;
  while (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pCur)) {
    if (1 != LIST_LENGTH(pCur->pChildren)) {
      return;
    }
    pCur = (SLogicNode*)nodesListGetNode(pCur->pChildren, 0);
  }

  SScanLogicNode* pScan = (SScanLogicNode*)pCur;
  if (NULL != pScan->pGroupTags) {
    pScan->groupSort = true;
  }
}
×
1422

1423
/**
 * Splits a sort node: a partial sort is moved into a new scan subplan and the original sort in
 * pInfo->pSubplan is replaced by a merge node using the derived merge keys. After a successful
 * replacement the original sort node is destroyed and pInfo->pSplitNode is cleared.
 *
 * pInfo->pSubplan->subplanType and pCxt->groupId are updated regardless of the result.
 */
static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode* pPartSort = NULL;
  SNodeList*  pMergeKeys = NULL;
  // Read before the sort node is destroyed below.
  bool    groupSort = ((SSortLogicNode*)pInfo->pSplitNode)->groupSort;
  int32_t code = stbSplCreatePartSortNode((SSortLogicNode*)pInfo->pSplitNode, &pPartSort, &pMergeKeys);

  if (TSDB_CODE_SUCCESS == code) {
    code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, pMergeKeys, pPartSort, groupSort, true);
  }
  if (TSDB_CODE_SUCCESS == code) {
    nodesDestroyNode((SNode*)pInfo->pSplitNode);
    pInfo->pSplitNode = NULL;
    if (groupSort) {
      stbSplSetScanPartSort(pPartSort);
    }
    SNode* pScanSubplan = (SNode*)splCreateScanSubplan(pCxt, pPartSort, SPLIT_FLAG_STABLE_SPLIT);
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, pScanSubplan);
  }

  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}
1444

1445
/**
 * Chooses the node at which a scan is split.
 *
 * The scan's parent is chosen instead of the scan itself when the parent is a project node
 * without LIMIT/SLIMIT and without inputIgnoreGroup. In that case, if the scan has a LIMIT, it
 * is cloned onto the parent, and the scan's own limit is rewritten to limit+offset with the
 * offset reset to 0.
 *
 * @param pSplitNode receives the chosen node.
 * @return TSDB_CODE_SUCCESS, or the clone error when the LIMIT could not be copied.
 */
static int32_t stbSplGetSplitNodeForScan(SStableSplitInfo* pInfo, SLogicNode** pSplitNode) {
  SLogicNode* pScan = pInfo->pSplitNode;
  SLogicNode* pParent = pScan->pParent;

  *pSplitNode = pScan;
  if (NULL == pParent || QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pParent)) {
    return TSDB_CODE_SUCCESS;
  }
  if (NULL != pParent->pLimit || NULL != pParent->pSlimit || ((SProjectLogicNode*)pParent)->inputIgnoreGroup) {
    return TSDB_CODE_SUCCESS;
  }

  *pSplitNode = pParent;
  if (NULL == pScan->pLimit) {
    return TSDB_CODE_SUCCESS;
  }

  pParent->pLimit = NULL;
  int32_t code = nodesCloneNode(pScan->pLimit, &pParent->pLimit);
  if (NULL == pParent->pLimit) {
    return code;
  }

  SLimitNode* pScanLimit = (SLimitNode*)pScan->pLimit;
  if (pScanLimit->limit && pScanLimit->offset) {
    pScanLimit->limit->datum.i += pScanLimit->offset->datum.i;
  }
  if (pScanLimit->offset) {
    pScanLimit->offset->datum.i = 0;
  }
  return TSDB_CODE_SUCCESS;
}
1468

1469
// Splits a super-table scan that has no group tags: the chosen split node is
// replaced by an exchange node in the current subplan and moved into a new scan
// subplan appended to the current subplan's children.
// pCxt->groupId is incremented whether or not the split succeeded.
static int32_t stbSplSplitScanNodeWithoutPartTags(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicSubplan* pSubplan = pInfo->pSubplan;
  SLogicNode*    pTarget = NULL;

  int32_t code = stbSplGetSplitNodeForScan(pInfo, &pTarget);
  if (TSDB_CODE_SUCCESS == code) {
    code = splCreateExchangeNodeForSubplan(pCxt, pSubplan, pTarget, pSubplan->subplanType, false);
  }
  if (TSDB_CODE_SUCCESS == code) {
    splSetSubplanType(pSubplan);
    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, pTarget, SPLIT_FLAG_STABLE_SPLIT);
    code = nodesListMakeStrictAppend(&pSubplan->pChildren, (SNode*)pScanSubplan);
  }

  ++(pCxt->groupId);
  return code;
}
1483

1484
// Splits a super-table scan that carries group tags by placing a merge node above
// the chosen split node and moving that node into a new scan subplan.
//
// Sorting is requested on the merge node by default; when the split node is a
// project without LIMIT/SLIMIT, sorting is requested only if the project does not
// set ignoreGroupId.
// The subplan type is set to MERGE and pCxt->groupId is incremented on every path.
static int32_t stbSplSplitScanNodeWithPartTags(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode* pTarget = NULL;
  int32_t     code = stbSplGetSplitNodeForScan(pInfo, &pTarget);

  if (TSDB_CODE_SUCCESS == code) {
    bool sortRequired = true;
    if (QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pTarget) && NULL == pTarget->pLimit && NULL == pTarget->pSlimit) {
      sortRequired = !((SProjectLogicNode*)pTarget)->ignoreGroupId;
    }
    code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pTarget, NULL, pTarget, sortRequired, sortRequired);
  }
  if (TSDB_CODE_SUCCESS == code) {
    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, pTarget, SPLIT_FLAG_STABLE_SPLIT);
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, (SNode*)pScanSubplan);
  }

  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}
1502

1503
// Looks up the primary timestamp column (colId == PRIMARYKEY_TIMESTAMP_COL_ID) in
// pScan->pScanCols and makes sure an equal node is present in the scan's targets.
//
// *ppNode is set to NULL when the scan has no such column (still a success), and to
// the scan-column node when it is found. If no equal node exists in the targets, a
// clone of the column is appended to them first.
// On a clone/append failure the error code is returned and *ppNode is left untouched.
static int32_t stbSplFindPrimaryKeyFromScan(SScanLogicNode* pScan, SNode** ppNode) {
  SNode* pTsCol = NULL;
  SNode* pIter = NULL;

  FOREACH(pIter, pScan->pScanCols) {
    if (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pIter)->colId) {
      pTsCol = pIter;
      break;
    }
  }
  if (NULL == pTsCol) {
    *ppNode = NULL;
    return TSDB_CODE_SUCCESS;
  }

  // Already exposed as a target: nothing to add.
  FOREACH(pIter, pScan->node.pTargets) {
    if (nodesEqualNode(pIter, pTsCol)) {
      *ppNode = pTsCol;
      return TSDB_CODE_SUCCESS;
    }
  }

  SNode*  pClone = NULL;
  int32_t code = nodesCloneNode(pTsCol, &pClone);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }
  code = nodesListStrictAppend(pScan->node.pTargets, pClone);
  if (TSDB_CODE_SUCCESS == code) {
    *ppNode = pTsCol;
  }
  return code;
}
1533

1534
// Looks up the first scan column flagged isPk in pScan->pScanCols and makes sure an
// equal node is present in the scan's targets.
//
// *ppNode is set to NULL when no column is flagged (still a success), and to the
// scan-column node when one is found. If no equal node exists in the targets, a
// clone of the column is appended to them first.
// On a clone/append failure the error code is returned and *ppNode is left untouched.
static int32_t stbSplFindPkFromScan(SScanLogicNode* pScan, SNode** ppNode) {
  int32_t code = 0;
  SNode*  pPkCol = NULL;
  SNode*  pIter = NULL;

  FOREACH(pIter, pScan->pScanCols) {
    if (((SColumnNode*)pIter)->isPk) {
      pPkCol = pIter;
      break;
    }
  }
  if (NULL == pPkCol) {
    *ppNode = NULL;
    return code;
  }

  // Already exposed as a target: nothing to add.
  FOREACH(pIter, pScan->node.pTargets) {
    if (nodesEqualNode(pIter, pPkCol)) {
      *ppNode = pPkCol;
      return code;
    }
  }

  SNode* pClone = NULL;
  code = nodesCloneNode(pPkCol, &pClone);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }
  code = nodesListStrictAppend(pScan->node.pTargets, pClone);
  if (TSDB_CODE_SUCCESS == code) {
    *ppNode = pPkCol;
  }
  return code;
}
1565

1566
static int32_t stbSplCreateMergeScanNode(SScanLogicNode* pScan, SLogicNode** pOutputMergeScan,
53✔
1567
                                         SNodeList** pOutputMergeKeys) {
1568
  SNodeList* pChildren = pScan->node.pChildren;
53✔
1569
  pScan->node.pChildren = NULL;
53✔
1570

1571
  int32_t         code = TSDB_CODE_SUCCESS;
53✔
1572
  SScanLogicNode* pMergeScan = NULL;
53✔
1573
  code = nodesCloneNode((SNode*)pScan, (SNode**)&pMergeScan);
53✔
1574

1575
  SNodeList* pMergeKeys = NULL;
53✔
1576
  if (TSDB_CODE_SUCCESS == code) {
53✔
1577
    pMergeScan->scanType = SCAN_TYPE_TABLE_MERGE;
53✔
1578
    pMergeScan->filesetDelimited = true;
53✔
1579
    pMergeScan->node.pChildren = pChildren;
53✔
1580
    splSetParent((SLogicNode*)pMergeScan);
53✔
1581

1582
    SNode* pTs = NULL;
53✔
1583
    code = stbSplFindPrimaryKeyFromScan(pMergeScan, &pTs);
53✔
1584
    if (TSDB_CODE_SUCCESS == code) {
53✔
1585
      code = stbSplCreateMergeKeysByPrimaryKey(pTs, pMergeScan->scanSeq[0] > 0 ? ORDER_ASC : ORDER_DESC, &pMergeKeys);
53✔
1586
    }
1587
    SNode* pPk = NULL;
53✔
1588
    if (TSDB_CODE_SUCCESS == code) {
53✔
1589
      code = stbSplFindPkFromScan(pMergeScan, &pPk);
53✔
1590
    }
1591
    if (TSDB_CODE_SUCCESS == code && NULL != pPk) {
53✔
1592
      code = stbSplCreateMergeKeysByExpr(pPk, pMergeScan->scanSeq[0] > 0 ? ORDER_ASC : ORDER_DESC, &pMergeKeys);
×
1593
    }
1594
  }
1595

1596
  if (TSDB_CODE_SUCCESS == code) {
53✔
1597
    *pOutputMergeScan = (SLogicNode*)pMergeScan;
53✔
1598
    *pOutputMergeKeys = pMergeKeys;
53✔
1599
  } else {
1600
    nodesDestroyNode((SNode*)pMergeScan);
×
1601
    nodesDestroyList(pMergeKeys);
×
1602
  }
1603

1604
  return code;
53✔
1605
}
1606

1607
// Replaces pScan with a merge node fed by a table-merge copy of the scan, which is
// moved into a new scan subplan appended to pSubplan's children.
//
// If the copied scan has a limit with an offset, the copy's limit becomes
// limit + offset and its offset becomes 0.
// After the merge node is created, pScan is destroyed; pInfo->pSplitNode is cleared
// first if it pointed at pScan. pCxt->groupId is incremented on every path.
static int32_t stbSplSplitMergeScanNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SScanLogicNode* pScan,
                                        bool groupSort, SStableSplitInfo* pInfo) {
  SLogicNode* pNewScan = NULL;
  SNodeList*  pKeys = NULL;
  int32_t     code = stbSplCreateMergeScanNode(pScan, &pNewScan, &pKeys);

  if (TSDB_CODE_SUCCESS == code) {
    SLimitNode* pLimit = (SLimitNode*)pNewScan->pLimit;
    if (NULL != pLimit && pLimit->offset) {
      if (pLimit->limit) {
        pLimit->limit->datum.i += pLimit->offset->datum.i;
      }
      pLimit->offset->datum.i = 0;
    }
    code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pKeys, pNewScan, groupSort, true);
  }

  if (TSDB_CODE_SUCCESS == code) {
    // Do not leave the caller's info pointing at the node destroyed below.
    if ((void*)pInfo->pSplitNode == (void*)pScan) {
      pInfo->pSplitNode = NULL;
    }
    nodesDestroyNode((SNode*)pScan);

    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, pNewScan, SPLIT_FLAG_STABLE_SPLIT);
    code = nodesListMakeStrictAppend(&pSubplan->pChildren, (SNode*)pScanSubplan);
  }

  ++(pCxt->groupId);
  return code;
}
1634

1635
// Dispatches the split of a scan node:
//   - table-merge scans mark the subplan as MERGE and go through the merge-scan split;
//   - scans with group tags use the "with part tags" split;
//   - everything else uses the "without part tags" split.
static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SScanLogicNode* pScan = (SScanLogicNode*)pInfo->pSplitNode;

  if (SCAN_TYPE_TABLE_MERGE != pScan->scanType) {
    if (NULL == pScan->pGroupTags) {
      return stbSplSplitScanNodeWithoutPartTags(pCxt, pInfo);
    }
    return stbSplSplitScanNodeWithPartTags(pCxt, pInfo);
  }

  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  return stbSplSplitMergeScanNode(pCxt, pInfo->pSubplan, pScan, true, pInfo);
}
1646

1647
// Walks the children of a join node and splits every scan below it.
// Scan children go through the merge-scan split (group sort is requested when the
// join has grpJoin set); nested joins are handled recursively; any other child type
// yields TSDB_CODE_PLAN_INTERNAL_ERROR. Iteration stops at the first failure.
static int32_t stbSplSplitJoinNodeImpl(SSplitContext* pCxt, SLogicSubplan* pSubplan, SJoinLogicNode* pJoin, SStableSplitInfo* pInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  SNode*  pInput = NULL;

  FOREACH(pInput, pJoin->node.pChildren) {
    switch (nodeType(pInput)) {
      case QUERY_NODE_LOGIC_PLAN_SCAN:
        code = stbSplSplitMergeScanNode(pCxt, pSubplan, (SScanLogicNode*)pInput, pJoin->grpJoin ? true : false, pInfo);
        break;
      case QUERY_NODE_LOGIC_PLAN_JOIN:
        code = stbSplSplitJoinNodeImpl(pCxt, pSubplan, (SJoinLogicNode*)pInput, pInfo);
        break;
      default:
        code = TSDB_CODE_PLAN_INTERNAL_ERROR;
        break;
    }
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
  }
  return code;
}
1668

1669
// Splits all scans below the join stored in pInfo->pSplitNode. On success the
// subplan becomes a MERGE subplan and the join node is marked splitDone.
static int32_t stbSplSplitJoinNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  int32_t code = stbSplSplitJoinNodeImpl(pCxt, pInfo->pSubplan, (SJoinLogicNode*)pInfo->pSplitNode, pInfo);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  pInfo->pSplitNode->splitDone = true;
  return code;
}
1680

1681
// Builds merge keys for a partition node from the primary timestamp column of its
// first child (treated as a scan node).
//
// A clone of that column is appended to the partition node's targets and used as
// the merge key; the order follows the scan's scanSeq[0] (> 0 means ascending).
// If no clone is available (lookup failed, column missing, or clone failed) the
// function returns the current code without touching *pMergeKeys.
static int32_t stbSplCreateMergeKeysForPartitionNode(SLogicNode* pPart, SNodeList** pMergeKeys) {
  SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pPart->pChildren, 0);
  SNode*          pScanTs = NULL;
  SNode*          pTsCopy = NULL;

  int32_t code = stbSplFindPrimaryKeyFromScan(pScan, &pScanTs);
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesCloneNode(pScanTs, &pTsCopy);
  }
  if (NULL == pTsCopy) {
    return code;
  }

  code = nodesListStrictAppend(pPart->pTargets, pTsCopy);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }
  return stbSplCreateMergeKeysByPrimaryKey(pTsCopy, pScan->scanSeq[0] > 0 ? ORDER_ASC : ORDER_DESC, pMergeKeys);
}
1698

1699
// Splits at a partition node: a merge node is created for it and the partition node
// is moved into a new scan subplan. Merge keys are only built when the node requires
// data order of at least DATA_ORDER_LEVEL_IN_GROUP.
// The subplan type is set to MERGE and pCxt->groupId is incremented on every path.
static int32_t stbSplSplitPartitionNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode*    pPart = pInfo->pSplitNode;
  SLogicSubplan* pSubplan = pInfo->pSubplan;
  SNodeList*     pKeys = NULL;
  int32_t        code = TSDB_CODE_SUCCESS;

  if (pPart->requireDataOrder >= DATA_ORDER_LEVEL_IN_GROUP) {
    code = stbSplCreateMergeKeysForPartitionNode(pPart, &pKeys);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = stbSplCreateMergeNode(pCxt, pSubplan, pPart, pKeys, pPart, true, true);
  }
  if (TSDB_CODE_SUCCESS == code) {
    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, pPart, SPLIT_FLAG_STABLE_SPLIT);
    code = nodesListMakeStrictAppend(&pSubplan->pChildren, (SNode*)pScanSubplan);
  }

  pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}
1716

1717
// Split rule for super-table queries.
//
// Does nothing for rSma queries or when no split node is matched. Otherwise the
// matched node is dispatched by type (scan, join, partition, agg, window, sort) to
// the corresponding splitter; other node types are left alone. If a split node is
// still recorded afterwards it is marked splitDone, and pCxt->split is set even when
// the splitter returned an error.
static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  if (pCxt->pPlanCxt->rSmaQuery) {
    return TSDB_CODE_SUCCESS;
  }

  SStableSplitInfo splitInfo = {0};
  bool matched = splMatch(pCxt, pSubplan, SPLIT_FLAG_STABLE_SPLIT, (FSplFindSplitNode)stbSplFindSplitNode, &splitInfo);
  if (!matched) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  switch (nodeType(splitInfo.pSplitNode)) {
    case QUERY_NODE_LOGIC_PLAN_SCAN:
      code = stbSplSplitScanNode(pCxt, &splitInfo);
      break;
    case QUERY_NODE_LOGIC_PLAN_JOIN:
      code = stbSplSplitJoinNode(pCxt, &splitInfo);
      break;
    case QUERY_NODE_LOGIC_PLAN_PARTITION:
      code = stbSplSplitPartitionNode(pCxt, &splitInfo);
      break;
    case QUERY_NODE_LOGIC_PLAN_AGG:
      code = stbSplSplitAggNode(pCxt, &splitInfo);
      break;
    case QUERY_NODE_LOGIC_PLAN_WINDOW:
      code = stbSplSplitWindowNode(pCxt, &splitInfo);
      break;
    case QUERY_NODE_LOGIC_PLAN_SORT:
      code = stbSplSplitSortNode(pCxt, &splitInfo);
      break;
    default:
      break;
  }

  // Splitters clear pSplitNode when they destroy the node they split at.
  if (NULL != splitInfo.pSplitNode) {
    splitInfo.pSplitNode->splitDone = true;
  }
  pCxt->split = true;
  return code;
}
1757

1758
typedef struct SSigTbJoinSplitInfo {
1759
  SJoinLogicNode* pJoin;
1760
  SLogicNode*     pSplitNode;
1761
  SLogicSubplan*  pSubplan;
1762
} SSigTbJoinSplitInfo;
1763

1764
// Returns true for a join node flagged isSingleTableJoin whose first two children
// are both something other than an exchange node.
static bool sigTbJoinSplNeedSplit(SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_JOIN != nodeType(pNode) || !((SJoinLogicNode*)pNode)->isSingleTableJoin) {
    return false;
  }

  SJoinLogicNode* pJoin = (SJoinLogicNode*)pNode;
  if (QUERY_NODE_LOGIC_PLAN_EXCHANGE == nodeType(nodesListGetNode(pJoin->node.pChildren, 0))) {
    return false;
  }
  return QUERY_NODE_LOGIC_PLAN_EXCHANGE != nodeType(nodesListGetNode(pJoin->node.pChildren, 1));
}
1776

1777
static bool sigTbJoinSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
16,916✔
1778
                                      SSigTbJoinSplitInfo* pInfo) {
1779
  if (sigTbJoinSplNeedSplit(pNode)) {
16,916✔
1780
    pInfo->pJoin = (SJoinLogicNode*)pNode;
26✔
1781
    pInfo->pSplitNode = (SLogicNode*)nodesListGetNode(pNode->pChildren, 1);
26✔
1782
    pInfo->pSubplan = pSubplan;
38✔
1783
    return true;
38✔
1784
  }
1785
  return false;
16,892✔
1786
}
1787

1788
// Split rule for single-table joins: the join's child at index 1 is replaced by an
// exchange node and moved into a new scan subplan.
// Once a node is matched, pCxt->groupId is incremented and pCxt->split is set
// regardless of the result.
static int32_t singleTableJoinSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SSigTbJoinSplitInfo match = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)sigTbJoinSplFindSplitNode, &match)) {
    return TSDB_CODE_SUCCESS;
  }

  SLogicSubplan* pOwner = match.pSubplan;
  int32_t        code = splCreateExchangeNodeForSubplan(pCxt, pOwner, match.pSplitNode, pOwner->subplanType, false);
  if (TSDB_CODE_SUCCESS == code) {
    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, match.pSplitNode, 0);
    code = nodesListMakeStrictAppend(&pOwner->pChildren, (SNode*)pScanSubplan);
  }

  ++(pCxt->groupId);
  pCxt->split = true;
  return code;
}
1801

1802
// Turns every child of pSplitNode into its own subplan under pUnionSubplan.
//
// The previous children of pUnionSubplan are detached first. For each child of
// pSplitNode a new subplan is created and appended to pUnionSubplan, the child is
// unhooked from pSplitNode's list (its cell is set to NULL), and the detached
// subplans are offered to the new subplan through splMountSubplan(). pCxt->groupId
// is incremented once per successfully processed child.
//
// On success, detached subplans still left in the old list are appended back to
// pUnionSubplan (an empty list is destroyed) and pSplitNode's children list is
// destroyed.
// NOTE(review): on the error path the detached list is neither re-attached nor
// destroyed - looks like a leak; confirm ownership after splMountSubplan().
static int32_t unionSplitSubplan(SSplitContext* pCxt, SLogicSubplan* pUnionSubplan, SLogicNode* pSplitNode) {
  SNodeList* pOldChildren = pUnionSubplan->pChildren;
  pUnionSubplan->pChildren = NULL;

  int32_t code = TSDB_CODE_SUCCESS;
  SNode*  pBranch = NULL;
  FOREACH(pBranch, pSplitNode->pChildren) {
    SLogicSubplan* pBranchSubplan = NULL;
    code = splCreateSubplan(pCxt, (SLogicNode*)pBranch, &pBranchSubplan);
    if (TSDB_CODE_SUCCESS == code) {
      code = nodesListMakeStrictAppend(&pUnionSubplan->pChildren, (SNode*)pBranchSubplan);
    }
    if (TSDB_CODE_SUCCESS == code) {
      REPLACE_NODE(NULL);
      code = splMountSubplan(pBranchSubplan, pOldChildren);
    }
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
    ++(pCxt->groupId);
  }

  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  if (NULL != pOldChildren) {
    if (pOldChildren->length > 0) {
      code = nodesListMakeStrictAppendList(&pUnionSubplan->pChildren, pOldChildren);
    } else {
      nodesDestroyList(pOldChildren);
    }
  }
  NODES_DESTORY_LIST(pSplitNode->pChildren);
  return code;
}
1836

1837
typedef struct SUnionAllSplitInfo {
1838
  SProjectLogicNode* pProject;
1839
  SLogicSubplan*     pSubplan;
1840
} SUnionAllSplitInfo;
1841

1842
static bool unAllSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
16,939✔
1843
                                  SUnionAllSplitInfo* pInfo) {
1844
  if (QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) {
16,939✔
1845
    pInfo->pProject = (SProjectLogicNode*)pNode;
22✔
1846
    pInfo->pSubplan = pSubplan;
22✔
1847
    return true;
22✔
1848
  }
1849
  return false;
16,917✔
1850
}
1851

1852
static int32_t unAllSplCreateExchangeNode(SSplitContext* pCxt, int32_t startGroupId, SLogicSubplan* pSubplan,
22✔
1853
                                          SProjectLogicNode* pProject) {
1854
  SExchangeLogicNode* pExchange = NULL;
22✔
1855
  int32_t code = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE, (SNode**)&pExchange);
22✔
1856
  if (NULL == pExchange) {
22✔
1857
    return code;
×
1858
  }
1859
  pExchange->srcStartGroupId = startGroupId;
22✔
1860
  pExchange->srcEndGroupId = pCxt->groupId - 1;
22✔
1861
  pExchange->node.precision = pProject->node.precision;
22✔
1862
  pExchange->node.pTargets = NULL;
22✔
1863
  code = nodesCloneList(pProject->node.pTargets, &pExchange->node.pTargets);
22✔
1864
  if (TSDB_CODE_SUCCESS != code) {
22✔
1865
    nodesDestroyNode((SNode*)pExchange);
×
1866
    return code;
×
1867
  }
1868
  pExchange->node.pConditions = NULL;
22✔
1869
  code = nodesCloneNode(pProject->node.pConditions, &pExchange->node.pConditions);
22✔
1870
  if (TSDB_CODE_SUCCESS != code) {
22✔
1871
    nodesDestroyNode((SNode*)pExchange);
×
1872
    return code;
×
1873
  }
1874
  TSWAP(pExchange->node.pLimit, pProject->node.pLimit);
22✔
1875

1876
  pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
22✔
1877

1878
  if (NULL == pProject->node.pParent) {
22✔
1879
    pSubplan->pNode = (SLogicNode*)pExchange;
9✔
1880
    nodesDestroyNode((SNode*)pProject);
9✔
1881
    return TSDB_CODE_SUCCESS;
9✔
1882
  }
1883

1884
  SNode* pNode;
1885
  FOREACH(pNode, pProject->node.pParent->pChildren) {
13✔
1886
    if (nodesEqualNode(pNode, (SNode*)pProject)) {
13✔
1887
      REPLACE_NODE(pExchange);
13✔
1888
      nodesDestroyNode(pNode);
13✔
1889
      return TSDB_CODE_SUCCESS;
13✔
1890
    }
1891
  }
1892
  nodesDestroyNode((SNode*)pExchange);
×
1893
  return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1894
}
1895

1896
// Split rule for UNION ALL: every child of the matched project node becomes its own
// subplan, and the project is then replaced by an exchange node covering the group
// ids allocated during the split. pCxt->split is set once a node was matched.
static int32_t unionAllSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SUnionAllSplitInfo match = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)unAllSplFindSplitNode, &match)) {
    return TSDB_CODE_SUCCESS;
  }

  // Remember the first group id so the exchange can address the whole range.
  int32_t firstGroupId = pCxt->groupId;
  int32_t code = unionSplitSubplan(pCxt, match.pSubplan, (SLogicNode*)match.pProject);
  if (TSDB_CODE_SUCCESS == code) {
    code = unAllSplCreateExchangeNode(pCxt, firstGroupId, match.pSubplan, match.pProject);
  }

  pCxt->split = true;
  return code;
}
1910

1911
typedef struct SUnionDistinctSplitInfo {
1912
  SAggLogicNode* pAgg;
1913
  SLogicSubplan* pSubplan;
1914
} SUnionDistinctSplitInfo;
1915

1916
static int32_t unDistSplCreateExchangeNode(SSplitContext* pCxt, int32_t startGroupId, SLogicSubplan* pSubplan,
19✔
1917
                                           SAggLogicNode* pAgg) {
1918
  SExchangeLogicNode* pExchange = NULL;
19✔
1919
  int32_t code = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE, (SNode**)&pExchange);
19✔
1920
  if (NULL == pExchange) {
19✔
1921
    return code;
×
1922
  }
1923
  pExchange->srcStartGroupId = startGroupId;
19✔
1924
  pExchange->srcEndGroupId = pCxt->groupId - 1;
19✔
1925
  pExchange->node.precision = pAgg->node.precision;
19✔
1926
  pExchange->node.pTargets = NULL;
19✔
1927
  code = nodesCloneList(pAgg->pGroupKeys, &pExchange->node.pTargets);
19✔
1928
  if (NULL == pExchange->node.pTargets) {
19✔
1929
    nodesDestroyNode((SNode*)pExchange);
×
1930
    return code;
×
1931
  }
1932

1933
  pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
19✔
1934

1935
  return nodesListMakeStrictAppend(&pAgg->node.pChildren, (SNode*)pExchange);
19✔
1936
}
1937

1938
static bool unDistSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
16,939✔
1939
                                   SUnionDistinctSplitInfo* pInfo) {
1940
  if (QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) {
16,939✔
1941
    pInfo->pAgg = (SAggLogicNode*)pNode;
19✔
1942
    if (!pInfo->pAgg->pGroupKeys) return false;
19✔
1943
    pInfo->pSubplan = pSubplan;
19✔
1944
    return true;
19✔
1945
  }
1946
  return false;
16,920✔
1947
}
1948

1949
// Split rule for UNION (distinct): every child of the matched agg node becomes its
// own subplan, and an exchange node covering the group ids allocated during the
// split is added below the agg. pCxt->split is set once a node was matched.
static int32_t unionDistinctSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SUnionDistinctSplitInfo match = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)unDistSplFindSplitNode, &match)) {
    return TSDB_CODE_SUCCESS;
  }

  // Remember the first group id so the exchange can address the whole range.
  int32_t firstGroupId = pCxt->groupId;
  int32_t code = unionSplitSubplan(pCxt, match.pSubplan, (SLogicNode*)match.pAgg);
  if (TSDB_CODE_SUCCESS == code) {
    code = unDistSplCreateExchangeNode(pCxt, firstGroupId, match.pSubplan, match.pAgg);
  }

  pCxt->split = true;
  return code;
}
1963

1964
typedef struct SSmaIndexSplitInfo {
1965
  SMergeLogicNode* pMerge;
1966
  SLogicSubplan*   pSubplan;
1967
} SSmaIndexSplitInfo;
1968

1969
static bool smaIdxSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
17,041✔
1970
                                   SSmaIndexSplitInfo* pInfo) {
1971
  if (QUERY_NODE_LOGIC_PLAN_MERGE == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) {
17,041✔
1972
    int32_t nodeType = nodeType(nodesListGetNode(pNode->pChildren, 0));
3✔
1973
    if (nodeType == QUERY_NODE_LOGIC_PLAN_EXCHANGE || nodeType == QUERY_NODE_LOGIC_PLAN_MERGE) {
3✔
1974
      pInfo->pMerge = (SMergeLogicNode*)pNode;
×
1975
      pInfo->pSubplan = pSubplan;
×
1976
      return true;
×
1977
    }
1978
  }
1979
  return false;
17,041✔
1980
}
1981

1982
// Split rule for SMA index plans: every child of the matched merge node becomes its
// own subplan; on success the merge node's srcGroupId is set to the group id reached
// after the split. Once a node is matched, pCxt->groupId is incremented and
// pCxt->split is set regardless of the result.
static int32_t smaIndexSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SSmaIndexSplitInfo match = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)smaIdxSplFindSplitNode, &match)) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = unionSplitSubplan(pCxt, match.pSubplan, (SLogicNode*)match.pMerge);
  if (TSDB_CODE_SUCCESS == code) {
    match.pMerge->srcGroupId = pCxt->groupId;
  }

  ++(pCxt->groupId);
  pCxt->split = true;
  return code;
}
1996

1997
typedef struct SInsertSelectSplitInfo {
1998
  SLogicNode*    pQueryRoot;
1999
  SLogicSubplan* pSubplan;
2000
} SInsertSelectSplitInfo;
2001

2002
static bool insSelSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
17,021✔
2003
                                   SInsertSelectSplitInfo* pInfo) {
2004
  if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY == nodeType(pNode) && 1 == LIST_LENGTH(pNode->pChildren) &&
17,021✔
2005
      MODIFY_TABLE_TYPE_INSERT == ((SVnodeModifyLogicNode*)pNode)->modifyType) {
7✔
2006
    pInfo->pQueryRoot = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
7✔
2007
    pInfo->pSubplan = pSubplan;
7✔
2008
    return true;
7✔
2009
  }
2010
  return false;
17,014✔
2011
}
2012

2013
// Split rule for INSERT ... SELECT: the query below the insert node is replaced by
// an exchange node and moved into a new subplan, which is appended to the current
// subplan's children and offered the children captured before the split via
// splMountSubplan().
// Once a node is matched, the INSERT split flag is set on the subplan,
// pCxt->groupId is incremented and pCxt->split is set regardless of the result.
static int32_t insertSelectSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SInsertSelectSplitInfo match = {0};
  if (!splMatch(pCxt, pSubplan, SPLIT_FLAG_INSERT_SPLIT, (FSplFindSplitNode)insSelSplFindSplitNode, &match)) {
    return TSDB_CODE_SUCCESS;
  }

  SNodeList*     pPrevChildren = match.pSubplan->pChildren;
  SLogicSubplan* pQuerySubplan = NULL;

  int32_t code = splCreateExchangeNodeForSubplan(pCxt, match.pSubplan, match.pQueryRoot, SUBPLAN_TYPE_MODIFY, false);
  if (TSDB_CODE_SUCCESS == code) {
    code = splCreateSubplan(pCxt, match.pQueryRoot, &pQuerySubplan);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(&match.pSubplan->pChildren, (SNode*)pQuerySubplan);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = splMountSubplan(pQuerySubplan, pPrevChildren);
  }

  SPLIT_FLAG_SET_MASK(match.pSubplan->splitFlag, SPLIT_FLAG_INSERT_SPLIT);
  ++(pCxt->groupId);
  pCxt->split = true;
  return code;
}
2037

2038
typedef struct SStreamVTableSplitInfo {
2039
  SLogicNode*    pSplitNode;
2040
  SLogicSubplan* pSubplan;
2041
} SStreamVTableSplitInfo;
2042

2043

2044
static bool streamVTableFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
17,062✔
2045
                                      SStreamVTableSplitInfo* pInfo) {
2046
  if (!pCxt->pPlanCxt->streamQuery) {
17,062✔
2047
    return false;
17,015✔
2048
  }
2049
  if (1 == LIST_LENGTH(pNode->pChildren) && QUERY_NODE_LOGIC_PLAN_VIRTUAL_TABLE_SCAN == nodeType(nodesListGetNode(pNode->pChildren, 0))) {
47✔
2050
    pInfo->pSplitNode = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
×
2051
    pInfo->pSubplan = pSubplan;
×
2052
    return true;
×
2053
  }
2054
  return false;
54✔
2055
}
2056

2057
// Split rule for stream queries over virtual tables: the matched virtual-table scan
// is replaced by an exchange node and moved into a new scan subplan; on success the
// current subplan becomes a MERGE subplan and pCxt->groupId is incremented.
// pCxt->split is set once a node was matched, even if a later step failed.
static int32_t streamVTableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SStreamVTableSplitInfo match = {0};
  int32_t                code = TSDB_CODE_SUCCESS;
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)streamVTableFindSplitNode, &match)) {
    return TSDB_CODE_SUCCESS;
  }

  SLogicSubplan* pScanSubplan = NULL;

  PLAN_ERR_JRET(splCreateExchangeNodeForSubplan(pCxt, match.pSubplan, match.pSplitNode, match.pSubplan->subplanType, false));
  pScanSubplan = splCreateScanSubplan(pCxt, match.pSplitNode, 0);
  PLAN_ERR_JRET(nodesListMakeStrictAppend(&match.pSubplan->pChildren, (SNode*)pScanSubplan));
  match.pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);

_return:

  pCxt->split = true;
  return code;
}
2075

2076
typedef struct SVirtualTableSplitInfo {
2077
  SVirtualScanLogicNode *pVirtual;
2078
  SLogicSubplan          *pSubplan;
2079
} SVirtualTableSplitInfo;
2080

2081
static bool virtualTableFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
17,055✔
2082
                                      SVirtualTableSplitInfo* pInfo) {
2083
  if (QUERY_NODE_LOGIC_PLAN_VIRTUAL_TABLE_SCAN == nodeType(pNode) && 0 != LIST_LENGTH(pNode->pChildren) &&
17,055✔
2084
      QUERY_NODE_LOGIC_PLAN_EXCHANGE != nodeType(nodesListGetNode(pNode->pChildren, 0))) {
×
2085
    pInfo->pVirtual = (SVirtualScanLogicNode*)pNode;
×
2086
    pInfo->pSubplan = pSubplan;
×
2087
    return true;
×
2088
  }
2089
  return false;
17,055✔
2090
}
2091

2092
// Split rule for virtual-table scans: every child of the matched virtual scan is
// replaced by an exchange node and moved into its own scan subplan, with one group
// id consumed per child. For super tables the exchange is created with its last
// argument set to true and the new subplan is flagged processOneBlock.
// When all children were processed the current subplan becomes a MERGE subplan.
// pCxt->split is set once a node was matched, even if a later step failed.
//
// Returns TSDB_CODE_SUCCESS, the first error reported by a helper, or terrno when
// splCreateScanSubplan() yields no subplan.
static int32_t virtualTableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  int32_t                code = TSDB_CODE_SUCCESS;
  SVirtualTableSplitInfo info = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)virtualTableFindSplitNode, &info)) {
    return TSDB_CODE_SUCCESS;
  }

  SNode* pChild = NULL;
  FOREACH(pChild, info.pVirtual->node.pChildren) {
    PLAN_ERR_JRET(splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, (SLogicNode*)pChild, info.pSubplan->subplanType, info.pVirtual->tableType == TSDB_SUPER_TABLE));
    SLogicSubplan *sub = splCreateScanSubplan(pCxt, (SLogicNode*)pChild, 0);
    if (NULL == sub) {
      // Creation failed: bail out before touching the subplan, as qnodeSplit() does.
      code = terrno;
      goto _return;
    }
    sub->processOneBlock = (info.pVirtual->tableType == TSDB_SUPER_TABLE);
    PLAN_ERR_JRET(nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)sub));
    ++(pCxt->groupId);
  }
  info.pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
_return:
  pCxt->split = true;
  return code;
}
2112

2113
typedef struct SMergeAggColsSplitInfo {
2114
  SAggLogicNode   *pAgg;
2115
  SLogicNode      *pSplitNode;
2116
  SLogicSubplan   *pSubplan;
2117
} SMergeAggColsSplitInfo;
2118

2119
// Returns true for an agg node with exactly one child that is a scan node and whose
// parent is a merge node with colsMerge set.
static bool mergeAggColsNeedSplit(SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren)) {
    return false;
  }
  if (NULL == pNode->pParent || QUERY_NODE_LOGIC_PLAN_MERGE != nodeType(pNode->pParent)) {
    return false;
  }
  if (!((SMergeLogicNode *)pNode->pParent)->colsMerge) {
    return false;
  }
  return QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(nodesListGetNode(pNode->pChildren, 0));
}
2129

2130

2131
static bool mergeAggColsFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
17,047✔
2132
                                      SMergeAggColsSplitInfo* pInfo) {
2133
  if (mergeAggColsNeedSplit(pNode)) {
17,047✔
2134
    pInfo->pAgg = (SAggLogicNode *)pNode;
×
2135
    pInfo->pSplitNode = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
×
2136
    pInfo->pSubplan = pSubplan;
2✔
2137
    return true;
2✔
2138
  }
2139
  return false;
17,064✔
2140
}
2141

2142
// Split rule for aggregates below a column-merging merge node: the agg's scan child
// is replaced by an exchange node (created with SUBPLAN_TYPE_MERGE) and moved into a
// new scan subplan. Any failing step returns immediately through PLAN_ERR_RET,
// before pCxt->groupId and pCxt->split are updated.
static int32_t mergeAggColsSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SMergeAggColsSplitInfo match = {0};
  int32_t                code = TSDB_CODE_SUCCESS;
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)mergeAggColsFindSplitNode, &match)) {
    return TSDB_CODE_SUCCESS;
  }

  PLAN_ERR_RET(splCreateExchangeNodeForSubplan(pCxt, match.pSubplan, match.pSplitNode, SUBPLAN_TYPE_MERGE, false));

  SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, match.pSplitNode, 0);
  PLAN_ERR_RET(nodesListMakeStrictAppend(&match.pSubplan->pChildren, (SNode*)pScanSubplan));

  ++(pCxt->groupId);
  pCxt->split = true;
  return code;
}
2156

2157
typedef struct SQnodeSplitInfo {
2158
  SLogicNode*    pSplitNode;
2159
  SLogicSubplan* pSubplan;
2160
} SQnodeSplitInfo;
2161

2162
static bool qndSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
51✔
2163
                                SQnodeSplitInfo* pInfo) {
2164
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode) && NULL != pNode->pParent &&
51✔
2165
      QUERY_NODE_LOGIC_PLAN_INTERP_FUNC != nodeType(pNode->pParent) &&
13✔
2166
      QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC != nodeType(pNode->pParent) && ((SScanLogicNode*)pNode)->scanSeq[0] <= 1 &&
13✔
2167
      ((SScanLogicNode*)pNode)->scanSeq[1] <= 1) {
13✔
2168
    pInfo->pSplitNode = pNode;
13✔
2169
    pInfo->pSubplan = pSubplan;
13✔
2170
    return true;
13✔
2171
  }
2172
  return false;
38✔
2173
}
2174

2175
// Split rule used only when tsQueryPolicy is QUERY_POLICY_QNODE: the matched scan is
// forced to FUNC_DATA_REQUIRED_DATA_LOAD, replaced by an exchange node and moved
// into a new scan subplan.
//
// If the current subplan has a vgroup list, its vgroup count becomes the number of
// compute nodes and the list is swapped into the scan subplan; otherwise the number
// of compute nodes is 1. If the scan subplan cannot be created, terrno is returned.
// Once a node is matched, the subplan type is set to COMPUTE, pCxt->groupId is
// incremented and pCxt->split is set regardless of the result.
static int32_t qnodeSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  if (QUERY_POLICY_QNODE != tsQueryPolicy) {
    return TSDB_CODE_SUCCESS;
  }

  SQnodeSplitInfo match = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)qndSplFindSplitNode, &match)) {
    return TSDB_CODE_SUCCESS;
  }

  SLogicSubplan* pOwner = match.pSubplan;
  ((SScanLogicNode*)match.pSplitNode)->dataRequired = FUNC_DATA_REQUIRED_DATA_LOAD;

  int32_t code = splCreateExchangeNodeForSubplan(pCxt, pOwner, match.pSplitNode, pOwner->subplanType, false);
  if (TSDB_CODE_SUCCESS == code) {
    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, match.pSplitNode, 0);
    if (NULL == pScanSubplan) {
      code = terrno;
    } else {
      if (NULL == pOwner->pVgroupList) {
        pOwner->numOfComputeNodes = 1;
      } else {
        pOwner->numOfComputeNodes = pOwner->pVgroupList->numOfVgroups;
        TSWAP(pScanSubplan->pVgroupList, pOwner->pVgroupList);
      }
      code = nodesListMakeStrictAppend(&pOwner->pChildren, (SNode*)pScanSubplan);
    }
  }

  pOwner->subplanType = SUBPLAN_TYPE_COMPUTE;
  ++(pCxt->groupId);
  pCxt->split = true;
  return code;
}
2205

2206
typedef struct SDynVirtualScanSplitInfo {
2207
  SDynQueryCtrlLogicNode *pDyn;
2208
  SLogicSubplan          *pSubplan;
2209
} SDynVirtualScanSplitInfo;
2210

2211
static bool dynVirtualScanFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
17,054✔
2212
                                        SDynVirtualScanSplitInfo* pInfo) {
2213
  if (QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL == nodeType(pNode) && NULL != pNode->pParent &&
17,054✔
2214
      QUERY_NODE_LOGIC_PLAN_EXCHANGE != nodeType(pNode->pParent) &&
5✔
2215
      ((SDynQueryCtrlLogicNode*)pNode)->qType == DYN_QTYPE_VTB_SCAN) {
5✔
2216
    pInfo->pDyn = (SDynQueryCtrlLogicNode*)pNode;
×
2217
    pInfo->pSubplan = pSubplan;
×
2218
    return true;
×
2219
  }
2220
  return false;
17,054✔
2221
}
2222

2223
/*
 * Split rule for the node matched by dynVirtualScanFindSplitNode.
 *
 * On a match: creates an exchange node for the matched node in its owning subplan,
 * creates a new subplan from the matched node and marks it SUBPLAN_TYPE_MERGE, sets
 * its vgroups, appends it to the owning subplan's children, and mounts the children
 * list captured before the split onto the new subplan.
 *
 * Returns TSDB_CODE_SUCCESS when nothing matches, otherwise the first failing code
 * of the steps above (or success). pCxt->split is set to true on every matched path,
 * including failures; pCxt->groupId is advanced only when all steps succeed.
 */
static int32_t dynVirtualScanSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  SDynVirtualScanSplitInfo info = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)dynVirtualScanFindSplitNode, &info)) {
    return TSDB_CODE_SUCCESS;
  }

  SLogicSubplan* pOwner = info.pSubplan;
  SLogicNode*    pDynNode = (SLogicNode*)info.pDyn;
  SNodeList*     pPrevChildren = pOwner->pChildren;  // children list pointer as it was before the split
  SLogicSubplan* pNewSubplan = NULL;

  PLAN_ERR_JRET(splCreateExchangeNodeForSubplan(pCxt, pOwner, pDynNode, pOwner->subplanType, true));
  PLAN_ERR_JRET(splCreateSubplan(pCxt, pDynNode, &pNewSubplan));
  pNewSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  splSetSubplanVgroups(pNewSubplan, pDynNode);
  PLAN_ERR_JRET(nodesListMakeStrictAppend(&pOwner->pChildren, (SNode*)pNewSubplan));
  PLAN_ERR_JRET(splMountSubplan(pNewSubplan, pPrevChildren));
  pCxt->groupId += 1;

_return:
  pCxt->split = true;
  return code;
}
2244

2245
// clang-format off
2246
static const SSplitRule splitRuleSet[] = {
2247
  {.pName = "SuperTableSplit",      .splitFunc = stableSplit},
2248
  {.pName = "SingleTableJoinSplit", .splitFunc = singleTableJoinSplit},
2249
  {.pName = "UnionAllSplit",        .splitFunc = unionAllSplit},
2250
  {.pName = "UnionDistinctSplit",   .splitFunc = unionDistinctSplit},
2251
  {.pName = "SmaIndexSplit",        .splitFunc = smaIndexSplit}, // not used yet
2252
  {.pName = "InsertSelectSplit",    .splitFunc = insertSelectSplit},
2253
  {.pName = "StreamVtableSplit",    .splitFunc = streamVTableSplit},
2254
  {.pName = "VirtualtableSplit",    .splitFunc = virtualTableSplit},
2255
  {.pName = "MergeAggColsSplit",    .splitFunc = mergeAggColsSplit},
2256
  {.pName = "DynVirtualScanSplit",  .splitFunc = dynVirtualScanSplit},
2257
};
2258
// clang-format on
2259

2260
static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule));
2261

2262
/*
 * Logs pSubplan as a string at debug level when tsQueryPlannerTrace is enabled.
 *
 * pRuleName  name of the split rule that was just applied, or NULL for the dump
 *            taken before any rule has run (selects the log message).
 *
 * Returns 0 when tracing is off, otherwise the result of nodesNodeToString.
 */
static int32_t dumpLogicSubplan(const char* pRuleName, SLogicSubplan* pSubplan) {
  if (!tsQueryPlannerTrace) {
    return 0;
  }

  char*   pPlanStr = NULL;
  int32_t code = nodesNodeToString((SNode*)pSubplan, false, &pPlanStr, NULL);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  if (NULL != pRuleName) {
    qDebugL("apply split %s rule, JsonPlan: %s", pRuleName, pPlanStr);
  } else {
    qDebugL("before split, JsonPlan: %s", pPlanStr);
  }
  taosMemoryFree(pPlanStr);
  return code;
}
2279

2280
/*
 * Runs every rule in splitRuleSet against pSubplan, repeating full passes until a
 * pass completes in which no rule reports a split, then finishes with qnodeSplit.
 *
 * The subplan is dumped (see dumpLogicSubplan) once before the first pass and again
 * after each rule that reports a split.
 *
 * Returns the first non-success code from a dump or a rule; otherwise the result
 * of qnodeSplit.
 */
static int32_t applySplitRule(SPlanContext* pCxt, SLogicSubplan* pSubplan) {
  SSplitContext cxt = {
      .pPlanCxt = pCxt, .queryId = pSubplan->id.queryId, .groupId = pSubplan->id.groupId + 1, .split = false};
  bool    split = false;
  int32_t code = dumpLogicSubplan(NULL, pSubplan);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  do {
    split = false;  // becomes true if any rule splits during this pass
    for (int32_t i = 0; i < splitRuleNum; ++i) {
      cxt.split = false;
      // Reuse the function-scope `code`; declaring a second one here would shadow it.
      code = splitRuleSet[i].splitFunc(&cxt, pSubplan);
      if (TSDB_CODE_SUCCESS != code) {
        return code;
      }
      if (cxt.split) {
        split = true;
        code = dumpLogicSubplan(splitRuleSet[i].pName, pSubplan);
        if (TSDB_CODE_SUCCESS != code) {
          return code;
        }
      }
    }
  } while (split);

  return qnodeSplit(&cxt, pSubplan);
}
2307

2308
/*
 * Walks the logic-node tree rooted at pNode. At each scan node reached, swaps the
 * node's pVgroupList with pSubplan->pVgroupList. The walk does not descend below
 * scan nodes or virtual table scan nodes.
 */
static void setVgroupsInfo(SLogicNode* pNode, SLogicSubplan* pSubplan) {
  switch (nodeType(pNode)) {
    case QUERY_NODE_LOGIC_PLAN_SCAN: {
      TSWAP(((SScanLogicNode*)pNode)->pVgroupList, pSubplan->pVgroupList);
      return;
    }
    case QUERY_NODE_LOGIC_PLAN_VIRTUAL_TABLE_SCAN: {
      // do nothing, since virtual table scan node is SUBPLAN_TYPE_MERGE
      return;
    }
    default:
      break;
  }

  SNode* pChild;
  FOREACH(pChild, pNode->pChildren) {
    setVgroupsInfo((SLogicNode*)pChild, pSubplan);
  }
}
2320

2321
/*
 * Decides whether the split rules should run on pLogicSubplan.
 *
 * Returns true when the root node is not a vnode-modify node, or when it is an
 * insert-type modify node that has children; false otherwise.
 */
static bool needSplitSubplan(SLogicSubplan* pLogicSubplan) {
  if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY != nodeType(pLogicSubplan->pNode)) {
    return true;
  }

  SVnodeModifyLogicNode* pModify = (SVnodeModifyLogicNode*)pLogicSubplan->pNode;
  if (MODIFY_TABLE_TYPE_INSERT != pModify->modifyType) {
    return false;
  }
  return NULL != pModify->node.pChildren;
}
2328

2329
/*
 * Entry point of the splitter.
 *
 * When needSplitSubplan accepts the subplan, the split rules are applied and their
 * result is returned. Otherwise only setVgroupsInfo is run on the subplan's root
 * node and TSDB_CODE_SUCCESS is returned.
 */
int32_t splitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) {
  if (needSplitSubplan(pLogicSubplan)) {
    return applySplitRule(pCxt, pLogicSubplan);
  }

  setVgroupsInfo(pLogicSubplan->pNode, pLogicSubplan);
  return TSDB_CODE_SUCCESS;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc