• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3531

19 Nov 2024 10:42AM UTC coverage: 60.213% (-0.006%) from 60.219%
#3531

push

travis-ci

web-flow
Merge pull request #28777 from taosdata/fix/3.0/TD-32366

fix:TD-32366/stmt add geometry datatype check

118529 of 252344 branches covered (46.97%)

Branch coverage included in aggregate %.

7 of 48 new or added lines in 3 files covered. (14.58%)

2282 existing lines in 115 files now uncovered.

199096 of 275161 relevant lines covered (72.36%)

6067577.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.14
/source/libs/planner/src/planPhysiCreater.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "planInt.h"
17

18
#include "catalog.h"
19
#include "functionMgt.h"
20
#include "systable.h"
21
#include "tglobal.h"
22

23
/* One slot registered under a slot key. */
typedef struct SSlotIdInfo {
  int16_t slotId;  // slot id handed back to callers
  bool    set;     // false when registered; getUnsetSlotId() flips it to true when it returns this slot
} SSlotIdInfo;
27

28
typedef struct SSlotIndex {
29
  int16_t dataBlockId;
30
  SArray* pSlotIdsInfo;  // duplicate name slot
31
} SSlotIndex;
32

33

34
static int32_t getSlotKey(SNode* pNode, const char* pStmtName, char** ppKey, int32_t *pLen, uint16_t extraBufLen) {
45,958,705✔
35
  int32_t code = 0;
45,958,705✔
36
  if (QUERY_NODE_COLUMN == nodeType(pNode)) {
45,958,705✔
37
    SColumnNode* pCol = (SColumnNode*)pNode;
42,034,698✔
38
    if (NULL != pStmtName) {
42,034,698✔
39
      if ('\0' != pStmtName[0]) {
2,276,498✔
40
        *ppKey = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1 + TSDB_COL_NAME_LEN + 1 + extraBufLen);
2,270,455✔
41
        if (!*ppKey) {
2,270,456!
42
          return terrno;
×
43
        }
44
        strcat(*ppKey, pStmtName);
2,270,456✔
45
        strcat(*ppKey, ".");
2,270,456✔
46
        strcat(*ppKey, pCol->node.aliasName);
2,270,456✔
47
        *pLen = taosHashBinary(*ppKey, strlen(*ppKey));
2,270,456✔
48
        return code;
2,270,456✔
49
      } else {
50
        *ppKey = taosMemoryCalloc(1, TSDB_COL_NAME_LEN + 1 + extraBufLen);
6,043✔
51
        if (!*ppKey) {
6,043!
52
          return terrno;
×
53
        }
54
        strcat(*ppKey, pCol->node.aliasName);
6,043✔
55
        *pLen = strlen(*ppKey);
6,043✔
56
        return code;
6,043✔
57
      }
58
    }
59
    if ('\0' == pCol->tableAlias[0]) {
39,758,200✔
60
      *ppKey = taosMemoryCalloc(1, TSDB_COL_NAME_LEN + 1 + extraBufLen);
8,044,394✔
61
      if (!*ppKey) {
8,044,426!
62
        return terrno;
×
63
      }
64
      strcat(*ppKey, pCol->colName);
8,044,453✔
65
      *pLen = strlen(*ppKey);
8,044,453✔
66
      return code;
8,044,453✔
67
    }
68

69
    *ppKey = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1 + TSDB_COL_NAME_LEN + 1 + extraBufLen);
31,713,806✔
70
    if (!*ppKey) {
31,713,984!
71
      return terrno;
×
72
    }
73
    strcat(*ppKey, pCol->tableAlias);
31,713,984✔
74
    strcat(*ppKey, ".");
31,713,984✔
75
    strcat(*ppKey, pCol->colName);
31,713,984✔
76
    *pLen = taosHashBinary(*ppKey, strlen(*ppKey));
31,713,984✔
77
    return code;
31,714,025✔
78
  } else if (QUERY_NODE_FUNCTION == nodeType(pNode)) {
3,924,007✔
79
    SFunctionNode* pFunc = (SFunctionNode*)pNode;
3,559,489✔
80
    if (FUNCTION_TYPE_TBNAME == pFunc->funcType) {
3,559,489✔
81
      SValueNode* pVal = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 0);
285,206✔
82
      if (pVal) {
285,206✔
83
        if (NULL != pStmtName && '\0' != pStmtName[0]) {
7,357!
84
          *ppKey = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1 + TSDB_COL_NAME_LEN + 1 + extraBufLen);
×
85
          if (!*ppKey) {
×
86
            return terrno;
×
87
          }
88
          strcat(*ppKey, pStmtName);
×
89
          strcat(*ppKey, ".");
×
90
          strcat(*ppKey, ((SExprNode*)pNode)->aliasName);
×
91
          *pLen = taosHashBinary(*ppKey, strlen(*ppKey));
×
92
          return code;
×
93
        }
94
        *ppKey = taosMemoryCalloc(1, strlen(pVal->literal) + 1 + TSDB_COL_NAME_LEN + 1 + extraBufLen);
7,357✔
95
        if (!*ppKey) {
7,357!
96
          return terrno;
×
97
        }
98
        strcat(*ppKey, pVal->literal);
7,357✔
99
        strcat(*ppKey, ".");
7,357✔
100
        strcat(*ppKey, ((SExprNode*)pNode)->aliasName);
7,357✔
101
        *pLen = taosHashBinary(*ppKey, strlen(*ppKey));
7,357✔
102
        return code;
7,357✔
103
      }
104
    }
105
  }
106

107
  if (NULL != pStmtName && '\0' != pStmtName[0]) {
3,916,650!
108
    *ppKey = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1 + TSDB_COL_NAME_LEN + 1 + extraBufLen);
768,359✔
109
    if (!*ppKey) {
768,359!
110
      return terrno;
×
111
    }
112
    strcat(*ppKey, pStmtName);
768,359✔
113
    strcat(*ppKey, ".");
768,359✔
114
    strcat(*ppKey, ((SExprNode*)pNode)->aliasName);
768,359✔
115
    *pLen = taosHashBinary(*ppKey, strlen(*ppKey));
768,359✔
116
    return code;
768,359✔
117
  }
118

119
  *ppKey = taosMemoryCalloc(1, TSDB_COL_NAME_LEN + 1 + extraBufLen);
3,148,291✔
120
  if (!*ppKey) {
3,148,547!
121
    return terrno;
×
122
  }
123
  strcat(*ppKey, ((SExprNode*)pNode)->aliasName);
3,148,547✔
124
  *pLen = strlen(*ppKey);
3,148,547✔
125
  return code;
3,148,547✔
126
}
127

128

129
/*
 * Creates a QUERY_NODE_SLOT_DESC node named pName for slot `slotId`, taking the data type from
 * pNode's result type. Returns the new node, or NULL with terrno set to the nodesMakeNode() code
 * when the node could not be created. pCxt is not used here.
 */
static SNode* createSlotDesc(SPhysiPlanContext* pCxt, const char* pName, const SNode* pNode, int16_t slotId,
                             bool output, bool reserve) {
  SSlotDescNode* pDesc = NULL;
  int32_t        rc = nodesMakeNode(QUERY_NODE_SLOT_DESC, (SNode**)&pDesc);
  if (NULL != pDesc) {
    snprintf(pDesc->name, sizeof(pDesc->name), "%s", pName);
    pDesc->slotId = slotId;
    pDesc->dataType = ((SExprNode*)pNode)->resType;
    pDesc->reserve = reserve;
    pDesc->output = output;
    return (SNode*)pDesc;
  }

  terrno = rc;
  return NULL;
}
144

145
/*
 * Wraps pNode in a new QUERY_NODE_TARGET node bound to (dataBlockId, slotId) and returns it through
 * *pOutput. On failure the nodesMakeNode() code is returned and *pOutput is left untouched.
 */
static int32_t createTarget(SNode* pNode, int16_t dataBlockId, int16_t slotId, SNode** pOutput) {
  STargetNode* pWrapped = NULL;
  int32_t      rc = nodesMakeNode(QUERY_NODE_TARGET, (SNode**)&pWrapped);
  if (NULL != pWrapped) {
    pWrapped->pExpr = pNode;
    pWrapped->slotId = slotId;
    pWrapped->dataBlockId = dataBlockId;
    *pOutput = (SNode*)pWrapped;
    rc = TSDB_CODE_SUCCESS;
  }
  return rc;
}
159

160
/*
 * Registers slot `slotId` under the key (pName, len) in pHash.
 *
 * If the key already exists, the slot is appended to that entry's pSlotIdsInfo array. Otherwise a
 * new SSlotIndex for dataBlockId is created with the slot as its only element and stored in the hash.
 * The slot is always registered with set == false.
 *
 * Returns TSDB_CODE_SUCCESS, terrno when an array operation fails, or the taosHashPut() result.
 * The array created for a new entry is destroyed again if it cannot be handed over to the hash.
 */
static int32_t putSlotToHashImpl(int16_t dataBlockId, int16_t slotId, const char* pName, int32_t len, SHashObj* pHash) {
  SSlotIdInfo info = {.slotId = slotId, .set = false};

  SSlotIndex* pIndex = taosHashGet(pHash, pName, len);
  if (NULL != pIndex) {
    if (NULL == taosArrayPush(pIndex->pSlotIdsInfo, &info)) {
      return terrno;
    }
    return TSDB_CODE_SUCCESS;
  }

  SSlotIndex index = {.dataBlockId = dataBlockId, .pSlotIdsInfo = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SSlotIdInfo))};
  if (NULL == index.pSlotIdsInfo) {
    return terrno;
  }

  if (NULL == taosArrayPush(index.pSlotIdsInfo, &info)) {
    // read terrno before the cleanup call so the reported code is the push failure
    int32_t code = terrno;
    taosArrayDestroy(index.pSlotIdsInfo);
    return code;
  }

  int32_t code = taosHashPut(pHash, pName, len, &index, sizeof(SSlotIndex));
  if (TSDB_CODE_SUCCESS != code) {
    // the entry never made it into the hash, so nothing else references the array
    taosArrayDestroy(index.pSlotIdsInfo);
  }
  return code;
}
180

181
/*
 * Argument-reordering front end for putSlotToHashImpl(). pNode is part of the signature but is not
 * consulted.
 */
static int32_t putSlotToHash(const char* pName, int32_t len, int16_t dataBlockId, int16_t slotId, SNode* pNode, SHashObj* pHash) {
  int32_t code = putSlotToHashImpl(dataBlockId, slotId, pName, len, pHash);
  return code;
}
184

185
/*
 * Creates the two lookup hashes of a data block and records them at index dataBlockId:
 *   - a binary-keyed hash, stored in pCxt->pLocationHelper and returned through *pDescHash
 *   - an int-keyed hash, stored in pCxt->pProjIdxLocHelper and returned through *ppProjIdxDescHash
 * Both hashes are created with `capacity` as initial size and without locking.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno as observed right after the failing call. The output
 * parameters are only written on success.
 */
static int32_t createDataBlockDescHash(SPhysiPlanContext* pCxt, int32_t capacity, int16_t dataBlockId,
                                       SHashObj** pDescHash, SHashObj** ppProjIdxDescHash) {
  SHashObj* pHash = taosHashInit(capacity, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (NULL == pHash) {
    return terrno;
  }

  SHashObj* pProjIdxHash = taosHashInit(capacity, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (NULL == pProjIdxHash) {
    int32_t code = terrno;  // capture before cleanup runs
    taosHashCleanup(pHash);
    return code;
  }

  if (NULL == taosArrayInsert(pCxt->pLocationHelper, dataBlockId, &pHash)) {
    int32_t code = terrno;
    taosHashCleanup(pHash);
    taosHashCleanup(pProjIdxHash);
    return code;
  }

  if (NULL == taosArrayInsert(pCxt->pProjIdxLocHelper, dataBlockId, &pProjIdxHash)) {
    int32_t code = terrno;
    // pHash is already stored in pLocationHelper; cleaning it up here would leave that array
    // holding a pointer to freed memory. Only the hash that is still exclusively ours is released.
    // NOTE(review): assumes the teardown of pLocationHelper releases the hashes it holds - confirm.
    taosHashCleanup(pProjIdxHash);
    return code;
  }

  *pDescHash = pHash;
  *ppProjIdxDescHash = pProjIdxHash;
  return TSDB_CODE_SUCCESS;
}
211

212
/*
 * Creates one output slot per node of pList in pDataBlockDesc, numbering slots from 0 in list order.
 *
 * For every node a slot descriptor is appended to pDataBlockDesc->pSlots and the slot is registered
 * in pHash under the node's slot key. Column nodes with resIdx > 0 are additionally registered in
 * pProjIdxDescHash under "<key>_<resIdx>". totalRowSize and outputRowSize grow by the node's result
 * size for every slot that was fully registered.
 *
 * Processing stops at the first failing step and its error code is returned; later steps for that
 * node are skipped so an earlier error is never overwritten.
 */
static int32_t buildDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc,
                                   SHashObj* pHash, SHashObj* pProjIdxDescHash) {
  // bytes requested beyond the key itself; they hold the "_<resIdx>" suffix appended below
  const uint16_t suffixRoom = 16;

  pDataBlockDesc->pSlots = NULL;
  int32_t code = nodesMakeList(&pDataBlockDesc->pSlots);
  if (NULL == pDataBlockDesc->pSlots) {
    return code;
  }

  int16_t slotId = 0;
  SNode*  pNode = NULL;
  FOREACH(pNode, pList) {
    char*   name = NULL;
    int32_t len = 0;
    code = getSlotKey(pNode, NULL, &name, &len, suffixRoom);
    if (TSDB_CODE_SUCCESS == code) {
      code = nodesListStrictAppend(pDataBlockDesc->pSlots, createSlotDesc(pCxt, name, pNode, slotId, true, false));
    }
    if (TSDB_CODE_SUCCESS == code) {
      // only reached with a valid key and an appended descriptor
      code = putSlotToHash(name, len, pDataBlockDesc->dataBlockId, slotId, pNode, pHash);
    }
    if (TSDB_CODE_SUCCESS == code && nodeType(pNode) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode)->resIdx > 0) {
      snprintf(name + strlen(name), suffixRoom, "_%d", ((SColumnNode*)pNode)->resIdx);
      code = putSlotToHash(name, strlen(name), pDataBlockDesc->dataBlockId, slotId, pNode, pProjIdxDescHash);
    }
    taosMemoryFree(name);

    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
    pDataBlockDesc->totalRowSize += ((SExprNode*)pNode)->resType.bytes;
    pDataBlockDesc->outputRowSize += ((SExprNode*)pNode)->resType.bytes;
    ++slotId;
  }
  return code;
}
247

248
/*
 * Creates a data block descriptor for pList: allocates the node, assigns it the next data block id
 * from pCxt, creates its lookup hashes and builds one slot per list entry. The descriptor is
 * returned through *pDataBlockDesc on success and destroyed on failure.
 */
static int32_t createDataBlockDesc(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode** pDataBlockDesc) {
  SDataBlockDescNode* pBlock = NULL;
  int32_t             code = nodesMakeNode(QUERY_NODE_DATABLOCK_DESC, (SNode**)&pBlock);
  if (NULL == pBlock) {
    return code;
  }
  pBlock->dataBlockId = pCxt->nextDataBlockId++;

  SHashObj* pSlotHash = NULL;
  SHashObj* pProjIdxSlotHash = NULL;
  code = createDataBlockDescHash(pCxt, LIST_LENGTH(pList), pBlock->dataBlockId, &pSlotHash, &pProjIdxSlotHash);
  if (TSDB_CODE_SUCCESS == code) {
    code = buildDataBlockSlots(pCxt, pList, pBlock, pSlotHash, pProjIdxSlotHash);
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode((SNode*)pBlock);
    return code;
  }

  *pDataBlockDesc = pBlock;
  return code;
}
271

272
static int16_t getUnsetSlotId(const SArray* pSlotIdsInfo) {
16,090,282✔
273
  int32_t size = taosArrayGetSize(pSlotIdsInfo);
16,090,282✔
274
  for (int32_t i = 0; i < size; ++i) {
16,835,724✔
275
    SSlotIdInfo* pInfo = taosArrayGet(pSlotIdsInfo, i);
16,099,685✔
276
    if (!pInfo->set) {
16,099,672✔
277
      pInfo->set = true;
15,354,222✔
278
      return pInfo->slotId;
15,354,222✔
279
    }
280
  }
281
  return ((SSlotIdInfo*)taosArrayGet(pSlotIdsInfo, 0))->slotId;
736,039✔
282
}
283

284
/*
 * Binds every node of pList to a slot of pDataBlockDesc and replaces the list entry with a target
 * node that wraps it.
 *
 * For each entry (an ORDER BY entry is keyed by its inner expression) the slot key is looked up in
 * the data block's hash:
 *   - found:     the slot id comes from getUnsetSlotId()
 *   - not found: a new slot descriptor is appended and registered; totalRowSize grows by the
 *                expression's result size, and outputRowSize too when `output` is true
 * A NULL pList is a no-op. Iteration stops at the first error, which is returned.
 */
static int32_t addDataBlockSlotsImpl(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc,
                                     const char* pStmtName, bool output, bool reserve) {
  if (NULL == pList) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t   code = TSDB_CODE_SUCCESS;
  SHashObj* pSlotHash = taosArrayGetP(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId);
  int16_t   freshSlotId = LIST_LENGTH(pDataBlockDesc->pSlots);
  int16_t   boundSlotId = 0;
  SNode*    pNode = NULL;
  FOREACH(pNode, pList) {
    SNode* pExpr = pNode;
    if (QUERY_NODE_ORDER_BY_EXPR == nodeType(pNode)) {
      pExpr = ((SOrderByExprNode*)pNode)->pExpr;
    }

    char*   key = NULL;
    int32_t keyLen = 0;
    code = getSlotKey(pExpr, pStmtName, &key, &keyLen, 0);
    if (TSDB_CODE_SUCCESS == code) {
      SSlotIndex* pFound = taosHashGet(pSlotHash, key, keyLen);
      if (NULL != pFound) {
        boundSlotId = getUnsetSlotId(pFound->pSlotIdsInfo);
      } else {
        code = nodesListStrictAppend(pDataBlockDesc->pSlots,
                                     createSlotDesc(pCxt, key, pExpr, freshSlotId, output, reserve));
        if (TSDB_CODE_SUCCESS == code) {
          code = putSlotToHashImpl(pDataBlockDesc->dataBlockId, freshSlotId, key, keyLen, pSlotHash);
        }
        // sizes and the slot counter advance even when registration failed; the error ends the loop below
        pDataBlockDesc->totalRowSize += ((SExprNode*)pExpr)->resType.bytes;
        if (output) {
          pDataBlockDesc->outputRowSize += ((SExprNode*)pExpr)->resType.bytes;
        }
        boundSlotId = freshSlotId;
        ++freshSlotId;
      }
    }
    taosMemoryFree(key);

    if (TSDB_CODE_SUCCESS != code) {
      break;
    }

    SNode* pTarget = NULL;
    code = createTarget(pNode, pDataBlockDesc->dataBlockId, boundSlotId, &pTarget);
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
    REPLACE_NODE(pTarget);
  }
  return code;
}
333

334
/* Binds pList to slots of pDataBlockDesc without a statement name; new slots get output=false, reserve=true. */
static int32_t addDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) {
  const bool asOutput = false;
  const bool asReserved = true;
  return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, NULL, asOutput, asReserved);
}
337

338
/*
 * Single-node variant of addDataBlockSlots(): wraps *pNode in a temporary one-element list, binds
 * it, and on success stores the (replaced) list element back into *pNode. The temporary list is
 * released with nodesClearList() in every case. A NULL pNode or NULL *pNode is a no-op.
 */
static int32_t addDataBlockSlot(SPhysiPlanContext* pCxt, SNode** pNode, SDataBlockDescNode* pDataBlockDesc) {
  if (NULL == pNode || NULL == *pNode) {
    return TSDB_CODE_SUCCESS;
  }

  SNodeList* pSingle = NULL;
  int32_t    code = nodesListMakeAppend(&pSingle, *pNode);
  if (TSDB_CODE_SUCCESS == code) {
    code = addDataBlockSlots(pCxt, pSingle, pDataBlockDesc);
    if (TSDB_CODE_SUCCESS == code) {
      *pNode = nodesListGetNode(pSingle, 0);
    }
  }
  nodesClearList(pSingle);
  return code;
}
354

355
static int32_t addDataBlockSlotsForProject(SPhysiPlanContext* pCxt, const char* pStmtName, SNodeList* pList,
724,307✔
356
                                           SDataBlockDescNode* pDataBlockDesc) {
357
  return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, pStmtName, false, true);
724,307✔
358
}
359

360
/* Binds pList to slots of pDataBlockDesc without a statement name; new slots get output=true, reserve=true. */
static int32_t pushdownDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) {
  const bool asOutput = true;
  const bool asReserved = true;
  return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, NULL, asOutput, asReserved);
}
363

364
typedef struct SSetSlotIdCxt {
365
  int32_t   errCode;
366
  SHashObj* pLeftHash;
367
  SHashObj* pLeftProjIdxHash;
368
  SHashObj* pRightHash;
369
  SHashObj* pRightProdIdxHash;
370
} SSetSlotIdCxt;
371

372
/*
 * Debug helper: logs pName followed by every key stored in pHash. A NULL pHash is ignored.
 * Keys are copied into a fixed-size, zero-initialised buffer before logging; a key longer than the
 * buffer is truncated so the copy can neither overrun it nor leave it unterminated.
 */
static void dumpSlots(const char* pName, SHashObj* pHash) {
  if (NULL == pHash) {
    return;
  }
  planDebug("%s", pName);
  void* pIt = taosHashIterate(pHash, NULL);
  while (NULL != pIt) {
    size_t len = 0;
    char*  pKey = taosHashGetKey(pIt, &len);
    char   name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN] = {0};
    // keep the last byte as terminator regardless of the key length reported by the hash
    size_t copyLen = (len < sizeof(name) - 1) ? len : sizeof(name) - 1;
    memcpy(name, pKey, copyLen);
    planDebug("\tslot name = %s", name);
    pIt = taosHashIterate(pHash, pIt);
  }
}
387

388
/*
 * Expression-walker callback that fills in dataBlockId/slotId of column nodes.
 *
 * Nodes that are not columns, and the "*" column, are skipped (DEAL_RES_CONTINUE). For any other
 * column the slot key is looked up - left hash first, then right hash - using the "<key>_<idx>"
 * hashes when projRefIdx > 0. The first slot registered under the key is assigned to the column and
 * DEAL_RES_IGNORE_CHILD is returned. A failed key build or a missing slot stores the error in the
 * context's errCode and returns DEAL_RES_ERROR.
 */
static EDealRes doSetSlotId(SNode* pNode, void* pContext) {
  if (QUERY_NODE_COLUMN != nodeType(pNode)) {
    return DEAL_RES_CONTINUE;
  }
  SColumnNode* pCol = (SColumnNode*)pNode;
  if (0 == strcmp(pCol->colName, "*")) {
    return DEAL_RES_CONTINUE;
  }

  SSetSlotIdCxt* pCxt = (SSetSlotIdCxt*)pContext;
  char*          key = NULL;
  int32_t        keyLen = 0;
  pCxt->errCode = getSlotKey(pNode, NULL, &key, &keyLen, 16);
  if (TSDB_CODE_SUCCESS != pCxt->errCode) {
    return DEAL_RES_ERROR;
  }

  SSlotIndex* pIndex = NULL;
  if (pCol->projRefIdx > 0) {
    // the 16 extra bytes requested from getSlotKey() hold this suffix
    sprintf(key + strlen(key), "_%d", pCol->projRefIdx);
    pIndex = taosHashGet(pCxt->pLeftProjIdxHash, key, strlen(key));
    if (NULL == pIndex) {
      pIndex = taosHashGet(pCxt->pRightProdIdxHash, key, strlen(key));
    }
  } else {
    pIndex = taosHashGet(pCxt->pLeftHash, key, keyLen);
    if (NULL == pIndex) {
      pIndex = taosHashGet(pCxt->pRightHash, key, keyLen);
    }
  }

  EDealRes res;
  if (NULL != pIndex) {
    pCol->dataBlockId = pIndex->dataBlockId;
    pCol->slotId = ((SSlotIdInfo*)taosArrayGet(pIndex->pSlotIdsInfo, 0))->slotId;
    res = DEAL_RES_IGNORE_CHILD;
  } else {
    // a missing slot means the plan is inconsistent; log both hashes to help diagnose it
    planError("doSetSlotId failed, invalid slot name %s", key);
    dumpSlots("left datablock desc", pCxt->pLeftHash);
    dumpSlots("right datablock desc", pCxt->pRightHash);
    pCxt->errCode = TSDB_CODE_PLAN_INTERNAL_ERROR;
    res = DEAL_RES_ERROR;
  }
  taosMemoryFree(key);
  return res;
}
426

427
/*
 * Clones pNode and resolves the slot ids of all column references in the clone against the left
 * data block and, when rightDataBlockId >= 0, the right data block. The clone is returned through
 * *pOutput on success and destroyed on failure. A NULL pNode is a no-op that leaves *pOutput alone.
 */
static int32_t setNodeSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, int16_t rightDataBlockId, SNode* pNode,
                             SNode** pOutput) {
  if (NULL == pNode) {
    return TSDB_CODE_SUCCESS;
  }

  SNode*  pClone = NULL;
  int32_t code = nodesCloneNode(pNode, &pClone);
  if (NULL == pClone) {
    return code;
  }

  const bool    hasRight = (rightDataBlockId >= 0);
  SSetSlotIdCxt cxt = {.errCode = TSDB_CODE_SUCCESS};
  cxt.pLeftHash = taosArrayGetP(pCxt->pLocationHelper, leftDataBlockId);
  cxt.pLeftProjIdxHash = taosArrayGetP(pCxt->pProjIdxLocHelper, leftDataBlockId);
  cxt.pRightHash = hasRight ? taosArrayGetP(pCxt->pLocationHelper, rightDataBlockId) : NULL;
  cxt.pRightProdIdxHash = hasRight ? taosArrayGetP(pCxt->pProjIdxLocHelper, rightDataBlockId) : NULL;

  nodesWalkExpr(pClone, doSetSlotId, &cxt);
  if (TSDB_CODE_SUCCESS == cxt.errCode) {
    *pOutput = pClone;
    return TSDB_CODE_SUCCESS;
  }

  nodesDestroyNode(pClone);
  return cxt.errCode;
}
454

455
/*
 * List counterpart of setNodeSlotId(): clones pList and resolves the slot ids of all column
 * references in the clone against the left data block and, when rightDataBlockId >= 0, the right
 * one. The cloned list is returned through *pOutput on success and destroyed on failure. A NULL
 * pList is a no-op that leaves *pOutput alone.
 */
static int32_t setListSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, int16_t rightDataBlockId,
                             const SNodeList* pList, SNodeList** pOutput) {
  if (NULL == pList) {
    return TSDB_CODE_SUCCESS;
  }

  SNodeList* pClone = NULL;
  int32_t    code = nodesCloneList(pList, &pClone);
  if (NULL == pClone) {
    return code;
  }

  const bool    hasRight = (rightDataBlockId >= 0);
  SSetSlotIdCxt cxt = {.errCode = TSDB_CODE_SUCCESS};
  cxt.pLeftHash = taosArrayGetP(pCxt->pLocationHelper, leftDataBlockId);
  cxt.pLeftProjIdxHash = taosArrayGetP(pCxt->pProjIdxLocHelper, leftDataBlockId);
  cxt.pRightHash = hasRight ? taosArrayGetP(pCxt->pLocationHelper, rightDataBlockId) : NULL;
  cxt.pRightProdIdxHash = hasRight ? taosArrayGetP(pCxt->pProjIdxLocHelper, rightDataBlockId) : NULL;

  nodesWalkExprs(pClone, doSetSlotId, &cxt);
  if (TSDB_CODE_SUCCESS == cxt.errCode) {
    *pOutput = pClone;
    return TSDB_CODE_SUCCESS;
  }

  nodesDestroyList(pClone);
  return cxt.errCode;
}
481

482
/*
 * Creates a physical plan node of `type` from pLogicNode: swaps the limit/slimit nodes with those
 * of the logic node, copies dynamicOp and the ts-order fields, and builds the output data block
 * descriptor from the logic node's targets (with the logic node's precision).
 * Returns the new node, or NULL with terrno set to the failing step's code.
 */
static SPhysiNode* makePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, ENodeType type) {
  SPhysiNode* pResult = NULL;
  int32_t     code = nodesMakeNode(type, (SNode**)&pResult);
  if (NULL == pResult) {
    terrno = code;
    return NULL;
  }

  pResult->dynamicOp = pLogicNode->dynamicOp;
  pResult->inputTsOrder = pLogicNode->inputTsOrder;
  pResult->outputTsOrder = pLogicNode->outputTsOrder;
  TSWAP(pResult->pLimit, pLogicNode->pLimit);
  TSWAP(pResult->pSlimit, pLogicNode->pSlimit);

  code = createDataBlockDesc(pCxt, pLogicNode->pTargets, &pResult->pOutputDataBlockDesc);
  if (TSDB_CODE_SUCCESS == code) {
    pResult->pOutputDataBlockDesc->precision = pLogicNode->precision;
    return pResult;
  }

  nodesDestroyNode((SNode*)pResult);
  terrno = code;
  return NULL;
}
505

506
/*
 * Copies the logic node's conditions into pPhysiNode->pConditions with slot ids resolved against
 * the physical node's own output data block. Does nothing when there are no conditions.
 */
static int32_t setConditionsSlotId(SPhysiPlanContext* pCxt, const SLogicNode* pLogicNode, SPhysiNode* pPhysiNode) {
  if (NULL == pLogicNode->pConditions) {
    return TSDB_CODE_SUCCESS;
  }

  const int16_t ownBlockId = pPhysiNode->pOutputDataBlockDesc->dataBlockId;
  return setNodeSlotId(pCxt, ownBlockId, -1, pLogicNode->pConditions, &pPhysiNode->pConditions);
}
513

514
static int32_t colIdCompare(const void* pLeft, const void* pRight) {
15,137,155✔
515
  SColumnNode* pLeftCol = *(SColumnNode**)pLeft;
15,137,155✔
516
  SColumnNode* pRightCol = *(SColumnNode**)pRight;
15,137,155✔
517
  return pLeftCol->colId > pRightCol->colId ? 1 : -1;
15,137,155✔
518
}
519

520
/*
 * Reorders the nodes of pScanCols in place using colIdCompare(): the node pointers are copied into
 * a temporary array, sorted, and written back into the list cells in sorted order.
 * Returns terrno if the temporary array cannot be created or filled.
 */
static int32_t sortScanCols(SNodeList* pScanCols) {
  SArray* pSorted = taosArrayInit(LIST_LENGTH(pScanCols), POINTER_BYTES);
  if (NULL == pSorted) {
    return terrno;
  }

  int32_t code = 0;
  SNode*  pCol = NULL;
  FOREACH(pCol, pScanCols) {
    if (NULL != taosArrayPush(pSorted, &pCol)) {
      continue;
    }
    code = terrno;
    break;
  }
  if (TSDB_CODE_SUCCESS != code) {
    taosArrayDestroy(pSorted);
    return code;
  }

  taosArraySort(pSorted, colIdCompare);

  // second pass: cell i of the list receives element i of the sorted array
  int32_t pos = 0;
  FOREACH(pCol, pScanCols) {
    REPLACE_NODE(taosArrayGetP(pSorted, pos));
    ++pos;
  }
  taosArrayDestroy(pSorted);

  return TSDB_CODE_SUCCESS;
}
546

547
/*
 * Clones pScanCols into pScanPhysiNode->pScanCols and sorts the clone with sortScanCols().
 * A NULL pScanCols is a no-op. Returns the clone's error code when cloning yields no list.
 * pCxt is not used here.
 */
static int32_t createScanCols(SPhysiPlanContext* pCxt, SScanPhysiNode* pScanPhysiNode, SNodeList* pScanCols) {
  if (NULL != pScanCols) {
    pScanPhysiNode->pScanCols = NULL;
    int32_t code = nodesCloneList(pScanCols, &pScanPhysiNode->pScanCols);
    if (NULL == pScanPhysiNode->pScanCols) {
      return code;
    }
    return sortScanCols(pScanPhysiNode->pScanCols);
  }
  return TSDB_CODE_SUCCESS;
}
559

560
static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
1,571,101✔
561
                                           SScanPhysiNode* pScanPhysiNode, SPhysiNode** pPhyNode) {
562
  int32_t code = createScanCols(pCxt, pScanPhysiNode, pScanLogicNode->pScanCols);
1,571,101✔
563
  if (TSDB_CODE_SUCCESS == code) {
1,571,107!
564
    code = addDataBlockSlots(pCxt, pScanPhysiNode->pScanCols, pScanPhysiNode->node.pOutputDataBlockDesc);
1,571,110✔
565
  }
566

567
  if (TSDB_CODE_SUCCESS == code && NULL != pScanLogicNode->pScanPseudoCols) {
1,571,103!
568
    pScanPhysiNode->pScanPseudoCols = NULL;
650,101✔
569
    code = nodesCloneList(pScanLogicNode->pScanPseudoCols, &pScanPhysiNode->pScanPseudoCols);
650,101✔
570
    if (NULL == pScanPhysiNode->pScanPseudoCols) {
650,100!
571
      code = code;
×
572
    }
573
  }
574

575
  if (TSDB_CODE_SUCCESS == code) {
1,571,102!
576
    code = addDataBlockSlots(pCxt, pScanPhysiNode->pScanPseudoCols, pScanPhysiNode->node.pOutputDataBlockDesc);
1,571,103✔
577
  }
578

579
  if (TSDB_CODE_SUCCESS == code) {
1,571,103!
580
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pScanLogicNode, (SPhysiNode*)pScanPhysiNode);
1,571,105✔
581
  }
582

583
  if (TSDB_CODE_SUCCESS == code) {
1,571,102!
584
    pScanPhysiNode->uid = pScanLogicNode->tableId;
1,571,106✔
585
    pScanPhysiNode->suid = pScanLogicNode->stableId;
1,571,106✔
586
    pScanPhysiNode->tableType = pScanLogicNode->tableType;
1,571,106✔
587
    pScanPhysiNode->groupOrderScan = pScanLogicNode->groupOrderScan;
1,571,106✔
588
    memcpy(&pScanPhysiNode->tableName, &pScanLogicNode->tableName, sizeof(SName));
1,571,106✔
589
    if (NULL != pScanLogicNode->pTagCond) {
1,571,106✔
590
      pSubplan->pTagCond = NULL;
116,349✔
591
      code = nodesCloneNode(pScanLogicNode->pTagCond, &pSubplan->pTagCond);
116,349✔
592
      if (NULL == pSubplan->pTagCond) {
116,349!
593
        code = code;
×
594
      }
595
    }
596
  }
597

598
  if (TSDB_CODE_SUCCESS == code) {
1,571,102✔
599
    if (NULL != pScanLogicNode->pTagIndexCond) {
1,571,101✔
600
      pSubplan->pTagIndexCond = NULL;
11,262✔
601
      code = nodesCloneNode(pScanLogicNode->pTagIndexCond, &pSubplan->pTagIndexCond);
11,262✔
602
      if (NULL == pSubplan->pTagIndexCond) {
11,262!
603
        code = code;
×
604
      }
605
    }
606
  }
607

608
  if (TSDB_CODE_SUCCESS == code) {
1,571,102!
609
    *pPhyNode = (SPhysiNode*)pScanPhysiNode;
1,571,102✔
610
  } else {
611
    nodesDestroyNode((SNode*)pScanPhysiNode);
×
612
  }
613

614
  return code;
1,571,102✔
615
}
616

617
/* Fills pNodeAddr from a vgroup: its endpoint set and, as node id, the vgroup id. */
static void vgroupInfoToNodeAddr(const SVgroupInfo* vg, SQueryNodeAddr* pNodeAddr) {
  pNodeAddr->epSet = vg->epSet;
  pNodeAddr->nodeId = vg->vgId;
}
1,562,785✔
621

622
/*
 * Maps a logical scan type to the physical plan node type that implements it. Scan types without
 * an explicit mapping use QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN.
 */
static ENodeType getScanOperatorType(EScanType scanType) {
  ENodeType physiType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
  switch (scanType) {
    case SCAN_TYPE_TAG:
      physiType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
      break;
    case SCAN_TYPE_TABLE:
      physiType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
      break;
    case SCAN_TYPE_STREAM:
      physiType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
      break;
    case SCAN_TYPE_TABLE_MERGE:
      physiType = QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN;
      break;
    case SCAN_TYPE_BLOCK_INFO:
      physiType = QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN;
      break;
    case SCAN_TYPE_TABLE_COUNT:
      physiType = QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN;
      break;
    default:
      break;
  }
  return physiType;
}
641

642
static int32_t createSimpleScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
13✔
643
                                         SPhysiNode** pPhyNode) {
644
  SScanPhysiNode* pScan =
645
      (SScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, getScanOperatorType(pScanLogicNode->scanType));
13✔
646
  if (NULL == pScan) {
13!
647
    return terrno;
×
648
  }
649

650
  if (pScanLogicNode->pVgroupList) {
13!
651
    vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
13✔
652
  }
653
  return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, pScan, pPhyNode);
13✔
654
}
655

656
static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
76,280✔
657
                                         SPhysiNode** pPhyNode) {
658
  STagScanPhysiNode* pScan =
659
      (STagScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN);
76,280✔
660
  if (NULL == pScan) {
76,280!
661
    return terrno;
×
662
  }
663
  if (pScanLogicNode->pVgroupList) {
76,280✔
664
    vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
76,279✔
665
  }
666
  pScan->onlyMetaCtbIdx = pScanLogicNode->onlyMetaCtbIdx;
76,280✔
667

668
  return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
76,280✔
669
}
670

671
static int32_t createLastRowScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
11,245✔
672
                                          SPhysiNode** pPhyNode) {
673
  SLastRowScanPhysiNode* pScan =
674
      (SLastRowScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN);
11,245✔
675
  if (NULL == pScan) {
11,245!
676
    return terrno;
×
677
  }
678
  pScan->pTargets = NULL;
11,245✔
679
  int32_t code = nodesCloneList(pScanLogicNode->node.pTargets, &pScan->pTargets);
11,245✔
680
  if (TSDB_CODE_SUCCESS  != code) {
11,245!
681
    nodesDestroyNode((SNode*)pScan);
×
682
    return code;
×
683
  }
684
  pScan->pGroupTags = NULL;
11,245✔
685
  code = nodesCloneList(pScanLogicNode->pGroupTags, &pScan->pGroupTags);
11,245✔
686
  if (TSDB_CODE_SUCCESS != code) {
11,245!
687
    nodesDestroyNode((SNode*)pScan);
×
688
    return code;
×
689
  }
690

691
  pScan->groupSort = pScanLogicNode->groupSort;
11,245✔
692
  pScan->ignoreNull = pScanLogicNode->igLastNull;
11,245✔
693

694
  if (pScanLogicNode->pVgroupList) {
11,245!
695
    vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
11,245✔
696
  }
697

698
  code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
11,245✔
699
  if (TSDB_CODE_SUCCESS == code && pScanLogicNode->pFuncTypes != NULL) {
11,245!
700
    pScan->pFuncTypes = taosArrayInit(taosArrayGetSize(pScanLogicNode->pFuncTypes), sizeof(int32_t));
368✔
701
    if (NULL == pScan->pFuncTypes) {
368!
702
      return terrno;
×
703
    }
704

705
    SNode* pTargetNode = NULL;
368✔
706
    int funcTypeIndex = 0;
368✔
707
    FOREACH(pTargetNode, ((SScanPhysiNode*)pScan)->pScanCols) {
9,784!
708
      if (((STargetNode*)pTargetNode)->pExpr->type != QUERY_NODE_COLUMN) {
9,416!
709
        continue;
×
710
      }
711
      SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pTargetNode)->pExpr;
9,416✔
712

713
      for (int i = 0; i < TARRAY_SIZE(pScanLogicNode->pFuncTypes); ++i) {
234,094!
714
        SFunctParam* pFunctParam = taosArrayGet(pScanLogicNode->pFuncTypes, i);
234,094✔
715
        if (pColNode->colId == pFunctParam->pCol->colId &&
234,094✔
716
             0 == strncmp(pColNode->colName, pFunctParam->pCol->name, strlen(pColNode->colName))) {
14,270✔
717
          if (NULL == taosArrayInsert(pScan->pFuncTypes, funcTypeIndex, &pFunctParam->type)) {
9,416!
718
            code = terrno;
×
719
          }
720
          break;
9,416✔
721
        }
722
      }
723
      funcTypeIndex++;
9,416✔
724
    }
725
  }
726
  return code;
11,245✔
727
}
728

729
static int32_t createTableCountScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
99✔
730
                                             SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) {
731
  STableCountScanPhysiNode* pScan = (STableCountScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode,
99✔
732
                                                                             QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN);
733
  if (NULL == pScan) {
99!
734
    return terrno;
×
735
  }
736

737
  pScan->pGroupTags = NULL;
99✔
738
  int32_t code = nodesCloneList(pScanLogicNode->pGroupTags, &pScan->pGroupTags);
99✔
739
  if (NULL != pScanLogicNode->pGroupTags && NULL == pScan->pGroupTags) {
99!
740
    nodesDestroyNode((SNode*)pScan);
×
741
    return code;
×
742
  }
743

744
  pScan->groupSort = pScanLogicNode->groupSort;
99✔
745
  if (pScanLogicNode->pVgroupList) {
99!
746
    vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
99✔
747
  }
748
  return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
99✔
749
}
750

751
static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
1,448,177✔
752
                                        SPhysiNode** pPhyNode) {
753
  STableScanPhysiNode* pTableScan = (STableScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode,
1,448,177✔
754
                                                                        getScanOperatorType(pScanLogicNode->scanType));
755
  if (NULL == pTableScan) {
1,448,183!
756
    return terrno;
×
757
  }
758

759
  memcpy(pTableScan->scanSeq, pScanLogicNode->scanSeq, sizeof(pScanLogicNode->scanSeq));
1,448,183✔
760
  pTableScan->scanRange = pScanLogicNode->scanRange;
1,448,183✔
761
  pTableScan->ratio = pScanLogicNode->ratio;
1,448,183✔
762
  if (pScanLogicNode->pVgroupList) {
1,448,183✔
763
    vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
1,447,604✔
764
    pSubplan->execNodeStat.tableNum = pScanLogicNode->pVgroupList->vgroups[0].numOfTable;
1,447,602✔
765
  }
766
  (void)tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
1,448,181✔
767
  pTableScan->dataRequired = pScanLogicNode->dataRequired;
1,448,182✔
768
  pTableScan->pDynamicScanFuncs = NULL;
1,448,182✔
769
  int32_t code = nodesCloneList(pScanLogicNode->pDynamicScanFuncs, &pTableScan->pDynamicScanFuncs);
1,448,182✔
770
  if (TSDB_CODE_SUCCESS  != code) {
1,448,175!
771
    nodesDestroyNode((SNode*)pTableScan);
×
772
    return code;
×
773
  }
774
  pTableScan->pGroupTags = NULL;
1,448,175✔
775
  code = nodesCloneList(pScanLogicNode->pGroupTags, &pTableScan->pGroupTags);
1,448,175✔
776
  if (TSDB_CODE_SUCCESS  != code) {
1,448,178!
777
    nodesDestroyNode((SNode*)pTableScan);
×
778
    return code;
×
779
  }
780
  pTableScan->groupSort = pScanLogicNode->groupSort;
1,448,178✔
781
  pTableScan->interval = pScanLogicNode->interval;
1,448,178✔
782
  pTableScan->offset = pScanLogicNode->offset;
1,448,178✔
783
  pTableScan->sliding = pScanLogicNode->sliding;
1,448,178✔
784
  pTableScan->intervalUnit = pScanLogicNode->intervalUnit;
1,448,178✔
785
  pTableScan->slidingUnit = pScanLogicNode->slidingUnit;
1,448,178✔
786
  pTableScan->triggerType = pScanLogicNode->triggerType;
1,448,178✔
787
  pTableScan->watermark = pScanLogicNode->watermark;
1,448,178✔
788
  pTableScan->igExpired = pScanLogicNode->igExpired;
1,448,178✔
789
  pTableScan->igCheckUpdate = pScanLogicNode->igCheckUpdate;
1,448,178✔
790
  pTableScan->assignBlockUid = pCxt->pPlanCxt->rSmaQuery ? true : false;
1,448,178✔
791
  pTableScan->filesetDelimited = pScanLogicNode->filesetDelimited;
1,448,178✔
792
  pTableScan->needCountEmptyTable = pScanLogicNode->isCountByTag;
1,448,178✔
793
  pTableScan->paraTablesSort = pScanLogicNode->paraTablesSort;
1,448,178✔
794
  pTableScan->smallDataTsSort = pScanLogicNode->smallDataTsSort;
1,448,178✔
795

796
  code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode);
1,448,178✔
797
  if (TSDB_CODE_SUCCESS == code) {
1,448,175!
798
    code = setListSlotId(pCxt, pTableScan->scan.node.pOutputDataBlockDesc->dataBlockId, -1, pScanLogicNode->pTags,
1,448,176✔
799
                         &pTableScan->pTags);
800
  }
801
  if (TSDB_CODE_SUCCESS == code) {
1,448,172!
802
    code = setNodeSlotId(pCxt, pTableScan->scan.node.pOutputDataBlockDesc->dataBlockId, -1, pScanLogicNode->pSubtable,
1,448,175✔
803
                         &pTableScan->pSubtable);
804
  }
805
  return code;
1,448,176✔
806
}
807

808
static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
35,290✔
809
                                              SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) {
810
  SSystemTableScanPhysiNode* pScan = (SSystemTableScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode,
35,290✔
811
                                                                               QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN);
812
  if (NULL == pScan) {
35,290!
813
    return terrno;
×
814
  }
815

816
  pSubplan->showRewrite = pScanLogicNode->showRewrite;
35,290✔
817
  pScan->showRewrite = pScanLogicNode->showRewrite;
35,290✔
818
  pScan->accountId = pCxt->pPlanCxt->acctId;
35,290✔
819
  pScan->sysInfo = pCxt->pPlanCxt->sysInfo;
35,290✔
820
  if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_TABLES) ||
35,290✔
821
      0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_TAGS) ||
30,289✔
822
      0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_COLS)) {
28,403✔
823
    if (pScanLogicNode->pVgroupList) {
27,373!
824
      vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
27,373✔
825
    }
826
  } else {
827
    pSubplan->execNode.nodeId = MNODE_HANDLE;
7,917✔
828
    pSubplan->execNode.epSet = pCxt->pPlanCxt->mgmtEpSet;
7,917✔
829
  }
830
  if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_DNODE_VARIABLES) && pScanLogicNode->pVgroupList) {
35,288!
831
    pScan->mgmtEpSet = pScanLogicNode->pVgroupList->vgroups->epSet;
89✔
832
  } else {
833
    pScan->mgmtEpSet = pCxt->pPlanCxt->mgmtEpSet;
35,199✔
834
  }
835
  (void)tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
35,288✔
836

837
  pCxt->hasSysScan = true;
35,290✔
838
  return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
35,290✔
839
}
840

841
static int32_t createStreamScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
1,441✔
842
                                         SPhysiNode** pPhyNode) {
843
  return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
1,441✔
844
}
845

846
static int32_t createTableMergeScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
251,691✔
847
                                             SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) {
848
  return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
251,691✔
849
}
850

851
static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
1,571,105✔
852
                                   SPhysiNode** pPhyNode) {
853
  pCxt->hasScan = true;
1,571,105✔
854
  switch (pScanLogicNode->scanType) {
1,571,105✔
855
    case SCAN_TYPE_TAG:
76,280✔
856
      return createTagScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
76,280✔
857
    case SCAN_TYPE_BLOCK_INFO:
13✔
858
      return createSimpleScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
13✔
859
    case SCAN_TYPE_TABLE_COUNT:
99✔
860
      return createTableCountScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
99✔
861
    case SCAN_TYPE_LAST_ROW:
11,245✔
862
      return createLastRowScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
11,245✔
863
    case SCAN_TYPE_TABLE:
1,195,045✔
864
      return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
1,195,045✔
865
    case SCAN_TYPE_SYSTEM_TABLE:
35,290✔
866
      return createSystemTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
35,290✔
867
    case SCAN_TYPE_STREAM:
1,441✔
868
      return createStreamScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
1,441✔
869
    case SCAN_TYPE_TABLE_MERGE:
251,691✔
870
      return createTableMergeScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
251,691✔
871
    default:
1✔
872
      break;
1✔
873
  }
874
  return TSDB_CODE_FAILED;
1✔
875
}
876

877
/*
 * Look up the output data block descriptor of the idx-th join input.
 *
 * Two layouts of pChildren are accepted:
 *   - exactly two children: the idx-th child is the join input;
 *   - exactly one child of type QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE: the
 *     idx-th child of that group-cache node is the join input.
 *
 * On success *ppDesc is set to the input's pOutputDataBlockDesc and
 * TSDB_CODE_SUCCESS is returned. Any other layout, or a missing idx-th input,
 * is logged and reported as TSDB_CODE_PLAN_INTERNAL_ERROR with *ppDesc untouched.
 */
static int32_t getJoinDataBlockDescNode(SNodeList* pChildren, int32_t idx, SDataBlockDescNode** ppDesc) {
  // Fetched once and NULL-checked before every use: an empty child list would
  // otherwise be dereferenced by nodeType(), including in the error log below.
  SNode* pFirst = nodesListGetNode(pChildren, 0);
  SNode* pInput = NULL;

  if (2 == pChildren->length) {
    pInput = nodesListGetNode(pChildren, idx);
  } else if (1 == pChildren->length && NULL != pFirst && nodeType(pFirst) == QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE) {
    SGroupCachePhysiNode* pGrpCache = (SGroupCachePhysiNode*)pFirst;
    pInput = nodesListGetNode(pGrpCache->node.pChildren, idx);
  } else {
    planError("Invalid join children num:%d or child type:%d", pChildren->length,
              (NULL != pFirst) ? (int32_t)nodeType(pFirst) : -1);
    return TSDB_CODE_PLAN_INTERNAL_ERROR;
  }

  // idx may not address an existing input (for example a group-cache node with
  // fewer children than expected); fail cleanly rather than dereference it.
  if (NULL == pInput) {
    planError("Invalid join child index:%d, children num:%d", idx, pChildren->length);
    return TSDB_CODE_PLAN_INTERNAL_ERROR;
  }

  *ppDesc = ((SPhysiNode*)pInput)->pOutputDataBlockDesc;
  return TSDB_CODE_SUCCESS;
}
890

891
/*
 * Split the operands of a column equality condition into per-side lists.
 *
 * pEqCond must be either an OP_TYPE_EQUAL operator or an AND logic condition;
 * for the latter every parameter is processed recursively. For an equality
 * operator, each operand is cloned and appended to *ppLeft or *ppRight
 * according to whether its dataBlockId equals leftBlkId or rightBlkId. The left
 * operand is handled first; the right operand is skipped if that fails.
 *
 * Returns TSDB_CODE_PLAN_INTERNAL_ERROR for an unsupported condition or an
 * operand that belongs to neither block, otherwise the status of the clone /
 * append calls.
 */
static int32_t setColEqList(SNode* pEqCond, int16_t leftBlkId, int16_t rightBlkId, SNodeList** ppLeft, SNodeList** ppRight) {
  int32_t code = 0;

  // AND condition: recurse into every parameter, stopping at the first error.
  if (QUERY_NODE_LOGIC_CONDITION == nodeType(pEqCond) &&
      LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)pEqCond)->condType) {
    SNode* pParam = NULL;
    FOREACH(pParam, ((SLogicConditionNode*)pEqCond)->pParameterList) {
      code = setColEqList(pParam, leftBlkId, rightBlkId, ppLeft, ppRight);
      if (code) {
        return code;
      }
    }
    return code;
  }

  // Anything that is not an equality operator is rejected.
  if (QUERY_NODE_OPERATOR != nodeType(pEqCond) || OP_TYPE_EQUAL != ((SOperatorNode*)pEqCond)->opType) {
    planError("invalid col equal cond, type:%d", nodeType(pEqCond));
    return TSDB_CODE_PLAN_INTERNAL_ERROR;
  }

  SOperatorNode* pEqOp = (SOperatorNode*)pEqCond;
  SColumnNode*   pLeftCol = (SColumnNode*)pEqOp->pLeft;
  SColumnNode*   pRightCol = (SColumnNode*)pEqOp->pRight;
  SNodeList**    ppDst = NULL;
  SNode*         pCopy = NULL;

  // Left operand: pick the destination list by data block id.
  if (leftBlkId == pLeftCol->dataBlockId) {
    ppDst = ppLeft;
  } else if (rightBlkId == pLeftCol->dataBlockId) {
    ppDst = ppRight;
  } else {
    planError("invalid col equal list, leftBlockId:%d", pLeftCol->dataBlockId);
    return TSDB_CODE_PLAN_INTERNAL_ERROR;
  }
  code = nodesCloneNode(pEqOp->pLeft, &pCopy);
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(ppDst, pCopy);
  }
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  // Right operand: same procedure.
  pCopy = NULL;
  if (leftBlkId == pRightCol->dataBlockId) {
    ppDst = ppLeft;
  } else if (rightBlkId == pRightCol->dataBlockId) {
    ppDst = ppRight;
  } else {
    planError("invalid col equal list, rightBlockId:%d", pRightCol->dataBlockId);
    return TSDB_CODE_PLAN_INTERNAL_ERROR;
  }
  code = nodesCloneNode(pEqOp->pRight, &pCopy);
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(ppDst, pCopy);
  }

  return code;
}
943

944
/*
 * Record on pJoin which slots the primary-key join condition refers to.
 *
 * pEqCond must be an operator node. Its opType must be OP_TYPE_EQUAL unless
 * subType is JOIN_STYPE_ASOF. Each operand must be either a column or a
 * FUNCTION_TYPE_TIMETRUNCATE call whose first parameter is a column.
 *
 * For each operand, the column's dataBlockId decides the side:
 *   - left block:  leftPrimSlotId is set, and asofOpType is set to the operator
 *                  type (passed through getAsofJoinReverseOp() when the column
 *                  is the operator's right operand);
 *   - right block: rightPrimSlotId is set.
 * For a timetruncate operand the function node is additionally cloned into
 * leftPrimExpr / rightPrimExpr.
 *
 * Returns TSDB_CODE_PLAN_INTERNAL_ERROR for any unsupported shape, otherwise
 * the status of the last nodesCloneNode() call (0 if none was needed).
 */
static int32_t setMergeJoinPrimColEqCond(SNode* pEqCond, int32_t subType, int16_t leftBlkId, int16_t rightBlkId, SSortMergeJoinPhysiNode* pJoin) {
  int32_t code = 0;

  if (QUERY_NODE_OPERATOR != nodeType(pEqCond)) {
    planError("invalid primary key col equal cond, type:%d", nodeType(pEqCond));
    return TSDB_CODE_PLAN_INTERNAL_ERROR;
  }

  SOperatorNode* pOp = (SOperatorNode*)pEqCond;
  if (pOp->opType != OP_TYPE_EQUAL && JOIN_STYPE_ASOF != subType) {
    planError("invalid primary cond opType, opType:%d", pOp->opType);
    return TSDB_CODE_PLAN_INTERNAL_ERROR;
  }

  // ---- left operand ----
  switch (nodeType(pOp->pLeft)) {
    case QUERY_NODE_COLUMN: {
      SColumnNode* pCol = (SColumnNode*)pOp->pLeft;
      if (leftBlkId == pCol->dataBlockId) {
        pJoin->leftPrimSlotId = pCol->slotId;
        pJoin->asofOpType = pOp->opType;
      } else if (rightBlkId == pCol->dataBlockId) {
        pJoin->rightPrimSlotId = pCol->slotId;
      } else {
        planError("invalid primary key col equal cond, leftBlockId:%d", pCol->dataBlockId);
        return TSDB_CODE_PLAN_INTERNAL_ERROR;
      }
      break;
    }
    case QUERY_NODE_FUNCTION: {
      SFunctionNode* pFunc = (SFunctionNode*)pOp->pLeft;
      if (FUNCTION_TYPE_TIMETRUNCATE != pFunc->funcType) {
        planError("invalid primary cond left function type, leftFuncType:%d", pFunc->funcType);
        return TSDB_CODE_PLAN_INTERNAL_ERROR;
      }
      SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
      // A missing first parameter must not reach nodeType(), which dereferences
      // its argument.
      if (NULL == pParam) {
        planError("invalid primary cond left timetruncate param list, leftFuncType:%d", pFunc->funcType);
        return TSDB_CODE_PLAN_INTERNAL_ERROR;
      }
      if (QUERY_NODE_COLUMN != nodeType(pParam)) {
        planError("invalid primary cond left timetruncate param type, leftParamType:%d", nodeType(pParam));
        return TSDB_CODE_PLAN_INTERNAL_ERROR;
      }
      SColumnNode* pCol = (SColumnNode*)pParam;
      if (leftBlkId == pCol->dataBlockId) {
        pJoin->leftPrimSlotId = pCol->slotId;
        pJoin->asofOpType = pOp->opType;
        pJoin->leftPrimExpr = NULL;
        code = nodesCloneNode((SNode*)pFunc, &pJoin->leftPrimExpr);
      } else if (rightBlkId == pCol->dataBlockId) {
        pJoin->rightPrimSlotId = pCol->slotId;
        pJoin->rightPrimExpr = NULL;
        code = nodesCloneNode((SNode*)pFunc, &pJoin->rightPrimExpr);
      } else {
        planError("invalid primary key col equal cond, leftBlockId:%d", pCol->dataBlockId);
        return TSDB_CODE_PLAN_INTERNAL_ERROR;
      }
      break;
    }
    default:
      planError("invalid primary cond left node type, leftNodeType:%d", nodeType(pOp->pLeft));
      return TSDB_CODE_PLAN_INTERNAL_ERROR;
  }
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  // ---- right operand ----
  switch (nodeType(pOp->pRight)) {
    case QUERY_NODE_COLUMN: {
      SColumnNode* pCol = (SColumnNode*)pOp->pRight;
      if (leftBlkId == pCol->dataBlockId) {
        pJoin->leftPrimSlotId = pCol->slotId;
        // The left-block column is the operator's right operand here, so the
        // operator type goes through getAsofJoinReverseOp().
        pJoin->asofOpType = getAsofJoinReverseOp(pOp->opType);
      } else if (rightBlkId == pCol->dataBlockId) {
        pJoin->rightPrimSlotId = pCol->slotId;
      } else {
        planError("invalid primary key col equal cond, rightBlockId:%d", pCol->dataBlockId);
        return TSDB_CODE_PLAN_INTERNAL_ERROR;
      }
      break;
    }
    case QUERY_NODE_FUNCTION: {
      SFunctionNode* pFunc = (SFunctionNode*)pOp->pRight;
      if (FUNCTION_TYPE_TIMETRUNCATE != pFunc->funcType) {
        planError("invalid primary cond right function type, rightFuncType:%d", pFunc->funcType);
        return TSDB_CODE_PLAN_INTERNAL_ERROR;
      }
      SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
      // Same guard as for the left operand.
      if (NULL == pParam) {
        planError("invalid primary cond right timetruncate param list, rightFuncType:%d", pFunc->funcType);
        return TSDB_CODE_PLAN_INTERNAL_ERROR;
      }
      if (QUERY_NODE_COLUMN != nodeType(pParam)) {
        planError("invalid primary cond right timetruncate param type, rightParamType:%d", nodeType(pParam));
        return TSDB_CODE_PLAN_INTERNAL_ERROR;
      }
      SColumnNode* pCol = (SColumnNode*)pParam;
      if (leftBlkId == pCol->dataBlockId) {
        pJoin->leftPrimSlotId = pCol->slotId;
        pJoin->asofOpType = getAsofJoinReverseOp(pOp->opType);
        pJoin->leftPrimExpr = NULL;
        code = nodesCloneNode((SNode*)pFunc, &pJoin->leftPrimExpr);
      } else if (rightBlkId == pCol->dataBlockId) {
        pJoin->rightPrimSlotId = pCol->slotId;
        pJoin->rightPrimExpr = NULL;
        code = nodesCloneNode((SNode*)pFunc, &pJoin->rightPrimExpr);
      } else {
        planError("invalid primary key col equal cond, rightBlockId:%d", pCol->dataBlockId);
        return TSDB_CODE_PLAN_INTERNAL_ERROR;
      }
      break;
    }
    default:
      planError("invalid primary cond right node type, rightNodeType:%d", nodeType(pOp->pRight));
      return TSDB_CODE_PLAN_INTERNAL_ERROR;
  }

  return code;
}
1053

1054
static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode,
114,743✔
1055
                                   SPhysiNode** pPhyNode) {
1056
  SSortMergeJoinPhysiNode* pJoin =
1057
      (SSortMergeJoinPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pJoinLogicNode, QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN);
114,743✔
1058
  if (NULL == pJoin) {
114,743!
1059
    return terrno;
×
1060
  }
1061

1062
  SDataBlockDescNode* pLeftDesc = NULL;
114,743✔
1063
  SDataBlockDescNode* pRightDesc = NULL;
114,743✔
1064
  pJoin->joinType = pJoinLogicNode->joinType;
114,743✔
1065
  pJoin->subType = pJoinLogicNode->subType;
114,743✔
1066
  pJoin->pWindowOffset = NULL;
114,743✔
1067
  int32_t code = nodesCloneNode(pJoinLogicNode->pWindowOffset, &pJoin->pWindowOffset);
114,743✔
1068
  if (TSDB_CODE_SUCCESS  == code) {
114,743!
1069
    pJoin->pJLimit = NULL;
114,743✔
1070
    code = nodesCloneNode(pJoinLogicNode->pJLimit, (SNode**)&pJoin->pJLimit);
114,743✔
1071
  }
1072
  if (TSDB_CODE_SUCCESS == code) {
114,743!
1073
    pJoin->node.inputTsOrder = pJoinLogicNode->node.inputTsOrder;
114,743✔
1074
    pJoin->seqWinGroup = pJoinLogicNode->seqWinGroup;
114,743✔
1075
    pJoin->grpJoin = pJoinLogicNode->grpJoin;
114,743✔
1076
    code = getJoinDataBlockDescNode(pChildren, 0, &pLeftDesc);
114,743✔
1077
  }
1078

1079
  if (TSDB_CODE_SUCCESS == code) {
114,743!
1080
    code = getJoinDataBlockDescNode(pChildren, 1, &pRightDesc);
114,743✔
1081
  }
1082

1083
  if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pPrimKeyEqCond) {
114,743!
1084
    code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond,
113,597✔
1085
                  &pJoin->pPrimKeyCond);
1086
    if (TSDB_CODE_SUCCESS == code) {
113,597!
1087
      code = setMergeJoinPrimColEqCond(pJoin->pPrimKeyCond, pJoin->subType, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin);  
113,597✔
1088
    }
1089
    if (TSDB_CODE_SUCCESS == code && NULL != pJoin->leftPrimExpr) {
113,597!
1090
      code = addDataBlockSlot(pCxt, &pJoin->leftPrimExpr, pLeftDesc);
515✔
1091
    }
1092
    if (TSDB_CODE_SUCCESS == code && NULL != pJoin->rightPrimExpr) {
113,597!
1093
      code = addDataBlockSlot(pCxt, &pJoin->rightPrimExpr, pRightDesc);
425✔
1094
    }
1095
  }
1096

1097
  if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->addPrimEqCond) {
114,743!
1098
    SNode* pPrimKeyCond = NULL;
1,146✔
1099
    code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->addPrimEqCond,
1,146✔
1100
                  &pPrimKeyCond);
1101
    if (TSDB_CODE_SUCCESS == code) {
1,146!
1102
      code = setMergeJoinPrimColEqCond(pPrimKeyCond, pJoin->subType, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin);  
1,146✔
1103
    }
1104
    if (TSDB_CODE_SUCCESS == code && NULL != pJoin->leftPrimExpr) {
1,146!
1105
      code = addDataBlockSlot(pCxt, &pJoin->leftPrimExpr, pLeftDesc);
×
1106
    }
1107
    if (TSDB_CODE_SUCCESS == code && NULL != pJoin->rightPrimExpr) {
1,146!
1108
      code = addDataBlockSlot(pCxt, &pJoin->rightPrimExpr, pRightDesc);
×
1109
    }
1110
    nodesDestroyNode(pPrimKeyCond);
1,146✔
1111
  }
1112

1113
  if (TSDB_CODE_SUCCESS == code) {
114,743!
1114
    code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->node.pTargets,
114,743✔
1115
                         &pJoin->pTargets);
1116
  }
1117

1118
  if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pFullOnCond) {
114,743!
1119
    code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1,
57,252✔
1120
                         pJoinLogicNode->pFullOnCond, &pJoin->pFullOnCond);
1121
  }
1122

1123
  if (TSDB_CODE_SUCCESS == code && ((NULL != pJoinLogicNode->pColEqCond) || (NULL != pJoinLogicNode->pTagEqCond))) {
114,743!
1124
    code = mergeJoinConds(&pJoinLogicNode->pColEqCond, &pJoinLogicNode->pTagEqCond);
237✔
1125
  }
1126
  //TODO set from input blocks for group algo
1127
/*  
1128
  if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pColEqCond) {
1129
    code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColEqCond, &pJoin->pColEqCond);
1130
  }
1131
*/
1132
  if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pColEqCond) {
114,743!
1133
    code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId,
237✔
1134
                         pJoinLogicNode->pColEqCond, &pJoin->pColEqCond);
1135
    if (TSDB_CODE_SUCCESS == code) {        
237!
1136
      code = setColEqList(pJoin->pColEqCond, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, &pJoin->pEqLeft, &pJoin->pEqRight);  
237✔
1137
    }
1138
  }
1139

1140
  if (TSDB_CODE_SUCCESS == code && ((NULL != pJoinLogicNode->pColOnCond) || (NULL != pJoinLogicNode->pTagOnCond))) {
114,743!
1141
    code = mergeJoinConds(&pJoinLogicNode->pColOnCond, &pJoinLogicNode->pTagOnCond);
57,111✔
1142
  }
1143
  //TODO set from input blocks for group algo
1144
  /*  
1145
    if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pColOnCond) {
1146
      code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColOnCond, &pJoin->pColOnCond);
1147
    }
1148
  */
1149
  if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pColOnCond) {
114,743!
1150
    code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1,
57,111✔
1151
                         pJoinLogicNode->pColOnCond, &pJoin->pColOnCond);
1152
  }
1153

1154
  
1155
  if (TSDB_CODE_SUCCESS == code) {
114,743!
1156
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin);
114,743✔
1157
  }
1158

1159
  if (TSDB_CODE_SUCCESS == code) {
114,743!
1160
    code = addDataBlockSlots(pCxt, pJoin->pTargets, pJoin->node.pOutputDataBlockDesc);
114,743✔
1161
  }
1162

1163
  if (TSDB_CODE_SUCCESS == code) {
114,743!
1164
    *pPhyNode = (SPhysiNode*)pJoin;
114,743✔
1165
  } else {
1166
    nodesDestroyNode((SNode*)pJoin);
×
1167
  }
1168

1169
  return code;
114,743✔
1170
}
1171

1172
/*
 * Append the two operands of one equality operator to the hash join's ON
 * column lists.
 *
 * pEq must be an operator node. The operand whose dataBlockId is lBlkId is
 * cloned into pJoin->pOnLeft and the one whose dataBlockId is rBlkId into
 * pJoin->pOnRight, whichever order they appear in the operator. The left list
 * is always filled first.
 *
 * Returns TSDB_CODE_PLAN_INTERNAL_ERROR when pEq is not an operator or its
 * operands do not come from one block each, otherwise the status of the
 * clone / append calls.
 */
static int32_t extractHashJoinOpCols(int16_t lBlkId, int16_t rBlkId, SNode* pEq, SHashJoinPhysiNode* pJoin) {
  if (QUERY_NODE_OPERATOR != nodeType(pEq)) {
    planError("Invalid join equal node type:%d", nodeType(pEq));
    return TSDB_CODE_PLAN_INTERNAL_ERROR;
  }

  SOperatorNode* pOp = (SOperatorNode*)pEq;
  SColumnNode*   pLeft = (SColumnNode*)pOp->pLeft;
  SColumnNode*   pRight = (SColumnNode*)pOp->pRight;
  SNode*         pForLeftList = NULL;   // operand belonging to the left input block
  SNode*         pForRightList = NULL;  // operand belonging to the right input block

  if (lBlkId == pLeft->dataBlockId && rBlkId == pRight->dataBlockId) {
    pForLeftList = pOp->pLeft;
    pForRightList = pOp->pRight;
  } else if (rBlkId == pLeft->dataBlockId && lBlkId == pRight->dataBlockId) {
    pForLeftList = pOp->pRight;
    pForRightList = pOp->pLeft;
  } else {
    planError("Invalid join equal cond, lbid:%d, rbid:%d, oplid:%d, oprid:%d", lBlkId, rBlkId, pLeft->dataBlockId, pRight->dataBlockId);
    return TSDB_CODE_PLAN_INTERNAL_ERROR;
  }

  SNode*  pCopy = NULL;
  int32_t code = nodesCloneNode(pForLeftList, &pCopy);
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListStrictAppend(pJoin->pOnLeft, pCopy);
  }
  if (TSDB_CODE_SUCCESS == code) {
    pCopy = NULL;
    code = nodesCloneNode(pForRightList, &pCopy);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListStrictAppend(pJoin->pOnRight, pCopy);
  }

  return code;
}
1213

1214
/*
 * Collect the ON columns of one equality condition for a hash join.
 *
 * A NULL condition is accepted and does nothing. An operator node is handed to
 * extractHashJoinOpCols() directly; for a logic condition every parameter is
 * handed to it in turn, stopping at the first failure. Any other node type is
 * reported as TSDB_CODE_PLAN_INTERNAL_ERROR.
 */
static int32_t extractHashJoinOnCols(int16_t lBlkId, int16_t rBlkId, SNode* pEq, SHashJoinPhysiNode* pJoin) {
  if (NULL == pEq) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;

  switch (nodeType(pEq)) {
    case QUERY_NODE_OPERATOR:
      code = extractHashJoinOpCols(lBlkId, rBlkId, pEq, pJoin);
      break;
    case QUERY_NODE_LOGIC_CONDITION: {
      SNode* pParam = NULL;
      FOREACH(pParam, ((SLogicConditionNode*)pEq)->pParameterList) {
        code = extractHashJoinOpCols(lBlkId, rBlkId, pParam, pJoin);
        if (code) {
          break;
        }
      }
      break;
    }
    default:
      planError("Invalid join equal node type:%d", nodeType(pEq));
      return TSDB_CODE_PLAN_INTERNAL_ERROR;
  }

  return code;
}
1238

1239
/*
 * Create pJoin->pOnLeft / pJoin->pOnRight and fill them from up to three
 * equality conditions.
 *
 * Both lists are created fresh. pEq1, pEq2 and pEq3 are then processed in that
 * order by extractHashJoinOnCols(), stopping at the first failure. If all
 * succeed but no left column was collected, the call fails with
 * TSDB_CODE_PLAN_INTERNAL_ERROR.
 */
static int32_t createHashJoinColList(int16_t lBlkId, int16_t rBlkId, SNode* pEq1, SNode* pEq2, SNode* pEq3, SHashJoinPhysiNode* pJoin) {
  SNode*  pConds[3] = {pEq1, pEq2, pEq3};
  int32_t code = TSDB_CODE_SUCCESS;

  pJoin->pOnLeft = NULL;
  code = nodesMakeList(&pJoin->pOnLeft);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  pJoin->pOnRight = NULL;
  code = nodesMakeList(&pJoin->pOnRight);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  for (int32_t i = 0; i < 3 && TSDB_CODE_SUCCESS == code; ++i) {
    code = extractHashJoinOnCols(lBlkId, rBlkId, pConds[i], pJoin);
  }

  if (TSDB_CODE_SUCCESS == code && pJoin->pOnLeft->length <= 0) {
    planError("Invalid join equal column num: %d", pJoin->pOnLeft->length);
    code = TSDB_CODE_PLAN_INTERNAL_ERROR;
  }

  return code;
}
1266

1267
static int32_t sortHashJoinTargets(int16_t lBlkId, int16_t rBlkId, SHashJoinPhysiNode* pJoin) {
18,903✔
1268
  SNode*  pNode = NULL;
18,903✔
1269
  SSHashObj* pHash = tSimpleHashInit(pJoin->pTargets->length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
18,903✔
1270
  if (NULL == pHash) {
18,903!
1271
    return TSDB_CODE_OUT_OF_MEMORY;
×
1272
  }
1273
  SNodeList* pNew = NULL;
18,903✔
1274
  int32_t code = nodesMakeList(&pNew);
18,903✔
1275

1276
  if (TSDB_CODE_SUCCESS  == code) {
18,903!
1277
    FOREACH(pNode, pJoin->pTargets) {
94,515!
1278
      SColumnNode* pCol = (SColumnNode*)pNode;
75,612✔
1279
      char *pName = NULL;
75,612✔
1280
      int32_t len = 0;
75,612✔
1281
      code = getSlotKey(pNode, NULL, &pName, &len, 0);
75,612✔
1282
      if (TSDB_CODE_SUCCESS == code) {
75,612!
1283
        code = tSimpleHashPut(pHash, pName, len, &pCol, POINTER_BYTES);
75,612✔
1284
      }
1285
      taosMemoryFree(pName);
75,612✔
1286
      if (TSDB_CODE_SUCCESS  != code) {
75,612!
1287
        break;
×
1288
      }
1289
    }
1290
  }
1291
  if (TSDB_CODE_SUCCESS == code) {
18,903!
1292
    nodesClearList(pJoin->pTargets);
18,903✔
1293
    pJoin->pTargets = pNew;
18,903✔
1294

1295
    FOREACH(pNode, pJoin->pOnLeft) {
37,818!
1296
      char* pName = NULL;
18,915✔
1297
      SColumnNode* pCol = (SColumnNode*)pNode;
18,915✔
1298
      int32_t len = 0;
18,915✔
1299
      code = getSlotKey(pNode, NULL, &pName, &len, 0);
18,915✔
1300
      if (TSDB_CODE_SUCCESS == code) {
18,915!
1301
        SNode** p = tSimpleHashGet(pHash, pName, len);
18,915✔
1302
        if (p) {
18,915!
1303
          code = nodesListStrictAppend(pJoin->pTargets, *p);
×
1304
          if (TSDB_CODE_SUCCESS == code) {
×
1305
            code = tSimpleHashRemove(pHash, pName, len);
×
1306
          }
1307
        }
1308
      }
1309
      taosMemoryFree(pName);
18,915✔
1310
      if (TSDB_CODE_SUCCESS != code) {
18,915!
1311
        break;
×
1312
      }
1313
    }
1314
  }
1315
  if (TSDB_CODE_SUCCESS == code) {
18,903!
1316
    FOREACH(pNode, pJoin->pOnRight) {
37,818!
1317
      char* pName = NULL;
18,915✔
1318
      SColumnNode* pCol = (SColumnNode*)pNode;
18,915✔
1319
      int32_t len = 0;
18,915✔
1320
      code = getSlotKey(pNode, NULL, &pName, &len, 0);
18,915✔
1321
      if (TSDB_CODE_SUCCESS == code) {
18,915!
1322
        SNode** p = tSimpleHashGet(pHash, pName, len);
18,915✔
1323
        if (p) {
18,915!
1324
          code = nodesListStrictAppend(pJoin->pTargets, *p);
×
1325
          if (TSDB_CODE_SUCCESS == code) {
×
1326
            code = tSimpleHashRemove(pHash, pName, len);
×
1327
          }
1328
        }
1329
      }
1330
      taosMemoryFree(pName);
18,915✔
1331
      if (TSDB_CODE_SUCCESS != code) {
18,915!
1332
        break;
×
1333
      }
1334
    }
1335
  }
1336
  if (TSDB_CODE_SUCCESS == code) {
18,903!
1337
    if (tSimpleHashGetSize(pHash) > 0) {
18,903!
1338
      SNode** p = NULL;
18,903✔
1339
      int32_t iter = 0;
18,903✔
1340
      while (1) {
1341
        p = tSimpleHashIterate(pHash, p, &iter);
94,515✔
1342
        if (p == NULL) {
94,515✔
1343
          break;
18,903✔
1344
        }
1345

1346
        code = nodesListStrictAppend(pJoin->pTargets, *p);
75,612✔
1347
        if (TSDB_CODE_SUCCESS != code) {
75,612!
1348
          break;
×
1349
        }
1350
      }
1351
    }
1352
  }
1353

1354
  tSimpleHashCleanup(pHash);
18,903✔
1355

1356
  return code;
18,903✔
1357
}
1358

1359

1360
/*
 * Handles one operand of the primary-key equality condition of a hash join.
 *
 * The operand must be either a column, or a TIMETRUNCATE function whose first
 * parameter is a column. The column's dataBlockId decides which side of the
 * join it belongs to; the matching leftPrimSlotId/rightPrimSlotId is set, and
 * for the function form a clone of the function is stored in
 * leftPrimExpr/rightPrimExpr.
 *
 * pOperand   - left or right child of the equality operator.
 * side       - "left" or "right"; only used to label error log messages.
 * leftBlkId  - data block id of the join's left input.
 * rightBlkId - data block id of the join's right input.
 * pJoin      - hash join node updated in place.
 *
 * Returns TSDB_CODE_SUCCESS, the error returned by nodesCloneNode(), or
 * TSDB_CODE_PLAN_INTERNAL_ERROR when the operand has an unsupported shape.
 */
static int32_t setHashJoinPrimColEqCondOperand(SNode* pOperand, const char* side, int16_t leftBlkId,
                                               int16_t rightBlkId, SHashJoinPhysiNode* pJoin) {
  SFunctionNode* pFunc = NULL;  // non-NULL only for the timetruncate(col) form
  SColumnNode*   pCol = NULL;

  switch (nodeType(pOperand)) {
    case QUERY_NODE_COLUMN:
      pCol = (SColumnNode*)pOperand;
      break;
    case QUERY_NODE_FUNCTION: {
      pFunc = (SFunctionNode*)pOperand;
      if (FUNCTION_TYPE_TIMETRUNCATE != pFunc->funcType) {
        planError("invalid primary cond %s function type, %sFuncType:%d", side, side, pFunc->funcType);
        return TSDB_CODE_PLAN_INTERNAL_ERROR;
      }
      SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
      // Guard against a missing first parameter before inspecting its type.
      if (NULL == pParam || QUERY_NODE_COLUMN != nodeType(pParam)) {
        planError("invalid primary cond %s timetruncate param type, %sParamType:%d", side, side,
                  (NULL == pParam) ? -1 : (int32_t)nodeType(pParam));
        return TSDB_CODE_PLAN_INTERNAL_ERROR;
      }
      pCol = (SColumnNode*)pParam;
      break;
    }
    default:
      planError("invalid primary cond %s node type, %sNodeType:%d", side, side, nodeType(pOperand));
      return TSDB_CODE_PLAN_INTERNAL_ERROR;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  if (leftBlkId == pCol->dataBlockId) {
    pJoin->leftPrimSlotId = pCol->slotId;
    if (NULL != pFunc) {
      pJoin->leftPrimExpr = NULL;
      code = nodesCloneNode((SNode*)pFunc, &pJoin->leftPrimExpr);
    }
  } else if (rightBlkId == pCol->dataBlockId) {
    pJoin->rightPrimSlotId = pCol->slotId;
    if (NULL != pFunc) {
      pJoin->rightPrimExpr = NULL;
      code = nodesCloneNode((SNode*)pFunc, &pJoin->rightPrimExpr);
    }
  } else {
    planError("invalid primary key col equal cond, %sBlockId:%d", side, pCol->dataBlockId);
    return TSDB_CODE_PLAN_INTERNAL_ERROR;
  }

  return code;
}

/*
 * Extracts the primary-key slot ids (and optional timetruncate expressions)
 * of both join inputs from the primary-key equality condition.
 *
 * pEqCond must be an operator node with opType OP_TYPE_EQUAL; each operand is
 * processed by setHashJoinPrimColEqCondOperand(). The left operand is handled
 * first and a failure there stops processing.
 *
 * Returns TSDB_CODE_SUCCESS on success, TSDB_CODE_PLAN_INTERNAL_ERROR for an
 * unsupported condition, or the error of a failed node clone.
 */
static int32_t setHashJoinPrimColEqCond(SNode* pEqCond, int16_t leftBlkId, int16_t rightBlkId, SHashJoinPhysiNode* pJoin) {
  if (QUERY_NODE_OPERATOR != nodeType(pEqCond)) {
    planError("invalid primary key col equal cond, type:%d", nodeType(pEqCond));
    return TSDB_CODE_PLAN_INTERNAL_ERROR;
  }

  SOperatorNode* pOp = (SOperatorNode*)pEqCond;
  if (pOp->opType != OP_TYPE_EQUAL) {
    planError("invalid primary cond opType, opType:%d", pOp->opType);
    return TSDB_CODE_PLAN_INTERNAL_ERROR;
  }

  int32_t code = setHashJoinPrimColEqCondOperand(pOp->pLeft, "left", leftBlkId, rightBlkId, pJoin);
  if (TSDB_CODE_SUCCESS == code) {
    code = setHashJoinPrimColEqCondOperand(pOp->pRight, "right", leftBlkId, rightBlkId, pJoin);
  }
  return code;
}
1465

1466

1467
static int32_t createHashJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode,
18,903✔
1468
                                  SPhysiNode** pPhyNode) {
1469
  SHashJoinPhysiNode* pJoin =
1470
     (SHashJoinPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pJoinLogicNode, QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN);
18,903✔
1471
  if (NULL == pJoin) {
18,903!
1472
    return terrno;
×
1473
  }
1474

1475
  SDataBlockDescNode* pLeftDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc;
18,903✔
1476
  SDataBlockDescNode* pRightDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 1))->pOutputDataBlockDesc;
18,903✔
1477
  int32_t             code = TSDB_CODE_SUCCESS;
18,903✔
1478

1479
  pJoin->joinType = pJoinLogicNode->joinType;
18,903✔
1480
  pJoin->subType = pJoinLogicNode->subType;
18,903✔
1481
  pJoin->pWindowOffset = NULL;
18,903✔
1482
  code = nodesCloneNode(pJoinLogicNode->pWindowOffset, &pJoin->pWindowOffset);
18,903✔
1483
  if (TSDB_CODE_SUCCESS != code) {
18,903!
1484
    nodesDestroyNode((SNode*)pJoin);
×
1485
    return code;
×
1486
  }
1487
  pJoin->pJLimit = NULL;
18,903✔
1488
  code = nodesCloneNode(pJoinLogicNode->pJLimit, &pJoin->pJLimit);
18,903✔
1489
  if (TSDB_CODE_SUCCESS != code) {
18,903!
1490
    nodesDestroyNode((SNode*)pJoin);
×
1491
    return code;
×
1492
  }
1493
  pJoin->node.inputTsOrder = pJoinLogicNode->node.inputTsOrder;
18,903✔
1494
  pJoin->timeRangeTarget = pJoinLogicNode->timeRangeTarget;
18,903✔
1495
  pJoin->timeRange.skey = pJoinLogicNode->timeRange.skey;
18,903✔
1496
  pJoin->timeRange.ekey = pJoinLogicNode->timeRange.ekey;
18,903✔
1497

1498
  if (NULL != pJoinLogicNode->pPrimKeyEqCond) {
18,903!
1499
    code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond,
×
1500
                  &pJoin->pPrimKeyCond);
1501
    if (TSDB_CODE_SUCCESS == code) {
×
1502
      code = setHashJoinPrimColEqCond(pJoin->pPrimKeyCond, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin);  
×
1503
    }
1504
    if (TSDB_CODE_SUCCESS == code && NULL != pJoin->leftPrimExpr) {
×
1505
      code = addDataBlockSlot(pCxt, &pJoin->leftPrimExpr, pLeftDesc);
×
1506
    }
1507
    if (TSDB_CODE_SUCCESS == code && NULL != pJoin->rightPrimExpr) {
×
1508
      code = addDataBlockSlot(pCxt, &pJoin->rightPrimExpr, pRightDesc);
×
1509
    }    
1510
  }
1511
  if (TSDB_CODE_SUCCESS == code) {
18,903!
1512
    code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColEqCond, &pJoin->pColEqCond);
18,903✔
1513
  }
1514
  if (TSDB_CODE_SUCCESS == code) {
18,903!
1515
    code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pTagEqCond, &pJoin->pTagEqCond);
18,903✔
1516
  }
1517
  if (TSDB_CODE_SUCCESS == code) {
18,903!
1518
    code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, -1, pJoinLogicNode->pLeftOnCond, &pJoin->pLeftOnCond);
18,903✔
1519
  }
1520
  if (TSDB_CODE_SUCCESS == code) {
18,903!
1521
    code = setNodeSlotId(pCxt, -1, pRightDesc->dataBlockId, pJoinLogicNode->pRightOnCond, &pJoin->pRightOnCond);
18,903✔
1522
  }
1523
  if (TSDB_CODE_SUCCESS == code && ((NULL != pJoinLogicNode->pColOnCond) || (NULL != pJoinLogicNode->pTagOnCond))) {
18,903!
1524
    code = mergeJoinConds(&pJoinLogicNode->pColOnCond, &pJoinLogicNode->pTagOnCond);
×
1525
  }  
1526
  SNode* pOnCond = (NULL != pJoinLogicNode->pColOnCond) ? pJoinLogicNode->pColOnCond : pJoinLogicNode->pTagOnCond;
18,903!
1527
  if (TSDB_CODE_SUCCESS == code && NULL != pOnCond) {
18,903!
1528
    code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, pOnCond, &pJoin->pFullOnCond);
×
1529
  }
1530
  if (TSDB_CODE_SUCCESS == code) {
18,903!
1531
    code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->node.pTargets, &pJoin->pTargets);
18,903✔
1532
  }
1533
  if (TSDB_CODE_SUCCESS == code) {
18,903!
1534
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin);
18,903✔
1535
  }
1536
  if (TSDB_CODE_SUCCESS == code) {
18,903!
1537
    code = createHashJoinColList(pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin->pPrimKeyCond, pJoin->pColEqCond, pJoin->pTagEqCond, pJoin);
18,903✔
1538
  }
1539
  if (TSDB_CODE_SUCCESS == code) {
18,903!
1540
    code = sortHashJoinTargets(pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin);
18,903✔
1541
  }
1542
  if (TSDB_CODE_SUCCESS == code) {
18,903!
1543
    code = addDataBlockSlots(pCxt, pJoin->pTargets, pJoin->node.pOutputDataBlockDesc);
18,903✔
1544
  }
1545

1546
  if (TSDB_CODE_SUCCESS == code) {
18,903!
1547
    *pPhyNode = (SPhysiNode*)pJoin;
18,903✔
1548
  } else {
1549
    nodesDestroyNode((SNode*)pJoin);
×
1550
  }
1551

1552
  return code;
18,903✔
1553
}
1554

1555
static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode,
133,646✔
1556
                                   SPhysiNode** pPhyNode) {
1557
  switch (pJoinLogicNode->joinAlgo) {
133,646!
1558
    case JOIN_ALGO_MERGE:
114,743✔
1559
      return createMergeJoinPhysiNode(pCxt, pChildren, pJoinLogicNode, pPhyNode);
114,743✔
1560
    case JOIN_ALGO_HASH:
18,903✔
1561
      return createHashJoinPhysiNode(pCxt, pChildren, pJoinLogicNode, pPhyNode);
18,903✔
1562
    default:
×
1563
      planError("Invalid join algorithm:%d", pJoinLogicNode->joinAlgo);
×
1564
      break;
×
1565
  }
1566

1567
  return TSDB_CODE_FAILED;
×
1568
}
1569

1570
static int32_t createGroupCachePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SGroupCacheLogicNode* pLogicNode,
18,903✔
1571
                                    SPhysiNode** pPhyNode) {
1572
  SGroupCachePhysiNode* pGrpCache =
1573
     (SGroupCachePhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pLogicNode, QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE);
18,903✔
1574
  if (NULL == pGrpCache) {
18,903!
1575
    return terrno;
×
1576
  }
1577

1578
  pGrpCache->grpColsMayBeNull = pLogicNode->grpColsMayBeNull;
18,903✔
1579
  pGrpCache->grpByUid = pLogicNode->grpByUid;
18,903✔
1580
  pGrpCache->globalGrp = pLogicNode->globalGrp;
18,903✔
1581
  pGrpCache->batchFetch = pLogicNode->batchFetch;
18,903✔
1582
  SDataBlockDescNode* pChildDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc;
18,903✔
1583
  int32_t             code = TSDB_CODE_SUCCESS;
18,903✔
1584
/*
1585
  if (TSDB_CODE_SUCCESS == code) {
1586
    code = setListSlotId(pCxt, pChildDesc->dataBlockId, -1, pLogicNode->pGroupCols, &pGrpCache->pGroupCols);
1587
  }
1588
*/
1589

1590
  *pPhyNode = (SPhysiNode*)pGrpCache;
18,903✔
1591

1592
  return code;
18,903✔
1593
}
1594

1595
static int32_t updateDynQueryCtrlStbJoinInfo(SPhysiPlanContext* pCxt, SNodeList* pChildren, SDynQueryCtrlLogicNode* pLogicNode,
18,903✔
1596
                                            SDynQueryCtrlPhysiNode* pDynCtrl) {
1597
  SDataBlockDescNode* pPrevDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc;
18,903✔
1598
  SNodeList* pVgList = NULL;
18,903✔
1599
  SNodeList* pUidList = NULL;
18,903✔
1600
  int32_t code = setListSlotId(pCxt, pPrevDesc->dataBlockId, -1, pLogicNode->stbJoin.pVgList, &pVgList);
18,903✔
1601
  if (TSDB_CODE_SUCCESS == code) {
18,903!
1602
    code = setListSlotId(pCxt, pPrevDesc->dataBlockId, -1, pLogicNode->stbJoin.pUidList, &pUidList);
18,903✔
1603
  }
1604
  if (TSDB_CODE_SUCCESS == code) {
18,903!
1605
    memcpy(pDynCtrl->stbJoin.srcScan, pLogicNode->stbJoin.srcScan, sizeof(pDynCtrl->stbJoin.srcScan));
18,903✔
1606

1607
    SNode* pNode = NULL;
18,903✔
1608
    int32_t i = 0;
18,903✔
1609
    FOREACH(pNode, pVgList) {
56,709!
1610
      pDynCtrl->stbJoin.vgSlot[i] = ((SColumnNode*)pNode)->slotId;
37,806✔
1611
      ++i;
37,806✔
1612
    }
1613
    i = 0;
18,903✔
1614
    FOREACH(pNode, pUidList) {
56,709!
1615
      pDynCtrl->stbJoin.uidSlot[i] = ((SColumnNode*)pNode)->slotId;
37,806✔
1616
      ++i;
37,806✔
1617
    }
1618
    pDynCtrl->stbJoin.batchFetch = pLogicNode->stbJoin.batchFetch;
18,903✔
1619
  }
1620
  nodesDestroyList(pVgList);
18,903✔
1621
  nodesDestroyList(pUidList);
18,903✔
1622

1623
  return code;
18,903✔
1624
}
1625

1626
static int32_t createDynQueryCtrlPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SDynQueryCtrlLogicNode* pLogicNode,
18,903✔
1627
                                            SPhysiNode** pPhyNode) {
1628
  int32_t code = TSDB_CODE_SUCCESS;
18,903✔
1629
  SDynQueryCtrlPhysiNode* pDynCtrl =
1630
  (SDynQueryCtrlPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pLogicNode, QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL);
18,903✔
1631
  if (NULL == pDynCtrl) {
18,903!
1632
    return terrno;
×
1633
  }
1634

1635
  switch (pLogicNode->qType) {
18,903!
1636
    case DYN_QTYPE_STB_HASH:
18,903✔
1637
      code = updateDynQueryCtrlStbJoinInfo(pCxt, pChildren, pLogicNode, pDynCtrl);
18,903✔
1638
      break;
18,903✔
1639
    default:
×
1640
      planError("Invalid dyn query ctrl type:%d", pLogicNode->qType);
×
1641
      return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1642
  }
1643

1644
  if (TSDB_CODE_SUCCESS == code) {
18,903!
1645
    pDynCtrl->qType = pLogicNode->qType;
18,903✔
1646
    *pPhyNode = (SPhysiNode*)pDynCtrl;
18,903✔
1647
  }
1648

1649
  return code;
18,903✔
1650
}
1651

1652
typedef struct SRewritePrecalcExprsCxt {
1653
  int32_t    errCode;
1654
  int32_t    planNodeId;
1655
  int32_t    rewriteId;
1656
  SNodeList* pPrecalcExprs;
1657
} SRewritePrecalcExprsCxt;
1658

1659
/*
 * Moves the expression at *pNode into the pre-calculation list and replaces it
 * with a column that refers to it by alias.
 *
 * A clone of *pNode is appended to pCxt->pPrecalcExprs. A new column node takes
 * the clone's result type and its alias as column name; when the clone has no
 * alias, "#expr_<planNodeId>_<rewriteId>" is generated and stored on the clone
 * first. The original node is destroyed and *pNode is set to the column.
 *
 * Returns DEAL_RES_IGNORE_CHILD on success, or DEAL_RES_ERROR with
 * pCxt->errCode holding the code returned by the failing call.
 */
static EDealRes collectAndRewrite(SRewritePrecalcExprsCxt* pCxt, SNode** pNode) {
  SNode* pExpr = NULL;
  pCxt->errCode = nodesCloneNode(*pNode, &pExpr);
  if (NULL == pExpr) {
    return DEAL_RES_ERROR;
  }
  if (TSDB_CODE_SUCCESS != (pCxt->errCode = nodesListAppend(pCxt->pPrecalcExprs, pExpr))) {
    // Append failed, so the clone is still ours to release.
    nodesDestroyNode(pExpr);
    return DEAL_RES_ERROR;
  }

  SColumnNode* pCol = NULL;
  pCxt->errCode = nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pCol);
  if (NULL == pCol) {
    // pExpr is already stored in pCxt->pPrecalcExprs; destroying it here would leave a
    // dangling entry that is freed again when the list is destroyed.
    return DEAL_RES_ERROR;
  }

  SExprNode* pRewrittenExpr = (SExprNode*)pExpr;
  pCol->node.resType = pRewrittenExpr->resType;
  if ('\0' == pRewrittenExpr->aliasName[0]) {
    snprintf(pRewrittenExpr->aliasName, sizeof(pRewrittenExpr->aliasName), "#expr_%d_%d", pCxt->planNodeId,
             pCxt->rewriteId);
  }
  strcpy(pCol->colName, pRewrittenExpr->aliasName);

  nodesDestroyNode(*pNode);
  *pNode = (SNode*)pCol;
  return DEAL_RES_IGNORE_CHILD;
}
1688

1689
// Wraps the value node at *pNode in an OP_TYPE_ASSIGN operator: the operator gets a fresh
// QUERY_NODE_LEFT_VALUE as left operand, the original value as right operand, and inherits
// the value's result type and alias. On success *pNode points to the operator; on failure
// *pNode is untouched and the nodesMakeNode() error is returned.
static int32_t rewriteValueToOperator(SRewritePrecalcExprsCxt* pCxt, SNode** pNode) {
  SOperatorNode* pAssign = NULL;
  int32_t        code = nodesMakeNode(QUERY_NODE_OPERATOR, (SNode**)&pAssign);
  if (NULL == pAssign) {
    return code;
  }

  pAssign->pLeft = NULL;
  code = nodesMakeNode(QUERY_NODE_LEFT_VALUE, &pAssign->pLeft);
  if (NULL == pAssign->pLeft) {
    nodesDestroyNode((SNode*)pAssign);
    return code;
  }

  SValueNode* pValue = (SValueNode*)*pNode;
  pAssign->opType = OP_TYPE_ASSIGN;
  pAssign->node.resType = pValue->node.resType;
  strcpy(pAssign->node.aliasName, pValue->node.aliasName);
  pAssign->pRight = *pNode;

  *pNode = (SNode*)pAssign;
  return TSDB_CODE_SUCCESS;
}
1709

1710
// Rewrite callback: decides per node whether it must be pre-calculated.
//  - value nodes without notReserved are wrapped in an assign operator and extracted;
//  - operator, logic-condition and case-when nodes are extracted;
//  - function nodes are extracted only when fmIsScalarFunc() accepts their funcId;
//  - everything else is left in place and the walk continues into its children.
static EDealRes doRewritePrecalcExprs(SNode** pNode, void* pContext) {
  SRewritePrecalcExprsCxt* pCxt = (SRewritePrecalcExprsCxt*)pContext;

  switch (nodeType(*pNode)) {
    case QUERY_NODE_VALUE:
      if (((SValueNode*)*pNode)->notReserved) {
        break;
      }
      pCxt->errCode = rewriteValueToOperator(pCxt, pNode);
      if (TSDB_CODE_SUCCESS != pCxt->errCode) {
        return DEAL_RES_ERROR;
      }
      return collectAndRewrite(pCxt, pNode);

    case QUERY_NODE_OPERATOR:
    case QUERY_NODE_LOGIC_CONDITION:
    case QUERY_NODE_CASE_WHEN:
      return collectAndRewrite(pCxt, pNode);

    case QUERY_NODE_FUNCTION:
      if (fmIsScalarFunc(((SFunctionNode*)(*pNode))->funcId)) {
        return collectAndRewrite(pCxt, pNode);
      }
      break;  // non-scalar functions are handled like any other node

    default:
      break;
  }

  return DEAL_RES_CONTINUE;
}
1738

1739
/*
 * Clones pList into *pRewrittenList and extracts every expression that has to
 * be pre-calculated into *pPrecalcExprs (see doRewritePrecalcExprs).
 *
 * pList          - source expressions; NULL is accepted and is a no-op. For a
 *                  grouping-set node only its first parameter is cloned.
 * pPrecalcExprs  - in/out list of extracted expressions; created when *pPrecalcExprs
 *                  is NULL. It is destroyed again (via NODES_DESTORY_LIST) when
 *                  nothing was extracted or the rewrite failed.
 * pRewrittenList - in/out list receiving the rewritten clones; created when
 *                  *pRewrittenList is NULL.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered. Lists handed back
 * through the out-parameters remain the caller's to release.
 */
static int32_t rewritePrecalcExprs(SPhysiPlanContext* pCxt, SNodeList* pList, SNodeList** pPrecalcExprs,
                                   SNodeList** pRewrittenList) {
  if (NULL == pList) {
    return TSDB_CODE_SUCCESS;
  }
  int32_t code = 0;
  if (NULL == *pPrecalcExprs) {
    code = nodesMakeList(pPrecalcExprs);
    if (NULL == *pPrecalcExprs) {
      return code;
    }
  }
  if (NULL == *pRewrittenList) {
    code = nodesMakeList(pRewrittenList);
    if (NULL == *pRewrittenList) {
      return code;
    }
  }
  SNode* pNode = NULL;
  FOREACH(pNode, pList) {
    SNode* pNew = NULL;
    if (QUERY_NODE_GROUPING_SET == nodeType(pNode)) {
      code = nodesCloneNode(nodesListGetNode(((SGroupingSetNode*)pNode)->pParameterList, 0), &pNew);
    } else {
      code = nodesCloneNode(pNode, &pNew);
    }
    if (NULL == pNew) {
      return code;
    }
    code = nodesListAppend(*pRewrittenList, pNew);
    if (TSDB_CODE_SUCCESS != code) {
      // The clone never made it into the list, so release it here instead of leaking it.
      nodesDestroyNode(pNew);
      return code;
    }
  }
  SRewritePrecalcExprsCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pPrecalcExprs = *pPrecalcExprs};
  nodesRewriteExprs(*pRewrittenList, doRewritePrecalcExprs, &cxt);
  if (0 == LIST_LENGTH(cxt.pPrecalcExprs) || TSDB_CODE_SUCCESS != cxt.errCode) {
    NODES_DESTORY_LIST(*pPrecalcExprs);
  }
  return cxt.errCode;
}
1779

1780
/*
 * Single-expression variant of rewritePrecalcExprs().
 *
 * pNode is wrapped in a temporary one-element list, rewritten, and the rewritten
 * clone is handed back through *pRewritten. A NULL pNode is a no-op.
 *
 * On success the caller owns *pRewritten; only the temporary list cells are freed.
 * On failure *pRewritten is left untouched and any clones already produced are
 * destroyed together with their list so they do not leak.
 */
static int32_t rewritePrecalcExpr(SPhysiPlanContext* pCxt, SNode* pNode, SNodeList** pPrecalcExprs,
                                  SNode** pRewritten) {
  if (NULL == pNode) {
    return TSDB_CODE_SUCCESS;
  }

  SNodeList* pList = NULL;
  int32_t    code = nodesListMakeAppend(&pList, pNode);
  SNodeList* pRewrittenList = NULL;
  if (TSDB_CODE_SUCCESS == code) {
    code = rewritePrecalcExprs(pCxt, pList, pPrecalcExprs, &pRewrittenList);
  }
  if (TSDB_CODE_SUCCESS == code) {
    *pRewritten = nodesListGetNode(pRewrittenList, 0);
    // The node now belongs to the caller: drop the list but keep its content.
    nodesClearList(pRewrittenList);
  } else {
    // Nobody references the clones on this path; release them with the list.
    nodesDestroyList(pRewrittenList);
  }
  // pList only borrows pNode, so it must never destroy its content.
  nodesClearList(pList);
  return code;
}
1799

1800
// Walker callback: sets *(bool*)res to true and ends the walk as soon as a function node is
// found whose funcId (or originalFuncId, when hasOriginalFunc is set) is count-like.
static EDealRes hasCountLikeFunc(SNode* pNode, void* res) {
  if (QUERY_NODE_FUNCTION != nodeType(pNode)) {
    return DEAL_RES_CONTINUE;
  }

  SFunctionNode* pFunc = (SFunctionNode*)pNode;
  bool           countLike = fmIsCountLikeFunc(pFunc->funcId) ||
                   (pFunc->hasOriginalFunc && fmIsCountLikeFunc(pFunc->originalFuncId));
  if (!countLike) {
    return DEAL_RES_CONTINUE;
  }

  *(bool*)res = true;
  return DEAL_RES_END;
}
1810

1811
static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SAggLogicNode* pAggLogicNode,
773,794✔
1812
                                  SPhysiNode** pPhyNode, SSubplan* pSubPlan) {
1813
  SAggPhysiNode* pAgg =
1814
      (SAggPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pAggLogicNode, QUERY_NODE_PHYSICAL_PLAN_HASH_AGG);
773,794✔
1815
  if (NULL == pAgg) {
773,803!
1816
    return terrno;
×
1817
  }
1818
  if (pAgg->node.pSlimit) {
773,803✔
1819
    pSubPlan->dynamicRowThreshold = true;
3,757✔
1820
    pSubPlan->rowsThreshold = ((SLimitNode*)pAgg->node.pSlimit)->limit;
3,757✔
1821
  }
1822

1823
  pAgg->mergeDataBlock = (GROUP_ACTION_KEEP == pAggLogicNode->node.groupAction ? false : true);
773,803✔
1824
  pAgg->groupKeyOptimized = pAggLogicNode->hasGroupKeyOptimized;
773,803✔
1825
  pAgg->node.forceCreateNonBlockingOptr = pAggLogicNode->node.forceCreateNonBlockingOptr;
773,803✔
1826

1827
  SNodeList* pPrecalcExprs = NULL;
773,803✔
1828
  SNodeList* pGroupKeys = NULL;
773,803✔
1829
  SNodeList* pAggFuncs = NULL;
773,803✔
1830
  int32_t    code = rewritePrecalcExprs(pCxt, pAggLogicNode->pGroupKeys, &pPrecalcExprs, &pGroupKeys);
773,803✔
1831
  if (TSDB_CODE_SUCCESS == code) {
773,800!
1832
    code = rewritePrecalcExprs(pCxt, pAggLogicNode->pAggFuncs, &pPrecalcExprs, &pAggFuncs);
773,801✔
1833
  }
1834
  nodesWalkExprs(pAggFuncs, hasCountLikeFunc, &pAgg->hasCountLikeFunc);
773,802✔
1835

1836
  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
773,800✔
1837
  // push down expression to pOutputDataBlockDesc of child node
1838
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
773,798!
1839
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pAgg->pExprs);
215,728✔
1840
    if (TSDB_CODE_SUCCESS == code) {
215,728!
1841
      code = pushdownDataBlockSlots(pCxt, pAgg->pExprs, pChildTupe);
215,728✔
1842
    }
1843
  }
1844

1845
  if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) {
773,798!
1846
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pGroupKeys, &pAgg->pGroupKeys);
221,750✔
1847
    if (TSDB_CODE_SUCCESS == code) {
221,750!
1848
      code = addDataBlockSlots(pCxt, pAgg->pGroupKeys, pAgg->node.pOutputDataBlockDesc);
221,751✔
1849
    }
1850
  }
1851

1852
  if (TSDB_CODE_SUCCESS == code && NULL != pAggFuncs) {
773,797!
1853
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pAggFuncs, &pAgg->pAggFuncs);
571,740✔
1854
    if (TSDB_CODE_SUCCESS == code) {
571,743!
1855
      code = addDataBlockSlots(pCxt, pAgg->pAggFuncs, pAgg->node.pOutputDataBlockDesc);
571,743✔
1856
    }
1857
  }
1858

1859
  if (TSDB_CODE_SUCCESS == code) {
773,801!
1860
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pAggLogicNode, (SPhysiNode*)pAgg);
773,803✔
1861
  }
1862

1863
  if (TSDB_CODE_SUCCESS == code) {
773,803!
1864
    *pPhyNode = (SPhysiNode*)pAgg;
773,803✔
1865
  } else {
1866
    nodesDestroyNode((SNode*)pAgg);
×
1867
  }
1868

1869
  nodesDestroyList(pPrecalcExprs);
773,803✔
1870
  nodesDestroyList(pGroupKeys);
773,802✔
1871
  nodesDestroyList(pAggFuncs);
773,802✔
1872

1873
  return code;
773,800✔
1874
}
1875

1876
static int32_t createIndefRowsFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
36,049✔
1877
                                            SIndefRowsFuncLogicNode* pFuncLogicNode, SPhysiNode** pPhyNode) {
1878
  SIndefRowsFuncPhysiNode* pIdfRowsFunc = (SIndefRowsFuncPhysiNode*)makePhysiNode(
36,049✔
1879
      pCxt, (SLogicNode*)pFuncLogicNode, QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC);
1880
  if (NULL == pIdfRowsFunc) {
36,049!
1881
    return terrno;
×
1882
  }
1883

1884
  SNodeList* pPrecalcExprs = NULL;
36,049✔
1885
  SNodeList* pFuncs = NULL;
36,049✔
1886
  int32_t    code = rewritePrecalcExprs(pCxt, pFuncLogicNode->pFuncs, &pPrecalcExprs, &pFuncs);
36,049✔
1887

1888
  if (pIdfRowsFunc->node.inputTsOrder == 0) {
36,049✔
1889
    // default to asc
1890
    pIdfRowsFunc->node.inputTsOrder = TSDB_ORDER_ASC;
483✔
1891
  }
1892

1893
  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
36,049✔
1894
  // push down expression to pOutputDataBlockDesc of child node
1895
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
36,049!
1896
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pIdfRowsFunc->pExprs);
131✔
1897
    if (TSDB_CODE_SUCCESS == code) {
131!
1898
      code = pushdownDataBlockSlots(pCxt, pIdfRowsFunc->pExprs, pChildTupe);
131✔
1899
    }
1900
  }
1901

1902
  if (TSDB_CODE_SUCCESS == code) {
36,049!
1903
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs, &pIdfRowsFunc->pFuncs);
36,049✔
1904
    if (TSDB_CODE_SUCCESS == code) {
36,049!
1905
      code = addDataBlockSlots(pCxt, pIdfRowsFunc->pFuncs, pIdfRowsFunc->node.pOutputDataBlockDesc);
36,049✔
1906
    }
1907
  }
1908

1909
  if (TSDB_CODE_SUCCESS == code) {
36,049!
1910
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pFuncLogicNode, (SPhysiNode*)pIdfRowsFunc);
36,049✔
1911
  }
1912

1913
  if (TSDB_CODE_SUCCESS == code) {
36,049!
1914
    *pPhyNode = (SPhysiNode*)pIdfRowsFunc;
36,049✔
1915
  } else {
1916
    nodesDestroyNode((SNode*)pIdfRowsFunc);
×
1917
  }
1918

1919
  nodesDestroyList(pPrecalcExprs);
36,049✔
1920
  nodesDestroyList(pFuncs);
36,049✔
1921

1922
  return code;
36,049✔
1923
}
1924

1925
static int32_t createInterpFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
3,672✔
1926
                                         SInterpFuncLogicNode* pFuncLogicNode, SPhysiNode** pPhyNode) {
1927
  SInterpFuncPhysiNode* pInterpFunc = (SInterpFuncPhysiNode*)makePhysiNode(
3,672✔
1928
      pCxt, (SLogicNode*)pFuncLogicNode,
1929
      pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC : QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC);
3,672✔
1930
  if (NULL == pInterpFunc) {
3,672!
1931
    return terrno;
×
1932
  }
1933

1934
  SNodeList* pPrecalcExprs = NULL;
3,672✔
1935
  SNodeList* pFuncs = NULL;
3,672✔
1936
  int32_t    code = rewritePrecalcExprs(pCxt, pFuncLogicNode->pFuncs, &pPrecalcExprs, &pFuncs);
3,672✔
1937

1938
  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
3,672✔
1939
  // push down expression to pOutputDataBlockDesc of child node
1940
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
3,672!
1941
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pInterpFunc->pExprs);
1,265✔
1942
    if (TSDB_CODE_SUCCESS == code) {
1,265!
1943
      code = pushdownDataBlockSlots(pCxt, pInterpFunc->pExprs, pChildTupe);
1,265✔
1944
    }
1945
  }
1946

1947
  if (TSDB_CODE_SUCCESS == code) {
3,672!
1948
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs, &pInterpFunc->pFuncs);
3,672✔
1949
    if (TSDB_CODE_SUCCESS == code) {
3,672!
1950
      code = addDataBlockSlots(pCxt, pInterpFunc->pFuncs, pInterpFunc->node.pOutputDataBlockDesc);
3,672✔
1951
    }
1952
  }
1953

1954
  if (TSDB_CODE_SUCCESS == code) {
3,672!
1955
    pInterpFunc->timeRange = pFuncLogicNode->timeRange;
3,672✔
1956
    pInterpFunc->interval = pFuncLogicNode->interval;
3,672✔
1957
    pInterpFunc->fillMode = pFuncLogicNode->fillMode;
3,672✔
1958
    pInterpFunc->intervalUnit = pFuncLogicNode->intervalUnit;
3,672✔
1959
    pInterpFunc->precision = pFuncLogicNode->node.precision;
3,672✔
1960
    pInterpFunc->pFillValues = NULL;
3,672✔
1961
    code = nodesCloneNode(pFuncLogicNode->pFillValues, &pInterpFunc->pFillValues);
3,672✔
1962
    if (TSDB_CODE_SUCCESS != code) {
3,672!
1963
      code = code;
×
1964
    }
1965
  }
1966

1967
  if (TSDB_CODE_SUCCESS == code) {
3,672!
1968
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncLogicNode->pTimeSeries, &pInterpFunc->pTimeSeries);
3,672✔
1969
  }
1970

1971
  if (TSDB_CODE_SUCCESS == code) {
3,672!
1972
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pFuncLogicNode, (SPhysiNode*)pInterpFunc);
3,672✔
1973
  }
1974

1975
  if (pCxt->pPlanCxt->streamQuery) {
3,672✔
1976
    pInterpFunc->streamNodeOption = pFuncLogicNode->streamNodeOption;
9✔
1977
  }
1978

1979
  if (TSDB_CODE_SUCCESS == code) {
3,672!
1980
    *pPhyNode = (SPhysiNode*)pInterpFunc;
3,672✔
1981
  } else {
1982
    nodesDestroyNode((SNode*)pInterpFunc);
×
1983
  }
1984

1985
  nodesDestroyList(pPrecalcExprs);
3,672✔
1986
  nodesDestroyList(pFuncs);
3,672✔
1987

1988
  return code;
3,672✔
1989
}
1990

1991
static int32_t createForecastFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
×
1992
                                           SForecastFuncLogicNode* pFuncLogicNode, SPhysiNode** pPhyNode) {
1993
  SForecastFuncPhysiNode* pForecastFunc =
1994
      (SForecastFuncPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pFuncLogicNode, QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC);
×
1995
  if (NULL == pForecastFunc) {
×
1996
    return terrno;
×
1997
  }
1998

1999
  SNodeList* pPrecalcExprs = NULL;
×
2000
  SNodeList* pFuncs = NULL;
×
2001
  int32_t    code = rewritePrecalcExprs(pCxt, pFuncLogicNode->pFuncs, &pPrecalcExprs, &pFuncs);
×
2002

2003
  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
×
2004
  // push down expression to pOutputDataBlockDesc of child node
2005
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
×
2006
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pForecastFunc->pExprs);
×
2007
    if (TSDB_CODE_SUCCESS == code) {
×
2008
      code = pushdownDataBlockSlots(pCxt, pForecastFunc->pExprs, pChildTupe);
×
2009
    }
2010
  }
2011

2012
  if (TSDB_CODE_SUCCESS == code) {
×
2013
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs, &pForecastFunc->pFuncs);
×
2014
    if (TSDB_CODE_SUCCESS == code) {
×
2015
      code = addDataBlockSlots(pCxt, pForecastFunc->pFuncs, pForecastFunc->node.pOutputDataBlockDesc);
×
2016
    }
2017
  }
2018

2019
  if (TSDB_CODE_SUCCESS == code) {
×
2020
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pFuncLogicNode, (SPhysiNode*)pForecastFunc);
×
2021
  }
2022

2023
  if (TSDB_CODE_SUCCESS == code) {
×
2024
    *pPhyNode = (SPhysiNode*)pForecastFunc;
×
2025
  } else {
2026
    nodesDestroyNode((SNode*)pForecastFunc);
×
2027
  }
2028

2029
  nodesDestroyList(pPrecalcExprs);
×
2030
  nodesDestroyList(pFuncs);
×
2031

2032
  return code;
×
2033
}
2034

2035
// Decides whether the project operator may merge its output data blocks:
//  - never when the group action is GROUP_ACTION_KEEP;
//  - always when no result order is required or the node does not have exactly one child;
//  - otherwise only when the single child's result order is DATA_ORDER_LEVEL_GLOBAL.
static bool projectCanMergeDataBlock(SProjectLogicNode* pProject) {
  if (GROUP_ACTION_KEEP == pProject->node.groupAction) {
    return false;
  }
  if (DATA_ORDER_LEVEL_NONE == pProject->node.resultDataOrder || 1 != LIST_LENGTH(pProject->node.pChildren)) {
    return true;
  }

  SLogicNode* pOnlyChild = (SLogicNode*)nodesListGetNode(pProject->node.pChildren, 0);
  return DATA_ORDER_LEVEL_GLOBAL == pOnlyChild->resultDataOrder;
}
2048

2049
static int32_t createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
724,307✔
2050
                                      SProjectLogicNode* pProjectLogicNode, SPhysiNode** pPhyNode) {
2051
  SProjectPhysiNode* pProject =
2052
      (SProjectPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pProjectLogicNode, QUERY_NODE_PHYSICAL_PLAN_PROJECT);
724,307✔
2053
  if (NULL == pProject) {
724,307!
2054
    return terrno;
×
2055
  }
2056

2057
  pProject->mergeDataBlock = projectCanMergeDataBlock(pProjectLogicNode);
724,307✔
2058
  pProject->ignoreGroupId = pProjectLogicNode->ignoreGroupId;
724,307✔
2059
  pProject->inputIgnoreGroup = pProjectLogicNode->inputIgnoreGroup;
724,307✔
2060

2061
  int32_t code = TSDB_CODE_SUCCESS;
724,307✔
2062
  if (0 == LIST_LENGTH(pChildren)) {
724,307!
2063
    code = nodesCloneList(pProjectLogicNode->pProjections, &pProject->pProjections);
2,738✔
2064
  } else {
2065
    code = setListSlotId(pCxt, ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc->dataBlockId, -1,
721,569✔
2066
                         pProjectLogicNode->pProjections, &pProject->pProjections);
721,569✔
2067
  }
2068
  if (TSDB_CODE_SUCCESS == code) {
724,307!
2069
    code = addDataBlockSlotsForProject(pCxt, pProjectLogicNode->stmtName, pProject->pProjections,
724,307✔
2070
                                       pProject->node.pOutputDataBlockDesc);
2071
  }
2072
  if (TSDB_CODE_SUCCESS == code) {
724,305!
2073
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pProjectLogicNode, (SPhysiNode*)pProject);
724,305✔
2074
  }
2075

2076
  if (TSDB_CODE_SUCCESS == code) {
724,305!
2077
    *pPhyNode = (SPhysiNode*)pProject;
724,305✔
2078
  } else {
2079
    nodesDestroyNode((SNode*)pProject);
×
2080
  }
2081

2082
  return code;
724,305✔
2083
}
2084

2085
static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode,
642,323✔
2086
                                         SPhysiNode** pPhyNode) {
2087
  SExchangePhysiNode* pExchange =
2088
      (SExchangePhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE);
642,323✔
2089
  if (NULL == pExchange) {
642,325!
2090
    return terrno;
×
2091
  }
2092

2093
  pExchange->srcStartGroupId = pExchangeLogicNode->srcStartGroupId;
642,325✔
2094
  pExchange->srcEndGroupId = pExchangeLogicNode->srcEndGroupId;
642,325✔
2095
  pExchange->seqRecvData = pExchangeLogicNode->seqRecvData;
642,325✔
2096

2097
  int32_t code = setConditionsSlotId(pCxt, (const SLogicNode*)pExchangeLogicNode, (SPhysiNode*)pExchange);
642,325✔
2098
  if (TSDB_CODE_SUCCESS == code) {
642,324!
2099
    *pPhyNode = (SPhysiNode*)pExchange;
642,324✔
2100
  } else {
2101
    nodesDestroyNode((SNode*)pExchange);
×
2102
  }
2103

2104
  return code;
642,325✔
2105
}
2106

2107
static int32_t createStreamScanPhysiNodeByExchange(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode,
297✔
2108
                                                   SPhysiNode** pPhyNode) {
2109
  SScanPhysiNode* pScan =
2110
      (SScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
297✔
2111
  if (NULL == pScan) {
297!
2112
    return terrno;
×
2113
  }
2114

2115
  int32_t code = TSDB_CODE_SUCCESS;
297✔
2116

2117
  pScan->pScanCols = NULL;
297✔
2118
  code = nodesCloneList(pExchangeLogicNode->node.pTargets, &pScan->pScanCols);
297✔
2119

2120
  if (TSDB_CODE_SUCCESS == code) {
297!
2121
    code = sortScanCols(pScan->pScanCols);
297✔
2122
  }
2123

2124
  if (TSDB_CODE_SUCCESS == code) {
297!
2125
    code = sortScanCols(pScan->pScanCols);
297✔
2126
  }
2127
  if (TSDB_CODE_SUCCESS == code) {
297!
2128
    code = addDataBlockSlots(pCxt, pScan->pScanCols, pScan->node.pOutputDataBlockDesc);
297✔
2129
  }
2130
  if (TSDB_CODE_SUCCESS == code) {
297!
2131
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pExchangeLogicNode, (SPhysiNode*)pScan);
297✔
2132
  }
2133

2134
  if (TSDB_CODE_SUCCESS == code) {
297!
2135
    *pPhyNode = (SPhysiNode*)pScan;
297✔
2136
  } else {
2137
    nodesDestroyNode((SNode*)pScan);
×
2138
  }
2139

2140
  return code;
297✔
2141
}
2142

2143
static int32_t createExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode,
642,620✔
2144
                                       SPhysiNode** pPhyNode) {
2145
  if (pCxt->pPlanCxt->streamQuery) {
642,620✔
2146
    return createStreamScanPhysiNodeByExchange(pCxt, pExchangeLogicNode, pPhyNode);
297✔
2147
  } else {
2148
    return doCreateExchangePhysiNode(pCxt, pExchangeLogicNode, pPhyNode);
642,323✔
2149
  }
2150
}
2151

2152
static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowPhysiNode* pWindow,
124,834✔
2153
                                             SWindowLogicNode* pWindowLogicNode) {
2154
  pWindow->triggerType = pWindowLogicNode->triggerType;
124,834✔
2155
  pWindow->watermark = pWindowLogicNode->watermark;
124,834✔
2156
  pWindow->deleteMark = pWindowLogicNode->deleteMark;
124,834✔
2157
  pWindow->igExpired = pWindowLogicNode->igExpired;
124,834✔
2158
  if (pCxt->pPlanCxt->streamQuery) {
124,834✔
2159
    pWindow->destHasPrimaryKey = pCxt->pPlanCxt->destHasPrimaryKey;
1,149✔
2160
  }
2161
  pWindow->mergeDataBlock = (GROUP_ACTION_KEEP == pWindowLogicNode->node.groupAction ? false : true);
124,834✔
2162
  pWindow->node.inputTsOrder = pWindowLogicNode->node.inputTsOrder;
124,834✔
2163
  pWindow->node.outputTsOrder = pWindowLogicNode->node.outputTsOrder;
124,834✔
2164
  if (nodeType(pWindow) == QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL) {
124,834✔
2165
    pWindow->node.inputTsOrder = pWindowLogicNode->node.outputTsOrder;
12,793✔
2166
  }
2167

2168
  SNodeList* pPrecalcExprs = NULL;
124,834✔
2169
  SNodeList* pFuncs = NULL;
124,834✔
2170
  int32_t    code = rewritePrecalcExprs(pCxt, pWindowLogicNode->pFuncs, &pPrecalcExprs, &pFuncs);
124,834✔
2171

2172
  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
124,834✔
2173
  // push down expression to pOutputDataBlockDesc of child node
2174
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
124,834!
2175
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pWindow->pExprs);
9,216✔
2176
    if (TSDB_CODE_SUCCESS == code) {
9,216!
2177
      code = addDataBlockSlots(pCxt, pWindow->pExprs, pChildTupe);
9,216✔
2178
    }
2179
  }
2180

2181
  if (TSDB_CODE_SUCCESS == code) {
124,834!
2182
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pTspk, &pWindow->pTspk);
124,834✔
2183
  }
2184
  if (TSDB_CODE_SUCCESS == code && pWindowLogicNode->pTsEnd) {
124,837!
2185
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pTsEnd, &pWindow->pTsEnd);
6,626✔
2186
  }
2187

2188
  if (TSDB_CODE_SUCCESS == code && NULL != pFuncs) {
124,837!
2189
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs, &pWindow->pFuncs);
124,837✔
2190
    if (TSDB_CODE_SUCCESS == code) {
124,833!
2191
      code = addDataBlockSlots(pCxt, pWindow->pFuncs, pWindow->node.pOutputDataBlockDesc);
124,833✔
2192
    }
2193
  }
2194

2195
  if (TSDB_CODE_SUCCESS == code) {
124,833!
2196
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pWindowLogicNode, (SPhysiNode*)pWindow);
124,833✔
2197
  }
2198

2199
  nodesDestroyList(pPrecalcExprs);
124,833✔
2200
  nodesDestroyList(pFuncs);
124,832✔
2201

2202
  return code;
124,824✔
2203
}
2204

2205
// Map a window algorithm to the physical plan node type implementing it.
// Any algorithm not listed below maps to QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL.
static ENodeType getIntervalOperatorType(EWindowAlgorithm windowAlgo) {
  ENodeType type = QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL;

  switch (windowAlgo) {
    case INTERVAL_ALGO_MERGE:
      type = QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL;
      break;
    case INTERVAL_ALGO_STREAM_FINAL:
      type = QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL;
      break;
    case INTERVAL_ALGO_STREAM_SEMI:
      type = QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL;
      break;
    case INTERVAL_ALGO_STREAM_MID:
      type = QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL;
      break;
    case INTERVAL_ALGO_STREAM_SINGLE:
      type = QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL;
      break;
    case SESSION_ALGO_STREAM_FINAL:
      type = QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION;
      break;
    case SESSION_ALGO_STREAM_SEMI:
      type = QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION;
      break;
    case SESSION_ALGO_STREAM_SINGLE:
      type = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
      break;
    case SESSION_ALGO_MERGE:
      type = QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION;
      break;
    case INTERVAL_ALGO_HASH:
    default:
      // keep the hash interval fallback
      break;
  }

  return type;
}
2232

2233
static int32_t createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
109,170✔
2234
                                       SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
2235
  SIntervalPhysiNode* pInterval = (SIntervalPhysiNode*)makePhysiNode(
109,170✔
2236
      pCxt, (SLogicNode*)pWindowLogicNode, getIntervalOperatorType(pWindowLogicNode->windowAlgo));
2237
  if (NULL == pInterval) {
109,172!
2238
    return terrno;
×
2239
  }
2240

2241
  pInterval->interval = pWindowLogicNode->interval;
109,172✔
2242
  pInterval->offset = pWindowLogicNode->offset;
109,172✔
2243
  pInterval->sliding = pWindowLogicNode->sliding;
109,172✔
2244
  pInterval->intervalUnit = pWindowLogicNode->intervalUnit;
109,172✔
2245
  pInterval->slidingUnit = pWindowLogicNode->slidingUnit;
109,172✔
2246

2247
  int32_t code = createWindowPhysiNodeFinalize(pCxt, pChildren, &pInterval->window, pWindowLogicNode);
109,172✔
2248
  if (TSDB_CODE_SUCCESS == code) {
109,160!
2249
    *pPhyNode = (SPhysiNode*)pInterval;
109,160✔
2250
  } else {
2251
    nodesDestroyNode((SNode*)pInterval);
×
2252
  }
2253

2254
  return code;
109,161✔
2255
}
2256

2257
static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
6,626✔
2258
                                            SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
2259
  SSessionWinodwPhysiNode* pSession = (SSessionWinodwPhysiNode*)makePhysiNode(
6,626✔
2260
      pCxt, (SLogicNode*)pWindowLogicNode, getIntervalOperatorType(pWindowLogicNode->windowAlgo));
2261
  if (NULL == pSession) {
6,626!
2262
    return terrno;
×
2263
  }
2264

2265
  pSession->gap = pWindowLogicNode->sessionGap;
6,626✔
2266

2267
  int32_t code = createWindowPhysiNodeFinalize(pCxt, pChildren, &pSession->window, pWindowLogicNode);
6,626✔
2268
  if (TSDB_CODE_SUCCESS == code) {
6,626!
2269
    *pPhyNode = (SPhysiNode*)pSession;
6,626✔
2270
  } else {
2271
    nodesDestroyNode((SNode*)pSession);
×
2272
  }
2273

2274
  return code;
6,626✔
2275
}
2276

2277
static int32_t createStateWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
7,104✔
2278
                                          SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
2279
  SStateWinodwPhysiNode* pState = (SStateWinodwPhysiNode*)makePhysiNode(
7,104✔
2280
      pCxt, (SLogicNode*)pWindowLogicNode,
2281
      (pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE : QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE));
7,104✔
2282
  if (NULL == pState) {
7,104!
2283
    return terrno;
×
2284
  }
2285

2286
  SNodeList* pPrecalcExprs = NULL;
7,104✔
2287
  SNode*     pStateKey = NULL;
7,104✔
2288
  int32_t    code = rewritePrecalcExpr(pCxt, pWindowLogicNode->pStateExpr, &pPrecalcExprs, &pStateKey);
7,104✔
2289

2290
  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
7,104✔
2291
  // push down expression to pOutputDataBlockDesc of child node
2292
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
7,104!
2293
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pState->window.pExprs);
4,246✔
2294
    if (TSDB_CODE_SUCCESS == code) {
4,246!
2295
      code = addDataBlockSlots(pCxt, pState->window.pExprs, pChildTupe);
4,246✔
2296
    }
2297
  }
2298

2299
  if (TSDB_CODE_SUCCESS == code) {
7,104!
2300
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pStateKey, &pState->pStateKey);
7,104✔
2301
    // if (TSDB_CODE_SUCCESS == code) {
2302
    //   code = addDataBlockSlot(pCxt, &pState->pStateKey, pState->window.node.pOutputDataBlockDesc);
2303
    // }
2304
  }
2305

2306
  if (TSDB_CODE_SUCCESS == code) {
7,104!
2307
    code = createWindowPhysiNodeFinalize(pCxt, pChildren, &pState->window, pWindowLogicNode);
7,104✔
2308
  }
2309

2310
  if (TSDB_CODE_SUCCESS == code) {
7,104!
2311
    *pPhyNode = (SPhysiNode*)pState;
7,104✔
2312
  } else {
2313
    nodesDestroyNode((SNode*)pState);
×
2314
  }
2315

2316
  nodesDestroyList(pPrecalcExprs);
7,104✔
2317
  nodesDestroyNode(pStateKey);
7,104✔
2318

2319
  return code;
7,104✔
2320
}
2321

2322
static int32_t createEventWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
664✔
2323
                                          SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
2324
  SEventWinodwPhysiNode* pEvent = (SEventWinodwPhysiNode*)makePhysiNode(
664✔
2325
      pCxt, (SLogicNode*)pWindowLogicNode,
2326
      (pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT : QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT));
664✔
2327
  if (NULL == pEvent) {
664!
2328
    return terrno;
×
2329
  }
2330

2331
  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
664✔
2332
  int32_t code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pStartCond, &pEvent->pStartCond);
664✔
2333
  if (TSDB_CODE_SUCCESS == code) {
664!
2334
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pEndCond, &pEvent->pEndCond);
664✔
2335
  }
2336
  if (TSDB_CODE_SUCCESS == code) {
664!
2337
    code = createWindowPhysiNodeFinalize(pCxt, pChildren, &pEvent->window, pWindowLogicNode);
664✔
2338
  }
2339

2340
  if (TSDB_CODE_SUCCESS == code) {
664!
2341
    *pPhyNode = (SPhysiNode*)pEvent;
664✔
2342
  } else {
2343
    nodesDestroyNode((SNode*)pEvent);
×
2344
  }
2345

2346
  return code;
664✔
2347
}
2348

2349
static int32_t createCountWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
1,270✔
2350
                                          SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
2351
  SCountWinodwPhysiNode* pCount = (SCountWinodwPhysiNode*)makePhysiNode(
1,270✔
2352
      pCxt, (SLogicNode*)pWindowLogicNode,
2353
      (pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT : QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT));
1,270✔
2354
  if (NULL == pCount) {
1,270!
2355
    return terrno;
×
2356
  }
2357
  pCount->windowCount = pWindowLogicNode->windowCount;
1,270✔
2358
  pCount->windowSliding = pWindowLogicNode->windowSliding;
1,270✔
2359

2360
  int32_t  code = createWindowPhysiNodeFinalize(pCxt, pChildren, &pCount->window, pWindowLogicNode);
1,270✔
2361
  if (TSDB_CODE_SUCCESS == code) {
1,270!
2362
    *pPhyNode = (SPhysiNode*)pCount;
1,270✔
2363
  } else {
2364
    nodesDestroyNode((SNode*)pCount);
×
2365
  }
2366

2367
  return code;
1,270✔
2368
}
2369

2370
static int32_t createAnomalyWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
×
2371
                                            SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
2372
  SAnomalyWindowPhysiNode* pAnomaly = (SAnomalyWindowPhysiNode*)makePhysiNode(
×
2373
      pCxt, (SLogicNode*)pWindowLogicNode,
2374
      (pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_ANOMALY : QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY));
×
2375
  if (NULL == pAnomaly) {
×
2376
    return terrno;
×
2377
  }
2378

2379
  SNodeList* pPrecalcExprs = NULL;
×
2380
  SNode*     pAnomalyKey = NULL;
×
2381
  int32_t    code = rewritePrecalcExpr(pCxt, pWindowLogicNode->pAnomalyExpr, &pPrecalcExprs, &pAnomalyKey);
×
2382

2383
  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
×
2384
  // push down expression to pOutputDataBlockDesc of child node
2385
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
×
2386
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pAnomaly->window.pExprs);
×
2387
    if (TSDB_CODE_SUCCESS == code) {
×
2388
      code = addDataBlockSlots(pCxt, pAnomaly->window.pExprs, pChildTupe);
×
2389
    }
2390
  }
2391

2392
  if (TSDB_CODE_SUCCESS == code) {
×
2393
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pAnomalyKey, &pAnomaly->pAnomalyKey);
×
2394
    // if (TSDB_CODE_SUCCESS == code) {
2395
    //   code = addDataBlockSlot(pCxt, &pAnomaly->pAnomalyKey, pAnomaly->window.node.pOutputDataBlockDesc);
2396
    // }
2397
  }
2398

2399
  tstrncpy(pAnomaly->anomalyOpt, pWindowLogicNode->anomalyOpt, sizeof(pAnomaly->anomalyOpt));
×
2400

2401
  if (TSDB_CODE_SUCCESS == code) {
×
2402
    code = createWindowPhysiNodeFinalize(pCxt, pChildren, &pAnomaly->window, pWindowLogicNode);
×
2403
  }
2404

2405
  if (TSDB_CODE_SUCCESS == code) {
×
2406
    *pPhyNode = (SPhysiNode*)pAnomaly;
×
2407
  } else {
2408
    nodesDestroyNode((SNode*)pAnomaly);
×
2409
  }
2410

2411
  nodesDestroyList(pPrecalcExprs);
×
2412
  nodesDestroyNode(pAnomalyKey);
×
2413

2414
  return code;
×
2415
}
2416

2417
static int32_t createWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode,
124,834✔
2418
                                     SPhysiNode** pPhyNode) {
2419
  switch (pWindowLogicNode->winType) {
124,834!
2420
    case WINDOW_TYPE_INTERVAL:
109,170✔
2421
      return createIntervalPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
109,170✔
2422
    case WINDOW_TYPE_SESSION:
6,626✔
2423
      return createSessionWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
6,626✔
2424
    case WINDOW_TYPE_STATE:
7,104✔
2425
      return createStateWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
7,104✔
2426
    case WINDOW_TYPE_EVENT:
664✔
2427
      return createEventWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
664✔
2428
    case WINDOW_TYPE_COUNT:
1,270✔
2429
      return createCountWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
1,270✔
2430
    case WINDOW_TYPE_ANOMALY:
×
2431
      return createAnomalyWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
×
2432
    default:
×
2433
      break;
×
2434
  }
2435
  return TSDB_CODE_FAILED;
×
2436
}
2437

2438
static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SSortLogicNode* pSortLogicNode,
238,508✔
2439
                                   SPhysiNode** pPhyNode) {
2440
  SSortPhysiNode* pSort = (SSortPhysiNode*)makePhysiNode(
238,508✔
2441
      pCxt, (SLogicNode*)pSortLogicNode,
2442
      pSortLogicNode->groupSort ? QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT : QUERY_NODE_PHYSICAL_PLAN_SORT);
238,508✔
2443
  if (NULL == pSort) {
238,508!
2444
    return terrno;
×
2445
  }
2446

2447
  SNodeList* pPrecalcExprs = NULL;
238,508✔
2448
  SNodeList* pSortKeys = NULL;
238,508✔
2449
  int32_t    code = rewritePrecalcExprs(pCxt, pSortLogicNode->pSortKeys, &pPrecalcExprs, &pSortKeys);
238,508✔
2450
  pSort->calcGroupId = pSortLogicNode->calcGroupId;
238,508✔
2451
  pSort->excludePkCol = pSortLogicNode->excludePkCol;
238,508✔
2452

2453
  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
238,508✔
2454
  // push down expression to pOutputDataBlockDesc of child node
2455
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
238,508!
2456
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pSort->pExprs);
144✔
2457
    if (TSDB_CODE_SUCCESS == code) {
144!
2458
      code = pushdownDataBlockSlots(pCxt, pSort->pExprs, pChildTupe);
144✔
2459
    }
2460
  }
2461

2462
  if (TSDB_CODE_SUCCESS == code) {
238,508!
2463
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pSortKeys, &pSort->pSortKeys);
238,508✔
2464
  }
2465

2466
  if (TSDB_CODE_SUCCESS == code) {
238,508!
2467
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pSortLogicNode->node.pTargets, &pSort->pTargets);
238,508✔
2468
    if (TSDB_CODE_SUCCESS == code) {
238,508!
2469
      code = addDataBlockSlots(pCxt, pSort->pTargets, pSort->node.pOutputDataBlockDesc);
238,508✔
2470
    }
2471
  }
2472

2473
  if (TSDB_CODE_SUCCESS == code) {
238,508!
2474
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pSortLogicNode, (SPhysiNode*)pSort);
238,508✔
2475
  }
2476

2477
  if (TSDB_CODE_SUCCESS == code) {
238,508!
2478
    *pPhyNode = (SPhysiNode*)pSort;
238,508✔
2479
  } else {
2480
    nodesDestroyNode((SNode*)pSort);
×
2481
  }
2482

2483
  nodesDestroyList(pPrecalcExprs);
238,508✔
2484
  nodesDestroyList(pSortKeys);
238,508✔
2485

2486
  return code;
238,508✔
2487
}
2488

2489
static int32_t createPartitionPhysiNodeImpl(SPhysiPlanContext* pCxt, SNodeList* pChildren,
21,879✔
2490
                                            SPartitionLogicNode* pPartLogicNode, ENodeType type,
2491
                                            SPhysiNode** pPhyNode) {
2492
  SPartitionPhysiNode* pPart = (SPartitionPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pPartLogicNode, type);
21,879✔
2493
  if (NULL == pPart) {
21,879!
2494
    return terrno;
×
2495
  }
2496

2497
  SNodeList* pPrecalcExprs = NULL;
21,879✔
2498
  SNodeList* pPartitionKeys = NULL;
21,879✔
2499
  int32_t    code = rewritePrecalcExprs(pCxt, pPartLogicNode->pPartitionKeys, &pPrecalcExprs, &pPartitionKeys);
21,879✔
2500
  pPart->needBlockOutputTsOrder = pPartLogicNode->needBlockOutputTsOrder;
21,879✔
2501

2502
  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
21,879✔
2503
  // push down expression to pOutputDataBlockDesc of child node
2504
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
21,879!
2505
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pPart->pExprs);
1,198✔
2506
    if (TSDB_CODE_SUCCESS == code) {
1,198!
2507
      code = pushdownDataBlockSlots(pCxt, pPart->pExprs, pChildTupe);
1,198✔
2508
    }
2509
  }
2510

2511
  if (TSDB_CODE_SUCCESS == code) {
21,879!
2512
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPartitionKeys, &pPart->pPartitionKeys);
21,879✔
2513
  }
2514

2515
  if (TSDB_CODE_SUCCESS == code) {
21,879!
2516
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPartLogicNode->node.pTargets, &pPart->pTargets);
21,879✔
2517
    if (TSDB_CODE_SUCCESS == code) {
21,879!
2518
      code = addDataBlockSlots(pCxt, pPart->pTargets, pPart->node.pOutputDataBlockDesc);
21,879✔
2519
    }
2520
  }
2521

2522
  if (pPart->needBlockOutputTsOrder) {
21,879✔
2523
    SNode* node;
2524
    bool found = false;
1,925✔
2525
    FOREACH(node, pPartLogicNode->node.pTargets) {
1,925!
2526
      if (nodeType(node) == QUERY_NODE_COLUMN) {
1,925!
2527
        SColumnNode* pCol = (SColumnNode*)node;
1,925✔
2528
        if (pCol->tableId == pPartLogicNode->pkTsColTbId && pCol->colId == pPartLogicNode->pkTsColId) {
1,925!
2529
          pPart->tsSlotId = pCol->slotId;
1,925✔
2530
          found = true;
1,925✔
2531
          break;
1,925✔
2532
        }
2533
      }
2534
    }
2535
    if (!found) code = TSDB_CODE_PLAN_INTERNAL_ERROR;
1,925!
2536
  }
2537

2538
  if (TSDB_CODE_SUCCESS == code) {
21,879!
2539
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pPartLogicNode, (SPhysiNode*)pPart);
21,879✔
2540
  }
2541

2542
  if (TSDB_CODE_SUCCESS == code) {
21,879!
2543
    *pPhyNode = (SPhysiNode*)pPart;
21,879✔
2544
  } else {
2545
    nodesDestroyNode((SNode*)pPart);
×
2546
  }
2547

2548
  nodesDestroyList(pPrecalcExprs);
21,879✔
2549
  nodesDestroyList(pPartitionKeys);
21,879✔
2550

2551
  return code;
21,879✔
2552
}
2553

2554
static int32_t createStreamPartitionPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
128✔
2555
                                              SPartitionLogicNode* pPartLogicNode, SPhysiNode** pPhyNode) {
2556
  SStreamPartitionPhysiNode* pPart = NULL;
128✔
2557
  int32_t                    code = createPartitionPhysiNodeImpl(pCxt, pChildren, pPartLogicNode,
128✔
2558
                                                                 QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION, (SPhysiNode**)&pPart);
2559
  SDataBlockDescNode*        pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
128✔
2560
  if (TSDB_CODE_SUCCESS == code) {
128!
2561
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPartLogicNode->pTags, &pPart->pTags);
128✔
2562
  }
2563
  if (TSDB_CODE_SUCCESS == code) {
128!
2564
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pPartLogicNode->pSubtable, &pPart->pSubtable);
128✔
2565
  }
2566
  if (TSDB_CODE_SUCCESS == code) {
128!
2567
    *pPhyNode = (SPhysiNode*)pPart;
128✔
2568
  } else {
2569
    nodesDestroyNode((SNode*)pPart);
×
2570
  }
2571
  return code;
128✔
2572
}
2573

2574
static int32_t createPartitionPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
21,879✔
2575
                                        SPartitionLogicNode* pPartLogicNode, SPhysiNode** pPhyNode) {
2576
  if (pCxt->pPlanCxt->streamQuery) {
21,879✔
2577
    return createStreamPartitionPhysiNode(pCxt, pChildren, pPartLogicNode, pPhyNode);
128✔
2578
  }
2579
  return createPartitionPhysiNodeImpl(pCxt, pChildren, pPartLogicNode, QUERY_NODE_PHYSICAL_PLAN_PARTITION, pPhyNode);
21,751✔
2580
}
2581

2582
static int32_t createFillPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SFillLogicNode* pFillNode,
15,833✔
2583
                                   SPhysiNode** pPhyNode) {
2584
  SFillPhysiNode* pFill = (SFillPhysiNode*)makePhysiNode(
15,833✔
2585
      pCxt, (SLogicNode*)pFillNode,
2586
      pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL : QUERY_NODE_PHYSICAL_PLAN_FILL);
15,833✔
2587
  if (NULL == pFill) {
15,835!
2588
    return terrno;
×
2589
  }
2590

2591
  pFill->mode = pFillNode->mode;
15,835✔
2592
  pFill->timeRange = pFillNode->timeRange;
15,835✔
2593
  pFill->node.inputTsOrder = pFillNode->node.inputTsOrder;
15,835✔
2594

2595
  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
15,835✔
2596
  int32_t code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFillNode->pFillExprs, &pFill->pFillExprs);
15,835✔
2597
  if (TSDB_CODE_SUCCESS == code) {
15,834!
2598
    code = addDataBlockSlots(pCxt, pFill->pFillExprs, pFill->node.pOutputDataBlockDesc);
15,834✔
2599
  }
2600
  if (TSDB_CODE_SUCCESS == code) {
15,834!
2601
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFillNode->pNotFillExprs, &pFill->pNotFillExprs);
15,834✔
2602
  }
2603
  if (TSDB_CODE_SUCCESS == code) {
15,834!
2604
    code = addDataBlockSlots(pCxt, pFill->pNotFillExprs, pFill->node.pOutputDataBlockDesc);
15,834✔
2605
  }
2606
  if (TSDB_CODE_SUCCESS == code && LIST_LENGTH(pFillNode->pFillNullExprs) > 0) {
15,835!
2607
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFillNode->pFillNullExprs, &pFill->pFillNullExprs);
41✔
2608
    if (TSDB_CODE_SUCCESS == code ) {
41!
2609
      code = addDataBlockSlots(pCxt, pFill->pFillNullExprs, pFill->node.pOutputDataBlockDesc);
41✔
2610
    }
2611
  }
2612

2613
  if (TSDB_CODE_SUCCESS == code) {
15,835!
2614
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pFillNode->pWStartTs, &pFill->pWStartTs);
15,835✔
2615
  }
2616
  if (TSDB_CODE_SUCCESS == code) {
15,834!
2617
    code = addDataBlockSlot(pCxt, &pFill->pWStartTs, pFill->node.pOutputDataBlockDesc);
15,834✔
2618
  }
2619

2620
  if (TSDB_CODE_SUCCESS == code && NULL != pFillNode->pValues) {
15,833!
2621
    code = nodesCloneNode(pFillNode->pValues, &pFill->pValues);
373✔
2622
  }
2623

2624
  if (TSDB_CODE_SUCCESS == code) {
15,833!
2625
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pFillNode, (SPhysiNode*)pFill);
15,833✔
2626
  }
2627

2628
  if (TSDB_CODE_SUCCESS == code) {
15,833!
2629
    *pPhyNode = (SPhysiNode*)pFill;
15,833✔
2630
  } else {
2631
    nodesDestroyNode((SNode*)pFill);
×
2632
  }
2633

2634
  return code;
15,833✔
2635
}
2636

2637
// Create a single-channel exchange node as a child of pMerge.
// The exchange reads source group (pMerge->srcGroupId + idx), gets a clone of the merge node's output
// data block descriptor with every slot marked as output, and is appended to pMerge's children.
// Returns the code of the failing step if the node or the descriptor clone cannot be created; otherwise
// the result of the list append.
static int32_t createExchangePhysiNodeByMerge(SMergePhysiNode* pMerge, int32_t idx) {
  SExchangePhysiNode* pNewExchange = NULL;
  int32_t             code = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, (SNode**)&pNewExchange);
  if (NULL == pNewExchange) {
    return code;
  }

  pNewExchange->srcStartGroupId = pMerge->srcGroupId + idx;
  pNewExchange->srcEndGroupId = pMerge->srcGroupId + idx;
  pNewExchange->singleChannel = true;
  pNewExchange->node.pParent = (SPhysiNode*)pMerge;
  pNewExchange->node.pOutputDataBlockDesc = NULL;

  SDataBlockDescNode* pDescCopy = NULL;
  code = nodesCloneNode((SNode*)pMerge->node.pOutputDataBlockDesc, (SNode**)&pDescCopy);
  pNewExchange->node.pOutputDataBlockDesc = pDescCopy;
  if (NULL == pDescCopy) {
    nodesDestroyNode((SNode*)pNewExchange);
    return code;
  }

  SNode* pSlotNode = NULL;
  FOREACH(pSlotNode, pDescCopy->pSlots) {
    SSlotDescNode* pSlotDesc = (SSlotDescNode*)pSlotNode;
    pSlotDesc->output = true;
  }

  return nodesListMakeStrictAppend(&pMerge->node.pChildren, (SNode*)pNewExchange);
}
2657

2658
// Create a merge physical node.
//   - colsMerge: the targets are bound against the output data blocks of the first two children.
//   - otherwise: the logic node's inputs become output slots, one exchange child is created per
//     (subplan, channel) pair, and merge keys / targets are bound against the node's own output block.
// The merge type is COLUMNS when colsMerge is set, else SORT when needSort is set, else NON_SORT.
// Returns terrno if the node cannot be created; on any later failure the node is destroyed.
static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SMergeLogicNode* pMergeLogicNode,
                                    SPhysiNode** pPhyNode) {
  SMergePhysiNode* pPhysi =
      (SMergePhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pMergeLogicNode, QUERY_NODE_PHYSICAL_PLAN_MERGE);
  if (NULL == pPhysi) {
    return terrno;
  }

  pPhysi->type = MERGE_TYPE_NON_SORT;
  if (pMergeLogicNode->colsMerge) {
    pPhysi->type = MERGE_TYPE_COLUMNS;
  } else if (pMergeLogicNode->needSort) {
    pPhysi->type = MERGE_TYPE_SORT;
  }

  pPhysi->numOfChannels = pMergeLogicNode->numOfChannels;
  pPhysi->srcGroupId = pMergeLogicNode->srcGroupId;
  pPhysi->srcEndGroupId = pMergeLogicNode->srcEndGroupId;
  pPhysi->groupSort = pMergeLogicNode->groupSort;
  pPhysi->ignoreGroupId = pMergeLogicNode->ignoreGroupId;
  pPhysi->inputWithGroupId = pMergeLogicNode->inputWithGroupId;

  int32_t code = TSDB_CODE_SUCCESS;

  if (pMergeLogicNode->colsMerge) {
    SDataBlockDescNode* pLeftDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc;
    SDataBlockDescNode* pRightDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 1))->pOutputDataBlockDesc;

    code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pMergeLogicNode->node.pTargets,
                         &pPhysi->pTargets);
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlots(pCxt, pPhysi->pTargets, pPhysi->node.pOutputDataBlockDesc);
    }
  } else {
    code = addDataBlockSlots(pCxt, pMergeLogicNode->pInputs, pPhysi->node.pOutputDataBlockDesc);

    // One exchange child per channel of every subplan; stop at the first failure.
    for (int32_t subplan = 0; TSDB_CODE_SUCCESS == code && subplan < pMergeLogicNode->numOfSubplans; ++subplan) {
      for (int32_t channel = 0; TSDB_CODE_SUCCESS == code && channel < pPhysi->numOfChannels; ++channel) {
        code = createExchangePhysiNodeByMerge(pPhysi, subplan);
      }
    }

    if (TSDB_CODE_SUCCESS == code && NULL != pMergeLogicNode->pMergeKeys) {
      code = setListSlotId(pCxt, pPhysi->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->pMergeKeys,
                           &pPhysi->pMergeKeys);
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = setListSlotId(pCxt, pPhysi->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->node.pTargets,
                           &pPhysi->pTargets);
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = addDataBlockSlots(pCxt, pPhysi->pTargets, pPhysi->node.pOutputDataBlockDesc);
    }
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode((SNode*)pPhysi);
    return code;
  }

  *pPhyNode = (SPhysiNode*)pPhysi;
  return code;
}
2726

2727
static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, SSubplan* pSubplan,
4,465,905✔
2728
                                 SNodeList* pChildren, SPhysiNode** pPhyNode) {
2729
  switch (nodeType(pLogicNode)) {
4,465,905!
2730
    case QUERY_NODE_LOGIC_PLAN_SCAN:
1,571,106✔
2731
      return createScanPhysiNode(pCxt, pSubplan, (SScanLogicNode*)pLogicNode, pPhyNode);
1,571,106✔
2732
    case QUERY_NODE_LOGIC_PLAN_JOIN:
133,646✔
2733
      return createJoinPhysiNode(pCxt, pChildren, (SJoinLogicNode*)pLogicNode, pPhyNode);
133,646✔
2734
    case QUERY_NODE_LOGIC_PLAN_AGG:
773,797✔
2735
      return createAggPhysiNode(pCxt, pChildren, (SAggLogicNode*)pLogicNode, pPhyNode, pSubplan);
773,797✔
2736
    case QUERY_NODE_LOGIC_PLAN_PROJECT:
724,307✔
2737
      return createProjectPhysiNode(pCxt, pChildren, (SProjectLogicNode*)pLogicNode, pPhyNode);
724,307✔
2738
    case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
642,622✔
2739
      return createExchangePhysiNode(pCxt, (SExchangeLogicNode*)pLogicNode, pPhyNode);
642,622✔
2740
    case QUERY_NODE_LOGIC_PLAN_WINDOW:
124,834✔
2741
      return createWindowPhysiNode(pCxt, pChildren, (SWindowLogicNode*)pLogicNode, pPhyNode);
124,834✔
2742
    case QUERY_NODE_LOGIC_PLAN_SORT:
238,508✔
2743
      return createSortPhysiNode(pCxt, pChildren, (SSortLogicNode*)pLogicNode, pPhyNode);
238,508✔
2744
    case QUERY_NODE_LOGIC_PLAN_PARTITION:
21,879✔
2745
      return createPartitionPhysiNode(pCxt, pChildren, (SPartitionLogicNode*)pLogicNode, pPhyNode);
21,879✔
2746
    case QUERY_NODE_LOGIC_PLAN_FILL:
15,833✔
2747
      return createFillPhysiNode(pCxt, pChildren, (SFillLogicNode*)pLogicNode, pPhyNode);
15,833✔
2748
    case QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC:
36,049✔
2749
      return createIndefRowsFuncPhysiNode(pCxt, pChildren, (SIndefRowsFuncLogicNode*)pLogicNode, pPhyNode);
36,049✔
2750
    case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC:
3,672✔
2751
      return createInterpFuncPhysiNode(pCxt, pChildren, (SInterpFuncLogicNode*)pLogicNode, pPhyNode);
3,672✔
2752
    case QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC:
×
2753
      return createForecastFuncPhysiNode(pCxt, pChildren, (SForecastFuncLogicNode*)pLogicNode, pPhyNode);
×
2754
    case QUERY_NODE_LOGIC_PLAN_MERGE:
141,861✔
2755
      return createMergePhysiNode(pCxt, pChildren, (SMergeLogicNode*)pLogicNode, pPhyNode);
141,861✔
2756
    case QUERY_NODE_LOGIC_PLAN_GROUP_CACHE:
18,903✔
2757
      return createGroupCachePhysiNode(pCxt, pChildren, (SGroupCacheLogicNode*)pLogicNode, pPhyNode);
18,903✔
2758
    case QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL:
18,903✔
2759
      return createDynQueryCtrlPhysiNode(pCxt, pChildren, (SDynQueryCtrlLogicNode*)pLogicNode, pPhyNode);
18,903✔
2760
    default:
×
2761
      break;
×
2762
  }
2763

2764
  return TSDB_CODE_FAILED;
×
2765
}
2766

2767
static int32_t createPhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, SSubplan* pSubplan,
4,465,909✔
2768
                               SPhysiNode** pPhyNode) {
2769
  SNodeList* pChildren = NULL;
4,465,909✔
2770
  int32_t code = nodesMakeList(&pChildren);
4,465,909✔
2771
  if (NULL == pChildren) {
4,465,912!
2772
    return code;
×
2773
  }
2774

2775
  SNode* pLogicChild;
2776
  FOREACH(pLogicChild, pLogicNode->pChildren) {
6,726,721✔
2777
    SPhysiNode* pChild = NULL;
2,260,814✔
2778
    code = createPhysiNode(pCxt, (SLogicNode*)pLogicChild, pSubplan, &pChild);
2,260,814✔
2779
    if (TSDB_CODE_SUCCESS == code) {
2,260,811!
2780
      code = nodesListStrictAppend(pChildren, (SNode*)pChild);
2,260,812✔
2781
    }
2782
    if (TSDB_CODE_SUCCESS != code) {
2,260,809!
2783
      break;
×
2784
    }
2785
  }
2786

2787
  if (TSDB_CODE_SUCCESS == code) {
4,465,907!
2788
    code = doCreatePhysiNode(pCxt, pLogicNode, pSubplan, pChildren, pPhyNode);
4,465,909✔
2789
  }
2790

2791
  if (TSDB_CODE_SUCCESS == code) {
4,465,887!
2792
    if (LIST_LENGTH(pChildren) > 0) {
6,573,802!
2793
      (*pPhyNode)->pChildren = pChildren;
2,107,915✔
2794
      SNode* pChild;
2795
      FOREACH(pChild, (*pPhyNode)->pChildren) { ((SPhysiNode*)pChild)->pParent = (*pPhyNode); }
4,368,714!
2796
    } else {
2797
      nodesDestroyList(pChildren);
2,357,972✔
2798
    }
2799
  } else {
2800
    nodesDestroyList(pChildren);
×
2801
  }
2802

2803
  return code;
4,465,891✔
2804
}
2805

2806
// Create an insert data-sink node from the given vgroup data blocks.
// numOfTables and size are copied, and pData is swapped between the blocks and the new node.
// Returns the nodesMakeNode code when no node was produced, TSDB_CODE_SUCCESS otherwise.
static int32_t createDataInserter(SPhysiPlanContext* pCxt, SVgDataBlocks* pBlocks, SDataSinkNode** pSink) {
  SDataInserterNode* pNode = NULL;
  int32_t            code = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_INSERT, (SNode**)&pNode);

  if (NULL != pNode) {
    pNode->numOfTables = pBlocks->numOfTables;
    pNode->size = pBlocks->size;
    TSWAP(pNode->pData, pBlocks->pData);

    *pSink = (SDataSinkNode*)pNode;
    code = TSDB_CODE_SUCCESS;
  }

  return code;
}
2820

2821
// Create a dispatch data-sink node whose input block descriptor is a clone of the root
// node's output block descriptor.
// If the clone yields no descriptor, the dispatcher is destroyed and the clone's code returned.
static int32_t createDataDispatcher(SPhysiPlanContext* pCxt, const SPhysiNode* pRoot, SDataSinkNode** pSink) {
  SDataDispatcherNode* pNode = NULL;
  int32_t              code = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_DISPATCH, (SNode**)&pNode);
  if (NULL == pNode) {
    return code;
  }

  pNode->sink.pInputDataBlockDesc = NULL;
  code = nodesCloneNode((SNode*)pRoot->pOutputDataBlockDesc, (SNode**)&pNode->sink.pInputDataBlockDesc);
  if (NULL != pNode->sink.pInputDataBlockDesc) {
    *pSink = (SDataSinkNode*)pNode;
    return TSDB_CODE_SUCCESS;
  }

  nodesDestroyNode((SNode*)pNode);
  return code;
}
2838

2839
// Allocate a physical subplan and seed it from the logic subplan and the plan context:
// id, type and level come from the logic subplan; view/audit flags and the user name
// (when one is set) come from the plan context. rowsThreshold starts at 4096 with the
// dynamic threshold switched off.
static int32_t makeSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SSubplan** ppSubplan) {
  SSubplan* pNew = NULL;
  int32_t   code = nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN, (SNode**)&pNew);
  if (NULL == pNew) {
    return code;
  }

  // Fields taken from the logic subplan.
  pNew->id = pLogicSubplan->id;
  pNew->subplanType = pLogicSubplan->subplanType;
  pNew->level = pLogicSubplan->level;

  // Fixed defaults.
  pNew->rowsThreshold = 4096;
  pNew->dynamicRowThreshold = false;

  // Fields taken from the plan context.
  pNew->isView = pCxt->pPlanCxt->isView;
  pNew->isAudit = pCxt->pPlanCxt->isAudit;
  if (NULL != pCxt->pPlanCxt->pUser) {
    snprintf(pNew->user, sizeof(pNew->user), "%s", pCxt->pPlanCxt->pUser);
  }

  *ppSubplan = pNew;
  return code;
}
2858

2859
// Fill in a subplan for an insert without child nodes: the message type comes from the modify
// node, the execution node from the vgroup of its data blocks, and the data sink is an inserter
// built from those blocks.
static int32_t buildInsertValuesSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
  SVgDataBlocks* pBlocks = pModify->pVgDataBlocks;

  pSubplan->msgType = pModify->msgType;
  pSubplan->execNode.nodeId = pBlocks->vg.vgId;
  pSubplan->execNode.epSet = pBlocks->vg.epSet;

  return createDataInserter(pCxt, pBlocks, &pSubplan->pDataSink);
}
2865

2866
static int32_t createQueryInserter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan,
172✔
2867
                                   SDataSinkNode** pSink) {
2868
  SQueryInserterNode* pInserter = NULL;
172✔
2869
  int32_t code = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT, (SNode**)&pInserter);
172✔
2870
  if (NULL == pInserter) {
172!
2871
    return code;
×
2872
  }
2873

2874
  pInserter->tableId = pModify->tableId;
172✔
2875
  pInserter->stableId = pModify->stableId;
172✔
2876
  pInserter->tableType = pModify->tableType;
172✔
2877
  strcpy(pInserter->tableName, pModify->tableName);
172✔
2878
  pInserter->explain = (QUERY_NODE_EXPLAIN_STMT == nodeType(pCxt->pPlanCxt->pAstRoot) ? true : false);
172✔
2879
  if (pModify->pVgroupList) {
172!
2880
    pInserter->vgId = pModify->pVgroupList->vgroups[0].vgId;
172✔
2881
    pInserter->epSet = pModify->pVgroupList->vgroups[0].epSet;
172✔
2882
    vgroupInfoToNodeAddr(pModify->pVgroupList->vgroups, &pSubplan->execNode);
172✔
2883
  }
2884
  code = setListSlotId(pCxt, pSubplan->pNode->pOutputDataBlockDesc->dataBlockId, -1, pModify->pInsertCols,
172✔
2885
                               &pInserter->pCols);
172✔
2886
  if (TSDB_CODE_SUCCESS == code) {
172!
2887
    pInserter->sink.pInputDataBlockDesc = NULL;
172✔
2888
    code = nodesCloneNode((SNode*)pSubplan->pNode->pOutputDataBlockDesc, (SNode**)&pInserter->sink.pInputDataBlockDesc);
172✔
2889
    if (NULL == pInserter->sink.pInputDataBlockDesc) {
172!
2890
      code = code;
×
2891
    }
2892
  }
2893

2894
  if (TSDB_CODE_SUCCESS == code) {
172!
2895
    *pSink = (SDataSinkNode*)pInserter;
172✔
2896
  } else {
2897
    nodesDestroyNode((SNode*)pInserter);
×
2898
  }
2899

2900
  return code;
172✔
2901
}
2902

2903
// Build the subplan for an insert that has a child node: convert the modify node's first
// child into the subplan's physical tree, then attach a query inserter as the data sink.
// The message type is set to TDMT_SCH_MERGE_QUERY whether or not the build succeeded.
static int32_t buildInsertSelectSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
  SLogicNode* pQueryRoot = (SLogicNode*)nodesListGetNode(pModify->node.pChildren, 0);

  int32_t code = createPhysiNode(pCxt, pQueryRoot, pSubplan, &pSubplan->pNode);
  if (TSDB_CODE_SUCCESS == code) {
    code = createQueryInserter(pCxt, pModify, pSubplan, &pSubplan->pDataSink);
  }

  pSubplan->msgType = TDMT_SCH_MERGE_QUERY;
  return code;
}
2912

2913
// Route an insert to the matching builder: a modify node with a child list goes to
// buildInsertSelectSubplan, one without goes to buildInsertValuesSubplan.
static int32_t buildInsertSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
  if (NULL != pModify->node.pChildren) {
    return buildInsertSelectSubplan(pCxt, pModify, pSubplan);
  }
  return buildInsertValuesSubplan(pCxt, pModify, pSubplan);
}
2919

2920
static int32_t createDataDeleter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, const SPhysiNode* pRoot,
9,225✔
2921
                                 SDataSinkNode** pSink) {
2922
  SDataDeleterNode* pDeleter = NULL;
9,225✔
2923
  int32_t code = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_DELETE, (SNode**)&pDeleter);
9,225✔
2924
  if (NULL == pDeleter) {
9,225!
2925
    return code;
×
2926
  }
2927

2928
  pDeleter->tableId = pModify->tableId;
9,225✔
2929
  pDeleter->tableType = pModify->tableType;
9,225✔
2930
  strcpy(pDeleter->tableFName, pModify->tableName);
9,225✔
2931
  strcpy(pDeleter->tsColName, pModify->tsColName);
9,225✔
2932
  pDeleter->deleteTimeRange = pModify->deleteTimeRange;
9,225✔
2933

2934
  code = setNodeSlotId(pCxt, pRoot->pOutputDataBlockDesc->dataBlockId, -1, pModify->pAffectedRows,
9,225✔
2935
                               &pDeleter->pAffectedRows);
9,225✔
2936
  if (TSDB_CODE_SUCCESS == code) {
9,225!
2937
    code = setNodeSlotId(pCxt, pRoot->pOutputDataBlockDesc->dataBlockId, -1, pModify->pStartTs, &pDeleter->pStartTs);
9,225✔
2938
  }
2939
  if (TSDB_CODE_SUCCESS == code) {
9,225!
2940
    code = setNodeSlotId(pCxt, pRoot->pOutputDataBlockDesc->dataBlockId, -1, pModify->pEndTs, &pDeleter->pEndTs);
9,225✔
2941
  }
2942
  if (TSDB_CODE_SUCCESS == code) {
9,225!
2943
    pDeleter->sink.pInputDataBlockDesc = NULL;
9,225✔
2944
    code = nodesCloneNode((SNode*)pRoot->pOutputDataBlockDesc, (SNode**)&pDeleter->sink.pInputDataBlockDesc);
9,225✔
2945
    if (NULL == pDeleter->sink.pInputDataBlockDesc) {
9,225!
2946
      code = code;
×
2947
    }
2948
  }
2949

2950
  if (TSDB_CODE_SUCCESS == code) {
9,225!
2951
    *pSink = (SDataSinkNode*)pDeleter;
9,225✔
2952
  } else {
2953
    nodesDestroyNode((SNode*)pDeleter);
×
2954
  }
2955

2956
  return TSDB_CODE_SUCCESS;
9,225✔
2957
}
2958

2959
// Build the subplan for a DELETE: convert the modify node's first child into the subplan's
// physical tree, then attach a data deleter as the sink.
// The message type is set to TDMT_VND_DELETE whether or not the build succeeded.
static int32_t buildDeleteSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
  SLogicNode* pQueryRoot = (SLogicNode*)nodesListGetNode(pModify->node.pChildren, 0);

  int32_t code = createPhysiNode(pCxt, pQueryRoot, pSubplan, &pSubplan->pNode);
  if (TSDB_CODE_SUCCESS == code) {
    code = createDataDeleter(pCxt, pModify, pSubplan->pNode, &pSubplan->pDataSink);
  }

  pSubplan->msgType = TDMT_VND_DELETE;
  return code;
}
2968

2969
// Build a modify subplan according to the modify type of the logic subplan's root node.
// Insert and delete are handled; any other modify type yields TSDB_CODE_FAILED.
static int32_t buildVnodeModifySubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SSubplan* pSubplan) {
  SVnodeModifyLogicNode* pModify = (SVnodeModifyLogicNode*)pLogicSubplan->pNode;

  if (MODIFY_TABLE_TYPE_INSERT == pModify->modifyType) {
    return buildInsertSubplan(pCxt, pModify, pSubplan);
  }
  if (MODIFY_TABLE_TYPE_DELETE == pModify->modifyType) {
    return buildDeleteSubplan(pCxt, pModify, pSubplan);
  }
  return TSDB_CODE_FAILED;
}
2985

2986
// Create one physical subplan from a logic subplan.
// Modify subplans are delegated to buildVnodeModifySubplan. For the rest, the message type is
// chosen from the subplan type, the physical tree is built, and a data dispatcher sink is
// added unless the plan context marks a stream or topic query.
// On failure the subplan is destroyed and *pPhysiSubplan is left untouched.
static int32_t createPhysiSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SSubplan** pPhysiSubplan) {
  SSubplan* pNew = NULL;
  int32_t   code = makeSubplan(pCxt, pLogicSubplan, &pNew);
  if (NULL == pNew) {
    return code;
  }

  if (SUBPLAN_TYPE_MODIFY == pLogicSubplan->subplanType) {
    code = buildVnodeModifySubplan(pCxt, pLogicSubplan, pNew);
  } else {
    pNew->msgType = (SUBPLAN_TYPE_SCAN == pNew->subplanType) ? TDMT_SCH_QUERY : TDMT_SCH_MERGE_QUERY;
    code = createPhysiNode(pCxt, pLogicSubplan->pNode, pNew, &pNew->pNode);
    if (TSDB_CODE_SUCCESS == code && !(pCxt->pPlanCxt->streamQuery || pCxt->pPlanCxt->topicQuery)) {
      code = createDataDispatcher(pCxt, pNew->pNode, &pNew->pDataSink);
    }
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode((SNode*)pNew);
    return code;
  }

  *pPhysiSubplan = pNew;
  return code;
}
3015

3016
// Allocate an empty physical query plan with an empty subplan list and the query id taken
// from the plan context. If the list cannot be created, the plan is destroyed and
// *ppQueryPlan is left untouched.
static int32_t makeQueryPhysiPlan(SPhysiPlanContext* pCxt, SQueryPlan** ppQueryPlan) {
  SQueryPlan* pNew = NULL;
  int32_t     code = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN, (SNode**)&pNew);
  if (NULL == pNew) {
    return code;
  }

  pNew->pSubplans = NULL;
  code = nodesMakeList(&pNew->pSubplans);
  if (NULL != pNew->pSubplans) {
    pNew->queryId = pCxt->pPlanCxt->queryId;
    *ppQueryPlan = pNew;
  } else {
    nodesDestroyNode((SNode*)pNew);
  }

  return code;
}
3032

3033
// Append a subplan to the group that belongs to the given level.
//
// pSubplans is a list of SNodeListNode groups. When `level` is not yet covered by the list,
// a new group is created and appended; otherwise the existing group at index `level` is used.
// The group's inner node list is created on first use.
//
// Returns the code of the first failing step. A failed append of a new group now reports the
// code returned by nodesListStrictAppend instead of a hard-coded TSDB_CODE_OUT_OF_MEMORY, in
// line with how the other callers in this file propagate that call's result.
// The final append uses nodesListAppend, and the visible caller destroys pSubplan itself
// when this function fails.
static int32_t pushSubplan(SPhysiPlanContext* pCxt, SNode* pSubplan, int32_t level, SNodeList* pSubplans) {
  SNodeListNode* pGroup = NULL;
  int32_t        code = TSDB_CODE_SUCCESS;

  if (level >= LIST_LENGTH(pSubplans)) {
    code = nodesMakeNode(QUERY_NODE_NODE_LIST, (SNode**)&pGroup);
    if (NULL == pGroup) {
      return code;
    }
    // NOTE(review): pGroup is not released here on failure; presumably the strict append
    // takes care of the node when it fails - confirm against its implementation.
    code = nodesListStrictAppend(pSubplans, (SNode*)pGroup);
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }
  } else {
    pGroup = (SNodeListNode*)nodesListGetNode(pSubplans, level);
  }

  if (NULL == pGroup->pNodeList) {
    code = nodesMakeList(&pGroup->pNodeList);
    if (NULL == pGroup->pNodeList) {
      return code;
    }
  }

  return nodesListAppend(pGroup->pNodeList, (SNode*)pSubplan);
}
3055

3056
static int32_t buildPhysiPlan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SSubplan* pParent,
3,955,307✔
3057
                              SQueryPlan* pQueryPlan) {
3058
  SSubplan* pSubplan = NULL;
3,955,307✔
3059
  int32_t   code = createPhysiSubplan(pCxt, pLogicSubplan, &pSubplan);
3,955,307✔
3060

3061
  if (TSDB_CODE_SUCCESS == code) {
3,955,298!
3062
    code = pushSubplan(pCxt, (SNode*)pSubplan, pLogicSubplan->level, pQueryPlan->pSubplans);
3,955,302✔
3063
    ++(pQueryPlan->numOfSubplans);
3,955,301✔
3064
  }
3065

3066
  if (TSDB_CODE_SUCCESS != code) {
3,955,297!
3067
    nodesDestroyNode((SNode*)pSubplan);
×
3068
    return code;
×
3069
  }
3070

3071
  if (TSDB_CODE_SUCCESS == code && NULL != pParent) {
3,955,297!
3072
    code = nodesListMakeAppend(&pParent->pChildren, (SNode*)pSubplan);
1,444,359✔
3073
    if (TSDB_CODE_SUCCESS == code) {
1,444,358!
3074
      code = nodesListMakeAppend(&pSubplan->pParents, (SNode*)pParent);
1,444,358✔
3075
    }
3076
  }
3077

3078
  if (TSDB_CODE_SUCCESS == code) {
3,955,302✔
3079
    SNode* pChild = NULL;
3,955,301✔
3080
    FOREACH(pChild, pLogicSubplan->pChildren) {
5,399,657✔
3081
      code = buildPhysiPlan(pCxt, (SLogicSubplan*)pChild, pSubplan, pQueryPlan);
1,444,361✔
3082
      if (TSDB_CODE_SUCCESS != code) {
1,444,356!
3083
        break;
×
3084
      }
3085
    }
3086
  }
3087

3088
  return code;
3,955,297✔
3089
}
3090

3091
// Build the whole physical plan: create an empty plan, then convert every top-level logic
// subplan (and, recursively, its children) into it.
// On failure the partially built plan is destroyed and *pPhysiPlan is left untouched.
static int32_t doCreatePhysiPlan(SPhysiPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPhysiPlan) {
  SQueryPlan* pNew = NULL;
  int32_t     code = makeQueryPhysiPlan(pCxt, &pNew);
  if (NULL == pNew) {
    return code;
  }

  SNode* pTop = NULL;
  FOREACH(pTop, pLogicPlan->pTopSubplans) {
    code = buildPhysiPlan(pCxt, (SLogicSubplan*)pTop, NULL, pNew);
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode((SNode*)pNew);
    return code;
  }

  *pPhysiPlan = pNew;
  return code;
}
3114

3115
static void destoryLocationHash(void* p) {
8,931,834✔
3116
  SHashObj*   pHash = *(SHashObj**)p;
8,931,834✔
3117
  SSlotIndex* pIndex = taosHashIterate(pHash, NULL);
8,931,834✔
3118
  while (NULL != pIndex) {
27,920,374✔
3119
    taosArrayDestroy(pIndex->pSlotIdsInfo);
18,988,546✔
3120
    pIndex = taosHashIterate(pHash, pIndex);
18,988,527✔
3121
  }
3122
  taosHashCleanup(pHash);
8,931,828✔
3123
}
8,931,860✔
3124

3125
// Release the two helper arrays owned by the context (location helper first, then the
// projection-index location helper), destroying each element with destoryLocationHash.
static void destoryPhysiPlanContext(SPhysiPlanContext* pCxt) {
  SArray* helpers[] = {pCxt->pLocationHelper, pCxt->pProjIdxLocHelper};

  for (int32_t i = 0; i < (int32_t)(sizeof(helpers) / sizeof(helpers[0])); ++i) {
    taosArrayDestroyEx(helpers[i], destoryLocationHash);
  }
}
2,456,760✔
3129

3130
// Fill in the plan's explain info from the AST root.
// For anything other than an EXPLAIN statement only the mode is set, to EXPLAIN_MODE_DISABLE.
// For EXPLAIN, the mode follows the statement's analyze flag, and verbose/ratio are copied
// from its options.
static void setExplainInfo(SPlanContext* pCxt, SQueryPlan* pPlan) {
  if (QUERY_NODE_EXPLAIN_STMT != nodeType(pCxt->pAstRoot)) {
    pPlan->explainInfo.mode = EXPLAIN_MODE_DISABLE;
    return;
  }

  SExplainStmt* pExplain = (SExplainStmt*)pCxt->pAstRoot;
  if (pExplain->analyze) {
    pPlan->explainInfo.mode = EXPLAIN_MODE_ANALYZE;
  } else {
    pPlan->explainInfo.mode = EXPLAIN_MODE_STATIC;
  }
  pPlan->explainInfo.verbose = pExplain->pOptions->verbose;
  pPlan->explainInfo.ratio = pExplain->pOptions->ratio;
}
2,456,745✔
3140

3141
// Add the mnode entry (MNODE_HANDLE with the management ep set, load 0) to the caller's
// execution node list when the plan has a system-table scan or no scan at all.
// Does nothing when no list was supplied. Returns 0, or terrno if the push fails.
static int32_t setExecNodeList(SPhysiPlanContext* pCxt, SArray* pExecNodeList) {
  if (NULL == pExecNodeList) {
    return 0;
  }
  if (!pCxt->hasSysScan && pCxt->hasScan) {
    return 0;
  }

  SQueryNodeLoad mnode = {.addr = {.nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet}, .load = 0};
  if (NULL == taosArrayPush(pExecNodeList, &mnode)) {
    return terrno;
  }
  return 0;
}
3153

3154
// Entry point: turn a logic plan into a physical plan.
// Sets up a build context with two helper arrays, builds the plan, and on success fills in
// the explain info and the execution node list. The context is torn down before returning.
// Returns terrno if either helper array cannot be allocated.
int32_t createPhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList) {
  SPhysiPlanContext cxt = {0};
  cxt.pPlanCxt = pCxt;
  cxt.errCode = TSDB_CODE_SUCCESS;
  cxt.nextDataBlockId = 0;
  cxt.pLocationHelper = taosArrayInit(32, POINTER_BYTES);
  cxt.pProjIdxLocHelper = taosArrayInit(32, POINTER_BYTES);
  cxt.hasScan = false;
  cxt.hasSysScan = false;

  if (NULL == cxt.pLocationHelper || NULL == cxt.pProjIdxLocHelper) {
    taosArrayDestroy(cxt.pLocationHelper);
    taosArrayDestroy(cxt.pProjIdxLocHelper);
    return terrno;
  }

  int32_t code = doCreatePhysiPlan(&cxt, pLogicPlan, pPlan);
  if (TSDB_CODE_SUCCESS == code) {
    setExplainInfo(pCxt, *pPlan);
    code = setExecNodeList(&cxt, pExecNodeList);
  }

  destoryPhysiPlanContext(&cxt);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc