• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4473

08 Jul 2025 09:38AM UTC coverage: 62.922% (+0.7%) from 62.22%
#4473

push

travis-ci

web-flow
Merge pull request #31712 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

158525 of 321496 branches covered (49.31%)

Branch coverage included in aggregate %.

56 of 60 new or added lines in 13 files covered. (93.33%)

1333 existing lines in 67 files now uncovered.

245526 of 320647 relevant lines covered (76.57%)

17689640.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.26
/source/libs/planner/src/planPhysiCreater.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "nodes.h"
17
#include "planInt.h"
18

19
#include "catalog.h"
20
#include "functionMgt.h"
21
#include "systable.h"
22
#include "tglobal.h"
23

24
typedef struct SSlotIdInfo {
25
  int16_t slotId;
26
  bool    set;
27
} SSlotIdInfo;
28

29
typedef struct SSlotIndex {
30
  int16_t dataBlockId;
31
  SArray* pSlotIdsInfo;  // duplicate name slot
32
} SSlotIndex;
33

34
enum {
35
  SLOT_KEY_TYPE_ALL = 1,
36
  SLOT_KEY_TYPE_COLNAME = 2,
37
  SLOT_KEY_TYPE_REF = 3,
38
};
39

40
static int32_t getSlotKeyHelper(SNode* pNode, const char* pPreName, const char* name, char** ppKey, int32_t callocLen,
51,337,429✔
41
                                int32_t* pLen, uint16_t extraBufLen, int8_t slotKeyType) {
42
  int32_t code = 0;
51,337,429✔
43
  *ppKey = taosMemoryCalloc(1, callocLen);
51,337,429!
44
  if (!*ppKey) {
51,337,895!
45
    return terrno;
×
46
  }
47
  if (slotKeyType == SLOT_KEY_TYPE_ALL) {
51,337,895✔
48
    TAOS_STRNCAT(*ppKey, pPreName, TSDB_TABLE_NAME_LEN);
38,492,091✔
49
    TAOS_STRNCAT(*ppKey, ".", 2);
38,492,091✔
50
    TAOS_STRNCAT(*ppKey, name, TSDB_COL_NAME_LEN);
38,492,091✔
51
    *pLen = taosHashBinary(*ppKey, strlen(*ppKey));
76,984,369✔
52
  } else {
53
    TAOS_STRNCAT(*ppKey, name, TSDB_COL_NAME_LEN);
12,845,804✔
54
    *pLen = strlen(*ppKey);
12,845,804✔
55
  }
56

57
  return code;
51,338,082✔
58
}
59

60
static int32_t getSlotKey(SNode* pNode, const char* pStmtName, char** ppKey, int32_t* pLen, uint16_t extraBufLen) {
51,654,466✔
61
  int32_t code = 0;
51,654,466✔
62
  int32_t callocLen = 0;
51,654,466✔
63
  if (QUERY_NODE_COLUMN == nodeType(pNode)) {
51,654,466✔
64
    SColumnNode* pCol = (SColumnNode*)pNode;
47,075,253✔
65
    if (NULL != pStmtName) {
47,075,253✔
66
      if ('\0' != pStmtName[0]) {
2,462,037✔
67
        callocLen = TSDB_TABLE_NAME_LEN + 1 + TSDB_COL_NAME_LEN + 1 + extraBufLen;
2,452,886✔
68
        return getSlotKeyHelper(pNode, pStmtName, pCol->node.aliasName, ppKey, callocLen, pLen, extraBufLen,
2,452,886✔
69
                                SLOT_KEY_TYPE_ALL);
70
      } else {
71
        callocLen = TSDB_COL_NAME_LEN + 1 + extraBufLen;
9,151✔
72
        return getSlotKeyHelper(pNode, pStmtName, pCol->node.aliasName, ppKey, callocLen, pLen, extraBufLen,
9,151✔
73
                                SLOT_KEY_TYPE_COLNAME);
74
      }
75
    }
76
    if ('\0' == pCol->tableAlias[0]) {
44,613,216✔
77
      callocLen = TSDB_COL_NAME_LEN + 1 + extraBufLen;
9,055,978✔
78
      return getSlotKeyHelper(pNode, pStmtName, pCol->colName, ppKey, callocLen, pLen, extraBufLen,
9,055,978✔
79
                              SLOT_KEY_TYPE_COLNAME);
80
    }
81
    if (pCol->hasRef) {
35,557,238✔
82
      *ppKey = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN + 1 + TSDB_COL_NAME_LEN + 1 + extraBufLen);
119,311!
83
      if (!*ppKey) {
119,311!
84
        return terrno;
×
85
      }
86
      TAOS_STRNCAT(*ppKey, pCol->refDbName, TSDB_DB_NAME_LEN);
119,311✔
87
      TAOS_STRNCAT(*ppKey, ".", 2);
119,311✔
88
      TAOS_STRNCAT(*ppKey, pCol->refTableName, TSDB_TABLE_NAME_LEN);
119,311✔
89
      TAOS_STRNCAT(*ppKey, ".", 2);
119,311✔
90
      TAOS_STRNCAT(*ppKey, pCol->refColName, TSDB_COL_NAME_LEN);
119,311✔
91
      *pLen = taosHashBinary(*ppKey, strlen(*ppKey));
119,311✔
92
      return code;
119,311✔
93
    }
94
    if (pCol->hasDep) {
35,437,927✔
95
      *ppKey = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN + 1 + TSDB_COL_NAME_LEN + 1 + extraBufLen);
197,828!
96
      if (!*ppKey) {
197,828!
97
        return terrno;
×
98
      }
99
      TAOS_STRNCAT(*ppKey, pCol->dbName, TSDB_DB_NAME_LEN);
197,828✔
100
      TAOS_STRNCAT(*ppKey, ".", 2);
197,828✔
101
      TAOS_STRNCAT(*ppKey, pCol->tableAlias, TSDB_TABLE_NAME_LEN);
197,828✔
102
      TAOS_STRNCAT(*ppKey, ".", 2);
197,828✔
103
      TAOS_STRNCAT(*ppKey, pCol->colName, TSDB_COL_NAME_LEN);
197,828✔
104
      *pLen = taosHashBinary(*ppKey, strlen(*ppKey));
197,828✔
105
      return code;
197,828✔
106
    }
107
    callocLen = TSDB_TABLE_NAME_LEN + 1 + TSDB_COL_NAME_LEN + 1 + extraBufLen;
35,240,099✔
108
    return getSlotKeyHelper(pNode, pCol->tableAlias, pCol->colName, ppKey, callocLen, pLen, extraBufLen,
35,240,099✔
109
                            SLOT_KEY_TYPE_ALL);
110
  } else if (QUERY_NODE_FUNCTION == nodeType(pNode)) {
4,579,213✔
111
    SFunctionNode* pFunc = (SFunctionNode*)pNode;
4,180,795✔
112
    if (FUNCTION_TYPE_TBNAME == pFunc->funcType) {
4,180,795✔
113
      SValueNode* pVal = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 0);
233,916✔
114
      if (pVal) {
233,916✔
115
        if (NULL != pStmtName && '\0' != pStmtName[0]) {
7,690!
116
          callocLen = TSDB_TABLE_NAME_LEN + 1 + TSDB_COL_NAME_LEN + 1 + extraBufLen;
×
117
          return getSlotKeyHelper(pNode, pStmtName, ((SExprNode*)pNode)->aliasName, ppKey, callocLen, pLen, extraBufLen,
×
118
                                  SLOT_KEY_TYPE_ALL);
119
        }
120
        int32_t literalLen = strlen(pVal->literal);
7,690✔
121
        callocLen = literalLen + 1 + TSDB_COL_NAME_LEN + 1 + extraBufLen;
7,690✔
122
        return getSlotKeyHelper(pNode, pVal->literal, ((SExprNode*)pNode)->aliasName, ppKey, callocLen, pLen,
7,690✔
123
                                extraBufLen, SLOT_KEY_TYPE_ALL);
124
      }
125
    }
126
  }
127

128
  if (NULL != pStmtName && '\0' != pStmtName[0]) {
4,571,523✔
129
    callocLen = TSDB_TABLE_NAME_LEN + 1 + TSDB_COL_NAME_LEN + 1 + extraBufLen;
791,014✔
130
    return getSlotKeyHelper(pNode, pStmtName, ((SExprNode*)pNode)->aliasName, ppKey, callocLen, pLen, extraBufLen,
791,014✔
131
                            SLOT_KEY_TYPE_ALL);
132
  }
133

134
  callocLen = TSDB_COL_NAME_LEN + 1 + extraBufLen;
3,780,509✔
135
  return getSlotKeyHelper(pNode, pStmtName, ((SExprNode*)pNode)->aliasName, ppKey, callocLen, pLen, extraBufLen,
3,780,509✔
136
                          SLOT_KEY_TYPE_COLNAME);
137
  return code;
138
}
139

140
static SNode* createSlotDesc(SPhysiPlanContext* pCxt, const char* pName, const SNode* pNode, int16_t slotId,
20,636,367✔
141
                             bool output, bool reserve) {
142
  SSlotDescNode* pSlot = NULL;
20,636,367✔
143
  int32_t        code = nodesMakeNode(QUERY_NODE_SLOT_DESC, (SNode**)&pSlot);
20,636,367✔
144
  if (NULL == pSlot) {
20,636,299!
145
    terrno = code;
×
146
    return NULL;
×
147
  }
148
  snprintf(pSlot->name, sizeof(pSlot->name), "%s", pName);
20,636,469✔
149
  pSlot->slotId = slotId;
20,636,469✔
150
  pSlot->dataType = ((SExprNode*)pNode)->resType;
20,636,469✔
151
  pSlot->reserve = reserve;
20,636,469✔
152
  pSlot->output = output;
20,636,469✔
153
  return (SNode*)pSlot;
20,636,469✔
154
}
155

156
static int32_t createTarget(SNode* pNode, int16_t dataBlockId, int16_t slotId, SNode** pOutput) {
18,517,475✔
157
  STargetNode* pTarget = NULL;
18,517,475✔
158
  int32_t      code = nodesMakeNode(QUERY_NODE_TARGET, (SNode**)&pTarget);
18,517,475✔
159
  if (NULL == pTarget) {
18,517,404!
160
    return code;
×
161
  }
162

163
  pTarget->dataBlockId = dataBlockId;
18,517,404✔
164
  pTarget->slotId = slotId;
18,517,404✔
165
  pTarget->pExpr = pNode;
18,517,404✔
166

167
  *pOutput = (SNode*)pTarget;
18,517,404✔
168
  return TSDB_CODE_SUCCESS;
18,517,404✔
169
}
170

171
static int32_t putSlotToHashImpl(int16_t dataBlockId, int16_t slotId, const char* pName, int32_t len, SHashObj* pHash) {
24,329,364✔
172
  SSlotIndex* pIndex = taosHashGet(pHash, pName, len);
24,329,364✔
173
  if (NULL != pIndex) {
24,329,341✔
174
    SSlotIdInfo info = {.slotId = slotId, .set = false};
7,382✔
175
    if (NULL == taosArrayPush(pIndex->pSlotIdsInfo, &info)) {
14,764!
176
      return terrno;
×
177
    }
178
    return TSDB_CODE_SUCCESS;
7,382✔
179
  }
180

181
  SSlotIndex index = {.dataBlockId = dataBlockId, .pSlotIdsInfo = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SSlotIdInfo))};
24,321,959✔
182
  if (NULL == index.pSlotIdsInfo) {
24,322,227!
183
    return terrno;
×
184
  }
185
  SSlotIdInfo info = {.slotId = slotId, .set = false};
24,322,227✔
186
  if (NULL == taosArrayPush(index.pSlotIdsInfo, &info)) {
48,644,358!
187
    return terrno;
×
188
  }
189
  return taosHashPut(pHash, pName, len, &index, sizeof(SSlotIndex));
24,322,131✔
190
}
191

192
static int32_t putSlotToHash(const char* pName, int32_t len, int16_t dataBlockId, int16_t slotId, SNode* pNode,
23,919,155✔
193
                             SHashObj* pHash) {
194
  return putSlotToHashImpl(dataBlockId, slotId, pName, len, pHash);
23,919,155✔
195
}
196

197
static int32_t createDataBlockDescHash(SPhysiPlanContext* pCxt, int32_t capacity, int16_t dataBlockId,
5,062,058✔
198
                                       SHashObj** pDescHash, SHashObj** ppProjIdxDescHash) {
199
  SHashObj* pHash = taosHashInit(capacity, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
5,062,058✔
200
  if (NULL == pHash) {
5,062,107!
201
    return terrno;
×
202
  }
203
  SHashObj* pProjIdxHash = taosHashInit(capacity, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
5,062,107✔
204
  if (!pProjIdxHash) {
5,062,174!
205
    taosHashCleanup(pHash);
×
206
    return terrno;
×
207
  }
208
  if (NULL == taosArrayInsert(pCxt->pLocationHelper, dataBlockId, &pHash)) {
5,062,174!
209
    taosHashCleanup(pHash);
×
210
    taosHashCleanup(pProjIdxHash);
×
211
    return terrno;
×
212
  }
213
  if (NULL == taosArrayInsert(pCxt->pProjIdxLocHelper, dataBlockId, &pProjIdxHash)) {
5,062,090✔
214
    taosHashCleanup(pHash);
69✔
215
    taosHashCleanup(pProjIdxHash);
×
216
    return terrno;
×
217
  }
218

219
  *pDescHash = pHash;
5,062,070✔
220
  *ppProjIdxDescHash = pProjIdxHash;
5,062,070✔
221
  return TSDB_CODE_SUCCESS;
5,062,070✔
222
}
223

224
static int32_t buildDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc,
5,062,094✔
225
                                   SHashObj* pHash, SHashObj* pProjIdxDescHash) {
226
  pDataBlockDesc->pSlots = NULL;
5,062,094✔
227
  int32_t code = nodesMakeList(&pDataBlockDesc->pSlots);
5,062,094✔
228
  if (NULL == pDataBlockDesc->pSlots) {
5,062,136!
229
    return code;
×
230
  }
231

232
  int16_t slotId = 0;
5,062,136✔
233
  SNode*  pNode = NULL;
5,062,136✔
234
  FOREACH(pNode, pList) {
25,288,288✔
235
    char*   name = NULL;
20,226,130✔
236
    int32_t len = 0;
20,226,130✔
237
    code = getSlotKey(pNode, NULL, &name, &len, 16);
20,226,130✔
238
    if (TSDB_CODE_SUCCESS == code) {
20,226,111!
239
      code = nodesListStrictAppend(pDataBlockDesc->pSlots, createSlotDesc(pCxt, name, pNode, slotId, true, false));
20,226,139✔
240
    }
241
    code = putSlotToHash(name, len, pDataBlockDesc->dataBlockId, slotId, pNode, pHash);
20,226,037✔
242
    if (TSDB_CODE_SUCCESS == code) {
20,226,258!
243
      if (nodeType(pNode) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode)->resIdx > 0) {
20,226,261!
244
        snprintf(name + strlen(name), 16, "_%d", ((SColumnNode*)pNode)->resIdx);
3,693,135✔
245
        code = putSlotToHash(name, strlen(name), pDataBlockDesc->dataBlockId, slotId, pNode, pProjIdxDescHash);
3,693,135✔
246
      }
247
    }
248
    taosMemoryFree(name);
20,226,258!
249
    if (TSDB_CODE_SUCCESS == code) {
20,226,152!
250
      pDataBlockDesc->totalRowSize += ((SExprNode*)pNode)->resType.bytes;
20,226,152✔
251
      pDataBlockDesc->outputRowSize += ((SExprNode*)pNode)->resType.bytes;
20,226,152✔
252
      ++slotId;
20,226,152✔
253
    } else {
254
      break;
×
255
    }
256
  }
257
  return code;
5,062,158✔
258
}
259

260
static int32_t createDataBlockDesc(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode** pDataBlockDesc) {
5,062,034✔
261
  SDataBlockDescNode* pDesc = NULL;
5,062,034✔
262
  int32_t             code = nodesMakeNode(QUERY_NODE_DATABLOCK_DESC, (SNode**)&pDesc);
5,062,034✔
263
  if (NULL == pDesc) {
5,062,084!
264
    return code;
×
265
  }
266
  pDesc->dataBlockId = pCxt->nextDataBlockId++;
5,062,084✔
267

268
  SHashObj* pHash = NULL;
5,062,084✔
269
  SHashObj* pProjIdxHash = NULL;
5,062,084✔
270
  code = createDataBlockDescHash(pCxt, LIST_LENGTH(pList), pDesc->dataBlockId, &pHash, &pProjIdxHash);
5,062,084!
271
  if (TSDB_CODE_SUCCESS == code) {
5,062,085!
272
    code = buildDataBlockSlots(pCxt, pList, pDesc, pHash, pProjIdxHash);
5,062,095✔
273
  }
274

275
  if (TSDB_CODE_SUCCESS == code) {
5,062,172!
276
    *pDataBlockDesc = pDesc;
5,062,172✔
277
  } else {
278
    nodesDestroyNode((SNode*)pDesc);
×
279
  }
280

281
  return code;
5,062,170✔
282
}
283

284
static int16_t getUnsetSlotId(const SArray* pSlotIdsInfo) {
18,107,224✔
285
  int32_t size = taosArrayGetSize(pSlotIdsInfo);
18,107,224✔
286
  for (int32_t i = 0; i < size; ++i) {
18,924,354✔
287
    SSlotIdInfo* pInfo = taosArrayGet(pSlotIdsInfo, i);
18,117,173✔
288
    if (!pInfo->set) {
18,117,135✔
289
      pInfo->set = true;
17,299,925✔
290
      return pInfo->slotId;
17,299,925✔
291
    }
292
  }
293
  return ((SSlotIdInfo*)taosArrayGet(pSlotIdsInfo, 0))->slotId;
807,181✔
294
}
295

296
static int32_t addDataBlockSlotsImpl(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc,
6,659,381✔
297
                                     const char* pStmtName, bool output, bool reserve) {
298
  if (NULL == pList) {
6,659,381✔
299
    return TSDB_CODE_SUCCESS;
1,329,959✔
300
  }
301

302
  int32_t   code = TSDB_CODE_SUCCESS;
5,329,422✔
303
  SHashObj* pHash = taosArrayGetP(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId);
5,329,422✔
304
  int16_t   nextSlotId = LIST_LENGTH(pDataBlockDesc->pSlots), slotId = 0;
5,329,513!
305
  SNode*    pNode = NULL;
5,329,513✔
306
  FOREACH(pNode, pList) {
23,846,941!
307
    SNode*  pExpr = QUERY_NODE_ORDER_BY_EXPR == nodeType(pNode) ? ((SOrderByExprNode*)pNode)->pExpr : pNode;
18,517,380!
308
    char*   name = NULL;
18,517,380✔
309
    int32_t len = 0;
18,517,380✔
310
    code = getSlotKey(pExpr, pStmtName, &name, &len, 0);
18,517,380✔
311
    if (TSDB_CODE_SUCCESS == code) {
18,517,614!
312
      SSlotIndex* pIndex = taosHashGet(pHash, name, len);
18,517,655✔
313
      if (NULL == pIndex) {
18,517,549✔
314
        code = nodesListStrictAppend(pDataBlockDesc->pSlots,
410,252✔
315
                                     createSlotDesc(pCxt, name, pExpr, nextSlotId, output, reserve));
316
        if (TSDB_CODE_SUCCESS == code) {
410,252!
317
          code = putSlotToHashImpl(pDataBlockDesc->dataBlockId, nextSlotId, name, len, pHash);
410,252✔
318
        }
319
        pDataBlockDesc->totalRowSize += ((SExprNode*)pExpr)->resType.bytes;
410,252✔
320
        if (output) {
410,252✔
321
          pDataBlockDesc->outputRowSize += ((SExprNode*)pExpr)->resType.bytes;
258,834✔
322
        }
323
        slotId = nextSlotId;
410,252✔
324
        ++nextSlotId;
410,252✔
325
      } else {
326
        slotId = getUnsetSlotId(pIndex->pSlotIdsInfo);
18,107,297✔
327
      }
328
    }
329

330
    taosMemoryFree(name);
18,517,334!
331
    if (TSDB_CODE_SUCCESS == code) {
18,517,553✔
332
      SNode* pTarget = NULL;
18,517,533✔
333
      code = createTarget(pNode, pDataBlockDesc->dataBlockId, slotId, &pTarget);
18,517,533✔
334
      if (TSDB_CODE_SUCCESS == code) {
18,517,408!
335
        REPLACE_NODE(pTarget);
18,517,426✔
336
      }
337
    }
338

339
    if (TSDB_CODE_SUCCESS != code) {
18,517,428!
340
      break;
×
341
    }
342
  }
343
  return code;
5,329,561✔
344
}
345

346
static int32_t addDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) {
5,642,201✔
347
  return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, NULL, false, true);
5,642,201✔
348
}
349

350
static int32_t addDataBlockSlot(SPhysiPlanContext* pCxt, SNode** pNode, SDataBlockDescNode* pDataBlockDesc) {
20,085✔
351
  if (NULL == pNode || NULL == *pNode) {
20,085!
352
    return TSDB_CODE_SUCCESS;
×
353
  }
354

355
  SNodeList* pList = NULL;
20,085✔
356
  int32_t    code = nodesListMakeAppend(&pList, *pNode);
20,085✔
357
  if (TSDB_CODE_SUCCESS == code) {
20,085!
358
    code = addDataBlockSlots(pCxt, pList, pDataBlockDesc);
20,085✔
359
  }
360
  if (TSDB_CODE_SUCCESS == code) {
20,085!
361
    *pNode = nodesListGetNode(pList, 0);
20,085✔
362
  }
363
  nodesClearList(pList);
20,085✔
364
  return code;
20,085✔
365
}
366

367
static int32_t addDataBlockSlotsForProject(SPhysiPlanContext* pCxt, const char* pStmtName, SNodeList* pList,
759,335✔
368
                                           SDataBlockDescNode* pDataBlockDesc) {
369
  return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, pStmtName, false, true);
759,335✔
370
}
371

372
static int32_t pushdownDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) {
257,814✔
373
  return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, NULL, true, true);
257,814✔
374
}
375

376
typedef struct SSetSlotIdCxt {
377
  int32_t   errCode;
378
  SHashObj* pLeftHash;
379
  SHashObj* pLeftProjIdxHash;
380
  SHashObj* pRightHash;
381
  SHashObj* pRightProdIdxHash;
382
} SSetSlotIdCxt;
383

384
typedef struct SMultiTableSetSlotIdCxt {
385
  int32_t    errCode;
386
  SArray*    hashArray;
387
  SArray*    projIdxHashArray;
388
  SNodeList* pChild;
389
  bool       isVtb;
390
} SMultiTableSetSlotIdCxt;
391

392
static void dumpSlots(const char* pName, SHashObj* pHash) {
×
393
  if (NULL == pHash) {
×
394
    return;
×
395
  }
396
  planDebug("%s", pName);
×
397
  void* pIt = taosHashIterate(pHash, NULL);
×
398
  while (NULL != pIt) {
×
399
    size_t len = 0;
×
400
    char*  pKey = taosHashGetKey(pIt, &len);
×
401
    char   name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN] = {0};
×
402
    strncpy(name, pKey, len);
×
403
    planDebug("\tslot name = %s", name);
×
404
    pIt = taosHashIterate(pHash, pIt);
×
405
  }
406
}
407

408
static EDealRes doSetSlotId(SNode* pNode, void* pContext) {
23,310,733✔
409
  if (QUERY_NODE_COLUMN == nodeType(pNode) && 0 != strcmp(((SColumnNode*)pNode)->colName, "*")) {
23,310,733!
410
    SSetSlotIdCxt* pCxt = (SSetSlotIdCxt*)pContext;
12,770,597✔
411
    char*          name = NULL;
12,770,597✔
412
    int32_t        len = 0;
12,770,597✔
413
    pCxt->errCode = getSlotKey(pNode, NULL, &name, &len, 64);
12,770,597✔
414
    if (TSDB_CODE_SUCCESS != pCxt->errCode) {
12,770,649!
415
      return DEAL_RES_ERROR;
×
416
    }
417
    SSlotIndex* pIndex = NULL;
12,770,649✔
418
    if (((SColumnNode*)pNode)->projRefIdx > 0) {
12,770,649✔
419
      snprintf(name + strlen(name), 16, "_%d", ((SColumnNode*)pNode)->projRefIdx);
60,577✔
420
      pIndex = taosHashGet(pCxt->pLeftProjIdxHash, name, strlen(name));
60,577✔
421
      if (!pIndex) {
60,577✔
422
        pIndex = taosHashGet(pCxt->pRightProdIdxHash, name, strlen(name));
791✔
423
      }
424
    }
425

426
    if (NULL == pIndex) {
12,770,649✔
427
      name[len] = 0;
12,710,176✔
428
      pIndex = taosHashGet(pCxt->pLeftHash, name, len);
12,710,176✔
429
      if (NULL == pIndex) {
12,710,153✔
430
        pIndex = taosHashGet(pCxt->pRightHash, name, len);
388,260✔
431
      }
432
    }
433
    // pIndex is definitely not NULL, otherwise it is a bug
434
    if (NULL == pIndex) {
12,770,562!
435
      planError("doSetSlotId failed, invalid slot name %s", name);
×
436
      dumpSlots("left datablock desc", pCxt->pLeftHash);
×
437
      dumpSlots("right datablock desc", pCxt->pRightHash);
×
438
      pCxt->errCode = TSDB_CODE_PLAN_SLOT_NOT_FOUND;
×
439
      taosMemoryFree(name);
×
440
      return DEAL_RES_ERROR;
×
441
    }
442
    ((SColumnNode*)pNode)->dataBlockId = pIndex->dataBlockId;
12,770,562✔
443
    ((SColumnNode*)pNode)->slotId = ((SSlotIdInfo*)taosArrayGet(pIndex->pSlotIdsInfo, 0))->slotId;
12,770,562✔
444
    taosMemoryFree(name);
12,770,507!
445
    return DEAL_RES_IGNORE_CHILD;
12,770,713✔
446
  }
447
  return DEAL_RES_CONTINUE;
10,540,136✔
448
}
449

450
static EDealRes doSetMultiTableSlotId(SNode* pNode, void* pContext) {
72,393✔
451
  if (QUERY_NODE_COLUMN == nodeType(pNode) && 0 != strcmp(((SColumnNode*)pNode)->colName, "*")) {
72,393!
452
    SMultiTableSetSlotIdCxt* pCxt = (SMultiTableSetSlotIdCxt*)pContext;
72,393✔
453
    char*                    name = NULL;
72,393✔
454
    int32_t                  len = 0;
72,393✔
455
    if (pCxt->isVtb && !((SColumnNode*)pNode)->hasRef) {
72,393✔
456
      return DEAL_RES_CONTINUE;
44,913✔
457
    }
458
    
459
    pCxt->errCode = getSlotKey(pNode, NULL, &name, &len, 16);
27,480✔
460
    if (TSDB_CODE_SUCCESS != pCxt->errCode) {
27,480!
461
      return DEAL_RES_ERROR;
×
462
    }
463
    SSlotIndex* pIndex = NULL;
27,480✔
464
    int32_t idx = 0;
27,480✔
465
    if (((SColumnNode*)pNode)->projRefIdx > 0) {
27,480!
466
      sprintf(name + strlen(name), "_%d", ((SColumnNode*)pNode)->projRefIdx);
×
467
      while (!pIndex && idx < LIST_LENGTH(pCxt->pChild)) {
×
468
        SHashObj *tmpHash =
469
            taosArrayGetP(pCxt->projIdxHashArray,
×
470
                          ((SPhysiNode*)nodesListGetNode(pCxt->pChild, idx))->pOutputDataBlockDesc->dataBlockId);
×
471
        pIndex = taosHashGet(tmpHash, name, strlen(name));
×
472
        idx++;
×
473
      }
474
    } else {
475
      while (!pIndex && idx < LIST_LENGTH(pCxt->pChild)) {
112,630!
476
        SHashObj *tmpHash =
477
            taosArrayGetP(pCxt->hashArray,
85,150✔
478
                          ((SPhysiNode*)nodesListGetNode(pCxt->pChild, idx))->pOutputDataBlockDesc->dataBlockId);
85,150✔
479
        pIndex = taosHashGet(tmpHash, name, len);
85,150✔
480
        idx++;
85,150✔
481
      }
482
    }
483
    // pIndex is definitely not NULL, otherwise it is a bug
484
    if (NULL == pIndex) {
27,480!
485
      planError("doSetMultiTableSlotId failed, invalid slot name %s", name);
×
486
      for (int32_t i = 0; i < taosArrayGetSize(pCxt->hashArray); i++) {
×
487
        //dumpSlots("vtable datablock desc", taosArrayGetP(pCxt->hashArray, i));
488
      }
489
      pCxt->errCode = TSDB_CODE_PLAN_SLOT_NOT_FOUND;
×
490
      taosMemoryFree(name);
×
491
      return DEAL_RES_ERROR;
×
492
    }
493
    ((SColumnNode*)pNode)->dataBlockId = pIndex->dataBlockId;
27,480✔
494
    ((SColumnNode*)pNode)->slotId = ((SSlotIdInfo*)taosArrayGet(pIndex->pSlotIdsInfo, 0))->slotId;
27,480✔
495
    taosMemoryFree(name);
27,480!
496
    return DEAL_RES_IGNORE_CHILD;
27,480✔
497
  }
498
  return DEAL_RES_CONTINUE;
×
499
}
500

501
static int32_t setNodeSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, int16_t rightDataBlockId, SNode* pNode,
2,909,275✔
502
                             SNode** pOutput) {
503
  if (NULL == pNode) {
2,909,275✔
504
    return TSDB_CODE_SUCCESS;
1,773,475✔
505
  }
506

507
  SNode*  pRes = NULL;
1,135,800✔
508
  int32_t code = nodesCloneNode(pNode, &pRes);
1,135,800✔
509
  if (NULL == pRes) {
1,135,852!
510
    return code;
×
511
  }
512

513
  SSetSlotIdCxt cxt = {
5,679,229✔
514
      .errCode = TSDB_CODE_SUCCESS,
515
      .pLeftHash = taosArrayGetP(pCxt->pLocationHelper, leftDataBlockId),
1,135,852✔
516
      .pLeftProjIdxHash = taosArrayGetP(pCxt->pProjIdxLocHelper, leftDataBlockId),
1,135,848✔
517
      .pRightHash = (rightDataBlockId < 0 ? NULL : taosArrayGetP(pCxt->pLocationHelper, rightDataBlockId)),
1,135,847✔
518
      .pRightProdIdxHash = (rightDataBlockId < 0 ? NULL : taosArrayGetP(pCxt->pProjIdxLocHelper, rightDataBlockId))};
1,135,841✔
519
  nodesWalkExpr(pRes, doSetSlotId, &cxt);
1,135,841✔
520
  if (TSDB_CODE_SUCCESS != cxt.errCode) {
1,135,855!
521
    nodesDestroyNode(pRes);
×
522
    return cxt.errCode;
×
523
  }
524

525
  *pOutput = pRes;
1,135,856✔
526
  return TSDB_CODE_SUCCESS;
1,135,856✔
527
}
528

529
static int32_t setVtableNodeSlotId(SPhysiPlanContext* pCxt, SNodeList* pChild, SNode* pNode,
×
530
                                   SNode** pOutput) {
531
  int32_t code = TSDB_CODE_SUCCESS;
×
532
  if (NULL == pNode) {
×
533
    PLAN_RET(code);
×
534
  }
535

536
  SNode* pRes = NULL;
×
537
  PLAN_ERR_JRET(nodesCloneNode(pNode, &pRes));
×
538

539
  SMultiTableSetSlotIdCxt cxt = {
×
540
      .errCode = TSDB_CODE_SUCCESS,
541
      .hashArray = pCxt->pLocationHelper,
×
542
      .projIdxHashArray = pCxt->pProjIdxLocHelper,
×
543
      .pChild = pChild
544
  };
545

546
  nodesWalkExpr(pRes, doSetMultiTableSlotId, &cxt);
×
547
  PLAN_ERR_JRET(cxt.errCode);
×
548

549
  *pOutput = pRes;
×
550
  return code;
×
551
_return:
×
552
  nodesDestroyNode(pRes);
×
553
  return code;
×
554
}
555

556
static int32_t setListSlotId(SPhysiPlanContext* pCxt, int16_t leftDataBlockId, int16_t rightDataBlockId,
4,936,109✔
557
                             const SNodeList* pList, SNodeList** pOutput) {
558
  if (NULL == pList) {
4,936,109✔
559
    return TSDB_CODE_SUCCESS;
1,717,223✔
560
  }
561

562
  SNodeList* pRes = NULL;
3,218,886✔
563
  int32_t    code = nodesCloneList(pList, &pRes);
3,218,886✔
564
  if (NULL == pRes) {
3,218,942✔
565
    return code;
95✔
566
  }
567

568
  SSetSlotIdCxt cxt = {
16,094,177✔
569
      .errCode = TSDB_CODE_SUCCESS,
570
      .pLeftHash = taosArrayGetP(pCxt->pLocationHelper, leftDataBlockId),
3,218,847✔
571
      .pLeftProjIdxHash = taosArrayGetP(pCxt->pProjIdxLocHelper, leftDataBlockId),
3,218,843✔
572
      .pRightHash = (rightDataBlockId < 0 ? NULL : taosArrayGetP(pCxt->pLocationHelper, rightDataBlockId)),
3,218,827✔
573
      .pRightProdIdxHash = (rightDataBlockId < 0 ? NULL : taosArrayGetP(pCxt->pProjIdxLocHelper, rightDataBlockId))};
3,218,830✔
574
  nodesWalkExprs(pRes, doSetSlotId, &cxt);
3,218,830✔
575
  if (TSDB_CODE_SUCCESS != cxt.errCode) {
3,218,846✔
576
    nodesDestroyList(pRes);
17✔
577
    return cxt.errCode;
×
578
  }
579
  *pOutput = pRes;
3,218,829✔
580
  return TSDB_CODE_SUCCESS;
3,218,829✔
581
}
582

583
static int32_t setMultiBlockSlotId(SPhysiPlanContext* pCxt, SNodeList* pChild, bool isVtb, const SNodeList* pList,
11,595✔
584
                                   SNodeList** pOutput) {
585
  int32_t code = TSDB_CODE_SUCCESS;
11,595✔
586
  if (NULL == pList) {
11,595✔
587
    PLAN_RET(code);
12!
588
  }
589

590
  SNodeList* pRes = NULL;
11,583✔
591
  PLAN_ERR_JRET(nodesCloneList(pList, &pRes));
11,583!
592

593
  SMultiTableSetSlotIdCxt cxt = {
11,583✔
594
    .errCode = TSDB_CODE_SUCCESS,
595
    .hashArray = pCxt->pLocationHelper,
11,583✔
596
    .projIdxHashArray = pCxt->pProjIdxLocHelper,
11,583✔
597
    .pChild = pChild,
598
    .isVtb = isVtb
599
  };
600

601
  nodesWalkExprs(pRes, doSetMultiTableSlotId, &cxt);
11,583✔
602
  PLAN_ERR_JRET(cxt.errCode);
11,583!
603

604
  *pOutput = pRes;
11,583✔
605
  return code;
11,583✔
606
_return:
×
607
  nodesDestroyList(pRes);
×
608
  return code;
×
609
}
610

611
static SPhysiNode* makePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, ENodeType type) {
5,061,941✔
612
  SPhysiNode* pPhysiNode = NULL;
5,061,941✔
613
  int32_t     code = nodesMakeNode(type, (SNode**)&pPhysiNode);
5,061,941✔
614
  if (NULL == pPhysiNode) {
5,062,110!
615
    terrno = code;
×
616
    return NULL;
×
617
  }
618

619
  TSWAP(pPhysiNode->pLimit, pLogicNode->pLimit);
5,062,110✔
620
  TSWAP(pPhysiNode->pSlimit, pLogicNode->pSlimit);
5,062,110✔
621
  pPhysiNode->dynamicOp = pLogicNode->dynamicOp;
5,062,110✔
622
  pPhysiNode->inputTsOrder = pLogicNode->inputTsOrder;
5,062,110✔
623
  pPhysiNode->outputTsOrder = pLogicNode->outputTsOrder;
5,062,110✔
624

625
  code = createDataBlockDesc(pCxt, pLogicNode->pTargets, &pPhysiNode->pOutputDataBlockDesc);
5,062,110✔
626
  if (TSDB_CODE_SUCCESS != code) {
5,062,153!
627
    nodesDestroyNode((SNode*)pPhysiNode);
×
628
    terrno = code;
×
629
    return NULL;
4✔
630
  }
631
  pPhysiNode->pOutputDataBlockDesc->precision = pLogicNode->precision;
5,062,177✔
632
  return pPhysiNode;
5,062,177✔
633
}
634

635
static int32_t setConditionsSlotId(SPhysiPlanContext* pCxt, const SLogicNode* pLogicNode, SPhysiNode* pPhysiNode) {
4,878,282✔
636
  if (NULL != pLogicNode->pConditions) {
4,878,282✔
637
    return setNodeSlotId(pCxt, pPhysiNode->pOutputDataBlockDesc->dataBlockId, -1, pLogicNode->pConditions,
533,700✔
638
                         &pPhysiNode->pConditions);
639
  }
640
  return TSDB_CODE_SUCCESS;
4,344,582✔
641
}
642

643
static int32_t colIdCompare(const void* pLeft, const void* pRight) {
16,308,757✔
644
  SColumnNode* pLeftCol = *(SColumnNode**)pLeft;
16,308,757✔
645
  SColumnNode* pRightCol = *(SColumnNode**)pRight;
16,308,757✔
646
  return pLeftCol->colId > pRightCol->colId ? 1 : -1;
16,308,757✔
647
}
648

649
static int32_t sortScanCols(SNodeList* pScanCols) {
1,787,873✔
650
  SArray* pArray = taosArrayInit(LIST_LENGTH(pScanCols), POINTER_BYTES);
1,787,873!
651
  if (NULL == pArray) {
1,787,917✔
652
    return terrno;
18✔
653
  }
654

655
  int32_t code = 0;
1,787,899✔
656
  SNode*  pCol = NULL;
1,787,899✔
657
  FOREACH(pCol, pScanCols) {
8,730,153!
658
    if (NULL == taosArrayPush(pArray, &pCol)) {
6,942,254!
659
      code = terrno;
×
660
      break;
×
661
    }
662
  }
663
  if (TSDB_CODE_SUCCESS != code) {
1,787,911!
664
    taosArrayDestroy(pArray);
×
665
    return code;
×
666
  }
667
  taosArraySort(pArray, colIdCompare);
1,787,911✔
668

669
  int32_t index = 0;
1,787,813✔
670
  FOREACH(pCol, pScanCols) { REPLACE_NODE(taosArrayGetP(pArray, index++)); }
8,730,061!
671
  taosArrayDestroy(pArray);
1,787,819✔
672

673
  return TSDB_CODE_SUCCESS;
1,787,918✔
674
}
675

676
static int32_t createScanCols(SPhysiPlanContext* pCxt, SScanPhysiNode* pScanPhysiNode, SNodeList* pScanCols) {
1,852,721✔
677
  if (NULL == pScanCols) {
1,852,721✔
678
    return TSDB_CODE_SUCCESS;
76,590✔
679
  }
680

681
  pScanPhysiNode->pScanCols = NULL;
1,776,131✔
682
  int32_t code = nodesCloneList(pScanCols, &pScanPhysiNode->pScanCols);
1,776,131✔
683
  if (NULL == pScanPhysiNode->pScanCols) {
1,776,218!
684
    return code;
×
685
  }
686
  return sortScanCols(pScanPhysiNode->pScanCols);
1,776,218✔
687
}
688

689
static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
1,852,741✔
690
                                           SScanPhysiNode* pScanPhysiNode, SPhysiNode** pPhyNode) {
691
  int32_t code = createScanCols(pCxt, pScanPhysiNode, pScanLogicNode->pScanCols);
1,852,741✔
692
  if (TSDB_CODE_SUCCESS == code) {
1,852,807!
693
    code = addDataBlockSlots(pCxt, pScanPhysiNode->pScanCols, pScanPhysiNode->node.pOutputDataBlockDesc);
1,852,818✔
694
  }
695

696
  if (TSDB_CODE_SUCCESS == code && NULL != pScanLogicNode->pScanPseudoCols) {
1,852,794!
697
    pScanPhysiNode->pScanPseudoCols = NULL;
599,945✔
698
    code = nodesCloneList(pScanLogicNode->pScanPseudoCols, &pScanPhysiNode->pScanPseudoCols);
599,945✔
699
  }
700

701
  if (TSDB_CODE_SUCCESS == code) {
1,852,794!
702
    code = addDataBlockSlots(pCxt, pScanPhysiNode->pScanPseudoCols, pScanPhysiNode->node.pOutputDataBlockDesc);
1,852,810✔
703
  }
704

705
  if (TSDB_CODE_SUCCESS == code) {
1,852,771!
706
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pScanLogicNode, (SPhysiNode*)pScanPhysiNode);
1,852,794✔
707
  }
708

709
  if (TSDB_CODE_SUCCESS == code) {
1,852,769!
710
    pScanPhysiNode->uid = pScanLogicNode->tableId;
1,852,804✔
711
    pScanPhysiNode->suid = pScanLogicNode->stableId;
1,852,804✔
712
    pScanPhysiNode->tableType = pScanLogicNode->tableType;
1,852,804✔
713
    pScanPhysiNode->groupOrderScan = pScanLogicNode->groupOrderScan;
1,852,804✔
714
    pScanPhysiNode->virtualStableScan = pScanLogicNode->virtualStableScan;
1,852,804✔
715
    memcpy(&pScanPhysiNode->tableName, &pScanLogicNode->tableName, sizeof(SName));
1,852,804✔
716
    if (NULL != pScanLogicNode->pTagCond) {
1,852,804✔
717
      pSubplan->pTagCond = NULL;
111,144✔
718
      code = nodesCloneNode(pScanLogicNode->pTagCond, &pSubplan->pTagCond);
111,144✔
719
    }
720
  }
721

722
  if (TSDB_CODE_SUCCESS == code) {
1,852,769✔
723
    if (NULL != pScanLogicNode->pTagIndexCond) {
1,852,750✔
724
      pSubplan->pTagIndexCond = NULL;
10,844✔
725
      code = nodesCloneNode(pScanLogicNode->pTagIndexCond, &pSubplan->pTagIndexCond);
10,844✔
726
    }
727
  }
728

729
  if (TSDB_CODE_SUCCESS == code) {
1,852,759!
730
    *pPhyNode = (SPhysiNode*)pScanPhysiNode;
1,852,759✔
731
  } else {
732
    nodesDestroyNode((SNode*)pScanPhysiNode);
×
733
  }
734

735
  return code;
1,852,769✔
736
}
737

738
static void vgroupInfoToNodeAddr(const SVgroupInfo* vg, SQueryNodeAddr* pNodeAddr) {
1,853,323✔
739
  pNodeAddr->nodeId = vg->vgId;
1,853,323✔
740
  pNodeAddr->epSet = vg->epSet;
1,853,323✔
741
}
1,853,323✔
742

743
static ENodeType getScanOperatorType(EScanType scanType) {
1,716,911✔
744
  switch (scanType) {
1,716,911!
745
    case SCAN_TYPE_TAG:
×
746
      return QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
×
747
    case SCAN_TYPE_TABLE:
1,444,646✔
748
      return QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
1,444,646✔
749
    case SCAN_TYPE_STREAM:
2,029✔
750
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
2,029✔
751
    case SCAN_TYPE_TABLE_MERGE:
270,261✔
752
      return QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN;
270,261✔
753
    case SCAN_TYPE_BLOCK_INFO:
17✔
754
      return QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN;
17✔
755
    case SCAN_TYPE_TABLE_COUNT:
×
756
      return QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN;
×
757
    default:
×
758
      break;
×
759
  }
760
  return QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
×
761
}
762

763
static int32_t createSimpleScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
17✔
764
                                         SPhysiNode** pPhyNode) {
765
  SScanPhysiNode* pScan =
766
      (SScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, getScanOperatorType(pScanLogicNode->scanType));
17✔
767
  if (NULL == pScan) {
17!
768
    return terrno;
×
769
  }
770

771
  if (pScanLogicNode->pVgroupList) {
17!
772
    vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
17✔
773
  }
774
  return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, pScan, pPhyNode);
17✔
775
}
776

777
static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
76,652✔
778
                                      SPhysiNode** pPhyNode) {
779
  STagScanPhysiNode* pScan =
780
      (STagScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN);
76,652✔
781
  if (NULL == pScan) {
76,652!
782
    return terrno;
×
783
  }
784
  if (pScanLogicNode->pVgroupList) {
76,652✔
785
    vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
76,651✔
786
  }
787
  pScan->onlyMetaCtbIdx = pScanLogicNode->onlyMetaCtbIdx;
76,652✔
788

789
  return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
76,652✔
790
}
791

792
static int32_t createLastRowScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
19,277✔
793
                                          SPhysiNode** pPhyNode) {
794
  SLastRowScanPhysiNode* pScan =
795
      (SLastRowScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN);
19,277✔
796
  if (NULL == pScan) {
19,277!
797
    return terrno;
×
798
  }
799
  pScan->pTargets = NULL;
19,277✔
800
  int32_t code = nodesCloneList(pScanLogicNode->node.pTargets, &pScan->pTargets);
19,277✔
801
  if (TSDB_CODE_SUCCESS != code) {
19,277!
802
    nodesDestroyNode((SNode*)pScan);
×
803
    return code;
×
804
  }
805
  pScan->pGroupTags = NULL;
19,277✔
806
  code = nodesCloneList(pScanLogicNode->pGroupTags, &pScan->pGroupTags);
19,277✔
807
  if (TSDB_CODE_SUCCESS != code) {
19,277!
808
    nodesDestroyNode((SNode*)pScan);
×
809
    return code;
×
810
  }
811

812
  pScan->groupSort = pScanLogicNode->groupSort;
19,277✔
813
  pScan->ignoreNull = pScanLogicNode->igLastNull;
19,277✔
814

815
  if (pScanLogicNode->pVgroupList) {
19,277!
816
    vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
19,277✔
817
  }
818

819
  code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
19,277✔
820
  if (TSDB_CODE_SUCCESS == code && pScanLogicNode->pFuncTypes != NULL) {
19,277!
821
    pScan->pFuncTypes = taosArrayInit(taosArrayGetSize(pScanLogicNode->pFuncTypes), sizeof(int32_t));
368✔
822
    if (NULL == pScan->pFuncTypes) {
368!
823
      return terrno;
×
824
    }
825

826
    SNode* pTargetNode = NULL;
368✔
827
    int    funcTypeIndex = 0;
368✔
828
    FOREACH(pTargetNode, ((SScanPhysiNode*)pScan)->pScanCols) {
9,784!
829
      if (((STargetNode*)pTargetNode)->pExpr->type != QUERY_NODE_COLUMN) {
9,416!
830
        continue;
×
831
      }
832
      SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pTargetNode)->pExpr;
9,416✔
833

834
      for (int i = 0; i < TARRAY_SIZE(pScanLogicNode->pFuncTypes); ++i) {
234,094!
835
        SFunctParam* pFunctParam = taosArrayGet(pScanLogicNode->pFuncTypes, i);
234,094✔
836
        if (pColNode->colId == pFunctParam->pCol->colId &&
234,094✔
837
            0 == strncmp(pColNode->colName, pFunctParam->pCol->name, strlen(pColNode->colName))) {
14,270✔
838
          if (NULL == taosArrayInsert(pScan->pFuncTypes, funcTypeIndex, &pFunctParam->type)) {
9,416!
839
            code = terrno;
×
840
          }
841
          break;
9,416✔
842
        }
843
      }
844
      funcTypeIndex++;
9,416✔
845
    }
846
  }
847
  return code;
19,277✔
848
}
849

850
static int32_t createTableCountScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
×
851
                                             SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) {
852
  STableCountScanPhysiNode* pScan = (STableCountScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode,
×
853
                                                                             QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN);
854
  if (NULL == pScan) {
×
855
    return terrno;
×
856
  }
857

858
  pScan->pGroupTags = NULL;
×
859
  int32_t code = nodesCloneList(pScanLogicNode->pGroupTags, &pScan->pGroupTags);
×
860
  if (NULL != pScanLogicNode->pGroupTags && NULL == pScan->pGroupTags) {
×
861
    nodesDestroyNode((SNode*)pScan);
×
862
    return code;
×
863
  }
864

865
  pScan->groupSort = pScanLogicNode->groupSort;
×
866
  if (pScanLogicNode->pVgroupList) {
×
867
    vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
×
868
  }
869
  return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
×
870
}
871

872
static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
1,716,904✔
873
                                        SPhysiNode** pPhyNode) {
874
  STableScanPhysiNode* pTableScan = (STableScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode,
1,716,904✔
875
                                                                        getScanOperatorType(pScanLogicNode->scanType));
876
  if (NULL == pTableScan) {
1,717,007!
877
    return terrno;
×
878
  }
879

880
  memcpy(pTableScan->scanSeq, pScanLogicNode->scanSeq, sizeof(pScanLogicNode->scanSeq));
1,717,007✔
881
  pTableScan->scanRange = pScanLogicNode->scanRange;
1,717,007✔
882
  pTableScan->ratio = pScanLogicNode->ratio;
1,717,007✔
883
  if (pScanLogicNode->pVgroupList) {
1,717,007✔
884
    vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
1,716,372✔
885
    pSubplan->execNodeStat.tableNum = pScanLogicNode->pVgroupList->vgroups[0].numOfTable;
1,716,337✔
886
  }
887
  (void)tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
1,716,972✔
888
  pTableScan->dataRequired = pScanLogicNode->dataRequired;
1,716,977✔
889
  pTableScan->pDynamicScanFuncs = NULL;
1,716,977✔
890
  int32_t code = nodesCloneList(pScanLogicNode->pDynamicScanFuncs, &pTableScan->pDynamicScanFuncs);
1,716,977✔
891
  if (TSDB_CODE_SUCCESS != code) {
1,716,933!
892
    nodesDestroyNode((SNode*)pTableScan);
×
893
    return code;
×
894
  }
895
  pTableScan->pGroupTags = NULL;
1,716,933✔
896
  code = nodesCloneList(pScanLogicNode->pGroupTags, &pTableScan->pGroupTags);
1,716,933✔
897
  if (TSDB_CODE_SUCCESS != code) {
1,716,979!
898
    nodesDestroyNode((SNode*)pTableScan);
×
899
    return code;
×
900
  }
901
  pTableScan->groupSort = pScanLogicNode->groupSort;
1,716,979✔
902
  pTableScan->interval = pScanLogicNode->interval;
1,716,979✔
903
  pTableScan->offset = pScanLogicNode->offset;
1,716,979✔
904
  pTableScan->sliding = pScanLogicNode->sliding;
1,716,979✔
905
  pTableScan->intervalUnit = pScanLogicNode->intervalUnit;
1,716,979✔
906
  pTableScan->slidingUnit = pScanLogicNode->slidingUnit;
1,716,979✔
907
  pTableScan->triggerType = pScanLogicNode->triggerType;
1,716,979✔
908
  pTableScan->watermark = pScanLogicNode->watermark;
1,716,979✔
909
  pTableScan->igExpired = pScanLogicNode->igExpired;
1,716,979✔
910
  pTableScan->igCheckUpdate = pScanLogicNode->igCheckUpdate;
1,716,979✔
911
  pTableScan->assignBlockUid = pCxt->pPlanCxt->rSmaQuery ? true : false;
1,716,979✔
912
  pTableScan->filesetDelimited = pScanLogicNode->filesetDelimited;
1,716,979✔
913
  pTableScan->needCountEmptyTable = pScanLogicNode->isCountByTag;
1,716,979✔
914
  pTableScan->paraTablesSort = pScanLogicNode->paraTablesSort;
1,716,979✔
915
  pTableScan->smallDataTsSort = pScanLogicNode->smallDataTsSort;
1,716,979✔
916
  tstrncpy(pTableScan->pStbFullName, pCxt->pPlanCxt->pStbFullName, TSDB_TABLE_FNAME_LEN);
1,716,979✔
917
  tstrncpy(pTableScan->pWstartName, pCxt->pPlanCxt->pWstartName, TSDB_COL_NAME_LEN);
1,716,979✔
918
  tstrncpy(pTableScan->pWendName, pCxt->pPlanCxt->pWendName, TSDB_COL_NAME_LEN);
1,716,979✔
919
  tstrncpy(pTableScan->pGroupIdName, pCxt->pPlanCxt->pGroupIdName, TSDB_COL_NAME_LEN);
1,716,979✔
920
  tstrncpy(pTableScan->pIsWindowFilledName, pCxt->pPlanCxt->pIsWindowFilledName, TSDB_COL_NAME_LEN);
1,716,979✔
921

922
  code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode);
1,716,979✔
923
  if (TSDB_CODE_SUCCESS == code) {
1,716,967✔
924
    code = setListSlotId(pCxt, pTableScan->scan.node.pOutputDataBlockDesc->dataBlockId, -1, pScanLogicNode->pTags,
1,716,965✔
925
                         &pTableScan->pTags);
926
  }
927
  if (TSDB_CODE_SUCCESS == code) {
1,716,918!
928
    code = setNodeSlotId(pCxt, pTableScan->scan.node.pOutputDataBlockDesc->dataBlockId, -1, pScanLogicNode->pSubtable,
1,716,940✔
929
                         &pTableScan->pSubtable);
930
  }
931
  return code;
1,716,945✔
932
}
933

934
static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
39,855✔
935
                                              SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) {
936
  SSystemTableScanPhysiNode* pScan = (SSystemTableScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode,
39,855✔
937
                                                                               QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN);
938
  if (NULL == pScan) {
39,856!
939
    return terrno;
×
940
  }
941

942
  pSubplan->showRewrite = pScanLogicNode->showRewrite;
39,856✔
943
  pScan->showRewrite = pScanLogicNode->showRewrite;
39,856✔
944
  pScan->accountId = pCxt->pPlanCxt->acctId;
39,856✔
945
  pScan->sysInfo = pCxt->pPlanCxt->sysInfo;
39,856✔
946
  if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_TABLES) ||
39,856✔
947
      0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_TAGS) ||
33,407✔
948
      0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_COLS) ||
30,825✔
949
      0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_DISK_USAGE) ||
10,241✔
950
      0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_FILESETS)) {
10,205✔
951
    if (pScanLogicNode->pVgroupList) {
29,653!
952
      vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
29,653✔
953
    }
954
  } else {
955
    pSubplan->execNode.nodeId = MNODE_HANDLE;
10,203✔
956
    pSubplan->execNode.epSet = pCxt->pPlanCxt->mgmtEpSet;
10,203✔
957
  }
958
  if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_DNODE_VARIABLES) && pScanLogicNode->pVgroupList) {
39,858!
959
    pScan->mgmtEpSet = pScanLogicNode->pVgroupList->vgroups->epSet;
353✔
960
  } else {
961
    pScan->mgmtEpSet = pCxt->pPlanCxt->mgmtEpSet;
39,505✔
962
  }
963
  (void)tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
39,858✔
964

965
  pCxt->hasSysScan = true;
39,857✔
966
  return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
39,857✔
967
}
968

969
static int32_t createStreamScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
2,029✔
970
                                         SPhysiNode** pPhyNode) {
971
  return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
2,029✔
972
}
973

974
static int32_t createTableMergeScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
270,261✔
975
                                             SScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) {
976
  return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
270,261✔
977
}
978

979
static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
1,852,697✔
980
                                   SPhysiNode** pPhyNode) {
981
  pCxt->hasScan = true;
1,852,697✔
982
  switch (pScanLogicNode->scanType) {
1,852,697!
983
    case SCAN_TYPE_TAG:
76,652✔
984
      return createTagScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
76,652✔
985
    case SCAN_TYPE_BLOCK_INFO:
17✔
986
      return createSimpleScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
17✔
987
    case SCAN_TYPE_TABLE_COUNT:
×
988
      return createTableCountScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
×
989
    case SCAN_TYPE_LAST_ROW:
19,277✔
990
      return createLastRowScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
19,277✔
991
    case SCAN_TYPE_TABLE:
1,444,654✔
992
      return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
1,444,654✔
993
    case SCAN_TYPE_SYSTEM_TABLE:
39,855✔
994
      return createSystemTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
39,855✔
995
    case SCAN_TYPE_STREAM:
2,029✔
996
      return createStreamScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
2,029✔
997
    case SCAN_TYPE_TABLE_MERGE:
270,261✔
998
      return createTableMergeScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
270,261✔
999
    default:
×
1000
      break;
×
1001
  }
1002
  return TSDB_CODE_FAILED;
×
1003
}
1004

1005
static int32_t getJoinDataBlockDescNode(SNodeList* pChildren, int32_t idx, SDataBlockDescNode** ppDesc) {
252,344✔
1006
  if (2 == pChildren->length) {
252,344✔
1007
    *ppDesc = ((SPhysiNode*)nodesListGetNode(pChildren, idx))->pOutputDataBlockDesc;
214,416✔
1008
  } else if (1 == pChildren->length &&
37,928!
1009
             nodeType(nodesListGetNode(pChildren, 0)) == QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE) {
37,928!
1010
    SGroupCachePhysiNode* pGrpCache = (SGroupCachePhysiNode*)nodesListGetNode(pChildren, 0);
37,928✔
1011
    *ppDesc = ((SPhysiNode*)nodesListGetNode(pGrpCache->node.pChildren, idx))->pOutputDataBlockDesc;
37,928✔
1012
  } else {
1013
    planError("Invalid join children num:%d or child type:%d", pChildren->length,
×
1014
              nodeType(nodesListGetNode(pChildren, 0)));
1015
    return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1016
  }
1017

1018
  return TSDB_CODE_SUCCESS;
252,344✔
1019
}
1020

1021
static int32_t setColEqList(SNode* pEqCond, int16_t leftBlkId, int16_t rightBlkId, SNodeList** ppLeft,
601✔
1022
                            SNodeList** ppRight) {
1023
  int32_t code = 0;
601✔
1024
  if (QUERY_NODE_OPERATOR == nodeType(pEqCond) && ((SOperatorNode*)pEqCond)->opType == OP_TYPE_EQUAL) {
1,180!
1025
    SOperatorNode* pOp = (SOperatorNode*)pEqCond;
579✔
1026
    SNode*         pNew = NULL;
579✔
1027
    if (leftBlkId == ((SColumnNode*)pOp->pLeft)->dataBlockId) {
579✔
1028
      code = nodesCloneNode(pOp->pLeft, &pNew);
352✔
1029
      if (TSDB_CODE_SUCCESS == code) {
352!
1030
        code = nodesListMakeStrictAppend(ppLeft, pNew);
352✔
1031
      }
1032
    } else if (rightBlkId == ((SColumnNode*)pOp->pLeft)->dataBlockId) {
227!
1033
      code = nodesCloneNode(pOp->pLeft, &pNew);
227✔
1034
      if (TSDB_CODE_SUCCESS == code) {
227!
1035
        code = nodesListMakeStrictAppend(ppRight, pNew);
227✔
1036
      }
1037
    } else {
1038
      planError("invalid col equal list, leftBlockId:%d", ((SColumnNode*)pOp->pLeft)->dataBlockId);
×
1039
      return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1040
    }
1041
    if (TSDB_CODE_SUCCESS == code) {
579!
1042
      pNew = NULL;
579✔
1043
      if (leftBlkId == ((SColumnNode*)pOp->pRight)->dataBlockId) {
579✔
1044
        code = nodesCloneNode(pOp->pRight, &pNew);
227✔
1045
        if (TSDB_CODE_SUCCESS == code) {
227!
1046
          code = nodesListMakeStrictAppend(ppLeft, pNew);
227✔
1047
        }
1048
      } else if (rightBlkId == ((SColumnNode*)pOp->pRight)->dataBlockId) {
352!
1049
        code = nodesCloneNode(pOp->pRight, &pNew);
352✔
1050
        if (TSDB_CODE_SUCCESS == code) {
352!
1051
          code = nodesListMakeStrictAppend(ppRight, pNew);
352✔
1052
        }
1053
      } else {
1054
        planError("invalid col equal list, rightBlockId:%d", ((SColumnNode*)pOp->pRight)->dataBlockId);
×
1055
        return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1056
      }
1057
    }
1058
  } else if (QUERY_NODE_LOGIC_CONDITION == nodeType(pEqCond) &&
22!
1059
             ((SLogicConditionNode*)pEqCond)->condType == LOGIC_COND_TYPE_AND) {
44!
1060
    SLogicConditionNode* pLogic = (SLogicConditionNode*)pEqCond;
22✔
1061
    SNode*               pNode = NULL;
22✔
1062
    FOREACH(pNode, pLogic->pParameterList) {
66!
1063
      int32_t code = setColEqList(pNode, leftBlkId, rightBlkId, ppLeft, ppRight);
44✔
1064
      if (code) {
44!
1065
        return code;
×
1066
      }
1067
    }
1068
  } else {
1069
    planError("invalid col equal cond, type:%d", nodeType(pEqCond));
×
1070
    return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1071
  }
1072

1073
  return code;
601✔
1074
}
1075

1076
static int32_t setMergeJoinPrimColEqCond(SNode* pEqCond, int32_t subType, int16_t leftBlkId, int16_t rightBlkId,
126,172✔
1077
                                         SSortMergeJoinPhysiNode* pJoin, SJoinLogicNode* pJoinLogicNode) {
1078
  int32_t code = 0;
126,172✔
1079
  if (QUERY_NODE_OPERATOR == nodeType(pEqCond)) {
126,172!
1080
    SOperatorNode* pOp = (SOperatorNode*)pEqCond;
126,172✔
1081
    if (pOp->opType != OP_TYPE_EQUAL && JOIN_STYPE_ASOF != subType) {
126,172!
1082
      planError("invalid primary cond opType, opType:%d", pOp->opType);
×
1083
      return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1084
    }
1085

1086
    switch (nodeType(pOp->pLeft)) {
126,172!
1087
      case QUERY_NODE_COLUMN: {
124,993✔
1088
        SColumnNode* pCol = (SColumnNode*)pOp->pLeft;
124,993✔
1089
        if (leftBlkId == pCol->dataBlockId) {
124,993✔
1090
          pJoin->leftPrimSlotId = pCol->slotId;
124,297✔
1091
          pJoin->asofOpType = pOp->opType;
124,297✔
1092
        } else if (rightBlkId == pCol->dataBlockId) {
696!
1093
          pJoin->rightPrimSlotId = pCol->slotId;
696✔
1094
        } else {
1095
          planError("invalid primary key col equal cond, leftBlockId:%d", pCol->dataBlockId);
×
1096
          return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1097
        }
1098
        break;
124,993✔
1099
      }
1100
      case QUERY_NODE_VALUE: {
480✔
1101
        if (pJoinLogicNode && pJoinLogicNode->leftConstPrimGot) {
480!
1102
          pJoin->leftPrimExpr = NULL;
480✔
1103
          code = nodesCloneNode(pOp->pLeft, &pJoin->leftPrimExpr);
480✔
1104
          break;
480✔
1105
        }
1106
        
1107
        planError("value node got in prim eq left cond, rightType:%d", pOp->pRight ? nodeType(pOp->pRight) : 0);
×
1108
        return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1109
      }
1110
      case QUERY_NODE_FUNCTION: {
699✔
1111
        SFunctionNode* pFunc = (SFunctionNode*)pOp->pLeft;
699✔
1112
        if (FUNCTION_TYPE_TIMETRUNCATE != pFunc->funcType) {
699!
1113
          planError("invalid primary cond left function type, leftFuncType:%d", pFunc->funcType);
×
1114
          return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1115
        }
1116
        SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
699✔
1117
        if (QUERY_NODE_COLUMN != nodeType(pParam)) {
699!
1118
          planError("invalid primary cond left timetruncate param type, leftParamType:%d", nodeType(pParam));
×
1119
          return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1120
        }
1121
        SColumnNode* pCol = (SColumnNode*)pParam;
699✔
1122
        if (leftBlkId == pCol->dataBlockId) {
699✔
1123
          pJoin->leftPrimSlotId = pCol->slotId;
613✔
1124
          pJoin->asofOpType = pOp->opType;
613✔
1125
          pJoin->leftPrimExpr = NULL;
613✔
1126
          code = nodesCloneNode((SNode*)pFunc, &pJoin->leftPrimExpr);
613✔
1127
        } else if (rightBlkId == pCol->dataBlockId) {
86!
1128
          pJoin->rightPrimSlotId = pCol->slotId;
86✔
1129
          pJoin->rightPrimExpr = NULL;
86✔
1130
          code = nodesCloneNode((SNode*)pFunc, &pJoin->rightPrimExpr);
86✔
1131
        } else {
1132
          planError("invalid primary key col equal cond, leftBlockId:%d", pCol->dataBlockId);
×
1133
          return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1134
        }
1135
        break;
699✔
1136
      }
1137
      default:
×
1138
        planError("invalid primary cond left node type, leftNodeType:%d", nodeType(pOp->pLeft));
×
1139
        return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1140
    }
1141
    if (TSDB_CODE_SUCCESS != code) {
126,172!
1142
      return code;
×
1143
    }
1144
    switch (nodeType(pOp->pRight)) {
126,172!
1145
      case QUERY_NODE_COLUMN: {
125,120✔
1146
        SColumnNode* pCol = (SColumnNode*)pOp->pRight;
125,120✔
1147
        if (leftBlkId == pCol->dataBlockId) {
125,120✔
1148
          pJoin->leftPrimSlotId = pCol->slotId;
754✔
1149
          pJoin->asofOpType = getAsofJoinReverseOp(pOp->opType);
754✔
1150
        } else if (rightBlkId == pCol->dataBlockId) {
124,366!
1151
          pJoin->rightPrimSlotId = pCol->slotId;
124,366✔
1152
        } else {
1153
          planError("invalid primary key col equal cond, rightBlockId:%d", pCol->dataBlockId);
×
1154
          return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1155
        }
1156
        break;
125,120✔
1157
      }
1158
      case QUERY_NODE_VALUE: {
559✔
1159
        if (pJoinLogicNode && pJoinLogicNode->rightConstPrimGot) {
559!
1160
          pJoin->rightPrimExpr = NULL;
559✔
1161
          code = nodesCloneNode(pOp->pRight, &pJoin->rightPrimExpr);
559✔
1162
          break;
559✔
1163
        }
1164
        
1165
        planError("value node got in prim eq right cond, leftType:%d", pOp->pLeft ? nodeType(pOp->pLeft) : 0);
×
1166
        return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1167
      }      
1168
      case QUERY_NODE_FUNCTION: {
493✔
1169
        SFunctionNode* pFunc = (SFunctionNode*)pOp->pRight;
493✔
1170
        if (FUNCTION_TYPE_TIMETRUNCATE != pFunc->funcType) {
493!
1171
          planError("invalid primary cond right function type, rightFuncType:%d", pFunc->funcType);
×
1172
          return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1173
        }
1174
        SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
493✔
1175
        if (QUERY_NODE_COLUMN != nodeType(pParam)) {
493!
1176
          planError("invalid primary cond right timetruncate param type, rightParamType:%d", nodeType(pParam));
×
1177
          return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1178
        }
1179
        SColumnNode* pCol = (SColumnNode*)pParam;
493✔
1180
        if (leftBlkId == pCol->dataBlockId) {
493✔
1181
          pJoin->leftPrimSlotId = pCol->slotId;
28✔
1182
          pJoin->asofOpType = getAsofJoinReverseOp(pOp->opType);
28✔
1183
          pJoin->leftPrimExpr = NULL;
28✔
1184
          code = nodesCloneNode((SNode*)pFunc, &pJoin->leftPrimExpr);
28✔
1185
        } else if (rightBlkId == pCol->dataBlockId) {
465!
1186
          pJoin->rightPrimSlotId = pCol->slotId;
465✔
1187
          pJoin->rightPrimExpr = NULL;
465✔
1188
          code = nodesCloneNode((SNode*)pFunc, &pJoin->rightPrimExpr);
465✔
1189
        } else {
1190
          planError("invalid primary key col equal cond, rightBlockId:%d", pCol->dataBlockId);
×
1191
          return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1192
        }
1193
        break;
493✔
1194
      }
1195
      default:
×
1196
        planError("invalid primary cond right node type, rightNodeType:%d", nodeType(pOp->pRight));
×
1197
        return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1198
    }
1199
  } else {
1200
    planError("invalid primary key col equal cond, type:%d", nodeType(pEqCond));
×
1201
    return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1202
  }
1203

1204
  return code;
126,172✔
1205
}
1206

1207
static int32_t removePrimColFromJoinTargets(SNodeList* pTargets, SValueNode* pPrimExpr, SColumnNode** ppRemoved) {
1,039✔
1208
  int32_t code = TSDB_CODE_SUCCESS;
1,039✔
1209
  SNode* pNode = NULL;
1,039✔
1210
  FOREACH(pNode, pTargets) {
2,635!
1211
    SColumnNode* pCol = (SColumnNode*)pNode;
2,347✔
1212
    if (0 == strcmp(pCol->tableAlias, pPrimExpr->node.srcTable) && 0 == strcmp(pCol->colName, pPrimExpr->node.aliasName)) {
2,347✔
1213
      code = nodesCloneNode(pNode, (SNode**)ppRemoved);
751✔
1214
      ERASE_NODE(pTargets);
751✔
1215
      break;
751✔
1216
    }
1217
  }
1218

1219
  return code;
1,039✔
1220
}
1221

1222
static int32_t appendPrimColToJoinTargets(SSortMergeJoinPhysiNode* pJoin, SColumnNode** ppTarget, STargetNode* primExpr, int16_t blkId) {
751✔
1223
  SColumnNode* pCol = *ppTarget;
751✔
1224
  if (TSDB_DATA_TYPE_TIMESTAMP != pCol->node.resType.type) {
751✔
1225
    planError("primary key output type is not ts, type:%d", pCol->node.resType.type);
2!
1226
    return TSDB_CODE_PAR_PRIM_KEY_MUST_BE_TS;
2✔
1227
  }
1228
  pCol->dataBlockId = blkId;
749✔
1229
  pCol->slotId = primExpr->slotId;
749✔
1230
  int32_t code = nodesListMakeStrictAppend(&pJoin->pTargets, (SNode *)pCol);
749✔
1231
  if (TSDB_CODE_SUCCESS == code) {
749!
1232
    *ppTarget = NULL;
749✔
1233
  }
1234

1235
  return code;
749✔
1236
}
1237

1238
static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode,
126,172✔
1239
                                        SPhysiNode** pPhyNode) {
1240
  SSortMergeJoinPhysiNode* pJoin =
1241
      (SSortMergeJoinPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pJoinLogicNode, QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN);
126,172✔
1242
  if (NULL == pJoin) {
126,172!
1243
    return terrno;
×
1244
  }
1245

1246
  SDataBlockDescNode* pLeftDesc = NULL;
126,172✔
1247
  SDataBlockDescNode* pRightDesc = NULL;
126,172✔
1248
  pJoin->joinType = pJoinLogicNode->joinType;
126,172✔
1249
  pJoin->subType = pJoinLogicNode->subType;
126,172✔
1250
  pJoin->pWindowOffset = NULL;
126,172✔
1251
  int32_t code = nodesCloneNode(pJoinLogicNode->pWindowOffset, &pJoin->pWindowOffset);
126,172✔
1252
  if (TSDB_CODE_SUCCESS == code) {
126,172!
1253
    pJoin->pJLimit = NULL;
126,172✔
1254
    code = nodesCloneNode(pJoinLogicNode->pJLimit, (SNode**)&pJoin->pJLimit);
126,172✔
1255
  }
1256
  if (TSDB_CODE_SUCCESS == code) {
126,172!
1257
    pJoin->node.inputTsOrder = pJoinLogicNode->node.inputTsOrder;
126,172✔
1258
    pJoin->seqWinGroup = pJoinLogicNode->seqWinGroup;
126,172✔
1259
    pJoin->grpJoin = pJoinLogicNode->grpJoin;
126,172✔
1260
    code = getJoinDataBlockDescNode(pChildren, 0, &pLeftDesc);
126,172✔
1261
  }
1262

1263
  if (TSDB_CODE_SUCCESS == code) {
126,172!
1264
    code = getJoinDataBlockDescNode(pChildren, 1, &pRightDesc);
126,172✔
1265
  }
1266

1267
  if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pPrimKeyEqCond) {
126,172!
1268
    code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond,
125,026✔
1269
                         &pJoin->pPrimKeyCond);
1270
    if (TSDB_CODE_SUCCESS == code) {
125,026!
1271
      code = setMergeJoinPrimColEqCond(pJoin->pPrimKeyCond, pJoin->subType, pLeftDesc->dataBlockId,
125,026✔
1272
                                       pRightDesc->dataBlockId, pJoin, pJoinLogicNode);
125,026✔
1273
    }
1274
    if (TSDB_CODE_SUCCESS == code && NULL != pJoin->leftPrimExpr) {
125,026!
1275
      code = addDataBlockSlot(pCxt, &pJoin->leftPrimExpr, pLeftDesc);
1,121✔
1276
    }
1277
    if (TSDB_CODE_SUCCESS == code && NULL != pJoin->rightPrimExpr) {
125,026!
1278
      code = addDataBlockSlot(pCxt, &pJoin->rightPrimExpr, pRightDesc);
1,110✔
1279
    }
1280
  }
1281

1282
  if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->addPrimEqCond) {
126,172!
1283
    SNode* pPrimKeyCond = NULL;
1,146✔
1284
    code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->addPrimEqCond,
1,146✔
1285
                         &pPrimKeyCond);
1286
    if (TSDB_CODE_SUCCESS == code) {
1,146!
1287
      code = setMergeJoinPrimColEqCond(pPrimKeyCond, pJoin->subType, pLeftDesc->dataBlockId, pRightDesc->dataBlockId,
1,146✔
1288
                                       pJoin, NULL);
1289
    }
1290
    if (TSDB_CODE_SUCCESS == code && NULL != pJoin->leftPrimExpr) {
1,146!
1291
      code = addDataBlockSlot(pCxt, &pJoin->leftPrimExpr, pLeftDesc);
×
1292
    }
1293
    if (TSDB_CODE_SUCCESS == code && NULL != pJoin->rightPrimExpr) {
1,146!
1294
      code = addDataBlockSlot(pCxt, &pJoin->rightPrimExpr, pRightDesc);
×
1295
    }
1296
    nodesDestroyNode(pPrimKeyCond);
1,146✔
1297
  }
1298

1299
  SValueNode* pLeftPrimExpr = NULL, *pRightPrimExpr = NULL;
126,172✔
1300
  SColumnNode* pLeftTarget = NULL, *pRightTarget = NULL;
126,172✔
1301
  if (TSDB_CODE_SUCCESS == code && pJoinLogicNode->leftConstPrimGot && pJoin->leftPrimExpr 
126,172!
1302
      && QUERY_NODE_VALUE == nodeType(((STargetNode*)pJoin->leftPrimExpr)->pExpr)) {
480!
1303
    pLeftPrimExpr = (SValueNode*)((STargetNode*)pJoin->leftPrimExpr)->pExpr;
480✔
1304
    code = removePrimColFromJoinTargets(pJoinLogicNode->node.pTargets, pLeftPrimExpr, &pLeftTarget);
480✔
1305
  }
1306

1307
  if (TSDB_CODE_SUCCESS == code && pJoinLogicNode->rightConstPrimGot && pJoin->rightPrimExpr 
126,172!
1308
      && QUERY_NODE_VALUE == nodeType(((STargetNode*)pJoin->rightPrimExpr)->pExpr)) {
559!
1309
    pRightPrimExpr = (SValueNode*)((STargetNode*)pJoin->rightPrimExpr)->pExpr;
559✔
1310
    code = removePrimColFromJoinTargets(pJoinLogicNode->node.pTargets, pRightPrimExpr, &pRightTarget);
559✔
1311
  }
1312

1313
  if (TSDB_CODE_SUCCESS == code) {
126,172!
1314
    code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->node.pTargets,
126,172✔
1315
                         &pJoin->pTargets);
1316
  }
1317

1318
  if (TSDB_CODE_SUCCESS == code && pLeftPrimExpr && pLeftTarget) {
126,172!
1319
    code = appendPrimColToJoinTargets(pJoin, &pLeftTarget, (STargetNode*)pJoin->leftPrimExpr, pLeftDesc->dataBlockId);
344✔
1320
  }
1321
  if (TSDB_CODE_SUCCESS == code && pRightPrimExpr && pRightTarget) {
126,172!
1322
    code = appendPrimColToJoinTargets(pJoin, &pRightTarget, (STargetNode*)pJoin->rightPrimExpr, pRightDesc->dataBlockId);
407✔
1323
  }
1324

1325
  if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pFullOnCond) {
126,172✔
1326
    code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, pJoinLogicNode->pFullOnCond,
57,894✔
1327
                         &pJoin->pFullOnCond);
1328
  }
1329

1330
  if (TSDB_CODE_SUCCESS == code && ((NULL != pJoinLogicNode->pColEqCond) || (NULL != pJoinLogicNode->pTagEqCond))) {
126,172✔
1331
    code = mergeJoinConds(&pJoinLogicNode->pColEqCond, &pJoinLogicNode->pTagEqCond);
557✔
1332
  }
1333
  // TODO set from input blocks for group algo
1334
  /*
1335
    if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pColEqCond) {
1336
      code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColEqCond,
1337
    &pJoin->pColEqCond);
1338
    }
1339
  */
1340
  if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pColEqCond) {
126,172✔
1341
    code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColEqCond,
557✔
1342
                         &pJoin->pColEqCond);
1343
    if (TSDB_CODE_SUCCESS == code) {
557!
1344
      code = setColEqList(pJoin->pColEqCond, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, &pJoin->pEqLeft,
557✔
1345
                          &pJoin->pEqRight);
1346
    }
1347
  }
1348

1349
  if (TSDB_CODE_SUCCESS == code && ((NULL != pJoinLogicNode->pColOnCond) || (NULL != pJoinLogicNode->pTagOnCond))) {
126,172✔
1350
    code = mergeJoinConds(&pJoinLogicNode->pColOnCond, &pJoinLogicNode->pTagOnCond);
57,508✔
1351
  }
1352
  // TODO set from input blocks for group algo
1353
  /*
1354
    if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pColOnCond) {
1355
      code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColOnCond,
1356
    &pJoin->pColOnCond);
1357
    }
1358
  */
1359
  if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pColOnCond) {
126,172✔
1360
    code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, pJoinLogicNode->pColOnCond,
57,508✔
1361
                         &pJoin->pColOnCond);
1362
  }
1363

1364
  if (TSDB_CODE_SUCCESS == code) {
126,172✔
1365
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin);
126,170✔
1366
  }
1367

1368
  if (TSDB_CODE_SUCCESS == code) {
126,172✔
1369
    code = addDataBlockSlots(pCxt, pJoin->pTargets, pJoin->node.pOutputDataBlockDesc);
126,170✔
1370
  }
1371

1372
  if (TSDB_CODE_SUCCESS == code) {
126,172✔
1373
    *pPhyNode = (SPhysiNode*)pJoin;
126,170✔
1374
  } else {
1375
    nodesDestroyNode((SNode*)pJoin);
2✔
1376
    nodesDestroyNode((SNode*)pLeftTarget);
2✔
1377
    nodesDestroyNode((SNode*)pRightTarget);
2✔
1378
  }
1379

1380
  return code;
126,172✔
1381
}
1382

1383
static int32_t extractHashJoinOpCols(int16_t lBlkId, int16_t rBlkId, SNode* pEq, SHashJoinPhysiNode* pJoin) {
19,008✔
1384
  int32_t code = 0;
19,008✔
1385
  if (QUERY_NODE_OPERATOR == nodeType(pEq)) {
19,008!
1386
    SOperatorNode* pOp = (SOperatorNode*)pEq;
19,008✔
1387
    SColumnNode*   pLeft = (SColumnNode*)pOp->pLeft;
19,008✔
1388
    SColumnNode*   pRight = (SColumnNode*)pOp->pRight;
19,008✔
1389
    if (lBlkId == pLeft->dataBlockId && rBlkId == pRight->dataBlockId) {
37,913!
1390
      SNode *pL = NULL, *pR = NULL;
18,905✔
1391
      code = nodesCloneNode(pOp->pLeft, &pL);
18,905✔
1392
      if (TSDB_CODE_SUCCESS == code) {
18,905!
1393
        code = nodesListStrictAppend(pJoin->pOnLeft, pL);
18,905✔
1394
      }
1395
      if (TSDB_CODE_SUCCESS == code) {
18,905!
1396
        code = nodesCloneNode(pOp->pRight, &pR);
18,905✔
1397
      }
1398
      if (TSDB_CODE_SUCCESS == code) {
18,905!
1399
        code = nodesListStrictAppend(pJoin->pOnRight, pR);
18,905✔
1400
      }
1401
    } else if (rBlkId == pLeft->dataBlockId && lBlkId == pRight->dataBlockId) {
206!
1402
      SNode *pL = NULL, *pR = NULL;
103✔
1403
      code = nodesCloneNode(pOp->pRight, &pR);
103✔
1404
      if (TSDB_CODE_SUCCESS == code) {
103!
1405
        code = nodesListStrictAppend(pJoin->pOnLeft, pR);
103✔
1406
      }
1407
      if (TSDB_CODE_SUCCESS == code) {
103!
1408
        code = nodesCloneNode(pOp->pLeft, &pL);
103✔
1409
      }
1410
      if (TSDB_CODE_SUCCESS == code) {
103!
1411
        code = nodesListStrictAppend(pJoin->pOnRight, pL);
103✔
1412
      }
1413
    } else {
1414
      planError("Invalid join equal cond, lbid:%d, rbid:%d, oplid:%d, oprid:%d", lBlkId, rBlkId, pLeft->dataBlockId,
×
1415
                pRight->dataBlockId);
1416
      return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1417
    }
1418

1419
    return code;
19,008✔
1420
  }
1421

1422
  planError("Invalid join equal node type:%d", nodeType(pEq));
×
1423
  return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1424
}
1425

1426
static int32_t extractHashJoinOnCols(int16_t lBlkId, int16_t rBlkId, SNode* pEq, SHashJoinPhysiNode* pJoin) {
56,892✔
1427
  if (NULL == pEq) {
56,892✔
1428
    return TSDB_CODE_SUCCESS;
37,928✔
1429
  }
1430

1431
  int32_t code = TSDB_CODE_SUCCESS;
18,964✔
1432
  if (QUERY_NODE_OPERATOR == nodeType(pEq)) {
18,964✔
1433
    code = extractHashJoinOpCols(lBlkId, rBlkId, pEq, pJoin);
18,920✔
1434
  } else if (QUERY_NODE_LOGIC_CONDITION == nodeType(pEq)) {
44!
1435
    SLogicConditionNode* pLogic = (SLogicConditionNode*)pEq;
44✔
1436
    SNode*               pNode = NULL;
44✔
1437
    FOREACH(pNode, pLogic->pParameterList) {
132!
1438
      code = extractHashJoinOpCols(lBlkId, rBlkId, pNode, pJoin);
88✔
1439
      if (code) {
88!
1440
        break;
×
1441
      }
1442
    }
1443
  } else {
1444
    planError("Invalid join equal node type:%d", nodeType(pEq));
×
1445
    return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1446
  }
1447

1448
  return code;
18,964✔
1449
}
1450

1451
static int32_t createHashJoinColList(int16_t lBlkId, int16_t rBlkId, SNode* pEq1, SNode* pEq2, SNode* pEq3,
18,964✔
1452
                                     SHashJoinPhysiNode* pJoin) {
1453
  int32_t code = TSDB_CODE_SUCCESS;
18,964✔
1454
  pJoin->pOnLeft = NULL;
18,964✔
1455
  code = nodesMakeList(&pJoin->pOnLeft);
18,964✔
1456
  if (TSDB_CODE_SUCCESS != code) {
18,964!
1457
    return code;
×
1458
  }
1459
  pJoin->pOnRight = NULL;
18,964✔
1460
  code = nodesMakeList(&pJoin->pOnRight);
18,964✔
1461
  if (TSDB_CODE_SUCCESS != code) {
18,964!
1462
    return code;
×
1463
  }
1464

1465
  code = extractHashJoinOnCols(lBlkId, rBlkId, pEq1, pJoin);
18,964✔
1466
  if (TSDB_CODE_SUCCESS == code) {
18,964!
1467
    code = extractHashJoinOnCols(lBlkId, rBlkId, pEq2, pJoin);
18,964✔
1468
  }
1469
  if (TSDB_CODE_SUCCESS == code) {
18,964!
1470
    code = extractHashJoinOnCols(lBlkId, rBlkId, pEq3, pJoin);
18,964✔
1471
  }
1472
  if (TSDB_CODE_SUCCESS == code && pJoin->pOnLeft->length <= 0) {
18,964!
1473
    planError("Invalid join equal column num: %d", pJoin->pOnLeft->length);
×
1474
    code = TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1475
  }
1476

1477
  return code;
18,964✔
1478
}
1479

1480
static int32_t sortHashJoinTargets(int16_t lBlkId, int16_t rBlkId, SHashJoinPhysiNode* pJoin) {
18,964✔
1481
  SNode*     pNode = NULL;
18,964✔
1482
  SSHashObj* pHash = tSimpleHashInit(pJoin->pTargets->length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
18,964✔
1483
  if (NULL == pHash) {
18,964!
1484
    return TSDB_CODE_OUT_OF_MEMORY;
×
1485
  }
1486
  SNodeList* pNew = NULL;
18,964✔
1487
  int32_t    code = nodesMakeList(&pNew);
18,964✔
1488

1489
  if (TSDB_CODE_SUCCESS == code) {
18,964!
1490
    FOREACH(pNode, pJoin->pTargets) {
94,820!
1491
      SColumnNode* pCol = (SColumnNode*)pNode;
75,856✔
1492
      char*        pName = NULL;
75,856✔
1493
      int32_t      len = 0;
75,856✔
1494
      code = getSlotKey(pNode, NULL, &pName, &len, 0);
75,856✔
1495
      if (TSDB_CODE_SUCCESS == code) {
75,856!
1496
        code = tSimpleHashPut(pHash, pName, len, &pCol, POINTER_BYTES);
75,856✔
1497
      }
1498
      taosMemoryFree(pName);
75,856!
1499
      if (TSDB_CODE_SUCCESS != code) {
75,856!
1500
        break;
×
1501
      }
1502
    }
1503
  }
1504
  if (TSDB_CODE_SUCCESS == code) {
18,964!
1505
    nodesClearList(pJoin->pTargets);
18,964✔
1506
    pJoin->pTargets = pNew;
18,964✔
1507

1508
    FOREACH(pNode, pJoin->pOnLeft) {
37,972!
1509
      char*        pName = NULL;
19,008✔
1510
      SColumnNode* pCol = (SColumnNode*)pNode;
19,008✔
1511
      int32_t      len = 0;
19,008✔
1512
      code = getSlotKey(pNode, NULL, &pName, &len, 0);
19,008✔
1513
      if (TSDB_CODE_SUCCESS == code) {
19,008!
1514
        SNode** p = tSimpleHashGet(pHash, pName, len);
19,008✔
1515
        if (p) {
19,008!
1516
          code = nodesListStrictAppend(pJoin->pTargets, *p);
×
1517
          if (TSDB_CODE_SUCCESS == code) {
×
1518
            code = tSimpleHashRemove(pHash, pName, len);
×
1519
          }
1520
        }
1521
      }
1522
      taosMemoryFree(pName);
19,008!
1523
      if (TSDB_CODE_SUCCESS != code) {
19,008!
1524
        break;
×
1525
      }
1526
    }
1527
  }
1528
  if (TSDB_CODE_SUCCESS == code) {
18,964!
1529
    FOREACH(pNode, pJoin->pOnRight) {
37,972!
1530
      char*        pName = NULL;
19,008✔
1531
      SColumnNode* pCol = (SColumnNode*)pNode;
19,008✔
1532
      int32_t      len = 0;
19,008✔
1533
      code = getSlotKey(pNode, NULL, &pName, &len, 0);
19,008✔
1534
      if (TSDB_CODE_SUCCESS == code) {
19,008!
1535
        SNode** p = tSimpleHashGet(pHash, pName, len);
19,008✔
1536
        if (p) {
19,008!
1537
          code = nodesListStrictAppend(pJoin->pTargets, *p);
×
1538
          if (TSDB_CODE_SUCCESS == code) {
×
1539
            code = tSimpleHashRemove(pHash, pName, len);
×
1540
          }
1541
        }
1542
      }
1543
      taosMemoryFree(pName);
19,008!
1544
      if (TSDB_CODE_SUCCESS != code) {
19,008!
1545
        break;
×
1546
      }
1547
    }
1548
  }
1549
  if (TSDB_CODE_SUCCESS == code) {
18,964!
1550
    if (tSimpleHashGetSize(pHash) > 0) {
18,964!
1551
      SNode** p = NULL;
18,964✔
1552
      int32_t iter = 0;
18,964✔
1553
      while (1) {
1554
        p = tSimpleHashIterate(pHash, p, &iter);
94,820✔
1555
        if (p == NULL) {
94,820✔
1556
          break;
18,964✔
1557
        }
1558

1559
        code = nodesListStrictAppend(pJoin->pTargets, *p);
75,856✔
1560
        if (TSDB_CODE_SUCCESS != code) {
75,856!
1561
          break;
×
1562
        }
1563
      }
1564
    }
1565
  }
1566

1567
  tSimpleHashCleanup(pHash);
18,964✔
1568

1569
  return code;
18,964✔
1570
}
1571

1572
static int32_t setHashJoinPrimColEqCond(SNode* pEqCond, int16_t leftBlkId, int16_t rightBlkId,
×
1573
                                        SHashJoinPhysiNode* pJoin) {
1574
  int32_t code = 0;
×
1575
  if (QUERY_NODE_OPERATOR == nodeType(pEqCond)) {
×
1576
    SOperatorNode* pOp = (SOperatorNode*)pEqCond;
×
1577
    if (pOp->opType != OP_TYPE_EQUAL) {
×
1578
      planError("invalid primary cond opType, opType:%d", pOp->opType);
×
1579
      return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1580
    }
1581

1582
    switch (nodeType(pOp->pLeft)) {
×
1583
      case QUERY_NODE_COLUMN: {
×
1584
        SColumnNode* pCol = (SColumnNode*)pOp->pLeft;
×
1585
        if (leftBlkId == pCol->dataBlockId) {
×
1586
          pJoin->leftPrimSlotId = pCol->slotId;
×
1587
        } else if (rightBlkId == pCol->dataBlockId) {
×
1588
          pJoin->rightPrimSlotId = pCol->slotId;
×
1589
        } else {
1590
          planError("invalid primary key col equal cond, leftBlockId:%d", pCol->dataBlockId);
×
1591
          return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1592
        }
1593
        break;
×
1594
      }
1595
      case QUERY_NODE_FUNCTION: {
×
1596
        SFunctionNode* pFunc = (SFunctionNode*)pOp->pLeft;
×
1597
        if (FUNCTION_TYPE_TIMETRUNCATE != pFunc->funcType) {
×
1598
          planError("invalid primary cond left function type, leftFuncType:%d", pFunc->funcType);
×
1599
          return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1600
        }
1601
        SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
×
1602
        if (QUERY_NODE_COLUMN != nodeType(pParam)) {
×
1603
          planError("invalid primary cond left timetruncate param type, leftParamType:%d", nodeType(pParam));
×
1604
          return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1605
        }
1606
        SColumnNode* pCol = (SColumnNode*)pParam;
×
1607
        if (leftBlkId == pCol->dataBlockId) {
×
1608
          pJoin->leftPrimSlotId = pCol->slotId;
×
1609
          pJoin->leftPrimExpr = NULL;
×
1610
          code = nodesCloneNode((SNode*)pFunc, &pJoin->leftPrimExpr);
×
1611
        } else if (rightBlkId == pCol->dataBlockId) {
×
1612
          pJoin->rightPrimSlotId = pCol->slotId;
×
1613
          pJoin->rightPrimExpr = NULL;
×
1614
          code = nodesCloneNode((SNode*)pFunc, &pJoin->rightPrimExpr);
×
1615
        } else {
1616
          planError("invalid primary key col equal cond, leftBlockId:%d", pCol->dataBlockId);
×
1617
          return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1618
        }
1619
        break;
×
1620
      }
1621
      default:
×
1622
        planError("invalid primary cond left node type, leftNodeType:%d", nodeType(pOp->pLeft));
×
1623
        return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1624
    }
1625
    if (TSDB_CODE_SUCCESS != code) {
×
1626
      return code;
×
1627
    }
1628
    switch (nodeType(pOp->pRight)) {
×
1629
      case QUERY_NODE_COLUMN: {
×
1630
        SColumnNode* pCol = (SColumnNode*)pOp->pRight;
×
1631
        if (leftBlkId == pCol->dataBlockId) {
×
1632
          pJoin->leftPrimSlotId = pCol->slotId;
×
1633
        } else if (rightBlkId == pCol->dataBlockId) {
×
1634
          pJoin->rightPrimSlotId = pCol->slotId;
×
1635
        } else {
1636
          planError("invalid primary key col equal cond, rightBlockId:%d", pCol->dataBlockId);
×
1637
          return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1638
        }
1639
        break;
×
1640
      }
1641
      case QUERY_NODE_FUNCTION: {
×
1642
        SFunctionNode* pFunc = (SFunctionNode*)pOp->pRight;
×
1643
        if (FUNCTION_TYPE_TIMETRUNCATE != pFunc->funcType) {
×
1644
          planError("invalid primary cond right function type, rightFuncType:%d", pFunc->funcType);
×
1645
          return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1646
        }
1647
        SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
×
1648
        if (QUERY_NODE_COLUMN != nodeType(pParam)) {
×
1649
          planError("invalid primary cond right timetruncate param type, rightParamType:%d", nodeType(pParam));
×
1650
          return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1651
        }
1652
        SColumnNode* pCol = (SColumnNode*)pParam;
×
1653
        if (leftBlkId == pCol->dataBlockId) {
×
1654
          pJoin->leftPrimSlotId = pCol->slotId;
×
1655
          pJoin->leftPrimExpr = NULL;
×
1656
          code = nodesCloneNode((SNode*)pFunc, &pJoin->leftPrimExpr);
×
1657
        } else if (rightBlkId == pCol->dataBlockId) {
×
1658
          pJoin->rightPrimSlotId = pCol->slotId;
×
1659
          pJoin->rightPrimExpr = NULL;
×
1660
          code = nodesCloneNode((SNode*)pFunc, &pJoin->rightPrimExpr);
×
1661
        } else {
1662
          planError("invalid primary key col equal cond, rightBlockId:%d", pCol->dataBlockId);
×
1663
          return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1664
        }
1665
        break;
×
1666
      }
1667
      default:
×
1668
        planError("invalid primary cond right node type, rightNodeType:%d", nodeType(pOp->pRight));
×
1669
        return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1670
    }
1671
  } else {
1672
    planError("invalid primary key col equal cond, type:%d", nodeType(pEqCond));
×
1673
    return TSDB_CODE_PLAN_INTERNAL_ERROR;
×
1674
  }
1675

1676
  return code;
×
1677
}
1678

1679
static int32_t createHashJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode,
18,964✔
1680
                                       SPhysiNode** pPhyNode) {
1681
  SHashJoinPhysiNode* pJoin =
1682
      (SHashJoinPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pJoinLogicNode, QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN);
18,964✔
1683
  if (NULL == pJoin) {
18,964!
1684
    return terrno;
×
1685
  }
1686

1687
  SDataBlockDescNode* pLeftDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc;
18,964✔
1688
  SDataBlockDescNode* pRightDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 1))->pOutputDataBlockDesc;
18,964✔
1689
  int32_t             code = TSDB_CODE_SUCCESS;
18,964✔
1690

1691
  pJoin->joinType = pJoinLogicNode->joinType;
18,964✔
1692
  pJoin->subType = pJoinLogicNode->subType;
18,964✔
1693
  pJoin->pWindowOffset = NULL;
18,964✔
1694
  code = nodesCloneNode(pJoinLogicNode->pWindowOffset, &pJoin->pWindowOffset);
18,964✔
1695
  if (TSDB_CODE_SUCCESS != code) {
18,964!
1696
    nodesDestroyNode((SNode*)pJoin);
×
1697
    return code;
×
1698
  }
1699
  pJoin->pJLimit = NULL;
18,964✔
1700
  code = nodesCloneNode(pJoinLogicNode->pJLimit, &pJoin->pJLimit);
18,964✔
1701
  if (TSDB_CODE_SUCCESS != code) {
18,964!
1702
    nodesDestroyNode((SNode*)pJoin);
×
1703
    return code;
×
1704
  }
1705
  pJoin->node.inputTsOrder = pJoinLogicNode->node.inputTsOrder;
18,964✔
1706
  pJoin->timeRangeTarget = pJoinLogicNode->timeRangeTarget;
18,964✔
1707
  pJoin->timeRange.skey = pJoinLogicNode->timeRange.skey;
18,964✔
1708
  pJoin->timeRange.ekey = pJoinLogicNode->timeRange.ekey;
18,964✔
1709

1710
  if (NULL != pJoinLogicNode->pPrimKeyEqCond) {
18,964!
1711
    code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond,
×
1712
                         &pJoin->pPrimKeyCond);
1713
    if (TSDB_CODE_SUCCESS == code) {
×
1714
      code = setHashJoinPrimColEqCond(pJoin->pPrimKeyCond, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin);
×
1715
    }
1716
    if (TSDB_CODE_SUCCESS == code && NULL != pJoin->leftPrimExpr) {
×
1717
      code = addDataBlockSlot(pCxt, &pJoin->leftPrimExpr, pLeftDesc);
×
1718
    }
1719
    if (TSDB_CODE_SUCCESS == code && NULL != pJoin->rightPrimExpr) {
×
1720
      code = addDataBlockSlot(pCxt, &pJoin->rightPrimExpr, pRightDesc);
×
1721
    }
1722
  }
1723
  if (TSDB_CODE_SUCCESS == code) {
18,964!
1724
    code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColEqCond,
18,964✔
1725
                         &pJoin->pColEqCond);
1726
  }
1727
  if (TSDB_CODE_SUCCESS == code) {
18,964!
1728
    code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pTagEqCond,
18,964✔
1729
                         &pJoin->pTagEqCond);
1730
  }
1731
  if (TSDB_CODE_SUCCESS == code) {
18,964!
1732
    code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, -1, pJoinLogicNode->pLeftOnCond, &pJoin->pLeftOnCond);
18,964✔
1733
  }
1734
  if (TSDB_CODE_SUCCESS == code) {
18,964!
1735
    code = setNodeSlotId(pCxt, -1, pRightDesc->dataBlockId, pJoinLogicNode->pRightOnCond, &pJoin->pRightOnCond);
18,964✔
1736
  }
1737
  if (TSDB_CODE_SUCCESS == code && ((NULL != pJoinLogicNode->pColOnCond) || (NULL != pJoinLogicNode->pTagOnCond))) {
18,964!
1738
    code = mergeJoinConds(&pJoinLogicNode->pColOnCond, &pJoinLogicNode->pTagOnCond);
×
1739
  }
1740
  SNode* pOnCond = (NULL != pJoinLogicNode->pColOnCond) ? pJoinLogicNode->pColOnCond : pJoinLogicNode->pTagOnCond;
18,964!
1741
  if (TSDB_CODE_SUCCESS == code && NULL != pOnCond) {
18,964!
1742
    code =
1743
        setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, pOnCond, &pJoin->pFullOnCond);
×
1744
  }
1745
  if (TSDB_CODE_SUCCESS == code) {
18,964!
1746
    code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->node.pTargets,
18,964✔
1747
                         &pJoin->pTargets);
1748
  }
1749
  if (TSDB_CODE_SUCCESS == code) {
18,964!
1750
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin);
18,964✔
1751
  }
1752
  if (TSDB_CODE_SUCCESS == code) {
18,964!
1753
    code = createHashJoinColList(pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin->pPrimKeyCond,
18,964✔
1754
                                 pJoin->pColEqCond, pJoin->pTagEqCond, pJoin);
1755
  }
1756
  if (TSDB_CODE_SUCCESS == code) {
18,964!
1757
    code = sortHashJoinTargets(pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin);
18,964✔
1758
  }
1759
  if (TSDB_CODE_SUCCESS == code) {
18,964!
1760
    code = addDataBlockSlots(pCxt, pJoin->pTargets, pJoin->node.pOutputDataBlockDesc);
18,964✔
1761
  }
1762

1763
  if (TSDB_CODE_SUCCESS == code) {
18,964!
1764
    *pPhyNode = (SPhysiNode*)pJoin;
18,964✔
1765
  } else {
1766
    nodesDestroyNode((SNode*)pJoin);
×
1767
  }
1768

1769
  return code;
18,964✔
1770
}
1771

1772
static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode,
145,136✔
1773
                                   SPhysiNode** pPhyNode) {
1774
  switch (pJoinLogicNode->joinAlgo) {
145,136!
1775
    case JOIN_ALGO_MERGE:
126,172✔
1776
      return createMergeJoinPhysiNode(pCxt, pChildren, pJoinLogicNode, pPhyNode);
126,172✔
1777
    case JOIN_ALGO_HASH:
18,964✔
1778
      return createHashJoinPhysiNode(pCxt, pChildren, pJoinLogicNode, pPhyNode);
18,964✔
1779
    default:
×
1780
      planError("Invalid join algorithm:%d", pJoinLogicNode->joinAlgo);
×
1781
      break;
×
1782
  }
1783

1784
  return TSDB_CODE_FAILED;
×
1785
}
1786

1787
static int32_t createVirtualScanCols(SPhysiPlanContext* pCxt, SVirtualScanPhysiNode* pScanPhysiNode, SNodeList* pScanCols) {
11,252✔
1788
  if (NULL == pScanCols) {
11,252✔
1789
    return TSDB_CODE_SUCCESS;
252✔
1790
  }
1791

1792
  pScanPhysiNode->scan.pScanCols = NULL;
11,000✔
1793
  int32_t code = nodesCloneList(pScanCols, &pScanPhysiNode->scan.pScanCols);
11,000✔
1794
  if (NULL == pScanPhysiNode->scan.pScanCols) {
11,000!
1795
    return code;
×
1796
  }
1797
  return sortScanCols(pScanPhysiNode->scan.pScanCols);
11,000✔
1798
}
1799

1800
static int32_t createVirtualTableScanPhysiNodeFinalize(SPhysiPlanContext* pCxt,
11,252✔
1801
                                                       SNodeList* pChild,
1802
                                                       SVirtualScanLogicNode* pScanLogicNode,
1803
                                                       SVirtualScanPhysiNode* pScanPhysiNode,
1804
                                                       SPhysiNode** pPhyNode) {
1805
  int32_t code = TSDB_CODE_SUCCESS;
11,252✔
1806

1807
  PLAN_ERR_JRET(createVirtualScanCols(pCxt, pScanPhysiNode, pScanLogicNode->pScanCols));
11,252!
1808
  PLAN_ERR_JRET(addDataBlockSlots(pCxt, pScanPhysiNode->scan.pScanCols, pScanPhysiNode->scan.node.pOutputDataBlockDesc));
11,252!
1809
  if (NULL != pScanLogicNode->pScanPseudoCols) {
11,252✔
1810
    pScanPhysiNode->scan.pScanPseudoCols = NULL;
4,206✔
1811
    PLAN_ERR_JRET(nodesCloneList(pScanLogicNode->pScanPseudoCols, &pScanPhysiNode->scan.pScanPseudoCols));
4,206!
1812
    PLAN_ERR_JRET(addDataBlockSlots(pCxt, pScanPhysiNode->scan.pScanPseudoCols, pScanPhysiNode->scan.node.pOutputDataBlockDesc));
4,206!
1813
  }
1814

1815
  PLAN_ERR_JRET(setConditionsSlotId(pCxt, (const SLogicNode*)pScanLogicNode, (SPhysiNode*)pScanPhysiNode));
11,252!
1816

1817
  pScanPhysiNode->scan.uid = pScanLogicNode->tableId;
11,252✔
1818
  pScanPhysiNode->scan.suid = pScanLogicNode->stableId;
11,252✔
1819
  pScanPhysiNode->scan.tableType = pScanLogicNode->tableType;
11,252✔
1820
  memcpy(&pScanPhysiNode->scan.tableName, &pScanLogicNode->tableName, sizeof(SName));
11,252✔
1821
  pScanPhysiNode->scanAllCols = pScanLogicNode->scanAllCols;
11,252✔
1822

1823
  *pPhyNode = (SPhysiNode*)pScanPhysiNode;
11,252✔
1824
  return code;
11,252✔
1825

1826
_return:
×
1827
  nodesDestroyNode((SNode*)pScanPhysiNode);
×
1828
  return code;
×
1829
}
1830

1831
static int32_t createVirtualTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SNodeList* pChildren,
11,252✔
1832
                                               SVirtualScanLogicNode* pScanLogicNode, SPhysiNode** pPhyNode) {
1833
  int32_t                 code = TSDB_CODE_SUCCESS;
11,252✔
1834
  SVirtualScanPhysiNode * pVirtualScan =
1835
      (SVirtualScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN);
11,252✔
1836
  if (NULL == pVirtualScan) {
11,252!
1837
    return terrno;
×
1838
  }
1839

1840
  if (pScanLogicNode->pVgroupList) {
11,252✔
1841
    vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
6,768✔
1842
    pSubplan->execNodeStat.tableNum = pScanLogicNode->pVgroupList->vgroups[0].numOfTable;
6,768✔
1843
  }
1844

1845
  (void)tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
11,252✔
1846
  PLAN_ERR_RET(createVirtualTableScanPhysiNodeFinalize(pCxt, pChildren, pScanLogicNode, (SVirtualScanPhysiNode*)pVirtualScan, pPhyNode));
11,252!
1847
  PLAN_ERR_RET(setMultiBlockSlotId(pCxt, pChildren, true, pScanLogicNode->node.pTargets, &pVirtualScan->pTargets));
11,252!
1848
  return code;
11,252✔
1849
}
1850

1851
static int32_t createGroupCachePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
18,964✔
1852
                                         SGroupCacheLogicNode* pLogicNode, SPhysiNode** pPhyNode) {
1853
  SGroupCachePhysiNode* pGrpCache =
1854
      (SGroupCachePhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pLogicNode, QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE);
18,964✔
1855
  if (NULL == pGrpCache) {
18,964!
1856
    return terrno;
×
1857
  }
1858

1859
  pGrpCache->grpColsMayBeNull = pLogicNode->grpColsMayBeNull;
18,964✔
1860
  pGrpCache->grpByUid = pLogicNode->grpByUid;
18,964✔
1861
  pGrpCache->globalGrp = pLogicNode->globalGrp;
18,964✔
1862
  pGrpCache->batchFetch = pLogicNode->batchFetch;
18,964✔
1863
  SDataBlockDescNode* pChildDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc;
18,964✔
1864
  int32_t             code = TSDB_CODE_SUCCESS;
18,964✔
1865
  /*
1866
    if (TSDB_CODE_SUCCESS == code) {
1867
      code = setListSlotId(pCxt, pChildDesc->dataBlockId, -1, pLogicNode->pGroupCols, &pGrpCache->pGroupCols);
1868
    }
1869
  */
1870

1871
  *pPhyNode = (SPhysiNode*)pGrpCache;
18,964✔
1872

1873
  return code;
18,964✔
1874
}
1875

1876
static int32_t updateDynQueryCtrlStbJoinInfo(SPhysiPlanContext* pCxt, SNodeList* pChildren,
18,964✔
1877
                                             SDynQueryCtrlLogicNode* pLogicNode, SDynQueryCtrlPhysiNode* pDynCtrl) {
1878
  SDataBlockDescNode* pPrevDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc;
18,964✔
1879
  SNodeList*          pVgList = NULL;
18,964✔
1880
  SNodeList*          pUidList = NULL;
18,964✔
1881
  int32_t             code = setListSlotId(pCxt, pPrevDesc->dataBlockId, -1, pLogicNode->stbJoin.pVgList, &pVgList);
18,964✔
1882
  if (TSDB_CODE_SUCCESS == code) {
18,964!
1883
    code = setListSlotId(pCxt, pPrevDesc->dataBlockId, -1, pLogicNode->stbJoin.pUidList, &pUidList);
18,964✔
1884
  }
1885
  if (TSDB_CODE_SUCCESS == code) {
18,964!
1886
    memcpy(pDynCtrl->stbJoin.srcScan, pLogicNode->stbJoin.srcScan, sizeof(pDynCtrl->stbJoin.srcScan));
18,964✔
1887

1888
    SNode*  pNode = NULL;
18,964✔
1889
    int32_t i = 0;
18,964✔
1890
    FOREACH(pNode, pVgList) {
56,892!
1891
      pDynCtrl->stbJoin.vgSlot[i] = ((SColumnNode*)pNode)->slotId;
37,928✔
1892
      ++i;
37,928✔
1893
    }
1894
    i = 0;
18,964✔
1895
    FOREACH(pNode, pUidList) {
56,892!
1896
      pDynCtrl->stbJoin.uidSlot[i] = ((SColumnNode*)pNode)->slotId;
37,928✔
1897
      ++i;
37,928✔
1898
    }
1899
    pDynCtrl->stbJoin.batchFetch = pLogicNode->stbJoin.batchFetch;
18,964✔
1900
  }
1901
  nodesDestroyList(pVgList);
18,964✔
1902
  nodesDestroyList(pUidList);
18,964✔
1903

1904
  return code;
18,964✔
1905
}
1906

1907
static int32_t updateDynQueryCtrlVtbScanInfo(SPhysiPlanContext* pCxt, SNodeList* pChildren,
4,484✔
1908
                                             SDynQueryCtrlLogicNode* pLogicNode, SDynQueryCtrlPhysiNode* pDynCtrl,
1909
                                             SSubplan* pSubPlan) {
1910
  int32_t code = TSDB_CODE_SUCCESS;
4,484✔
1911

1912
  if (pLogicNode->vtbScan.pVgroupList) {
4,484!
1913
    vgroupInfoToNodeAddr(pLogicNode->vtbScan.pVgroupList->vgroups, &pSubPlan->execNode);
4,484✔
1914
    pSubPlan->execNodeStat.tableNum = pLogicNode->vtbScan.pVgroupList->vgroups[0].numOfTable;
4,484✔
1915
  }
1916

1917
  PLAN_ERR_JRET(nodesCloneList(pLogicNode->node.pTargets, &pDynCtrl->vtbScan.pScanCols));
4,484!
1918

1919
  pDynCtrl->vtbScan.scanAllCols = pLogicNode->vtbScan.scanAllCols;
4,484✔
1920
  pDynCtrl->vtbScan.suid = pLogicNode->vtbScan.suid;
4,484✔
1921
  pDynCtrl->vtbScan.mgmtEpSet = pCxt->pPlanCxt->mgmtEpSet;
4,484✔
1922
  pDynCtrl->vtbScan.accountId = pCxt->pPlanCxt->acctId;
4,484✔
1923
  tstrncpy(pDynCtrl->vtbScan.dbName, pLogicNode->vtbScan.dbName, TSDB_DB_NAME_LEN);
4,484✔
1924

1925
  return code;
4,484✔
1926
_return:
×
1927
  planError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1928
  return code;
×
1929
}
1930

1931

1932
static int32_t createDynQueryCtrlPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
23,448✔
1933
                                           SDynQueryCtrlLogicNode* pLogicNode, SPhysiNode** pPhyNode, SSubplan* pSubPlan) {
1934
  int32_t                 code = TSDB_CODE_SUCCESS;
23,448✔
1935
  int32_t                 lino = 0;
23,448✔
1936
  SDynQueryCtrlPhysiNode* pDynCtrl =
1937
      (SDynQueryCtrlPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pLogicNode, QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL);
23,448✔
1938
  QUERY_CHECK_NULL(pDynCtrl, code, lino, _return, terrno);
23,448!
1939

1940
  switch (pLogicNode->qType) {
23,448!
1941
    case DYN_QTYPE_STB_HASH:
18,964✔
1942
      PLAN_ERR_JRET(updateDynQueryCtrlStbJoinInfo(pCxt, pChildren, pLogicNode, pDynCtrl));
18,964!
1943
      break;
18,964✔
1944
    case DYN_QTYPE_VTB_SCAN:
4,484✔
1945
      PLAN_ERR_JRET(updateDynQueryCtrlVtbScanInfo(pCxt, pChildren, pLogicNode, pDynCtrl, pSubPlan));
4,484!
1946
      break;
4,484✔
1947
    default:
×
1948
      PLAN_ERR_JRET(TSDB_CODE_PLAN_INVALID_DYN_CTRL_TYPE);
×
1949
  }
1950

1951
  pDynCtrl->qType = pLogicNode->qType;
23,448✔
1952
  *pPhyNode = (SPhysiNode*)pDynCtrl;
23,448✔
1953

1954
  return code;
23,448✔
1955
_return:
×
1956
  planError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1957
  return code;
×
1958
}
1959

1960
typedef struct SRewritePrecalcExprsCxt {
1961
  int32_t    errCode;
1962
  int32_t    planNodeId;
1963
  int32_t    rewriteId;
1964
  SNodeList* pPrecalcExprs;
1965
} SRewritePrecalcExprsCxt;
1966

1967
static EDealRes collectAndRewrite(SRewritePrecalcExprsCxt* pCxt, SNode** pNode) {
391,664✔
1968
  SNode* pExpr = NULL;
391,664✔
1969
  pCxt->errCode = nodesCloneNode(*pNode, &pExpr);
391,664✔
1970
  if (NULL == pExpr) {
391,664!
1971
    return DEAL_RES_ERROR;
×
1972
  }
1973
  if (TSDB_CODE_SUCCESS != (pCxt->errCode = nodesListAppend(pCxt->pPrecalcExprs, pExpr))) {
391,664!
1974
    nodesDestroyNode(pExpr);
×
1975
    return DEAL_RES_ERROR;
×
1976
  }
1977
  SColumnNode* pCol = NULL;
391,664✔
1978
  pCxt->errCode = nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pCol);
391,664✔
1979
  if (NULL == pCol) {
391,664!
1980
    nodesDestroyNode(pExpr);
×
1981
    return DEAL_RES_ERROR;
×
1982
  }
1983
  SExprNode* pRewrittenExpr = (SExprNode*)pExpr;
391,664✔
1984
  pCol->node.resType = pRewrittenExpr->resType;
391,664✔
1985
  if ('\0' != pRewrittenExpr->aliasName[0]) {
391,664✔
1986
    tstrncpy(pCol->colName, pRewrittenExpr->aliasName, TSDB_COL_NAME_LEN);
388,912✔
1987
  } else {
1988
    snprintf(pRewrittenExpr->aliasName, sizeof(pRewrittenExpr->aliasName), "#expr_%d_%d", pCxt->planNodeId,
2,752✔
1989
             pCxt->rewriteId);
1990
    tstrncpy(pCol->colName, pRewrittenExpr->aliasName, TSDB_COL_NAME_LEN);
2,752✔
1991
  }
1992
  nodesDestroyNode(*pNode);
391,664✔
1993
  *pNode = (SNode*)pCol;
391,664✔
1994
  return DEAL_RES_IGNORE_CHILD;
391,664✔
1995
}
1996

1997
static int32_t rewriteValueToOperator(SRewritePrecalcExprsCxt* pCxt, SNode** pNode) {
66,020✔
1998
  SOperatorNode* pOper = NULL;
66,020✔
1999
  int32_t        code = nodesMakeNode(QUERY_NODE_OPERATOR, (SNode**)&pOper);
66,020✔
2000
  if (NULL == pOper) {
66,020!
2001
    return code;
×
2002
  }
2003
  pOper->pLeft = NULL;
66,020✔
2004
  code = nodesMakeNode(QUERY_NODE_LEFT_VALUE, &pOper->pLeft);
66,020✔
2005
  if (NULL == pOper->pLeft) {
66,020!
2006
    nodesDestroyNode((SNode*)pOper);
×
2007
    return code;
×
2008
  }
2009
  SValueNode* pVal = (SValueNode*)*pNode;
66,020✔
2010
  pOper->node.resType = pVal->node.resType;
66,020✔
2011
  tstrncpy(pOper->node.aliasName, pVal->node.aliasName, TSDB_COL_NAME_LEN);
66,020✔
2012
  pOper->opType = OP_TYPE_ASSIGN;
66,020✔
2013
  pOper->pRight = *pNode;
66,020✔
2014
  *pNode = (SNode*)pOper;
66,020✔
2015
  return TSDB_CODE_SUCCESS;
66,020✔
2016
}
2017

2018
static EDealRes doRewritePrecalcExprs(SNode** pNode, void* pContext) {
7,773,716✔
2019
  SRewritePrecalcExprsCxt* pCxt = (SRewritePrecalcExprsCxt*)pContext;
7,773,716✔
2020
  switch (nodeType(*pNode)) {
7,773,716✔
2021
    case QUERY_NODE_VALUE: {
285,995✔
2022
      if (((SValueNode*)*pNode)->notReserved) {
285,995✔
2023
        break;
219,975✔
2024
      }
2025
      pCxt->errCode = rewriteValueToOperator(pCxt, pNode);
66,020✔
2026
      if (TSDB_CODE_SUCCESS != pCxt->errCode) {
66,020!
2027
        return DEAL_RES_ERROR;
×
2028
      }
2029
      return collectAndRewrite(pCxt, pNode);
66,020✔
2030
    }
2031
    case QUERY_NODE_OPERATOR:
57,516✔
2032
    case QUERY_NODE_LOGIC_CONDITION:
2033
    case QUERY_NODE_CASE_WHEN: {
2034
      return collectAndRewrite(pCxt, pNode);
57,516✔
2035
    }
2036
    case QUERY_NODE_FUNCTION: {
3,274,816✔
2037
      if (fmIsScalarFunc(((SFunctionNode*)(*pNode))->funcId)) {
3,274,816✔
2038
        return collectAndRewrite(pCxt, pNode);
268,117✔
2039
      }
2040
    }
2041
    default:
2042
      break;
7,162,083✔
2043
  }
2044
  return DEAL_RES_CONTINUE;
7,382,058✔
2045
}
2046

2047
static int32_t rewritePrecalcExprs(SPhysiPlanContext* pCxt, SNodeList* pList, SNodeList** pPrecalcExprs,
2,363,914✔
2048
                                   SNodeList** pRewrittenList) {
2049
  if (NULL == pList) {
2,363,914✔
2050
    return TSDB_CODE_SUCCESS;
932,060✔
2051
  }
2052
  int32_t code = 0;
1,431,854✔
2053
  if (NULL == *pPrecalcExprs) {
1,431,854✔
2054
    code = nodesMakeList(pPrecalcExprs);
1,431,093✔
2055
    if (NULL == *pPrecalcExprs) {
1,431,105!
2056
      return code;
×
2057
    }
2058
  }
2059
  if (NULL == *pRewrittenList) {
1,431,866!
2060
    code = nodesMakeList(pRewrittenList);
1,431,921✔
2061
    if (NULL == *pRewrittenList) {
1,431,919!
2062
      return code;
×
2063
    }
2064
  }
2065
  SNode* pNode = NULL;
1,431,864✔
2066
  FOREACH(pNode, pList) {
5,271,290!
2067
    SNode* pNew = NULL;
3,839,354✔
2068
    if (QUERY_NODE_GROUPING_SET == nodeType(pNode)) {
3,839,354✔
2069
      code = nodesCloneNode(nodesListGetNode(((SGroupingSetNode*)pNode)->pParameterList, 0), &pNew);
296,012✔
2070
    } else {
2071
      code = nodesCloneNode(pNode, &pNew);
3,543,342✔
2072
    }
2073
    if (NULL == pNew) {
3,839,426!
2074
      return code;
×
2075
    }
2076
    if (TSDB_CODE_SUCCESS != (code = nodesListAppend(*pRewrittenList, pNew))) {
3,839,426!
2077
      return code;
×
2078
    }
2079
  }
2080
  SRewritePrecalcExprsCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pPrecalcExprs = *pPrecalcExprs};
1,431,936✔
2081
  nodesRewriteExprs(*pRewrittenList, doRewritePrecalcExprs, &cxt);
1,431,936✔
2082
  if (0 == LIST_LENGTH(cxt.pPrecalcExprs) || TSDB_CODE_SUCCESS != cxt.errCode) {
1,431,925!
2083
    NODES_DESTORY_LIST(*pPrecalcExprs);
1,160,154✔
2084
  }
2085
  return cxt.errCode;
1,431,925✔
2086
}
2087

2088
static int32_t rewritePrecalcExpr(SPhysiPlanContext* pCxt, SNode* pNode, SNodeList** pPrecalcExprs,
7,563✔
2089
                                  SNode** pRewritten) {
2090
  if (NULL == pNode) {
7,563!
2091
    return TSDB_CODE_SUCCESS;
×
2092
  }
2093

2094
  SNodeList* pList = NULL;
7,563✔
2095
  int32_t    code = nodesListMakeAppend(&pList, pNode);
7,563✔
2096
  SNodeList* pRewrittenList = NULL;
7,563✔
2097
  if (TSDB_CODE_SUCCESS == code) {
7,563!
2098
    code = rewritePrecalcExprs(pCxt, pList, pPrecalcExprs, &pRewrittenList);
7,563✔
2099
  }
2100
  if (TSDB_CODE_SUCCESS == code) {
7,563!
2101
    *pRewritten = nodesListGetNode(pRewrittenList, 0);
7,563✔
2102
  }
2103
  nodesClearList(pList);
7,563✔
2104
  nodesClearList(pRewrittenList);
7,563✔
2105
  return code;
7,563✔
2106
}
2107

2108
static EDealRes hasCountLikeFunc(SNode* pNode, void* res) {
2,327,296✔
2109
  if (QUERY_NODE_FUNCTION == nodeType(pNode)) {
2,327,296✔
2110
    SFunctionNode* pFunc = (SFunctionNode*)pNode;
1,229,704✔
2111
    if (fmIsCountLikeFunc(pFunc->funcId) || (pFunc->hasOriginalFunc && fmIsCountLikeFunc(pFunc->originalFuncId))) {
1,229,704✔
2112
      *(bool*)res = true;
500,499✔
2113
      return DEAL_RES_END;
500,499✔
2114
    }
2115
  }
2116
  return DEAL_RES_CONTINUE;
1,826,829✔
2117
}
2118

2119
static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SAggLogicNode* pAggLogicNode,
957,187✔
2120
                                  SPhysiNode** pPhyNode, SSubplan* pSubPlan) {
2121
  SAggPhysiNode* pAgg =
2122
      (SAggPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pAggLogicNode, QUERY_NODE_PHYSICAL_PLAN_HASH_AGG);
957,187✔
2123
  if (NULL == pAgg) {
957,287!
2124
    return terrno;
×
2125
  }
2126
  if (pAgg->node.pSlimit && ((SLimitNode*)pAgg->node.pSlimit)->limit) {
957,287!
2127
    pSubPlan->dynamicRowThreshold = true;
3,831✔
2128
    pSubPlan->rowsThreshold = ((SLimitNode*)pAgg->node.pSlimit)->limit->datum.i;
3,831✔
2129
  }
2130

2131
  pAgg->mergeDataBlock = (GROUP_ACTION_KEEP == pAggLogicNode->node.groupAction ? false : true);
957,287✔
2132
  pAgg->groupKeyOptimized = pAggLogicNode->hasGroupKeyOptimized;
957,287✔
2133
  pAgg->node.forceCreateNonBlockingOptr = pAggLogicNode->node.forceCreateNonBlockingOptr;
957,287✔
2134

2135
  SNodeList* pPrecalcExprs = NULL;
957,287✔
2136
  SNodeList* pGroupKeys = NULL;
957,287✔
2137
  SNodeList* pAggFuncs = NULL;
957,287✔
2138
  int32_t    code = rewritePrecalcExprs(pCxt, pAggLogicNode->pGroupKeys, &pPrecalcExprs, &pGroupKeys);
957,287✔
2139
  if (TSDB_CODE_SUCCESS == code) {
957,224!
2140
    code = rewritePrecalcExprs(pCxt, pAggLogicNode->pAggFuncs, &pPrecalcExprs, &pAggFuncs);
957,263✔
2141
  }
2142
  nodesWalkExprs(pAggFuncs, hasCountLikeFunc, &pAgg->hasCountLikeFunc);
957,231✔
2143

2144
  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
957,288✔
2145
  // push down expression to pOutputDataBlockDesc of child node
2146
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
957,287!
2147
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pAgg->pExprs);
254,448✔
2148
    if (TSDB_CODE_SUCCESS == code) {
254,448!
2149
      code = pushdownDataBlockSlots(pCxt, pAgg->pExprs, pChildTupe);
254,448✔
2150
    }
2151
  }
2152

2153
  if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) {
957,287✔
2154
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pGroupKeys, &pAgg->pGroupKeys);
257,695✔
2155
    if (TSDB_CODE_SUCCESS == code) {
257,692!
2156
      code = addDataBlockSlots(pCxt, pAgg->pGroupKeys, pAgg->node.pOutputDataBlockDesc);
257,692✔
2157
    }
2158
  }
2159

2160
  if (TSDB_CODE_SUCCESS == code && NULL != pAggFuncs) {
957,284✔
2161
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pAggFuncs, &pAgg->pAggFuncs);
724,750✔
2162
    if (TSDB_CODE_SUCCESS == code) {
724,752✔
2163
      code = addDataBlockSlots(pCxt, pAgg->pAggFuncs, pAgg->node.pOutputDataBlockDesc);
724,745✔
2164
    }
2165
  }
2166

2167
  if (TSDB_CODE_SUCCESS == code) {
957,298✔
2168
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pAggLogicNode, (SPhysiNode*)pAgg);
957,280✔
2169
  }
2170

2171
  if (TSDB_CODE_SUCCESS == code) {
957,283!
2172
    *pPhyNode = (SPhysiNode*)pAgg;
957,283✔
2173
  } else {
2174
    nodesDestroyNode((SNode*)pAgg);
×
2175
  }
2176

2177
  nodesDestroyList(pPrecalcExprs);
957,283✔
2178
  nodesDestroyList(pGroupKeys);
957,278✔
2179
  nodesDestroyList(pAggFuncs);
957,283✔
2180

2181
  return code;
957,277✔
2182
}
2183

2184
static int32_t createIndefRowsFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
36,173✔
2185
                                            SIndefRowsFuncLogicNode* pFuncLogicNode, SPhysiNode** pPhyNode) {
2186
  SIndefRowsFuncPhysiNode* pIdfRowsFunc = (SIndefRowsFuncPhysiNode*)makePhysiNode(
36,173✔
2187
      pCxt, (SLogicNode*)pFuncLogicNode, QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC);
2188
  if (NULL == pIdfRowsFunc) {
36,173!
2189
    return terrno;
×
2190
  }
2191

2192
  SNodeList* pPrecalcExprs = NULL;
36,173✔
2193
  SNodeList* pFuncs = NULL;
36,173✔
2194
  int32_t    code = rewritePrecalcExprs(pCxt, pFuncLogicNode->pFuncs, &pPrecalcExprs, &pFuncs);
36,173✔
2195

2196
  if (pIdfRowsFunc->node.inputTsOrder == 0) {
36,173✔
2197
    // default to asc
2198
    pIdfRowsFunc->node.inputTsOrder = TSDB_ORDER_ASC;
1,227✔
2199
  }
2200

2201
  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
36,173✔
2202
  // push down expression to pOutputDataBlockDesc of child node
2203
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
36,173!
2204
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pIdfRowsFunc->pExprs);
132✔
2205
    if (TSDB_CODE_SUCCESS == code) {
132!
2206
      code = pushdownDataBlockSlots(pCxt, pIdfRowsFunc->pExprs, pChildTupe);
132✔
2207
    }
2208
  }
2209

2210
  if (TSDB_CODE_SUCCESS == code) {
36,173!
2211
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs, &pIdfRowsFunc->pFuncs);
36,173✔
2212
    if (TSDB_CODE_SUCCESS == code) {
36,173!
2213
      code = addDataBlockSlots(pCxt, pIdfRowsFunc->pFuncs, pIdfRowsFunc->node.pOutputDataBlockDesc);
36,173✔
2214
    }
2215
  }
2216

2217
  if (TSDB_CODE_SUCCESS == code) {
36,173!
2218
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pFuncLogicNode, (SPhysiNode*)pIdfRowsFunc);
36,173✔
2219
  }
2220

2221
  if (TSDB_CODE_SUCCESS == code) {
36,173!
2222
    *pPhyNode = (SPhysiNode*)pIdfRowsFunc;
36,173✔
2223
  } else {
2224
    nodesDestroyNode((SNode*)pIdfRowsFunc);
×
2225
  }
2226

2227
  nodesDestroyList(pPrecalcExprs);
36,173✔
2228
  nodesDestroyList(pFuncs);
36,173✔
2229

2230
  return code;
36,173✔
2231
}
2232

2233
static int32_t createInterpFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
14,480✔
2234
                                         SInterpFuncLogicNode* pFuncLogicNode, SPhysiNode** pPhyNode) {
2235
  SInterpFuncPhysiNode* pInterpFunc = (SInterpFuncPhysiNode*)makePhysiNode(
14,480✔
2236
      pCxt, (SLogicNode*)pFuncLogicNode,
2237
      pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC : QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC);
14,480✔
2238
  if (NULL == pInterpFunc) {
14,487!
2239
    return terrno;
×
2240
  }
2241

2242
  SNodeList* pPrecalcExprs = NULL;
14,487✔
2243
  SNodeList* pFuncs = NULL;
14,487✔
2244
  int32_t    code = rewritePrecalcExprs(pCxt, pFuncLogicNode->pFuncs, &pPrecalcExprs, &pFuncs);
14,487✔
2245

2246
  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
14,486✔
2247
  // push down expression to pOutputDataBlockDesc of child node
2248
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
14,487!
2249
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pInterpFunc->pExprs);
1,265✔
2250
    if (TSDB_CODE_SUCCESS == code) {
1,265!
2251
      code = pushdownDataBlockSlots(pCxt, pInterpFunc->pExprs, pChildTupe);
1,265✔
2252
    }
2253
  }
2254

2255
  if (TSDB_CODE_SUCCESS == code) {
14,487!
2256
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs, &pInterpFunc->pFuncs);
14,487✔
2257
    if (TSDB_CODE_SUCCESS == code) {
14,486!
2258
      code = addDataBlockSlots(pCxt, pInterpFunc->pFuncs, pInterpFunc->node.pOutputDataBlockDesc);
14,486✔
2259
    }
2260
  }
2261

2262
  if (TSDB_CODE_SUCCESS == code) {
14,486!
2263
    pInterpFunc->timeRange = pFuncLogicNode->timeRange;
14,486✔
2264
    pInterpFunc->interval = pFuncLogicNode->interval;
14,486✔
2265
    pInterpFunc->fillMode = pFuncLogicNode->fillMode;
14,486✔
2266
    pInterpFunc->intervalUnit = pFuncLogicNode->intervalUnit;
14,486✔
2267
    pInterpFunc->precision = pFuncLogicNode->node.precision;
14,486✔
2268
    pInterpFunc->pFillValues = NULL;
14,486✔
2269
    pInterpFunc->rangeInterval = pFuncLogicNode->rangeInterval;
14,486✔
2270
    pInterpFunc->rangeIntervalUnit = pFuncLogicNode->rangeIntervalUnit;
14,486✔
2271
    code = nodesCloneNode(pFuncLogicNode->pFillValues, &pInterpFunc->pFillValues);
14,486✔
2272
  }
2273

2274
  if (TSDB_CODE_SUCCESS == code) {
14,486!
2275
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncLogicNode->pTimeSeries, &pInterpFunc->pTimeSeries);
14,486✔
2276
  }
2277

2278
  if (TSDB_CODE_SUCCESS == code) {
14,487!
2279
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pFuncLogicNode, (SPhysiNode*)pInterpFunc);
14,487✔
2280
  }
2281

2282
  if (pCxt->pPlanCxt->streamQuery) {
14,486✔
2283
    pInterpFunc->streamNodeOption = pFuncLogicNode->streamNodeOption;
134✔
2284
  }
2285

2286
  if (TSDB_CODE_SUCCESS == code) {
14,486!
2287
    *pPhyNode = (SPhysiNode*)pInterpFunc;
14,486✔
2288
  } else {
2289
    nodesDestroyNode((SNode*)pInterpFunc);
×
2290
  }
2291

2292
  nodesDestroyList(pPrecalcExprs);
14,486✔
2293
  nodesDestroyList(pFuncs);
14,487✔
2294

2295
  return code;
14,486✔
2296
}
2297

2298
static int32_t createForecastFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
41✔
2299
                                           SForecastFuncLogicNode* pFuncLogicNode, SPhysiNode** pPhyNode) {
2300
  SForecastFuncPhysiNode* pForecastFunc =
2301
      (SForecastFuncPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pFuncLogicNode, QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC);
41✔
2302
  if (NULL == pForecastFunc) {
41!
2303
    return terrno;
×
2304
  }
2305

2306
  SNodeList* pPrecalcExprs = NULL;
41✔
2307
  SNodeList* pFuncs = NULL;
41✔
2308
  int32_t    code = rewritePrecalcExprs(pCxt, pFuncLogicNode->pFuncs, &pPrecalcExprs, &pFuncs);
41✔
2309

2310
  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
41✔
2311
  // push down expression to pOutputDataBlockDesc of child node
2312
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
41!
2313
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pForecastFunc->pExprs);
×
2314
    if (TSDB_CODE_SUCCESS == code) {
×
2315
      code = pushdownDataBlockSlots(pCxt, pForecastFunc->pExprs, pChildTupe);
×
2316
    }
2317
  }
2318

2319
  if (TSDB_CODE_SUCCESS == code) {
41!
2320
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs, &pForecastFunc->pFuncs);
41✔
2321
    if (TSDB_CODE_SUCCESS == code) {
41!
2322
      code = addDataBlockSlots(pCxt, pForecastFunc->pFuncs, pForecastFunc->node.pOutputDataBlockDesc);
41✔
2323
    }
2324
  }
2325

2326
  if (TSDB_CODE_SUCCESS == code) {
41!
2327
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pFuncLogicNode, (SPhysiNode*)pForecastFunc);
41✔
2328
  }
2329

2330
  if (TSDB_CODE_SUCCESS == code) {
41!
2331
    *pPhyNode = (SPhysiNode*)pForecastFunc;
41✔
2332
  } else {
2333
    nodesDestroyNode((SNode*)pForecastFunc);
×
2334
  }
2335

2336
  nodesDestroyList(pPrecalcExprs);
41✔
2337
  nodesDestroyList(pFuncs);
41✔
2338

2339
  return code;
41✔
2340
}
2341

2342
static bool projectCanMergeDataBlock(SProjectLogicNode* pProject) {
759,335✔
2343
  if (GROUP_ACTION_KEEP == pProject->node.groupAction) {
759,335✔
2344
    return false;
1,011✔
2345
  }
2346
  if (DATA_ORDER_LEVEL_NONE == pProject->node.resultDataOrder) {
758,324✔
2347
    return true;
611,185✔
2348
  }
2349
  if (1 != LIST_LENGTH(pProject->node.pChildren)) {
147,139!
2350
    return true;
66✔
2351
  }
2352
  SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pProject->node.pChildren, 0);
147,073✔
2353
  return DATA_ORDER_LEVEL_GLOBAL == pChild->resultDataOrder ? true : false;
147,073✔
2354
}
2355

2356
bool projectCouldMergeUnsortDataBlock(SProjectLogicNode* pProject) {
720,903✔
2357
  SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pProject->node.pChildren, 0);
720,903✔
2358
  if (DATA_ORDER_LEVEL_GLOBAL == pChild->resultDataOrder) {
720,918✔
2359
    return false;
385,620✔
2360
  }
2361
  if (GROUP_ACTION_KEEP == pProject->node.groupAction) {
335,298!
2362
    return false;
×
2363
  }
2364
  if (DATA_ORDER_LEVEL_NONE == pProject->node.resultDataOrder) {
335,298✔
2365
    return true;
276,127✔
2366
  }
2367
  if (1 != LIST_LENGTH(pProject->node.pChildren)) {
59,171!
2368
    return true;
15,737✔
2369
  }
2370
  return false;
43,434✔
2371
}
2372

2373
static int32_t createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
759,334✔
2374
                                      SProjectLogicNode* pProjectLogicNode, SPhysiNode** pPhyNode) {
2375
  SProjectPhysiNode* pProject =
2376
      (SProjectPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pProjectLogicNode, QUERY_NODE_PHYSICAL_PLAN_PROJECT);
759,334✔
2377
  if (NULL == pProject) {
759,335!
2378
    return terrno;
×
2379
  }
2380

2381
  pProject->mergeDataBlock = projectCanMergeDataBlock(pProjectLogicNode);
759,335✔
2382
  pProject->ignoreGroupId = pProjectLogicNode->ignoreGroupId;
759,335✔
2383
  pProject->inputIgnoreGroup = pProjectLogicNode->inputIgnoreGroup;
759,335✔
2384

2385
  int32_t code = TSDB_CODE_SUCCESS;
759,335✔
2386
  if (0 == LIST_LENGTH(pChildren)) {
759,335!
2387
    code = nodesCloneList(pProjectLogicNode->pProjections, &pProject->pProjections);
2,809✔
2388
  } else {
2389
    code = setListSlotId(pCxt, ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc->dataBlockId, -1,
756,525✔
2390
                         pProjectLogicNode->pProjections, &pProject->pProjections);
756,526✔
2391
  }
2392
  if (TSDB_CODE_SUCCESS == code) {
759,335!
2393
    code = addDataBlockSlotsForProject(pCxt, pProjectLogicNode->stmtName, pProject->pProjections,
759,335✔
2394
                                       pProject->node.pOutputDataBlockDesc);
2395
  }
2396
  if (TSDB_CODE_SUCCESS == code) {
759,334!
2397
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pProjectLogicNode, (SPhysiNode*)pProject);
759,334✔
2398
  }
2399

2400
  if (TSDB_CODE_SUCCESS == code) {
759,334!
2401
    *pPhyNode = (SPhysiNode*)pProject;
759,334✔
2402
  } else {
2403
    nodesDestroyNode((SNode*)pProject);
×
2404
  }
2405

2406
  return code;
759,334✔
2407
}
2408

2409
static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode,
692,426✔
2410
                                         SPhysiNode** pPhyNode) {
2411
  SExchangePhysiNode* pExchange =
2412
      (SExchangePhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE);
692,426✔
2413
  if (NULL == pExchange) {
692,430!
2414
    return terrno;
×
2415
  }
2416

2417
  pExchange->srcStartGroupId = pExchangeLogicNode->srcStartGroupId;
692,430✔
2418
  pExchange->srcEndGroupId = pExchangeLogicNode->srcEndGroupId;
692,430✔
2419
  pExchange->seqRecvData = pExchangeLogicNode->seqRecvData;
692,430✔
2420

2421
  int32_t code = setConditionsSlotId(pCxt, (const SLogicNode*)pExchangeLogicNode, (SPhysiNode*)pExchange);
692,430✔
2422
  if (TSDB_CODE_SUCCESS == code) {
692,429!
2423
    *pPhyNode = (SPhysiNode*)pExchange;
692,429✔
2424
  } else {
2425
    nodesDestroyNode((SNode*)pExchange);
×
2426
  }
2427

2428
  return code;
692,430✔
2429
}
2430

2431
static int32_t createStreamScanPhysiNodeByExchange(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode,
349✔
2432
                                                   SPhysiNode** pPhyNode) {
2433
  SScanPhysiNode* pScan =
2434
      (SScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pExchangeLogicNode, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
349✔
2435
  if (NULL == pScan) {
349!
2436
    return terrno;
×
2437
  }
2438

2439
  int32_t code = TSDB_CODE_SUCCESS;
349✔
2440

2441
  pScan->pScanCols = NULL;
349✔
2442
  code = nodesCloneList(pExchangeLogicNode->node.pTargets, &pScan->pScanCols);
349✔
2443

2444
  if (TSDB_CODE_SUCCESS == code) {
349!
2445
    code = sortScanCols(pScan->pScanCols);
349✔
2446
  }
2447

2448
  if (TSDB_CODE_SUCCESS == code) {
349!
2449
    code = sortScanCols(pScan->pScanCols);
349✔
2450
  }
2451
  if (TSDB_CODE_SUCCESS == code) {
349!
2452
    code = addDataBlockSlots(pCxt, pScan->pScanCols, pScan->node.pOutputDataBlockDesc);
349✔
2453
  }
2454
  if (TSDB_CODE_SUCCESS == code) {
349!
2455
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pExchangeLogicNode, (SPhysiNode*)pScan);
349✔
2456
  }
2457
  if (TSDB_CODE_SUCCESS == code) {
349!
2458
    SStreamScanPhysiNode* pTableScan = (SStreamScanPhysiNode*)pScan;
349✔
2459
    pTableScan->triggerType = pCxt->pPlanCxt->triggerType;
349✔
2460
    tstrncpy(pTableScan->pStbFullName, pCxt->pPlanCxt->pStbFullName, TSDB_TABLE_FNAME_LEN);
349✔
2461
    tstrncpy(pTableScan->pWstartName, pCxt->pPlanCxt->pWstartName, TSDB_COL_NAME_LEN);
349✔
2462
    tstrncpy(pTableScan->pWendName, pCxt->pPlanCxt->pWendName, TSDB_COL_NAME_LEN);
349✔
2463
    tstrncpy(pTableScan->pGroupIdName, pCxt->pPlanCxt->pGroupIdName, TSDB_COL_NAME_LEN);
349✔
2464
    tstrncpy(pTableScan->pIsWindowFilledName, pCxt->pPlanCxt->pIsWindowFilledName, TSDB_COL_NAME_LEN);
349✔
2465
  }
2466

2467
  if (TSDB_CODE_SUCCESS == code) {
349!
2468
    *pPhyNode = (SPhysiNode*)pScan;
349✔
2469
  } else {
2470
    nodesDestroyNode((SNode*)pScan);
×
2471
  }
2472

2473
  return code;
349✔
2474
}
2475

2476
static int32_t createExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode,
692,775✔
2477
                                       SPhysiNode** pPhyNode) {
2478
  if (pCxt->pPlanCxt->streamQuery) {
692,775✔
2479
    return createStreamScanPhysiNodeByExchange(pCxt, pExchangeLogicNode, pPhyNode);
349✔
2480
  } else {
2481
    return doCreateExchangePhysiNode(pCxt, pExchangeLogicNode, pPhyNode);
692,426✔
2482
  }
2483
}
2484

2485
static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowPhysiNode* pWindow,
124,060✔
2486
                                             SWindowLogicNode* pWindowLogicNode) {
2487
  pWindow->triggerType = pWindowLogicNode->triggerType;
124,060✔
2488
  pWindow->watermark = pWindowLogicNode->watermark;
124,060✔
2489
  pWindow->deleteMark = pWindowLogicNode->deleteMark;
124,060✔
2490
  pWindow->igExpired = pWindowLogicNode->igExpired;
124,060✔
2491
  if (pCxt->pPlanCxt->streamQuery) {
124,060✔
2492
    pWindow->destHasPrimaryKey = pCxt->pPlanCxt->destHasPrimaryKey;
1,487✔
2493
  }
2494
  pWindow->mergeDataBlock = (GROUP_ACTION_KEEP == pWindowLogicNode->node.groupAction ? false : true);
124,060✔
2495
  pWindow->node.inputTsOrder = pWindowLogicNode->node.inputTsOrder;
124,060✔
2496
  pWindow->node.outputTsOrder = pWindowLogicNode->node.outputTsOrder;
124,060✔
2497
  if (nodeType(pWindow) == QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL) {
124,060✔
2498
    pWindow->node.inputTsOrder = pWindowLogicNode->node.outputTsOrder;
11,860✔
2499
  }
2500
  pWindow->recalculateInterval = pWindowLogicNode->recalculateInterval;
124,060✔
2501

2502
  SNodeList* pPrecalcExprs = NULL;
124,060✔
2503
  SNodeList* pFuncs = NULL;
124,060✔
2504
  int32_t    code = rewritePrecalcExprs(pCxt, pWindowLogicNode->pFuncs, &pPrecalcExprs, &pFuncs);
124,060✔
2505

2506
  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
124,057✔
2507
  // push down expression to pOutputDataBlockDesc of child node
2508
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
124,055!
2509
    SNodeList* pOutput;
2510
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pOutput);
9,414✔
2511
    if (TSDB_CODE_SUCCESS == code) {
9,414!
2512
      code = addDataBlockSlots(pCxt, pOutput, pChildTupe);
9,414✔
2513
    }
2514
    if (TSDB_CODE_SUCCESS == code) {
9,414!
2515
      if (pWindow->pExprs == NULL) {
9,414✔
2516
        pWindow->pExprs = pOutput;
9,413✔
2517
      } else {
2518
        code = nodesListAppendList(pWindow->pExprs, pOutput);
1✔
2519
      }
2520
    }
2521
  }
2522

2523
  if (TSDB_CODE_SUCCESS == code) {
124,055!
2524
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pTspk, &pWindow->pTspk);
124,056✔
2525
  }
2526
  if (TSDB_CODE_SUCCESS == code && pWindowLogicNode->pTsEnd) {
124,059!
2527
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pTsEnd, &pWindow->pTsEnd);
8,281✔
2528
  }
2529

2530
  if (TSDB_CODE_SUCCESS == code && NULL != pFuncs) {
124,059!
2531
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs, &pWindow->pFuncs);
124,060✔
2532
    if (TSDB_CODE_SUCCESS == code) {
124,058!
2533
      code = addDataBlockSlots(pCxt, pWindow->pFuncs, pWindow->node.pOutputDataBlockDesc);
124,061✔
2534
    }
2535
  }
2536

2537
  if (TSDB_CODE_SUCCESS == code) {
124,051!
2538
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pWindowLogicNode, (SPhysiNode*)pWindow);
124,056✔
2539
  }
2540

2541
  nodesDestroyList(pPrecalcExprs);
124,049✔
2542
  nodesDestroyList(pFuncs);
124,053✔
2543

2544
  return code;
124,051✔
2545
}
2546

2547
static ENodeType getIntervalOperatorType(EWindowAlgorithm windowAlgo) {
112,934✔
2548
  switch (windowAlgo) {
112,934!
2549
    case INTERVAL_ALGO_HASH:
91,744✔
2550
      return QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL;
91,744✔
2551
    case INTERVAL_ALGO_MERGE:
11,860✔
2552
      return QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL;
11,860✔
2553
    case INTERVAL_ALGO_STREAM_FINAL:
159✔
2554
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL;
159✔
2555
    case INTERVAL_ALGO_STREAM_SEMI:
159✔
2556
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL;
159✔
2557
    case INTERVAL_ALGO_STREAM_MID:
159✔
2558
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL;
159✔
2559
    case INTERVAL_ALGO_STREAM_SINGLE:
539✔
2560
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL;
539✔
2561
    case SESSION_ALGO_STREAM_FINAL:
31✔
2562
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION;
31✔
2563
    case SESSION_ALGO_STREAM_SEMI:
31✔
2564
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION;
31✔
2565
    case SESSION_ALGO_STREAM_SINGLE:
176✔
2566
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
176✔
2567
    case SESSION_ALGO_MERGE:
8,043✔
2568
      return QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION;
8,043✔
2569
    case INTERVAL_ALGO_STREAM_CONTINUE_SINGLE:
35✔
2570
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_INTERVAL;
35✔
2571
    case INTERVAL_ALGO_STREAM_CONTINUE_FINAL:
×
2572
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_FINAL_INTERVAL;
×
2573
    case INTERVAL_ALGO_STREAM_CONTINUE_SEMI:
×
2574
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_SEMI_INTERVAL;
×
2575
    case SESSION_ALGO_STREAM_CONTINUE_SINGLE:
×
2576
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_SESSION;
×
2577
    case SESSION_ALGO_STREAM_CONTINUE_FINAL:
×
2578
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_FINAL_SESSION;
×
2579
    case SESSION_ALGO_STREAM_CONTINUE_SEMI:
×
2580
      return QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_SEMI_SESSION;
×
2581
    default:
×
2582
      break;
×
2583
  }
2584
  return QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL;
×
2585
}
2586

2587
static int32_t createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
104,653✔
2588
                                       SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
2589
  SIntervalPhysiNode* pInterval = (SIntervalPhysiNode*)makePhysiNode(
104,653✔
2590
      pCxt, (SLogicNode*)pWindowLogicNode, getIntervalOperatorType(pWindowLogicNode->windowAlgo));
2591
  if (NULL == pInterval) {
104,662!
2592
    return terrno;
×
2593
  }
2594

2595
  pInterval->interval = pWindowLogicNode->interval;
104,662✔
2596
  pInterval->offset = pWindowLogicNode->offset;
104,662✔
2597
  pInterval->sliding = pWindowLogicNode->sliding;
104,662✔
2598
  pInterval->intervalUnit = pWindowLogicNode->intervalUnit;
104,662✔
2599
  pInterval->slidingUnit = pWindowLogicNode->slidingUnit;
104,662✔
2600
  pInterval->timeRange = pWindowLogicNode->timeRange;
104,662✔
2601

2602
  int32_t code = createWindowPhysiNodeFinalize(pCxt, pChildren, &pInterval->window, pWindowLogicNode);
104,662✔
2603
  if (TSDB_CODE_SUCCESS == code) {
104,648!
2604
    *pPhyNode = (SPhysiNode*)pInterval;
104,649✔
2605
  } else {
2606
    nodesDestroyNode((SNode*)pInterval);
×
2607
  }
2608

2609
  return code;
104,651✔
2610
}
2611

2612
static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
8,281✔
2613
                                            SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
2614
  SSessionWinodwPhysiNode* pSession = (SSessionWinodwPhysiNode*)makePhysiNode(
8,281✔
2615
      pCxt, (SLogicNode*)pWindowLogicNode, getIntervalOperatorType(pWindowLogicNode->windowAlgo));
2616
  if (NULL == pSession) {
8,281!
2617
    return terrno;
×
2618
  }
2619

2620
  pSession->gap = pWindowLogicNode->sessionGap;
8,281✔
2621

2622
  int32_t code = createWindowPhysiNodeFinalize(pCxt, pChildren, &pSession->window, pWindowLogicNode);
8,281✔
2623
  if (TSDB_CODE_SUCCESS == code) {
8,281!
2624
    *pPhyNode = (SPhysiNode*)pSession;
8,281✔
2625
  } else {
2626
    nodesDestroyNode((SNode*)pSession);
×
2627
  }
2628

2629
  return code;
8,281✔
2630
}
2631

2632
static int32_t createStateWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
7,551✔
2633
                                          SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
2634
  ENodeType type = QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE;
7,551✔
2635
  if (pCxt->pPlanCxt->streamQuery) {
7,551✔
2636
    if (pCxt->pPlanCxt->triggerType == STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE) {
106!
2637
      type = QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_STATE;
×
2638
    } else {
2639
      type = QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE;
106✔
2640
    }
2641
  }
2642
  SStateWinodwPhysiNode* pState = (SStateWinodwPhysiNode*)makePhysiNode(
7,551✔
2643
      pCxt, (SLogicNode*)pWindowLogicNode, type);
2644
  if (NULL == pState) {
7,551!
2645
    return terrno;
×
2646
  }
2647

2648
  SNodeList* pPrecalcExprs = NULL;
7,551✔
2649
  SNode*     pStateKey = NULL;
7,551✔
2650
  int32_t    code = rewritePrecalcExpr(pCxt, pWindowLogicNode->pStateExpr, &pPrecalcExprs, &pStateKey);
7,551✔
2651

2652
  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
7,551✔
2653
  // push down expression to pOutputDataBlockDesc of child node
2654
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
7,551!
2655
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pState->window.pExprs);
3,727✔
2656
    if (TSDB_CODE_SUCCESS == code) {
3,727!
2657
      code = addDataBlockSlots(pCxt, pState->window.pExprs, pChildTupe);
3,727✔
2658
    }
2659
  }
2660

2661
  if (TSDB_CODE_SUCCESS == code) {
7,551!
2662
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pStateKey, &pState->pStateKey);
7,551✔
2663
    // if (TSDB_CODE_SUCCESS == code) {
2664
    //   code = addDataBlockSlot(pCxt, &pState->pStateKey, pState->window.node.pOutputDataBlockDesc);
2665
    // }
2666
  }
2667

2668
  pState->trueForLimit = pWindowLogicNode->trueForLimit;
7,551✔
2669

2670
  if (TSDB_CODE_SUCCESS == code) {
7,551!
2671
    code = createWindowPhysiNodeFinalize(pCxt, pChildren, &pState->window, pWindowLogicNode);
7,551✔
2672
  }
2673

2674
  if (TSDB_CODE_SUCCESS == code) {
7,551!
2675
    *pPhyNode = (SPhysiNode*)pState;
7,551✔
2676
  } else {
2677
    nodesDestroyNode((SNode*)pState);
×
2678
  }
2679

2680
  nodesDestroyList(pPrecalcExprs);
7,551✔
2681
  nodesDestroyNode(pStateKey);
7,551✔
2682

2683
  return code;
7,551✔
2684
}
2685

2686
static int32_t createEventWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
1,094✔
2687
                                          SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
2688
  ENodeType type = QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT;
1,094✔
2689
  if (pCxt->pPlanCxt->streamQuery) {
1,094✔
2690
    if (pCxt->pPlanCxt->triggerType == STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE) {
32!
2691
      type = QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_EVENT;
×
2692
    } else {
2693
      type = QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT;
32✔
2694
    }
2695
  }
2696

2697
  SEventWinodwPhysiNode* pEvent = (SEventWinodwPhysiNode*)makePhysiNode(
1,094✔
2698
      pCxt, (SLogicNode*)pWindowLogicNode, type);
2699
  if (NULL == pEvent) {
1,094!
2700
    return terrno;
×
2701
  }
2702

2703
  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
1,094✔
2704
  int32_t code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pStartCond, &pEvent->pStartCond);
1,094✔
2705
  if (TSDB_CODE_SUCCESS == code) {
1,094!
2706
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pEndCond, &pEvent->pEndCond);
1,094✔
2707
  }
2708
  pEvent->trueForLimit = pWindowLogicNode->trueForLimit;
1,094✔
2709
  if (TSDB_CODE_SUCCESS == code) {
1,094!
2710
    code = createWindowPhysiNodeFinalize(pCxt, pChildren, &pEvent->window, pWindowLogicNode);
1,094✔
2711
  }
2712

2713
  if (TSDB_CODE_SUCCESS == code) {
1,094!
2714
    *pPhyNode = (SPhysiNode*)pEvent;
1,094✔
2715
  } else {
2716
    nodesDestroyNode((SNode*)pEvent);
×
2717
  }
2718

2719
  return code;
1,094✔
2720
}
2721

2722
static int32_t createCountWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
2,461✔
2723
                                          SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
2724
  ENodeType type = QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT;
2,461✔
2725
  if (pCxt->pPlanCxt->streamQuery) {
2,461✔
2726
    if (pCxt->pPlanCxt->triggerType == STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE) {
60!
2727
      type = QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_COUNT;
×
2728
    } else {
2729
      type = QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT;
60✔
2730
    }
2731
  }
2732

2733
  SCountWinodwPhysiNode* pCount = (SCountWinodwPhysiNode*)makePhysiNode(
2,461✔
2734
      pCxt, (SLogicNode*)pWindowLogicNode, type);
2735
  if (NULL == pCount) {
2,461!
2736
    return terrno;
×
2737
  }
2738
  pCount->windowCount = pWindowLogicNode->windowCount;
2,461✔
2739
  pCount->windowSliding = pWindowLogicNode->windowSliding;
2,461✔
2740

2741
  int32_t code = createWindowPhysiNodeFinalize(pCxt, pChildren, &pCount->window, pWindowLogicNode);
2,461✔
2742
  if (TSDB_CODE_SUCCESS == code) {
2,461!
2743
    *pPhyNode = (SPhysiNode*)pCount;
2,461✔
2744
  } else {
2745
    nodesDestroyNode((SNode*)pCount);
×
2746
  }
2747

2748
  return code;
2,461✔
2749
}
2750

2751
static int32_t createAnomalyWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
12✔
2752
                                            SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
2753
  SAnomalyWindowPhysiNode* pAnomaly = (SAnomalyWindowPhysiNode*)makePhysiNode(
12✔
2754
      pCxt, (SLogicNode*)pWindowLogicNode,
2755
      (pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_ANOMALY : QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY));
12!
2756
  if (NULL == pAnomaly) {
12!
2757
    return terrno;
×
2758
  }
2759

2760
  SNodeList* pPrecalcExprs = NULL;
12✔
2761
  SNode*     pAnomalyKey = NULL;
12✔
2762
  int32_t    code = rewritePrecalcExpr(pCxt, pWindowLogicNode->pAnomalyExpr, &pPrecalcExprs, &pAnomalyKey);
12✔
2763

2764
  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
12✔
2765
  // push down expression to pOutputDataBlockDesc of child node
2766
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
12!
2767
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pAnomaly->window.pExprs);
×
2768
    if (TSDB_CODE_SUCCESS == code) {
×
2769
      code = addDataBlockSlots(pCxt, pAnomaly->window.pExprs, pChildTupe);
×
2770
    }
2771
  }
2772

2773
  if (TSDB_CODE_SUCCESS == code) {
12!
2774
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pAnomalyKey, &pAnomaly->pAnomalyKey);
12✔
2775
    // if (TSDB_CODE_SUCCESS == code) {
2776
    //   code = addDataBlockSlot(pCxt, &pAnomaly->pAnomalyKey, pAnomaly->window.node.pOutputDataBlockDesc);
2777
    // }
2778
  }
2779

2780
  tstrncpy(pAnomaly->anomalyOpt, pWindowLogicNode->anomalyOpt, sizeof(pAnomaly->anomalyOpt));
12✔
2781

2782
  if (TSDB_CODE_SUCCESS == code) {
12!
2783
    code = createWindowPhysiNodeFinalize(pCxt, pChildren, &pAnomaly->window, pWindowLogicNode);
12✔
2784
  }
2785

2786
  if (TSDB_CODE_SUCCESS == code) {
12!
2787
    *pPhyNode = (SPhysiNode*)pAnomaly;
12✔
2788
  } else {
2789
    nodesDestroyNode((SNode*)pAnomaly);
×
2790
  }
2791

2792
  nodesDestroyList(pPrecalcExprs);
12✔
2793
  nodesDestroyNode(pAnomalyKey);
12✔
2794

2795
  return code;
12✔
2796
}
2797

2798
static int32_t createWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode,
124,053✔
2799
                                     SPhysiNode** pPhyNode) {
2800
  switch (pWindowLogicNode->winType) {
124,053!
2801
    case WINDOW_TYPE_INTERVAL:
104,657✔
2802
      return createIntervalPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
104,657✔
2803
    case WINDOW_TYPE_SESSION:
8,281✔
2804
      return createSessionWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
8,281✔
2805
    case WINDOW_TYPE_STATE:
7,551✔
2806
      return createStateWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
7,551✔
2807
    case WINDOW_TYPE_EVENT:
1,094✔
2808
      return createEventWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
1,094✔
2809
    case WINDOW_TYPE_COUNT:
2,461✔
2810
      return createCountWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
2,461✔
2811
    case WINDOW_TYPE_ANOMALY:
12✔
2812
      return createAnomalyWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
12✔
2813
    default:
×
2814
      break;
×
2815
  }
2816
  return TSDB_CODE_FAILED;
×
2817
}
2818

2819
static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SSortLogicNode* pSortLogicNode,
243,428✔
2820
                                   SPhysiNode** pPhyNode) {
2821
  SSortPhysiNode* pSort = (SSortPhysiNode*)makePhysiNode(
243,428✔
2822
      pCxt, (SLogicNode*)pSortLogicNode,
2823
      pSortLogicNode->groupSort ? QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT : QUERY_NODE_PHYSICAL_PLAN_SORT);
243,428✔
2824
  if (NULL == pSort) {
243,428!
2825
    return terrno;
×
2826
  }
2827

2828
  SNodeList* pPrecalcExprs = NULL;
243,428✔
2829
  SNodeList* pSortKeys = NULL;
243,428✔
2830
  int32_t    code = rewritePrecalcExprs(pCxt, pSortLogicNode->pSortKeys, &pPrecalcExprs, &pSortKeys);
243,428✔
2831
  pSort->calcGroupId = pSortLogicNode->calcGroupId;
243,428✔
2832
  pSort->excludePkCol = pSortLogicNode->excludePkCol;
243,428✔
2833

2834
  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
243,428✔
2835
  // push down expression to pOutputDataBlockDesc of child node
2836
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
243,428!
2837
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pSort->pExprs);
493✔
2838
    if (TSDB_CODE_SUCCESS == code) {
493!
2839
      code = pushdownDataBlockSlots(pCxt, pSort->pExprs, pChildTupe);
493✔
2840
    }
2841
  }
2842

2843
  if (TSDB_CODE_SUCCESS == code) {
243,428!
2844
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pSortKeys, &pSort->pSortKeys);
243,428✔
2845
  }
2846

2847
  if (TSDB_CODE_SUCCESS == code) {
243,428!
2848
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pSortLogicNode->node.pTargets, &pSort->pTargets);
243,428✔
2849
    if (TSDB_CODE_SUCCESS == code) {
243,428!
2850
      code = addDataBlockSlots(pCxt, pSort->pTargets, pSort->node.pOutputDataBlockDesc);
243,428✔
2851
    }
2852
  }
2853

2854
  if (TSDB_CODE_SUCCESS == code) {
243,428!
2855
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pSortLogicNode, (SPhysiNode*)pSort);
243,428✔
2856
  }
2857

2858
  if (TSDB_CODE_SUCCESS == code) {
243,428!
2859
    *pPhyNode = (SPhysiNode*)pSort;
243,428✔
2860
  } else {
2861
    nodesDestroyNode((SNode*)pSort);
×
2862
  }
2863

2864
  nodesDestroyList(pPrecalcExprs);
243,428✔
2865
  nodesDestroyList(pSortKeys);
243,428✔
2866

2867
  return code;
243,428✔
2868
}
2869

2870
static int32_t createPartitionPhysiNodeImpl(SPhysiPlanContext* pCxt, SNodeList* pChildren,
23,721✔
2871
                                            SPartitionLogicNode* pPartLogicNode, ENodeType type,
2872
                                            SPhysiNode** pPhyNode) {
2873
  SPartitionPhysiNode* pPart = (SPartitionPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pPartLogicNode, type);
23,721✔
2874
  if (NULL == pPart) {
23,721!
2875
    return terrno;
×
2876
  }
2877

2878
  SNodeList* pPrecalcExprs = NULL;
23,721✔
2879
  SNodeList* pPartitionKeys = NULL;
23,721✔
2880
  int32_t    code = rewritePrecalcExprs(pCxt, pPartLogicNode->pPartitionKeys, &pPrecalcExprs, &pPartitionKeys);
23,721✔
2881
  pPart->needBlockOutputTsOrder = pPartLogicNode->needBlockOutputTsOrder;
23,721✔
2882

2883
  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
23,721✔
2884
  // push down expression to pOutputDataBlockDesc of child node
2885
  if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
23,721!
2886
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pPart->pExprs);
1,476✔
2887
    if (TSDB_CODE_SUCCESS == code) {
1,476!
2888
      code = pushdownDataBlockSlots(pCxt, pPart->pExprs, pChildTupe);
1,476✔
2889
    }
2890
  }
2891

2892
  if (TSDB_CODE_SUCCESS == code) {
23,721!
2893
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPartitionKeys, &pPart->pPartitionKeys);
23,721✔
2894
  }
2895

2896
  if (TSDB_CODE_SUCCESS == code) {
23,721!
2897
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPartLogicNode->node.pTargets, &pPart->pTargets);
23,721✔
2898
    if (TSDB_CODE_SUCCESS == code) {
23,721!
2899
      code = addDataBlockSlots(pCxt, pPart->pTargets, pPart->node.pOutputDataBlockDesc);
23,721✔
2900
    }
2901
  }
2902

2903
  if (pPart->needBlockOutputTsOrder) {
23,721✔
2904
    SNode* node;
2905
    bool   found = false;
2,457✔
2906
    FOREACH(node, pPartLogicNode->node.pTargets) {
2,457!
2907
      if (nodeType(node) == QUERY_NODE_COLUMN) {
2,457!
2908
        SColumnNode* pCol = (SColumnNode*)node;
2,457✔
2909
        if (pCol->tableId == pPartLogicNode->pkTsColTbId && pCol->colId == pPartLogicNode->pkTsColId) {
2,457!
2910
          pPart->tsSlotId = pCol->slotId;
2,457✔
2911
          found = true;
2,457✔
2912
          break;
2,457✔
2913
        }
2914
      }
2915
    }
2916
    if (!found) code = TSDB_CODE_PLAN_INTERNAL_ERROR;
2,457!
2917
  }
2918

2919
  if (TSDB_CODE_SUCCESS == code) {
23,721!
2920
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pPartLogicNode, (SPhysiNode*)pPart);
23,721✔
2921
  }
2922

2923
  if (TSDB_CODE_SUCCESS == code) {
23,721!
2924
    *pPhyNode = (SPhysiNode*)pPart;
23,721✔
2925
  } else {
2926
    nodesDestroyNode((SNode*)pPart);
×
2927
  }
2928

2929
  nodesDestroyList(pPrecalcExprs);
23,721✔
2930
  nodesDestroyList(pPartitionKeys);
23,721✔
2931

2932
  return code;
23,721✔
2933
}
2934

2935
static int32_t createStreamPartitionPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
279✔
2936
                                              SPartitionLogicNode* pPartLogicNode, SPhysiNode** pPhyNode) {
2937
  SStreamPartitionPhysiNode* pPart = NULL;
279✔
2938
  int32_t                    code = createPartitionPhysiNodeImpl(pCxt, pChildren, pPartLogicNode,
279✔
2939
                                                                 QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION, (SPhysiNode**)&pPart);
2940
  SDataBlockDescNode*        pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
279✔
2941
  if (TSDB_CODE_SUCCESS == code) {
279!
2942
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPartLogicNode->pTags, &pPart->pTags);
279✔
2943
  }
2944
  if (TSDB_CODE_SUCCESS == code) {
279!
2945
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pPartLogicNode->pSubtable, &pPart->pSubtable);
279✔
2946
  }
2947
  if (TSDB_CODE_SUCCESS == code) {
279!
2948
    *pPhyNode = (SPhysiNode*)pPart;
279✔
2949
  } else {
2950
    nodesDestroyNode((SNode*)pPart);
×
2951
  }
2952
  return code;
279✔
2953
}
2954

2955
static int32_t createPartitionPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
23,721✔
2956
                                        SPartitionLogicNode* pPartLogicNode, SPhysiNode** pPhyNode) {
2957
  if (pCxt->pPlanCxt->streamQuery) {
23,721✔
2958
    return createStreamPartitionPhysiNode(pCxt, pChildren, pPartLogicNode, pPhyNode);
279✔
2959
  }
2960
  return createPartitionPhysiNodeImpl(pCxt, pChildren, pPartLogicNode, QUERY_NODE_PHYSICAL_PLAN_PARTITION, pPhyNode);
23,442✔
2961
}
2962

2963
static int32_t createFillPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SFillLogicNode* pFillNode,
17,853✔
2964
                                   SPhysiNode** pPhyNode) {
2965
  SFillPhysiNode* pFill = (SFillPhysiNode*)makePhysiNode(
17,853✔
2966
      pCxt, (SLogicNode*)pFillNode,
2967
      pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL : QUERY_NODE_PHYSICAL_PLAN_FILL);
17,853✔
2968
  if (NULL == pFill) {
17,854!
2969
    return terrno;
×
2970
  }
2971

2972
  pFill->mode = pFillNode->mode;
17,854✔
2973
  pFill->timeRange = pFillNode->timeRange;
17,854✔
2974
  pFill->node.inputTsOrder = pFillNode->node.inputTsOrder;
17,854✔
2975

2976
  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
17,854✔
2977
  int32_t code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFillNode->pFillExprs, &pFill->pFillExprs);
17,854✔
2978
  if (TSDB_CODE_SUCCESS == code) {
17,854!
2979
    code = addDataBlockSlots(pCxt, pFill->pFillExprs, pFill->node.pOutputDataBlockDesc);
17,854✔
2980
  }
2981
  if (TSDB_CODE_SUCCESS == code) {
17,854!
2982
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFillNode->pNotFillExprs, &pFill->pNotFillExprs);
17,854✔
2983
  }
2984
  if (TSDB_CODE_SUCCESS == code) {
17,854!
2985
    code = addDataBlockSlots(pCxt, pFill->pNotFillExprs, pFill->node.pOutputDataBlockDesc);
17,854✔
2986
  }
2987
  if (TSDB_CODE_SUCCESS == code && LIST_LENGTH(pFillNode->pFillNullExprs) > 0) {
17,853!
2988
    code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFillNode->pFillNullExprs, &pFill->pFillNullExprs);
41✔
2989
    if (TSDB_CODE_SUCCESS == code) {
41!
2990
      code = addDataBlockSlots(pCxt, pFill->pFillNullExprs, pFill->node.pOutputDataBlockDesc);
41✔
2991
    }
2992
  }
2993

2994
  if (TSDB_CODE_SUCCESS == code) {
17,853!
2995
    code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pFillNode->pWStartTs, &pFill->pWStartTs);
17,853✔
2996
  }
2997
  if (TSDB_CODE_SUCCESS == code) {
17,854!
2998
    code = addDataBlockSlot(pCxt, &pFill->pWStartTs, pFill->node.pOutputDataBlockDesc);
17,854✔
2999
  }
3000

3001
  if (TSDB_CODE_SUCCESS == code && NULL != pFillNode->pValues) {
17,854!
3002
    code = nodesCloneNode(pFillNode->pValues, &pFill->pValues);
428✔
3003
  }
3004

3005
  if (TSDB_CODE_SUCCESS == code) {
17,854!
3006
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pFillNode, (SPhysiNode*)pFill);
17,854✔
3007
  }
3008

3009
  if (TSDB_CODE_SUCCESS == code) {
17,854!
3010
    *pPhyNode = (SPhysiNode*)pFill;
17,854✔
3011
  } else {
3012
    nodesDestroyNode((SNode*)pFill);
×
3013
  }
3014

3015
  return code;
17,854✔
3016
}
3017

3018
static int32_t createExchangePhysiNodeByMerge(SMergePhysiNode* pMerge, int32_t idx) {
354,956✔
3019
  SExchangePhysiNode* pExchange = NULL;
354,956✔
3020
  int32_t             code = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, (SNode**)&pExchange);
354,956✔
3021
  if (NULL == pExchange) {
354,956!
3022
    return code;
×
3023
  }
3024
  pExchange->srcStartGroupId = pMerge->srcGroupId + idx;
354,956✔
3025
  pExchange->srcEndGroupId = pMerge->srcGroupId + idx;
354,956✔
3026
  pExchange->singleChannel = true;
354,956✔
3027
  pExchange->node.pParent = (SPhysiNode*)pMerge;
354,956✔
3028
  pExchange->node.pOutputDataBlockDesc = NULL;
354,956✔
3029
  code = nodesCloneNode((SNode*)pMerge->node.pOutputDataBlockDesc, (SNode**)&pExchange->node.pOutputDataBlockDesc);
354,956✔
3030
  if (NULL == pExchange->node.pOutputDataBlockDesc) {
354,955!
3031
    nodesDestroyNode((SNode*)pExchange);
×
3032
    return code;
×
3033
  }
3034
  SNode* pSlot = NULL;
354,955✔
3035
  FOREACH(pSlot, pExchange->node.pOutputDataBlockDesc->pSlots) { ((SSlotDescNode*)pSlot)->output = true; }
2,096,077!
3036
  return nodesListMakeStrictAppend(&pMerge->node.pChildren, (SNode*)pExchange);
354,955✔
3037
}
3038

3039
static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SMergeLogicNode* pMergeLogicNode,
141,412✔
3040
                                    SPhysiNode** pPhyNode) {
3041
  int32_t          code = TSDB_CODE_SUCCESS;
141,412✔
3042
  SMergePhysiNode* pMerge =
3043
      (SMergePhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pMergeLogicNode, QUERY_NODE_PHYSICAL_PLAN_MERGE);
141,412✔
3044
  if (NULL == pMerge) {
141,412!
3045
    return terrno;
×
3046
  }
3047

3048
  if (pMergeLogicNode->colsMerge) {
141,412✔
3049
    pMerge->type = MERGE_TYPE_COLUMNS;
343✔
3050
  } else if (pMergeLogicNode->needSort) {
141,069✔
3051
    pMerge->type = MERGE_TYPE_SORT;
140,936✔
3052
  } else {
3053
    pMerge->type = MERGE_TYPE_NON_SORT;
133✔
3054
  }
3055

3056
  pMerge->numOfChannels = pMergeLogicNode->numOfChannels;
141,412✔
3057
  pMerge->srcGroupId = pMergeLogicNode->srcGroupId;
141,412✔
3058
  pMerge->srcEndGroupId = pMergeLogicNode->srcEndGroupId;
141,412✔
3059
  pMerge->groupSort = pMergeLogicNode->groupSort;
141,412✔
3060
  pMerge->ignoreGroupId = pMergeLogicNode->ignoreGroupId;
141,412✔
3061
  pMerge->inputWithGroupId = pMergeLogicNode->inputWithGroupId;
141,412✔
3062

3063
  if (!pMergeLogicNode->colsMerge) {
141,412✔
3064
    PLAN_ERR_JRET(addDataBlockSlots(pCxt, pMergeLogicNode->pInputs, pMerge->node.pOutputDataBlockDesc));
141,069!
3065

3066
    for (int32_t j = 0; j < pMergeLogicNode->numOfSubplans; ++j) {
282,398✔
3067
      for (int32_t i = 0; i < pMerge->numOfChannels; ++i) {
496,285✔
3068
        PLAN_ERR_JRET(createExchangePhysiNodeByMerge(pMerge, j));
354,956!
3069
      }
3070
    }
3071

3072
    if (NULL != pMergeLogicNode->pMergeKeys) {
141,069✔
3073
      PLAN_ERR_JRET(setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->pMergeKeys,
139,950!
3074
                                  &pMerge->pMergeKeys));
3075
    }
3076

3077
    PLAN_ERR_JRET(setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->node.pTargets,
141,069!
3078
                                &pMerge->pTargets));
3079
    PLAN_ERR_JRET(addDataBlockSlots(pCxt, pMerge->pTargets, pMerge->node.pOutputDataBlockDesc));
141,069!
3080
  } else {
3081
    PLAN_ERR_JRET(setMultiBlockSlotId(pCxt, pChildren, false, pMergeLogicNode->node.pTargets, &pMerge->pTargets));
343!
3082
    PLAN_ERR_JRET(addDataBlockSlots(pCxt, pMerge->pTargets, pMerge->node.pOutputDataBlockDesc));
343!
3083
  }
3084

3085
  *pPhyNode = (SPhysiNode*)pMerge;
141,412✔
3086
  return code;
141,412✔
3087
_return:
×
3088
  nodesDestroyNode((SNode*)pMerge);
×
3089
  return code;
×
3090
}
3091

3092
static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, SSubplan* pSubplan,
5,062,053✔
3093
                                 SNodeList* pChildren, SPhysiNode** pPhyNode) {
3094
  switch (nodeType(pLogicNode)) {
5,062,053!
3095
    case QUERY_NODE_LOGIC_PLAN_SCAN:
1,852,767✔
3096
      return createScanPhysiNode(pCxt, pSubplan, (SScanLogicNode*)pLogicNode, pPhyNode);
1,852,767✔
3097
    case QUERY_NODE_LOGIC_PLAN_JOIN:
145,136✔
3098
      return createJoinPhysiNode(pCxt, pChildren, (SJoinLogicNode*)pLogicNode, pPhyNode);
145,136✔
3099
    case QUERY_NODE_LOGIC_PLAN_AGG:
957,272✔
3100
      return createAggPhysiNode(pCxt, pChildren, (SAggLogicNode*)pLogicNode, pPhyNode, pSubplan);
957,272✔
3101
    case QUERY_NODE_LOGIC_PLAN_PROJECT:
759,334✔
3102
      return createProjectPhysiNode(pCxt, pChildren, (SProjectLogicNode*)pLogicNode, pPhyNode);
759,334✔
3103
    case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
692,775✔
3104
      return createExchangePhysiNode(pCxt, (SExchangeLogicNode*)pLogicNode, pPhyNode);
692,775✔
3105
    case QUERY_NODE_LOGIC_PLAN_WINDOW:
124,053✔
3106
      return createWindowPhysiNode(pCxt, pChildren, (SWindowLogicNode*)pLogicNode, pPhyNode);
124,053✔
3107
    case QUERY_NODE_LOGIC_PLAN_SORT:
243,428✔
3108
      return createSortPhysiNode(pCxt, pChildren, (SSortLogicNode*)pLogicNode, pPhyNode);
243,428✔
3109
    case QUERY_NODE_LOGIC_PLAN_PARTITION:
23,721✔
3110
      return createPartitionPhysiNode(pCxt, pChildren, (SPartitionLogicNode*)pLogicNode, pPhyNode);
23,721✔
3111
    case QUERY_NODE_LOGIC_PLAN_FILL:
17,854✔
3112
      return createFillPhysiNode(pCxt, pChildren, (SFillLogicNode*)pLogicNode, pPhyNode);
17,854✔
3113
    case QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC:
36,173✔
3114
      return createIndefRowsFuncPhysiNode(pCxt, pChildren, (SIndefRowsFuncLogicNode*)pLogicNode, pPhyNode);
36,173✔
3115
    case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC:
14,481✔
3116
      return createInterpFuncPhysiNode(pCxt, pChildren, (SInterpFuncLogicNode*)pLogicNode, pPhyNode);
14,481✔
3117
    case QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC:
41✔
3118
      return createForecastFuncPhysiNode(pCxt, pChildren, (SForecastFuncLogicNode*)pLogicNode, pPhyNode);
41✔
3119
    case QUERY_NODE_LOGIC_PLAN_MERGE:
141,412✔
3120
      return createMergePhysiNode(pCxt, pChildren, (SMergeLogicNode*)pLogicNode, pPhyNode);
141,412✔
3121
    case QUERY_NODE_LOGIC_PLAN_GROUP_CACHE:
18,964✔
3122
      return createGroupCachePhysiNode(pCxt, pChildren, (SGroupCacheLogicNode*)pLogicNode, pPhyNode);
18,964✔
3123
    case QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL:
23,448✔
3124
      return createDynQueryCtrlPhysiNode(pCxt, pChildren, (SDynQueryCtrlLogicNode*)pLogicNode, pPhyNode, pSubplan);
23,448✔
3125
    case QUERY_NODE_LOGIC_PLAN_VIRTUAL_TABLE_SCAN:
11,252✔
3126
      return createVirtualTableScanPhysiNode(pCxt, pSubplan, pChildren, (SVirtualScanLogicNode*)pLogicNode, pPhyNode);
11,252✔
3127
    default:
×
3128
      break;
×
3129
  }
3130

3131
  return TSDB_CODE_FAILED;
×
3132
}
3133

3134
static int32_t createPhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, SSubplan* pSubplan,
5,062,086✔
3135
                               SPhysiNode** pPhyNode) {
3136
  SNodeList* pChildren = NULL;
5,062,086✔
3137
  int32_t    code = nodesMakeList(&pChildren);
5,062,086✔
3138
  if (NULL == pChildren) {
5,062,148!
3139
    return code;
×
3140
  }
3141

3142
  SNode* pLogicChild;
3143
  FOREACH(pLogicChild, pLogicNode->pChildren) {
7,615,389✔
3144
    SPhysiNode* pChild = NULL;
2,553,233✔
3145
    code = createPhysiNode(pCxt, (SLogicNode*)pLogicChild, pSubplan, &pChild);
2,553,233✔
3146
    if (TSDB_CODE_SUCCESS == code) {
2,553,215!
3147
      code = nodesListStrictAppend(pChildren, (SNode*)pChild);
2,553,215✔
3148
    }
3149
    if (TSDB_CODE_SUCCESS != code) {
2,553,243✔
3150
      break;
2✔
3151
    }
3152
  }
3153

3154
  if (TSDB_CODE_SUCCESS == code) {
5,062,158✔
3155
    code = doCreatePhysiNode(pCxt, pLogicNode, pSubplan, pChildren, pPhyNode);
5,062,153✔
3156
  }
3157

3158
  if (TSDB_CODE_SUCCESS == code) {
5,062,027✔
3159
    if (LIST_LENGTH(pChildren) > 0) {
7,434,613!
3160
      (*pPhyNode)->pChildren = pChildren;
2,372,590✔
3161
      SNode* pChild;
3162
      FOREACH(pChild, (*pPhyNode)->pChildren) { ((SPhysiNode*)pChild)->pParent = (*pPhyNode); }
4,925,798✔
3163
    } else {
3164
      nodesDestroyList(pChildren);
2,689,433✔
3165
    }
3166
  } else {
3167
    nodesDestroyList(pChildren);
4✔
3168
  }
3169

3170
  return code;
5,062,116✔
3171
}
3172

3173
static int32_t createDataInserter(SPhysiPlanContext* pCxt, SVgDataBlocks* pBlocks, SDataSinkNode** pSink) {
9,880,096✔
3174
  SDataInserterNode* pInserter = NULL;
9,880,096✔
3175
  int32_t            code = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_INSERT, (SNode**)&pInserter);
9,880,096✔
3176
  if (NULL == pInserter) {
9,905,636!
3177
    return code;
×
3178
  }
3179

3180
  pInserter->numOfTables = pBlocks->numOfTables;
9,905,636✔
3181
  pInserter->size = pBlocks->size;
9,905,636✔
3182
  pInserter->pData = pBlocks->pData;
9,905,636✔
3183
  pBlocks->pData = NULL;
9,905,636✔
3184

3185
  *pSink = (SDataSinkNode*)pInserter;
9,905,636✔
3186
  return TSDB_CODE_SUCCESS;
9,905,636✔
3187
}
3188

3189
static int32_t createDataDispatcher(SPhysiPlanContext* pCxt, const SPhysiNode* pRoot, SDataSinkNode** pSink) {
2,451,021✔
3190
  SDataDispatcherNode* pDispatcher = NULL;
2,451,021✔
3191
  int32_t              code = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_DISPATCH, (SNode**)&pDispatcher);
2,451,021✔
3192
  if (NULL == pDispatcher) {
2,451,063!
3193
    return code;
×
3194
  }
3195

3196
  pDispatcher->sink.pInputDataBlockDesc = NULL;
2,451,063✔
3197
  code = nodesCloneNode((SNode*)pRoot->pOutputDataBlockDesc, (SNode**)&pDispatcher->sink.pInputDataBlockDesc);
2,451,063✔
3198
  if (NULL == pDispatcher->sink.pInputDataBlockDesc) {
2,451,085!
3199
    nodesDestroyNode((SNode*)pDispatcher);
×
3200
    return code;
×
3201
  }
3202

3203
  *pSink = (SDataSinkNode*)pDispatcher;
2,451,085✔
3204
  return TSDB_CODE_SUCCESS;
2,451,085✔
3205
}
3206

3207
static int32_t makeSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SSubplan** ppSubplan) {
12,387,607✔
3208
  SSubplan* pSubplan = NULL;
12,387,607✔
3209
  int32_t   code = nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN, (SNode**)&pSubplan);
12,387,607✔
3210
  if (NULL == pSubplan) {
12,423,008!
3211
    return code;
×
3212
  }
3213
  pSubplan->id = pLogicSubplan->id;
12,423,008✔
3214
  pSubplan->subplanType = pLogicSubplan->subplanType;
12,423,008✔
3215
  pSubplan->level = pLogicSubplan->level;
12,423,008✔
3216
  pSubplan->rowsThreshold = 4096;
12,423,008✔
3217
  pSubplan->dynamicRowThreshold = false;
12,423,008✔
3218
  pSubplan->isView = pCxt->pPlanCxt->isView;
12,423,008✔
3219
  pSubplan->isAudit = pCxt->pPlanCxt->isAudit;
12,423,008✔
3220
  pSubplan->processOneBlock = pLogicSubplan->processOneBlock;
12,423,008✔
3221
  if (NULL != pCxt->pPlanCxt->pUser) {
12,423,008!
3222
    snprintf(pSubplan->user, sizeof(pSubplan->user), "%s", pCxt->pPlanCxt->pUser);
12,428,855✔
3223
  }
3224
  *ppSubplan = pSubplan;
12,423,008✔
3225
  return code;
12,423,008✔
3226
}
3227

3228
static int32_t buildInsertValuesSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
9,894,392✔
3229
  pSubplan->msgType = pModify->msgType;
9,894,392✔
3230
  pSubplan->execNode.nodeId = pModify->pVgDataBlocks->vg.vgId;
9,894,392✔
3231
  pSubplan->execNode.epSet = pModify->pVgDataBlocks->vg.epSet;
9,894,392✔
3232
  return createDataInserter(pCxt, pModify->pVgDataBlocks, &pSubplan->pDataSink);
9,894,392✔
3233
}
3234

3235
static int32_t createQueryInserter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan,
137✔
3236
                                   SDataSinkNode** pSink) {
3237
  SQueryInserterNode* pInserter = NULL;
137✔
3238
  int32_t             code = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT, (SNode**)&pInserter);
137✔
3239
  if (NULL == pInserter) {
137!
3240
    return code;
×
3241
  }
3242

3243
  pInserter->tableId = pModify->tableId;
137✔
3244
  pInserter->stableId = pModify->stableId;
137✔
3245
  pInserter->tableType = pModify->tableType;
137✔
3246
  tstrncpy(pInserter->tableName, pModify->tableName, TSDB_TABLE_NAME_LEN);
137✔
3247
  pInserter->explain = (QUERY_NODE_EXPLAIN_STMT == nodeType(pCxt->pPlanCxt->pAstRoot) ? true : false);
137✔
3248
  if (pModify->pVgroupList) {
137!
3249
    pInserter->vgId = pModify->pVgroupList->vgroups[0].vgId;
137✔
3250
    pInserter->epSet = pModify->pVgroupList->vgroups[0].epSet;
137✔
3251
    vgroupInfoToNodeAddr(pModify->pVgroupList->vgroups, &pSubplan->execNode);
137✔
3252
  }
3253
  code = setListSlotId(pCxt, pSubplan->pNode->pOutputDataBlockDesc->dataBlockId, -1, pModify->pInsertCols,
137✔
3254
                       &pInserter->pCols);
137✔
3255
  if (TSDB_CODE_SUCCESS == code) {
137!
3256
    pInserter->sink.pInputDataBlockDesc = NULL;
137✔
3257
    code = nodesCloneNode((SNode*)pSubplan->pNode->pOutputDataBlockDesc, (SNode**)&pInserter->sink.pInputDataBlockDesc);
137✔
3258
  }
3259

3260
  if (TSDB_CODE_SUCCESS == code) {
137!
3261
    *pSink = (SDataSinkNode*)pInserter;
137✔
3262
  } else {
3263
    nodesDestroyNode((SNode*)pInserter);
×
3264
  }
3265

3266
  return code;
137✔
3267
}
3268

3269
static int32_t buildInsertSelectSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
137✔
3270
  int32_t code =
3271
      createPhysiNode(pCxt, (SLogicNode*)nodesListGetNode(pModify->node.pChildren, 0), pSubplan, &pSubplan->pNode);
137✔
3272
  if (TSDB_CODE_SUCCESS == code) {
137!
3273
    code = createQueryInserter(pCxt, pModify, pSubplan, &pSubplan->pDataSink);
137✔
3274
  }
3275
  pSubplan->msgType = TDMT_SCH_MERGE_QUERY;
137✔
3276
  return code;
137✔
3277
}
3278

3279
static int32_t buildInsertSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
9,880,746✔
3280
  if (NULL == pModify->node.pChildren) {
9,880,746!
3281
    return buildInsertValuesSubplan(pCxt, pModify, pSubplan);
9,895,973✔
3282
  }
UNCOV
3283
  return buildInsertSelectSubplan(pCxt, pModify, pSubplan);
×
3284
}
3285

3286
static int32_t createDataDeleter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, const SPhysiNode* pRoot,
55,317✔
3287
                                 SDataSinkNode** pSink) {
3288
  SDataDeleterNode* pDeleter = NULL;
55,317✔
3289
  int32_t           code = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_DELETE, (SNode**)&pDeleter);
55,317✔
3290
  if (NULL == pDeleter) {
55,332!
3291
    return code;
×
3292
  }
3293

3294
  pDeleter->tableId = pModify->tableId;
55,332✔
3295
  pDeleter->tableType = pModify->tableType;
55,332✔
3296
  tstrncpy(pDeleter->tableFName, pModify->tableName, TSDB_TABLE_NAME_LEN);
55,332✔
3297
  tstrncpy(pDeleter->tsColName, pModify->tsColName, TSDB_COL_NAME_LEN);
55,332✔
3298
  pDeleter->deleteTimeRange = pModify->deleteTimeRange;
55,332✔
3299

3300
  code = setNodeSlotId(pCxt, pRoot->pOutputDataBlockDesc->dataBlockId, -1, pModify->pAffectedRows,
55,332✔
3301
                       &pDeleter->pAffectedRows);
55,332✔
3302
  if (TSDB_CODE_SUCCESS == code) {
55,333!
3303
    code = setNodeSlotId(pCxt, pRoot->pOutputDataBlockDesc->dataBlockId, -1, pModify->pStartTs, &pDeleter->pStartTs);
55,333✔
3304
  }
3305
  if (TSDB_CODE_SUCCESS == code) {
55,334!
3306
    code = setNodeSlotId(pCxt, pRoot->pOutputDataBlockDesc->dataBlockId, -1, pModify->pEndTs, &pDeleter->pEndTs);
55,334✔
3307
  }
3308
  if (TSDB_CODE_SUCCESS == code) {
55,334!
3309
    pDeleter->sink.pInputDataBlockDesc = NULL;
55,334✔
3310
    code = nodesCloneNode((SNode*)pRoot->pOutputDataBlockDesc, (SNode**)&pDeleter->sink.pInputDataBlockDesc);
55,334✔
3311
  }
3312

3313
  if (TSDB_CODE_SUCCESS == code) {
55,332!
3314
    *pSink = (SDataSinkNode*)pDeleter;
55,332✔
3315
  } else {
3316
    nodesDestroyNode((SNode*)pDeleter);
×
3317
  }
3318

3319
  return TSDB_CODE_SUCCESS;
55,332✔
3320
}
3321

3322
static int32_t buildDeleteSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
55,320✔
3323
  int32_t code =
3324
      createPhysiNode(pCxt, (SLogicNode*)nodesListGetNode(pModify->node.pChildren, 0), pSubplan, &pSubplan->pNode);
55,320✔
3325
  if (TSDB_CODE_SUCCESS == code) {
55,319!
3326
    code = createDataDeleter(pCxt, pModify, pSubplan->pNode, &pSubplan->pDataSink);
55,321✔
3327
  }
3328
  pSubplan->msgType = TDMT_VND_DELETE;
55,332✔
3329
  return code;
55,332✔
3330
}
3331

3332
static int32_t buildVnodeModifySubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SSubplan* pSubplan) {
9,927,347✔
3333
  int32_t                code = TSDB_CODE_SUCCESS;
9,927,347✔
3334
  SVnodeModifyLogicNode* pModify = (SVnodeModifyLogicNode*)pLogicSubplan->pNode;
9,927,347✔
3335
  switch (pModify->modifyType) {
9,927,347!
3336
    case MODIFY_TABLE_TYPE_INSERT:
9,904,608✔
3337
      code = buildInsertSubplan(pCxt, pModify, pSubplan);
9,904,608✔
3338
      break;
9,917,150✔
3339
    case MODIFY_TABLE_TYPE_DELETE:
55,330✔
3340
      code = buildDeleteSubplan(pCxt, pModify, pSubplan);
55,330✔
3341
      break;
55,329✔
3342
    default:
×
3343
      code = TSDB_CODE_FAILED;
×
3344
      break;
×
3345
  }
3346
  return code;
9,939,888✔
3347
}
3348

3349
static int32_t createPhysiSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SSubplan** pPhysiSubplan) {
12,391,343✔
3350
  SSubplan* pSubplan = NULL;
12,391,343✔
3351
  int32_t   code = makeSubplan(pCxt, pLogicSubplan, &pSubplan);
12,391,343✔
3352
  if (NULL == pSubplan) {
12,435,927!
3353
    return code;
×
3354
  }
3355

3356
  if (SUBPLAN_TYPE_MODIFY == pLogicSubplan->subplanType) {
12,435,927✔
3357
    code = buildVnodeModifySubplan(pCxt, pLogicSubplan, pSubplan);
9,989,541✔
3358
  } else {
3359
    if (SUBPLAN_TYPE_SCAN == pSubplan->subplanType) {
2,446,386✔
3360
      pSubplan->msgType = TDMT_SCH_QUERY;
1,797,366✔
3361
    } else {
3362
      pSubplan->msgType = TDMT_SCH_MERGE_QUERY;
649,020✔
3363
    }
3364
    code = createPhysiNode(pCxt, pLogicSubplan->pNode, pSubplan, &pSubplan->pNode);
2,446,386✔
3365
    if (TSDB_CODE_SUCCESS == code && !pCxt->pPlanCxt->streamQuery && !pCxt->pPlanCxt->topicQuery) {
2,453,422!
3366
      code = createDataDispatcher(pCxt, pSubplan->pNode, &pSubplan->pDataSink);
2,451,065✔
3367
    }
3368
  }
3369

3370
  if (TSDB_CODE_SUCCESS == code) {
12,393,750✔
3371
    *pPhysiSubplan = pSubplan;
12,393,748✔
3372
  } else {
3373
    nodesDestroyNode((SNode*)pSubplan);
2✔
3374
  }
3375

3376
  return code;
12,401,787✔
3377
}
3378

3379
static int32_t makeQueryPhysiPlan(SPhysiPlanContext* pCxt, SQueryPlan** ppQueryPlan) {
10,792,438✔
3380
  SQueryPlan* pPlan = NULL;
10,792,438✔
3381
  int32_t     code = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN, (SNode**)&pPlan);
10,792,438✔
3382
  if (NULL == pPlan) {
10,801,139!
3383
    return code;
×
3384
  }
3385
  pPlan->pSubplans = NULL;
10,801,139✔
3386
  code = nodesMakeList(&pPlan->pSubplans);
10,801,139✔
3387
  if (NULL == pPlan->pSubplans) {
10,810,956!
UNCOV
3388
    nodesDestroyNode((SNode*)pPlan);
×
3389
    return code;
×
3390
  }
3391
  pPlan->queryId = pCxt->pPlanCxt->queryId;
10,812,044✔
3392
  *ppQueryPlan = pPlan;
10,812,044✔
3393
  return code;
10,812,044✔
3394
}
3395

3396
static int32_t pushSubplan(SPhysiPlanContext* pCxt, SNode* pSubplan, int32_t level, SNodeList* pSubplans) {
12,382,772✔
3397
  SNodeListNode* pGroup = NULL;
12,382,772✔
3398
  if (level >= LIST_LENGTH(pSubplans)) {
12,382,772!
3399
    pGroup = NULL;
11,379,434✔
3400
    int32_t code = nodesMakeNode(QUERY_NODE_NODE_LIST, (SNode**)&pGroup);
11,379,434✔
3401
    if (NULL == pGroup) {
11,388,671!
3402
      return code;
×
3403
    }
3404
    if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pSubplans, (SNode*)pGroup)) {
11,388,671!
3405
      return TSDB_CODE_OUT_OF_MEMORY;
×
3406
    }
3407
  } else {
3408
    pGroup = (SNodeListNode*)nodesListGetNode(pSubplans, level);
1,003,338✔
3409
  }
3410
  if (NULL == pGroup->pNodeList) {
12,436,175✔
3411
    int32_t code = nodesMakeList(&pGroup->pNodeList);
11,404,314✔
3412
    if (NULL == pGroup->pNodeList) {
11,403,300!
3413
      return code;
×
3414
    }
3415
  }
3416
  return nodesListAppend(pGroup->pNodeList, (SNode*)pSubplan);
12,435,161✔
3417
}
3418

3419
static int32_t buildPhysiPlan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SSubplan* pParent,
12,402,323✔
3420
                              SQueryPlan* pQueryPlan) {
3421
  SSubplan* pSubplan = NULL;
12,402,323✔
3422
  int32_t   code = createPhysiSubplan(pCxt, pLogicSubplan, &pSubplan);
12,402,323✔
3423

3424
  if (TSDB_CODE_SUCCESS == code) {
12,405,302✔
3425
    code = pushSubplan(pCxt, (SNode*)pSubplan, pLogicSubplan->level, pQueryPlan->pSubplans);
12,403,304✔
3426
    ++(pQueryPlan->numOfSubplans);
12,438,792✔
3427
  }
3428

3429
  if (TSDB_CODE_SUCCESS != code) {
12,440,790✔
3430
    nodesDestroyNode((SNode*)pSubplan);
2✔
3431
    return code;
2✔
3432
  }
3433

3434
  if (TSDB_CODE_SUCCESS == code && NULL != pParent) {
12,440,788!
3435
    code = nodesListMakeAppend(&pParent->pChildren, (SNode*)pSubplan);
1,563,181✔
3436
    if (TSDB_CODE_SUCCESS == code) {
1,563,177!
3437
      code = nodesListMakeAppend(&pSubplan->pParents, (SNode*)pParent);
1,563,178✔
3438
    }
3439
  }
3440

3441
  if (TSDB_CODE_SUCCESS == code) {
12,426,748✔
3442
    SNode* pChild = NULL;
12,426,362✔
3443
    FOREACH(pChild, pLogicSubplan->pChildren) {
13,989,539✔
3444
      code = buildPhysiPlan(pCxt, (SLogicSubplan*)pChild, pSubplan, pQueryPlan);
1,563,181✔
3445
      if (TSDB_CODE_SUCCESS != code) {
1,563,177!
3446
        break;
×
3447
      }
3448
    }
3449
  }
3450

3451
  return code;
12,426,744✔
3452
}
3453

3454
static int32_t doCreatePhysiPlan(SPhysiPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPhysiPlan) {
10,795,281✔
3455
  SQueryPlan* pPlan = NULL;
10,795,281✔
3456
  int32_t     code = makeQueryPhysiPlan(pCxt, &pPlan);
10,795,281✔
3457
  if (NULL == pPlan) {
10,811,410!
3458
    return code;
×
3459
  }
3460

3461
  SNode* pSubplan = NULL;
10,811,410✔
3462
  FOREACH(pSubplan, pLogicPlan->pTopSubplans) {
21,673,004!
3463
    code = buildPhysiPlan(pCxt, (SLogicSubplan*)pSubplan, NULL, pPlan);
10,862,040✔
3464
    if (TSDB_CODE_SUCCESS != code) {
10,861,596✔
3465
      break;
2✔
3466
    }
3467
  }
3468

3469
  if (TSDB_CODE_SUCCESS == code) {
10,810,966✔
3470
    *pPhysiPlan = pPlan;
10,806,086✔
3471
  } else {
3472
    nodesDestroyNode((SNode*)pPlan);
4,880✔
3473
  }
3474

3475
  return code;
10,810,302✔
3476
}
3477

3478
static void destoryLocationHash(void* p) {
10,124,312✔
3479
  SHashObj*   pHash = *(SHashObj**)p;
10,124,312✔
3480
  SSlotIndex* pIndex = taosHashIterate(pHash, NULL);
10,124,312✔
3481
  while (NULL != pIndex) {
34,446,610✔
3482
    taosArrayDestroy(pIndex->pSlotIdsInfo);
24,322,268✔
3483
    pIndex = taosHashIterate(pHash, pIndex);
24,322,258✔
3484
  }
3485
  taosHashCleanup(pHash);
10,124,342✔
3486
}
10,124,371✔
3487

3488
static void destoryPhysiPlanContext(SPhysiPlanContext* pCxt) {
10,755,816✔
3489
  taosArrayDestroyEx(pCxt->pLocationHelper, destoryLocationHash);
10,755,816✔
3490
  taosArrayDestroyEx(pCxt->pProjIdxLocHelper, destoryLocationHash);
10,808,563✔
3491
}
10,814,092✔
3492

3493
static void setExplainInfo(SPlanContext* pCxt, SQueryPlan* pPlan) {
10,772,766✔
3494
  if (QUERY_NODE_EXPLAIN_STMT == nodeType(pCxt->pAstRoot)) {
10,772,766✔
3495
    SExplainStmt* pStmt = (SExplainStmt*)pCxt->pAstRoot;
125,442✔
3496
    pPlan->explainInfo.mode = pStmt->analyze ? EXPLAIN_MODE_ANALYZE : EXPLAIN_MODE_STATIC;
125,442✔
3497
    pPlan->explainInfo.verbose = pStmt->pOptions->verbose;
125,442✔
3498
    pPlan->explainInfo.ratio = pStmt->pOptions->ratio;
125,442✔
3499
  } else {
3500
    pPlan->explainInfo.mode = EXPLAIN_MODE_DISABLE;
10,647,324✔
3501
  }
3502
}
10,772,766✔
3503

3504
static int32_t setExecNodeList(SPhysiPlanContext* pCxt, SArray* pExecNodeList) {
10,781,221✔
3505
  int32_t code = 0;
10,781,221✔
3506
  if (NULL == pExecNodeList) {
10,781,221✔
3507
    return code;
2,023✔
3508
  }
3509
  if (pCxt->hasSysScan || !pCxt->hasScan) {
10,779,198✔
3510
    SQueryNodeLoad node = {.addr = {.nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet}, .load = 0};
9,859,710✔
3511
    if (NULL == taosArrayPush(pExecNodeList, &node)) code = terrno;
9,841,454!
3512
  }
3513
  return code;
10,760,942✔
3514
}
3515

3516
int32_t createPhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList) {
10,759,359✔
3517
  SPhysiPlanContext cxt = {.pPlanCxt = pCxt,
32,371,906✔
3518
                           .errCode = TSDB_CODE_SUCCESS,
3519
                           .nextDataBlockId = 0,
3520
                           .pLocationHelper = taosArrayInit(32, POINTER_BYTES),
10,759,359✔
3521
                           .pProjIdxLocHelper = taosArrayInit(32, POINTER_BYTES),
10,801,711✔
3522
                           .hasScan = false,
3523
                           .hasSysScan = false};
3524
  if (NULL == cxt.pLocationHelper || !cxt.pProjIdxLocHelper) {
10,810,836!
3525
    taosArrayDestroy(cxt.pLocationHelper);
11,676✔
3526
    taosArrayDestroy(cxt.pProjIdxLocHelper);
×
3527
    return terrno;
×
3528
  }
3529

3530
  int32_t code = doCreatePhysiPlan(&cxt, pLogicPlan, pPlan);
10,799,160✔
3531
  if (TSDB_CODE_SUCCESS == code) {
10,803,775!
3532
    setExplainInfo(pCxt, *pPlan);
10,806,832✔
3533
    code = setExecNodeList(&cxt, pExecNodeList);
10,784,792✔
3534
  }
3535

3536
  destoryPhysiPlanContext(&cxt);
10,766,541✔
3537
  return code;
10,812,921✔
3538
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc