• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4932

19 Jan 2026 12:29PM UTC coverage: 66.646% (-0.1%) from 66.749%
#4932

push

travis-ci

web-flow
chore: upgrade taospy (#34272)

202981 of 304565 relevant lines covered (66.65%)

126831443.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.75
/source/libs/executor/src/dynqueryctrloperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "nodes.h"
18
#include "operator.h"
19
#include "os.h"
20
#include "plannodes.h"
21
#include "query.h"
22
#include "querynodes.h"
23
#include "querytask.h"
24
#include "tarray.h"
25
#include "tcompare.h"
26
#include "tdatablock.h"
27
#include "thash.h"
28
#include "tmsg.h"
29
#include "trpc.h"
30
#include "ttypes.h"
31
#include "tdataformat.h"
32
#include "dynqueryctrl.h"
33

34
int64_t gSessionId = 0;
35

36
void freeVgTableList(void* ptr) { 
×
37
  taosArrayDestroy(*(SArray**)ptr); 
×
38
}
×
39

40
static void destroyStbJoinTableList(SStbJoinTableList* pListHead) {
1,151,338✔
41
  SStbJoinTableList* pNext = NULL;
1,151,338✔
42
  
43
  while (pListHead) {
1,151,832✔
44
    taosMemoryFree(pListHead->pLeftVg);
494✔
45
    taosMemoryFree(pListHead->pLeftUid);
494✔
46
    taosMemoryFree(pListHead->pRightVg);
494✔
47
    taosMemoryFree(pListHead->pRightUid);
494✔
48
    pNext = pListHead->pNext;
494✔
49
    taosMemoryFree(pListHead);
494✔
50
    pListHead = pNext;
494✔
51
  }
52
}
1,151,338✔
53

54
static void destroyStbJoinDynCtrlInfo(SStbJoinDynCtrlInfo* pStbJoin) {
1,151,338✔
55
  qDebug("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64 ", leftCacheNum:%" PRId64 ", rightCacheNum:%" PRId64, 
1,151,338✔
56
         pStbJoin->execInfo.prevBlkNum, pStbJoin->execInfo.prevBlkRows, pStbJoin->execInfo.postBlkNum, 
57
         pStbJoin->execInfo.postBlkRows, pStbJoin->execInfo.leftCacheNum, pStbJoin->execInfo.rightCacheNum);
58

59
  if (pStbJoin->basic.batchFetch) {
1,151,338✔
60
    if (pStbJoin->ctx.prev.leftHash) {
1,150,252✔
61
      tSimpleHashSetFreeFp(pStbJoin->ctx.prev.leftHash, freeVgTableList);
1,080,025✔
62
      tSimpleHashCleanup(pStbJoin->ctx.prev.leftHash);
1,080,025✔
63
    }
64
    if (pStbJoin->ctx.prev.rightHash) {
1,150,252✔
65
      tSimpleHashSetFreeFp(pStbJoin->ctx.prev.rightHash, freeVgTableList);
1,080,025✔
66
      tSimpleHashCleanup(pStbJoin->ctx.prev.rightHash);
1,080,025✔
67
    }
68
  } else {
69
    if (pStbJoin->ctx.prev.leftCache) {
1,086✔
70
      tSimpleHashCleanup(pStbJoin->ctx.prev.leftCache);
1,086✔
71
    }
72
    if (pStbJoin->ctx.prev.rightCache) {
1,086✔
73
      tSimpleHashCleanup(pStbJoin->ctx.prev.rightCache);
1,086✔
74
    }
75
    if (pStbJoin->ctx.prev.onceTable) {
1,086✔
76
      tSimpleHashCleanup(pStbJoin->ctx.prev.onceTable);
1,086✔
77
    }
78
  }
79

80
  destroyStbJoinTableList(pStbJoin->ctx.prev.pListHead);
1,151,338✔
81
}
1,151,338✔
82

83
void destroyColRefInfo(void *info) {
80,269,073✔
84
  SColRefInfo *pColRefInfo = (SColRefInfo *)info;
80,269,073✔
85
  if (pColRefInfo) {
80,269,073✔
86
    taosMemoryFree(pColRefInfo->colName);
80,269,073✔
87
    taosMemoryFree(pColRefInfo->colrefName);
80,269,073✔
88
  }
89
}
80,269,073✔
90

91
void destroyColRefArray(void *info) {
4,989,072✔
92
  SArray *pColRefArray = *(SArray **)info;
4,989,072✔
93
  if (pColRefArray) {
4,989,072✔
94
    taosArrayDestroyEx(pColRefArray, destroyColRefInfo);
4,989,072✔
95
  }
96
}
4,989,072✔
97

98
void freeUseDbOutput(void* pOutput) {
1,450,471✔
99
  SUseDbOutput *pOut = *(SUseDbOutput**)pOutput;
1,450,471✔
100
  if (NULL == pOutput) {
1,450,471✔
101
    return;
×
102
  }
103

104
  if (pOut->dbVgroup) {
1,450,471✔
105
    freeVgInfo(pOut->dbVgroup);
1,450,471✔
106
  }
107
  taosMemFree(pOut);
1,450,471✔
108
}
109

110
void destroyOtbInfoArray(void *info) {
434,932✔
111
  SArray *pOtbInfoArray = *(SArray **)info;
434,932✔
112
  if (pOtbInfoArray) {
434,932✔
113
    taosArrayDestroyEx(pOtbInfoArray, destroySOrgTbInfo);
434,932✔
114
  }
115
}
434,932✔
116

117
void destroyOtbVgIdToOtbInfoArrayMap(void *info) {
36,672✔
118
  SHashObj* pOtbVgIdToOtbInfoArrayMap = *(SHashObj **)info;
36,672✔
119
  if (pOtbVgIdToOtbInfoArrayMap) {
36,672✔
120
    taosHashSetFreeFp(pOtbVgIdToOtbInfoArrayMap, destroyOtbInfoArray);
36,672✔
121
    taosHashCleanup(pOtbVgIdToOtbInfoArrayMap);
36,672✔
122
  }
123
}
36,672✔
124

125
void destroyTagList(void *info) {
18,224✔
126
  SArray *pTagList = *(SArray **)info;
18,224✔
127
  if (pTagList) {
18,224✔
128
    taosArrayDestroyEx(pTagList, destroyTagVal);
18,224✔
129
  }
130
}
18,224✔
131

132
void destroyVtbUidTagListMap(void *info) {
×
133
  SHashObj* pVtbUidTagListMap = *(SHashObj **)info;
×
134
  if (pVtbUidTagListMap) {
×
135
    taosHashSetFreeFp(pVtbUidTagListMap, destroyTagList);
×
136
    taosHashCleanup(pVtbUidTagListMap);
×
137
  }
138
}
×
139

140
static void destroyVtbScanDynCtrlInfo(SVtbScanDynCtrlInfo* pVtbScan) {
1,861,286✔
141
  if (pVtbScan->dbName) {
1,861,286✔
142
    taosMemoryFreeClear(pVtbScan->dbName);
1,861,286✔
143
  }
144
  if (pVtbScan->tbName) {
1,861,286✔
145
    taosMemoryFreeClear(pVtbScan->tbName);
1,861,286✔
146
  }
147
  if (pVtbScan->childTableList) {
1,861,286✔
148
    taosArrayDestroyEx(pVtbScan->childTableList, destroyColRefArray);
1,861,286✔
149
  }
150
  if (pVtbScan->colRefInfo) {
1,861,286✔
151
    taosArrayDestroyEx(pVtbScan->colRefInfo, destroyColRefInfo);
×
152
    pVtbScan->colRefInfo = NULL;
×
153
  }
154
  if (pVtbScan->childTableMap) {
1,861,286✔
155
    taosHashCleanup(pVtbScan->childTableMap);
1,541,892✔
156
  }
157
  if (pVtbScan->readColList) {
1,861,286✔
158
    taosArrayDestroy(pVtbScan->readColList);
1,861,286✔
159
  }
160
  if (pVtbScan->dbVgInfoMap) {
1,861,286✔
161
    taosHashSetFreeFp(pVtbScan->dbVgInfoMap, freeUseDbOutput);
1,861,286✔
162
    taosHashCleanup(pVtbScan->dbVgInfoMap);
1,861,286✔
163
  }
164
  if (pVtbScan->otbNameToOtbInfoMap) {
1,861,286✔
165
    taosHashSetFreeFp(pVtbScan->otbNameToOtbInfoMap, destroySOrgTbInfo);
203,213✔
166
    taosHashCleanup(pVtbScan->otbNameToOtbInfoMap);
203,213✔
167
  }
168
  if (pVtbScan->pRsp) {
1,861,286✔
169
    tFreeSUsedbRsp(pVtbScan->pRsp);
×
170
    taosMemoryFreeClear(pVtbScan->pRsp);
×
171
  }
172
  if (pVtbScan->existOrgTbVg) {
1,861,286✔
173
    taosHashCleanup(pVtbScan->existOrgTbVg);
1,861,286✔
174
  }
175
  if (pVtbScan->curOrgTbVg) {
1,861,286✔
176
    taosHashCleanup(pVtbScan->curOrgTbVg);
1,508✔
177
  }
178
  if (pVtbScan->newAddedVgInfo) {
1,861,286✔
179
    taosHashCleanup(pVtbScan->newAddedVgInfo);
860✔
180
  }
181
  if (pVtbScan->otbVgIdToOtbInfoArrayMap) {
1,861,286✔
182
    taosHashSetFreeFp(pVtbScan->otbVgIdToOtbInfoArrayMap, destroyOtbInfoArray);
193,421✔
183
    taosHashCleanup(pVtbScan->otbVgIdToOtbInfoArrayMap);
193,421✔
184
  }
185
  if (pVtbScan->vtbUidToVgIdMapMap) {
1,861,286✔
186
    taosHashSetFreeFp(pVtbScan->vtbUidToVgIdMapMap, destroyOtbVgIdToOtbInfoArrayMap);
4,556✔
187
    taosHashCleanup(pVtbScan->vtbUidToVgIdMapMap);
4,556✔
188
  }
189
  if (pVtbScan->vtbGroupIdToVgIdMapMap) {
1,861,286✔
190
    taosHashSetFreeFp(pVtbScan->vtbGroupIdToVgIdMapMap, destroyOtbVgIdToOtbInfoArrayMap);
4,612✔
191
    taosHashCleanup(pVtbScan->vtbGroupIdToVgIdMapMap);
4,612✔
192
  }
193
  if (pVtbScan->vtbUidTagListMap) {
1,861,286✔
194
    taosHashSetFreeFp(pVtbScan->vtbUidTagListMap, destroyTagList);
4,556✔
195
    taosHashCleanup(pVtbScan->vtbUidTagListMap);
4,556✔
196
  }
197
  if (pVtbScan->vtbGroupIdTagListMap) {
1,861,286✔
198
    taosHashCleanup(pVtbScan->vtbGroupIdTagListMap);
4,612✔
199
  }
200
  if (pVtbScan->vtbUidToGroupIdMap) {
1,861,286✔
201
    taosHashCleanup(pVtbScan->vtbUidToGroupIdMap);
4,612✔
202
  }
203
}
1,861,286✔
204

205
void destroyWinArray(void *info) {
1,292,074✔
206
  SArray *pWinArray = *(SArray **)info;
1,292,074✔
207
  if (pWinArray) {
1,292,074✔
208
    taosArrayDestroy(pWinArray);
1,292,074✔
209
  }
210
}
1,292,074✔
211

212
static void destroyVtbWindowDynCtrlInfo(SVtbWindowDynCtrlInfo* pVtbWindow) {
465,145✔
213
  if (pVtbWindow->pRes) {
465,145✔
214
    blockDataDestroy(pVtbWindow->pRes);
465,145✔
215
  }
216
  if (pVtbWindow->pWins) {
465,145✔
217
    taosArrayDestroyEx(pVtbWindow->pWins, destroyWinArray);
465,145✔
218
  }
219
}
465,145✔
220

221
static void destroyDynQueryCtrlOperator(void* param) {
3,011,886✔
222
  SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param;
3,011,886✔
223

224
  switch (pDyn->qType) {
3,011,886✔
225
    case DYN_QTYPE_STB_HASH:
1,150,600✔
226
      destroyStbJoinDynCtrlInfo(&pDyn->stbJoin);
1,150,600✔
227
      break;
1,150,600✔
228
    case DYN_QTYPE_VTB_WINDOW:
465,145✔
229
      destroyVtbWindowDynCtrlInfo(&pDyn->vtbWindow);
465,145✔
230
    case DYN_QTYPE_VTB_AGG:
1,861,286✔
231
    case DYN_QTYPE_VTB_SCAN:
232
      destroyVtbScanDynCtrlInfo(&pDyn->vtbScan);
1,861,286✔
233
      break;
1,861,286✔
234
    default:
×
235
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
×
236
      break;
×
237
  }
238

239
  taosMemoryFreeClear(param);
3,011,886✔
240
}
3,011,886✔
241

242
static FORCE_INLINE bool tableNeedCache(int64_t uid, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, bool rightTable, bool batchFetch) {
243
  if (batchFetch) {
7,344,506✔
244
    return true;
7,340,162✔
245
  }
246
  
247
  if (rightTable) {
4,344✔
248
    return pPost->rightCurrUid == pPost->rightNextUid;
2,172✔
249
  }
250

251
  uint32_t* num = tSimpleHashGet(pPrev->leftCache, &uid, sizeof(uid));
2,172✔
252

253
  return (NULL == num) ? false : true;
2,172✔
254
}
255

256
static int32_t updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo*          pStbJoin) {
3,672,253✔
257
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
3,672,253✔
258
  SStbJoinPostJoinCtx*       pPost = &pStbJoin->ctx.post;
3,672,253✔
259
  SStbJoinTableList*         pNode = pPrev->pListHead;
3,672,253✔
260
  int32_t*                   leftVgId = pNode->pLeftVg + pNode->readIdx;
3,672,253✔
261
  int32_t*                   rightVgId = pNode->pRightVg + pNode->readIdx;
3,672,253✔
262
  int64_t*                   leftUid = pNode->pLeftUid + pNode->readIdx;
3,672,253✔
263
  int64_t*                   rightUid = pNode->pRightUid + pNode->readIdx;
3,672,253✔
264
  int64_t                    readIdx = pNode->readIdx + 1;
3,672,253✔
265
  int64_t                    rightPrevUid = pPost->rightCurrUid;
3,672,253✔
266

267
  pPost->leftCurrUid = *leftUid;
3,672,253✔
268
  pPost->rightCurrUid = *rightUid;
3,672,253✔
269

270
  pPost->leftVgId = *leftVgId;
3,672,253✔
271
  pPost->rightVgId = *rightVgId;
3,672,253✔
272

273
  while (true) {
274
    if (readIdx < pNode->uidNum) {
3,672,253✔
275
      pPost->rightNextUid = *(pNode->pRightUid + readIdx);
3,601,434✔
276
      break;
3,601,434✔
277
    }
278
    
279
    pNode = pNode->pNext;
70,819✔
280
    if (NULL == pNode) {
70,819✔
281
      pPost->rightNextUid = 0;
70,819✔
282
      break;
70,819✔
283
    }
284
    
285
    rightUid = pNode->pRightUid;
×
286
    readIdx = 0;
×
287
  }
288

289
  pPost->leftNeedCache = tableNeedCache(*leftUid, pPrev, pPost, false, pStbJoin->basic.batchFetch);
7,344,506✔
290
  pPost->rightNeedCache = tableNeedCache(*rightUid, pPrev, pPost, true, pStbJoin->basic.batchFetch);
7,344,506✔
291

292
  if (!pStbJoin->basic.batchFetch && pPost->rightNeedCache && rightPrevUid != pPost->rightCurrUid) {
3,672,253✔
293
    QRY_ERR_RET(tSimpleHashPut(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid), NULL, 0));
×
294
    pStbJoin->execInfo.rightCacheNum++;
×
295
  }  
296

297
  return TSDB_CODE_SUCCESS;
3,672,253✔
298
}
299

300
static int32_t copyOrgTbInfo(SOrgTbInfo* pSrc, SOrgTbInfo** ppDst) {
6,254,587✔
301
  int32_t     code = TSDB_CODE_SUCCESS;
6,254,587✔
302
  int32_t     lino = 0;
6,254,587✔
303
  SOrgTbInfo* pTbInfo = NULL;
6,254,587✔
304

305
  qDebug("start to copy org table info, vgId:%d, tbName:%s", pSrc->vgId, pSrc->tbName);
6,254,587✔
306

307
  pTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
6,254,587✔
308
  QUERY_CHECK_NULL(pTbInfo, code, lino, _return, terrno)
6,254,587✔
309

310
  pTbInfo->vgId = pSrc->vgId;
6,254,587✔
311
  tstrncpy(pTbInfo->tbName, pSrc->tbName, TSDB_TABLE_FNAME_LEN);
6,254,587✔
312

313
  pTbInfo->colMap = taosArrayDup(pSrc->colMap, NULL);
6,254,587✔
314
  QUERY_CHECK_NULL(pTbInfo->colMap, code, lino, _return, terrno)
6,254,587✔
315

316
  *ppDst = pTbInfo;
6,254,587✔
317

318
  return code;
6,254,587✔
319
_return:
×
320
  qError("failed to copy org table info, code:%d, line:%d", code, lino);
×
321
  if (pTbInfo) {
×
322
    if (pTbInfo->colMap) {
×
323
      taosArrayDestroy(pTbInfo->colMap);
×
324
    }
325
    taosMemoryFreeClear(pTbInfo);
×
326
  }
327
  return code;
×
328
}
329

330
static int32_t buildTagListForExchangeBasicParam(SExchangeOperatorBasicParam* pBasic, SArray* pTagList) {
27,336✔
331
  int32_t  code = TSDB_CODE_SUCCESS;
27,336✔
332
  int32_t  lino = 0;
27,336✔
333
  STagVal  tmpTag;
27,336✔
334

335
  pBasic->tagList = taosArrayInit(1, sizeof(STagVal));
27,336✔
336
  QUERY_CHECK_NULL(pBasic->tagList, code, lino, _return, terrno)
27,336✔
337

338
  for (int32_t i = 0; i < taosArrayGetSize(pTagList); ++i) {
54,672✔
339
    STagVal* pSrcTag = (STagVal*)taosArrayGet(pTagList, i);
27,336✔
340
    QUERY_CHECK_NULL(pSrcTag, code, lino, _return, terrno)
27,336✔
341
    tmpTag.type = pSrcTag->type;
27,336✔
342
    tmpTag.cid = pSrcTag->cid;
27,336✔
343
    if (IS_VAR_DATA_TYPE(pSrcTag->type)) {
27,336✔
344
      tmpTag.nData = pSrcTag->nData;
×
345
      tmpTag.pData = taosMemoryMalloc(tmpTag.nData);
×
346
      QUERY_CHECK_NULL(tmpTag.pData, code, lino, _return, terrno)
×
347
      memcpy(tmpTag.pData, pSrcTag->pData, tmpTag.nData);
×
348
    } else {
349
      tmpTag.i64 = pSrcTag->i64;
27,336✔
350
    }
351

352
    QUERY_CHECK_NULL(taosArrayPush(pBasic->tagList, &tmpTag), code, lino, _return, terrno)
54,672✔
353
    tmpTag = (STagVal){0};
27,336✔
354
  }
355

356
  return code;
27,336✔
357
_return:
×
358
  if (pBasic->tagList) {
×
359
    taosArrayDestroyEx(pBasic->tagList, destroyTagVal);
×
360
    pBasic->tagList = NULL;
×
361
  }
362
  if (tmpTag.pData) {
×
363
    taosMemoryFree(tmpTag.pData);
×
364
  }
365
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
×
366
  return code;
×
367
}
368

369
static int32_t buildBatchOrgTbInfoForExchangeBasicParam(SExchangeOperatorBasicParam* pBasic, SArray* pOrgTbInfoArray) {
461,404✔
370
  int32_t     code = TSDB_CODE_SUCCESS;
461,404✔
371
  int32_t     lino = 0;
461,404✔
372
  SOrgTbInfo  batchInfo;
461,404✔
373

374
  pBasic->batchOrgTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
461,404✔
375
  QUERY_CHECK_NULL(pBasic->batchOrgTbInfo, code, lino, _return, terrno)
461,404✔
376

377
  for (int32_t i = 0; i < taosArrayGetSize(pOrgTbInfoArray); ++i) {
1,939,799✔
378
    SOrgTbInfo* pSrc = (SOrgTbInfo*)taosArrayGet(pOrgTbInfoArray, i);
1,478,395✔
379
    QUERY_CHECK_NULL(pSrc, code, lino, _return, terrno)
1,478,395✔
380
    batchInfo.vgId = pSrc->vgId;
1,478,395✔
381
    tstrncpy(batchInfo.tbName, pSrc->tbName, TSDB_TABLE_FNAME_LEN);
1,478,395✔
382
    batchInfo.colMap = taosArrayDup(pSrc->colMap, NULL);
1,478,395✔
383
    QUERY_CHECK_NULL(batchInfo.colMap, code, lino, _return, terrno)
1,478,395✔
384
    QUERY_CHECK_NULL(taosArrayPush(pBasic->batchOrgTbInfo, &batchInfo), code, lino, _return, terrno)
2,956,790✔
385
    batchInfo = (SOrgTbInfo){0};
1,478,395✔
386
  }
387

388
  return code;
461,404✔
389
_return:
×
390
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
×
391
  if (pBasic->batchOrgTbInfo) {
×
392
    taosArrayDestroyEx(pBasic->batchOrgTbInfo, destroySOrgTbInfo);
×
393
    pBasic->batchOrgTbInfo = NULL;
×
394
  }
395
  if (batchInfo.colMap) {
×
396
    taosArrayDestroy(batchInfo.colMap);
×
397
    batchInfo.colMap = NULL;
×
398
  }
399
  return code;
×
400
}
401

402
static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, bool needCache, SOperatorParam* pChild) {
7,344,506✔
403
  int32_t code = TSDB_CODE_SUCCESS;
7,344,506✔
404
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
7,344,506✔
405
  if (NULL == *ppRes) {
7,344,506✔
406
    code = terrno;
×
407
    freeOperatorParam(pChild, OP_GET_PARAM);
×
408
    return code;
×
409
  }
410
  if (pChild) {
7,344,506✔
411
    (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
144,798✔
412
    if (NULL == (*ppRes)->pChildren) {
144,798✔
413
      code = terrno;
×
414
      freeOperatorParam(pChild, OP_GET_PARAM);
×
415
      freeOperatorParam(*ppRes, OP_GET_PARAM);
×
416
      *ppRes = NULL;
×
417
      return code;
×
418
    }
419
    if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild)) {
289,596✔
420
      code = terrno;
×
421
      freeOperatorParam(pChild, OP_GET_PARAM);
×
422
      freeOperatorParam(*ppRes, OP_GET_PARAM);
×
423
      *ppRes = NULL;
×
424
      return code;
×
425
    }
426
  } else {
427
    (*ppRes)->pChildren = NULL;
7,199,708✔
428
  }
429

430
  SGcOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcOperatorParam));
7,344,506✔
431
  if (NULL == pGc) {
7,344,506✔
432
    code = terrno;
×
433
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
434
    *ppRes = NULL;
×
435
    return code;
×
436
  }
437

438
  pGc->sessionId = atomic_add_fetch_64(&gSessionId, 1);
7,344,506✔
439
  pGc->downstreamIdx = downstreamIdx;
7,344,506✔
440
  pGc->vgId = vgId;
7,344,506✔
441
  pGc->tbUid = tbUid;
7,344,506✔
442
  pGc->needCache = needCache;
7,344,506✔
443

444
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
7,344,506✔
445
  (*ppRes)->downstreamIdx = downstreamIdx;
7,344,506✔
446
  (*ppRes)->value = pGc;
7,344,506✔
447
  (*ppRes)->reUse = false;
7,344,506✔
448

449
  return TSDB_CODE_SUCCESS;
7,344,506✔
450
}
451

452

453
static int32_t buildGroupCacheNotifyOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid) {
×
454
  int32_t code = TSDB_CODE_SUCCESS;
×
455
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
×
456
  if (NULL == *ppRes) {
×
457
    return terrno;
×
458
  }
459
  (*ppRes)->pChildren = NULL;
×
460

461
  SGcNotifyOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcNotifyOperatorParam));
×
462
  if (NULL == pGc) {
×
463
    code = terrno;
×
464
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
×
465
    return code;
×
466
  }
467

468
  pGc->downstreamIdx = downstreamIdx;
×
469
  pGc->vgId = vgId;
×
470
  pGc->tbUid = tbUid;
×
471

472
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
×
473
  (*ppRes)->downstreamIdx = downstreamIdx;
×
474
  (*ppRes)->value = pGc;
×
475
  (*ppRes)->reUse = false;
×
476

477
  return TSDB_CODE_SUCCESS;
×
478
}
479

480
static int32_t buildExchangeOperatorBasicParam(SExchangeOperatorBasicParam* pBasic, ENodeType srcOpType,
11,948,506✔
481
                                               EExchangeSourceType exchangeType, int32_t vgId, uint64_t groupId,
482
                                               SArray* pUidList, SOrgTbInfo* pOrgTbInfo, SArray* pTagList,
483
                                               SArray* pOrgTbInfoArray, STimeWindow window,
484
                                               SDownstreamSourceNode* pDownstreamSourceNode,
485
                                               bool tableSeq, bool isNewParam, bool isNewDeployed) {
486
  int32_t code = TSDB_CODE_SUCCESS;
11,948,506✔
487
  int32_t lino = 0;
11,948,506✔
488

489
  qDebug("buildExchangeOperatorBasicParam, srcOpType:%d, exchangeType:%d, vgId:%d, groupId:%" PRIu64 ", tableSeq:%d, "
11,948,506✔
490
         "isNewParam:%d, isNewDeployed:%d", srcOpType, exchangeType, vgId, groupId, tableSeq, isNewParam, isNewDeployed);
491

492
  pBasic->paramType = DYN_TYPE_EXCHANGE_PARAM;
11,948,506✔
493
  pBasic->srcOpType = srcOpType;
11,948,506✔
494
  pBasic->vgId = vgId;
11,948,506✔
495
  pBasic->groupid = groupId;
11,948,506✔
496
  pBasic->window = window;
11,948,506✔
497
  pBasic->tableSeq = tableSeq;
11,948,506✔
498
  pBasic->type = exchangeType;
11,948,506✔
499
  pBasic->isNewParam = isNewParam;
11,948,506✔
500

501
  if (pDownstreamSourceNode) {
11,948,506✔
502
    pBasic->isNewDeployed = true;
2,365✔
503
    pBasic->newDeployedSrc.type = QUERY_NODE_DOWNSTREAM_SOURCE;
2,365✔
504
    pBasic->newDeployedSrc.clientId = pDownstreamSourceNode->clientId;// current task's taskid
2,365✔
505
    pBasic->newDeployedSrc.taskId = pDownstreamSourceNode->taskId;
2,365✔
506
    pBasic->newDeployedSrc.fetchMsgType = TDMT_STREAM_FETCH;
2,365✔
507
    pBasic->newDeployedSrc.localExec = false;
2,365✔
508
    pBasic->newDeployedSrc.addr.nodeId = pDownstreamSourceNode->addr.nodeId;
2,365✔
509
    memcpy(&pBasic->newDeployedSrc.addr.epSet, &pDownstreamSourceNode->addr.epSet, sizeof(SEpSet));
2,365✔
510
  } else {
511
    pBasic->isNewDeployed = false;
11,946,141✔
512
    pBasic->newDeployedSrc = (SDownstreamSourceNode){0};
11,946,141✔
513
  }
514

515
  if (pUidList) {
11,948,506✔
516
    pBasic->uidList = taosArrayDup(pUidList, NULL);
4,532,282✔
517
    QUERY_CHECK_NULL(pBasic->uidList, code, lino, _return, terrno)
4,532,282✔
518
  } else {
519
    pBasic->uidList = taosArrayInit(1, sizeof(int64_t));
7,416,224✔
520
    QUERY_CHECK_NULL(pBasic->uidList, code, lino, _return, terrno)
7,416,224✔
521
  }
522

523
  if (pOrgTbInfo) {
11,948,506✔
524
    code = copyOrgTbInfo(pOrgTbInfo, &pBasic->orgTbInfo);
6,254,587✔
525
    QUERY_CHECK_CODE(code, lino, _return);
6,254,587✔
526
  } else {
527
    pBasic->orgTbInfo = NULL;
5,693,919✔
528
  }
529

530
  if (pTagList) {
11,948,506✔
531
    code = buildTagListForExchangeBasicParam(pBasic, pTagList);
27,336✔
532
    QUERY_CHECK_CODE(code, lino, _return);
27,336✔
533
  } else {
534
    pBasic->tagList = NULL;
11,921,170✔
535
  }
536

537
  if (pOrgTbInfoArray) {
11,948,506✔
538
    code = buildBatchOrgTbInfoForExchangeBasicParam(pBasic, pOrgTbInfoArray);
461,404✔
539
    QUERY_CHECK_CODE(code, lino, _return);
461,404✔
540
  } else {
541
    pBasic->batchOrgTbInfo = NULL;
11,487,102✔
542
  }
543
  return code;
11,948,506✔
544

545
_return:
×
546
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
×
547
  freeExchangeGetBasicOperatorParam(pBasic);
×
548
  return code;
×
549
}
550

551
static int32_t buildExchangeOperatorParamImpl(SOperatorParam** ppRes, int32_t downstreamIdx, ENodeType srcOpType,
11,259,151✔
552
                                              EExchangeSourceType exchangeType, int32_t vgId, uint64_t groupId,
553
                                              SArray* pUidList, SOrgTbInfo* pOrgTbInfo, SArray* pTagList,
554
                                              SArray* pOrgTbInfoArray, STimeWindow window,
555
                                              SDownstreamSourceNode* pDownstreamSourceNode,
556
                                              bool tableSeq, bool isNewParam, bool reUse, bool isNewDeployed) {
557

558
  int32_t                      code = TSDB_CODE_SUCCESS;
11,259,151✔
559
  int32_t                      lino = 0;
11,259,151✔
560
  SOperatorParam*              pParam = NULL;
11,259,151✔
561
  SExchangeOperatorParam*      pExc = NULL;
11,259,151✔
562

563
  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
11,259,151✔
564
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)
11,259,151✔
565

566
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
11,259,151✔
567
  pParam->downstreamIdx = downstreamIdx;
11,259,151✔
568
  pParam->reUse = reUse;
11,259,151✔
569
  pParam->pChildren = NULL;
11,259,151✔
570
  pParam->value = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
11,259,151✔
571
  QUERY_CHECK_NULL(pParam->value, code, lino, _return, terrno)
11,259,151✔
572

573
  pExc = (SExchangeOperatorParam*)pParam->value;
11,259,151✔
574
  pExc->multiParams = false;
11,259,151✔
575

576
  code = buildExchangeOperatorBasicParam(&pExc->basic, srcOpType, exchangeType, vgId, groupId,
11,259,151✔
577
                                         pUidList, pOrgTbInfo, pTagList, pOrgTbInfoArray,
578
                                         window, pDownstreamSourceNode, tableSeq, isNewParam, isNewDeployed);
579

580
  *ppRes = pParam;
11,259,151✔
581
  return code;
11,259,151✔
582
_return:
×
583
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
×
584
  if (pParam) {
×
585
    freeOperatorParam(pParam, OP_GET_PARAM);
×
586
  }
587
  return code;
×
588
}
589

590
static int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, const int32_t* pVgId, int64_t* pUid) {
4,344✔
591
  int32_t code = TSDB_CODE_SUCCESS;
4,344✔
592
  int32_t lino = 0;
4,344✔
593

594
  SArray* pUidList = taosArrayInit(1, sizeof(int64_t));
4,344✔
595
  QUERY_CHECK_NULL(pUidList, code, lino, _return, terrno)
4,344✔
596

597
  QUERY_CHECK_NULL(taosArrayPush(pUidList, pUid), code, lino, _return, terrno);
4,344✔
598

599
  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, EX_SRC_TYPE_STB_JOIN_SCAN,
4,344✔
600
                                        *pVgId, 0, pUidList, NULL, NULL, NULL, (STimeWindow){0}, NULL, true, false, false, false);
4,344✔
601
  QUERY_CHECK_CODE(code, lino, _return);
4,344✔
602

603
_return:
4,344✔
604
  if (code) {
4,344✔
605
    qError("failed to build exchange operator param, code:%d", code);
×
606
  }
607
  taosArrayDestroy(pUidList);
4,344✔
608
  return code;
4,344✔
609
}
610

611
static int32_t buildExchangeOperatorParamForExternalWindow(SOperatorParam** ppRes, int32_t downstreamIdx, STimeWindow win) {
700,233✔
612
  int32_t                   code = TSDB_CODE_SUCCESS;
700,233✔
613
  int32_t                   lino = 0;
700,233✔
614

615
  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, EX_SRC_TYPE_VTB_WIN_SCAN,
700,233✔
616
                                        0, 0, NULL, NULL, NULL, NULL, win, NULL, true, true, true, false);
617
  QUERY_CHECK_CODE(code, lino, _return);
700,233✔
618

619
  return code;
700,233✔
620
_return:
×
621
  qError("failed to build exchange operator param for external window, code:%d", code);
×
622
  return code;
×
623
}
624

625
static int32_t buildExchangeOperatorParamForVTagScan(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, tb_uid_t uid) {
4,299,987✔
626
  int32_t                      code = TSDB_CODE_SUCCESS;
4,299,987✔
627
  int32_t                      lino = 0;
4,299,987✔
628
  SArray*                      pUidList = NULL;
4,299,987✔
629

630
  pUidList = taosArrayInit(1, sizeof(int64_t));
4,299,987✔
631
  QUERY_CHECK_NULL(pUidList, code, lino, _return, terrno)
4,299,987✔
632

633
  QUERY_CHECK_NULL(taosArrayPush(pUidList, &uid), code, lino, _return, terrno)
4,299,987✔
634

635
  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, EX_SRC_TYPE_VSTB_TAG_SCAN,
4,299,987✔
636
                                        vgId, 0, pUidList, NULL, NULL, NULL, (STimeWindow){0}, NULL, false, false, true, false);
4,299,987✔
637
  QUERY_CHECK_CODE(code, lino, _return);
4,299,987✔
638

639
_return:
4,299,987✔
640
  if (code) {
4,299,987✔
641
    qError("failed to build exchange operator param for tag scan, code:%d", code);
×
642
  }
643
  taosArrayDestroy(pUidList);
4,299,987✔
644
  return code;
4,299,987✔
645
}
646

647
static int32_t buildExchangeOperatorParamForVScan(SOperatorParam** ppRes, int32_t downstreamIdx, SOrgTbInfo* pOrgTbInfo,
6,254,587✔
648
                                                  SDownstreamSourceNode* pNewSource) {
649
  int32_t                      code = TSDB_CODE_SUCCESS;
6,254,587✔
650
  int32_t                      lino = 0;
6,254,587✔
651

652
  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, EX_SRC_TYPE_VSTB_SCAN,
6,254,587✔
653
                                        pOrgTbInfo->vgId, 0, NULL, pOrgTbInfo, NULL, NULL, (STimeWindow){0}, pNewSource, false, true, true, true);
6,254,587✔
654
  QUERY_CHECK_CODE(code, lino, _return);
6,254,587✔
655

656
  return code;
6,254,587✔
657
_return:
×
658
  qError("failed to build exchange operator param for vscan, code:%d", code);
×
659
  return code;
×
660
}
661

662
static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
129,276✔
663
  int32_t                       code = TSDB_CODE_SUCCESS;
129,276✔
664
  int32_t                       line = 0;
129,276✔
665
  SOperatorParam*               pParam = NULL;
129,276✔
666
  SExchangeOperatorBatchParam*  pExc = NULL;
129,276✔
667
  SExchangeOperatorBasicParam   basic = {0};
129,276✔
668

669
  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
129,276✔
670
  QUERY_CHECK_NULL(pParam, code, line, _return, terrno);
129,276✔
671

672
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
129,276✔
673
  pParam->downstreamIdx = downstreamIdx;
129,276✔
674
  pParam->reUse = false;
129,276✔
675
  pParam->pChildren = NULL;
129,276✔
676
  pParam->value = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
129,276✔
677
  QUERY_CHECK_NULL(pParam->value, code, line, _return, terrno);
129,276✔
678

679
  pExc = pParam->value;
129,276✔
680
  pExc->multiParams = true;
129,276✔
681
  pExc->pBatchs = tSimpleHashInit(tSimpleHashGetSize(pVg), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
129,276✔
682
  QUERY_CHECK_NULL(pExc->pBatchs, code, line, _return, terrno)
129,276✔
683

684
  tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);
129,276✔
685

686
  int32_t iter = 0;
129,276✔
687
  void*   p = NULL;
129,276✔
688
  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
357,227✔
689
    int32_t* pVgId = tSimpleHashGetKey(p, NULL);
227,951✔
690
    SArray*  pUidList = *(SArray**)p;
227,951✔
691

692
    code = buildExchangeOperatorBasicParam(&basic, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
227,951✔
693
                                           EX_SRC_TYPE_STB_JOIN_SCAN, *pVgId, 0,
694
                                           pUidList, NULL, NULL, NULL,
695
                                           (STimeWindow){0}, NULL, false, false, false);
227,951✔
696
    QUERY_CHECK_CODE(code, line, _return);
227,951✔
697

698
    // already transferred to batch param, can free here
699
    taosArrayDestroy(pUidList);
227,951✔
700

701
    QRY_ERR_RET(tSimpleHashPut(pExc->pBatchs, pVgId, sizeof(*pVgId), &basic, sizeof(basic)));
227,951✔
702

703
    basic = (SExchangeOperatorBasicParam){0};
227,951✔
704
    qTrace("build downstreamIdx %d batch scan, vgId:%d, uidNum:%" PRId64, downstreamIdx, *pVgId, (int64_t)taosArrayGetSize(pUidList));
227,951✔
705
    *(SArray**)p = NULL;
227,951✔
706
  }
707
  *ppRes = pParam;
129,276✔
708

709
  return code;
129,276✔
710
  
711
_return:
×
712
  qError("failed to build batch exchange operator param, code:%d", code);
×
713
  freeOperatorParam(pParam, OP_GET_PARAM);
×
714
  freeExchangeGetBasicOperatorParam(&basic);
×
715
  return code;
×
716
}
717

718
static int32_t buildBatchExchangeOperatorParamForVirtual(SOperatorParam** ppRes, int32_t downstreamIdx, SArray* pTagList, uint64_t groupid,  SHashObj* pBatchMaps, STimeWindow window, EExchangeSourceType type) {
247,941✔
719
  int32_t                       code = TSDB_CODE_SUCCESS;
247,941✔
720
  int32_t                       lino = 0;
247,941✔
721
  SOperatorParam*               pParam = NULL;
247,941✔
722
  SExchangeOperatorBatchParam*  pExc = NULL;
247,941✔
723
  SExchangeOperatorBasicParam   basic = {0};
247,941✔
724

725
  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
247,941✔
726
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)
247,941✔
727

728
  pParam->value = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
247,941✔
729
  QUERY_CHECK_NULL(pParam->value, code, lino, _return, terrno)
247,941✔
730

731
  pExc = pParam->value;
247,941✔
732
  pExc->multiParams = true;
247,941✔
733

734
  pExc->pBatchs = tSimpleHashInit(taosHashGetSize(pBatchMaps), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
247,941✔
735
  QUERY_CHECK_NULL(pExc->pBatchs, code, lino, _return, terrno)
247,941✔
736
  tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);
247,941✔
737

738
  size_t keyLen = 0;
247,941✔
739
  void*  pIter = taosHashIterate(pBatchMaps, NULL);
247,941✔
740
  while (pIter != NULL) {
709,345✔
741
    SArray*          pOrgTbInfoArray = *(SArray**)pIter;
461,404✔
742
    int32_t*         vgId = (int32_t*)taosHashGetKey(pIter, &keyLen);
461,404✔
743

744
    code = buildExchangeOperatorBasicParam(&basic, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
461,404✔
745
                                           type, *vgId, groupid,
746
                                           NULL, NULL, pTagList, pOrgTbInfoArray,
747
                                           window, NULL, false, true, false);
748
    QUERY_CHECK_CODE(code, lino, _return);
461,404✔
749

750
    code = tSimpleHashPut(pExc->pBatchs, vgId, sizeof(*vgId), &basic, sizeof(basic));
461,404✔
751
    QUERY_CHECK_CODE(code, lino, _return);
461,404✔
752

753
    basic = (SExchangeOperatorBasicParam){0};
461,404✔
754
    pIter = taosHashIterate(pBatchMaps, pIter);
461,404✔
755
  }
756

757
  pParam->pChildren = NULL;
247,941✔
758
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
247,941✔
759
  pParam->downstreamIdx = downstreamIdx;
247,941✔
760
  pParam->reUse = false;
247,941✔
761

762
  *ppRes = pParam;
247,941✔
763
  return code;
247,941✔
764

765
_return:
×
766
  qError("failed to build exchange operator param for vscan, code:%d", code);
×
767
  freeOperatorParam(pParam, OP_GET_PARAM);
×
768
  freeExchangeGetBasicOperatorParam(&basic);
×
769
  return code;
×
770
}
771

772
static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initParam, SOperatorParam** ppChild0, SOperatorParam** ppChild1) {
3,672,253✔
773
  int32_t code = TSDB_CODE_SUCCESS;
3,672,253✔
774
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
3,672,253✔
775
  if (NULL == *ppRes) {
3,672,253✔
776
    code = terrno;
×
777
    return code;
×
778
  }
779
  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
3,672,253✔
780
  if (NULL == (*ppRes)->pChildren) {
3,672,253✔
781
    code = terrno;
×
782
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
783
    *ppRes = NULL;
×
784
    return code;
×
785
  }
786
  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild0)) {
7,344,506✔
787
    code = terrno;
×
788
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
789
    *ppRes = NULL;
×
790
    return code;
×
791
  }
792
  *ppChild0 = NULL;
3,672,253✔
793
  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild1)) {
7,344,506✔
794
    code = terrno;
×
795
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
796
    *ppRes = NULL;
×
797
    return code;
×
798
  }
799
  *ppChild1 = NULL;
3,672,253✔
800
  
801
  SSortMergeJoinOperatorParam* pJoin = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam));
3,672,253✔
802
  if (NULL == pJoin) {
3,672,253✔
803
    code = terrno;
×
804
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
805
    *ppRes = NULL;
×
806
    return code;
×
807
  }
808

809
  pJoin->initDownstream = initParam;
3,672,253✔
810
  
811
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
3,672,253✔
812
  (*ppRes)->value = pJoin;
3,672,253✔
813
  (*ppRes)->reUse = false;
3,672,253✔
814

815
  return TSDB_CODE_SUCCESS;
3,672,253✔
816
}
817

818
static int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) {
×
819
  int32_t code = TSDB_CODE_SUCCESS;
×
820
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
×
821
  if (NULL == *ppRes) {
×
822
    code = terrno;
×
823
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
×
824
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
825
    return code;
×
826
  }
827
  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
×
828
  if (NULL == *ppRes) {
×
829
    code = terrno;
×
830
    taosMemoryFreeClear(*ppRes);
×
831
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
×
832
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
833
    return code;
×
834
  }
835
  if (pChild0 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) {
×
836
    code = terrno;
×
837
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
×
838
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
×
839
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
840
    *ppRes = NULL;
×
841
    return code;
×
842
  }
843
  if (pChild1 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) {
×
844
    code = terrno;
×
845
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
×
846
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
847
    *ppRes = NULL;
×
848
    return code;
×
849
  }
850
  
851
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
×
852
  (*ppRes)->value = NULL;
×
853
  (*ppRes)->reUse = false;
×
854

855
  return TSDB_CODE_SUCCESS;
×
856
}
857

858
static int32_t buildBatchTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
11,178✔
859
  int32_t code = TSDB_CODE_SUCCESS;
11,178✔
860
  int32_t vgNum = tSimpleHashGetSize(pVg);
11,178✔
861
  if (vgNum <= 0 || vgNum > 1) {
11,178✔
862
    qError("Invalid vgroup num %d to build table scan operator param", vgNum);
×
863
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
864
  }
865

866
  int32_t iter = 0;
11,178✔
867
  void* p = NULL;
11,178✔
868
  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
22,356✔
869
    SArray* pUidList = *(SArray**)p;
11,178✔
870

871
    code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false);
11,178✔
872
    if (code) {
11,178✔
873
      return code;
×
874
    }
875
    taosArrayDestroy(pUidList);
11,178✔
876
    *(SArray**)p = NULL;
11,178✔
877
  }
878
  
879
  return TSDB_CODE_SUCCESS;
11,178✔
880
}
881

882
static int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) {
×
883
  SArray* pUidList = taosArrayInit(1, sizeof(int64_t));
×
884
  if (NULL == pUidList) {
×
885
    return terrno;
×
886
  }
887
  if (NULL == taosArrayPush(pUidList, pUid)) {
×
888
    return terrno;
×
889
  }
890

891
  int32_t code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, true);
×
892
  taosArrayDestroy(pUidList);
×
893
  if (code) {
×
894
    return code;
×
895
  }
896
  
897
  return TSDB_CODE_SUCCESS;
×
898
}
899

900
static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, SOperatorParam** ppParam) {
3,672,253✔
901
  int64_t                     rowIdx = pPrev->pListHead->readIdx;
3,672,253✔
902
  SOperatorParam*             pSrcParam0 = NULL;
3,672,253✔
903
  SOperatorParam*             pSrcParam1 = NULL;
3,672,253✔
904
  SOperatorParam*             pGcParam0 = NULL;
3,672,253✔
905
  SOperatorParam*             pGcParam1 = NULL;  
3,672,253✔
906
  int32_t*                    leftVg = pPrev->pListHead->pLeftVg + rowIdx;
3,672,253✔
907
  int64_t*                    leftUid = pPrev->pListHead->pLeftUid + rowIdx;
3,672,253✔
908
  int32_t*                    rightVg = pPrev->pListHead->pRightVg + rowIdx;
3,672,253✔
909
  int64_t*                    rightUid = pPrev->pListHead->pRightUid + rowIdx;
3,672,253✔
910
  int32_t                     code = TSDB_CODE_SUCCESS;
3,672,253✔
911

912
  qDebug("start %" PRId64 ":%" PRId64 "th stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64, 
3,672,253✔
913
      rowIdx, pPrev->tableNum, *leftVg, *leftUid, *rightVg, *rightUid);
914

915
  QRY_ERR_RET(updatePostJoinCurrTableInfo(&pInfo->stbJoin));
3,672,253✔
916
  
917
  if (pInfo->stbJoin.basic.batchFetch) {
3,672,253✔
918
    if (pPrev->leftHash) {
3,670,081✔
919
      code = pInfo->stbJoin.basic.srcScan[0] ? buildBatchTableScanOperatorParam(&pSrcParam0, 0, pPrev->leftHash) : buildBatchExchangeOperatorParam(&pSrcParam0, 0, pPrev->leftHash);
70,227✔
920
      if (TSDB_CODE_SUCCESS == code) {
70,227✔
921
        code = pInfo->stbJoin.basic.srcScan[1] ? buildBatchTableScanOperatorParam(&pSrcParam1, 1, pPrev->rightHash) : buildBatchExchangeOperatorParam(&pSrcParam1, 1, pPrev->rightHash);
70,227✔
922
      }
923
      if (TSDB_CODE_SUCCESS == code) {
70,227✔
924
        tSimpleHashCleanup(pPrev->leftHash);
70,227✔
925
        tSimpleHashCleanup(pPrev->rightHash);
70,227✔
926
        pPrev->leftHash = NULL;
70,227✔
927
        pPrev->rightHash = NULL;
70,227✔
928
      }
929
    }
930
  } else {
931
    code = pInfo->stbJoin.basic.srcScan[0] ? buildSingleTableScanOperatorParam(&pSrcParam0, 0, leftVg, leftUid) : buildExchangeOperatorParam(&pSrcParam0, 0, leftVg, leftUid);
2,172✔
932
    if (TSDB_CODE_SUCCESS == code) {
2,172✔
933
      code = pInfo->stbJoin.basic.srcScan[1] ? buildSingleTableScanOperatorParam(&pSrcParam1, 1, rightVg, rightUid) : buildExchangeOperatorParam(&pSrcParam1, 1, rightVg, rightUid);
2,172✔
934
    }
935
  }
936

937
  bool initParam = pSrcParam0 ? true : false;
3,672,253✔
938
  if (TSDB_CODE_SUCCESS == code) {
3,672,253✔
939
    code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pPost->leftNeedCache, pSrcParam0);
3,672,253✔
940
    pSrcParam0 = NULL;
3,672,253✔
941
  }
942
  if (TSDB_CODE_SUCCESS == code) {
3,672,253✔
943
    code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pPost->rightNeedCache, pSrcParam1);
3,672,253✔
944
    pSrcParam1 = NULL;
3,672,253✔
945
  }
946
  if (TSDB_CODE_SUCCESS == code) {
3,672,253✔
947
    code = buildMergeJoinOperatorParam(ppParam, initParam, &pGcParam0, &pGcParam1);
3,672,253✔
948
  }
949
  if (TSDB_CODE_SUCCESS != code) {
3,672,253✔
950
    if (pSrcParam0) {
×
951
      freeOperatorParam(pSrcParam0, OP_GET_PARAM);
×
952
    }
953
    if (pSrcParam1) {
×
954
      freeOperatorParam(pSrcParam1, OP_GET_PARAM);
×
955
    }
956
    if (pGcParam0) {
×
957
      freeOperatorParam(pGcParam0, OP_GET_PARAM);
×
958
    }
959
    if (pGcParam1) {
×
960
      freeOperatorParam(pGcParam1, OP_GET_PARAM);
×
961
    }
962
    if (*ppParam) {
×
963
      freeOperatorParam(*ppParam, OP_GET_PARAM);
×
964
      *ppParam = NULL;
×
965
    }
966
  }
967
  
968
  return code;
3,672,253✔
969
}
970

971
static int32_t buildVtbScanOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, uint64_t uid) {
4,299,987✔
972
  int32_t                   code = TSDB_CODE_SUCCESS;
4,299,987✔
973
  int32_t                   lino = 0;
4,299,987✔
974
  SVTableScanOperatorParam* pVScan = NULL;
4,299,987✔
975
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
4,299,987✔
976
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
4,299,987✔
977

978
  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
4,299,987✔
979
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)
4,299,987✔
980

981
  pVScan = taosMemoryMalloc(sizeof(SVTableScanOperatorParam));
4,299,987✔
982
  QUERY_CHECK_NULL(pVScan, code, lino, _return, terrno)
4,299,987✔
983
  pVScan->pOpParamArray = taosArrayInit(1, POINTER_BYTES);
4,299,987✔
984
  QUERY_CHECK_NULL(pVScan->pOpParamArray, code, lino, _return, terrno)
4,299,987✔
985
  pVScan->uid = uid;
4,299,987✔
986
  pVScan->window = pInfo->vtbScan.window;
4,299,987✔
987

988
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN;
4,299,987✔
989
  (*ppRes)->downstreamIdx = 0;
4,299,987✔
990
  (*ppRes)->value = pVScan;
4,299,987✔
991
  (*ppRes)->reUse = false;
4,299,987✔
992

993
  return TSDB_CODE_SUCCESS;
4,299,987✔
994
_return:
×
995
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
996
  if (pVScan) {
×
997
    taosArrayDestroy(pVScan->pOpParamArray);
×
998
    taosMemoryFreeClear(pVScan);
×
999
  }
1000
  if (*ppRes) {
×
1001
    taosArrayDestroy((*ppRes)->pChildren);
×
1002
    taosMemoryFreeClear(*ppRes);
×
1003
  }
1004
  return code;
×
1005
}
1006

1007
static int32_t buildExternalWindowOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins, int32_t idx) {
700,233✔
1008
  int32_t                       code = TSDB_CODE_SUCCESS;
700,233✔
1009
  int32_t                       lino = 0;
700,233✔
1010
  SExternalWindowOperatorParam* pExtWinOp = NULL;
700,233✔
1011

1012
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
700,233✔
1013
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
700,233✔
1014

1015
  pExtWinOp = taosMemoryMalloc(sizeof(SExternalWindowOperatorParam));
700,233✔
1016
  QUERY_CHECK_NULL(pExtWinOp, code, lino, _return, terrno)
700,233✔
1017

1018
  pExtWinOp->ExtWins = taosArrayDup(pWins, NULL);
700,233✔
1019
  QUERY_CHECK_NULL(pExtWinOp->ExtWins, code, lino, _return, terrno)
700,233✔
1020

1021
  SExtWinTimeWindow *firstWin = (SExtWinTimeWindow *)taosArrayGet(pWins, 0);
700,233✔
1022
  SExtWinTimeWindow *lastWin = (SExtWinTimeWindow *)taosArrayGet(pWins, taosArrayGetSize(pWins) - 1);
700,233✔
1023

1024
  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
700,233✔
1025
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)
700,233✔
1026

1027
  SOperatorParam* pExchangeOperator = NULL;
700,233✔
1028
  STimeWindow     twin = {.skey = firstWin->tw.skey, .ekey = lastWin->tw.ekey};
700,233✔
1029
  code = buildExchangeOperatorParamForExternalWindow(&pExchangeOperator, 0, twin);
700,233✔
1030
  QUERY_CHECK_CODE(code, lino, _return);
700,233✔
1031
  QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pExchangeOperator), code, lino, _return, terrno)
1,400,466✔
1032

1033
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW;
700,233✔
1034
  (*ppRes)->downstreamIdx = idx;
700,233✔
1035
  (*ppRes)->value = pExtWinOp;
700,233✔
1036
  (*ppRes)->reUse = false;
700,233✔
1037

1038
  return code;
700,233✔
1039
_return:
×
1040
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1041
  if (pExtWinOp) {
×
1042
    if (pExtWinOp->ExtWins) {
×
1043
      taosArrayDestroy(pExtWinOp->ExtWins);
×
1044
    }
1045
    taosMemoryFree(pExtWinOp);
×
1046
  }
1047
  if (*ppRes) {
×
1048
    if ((*ppRes)->pChildren) {
×
1049
      taosArrayDestroy((*ppRes)->pChildren);
×
1050
    }
1051
    taosMemoryFree(*ppRes);
×
1052
    *ppRes = NULL;
×
1053
  }
1054
  return code;
×
1055
}
1056

1057
static int32_t buildMergeOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins,
291,186✔
1058
                                       int32_t numOfDownstream, int32_t numOfWins) {
1059
  int32_t                   code = TSDB_CODE_SUCCESS;
291,186✔
1060
  int32_t                   lino = 0;
291,186✔
1061
  SMergeOperatorParam*      pMergeOp = NULL;
291,186✔
1062

1063
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
291,186✔
1064
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
291,186✔
1065

1066
  (*ppRes)->pChildren = taosArrayInit(numOfDownstream, POINTER_BYTES);
291,186✔
1067
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)
291,186✔
1068

1069
  pMergeOp = taosMemoryMalloc(sizeof(SMergeOperatorParam));
291,186✔
1070
  QUERY_CHECK_NULL(pMergeOp, code, lino, _return, terrno)
291,186✔
1071

1072
  pMergeOp->winNum = numOfWins;
291,186✔
1073

1074
  for (int32_t i = 0; i < numOfDownstream; i++) {
991,419✔
1075
    SOperatorParam* pExternalWinParam = NULL;
700,233✔
1076
    code = buildExternalWindowOperatorParam(pInfo, &pExternalWinParam, pWins, i);
700,233✔
1077
    QUERY_CHECK_CODE(code, lino, _return);
700,233✔
1078
    QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pExternalWinParam), code, lino, _return, terrno)
1,400,466✔
1079
  }
1080

1081
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE;
291,186✔
1082
  (*ppRes)->downstreamIdx = 0;
291,186✔
1083
  (*ppRes)->value = pMergeOp;
291,186✔
1084
  (*ppRes)->reUse = false;
291,186✔
1085

1086
  return TSDB_CODE_SUCCESS;
291,186✔
1087
_return:
×
1088
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1089
  if (pMergeOp) {
×
1090
    taosMemoryFree(pMergeOp);
×
1091
  }
1092
  if (*ppRes) {
×
1093
    if ((*ppRes)->pChildren) {
×
1094
      for (int32_t i = 0; i < taosArrayGetSize((*ppRes)->pChildren); i++) {
×
1095
        SOperatorParam* pChildParam = (SOperatorParam*)taosArrayGet((*ppRes)->pChildren, i);
×
1096
        if (pChildParam) {
×
1097
          SExternalWindowOperatorParam* pExtWinOp = (SExternalWindowOperatorParam*)pChildParam->value;
×
1098
          if (pExtWinOp) {
×
1099
            if (pExtWinOp->ExtWins) {
×
1100
              taosArrayDestroy(pExtWinOp->ExtWins);
×
1101
            }
1102
            taosMemoryFree(pExtWinOp);
×
1103
          }
1104
          taosMemoryFree(pChildParam);
×
1105
        }
1106
      }
1107
      taosArrayDestroy((*ppRes)->pChildren);
×
1108
    }
1109
    taosMemoryFree(*ppRes);
×
1110
    *ppRes = NULL;
×
1111
  }
1112
  return code;
×
1113
}
1114

1115
static int32_t buildAggOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes) {
12,529✔
1116
  int32_t                   code = TSDB_CODE_SUCCESS;
12,529✔
1117
  int32_t                   lino = 0;
12,529✔
1118
  SOperatorParam*           pParam = NULL;
12,529✔
1119
  SOperatorParam*           pExchangeParam = NULL;
12,529✔
1120
  SVtbScanDynCtrlInfo*      pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
12,529✔
1121
  bool                      freeExchange = false;
12,529✔
1122

1123
  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
12,529✔
1124
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)
12,529✔
1125

1126
  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
12,529✔
1127
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno)
12,529✔
1128

1129
  pParam->value = taosMemoryMalloc(sizeof(SAggOperatorParam));
12,529✔
1130
  QUERY_CHECK_NULL(pParam->value, code, lino, _return, terrno)
12,529✔
1131

1132
  code = buildBatchExchangeOperatorParamForVirtual(&pExchangeParam, 0, NULL, 0, pVtbScan->otbVgIdToOtbInfoArrayMap, (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN}, EX_SRC_TYPE_VSTB_AGG_SCAN);
12,529✔
1133
  QUERY_CHECK_CODE(code, lino, _return);
12,529✔
1134

1135
  freeExchange = true;
12,529✔
1136

1137
  QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pExchangeParam), code, lino, _return, terrno)
25,058✔
1138

1139
  freeExchange = false;
12,529✔
1140

1141
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
12,529✔
1142
  pParam->downstreamIdx = 0;
12,529✔
1143
  pParam->reUse = false;
12,529✔
1144

1145
  *ppRes = pParam;
12,529✔
1146

1147
  return code;
12,529✔
1148
_return:
×
1149
  if (freeExchange) {
×
1150
    freeOperatorParam(pExchangeParam, OP_GET_PARAM);
×
1151
  }
1152
  if (pParam) {
×
1153
    freeOperatorParam(pParam, OP_GET_PARAM);
×
1154
  }
1155
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1156
  return code;
×
1157
}
1158

1159
static int32_t buildAggOperatorParamWithGroupId(SDynQueryCtrlOperatorInfo* pInfo, uint64_t groupid, SOperatorParam** ppRes) {
23,060✔
1160
  int32_t                   code = TSDB_CODE_SUCCESS;
23,060✔
1161
  int32_t                   lino = 0;
23,060✔
1162
  SVtbScanDynCtrlInfo*      pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
23,060✔
1163
  SOperatorParam*           pParam = NULL;
23,060✔
1164
  SOperatorParam*           pExchangeParam = NULL;
23,060✔
1165
  SHashObj*                 otbVgIdToOtbInfoArrayMap = NULL;
23,060✔
1166
  bool                      freeExchange = false;
23,060✔
1167
  void*                     pIter = taosHashGet(pVtbScan->vtbGroupIdToVgIdMapMap, &groupid, sizeof(groupid));
23,060✔
1168

1169
  if (!pIter) {
23,060✔
1170
    *ppRes = NULL;
×
1171
    return code;
×
1172
  }
1173

1174
  otbVgIdToOtbInfoArrayMap = *(SHashObj**)pIter;
23,060✔
1175

1176
  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
23,060✔
1177
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)
23,060✔
1178

1179
  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
23,060✔
1180
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno)
23,060✔
1181

1182
  code = buildBatchExchangeOperatorParamForVirtual(&pExchangeParam, 0, NULL, groupid, otbVgIdToOtbInfoArrayMap, (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN}, EX_SRC_TYPE_VSTB_AGG_SCAN);
23,060✔
1183
  QUERY_CHECK_CODE(code, lino, _return);
23,060✔
1184

1185
  freeExchange = true;
23,060✔
1186

1187
  QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pExchangeParam), code, lino, _return, terrno)
46,120✔
1188

1189
  freeExchange = false;
23,060✔
1190

1191
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
23,060✔
1192
  pParam->downstreamIdx = 0;
23,060✔
1193
  pParam->value = NULL;
23,060✔
1194
  pParam->reUse = false;
23,060✔
1195

1196
  *ppRes = pParam;
23,060✔
1197

1198
  return code;
23,060✔
1199
_return:
×
1200
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1201
  if (freeExchange) {
×
1202
    freeOperatorParam(pExchangeParam, OP_GET_PARAM);
×
1203
  }
1204
  if (pParam) {
×
1205
    freeOperatorParam(pParam, OP_GET_PARAM);
×
1206
  }
1207
  return code;
×
1208
}
1209

1210
static int32_t buildAggOperatorParamForSingleChild(SDynQueryCtrlOperatorInfo* pInfo, tb_uid_t uid, uint64_t groupid, SArray* pTagList, SOperatorParam** ppRes) {
18,224✔
1211
  int32_t                   code = TSDB_CODE_SUCCESS;
18,224✔
1212
  int32_t                   lino = 0;
18,224✔
1213
  SVtbScanDynCtrlInfo*      pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
18,224✔
1214
  SOperatorParam*           pParam = NULL;
18,224✔
1215
  SHashObj*                 pOtbVgIdToOtbInfoArrayMap = NULL;
18,224✔
1216
  void*                     pIter = taosHashGet(pVtbScan->vtbUidToVgIdMapMap, &uid, sizeof(uid));
18,224✔
1217

1218
  if (pIter) {
18,224✔
1219
    pOtbVgIdToOtbInfoArrayMap = *(SHashObj**)taosHashGet(pVtbScan->vtbUidToVgIdMapMap, &uid, sizeof(uid));
18,224✔
1220

1221
    code = buildBatchExchangeOperatorParamForVirtual(&pParam, 0, pTagList, groupid, pOtbVgIdToOtbInfoArrayMap, (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN}, EX_SRC_TYPE_VSTB_AGG_SCAN);
18,224✔
1222
    QUERY_CHECK_CODE(code, lino, _return);
18,224✔
1223

1224
    *ppRes = pParam;
18,224✔
1225
  } else {
1226
    *ppRes = NULL;
×
1227
  }
1228

1229
  return code;
18,224✔
1230
_return:
×
1231
  if (pParam) {
×
1232
    freeOperatorParam(pParam, OP_GET_PARAM);
×
1233
  }
1234
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1235
  return code;
×
1236
}
1237

1238
static void seqJoinLaunchNewRetrieveImpl(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
3,672,253✔
1239
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
3,672,253✔
1240
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
3,672,253✔
1241
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
3,672,253✔
1242
  SStbJoinPostJoinCtx*       pPost = &pStbJoin->ctx.post;
3,672,253✔
1243
  SOperatorParam*            pParam = NULL;
3,672,253✔
1244
  int32_t                    code  = buildSeqStbJoinOperatorParam(pInfo, pPrev, pPost, &pParam);
3,672,253✔
1245
  if (TSDB_CODE_SUCCESS != code) {
3,672,253✔
1246
    pOperator->pTaskInfo->code = code;
×
1247
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
×
1248
  }
1249

1250
  qDebug("%s dynamic post task begin", GET_TASKID(pOperator->pTaskInfo));
3,672,253✔
1251
  code = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam, ppRes);
3,672,253✔
1252
  if (*ppRes && (code == 0)) {
3,672,253✔
1253
    code = blockDataCheck(*ppRes);
206,576✔
1254
    if (code) {
206,576✔
1255
      qError("Invalid block data, blockDataCheck failed, error:%s", tstrerror(code));
×
1256
      pOperator->pTaskInfo->code = code;
×
1257
      T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
×
1258
    }
1259
    pPost->isStarted = true;
206,576✔
1260
    pStbJoin->execInfo.postBlkNum++;
206,576✔
1261
    pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows;
206,576✔
1262
    qDebug("%s join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
206,576✔
1263
  } else {
1264
    qDebug("%s Empty join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
3,465,677✔
1265
  }
1266
}
3,672,253✔
1267

1268

1269
static int32_t notifySeqJoinTableCacheEnd(SOperatorInfo* pOperator, SStbJoinPostJoinCtx* pPost, bool leftTable) {
×
1270
  SOperatorParam* pGcParam = NULL;
×
1271
  SOperatorParam* pMergeJoinParam = NULL;
×
1272
  int32_t         downstreamId = leftTable ? 0 : 1;
×
1273
  int32_t         vgId = leftTable ? pPost->leftVgId : pPost->rightVgId;
×
1274
  int64_t         uid = leftTable ? pPost->leftCurrUid : pPost->rightCurrUid;
×
1275

1276
  qDebug("notify table %" PRIu64 " in vgId %d downstreamId %d cache end", uid, vgId, downstreamId);
×
1277

1278
  int32_t code = buildGroupCacheNotifyOperatorParam(&pGcParam, downstreamId, vgId, uid);
×
1279
  if (TSDB_CODE_SUCCESS != code) {
×
1280
    return code;
×
1281
  }
1282
  code = buildMergeJoinNotifyOperatorParam(&pMergeJoinParam, pGcParam, NULL);
×
1283
  if (TSDB_CODE_SUCCESS != code) {
×
1284
    return code;
×
1285
  }
1286

1287
  return optrDefaultNotifyFn(pOperator->pDownstream[1], pMergeJoinParam);
×
1288
}
1289

1290
static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCtrlInfo*          pStbJoin) {
3,671,759✔
1291
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
3,671,759✔
1292
  int32_t code = 0;
3,671,759✔
1293
  
1294
  pPost->isStarted = false;
3,671,759✔
1295
  
1296
  if (pStbJoin->basic.batchFetch) {
3,671,759✔
1297
    return TSDB_CODE_SUCCESS;
3,669,587✔
1298
  }
1299
  
1300
  if (pPost->leftNeedCache) {
2,172✔
1301
    uint32_t* num = tSimpleHashGet(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
×
1302
    if (num && --(*num) <= 0) {
×
1303
      code = tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
×
1304
      if (code) {
×
1305
        qError("tSimpleHashRemove leftCurrUid %" PRId64 " from leftCache failed, error:%s", pPost->leftCurrUid, tstrerror(code));
×
1306
        QRY_ERR_RET(code);
×
1307
      }
1308
      QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, true));
×
1309
    }
1310
  }
1311
  
1312
  if (!pPost->rightNeedCache) {
2,172✔
1313
    void* v = tSimpleHashGet(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
2,172✔
1314
    if (NULL != v) {
2,172✔
1315
      code = tSimpleHashRemove(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
×
1316
      if (code) {
×
1317
        qError("tSimpleHashRemove rightCurrUid %" PRId64 " from rightCache failed, error:%s", pPost->rightCurrUid, tstrerror(code));
×
1318
        QRY_ERR_RET(code);
×
1319
      }
1320
      QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, false));
×
1321
    }
1322
  }
1323

1324
  return TSDB_CODE_SUCCESS;
2,172✔
1325
}
1326

1327

1328
static FORCE_INLINE int32_t seqJoinContinueCurrRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
1329
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
277,395✔
1330
  SStbJoinPostJoinCtx*       pPost = &pInfo->stbJoin.ctx.post;
277,395✔
1331
  SStbJoinPrevJoinCtx*       pPrev = &pInfo->stbJoin.ctx.prev;
277,395✔
1332

1333
  if (!pPost->isStarted) {
277,395✔
1334
    return TSDB_CODE_SUCCESS;
71,313✔
1335
  }
1336
  
1337
  qDebug("%s dynQueryCtrl continue to retrieve block from post op", GET_TASKID(pOperator->pTaskInfo));
206,082✔
1338
  
1339
  *ppRes = getNextBlockFromDownstream(pOperator, 1);
206,082✔
1340
  if (NULL == *ppRes) {
206,082✔
1341
    QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, &pInfo->stbJoin));
206,082✔
1342
    pPrev->pListHead->readIdx++;
206,082✔
1343
  } else {
1344
    pInfo->stbJoin.execInfo.postBlkNum++;
×
1345
    pInfo->stbJoin.execInfo.postBlkRows += (*ppRes)->info.rows;
×
1346
  }
1347

1348
  return TSDB_CODE_SUCCESS;
206,082✔
1349
}
1350

1351
static FORCE_INLINE int32_t addToJoinVgroupHash(SSHashObj* pHash, void* pKey, int32_t keySize, void* pVal, int32_t valSize) {
1352
  SArray** ppArray = tSimpleHashGet(pHash, pKey, keySize);
7,344,114✔
1353
  if (NULL == ppArray) {
7,344,114✔
1354
    SArray* pArray = taosArrayInit(10, valSize);
239,129✔
1355
    if (NULL == pArray) {
239,129✔
1356
      return terrno;
×
1357
    }
1358
    if (NULL == taosArrayPush(pArray, pVal)) {
478,258✔
1359
      taosArrayDestroy(pArray);
×
1360
      return terrno;
×
1361
    }
1362
    if (tSimpleHashPut(pHash, pKey, keySize, &pArray, POINTER_BYTES)) {
239,129✔
1363
      taosArrayDestroy(pArray);      
×
1364
      return terrno;
×
1365
    }
1366
    return TSDB_CODE_SUCCESS;
239,129✔
1367
  }
1368

1369
  if (NULL == taosArrayPush(*ppArray, pVal)) {
14,209,970✔
1370
    return terrno;
×
1371
  }
1372
  
1373
  return TSDB_CODE_SUCCESS;
7,104,985✔
1374
}
1375

1376
static FORCE_INLINE int32_t addToJoinTableHash(SSHashObj* pHash, SSHashObj* pOnceHash, void* pKey, int32_t keySize) {
1377
  int32_t code = TSDB_CODE_SUCCESS;
2,172✔
1378
  uint32_t* pNum = tSimpleHashGet(pHash, pKey, keySize);
2,172✔
1379
  if (NULL == pNum) {
2,172✔
1380
    uint32_t n = 1;
2,172✔
1381
    code = tSimpleHashPut(pHash, pKey, keySize, &n, sizeof(n));
2,172✔
1382
    if (code) {
2,172✔
1383
      return code;
×
1384
    }
1385
    code = tSimpleHashPut(pOnceHash, pKey, keySize, NULL, 0);
2,172✔
1386
    if (code) {
2,172✔
1387
      return code;
×
1388
    }
1389
    return TSDB_CODE_SUCCESS;
2,172✔
1390
  }
1391

1392
  switch (*pNum) {
×
1393
    case 0:
×
1394
      break;
×
1395
    case UINT32_MAX:
×
1396
      *pNum = 0;
×
1397
      break;
×
1398
    default:
×
1399
      if (1 == (*pNum)) {
×
1400
        code = tSimpleHashRemove(pOnceHash, pKey, keySize);
×
1401
        if (code) {
×
1402
          qError("tSimpleHashRemove failed in addToJoinTableHash, error:%s", tstrerror(code));
×
1403
          QRY_ERR_RET(code);
×
1404
        }
1405
      }
1406
      (*pNum)++;
×
1407
      break;
×
1408
  }
1409
  
1410
  return TSDB_CODE_SUCCESS;
×
1411
}
1412

1413

1414
static void freeStbJoinTableList(SStbJoinTableList* pList) {
70,819✔
1415
  if (NULL == pList) {
70,819✔
1416
    return;
×
1417
  }
1418
  taosMemoryFree(pList->pLeftVg);
70,819✔
1419
  taosMemoryFree(pList->pLeftUid);
70,819✔
1420
  taosMemoryFree(pList->pRightVg);
70,819✔
1421
  taosMemoryFree(pList->pRightUid);
70,819✔
1422
  taosMemoryFree(pList);
70,819✔
1423
}
1424

1425
static int32_t appendStbJoinTableList(SStbJoinPrevJoinCtx* pCtx, int64_t rows, int32_t* pLeftVg, int64_t* pLeftUid, int32_t* pRightVg, int64_t* pRightUid) {
71,313✔
1426
  int32_t code = TSDB_CODE_SUCCESS;
71,313✔
1427
  SStbJoinTableList* pNew = taosMemoryCalloc(1, sizeof(SStbJoinTableList));
71,313✔
1428
  if (NULL == pNew) {
71,313✔
1429
    return terrno;
×
1430
  }
1431
  pNew->pLeftVg = taosMemoryMalloc(rows * sizeof(*pLeftVg));
71,313✔
1432
  if (NULL == pNew->pLeftVg) {
71,313✔
1433
    code = terrno;
×
1434
    freeStbJoinTableList(pNew);
×
1435
    return code;
×
1436
  }
1437
  pNew->pLeftUid = taosMemoryMalloc(rows * sizeof(*pLeftUid));
71,313✔
1438
  if (NULL == pNew->pLeftUid) {
71,313✔
1439
    code = terrno;
×
1440
    freeStbJoinTableList(pNew);
×
1441
    return code;
×
1442
  }
1443
  pNew->pRightVg = taosMemoryMalloc(rows * sizeof(*pRightVg));
71,313✔
1444
  if (NULL == pNew->pRightVg) {
71,313✔
1445
    code = terrno;
×
1446
    freeStbJoinTableList(pNew);
×
1447
    return code;
×
1448
  }
1449
  pNew->pRightUid = taosMemoryMalloc(rows * sizeof(*pRightUid));
71,313✔
1450
  if (NULL == pNew->pRightUid) {
71,313✔
1451
    code = terrno;
×
1452
    freeStbJoinTableList(pNew);
×
1453
    return code;
×
1454
  }
1455

1456
  TAOS_MEMCPY(pNew->pLeftVg, pLeftVg, rows * sizeof(*pLeftVg));
71,313✔
1457
  TAOS_MEMCPY(pNew->pLeftUid, pLeftUid, rows * sizeof(*pLeftUid));
71,313✔
1458
  TAOS_MEMCPY(pNew->pRightVg, pRightVg, rows * sizeof(*pRightVg));
71,313✔
1459
  TAOS_MEMCPY(pNew->pRightUid, pRightUid, rows * sizeof(*pRightUid));
71,313✔
1460

1461
  pNew->readIdx = 0;
71,313✔
1462
  pNew->uidNum = rows;
71,313✔
1463
  pNew->pNext = NULL;
71,313✔
1464
  
1465
  if (pCtx->pListTail) {
71,313✔
1466
    pCtx->pListTail->pNext = pNew;
×
1467
    pCtx->pListTail = pNew;
×
1468
  } else {
1469
    pCtx->pListHead = pNew;
71,313✔
1470
    pCtx->pListTail= pNew;
71,313✔
1471
  }
1472

1473
  return TSDB_CODE_SUCCESS;
71,313✔
1474
}
1475

1476
static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
71,313✔
1477
  int32_t                    code = TSDB_CODE_SUCCESS;
71,313✔
1478
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
71,313✔
1479
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
71,313✔
1480
  SColumnInfoData*           pVg0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[0]);
71,313✔
1481
  if (NULL == pVg0) {
71,313✔
1482
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1483
  }
1484
  SColumnInfoData*           pVg1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[1]);
71,313✔
1485
  if (NULL == pVg1) {
71,313✔
1486
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1487
  }
1488
  SColumnInfoData*           pUid0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[0]);
71,313✔
1489
  if (NULL == pUid0) {
71,313✔
1490
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1491
  }
1492
  SColumnInfoData*           pUid1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[1]);
71,313✔
1493
  if (NULL == pUid1) {
71,313✔
1494
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1495
  }
1496

1497
  if (pStbJoin->basic.batchFetch) {
71,313✔
1498
    for (int32_t i = 0; i < pBlock->info.rows; ++i) {
3,742,284✔
1499
      int32_t* leftVg = (int32_t*)(pVg0->pData + pVg0->info.bytes * i);
3,672,057✔
1500
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i);
3,672,057✔
1501
      int32_t* rightVg = (int32_t*)(pVg1->pData + pVg1->info.bytes * i);
3,672,057✔
1502
      int64_t* rightUid = (int64_t*)(pUid1->pData + pUid1->info.bytes * i);
3,672,057✔
1503

1504
      code = addToJoinVgroupHash(pStbJoin->ctx.prev.leftHash, leftVg, sizeof(*leftVg), leftUid, sizeof(*leftUid));
3,672,057✔
1505
      if (TSDB_CODE_SUCCESS != code) {
3,672,057✔
1506
        break;
×
1507
      }
1508
      code = addToJoinVgroupHash(pStbJoin->ctx.prev.rightHash, rightVg, sizeof(*rightVg), rightUid, sizeof(*rightUid));
3,672,057✔
1509
      if (TSDB_CODE_SUCCESS != code) {
3,672,057✔
1510
        break;
×
1511
      }
1512
    }
1513
  } else {
1514
    for (int32_t i = 0; i < pBlock->info.rows; ++i) {
3,258✔
1515
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i);
2,172✔
1516
    
1517
      code = addToJoinTableHash(pStbJoin->ctx.prev.leftCache, pStbJoin->ctx.prev.onceTable, leftUid, sizeof(*leftUid));
2,172✔
1518
      if (TSDB_CODE_SUCCESS != code) {
2,172✔
1519
        break;
×
1520
      }
1521
    }
1522
  }
1523

1524
  if (TSDB_CODE_SUCCESS == code) {
71,313✔
1525
    code = appendStbJoinTableList(&pStbJoin->ctx.prev, pBlock->info.rows, (int32_t*)pVg0->pData, (int64_t*)pUid0->pData, (int32_t*)pVg1->pData, (int64_t*)pUid1->pData);
71,313✔
1526
    if (TSDB_CODE_SUCCESS == code) {
71,313✔
1527
      pStbJoin->ctx.prev.tableNum += pBlock->info.rows;
71,313✔
1528
    }
1529
  }
1530

1531
_return:
×
1532

1533
  if (TSDB_CODE_SUCCESS != code) {
71,313✔
1534
    pOperator->pTaskInfo->code = code;
×
1535
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
×
1536
  }
1537
}
71,313✔
1538

1539

1540
static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) {
1,151,092✔
1541
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
1,151,092✔
1542
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
1,151,092✔
1543

1544
  if (pStbJoin->basic.batchFetch) {
1,151,092✔
1545
    return;
1,150,006✔
1546
  }
1547

1548
  if (tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache) == tSimpleHashGetSize(pStbJoin->ctx.prev.onceTable)) {
1,086✔
1549
    tSimpleHashClear(pStbJoin->ctx.prev.leftCache);
1,086✔
1550
    return;
1,086✔
1551
  }
1552

1553
  uint64_t* pUid = NULL;
×
1554
  int32_t iter = 0;
×
1555
  int32_t code = 0;
×
1556
  while (NULL != (pUid = tSimpleHashIterate(pStbJoin->ctx.prev.onceTable, pUid, &iter))) {
×
1557
    code = tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, pUid, sizeof(*pUid));
×
1558
    if (code) {
×
1559
      qError("tSimpleHashRemove failed in postProcessStbJoinTableHash, error:%s", tstrerror(code));
×
1560
    }
1561
  }
1562

1563
  pStbJoin->execInfo.leftCacheNum = tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache);
×
1564
  qDebug("more than 1 ref build table num %" PRId64, (int64_t)tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache));
×
1565

1566
/*
1567
  // debug only
1568
  iter = 0;
1569
  uint32_t* num = NULL;
1570
  while (NULL != (num = tSimpleHashIterate(pStbJoin->ctx.prev.leftCache, num, &iter))) {
1571
    A S S E R T(*num > 1);
1572
  }
1573
*/  
1574
}
1575

1576
static void buildStbJoinTableList(SOperatorInfo* pOperator) {
1,151,092✔
1577
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
1,151,092✔
1578
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
1,151,092✔
1579

1580
  while (true) {
71,313✔
1581
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
1,222,405✔
1582
    if (NULL == pBlock) {
1,222,405✔
1583
      break;
1,151,092✔
1584
    }
1585

1586
    pStbJoin->execInfo.prevBlkNum++;
71,313✔
1587
    pStbJoin->execInfo.prevBlkRows += pBlock->info.rows;
71,313✔
1588
    
1589
    doBuildStbJoinTableHash(pOperator, pBlock);
71,313✔
1590
  }
1591

1592
  postProcessStbJoinTableHash(pOperator);
1,151,092✔
1593

1594
  pStbJoin->ctx.prev.joinBuild = true;
1,151,092✔
1595
}
1,151,092✔
1596

1597
static int32_t seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
277,395✔
1598
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
277,395✔
1599
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
277,395✔
1600
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
277,395✔
1601
  SStbJoinTableList*         pNode = pPrev->pListHead;
277,395✔
1602

1603
  while (pNode) {
3,813,891✔
1604
    if (pNode->readIdx >= pNode->uidNum) {
3,743,072✔
1605
      pPrev->pListHead = pNode->pNext;
70,819✔
1606
      freeStbJoinTableList(pNode);
70,819✔
1607
      pNode = pPrev->pListHead;
70,819✔
1608
      continue;
70,819✔
1609
    }
1610
    
1611
    seqJoinLaunchNewRetrieveImpl(pOperator, ppRes);
3,672,253✔
1612
    if (*ppRes) {
3,672,253✔
1613
      return TSDB_CODE_SUCCESS;
206,576✔
1614
    }
1615

1616
    QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, pStbJoin));
3,465,677✔
1617
    pPrev->pListHead->readIdx++;
3,465,677✔
1618
  }
1619

1620
  *ppRes = NULL;
70,819✔
1621
  setOperatorCompleted(pOperator);
70,819✔
1622

1623
  return TSDB_CODE_SUCCESS;
70,819✔
1624
}
1625

1626
static int32_t seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) {
1,357,174✔
1627
  if (pBlock) {
1,357,174✔
1628
    if (pStbJoin && pStbJoin->pOutputDataBlockDesc) {
206,576✔
1629
      pBlock->info.id.blockId = pStbJoin->pOutputDataBlockDesc->dataBlockId;
206,576✔
1630
      if (!pBlock->pDataBlock) return TSDB_CODE_SUCCESS;
206,576✔
1631

1632
      for (int i = (int)pBlock->pDataBlock->size; i < pStbJoin->pOutputDataBlockDesc->pSlots->length; i++) {
208,748✔
1633
        SSlotDescNode* pSlot = (SSlotDescNode*)nodesListGetNode(pStbJoin->pOutputDataBlockDesc->pSlots, i);
2,172✔
1634
        if (pSlot == NULL) {
2,172✔
1635
          qError("seqStableJoinComposeRes: pSlot is NULL, i:%d", i);
×
1636
          return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1637
        }
1638
        SColumnInfoData colInfo = createColumnInfoData(pSlot->dataType.type, pSlot->dataType.bytes, pSlot->slotId);
2,172✔
1639
        int32_t code = colInfoDataEnsureCapacity(&colInfo, pBlock->info.rows, true);
2,172✔
1640
        if (code != TSDB_CODE_SUCCESS) {
2,172✔
1641
          return code;
×
1642
        }
1643
        code = blockDataAppendColInfo(pBlock, &colInfo);
2,172✔
1644
        if (code != TSDB_CODE_SUCCESS) {
2,172✔
1645
          return code;
×
1646
        }
1647
      }
1648
    } else {
1649
      qError("seqStableJoinComposeRes: pBlock or pStbJoin is NULL");
×
1650
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1651
    }
1652
  }
1653
  return TSDB_CODE_SUCCESS;
1,357,174✔
1654
}
1655

1656
int32_t seqStableJoin(SOperatorInfo* pOperator, SSDataBlock** pRes) {
1,384,662✔
1657
  int32_t                    code = TSDB_CODE_SUCCESS;
1,384,662✔
1658
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
1,384,662✔
1659
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
1,384,662✔
1660

1661
  QRY_PARAM_CHECK(pRes);
1,384,662✔
1662
  if (pOperator->status == OP_EXEC_DONE) {
1,384,662✔
1663
    return code;
27,488✔
1664
  }
1665

1666
  int64_t st = 0;
1,357,174✔
1667
  if (pOperator->cost.openCost == 0) {
1,357,174✔
1668
    st = taosGetTimestampUs();
1,150,600✔
1669
  }
1670

1671
  if (!pStbJoin->ctx.prev.joinBuild) {
1,357,174✔
1672
    buildStbJoinTableList(pOperator);
1,151,092✔
1673
    if (pStbJoin->execInfo.prevBlkRows <= 0) {
1,151,092✔
1674
      setOperatorCompleted(pOperator);
1,079,779✔
1675
      goto _return;
1,079,779✔
1676
    }
1677
  }
1678

1679
  QRY_ERR_JRET(seqJoinContinueCurrRetrieve(pOperator, pRes));
277,395✔
1680
  if (*pRes) {
277,395✔
1681
    goto _return;
×
1682
  }
1683

1684
  QRY_ERR_JRET(seqJoinLaunchNewRetrieve(pOperator, pRes));
277,395✔
1685

1686
_return:
277,395✔
1687
  if (pOperator->cost.openCost == 0) {
1,357,174✔
1688
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
1,150,600✔
1689
  }
1690

1691
  if (code) {
1,357,174✔
1692
    qError("%s failed since %s", __func__, tstrerror(code));
×
1693
    pOperator->pTaskInfo->code = code;
×
1694
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
1695
  } else {
1696
    code = seqStableJoinComposeRes(pStbJoin, *pRes);
1,357,174✔
1697
  }
1698
  return code;
1,357,174✔
1699
}
1700

1701
int32_t dynProcessUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
1,450,471✔
1702
  int32_t                    lino = 0;
1,450,471✔
1703
  SOperatorInfo*             operator=(SOperatorInfo*) param;
1,450,471✔
1704
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)operator->info;
1,450,471✔
1705

1706
  if (TSDB_CODE_SUCCESS != code) {
1,450,471✔
1707
    operator->pTaskInfo->code = rpcCvtErrCode(code);
×
1708
    if (operator->pTaskInfo->code != code) {
×
1709
      qError("load systable rsp received, error:%s, cvted error:%s", tstrerror(code),
×
1710
             tstrerror(operator->pTaskInfo->code));
1711
    } else {
1712
      qError("load systable rsp received, error:%s", tstrerror(code));
×
1713
    }
1714
    goto _return;
×
1715
  }
1716

1717
  pScanResInfo->vtbScan.pRsp = taosMemoryMalloc(sizeof(SUseDbRsp));
1,450,471✔
1718
  QUERY_CHECK_NULL(pScanResInfo->vtbScan.pRsp, code, lino, _return, terrno)
1,450,471✔
1719

1720
  code = tDeserializeSUseDbRsp(pMsg->pData, (int32_t)pMsg->len, pScanResInfo->vtbScan.pRsp);
1,450,471✔
1721
  QUERY_CHECK_CODE(code, lino, _return);
1,450,471✔
1722

1723
  taosMemoryFreeClear(pMsg->pData);
1,450,471✔
1724

1725
  code = tsem_post(&pScanResInfo->vtbScan.ready);
1,450,471✔
1726
  QUERY_CHECK_CODE(code, lino, _return);
1,450,471✔
1727

1728
  return code;
1,450,471✔
1729
_return:
×
1730
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1731
  return code;
×
1732
}
1733

1734
static int32_t buildDbVgInfoMap(SOperatorInfo* pOperator, SMsgCb* pMsgCb, SName* name, SExecTaskInfo* pTaskInfo, SUseDbOutput* output) {
1,450,471✔
1735
  int32_t                    code = TSDB_CODE_SUCCESS;
1,450,471✔
1736
  int32_t                    lino = 0;
1,450,471✔
1737
  char*                      buf1 = NULL;
1,450,471✔
1738
  SUseDbReq*                 pReq = NULL;
1,450,471✔
1739
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)pOperator->info;
1,450,471✔
1740

1741
  pReq = taosMemoryMalloc(sizeof(SUseDbReq));
1,450,471✔
1742
  QUERY_CHECK_NULL(pReq, code, lino, _return, terrno)
1,450,471✔
1743
  code = tNameGetFullDbName(name, pReq->db);
1,450,471✔
1744
  QUERY_CHECK_CODE(code, lino, _return);
1,450,471✔
1745
  int32_t contLen = tSerializeSUseDbReq(NULL, 0, pReq);
1,450,471✔
1746
  buf1 = taosMemoryCalloc(1, contLen);
1,450,471✔
1747
  QUERY_CHECK_NULL(buf1, code, lino, _return, terrno)
1,450,471✔
1748
  int32_t tempRes = tSerializeSUseDbReq(buf1, contLen, pReq);
1,450,471✔
1749
  if (tempRes < 0) {
1,450,471✔
1750
    QUERY_CHECK_CODE(terrno, lino, _return);
×
1751
  }
1752

1753
  // send the fetch remote task result request
1754
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
1,450,471✔
1755
  QUERY_CHECK_NULL(pMsgSendInfo, code, lino, _return, terrno)
1,450,471✔
1756

1757
  pMsgSendInfo->param = pOperator;
1,450,471✔
1758
  pMsgSendInfo->msgInfo.pData = buf1;
1,450,471✔
1759
  pMsgSendInfo->msgInfo.len = contLen;
1,450,471✔
1760
  pMsgSendInfo->msgType = TDMT_MND_GET_DB_INFO;
1,450,471✔
1761
  pMsgSendInfo->fp = dynProcessUseDbRsp;
1,450,471✔
1762
  pMsgSendInfo->requestId = pTaskInfo->id.queryId;
1,450,471✔
1763

1764
  code = asyncSendMsgToServer(pMsgCb->clientRpc, &pScanResInfo->vtbScan.epSet, NULL, pMsgSendInfo);
1,450,471✔
1765
  QUERY_CHECK_CODE(code, lino, _return);
1,450,471✔
1766

1767
  code = tsem_wait(&pScanResInfo->vtbScan.ready);
1,450,471✔
1768
  QUERY_CHECK_CODE(code, lino, _return);
1,450,471✔
1769

1770
  code = queryBuildUseDbOutput(output, pScanResInfo->vtbScan.pRsp);
1,450,471✔
1771
  QUERY_CHECK_CODE(code, lino, _return);
1,450,471✔
1772

1773
_return:
1,450,471✔
1774
  if (code) {
1,450,471✔
1775
     qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1776
     taosMemoryFree(buf1);
×
1777
  }
1778
  taosMemoryFree(pReq);
1,450,471✔
1779
  tFreeSUsedbRsp(pScanResInfo->vtbScan.pRsp);
1,450,471✔
1780
  taosMemoryFreeClear(pScanResInfo->vtbScan.pRsp);
1,450,471✔
1781
  return code;
1,450,471✔
1782
}
1783

1784
int dynVgInfoComp(const void* lp, const void* rp) {
2,891,468✔
1785
  SVgroupInfo* pLeft = (SVgroupInfo*)lp;
2,891,468✔
1786
  SVgroupInfo* pRight = (SVgroupInfo*)rp;
2,891,468✔
1787
  if (pLeft->hashBegin < pRight->hashBegin) {
2,891,468✔
1788
    return -1;
2,891,468✔
1789
  } else if (pLeft->hashBegin > pRight->hashBegin) {
×
1790
    return 1;
×
1791
  }
1792

1793
  return 0;
×
1794
}
1795

1796
int32_t dynMakeVgArraySortBy(SDBVgInfo* dbInfo, __compar_fn_t sort_func) {
7,639,276✔
1797
  if (NULL == dbInfo) {
7,639,276✔
1798
    return TSDB_CODE_SUCCESS;
×
1799
  }
1800

1801
  if (dbInfo->vgHash && NULL == dbInfo->vgArray) {
7,639,276✔
1802
    int32_t vgSize = taosHashGetSize(dbInfo->vgHash);
1,450,471✔
1803
    dbInfo->vgArray = taosArrayInit(vgSize, sizeof(SVgroupInfo));
1,450,471✔
1804
    if (NULL == dbInfo->vgArray) {
1,450,471✔
1805
      return terrno;
×
1806
    }
1807

1808
    void* pIter = taosHashIterate(dbInfo->vgHash, NULL);
1,450,471✔
1809
    while (pIter) {
4,346,676✔
1810
      if (NULL == taosArrayPush(dbInfo->vgArray, pIter)) {
5,792,410✔
1811
        taosHashCancelIterate(dbInfo->vgHash, pIter);
×
1812
        return terrno;
×
1813
      }
1814

1815
      pIter = taosHashIterate(dbInfo->vgHash, pIter);
2,896,205✔
1816
    }
1817

1818
    taosArraySort(dbInfo->vgArray, sort_func);
1,450,471✔
1819
  }
1820

1821
  return TSDB_CODE_SUCCESS;
7,639,276✔
1822
}
1823

1824
int32_t dynHashValueComp(void const* lp, void const* rp) {
11,895,039✔
1825
  uint32_t*    key = (uint32_t*)lp;
11,895,039✔
1826
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
11,895,039✔
1827

1828
  if (*key < pVg->hashBegin) {
11,895,039✔
1829
    return -1;
×
1830
  } else if (*key > pVg->hashEnd) {
11,895,039✔
1831
    return 1;
4,255,763✔
1832
  }
1833

1834
  return 0;
7,639,276✔
1835
}
1836

1837
int32_t getVgId(SDBVgInfo* dbInfo, char* dbFName, int32_t* vgId, char *tbName) {
7,639,276✔
1838
  int32_t code = 0;
7,639,276✔
1839
  int32_t lino = 0;
7,639,276✔
1840
  code = dynMakeVgArraySortBy(dbInfo, dynVgInfoComp);
7,639,276✔
1841
  QUERY_CHECK_CODE(code, lino, _return);
7,639,276✔
1842

1843
  int32_t vgNum = (int32_t)taosArrayGetSize(dbInfo->vgArray);
7,639,276✔
1844
  if (vgNum <= 0) {
7,639,276✔
1845
    qError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum);
×
1846
    QUERY_CHECK_CODE(code = TSDB_CODE_TSC_DB_NOT_SELECTED, lino, _return);
×
1847
  }
1848

1849
  SVgroupInfo* vgInfo = NULL;
7,639,276✔
1850
  char         tbFullName[TSDB_TABLE_FNAME_LEN];
7,639,276✔
1851
  (void)snprintf(tbFullName, sizeof(tbFullName), "%s.", dbFName);
7,639,276✔
1852
  int32_t offset = (int32_t)strlen(tbFullName);
7,639,276✔
1853

1854
  (void)snprintf(tbFullName + offset, sizeof(tbFullName) - offset, "%s", tbName);
7,639,276✔
1855
  uint32_t hashValue = taosGetTbHashVal(tbFullName, (int32_t)strlen(tbFullName), dbInfo->hashMethod,
15,278,552✔
1856
                                        dbInfo->hashPrefix, dbInfo->hashSuffix);
7,639,276✔
1857

1858
  vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, dynHashValueComp, TD_EQ);
7,639,276✔
1859
  if (NULL == vgInfo) {
7,639,276✔
1860
    qError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName,
×
1861
           (int32_t)taosArrayGetSize(dbInfo->vgArray));
1862
    return TSDB_CODE_CTG_INTERNAL_ERROR;
×
1863
  }
1864

1865
  *vgId = vgInfo->vgId;
7,639,276✔
1866

1867
_return:
7,639,276✔
1868
  return code;
7,639,276✔
1869
}
1870

1871
bool colNeedScan(SOperatorInfo* pOperator, col_id_t colId) {
49,037,246✔
1872
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
49,037,246✔
1873
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
49,037,246✔
1874
  SArray *                   pColList = pVtbScan->readColList;
49,037,246✔
1875
  if (pVtbScan->scanAllCols) {
49,037,246✔
1876
    return true;
5,404,959✔
1877
  }
1878
  for (int32_t i = 0; i < taosArrayGetSize(pColList); i++) {
225,701,817✔
1879
    if (colId == *(col_id_t*)taosArrayGet(pColList, i)) {
196,798,242✔
1880
      return true;
14,728,712✔
1881
    }
1882
  }
1883
  return false;
28,903,575✔
1884
}
1885

1886
int32_t getDbVgInfo(SOperatorInfo* pOperator, SName *name, SDBVgInfo **dbVgInfo) {
20,159,089✔
1887
  int32_t                    code = TSDB_CODE_SUCCESS;
20,159,089✔
1888
  int32_t                    line = 0;
20,159,089✔
1889
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
20,159,089✔
1890
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
20,159,089✔
1891
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
20,159,089✔
1892
  SMsgCb*                    pMsgCb = pVtbScan->pMsgCb;
20,159,089✔
1893
  SUseDbOutput*              output = NULL;
20,159,089✔
1894
  SUseDbOutput**             find = (SUseDbOutput**)taosHashGet(pInfo->vtbScan.dbVgInfoMap, name->dbname, strlen(name->dbname));
20,159,089✔
1895

1896
  QRY_PARAM_CHECK(dbVgInfo);
20,159,089✔
1897

1898
  if (find == NULL) {
20,159,089✔
1899
    output = taosMemoryMalloc(sizeof(SUseDbOutput));
1,450,471✔
1900
    code = buildDbVgInfoMap(pOperator, pMsgCb, name, pTaskInfo, output);
1,450,471✔
1901
    QUERY_CHECK_CODE(code, line, _return);
1,450,471✔
1902
    code = taosHashPut(pInfo->vtbScan.dbVgInfoMap, name->dbname, strlen(name->dbname), &output, POINTER_BYTES);
1,450,471✔
1903
    QUERY_CHECK_CODE(code, line, _return);
1,450,471✔
1904
  } else {
1905
    output = *find;
18,708,618✔
1906
  }
1907

1908
  *dbVgInfo = output->dbVgroup;
20,159,089✔
1909
  return code;
20,159,089✔
1910
_return:
×
1911
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
1912
  freeUseDbOutput(output);
×
1913
  return code;
×
1914
}
1915

1916
int32_t extractColRefName(const char *colref, char **refDb, char** refTb, char** refCol) {
20,159,089✔
1917
  int32_t     code = TSDB_CODE_SUCCESS;
20,159,089✔
1918
  int32_t     line = 0;
20,159,089✔
1919

1920
  const char *first_dot = strchr(colref, '.');
20,159,089✔
1921
  QUERY_CHECK_NULL(first_dot, code, line, _return, terrno)
20,159,089✔
1922

1923
  const char *second_dot = strchr(first_dot + 1, '.');
20,159,089✔
1924
  QUERY_CHECK_NULL(second_dot, code, line, _return, terrno)
20,159,089✔
1925

1926
  size_t db_len = first_dot - colref;
20,159,089✔
1927
  size_t table_len = second_dot - first_dot - 1;
20,159,089✔
1928
  size_t col_len = strlen(second_dot + 1);
20,159,089✔
1929

1930
  *refDb = taosMemoryMalloc(db_len + 1);
20,159,089✔
1931
  *refTb = taosMemoryMalloc(table_len + 1);
20,159,089✔
1932
  *refCol = taosMemoryMalloc(col_len + 1);
20,159,089✔
1933
  QUERY_CHECK_NULL(*refDb, code, line, _return, terrno)
20,159,089✔
1934
  QUERY_CHECK_NULL(*refTb, code, line, _return, terrno)
20,159,089✔
1935
  QUERY_CHECK_NULL(*refCol, code, line, _return, terrno)
20,159,089✔
1936

1937
  tstrncpy(*refDb, colref, db_len + 1);
20,159,089✔
1938
  tstrncpy(*refTb, first_dot + 1, table_len + 1);
20,159,089✔
1939
  tstrncpy(*refCol, second_dot + 1, col_len + 1);
20,159,089✔
1940

1941
  return TSDB_CODE_SUCCESS;
20,159,089✔
1942
_return:
×
1943
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
1944
  if (*refDb) {
×
1945
    taosMemoryFree(*refDb);
×
1946
    *refDb = NULL;
×
1947
  }
1948
  if (*refTb) {
×
1949
    taosMemoryFree(*refTb);
×
1950
    *refTb = NULL;
×
1951
  }
1952
  if (*refCol) {
×
1953
    taosMemoryFree(*refCol);
×
1954
    *refCol = NULL;
×
1955
  }
1956
  return code;
×
1957
}
1958

1959
bool tableInfoNeedCollect(char *dbName, char *tbName, char *expectDbName, char *expectTbName) {
162,410,945✔
1960
  if (strncmp(varDataVal(tbName), expectTbName, varDataLen(tbName)) == 0 &&
162,410,945✔
1961
      strlen(expectTbName) == varDataLen(tbName) &&
80,269,073✔
1962
      strncmp(varDataVal(dbName), expectDbName, varDataLen(dbName)) == 0 &&
80,269,073✔
1963
      strlen(expectDbName) == varDataLen(dbName)) {
80,269,073✔
1964
    return true;
80,269,073✔
1965
  }
1966
  return false;
82,141,872✔
1967
}
1968

1969
int32_t getColRefInfo(SColRefInfo *pInfo, SArray* pDataBlock, int32_t index) {
80,269,073✔
1970
  int32_t          code = TSDB_CODE_SUCCESS;
80,269,073✔
1971
  int32_t          line = 0;
80,269,073✔
1972

1973
  SColumnInfoData *pColNameCol = taosArrayGet(pDataBlock, 3);
80,269,073✔
1974
  SColumnInfoData *pUidCol = taosArrayGet(pDataBlock, 4);
80,269,073✔
1975
  SColumnInfoData *pColIdCol = taosArrayGet(pDataBlock, 5);
80,269,073✔
1976
  SColumnInfoData *pRefCol = taosArrayGet(pDataBlock, 6);
80,269,073✔
1977
  SColumnInfoData *pVgIdCol = taosArrayGet(pDataBlock, 7);
80,269,073✔
1978
  SColumnInfoData *pRefVerCol = taosArrayGet(pDataBlock, 8);
80,269,073✔
1979

1980
  QUERY_CHECK_NULL(pColNameCol, code, line, _return, terrno)
80,269,073✔
1981
  QUERY_CHECK_NULL(pUidCol, code, line, _return, terrno)
80,269,073✔
1982
  QUERY_CHECK_NULL(pColIdCol, code, line, _return, terrno)
80,269,073✔
1983
  QUERY_CHECK_NULL(pRefCol, code, line, _return, terrno)
80,269,073✔
1984
  QUERY_CHECK_NULL(pVgIdCol, code, line, _return, terrno)
80,269,073✔
1985
  QUERY_CHECK_NULL(pRefVerCol, code, line, _return, terrno)
80,269,073✔
1986

1987
  if (colDataIsNull_s(pRefCol, index)) {
160,538,146✔
1988
    pInfo->colrefName = NULL;
31,225,592✔
1989
  } else {
1990
    pInfo->colrefName = taosMemoryCalloc(varDataTLen(colDataGetData(pRefCol, index)), 1);
49,043,481✔
1991
    QUERY_CHECK_NULL(pInfo->colrefName, code, line, _return, terrno)
49,043,481✔
1992
    memcpy(pInfo->colrefName, varDataVal(colDataGetData(pRefCol, index)), varDataLen(colDataGetData(pRefCol, index)));
49,043,481✔
1993
    pInfo->colrefName[varDataLen(colDataGetData(pRefCol, index))] = 0;
49,043,481✔
1994
  }
1995

1996
  pInfo->colName = taosMemoryCalloc(varDataTLen(colDataGetData(pColNameCol, index)), 1);
80,269,073✔
1997
  QUERY_CHECK_NULL(pInfo->colName, code, line, _return, terrno)
80,269,073✔
1998
  memcpy(pInfo->colName, varDataVal(colDataGetData(pColNameCol, index)), varDataLen(colDataGetData(pColNameCol, index)));
80,269,073✔
1999
  pInfo->colName[varDataLen(colDataGetData(pColNameCol, index))] = 0;
80,269,073✔
2000

2001
  if (!colDataIsNull_s(pUidCol, index)) {
160,538,146✔
2002
    GET_TYPED_DATA(pInfo->uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, index), 0);
80,269,073✔
2003
  }
2004
  if (!colDataIsNull_s(pColIdCol, index)) {
160,538,146✔
2005
    GET_TYPED_DATA(pInfo->colId, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pColIdCol, index), 0);
49,043,481✔
2006
  }
2007
  if (!colDataIsNull_s(pVgIdCol, index)) {
160,538,146✔
2008
    GET_TYPED_DATA(pInfo->vgId, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pVgIdCol, index), 0);
80,269,073✔
2009
  }
2010

2011
_return:
×
2012
  return code;
80,269,073✔
2013
}
2014

2015
int32_t processOrgTbVg(SVtbScanDynCtrlInfo* pVtbScan, SExecTaskInfo* pTaskInfo, int32_t rversion) {
1,469,182✔
2016
  int32_t                    code = TSDB_CODE_SUCCESS;
1,469,182✔
2017
  int32_t                    line = 0;
1,469,182✔
2018

2019
  if (pTaskInfo->pStreamRuntimeInfo == NULL) {
1,469,182✔
2020
    return code;
1,339,303✔
2021
  }
2022

2023
  if (pVtbScan->existOrgTbVg == NULL) {
129,879✔
2024
    pVtbScan->existOrgTbVg = pVtbScan->curOrgTbVg;
×
2025
    pVtbScan->curOrgTbVg = NULL;
×
2026
  }
2027

2028
  if (pVtbScan->curOrgTbVg != NULL) {
129,879✔
2029
    // which means rversion has changed
2030
    void*   pCurIter = NULL;
9,898✔
2031
    SArray* tmpArray = NULL;
9,898✔
2032
    while ((pCurIter = taosHashIterate(pVtbScan->curOrgTbVg, pCurIter))) {
28,611✔
2033
      int32_t* vgId = (int32_t*)taosHashGetKey(pCurIter, NULL);
18,713✔
2034
      if (taosHashGet(pVtbScan->existOrgTbVg, vgId, sizeof(int32_t)) == NULL) {
18,713✔
2035
        if (tmpArray == NULL) {
2,365✔
2036
          tmpArray = taosArrayInit(1, sizeof(int32_t));
2,365✔
2037
          QUERY_CHECK_NULL(tmpArray, code, line, _return, terrno)
2,365✔
2038
        }
2039
        QUERY_CHECK_NULL(taosArrayPush(tmpArray, vgId), code, line, _return, terrno)
2,365✔
2040
      }
2041
    }
2042
    if (tmpArray == NULL) {
9,898✔
2043
      return TSDB_CODE_SUCCESS;
7,533✔
2044
    }
2045
    if (tmpArray != NULL && pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addVgIds == NULL) {
2,365✔
2046
      SArray* expiredInfo = atomic_load_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo);
2,365✔
2047
      if (expiredInfo && expiredInfo == atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo, expiredInfo, NULL)) {
2,365✔
2048
        for (int32_t i = 0; i < taosArrayGetSize(expiredInfo); i++) {
×
2049
          SStreamTaskAddr* vgInfo = (SStreamTaskAddr*)taosArrayGet(expiredInfo, i);
×
2050
          QUERY_CHECK_NULL(taosArrayPush(tmpArray, &vgInfo->nodeId), code, line, _return, terrno)
×
2051
        }
2052
        taosArrayDestroy(expiredInfo);
×
2053
      }
2054
      if (atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addVgIds, NULL, tmpArray)) {
2,365✔
2055
        taosArrayDestroy(tmpArray);
×
2056
      }
2057
    }
2058
    atomic_store_64(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.uid, (int64_t)(pVtbScan->isSuperTable ? pVtbScan->suid : pVtbScan->uid));
2,365✔
2059
    (void)atomic_val_compare_exchange_8(pTaskInfo->pStreamRuntimeInfo->vtableDeployGot, 0, 1);
2,365✔
2060
    taosHashClear(pVtbScan->curOrgTbVg);
2,365✔
2061
    pVtbScan->needRedeploy = true;
2,365✔
2062
    pVtbScan->rversion = rversion;
2,365✔
2063
    return TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY;
2,365✔
2064
  }
2065
  return code;
119,981✔
2066
_return:
×
2067
  qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
2068
  return code;
×
2069
}
2070

2071
int32_t getVgIdFromColref(SOperatorInfo* pOperator, const char* colRef, int32_t* vgId) {
25,418✔
2072
  int32_t                    code =TSDB_CODE_SUCCESS;
25,418✔
2073
  int32_t                    line = 0;
25,418✔
2074
  char*                      refDbName = NULL;
25,418✔
2075
  char*                      refTbName = NULL;
25,418✔
2076
  char*                      refColName = NULL;
25,418✔
2077
  SDBVgInfo*                 dbVgInfo = NULL;
25,418✔
2078
  SName                      name = {0};
25,418✔
2079
  char                       dbFname[TSDB_DB_FNAME_LEN] = {0};
25,418✔
2080
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
25,418✔
2081

2082
  code = extractColRefName(colRef, &refDbName, &refTbName, &refColName);
25,418✔
2083
  QUERY_CHECK_CODE(code, line, _return);
25,418✔
2084

2085
  toName(pInfo->vtbScan.acctId, refDbName, refTbName, &name);
25,418✔
2086

2087
  code = getDbVgInfo(pOperator, &name, &dbVgInfo);
25,418✔
2088
  QUERY_CHECK_CODE(code, line, _return);
25,418✔
2089

2090
  code = tNameGetFullDbName(&name, dbFname);
25,418✔
2091
  QUERY_CHECK_CODE(code, line, _return);
25,418✔
2092

2093
  code = getVgId(dbVgInfo, dbFname, vgId, name.tname);
25,418✔
2094
  QUERY_CHECK_CODE(code, line, _return);
25,418✔
2095

2096
_return:
25,418✔
2097
  if (code) {
25,418✔
2098
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
2099
  }
2100
  taosMemoryFree(refDbName);
25,418✔
2101
  taosMemoryFree(refTbName);
25,418✔
2102
  taosMemoryFree(refColName);
25,418✔
2103
  return code;
25,418✔
2104
}
2105

2106
static int32_t generateTagArrayByTagBlockAndSave(SHashObj* vtbUidTagListMap, tb_uid_t uid, SSDataBlock *pTagVal, int32_t rowIdx) {
18,224✔
2107
  int32_t code = TSDB_CODE_SUCCESS;
18,224✔
2108
  int32_t line = 0;
18,224✔
2109
  STagVal tagVal = {0};
18,224✔
2110
  // last col is uid
2111

2112
  SArray* pTagList = taosArrayInit(1, sizeof(STagVal));
18,224✔
2113
  QUERY_CHECK_NULL(pTagList, code, line, _return, terrno)
18,224✔
2114

2115
  for (int32_t k = 0; k < taosArrayGetSize(pTagVal->pDataBlock) - 1; k++) {
36,448✔
2116
    SColumnInfoData *pTagCol = taosArrayGet(pTagVal->pDataBlock, k);
18,224✔
2117
    QUERY_CHECK_NULL(pTagCol, code, line, _return, terrno)
18,224✔
2118
    tagVal.type = pTagCol->info.type;
18,224✔
2119
    tagVal.cid = pTagCol->info.colId;
18,224✔
2120
    if (!colDataIsNull_s(pTagCol, rowIdx)) {
36,448✔
2121
      char*   pData = colDataGetData(pTagCol, rowIdx);
18,224✔
2122
      if (IS_VAR_DATA_TYPE(pTagCol->info.type)) {
18,224✔
2123
        tagVal.nData = varDataLen(pData);
×
2124
        tagVal.pData = taosMemoryMalloc(tagVal.nData);
×
2125
        QUERY_CHECK_NULL(tagVal.pData, code, line, _return, terrno)
×
2126
        memcpy(tagVal.pData, varDataVal(pData), varDataLen(pData));
×
2127
        QUERY_CHECK_NULL(taosArrayPush(pTagList, &tagVal), code, line, _return, terrno)
×
2128
      } else {
2129
        memcpy(&tagVal.i64, pData, tDataTypes[pTagCol->info.type].bytes);
18,224✔
2130
        QUERY_CHECK_NULL(taosArrayPush(pTagList, &tagVal), code, line, _return, terrno)
36,448✔
2131
      }
2132
    } else {
2133
      tagVal.pData = NULL;
×
2134
      tagVal.nData = 0;
×
2135
      QUERY_CHECK_NULL(taosArrayPush(pTagList, &tagVal), code, line, _return, terrno)
×
2136
    }
2137
    tagVal = (STagVal){0};
18,224✔
2138
  }
2139
  code = taosHashPut(vtbUidTagListMap, &uid, sizeof(uid), &pTagList, POINTER_BYTES);
18,224✔
2140
  QUERY_CHECK_CODE(code, line, _return);
18,224✔
2141

2142
  return code;
18,224✔
2143
_return:
×
2144
  if (tagVal.pData) {
×
2145
    taosMemoryFreeClear(tagVal.pData);
×
2146
  }
2147
  if (pTagList) {
×
2148
    taosArrayDestroyEx(pTagList, destroyTagVal);
×
2149
  }
2150
  qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
2151
  return code;
×
2152
}
2153

2154
int32_t virtualTableScanProcessColRefInfo(SOperatorInfo* pOperator, SArray* pColRefInfo, tb_uid_t* uid, int32_t* vgId) {
5,110,343✔
2155
  int32_t                    code = TSDB_CODE_SUCCESS;
5,110,343✔
2156
  int32_t                    line = 0;
5,110,343✔
2157
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
5,110,343✔
2158
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
5,110,343✔
2159
  SDBVgInfo*                 dbVgInfo = NULL;
5,110,343✔
2160

2161
  for (int32_t j = 0; j < taosArrayGetSize(pColRefInfo); j++) {
85,367,376✔
2162
    SColRefInfo *pKV = (SColRefInfo*)taosArrayGet(pColRefInfo, j);
80,257,033✔
2163
    *uid = pKV->uid;
80,257,033✔
2164
    *vgId = pKV->vgId;
80,257,033✔
2165
    if (pKV->colrefName != NULL && colNeedScan(pOperator, pKV->colId)) {
80,257,033✔
2166
      char*   refDbName = NULL;
20,133,671✔
2167
      char*   refTbName = NULL;
20,133,671✔
2168
      char*   refColName = NULL;
20,133,671✔
2169
      SName   name = {0};
20,133,671✔
2170
      char    dbFname[TSDB_DB_FNAME_LEN] = {0};
20,133,671✔
2171
      char    orgTbFName[TSDB_TABLE_FNAME_LEN] = {0};
20,133,671✔
2172

2173
      code = extractColRefName(pKV->colrefName, &refDbName, &refTbName, &refColName);
20,133,671✔
2174
      QUERY_CHECK_CODE(code, line, _return);
20,133,671✔
2175

2176
      toName(pInfo->vtbScan.acctId, refDbName, refTbName, &name);
20,133,671✔
2177

2178
      code = getDbVgInfo(pOperator, &name, &dbVgInfo);
20,133,671✔
2179
      QUERY_CHECK_CODE(code, line, _return);
20,133,671✔
2180
      code = tNameGetFullDbName(&name, dbFname);
20,133,671✔
2181
      QUERY_CHECK_CODE(code, line, _return);
20,133,671✔
2182
      code = tNameGetFullTableName(&name, orgTbFName);
20,133,671✔
2183
      QUERY_CHECK_CODE(code, line, _return);
20,133,671✔
2184

2185
      void *pVal = taosHashGet(pVtbScan->otbNameToOtbInfoMap, orgTbFName, sizeof(orgTbFName));
20,133,671✔
2186
      if (!pVal) {
20,133,671✔
2187
        SOrgTbInfo orgTbInfo = {0};
7,613,858✔
2188
        code = getVgId(dbVgInfo, dbFname, &orgTbInfo.vgId, name.tname);
7,613,858✔
2189
        QUERY_CHECK_CODE(code, line, _return);
7,613,858✔
2190
        tstrncpy(orgTbInfo.tbName, orgTbFName, sizeof(orgTbInfo.tbName));
7,613,858✔
2191
        orgTbInfo.colMap = taosArrayInit(10, sizeof(SColIdNameKV));
7,613,858✔
2192
        QUERY_CHECK_NULL(orgTbInfo.colMap, code, line, _return, terrno)
7,613,858✔
2193
        SColIdNameKV colIdNameKV = {0};
7,613,858✔
2194
        colIdNameKV.colId = pKV->colId;
7,613,858✔
2195
        tstrncpy(colIdNameKV.colName, refColName, sizeof(colIdNameKV.colName));
7,613,858✔
2196
        QUERY_CHECK_NULL(taosArrayPush(orgTbInfo.colMap, &colIdNameKV), code, line, _return, terrno)
15,227,716✔
2197
        code = taosHashPut(pVtbScan->otbNameToOtbInfoMap, orgTbFName, sizeof(orgTbFName), &orgTbInfo, sizeof(orgTbInfo));
7,613,858✔
2198
        QUERY_CHECK_CODE(code, line, _return);
7,613,858✔
2199
      } else {
2200
        SOrgTbInfo *tbInfo = (SOrgTbInfo *)pVal;
12,519,813✔
2201
        SColIdNameKV colIdNameKV = {0};
12,519,813✔
2202
        colIdNameKV.colId = pKV->colId;
12,519,813✔
2203
        tstrncpy(colIdNameKV.colName, refColName, sizeof(colIdNameKV.colName));
12,519,813✔
2204
        QUERY_CHECK_NULL(taosArrayPush(tbInfo->colMap, &colIdNameKV), code, line, _return, terrno)
25,039,626✔
2205
      }
2206
      taosMemoryFree(refDbName);
20,133,671✔
2207
      taosMemoryFree(refTbName);
20,133,671✔
2208
      taosMemoryFree(refColName);
20,133,671✔
2209
    }
2210
  }
2211

2212
_return:
5,110,343✔
2213
  if (code) {
5,110,343✔
2214
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
2215
  }
2216
  return code;
5,110,343✔
2217
}
2218

2219
static int32_t getTagBlockAndProcess(SOperatorInfo* pOperator, bool hasPartition) {
4,556✔
2220
  int32_t                    code = TSDB_CODE_SUCCESS;
4,556✔
2221
  int32_t                    line = 0;
4,556✔
2222
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
4,556✔
2223
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
4,556✔
2224
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
4,556✔
2225
  SArray*                    pColRefArray = NULL;
4,556✔
2226
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[0];
4,556✔
2227
  SOperatorInfo*             pTagScanOp = pOperator->pDownstream[1];
4,556✔
2228

2229
  pVtbScan->vtbUidTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
4,556✔
2230
  QUERY_CHECK_NULL(pVtbScan->vtbUidTagListMap, code, line, _return, terrno)
4,556✔
2231
  taosHashSetFreeFp(pVtbScan->vtbUidTagListMap, destroyTagList);
4,556✔
2232
  if (hasPartition) {
4,556✔
2233
    pVtbScan->vtbUidToGroupIdMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
×
2234
    pVtbScan->vtbGroupIdTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
×
2235
    QUERY_CHECK_NULL(pVtbScan->vtbUidToGroupIdMap, code, line, _return, terrno)
×
2236
    QUERY_CHECK_NULL(pVtbScan->vtbGroupIdTagListMap, code, line, _return, terrno)
×
2237
    taosHashSetFreeFp(pVtbScan->vtbGroupIdTagListMap, destroyVtbUidTagListMap);
×
2238
  }
2239

2240
  while (true) {
9,112✔
2241
    SSDataBlock *pTagVal = NULL;
13,668✔
2242
    code = pTagScanOp->fpSet.getNextFn(pTagScanOp, &pTagVal);
13,668✔
2243
    QUERY_CHECK_CODE(code, line, _return);
13,668✔
2244
    if (pTagVal == NULL) {
13,668✔
2245
      break;
4,556✔
2246
    }
2247
    SHashObj *vtbUidTagListMap = NULL;
9,112✔
2248
    if (hasPartition) {
9,112✔
2249
      void* pIter = taosHashGet(pVtbScan->vtbGroupIdTagListMap, &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId));
×
2250
      if (pIter) {
×
2251
        vtbUidTagListMap = *(SHashObj**)pIter;
×
2252
      } else {
2253
        vtbUidTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
×
2254
        QUERY_CHECK_NULL(vtbUidTagListMap, code, line, _return, terrno)
×
2255
        taosHashSetFreeFp(vtbUidTagListMap, destroyTagList);
×
2256

2257
        code = taosHashPut(pVtbScan->vtbGroupIdTagListMap, &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId), &vtbUidTagListMap, POINTER_BYTES);
×
2258
        QUERY_CHECK_CODE(code, line, _return);
×
2259
      }
2260
    } else {
2261
      vtbUidTagListMap = pVtbScan->vtbUidTagListMap;
9,112✔
2262
    }
2263

2264
    SColumnInfoData *pUidCol = taosArrayGetLast(pTagVal->pDataBlock);
9,112✔
2265
    QUERY_CHECK_NULL(pUidCol, code, line, _return, terrno)
9,112✔
2266
    for (int32_t i = 0; i < pTagVal->info.rows; i++) {
27,336✔
2267
      tb_uid_t uid = 0;
18,224✔
2268
      if (!colDataIsNull_s(pUidCol, i)) {
36,448✔
2269
        GET_TYPED_DATA(uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, i), 0);
18,224✔
2270
        QUERY_CHECK_CODE(code, line, _return);
18,224✔
2271
      }
2272

2273
      code = generateTagArrayByTagBlockAndSave(vtbUidTagListMap, uid, pTagVal, i);
18,224✔
2274
      QUERY_CHECK_CODE(code, line, _return);
18,224✔
2275

2276
      if (hasPartition) {
18,224✔
2277
        code = taosHashPut(pVtbScan->vtbUidToGroupIdMap, &uid, sizeof(uid), &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId));
×
2278
        QUERY_CHECK_CODE(code, line, _return);
×
2279
      }
2280
    }
2281
  }
2282

2283
  return code;
4,556✔
2284

2285
_return:
×
2286
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
2287
  return code;
×
2288
}
2289

2290
static int32_t processChildTableListAndGenerateOrgTbInfoMap(SOperatorInfo* pOperator) {
4,556✔
2291
  int32_t                    code = TSDB_CODE_SUCCESS;
4,556✔
2292
  int32_t                    line = 0;
4,556✔
2293
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
4,556✔
2294
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
4,556✔
2295
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
4,556✔
2296
  SArray*                    pColRefArray = NULL;
4,556✔
2297
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[0];
4,556✔
2298
  SOperatorInfo*             pTagScanOp = pOperator->pDownstream[1];
4,556✔
2299

2300
  pVtbScan->vtbUidToVgIdMapMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
4,556✔
2301
  QUERY_CHECK_NULL(pVtbScan->vtbUidToVgIdMapMap, code, line, _return, terrno)
4,556✔
2302

2303
  for (int32_t i = 0; i < taosArrayGetSize(pVtbScan->childTableList); i++) {
22,780✔
2304
    SHashObj* otbVgIdToOtbInfoArrayMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
18,224✔
2305
    QUERY_CHECK_NULL(otbVgIdToOtbInfoArrayMap, code, line, _return, terrno)
18,224✔
2306

2307
    SArray* pColRefInfo = (SArray*)taosArrayGetP(pVtbScan->childTableList, i);
18,224✔
2308
    QUERY_CHECK_NULL(pColRefInfo, code, line, _return, terrno)
18,224✔
2309

2310
    tb_uid_t uid = 0;
18,224✔
2311
    int32_t  vgId = 0;
18,224✔
2312
    code = virtualTableScanProcessColRefInfo(pOperator, pColRefInfo, &uid, &vgId);
18,224✔
2313
    QUERY_CHECK_CODE(code, line, _return);
18,224✔
2314

2315
    size_t len = 0;
18,224✔
2316
    void*  pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, NULL);
18,224✔
2317
    while (pOrgTbInfo != NULL) {
59,228✔
2318
      char*       key = taosHashGetKey(pOrgTbInfo, &len);
41,004✔
2319
      SOrgTbInfo* orgTbInfo = (SOrgTbInfo*)pOrgTbInfo;
41,004✔
2320

2321
      void* pIter = taosHashGet(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId));
41,004✔
2322
      if (!pIter) {
41,004✔
2323
        SArray* pOrgTbInfoArray = taosArrayInit(1, sizeof(SOrgTbInfo));
27,336✔
2324
        QUERY_CHECK_NULL(pOrgTbInfoArray, code, line, _return, terrno)
27,336✔
2325
        QUERY_CHECK_NULL(taosArrayPush(pOrgTbInfoArray, orgTbInfo), code, line, _return, terrno)
54,672✔
2326
        code = taosHashPut(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId), &pOrgTbInfoArray, POINTER_BYTES);
27,336✔
2327
        QUERY_CHECK_CODE(code, line, _return);
27,336✔
2328
      } else {
2329
        SArray* pOrgTbInfoArray = *(SArray**)pIter;
13,668✔
2330
        QUERY_CHECK_NULL(pOrgTbInfoArray, code, line, _return, terrno)
13,668✔
2331
        QUERY_CHECK_NULL(taosArrayPush(pOrgTbInfoArray, orgTbInfo), code, line, _return, terrno)
13,668✔
2332
      }
2333

2334
      pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, pOrgTbInfo);
41,004✔
2335

2336
      code = taosHashRemove(pVtbScan->otbNameToOtbInfoMap, key, len);
41,004✔
2337
      QUERY_CHECK_CODE(code, line, _return);
41,004✔
2338
    }
2339

2340
    code = taosHashPut(pVtbScan->vtbUidToVgIdMapMap, &uid, sizeof(uid), &otbVgIdToOtbInfoArrayMap, POINTER_BYTES);
18,224✔
2341
    QUERY_CHECK_CODE(code, line, _return);
18,224✔
2342
  }
2343

2344
  return code;
4,556✔
2345
_return:
×
2346
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
2347
  return code;
×
2348
}
2349

2350
static int32_t buildOrgTbInfoSingle(SOperatorInfo* pOperator, bool hasPartition) {
4,556✔
2351
  int32_t                    code = TSDB_CODE_SUCCESS;
4,556✔
2352
  int32_t                    line = 0;
4,556✔
2353

2354
  code = processChildTableListAndGenerateOrgTbInfoMap(pOperator);
4,556✔
2355
  QUERY_CHECK_CODE(code, line, _return);
4,556✔
2356

2357
  // process tag
2358
  code = getTagBlockAndProcess(pOperator, hasPartition);
4,556✔
2359
  QUERY_CHECK_CODE(code, line, _return);
4,556✔
2360

2361
  return code;
4,556✔
2362
_return:
×
2363
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
2364
  return code;
×
2365
}
2366

2367
static int32_t buildOrgTbInfoBatch(SOperatorInfo* pOperator, bool hasPartition) {
198,033✔
2368
  int32_t                    code = TSDB_CODE_SUCCESS;
198,033✔
2369
  int32_t                    line = 0;
198,033✔
2370
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
198,033✔
2371
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
198,033✔
2372
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
198,033✔
2373
  SArray*                    pColRefArray = NULL;
198,033✔
2374
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[0];
198,033✔
2375
  SOperatorInfo*             pTagScanOp = pOperator->pDownstream[1];
198,033✔
2376

2377
  if (hasPartition) {
198,033✔
2378
    pVtbScan->vtbUidToGroupIdMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
4,612✔
2379
    pVtbScan->vtbGroupIdTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
4,612✔
2380
    pVtbScan->vtbGroupIdToVgIdMapMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
4,612✔
2381

2382
    QUERY_CHECK_NULL(pVtbScan->vtbGroupIdToVgIdMapMap, code, line, _return, terrno)
4,612✔
2383
    QUERY_CHECK_NULL(pVtbScan->vtbUidToGroupIdMap, code, line, _return, terrno)
4,612✔
2384
    QUERY_CHECK_NULL(pVtbScan->vtbGroupIdTagListMap, code, line, _return, terrno)
4,612✔
2385
    taosHashSetFreeFp(pVtbScan->vtbGroupIdToVgIdMapMap, destroyOtbVgIdToOtbInfoArrayMap);
4,612✔
2386
  } else {
2387
    pVtbScan->otbVgIdToOtbInfoArrayMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
193,421✔
2388
    QUERY_CHECK_NULL(pVtbScan->otbVgIdToOtbInfoArrayMap, code, line, _return, terrno)
193,421✔
2389
  }
2390

2391
  while (true && hasPartition) {
216,481✔
2392
    SSDataBlock* pTagVal = NULL;
23,060✔
2393
    code = pTagScanOp->fpSet.getNextFn(pTagScanOp, &pTagVal);
23,060✔
2394
    QUERY_CHECK_CODE(code, line, _return);
23,060✔
2395
    if (pTagVal == NULL) {
23,060✔
2396
      break;
4,612✔
2397
    }
2398

2399
    SColumnInfoData *pUidCol = taosArrayGetLast(pTagVal->pDataBlock);
18,448✔
2400
    QUERY_CHECK_NULL(pUidCol, code, line, _return, terrno)
18,448✔
2401
    for (int32_t i = 0; i < pTagVal->info.rows; i++) {
36,896✔
2402
      tb_uid_t uid = 0;
18,448✔
2403
      if (!colDataIsNull_s(pUidCol, i)) {
36,896✔
2404
        GET_TYPED_DATA(uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, i), 0);
18,448✔
2405
        QUERY_CHECK_CODE(code, line, _return);
18,448✔
2406
      }
2407
      code = taosHashPut(pVtbScan->vtbUidToGroupIdMap, &uid, sizeof(uid), &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId));
18,448✔
2408
      QUERY_CHECK_CODE(code, line, _return);
18,448✔
2409
    }
2410
    code = taosHashPut(pVtbScan->vtbGroupIdTagListMap, &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId), NULL, 0);
18,448✔
2411
    QUERY_CHECK_CODE(code, line, _return);
18,448✔
2412
  }
2413

2414
  for (int32_t i = 0; i < taosArrayGetSize(pVtbScan->childTableList); i++) {
990,165✔
2415
    SArray* pColRefInfo = (SArray*)taosArrayGetP(pVtbScan->childTableList, i);
792,132✔
2416
    QUERY_CHECK_NULL(pColRefInfo, code, line, _return, terrno)
792,132✔
2417
    tb_uid_t uid = 0;
792,132✔
2418
    int32_t  vgId = 0;
792,132✔
2419
    code = virtualTableScanProcessColRefInfo(pOperator, pColRefInfo, &uid, &vgId);
792,132✔
2420
    QUERY_CHECK_CODE(code, line, _return);
792,132✔
2421

2422
    SHashObj* otbVgIdToOtbInfoArrayMap = NULL;
792,132✔
2423
    if (hasPartition) {
792,132✔
2424
      uint64_t* groupId = (uint64_t *)taosHashGet(pVtbScan->vtbUidToGroupIdMap, &uid, sizeof(uid));
18,448✔
2425
      QUERY_CHECK_NULL(groupId, code, line, _return, terrno)
18,448✔
2426

2427
      void* pHashIter = taosHashGet(pVtbScan->vtbGroupIdToVgIdMapMap, groupId, sizeof(*groupId));
18,448✔
2428
      if (pHashIter) {
18,448✔
2429
        otbVgIdToOtbInfoArrayMap = *(SHashObj**)pHashIter;
×
2430
      } else {
2431
        otbVgIdToOtbInfoArrayMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
18,448✔
2432
        QUERY_CHECK_NULL(otbVgIdToOtbInfoArrayMap, code, line, _return, terrno)
18,448✔
2433
        code = taosHashPut(pVtbScan->vtbGroupIdToVgIdMapMap, groupId, sizeof(*groupId), &otbVgIdToOtbInfoArrayMap, POINTER_BYTES);
18,448✔
2434
        QUERY_CHECK_CODE(code, line, _return);
18,448✔
2435
      }
2436
    } else {
2437
      otbVgIdToOtbInfoArrayMap = pVtbScan->otbVgIdToOtbInfoArrayMap;
773,684✔
2438
    }
2439

2440
    size_t len = 0;
792,132✔
2441
    void*  pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, NULL);
792,132✔
2442
    while (pOrgTbInfo != NULL) {
2,110,399✔
2443
      char*       key = taosHashGetKey(pOrgTbInfo, &len);
1,318,267✔
2444
      SOrgTbInfo* orgTbInfo = (SOrgTbInfo*)pOrgTbInfo;
1,318,267✔
2445
      void* pIter = taosHashGet(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId));
1,318,267✔
2446
      if (!pIter) {
1,318,267✔
2447
        SArray* pOrgTbInfoArray = taosArrayInit(1, sizeof(SOrgTbInfo));
407,596✔
2448
        QUERY_CHECK_NULL(pOrgTbInfoArray, code, line, _return, terrno)
407,596✔
2449
        QUERY_CHECK_NULL(taosArrayPush(pOrgTbInfoArray, orgTbInfo), code, line, _return, terrno)
815,192✔
2450
        code = taosHashPut(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId), &pOrgTbInfoArray, POINTER_BYTES);
407,596✔
2451
        QUERY_CHECK_CODE(code, line, _return);
407,596✔
2452
      } else {
2453
        SArray* pOrgTbInfoArray = *(SArray**)pIter;
910,671✔
2454
        QUERY_CHECK_NULL(pOrgTbInfoArray, code, line, _return, terrno)
910,671✔
2455
        QUERY_CHECK_NULL(taosArrayPush(pOrgTbInfoArray, orgTbInfo), code, line, _return, terrno)
910,671✔
2456
      }
2457

2458
      pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, pOrgTbInfo);
1,318,267✔
2459

2460
      code = taosHashRemove(pVtbScan->otbNameToOtbInfoMap, key, len);
1,318,267✔
2461
      QUERY_CHECK_CODE(code, line, _return);
1,318,267✔
2462
    }
2463
  }
2464
  return code;
198,033✔
2465
_return:
×
2466
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
2467
  return code;
×
2468
}
2469

2470
int32_t buildVirtualSuperTableScanChildTableMap(SOperatorInfo* pOperator) {
1,547,490✔
2471
  int32_t                    code = TSDB_CODE_SUCCESS;
1,547,490✔
2472
  int32_t                    line = 0;
1,547,490✔
2473
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
1,547,490✔
2474
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
1,547,490✔
2475
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
1,547,490✔
2476
  SArray*                    pColRefArray = NULL;
1,547,490✔
2477
  SOperatorInfo*             pSystableScanOp = NULL;
1,547,490✔
2478
  
2479
  pVtbScan->childTableMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
1,547,490✔
2480
  QUERY_CHECK_NULL(pVtbScan->childTableMap, code, line, _return, terrno)
1,547,490✔
2481

2482
  if (pInfo->qType == DYN_QTYPE_VTB_AGG) {
1,547,490✔
2483
    pVtbScan->otbNameToOtbInfoMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
21,697✔
2484
    QUERY_CHECK_NULL(pVtbScan->otbNameToOtbInfoMap, code, line, _return, terrno)
21,697✔
2485
    pSystableScanOp = pOperator->pDownstream[0];
21,697✔
2486
  } else if (pInfo->qType == DYN_QTYPE_VTB_WINDOW) {
1,525,793✔
2487
    pVtbScan->otbNameToOtbInfoMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
180,892✔
2488
    QUERY_CHECK_NULL(pVtbScan->otbNameToOtbInfoMap, code, line, _return, terrno)
180,892✔
2489
    pSystableScanOp = pOperator->pDownstream[1];
180,892✔
2490
  } else {
2491
    pSystableScanOp = pOperator->pDownstream[1];
1,344,901✔
2492
  }
2493

2494
  while (true) {
3,097,250✔
2495
    SSDataBlock *pChildInfo = NULL;
4,644,740✔
2496
    code = pSystableScanOp->fpSet.getNextFn(pSystableScanOp, &pChildInfo);
4,644,740✔
2497
    QUERY_CHECK_CODE(code, line, _return);
4,644,740✔
2498
    if (pChildInfo == NULL) {
4,644,740✔
2499
      break;
1,547,490✔
2500
    }
2501
    SColumnInfoData *pTableNameCol = taosArrayGet(pChildInfo->pDataBlock, 0);
3,097,250✔
2502
    SColumnInfoData *pStbNameCol = taosArrayGet(pChildInfo->pDataBlock, 1);
3,097,250✔
2503
    SColumnInfoData *pDbNameCol = taosArrayGet(pChildInfo->pDataBlock, 2);
3,097,250✔
2504

2505
    QUERY_CHECK_NULL(pTableNameCol, code, line, _return, terrno)
3,097,250✔
2506
    QUERY_CHECK_NULL(pStbNameCol, code, line, _return, terrno)
3,097,250✔
2507
    QUERY_CHECK_NULL(pDbNameCol, code, line, _return, terrno)
3,097,250✔
2508

2509
    for (int32_t i = 0; i < pChildInfo->info.rows; i++) {
156,617,236✔
2510
      if (!colDataIsNull_s(pStbNameCol, i)) {
307,039,972✔
2511
        char* stbrawname = colDataGetData(pStbNameCol, i);
153,519,986✔
2512
        char* dbrawname = colDataGetData(pDbNameCol, i);
153,519,986✔
2513
        char *ctbName = colDataGetData(pTableNameCol, i);
153,519,986✔
2514

2515
        if (tableInfoNeedCollect(dbrawname, stbrawname, pInfo->vtbScan.dbName, pInfo->vtbScan.tbName)) {
153,519,986✔
2516
          SColRefInfo info = {0};
79,651,622✔
2517
          code = getColRefInfo(&info, pChildInfo->pDataBlock, i);
79,651,622✔
2518
          QUERY_CHECK_CODE(code, line, _return);
79,651,622✔
2519

2520
          if (pInfo->qType == DYN_QTYPE_VTB_SCAN) {
79,651,622✔
2521
            if (pInfo->vtbScan.dynTbUid != 0 && info.uid != pInfo->vtbScan.dynTbUid) {
66,685,926✔
2522
              qTrace("dynQueryCtrl tb uid filter, info uid:%" PRIu64 ", dyn tb uid:%" PRIu64, info.uid,
×
2523
                     pInfo->vtbScan.dynTbUid);
2524
              destroyColRefInfo(&info);
×
2525
              continue;
×
2526
            }
2527

2528
            if (pTaskInfo->pStreamRuntimeInfo) {
66,685,926✔
2529
              if (pVtbScan->curOrgTbVg == NULL) {
34,464✔
2530
                pVtbScan->curOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
1,078✔
2531
                QUERY_CHECK_NULL(pVtbScan->curOrgTbVg, code, line, _return, terrno)
1,078✔
2532
              }
2533

2534
              if (info.colrefName) {
34,464✔
2535
                int32_t vgId;
18,538✔
2536
                code = getVgIdFromColref(pOperator, info.colrefName, &vgId);
18,538✔
2537
                QUERY_CHECK_CODE(code, line, _return);
18,538✔
2538
                code = taosHashPut(pVtbScan->curOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
18,538✔
2539
                QUERY_CHECK_CODE(code, line, _return);
18,538✔
2540
              }
2541
            }
2542
          }
2543

2544
          if (taosHashGet(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName)) == NULL) {
79,651,622✔
2545
            pColRefArray = taosArrayInit(1, sizeof(SColRefInfo));
4,989,072✔
2546
            QUERY_CHECK_NULL(pColRefArray, code, line, _return, terrno)
4,989,072✔
2547
            QUERY_CHECK_NULL(taosArrayPush(pColRefArray, &info), code, line, _return, terrno)
9,978,144✔
2548
            int32_t tableIdx = (int32_t)taosArrayGetSize(pVtbScan->childTableList);
4,989,072✔
2549
            QUERY_CHECK_NULL(taosArrayPush(pVtbScan->childTableList, &pColRefArray), code, line, _return, terrno)
9,978,144✔
2550
            code = taosHashPut(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName), &tableIdx, sizeof(tableIdx));
4,989,072✔
2551
            QUERY_CHECK_CODE(code, line, _return);
4,989,072✔
2552
          } else {
2553
            int32_t *tableIdx = (int32_t*)taosHashGet(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName));
74,662,550✔
2554
            QUERY_CHECK_NULL(tableIdx, code, line, _return, terrno)
74,662,550✔
2555
            pColRefArray = (SArray *)taosArrayGetP(pVtbScan->childTableList, *tableIdx);
74,662,550✔
2556
            QUERY_CHECK_NULL(pColRefArray, code, line, _return, terrno)
74,662,550✔
2557
            QUERY_CHECK_NULL(taosArrayPush(pColRefArray, &info), code, line, _return, terrno)
149,325,100✔
2558
          }
2559
        }
2560
      }
2561
    }
2562
  }
2563

2564
  switch (pInfo->qType) {
1,547,490✔
2565
    case DYN_QTYPE_VTB_WINDOW: {
180,892✔
2566
      code = buildOrgTbInfoBatch(pOperator, false);
180,892✔
2567
      break;
180,892✔
2568
    }
2569
    case DYN_QTYPE_VTB_AGG: {
21,697✔
2570
      if (pVtbScan->batchProcessChild) {
21,697✔
2571
        code = buildOrgTbInfoBatch(pOperator, pVtbScan->hasPartition);
17,141✔
2572
      } else {
2573
        code = buildOrgTbInfoSingle(pOperator, pVtbScan->hasPartition);
4,556✔
2574
      }
2575
      break;
21,697✔
2576
    }
2577
    case DYN_QTYPE_VTB_SCAN: {
1,344,901✔
2578
      code = processOrgTbVg(pVtbScan, pTaskInfo, 1);
1,344,901✔
2579
      break;
1,344,901✔
2580
    }
2581
    default: {
×
2582
      code = TSDB_CODE_PLAN_INVALID_DYN_CTRL_TYPE;
×
2583
      break;
×
2584
    }
2585
  }
2586

2587
  QUERY_CHECK_CODE(code, line, _return);
1,547,490✔
2588

2589
_return:
1,547,490✔
2590
  if (code) {
1,547,490✔
2591
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
1,075✔
2592
  }
2593
  return code;
1,547,490✔
2594
}
2595

2596
int32_t buildVirtualNormalChildTableScanChildTableMap(SOperatorInfo* pOperator) {
124,281✔
2597
  int32_t                    code = TSDB_CODE_SUCCESS;
124,281✔
2598
  int32_t                    line = 0;
124,281✔
2599
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
124,281✔
2600
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
124,281✔
2601
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
124,281✔
2602
  SArray*                    pColRefInfo = pInfo->vtbScan.colRefInfo;
124,281✔
2603
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[1];
124,281✔
2604
  int32_t                    rversion = 0;
124,281✔
2605

2606
  pInfo->vtbScan.colRefInfo = taosArrayInit(1, sizeof(SColRefInfo));
124,281✔
2607
  QUERY_CHECK_NULL(pInfo->vtbScan.colRefInfo, code, line, _return, terrno)
124,281✔
2608

2609
  while (true) {
244,920✔
2610
    SSDataBlock *pTableInfo = NULL;
369,201✔
2611
    code = pSystableScanOp->fpSet.getNextFn(pSystableScanOp, &pTableInfo);
369,201✔
2612
    if (pTableInfo == NULL) {
369,201✔
2613
      break;
124,281✔
2614
    }
2615

2616
    SColumnInfoData *pTableNameCol = taosArrayGet(pTableInfo->pDataBlock, 0);
244,920✔
2617
    SColumnInfoData *pDbNameCol = taosArrayGet(pTableInfo->pDataBlock, 2);
244,920✔
2618
    SColumnInfoData *pRefVerCol = taosArrayGet(pTableInfo->pDataBlock, 8);
244,920✔
2619

2620
    QUERY_CHECK_NULL(pTableNameCol, code, line, _return, terrno)
244,920✔
2621
    QUERY_CHECK_NULL(pDbNameCol, code, line, _return, terrno)
244,920✔
2622
    QUERY_CHECK_NULL(pRefVerCol, code, line, _return, terrno)
244,920✔
2623

2624
    for (int32_t i = 0; i < pTableInfo->info.rows; i++) {
9,135,879✔
2625
      if (!colDataIsNull_s(pRefVerCol, i)) {
17,781,918✔
2626
        GET_TYPED_DATA(rversion, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pRefVerCol, i), 0);
8,890,959✔
2627
      }
2628

2629
      if (!colDataIsNull_s(pTableNameCol, i)) {
17,781,918✔
2630
        char* tbrawname = colDataGetData(pTableNameCol, i);
8,890,959✔
2631
        char* dbrawname = colDataGetData(pDbNameCol, i);
8,890,959✔
2632
        QUERY_CHECK_NULL(tbrawname, code, line, _return, terrno)
8,890,959✔
2633
        QUERY_CHECK_NULL(dbrawname, code, line, _return, terrno)
8,890,959✔
2634

2635
        if (tableInfoNeedCollect(dbrawname, tbrawname, pInfo->vtbScan.dbName, pInfo->vtbScan.tbName)) {
8,890,959✔
2636
          SColRefInfo info = {0};
617,451✔
2637
          code = getColRefInfo(&info, pTableInfo->pDataBlock, i);
617,451✔
2638
          QUERY_CHECK_CODE(code, line, _return);
617,451✔
2639

2640
          if ((rversion != pVtbScan->rversion || pVtbScan->existOrgTbVg == NULL) && info.colrefName) {
617,451✔
2641
            if (pVtbScan->curOrgTbVg == NULL) {
6,880✔
2642
              pVtbScan->curOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
430✔
2643
              QUERY_CHECK_NULL(pVtbScan->curOrgTbVg, code, line, _return, terrno)
430✔
2644
            }
2645
            int32_t vgId;
6,880✔
2646
            code = getVgIdFromColref(pOperator, info.colrefName, &vgId);
6,880✔
2647
            QUERY_CHECK_CODE(code, line, _return);
6,880✔
2648
            code = taosHashPut(pVtbScan->curOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
6,880✔
2649
            QUERY_CHECK_CODE(code, line, _return);
6,880✔
2650
          }
2651

2652
          QUERY_CHECK_NULL(taosArrayPush(pInfo->vtbScan.colRefInfo, &info), code, line, _return, terrno)
1,234,902✔
2653
        }
2654
      }
2655
    }
2656
  }
2657
  code = processOrgTbVg(pVtbScan, pTaskInfo, rversion);
124,281✔
2658
  QUERY_CHECK_CODE(code, line, _return);
124,281✔
2659

2660
_return:
122,991✔
2661
  if (code) {
124,281✔
2662
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
1,290✔
2663
  }
2664
  return code;
124,281✔
2665
}
2666

2667
int32_t virtualTableScanCheckNeedRedeploy(SOperatorInfo* pOperator) {
884,080✔
2668
  int32_t                    code = TSDB_CODE_SUCCESS;
884,080✔
2669
  int32_t                    line = 0;
884,080✔
2670
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
884,080✔
2671
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
884,080✔
2672
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
884,080✔
2673

2674
  SArray *tmpArray = NULL;
884,080✔
2675
  tmpArray = atomic_load_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo);
884,080✔
2676
  if (tmpArray && tmpArray == atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo, tmpArray, NULL)) {
884,080✔
2677
    for (int32_t i = 0; i < taosArrayGetSize(tmpArray); i++) {
4,730✔
2678
      SStreamTaskAddr* pTaskAddr = (SStreamTaskAddr*)taosArrayGet(tmpArray, i);
2,365✔
2679
      code = taosHashPut(pVtbScan->existOrgTbVg, &pTaskAddr->nodeId, sizeof(pTaskAddr->nodeId), NULL, 0);
2,365✔
2680
      QUERY_CHECK_CODE(code, line, _return);
2,365✔
2681
      if (pVtbScan->newAddedVgInfo == NULL) {
2,365✔
2682
        pVtbScan->newAddedVgInfo = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
860✔
2683
        QUERY_CHECK_NULL(pVtbScan->newAddedVgInfo, code, line, _return, terrno)
860✔
2684
      }
2685
      code = taosHashPut(pVtbScan->newAddedVgInfo, &pTaskAddr->nodeId, sizeof(pTaskAddr->nodeId), pTaskAddr, sizeof(SStreamTaskAddr));
2,365✔
2686
      QUERY_CHECK_CODE(code, line, _return);
2,365✔
2687
    }
2688
    pVtbScan->needRedeploy = false;
2,365✔
2689
  } else {
2690
    code = TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY;
881,715✔
2691
    QUERY_CHECK_CODE(code, line, _return);
881,715✔
2692
  }
2693

2694
_return:
×
2695
  taosArrayClear(tmpArray);
884,080✔
2696
  taosArrayDestroy(tmpArray);
884,080✔
2697
  if (code) {
884,080✔
2698
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
881,715✔
2699
  }
2700
  return code;
884,080✔
2701
}
2702

2703
int32_t virtualTableScanBuildDownStreamOpParam(SOperatorInfo* pOperator, tb_uid_t uid, int32_t vgId) {
4,299,987✔
2704
  int32_t                    code = TSDB_CODE_SUCCESS;
4,299,987✔
2705
  int32_t                    line = 0;
4,299,987✔
2706
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
4,299,987✔
2707
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
4,299,987✔
2708
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
4,299,987✔
2709

2710
  pVtbScan->vtbScanParam = NULL;
4,299,987✔
2711
  code = buildVtbScanOperatorParam(pInfo, &pVtbScan->vtbScanParam, uid);
4,299,987✔
2712
  QUERY_CHECK_CODE(code, line, _return);
4,299,987✔
2713

2714
  void* pIter = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, NULL);
4,299,987✔
2715
  while (pIter != NULL) {
10,554,574✔
2716
    SOrgTbInfo*      pMap = (SOrgTbInfo*)pIter;
6,254,587✔
2717
    SOperatorParam*  pExchangeParam = NULL;
6,254,587✔
2718
    SStreamTaskAddr* addr = taosHashGet(pVtbScan->newAddedVgInfo, &pMap->vgId, sizeof(pMap->vgId));
6,254,587✔
2719
    if (addr != NULL) {
6,254,587✔
2720
      SDownstreamSourceNode newSource = {0};
2,365✔
2721
      newSource.type = QUERY_NODE_DOWNSTREAM_SOURCE;
2,365✔
2722
      newSource.clientId = pTaskInfo->id.taskId;// current task's taskid
2,365✔
2723
      newSource.taskId = addr->taskId;
2,365✔
2724
      newSource.fetchMsgType = TDMT_STREAM_FETCH;
2,365✔
2725
      newSource.localExec = false;
2,365✔
2726
      newSource.addr.nodeId = addr->nodeId;
2,365✔
2727
      memcpy(&newSource.addr.epSet, &addr->epset, sizeof(SEpSet));
2,365✔
2728

2729
      code = buildExchangeOperatorParamForVScan(&pExchangeParam, 0, pMap, &newSource);
2,365✔
2730
      QUERY_CHECK_CODE(code, line, _return);
2,365✔
2731
      code = taosHashRemove(pVtbScan->newAddedVgInfo, &pMap->vgId, sizeof(pMap->vgId));
2,365✔
2732
      QUERY_CHECK_CODE(code, line, _return);
2,365✔
2733
    } else {
2734
      code = buildExchangeOperatorParamForVScan(&pExchangeParam, 0, pMap, NULL);
6,252,222✔
2735
      QUERY_CHECK_CODE(code, line, _return);
6,252,222✔
2736
    }
2737
    QUERY_CHECK_NULL(taosArrayPush(((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pOpParamArray, &pExchangeParam), code, line, _return, terrno)
12,509,174✔
2738
    pIter = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, pIter);
6,254,587✔
2739
  }
2740

2741
  SOperatorParam*  pExchangeParam = NULL;
4,299,987✔
2742
  code = buildExchangeOperatorParamForVTagScan(&pExchangeParam, 0, vgId, uid);
4,299,987✔
2743
  QUERY_CHECK_CODE(code, line, _return);
4,299,987✔
2744
  ((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pTagScanOp = pExchangeParam;
4,299,987✔
2745

2746
_return:
4,299,987✔
2747
  if (code) {
4,299,987✔
2748
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
2749
  }
2750
  return code;
4,299,987✔
2751
}
2752

2753
int32_t virtualTableScanGetNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
11,879,466✔
2754
  int32_t                    code = TSDB_CODE_SUCCESS;
11,879,466✔
2755
  int32_t                    line = 0;
11,879,466✔
2756
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
11,879,466✔
2757
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
11,879,466✔
2758
  SOperatorInfo*             pVtbScanOp = pOperator->pDownstream[0];
11,879,466✔
2759

2760
  pVtbScan->otbNameToOtbInfoMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
11,879,466✔
2761
  QUERY_CHECK_NULL(pVtbScan->otbNameToOtbInfoMap, code, line, _return, terrno)
11,879,466✔
2762
  taosHashSetFreeFp(pVtbScan->otbNameToOtbInfoMap, destroySOrgTbInfo);
11,879,466✔
2763

2764
  while (true) {
2765
    if (pVtbScan->curTableIdx == pVtbScan->lastTableIdx) {
14,753,460✔
2766
      code = pVtbScanOp->fpSet.getNextFn(pVtbScanOp, pRes);
10,453,473✔
2767
      QUERY_CHECK_CODE(code, line, _return);
10,453,473✔
2768
    } else {
2769
      taosHashClear(pVtbScan->otbNameToOtbInfoMap);
4,299,987✔
2770
      SArray* pColRefInfo = NULL;
4,299,987✔
2771
      if (pVtbScan->isSuperTable) {
4,299,987✔
2772
        pColRefInfo = (SArray*)taosArrayGetP(pVtbScan->childTableList, pVtbScan->curTableIdx);
4,176,996✔
2773
      } else {
2774
        pColRefInfo = pInfo->vtbScan.colRefInfo;
122,991✔
2775
      }
2776
      QUERY_CHECK_NULL(pColRefInfo, code, line, _return, terrno)
4,299,987✔
2777

2778
      tb_uid_t uid = 0;
4,299,987✔
2779
      int32_t  vgId = 0;
4,299,987✔
2780
      code = virtualTableScanProcessColRefInfo(pOperator, pColRefInfo, &uid, &vgId);
4,299,987✔
2781
      QUERY_CHECK_CODE(code, line, _return);
4,299,987✔
2782

2783
      code = virtualTableScanBuildDownStreamOpParam(pOperator, uid, vgId);
4,299,987✔
2784
      QUERY_CHECK_CODE(code, line, _return);
4,299,987✔
2785

2786
      // reset downstream operator's status
2787
      pVtbScanOp->status = OP_NOT_OPENED;
4,299,987✔
2788
      code = pVtbScanOp->fpSet.getNextExtFn(pVtbScanOp, pVtbScan->vtbScanParam, pRes);
4,299,987✔
2789
      QUERY_CHECK_CODE(code, line, _return);
4,299,363✔
2790
    }
2791

2792
    if (*pRes) {
14,752,836✔
2793
      // has result, still read data from this table.
2794
      pVtbScan->lastTableIdx = pVtbScan->curTableIdx;
10,458,029✔
2795
      break;
10,458,029✔
2796
    } else {
2797
      // no result, read next table.
2798
      pVtbScan->curTableIdx++;
4,294,807✔
2799
      if (pVtbScan->isSuperTable) {
4,294,807✔
2800
        if (pVtbScan->curTableIdx >= taosArrayGetSize(pVtbScan->childTableList)) {
4,171,816✔
2801
          setOperatorCompleted(pOperator);
1,297,822✔
2802
          break;
1,297,822✔
2803
        }
2804
      } else {
2805
        setOperatorCompleted(pOperator);
122,991✔
2806
        break;
122,991✔
2807
      }
2808
    }
2809
  }
2810

2811
_return:
11,878,842✔
2812
  taosHashCleanup(pVtbScan->otbNameToOtbInfoMap);
11,878,842✔
2813
  pVtbScan->otbNameToOtbInfoMap = NULL;
11,878,842✔
2814
  if (code) {
11,878,842✔
2815
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
2816
  }
2817
  return code;
11,878,842✔
2818
}
2819

2820
int32_t vtbScanOpen(SOperatorInfo* pOperator) {
11,922,655✔
2821
  int32_t                    code = TSDB_CODE_SUCCESS;
11,922,655✔
2822
  int32_t                    line = 0;
11,922,655✔
2823
  int64_t                    st = 0;
11,922,655✔
2824
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
11,922,655✔
2825
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
11,922,655✔
2826

2827
  if (OPTR_IS_OPENED(pOperator)) {
11,922,655✔
2828
    return code;
10,453,473✔
2829
  }
2830

2831
  if (pOperator->cost.openCost == 0) {
1,469,182✔
2832
    st = taosGetTimestampUs();
1,374,444✔
2833
  }
2834

2835
  if (pVtbScan->isSuperTable) {
1,469,182✔
2836
    code = buildVirtualSuperTableScanChildTableMap(pOperator);
1,344,901✔
2837
    QUERY_CHECK_CODE(code, line, _return);
1,344,901✔
2838
  } else {
2839
    code = buildVirtualNormalChildTableScanChildTableMap(pOperator);
124,281✔
2840
    QUERY_CHECK_CODE(code, line, _return);
124,281✔
2841
  }
2842

2843
  OPTR_SET_OPENED(pOperator);
1,466,817✔
2844

2845
_return:
1,469,182✔
2846
  if (pOperator->cost.openCost == 0) {
1,469,182✔
2847
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
1,374,444✔
2848
  }
2849
  if (code) {
1,469,182✔
2850
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
2,365✔
2851
    pOperator->pTaskInfo->code = code;
2,365✔
2852
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
2,365✔
2853
  }
2854
  return code;
1,466,817✔
2855
}
2856

2857
int32_t vtbScanNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
12,804,370✔
2858
  int32_t                    code = TSDB_CODE_SUCCESS;
12,804,370✔
2859
  int32_t                    line = 0;
12,804,370✔
2860
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
12,804,370✔
2861
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
12,804,370✔
2862

2863
  QRY_PARAM_CHECK(pRes);
12,804,370✔
2864
  if (pOperator->status == OP_EXEC_DONE && !pOperator->pOperatorGetParam) {
12,804,370✔
2865
    return code;
×
2866
  }
2867
  if (pOperator->pOperatorGetParam) {
12,804,370✔
2868
    if (pOperator->status == OP_EXEC_DONE) {
×
2869
      pOperator->status = OP_OPENED;
×
2870
    }
2871
    pVtbScan->curTableIdx = 0;
×
2872
    pVtbScan->lastTableIdx = -1;
×
2873
    pVtbScan->window = ((SDynQueryCtrlOperatorParam *)(pOperator->pOperatorGetParam)->value)->window;
×
2874
    pOperator->pOperatorGetParam = NULL;
×
2875
  } else {
2876
    pVtbScan->window.skey = INT64_MAX;
12,804,370✔
2877
    pVtbScan->window.ekey = INT64_MIN;
12,804,370✔
2878
  }
2879

2880
  if (pVtbScan->needRedeploy) {
12,804,370✔
2881
    code = virtualTableScanCheckNeedRedeploy(pOperator);
884,080✔
2882
    QUERY_CHECK_CODE(code, line, _return);
884,080✔
2883
  }
2884

2885
  code = pOperator->fpSet._openFn(pOperator);
11,922,655✔
2886
  QUERY_CHECK_CODE(code, line, _return);
11,920,290✔
2887

2888
  if (pVtbScan->isSuperTable && taosArrayGetSize(pVtbScan->childTableList) == 0) {
11,920,290✔
2889
    setOperatorCompleted(pOperator);
40,824✔
2890
    return code;
40,824✔
2891
  }
2892

2893
  code = virtualTableScanGetNext(pOperator, pRes);
11,879,466✔
2894
  QUERY_CHECK_CODE(code, line, _return);
11,878,842✔
2895

2896
  return code;
11,878,842✔
2897

2898
_return:
881,715✔
2899
  if (code) {
881,715✔
2900
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
881,715✔
2901
    pOperator->pTaskInfo->code = code;
881,715✔
2902
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
881,715✔
2903
  }
2904
  return code;
×
2905
}
2906

2907
int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) {
1,151,338✔
2908
  if (batchFetch) {
1,151,338✔
2909
    pPrev->leftHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
1,150,252✔
2910
    if (NULL == pPrev->leftHash) {
1,150,252✔
2911
      return terrno;
×
2912
    }
2913
    pPrev->rightHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
1,150,252✔
2914
    if (NULL == pPrev->rightHash) {
1,150,252✔
2915
      return terrno;
×
2916
    }
2917
  } else {
2918
    pPrev->leftCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
1,086✔
2919
    if (NULL == pPrev->leftCache) {
1,086✔
2920
      return terrno;
×
2921
    }
2922
    pPrev->rightCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
1,086✔
2923
    if (NULL == pPrev->rightCache) {
1,086✔
2924
      return terrno;
×
2925
    }
2926
    pPrev->onceTable = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
1,086✔
2927
    if (NULL == pPrev->onceTable) {
1,086✔
2928
      return terrno;
×
2929
    }
2930
  }
2931

2932
  return TSDB_CODE_SUCCESS;
1,151,338✔
2933
}
2934

2935
static void updateDynTbUidIfNeeded(SVtbScanDynCtrlInfo* pVtbScan, SStreamRuntimeInfo* pStreamRuntimeInfo) {
×
2936
  if (pStreamRuntimeInfo == NULL) {
×
2937
    return;
×
2938
  }
2939

2940
  SArray* vals = pStreamRuntimeInfo->funcInfo.pStreamPartColVals;
×
2941
  for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
×
2942
    SStreamGroupValue* pValue = taosArrayGet(vals, i);
×
2943
    if (pValue != NULL && pValue->isTbname && pValue->uid != pVtbScan->dynTbUid) {
×
2944
      qTrace("dynQueryCtrl dyn tb uid:%" PRIu64 " reset to:%" PRIu64, pVtbScan->dynTbUid, pValue->uid);
×
2945

2946
      pVtbScan->dynTbUid = pValue->uid;
×
2947
      break;
×
2948
    }
2949
  }
2950
}
2951

2952
static int32_t initVtbScanInfo(SDynQueryCtrlOperatorInfo* pInfo, SMsgCb* pMsgCb,
1,861,286✔
2953
                               SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) {
2954
  int32_t      code = TSDB_CODE_SUCCESS;
1,861,286✔
2955
  int32_t      line = 0;
1,861,286✔
2956

2957
  code = tsem_init(&pInfo->vtbScan.ready, 0, 0);
1,861,286✔
2958
  QUERY_CHECK_CODE(code, line, _return);
1,861,286✔
2959

2960
  pInfo->vtbScan.genNewParam = true;
1,861,286✔
2961
  pInfo->vtbScan.batchProcessChild = pPhyciNode->vtbScan.batchProcessChild;
1,861,286✔
2962
  pInfo->vtbScan.hasPartition = pPhyciNode->vtbScan.hasPartition;
1,861,286✔
2963
  pInfo->vtbScan.scanAllCols = pPhyciNode->vtbScan.scanAllCols;
1,861,286✔
2964
  pInfo->vtbScan.isSuperTable = pPhyciNode->vtbScan.isSuperTable;
1,861,286✔
2965
  pInfo->vtbScan.rversion = pPhyciNode->vtbScan.rversion;
1,861,286✔
2966
  pInfo->vtbScan.uid = pPhyciNode->vtbScan.uid;
1,861,286✔
2967
  pInfo->vtbScan.suid = pPhyciNode->vtbScan.suid;
1,861,286✔
2968
  pInfo->vtbScan.epSet = pPhyciNode->vtbScan.mgmtEpSet;
1,861,286✔
2969
  pInfo->vtbScan.acctId = pPhyciNode->vtbScan.accountId;
1,861,286✔
2970
  pInfo->vtbScan.needRedeploy = false;
1,861,286✔
2971
  pInfo->vtbScan.pMsgCb = pMsgCb;
1,861,286✔
2972
  pInfo->vtbScan.curTableIdx = 0;
1,861,286✔
2973
  pInfo->vtbScan.lastTableIdx = -1;
1,861,286✔
2974
  pInfo->vtbScan.dynTbUid = 0;
1,861,286✔
2975
  pInfo->vtbScan.dbName = taosStrdup(pPhyciNode->vtbScan.dbName);
1,861,286✔
2976
  pInfo->vtbScan.tbName = taosStrdup(pPhyciNode->vtbScan.tbName);
1,861,286✔
2977
  QUERY_CHECK_NULL(pInfo->vtbScan.dbName, code, line, _return, terrno)
1,861,286✔
2978
  QUERY_CHECK_NULL(pInfo->vtbScan.tbName, code, line, _return, terrno)
1,861,286✔
2979
  pInfo->vtbScan.existOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
1,861,286✔
2980
  QUERY_CHECK_NULL(pInfo->vtbScan.existOrgTbVg, code, line, _return, terrno)
1,861,286✔
2981
  for (int32_t i = 0; i < LIST_LENGTH(pPhyciNode->vtbScan.pOrgVgIds); ++i) {
4,980,211✔
2982
    SValueNode* valueNode = (SValueNode*)nodesListGetNode(pPhyciNode->vtbScan.pOrgVgIds, i);
3,118,925✔
2983
    int32_t vgId = (int32_t)valueNode->datum.i;
3,118,925✔
2984
    code = taosHashPut(pInfo->vtbScan.existOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
3,118,925✔
2985
    QUERY_CHECK_CODE(code, line, _return);
3,118,925✔
2986
  }
2987

2988
  if (pPhyciNode->dynTbname && pTaskInfo) {
1,861,286✔
2989
    updateDynTbUidIfNeeded(&pInfo->vtbScan, pTaskInfo->pStreamRuntimeInfo);
×
2990
  }
2991

2992
  pInfo->vtbScan.readColList = taosArrayInit(LIST_LENGTH(pPhyciNode->vtbScan.pScanCols), sizeof(col_id_t));
1,861,286✔
2993
  QUERY_CHECK_NULL(pInfo->vtbScan.readColList, code, line, _return, terrno)
1,861,286✔
2994

2995
  for (int32_t i = 0; i < LIST_LENGTH(pPhyciNode->vtbScan.pScanCols); ++i) {
14,012,146✔
2996
    SColumnNode* pNode = (SColumnNode*)nodesListGetNode(pPhyciNode->vtbScan.pScanCols, i);
12,150,860✔
2997
    QUERY_CHECK_NULL(pNode, code, line, _return, terrno)
12,150,860✔
2998
    QUERY_CHECK_NULL(taosArrayPush(pInfo->vtbScan.readColList, &pNode->colId), code, line, _return, terrno)
24,301,720✔
2999
  }
3000

3001
  pInfo->vtbScan.childTableList = taosArrayInit(10, POINTER_BYTES);
1,861,286✔
3002
  QUERY_CHECK_NULL(pInfo->vtbScan.childTableList, code, line, _return, terrno)
1,861,286✔
3003

3004
  pInfo->vtbScan.dbVgInfoMap = taosHashInit(taosArrayGetSize(pInfo->vtbScan.childTableList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
1,861,286✔
3005
  QUERY_CHECK_NULL(pInfo->vtbScan.dbVgInfoMap, code, line, _return, terrno)
1,861,286✔
3006

3007
  pInfo->vtbScan.otbNameToOtbInfoMap = NULL;
1,861,286✔
3008
  pInfo->vtbScan.otbVgIdToOtbInfoArrayMap = NULL;
1,861,286✔
3009
  pInfo->vtbScan.vtbUidToVgIdMapMap = NULL;
1,861,286✔
3010
  pInfo->vtbScan.vtbGroupIdToVgIdMapMap = NULL;
1,861,286✔
3011
  pInfo->vtbScan.vtbUidTagListMap = NULL;
1,861,286✔
3012
  pInfo->vtbScan.vtbGroupIdTagListMap = NULL;
1,861,286✔
3013
  pInfo->vtbScan.vtbUidToGroupIdMap = NULL;
1,861,286✔
3014

3015
  return code;
1,861,286✔
3016
_return:
×
3017
  // no need to destroy array and hashmap allocated in this function,
3018
  // since the operator's destroy function will take care of it
3019
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
3020
  return code;
×
3021
}
3022

3023
static int32_t initVtbWindowInfo(SDynQueryCtrlOperatorInfo* pInfo, SDynQueryCtrlPhysiNode* pPhyciNode,
465,145✔
3024
                                 SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) {
3025
  int32_t              code = TSDB_CODE_SUCCESS;
465,145✔
3026
  int32_t              line = 0;
465,145✔
3027
  SDataBlockDescNode*  pDescNode = pPhyciNode->node.pOutputDataBlockDesc;
465,145✔
3028

3029
  pInfo->vtbWindow.wstartSlotId = pPhyciNode->vtbWindow.wstartSlotId;
465,145✔
3030
  pInfo->vtbWindow.wendSlotId = pPhyciNode->vtbWindow.wendSlotId;
465,145✔
3031
  pInfo->vtbWindow.wdurationSlotId = pPhyciNode->vtbWindow.wdurationSlotId;
465,145✔
3032
  pInfo->vtbWindow.pTargets = pPhyciNode->vtbWindow.pTargets;
465,145✔
3033
  pInfo->vtbWindow.isVstb = pPhyciNode->vtbWindow.isVstb;
465,145✔
3034
  pInfo->vtbWindow.extendOption = pPhyciNode->vtbWindow.extendOption;
465,145✔
3035

3036
  pInfo->vtbWindow.pRes = createDataBlockFromDescNode(pDescNode);
465,145✔
3037
  QUERY_CHECK_NULL(pInfo->vtbWindow.pRes, code, line, _return, terrno)
465,145✔
3038

3039
  pInfo->vtbWindow.pWins = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
465,145✔
3040
  QUERY_CHECK_NULL(pInfo->vtbWindow.pWins, code, line, _return, terrno)
465,145✔
3041

3042
  pInfo->vtbWindow.outputWstartSlotId = -1;
465,145✔
3043
  pInfo->vtbWindow.outputWendSlotId = -1;
465,145✔
3044
  pInfo->vtbWindow.outputWdurationSlotId = -1;
465,145✔
3045
  pInfo->vtbWindow.curWinBatchIdx = 0;
465,145✔
3046

3047
  initResultSizeInfo(&pOperator->resultInfo, 1);
465,145✔
3048
  code = blockDataEnsureCapacity(pInfo->vtbWindow.pRes, pOperator->resultInfo.capacity);
465,145✔
3049
  QUERY_CHECK_CODE(code, line, _return);
465,145✔
3050

3051
  return code;
465,145✔
3052
_return:
×
3053
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
3054
  return code;
×
3055
}
3056

3057
static int32_t extractTsCol(SSDataBlock* pBlock, int32_t slotId, TSKEY** ppTsCols) {
2,584,148✔
3058
  int32_t code = TSDB_CODE_SUCCESS;
2,584,148✔
3059
  int32_t lino = 0;
2,584,148✔
3060

3061
  if (pBlock->pDataBlock != NULL && pBlock->info.dataLoad) {
2,584,148✔
3062
    SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, slotId);
2,584,148✔
3063
    QUERY_CHECK_NULL(pColDataInfo, code, lino, _return, terrno)
2,584,148✔
3064

3065
    *ppTsCols = (int64_t*)pColDataInfo->pData;
2,584,148✔
3066

3067
    if ((*ppTsCols)[0] != 0 && (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0)) {
2,584,148✔
3068
      code = blockDataUpdateTsWindow(pBlock, slotId);
252,112✔
3069
      QUERY_CHECK_CODE(code, lino, _return);
252,112✔
3070
    }
3071
  }
3072

3073
  return code;
2,584,148✔
3074
_return:
×
3075
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3076
  return code;
×
3077
}
3078

3079
int32_t vtbWindowOpen(SOperatorInfo* pOperator) {
496,659✔
3080
  int32_t                    code = TSDB_CODE_SUCCESS;
496,659✔
3081
  int32_t                    lino = 0;
496,659✔
3082
  SDynQueryCtrlOperatorInfo* pDynInfo = pOperator->info;
496,659✔
3083
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
496,659✔
3084
  SVtbWindowDynCtrlInfo*     pInfo = &pDynInfo->vtbWindow;
496,659✔
3085
  int64_t                    st = 0;
496,659✔
3086

3087
  if (OPTR_IS_OPENED(pOperator)) {
496,659✔
3088
    return code;
31,514✔
3089
  }
3090

3091
  if (pOperator->cost.openCost == 0) {
465,145✔
3092
    st = taosGetTimestampUs();
465,145✔
3093
  }
3094

3095
  while (1) {
1,292,074✔
3096
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
1,757,219✔
3097
    if (pBlock == NULL) {
1,757,219✔
3098
      break;
465,145✔
3099
    }
3100

3101
    if (pInfo->outputWendSlotId == -1 && pInfo->outputWstartSlotId == -1 && pInfo->outputWdurationSlotId == -1) {
1,292,074✔
3102
      for (int32_t i = 0; i < LIST_LENGTH(pInfo->pTargets); ++i) {
3,358,120✔
3103
        STargetNode* pNode = (STargetNode*)nodesListGetNode(pInfo->pTargets, i);
2,892,975✔
3104
        if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode->pExpr)->dataBlockId == pBlock->info.id.blockId) {
2,892,975✔
3105
          if (((SColumnNode*)pNode->pExpr)->slotId == pDynInfo->vtbWindow.wstartSlotId) {
760,115✔
3106
            pInfo->outputWstartSlotId = i;
283,625✔
3107
          } else if (((SColumnNode*)pNode->pExpr)->slotId == pDynInfo->vtbWindow.wendSlotId) {
476,490✔
3108
            pInfo->outputWendSlotId = i;
283,625✔
3109
          } else if (((SColumnNode*)pNode->pExpr)->slotId == pDynInfo->vtbWindow.wdurationSlotId) {
192,865✔
3110
            pInfo->outputWdurationSlotId = i;
192,865✔
3111
          }
3112
        }
3113
      }
3114
    }
3115

3116
    TSKEY* wstartCol = NULL;
1,292,074✔
3117
    TSKEY* wendCol = NULL;
1,292,074✔
3118

3119
    code = extractTsCol(pBlock, pDynInfo->vtbWindow.wstartSlotId, &wstartCol);
1,292,074✔
3120
    QUERY_CHECK_CODE(code, lino, _return);
1,292,074✔
3121
    code = extractTsCol(pBlock, pDynInfo->vtbWindow.wendSlotId, &wendCol);
1,292,074✔
3122
    QUERY_CHECK_CODE(code, lino, _return);
1,292,074✔
3123

3124
    SArray* pWin = taosArrayInit(pBlock->info.rows, sizeof(SExtWinTimeWindow));
1,292,074✔
3125
    QUERY_CHECK_NULL(pWin, code, lino, _return, terrno)
1,292,074✔
3126

3127
    QUERY_CHECK_NULL(taosArrayReserve(pWin, pBlock->info.rows), code, lino, _return, terrno);
1,292,074✔
3128

3129
    for (int32_t i = 0; i < pBlock->info.rows; i++) {
2,147,483,647✔
3130
      SExtWinTimeWindow* pWindow = taosArrayGet(pWin, i);
2,147,483,647✔
3131
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
2,147,483,647✔
3132
      pWindow->tw.skey = wstartCol[i];
2,147,483,647✔
3133
      pWindow->tw.ekey = wendCol[i] + 1;
2,147,483,647✔
3134
      pWindow->winOutIdx = -1;
2,147,483,647✔
3135
    }
3136

3137
    QUERY_CHECK_NULL(taosArrayPush(pDynInfo->vtbWindow.pWins, &pWin), code, lino, _return, terrno);
2,584,148✔
3138
  }
3139

3140
  // handle first window's start key and last window's end key
3141
  SArray* firstBatch = (SArray*)taosArrayGetP(pDynInfo->vtbWindow.pWins, 0);
465,145✔
3142
  SArray* lastBatch = (SArray*)taosArrayGetP(pDynInfo->vtbWindow.pWins, taosArrayGetSize(pDynInfo->vtbWindow.pWins) - 1);
465,145✔
3143

3144
  QUERY_CHECK_NULL(firstBatch, code, lino, _return, terrno)
465,145✔
3145
  QUERY_CHECK_NULL(lastBatch, code, lino, _return, terrno)
465,145✔
3146

3147
  SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(firstBatch, 0);
465,145✔
3148
  SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(lastBatch);
465,145✔
3149

3150
  QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
465,145✔
3151
  QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)
465,145✔
3152

3153
  if (pInfo->extendOption == STATE_WIN_EXTEND_OPTION_BACKWARD) {
465,145✔
3154
    lastWin->tw.ekey = INT64_MAX;
94,751✔
3155
  }
3156
  if (pInfo->extendOption == STATE_WIN_EXTEND_OPTION_FORWARD) {
465,145✔
3157
    firstWin->tw.skey = INT64_MIN;
185,197✔
3158
  }
3159

3160
  if (pInfo->isVstb) {
465,145✔
3161
    code = buildVirtualSuperTableScanChildTableMap(pOperator);
180,892✔
3162
    QUERY_CHECK_CODE(code, lino, _return);
180,892✔
3163
  }
3164

3165
  OPTR_SET_OPENED(pOperator);
465,145✔
3166

3167
  if (pOperator->cost.openCost == 0) {
465,145✔
3168
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
465,145✔
3169
  }
3170

3171
_return:
×
3172
  if (code != TSDB_CODE_SUCCESS) {
465,145✔
3173
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3174
    pTaskInfo->code = code;
×
3175
    T_LONG_JMP(pTaskInfo->env, code);
×
3176
  }
3177
  return code;
465,145✔
3178
}
3179

3180
static int32_t buildExternalWindowOperatorParamEx(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins, int32_t idx) {
194,128✔
3181
  int32_t                       code = TSDB_CODE_SUCCESS;
194,128✔
3182
  int32_t                       lino = 0;
194,128✔
3183
  SExternalWindowOperatorParam* pExtWinOp = NULL;
194,128✔
3184

3185
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
194,128✔
3186
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
194,128✔
3187

3188
  pExtWinOp = taosMemoryMalloc(sizeof(SExternalWindowOperatorParam));
194,128✔
3189
  QUERY_CHECK_NULL(pExtWinOp, code, lino, _return, terrno)
194,128✔
3190

3191
  pExtWinOp->ExtWins = taosArrayDup(pWins, NULL);
194,128✔
3192
  QUERY_CHECK_NULL(pExtWinOp->ExtWins, code, lino, _return, terrno)
194,128✔
3193

3194
  SExtWinTimeWindow *firstWin = (SExtWinTimeWindow *)taosArrayGet(pWins, 0);
194,128✔
3195
  SExtWinTimeWindow *lastWin = (SExtWinTimeWindow *)taosArrayGetLast(pWins);
194,128✔
3196

3197
  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
194,128✔
3198
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)
194,128✔
3199

3200
  SOperatorParam* pExchangeParam = NULL;
194,128✔
3201
  code = buildBatchExchangeOperatorParamForVirtual(&pExchangeParam, 0, NULL, 0, pInfo->vtbScan.otbVgIdToOtbInfoArrayMap, (STimeWindow){.skey = firstWin->tw.skey, .ekey = lastWin->tw.ekey}, EX_SRC_TYPE_VSTB_WIN_SCAN);
194,128✔
3202
  QUERY_CHECK_CODE(code, lino, _return);
194,128✔
3203

3204
  QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pExchangeParam), code, lino, _return, terrno)
388,256✔
3205

3206
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW;
194,128✔
3207
  (*ppRes)->downstreamIdx = idx;
194,128✔
3208
  (*ppRes)->value = pExtWinOp;
194,128✔
3209
  (*ppRes)->reUse = false;
194,128✔
3210

3211
  return code;
194,128✔
3212
_return:
×
3213
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3214
  if (pExtWinOp) {
×
3215
    if (pExtWinOp->ExtWins) {
×
3216
      taosArrayDestroy(pExtWinOp->ExtWins);
×
3217
    }
3218
    taosMemoryFree(pExtWinOp);
×
3219
  }
3220
  if (*ppRes) {
×
3221
    if ((*ppRes)->pChildren) {
×
3222
      for (int32_t i = 0; i < taosArrayGetSize((*ppRes)->pChildren); i++) {
×
3223
        SOperatorParam* pChildParam = (SOperatorParam*)taosArrayGet((*ppRes)->pChildren, i);
×
3224
        if (pChildParam) {
×
3225
          SDynQueryCtrlOperatorParam* pDynParam = (SDynQueryCtrlOperatorParam*)pChildParam->value;
×
3226
          if (pDynParam) {
×
3227
            taosMemoryFree(pDynParam);
×
3228
          }
3229
          taosMemoryFree(pChildParam);
×
3230
        }
3231
      }
3232
      taosArrayDestroy((*ppRes)->pChildren);
×
3233
    }
3234
    taosMemoryFree(*ppRes);
×
3235
    *ppRes = NULL;
×
3236
  }
3237
  return code;
×
3238
}
3239

3240
int32_t vtbWindowNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
496,659✔
3241
  int32_t                    code = TSDB_CODE_SUCCESS;
496,659✔
3242
  int32_t                    lino = 0;
496,659✔
3243
  SDynQueryCtrlOperatorInfo* pDynInfo = pOperator->info;
496,659✔
3244
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
496,659✔
3245
  int64_t                    st = taosGetTimestampUs();
496,659✔
3246
  int32_t                    numOfWins = 0;
496,659✔
3247
  SOperatorInfo*             mergeOp = NULL;
496,659✔
3248
  SOperatorInfo*             extWinOp = NULL;
496,659✔
3249
  SOperatorParam*            pMergeParam = NULL;
496,659✔
3250
  SOperatorParam*            pExtWinParam = NULL;
496,659✔
3251
  SVtbWindowDynCtrlInfo*     pInfo = &pDynInfo->vtbWindow;
496,659✔
3252
  SSDataBlock*               pRes = pInfo->pRes;
496,659✔
3253

3254
  code = pOperator->fpSet._openFn(pOperator);
496,659✔
3255
  QUERY_CHECK_CODE(code, lino, _return);
496,659✔
3256

3257
  if (pInfo->curWinBatchIdx >= taosArrayGetSize(pInfo->pWins)) {
496,659✔
3258
    *ppRes = NULL;
11,345✔
3259
    return code;
11,345✔
3260
  }
3261

3262
  SArray* pWinArray = (SArray*)taosArrayGetP(pInfo->pWins, pInfo->curWinBatchIdx);
485,314✔
3263
  QUERY_CHECK_NULL(pWinArray, code, lino, _return, terrno)
485,314✔
3264

3265
  numOfWins = (int32_t)taosArrayGetSize(pWinArray);
485,314✔
3266

3267
  if (pInfo->isVstb) {
485,314✔
3268
    extWinOp = pOperator->pDownstream[2];
194,128✔
3269
    code = buildExternalWindowOperatorParamEx(pDynInfo, &pExtWinParam, pWinArray, extWinOp->numOfDownstream);
194,128✔
3270
    QUERY_CHECK_CODE(code, lino, _return);
194,128✔
3271

3272
    SSDataBlock* pExtWinBlock = NULL;
194,128✔
3273
    code = extWinOp->fpSet.getNextExtFn(extWinOp, pExtWinParam, &pExtWinBlock);
194,128✔
3274
    QUERY_CHECK_CODE(code, lino, _return);
194,128✔
3275
    setOperatorCompleted(extWinOp);
194,128✔
3276

3277
    blockDataCleanup(pRes);
194,128✔
3278
    code = blockDataEnsureCapacity(pRes, numOfWins);
194,128✔
3279
    QUERY_CHECK_CODE(code, lino, _return);
194,128✔
3280

3281
    if (pExtWinBlock) {
194,128✔
3282
      code = copyColumnsValue(pInfo->pTargets, pExtWinBlock->info.id.blockId, pRes, pExtWinBlock, numOfWins);
194,128✔
3283
      QUERY_CHECK_CODE(code, lino, _return);
194,128✔
3284

3285
      if (pInfo->curWinBatchIdx == 0) {
194,128✔
3286
        // first batch, get _wstart from pMergedBlock
3287
        SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(taosArrayGetP(pInfo->pWins, 0), 0);
180,892✔
3288
        QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
180,892✔
3289

3290
        firstWin->tw.skey = pExtWinBlock->info.window.skey;
180,892✔
3291
      }
3292
      if (pInfo->curWinBatchIdx == taosArrayGetSize(pInfo->pWins) - 1) {
194,128✔
3293
        // last batch, get _wend from pMergedBlock
3294
        SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(taosArrayGetP(pInfo->pWins, taosArrayGetSize(pInfo->pWins) - 1));
4,412✔
3295
        QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)
4,412✔
3296

3297
        lastWin->tw.ekey = pExtWinBlock->info.window.ekey + 1;
4,412✔
3298
      }
3299
    }
3300
  } else {
3301
    mergeOp = pOperator->pDownstream[1];
291,186✔
3302
    code = buildMergeOperatorParam(pDynInfo, &pMergeParam, pWinArray, mergeOp->numOfDownstream, numOfWins);
291,186✔
3303
    QUERY_CHECK_CODE(code, lino, _return);
291,186✔
3304

3305
    SSDataBlock* pMergedBlock = NULL;
291,186✔
3306
    code = mergeOp->fpSet.getNextExtFn(mergeOp, pMergeParam, &pMergedBlock);
291,186✔
3307
    QUERY_CHECK_CODE(code, lino, _return);
291,186✔
3308

3309
    blockDataCleanup(pRes);
291,186✔
3310
    code = blockDataEnsureCapacity(pRes, numOfWins);
291,186✔
3311
    QUERY_CHECK_CODE(code, lino, _return);
291,186✔
3312

3313
    if (pMergedBlock) {
291,186✔
3314
      code = copyColumnsValue(pInfo->pTargets, pMergedBlock->info.id.blockId, pRes, pMergedBlock, numOfWins);
291,186✔
3315
      QUERY_CHECK_CODE(code, lino, _return);
291,186✔
3316

3317
      if (pInfo->curWinBatchIdx == 0) {
291,186✔
3318
        // first batch, get _wstart from pMergedBlock
3319
        SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(taosArrayGetP(pInfo->pWins, 0), 0);
284,253✔
3320
        QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
284,253✔
3321

3322
        firstWin->tw.skey = pMergedBlock->info.window.skey;
284,253✔
3323
      }
3324
      if (pInfo->curWinBatchIdx == taosArrayGetSize(pInfo->pWins) - 1) {
291,186✔
3325
        // last batch, get _wend from pMergedBlock
3326
        SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(taosArrayGetP(pInfo->pWins, taosArrayGetSize(pInfo->pWins) - 1));
6,933✔
3327
        QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)
6,933✔
3328

3329
        lastWin->tw.ekey = pMergedBlock->info.window.ekey + 1;
6,933✔
3330
      }
3331
    }
3332
  }
3333

3334

3335
  if (pInfo->outputWstartSlotId != -1) {
485,314✔
3336
    SColumnInfoData* pWstartCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWstartSlotId);
303,794✔
3337
    QUERY_CHECK_NULL(pWstartCol, code, lino, _return, terrno)
303,794✔
3338

3339
    for (int32_t i = 0; i < numOfWins; i++) {
916,511,775✔
3340
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
916,207,981✔
3341
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
916,207,981✔
3342
      code = colDataSetVal(pWstartCol, i, (const char*)&pWindow->tw.skey, false);
916,207,981✔
3343
      QUERY_CHECK_CODE(code, lino, _return);
916,207,981✔
3344
    }
3345
  }
3346
  if (pInfo->outputWendSlotId != -1) {
485,314✔
3347
    SColumnInfoData* pWendCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWendSlotId);
303,794✔
3348
    QUERY_CHECK_NULL(pWendCol, code, lino, _return, terrno)
303,794✔
3349

3350
    for (int32_t i = 0; i < numOfWins; i++) {
916,511,775✔
3351
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
916,207,981✔
3352
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
916,207,981✔
3353
      TSKEY ekey = pWindow->tw.ekey - 1;
916,207,981✔
3354
      code = colDataSetVal(pWendCol, i, (const char*)&ekey, false);
916,207,981✔
3355
      QUERY_CHECK_CODE(code, lino, _return);
916,207,981✔
3356
    }
3357
  }
3358
  if (pInfo->outputWdurationSlotId != -1) {
485,314✔
3359
    SColumnInfoData* pWdurationCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWdurationSlotId);
213,034✔
3360
    QUERY_CHECK_NULL(pWdurationCol, code, lino, _return, terrno)
213,034✔
3361

3362
    for (int32_t i = 0; i < numOfWins; i++) {
637,606,295✔
3363
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
637,393,261✔
3364
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
637,393,261✔
3365
      int64_t duration = pWindow->tw.ekey - 1 - pWindow->tw.skey;
637,393,261✔
3366
      code = colDataSetVal(pWdurationCol, i, (const char*)&duration, false);
637,393,261✔
3367
      QUERY_CHECK_CODE(code, lino, _return);
637,393,261✔
3368
    }
3369
  }
3370

3371
  pRes->info.rows = numOfWins;
485,314✔
3372
  *ppRes = pRes;
485,314✔
3373
  pInfo->curWinBatchIdx++;
485,314✔
3374

3375
  return code;
485,314✔
3376

3377
_return:
×
3378
  if (code != TSDB_CODE_SUCCESS) {
×
3379
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3380
    pTaskInfo->code = code;
×
3381
    T_LONG_JMP(pTaskInfo->env, code);
×
3382
  }
3383
  return code;
×
3384
}
3385

3386
static int32_t resetDynQueryCtrlOperState(SOperatorInfo* pOper) {
1,102,551✔
3387
  SDynQueryCtrlOperatorInfo*    pDyn = pOper->info;
1,102,551✔
3388
  SDynQueryCtrlPhysiNode const* pPhyciNode = pOper->pPhyNode;
1,102,766✔
3389
  SExecTaskInfo*                pTaskInfo = pOper->pTaskInfo;
1,102,766✔
3390

3391
  pOper->status = OP_NOT_OPENED;
1,102,766✔
3392

3393
  switch (pDyn->qType) {
1,102,766✔
3394
    case DYN_QTYPE_STB_HASH:{
738✔
3395
      pDyn->stbJoin.execInfo = (SDynQueryCtrlExecInfo){0};
738✔
3396
      SStbJoinDynCtrlInfo* pStbJoin = &pDyn->stbJoin;
738✔
3397
      destroyStbJoinDynCtrlInfo(&pDyn->stbJoin);
738✔
3398
      
3399
      int32_t code = initSeqStbJoinTableHash(&pDyn->stbJoin.ctx.prev, pDyn->stbJoin.basic.batchFetch);
738✔
3400
      if (TSDB_CODE_SUCCESS != code) {
738✔
3401
        qError("initSeqStbJoinTableHash failed since %s", tstrerror(code));
×
3402
        return code;
×
3403
      }
3404
      pStbJoin->ctx.prev.pListHead = NULL;
738✔
3405
      pStbJoin->ctx.prev.joinBuild = false;
738✔
3406
      pStbJoin->ctx.prev.pListTail = NULL;
738✔
3407
      pStbJoin->ctx.prev.tableNum = 0;
738✔
3408

3409
      pStbJoin->ctx.post = (SStbJoinPostJoinCtx){0};
738✔
3410
      break; 
738✔
3411
    }
3412
    case DYN_QTYPE_VTB_SCAN: {
1,101,813✔
3413
      SVtbScanDynCtrlInfo* pVtbScan = &pDyn->vtbScan;
1,101,813✔
3414
      
3415
      if (pVtbScan->otbNameToOtbInfoMap) {
1,101,813✔
3416
        taosHashSetFreeFp(pVtbScan->otbNameToOtbInfoMap, destroySOrgTbInfo);
×
3417
        taosHashCleanup(pVtbScan->otbNameToOtbInfoMap);
×
3418
        pVtbScan->otbNameToOtbInfoMap = NULL;
×
3419
      }
3420
      if (pVtbScan->pRsp) {
1,102,243✔
3421
        tFreeSUsedbRsp(pVtbScan->pRsp);
×
3422
        taosMemoryFreeClear(pVtbScan->pRsp);
×
3423
      }
3424
      if (pVtbScan->colRefInfo) {
1,102,243✔
3425
        taosArrayDestroyEx(pVtbScan->colRefInfo, destroyColRefInfo);
124,281✔
3426
        pVtbScan->colRefInfo = NULL;
124,281✔
3427
      }
3428
      if (pVtbScan->childTableMap) {
1,102,028✔
3429
        taosHashCleanup(pVtbScan->childTableMap);
5,598✔
3430
        pVtbScan->childTableMap = NULL;
5,598✔
3431
      }
3432
      if (pVtbScan->childTableList) {
1,102,028✔
3433
        taosArrayClearEx(pVtbScan->childTableList, destroyColRefArray);
1,101,813✔
3434
      }
3435
      if (pPhyciNode->dynTbname && pTaskInfo) {
1,102,028✔
3436
        updateDynTbUidIfNeeded(pVtbScan, pTaskInfo->pStreamRuntimeInfo);
×
3437
      }
3438
      pVtbScan->curTableIdx = 0;
1,102,028✔
3439
      pVtbScan->lastTableIdx = -1;
1,102,243✔
3440
      break;
1,102,243✔
3441
    }
3442
    case DYN_QTYPE_VTB_WINDOW: {
×
3443
      SVtbWindowDynCtrlInfo* pVtbWindow = &pDyn->vtbWindow;
×
3444
      if (pVtbWindow->pRes) {
×
3445
        blockDataDestroy(pVtbWindow->pRes);
×
3446
        pVtbWindow->pRes = NULL;
×
3447
      }
3448
      if (pVtbWindow->pWins) {
×
3449
        taosArrayDestroyEx(pVtbWindow->pWins, destroyWinArray);
×
3450
        pVtbWindow->pWins = NULL;
×
3451
      }
3452
      pVtbWindow->outputWdurationSlotId = -1;
×
3453
      pVtbWindow->outputWendSlotId = -1;
×
3454
      pVtbWindow->outputWstartSlotId = -1;
×
3455
      pVtbWindow->curWinBatchIdx = 0;
×
3456
      break;
×
3457
    }
3458
    default:
×
3459
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
×
3460
      break;
×
3461
  }
3462
  return 0;
1,102,766✔
3463
}
3464

3465
int32_t vtbAggOpen(SOperatorInfo* pOperator) {
62,869✔
3466
  int32_t                    code = TSDB_CODE_SUCCESS;
62,869✔
3467
  int32_t                    line = 0;
62,869✔
3468
  int64_t                    st = 0;
62,869✔
3469
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
62,869✔
3470

3471
  if (OPTR_IS_OPENED(pOperator)) {
62,869✔
3472
    return code;
41,172✔
3473
  }
3474

3475
  if (pOperator->cost.openCost == 0) {
21,697✔
3476
    st = taosGetTimestampUs();
21,697✔
3477
  }
3478

3479
  code = buildVirtualSuperTableScanChildTableMap(pOperator);
21,697✔
3480
  QUERY_CHECK_CODE(code, line, _return);
21,697✔
3481
  OPTR_SET_OPENED(pOperator);
21,697✔
3482

3483
_return:
21,697✔
3484
  if (pOperator->cost.openCost == 0) {
21,697✔
3485
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
21,697✔
3486
  }
3487
  if (code) {
21,697✔
3488
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
3489
    pOperator->pTaskInfo->code = code;
×
3490
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
3491
  }
3492
  return code;
21,697✔
3493
}
3494

3495
int32_t virtualTableAggGetNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
62,869✔
3496
  int32_t                    code = TSDB_CODE_SUCCESS;
62,869✔
3497
  int32_t                    line = 0;
62,869✔
3498
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
62,869✔
3499
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
62,869✔
3500
  SOperatorInfo*             pAggOp = pOperator->pDownstream[pOperator->numOfDownstream - 1];
62,869✔
3501
  SOperatorInfo*             pTagScanOp = pOperator->pDownstream[1];
62,869✔
3502
  SOperatorParam*            pAggParam = NULL;
62,869✔
3503

3504
  if (pInfo->vtbScan.hasPartition) {
62,869✔
3505
    if (pInfo->vtbScan.batchProcessChild) {
18,448✔
3506
      void* pIter = taosHashIterate(pVtbScan->vtbGroupIdTagListMap, NULL);
18,448✔
3507
      while (pIter) {
27,672✔
3508
        size_t     keyLen = 0;
23,060✔
3509
        uint64_t   groupid = *(uint64_t*)taosHashGetKey(pIter, &keyLen);
23,060✔
3510

3511
        code = buildAggOperatorParamWithGroupId(pInfo, groupid, &pAggParam);
23,060✔
3512
        QUERY_CHECK_CODE(code, line, _return);
23,060✔
3513

3514
        if (pAggParam) {
23,060✔
3515
          code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
23,060✔
3516
          QUERY_CHECK_CODE(code, line, _return);
23,060✔
3517
        } else {
3518
          *pRes = NULL;
×
3519
        }
3520

3521
        pIter = taosHashIterate(pVtbScan->vtbGroupIdTagListMap, pIter);
23,060✔
3522

3523
        if (*pRes) {
23,060✔
3524
          (*pRes)->info.id.groupId = groupid;
13,836✔
3525
          code = taosHashRemove(pVtbScan->vtbGroupIdTagListMap, &groupid, keyLen);
13,836✔
3526
          QUERY_CHECK_CODE(code, line, _return);
13,836✔
3527
          break;
13,836✔
3528
        }
3529
      }
3530
    } else {
3531
      void *pIter = taosHashIterate(pVtbScan->vtbGroupIdTagListMap, NULL);
×
3532
      while (pIter) {
×
3533
        size_t     keyLen = 0;
×
3534
        uint64_t*  groupid = (uint64_t*)taosHashGetKey(pIter, &keyLen);
×
3535
        SHashObj*  vtbUidTagListMap = *(SHashObj**)pIter;
×
3536

3537
        void* pIter2 = taosHashIterate(vtbUidTagListMap, NULL);
×
3538
        while (pIter2) {
×
3539
          size_t   keyLen2 = 0;
×
3540
          tb_uid_t uid = *(tb_uid_t*)taosHashGetKey(pIter2, &keyLen2);
×
3541
          SArray*  pTagList = *(SArray**)pIter2;
×
3542

3543
          if (pVtbScan->genNewParam) {
×
3544
            code = buildAggOperatorParamForSingleChild(pInfo, uid, *groupid, pTagList, &pAggParam);
×
3545
            QUERY_CHECK_CODE(code, line, _return);
×
3546
            if (pAggParam) {
×
3547
              code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
×
3548
              QUERY_CHECK_CODE(code, line, _return);
×
3549
            } else {
3550
              *pRes = NULL;
×
3551
            }
3552
          } else {
3553
            code = pAggOp->fpSet.getNextFn(pAggOp, pRes);
×
3554
            QUERY_CHECK_CODE(code, line, _return);
×
3555
          }
3556

3557
          if (*pRes) {
×
3558
            pVtbScan->genNewParam = false;
×
3559
            (*pRes)->info.id.groupId = *groupid;
×
3560
            break;
×
3561
          }
3562
          pVtbScan->genNewParam = true;
×
3563
          pIter2 = taosHashIterate(vtbUidTagListMap, pIter2);
×
3564
          code = taosHashRemove(vtbUidTagListMap, &uid, keyLen);
×
3565
          QUERY_CHECK_CODE(code, line, _return);
×
3566
        }
3567
        if (*pRes) {
×
3568
          break;
×
3569
        }
3570
        pIter = taosHashIterate(pVtbScan->vtbGroupIdTagListMap, pIter);
×
3571
        code = taosHashRemove(pVtbScan->vtbGroupIdTagListMap, groupid, keyLen);
×
3572
        QUERY_CHECK_CODE(code, line, _return);
×
3573
      }
3574
    }
3575

3576
  } else {
3577
    if (pInfo->vtbScan.batchProcessChild) {
44,421✔
3578
      code = buildAggOperatorParam(pInfo, &pAggParam);
12,529✔
3579
      QUERY_CHECK_CODE(code, line, _return);
12,529✔
3580

3581
      code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
12,529✔
3582
      QUERY_CHECK_CODE(code, line, _return);
12,529✔
3583
      setOperatorCompleted(pOperator);
12,529✔
3584
    } else {
3585
      void* pIter = taosHashIterate(pVtbScan->vtbUidTagListMap, NULL);
31,892✔
3586
      while (pIter) {
50,116✔
3587
        size_t   keyLen = 0;
45,560✔
3588
        tb_uid_t uid = *(tb_uid_t*)taosHashGetKey(pIter, &keyLen);
45,560✔
3589
        SArray*  pTagList = *(SArray**)pIter;
45,560✔
3590

3591
        if (pVtbScan->genNewParam) {
45,560✔
3592
          code = buildAggOperatorParamForSingleChild(pInfo, uid, 0, pTagList, &pAggParam);
18,224✔
3593
          QUERY_CHECK_CODE(code, line, _return);
18,224✔
3594

3595
          if (pAggParam) {
18,224✔
3596
            code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
18,224✔
3597
            QUERY_CHECK_CODE(code, line, _return);
18,224✔
3598
          } else {
3599
            *pRes = NULL;
×
3600
          }
3601
        } else {
3602
          code = pAggOp->fpSet.getNextFn(pAggOp, pRes);
27,336✔
3603
          QUERY_CHECK_CODE(code, line, _return);
27,336✔
3604
        }
3605

3606
        if (*pRes) {
45,560✔
3607
          pVtbScan->genNewParam = false;
27,336✔
3608
          break;
27,336✔
3609
        }
3610
        pVtbScan->genNewParam = true;
18,224✔
3611
        pIter = taosHashIterate(pVtbScan->vtbUidTagListMap, pIter);
18,224✔
3612
        code = taosHashRemove(pVtbScan->vtbUidTagListMap, &uid, keyLen);
18,224✔
3613
        QUERY_CHECK_CODE(code, line, _return);
18,224✔
3614
      }
3615
    }
3616
  }
3617
_return:
62,869✔
3618
  if (code) {
62,869✔
3619
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
3620
  }
3621
  return code;
62,869✔
3622
}
3623

3624
int32_t vtbAggNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
75,398✔
3625
  int32_t                    code = TSDB_CODE_SUCCESS;
75,398✔
3626
  int32_t                    line = 0;
75,398✔
3627
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
75,398✔
3628
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
75,398✔
3629

3630
  QRY_PARAM_CHECK(pRes);
75,398✔
3631
  if (pOperator->status == OP_EXEC_DONE) {
75,398✔
3632
    return code;
12,529✔
3633
  }
3634

3635
  code = pOperator->fpSet._openFn(pOperator);
62,869✔
3636
  QUERY_CHECK_CODE(code, line, _return);
62,869✔
3637

3638
  if (pVtbScan->isSuperTable && taosArrayGetSize(pVtbScan->childTableList) == 0) {
62,869✔
3639
    setOperatorCompleted(pOperator);
×
3640
    return code;
×
3641
  }
3642

3643
  code = virtualTableAggGetNext(pOperator, pRes);
62,869✔
3644
  QUERY_CHECK_CODE(code, line, _return);
62,869✔
3645

3646
  return code;
62,869✔
3647

3648
_return:
×
3649
  if (code) {
×
3650
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
3651
    pOperator->pTaskInfo->code = code;
×
3652
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
3653
  }
3654
  return code;
×
3655
}
3656

3657
int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
3,011,886✔
3658
                                       SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo,
3659
                                       SMsgCb* pMsgCb, SOperatorInfo** pOptrInfo) {
3660
  QRY_PARAM_CHECK(pOptrInfo);
3,011,886✔
3661

3662
  int32_t                    code = TSDB_CODE_SUCCESS;
3,011,886✔
3663
  int32_t                    line = 0;
3,011,886✔
3664
  __optr_fn_t                nextFp = NULL;
3,011,886✔
3665
  __optr_open_fn_t           openFp = NULL;
3,011,886✔
3666
  SOperatorInfo*             pOperator = NULL;
3,011,886✔
3667
  SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo));
3,011,886✔
3668
  QUERY_CHECK_NULL(pInfo, code, line, _error, terrno)
3,011,886✔
3669

3670
  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
3,011,886✔
3671
  QUERY_CHECK_NULL(pOperator, code, line, _error, terrno)
3,011,886✔
3672

3673
  pOperator->pPhyNode = pPhyciNode;
3,011,886✔
3674
  pTaskInfo->dynamicTask = (int8_t)pPhyciNode->node.dynamicOp;
3,011,886✔
3675

3676
  code = appendDownstream(pOperator, pDownstream, numOfDownstream);
3,011,886✔
3677
  QUERY_CHECK_CODE(code, line, _error);
3,011,886✔
3678

3679
  setOperatorInfo(pOperator, "DynQueryCtrlOperator", QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, false, OP_NOT_OPENED,
3,011,886✔
3680
                  pInfo, pTaskInfo);
3681

3682
  pInfo->qType = pPhyciNode->qType;
3,011,886✔
3683
  switch (pInfo->qType) {
3,011,886✔
3684
    case DYN_QTYPE_STB_HASH:
1,150,600✔
3685
      TAOS_MEMCPY(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
1,150,600✔
3686
      pInfo->stbJoin.pOutputDataBlockDesc = pPhyciNode->node.pOutputDataBlockDesc;
1,150,600✔
3687
      code = initSeqStbJoinTableHash(&pInfo->stbJoin.ctx.prev, pInfo->stbJoin.basic.batchFetch);
1,150,600✔
3688
      QUERY_CHECK_CODE(code, line, _error);
1,150,600✔
3689
      nextFp = seqStableJoin;
1,150,600✔
3690
      openFp = optrDummyOpenFn;
1,150,600✔
3691
      break;
1,150,600✔
3692
    case DYN_QTYPE_VTB_SCAN:
1,374,444✔
3693
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
1,374,444✔
3694
      QUERY_CHECK_CODE(code, line, _error);
1,374,444✔
3695
      nextFp = vtbScanNext;
1,374,444✔
3696
      openFp = vtbScanOpen;
1,374,444✔
3697
      break;
1,374,444✔
3698
    case DYN_QTYPE_VTB_WINDOW:
465,145✔
3699
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
465,145✔
3700
      QUERY_CHECK_CODE(code, line, _error);
465,145✔
3701
      code = initVtbWindowInfo(pInfo, pPhyciNode, pTaskInfo, pOperator);
465,145✔
3702
      QUERY_CHECK_CODE(code, line, _error);
465,145✔
3703
      nextFp = vtbWindowNext;
465,145✔
3704
      openFp = vtbWindowOpen;
465,145✔
3705
      break;
465,145✔
3706
    case DYN_QTYPE_VTB_AGG:
21,697✔
3707
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
21,697✔
3708
      QUERY_CHECK_CODE(code, line, _error);
21,697✔
3709
      nextFp = vtbAggNext;
21,697✔
3710
      openFp = vtbAggOpen;
21,697✔
3711
      break;
21,697✔
3712
    default:
×
3713
      qError("unsupported dynamic query ctrl type: %d", pInfo->qType);
×
3714
      code = TSDB_CODE_INVALID_PARA;
×
3715
      goto _error;
×
3716
  }
3717

3718
  pOperator->fpSet = createOperatorFpSet(openFp, nextFp, NULL, destroyDynQueryCtrlOperator, optrDefaultBufFn,
3,011,886✔
3719
                                         NULL, optrDefaultGetNextExtFn, NULL);
3720

3721
  setOperatorResetStateFn(pOperator, resetDynQueryCtrlOperState);
3,011,886✔
3722
  *pOptrInfo = pOperator;
3,011,886✔
3723
  return TSDB_CODE_SUCCESS;
3,011,886✔
3724

3725
_error:
×
3726
  if (pInfo != NULL) {
×
3727
    destroyDynQueryCtrlOperator(pInfo);
×
3728
  }
3729
  qError("failed to create dyn query ctrl operator, %s code:%s, line:%d", __func__, tstrerror(code), line);
×
3730
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
3731
  pTaskInfo->code = code;
×
3732
  return code;
×
3733
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc