• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4947

02 Feb 2026 09:27AM UTC coverage: 66.872% (-0.06%) from 66.932%
#4947

push

travis-ci

web-flow
enh: [6690002267] Optimize virtual table query with plenty of columns. (#34341)

527 of 634 new or added lines in 23 files covered. (83.12%)

3610 existing lines in 126 files now uncovered.

205539 of 307364 relevant lines covered (66.87%)

125933663.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.56
/source/libs/executor/src/dynqueryctrloperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "nodes.h"
18
#include "operator.h"
19
#include "os.h"
20
#include "plannodes.h"
21
#include "query.h"
22
#include "querynodes.h"
23
#include "querytask.h"
24
#include "tarray.h"
25
#include "tcompare.h"
26
#include "tdatablock.h"
27
#include "thash.h"
28
#include "tmsg.h"
29
#include "trpc.h"
30
#include "ttypes.h"
31
#include "tdataformat.h"
32
#include "dynqueryctrl.h"
33

34
// Global counter; each group cache get-param atomically increments it and uses the result as its session id.
int64_t gSessionId = 0;
35

36
void freeVgTableList(void* ptr) { 
×
37
  taosArrayDestroy(*(SArray**)ptr); 
×
38
}
×
39

40
/*
 * Free every node of a super-table join table list, together with the four
 * per-node arrays (left/right vgroup ids and left/right uids).
 */
static void destroyStbJoinTableList(SStbJoinTableList* pListHead) {
  for (SStbJoinTableList* pCur = pListHead; NULL != pCur;) {
    // remember the successor before the node itself is released
    SStbJoinTableList* pFollowing = pCur->pNext;

    taosMemoryFree(pCur->pLeftVg);
    taosMemoryFree(pCur->pLeftUid);
    taosMemoryFree(pCur->pRightVg);
    taosMemoryFree(pCur->pRightUid);
    taosMemoryFree(pCur);

    pCur = pFollowing;
  }
}
1,131,742✔
53

54
/*
 * Release what a super-table join control context owns. The execution
 * counters are logged first; then either the left/right hashes (batch fetch)
 * or the left/right caches and the once-table set (non-batch fetch) are
 * cleaned up, followed by the pending table list.
 */
static void destroyStbJoinDynCtrlInfo(SStbJoinDynCtrlInfo* pStbJoin) {
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;

  qDebug("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64 ", leftCacheNum:%" PRId64 ", rightCacheNum:%" PRId64,
         pStbJoin->execInfo.prevBlkNum, pStbJoin->execInfo.prevBlkRows, pStbJoin->execInfo.postBlkNum,
         pStbJoin->execInfo.postBlkRows, pStbJoin->execInfo.leftCacheNum, pStbJoin->execInfo.rightCacheNum);

  if (!pStbJoin->basic.batchFetch) {
    if (NULL != pPrev->leftCache) {
      tSimpleHashCleanup(pPrev->leftCache);
    }
    if (NULL != pPrev->rightCache) {
      tSimpleHashCleanup(pPrev->rightCache);
    }
    if (NULL != pPrev->onceTable) {
      tSimpleHashCleanup(pPrev->onceTable);
    }
  } else {
    // hash values are SArray pointers, released through freeVgTableList
    if (NULL != pPrev->leftHash) {
      tSimpleHashSetFreeFp(pPrev->leftHash, freeVgTableList);
      tSimpleHashCleanup(pPrev->leftHash);
    }
    if (NULL != pPrev->rightHash) {
      tSimpleHashSetFreeFp(pPrev->rightHash, freeVgTableList);
      tSimpleHashCleanup(pPrev->rightHash);
    }
  }

  destroyStbJoinTableList(pPrev->pListHead);
}
1,131,742✔
82

83
void destroyColRefInfo(void *info) {
186,319,786✔
84
  SColRefInfo *pColRefInfo = (SColRefInfo *)info;
186,319,786✔
85
  if (pColRefInfo) {
186,319,786✔
86
    taosMemoryFree(pColRefInfo->colName);
186,319,786✔
87
    taosMemoryFree(pColRefInfo->colrefName);
186,319,786✔
88
  }
89
}
186,319,786✔
90

91
void destroyColRefArray(void *info) {
9,516,523✔
92
  SArray *pColRefArray = *(SArray **)info;
9,516,523✔
93
  if (pColRefArray) {
9,516,523✔
94
    taosArrayDestroyEx(pColRefArray, destroyColRefInfo);
9,516,523✔
95
  }
96
}
9,516,523✔
97

98
void freeUseDbOutput(void* pOutput) {
2,178,280✔
99
  SUseDbOutput *pOut = *(SUseDbOutput**)pOutput;
2,178,280✔
100
  if (NULL == pOutput) {
2,178,280✔
101
    return;
×
102
  }
103

104
  if (pOut->dbVgroup) {
2,178,280✔
105
    freeVgInfo(pOut->dbVgroup);
2,178,280✔
106
  }
107
  taosMemFree(pOut);
2,178,280✔
108
}
109

110
void destroyOtbInfoArray(void *info) {
3,891,549✔
111
  SArray *pOtbInfoArray = *(SArray **)info;
3,891,549✔
112
  if (pOtbInfoArray) {
3,891,549✔
113
    taosArrayDestroyEx(pOtbInfoArray, destroySOrgTbInfo);
3,891,549✔
114
  }
115
}
3,891,549✔
116

117
void destroyOtbVgIdToOtbInfoArrayMap(void *info) {
2,846,357✔
118
  SHashObj* pOtbVgIdToOtbInfoArrayMap = *(SHashObj **)info;
2,846,357✔
119
  if (pOtbVgIdToOtbInfoArrayMap) {
2,846,357✔
120
    taosHashSetFreeFp(pOtbVgIdToOtbInfoArrayMap, destroyOtbInfoArray);
2,846,357✔
121
    taosHashCleanup(pOtbVgIdToOtbInfoArrayMap);
2,846,357✔
122
  }
123
}
2,846,357✔
124

125
void destroyTagList(void *info) {
2,140,880✔
126
  SArray *pTagList = *(SArray **)info;
2,140,880✔
127
  if (pTagList) {
2,140,880✔
128
    taosArrayDestroyEx(pTagList, destroyTagVal);
2,140,880✔
129
  }
130
}
2,140,880✔
131

132
static void destroyRefColIdGroup(void *info) {
7,113✔
133
  SRefColIdGroup *pGroup = (SRefColIdGroup *)info;
7,113✔
134
  if (pGroup && pGroup->pSlotIdList) {
7,113✔
135
    taosArrayDestroy(pGroup->pSlotIdList);
7,113✔
136
    pGroup->pSlotIdList = NULL;
7,113✔
137
  }
138
}
7,113✔
139

140
void destroyVtbUidTagListMap(void *info) {
805,896✔
141
  SHashObj* pVtbUidTagListMap = *(SHashObj **)info;
805,896✔
142
  if (pVtbUidTagListMap) {
805,896✔
143
    taosHashSetFreeFp(pVtbUidTagListMap, destroyTagList);
805,896✔
144
    taosHashCleanup(pVtbUidTagListMap);
805,896✔
145
  }
146
}
805,896✔
147

148
/*
 * Release everything a virtual-table scan control context owns. Every member
 * is optional: it is only released when non-NULL. Hashes whose values need a
 * destructor get that destructor installed right before cleanup.
 */
static void destroyVtbScanDynCtrlInfo(SVtbScanDynCtrlInfo* pVtbScan) {
  /* names */
  if (NULL != pVtbScan->dbName) {
    taosMemoryFreeClear(pVtbScan->dbName);
  }
  if (NULL != pVtbScan->tbName) {
    taosMemoryFreeClear(pVtbScan->tbName);
  }

  /* column reference bookkeeping */
  if (NULL != pVtbScan->refColGroups) {
    taosArrayDestroyEx(pVtbScan->refColGroups, destroyRefColIdGroup);
    pVtbScan->refColGroups = NULL;
  }
  if (NULL != pVtbScan->childTableList) {
    taosArrayDestroyEx(pVtbScan->childTableList, destroyColRefArray);
  }
  if (NULL != pVtbScan->colRefInfo) {
    taosArrayDestroyEx(pVtbScan->colRefInfo, destroyColRefInfo);
    pVtbScan->colRefInfo = NULL;
  }
  if (NULL != pVtbScan->childTableMap) {
    taosHashCleanup(pVtbScan->childTableMap);
  }
  if (NULL != pVtbScan->readColList) {
    taosArrayDestroy(pVtbScan->readColList);
  }
  if (NULL != pVtbScan->readColSet) {
    taosHashCleanup(pVtbScan->readColSet);
  }

  /* vgroup and original-table lookups */
  if (NULL != pVtbScan->dbVgInfoMap) {
    taosHashSetFreeFp(pVtbScan->dbVgInfoMap, freeUseDbOutput);
    taosHashCleanup(pVtbScan->dbVgInfoMap);
  }
  if (NULL != pVtbScan->otbNameToOtbInfoMap) {
    taosHashSetFreeFp(pVtbScan->otbNameToOtbInfoMap, destroySOrgTbInfo);
    taosHashCleanup(pVtbScan->otbNameToOtbInfoMap);
  }
  if (NULL != pVtbScan->pRsp) {
    tFreeSUsedbRsp(pVtbScan->pRsp);
    taosMemoryFreeClear(pVtbScan->pRsp);
  }
  if (NULL != pVtbScan->existOrgTbVg) {
    taosHashCleanup(pVtbScan->existOrgTbVg);
  }
  if (NULL != pVtbScan->curOrgTbVg) {
    taosHashCleanup(pVtbScan->curOrgTbVg);
  }
  if (NULL != pVtbScan->newAddedVgInfo) {
    taosHashCleanup(pVtbScan->newAddedVgInfo);
  }

  /* batch maps */
  if (NULL != pVtbScan->otbVgIdToOtbInfoArrayMap) {
    taosHashSetFreeFp(pVtbScan->otbVgIdToOtbInfoArrayMap, destroyOtbInfoArray);
    taosHashCleanup(pVtbScan->otbVgIdToOtbInfoArrayMap);
  }
  if (NULL != pVtbScan->vtbUidToVgIdMapMap) {
    taosHashSetFreeFp(pVtbScan->vtbUidToVgIdMapMap, destroyOtbVgIdToOtbInfoArrayMap);
    taosHashCleanup(pVtbScan->vtbUidToVgIdMapMap);
  }
  if (NULL != pVtbScan->vtbGroupIdToVgIdMapMap) {
    taosHashSetFreeFp(pVtbScan->vtbGroupIdToVgIdMapMap, destroyOtbVgIdToOtbInfoArrayMap);
    taosHashCleanup(pVtbScan->vtbGroupIdToVgIdMapMap);
  }
  if (NULL != pVtbScan->vtbUidTagListMap) {
    taosHashSetFreeFp(pVtbScan->vtbUidTagListMap, destroyTagList);
    taosHashCleanup(pVtbScan->vtbUidTagListMap);
  }
  // NOTE(review): no free callback is installed here for vtbGroupIdTagListMap; confirm its values
  // are released elsewhere.
  if (NULL != pVtbScan->vtbGroupIdTagListMap) {
    taosHashCleanup(pVtbScan->vtbGroupIdTagListMap);
  }
  if (NULL != pVtbScan->vtbUidToGroupIdMap) {
    taosHashCleanup(pVtbScan->vtbUidToGroupIdMap);
  }
}
2,265,374✔
219

220
void destroyWinArray(void *info) {
937,670✔
221
  SArray *pWinArray = *(SArray **)info;
937,670✔
222
  if (pWinArray) {
937,670✔
223
    taosArrayDestroy(pWinArray);
937,670✔
224
  }
225
}
937,670✔
226

227
/*
 * Release the result block and the array of window arrays held by a
 * virtual-table window control context. Each member is released only when set.
 */
static void destroyVtbWindowDynCtrlInfo(SVtbWindowDynCtrlInfo* pVtbWindow) {
  if (NULL != pVtbWindow->pRes) {
    blockDataDestroy(pVtbWindow->pRes);
  }

  if (NULL != pVtbWindow->pWins) {
    taosArrayDestroyEx(pVtbWindow->pWins, destroyWinArray);
  }
}
375,191✔
235

236
static void destroyDynQueryCtrlOperator(void* param) {
3,396,372✔
237
  SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param;
3,396,372✔
238

239
  switch (pDyn->qType) {
3,396,372✔
240
    case DYN_QTYPE_STB_HASH:
1,130,998✔
241
      destroyStbJoinDynCtrlInfo(&pDyn->stbJoin);
1,130,998✔
242
      break;
1,130,998✔
243
    case DYN_QTYPE_VTB_WINDOW:
375,191✔
244
      destroyVtbWindowDynCtrlInfo(&pDyn->vtbWindow);
375,191✔
245
    case DYN_QTYPE_VTB_AGG:
2,265,374✔
246
    case DYN_QTYPE_VTB_SCAN:
247
      destroyVtbScanDynCtrlInfo(&pDyn->vtbScan);
2,265,374✔
248
      break;
2,265,374✔
UNCOV
249
    default:
×
UNCOV
250
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
×
UNCOV
251
      break;
×
252
  }
253

254
  taosMemoryFreeClear(param);
3,396,372✔
255
}
3,396,372✔
256

257
/*
 * Decide whether the data of one join-side table should be cached.
 *
 * - batch fetch: always true.
 * - right table: true when the current right uid equals the next right uid.
 * - left table:  true when uid is present in the previous-stage left cache.
 */
static FORCE_INLINE bool tableNeedCache(int64_t uid, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, bool rightTable, bool batchFetch) {
  if (batchFetch) {
    return true;
  }

  if (rightTable) {
    return pPost->rightCurrUid == pPost->rightNextUid;
  }

  uint32_t* pHit = tSimpleHashGet(pPrev->leftCache, &uid, sizeof(uid));
  return NULL != pHit;
}
270

271
/*
 * Load the current left/right table pair (uids and vgroup ids) from the head
 * of the pending table list into the post-join context, look ahead for the
 * next right uid (0 when the list is exhausted), and compute the per-side
 * cache flags.
 *
 * In non-batch mode, a right table that needs caching and differs from the
 * previously current right table is recorded in the right cache.
 *
 * Returns TSDB_CODE_SUCCESS, or the error produced by the cache insertion.
 */
static int32_t updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo*          pStbJoin) {
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
  SStbJoinTableList*   pCursor = pPrev->pListHead;

  int64_t* pLeftUid = &pCursor->pLeftUid[pCursor->readIdx];
  int64_t* pRightUid = &pCursor->pRightUid[pCursor->readIdx];
  int32_t* pLeftVg = &pCursor->pLeftVg[pCursor->readIdx];
  int32_t* pRightVg = &pCursor->pRightVg[pCursor->readIdx];
  int64_t  lookAheadIdx = pCursor->readIdx + 1;

  // captured before rightCurrUid is overwritten below
  int64_t  prevRightUid = pPost->rightCurrUid;

  pPost->leftCurrUid = *pLeftUid;
  pPost->rightCurrUid = *pRightUid;

  pPost->leftVgId = *pLeftVg;
  pPost->rightVgId = *pRightVg;

  // find the next right uid, moving on to following list nodes when the current one is used up
  for (;;) {
    if (lookAheadIdx < pCursor->uidNum) {
      pPost->rightNextUid = pCursor->pRightUid[lookAheadIdx];
      break;
    }

    pCursor = pCursor->pNext;
    if (NULL == pCursor) {
      pPost->rightNextUid = 0;
      break;
    }

    pRightUid = pCursor->pRightUid;
    lookAheadIdx = 0;
  }

  pPost->leftNeedCache = tableNeedCache(*pLeftUid, pPrev, pPost, false, pStbJoin->basic.batchFetch);
  pPost->rightNeedCache = tableNeedCache(*pRightUid, pPrev, pPost, true, pStbJoin->basic.batchFetch);

  bool rightTableChanged = (prevRightUid != pPost->rightCurrUid);
  if (!pStbJoin->basic.batchFetch && pPost->rightNeedCache && rightTableChanged) {
    QRY_ERR_RET(tSimpleHashPut(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid), NULL, 0));
    pStbJoin->execInfo.rightCacheNum++;
  }

  return TSDB_CODE_SUCCESS;
}
314

315
/*
 * Allocate a copy of an SOrgTbInfo: vgId, table name and a duplicate of the
 * column map array.
 *
 * On success *ppDst receives the new object and TSDB_CODE_SUCCESS is
 * returned. On failure the partial copy is released, *ppDst is left
 * untouched and the error code is returned.
 */
static int32_t copyOrgTbInfo(SOrgTbInfo* pSrc, SOrgTbInfo** ppDst) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  SOrgTbInfo* pCopy = NULL;

  qDebug("start to copy org table info, vgId:%d, tbName:%s", pSrc->vgId, pSrc->tbName);

  pCopy = taosMemoryMalloc(sizeof(SOrgTbInfo));
  QUERY_CHECK_NULL(pCopy, code, lino, _return, terrno)

  pCopy->vgId = pSrc->vgId;
  tstrncpy(pCopy->tbName, pSrc->tbName, TSDB_TABLE_FNAME_LEN);

  pCopy->colMap = taosArrayDup(pSrc->colMap, NULL);
  QUERY_CHECK_NULL(pCopy->colMap, code, lino, _return, terrno)

  *ppDst = pCopy;
  return code;

_return:
  qError("failed to copy org table info, code:%d, line:%d", code, lino);
  if (NULL != pCopy) {
    if (NULL != pCopy->colMap) {
      taosArrayDestroy(pCopy->colMap);
    }
    taosMemoryFreeClear(pCopy);
  }
  return code;
}
344

345
/*
 * Deep-copy a list of tag values into pBasic->tagList.
 *
 * Variable-length tags get their payload duplicated into a fresh buffer;
 * other tags copy the i64 field. On failure pBasic->tagList is destroyed and
 * reset to NULL, any payload buffer that has not been handed over to the
 * list yet is released, and the error code is returned.
 *
 * The scratch tag is zero-initialized and the not-yet-owned payload is
 * tracked in its own pointer, so the cleanup path never inspects an
 * indeterminate value (previously an early failure, e.g. of taosArrayInit,
 * could free an uninitialized tmpTag.pData).
 */
static int32_t buildTagListForExchangeBasicParam(SExchangeOperatorBasicParam* pBasic, SArray* pTagList) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  STagVal  tmpTag = {0};
  void*    pPendingData = NULL;  // payload allocated for tmpTag but not yet owned by pBasic->tagList

  pBasic->tagList = taosArrayInit(1, sizeof(STagVal));
  QUERY_CHECK_NULL(pBasic->tagList, code, lino, _return, terrno)

  for (int32_t i = 0; i < taosArrayGetSize(pTagList); ++i) {
    STagVal* pSrcTag = (STagVal*)taosArrayGet(pTagList, i);
    QUERY_CHECK_NULL(pSrcTag, code, lino, _return, terrno)
    tmpTag.type = pSrcTag->type;
    tmpTag.cid = pSrcTag->cid;
    if (IS_VAR_DATA_TYPE(pSrcTag->type)) {
      tmpTag.nData = pSrcTag->nData;
      pPendingData = taosMemoryMalloc(tmpTag.nData);
      QUERY_CHECK_NULL(pPendingData, code, lino, _return, terrno)
      memcpy(pPendingData, pSrcTag->pData, tmpTag.nData);
      tmpTag.pData = pPendingData;
    } else {
      tmpTag.i64 = pSrcTag->i64;
    }

    QUERY_CHECK_NULL(taosArrayPush(pBasic->tagList, &tmpTag), code, lino, _return, terrno)
    // ownership of the payload moved into the list entry
    pPendingData = NULL;
    tmpTag = (STagVal){0};
  }

  return code;
_return:
  if (pBasic->tagList) {
    taosArrayDestroyEx(pBasic->tagList, destroyTagVal);
    pBasic->tagList = NULL;
  }
  if (pPendingData) {
    taosMemoryFree(pPendingData);
  }
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
  return code;
}
383

384
/*
 * Deep-copy an array of SOrgTbInfo into pBasic->batchOrgTbInfo; each entry
 * gets its own duplicate of the column map.
 *
 * On failure pBasic->batchOrgTbInfo is destroyed and reset to NULL, a column
 * map that has not been handed over to the array yet is released, and the
 * error code is returned.
 *
 * The scratch entry is zero-initialized so the cleanup path never inspects
 * an indeterminate colMap pointer (previously an early failure, e.g. of
 * taosArrayInit, could destroy an uninitialized batchInfo.colMap).
 */
static int32_t buildBatchOrgTbInfoForExchangeBasicParam(SExchangeOperatorBasicParam* pBasic, SArray* pOrgTbInfoArray) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  SOrgTbInfo  batchInfo = {0};

  pBasic->batchOrgTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
  QUERY_CHECK_NULL(pBasic->batchOrgTbInfo, code, lino, _return, terrno)

  for (int32_t i = 0; i < taosArrayGetSize(pOrgTbInfoArray); ++i) {
    SOrgTbInfo* pSrc = (SOrgTbInfo*)taosArrayGet(pOrgTbInfoArray, i);
    QUERY_CHECK_NULL(pSrc, code, lino, _return, terrno)
    batchInfo.vgId = pSrc->vgId;
    tstrncpy(batchInfo.tbName, pSrc->tbName, TSDB_TABLE_FNAME_LEN);
    batchInfo.colMap = taosArrayDup(pSrc->colMap, NULL);
    QUERY_CHECK_NULL(batchInfo.colMap, code, lino, _return, terrno)
    QUERY_CHECK_NULL(taosArrayPush(pBasic->batchOrgTbInfo, &batchInfo), code, lino, _return, terrno)
    // the pushed entry now owns colMap; clear the scratch copy so cleanup cannot free it twice
    batchInfo = (SOrgTbInfo){0};
  }

  return code;
_return:
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
  if (pBasic->batchOrgTbInfo) {
    taosArrayDestroyEx(pBasic->batchOrgTbInfo, destroySOrgTbInfo);
    pBasic->batchOrgTbInfo = NULL;
  }
  if (batchInfo.colMap) {
    taosArrayDestroy(batchInfo.colMap);
    batchInfo.colMap = NULL;
  }
  return code;
}
416

417
/*
 * Build a get-param for the group cache operator.
 *
 * ppRes          receives the new SOperatorParam (NULL on failure).
 * downstreamIdx  index stored in both the param and its group cache value.
 * vgId / tbUid   identify the table whose data is requested.
 * needCache      stored in the group cache value.
 * pChild         optional child param; when non-NULL it becomes the single
 *                entry of pChildren. It is released here on every failure
 *                path that still owns it.
 *
 * All fields of the new param are given defined values right after
 * allocation, so the error paths never pass a partly-initialized structure
 * to freeOperatorParam (previously value/opType were still indeterminate
 * there).
 *
 * Returns TSDB_CODE_SUCCESS or the allocation error from terrno.
 */
static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, bool needCache, SOperatorParam* pChild) {
  int32_t code = TSDB_CODE_SUCCESS;
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    code = terrno;
    freeOperatorParam(pChild, OP_GET_PARAM);
    return code;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = NULL;
  (*ppRes)->reUse = false;
  (*ppRes)->pChildren = NULL;

  if (pChild) {
    (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
    if (NULL == (*ppRes)->pChildren) {
      code = terrno;
      freeOperatorParam(pChild, OP_GET_PARAM);
      freeOperatorParam(*ppRes, OP_GET_PARAM);
      *ppRes = NULL;
      return code;
    }
    if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild)) {
      code = terrno;
      // the push failed, so pChild is not in pChildren and must be released separately
      freeOperatorParam(pChild, OP_GET_PARAM);
      freeOperatorParam(*ppRes, OP_GET_PARAM);
      *ppRes = NULL;
      return code;
    }
  }

  SGcOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcOperatorParam));
  if (NULL == pGc) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_GET_PARAM);
    *ppRes = NULL;
    return code;
  }

  pGc->sessionId = atomic_add_fetch_64(&gSessionId, 1);
  pGc->downstreamIdx = downstreamIdx;
  pGc->vgId = vgId;
  pGc->tbUid = tbUid;
  pGc->needCache = needCache;

  (*ppRes)->value = pGc;

  return TSDB_CODE_SUCCESS;
}
466

467

UNCOV
468
/*
 * Build a notify-param for the group cache operator.
 *
 * ppRes receives the new SOperatorParam; on failure it is set to NULL.
 * downstreamIdx, vgId and tbUid are stored in the notify value.
 *
 * All fields of the new param are given defined values right after
 * allocation, so the error path never passes a partly-initialized structure
 * to freeOperatorParam, and *ppRes is cleared after that release so the
 * caller is not left with a dangling pointer.
 *
 * Returns TSDB_CODE_SUCCESS or the allocation error from terrno.
 */
static int32_t buildGroupCacheNotifyOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid) {
  int32_t code = TSDB_CODE_SUCCESS;
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = NULL;
  (*ppRes)->reUse = false;
  (*ppRes)->pChildren = NULL;

  SGcNotifyOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcNotifyOperatorParam));
  if (NULL == pGc) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    *ppRes = NULL;
    return code;
  }

  pGc->downstreamIdx = downstreamIdx;
  pGc->vgId = vgId;
  pGc->tbUid = tbUid;

  (*ppRes)->value = pGc;

  return TSDB_CODE_SUCCESS;
}
494

495
static int32_t buildExchangeOperatorBasicParam(SExchangeOperatorBasicParam* pBasic, ENodeType srcOpType,
16,240,462✔
496
                                               EExchangeSourceType exchangeType, int32_t vgId, uint64_t groupId,
497
                                               SArray* pUidList, SOrgTbInfo* pOrgTbInfo, SArray* pTagList,
498
                                               SArray* pOrgTbInfoArray, STimeWindow window,
499
                                               SDownstreamSourceNode* pDownstreamSourceNode,
500
                                               bool tableSeq, bool isNewParam, bool isNewDeployed) {
501
  int32_t code = TSDB_CODE_SUCCESS;
16,240,462✔
502
  int32_t lino = 0;
16,240,462✔
503

504
  qDebug("buildExchangeOperatorBasicParam, srcOpType:%d, exchangeType:%d, vgId:%d, groupId:%" PRIu64 ", tableSeq:%d, "
16,240,462✔
505
         "isNewParam:%d, isNewDeployed:%d", srcOpType, exchangeType, vgId, groupId, tableSeq, isNewParam, isNewDeployed);
506

507
  pBasic->paramType = DYN_TYPE_EXCHANGE_PARAM;
16,240,462✔
508
  pBasic->srcOpType = srcOpType;
16,240,462✔
509
  pBasic->vgId = vgId;
16,240,462✔
510
  pBasic->groupid = groupId;
16,240,462✔
511
  pBasic->window = window;
16,240,462✔
512
  pBasic->tableSeq = tableSeq;
16,240,462✔
513
  pBasic->type = exchangeType;
16,240,462✔
514
  pBasic->isNewParam = isNewParam;
16,240,462✔
515

516
  if (pDownstreamSourceNode) {
16,240,462✔
517
    pBasic->isNewDeployed = true;
2,310✔
518
    pBasic->newDeployedSrc.type = QUERY_NODE_DOWNSTREAM_SOURCE;
2,310✔
519
    pBasic->newDeployedSrc.clientId = pDownstreamSourceNode->clientId;// current task's taskid
2,310✔
520
    pBasic->newDeployedSrc.taskId = pDownstreamSourceNode->taskId;
2,310✔
521
    pBasic->newDeployedSrc.fetchMsgType = TDMT_STREAM_FETCH;
2,310✔
522
    pBasic->newDeployedSrc.localExec = false;
2,310✔
523
    pBasic->newDeployedSrc.addr.nodeId = pDownstreamSourceNode->addr.nodeId;
2,310✔
524
    memcpy(&pBasic->newDeployedSrc.addr.epSet, &pDownstreamSourceNode->addr.epSet, sizeof(SEpSet));
2,310✔
525
  } else {
526
    pBasic->isNewDeployed = false;
16,238,152✔
527
    pBasic->newDeployedSrc = (SDownstreamSourceNode){0};
16,238,152✔
528
  }
529

530
  if (pUidList) {
16,240,462✔
531
    pBasic->uidList = taosArrayDup(pUidList, NULL);
4,794,913✔
532
    QUERY_CHECK_NULL(pBasic->uidList, code, lino, _return, terrno)
4,794,913✔
533
  } else {
534
    pBasic->uidList = taosArrayInit(1, sizeof(int64_t));
11,445,549✔
535
    QUERY_CHECK_NULL(pBasic->uidList, code, lino, _return, terrno)
11,445,549✔
536
  }
537

538
  if (pOrgTbInfo) {
16,240,462✔
539
    code = copyOrgTbInfo(pOrgTbInfo, &pBasic->orgTbInfo);
6,747,915✔
540
    QUERY_CHECK_CODE(code, lino, _return);
6,747,915✔
541
  } else {
542
    pBasic->orgTbInfo = NULL;
9,492,547✔
543
  }
544

545
  if (pTagList) {
16,240,462✔
546
    code = buildTagListForExchangeBasicParam(pBasic, pTagList);
1,853,784✔
547
    QUERY_CHECK_CODE(code, lino, _return);
1,853,784✔
548
  } else {
549
    pBasic->tagList = NULL;
14,386,678✔
550
  }
551

552
  if (pOrgTbInfoArray) {
16,240,462✔
553
    code = buildBatchOrgTbInfoForExchangeBasicParam(pBasic, pOrgTbInfoArray);
4,004,067✔
554
    QUERY_CHECK_CODE(code, lino, _return);
4,004,067✔
555
  } else {
556
    pBasic->batchOrgTbInfo = NULL;
12,236,395✔
557
  }
558
  return code;
16,240,462✔
559

UNCOV
560
_return:
×
UNCOV
561
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
×
UNCOV
562
  freeExchangeGetBasicOperatorParam(pBasic);
×
UNCOV
563
  return code;
×
564
}
565

566
static int32_t buildExchangeOperatorParamImpl(SOperatorParam** ppRes, int32_t downstreamIdx, ENodeType srcOpType,
12,006,920✔
567
                                              EExchangeSourceType exchangeType, int32_t vgId, uint64_t groupId,
568
                                              SArray* pUidList, SOrgTbInfo* pOrgTbInfo, SArray* pTagList,
569
                                              SArray* pOrgTbInfoArray, STimeWindow window,
570
                                              SDownstreamSourceNode* pDownstreamSourceNode,
571
                                              bool tableSeq, bool isNewParam, bool reUse, bool isNewDeployed) {
572

573
  int32_t                      code = TSDB_CODE_SUCCESS;
12,006,920✔
574
  int32_t                      lino = 0;
12,006,920✔
575
  SOperatorParam*              pParam = NULL;
12,006,920✔
576
  SExchangeOperatorParam*      pExc = NULL;
12,006,920✔
577

578
  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
12,006,920✔
579
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)
12,006,920✔
580

581
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
12,006,920✔
582
  pParam->downstreamIdx = downstreamIdx;
12,006,920✔
583
  pParam->reUse = reUse;
12,006,920✔
584
  pParam->pChildren = NULL;
12,006,920✔
585
  pParam->value = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
12,006,920✔
586
  QUERY_CHECK_NULL(pParam->value, code, lino, _return, terrno)
12,006,920✔
587

588
  pExc = (SExchangeOperatorParam*)pParam->value;
12,006,920✔
589
  pExc->multiParams = false;
12,006,920✔
590

591
  code = buildExchangeOperatorBasicParam(&pExc->basic, srcOpType, exchangeType, vgId, groupId,
12,006,920✔
592
                                         pUidList, pOrgTbInfo, pTagList, pOrgTbInfoArray,
593
                                         window, pDownstreamSourceNode, tableSeq, isNewParam, isNewDeployed);
594

595
  *ppRes = pParam;
12,006,920✔
596
  return code;
12,006,920✔
UNCOV
597
_return:
×
UNCOV
598
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
×
UNCOV
599
  if (pParam) {
×
UNCOV
600
    freeOperatorParam(pParam, OP_GET_PARAM);
×
601
  }
UNCOV
602
  return code;
×
603
}
604

605
/*
 * Build an exchange get-param for a super-table join table scan of a single
 * table: the uid at *pUid in vgroup *pVgId.
 *
 * A temporary one-element uid list is created for the call and destroyed
 * before returning, on success and on failure alike.
 */
static int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, const int32_t* pVgId, int64_t* pUid) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  const bool tableSeq = true;
  const bool isNewParam = false;
  const bool reUse = false;
  const bool isNewDeployed = false;

  SArray* pSingleUid = taosArrayInit(1, sizeof(int64_t));
  QUERY_CHECK_NULL(pSingleUid, code, lino, _return, terrno)

  QUERY_CHECK_NULL(taosArrayPush(pSingleUid, pUid), code, lino, _return, terrno);

  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
                                        EX_SRC_TYPE_STB_JOIN_SCAN, *pVgId, 0, pSingleUid, NULL, NULL, NULL,
                                        (STimeWindow){0}, NULL, tableSeq, isNewParam, reUse, isNewDeployed);
  QUERY_CHECK_CODE(code, lino, _return);

_return:
  if (code) {
    qError("failed to build exchange operator param, code:%d", code);
  }
  taosArrayDestroy(pSingleUid);
  return code;
}
625

626
/*
 * Build an exchange get-param for a virtual-table window scan restricted to
 * the time window win. No uid list, table info or tag list is attached;
 * vgId and groupId are passed as 0.
 */
static int32_t buildExchangeOperatorParamForExternalWindow(SOperatorParam** ppRes, int32_t downstreamIdx, STimeWindow win) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  const bool tableSeq = true;
  const bool isNewParam = true;
  const bool reUse = true;
  const bool isNewDeployed = false;

  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
                                        EX_SRC_TYPE_VTB_WIN_SCAN, 0, 0, NULL, NULL, NULL, NULL, win, NULL,
                                        tableSeq, isNewParam, reUse, isNewDeployed);
  QUERY_CHECK_CODE(code, lino, _return);

  return code;

_return:
  qError("failed to build exchange operator param for external window, code:%d", code);
  return code;
}
639

640
/*
 * Build an exchange get-param for a virtual super-table tag scan of a single
 * table uid in vgroup vgId.
 *
 * A temporary one-element uid list is created for the call and destroyed
 * before returning, on success and on failure alike.
 */
static int32_t buildExchangeOperatorParamForVTagScan(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, tb_uid_t uid) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  SArray*    pSingleUid = NULL;
  const bool tableSeq = false;
  const bool isNewParam = false;
  const bool reUse = true;
  const bool isNewDeployed = false;

  pSingleUid = taosArrayInit(1, sizeof(int64_t));
  QUERY_CHECK_NULL(pSingleUid, code, lino, _return, terrno)

  QUERY_CHECK_NULL(taosArrayPush(pSingleUid, &uid), code, lino, _return, terrno)

  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN,
                                        EX_SRC_TYPE_VSTB_TAG_SCAN, vgId, 0, pSingleUid, NULL, NULL, NULL,
                                        (STimeWindow){0}, NULL, tableSeq, isNewParam, reUse, isNewDeployed);
  QUERY_CHECK_CODE(code, lino, _return);

_return:
  if (code) {
    qError("failed to build exchange operator param for tag scan, code:%d", code);
  }
  taosArrayDestroy(pSingleUid);
  return code;
}
661

662
static int32_t buildExchangeOperatorParamForVScan(SOperatorParam** ppRes, int32_t downstreamIdx, SOrgTbInfo* pOrgTbInfo,
6,747,915✔
663
                                                  SDownstreamSourceNode* pNewSource) {
664
  int32_t                      code = TSDB_CODE_SUCCESS;
6,747,915✔
665
  int32_t                      lino = 0;
6,747,915✔
666

667
  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, EX_SRC_TYPE_VSTB_SCAN,
6,747,915✔
668
                                        pOrgTbInfo->vgId, 0, NULL, pOrgTbInfo, NULL, NULL, (STimeWindow){0}, pNewSource, false, true, true, true);
6,747,915✔
669
  QUERY_CHECK_CODE(code, lino, _return);
6,747,915✔
670

671
  return code;
6,747,915✔
UNCOV
672
_return:
×
UNCOV
673
  qError("failed to build exchange operator param for vscan, code:%d", code);
×
UNCOV
674
  return code;
×
675
}
676

677
/*
 * Build a multi-param exchange get-param for a batched super-table join scan.
 *
 * pVg maps a vgroup id to an SArray pointer holding table uids. For each
 * entry a basic param is built and stored in pExc->pBatchs under the vgroup
 * id; the source uid list is then destroyed and its slot in pVg set to NULL.
 *
 * On success *ppRes receives the new param. On failure the param built so
 * far and the basic param not yet stored in the batch hash are released, and
 * *ppRes is left untouched.
 *
 * A failing tSimpleHashPut now takes the same cleanup path as every other
 * failure; it used to return directly and leak both the param and the
 * populated basic param.
 *
 * NOTE(review): when buildExchangeOperatorBasicParam itself fails it has
 * already released basic's content before the cleanup below releases it
 * again; confirm freeExchangeGetBasicOperatorParam tolerates that.
 */
static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
  int32_t                       code = TSDB_CODE_SUCCESS;
  int32_t                       line = 0;
  SOperatorParam*               pParam = NULL;
  SExchangeOperatorBatchParam*  pExc = NULL;
  SExchangeOperatorBasicParam   basic = {0};
  int32_t                       iter = 0;
  void*                         p = NULL;

  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(pParam, code, line, _return, terrno);

  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  pParam->downstreamIdx = downstreamIdx;
  pParam->reUse = false;
  pParam->pChildren = NULL;
  pParam->value = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  QUERY_CHECK_NULL(pParam->value, code, line, _return, terrno);

  pExc = pParam->value;
  pExc->multiParams = true;
  pExc->pBatchs = tSimpleHashInit(tSimpleHashGetSize(pVg), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  QUERY_CHECK_NULL(pExc->pBatchs, code, line, _return, terrno)

  tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);

  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
    int32_t* pVgId = tSimpleHashGetKey(p, NULL);
    SArray*  pUidList = *(SArray**)p;

    code = buildExchangeOperatorBasicParam(&basic, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
                                           EX_SRC_TYPE_STB_JOIN_SCAN, *pVgId, 0,
                                           pUidList, NULL, NULL, NULL,
                                           (STimeWindow){0}, NULL, false, false, false);
    QUERY_CHECK_CODE(code, line, _return);

    code = tSimpleHashPut(pExc->pBatchs, pVgId, sizeof(*pVgId), &basic, sizeof(basic));
    QUERY_CHECK_CODE(code, line, _return);

    // the batch hash holds the copy now; clear the scratch param so cleanup cannot release it
    basic = (SExchangeOperatorBasicParam){0};
    qTrace("build downstreamIdx %d batch scan, vgId:%d, uidNum:%" PRId64, downstreamIdx, *pVgId, (int64_t)taosArrayGetSize(pUidList));

    // already transferred to batch param, can free here
    taosArrayDestroy(pUidList);

    *(SArray**)p = NULL;
  }
  *ppRes = pParam;

  return code;

_return:
  qError("failed to build batch exchange operator param, code:%d", code);
  freeOperatorParam(pParam, OP_GET_PARAM);
  freeExchangeGetBasicOperatorParam(&basic);
  return code;
}
733

734
/*
 * Build a batch (multi-param) exchange operator param for virtual table scans.
 *
 * One SExchangeOperatorBasicParam is built per entry of pBatchMaps (the entry key is
 * read as an int32_t vgId, the entry value as an SArray* of origin-table info) and is
 * stored in the batch hash of the new param under that vgId.
 *
 * On success *ppRes receives the new param. On failure *ppRes is left untouched and
 * the error code is returned.
 *
 * NOTE(review): on the failure paths the param is handed to freeOperatorParam() before
 * pChildren/opType/reUse have been assigned (they are only set after the loop) --
 * confirm that freeOperatorParam() tolerates this.
 */
static int32_t buildBatchExchangeOperatorParamForVirtual(SOperatorParam** ppRes, int32_t downstreamIdx, SArray* pTagList, uint64_t groupid,  SHashObj* pBatchMaps, STimeWindow window, EExchangeSourceType type) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  size_t                       vgKeyLen = 0;
  SExchangeOperatorBasicParam  basic = {0};
  SExchangeOperatorBatchParam* pBatch = NULL;
  SOperatorParam*              pParam = NULL;

  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno);

  pParam->value = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  QUERY_CHECK_NULL(pParam->value, code, lino, _return, terrno);

  pBatch = pParam->value;
  pBatch->multiParams = true;

  pBatch->pBatchs = tSimpleHashInit(taosHashGetSize(pBatchMaps), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  QUERY_CHECK_NULL(pBatch->pBatchs, code, lino, _return, terrno);
  tSimpleHashSetFreeFp(pBatch->pBatchs, freeExchangeGetBasicOperatorParam);

  for (void* pEntry = taosHashIterate(pBatchMaps, NULL); NULL != pEntry; pEntry = taosHashIterate(pBatchMaps, pEntry)) {
    SArray*  pOrgTbInfoArray = *(SArray**)pEntry;
    int32_t* pVgId = (int32_t*)taosHashGetKey(pEntry, &vgKeyLen);

    code = buildExchangeOperatorBasicParam(&basic, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
                                           type, *pVgId, groupid,
                                           NULL, NULL, pTagList, pOrgTbInfoArray,
                                           window, NULL, false, true, false);
    QUERY_CHECK_CODE(code, lino, _return);

    code = tSimpleHashPut(pBatch->pBatchs, pVgId, sizeof(*pVgId), &basic, sizeof(basic));
    QUERY_CHECK_CODE(code, lino, _return);

    // the stored copy now owns the content, reset the scratch value
    basic = (SExchangeOperatorBasicParam){0};
  }

  pParam->reUse = false;
  pParam->downstreamIdx = downstreamIdx;
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  pParam->pChildren = NULL;

  *ppRes = pParam;
  return code;

_return:
  qError("failed to build exchange operator param for vscan, code:%d", code);
  freeOperatorParam(pParam, OP_GET_PARAM);
  freeExchangeGetBasicOperatorParam(&basic);
  return code;
}
787

788
/*
 * Build the GET param of a merge join operator from its two child params.
 *
 * ppChild0 / ppChild1: in/out. Once a child has been pushed into the new param's
 *                      children array the caller's pointer is set to NULL, so the
 *                      caller does not free it again.
 * initParam:           stored as SSortMergeJoinOperatorParam::initDownstream.
 *
 * On success *ppRes holds the new param. On any failure *ppRes is NULL, the partially
 * built param is released through freeOperatorParam() and the error code is returned.
 * downstreamIdx of the new param is not assigned by this function.
 */
static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initParam, SOperatorParam** ppChild0, SOperatorParam** ppChild1) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  SSortMergeJoinOperatorParam* pJoin = NULL;
  SOperatorParam*              pParam = NULL;

  *ppRes = NULL;

  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == pParam) {
    return terrno;
  }

  // Give every field that the release routine may look at a defined value before the
  // first failure point; the memory returned by the allocator is not zeroed.
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
  pParam->value = NULL;
  pParam->reUse = false;

  pParam->pChildren = taosArrayInit(2, POINTER_BYTES);
  if (NULL == pParam->pChildren) {
    code = terrno;
    goto _error;
  }

  if (NULL == taosArrayPush(pParam->pChildren, ppChild0)) {
    code = terrno;
    goto _error;
  }
  *ppChild0 = NULL;

  if (NULL == taosArrayPush(pParam->pChildren, ppChild1)) {
    code = terrno;
    goto _error;
  }
  *ppChild1 = NULL;

  pJoin = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam));
  if (NULL == pJoin) {
    code = terrno;
    goto _error;
  }
  pJoin->initDownstream = initParam;
  pParam->value = pJoin;

  *ppRes = pParam;
  return TSDB_CODE_SUCCESS;

_error:
  freeOperatorParam(pParam, OP_GET_PARAM);
  return code;
}
833

834
/*
 * Build the NOTIFY param of a merge join operator.
 *
 * pChild0 / pChild1 may be NULL; only non-NULL children are pushed into the new
 * param's children array. On failure both children and the partially built param are
 * released here, *ppRes is NULL and the error code is returned.
 */
static int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) {
  int32_t code = TSDB_CODE_SUCCESS;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    code = terrno;
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    return code;
  }

  // Define the fields up front so that the release calls on the failure paths below
  // never see indeterminate values.
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
  (*ppRes)->value = NULL;
  (*ppRes)->reUse = false;

  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
  // The array, not the (already validated) param, is what can be NULL here.
  if (NULL == (*ppRes)->pChildren) {
    code = terrno;
    taosMemoryFreeClear(*ppRes);
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    return code;
  }

  if (pChild0 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    // pChild0 never made it into the array, so it is still owned here
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    *ppRes = NULL;
    return code;
  }

  if (pChild1 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) {
    code = terrno;
    // pChild0 (if any) is part of the children array and goes away with *ppRes
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    *ppRes = NULL;
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
873

874
/*
 * Build a table scan operator param from a vgroup hash that must contain exactly one
 * entry. The entry value is read as an SArray* uid list; after the param has been
 * built the list is destroyed and the hash slot is set to NULL.
 *
 * Returns TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the hash does not hold exactly
 * one vgroup, otherwise the result of buildTableScanOperatorParam().
 * downstreamIdx is not used by this function.
 */
static int32_t buildBatchTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
  int32_t vgNum = tSimpleHashGetSize(pVg);
  if (1 != vgNum) {
    qError("Invalid vgroup num %d to build table scan operator param", vgNum);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  int32_t iter = 0;
  for (void* pEntry = tSimpleHashIterate(pVg, NULL, &iter); NULL != pEntry;
       pEntry = tSimpleHashIterate(pVg, pEntry, &iter)) {
    SArray** ppUidList = (SArray**)pEntry;

    int32_t code = buildTableScanOperatorParam(ppRes, *ppUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false);
    if (0 != code) {
      return code;
    }

    // the uid list is no longer needed once the param has been built
    taosArrayDestroy(*ppUidList);
    *ppUidList = NULL;
  }

  return TSDB_CODE_SUCCESS;
}
897

UNCOV
898
/*
 * Build a table scan operator param for a single table uid.
 *
 * A temporary one-element uid list is created for buildTableScanOperatorParam() and
 * is always destroyed before returning, including when filling it fails.
 * downstreamIdx and pVgId are not used by this function.
 *
 * Returns TSDB_CODE_SUCCESS or the error reported by the failing step.
 */
static int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) {
  int32_t code = TSDB_CODE_SUCCESS;
  SArray* pUidList = taosArrayInit(1, sizeof(int64_t));
  if (NULL == pUidList) {
    return terrno;
  }

  if (NULL == taosArrayPush(pUidList, pUid)) {
    // capture the error before releasing the list, which would otherwise leak
    code = terrno;
    taosArrayDestroy(pUidList);
    return code;
  }

  code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, true);
  taosArrayDestroy(pUidList);

  return code;
}
915

916
/*
 * Build the merge join param tree for the current row (readIdx) of the stb join
 * table list: source params (table scan or exchange) -> group cache params ->
 * merge join param.
 *
 * In batch-fetch mode the source params are only built while pPrev->leftHash is set;
 * after both sides were built successfully the left/right hashes are cleaned up and
 * reset to NULL. In non-batch mode a single-table source param is built for each side.
 *
 * On failure every intermediate param that is still referenced here is released,
 * *ppParam is reset to NULL if it was set, and the error code is returned.
 */
static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, SOperatorParam** ppParam) {
  int32_t         code = TSDB_CODE_SUCCESS;
  int64_t         rowIdx = pPrev->pListHead->readIdx;
  int32_t*        leftVg = pPrev->pListHead->pLeftVg + rowIdx;
  int64_t*        leftUid = pPrev->pListHead->pLeftUid + rowIdx;
  int32_t*        rightVg = pPrev->pListHead->pRightVg + rowIdx;
  int64_t*        rightUid = pPrev->pListHead->pRightUid + rowIdx;
  SOperatorParam* pLeftSrc = NULL;
  SOperatorParam* pRightSrc = NULL;
  SOperatorParam* pLeftGc = NULL;
  SOperatorParam* pRightGc = NULL;

  qDebug("start %" PRId64 ":%" PRId64 "th stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64, 
      rowIdx, pPrev->tableNum, *leftVg, *leftUid, *rightVg, *rightUid);

  QRY_ERR_RET(updatePostJoinCurrTableInfo(&pInfo->stbJoin));

  if (!pInfo->stbJoin.basic.batchFetch) {
    // one table per side
    if (pInfo->stbJoin.basic.srcScan[0]) {
      code = buildSingleTableScanOperatorParam(&pLeftSrc, 0, leftVg, leftUid);
    } else {
      code = buildExchangeOperatorParam(&pLeftSrc, 0, leftVg, leftUid);
    }
    if (TSDB_CODE_SUCCESS == code) {
      if (pInfo->stbJoin.basic.srcScan[1]) {
        code = buildSingleTableScanOperatorParam(&pRightSrc, 1, rightVg, rightUid);
      } else {
        code = buildExchangeOperatorParam(&pRightSrc, 1, rightVg, rightUid);
      }
    }
  } else if (pPrev->leftHash) {
    // batch mode: source params are built from the pending vgroup hashes
    if (pInfo->stbJoin.basic.srcScan[0]) {
      code = buildBatchTableScanOperatorParam(&pLeftSrc, 0, pPrev->leftHash);
    } else {
      code = buildBatchExchangeOperatorParam(&pLeftSrc, 0, pPrev->leftHash);
    }
    if (TSDB_CODE_SUCCESS == code) {
      if (pInfo->stbJoin.basic.srcScan[1]) {
        code = buildBatchTableScanOperatorParam(&pRightSrc, 1, pPrev->rightHash);
      } else {
        code = buildBatchExchangeOperatorParam(&pRightSrc, 1, pPrev->rightHash);
      }
    }
    if (TSDB_CODE_SUCCESS == code) {
      tSimpleHashCleanup(pPrev->leftHash);
      tSimpleHashCleanup(pPrev->rightHash);
      pPrev->leftHash = NULL;
      pPrev->rightHash = NULL;
    }
  }

  // the downstream is (re)initialized only when a left source param exists
  bool initParam = (NULL != pLeftSrc);

  if (TSDB_CODE_SUCCESS == code) {
    code = buildGroupCacheOperatorParam(&pLeftGc, 0, *leftVg, *leftUid, pPost->leftNeedCache, pLeftSrc);
    pLeftSrc = NULL;
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = buildGroupCacheOperatorParam(&pRightGc, 1, *rightVg, *rightUid, pPost->rightNeedCache, pRightSrc);
    pRightSrc = NULL;
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = buildMergeJoinOperatorParam(ppParam, initParam, &pLeftGc, &pRightGc);
  }

  if (TSDB_CODE_SUCCESS == code) {
    return code;
  }

  // failure: release whatever is still referenced from here
  if (pLeftSrc) {
    freeOperatorParam(pLeftSrc, OP_GET_PARAM);
  }
  if (pRightSrc) {
    freeOperatorParam(pRightSrc, OP_GET_PARAM);
  }
  if (pLeftGc) {
    freeOperatorParam(pLeftGc, OP_GET_PARAM);
  }
  if (pRightGc) {
    freeOperatorParam(pRightGc, OP_GET_PARAM);
  }
  if (*ppParam) {
    freeOperatorParam(*ppParam, OP_GET_PARAM);
    *ppParam = NULL;
  }

  return code;
}
986

987
/*
 * Build the operator param of a virtual table scan for the table identified by uid.
 *
 * The param carries the scan window of pInfo->vtbScan and, when
 * pInfo->vtbScan.refColGroups is set, a deep copy of those groups (each group's
 * pSlotIdList is duplicated).
 *
 * On success *ppRes holds the new param. On failure everything allocated here is
 * released, *ppRes is NULL and the error code is returned.
 */
static int32_t buildVtbScanOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, uint64_t uid) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SVTableScanOperatorParam* pVScan = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);

  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno);

  pVScan = taosMemoryMalloc(sizeof(SVTableScanOperatorParam));
  QUERY_CHECK_NULL(pVScan, code, lino, _return, terrno);

  // The cleanup code below inspects pRefColGroups; it must be defined before the
  // first failure point that follows the allocation of pVScan.
  pVScan->pRefColGroups = NULL;

  pVScan->pOpParamArray = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL(pVScan->pOpParamArray, code, lino, _return, terrno);

  pVScan->uid = uid;
  pVScan->window = pInfo->vtbScan.window;

  if (pInfo->vtbScan.refColGroups) {
    int32_t groupNum = (int32_t)taosArrayGetSize(pInfo->vtbScan.refColGroups);

    pVScan->pRefColGroups = taosArrayInit(groupNum, sizeof(SRefColIdGroup));
    QUERY_CHECK_NULL(pVScan->pRefColGroups, code, lino, _return, terrno);

    for (int32_t i = 0; i < groupNum; i++) {
      SRefColIdGroup* pSrc = (SRefColIdGroup*)taosArrayGet(pInfo->vtbScan.refColGroups, i);
      SRefColIdGroup  dst = {0};
      QUERY_CHECK_NULL(pSrc, code, lino, _return, terrno);

      dst.pSlotIdList = taosArrayDup(pSrc->pSlotIdList, NULL);
      QUERY_CHECK_NULL(dst.pSlotIdList, code, lino, _return, terrno);

      void* px = taosArrayPush(pVScan->pRefColGroups, &dst);
      if (NULL == px) {
        // the copy was not taken over by the array, release it here
        taosArrayDestroy(dst.pSlotIdList);
        dst.pSlotIdList = NULL;
      }
      QUERY_CHECK_NULL(px, code, lino, _return, terrno);
    }
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->value = pVScan;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;

_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pVScan) {
    if (pVScan->pRefColGroups) {
      taosArrayDestroyEx(pVScan->pRefColGroups, destroyRefColIdGroup);
      pVScan->pRefColGroups = NULL;
    }
    taosArrayDestroy(pVScan->pOpParamArray);
    taosMemoryFreeClear(pVScan);
  }
  if (*ppRes) {
    taosArrayDestroy((*ppRes)->pChildren);
    taosMemoryFreeClear(*ppRes);
  }
  return code;
}
1045

1046
/*
 * Record colId under the reference name colrefName in refMap.
 *
 * refMap maps a reference name to an SArray* of col_id_t. The list is created on the
 * first use of a name; a colId that is already in the list is not added again.
 * A NULL or empty colrefName is ignored and reported as success.
 *
 * Returns TSDB_CODE_SUCCESS, or the error code of the failing allocation/hash call.
 */
static int32_t addRefColIdToRefMap(SHashObj* refMap, const char* colrefName, col_id_t colId) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  line = 0;
  size_t   nameLen = 0;
  SArray** sameRefColIdList = NULL;

  if (colrefName == NULL || colrefName[0] == '\0') {
    return code;
  }

  nameLen = strlen(colrefName);

  sameRefColIdList = (SArray**)taosHashGet(refMap, colrefName, nameLen);
  if (sameRefColIdList == NULL) {
    SArray* list = taosArrayInit(2, sizeof(col_id_t));
    QUERY_CHECK_NULL(list, code, line, _return, terrno);

    // Keep the result in `code` so that a failure is actually reported to the caller,
    // and release the list that the map did not take over.
    code = taosHashPut(refMap, colrefName, nameLen, &list, POINTER_BYTES);
    if (TSDB_CODE_SUCCESS != code) {
      taosArrayDestroy(list);
      line = __LINE__;
      goto _return;
    }

    sameRefColIdList = (SArray**)taosHashGet(refMap, colrefName, nameLen);
    QUERY_CHECK_NULL(sameRefColIdList, code, line, _return, terrno);
  }

  for (int32_t i = 0; i < taosArrayGetSize(*sameRefColIdList); i++) {
    col_id_t existing = *(col_id_t*)taosArrayGet(*sameRefColIdList, i);
    if (existing == colId) {
      return code;
    }
  }
  QUERY_CHECK_NULL(taosArrayPush(*sameRefColIdList, &colId), code, line, _return, terrno);
  return code;

_return:
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  return code;
}
1077

1078
/*
 * Convert refMap (reference name -> SArray* of col_id_t) into an array of
 * SRefColIdGroup, one group per reference name shared by more than one column that
 * is actually read. Each group holds the slot ids (index in readColList) of its
 * columns.
 *
 * refMap:      consumed by this function: its colId lists are destroyed while
 *              iterating and the map itself is cleaned up before returning. Nothing is
 *              consumed when refMap or readColList is NULL (early return, *ppGroups
 *              untouched).
 * readColList: array of col_id_t; the position of a column is its slot id.
 * ppGroups:    in/out. A previous value is destroyed first. On return it holds the
 *              new group array, or NULL when there is no group or an error occurred.
 *
 * NOTE(review): when the iteration is left early because of an error, the colId lists
 * of the entries that were not visited yet are not destroyed here -- confirm whether
 * refMap owns them in that case.
 */
static int32_t buildRefSlotGroupsFromRefMap(SHashObj* refMap, SArray* readColList, SArray** ppGroups) {
  int32_t   code = TSDB_CODE_SUCCESS;
  int32_t   line = 0;
  SArray*   groups = NULL;
  SHashObj* colIdToSlot = NULL;

  if (refMap == NULL || readColList == NULL) {
    return code;
  }

  if (*ppGroups) {
    taosArrayDestroyEx(*ppGroups, destroyRefColIdGroup);
    *ppGroups = NULL;
  }

  colIdToSlot = taosHashInit(taosArrayGetSize(readColList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false,
                             HASH_NO_LOCK);
  QUERY_CHECK_NULL(colIdToSlot, code, line, _return, terrno);

  // Build a quick colId -> slotId lookup for columns actually read.
  for (int32_t i = 0; i < taosArrayGetSize(readColList); i++) {
    col_id_t colId = *(col_id_t*)taosArrayGet(readColList, i);
    int32_t  slotId = i;
    code = taosHashPut(colIdToSlot, &colId, sizeof(colId), &slotId, sizeof(slotId));
    QUERY_CHECK_CODE(code, line, _return);
  }

  groups = taosArrayInit(1, sizeof(SRefColIdGroup));
  QUERY_CHECK_NULL(groups, code, line, _return, terrno);

  // Group columns that share the same ref name into slotId lists.
  void* pIter = taosHashIterate(refMap, NULL);
  while (pIter != NULL) {
    SArray* pList = *(SArray**)pIter;  // colId list
    if (pList && taosArrayGetSize(pList) > 1) {
      SArray* slotList = taosArrayInit(taosArrayGetSize(pList), sizeof(int32_t));
      QUERY_CHECK_NULL(slotList, code, line, _return, terrno);

      for (int32_t i = 0; i < taosArrayGetSize(pList); i++) {
        col_id_t colId = *(col_id_t*)taosArrayGet(pList, i);
        int32_t* slotId = taosHashGet(colIdToSlot, &colId, sizeof(colId));
        if (slotId && NULL == taosArrayPush(slotList, slotId)) {
          // slotList is not reachable from anywhere else yet, release it here
          code = terrno;
          line = __LINE__;
          taosArrayDestroy(slotList);
          goto _return;
        }
      }

      if (taosArrayGetSize(slotList) > 1) {
        SRefColIdGroup g = {.pSlotIdList = slotList};
        if (NULL == taosArrayPush(groups, &g)) {
          // the group was not stored, so its slot list is still owned here
          code = terrno;
          line = __LINE__;
          taosArrayDestroy(slotList);
          goto _return;
        }
      } else {
        taosArrayDestroy(slotList);
      }
    }
    if (pList) {
      taosArrayDestroy(pList);
    }
    pIter = taosHashIterate(refMap, pIter);
  }

  if (taosArrayGetSize(groups) == 0) {
    taosArrayDestroy(groups);
    groups = NULL;
  }

_return:
  if (code) {
    qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
    if (groups) {
      taosArrayDestroyEx(groups, destroyRefColIdGroup);
      // never hand a destroyed array back through *ppGroups
      groups = NULL;
    }
  }
  if (refMap) {
    taosHashCleanup(refMap);
  }
  if (colIdToSlot) {
    taosHashCleanup(colIdToSlot);
  }
  *ppGroups = groups;
  return code;
}
1156

1157
/*
 * Tell whether column colId has to be scanned by the virtual table scan controlled by
 * pOperator.
 *
 * Returns true when all columns are scanned, otherwise true only if colId is found in
 * readColSet (when that set exists) or, failing that, in readColList.
 */
bool colNeedScan(SOperatorInfo* pOperator, col_id_t colId) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SVtbScanDynCtrlInfo*       pScan = (SVtbScanDynCtrlInfo*)&pCtrl->vtbScan;

  if (pScan->scanAllCols) {
    return true;
  }

  // prefer the hash set when it is available, fall back to a linear search otherwise
  if (pScan->readColSet) {
    return NULL != taosHashGet(pScan->readColSet, &colId, sizeof(colId));
  }

  int32_t idx = 0;
  while (idx < taosArrayGetSize(pScan->readColList)) {
    col_id_t* pCandidate = (col_id_t*)taosArrayGet(pScan->readColList, idx);
    if (*pCandidate == colId) {
      return true;
    }
    ++idx;
  }

  return false;
}
1177

1178
/*
 * Build the operator param of an external window operator.
 *
 * pWins: array of SExtWinTimeWindow; it is duplicated into the new param. The time
 *        range passed to the child exchange param spans from the skey of the first
 *        window to the ekey of the last one, so the array must not be empty.
 * idx:   stored as downstreamIdx of the new param.
 * pInfo is not used by this function.
 *
 * On success *ppRes holds the new param with one exchange child. On failure everything
 * built here is released, *ppRes is NULL and the error code is returned.
 */
static int32_t buildExternalWindowOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins, int32_t idx) {
  int32_t                       code = TSDB_CODE_SUCCESS;
  int32_t                       lino = 0;
  SExternalWindowOperatorParam* pExtWinOp = NULL;
  SOperatorParam*               pExchangeOperator = NULL;
  SExtWinTimeWindow*            firstWin = NULL;
  SExtWinTimeWindow*            lastWin = NULL;
  STimeWindow                   twin = {0};

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
  // the cleanup code inspects pChildren, so it must be defined from the start
  (*ppRes)->pChildren = NULL;

  pExtWinOp = taosMemoryMalloc(sizeof(SExternalWindowOperatorParam));
  QUERY_CHECK_NULL(pExtWinOp, code, lino, _return, terrno);

  pExtWinOp->ExtWins = taosArrayDup(pWins, NULL);
  QUERY_CHECK_NULL(pExtWinOp->ExtWins, code, lino, _return, terrno);

  // an empty window list cannot provide a time range
  firstWin = (SExtWinTimeWindow*)taosArrayGet(pWins, 0);
  QUERY_CHECK_NULL(firstWin, code, lino, _return, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  lastWin = (SExtWinTimeWindow*)taosArrayGet(pWins, taosArrayGetSize(pWins) - 1);
  QUERY_CHECK_NULL(lastWin, code, lino, _return, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno);

  twin.skey = firstWin->tw.skey;
  twin.ekey = lastWin->tw.ekey;
  code = buildExchangeOperatorParamForExternalWindow(&pExchangeOperator, 0, twin);
  QUERY_CHECK_CODE(code, lino, _return);

  if (NULL == taosArrayPush((*ppRes)->pChildren, &pExchangeOperator)) {
    // the exchange param was not stored anywhere, release it before bailing out
    code = terrno;
    lino = __LINE__;
    freeOperatorParam(pExchangeOperator, OP_GET_PARAM);
    goto _return;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW;
  (*ppRes)->downstreamIdx = idx;
  (*ppRes)->value = pExtWinOp;
  (*ppRes)->reUse = false;

  return code;

_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pExtWinOp) {
    if (pExtWinOp->ExtWins) {
      taosArrayDestroy(pExtWinOp->ExtWins);
    }
    taosMemoryFree(pExtWinOp);
  }
  if (*ppRes) {
    if ((*ppRes)->pChildren) {
      taosArrayDestroy((*ppRes)->pChildren);
    }
    taosMemoryFree(*ppRes);
    *ppRes = NULL;
  }
  return code;
}
1227

1228
/*
 * Release one external window child param as produced by
 * buildExternalWindowOperatorParam(): its window copy, its value, the exchange params
 * stored in its children array, and the param itself.
 */
static void freeExtWinChildOperatorParam(SOperatorParam* pChildParam) {
  if (NULL == pChildParam) {
    return;
  }

  SExternalWindowOperatorParam* pExtWinOp = (SExternalWindowOperatorParam*)pChildParam->value;
  if (pExtWinOp) {
    if (pExtWinOp->ExtWins) {
      taosArrayDestroy(pExtWinOp->ExtWins);
    }
    taosMemoryFree(pExtWinOp);
  }

  if (pChildParam->pChildren) {
    for (int32_t i = 0; i < taosArrayGetSize(pChildParam->pChildren); i++) {
      // the array stores pointers, so each slot has to be dereferenced
      SOperatorParam* pExchange = *(SOperatorParam**)taosArrayGet(pChildParam->pChildren, i);
      freeOperatorParam(pExchange, OP_GET_PARAM);
    }
    taosArrayDestroy(pChildParam->pChildren);
  }

  taosMemoryFree(pChildParam);
}

/*
 * Build the operator param of a merge operator with one external window child param
 * per downstream.
 *
 * pWins:           window list handed to every external window child.
 * numOfDownstream: number of children to build.
 * numOfWins:       stored as SMergeOperatorParam::winNum.
 *
 * On success *ppRes holds the new param. On failure all children built so far and the
 * param itself are released, *ppRes is NULL and the error code is returned.
 */
static int32_t buildMergeOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins,
                                       int32_t numOfDownstream, int32_t numOfWins) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SMergeOperatorParam*      pMergeOp = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);

  (*ppRes)->pChildren = taosArrayInit(numOfDownstream, POINTER_BYTES);
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno);

  pMergeOp = taosMemoryMalloc(sizeof(SMergeOperatorParam));
  QUERY_CHECK_NULL(pMergeOp, code, lino, _return, terrno);

  pMergeOp->winNum = numOfWins;

  for (int32_t i = 0; i < numOfDownstream; i++) {
    SOperatorParam* pExternalWinParam = NULL;
    code = buildExternalWindowOperatorParam(pInfo, &pExternalWinParam, pWins, i);
    QUERY_CHECK_CODE(code, lino, _return);

    if (NULL == taosArrayPush((*ppRes)->pChildren, &pExternalWinParam)) {
      // this child is not part of the array, so the cleanup below would miss it
      code = terrno;
      lino = __LINE__;
      freeExtWinChildOperatorParam(pExternalWinParam);
      goto _return;
    }
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->value = pMergeOp;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;

_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pMergeOp) {
    taosMemoryFree(pMergeOp);
  }
  if (*ppRes) {
    if ((*ppRes)->pChildren) {
      for (int32_t i = 0; i < taosArrayGetSize((*ppRes)->pChildren); i++) {
        // taosArrayGet() yields the address of the slot; the slot holds the child pointer
        SOperatorParam** ppChildParam = (SOperatorParam**)taosArrayGet((*ppRes)->pChildren, i);
        if (ppChildParam) {
          freeExtWinChildOperatorParam(*ppChildParam);
        }
      }
      taosArrayDestroy((*ppRes)->pChildren);
    }
    taosMemoryFree(*ppRes);
    *ppRes = NULL;
  }
  return code;
}
1285

1286
/*
 * Build the operator param of a hash agg operator whose single child is a batch
 * exchange param covering pInfo->vtbScan.otbVgIdToOtbInfoArrayMap.
 *
 * On success *ppRes holds the new param. On failure the exchange param (if it was
 * built but not yet attached) and the agg param are released, *ppRes is left
 * untouched and the error code is returned.
 *
 * NOTE(review): the SAggOperatorParam stored in `value` is allocated but none of its
 * members is assigned here -- confirm that consumers do not read it.
 */
static int32_t buildAggOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SOperatorParam*           pParam = NULL;
  SOperatorParam*           pExchangeParam = NULL;
  SVtbScanDynCtrlInfo*      pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  bool                      freeExchange = false;

  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno);

  // Define every field before the first failure point: the allocator does not zero
  // the memory and the failure path passes pParam to freeOperatorParam().
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
  pParam->downstreamIdx = 0;
  pParam->reUse = false;
  pParam->value = NULL;

  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno);

  pParam->value = taosMemoryMalloc(sizeof(SAggOperatorParam));
  QUERY_CHECK_NULL(pParam->value, code, lino, _return, terrno);

  code = buildBatchExchangeOperatorParamForVirtual(&pExchangeParam, 0, NULL, 0, pVtbScan->otbVgIdToOtbInfoArrayMap, (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN}, EX_SRC_TYPE_VSTB_AGG_SCAN);
  QUERY_CHECK_CODE(code, lino, _return);

  // until the push succeeds the exchange param is owned by this function
  freeExchange = true;

  QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pExchangeParam), code, lino, _return, terrno);

  freeExchange = false;

  *ppRes = pParam;

  return code;

_return:
  if (freeExchange) {
    freeOperatorParam(pExchangeParam, OP_GET_PARAM);
  }
  if (pParam) {
    freeOperatorParam(pParam, OP_GET_PARAM);
  }
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  return code;
}
1329

1330
/*
 * Build the operator param of a hash agg operator for one group id.
 *
 * The vgId map of the group is looked up in pInfo->vtbScan.vtbGroupIdToVgIdMapMap.
 * When the group is unknown, *ppRes is set to NULL and success is returned. Otherwise
 * the new param gets a single batch exchange child built from that map.
 *
 * On failure the exchange param (if built but not yet attached) and the agg param are
 * released and the error code is returned.
 */
static int32_t buildAggOperatorParamWithGroupId(SDynQueryCtrlOperatorInfo* pInfo, uint64_t groupid, SOperatorParam** ppRes) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SVtbScanDynCtrlInfo*      pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SOperatorParam*           pParam = NULL;
  SOperatorParam*           pExchangeParam = NULL;
  SHashObj*                 otbVgIdToOtbInfoArrayMap = NULL;
  bool                      freeExchange = false;
  void*                     pIter = taosHashGet(pVtbScan->vtbGroupIdToVgIdMapMap, &groupid, sizeof(groupid));

  if (!pIter) {
    *ppRes = NULL;
    return code;
  }

  otbVgIdToOtbInfoArrayMap = *(SHashObj**)pIter;

  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno);

  // Define every field before the first failure point: the allocator does not zero
  // the memory and the failure path passes pParam to freeOperatorParam().
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
  pParam->downstreamIdx = 0;
  pParam->value = NULL;
  pParam->reUse = false;

  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno);

  code = buildBatchExchangeOperatorParamForVirtual(&pExchangeParam, 0, NULL, groupid, otbVgIdToOtbInfoArrayMap, (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN}, EX_SRC_TYPE_VSTB_AGG_SCAN);
  QUERY_CHECK_CODE(code, lino, _return);

  // until the push succeeds the exchange param is owned by this function
  freeExchange = true;

  QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pExchangeParam), code, lino, _return, terrno);

  freeExchange = false;

  *ppRes = pParam;

  return code;

_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (freeExchange) {
    freeOperatorParam(pExchangeParam, OP_GET_PARAM);
  }
  if (pParam) {
    freeOperatorParam(pParam, OP_GET_PARAM);
  }
  return code;
}
1380

1381
/*
 * Build the batch exchange param used to aggregate a single child table.
 *
 * The vgId map of the table is looked up by uid in pInfo->vtbScan.vtbUidToVgIdMapMap.
 * When the uid is unknown, *ppRes is set to NULL and success is returned. Otherwise
 * *ppRes receives the param built by buildBatchExchangeOperatorParamForVirtual().
 *
 * On failure the error code of the builder is returned and *ppRes is left untouched.
 */
static int32_t buildAggOperatorParamForSingleChild(SDynQueryCtrlOperatorInfo* pInfo, tb_uid_t uid, uint64_t groupid, SArray* pTagList, SOperatorParam** ppRes) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SVtbScanDynCtrlInfo*      pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SOperatorParam*           pParam = NULL;
  SHashObj*                 pOtbVgIdToOtbInfoArrayMap = NULL;
  void*                     pIter = taosHashGet(pVtbScan->vtbUidToVgIdMapMap, &uid, sizeof(uid));

  if (NULL == pIter) {
    *ppRes = NULL;
    return code;
  }

  // reuse the entry that was just looked up instead of searching the map again
  pOtbVgIdToOtbInfoArrayMap = *(SHashObj**)pIter;

  code = buildBatchExchangeOperatorParamForVirtual(&pParam, 0, pTagList, groupid, pOtbVgIdToOtbInfoArrayMap, (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN}, EX_SRC_TYPE_VSTB_AGG_SCAN);
  QUERY_CHECK_CODE(code, lino, _return);

  *ppRes = pParam;

  return code;

_return:
  if (pParam) {
    freeOperatorParam(pParam, OP_GET_PARAM);
  }
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  return code;
}
1408

1409
/*
 * Start the post-join retrieval for the current table pair.
 *
 * Builds the sequential stable-join param and passes it to downstream[1]
 * through getNextExtFn. A returned block is validated with blockDataCheck()
 * and accounted in the post-join execution statistics.
 *
 * Any failure (param build, downstream call, block validation) is stored in
 * pTaskInfo->code and raised with T_LONG_JMP; the function does not return an
 * error code itself. *ppRes is whatever the downstream call produced.
 */
static void seqJoinLaunchNewRetrieveImpl(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
  SStbJoinPostJoinCtx*       pPost = &pStbJoin->ctx.post;
  SOperatorParam*            pParam = NULL;
  int32_t                    code = buildSeqStbJoinOperatorParam(pInfo, pPrev, pPost, &pParam);
  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  qDebug("%s dynamic post task begin", GET_TASKID(pOperator->pTaskInfo));
  code = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam, ppRes);
  if (TSDB_CODE_SUCCESS != code) {
    // Do not report a downstream failure as an empty result; raise it like
    // every other error in this function.
    qError("%s dynamic post task failed, error:%s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  if (NULL == *ppRes) {
    qDebug("%s Empty join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
    return;
  }

  code = blockDataCheck(*ppRes);
  if (code) {
    qError("Invalid block data, blockDataCheck failed, error:%s", tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  pPost->isStarted = true;
  pStbJoin->execInfo.postBlkNum++;
  pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows;
  qDebug("%s join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
}
3,627,526✔
1438

1439

1440
/*
 * Tell downstream[1] that the cached data of one table is no longer needed.
 *
 * leftTable selects which side of the current pair is reported: the left side
 * uses downstream id 0 with pPost->leftVgId / leftCurrUid, the right side uses
 * downstream id 1 with pPost->rightVgId / rightCurrUid. A group-cache notify
 * param is wrapped in a merge-join notify param and delivered with
 * optrDefaultNotifyFn().
 *
 * Returns the first error encountered, otherwise the notify function's result.
 */
static int32_t notifySeqJoinTableCacheEnd(SOperatorInfo* pOperator, SStbJoinPostJoinCtx* pPost, bool leftTable) {
  SOperatorParam* pGcParam = NULL;
  SOperatorParam* pMergeJoinParam = NULL;
  int32_t         downstreamId = leftTable ? 0 : 1;
  int32_t         vgId = leftTable ? pPost->leftVgId : pPost->rightVgId;
  int64_t         uid = leftTable ? pPost->leftCurrUid : pPost->rightCurrUid;

  // uid is a signed 64-bit value, so it must be printed with PRId64.
  qDebug("notify table %" PRId64 " in vgId %d downstreamId %d cache end", uid, vgId, downstreamId);

  int32_t code = buildGroupCacheNotifyOperatorParam(&pGcParam, downstreamId, vgId, uid);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  // NOTE(review): pGcParam is not released here when the call below fails;
  // confirm that buildMergeJoinNotifyOperatorParam() takes ownership of its
  // child params on failure.
  code = buildMergeJoinNotifyOperatorParam(&pMergeJoinParam, pGcParam, NULL);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  return optrDefaultNotifyFn(pOperator->pDownstream[1], pMergeJoinParam);
}
1460

1461
/*
 * Bookkeeping performed when the retrieval for the current table pair ends.
 *
 * Always clears pPost->isStarted. In batch-fetch mode nothing else is done.
 * Otherwise:
 *   - left side:  when the left table is cached, its counter in leftCache is
 *                 decremented; once it reaches zero the entry is removed and
 *                 the downstream is notified that the cache can be dropped;
 *   - right side: when the right table is NOT flagged as needing cache but an
 *                 entry for it exists in rightCache, that entry is removed and
 *                 the downstream is notified.
 *
 * Returns TSDB_CODE_SUCCESS, or the error from hash removal / notification.
 */
static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCtrlInfo* pStbJoin) {
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
  int32_t              code = 0;

  pPost->isStarted = false;

  if (pStbJoin->basic.batchFetch) {
    return TSDB_CODE_SUCCESS;
  }

  if (pPost->leftNeedCache) {
    uint32_t* pRefNum = tSimpleHashGet(pPrev->leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
    if (pRefNum && --(*pRefNum) <= 0) {
      code = tSimpleHashRemove(pPrev->leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
      if (code) {
        qError("tSimpleHashRemove leftCurrUid %" PRId64 " from leftCache failed, error:%s", pPost->leftCurrUid, tstrerror(code));
        QRY_ERR_RET(code);
      }
      QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, true));
    }
  }

  if (pPost->rightNeedCache) {
    return TSDB_CODE_SUCCESS;
  }

  void* pCached = tSimpleHashGet(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
  if (NULL == pCached) {
    return TSDB_CODE_SUCCESS;
  }

  code = tSimpleHashRemove(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
  if (code) {
    qError("tSimpleHashRemove rightCurrUid %" PRId64 " from rightCache failed, error:%s", pPost->rightCurrUid, tstrerror(code));
    QRY_ERR_RET(code);
  }
  QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, false));

  return TSDB_CODE_SUCCESS;
}
1497

1498

1499
/*
 * Continue pulling blocks for a post-join retrieval that is already running.
 *
 * Does nothing when no retrieval has been started. Otherwise fetches the next
 * block from downstream[1]:
 *   - a block:  post-join statistics are updated and the block is returned
 *               through *ppRes;
 *   - no block: the current retrieval is finalized and the read index of the
 *               head table list advances to the next table pair.
 */
static FORCE_INLINE int32_t seqJoinContinueCurrRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SStbJoinDynCtrlInfo*       pJoin = &pCtrl->stbJoin;

  if (!pJoin->ctx.post.isStarted) {
    return TSDB_CODE_SUCCESS;
  }

  qDebug("%s dynQueryCtrl continue to retrieve block from post op", GET_TASKID(pOperator->pTaskInfo));

  *ppRes = getNextBlockFromDownstream(pOperator, 1);
  if (NULL != *ppRes) {
    pJoin->execInfo.postBlkNum++;
    pJoin->execInfo.postBlkRows += (*ppRes)->info.rows;
    return TSDB_CODE_SUCCESS;
  }

  QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, pJoin));
  pJoin->ctx.prev.pListHead->readIdx++;

  return TSDB_CODE_SUCCESS;
}
1521

1522
/*
 * Append pVal to the array stored under pKey in pHash, creating the array
 * (initial capacity 10, element size valSize) on first use of the key.
 *
 * The hash stores a pointer to the array (POINTER_BYTES per entry).
 * Returns TSDB_CODE_SUCCESS, terrno when array creation/append fails, or the
 * code returned by tSimpleHashPut() when the insertion fails. A newly created
 * array is destroyed if it cannot be stored.
 */
static FORCE_INLINE int32_t addToJoinVgroupHash(SSHashObj* pHash, void* pKey, int32_t keySize, void* pVal, int32_t valSize) {
  int32_t  code = TSDB_CODE_SUCCESS;
  SArray** ppArray = tSimpleHashGet(pHash, pKey, keySize);

  if (NULL != ppArray) {
    if (NULL == taosArrayPush(*ppArray, pVal)) {
      return terrno;
    }
    return TSDB_CODE_SUCCESS;
  }

  SArray* pArray = taosArrayInit(10, valSize);
  if (NULL == pArray) {
    return terrno;
  }

  if (NULL == taosArrayPush(pArray, pVal)) {
    code = terrno;  // capture before the destroy call can touch terrno
    taosArrayDestroy(pArray);
    return code;
  }

  // Report the put's own status, as addToJoinTableHash() does, rather than
  // relying on terrno having been set.
  code = tSimpleHashPut(pHash, pKey, keySize, &pArray, POINTER_BYTES);
  if (code) {
    taosArrayDestroy(pArray);
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
1546

1547
/*
 * Count one more reference to the table identified by pKey.
 *
 * First sighting: the key is stored in pHash with counter 1 and also recorded
 * in pOnceHash (keys referenced exactly once so far).
 * Later sightings, depending on the stored counter:
 *   - 0:           left untouched;
 *   - UINT32_MAX:  reset to 0;
 *   - otherwise:   incremented; when the counter was 1 the key is first
 *                  removed from pOnceHash.
 *
 * Returns TSDB_CODE_SUCCESS or the failing hash operation's error code.
 */
static FORCE_INLINE int32_t addToJoinTableHash(SSHashObj* pHash, SSHashObj* pOnceHash, void* pKey, int32_t keySize) {
  int32_t   code = TSDB_CODE_SUCCESS;
  uint32_t* pCount = tSimpleHashGet(pHash, pKey, keySize);

  if (NULL == pCount) {
    uint32_t first = 1;
    code = tSimpleHashPut(pHash, pKey, keySize, &first, sizeof(first));
    if (code) {
      return code;
    }
    code = tSimpleHashPut(pOnceHash, pKey, keySize, NULL, 0);
    if (code) {
      return code;
    }
    return TSDB_CODE_SUCCESS;
  }

  if (0 == *pCount) {
    return TSDB_CODE_SUCCESS;
  }

  if (UINT32_MAX == *pCount) {
    *pCount = 0;
    return TSDB_CODE_SUCCESS;
  }

  if (1 == (*pCount)) {
    // Second reference: the table no longer belongs to the "seen once" set.
    code = tSimpleHashRemove(pOnceHash, pKey, keySize);
    if (code) {
      qError("tSimpleHashRemove failed in addToJoinTableHash, error:%s", tstrerror(code));
      QRY_ERR_RET(code);
    }
  }
  (*pCount)++;

  return TSDB_CODE_SUCCESS;
}
1583

1584

1585
/*
 * Release one table-list node: its four vgroup/uid arrays and the node itself.
 * A NULL node is ignored. The node's pNext link is not followed.
 */
static void freeStbJoinTableList(SStbJoinTableList* pList) {
  if (NULL != pList) {
    taosMemoryFree(pList->pLeftVg);
    taosMemoryFree(pList->pLeftUid);
    taosMemoryFree(pList->pRightVg);
    taosMemoryFree(pList->pRightUid);
    taosMemoryFree(pList);
  }
}
1595

1596
/*
 * Copy `rows` (vgId, uid) pairs for both join sides into a new list node and
 * append that node to the tail of pCtx's table list.
 *
 * The node owns private copies of the four input arrays; its read index
 * starts at 0 and uidNum is set to rows. Returns terrno when any allocation
 * fails, in which case nothing is appended and the partial node is freed.
 */
static int32_t appendStbJoinTableList(SStbJoinPrevJoinCtx* pCtx, int64_t rows, int32_t* pLeftVg, int64_t* pLeftUid, int32_t* pRightVg, int64_t* pRightUid) {
  int32_t            code = TSDB_CODE_SUCCESS;
  SStbJoinTableList* pNode = taosMemoryCalloc(1, sizeof(SStbJoinTableList));
  if (NULL == pNode) {
    return terrno;
  }

  pNode->pLeftVg = taosMemoryMalloc(rows * sizeof(*pLeftVg));
  if (NULL == pNode->pLeftVg) {
    code = terrno;
    goto _fail;
  }
  pNode->pLeftUid = taosMemoryMalloc(rows * sizeof(*pLeftUid));
  if (NULL == pNode->pLeftUid) {
    code = terrno;
    goto _fail;
  }
  pNode->pRightVg = taosMemoryMalloc(rows * sizeof(*pRightVg));
  if (NULL == pNode->pRightVg) {
    code = terrno;
    goto _fail;
  }
  pNode->pRightUid = taosMemoryMalloc(rows * sizeof(*pRightUid));
  if (NULL == pNode->pRightUid) {
    code = terrno;
    goto _fail;
  }

  TAOS_MEMCPY(pNode->pLeftVg, pLeftVg, rows * sizeof(*pLeftVg));
  TAOS_MEMCPY(pNode->pLeftUid, pLeftUid, rows * sizeof(*pLeftUid));
  TAOS_MEMCPY(pNode->pRightVg, pRightVg, rows * sizeof(*pRightVg));
  TAOS_MEMCPY(pNode->pRightUid, pRightUid, rows * sizeof(*pRightUid));

  pNode->readIdx = 0;
  pNode->uidNum = rows;
  pNode->pNext = NULL;

  if (NULL == pCtx->pListTail) {
    pCtx->pListHead = pNode;
  } else {
    pCtx->pListTail->pNext = pNode;
  }
  pCtx->pListTail = pNode;

  return TSDB_CODE_SUCCESS;

_fail:
  // The node was calloc'ed, so arrays not yet allocated are still NULL.
  freeStbJoinTableList(pNode);
  return code;
}
1646

1647
/*
 * Digest one block of (vgId, uid) pairs produced by downstream[0].
 *
 * The vgroup and uid columns of both join sides are located through the slot
 * ids in pStbJoin->basic. Then, per row:
 *   - batch-fetch mode: the left and right uid are appended to the per-vgroup
 *     arrays in leftHash / rightHash;
 *   - otherwise:        the left uid is counted in leftCache / onceTable.
 * When every row succeeded the raw columns are appended to the table list and
 * tableNum is increased by the block's row count.
 *
 * Any failure is stored in pTaskInfo->code and raised with T_LONG_JMP.
 */
static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
  SColumnInfoData*           pVg0 = NULL;
  SColumnInfoData*           pVg1 = NULL;
  SColumnInfoData*           pUid0 = NULL;
  SColumnInfoData*           pUid1 = NULL;

  pVg0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[0]);
  if (NULL == pVg0) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  pVg1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[1]);
  if (NULL == pVg1) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  pUid0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[0]);
  if (NULL == pUid0) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  pUid1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[1]);
  if (NULL == pUid1) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  if (pStbJoin->basic.batchFetch) {
    for (int32_t row = 0; row < pBlock->info.rows; ++row) {
      int32_t* leftVg = (int32_t*)(pVg0->pData + pVg0->info.bytes * row);
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * row);
      int32_t* rightVg = (int32_t*)(pVg1->pData + pVg1->info.bytes * row);
      int64_t* rightUid = (int64_t*)(pUid1->pData + pUid1->info.bytes * row);

      code = addToJoinVgroupHash(pPrev->leftHash, leftVg, sizeof(*leftVg), leftUid, sizeof(*leftUid));
      if (TSDB_CODE_SUCCESS == code) {
        code = addToJoinVgroupHash(pPrev->rightHash, rightVg, sizeof(*rightVg), rightUid, sizeof(*rightUid));
      }
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
    }
  } else {
    for (int32_t row = 0; row < pBlock->info.rows; ++row) {
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * row);

      code = addToJoinTableHash(pPrev->leftCache, pPrev->onceTable, leftUid, sizeof(*leftUid));
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = appendStbJoinTableList(pPrev, pBlock->info.rows, (int32_t*)pVg0->pData, (int64_t*)pUid0->pData, (int32_t*)pVg1->pData, (int64_t*)pUid1->pData);
    if (TSDB_CODE_SUCCESS == code) {
      pPrev->tableNum += pBlock->info.rows;
    }
  }

_return:

  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }
}
71,422✔
1709

1710

1711
/*
 * Prune the left-table cache once all table pairs have been collected.
 *
 * Skipped entirely in batch-fetch mode. When leftCache and onceTable have the
 * same size, leftCache is simply cleared. Otherwise every key found in
 * onceTable is removed from leftCache (a failed removal is logged and the
 * loop goes on), and the remaining size is stored in execInfo.leftCacheNum.
 */
static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;

  if (pStbJoin->basic.batchFetch) {
    return;
  }

  if (tSimpleHashGetSize(pPrev->leftCache) == tSimpleHashGetSize(pPrev->onceTable)) {
    tSimpleHashClear(pPrev->leftCache);
    return;
  }

  int32_t   iter = 0;
  int32_t   code = 0;
  uint64_t* pUid = tSimpleHashIterate(pPrev->onceTable, NULL, &iter);
  while (NULL != pUid) {
    code = tSimpleHashRemove(pPrev->leftCache, pUid, sizeof(*pUid));
    if (code) {
      qError("tSimpleHashRemove failed in postProcessStbJoinTableHash, error:%s", tstrerror(code));
    }
    pUid = tSimpleHashIterate(pPrev->onceTable, pUid, &iter);
  }

  pStbJoin->execInfo.leftCacheNum = tSimpleHashGetSize(pPrev->leftCache);
  qDebug("more than 1 ref build table num %" PRId64, (int64_t)tSimpleHashGetSize(pPrev->leftCache));
}
1746

1747
/*
 * Drain downstream[0] and build the complete list of table pairs to join.
 *
 * Each block is counted in the pre-join statistics and handed to
 * doBuildStbJoinTableHash(). After the last block the collected hashes are
 * post-processed and ctx.prev.joinBuild is set.
 */
static void buildStbJoinTableList(SOperatorInfo* pOperator) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SSDataBlock*               pBlock = NULL;

  while (NULL != (pBlock = getNextBlockFromDownstream(pOperator, 0))) {
    pStbJoin->execInfo.prevBlkNum++;
    pStbJoin->execInfo.prevBlkRows += pBlock->info.rows;

    doBuildStbJoinTableHash(pOperator, pBlock);
  }

  postProcessStbJoinTableHash(pOperator);

  pStbJoin->ctx.prev.joinBuild = true;
}
1,131,494✔
1767

1768
/*
 * Walk the pending table pairs until one of them yields a result block.
 *
 * Exhausted list nodes (readIdx >= uidNum) are unlinked from the head and
 * freed. For the current pair a new post-join retrieval is launched:
 *   - a block comes back: return it through *ppRes;
 *   - nothing comes back: finalize the pair and advance the head's read index.
 * When the list is empty *ppRes is set to NULL and the operator is marked
 * completed.
 */
static int32_t seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
  SStbJoinTableList*         pCurr = pPrev->pListHead;

  while (NULL != pCurr) {
    if (pCurr->readIdx < pCurr->uidNum) {
      seqJoinLaunchNewRetrieveImpl(pOperator, ppRes);
      if (*ppRes) {
        return TSDB_CODE_SUCCESS;
      }

      QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, pStbJoin));
      pPrev->pListHead->readIdx++;
      continue;
    }

    // Every pair of this node has been consumed: drop it and move on.
    pPrev->pListHead = pCurr->pNext;
    freeStbJoinTableList(pCurr);
    pCurr = pPrev->pListHead;
  }

  *ppRes = NULL;
  setOperatorCompleted(pOperator);

  return TSDB_CODE_SUCCESS;
}
1796

1797
/*
 * Shape a join result block to match the operator's output description.
 *
 * A NULL block is accepted and left alone. Otherwise the block id is set from
 * pStbJoin->pOutputDataBlockDesc and, when the block has a column array, one
 * column is appended for every slot of the description beyond the columns the
 * block already holds. Each appended column is created from the slot's data
 * type and sized for pBlock->info.rows rows.
 *
 * Returns TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the join info or its
 * output description is missing, or when a slot cannot be fetched; otherwise
 * the first error from column allocation/append, or TSDB_CODE_SUCCESS.
 */
static int32_t seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) {
  if (NULL == pBlock) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == pStbJoin || NULL == pStbJoin->pOutputDataBlockDesc) {
    // pBlock is known to be non-NULL here; name the values that are missing.
    qError("seqStableJoinComposeRes: pStbJoin or pOutputDataBlockDesc is NULL");
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  pBlock->info.id.blockId = pStbJoin->pOutputDataBlockDesc->dataBlockId;
  if (!pBlock->pDataBlock) {
    return TSDB_CODE_SUCCESS;
  }

  for (int i = (int)pBlock->pDataBlock->size; i < pStbJoin->pOutputDataBlockDesc->pSlots->length; i++) {
    SSlotDescNode* pSlot = (SSlotDescNode*)nodesListGetNode(pStbJoin->pOutputDataBlockDesc->pSlots, i);
    if (pSlot == NULL) {
      qError("seqStableJoinComposeRes: pSlot is NULL, i:%d", i);
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }

    SColumnInfoData colInfo = createColumnInfoData(pSlot->dataType.type, pSlot->dataType.bytes, pSlot->slotId);
    int32_t         code = colInfoDataEnsureCapacity(&colInfo, pBlock->info.rows, true);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    // NOTE(review): if the append fails, the buffers reserved for colInfo just
    // above are not released here; confirm whether they need to be destroyed.
    code = blockDataAppendColInfo(pBlock, &colInfo);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
1826

1827
/*
 * Produce the next result block of the sequential stable join.
 *
 * On the first call the table-pair list is built from downstream[0]; when that
 * yields no rows the operator completes immediately. Afterwards a running
 * post-join retrieval is continued, and when it has no block a new retrieval
 * is launched for the next table pair.
 *
 * The elapsed time is recorded in cost.openCost while that value is still 0.
 * Errors are stored in pTaskInfo->code and raised with T_LONG_JMP; on success
 * the block (possibly NULL) is shaped by seqStableJoinComposeRes() and that
 * call's status is returned.
 */
int32_t seqStableJoin(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;

  QRY_PARAM_CHECK(pRes);
  if (pOperator->status == OP_EXEC_DONE) {
    return code;
  }

  int64_t startUs = 0;
  if (pOperator->cost.openCost == 0) {
    startUs = taosGetTimestampUs();
  }

  if (!pStbJoin->ctx.prev.joinBuild) {
    buildStbJoinTableList(pOperator);
    if (pStbJoin->execInfo.prevBlkRows <= 0) {
      setOperatorCompleted(pOperator);
      goto _return;
    }
  }

  QRY_ERR_JRET(seqJoinContinueCurrRetrieve(pOperator, pRes));
  if (NULL == *pRes) {
    QRY_ERR_JRET(seqJoinLaunchNewRetrieve(pOperator, pRes));
  }

_return:
  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - startUs) / 1000.0;
  }

  if (!code) {
    code = seqStableJoinComposeRes(pStbJoin, *pRes);
    return code;
  }

  qError("%s failed since %s", __func__, tstrerror(code));
  pOperator->pTaskInfo->code = code;
  T_LONG_JMP(pOperator->pTaskInfo->env, code);
  return code;
}
1871

1872
int32_t dynProcessUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
2,178,280✔
1873
  int32_t                    lino = 0;
2,178,280✔
1874
  SOperatorInfo*             operator=(SOperatorInfo*) param;
2,178,280✔
1875
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)operator->info;
2,178,280✔
1876

1877
  if (TSDB_CODE_SUCCESS != code) {
2,178,280✔
UNCOV
1878
    operator->pTaskInfo->code = rpcCvtErrCode(code);
×
UNCOV
1879
    if (operator->pTaskInfo->code != code) {
×
UNCOV
1880
      qError("load systable rsp received, error:%s, cvted error:%s", tstrerror(code),
×
1881
             tstrerror(operator->pTaskInfo->code));
1882
    } else {
UNCOV
1883
      qError("load systable rsp received, error:%s", tstrerror(code));
×
1884
    }
UNCOV
1885
    goto _return;
×
1886
  }
1887

1888
  pScanResInfo->vtbScan.pRsp = taosMemoryMalloc(sizeof(SUseDbRsp));
2,178,280✔
1889
  QUERY_CHECK_NULL(pScanResInfo->vtbScan.pRsp, code, lino, _return, terrno)
2,178,280✔
1890

1891
  code = tDeserializeSUseDbRsp(pMsg->pData, (int32_t)pMsg->len, pScanResInfo->vtbScan.pRsp);
2,178,280✔
1892
  QUERY_CHECK_CODE(code, lino, _return);
2,178,280✔
1893

1894
  taosMemoryFreeClear(pMsg->pData);
2,178,280✔
1895

1896
  code = tsem_post(&pScanResInfo->vtbScan.ready);
2,178,280✔
1897
  QUERY_CHECK_CODE(code, lino, _return);
2,178,280✔
1898

1899
  return code;
2,178,280✔
1900
_return:
×
UNCOV
1901
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
1902
  return code;
×
1903
}
1904

1905
static int32_t buildDbVgInfoMap(SOperatorInfo* pOperator, SMsgCb* pMsgCb, SName* name, SExecTaskInfo* pTaskInfo, SUseDbOutput* output) {
2,178,280✔
1906
  int32_t                    code = TSDB_CODE_SUCCESS;
2,178,280✔
1907
  int32_t                    lino = 0;
2,178,280✔
1908
  char*                      buf1 = NULL;
2,178,280✔
1909
  SUseDbReq*                 pReq = NULL;
2,178,280✔
1910
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)pOperator->info;
2,178,280✔
1911

1912
  pReq = taosMemoryMalloc(sizeof(SUseDbReq));
2,178,280✔
1913
  QUERY_CHECK_NULL(pReq, code, lino, _return, terrno)
2,178,280✔
1914
  code = tNameGetFullDbName(name, pReq->db);
2,178,280✔
1915
  QUERY_CHECK_CODE(code, lino, _return);
2,178,280✔
1916
  int32_t contLen = tSerializeSUseDbReq(NULL, 0, pReq);
2,178,280✔
1917
  buf1 = taosMemoryCalloc(1, contLen);
2,178,280✔
1918
  QUERY_CHECK_NULL(buf1, code, lino, _return, terrno)
2,178,280✔
1919
  int32_t tempRes = tSerializeSUseDbReq(buf1, contLen, pReq);
2,178,280✔
1920
  if (tempRes < 0) {
2,178,280✔
UNCOV
1921
    QUERY_CHECK_CODE(terrno, lino, _return);
×
1922
  }
1923

1924
  // send the fetch remote task result request
1925
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
2,178,280✔
1926
  QUERY_CHECK_NULL(pMsgSendInfo, code, lino, _return, terrno)
2,178,280✔
1927

1928
  pMsgSendInfo->param = pOperator;
2,178,280✔
1929
  pMsgSendInfo->msgInfo.pData = buf1;
2,178,280✔
1930
  pMsgSendInfo->msgInfo.len = contLen;
2,178,280✔
1931
  pMsgSendInfo->msgType = TDMT_MND_GET_DB_INFO;
2,178,280✔
1932
  pMsgSendInfo->fp = dynProcessUseDbRsp;
2,178,280✔
1933
  pMsgSendInfo->requestId = pTaskInfo->id.queryId;
2,178,280✔
1934

1935
  code = asyncSendMsgToServer(pMsgCb->clientRpc, &pScanResInfo->vtbScan.epSet, NULL, pMsgSendInfo);
2,178,280✔
1936
  QUERY_CHECK_CODE(code, lino, _return);
2,178,280✔
1937

1938
  code = tsem_wait(&pScanResInfo->vtbScan.ready);
2,178,280✔
1939
  QUERY_CHECK_CODE(code, lino, _return);
2,178,280✔
1940

1941
  code = queryBuildUseDbOutput(output, pScanResInfo->vtbScan.pRsp);
2,178,280✔
1942
  QUERY_CHECK_CODE(code, lino, _return);
2,178,280✔
1943

1944
_return:
2,178,280✔
1945
  if (code) {
2,178,280✔
UNCOV
1946
     qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
1947
     taosMemoryFree(buf1);
×
1948
  }
1949
  taosMemoryFree(pReq);
2,178,280✔
1950
  tFreeSUsedbRsp(pScanResInfo->vtbScan.pRsp);
2,178,280✔
1951
  taosMemoryFreeClear(pScanResInfo->vtbScan.pRsp);
2,178,280✔
1952
  return code;
2,178,280✔
1953
}
1954

1955
int dynVgInfoComp(const void* lp, const void* rp) {
4,347,250✔
1956
  SVgroupInfo* pLeft = (SVgroupInfo*)lp;
4,347,250✔
1957
  SVgroupInfo* pRight = (SVgroupInfo*)rp;
4,347,250✔
1958
  if (pLeft->hashBegin < pRight->hashBegin) {
4,347,250✔
1959
    return -1;
4,347,250✔
UNCOV
1960
  } else if (pLeft->hashBegin > pRight->hashBegin) {
×
UNCOV
1961
    return 1;
×
1962
  }
1963

UNCOV
1964
  return 0;
×
1965
}
1966

1967
/*
 * Materialize dbInfo->vgArray from dbInfo->vgHash, sorted with sort_func.
 *
 * Nothing is done when dbInfo is NULL, when there is no vgroup hash, or when
 * the array already exists. Returns terrno if the array cannot be created or
 * an element cannot be appended (the hash iteration is cancelled first),
 * otherwise TSDB_CODE_SUCCESS.
 */
int32_t dynMakeVgArraySortBy(SDBVgInfo* dbInfo, __compar_fn_t sort_func) {
  if (NULL == dbInfo) {
    return TSDB_CODE_SUCCESS;
  }

  if (!dbInfo->vgHash || NULL != dbInfo->vgArray) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t vgCount = taosHashGetSize(dbInfo->vgHash);
  dbInfo->vgArray = taosArrayInit(vgCount, sizeof(SVgroupInfo));
  if (NULL == dbInfo->vgArray) {
    return terrno;
  }

  for (void* pEntry = taosHashIterate(dbInfo->vgHash, NULL); pEntry; pEntry = taosHashIterate(dbInfo->vgHash, pEntry)) {
    if (NULL == taosArrayPush(dbInfo->vgArray, pEntry)) {
      taosHashCancelIterate(dbInfo->vgHash, pEntry);
      return terrno;
    }
  }

  taosArraySort(dbInfo->vgArray, sort_func);

  return TSDB_CODE_SUCCESS;
}
1994

1995
int32_t dynHashValueComp(void const* lp, void const* rp) {
20,056,199✔
1996
  uint32_t*    key = (uint32_t*)lp;
20,056,199✔
1997
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
20,056,199✔
1998

1999
  if (*key < pVg->hashBegin) {
20,056,199✔
UNCOV
2000
    return -1;
×
2001
  } else if (*key > pVg->hashEnd) {
20,056,199✔
2002
    return 1;
6,731,640✔
2003
  }
2004

2005
  return 0;
13,324,559✔
2006
}
2007

2008
/*
 * Resolve the vgroup that owns table `tbName` of database `dbFName`.
 *
 * The sorted vgroup array of dbInfo is built on demand, the full table name
 * "<dbFName>.<tbName>" is hashed with the database's hash settings, and the
 * vgroup whose hash range contains that value is looked up. On success its id
 * is written to *vgId.
 *
 * Returns:
 *   TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR  dbInfo is NULL
 *   TSDB_CODE_TSC_DB_NOT_SELECTED          the database has no vgroups
 *   TSDB_CODE_CTG_INTERNAL_ERROR           no vgroup covers the hash value
 *   or the error from building the vgroup array.
 */
int32_t getVgId(SDBVgInfo* dbInfo, char* dbFName, int32_t* vgId, char *tbName) {
  int32_t code = 0;
  int32_t lino = 0;

  // dynMakeVgArraySortBy() accepts a NULL dbInfo and reports success, so the
  // dereferences below have to be protected here.
  if (NULL == dbInfo) {
    qError("db vgroup info is NULL, db:%s", dbFName);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  code = dynMakeVgArraySortBy(dbInfo, dynVgInfoComp);
  QUERY_CHECK_CODE(code, lino, _return);

  int32_t vgNum = (int32_t)taosArrayGetSize(dbInfo->vgArray);
  if (vgNum <= 0) {
    qError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum);
    QUERY_CHECK_CODE(code = TSDB_CODE_TSC_DB_NOT_SELECTED, lino, _return);
  }

  SVgroupInfo* vgInfo = NULL;
  char         tbFullName[TSDB_TABLE_FNAME_LEN];
  // A single bounded format yields the same (possibly truncated) string as
  // writing the prefix and the table name in two steps.
  (void)snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbFName, tbName);

  uint32_t hashValue = taosGetTbHashVal(tbFullName, (int32_t)strlen(tbFullName), dbInfo->hashMethod,
                                        dbInfo->hashPrefix, dbInfo->hashSuffix);

  vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, dynHashValueComp, TD_EQ);
  if (NULL == vgInfo) {
    qError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName,
           (int32_t)taosArrayGetSize(dbInfo->vgArray));
    return TSDB_CODE_CTG_INTERNAL_ERROR;
  }

  *vgId = vgInfo->vgId;

_return:
  return code;
}
2041

2042
/*
 * Return the vgroup information of database name->dbname through *dbVgInfo.
 *
 * Results are cached in vtbScan.dbVgInfoMap, keyed by database name. On a
 * cache miss the information is fetched with buildDbVgInfoMap() and the new
 * entry is inserted into the map.
 *
 * Returns TSDB_CODE_SUCCESS, or the allocation / fetch / insertion error; on
 * failure the entry created by this call is released.
 */
int32_t getDbVgInfo(SOperatorInfo* pOperator, SName *name, SDBVgInfo **dbVgInfo) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SMsgCb*                    pMsgCb = pVtbScan->pMsgCb;
  SUseDbOutput*              output = NULL;
  size_t                     dbNameLen = strlen(name->dbname);
  SUseDbOutput**             find = (SUseDbOutput**)taosHashGet(pInfo->vtbScan.dbVgInfoMap, name->dbname, dbNameLen);

  QRY_PARAM_CHECK(dbVgInfo);

  if (find == NULL) {
    // Zero-initialized so the error path never releases indeterminate fields.
    output = taosMemoryCalloc(1, sizeof(SUseDbOutput));
    QUERY_CHECK_NULL(output, code, line, _return, terrno)
    code = buildDbVgInfoMap(pOperator, pMsgCb, name, pTaskInfo, output);
    QUERY_CHECK_CODE(code, line, _return);
    code = taosHashPut(pInfo->vtbScan.dbVgInfoMap, name->dbname, dbNameLen, &output, POINTER_BYTES);
    QUERY_CHECK_CODE(code, line, _return);
  } else {
    output = *find;
  }

  *dbVgInfo = output->dbVgroup;
  return code;

_return:
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  // NOTE(review): confirm freeUseDbOutput() expects the SUseDbOutput pointer
  // itself (as passed here) rather than the address of the pointer, which is
  // what a hash free-callback would receive.
  if (output) {
    freeUseDbOutput(output);
  }
  return code;
}
2071

2072
int32_t extractColRefName(const char *colref, char **refDb, char** refTb, char** refCol) {
39,122,706✔
2073
  int32_t     code = TSDB_CODE_SUCCESS;
39,122,706✔
2074
  int32_t     line = 0;
39,122,706✔
2075

2076
  const char *first_dot = strchr(colref, '.');
39,122,706✔
2077
  QUERY_CHECK_NULL(first_dot, code, line, _return, terrno)
39,122,706✔
2078

2079
  const char *second_dot = strchr(first_dot + 1, '.');
39,122,706✔
2080
  QUERY_CHECK_NULL(second_dot, code, line, _return, terrno)
39,122,706✔
2081

2082
  size_t db_len = first_dot - colref;
39,122,706✔
2083
  size_t table_len = second_dot - first_dot - 1;
39,122,706✔
2084
  size_t col_len = strlen(second_dot + 1);
39,122,706✔
2085

2086
  *refDb = taosMemoryMalloc(db_len + 1);
39,122,706✔
2087
  *refTb = taosMemoryMalloc(table_len + 1);
39,122,706✔
2088
  *refCol = taosMemoryMalloc(col_len + 1);
39,122,706✔
2089
  QUERY_CHECK_NULL(*refDb, code, line, _return, terrno)
39,122,706✔
2090
  QUERY_CHECK_NULL(*refTb, code, line, _return, terrno)
39,122,706✔
2091
  QUERY_CHECK_NULL(*refCol, code, line, _return, terrno)
39,122,706✔
2092

2093
  tstrncpy(*refDb, colref, db_len + 1);
39,122,706✔
2094
  tstrncpy(*refTb, first_dot + 1, table_len + 1);
39,122,706✔
2095
  tstrncpy(*refCol, second_dot + 1, col_len + 1);
39,122,706✔
2096

2097
  return TSDB_CODE_SUCCESS;
39,122,706✔
UNCOV
2098
_return:
×
UNCOV
2099
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
UNCOV
2100
  if (*refDb) {
×
2101
    taosMemoryFree(*refDb);
×
2102
    *refDb = NULL;
×
2103
  }
UNCOV
2104
  if (*refTb) {
×
UNCOV
2105
    taosMemoryFree(*refTb);
×
UNCOV
2106
    *refTb = NULL;
×
2107
  }
UNCOV
2108
  if (*refCol) {
×
UNCOV
2109
    taosMemoryFree(*refCol);
×
UNCOV
2110
    *refCol = NULL;
×
2111
  }
UNCOV
2112
  return code;
×
2113
}
2114

2115
/*
 * Tell whether a (db, table) pair matches the expected db and table names.
 *
 * dbName and tbName are read through the varDataVal()/varDataLen() accessors;
 * expectDbName and expectTbName are plain NUL-terminated strings.
 *
 * @return true only when both names match in content and in length.
 */
bool tableInfoNeedCollect(char *dbName, char *tbName, char *expectDbName, char *expectTbName) {
  // Table name: same bytes over the stored length, and no longer than that.
  if (strncmp(varDataVal(tbName), expectTbName, varDataLen(tbName)) != 0) {
    return false;
  }
  if (strlen(expectTbName) != varDataLen(tbName)) {
    return false;
  }

  // Database name: same two-step comparison.
  if (strncmp(varDataVal(dbName), expectDbName, varDataLen(dbName)) != 0) {
    return false;
  }
  if (strlen(expectDbName) != varDataLen(dbName)) {
    return false;
  }

  return true;
}
2124

2125
/*
 * Fill a SColRefInfo from row `index` of a result block.
 *
 * Columns 3..8 of pDataBlock are looked up; all six must exist. Column 3
 * supplies the column name, 4 the uid, 5 the column id, 6 the column reference
 * string and 7 the vgroup id. Column 8 is only checked for presence.
 *
 * colName and colrefName are allocated with taosMemoryCalloc(); colrefName is
 * set to NULL when the reference cell is NULL. uid, colId and vgId are written
 * only when their cells are not NULL.
 *
 * @return TSDB_CODE_SUCCESS, or terrno when a column lookup or allocation fails.
 */
int32_t getColRefInfo(SColRefInfo *pInfo, SArray* pDataBlock, int32_t index) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t line = 0;

  SColumnInfoData *pNameData = taosArrayGet(pDataBlock, 3);
  SColumnInfoData *pUidData = taosArrayGet(pDataBlock, 4);
  SColumnInfoData *pColIdData = taosArrayGet(pDataBlock, 5);
  SColumnInfoData *pRefData = taosArrayGet(pDataBlock, 6);
  SColumnInfoData *pVgIdData = taosArrayGet(pDataBlock, 7);
  SColumnInfoData *pRefVerData = taosArrayGet(pDataBlock, 8);

  QUERY_CHECK_NULL(pNameData, code, line, _return, terrno)
  QUERY_CHECK_NULL(pUidData, code, line, _return, terrno)
  QUERY_CHECK_NULL(pColIdData, code, line, _return, terrno)
  QUERY_CHECK_NULL(pRefData, code, line, _return, terrno)
  QUERY_CHECK_NULL(pVgIdData, code, line, _return, terrno)
  QUERY_CHECK_NULL(pRefVerData, code, line, _return, terrno)

  // Column reference string (optional).
  if (!colDataIsNull_s(pRefData, index)) {
    char *pRefCell = colDataGetData(pRefData, index);
    pInfo->colrefName = taosMemoryCalloc(varDataTLen(pRefCell), 1);
    QUERY_CHECK_NULL(pInfo->colrefName, code, line, _return, terrno)
    memcpy(pInfo->colrefName, varDataVal(pRefCell), varDataLen(pRefCell));
    pInfo->colrefName[varDataLen(pRefCell)] = 0;
  } else {
    pInfo->colrefName = NULL;
  }

  // Column name.
  char *pNameCell = colDataGetData(pNameData, index);
  pInfo->colName = taosMemoryCalloc(varDataTLen(pNameCell), 1);
  QUERY_CHECK_NULL(pInfo->colName, code, line, _return, terrno)
  memcpy(pInfo->colName, varDataVal(pNameCell), varDataLen(pNameCell));
  pInfo->colName[varDataLen(pNameCell)] = 0;

  // Numeric fields are left untouched when the cell is NULL.
  if (!colDataIsNull_s(pUidData, index)) {
    GET_TYPED_DATA(pInfo->uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidData, index), 0);
  }
  if (!colDataIsNull_s(pColIdData, index)) {
    GET_TYPED_DATA(pInfo->colId, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pColIdData, index), 0);
  }
  if (!colDataIsNull_s(pVgIdData, index)) {
    GET_TYPED_DATA(pInfo->vgId, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pVgIdData, index), 0);
  }

_return:
  return code;
}
2170

2171
/*
 * Compare the vgroups referenced by the current scan (curOrgTbVg) with the
 * ones already known (existOrgTbVg) and request a stream redeploy when new
 * vgroups appeared.
 *
 * Nothing is done when the task has no stream runtime info. On the first call
 * (existOrgTbVg == NULL) the current set simply becomes the known set.
 *
 * When new vgroup ids are found they are collected into an array, merged with
 * any pending entries detached from vtableDeployInfo.addedVgInfo, and published
 * through vtableDeployInfo.addVgIds if that slot is still empty.
 *
 * @param pVtbScan   virtual table scan state holding both vgroup sets.
 * @param pTaskInfo  task whose stream runtime info receives the deploy request.
 * @param rversion   version stored in pVtbScan->rversion when a redeploy is requested.
 *
 * @return TSDB_CODE_SUCCESS when nothing changed,
 *         TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY when new vgroups were found,
 *         or terrno when collecting the ids fails.
 */
int32_t processOrgTbVg(SVtbScanDynCtrlInfo* pVtbScan, SExecTaskInfo* pTaskInfo, int32_t rversion) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t line = 0;
  void*   pCurIter = NULL;     // active iterator over curOrgTbVg, NULL once exhausted
  SArray* tmpArray = NULL;     // new vgroup ids, owned here until published
  SArray* expiredInfo = NULL;  // list detached from addedVgInfo, owned here until destroyed

  if (pTaskInfo->pStreamRuntimeInfo == NULL) {
    return code;
  }

  if (pVtbScan->existOrgTbVg == NULL) {
    pVtbScan->existOrgTbVg = pVtbScan->curOrgTbVg;
    pVtbScan->curOrgTbVg = NULL;
  }

  if (pVtbScan->curOrgTbVg == NULL) {
    return code;
  }

  // A non-NULL current set means rversion has changed.
  while ((pCurIter = taosHashIterate(pVtbScan->curOrgTbVg, pCurIter))) {
    int32_t* vgId = (int32_t*)taosHashGetKey(pCurIter, NULL);
    if (taosHashGet(pVtbScan->existOrgTbVg, vgId, sizeof(int32_t)) != NULL) {
      continue;
    }
    if (tmpArray == NULL) {
      tmpArray = taosArrayInit(1, sizeof(int32_t));
      QUERY_CHECK_NULL(tmpArray, code, line, _return, terrno)
    }
    QUERY_CHECK_NULL(taosArrayPush(tmpArray, vgId), code, line, _return, terrno)
  }

  if (tmpArray == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  if (pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addVgIds == NULL) {
    SArray* pending = atomic_load_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo);
    if (pending && pending == atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo, pending, NULL)) {
      // The exchange succeeded, so this function is now the only owner of the list.
      expiredInfo = pending;
      for (int32_t i = 0; i < taosArrayGetSize(expiredInfo); i++) {
        SStreamTaskAddr* vgInfo = (SStreamTaskAddr*)taosArrayGet(expiredInfo, i);
        QUERY_CHECK_NULL(taosArrayPush(tmpArray, &vgInfo->nodeId), code, line, _return, terrno)
      }
      taosArrayDestroy(expiredInfo);
      expiredInfo = NULL;
    }
    if (atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addVgIds, NULL, tmpArray)) {
      // Somebody else published first; our array was not taken over.
      taosArrayDestroy(tmpArray);
    }
  } else {
    // The slot is already occupied, so the collected ids are not published.
    // Release them instead of leaking the array.
    taosArrayDestroy(tmpArray);
  }
  tmpArray = NULL;

  atomic_store_64(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.uid, (int64_t)(pVtbScan->isSuperTable ? pVtbScan->suid : pVtbScan->uid));
  (void)atomic_val_compare_exchange_8(pTaskInfo->pStreamRuntimeInfo->vtableDeployGot, 0, 1);
  taosHashClear(pVtbScan->curOrgTbVg);
  pVtbScan->needRedeploy = true;
  pVtbScan->rversion = rversion;
  return TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY;

_return:
  if (pCurIter != NULL) {
    // The iteration was abandoned part-way; release the iterator.
    taosHashCancelIterate(pVtbScan->curOrgTbVg, pCurIter);
  }
  taosArrayDestroy(expiredInfo);
  taosArrayDestroy(tmpArray);
  qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  return code;
}
2226

2227
/*
 * Resolve the vgroup id of the table named in a "db.table.column" reference.
 *
 * The reference is split with extractColRefName(), turned into an SName with
 * the operator's account id, and then looked up through getDbVgInfo() and
 * getVgId().
 *
 * @param pOperator  dynamic query control operator (supplies the account id).
 * @param colRef     column reference string.
 * @param vgId       [out] receives the vgroup id on success.
 *
 * @return TSDB_CODE_SUCCESS or the error code of the first failing step.
 */
int32_t getVgIdFromColref(SOperatorInfo* pOperator, const char* colRef, int32_t* vgId) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SDBVgInfo*                 pDbVg = NULL;
  SName                      tbName = {0};
  char                       fullDbName[TSDB_DB_FNAME_LEN] = {0};
  char*                      dbPart = NULL;
  char*                      tbPart = NULL;
  char*                      colPart = NULL;
  int32_t                    line = 0;
  int32_t                    code = TSDB_CODE_SUCCESS;

  code = extractColRefName(colRef, &dbPart, &tbPart, &colPart);
  QUERY_CHECK_CODE(code, line, _return);

  toName(pCtrl->vtbScan.acctId, dbPart, tbPart, &tbName);

  code = getDbVgInfo(pOperator, &tbName, &pDbVg);
  QUERY_CHECK_CODE(code, line, _return);

  code = tNameGetFullDbName(&tbName, fullDbName);
  QUERY_CHECK_CODE(code, line, _return);

  code = getVgId(pDbVg, fullDbName, vgId, tbName.tname);
  QUERY_CHECK_CODE(code, line, _return);

_return:
  // Shared exit: log only on failure, always release the split name parts.
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  taosMemoryFree(dbPart);
  taosMemoryFree(tbPart);
  taosMemoryFree(colPart);
  return code;
}
2261

2262
/*
 * Build the tag value list for one row of a tag block and store it in
 * vtbUidTagListMap under `uid`.
 *
 * Every column of pTagVal except the last one (which carries the uid) is
 * converted into an STagVal. Variable-length values are copied into newly
 * allocated buffers; other values are copied into the i64 field; NULL cells
 * produce an entry with pData == NULL and nData == 0.
 *
 * @return TSDB_CODE_SUCCESS, or an error code; on failure the partially built
 *         list is destroyed.
 */
static int32_t generateTagArrayByTagBlockAndSave(SHashObj* vtbUidTagListMap, tb_uid_t uid, SSDataBlock *pTagVal, int32_t rowIdx) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t line = 0;
  STagVal tagVal = {0};  // scratch entry; pData is owned here until pushed
  SArray* pTagList = NULL;

  pTagList = taosArrayInit(1, sizeof(STagVal));
  QUERY_CHECK_NULL(pTagList, code, line, _return, terrno)

  // The last column is the uid, so it is not turned into a tag value.
  for (int32_t k = 0; k < taosArrayGetSize(pTagVal->pDataBlock) - 1; k++) {
    SColumnInfoData *pTagCol = taosArrayGet(pTagVal->pDataBlock, k);
    QUERY_CHECK_NULL(pTagCol, code, line, _return, terrno)

    tagVal.type = pTagCol->info.type;
    tagVal.cid = pTagCol->info.colId;

    if (colDataIsNull_s(pTagCol, rowIdx)) {
      tagVal.pData = NULL;
      tagVal.nData = 0;
    } else {
      char* pCell = colDataGetData(pTagCol, rowIdx);
      if (IS_VAR_DATA_TYPE(pTagCol->info.type)) {
        tagVal.nData = varDataLen(pCell);
        tagVal.pData = taosMemoryMalloc(tagVal.nData);
        QUERY_CHECK_NULL(tagVal.pData, code, line, _return, terrno)
        memcpy(tagVal.pData, varDataVal(pCell), varDataLen(pCell));
      } else {
        memcpy(&tagVal.i64, pCell, tDataTypes[pTagCol->info.type].bytes);
      }
    }

    QUERY_CHECK_NULL(taosArrayPush(pTagList, &tagVal), code, line, _return, terrno)

    // The list now holds the entry; start the next one from a clean slate.
    tagVal = (STagVal){0};
  }

  code = taosHashPut(vtbUidTagListMap, &uid, sizeof(uid), &pTagList, POINTER_BYTES);
  QUERY_CHECK_CODE(code, line, _return);

  return code;

_return:
  if (tagVal.pData) {
    taosMemoryFreeClear(tagVal.pData);
  }
  if (pTagList) {
    taosArrayDestroyEx(pTagList, destroyTagVal);
  }
  qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  return code;
}
2309

2310
int32_t virtualTableScanProcessColRefInfo(SOperatorInfo* pOperator, SArray* pColRefInfo, tb_uid_t* uid, int32_t* vgId,
9,613,699✔
2311
                                          SHashObj** ppRefMap) {
2312
  int32_t                    code = TSDB_CODE_SUCCESS;
9,613,699✔
2313
  int32_t                    line = 0;
9,613,699✔
2314
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
9,613,699✔
2315
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
9,613,699✔
2316
  SDBVgInfo*                 dbVgInfo = NULL;
9,613,699✔
2317
  SHashObj*                  refMap = NULL;
9,613,699✔
2318

2319
  for (int32_t j = 0; j < taosArrayGetSize(pColRefInfo); j++) {
195,921,725✔
2320
    SColRefInfo *pKV = (SColRefInfo*)taosArrayGet(pColRefInfo, j);
186,308,026✔
2321
    *uid = pKV->uid;
186,308,026✔
2322
    *vgId = pKV->vgId;
186,308,026✔
2323
    if (pKV->colrefName != NULL && colNeedScan(pOperator, pKV->colId)) {
186,308,026✔
2324
      char*   refDbName = NULL;
39,097,686✔
2325
      char*   refTbName = NULL;
39,097,686✔
2326
      char*   refColName = NULL;
39,097,686✔
2327
      SName   name = {0};
39,097,686✔
2328
      char    dbFname[TSDB_DB_FNAME_LEN] = {0};
39,097,686✔
2329
      char    orgTbFName[TSDB_TABLE_FNAME_LEN] = {0};
39,097,686✔
2330

2331
      if (ppRefMap != NULL) {
39,097,686✔
2332
        // Track colref -> colId mapping for later slot grouping.
2333
        if (refMap == NULL) {
20,574,505✔
2334
          refMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
3,107,908✔
2335
          QUERY_CHECK_NULL(refMap, code, line, _return, terrno)
3,107,908✔
2336
        }
2337
        code = addRefColIdToRefMap(refMap, pKV->colrefName, pKV->colId);
20,574,505✔
2338
        QUERY_CHECK_CODE(code, line, _return);
20,574,505✔
2339
      }
2340

2341
      // Parse db/tb/col ref and resolve source table vgId.
2342
      code = extractColRefName(pKV->colrefName, &refDbName, &refTbName, &refColName);
39,097,686✔
2343
      QUERY_CHECK_CODE(code, line, _return);
39,097,686✔
2344

2345
      toName(pInfo->vtbScan.acctId, refDbName, refTbName, &name);
39,097,686✔
2346

2347
      code = getDbVgInfo(pOperator, &name, &dbVgInfo);
39,097,686✔
2348
      QUERY_CHECK_CODE(code, line, _return);
39,097,686✔
2349
      code = tNameGetFullDbName(&name, dbFname);
39,097,686✔
2350
      QUERY_CHECK_CODE(code, line, _return);
39,097,686✔
2351
      code = tNameGetFullTableName(&name, orgTbFName);
39,097,686✔
2352
      QUERY_CHECK_CODE(code, line, _return);
39,097,686✔
2353

2354
      void *pVal = taosHashGet(pVtbScan->otbNameToOtbInfoMap, orgTbFName, sizeof(orgTbFName));
39,097,686✔
2355
      if (!pVal) {
39,097,686✔
2356
        SOrgTbInfo orgTbInfo = {0};
13,299,539✔
2357
        code = getVgId(dbVgInfo, dbFname, &orgTbInfo.vgId, name.tname);
13,299,539✔
2358
        QUERY_CHECK_CODE(code, line, _return);
13,299,539✔
2359
        tstrncpy(orgTbInfo.tbName, orgTbFName, sizeof(orgTbInfo.tbName));
13,299,539✔
2360
        orgTbInfo.colMap = taosArrayInit(10, sizeof(SColIdNameKV));
13,299,539✔
2361
        QUERY_CHECK_NULL(orgTbInfo.colMap, code, line, _return, terrno)
13,299,539✔
2362
        SColIdNameKV colIdNameKV = {0};
13,299,539✔
2363
        colIdNameKV.colId = pKV->colId;
13,299,539✔
2364
        tstrncpy(colIdNameKV.colName, refColName, sizeof(colIdNameKV.colName));
13,299,539✔
2365
        QUERY_CHECK_NULL(taosArrayPush(orgTbInfo.colMap, &colIdNameKV), code, line, _return, terrno)
26,599,078✔
2366
        code = taosHashPut(pVtbScan->otbNameToOtbInfoMap, orgTbFName, sizeof(orgTbFName), &orgTbInfo, sizeof(orgTbInfo));
13,299,539✔
2367
        QUERY_CHECK_CODE(code, line, _return);
13,299,539✔
2368
      } else {
2369
        SOrgTbInfo *tbInfo = (SOrgTbInfo *)pVal;
25,798,147✔
2370
        SColIdNameKV colIdNameKV = {0};
25,798,147✔
2371
        colIdNameKV.colId = pKV->colId;
25,798,147✔
2372
        tstrncpy(colIdNameKV.colName, refColName, sizeof(colIdNameKV.colName));
25,798,147✔
2373
        QUERY_CHECK_NULL(taosArrayPush(tbInfo->colMap, &colIdNameKV), code, line, _return, terrno)
51,596,294✔
2374
      }
2375
      taosMemoryFree(refDbName);
39,097,686✔
2376
      taosMemoryFree(refTbName);
39,097,686✔
2377
      taosMemoryFree(refColName);
39,097,686✔
2378
    }
2379
  }
2380

2381
  if (ppRefMap != NULL) {
9,613,699✔
2382
    *ppRefMap = refMap;
4,561,034✔
2383
  }
2384

2385
  return code;
9,613,699✔
2386

UNCOV
2387
_return:
×
NEW
2388
  qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
NEW
2389
  if (refMap) {
×
NEW
2390
    taosHashCleanup(refMap);
×
2391
  }
UNCOV
2392
  return code;
×
2393
}
2394

2395
/*
 * Drain the tag scan operator (downstream 1) and index its rows by table uid.
 *
 * pVtbScan->vtbUidTagListMap is always created. With hasPartition set,
 * pVtbScan->vtbUidToGroupIdMap and pVtbScan->vtbGroupIdTagListMap are created
 * as well, each block's rows go into a per-group uid -> tag list map stored in
 * vtbGroupIdTagListMap, and every uid is mapped to the block's group id.
 * Without partitioning, rows go directly into vtbUidTagListMap.
 *
 * The uid of a row is read from the last column of the block; it stays 0 when
 * that cell is NULL.
 *
 * @return TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t getTagBlockAndProcess(SOperatorInfo* pOperator, bool hasPartition) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SOperatorInfo*             pTagScanOp = pOperator->pDownstream[1];

  pVtbScan->vtbUidTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  QUERY_CHECK_NULL(pVtbScan->vtbUidTagListMap, code, line, _return, terrno)
  taosHashSetFreeFp(pVtbScan->vtbUidTagListMap, destroyTagList);
  if (hasPartition) {
    pVtbScan->vtbUidToGroupIdMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
    pVtbScan->vtbGroupIdTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
    QUERY_CHECK_NULL(pVtbScan->vtbUidToGroupIdMap, code, line, _return, terrno)
    QUERY_CHECK_NULL(pVtbScan->vtbGroupIdTagListMap, code, line, _return, terrno)
    taosHashSetFreeFp(pVtbScan->vtbGroupIdTagListMap, destroyVtbUidTagListMap);
  }

  while (true) {
    SSDataBlock *pTagVal = NULL;
    code = pTagScanOp->fpSet.getNextFn(pTagScanOp, &pTagVal);
    QUERY_CHECK_CODE(code, line, _return);
    if (pTagVal == NULL) {
      break;
    }

    // Pick the uid -> tag list map this block's rows belong to.
    SHashObj *vtbUidTagListMap = NULL;
    if (hasPartition) {
      void* pIter = taosHashGet(pVtbScan->vtbGroupIdTagListMap, &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId));
      if (pIter) {
        vtbUidTagListMap = *(SHashObj**)pIter;
      } else {
        vtbUidTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
        QUERY_CHECK_NULL(vtbUidTagListMap, code, line, _return, terrno)
        taosHashSetFreeFp(vtbUidTagListMap, destroyTagList);

        code = taosHashPut(pVtbScan->vtbGroupIdTagListMap, &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId), &vtbUidTagListMap, POINTER_BYTES);
        if (code != TSDB_CODE_SUCCESS) {
          // The group map did not take the new map, so nothing else will free it.
          taosHashCleanup(vtbUidTagListMap);
        }
        QUERY_CHECK_CODE(code, line, _return);
      }
    } else {
      vtbUidTagListMap = pVtbScan->vtbUidTagListMap;
    }

    SColumnInfoData *pUidCol = taosArrayGetLast(pTagVal->pDataBlock);
    QUERY_CHECK_NULL(pUidCol, code, line, _return, terrno)
    for (int32_t i = 0; i < pTagVal->info.rows; i++) {
      tb_uid_t uid = 0;
      if (!colDataIsNull_s(pUidCol, i)) {
        GET_TYPED_DATA(uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, i), 0);
      }

      code = generateTagArrayByTagBlockAndSave(vtbUidTagListMap, uid, pTagVal, i);
      QUERY_CHECK_CODE(code, line, _return);

      if (hasPartition) {
        code = taosHashPut(pVtbScan->vtbUidToGroupIdMap, &uid, sizeof(uid), &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId));
        QUERY_CHECK_CODE(code, line, _return);
      }
    }
  }

  return code;

_return:
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  return code;
}
2465

2466
/*
 * For every child table in pVtbScan->childTableList, group the source tables
 * it references by vgroup id and store the result in
 * pVtbScan->vtbUidToVgIdMapMap (uid -> (vgId -> array of SOrgTbInfo)).
 *
 * Per child table, virtualTableScanProcessColRefInfo() fills
 * pVtbScan->otbNameToOtbInfoMap; that map is then drained: each entry is
 * copied into the array for its vgId and removed from the name map. The
 * iterator is advanced before the removal so that it never points at the
 * entry being removed.
 *
 * @return TSDB_CODE_SUCCESS or the first error encountered. On failure the
 *         per-table map that was still being built, and its arrays, are
 *         released. NOTE(review): the colMap arrays referenced by copied
 *         SOrgTbInfo entries are not released here because their ownership is
 *         not visible from this function - confirm against the destroy path.
 */
static int32_t processChildTableListAndGenerateOrgTbInfoMap(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SHashObj*                  otbVgIdToOtbInfoArrayMap = NULL;  // map being built, not yet stored
  SArray*                    pNewArray = NULL;                 // array not yet stored in the map
  void*                      pOrgTbInfo = NULL;                // active iterator over the name map

  pVtbScan->vtbUidToVgIdMapMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  QUERY_CHECK_NULL(pVtbScan->vtbUidToVgIdMapMap, code, line, _return, terrno)

  for (int32_t i = 0; i < taosArrayGetSize(pVtbScan->childTableList); i++) {
    otbVgIdToOtbInfoArrayMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
    QUERY_CHECK_NULL(otbVgIdToOtbInfoArrayMap, code, line, _return, terrno)

    SArray* pColRefInfo = (SArray*)taosArrayGetP(pVtbScan->childTableList, i);
    QUERY_CHECK_NULL(pColRefInfo, code, line, _return, terrno)

    tb_uid_t uid = 0;
    int32_t  vgId = 0;
    code = virtualTableScanProcessColRefInfo(pOperator, pColRefInfo, &uid, &vgId, NULL);
    QUERY_CHECK_CODE(code, line, _return);

    size_t len = 0;
    pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, NULL);
    while (pOrgTbInfo != NULL) {
      char*       key = taosHashGetKey(pOrgTbInfo, &len);
      SOrgTbInfo* orgTbInfo = (SOrgTbInfo*)pOrgTbInfo;

      void* pIter = taosHashGet(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId));
      if (!pIter) {
        pNewArray = taosArrayInit(1, sizeof(SOrgTbInfo));
        QUERY_CHECK_NULL(pNewArray, code, line, _return, terrno)
        QUERY_CHECK_NULL(taosArrayPush(pNewArray, orgTbInfo), code, line, _return, terrno)
        code = taosHashPut(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId), &pNewArray, POINTER_BYTES);
        QUERY_CHECK_CODE(code, line, _return);
        // Stored in the per-table map; released together with it from now on.
        pNewArray = NULL;
      } else {
        SArray* pOrgTbInfoArray = *(SArray**)pIter;
        QUERY_CHECK_NULL(pOrgTbInfoArray, code, line, _return, terrno)
        QUERY_CHECK_NULL(taosArrayPush(pOrgTbInfoArray, orgTbInfo), code, line, _return, terrno)
      }

      // Advance first, then remove the entry that was just copied.
      pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, pOrgTbInfo);

      code = taosHashRemove(pVtbScan->otbNameToOtbInfoMap, key, len);
      QUERY_CHECK_CODE(code, line, _return);
    }

    code = taosHashPut(pVtbScan->vtbUidToVgIdMapMap, &uid, sizeof(uid), &otbVgIdToOtbInfoArrayMap, POINTER_BYTES);
    QUERY_CHECK_CODE(code, line, _return);
    // Handed over to vtbUidToVgIdMapMap.
    otbVgIdToOtbInfoArrayMap = NULL;
  }

  return code;

_return:
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  if (pOrgTbInfo != NULL) {
    // The drain loop was left early; release the iterator.
    taosHashCancelIterate(pVtbScan->otbNameToOtbInfoMap, pOrgTbInfo);
  }
  taosArrayDestroy(pNewArray);
  if (otbVgIdToOtbInfoArrayMap != NULL) {
    // Release the array containers held by the unfinished map, then the map.
    void* pArrIter = NULL;
    while ((pArrIter = taosHashIterate(otbVgIdToOtbInfoArrayMap, pArrIter)) != NULL) {
      taosArrayDestroy(*(SArray**)pArrIter);
    }
    taosHashCleanup(otbVgIdToOtbInfoArrayMap);
  }
  return code;
}
2525

2526
/*
 * Build the source-table lookup structures in two steps: first the per-child
 * table maps (processChildTableListAndGenerateOrgTbInfoMap), then the tag
 * maps (getTagBlockAndProcess).
 *
 * @param pOperator     dynamic query control operator.
 * @param hasPartition  forwarded to getTagBlockAndProcess().
 *
 * @return TSDB_CODE_SUCCESS or the error code of the failing step.
 */
static int32_t buildOrgTbInfoSingle(SOperatorInfo* pOperator, bool hasPartition) {
  int32_t line = 0;
  int32_t code = TSDB_CODE_SUCCESS;

  // Step 1: group the referenced source tables of every child table.
  code = processChildTableListAndGenerateOrgTbInfoMap(pOperator);
  QUERY_CHECK_CODE(code, line, _return);

  // Step 2: collect the tag values.
  code = getTagBlockAndProcess(pOperator, hasPartition);
  QUERY_CHECK_CODE(code, line, _return);

  return code;

_return:
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  return code;
}
2542

2543
/*
 * Batch preparation path: groups the origin-table entries of every child table
 * by vgroup id.
 *
 * With hasPartition:
 *   - drains the tag scan operator (pDownstream[1]); for every row the last
 *     column of the block is read as the table uid and uid -> groupId is stored
 *     in vtbUidToGroupIdMap, and every seen groupId is registered in
 *     vtbGroupIdTagListMap;
 *   - each child table is then filed under vtbGroupIdToVgIdMapMap[groupId],
 *     a per-group (vgId -> SArray<SOrgTbInfo>) map created on demand.
 * Without hasPartition all child tables share pVtbScan->otbVgIdToOtbInfoArrayMap.
 *
 * For each child table, virtualTableScanProcessColRefInfo() is called and the
 * entries then found in otbNameToOtbInfoMap are moved (shallow copy + removal
 * from the name map) into the vgId-keyed arrays.
 *
 * On failure, containers created by this call that have not yet been stored in
 * their owning map are released before returning. Only the containers are
 * freed; the element contents still belong to otbNameToOtbInfoMap at that point.
 *
 * @return TSDB_CODE_SUCCESS or the first error code encountered (also logged)
 */
static int32_t buildOrgTbInfoBatch(SOperatorInfo* pOperator, bool hasPartition) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SOperatorInfo*             pTagScanOp = pOperator->pDownstream[1];
  SHashObj*                  pNewVgMap = NULL;      // per-group map not yet owned by vtbGroupIdToVgIdMapMap
  SArray*                    pNewInfoArray = NULL;  // per-vgroup array not yet owned by a vgId map

  if (hasPartition) {
    pVtbScan->vtbUidToGroupIdMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
    pVtbScan->vtbGroupIdTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
    pVtbScan->vtbGroupIdToVgIdMapMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);

    QUERY_CHECK_NULL(pVtbScan->vtbGroupIdToVgIdMapMap, code, line, _return, terrno)
    QUERY_CHECK_NULL(pVtbScan->vtbUidToGroupIdMap, code, line, _return, terrno)
    QUERY_CHECK_NULL(pVtbScan->vtbGroupIdTagListMap, code, line, _return, terrno)
    taosHashSetFreeFp(pVtbScan->vtbGroupIdToVgIdMapMap, destroyOtbVgIdToOtbInfoArrayMap);
  } else {
    pVtbScan->otbVgIdToOtbInfoArrayMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
    QUERY_CHECK_NULL(pVtbScan->otbVgIdToOtbInfoArrayMap, code, line, _return, terrno)
  }

  // Only partitioned queries need the uid -> groupId mapping from the tag scan.
  while (hasPartition) {
    SSDataBlock* pTagVal = NULL;
    code = pTagScanOp->fpSet.getNextFn(pTagScanOp, &pTagVal);
    QUERY_CHECK_CODE(code, line, _return);
    if (pTagVal == NULL) {
      break;
    }

    SColumnInfoData* pUidCol = taosArrayGetLast(pTagVal->pDataBlock);
    QUERY_CHECK_NULL(pUidCol, code, line, _return, terrno)
    for (int32_t i = 0; i < pTagVal->info.rows; i++) {
      tb_uid_t uid = 0;  // a NULL uid cell is recorded under uid 0
      if (!colDataIsNull_s(pUidCol, i)) {
        GET_TYPED_DATA(uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, i), 0);
      }
      code = taosHashPut(pVtbScan->vtbUidToGroupIdMap, &uid, sizeof(uid), &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId));
      QUERY_CHECK_CODE(code, line, _return);
    }
    code = taosHashPut(pVtbScan->vtbGroupIdTagListMap, &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId), NULL, 0);
    QUERY_CHECK_CODE(code, line, _return);
  }

  for (int32_t i = 0; i < taosArrayGetSize(pVtbScan->childTableList); i++) {
    SArray* pColRefInfo = (SArray*)taosArrayGetP(pVtbScan->childTableList, i);
    QUERY_CHECK_NULL(pColRefInfo, code, line, _return, terrno)
    tb_uid_t uid = 0;
    int32_t  vgId = 0;
    code = virtualTableScanProcessColRefInfo(pOperator, pColRefInfo, &uid, &vgId, NULL);
    QUERY_CHECK_CODE(code, line, _return);

    // Pick the vgId -> array map this child table's origin tables go into.
    SHashObj* otbVgIdToOtbInfoArrayMap = NULL;
    if (hasPartition) {
      uint64_t* groupId = (uint64_t*)taosHashGet(pVtbScan->vtbUidToGroupIdMap, &uid, sizeof(uid));
      QUERY_CHECK_NULL(groupId, code, line, _return, terrno)

      void* pHashIter = taosHashGet(pVtbScan->vtbGroupIdToVgIdMapMap, groupId, sizeof(*groupId));
      if (pHashIter) {
        otbVgIdToOtbInfoArrayMap = *(SHashObj**)pHashIter;
      } else {
        pNewVgMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
        QUERY_CHECK_NULL(pNewVgMap, code, line, _return, terrno)
        code = taosHashPut(pVtbScan->vtbGroupIdToVgIdMapMap, groupId, sizeof(*groupId), &pNewVgMap, POINTER_BYTES);
        QUERY_CHECK_CODE(code, line, _return);
        // The group map now owns it; _return must not free it any more.
        otbVgIdToOtbInfoArrayMap = pNewVgMap;
        pNewVgMap = NULL;
      }
    } else {
      otbVgIdToOtbInfoArrayMap = pVtbScan->otbVgIdToOtbInfoArrayMap;
    }

    size_t len = 0;
    void*  pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, NULL);
    while (pOrgTbInfo != NULL) {
      char*       key = taosHashGetKey(pOrgTbInfo, &len);
      SOrgTbInfo* orgTbInfo = (SOrgTbInfo*)pOrgTbInfo;
      void*       pIter = taosHashGet(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId));
      if (!pIter) {
        pNewInfoArray = taosArrayInit(1, sizeof(SOrgTbInfo));
        QUERY_CHECK_NULL(pNewInfoArray, code, line, _return, terrno)
        QUERY_CHECK_NULL(taosArrayPush(pNewInfoArray, orgTbInfo), code, line, _return, terrno)
        code = taosHashPut(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId), &pNewInfoArray, POINTER_BYTES);
        QUERY_CHECK_CODE(code, line, _return);
        // Stored in the vgId map; ownership handed over.
        pNewInfoArray = NULL;
      } else {
        SArray* pOrgTbInfoArray = *(SArray**)pIter;
        QUERY_CHECK_NULL(pOrgTbInfoArray, code, line, _return, terrno)
        QUERY_CHECK_NULL(taosArrayPush(pOrgTbInfoArray, orgTbInfo), code, line, _return, terrno)
      }

      // Advance first, then drop the entry that was just copied out.
      pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, pOrgTbInfo);

      code = taosHashRemove(pVtbScan->otbNameToOtbInfoMap, key, len);
      QUERY_CHECK_CODE(code, line, _return);
    }
  }
  return code;

_return:
  // Release containers that never reached their owning map.
  taosArrayDestroy(pNewInfoArray);
  taosHashCleanup(pNewVgMap);
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  return code;
}
2645

2646
/*
 * Reads the system-table scan downstream and collects, per child table of the
 * virtual super table, the list of its column references.
 *
 * Results:
 *   - pVtbScan->childTableList : one SArray<SColRefInfo> per child table
 *   - pVtbScan->childTableMap  : child table name -> index into childTableList
 *   - for DYN_QTYPE_VTB_AGG / DYN_QTYPE_VTB_WINDOW an empty otbNameToOtbInfoMap
 *     is created for the follow-up step
 *   - for DYN_QTYPE_VTB_SCAN with a stream runtime, the vgroup id of every
 *     referenced column is recorded in pVtbScan->curOrgTbVg
 *
 * The system-table scan is pDownstream[0] for DYN_QTYPE_VTB_AGG and
 * pDownstream[1] otherwise. Columns 0/1/2 of each block are read as table name,
 * super-table name and db name.
 *
 * After collection the query-type specific step runs: buildOrgTbInfoBatch /
 * buildOrgTbInfoSingle / processOrgTbVg.
 *
 * A SColRefInfo obtained from getColRefInfo() is released with
 * destroyColRefInfo() if an error occurs before it has been handed to a list,
 * and a freshly created per-table array is destroyed if it never reached
 * childTableList.
 *
 * @return TSDB_CODE_SUCCESS or the first error code encountered (also logged)
 */
int32_t buildVirtualSuperTableScanChildTableMap(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SOperatorInfo*             pSystableScanOp = NULL;
  SColRefInfo                info = {0};
  bool                       infoOwned = false;       // true while `info` must be released by this function
  SArray*                    pNewColRefArray = NULL;  // per-table array not yet stored in childTableList

  pVtbScan->childTableMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  QUERY_CHECK_NULL(pVtbScan->childTableMap, code, line, _return, terrno)

  if (pInfo->qType == DYN_QTYPE_VTB_AGG || pInfo->qType == DYN_QTYPE_VTB_WINDOW) {
    pVtbScan->otbNameToOtbInfoMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
    QUERY_CHECK_NULL(pVtbScan->otbNameToOtbInfoMap, code, line, _return, terrno)
  }
  pSystableScanOp = (pInfo->qType == DYN_QTYPE_VTB_AGG) ? pOperator->pDownstream[0] : pOperator->pDownstream[1];

  while (true) {
    SSDataBlock* pChildInfo = NULL;
    code = pSystableScanOp->fpSet.getNextFn(pSystableScanOp, &pChildInfo);
    QUERY_CHECK_CODE(code, line, _return);
    if (pChildInfo == NULL) {
      break;
    }
    SColumnInfoData* pTableNameCol = taosArrayGet(pChildInfo->pDataBlock, 0);
    SColumnInfoData* pStbNameCol = taosArrayGet(pChildInfo->pDataBlock, 1);
    SColumnInfoData* pDbNameCol = taosArrayGet(pChildInfo->pDataBlock, 2);

    QUERY_CHECK_NULL(pTableNameCol, code, line, _return, terrno)
    QUERY_CHECK_NULL(pStbNameCol, code, line, _return, terrno)
    QUERY_CHECK_NULL(pDbNameCol, code, line, _return, terrno)

    for (int32_t i = 0; i < pChildInfo->info.rows; i++) {
      if (colDataIsNull_s(pStbNameCol, i)) {
        continue;
      }

      char* stbrawname = colDataGetData(pStbNameCol, i);
      char* dbrawname = colDataGetData(pDbNameCol, i);
      char* ctbName = colDataGetData(pTableNameCol, i);

      if (!tableInfoNeedCollect(dbrawname, stbrawname, pInfo->vtbScan.dbName, pInfo->vtbScan.tbName)) {
        continue;
      }

      memset(&info, 0, sizeof(info));
      code = getColRefInfo(&info, pChildInfo->pDataBlock, i);
      QUERY_CHECK_CODE(code, line, _return);
      infoOwned = true;

      if (pInfo->qType == DYN_QTYPE_VTB_SCAN) {
        if (pInfo->vtbScan.dynTbUid != 0 && info.uid != pInfo->vtbScan.dynTbUid) {
          qTrace("dynQueryCtrl tb uid filter, info uid:%" PRIu64 ", dyn tb uid:%" PRIu64, info.uid,
                 pInfo->vtbScan.dynTbUid);
          destroyColRefInfo(&info);
          infoOwned = false;
          continue;
        }

        if (pTaskInfo->pStreamRuntimeInfo) {
          if (pVtbScan->curOrgTbVg == NULL) {
            pVtbScan->curOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
            QUERY_CHECK_NULL(pVtbScan->curOrgTbVg, code, line, _return, terrno)
          }

          if (info.colrefName) {
            int32_t vgId;
            code = getVgIdFromColref(pOperator, info.colrefName, &vgId);
            QUERY_CHECK_CODE(code, line, _return);
            code = taosHashPut(pVtbScan->curOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
            QUERY_CHECK_CODE(code, line, _return);
          }
        }
      }

      // A single lookup decides between "first column of a new child table"
      // and "another column of a known child table".
      int32_t* pTableIdx = (int32_t*)taosHashGet(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName));
      if (pTableIdx == NULL) {
        pNewColRefArray = taosArrayInit(1, sizeof(SColRefInfo));
        QUERY_CHECK_NULL(pNewColRefArray, code, line, _return, terrno)
        QUERY_CHECK_NULL(taosArrayPush(pNewColRefArray, &info), code, line, _return, terrno)
        int32_t tableIdx = (int32_t)taosArrayGetSize(pVtbScan->childTableList);
        QUERY_CHECK_NULL(taosArrayPush(pVtbScan->childTableList, &pNewColRefArray), code, line, _return, terrno)
        // childTableList now owns the array and, through it, the info content.
        pNewColRefArray = NULL;
        infoOwned = false;
        code = taosHashPut(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName), &tableIdx, sizeof(tableIdx));
        QUERY_CHECK_CODE(code, line, _return);
      } else {
        SArray* pColRefArray = (SArray*)taosArrayGetP(pVtbScan->childTableList, *pTableIdx);
        QUERY_CHECK_NULL(pColRefArray, code, line, _return, terrno)
        QUERY_CHECK_NULL(taosArrayPush(pColRefArray, &info), code, line, _return, terrno)
        infoOwned = false;
      }
    }
  }

  switch (pInfo->qType) {
    case DYN_QTYPE_VTB_WINDOW: {
      code = buildOrgTbInfoBatch(pOperator, false);
      break;
    }
    case DYN_QTYPE_VTB_AGG: {
      if (pVtbScan->batchProcessChild) {
        code = buildOrgTbInfoBatch(pOperator, pVtbScan->hasPartition);
      } else {
        code = buildOrgTbInfoSingle(pOperator, pVtbScan->hasPartition);
      }
      break;
    }
    case DYN_QTYPE_VTB_SCAN: {
      code = processOrgTbVg(pVtbScan, pTaskInfo, 1);
      break;
    }
    default: {
      code = TSDB_CODE_PLAN_INVALID_DYN_CTRL_TYPE;
      break;
    }
  }

  QUERY_CHECK_CODE(code, line, _return);

_return:
  // Both are no-ops on the success path: nothing is pending there.
  if (infoOwned) {
    destroyColRefInfo(&info);
  }
  taosArrayDestroy(pNewColRefArray);
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
2771

2772
/*
 * Reads the system-table scan downstream (pDownstream[1]) and collects the
 * column references of the single virtual normal/child table identified by
 * pInfo->vtbScan.dbName / tbName into a newly created pInfo->vtbScan.colRefInfo.
 *
 * Columns 0 / 2 / 8 of each block are read as table name, db name and
 * reference version. `rversion` keeps the last non-NULL reference version seen
 * and is finally passed to processOrgTbVg(). When the reference version differs
 * from pVtbScan->rversion, or no existOrgTbVg set exists, the vgroup id of each
 * referenced column is recorded in pVtbScan->curOrgTbVg.
 *
 * Errors returned by the downstream scan are propagated instead of being
 * overwritten by the later processOrgTbVg() result. A SColRefInfo obtained from
 * getColRefInfo() is released with destroyColRefInfo() if an error occurs
 * before it has been appended to colRefInfo.
 *
 * NOTE(review): any array already stored in pInfo->vtbScan.colRefInfo is
 * overwritten without being freed here - confirm that callers never invoke
 * this twice on the same operator.
 *
 * @return TSDB_CODE_SUCCESS or the first error code encountered (also logged)
 */
int32_t buildVirtualNormalChildTableScanChildTableMap(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[1];
  int32_t                    rversion = 0;
  SColRefInfo                info = {0};
  bool                       infoOwned = false;  // true while `info` must be released by this function

  pInfo->vtbScan.colRefInfo = taosArrayInit(1, sizeof(SColRefInfo));
  QUERY_CHECK_NULL(pInfo->vtbScan.colRefInfo, code, line, _return, terrno)

  while (true) {
    SSDataBlock* pTableInfo = NULL;
    code = pSystableScanOp->fpSet.getNextFn(pSystableScanOp, &pTableInfo);
    QUERY_CHECK_CODE(code, line, _return);
    if (pTableInfo == NULL) {
      break;
    }

    SColumnInfoData* pTableNameCol = taosArrayGet(pTableInfo->pDataBlock, 0);
    SColumnInfoData* pDbNameCol = taosArrayGet(pTableInfo->pDataBlock, 2);
    SColumnInfoData* pRefVerCol = taosArrayGet(pTableInfo->pDataBlock, 8);

    QUERY_CHECK_NULL(pTableNameCol, code, line, _return, terrno)
    QUERY_CHECK_NULL(pDbNameCol, code, line, _return, terrno)
    QUERY_CHECK_NULL(pRefVerCol, code, line, _return, terrno)

    for (int32_t i = 0; i < pTableInfo->info.rows; i++) {
      if (!colDataIsNull_s(pRefVerCol, i)) {
        GET_TYPED_DATA(rversion, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pRefVerCol, i), 0);
      }

      if (colDataIsNull_s(pTableNameCol, i)) {
        continue;
      }

      char* tbrawname = colDataGetData(pTableNameCol, i);
      char* dbrawname = colDataGetData(pDbNameCol, i);
      QUERY_CHECK_NULL(tbrawname, code, line, _return, terrno)
      QUERY_CHECK_NULL(dbrawname, code, line, _return, terrno)

      if (!tableInfoNeedCollect(dbrawname, tbrawname, pInfo->vtbScan.dbName, pInfo->vtbScan.tbName)) {
        continue;
      }

      memset(&info, 0, sizeof(info));
      code = getColRefInfo(&info, pTableInfo->pDataBlock, i);
      QUERY_CHECK_CODE(code, line, _return);
      infoOwned = true;

      if ((rversion != pVtbScan->rversion || pVtbScan->existOrgTbVg == NULL) && info.colrefName) {
        if (pVtbScan->curOrgTbVg == NULL) {
          pVtbScan->curOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
          QUERY_CHECK_NULL(pVtbScan->curOrgTbVg, code, line, _return, terrno)
        }
        int32_t vgId;
        code = getVgIdFromColref(pOperator, info.colrefName, &vgId);
        QUERY_CHECK_CODE(code, line, _return);
        code = taosHashPut(pVtbScan->curOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
        QUERY_CHECK_CODE(code, line, _return);
      }

      QUERY_CHECK_NULL(taosArrayPush(pInfo->vtbScan.colRefInfo, &info), code, line, _return, terrno)
      // colRefInfo now holds the (shallow) copy and owns its content.
      infoOwned = false;
    }
  }
  code = processOrgTbVg(pVtbScan, pTaskInfo, rversion);
  QUERY_CHECK_CODE(code, line, _return);

_return:
  if (infoOwned) {
    destroyColRefInfo(&info);
  }
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
2842

2843
/*
 * Tries to take over the list of newly added vgroups published in
 * pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo.
 *
 * The pointer is loaded atomically and then swapped to NULL with a
 * compare-exchange; only if that exchange succeeds does this call own the
 * array. In that case every SStreamTaskAddr in it is
 *   - registered by nodeId in pVtbScan->existOrgTbVg, and
 *   - copied into pVtbScan->newAddedVgInfo (created on demand), keyed by nodeId,
 * after which needRedeploy is cleared and the array is destroyed.
 *
 * If there is no list, or the exchange fails, nothing is consumed and
 * TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY is returned. An array that was not
 * successfully claimed is never freed here, because it is not owned by this
 * call.
 *
 * Dereferences pTaskInfo->pStreamRuntimeInfo unconditionally.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY, or an
 *         error from the hash operations (non-zero codes are logged)
 */
int32_t virtualTableScanCheckNeedRedeploy(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SArray*                    pClaimed = NULL;  // set only once the array has been claimed by this call

  SArray* pSeen = atomic_load_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo);
  if (pSeen && pSeen == atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo, pSeen, NULL)) {
    pClaimed = pSeen;
  }

  if (pClaimed == NULL) {
    code = TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY;
    QUERY_CHECK_CODE(code, line, _return);
  }

  for (int32_t i = 0; i < taosArrayGetSize(pClaimed); i++) {
    SStreamTaskAddr* pTaskAddr = (SStreamTaskAddr*)taosArrayGet(pClaimed, i);
    QUERY_CHECK_NULL(pTaskAddr, code, line, _return, terrno)
    code = taosHashPut(pVtbScan->existOrgTbVg, &pTaskAddr->nodeId, sizeof(pTaskAddr->nodeId), NULL, 0);
    QUERY_CHECK_CODE(code, line, _return);
    if (pVtbScan->newAddedVgInfo == NULL) {
      pVtbScan->newAddedVgInfo = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
      QUERY_CHECK_NULL(pVtbScan->newAddedVgInfo, code, line, _return, terrno)
    }
    code = taosHashPut(pVtbScan->newAddedVgInfo, &pTaskAddr->nodeId, sizeof(pTaskAddr->nodeId), pTaskAddr, sizeof(SStreamTaskAddr));
    QUERY_CHECK_CODE(code, line, _return);
  }
  pVtbScan->needRedeploy = false;

_return:
  // pClaimed is NULL unless this call owns the array.
  taosArrayDestroy(pClaimed);
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
2878

2879
/*
 * Builds the operator parameter passed to the downstream virtual table scan
 * for one table and stores it in pVtbScan->vtbScanParam.
 *
 * Steps:
 *   1. buildVtbScanOperatorParam() creates the parameter for `uid`.
 *   2. For every entry of otbNameToOtbInfoMap an exchange parameter is built
 *      and appended to the parameter's pOpParamArray. If the entry's vgId is
 *      present in newAddedVgInfo, a downstream source describing that task
 *      address is passed along and the vgId is removed from newAddedVgInfo.
 *   3. An exchange parameter for the tag scan (vgId, uid) is built and stored
 *      in pTagScanOp.
 *
 * @return TSDB_CODE_SUCCESS or the first error code encountered (also logged)
 */
int32_t virtualTableScanBuildDownStreamOpParam(SOperatorInfo* pOperator, tb_uid_t uid, int32_t vgId) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;

  pVtbScan->vtbScanParam = NULL;
  code = buildVtbScanOperatorParam(pInfo, &pVtbScan->vtbScanParam, uid);
  QUERY_CHECK_CODE(code, line, _return);

  for (void* pIter = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, NULL); pIter != NULL;
       pIter = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, pIter)) {
    SOrgTbInfo*      pOrgTb = (SOrgTbInfo*)pIter;
    SOperatorParam*  pOrgTbExchange = NULL;
    SStreamTaskAddr* pNewAddr = taosHashGet(pVtbScan->newAddedVgInfo, &pOrgTb->vgId, sizeof(pOrgTb->vgId));

    if (pNewAddr == NULL) {
      code = buildExchangeOperatorParamForVScan(&pOrgTbExchange, 0, pOrgTb, NULL);
      QUERY_CHECK_CODE(code, line, _return);
    } else {
      // Describe the newly added vgroup's task as an extra downstream source.
      SDownstreamSourceNode newSource = {0};
      newSource.type = QUERY_NODE_DOWNSTREAM_SOURCE;
      newSource.clientId = pTaskInfo->id.taskId;  // current task's taskid
      newSource.taskId = pNewAddr->taskId;
      newSource.fetchMsgType = TDMT_STREAM_FETCH;
      newSource.localExec = false;
      newSource.addr.nodeId = pNewAddr->nodeId;
      memcpy(&newSource.addr.epSet, &pNewAddr->epset, sizeof(SEpSet));

      code = buildExchangeOperatorParamForVScan(&pOrgTbExchange, 0, pOrgTb, &newSource);
      QUERY_CHECK_CODE(code, line, _return);
      code = taosHashRemove(pVtbScan->newAddedVgInfo, &pOrgTb->vgId, sizeof(pOrgTb->vgId));
      QUERY_CHECK_CODE(code, line, _return);
    }

    QUERY_CHECK_NULL(taosArrayPush(((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pOpParamArray, &pOrgTbExchange), code, line, _return, terrno)
  }

  SOperatorParam* pTagExchange = NULL;
  code = buildExchangeOperatorParamForVTagScan(&pTagExchange, 0, vgId, uid);
  QUERY_CHECK_CODE(code, line, _return);
  ((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pTagScanOp = pTagExchange;

_return:
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
2928

2929
/*
 * Produces the next result block of the virtual table scan.
 *
 * Tables are processed one after another, tracked by curTableIdx:
 *   - curTableIdx == lastTableIdx: the downstream scan (pDownstream[0]) is
 *     already positioned on the current table, so its getNextFn is called.
 *   - otherwise a new table starts: its column references are processed,
 *     reference slot groups and the downstream parameter are rebuilt, the
 *     downstream status is reset to OP_NOT_OPENED and getNextExtFn is called
 *     with the new parameter.
 * A non-NULL *pRes ends the call and records lastTableIdx = curTableIdx. A NULL
 * result advances to the next table; the operator is marked completed when the
 * table is not a super table or the child table list is exhausted.
 *
 * otbNameToOtbInfoMap is created for the duration of this call (cleared before
 * every new table) and cleaned up before returning.
 *
 * @return TSDB_CODE_SUCCESS or the first error code encountered (also logged)
 */
int32_t virtualTableScanGetNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SOperatorInfo*             pVtbScanOp = pOperator->pDownstream[0];

  pVtbScan->otbNameToOtbInfoMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  QUERY_CHECK_NULL(pVtbScan->otbNameToOtbInfoMap, code, line, _return, terrno)
  taosHashSetFreeFp(pVtbScan->otbNameToOtbInfoMap, destroySOrgTbInfo);

  for (;;) {
    if (pVtbScan->curTableIdx != pVtbScan->lastTableIdx) {
      // Switch the downstream scan over to the table at curTableIdx.
      taosHashClear(pVtbScan->otbNameToOtbInfoMap);

      SArray* pColRefInfo = pVtbScan->isSuperTable
                                ? (SArray*)taosArrayGetP(pVtbScan->childTableList, pVtbScan->curTableIdx)
                                : pInfo->vtbScan.colRefInfo;
      QUERY_CHECK_NULL(pColRefInfo, code, line, _return, terrno)

      tb_uid_t  uid = 0;
      int32_t   vgId = 0;
      SHashObj* refMap = NULL;
      code = virtualTableScanProcessColRefInfo(pOperator, pColRefInfo, &uid, &vgId, &refMap);
      QUERY_CHECK_CODE(code, line, _return);

      qDebug("virtual table scan process subtable idx:%d uid:%" PRIu64 " vgId:%d", pVtbScan->curTableIdx, uid, vgId);

      code = buildRefSlotGroupsFromRefMap(refMap, pVtbScan->readColList, &pVtbScan->refColGroups);
      QUERY_CHECK_CODE(code, line, _return);

      code = virtualTableScanBuildDownStreamOpParam(pOperator, uid, vgId);
      QUERY_CHECK_CODE(code, line, _return);

      // reset downstream operator's status
      pVtbScanOp->status = OP_NOT_OPENED;
      code = pVtbScanOp->fpSet.getNextExtFn(pVtbScanOp, pVtbScan->vtbScanParam, pRes);
      QUERY_CHECK_CODE(code, line, _return);
    } else {
      // Same table as the previous call: just keep reading.
      code = pVtbScanOp->fpSet.getNextFn(pVtbScanOp, pRes);
      QUERY_CHECK_CODE(code, line, _return);
    }

    if (*pRes) {
      // has result, still read data from this table.
      pVtbScan->lastTableIdx = pVtbScan->curTableIdx;
      break;
    }

    // no result, read next table.
    pVtbScan->curTableIdx++;
    if (!pVtbScan->isSuperTable || pVtbScan->curTableIdx >= taosArrayGetSize(pVtbScan->childTableList)) {
      setOperatorCompleted(pOperator);
      break;
    }
  }

_return:
  taosHashCleanup(pVtbScan->otbNameToOtbInfoMap);
  pVtbScan->otbNameToOtbInfoMap = NULL;
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
3001

3002
/*
 * Open function of the virtual table scan control operator.
 *
 * Does nothing if the operator is already opened. Otherwise builds the child
 * table map (super-table or normal/child-table variant, chosen by
 * pVtbScan->isSuperTable) and marks the operator opened. While
 * cost.openCost is still 0 the elapsed time is measured and stored there in
 * milliseconds.
 *
 * On failure the error is logged, stored in pTaskInfo->code, and control
 * leaves through T_LONG_JMP.
 *
 * @return TSDB_CODE_SUCCESS when the operator is (already) opened
 */
int32_t vtbScanOpen(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  int64_t                    st = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;

  if (OPTR_IS_OPENED(pOperator)) {
    return code;
  }

  if (pOperator->cost.openCost == 0) {
    st = taosGetTimestampUs();
  }

  code = pVtbScan->isSuperTable ? buildVirtualSuperTableScanChildTableMap(pOperator)
                                : buildVirtualNormalChildTableScanChildTableMap(pOperator);
  QUERY_CHECK_CODE(code, line, _return);

  OPTR_SET_OPENED(pOperator);

_return:
  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
  }
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }
  return code;
}
3038

3039
/*
 * getNext entry point of the virtual table scan control operator.
 *
 * - Without a pending pOperatorGetParam: returns immediately if the operator
 *   is already done; otherwise the scan window is set to
 *   [INT64_MAX, INT64_MIN].
 * - With a pending pOperatorGetParam: a finished operator is put back to
 *   OP_OPENED, the table cursor is rewound (curTableIdx = 0,
 *   lastTableIdx = -1), the window is taken from the parameter and the
 *   parameter pointer is cleared.
 *
 * Afterwards, if needRedeploy is set, virtualTableScanCheckNeedRedeploy() is
 * run, the operator is opened through fpSet._openFn, and the next block is
 * fetched with virtualTableScanGetNext(). A super table with an empty child
 * table list completes the operator right away.
 *
 * On failure the error is logged, stored in pTaskInfo->code, and control
 * leaves through T_LONG_JMP.
 */
int32_t vtbScanNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;

  QRY_PARAM_CHECK(pRes);

  if (!pOperator->pOperatorGetParam) {
    if (pOperator->status == OP_EXEC_DONE) {
      return code;
    }
    pVtbScan->window.skey = INT64_MAX;
    pVtbScan->window.ekey = INT64_MIN;
  } else {
    // A new parameter restarts the scan from the first table.
    if (pOperator->status == OP_EXEC_DONE) {
      pOperator->status = OP_OPENED;
    }
    pVtbScan->curTableIdx = 0;
    pVtbScan->lastTableIdx = -1;
    pVtbScan->window = ((SDynQueryCtrlOperatorParam *)(pOperator->pOperatorGetParam)->value)->window;
    pOperator->pOperatorGetParam = NULL;
  }

  if (pVtbScan->needRedeploy) {
    code = virtualTableScanCheckNeedRedeploy(pOperator);
    QUERY_CHECK_CODE(code, line, _return);
  }

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, line, _return);

  if (pVtbScan->isSuperTable && taosArrayGetSize(pVtbScan->childTableList) == 0) {
    // Nothing to scan.
    setOperatorCompleted(pOperator);
    return code;
  }

  code = virtualTableScanGetNext(pOperator, pRes);
  QUERY_CHECK_CODE(code, line, _return);

  return code;

_return:
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }
  return code;
}
3088

3089
/*
 * Creates the simple hash tables used by the sequential super-table join
 * context.
 *
 * batchFetch == true : leftHash and rightHash (INT-keyed)
 * batchFetch == false: leftCache, rightCache and onceTable (BIGINT-keyed)
 *
 * @return TSDB_CODE_SUCCESS, or terrno as soon as one table cannot be created.
 *         Tables created before the failing one are left in pPrev.
 */
int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) {
  if (!batchFetch) {
    pPrev->leftCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pPrev->leftCache == NULL) {
      return terrno;
    }

    pPrev->rightCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pPrev->rightCache == NULL) {
      return terrno;
    }

    pPrev->onceTable = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pPrev->onceTable == NULL) {
      return terrno;
    }

    return TSDB_CODE_SUCCESS;
  }

  pPrev->leftHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  if (pPrev->leftHash == NULL) {
    return terrno;
  }

  pPrev->rightHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  if (pPrev->rightHash == NULL) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
3116

UNCOV
3117
/*
 * Updates pVtbScan->dynTbUid from the stream partition column values.
 *
 * Scans funcInfo.pStreamPartColVals for the first value that is flagged
 * isTbname and whose uid differs from the current dynTbUid; that uid becomes
 * the new dynTbUid and the scan stops. Does nothing without stream runtime
 * info.
 */
static void updateDynTbUidIfNeeded(SVtbScanDynCtrlInfo* pVtbScan, SStreamRuntimeInfo* pStreamRuntimeInfo) {
  if (NULL == pStreamRuntimeInfo) {
    return;
  }

  SArray* pPartVals = pStreamRuntimeInfo->funcInfo.pStreamPartColVals;
  for (int32_t idx = 0; idx < taosArrayGetSize(pPartVals); ++idx) {
    SStreamGroupValue* pGroupVal = taosArrayGet(pPartVals, idx);
    if (pGroupVal == NULL || !pGroupVal->isTbname || pGroupVal->uid == pVtbScan->dynTbUid) {
      continue;
    }

    qTrace("dynQueryCtrl dyn tb uid:%" PRIu64 " reset to:%" PRIu64, pVtbScan->dynTbUid, pGroupVal->uid);

    pVtbScan->dynTbUid = pGroupVal->uid;
    break;
  }
}
3133

3134
static int32_t initVtbScanInfo(SDynQueryCtrlOperatorInfo* pInfo, SMsgCb* pMsgCb,
2,265,374✔
3135
                               SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) {
3136
  int32_t      code = TSDB_CODE_SUCCESS;
2,265,374✔
3137
  int32_t      line = 0;
2,265,374✔
3138

3139
  code = tsem_init(&pInfo->vtbScan.ready, 0, 0);
2,265,374✔
3140
  QUERY_CHECK_CODE(code, line, _return);
2,265,374✔
3141

3142
  pInfo->vtbScan.genNewParam = true;
2,265,374✔
3143
  pInfo->vtbScan.batchProcessChild = pPhyciNode->vtbScan.batchProcessChild;
2,265,374✔
3144
  pInfo->vtbScan.hasPartition = pPhyciNode->vtbScan.hasPartition;
2,265,374✔
3145
  pInfo->vtbScan.scanAllCols = pPhyciNode->vtbScan.scanAllCols;
2,265,374✔
3146
  pInfo->vtbScan.isSuperTable = pPhyciNode->vtbScan.isSuperTable;
2,265,374✔
3147
  pInfo->vtbScan.rversion = pPhyciNode->vtbScan.rversion;
2,265,374✔
3148
  pInfo->vtbScan.uid = pPhyciNode->vtbScan.uid;
2,265,374✔
3149
  pInfo->vtbScan.suid = pPhyciNode->vtbScan.suid;
2,265,374✔
3150
  pInfo->vtbScan.epSet = pPhyciNode->vtbScan.mgmtEpSet;
2,265,374✔
3151
  pInfo->vtbScan.acctId = pPhyciNode->vtbScan.accountId;
2,265,374✔
3152
  pInfo->vtbScan.needRedeploy = false;
2,265,374✔
3153
  pInfo->vtbScan.pMsgCb = pMsgCb;
2,265,374✔
3154
  pInfo->vtbScan.curTableIdx = 0;
2,265,374✔
3155
  pInfo->vtbScan.lastTableIdx = -1;
2,265,374✔
3156
  pInfo->vtbScan.dynTbUid = 0;
2,265,181✔
3157
  pInfo->vtbScan.dbName = taosStrdup(pPhyciNode->vtbScan.dbName);
2,265,374✔
3158
  pInfo->vtbScan.tbName = taosStrdup(pPhyciNode->vtbScan.tbName);
2,265,374✔
3159
  QUERY_CHECK_NULL(pInfo->vtbScan.dbName, code, line, _return, terrno)
2,265,374✔
3160
  QUERY_CHECK_NULL(pInfo->vtbScan.tbName, code, line, _return, terrno)
2,265,374✔
3161
  pInfo->vtbScan.existOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
2,265,374✔
3162
  QUERY_CHECK_NULL(pInfo->vtbScan.existOrgTbVg, code, line, _return, terrno)
2,265,374✔
3163
  SNode* node = NULL;
2,265,374✔
3164
  FOREACH(node, pPhyciNode->vtbScan.pOrgVgIds) {
7,360,389✔
3165
    SValueNode* valueNode = (SValueNode*)node;
5,095,015✔
3166
    int32_t vgId = (int32_t)valueNode->datum.i;
5,095,015✔
3167
    code = taosHashPut(pInfo->vtbScan.existOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
5,095,015✔
3168
    QUERY_CHECK_CODE(code, line, _return);
5,095,015✔
3169
  }
3170

3171
  if (pPhyciNode->dynTbname && pTaskInfo) {
2,265,374✔
UNCOV
3172
    updateDynTbUidIfNeeded(&pInfo->vtbScan, pTaskInfo->pStreamRuntimeInfo);
×
3173
  }
3174

3175
  pInfo->vtbScan.readColList = taosArrayInit(LIST_LENGTH(pPhyciNode->vtbScan.pScanCols), sizeof(col_id_t));
2,265,374✔
3176
  QUERY_CHECK_NULL(pInfo->vtbScan.readColList, code, line, _return, terrno)
2,265,374✔
3177

3178
  SNode* colNode = NULL;
2,265,374✔
3179
  FOREACH(colNode, pPhyciNode->vtbScan.pScanCols) {
20,012,768✔
3180
    SColumnNode* pNode = (SColumnNode*)colNode;
17,747,394✔
3181
    QUERY_CHECK_NULL(pNode, code, line, _return, terrno)
17,747,394✔
3182
    QUERY_CHECK_NULL(taosArrayPush(pInfo->vtbScan.readColList, &pNode->colId), code, line, _return, terrno)
35,494,788✔
3183
  }
3184

3185
  pInfo->vtbScan.readColSet =
2,265,374✔
3186
      taosHashInit(taosArrayGetSize(pInfo->vtbScan.readColList) > 0 ? taosArrayGetSize(pInfo->vtbScan.readColList) : 1,
2,265,374✔
3187
                   taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), true, HASH_NO_LOCK);
3188
  QUERY_CHECK_NULL(pInfo->vtbScan.readColSet, code, line, _return, terrno)
2,265,374✔
3189
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->vtbScan.readColList); i++) {
20,012,768✔
3190
    col_id_t colId = *(col_id_t*)taosArrayGet(pInfo->vtbScan.readColList, i);
17,747,201✔
3191
    code = taosHashPut(pInfo->vtbScan.readColSet, &colId, sizeof(colId), NULL, 0);
17,747,394✔
3192
    QUERY_CHECK_CODE(code, line, _return);
17,747,394✔
3193
  }
3194

3195
  pInfo->vtbScan.refColGroups = NULL;
2,265,374✔
3196

3197
  pInfo->vtbScan.childTableList = taosArrayInit(10, POINTER_BYTES);
2,265,374✔
3198
  QUERY_CHECK_NULL(pInfo->vtbScan.childTableList, code, line, _return, terrno)
2,265,374✔
3199

3200
  pInfo->vtbScan.dbVgInfoMap = taosHashInit(taosArrayGetSize(pInfo->vtbScan.childTableList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
2,265,374✔
3201
  QUERY_CHECK_NULL(pInfo->vtbScan.dbVgInfoMap, code, line, _return, terrno)
2,265,374✔
3202

3203
  pInfo->vtbScan.otbNameToOtbInfoMap = NULL;
2,265,374✔
3204
  pInfo->vtbScan.otbVgIdToOtbInfoArrayMap = NULL;
2,265,374✔
3205
  pInfo->vtbScan.vtbUidToVgIdMapMap = NULL;
2,265,374✔
3206
  pInfo->vtbScan.vtbGroupIdToVgIdMapMap = NULL;
2,265,374✔
3207
  pInfo->vtbScan.vtbUidTagListMap = NULL;
2,265,374✔
3208
  pInfo->vtbScan.vtbGroupIdTagListMap = NULL;
2,265,374✔
3209
  pInfo->vtbScan.vtbUidToGroupIdMap = NULL;
2,265,374✔
3210

3211
  return code;
2,265,374✔
UNCOV
3212
_return:
×
3213
  // no need to destroy array and hashmap allocated in this function,
3214
  // since the operator's destroy function will take care of it
UNCOV
3215
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
UNCOV
3216
  return code;
×
3217
}
3218

3219
static int32_t initVtbWindowInfo(SDynQueryCtrlOperatorInfo* pInfo, SDynQueryCtrlPhysiNode* pPhyciNode,
375,191✔
3220
                                 SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) {
3221
  int32_t              code = TSDB_CODE_SUCCESS;
375,191✔
3222
  int32_t              line = 0;
375,191✔
3223
  SDataBlockDescNode*  pDescNode = pPhyciNode->node.pOutputDataBlockDesc;
375,191✔
3224

3225
  pInfo->vtbWindow.wstartSlotId = pPhyciNode->vtbWindow.wstartSlotId;
375,191✔
3226
  pInfo->vtbWindow.wendSlotId = pPhyciNode->vtbWindow.wendSlotId;
375,191✔
3227
  pInfo->vtbWindow.wdurationSlotId = pPhyciNode->vtbWindow.wdurationSlotId;
375,191✔
3228
  pInfo->vtbWindow.pTargets = pPhyciNode->vtbWindow.pTargets;
375,191✔
3229
  pInfo->vtbWindow.isVstb = pPhyciNode->vtbWindow.isVstb;
375,191✔
3230
  pInfo->vtbWindow.extendOption = pPhyciNode->vtbWindow.extendOption;
375,191✔
3231

3232
  pInfo->vtbWindow.pRes = createDataBlockFromDescNode(pDescNode);
375,191✔
3233
  QUERY_CHECK_NULL(pInfo->vtbWindow.pRes, code, line, _return, terrno)
375,191✔
3234

3235
  pInfo->vtbWindow.pWins = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
375,191✔
3236
  QUERY_CHECK_NULL(pInfo->vtbWindow.pWins, code, line, _return, terrno)
375,191✔
3237

3238
  pInfo->vtbWindow.outputWstartSlotId = -1;
375,191✔
3239
  pInfo->vtbWindow.outputWendSlotId = -1;
375,191✔
3240
  pInfo->vtbWindow.outputWdurationSlotId = -1;
375,191✔
3241
  pInfo->vtbWindow.curWinBatchIdx = 0;
375,191✔
3242

3243
  initResultSizeInfo(&pOperator->resultInfo, 1);
375,191✔
3244
  code = blockDataEnsureCapacity(pInfo->vtbWindow.pRes, pOperator->resultInfo.capacity);
375,191✔
3245
  QUERY_CHECK_CODE(code, line, _return);
375,191✔
3246

3247
  return code;
375,191✔
3248
_return:
×
3249
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
3250
  return code;
×
3251
}
3252

3253
/*
 * Expose the raw timestamp array of column `slotId` of pBlock.
 *
 * When the block has a column array and its data is flagged as loaded,
 * *ppTsCols is pointed at the column's pData. If the first timestamp is
 * non-zero while the block's time window is still {0, 0}, the window is
 * recomputed with blockDataUpdateTsWindow(). Otherwise *ppTsCols is left
 * untouched.
 *
 * @return TSDB_CODE_SUCCESS, or the error of the failed column lookup /
 *         window update
 */
static int32_t extractTsCol(SSDataBlock* pBlock, int32_t slotId, TSKEY** ppTsCols) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (NULL == pBlock->pDataBlock || !pBlock->info.dataLoad) {
    return code;
  }

  SColumnInfoData* pTsColData = taosArrayGet(pBlock->pDataBlock, slotId);
  QUERY_CHECK_NULL(pTsColData, code, lino, _return, terrno)

  TSKEY* pTs = (int64_t*)pTsColData->pData;
  *ppTsCols = pTs;

  if (pTs[0] != 0 && pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0) {
    code = blockDataUpdateTsWindow(pBlock, slotId);
    QUERY_CHECK_CODE(code, lino, _return);
  }

  return code;

_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  return code;
}
3274

3275
/*
 * Open the virtual-table window operator.
 *
 * Drains downstream 0 and turns every returned block into one batch of
 * SExtWinTimeWindow entries (skey = wstart, ekey = wend + 1, winOutIdx = -1),
 * appended to vtbWindow.pWins. While doing so it resolves, once, which target
 * indexes carry the wstart / wend / wduration output columns.
 *
 * Afterwards the outermost window edges are widened according to
 * extendOption (BACKWARD: last ekey = INT64_MAX, FORWARD: first skey =
 * INT64_MIN) and, for virtual super tables, the child-table map is built.
 *
 * Error handling: on failure the error is logged, stored in the task and
 * raised with T_LONG_JMP. A batch that was allocated but not yet handed over
 * to pWins is released first so it does not leak.
 *
 * A downstream that yields no block leaves pWins empty; this is treated as a
 * valid "no windows" result rather than an error, so the edge adjustment is
 * skipped in that case.
 *
 * NOTE(review): extractTsCol() leaves the column pointer NULL when the block
 * data is not loaded; this function assumes downstream blocks are loaded.
 *
 * @return TSDB_CODE_SUCCESS (failures do not return normally, see above)
 */
int32_t vtbWindowOpen(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    lino = 0;
  SDynQueryCtrlOperatorInfo* pDynInfo = pOperator->info;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SVtbWindowDynCtrlInfo*     pInfo = &pDynInfo->vtbWindow;
  int64_t                    st = 0;
  SArray*                    pWin = NULL;  // batch being built; owned here until pushed into pWins

  if (OPTR_IS_OPENED(pOperator)) {
    return code;
  }

  if (pOperator->cost.openCost == 0) {
    st = taosGetTimestampUs();
  }

  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      break;
    }

    // resolve the output slots as long as none of them has been found yet
    if (pInfo->outputWendSlotId == -1 && pInfo->outputWstartSlotId == -1 && pInfo->outputWdurationSlotId == -1) {
      for (int32_t i = 0; i < LIST_LENGTH(pInfo->pTargets); ++i) {
        STargetNode* pNode = (STargetNode*)nodesListGetNode(pInfo->pTargets, i);
        if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode->pExpr)->dataBlockId == pBlock->info.id.blockId) {
          if (((SColumnNode*)pNode->pExpr)->slotId == pInfo->wstartSlotId) {
            pInfo->outputWstartSlotId = i;
          } else if (((SColumnNode*)pNode->pExpr)->slotId == pInfo->wendSlotId) {
            pInfo->outputWendSlotId = i;
          } else if (((SColumnNode*)pNode->pExpr)->slotId == pInfo->wdurationSlotId) {
            pInfo->outputWdurationSlotId = i;
          }
        }
      }
    }

    TSKEY* wstartCol = NULL;
    TSKEY* wendCol = NULL;

    code = extractTsCol(pBlock, pInfo->wstartSlotId, &wstartCol);
    QUERY_CHECK_CODE(code, lino, _return);
    code = extractTsCol(pBlock, pInfo->wendSlotId, &wendCol);
    QUERY_CHECK_CODE(code, lino, _return);

    pWin = taosArrayInit(pBlock->info.rows, sizeof(SExtWinTimeWindow));
    QUERY_CHECK_NULL(pWin, code, lino, _return, terrno)

    QUERY_CHECK_NULL(taosArrayReserve(pWin, pBlock->info.rows), code, lino, _return, terrno);

    for (int32_t i = 0; i < pBlock->info.rows; i++) {
      SExtWinTimeWindow* pWindow = taosArrayGet(pWin, i);
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
      pWindow->tw.skey = wstartCol[i];
      pWindow->tw.ekey = wendCol[i] + 1;
      pWindow->winOutIdx = -1;
    }

    QUERY_CHECK_NULL(taosArrayPush(pInfo->pWins, &pWin), code, lino, _return, terrno);
    pWin = NULL;  // ownership moved to pWins
  }

  // handle first window's start key and last window's end key
  if (taosArrayGetSize(pInfo->pWins) > 0) {
    SArray* firstBatch = (SArray*)taosArrayGetP(pInfo->pWins, 0);
    SArray* lastBatch = (SArray*)taosArrayGetP(pInfo->pWins, taosArrayGetSize(pInfo->pWins) - 1);

    QUERY_CHECK_NULL(firstBatch, code, lino, _return, terrno)
    QUERY_CHECK_NULL(lastBatch, code, lino, _return, terrno)

    SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(firstBatch, 0);
    SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(lastBatch);

    QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
    QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)

    if (pInfo->extendOption == STATE_WIN_EXTEND_OPTION_BACKWARD) {
      lastWin->tw.ekey = INT64_MAX;
    }
    if (pInfo->extendOption == STATE_WIN_EXTEND_OPTION_FORWARD) {
      firstWin->tw.skey = INT64_MIN;
    }
  }

  if (pInfo->isVstb) {
    code = buildVirtualSuperTableScanChildTableMap(pOperator);
    QUERY_CHECK_CODE(code, lino, _return);
  }

  OPTR_SET_OPENED(pOperator);

  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
  }

_return:
  // only non-NULL when a failure interrupted the construction of a batch
  if (pWin != NULL) {
    taosArrayDestroy(pWin);
    pWin = NULL;
  }
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
3375

3376
/*
 * Build the operator parameter handed to the external-window downstream.
 *
 * The resulting SOperatorParam carries
 *   - value:     an SExternalWindowOperatorParam holding a copy of pWins,
 *   - pChildren: one exchange parameter covering the time range from the first
 *                window's skey to the last window's ekey,
 *   - opType QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW, downstreamIdx = idx,
 *     reUse = false.
 *
 * @param pInfo  operator info; vtbScan.otbVgIdToOtbInfoArrayMap feeds the
 *               exchange parameter
 * @param ppRes  receives the new parameter; set to NULL on failure
 * @param pWins  array of SExtWinTimeWindow for the current batch; must contain
 *               at least one window
 * @param idx    downstream index stored in the parameter
 * @return TSDB_CODE_SUCCESS or the code of the first failing step; on failure
 *         everything allocated here and still reachable is released
 */
static int32_t buildExternalWindowOperatorParamEx(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins, int32_t idx) {
  int32_t                       code = TSDB_CODE_SUCCESS;
  int32_t                       lino = 0;
  SExternalWindowOperatorParam* pExtWinOp = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
  // The memory is not zeroed and the cleanup path below inspects pChildren,
  // so it must hold a defined value before the first possible failure.
  (*ppRes)->pChildren = NULL;
  (*ppRes)->value = NULL;

  pExtWinOp = taosMemoryMalloc(sizeof(SExternalWindowOperatorParam));
  QUERY_CHECK_NULL(pExtWinOp, code, lino, _return, terrno)

  pExtWinOp->ExtWins = taosArrayDup(pWins, NULL);
  QUERY_CHECK_NULL(pExtWinOp->ExtWins, code, lino, _return, terrno)

  // an empty batch has no first/last window; fail instead of dereferencing NULL
  SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(pWins, 0);
  QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
  SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(pWins);
  QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)

  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)

  STimeWindow     scanRange = {.skey = firstWin->tw.skey, .ekey = lastWin->tw.ekey};
  SOperatorParam* pExchangeParam = NULL;
  code = buildBatchExchangeOperatorParamForVirtual(&pExchangeParam, 0, NULL, 0, pInfo->vtbScan.otbVgIdToOtbInfoArrayMap, scanRange, EX_SRC_TYPE_VSTB_WIN_SCAN);
  QUERY_CHECK_CODE(code, lino, _return);

  // NOTE(review): if this push fails pExchangeParam is not reachable from the
  // cleanup below; confirm the proper release routine for it and call it here.
  QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pExchangeParam), code, lino, _return, terrno)

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW;
  (*ppRes)->downstreamIdx = idx;
  (*ppRes)->value = pExtWinOp;
  (*ppRes)->reUse = false;

  return code;

_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pExtWinOp) {
    if (pExtWinOp->ExtWins) {
      taosArrayDestroy(pExtWinOp->ExtWins);
    }
    taosMemoryFree(pExtWinOp);
  }
  if (*ppRes) {
    if ((*ppRes)->pChildren) {
      for (int32_t i = 0; i < taosArrayGetSize((*ppRes)->pChildren); i++) {
        // pChildren stores pointers, so the element itself must be loaded
        // (taosArrayGetP) rather than the address of its slot (taosArrayGet)
        SOperatorParam* pChildParam = (SOperatorParam*)taosArrayGetP((*ppRes)->pChildren, i);
        if (pChildParam) {
          if (pChildParam->value) {
            taosMemoryFree(pChildParam->value);
          }
          taosMemoryFree(pChildParam);
        }
      }
      taosArrayDestroy((*ppRes)->pChildren);
    }
    taosMemoryFree(*ppRes);
    *ppRes = NULL;
  }
  return code;
}
3435

3436
/*
 * Produce the result block for the next batch of windows.
 *
 * Each call handles the batch at vtbWindow.curWinBatchIdx:
 *   1. make sure the operator is opened;
 *   2. fetch the computed block for the batch, either from the external-window
 *      downstream (pDownstream[2], virtual super table) or from the merge
 *      downstream (pDownstream[1]);
 *   3. reset the result block, size it for the batch and copy the target
 *      columns from the fetched block;
 *   4. for the first batch take the first window's skey from the fetched
 *      block's window, and for the last batch take the last window's ekey from
 *      it (stored as ekey + 1);
 *   5. fill the wstart / wend / wduration output columns, where resolved, from
 *      the batch's windows (wend and the duration use ekey - 1).
 *
 * *ppRes is set to the result block, or to NULL once all batches have been
 * returned. Failures are logged, stored in the task and raised with
 * T_LONG_JMP.
 *
 * @return TSDB_CODE_SUCCESS (failures do not return normally, see above)
 */
int32_t vtbWindowNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    lino = 0;
  SDynQueryCtrlOperatorInfo* pDynInfo = pOperator->info;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SVtbWindowDynCtrlInfo*     pInfo = &pDynInfo->vtbWindow;
  SSDataBlock*               pRes = pInfo->pRes;
  SSDataBlock*               pSrcBlock = NULL;  // block computed by the downstream for this batch
  int32_t                    numOfWins = 0;

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _return);

  if (pInfo->curWinBatchIdx >= taosArrayGetSize(pInfo->pWins)) {
    *ppRes = NULL;
    return code;
  }

  SArray* pWinArray = (SArray*)taosArrayGetP(pInfo->pWins, pInfo->curWinBatchIdx);
  QUERY_CHECK_NULL(pWinArray, code, lino, _return, terrno)

  numOfWins = (int32_t)taosArrayGetSize(pWinArray);

  if (pInfo->isVstb) {
    SOperatorInfo*  extWinOp = pOperator->pDownstream[2];
    SOperatorParam* pExtWinParam = NULL;

    code = buildExternalWindowOperatorParamEx(pDynInfo, &pExtWinParam, pWinArray, extWinOp->numOfDownstream);
    QUERY_CHECK_CODE(code, lino, _return);

    code = extWinOp->fpSet.getNextExtFn(extWinOp, pExtWinParam, &pSrcBlock);
    QUERY_CHECK_CODE(code, lino, _return);
    setOperatorCompleted(extWinOp);
  } else {
    SOperatorInfo*  mergeOp = pOperator->pDownstream[1];
    SOperatorParam* pMergeParam = NULL;

    code = buildMergeOperatorParam(pDynInfo, &pMergeParam, pWinArray, mergeOp->numOfDownstream, numOfWins);
    QUERY_CHECK_CODE(code, lino, _return);

    code = mergeOp->fpSet.getNextExtFn(mergeOp, pMergeParam, &pSrcBlock);
    QUERY_CHECK_CODE(code, lino, _return);
  }

  blockDataCleanup(pRes);
  code = blockDataEnsureCapacity(pRes, numOfWins);
  QUERY_CHECK_CODE(code, lino, _return);

  if (pSrcBlock) {
    code = copyColumnsValue(pInfo->pTargets, pSrcBlock->info.id.blockId, pRes, pSrcBlock, numOfWins);
    QUERY_CHECK_CODE(code, lino, _return);

    if (pInfo->curWinBatchIdx == 0) {
      // first batch (pWinArray is batch 0): take _wstart from the fetched block
      SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(pWinArray, 0);
      QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)

      firstWin->tw.skey = pSrcBlock->info.window.skey;
    }
    if (pInfo->curWinBatchIdx == taosArrayGetSize(pInfo->pWins) - 1) {
      // last batch (pWinArray is the final batch): take _wend from the fetched block
      SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(pWinArray);
      QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)

      lastWin->tw.ekey = pSrcBlock->info.window.ekey + 1;
    }
  }

  if (pInfo->outputWstartSlotId != -1) {
    SColumnInfoData* pWstartCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWstartSlotId);
    QUERY_CHECK_NULL(pWstartCol, code, lino, _return, terrno)

    for (int32_t i = 0; i < numOfWins; i++) {
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
      code = colDataSetVal(pWstartCol, i, (const char*)&pWindow->tw.skey, false);
      QUERY_CHECK_CODE(code, lino, _return);
    }
  }
  if (pInfo->outputWendSlotId != -1) {
    SColumnInfoData* pWendCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWendSlotId);
    QUERY_CHECK_NULL(pWendCol, code, lino, _return, terrno)

    for (int32_t i = 0; i < numOfWins; i++) {
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
      // windows are stored with ekey = wend + 1, undo that for the output
      TSKEY ekey = pWindow->tw.ekey - 1;
      code = colDataSetVal(pWendCol, i, (const char*)&ekey, false);
      QUERY_CHECK_CODE(code, lino, _return);
    }
  }
  if (pInfo->outputWdurationSlotId != -1) {
    SColumnInfoData* pWdurationCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWdurationSlotId);
    QUERY_CHECK_NULL(pWdurationCol, code, lino, _return, terrno)

    for (int32_t i = 0; i < numOfWins; i++) {
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
      int64_t duration = pWindow->tw.ekey - 1 - pWindow->tw.skey;
      code = colDataSetVal(pWdurationCol, i, (const char*)&duration, false);
      QUERY_CHECK_CODE(code, lino, _return);
    }
  }

  pRes->info.rows = numOfWins;
  *ppRes = pRes;
  pInfo->curWinBatchIdx++;

  return code;

_return:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
3581

3582
/*
 * Bring a dynamic-query-control operator back to its "not opened" state so it
 * can be executed again.
 *
 * What is reset depends on the query type:
 *   - DYN_QTYPE_STB_HASH:   execution statistics are zeroed, the join control
 *     info is destroyed and its hash tables are created anew, and the prev /
 *     post join contexts are cleared.
 *   - DYN_QTYPE_VTB_SCAN:   per-run lookup structures are released, the child
 *     table list is emptied (but kept), dynTbUid is refreshed when the plan
 *     uses a dynamic table name, and the table cursors are rewound.
 *   - DYN_QTYPE_VTB_WINDOW: the result block and the window batches are
 *     emptied in place and the output slot ids / batch cursor are rewound.
 *     Both objects are created once during operator initialisation and are
 *     used unconditionally by the open/next functions, so they must survive a
 *     reset; destroying them here would leave NULL pointers behind.
 *
 * @return 0 on success, or the error of initSeqStbJoinTableHash()
 */
static int32_t resetDynQueryCtrlOperState(SOperatorInfo* pOper) {
  SDynQueryCtrlOperatorInfo*    pDyn = pOper->info;
  SDynQueryCtrlPhysiNode const* pPhyciNode = pOper->pPhyNode;
  SExecTaskInfo*                pTaskInfo = pOper->pTaskInfo;

  pOper->status = OP_NOT_OPENED;

  switch (pDyn->qType) {
    case DYN_QTYPE_STB_HASH:{
      pDyn->stbJoin.execInfo = (SDynQueryCtrlExecInfo){0};
      SStbJoinDynCtrlInfo* pStbJoin = &pDyn->stbJoin;
      destroyStbJoinDynCtrlInfo(&pDyn->stbJoin);

      // the tables released above are needed again by the next run
      int32_t code = initSeqStbJoinTableHash(&pDyn->stbJoin.ctx.prev, pDyn->stbJoin.basic.batchFetch);
      if (TSDB_CODE_SUCCESS != code) {
        qError("initSeqStbJoinTableHash failed since %s", tstrerror(code));
        return code;
      }
      pStbJoin->ctx.prev.pListHead = NULL;
      pStbJoin->ctx.prev.joinBuild = false;
      pStbJoin->ctx.prev.pListTail = NULL;
      pStbJoin->ctx.prev.tableNum = 0;

      pStbJoin->ctx.post = (SStbJoinPostJoinCtx){0};
      break;
    }
    case DYN_QTYPE_VTB_SCAN: {
      SVtbScanDynCtrlInfo* pVtbScan = &pDyn->vtbScan;

      if (pVtbScan->otbNameToOtbInfoMap) {
        taosHashSetFreeFp(pVtbScan->otbNameToOtbInfoMap, destroySOrgTbInfo);
        taosHashCleanup(pVtbScan->otbNameToOtbInfoMap);
        pVtbScan->otbNameToOtbInfoMap = NULL;
      }
      if (pVtbScan->pRsp) {
        tFreeSUsedbRsp(pVtbScan->pRsp);
        taosMemoryFreeClear(pVtbScan->pRsp);
      }
      if (pVtbScan->colRefInfo) {
        taosArrayDestroyEx(pVtbScan->colRefInfo, destroyColRefInfo);
        pVtbScan->colRefInfo = NULL;
      }
      if (pVtbScan->childTableMap) {
        taosHashCleanup(pVtbScan->childTableMap);
        pVtbScan->childTableMap = NULL;
      }
      if (pVtbScan->childTableList) {
        // emptied, not destroyed: the list object itself is reused
        taosArrayClearEx(pVtbScan->childTableList, destroyColRefArray);
      }
      if (pPhyciNode->dynTbname && pTaskInfo) {
        updateDynTbUidIfNeeded(pVtbScan, pTaskInfo->pStreamRuntimeInfo);
      }
      pVtbScan->curTableIdx = 0;
      pVtbScan->lastTableIdx = -1;
      break;
    }
    case DYN_QTYPE_VTB_WINDOW: {
      SVtbWindowDynCtrlInfo* pVtbWindow = &pDyn->vtbWindow;
      if (pVtbWindow->pRes) {
        // keep the block, drop its rows
        blockDataCleanup(pVtbWindow->pRes);
      }
      if (pVtbWindow->pWins) {
        // release every batch but keep the container for the next open
        taosArrayClearEx(pVtbWindow->pWins, destroyWinArray);
      }
      pVtbWindow->outputWdurationSlotId = -1;
      pVtbWindow->outputWendSlotId = -1;
      pVtbWindow->outputWstartSlotId = -1;
      pVtbWindow->curWinBatchIdx = 0;
      break;
    }
    default:
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
      break;
  }
  return 0;
}
3660

3661
/*
 * Open the virtual-table aggregate operator.
 *
 * On the first call this builds the child-table map of the virtual super
 * table and marks the operator as opened; later calls return immediately.
 * The time spent is recorded in cost.openCost when that field is still 0.
 * A failure is logged, stored in the task and raised with T_LONG_JMP.
 *
 * @return TSDB_CODE_SUCCESS (failures do not return normally, see above)
 */
int32_t vtbAggOpen(SOperatorInfo* pOperator) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t line = 0;
  int64_t startUs = 0;

  if (OPTR_IS_OPENED(pOperator)) {
    return code;
  }

  if (0 == pOperator->cost.openCost) {
    startUs = taosGetTimestampUs();
  }

  code = buildVirtualSuperTableScanChildTableMap(pOperator);
  QUERY_CHECK_CODE(code, line, _return);
  OPTR_SET_OPENED(pOperator);

_return:
  // reached on success as well as on failure
  if (0 == pOperator->cost.openCost) {
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - startUs) / 1000.0;
  }
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }
  return code;
}
3690

3691
int32_t virtualTableAggGetNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
3,064,899✔
3692
  int32_t                    code = TSDB_CODE_SUCCESS;
3,064,899✔
3693
  int32_t                    line = 0;
3,064,899✔
3694
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
3,064,899✔
3695
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
3,064,899✔
3696
  SOperatorInfo*             pAggOp = pOperator->pDownstream[pOperator->numOfDownstream - 1];
3,064,899✔
3697
  SOperatorInfo*             pTagScanOp = pOperator->pDownstream[1];
3,064,899✔
3698
  SOperatorParam*            pAggParam = NULL;
3,064,899✔
3699

3700
  if (pInfo->vtbScan.hasPartition) {
3,064,899✔
3701
    if (pInfo->vtbScan.batchProcessChild) {
2,673,726✔
3702
      void* pIter = taosHashIterate(pVtbScan->vtbGroupIdTagListMap, NULL);
1,117,434✔
3703
      while (pIter) {
2,639,295✔
3704
        size_t     keyLen = 0;
2,295,645✔
3705
        uint64_t   groupid = *(uint64_t*)taosHashGetKey(pIter, &keyLen);
2,295,645✔
3706

3707
        code = buildAggOperatorParamWithGroupId(pInfo, groupid, &pAggParam);
2,295,645✔
3708
        QUERY_CHECK_CODE(code, line, _return);
2,295,645✔
3709

3710
        if (pAggParam) {
2,295,645✔
3711
          code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
1,898,430✔
3712
          QUERY_CHECK_CODE(code, line, _return);
1,898,430✔
3713
        } else {
3714
          *pRes = NULL;
397,215✔
3715
        }
3716

3717
        pIter = taosHashIterate(pVtbScan->vtbGroupIdTagListMap, pIter);
2,295,645✔
3718

3719
        if (*pRes) {
2,295,645✔
3720
          (*pRes)->info.id.groupId = groupid;
773,784✔
3721
          code = taosHashRemove(pVtbScan->vtbGroupIdTagListMap, &groupid, keyLen);
773,784✔
3722
          QUERY_CHECK_CODE(code, line, _return);
773,784✔
3723
          break;
773,784✔
3724
        }
3725
      }
3726
    } else {
3727
      void *pIter = taosHashIterate(pVtbScan->vtbGroupIdTagListMap, NULL);
1,556,292✔
3728
      while (pIter) {
2,362,188✔
3729
        size_t     keyLen = 0;
2,165,628✔
3730
        uint64_t*  groupid = (uint64_t*)taosHashGetKey(pIter, &keyLen);
2,165,628✔
3731
        SHashObj*  vtbUidTagListMap = *(SHashObj**)pIter;
2,165,628✔
3732

3733
        void* pIter2 = taosHashIterate(vtbUidTagListMap, NULL);
2,165,628✔
3734
        while (pIter2) {
3,934,668✔
3735
          size_t   keyLen2 = 0;
3,128,772✔
3736
          tb_uid_t uid = *(tb_uid_t*)taosHashGetKey(pIter2, &keyLen2);
3,128,772✔
3737
          SArray*  pTagList = *(SArray**)pIter2;
3,128,772✔
3738

3739
          if (pVtbScan->genNewParam) {
3,128,772✔
3740
            code = buildAggOperatorParamForSingleChild(pInfo, uid, *groupid, pTagList, &pAggParam);
1,769,040✔
3741
            QUERY_CHECK_CODE(code, line, _return);
1,769,040✔
3742
            if (pAggParam) {
1,769,040✔
3743
              code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
1,349,712✔
3744
              QUERY_CHECK_CODE(code, line, _return);
1,349,712✔
3745
            } else {
3746
              *pRes = NULL;
419,328✔
3747
            }
3748
          } else {
3749
            code = pAggOp->fpSet.getNextFn(pAggOp, pRes);
1,359,732✔
3750
            QUERY_CHECK_CODE(code, line, _return);
1,359,732✔
3751
          }
3752

3753
          if (*pRes) {
3,128,772✔
3754
            pVtbScan->genNewParam = false;
1,359,732✔
3755
            (*pRes)->info.id.groupId = *groupid;
1,359,732✔
3756
            break;
1,359,732✔
3757
          }
3758
          pVtbScan->genNewParam = true;
1,769,040✔
3759
          pIter2 = taosHashIterate(vtbUidTagListMap, pIter2);
1,769,040✔
3760
          code = taosHashRemove(vtbUidTagListMap, &uid, keyLen);
1,769,040✔
3761
          QUERY_CHECK_CODE(code, line, _return);
1,769,040✔
3762
        }
3763
        if (*pRes) {
2,165,628✔
3764
          break;
1,359,732✔
3765
        }
3766
        pIter = taosHashIterate(pVtbScan->vtbGroupIdTagListMap, pIter);
805,896✔
3767
        code = taosHashRemove(pVtbScan->vtbGroupIdTagListMap, groupid, keyLen);
805,896✔
3768
        QUERY_CHECK_CODE(code, line, _return);
805,896✔
3769
      }
3770
    }
3771

3772
  } else {
3773
    if (pInfo->vtbScan.batchProcessChild) {
391,173✔
3774
      code = buildAggOperatorParam(pInfo, &pAggParam);
78,736✔
3775
      QUERY_CHECK_CODE(code, line, _return);
78,736✔
3776

3777
      code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
78,736✔
3778
      QUERY_CHECK_CODE(code, line, _return);
78,736✔
3779
      setOperatorCompleted(pOperator);
78,736✔
3780
    } else {
3781
      void* pIter = taosHashIterate(pVtbScan->vtbUidTagListMap, NULL);
312,437✔
3782
      while (pIter) {
684,277✔
3783
        size_t   keyLen = 0;
640,457✔
3784
        tb_uid_t uid = *(tb_uid_t*)taosHashGetKey(pIter, &keyLen);
640,457✔
3785
        SArray*  pTagList = *(SArray**)pIter;
640,457✔
3786

3787
        if (pVtbScan->genNewParam) {
640,457✔
3788
          code = buildAggOperatorParamForSingleChild(pInfo, uid, 0, pTagList, &pAggParam);
371,840✔
3789
          QUERY_CHECK_CODE(code, line, _return);
371,840✔
3790

3791
          if (pAggParam) {
371,840✔
3792
            code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
267,008✔
3793
            QUERY_CHECK_CODE(code, line, _return);
267,008✔
3794
          } else {
3795
            *pRes = NULL;
104,832✔
3796
          }
3797
        } else {
3798
          code = pAggOp->fpSet.getNextFn(pAggOp, pRes);
268,617✔
3799
          QUERY_CHECK_CODE(code, line, _return);
268,617✔
3800
        }
3801

3802
        if (*pRes) {
640,457✔
3803
          pVtbScan->genNewParam = false;
268,617✔
3804
          break;
268,617✔
3805
        }
3806
        pVtbScan->genNewParam = true;
371,840✔
3807
        pIter = taosHashIterate(pVtbScan->vtbUidTagListMap, pIter);
371,840✔
3808
        code = taosHashRemove(pVtbScan->vtbUidTagListMap, &uid, keyLen);
371,840✔
3809
        QUERY_CHECK_CODE(code, line, _return);
371,840✔
3810
      }
3811
    }
3812
  }
3813
_return:
3,064,899✔
3814
  if (code) {
3,064,899✔
UNCOV
3815
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
3816
  }
3817
  return code;
3,064,899✔
3818
}
3819

3820
/*
 * Produce the next result block for the virtual-table aggregation flavour of
 * the dynamic query control operator.
 *
 * Steps, in order:
 *   1. validate pRes through QRY_PARAM_CHECK;
 *   2. return right away when the operator status is already OP_EXEC_DONE;
 *   3. invoke the operator's own open callback;
 *   4. for a super table whose child table list is empty, mark the operator
 *      completed and return without fetching anything;
 *   5. otherwise delegate to virtualTableAggGetNext().
 *
 * @param pOperator  the dynamic query control operator
 * @param pRes       output slot for the produced data block
 * @return the status code of the last step executed. When a step fails the
 *         code is logged, recorded in the task info and raised via T_LONG_JMP.
 */
int32_t vtbAggNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pCtrlInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pScanCtrl = (SVtbScanDynCtrlInfo*)&pCtrlInfo->vtbScan;

  QRY_PARAM_CHECK(pRes);

  // Nothing left to produce once the operator has been marked as done.
  if (OP_EXEC_DONE == pOperator->status) {
    return code;
  }

  // The open callback is invoked on every call that reaches this point.
  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, line, _return);

  // A super table without any child table cannot yield rows: finish immediately.
  if (pScanCtrl->isSuperTable && 0 == taosArrayGetSize(pScanCtrl->childTableList)) {
    setOperatorCompleted(pOperator);
    return code;
  }

  code = virtualTableAggGetNext(pOperator, pRes);
  QUERY_CHECK_CODE(code, line, _return);

  return code;

_return:
  if (code) {
    SExecTaskInfo* pTask = pOperator->pTaskInfo;

    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
    // Record the failure on the task before unwinding through the task's jump environment.
    pTask->code = code;
    T_LONG_JMP(pTask->env, code);
  }
  return code;
}
3852

3853
int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
3,396,372✔
3854
                                       SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo,
3855
                                       SMsgCb* pMsgCb, SOperatorInfo** pOptrInfo) {
3856
  QRY_PARAM_CHECK(pOptrInfo);
3,396,372✔
3857

3858
  int32_t                    code = TSDB_CODE_SUCCESS;
3,396,372✔
3859
  int32_t                    line = 0;
3,396,372✔
3860
  __optr_fn_t                nextFp = NULL;
3,396,372✔
3861
  __optr_open_fn_t           openFp = NULL;
3,396,372✔
3862
  SOperatorInfo*             pOperator = NULL;
3,396,372✔
3863
  SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo));
3,396,372✔
3864
  QUERY_CHECK_NULL(pInfo, code, line, _error, terrno)
3,396,372✔
3865

3866
  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
3,396,372✔
3867
  QUERY_CHECK_NULL(pOperator, code, line, _error, terrno)
3,396,372✔
3868

3869
  pOperator->pPhyNode = pPhyciNode;
3,396,372✔
3870
  pTaskInfo->dynamicTask = (int8_t)pPhyciNode->node.dynamicOp;
3,396,372✔
3871

3872
  code = appendDownstream(pOperator, pDownstream, numOfDownstream);
3,396,372✔
3873
  QUERY_CHECK_CODE(code, line, _error);
3,396,372✔
3874

3875
  setOperatorInfo(pOperator, "DynQueryCtrlOperator", QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, false, OP_NOT_OPENED,
3,396,372✔
3876
                  pInfo, pTaskInfo);
3877

3878
  pInfo->qType = pPhyciNode->qType;
3,396,372✔
3879
  switch (pInfo->qType) {
3,396,372✔
3880
    case DYN_QTYPE_STB_HASH:
1,130,998✔
3881
      TAOS_MEMCPY(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
1,130,998✔
3882
      pInfo->stbJoin.pOutputDataBlockDesc = pPhyciNode->node.pOutputDataBlockDesc;
1,130,998✔
3883
      code = initSeqStbJoinTableHash(&pInfo->stbJoin.ctx.prev, pInfo->stbJoin.basic.batchFetch);
1,130,998✔
3884
      QUERY_CHECK_CODE(code, line, _error);
1,130,998✔
3885
      nextFp = seqStableJoin;
1,130,998✔
3886
      openFp = optrDummyOpenFn;
1,130,998✔
3887
      break;
1,130,998✔
3888
    case DYN_QTYPE_VTB_SCAN:
1,227,417✔
3889
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
1,227,417✔
3890
      QUERY_CHECK_CODE(code, line, _error);
1,227,417✔
3891
      nextFp = vtbScanNext;
1,227,417✔
3892
      openFp = vtbScanOpen;
1,227,417✔
3893
      break;
1,227,417✔
3894
    case DYN_QTYPE_VTB_WINDOW:
375,191✔
3895
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
375,191✔
3896
      QUERY_CHECK_CODE(code, line, _error);
375,191✔
3897
      code = initVtbWindowInfo(pInfo, pPhyciNode, pTaskInfo, pOperator);
375,191✔
3898
      QUERY_CHECK_CODE(code, line, _error);
375,191✔
3899
      nextFp = vtbWindowNext;
375,191✔
3900
      openFp = vtbWindowOpen;
375,191✔
3901
      break;
375,191✔
3902
    case DYN_QTYPE_VTB_AGG:
662,766✔
3903
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
662,766✔
3904
      QUERY_CHECK_CODE(code, line, _error);
662,766✔
3905
      nextFp = vtbAggNext;
662,766✔
3906
      openFp = vtbAggOpen;
662,766✔
3907
      break;
662,766✔
UNCOV
3908
    default:
×
UNCOV
3909
      qError("unsupported dynamic query ctrl type: %d", pInfo->qType);
×
UNCOV
3910
      code = TSDB_CODE_INVALID_PARA;
×
UNCOV
3911
      goto _error;
×
3912
  }
3913

3914
  pOperator->fpSet = createOperatorFpSet(openFp, nextFp, NULL, destroyDynQueryCtrlOperator, optrDefaultBufFn,
3,396,372✔
3915
                                         NULL, optrDefaultGetNextExtFn, NULL);
3916

3917
  setOperatorResetStateFn(pOperator, resetDynQueryCtrlOperState);
3,396,372✔
3918
  *pOptrInfo = pOperator;
3,396,372✔
3919
  return TSDB_CODE_SUCCESS;
3,396,372✔
3920

UNCOV
3921
_error:
×
UNCOV
3922
  if (pInfo != NULL) {
×
UNCOV
3923
    destroyDynQueryCtrlOperator(pInfo);
×
3924
  }
UNCOV
3925
  qError("failed to create dyn query ctrl operator, %s code:%s, line:%d", __func__, tstrerror(code), line);
×
UNCOV
3926
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
UNCOV
3927
  pTaskInfo->code = code;
×
UNCOV
3928
  return code;
×
3929
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc