• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4975

05 Mar 2026 08:43AM UTC coverage: 68.37% (+0.7%) from 67.664%
#4975

push

travis-ci

web-flow
merge: from main to 3.0 branch #34682

250 of 345 new or added lines in 28 files covered. (72.46%)

446 existing lines in 120 files now uncovered.

210600 of 308032 relevant lines covered (68.37%)

130326818.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.39
/source/libs/executor/src/dynqueryctrloperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "nodes.h"
18
#include "operator.h"
19
#include "os.h"
20
#include "plannodes.h"
21
#include "query.h"
22
#include "querynodes.h"
23
#include "querytask.h"
24
#include "tarray.h"
25
#include "tcompare.h"
26
#include "tdatablock.h"
27
#include "thash.h"
28
#include "tmsg.h"
29
#include "trpc.h"
30
#include "ttypes.h"
31
#include "tdataformat.h"
32
#include "dynqueryctrl.h"
33

34
int64_t gSessionId = 0;
35

36
void freeVgTableList(void* ptr) { 
×
37
  taosArrayDestroy(*(SArray**)ptr); 
×
38
}
×
39

40
static void destroyStbJoinTableList(SStbJoinTableList* pListHead) {
1,150,806✔
41
  SStbJoinTableList* pNext = NULL;
1,150,806✔
42
  
43
  while (pListHead) {
1,151,311✔
44
    taosMemoryFree(pListHead->pLeftVg);
505✔
45
    taosMemoryFree(pListHead->pLeftUid);
505✔
46
    taosMemoryFree(pListHead->pRightVg);
505✔
47
    taosMemoryFree(pListHead->pRightUid);
505✔
48
    pNext = pListHead->pNext;
505✔
49
    taosMemoryFree(pListHead);
505✔
50
    pListHead = pNext;
505✔
51
  }
52
}
1,150,806✔
53

54
static void destroyStbJoinDynCtrlInfo(SStbJoinDynCtrlInfo* pStbJoin) {
1,150,806✔
55
  qDebug("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64 ", leftCacheNum:%" PRId64 ", rightCacheNum:%" PRId64, 
1,150,806✔
56
         pStbJoin->execInfo.prevBlkNum, pStbJoin->execInfo.prevBlkRows, pStbJoin->execInfo.postBlkNum, 
57
         pStbJoin->execInfo.postBlkRows, pStbJoin->execInfo.leftCacheNum, pStbJoin->execInfo.rightCacheNum);
58

59
  if (pStbJoin->basic.batchFetch) {
1,150,806✔
60
    if (pStbJoin->ctx.prev.leftHash) {
1,149,684✔
61
      tSimpleHashSetFreeFp(pStbJoin->ctx.prev.leftHash, freeVgTableList);
1,070,969✔
62
      tSimpleHashCleanup(pStbJoin->ctx.prev.leftHash);
1,070,969✔
63
    }
64
    if (pStbJoin->ctx.prev.rightHash) {
1,149,684✔
65
      tSimpleHashSetFreeFp(pStbJoin->ctx.prev.rightHash, freeVgTableList);
1,070,969✔
66
      tSimpleHashCleanup(pStbJoin->ctx.prev.rightHash);
1,070,969✔
67
    }
68
  } else {
69
    if (pStbJoin->ctx.prev.leftCache) {
1,122✔
70
      tSimpleHashCleanup(pStbJoin->ctx.prev.leftCache);
1,122✔
71
    }
72
    if (pStbJoin->ctx.prev.rightCache) {
1,122✔
73
      tSimpleHashCleanup(pStbJoin->ctx.prev.rightCache);
1,122✔
74
    }
75
    if (pStbJoin->ctx.prev.onceTable) {
1,122✔
76
      tSimpleHashCleanup(pStbJoin->ctx.prev.onceTable);
1,122✔
77
    }
78
  }
79

80
  destroyStbJoinTableList(pStbJoin->ctx.prev.pListHead);
1,150,806✔
81
}
1,150,806✔
82

83
void destroyColRefInfo(void *info) {
220,501,784✔
84
  SColRefInfo *pColRefInfo = (SColRefInfo *)info;
220,501,784✔
85
  if (pColRefInfo) {
220,501,784✔
86
    taosMemoryFree(pColRefInfo->colName);
220,501,784✔
87
    taosMemoryFree(pColRefInfo->colrefName);
220,501,784✔
88
  }
89
}
220,501,784✔
90

91
void destroyColRefArray(void *info) {
11,567,405✔
92
  SArray *pColRefArray = *(SArray **)info;
11,567,405✔
93
  if (pColRefArray) {
11,567,405✔
94
    taosArrayDestroyEx(pColRefArray, destroyColRefInfo);
11,567,405✔
95
  }
96
}
11,567,405✔
97

98
void freeUseDbOutput(void* pOutput) {
2,535,490✔
99
  SUseDbOutput *pOut = *(SUseDbOutput**)pOutput;
2,535,490✔
100
  if (NULL == pOutput) {
2,535,490✔
101
    return;
×
102
  }
103

104
  if (pOut->dbVgroup) {
2,535,490✔
105
    freeVgInfo(pOut->dbVgroup);
2,535,490✔
106
  }
107
  taosMemFree(pOut);
2,535,490✔
108
}
109

110
void destroyOtbInfoArray(void *info) {
5,432,948✔
111
  SArray *pOtbInfoArray = *(SArray **)info;
5,432,948✔
112
  if (pOtbInfoArray) {
5,432,948✔
113
    taosArrayDestroyEx(pOtbInfoArray, destroySOrgTbInfo);
5,432,948✔
114
  }
115
}
5,432,948✔
116

117
void destroyOtbVgIdToOtbInfoArrayMap(void *info) {
4,015,526✔
118
  SHashObj* pOtbVgIdToOtbInfoArrayMap = *(SHashObj **)info;
4,015,526✔
119
  if (pOtbVgIdToOtbInfoArrayMap) {
4,015,526✔
120
    taosHashSetFreeFp(pOtbVgIdToOtbInfoArrayMap, destroyOtbInfoArray);
4,015,526✔
121
    taosHashCleanup(pOtbVgIdToOtbInfoArrayMap);
4,015,526✔
122
  }
123
}
4,015,526✔
124

125
void destroyTagList(void *info) {
2,817,000✔
126
  SArray *pTagList = *(SArray **)info;
2,817,000✔
127
  if (pTagList) {
2,817,000✔
128
    taosArrayDestroyEx(pTagList, destroyTagVal);
2,817,000✔
129
  }
130
}
2,817,000✔
131

132
static void destroyRefColIdGroup(void *info) {
7,224✔
133
  SRefColIdGroup *pGroup = (SRefColIdGroup *)info;
7,224✔
134
  if (pGroup && pGroup->pSlotIdList) {
7,224✔
135
    taosArrayDestroy(pGroup->pSlotIdList);
7,224✔
136
    pGroup->pSlotIdList = NULL;
7,224✔
137
  }
138
}
7,224✔
139

140
void destroyVtbUidTagListMap(void *info) {
1,061,304✔
141
  SHashObj* pVtbUidTagListMap = *(SHashObj **)info;
1,061,304✔
142
  if (pVtbUidTagListMap) {
1,061,304✔
143
    taosHashSetFreeFp(pVtbUidTagListMap, destroyTagList);
1,061,304✔
144
    taosHashCleanup(pVtbUidTagListMap);
1,061,304✔
145
  }
146
}
1,061,304✔
147

148
static void destroyVtbScanDynCtrlInfo(SVtbScanDynCtrlInfo* pVtbScan) {
2,778,186✔
149
  if (pVtbScan->dbName) {
2,778,186✔
150
    taosMemoryFreeClear(pVtbScan->dbName);
2,778,186✔
151
  }
152
  if (pVtbScan->tbName) {
2,778,186✔
153
    taosMemoryFreeClear(pVtbScan->tbName);
2,778,186✔
154
  }
155
  if (pVtbScan->refColGroups) {
2,778,186✔
156
    taosArrayDestroyEx(pVtbScan->refColGroups, destroyRefColIdGroup);
1,206✔
157
    pVtbScan->refColGroups = NULL;
1,206✔
158
  }
159
  if (pVtbScan->childTableList) {
2,778,186✔
160
    taosArrayDestroyEx(pVtbScan->childTableList, destroyColRefArray);
2,778,186✔
161
  }
162
  if (pVtbScan->colRefInfo) {
2,778,186✔
163
    taosArrayDestroyEx(pVtbScan->colRefInfo, destroyColRefInfo);
×
164
    pVtbScan->colRefInfo = NULL;
×
165
  }
166
  if (pVtbScan->childTableMap) {
2,778,186✔
167
    taosHashCleanup(pVtbScan->childTableMap);
2,196,069✔
168
  }
169
  if (pVtbScan->readColList) {
2,778,186✔
170
    taosArrayDestroy(pVtbScan->readColList);
2,778,186✔
171
  }
172
  if (pVtbScan->readColSet) {
2,778,186✔
173
    taosHashCleanup(pVtbScan->readColSet);
2,778,186✔
174
  }
175
  if (pVtbScan->dbVgInfoMap) {
2,778,186✔
176
    taosHashSetFreeFp(pVtbScan->dbVgInfoMap, freeUseDbOutput);
2,778,186✔
177
    taosHashCleanup(pVtbScan->dbVgInfoMap);
2,778,186✔
178
  }
179
  if (pVtbScan->otbNameToOtbInfoMap) {
2,778,186✔
180
    taosHashSetFreeFp(pVtbScan->otbNameToOtbInfoMap, destroySOrgTbInfo);
980,481✔
181
    taosHashCleanup(pVtbScan->otbNameToOtbInfoMap);
980,481✔
182
  }
183
  if (pVtbScan->pRsp) {
2,778,186✔
184
    tFreeSUsedbRsp(pVtbScan->pRsp);
×
185
    taosMemoryFreeClear(pVtbScan->pRsp);
×
186
  }
187
  if (pVtbScan->existOrgTbVg) {
2,778,186✔
188
    taosHashCleanup(pVtbScan->existOrgTbVg);
2,778,186✔
189
  }
190
  if (pVtbScan->curOrgTbVg) {
2,778,186✔
191
    taosHashCleanup(pVtbScan->curOrgTbVg);
1,756✔
192
  }
193
  if (pVtbScan->newAddedVgInfo) {
2,778,186✔
194
    taosHashCleanup(pVtbScan->newAddedVgInfo);
864✔
195
  }
196
  if (pVtbScan->otbVgIdToOtbInfoArrayMap) {
2,778,186✔
197
    taosHashSetFreeFp(pVtbScan->otbVgIdToOtbInfoArrayMap, destroyOtbInfoArray);
203,571✔
198
    taosHashCleanup(pVtbScan->otbVgIdToOtbInfoArrayMap);
203,571✔
199
  }
200
  if (pVtbScan->vtbUidToVgIdMapMap) {
2,778,186✔
201
    taosHashSetFreeFp(pVtbScan->vtbUidToVgIdMapMap, destroyOtbVgIdToOtbInfoArrayMap);
315,540✔
202
    taosHashCleanup(pVtbScan->vtbUidToVgIdMapMap);
315,540✔
203
  }
204
  if (pVtbScan->vtbGroupIdToVgIdMapMap) {
2,778,186✔
205
    taosHashSetFreeFp(pVtbScan->vtbGroupIdToVgIdMapMap, destroyOtbVgIdToOtbInfoArrayMap);
460,732✔
206
    taosHashCleanup(pVtbScan->vtbGroupIdToVgIdMapMap);
460,732✔
207
  }
208
  if (pVtbScan->vtbUidTagListMap) {
2,778,186✔
209
    taosHashSetFreeFp(pVtbScan->vtbUidTagListMap, destroyTagList);
315,540✔
210
    taosHashCleanup(pVtbScan->vtbUidTagListMap);
315,540✔
211
  }
212
  if (pVtbScan->vtbGroupIdTagListMap) {
2,778,186✔
213
    taosHashCleanup(pVtbScan->vtbGroupIdTagListMap);
717,532✔
214
  }
215
  if (pVtbScan->vtbUidToGroupIdMap) {
2,778,186✔
216
    taosHashCleanup(pVtbScan->vtbUidToGroupIdMap);
717,532✔
217
  }
218
}
2,778,186✔
219

220
void destroyWinArray(void *info) {
1,474,770✔
221
  SArray *pWinArray = *(SArray **)info;
1,474,770✔
222
  if (pWinArray) {
1,474,770✔
223
    taosArrayDestroy(pWinArray);
1,474,770✔
224
  }
225
}
1,474,770✔
226

227
static void destroyVtbWindowDynCtrlInfo(SVtbWindowDynCtrlInfo* pVtbWindow) {
642,183✔
228
  if (pVtbWindow->pRes) {
642,183✔
229
    blockDataDestroy(pVtbWindow->pRes);
642,183✔
230
  }
231
  if (pVtbWindow->pWins) {
642,183✔
232
    taosArrayDestroyEx(pVtbWindow->pWins, destroyWinArray);
642,183✔
233
  }
234
}
642,183✔
235

236
static void destroyDynQueryCtrlOperator(void* param) {
3,928,212✔
237
  SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param;
3,928,212✔
238

239
  switch (pDyn->qType) {
3,928,212✔
240
    case DYN_QTYPE_STB_HASH:
1,150,026✔
241
      destroyStbJoinDynCtrlInfo(&pDyn->stbJoin);
1,150,026✔
242
      break;
1,150,026✔
243
    case DYN_QTYPE_VTB_WINDOW:
642,183✔
244
      destroyVtbWindowDynCtrlInfo(&pDyn->vtbWindow);
642,183✔
245
    case DYN_QTYPE_VTB_AGG:
2,778,186✔
246
    case DYN_QTYPE_VTB_SCAN:
247
      destroyVtbScanDynCtrlInfo(&pDyn->vtbScan);
2,778,186✔
248
      break;
2,778,186✔
249
    default:
×
250
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
×
251
      break;
×
252
  }
253

254
  taosMemoryFreeClear(param);
3,928,212✔
255
}
3,928,212✔
256

257
static FORCE_INLINE bool tableNeedCache(int64_t uid, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, bool rightTable, bool batchFetch) {
258
  if (batchFetch) {
7,440,676✔
259
    return true;
7,436,188✔
260
  }
261
  
262
  if (rightTable) {
4,488✔
263
    return pPost->rightCurrUid == pPost->rightNextUid;
2,244✔
264
  }
265

266
  uint32_t* num = tSimpleHashGet(pPrev->leftCache, &uid, sizeof(uid));
2,244✔
267

268
  return (NULL == num) ? false : true;
2,244✔
269
}
270

271
static int32_t updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo*          pStbJoin) {
3,720,338✔
272
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
3,720,338✔
273
  SStbJoinPostJoinCtx*       pPost = &pStbJoin->ctx.post;
3,720,338✔
274
  SStbJoinTableList*         pNode = pPrev->pListHead;
3,720,338✔
275
  int32_t*                   leftVgId = pNode->pLeftVg + pNode->readIdx;
3,720,338✔
276
  int32_t*                   rightVgId = pNode->pRightVg + pNode->readIdx;
3,720,338✔
277
  int64_t*                   leftUid = pNode->pLeftUid + pNode->readIdx;
3,720,338✔
278
  int64_t*                   rightUid = pNode->pRightUid + pNode->readIdx;
3,720,338✔
279
  int64_t                    readIdx = pNode->readIdx + 1;
3,720,338✔
280
  int64_t                    rightPrevUid = pPost->rightCurrUid;
3,720,338✔
281

282
  pPost->leftCurrUid = *leftUid;
3,720,338✔
283
  pPost->rightCurrUid = *rightUid;
3,720,338✔
284

285
  pPost->leftVgId = *leftVgId;
3,720,338✔
286
  pPost->rightVgId = *rightVgId;
3,720,338✔
287

288
  while (true) {
289
    if (readIdx < pNode->uidNum) {
3,720,338✔
290
      pPost->rightNextUid = *(pNode->pRightUid + readIdx);
3,641,006✔
291
      break;
3,641,006✔
292
    }
293
    
294
    pNode = pNode->pNext;
79,332✔
295
    if (NULL == pNode) {
79,332✔
296
      pPost->rightNextUid = 0;
79,332✔
297
      break;
79,332✔
298
    }
299
    
300
    rightUid = pNode->pRightUid;
×
301
    readIdx = 0;
×
302
  }
303

304
  pPost->leftNeedCache = tableNeedCache(*leftUid, pPrev, pPost, false, pStbJoin->basic.batchFetch);
7,440,676✔
305
  pPost->rightNeedCache = tableNeedCache(*rightUid, pPrev, pPost, true, pStbJoin->basic.batchFetch);
7,440,676✔
306

307
  if (!pStbJoin->basic.batchFetch && pPost->rightNeedCache && rightPrevUid != pPost->rightCurrUid) {
3,720,338✔
308
    QRY_ERR_RET(tSimpleHashPut(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid), NULL, 0));
×
309
    pStbJoin->execInfo.rightCacheNum++;
×
310
  }  
311

312
  return TSDB_CODE_SUCCESS;
3,720,338✔
313
}
314

315
static int32_t copyOrgTbInfo(SOrgTbInfo* pSrc, SOrgTbInfo** ppDst) {
6,870,272✔
316
  int32_t     code = TSDB_CODE_SUCCESS;
6,870,272✔
317
  int32_t     lino = 0;
6,870,272✔
318
  SOrgTbInfo* pTbInfo = NULL;
6,870,272✔
319

320
  qDebug("start to copy org table info, vgId:%d, tbName:%s", pSrc->vgId, pSrc->tbName);
6,870,272✔
321

322
  pTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
6,870,272✔
323
  QUERY_CHECK_NULL(pTbInfo, code, lino, _return, terrno)
6,870,272✔
324

325
  pTbInfo->vgId = pSrc->vgId;
6,870,272✔
326
  tstrncpy(pTbInfo->tbName, pSrc->tbName, TSDB_TABLE_FNAME_LEN);
6,870,272✔
327

328
  pTbInfo->colMap = taosArrayDup(pSrc->colMap, NULL);
6,870,272✔
329
  QUERY_CHECK_NULL(pTbInfo->colMap, code, lino, _return, terrno)
6,870,272✔
330

331
  *ppDst = pTbInfo;
6,870,272✔
332

333
  return code;
6,870,272✔
334
_return:
×
335
  qError("failed to copy org table info, code:%d, line:%d", code, lino);
×
336
  if (pTbInfo) {
×
337
    if (pTbInfo->colMap) {
×
338
      taosArrayDestroy(pTbInfo->colMap);
×
339
    }
340
    taosMemoryFreeClear(pTbInfo);
×
341
  }
342
  return code;
×
343
}
344

345
static int32_t buildTagListForExchangeBasicParam(SExchangeOperatorBasicParam* pBasic, SArray* pTagList) {
2,626,916✔
346
  int32_t  code = TSDB_CODE_SUCCESS;
2,626,916✔
347
  int32_t  lino = 0;
2,626,916✔
348
  STagVal  tmpTag;
2,626,916✔
349

350
  pBasic->tagList = taosArrayInit(1, sizeof(STagVal));
2,626,916✔
351
  QUERY_CHECK_NULL(pBasic->tagList, code, lino, _return, terrno)
2,626,916✔
352

353
  for (int32_t i = 0; i < taosArrayGetSize(pTagList); ++i) {
17,575,144✔
354
    STagVal* pSrcTag = (STagVal*)taosArrayGet(pTagList, i);
14,948,228✔
355
    QUERY_CHECK_NULL(pSrcTag, code, lino, _return, terrno)
14,948,228✔
356
    tmpTag.type = pSrcTag->type;
14,948,228✔
357
    tmpTag.cid = pSrcTag->cid;
14,948,228✔
358
    if (IS_VAR_DATA_TYPE(pSrcTag->type)) {
14,948,228✔
359
      tmpTag.nData = pSrcTag->nData;
6,562,500✔
360
      tmpTag.pData = taosMemoryMalloc(tmpTag.nData);
6,562,500✔
361
      QUERY_CHECK_NULL(tmpTag.pData, code, lino, _return, terrno)
6,562,500✔
362
      memcpy(tmpTag.pData, pSrcTag->pData, tmpTag.nData);
6,562,500✔
363
    } else {
364
      tmpTag.i64 = pSrcTag->i64;
8,385,728✔
365
    }
366

367
    QUERY_CHECK_NULL(taosArrayPush(pBasic->tagList, &tmpTag), code, lino, _return, terrno)
29,896,456✔
368
    tmpTag = (STagVal){0};
14,948,228✔
369
  }
370

371
  return code;
2,626,916✔
372
_return:
×
373
  if (pBasic->tagList) {
×
374
    taosArrayDestroyEx(pBasic->tagList, destroyTagVal);
×
375
    pBasic->tagList = NULL;
×
376
  }
377
  if (tmpTag.pData) {
×
378
    taosMemoryFree(tmpTag.pData);
×
379
  }
380
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
×
381
  return code;
×
382
}
383

384
static int32_t buildBatchOrgTbInfoForExchangeBasicParam(SExchangeOperatorBasicParam* pBasic, SArray* pOrgTbInfoArray) {
5,547,926✔
385
  int32_t     code = TSDB_CODE_SUCCESS;
5,547,926✔
386
  int32_t     lino = 0;
5,547,926✔
387
  SOrgTbInfo  batchInfo;
5,547,926✔
388

389
  pBasic->batchOrgTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
5,547,926✔
390
  QUERY_CHECK_NULL(pBasic->batchOrgTbInfo, code, lino, _return, terrno)
5,547,926✔
391

392
  for (int32_t i = 0; i < taosArrayGetSize(pOrgTbInfoArray); ++i) {
14,636,913✔
393
    SOrgTbInfo* pSrc = (SOrgTbInfo*)taosArrayGet(pOrgTbInfoArray, i);
9,088,987✔
394
    QUERY_CHECK_NULL(pSrc, code, lino, _return, terrno)
9,088,987✔
395
    batchInfo.vgId = pSrc->vgId;
9,088,987✔
396
    tstrncpy(batchInfo.tbName, pSrc->tbName, TSDB_TABLE_FNAME_LEN);
9,088,987✔
397
    batchInfo.colMap = taosArrayDup(pSrc->colMap, NULL);
9,088,987✔
398
    QUERY_CHECK_NULL(batchInfo.colMap, code, lino, _return, terrno)
9,088,987✔
399
    QUERY_CHECK_NULL(taosArrayPush(pBasic->batchOrgTbInfo, &batchInfo), code, lino, _return, terrno)
18,177,974✔
400
    batchInfo = (SOrgTbInfo){0};
9,088,987✔
401
  }
402

403
  return code;
5,547,926✔
404
_return:
×
405
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
×
406
  if (pBasic->batchOrgTbInfo) {
×
407
    taosArrayDestroyEx(pBasic->batchOrgTbInfo, destroySOrgTbInfo);
×
408
    pBasic->batchOrgTbInfo = NULL;
×
409
  }
410
  if (batchInfo.colMap) {
×
411
    taosArrayDestroy(batchInfo.colMap);
×
412
    batchInfo.colMap = NULL;
×
413
  }
414
  return code;
×
415
}
416

417
static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, bool needCache, SOperatorParam* pChild) {
7,440,676✔
418
  int32_t code = TSDB_CODE_SUCCESS;
7,440,676✔
419
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
7,440,676✔
420
  if (NULL == *ppRes) {
7,440,676✔
421
    code = terrno;
×
422
    freeOperatorParam(pChild, OP_GET_PARAM);
×
423
    return code;
×
424
  }
425
  if (pChild) {
7,440,676✔
426
    (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
161,918✔
427
    if (NULL == (*ppRes)->pChildren) {
161,918✔
428
      code = terrno;
×
429
      freeOperatorParam(pChild, OP_GET_PARAM);
×
430
      freeOperatorParam(*ppRes, OP_GET_PARAM);
×
431
      *ppRes = NULL;
×
432
      return code;
×
433
    }
434
    if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild)) {
323,836✔
435
      code = terrno;
×
436
      freeOperatorParam(pChild, OP_GET_PARAM);
×
437
      freeOperatorParam(*ppRes, OP_GET_PARAM);
×
438
      *ppRes = NULL;
×
439
      return code;
×
440
    }
441
  } else {
442
    (*ppRes)->pChildren = NULL;
7,278,758✔
443
  }
444

445
  SGcOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcOperatorParam));
7,440,676✔
446
  if (NULL == pGc) {
7,440,676✔
447
    code = terrno;
×
448
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
449
    *ppRes = NULL;
×
450
    return code;
×
451
  }
452

453
  pGc->sessionId = atomic_add_fetch_64(&gSessionId, 1);
7,440,676✔
454
  pGc->downstreamIdx = downstreamIdx;
7,440,676✔
455
  pGc->vgId = vgId;
7,440,676✔
456
  pGc->tbUid = tbUid;
7,440,676✔
457
  pGc->needCache = needCache;
7,440,676✔
458

459
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
7,440,676✔
460
  (*ppRes)->downstreamIdx = downstreamIdx;
7,440,676✔
461
  (*ppRes)->value = pGc;
7,440,676✔
462
  (*ppRes)->reUse = false;
7,440,676✔
463

464
  return TSDB_CODE_SUCCESS;
7,440,676✔
465
}
466

467

468
static int32_t buildGroupCacheNotifyOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid) {
×
469
  int32_t code = TSDB_CODE_SUCCESS;
×
470
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
×
471
  if (NULL == *ppRes) {
×
472
    return terrno;
×
473
  }
474
  (*ppRes)->pChildren = NULL;
×
475

476
  SGcNotifyOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcNotifyOperatorParam));
×
477
  if (NULL == pGc) {
×
478
    code = terrno;
×
479
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
×
480
    return code;
×
481
  }
482

483
  pGc->downstreamIdx = downstreamIdx;
×
484
  pGc->vgId = vgId;
×
485
  pGc->tbUid = tbUid;
×
486

487
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
×
488
  (*ppRes)->downstreamIdx = downstreamIdx;
×
489
  (*ppRes)->value = pGc;
×
490
  (*ppRes)->reUse = false;
×
491

492
  return TSDB_CODE_SUCCESS;
×
493
}
494

495
static int32_t buildExchangeOperatorBasicParam(SExchangeOperatorBasicParam* pBasic, ENodeType srcOpType,
18,667,328✔
496
                                               EExchangeSourceType exchangeType, int32_t vgId, uint64_t groupId,
497
                                               SArray* pUidList, SOrgTbInfo* pOrgTbInfo, SArray* pTagList,
498
                                               SArray* pOrgTbInfoArray, STimeWindow window,
499
                                               SDownstreamSourceNode* pDownstreamSourceNode,
500
                                               bool tableSeq, bool isNewParam, bool isNewDeployed) {
501
  int32_t code = TSDB_CODE_SUCCESS;
18,667,328✔
502
  int32_t lino = 0;
18,667,328✔
503

504
  qDebug("buildExchangeOperatorBasicParam, srcOpType:%d, exchangeType:%d, vgId:%d, groupId:%" PRIu64 ", tableSeq:%d, "
18,667,328✔
505
         "isNewParam:%d, isNewDeployed:%d", srcOpType, exchangeType, vgId, groupId, tableSeq, isNewParam, isNewDeployed);
506

507
  pBasic->paramType = DYN_TYPE_EXCHANGE_PARAM;
18,667,328✔
508
  pBasic->srcOpType = srcOpType;
18,667,328✔
509
  pBasic->vgId = vgId;
18,667,328✔
510
  pBasic->groupid = groupId;
18,667,328✔
511
  pBasic->window = window;
18,667,328✔
512
  pBasic->tableSeq = tableSeq;
18,667,328✔
513
  pBasic->type = exchangeType;
18,667,328✔
514
  pBasic->isNewParam = isNewParam;
18,667,328✔
515

516
  if (pDownstreamSourceNode) {
18,667,328✔
517
    pBasic->isNewDeployed = true;
2,376✔
518
    pBasic->newDeployedSrc.type = QUERY_NODE_DOWNSTREAM_SOURCE;
2,376✔
519
    pBasic->newDeployedSrc.clientId = pDownstreamSourceNode->clientId;// current task's taskid
2,376✔
520
    pBasic->newDeployedSrc.taskId = pDownstreamSourceNode->taskId;
2,376✔
521
    pBasic->newDeployedSrc.fetchMsgType = TDMT_STREAM_FETCH;
2,376✔
522
    pBasic->newDeployedSrc.localExec = false;
2,376✔
523
    pBasic->newDeployedSrc.addr.nodeId = pDownstreamSourceNode->addr.nodeId;
2,376✔
524
    memcpy(&pBasic->newDeployedSrc.addr.epSet, &pDownstreamSourceNode->addr.epSet, sizeof(SEpSet));
2,376✔
525
  } else {
526
    pBasic->isNewDeployed = false;
18,664,952✔
527
    pBasic->newDeployedSrc = (SDownstreamSourceNode){0};
18,664,952✔
528
  }
529

530
  if (pUidList) {
18,667,328✔
531
    pBasic->uidList = taosArrayDup(pUidList, NULL);
4,901,689✔
532
    QUERY_CHECK_NULL(pBasic->uidList, code, lino, _return, terrno)
4,901,689✔
533
  } else {
534
    pBasic->uidList = taosArrayInit(1, sizeof(int64_t));
13,765,639✔
535
    QUERY_CHECK_NULL(pBasic->uidList, code, lino, _return, terrno)
13,765,639✔
536
  }
537

538
  if (pOrgTbInfo) {
18,667,328✔
539
    code = copyOrgTbInfo(pOrgTbInfo, &pBasic->orgTbInfo);
6,870,272✔
540
    QUERY_CHECK_CODE(code, lino, _return);
6,870,272✔
541
  } else {
542
    pBasic->orgTbInfo = NULL;
11,797,056✔
543
  }
544

545
  if (pTagList) {
18,667,328✔
546
    code = buildTagListForExchangeBasicParam(pBasic, pTagList);
2,626,916✔
547
    QUERY_CHECK_CODE(code, lino, _return);
2,626,916✔
548
  } else {
549
    pBasic->tagList = NULL;
16,040,412✔
550
  }
551

552
  if (pOrgTbInfoArray) {
18,667,328✔
553
    code = buildBatchOrgTbInfoForExchangeBasicParam(pBasic, pOrgTbInfoArray);
5,547,926✔
554
    QUERY_CHECK_CODE(code, lino, _return);
5,547,926✔
555
  } else {
556
    pBasic->batchOrgTbInfo = NULL;
13,119,402✔
557
  }
558
  return code;
18,667,328✔
559

560
_return:
×
561
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
×
562
  freeExchangeGetBasicOperatorParam(pBasic);
×
563
  return code;
×
564
}
565

566
static int32_t buildExchangeOperatorParamImpl(SOperatorParam** ppRes, int32_t downstreamIdx, ENodeType srcOpType,
12,871,226✔
567
                                              EExchangeSourceType exchangeType, int32_t vgId, uint64_t groupId,
568
                                              SArray* pUidList, SOrgTbInfo* pOrgTbInfo, SArray* pTagList,
569
                                              SArray* pOrgTbInfoArray, STimeWindow window,
570
                                              SDownstreamSourceNode* pDownstreamSourceNode,
571
                                              bool tableSeq, bool isNewParam, bool reUse, bool isNewDeployed) {
572

573
  int32_t                      code = TSDB_CODE_SUCCESS;
12,871,226✔
574
  int32_t                      lino = 0;
12,871,226✔
575
  SOperatorParam*              pParam = NULL;
12,871,226✔
576
  SExchangeOperatorParam*      pExc = NULL;
12,871,226✔
577

578
  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
12,871,226✔
579
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)
12,871,226✔
580

581
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
12,871,226✔
582
  pParam->downstreamIdx = downstreamIdx;
12,871,226✔
583
  pParam->reUse = reUse;
12,871,226✔
584
  pParam->pChildren = NULL;
12,871,226✔
585
  pParam->value = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
12,871,226✔
586
  QUERY_CHECK_NULL(pParam->value, code, lino, _return, terrno)
12,871,226✔
587

588
  pExc = (SExchangeOperatorParam*)pParam->value;
12,871,226✔
589
  pExc->multiParams = false;
12,871,226✔
590

591
  code = buildExchangeOperatorBasicParam(&pExc->basic, srcOpType, exchangeType, vgId, groupId,
12,871,226✔
592
                                         pUidList, pOrgTbInfo, pTagList, pOrgTbInfoArray,
593
                                         window, pDownstreamSourceNode, tableSeq, isNewParam, isNewDeployed);
594

595
  *ppRes = pParam;
12,871,226✔
596
  return code;
12,871,226✔
597
_return:
×
598
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
×
599
  if (pParam) {
×
600
    freeOperatorParam(pParam, OP_GET_PARAM);
×
601
  }
602
  return code;
×
603
}
604

605
static int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, const int32_t* pVgId, int64_t* pUid) {
4,488✔
606
  int32_t code = TSDB_CODE_SUCCESS;
4,488✔
607
  int32_t lino = 0;
4,488✔
608

609
  SArray* pUidList = taosArrayInit(1, sizeof(int64_t));
4,488✔
610
  QUERY_CHECK_NULL(pUidList, code, lino, _return, terrno)
4,488✔
611

612
  QUERY_CHECK_NULL(taosArrayPush(pUidList, pUid), code, lino, _return, terrno);
4,488✔
613

614
  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, EX_SRC_TYPE_STB_JOIN_SCAN,
4,488✔
615
                                        *pVgId, 0, pUidList, NULL, NULL, NULL, (STimeWindow){0}, NULL, true, false, false, false);
4,488✔
616
  QUERY_CHECK_CODE(code, lino, _return);
4,488✔
617

618
_return:
4,488✔
619
  if (code) {
4,488✔
620
    qError("failed to build exchange operator param, code:%d", code);
×
621
  }
622
  taosArrayDestroy(pUidList);
4,488✔
623
  return code;
4,488✔
624
}
625

626
static int32_t buildExchangeOperatorParamForExternalWindow(SOperatorParam** ppRes, int32_t downstreamIdx, STimeWindow win) {
1,347,441✔
627
  int32_t                   code = TSDB_CODE_SUCCESS;
1,347,441✔
628
  int32_t                   lino = 0;
1,347,441✔
629

630
  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, EX_SRC_TYPE_VTB_WIN_SCAN,
1,347,441✔
631
                                        0, 0, NULL, NULL, NULL, NULL, win, NULL, true, true, true, false);
632
  QUERY_CHECK_CODE(code, lino, _return);
1,347,441✔
633

634
  return code;
1,347,441✔
635
_return:
×
636
  qError("failed to build exchange operator param for external window, code:%d", code);
×
637
  return code;
×
638
}
639

640
static int32_t buildExchangeOperatorParamForVTagScan(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, tb_uid_t uid) {
4,649,025✔
641
  int32_t                      code = TSDB_CODE_SUCCESS;
4,649,025✔
642
  int32_t                      lino = 0;
4,649,025✔
643
  SArray*                      pUidList = NULL;
4,649,025✔
644

645
  pUidList = taosArrayInit(1, sizeof(int64_t));
4,649,025✔
646
  QUERY_CHECK_NULL(pUidList, code, lino, _return, terrno)
4,649,025✔
647

648
  QUERY_CHECK_NULL(taosArrayPush(pUidList, &uid), code, lino, _return, terrno)
4,649,025✔
649

650
  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, EX_SRC_TYPE_VSTB_TAG_SCAN,
4,649,025✔
651
                                        vgId, 0, pUidList, NULL, NULL, NULL, (STimeWindow){0}, NULL, false, false, true, false);
4,649,025✔
652
  QUERY_CHECK_CODE(code, lino, _return);
4,649,025✔
653

654
_return:
4,649,025✔
655
  if (code) {
4,649,025✔
656
    qError("failed to build exchange operator param for tag scan, code:%d", code);
×
657
  }
658
  taosArrayDestroy(pUidList);
4,649,025✔
659
  return code;
4,649,025✔
660
}
661

662
static int32_t buildExchangeOperatorParamForVScan(SOperatorParam** ppRes, int32_t downstreamIdx, SOrgTbInfo* pOrgTbInfo,
6,870,272✔
663
                                                  SDownstreamSourceNode* pNewSource) {
664
  int32_t                      code = TSDB_CODE_SUCCESS;
6,870,272✔
665
  int32_t                      lino = 0;
6,870,272✔
666

667
  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, EX_SRC_TYPE_VSTB_SCAN,
6,870,272✔
668
                                        pOrgTbInfo->vgId, 0, NULL, pOrgTbInfo, NULL, NULL, (STimeWindow){0}, pNewSource, false, true, true, true);
6,870,272✔
669
  QUERY_CHECK_CODE(code, lino, _return);
6,870,272✔
670

671
  return code;
6,870,272✔
672
_return:
×
673
  qError("failed to build exchange operator param for vscan, code:%d", code);
×
674
  return code;
×
675
}
676

677
static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
145,814✔
678
  int32_t                       code = TSDB_CODE_SUCCESS;
145,814✔
679
  int32_t                       line = 0;
145,814✔
680
  SOperatorParam*               pParam = NULL;
145,814✔
681
  SExchangeOperatorBatchParam*  pExc = NULL;
145,814✔
682
  SExchangeOperatorBasicParam   basic = {0};
145,814✔
683

684
  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
145,814✔
685
  QUERY_CHECK_NULL(pParam, code, line, _return, terrno);
145,814✔
686

687
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
145,814✔
688
  pParam->downstreamIdx = downstreamIdx;
145,814✔
689
  pParam->reUse = false;
145,814✔
690
  pParam->pChildren = NULL;
145,814✔
691
  pParam->value = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
145,814✔
692
  QUERY_CHECK_NULL(pParam->value, code, line, _return, terrno);
145,814✔
693

694
  pExc = pParam->value;
145,814✔
695
  pExc->multiParams = true;
145,814✔
696
  pExc->pBatchs = tSimpleHashInit(tSimpleHashGetSize(pVg), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
145,814✔
697
  QUERY_CHECK_NULL(pExc->pBatchs, code, line, _return, terrno)
145,814✔
698

699
  tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);
145,814✔
700

701
  int32_t iter = 0;
145,814✔
702
  void*   p = NULL;
145,814✔
703
  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
393,990✔
704
    int32_t* pVgId = tSimpleHashGetKey(p, NULL);
248,176✔
705
    SArray*  pUidList = *(SArray**)p;
248,176✔
706

707
    code = buildExchangeOperatorBasicParam(&basic, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
248,176✔
708
                                           EX_SRC_TYPE_STB_JOIN_SCAN, *pVgId, 0,
709
                                           pUidList, NULL, NULL, NULL,
710
                                           (STimeWindow){0}, NULL, false, false, false);
248,176✔
711
    QUERY_CHECK_CODE(code, line, _return);
248,176✔
712

713
    QRY_ERR_RET(tSimpleHashPut(pExc->pBatchs, pVgId, sizeof(*pVgId), &basic, sizeof(basic)));
248,176✔
714

715
    basic = (SExchangeOperatorBasicParam){0};
248,176✔
716
    qTrace("build downstreamIdx %d batch scan, vgId:%d, uidNum:%" PRId64, downstreamIdx, *pVgId, (int64_t)taosArrayGetSize(pUidList));
248,176✔
717

718
    // already transferred to batch param, can free here
719
    taosArrayDestroy(pUidList);
248,176✔
720

721
    *(SArray**)p = NULL;
248,176✔
722
  }
723
  *ppRes = pParam;
145,814✔
724

725
  return code;
145,814✔
726
  
727
_return:
×
728
  qError("failed to build batch exchange operator param, code:%d", code);
×
729
  freeOperatorParam(pParam, OP_GET_PARAM);
×
730
  freeExchangeGetBasicOperatorParam(&basic);
×
731
  return code;
×
732
}
733

734
static int32_t buildBatchExchangeOperatorParamForVirtual(SOperatorParam** ppRes, int32_t downstreamIdx, SArray* pTagList, uint64_t groupid,  SHashObj* pBatchMaps, STimeWindow window, EExchangeSourceType type) {
5,190,677✔
735
  int32_t                       code = TSDB_CODE_SUCCESS;
5,190,677✔
736
  int32_t                       lino = 0;
5,190,677✔
737
  SOperatorParam*               pParam = NULL;
5,190,677✔
738
  SExchangeOperatorBatchParam*  pExc = NULL;
5,190,677✔
739
  SExchangeOperatorBasicParam   basic = {0};
5,190,677✔
740

741
  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
5,190,677✔
742
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)
5,190,677✔
743

744
  pParam->value = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
5,190,677✔
745
  QUERY_CHECK_NULL(pParam->value, code, lino, _return, terrno)
5,190,677✔
746

747
  pExc = pParam->value;
5,190,677✔
748
  pExc->multiParams = true;
5,190,677✔
749

750
  pExc->pBatchs = tSimpleHashInit(taosHashGetSize(pBatchMaps), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
5,190,677✔
751
  QUERY_CHECK_NULL(pExc->pBatchs, code, lino, _return, terrno)
5,190,677✔
752
  tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);
5,190,677✔
753

754
  size_t keyLen = 0;
5,190,677✔
755
  void*  pIter = taosHashIterate(pBatchMaps, NULL);
5,190,677✔
756
  while (pIter != NULL) {
10,738,603✔
757
    SArray*          pOrgTbInfoArray = *(SArray**)pIter;
5,547,926✔
758
    int32_t*         vgId = (int32_t*)taosHashGetKey(pIter, &keyLen);
5,547,926✔
759

760
    code = buildExchangeOperatorBasicParam(&basic, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
5,547,926✔
761
                                           type, *vgId, groupid,
762
                                           NULL, NULL, pTagList, pOrgTbInfoArray,
763
                                           window, NULL, false, true, false);
764
    QUERY_CHECK_CODE(code, lino, _return);
5,547,926✔
765

766
    code = tSimpleHashPut(pExc->pBatchs, vgId, sizeof(*vgId), &basic, sizeof(basic));
5,547,926✔
767
    QUERY_CHECK_CODE(code, lino, _return);
5,547,926✔
768

769
    basic = (SExchangeOperatorBasicParam){0};
5,547,926✔
770
    pIter = taosHashIterate(pBatchMaps, pIter);
5,547,926✔
771
  }
772

773
  pParam->pChildren = NULL;
5,190,677✔
774
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
5,190,677✔
775
  pParam->downstreamIdx = downstreamIdx;
5,190,677✔
776
  pParam->reUse = false;
5,190,677✔
777

778
  *ppRes = pParam;
5,190,677✔
779
  return code;
5,190,677✔
780

781
_return:
×
782
  qError("failed to build exchange operator param for vscan, code:%d", code);
×
783
  freeOperatorParam(pParam, OP_GET_PARAM);
×
784
  freeExchangeGetBasicOperatorParam(&basic);
×
785
  return code;
×
786
}
787

788
static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initParam, SOperatorParam** ppChild0, SOperatorParam** ppChild1) {
3,720,338✔
789
  int32_t code = TSDB_CODE_SUCCESS;
3,720,338✔
790
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
3,720,338✔
791
  if (NULL == *ppRes) {
3,720,338✔
792
    code = terrno;
×
793
    return code;
×
794
  }
795
  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
3,720,338✔
796
  if (NULL == (*ppRes)->pChildren) {
3,720,338✔
797
    code = terrno;
×
798
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
799
    *ppRes = NULL;
×
800
    return code;
×
801
  }
802
  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild0)) {
7,440,676✔
803
    code = terrno;
×
804
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
805
    *ppRes = NULL;
×
806
    return code;
×
807
  }
808
  *ppChild0 = NULL;
3,720,338✔
809
  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild1)) {
7,440,676✔
810
    code = terrno;
×
811
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
812
    *ppRes = NULL;
×
813
    return code;
×
814
  }
815
  *ppChild1 = NULL;
3,720,338✔
816
  
817
  SSortMergeJoinOperatorParam* pJoin = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam));
3,720,338✔
818
  if (NULL == pJoin) {
3,720,338✔
819
    code = terrno;
×
820
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
821
    *ppRes = NULL;
×
822
    return code;
×
823
  }
824

825
  pJoin->initDownstream = initParam;
3,720,338✔
826
  
827
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
3,720,338✔
828
  (*ppRes)->value = pJoin;
3,720,338✔
829
  (*ppRes)->reUse = false;
3,720,338✔
830

831
  return TSDB_CODE_SUCCESS;
3,720,338✔
832
}
833

834
static int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) {
×
835
  int32_t code = TSDB_CODE_SUCCESS;
×
836
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
×
837
  if (NULL == *ppRes) {
×
838
    code = terrno;
×
839
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
×
840
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
841
    return code;
×
842
  }
843
  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
×
844
  if (NULL == *ppRes) {
×
845
    code = terrno;
×
846
    taosMemoryFreeClear(*ppRes);
×
847
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
×
848
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
849
    return code;
×
850
  }
851
  if (pChild0 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) {
×
852
    code = terrno;
×
853
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
×
854
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
×
855
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
856
    *ppRes = NULL;
×
857
    return code;
×
858
  }
859
  if (pChild1 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) {
×
860
    code = terrno;
×
861
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
×
862
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
863
    *ppRes = NULL;
×
864
    return code;
×
865
  }
866
  
867
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
×
868
  (*ppRes)->value = NULL;
×
869
  (*ppRes)->reUse = false;
×
870

871
  return TSDB_CODE_SUCCESS;
×
872
}
873

874
static int32_t buildBatchTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
11,616✔
875
  int32_t code = TSDB_CODE_SUCCESS;
11,616✔
876
  int32_t vgNum = tSimpleHashGetSize(pVg);
11,616✔
877
  if (vgNum <= 0 || vgNum > 1) {
11,616✔
878
    qError("Invalid vgroup num %d to build table scan operator param", vgNum);
×
879
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
880
  }
881

882
  int32_t iter = 0;
11,616✔
883
  void* p = NULL;
11,616✔
884
  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
23,232✔
885
    SArray* pUidList = *(SArray**)p;
11,616✔
886

887
    code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false);
11,616✔
888
    if (code) {
11,616✔
889
      return code;
×
890
    }
891
    taosArrayDestroy(pUidList);
11,616✔
892
    *(SArray**)p = NULL;
11,616✔
893
  }
894
  
895
  return TSDB_CODE_SUCCESS;
11,616✔
896
}
897

898
static int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) {
×
899
  SArray* pUidList = taosArrayInit(1, sizeof(int64_t));
×
900
  if (NULL == pUidList) {
×
901
    return terrno;
×
902
  }
903
  if (NULL == taosArrayPush(pUidList, pUid)) {
×
904
    return terrno;
×
905
  }
906

907
  int32_t code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, true);
×
908
  taosArrayDestroy(pUidList);
×
909
  if (code) {
×
910
    return code;
×
911
  }
912
  
913
  return TSDB_CODE_SUCCESS;
×
914
}
915

916
static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, SOperatorParam** ppParam) {
3,720,338✔
917
  int64_t                     rowIdx = pPrev->pListHead->readIdx;
3,720,338✔
918
  SOperatorParam*             pSrcParam0 = NULL;
3,720,338✔
919
  SOperatorParam*             pSrcParam1 = NULL;
3,720,338✔
920
  SOperatorParam*             pGcParam0 = NULL;
3,720,338✔
921
  SOperatorParam*             pGcParam1 = NULL;  
3,720,338✔
922
  int32_t*                    leftVg = pPrev->pListHead->pLeftVg + rowIdx;
3,720,338✔
923
  int64_t*                    leftUid = pPrev->pListHead->pLeftUid + rowIdx;
3,720,338✔
924
  int32_t*                    rightVg = pPrev->pListHead->pRightVg + rowIdx;
3,720,338✔
925
  int64_t*                    rightUid = pPrev->pListHead->pRightUid + rowIdx;
3,720,338✔
926
  int32_t                     code = TSDB_CODE_SUCCESS;
3,720,338✔
927

928
  qDebug("start %" PRId64 ":%" PRId64 "th stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64, 
3,720,338✔
929
      rowIdx, pPrev->tableNum, *leftVg, *leftUid, *rightVg, *rightUid);
930

931
  QRY_ERR_RET(updatePostJoinCurrTableInfo(&pInfo->stbJoin));
3,720,338✔
932
  
933
  if (pInfo->stbJoin.basic.batchFetch) {
3,720,338✔
934
    if (pPrev->leftHash) {
3,718,094✔
935
      code = pInfo->stbJoin.basic.srcScan[0] ? buildBatchTableScanOperatorParam(&pSrcParam0, 0, pPrev->leftHash) : buildBatchExchangeOperatorParam(&pSrcParam0, 0, pPrev->leftHash);
78,715✔
936
      if (TSDB_CODE_SUCCESS == code) {
78,715✔
937
        code = pInfo->stbJoin.basic.srcScan[1] ? buildBatchTableScanOperatorParam(&pSrcParam1, 1, pPrev->rightHash) : buildBatchExchangeOperatorParam(&pSrcParam1, 1, pPrev->rightHash);
78,715✔
938
      }
939
      if (TSDB_CODE_SUCCESS == code) {
78,715✔
940
        tSimpleHashCleanup(pPrev->leftHash);
78,715✔
941
        tSimpleHashCleanup(pPrev->rightHash);
78,715✔
942
        pPrev->leftHash = NULL;
78,715✔
943
        pPrev->rightHash = NULL;
78,715✔
944
      }
945
    }
946
  } else {
947
    code = pInfo->stbJoin.basic.srcScan[0] ? buildSingleTableScanOperatorParam(&pSrcParam0, 0, leftVg, leftUid) : buildExchangeOperatorParam(&pSrcParam0, 0, leftVg, leftUid);
2,244✔
948
    if (TSDB_CODE_SUCCESS == code) {
2,244✔
949
      code = pInfo->stbJoin.basic.srcScan[1] ? buildSingleTableScanOperatorParam(&pSrcParam1, 1, rightVg, rightUid) : buildExchangeOperatorParam(&pSrcParam1, 1, rightVg, rightUid);
2,244✔
950
    }
951
  }
952

953
  bool initParam = pSrcParam0 ? true : false;
3,720,338✔
954
  if (TSDB_CODE_SUCCESS == code) {
3,720,338✔
955
    code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pPost->leftNeedCache, pSrcParam0);
3,720,338✔
956
    pSrcParam0 = NULL;
3,720,338✔
957
  }
958
  if (TSDB_CODE_SUCCESS == code) {
3,720,338✔
959
    code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pPost->rightNeedCache, pSrcParam1);
3,720,338✔
960
    pSrcParam1 = NULL;
3,720,338✔
961
  }
962
  if (TSDB_CODE_SUCCESS == code) {
3,720,338✔
963
    code = buildMergeJoinOperatorParam(ppParam, initParam, &pGcParam0, &pGcParam1);
3,720,338✔
964
  }
965
  if (TSDB_CODE_SUCCESS != code) {
3,720,338✔
966
    if (pSrcParam0) {
×
967
      freeOperatorParam(pSrcParam0, OP_GET_PARAM);
×
968
    }
969
    if (pSrcParam1) {
×
970
      freeOperatorParam(pSrcParam1, OP_GET_PARAM);
×
971
    }
972
    if (pGcParam0) {
×
973
      freeOperatorParam(pGcParam0, OP_GET_PARAM);
×
974
    }
975
    if (pGcParam1) {
×
976
      freeOperatorParam(pGcParam1, OP_GET_PARAM);
×
977
    }
978
    if (*ppParam) {
×
979
      freeOperatorParam(*ppParam, OP_GET_PARAM);
×
980
      *ppParam = NULL;
×
981
    }
982
  }
983
  
984
  return code;
3,720,338✔
985
}
986

987
static int32_t buildVtbScanOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, uint64_t uid) {
4,649,025✔
988
  int32_t                   code = TSDB_CODE_SUCCESS;
4,649,025✔
989
  int32_t                   lino = 0;
4,649,025✔
990
  SVTableScanOperatorParam* pVScan = NULL;
4,649,025✔
991
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
4,649,025✔
992
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
4,649,025✔
993

994
  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
4,649,025✔
995
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)
4,649,025✔
996

997
  pVScan = taosMemoryMalloc(sizeof(SVTableScanOperatorParam));
4,649,025✔
998
  QUERY_CHECK_NULL(pVScan, code, lino, _return, terrno)
4,649,025✔
999
  pVScan->pOpParamArray = taosArrayInit(1, POINTER_BYTES);
4,649,025✔
1000
  QUERY_CHECK_NULL(pVScan->pOpParamArray, code, lino, _return, terrno)
4,649,025✔
1001
  pVScan->uid = uid;
4,649,025✔
1002
  pVScan->window = pInfo->vtbScan.window;
4,649,025✔
1003
  if (pInfo->vtbScan.refColGroups) {
4,649,025✔
1004
    pVScan->pRefColGroups = taosArrayInit(taosArrayGetSize(pInfo->vtbScan.refColGroups), sizeof(SRefColIdGroup));
4,824✔
1005
    QUERY_CHECK_NULL(pVScan->pRefColGroups, code, lino, _return, terrno)
4,824✔
1006
    for (int32_t i = 0; i < taosArrayGetSize(pInfo->vtbScan.refColGroups); i++) {
12,653✔
1007
      SRefColIdGroup* pSrc = (SRefColIdGroup*)taosArrayGet(pInfo->vtbScan.refColGroups, i);
7,829✔
1008
      SRefColIdGroup  dst = {0};
7,829✔
1009
      QUERY_CHECK_NULL(pSrc, code, lino, _return, terrno)
7,829✔
1010
      dst.pSlotIdList = taosArrayDup(pSrc->pSlotIdList, NULL);
7,829✔
1011
      QUERY_CHECK_NULL(dst.pSlotIdList, code, lino, _return, terrno)
7,829✔
1012
      void* px = taosArrayPush(pVScan->pRefColGroups, &dst);
7,829✔
1013
      if (NULL == px) {
7,829✔
1014
        taosArrayDestroy(dst.pSlotIdList);
×
1015
        dst.pSlotIdList = NULL;
×
1016
      }
1017
      QUERY_CHECK_NULL(px, code, lino, _return, terrno)
7,829✔
1018
    }
1019
  } else {
1020
    pVScan->pRefColGroups = NULL;
4,644,201✔
1021
  }
1022

1023
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN;
4,649,025✔
1024
  (*ppRes)->downstreamIdx = 0;
4,649,025✔
1025
  (*ppRes)->value = pVScan;
4,649,025✔
1026
  (*ppRes)->reUse = false;
4,649,025✔
1027

1028
  return TSDB_CODE_SUCCESS;
4,649,025✔
1029
_return:
×
1030
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1031
  if (pVScan) {
×
1032
    if (pVScan->pRefColGroups) {
×
1033
      taosArrayDestroyEx(pVScan->pRefColGroups, destroyRefColIdGroup);
×
1034
      pVScan->pRefColGroups = NULL;
×
1035
    }
1036
    taosArrayDestroy(pVScan->pOpParamArray);
×
1037
    taosMemoryFreeClear(pVScan);
×
1038
  }
1039
  if (*ppRes) {
×
1040
    taosArrayDestroy((*ppRes)->pChildren);
×
1041
    taosMemoryFreeClear(*ppRes);
×
1042
  }
1043
  return code;
×
1044
}
1045

1046
static int32_t addRefColIdToRefMap(SHashObj* refMap, const char* colrefName, col_id_t colId) {
20,934,692✔
1047
  int32_t  code = TSDB_CODE_SUCCESS;
20,934,692✔
1048
  int32_t  line = 0;
20,934,692✔
1049
  SArray** sameRefColIdList = NULL;
20,934,692✔
1050

1051
  if (colrefName == NULL || colrefName[0] == '\0') {
20,934,692✔
1052
    return code;
×
1053
  }
1054

1055
  sameRefColIdList = (SArray**)taosHashGet(refMap, colrefName, strlen(colrefName));
20,934,692✔
1056
  if (sameRefColIdList == NULL) {
20,934,692✔
1057
    SArray* list = taosArrayInit(2, sizeof(col_id_t));
20,922,660✔
1058
    QUERY_CHECK_NULL(list, code, line, _return, terrno)
20,922,660✔
1059
    QUERY_CHECK_CODE(taosHashPut(refMap, colrefName, strlen(colrefName), &list, POINTER_BYTES), line, _return);
20,922,660✔
1060
    sameRefColIdList = (SArray**)taosHashGet(refMap, colrefName, strlen(colrefName));
20,922,660✔
1061
    QUERY_CHECK_NULL(sameRefColIdList, code, line, _return, terrno)
20,922,660✔
1062
  }
1063

1064
  for (int32_t i = 0; i < taosArrayGetSize(*sameRefColIdList); i++) {
20,952,734✔
1065
    col_id_t existing = *(col_id_t*)taosArrayGet(*sameRefColIdList, i);
18,042✔
1066
    if (existing == colId) {
18,042✔
1067
      return code;
×
1068
    }
1069
  }
1070
  QUERY_CHECK_NULL(taosArrayPush(*sameRefColIdList, &colId), code, line, _return, terrno)
41,869,384✔
1071
  return code;
20,934,692✔
1072

1073
_return:
×
1074
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
1075
  return code;
×
1076
}
1077

1078
static int32_t buildRefSlotGroupsFromRefMap(SHashObj* refMap, SArray* readColList, SArray** ppGroups) {
4,649,025✔
1079
  int32_t   code = TSDB_CODE_SUCCESS;
4,649,025✔
1080
  int32_t   line = 0;
4,649,025✔
1081
  SArray*   groups = NULL;
4,649,025✔
1082
  SHashObj* colIdToSlot = NULL;
4,649,025✔
1083

1084
  if (refMap == NULL || readColList == NULL) {
4,649,025✔
1085
    return code;
1,477,590✔
1086
  }
1087

1088
  if (*ppGroups) {
3,171,435✔
1089
    taosArrayDestroyEx(*ppGroups, destroyRefColIdGroup);
3,013✔
1090
    *ppGroups = NULL;
3,013✔
1091
  }
1092

1093
  colIdToSlot = taosHashInit(taosArrayGetSize(readColList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false,
3,171,435✔
1094
                             HASH_NO_LOCK);
1095
  QUERY_CHECK_NULL(colIdToSlot, code, line, _return, terrno)
3,171,435✔
1096

1097
  // Build a quick colId -> slotId lookup for columns actually read.
1098
  for (int32_t i = 0; i < taosArrayGetSize(readColList); i++) {
28,327,443✔
1099
    col_id_t colId = *(col_id_t*)taosArrayGet(readColList, i);
25,156,008✔
1100
    int32_t  slotId = i;
25,156,008✔
1101
    code = taosHashPut(colIdToSlot, &colId, sizeof(colId), &slotId, sizeof(slotId));
25,156,008✔
1102
    QUERY_CHECK_CODE(code, line, _return);
25,156,008✔
1103
  }
1104

1105
  groups = taosArrayInit(1, sizeof(SRefColIdGroup));
3,171,435✔
1106
  QUERY_CHECK_NULL(groups, code, line, _return, terrno)
3,171,435✔
1107

1108
  // Group columns that share the same ref name into slotId lists.
1109
  void* pIter = taosHashIterate(refMap, NULL);
3,171,435✔
1110
  while (pIter != NULL) {
24,094,095✔
1111
    SArray* pList = *(SArray**)pIter;  // colId list
20,922,660✔
1112
    if (pList && taosArrayGetSize(pList) > 1) {
20,922,660✔
1113
      SArray* slotList = taosArrayInit(taosArrayGetSize(pList), sizeof(int32_t));
7,224✔
1114
      QUERY_CHECK_NULL(slotList, code, line, _return, terrno)
7,224✔
1115
      for (int32_t i = 0; i < taosArrayGetSize(pList); i++) {
26,480✔
1116
        col_id_t colId = *(col_id_t*)taosArrayGet(pList, i);
19,256✔
1117
        int32_t* slotId = taosHashGet(colIdToSlot, &colId, sizeof(colId));
19,256✔
1118
        if (slotId) {
19,256✔
1119
          QUERY_CHECK_NULL(taosArrayPush(slotList, slotId), code, line, _return, terrno)
19,256✔
1120
        }
1121
      }
1122
      if (taosArrayGetSize(slotList) > 1) {
7,224✔
1123
        SRefColIdGroup g = {.pSlotIdList = slotList};
7,224✔
1124
        QUERY_CHECK_NULL(taosArrayPush(groups, &g), code, line, _return, terrno)
7,224✔
1125
      } else {
1126
        taosArrayDestroy(slotList);
×
1127
      }
1128
    }
1129
    if (pList) {
20,922,660✔
1130
      taosArrayDestroy(pList);
20,922,660✔
1131
    }
1132
    pIter = taosHashIterate(refMap, pIter);
20,922,660✔
1133
  }
1134

1135
  if (taosArrayGetSize(groups) == 0) {
3,171,435✔
1136
    taosArrayDestroy(groups);
3,167,216✔
1137
    groups = NULL;
3,167,216✔
1138
  }
1139

1140
_return:
4,219✔
1141
  if (code) {
3,171,435✔
1142
    qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
1143
    if (groups) {
×
1144
      taosArrayDestroyEx(groups, destroyRefColIdGroup);
×
1145
    }
1146
  }
1147
  if (refMap) {
3,171,435✔
1148
    taosHashCleanup(refMap);
3,171,435✔
1149
  }
1150
  if (colIdToSlot) {
3,171,435✔
1151
    taosHashCleanup(colIdToSlot);
3,171,435✔
1152
  }
1153
  *ppGroups = groups;
3,171,435✔
1154
  return code;
3,171,435✔
1155
}
1156

1157
bool colNeedScan(SOperatorInfo* pOperator, col_id_t colId) {
135,754,928✔
1158
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
135,754,928✔
1159
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
135,754,928✔
1160

1161
  if (pVtbScan->scanAllCols) {
135,754,928✔
1162
    return true;
10,989,903✔
1163
  }
1164

1165
  // if readColSet exists, use it to check whether colId is needed, otherwise use readColList
1166
  if (pVtbScan->readColSet) {
124,765,025✔
1167
    return taosHashGet(pVtbScan->readColSet, &colId, sizeof(colId)) != NULL;
124,765,025✔
1168
  }
1169

1170
  for (int32_t i = 0; i < taosArrayGetSize(pVtbScan->readColList); i++) {
×
1171
    if (colId == *(col_id_t*)taosArrayGet(pVtbScan->readColList, i)) {
×
1172
      return true;
×
1173
    }
1174
  }
1175
  return false;
×
1176
}
1177

1178
static int32_t buildExternalWindowOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins, int32_t idx) {
1,347,441✔
1179
  int32_t                       code = TSDB_CODE_SUCCESS;
1,347,441✔
1180
  int32_t                       lino = 0;
1,347,441✔
1181
  SExternalWindowOperatorParam* pExtWinOp = NULL;
1,347,441✔
1182

1183
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
1,347,441✔
1184
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
1,347,441✔
1185

1186
  pExtWinOp = taosMemoryMalloc(sizeof(SExternalWindowOperatorParam));
1,347,441✔
1187
  QUERY_CHECK_NULL(pExtWinOp, code, lino, _return, terrno)
1,347,441✔
1188

1189
  pExtWinOp->ExtWins = taosArrayDup(pWins, NULL);
1,347,441✔
1190
  QUERY_CHECK_NULL(pExtWinOp->ExtWins, code, lino, _return, terrno)
1,347,441✔
1191

1192
  SExtWinTimeWindow *firstWin = (SExtWinTimeWindow *)taosArrayGet(pWins, 0);
1,347,441✔
1193
  SExtWinTimeWindow *lastWin = (SExtWinTimeWindow *)taosArrayGet(pWins, taosArrayGetSize(pWins) - 1);
1,347,441✔
1194

1195
  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
1,347,441✔
1196
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)
1,347,441✔
1197

1198
  SOperatorParam* pExchangeOperator = NULL;
1,347,441✔
1199
  STimeWindow     twin = {.skey = firstWin->tw.skey, .ekey = lastWin->tw.ekey};
1,347,441✔
1200
  code = buildExchangeOperatorParamForExternalWindow(&pExchangeOperator, 0, twin);
1,347,441✔
1201
  QUERY_CHECK_CODE(code, lino, _return);
1,347,441✔
1202
  QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pExchangeOperator), code, lino, _return, terrno)
2,694,882✔
1203

1204
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW;
1,347,441✔
1205
  (*ppRes)->downstreamIdx = idx;
1,347,441✔
1206
  (*ppRes)->value = pExtWinOp;
1,347,441✔
1207
  (*ppRes)->reUse = false;
1,347,441✔
1208

1209
  return code;
1,347,441✔
1210
_return:
×
1211
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1212
  if (pExtWinOp) {
×
1213
    if (pExtWinOp->ExtWins) {
×
1214
      taosArrayDestroy(pExtWinOp->ExtWins);
×
1215
    }
1216
    taosMemoryFree(pExtWinOp);
×
1217
  }
1218
  if (*ppRes) {
×
1219
    if ((*ppRes)->pChildren) {
×
1220
      taosArrayDestroy((*ppRes)->pChildren);
×
1221
    }
1222
    taosMemoryFree(*ppRes);
×
1223
    *ppRes = NULL;
×
1224
  }
1225
  return code;
×
1226
}
1227

1228
static int32_t buildMergeOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins,
560,322✔
1229
                                       int32_t numOfDownstream, int32_t numOfWins) {
1230
  int32_t                   code = TSDB_CODE_SUCCESS;
560,322✔
1231
  int32_t                   lino = 0;
560,322✔
1232
  SMergeOperatorParam*      pMergeOp = NULL;
560,322✔
1233

1234
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
560,322✔
1235
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
560,322✔
1236

1237
  (*ppRes)->pChildren = taosArrayInit(numOfDownstream, POINTER_BYTES);
560,322✔
1238
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)
560,322✔
1239

1240
  pMergeOp = taosMemoryMalloc(sizeof(SMergeOperatorParam));
560,322✔
1241
  QUERY_CHECK_NULL(pMergeOp, code, lino, _return, terrno)
560,322✔
1242

1243
  pMergeOp->winNum = numOfWins;
560,322✔
1244

1245
  for (int32_t i = 0; i < numOfDownstream; i++) {
1,907,763✔
1246
    SOperatorParam* pExternalWinParam = NULL;
1,347,441✔
1247
    code = buildExternalWindowOperatorParam(pInfo, &pExternalWinParam, pWins, i);
1,347,441✔
1248
    QUERY_CHECK_CODE(code, lino, _return);
1,347,441✔
1249
    QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pExternalWinParam), code, lino, _return, terrno)
2,694,882✔
1250
  }
1251

1252
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE;
560,322✔
1253
  (*ppRes)->downstreamIdx = 0;
560,322✔
1254
  (*ppRes)->value = pMergeOp;
560,322✔
1255
  (*ppRes)->reUse = false;
560,322✔
1256

1257
  return TSDB_CODE_SUCCESS;
560,322✔
1258
_return:
×
1259
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1260
  if (pMergeOp) {
×
1261
    taosMemoryFree(pMergeOp);
×
1262
  }
1263
  if (*ppRes) {
×
1264
    if ((*ppRes)->pChildren) {
×
1265
      for (int32_t i = 0; i < taosArrayGetSize((*ppRes)->pChildren); i++) {
×
1266
        SOperatorParam* pChildParam = (SOperatorParam*)taosArrayGet((*ppRes)->pChildren, i);
×
1267
        if (pChildParam) {
×
1268
          SExternalWindowOperatorParam* pExtWinOp = (SExternalWindowOperatorParam*)pChildParam->value;
×
1269
          if (pExtWinOp) {
×
1270
            if (pExtWinOp->ExtWins) {
×
1271
              taosArrayDestroy(pExtWinOp->ExtWins);
×
1272
            }
1273
            taosMemoryFree(pExtWinOp);
×
1274
          }
1275
          taosMemoryFree(pChildParam);
×
1276
        }
1277
      }
1278
      taosArrayDestroy((*ppRes)->pChildren);
×
1279
    }
1280
    taosMemoryFree(*ppRes);
×
1281
    *ppRes = NULL;
×
1282
  }
1283
  return code;
×
1284
}
1285

1286
static int32_t buildAggOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes) {
108,369✔
1287
  int32_t                   code = TSDB_CODE_SUCCESS;
108,369✔
1288
  int32_t                   lino = 0;
108,369✔
1289
  SOperatorParam*           pParam = NULL;
108,369✔
1290
  SOperatorParam*           pExchangeParam = NULL;
108,369✔
1291
  SVtbScanDynCtrlInfo*      pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
108,369✔
1292
  bool                      freeExchange = false;
108,369✔
1293

1294
  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
108,369✔
1295
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)
108,369✔
1296

1297
  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
108,369✔
1298
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno)
108,369✔
1299

1300
  pParam->value = taosMemoryMalloc(sizeof(SAggOperatorParam));
108,369✔
1301
  QUERY_CHECK_NULL(pParam->value, code, lino, _return, terrno)
108,369✔
1302

1303
  code = buildBatchExchangeOperatorParamForVirtual(&pExchangeParam, 0, NULL, 0, pVtbScan->otbVgIdToOtbInfoArrayMap, (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN}, EX_SRC_TYPE_VSTB_AGG_SCAN);
108,369✔
1304
  QUERY_CHECK_CODE(code, lino, _return);
108,369✔
1305

1306
  freeExchange = true;
108,369✔
1307

1308
  QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pExchangeParam), code, lino, _return, terrno)
216,738✔
1309

1310
  freeExchange = false;
108,369✔
1311

1312
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
108,369✔
1313
  pParam->downstreamIdx = 0;
108,369✔
1314
  pParam->reUse = false;
108,369✔
1315

1316
  *ppRes = pParam;
108,369✔
1317

1318
  return code;
108,369✔
1319
_return:
×
1320
  if (freeExchange) {
×
1321
    freeOperatorParam(pExchangeParam, OP_GET_PARAM);
×
1322
  }
1323
  if (pParam) {
×
1324
    freeOperatorParam(pParam, OP_GET_PARAM);
×
1325
  }
1326
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1327
  return code;
×
1328
}
1329

1330
static int32_t buildAggOperatorParamWithGroupId(SDynQueryCtrlOperatorInfo* pInfo, uint64_t groupid, SOperatorParam** ppRes) {
3,103,640✔
1331
  int32_t                   code = TSDB_CODE_SUCCESS;
3,103,640✔
1332
  int32_t                   lino = 0;
3,103,640✔
1333
  SVtbScanDynCtrlInfo*      pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
3,103,640✔
1334
  SOperatorParam*           pParam = NULL;
3,103,640✔
1335
  SOperatorParam*           pExchangeParam = NULL;
3,103,640✔
1336
  SHashObj*                 otbVgIdToOtbInfoArrayMap = NULL;
3,103,640✔
1337
  bool                      freeExchange = false;
3,103,640✔
1338
  void*                     pIter = taosHashGet(pVtbScan->vtbGroupIdToVgIdMapMap, &groupid, sizeof(groupid));
3,103,640✔
1339

1340
  if (!pIter) {
3,103,640✔
1341
    *ppRes = NULL;
405,460✔
1342
    return code;
405,460✔
1343
  }
1344

1345
  otbVgIdToOtbInfoArrayMap = *(SHashObj**)pIter;
2,698,180✔
1346

1347
  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
2,698,180✔
1348
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)
2,698,180✔
1349

1350
  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
2,698,180✔
1351
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno)
2,698,180✔
1352

1353
  code = buildBatchExchangeOperatorParamForVirtual(&pExchangeParam, 0, NULL, groupid, otbVgIdToOtbInfoArrayMap, (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN}, EX_SRC_TYPE_VSTB_AGG_SCAN);
2,698,180✔
1354
  QUERY_CHECK_CODE(code, lino, _return);
2,698,180✔
1355

1356
  freeExchange = true;
2,698,180✔
1357

1358
  QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pExchangeParam), code, lino, _return, terrno)
5,396,360✔
1359

1360
  freeExchange = false;
2,698,180✔
1361

1362
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
2,698,180✔
1363
  pParam->downstreamIdx = 0;
2,698,180✔
1364
  pParam->value = NULL;
2,698,180✔
1365
  pParam->reUse = false;
2,698,180✔
1366

1367
  *ppRes = pParam;
2,698,180✔
1368

1369
  return code;
2,698,180✔
1370
_return:
×
1371
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1372
  if (freeExchange) {
×
1373
    freeOperatorParam(pExchangeParam, OP_GET_PARAM);
×
1374
  }
1375
  if (pParam) {
×
1376
    freeOperatorParam(pParam, OP_GET_PARAM);
×
1377
  }
1378
  return code;
×
1379
}
1380

1381
static int32_t buildAggOperatorParamForSingleChild(SDynQueryCtrlOperatorInfo* pInfo, tb_uid_t uid, uint64_t groupid, SArray* pTagList, SOperatorParam** ppRes) {
2,817,000✔
1382
  int32_t                   code = TSDB_CODE_SUCCESS;
2,817,000✔
1383
  int32_t                   lino = 0;
2,817,000✔
1384
  SVtbScanDynCtrlInfo*      pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
2,817,000✔
1385
  SOperatorParam*           pParam = NULL;
2,817,000✔
1386
  SHashObj*                 pOtbVgIdToOtbInfoArrayMap = NULL;
2,817,000✔
1387
  void*                     pIter = taosHashGet(pVtbScan->vtbUidToVgIdMapMap, &uid, sizeof(uid));
2,817,000✔
1388

1389
  if (pIter) {
2,817,000✔
1390
    pOtbVgIdToOtbInfoArrayMap = *(SHashObj**)taosHashGet(pVtbScan->vtbUidToVgIdMapMap, &uid, sizeof(uid));
2,281,960✔
1391

1392
    code = buildBatchExchangeOperatorParamForVirtual(&pParam, 0, pTagList, groupid, pOtbVgIdToOtbInfoArrayMap, (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN}, EX_SRC_TYPE_VSTB_AGG_SCAN);
2,281,960✔
1393
    QUERY_CHECK_CODE(code, lino, _return);
2,281,960✔
1394

1395
    *ppRes = pParam;
2,281,960✔
1396
  } else {
1397
    *ppRes = NULL;
535,040✔
1398
  }
1399

1400
  return code;
2,817,000✔
1401
_return:
×
1402
  if (pParam) {
×
1403
    freeOperatorParam(pParam, OP_GET_PARAM);
×
1404
  }
1405
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1406
  return code;
×
1407
}
1408

1409
static void seqJoinLaunchNewRetrieveImpl(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
3,720,338✔
1410
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
3,720,338✔
1411
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
3,720,338✔
1412
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
3,720,338✔
1413
  SStbJoinPostJoinCtx*       pPost = &pStbJoin->ctx.post;
3,720,338✔
1414
  SOperatorParam*            pParam = NULL;
3,720,338✔
1415
  int32_t                    code  = buildSeqStbJoinOperatorParam(pInfo, pPrev, pPost, &pParam);
3,720,338✔
1416
  if (TSDB_CODE_SUCCESS != code) {
3,720,338✔
1417
    pOperator->pTaskInfo->code = code;
×
1418
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
×
1419
  }
1420

1421
  qDebug("%s dynamic post task begin", GET_TASKID(pOperator->pTaskInfo));
3,720,338✔
1422
  code = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam, ppRes);
3,720,338✔
1423
  if (*ppRes && (code == 0)) {
3,720,338✔
1424
    code = blockDataCheck(*ppRes);
214,060✔
1425
    if (code) {
214,060✔
1426
      qError("Invalid block data, blockDataCheck failed, error:%s", tstrerror(code));
×
1427
      pOperator->pTaskInfo->code = code;
×
1428
      T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
×
1429
    }
1430
    pPost->isStarted = true;
214,060✔
1431
    pStbJoin->execInfo.postBlkNum++;
214,060✔
1432
    pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows;
214,060✔
1433
    qDebug("%s join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
214,060✔
1434
  } else {
1435
    qDebug("%s Empty join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
3,506,278✔
1436
  }
1437
}
3,720,338✔
1438

1439

1440
static int32_t notifySeqJoinTableCacheEnd(SOperatorInfo* pOperator, SStbJoinPostJoinCtx* pPost, bool leftTable) {
×
1441
  SOperatorParam* pGcParam = NULL;
×
1442
  SOperatorParam* pMergeJoinParam = NULL;
×
1443
  int32_t         downstreamId = leftTable ? 0 : 1;
×
1444
  int32_t         vgId = leftTable ? pPost->leftVgId : pPost->rightVgId;
×
1445
  int64_t         uid = leftTable ? pPost->leftCurrUid : pPost->rightCurrUid;
×
1446

1447
  qDebug("notify table %" PRIu64 " in vgId %d downstreamId %d cache end", uid, vgId, downstreamId);
×
1448

1449
  int32_t code = buildGroupCacheNotifyOperatorParam(&pGcParam, downstreamId, vgId, uid);
×
1450
  if (TSDB_CODE_SUCCESS != code) {
×
1451
    return code;
×
1452
  }
1453
  code = buildMergeJoinNotifyOperatorParam(&pMergeJoinParam, pGcParam, NULL);
×
1454
  if (TSDB_CODE_SUCCESS != code) {
×
1455
    return code;
×
1456
  }
1457

1458
  return optrDefaultNotifyFn(pOperator->pDownstream[1], pMergeJoinParam);
×
1459
}
1460

1461
static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCtrlInfo*          pStbJoin) {
3,719,833✔
1462
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
3,719,833✔
1463
  int32_t code = 0;
3,719,833✔
1464
  
1465
  pPost->isStarted = false;
3,719,833✔
1466
  
1467
  if (pStbJoin->basic.batchFetch) {
3,719,833✔
1468
    return TSDB_CODE_SUCCESS;
3,717,589✔
1469
  }
1470
  
1471
  if (pPost->leftNeedCache) {
2,244✔
1472
    uint32_t* num = tSimpleHashGet(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
×
1473
    if (num && --(*num) <= 0) {
×
1474
      code = tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
×
1475
      if (code) {
×
1476
        qError("tSimpleHashRemove leftCurrUid %" PRId64 " from leftCache failed, error:%s", pPost->leftCurrUid, tstrerror(code));
×
1477
        QRY_ERR_RET(code);
×
1478
      }
1479
      QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, true));
×
1480
    }
1481
  }
1482
  
1483
  if (!pPost->rightNeedCache) {
2,244✔
1484
    void* v = tSimpleHashGet(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
2,244✔
1485
    if (NULL != v) {
2,244✔
1486
      code = tSimpleHashRemove(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
×
1487
      if (code) {
×
1488
        qError("tSimpleHashRemove rightCurrUid %" PRId64 " from rightCache failed, error:%s", pPost->rightCurrUid, tstrerror(code));
×
1489
        QRY_ERR_RET(code);
×
1490
      }
1491
      QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, false));
×
1492
    }
1493
  }
1494

1495
  return TSDB_CODE_SUCCESS;
2,244✔
1496
}
1497

1498

1499
static FORCE_INLINE int32_t seqJoinContinueCurrRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
1500
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
293,392✔
1501
  SStbJoinPostJoinCtx*       pPost = &pInfo->stbJoin.ctx.post;
293,392✔
1502
  SStbJoinPrevJoinCtx*       pPrev = &pInfo->stbJoin.ctx.prev;
293,392✔
1503

1504
  if (!pPost->isStarted) {
293,392✔
1505
    return TSDB_CODE_SUCCESS;
79,837✔
1506
  }
1507
  
1508
  qDebug("%s dynQueryCtrl continue to retrieve block from post op", GET_TASKID(pOperator->pTaskInfo));
213,555✔
1509
  
1510
  *ppRes = getNextBlockFromDownstream(pOperator, 1);
213,555✔
1511
  if (NULL == *ppRes) {
213,555✔
1512
    QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, &pInfo->stbJoin));
213,555✔
1513
    pPrev->pListHead->readIdx++;
213,555✔
1514
  } else {
1515
    pInfo->stbJoin.execInfo.postBlkNum++;
×
1516
    pInfo->stbJoin.execInfo.postBlkRows += (*ppRes)->info.rows;
×
1517
  }
1518

1519
  return TSDB_CODE_SUCCESS;
213,555✔
1520
}
1521

1522
static FORCE_INLINE int32_t addToJoinVgroupHash(SSHashObj* pHash, void* pKey, int32_t keySize, void* pVal, int32_t valSize) {
1523
  SArray** ppArray = tSimpleHashGet(pHash, pKey, keySize);
7,440,228✔
1524
  if (NULL == ppArray) {
7,440,228✔
1525
    SArray* pArray = taosArrayInit(10, valSize);
259,792✔
1526
    if (NULL == pArray) {
259,792✔
1527
      return terrno;
×
1528
    }
1529
    if (NULL == taosArrayPush(pArray, pVal)) {
519,584✔
1530
      taosArrayDestroy(pArray);
×
1531
      return terrno;
×
1532
    }
1533
    if (tSimpleHashPut(pHash, pKey, keySize, &pArray, POINTER_BYTES)) {
259,792✔
1534
      taosArrayDestroy(pArray);      
×
1535
      return terrno;
×
1536
    }
1537
    return TSDB_CODE_SUCCESS;
259,792✔
1538
  }
1539

1540
  if (NULL == taosArrayPush(*ppArray, pVal)) {
14,360,872✔
1541
    return terrno;
×
1542
  }
1543
  
1544
  return TSDB_CODE_SUCCESS;
7,180,436✔
1545
}
1546

1547
static FORCE_INLINE int32_t addToJoinTableHash(SSHashObj* pHash, SSHashObj* pOnceHash, void* pKey, int32_t keySize) {
1548
  int32_t code = TSDB_CODE_SUCCESS;
2,244✔
1549
  uint32_t* pNum = tSimpleHashGet(pHash, pKey, keySize);
2,244✔
1550
  if (NULL == pNum) {
2,244✔
1551
    uint32_t n = 1;
2,244✔
1552
    code = tSimpleHashPut(pHash, pKey, keySize, &n, sizeof(n));
2,244✔
1553
    if (code) {
2,244✔
1554
      return code;
×
1555
    }
1556
    code = tSimpleHashPut(pOnceHash, pKey, keySize, NULL, 0);
2,244✔
1557
    if (code) {
2,244✔
1558
      return code;
×
1559
    }
1560
    return TSDB_CODE_SUCCESS;
2,244✔
1561
  }
1562

1563
  switch (*pNum) {
×
1564
    case 0:
×
1565
      break;
×
1566
    case UINT32_MAX:
×
1567
      *pNum = 0;
×
1568
      break;
×
1569
    default:
×
1570
      if (1 == (*pNum)) {
×
1571
        code = tSimpleHashRemove(pOnceHash, pKey, keySize);
×
1572
        if (code) {
×
1573
          qError("tSimpleHashRemove failed in addToJoinTableHash, error:%s", tstrerror(code));
×
1574
          QRY_ERR_RET(code);
×
1575
        }
1576
      }
1577
      (*pNum)++;
×
1578
      break;
×
1579
  }
1580
  
1581
  return TSDB_CODE_SUCCESS;
×
1582
}
1583

1584

1585
static void freeStbJoinTableList(SStbJoinTableList* pList) {
79,332✔
1586
  if (NULL == pList) {
79,332✔
1587
    return;
×
1588
  }
1589
  taosMemoryFree(pList->pLeftVg);
79,332✔
1590
  taosMemoryFree(pList->pLeftUid);
79,332✔
1591
  taosMemoryFree(pList->pRightVg);
79,332✔
1592
  taosMemoryFree(pList->pRightUid);
79,332✔
1593
  taosMemoryFree(pList);
79,332✔
1594
}
1595

1596
static int32_t appendStbJoinTableList(SStbJoinPrevJoinCtx* pCtx, int64_t rows, int32_t* pLeftVg, int64_t* pLeftUid, int32_t* pRightVg, int64_t* pRightUid) {
79,837✔
1597
  int32_t code = TSDB_CODE_SUCCESS;
79,837✔
1598
  SStbJoinTableList* pNew = taosMemoryCalloc(1, sizeof(SStbJoinTableList));
79,837✔
1599
  if (NULL == pNew) {
79,837✔
1600
    return terrno;
×
1601
  }
1602
  pNew->pLeftVg = taosMemoryMalloc(rows * sizeof(*pLeftVg));
79,837✔
1603
  if (NULL == pNew->pLeftVg) {
79,837✔
1604
    code = terrno;
×
1605
    freeStbJoinTableList(pNew);
×
1606
    return code;
×
1607
  }
1608
  pNew->pLeftUid = taosMemoryMalloc(rows * sizeof(*pLeftUid));
79,837✔
1609
  if (NULL == pNew->pLeftUid) {
79,837✔
1610
    code = terrno;
×
1611
    freeStbJoinTableList(pNew);
×
1612
    return code;
×
1613
  }
1614
  pNew->pRightVg = taosMemoryMalloc(rows * sizeof(*pRightVg));
79,837✔
1615
  if (NULL == pNew->pRightVg) {
79,837✔
1616
    code = terrno;
×
1617
    freeStbJoinTableList(pNew);
×
1618
    return code;
×
1619
  }
1620
  pNew->pRightUid = taosMemoryMalloc(rows * sizeof(*pRightUid));
79,837✔
1621
  if (NULL == pNew->pRightUid) {
79,837✔
1622
    code = terrno;
×
1623
    freeStbJoinTableList(pNew);
×
1624
    return code;
×
1625
  }
1626

1627
  TAOS_MEMCPY(pNew->pLeftVg, pLeftVg, rows * sizeof(*pLeftVg));
79,837✔
1628
  TAOS_MEMCPY(pNew->pLeftUid, pLeftUid, rows * sizeof(*pLeftUid));
79,837✔
1629
  TAOS_MEMCPY(pNew->pRightVg, pRightVg, rows * sizeof(*pRightVg));
79,837✔
1630
  TAOS_MEMCPY(pNew->pRightUid, pRightUid, rows * sizeof(*pRightUid));
79,837✔
1631

1632
  pNew->readIdx = 0;
79,837✔
1633
  pNew->uidNum = rows;
79,837✔
1634
  pNew->pNext = NULL;
79,837✔
1635
  
1636
  if (pCtx->pListTail) {
79,837✔
1637
    pCtx->pListTail->pNext = pNew;
×
1638
    pCtx->pListTail = pNew;
×
1639
  } else {
1640
    pCtx->pListHead = pNew;
79,837✔
1641
    pCtx->pListTail= pNew;
79,837✔
1642
  }
1643

1644
  return TSDB_CODE_SUCCESS;
79,837✔
1645
}
1646

1647
static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
79,837✔
1648
  int32_t                    code = TSDB_CODE_SUCCESS;
79,837✔
1649
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
79,837✔
1650
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
79,837✔
1651
  SColumnInfoData*           pVg0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[0]);
79,837✔
1652
  if (NULL == pVg0) {
79,837✔
1653
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1654
  }
1655
  SColumnInfoData*           pVg1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[1]);
79,837✔
1656
  if (NULL == pVg1) {
79,837✔
1657
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1658
  }
1659
  SColumnInfoData*           pUid0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[0]);
79,837✔
1660
  if (NULL == pUid0) {
79,837✔
1661
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1662
  }
1663
  SColumnInfoData*           pUid1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[1]);
79,837✔
1664
  if (NULL == pUid1) {
79,837✔
1665
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1666
  }
1667

1668
  if (pStbJoin->basic.batchFetch) {
79,837✔
1669
    for (int32_t i = 0; i < pBlock->info.rows; ++i) {
3,798,829✔
1670
      int32_t* leftVg = (int32_t*)(pVg0->pData + pVg0->info.bytes * i);
3,720,114✔
1671
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i);
3,720,114✔
1672
      int32_t* rightVg = (int32_t*)(pVg1->pData + pVg1->info.bytes * i);
3,720,114✔
1673
      int64_t* rightUid = (int64_t*)(pUid1->pData + pUid1->info.bytes * i);
3,720,114✔
1674

1675
      code = addToJoinVgroupHash(pStbJoin->ctx.prev.leftHash, leftVg, sizeof(*leftVg), leftUid, sizeof(*leftUid));
3,720,114✔
1676
      if (TSDB_CODE_SUCCESS != code) {
3,720,114✔
1677
        break;
×
1678
      }
1679
      code = addToJoinVgroupHash(pStbJoin->ctx.prev.rightHash, rightVg, sizeof(*rightVg), rightUid, sizeof(*rightUid));
3,720,114✔
1680
      if (TSDB_CODE_SUCCESS != code) {
3,720,114✔
1681
        break;
×
1682
      }
1683
    }
1684
  } else {
1685
    for (int32_t i = 0; i < pBlock->info.rows; ++i) {
3,366✔
1686
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i);
2,244✔
1687
    
1688
      code = addToJoinTableHash(pStbJoin->ctx.prev.leftCache, pStbJoin->ctx.prev.onceTable, leftUid, sizeof(*leftUid));
2,244✔
1689
      if (TSDB_CODE_SUCCESS != code) {
2,244✔
1690
        break;
×
1691
      }
1692
    }
1693
  }
1694

1695
  if (TSDB_CODE_SUCCESS == code) {
79,837✔
1696
    code = appendStbJoinTableList(&pStbJoin->ctx.prev, pBlock->info.rows, (int32_t*)pVg0->pData, (int64_t*)pUid0->pData, (int32_t*)pVg1->pData, (int64_t*)pUid1->pData);
79,837✔
1697
    if (TSDB_CODE_SUCCESS == code) {
79,837✔
1698
      pStbJoin->ctx.prev.tableNum += pBlock->info.rows;
79,837✔
1699
    }
1700
  }
1701

1702
_return:
×
1703

1704
  if (TSDB_CODE_SUCCESS != code) {
79,837✔
1705
    pOperator->pTaskInfo->code = code;
×
1706
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
×
1707
  }
1708
}
79,837✔
1709

1710

1711
static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) {
1,150,546✔
1712
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
1,150,546✔
1713
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
1,150,546✔
1714

1715
  if (pStbJoin->basic.batchFetch) {
1,150,546✔
1716
    return;
1,149,424✔
1717
  }
1718

1719
  if (tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache) == tSimpleHashGetSize(pStbJoin->ctx.prev.onceTable)) {
1,122✔
1720
    tSimpleHashClear(pStbJoin->ctx.prev.leftCache);
1,122✔
1721
    return;
1,122✔
1722
  }
1723

1724
  uint64_t* pUid = NULL;
×
1725
  int32_t iter = 0;
×
1726
  int32_t code = 0;
×
1727
  while (NULL != (pUid = tSimpleHashIterate(pStbJoin->ctx.prev.onceTable, pUid, &iter))) {
×
1728
    code = tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, pUid, sizeof(*pUid));
×
1729
    if (code) {
×
1730
      qError("tSimpleHashRemove failed in postProcessStbJoinTableHash, error:%s", tstrerror(code));
×
1731
    }
1732
  }
1733

1734
  pStbJoin->execInfo.leftCacheNum = tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache);
×
1735
  qDebug("more than 1 ref build table num %" PRId64, (int64_t)tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache));
×
1736

1737
/*
1738
  // debug only
1739
  iter = 0;
1740
  uint32_t* num = NULL;
1741
  while (NULL != (num = tSimpleHashIterate(pStbJoin->ctx.prev.leftCache, num, &iter))) {
1742
    A S S E R T(*num > 1);
1743
  }
1744
*/  
1745
}
1746

1747
static void buildStbJoinTableList(SOperatorInfo* pOperator) {
1,150,546✔
1748
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
1,150,546✔
1749
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
1,150,546✔
1750

1751
  while (true) {
79,837✔
1752
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
1,230,383✔
1753
    if (NULL == pBlock) {
1,230,383✔
1754
      break;
1,150,546✔
1755
    }
1756

1757
    pStbJoin->execInfo.prevBlkNum++;
79,837✔
1758
    pStbJoin->execInfo.prevBlkRows += pBlock->info.rows;
79,837✔
1759
    
1760
    doBuildStbJoinTableHash(pOperator, pBlock);
79,837✔
1761
  }
1762

1763
  postProcessStbJoinTableHash(pOperator);
1,150,546✔
1764

1765
  pStbJoin->ctx.prev.joinBuild = true;
1,150,546✔
1766
}
1,150,546✔
1767

1768
static int32_t seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
293,392✔
1769
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
293,392✔
1770
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
293,392✔
1771
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
293,392✔
1772
  SStbJoinTableList*         pNode = pPrev->pListHead;
293,392✔
1773

1774
  while (pNode) {
3,879,002✔
1775
    if (pNode->readIdx >= pNode->uidNum) {
3,799,670✔
1776
      pPrev->pListHead = pNode->pNext;
79,332✔
1777
      freeStbJoinTableList(pNode);
79,332✔
1778
      pNode = pPrev->pListHead;
79,332✔
1779
      continue;
79,332✔
1780
    }
1781
    
1782
    seqJoinLaunchNewRetrieveImpl(pOperator, ppRes);
3,720,338✔
1783
    if (*ppRes) {
3,720,338✔
1784
      return TSDB_CODE_SUCCESS;
214,060✔
1785
    }
1786

1787
    QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, pStbJoin));
3,506,278✔
1788
    pPrev->pListHead->readIdx++;
3,506,278✔
1789
  }
1790

1791
  *ppRes = NULL;
79,332✔
1792
  setOperatorCompleted(pOperator);
79,332✔
1793

1794
  return TSDB_CODE_SUCCESS;
79,332✔
1795
}
1796

1797
static int32_t seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) {
1,364,101✔
1798
  if (pBlock) {
1,364,101✔
1799
    if (pStbJoin && pStbJoin->pOutputDataBlockDesc) {
214,060✔
1800
      pBlock->info.id.blockId = pStbJoin->pOutputDataBlockDesc->dataBlockId;
214,060✔
1801
      if (!pBlock->pDataBlock) return TSDB_CODE_SUCCESS;
214,060✔
1802

1803
      for (int i = (int)pBlock->pDataBlock->size; i < pStbJoin->pOutputDataBlockDesc->pSlots->length; i++) {
216,304✔
1804
        SSlotDescNode* pSlot = (SSlotDescNode*)nodesListGetNode(pStbJoin->pOutputDataBlockDesc->pSlots, i);
2,244✔
1805
        if (pSlot == NULL) {
2,244✔
1806
          qError("seqStableJoinComposeRes: pSlot is NULL, i:%d", i);
×
1807
          return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1808
        }
1809
        SColumnInfoData colInfo = createColumnInfoData(pSlot->dataType.type, pSlot->dataType.bytes, pSlot->slotId);
2,244✔
1810
        int32_t code = colInfoDataEnsureCapacity(&colInfo, pBlock->info.rows, true);
2,244✔
1811
        if (code != TSDB_CODE_SUCCESS) {
2,244✔
1812
          return code;
×
1813
        }
1814
        code = blockDataAppendColInfo(pBlock, &colInfo);
2,244✔
1815
        if (code != TSDB_CODE_SUCCESS) {
2,244✔
1816
          return code;
×
1817
        }
1818
      }
1819
    } else {
1820
      qError("seqStableJoinComposeRes: pBlock or pStbJoin is NULL");
×
1821
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1822
    }
1823
  }
1824
  return TSDB_CODE_SUCCESS;
1,364,101✔
1825
}
1826

1827
int32_t seqStableJoin(SOperatorInfo* pOperator, SSDataBlock** pRes) {
1,391,036✔
1828
  int32_t                    code = TSDB_CODE_SUCCESS;
1,391,036✔
1829
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
1,391,036✔
1830
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
1,391,036✔
1831

1832
  QRY_PARAM_CHECK(pRes);
1,391,036✔
1833
  if (pOperator->status == OP_EXEC_DONE) {
1,391,036✔
1834
    return code;
26,935✔
1835
  }
1836

1837
  int64_t st = 0;
1,364,101✔
1838
  if (pOperator->cost.openCost == 0) {
1,364,101✔
1839
    st = taosGetTimestampUs();
1,150,026✔
1840
  }
1841

1842
  if (!pStbJoin->ctx.prev.joinBuild) {
1,364,101✔
1843
    buildStbJoinTableList(pOperator);
1,150,546✔
1844
    if (pStbJoin->execInfo.prevBlkRows <= 0) {
1,150,546✔
1845
      setOperatorCompleted(pOperator);
1,070,709✔
1846
      goto _return;
1,070,709✔
1847
    }
1848
  }
1849

1850
  QRY_ERR_JRET(seqJoinContinueCurrRetrieve(pOperator, pRes));
293,392✔
1851
  if (*pRes) {
293,392✔
1852
    goto _return;
×
1853
  }
1854

1855
  QRY_ERR_JRET(seqJoinLaunchNewRetrieve(pOperator, pRes));
293,392✔
1856

1857
_return:
293,392✔
1858
  if (pOperator->cost.openCost == 0) {
1,364,101✔
1859
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
1,150,026✔
1860
  }
1861

1862
  if (code) {
1,364,101✔
1863
    qError("%s failed since %s", __func__, tstrerror(code));
×
1864
    pOperator->pTaskInfo->code = code;
×
1865
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
1866
  } else {
1867
    code = seqStableJoinComposeRes(pStbJoin, *pRes);
1,364,101✔
1868
  }
1869
  return code;
1,364,101✔
1870
}
1871

1872
int32_t dynProcessUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
2,535,490✔
1873
  int32_t                    lino = 0;
2,535,490✔
1874
  SOperatorInfo*             operator=(SOperatorInfo*) param;
2,535,490✔
1875
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)operator->info;
2,535,490✔
1876

1877
  if (TSDB_CODE_SUCCESS != code) {
2,535,490✔
1878
    operator->pTaskInfo->code = rpcCvtErrCode(code);
×
1879
    if (operator->pTaskInfo->code != code) {
×
1880
      qError("load systable rsp received, error:%s, cvted error:%s", tstrerror(code),
×
1881
             tstrerror(operator->pTaskInfo->code));
1882
    } else {
1883
      qError("load systable rsp received, error:%s", tstrerror(code));
×
1884
    }
1885
    goto _return;
×
1886
  }
1887

1888
  pScanResInfo->vtbScan.pRsp = taosMemoryMalloc(sizeof(SUseDbRsp));
2,535,490✔
1889
  QUERY_CHECK_NULL(pScanResInfo->vtbScan.pRsp, code, lino, _return, terrno)
2,535,490✔
1890

1891
  code = tDeserializeSUseDbRsp(pMsg->pData, (int32_t)pMsg->len, pScanResInfo->vtbScan.pRsp);
2,535,490✔
1892
  QUERY_CHECK_CODE(code, lino, _return);
2,535,490✔
1893

1894
  taosMemoryFreeClear(pMsg->pData);
2,535,490✔
1895

1896
  code = tsem_post(&pScanResInfo->vtbScan.ready);
2,535,490✔
1897
  QUERY_CHECK_CODE(code, lino, _return);
2,535,490✔
1898

1899
  return code;
2,535,490✔
1900
_return:
×
1901
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1902
  return code;
×
1903
}
1904

1905
static int32_t buildDbVgInfoMap(SOperatorInfo* pOperator, SMsgCb* pMsgCb, SName* name, SExecTaskInfo* pTaskInfo, SUseDbOutput* output) {
2,535,490✔
1906
  int32_t                    code = TSDB_CODE_SUCCESS;
2,535,490✔
1907
  int32_t                    lino = 0;
2,535,490✔
1908
  char*                      buf1 = NULL;
2,535,490✔
1909
  SUseDbReq*                 pReq = NULL;
2,535,490✔
1910
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)pOperator->info;
2,535,490✔
1911

1912
  pReq = taosMemoryMalloc(sizeof(SUseDbReq));
2,535,490✔
1913
  QUERY_CHECK_NULL(pReq, code, lino, _return, terrno)
2,535,490✔
1914
  code = tNameGetFullDbName(name, pReq->db);
2,535,490✔
1915
  QUERY_CHECK_CODE(code, lino, _return);
2,535,490✔
1916
  int32_t contLen = tSerializeSUseDbReq(NULL, 0, pReq);
2,535,490✔
1917
  buf1 = taosMemoryCalloc(1, contLen);
2,535,490✔
1918
  QUERY_CHECK_NULL(buf1, code, lino, _return, terrno)
2,535,490✔
1919
  int32_t tempRes = tSerializeSUseDbReq(buf1, contLen, pReq);
2,535,490✔
1920
  if (tempRes < 0) {
2,535,490✔
1921
    QUERY_CHECK_CODE(terrno, lino, _return);
×
1922
  }
1923

1924
  // send the fetch remote task result request
1925
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
2,535,490✔
1926
  QUERY_CHECK_NULL(pMsgSendInfo, code, lino, _return, terrno)
2,535,490✔
1927

1928
  pMsgSendInfo->param = pOperator;
2,535,490✔
1929
  pMsgSendInfo->msgInfo.pData = buf1;
2,535,490✔
1930
  pMsgSendInfo->msgInfo.len = contLen;
2,535,490✔
1931
  pMsgSendInfo->msgType = TDMT_MND_GET_DB_INFO;
2,535,490✔
1932
  pMsgSendInfo->fp = dynProcessUseDbRsp;
2,535,490✔
1933
  pMsgSendInfo->requestId = pTaskInfo->id.queryId;
2,535,490✔
1934

1935
  code = asyncSendMsgToServer(pMsgCb->clientRpc, &pScanResInfo->vtbScan.epSet, NULL, pMsgSendInfo);
2,535,490✔
1936
  QUERY_CHECK_CODE(code, lino, _return);
2,535,490✔
1937

1938
  code = tsem_wait(&pScanResInfo->vtbScan.ready);
2,535,490✔
1939
  QUERY_CHECK_CODE(code, lino, _return);
2,535,490✔
1940

1941
  code = queryBuildUseDbOutput(output, pScanResInfo->vtbScan.pRsp);
2,535,490✔
1942
  QUERY_CHECK_CODE(code, lino, _return);
2,535,490✔
1943

1944
_return:
2,535,490✔
1945
  if (code) {
2,535,490✔
1946
     qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1947
     taosMemoryFree(buf1);
×
1948
  }
1949
  taosMemoryFree(pReq);
2,535,490✔
1950
  tFreeSUsedbRsp(pScanResInfo->vtbScan.pRsp);
2,535,490✔
1951
  taosMemoryFreeClear(pScanResInfo->vtbScan.pRsp);
2,535,490✔
1952
  return code;
2,535,490✔
1953
}
1954

1955
int dynVgInfoComp(const void* lp, const void* rp) {
5,059,648✔
1956
  SVgroupInfo* pLeft = (SVgroupInfo*)lp;
5,059,648✔
1957
  SVgroupInfo* pRight = (SVgroupInfo*)rp;
5,059,648✔
1958
  if (pLeft->hashBegin < pRight->hashBegin) {
5,059,648✔
1959
    return -1;
5,059,648✔
1960
  } else if (pLeft->hashBegin > pRight->hashBegin) {
×
1961
    return 1;
×
1962
  }
1963

1964
  return 0;
×
1965
}
1966

1967
int32_t dynMakeVgArraySortBy(SDBVgInfo* dbInfo, __compar_fn_t sort_func) {
15,804,513✔
1968
  if (NULL == dbInfo) {
15,804,513✔
1969
    return TSDB_CODE_SUCCESS;
×
1970
  }
1971

1972
  if (dbInfo->vgHash && NULL == dbInfo->vgArray) {
15,804,513✔
1973
    int32_t vgSize = taosHashGetSize(dbInfo->vgHash);
2,535,490✔
1974
    dbInfo->vgArray = taosArrayInit(vgSize, sizeof(SVgroupInfo));
2,535,490✔
1975
    if (NULL == dbInfo->vgArray) {
2,535,490✔
1976
      return terrno;
×
1977
    }
1978

1979
    void* pIter = taosHashIterate(dbInfo->vgHash, NULL);
2,535,490✔
1980
    while (pIter) {
7,600,804✔
1981
      if (NULL == taosArrayPush(dbInfo->vgArray, pIter)) {
10,130,628✔
1982
        taosHashCancelIterate(dbInfo->vgHash, pIter);
×
1983
        return terrno;
×
1984
      }
1985

1986
      pIter = taosHashIterate(dbInfo->vgHash, pIter);
5,065,314✔
1987
    }
1988

1989
    taosArraySort(dbInfo->vgArray, sort_func);
2,535,490✔
1990
  }
1991

1992
  return TSDB_CODE_SUCCESS;
15,804,513✔
1993
}
1994

1995
int32_t dynHashValueComp(void const* lp, void const* rp) {
23,727,135✔
1996
  uint32_t*    key = (uint32_t*)lp;
23,727,135✔
1997
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
23,727,135✔
1998

1999
  if (*key < pVg->hashBegin) {
23,727,135✔
2000
    return -1;
×
2001
  } else if (*key > pVg->hashEnd) {
23,727,135✔
2002
    return 1;
7,922,622✔
2003
  }
2004

2005
  return 0;
15,804,513✔
2006
}
2007

2008
int32_t getVgId(SDBVgInfo* dbInfo, char* dbFName, int32_t* vgId, char *tbName) {
15,804,513✔
2009
  int32_t code = 0;
15,804,513✔
2010
  int32_t lino = 0;
15,804,513✔
2011
  code = dynMakeVgArraySortBy(dbInfo, dynVgInfoComp);
15,804,513✔
2012
  QUERY_CHECK_CODE(code, lino, _return);
15,804,513✔
2013

2014
  int32_t vgNum = (int32_t)taosArrayGetSize(dbInfo->vgArray);
15,804,513✔
2015
  if (vgNum <= 0) {
15,804,513✔
2016
    qError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum);
×
2017
    QUERY_CHECK_CODE(code = TSDB_CODE_TSC_DB_NOT_SELECTED, lino, _return);
×
2018
  }
2019

2020
  SVgroupInfo* vgInfo = NULL;
15,804,513✔
2021
  char         tbFullName[TSDB_TABLE_FNAME_LEN];
15,804,513✔
2022
  (void)snprintf(tbFullName, sizeof(tbFullName), "%s.", dbFName);
15,804,513✔
2023
  int32_t offset = (int32_t)strlen(tbFullName);
15,804,513✔
2024

2025
  (void)snprintf(tbFullName + offset, sizeof(tbFullName) - offset, "%s", tbName);
15,804,513✔
2026
  uint32_t hashValue = taosGetTbHashVal(tbFullName, (int32_t)strlen(tbFullName), dbInfo->hashMethod,
31,609,026✔
2027
                                        dbInfo->hashPrefix, dbInfo->hashSuffix);
15,804,513✔
2028

2029
  vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, dynHashValueComp, TD_EQ);
15,804,513✔
2030
  if (NULL == vgInfo) {
15,804,513✔
2031
    qError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName,
×
2032
           (int32_t)taosArrayGetSize(dbInfo->vgArray));
2033
    return TSDB_CODE_CTG_INTERNAL_ERROR;
×
2034
  }
2035

2036
  *vgId = vgInfo->vgId;
15,804,513✔
2037

2038
_return:
15,804,513✔
2039
  return code;
15,804,513✔
2040
}
2041

2042
int32_t getDbVgInfo(SOperatorInfo* pOperator, SName *name, SDBVgInfo **dbVgInfo) {
46,317,197✔
2043
  int32_t                    code = TSDB_CODE_SUCCESS;
46,317,197✔
2044
  int32_t                    line = 0;
46,317,197✔
2045
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
46,317,197✔
2046
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
46,317,197✔
2047
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
46,317,197✔
2048
  SMsgCb*                    pMsgCb = pVtbScan->pMsgCb;
46,317,197✔
2049
  SUseDbOutput*              output = NULL;
46,317,197✔
2050
  SUseDbOutput**             find = (SUseDbOutput**)taosHashGet(pInfo->vtbScan.dbVgInfoMap, name->dbname, strlen(name->dbname));
46,317,197✔
2051

2052
  QRY_PARAM_CHECK(dbVgInfo);
46,317,197✔
2053

2054
  if (find == NULL) {
46,317,197✔
2055
    output = taosMemoryMalloc(sizeof(SUseDbOutput));
2,535,490✔
2056
    code = buildDbVgInfoMap(pOperator, pMsgCb, name, pTaskInfo, output);
2,535,490✔
2057
    QUERY_CHECK_CODE(code, line, _return);
2,535,490✔
2058
    code = taosHashPut(pInfo->vtbScan.dbVgInfoMap, name->dbname, strlen(name->dbname), &output, POINTER_BYTES);
2,535,490✔
2059
    QUERY_CHECK_CODE(code, line, _return);
2,535,490✔
2060
  } else {
2061
    output = *find;
43,781,707✔
2062
  }
2063

2064
  *dbVgInfo = output->dbVgroup;
46,317,197✔
2065
  return code;
46,317,197✔
2066
_return:
×
2067
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
2068
  freeUseDbOutput(output);
×
2069
  return code;
×
2070
}
2071

2072
int32_t extractColRefName(const char *colref, char **refDb, char** refTb, char** refCol) {
46,317,197✔
2073
  int32_t     code = TSDB_CODE_SUCCESS;
46,317,197✔
2074
  int32_t     line = 0;
46,317,197✔
2075

2076
  const char *first_dot = strchr(colref, '.');
46,317,197✔
2077
  QUERY_CHECK_NULL(first_dot, code, line, _return, terrno)
46,317,197✔
2078

2079
  const char *second_dot = strchr(first_dot + 1, '.');
46,317,197✔
2080
  QUERY_CHECK_NULL(second_dot, code, line, _return, terrno)
46,317,197✔
2081

2082
  size_t db_len = first_dot - colref;
46,317,197✔
2083
  size_t table_len = second_dot - first_dot - 1;
46,317,197✔
2084
  size_t col_len = strlen(second_dot + 1);
46,317,197✔
2085

2086
  *refDb = taosMemoryMalloc(db_len + 1);
46,317,197✔
2087
  *refTb = taosMemoryMalloc(table_len + 1);
46,317,197✔
2088
  *refCol = taosMemoryMalloc(col_len + 1);
46,317,197✔
2089
  QUERY_CHECK_NULL(*refDb, code, line, _return, terrno)
46,317,197✔
2090
  QUERY_CHECK_NULL(*refTb, code, line, _return, terrno)
46,317,197✔
2091
  QUERY_CHECK_NULL(*refCol, code, line, _return, terrno)
46,317,197✔
2092

2093
  tstrncpy(*refDb, colref, db_len + 1);
46,317,197✔
2094
  tstrncpy(*refTb, first_dot + 1, table_len + 1);
46,317,197✔
2095
  tstrncpy(*refCol, second_dot + 1, col_len + 1);
46,317,197✔
2096

2097
  return TSDB_CODE_SUCCESS;
46,317,197✔
2098
_return:
×
2099
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
2100
  if (*refDb) {
×
2101
    taosMemoryFree(*refDb);
×
2102
    *refDb = NULL;
×
2103
  }
2104
  if (*refTb) {
×
2105
    taosMemoryFree(*refTb);
×
2106
    *refTb = NULL;
×
2107
  }
2108
  if (*refCol) {
×
2109
    taosMemoryFree(*refCol);
×
2110
    *refCol = NULL;
×
2111
  }
2112
  return code;
×
2113
}
2114

2115
bool tableInfoNeedCollect(char *dbName, char *tbName, char *expectDbName, char *expectTbName) {
333,282,436✔
2116
  if (strncmp(varDataVal(tbName), expectTbName, varDataLen(tbName)) == 0 &&
333,282,436✔
2117
      strlen(expectTbName) == varDataLen(tbName) &&
220,501,784✔
2118
      strncmp(varDataVal(dbName), expectDbName, varDataLen(dbName)) == 0 &&
220,501,784✔
2119
      strlen(expectDbName) == varDataLen(dbName)) {
220,501,784✔
2120
    return true;
220,501,784✔
2121
  }
2122
  return false;
112,780,652✔
2123
}
2124

2125
int32_t getColRefInfo(SColRefInfo *pInfo, SArray* pDataBlock, int32_t index) {
220,501,784✔
2126
  int32_t          code = TSDB_CODE_SUCCESS;
220,501,784✔
2127
  int32_t          line = 0;
220,501,784✔
2128

2129
  SColumnInfoData *pColNameCol = taosArrayGet(pDataBlock, 3);
220,501,784✔
2130
  SColumnInfoData *pUidCol = taosArrayGet(pDataBlock, 4);
220,501,784✔
2131
  SColumnInfoData *pColIdCol = taosArrayGet(pDataBlock, 5);
220,501,784✔
2132
  SColumnInfoData *pRefCol = taosArrayGet(pDataBlock, 6);
220,501,784✔
2133
  SColumnInfoData *pVgIdCol = taosArrayGet(pDataBlock, 7);
220,501,784✔
2134
  SColumnInfoData *pRefVerCol = taosArrayGet(pDataBlock, 8);
220,501,784✔
2135

2136
  QUERY_CHECK_NULL(pColNameCol, code, line, _return, terrno)
220,501,784✔
2137
  QUERY_CHECK_NULL(pUidCol, code, line, _return, terrno)
220,501,784✔
2138
  QUERY_CHECK_NULL(pColIdCol, code, line, _return, terrno)
220,501,784✔
2139
  QUERY_CHECK_NULL(pRefCol, code, line, _return, terrno)
220,501,784✔
2140
  QUERY_CHECK_NULL(pVgIdCol, code, line, _return, terrno)
220,501,784✔
2141
  QUERY_CHECK_NULL(pRefVerCol, code, line, _return, terrno)
220,501,784✔
2142

2143
  if (colDataIsNull_s(pRefCol, index)) {
441,003,568✔
2144
    pInfo->colrefName = NULL;
84,740,592✔
2145
  } else {
2146
    pInfo->colrefName = taosMemoryCalloc(varDataTLen(colDataGetData(pRefCol, index)), 1);
135,761,192✔
2147
    QUERY_CHECK_NULL(pInfo->colrefName, code, line, _return, terrno)
135,761,192✔
2148
    memcpy(pInfo->colrefName, varDataVal(colDataGetData(pRefCol, index)), varDataLen(colDataGetData(pRefCol, index)));
135,761,192✔
2149
    pInfo->colrefName[varDataLen(colDataGetData(pRefCol, index))] = 0;
135,761,192✔
2150
  }
2151

2152
  pInfo->colName = taosMemoryCalloc(varDataTLen(colDataGetData(pColNameCol, index)), 1);
220,501,784✔
2153
  QUERY_CHECK_NULL(pInfo->colName, code, line, _return, terrno)
220,501,784✔
2154
  memcpy(pInfo->colName, varDataVal(colDataGetData(pColNameCol, index)), varDataLen(colDataGetData(pColNameCol, index)));
220,501,784✔
2155
  pInfo->colName[varDataLen(colDataGetData(pColNameCol, index))] = 0;
220,501,784✔
2156

2157
  if (!colDataIsNull_s(pUidCol, index)) {
441,003,568✔
2158
    GET_TYPED_DATA(pInfo->uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, index), 0);
220,501,784✔
2159
  }
2160
  if (!colDataIsNull_s(pColIdCol, index)) {
441,003,568✔
2161
    GET_TYPED_DATA(pInfo->colId, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pColIdCol, index), 0);
135,761,192✔
2162
  }
2163
  if (!colDataIsNull_s(pVgIdCol, index)) {
441,003,568✔
2164
    GET_TYPED_DATA(pInfo->vgId, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pVgIdCol, index), 0);
220,501,784✔
2165
  }
2166

2167
_return:
×
2168
  return code;
220,501,784✔
2169
}
2170

2171
int32_t processOrgTbVg(SVtbScanDynCtrlInfo* pVtbScan, SExecTaskInfo* pTaskInfo, int32_t rversion) {
1,336,314✔
2172
  int32_t                    code = TSDB_CODE_SUCCESS;
1,336,314✔
2173
  int32_t                    line = 0;
1,336,314✔
2174

2175
  if (pTaskInfo->pStreamRuntimeInfo == NULL) {
1,336,314✔
2176
    return code;
1,216,226✔
2177
  }
2178

2179
  if (pVtbScan->existOrgTbVg == NULL) {
120,088✔
2180
    pVtbScan->existOrgTbVg = pVtbScan->curOrgTbVg;
×
2181
    pVtbScan->curOrgTbVg = NULL;
×
2182
  }
2183

2184
  if (pVtbScan->curOrgTbVg != NULL) {
120,088✔
2185
    // which means rversion has changed
2186
    void*   pCurIter = NULL;
9,992✔
2187
    SArray* tmpArray = NULL;
9,992✔
2188
    while ((pCurIter = taosHashIterate(pVtbScan->curOrgTbVg, pCurIter))) {
28,840✔
2189
      int32_t* vgId = (int32_t*)taosHashGetKey(pCurIter, NULL);
18,848✔
2190
      if (taosHashGet(pVtbScan->existOrgTbVg, vgId, sizeof(int32_t)) == NULL) {
18,848✔
2191
        if (tmpArray == NULL) {
2,376✔
2192
          tmpArray = taosArrayInit(1, sizeof(int32_t));
2,376✔
2193
          QUERY_CHECK_NULL(tmpArray, code, line, _return, terrno)
2,376✔
2194
        }
2195
        QUERY_CHECK_NULL(taosArrayPush(tmpArray, vgId), code, line, _return, terrno)
2,376✔
2196
      }
2197
    }
2198
    if (tmpArray == NULL) {
9,992✔
2199
      return TSDB_CODE_SUCCESS;
7,616✔
2200
    }
2201
    if (tmpArray != NULL && pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addVgIds == NULL) {
2,376✔
2202
      SArray* expiredInfo = atomic_load_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo);
2,376✔
2203
      if (expiredInfo && expiredInfo == atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo, expiredInfo, NULL)) {
2,376✔
2204
        for (int32_t i = 0; i < taosArrayGetSize(expiredInfo); i++) {
×
2205
          SStreamTaskAddr* vgInfo = (SStreamTaskAddr*)taosArrayGet(expiredInfo, i);
×
2206
          QUERY_CHECK_NULL(taosArrayPush(tmpArray, &vgInfo->nodeId), code, line, _return, terrno)
×
2207
        }
2208
        taosArrayDestroy(expiredInfo);
×
2209
      }
2210
      if (atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addVgIds, NULL, tmpArray)) {
2,376✔
2211
        taosArrayDestroy(tmpArray);
×
2212
      }
2213
    }
2214
    atomic_store_64(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.uid, (int64_t)(pVtbScan->isSuperTable ? pVtbScan->suid : pVtbScan->uid));
2,376✔
2215
    (void)atomic_val_compare_exchange_8(pTaskInfo->pStreamRuntimeInfo->vtableDeployGot, 0, 1);
2,376✔
2216
    taosHashClear(pVtbScan->curOrgTbVg);
2,376✔
2217
    pVtbScan->needRedeploy = true;
2,376✔
2218
    pVtbScan->rversion = rversion;
2,376✔
2219
    return TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY;
2,376✔
2220
  }
2221
  return code;
110,096✔
2222
_return:
×
2223
  qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
2224
  return code;
×
2225
}
2226

2227
int32_t getVgIdFromColref(SOperatorInfo* pOperator, const char* colRef, int32_t* vgId) {
25,824✔
2228
  int32_t                    code =TSDB_CODE_SUCCESS;
25,824✔
2229
  int32_t                    line = 0;
25,824✔
2230
  char*                      refDbName = NULL;
25,824✔
2231
  char*                      refTbName = NULL;
25,824✔
2232
  char*                      refColName = NULL;
25,824✔
2233
  SDBVgInfo*                 dbVgInfo = NULL;
25,824✔
2234
  SName                      name = {0};
25,824✔
2235
  char                       dbFname[TSDB_DB_FNAME_LEN] = {0};
25,824✔
2236
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
25,824✔
2237

2238
  code = extractColRefName(colRef, &refDbName, &refTbName, &refColName);
25,824✔
2239
  QUERY_CHECK_CODE(code, line, _return);
25,824✔
2240

2241
  toName(pInfo->vtbScan.acctId, refDbName, refTbName, &name);
25,824✔
2242

2243
  code = getDbVgInfo(pOperator, &name, &dbVgInfo);
25,824✔
2244
  QUERY_CHECK_CODE(code, line, _return);
25,824✔
2245

2246
  code = tNameGetFullDbName(&name, dbFname);
25,824✔
2247
  QUERY_CHECK_CODE(code, line, _return);
25,824✔
2248

2249
  code = getVgId(dbVgInfo, dbFname, vgId, name.tname);
25,824✔
2250
  QUERY_CHECK_CODE(code, line, _return);
25,824✔
2251

2252
_return:
25,824✔
2253
  if (code) {
25,824✔
2254
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
2255
  }
2256
  taosMemoryFree(refDbName);
25,824✔
2257
  taosMemoryFree(refTbName);
25,824✔
2258
  taosMemoryFree(refColName);
25,824✔
2259
  return code;
25,824✔
2260
}
2261

2262
static int32_t generateTagArrayByTagBlockAndSave(SHashObj* vtbUidTagListMap, tb_uid_t uid, SSDataBlock *pTagVal, int32_t rowIdx) {
2,817,000✔
2263
  int32_t code = TSDB_CODE_SUCCESS;
2,817,000✔
2264
  int32_t line = 0;
2,817,000✔
2265
  STagVal tagVal = {0};
2,817,000✔
2266
  // last col is uid
2267

2268
  SArray* pTagList = taosArrayInit(1, sizeof(STagVal));
2,817,000✔
2269
  QUERY_CHECK_NULL(pTagList, code, line, _return, terrno)
2,817,000✔
2270

2271
  for (int32_t k = 0; k < taosArrayGetSize(pTagVal->pDataBlock) - 1; k++) {
20,085,048✔
2272
    SColumnInfoData *pTagCol = taosArrayGet(pTagVal->pDataBlock, k);
17,268,048✔
2273
    QUERY_CHECK_NULL(pTagCol, code, line, _return, terrno)
17,268,048✔
2274
    tagVal.type = pTagCol->info.type;
17,268,048✔
2275
    tagVal.cid = pTagCol->info.colId;
17,268,048✔
2276
    if (!colDataIsNull_s(pTagCol, rowIdx)) {
34,536,096✔
2277
      char*   pData = colDataGetData(pTagCol, rowIdx);
17,268,048✔
2278
      if (IS_VAR_DATA_TYPE(pTagCol->info.type)) {
17,268,048✔
2279
        tagVal.nData = varDataLen(pData);
7,631,784✔
2280
        tagVal.pData = taosMemoryMalloc(tagVal.nData);
7,631,784✔
2281
        QUERY_CHECK_NULL(tagVal.pData, code, line, _return, terrno)
7,631,784✔
2282
        memcpy(tagVal.pData, varDataVal(pData), varDataLen(pData));
7,631,784✔
2283
        QUERY_CHECK_NULL(taosArrayPush(pTagList, &tagVal), code, line, _return, terrno)
15,263,568✔
2284
      } else {
2285
        memcpy(&tagVal.i64, pData, tDataTypes[pTagCol->info.type].bytes);
9,636,264✔
2286
        QUERY_CHECK_NULL(taosArrayPush(pTagList, &tagVal), code, line, _return, terrno)
19,272,528✔
2287
      }
2288
    } else {
2289
      tagVal.pData = NULL;
×
2290
      tagVal.nData = 0;
×
2291
      QUERY_CHECK_NULL(taosArrayPush(pTagList, &tagVal), code, line, _return, terrno)
×
2292
    }
2293
    tagVal = (STagVal){0};
17,268,048✔
2294
  }
2295
  code = taosHashPut(vtbUidTagListMap, &uid, sizeof(uid), &pTagList, POINTER_BYTES);
2,817,000✔
2296
  QUERY_CHECK_CODE(code, line, _return);
2,817,000✔
2297

2298
  return code;
2,817,000✔
2299
_return:
×
2300
  if (tagVal.pData) {
×
2301
    taosMemoryFreeClear(tagVal.pData);
×
2302
  }
2303
  if (pTagList) {
×
2304
    taosArrayDestroyEx(pTagList, destroyTagVal);
×
2305
  }
2306
  qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
2307
  return code;
×
2308
}
2309

2310
int32_t virtualTableScanProcessColRefInfo(SOperatorInfo* pOperator, SArray* pColRefInfo, tb_uid_t* uid, int32_t* vgId,
11,678,797✔
2311
                                          SHashObj** ppRefMap) {
2312
  int32_t                    code = TSDB_CODE_SUCCESS;
11,678,797✔
2313
  int32_t                    line = 0;
11,678,797✔
2314
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
11,678,797✔
2315
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
11,678,797✔
2316
  SDBVgInfo*                 dbVgInfo = NULL;
11,678,797✔
2317
  SHashObj*                  refMap = NULL;
11,678,797✔
2318

2319
  for (int32_t j = 0; j < taosArrayGetSize(pColRefInfo); j++) {
232,168,485✔
2320
    SColRefInfo *pKV = (SColRefInfo*)taosArrayGet(pColRefInfo, j);
220,489,688✔
2321
    *uid = pKV->uid;
220,489,688✔
2322
    *vgId = pKV->vgId;
220,489,688✔
2323
    if (pKV->colrefName != NULL && colNeedScan(pOperator, pKV->colId)) {
220,489,688✔
2324
      char*   refDbName = NULL;
46,291,373✔
2325
      char*   refTbName = NULL;
46,291,373✔
2326
      char*   refColName = NULL;
46,291,373✔
2327
      SName   name = {0};
46,291,373✔
2328
      char    dbFname[TSDB_DB_FNAME_LEN] = {0};
46,291,373✔
2329
      char    orgTbFName[TSDB_TABLE_FNAME_LEN] = {0};
46,291,373✔
2330

2331
      if (ppRefMap != NULL) {
46,291,373✔
2332
        // Track colref -> colId mapping for later slot grouping.
2333
        if (refMap == NULL) {
20,934,692✔
2334
          refMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
3,171,435✔
2335
          QUERY_CHECK_NULL(refMap, code, line, _return, terrno)
3,171,435✔
2336
        }
2337
        code = addRefColIdToRefMap(refMap, pKV->colrefName, pKV->colId);
20,934,692✔
2338
        QUERY_CHECK_CODE(code, line, _return);
20,934,692✔
2339
      }
2340

2341
      // Parse db/tb/col ref and resolve source table vgId.
2342
      code = extractColRefName(pKV->colrefName, &refDbName, &refTbName, &refColName);
46,291,373✔
2343
      QUERY_CHECK_CODE(code, line, _return);
46,291,373✔
2344

2345
      toName(pInfo->vtbScan.acctId, refDbName, refTbName, &name);
46,291,373✔
2346

2347
      code = getDbVgInfo(pOperator, &name, &dbVgInfo);
46,291,373✔
2348
      QUERY_CHECK_CODE(code, line, _return);
46,291,373✔
2349
      code = tNameGetFullDbName(&name, dbFname);
46,291,373✔
2350
      QUERY_CHECK_CODE(code, line, _return);
46,291,373✔
2351
      code = tNameGetFullTableName(&name, orgTbFName);
46,291,373✔
2352
      QUERY_CHECK_CODE(code, line, _return);
46,291,373✔
2353

2354
      void *pVal = taosHashGet(pVtbScan->otbNameToOtbInfoMap, orgTbFName, sizeof(orgTbFName));
46,291,373✔
2355
      if (!pVal) {
46,291,373✔
2356
        SOrgTbInfo orgTbInfo = {0};
15,778,689✔
2357
        code = getVgId(dbVgInfo, dbFname, &orgTbInfo.vgId, name.tname);
15,778,689✔
2358
        QUERY_CHECK_CODE(code, line, _return);
15,778,689✔
2359
        tstrncpy(orgTbInfo.tbName, orgTbFName, sizeof(orgTbInfo.tbName));
15,778,689✔
2360
        orgTbInfo.colMap = taosArrayInit(10, sizeof(SColIdNameKV));
15,778,689✔
2361
        QUERY_CHECK_NULL(orgTbInfo.colMap, code, line, _return, terrno)
15,778,689✔
2362
        SColIdNameKV colIdNameKV = {0};
15,778,689✔
2363
        colIdNameKV.colId = pKV->colId;
15,778,689✔
2364
        tstrncpy(colIdNameKV.colName, refColName, sizeof(colIdNameKV.colName));
15,778,689✔
2365
        QUERY_CHECK_NULL(taosArrayPush(orgTbInfo.colMap, &colIdNameKV), code, line, _return, terrno)
31,557,378✔
2366
        code = taosHashPut(pVtbScan->otbNameToOtbInfoMap, orgTbFName, sizeof(orgTbFName), &orgTbInfo, sizeof(orgTbInfo));
15,778,689✔
2367
        QUERY_CHECK_CODE(code, line, _return);
15,778,689✔
2368
      } else {
2369
        SOrgTbInfo *tbInfo = (SOrgTbInfo *)pVal;
30,512,684✔
2370
        SColIdNameKV colIdNameKV = {0};
30,512,684✔
2371
        colIdNameKV.colId = pKV->colId;
30,512,684✔
2372
        tstrncpy(colIdNameKV.colName, refColName, sizeof(colIdNameKV.colName));
30,512,684✔
2373
        QUERY_CHECK_NULL(taosArrayPush(tbInfo->colMap, &colIdNameKV), code, line, _return, terrno)
61,025,368✔
2374
      }
2375
      taosMemoryFree(refDbName);
46,291,373✔
2376
      taosMemoryFree(refTbName);
46,291,373✔
2377
      taosMemoryFree(refColName);
46,291,373✔
2378
    }
2379
  }
2380

2381
  if (ppRefMap != NULL) {
11,678,797✔
2382
    *ppRefMap = refMap;
4,649,025✔
2383
  }
2384

2385
  return code;
11,678,797✔
2386

2387
_return:
×
2388
  qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
2389
  if (refMap) {
×
2390
    taosHashCleanup(refMap);
×
2391
  }
2392
  return code;
×
2393
}
2394

2395
static int32_t getTagBlockAndProcess(SOperatorInfo* pOperator, bool hasPartition) {
315,540✔
2396
  int32_t                    code = TSDB_CODE_SUCCESS;
315,540✔
2397
  int32_t                    line = 0;
315,540✔
2398
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
315,540✔
2399
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
315,540✔
2400
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
315,540✔
2401
  SArray*                    pColRefArray = NULL;
315,540✔
2402
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[0];
315,540✔
2403
  SOperatorInfo*             pTagScanOp = pOperator->pDownstream[1];
315,540✔
2404

2405
  pVtbScan->vtbUidTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
315,540✔
2406
  QUERY_CHECK_NULL(pVtbScan->vtbUidTagListMap, code, line, _return, terrno)
315,540✔
2407
  taosHashSetFreeFp(pVtbScan->vtbUidTagListMap, destroyTagList);
315,540✔
2408
  if (hasPartition) {
315,540✔
2409
    pVtbScan->vtbUidToGroupIdMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
256,800✔
2410
    pVtbScan->vtbGroupIdTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
256,800✔
2411
    QUERY_CHECK_NULL(pVtbScan->vtbUidToGroupIdMap, code, line, _return, terrno)
256,800✔
2412
    QUERY_CHECK_NULL(pVtbScan->vtbGroupIdTagListMap, code, line, _return, terrno)
256,800✔
2413
    taosHashSetFreeFp(pVtbScan->vtbGroupIdTagListMap, destroyVtbUidTagListMap);
256,800✔
2414
  }
2415

2416
  while (true) {
1,187,144✔
2417
    SSDataBlock *pTagVal = NULL;
1,502,684✔
2418
    code = pTagScanOp->fpSet.getNextFn(pTagScanOp, &pTagVal);
1,502,684✔
2419
    QUERY_CHECK_CODE(code, line, _return);
1,502,684✔
2420
    if (pTagVal == NULL) {
1,502,684✔
2421
      break;
315,540✔
2422
    }
2423
    SHashObj *vtbUidTagListMap = NULL;
1,187,144✔
2424
    if (hasPartition) {
1,187,144✔
2425
      void* pIter = taosHashGet(pVtbScan->vtbGroupIdTagListMap, &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId));
1,069,664✔
2426
      if (pIter) {
1,069,664✔
2427
        vtbUidTagListMap = *(SHashObj**)pIter;
8,360✔
2428
      } else {
2429
        vtbUidTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
1,061,304✔
2430
        QUERY_CHECK_NULL(vtbUidTagListMap, code, line, _return, terrno)
1,061,304✔
2431
        taosHashSetFreeFp(vtbUidTagListMap, destroyTagList);
1,061,304✔
2432

2433
        code = taosHashPut(pVtbScan->vtbGroupIdTagListMap, &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId), &vtbUidTagListMap, POINTER_BYTES);
1,061,304✔
2434
        QUERY_CHECK_CODE(code, line, _return);
1,061,304✔
2435
      }
2436
    } else {
2437
      vtbUidTagListMap = pVtbScan->vtbUidTagListMap;
117,480✔
2438
    }
2439

2440
    SColumnInfoData *pUidCol = taosArrayGetLast(pTagVal->pDataBlock);
1,187,144✔
2441
    QUERY_CHECK_NULL(pUidCol, code, line, _return, terrno)
1,187,144✔
2442
    for (int32_t i = 0; i < pTagVal->info.rows; i++) {
4,004,144✔
2443
      tb_uid_t uid = 0;
2,817,000✔
2444
      if (!colDataIsNull_s(pUidCol, i)) {
5,634,000✔
2445
        GET_TYPED_DATA(uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, i), 0);
2,817,000✔
2446
        QUERY_CHECK_CODE(code, line, _return);
2,817,000✔
2447
      }
2448

2449
      code = generateTagArrayByTagBlockAndSave(vtbUidTagListMap, uid, pTagVal, i);
2,817,000✔
2450
      QUERY_CHECK_CODE(code, line, _return);
2,817,000✔
2451

2452
      if (hasPartition) {
2,817,000✔
2453
        code = taosHashPut(pVtbScan->vtbUidToGroupIdMap, &uid, sizeof(uid), &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId));
2,311,200✔
2454
        QUERY_CHECK_CODE(code, line, _return);
2,311,200✔
2455
      }
2456
    }
2457
  }
2458

2459
  return code;
315,540✔
2460

2461
_return:
×
2462
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
2463
  return code;
×
2464
}
2465

2466
static int32_t processChildTableListAndGenerateOrgTbInfoMap(SOperatorInfo* pOperator) {
315,540✔
2467
  int32_t                    code = TSDB_CODE_SUCCESS;
315,540✔
2468
  int32_t                    line = 0;
315,540✔
2469
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
315,540✔
2470
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
315,540✔
2471
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
315,540✔
2472
  SArray*                    pColRefArray = NULL;
315,540✔
2473
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[0];
315,540✔
2474
  SOperatorInfo*             pTagScanOp = pOperator->pDownstream[1];
315,540✔
2475

2476
  pVtbScan->vtbUidToVgIdMapMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
315,540✔
2477
  QUERY_CHECK_NULL(pVtbScan->vtbUidToVgIdMapMap, code, line, _return, terrno)
315,540✔
2478

2479
  for (int32_t i = 0; i < taosArrayGetSize(pVtbScan->childTableList); i++) {
2,597,500✔
2480
    SHashObj* otbVgIdToOtbInfoArrayMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
2,281,960✔
2481
    QUERY_CHECK_NULL(otbVgIdToOtbInfoArrayMap, code, line, _return, terrno)
2,281,960✔
2482

2483
    SArray* pColRefInfo = (SArray*)taosArrayGetP(pVtbScan->childTableList, i);
2,281,960✔
2484
    QUERY_CHECK_NULL(pColRefInfo, code, line, _return, terrno)
2,281,960✔
2485

2486
    tb_uid_t uid = 0;
2,281,960✔
2487
    int32_t  vgId = 0;
2,281,960✔
2488
    code = virtualTableScanProcessColRefInfo(pOperator, pColRefInfo, &uid, &vgId, NULL);
2,281,960✔
2489
    QUERY_CHECK_CODE(code, line, _return);
2,281,960✔
2490

2491
    size_t len = 0;
2,281,960✔
2492
    void*  pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, NULL);
2,281,960✔
2493
    while (pOrgTbInfo != NULL) {
5,437,752✔
2494
      char*       key = taosHashGetKey(pOrgTbInfo, &len);
3,155,792✔
2495
      SOrgTbInfo* orgTbInfo = (SOrgTbInfo*)pOrgTbInfo;
3,155,792✔
2496

2497
      void* pIter = taosHashGet(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId));
3,155,792✔
2498
      if (!pIter) {
3,155,792✔
2499
        SArray* pOrgTbInfoArray = taosArrayInit(1, sizeof(SOrgTbInfo));
2,626,916✔
2500
        QUERY_CHECK_NULL(pOrgTbInfoArray, code, line, _return, terrno)
2,626,916✔
2501
        QUERY_CHECK_NULL(taosArrayPush(pOrgTbInfoArray, orgTbInfo), code, line, _return, terrno)
5,253,832✔
2502
        code = taosHashPut(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId), &pOrgTbInfoArray, POINTER_BYTES);
2,626,916✔
2503
        QUERY_CHECK_CODE(code, line, _return);
2,626,916✔
2504
      } else {
2505
        SArray* pOrgTbInfoArray = *(SArray**)pIter;
528,876✔
2506
        QUERY_CHECK_NULL(pOrgTbInfoArray, code, line, _return, terrno)
528,876✔
2507
        QUERY_CHECK_NULL(taosArrayPush(pOrgTbInfoArray, orgTbInfo), code, line, _return, terrno)
528,876✔
2508
      }
2509

2510
      pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, pOrgTbInfo);
3,155,792✔
2511

2512
      code = taosHashRemove(pVtbScan->otbNameToOtbInfoMap, key, len);
3,155,792✔
2513
      QUERY_CHECK_CODE(code, line, _return);
3,155,792✔
2514
    }
2515

2516
    code = taosHashPut(pVtbScan->vtbUidToVgIdMapMap, &uid, sizeof(uid), &otbVgIdToOtbInfoArrayMap, POINTER_BYTES);
2,281,960✔
2517
    QUERY_CHECK_CODE(code, line, _return);
2,281,960✔
2518
  }
2519

2520
  return code;
315,540✔
2521
_return:
×
2522
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
2523
  return code;
×
2524
}
2525

2526
static int32_t buildOrgTbInfoSingle(SOperatorInfo* pOperator, bool hasPartition) {
315,540✔
2527
  int32_t                    code = TSDB_CODE_SUCCESS;
315,540✔
2528
  int32_t                    line = 0;
315,540✔
2529

2530
  code = processChildTableListAndGenerateOrgTbInfoMap(pOperator);
315,540✔
2531
  QUERY_CHECK_CODE(code, line, _return);
315,540✔
2532

2533
  // process tag
2534
  code = getTagBlockAndProcess(pOperator, hasPartition);
315,540✔
2535
  QUERY_CHECK_CODE(code, line, _return);
315,540✔
2536

2537
  return code;
315,540✔
2538
_return:
×
2539
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
2540
  return code;
×
2541
}
2542

2543
static int32_t buildOrgTbInfoBatch(SOperatorInfo* pOperator, bool hasPartition) {
664,303✔
2544
  int32_t                    code = TSDB_CODE_SUCCESS;
664,303✔
2545
  int32_t                    line = 0;
664,303✔
2546
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
664,303✔
2547
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
664,303✔
2548
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
664,303✔
2549
  SArray*                    pColRefArray = NULL;
664,303✔
2550
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[0];
664,303✔
2551
  SOperatorInfo*             pTagScanOp = pOperator->pDownstream[1];
664,303✔
2552

2553
  if (hasPartition) {
664,303✔
2554
    pVtbScan->vtbUidToGroupIdMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
460,732✔
2555
    pVtbScan->vtbGroupIdTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
460,732✔
2556
    pVtbScan->vtbGroupIdToVgIdMapMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
460,732✔
2557

2558
    QUERY_CHECK_NULL(pVtbScan->vtbGroupIdToVgIdMapMap, code, line, _return, terrno)
460,732✔
2559
    QUERY_CHECK_NULL(pVtbScan->vtbUidToGroupIdMap, code, line, _return, terrno)
460,732✔
2560
    QUERY_CHECK_NULL(pVtbScan->vtbGroupIdTagListMap, code, line, _return, terrno)
460,732✔
2561
    taosHashSetFreeFp(pVtbScan->vtbGroupIdToVgIdMapMap, destroyOtbVgIdToOtbInfoArrayMap);
460,732✔
2562
  } else {
2563
    pVtbScan->otbVgIdToOtbInfoArrayMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
203,571✔
2564
    QUERY_CHECK_NULL(pVtbScan->otbVgIdToOtbInfoArrayMap, code, line, _return, terrno)
203,571✔
2565
  }
2566

2567
  while (true && hasPartition) {
2,598,509✔
2568
    SSDataBlock* pTagVal = NULL;
2,394,938✔
2569
    code = pTagScanOp->fpSet.getNextFn(pTagScanOp, &pTagVal);
2,394,938✔
2570
    QUERY_CHECK_CODE(code, line, _return);
2,394,938✔
2571
    if (pTagVal == NULL) {
2,394,938✔
2572
      break;
460,732✔
2573
    }
2574

2575
    SColumnInfoData *pUidCol = taosArrayGetLast(pTagVal->pDataBlock);
1,934,206✔
2576
    QUERY_CHECK_NULL(pUidCol, code, line, _return, terrno)
1,934,206✔
2577
    for (int32_t i = 0; i < pTagVal->info.rows; i++) {
6,057,554✔
2578
      tb_uid_t uid = 0;
4,123,348✔
2579
      if (!colDataIsNull_s(pUidCol, i)) {
8,246,696✔
2580
        GET_TYPED_DATA(uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, i), 0);
4,123,348✔
2581
        QUERY_CHECK_CODE(code, line, _return);
4,123,348✔
2582
      }
2583
      code = taosHashPut(pVtbScan->vtbUidToGroupIdMap, &uid, sizeof(uid), &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId));
4,123,348✔
2584
      QUERY_CHECK_CODE(code, line, _return);
4,123,348✔
2585
    }
2586
    code = taosHashPut(pVtbScan->vtbGroupIdTagListMap, &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId), NULL, 0);
1,934,206✔
2587
    QUERY_CHECK_CODE(code, line, _return);
1,934,206✔
2588
  }
2589

2590
  for (int32_t i = 0; i < taosArrayGetSize(pVtbScan->childTableList); i++) {
5,412,115✔
2591
    SArray* pColRefInfo = (SArray*)taosArrayGetP(pVtbScan->childTableList, i);
4,747,812✔
2592
    QUERY_CHECK_NULL(pColRefInfo, code, line, _return, terrno)
4,747,812✔
2593
    tb_uid_t uid = 0;
4,747,812✔
2594
    int32_t  vgId = 0;
4,747,812✔
2595
    code = virtualTableScanProcessColRefInfo(pOperator, pColRefInfo, &uid, &vgId, NULL);
4,747,812✔
2596
    QUERY_CHECK_CODE(code, line, _return);
4,747,812✔
2597

2598
    SHashObj* otbVgIdToOtbInfoArrayMap = NULL;
4,747,812✔
2599
    if (hasPartition) {
4,747,812✔
2600
      uint64_t* groupId = (uint64_t *)taosHashGet(pVtbScan->vtbUidToGroupIdMap, &uid, sizeof(uid));
3,588,308✔
2601
      QUERY_CHECK_NULL(groupId, code, line, _return, terrno)
3,588,308✔
2602

2603
      void* pHashIter = taosHashGet(pVtbScan->vtbGroupIdToVgIdMapMap, groupId, sizeof(*groupId));
3,588,308✔
2604
      if (pHashIter) {
3,588,308✔
2605
        otbVgIdToOtbInfoArrayMap = *(SHashObj**)pHashIter;
1,854,742✔
2606
      } else {
2607
        otbVgIdToOtbInfoArrayMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
1,733,566✔
2608
        QUERY_CHECK_NULL(otbVgIdToOtbInfoArrayMap, code, line, _return, terrno)
1,733,566✔
2609
        code = taosHashPut(pVtbScan->vtbGroupIdToVgIdMapMap, groupId, sizeof(*groupId), &otbVgIdToOtbInfoArrayMap, POINTER_BYTES);
1,733,566✔
2610
        QUERY_CHECK_CODE(code, line, _return);
1,733,566✔
2611
      }
2612
    } else {
2613
      otbVgIdToOtbInfoArrayMap = pVtbScan->otbVgIdToOtbInfoArrayMap;
1,159,504✔
2614
    }
2615

2616
    size_t len = 0;
4,747,812✔
2617
    void*  pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, NULL);
4,747,812✔
2618
    while (pOrgTbInfo != NULL) {
10,500,437✔
2619
      char*       key = taosHashGetKey(pOrgTbInfo, &len);
5,752,625✔
2620
      SOrgTbInfo* orgTbInfo = (SOrgTbInfo*)pOrgTbInfo;
5,752,625✔
2621
      void* pIter = taosHashGet(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId));
5,752,625✔
2622
      if (!pIter) {
5,752,625✔
2623
        SArray* pOrgTbInfoArray = taosArrayInit(1, sizeof(SOrgTbInfo));
2,806,032✔
2624
        QUERY_CHECK_NULL(pOrgTbInfoArray, code, line, _return, terrno)
2,806,032✔
2625
        QUERY_CHECK_NULL(taosArrayPush(pOrgTbInfoArray, orgTbInfo), code, line, _return, terrno)
5,612,064✔
2626
        code = taosHashPut(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId), &pOrgTbInfoArray, POINTER_BYTES);
2,806,032✔
2627
        QUERY_CHECK_CODE(code, line, _return);
2,806,032✔
2628
      } else {
2629
        SArray* pOrgTbInfoArray = *(SArray**)pIter;
2,946,593✔
2630
        QUERY_CHECK_NULL(pOrgTbInfoArray, code, line, _return, terrno)
2,946,593✔
2631
        QUERY_CHECK_NULL(taosArrayPush(pOrgTbInfoArray, orgTbInfo), code, line, _return, terrno)
2,946,593✔
2632
      }
2633

2634
      pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, pOrgTbInfo);
5,752,625✔
2635

2636
      code = taosHashRemove(pVtbScan->otbNameToOtbInfoMap, key, len);
5,752,625✔
2637
      QUERY_CHECK_CODE(code, line, _return);
5,752,625✔
2638
    }
2639
  }
2640
  return code;
664,303✔
2641
_return:
×
2642
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
2643
  return code;
×
2644
}
2645

2646
int32_t buildVirtualSuperTableScanChildTableMap(SOperatorInfo* pOperator) {
2,201,741✔
2647
  int32_t                    code = TSDB_CODE_SUCCESS;
2,201,741✔
2648
  int32_t                    line = 0;
2,201,741✔
2649
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
2,201,741✔
2650
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
2,201,741✔
2651
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
2,201,741✔
2652
  SArray*                    pColRefArray = NULL;
2,201,741✔
2653
  SOperatorInfo*             pSystableScanOp = NULL;
2,201,741✔
2654
  
2655
  pVtbScan->childTableMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
2,201,741✔
2656
  QUERY_CHECK_NULL(pVtbScan->childTableMap, code, line, _return, terrno)
2,201,741✔
2657

2658
  if (pInfo->qType == DYN_QTYPE_VTB_AGG) {
2,201,741✔
2659
    pVtbScan->otbNameToOtbInfoMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
884,641✔
2660
    QUERY_CHECK_NULL(pVtbScan->otbNameToOtbInfoMap, code, line, _return, terrno)
884,641✔
2661
    pSystableScanOp = pOperator->pDownstream[0];
884,641✔
2662
  } else if (pInfo->qType == DYN_QTYPE_VTB_WINDOW) {
1,317,100✔
2663
    pVtbScan->otbNameToOtbInfoMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
95,202✔
2664
    QUERY_CHECK_NULL(pVtbScan->otbNameToOtbInfoMap, code, line, _return, terrno)
95,202✔
2665
    pSystableScanOp = pOperator->pDownstream[1];
95,202✔
2666
  } else {
2667
    pSystableScanOp = pOperator->pDownstream[1];
1,221,898✔
2668
  }
2669

2670
  while (true) {
4,405,212✔
2671
    SSDataBlock *pChildInfo = NULL;
6,606,953✔
2672
    code = pSystableScanOp->fpSet.getNextFn(pSystableScanOp, &pChildInfo);
6,606,953✔
2673
    QUERY_CHECK_CODE(code, line, _return);
6,606,953✔
2674
    if (pChildInfo == NULL) {
6,606,953✔
2675
      break;
2,201,741✔
2676
    }
2677
    SColumnInfoData *pTableNameCol = taosArrayGet(pChildInfo->pDataBlock, 0);
4,405,212✔
2678
    SColumnInfoData *pStbNameCol = taosArrayGet(pChildInfo->pDataBlock, 1);
4,405,212✔
2679
    SColumnInfoData *pDbNameCol = taosArrayGet(pChildInfo->pDataBlock, 2);
4,405,212✔
2680

2681
    QUERY_CHECK_NULL(pTableNameCol, code, line, _return, terrno)
4,405,212✔
2682
    QUERY_CHECK_NULL(pStbNameCol, code, line, _return, terrno)
4,405,212✔
2683
    QUERY_CHECK_NULL(pDbNameCol, code, line, _return, terrno)
4,405,212✔
2684

2685
    for (int32_t i = 0; i < pChildInfo->info.rows; i++) {
329,549,620✔
2686
      if (!colDataIsNull_s(pStbNameCol, i)) {
650,288,816✔
2687
        char* stbrawname = colDataGetData(pStbNameCol, i);
325,144,408✔
2688
        char* dbrawname = colDataGetData(pDbNameCol, i);
325,144,408✔
2689
        char *ctbName = colDataGetData(pTableNameCol, i);
325,144,408✔
2690

2691
        if (tableInfoNeedCollect(dbrawname, stbrawname, pInfo->vtbScan.dbName, pInfo->vtbScan.tbName)) {
325,144,408✔
2692
          SColRefInfo info = {0};
219,933,648✔
2693
          code = getColRefInfo(&info, pChildInfo->pDataBlock, i);
219,933,648✔
2694
          QUERY_CHECK_CODE(code, line, _return);
219,933,648✔
2695

2696
          if (pInfo->qType == DYN_QTYPE_VTB_SCAN) {
219,933,648✔
2697
            if (pInfo->vtbScan.dynTbUid != 0 && info.uid != pInfo->vtbScan.dynTbUid) {
107,457,296✔
2698
              qTrace("dynQueryCtrl tb uid filter, info uid:%" PRIu64 ", dyn tb uid:%" PRIu64, info.uid,
×
2699
                     pInfo->vtbScan.dynTbUid);
2700
              destroyColRefInfo(&info);
×
2701
              continue;
×
2702
            }
2703

2704
            if (pTaskInfo->pStreamRuntimeInfo) {
107,457,296✔
2705
              if (pVtbScan->curOrgTbVg == NULL) {
35,008✔
2706
                pVtbScan->curOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
1,324✔
2707
                QUERY_CHECK_NULL(pVtbScan->curOrgTbVg, code, line, _return, terrno)
1,324✔
2708
              }
2709

2710
              if (info.colrefName) {
35,008✔
2711
                int32_t vgId;
18,912✔
2712
                code = getVgIdFromColref(pOperator, info.colrefName, &vgId);
18,912✔
2713
                QUERY_CHECK_CODE(code, line, _return);
18,912✔
2714
                code = taosHashPut(pVtbScan->curOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
18,912✔
2715
                QUERY_CHECK_CODE(code, line, _return);
18,912✔
2716
              }
2717
            }
2718
          }
2719

2720
          if (taosHashGet(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName)) == NULL) {
219,933,648✔
2721
            pColRefArray = taosArrayInit(1, sizeof(SColRefInfo));
11,567,405✔
2722
            QUERY_CHECK_NULL(pColRefArray, code, line, _return, terrno)
11,567,405✔
2723
            QUERY_CHECK_NULL(taosArrayPush(pColRefArray, &info), code, line, _return, terrno)
23,134,810✔
2724
            int32_t tableIdx = (int32_t)taosArrayGetSize(pVtbScan->childTableList);
11,567,405✔
2725
            QUERY_CHECK_NULL(taosArrayPush(pVtbScan->childTableList, &pColRefArray), code, line, _return, terrno)
23,134,810✔
2726
            code = taosHashPut(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName), &tableIdx, sizeof(tableIdx));
11,567,405✔
2727
            QUERY_CHECK_CODE(code, line, _return);
11,567,405✔
2728
          } else {
2729
            int32_t *tableIdx = (int32_t*)taosHashGet(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName));
208,366,243✔
2730
            QUERY_CHECK_NULL(tableIdx, code, line, _return, terrno)
208,366,243✔
2731
            pColRefArray = (SArray *)taosArrayGetP(pVtbScan->childTableList, *tableIdx);
208,366,243✔
2732
            QUERY_CHECK_NULL(pColRefArray, code, line, _return, terrno)
208,366,243✔
2733
            QUERY_CHECK_NULL(taosArrayPush(pColRefArray, &info), code, line, _return, terrno)
416,732,486✔
2734
          }
2735
        }
2736
      }
2737
    }
2738
  }
2739

2740
  switch (pInfo->qType) {
2,201,741✔
2741
    case DYN_QTYPE_VTB_WINDOW: {
95,202✔
2742
      code = buildOrgTbInfoBatch(pOperator, false);
95,202✔
2743
      break;
95,202✔
2744
    }
2745
    case DYN_QTYPE_VTB_AGG: {
884,641✔
2746
      if (pVtbScan->batchProcessChild) {
884,641✔
2747
        code = buildOrgTbInfoBatch(pOperator, pVtbScan->hasPartition);
569,101✔
2748
      } else {
2749
        code = buildOrgTbInfoSingle(pOperator, pVtbScan->hasPartition);
315,540✔
2750
      }
2751
      break;
884,641✔
2752
    }
2753
    case DYN_QTYPE_VTB_SCAN: {
1,221,898✔
2754
      code = processOrgTbVg(pVtbScan, pTaskInfo, 1);
1,221,898✔
2755
      break;
1,221,898✔
2756
    }
2757
    default: {
×
2758
      code = TSDB_CODE_PLAN_INVALID_DYN_CTRL_TYPE;
×
2759
      break;
×
2760
    }
2761
  }
2762

2763
  QUERY_CHECK_CODE(code, line, _return);
2,201,741✔
2764

2765
_return:
2,201,741✔
2766
  if (code) {
2,201,741✔
2767
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
1,080✔
2768
  }
2769
  return code;
2,201,741✔
2770
}
2771

2772
int32_t buildVirtualNormalChildTableScanChildTableMap(SOperatorInfo* pOperator) {
114,416✔
2773
  int32_t                    code = TSDB_CODE_SUCCESS;
114,416✔
2774
  int32_t                    line = 0;
114,416✔
2775
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
114,416✔
2776
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
114,416✔
2777
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
114,416✔
2778
  SArray*                    pColRefInfo = pInfo->vtbScan.colRefInfo;
114,416✔
2779
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[1];
114,416✔
2780
  int32_t                    rversion = 0;
114,416✔
2781

2782
  pInfo->vtbScan.colRefInfo = taosArrayInit(1, sizeof(SColRefInfo));
114,416✔
2783
  QUERY_CHECK_NULL(pInfo->vtbScan.colRefInfo, code, line, _return, terrno)
114,416✔
2784

2785
  while (true) {
225,136✔
2786
    SSDataBlock *pTableInfo = NULL;
339,552✔
2787
    code = pSystableScanOp->fpSet.getNextFn(pSystableScanOp, &pTableInfo);
339,552✔
2788
    if (pTableInfo == NULL) {
339,552✔
2789
      break;
114,416✔
2790
    }
2791

2792
    SColumnInfoData *pTableNameCol = taosArrayGet(pTableInfo->pDataBlock, 0);
225,136✔
2793
    SColumnInfoData *pDbNameCol = taosArrayGet(pTableInfo->pDataBlock, 2);
225,136✔
2794
    SColumnInfoData *pRefVerCol = taosArrayGet(pTableInfo->pDataBlock, 8);
225,136✔
2795

2796
    QUERY_CHECK_NULL(pTableNameCol, code, line, _return, terrno)
225,136✔
2797
    QUERY_CHECK_NULL(pDbNameCol, code, line, _return, terrno)
225,136✔
2798
    QUERY_CHECK_NULL(pRefVerCol, code, line, _return, terrno)
225,136✔
2799

2800
    for (int32_t i = 0; i < pTableInfo->info.rows; i++) {
8,363,164✔
2801
      if (!colDataIsNull_s(pRefVerCol, i)) {
16,276,056✔
2802
        GET_TYPED_DATA(rversion, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pRefVerCol, i), 0);
8,138,028✔
2803
      }
2804

2805
      if (!colDataIsNull_s(pTableNameCol, i)) {
16,276,056✔
2806
        char* tbrawname = colDataGetData(pTableNameCol, i);
8,138,028✔
2807
        char* dbrawname = colDataGetData(pDbNameCol, i);
8,138,028✔
2808
        QUERY_CHECK_NULL(tbrawname, code, line, _return, terrno)
8,138,028✔
2809
        QUERY_CHECK_NULL(dbrawname, code, line, _return, terrno)
8,138,028✔
2810

2811
        if (tableInfoNeedCollect(dbrawname, tbrawname, pInfo->vtbScan.dbName, pInfo->vtbScan.tbName)) {
8,138,028✔
2812
          SColRefInfo info = {0};
568,136✔
2813
          code = getColRefInfo(&info, pTableInfo->pDataBlock, i);
568,136✔
2814
          QUERY_CHECK_CODE(code, line, _return);
568,136✔
2815

2816
          if ((rversion != pVtbScan->rversion || pVtbScan->existOrgTbVg == NULL) && info.colrefName) {
568,136✔
2817
            if (pVtbScan->curOrgTbVg == NULL) {
6,912✔
2818
              pVtbScan->curOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
432✔
2819
              QUERY_CHECK_NULL(pVtbScan->curOrgTbVg, code, line, _return, terrno)
432✔
2820
            }
2821
            int32_t vgId;
6,912✔
2822
            code = getVgIdFromColref(pOperator, info.colrefName, &vgId);
6,912✔
2823
            QUERY_CHECK_CODE(code, line, _return);
6,912✔
2824
            code = taosHashPut(pVtbScan->curOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
6,912✔
2825
            QUERY_CHECK_CODE(code, line, _return);
6,912✔
2826
          }
2827

2828
          QUERY_CHECK_NULL(taosArrayPush(pInfo->vtbScan.colRefInfo, &info), code, line, _return, terrno)
1,136,272✔
2829
        }
2830
      }
2831
    }
2832
  }
2833
  code = processOrgTbVg(pVtbScan, pTaskInfo, rversion);
114,416✔
2834
  QUERY_CHECK_CODE(code, line, _return);
114,416✔
2835

2836
_return:
113,120✔
2837
  if (code) {
114,416✔
2838
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
1,296✔
2839
  }
2840
  return code;
114,416✔
2841
}
2842

2843
int32_t virtualTableScanCheckNeedRedeploy(SOperatorInfo* pOperator) {
2,019,600✔
2844
  int32_t                    code = TSDB_CODE_SUCCESS;
2,019,600✔
2845
  int32_t                    line = 0;
2,019,600✔
2846
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
2,019,600✔
2847
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
2,019,600✔
2848
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
2,019,600✔
2849

2850
  SArray *tmpArray = NULL;
2,019,600✔
2851
  tmpArray = atomic_load_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo);
2,019,600✔
2852
  if (tmpArray && tmpArray == atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo, tmpArray, NULL)) {
2,019,600✔
2853
    for (int32_t i = 0; i < taosArrayGetSize(tmpArray); i++) {
4,752✔
2854
      SStreamTaskAddr* pTaskAddr = (SStreamTaskAddr*)taosArrayGet(tmpArray, i);
2,376✔
2855
      code = taosHashPut(pVtbScan->existOrgTbVg, &pTaskAddr->nodeId, sizeof(pTaskAddr->nodeId), NULL, 0);
2,376✔
2856
      QUERY_CHECK_CODE(code, line, _return);
2,376✔
2857
      if (pVtbScan->newAddedVgInfo == NULL) {
2,376✔
2858
        pVtbScan->newAddedVgInfo = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
864✔
2859
        QUERY_CHECK_NULL(pVtbScan->newAddedVgInfo, code, line, _return, terrno)
864✔
2860
      }
2861
      code = taosHashPut(pVtbScan->newAddedVgInfo, &pTaskAddr->nodeId, sizeof(pTaskAddr->nodeId), pTaskAddr, sizeof(SStreamTaskAddr));
2,376✔
2862
      QUERY_CHECK_CODE(code, line, _return);
2,376✔
2863
    }
2864
    pVtbScan->needRedeploy = false;
2,376✔
2865
  } else {
2866
    code = TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY;
2,017,224✔
2867
    QUERY_CHECK_CODE(code, line, _return);
2,017,224✔
2868
  }
2869

2870
_return:
×
2871
  taosArrayClear(tmpArray);
2,019,600✔
2872
  taosArrayDestroy(tmpArray);
2,019,600✔
2873
  if (code) {
2,019,600✔
2874
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
2,017,224✔
2875
  }
2876
  return code;
2,019,600✔
2877
}
2878

2879
int32_t virtualTableScanBuildDownStreamOpParam(SOperatorInfo* pOperator, tb_uid_t uid, int32_t vgId) {
4,649,025✔
2880
  int32_t                    code = TSDB_CODE_SUCCESS;
4,649,025✔
2881
  int32_t                    line = 0;
4,649,025✔
2882
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
4,649,025✔
2883
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
4,649,025✔
2884
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
4,649,025✔
2885

2886
  pVtbScan->vtbScanParam = NULL;
4,649,025✔
2887
  code = buildVtbScanOperatorParam(pInfo, &pVtbScan->vtbScanParam, uid);
4,649,025✔
2888
  QUERY_CHECK_CODE(code, line, _return);
4,649,025✔
2889

2890
  void* pIter = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, NULL);
4,649,025✔
2891
  while (pIter != NULL) {
11,519,297✔
2892
    SOrgTbInfo*      pMap = (SOrgTbInfo*)pIter;
6,870,272✔
2893
    SOperatorParam*  pExchangeParam = NULL;
6,870,272✔
2894
    SStreamTaskAddr* addr = taosHashGet(pVtbScan->newAddedVgInfo, &pMap->vgId, sizeof(pMap->vgId));
6,870,272✔
2895
    if (addr != NULL) {
6,870,272✔
2896
      SDownstreamSourceNode newSource = {0};
2,376✔
2897
      newSource.type = QUERY_NODE_DOWNSTREAM_SOURCE;
2,376✔
2898
      newSource.clientId = pTaskInfo->id.taskId;// current task's taskid
2,376✔
2899
      newSource.taskId = addr->taskId;
2,376✔
2900
      newSource.fetchMsgType = TDMT_STREAM_FETCH;
2,376✔
2901
      newSource.localExec = false;
2,376✔
2902
      newSource.addr.nodeId = addr->nodeId;
2,376✔
2903
      memcpy(&newSource.addr.epSet, &addr->epset, sizeof(SEpSet));
2,376✔
2904

2905
      code = buildExchangeOperatorParamForVScan(&pExchangeParam, 0, pMap, &newSource);
2,376✔
2906
      QUERY_CHECK_CODE(code, line, _return);
2,376✔
2907
      code = taosHashRemove(pVtbScan->newAddedVgInfo, &pMap->vgId, sizeof(pMap->vgId));
2,376✔
2908
      QUERY_CHECK_CODE(code, line, _return);
2,376✔
2909
    } else {
2910
      code = buildExchangeOperatorParamForVScan(&pExchangeParam, 0, pMap, NULL);
6,867,896✔
2911
      QUERY_CHECK_CODE(code, line, _return);
6,867,896✔
2912
    }
2913
    QUERY_CHECK_NULL(taosArrayPush(((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pOpParamArray, &pExchangeParam), code, line, _return, terrno)
13,740,544✔
2914
    pIter = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, pIter);
6,870,272✔
2915
  }
2916

2917
  SOperatorParam*  pExchangeParam = NULL;
4,649,025✔
2918
  code = buildExchangeOperatorParamForVTagScan(&pExchangeParam, 0, vgId, uid);
4,649,025✔
2919
  QUERY_CHECK_CODE(code, line, _return);
4,649,025✔
2920
  ((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pTagScanOp = pExchangeParam;
4,649,025✔
2921

2922
_return:
4,649,025✔
2923
  if (code) {
4,649,025✔
2924
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
2925
  }
2926
  return code;
4,649,025✔
2927
}
2928

2929
int32_t virtualTableScanGetNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
12,449,533✔
2930
  int32_t                    code = TSDB_CODE_SUCCESS;
12,449,533✔
2931
  int32_t                    line = 0;
12,449,533✔
2932
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
12,449,533✔
2933
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
12,449,533✔
2934
  SOperatorInfo*             pVtbScanOp = pOperator->pDownstream[0];
12,449,533✔
2935

2936
  pVtbScan->otbNameToOtbInfoMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
12,449,533✔
2937
  QUERY_CHECK_NULL(pVtbScan->otbNameToOtbInfoMap, code, line, _return, terrno)
12,449,533✔
2938
  taosHashSetFreeFp(pVtbScan->otbNameToOtbInfoMap, destroySOrgTbInfo);
12,449,533✔
2939

2940
  while (true) {
2941
    if (pVtbScan->curTableIdx == pVtbScan->lastTableIdx) {
15,805,516✔
2942
      code = pVtbScanOp->fpSet.getNextFn(pVtbScanOp, pRes);
11,156,491✔
2943
      QUERY_CHECK_CODE(code, line, _return);
11,156,491✔
2944
    } else {
2945
      taosHashClear(pVtbScan->otbNameToOtbInfoMap);
4,649,025✔
2946
      SArray* pColRefInfo = NULL;
4,649,025✔
2947
      if (pVtbScan->isSuperTable) {
4,649,025✔
2948
        pColRefInfo = (SArray*)taosArrayGetP(pVtbScan->childTableList, pVtbScan->curTableIdx);
4,535,905✔
2949
      } else {
2950
        pColRefInfo = pInfo->vtbScan.colRefInfo;
113,120✔
2951
      }
2952
      QUERY_CHECK_NULL(pColRefInfo, code, line, _return, terrno)
4,649,025✔
2953

2954
      tb_uid_t  uid = 0;
4,649,025✔
2955
      int32_t   vgId = 0;
4,649,025✔
2956
      SHashObj* refMap = NULL;
4,649,025✔
2957
      code = virtualTableScanProcessColRefInfo(pOperator, pColRefInfo, &uid, &vgId, &refMap);
4,649,025✔
2958
      QUERY_CHECK_CODE(code, line, _return);
4,649,025✔
2959

2960
      qDebug("virtual table scan process subtable idx:%d uid:%" PRIu64 " vgId:%d", pVtbScan->curTableIdx, uid, vgId);
4,649,025✔
2961

2962
      code = buildRefSlotGroupsFromRefMap(refMap, pVtbScan->readColList, &pVtbScan->refColGroups);
4,649,025✔
2963
      QUERY_CHECK_CODE(code, line, _return);
4,649,025✔
2964

2965
      code = virtualTableScanBuildDownStreamOpParam(pOperator, uid, vgId);
4,649,025✔
2966
      QUERY_CHECK_CODE(code, line, _return);
4,649,025✔
2967

2968
      // reset downstream operator's status
2969
      pVtbScanOp->status = OP_NOT_OPENED;
4,649,025✔
2970
      code = pVtbScanOp->fpSet.getNextExtFn(pVtbScanOp, pVtbScan->vtbScanParam, pRes);
4,649,025✔
2971
      QUERY_CHECK_CODE(code, line, _return);
4,648,387✔
2972
    }
2973

2974
    if (*pRes) {
15,804,878✔
2975
      // has result, still read data from this table.
2976
      pVtbScan->lastTableIdx = pVtbScan->curTableIdx;
11,161,063✔
2977
      break;
11,161,063✔
2978
    } else {
2979
      // no result, read next table.
2980
      pVtbScan->curTableIdx++;
4,643,815✔
2981
      if (pVtbScan->isSuperTable) {
4,643,815✔
2982
        if (pVtbScan->curTableIdx >= taosArrayGetSize(pVtbScan->childTableList)) {
4,530,695✔
2983
          setOperatorCompleted(pOperator);
1,174,712✔
2984
          break;
1,174,712✔
2985
        }
2986
      } else {
2987
        setOperatorCompleted(pOperator);
113,120✔
2988
        break;
113,120✔
2989
      }
2990
    }
2991
  }
2992

2993
_return:
12,448,895✔
2994
  taosHashCleanup(pVtbScan->otbNameToOtbInfoMap);
12,448,895✔
2995
  pVtbScan->otbNameToOtbInfoMap = NULL;
12,448,895✔
2996
  if (code) {
12,448,895✔
2997
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
2998
  }
2999
  return code;
12,448,895✔
3000
}
3001

3002
int32_t vtbScanOpen(SOperatorInfo* pOperator) {
12,492,805✔
3003
  int32_t                    code = TSDB_CODE_SUCCESS;
12,492,805✔
3004
  int32_t                    line = 0;
12,492,805✔
3005
  int64_t                    st = 0;
12,492,805✔
3006
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
12,492,805✔
3007
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
12,492,805✔
3008

3009
  if (OPTR_IS_OPENED(pOperator)) {
12,492,805✔
3010
    return code;
11,156,491✔
3011
  }
3012

3013
  if (pOperator->cost.openCost == 0) {
1,336,314✔
3014
    st = taosGetTimestampUs();
1,251,362✔
3015
  }
3016

3017
  if (pVtbScan->isSuperTable) {
1,336,314✔
3018
    code = buildVirtualSuperTableScanChildTableMap(pOperator);
1,221,898✔
3019
    QUERY_CHECK_CODE(code, line, _return);
1,221,898✔
3020
  } else {
3021
    code = buildVirtualNormalChildTableScanChildTableMap(pOperator);
114,416✔
3022
    QUERY_CHECK_CODE(code, line, _return);
114,416✔
3023
  }
3024

3025
  OPTR_SET_OPENED(pOperator);
1,333,938✔
3026

3027
_return:
1,336,314✔
3028
  if (pOperator->cost.openCost == 0) {
1,336,314✔
3029
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
1,251,362✔
3030
  }
3031
  if (code) {
1,336,314✔
3032
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
2,376✔
3033
    pOperator->pTaskInfo->code = code;
2,376✔
3034
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
2,376✔
3035
  }
3036
  return code;
1,333,938✔
3037
}
3038

3039
int32_t vtbScanNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
14,510,029✔
3040
  int32_t                    code = TSDB_CODE_SUCCESS;
14,510,029✔
3041
  int32_t                    line = 0;
14,510,029✔
3042
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
14,510,029✔
3043
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
14,510,029✔
3044

3045
  QRY_PARAM_CHECK(pRes);
14,510,029✔
3046
  if (pOperator->status == OP_EXEC_DONE && !pOperator->pOperatorGetParam) {
14,510,029✔
3047
    return code;
×
3048
  }
3049
  if (pOperator->pOperatorGetParam) {
14,510,029✔
3050
    if (pOperator->status == OP_EXEC_DONE) {
×
3051
      pOperator->status = OP_OPENED;
×
3052
    }
3053
    pVtbScan->curTableIdx = 0;
×
3054
    pVtbScan->lastTableIdx = -1;
×
3055
    pVtbScan->window = ((SDynQueryCtrlOperatorParam *)(pOperator->pOperatorGetParam)->value)->window;
×
3056
    pOperator->pOperatorGetParam = NULL;
×
3057
  } else {
3058
    pVtbScan->window.skey = INT64_MAX;
14,510,029✔
3059
    pVtbScan->window.ekey = INT64_MIN;
14,510,029✔
3060
  }
3061

3062
  if (pVtbScan->needRedeploy) {
14,510,029✔
3063
    code = virtualTableScanCheckNeedRedeploy(pOperator);
2,019,600✔
3064
    QUERY_CHECK_CODE(code, line, _return);
2,019,600✔
3065
  }
3066

3067
  code = pOperator->fpSet._openFn(pOperator);
12,492,805✔
3068
  QUERY_CHECK_CODE(code, line, _return);
12,490,429✔
3069

3070
  if (pVtbScan->isSuperTable && taosArrayGetSize(pVtbScan->childTableList) == 0) {
12,490,429✔
3071
    setOperatorCompleted(pOperator);
40,896✔
3072
    return code;
40,896✔
3073
  }
3074

3075
  code = virtualTableScanGetNext(pOperator, pRes);
12,449,533✔
3076
  QUERY_CHECK_CODE(code, line, _return);
12,448,895✔
3077

3078
  return code;
12,448,895✔
3079

3080
_return:
2,017,224✔
3081
  if (code) {
2,017,224✔
3082
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
2,017,224✔
3083
    pOperator->pTaskInfo->code = code;
2,017,224✔
3084
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
2,017,224✔
3085
  }
3086
  return code;
×
3087
}
3088

3089
int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) {
1,150,806✔
3090
  if (batchFetch) {
1,150,806✔
3091
    pPrev->leftHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
1,149,684✔
3092
    if (NULL == pPrev->leftHash) {
1,149,684✔
3093
      return terrno;
×
3094
    }
3095
    pPrev->rightHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
1,149,684✔
3096
    if (NULL == pPrev->rightHash) {
1,149,684✔
3097
      return terrno;
×
3098
    }
3099
  } else {
3100
    pPrev->leftCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
1,122✔
3101
    if (NULL == pPrev->leftCache) {
1,122✔
3102
      return terrno;
×
3103
    }
3104
    pPrev->rightCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
1,122✔
3105
    if (NULL == pPrev->rightCache) {
1,122✔
3106
      return terrno;
×
3107
    }
3108
    pPrev->onceTable = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
1,122✔
3109
    if (NULL == pPrev->onceTable) {
1,122✔
3110
      return terrno;
×
3111
    }
3112
  }
3113

3114
  return TSDB_CODE_SUCCESS;
1,150,806✔
3115
}
3116

3117
static void updateDynTbUidIfNeeded(SVtbScanDynCtrlInfo* pVtbScan, SStreamRuntimeInfo* pStreamRuntimeInfo) {
×
3118
  if (pStreamRuntimeInfo == NULL) {
×
3119
    return;
×
3120
  }
3121

3122
  SArray* vals = pStreamRuntimeInfo->funcInfo.pStreamPartColVals;
×
3123
  for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
×
3124
    SStreamGroupValue* pValue = taosArrayGet(vals, i);
×
3125
    if (pValue != NULL && pValue->isTbname && pValue->uid != pVtbScan->dynTbUid) {
×
3126
      qTrace("dynQueryCtrl dyn tb uid:%" PRIu64 " reset to:%" PRIu64, pVtbScan->dynTbUid, pValue->uid);
×
3127

3128
      pVtbScan->dynTbUid = pValue->uid;
×
3129
      break;
×
3130
    }
3131
  }
3132
}
3133

3134
static int32_t initVtbScanInfo(SDynQueryCtrlOperatorInfo* pInfo, SMsgCb* pMsgCb,
2,778,186✔
3135
                               SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) {
3136
  int32_t      code = TSDB_CODE_SUCCESS;
2,778,186✔
3137
  int32_t      line = 0;
2,778,186✔
3138

3139
  code = tsem_init(&pInfo->vtbScan.ready, 0, 0);
2,778,186✔
3140
  QUERY_CHECK_CODE(code, line, _return);
2,778,186✔
3141

3142
  pInfo->vtbScan.genNewParam = true;
2,778,186✔
3143
  pInfo->vtbScan.batchProcessChild = pPhyciNode->vtbScan.batchProcessChild;
2,778,186✔
3144
  pInfo->vtbScan.hasPartition = pPhyciNode->vtbScan.hasPartition;
2,778,186✔
3145
  pInfo->vtbScan.scanAllCols = pPhyciNode->vtbScan.scanAllCols;
2,778,186✔
3146
  pInfo->vtbScan.isSuperTable = pPhyciNode->vtbScan.isSuperTable;
2,778,186✔
3147
  pInfo->vtbScan.rversion = pPhyciNode->vtbScan.rversion;
2,778,186✔
3148
  pInfo->vtbScan.uid = pPhyciNode->vtbScan.uid;
2,778,186✔
3149
  pInfo->vtbScan.suid = pPhyciNode->vtbScan.suid;
2,778,186✔
3150
  pInfo->vtbScan.epSet = pPhyciNode->vtbScan.mgmtEpSet;
2,778,186✔
3151
  pInfo->vtbScan.acctId = pPhyciNode->vtbScan.accountId;
2,778,186✔
3152
  pInfo->vtbScan.needRedeploy = false;
2,778,186✔
3153
  pInfo->vtbScan.pMsgCb = pMsgCb;
2,778,186✔
3154
  pInfo->vtbScan.curTableIdx = 0;
2,778,186✔
3155
  pInfo->vtbScan.lastTableIdx = -1;
2,778,186✔
3156
  pInfo->vtbScan.dynTbUid = 0;
2,778,186✔
3157
  pInfo->vtbScan.dbName = taosStrdup(pPhyciNode->vtbScan.dbName);
2,778,186✔
3158
  pInfo->vtbScan.tbName = taosStrdup(pPhyciNode->vtbScan.tbName);
2,778,186✔
3159
  QUERY_CHECK_NULL(pInfo->vtbScan.dbName, code, line, _return, terrno)
2,778,186✔
3160
  QUERY_CHECK_NULL(pInfo->vtbScan.tbName, code, line, _return, terrno)
2,778,186✔
3161
  pInfo->vtbScan.existOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
2,778,186✔
3162
  QUERY_CHECK_NULL(pInfo->vtbScan.existOrgTbVg, code, line, _return, terrno)
2,778,186✔
3163
  SNode* node = NULL;
2,778,186✔
3164
  FOREACH(node, pPhyciNode->vtbScan.pOrgVgIds) {
8,821,494✔
3165
    SValueNode* valueNode = (SValueNode*)node;
6,043,308✔
3166
    int32_t vgId = (int32_t)valueNode->datum.i;
6,043,308✔
3167
    code = taosHashPut(pInfo->vtbScan.existOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
6,043,308✔
3168
    QUERY_CHECK_CODE(code, line, _return);
6,043,308✔
3169
  }
3170

3171
  if (pPhyciNode->dynTbname && pTaskInfo) {
2,778,186✔
3172
    updateDynTbUidIfNeeded(&pInfo->vtbScan, pTaskInfo->pStreamRuntimeInfo);
×
3173
  }
3174

3175
  pInfo->vtbScan.readColList = taosArrayInit(LIST_LENGTH(pPhyciNode->vtbScan.pScanCols), sizeof(col_id_t));
2,778,186✔
3176
  QUERY_CHECK_NULL(pInfo->vtbScan.readColList, code, line, _return, terrno)
2,778,186✔
3177

3178
  SNode* colNode = NULL;
2,778,186✔
3179
  FOREACH(colNode, pPhyciNode->vtbScan.pScanCols) {
24,384,986✔
3180
    SColumnNode* pNode = (SColumnNode*)colNode;
21,606,800✔
3181
    QUERY_CHECK_NULL(pNode, code, line, _return, terrno)
21,606,800✔
3182
    QUERY_CHECK_NULL(taosArrayPush(pInfo->vtbScan.readColList, &pNode->colId), code, line, _return, terrno)
43,213,600✔
3183
  }
3184

3185
  pInfo->vtbScan.readColSet =
2,778,186✔
3186
      taosHashInit(taosArrayGetSize(pInfo->vtbScan.readColList) > 0 ? taosArrayGetSize(pInfo->vtbScan.readColList) : 1,
2,778,186✔
3187
                   taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), true, HASH_NO_LOCK);
3188
  QUERY_CHECK_NULL(pInfo->vtbScan.readColSet, code, line, _return, terrno)
2,778,186✔
3189
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->vtbScan.readColList); i++) {
24,384,986✔
3190
    col_id_t colId = *(col_id_t*)taosArrayGet(pInfo->vtbScan.readColList, i);
21,606,800✔
3191
    code = taosHashPut(pInfo->vtbScan.readColSet, &colId, sizeof(colId), NULL, 0);
21,606,800✔
3192
    QUERY_CHECK_CODE(code, line, _return);
21,606,800✔
3193
  }
3194

3195
  pInfo->vtbScan.refColGroups = NULL;
2,778,186✔
3196

3197
  pInfo->vtbScan.childTableList = taosArrayInit(10, POINTER_BYTES);
2,778,186✔
3198
  QUERY_CHECK_NULL(pInfo->vtbScan.childTableList, code, line, _return, terrno)
2,778,186✔
3199

3200
  pInfo->vtbScan.dbVgInfoMap = taosHashInit(taosArrayGetSize(pInfo->vtbScan.childTableList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
2,778,186✔
3201
  QUERY_CHECK_NULL(pInfo->vtbScan.dbVgInfoMap, code, line, _return, terrno)
2,778,186✔
3202

3203
  pInfo->vtbScan.otbNameToOtbInfoMap = NULL;
2,778,186✔
3204
  pInfo->vtbScan.otbVgIdToOtbInfoArrayMap = NULL;
2,778,186✔
3205
  pInfo->vtbScan.vtbUidToVgIdMapMap = NULL;
2,778,186✔
3206
  pInfo->vtbScan.vtbGroupIdToVgIdMapMap = NULL;
2,778,186✔
3207
  pInfo->vtbScan.vtbUidTagListMap = NULL;
2,778,186✔
3208
  pInfo->vtbScan.vtbGroupIdTagListMap = NULL;
2,778,186✔
3209
  pInfo->vtbScan.vtbUidToGroupIdMap = NULL;
2,778,186✔
3210

3211
  return code;
2,778,186✔
3212
_return:
×
3213
  // no need to destroy array and hashmap allocated in this function,
3214
  // since the operator's destroy function will take care of it
3215
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
3216
  return code;
×
3217
}
3218

3219
static int32_t initVtbWindowInfo(SDynQueryCtrlOperatorInfo* pInfo, SDynQueryCtrlPhysiNode* pPhyciNode,
642,183✔
3220
                                 SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) {
3221
  int32_t              code = TSDB_CODE_SUCCESS;
642,183✔
3222
  int32_t              line = 0;
642,183✔
3223
  SDataBlockDescNode*  pDescNode = pPhyciNode->node.pOutputDataBlockDesc;
642,183✔
3224

3225
  pInfo->vtbWindow.wstartSlotId = pPhyciNode->vtbWindow.wstartSlotId;
642,183✔
3226
  pInfo->vtbWindow.wendSlotId = pPhyciNode->vtbWindow.wendSlotId;
642,183✔
3227
  pInfo->vtbWindow.wdurationSlotId = pPhyciNode->vtbWindow.wdurationSlotId;
642,183✔
3228
  pInfo->vtbWindow.pTargets = pPhyciNode->vtbWindow.pTargets;
642,183✔
3229
  pInfo->vtbWindow.isVstb = pPhyciNode->vtbWindow.isVstb;
642,183✔
3230
  pInfo->vtbWindow.extendOption = pPhyciNode->vtbWindow.extendOption;
642,183✔
3231

3232
  pInfo->vtbWindow.pRes = createDataBlockFromDescNode(pDescNode);
642,183✔
3233
  QUERY_CHECK_NULL(pInfo->vtbWindow.pRes, code, line, _return, terrno)
642,183✔
3234

3235
  pInfo->vtbWindow.pWins = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
642,183✔
3236
  QUERY_CHECK_NULL(pInfo->vtbWindow.pWins, code, line, _return, terrno)
642,183✔
3237

3238
  pInfo->vtbWindow.outputWstartSlotId = -1;
642,183✔
3239
  pInfo->vtbWindow.outputWendSlotId = -1;
642,183✔
3240
  pInfo->vtbWindow.outputWdurationSlotId = -1;
642,183✔
3241
  pInfo->vtbWindow.curWinBatchIdx = 0;
642,183✔
3242

3243
  initResultSizeInfo(&pOperator->resultInfo, 1);
642,183✔
3244
  code = blockDataEnsureCapacity(pInfo->vtbWindow.pRes, pOperator->resultInfo.capacity);
642,183✔
3245
  QUERY_CHECK_CODE(code, line, _return);
642,183✔
3246

3247
  return code;
642,183✔
3248
_return:
×
3249
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
3250
  return code;
×
3251
}
3252

3253
static int32_t extractTsCol(SSDataBlock* pBlock, int32_t slotId, TSKEY** ppTsCols) {
2,949,540✔
3254
  int32_t code = TSDB_CODE_SUCCESS;
2,949,540✔
3255
  int32_t lino = 0;
2,949,540✔
3256

3257
  if (pBlock->pDataBlock != NULL && pBlock->info.dataLoad) {
2,949,540✔
3258
    SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, slotId);
2,949,540✔
3259
    QUERY_CHECK_NULL(pColDataInfo, code, lino, _return, terrno)
2,949,540✔
3260

3261
    *ppTsCols = (int64_t*)pColDataInfo->pData;
2,949,540✔
3262

3263
    if ((*ppTsCols)[0] != 0 && (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0)) {
2,949,540✔
3264
      code = blockDataUpdateTsWindow(pBlock, slotId);
287,760✔
3265
      QUERY_CHECK_CODE(code, lino, _return);
287,760✔
3266
    }
3267
  }
3268

3269
  return code;
2,949,540✔
3270
_return:
×
3271
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3272
  return code;
×
3273
}
3274

3275
int32_t vtbWindowOpen(SOperatorInfo* pOperator) {
678,153✔
3276
  int32_t                    code = TSDB_CODE_SUCCESS;
678,153✔
3277
  int32_t                    lino = 0;
678,153✔
3278
  SDynQueryCtrlOperatorInfo* pDynInfo = pOperator->info;
678,153✔
3279
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
678,153✔
3280
  SVtbWindowDynCtrlInfo*     pInfo = &pDynInfo->vtbWindow;
678,153✔
3281
  int64_t                    st = 0;
678,153✔
3282

3283
  if (OPTR_IS_OPENED(pOperator)) {
678,153✔
3284
    return code;
35,970✔
3285
  }
3286

3287
  if (pOperator->cost.openCost == 0) {
642,183✔
3288
    st = taosGetTimestampUs();
642,183✔
3289
  }
3290

3291
  while (1) {
1,474,770✔
3292
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
2,116,953✔
3293
    if (pBlock == NULL) {
2,116,953✔
3294
      break;
642,183✔
3295
    }
3296

3297
    if (pInfo->outputWendSlotId == -1 && pInfo->outputWstartSlotId == -1 && pInfo->outputWdurationSlotId == -1) {
1,474,770✔
3298
      for (int32_t i = 0; i < LIST_LENGTH(pInfo->pTargets); ++i) {
4,636,248✔
3299
        STargetNode* pNode = (STargetNode*)nodesListGetNode(pInfo->pTargets, i);
3,994,065✔
3300
        if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode->pExpr)->dataBlockId == pBlock->info.id.blockId) {
3,994,065✔
3301
          if (((SColumnNode*)pNode->pExpr)->slotId == pDynInfo->vtbWindow.wstartSlotId) {
1,049,421✔
3302
            pInfo->outputWstartSlotId = i;
391,575✔
3303
          } else if (((SColumnNode*)pNode->pExpr)->slotId == pDynInfo->vtbWindow.wendSlotId) {
657,846✔
3304
            pInfo->outputWendSlotId = i;
391,575✔
3305
          } else if (((SColumnNode*)pNode->pExpr)->slotId == pDynInfo->vtbWindow.wdurationSlotId) {
266,271✔
3306
            pInfo->outputWdurationSlotId = i;
266,271✔
3307
          }
3308
        }
3309
      }
3310
    }
3311

3312
    TSKEY* wstartCol = NULL;
1,474,770✔
3313
    TSKEY* wendCol = NULL;
1,474,770✔
3314

3315
    code = extractTsCol(pBlock, pDynInfo->vtbWindow.wstartSlotId, &wstartCol);
1,474,770✔
3316
    QUERY_CHECK_CODE(code, lino, _return);
1,474,770✔
3317
    code = extractTsCol(pBlock, pDynInfo->vtbWindow.wendSlotId, &wendCol);
1,474,770✔
3318
    QUERY_CHECK_CODE(code, lino, _return);
1,474,770✔
3319

3320
    SArray* pWin = taosArrayInit(pBlock->info.rows, sizeof(SExtWinTimeWindow));
1,474,770✔
3321
    QUERY_CHECK_NULL(pWin, code, lino, _return, terrno)
1,474,770✔
3322

3323
    QUERY_CHECK_NULL(taosArrayReserve(pWin, pBlock->info.rows), code, lino, _return, terrno);
1,474,770✔
3324

3325
    for (int32_t i = 0; i < pBlock->info.rows; i++) {
2,147,483,647✔
3326
      SExtWinTimeWindow* pWindow = taosArrayGet(pWin, i);
2,147,483,647✔
3327
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
2,147,483,647✔
3328
      pWindow->tw.skey = wstartCol[i];
2,147,483,647✔
3329
      pWindow->tw.ekey = wendCol[i] + 1;
2,147,483,647✔
3330
      pWindow->winOutIdx = -1;
2,147,483,647✔
3331
    }
3332

3333
    QUERY_CHECK_NULL(taosArrayPush(pDynInfo->vtbWindow.pWins, &pWin), code, lino, _return, terrno);
2,949,540✔
3334
  }
3335

3336
  // handle first window's start key and last window's end key
3337
  SArray* firstBatch = (SArray*)taosArrayGetP(pDynInfo->vtbWindow.pWins, 0);
642,183✔
3338
  SArray* lastBatch = (SArray*)taosArrayGetP(pDynInfo->vtbWindow.pWins, taosArrayGetSize(pDynInfo->vtbWindow.pWins) - 1);
642,183✔
3339

3340
  QUERY_CHECK_NULL(firstBatch, code, lino, _return, terrno)
642,183✔
3341
  QUERY_CHECK_NULL(lastBatch, code, lino, _return, terrno)
642,183✔
3342

3343
  SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(firstBatch, 0);
642,183✔
3344
  SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(lastBatch);
642,183✔
3345

3346
  QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
642,183✔
3347
  QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)
642,183✔
3348

3349
  if (pInfo->extendOption == STATE_WIN_EXTEND_OPTION_BACKWARD) {
642,183✔
3350
    lastWin->tw.ekey = INT64_MAX;
182,327✔
3351
  }
3352
  if (pInfo->extendOption == STATE_WIN_EXTEND_OPTION_FORWARD) {
642,183✔
3353
    firstWin->tw.skey = INT64_MIN;
229,928✔
3354
  }
3355

3356
  if (pInfo->isVstb) {
642,183✔
3357
    code = buildVirtualSuperTableScanChildTableMap(pOperator);
95,202✔
3358
    QUERY_CHECK_CODE(code, lino, _return);
95,202✔
3359
  }
3360

3361
  OPTR_SET_OPENED(pOperator);
642,183✔
3362

3363
  if (pOperator->cost.openCost == 0) {
642,183✔
3364
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
642,183✔
3365
  }
3366

3367
_return:
×
3368
  if (code != TSDB_CODE_SUCCESS) {
642,183✔
3369
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3370
    pTaskInfo->code = code;
×
3371
    T_LONG_JMP(pTaskInfo->env, code);
×
3372
  }
3373
  return code;
642,183✔
3374
}
3375

3376
static int32_t buildExternalWindowOperatorParamEx(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins, int32_t idx) {
102,168✔
3377
  int32_t                       code = TSDB_CODE_SUCCESS;
102,168✔
3378
  int32_t                       lino = 0;
102,168✔
3379
  SExternalWindowOperatorParam* pExtWinOp = NULL;
102,168✔
3380

3381
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
102,168✔
3382
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
102,168✔
3383

3384
  pExtWinOp = taosMemoryMalloc(sizeof(SExternalWindowOperatorParam));
102,168✔
3385
  QUERY_CHECK_NULL(pExtWinOp, code, lino, _return, terrno)
102,168✔
3386

3387
  pExtWinOp->ExtWins = taosArrayDup(pWins, NULL);
102,168✔
3388
  QUERY_CHECK_NULL(pExtWinOp->ExtWins, code, lino, _return, terrno)
102,168✔
3389

3390
  SExtWinTimeWindow *firstWin = (SExtWinTimeWindow *)taosArrayGet(pWins, 0);
102,168✔
3391
  SExtWinTimeWindow *lastWin = (SExtWinTimeWindow *)taosArrayGetLast(pWins);
102,168✔
3392

3393
  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
102,168✔
3394
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)
102,168✔
3395

3396
  SOperatorParam* pExchangeParam = NULL;
102,168✔
3397
  code = buildBatchExchangeOperatorParamForVirtual(&pExchangeParam, 0, NULL, 0, pInfo->vtbScan.otbVgIdToOtbInfoArrayMap, (STimeWindow){.skey = firstWin->tw.skey, .ekey = lastWin->tw.ekey}, EX_SRC_TYPE_VSTB_WIN_SCAN);
102,168✔
3398
  QUERY_CHECK_CODE(code, lino, _return);
102,168✔
3399

3400
  QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pExchangeParam), code, lino, _return, terrno)
204,336✔
3401

3402
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW;
102,168✔
3403
  (*ppRes)->downstreamIdx = idx;
102,168✔
3404
  (*ppRes)->value = pExtWinOp;
102,168✔
3405
  (*ppRes)->reUse = false;
102,168✔
3406

3407
  return code;
102,168✔
3408
_return:
×
3409
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3410
  if (pExtWinOp) {
×
3411
    if (pExtWinOp->ExtWins) {
×
3412
      taosArrayDestroy(pExtWinOp->ExtWins);
×
3413
    }
3414
    taosMemoryFree(pExtWinOp);
×
3415
  }
3416
  if (*ppRes) {
×
3417
    if ((*ppRes)->pChildren) {
×
3418
      for (int32_t i = 0; i < taosArrayGetSize((*ppRes)->pChildren); i++) {
×
3419
        SOperatorParam* pChildParam = (SOperatorParam*)taosArrayGet((*ppRes)->pChildren, i);
×
3420
        if (pChildParam) {
×
3421
          SDynQueryCtrlOperatorParam* pDynParam = (SDynQueryCtrlOperatorParam*)pChildParam->value;
×
3422
          if (pDynParam) {
×
3423
            taosMemoryFree(pDynParam);
×
3424
          }
3425
          taosMemoryFree(pChildParam);
×
3426
        }
3427
      }
3428
      taosArrayDestroy((*ppRes)->pChildren);
×
3429
    }
3430
    taosMemoryFree(*ppRes);
×
3431
    *ppRes = NULL;
×
3432
  }
3433
  return code;
×
3434
}
3435

3436
int32_t vtbWindowNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
678,153✔
3437
  int32_t                    code = TSDB_CODE_SUCCESS;
678,153✔
3438
  int32_t                    lino = 0;
678,153✔
3439
  SDynQueryCtrlOperatorInfo* pDynInfo = pOperator->info;
678,153✔
3440
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
678,153✔
3441
  int64_t                    st = taosGetTimestampUs();
678,153✔
3442
  int32_t                    numOfWins = 0;
678,153✔
3443
  SOperatorInfo*             mergeOp = NULL;
678,153✔
3444
  SOperatorInfo*             extWinOp = NULL;
678,153✔
3445
  SOperatorParam*            pMergeParam = NULL;
678,153✔
3446
  SOperatorParam*            pExtWinParam = NULL;
678,153✔
3447
  SVtbWindowDynCtrlInfo*     pInfo = &pDynInfo->vtbWindow;
678,153✔
3448
  SSDataBlock*               pRes = pInfo->pRes;
678,153✔
3449

3450
  code = pOperator->fpSet._openFn(pOperator);
678,153✔
3451
  QUERY_CHECK_CODE(code, lino, _return);
678,153✔
3452

3453
  if (pInfo->curWinBatchIdx >= taosArrayGetSize(pInfo->pWins)) {
678,153✔
3454
    *ppRes = NULL;
15,663✔
3455
    return code;
15,663✔
3456
  }
3457

3458
  SArray* pWinArray = (SArray*)taosArrayGetP(pInfo->pWins, pInfo->curWinBatchIdx);
662,490✔
3459
  QUERY_CHECK_NULL(pWinArray, code, lino, _return, terrno)
662,490✔
3460

3461
  numOfWins = (int32_t)taosArrayGetSize(pWinArray);
662,490✔
3462

3463
  if (pInfo->isVstb) {
662,490✔
3464
    extWinOp = pOperator->pDownstream[2];
102,168✔
3465
    code = buildExternalWindowOperatorParamEx(pDynInfo, &pExtWinParam, pWinArray, extWinOp->numOfDownstream);
102,168✔
3466
    QUERY_CHECK_CODE(code, lino, _return);
102,168✔
3467

3468
    SSDataBlock* pExtWinBlock = NULL;
102,168✔
3469
    code = extWinOp->fpSet.getNextExtFn(extWinOp, pExtWinParam, &pExtWinBlock);
102,168✔
3470
    QUERY_CHECK_CODE(code, lino, _return);
102,168✔
3471
    setOperatorCompleted(extWinOp);
102,168✔
3472
    // Free the parameter after operator completes, as it's been saved to the operator
3473
    if (extWinOp->pOperatorGetParam) {
102,168✔
NEW
3474
      freeOperatorParam(extWinOp->pOperatorGetParam, OP_GET_PARAM);
×
NEW
3475
      extWinOp->pOperatorGetParam = NULL;
×
3476
    }
3477
    // Also free downstream params if any
3478
    if (extWinOp->pDownstreamGetParams) {
102,168✔
3479
      for (int32_t i = 0; i < extWinOp->numOfDownstream; i++) {
204,336✔
3480
        if (extWinOp->pDownstreamGetParams[i]) {
102,168✔
NEW
3481
          freeOperatorParam(extWinOp->pDownstreamGetParams[i], OP_GET_PARAM);
×
NEW
3482
          extWinOp->pDownstreamGetParams[i] = NULL;
×
3483
        }
3484
      }
3485
    }
3486

3487
    blockDataCleanup(pRes);
102,168✔
3488
    code = blockDataEnsureCapacity(pRes, numOfWins);
102,168✔
3489
    QUERY_CHECK_CODE(code, lino, _return);
102,168✔
3490

3491
    if (pExtWinBlock) {
102,168✔
3492
      code = copyColumnsValue(pInfo->pTargets, pExtWinBlock->info.id.blockId, pRes, pExtWinBlock, numOfWins);
102,168✔
3493
      QUERY_CHECK_CODE(code, lino, _return);
102,168✔
3494

3495
      if (pInfo->curWinBatchIdx == 0) {
102,168✔
3496
        // first batch, get _wstart from pMergedBlock
3497
        SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(taosArrayGetP(pInfo->pWins, 0), 0);
95,202✔
3498
        QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
95,202✔
3499

3500
        firstWin->tw.skey = pExtWinBlock->info.window.skey;
95,202✔
3501
      }
3502
      if (pInfo->curWinBatchIdx == taosArrayGetSize(pInfo->pWins) - 1) {
102,168✔
3503
        // last batch, get _wend from pMergedBlock
3504
        SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(taosArrayGetP(pInfo->pWins, taosArrayGetSize(pInfo->pWins) - 1));
2,322✔
3505
        QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)
2,322✔
3506

3507
        lastWin->tw.ekey = pExtWinBlock->info.window.ekey + 1;
2,322✔
3508
      }
3509
    }
3510
  } else {
3511
    mergeOp = pOperator->pDownstream[1];
560,322✔
3512
    code = buildMergeOperatorParam(pDynInfo, &pMergeParam, pWinArray, mergeOp->numOfDownstream, numOfWins);
560,322✔
3513
    QUERY_CHECK_CODE(code, lino, _return);
560,322✔
3514

3515
    SSDataBlock* pMergedBlock = NULL;
560,322✔
3516
    code = mergeOp->fpSet.getNextExtFn(mergeOp, pMergeParam, &pMergedBlock);
560,322✔
3517
    QUERY_CHECK_CODE(code, lino, _return);
560,322✔
3518
    // Free the parameter after operator completes, as it's been saved to the operator
3519
    if (mergeOp->pOperatorGetParam) {
560,322✔
NEW
3520
      freeOperatorParam(mergeOp->pOperatorGetParam, OP_GET_PARAM);
×
NEW
3521
      mergeOp->pOperatorGetParam = NULL;
×
3522
    }
3523
    // Also free downstream params if any
3524
    if (mergeOp->pDownstreamGetParams) {
560,322✔
3525
      for (int32_t i = 0; i < mergeOp->numOfDownstream; i++) {
1,907,763✔
3526
        if (mergeOp->pDownstreamGetParams[i]) {
1,347,441✔
NEW
3527
          freeOperatorParam(mergeOp->pDownstreamGetParams[i], OP_GET_PARAM);
×
NEW
3528
          mergeOp->pDownstreamGetParams[i] = NULL;
×
3529
        }
3530
      }
3531
    }
3532

3533
    blockDataCleanup(pRes);
560,322✔
3534
    code = blockDataEnsureCapacity(pRes, numOfWins);
560,322✔
3535
    QUERY_CHECK_CODE(code, lino, _return);
560,322✔
3536

3537
    if (pMergedBlock) {
560,322✔
3538
      code = copyColumnsValue(pInfo->pTargets, pMergedBlock->info.id.blockId, pRes, pMergedBlock, numOfWins);
560,322✔
3539
      QUERY_CHECK_CODE(code, lino, _return);
560,322✔
3540

3541
      if (pInfo->curWinBatchIdx == 0) {
560,322✔
3542
        // first batch, get _wstart from pMergedBlock
3543
        SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(taosArrayGetP(pInfo->pWins, 0), 0);
546,981✔
3544
        QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
546,981✔
3545

3546
        firstWin->tw.skey = pMergedBlock->info.window.skey;
546,981✔
3547
      }
3548
      if (pInfo->curWinBatchIdx == taosArrayGetSize(pInfo->pWins) - 1) {
560,322✔
3549
        // last batch, get _wend from pMergedBlock
3550
        SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(taosArrayGetP(pInfo->pWins, taosArrayGetSize(pInfo->pWins) - 1));
13,341✔
3551
        QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)
13,341✔
3552

3553
        lastWin->tw.ekey = pMergedBlock->info.window.ekey + 1;
13,341✔
3554
      }
3555
    }
3556
  }
3557

3558

3559
  if (pInfo->outputWstartSlotId != -1) {
662,490✔
3560
    SColumnInfoData* pWstartCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWstartSlotId);
411,882✔
3561
    QUERY_CHECK_NULL(pWstartCol, code, lino, _return, terrno)
411,882✔
3562

3563
    for (int32_t i = 0; i < numOfWins; i++) {
1,246,516,299✔
3564
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
1,246,104,417✔
3565
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
1,246,104,417✔
3566
      code = colDataSetVal(pWstartCol, i, (const char*)&pWindow->tw.skey, false);
1,246,104,417✔
3567
      QUERY_CHECK_CODE(code, lino, _return);
1,246,104,417✔
3568
    }
3569
  }
3570
  if (pInfo->outputWendSlotId != -1) {
662,490✔
3571
    SColumnInfoData* pWendCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWendSlotId);
411,882✔
3572
    QUERY_CHECK_NULL(pWendCol, code, lino, _return, terrno)
411,882✔
3573

3574
    for (int32_t i = 0; i < numOfWins; i++) {
1,246,516,299✔
3575
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
1,246,104,417✔
3576
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
1,246,104,417✔
3577
      TSKEY ekey = pWindow->tw.ekey - 1;
1,246,104,417✔
3578
      code = colDataSetVal(pWendCol, i, (const char*)&ekey, false);
1,246,104,417✔
3579
      QUERY_CHECK_CODE(code, lino, _return);
1,246,104,417✔
3580
    }
3581
  }
3582
  if (pInfo->outputWdurationSlotId != -1) {
662,490✔
3583
    SColumnInfoData* pWdurationCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWdurationSlotId);
286,578✔
3584
    QUERY_CHECK_NULL(pWdurationCol, code, lino, _return, terrno)
286,578✔
3585

3586
    for (int32_t i = 0; i < numOfWins; i++) {
861,457,107✔
3587
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
861,170,529✔
3588
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
861,170,529✔
3589
      int64_t duration = pWindow->tw.ekey - 1 - pWindow->tw.skey;
861,170,529✔
3590
      code = colDataSetVal(pWdurationCol, i, (const char*)&duration, false);
861,170,529✔
3591
      QUERY_CHECK_CODE(code, lino, _return);
861,170,529✔
3592
    }
3593
  }
3594

3595
  pRes->info.rows = numOfWins;
662,490✔
3596
  *ppRes = pRes;
662,490✔
3597
  pInfo->curWinBatchIdx++;
662,490✔
3598

3599
  return code;
662,490✔
3600

3601
_return:
×
3602
  if (code != TSDB_CODE_SUCCESS) {
×
3603
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3604
    pTaskInfo->code = code;
×
3605
    T_LONG_JMP(pTaskInfo->env, code);
×
3606
  }
3607
  return code;
×
3608
}
3609

3610
static int32_t resetDynQueryCtrlOperState(SOperatorInfo* pOper) {
2,218,696✔
3611
  SDynQueryCtrlOperatorInfo*    pDyn = pOper->info;
2,218,696✔
3612
  SDynQueryCtrlPhysiNode const* pPhyciNode = pOper->pPhyNode;
2,218,912✔
3613
  SExecTaskInfo*                pTaskInfo = pOper->pTaskInfo;
2,218,696✔
3614

3615
  pOper->status = OP_NOT_OPENED;
2,218,696✔
3616

3617
  switch (pDyn->qType) {
2,218,912✔
3618
    case DYN_QTYPE_STB_HASH:{
780✔
3619
      pDyn->stbJoin.execInfo = (SDynQueryCtrlExecInfo){0};
780✔
3620
      SStbJoinDynCtrlInfo* pStbJoin = &pDyn->stbJoin;
780✔
3621
      destroyStbJoinDynCtrlInfo(&pDyn->stbJoin);
780✔
3622
      
3623
      int32_t code = initSeqStbJoinTableHash(&pDyn->stbJoin.ctx.prev, pDyn->stbJoin.basic.batchFetch);
780✔
3624
      if (TSDB_CODE_SUCCESS != code) {
780✔
3625
        qError("initSeqStbJoinTableHash failed since %s", tstrerror(code));
×
3626
        return code;
×
3627
      }
3628
      pStbJoin->ctx.prev.pListHead = NULL;
780✔
3629
      pStbJoin->ctx.prev.joinBuild = false;
780✔
3630
      pStbJoin->ctx.prev.pListTail = NULL;
780✔
3631
      pStbJoin->ctx.prev.tableNum = 0;
780✔
3632

3633
      pStbJoin->ctx.post = (SStbJoinPostJoinCtx){0};
780✔
3634
      break; 
780✔
3635
    }
3636
    case DYN_QTYPE_VTB_SCAN: {
2,217,484✔
3637
      SVtbScanDynCtrlInfo* pVtbScan = &pDyn->vtbScan;
2,217,484✔
3638
      
3639
      if (pVtbScan->otbNameToOtbInfoMap) {
2,217,700✔
3640
        taosHashSetFreeFp(pVtbScan->otbNameToOtbInfoMap, destroySOrgTbInfo);
×
3641
        taosHashCleanup(pVtbScan->otbNameToOtbInfoMap);
×
3642
        pVtbScan->otbNameToOtbInfoMap = NULL;
×
3643
      }
3644
      if (pVtbScan->pRsp) {
2,217,700✔
3645
        tFreeSUsedbRsp(pVtbScan->pRsp);
×
3646
        taosMemoryFreeClear(pVtbScan->pRsp);
×
3647
      }
3648
      if (pVtbScan->colRefInfo) {
2,217,916✔
3649
        taosArrayDestroyEx(pVtbScan->colRefInfo, destroyColRefInfo);
114,416✔
3650
        pVtbScan->colRefInfo = NULL;
114,416✔
3651
      }
3652
      if (pVtbScan->childTableMap) {
2,217,268✔
3653
        taosHashCleanup(pVtbScan->childTableMap);
5,672✔
3654
        pVtbScan->childTableMap = NULL;
5,672✔
3655
      }
3656
      if (pVtbScan->childTableList) {
2,217,268✔
3657
        taosArrayClearEx(pVtbScan->childTableList, destroyColRefArray);
2,217,268✔
3658
      }
3659
      if (pPhyciNode->dynTbname && pTaskInfo) {
2,217,916✔
3660
        updateDynTbUidIfNeeded(pVtbScan, pTaskInfo->pStreamRuntimeInfo);
×
3661
      }
3662
      pVtbScan->curTableIdx = 0;
2,217,484✔
3663
      pVtbScan->lastTableIdx = -1;
2,218,132✔
3664
      break;
2,217,916✔
3665
    }
3666
    case DYN_QTYPE_VTB_WINDOW: {
×
3667
      SVtbWindowDynCtrlInfo* pVtbWindow = &pDyn->vtbWindow;
×
3668
      if (pVtbWindow->pRes) {
×
3669
        blockDataDestroy(pVtbWindow->pRes);
×
3670
        pVtbWindow->pRes = NULL;
×
3671
      }
3672
      if (pVtbWindow->pWins) {
×
3673
        taosArrayDestroyEx(pVtbWindow->pWins, destroyWinArray);
×
3674
        pVtbWindow->pWins = NULL;
×
3675
      }
3676
      pVtbWindow->outputWdurationSlotId = -1;
×
3677
      pVtbWindow->outputWendSlotId = -1;
×
3678
      pVtbWindow->outputWstartSlotId = -1;
×
3679
      pVtbWindow->curWinBatchIdx = 0;
×
3680
      break;
×
3681
    }
3682
    default:
×
3683
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
×
3684
      break;
×
3685
  }
3686
  return 0;
2,218,480✔
3687
}
3688

3689
int32_t vtbAggOpen(SOperatorInfo* pOperator) {
4,374,819✔
3690
  int32_t                    code = TSDB_CODE_SUCCESS;
4,374,819✔
3691
  int32_t                    line = 0;
4,374,819✔
3692
  int64_t                    st = 0;
4,374,819✔
3693
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
4,374,819✔
3694

3695
  if (OPTR_IS_OPENED(pOperator)) {
4,374,819✔
3696
    return code;
3,490,178✔
3697
  }
3698

3699
  if (pOperator->cost.openCost == 0) {
884,641✔
3700
    st = taosGetTimestampUs();
884,641✔
3701
  }
3702

3703
  code = buildVirtualSuperTableScanChildTableMap(pOperator);
884,641✔
3704
  QUERY_CHECK_CODE(code, line, _return);
884,641✔
3705
  OPTR_SET_OPENED(pOperator);
884,641✔
3706

3707
_return:
884,641✔
3708
  if (pOperator->cost.openCost == 0) {
884,641✔
3709
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
884,641✔
3710
  }
3711
  if (code) {
884,641✔
3712
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
3713
    pOperator->pTaskInfo->code = code;
×
3714
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
3715
  }
3716
  return code;
884,641✔
3717
}
3718

3719
int32_t virtualTableAggGetNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
4,374,819✔
3720
  int32_t                    code = TSDB_CODE_SUCCESS;
4,374,819✔
3721
  int32_t                    line = 0;
4,374,819✔
3722
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
4,374,819✔
3723
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
4,374,819✔
3724
  SOperatorInfo*             pAggOp = pOperator->pDownstream[pOperator->numOfDownstream - 1];
4,374,819✔
3725
  SOperatorInfo*             pTagScanOp = pOperator->pDownstream[1];
4,374,819✔
3726
  SOperatorParam*            pAggParam = NULL;
4,374,819✔
3727

3728
  if (pInfo->vtbScan.hasPartition) {
4,374,819✔
3729
    if (pInfo->vtbScan.batchProcessChild) {
3,787,028✔
3730
      void* pIter = taosHashIterate(pVtbScan->vtbGroupIdTagListMap, NULL);
1,554,684✔
3731
      while (pIter) {
3,564,372✔
3732
        size_t     keyLen = 0;
3,103,640✔
3733
        uint64_t   groupid = *(uint64_t*)taosHashGetKey(pIter, &keyLen);
3,103,640✔
3734

3735
        code = buildAggOperatorParamWithGroupId(pInfo, groupid, &pAggParam);
3,103,640✔
3736
        QUERY_CHECK_CODE(code, line, _return);
3,103,640✔
3737

3738
        if (pAggParam) {
3,103,640✔
3739
          code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
2,698,180✔
3740
          QUERY_CHECK_CODE(code, line, _return);
2,698,180✔
3741
        } else {
3742
          *pRes = NULL;
405,460✔
3743
        }
3744

3745
        pIter = taosHashIterate(pVtbScan->vtbGroupIdTagListMap, pIter);
3,103,640✔
3746

3747
        if (*pRes) {
3,103,640✔
3748
          (*pRes)->info.id.groupId = groupid;
1,093,952✔
3749
          code = taosHashRemove(pVtbScan->vtbGroupIdTagListMap, &groupid, keyLen);
1,093,952✔
3750
          QUERY_CHECK_CODE(code, line, _return);
1,093,952✔
3751
          break;
1,093,952✔
3752
        }
3753
      }
3754
    } else {
3755
      void *pIter = taosHashIterate(pVtbScan->vtbGroupIdTagListMap, NULL);
2,232,344✔
3756
      while (pIter) {
3,293,648✔
3757
        size_t     keyLen = 0;
3,036,848✔
3758
        uint64_t*  groupid = (uint64_t*)taosHashGetKey(pIter, &keyLen);
3,036,848✔
3759
        SHashObj*  vtbUidTagListMap = *(SHashObj**)pIter;
3,036,848✔
3760

3761
        void* pIter2 = taosHashIterate(vtbUidTagListMap, NULL);
3,036,848✔
3762
        while (pIter2) {
5,348,048✔
3763
          size_t   keyLen2 = 0;
4,286,744✔
3764
          tb_uid_t uid = *(tb_uid_t*)taosHashGetKey(pIter2, &keyLen2);
4,286,744✔
3765
          SArray*  pTagList = *(SArray**)pIter2;
4,286,744✔
3766

3767
          if (pVtbScan->genNewParam) {
4,286,744✔
3768
            code = buildAggOperatorParamForSingleChild(pInfo, uid, *groupid, pTagList, &pAggParam);
2,311,200✔
3769
            QUERY_CHECK_CODE(code, line, _return);
2,311,200✔
3770
            if (pAggParam) {
2,311,200✔
3771
              code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
1,883,168✔
3772
              QUERY_CHECK_CODE(code, line, _return);
1,883,168✔
3773
            } else {
3774
              *pRes = NULL;
428,032✔
3775
            }
3776
          } else {
3777
            code = pAggOp->fpSet.getNextFn(pAggOp, pRes);
1,975,544✔
3778
            QUERY_CHECK_CODE(code, line, _return);
1,975,544✔
3779
          }
3780

3781
          if (*pRes) {
4,286,744✔
3782
            pVtbScan->genNewParam = false;
1,975,544✔
3783
            (*pRes)->info.id.groupId = *groupid;
1,975,544✔
3784
            break;
1,975,544✔
3785
          }
3786
          pVtbScan->genNewParam = true;
2,311,200✔
3787
          pIter2 = taosHashIterate(vtbUidTagListMap, pIter2);
2,311,200✔
3788
          code = taosHashRemove(vtbUidTagListMap, &uid, keyLen);
2,311,200✔
3789
          QUERY_CHECK_CODE(code, line, _return);
2,311,200✔
3790
        }
3791
        if (*pRes) {
3,036,848✔
3792
          break;
1,975,544✔
3793
        }
3794
        pIter = taosHashIterate(pVtbScan->vtbGroupIdTagListMap, pIter);
1,061,304✔
3795
        code = taosHashRemove(pVtbScan->vtbGroupIdTagListMap, groupid, keyLen);
1,061,304✔
3796
        QUERY_CHECK_CODE(code, line, _return);
1,061,304✔
3797
      }
3798
    }
3799

3800
  } else {
3801
    if (pInfo->vtbScan.batchProcessChild) {
587,791✔
3802
      code = buildAggOperatorParam(pInfo, &pAggParam);
108,369✔
3803
      QUERY_CHECK_CODE(code, line, _return);
108,369✔
3804

3805
      code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
108,369✔
3806
      QUERY_CHECK_CODE(code, line, _return);
108,369✔
3807
      setOperatorCompleted(pOperator);
108,369✔
3808
    } else {
3809
      void* pIter = taosHashIterate(pVtbScan->vtbUidTagListMap, NULL);
479,422✔
3810
      while (pIter) {
985,222✔
3811
        size_t   keyLen = 0;
926,482✔
3812
        tb_uid_t uid = *(tb_uid_t*)taosHashGetKey(pIter, &keyLen);
926,482✔
3813
        SArray*  pTagList = *(SArray**)pIter;
926,482✔
3814

3815
        if (pVtbScan->genNewParam) {
926,482✔
3816
          code = buildAggOperatorParamForSingleChild(pInfo, uid, 0, pTagList, &pAggParam);
505,800✔
3817
          QUERY_CHECK_CODE(code, line, _return);
505,800✔
3818

3819
          if (pAggParam) {
505,800✔
3820
            code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
398,792✔
3821
            QUERY_CHECK_CODE(code, line, _return);
398,792✔
3822
          } else {
3823
            *pRes = NULL;
107,008✔
3824
          }
3825
        } else {
3826
          code = pAggOp->fpSet.getNextFn(pAggOp, pRes);
420,682✔
3827
          QUERY_CHECK_CODE(code, line, _return);
420,682✔
3828
        }
3829

3830
        if (*pRes) {
926,482✔
3831
          pVtbScan->genNewParam = false;
420,682✔
3832
          break;
420,682✔
3833
        }
3834
        pVtbScan->genNewParam = true;
505,800✔
3835
        pIter = taosHashIterate(pVtbScan->vtbUidTagListMap, pIter);
505,800✔
3836
        code = taosHashRemove(pVtbScan->vtbUidTagListMap, &uid, keyLen);
505,800✔
3837
        QUERY_CHECK_CODE(code, line, _return);
505,800✔
3838
      }
3839
    }
3840
  }
3841
_return:
4,374,819✔
3842
  if (code) {
4,374,819✔
3843
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
3844
  }
3845
  return code;
4,374,819✔
3846
}
3847

3848
int32_t vtbAggNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
4,483,188✔
3849
  int32_t                    code = TSDB_CODE_SUCCESS;
4,483,188✔
3850
  int32_t                    line = 0;
4,483,188✔
3851
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
4,483,188✔
3852
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
4,483,188✔
3853

3854
  QRY_PARAM_CHECK(pRes);
4,483,188✔
3855
  if (pOperator->status == OP_EXEC_DONE) {
4,483,188✔
3856
    return code;
108,369✔
3857
  }
3858

3859
  code = pOperator->fpSet._openFn(pOperator);
4,374,819✔
3860
  QUERY_CHECK_CODE(code, line, _return);
4,374,819✔
3861

3862
  if (pVtbScan->isSuperTable && taosArrayGetSize(pVtbScan->childTableList) == 0) {
4,374,819✔
3863
    setOperatorCompleted(pOperator);
×
3864
    return code;
×
3865
  }
3866

3867
  code = virtualTableAggGetNext(pOperator, pRes);
4,374,819✔
3868
  QUERY_CHECK_CODE(code, line, _return);
4,374,819✔
3869

3870
  return code;
4,374,819✔
3871

3872
_return:
×
3873
  if (code) {
×
3874
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
3875
    pOperator->pTaskInfo->code = code;
×
3876
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
3877
  }
3878
  return code;
×
3879
}
3880

3881
int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
3,928,212✔
3882
                                       SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo,
3883
                                       SMsgCb* pMsgCb, SOperatorInfo** pOptrInfo) {
3884
  QRY_PARAM_CHECK(pOptrInfo);
3,928,212✔
3885

3886
  int32_t                    code = TSDB_CODE_SUCCESS;
3,928,212✔
3887
  int32_t                    line = 0;
3,928,212✔
3888
  __optr_fn_t                nextFp = NULL;
3,928,212✔
3889
  __optr_open_fn_t           openFp = NULL;
3,928,212✔
3890
  SOperatorInfo*             pOperator = NULL;
3,928,212✔
3891
  SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo));
3,928,212✔
3892
  QUERY_CHECK_NULL(pInfo, code, line, _error, terrno)
3,928,212✔
3893

3894
  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
3,928,212✔
3895
  QUERY_CHECK_NULL(pOperator, code, line, _error, terrno)
3,928,212✔
3896

3897
  pOperator->pPhyNode = pPhyciNode;
3,928,212✔
3898
  pTaskInfo->dynamicTask = (int8_t)pPhyciNode->node.dynamicOp;
3,928,212✔
3899

3900
  code = appendDownstream(pOperator, pDownstream, numOfDownstream);
3,928,212✔
3901
  QUERY_CHECK_CODE(code, line, _error);
3,928,212✔
3902

3903
  setOperatorInfo(pOperator, "DynQueryCtrlOperator", QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, false, OP_NOT_OPENED,
3,928,212✔
3904
                  pInfo, pTaskInfo);
3905

3906
  pInfo->qType = pPhyciNode->qType;
3,928,212✔
3907
  switch (pInfo->qType) {
3,928,212✔
3908
    case DYN_QTYPE_STB_HASH:
1,150,026✔
3909
      TAOS_MEMCPY(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
1,150,026✔
3910
      pInfo->stbJoin.pOutputDataBlockDesc = pPhyciNode->node.pOutputDataBlockDesc;
1,150,026✔
3911
      code = initSeqStbJoinTableHash(&pInfo->stbJoin.ctx.prev, pInfo->stbJoin.basic.batchFetch);
1,150,026✔
3912
      QUERY_CHECK_CODE(code, line, _error);
1,150,026✔
3913
      nextFp = seqStableJoin;
1,150,026✔
3914
      openFp = optrDummyOpenFn;
1,150,026✔
3915
      break;
1,150,026✔
3916
    case DYN_QTYPE_VTB_SCAN:
1,251,362✔
3917
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
1,251,362✔
3918
      QUERY_CHECK_CODE(code, line, _error);
1,251,362✔
3919
      nextFp = vtbScanNext;
1,251,362✔
3920
      openFp = vtbScanOpen;
1,251,362✔
3921
      break;
1,251,362✔
3922
    case DYN_QTYPE_VTB_WINDOW:
642,183✔
3923
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
642,183✔
3924
      QUERY_CHECK_CODE(code, line, _error);
642,183✔
3925
      code = initVtbWindowInfo(pInfo, pPhyciNode, pTaskInfo, pOperator);
642,183✔
3926
      QUERY_CHECK_CODE(code, line, _error);
642,183✔
3927
      nextFp = vtbWindowNext;
642,183✔
3928
      openFp = vtbWindowOpen;
642,183✔
3929
      break;
642,183✔
3930
    case DYN_QTYPE_VTB_AGG:
884,641✔
3931
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
884,641✔
3932
      QUERY_CHECK_CODE(code, line, _error);
884,641✔
3933
      nextFp = vtbAggNext;
884,641✔
3934
      openFp = vtbAggOpen;
884,641✔
3935
      break;
884,641✔
3936
    default:
×
3937
      qError("unsupported dynamic query ctrl type: %d", pInfo->qType);
×
3938
      code = TSDB_CODE_INVALID_PARA;
×
3939
      goto _error;
×
3940
  }
3941

3942
  pOperator->fpSet = createOperatorFpSet(openFp, nextFp, NULL, destroyDynQueryCtrlOperator, optrDefaultBufFn,
3,928,212✔
3943
                                         NULL, optrDefaultGetNextExtFn, NULL);
3944

3945
  setOperatorResetStateFn(pOperator, resetDynQueryCtrlOperState);
3,928,212✔
3946
  *pOptrInfo = pOperator;
3,928,212✔
3947
  return TSDB_CODE_SUCCESS;
3,928,212✔
3948

3949
_error:
×
3950
  if (pInfo != NULL) {
×
3951
    destroyDynQueryCtrlOperator(pInfo);
×
3952
  }
3953
  qError("failed to create dyn query ctrl operator, %s code:%s, line:%d", __func__, tstrerror(code), line);
×
3954
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
3955
  pTaskInfo->code = code;
×
3956
  return code;
×
3957
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc