• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4925

12 Jan 2026 09:34AM UTC coverage: 66.107% (+0.8%) from 65.354%
#4925

push

travis-ci

web-flow
merge: from main to 3.0 branch #34248

103 of 129 new or added lines in 9 files covered. (79.84%)

891 existing lines in 139 files now uncovered.

200488 of 303278 relevant lines covered (66.11%)

129810096.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.47
/source/libs/executor/src/dynqueryctrloperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "nodes.h"
19
#include "operator.h"
20
#include "os.h"
21
#include "plannodes.h"
22
#include "query.h"
23
#include "querynodes.h"
24
#include "querytask.h"
25
#include "tarray.h"
26
#include "tcompare.h"
27
#include "tdatablock.h"
28
#include "thash.h"
29
#include "tmsg.h"
30
#include "trpc.h"
31
#include "ttypes.h"
32
#include "tdataformat.h"
33
#include "dynqueryctrl.h"
34

35
int64_t gSessionId = 0;
36

37
void freeVgTableList(void* ptr) { 
×
38
  taosArrayDestroy(*(SArray**)ptr); 
×
39
}
×
40

41
static void destroyStbJoinTableList(SStbJoinTableList* pListHead) {
1,143,362✔
42
  SStbJoinTableList* pNext = NULL;
1,143,362✔
43
  
44
  while (pListHead) {
1,143,856✔
45
    taosMemoryFree(pListHead->pLeftVg);
494✔
46
    taosMemoryFree(pListHead->pLeftUid);
494✔
47
    taosMemoryFree(pListHead->pRightVg);
494✔
48
    taosMemoryFree(pListHead->pRightUid);
494✔
49
    pNext = pListHead->pNext;
494✔
50
    taosMemoryFree(pListHead);
494✔
51
    pListHead = pNext;
494✔
52
  }
53
}
1,143,362✔
54

55
static void destroyStbJoinDynCtrlInfo(SStbJoinDynCtrlInfo* pStbJoin) {
1,143,362✔
56
  qDebug("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64 ", leftCacheNum:%" PRId64 ", rightCacheNum:%" PRId64, 
1,143,362✔
57
         pStbJoin->execInfo.prevBlkNum, pStbJoin->execInfo.prevBlkRows, pStbJoin->execInfo.postBlkNum, 
58
         pStbJoin->execInfo.postBlkRows, pStbJoin->execInfo.leftCacheNum, pStbJoin->execInfo.rightCacheNum);
59

60
  if (pStbJoin->basic.batchFetch) {
1,143,362✔
61
    if (pStbJoin->ctx.prev.leftHash) {
1,142,264✔
62
      tSimpleHashSetFreeFp(pStbJoin->ctx.prev.leftHash, freeVgTableList);
1,069,580✔
63
      tSimpleHashCleanup(pStbJoin->ctx.prev.leftHash);
1,069,580✔
64
    }
65
    if (pStbJoin->ctx.prev.rightHash) {
1,142,264✔
66
      tSimpleHashSetFreeFp(pStbJoin->ctx.prev.rightHash, freeVgTableList);
1,069,580✔
67
      tSimpleHashCleanup(pStbJoin->ctx.prev.rightHash);
1,069,580✔
68
    }
69
  } else {
70
    if (pStbJoin->ctx.prev.leftCache) {
1,098✔
71
      tSimpleHashCleanup(pStbJoin->ctx.prev.leftCache);
1,098✔
72
    }
73
    if (pStbJoin->ctx.prev.rightCache) {
1,098✔
74
      tSimpleHashCleanup(pStbJoin->ctx.prev.rightCache);
1,098✔
75
    }
76
    if (pStbJoin->ctx.prev.onceTable) {
1,098✔
77
      tSimpleHashCleanup(pStbJoin->ctx.prev.onceTable);
1,098✔
78
    }
79
  }
80

81
  destroyStbJoinTableList(pStbJoin->ctx.prev.pListHead);
1,143,362✔
82
}
1,143,362✔
83

84
void destroyColRefInfo(void *info) {
175,275,212✔
85
  SColRefInfo *pColRefInfo = (SColRefInfo *)info;
175,275,212✔
86
  if (pColRefInfo) {
175,275,212✔
87
    taosMemoryFree(pColRefInfo->colName);
175,275,644✔
88
    taosMemoryFree(pColRefInfo->colrefName);
175,275,644✔
89
  }
90
}
175,275,212✔
91

92
void destroyColRefArray(void *info) {
9,573,150✔
93
  SArray *pColRefArray = *(SArray **)info;
9,573,150✔
94
  if (pColRefArray) {
9,573,150✔
95
    taosArrayDestroyEx(pColRefArray, destroyColRefInfo);
9,573,150✔
96
  }
97
}
9,573,150✔
98

99
void freeUseDbOutput(void* pOutput) {
2,204,020✔
100
  SUseDbOutput *pOut = *(SUseDbOutput**)pOutput;
2,204,020✔
101
  if (NULL == pOutput) {
2,204,020✔
102
    return;
×
103
  }
104

105
  if (pOut->dbVgroup) {
2,204,020✔
106
    freeVgInfo(pOut->dbVgroup);
2,204,020✔
107
  }
108
  taosMemFree(pOut);
2,204,020✔
109
}
110

111
void destroyOtbInfoArray(void *info) {
3,752,590✔
112
  SArray *pOtbInfoArray = *(SArray **)info;
3,752,590✔
113
  if (pOtbInfoArray) {
3,752,590✔
114
    taosArrayDestroyEx(pOtbInfoArray, destroySOrgTbInfo);
3,752,590✔
115
  }
116
}
3,752,590✔
117

118
void destroyOtbVgIdToOtbInfoArrayMap(void *info) {
2,863,592✔
119
  SHashObj* pOtbVgIdToOtbInfoArrayMap = *(SHashObj **)info;
2,863,592✔
120
  if (pOtbVgIdToOtbInfoArrayMap) {
2,863,592✔
121
    taosHashSetFreeFp(pOtbVgIdToOtbInfoArrayMap, destroyOtbInfoArray);
2,863,592✔
122
    taosHashCleanup(pOtbVgIdToOtbInfoArrayMap);
2,863,592✔
123
  }
124
}
2,863,592✔
125

126
void destroyTagList(void *info) {
2,153,888✔
127
  SArray *pTagList = *(SArray **)info;
2,153,888✔
128
  if (pTagList) {
2,153,888✔
129
    taosArrayDestroyEx(pTagList, destroyTagVal);
2,153,888✔
130
  }
131
}
2,153,888✔
132

133
void destroyVtbUidTagListMap(void *info) {
810,816✔
134
  SHashObj* pVtbUidTagListMap = *(SHashObj **)info;
810,816✔
135
  if (pVtbUidTagListMap) {
810,816✔
136
    taosHashSetFreeFp(pVtbUidTagListMap, destroyTagList);
810,816✔
137
    taosHashCleanup(pVtbUidTagListMap);
810,816✔
138
  }
139
}
810,816✔
140

141
static void destroyVtbScanDynCtrlInfo(SVtbScanDynCtrlInfo* pVtbScan) {
1,995,853✔
142
  if (pVtbScan->dbName) {
1,995,853✔
143
    taosMemoryFreeClear(pVtbScan->dbName);
1,995,853✔
144
  }
145
  if (pVtbScan->tbName) {
1,995,853✔
146
    taosMemoryFreeClear(pVtbScan->tbName);
1,995,853✔
147
  }
148
  if (pVtbScan->childTableList) {
1,995,853✔
149
    taosArrayDestroyEx(pVtbScan->childTableList, destroyColRefArray);
1,995,853✔
150
  }
151
  if (pVtbScan->colRefInfo) {
1,995,853✔
152
    taosArrayDestroyEx(pVtbScan->colRefInfo, destroyColRefInfo);
×
153
    pVtbScan->colRefInfo = NULL;
×
154
  }
155
  if (pVtbScan->childTableMap) {
1,995,853✔
156
    taosHashCleanup(pVtbScan->childTableMap);
1,964,432✔
157
  }
158
  if (pVtbScan->readColList) {
1,995,853✔
159
    taosArrayDestroy(pVtbScan->readColList);
1,995,853✔
160
  }
161
  if (pVtbScan->dbVgInfoMap) {
1,995,853✔
162
    taosHashSetFreeFp(pVtbScan->dbVgInfoMap, freeUseDbOutput);
1,995,853✔
163
    taosHashCleanup(pVtbScan->dbVgInfoMap);
1,995,853✔
164
  }
165
  if (pVtbScan->otbNameToOtbInfoMap) {
1,995,853✔
166
    taosHashSetFreeFp(pVtbScan->otbNameToOtbInfoMap, destroySOrgTbInfo);
667,357✔
167
    taosHashCleanup(pVtbScan->otbNameToOtbInfoMap);
667,357✔
168
  }
169
  if (pVtbScan->pRsp) {
1,995,853✔
170
    tFreeSUsedbRsp(pVtbScan->pRsp);
×
171
    taosMemoryFreeClear(pVtbScan->pRsp);
×
172
  }
173
  if (pVtbScan->existOrgTbVg) {
1,995,853✔
174
    taosHashCleanup(pVtbScan->existOrgTbVg);
1,995,853✔
175
  }
176
  if (pVtbScan->curOrgTbVg) {
1,995,853✔
177
    taosHashCleanup(pVtbScan->curOrgTbVg);
1,696✔
178
  }
179
  if (pVtbScan->newAddedVgInfo) {
1,995,853✔
180
    taosHashCleanup(pVtbScan->newAddedVgInfo);
836✔
181
  }
182
  if (pVtbScan->otbVgIdToOtbInfoArrayMap) {
1,995,853✔
183
    taosHashSetFreeFp(pVtbScan->otbVgIdToOtbInfoArrayMap, destroyOtbInfoArray);
79,174✔
184
    taosHashCleanup(pVtbScan->otbVgIdToOtbInfoArrayMap);
79,174✔
185
  }
186
  if (pVtbScan->vtbUidToVgIdMapMap) {
1,995,853✔
187
    taosHashSetFreeFp(pVtbScan->vtbUidToVgIdMapMap, destroyOtbVgIdToOtbInfoArrayMap);
241,832✔
188
    taosHashCleanup(pVtbScan->vtbUidToVgIdMapMap);
241,832✔
189
  }
190
  if (pVtbScan->vtbGroupIdToVgIdMapMap) {
1,995,853✔
191
    taosHashSetFreeFp(pVtbScan->vtbGroupIdToVgIdMapMap, destroyOtbVgIdToOtbInfoArrayMap);
345,728✔
192
    taosHashCleanup(pVtbScan->vtbGroupIdToVgIdMapMap);
345,728✔
193
  }
194
  if (pVtbScan->vtbUidTagListMap) {
1,995,853✔
195
    taosHashSetFreeFp(pVtbScan->vtbUidTagListMap, destroyTagList);
241,832✔
196
    taosHashCleanup(pVtbScan->vtbUidTagListMap);
241,832✔
197
  }
198
  if (pVtbScan->vtbGroupIdTagListMap) {
1,995,853✔
199
    taosHashCleanup(pVtbScan->vtbGroupIdTagListMap);
543,488✔
200
  }
201
  if (pVtbScan->vtbUidToGroupIdMap) {
1,995,853✔
202
    taosHashCleanup(pVtbScan->vtbUidToGroupIdMap);
543,488✔
203
  }
204
}
1,995,853✔
205

206
void destroyWinArray(void *info) {
353,358,500✔
207
  SArray *pWinArray = *(SArray **)info;
353,358,500✔
208
  if (pWinArray) {
353,358,500✔
209
    taosArrayDestroy(pWinArray);
353,358,500✔
210
  }
211
}
353,358,500✔
212

213
static void destroyVtbWindowDynCtrlInfo(SVtbWindowDynCtrlInfo* pVtbWindow) {
185,156✔
214
  if (pVtbWindow->pRes) {
185,156✔
215
    blockDataDestroy(pVtbWindow->pRes);
185,156✔
216
  }
217
  if (pVtbWindow->pWins) {
185,156✔
218
    taosArrayDestroyEx(pVtbWindow->pWins, destroyWinArray);
185,156✔
219
  }
220
}
185,156✔
221

222
static void destroyDynQueryCtrlOperator(void* param) {
3,323,636✔
223
  SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param;
3,323,636✔
224

225
  switch (pDyn->qType) {
3,323,636✔
226
    case DYN_QTYPE_STB_HASH:
1,142,627✔
227
      destroyStbJoinDynCtrlInfo(&pDyn->stbJoin);
1,142,627✔
228
      break;
1,142,627✔
229
    case DYN_QTYPE_VTB_AGG:
1,995,853✔
230
    case DYN_QTYPE_VTB_SCAN:
231
      destroyVtbScanDynCtrlInfo(&pDyn->vtbScan);
1,995,853✔
232
      break;
1,995,853✔
233
    case DYN_QTYPE_VTB_WINDOW:
185,156✔
234
      destroyVtbWindowDynCtrlInfo(&pDyn->vtbWindow);
185,156✔
235
      break;
185,156✔
236
    default:
×
237
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
×
238
      break;
×
239
  }
240

241
  taosMemoryFreeClear(param);
3,323,636✔
242
}
3,323,636✔
243

244
static FORCE_INLINE bool tableNeedCache(int64_t uid, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, bool rightTable, bool batchFetch) {
245
  if (batchFetch) {
7,406,050✔
246
    return true;
7,401,658✔
247
  }
248
  
249
  if (rightTable) {
4,392✔
250
    return pPost->rightCurrUid == pPost->rightNextUid;
2,196✔
251
  }
252

253
  uint32_t* num = tSimpleHashGet(pPrev->leftCache, &uid, sizeof(uid));
2,196✔
254

255
  return (NULL == num) ? false : true;
2,196✔
256
}
257

258
static int32_t updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo*          pStbJoin) {
3,703,025✔
259
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
3,703,025✔
260
  SStbJoinPostJoinCtx*       pPost = &pStbJoin->ctx.post;
3,703,025✔
261
  SStbJoinTableList*         pNode = pPrev->pListHead;
3,703,025✔
262
  int32_t*                   leftVgId = pNode->pLeftVg + pNode->readIdx;
3,703,025✔
263
  int32_t*                   rightVgId = pNode->pRightVg + pNode->readIdx;
3,703,025✔
264
  int64_t*                   leftUid = pNode->pLeftUid + pNode->readIdx;
3,703,025✔
265
  int64_t*                   rightUid = pNode->pRightUid + pNode->readIdx;
3,703,025✔
266
  int64_t                    readIdx = pNode->readIdx + 1;
3,703,025✔
267
  int64_t                    rightPrevUid = pPost->rightCurrUid;
3,703,025✔
268

269
  pPost->leftCurrUid = *leftUid;
3,703,025✔
270
  pPost->rightCurrUid = *rightUid;
3,703,025✔
271

272
  pPost->leftVgId = *leftVgId;
3,703,025✔
273
  pPost->rightVgId = *rightVgId;
3,703,025✔
274

275
  while (true) {
276
    if (readIdx < pNode->uidNum) {
3,703,025✔
277
      pPost->rightNextUid = *(pNode->pRightUid + readIdx);
3,629,737✔
278
      break;
3,629,737✔
279
    }
280
    
281
    pNode = pNode->pNext;
73,288✔
282
    if (NULL == pNode) {
73,288✔
283
      pPost->rightNextUid = 0;
73,288✔
284
      break;
73,288✔
285
    }
286
    
287
    rightUid = pNode->pRightUid;
×
288
    readIdx = 0;
×
289
  }
290

291
  pPost->leftNeedCache = tableNeedCache(*leftUid, pPrev, pPost, false, pStbJoin->basic.batchFetch);
7,406,050✔
292
  pPost->rightNeedCache = tableNeedCache(*rightUid, pPrev, pPost, true, pStbJoin->basic.batchFetch);
7,406,050✔
293

294
  if (!pStbJoin->basic.batchFetch && pPost->rightNeedCache && rightPrevUid != pPost->rightCurrUid) {
3,703,025✔
295
    QRY_ERR_RET(tSimpleHashPut(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid), NULL, 0));
×
296
    pStbJoin->execInfo.rightCacheNum++;
×
297
  }  
298

299
  return TSDB_CODE_SUCCESS;
3,703,025✔
300
}
301

302
static int32_t copyOrgTbInfo(SOrgTbInfo* pSrc, SOrgTbInfo** ppDst) {
7,746,048✔
303
  int32_t     code = TSDB_CODE_SUCCESS;
7,746,048✔
304
  int32_t     lino = 0;
7,746,048✔
305
  SOrgTbInfo* pTbInfo = NULL;
7,746,048✔
306

307
  qDebug("start to copy org table info, vgId:%d, tbName:%s", pSrc->vgId, pSrc->tbName);
7,746,048✔
308

309
  pTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
7,746,048✔
310
  QUERY_CHECK_NULL(pTbInfo, code, lino, _return, terrno)
7,746,048✔
311

312
  pTbInfo->vgId = pSrc->vgId;
7,746,048✔
313
  tstrncpy(pTbInfo->tbName, pSrc->tbName, TSDB_TABLE_FNAME_LEN);
7,746,048✔
314

315
  pTbInfo->colMap = taosArrayDup(pSrc->colMap, NULL);
7,746,048✔
316
  QUERY_CHECK_NULL(pTbInfo->colMap, code, lino, _return, terrno)
7,746,048✔
317

318
  *ppDst = pTbInfo;
7,746,048✔
319

320
  return code;
7,746,048✔
321
_return:
×
322
  qError("failed to copy org table info, code:%d, line:%d", code, lino);
×
323
  if (pTbInfo) {
×
324
    if (pTbInfo->colMap) {
×
325
      taosArrayDestroy(pTbInfo->colMap);
×
326
    }
327
    taosMemoryFreeClear(pTbInfo);
×
328
  }
329
  return code;
×
330
}
331

332
static int32_t buildTagListForExchangeBasicParam(SExchangeOperatorBasicParam* pBasic, SArray* pTagList) {
1,875,056✔
333
  int32_t  code = TSDB_CODE_SUCCESS;
1,875,056✔
334
  int32_t  lino = 0;
1,875,056✔
335
  STagVal  tmpTag;
1,875,056✔
336

337
  pBasic->tagList = taosArrayInit(1, sizeof(STagVal));
1,875,056✔
338
  QUERY_CHECK_NULL(pBasic->tagList, code, lino, _return, terrno)
1,875,056✔
339

340
  for (int32_t i = 0; i < taosArrayGetSize(pTagList); ++i) {
12,674,860✔
341
    STagVal* pSrcTag = (STagVal*)taosArrayGet(pTagList, i);
10,799,804✔
342
    QUERY_CHECK_NULL(pSrcTag, code, lino, _return, terrno)
10,799,804✔
343
    tmpTag.type = pSrcTag->type;
10,799,804✔
344
    tmpTag.cid = pSrcTag->cid;
10,799,804✔
345
    if (IS_VAR_DATA_TYPE(pSrcTag->type)) {
10,799,804✔
346
      tmpTag.nData = pSrcTag->nData;
4,737,192✔
347
      tmpTag.pData = taosMemoryMalloc(tmpTag.nData);
4,737,192✔
348
      QUERY_CHECK_NULL(tmpTag.pData, code, lino, _return, terrno)
4,737,192✔
349
      memcpy(tmpTag.pData, pSrcTag->pData, tmpTag.nData);
4,737,192✔
350
    } else {
351
      tmpTag.i64 = pSrcTag->i64;
6,062,612✔
352
    }
353

354
    QUERY_CHECK_NULL(taosArrayPush(pBasic->tagList, &tmpTag), code, lino, _return, terrno)
21,599,608✔
355
    tmpTag = (STagVal){0};
10,799,804✔
356
  }
357

358
  return code;
1,875,056✔
359
_return:
×
360
  if (pBasic->tagList) {
×
361
    taosArrayDestroyEx(pBasic->tagList, destroyTagVal);
×
362
    pBasic->tagList = NULL;
×
363
  }
364
  if (tmpTag.pData) {
×
365
    taosMemoryFree(tmpTag.pData);
×
366
  }
367
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
×
368
  return code;
×
369
}
370

371
static int32_t buildBatchOrgTbInfoForExchangeBasicParam(SExchangeOperatorBasicParam* pBasic, SArray* pOrgTbInfoArray) {
3,852,439✔
372
  int32_t     code = TSDB_CODE_SUCCESS;
3,852,439✔
373
  int32_t     lino = 0;
3,852,439✔
374
  SOrgTbInfo  batchInfo;
3,852,439✔
375

376
  pBasic->batchOrgTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
3,852,439✔
377
  QUERY_CHECK_NULL(pBasic->batchOrgTbInfo, code, lino, _return, terrno)
3,852,439✔
378

379
  for (int32_t i = 0; i < taosArrayGetSize(pOrgTbInfoArray); ++i) {
9,907,241✔
380
    SOrgTbInfo* pSrc = (SOrgTbInfo*)taosArrayGet(pOrgTbInfoArray, i);
6,054,802✔
381
    QUERY_CHECK_NULL(pSrc, code, lino, _return, terrno)
6,054,802✔
382
    batchInfo.vgId = pSrc->vgId;
6,054,802✔
383
    tstrncpy(batchInfo.tbName, pSrc->tbName, TSDB_TABLE_FNAME_LEN);
6,054,802✔
384
    batchInfo.colMap = taosArrayDup(pSrc->colMap, NULL);
6,054,802✔
385
    QUERY_CHECK_NULL(batchInfo.colMap, code, lino, _return, terrno)
6,054,802✔
386
    QUERY_CHECK_NULL(taosArrayPush(pBasic->batchOrgTbInfo, &batchInfo), code, lino, _return, terrno)
12,109,604✔
387
    batchInfo = (SOrgTbInfo){0};
6,054,802✔
388
  }
389

390
  return code;
3,852,439✔
391
_return:
×
392
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
×
393
  if (pBasic->batchOrgTbInfo) {
×
394
    taosArrayDestroyEx(pBasic->batchOrgTbInfo, destroySOrgTbInfo);
×
395
    pBasic->batchOrgTbInfo = NULL;
×
396
  }
397
  if (batchInfo.colMap) {
×
398
    taosArrayDestroy(batchInfo.colMap);
×
399
    batchInfo.colMap = NULL;
×
400
  }
401
  return code;
×
402
}
403

404
static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, bool needCache, SOperatorParam* pChild) {
7,406,050✔
405
  int32_t code = TSDB_CODE_SUCCESS;
7,406,050✔
406
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
7,406,050✔
407
  if (NULL == *ppRes) {
7,406,050✔
408
    code = terrno;
×
409
    freeOperatorParam(pChild, OP_GET_PARAM);
×
410
    return code;
×
411
  }
412
  if (pChild) {
7,406,050✔
413
    (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
149,760✔
414
    if (NULL == (*ppRes)->pChildren) {
149,760✔
415
      code = terrno;
×
416
      freeOperatorParam(pChild, OP_GET_PARAM);
×
417
      freeOperatorParam(*ppRes, OP_GET_PARAM);
×
418
      *ppRes = NULL;
×
419
      return code;
×
420
    }
421
    if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild)) {
299,520✔
422
      code = terrno;
×
423
      freeOperatorParam(pChild, OP_GET_PARAM);
×
424
      freeOperatorParam(*ppRes, OP_GET_PARAM);
×
425
      *ppRes = NULL;
×
426
      return code;
×
427
    }
428
  } else {
429
    (*ppRes)->pChildren = NULL;
7,256,290✔
430
  }
431

432
  SGcOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcOperatorParam));
7,406,050✔
433
  if (NULL == pGc) {
7,406,050✔
434
    code = terrno;
×
435
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
436
    *ppRes = NULL;
×
437
    return code;
×
438
  }
439

440
  pGc->sessionId = atomic_add_fetch_64(&gSessionId, 1);
7,406,050✔
441
  pGc->downstreamIdx = downstreamIdx;
7,406,050✔
442
  pGc->vgId = vgId;
7,406,050✔
443
  pGc->tbUid = tbUid;
7,406,050✔
444
  pGc->needCache = needCache;
7,406,050✔
445

446
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
7,406,050✔
447
  (*ppRes)->downstreamIdx = downstreamIdx;
7,406,050✔
448
  (*ppRes)->value = pGc;
7,406,050✔
449
  (*ppRes)->reUse = false;
7,406,050✔
450

451
  return TSDB_CODE_SUCCESS;
7,406,050✔
452
}
453

454

455
static int32_t buildGroupCacheNotifyOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid) {
×
456
  int32_t code = TSDB_CODE_SUCCESS;
×
457
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
×
458
  if (NULL == *ppRes) {
×
459
    return terrno;
×
460
  }
461
  (*ppRes)->pChildren = NULL;
×
462

463
  SGcNotifyOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcNotifyOperatorParam));
×
464
  if (NULL == pGc) {
×
465
    code = terrno;
×
466
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
×
467
    return code;
×
468
  }
469

470
  pGc->downstreamIdx = downstreamIdx;
×
471
  pGc->vgId = vgId;
×
472
  pGc->tbUid = tbUid;
×
473

474
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
×
475
  (*ppRes)->downstreamIdx = downstreamIdx;
×
476
  (*ppRes)->value = pGc;
×
477
  (*ppRes)->reUse = false;
×
478

479
  return TSDB_CODE_SUCCESS;
×
480
}
481

482
static int32_t buildExchangeOperatorBasicParam(SExchangeOperatorBasicParam* pBasic, ENodeType srcOpType,
58,937,128✔
483
                                               EExchangeSourceType exchangeType, int32_t vgId, uint64_t groupId,
484
                                               SArray* pUidList, SOrgTbInfo* pOrgTbInfo, SArray* pTagList,
485
                                               SArray* pOrgTbInfoArray, STimeWindow window,
486
                                               SDownstreamSourceNode* pDownstreamSourceNode,
487
                                               bool tableSeq, bool isNewParam, bool isNewDeployed) {
488
  int32_t code = TSDB_CODE_SUCCESS;
58,937,128✔
489
  int32_t lino = 0;
58,937,128✔
490

491
  qDebug("buildExchangeOperatorBasicParam, srcOpType:%d, exchangeType:%d, vgId:%d, groupId:%" PRIu64 ", tableSeq:%d, "
58,937,128✔
492
         "isNewParam:%d, isNewDeployed:%d", srcOpType, exchangeType, vgId, groupId, tableSeq, isNewParam, isNewDeployed);
493

494
  pBasic->srcOpType = srcOpType;
58,937,128✔
495
  pBasic->vgId = vgId;
58,937,128✔
496
  pBasic->groupid = groupId;
58,937,128✔
497
  pBasic->window = window;
58,937,128✔
498
  pBasic->tableSeq = tableSeq;
58,937,128✔
499
  pBasic->type = exchangeType;
58,937,128✔
500
  pBasic->isNewParam = isNewParam;
58,937,128✔
501

502
  if (pDownstreamSourceNode) {
58,937,128✔
503
    pBasic->isNewDeployed = true;
2,299✔
504
    pBasic->newDeployedSrc.type = QUERY_NODE_DOWNSTREAM_SOURCE;
2,299✔
505
    pBasic->newDeployedSrc.clientId = pDownstreamSourceNode->clientId;// current task's taskid
2,299✔
506
    pBasic->newDeployedSrc.taskId = pDownstreamSourceNode->taskId;
2,299✔
507
    pBasic->newDeployedSrc.fetchMsgType = TDMT_STREAM_FETCH;
2,299✔
508
    pBasic->newDeployedSrc.localExec = false;
2,299✔
509
    pBasic->newDeployedSrc.addr.nodeId = pDownstreamSourceNode->addr.nodeId;
2,299✔
510
    memcpy(&pBasic->newDeployedSrc.addr.epSet, &pDownstreamSourceNode->addr.epSet, sizeof(SEpSet));
2,299✔
511
  } else {
512
    pBasic->isNewDeployed = false;
58,934,829✔
513
    pBasic->newDeployedSrc = (SDownstreamSourceNode){0};
58,934,829✔
514
  }
515

516
  if (pUidList) {
58,937,128✔
517
    pBasic->uidList = taosArrayDup(pUidList, NULL);
5,261,985✔
518
    QUERY_CHECK_NULL(pBasic->uidList, code, lino, _return, terrno)
5,261,985✔
519
  } else {
520
    pBasic->uidList = taosArrayInit(1, sizeof(int64_t));
53,675,143✔
521
    QUERY_CHECK_NULL(pBasic->uidList, code, lino, _return, terrno)
53,675,143✔
522
  }
523

524
  if (pOrgTbInfo) {
58,937,128✔
525
    code = copyOrgTbInfo(pOrgTbInfo, &pBasic->orgTbInfo);
7,746,048✔
526
    QUERY_CHECK_CODE(code, lino, _return);
7,746,048✔
527
  } else {
528
    pBasic->orgTbInfo = NULL;
51,191,080✔
529
  }
530

531
  if (pTagList) {
58,937,128✔
532
    code = buildTagListForExchangeBasicParam(pBasic, pTagList);
1,875,056✔
533
    QUERY_CHECK_CODE(code, lino, _return);
1,875,056✔
534
  } else {
535
    pBasic->tagList = NULL;
57,062,072✔
536
  }
537

538
  if (pOrgTbInfoArray) {
58,937,128✔
539
    code = buildBatchOrgTbInfoForExchangeBasicParam(pBasic, pOrgTbInfoArray);
3,852,439✔
540
    QUERY_CHECK_CODE(code, lino, _return);
3,852,439✔
541
  } else {
542
    pBasic->batchOrgTbInfo = NULL;
55,084,689✔
543
  }
544
  return code;
58,937,128✔
545

546
_return:
×
547
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
×
548
  freeExchangeGetBasicOperatorParam(pBasic);
×
549
  return code;
×
550
}
551

552
static int32_t buildExchangeOperatorParamImpl(SOperatorParam** ppRes, int32_t downstreamIdx, ENodeType srcOpType,
54,851,173✔
553
                                              EExchangeSourceType exchangeType, int32_t vgId, uint64_t groupId,
554
                                              SArray* pUidList, SOrgTbInfo* pOrgTbInfo, SArray* pTagList,
555
                                              SArray* pOrgTbInfoArray, STimeWindow window,
556
                                              SDownstreamSourceNode* pDownstreamSourceNode,
557
                                              bool tableSeq, bool isNewParam, bool reUse, bool isNewDeployed) {
558

559
  int32_t                      code = TSDB_CODE_SUCCESS;
54,851,173✔
560
  int32_t                      lino = 0;
54,851,173✔
561
  SOperatorParam*              pParam = NULL;
54,851,173✔
562
  SExchangeOperatorParam*      pExc = NULL;
54,851,173✔
563

564
  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
54,851,173✔
565
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)
54,851,173✔
566

567
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
54,851,173✔
568
  pParam->downstreamIdx = downstreamIdx;
54,851,173✔
569
  pParam->reUse = reUse;
54,851,173✔
570
  pParam->pChildren = NULL;
54,851,173✔
571
  pParam->value = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
54,851,173✔
572
  QUERY_CHECK_NULL(pParam->value, code, lino, _return, terrno)
54,851,173✔
573

574
  pExc = (SExchangeOperatorParam*)pParam->value;
54,851,173✔
575
  pExc->multiParams = false;
54,851,173✔
576

577
  code = buildExchangeOperatorBasicParam(&pExc->basic, srcOpType, exchangeType, vgId, groupId,
54,851,173✔
578
                                         pUidList, pOrgTbInfo, pTagList, pOrgTbInfoArray,
579
                                         window, pDownstreamSourceNode, tableSeq, isNewParam, isNewDeployed);
580

581
  *ppRes = pParam;
54,851,173✔
582
  return code;
54,851,173✔
583
_return:
×
584
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
×
585
  if (pParam) {
×
586
    freeOperatorParam(pParam, OP_GET_PARAM);
×
587
  }
588
  return code;
×
589
}
590

591
static int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, const int32_t* pVgId, int64_t* pUid) {
4,392✔
592
  int32_t code = TSDB_CODE_SUCCESS;
4,392✔
593
  int32_t lino = 0;
4,392✔
594

595
  SArray* pUidList = taosArrayInit(1, sizeof(int64_t));
4,392✔
596
  QUERY_CHECK_NULL(pUidList, code, lino, _return, terrno)
4,392✔
597

598
  QUERY_CHECK_NULL(taosArrayPush(pUidList, pUid), code, lino, _return, terrno);
4,392✔
599

600
  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, EX_SRC_TYPE_STB_JOIN_SCAN,
4,392✔
601
                                        *pVgId, 0, pUidList, NULL, NULL, NULL, (STimeWindow){0}, NULL, true, false, false, false);
4,392✔
602
  QUERY_CHECK_CODE(code, lino, _return);
4,392✔
603

604
_return:
4,392✔
605
  if (code) {
4,392✔
606
    qError("failed to build exchange operator param, code:%d", code);
×
607
  }
608
  taosArrayDestroy(pUidList);
4,392✔
609
  return code;
4,392✔
610
}
611

612
static int32_t buildExchangeOperatorParamForExternalWindow(SOperatorParam** ppRes, int32_t downstreamIdx, STimeWindow win) {
42,076,656✔
613
  int32_t                   code = TSDB_CODE_SUCCESS;
42,076,656✔
614
  int32_t                   lino = 0;
42,076,656✔
615

616
  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, EX_SRC_TYPE_VSTB_WIN_SCAN,
42,076,656✔
617
                                        0, 0, NULL, NULL, NULL, NULL, win, NULL, true, true, true, false);
618
  QUERY_CHECK_CODE(code, lino, _return);
42,076,656✔
619

620
  return code;
42,076,656✔
621
_return:
×
622
  qError("failed to build exchange operator param for external window, code:%d", code);
×
623
  return code;
×
624
}
625

626
static int32_t buildExchangeOperatorParamForVTagScan(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, tb_uid_t uid) {
5,024,077✔
627
  int32_t                      code = TSDB_CODE_SUCCESS;
5,024,077✔
628
  int32_t                      lino = 0;
5,024,077✔
629
  SArray*                      pUidList = NULL;
5,024,077✔
630

631
  pUidList = taosArrayInit(1, sizeof(int64_t));
5,024,077✔
632
  QUERY_CHECK_NULL(pUidList, code, lino, _return, terrno)
5,024,077✔
633

634
  QUERY_CHECK_NULL(taosArrayPush(pUidList, &uid), code, lino, _return, terrno)
5,024,077✔
635

636
  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, EX_SRC_TYPE_VSTB_TAG_SCAN,
5,024,077✔
637
                                        vgId, 0, pUidList, NULL, NULL, NULL, (STimeWindow){0}, NULL, false, false, true, false);
5,024,077✔
638
  QUERY_CHECK_CODE(code, lino, _return);
5,024,077✔
639

640
_return:
5,024,077✔
641
  if (code) {
5,024,077✔
642
    qError("failed to build exchange operator param for tag scan, code:%d", code);
×
643
  }
644
  taosArrayDestroy(pUidList);
5,024,077✔
645
  return code;
5,024,077✔
646
}
647

648
static int32_t buildExchangeOperatorParamForVScan(SOperatorParam** ppRes, int32_t downstreamIdx, SOrgTbInfo* pOrgTbInfo,
7,746,048✔
649
                                                  SDownstreamSourceNode* pNewSource) {
650
  int32_t                      code = TSDB_CODE_SUCCESS;
7,746,048✔
651
  int32_t                      lino = 0;
7,746,048✔
652

653
  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, EX_SRC_TYPE_VSTB_SCAN,
7,746,048✔
654
                                        pOrgTbInfo->vgId, 0, NULL, pOrgTbInfo, NULL, NULL, (STimeWindow){0}, pNewSource, false, true, true, true);
7,746,048✔
655
  QUERY_CHECK_CODE(code, lino, _return);
7,746,048✔
656

657
  return code;
7,746,048✔
658
_return:
×
659
  qError("failed to build exchange operator param for vscan, code:%d", code);
×
660
  return code;
×
661
}
662

663
static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
134,099✔
664
  int32_t                       code = TSDB_CODE_SUCCESS;
134,099✔
665
  int32_t                       line = 0;
134,099✔
666
  SOperatorParam*               pParam = NULL;
134,099✔
667
  SExchangeOperatorBatchParam*  pExc = NULL;
134,099✔
668
  SExchangeOperatorBasicParam   basic = {0};
134,099✔
669

670
  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
134,099✔
671
  QUERY_CHECK_NULL(pParam, code, line, _return, terrno);
134,099✔
672

673
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
134,099✔
674
  pParam->downstreamIdx = downstreamIdx;
134,099✔
675
  pParam->reUse = false;
134,099✔
676
  pParam->pChildren = NULL;
134,099✔
677
  pParam->value = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
134,099✔
678
  QUERY_CHECK_NULL(pParam->value, code, line, _return, terrno);
134,099✔
679

680
  pExc = pParam->value;
134,099✔
681
  pExc->multiParams = true;
134,099✔
682
  pExc->pBatchs = tSimpleHashInit(tSimpleHashGetSize(pVg), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
134,099✔
683
  QUERY_CHECK_NULL(pExc->pBatchs, code, line, _return, terrno)
134,099✔
684

685
  tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);
134,099✔
686

687
  int32_t iter = 0;
134,099✔
688
  void*   p = NULL;
134,099✔
689
  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
367,615✔
690
    int32_t* pVgId = tSimpleHashGetKey(p, NULL);
233,516✔
691
    SArray*  pUidList = *(SArray**)p;
233,516✔
692

693
    code = buildExchangeOperatorBasicParam(&basic, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
233,516✔
694
                                           EX_SRC_TYPE_STB_JOIN_SCAN, *pVgId, 0,
695
                                           pUidList, NULL, NULL, NULL,
696
                                           (STimeWindow){0}, NULL, false, false, false);
233,516✔
697
    QUERY_CHECK_CODE(code, line, _return);
233,516✔
698

699
    // already transferred to batch param, can free here
700
    taosArrayDestroy(pUidList);
233,516✔
701

702
    QRY_ERR_RET(tSimpleHashPut(pExc->pBatchs, pVgId, sizeof(*pVgId), &basic, sizeof(basic)));
233,516✔
703

704
    basic = (SExchangeOperatorBasicParam){0};
233,516✔
705
    qTrace("build downstreamIdx %d batch scan, vgId:%d, uidNum:%" PRId64, downstreamIdx, *pVgId, (int64_t)taosArrayGetSize(pUidList));
233,516✔
706
    *(SArray**)p = NULL;
233,516✔
707
  }
708
  *ppRes = pParam;
134,099✔
709

710
  return code;
134,099✔
711
  
712
_return:
×
713
  qError("failed to build batch exchange operator param, code:%d", code);
×
714
  freeOperatorParam(pParam, OP_GET_PARAM);
×
715
  freeExchangeGetBasicOperatorParam(&basic);
×
716
  return code;
×
717
}
718

719
static int32_t buildBatchExchangeOperatorParamForVSAgg(SOperatorParam** ppRes, int32_t downstreamIdx, SArray* pTagList, uint64_t groupid,  SHashObj* pBatchMaps) {
3,615,622✔
720
  int32_t                       code = TSDB_CODE_SUCCESS;
3,615,622✔
721
  int32_t                       lino = 0;
3,615,622✔
722
  SOperatorParam*               pParam = NULL;
3,615,622✔
723
  SExchangeOperatorBatchParam*  pExc = NULL;
3,615,622✔
724
  SExchangeOperatorBasicParam   basic = {0};
3,615,622✔
725

726
  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
3,615,622✔
727
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)
3,615,622✔
728

729
  pParam->value = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
3,615,622✔
730
  QUERY_CHECK_NULL(pParam->value, code, lino, _return, terrno)
3,615,622✔
731

732
  pExc = pParam->value;
3,615,622✔
733
  pExc->multiParams = true;
3,615,622✔
734

735
  pExc->pBatchs = tSimpleHashInit(taosHashGetSize(pBatchMaps), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
3,615,622✔
736
  QUERY_CHECK_NULL(pExc->pBatchs, code, lino, _return, terrno)
3,615,622✔
737
  tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);
3,615,622✔
738

739
  size_t keyLen = 0;
3,615,622✔
740
  void*  pIter = taosHashIterate(pBatchMaps, NULL);
3,615,622✔
741
  while (pIter != NULL) {
7,468,061✔
742
    SArray*          pOrgTbInfoArray = *(SArray**)pIter;
3,852,439✔
743
    int32_t*         vgId = (int32_t*)taosHashGetKey(pIter, &keyLen);
3,852,439✔
744
    STimeWindow      win = {.skey = INT64_MAX, .ekey = INT64_MIN};
3,852,439✔
745

746
    code = buildExchangeOperatorBasicParam(&basic, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
3,852,439✔
747
                                           EX_SRC_TYPE_VSTB_AGG_SCAN, *vgId, groupid,
748
                                           NULL, NULL, pTagList, pOrgTbInfoArray,
749
                                           win, NULL, false, true, false);
750
    QUERY_CHECK_CODE(code, lino, _return);
3,852,439✔
751

752
    code = tSimpleHashPut(pExc->pBatchs, vgId, sizeof(*vgId), &basic, sizeof(basic));
3,852,439✔
753
    QUERY_CHECK_CODE(code, lino, _return);
3,852,439✔
754

755
    basic = (SExchangeOperatorBasicParam){0};
3,852,439✔
756
    pIter = taosHashIterate(pBatchMaps, pIter);
3,852,439✔
757
  }
758

759
  pParam->pChildren = NULL;
3,615,622✔
760
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
3,615,622✔
761
  pParam->downstreamIdx = downstreamIdx;
3,615,622✔
762
  pParam->reUse = false;
3,615,622✔
763

764
  *ppRes = pParam;
3,615,622✔
765
  return code;
3,615,622✔
766

767
_return:
×
768
  qError("failed to build exchange operator param for vscan, code:%d", code);
×
769
  freeOperatorParam(pParam, OP_GET_PARAM);
×
770
  freeExchangeGetBasicOperatorParam(&basic);
×
771
  return code;
×
772
}
773

774
static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initParam, SOperatorParam** ppChild0, SOperatorParam** ppChild1) {
3,703,025✔
775
  int32_t code = TSDB_CODE_SUCCESS;
3,703,025✔
776
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
3,703,025✔
777
  if (NULL == *ppRes) {
3,703,025✔
778
    code = terrno;
×
779
    return code;
×
780
  }
781
  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
3,703,025✔
782
  if (NULL == (*ppRes)->pChildren) {
3,703,025✔
783
    code = terrno;
×
784
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
785
    *ppRes = NULL;
×
786
    return code;
×
787
  }
788
  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild0)) {
7,406,050✔
789
    code = terrno;
×
790
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
791
    *ppRes = NULL;
×
792
    return code;
×
793
  }
794
  *ppChild0 = NULL;
3,703,025✔
795
  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild1)) {
7,406,050✔
796
    code = terrno;
×
797
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
798
    *ppRes = NULL;
×
799
    return code;
×
800
  }
801
  *ppChild1 = NULL;
3,703,025✔
802
  
803
  SSortMergeJoinOperatorParam* pJoin = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam));
3,703,025✔
804
  if (NULL == pJoin) {
3,703,025✔
805
    code = terrno;
×
806
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
807
    *ppRes = NULL;
×
808
    return code;
×
809
  }
810

811
  pJoin->initDownstream = initParam;
3,703,025✔
812
  
813
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
3,703,025✔
814
  (*ppRes)->value = pJoin;
3,703,025✔
815
  (*ppRes)->reUse = false;
3,703,025✔
816

817
  return TSDB_CODE_SUCCESS;
3,703,025✔
818
}
819

820
static int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) {
×
821
  int32_t code = TSDB_CODE_SUCCESS;
×
822
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
×
823
  if (NULL == *ppRes) {
×
824
    code = terrno;
×
825
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
×
826
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
827
    return code;
×
828
  }
829
  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
×
830
  if (NULL == *ppRes) {
×
831
    code = terrno;
×
832
    taosMemoryFreeClear(*ppRes);
×
833
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
×
834
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
835
    return code;
×
836
  }
837
  if (pChild0 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) {
×
838
    code = terrno;
×
839
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
×
840
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
×
841
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
842
    *ppRes = NULL;
×
843
    return code;
×
844
  }
845
  if (pChild1 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) {
×
846
    code = terrno;
×
847
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
×
848
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
849
    *ppRes = NULL;
×
850
    return code;
×
851
  }
852
  
853
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
×
854
  (*ppRes)->value = NULL;
×
855
  (*ppRes)->reUse = false;
×
856

857
  return TSDB_CODE_SUCCESS;
×
858
}
859

860
static int32_t buildBatchTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
11,269✔
861
  int32_t code = TSDB_CODE_SUCCESS;
11,269✔
862
  int32_t vgNum = tSimpleHashGetSize(pVg);
11,269✔
863
  if (vgNum <= 0 || vgNum > 1) {
11,269✔
864
    qError("Invalid vgroup num %d to build table scan operator param", vgNum);
×
865
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
866
  }
867

868
  int32_t iter = 0;
11,269✔
869
  void* p = NULL;
11,269✔
870
  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
22,538✔
871
    SArray* pUidList = *(SArray**)p;
11,269✔
872

873
    code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false);
11,269✔
874
    if (code) {
11,269✔
875
      return code;
×
876
    }
877
    taosArrayDestroy(pUidList);
11,269✔
878
    *(SArray**)p = NULL;
11,269✔
879
  }
880
  
881
  return TSDB_CODE_SUCCESS;
11,269✔
882
}
883

884
static int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) {
×
885
  SArray* pUidList = taosArrayInit(1, sizeof(int64_t));
×
886
  if (NULL == pUidList) {
×
887
    return terrno;
×
888
  }
889
  if (NULL == taosArrayPush(pUidList, pUid)) {
×
890
    return terrno;
×
891
  }
892

893
  int32_t code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, true);
×
894
  taosArrayDestroy(pUidList);
×
895
  if (code) {
×
896
    return code;
×
897
  }
898
  
899
  return TSDB_CODE_SUCCESS;
×
900
}
901

902
static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, SOperatorParam** ppParam) {
3,703,025✔
903
  int64_t                     rowIdx = pPrev->pListHead->readIdx;
3,703,025✔
904
  SOperatorParam*             pSrcParam0 = NULL;
3,703,025✔
905
  SOperatorParam*             pSrcParam1 = NULL;
3,703,025✔
906
  SOperatorParam*             pGcParam0 = NULL;
3,703,025✔
907
  SOperatorParam*             pGcParam1 = NULL;  
3,703,025✔
908
  int32_t*                    leftVg = pPrev->pListHead->pLeftVg + rowIdx;
3,703,025✔
909
  int64_t*                    leftUid = pPrev->pListHead->pLeftUid + rowIdx;
3,703,025✔
910
  int32_t*                    rightVg = pPrev->pListHead->pRightVg + rowIdx;
3,703,025✔
911
  int64_t*                    rightUid = pPrev->pListHead->pRightUid + rowIdx;
3,703,025✔
912
  int32_t                     code = TSDB_CODE_SUCCESS;
3,703,025✔
913

914
  qDebug("start %" PRId64 ":%" PRId64 "th stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64, 
3,703,025✔
915
      rowIdx, pPrev->tableNum, *leftVg, *leftUid, *rightVg, *rightUid);
916

917
  QRY_ERR_RET(updatePostJoinCurrTableInfo(&pInfo->stbJoin));
3,703,025✔
918
  
919
  if (pInfo->stbJoin.basic.batchFetch) {
3,703,025✔
920
    if (pPrev->leftHash) {
3,700,829✔
921
      code = pInfo->stbJoin.basic.srcScan[0] ? buildBatchTableScanOperatorParam(&pSrcParam0, 0, pPrev->leftHash) : buildBatchExchangeOperatorParam(&pSrcParam0, 0, pPrev->leftHash);
72,684✔
922
      if (TSDB_CODE_SUCCESS == code) {
72,684✔
923
        code = pInfo->stbJoin.basic.srcScan[1] ? buildBatchTableScanOperatorParam(&pSrcParam1, 1, pPrev->rightHash) : buildBatchExchangeOperatorParam(&pSrcParam1, 1, pPrev->rightHash);
72,684✔
924
      }
925
      if (TSDB_CODE_SUCCESS == code) {
72,684✔
926
        tSimpleHashCleanup(pPrev->leftHash);
72,684✔
927
        tSimpleHashCleanup(pPrev->rightHash);
72,684✔
928
        pPrev->leftHash = NULL;
72,684✔
929
        pPrev->rightHash = NULL;
72,684✔
930
      }
931
    }
932
  } else {
933
    code = pInfo->stbJoin.basic.srcScan[0] ? buildSingleTableScanOperatorParam(&pSrcParam0, 0, leftVg, leftUid) : buildExchangeOperatorParam(&pSrcParam0, 0, leftVg, leftUid);
2,196✔
934
    if (TSDB_CODE_SUCCESS == code) {
2,196✔
935
      code = pInfo->stbJoin.basic.srcScan[1] ? buildSingleTableScanOperatorParam(&pSrcParam1, 1, rightVg, rightUid) : buildExchangeOperatorParam(&pSrcParam1, 1, rightVg, rightUid);
2,196✔
936
    }
937
  }
938

939
  bool initParam = pSrcParam0 ? true : false;
3,703,025✔
940
  if (TSDB_CODE_SUCCESS == code) {
3,703,025✔
941
    code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pPost->leftNeedCache, pSrcParam0);
3,703,025✔
942
    pSrcParam0 = NULL;
3,703,025✔
943
  }
944
  if (TSDB_CODE_SUCCESS == code) {
3,703,025✔
945
    code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pPost->rightNeedCache, pSrcParam1);
3,703,025✔
946
    pSrcParam1 = NULL;
3,703,025✔
947
  }
948
  if (TSDB_CODE_SUCCESS == code) {
3,703,025✔
949
    code = buildMergeJoinOperatorParam(ppParam, initParam, &pGcParam0, &pGcParam1);
3,703,025✔
950
  }
951
  if (TSDB_CODE_SUCCESS != code) {
3,703,025✔
952
    if (pSrcParam0) {
×
953
      freeOperatorParam(pSrcParam0, OP_GET_PARAM);
×
954
    }
955
    if (pSrcParam1) {
×
956
      freeOperatorParam(pSrcParam1, OP_GET_PARAM);
×
957
    }
958
    if (pGcParam0) {
×
959
      freeOperatorParam(pGcParam0, OP_GET_PARAM);
×
960
    }
961
    if (pGcParam1) {
×
962
      freeOperatorParam(pGcParam1, OP_GET_PARAM);
×
963
    }
964
    if (*ppParam) {
×
965
      freeOperatorParam(*ppParam, OP_GET_PARAM);
×
966
      *ppParam = NULL;
×
967
    }
968
  }
969
  
970
  return code;
3,703,025✔
971
}
972

973
static int32_t buildVtbScanOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, uint64_t uid) {
5,024,077✔
974
  int32_t                   code = TSDB_CODE_SUCCESS;
5,024,077✔
975
  int32_t                   lino = 0;
5,024,077✔
976
  SVTableScanOperatorParam* pVScan = NULL;
5,024,077✔
977
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
5,024,077✔
978
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
5,024,077✔
979

980
  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
5,024,077✔
981
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)
5,024,077✔
982

983
  pVScan = taosMemoryMalloc(sizeof(SVTableScanOperatorParam));
5,024,077✔
984
  QUERY_CHECK_NULL(pVScan, code, lino, _return, terrno)
5,024,077✔
985
  pVScan->pOpParamArray = taosArrayInit(1, POINTER_BYTES);
5,024,077✔
986
  QUERY_CHECK_NULL(pVScan->pOpParamArray, code, lino, _return, terrno)
5,024,077✔
987
  pVScan->uid = uid;
5,024,077✔
988
  pVScan->window = pInfo->vtbScan.window;
5,024,077✔
989

990
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN;
5,024,077✔
991
  (*ppRes)->downstreamIdx = 0;
5,024,077✔
992
  (*ppRes)->value = pVScan;
5,024,077✔
993
  (*ppRes)->reUse = false;
5,024,077✔
994

995
  return TSDB_CODE_SUCCESS;
5,024,077✔
996
_return:
×
997
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
998
  if (pVScan) {
×
999
    taosArrayDestroy(pVScan->pOpParamArray);
×
1000
    taosMemoryFreeClear(pVScan);
×
1001
  }
1002
  if (*ppRes) {
×
1003
    taosArrayDestroy((*ppRes)->pChildren);
×
1004
    taosMemoryFreeClear(*ppRes);
×
1005
  }
1006
  return code;
×
1007
}
1008

1009
static int32_t buildExternalWindowOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins, int32_t idx) {
42,076,656✔
1010
  int32_t                       code = TSDB_CODE_SUCCESS;
42,076,656✔
1011
  int32_t                       lino = 0;
42,076,656✔
1012
  SExternalWindowOperatorParam* pExtWinOp = NULL;
42,076,656✔
1013

1014
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
42,076,656✔
1015
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
42,076,656✔
1016

1017
  pExtWinOp = taosMemoryMalloc(sizeof(SExternalWindowOperatorParam));
42,076,656✔
1018
  QUERY_CHECK_NULL(pExtWinOp, code, lino, _return, terrno)
42,076,656✔
1019

1020
  pExtWinOp->ExtWins = taosArrayDup(pWins, NULL);
42,076,656✔
1021
  QUERY_CHECK_NULL(pExtWinOp->ExtWins, code, lino, _return, terrno)
42,076,656✔
1022

1023
  SExtWinTimeWindow *firstWin = (SExtWinTimeWindow *)taosArrayGet(pWins, 0);
42,076,656✔
1024
  SExtWinTimeWindow *lastWin = (SExtWinTimeWindow *)taosArrayGet(pWins, taosArrayGetSize(pWins) - 1);
42,076,656✔
1025

1026
  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
42,076,656✔
1027
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)
42,076,656✔
1028

1029
  SOperatorParam* pExchangeOperator = NULL;
42,076,656✔
1030
  STimeWindow     twin = {.skey = firstWin->tw.skey, .ekey = lastWin->tw.ekey};
42,076,656✔
1031
  code = buildExchangeOperatorParamForExternalWindow(&pExchangeOperator, 0, twin);
42,076,656✔
1032
  QUERY_CHECK_CODE(code, lino, _return);
42,076,656✔
1033
  QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pExchangeOperator), code, lino, _return, terrno)
84,153,312✔
1034

1035
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW;
42,076,656✔
1036
  (*ppRes)->downstreamIdx = idx;
42,076,656✔
1037
  (*ppRes)->value = pExtWinOp;
42,076,656✔
1038
  (*ppRes)->reUse = false;
42,076,656✔
1039

1040
  return code;
42,076,656✔
1041
_return:
×
1042
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1043
  if (pExtWinOp) {
×
1044
    if (pExtWinOp->ExtWins) {
×
1045
      taosArrayDestroy(pExtWinOp->ExtWins);
×
1046
    }
1047
    taosMemoryFree(pExtWinOp);
×
1048
  }
1049
  if (*ppRes) {
×
1050
    if ((*ppRes)->pChildren) {
×
1051
      taosArrayDestroy((*ppRes)->pChildren);
×
1052
    }
1053
    taosMemoryFree(*ppRes);
×
1054
    *ppRes = NULL;
×
1055
  }
1056
  return code;
×
1057
}
1058

1059
static int32_t buildMergeOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins,
15,449,652✔
1060
                                       int32_t numOfDownstream, int32_t numOfWins) {
1061
  int32_t                   code = TSDB_CODE_SUCCESS;
15,449,652✔
1062
  int32_t                   lino = 0;
15,449,652✔
1063
  SMergeOperatorParam*      pMergeOp = NULL;
15,449,652✔
1064

1065
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
15,449,652✔
1066
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
15,449,652✔
1067

1068
  (*ppRes)->pChildren = taosArrayInit(numOfDownstream, POINTER_BYTES);
15,449,652✔
1069
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)
15,449,652✔
1070

1071
  pMergeOp = taosMemoryMalloc(sizeof(SMergeOperatorParam));
15,449,652✔
1072
  QUERY_CHECK_NULL(pMergeOp, code, lino, _return, terrno)
15,449,652✔
1073

1074
  pMergeOp->winNum = numOfWins;
15,449,652✔
1075

1076
  for (int32_t i = 0; i < numOfDownstream; i++) {
57,526,308✔
1077
    SOperatorParam* pExternalWinParam = NULL;
42,076,656✔
1078
    code = buildExternalWindowOperatorParam(pInfo, &pExternalWinParam, pWins, i);
42,076,656✔
1079
    QUERY_CHECK_CODE(code, lino, _return);
42,076,656✔
1080
    QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pExternalWinParam), code, lino, _return, terrno)
84,153,312✔
1081
  }
1082

1083
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE;
15,449,652✔
1084
  (*ppRes)->downstreamIdx = 0;
15,449,652✔
1085
  (*ppRes)->value = pMergeOp;
15,449,652✔
1086
  (*ppRes)->reUse = false;
15,449,652✔
1087

1088
  return TSDB_CODE_SUCCESS;
15,449,652✔
1089
_return:
×
1090
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1091
  if (pMergeOp) {
×
1092
    taosMemoryFree(pMergeOp);
×
1093
  }
1094
  if (*ppRes) {
×
1095
    if ((*ppRes)->pChildren) {
×
1096
      for (int32_t i = 0; i < taosArrayGetSize((*ppRes)->pChildren); i++) {
×
1097
        SOperatorParam* pChildParam = (SOperatorParam*)taosArrayGet((*ppRes)->pChildren, i);
×
1098
        if (pChildParam) {
×
1099
          SExternalWindowOperatorParam* pExtWinOp = (SExternalWindowOperatorParam*)pChildParam->value;
×
1100
          if (pExtWinOp) {
×
1101
            if (pExtWinOp->ExtWins) {
×
1102
              taosArrayDestroy(pExtWinOp->ExtWins);
×
1103
            }
1104
            taosMemoryFree(pExtWinOp);
×
1105
          }
1106
          taosMemoryFree(pChildParam);
×
1107
        }
1108
      }
1109
      taosArrayDestroy((*ppRes)->pChildren);
×
1110
    }
1111
    taosMemoryFree(*ppRes);
×
1112
    *ppRes = NULL;
×
1113
  }
1114
  return code;
×
1115
}
1116

1117
static int32_t buildAggOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes) {
79,174✔
1118
  int32_t                   code = TSDB_CODE_SUCCESS;
79,174✔
1119
  int32_t                   lino = 0;
79,174✔
1120
  SOperatorParam*           pParam = NULL;
79,174✔
1121
  SOperatorParam*           pExchangeParam = NULL;
79,174✔
1122
  SVtbScanDynCtrlInfo*      pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
79,174✔
1123
  bool                      freeExchange = false;
79,174✔
1124

1125
  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
79,174✔
1126
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)
79,174✔
1127

1128
  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
79,174✔
1129
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno)
79,174✔
1130

1131
  pParam->value = taosMemoryMalloc(sizeof(SAggOperatorParam));
79,174✔
1132
  QUERY_CHECK_NULL(pParam->value, code, lino, _return, terrno)
79,174✔
1133

1134
  code = buildBatchExchangeOperatorParamForVSAgg(&pExchangeParam, 0, NULL, 0, pVtbScan->otbVgIdToOtbInfoArrayMap);
79,174✔
1135
  QUERY_CHECK_CODE(code, lino, _return);
79,174✔
1136

1137
  freeExchange = true;
79,174✔
1138

1139
  QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pExchangeParam), code, lino, _return, terrno)
158,348✔
1140

1141
  freeExchange = false;
79,174✔
1142

1143
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
79,174✔
1144
  pParam->downstreamIdx = 0;
79,174✔
1145
  pParam->reUse = false;
79,174✔
1146

1147
  *ppRes = pParam;
79,174✔
1148

1149
  return code;
79,174✔
1150
_return:
×
1151
  if (freeExchange) {
×
1152
    freeOperatorParam(pExchangeParam, OP_GET_PARAM);
×
1153
  }
1154
  if (pParam) {
×
1155
    freeOperatorParam(pParam, OP_GET_PARAM);
×
1156
  }
1157
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1158
  return code;
×
1159
}
1160

1161
static int32_t buildAggOperatorParamWithGroupId(SDynQueryCtrlOperatorInfo* pInfo, uint64_t groupid, SOperatorParam** ppRes) {
2,309,560✔
1162
  int32_t                   code = TSDB_CODE_SUCCESS;
2,309,560✔
1163
  int32_t                   lino = 0;
2,309,560✔
1164
  SVtbScanDynCtrlInfo*      pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
2,309,560✔
1165
  SOperatorParam*           pParam = NULL;
2,309,560✔
1166
  SOperatorParam*           pExchangeParam = NULL;
2,309,560✔
1167
  SHashObj*                 otbVgIdToOtbInfoArrayMap = NULL;
2,309,560✔
1168
  bool                      freeExchange = false;
2,309,560✔
1169
  void*                     pIter = taosHashGet(pVtbScan->vtbGroupIdToVgIdMapMap, &groupid, sizeof(groupid));
2,309,560✔
1170

1171
  if (!pIter) {
2,309,560✔
1172
    *ppRes = NULL;
399,640✔
1173
    return code;
399,640✔
1174
  }
1175

1176
  otbVgIdToOtbInfoArrayMap = *(SHashObj**)pIter;
1,909,920✔
1177

1178
  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
1,909,920✔
1179
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)
1,909,920✔
1180

1181
  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
1,909,920✔
1182
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno)
1,909,920✔
1183

1184
  code = buildBatchExchangeOperatorParamForVSAgg(&pExchangeParam, 0, NULL, groupid, otbVgIdToOtbInfoArrayMap);
1,909,920✔
1185
  QUERY_CHECK_CODE(code, lino, _return);
1,909,920✔
1186

1187
  freeExchange = true;
1,909,920✔
1188

1189
  QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pExchangeParam), code, lino, _return, terrno)
3,819,840✔
1190

1191
  freeExchange = false;
1,909,920✔
1192

1193
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
1,909,920✔
1194
  pParam->downstreamIdx = 0;
1,909,920✔
1195
  pParam->value = NULL;
1,909,920✔
1196
  pParam->reUse = false;
1,909,920✔
1197

1198
  *ppRes = pParam;
1,909,920✔
1199

1200
  return code;
1,909,920✔
1201
_return:
×
1202
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1203
  if (freeExchange) {
×
1204
    freeOperatorParam(pExchangeParam, OP_GET_PARAM);
×
1205
  }
1206
  if (pParam) {
×
1207
    freeOperatorParam(pParam, OP_GET_PARAM);
×
1208
  }
1209
  return code;
×
1210
}
1211

1212
static int32_t buildAggOperatorParamForSingleChild(SDynQueryCtrlOperatorInfo* pInfo, tb_uid_t uid, uint64_t groupid, SArray* pTagList, SOperatorParam** ppRes) {
2,153,888✔
1213
  int32_t                   code = TSDB_CODE_SUCCESS;
2,153,888✔
1214
  int32_t                   lino = 0;
2,153,888✔
1215
  SVtbScanDynCtrlInfo*      pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
2,153,888✔
1216
  SOperatorParam*           pParam = NULL;
2,153,888✔
1217
  SHashObj*                 pOtbVgIdToOtbInfoArrayMap = NULL;
2,153,888✔
1218
  void*                     pIter = taosHashGet(pVtbScan->vtbUidToVgIdMapMap, &uid, sizeof(uid));
2,153,888✔
1219

1220
  if (pIter) {
2,153,888✔
1221
    pOtbVgIdToOtbInfoArrayMap = *(SHashObj**)taosHashGet(pVtbScan->vtbUidToVgIdMapMap, &uid, sizeof(uid));
1,626,528✔
1222

1223
    code = buildBatchExchangeOperatorParamForVSAgg(&pParam, 0, pTagList, groupid, pOtbVgIdToOtbInfoArrayMap);
1,626,528✔
1224
    QUERY_CHECK_CODE(code, lino, _return);
1,626,528✔
1225

1226
    *ppRes = pParam;
1,626,528✔
1227
  } else {
1228
    *ppRes = NULL;
527,360✔
1229
  }
1230

1231
  return code;
2,153,888✔
1232
_return:
×
1233
  if (pParam) {
×
1234
    freeOperatorParam(pParam, OP_GET_PARAM);
×
1235
  }
1236
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1237
  return code;
×
1238
}
1239

1240
static void seqJoinLaunchNewRetrieveImpl(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
3,703,025✔
1241
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
3,703,025✔
1242
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
3,703,025✔
1243
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
3,703,025✔
1244
  SStbJoinPostJoinCtx*       pPost = &pStbJoin->ctx.post;
3,703,025✔
1245
  SOperatorParam*            pParam = NULL;
3,703,025✔
1246
  int32_t                    code  = buildSeqStbJoinOperatorParam(pInfo, pPrev, pPost, &pParam);
3,703,025✔
1247
  if (TSDB_CODE_SUCCESS != code) {
3,703,025✔
1248
    pOperator->pTaskInfo->code = code;
×
1249
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
×
1250
  }
1251

1252
  qDebug("%s dynamic post task begin", GET_TASKID(pOperator->pTaskInfo));
3,703,025✔
1253
  code = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam, ppRes);
3,703,025✔
1254
  if (*ppRes && (code == 0)) {
3,703,025✔
1255
    code = blockDataCheck(*ppRes);
208,309✔
1256
    if (code) {
208,309✔
1257
      qError("Invalid block data, blockDataCheck failed, error:%s", tstrerror(code));
×
1258
      pOperator->pTaskInfo->code = code;
×
1259
      T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
×
1260
    }
1261
    pPost->isStarted = true;
208,309✔
1262
    pStbJoin->execInfo.postBlkNum++;
208,309✔
1263
    pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows;
208,309✔
1264
    qDebug("%s join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
208,309✔
1265
  } else {
1266
    qDebug("%s Empty join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
3,494,716✔
1267
  }
1268
}
3,703,025✔
1269

1270

1271
static int32_t notifySeqJoinTableCacheEnd(SOperatorInfo* pOperator, SStbJoinPostJoinCtx* pPost, bool leftTable) {
×
1272
  SOperatorParam* pGcParam = NULL;
×
1273
  SOperatorParam* pMergeJoinParam = NULL;
×
1274
  int32_t         downstreamId = leftTable ? 0 : 1;
×
1275
  int32_t         vgId = leftTable ? pPost->leftVgId : pPost->rightVgId;
×
1276
  int64_t         uid = leftTable ? pPost->leftCurrUid : pPost->rightCurrUid;
×
1277

1278
  qDebug("notify table %" PRIu64 " in vgId %d downstreamId %d cache end", uid, vgId, downstreamId);
×
1279

1280
  int32_t code = buildGroupCacheNotifyOperatorParam(&pGcParam, downstreamId, vgId, uid);
×
1281
  if (TSDB_CODE_SUCCESS != code) {
×
1282
    return code;
×
1283
  }
1284
  code = buildMergeJoinNotifyOperatorParam(&pMergeJoinParam, pGcParam, NULL);
×
1285
  if (TSDB_CODE_SUCCESS != code) {
×
1286
    return code;
×
1287
  }
1288

1289
  return optrDefaultNotifyFn(pOperator->pDownstream[1], pMergeJoinParam);
×
1290
}
1291

1292
static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCtrlInfo*          pStbJoin) {
3,702,531✔
1293
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
3,702,531✔
1294
  int32_t code = 0;
3,702,531✔
1295
  
1296
  pPost->isStarted = false;
3,702,531✔
1297
  
1298
  if (pStbJoin->basic.batchFetch) {
3,702,531✔
1299
    return TSDB_CODE_SUCCESS;
3,700,335✔
1300
  }
1301
  
1302
  if (pPost->leftNeedCache) {
2,196✔
1303
    uint32_t* num = tSimpleHashGet(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
×
1304
    if (num && --(*num) <= 0) {
×
1305
      code = tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
×
1306
      if (code) {
×
1307
        qError("tSimpleHashRemove leftCurrUid %" PRId64 " from leftCache failed, error:%s", pPost->leftCurrUid, tstrerror(code));
×
1308
        QRY_ERR_RET(code);
×
1309
      }
1310
      QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, true));
×
1311
    }
1312
  }
1313
  
1314
  if (!pPost->rightNeedCache) {
2,196✔
1315
    void* v = tSimpleHashGet(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
2,196✔
1316
    if (NULL != v) {
2,196✔
1317
      code = tSimpleHashRemove(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
×
1318
      if (code) {
×
1319
        qError("tSimpleHashRemove rightCurrUid %" PRId64 " from rightCache failed, error:%s", pPost->rightCurrUid, tstrerror(code));
×
1320
        QRY_ERR_RET(code);
×
1321
      }
1322
      QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, false));
×
1323
    }
1324
  }
1325

1326
  return TSDB_CODE_SUCCESS;
2,196✔
1327
}
1328

1329

1330
static FORCE_INLINE int32_t seqJoinContinueCurrRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
1331
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
281,597✔
1332
  SStbJoinPostJoinCtx*       pPost = &pInfo->stbJoin.ctx.post;
281,597✔
1333
  SStbJoinPrevJoinCtx*       pPrev = &pInfo->stbJoin.ctx.prev;
281,597✔
1334

1335
  if (!pPost->isStarted) {
281,597✔
1336
    return TSDB_CODE_SUCCESS;
73,782✔
1337
  }
1338
  
1339
  qDebug("%s dynQueryCtrl continue to retrieve block from post op", GET_TASKID(pOperator->pTaskInfo));
207,815✔
1340
  
1341
  *ppRes = getNextBlockFromDownstream(pOperator, 1);
207,815✔
1342
  if (NULL == *ppRes) {
207,815✔
1343
    QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, &pInfo->stbJoin));
207,815✔
1344
    pPrev->pListHead->readIdx++;
207,815✔
1345
  } else {
1346
    pInfo->stbJoin.execInfo.postBlkNum++;
×
1347
    pInfo->stbJoin.execInfo.postBlkRows += (*ppRes)->info.rows;
×
1348
  }
1349

1350
  return TSDB_CODE_SUCCESS;
207,815✔
1351
}
1352

1353
static FORCE_INLINE int32_t addToJoinVgroupHash(SSHashObj* pHash, void* pKey, int32_t keySize, void* pVal, int32_t valSize) {
1354
  SArray** ppArray = tSimpleHashGet(pHash, pKey, keySize);
7,405,610✔
1355
  if (NULL == ppArray) {
7,405,610✔
1356
    SArray* pArray = taosArrayInit(10, valSize);
244,785✔
1357
    if (NULL == pArray) {
244,785✔
1358
      return terrno;
×
1359
    }
1360
    if (NULL == taosArrayPush(pArray, pVal)) {
489,570✔
1361
      taosArrayDestroy(pArray);
×
1362
      return terrno;
×
1363
    }
1364
    if (tSimpleHashPut(pHash, pKey, keySize, &pArray, POINTER_BYTES)) {
244,785✔
1365
      taosArrayDestroy(pArray);      
×
1366
      return terrno;
×
1367
    }
1368
    return TSDB_CODE_SUCCESS;
244,785✔
1369
  }
1370

1371
  if (NULL == taosArrayPush(*ppArray, pVal)) {
14,321,650✔
1372
    return terrno;
×
1373
  }
1374
  
1375
  return TSDB_CODE_SUCCESS;
7,160,825✔
1376
}
1377

1378
static FORCE_INLINE int32_t addToJoinTableHash(SSHashObj* pHash, SSHashObj* pOnceHash, void* pKey, int32_t keySize) {
1379
  int32_t code = TSDB_CODE_SUCCESS;
2,196✔
1380
  uint32_t* pNum = tSimpleHashGet(pHash, pKey, keySize);
2,196✔
1381
  if (NULL == pNum) {
2,196✔
1382
    uint32_t n = 1;
2,196✔
1383
    code = tSimpleHashPut(pHash, pKey, keySize, &n, sizeof(n));
2,196✔
1384
    if (code) {
2,196✔
1385
      return code;
×
1386
    }
1387
    code = tSimpleHashPut(pOnceHash, pKey, keySize, NULL, 0);
2,196✔
1388
    if (code) {
2,196✔
1389
      return code;
×
1390
    }
1391
    return TSDB_CODE_SUCCESS;
2,196✔
1392
  }
1393

1394
  switch (*pNum) {
×
1395
    case 0:
×
1396
      break;
×
1397
    case UINT32_MAX:
×
1398
      *pNum = 0;
×
1399
      break;
×
1400
    default:
×
1401
      if (1 == (*pNum)) {
×
1402
        code = tSimpleHashRemove(pOnceHash, pKey, keySize);
×
1403
        if (code) {
×
1404
          qError("tSimpleHashRemove failed in addToJoinTableHash, error:%s", tstrerror(code));
×
1405
          QRY_ERR_RET(code);
×
1406
        }
1407
      }
1408
      (*pNum)++;
×
1409
      break;
×
1410
  }
1411
  
1412
  return TSDB_CODE_SUCCESS;
×
1413
}
1414

1415

1416
static void freeStbJoinTableList(SStbJoinTableList* pList) {
73,288✔
1417
  if (NULL == pList) {
73,288✔
1418
    return;
×
1419
  }
1420
  taosMemoryFree(pList->pLeftVg);
73,288✔
1421
  taosMemoryFree(pList->pLeftUid);
73,288✔
1422
  taosMemoryFree(pList->pRightVg);
73,288✔
1423
  taosMemoryFree(pList->pRightUid);
73,288✔
1424
  taosMemoryFree(pList);
73,288✔
1425
}
1426

1427
static int32_t appendStbJoinTableList(SStbJoinPrevJoinCtx* pCtx, int64_t rows, int32_t* pLeftVg, int64_t* pLeftUid, int32_t* pRightVg, int64_t* pRightUid) {
73,782✔
1428
  int32_t code = TSDB_CODE_SUCCESS;
73,782✔
1429
  SStbJoinTableList* pNew = taosMemoryCalloc(1, sizeof(SStbJoinTableList));
73,782✔
1430
  if (NULL == pNew) {
73,782✔
1431
    return terrno;
×
1432
  }
1433
  pNew->pLeftVg = taosMemoryMalloc(rows * sizeof(*pLeftVg));
73,782✔
1434
  if (NULL == pNew->pLeftVg) {
73,782✔
1435
    code = terrno;
×
1436
    freeStbJoinTableList(pNew);
×
1437
    return code;
×
1438
  }
1439
  pNew->pLeftUid = taosMemoryMalloc(rows * sizeof(*pLeftUid));
73,782✔
1440
  if (NULL == pNew->pLeftUid) {
73,782✔
1441
    code = terrno;
×
1442
    freeStbJoinTableList(pNew);
×
1443
    return code;
×
1444
  }
1445
  pNew->pRightVg = taosMemoryMalloc(rows * sizeof(*pRightVg));
73,782✔
1446
  if (NULL == pNew->pRightVg) {
73,782✔
1447
    code = terrno;
×
1448
    freeStbJoinTableList(pNew);
×
1449
    return code;
×
1450
  }
1451
  pNew->pRightUid = taosMemoryMalloc(rows * sizeof(*pRightUid));
73,782✔
1452
  if (NULL == pNew->pRightUid) {
73,782✔
1453
    code = terrno;
×
1454
    freeStbJoinTableList(pNew);
×
1455
    return code;
×
1456
  }
1457

1458
  TAOS_MEMCPY(pNew->pLeftVg, pLeftVg, rows * sizeof(*pLeftVg));
73,782✔
1459
  TAOS_MEMCPY(pNew->pLeftUid, pLeftUid, rows * sizeof(*pLeftUid));
73,782✔
1460
  TAOS_MEMCPY(pNew->pRightVg, pRightVg, rows * sizeof(*pRightVg));
73,782✔
1461
  TAOS_MEMCPY(pNew->pRightUid, pRightUid, rows * sizeof(*pRightUid));
73,782✔
1462

1463
  pNew->readIdx = 0;
73,782✔
1464
  pNew->uidNum = rows;
73,782✔
1465
  pNew->pNext = NULL;
73,782✔
1466
  
1467
  if (pCtx->pListTail) {
73,782✔
1468
    pCtx->pListTail->pNext = pNew;
×
1469
    pCtx->pListTail = pNew;
×
1470
  } else {
1471
    pCtx->pListHead = pNew;
73,782✔
1472
    pCtx->pListTail= pNew;
73,782✔
1473
  }
1474

1475
  return TSDB_CODE_SUCCESS;
73,782✔
1476
}
1477

1478
static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
73,782✔
1479
  int32_t                    code = TSDB_CODE_SUCCESS;
73,782✔
1480
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
73,782✔
1481
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
73,782✔
1482
  SColumnInfoData*           pVg0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[0]);
73,782✔
1483
  if (NULL == pVg0) {
73,782✔
1484
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1485
  }
1486
  SColumnInfoData*           pVg1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[1]);
73,782✔
1487
  if (NULL == pVg1) {
73,782✔
1488
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1489
  }
1490
  SColumnInfoData*           pUid0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[0]);
73,782✔
1491
  if (NULL == pUid0) {
73,782✔
1492
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1493
  }
1494
  SColumnInfoData*           pUid1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[1]);
73,782✔
1495
  if (NULL == pUid1) {
73,782✔
1496
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1497
  }
1498

1499
  if (pStbJoin->basic.batchFetch) {
73,782✔
1500
    for (int32_t i = 0; i < pBlock->info.rows; ++i) {
3,775,489✔
1501
      int32_t* leftVg = (int32_t*)(pVg0->pData + pVg0->info.bytes * i);
3,702,805✔
1502
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i);
3,702,805✔
1503
      int32_t* rightVg = (int32_t*)(pVg1->pData + pVg1->info.bytes * i);
3,702,805✔
1504
      int64_t* rightUid = (int64_t*)(pUid1->pData + pUid1->info.bytes * i);
3,702,805✔
1505

1506
      code = addToJoinVgroupHash(pStbJoin->ctx.prev.leftHash, leftVg, sizeof(*leftVg), leftUid, sizeof(*leftUid));
3,702,805✔
1507
      if (TSDB_CODE_SUCCESS != code) {
3,702,805✔
1508
        break;
×
1509
      }
1510
      code = addToJoinVgroupHash(pStbJoin->ctx.prev.rightHash, rightVg, sizeof(*rightVg), rightUid, sizeof(*rightUid));
3,702,805✔
1511
      if (TSDB_CODE_SUCCESS != code) {
3,702,805✔
1512
        break;
×
1513
      }
1514
    }
1515
  } else {
1516
    for (int32_t i = 0; i < pBlock->info.rows; ++i) {
3,294✔
1517
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i);
2,196✔
1518
    
1519
      code = addToJoinTableHash(pStbJoin->ctx.prev.leftCache, pStbJoin->ctx.prev.onceTable, leftUid, sizeof(*leftUid));
2,196✔
1520
      if (TSDB_CODE_SUCCESS != code) {
2,196✔
1521
        break;
×
1522
      }
1523
    }
1524
  }
1525

1526
  if (TSDB_CODE_SUCCESS == code) {
73,782✔
1527
    code = appendStbJoinTableList(&pStbJoin->ctx.prev, pBlock->info.rows, (int32_t*)pVg0->pData, (int64_t*)pUid0->pData, (int32_t*)pVg1->pData, (int64_t*)pUid1->pData);
73,782✔
1528
    if (TSDB_CODE_SUCCESS == code) {
73,782✔
1529
      pStbJoin->ctx.prev.tableNum += pBlock->info.rows;
73,782✔
1530
    }
1531
  }
1532

1533
_return:
×
1534

1535
  if (TSDB_CODE_SUCCESS != code) {
73,782✔
1536
    pOperator->pTaskInfo->code = code;
×
1537
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
×
1538
  }
1539
}
73,782✔
1540

1541

1542
static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) {
1,143,117✔
1543
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
1,143,117✔
1544
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
1,143,117✔
1545

1546
  if (pStbJoin->basic.batchFetch) {
1,143,117✔
1547
    return;
1,142,019✔
1548
  }
1549

1550
  if (tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache) == tSimpleHashGetSize(pStbJoin->ctx.prev.onceTable)) {
1,098✔
1551
    tSimpleHashClear(pStbJoin->ctx.prev.leftCache);
1,098✔
1552
    return;
1,098✔
1553
  }
1554

1555
  uint64_t* pUid = NULL;
×
1556
  int32_t iter = 0;
×
1557
  int32_t code = 0;
×
1558
  while (NULL != (pUid = tSimpleHashIterate(pStbJoin->ctx.prev.onceTable, pUid, &iter))) {
×
1559
    code = tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, pUid, sizeof(*pUid));
×
1560
    if (code) {
×
1561
      qError("tSimpleHashRemove failed in postProcessStbJoinTableHash, error:%s", tstrerror(code));
×
1562
    }
1563
  }
1564

1565
  pStbJoin->execInfo.leftCacheNum = tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache);
×
1566
  qDebug("more than 1 ref build table num %" PRId64, (int64_t)tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache));
×
1567

1568
/*
1569
  // debug only
1570
  iter = 0;
1571
  uint32_t* num = NULL;
1572
  while (NULL != (num = tSimpleHashIterate(pStbJoin->ctx.prev.leftCache, num, &iter))) {
1573
    A S S E R T(*num > 1);
1574
  }
1575
*/  
1576
}
1577

1578
static void buildStbJoinTableList(SOperatorInfo* pOperator) {
1,143,117✔
1579
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
1,143,117✔
1580
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
1,143,117✔
1581

1582
  while (true) {
73,782✔
1583
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
1,216,899✔
1584
    if (NULL == pBlock) {
1,216,899✔
1585
      break;
1,143,117✔
1586
    }
1587

1588
    pStbJoin->execInfo.prevBlkNum++;
73,782✔
1589
    pStbJoin->execInfo.prevBlkRows += pBlock->info.rows;
73,782✔
1590
    
1591
    doBuildStbJoinTableHash(pOperator, pBlock);
73,782✔
1592
  }
1593

1594
  postProcessStbJoinTableHash(pOperator);
1,143,117✔
1595

1596
  pStbJoin->ctx.prev.joinBuild = true;
1,143,117✔
1597
}
1,143,117✔
1598

1599
static int32_t seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
281,597✔
1600
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
281,597✔
1601
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
281,597✔
1602
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
281,597✔
1603
  SStbJoinTableList*         pNode = pPrev->pListHead;
281,597✔
1604

1605
  while (pNode) {
3,849,601✔
1606
    if (pNode->readIdx >= pNode->uidNum) {
3,776,313✔
1607
      pPrev->pListHead = pNode->pNext;
73,288✔
1608
      freeStbJoinTableList(pNode);
73,288✔
1609
      pNode = pPrev->pListHead;
73,288✔
1610
      continue;
73,288✔
1611
    }
1612
    
1613
    seqJoinLaunchNewRetrieveImpl(pOperator, ppRes);
3,703,025✔
1614
    if (*ppRes) {
3,703,025✔
1615
      return TSDB_CODE_SUCCESS;
208,309✔
1616
    }
1617

1618
    QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, pStbJoin));
3,494,716✔
1619
    pPrev->pListHead->readIdx++;
3,494,716✔
1620
  }
1621

1622
  *ppRes = NULL;
73,288✔
1623
  setOperatorCompleted(pOperator);
73,288✔
1624

1625
  return TSDB_CODE_SUCCESS;
73,288✔
1626
}
1627

1628
static int32_t seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) {
1,350,932✔
1629
  if (pBlock) {
1,350,932✔
1630
    if (pStbJoin && pStbJoin->pOutputDataBlockDesc) {
208,309✔
1631
      pBlock->info.id.blockId = pStbJoin->pOutputDataBlockDesc->dataBlockId;
208,309✔
1632
      if (!pBlock->pDataBlock) return TSDB_CODE_SUCCESS;
208,309✔
1633

1634
      for (int i = (int)pBlock->pDataBlock->size; i < pStbJoin->pOutputDataBlockDesc->pSlots->length; i++) {
210,505✔
1635
        SSlotDescNode* pSlot = (SSlotDescNode*)nodesListGetNode(pStbJoin->pOutputDataBlockDesc->pSlots, i);
2,196✔
1636
        if (pSlot == NULL) {
2,196✔
1637
          qError("seqStableJoinComposeRes: pSlot is NULL, i:%d", i);
×
1638
          return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1639
        }
1640
        SColumnInfoData colInfo = createColumnInfoData(pSlot->dataType.type, pSlot->dataType.bytes, pSlot->slotId);
2,196✔
1641
        int32_t code = colInfoDataEnsureCapacity(&colInfo, pBlock->info.rows, true);
2,196✔
1642
        if (code != TSDB_CODE_SUCCESS) {
2,196✔
1643
          return code;
×
1644
        }
1645
        code = blockDataAppendColInfo(pBlock, &colInfo);
2,196✔
1646
        if (code != TSDB_CODE_SUCCESS) {
2,196✔
1647
          return code;
×
1648
        }
1649
      }
1650
    } else {
1651
      qError("seqStableJoinComposeRes: pBlock or pStbJoin is NULL");
×
1652
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1653
    }
1654
  }
1655
  return TSDB_CODE_SUCCESS;
1,350,932✔
1656
}
1657

1658
int32_t seqStableJoin(SOperatorInfo* pOperator, SSDataBlock** pRes) {
1,378,333✔
1659
  int32_t                    code = TSDB_CODE_SUCCESS;
1,378,333✔
1660
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
1,378,333✔
1661
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
1,378,333✔
1662

1663
  QRY_PARAM_CHECK(pRes);
1,378,333✔
1664
  if (pOperator->status == OP_EXEC_DONE) {
1,378,333✔
1665
    return code;
27,401✔
1666
  }
1667

1668
  int64_t st = 0;
1,350,932✔
1669
  if (pOperator->cost.openCost == 0) {
1,350,932✔
1670
    st = taosGetTimestampUs();
1,142,627✔
1671
  }
1672

1673
  if (!pStbJoin->ctx.prev.joinBuild) {
1,350,932✔
1674
    buildStbJoinTableList(pOperator);
1,143,117✔
1675
    if (pStbJoin->execInfo.prevBlkRows <= 0) {
1,143,117✔
1676
      setOperatorCompleted(pOperator);
1,069,335✔
1677
      goto _return;
1,069,335✔
1678
    }
1679
  }
1680

1681
  QRY_ERR_JRET(seqJoinContinueCurrRetrieve(pOperator, pRes));
281,597✔
1682
  if (*pRes) {
281,597✔
1683
    goto _return;
×
1684
  }
1685

1686
  QRY_ERR_JRET(seqJoinLaunchNewRetrieve(pOperator, pRes));
281,597✔
1687

1688
_return:
281,597✔
1689
  if (pOperator->cost.openCost == 0) {
1,350,932✔
1690
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
1,142,627✔
1691
  }
1692

1693
  if (code) {
1,350,932✔
1694
    qError("%s failed since %s", __func__, tstrerror(code));
×
1695
    pOperator->pTaskInfo->code = code;
×
1696
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
1697
  } else {
1698
    code = seqStableJoinComposeRes(pStbJoin, *pRes);
1,350,932✔
1699
  }
1700
  return code;
1,350,932✔
1701
}
1702

1703
int32_t dynProcessUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
2,204,020✔
1704
  int32_t                    lino = 0;
2,204,020✔
1705
  SOperatorInfo*             operator=(SOperatorInfo*) param;
2,204,020✔
1706
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)operator->info;
2,204,020✔
1707

1708
  if (TSDB_CODE_SUCCESS != code) {
2,204,020✔
1709
    operator->pTaskInfo->code = rpcCvtErrCode(code);
×
1710
    if (operator->pTaskInfo->code != code) {
×
1711
      qError("load systable rsp received, error:%s, cvted error:%s", tstrerror(code),
×
1712
             tstrerror(operator->pTaskInfo->code));
1713
    } else {
1714
      qError("load systable rsp received, error:%s", tstrerror(code));
×
1715
    }
1716
    goto _return;
×
1717
  }
1718

1719
  pScanResInfo->vtbScan.pRsp = taosMemoryMalloc(sizeof(SUseDbRsp));
2,204,020✔
1720
  QUERY_CHECK_NULL(pScanResInfo->vtbScan.pRsp, code, lino, _return, terrno)
2,204,020✔
1721

1722
  code = tDeserializeSUseDbRsp(pMsg->pData, (int32_t)pMsg->len, pScanResInfo->vtbScan.pRsp);
2,204,020✔
1723
  QUERY_CHECK_CODE(code, lino, _return);
2,204,020✔
1724

1725
  taosMemoryFreeClear(pMsg->pData);
2,204,020✔
1726

1727
  code = tsem_post(&pScanResInfo->vtbScan.ready);
2,204,020✔
1728
  QUERY_CHECK_CODE(code, lino, _return);
2,204,020✔
1729

1730
  return code;
2,204,020✔
1731
_return:
×
1732
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1733
  return code;
×
1734
}
1735

1736
static int32_t buildDbVgInfoMap(SOperatorInfo* pOperator, SMsgCb* pMsgCb, SName* name, SExecTaskInfo* pTaskInfo, SUseDbOutput* output) {
2,204,020✔
1737
  int32_t                    code = TSDB_CODE_SUCCESS;
2,204,020✔
1738
  int32_t                    lino = 0;
2,204,020✔
1739
  char*                      buf1 = NULL;
2,204,020✔
1740
  SUseDbReq*                 pReq = NULL;
2,204,020✔
1741
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)pOperator->info;
2,204,020✔
1742

1743
  pReq = taosMemoryMalloc(sizeof(SUseDbReq));
2,204,020✔
1744
  QUERY_CHECK_NULL(pReq, code, lino, _return, terrno)
2,204,020✔
1745
  code = tNameGetFullDbName(name, pReq->db);
2,204,020✔
1746
  QUERY_CHECK_CODE(code, lino, _return);
2,204,020✔
1747
  int32_t contLen = tSerializeSUseDbReq(NULL, 0, pReq);
2,204,020✔
1748
  buf1 = taosMemoryCalloc(1, contLen);
2,204,020✔
1749
  QUERY_CHECK_NULL(buf1, code, lino, _return, terrno)
2,204,020✔
1750
  int32_t tempRes = tSerializeSUseDbReq(buf1, contLen, pReq);
2,204,020✔
1751
  if (tempRes < 0) {
2,204,020✔
1752
    QUERY_CHECK_CODE(terrno, lino, _return);
×
1753
  }
1754

1755
  // send the fetch remote task result request
1756
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
2,204,020✔
1757
  QUERY_CHECK_NULL(pMsgSendInfo, code, lino, _return, terrno)
2,204,020✔
1758

1759
  pMsgSendInfo->param = pOperator;
2,204,020✔
1760
  pMsgSendInfo->msgInfo.pData = buf1;
2,204,020✔
1761
  pMsgSendInfo->msgInfo.len = contLen;
2,204,020✔
1762
  pMsgSendInfo->msgType = TDMT_MND_GET_DB_INFO;
2,204,020✔
1763
  pMsgSendInfo->fp = dynProcessUseDbRsp;
2,204,020✔
1764
  pMsgSendInfo->requestId = pTaskInfo->id.queryId;
2,204,020✔
1765

1766
  code = asyncSendMsgToServer(pMsgCb->clientRpc, &pScanResInfo->vtbScan.epSet, NULL, pMsgSendInfo);
2,204,020✔
1767
  QUERY_CHECK_CODE(code, lino, _return);
2,204,020✔
1768

1769
  code = tsem_wait(&pScanResInfo->vtbScan.ready);
2,204,020✔
1770
  QUERY_CHECK_CODE(code, lino, _return);
2,204,020✔
1771

1772
  code = queryBuildUseDbOutput(output, pScanResInfo->vtbScan.pRsp);
2,204,020✔
1773
  QUERY_CHECK_CODE(code, lino, _return);
2,204,020✔
1774

1775
_return:
2,204,020✔
1776
  if (code) {
2,204,020✔
1777
     qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1778
     taosMemoryFree(buf1);
×
1779
  }
1780
  taosMemoryFree(pReq);
2,204,020✔
1781
  tFreeSUsedbRsp(pScanResInfo->vtbScan.pRsp);
2,204,020✔
1782
  taosMemoryFreeClear(pScanResInfo->vtbScan.pRsp);
2,204,020✔
1783
  return code;
2,204,020✔
1784
}
1785

1786
int dynVgInfoComp(const void* lp, const void* rp) {
4,398,322✔
1787
  SVgroupInfo* pLeft = (SVgroupInfo*)lp;
4,398,322✔
1788
  SVgroupInfo* pRight = (SVgroupInfo*)rp;
4,398,322✔
1789
  if (pLeft->hashBegin < pRight->hashBegin) {
4,398,322✔
1790
    return -1;
4,398,322✔
1791
  } else if (pLeft->hashBegin > pRight->hashBegin) {
×
1792
    return 1;
×
1793
  }
1794

1795
  return 0;
×
1796
}
1797

1798
int32_t dynMakeVgArraySortBy(SDBVgInfo* dbInfo, __compar_fn_t sort_func) {
13,709,616✔
1799
  if (NULL == dbInfo) {
13,709,616✔
1800
    return TSDB_CODE_SUCCESS;
×
1801
  }
1802

1803
  if (dbInfo->vgHash && NULL == dbInfo->vgArray) {
13,709,616✔
1804
    int32_t vgSize = taosHashGetSize(dbInfo->vgHash);
2,204,020✔
1805
    dbInfo->vgArray = taosArrayInit(vgSize, sizeof(SVgroupInfo));
2,204,020✔
1806
    if (NULL == dbInfo->vgArray) {
2,204,020✔
1807
      return terrno;
×
1808
    }
1809

1810
    void* pIter = taosHashIterate(dbInfo->vgHash, NULL);
2,204,020✔
1811
    while (pIter) {
6,607,201✔
1812
      if (NULL == taosArrayPush(dbInfo->vgArray, pIter)) {
8,806,362✔
1813
        taosHashCancelIterate(dbInfo->vgHash, pIter);
×
1814
        return terrno;
×
1815
      }
1816

1817
      pIter = taosHashIterate(dbInfo->vgHash, pIter);
4,403,181✔
1818
    }
1819

1820
    taosArraySort(dbInfo->vgArray, sort_func);
2,204,020✔
1821
  }
1822

1823
  return TSDB_CODE_SUCCESS;
13,709,616✔
1824
}
1825

1826
int32_t dynHashValueComp(void const* lp, void const* rp) {
20,648,403✔
1827
  uint32_t*    key = (uint32_t*)lp;
20,648,403✔
1828
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
20,648,403✔
1829

1830
  if (*key < pVg->hashBegin) {
20,648,403✔
1831
    return -1;
×
1832
  } else if (*key > pVg->hashEnd) {
20,648,403✔
1833
    return 1;
6,938,787✔
1834
  }
1835

1836
  return 0;
13,709,616✔
1837
}
1838

1839
int32_t getVgId(SDBVgInfo* dbInfo, char* dbFName, int32_t* vgId, char *tbName) {
13,709,616✔
1840
  int32_t code = 0;
13,709,616✔
1841
  int32_t lino = 0;
13,709,616✔
1842
  code = dynMakeVgArraySortBy(dbInfo, dynVgInfoComp);
13,709,616✔
1843
  QUERY_CHECK_CODE(code, lino, _return);
13,709,616✔
1844

1845
  int32_t vgNum = (int32_t)taosArrayGetSize(dbInfo->vgArray);
13,709,616✔
1846
  if (vgNum <= 0) {
13,709,616✔
1847
    qError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum);
×
1848
    QUERY_CHECK_CODE(code = TSDB_CODE_TSC_DB_NOT_SELECTED, lino, _return);
×
1849
  }
1850

1851
  SVgroupInfo* vgInfo = NULL;
13,709,616✔
1852
  char         tbFullName[TSDB_TABLE_FNAME_LEN];
13,709,616✔
1853
  (void)snprintf(tbFullName, sizeof(tbFullName), "%s.", dbFName);
13,709,616✔
1854
  int32_t offset = (int32_t)strlen(tbFullName);
13,709,616✔
1855

1856
  (void)snprintf(tbFullName + offset, sizeof(tbFullName) - offset, "%s", tbName);
13,709,616✔
1857
  uint32_t hashValue = taosGetTbHashVal(tbFullName, (int32_t)strlen(tbFullName), dbInfo->hashMethod,
27,419,232✔
1858
                                        dbInfo->hashPrefix, dbInfo->hashSuffix);
13,709,616✔
1859

1860
  vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, dynHashValueComp, TD_EQ);
13,709,616✔
1861
  if (NULL == vgInfo) {
13,709,616✔
1862
    qError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName,
×
1863
           (int32_t)taosArrayGetSize(dbInfo->vgArray));
1864
    return TSDB_CODE_CTG_INTERNAL_ERROR;
×
1865
  }
1866

1867
  *vgId = vgInfo->vgId;
13,709,616✔
1868

1869
_return:
13,709,616✔
1870
  return code;
13,709,616✔
1871
}
1872

1873
bool colNeedScan(SOperatorInfo* pOperator, col_id_t colId) {
106,497,449✔
1874
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
106,497,449✔
1875
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
106,497,449✔
1876
  SArray *                   pColList = pVtbScan->readColList;
106,497,449✔
1877
  if (pVtbScan->scanAllCols) {
106,497,449✔
1878
    return true;
10,019,874✔
1879
  }
1880
  for (int32_t i = 0; i < taosArrayGetSize(pColList); i++) {
573,619,567✔
1881
    if (colId == *(col_id_t*)taosArrayGet(pColList, i)) {
506,986,332✔
1882
      return true;
29,844,340✔
1883
    }
1884
  }
1885
  return false;
66,633,235✔
1886
}
1887

1888
int32_t getDbVgInfo(SOperatorInfo* pOperator, SName *name, SDBVgInfo **dbVgInfo) {
39,889,164✔
1889
  int32_t                    code = TSDB_CODE_SUCCESS;
39,889,164✔
1890
  int32_t                    line = 0;
39,889,164✔
1891
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
39,889,164✔
1892
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
39,889,164✔
1893
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
39,889,164✔
1894
  SMsgCb*                    pMsgCb = pVtbScan->pMsgCb;
39,889,164✔
1895
  SUseDbOutput*              output = NULL;
39,889,164✔
1896
  SUseDbOutput**             find = (SUseDbOutput**)taosHashGet(pInfo->vtbScan.dbVgInfoMap, name->dbname, strlen(name->dbname));
39,889,164✔
1897

1898
  QRY_PARAM_CHECK(dbVgInfo);
39,889,164✔
1899

1900
  if (find == NULL) {
39,889,164✔
1901
    output = taosMemoryMalloc(sizeof(SUseDbOutput));
2,204,020✔
1902
    code = buildDbVgInfoMap(pOperator, pMsgCb, name, pTaskInfo, output);
2,204,020✔
1903
    QUERY_CHECK_CODE(code, line, _return);
2,204,020✔
1904
    code = taosHashPut(pInfo->vtbScan.dbVgInfoMap, name->dbname, strlen(name->dbname), &output, POINTER_BYTES);
2,204,020✔
1905
    QUERY_CHECK_CODE(code, line, _return);
2,204,020✔
1906
  } else {
1907
    output = *find;
37,685,144✔
1908
  }
1909

1910
  *dbVgInfo = output->dbVgroup;
39,889,164✔
1911
  return code;
39,889,164✔
1912
_return:
×
1913
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
1914
  freeUseDbOutput(output);
×
1915
  return code;
×
1916
}
1917

1918
int32_t extractColRefName(const char *colref, char **refDb, char** refTb, char** refCol) {
39,889,164✔
1919
  int32_t     code = TSDB_CODE_SUCCESS;
39,889,164✔
1920
  int32_t     line = 0;
39,889,164✔
1921

1922
  const char *first_dot = strchr(colref, '.');
39,889,164✔
1923
  QUERY_CHECK_NULL(first_dot, code, line, _return, terrno)
39,889,164✔
1924

1925
  const char *second_dot = strchr(first_dot + 1, '.');
39,889,164✔
1926
  QUERY_CHECK_NULL(second_dot, code, line, _return, terrno)
39,889,164✔
1927

1928
  size_t db_len = first_dot - colref;
39,889,164✔
1929
  size_t table_len = second_dot - first_dot - 1;
39,889,164✔
1930
  size_t col_len = strlen(second_dot + 1);
39,889,164✔
1931

1932
  *refDb = taosMemoryMalloc(db_len + 1);
39,889,164✔
1933
  *refTb = taosMemoryMalloc(table_len + 1);
39,889,164✔
1934
  *refCol = taosMemoryMalloc(col_len + 1);
39,889,164✔
1935
  QUERY_CHECK_NULL(*refDb, code, line, _return, terrno)
39,889,164✔
1936
  QUERY_CHECK_NULL(*refTb, code, line, _return, terrno)
39,889,164✔
1937
  QUERY_CHECK_NULL(*refCol, code, line, _return, terrno)
39,889,164✔
1938

1939
  tstrncpy(*refDb, colref, db_len + 1);
39,889,164✔
1940
  tstrncpy(*refTb, first_dot + 1, table_len + 1);
39,889,164✔
1941
  tstrncpy(*refCol, second_dot + 1, col_len + 1);
39,889,164✔
1942

1943
  return TSDB_CODE_SUCCESS;
39,889,164✔
1944
_return:
×
1945
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
1946
  if (*refDb) {
×
1947
    taosMemoryFree(*refDb);
×
1948
    *refDb = NULL;
×
1949
  }
1950
  if (*refTb) {
×
1951
    taosMemoryFree(*refTb);
×
1952
    *refTb = NULL;
×
1953
  }
1954
  if (*refCol) {
×
1955
    taosMemoryFree(*refCol);
×
1956
    *refCol = NULL;
×
1957
  }
1958
  return code;
×
1959
}
1960

1961
bool tableInfoNeedCollect(char *dbName, char *tbName, char *expectDbName, char *expectTbName) {
279,244,550✔
1962
  if (strncmp(varDataVal(tbName), expectTbName, varDataLen(tbName)) == 0 &&
279,244,550✔
1963
      strlen(expectTbName) == varDataLen(tbName) &&
175,275,644✔
1964
      strncmp(varDataVal(dbName), expectDbName, varDataLen(dbName)) == 0 &&
175,275,644✔
1965
      strlen(expectDbName) == varDataLen(dbName)) {
175,275,644✔
1966
    return true;
175,275,644✔
1967
  }
1968
  return false;
103,968,906✔
1969
}
1970

1971
int32_t getColRefInfo(SColRefInfo *pInfo, SArray* pDataBlock, int32_t index) {
175,275,644✔
1972
  int32_t          code = TSDB_CODE_SUCCESS;
175,275,644✔
1973
  int32_t          line = 0;
175,275,644✔
1974

1975
  SColumnInfoData *pColNameCol = taosArrayGet(pDataBlock, 3);
175,275,644✔
1976
  SColumnInfoData *pUidCol = taosArrayGet(pDataBlock, 4);
175,275,644✔
1977
  SColumnInfoData *pColIdCol = taosArrayGet(pDataBlock, 5);
175,275,644✔
1978
  SColumnInfoData *pRefCol = taosArrayGet(pDataBlock, 6);
175,275,644✔
1979
  SColumnInfoData *pVgIdCol = taosArrayGet(pDataBlock, 7);
175,275,644✔
1980
  SColumnInfoData *pRefVerCol = taosArrayGet(pDataBlock, 8);
175,275,644✔
1981

1982
  QUERY_CHECK_NULL(pColNameCol, code, line, _return, terrno)
175,275,644✔
1983
  QUERY_CHECK_NULL(pUidCol, code, line, _return, terrno)
175,275,644✔
1984
  QUERY_CHECK_NULL(pColIdCol, code, line, _return, terrno)
175,275,644✔
1985
  QUERY_CHECK_NULL(pRefCol, code, line, _return, terrno)
175,275,644✔
1986
  QUERY_CHECK_NULL(pVgIdCol, code, line, _return, terrno)
175,275,644✔
1987
  QUERY_CHECK_NULL(pRefVerCol, code, line, _return, terrno)
175,275,644✔
1988

1989
  if (colDataIsNull_s(pRefCol, index)) {
350,551,288✔
1990
    pInfo->colrefName = NULL;
68,904,578✔
1991
  } else {
1992
    pInfo->colrefName = taosMemoryCalloc(varDataTLen(colDataGetData(pRefCol, index)), 1);
106,371,066✔
1993
    QUERY_CHECK_NULL(pInfo->colrefName, code, line, _return, terrno)
106,371,066✔
1994
    memcpy(pInfo->colrefName, varDataVal(colDataGetData(pRefCol, index)), varDataLen(colDataGetData(pRefCol, index)));
106,371,066✔
1995
    pInfo->colrefName[varDataLen(colDataGetData(pRefCol, index))] = 0;
106,371,066✔
1996
  }
1997

1998
  pInfo->colName = taosMemoryCalloc(varDataTLen(colDataGetData(pColNameCol, index)), 1);
175,275,644✔
1999
  QUERY_CHECK_NULL(pInfo->colName, code, line, _return, terrno)
175,275,644✔
2000
  memcpy(pInfo->colName, varDataVal(colDataGetData(pColNameCol, index)), varDataLen(colDataGetData(pColNameCol, index)));
175,275,644✔
2001
  pInfo->colName[varDataLen(colDataGetData(pColNameCol, index))] = 0;
175,275,644✔
2002

2003
  if (!colDataIsNull_s(pUidCol, index)) {
350,551,288✔
2004
    GET_TYPED_DATA(pInfo->uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, index), 0);
175,275,644✔
2005
  }
2006
  if (!colDataIsNull_s(pColIdCol, index)) {
350,551,288✔
2007
    GET_TYPED_DATA(pInfo->colId, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pColIdCol, index), 0);
106,371,066✔
2008
  }
2009
  if (!colDataIsNull_s(pVgIdCol, index)) {
350,551,288✔
2010
    GET_TYPED_DATA(pInfo->vgId, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pVgIdCol, index), 0);
175,275,644✔
2011
  }
2012

2013
_return:
×
2014
  return code;
175,275,644✔
2015
}
2016

2017
int32_t processOrgTbVg(SVtbScanDynCtrlInfo* pVtbScan, SExecTaskInfo* pTaskInfo, int32_t rversion) {
1,449,785✔
2018
  int32_t                    code = TSDB_CODE_SUCCESS;
1,449,785✔
2019
  int32_t                    line = 0;
1,449,785✔
2020

2021
  if (pTaskInfo->pStreamRuntimeInfo == NULL) {
1,449,785✔
2022
    return code;
1,297,698✔
2023
  }
2024

2025
  if (pVtbScan->existOrgTbVg == NULL) {
152,087✔
2026
    pVtbScan->existOrgTbVg = pVtbScan->curOrgTbVg;
×
2027
    pVtbScan->curOrgTbVg = NULL;
×
2028
  }
2029

2030
  if (pVtbScan->curOrgTbVg != NULL) {
152,087✔
2031
    // which means rversion has changed
2032
    void*   pCurIter = NULL;
9,662✔
2033
    SArray* tmpArray = NULL;
9,662✔
2034
    while ((pCurIter = taosHashIterate(pVtbScan->curOrgTbVg, pCurIter))) {
27,893✔
2035
      int32_t* vgId = (int32_t*)taosHashGetKey(pCurIter, NULL);
18,231✔
2036
      if (taosHashGet(pVtbScan->existOrgTbVg, vgId, sizeof(int32_t)) == NULL) {
18,231✔
2037
        if (tmpArray == NULL) {
2,299✔
2038
          tmpArray = taosArrayInit(1, sizeof(int32_t));
2,299✔
2039
          QUERY_CHECK_NULL(tmpArray, code, line, _return, terrno)
2,299✔
2040
        }
2041
        QUERY_CHECK_NULL(taosArrayPush(tmpArray, vgId), code, line, _return, terrno)
2,299✔
2042
      }
2043
    }
2044
    if (tmpArray == NULL) {
9,662✔
2045
      return TSDB_CODE_SUCCESS;
7,363✔
2046
    }
2047
    if (tmpArray != NULL && pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addVgIds == NULL) {
2,299✔
2048
      SArray* expiredInfo = atomic_load_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo);
2,299✔
2049
      if (expiredInfo && expiredInfo == atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo, expiredInfo, NULL)) {
2,299✔
2050
        for (int32_t i = 0; i < taosArrayGetSize(expiredInfo); i++) {
×
2051
          SStreamTaskAddr* vgInfo = (SStreamTaskAddr*)taosArrayGet(expiredInfo, i);
×
2052
          QUERY_CHECK_NULL(taosArrayPush(tmpArray, &vgInfo->nodeId), code, line, _return, terrno)
×
2053
        }
2054
        taosArrayDestroy(expiredInfo);
×
2055
      }
2056
      if (atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addVgIds, NULL, tmpArray)) {
2,299✔
2057
        taosArrayDestroy(tmpArray);
×
2058
      }
2059
    }
2060
    atomic_store_64(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.uid, (int64_t)(pVtbScan->isSuperTable ? pVtbScan->suid : pVtbScan->uid));
2,299✔
2061
    (void)atomic_val_compare_exchange_8(pTaskInfo->pStreamRuntimeInfo->vtableDeployGot, 0, 1);
2,299✔
2062
    taosHashClear(pVtbScan->curOrgTbVg);
2,299✔
2063
    pVtbScan->needRedeploy = true;
2,299✔
2064
    pVtbScan->rversion = rversion;
2,299✔
2065
    return TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY;
2,299✔
2066
  }
2067
  return code;
142,425✔
2068
_return:
×
2069
  qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
2070
  return code;
×
2071
}
2072

2073
int32_t getVgIdFromColref(SOperatorInfo* pOperator, const char* colRef, int32_t* vgId) {
24,950✔
2074
  int32_t                    code =TSDB_CODE_SUCCESS;
24,950✔
2075
  int32_t                    line = 0;
24,950✔
2076
  char*                      refDbName = NULL;
24,950✔
2077
  char*                      refTbName = NULL;
24,950✔
2078
  char*                      refColName = NULL;
24,950✔
2079
  SDBVgInfo*                 dbVgInfo = NULL;
24,950✔
2080
  SName                      name = {0};
24,950✔
2081
  char                       dbFname[TSDB_DB_FNAME_LEN] = {0};
24,950✔
2082
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
24,950✔
2083

2084
  code = extractColRefName(colRef, &refDbName, &refTbName, &refColName);
24,950✔
2085
  QUERY_CHECK_CODE(code, line, _return);
24,950✔
2086

2087
  toName(pInfo->vtbScan.acctId, refDbName, refTbName, &name);
24,950✔
2088

2089
  code = getDbVgInfo(pOperator, &name, &dbVgInfo);
24,950✔
2090
  QUERY_CHECK_CODE(code, line, _return);
24,950✔
2091

2092
  code = tNameGetFullDbName(&name, dbFname);
24,950✔
2093
  QUERY_CHECK_CODE(code, line, _return);
24,950✔
2094

2095
  code = getVgId(dbVgInfo, dbFname, vgId, name.tname);
24,950✔
2096
  QUERY_CHECK_CODE(code, line, _return);
24,950✔
2097

2098
_return:
24,950✔
2099
  if (code) {
24,950✔
2100
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
2101
  }
2102
  taosMemoryFree(refDbName);
24,950✔
2103
  taosMemoryFree(refTbName);
24,950✔
2104
  taosMemoryFree(refColName);
24,950✔
2105
  return code;
24,950✔
2106
}
2107

2108
static int32_t generateTagArrayByTagBlockAndSave(SHashObj* vtbUidTagListMap, tb_uid_t uid, SSDataBlock *pTagVal, int32_t rowIdx) {
2,153,888✔
2109
  int32_t code = TSDB_CODE_SUCCESS;
2,153,888✔
2110
  int32_t line = 0;
2,153,888✔
2111
  STagVal tagVal = {0};
2,153,888✔
2112
  // last col is uid
2113

2114
  SArray* pTagList = taosArrayInit(1, sizeof(STagVal));
2,153,888✔
2115
  QUERY_CHECK_NULL(pTagList, code, line, _return, terrno)
2,153,888✔
2116

2117
  for (int32_t k = 0; k < taosArrayGetSize(pTagVal->pDataBlock) - 1; k++) {
15,520,768✔
2118
    SColumnInfoData *pTagCol = taosArrayGet(pTagVal->pDataBlock, k);
13,366,880✔
2119
    QUERY_CHECK_NULL(pTagCol, code, line, _return, terrno)
13,366,880✔
2120
    tagVal.type = pTagCol->info.type;
13,366,880✔
2121
    tagVal.cid = pTagCol->info.colId;
13,366,880✔
2122
    if (!colDataIsNull_s(pTagCol, rowIdx)) {
26,733,760✔
2123
      char*   pData = colDataGetData(pTagCol, rowIdx);
13,366,880✔
2124
      if (IS_VAR_DATA_TYPE(pTagCol->info.type)) {
13,366,880✔
2125
        tagVal.nData = varDataLen(pData);
5,903,136✔
2126
        tagVal.pData = taosMemoryMalloc(tagVal.nData);
5,903,136✔
2127
        QUERY_CHECK_NULL(tagVal.pData, code, line, _return, terrno)
5,903,136✔
2128
        memcpy(tagVal.pData, varDataVal(pData), varDataLen(pData));
5,903,136✔
2129
        QUERY_CHECK_NULL(taosArrayPush(pTagList, &tagVal), code, line, _return, terrno)
11,806,272✔
2130
      } else {
2131
        memcpy(&tagVal.i64, pData, tDataTypes[pTagCol->info.type].bytes);
7,463,744✔
2132
        QUERY_CHECK_NULL(taosArrayPush(pTagList, &tagVal), code, line, _return, terrno)
14,927,488✔
2133
      }
2134
    } else {
2135
      tagVal.pData = NULL;
×
2136
      tagVal.nData = 0;
×
2137
      QUERY_CHECK_NULL(taosArrayPush(pTagList, &tagVal), code, line, _return, terrno)
×
2138
    }
2139
    tagVal = (STagVal){0};
13,366,880✔
2140
  }
2141
  code = taosHashPut(vtbUidTagListMap, &uid, sizeof(uid), &pTagList, POINTER_BYTES);
2,153,888✔
2142
  QUERY_CHECK_CODE(code, line, _return);
2,153,888✔
2143

2144
  return code;
2,153,888✔
2145
_return:
×
2146
  if (tagVal.pData) {
×
2147
    taosMemoryFreeClear(tagVal.pData);
×
2148
  }
2149
  if (pTagList) {
×
2150
    taosArrayDestroyEx(pTagList, destroyTagVal);
×
2151
  }
2152
  qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
2153
  return code;
×
2154
}
2155

2156
int32_t virtualTableScanProcessColRefInfo(SOperatorInfo* pOperator, SArray* pColRefInfo, tb_uid_t* uid, int32_t* vgId) {
9,730,413✔
2157
  int32_t                    code = TSDB_CODE_SUCCESS;
9,730,413✔
2158
  int32_t                    line = 0;
9,730,413✔
2159
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
9,730,413✔
2160
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
9,730,413✔
2161
  SDBVgInfo*                 dbVgInfo = NULL;
9,730,413✔
2162

2163
  for (int32_t j = 0; j < taosArrayGetSize(pColRefInfo); j++) {
185,211,697✔
2164
    SColRefInfo *pKV = (SColRefInfo*)taosArrayGet(pColRefInfo, j);
175,481,284✔
2165
    *uid = pKV->uid;
175,481,284✔
2166
    *vgId = pKV->vgId;
175,481,284✔
2167
    if (pKV->colrefName != NULL && colNeedScan(pOperator, pKV->colId)) {
175,481,284✔
2168
      char*   refDbName = NULL;
39,864,214✔
2169
      char*   refTbName = NULL;
39,864,214✔
2170
      char*   refColName = NULL;
39,864,214✔
2171
      SName   name = {0};
39,864,214✔
2172
      char    dbFname[TSDB_DB_FNAME_LEN] = {0};
39,864,214✔
2173
      char    orgTbFName[TSDB_TABLE_FNAME_LEN] = {0};
39,864,214✔
2174

2175
      code = extractColRefName(pKV->colrefName, &refDbName, &refTbName, &refColName);
39,864,214✔
2176
      QUERY_CHECK_CODE(code, line, _return);
39,864,214✔
2177

2178
      toName(pInfo->vtbScan.acctId, refDbName, refTbName, &name);
39,864,214✔
2179

2180
      code = getDbVgInfo(pOperator, &name, &dbVgInfo);
39,864,214✔
2181
      QUERY_CHECK_CODE(code, line, _return);
39,864,214✔
2182
      code = tNameGetFullDbName(&name, dbFname);
39,864,214✔
2183
      QUERY_CHECK_CODE(code, line, _return);
39,864,214✔
2184
      code = tNameGetFullTableName(&name, orgTbFName);
39,864,214✔
2185
      QUERY_CHECK_CODE(code, line, _return);
39,864,214✔
2186

2187
      void *pVal = taosHashGet(pVtbScan->otbNameToOtbInfoMap, orgTbFName, sizeof(orgTbFName));
39,864,214✔
2188
      if (!pVal) {
39,864,214✔
2189
        SOrgTbInfo orgTbInfo = {0};
13,684,666✔
2190
        code = getVgId(dbVgInfo, dbFname, &orgTbInfo.vgId, name.tname);
13,684,666✔
2191
        QUERY_CHECK_CODE(code, line, _return);
13,684,666✔
2192
        tstrncpy(orgTbInfo.tbName, orgTbFName, sizeof(orgTbInfo.tbName));
13,684,666✔
2193
        orgTbInfo.colMap = taosArrayInit(10, sizeof(SColIdNameKV));
13,684,666✔
2194
        QUERY_CHECK_NULL(orgTbInfo.colMap, code, line, _return, terrno)
13,684,666✔
2195
        SColIdNameKV colIdNameKV = {0};
13,684,666✔
2196
        colIdNameKV.colId = pKV->colId;
13,684,666✔
2197
        tstrncpy(colIdNameKV.colName, refColName, sizeof(colIdNameKV.colName));
13,684,666✔
2198
        QUERY_CHECK_NULL(taosArrayPush(orgTbInfo.colMap, &colIdNameKV), code, line, _return, terrno)
27,369,332✔
2199
        code = taosHashPut(pVtbScan->otbNameToOtbInfoMap, orgTbFName, sizeof(orgTbFName), &orgTbInfo, sizeof(orgTbInfo));
13,684,666✔
2200
        QUERY_CHECK_CODE(code, line, _return);
13,684,666✔
2201
      } else {
2202
        SOrgTbInfo *tbInfo = (SOrgTbInfo *)pVal;
26,179,548✔
2203
        SColIdNameKV colIdNameKV = {0};
26,179,548✔
2204
        colIdNameKV.colId = pKV->colId;
26,179,548✔
2205
        tstrncpy(colIdNameKV.colName, refColName, sizeof(colIdNameKV.colName));
26,179,548✔
2206
        QUERY_CHECK_NULL(taosArrayPush(tbInfo->colMap, &colIdNameKV), code, line, _return, terrno)
52,359,096✔
2207
      }
2208
      taosMemoryFree(refDbName);
39,864,214✔
2209
      taosMemoryFree(refTbName);
39,864,214✔
2210
      taosMemoryFree(refColName);
39,864,214✔
2211
    }
2212
  }
2213

2214
_return:
9,730,413✔
2215
  if (code) {
9,730,413✔
2216
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
2217
  }
2218
  return code;
9,730,413✔
2219
}
2220

2221
static int32_t getTagBlockAndProcess(SOperatorInfo* pOperator, bool hasPartition) {
241,832✔
2222
  int32_t                    code = TSDB_CODE_SUCCESS;
241,832✔
2223
  int32_t                    line = 0;
241,832✔
2224
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
241,832✔
2225
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
241,832✔
2226
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
241,832✔
2227
  SArray*                    pColRefArray = NULL;
241,832✔
2228
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[0];
241,832✔
2229
  SOperatorInfo*             pTagScanOp = pOperator->pDownstream[1];
241,832✔
2230

2231
  pVtbScan->vtbUidTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
241,832✔
2232
  QUERY_CHECK_NULL(pVtbScan->vtbUidTagListMap, code, line, _return, terrno)
241,832✔
2233
  taosHashSetFreeFp(pVtbScan->vtbUidTagListMap, destroyTagList);
241,832✔
2234
  if (hasPartition) {
241,832✔
2235
    pVtbScan->vtbUidToGroupIdMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
197,760✔
2236
    pVtbScan->vtbGroupIdTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
197,760✔
2237
    QUERY_CHECK_NULL(pVtbScan->vtbUidToGroupIdMap, code, line, _return, terrno)
197,760✔
2238
    QUERY_CHECK_NULL(pVtbScan->vtbGroupIdTagListMap, code, line, _return, terrno)
197,760✔
2239
    taosHashSetFreeFp(pVtbScan->vtbGroupIdTagListMap, destroyVtbUidTagListMap);
197,760✔
2240
  }
2241

2242
  while (true) {
907,200✔
2243
    SSDataBlock *pTagVal = NULL;
1,149,032✔
2244
    code = pTagScanOp->fpSet.getNextFn(pTagScanOp, &pTagVal);
1,149,032✔
2245
    QUERY_CHECK_CODE(code, line, _return);
1,149,032✔
2246
    if (pTagVal == NULL) {
1,149,032✔
2247
      break;
241,832✔
2248
    }
2249
    SHashObj *vtbUidTagListMap = NULL;
907,200✔
2250
    if (hasPartition) {
907,200✔
2251
      void* pIter = taosHashGet(pVtbScan->vtbGroupIdTagListMap, &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId));
819,056✔
2252
      if (pIter) {
819,056✔
2253
        vtbUidTagListMap = *(SHashObj**)pIter;
8,240✔
2254
      } else {
2255
        vtbUidTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
810,816✔
2256
        QUERY_CHECK_NULL(vtbUidTagListMap, code, line, _return, terrno)
810,816✔
2257
        taosHashSetFreeFp(vtbUidTagListMap, destroyTagList);
810,816✔
2258

2259
        code = taosHashPut(pVtbScan->vtbGroupIdTagListMap, &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId), &vtbUidTagListMap, POINTER_BYTES);
810,816✔
2260
        QUERY_CHECK_CODE(code, line, _return);
810,816✔
2261
      }
2262
    } else {
2263
      vtbUidTagListMap = pVtbScan->vtbUidTagListMap;
88,144✔
2264
    }
2265

2266
    SColumnInfoData *pUidCol = taosArrayGetLast(pTagVal->pDataBlock);
907,200✔
2267
    QUERY_CHECK_NULL(pUidCol, code, line, _return, terrno)
907,200✔
2268
    for (int32_t i = 0; i < pTagVal->info.rows; i++) {
3,061,088✔
2269
      tb_uid_t uid = 0;
2,153,888✔
2270
      if (!colDataIsNull_s(pUidCol, i)) {
4,307,776✔
2271
        GET_TYPED_DATA(uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, i), 0);
2,153,888✔
2272
        QUERY_CHECK_CODE(code, line, _return);
2,153,888✔
2273
      }
2274

2275
      code = generateTagArrayByTagBlockAndSave(vtbUidTagListMap, uid, pTagVal, i);
2,153,888✔
2276
      QUERY_CHECK_CODE(code, line, _return);
2,153,888✔
2277

2278
      if (hasPartition) {
2,153,888✔
2279
        code = taosHashPut(pVtbScan->vtbUidToGroupIdMap, &uid, sizeof(uid), &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId));
1,779,840✔
2280
        QUERY_CHECK_CODE(code, line, _return);
1,779,840✔
2281
      }
2282
    }
2283
  }
2284

2285
  return code;
241,832✔
2286

2287
_return:
×
2288
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
2289
  return code;
×
2290
}
2291

2292
static int32_t processChildTableListAndGenerateOrgTbInfoMap(SOperatorInfo* pOperator) {
241,832✔
2293
  int32_t                    code = TSDB_CODE_SUCCESS;
241,832✔
2294
  int32_t                    line = 0;
241,832✔
2295
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
241,832✔
2296
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
241,832✔
2297
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
241,832✔
2298
  SArray*                    pColRefArray = NULL;
241,832✔
2299
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[0];
241,832✔
2300
  SOperatorInfo*             pTagScanOp = pOperator->pDownstream[1];
241,832✔
2301

2302
  pVtbScan->vtbUidToVgIdMapMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
241,832✔
2303
  QUERY_CHECK_NULL(pVtbScan->vtbUidToVgIdMapMap, code, line, _return, terrno)
241,832✔
2304

2305
  for (int32_t i = 0; i < taosArrayGetSize(pVtbScan->childTableList); i++) {
1,868,360✔
2306
    SHashObj* otbVgIdToOtbInfoArrayMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
1,626,528✔
2307
    QUERY_CHECK_NULL(otbVgIdToOtbInfoArrayMap, code, line, _return, terrno)
1,626,528✔
2308

2309
    SArray* pColRefInfo = (SArray*)taosArrayGetP(pVtbScan->childTableList, i);
1,626,528✔
2310
    QUERY_CHECK_NULL(pColRefInfo, code, line, _return, terrno)
1,626,528✔
2311

2312
    tb_uid_t uid = 0;
1,626,528✔
2313
    int32_t  vgId = 0;
1,626,528✔
2314
    code = virtualTableScanProcessColRefInfo(pOperator, pColRefInfo, &uid, &vgId);
1,626,528✔
2315
    QUERY_CHECK_CODE(code, line, _return);
1,626,528✔
2316

2317
    size_t len = 0;
1,626,528✔
2318
    void*  pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, NULL);
1,626,528✔
2319
    while (pOrgTbInfo != NULL) {
3,895,304✔
2320
      char*       key = taosHashGetKey(pOrgTbInfo, &len);
2,268,776✔
2321
      SOrgTbInfo* orgTbInfo = (SOrgTbInfo*)pOrgTbInfo;
2,268,776✔
2322

2323
      void* pIter = taosHashGet(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId));
2,268,776✔
2324
      if (!pIter) {
2,268,776✔
2325
        SArray* pOrgTbInfoArray = taosArrayInit(1, sizeof(SOrgTbInfo));
1,875,056✔
2326
        QUERY_CHECK_NULL(pOrgTbInfoArray, code, line, _return, terrno)
1,875,056✔
2327
        QUERY_CHECK_NULL(taosArrayPush(pOrgTbInfoArray, orgTbInfo), code, line, _return, terrno)
3,750,112✔
2328
        code = taosHashPut(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId), &pOrgTbInfoArray, POINTER_BYTES);
1,875,056✔
2329
        QUERY_CHECK_CODE(code, line, _return);
1,875,056✔
2330
      } else {
2331
        SArray* pOrgTbInfoArray = *(SArray**)pIter;
393,720✔
2332
        QUERY_CHECK_NULL(pOrgTbInfoArray, code, line, _return, terrno)
393,720✔
2333
        QUERY_CHECK_NULL(taosArrayPush(pOrgTbInfoArray, orgTbInfo), code, line, _return, terrno)
393,720✔
2334
      }
2335

2336
      pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, pOrgTbInfo);
2,268,776✔
2337

2338
      code = taosHashRemove(pVtbScan->otbNameToOtbInfoMap, key, len);
2,268,776✔
2339
      QUERY_CHECK_CODE(code, line, _return);
2,268,776✔
2340
    }
2341

2342
    code = taosHashPut(pVtbScan->vtbUidToVgIdMapMap, &uid, sizeof(uid), &otbVgIdToOtbInfoArrayMap, POINTER_BYTES);
1,626,528✔
2343
    QUERY_CHECK_CODE(code, line, _return);
1,626,528✔
2344
  }
2345

2346
  return code;
241,832✔
2347
_return:
×
2348
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
2349
  return code;
×
2350
}
2351

2352
static int32_t buildOrgTbInfoSingle(SOperatorInfo* pOperator, bool hasPartition) {
241,832✔
2353
  int32_t                    code = TSDB_CODE_SUCCESS;
241,832✔
2354
  int32_t                    line = 0;
241,832✔
2355

2356
  code = processChildTableListAndGenerateOrgTbInfoMap(pOperator);
241,832✔
2357
  QUERY_CHECK_CODE(code, line, _return);
241,832✔
2358

2359
  // process tag
2360
  code = getTagBlockAndProcess(pOperator, hasPartition);
241,832✔
2361
  QUERY_CHECK_CODE(code, line, _return);
241,832✔
2362

2363
  return code;
241,832✔
2364
_return:
×
2365
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
2366
  return code;
×
2367
}
2368

2369
static int32_t buildOrgTbInfoBatch(SOperatorInfo* pOperator, bool hasPartition) {
424,902✔
2370
  int32_t                    code = TSDB_CODE_SUCCESS;
424,902✔
2371
  int32_t                    line = 0;
424,902✔
2372
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
424,902✔
2373
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
424,902✔
2374
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
424,902✔
2375
  SArray*                    pColRefArray = NULL;
424,902✔
2376
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[0];
424,902✔
2377
  SOperatorInfo*             pTagScanOp = pOperator->pDownstream[1];
424,902✔
2378

2379
  if (hasPartition) {
424,902✔
2380
    pVtbScan->vtbUidToGroupIdMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
345,728✔
2381
    pVtbScan->vtbGroupIdTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
345,728✔
2382
    pVtbScan->vtbGroupIdToVgIdMapMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
345,728✔
2383

2384
    QUERY_CHECK_NULL(pVtbScan->vtbGroupIdToVgIdMapMap, code, line, _return, terrno)
345,728✔
2385
    QUERY_CHECK_NULL(pVtbScan->vtbUidToGroupIdMap, code, line, _return, terrno)
345,728✔
2386
    QUERY_CHECK_NULL(pVtbScan->vtbGroupIdTagListMap, code, line, _return, terrno)
345,728✔
2387
    taosHashSetFreeFp(pVtbScan->vtbGroupIdToVgIdMapMap, destroyOtbVgIdToOtbInfoArrayMap);
345,728✔
2388
  } else {
2389
    pVtbScan->otbVgIdToOtbInfoArrayMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
79,174✔
2390
    QUERY_CHECK_NULL(pVtbScan->otbVgIdToOtbInfoArrayMap, code, line, _return, terrno)
79,174✔
2391
  }
2392

2393
  while (true && hasPartition) {
1,859,726✔
2394
    SSDataBlock* pTagVal = NULL;
1,780,552✔
2395
    code = pTagScanOp->fpSet.getNextFn(pTagScanOp, &pTagVal);
1,780,552✔
2396
    QUERY_CHECK_CODE(code, line, _return);
1,780,552✔
2397
    if (pTagVal == NULL) {
1,780,552✔
2398
      break;
345,728✔
2399
    }
2400

2401
    SColumnInfoData *pUidCol = taosArrayGetLast(pTagVal->pDataBlock);
1,434,824✔
2402
    QUERY_CHECK_NULL(pUidCol, code, line, _return, terrno)
1,434,824✔
2403
    for (int32_t i = 0; i < pTagVal->info.rows; i++) {
4,523,416✔
2404
      tb_uid_t uid = 0;
3,088,592✔
2405
      if (!colDataIsNull_s(pUidCol, i)) {
6,177,184✔
2406
        GET_TYPED_DATA(uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, i), 0);
3,088,592✔
2407
        QUERY_CHECK_CODE(code, line, _return);
3,088,592✔
2408
      }
2409
      code = taosHashPut(pVtbScan->vtbUidToGroupIdMap, &uid, sizeof(uid), &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId));
3,088,592✔
2410
      QUERY_CHECK_CODE(code, line, _return);
3,088,592✔
2411
    }
2412
    code = taosHashPut(pVtbScan->vtbGroupIdTagListMap, &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId), NULL, 0);
1,434,824✔
2413
    QUERY_CHECK_CODE(code, line, _return);
1,434,824✔
2414
  }
2415

2416
  for (int32_t i = 0; i < taosArrayGetSize(pVtbScan->childTableList); i++) {
3,504,710✔
2417
    SArray* pColRefInfo = (SArray*)taosArrayGetP(pVtbScan->childTableList, i);
3,079,808✔
2418
    QUERY_CHECK_NULL(pColRefInfo, code, line, _return, terrno)
3,079,808✔
2419
    tb_uid_t uid = 0;
3,079,808✔
2420
    int32_t  vgId = 0;
3,079,808✔
2421
    code = virtualTableScanProcessColRefInfo(pOperator, pColRefInfo, &uid, &vgId);
3,079,808✔
2422
    QUERY_CHECK_CODE(code, line, _return);
3,079,808✔
2423

2424
    SHashObj* otbVgIdToOtbInfoArrayMap = NULL;
3,079,808✔
2425
    if (hasPartition) {
3,079,808✔
2426
      uint64_t* groupId = (uint64_t *)taosHashGet(pVtbScan->vtbUidToGroupIdMap, &uid, sizeof(uid));
2,561,232✔
2427
      QUERY_CHECK_NULL(groupId, code, line, _return, terrno)
2,561,232✔
2428

2429
      void* pHashIter = taosHashGet(pVtbScan->vtbGroupIdToVgIdMapMap, groupId, sizeof(*groupId));
2,561,232✔
2430
      if (pHashIter) {
2,561,232✔
2431
        otbVgIdToOtbInfoArrayMap = *(SHashObj**)pHashIter;
1,324,168✔
2432
      } else {
2433
        otbVgIdToOtbInfoArrayMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
1,237,064✔
2434
        QUERY_CHECK_NULL(otbVgIdToOtbInfoArrayMap, code, line, _return, terrno)
1,237,064✔
2435
        code = taosHashPut(pVtbScan->vtbGroupIdToVgIdMapMap, groupId, sizeof(*groupId), &otbVgIdToOtbInfoArrayMap, POINTER_BYTES);
1,237,064✔
2436
        QUERY_CHECK_CODE(code, line, _return);
1,237,064✔
2437
      }
2438
    } else {
2439
      otbVgIdToOtbInfoArrayMap = pVtbScan->otbVgIdToOtbInfoArrayMap;
518,576✔
2440
    }
2441

2442
    size_t len = 0;
3,079,808✔
2443
    void*  pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, NULL);
3,079,808✔
2444
    while (pOrgTbInfo != NULL) {
6,749,650✔
2445
      char*       key = taosHashGetKey(pOrgTbInfo, &len);
3,669,842✔
2446
      SOrgTbInfo* orgTbInfo = (SOrgTbInfo*)pOrgTbInfo;
3,669,842✔
2447
      void* pIter = taosHashGet(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId));
3,669,842✔
2448
      if (!pIter) {
3,669,842✔
2449
        SArray* pOrgTbInfoArray = taosArrayInit(1, sizeof(SOrgTbInfo));
1,877,534✔
2450
        QUERY_CHECK_NULL(pOrgTbInfoArray, code, line, _return, terrno)
1,877,534✔
2451
        QUERY_CHECK_NULL(taosArrayPush(pOrgTbInfoArray, orgTbInfo), code, line, _return, terrno)
3,755,068✔
2452
        code = taosHashPut(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId), &pOrgTbInfoArray, POINTER_BYTES);
1,877,534✔
2453
        QUERY_CHECK_CODE(code, line, _return);
1,877,534✔
2454
      } else {
2455
        SArray* pOrgTbInfoArray = *(SArray**)pIter;
1,792,308✔
2456
        QUERY_CHECK_NULL(pOrgTbInfoArray, code, line, _return, terrno)
1,792,308✔
2457
        QUERY_CHECK_NULL(taosArrayPush(pOrgTbInfoArray, orgTbInfo), code, line, _return, terrno)
1,792,308✔
2458
      }
2459

2460
      pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, pOrgTbInfo);
3,669,842✔
2461

2462
      code = taosHashRemove(pVtbScan->otbNameToOtbInfoMap, key, len);
3,669,842✔
2463
      QUERY_CHECK_CODE(code, line, _return);
3,669,842✔
2464
    }
2465
  }
2466
  return code;
424,902✔
2467
_return:
×
2468
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
2469
  return code;
×
2470
}
2471

2472
int32_t buildVirtualSuperTableScanChildTableMap(SOperatorInfo* pOperator) {
1,969,914✔
2473
  int32_t                    code = TSDB_CODE_SUCCESS;
1,969,914✔
2474
  int32_t                    line = 0;
1,969,914✔
2475
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
1,969,914✔
2476
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
1,969,914✔
2477
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
1,969,914✔
2478
  SArray*                    pColRefArray = NULL;
1,969,914✔
2479
  SOperatorInfo*             pSystableScanOp = NULL;
1,969,914✔
2480
  
2481
  pVtbScan->childTableMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
1,969,914✔
2482
  QUERY_CHECK_NULL(pVtbScan->childTableMap, code, line, _return, terrno)
1,969,914✔
2483

2484
  if (pInfo->qType == DYN_QTYPE_VTB_AGG) {
1,969,914✔
2485
    pVtbScan->otbNameToOtbInfoMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
666,734✔
2486
    QUERY_CHECK_NULL(pVtbScan->otbNameToOtbInfoMap, code, line, _return, terrno)
666,734✔
2487
    pSystableScanOp = pOperator->pDownstream[0];
666,734✔
2488
  } else {
2489
    pSystableScanOp = pOperator->pDownstream[1];
1,303,180✔
2490
  }
2491

2492
  while (true) {
3,942,088✔
2493
    SSDataBlock *pChildInfo = NULL;
5,912,002✔
2494
    code = pSystableScanOp->fpSet.getNextFn(pSystableScanOp, &pChildInfo);
5,912,002✔
2495
    QUERY_CHECK_CODE(code, line, _return);
5,912,002✔
2496
    if (pChildInfo == NULL) {
5,912,002✔
2497
      break;
1,969,914✔
2498
    }
2499
    SColumnInfoData *pTableNameCol = taosArrayGet(pChildInfo->pDataBlock, 0);
3,942,088✔
2500
    SColumnInfoData *pStbNameCol = taosArrayGet(pChildInfo->pDataBlock, 1);
3,942,088✔
2501
    SColumnInfoData *pDbNameCol = taosArrayGet(pChildInfo->pDataBlock, 2);
3,942,088✔
2502

2503
    QUERY_CHECK_NULL(pTableNameCol, code, line, _return, terrno)
3,942,088✔
2504
    QUERY_CHECK_NULL(pStbNameCol, code, line, _return, terrno)
3,942,088✔
2505
    QUERY_CHECK_NULL(pDbNameCol, code, line, _return, terrno)
3,942,088✔
2506

2507
    for (int32_t i = 0; i < pChildInfo->info.rows; i++) {
272,621,034✔
2508
      if (!colDataIsNull_s(pStbNameCol, i)) {
537,357,892✔
2509
        char* stbrawname = colDataGetData(pStbNameCol, i);
268,678,946✔
2510
        char* dbrawname = colDataGetData(pDbNameCol, i);
268,678,946✔
2511
        char *ctbName = colDataGetData(pTableNameCol, i);
268,678,946✔
2512

2513
        if (tableInfoNeedCollect(dbrawname, stbrawname, pInfo->vtbScan.dbName, pInfo->vtbScan.tbName)) {
268,678,946✔
2514
          SColRefInfo info = {0};
174,546,266✔
2515
          code = getColRefInfo(&info, pChildInfo->pDataBlock, i);
174,546,266✔
2516
          QUERY_CHECK_CODE(code, line, _return);
174,546,266✔
2517

2518
          if (pInfo->qType == DYN_QTYPE_VTB_SCAN) {
174,546,266✔
2519
            if (pInfo->vtbScan.dynTbUid != 0 && info.uid != pInfo->vtbScan.dynTbUid) {
99,244,890✔
2520
              qTrace("dynQueryCtrl tb uid filter, info uid:%" PRIu64 ", dyn tb uid:%" PRIu64, info.uid,
×
2521
                     pInfo->vtbScan.dynTbUid);
2522
              destroyColRefInfo(&info);
×
2523
              continue;
×
2524
            }
2525

2526
            if (pTaskInfo->pStreamRuntimeInfo) {
99,244,890✔
2527
              if (pVtbScan->curOrgTbVg == NULL) {
33,824✔
2528
                pVtbScan->curOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
1,278✔
2529
                QUERY_CHECK_NULL(pVtbScan->curOrgTbVg, code, line, _return, terrno)
1,278✔
2530
              }
2531

2532
              if (info.colrefName) {
33,824✔
2533
                int32_t vgId;
18,262✔
2534
                code = getVgIdFromColref(pOperator, info.colrefName, &vgId);
18,262✔
2535
                QUERY_CHECK_CODE(code, line, _return);
18,262✔
2536
                code = taosHashPut(pVtbScan->curOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
18,262✔
2537
                QUERY_CHECK_CODE(code, line, _return);
18,262✔
2538
              }
2539
            }
2540
          }
2541

2542
          if (taosHashGet(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName)) == NULL) {
174,546,266✔
2543
            pColRefArray = taosArrayInit(1, sizeof(SColRefInfo));
9,573,150✔
2544
            QUERY_CHECK_NULL(pColRefArray, code, line, _return, terrno)
9,573,150✔
2545
            QUERY_CHECK_NULL(taosArrayPush(pColRefArray, &info), code, line, _return, terrno)
19,146,300✔
2546
            int32_t tableIdx = (int32_t)taosArrayGetSize(pVtbScan->childTableList);
9,573,150✔
2547
            QUERY_CHECK_NULL(taosArrayPush(pVtbScan->childTableList, &pColRefArray), code, line, _return, terrno)
19,146,300✔
2548
            code = taosHashPut(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName), &tableIdx, sizeof(tableIdx));
9,573,150✔
2549
            QUERY_CHECK_CODE(code, line, _return);
9,573,150✔
2550
          } else {
2551
            int32_t *tableIdx = (int32_t*)taosHashGet(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName));
164,973,116✔
2552
            QUERY_CHECK_NULL(tableIdx, code, line, _return, terrno)
164,973,116✔
2553
            pColRefArray = (SArray *)taosArrayGetP(pVtbScan->childTableList, *tableIdx);
164,973,116✔
2554
            QUERY_CHECK_NULL(pColRefArray, code, line, _return, terrno)
164,973,116✔
2555
            QUERY_CHECK_NULL(taosArrayPush(pColRefArray, &info), code, line, _return, terrno)
329,946,232✔
2556
          }
2557
        }
2558
      }
2559
    }
2560
  }
2561

2562
  switch (pInfo->qType) {
1,969,914✔
2563
    case DYN_QTYPE_VTB_AGG: {
666,734✔
2564
      if (pVtbScan->batchProcessChild) {
666,734✔
2565
        code = buildOrgTbInfoBatch(pOperator, pVtbScan->hasPartition);
424,902✔
2566
      } else {
2567
        code = buildOrgTbInfoSingle(pOperator, pVtbScan->hasPartition);
241,832✔
2568
      }
2569
      break;
666,734✔
2570
    }
2571
    case DYN_QTYPE_VTB_SCAN: {
1,303,180✔
2572
      code = processOrgTbVg(pVtbScan, pTaskInfo, 1);
1,303,180✔
2573
      break;
1,303,180✔
2574
    }
2575
    default: {
×
2576
      code = TSDB_CODE_PLAN_INVALID_DYN_CTRL_TYPE;
×
2577
      break;
×
2578
    }
2579
  }
2580

2581
  QUERY_CHECK_CODE(code, line, _return);
1,969,914✔
2582

2583
_return:
1,969,914✔
2584
  if (code) {
1,969,914✔
2585
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
1,045✔
2586
  }
2587
  return code;
1,969,914✔
2588
}
2589

2590
int32_t buildVirtualNormalChildTableScanChildTableMap(SOperatorInfo* pOperator) {
146,605✔
2591
  int32_t                    code = TSDB_CODE_SUCCESS;
146,605✔
2592
  int32_t                    line = 0;
146,605✔
2593
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
146,605✔
2594
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
146,605✔
2595
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
146,605✔
2596
  SArray*                    pColRefInfo = pInfo->vtbScan.colRefInfo;
146,605✔
2597
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[1];
146,605✔
2598
  int32_t                    rversion = 0;
146,605✔
2599

2600
  pInfo->vtbScan.colRefInfo = taosArrayInit(1, sizeof(SColRefInfo));
146,605✔
2601
  QUERY_CHECK_NULL(pInfo->vtbScan.colRefInfo, code, line, _return, terrno)
146,605✔
2602

2603
  while (true) {
289,627✔
2604
    SSDataBlock *pTableInfo = NULL;
436,232✔
2605
    code = pSystableScanOp->fpSet.getNextFn(pSystableScanOp, &pTableInfo);
436,232✔
2606
    if (pTableInfo == NULL) {
436,232✔
2607
      break;
146,605✔
2608
    }
2609

2610
    SColumnInfoData *pTableNameCol = taosArrayGet(pTableInfo->pDataBlock, 0);
289,627✔
2611
    SColumnInfoData *pDbNameCol = taosArrayGet(pTableInfo->pDataBlock, 2);
289,627✔
2612
    SColumnInfoData *pRefVerCol = taosArrayGet(pTableInfo->pDataBlock, 8);
289,627✔
2613

2614
    QUERY_CHECK_NULL(pTableNameCol, code, line, _return, terrno)
289,627✔
2615
    QUERY_CHECK_NULL(pDbNameCol, code, line, _return, terrno)
289,627✔
2616
    QUERY_CHECK_NULL(pRefVerCol, code, line, _return, terrno)
289,627✔
2617

2618
    for (int32_t i = 0; i < pTableInfo->info.rows; i++) {
10,855,231✔
2619
      if (!colDataIsNull_s(pRefVerCol, i)) {
21,131,208✔
2620
        GET_TYPED_DATA(rversion, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pRefVerCol, i), 0);
10,565,604✔
2621
      }
2622

2623
      if (!colDataIsNull_s(pTableNameCol, i)) {
21,131,208✔
2624
        char* tbrawname = colDataGetData(pTableNameCol, i);
10,565,604✔
2625
        char* dbrawname = colDataGetData(pDbNameCol, i);
10,565,604✔
2626
        QUERY_CHECK_NULL(tbrawname, code, line, _return, terrno)
10,565,604✔
2627
        QUERY_CHECK_NULL(dbrawname, code, line, _return, terrno)
10,565,604✔
2628

2629
        if (tableInfoNeedCollect(dbrawname, tbrawname, pInfo->vtbScan.dbName, pInfo->vtbScan.tbName)) {
10,565,604✔
2630
          SColRefInfo info = {0};
729,378✔
2631
          code = getColRefInfo(&info, pTableInfo->pDataBlock, i);
729,378✔
2632
          QUERY_CHECK_CODE(code, line, _return);
729,378✔
2633

2634
          if ((rversion != pVtbScan->rversion || pVtbScan->existOrgTbVg == NULL) && info.colrefName) {
729,378✔
2635
            if (pVtbScan->curOrgTbVg == NULL) {
6,688✔
2636
              pVtbScan->curOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
418✔
2637
              QUERY_CHECK_NULL(pVtbScan->curOrgTbVg, code, line, _return, terrno)
418✔
2638
            }
2639
            int32_t vgId;
6,688✔
2640
            code = getVgIdFromColref(pOperator, info.colrefName, &vgId);
6,688✔
2641
            QUERY_CHECK_CODE(code, line, _return);
6,688✔
2642
            code = taosHashPut(pVtbScan->curOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
6,688✔
2643
            QUERY_CHECK_CODE(code, line, _return);
6,688✔
2644
          }
2645

2646
          QUERY_CHECK_NULL(taosArrayPush(pInfo->vtbScan.colRefInfo, &info), code, line, _return, terrno)
1,458,756✔
2647
        }
2648
      }
2649
    }
2650
  }
2651
  code = processOrgTbVg(pVtbScan, pTaskInfo, rversion);
146,605✔
2652
  QUERY_CHECK_CODE(code, line, _return);
146,605✔
2653

2654
_return:
145,351✔
2655
  if (code) {
146,605✔
2656
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
1,254✔
2657
  }
2658
  return code;
146,605✔
2659
}
2660

2661
int32_t virtualTableScanCheckNeedRedeploy(SOperatorInfo* pOperator) {
2,657,226✔
2662
  int32_t                    code = TSDB_CODE_SUCCESS;
2,657,226✔
2663
  int32_t                    line = 0;
2,657,226✔
2664
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
2,657,226✔
2665
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
2,657,226✔
2666
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
2,657,226✔
2667

2668
  SArray *tmpArray = NULL;
2,657,226✔
2669
  tmpArray = atomic_load_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo);
2,657,226✔
2670
  if (tmpArray && tmpArray == atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo, tmpArray, NULL)) {
2,657,226✔
2671
    for (int32_t i = 0; i < taosArrayGetSize(tmpArray); i++) {
4,598✔
2672
      SStreamTaskAddr* pTaskAddr = (SStreamTaskAddr*)taosArrayGet(tmpArray, i);
2,299✔
2673
      code = taosHashPut(pVtbScan->existOrgTbVg, &pTaskAddr->nodeId, sizeof(pTaskAddr->nodeId), NULL, 0);
2,299✔
2674
      QUERY_CHECK_CODE(code, line, _return);
2,299✔
2675
      if (pVtbScan->newAddedVgInfo == NULL) {
2,299✔
2676
        pVtbScan->newAddedVgInfo = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
836✔
2677
        QUERY_CHECK_NULL(pVtbScan->newAddedVgInfo, code, line, _return, terrno)
836✔
2678
      }
2679
      code = taosHashPut(pVtbScan->newAddedVgInfo, &pTaskAddr->nodeId, sizeof(pTaskAddr->nodeId), pTaskAddr, sizeof(SStreamTaskAddr));
2,299✔
2680
      QUERY_CHECK_CODE(code, line, _return);
2,299✔
2681
    }
2682
    pVtbScan->needRedeploy = false;
2,299✔
2683
  } else {
2684
    code = TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY;
2,654,927✔
2685
    QUERY_CHECK_CODE(code, line, _return);
2,654,927✔
2686
  }
2687

2688
_return:
×
2689
  taosArrayClear(tmpArray);
2,657,226✔
2690
  taosArrayDestroy(tmpArray);
2,657,226✔
2691
  if (code) {
2,657,226✔
2692
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
2,654,927✔
2693
  }
2694
  return code;
2,657,226✔
2695
}
2696

2697
int32_t virtualTableScanBuildDownStreamOpParam(SOperatorInfo* pOperator, tb_uid_t uid, int32_t vgId) {
5,024,077✔
2698
  int32_t                    code = TSDB_CODE_SUCCESS;
5,024,077✔
2699
  int32_t                    line = 0;
5,024,077✔
2700
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
5,024,077✔
2701
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
5,024,077✔
2702
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
5,024,077✔
2703

2704
  pVtbScan->vtbScanParam = NULL;
5,024,077✔
2705
  code = buildVtbScanOperatorParam(pInfo, &pVtbScan->vtbScanParam, uid);
5,024,077✔
2706
  QUERY_CHECK_CODE(code, line, _return);
5,024,077✔
2707

2708
  void* pIter = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, NULL);
5,024,077✔
2709
  while (pIter != NULL) {
12,770,125✔
2710
    SOrgTbInfo*      pMap = (SOrgTbInfo*)pIter;
7,746,048✔
2711
    SOperatorParam*  pExchangeParam = NULL;
7,746,048✔
2712
    SStreamTaskAddr* addr = taosHashGet(pVtbScan->newAddedVgInfo, &pMap->vgId, sizeof(pMap->vgId));
7,746,048✔
2713
    if (addr != NULL) {
7,746,048✔
2714
      SDownstreamSourceNode newSource = {0};
2,299✔
2715
      newSource.type = QUERY_NODE_DOWNSTREAM_SOURCE;
2,299✔
2716
      newSource.clientId = pTaskInfo->id.taskId;// current task's taskid
2,299✔
2717
      newSource.taskId = addr->taskId;
2,299✔
2718
      newSource.fetchMsgType = TDMT_STREAM_FETCH;
2,299✔
2719
      newSource.localExec = false;
2,299✔
2720
      newSource.addr.nodeId = addr->nodeId;
2,299✔
2721
      memcpy(&newSource.addr.epSet, &addr->epset, sizeof(SEpSet));
2,299✔
2722

2723
      code = buildExchangeOperatorParamForVScan(&pExchangeParam, 0, pMap, &newSource);
2,299✔
2724
      QUERY_CHECK_CODE(code, line, _return);
2,299✔
2725
      code = taosHashRemove(pVtbScan->newAddedVgInfo, &pMap->vgId, sizeof(pMap->vgId));
2,299✔
2726
      QUERY_CHECK_CODE(code, line, _return);
2,299✔
2727
    } else {
2728
      code = buildExchangeOperatorParamForVScan(&pExchangeParam, 0, pMap, NULL);
7,743,749✔
2729
      QUERY_CHECK_CODE(code, line, _return);
7,743,749✔
2730
    }
2731
    QUERY_CHECK_NULL(taosArrayPush(((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pOpParamArray, &pExchangeParam), code, line, _return, terrno)
15,492,096✔
2732
    pIter = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, pIter);
7,746,048✔
2733
  }
2734

2735
  SOperatorParam*  pExchangeParam = NULL;
5,024,077✔
2736
  code = buildExchangeOperatorParamForVTagScan(&pExchangeParam, 0, vgId, uid);
5,024,077✔
2737
  QUERY_CHECK_CODE(code, line, _return);
5,024,077✔
2738
  ((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pTagScanOp = pExchangeParam;
5,024,077✔
2739

2740
_return:
5,024,077✔
2741
  if (code) {
5,024,077✔
2742
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
2743
  }
2744
  return code;
5,024,077✔
2745
}
2746

2747
int32_t virtualTableScanGetNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
14,162,874✔
2748
  int32_t                    code = TSDB_CODE_SUCCESS;
14,162,874✔
2749
  int32_t                    line = 0;
14,162,874✔
2750
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
14,162,874✔
2751
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
14,162,874✔
2752
  SOperatorInfo*             pVtbScanOp = pOperator->pDownstream[0];
14,162,874✔
2753

2754
  pVtbScan->otbNameToOtbInfoMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
14,162,874✔
2755
  QUERY_CHECK_NULL(pVtbScan->otbNameToOtbInfoMap, code, line, _return, terrno)
14,162,874✔
2756
  taosHashSetFreeFp(pVtbScan->otbNameToOtbInfoMap, destroySOrgTbInfo);
14,162,874✔
2757

2758
  while (true) {
2759
    if (pVtbScan->curTableIdx == pVtbScan->lastTableIdx) {
17,776,317✔
2760
      code = pVtbScanOp->fpSet.getNextFn(pVtbScanOp, pRes);
12,752,240✔
2761
      QUERY_CHECK_CODE(code, line, _return);
12,752,240✔
2762
    } else {
2763
      taosHashClear(pVtbScan->otbNameToOtbInfoMap);
5,024,077✔
2764
      SArray* pColRefInfo = NULL;
5,024,077✔
2765
      if (pVtbScan->isSuperTable) {
5,024,077✔
2766
        pColRefInfo = (SArray*)taosArrayGetP(pVtbScan->childTableList, pVtbScan->curTableIdx);
4,878,726✔
2767
      } else {
2768
        pColRefInfo = pInfo->vtbScan.colRefInfo;
145,351✔
2769
      }
2770
      QUERY_CHECK_NULL(pColRefInfo, code, line, _return, terrno)
5,024,077✔
2771

2772
      tb_uid_t uid = 0;
5,024,077✔
2773
      int32_t  vgId = 0;
5,024,077✔
2774
      code = virtualTableScanProcessColRefInfo(pOperator, pColRefInfo, &uid, &vgId);
5,024,077✔
2775
      QUERY_CHECK_CODE(code, line, _return);
5,024,077✔
2776

2777
      code = virtualTableScanBuildDownStreamOpParam(pOperator, uid, vgId);
5,024,077✔
2778
      QUERY_CHECK_CODE(code, line, _return);
5,024,077✔
2779

2780
      // reset downstream operator's status
2781
      pVtbScanOp->status = OP_NOT_OPENED;
5,024,077✔
2782
      code = pVtbScanOp->fpSet.getNextExtFn(pVtbScanOp, pVtbScan->vtbScanParam, pRes);
5,024,077✔
2783
      QUERY_CHECK_CODE(code, line, _return);
5,023,454✔
2784
    }
2785

2786
    if (*pRes) {
17,775,694✔
2787
      // has result, still read data from this table.
2788
      pVtbScan->lastTableIdx = pVtbScan->curTableIdx;
12,756,760✔
2789
      break;
12,756,760✔
2790
    } else {
2791
      // no result, read next table.
2792
      pVtbScan->curTableIdx++;
5,018,934✔
2793
      if (pVtbScan->isSuperTable) {
5,018,934✔
2794
        if (pVtbScan->curTableIdx >= taosArrayGetSize(pVtbScan->childTableList)) {
4,873,583✔
2795
          setOperatorCompleted(pOperator);
1,260,140✔
2796
          break;
1,260,140✔
2797
        }
2798
      } else {
2799
        setOperatorCompleted(pOperator);
145,351✔
2800
        break;
145,351✔
2801
      }
2802
    }
2803
  }
2804

2805
_return:
14,162,251✔
2806
  taosHashCleanup(pVtbScan->otbNameToOtbInfoMap);
14,162,251✔
2807
  pVtbScan->otbNameToOtbInfoMap = NULL;
14,162,251✔
2808
  if (code) {
14,162,251✔
2809
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
2810
  }
2811
  return code;
14,162,251✔
2812
}
2813

2814
int32_t vtbScanOpen(SOperatorInfo* pOperator) {
14,205,421✔
2815
  int32_t                    code = TSDB_CODE_SUCCESS;
14,205,421✔
2816
  int32_t                    line = 0;
14,205,421✔
2817
  int64_t                    st = 0;
14,205,421✔
2818
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
14,205,421✔
2819
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
14,205,421✔
2820

2821
  if (OPTR_IS_OPENED(pOperator)) {
14,205,421✔
2822
    return code;
12,755,636✔
2823
  }
2824

2825
  if (pOperator->cost.openCost == 0) {
1,449,785✔
2826
    st = taosGetTimestampUs();
1,329,119✔
2827
  }
2828

2829
  if (pVtbScan->isSuperTable) {
1,449,785✔
2830
    code = buildVirtualSuperTableScanChildTableMap(pOperator);
1,303,180✔
2831
    QUERY_CHECK_CODE(code, line, _return);
1,303,180✔
2832
  } else {
2833
    code = buildVirtualNormalChildTableScanChildTableMap(pOperator);
146,605✔
2834
    QUERY_CHECK_CODE(code, line, _return);
146,605✔
2835
  }
2836

2837
  OPTR_SET_OPENED(pOperator);
1,447,486✔
2838

2839
_return:
1,449,785✔
2840
  if (pOperator->cost.openCost == 0) {
1,449,785✔
2841
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
1,329,119✔
2842
  }
2843
  if (code) {
1,449,785✔
2844
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
2,299✔
2845
    pOperator->pTaskInfo->code = code;
2,299✔
2846
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
2,299✔
2847
  }
2848
  return code;
1,447,486✔
2849
}
2850

2851
int32_t vtbScanNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
16,860,348✔
2852
  int32_t                    code = TSDB_CODE_SUCCESS;
16,860,348✔
2853
  int32_t                    line = 0;
16,860,348✔
2854
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
16,860,348✔
2855
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
16,860,348✔
2856

2857
  QRY_PARAM_CHECK(pRes);
16,860,348✔
2858
  if (pOperator->status == OP_EXEC_DONE && !pOperator->pOperatorGetParam) {
16,860,348✔
2859
    return code;
×
2860
  }
2861
  if (pOperator->pOperatorGetParam) {
16,860,348✔
2862
    if (pOperator->status == OP_EXEC_DONE) {
49,808✔
2863
      pOperator->status = OP_OPENED;
3,396✔
2864
    }
2865
    pVtbScan->curTableIdx = 0;
49,808✔
2866
    pVtbScan->lastTableIdx = -1;
49,808✔
2867
    pVtbScan->window = ((SDynQueryCtrlOperatorParam *)(pOperator->pOperatorGetParam)->value)->window;
49,808✔
2868
    pOperator->pOperatorGetParam = NULL;
49,808✔
2869
  } else {
2870
    pVtbScan->window.skey = INT64_MAX;
16,810,540✔
2871
    pVtbScan->window.ekey = INT64_MIN;
16,810,540✔
2872
  }
2873

2874
  if (pVtbScan->needRedeploy) {
16,860,348✔
2875
    code = virtualTableScanCheckNeedRedeploy(pOperator);
2,657,226✔
2876
    QUERY_CHECK_CODE(code, line, _return);
2,657,226✔
2877
  }
2878

2879
  code = pOperator->fpSet._openFn(pOperator);
14,205,421✔
2880
  QUERY_CHECK_CODE(code, line, _return);
14,203,122✔
2881

2882
  if (pVtbScan->isSuperTable && taosArrayGetSize(pVtbScan->childTableList) == 0) {
14,203,122✔
2883
    setOperatorCompleted(pOperator);
40,248✔
2884
    return code;
40,248✔
2885
  }
2886

2887
  code = virtualTableScanGetNext(pOperator, pRes);
14,162,874✔
2888
  QUERY_CHECK_CODE(code, line, _return);
14,162,251✔
2889

2890
  return code;
14,162,251✔
2891

2892
_return:
2,654,927✔
2893
  if (code) {
2,654,927✔
2894
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
2,654,927✔
2895
    pOperator->pTaskInfo->code = code;
2,654,718✔
2896
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
2,654,718✔
2897
  }
2898
  return code;
×
2899
}
2900

2901
int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) {
1,143,362✔
2902
  if (batchFetch) {
1,143,362✔
2903
    pPrev->leftHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
1,142,264✔
2904
    if (NULL == pPrev->leftHash) {
1,142,264✔
2905
      return terrno;
×
2906
    }
2907
    pPrev->rightHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
1,142,264✔
2908
    if (NULL == pPrev->rightHash) {
1,142,264✔
2909
      return terrno;
×
2910
    }
2911
  } else {
2912
    pPrev->leftCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
1,098✔
2913
    if (NULL == pPrev->leftCache) {
1,098✔
2914
      return terrno;
×
2915
    }
2916
    pPrev->rightCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
1,098✔
2917
    if (NULL == pPrev->rightCache) {
1,098✔
2918
      return terrno;
×
2919
    }
2920
    pPrev->onceTable = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
1,098✔
2921
    if (NULL == pPrev->onceTable) {
1,098✔
2922
      return terrno;
×
2923
    }
2924
  }
2925

2926
  return TSDB_CODE_SUCCESS;
1,143,362✔
2927
}
2928

2929
static void updateDynTbUidIfNeeded(SVtbScanDynCtrlInfo* pVtbScan, SStreamRuntimeInfo* pStreamRuntimeInfo) {
×
2930
  if (pStreamRuntimeInfo == NULL) {
×
2931
    return;
×
2932
  }
2933

2934
  SArray* vals = pStreamRuntimeInfo->funcInfo.pStreamPartColVals;
×
2935
  for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
×
2936
    SStreamGroupValue* pValue = taosArrayGet(vals, i);
×
2937
    if (pValue != NULL && pValue->isTbname && pValue->uid != pVtbScan->dynTbUid) {
×
2938
      qTrace("dynQueryCtrl dyn tb uid:%" PRIu64 " reset to:%" PRIu64, pVtbScan->dynTbUid, pValue->uid);
×
2939

2940
      pVtbScan->dynTbUid = pValue->uid;
×
2941
      break;
×
2942
    }
2943
  }
2944
}
2945

2946
static int32_t initVtbScanInfo(SDynQueryCtrlOperatorInfo* pInfo, SMsgCb* pMsgCb,
1,995,853✔
2947
                               SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) {
2948
  int32_t      code = TSDB_CODE_SUCCESS;
1,995,853✔
2949
  int32_t      line = 0;
1,995,853✔
2950

2951
  code = tsem_init(&pInfo->vtbScan.ready, 0, 0);
1,995,853✔
2952
  QUERY_CHECK_CODE(code, line, _return);
1,995,853✔
2953

2954
  pInfo->vtbScan.genNewParam = true;
1,995,853✔
2955
  pInfo->vtbScan.batchProcessChild = pPhyciNode->vtbScan.batchProcessChild;
1,995,853✔
2956
  pInfo->vtbScan.hasPartition = pPhyciNode->vtbScan.hasPartition;
1,995,853✔
2957
  pInfo->vtbScan.scanAllCols = pPhyciNode->vtbScan.scanAllCols;
1,995,853✔
2958
  pInfo->vtbScan.isSuperTable = pPhyciNode->vtbScan.isSuperTable;
1,995,853✔
2959
  pInfo->vtbScan.rversion = pPhyciNode->vtbScan.rversion;
1,995,853✔
2960
  pInfo->vtbScan.uid = pPhyciNode->vtbScan.uid;
1,995,853✔
2961
  pInfo->vtbScan.suid = pPhyciNode->vtbScan.suid;
1,995,853✔
2962
  pInfo->vtbScan.epSet = pPhyciNode->vtbScan.mgmtEpSet;
1,995,853✔
2963
  pInfo->vtbScan.acctId = pPhyciNode->vtbScan.accountId;
1,995,853✔
2964
  pInfo->vtbScan.needRedeploy = false;
1,995,853✔
2965
  pInfo->vtbScan.pMsgCb = pMsgCb;
1,995,853✔
2966
  pInfo->vtbScan.curTableIdx = 0;
1,995,853✔
2967
  pInfo->vtbScan.lastTableIdx = -1;
1,995,853✔
2968
  pInfo->vtbScan.dynTbUid = 0;
1,995,853✔
2969
  pInfo->vtbScan.dbName = taosStrdup(pPhyciNode->vtbScan.dbName);
1,995,853✔
2970
  pInfo->vtbScan.tbName = taosStrdup(pPhyciNode->vtbScan.tbName);
1,995,853✔
2971
  QUERY_CHECK_NULL(pInfo->vtbScan.dbName, code, line, _return, terrno)
1,995,853✔
2972
  QUERY_CHECK_NULL(pInfo->vtbScan.tbName, code, line, _return, terrno)
1,995,853✔
2973
  pInfo->vtbScan.existOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
1,995,853✔
2974
  QUERY_CHECK_NULL(pInfo->vtbScan.existOrgTbVg, code, line, _return, terrno)
1,995,853✔
2975
  for (int32_t i = 0; i < LIST_LENGTH(pPhyciNode->vtbScan.pOrgVgIds); ++i) {
7,169,490✔
2976
    SValueNode* valueNode = (SValueNode*)nodesListGetNode(pPhyciNode->vtbScan.pOrgVgIds, i);
5,173,637✔
2977
    int32_t vgId = (int32_t)valueNode->datum.i;
5,173,637✔
2978
    code = taosHashPut(pInfo->vtbScan.existOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
5,173,637✔
2979
    QUERY_CHECK_CODE(code, line, _return);
5,173,637✔
2980
  }
2981

2982
  if (pPhyciNode->dynTbname && pTaskInfo) {
1,995,853✔
2983
    updateDynTbUidIfNeeded(&pInfo->vtbScan, pTaskInfo->pStreamRuntimeInfo);
×
2984
  }
2985

2986
  pInfo->vtbScan.readColList = taosArrayInit(LIST_LENGTH(pPhyciNode->vtbScan.pScanCols), sizeof(col_id_t));
1,995,853✔
2987
  QUERY_CHECK_NULL(pInfo->vtbScan.readColList, code, line, _return, terrno)
1,995,853✔
2988

2989
  for (int32_t i = 0; i < LIST_LENGTH(pPhyciNode->vtbScan.pScanCols); ++i) {
18,218,228✔
2990
    SColumnNode* pNode = (SColumnNode*)nodesListGetNode(pPhyciNode->vtbScan.pScanCols, i);
16,222,375✔
2991
    QUERY_CHECK_NULL(pNode, code, line, _return, terrno)
16,222,375✔
2992
    QUERY_CHECK_NULL(taosArrayPush(pInfo->vtbScan.readColList, &pNode->colId), code, line, _return, terrno)
32,444,750✔
2993
  }
2994

2995
  pInfo->vtbScan.childTableList = taosArrayInit(10, POINTER_BYTES);
1,995,853✔
2996
  QUERY_CHECK_NULL(pInfo->vtbScan.childTableList, code, line, _return, terrno)
1,995,853✔
2997

2998
  pInfo->vtbScan.dbVgInfoMap = taosHashInit(taosArrayGetSize(pInfo->vtbScan.childTableList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
1,995,853✔
2999
  QUERY_CHECK_NULL(pInfo->vtbScan.dbVgInfoMap, code, line, _return, terrno)
1,995,853✔
3000

3001
  pInfo->vtbScan.otbNameToOtbInfoMap = NULL;
1,995,853✔
3002
  pInfo->vtbScan.otbVgIdToOtbInfoArrayMap = NULL;
1,995,853✔
3003
  pInfo->vtbScan.vtbUidToVgIdMapMap = NULL;
1,995,853✔
3004
  pInfo->vtbScan.vtbGroupIdToVgIdMapMap = NULL;
1,995,853✔
3005
  pInfo->vtbScan.vtbUidTagListMap = NULL;
1,995,853✔
3006
  pInfo->vtbScan.vtbGroupIdTagListMap = NULL;
1,995,853✔
3007
  pInfo->vtbScan.vtbUidToGroupIdMap = NULL;
1,995,853✔
3008

3009
  return code;
1,995,853✔
3010
_return:
×
3011
  // no need to destroy array and hashmap allocated in this function,
3012
  // since the operator's destroy function will take care of it
3013
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
3014
  return code;
×
3015
}
3016

3017
static int32_t initVtbWindowInfo(SDynQueryCtrlOperatorInfo* pInfo, SDynQueryCtrlPhysiNode* pPhyciNode,
185,156✔
3018
                                 SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) {
3019
  int32_t              code = TSDB_CODE_SUCCESS;
185,156✔
3020
  int32_t              line = 0;
185,156✔
3021
  SDataBlockDescNode*  pDescNode = pPhyciNode->node.pOutputDataBlockDesc;
185,156✔
3022

3023
  pInfo->vtbWindow.wstartSlotId = pPhyciNode->vtbWindow.wstartSlotId;
185,156✔
3024
  pInfo->vtbWindow.wendSlotId = pPhyciNode->vtbWindow.wendSlotId;
185,156✔
3025
  pInfo->vtbWindow.wdurationSlotId = pPhyciNode->vtbWindow.wdurationSlotId;
185,156✔
3026
  pInfo->vtbWindow.pTargets = pPhyciNode->vtbWindow.pTargets;
185,156✔
3027
  pInfo->vtbWindow.isVstb = pPhyciNode->vtbWindow.isVstb;
185,156✔
3028
  pInfo->vtbWindow.singleWinMode = pPhyciNode->vtbWindow.singleWinMode;
185,156✔
3029
  pInfo->vtbWindow.extendOption = pPhyciNode->vtbWindow.extendOption;
185,156✔
3030

3031
  pInfo->vtbWindow.pRes = createDataBlockFromDescNode(pDescNode);
185,156✔
3032
  QUERY_CHECK_NULL(pInfo->vtbWindow.pRes, code, line, _return, terrno)
185,156✔
3033

3034
  pInfo->vtbWindow.pWins = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
185,156✔
3035
  QUERY_CHECK_NULL(pInfo->vtbWindow.pWins, code, line, _return, terrno)
185,156✔
3036

3037
  pInfo->vtbWindow.outputWstartSlotId = -1;
185,156✔
3038
  pInfo->vtbWindow.outputWendSlotId = -1;
185,156✔
3039
  pInfo->vtbWindow.outputWdurationSlotId = -1;
185,156✔
3040
  pInfo->vtbWindow.curWinBatchIdx = 0;
185,156✔
3041

3042
  initResultSizeInfo(&pOperator->resultInfo, 1);
185,156✔
3043
  code = blockDataEnsureCapacity(pInfo->vtbWindow.pRes, pOperator->resultInfo.capacity);
185,156✔
3044
  QUERY_CHECK_CODE(code, line, _return);
185,156✔
3045

3046
  return code;
185,156✔
3047
_return:
×
3048
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
3049
  return code;
×
3050
}
3051

3052
static int32_t extractTsCol(SSDataBlock* pBlock, int32_t slotId, TSKEY** ppTsCols) {
926,272✔
3053
  int32_t code = TSDB_CODE_SUCCESS;
926,272✔
3054
  int32_t lino = 0;
926,272✔
3055

3056
  if (pBlock->pDataBlock != NULL && pBlock->info.dataLoad) {
926,272✔
3057
    SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, slotId);
926,272✔
3058
    QUERY_CHECK_NULL(pColDataInfo, code, lino, _return, terrno)
926,272✔
3059

3060
    *ppTsCols = (int64_t*)pColDataInfo->pData;
926,272✔
3061

3062
    if ((*ppTsCols)[0] != 0 && (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0)) {
926,272✔
3063
      code = blockDataUpdateTsWindow(pBlock, slotId);
90,368✔
3064
      QUERY_CHECK_CODE(code, lino, _return);
90,368✔
3065
    }
3066
  }
3067

3068
  return code;
926,272✔
3069
_return:
×
3070
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3071
  return code;
×
3072
}
3073

3074
int32_t vtbWindowOpen(SOperatorInfo* pOperator) {
15,503,976✔
3075
  int32_t                    code = TSDB_CODE_SUCCESS;
15,503,976✔
3076
  int32_t                    lino = 0;
15,503,976✔
3077
  SDynQueryCtrlOperatorInfo* pDynInfo = pOperator->info;
15,503,976✔
3078
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
15,503,976✔
3079
  SVtbWindowDynCtrlInfo*     pInfo = &pDynInfo->vtbWindow;
15,503,976✔
3080
  int64_t                    st = 0;
15,503,976✔
3081

3082
  if (OPTR_IS_OPENED(pOperator)) {
15,503,976✔
3083
    return code;
15,318,820✔
3084
  }
3085

3086
  if (pOperator->cost.openCost == 0) {
185,156✔
3087
    st = taosGetTimestampUs();
185,156✔
3088
  }
3089

3090
  while (1) {
463,136✔
3091
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
648,292✔
3092
    if (pBlock == NULL) {
648,292✔
3093
      break;
185,156✔
3094
    }
3095

3096
    if (pInfo->outputWendSlotId == -1 && pInfo->outputWstartSlotId == -1 && pInfo->outputWdurationSlotId == -1) {
463,136✔
3097
      for (int32_t i = 0; i < LIST_LENGTH(pInfo->pTargets); ++i) {
1,336,736✔
3098
        STargetNode* pNode = (STargetNode*)nodesListGetNode(pInfo->pTargets, i);
1,151,580✔
3099
        if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode->pExpr)->dataBlockId == pBlock->info.id.blockId) {
1,151,580✔
3100
          if (((SColumnNode*)pNode->pExpr)->slotId == pDynInfo->vtbWindow.wstartSlotId) {
302,572✔
3101
            pInfo->outputWstartSlotId = i;
112,900✔
3102
          } else if (((SColumnNode*)pNode->pExpr)->slotId == pDynInfo->vtbWindow.wendSlotId) {
189,672✔
3103
            pInfo->outputWendSlotId = i;
112,900✔
3104
          } else if (((SColumnNode*)pNode->pExpr)->slotId == pDynInfo->vtbWindow.wdurationSlotId) {
76,772✔
3105
            pInfo->outputWdurationSlotId = i;
76,772✔
3106
          }
3107
        }
3108
      }
3109
    }
3110

3111
    TSKEY* wstartCol = NULL;
463,136✔
3112
    TSKEY* wendCol = NULL;
463,136✔
3113

3114
    code = extractTsCol(pBlock, pDynInfo->vtbWindow.wstartSlotId, &wstartCol);
463,136✔
3115
    QUERY_CHECK_CODE(code, lino, _return);
463,136✔
3116
    code = extractTsCol(pBlock, pDynInfo->vtbWindow.wendSlotId, &wendCol);
463,136✔
3117
    QUERY_CHECK_CODE(code, lino, _return);
463,136✔
3118

3119
    if (pDynInfo->vtbWindow.singleWinMode) {
463,136✔
3120
      for (int32_t i = 0; i < pBlock->info.rows; i++) {
353,172,852✔
3121
        SArray* pWin = taosArrayInit(pBlock->info.rows, sizeof(SExtWinTimeWindow));
353,034,108✔
3122
        QUERY_CHECK_NULL(pWin, code, lino, _return, terrno)
353,034,108✔
3123

3124
        QUERY_CHECK_NULL(taosArrayReserve(pWin, 1), code, lino, _return, terrno);
353,034,108✔
3125

3126
        SExtWinTimeWindow* pWindow = taosArrayGet(pWin, 0);
353,034,108✔
3127
        QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
353,034,108✔
3128
        pWindow->tw.skey = wstartCol[i];
353,034,108✔
3129
        pWindow->tw.ekey = wendCol[i] + 1;
353,034,108✔
3130
        pWindow->winOutIdx = -1;
353,034,108✔
3131

3132
        QUERY_CHECK_NULL(taosArrayPush(pDynInfo->vtbWindow.pWins, &pWin), code, lino, _return, terrno);
706,068,216✔
3133
      }
3134
    } else {
3135
      SArray* pWin = taosArrayInit(pBlock->info.rows, sizeof(SExtWinTimeWindow));
324,392✔
3136
      QUERY_CHECK_NULL(pWin, code, lino, _return, terrno)
324,392✔
3137

3138
      QUERY_CHECK_NULL(taosArrayReserve(pWin, pBlock->info.rows), code, lino, _return, terrno);
324,392✔
3139

3140
      for (int32_t i = 0; i < pBlock->info.rows; i++) {
821,284,284✔
3141
        SExtWinTimeWindow* pWindow = taosArrayGet(pWin, i);
820,959,892✔
3142
        QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
820,959,892✔
3143
        pWindow->tw.skey = wstartCol[i];
820,959,892✔
3144
        pWindow->tw.ekey = wendCol[i] + 1;
820,959,892✔
3145
        pWindow->winOutIdx = -1;
820,959,892✔
3146
      }
3147

3148
      QUERY_CHECK_NULL(taosArrayPush(pDynInfo->vtbWindow.pWins, &pWin), code, lino, _return, terrno);
648,784✔
3149
    }
3150
  }
3151

3152
  // handle first window's start key and last window's end key
3153
  SArray* firstBatch = (SArray*)taosArrayGetP(pDynInfo->vtbWindow.pWins, 0);
185,156✔
3154
  SArray* lastBatch = (SArray*)taosArrayGetP(pDynInfo->vtbWindow.pWins, taosArrayGetSize(pDynInfo->vtbWindow.pWins) - 1);
185,156✔
3155

3156
  QUERY_CHECK_NULL(firstBatch, code, lino, _return, terrno)
185,156✔
3157
  QUERY_CHECK_NULL(lastBatch, code, lino, _return, terrno)
185,156✔
3158

3159
  SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(firstBatch, 0);
185,156✔
3160
  SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(lastBatch);
185,156✔
3161

3162
  QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
185,156✔
3163
  QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)
185,156✔
3164

3165
  if (pInfo->extendOption == STATE_WIN_EXTEND_OPTION_BACKWARD) {
185,156✔
3166
    lastWin->tw.ekey = INT64_MAX;
46,248✔
3167
  }
3168
  if (pInfo->extendOption == STATE_WIN_EXTEND_OPTION_FORWARD) {
185,156✔
3169
    firstWin->tw.skey = INT64_MIN;
69,454✔
3170
  }
3171

3172
  OPTR_SET_OPENED(pOperator);
185,156✔
3173

3174
  if (pOperator->cost.openCost == 0) {
185,156✔
3175
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
185,156✔
3176
  }
3177

3178
_return:
×
3179
  if (code != TSDB_CODE_SUCCESS) {
185,156✔
3180
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3181
    pTaskInfo->code = code;
×
3182
    T_LONG_JMP(pTaskInfo->env, code);
×
3183
  }
3184
  return code;
185,156✔
3185
}
3186

3187
static int32_t buildDynQueryCtrlOperatorParamForExternalWindow(SOperatorParam** ppRes, int32_t downstreamIdx, int64_t skey, int64_t ekey) {
49,808✔
3188
  int32_t                     code = TSDB_CODE_SUCCESS;
49,808✔
3189
  int32_t                     lino = 0;
49,808✔
3190
  SDynQueryCtrlOperatorParam* pDyn = NULL;
49,808✔
3191

3192
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
49,808✔
3193
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
49,808✔
3194

3195
  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
49,808✔
3196
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)
49,808✔
3197

3198
  pDyn = taosMemoryMalloc(sizeof(SDynQueryCtrlOperatorParam));
49,808✔
3199
  QUERY_CHECK_NULL(pDyn, code, lino, _return, terrno);
49,808✔
3200

3201
  pDyn->window.skey = skey;
49,808✔
3202
  pDyn->window.ekey = ekey;
49,808✔
3203

3204
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL;
49,808✔
3205
  (*ppRes)->downstreamIdx = 0;
49,808✔
3206
  (*ppRes)->reUse = false;
49,808✔
3207
  (*ppRes)->value = pDyn;
49,808✔
3208

3209
  return code;
49,808✔
3210
_return:
×
3211
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3212
  if (pDyn) {
×
3213
    taosMemoryFree(pDyn);
×
3214
  }
3215
  if (*ppRes) {
×
3216
    if ((*ppRes)->pChildren) {
×
3217
      taosArrayDestroy((*ppRes)->pChildren);
×
3218
    }
3219
    taosMemoryFree(*ppRes);
×
3220
    *ppRes = NULL;
×
3221
  }
3222
  return code;
×
3223
}
3224

3225
static int32_t buildExternalWindowOperatorParamEx(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins, int32_t idx) {
49,808✔
3226
  int32_t                       code = TSDB_CODE_SUCCESS;
49,808✔
3227
  int32_t                       lino = 0;
49,808✔
3228
  SExternalWindowOperatorParam* pExtWinOp = NULL;
49,808✔
3229

3230
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
49,808✔
3231
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
49,808✔
3232

3233
  pExtWinOp = taosMemoryMalloc(sizeof(SExternalWindowOperatorParam));
49,808✔
3234
  QUERY_CHECK_NULL(pExtWinOp, code, lino, _return, terrno)
49,808✔
3235

3236
  pExtWinOp->ExtWins = taosArrayDup(pWins, NULL);
49,808✔
3237
  QUERY_CHECK_NULL(pExtWinOp->ExtWins, code, lino, _return, terrno)
49,808✔
3238

3239
  SExtWinTimeWindow *firstWin = (SExtWinTimeWindow *)taosArrayGet(pWins, 0);
49,808✔
3240
  SExtWinTimeWindow *lastWin = (SExtWinTimeWindow *)taosArrayGet(pWins, taosArrayGetSize(pWins) - 1);
49,808✔
3241

3242
  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
49,808✔
3243
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)
49,808✔
3244

3245
  SOperatorParam* pDynQueryCtrlParam = NULL;
49,808✔
3246
  code = buildDynQueryCtrlOperatorParamForExternalWindow(&pDynQueryCtrlParam, 0, firstWin->tw.skey, lastWin->tw.ekey);
49,808✔
3247
  QUERY_CHECK_CODE(code, lino, _return);
49,808✔
3248
  QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pDynQueryCtrlParam), code, lino, _return, terrno)
99,616✔
3249

3250
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW;
49,808✔
3251
  (*ppRes)->downstreamIdx = idx;
49,808✔
3252
  (*ppRes)->value = pExtWinOp;
49,808✔
3253
  (*ppRes)->reUse = false;
49,808✔
3254

3255
  return code;
49,808✔
3256
_return:
×
3257
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3258
  if (pExtWinOp) {
×
3259
    if (pExtWinOp->ExtWins) {
×
3260
      taosArrayDestroy(pExtWinOp->ExtWins);
×
3261
    }
3262
    taosMemoryFree(pExtWinOp);
×
3263
  }
3264
  if (*ppRes) {
×
3265
    if ((*ppRes)->pChildren) {
×
3266
      for (int32_t i = 0; i < taosArrayGetSize((*ppRes)->pChildren); i++) {
×
3267
        SOperatorParam* pChildParam = (SOperatorParam*)taosArrayGet((*ppRes)->pChildren, i);
×
3268
        if (pChildParam) {
×
3269
          SDynQueryCtrlOperatorParam* pDynParam = (SDynQueryCtrlOperatorParam*)pChildParam->value;
×
3270
          if (pDynParam) {
×
3271
            taosMemoryFree(pDynParam);
×
3272
          }
3273
          taosMemoryFree(pChildParam);
×
3274
        }
3275
      }
3276
      taosArrayDestroy((*ppRes)->pChildren);
×
3277
    }
3278
    taosMemoryFree(*ppRes);
×
3279
    *ppRes = NULL;
×
3280
  }
3281
  return code;
×
3282
}
3283

3284
int32_t vtbWindowNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
15,503,976✔
3285
  int32_t                    code = TSDB_CODE_SUCCESS;
15,503,976✔
3286
  int32_t                    lino = 0;
15,503,976✔
3287
  SDynQueryCtrlOperatorInfo* pDynInfo = pOperator->info;
15,503,976✔
3288
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
15,503,976✔
3289
  int64_t                    st = taosGetTimestampUs();
15,503,976✔
3290
  int32_t                    numOfWins = 0;
15,503,976✔
3291
  SOperatorInfo*             mergeOp = NULL;
15,503,976✔
3292
  SOperatorInfo*             extWinOp = NULL;
15,503,976✔
3293
  SOperatorParam*            pMergeParam = NULL;
15,503,976✔
3294
  SOperatorParam*            pExtWinParam = NULL;
15,503,976✔
3295
  SVtbWindowDynCtrlInfo*     pInfo = &pDynInfo->vtbWindow;
15,503,976✔
3296
  SSDataBlock*               pRes = pInfo->pRes;
15,503,976✔
3297

3298
  code = pOperator->fpSet._openFn(pOperator);
15,503,976✔
3299
  QUERY_CHECK_CODE(code, lino, _return);
15,503,976✔
3300

3301
  if (pInfo->curWinBatchIdx >= taosArrayGetSize(pInfo->pWins)) {
15,503,976✔
3302
    *ppRes = NULL;
4,516✔
3303
    return code;
4,516✔
3304
  }
3305

3306
  SArray* pWinArray = (SArray*)taosArrayGetP(pInfo->pWins, pInfo->curWinBatchIdx);
15,499,460✔
3307
  QUERY_CHECK_NULL(pWinArray, code, lino, _return, terrno)
15,499,460✔
3308

3309
  numOfWins = (int32_t)taosArrayGetSize(pWinArray);
15,499,460✔
3310

3311
  if (pInfo->isVstb) {
15,499,460✔
3312
    extWinOp = pOperator->pDownstream[1];
49,808✔
3313
    code = buildExternalWindowOperatorParamEx(pDynInfo, &pExtWinParam, pWinArray, extWinOp->numOfDownstream);
49,808✔
3314
    QUERY_CHECK_CODE(code, lino, _return);
49,808✔
3315

3316
    SSDataBlock* pExtWinBlock = NULL;
49,808✔
3317
    code = extWinOp->fpSet.getNextExtFn(extWinOp, pExtWinParam, &pExtWinBlock);
49,808✔
3318
    QUERY_CHECK_CODE(code, lino, _return);
49,808✔
3319
    setOperatorCompleted(extWinOp);
49,808✔
3320

3321
    blockDataCleanup(pRes);
49,808✔
3322
    code = blockDataEnsureCapacity(pRes, numOfWins);
49,808✔
3323
    QUERY_CHECK_CODE(code, lino, _return);
49,808✔
3324

3325
    if (pExtWinBlock) {
49,808✔
3326
      code = copyColumnsValue(pInfo->pTargets, pExtWinBlock->info.id.blockId, pRes, pExtWinBlock, numOfWins);
49,808✔
3327
      QUERY_CHECK_CODE(code, lino, _return);
49,808✔
3328

3329
      if (pInfo->curWinBatchIdx == 0) {
49,808✔
3330
        // first batch, get _wstart from pMergedBlock
3331
        SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(taosArrayGetP(pInfo->pWins, 0), 0);
46,412✔
3332
        QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
46,412✔
3333

3334
        firstWin->tw.skey = pExtWinBlock->info.window.skey;
46,412✔
3335
      }
3336
      if (pInfo->curWinBatchIdx == taosArrayGetSize(pInfo->pWins) - 1) {
49,808✔
3337
        // last batch, get _wend from pMergedBlock
3338
        SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(taosArrayGetP(pInfo->pWins, taosArrayGetSize(pInfo->pWins) - 1));
1,132✔
3339
        QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)
1,132✔
3340

3341
        lastWin->tw.ekey = pExtWinBlock->info.window.ekey + 1;
1,132✔
3342
      }
3343
    }
3344
  } else {
3345
    mergeOp = pOperator->pDownstream[1];
15,449,652✔
3346
    code = buildMergeOperatorParam(pDynInfo, &pMergeParam, pWinArray, mergeOp->numOfDownstream, numOfWins);
15,449,652✔
3347
    QUERY_CHECK_CODE(code, lino, _return);
15,449,652✔
3348

3349
    SSDataBlock* pMergedBlock = NULL;
15,449,652✔
3350
    code = mergeOp->fpSet.getNextExtFn(mergeOp, pMergeParam, &pMergedBlock);
15,449,652✔
3351
    QUERY_CHECK_CODE(code, lino, _return);
15,449,652✔
3352

3353
    blockDataCleanup(pRes);
15,449,652✔
3354
    code = blockDataEnsureCapacity(pRes, numOfWins);
15,449,652✔
3355
    QUERY_CHECK_CODE(code, lino, _return);
15,449,652✔
3356

3357
    if (pMergedBlock) {
15,449,652✔
3358
      code = copyColumnsValue(pInfo->pTargets, pMergedBlock->info.id.blockId, pRes, pMergedBlock, numOfWins);
15,449,652✔
3359
      QUERY_CHECK_CODE(code, lino, _return);
15,449,652✔
3360

3361
      if (pInfo->curWinBatchIdx == 0) {
15,449,652✔
3362
        // first batch, get _wstart from pMergedBlock
3363
        SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(taosArrayGetP(pInfo->pWins, 0), 0);
138,744✔
3364
        QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
138,744✔
3365

3366
        firstWin->tw.skey = pMergedBlock->info.window.skey;
138,744✔
3367
      }
3368
      if (pInfo->curWinBatchIdx == taosArrayGetSize(pInfo->pWins) - 1) {
15,449,652✔
3369
        // last batch, get _wend from pMergedBlock
3370
        SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(taosArrayGetP(pInfo->pWins, taosArrayGetSize(pInfo->pWins) - 1));
3,384✔
3371
        QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)
3,384✔
3372

3373
        lastWin->tw.ekey = pMergedBlock->info.window.ekey + 1;
3,384✔
3374
      }
3375
    }
3376
  }
3377

3378

3379
  if (pInfo->outputWstartSlotId != -1) {
15,499,460✔
3380
    SColumnInfoData* pWstartCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWstartSlotId);
12,747,076✔
3381
    QUERY_CHECK_NULL(pWstartCol, code, lino, _return, terrno)
12,747,076✔
3382

3383
    for (int32_t i = 0; i < numOfWins; i++) {
253,649,748✔
3384
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
240,902,672✔
3385
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
240,902,672✔
3386
      code = colDataSetVal(pWstartCol, i, (const char*)&pWindow->tw.skey, false);
240,902,672✔
3387
      QUERY_CHECK_CODE(code, lino, _return);
240,902,672✔
3388
    }
3389
  }
3390
  if (pInfo->outputWendSlotId != -1) {
15,499,460✔
3391
    SColumnInfoData* pWendCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWendSlotId);
12,747,076✔
3392
    QUERY_CHECK_NULL(pWendCol, code, lino, _return, terrno)
12,747,076✔
3393

3394
    for (int32_t i = 0; i < numOfWins; i++) {
253,649,748✔
3395
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
240,902,672✔
3396
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
240,902,672✔
3397
      TSKEY ekey = pWindow->tw.ekey - 1;
240,902,672✔
3398
      code = colDataSetVal(pWendCol, i, (const char*)&ekey, false);
240,902,672✔
3399
      QUERY_CHECK_CODE(code, lino, _return);
240,902,672✔
3400
    }
3401
  }
3402
  if (pInfo->outputWdurationSlotId != -1) {
15,499,460✔
3403
    SColumnInfoData* pWdurationCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWdurationSlotId);
11,370,884✔
3404
    QUERY_CHECK_NULL(pWdurationCol, code, lino, _return, terrno)
11,370,884✔
3405

3406
    for (int32_t i = 0; i < numOfWins; i++) {
181,517,332✔
3407
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
170,146,448✔
3408
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
170,146,448✔
3409
      int64_t duration = pWindow->tw.ekey - 1 - pWindow->tw.skey;
170,146,448✔
3410
      code = colDataSetVal(pWdurationCol, i, (const char*)&duration, false);
170,146,448✔
3411
      QUERY_CHECK_CODE(code, lino, _return);
170,146,448✔
3412
    }
3413
  }
3414

3415
  pRes->info.rows = numOfWins;
15,499,460✔
3416
  *ppRes = pRes;
15,499,460✔
3417
  pInfo->curWinBatchIdx++;
15,499,460✔
3418

3419
  return code;
15,499,460✔
3420

3421
_return:
×
3422
  if (code != TSDB_CODE_SUCCESS) {
×
3423
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3424
    pTaskInfo->code = code;
×
UNCOV
3425
    T_LONG_JMP(pTaskInfo->env, code);
×
3426
  }
UNCOV
3427
  return code;
×
3428
}
3429

3430
static int32_t resetDynQueryCtrlOperState(SOperatorInfo* pOper) {
2,923,166✔
3431
  SDynQueryCtrlOperatorInfo*    pDyn = pOper->info;
2,923,166✔
3432
  SDynQueryCtrlPhysiNode const* pPhyciNode = pOper->pPhyNode;
2,924,420✔
3433
  SExecTaskInfo*                pTaskInfo = pOper->pTaskInfo;
2,924,002✔
3434

3435
  pOper->status = OP_NOT_OPENED;
2,924,002✔
3436

3437
  switch (pDyn->qType) {
2,924,002✔
3438
    case DYN_QTYPE_STB_HASH:{
735✔
3439
      pDyn->stbJoin.execInfo = (SDynQueryCtrlExecInfo){0};
735✔
3440
      SStbJoinDynCtrlInfo* pStbJoin = &pDyn->stbJoin;
735✔
3441
      destroyStbJoinDynCtrlInfo(&pDyn->stbJoin);
735✔
3442
      
3443
      int32_t code = initSeqStbJoinTableHash(&pDyn->stbJoin.ctx.prev, pDyn->stbJoin.basic.batchFetch);
735✔
3444
      if (TSDB_CODE_SUCCESS != code) {
735✔
3445
        qError("initSeqStbJoinTableHash failed since %s", tstrerror(code));
×
UNCOV
3446
        return code;
×
3447
      }
3448
      pStbJoin->ctx.prev.pListHead = NULL;
735✔
3449
      pStbJoin->ctx.prev.joinBuild = false;
735✔
3450
      pStbJoin->ctx.prev.pListTail = NULL;
735✔
3451
      pStbJoin->ctx.prev.tableNum = 0;
735✔
3452

3453
      pStbJoin->ctx.post = (SStbJoinPostJoinCtx){0};
735✔
3454
      break; 
735✔
3455
    }
3456
    case DYN_QTYPE_VTB_SCAN: {
2,923,685✔
3457
      SVtbScanDynCtrlInfo* pVtbScan = &pDyn->vtbScan;
2,923,685✔
3458
      
3459
      if (pVtbScan->otbNameToOtbInfoMap) {
2,922,640✔
3460
        taosHashSetFreeFp(pVtbScan->otbNameToOtbInfoMap, destroySOrgTbInfo);
×
3461
        taosHashCleanup(pVtbScan->otbNameToOtbInfoMap);
×
UNCOV
3462
        pVtbScan->otbNameToOtbInfoMap = NULL;
×
3463
      }
3464
      if (pVtbScan->pRsp) {
2,923,267✔
3465
        tFreeSUsedbRsp(pVtbScan->pRsp);
×
UNCOV
3466
        taosMemoryFreeClear(pVtbScan->pRsp);
×
3467
      }
3468
      if (pVtbScan->colRefInfo) {
2,922,849✔
3469
        taosArrayDestroyEx(pVtbScan->colRefInfo, destroyColRefInfo);
146,605✔
3470
        pVtbScan->colRefInfo = NULL;
146,605✔
3471
      }
3472
      if (pVtbScan->childTableMap) {
2,923,058✔
3473
        taosHashCleanup(pVtbScan->childTableMap);
5,482✔
3474
        pVtbScan->childTableMap = NULL;
5,482✔
3475
      }
3476
      if (pVtbScan->childTableList) {
2,923,058✔
3477
        taosArrayClearEx(pVtbScan->childTableList, destroyColRefArray);
2,922,431✔
3478
      }
3479
      if (pPhyciNode->dynTbname && pTaskInfo) {
2,924,312✔
UNCOV
3480
        updateDynTbUidIfNeeded(pVtbScan, pTaskInfo->pStreamRuntimeInfo);
×
3481
      }
3482
      pVtbScan->curTableIdx = 0;
2,923,476✔
3483
      pVtbScan->lastTableIdx = -1;
2,923,476✔
3484
      break;
2,923,685✔
3485
    }
3486
    case DYN_QTYPE_VTB_WINDOW: {
×
3487
      SVtbWindowDynCtrlInfo* pVtbWindow = &pDyn->vtbWindow;
×
3488
      if (pVtbWindow->pRes) {
×
3489
        blockDataDestroy(pVtbWindow->pRes);
×
UNCOV
3490
        pVtbWindow->pRes = NULL;
×
3491
      }
3492
      if (pVtbWindow->pWins) {
×
3493
        taosArrayDestroyEx(pVtbWindow->pWins, destroyWinArray);
×
UNCOV
3494
        pVtbWindow->pWins = NULL;
×
3495
      }
3496
      pVtbWindow->outputWdurationSlotId = -1;
×
3497
      pVtbWindow->outputWendSlotId = -1;
×
3498
      pVtbWindow->outputWstartSlotId = -1;
×
3499
      pVtbWindow->curWinBatchIdx = 0;
×
UNCOV
3500
      break;
×
3501
    }
3502
    default:
×
3503
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
×
UNCOV
3504
      break;
×
3505
  }
3506
  return 0;
2,923,793✔
3507
}
3508

3509
int32_t vtbAggOpen(SOperatorInfo* pOperator) {
3,092,053✔
3510
  int32_t                    code = TSDB_CODE_SUCCESS;
3,092,053✔
3511
  int32_t                    line = 0;
3,092,053✔
3512
  int64_t                    st = 0;
3,092,053✔
3513
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
3,092,053✔
3514

3515
  if (OPTR_IS_OPENED(pOperator)) {
3,092,053✔
3516
    return code;
2,425,319✔
3517
  }
3518

3519
  if (pOperator->cost.openCost == 0) {
666,734✔
3520
    st = taosGetTimestampUs();
666,734✔
3521
  }
3522

3523
  code = buildVirtualSuperTableScanChildTableMap(pOperator);
666,734✔
3524
  QUERY_CHECK_CODE(code, line, _return);
666,734✔
3525
  OPTR_SET_OPENED(pOperator);
666,734✔
3526

3527
_return:
666,734✔
3528
  if (pOperator->cost.openCost == 0) {
666,734✔
3529
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
666,734✔
3530
  }
3531
  if (code) {
666,734✔
3532
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
3533
    pOperator->pTaskInfo->code = code;
×
UNCOV
3534
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
3535
  }
3536
  return code;
666,734✔
3537
}
3538

3539
int32_t virtualTableAggGetNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
3,092,053✔
3540
  int32_t                    code = TSDB_CODE_SUCCESS;
3,092,053✔
3541
  int32_t                    line = 0;
3,092,053✔
3542
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
3,092,053✔
3543
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
3,092,053✔
3544
  SOperatorInfo*             pAggOp = pOperator->pDownstream[pOperator->numOfDownstream - 1];
3,092,053✔
3545
  SOperatorInfo*             pTagScanOp = pOperator->pDownstream[1];
3,092,053✔
3546
  SOperatorParam*            pAggParam = NULL;
3,092,053✔
3547

3548
  if (pInfo->vtbScan.hasPartition) {
3,092,053✔
3549
    if (pInfo->vtbScan.batchProcessChild) {
2,697,348✔
3550
      void* pIter = taosHashIterate(pVtbScan->vtbGroupIdTagListMap, NULL);
1,124,176✔
3551
      while (pIter) {
2,655,288✔
3552
        size_t     keyLen = 0;
2,309,560✔
3553
        uint64_t   groupid = *(uint64_t*)taosHashGetKey(pIter, &keyLen);
2,309,560✔
3554

3555
        code = buildAggOperatorParamWithGroupId(pInfo, groupid, &pAggParam);
2,309,560✔
3556
        QUERY_CHECK_CODE(code, line, _return);
2,309,560✔
3557

3558
        if (pAggParam) {
2,309,560✔
3559
          code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
1,909,920✔
3560
          QUERY_CHECK_CODE(code, line, _return);
1,909,920✔
3561
        } else {
3562
          *pRes = NULL;
399,640✔
3563
        }
3564

3565
        pIter = taosHashIterate(pVtbScan->vtbGroupIdTagListMap, pIter);
2,309,560✔
3566

3567
        if (*pRes) {
2,309,560✔
3568
          (*pRes)->info.id.groupId = groupid;
778,448✔
3569
          code = taosHashRemove(pVtbScan->vtbGroupIdTagListMap, &groupid, keyLen);
778,448✔
3570
          QUERY_CHECK_CODE(code, line, _return);
778,448✔
3571
          break;
778,448✔
3572
        }
3573
      }
3574
    } else {
3575
      void *pIter = taosHashIterate(pVtbScan->vtbGroupIdTagListMap, NULL);
1,573,172✔
3576
      while (pIter) {
2,383,988✔
3577
        size_t     keyLen = 0;
2,186,228✔
3578
        uint64_t*  groupid = (uint64_t*)taosHashGetKey(pIter, &keyLen);
2,186,228✔
3579
        SHashObj*  vtbUidTagListMap = *(SHashObj**)pIter;
2,186,228✔
3580

3581
        void* pIter2 = taosHashIterate(vtbUidTagListMap, NULL);
2,186,228✔
3582
        while (pIter2) {
3,966,068✔
3583
          size_t   keyLen2 = 0;
3,155,252✔
3584
          tb_uid_t uid = *(tb_uid_t*)taosHashGetKey(pIter2, &keyLen2);
3,155,252✔
3585
          SArray*  pTagList = *(SArray**)pIter2;
3,155,252✔
3586

3587
          if (pVtbScan->genNewParam) {
3,155,252✔
3588
            code = buildAggOperatorParamForSingleChild(pInfo, uid, *groupid, pTagList, &pAggParam);
1,779,840✔
3589
            QUERY_CHECK_CODE(code, line, _return);
1,779,840✔
3590
            if (pAggParam) {
1,779,840✔
3591
              code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
1,357,952✔
3592
              QUERY_CHECK_CODE(code, line, _return);
1,357,952✔
3593
            } else {
3594
              *pRes = NULL;
421,888✔
3595
            }
3596
          } else {
3597
            code = pAggOp->fpSet.getNextFn(pAggOp, pRes);
1,375,412✔
3598
            QUERY_CHECK_CODE(code, line, _return);
1,375,412✔
3599
          }
3600

3601
          if (*pRes) {
3,155,252✔
3602
            pVtbScan->genNewParam = false;
1,375,412✔
3603
            (*pRes)->info.id.groupId = *groupid;
1,375,412✔
3604
            break;
1,375,412✔
3605
          }
3606
          pVtbScan->genNewParam = true;
1,779,840✔
3607
          pIter2 = taosHashIterate(vtbUidTagListMap, pIter2);
1,779,840✔
3608
          code = taosHashRemove(vtbUidTagListMap, &uid, keyLen);
1,779,840✔
3609
          QUERY_CHECK_CODE(code, line, _return);
1,779,840✔
3610
        }
3611
        if (*pRes) {
2,186,228✔
3612
          break;
1,375,412✔
3613
        }
3614
        pIter = taosHashIterate(pVtbScan->vtbGroupIdTagListMap, pIter);
810,816✔
3615
        code = taosHashRemove(pVtbScan->vtbGroupIdTagListMap, groupid, keyLen);
810,816✔
3616
        QUERY_CHECK_CODE(code, line, _return);
810,816✔
3617
      }
3618
    }
3619

3620
  } else {
3621
    if (pInfo->vtbScan.batchProcessChild) {
394,705✔
3622
      code = buildAggOperatorParam(pInfo, &pAggParam);
79,174✔
3623
      QUERY_CHECK_CODE(code, line, _return);
79,174✔
3624

3625
      code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
79,174✔
3626
      QUERY_CHECK_CODE(code, line, _return);
79,174✔
3627
      setOperatorCompleted(pOperator);
79,174✔
3628
    } else {
3629
      void* pIter = taosHashIterate(pVtbScan->vtbUidTagListMap, NULL);
315,531✔
3630
      while (pIter) {
689,579✔
3631
        size_t   keyLen = 0;
645,507✔
3632
        tb_uid_t uid = *(tb_uid_t*)taosHashGetKey(pIter, &keyLen);
645,507✔
3633
        SArray*  pTagList = *(SArray**)pIter;
645,507✔
3634

3635
        if (pVtbScan->genNewParam) {
645,507✔
3636
          code = buildAggOperatorParamForSingleChild(pInfo, uid, 0, pTagList, &pAggParam);
374,048✔
3637
          QUERY_CHECK_CODE(code, line, _return);
374,048✔
3638

3639
          if (pAggParam) {
374,048✔
3640
            code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
268,576✔
3641
            QUERY_CHECK_CODE(code, line, _return);
268,576✔
3642
          } else {
3643
            *pRes = NULL;
105,472✔
3644
          }
3645
        } else {
3646
          code = pAggOp->fpSet.getNextFn(pAggOp, pRes);
271,459✔
3647
          QUERY_CHECK_CODE(code, line, _return);
271,459✔
3648
        }
3649

3650
        if (*pRes) {
645,507✔
3651
          pVtbScan->genNewParam = false;
271,459✔
3652
          break;
271,459✔
3653
        }
3654
        pVtbScan->genNewParam = true;
374,048✔
3655
        pIter = taosHashIterate(pVtbScan->vtbUidTagListMap, pIter);
374,048✔
3656
        code = taosHashRemove(pVtbScan->vtbUidTagListMap, &uid, keyLen);
374,048✔
3657
        QUERY_CHECK_CODE(code, line, _return);
374,048✔
3658
      }
3659
    }
3660
  }
3661
_return:
3,092,053✔
3662
  if (code) {
3,092,053✔
UNCOV
3663
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
3664
  }
3665
  return code;
3,092,053✔
3666
}
3667

3668
int32_t vtbAggNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
3,171,227✔
3669
  int32_t                    code = TSDB_CODE_SUCCESS;
3,171,227✔
3670
  int32_t                    line = 0;
3,171,227✔
3671
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
3,171,227✔
3672
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
3,171,227✔
3673

3674
  QRY_PARAM_CHECK(pRes);
3,171,227✔
3675
  if (pOperator->status == OP_EXEC_DONE) {
3,171,227✔
3676
    return code;
79,174✔
3677
  }
3678

3679
  code = pOperator->fpSet._openFn(pOperator);
3,092,053✔
3680
  QUERY_CHECK_CODE(code, line, _return);
3,092,053✔
3681

3682
  if (pVtbScan->isSuperTable && taosArrayGetSize(pVtbScan->childTableList) == 0) {
3,092,053✔
3683
    setOperatorCompleted(pOperator);
×
UNCOV
3684
    return code;
×
3685
  }
3686

3687
  code = virtualTableAggGetNext(pOperator, pRes);
3,092,053✔
3688
  QUERY_CHECK_CODE(code, line, _return);
3,092,053✔
3689

3690
  return code;
3,092,053✔
3691

3692
_return:
×
3693
  if (code) {
×
3694
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
3695
    pOperator->pTaskInfo->code = code;
×
UNCOV
3696
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
3697
  }
UNCOV
3698
  return code;
×
3699
}
3700

3701
int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
3,323,636✔
3702
                                       SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo,
3703
                                       SMsgCb* pMsgCb, SOperatorInfo** pOptrInfo) {
3704
  QRY_PARAM_CHECK(pOptrInfo);
3,323,636✔
3705

3706
  int32_t                    code = TSDB_CODE_SUCCESS;
3,323,636✔
3707
  int32_t                    line = 0;
3,323,636✔
3708
  __optr_fn_t                nextFp = NULL;
3,323,636✔
3709
  __optr_open_fn_t           openFp = NULL;
3,323,636✔
3710
  SOperatorInfo*             pOperator = NULL;
3,323,636✔
3711
  SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo));
3,323,636✔
3712
  QUERY_CHECK_NULL(pInfo, code, line, _error, terrno)
3,323,636✔
3713

3714
  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
3,323,636✔
3715
  QUERY_CHECK_NULL(pOperator, code, line, _error, terrno)
3,323,636✔
3716

3717
  pOperator->pPhyNode = pPhyciNode;
3,323,636✔
3718
  pTaskInfo->dynamicTask = (int8_t)pPhyciNode->node.dynamicOp;
3,323,636✔
3719

3720
  code = appendDownstream(pOperator, pDownstream, numOfDownstream);
3,323,636✔
3721
  QUERY_CHECK_CODE(code, line, _error);
3,323,636✔
3722

3723
  setOperatorInfo(pOperator, "DynQueryCtrlOperator", QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, false, OP_NOT_OPENED,
3,323,636✔
3724
                  pInfo, pTaskInfo);
3725

3726
  pInfo->qType = pPhyciNode->qType;
3,323,636✔
3727
  switch (pInfo->qType) {
3,323,636✔
3728
    case DYN_QTYPE_STB_HASH:
1,142,627✔
3729
      TAOS_MEMCPY(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
1,142,627✔
3730
      pInfo->stbJoin.pOutputDataBlockDesc = pPhyciNode->node.pOutputDataBlockDesc;
1,142,627✔
3731
      code = initSeqStbJoinTableHash(&pInfo->stbJoin.ctx.prev, pInfo->stbJoin.basic.batchFetch);
1,142,627✔
3732
      QUERY_CHECK_CODE(code, line, _error);
1,142,627✔
3733
      nextFp = seqStableJoin;
1,142,627✔
3734
      openFp = optrDummyOpenFn;
1,142,627✔
3735
      break;
1,142,627✔
3736
    case DYN_QTYPE_VTB_SCAN:
1,329,119✔
3737
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
1,329,119✔
3738
      QUERY_CHECK_CODE(code, line, _error);
1,329,119✔
3739
      nextFp = vtbScanNext;
1,329,119✔
3740
      openFp = vtbScanOpen;
1,329,119✔
3741
      break;
1,329,119✔
3742
    case DYN_QTYPE_VTB_WINDOW:
185,156✔
3743
      code = initVtbWindowInfo(pInfo, pPhyciNode, pTaskInfo, pOperator);
185,156✔
3744
      QUERY_CHECK_CODE(code, line, _error);
185,156✔
3745
      nextFp = vtbWindowNext;
185,156✔
3746
      openFp = vtbWindowOpen;
185,156✔
3747
      break;
185,156✔
3748
    case DYN_QTYPE_VTB_AGG:
666,734✔
3749
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
666,734✔
3750
      QUERY_CHECK_CODE(code, line, _error);
666,734✔
3751
      nextFp = vtbAggNext;
666,734✔
3752
      openFp = vtbAggOpen;
666,734✔
3753
      break;
666,734✔
3754
    default:
×
3755
      qError("unsupported dynamic query ctrl type: %d", pInfo->qType);
×
3756
      code = TSDB_CODE_INVALID_PARA;
×
UNCOV
3757
      goto _error;
×
3758
  }
3759

3760
  pOperator->fpSet = createOperatorFpSet(openFp, nextFp, NULL, destroyDynQueryCtrlOperator, optrDefaultBufFn,
3,323,636✔
3761
                                         NULL, optrDefaultGetNextExtFn, NULL);
3762

3763
  setOperatorResetStateFn(pOperator, resetDynQueryCtrlOperState);
3,323,636✔
3764
  *pOptrInfo = pOperator;
3,323,636✔
3765
  return TSDB_CODE_SUCCESS;
3,323,636✔
3766

3767
_error:
×
3768
  if (pInfo != NULL) {
×
UNCOV
3769
    destroyDynQueryCtrlOperator(pInfo);
×
3770
  }
3771
  qError("failed to create dyn query ctrl operator, %s code:%s, line:%d", __func__, tstrerror(code), line);
×
3772
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
3773
  pTaskInfo->code = code;
×
UNCOV
3774
  return code;
×
3775
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc