• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4720

08 Sep 2025 08:43AM UTC coverage: 58.139% (-0.6%) from 58.762%
#4720

push

travis-ci

web-flow
Merge pull request #32881 from taosdata/enh/add-new-windows-ci

fix(ci): update workflow reference to use new Windows CI YAML

133181 of 292179 branches covered (45.58%)

Branch coverage included in aggregate %.

201691 of 283811 relevant lines covered (71.07%)

5442780.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.6
/source/libs/planner/src/planPhysiCreater.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "nodes.h"
17
#include "planInt.h"
18

19
#include "catalog.h"
20
#include "functionMgt.h"
21
#include "systable.h"
22
#include "tglobal.h"
23

24
/* One slot recorded under a slot key (element type of SSlotIndex.pSlotIdsInfo). */
typedef struct SSlotIdInfo {
  int16_t slotId;  // slot id handed back to callers
  bool    set;     // starts false; getUnsetSlotId() flips it to true when it returns this slot
} SSlotIdInfo;
28

29
typedef struct SSlotIndex {
30
  int16_t dataBlockId;
31
  SArray* pSlotIdsInfo;  // duplicate name slot
32
} SSlotIndex;
33

34
/* Key layouts accepted by getSlotKeyHelper(). */
enum {
  SLOT_KEY_TYPE_ALL = 1,      // "<prefix>.<name>", then passed through taosHashBinary()
  SLOT_KEY_TYPE_COLNAME = 2,  // "<name>" only
  SLOT_KEY_TYPE_REF = 3,      // no use visible in this part of the file
};
39

40
/*
 * Allocates a zeroed buffer of callocLen bytes, writes the slot key into it and reports
 * the key length through *pLen.
 *
 *   SLOT_KEY_TYPE_ALL : the text is "<pPreName>.<name>"; the buffer is then handed to
 *                       taosHashBinary() and *pLen receives that call's return value.
 *   any other type    : the text is <name> alone and *pLen is its strlen().
 *
 * pNode and extraBufLen are accepted but not read here (callers already add extraBufLen
 * to callocLen). On success the caller owns *ppKey and frees it with taosMemoryFree().
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the allocation fails (*ppKey is then NULL).
 */
static int32_t getSlotKeyHelper(SNode* pNode, const char* pPreName, const char* name, char** ppKey, int32_t callocLen,
                                int32_t* pLen, uint16_t extraBufLen, int8_t slotKeyType) {
  char* pKey = taosMemoryCalloc(1, callocLen);
  *ppKey = pKey;
  if (NULL == pKey) {
    return terrno;
  }

  if (SLOT_KEY_TYPE_ALL != slotKeyType) {
    // Name-only key: used verbatim, not hashed.
    TAOS_STRNCAT(pKey, name, TSDB_COL_NAME_LEN);
    *pLen = strlen(pKey);
    return TSDB_CODE_SUCCESS;
  }

  // Prefixed key: "<prefix>.<name>".
  TAOS_STRNCAT(pKey, pPreName, TSDB_TABLE_NAME_LEN);
  TAOS_STRNCAT(pKey, ".", 2);
  TAOS_STRNCAT(pKey, name, TSDB_COL_NAME_LEN);
  *pLen = taosHashBinary(pKey, strlen(pKey));
  return TSDB_CODE_SUCCESS;
}
59

60
static int32_t getSlotKey(SNode* pNode, const char* pStmtName, char** ppKey, int32_t* pLen, uint16_t extraBufLen) {
1,175,478✔
61
  int32_t code = 0;
1,175,478✔
62
  int32_t callocLen = 0;
1,175,478✔
63
  if (QUERY_NODE_COLUMN == nodeType(pNode)) {
1,175,478✔
64
    SColumnNode* pCol = (SColumnNode*)pNode;
1,037,693✔
65
    if (NULL != pStmtName) {
1,037,693✔
66
      if ('\0' != pStmtName[0]) {
55,102✔
67
        callocLen = TSDB_TABLE_NAME_LEN + 1 + TSDB_COL_NAME_LEN + 1 + extraBufLen;
54,616✔
68
        return getSlotKeyHelper(pNode, pStmtName, pCol->node.aliasName, ppKey, callocLen, pLen, extraBufLen,
54,616✔
69
                                SLOT_KEY_TYPE_ALL);
70
      } else {
71
        callocLen = TSDB_COL_NAME_LEN + 1 + extraBufLen;
486✔
72
        return getSlotKeyHelper(pNode, pStmtName, pCol->node.aliasName, ppKey, callocLen, pLen, extraBufLen,
486✔
73
                                SLOT_KEY_TYPE_COLNAME);
74
      }
75
    }
76
    if ('\0' == pCol->tableAlias[0]) {
982,591✔
77
      callocLen = TSDB_COL_NAME_LEN + 1 + extraBufLen;
248,462✔
78
      return getSlotKeyHelper(pNode, pStmtName, pCol->colName, ppKey, callocLen, pLen, extraBufLen,
248,462✔
79
                              SLOT_KEY_TYPE_COLNAME);
80
    }
81
    if (pCol->hasRef) {
734,129✔
82
      *ppKey = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN + 1 + TSDB_COL_NAME_LEN + 1 + extraBufLen);
34,014!
83
      if (!*ppKey) {
34,014!
84
        return terrno;
×
85
      }
86
      TAOS_STRNCAT(*ppKey, pCol->refDbName, TSDB_DB_NAME_LEN);
34,014✔
87
      TAOS_STRNCAT(*ppKey, ".", 2);
34,014✔
88
      TAOS_STRNCAT(*ppKey, pCol->refTableName, TSDB_TABLE_NAME_LEN);
34,014✔
89
      TAOS_STRNCAT(*ppKey, ".", 2);
34,014✔
90
      TAOS_STRNCAT(*ppKey, pCol->refColName, TSDB_COL_NAME_LEN);
34,014✔
91
      *pLen = taosHashBinary(*ppKey, strlen(*ppKey));
34,014✔
92
      return code;
34,014✔
93
    }
94
    if (pCol->hasDep) {
700,115✔
95
      *ppKey = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN + 1 + TSDB_COL_NAME_LEN + 1 + extraBufLen);
83,700!
96
      if (!*ppKey) {
83,700!
97
        return terrno;
×
98
      }
99
      TAOS_STRNCAT(*ppKey, pCol->dbName, TSDB_DB_NAME_LEN);
83,700✔
100
      TAOS_STRNCAT(*ppKey, ".", 2);
83,700✔
101
      TAOS_STRNCAT(*ppKey, pCol->tableAlias, TSDB_TABLE_NAME_LEN);
83,700✔
102
      TAOS_STRNCAT(*ppKey, ".", 2);
83,700✔
103
      TAOS_STRNCAT(*ppKey, pCol->colName, TSDB_COL_NAME_LEN);
83,700✔
104
      *pLen = taosHashBinary(*ppKey, strlen(*ppKey));
83,700✔
105
      return code;
83,700✔
106
    }
107
    callocLen = TSDB_TABLE_NAME_LEN + 1 + TSDB_COL_NAME_LEN + 1 + extraBufLen;
616,415✔
108
    return getSlotKeyHelper(pNode, pCol->tableAlias, pCol->colName, ppKey, callocLen, pLen, extraBufLen,
616,415✔
109
                            SLOT_KEY_TYPE_ALL);
110
  } else if (QUERY_NODE_FUNCTION == nodeType(pNode)) {
137,785✔
111
    SFunctionNode* pFunc = (SFunctionNode*)pNode;
132,553✔
112
    if (FUNCTION_TYPE_TBNAME == pFunc->funcType) {
132,553✔
113
      SValueNode* pVal = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 0);
1,816✔
114
      if (pVal) {
1,816!
115
        if (NULL != pStmtName && '\0' != pStmtName[0]) {
×
116
          callocLen = TSDB_TABLE_NAME_LEN + 1 + TSDB_COL_NAME_LEN + 1 + extraBufLen;
×
117
          return getSlotKeyHelper(pNode, pStmtName, ((SExprNode*)pNode)->aliasName, ppKey, callocLen, pLen, extraBufLen,
×
118
                                  SLOT_KEY_TYPE_ALL);
119
        }
120
        int32_t literalLen = strlen(pVal->literal);
×
121
        callocLen = literalLen + 1 + TSDB_COL_NAME_LEN + 1 + extraBufLen;
×
122
        return getSlotKeyHelper(pNode, pVal->literal, ((SExprNode*)pNode)->aliasName, ppKey, callocLen, pLen,
×
123
                                extraBufLen, SLOT_KEY_TYPE_ALL);
124
      }
125
    }
126
  }
127

128
  if (NULL != pStmtName && '\0' != pStmtName[0]) {
137,785✔
129
    callocLen = TSDB_TABLE_NAME_LEN + 1 + TSDB_COL_NAME_LEN + 1 + extraBufLen;
6,003✔
130
    return getSlotKeyHelper(pNode, pStmtName, ((SExprNode*)pNode)->aliasName, ppKey, callocLen, pLen, extraBufLen,
6,003✔
131
                            SLOT_KEY_TYPE_ALL);
132
  }
133

134
  callocLen = TSDB_COL_NAME_LEN + 1 + extraBufLen;
131,782✔
135
  return getSlotKeyHelper(pNode, pStmtName, ((SExprNode*)pNode)->aliasName, ppKey, callocLen, pLen, extraBufLen,
131,782✔
136
                          SLOT_KEY_TYPE_COLNAME);
137
  return code;
138
}
139

140
/*
 * Creates a QUERY_NODE_SLOT_DESC node named pName for slot slotId.
 * The slot's data type is copied from pNode's resType (pNode is read as an SExprNode).
 * pCxt is not used here.
 *
 * Returns the new node, or NULL with terrno set to the nodesMakeNode() result on failure.
 */
static SNode* createSlotDesc(SPhysiPlanContext* pCxt, const char* pName, const SNode* pNode, int16_t slotId,
                             bool output, bool reserve) {
  SSlotDescNode* pDesc = NULL;
  int32_t        rc = nodesMakeNode(QUERY_NODE_SLOT_DESC, (SNode**)&pDesc);
  if (NULL != pDesc) {
    snprintf(pDesc->name, sizeof(pDesc->name), "%s", pName);
    pDesc->slotId = slotId;
    pDesc->dataType = ((SExprNode*)pNode)->resType;
    pDesc->reserve = reserve;
    pDesc->output = output;
    return (SNode*)pDesc;
  }

  terrno = rc;
  return NULL;
}
155

156
/*
 * Wraps pNode in a new QUERY_NODE_TARGET node bound to (dataBlockId, slotId).
 * pNode is stored by pointer in the target's pExpr; it is not cloned.
 *
 * Returns TSDB_CODE_SUCCESS and stores the target in *pOutput, or the nodesMakeNode()
 * result when no node was produced (*pOutput is left untouched then).
 */
static int32_t createTarget(SNode* pNode, int16_t dataBlockId, int16_t slotId, SNode** pOutput) {
  STargetNode* pNew = NULL;
  int32_t      rc = nodesMakeNode(QUERY_NODE_TARGET, (SNode**)&pNew);
  if (NULL == pNew) {
    return rc;
  }

  pNew->pExpr = pNode;
  pNew->slotId = slotId;
  pNew->dataBlockId = dataBlockId;

  *pOutput = (SNode*)pNew;
  return TSDB_CODE_SUCCESS;
}
170

171
/*
 * Registers slotId under the key (pName, len) in pHash.
 *
 * If the key already exists, the slot is appended to the existing entry's slot list.
 * Otherwise a new SSlotIndex with a one-element slot list is inserted; the array created
 * for it is destroyed again if the push or the hash insertion fails, so nothing leaks on
 * the error paths.
 *
 * Returns TSDB_CODE_SUCCESS, terrno on an array failure, or the taosHashPut() result.
 */
static int32_t putSlotToHashImpl(int16_t dataBlockId, int16_t slotId, const char* pName, int32_t len, SHashObj* pHash) {
  SSlotIdInfo info = {.slotId = slotId, .set = false};

  SSlotIndex* pIndex = taosHashGet(pHash, pName, len);
  if (NULL != pIndex) {
    if (NULL == taosArrayPush(pIndex->pSlotIdsInfo, &info)) {
      return terrno;
    }
    return TSDB_CODE_SUCCESS;
  }

  SSlotIndex index = {.dataBlockId = dataBlockId, .pSlotIdsInfo = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SSlotIdInfo))};
  if (NULL == index.pSlotIdsInfo) {
    return terrno;
  }

  if (NULL == taosArrayPush(index.pSlotIdsInfo, &info)) {
    int32_t code = terrno;  // capture before cleanup can overwrite it
    taosArrayDestroy(index.pSlotIdsInfo);
    return code;
  }

  int32_t code = taosHashPut(pHash, pName, len, &index, sizeof(SSlotIndex));
  if (TSDB_CODE_SUCCESS != code) {
    // The entry never made it into the hash, so the array is still ours to release.
    taosArrayDestroy(index.pSlotIdsInfo);
  }
  return code;
}
191

192
static int32_t putSlotToHash(const char* pName, int32_t len, int16_t dataBlockId, int16_t slotId, SNode* pNode,
527,794✔
193
                             SHashObj* pHash) {
194
  return putSlotToHashImpl(dataBlockId, slotId, pName, len, pHash);
527,794✔
195
}
196

197
/*
 * Creates the two lookup hashes of a data block and registers them in the planner context:
 *   - a binary-keyed hash, inserted into pCxt->pLocationHelper at index dataBlockId
 *   - an int-keyed hash, inserted into pCxt->pProjIdxLocHelper at index dataBlockId
 *
 * On success both hashes are also returned through pDescHash / ppProjIdxDescHash.
 * On failure the output parameters are left untouched and the error code captured
 * right after the failing call is returned (cleanup calls cannot overwrite it).
 */
static int32_t createDataBlockDescHash(SPhysiPlanContext* pCxt, int32_t capacity, int16_t dataBlockId,
                                       SHashObj** pDescHash, SHashObj** ppProjIdxDescHash) {
  SHashObj* pHash = taosHashInit(capacity, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (NULL == pHash) {
    return terrno;
  }

  SHashObj* pProjIdxHash = taosHashInit(capacity, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (NULL == pProjIdxHash) {
    int32_t code = terrno;
    taosHashCleanup(pHash);
    return code;
  }

  if (NULL == taosArrayInsert(pCxt->pLocationHelper, dataBlockId, &pHash)) {
    int32_t code = terrno;
    taosHashCleanup(pHash);
    taosHashCleanup(pProjIdxHash);
    return code;
  }

  if (NULL == taosArrayInsert(pCxt->pProjIdxLocHelper, dataBlockId, &pProjIdxHash)) {
    int32_t code = terrno;
    // pHash is already stored in pCxt->pLocationHelper at this point; freeing it here
    // would leave a dangling pointer in that array. It is left in place.
    // NOTE(review): presumably the context teardown releases the hashes held by
    // pLocationHelper - confirm against the context destroy routine.
    taosHashCleanup(pProjIdxHash);
    return code;
  }

  *pDescHash = pHash;
  *ppProjIdxDescHash = pProjIdxHash;
  return TSDB_CODE_SUCCESS;
}
223

224
/*
 * Creates one output slot in pDataBlockDesc for every node of pList, in list order.
 *
 * For each node:
 *   1. a slot key is built with getSlotKey() (no statement name, 16 spare bytes),
 *   2. a slot descriptor is appended to pDataBlockDesc->pSlots,
 *   3. the key is registered in pHash,
 *   4. for column nodes with resIdx > 0, the key extended with "_<resIdx>" is also
 *      registered in pProjIdxDescHash,
 *   5. totalRowSize and outputRowSize grow by the node's result width.
 *
 * Every step runs only if the previous ones succeeded, so the first error stops the loop
 * and is the value returned; the key is never registered when it could not be built or
 * when its descriptor could not be appended.
 */
static int32_t buildDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc,
                                   SHashObj* pHash, SHashObj* pProjIdxDescHash) {
  // Spare bytes requested from getSlotKey(); bounds the "_<resIdx>" suffix appended below.
  const uint16_t suffixBufLen = 16;

  pDataBlockDesc->pSlots = NULL;
  int32_t code = nodesMakeList(&pDataBlockDesc->pSlots);
  if (NULL == pDataBlockDesc->pSlots) {
    return code;
  }

  int16_t slotId = 0;
  SNode*  pNode = NULL;
  FOREACH(pNode, pList) {
    char*   name = NULL;
    int32_t len = 0;
    code = getSlotKey(pNode, NULL, &name, &len, suffixBufLen);
    if (TSDB_CODE_SUCCESS == code) {
      code = nodesListStrictAppend(pDataBlockDesc->pSlots, createSlotDesc(pCxt, name, pNode, slotId, true, false));
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = putSlotToHash(name, len, pDataBlockDesc->dataBlockId, slotId, pNode, pHash);
    }
    if (TSDB_CODE_SUCCESS == code && nodeType(pNode) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode)->resIdx > 0) {
      snprintf(name + strlen(name), suffixBufLen, "_%d", ((SColumnNode*)pNode)->resIdx);
      code = putSlotToHash(name, strlen(name), pDataBlockDesc->dataBlockId, slotId, pNode, pProjIdxDescHash);
    }
    taosMemoryFree(name);

    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
    pDataBlockDesc->totalRowSize += ((SExprNode*)pNode)->resType.bytes;
    pDataBlockDesc->outputRowSize += ((SExprNode*)pNode)->resType.bytes;
    ++slotId;
  }
  return code;
}
259

260
/*
 * Creates a data block descriptor for the nodes in pList.
 *
 * The descriptor takes the next data block id from pCxt, gets its two lookup hashes
 * registered in the context and one slot per list node. On any failure after the node
 * was created it is destroyed and *pDataBlockDesc is left untouched.
 */
static int32_t createDataBlockDesc(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode** pDataBlockDesc) {
  SDataBlockDescNode* pNew = NULL;
  int32_t             code = nodesMakeNode(QUERY_NODE_DATABLOCK_DESC, (SNode**)&pNew);
  if (NULL == pNew) {
    return code;
  }
  pNew->dataBlockId = pCxt->nextDataBlockId++;

  SHashObj* pSlotHash = NULL;
  SHashObj* pProjHash = NULL;
  code = createDataBlockDescHash(pCxt, LIST_LENGTH(pList), pNew->dataBlockId, &pSlotHash, &pProjHash);
  if (TSDB_CODE_SUCCESS == code) {
    code = buildDataBlockSlots(pCxt, pList, pNew, pSlotHash, pProjHash);
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode((SNode*)pNew);
    return code;
  }

  *pDataBlockDesc = pNew;
  return code;
}
283

284
static int16_t getUnsetSlotId(const SArray* pSlotIdsInfo) {
391,137✔
285
  int32_t size = taosArrayGetSize(pSlotIdsInfo);
391,137✔
286
  for (int32_t i = 0; i < size; ++i) {
397,066✔
287
    SSlotIdInfo* pInfo = taosArrayGet(pSlotIdsInfo, i);
391,147✔
288
    if (!pInfo->set) {
391,147✔
289
      pInfo->set = true;
385,214✔
290
      return pInfo->slotId;
385,214✔
291
    }
292
  }
293
  return ((SSlotIdInfo*)taosArrayGet(pSlotIdsInfo, 0))->slotId;
5,919✔
294
}
295

296
/*
 * Binds every node of pList to a slot of pDataBlockDesc and replaces the list entry with
 * a target node that points at that slot.
 *
 * For each list node (for ORDER BY entries the wrapped expression is what gets keyed):
 *   - the slot key is looked up in the data block's hash taken from pCxt->pLocationHelper;
 *   - a known key reuses a slot chosen by getUnsetSlotId();
 *   - an unknown key gets a new slot appended after the existing ones, created with the
 *     given output / reserve flags, and the row sizes are increased accordingly
 *     (outputRowSize only when `output` is true);
 *   - the list entry is replaced by a target node wrapping the original list node.
 *
 * A NULL pList is a no-op. The loop stops at the first error, which is returned.
 */
static int32_t addDataBlockSlotsImpl(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc,
                                     const char* pStmtName, bool output, bool reserve) {
  if (NULL == pList) {
    return TSDB_CODE_SUCCESS;
  }

  SHashObj* pSlotHash = taosArrayGetP(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId);
  int16_t   freshSlotId = LIST_LENGTH(pDataBlockDesc->pSlots);
  int16_t   boundSlotId = 0;
  int32_t   code = TSDB_CODE_SUCCESS;
  SNode*    pNode = NULL;
  FOREACH(pNode, pList) {
    SNode* pExpr = pNode;
    if (QUERY_NODE_ORDER_BY_EXPR == nodeType(pNode)) {
      pExpr = ((SOrderByExprNode*)pNode)->pExpr;
    }

    char*   key = NULL;
    int32_t keyLen = 0;
    code = getSlotKey(pExpr, pStmtName, &key, &keyLen, 0);
    if (TSDB_CODE_SUCCESS == code) {
      SSlotIndex* pKnown = taosHashGet(pSlotHash, key, keyLen);
      if (NULL != pKnown) {
        boundSlotId = getUnsetSlotId(pKnown->pSlotIdsInfo);
      } else {
        code = nodesListStrictAppend(pDataBlockDesc->pSlots,
                                     createSlotDesc(pCxt, key, pExpr, freshSlotId, output, reserve));
        if (TSDB_CODE_SUCCESS == code) {
          code = putSlotToHashImpl(pDataBlockDesc->dataBlockId, freshSlotId, key, keyLen, pSlotHash);
        }
        pDataBlockDesc->totalRowSize += ((SExprNode*)pExpr)->resType.bytes;
        if (output) {
          pDataBlockDesc->outputRowSize += ((SExprNode*)pExpr)->resType.bytes;
        }
        boundSlotId = freshSlotId++;
      }
    }
    taosMemoryFree(key);

    if (TSDB_CODE_SUCCESS == code) {
      SNode* pTarget = NULL;
      code = createTarget(pNode, pDataBlockDesc->dataBlockId, boundSlotId, &pTarget);
      if (TSDB_CODE_SUCCESS == code) {
        REPLACE_NODE(pTarget);
      }
    }

    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
  }
  return code;
}
345

346
/*
 * Binds the nodes of pList to slots of pDataBlockDesc without a statement name;
 * newly created slots are reserved and not marked as output.
 */
static int32_t addDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) {
  const bool output = false;
  const bool reserve = true;
  return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, NULL, output, reserve);
}
349

350
/*
 * Single-node variant of addDataBlockSlots(): the node is put into a temporary one-element
 * list, processed, and *pNode is updated to whatever the list holds afterwards (the target
 * node on success). The temporary list is released with nodesClearList() in every case.
 *
 * A NULL pNode or NULL *pNode is a no-op that returns TSDB_CODE_SUCCESS.
 */
static int32_t addDataBlockSlot(SPhysiPlanContext* pCxt, SNode** pNode, SDataBlockDescNode* pDataBlockDesc) {
  if (NULL == pNode) {
    return TSDB_CODE_SUCCESS;
  }
  if (NULL == *pNode) {
    return TSDB_CODE_SUCCESS;
  }

  SNodeList* pSingle = NULL;
  int32_t    code = nodesListMakeAppend(&pSingle, *pNode);
  if (TSDB_CODE_SUCCESS == code) {
    code = addDataBlockSlots(pCxt, pSingle, pDataBlockDesc);
    if (TSDB_CODE_SUCCESS == code) {
      *pNode = nodesListGetNode(pSingle, 0);
    }
  }
  nodesClearList(pSingle);
  return code;
}
366

367
static int32_t addDataBlockSlotsForProject(SPhysiPlanContext* pCxt, const char* pStmtName, SNodeList* pList,
11,922✔
368
                                           SDataBlockDescNode* pDataBlockDesc) {
369
  return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, pStmtName, false, true);
11,922✔
370
}
371

372
/*
 * Binds the nodes of pList to slots of pDataBlockDesc without a statement name;
 * newly created slots are reserved and marked as output.
 */
static int32_t pushdownDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) {
  const bool output = true;
  const bool reserve = true;
  return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, NULL, output, reserve);
}
375

376
typedef struct SSetSlotIdCxt {
377
  int32_t   errCode;
378
  SHashObj* pLeftHash;
379
  SHashObj* pLeftProjIdxHash;
380
  SHashObj* pRightHash;
381
  SHashObj* pRightProdIdxHash;
382
} SSetSlotIdCxt;
383

384
typedef struct SMultiTableSetSlotIdCxt {
385
  int32_t    errCode;
386
  SArray*    hashArray;
387
  SArray*    projIdxHashArray;
388
  SNodeList* pChild;
389
  bool       isVtb;
390
} SMultiTableSetSlotIdCxt;
391

392
/*
 * Debug helper: logs pName followed by every key stored in pHash, one per line.
 * A NULL pHash is ignored.
 *
 * Keys are copied into a fixed-size, always NUL-terminated buffer before logging;
 * keys longer than the buffer are truncated in the log output instead of overflowing it.
 */
static void dumpSlots(const char* pName, SHashObj* pHash) {
  if (NULL == pHash) {
    return;
  }
  planDebug("%s", pName);

  for (void* pIt = taosHashIterate(pHash, NULL); NULL != pIt; pIt = taosHashIterate(pHash, pIt)) {
    size_t len = 0;
    char*  pKey = taosHashGetKey(pIt, &len);
    char   name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN] = {0};
    // The key is length-delimited, not necessarily NUL-terminated: copy at most
    // min(len, sizeof(name) - 1) bytes so the terminator always fits.
    size_t copyLen = (len < sizeof(name) - 1) ? len : sizeof(name) - 1;
    snprintf(name, sizeof(name), "%.*s", (int)copyLen, pKey);
    planDebug("\tslot name = %s", name);
  }
}
407

408
/*
 * Expression walker callback: resolves the data block id and slot id of a column node.
 *
 * Non-column nodes and the "*" column are skipped (the walk continues into children).
 * For any other column the slot key is built and looked up:
 *   1. when projRefIdx > 0, the key extended with "_<projRefIdx>" is searched in the
 *      left and then the right projection-index hash;
 *   2. if that did not match (or was not tried), the plain key is searched in the left
 *      and then the right slot hash.
 * The first slot id registered under the matching key is written to the column.
 *
 * pContext is an SSetSlotIdCxt. On failure errCode is set and DEAL_RES_ERROR returned;
 * a key that is found nowhere yields TSDB_CODE_PLAN_SLOT_NOT_FOUND.
 */
static EDealRes doSetSlotId(SNode* pNode, void* pContext) {
  if (QUERY_NODE_COLUMN != nodeType(pNode) || 0 == strcmp(((SColumnNode*)pNode)->colName, "*")) {
    return DEAL_RES_CONTINUE;
  }

  SSetSlotIdCxt* pCxt = (SSetSlotIdCxt*)pContext;
  SColumnNode*   pCol = (SColumnNode*)pNode;
  char*          key = NULL;
  int32_t        keyLen = 0;
  pCxt->errCode = getSlotKey(pNode, NULL, &key, &keyLen, 64);
  if (TSDB_CODE_SUCCESS != pCxt->errCode) {
    return DEAL_RES_ERROR;
  }

  SSlotIndex* pFound = NULL;
  if (pCol->projRefIdx > 0) {
    snprintf(key + strlen(key), 16, "_%d", pCol->projRefIdx);
    pFound = taosHashGet(pCxt->pLeftProjIdxHash, key, strlen(key));
    if (NULL == pFound) {
      pFound = taosHashGet(pCxt->pRightProdIdxHash, key, strlen(key));
    }
  }

  if (NULL == pFound) {
    // Cut off any "_<idx>" suffix appended above before searching with the plain key.
    key[keyLen] = 0;
    pFound = taosHashGet(pCxt->pLeftHash, key, keyLen);
    if (NULL == pFound) {
      pFound = taosHashGet(pCxt->pRightHash, key, keyLen);
    }
  }

  // A missing entry at this point indicates a planner bug.
  if (NULL == pFound) {
    planError("doSetSlotId failed, invalid slot name %s", key);
    dumpSlots("left datablock desc", pCxt->pLeftHash);
    dumpSlots("right datablock desc", pCxt->pRightHash);
    pCxt->errCode = TSDB_CODE_PLAN_SLOT_NOT_FOUND;
    taosMemoryFree(key);
    return DEAL_RES_ERROR;
  }

  pCol->dataBlockId = pFound->dataBlockId;
  pCol->slotId = ((SSlotIdInfo*)taosArrayGet(pFound->pSlotIdsInfo, 0))->slotId;
  taosMemoryFree(key);
  return DEAL_RES_IGNORE_CHILD;
}
449

450
/*
 * Expression walker callback: resolves the data block id and slot id of a column node by
 * searching the output data blocks of all children in pCxt->pChild, in list order.
 *
 * Non-column nodes and the "*" column are skipped. In virtual-table mode (isVtb) a column
 * is also skipped unless it has a reference, is a tag column or has an empty table alias.
 *
 * When projRefIdx > 0 the key extended with "_<projRefIdx>" is searched in the
 * projection-index hashes; otherwise the plain key is searched in the slot hashes.
 * The first slot id registered under the matching key is written to the column.
 *
 * pContext is an SMultiTableSetSlotIdCxt. On failure errCode is set and DEAL_RES_ERROR
 * returned; a key that is found nowhere yields TSDB_CODE_PLAN_SLOT_NOT_FOUND.
 */
static EDealRes doSetMultiTableSlotId(SNode* pNode, void* pContext) {
  if (QUERY_NODE_COLUMN == nodeType(pNode) && 0 != strcmp(((SColumnNode*)pNode)->colName, "*")) {
    // Spare bytes requested from getSlotKey(); also bounds the "_<projRefIdx>" suffix.
    const uint16_t           suffixBufLen = 16;
    SMultiTableSetSlotIdCxt* pCxt = (SMultiTableSetSlotIdCxt*)pContext;
    SColumnNode*             pCol = (SColumnNode*)pNode;
    char*                    name = NULL;
    int32_t                  len = 0;

    if (pCxt->isVtb && !pCol->hasRef && pCol->colType != COLUMN_TYPE_TAG && '\0' != pCol->tableAlias[0]) {
      // set slot id for :
      // 1. column with ref
      // 2. tag column
      // 3. pseudo column function
      return DEAL_RES_CONTINUE;
    }

    pCxt->errCode = getSlotKey(pNode, NULL, &name, &len, suffixBufLen);
    if (TSDB_CODE_SUCCESS != pCxt->errCode) {
      return DEAL_RES_ERROR;
    }

    // Pick the hash family and key length once; the search loop is the same for both.
    SArray* pHashes = pCxt->hashArray;
    int32_t keyLen = len;
    if (pCol->projRefIdx > 0) {
      snprintf(name + strlen(name), suffixBufLen, "_%d", pCol->projRefIdx);
      pHashes = pCxt->projIdxHashArray;
      keyLen = strlen(name);
    }

    SSlotIndex*   pIndex = NULL;
    const int32_t childNum = LIST_LENGTH(pCxt->pChild);
    for (int32_t idx = 0; NULL == pIndex && idx < childNum; ++idx) {
      SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pCxt->pChild, idx);
      SHashObj*   pChildHash = taosArrayGetP(pHashes, pChildNode->pOutputDataBlockDesc->dataBlockId);
      pIndex = taosHashGet(pChildHash, name, keyLen);
    }

    // pIndex is definitely not NULL, otherwise it is a bug
    if (NULL == pIndex) {
      planError("doSetMultiTableSlotId failed, invalid slot name %s", name);
      pCxt->errCode = TSDB_CODE_PLAN_SLOT_NOT_FOUND;
      taosMemoryFree(name);
      return DEAL_RES_ERROR;
    }
    pCol->dataBlockId = pIndex->dataBlockId;
    pCol->slotId = ((SSlotIdInfo*)taosArrayGet(pIndex->pSlotIdsInfo, 0))->slotId;
    taosMemoryFree(name);
    return DEAL_RES_IGNORE_CHILD;
  }
  return DEAL_RES_CONTINUE;
}
505

506
/*
 * Clones pNode and resolves the slot ids of every column in the clone against the left
 * data block and, when rightDataBlockId >= 0, the right data block as well.
 *
 * A NULL pNode returns TSDB_CODE_SUCCESS without touching *pOutput. On success the clone
 * is stored in *pOutput; on a resolution error the clone is destroyed and the error
 * recorded by the walker is returned.
 */
static int32_t setNodeSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, int16_t rightDataBlockId, SNode* pNode,
                             SNode** pOutput) {
  if (NULL == pNode) {
    return TSDB_CODE_SUCCESS;
  }

  SNode*  pClone = NULL;
  int32_t code = nodesCloneNode(pNode, &pClone);
  if (NULL == pClone) {
    return code;
  }

  // Right-hand hashes stay NULL when there is no right data block.
  SSetSlotIdCxt cxt = {.errCode = TSDB_CODE_SUCCESS};
  cxt.pLeftHash = taosArrayGetP(pCxt->pLocationHelper, leftDataBlockId);
  cxt.pLeftProjIdxHash = taosArrayGetP(pCxt->pProjIdxLocHelper, leftDataBlockId);
  if (rightDataBlockId >= 0) {
    cxt.pRightHash = taosArrayGetP(pCxt->pLocationHelper, rightDataBlockId);
    cxt.pRightProdIdxHash = taosArrayGetP(pCxt->pProjIdxLocHelper, rightDataBlockId);
  }

  nodesWalkExpr(pClone, doSetSlotId, &cxt);
  if (TSDB_CODE_SUCCESS == cxt.errCode) {
    *pOutput = pClone;
    return TSDB_CODE_SUCCESS;
  }

  nodesDestroyNode(pClone);
  return cxt.errCode;
}
533

534
/*
 * List variant of setNodeSlotId(): clones pList and resolves the slot ids of every column
 * in the clone against the left data block and, when rightDataBlockId >= 0, the right one.
 *
 * A NULL pList returns TSDB_CODE_SUCCESS without touching *pOutput. When cloning yields
 * no list, the nodesCloneList() result is returned. On a resolution error the clone is
 * destroyed and the error recorded by the walker is returned.
 */
static int32_t setListSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, int16_t rightDataBlockId,
                             const SNodeList* pList, SNodeList** pOutput) {
  if (NULL == pList) {
    return TSDB_CODE_SUCCESS;
  }

  SNodeList* pClone = NULL;
  int32_t    code = nodesCloneList(pList, &pClone);
  if (NULL == pClone) {
    return code;
  }

  // Right-hand hashes stay NULL when there is no right data block.
  SSetSlotIdCxt cxt = {.errCode = TSDB_CODE_SUCCESS};
  cxt.pLeftHash = taosArrayGetP(pCxt->pLocationHelper, leftDataBlockId);
  cxt.pLeftProjIdxHash = taosArrayGetP(pCxt->pProjIdxLocHelper, leftDataBlockId);
  if (rightDataBlockId >= 0) {
    cxt.pRightHash = taosArrayGetP(pCxt->pLocationHelper, rightDataBlockId);
    cxt.pRightProdIdxHash = taosArrayGetP(pCxt->pProjIdxLocHelper, rightDataBlockId);
  }

  nodesWalkExprs(pClone, doSetSlotId, &cxt);
  if (TSDB_CODE_SUCCESS == cxt.errCode) {
    *pOutput = pClone;
    return TSDB_CODE_SUCCESS;
  }

  nodesDestroyList(pClone);
  return cxt.errCode;
}
560

561
/*
 * Clones pList and resolves the slot ids of its columns against the output data blocks of
 * all nodes in pChild (see doSetMultiTableSlotId(); isVtb is forwarded to the walker).
 *
 * A NULL pList leaves *pOutput untouched. On success the clone is stored in *pOutput;
 * on error control reaches _return, where the clone is destroyed.
 */
static int32_t setMultiBlockSlotId(SPhysiPlanContext* pCxt, SNodeList* pChild, bool isVtb, const SNodeList* pList,
                                   SNodeList** pOutput) {
  int32_t code = TSDB_CODE_SUCCESS;
  if (NULL == pList) {
    PLAN_RET(code);
  }

  SNodeList* pClone = NULL;
  PLAN_ERR_JRET(nodesCloneList(pList, &pClone));

  SMultiTableSetSlotIdCxt cxt = {.errCode = TSDB_CODE_SUCCESS};
  cxt.hashArray = pCxt->pLocationHelper;
  cxt.projIdxHashArray = pCxt->pProjIdxLocHelper;
  cxt.pChild = pChild;
  cxt.isVtb = isVtb;

  nodesWalkExprs(pClone, doSetMultiTableSlotId, &cxt);
  PLAN_ERR_JRET(cxt.errCode);

  *pOutput = pClone;
  return code;

_return:
  nodesDestroyList(pClone);
  return code;
}
588

589
/*
 * Creates a physical plan node of the given type from a logic node.
 *
 * The limit / slimit nodes are swapped over from the logic node, the dynamic-op flag and
 * the input/output timestamp orders are copied, and an output data block descriptor is
 * built from the logic node's targets (its precision is taken from the logic node).
 *
 * Returns the new node, or NULL with terrno set to the failing call's code.
 */
static SPhysiNode* makePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, ENodeType type) {
  SPhysiNode* pNew = NULL;
  int32_t     rc = nodesMakeNode(type, (SNode**)&pNew);
  if (NULL == pNew) {
    terrno = rc;
    return NULL;
  }

  TSWAP(pNew->pLimit, pLogicNode->pLimit);
  TSWAP(pNew->pSlimit, pLogicNode->pSlimit);
  pNew->dynamicOp = pLogicNode->dynamicOp;
  pNew->inputTsOrder = pLogicNode->inputTsOrder;
  pNew->outputTsOrder = pLogicNode->outputTsOrder;

  rc = createDataBlockDesc(pCxt, pLogicNode->pTargets, &pNew->pOutputDataBlockDesc);
  if (TSDB_CODE_SUCCESS == rc) {
    pNew->pOutputDataBlockDesc->precision = pLogicNode->precision;
    return pNew;
  }

  nodesDestroyNode((SNode*)pNew);
  terrno = rc;
  return NULL;
}
612

613
/*
 * Copies the logic node's conditions to the physical node with slot ids resolved against
 * the physical node's own output data block (no right-hand block).
 * Nothing is done when the logic node has no conditions.
 */
static int32_t setConditionsSlotId(SPhysiPlanContext* pCxt, const SLogicNode* pLogicNode, SPhysiNode* pPhysiNode) {
  if (NULL == pLogicNode->pConditions) {
    return TSDB_CODE_SUCCESS;
  }

  const int16_t ownBlockId = pPhysiNode->pOutputDataBlockDesc->dataBlockId;
  return setNodeSlotId(pCxt, ownBlockId, -1, pLogicNode->pConditions, &pPhysiNode->pConditions);
}
620

621
static int32_t colIdCompare(const void* pLeft, const void* pRight) {
355,014✔
622
  SColumnNode* pLeftCol = *(SColumnNode**)pLeft;
355,014✔
623
  SColumnNode* pRightCol = *(SColumnNode**)pRight;
355,014✔
624
  return pLeftCol->colId > pRightCol->colId ? 1 : -1;
355,014✔
625
}
626

627
/*
 * Reorders the nodes of pScanCols in place by ascending column id.
 *
 * The node pointers are collected into a temporary array, sorted with colIdCompare() and
 * written back into the list cells in sorted order.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the temporary array cannot be built.
 */
static int32_t sortScanCols(SNodeList* pScanCols) {
  SArray* pSorted = taosArrayInit(LIST_LENGTH(pScanCols), POINTER_BYTES);
  if (NULL == pSorted) {
    return terrno;
  }

  int32_t code = 0;
  SNode*  pCol = NULL;
  FOREACH(pCol, pScanCols) {
    if (NULL == taosArrayPush(pSorted, &pCol)) {
      code = terrno;
      break;
    }
  }
  if (TSDB_CODE_SUCCESS != code) {
    taosArrayDestroy(pSorted);
    return code;
  }

  taosArraySort(pSorted, colIdCompare);

  // Write the sorted pointers back, one per list cell.
  int32_t next = 0;
  FOREACH(pCol, pScanCols) {
    REPLACE_NODE(taosArrayGetP(pSorted, next++));
  }

  taosArrayDestroy(pSorted);
  return TSDB_CODE_SUCCESS;
}
653

654
/*
 * Gives the scan node its own copy of pScanCols, sorted by column id.
 * A NULL pScanCols leaves the scan node untouched. pCxt is not used here.
 *
 * Returns the nodesCloneList() result when no list was produced, otherwise the
 * sortScanCols() result.
 */
static int32_t createScanCols(SPhysiPlanContext* pCxt, SScanPhysiNode* pScanPhysiNode, SNodeList* pScanCols) {
  if (NULL != pScanCols) {
    pScanPhysiNode->pScanCols = NULL;
    int32_t code = nodesCloneList(pScanCols, &pScanPhysiNode->pScanCols);
    if (NULL == pScanPhysiNode->pScanCols) {
      return code;
    }
    return sortScanCols(pScanPhysiNode->pScanCols);
  }
  return TSDB_CODE_SUCCESS;
}
666

667
static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
37,860✔
668
                                           SScanPhysiNode* pScanPhysiNode, SPhysiNode** pPhyNode) {
669
  int32_t code = createScanCols(pCxt, pScanPhysiNode, pScanLogicNode->pScanCols);
37,860✔
670
  if (TSDB_CODE_SUCCESS == code) {
37,863!
671
    code = addDataBlockSlots(pCxt, pScanPhysiNode->pScanCols, pScanPhysiNode->node.pOutputDataBlockDesc);
37,863✔
672
  }
673

674
  if (TSDB_CODE_SUCCESS == code && NULL != pScanLogicNode->pScanPseudoCols) {
37,871✔
675
    pScanPhysiNode->pScanPseudoCols = NULL;
3,730✔
676
    code = nodesCloneList(pScanLogicNode->pScanPseudoCols, &pScanPhysiNode->pScanPseudoCols);
3,730✔
677
  }
678

679
  if (TSDB_CODE_SUCCESS == code) {
37,871!
680
    code = addDataBlockSlots(pCxt, pScanPhysiNode->pScanPseudoCols, pScanPhysiNode->node.pOutputDataBlockDesc);
37,872✔
681
  }
682

683
  if (TSDB_CODE_SUCCESS == code) {
37,872!
684
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pScanLogicNode, (SPhysiNode*)pScanPhysiNode);
37,873✔
685
  }
686

687
  if (TSDB_CODE_SUCCESS == code) {
37,871!
688
    pScanPhysiNode->uid = pScanLogicNode->tableId;
37,873✔
689
    pScanPhysiNode->suid = pScanLogicNode->stableId;
37,873✔
690
    pScanPhysiNode->tableType = pScanLogicNode->tableType;
37,873✔
691
    pScanPhysiNode->groupOrderScan = pScanLogicNode->groupOrderScan;
37,873✔
692
    pScanPhysiNode->virtualStableScan = pScanLogicNode->virtualStableScan;
37,873✔
693
    memcpy(&pScanPhysiNode->tableName, &pScanLogicNode->tableName, sizeof(SName));
37,873✔
694
    if (NULL != pScanLogicNode->pTagCond) {
37,873✔
695
      pSubplan->pTagCond = NULL;
1,253✔
696
      code = nodesCloneNode(pScanLogicNode->pTagCond, &pSubplan->pTagCond);
1,253✔
697
    }
698
  }
699

700
  if (TSDB_CODE_SUCCESS == code) {
37,871✔
701
    if (NULL != pScanLogicNode->pTagIndexCond) {
37,864✔
702
      pSubplan->pTagIndexCond = NULL;
272✔
703
      code = nodesCloneNode(pScanLogicNode->pTagIndexCond, &pSubplan->pTagIndexCond);
272✔
704
    }
705
  }
706

707
  if (TSDB_CODE_SUCCESS == code) {
37,870!
708
    *pPhyNode = (SPhysiNode*)pScanPhysiNode;
37,870✔
709
  } else {
710
    nodesDestroyNode((SNode*)pScanPhysiNode);
×
711
  }
712

713
  return code;
37,869✔
714
}
715

716
/*
 * Fill a query node address from a vgroup descriptor: the vgroup id becomes
 * the node id and the endpoint set is copied by value.
 */
static void vgroupInfoToNodeAddr(const SVgroupInfo* vg, SQueryNodeAddr* pNodeAddr) {
  pNodeAddr->epSet = vg->epSet;
  pNodeAddr->nodeId = vg->vgId;
}
39,733✔
720

721
/*
 * Map a logical scan type to the physical plan node type used to execute it.
 * Scan types without a dedicated case fall back to the plain table scan.
 */
static ENodeType getScanOperatorType(EScanType scanType) {
  ENodeType opType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;

  switch (scanType) {
    case SCAN_TYPE_TAG:
      opType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
      break;
    case SCAN_TYPE_STREAM:
      opType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
      break;
    case SCAN_TYPE_TABLE_MERGE:
      opType = QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN;
      break;
    case SCAN_TYPE_BLOCK_INFO:
      opType = QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN;
      break;
    case SCAN_TYPE_TABLE_COUNT:
      opType = QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN;
      break;
    case SCAN_TYPE_TABLE:
    default:
      // keep the table-scan default
      break;
  }

  return opType;
}
740

741
static int32_t createSimpleScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
9✔
742
                                         SPhysiNode** pPhyNode) {
743
  SScanPhysiNode* pScan =
744
      (SScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, getScanOperatorType(pScanLogicNode->scanType));
9✔
745
  if (NULL == pScan) {
9!
746
    return terrno;
×
747
  }
748

749
  if (pScanLogicNode->pVgroupList) {
9!
750
    vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
9✔
751
  }
752
  return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, pScan, pPhyNode);
9✔
753
}
754

755
static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
305✔
756
                                      SPhysiNode** pPhyNode) {
757
  STagScanPhysiNode* pScan =
758
      (STagScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN);
305✔
759
  if (NULL == pScan) {
305!
760
    return terrno;
×
761
  }
762
  if (pScanLogicNode->pVgroupList) {
305✔
763
    vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
304✔
764
  }
765
  pScan->onlyMetaCtbIdx = pScanLogicNode->onlyMetaCtbIdx;
305✔
766

767
  return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
305✔
768
}
769

770
static int32_t createLastRowScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
792✔
771
                                          SPhysiNode** pPhyNode) {
772
  SLastRowScanPhysiNode* pScan =
773
      (SLastRowScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN);
792✔
774
  if (NULL == pScan) {
792!
775
    return terrno;
×
776
  }
777
  pScan->pTargets = NULL;
792✔
778
  int32_t code = nodesCloneList(pScanLogicNode->node.pTargets, &pScan->pTargets);
792✔
779
  if (TSDB_CODE_SUCCESS != code) {
792!
780
    nodesDestroyNode((SNode*)pScan);
×
781
    return code;
×
782
  }
783
  pScan->pGroupTags = NULL;
792✔
784
  code = nodesCloneList(pScanLogicNode->pGroupTags, &pScan->pGroupTags);
792✔
785
  if (TSDB_CODE_SUCCESS != code) {
792!
786
    nodesDestroyNode((SNode*)pScan);
×
787
    return code;
×
788
  }
789

790
  pScan->groupSort = pScanLogicNode->groupSort;
792✔
791
  pScan->ignoreNull = pScanLogicNode->igLastNull;
792✔
792

793
  if (pScanLogicNode->pVgroupList) {
792!
794
    vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
792✔
795
  }
796

797
  code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
792✔
798
  if (TSDB_CODE_SUCCESS == code && pScanLogicNode->pFuncTypes != NULL) {
792!
799
    pScan->pFuncTypes = taosArrayInit(taosArrayGetSize(pScanLogicNode->pFuncTypes), sizeof(int32_t));
96✔
800
    if (NULL == pScan->pFuncTypes) {
96!
801
      return terrno;
×
802
    }
803

804
    SNode* pTargetNode = NULL;
96✔
805
    int    funcTypeIndex = 0;
96✔
806
    FOREACH(pTargetNode, ((SScanPhysiNode*)pScan)->pScanCols) {
5,280!
807
      if (((STargetNode*)pTargetNode)->pExpr->type != QUERY_NODE_COLUMN) {
5,184!
808
        continue;
×
809
      }
810
      SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pTargetNode)->pExpr;
5,184✔
811

812
      for (int i = 0; i < TARRAY_SIZE(pScanLogicNode->pFuncTypes); ++i) {
145,056!
813
        SFunctParam* pFunctParam = taosArrayGet(pScanLogicNode->pFuncTypes, i);
145,056✔
814
        if (pColNode->colId == pFunctParam->pCol->colId &&
145,056✔
815
            0 == strncmp(pColNode->colName, pFunctParam->pCol->name, strlen(pColNode->colName))) {
7,776✔
816
          if (NULL == taosArrayInsert(pScan->pFuncTypes, funcTypeIndex, &pFunctParam->type)) {
5,184!
817
            code = terrno;
×
818
          }
819
          break;
5,184✔
820
        }
821
      }
822
      funcTypeIndex++;
5,184✔
823
    }
824
  }
825
  return code;
792✔
826
}
827

828
static int32_t createTableCountScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
×
829
                                             SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) {
830
  STableCountScanPhysiNode* pScan = (STableCountScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode,
×
831
                                                                             QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN);
832
  if (NULL == pScan) {
×
833
    return terrno;
×
834
  }
835

836
  pScan->pGroupTags = NULL;
×
837
  int32_t code = nodesCloneList(pScanLogicNode->pGroupTags, &pScan->pGroupTags);
×
838
  if (NULL != pScanLogicNode->pGroupTags && NULL == pScan->pGroupTags) {
×
839
    nodesDestroyNode((SNode*)pScan);
×
840
    return code;
×
841
  }
842

843
  pScan->groupSort = pScanLogicNode->groupSort;
×
844
  if (pScanLogicNode->pVgroupList) {
×
845
    vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
×
846
  }
847
  return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
×
848
}
849

850
static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
35,660✔
851
                                        SPhysiNode** pPhyNode) {
852
  STableScanPhysiNode* pTableScan = (STableScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode,
35,660✔
853
                                                                        getScanOperatorType(pScanLogicNode->scanType));
854
  if (NULL == pTableScan) {
35,677!
855
    return terrno;
×
856
  }
857

858
  memcpy(pTableScan->scanSeq, pScanLogicNode->scanSeq, sizeof(pScanLogicNode->scanSeq));
35,677✔
859
  pTableScan->scanRange = pScanLogicNode->scanRange;
35,677✔
860
  pTableScan->ratio = pScanLogicNode->ratio;
35,677✔
861
  if (pScanLogicNode->pVgroupList) {
35,677✔
862
    vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
35,366✔
863
    pSubplan->execNodeStat.tableNum = pScanLogicNode->pVgroupList->vgroups[0].numOfTable;
35,363✔
864
  }
865
  (void)tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
35,674✔
866
  pTableScan->dataRequired = pScanLogicNode->dataRequired;
35,673✔
867
  pTableScan->pDynamicScanFuncs = NULL;
35,673✔
868
  int32_t code = nodesCloneList(pScanLogicNode->pDynamicScanFuncs, &pTableScan->pDynamicScanFuncs);
35,673✔
869
  if (TSDB_CODE_SUCCESS != code) {
35,673!
870
    nodesDestroyNode((SNode*)pTableScan);
×
871
    return code;
×
872
  }
873
  pTableScan->pGroupTags = NULL;
35,673✔
874
  code = nodesCloneList(pScanLogicNode->pGroupTags, &pTableScan->pGroupTags);
35,673✔
875
  if (TSDB_CODE_SUCCESS != code) {
35,671!
876
    nodesDestroyNode((SNode*)pTableScan);
×
877
    return code;
×
878
  }
879
  code = nodesCloneNode(pScanLogicNode->pTimeRange, &pTableScan->pTimeRange);
35,671✔
880
  if (TSDB_CODE_SUCCESS != code) {
35,673!
881
    nodesDestroyNode((SNode*)pTableScan);
×
882
    return code;
×
883
  }
884

885
  pTableScan->groupSort = pScanLogicNode->groupSort;
35,673✔
886
  pTableScan->interval = pScanLogicNode->interval;
35,673✔
887
  pTableScan->offset = pScanLogicNode->offset;
35,673✔
888
  pTableScan->sliding = pScanLogicNode->sliding;
35,673✔
889
  pTableScan->intervalUnit = pScanLogicNode->intervalUnit;
35,673✔
890
  pTableScan->slidingUnit = pScanLogicNode->slidingUnit;
35,673✔
891
  pTableScan->triggerType = pScanLogicNode->triggerType;
35,673✔
892
  pTableScan->watermark = pScanLogicNode->watermark;
35,673✔
893
  pTableScan->igExpired = pScanLogicNode->igExpired;
35,673✔
894
  pTableScan->igCheckUpdate = pScanLogicNode->igCheckUpdate;
35,673✔
895
  pTableScan->assignBlockUid = pCxt->pPlanCxt->rSmaQuery ? true : false;
35,673✔
896
  pTableScan->filesetDelimited = pScanLogicNode->filesetDelimited;
35,673✔
897
  pTableScan->needCountEmptyTable = pScanLogicNode->isCountByTag;
35,673✔
898
  pTableScan->paraTablesSort = pScanLogicNode->paraTablesSort;
35,673✔
899
  pTableScan->smallDataTsSort = pScanLogicNode->smallDataTsSort;
35,673✔
900

901
  return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode);;
35,673✔
902
}
903

904
static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
1,088✔
905
                                              SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) {
906
  SSystemTableScanPhysiNode* pScan = (SSystemTableScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode,
1,088✔
907
                                                                               QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN);
908
  if (NULL == pScan) {
1,089!
909
    return terrno;
×
910
  }
911

912
  pSubplan->showRewrite = pScanLogicNode->showRewrite;
1,089✔
913
  pScan->showRewrite = pScanLogicNode->showRewrite;
1,089✔
914
  pScan->accountId = pCxt->pPlanCxt->acctId;
1,089✔
915
  pScan->sysInfo = pCxt->pPlanCxt->sysInfo;
1,089✔
916
  if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_TABLES) ||
1,089✔
917
      0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_TAGS) ||
710✔
918
      0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_COLS) ||
211✔
919
      0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_DISK_USAGE) ||
208!
920
      0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_FILESETS) ||
208!
921
      0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_VC_COLS)) {
208!
922
    if (pScanLogicNode->pVgroupList) {
881!
923
      vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
882✔
924
    }
925
  } else {
926
    pSubplan->execNode.nodeId = MNODE_HANDLE;
208✔
927
    pSubplan->execNode.epSet = pCxt->pPlanCxt->mgmtEpSet;
208✔
928
  }
929
  if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_DNODE_VARIABLES) && pScanLogicNode->pVgroupList) {
1,091!
930
    pScan->mgmtEpSet = pScanLogicNode->pVgroupList->vgroups->epSet;
6✔
931
  } else {
932
    pScan->mgmtEpSet = pCxt->pPlanCxt->mgmtEpSet;
1,085✔
933
  }
934
  (void)tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
1,091✔
935

936
  pCxt->hasSysScan = true;
1,089✔
937
  return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
1,089✔
938
}
939

940
static int32_t createStreamScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
306✔
941
                                         SPhysiNode** pPhyNode) {
942
  return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
306✔
943
}
944

945
static int32_t createTableMergeScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
2,134✔
946
                                             SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) {
947
  return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
2,134✔
948
}
949

950
static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
37,854✔
951
                                   SPhysiNode** pPhyNode) {
952
  pCxt->hasScan = true;
37,854✔
953

954
  switch (pScanLogicNode->scanType) {
37,854!
955
    case SCAN_TYPE_TAG:
305✔
956
      PLAN_ERR_RET(createTagScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode));
305!
957
      break;
305✔
958
    case SCAN_TYPE_BLOCK_INFO:
9✔
959
      PLAN_ERR_RET(createSimpleScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode));
9!
960
      break;
9✔
961
    case SCAN_TYPE_TABLE_COUNT:
×
962
      PLAN_ERR_RET(createTableCountScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode));
×
963
      break;
×
964
    case SCAN_TYPE_LAST_ROW:
792✔
965
      PLAN_ERR_RET(createLastRowScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode));
792!
966
      break;
792✔
967
    case SCAN_TYPE_TABLE:
33,221✔
968
      PLAN_ERR_RET(createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode));
33,221!
969
      break;
33,231✔
970
    case SCAN_TYPE_SYSTEM_TABLE:
1,091✔
971
      PLAN_ERR_RET(createSystemTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode));
1,091!
972
      break;
1,090✔
973
    case SCAN_TYPE_STREAM:
306✔
974
      PLAN_ERR_RET(createStreamScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode));
306!
975
      break;
306✔
976
    case SCAN_TYPE_TABLE_MERGE:
2,134✔
977
      PLAN_ERR_RET(createTableMergeScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode));
2,134!
978
      break;
2,134✔
979
    default:
×
980
      PLAN_ERR_RET(TSDB_CODE_FAILED);
×
981
      break;
×
982
  }
983

984
  if (pCxt->pPlanCxt->streamTriggerQuery && !pCxt->pPlanCxt->streamTriggerScanSubplan) {
37,863✔
985
    pCxt->pPlanCxt->streamTriggerScanSubplan = (SNode*)pSubplan;
1,200✔
986
  }
987
  if (pCxt->pPlanCxt->streamCalcQuery) {
37,863✔
988
    SStreamCalcScan pStreamCalcScan = {0};
810✔
989
    pStreamCalcScan.vgList = taosArrayInit(1, sizeof(int32_t));
810✔
990
    if (NULL == taosArrayPush(pStreamCalcScan.vgList, &pSubplan->execNode.nodeId)) {
1,620!
991
      PLAN_ERR_RET(terrno);
×
992
    }
993
    pStreamCalcScan.scanPlan = (void*)pSubplan;
810✔
994
    if (pScanLogicNode->placeholderType == SP_PARTITION_ROWS) {
810✔
995
      pStreamCalcScan.readFromCache = true;
6✔
996
    }
997
    if (NULL == taosArrayPush(pCxt->pPlanCxt->pStreamCalcVgArray, &pStreamCalcScan)) {
1,620!
998
      PLAN_ERR_RET(terrno);
×
999
    }
1000
  }
1001

1002
  return TSDB_CODE_SUCCESS;
37,863✔
1003
}
1004

1005
/*
 * Fetch the output data block descriptor of the idx-th join input.
 *
 * The inputs are either the two direct children, or the children of a single
 * group-cache child. Any other child layout is reported as an internal
 * planner error.
 */
static int32_t getJoinDataBlockDescNode(SNodeList* pChildren, int32_t idx, SDataBlockDescNode** ppDesc) {
  SNodeList* pInputs = NULL;

  if (2 == pChildren->length) {
    pInputs = pChildren;
  } else if (1 == pChildren->length &&
             nodeType(nodesListGetNode(pChildren, 0)) == QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE) {
    SGroupCachePhysiNode* pCache = (SGroupCachePhysiNode*)nodesListGetNode(pChildren, 0);
    pInputs = pCache->node.pChildren;
  } else {
    planError("Invalid join children num:%d or child type:%d", pChildren->length,
              nodeType(nodesListGetNode(pChildren, 0)));
    return TSDB_CODE_PLAN_INTERNAL_ERROR;
  }

  *ppDesc = ((SPhysiNode*)nodesListGetNode(pInputs, idx))->pOutputDataBlockDesc;
  return TSDB_CODE_SUCCESS;
}
1020

1021
/*
 * Split a column-equality join condition into per-side column lists.
 *
 * For an `a = b` operator, each operand is cloned and appended to *ppLeft or
 * *ppRight depending on which input block its dataBlockId refers to. An AND
 * condition is handled by recursing into each of its parameters. Anything
 * else, or an operand that belongs to neither block, is an internal planner
 * error.
 */
static int32_t setColEqList(SNode* pEqCond, int16_t leftBlkId, int16_t rightBlkId, SNodeList** ppLeft,
                            SNodeList** ppRight) {
  int32_t code = 0;

  if (QUERY_NODE_OPERATOR == nodeType(pEqCond) && ((SOperatorNode*)pEqCond)->opType == OP_TYPE_EQUAL) {
    SOperatorNode* pEq = (SOperatorNode*)pEqCond;

    // Left operand: pick the destination list by the block it comes from.
    int16_t     lhsBlkId = ((SColumnNode*)pEq->pLeft)->dataBlockId;
    SNodeList** ppDst = NULL;
    if (leftBlkId == lhsBlkId) {
      ppDst = ppLeft;
    } else if (rightBlkId == lhsBlkId) {
      ppDst = ppRight;
    } else {
      planError("invalid col equal list, leftBlockId:%d", ((SColumnNode*)pEq->pLeft)->dataBlockId);
      return TSDB_CODE_PLAN_INTERNAL_ERROR;
    }

    SNode* pCopy = NULL;
    code = nodesCloneNode(pEq->pLeft, &pCopy);
    if (TSDB_CODE_SUCCESS == code) {
      code = nodesListMakeStrictAppend(ppDst, pCopy);
    }
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }

    // Right operand: same routing rule.
    int16_t rhsBlkId = ((SColumnNode*)pEq->pRight)->dataBlockId;
    if (leftBlkId == rhsBlkId) {
      ppDst = ppLeft;
    } else if (rightBlkId == rhsBlkId) {
      ppDst = ppRight;
    } else {
      planError("invalid col equal list, rightBlockId:%d", ((SColumnNode*)pEq->pRight)->dataBlockId);
      return TSDB_CODE_PLAN_INTERNAL_ERROR;
    }

    pCopy = NULL;
    code = nodesCloneNode(pEq->pRight, &pCopy);
    if (TSDB_CODE_SUCCESS == code) {
      code = nodesListMakeStrictAppend(ppDst, pCopy);
    }
    return code;
  }

  if (QUERY_NODE_LOGIC_CONDITION == nodeType(pEqCond) &&
      ((SLogicConditionNode*)pEqCond)->condType == LOGIC_COND_TYPE_AND) {
    SLogicConditionNode* pAnd = (SLogicConditionNode*)pEqCond;
    SNode*               pSub = NULL;
    FOREACH(pSub, pAnd->pParameterList) {
      int32_t subCode = setColEqList(pSub, leftBlkId, rightBlkId, ppLeft, ppRight);
      if (subCode) {
        return subCode;
      }
    }
    return code;
  }

  planError("invalid col equal cond, type:%d", nodeType(pEqCond));
  return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
1075

1076
static int32_t setMergeJoinPrimColEqCond(SNode* pEqCond, int32_t subType, int16_t leftBlkId, int16_t rightBlkId,
2,619✔
1077
                                         SSortMergeJoinPhysiNode* pJoin, SJoinLogicNode* pJoinLogicNode) {
1078
  int32_t code = 0;
2,619✔
1079
  if (QUERY_NODE_OPERATOR == nodeType(pEqCond)) {
2,619!
1080
    SOperatorNode* pOp = (SOperatorNode*)pEqCond;
2,619✔
1081
    if (pOp->opType != OP_TYPE_EQUAL && JOIN_STYPE_ASOF != subType) {
2,619!
1082
      planError("invalid primary cond opType, opType:%d", pOp->opType);
×
1083
      return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1084
    }
1085

1086
    switch (nodeType(pOp->pLeft)) {
2,619!
1087
      case QUERY_NODE_COLUMN: {
2,139✔
1088
        SColumnNode* pCol = (SColumnNode*)pOp->pLeft;
2,139✔
1089
        if (leftBlkId == pCol->dataBlockId) {
2,139✔
1090
          pJoin->leftPrimSlotId = pCol->slotId;
1,661✔
1091
          pJoin->asofOpType = pOp->opType;
1,661✔
1092
        } else if (rightBlkId == pCol->dataBlockId) {
478!
1093
          pJoin->rightPrimSlotId = pCol->slotId;
478✔
1094
        } else {
1095
          planError("invalid primary key col equal cond, leftBlockId:%d", pCol->dataBlockId);
×
1096
          return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1097
        }
1098
        break;
2,139✔
1099
      }
1100
      case QUERY_NODE_VALUE: {
478✔
1101
        if (pJoinLogicNode && pJoinLogicNode->leftConstPrimGot) {
478!
1102
          pJoin->leftPrimExpr = NULL;
478✔
1103
          code = nodesCloneNode(pOp->pLeft, &pJoin->leftPrimExpr);
478✔
1104
          break;
478✔
1105
        }
1106
        
1107
        planError("value node got in prim eq left cond, rightType:%d", pOp->pRight ? nodeType(pOp->pRight) : 0);
×
1108
        return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1109
      }
1110
      case QUERY_NODE_FUNCTION: {
2✔
1111
        SFunctionNode* pFunc = (SFunctionNode*)pOp->pLeft;
2✔
1112
        if (FUNCTION_TYPE_TIMETRUNCATE != pFunc->funcType) {
2!
1113
          planError("invalid primary cond left function type, leftFuncType:%d", pFunc->funcType);
×
1114
          return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1115
        }
1116
        SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
2✔
1117
        if (QUERY_NODE_COLUMN != nodeType(pParam)) {
2!
1118
          planError("invalid primary cond left timetruncate param type, leftParamType:%d", nodeType(pParam));
×
1119
          return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1120
        }
1121
        SColumnNode* pCol = (SColumnNode*)pParam;
2✔
1122
        if (leftBlkId == pCol->dataBlockId) {
2!
1123
          pJoin->leftPrimSlotId = pCol->slotId;
2✔
1124
          pJoin->asofOpType = pOp->opType;
2✔
1125
          pJoin->leftPrimExpr = NULL;
2✔
1126
          code = nodesCloneNode((SNode*)pFunc, &pJoin->leftPrimExpr);
2✔
1127
        } else if (rightBlkId == pCol->dataBlockId) {
×
1128
          pJoin->rightPrimSlotId = pCol->slotId;
×
1129
          pJoin->rightPrimExpr = NULL;
×
1130
          code = nodesCloneNode((SNode*)pFunc, &pJoin->rightPrimExpr);
×
1131
        } else {
1132
          planError("invalid primary key col equal cond, leftBlockId:%d", pCol->dataBlockId);
×
1133
          return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1134
        }
1135
        break;
2✔
1136
      }
1137
      default:
×
1138
        planError("invalid primary cond left node type, leftNodeType:%d", nodeType(pOp->pLeft));
×
1139
        return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1140
    }
1141
    if (TSDB_CODE_SUCCESS != code) {
2,619!
1142
      return code;
×
1143
    }
1144
    switch (nodeType(pOp->pRight)) {
2,619!
1145
      case QUERY_NODE_COLUMN: {
2,061✔
1146
        SColumnNode* pCol = (SColumnNode*)pOp->pRight;
2,061✔
1147
        if (leftBlkId == pCol->dataBlockId) {
2,061✔
1148
          pJoin->leftPrimSlotId = pCol->slotId;
478✔
1149
          pJoin->asofOpType = getAsofJoinReverseOp(pOp->opType);
478✔
1150
        } else if (rightBlkId == pCol->dataBlockId) {
1,583!
1151
          pJoin->rightPrimSlotId = pCol->slotId;
1,583✔
1152
        } else {
1153
          planError("invalid primary key col equal cond, rightBlockId:%d", pCol->dataBlockId);
×
1154
          return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1155
        }
1156
        break;
2,061✔
1157
      }
1158
      case QUERY_NODE_VALUE: {
556✔
1159
        if (pJoinLogicNode && pJoinLogicNode->rightConstPrimGot) {
556!
1160
          pJoin->rightPrimExpr = NULL;
556✔
1161
          code = nodesCloneNode(pOp->pRight, &pJoin->rightPrimExpr);
556✔
1162
          break;
556✔
1163
        }
1164
        
1165
        planError("value node got in prim eq right cond, leftType:%d", pOp->pLeft ? nodeType(pOp->pLeft) : 0);
×
1166
        return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1167
      }      
1168
      case QUERY_NODE_FUNCTION: {
2✔
1169
        SFunctionNode* pFunc = (SFunctionNode*)pOp->pRight;
2✔
1170
        if (FUNCTION_TYPE_TIMETRUNCATE != pFunc->funcType) {
2!
1171
          planError("invalid primary cond right function type, rightFuncType:%d", pFunc->funcType);
×
1172
          return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1173
        }
1174
        SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
2✔
1175
        if (QUERY_NODE_COLUMN != nodeType(pParam)) {
2!
1176
          planError("invalid primary cond right timetruncate param type, rightParamType:%d", nodeType(pParam));
×
1177
          return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1178
        }
1179
        SColumnNode* pCol = (SColumnNode*)pParam;
2✔
1180
        if (leftBlkId == pCol->dataBlockId) {
2!
1181
          pJoin->leftPrimSlotId = pCol->slotId;
×
1182
          pJoin->asofOpType = getAsofJoinReverseOp(pOp->opType);
×
1183
          pJoin->leftPrimExpr = NULL;
×
1184
          code = nodesCloneNode((SNode*)pFunc, &pJoin->leftPrimExpr);
×
1185
        } else if (rightBlkId == pCol->dataBlockId) {
2!
1186
          pJoin->rightPrimSlotId = pCol->slotId;
2✔
1187
          pJoin->rightPrimExpr = NULL;
2✔
1188
          code = nodesCloneNode((SNode*)pFunc, &pJoin->rightPrimExpr);
2✔
1189
        } else {
1190
          planError("invalid primary key col equal cond, rightBlockId:%d", pCol->dataBlockId);
×
1191
          return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1192
        }
1193
        break;
2✔
1194
      }
1195
      default:
×
1196
        planError("invalid primary cond right node type, rightNodeType:%d", nodeType(pOp->pRight));
×
1197
        return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1198
    }
1199
  } else {
1200
    planError("invalid primary key col equal cond, type:%d", nodeType(pEqCond));
×
1201
    return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1202
  }
1203

1204
  return code;
2,619✔
1205
}
1206

1207
/*
 * Find the first target column whose table alias and column name match the
 * source table and alias of the constant primary expression, hand a clone of
 * it back through ppRemoved and erase the original from pTargets.
 *
 * Returns the clone result, or TSDB_CODE_SUCCESS when nothing matched (in
 * which case *ppRemoved is left untouched).
 */
static int32_t removePrimColFromJoinTargets(SNodeList* pTargets, SValueNode* pPrimExpr, SColumnNode** ppRemoved) {
  int32_t code = TSDB_CODE_SUCCESS;
  SNode*  pNode = NULL;

  FOREACH(pNode, pTargets) {
    SColumnNode* pTargetCol = (SColumnNode*)pNode;
    if (0 != strcmp(pTargetCol->tableAlias, pPrimExpr->node.srcTable) ||
        0 != strcmp(pTargetCol->colName, pPrimExpr->node.aliasName)) {
      continue;
    }

    // The node is erased even if the clone failed; the error is reported to the caller.
    code = nodesCloneNode(pNode, (SNode**)ppRemoved);
    ERASE_NODE(pTargets);
    break;
  }

  return code;
}
1221

1222
/*
 * Re-attach a previously removed primary-key target column to the join's
 * target list, rebinding it to the slot of the constant primary expression.
 *
 * pJoin     join node whose pTargets list receives the column
 * ppTarget  in: the column to append; out: set to NULL once ownership has
 *           been handed to the append call
 * primExpr  target node of the primary expression; provides the slot id
 * blkId     data block id the column is rebound to
 *
 * Returns TSDB_CODE_PAR_PRIM_KEY_MUST_BE_TS when the column is not a
 * timestamp (ownership stays with the caller), otherwise the append result.
 */
static int32_t appendPrimColToJoinTargets(SSortMergeJoinPhysiNode* pJoin, SColumnNode** ppTarget, STargetNode* primExpr, int16_t blkId) {
  SColumnNode* pCol = *ppTarget;
  if (TSDB_DATA_TYPE_TIMESTAMP != pCol->node.resType.type) {
    planError("primary key output type is not ts, type:%d", pCol->node.resType.type);
    return TSDB_CODE_PAR_PRIM_KEY_MUST_BE_TS;
  }
  pCol->dataBlockId = blkId;
  pCol->slotId = primExpr->slotId;
  int32_t code = nodesListMakeStrictAppend(&pJoin->pTargets, (SNode *)pCol);
  // The strict append takes over the node whether or not it succeeds (other
  // callers in this file never free a node after a failed strict append), so
  // always drop our reference; otherwise the caller's error path would destroy
  // the column a second time.
  // NOTE(review): confirm against nodesListMakeStrictAppend that every failure
  // path releases the node.
  *ppTarget = NULL;

  return code;
}
1237

1238
static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode,
2,619✔
1239
                                        SPhysiNode** pPhyNode) {
1240
  SSortMergeJoinPhysiNode* pJoin =
1241
      (SSortMergeJoinPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pJoinLogicNode, QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN);
2,619✔
1242
  if (NULL == pJoin) {
2,619!
1243
    return terrno;
×
1244
  }
1245

1246
  SDataBlockDescNode* pLeftDesc = NULL;
2,619✔
1247
  SDataBlockDescNode* pRightDesc = NULL;
2,619✔
1248
  pJoin->joinType = pJoinLogicNode->joinType;
2,619✔
1249
  pJoin->subType = pJoinLogicNode->subType;
2,619✔
1250
  pJoin->pWindowOffset = NULL;
2,619✔
1251
  int32_t code = nodesCloneNode(pJoinLogicNode->pWindowOffset, &pJoin->pWindowOffset);
2,619✔
1252
  if (TSDB_CODE_SUCCESS == code) {
2,619!
1253
    pJoin->pJLimit = NULL;
2,619✔
1254
    code = nodesCloneNode(pJoinLogicNode->pJLimit, (SNode**)&pJoin->pJLimit);
2,619✔
1255
  }
1256
  if (TSDB_CODE_SUCCESS == code) {
2,619!
1257
    pJoin->node.inputTsOrder = pJoinLogicNode->node.inputTsOrder;
2,619✔
1258
    pJoin->seqWinGroup = pJoinLogicNode->seqWinGroup;
2,619✔
1259
    pJoin->grpJoin = pJoinLogicNode->grpJoin;
2,619✔
1260
    code = getJoinDataBlockDescNode(pChildren, 0, &pLeftDesc);
2,619✔
1261
  }
1262

1263
  if (TSDB_CODE_SUCCESS == code) {
2,619!
1264
    code = getJoinDataBlockDescNode(pChildren, 1, &pRightDesc);
2,619✔
1265
  }
1266

1267
  if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pPrimKeyEqCond) {
2,619!
1268
    code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond,
2,619✔
1269
                         &pJoin->pPrimKeyCond);
1270
    if (TSDB_CODE_SUCCESS == code) {
2,619!
1271
      code = setMergeJoinPrimColEqCond(pJoin->pPrimKeyCond, pJoin->subType, pLeftDesc->dataBlockId,
2,619✔
1272
                                       pRightDesc->dataBlockId, pJoin, pJoinLogicNode);
2,619✔
1273
    }
1274
    if (TSDB_CODE_SUCCESS == code && NULL != pJoin->leftPrimExpr) {
2,619!
1275
      code = addDataBlockSlot(pCxt, &pJoin->leftPrimExpr, pLeftDesc);
480✔
1276
    }
1277
    if (TSDB_CODE_SUCCESS == code && NULL != pJoin->rightPrimExpr) {
2,619!
1278
      code = addDataBlockSlot(pCxt, &pJoin->rightPrimExpr, pRightDesc);
558✔
1279
    }
1280
  }
1281

1282
  if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->addPrimEqCond) {
2,619!
1283
    SNode* pPrimKeyCond = NULL;
×
1284
    code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->addPrimEqCond,
×
1285
                         &pPrimKeyCond);
1286
    if (TSDB_CODE_SUCCESS == code) {
×
1287
      code = setMergeJoinPrimColEqCond(pPrimKeyCond, pJoin->subType, pLeftDesc->dataBlockId, pRightDesc->dataBlockId,
×
1288
                                       pJoin, NULL);
1289
    }
1290
    if (TSDB_CODE_SUCCESS == code && NULL != pJoin->leftPrimExpr) {
×
1291
      code = addDataBlockSlot(pCxt, &pJoin->leftPrimExpr, pLeftDesc);
×
1292
    }
1293
    if (TSDB_CODE_SUCCESS == code && NULL != pJoin->rightPrimExpr) {
×
1294
      code = addDataBlockSlot(pCxt, &pJoin->rightPrimExpr, pRightDesc);
×
1295
    }
1296
    nodesDestroyNode(pPrimKeyCond);
×
1297
  }
1298

1299
  SValueNode* pLeftPrimExpr = NULL, *pRightPrimExpr = NULL;
2,619✔
1300
  SColumnNode* pLeftTarget = NULL, *pRightTarget = NULL;
2,619✔
1301
  if (TSDB_CODE_SUCCESS == code && pJoinLogicNode->leftConstPrimGot && pJoin->leftPrimExpr 
2,619!
1302
      && QUERY_NODE_VALUE == nodeType(((STargetNode*)pJoin->leftPrimExpr)->pExpr)) {
478!
1303
    pLeftPrimExpr = (SValueNode*)((STargetNode*)pJoin->leftPrimExpr)->pExpr;
478✔
1304
    code = removePrimColFromJoinTargets(pJoinLogicNode->node.pTargets, pLeftPrimExpr, &pLeftTarget);
478✔
1305
  }
1306

1307
  if (TSDB_CODE_SUCCESS == code && pJoinLogicNode->rightConstPrimGot && pJoin->rightPrimExpr 
2,619!
1308
      && QUERY_NODE_VALUE == nodeType(((STargetNode*)pJoin->rightPrimExpr)->pExpr)) {
556!
1309
    pRightPrimExpr = (SValueNode*)((STargetNode*)pJoin->rightPrimExpr)->pExpr;
556✔
1310
    code = removePrimColFromJoinTargets(pJoinLogicNode->node.pTargets, pRightPrimExpr, &pRightTarget);
556✔
1311
  }
1312

1313
  if (TSDB_CODE_SUCCESS == code) {
2,619!
1314
    code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->node.pTargets,
2,619✔
1315
                         &pJoin->pTargets);
1316
  }
1317

1318
  if (TSDB_CODE_SUCCESS == code && pLeftPrimExpr && pLeftTarget) {
2,619!
1319
    code = appendPrimColToJoinTargets(pJoin, &pLeftTarget, (STargetNode*)pJoin->leftPrimExpr, pLeftDesc->dataBlockId);
344✔
1320
  }
1321
  if (TSDB_CODE_SUCCESS == code && pRightPrimExpr && pRightTarget) {
2,619!
1322
    code = appendPrimColToJoinTargets(pJoin, &pRightTarget, (STargetNode*)pJoin->rightPrimExpr, pRightDesc->dataBlockId);
405✔
1323
  }
1324

1325
  if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pFullOnCond) {
2,619!
1326
    code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, pJoinLogicNode->pFullOnCond,
468✔
1327
                         &pJoin->pFullOnCond);
1328
  }
1329

1330
  if (TSDB_CODE_SUCCESS == code && ((NULL != pJoinLogicNode->pColEqCond) || (NULL != pJoinLogicNode->pTagEqCond))) {
2,619!
1331
    code = mergeJoinConds(&pJoinLogicNode->pColEqCond, &pJoinLogicNode->pTagEqCond);
302✔
1332
  }
1333
  // TODO set from input blocks for group algo
1334
  /*
1335
    if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pColEqCond) {
1336
      code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColEqCond,
1337
    &pJoin->pColEqCond);
1338
    }
1339
  */
1340
  if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pColEqCond) {
2,619!
1341
    code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColEqCond,
302✔
1342
                         &pJoin->pColEqCond);
1343
    if (TSDB_CODE_SUCCESS == code) {
302!
1344
      code = setColEqList(pJoin->pColEqCond, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, &pJoin->pEqLeft,
302✔
1345
                          &pJoin->pEqRight);
1346
    }
1347
  }
1348

1349
  if (TSDB_CODE_SUCCESS == code && ((NULL != pJoinLogicNode->pColOnCond) || (NULL != pJoinLogicNode->pTagOnCond))) {
2,619!
1350
    code = mergeJoinConds(&pJoinLogicNode->pColOnCond, &pJoinLogicNode->pTagOnCond);
240✔
1351
  }
1352
  // TODO set from input blocks for group algo
1353
  /*
1354
    if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pColOnCond) {
1355
      code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColOnCond,
1356
    &pJoin->pColOnCond);
1357
    }
1358
  */
1359
  if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pColOnCond) {
2,619!
1360
    code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, pJoinLogicNode->pColOnCond,
240✔
1361
                         &pJoin->pColOnCond);
1362
  }
1363

1364
  if (TSDB_CODE_SUCCESS == code) {
2,619!
1365
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin);
2,619✔
1366
  }
1367

1368
  if (TSDB_CODE_SUCCESS == code) {
2,619!
1369
    code = addDataBlockSlots(pCxt, pJoin->pTargets, pJoin->node.pOutputDataBlockDesc);
2,619✔
1370
  }
1371

1372
  if (TSDB_CODE_SUCCESS == code) {
2,619!
1373
    *pPhyNode = (SPhysiNode*)pJoin;
2,619✔
1374
  } else {
1375
    nodesDestroyNode((SNode*)pJoin);
×
1376
    nodesDestroyNode((SNode*)pLeftTarget);
×
1377
    nodesDestroyNode((SNode*)pRightTarget);
×
1378
  }
1379

1380
  return code;
2,619✔
1381
}
1382

1383
/*
 * Splits one join equality operator into its left-input and right-input operands.
 *
 * A clone of the operand whose dataBlockId equals lBlkId is appended to pJoin->pOnLeft and a clone
 * of the operand whose dataBlockId equals rBlkId is appended to pJoin->pOnRight, regardless of the
 * side of the operator each operand was written on.
 *
 * Returns TSDB_CODE_PLAN_INTERNAL_ERROR when pEq is not an operator node or when its operands do
 * not reference exactly the two given blocks; otherwise the clone/append status.
 */
static int32_t extractHashJoinOpCols(int16_t lBlkId, int16_t rBlkId, SNode* pEq, SHashJoinPhysiNode* pJoin) {
  if (QUERY_NODE_OPERATOR != nodeType(pEq)) {
    planError("Invalid join equal node type:%d", nodeType(pEq));
    return TSDB_CODE_PLAN_INTERNAL_ERROR;
  }

  SOperatorNode* pOp = (SOperatorNode*)pEq;
  SColumnNode*   pLeft = (SColumnNode*)pOp->pLeft;
  SColumnNode*   pRight = (SColumnNode*)pOp->pRight;
  SNode*         pLeftSrc = NULL;   // operand that belongs to the left input block
  SNode*         pRightSrc = NULL;  // operand that belongs to the right input block

  if (lBlkId == pLeft->dataBlockId && rBlkId == pRight->dataBlockId) {
    pLeftSrc = pOp->pLeft;
    pRightSrc = pOp->pRight;
  } else if (rBlkId == pLeft->dataBlockId && lBlkId == pRight->dataBlockId) {
    // operands are written in the opposite order of the join inputs
    pLeftSrc = pOp->pRight;
    pRightSrc = pOp->pLeft;
  } else {
    planError("Invalid join equal cond, lbid:%d, rbid:%d, oplid:%d, oprid:%d", lBlkId, rBlkId, pLeft->dataBlockId,
              pRight->dataBlockId);
    return TSDB_CODE_PLAN_INTERNAL_ERROR;
  }

  SNode*  pClone = NULL;
  int32_t code = nodesCloneNode(pLeftSrc, &pClone);
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListStrictAppend(pJoin->pOnLeft, pClone);
  }
  if (TSDB_CODE_SUCCESS == code) {
    pClone = NULL;
    code = nodesCloneNode(pRightSrc, &pClone);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListStrictAppend(pJoin->pOnRight, pClone);
  }

  return code;
}
1425

1426
/*
 * Collects the hash-join key columns of one equality condition into pJoin->pOnLeft/pOnRight.
 *
 * pEq may be NULL (nothing to do), a single operator, or a logic condition whose parameters are
 * each handed to extractHashJoinOpCols. Any other node type is reported as an internal error.
 */
static int32_t extractHashJoinOnCols(int16_t lBlkId, int16_t rBlkId, SNode* pEq, SHashJoinPhysiNode* pJoin) {
  if (NULL == pEq) {
    return TSDB_CODE_SUCCESS;
  }

  if (QUERY_NODE_OPERATOR == nodeType(pEq)) {
    return extractHashJoinOpCols(lBlkId, rBlkId, pEq, pJoin);
  }

  if (QUERY_NODE_LOGIC_CONDITION != nodeType(pEq)) {
    planError("Invalid join equal node type:%d", nodeType(pEq));
    return TSDB_CODE_PLAN_INTERNAL_ERROR;
  }

  int32_t              code = TSDB_CODE_SUCCESS;
  SLogicConditionNode* pLogic = (SLogicConditionNode*)pEq;
  SNode*               pCond = NULL;
  FOREACH(pCond, pLogic->pParameterList) {
    code = extractHashJoinOpCols(lBlkId, rBlkId, pCond, pJoin);
    if (0 != code) {
      break;  // stop at the first sub-condition that cannot be split
    }
  }

  return code;
}
1450

1451
static int32_t createHashJoinColList(int16_t lBlkId, int16_t rBlkId, SNode* pEq1, SNode* pEq2, SNode* pEq3,
×
1452
                                     SHashJoinPhysiNode* pJoin) {
1453
  int32_t code = TSDB_CODE_SUCCESS;
×
1454
  pJoin->pOnLeft = NULL;
×
1455
  code = nodesMakeList(&pJoin->pOnLeft);
×
1456
  if (TSDB_CODE_SUCCESS != code) {
×
1457
    return code;
×
1458
  }
1459
  pJoin->pOnRight = NULL;
×
1460
  code = nodesMakeList(&pJoin->pOnRight);
×
1461
  if (TSDB_CODE_SUCCESS != code) {
×
1462
    return code;
×
1463
  }
1464

1465
  code = extractHashJoinOnCols(lBlkId, rBlkId, pEq1, pJoin);
×
1466
  if (TSDB_CODE_SUCCESS == code) {
×
1467
    code = extractHashJoinOnCols(lBlkId, rBlkId, pEq2, pJoin);
×
1468
  }
1469
  if (TSDB_CODE_SUCCESS == code) {
×
1470
    code = extractHashJoinOnCols(lBlkId, rBlkId, pEq3, pJoin);
×
1471
  }
1472
  if (TSDB_CODE_SUCCESS == code && pJoin->pOnLeft->length <= 0) {
×
1473
    planError("Invalid join equal column num: %d", pJoin->pOnLeft->length);
×
1474
    code = TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1475
  }
1476

1477
  return code;
×
1478
}
1479

1480
static int32_t sortHashJoinTargets(int16_t lBlkId, int16_t rBlkId, SHashJoinPhysiNode* pJoin) {
×
1481
  SNode*     pNode = NULL;
×
1482
  SSHashObj* pHash = tSimpleHashInit(pJoin->pTargets->length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
×
1483
  if (NULL == pHash) {
×
1484
    return TSDB_CODE_OUT_OF_MEMORY;
×
1485
  }
1486
  SNodeList* pNew = NULL;
×
1487
  int32_t    code = nodesMakeList(&pNew);
×
1488

1489
  if (TSDB_CODE_SUCCESS == code) {
×
1490
    FOREACH(pNode, pJoin->pTargets) {
×
1491
      SColumnNode* pCol = (SColumnNode*)pNode;
×
1492
      char*        pName = NULL;
×
1493
      int32_t      len = 0;
×
1494
      code = getSlotKey(pNode, NULL, &pName, &len, 0);
×
1495
      if (TSDB_CODE_SUCCESS == code) {
×
1496
        code = tSimpleHashPut(pHash, pName, len, &pCol, POINTER_BYTES);
×
1497
      }
1498
      taosMemoryFree(pName);
×
1499
      if (TSDB_CODE_SUCCESS != code) {
×
1500
        break;
×
1501
      }
1502
    }
1503
  }
1504
  if (TSDB_CODE_SUCCESS == code) {
×
1505
    nodesClearList(pJoin->pTargets);
×
1506
    pJoin->pTargets = pNew;
×
1507

1508
    FOREACH(pNode, pJoin->pOnLeft) {
×
1509
      char*        pName = NULL;
×
1510
      SColumnNode* pCol = (SColumnNode*)pNode;
×
1511
      int32_t      len = 0;
×
1512
      code = getSlotKey(pNode, NULL, &pName, &len, 0);
×
1513
      if (TSDB_CODE_SUCCESS == code) {
×
1514
        SNode** p = tSimpleHashGet(pHash, pName, len);
×
1515
        if (p) {
×
1516
          code = nodesListStrictAppend(pJoin->pTargets, *p);
×
1517
          if (TSDB_CODE_SUCCESS == code) {
×
1518
            code = tSimpleHashRemove(pHash, pName, len);
×
1519
          }
1520
        }
1521
      }
1522
      taosMemoryFree(pName);
×
1523
      if (TSDB_CODE_SUCCESS != code) {
×
1524
        break;
×
1525
      }
1526
    }
1527
  }
1528
  if (TSDB_CODE_SUCCESS == code) {
×
1529
    FOREACH(pNode, pJoin->pOnRight) {
×
1530
      char*        pName = NULL;
×
1531
      SColumnNode* pCol = (SColumnNode*)pNode;
×
1532
      int32_t      len = 0;
×
1533
      code = getSlotKey(pNode, NULL, &pName, &len, 0);
×
1534
      if (TSDB_CODE_SUCCESS == code) {
×
1535
        SNode** p = tSimpleHashGet(pHash, pName, len);
×
1536
        if (p) {
×
1537
          code = nodesListStrictAppend(pJoin->pTargets, *p);
×
1538
          if (TSDB_CODE_SUCCESS == code) {
×
1539
            code = tSimpleHashRemove(pHash, pName, len);
×
1540
          }
1541
        }
1542
      }
1543
      taosMemoryFree(pName);
×
1544
      if (TSDB_CODE_SUCCESS != code) {
×
1545
        break;
×
1546
      }
1547
    }
1548
  }
1549
  if (TSDB_CODE_SUCCESS == code) {
×
1550
    if (tSimpleHashGetSize(pHash) > 0) {
×
1551
      SNode** p = NULL;
×
1552
      int32_t iter = 0;
×
1553
      while (1) {
1554
        p = tSimpleHashIterate(pHash, p, &iter);
×
1555
        if (p == NULL) {
×
1556
          break;
×
1557
        }
1558

1559
        code = nodesListStrictAppend(pJoin->pTargets, *p);
×
1560
        if (TSDB_CODE_SUCCESS != code) {
×
1561
          break;
×
1562
        }
1563
      }
1564
    }
1565
  }
1566

1567
  tSimpleHashCleanup(pHash);
×
1568

1569
  return code;
×
1570
}
1571

1572
static int32_t setHashJoinPrimColEqCond(SNode* pEqCond, int16_t leftBlkId, int16_t rightBlkId,
×
1573
                                        SHashJoinPhysiNode* pJoin) {
1574
  int32_t code = 0;
×
1575
  if (QUERY_NODE_OPERATOR == nodeType(pEqCond)) {
×
1576
    SOperatorNode* pOp = (SOperatorNode*)pEqCond;
×
1577
    if (pOp->opType != OP_TYPE_EQUAL) {
×
1578
      planError("invalid primary cond opType, opType:%d", pOp->opType);
×
1579
      return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1580
    }
1581

1582
    switch (nodeType(pOp->pLeft)) {
×
1583
      case QUERY_NODE_COLUMN: {
×
1584
        SColumnNode* pCol = (SColumnNode*)pOp->pLeft;
×
1585
        if (leftBlkId == pCol->dataBlockId) {
×
1586
          pJoin->leftPrimSlotId = pCol->slotId;
×
1587
        } else if (rightBlkId == pCol->dataBlockId) {
×
1588
          pJoin->rightPrimSlotId = pCol->slotId;
×
1589
        } else {
1590
          planError("invalid primary key col equal cond, leftBlockId:%d", pCol->dataBlockId);
×
1591
          return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1592
        }
1593
        break;
×
1594
      }
1595
      case QUERY_NODE_FUNCTION: {
×
1596
        SFunctionNode* pFunc = (SFunctionNode*)pOp->pLeft;
×
1597
        if (FUNCTION_TYPE_TIMETRUNCATE != pFunc->funcType) {
×
1598
          planError("invalid primary cond left function type, leftFuncType:%d", pFunc->funcType);
×
1599
          return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1600
        }
1601
        SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
×
1602
        if (QUERY_NODE_COLUMN != nodeType(pParam)) {
×
1603
          planError("invalid primary cond left timetruncate param type, leftParamType:%d", nodeType(pParam));
×
1604
          return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1605
        }
1606
        SColumnNode* pCol = (SColumnNode*)pParam;
×
1607
        if (leftBlkId == pCol->dataBlockId) {
×
1608
          pJoin->leftPrimSlotId = pCol->slotId;
×
1609
          pJoin->leftPrimExpr = NULL;
×
1610
          code = nodesCloneNode((SNode*)pFunc, &pJoin->leftPrimExpr);
×
1611
        } else if (rightBlkId == pCol->dataBlockId) {
×
1612
          pJoin->rightPrimSlotId = pCol->slotId;
×
1613
          pJoin->rightPrimExpr = NULL;
×
1614
          code = nodesCloneNode((SNode*)pFunc, &pJoin->rightPrimExpr);
×
1615
        } else {
1616
          planError("invalid primary key col equal cond, leftBlockId:%d", pCol->dataBlockId);
×
1617
          return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1618
        }
1619
        break;
×
1620
      }
1621
      default:
×
1622
        planError("invalid primary cond left node type, leftNodeType:%d", nodeType(pOp->pLeft));
×
1623
        return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1624
    }
1625
    if (TSDB_CODE_SUCCESS != code) {
×
1626
      return code;
×
1627
    }
1628
    switch (nodeType(pOp->pRight)) {
×
1629
      case QUERY_NODE_COLUMN: {
×
1630
        SColumnNode* pCol = (SColumnNode*)pOp->pRight;
×
1631
        if (leftBlkId == pCol->dataBlockId) {
×
1632
          pJoin->leftPrimSlotId = pCol->slotId;
×
1633
        } else if (rightBlkId == pCol->dataBlockId) {
×
1634
          pJoin->rightPrimSlotId = pCol->slotId;
×
1635
        } else {
1636
          planError("invalid primary key col equal cond, rightBlockId:%d", pCol->dataBlockId);
×
1637
          return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1638
        }
1639
        break;
×
1640
      }
1641
      case QUERY_NODE_FUNCTION: {
×
1642
        SFunctionNode* pFunc = (SFunctionNode*)pOp->pRight;
×
1643
        if (FUNCTION_TYPE_TIMETRUNCATE != pFunc->funcType) {
×
1644
          planError("invalid primary cond right function type, rightFuncType:%d", pFunc->funcType);
×
1645
          return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1646
        }
1647
        SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
×
1648
        if (QUERY_NODE_COLUMN != nodeType(pParam)) {
×
1649
          planError("invalid primary cond right timetruncate param type, rightParamType:%d", nodeType(pParam));
×
1650
          return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1651
        }
1652
        SColumnNode* pCol = (SColumnNode*)pParam;
×
1653
        if (leftBlkId == pCol->dataBlockId) {
×
1654
          pJoin->leftPrimSlotId = pCol->slotId;
×
1655
          pJoin->leftPrimExpr = NULL;
×
1656
          code = nodesCloneNode((SNode*)pFunc, &pJoin->leftPrimExpr);
×
1657
        } else if (rightBlkId == pCol->dataBlockId) {
×
1658
          pJoin->rightPrimSlotId = pCol->slotId;
×
1659
          pJoin->rightPrimExpr = NULL;
×
1660
          code = nodesCloneNode((SNode*)pFunc, &pJoin->rightPrimExpr);
×
1661
        } else {
1662
          planError("invalid primary key col equal cond, rightBlockId:%d", pCol->dataBlockId);
×
1663
          return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1664
        }
1665
        break;
×
1666
      }
1667
      default:
×
1668
        planError("invalid primary cond right node type, rightNodeType:%d", nodeType(pOp->pRight));
×
1669
        return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1670
    }
1671
  } else {
1672
    planError("invalid primary key col equal cond, type:%d", nodeType(pEqCond));
×
1673
    return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1674
  }
1675

1676
  return code;
×
1677
}
1678

1679
static int32_t createHashJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode,
×
1680
                                       SPhysiNode** pPhyNode) {
1681
  SHashJoinPhysiNode* pJoin =
1682
      (SHashJoinPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pJoinLogicNode, QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN);
×
1683
  if (NULL == pJoin) {
×
1684
    return terrno;
×
1685
  }
1686

1687
  SDataBlockDescNode* pLeftDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc;
×
1688
  SDataBlockDescNode* pRightDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 1))->pOutputDataBlockDesc;
×
1689
  int32_t             code = TSDB_CODE_SUCCESS;
×
1690

1691
  pJoin->joinType = pJoinLogicNode->joinType;
×
1692
  pJoin->subType = pJoinLogicNode->subType;
×
1693
  pJoin->pWindowOffset = NULL;
×
1694
  code = nodesCloneNode(pJoinLogicNode->pWindowOffset, &pJoin->pWindowOffset);
×
1695
  if (TSDB_CODE_SUCCESS != code) {
×
1696
    nodesDestroyNode((SNode*)pJoin);
×
1697
    return code;
×
1698
  }
1699
  pJoin->pJLimit = NULL;
×
1700
  code = nodesCloneNode(pJoinLogicNode->pJLimit, &pJoin->pJLimit);
×
1701
  if (TSDB_CODE_SUCCESS != code) {
×
1702
    nodesDestroyNode((SNode*)pJoin);
×
1703
    return code;
×
1704
  }
1705
  pJoin->node.inputTsOrder = pJoinLogicNode->node.inputTsOrder;
×
1706
  pJoin->timeRangeTarget = pJoinLogicNode->timeRangeTarget;
×
1707
  pJoin->timeRange.skey = pJoinLogicNode->timeRange.skey;
×
1708
  pJoin->timeRange.ekey = pJoinLogicNode->timeRange.ekey;
×
1709

1710
  if (NULL != pJoinLogicNode->pPrimKeyEqCond) {
×
1711
    code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond,
×
1712
                         &pJoin->pPrimKeyCond);
1713
    if (TSDB_CODE_SUCCESS == code) {
×
1714
      code = setHashJoinPrimColEqCond(pJoin->pPrimKeyCond, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin);
×
1715
    }
1716
    if (TSDB_CODE_SUCCESS == code && NULL != pJoin->leftPrimExpr) {
×
1717
      code = addDataBlockSlot(pCxt, &pJoin->leftPrimExpr, pLeftDesc);
×
1718
    }
1719
    if (TSDB_CODE_SUCCESS == code && NULL != pJoin->rightPrimExpr) {
×
1720
      code = addDataBlockSlot(pCxt, &pJoin->rightPrimExpr, pRightDesc);
×
1721
    }
1722
  }
1723
  if (TSDB_CODE_SUCCESS == code) {
×
1724
    code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColEqCond,
×
1725
                         &pJoin->pColEqCond);
1726
  }
1727
  if (TSDB_CODE_SUCCESS == code) {
×
1728
    code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pTagEqCond,
×
1729
                         &pJoin->pTagEqCond);
1730
  }
1731
  if (TSDB_CODE_SUCCESS == code) {
×
1732
    code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, -1, pJoinLogicNode->pLeftOnCond, &pJoin->pLeftOnCond);
×
1733
  }
1734
  if (TSDB_CODE_SUCCESS == code) {
×
1735
    code = setNodeSlotId(pCxt, -1, pRightDesc->dataBlockId, pJoinLogicNode->pRightOnCond, &pJoin->pRightOnCond);
×
1736
  }
1737
  if (TSDB_CODE_SUCCESS == code && ((NULL != pJoinLogicNode->pColOnCond) || (NULL != pJoinLogicNode->pTagOnCond))) {
×
1738
    code = mergeJoinConds(&pJoinLogicNode->pColOnCond, &pJoinLogicNode->pTagOnCond);
×
1739
  }
1740
  SNode* pOnCond = (NULL != pJoinLogicNode->pColOnCond) ? pJoinLogicNode->pColOnCond : pJoinLogicNode->pTagOnCond;
×
1741
  if (TSDB_CODE_SUCCESS == code && NULL != pOnCond) {
×
1742
    code =
1743
        setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, pOnCond, &pJoin->pFullOnCond);
×
1744
  }
1745
  if (TSDB_CODE_SUCCESS == code) {
×
1746
    code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->node.pTargets,
×
1747
                         &pJoin->pTargets);
1748
  }
1749
  if (TSDB_CODE_SUCCESS == code) {
×
1750
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin);
×
1751
  }
1752
  if (TSDB_CODE_SUCCESS == code) {
×
1753
    code = createHashJoinColList(pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin->pPrimKeyCond,
×
1754
                                 pJoin->pColEqCond, pJoin->pTagEqCond, pJoin);
1755
  }
1756
  if (TSDB_CODE_SUCCESS == code) {
×
1757
    code = sortHashJoinTargets(pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin);
×
1758
  }
1759
  if (TSDB_CODE_SUCCESS == code) {
×
1760
    code = addDataBlockSlots(pCxt, pJoin->pTargets, pJoin->node.pOutputDataBlockDesc);
×
1761
  }
1762

1763
  if (TSDB_CODE_SUCCESS == code) {
×
1764
    *pPhyNode = (SPhysiNode*)pJoin;
×
1765
  } else {
1766
    nodesDestroyNode((SNode*)pJoin);
×
1767
  }
1768

1769
  return code;
×
1770
}
1771

1772
static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode,
2,619✔
1773
                                   SPhysiNode** pPhyNode) {
1774
  switch (pJoinLogicNode->joinAlgo) {
2,619!
1775
    case JOIN_ALGO_MERGE:
2,619✔
1776
      return createMergeJoinPhysiNode(pCxt, pChildren, pJoinLogicNode, pPhyNode);
2,619✔
1777
    case JOIN_ALGO_HASH:
×
1778
      return createHashJoinPhysiNode(pCxt, pChildren, pJoinLogicNode, pPhyNode);
×
1779
    default:
×
1780
      planError("Invalid join algorithm:%d", pJoinLogicNode->joinAlgo);
×
1781
      break;
×
1782
  }
1783

1784
  return TSDB_CODE_FAILED;
×
1785
}
1786

1787
/*
 * Clones pScanCols into pScanPhysiNode->scan.pScanCols and sorts the clone with sortScanCols.
 *
 * A NULL pScanCols is a no-op that succeeds. If the clone yields no list, the clone status is
 * returned unchanged.
 */
static int32_t createVirtualScanCols(SPhysiPlanContext* pCxt, SVirtualScanPhysiNode* pScanPhysiNode, SNodeList* pScanCols) {
  if (NULL == pScanCols) {
    return TSDB_CODE_SUCCESS;
  }

  pScanPhysiNode->scan.pScanCols = NULL;
  int32_t cloneCode = nodesCloneList(pScanCols, &pScanPhysiNode->scan.pScanCols);
  if (NULL != pScanPhysiNode->scan.pScanCols) {
    return sortScanCols(pScanPhysiNode->scan.pScanCols);
  }
  return cloneCode;
}
1799

1800
static int32_t createVirtualTableScanPhysiNodeFinalize(SPhysiPlanContext* pCxt,
2,362✔
1801
                                                       SNodeList* pChild,
1802
                                                       SVirtualScanLogicNode* pScanLogicNode,
1803
                                                       SVirtualScanPhysiNode* pScanPhysiNode,
1804
                                                       SPhysiNode** pPhyNode) {
1805
  int32_t code = TSDB_CODE_SUCCESS;
2,362✔
1806

1807
  PLAN_ERR_JRET(createVirtualScanCols(pCxt, pScanPhysiNode, pScanLogicNode->pScanCols));
2,362!
1808
  PLAN_ERR_JRET(addDataBlockSlots(pCxt, pScanPhysiNode->scan.pScanCols, pScanPhysiNode->scan.node.pOutputDataBlockDesc));
2,362!
1809
  if (NULL != pScanLogicNode->pScanPseudoCols) {
2,362✔
1810
    pScanPhysiNode->scan.pScanPseudoCols = NULL;
166✔
1811
    PLAN_ERR_JRET(nodesCloneList(pScanLogicNode->pScanPseudoCols, &pScanPhysiNode->scan.pScanPseudoCols));
166!
1812
    PLAN_ERR_JRET(addDataBlockSlots(pCxt, pScanPhysiNode->scan.pScanPseudoCols, pScanPhysiNode->scan.node.pOutputDataBlockDesc));
166!
1813
  }
1814

1815
  PLAN_ERR_JRET(setConditionsSlotId(pCxt, (const SLogicNode*)pScanLogicNode, (SPhysiNode*)pScanPhysiNode));
2,362!
1816

1817
  pScanPhysiNode->scan.uid = pScanLogicNode->tableId;
2,362✔
1818
  pScanPhysiNode->scan.suid = pScanLogicNode->stableId;
2,362✔
1819
  pScanPhysiNode->scan.tableType = pScanLogicNode->tableType;
2,362✔
1820
  memcpy(&pScanPhysiNode->scan.tableName, &pScanLogicNode->tableName, sizeof(SName));
2,362✔
1821
  pScanPhysiNode->scanAllCols = pScanLogicNode->scanAllCols;
2,362✔
1822

1823
  *pPhyNode = (SPhysiNode*)pScanPhysiNode;
2,362✔
1824
  return code;
2,362✔
1825

1826
_return:
×
1827
  nodesDestroyNode((SNode*)pScanPhysiNode);
×
1828
  return code;
×
1829
}
1830

1831
static int32_t createVirtualTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SNodeList* pChildren,
2,362✔
1832
                                               SVirtualScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) {
1833
  int32_t                 code = TSDB_CODE_SUCCESS;
2,362✔
1834
  SVirtualScanPhysiNode * pVirtualScan =
1835
      (SVirtualScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN);
2,362✔
1836
  if (NULL == pVirtualScan) {
2,362!
1837
    return terrno;
×
1838
  }
1839

1840
  if (pScanLogicNode->pVgroupList) {
2,362!
1841
    vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
2,362✔
1842
    pSubplan->execNodeStat.tableNum = pScanLogicNode->pVgroupList->vgroups[0].numOfTable;
2,362✔
1843
  }
1844

1845
  (void)tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
2,362✔
1846
  PLAN_ERR_RET(createVirtualTableScanPhysiNodeFinalize(pCxt, pChildren, pScanLogicNode, (SVirtualScanPhysiNode*)pVirtualScan, pPhyNode));
2,362!
1847
  PLAN_ERR_RET(setMultiBlockSlotId(pCxt, pChildren, true, pScanLogicNode->node.pTargets, &pVirtualScan->pTargets));
2,362!
1848
  return code;
2,362✔
1849
}
1850

1851
static int32_t createGroupCachePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
×
1852
                                         SGroupCacheLogicNode* pLogicNode, SPhysiNode** pPhyNode) {
1853
  SGroupCachePhysiNode* pGrpCache =
1854
      (SGroupCachePhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pLogicNode, QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE);
×
1855
  if (NULL == pGrpCache) {
×
1856
    return terrno;
×
1857
  }
1858

1859
  pGrpCache->grpColsMayBeNull = pLogicNode->grpColsMayBeNull;
×
1860
  pGrpCache->grpByUid = pLogicNode->grpByUid;
×
1861
  pGrpCache->globalGrp = pLogicNode->globalGrp;
×
1862
  pGrpCache->batchFetch = pLogicNode->batchFetch;
×
1863
  SDataBlockDescNode* pChildDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc;
×
1864
  int32_t             code = TSDB_CODE_SUCCESS;
×
1865
  /*
1866
    if (TSDB_CODE_SUCCESS == code) {
1867
      code = setListSlotId(pCxt, pChildDesc->dataBlockId, -1, pLogicNode->pGroupCols, &pGrpCache->pGroupCols);
1868
    }
1869
  */
1870

1871
  *pPhyNode = (SPhysiNode*)pGrpCache;
×
1872

1873
  return code;
×
1874
}
1875

1876
static int32_t updateDynQueryCtrlStbJoinInfo(SPhysiPlanContext* pCxt, SNodeList* pChildren,
×
1877
                                             SDynQueryCtrlLogicNode* pLogicNode, SDynQueryCtrlPhysiNode* pDynCtrl) {
1878
  SDataBlockDescNode* pPrevDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc;
×
1879
  SNodeList*          pVgList = NULL;
×
1880
  SNodeList*          pUidList = NULL;
×
1881
  int32_t             code = setListSlotId(pCxt, pPrevDesc->dataBlockId, -1, pLogicNode->stbJoin.pVgList, &pVgList);
×
1882
  if (TSDB_CODE_SUCCESS == code) {
×
1883
    code = setListSlotId(pCxt, pPrevDesc->dataBlockId, -1, pLogicNode->stbJoin.pUidList, &pUidList);
×
1884
  }
1885
  if (TSDB_CODE_SUCCESS == code) {
×
1886
    memcpy(pDynCtrl->stbJoin.srcScan, pLogicNode->stbJoin.srcScan, sizeof(pDynCtrl->stbJoin.srcScan));
×
1887

1888
    SNode*  pNode = NULL;
×
1889
    int32_t i = 0;
×
1890
    FOREACH(pNode, pVgList) {
×
1891
      pDynCtrl->stbJoin.vgSlot[i] = ((SColumnNode*)pNode)->slotId;
×
1892
      ++i;
×
1893
    }
1894
    i = 0;
×
1895
    FOREACH(pNode, pUidList) {
×
1896
      pDynCtrl->stbJoin.uidSlot[i] = ((SColumnNode*)pNode)->slotId;
×
1897
      ++i;
×
1898
    }
1899
    pDynCtrl->stbJoin.batchFetch = pLogicNode->stbJoin.batchFetch;
×
1900
  }
1901
  nodesDestroyList(pVgList);
×
1902
  nodesDestroyList(pUidList);
×
1903

1904
  return code;
×
1905
}
1906

1907
static int32_t updateDynQueryCtrlVtbScanInfo(SPhysiPlanContext* pCxt, SNodeList* pChildren,
×
1908
                                             SDynQueryCtrlLogicNode* pLogicNode, SDynQueryCtrlPhysiNode* pDynCtrl,
1909
                                             SSubplan* pSubPlan) {
1910
  int32_t code = TSDB_CODE_SUCCESS;
×
1911

1912
  if (pLogicNode->vtbScan.pVgroupList) {
×
1913
    vgroupInfoToNodeAddr(pLogicNode->vtbScan.pVgroupList->vgroups, &pSubPlan->execNode);
×
1914
    pSubPlan->execNodeStat.tableNum = pLogicNode->vtbScan.pVgroupList->vgroups[0].numOfTable;
×
1915
  }
1916

1917
  PLAN_ERR_JRET(nodesCloneList(pLogicNode->node.pTargets, &pDynCtrl->vtbScan.pScanCols));
×
1918

1919
  pDynCtrl->vtbScan.scanAllCols = pLogicNode->vtbScan.scanAllCols;
×
1920
  pDynCtrl->vtbScan.suid = pLogicNode->vtbScan.suid;
×
1921
  pDynCtrl->vtbScan.mgmtEpSet = pCxt->pPlanCxt->mgmtEpSet;
×
1922
  pDynCtrl->vtbScan.accountId = pCxt->pPlanCxt->acctId;
×
1923
  tstrncpy(pDynCtrl->vtbScan.dbName, pLogicNode->vtbScan.dbName, TSDB_DB_NAME_LEN);
×
1924
  tstrncpy(pDynCtrl->vtbScan.stbName, pLogicNode->vtbScan.stbName, TSDB_TABLE_NAME_LEN);
×
1925

1926
  return code;
×
1927
_return:
×
1928
  planError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1929
  return code;
×
1930
}
1931

1932

1933
static int32_t createDynQueryCtrlPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
×
1934
                                           SDynQueryCtrlLogicNode* pLogicNode, SPhysiNode** pPhyNode, SSubplan* pSubPlan) {
1935
  int32_t                 code = TSDB_CODE_SUCCESS;
×
1936
  int32_t                 lino = 0;
×
1937
  SDynQueryCtrlPhysiNode* pDynCtrl =
1938
      (SDynQueryCtrlPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pLogicNode, QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL);
×
1939
  QUERY_CHECK_NULL(pDynCtrl, code, lino, _return, terrno);
×
1940

1941
  switch (pLogicNode->qType) {
×
1942
    case DYN_QTYPE_STB_HASH:
×
1943
      PLAN_ERR_JRET(updateDynQueryCtrlStbJoinInfo(pCxt, pChildren, pLogicNode, pDynCtrl));
×
1944
      break;
×
1945
    case DYN_QTYPE_VTB_SCAN:
×
1946
      PLAN_ERR_JRET(updateDynQueryCtrlVtbScanInfo(pCxt, pChildren, pLogicNode, pDynCtrl, pSubPlan));
×
1947
      break;
×
1948
    default:
×
1949
      PLAN_ERR_JRET(TSDB_CODE_PLAN_INVALID_DYN_CTRL_TYPE);
×
1950
  }
1951

1952
  pDynCtrl->qType = pLogicNode->qType;
×
1953
  pDynCtrl->dynTbname = pLogicNode->dynTbname;
×
1954
  *pPhyNode = (SPhysiNode*)pDynCtrl;
×
1955

1956
  return code;
×
1957
_return:
×
1958
  planError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1959
  return code;
×
1960
}
1961

1962
// Context passed to the precalc-expression rewrite callbacks (doRewritePrecalcExprs and
// collectAndRewrite).
typedef struct SRewritePrecalcExprsCxt {
1963
  int32_t    errCode;  // status of the last clone/append/make step; returned by rewritePrecalcExprs
1964
  int32_t    planNodeId;  // first number of the generated "#expr_%d_%d" alias
1965
  int32_t    rewriteId;  // second number of the generated "#expr_%d_%d" alias
1966
  SNodeList* pPrecalcExprs;  // receives a clone of every expression that is replaced by a column
1967
} SRewritePrecalcExprsCxt;
1968

1969
/*
 * Moves the expression at *pNode into the precalc list and replaces it with a column reference.
 *
 * A clone of *pNode is appended to pCxt->pPrecalcExprs. *pNode is then destroyed and replaced by
 * a new column node that carries the expression's result type and is named after the expression's
 * alias. If the expression has no alias, one of the form "#expr_<planNodeId>_<rewriteId>" is
 * generated and stored on the clone first.
 *
 * Returns DEAL_RES_IGNORE_CHILD on success. On failure pCxt->errCode holds the failing status,
 * *pNode is left untouched, and DEAL_RES_ERROR is returned.
 *
 * The column node is allocated before the clone is handed to the list, so no failure path ever
 * destroys a node that the list already owns.
 *
 * NOTE(review): the generated alias is only unique if planNodeId/rewriteId differ between
 * rewrites; rewritePrecalcExprs leaves both at zero - confirm whether collisions matter.
 */
static EDealRes collectAndRewrite(SRewritePrecalcExprsCxt* pCxt, SNode** pNode) {
  SNode* pExpr = NULL;
  pCxt->errCode = nodesCloneNode(*pNode, &pExpr);
  if (NULL == pExpr) {
    return DEAL_RES_ERROR;
  }

  SColumnNode* pCol = NULL;
  pCxt->errCode = nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pCol);
  if (NULL == pCol) {
    nodesDestroyNode(pExpr);  // not yet in the list, still owned here
    return DEAL_RES_ERROR;
  }

  if (TSDB_CODE_SUCCESS != (pCxt->errCode = nodesListAppend(pCxt->pPrecalcExprs, pExpr))) {
    nodesDestroyNode((SNode*)pCol);
    nodesDestroyNode(pExpr);
    return DEAL_RES_ERROR;
  }

  SExprNode* pRewrittenExpr = (SExprNode*)pExpr;
  pCol->node.resType = pRewrittenExpr->resType;
  if ('\0' == pRewrittenExpr->aliasName[0]) {
    snprintf(pRewrittenExpr->aliasName, sizeof(pRewrittenExpr->aliasName), "#expr_%d_%d", pCxt->planNodeId,
             pCxt->rewriteId);
  }
  tstrncpy(pCol->colName, pRewrittenExpr->aliasName, TSDB_COL_NAME_LEN);

  nodesDestroyNode(*pNode);
  *pNode = (SNode*)pCol;
  return DEAL_RES_IGNORE_CHILD;
}
1998

1999
/*
 * Wraps the value node at *pNode into an assignment operator.
 *
 * A new OP_TYPE_ASSIGN operator is created whose left operand is a fresh QUERY_NODE_LEFT_VALUE
 * node and whose right operand is the original value node; result type and alias are copied from
 * the value. On success *pNode points to the operator. On failure *pNode is unchanged and the
 * node-creation status is returned.
 */
static int32_t rewriteValueToOperator(SRewritePrecalcExprsCxt* pCxt, SNode** pNode) {
  SOperatorNode* pAssign = NULL;
  int32_t        code = nodesMakeNode(QUERY_NODE_OPERATOR, (SNode**)&pAssign);
  if (NULL == pAssign) {
    return code;
  }

  pAssign->pLeft = NULL;
  code = nodesMakeNode(QUERY_NODE_LEFT_VALUE, &pAssign->pLeft);
  if (NULL == pAssign->pLeft) {
    nodesDestroyNode((SNode*)pAssign);
    return code;
  }

  SValueNode* pValue = (SValueNode*)*pNode;
  pAssign->opType = OP_TYPE_ASSIGN;
  pAssign->node.resType = pValue->node.resType;
  tstrncpy(pAssign->node.aliasName, pValue->node.aliasName, TSDB_COL_NAME_LEN);

  // the operator takes over the original value node as its right operand
  pAssign->pRight = *pNode;
  *pNode = (SNode*)pAssign;
  return TSDB_CODE_SUCCESS;
}
2019

2020
/*
 * Rewrite callback: decides whether the node at *pNode must be pre-calculated.
 *
 * - value nodes (unless flagged notReserved) are wrapped into an assignment operator and then
 *   collected;
 * - operators, logic conditions and CASE WHEN nodes are collected;
 * - functions are collected only when they are scalar functions;
 * - everything else is left alone and traversal continues.
 *
 * pContext is the SRewritePrecalcExprsCxt of the current rewrite.
 */
static EDealRes doRewritePrecalcExprs(SNode** pNode, void* pContext) {
  SRewritePrecalcExprsCxt* pCxt = (SRewritePrecalcExprsCxt*)pContext;
  const int32_t            type = nodeType(*pNode);

  if (QUERY_NODE_VALUE == type) {
    if (((SValueNode*)*pNode)->notReserved) {
      return DEAL_RES_CONTINUE;
    }
    pCxt->errCode = rewriteValueToOperator(pCxt, pNode);
    if (TSDB_CODE_SUCCESS != pCxt->errCode) {
      return DEAL_RES_ERROR;
    }
    return collectAndRewrite(pCxt, pNode);
  }

  if (QUERY_NODE_OPERATOR == type || QUERY_NODE_LOGIC_CONDITION == type || QUERY_NODE_CASE_WHEN == type) {
    return collectAndRewrite(pCxt, pNode);
  }

  if (QUERY_NODE_FUNCTION == type && fmIsScalarFunc(((SFunctionNode*)(*pNode))->funcId)) {
    return collectAndRewrite(pCxt, pNode);
  }

  // non-scalar functions and all other node types are only traversed
  return DEAL_RES_CONTINUE;
}
2048

2049
/*
 * Splits the expressions of pList into pre-calculated expressions and rewritten references.
 *
 * Every node of pList is cloned into *pRewrittenList (for a grouping set, its first parameter is
 * cloned instead). The clones are then rewritten with doRewritePrecalcExprs, which moves the
 * expressions that need pre-calculation into *pPrecalcExprs and leaves column references behind.
 * Both output lists are created on demand. If no expression was collected, or the rewrite failed,
 * *pPrecalcExprs is destroyed.
 *
 * A NULL pList is a no-op that succeeds. Returns the first failing clone/append status or the
 * error recorded by the rewrite.
 */
static int32_t rewritePrecalcExprs(SPhysiPlanContext* pCxt, SNodeList* pList, SNodeList** pPrecalcExprs,
                                   SNodeList** pRewrittenList) {
  if (NULL == pList) {
    return TSDB_CODE_SUCCESS;
  }
  int32_t code = 0;
  if (NULL == *pPrecalcExprs) {
    code = nodesMakeList(pPrecalcExprs);
    if (NULL == *pPrecalcExprs) {
      return code;
    }
  }
  if (NULL == *pRewrittenList) {
    code = nodesMakeList(pRewrittenList);
    if (NULL == *pRewrittenList) {
      return code;
    }
  }
  SNode* pNode = NULL;
  FOREACH(pNode, pList) {
    SNode* pNew = NULL;
    if (QUERY_NODE_GROUPING_SET == nodeType(pNode)) {
      code = nodesCloneNode(nodesListGetNode(((SGroupingSetNode*)pNode)->pParameterList, 0), &pNew);
    } else {
      code = nodesCloneNode(pNode, &pNew);
    }
    if (NULL == pNew) {
      return code;
    }
    if (TSDB_CODE_SUCCESS != (code = nodesListAppend(*pRewrittenList, pNew))) {
      // the clone never reached the list, so it must be released here
      nodesDestroyNode(pNew);
      return code;
    }
  }
  SRewritePrecalcExprsCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pPrecalcExprs = *pPrecalcExprs};
  nodesRewriteExprs(*pRewrittenList, doRewritePrecalcExprs, &cxt);
  if (0 == LIST_LENGTH(cxt.pPrecalcExprs) || TSDB_CODE_SUCCESS != cxt.errCode) {
    NODES_DESTORY_LIST(*pPrecalcExprs);
  }
  return cxt.errCode;
}
2089

2090
/*
 * Single-expression wrapper around rewritePrecalcExprs.
 *
 * pNode is wrapped in a temporary one-element list, rewritten, and the first rewritten node is handed back
 * through *pRewritten. A NULL pNode is a no-op that returns TSDB_CODE_SUCCESS and leaves *pRewritten untouched.
 *
 * On success the caller receives the rewritten node (callers in this file release it with nodesDestroyNode).
 * On failure *pRewritten is not written and every node cloned so far is destroyed here.
 */
static int32_t rewritePrecalcExpr(SPhysiPlanContext* pCxt, SNode* pNode, SNodeList** pPrecalcExprs,
                                  SNode** pRewritten) {
  if (NULL == pNode) {
    return TSDB_CODE_SUCCESS;
  }

  SNodeList* pList = NULL;
  SNodeList* pRewrittenList = NULL;
  int32_t    code = nodesListMakeAppend(&pList, pNode);
  if (TSDB_CODE_SUCCESS == code) {
    code = rewritePrecalcExprs(pCxt, pList, pPrecalcExprs, &pRewrittenList);
  }

  if (TSDB_CODE_SUCCESS == code) {
    // The rewritten node is handed to the caller, so only the list wrapper is dropped.
    *pRewritten = nodesListGetNode(pRewrittenList, 0);
    nodesClearList(pRewrittenList);
  } else {
    // Nobody takes over the clones on failure; destroy the list together with its nodes.
    nodesDestroyList(pRewrittenList);
  }

  // pNode is not a clone and stays with the caller, so the temporary list is cleared rather than destroyed.
  nodesClearList(pList);
  return code;
}
2109

2110
/*
 * Expression-walker callback: sets *(bool*)res to true and stops the walk as soon as a function node is found
 * whose funcId, or whose originalFuncId when hasOriginalFunc is set, satisfies fmIsCountLikeFunc.
 * Any other node lets the walk continue; *res is never reset to false here.
 */
static EDealRes hasCountLikeFunc(SNode* pNode, void* res) {
  if (QUERY_NODE_FUNCTION != nodeType(pNode)) {
    return DEAL_RES_CONTINUE;
  }

  SFunctionNode* pFuncNode = (SFunctionNode*)pNode;
  bool           countLike = fmIsCountLikeFunc(pFuncNode->funcId);
  if (!countLike && pFuncNode->hasOriginalFunc) {
    countLike = fmIsCountLikeFunc(pFuncNode->originalFuncId);
  }
  if (!countLike) {
    return DEAL_RES_CONTINUE;
  }

  *(bool*)res = true;
  return DEAL_RES_END;
}
2120

2121
/*
 * Returns true only when pNode is a dynamic query control physical node of type DYN_QTYPE_VTB_SCAN whose first
 * child exists and is a virtual table scan physical node. A NULL pNode yields false.
 */
static bool isDynVirtualStableScan(SNode* pNode) {
  if (NULL == pNode) {
    return false;
  }
  if (QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL != nodeType(pNode)) {
    return false;
  }

  SDynQueryCtrlPhysiNode* pCtrl = (SDynQueryCtrlPhysiNode*)pNode;
  if (DYN_QTYPE_VTB_SCAN != pCtrl->qType) {
    return false;
  }

  SNode* pFirstChild = nodesListGetNode(pCtrl->node.pChildren, 0);
  return NULL != pFirstChild && QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN == nodeType(pFirstChild);
}
2137

2138
/*
 * Fetch the output data block descriptor that an operator should read its input from.
 *
 * Normally this is the descriptor of the first node in pChildren. When that node is a dynamic query control
 * node wrapping a virtual table scan (see isDynVirtualStableScan), the descriptor of the wrapped scan - the
 * first child of the control node - is used instead.
 *
 * Returns TSDB_CODE_SUCCESS and stores the descriptor in *pChildTuple, or TSDB_CODE_INVALID_PARA (leaving
 * *pChildTuple untouched) when the required child node is missing.
 */
static int32_t getChildTuple(SDataBlockDescNode **pChildTuple, SNodeList* pChildren) {
  SPhysiNode* pChild = (SPhysiNode*)nodesListGetNode(pChildren, 0);
  if (NULL == pChild) {
    // LIST_LENGTH tolerates a NULL list, so logging cannot itself dereference NULL.
    planError("get child tuple failed, pChild is NULL, childrenNum:%d", LIST_LENGTH(pChildren));
    return TSDB_CODE_INVALID_PARA;
  }

  if (!isDynVirtualStableScan((SNode*)pChild)) {
    *pChildTuple = pChild->pOutputDataBlockDesc;
    return TSDB_CODE_SUCCESS;
  }

  // isDynVirtualStableScan already saw a non-NULL first child; the check is kept as a defensive guard.
  SPhysiNode* pVtbScan = (SPhysiNode*)nodesListGetNode(pChild->pChildren, 0);
  if (NULL == pVtbScan) {
    planError("get child tuple failed, pVtbScan is NULL, dynNode childrenNum:%d", LIST_LENGTH(pChild->pChildren));
    return TSDB_CODE_INVALID_PARA;
  }
  *pChildTuple = pVtbScan->pOutputDataBlockDesc;
  return TSDB_CODE_SUCCESS;
}
2161

2162
static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SAggLogicNode* pAggLogicNode,
17,056✔
2163
                                  SPhysiNode** pPhyNode, SSubplan* pSubPlan) {
2164
  SAggPhysiNode* pAgg =
2165
      (SAggPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pAggLogicNode, QUERY_NODE_PHYSICAL_PLAN_HASH_AGG);
17,056✔
2166
  if (NULL == pAgg) {
17,066!
2167
    return terrno;
×
2168
  }
2169
  if (pAgg->node.pSlimit && ((SLimitNode*)pAgg->node.pSlimit)->limit) {
17,066!
2170
    pSubPlan->dynamicRowThreshold = true;
23✔
2171
    pSubPlan->rowsThreshold = ((SLimitNode*)pAgg->node.pSlimit)->limit->datum.i;
23✔
2172
  }
2173

2174
  pAgg->mergeDataBlock = (GROUP_ACTION_KEEP == pAggLogicNode->node.groupAction ? false : true);
17,066✔
2175
  pAgg->groupKeyOptimized = pAggLogicNode->hasGroupKeyOptimized;
17,066✔
2176
  pAgg->node.forceCreateNonBlockingOptr = pAggLogicNode->node.forceCreateNonBlockingOptr;
17,066✔
2177

2178
  SNodeList* pPrecalcExprs = NULL;
17,066✔
2179
  SNodeList* pGroupKeys = NULL;
17,066✔
2180
  SNodeList* pAggFuncs = NULL;
17,066✔
2181
  int32_t    code = rewritePrecalcExprs(pCxt, pAggLogicNode->pGroupKeys, &pPrecalcExprs, &pGroupKeys);
17,066✔
2182
  if (TSDB_CODE_SUCCESS == code) {
17,058!
2183
    code = rewritePrecalcExprs(pCxt, pAggLogicNode->pAggFuncs, &pPrecalcExprs, &pAggFuncs);
17,061✔
2184
  }
2185
  nodesWalkExprs(pAggFuncs, hasCountLikeFunc, &pAgg->hasCountLikeFunc);
17,059✔
2186

2187
  SDataBlockDescNode* pChildTupe = NULL;
17,068✔
2188
  if (TSDB_CODE_SUCCESS == code) {
17,068!
2189
    code = getChildTuple(&pChildTupe, pChildren);
17,068✔
2190
  }
2191

2192
  // push down expression to pOutputDataBlockDesc of child node
2193
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
17,066!
2194
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pAgg->pExprs);
844✔
2195
    if (TSDB_CODE_SUCCESS == code) {
844!
2196
      code = pushdownDataBlockSlots(pCxt, pAgg->pExprs, pChildTupe);
844✔
2197
    }
2198
  }
2199

2200
  if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) {
17,066✔
2201
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pGroupKeys, &pAgg->pGroupKeys);
1,277✔
2202
    if (TSDB_CODE_SUCCESS == code) {
1,277!
2203
      code = addDataBlockSlots(pCxt, pAgg->pGroupKeys, pAgg->node.pOutputDataBlockDesc);
1,277✔
2204
    }
2205
  }
2206

2207
  if (TSDB_CODE_SUCCESS == code && NULL != pAggFuncs) {
17,066✔
2208
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pAggFuncs, &pAgg->pAggFuncs);
16,115✔
2209
    if (TSDB_CODE_SUCCESS == code) {
16,114!
2210
      code = addDataBlockSlots(pCxt, pAgg->pAggFuncs, pAgg->node.pOutputDataBlockDesc);
16,115✔
2211
    }
2212
  }
2213

2214
  if (TSDB_CODE_SUCCESS == code) {
17,068✔
2215
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pAggLogicNode, (SPhysiNode*)pAgg);
17,065✔
2216
  }
2217

2218
  if (TSDB_CODE_SUCCESS == code) {
17,065!
2219
    *pPhyNode = (SPhysiNode*)pAgg;
17,065✔
2220
  } else {
2221
    nodesDestroyNode((SNode*)pAgg);
×
2222
  }
2223

2224
  nodesDestroyList(pPrecalcExprs);
17,065✔
2225
  nodesDestroyList(pGroupKeys);
17,065✔
2226
  nodesDestroyList(pAggFuncs);
17,064✔
2227

2228
  return code;
17,064✔
2229
}
2230

2231
static int32_t createIndefRowsFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
48✔
2232
                                            SIndefRowsFuncLogicNode* pFuncLogicNode, SPhysiNode** pPhyNode) {
2233
  SIndefRowsFuncPhysiNode* pIdfRowsFunc = (SIndefRowsFuncPhysiNode*)makePhysiNode(
48✔
2234
      pCxt, (SLogicNode*)pFuncLogicNode, QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC);
2235
  if (NULL == pIdfRowsFunc) {
48!
2236
    return terrno;
×
2237
  }
2238

2239
  SNodeList* pPrecalcExprs = NULL;
48✔
2240
  SNodeList* pFuncs = NULL;
48✔
2241
  int32_t    code = rewritePrecalcExprs(pCxt, pFuncLogicNode->pFuncs, &pPrecalcExprs, &pFuncs);
48✔
2242

2243
  if (pIdfRowsFunc->node.inputTsOrder == 0) {
48!
2244
    // default to asc
2245
    pIdfRowsFunc->node.inputTsOrder = TSDB_ORDER_ASC;
×
2246
  }
2247

2248
  SDataBlockDescNode* pChildTupe = NULL;
48✔
2249
  if (TSDB_CODE_SUCCESS == code) {
48!
2250
    code = getChildTuple(&pChildTupe, pChildren);
48✔
2251
  }
2252

2253
  // push down expression to pOutputDataBlockDesc of child node
2254
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
48!
2255
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pIdfRowsFunc->pExprs);
3✔
2256
    if (TSDB_CODE_SUCCESS == code) {
3!
2257
      code = pushdownDataBlockSlots(pCxt, pIdfRowsFunc->pExprs, pChildTupe);
3✔
2258
    }
2259
  }
2260

2261
  if (TSDB_CODE_SUCCESS == code) {
48!
2262
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs, &pIdfRowsFunc->pFuncs);
48✔
2263
    if (TSDB_CODE_SUCCESS == code) {
48!
2264
      code = addDataBlockSlots(pCxt, pIdfRowsFunc->pFuncs, pIdfRowsFunc->node.pOutputDataBlockDesc);
48✔
2265
    }
2266
  }
2267

2268
  if (TSDB_CODE_SUCCESS == code) {
48!
2269
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pFuncLogicNode, (SPhysiNode*)pIdfRowsFunc);
48✔
2270
  }
2271

2272
  if (TSDB_CODE_SUCCESS == code) {
48!
2273
    *pPhyNode = (SPhysiNode*)pIdfRowsFunc;
48✔
2274
  } else {
2275
    nodesDestroyNode((SNode*)pIdfRowsFunc);
×
2276
  }
2277

2278
  nodesDestroyList(pPrecalcExprs);
48✔
2279
  nodesDestroyList(pFuncs);
48✔
2280

2281
  return code;
48✔
2282
}
2283

2284
static int32_t createInterpFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
331✔
2285
                                         SInterpFuncLogicNode* pFuncLogicNode, SPhysiNode** pPhyNode) {
2286
  SInterpFuncPhysiNode* pInterpFunc = (SInterpFuncPhysiNode*)makePhysiNode(
331✔
2287
      pCxt, (SLogicNode*)pFuncLogicNode, QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC);
2288
  if (NULL == pInterpFunc) {
331!
2289
    return terrno;
×
2290
  }
2291

2292
  SNodeList* pPrecalcExprs = NULL;
331✔
2293
  SNodeList* pFuncs = NULL;
331✔
2294
  int32_t    code = rewritePrecalcExprs(pCxt, pFuncLogicNode->pFuncs, &pPrecalcExprs, &pFuncs);
331✔
2295

2296
  SDataBlockDescNode* pChildTupe = NULL;
331✔
2297
  if (TSDB_CODE_SUCCESS == code) {
331!
2298
    code = getChildTuple(&pChildTupe, pChildren);
331✔
2299
  }
2300

2301
  // push down expression to pOutputDataBlockDesc of child node
2302
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
331!
2303
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pInterpFunc->pExprs);
×
2304
    if (TSDB_CODE_SUCCESS == code) {
×
2305
      code = pushdownDataBlockSlots(pCxt, pInterpFunc->pExprs, pChildTupe);
×
2306
    }
2307
  }
2308

2309
  if (TSDB_CODE_SUCCESS == code) {
331!
2310
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs, &pInterpFunc->pFuncs);
331✔
2311
    if (TSDB_CODE_SUCCESS == code) {
331!
2312
      code = addDataBlockSlots(pCxt, pInterpFunc->pFuncs, pInterpFunc->node.pOutputDataBlockDesc);
331✔
2313
    }
2314
  }
2315

2316
  if (TSDB_CODE_SUCCESS == code) {
331!
2317
    pInterpFunc->timeRange = pFuncLogicNode->timeRange;
331✔
2318
    pInterpFunc->interval = pFuncLogicNode->interval;
331✔
2319
    pInterpFunc->fillMode = pFuncLogicNode->fillMode;
331✔
2320
    pInterpFunc->intervalUnit = pFuncLogicNode->intervalUnit;
331✔
2321
    pInterpFunc->precision = pFuncLogicNode->node.precision;
331✔
2322
    pInterpFunc->pFillValues = NULL;
331✔
2323
    pInterpFunc->rangeInterval = pFuncLogicNode->rangeInterval;
331✔
2324
    pInterpFunc->rangeIntervalUnit = pFuncLogicNode->rangeIntervalUnit;
331✔
2325
    code = nodesCloneNode(pFuncLogicNode->pFillValues, &pInterpFunc->pFillValues);
331✔
2326
  }
2327

2328
  if (TSDB_CODE_SUCCESS == code) {
331!
2329
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncLogicNode->pTimeSeries, &pInterpFunc->pTimeSeries);
331✔
2330
  }
2331

2332
  if (TSDB_CODE_SUCCESS == code) {
331!
2333
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pFuncLogicNode, (SPhysiNode*)pInterpFunc);
331✔
2334
  }
2335

2336
  if (TSDB_CODE_SUCCESS == code) {
331!
2337
    *pPhyNode = (SPhysiNode*)pInterpFunc;
331✔
2338
  } else {
2339
    nodesDestroyNode((SNode*)pInterpFunc);
×
2340
  }
2341

2342
  nodesDestroyList(pPrecalcExprs);
331✔
2343
  nodesDestroyList(pFuncs);
331✔
2344

2345
  return code;
331✔
2346
}
2347

2348
static int32_t createForecastFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
×
2349
                                           SForecastFuncLogicNode* pFuncLogicNode, SPhysiNode** pPhyNode) {
2350
  SForecastFuncPhysiNode* pForecastFunc =
2351
      (SForecastFuncPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pFuncLogicNode, QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC);
×
2352
  if (NULL == pForecastFunc) {
×
2353
    return terrno;
×
2354
  }
2355

2356
  SNodeList* pPrecalcExprs = NULL;
×
2357
  SNodeList* pFuncs = NULL;
×
2358
  int32_t    code = rewritePrecalcExprs(pCxt, pFuncLogicNode->pFuncs, &pPrecalcExprs, &pFuncs);
×
2359

2360
  SDataBlockDescNode* pChildTupe = NULL;
×
2361
  if (TSDB_CODE_SUCCESS == code) {
×
2362
    code = getChildTuple(&pChildTupe, pChildren);
×
2363
  }
2364

2365
  // push down expression to pOutputDataBlockDesc of child node
2366
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
×
2367
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pForecastFunc->pExprs);
×
2368
    if (TSDB_CODE_SUCCESS == code) {
×
2369
      code = pushdownDataBlockSlots(pCxt, pForecastFunc->pExprs, pChildTupe);
×
2370
    }
2371
  }
2372

2373
  if (TSDB_CODE_SUCCESS == code) {
×
2374
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs, &pForecastFunc->pFuncs);
×
2375
    if (TSDB_CODE_SUCCESS == code) {
×
2376
      code = addDataBlockSlots(pCxt, pForecastFunc->pFuncs, pForecastFunc->node.pOutputDataBlockDesc);
×
2377
    }
2378
  }
2379

2380
  if (TSDB_CODE_SUCCESS == code) {
×
2381
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pFuncLogicNode, (SPhysiNode*)pForecastFunc);
×
2382
  }
2383

2384
  if (TSDB_CODE_SUCCESS == code) {
×
2385
    *pPhyNode = (SPhysiNode*)pForecastFunc;
×
2386
  } else {
2387
    nodesDestroyNode((SNode*)pForecastFunc);
×
2388
  }
2389

2390
  nodesDestroyList(pPrecalcExprs);
×
2391
  nodesDestroyList(pFuncs);
×
2392

2393
  return code;
×
2394
}
2395

2396
/*
 * Decide the mergeDataBlock flag of a project node:
 *  - false when the group action is GROUP_ACTION_KEEP;
 *  - true when the result data order is DATA_ORDER_LEVEL_NONE, or the node does not have exactly one child;
 *  - otherwise true only if the single child's result data order is DATA_ORDER_LEVEL_GLOBAL.
 */
static bool projectCanMergeDataBlock(SProjectLogicNode* pProject) {
  if (GROUP_ACTION_KEEP == pProject->node.groupAction) {
    return false;
  }
  if (DATA_ORDER_LEVEL_NONE == pProject->node.resultDataOrder || 1 != LIST_LENGTH(pProject->node.pChildren)) {
    return true;
  }

  SLogicNode* pOnlyChild = (SLogicNode*)nodesListGetNode(pProject->node.pChildren, 0);
  return DATA_ORDER_LEVEL_GLOBAL == pOnlyChild->resultDataOrder;
}
2409

2410
/*
 * Tell whether a project node may merge unsorted data blocks:
 *  - false when its first child produces DATA_ORDER_LEVEL_GLOBAL ordered data;
 *  - false when the group action is GROUP_ACTION_KEEP;
 *  - true when the result data order is DATA_ORDER_LEVEL_NONE;
 *  - true when the node does not have exactly one child;
 *  - false otherwise.
 *
 * A project node without children is tolerated: the child-order check is skipped and the remaining rules apply.
 */
bool projectCouldMergeUnsortDataBlock(SProjectLogicNode* pProject) {
  SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pProject->node.pChildren, 0);
  // nodesListGetNode can yield NULL (no children); guard before reading the child's data order.
  if (NULL != pChild && DATA_ORDER_LEVEL_GLOBAL == pChild->resultDataOrder) {
    return false;
  }
  if (GROUP_ACTION_KEEP == pProject->node.groupAction) {
    return false;
  }
  if (DATA_ORDER_LEVEL_NONE == pProject->node.resultDataOrder) {
    return true;
  }
  if (1 != LIST_LENGTH(pProject->node.pChildren)) {
    return true;
  }
  return false;
}
2426

2427
static int32_t createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
11,924✔
2428
                                      SProjectLogicNode* pProjectLogicNode, SPhysiNode** pPhyNode) {
2429
  SProjectPhysiNode* pProject =
2430
      (SProjectPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pProjectLogicNode, QUERY_NODE_PHYSICAL_PLAN_PROJECT);
11,924✔
2431
  if (NULL == pProject) {
11,924!
2432
    return terrno;
×
2433
  }
2434

2435
  pProject->mergeDataBlock = projectCanMergeDataBlock(pProjectLogicNode);
11,924✔
2436
  pProject->ignoreGroupId = pProjectLogicNode->ignoreGroupId;
11,924✔
2437
  pProject->inputIgnoreGroup = pProjectLogicNode->inputIgnoreGroup;
11,924✔
2438

2439
  int32_t code = TSDB_CODE_SUCCESS;
11,924✔
2440
  if (0 == LIST_LENGTH(pChildren)) {
11,924!
2441
    code = nodesCloneList(pProjectLogicNode->pProjections, &pProject->pProjections);
8✔
2442
  } else {
2443
    code = setListSlotId(pCxt, ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc->dataBlockId, -1,
11,916✔
2444
                         pProjectLogicNode->pProjections, &pProject->pProjections);
11,916✔
2445
  }
2446
  if (TSDB_CODE_SUCCESS == code) {
11,923!
2447
    code = addDataBlockSlotsForProject(pCxt, pProjectLogicNode->stmtName, pProject->pProjections,
11,923✔
2448
                                       pProject->node.pOutputDataBlockDesc);
2449
  }
2450
  if (TSDB_CODE_SUCCESS == code) {
11,924!
2451
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pProjectLogicNode, (SPhysiNode*)pProject);
11,924✔
2452
  }
2453

2454
  if (TSDB_CODE_SUCCESS == code) {
11,924!
2455
    *pPhyNode = (SPhysiNode*)pProject;
11,924✔
2456
  } else {
2457
    nodesDestroyNode((SNode*)pProject);
×
2458
  }
2459

2460
  return code;
11,924✔
2461
}
2462

2463
static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode,
16,067✔
2464
                                         SPhysiNode** pPhyNode) {
2465
  SExchangePhysiNode* pExchange =
2466
      (SExchangePhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE);
16,067✔
2467
  if (NULL == pExchange) {
16,067!
2468
    return terrno;
×
2469
  }
2470

2471
  pExchange->srcStartGroupId = pExchangeLogicNode->srcStartGroupId;
16,067✔
2472
  pExchange->srcEndGroupId = pExchangeLogicNode->srcEndGroupId;
16,067✔
2473
  pExchange->seqRecvData = pExchangeLogicNode->seqRecvData;
16,067✔
2474
  pExchange->dynTbname = pExchangeLogicNode->dynTbname;
16,067✔
2475

2476
  int32_t code = setConditionsSlotId(pCxt, (const SLogicNode*)pExchangeLogicNode, (SPhysiNode*)pExchange);
16,067✔
2477
  if (TSDB_CODE_SUCCESS == code) {
16,067!
2478
    *pPhyNode = (SPhysiNode*)pExchange;
16,067✔
2479
  } else {
2480
    nodesDestroyNode((SNode*)pExchange);
×
2481
  }
2482

2483
  return code;
16,067✔
2484
}
2485

2486
static int32_t createStreamScanPhysiNodeByExchange(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode,
×
2487
                                                   SPhysiNode** pPhyNode) {
2488
  SScanPhysiNode* pScan =
2489
      (SScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
×
2490
  if (NULL == pScan) {
×
2491
    return terrno;
×
2492
  }
2493

2494
  int32_t code = TSDB_CODE_SUCCESS;
×
2495

2496
  pScan->pScanCols = NULL;
×
2497
  code = nodesCloneList(pExchangeLogicNode->node.pTargets, &pScan->pScanCols);
×
2498

2499
  if (TSDB_CODE_SUCCESS == code) {
×
2500
    code = sortScanCols(pScan->pScanCols);
×
2501
  }
2502

2503
  if (TSDB_CODE_SUCCESS == code) {
×
2504
    code = sortScanCols(pScan->pScanCols);
×
2505
  }
2506
  if (TSDB_CODE_SUCCESS == code) {
×
2507
    code = addDataBlockSlots(pCxt, pScan->pScanCols, pScan->node.pOutputDataBlockDesc);
×
2508
  }
2509
  if (TSDB_CODE_SUCCESS == code) {
×
2510
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pExchangeLogicNode, (SPhysiNode*)pScan);
×
2511
  }
2512
  if (TSDB_CODE_SUCCESS == code) {
×
2513
    SStreamScanPhysiNode* pTableScan = (SStreamScanPhysiNode*)pScan;
×
2514
    pTableScan->triggerType = pCxt->pPlanCxt->triggerType;
×
2515
  }
2516

2517
  if (TSDB_CODE_SUCCESS == code) {
×
2518
    *pPhyNode = (SPhysiNode*)pScan;
×
2519
  } else {
2520
    nodesDestroyNode((SNode*)pScan);
×
2521
  }
2522

2523
  return code;
×
2524
}
2525

2526
static int32_t createExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode,
16,067✔
2527
                                       SPhysiNode** pPhyNode) {
2528
  return doCreateExchangePhysiNode(pCxt, pExchangeLogicNode, pPhyNode);
16,067✔
2529
}
2530

2531
static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowPhysiNode* pWindow,
3,966✔
2532
                                             SWindowLogicNode* pWindowLogicNode) {
2533
  pWindow->triggerType = pWindowLogicNode->triggerType;
3,966✔
2534
  pWindow->watermark = pWindowLogicNode->watermark;
3,966✔
2535
  pWindow->deleteMark = pWindowLogicNode->deleteMark;
3,966✔
2536
  pWindow->igExpired = pWindowLogicNode->igExpired;
3,966✔
2537
  pWindow->indefRowsFunc = pWindowLogicNode->indefRowsFunc;
3,966✔
2538
  pWindow->mergeDataBlock = (GROUP_ACTION_KEEP == pWindowLogicNode->node.groupAction ? false : true);
3,966✔
2539
  pWindow->node.inputTsOrder = pWindowLogicNode->node.inputTsOrder;
3,966✔
2540
  pWindow->node.outputTsOrder = pWindowLogicNode->node.outputTsOrder;
3,966✔
2541
  if (nodeType(pWindow) == QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL) {
3,966✔
2542
    pWindow->node.inputTsOrder = pWindowLogicNode->node.outputTsOrder;
878✔
2543
  }
2544
  pWindow->recalculateInterval = pWindowLogicNode->recalculateInterval;
3,966✔
2545

2546
  SNodeList* pPrecalcExprs = NULL;
3,966✔
2547
  SNodeList* pFuncs = NULL;
3,966✔
2548
  SNodeList* pProjs = pWindowLogicNode->pProjs;
3,966✔
2549
  int32_t    code = rewritePrecalcExprs(pCxt, pWindowLogicNode->pFuncs, &pPrecalcExprs, &pFuncs);
3,966✔
2550

2551
  SDataBlockDescNode* pChildTupe = NULL;
3,966✔
2552
  if (TSDB_CODE_SUCCESS == code) {
3,966!
2553
    code = getChildTuple(&pChildTupe, pChildren);
3,966✔
2554
  }
2555
  // push down expression to pOutputDataBlockDesc of child node
2556
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
3,966!
2557
    SNodeList* pOutput;
2558
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pOutput);
9✔
2559
    if (TSDB_CODE_SUCCESS == code) {
9!
2560
      code = addDataBlockSlots(pCxt, pOutput, pChildTupe);
9✔
2561
    }
2562
    if (TSDB_CODE_SUCCESS == code) {
9!
2563
      if (pWindow->pExprs == NULL) {
9!
2564
        pWindow->pExprs = pOutput;
9✔
2565
      } else {
2566
        code = nodesListAppendList(pWindow->pExprs, pOutput);
×
2567
      }
2568
    }
2569
  }
2570

2571
  if (TSDB_CODE_SUCCESS == code) {
3,966!
2572
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pTspk, &pWindow->pTspk);
3,966✔
2573
  }
2574
  if (TSDB_CODE_SUCCESS == code && pWindowLogicNode->pTsEnd) {
3,966!
2575
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pTsEnd, &pWindow->pTsEnd);
76✔
2576
  }
2577

2578
  if (TSDB_CODE_SUCCESS == code && NULL != pFuncs) {
3,966!
2579
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs, &pWindow->pFuncs);
3,966✔
2580
    if (TSDB_CODE_SUCCESS == code) {
3,966!
2581
      code = addDataBlockSlots(pCxt, pWindow->pFuncs, pWindow->node.pOutputDataBlockDesc);
3,966✔
2582
    }
2583
  }
2584

2585
  if (TSDB_CODE_SUCCESS == code && NULL != pProjs) {
3,966!
2586
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pProjs, &pWindow->pProjs);
×
2587
    if (TSDB_CODE_SUCCESS == code) {
×
2588
      code = addDataBlockSlots(pCxt, pWindow->pProjs, pWindow->node.pOutputDataBlockDesc);
×
2589
    }
2590
  }
2591

2592
  if (TSDB_CODE_SUCCESS == code) {
3,966!
2593
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pWindowLogicNode, (SPhysiNode*)pWindow);
3,966✔
2594
  }
2595

2596
  nodesDestroyList(pPrecalcExprs);
3,966✔
2597
  nodesDestroyList(pFuncs);
3,966✔
2598

2599
  return code;
3,966✔
2600
}
2601

2602
/*
 * Map the window algorithm to the interval physical node type: INTERVAL_ALGO_MERGE selects the merge-aligned
 * interval; INTERVAL_ALGO_HASH and every other value select the hash interval.
 */
static ENodeType getIntervalOperatorType(EWindowAlgorithm windowAlgo) {
  if (INTERVAL_ALGO_MERGE == windowAlgo) {
    return QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL;
  }
  return QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL;
}
2613

2614
/*
 * Map the window algorithm to the external window physical node type: EXTERNAL_ALGO_MERGE selects the
 * merge-aligned external node; EXTERNAL_ALGO_HASH and every other value select the hash external node.
 */
static ENodeType getExternalOperatorType(EWindowAlgorithm windowAlgo) {
  if (EXTERNAL_ALGO_MERGE == windowAlgo) {
    return QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_EXTERNAL;
  }
  return QUERY_NODE_PHYSICAL_PLAN_HASH_EXTERNAL;
}
2625

2626
static int32_t createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
3,741✔
2627
                                       SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
2628
  SIntervalPhysiNode* pInterval = (SIntervalPhysiNode*)makePhysiNode(
3,741✔
2629
      pCxt, (SLogicNode*)pWindowLogicNode, getIntervalOperatorType(pWindowLogicNode->windowAlgo));
2630
  if (NULL == pInterval) {
3,741!
2631
    return terrno;
×
2632
  }
2633

2634
  pInterval->interval = pWindowLogicNode->interval;
3,741✔
2635
  pInterval->offset = pWindowLogicNode->offset;
3,741✔
2636
  pInterval->sliding = pWindowLogicNode->sliding;
3,741✔
2637
  pInterval->intervalUnit = pWindowLogicNode->intervalUnit;
3,741✔
2638
  pInterval->slidingUnit = pWindowLogicNode->slidingUnit;
3,741✔
2639
  pInterval->timeRange = pWindowLogicNode->timeRange;
3,741✔
2640

2641
  int32_t code = createWindowPhysiNodeFinalize(pCxt, pChildren, &pInterval->window, pWindowLogicNode);
3,741✔
2642
  if (TSDB_CODE_SUCCESS == code) {
3,741!
2643
    *pPhyNode = (SPhysiNode*)pInterval;
3,741✔
2644
  } else {
2645
    nodesDestroyNode((SNode*)pInterval);
×
2646
  }
2647

2648
  return code;
3,741✔
2649
}
2650

2651
static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
76✔
2652
                                            SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
2653
  SSessionWinodwPhysiNode* pSession = (SSessionWinodwPhysiNode*)makePhysiNode(
76✔
2654
      pCxt, (SLogicNode*)pWindowLogicNode, QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION);
2655
  if (NULL == pSession) {
76!
2656
    return terrno;
×
2657
  }
2658

2659
  pSession->gap = pWindowLogicNode->sessionGap;
76✔
2660

2661
  int32_t code = createWindowPhysiNodeFinalize(pCxt, pChildren, &pSession->window, pWindowLogicNode);
76✔
2662
  if (TSDB_CODE_SUCCESS == code) {
76!
2663
    *pPhyNode = (SPhysiNode*)pSession;
76✔
2664
  } else {
2665
    nodesDestroyNode((SNode*)pSession);
×
2666
  }
2667

2668
  return code;
76✔
2669
}
2670

2671
static int32_t createStateWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
55✔
2672
                                          SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
2673
  ENodeType type = QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE;
55✔
2674
  SStateWinodwPhysiNode* pState = (SStateWinodwPhysiNode*)makePhysiNode(
55✔
2675
      pCxt, (SLogicNode*)pWindowLogicNode, type);
2676
  if (NULL == pState) {
55!
2677
    return terrno;
×
2678
  }
2679

2680
  SNodeList* pPrecalcExprs = NULL;
55✔
2681
  SNode*     pStateKey = NULL;
55✔
2682
  int32_t    code = rewritePrecalcExpr(pCxt, pWindowLogicNode->pStateExpr, &pPrecalcExprs, &pStateKey);
55✔
2683

2684
  SDataBlockDescNode* pChildTupe = NULL;
55✔
2685
  if (TSDB_CODE_SUCCESS == code) {
55!
2686
    code = getChildTuple(&pChildTupe, pChildren);
55✔
2687
  }
2688
  // push down expression to pOutputDataBlockDesc of child node
2689
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
55!
2690
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pState->window.pExprs);
3✔
2691
    if (TSDB_CODE_SUCCESS == code) {
3!
2692
      code = addDataBlockSlots(pCxt, pState->window.pExprs, pChildTupe);
3✔
2693
    }
2694
  }
2695

2696
  if (TSDB_CODE_SUCCESS == code) {
55!
2697
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pStateKey, &pState->pStateKey);
55✔
2698
    // if (TSDB_CODE_SUCCESS == code) {
2699
    //   code = addDataBlockSlot(pCxt, &pState->pStateKey, pState->window.node.pOutputDataBlockDesc);
2700
    // }
2701
  }
2702

2703
  pState->trueForLimit = pWindowLogicNode->trueForLimit;
55✔
2704

2705
  if (TSDB_CODE_SUCCESS == code) {
55!
2706
    code = createWindowPhysiNodeFinalize(pCxt, pChildren, &pState->window, pWindowLogicNode);
55✔
2707
  }
2708

2709
  if (TSDB_CODE_SUCCESS == code) {
55!
2710
    *pPhyNode = (SPhysiNode*)pState;
55✔
2711
  } else {
2712
    nodesDestroyNode((SNode*)pState);
×
2713
  }
2714

2715
  nodesDestroyList(pPrecalcExprs);
55✔
2716
  nodesDestroyNode(pStateKey);
55✔
2717

2718
  return code;
55✔
2719
}
2720

2721
static int32_t createEventWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
56✔
2722
                                          SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
2723
  ENodeType type = QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT;
56✔
2724

2725
  SEventWinodwPhysiNode* pEvent = (SEventWinodwPhysiNode*)makePhysiNode(
56✔
2726
      pCxt, (SLogicNode*)pWindowLogicNode, type);
2727
  if (NULL == pEvent) {
56!
2728
    return terrno;
×
2729
  }
2730
  int32_t             code = TSDB_CODE_SUCCESS;
56✔
2731
  SDataBlockDescNode* pChildTupe = NULL;
56✔
2732
  if (TSDB_CODE_SUCCESS == code) {
56!
2733
    code = getChildTuple(&pChildTupe, pChildren);
56✔
2734
  }
2735
  if (TSDB_CODE_SUCCESS == code) {
56!
2736
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pStartCond, &pEvent->pStartCond);
56✔
2737
  }
2738
  if (TSDB_CODE_SUCCESS == code) {
56!
2739
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pEndCond, &pEvent->pEndCond);
56✔
2740
  }
2741
  pEvent->trueForLimit = pWindowLogicNode->trueForLimit;
56✔
2742
  if (TSDB_CODE_SUCCESS == code) {
56!
2743
    code = createWindowPhysiNodeFinalize(pCxt, pChildren, &pEvent->window, pWindowLogicNode);
56✔
2744
  }
2745

2746
  if (TSDB_CODE_SUCCESS == code) {
56!
2747
    *pPhyNode = (SPhysiNode*)pEvent;
56✔
2748
  } else {
2749
    nodesDestroyNode((SNode*)pEvent);
×
2750
  }
2751

2752
  return code;
56✔
2753
}
2754

2755
static int32_t createCountWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
38✔
2756
                                          SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
2757
  ENodeType type = QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT;
38✔
2758

2759
  SCountWindowPhysiNode* pCount = (SCountWindowPhysiNode*)makePhysiNode(
38✔
2760
      pCxt, (SLogicNode*)pWindowLogicNode, type);
2761
  if (NULL == pCount) {
38!
2762
    return terrno;
×
2763
  }
2764
  pCount->windowCount = pWindowLogicNode->windowCount;
38✔
2765
  pCount->windowSliding = pWindowLogicNode->windowSliding;
38✔
2766

2767
  int32_t code = createWindowPhysiNodeFinalize(pCxt, pChildren, &pCount->window, pWindowLogicNode);
38✔
2768
  if (TSDB_CODE_SUCCESS == code) {
38!
2769
    *pPhyNode = (SPhysiNode*)pCount;
38✔
2770
  } else {
2771
    nodesDestroyNode((SNode*)pCount);
×
2772
  }
2773

2774
  return code;
38✔
2775
}
2776

2777
static int32_t createAnomalyWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
×
2778
                                            SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
2779
  SAnomalyWindowPhysiNode* pAnomaly = (SAnomalyWindowPhysiNode*)makePhysiNode(
×
2780
      pCxt, (SLogicNode*)pWindowLogicNode, QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY);
2781
  if (NULL == pAnomaly) {
×
2782
    return terrno;
×
2783
  }
2784

2785
  SNodeList* pPrecalcExprs = NULL;
×
2786
  SNode*     pAnomalyKey = NULL;
×
2787
  int32_t    code = rewritePrecalcExpr(pCxt, pWindowLogicNode->pAnomalyExpr, &pPrecalcExprs, &pAnomalyKey);
×
2788

2789
  SDataBlockDescNode* pChildTupe = NULL;
×
2790
  if (TSDB_CODE_SUCCESS == code) {
×
2791
    code = getChildTuple(&pChildTupe, pChildren);
×
2792
  }
2793
  // push down expression to pOutputDataBlockDesc of child node
2794
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
×
2795
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pAnomaly->window.pExprs);
×
2796
    if (TSDB_CODE_SUCCESS == code) {
×
2797
      code = addDataBlockSlots(pCxt, pAnomaly->window.pExprs, pChildTupe);
×
2798
    }
2799
  }
2800

2801
  if (TSDB_CODE_SUCCESS == code) {
×
2802
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pAnomalyKey, &pAnomaly->pAnomalyKey);
×
2803
    // if (TSDB_CODE_SUCCESS == code) {
2804
    //   code = addDataBlockSlot(pCxt, &pAnomaly->pAnomalyKey, pAnomaly->window.node.pOutputDataBlockDesc);
2805
    // }
2806
  }
2807

2808
  tstrncpy(pAnomaly->anomalyOpt, pWindowLogicNode->anomalyOpt, sizeof(pAnomaly->anomalyOpt));
×
2809

2810
  if (TSDB_CODE_SUCCESS == code) {
×
2811
    code = createWindowPhysiNodeFinalize(pCxt, pChildren, &pAnomaly->window, pWindowLogicNode);
×
2812
  }
2813

2814
  if (TSDB_CODE_SUCCESS == code) {
×
2815
    *pPhyNode = (SPhysiNode*)pAnomaly;
×
2816
  } else {
2817
    nodesDestroyNode((SNode*)pAnomaly);
×
2818
  }
2819

2820
  nodesDestroyList(pPrecalcExprs);
×
2821
  nodesDestroyNode(pAnomalyKey);
×
2822

2823
  return code;
×
2824
}
2825

2826
static int32_t createExternalWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
×
2827
                                             SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
2828
  SExternalWindowPhysiNode* pExternal = (SExternalWindowPhysiNode*)makePhysiNode(
×
2829
      pCxt, (SLogicNode*)pWindowLogicNode, getExternalOperatorType(pWindowLogicNode->windowAlgo));
2830
  if (NULL == pExternal) {
×
2831
    return terrno;
×
2832
  }
2833

2834
  int32_t code = createWindowPhysiNodeFinalize(pCxt, pChildren, &pExternal->window, pWindowLogicNode);
×
2835
  if (TSDB_CODE_SUCCESS == code) {
×
2836
    *pPhyNode = (SPhysiNode*)pExternal;
×
2837
  } else {
2838
    nodesDestroyNode((SNode*)pExternal);
×
2839
  }
2840

2841
  return code;
×
2842
}
2843

2844
static int32_t createWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode,
3,966✔
2845
                                     SPhysiNode** pPhyNode) {
2846
  switch (pWindowLogicNode->winType) {
3,966!
2847
    case WINDOW_TYPE_INTERVAL:
3,741✔
2848
      return createIntervalPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
3,741✔
2849
    case WINDOW_TYPE_SESSION:
76✔
2850
      return createSessionWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
76✔
2851
    case WINDOW_TYPE_STATE:
55✔
2852
      return createStateWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
55✔
2853
    case WINDOW_TYPE_EVENT:
56✔
2854
      return createEventWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
56✔
2855
    case WINDOW_TYPE_COUNT:
38✔
2856
      return createCountWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
38✔
2857
    case WINDOW_TYPE_ANOMALY:
×
2858
      return createAnomalyWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
×
2859
    case WINDOW_TYPE_EXTERNAL:
×
2860
      return createExternalWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
×
2861
    default:
×
2862
      break;
×
2863
  }
2864
  return TSDB_CODE_FAILED;
×
2865
}
2866

2867
static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SSortLogicNode* pSortLogicNode,
2,143✔
2868
                                   SPhysiNode** pPhyNode) {
2869
  SSortPhysiNode* pSort = (SSortPhysiNode*)makePhysiNode(
2,143✔
2870
      pCxt, (SLogicNode*)pSortLogicNode,
2871
      pSortLogicNode->groupSort ? QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT : QUERY_NODE_PHYSICAL_PLAN_SORT);
2,143✔
2872
  if (NULL == pSort) {
2,143!
2873
    return terrno;
×
2874
  }
2875

2876
  SNodeList* pPrecalcExprs = NULL;
2,143✔
2877
  SNodeList* pSortKeys = NULL;
2,143✔
2878
  int32_t    code = rewritePrecalcExprs(pCxt, pSortLogicNode->pSortKeys, &pPrecalcExprs, &pSortKeys);
2,143✔
2879
  pSort->calcGroupId = pSortLogicNode->calcGroupId;
2,143✔
2880
  pSort->excludePkCol = pSortLogicNode->excludePkCol;
2,143✔
2881

2882
  SDataBlockDescNode* pChildTupe = NULL;
2,143✔
2883
  if (TSDB_CODE_SUCCESS == code) {
2,143!
2884
    code = getChildTuple(&pChildTupe, pChildren);
2,143✔
2885
  }
2886

2887
  // push down expression to pOutputDataBlockDesc of child node
2888
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
2,143!
2889
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pSort->pExprs);
319✔
2890
    if (TSDB_CODE_SUCCESS == code) {
319!
2891
      code = pushdownDataBlockSlots(pCxt, pSort->pExprs, pChildTupe);
319✔
2892
    }
2893
  }
2894

2895
  if (TSDB_CODE_SUCCESS == code) {
2,143!
2896
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pSortKeys, &pSort->pSortKeys);
2,143✔
2897
  }
2898

2899
  if (TSDB_CODE_SUCCESS == code) {
2,143!
2900
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pSortLogicNode->node.pTargets, &pSort->pTargets);
2,143✔
2901
    if (TSDB_CODE_SUCCESS == code) {
2,143!
2902
      code = addDataBlockSlots(pCxt, pSort->pTargets, pSort->node.pOutputDataBlockDesc);
2,143✔
2903
    }
2904
  }
2905

2906
  if (TSDB_CODE_SUCCESS == code) {
2,143!
2907
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pSortLogicNode, (SPhysiNode*)pSort);
2,143✔
2908
  }
2909

2910
  if (TSDB_CODE_SUCCESS == code) {
2,143!
2911
    *pPhyNode = (SPhysiNode*)pSort;
2,143✔
2912
  } else {
2913
    nodesDestroyNode((SNode*)pSort);
×
2914
  }
2915

2916
  nodesDestroyList(pPrecalcExprs);
2,143✔
2917
  nodesDestroyList(pSortKeys);
2,143✔
2918

2919
  return code;
2,143✔
2920
}
2921

2922
static int32_t createPartitionPhysiNodeImpl(SPhysiPlanContext* pCxt, SNodeList* pChildren,
179✔
2923
                                            SPartitionLogicNode* pPartLogicNode, ENodeType type,
2924
                                            SPhysiNode** pPhyNode) {
2925
  SPartitionPhysiNode* pPart = (SPartitionPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pPartLogicNode, type);
179✔
2926
  if (NULL == pPart) {
179!
2927
    return terrno;
×
2928
  }
2929

2930
  SNodeList* pPrecalcExprs = NULL;
179✔
2931
  SNodeList* pPartitionKeys = NULL;
179✔
2932
  int32_t    code = rewritePrecalcExprs(pCxt, pPartLogicNode->pPartitionKeys, &pPrecalcExprs, &pPartitionKeys);
179✔
2933
  pPart->needBlockOutputTsOrder = pPartLogicNode->needBlockOutputTsOrder;
179✔
2934
  SDataBlockDescNode* pChildTupe = NULL;
179✔
2935
  if (TSDB_CODE_SUCCESS == code) {
179!
2936
    code = getChildTuple(&pChildTupe, pChildren);
179✔
2937
  }
2938
  // push down expression to pOutputDataBlockDesc of child node
2939
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
179!
2940
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pPart->pExprs);
10✔
2941
    if (TSDB_CODE_SUCCESS == code) {
10!
2942
      code = pushdownDataBlockSlots(pCxt, pPart->pExprs, pChildTupe);
10✔
2943
    }
2944
  }
2945

2946
  if (TSDB_CODE_SUCCESS == code) {
179!
2947
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPartitionKeys, &pPart->pPartitionKeys);
179✔
2948
  }
2949

2950
  if (TSDB_CODE_SUCCESS == code) {
179!
2951
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPartLogicNode->node.pTargets, &pPart->pTargets);
179✔
2952
    if (TSDB_CODE_SUCCESS == code) {
179!
2953
      code = addDataBlockSlots(pCxt, pPart->pTargets, pPart->node.pOutputDataBlockDesc);
179✔
2954
    }
2955
  }
2956

2957
  if (pPart->needBlockOutputTsOrder) {
179✔
2958
    SNode* node;
2959
    bool   found = false;
35✔
2960
    FOREACH(node, pPartLogicNode->node.pTargets) {
35!
2961
      if (nodeType(node) == QUERY_NODE_COLUMN) {
35!
2962
        SColumnNode* pCol = (SColumnNode*)node;
35✔
2963
        if (pCol->tableId == pPartLogicNode->pkTsColTbId && pCol->colId == pPartLogicNode->pkTsColId) {
35!
2964
          pPart->tsSlotId = pCol->slotId;
35✔
2965
          found = true;
35✔
2966
          break;
35✔
2967
        }
2968
      }
2969
    }
2970
    if (!found) code = TSDB_CODE_PLAN_INTERNAL_ERROR;
35!
2971
  }
2972

2973
  if (TSDB_CODE_SUCCESS == code) {
179!
2974
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pPartLogicNode, (SPhysiNode*)pPart);
179✔
2975
  }
2976

2977
  if (TSDB_CODE_SUCCESS == code) {
179!
2978
    *pPhyNode = (SPhysiNode*)pPart;
179✔
2979
  } else {
2980
    nodesDestroyNode((SNode*)pPart);
×
2981
  }
2982

2983
  nodesDestroyList(pPrecalcExprs);
179✔
2984
  nodesDestroyList(pPartitionKeys);
179✔
2985

2986
  return code;
179✔
2987
}
2988

2989
static int32_t createPartitionPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
179✔
2990
                                        SPartitionLogicNode* pPartLogicNode, SPhysiNode** pPhyNode) {
2991
  return createPartitionPhysiNodeImpl(pCxt, pChildren, pPartLogicNode, QUERY_NODE_PHYSICAL_PLAN_PARTITION, pPhyNode);
179✔
2992
}
2993

2994
static int32_t createFillPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SFillLogicNode* pFillNode,
43✔
2995
                                   SPhysiNode** pPhyNode) {
2996
  SFillPhysiNode* pFill = (SFillPhysiNode*)makePhysiNode(
43✔
2997
      pCxt, (SLogicNode*)pFillNode, QUERY_NODE_PHYSICAL_PLAN_FILL);
2998
  if (NULL == pFill) {
43!
2999
    return terrno;
×
3000
  }
3001

3002
  pFill->mode = pFillNode->mode;
43✔
3003
  pFill->timeRange = pFillNode->timeRange;
43✔
3004
  TSWAP(pFill->pTimeRange, pFillNode->pTimeRange);
43✔
3005
  pFill->node.inputTsOrder = pFillNode->node.inputTsOrder;
43✔
3006
  int32_t             code = TSDB_CODE_SUCCESS;
43✔
3007
  SDataBlockDescNode* pChildTupe = NULL;
43✔
3008
  if (TSDB_CODE_SUCCESS == code) {
43!
3009
    code = getChildTuple(&pChildTupe, pChildren);
43✔
3010
  }
3011
  if (TSDB_CODE_SUCCESS == code) {
43!
3012
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFillNode->pFillExprs, &pFill->pFillExprs);
43✔
3013
  }
3014
  if (TSDB_CODE_SUCCESS == code) {
43!
3015
    code = addDataBlockSlots(pCxt, pFill->pFillExprs, pFill->node.pOutputDataBlockDesc);
43✔
3016
  }
3017
  if (TSDB_CODE_SUCCESS == code) {
43!
3018
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFillNode->pNotFillExprs, &pFill->pNotFillExprs);
43✔
3019
  }
3020
  if (TSDB_CODE_SUCCESS == code) {
43!
3021
    code = addDataBlockSlots(pCxt, pFill->pNotFillExprs, pFill->node.pOutputDataBlockDesc);
43✔
3022
  }
3023
  if (TSDB_CODE_SUCCESS == code && LIST_LENGTH(pFillNode->pFillNullExprs) > 0) {
43!
3024
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFillNode->pFillNullExprs, &pFill->pFillNullExprs);
×
3025
    if (TSDB_CODE_SUCCESS == code) {
×
3026
      code = addDataBlockSlots(pCxt, pFill->pFillNullExprs, pFill->node.pOutputDataBlockDesc);
×
3027
    }
3028
  }
3029

3030
  if (TSDB_CODE_SUCCESS == code) {
43!
3031
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pFillNode->pWStartTs, &pFill->pWStartTs);
43✔
3032
  }
3033
  if (TSDB_CODE_SUCCESS == code) {
43!
3034
    code = addDataBlockSlot(pCxt, &pFill->pWStartTs, pFill->node.pOutputDataBlockDesc);
43✔
3035
  }
3036

3037
  if (TSDB_CODE_SUCCESS == code && NULL != pFillNode->pValues) {
43!
3038
    code = nodesCloneNode(pFillNode->pValues, &pFill->pValues);
3✔
3039
  }
3040

3041
  if (TSDB_CODE_SUCCESS == code) {
43!
3042
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pFillNode, (SPhysiNode*)pFill);
43✔
3043
  }
3044

3045
  if (TSDB_CODE_SUCCESS == code) {
43!
3046
    *pPhyNode = (SPhysiNode*)pFill;
43✔
3047
  } else {
3048
    nodesDestroyNode((SNode*)pFill);
×
3049
  }
3050

3051
  return code;
43✔
3052
}
3053

3054
/*
 * Create one single-channel exchange node as a child of pMerge.
 *
 * The exchange reads source group (pMerge->srcGroupId + idx) for both its start
 * and end group id, gets a clone of the merge node's output data block
 * descriptor with every slot marked as output, and is appended to
 * pMerge->node.pChildren via nodesListMakeStrictAppend().
 *
 * Returns the code from node creation or cloning when either yields NULL,
 * otherwise the result of the append.
 */
static int32_t createExchangePhysiNodeByMerge(SMergePhysiNode* pMerge, int32_t idx) {
  SExchangePhysiNode* pExch = NULL;
  int32_t             code = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, (SNode**)&pExch);
  if (NULL == pExch) {
    return code;
  }

  pExch->singleChannel = true;
  pExch->srcStartGroupId = pMerge->srcGroupId + idx;
  pExch->srcEndGroupId = pMerge->srcGroupId + idx;
  pExch->node.pParent = (SPhysiNode*)pMerge;

  pExch->node.pOutputDataBlockDesc = NULL;
  code = nodesCloneNode((SNode*)pMerge->node.pOutputDataBlockDesc, (SNode**)&pExch->node.pOutputDataBlockDesc);
  if (NULL == pExch->node.pOutputDataBlockDesc) {
    nodesDestroyNode((SNode*)pExch);
    return code;
  }

  // every slot of the cloned descriptor is an output slot of the exchange
  SNode* pSlotNode = NULL;
  FOREACH(pSlotNode, pExch->node.pOutputDataBlockDesc->pSlots) {
    SSlotDescNode* pSlotDesc = (SSlotDescNode*)pSlotNode;
    pSlotDesc->output = true;
  }

  return nodesListMakeStrictAppend(&pMerge->node.pChildren, (SNode*)pExch);
}
3074

3075
static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SMergeLogicNode* pMergeLogicNode,
2,051✔
3076
                                    SPhysiNode** pPhyNode) {
3077
  int32_t          code = TSDB_CODE_SUCCESS;
2,051✔
3078
  SMergePhysiNode* pMerge =
3079
      (SMergePhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pMergeLogicNode, QUERY_NODE_PHYSICAL_PLAN_MERGE);
2,051✔
3080
  if (NULL == pMerge) {
2,051!
3081
    return terrno;
×
3082
  }
3083

3084
  if (pMergeLogicNode->colsMerge) {
2,051✔
3085
    pMerge->type = MERGE_TYPE_COLUMNS;
51✔
3086
  } else if (pMergeLogicNode->needSort) {
2,000!
3087
    pMerge->type = MERGE_TYPE_SORT;
2,000✔
3088
  } else {
3089
    pMerge->type = MERGE_TYPE_NON_SORT;
×
3090
  }
3091

3092
  pMerge->numOfChannels = pMergeLogicNode->numOfChannels;
2,051✔
3093
  pMerge->srcGroupId = pMergeLogicNode->srcGroupId;
2,051✔
3094
  pMerge->srcEndGroupId = pMergeLogicNode->srcEndGroupId;
2,051✔
3095
  pMerge->groupSort = pMergeLogicNode->groupSort;
2,051✔
3096
  pMerge->ignoreGroupId = pMergeLogicNode->ignoreGroupId;
2,051✔
3097
  pMerge->inputWithGroupId = pMergeLogicNode->inputWithGroupId;
2,051✔
3098

3099
  if (!pMergeLogicNode->colsMerge) {
2,051✔
3100
    PLAN_ERR_JRET(addDataBlockSlots(pCxt, pMergeLogicNode->pInputs, pMerge->node.pOutputDataBlockDesc));
2,000!
3101

3102
    for (int32_t j = 0; j < pMergeLogicNode->numOfSubplans; ++j) {
4,000✔
3103
      for (int32_t i = 0; i < pMerge->numOfChannels; ++i) {
7,008✔
3104
        PLAN_ERR_JRET(createExchangePhysiNodeByMerge(pMerge, j));
5,008!
3105
      }
3106
    }
3107

3108
    if (NULL != pMergeLogicNode->pMergeKeys) {
2,000✔
3109
      PLAN_ERR_JRET(setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->pMergeKeys,
1,991!
3110
                                  &pMerge->pMergeKeys));
3111
    }
3112

3113
    PLAN_ERR_JRET(setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->node.pTargets,
2,000!
3114
                                &pMerge->pTargets));
3115
    PLAN_ERR_JRET(addDataBlockSlots(pCxt, pMerge->pTargets, pMerge->node.pOutputDataBlockDesc));
2,000!
3116
  } else {
3117
    PLAN_ERR_JRET(setMultiBlockSlotId(pCxt, pChildren, false, pMergeLogicNode->node.pTargets, &pMerge->pTargets));
51!
3118
    PLAN_ERR_JRET(addDataBlockSlots(pCxt, pMerge->pTargets, pMerge->node.pOutputDataBlockDesc));
51!
3119
  }
3120

3121
  *pPhyNode = (SPhysiNode*)pMerge;
2,051✔
3122
  return code;
2,051✔
3123
_return:
×
3124
  nodesDestroyNode((SNode*)pMerge);
×
3125
  return code;
×
3126
}
3127

3128
static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, SSubplan* pSubplan,
96,662✔
3129
                                 SNodeList* pChildren, SPhysiNode** pPhyNode) {
3130
  switch (nodeType(pLogicNode)) {
96,662!
3131
    case QUERY_NODE_LOGIC_PLAN_SCAN:
37,868✔
3132
      return createScanPhysiNode(pCxt, pSubplan, (SScanLogicNode*)pLogicNode, pPhyNode);
37,868✔
3133
    case QUERY_NODE_LOGIC_PLAN_JOIN:
2,619✔
3134
      return createJoinPhysiNode(pCxt, pChildren, (SJoinLogicNode*)pLogicNode, pPhyNode);
2,619✔
3135
    case QUERY_NODE_LOGIC_PLAN_AGG:
17,063✔
3136
      return createAggPhysiNode(pCxt, pChildren, (SAggLogicNode*)pLogicNode, pPhyNode, pSubplan);
17,063✔
3137
    case QUERY_NODE_LOGIC_PLAN_PROJECT:
11,923✔
3138
      return createProjectPhysiNode(pCxt, pChildren, (SProjectLogicNode*)pLogicNode, pPhyNode);
11,923✔
3139
    case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
16,067✔
3140
      return createExchangePhysiNode(pCxt, (SExchangeLogicNode*)pLogicNode, pPhyNode);
16,067✔
3141
    case QUERY_NODE_LOGIC_PLAN_WINDOW:
3,966✔
3142
      return createWindowPhysiNode(pCxt, pChildren, (SWindowLogicNode*)pLogicNode, pPhyNode);
3,966✔
3143
    case QUERY_NODE_LOGIC_PLAN_SORT:
2,143✔
3144
      return createSortPhysiNode(pCxt, pChildren, (SSortLogicNode*)pLogicNode, pPhyNode);
2,143✔
3145
    case QUERY_NODE_LOGIC_PLAN_PARTITION:
179✔
3146
      return createPartitionPhysiNode(pCxt, pChildren, (SPartitionLogicNode*)pLogicNode, pPhyNode);
179✔
3147
    case QUERY_NODE_LOGIC_PLAN_FILL:
43✔
3148
      return createFillPhysiNode(pCxt, pChildren, (SFillLogicNode*)pLogicNode, pPhyNode);
43✔
3149
    case QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC:
48✔
3150
      return createIndefRowsFuncPhysiNode(pCxt, pChildren, (SIndefRowsFuncLogicNode*)pLogicNode, pPhyNode);
48✔
3151
    case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC:
331✔
3152
      return createInterpFuncPhysiNode(pCxt, pChildren, (SInterpFuncLogicNode*)pLogicNode, pPhyNode);
331✔
3153
    case QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC:
×
3154
      return createForecastFuncPhysiNode(pCxt, pChildren, (SForecastFuncLogicNode*)pLogicNode, pPhyNode);
×
3155
    case QUERY_NODE_LOGIC_PLAN_MERGE:
2,051✔
3156
      return createMergePhysiNode(pCxt, pChildren, (SMergeLogicNode*)pLogicNode, pPhyNode);
2,051✔
3157
    case QUERY_NODE_LOGIC_PLAN_GROUP_CACHE:
×
3158
      return createGroupCachePhysiNode(pCxt, pChildren, (SGroupCacheLogicNode*)pLogicNode, pPhyNode);
×
3159
    case QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL:
×
3160
      return createDynQueryCtrlPhysiNode(pCxt, pChildren, (SDynQueryCtrlLogicNode*)pLogicNode, pPhyNode, pSubplan);
×
3161
    case QUERY_NODE_LOGIC_PLAN_VIRTUAL_TABLE_SCAN:
2,362✔
3162
      return createVirtualTableScanPhysiNode(pCxt, pSubplan, pChildren, (SVirtualScanLogicNode*)pLogicNode, pPhyNode);
2,362✔
3163
    default:
×
3164
      break;
×
3165
  }
3166

3167
  return TSDB_CODE_FAILED;
×
3168
}
3169

3170
static int32_t createPhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, SSubplan* pSubplan,
96,655✔
3171
                               SPhysiNode** pPhyNode) {
3172
  SNodeList* pChildren = NULL;
96,655✔
3173
  int32_t    code = nodesMakeList(&pChildren);
96,655✔
3174
  if (NULL == pChildren) {
96,672!
3175
    return code;
×
3176
  }
3177

3178
  SNode* pLogicChild;
3179
  FOREACH(pLogicChild, pLogicNode->pChildren) {
146,900✔
3180
    SPhysiNode* pChild = NULL;
50,224✔
3181
    code = createPhysiNode(pCxt, (SLogicNode*)pLogicChild, pSubplan, &pChild);
50,224✔
3182
    if (TSDB_CODE_SUCCESS == code) {
50,226!
3183
      code = nodesListStrictAppend(pChildren, (SNode*)pChild);
50,226✔
3184
    }
3185
    if (TSDB_CODE_SUCCESS != code) {
50,228!
3186
      break;
×
3187
    }
3188
  }
3189

3190
  if (TSDB_CODE_SUCCESS == code) {
96,676✔
3191
    code = doCreatePhysiNode(pCxt, pLogicNode, pSubplan, pChildren, pPhyNode);
96,675✔
3192
  }
3193

3194
  if (TSDB_CODE_SUCCESS == code) {
96,658!
3195
    if (LIST_LENGTH(pChildren) > 0) {
137,369!
3196
      (*pPhyNode)->pChildren = pChildren;
40,711✔
3197
      SNode* pChild;
3198
      FOREACH(pChild, (*pPhyNode)->pChildren) { ((SPhysiNode*)pChild)->pParent = (*pPhyNode); }
90,937!
3199
    } else {
3200
      nodesDestroyList(pChildren);
55,947✔
3201
    }
3202
  } else {
3203
    nodesDestroyList(pChildren);
×
3204
  }
3205

3206
  return code;
96,672✔
3207
}
3208

3209
/*
 * Create the data-inserter sink for an INSERT ... VALUES subplan.
 *
 * numOfTables and size are copied from pBlocks. The pData pointer is moved:
 * the inserter takes it and pBlocks->pData is reset to NULL.
 *
 * Returns the node-creation code when no node was produced, otherwise
 * TSDB_CODE_SUCCESS with *pSink set.
 */
static int32_t createDataInserter(SPhysiPlanContext* pCxt, SVgDataBlocks* pBlocks, SDataSinkNode** pSink) {
  SDataInserterNode* pNode = NULL;
  int32_t            code = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_INSERT, (SNode**)&pNode);
  if (NULL == pNode) {
    return code;
  }

  pNode->numOfTables = pBlocks->numOfTables;
  pNode->size = pBlocks->size;

  // hand the payload over to the sink node
  pNode->pData = pBlocks->pData;
  pBlocks->pData = NULL;

  *pSink = (SDataSinkNode*)pNode;
  return TSDB_CODE_SUCCESS;
}
3224

3225
/*
 * Create the data-dispatcher sink for a query subplan.
 *
 * The sink's input data block descriptor is a clone of the root node's output
 * descriptor. If the clone is NULL the dispatcher is destroyed and the clone's
 * code is returned; otherwise *pSink is set and TSDB_CODE_SUCCESS is returned.
 */
static int32_t createDataDispatcher(SPhysiPlanContext* pCxt, const SPhysiNode* pRoot, SDataSinkNode** pSink) {
  SDataDispatcherNode* pNode = NULL;
  int32_t              code = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_DISPATCH, (SNode**)&pNode);
  if (NULL == pNode) {
    return code;
  }

  pNode->sink.pInputDataBlockDesc = NULL;
  code = nodesCloneNode((SNode*)pRoot->pOutputDataBlockDesc, (SNode**)&pNode->sink.pInputDataBlockDesc);
  if (NULL != pNode->sink.pInputDataBlockDesc) {
    *pSink = (SDataSinkNode*)pNode;
    return TSDB_CODE_SUCCESS;
  }

  nodesDestroyNode((SNode*)pNode);
  return code;
}
3242

3243
/*
 * Allocate a physical subplan and initialise it from the logic subplan and the
 * plan context.
 *
 * id, subplanType, level, processOneBlock and dynTbname come from the logic
 * subplan; isView and isAudit come from the plan context; rowsThreshold is set
 * to 4096 and dynamicRowThreshold to false. The user name is copied when the
 * context has one.
 *
 * Returns the node-creation code; *ppSubplan is set only when a node was made.
 */
static int32_t makeSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SSubplan** ppSubplan) {
  SSubplan* pNew = NULL;
  int32_t   code = nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN, (SNode**)&pNew);
  if (NULL == pNew) {
    return code;
  }

  // taken from the logic subplan
  pNew->id = pLogicSubplan->id;
  pNew->subplanType = pLogicSubplan->subplanType;
  pNew->level = pLogicSubplan->level;
  pNew->processOneBlock = pLogicSubplan->processOneBlock;
  pNew->dynTbname = pLogicSubplan->dynTbname;

  // fixed defaults
  pNew->rowsThreshold = 4096;
  pNew->dynamicRowThreshold = false;

  // taken from the plan context
  pNew->isView = pCxt->pPlanCxt->isView;
  pNew->isAudit = pCxt->pPlanCxt->isAudit;
  if (NULL != pCxt->pPlanCxt->pUser) {
    snprintf(pNew->user, sizeof(pNew->user), "%s", pCxt->pPlanCxt->pUser);
  }

  *ppSubplan = pNew;
  return code;
}
3264

3265
/*
 * Fill in a subplan for INSERT ... VALUES: the message type comes from the
 * modify node, the execution node (id and endpoint set) from the vgroup of its
 * data blocks, and the data sink is a data inserter built from those blocks.
 */
static int32_t buildInsertValuesSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
  SVgDataBlocks* pBlocks = pModify->pVgDataBlocks;

  pSubplan->msgType = pModify->msgType;
  pSubplan->execNode.nodeId = pBlocks->vg.vgId;
  pSubplan->execNode.epSet = pBlocks->vg.epSet;

  return createDataInserter(pCxt, pBlocks, &pSubplan->pDataSink);
}
3271

3272
static int32_t createQueryInserter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan,
19✔
3273
                                   SDataSinkNode** pSink) {
3274
  SQueryInserterNode* pInserter = NULL;
19✔
3275
  int32_t             code = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT, (SNode**)&pInserter);
19✔
3276
  if (NULL == pInserter) {
19!
3277
    return code;
×
3278
  }
3279

3280
  pInserter->tableId = pModify->tableId;
19✔
3281
  pInserter->stableId = pModify->stableId;
19✔
3282
  pInserter->tableType = pModify->tableType;
19✔
3283
  tstrncpy(pInserter->tableName, pModify->tableName, TSDB_TABLE_NAME_LEN);
19✔
3284
  pInserter->explain = (QUERY_NODE_EXPLAIN_STMT == nodeType(pCxt->pPlanCxt->pAstRoot) ? true : false);
19✔
3285
  if (pModify->pVgroupList) {
19!
3286
    pInserter->vgId = pModify->pVgroupList->vgroups[0].vgId;
19✔
3287
    pInserter->epSet = pModify->pVgroupList->vgroups[0].epSet;
19✔
3288
    vgroupInfoToNodeAddr(pModify->pVgroupList->vgroups, &pSubplan->execNode);
19✔
3289
  }
3290
  code = setListSlotId(pCxt, pSubplan->pNode->pOutputDataBlockDesc->dataBlockId, -1, pModify->pInsertCols,
19✔
3291
                       &pInserter->pCols);
19✔
3292
  if (TSDB_CODE_SUCCESS == code) {
19!
3293
    pInserter->sink.pInputDataBlockDesc = NULL;
19✔
3294
    code = nodesCloneNode((SNode*)pSubplan->pNode->pOutputDataBlockDesc, (SNode**)&pInserter->sink.pInputDataBlockDesc);
19✔
3295
  }
3296

3297
  if (TSDB_CODE_SUCCESS == code) {
19!
3298
    *pSink = (SDataSinkNode*)pInserter;
19✔
3299
  } else {
3300
    nodesDestroyNode((SNode*)pInserter);
×
3301
  }
3302

3303
  return code;
19✔
3304
}
3305

3306
/*
 * Fill in a subplan for INSERT ... SELECT: the first child of the modify node
 * is converted into the subplan's physical tree, then a query inserter is
 * attached as the data sink. The message type is set to TDMT_SCH_MERGE_QUERY
 * after those steps, whether or not they succeeded.
 */
static int32_t buildInsertSelectSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
  SLogicNode* pQueryRoot = (SLogicNode*)nodesListGetNode(pModify->node.pChildren, 0);

  int32_t code = createPhysiNode(pCxt, pQueryRoot, pSubplan, &pSubplan->pNode);
  if (TSDB_CODE_SUCCESS == code) {
    code = createQueryInserter(pCxt, pModify, pSubplan, &pSubplan->pDataSink);
  }

  pSubplan->msgType = TDMT_SCH_MERGE_QUERY;
  return code;
}
3315

3316
/*
 * Build an insert subplan: a modify node with children is handled as
 * INSERT ... SELECT, one without children as INSERT ... VALUES.
 */
static int32_t buildInsertSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
  if (NULL != pModify->node.pChildren) {
    return buildInsertSelectSubplan(pCxt, pModify, pSubplan);
  }
  return buildInsertValuesSubplan(pCxt, pModify, pSubplan);
}
3322

3323
static int32_t createDataDeleter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, const SPhysiNode* pRoot,
1,024✔
3324
                                 SDataSinkNode** pSink) {
3325
  SDataDeleterNode* pDeleter = NULL;
1,024✔
3326
  int32_t           code = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_DELETE, (SNode**)&pDeleter);
1,024✔
3327
  if (NULL == pDeleter) {
1,024!
3328
    return code;
×
3329
  }
3330

3331
  pDeleter->tableId = pModify->tableId;
1,024✔
3332
  pDeleter->tableType = pModify->tableType;
1,024✔
3333
  tstrncpy(pDeleter->tableFName, pModify->tableName, TSDB_TABLE_NAME_LEN);
1,024✔
3334
  tstrncpy(pDeleter->tsColName, pModify->tsColName, TSDB_COL_NAME_LEN);
1,024✔
3335
  pDeleter->deleteTimeRange = pModify->deleteTimeRange;
1,024✔
3336

3337
  code = setNodeSlotId(pCxt, pRoot->pOutputDataBlockDesc->dataBlockId, -1, pModify->pAffectedRows,
1,024✔
3338
                       &pDeleter->pAffectedRows);
1,024✔
3339
  if (TSDB_CODE_SUCCESS == code) {
1,024!
3340
    code = setNodeSlotId(pCxt, pRoot->pOutputDataBlockDesc->dataBlockId, -1, pModify->pStartTs, &pDeleter->pStartTs);
1,024✔
3341
  }
3342
  if (TSDB_CODE_SUCCESS == code) {
1,024!
3343
    code = setNodeSlotId(pCxt, pRoot->pOutputDataBlockDesc->dataBlockId, -1, pModify->pEndTs, &pDeleter->pEndTs);
1,024✔
3344
  }
3345
  if (TSDB_CODE_SUCCESS == code) {
1,024!
3346
    pDeleter->sink.pInputDataBlockDesc = NULL;
1,024✔
3347
    code = nodesCloneNode((SNode*)pRoot->pOutputDataBlockDesc, (SNode**)&pDeleter->sink.pInputDataBlockDesc);
1,024✔
3348
  }
3349

3350
  if (TSDB_CODE_SUCCESS == code) {
1,024!
3351
    *pSink = (SDataSinkNode*)pDeleter;
1,024✔
3352
  } else {
3353
    nodesDestroyNode((SNode*)pDeleter);
×
3354
  }
3355

3356
  return TSDB_CODE_SUCCESS;
1,024✔
3357
}
3358

3359
/*
 * Fill in a subplan for DELETE: the first child of the modify node is
 * converted into the subplan's physical tree, then a data deleter is attached
 * as the data sink. The message type is set to TDMT_VND_DELETE after those
 * steps, whether or not they succeeded.
 */
static int32_t buildDeleteSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
  SLogicNode* pQueryRoot = (SLogicNode*)nodesListGetNode(pModify->node.pChildren, 0);

  int32_t code = createPhysiNode(pCxt, pQueryRoot, pSubplan, &pSubplan->pNode);
  if (TSDB_CODE_SUCCESS == code) {
    code = createDataDeleter(pCxt, pModify, pSubplan->pNode, &pSubplan->pDataSink);
  }

  pSubplan->msgType = TDMT_VND_DELETE;
  return code;
}
3368

3369
/*
 * Build a modify subplan from the logic subplan's root, which is treated as a
 * vnode-modify logic node. INSERT and DELETE are dispatched to their builders;
 * any other modifyType yields TSDB_CODE_FAILED.
 */
static int32_t buildVnodeModifySubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SSubplan* pSubplan) {
  SVnodeModifyLogicNode* pModify = (SVnodeModifyLogicNode*)pLogicSubplan->pNode;

  switch (pModify->modifyType) {
    case MODIFY_TABLE_TYPE_INSERT:
      return buildInsertSubplan(pCxt, pModify, pSubplan);
    case MODIFY_TABLE_TYPE_DELETE:
      return buildDeleteSubplan(pCxt, pModify, pSubplan);
    default:
      return TSDB_CODE_FAILED;
  }
}
3385

3386
/*
 * Create one physical subplan from a logic subplan.
 *
 * Modify subplans are delegated to buildVnodeModifySubplan(). For all others
 * the message type is TDMT_SCH_QUERY for scan subplans and
 * TDMT_SCH_MERGE_QUERY otherwise; it is assigned before the physical tree is
 * built. A data dispatcher sink is attached unless the plan context marks the
 * query as a topic query.
 *
 * On success *pPhysiSubplan receives the subplan; on failure it is destroyed.
 */
static int32_t createPhysiSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SSubplan** pPhysiSubplan) {
  SSubplan* pNew = NULL;
  int32_t   code = makeSubplan(pCxt, pLogicSubplan, &pNew);
  if (NULL == pNew) {
    return code;
  }

  if (SUBPLAN_TYPE_MODIFY == pLogicSubplan->subplanType) {
    code = buildVnodeModifySubplan(pCxt, pLogicSubplan, pNew);
  } else {
    pNew->msgType = (SUBPLAN_TYPE_SCAN == pNew->subplanType) ? TDMT_SCH_QUERY : TDMT_SCH_MERGE_QUERY;
    code = createPhysiNode(pCxt, pLogicSubplan->pNode, pNew, &pNew->pNode);
    if (TSDB_CODE_SUCCESS == code && !pCxt->pPlanCxt->topicQuery) {
      code = createDataDispatcher(pCxt, pNew->pNode, &pNew->pDataSink);
    }
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode((SNode*)pNew);
    return code;
  }

  *pPhysiSubplan = pNew;
  return code;
}
3415

3416
/*
 * Allocate an empty physical query plan with an empty subplan list and the
 * query id from the plan context.
 *
 * Returns the failing call's code when the plan node or its list cannot be
 * created (the plan node is destroyed in the latter case); *ppQueryPlan is set
 * only on success.
 */
static int32_t makeQueryPhysiPlan(SPhysiPlanContext* pCxt, SQueryPlan** ppQueryPlan) {
  SQueryPlan* pNew = NULL;
  int32_t     code = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN, (SNode**)&pNew);
  if (NULL == pNew) {
    return code;
  }

  pNew->pSubplans = NULL;
  code = nodesMakeList(&pNew->pSubplans);
  if (NULL != pNew->pSubplans) {
    pNew->queryId = pCxt->pPlanCxt->queryId;
    *ppQueryPlan = pNew;
  } else {
    nodesDestroyNode((SNode*)pNew);
  }

  return code;
}
3432

3433
/*
 * Append a subplan to the group of the given level in pSubplans.
 *
 * pSubplans is a list of node-list groups. When level is not below the current
 * number of groups a new group is created and appended; otherwise the existing
 * group at index level is used. The group's inner list is created on first use.
 *
 * Returns the error code of whichever step failed (node/list creation or
 * append), TSDB_CODE_PLAN_INTERNAL_ERROR when no group can be found for level,
 * or the result of appending pSubplan to the group's list.
 */
static int32_t pushSubplan(SPhysiPlanContext* pCxt, SNode* pSubplan, int32_t level, SNodeList* pSubplans) {
  SNodeListNode* pGroup = NULL;
  int32_t        code = TSDB_CODE_SUCCESS;

  if (level >= LIST_LENGTH(pSubplans)) {
    code = nodesMakeNode(QUERY_NODE_NODE_LIST, (SNode**)&pGroup);
    if (NULL == pGroup) {
      return code;
    }
    // report the append's own error code instead of assuming out-of-memory
    code = nodesListStrictAppend(pSubplans, (SNode*)pGroup);
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }
  } else {
    pGroup = (SNodeListNode*)nodesListGetNode(pSubplans, level);
    if (NULL == pGroup) {
      // NOTE(review): presumably only reachable with a negative level - confirm against callers
      return TSDB_CODE_PLAN_INTERNAL_ERROR;
    }
  }

  if (NULL == pGroup->pNodeList) {
    code = nodesMakeList(&pGroup->pNodeList);
    if (NULL == pGroup->pNodeList) {
      return code;
    }
  }

  return nodesListAppend(pGroup->pNodeList, (SNode*)pSubplan);
}
3455

3456
static int32_t buildPhysiPlan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SSubplan* pParent,
173,053✔
3457
                              SQueryPlan* pQueryPlan) {
3458
  SSubplan* pSubplan = NULL;
173,053✔
3459
  int32_t   code = createPhysiSubplan(pCxt, pLogicSubplan, &pSubplan);
173,053✔
3460

3461
  if (TSDB_CODE_SUCCESS == code) {
173,129!
3462
    code = pushSubplan(pCxt, (SNode*)pSubplan, pLogicSubplan->level, pQueryPlan->pSubplans);
173,189✔
3463
    ++(pQueryPlan->numOfSubplans);
173,171✔
3464
  }
3465

3466
  if (TSDB_CODE_SUCCESS != code) {
173,111!
3467
    nodesDestroyNode((SNode*)pSubplan);
×
3468
    return code;
×
3469
  }
3470

3471
  if (TSDB_CODE_SUCCESS == code && NULL != pParent) {
173,111!
3472
    code = nodesListMakeAppend(&pParent->pChildren, (SNode*)pSubplan);
24,565✔
3473
    if (TSDB_CODE_SUCCESS == code) {
24,565!
3474
      code = nodesListMakeAppend(&pSubplan->pParents, (SNode*)pParent);
24,565✔
3475
    }
3476
  }
3477

3478
  if (TSDB_CODE_SUCCESS == code) {
173,145!
3479
    SNode* pChild = NULL;
173,145✔
3480
    FOREACH(pChild, pLogicSubplan->pChildren) {
197,710✔
3481
      code = buildPhysiPlan(pCxt, (SLogicSubplan*)pChild, pSubplan, pQueryPlan);
24,565✔
3482
      if (TSDB_CODE_SUCCESS != code) {
24,565!
3483
        break;
×
3484
      }
3485
    }
3486
  }
3487

3488
  return code;
173,145✔
3489
}
3490

3491
/**
 * Build the physical query plan for a logical query plan.
 *
 * An empty query plan is allocated with makeQueryPhysiPlan() and each subplan in
 * pLogicPlan->pTopSubplans is converted with buildPhysiPlan() using a NULL parent.
 *
 * @param pCxt       physical plan creation context
 * @param pLogicPlan logical plan whose top-level subplans are converted
 * @param pPhysiPlan receives the new plan on success; left untouched on failure
 * @return TSDB_CODE_SUCCESS, or the error code of the step that failed
 */
static int32_t doCreatePhysiPlan(SPhysiPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPhysiPlan) {
  SQueryPlan* pNewPlan = NULL;
  int32_t     code = makeQueryPhysiPlan(pCxt, &pNewPlan);
  if (NULL == pNewPlan) {
    return code;
  }

  SNode* pTop = NULL;
  FOREACH(pTop, pLogicPlan->pTopSubplans) {
    code = buildPhysiPlan(pCxt, (SLogicSubplan*)pTop, NULL, pNewPlan);
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
  }

  if (TSDB_CODE_SUCCESS != code) {
    // Nothing is handed to the caller on failure: drop the partially built plan.
    nodesDestroyNode((SNode*)pNewPlan);
    return code;
  }

  *pPhysiPlan = pNewPlan;
  return code;
}
3514

3515
static void destoryLocationHash(void* p) {
193,326✔
3516
  SHashObj*   pHash = *(SHashObj**)p;
193,326✔
3517
  SSlotIndex* pIndex = taosHashIterate(pHash, NULL);
193,326✔
3518
  while (NULL != pIndex) {
723,644✔
3519
    taosArrayDestroy(pIndex->pSlotIdsInfo);
530,297✔
3520
    pIndex = taosHashIterate(pHash, pIndex);
530,280✔
3521
  }
3522
  taosHashCleanup(pHash);
193,347✔
3523
}
193,326✔
3524

3525
/**
 * Release the helper arrays held by a physical plan context.
 *
 * Both pLocationHelper and pProjIdxLocHelper are destroyed with destoryLocationHash()
 * as the per-element destructor. The context structure itself is not freed.
 *
 * @param pCxt context whose helper arrays are destroyed
 */
static void destoryPhysiPlanContext(SPhysiPlanContext* pCxt) {
  // slot location helper
  taosArrayDestroyEx(pCxt->pLocationHelper, destoryLocationHash);

  // projection index location helper
  taosArrayDestroyEx(pCxt->pProjIdxLocHelper, destoryLocationHash);
}
141,023✔
3529

3530
/**
 * Fill pPlan->explainInfo from the root of the AST.
 *
 * If the root is not an EXPLAIN statement, only the mode is written
 * (EXPLAIN_MODE_DISABLE). Otherwise the mode follows the statement's analyze flag,
 * and verbose and ratio are copied from the statement's options.
 *
 * @param pCxt  planner context providing pAstRoot
 * @param pPlan physical plan whose explainInfo is written
 */
static void setExplainInfo(SPlanContext* pCxt, SQueryPlan* pPlan) {
  if (QUERY_NODE_EXPLAIN_STMT != nodeType(pCxt->pAstRoot)) {
    pPlan->explainInfo.mode = EXPLAIN_MODE_DISABLE;
    return;
  }

  SExplainStmt* pExplain = (SExplainStmt*)pCxt->pAstRoot;
  if (pExplain->analyze) {
    pPlan->explainInfo.mode = EXPLAIN_MODE_ANALYZE;
  } else {
    pPlan->explainInfo.mode = EXPLAIN_MODE_STATIC;
  }
  pPlan->explainInfo.verbose = pExplain->pOptions->verbose;
  pPlan->explainInfo.ratio = pExplain->pOptions->ratio;
}
140,923✔
3540

3541
/**
 * Append the mnode to the list of execution nodes when the plan needs it.
 *
 * Nothing is done when pExecNodeList is NULL. Otherwise an entry addressed by
 * MNODE_HANDLE and the management epSet is pushed with a load of 0, if the plan
 * contains a system-table scan or contains no scan at all.
 *
 * @param pCxt          physical plan creation context (hasScan / hasSysScan flags)
 * @param pExecNodeList list to append to, may be NULL
 * @return 0 on success, terrno if the push fails
 */
static int32_t setExecNodeList(SPhysiPlanContext* pCxt, SArray* pExecNodeList) {
  int32_t code = 0;

  if (NULL == pExecNodeList) {
    return code;
  }

  // Only plans with a system scan, or with no scan at all, involve the mnode.
  if (!pCxt->hasSysScan && pCxt->hasScan) {
    return code;
  }

  SQueryNodeLoad mnodeLoad = {.addr = {.nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet}, .load = 0};
  if (NULL == taosArrayPush(pExecNodeList, &mnodeLoad)) {
    code = terrno;
  }
  return code;
}
3552

3553
/**
 * Create the physical query plan for a logical query plan.
 *
 * A temporary SPhysiPlanContext with two helper arrays is set up, the physical plan is
 * built with doCreatePhysiPlan(), and on success the explain information and the
 * execution node list are filled in. The temporary context is always released before
 * returning.
 *
 * @param pCxt          planner context
 * @param pLogicPlan    logical plan to convert
 * @param pPlan         receives the physical plan once doCreatePhysiPlan() succeeds
 * @param pExecNodeList optional list of execution nodes to extend, may be NULL
 * @return TSDB_CODE_SUCCESS on success; terrno if a helper array cannot be allocated;
 *         otherwise the error code of the failing step
 */
int32_t createPhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList) {
  SPhysiPlanContext cxt = {.pPlanCxt = pCxt,
                           .errCode = TSDB_CODE_SUCCESS,
                           .nextDataBlockId = 0,
                           .pLocationHelper = taosArrayInit(32, POINTER_BYTES),
                           .pProjIdxLocHelper = taosArrayInit(32, POINTER_BYTES),
                           .hasScan = false,
                           .hasSysScan = false};
  if (NULL == cxt.pLocationHelper || NULL == cxt.pProjIdxLocHelper) {
    // Capture the failure code before running cleanup so the destroy calls cannot clobber it.
    int32_t initCode = terrno;
    taosArrayDestroy(cxt.pLocationHelper);
    taosArrayDestroy(cxt.pProjIdxLocHelper);
    return initCode;
  }

  int32_t code = doCreatePhysiPlan(&cxt, pLogicPlan, pPlan);
  if (TSDB_CODE_SUCCESS == code) {
    setExplainInfo(pCxt, *pPlan);
    code = setExecNodeList(&cxt, pExecNodeList);
  }

  destoryPhysiPlanContext(&cxt);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc