• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4931

16 Jan 2026 02:32AM UTC coverage: 66.749% (+0.03%) from 66.716%
#4931

push

travis-ci

web-flow
enh: interp supports using non-null prev/next values to fill (#34236)

281 of 327 new or added lines in 11 files covered. (85.93%)

1890 existing lines in 121 files now uncovered.

203303 of 304580 relevant lines covered (66.75%)

129941648.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.48
/source/libs/executor/src/dynqueryctrloperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "nodes.h"
19
#include "operator.h"
20
#include "os.h"
21
#include "plannodes.h"
22
#include "query.h"
23
#include "querynodes.h"
24
#include "querytask.h"
25
#include "tarray.h"
26
#include "tcompare.h"
27
#include "tdatablock.h"
28
#include "thash.h"
29
#include "tmsg.h"
30
#include "trpc.h"
31
#include "ttypes.h"
32
#include "tdataformat.h"
33
#include "dynqueryctrl.h"
34

35
// File-level counter; buildGroupCacheOperatorParam() atomically increments it and stores the
// result as the sessionId of each group-cache get-param it builds.
int64_t gSessionId = 0;
36

37
void freeVgTableList(void* ptr) { 
×
38
  taosArrayDestroy(*(SArray**)ptr); 
×
39
}
×
40

41
/* Walks the singly linked list starting at pListHead and frees, for every node,
 * the four vg/uid buffers followed by the node itself. */
static void destroyStbJoinTableList(SStbJoinTableList* pListHead) {
  for (SStbJoinTableList* pCur = pListHead; NULL != pCur;) {
    taosMemoryFree(pCur->pLeftVg);
    taosMemoryFree(pCur->pLeftUid);
    taosMemoryFree(pCur->pRightVg);
    taosMemoryFree(pCur->pRightUid);

    /* Remember the successor before the node is released. */
    SStbJoinTableList* pFollowing = pCur->pNext;
    taosMemoryFree(pCur);
    pCur = pFollowing;
  }
}
1,151,391✔
54

55
/* Logs the execution counters of a super-table join controller and releases the
 * hash tables and table list held by its "prev" context. Which hash tables are
 * released depends on basic.batchFetch. */
static void destroyStbJoinDynCtrlInfo(SStbJoinDynCtrlInfo* pStbJoin) {
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;

  qDebug("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64 ", leftCacheNum:%" PRId64 ", rightCacheNum:%" PRId64,
         pStbJoin->execInfo.prevBlkNum, pStbJoin->execInfo.prevBlkRows, pStbJoin->execInfo.postBlkNum,
         pStbJoin->execInfo.postBlkRows, pStbJoin->execInfo.leftCacheNum, pStbJoin->execInfo.rightCacheNum);

  if (!pStbJoin->basic.batchFetch) {
    /* Sequential mode: plain cleanup of the three caches. */
    if (NULL != pPrev->leftCache) {
      tSimpleHashCleanup(pPrev->leftCache);
    }
    if (NULL != pPrev->rightCache) {
      tSimpleHashCleanup(pPrev->rightCache);
    }
    if (NULL != pPrev->onceTable) {
      tSimpleHashCleanup(pPrev->onceTable);
    }
  } else {
    /* Batch mode: the hashes store SArray* values, so install the array free
     * callback before cleaning them up. */
    if (NULL != pPrev->leftHash) {
      tSimpleHashSetFreeFp(pPrev->leftHash, freeVgTableList);
      tSimpleHashCleanup(pPrev->leftHash);
    }
    if (NULL != pPrev->rightHash) {
      tSimpleHashSetFreeFp(pPrev->rightHash, freeVgTableList);
      tSimpleHashCleanup(pPrev->rightHash);
    }
  }

  destroyStbJoinTableList(pPrev->pListHead);
}
1,151,391✔
83

84
void destroyColRefInfo(void *info) {
227,566,237✔
85
  SColRefInfo *pColRefInfo = (SColRefInfo *)info;
227,566,237✔
86
  if (pColRefInfo) {
227,566,237✔
87
    taosMemoryFree(pColRefInfo->colName);
227,566,237✔
88
    taosMemoryFree(pColRefInfo->colrefName);
227,566,237✔
89
  }
90
}
227,566,237✔
91

92
void destroyColRefArray(void *info) {
12,011,502✔
93
  SArray *pColRefArray = *(SArray **)info;
12,011,502✔
94
  if (pColRefArray) {
12,011,502✔
95
    taosArrayDestroyEx(pColRefArray, destroyColRefInfo);
12,011,502✔
96
  }
97
}
12,011,502✔
98

99
void freeUseDbOutput(void* pOutput) {
2,831,623✔
100
  SUseDbOutput *pOut = *(SUseDbOutput**)pOutput;
2,831,623✔
101
  if (NULL == pOutput) {
2,831,623✔
102
    return;
×
103
  }
104

105
  if (pOut->dbVgroup) {
2,831,623✔
106
    freeVgInfo(pOut->dbVgroup);
2,831,623✔
107
  }
108
  taosMemFree(pOut);
2,831,623✔
109
}
110

111
void destroyOtbInfoArray(void *info) {
4,742,699✔
112
  SArray *pOtbInfoArray = *(SArray **)info;
4,742,699✔
113
  if (pOtbInfoArray) {
4,742,699✔
114
    taosArrayDestroyEx(pOtbInfoArray, destroySOrgTbInfo);
4,742,699✔
115
  }
116
}
4,742,699✔
117

118
void destroyOtbVgIdToOtbInfoArrayMap(void *info) {
3,478,189✔
119
  SHashObj* pOtbVgIdToOtbInfoArrayMap = *(SHashObj **)info;
3,478,189✔
120
  if (pOtbVgIdToOtbInfoArrayMap) {
3,478,189✔
121
    taosHashSetFreeFp(pOtbVgIdToOtbInfoArrayMap, destroyOtbInfoArray);
3,478,189✔
122
    taosHashCleanup(pOtbVgIdToOtbInfoArrayMap);
3,478,189✔
123
  }
124
}
3,478,189✔
125

126
void destroyTagList(void *info) {
2,618,128✔
127
  SArray *pTagList = *(SArray **)info;
2,618,128✔
128
  if (pTagList) {
2,618,128✔
129
    taosArrayDestroyEx(pTagList, destroyTagVal);
2,618,128✔
130
  }
131
}
2,618,128✔
132

133
void destroyVtbUidTagListMap(void *info) {
986,952✔
134
  SHashObj* pVtbUidTagListMap = *(SHashObj **)info;
986,952✔
135
  if (pVtbUidTagListMap) {
986,952✔
136
    taosHashSetFreeFp(pVtbUidTagListMap, destroyTagList);
986,952✔
137
    taosHashCleanup(pVtbUidTagListMap);
986,952✔
138
  }
139
}
986,952✔
140

141
/* Releases every resource referenced by a virtual-table scan controller.
 * Each member is released only when it is non-NULL; the release order matches
 * the member order used throughout this function. The struct itself is not
 * freed here. */
static void destroyVtbScanDynCtrlInfo(SVtbScanDynCtrlInfo* pVtbScan) {
  /* Name strings. */
  if (NULL != pVtbScan->dbName) {
    taosMemoryFreeClear(pVtbScan->dbName);
  }
  if (NULL != pVtbScan->tbName) {
    taosMemoryFreeClear(pVtbScan->tbName);
  }

  /* Column-reference containers. */
  if (NULL != pVtbScan->childTableList) {
    taosArrayDestroyEx(pVtbScan->childTableList, destroyColRefArray);
  }
  if (NULL != pVtbScan->colRefInfo) {
    taosArrayDestroyEx(pVtbScan->colRefInfo, destroyColRefInfo);
    pVtbScan->colRefInfo = NULL;
  }
  if (NULL != pVtbScan->childTableMap) {
    taosHashCleanup(pVtbScan->childTableMap);
  }
  if (NULL != pVtbScan->readColList) {
    taosArrayDestroy(pVtbScan->readColList);
  }

  /* Hashes whose values need a dedicated destructor. */
  if (NULL != pVtbScan->dbVgInfoMap) {
    taosHashSetFreeFp(pVtbScan->dbVgInfoMap, freeUseDbOutput);
    taosHashCleanup(pVtbScan->dbVgInfoMap);
  }
  if (NULL != pVtbScan->otbNameToOtbInfoMap) {
    taosHashSetFreeFp(pVtbScan->otbNameToOtbInfoMap, destroySOrgTbInfo);
    taosHashCleanup(pVtbScan->otbNameToOtbInfoMap);
  }

  /* Use-db response: release its contents, then the response object. */
  if (NULL != pVtbScan->pRsp) {
    tFreeSUsedbRsp(pVtbScan->pRsp);
    taosMemoryFreeClear(pVtbScan->pRsp);
  }

  /* Vgroup bookkeeping hashes, cleaned up without a value destructor. */
  if (NULL != pVtbScan->existOrgTbVg) {
    taosHashCleanup(pVtbScan->existOrgTbVg);
  }
  if (NULL != pVtbScan->curOrgTbVg) {
    taosHashCleanup(pVtbScan->curOrgTbVg);
  }
  if (NULL != pVtbScan->newAddedVgInfo) {
    taosHashCleanup(pVtbScan->newAddedVgInfo);
  }

  /* Nested maps keyed by vgId / uid / groupId. */
  if (NULL != pVtbScan->otbVgIdToOtbInfoArrayMap) {
    taosHashSetFreeFp(pVtbScan->otbVgIdToOtbInfoArrayMap, destroyOtbInfoArray);
    taosHashCleanup(pVtbScan->otbVgIdToOtbInfoArrayMap);
  }
  if (NULL != pVtbScan->vtbUidToVgIdMapMap) {
    taosHashSetFreeFp(pVtbScan->vtbUidToVgIdMapMap, destroyOtbVgIdToOtbInfoArrayMap);
    taosHashCleanup(pVtbScan->vtbUidToVgIdMapMap);
  }
  if (NULL != pVtbScan->vtbGroupIdToVgIdMapMap) {
    taosHashSetFreeFp(pVtbScan->vtbGroupIdToVgIdMapMap, destroyOtbVgIdToOtbInfoArrayMap);
    taosHashCleanup(pVtbScan->vtbGroupIdToVgIdMapMap);
  }
  if (NULL != pVtbScan->vtbUidTagListMap) {
    taosHashSetFreeFp(pVtbScan->vtbUidTagListMap, destroyTagList);
    taosHashCleanup(pVtbScan->vtbUidTagListMap);
  }
  /* NOTE(review): unlike vtbUidTagListMap, this map is cleaned up without a
   * value destructor - confirm its values are not owned here. */
  if (NULL != pVtbScan->vtbGroupIdTagListMap) {
    taosHashCleanup(pVtbScan->vtbGroupIdTagListMap);
  }
  if (NULL != pVtbScan->vtbUidToGroupIdMap) {
    taosHashCleanup(pVtbScan->vtbUidToGroupIdMap);
  }
}
2,468,316✔
205

206
void destroyWinArray(void *info) {
358,701,948✔
207
  SArray *pWinArray = *(SArray **)info;
358,701,948✔
208
  if (pWinArray) {
358,701,948✔
209
    taosArrayDestroy(pWinArray);
358,701,948✔
210
  }
211
}
358,701,948✔
212

213
/* Releases the result block and the array of window arrays held by a
 * virtual-table window controller. The struct itself is not freed here. */
static void destroyVtbWindowDynCtrlInfo(SVtbWindowDynCtrlInfo* pVtbWindow) {
  if (NULL != pVtbWindow->pRes) {
    blockDataDestroy(pVtbWindow->pRes);
  }

  if (NULL != pVtbWindow->pWins) {
    /* Every element is itself an SArray*, released by destroyWinArray. */
    taosArrayDestroyEx(pVtbWindow->pWins, destroyWinArray);
  }
}
270,600✔
221

222
static void destroyDynQueryCtrlOperator(void* param) {
3,889,557✔
223
  SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param;
3,889,557✔
224

225
  switch (pDyn->qType) {
3,889,557✔
226
    case DYN_QTYPE_STB_HASH:
1,150,641✔
227
      destroyStbJoinDynCtrlInfo(&pDyn->stbJoin);
1,150,641✔
228
      break;
1,150,641✔
229
    case DYN_QTYPE_VTB_AGG:
2,468,316✔
230
    case DYN_QTYPE_VTB_SCAN:
231
      destroyVtbScanDynCtrlInfo(&pDyn->vtbScan);
2,468,316✔
232
      break;
2,468,316✔
233
    case DYN_QTYPE_VTB_WINDOW:
270,600✔
234
      destroyVtbWindowDynCtrlInfo(&pDyn->vtbWindow);
270,600✔
235
      break;
270,600✔
236
    default:
×
237
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
×
238
      break;
×
239
  }
240

241
  taosMemoryFreeClear(param);
3,889,557✔
242
}
3,889,557✔
243

244
/* Decides whether the data of a table should be cached.
 *  - batch fetch: always true;
 *  - right table: true when the current and the next right uid are equal;
 *  - left table : true when `uid` is present in pPrev->leftCache. */
static FORCE_INLINE bool tableNeedCache(int64_t uid, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, bool rightTable, bool batchFetch) {
  if (batchFetch) {
    return true;
  }

  if (rightTable) {
    return pPost->rightCurrUid == pPost->rightNextUid;
  }

  return NULL != tSimpleHashGet(pPrev->leftCache, &uid, sizeof(uid));
}
257

258
/* Loads the table pair at the read position of the head list node into the
 * post-join context (current uids and vgIds), looks ahead for the next right
 * uid (0 when the list is exhausted), and refreshes the left/right cache flags.
 * In non-batch mode a right table that needs caching and differs from the
 * previous right table is recorded in pPrev->rightCache.
 * Returns TSDB_CODE_SUCCESS, or the error produced by tSimpleHashPut. */
static int32_t updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo*          pStbJoin) {
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
  SStbJoinTableList*   pCur = pPrev->pListHead;

  int64_t* pLeftUid = &pCur->pLeftUid[pCur->readIdx];
  int64_t* pRightUid = &pCur->pRightUid[pCur->readIdx];
  int64_t  lookAheadIdx = pCur->readIdx + 1;
  int64_t  lastRightUid = pPost->rightCurrUid;

  pPost->leftCurrUid = *pLeftUid;
  pPost->rightCurrUid = *pRightUid;
  pPost->leftVgId = pCur->pLeftVg[pCur->readIdx];
  pPost->rightVgId = pCur->pRightVg[pCur->readIdx];

  /* Find the right uid that follows the current one, moving on to later list
   * nodes when the current node has no further entries. */
  for (;;) {
    if (lookAheadIdx < pCur->uidNum) {
      pPost->rightNextUid = pCur->pRightUid[lookAheadIdx];
      break;
    }

    pCur = pCur->pNext;
    if (NULL == pCur) {
      pPost->rightNextUid = 0;
      break;
    }

    pRightUid = pCur->pRightUid;
    lookAheadIdx = 0;
  }

  pPost->leftNeedCache = tableNeedCache(*pLeftUid, pPrev, pPost, false, pStbJoin->basic.batchFetch);
  pPost->rightNeedCache = tableNeedCache(*pRightUid, pPrev, pPost, true, pStbJoin->basic.batchFetch);

  bool rightTableChanged = (lastRightUid != pPost->rightCurrUid);
  if (!pStbJoin->basic.batchFetch && pPost->rightNeedCache && rightTableChanged) {
    QRY_ERR_RET(tSimpleHashPut(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid), NULL, 0));
    pStbJoin->execInfo.rightCacheNum++;
  }

  return TSDB_CODE_SUCCESS;
}
301

302
/* Creates a heap copy of *pSrc (vgId, table name and a duplicate of colMap) and
 * stores it in *ppDst. On failure nothing is stored in *ppDst, the partial copy
 * is released, and the error code taken from terrno is returned. */
static int32_t copyOrgTbInfo(SOrgTbInfo* pSrc, SOrgTbInfo** ppDst) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  SOrgTbInfo* pCopy = NULL;

  qDebug("start to copy org table info, vgId:%d, tbName:%s", pSrc->vgId, pSrc->tbName);

  pCopy = taosMemoryMalloc(sizeof(SOrgTbInfo));
  QUERY_CHECK_NULL(pCopy, code, lino, _return, terrno)

  pCopy->vgId = pSrc->vgId;
  tstrncpy(pCopy->tbName, pSrc->tbName, TSDB_TABLE_FNAME_LEN);
  pCopy->colMap = taosArrayDup(pSrc->colMap, NULL);
  QUERY_CHECK_NULL(pCopy->colMap, code, lino, _return, terrno)

  *ppDst = pCopy;
  return code;

_return:
  qError("failed to copy org table info, code:%d, line:%d", code, lino);
  if (NULL != pCopy) {
    if (NULL != pCopy->colMap) {
      taosArrayDestroy(pCopy->colMap);
    }
    taosMemoryFreeClear(pCopy);
  }
  return code;
}
331

332
/* Deep-copies pTagList into a freshly created pBasic->tagList.
 *
 * Variable-length tags get their own heap copy of the payload; other tags copy
 * the i64 value. On failure the partially built list is destroyed (including
 * the payloads already pushed), pBasic->tagList is reset to NULL and the error
 * code is returned.
 *
 * Fix: tmpTag is now zero-initialised. The cleanup path inspects tmpTag.pData,
 * which previously was an indeterminate pointer whenever the failure happened
 * before the first payload allocation (e.g. taosArrayInit failing), so a
 * garbage pointer could be passed to taosMemoryFree. */
static int32_t buildTagListForExchangeBasicParam(SExchangeOperatorBasicParam* pBasic, SArray* pTagList) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  STagVal  tmpTag = {0};

  pBasic->tagList = taosArrayInit(1, sizeof(STagVal));
  QUERY_CHECK_NULL(pBasic->tagList, code, lino, _return, terrno)

  for (int32_t i = 0; i < taosArrayGetSize(pTagList); ++i) {
    STagVal* pSrcTag = (STagVal*)taosArrayGet(pTagList, i);
    QUERY_CHECK_NULL(pSrcTag, code, lino, _return, terrno)
    tmpTag.type = pSrcTag->type;
    tmpTag.cid = pSrcTag->cid;
    if (IS_VAR_DATA_TYPE(pSrcTag->type)) {
      tmpTag.nData = pSrcTag->nData;
      tmpTag.pData = taosMemoryMalloc(tmpTag.nData);
      QUERY_CHECK_NULL(tmpTag.pData, code, lino, _return, terrno)
      memcpy(tmpTag.pData, pSrcTag->pData, tmpTag.nData);
    } else {
      tmpTag.i64 = pSrcTag->i64;
    }

    QUERY_CHECK_NULL(taosArrayPush(pBasic->tagList, &tmpTag), code, lino, _return, terrno)
    // The array now holds the payload pointer; clear the scratch value so the
    // cleanup path cannot free it a second time.
    tmpTag = (STagVal){0};
  }

  return code;
_return:
  if (pBasic->tagList) {
    taosArrayDestroyEx(pBasic->tagList, destroyTagVal);
    pBasic->tagList = NULL;
  }
  // Only non-NULL when a payload was allocated but never pushed into the list.
  if (tmpTag.pData) {
    taosMemoryFree(tmpTag.pData);
  }
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
  return code;
}
370

371
/* Deep-copies pOrgTbInfoArray into a freshly created pBasic->batchOrgTbInfo
 * (vgId, table name and a duplicate of each colMap).
 *
 * On failure the partially built array is destroyed, pBasic->batchOrgTbInfo is
 * reset to NULL and the error code is returned.
 *
 * Fix: batchInfo is now zero-initialised. The cleanup path inspects
 * batchInfo.colMap, which previously was an indeterminate pointer whenever the
 * failure happened before the first taosArrayDup (e.g. taosArrayInit failing),
 * so a garbage pointer could be passed to taosArrayDestroy. */
static int32_t buildBatchOrgTbInfoForExchangeBasicParam(SExchangeOperatorBasicParam* pBasic, SArray* pOrgTbInfoArray) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  SOrgTbInfo  batchInfo = {0};

  pBasic->batchOrgTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
  QUERY_CHECK_NULL(pBasic->batchOrgTbInfo, code, lino, _return, terrno)

  for (int32_t i = 0; i < taosArrayGetSize(pOrgTbInfoArray); ++i) {
    SOrgTbInfo* pSrc = (SOrgTbInfo*)taosArrayGet(pOrgTbInfoArray, i);
    QUERY_CHECK_NULL(pSrc, code, lino, _return, terrno)
    batchInfo.vgId = pSrc->vgId;
    tstrncpy(batchInfo.tbName, pSrc->tbName, TSDB_TABLE_FNAME_LEN);
    batchInfo.colMap = taosArrayDup(pSrc->colMap, NULL);
    QUERY_CHECK_NULL(batchInfo.colMap, code, lino, _return, terrno)
    QUERY_CHECK_NULL(taosArrayPush(pBasic->batchOrgTbInfo, &batchInfo), code, lino, _return, terrno)
    // The array now holds colMap; clear the scratch value so the cleanup path
    // cannot destroy it a second time.
    batchInfo = (SOrgTbInfo){0};
  }

  return code;
_return:
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
  if (pBasic->batchOrgTbInfo) {
    taosArrayDestroyEx(pBasic->batchOrgTbInfo, destroySOrgTbInfo);
    pBasic->batchOrgTbInfo = NULL;
  }
  // Only non-NULL when a colMap was duplicated but never pushed into the array.
  if (batchInfo.colMap) {
    taosArrayDestroy(batchInfo.colMap);
    batchInfo.colMap = NULL;
  }
  return code;
}
403

404
/* Builds a group-cache "get" operator param and stores it in *ppRes.
 *
 * pChild, when non-NULL, becomes the single child of the new param; on every
 * failure path pChild is released with freeOperatorParam, so the caller must
 * not touch it after an error. On failure *ppRes is NULL and the error code
 * taken from terrno is returned.
 *
 * Fix: all SOperatorParam fields are initialised right after allocation. The
 * error paths hand the param to freeOperatorParam; previously opType, value and
 * reUse were still indeterminate at that point because they were only assigned
 * at the very end of the function. */
static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, bool needCache, SOperatorParam* pChild) {
  int32_t         code = TSDB_CODE_SUCCESS;
  SOperatorParam* pParam = taosMemoryMalloc(sizeof(SOperatorParam));

  *ppRes = pParam;
  if (NULL == pParam) {
    code = terrno;  // capture before freeOperatorParam runs
    freeOperatorParam(pChild, OP_GET_PARAM);
    return code;
  }

  // Put the param into a fully defined state before anything can fail.
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
  pParam->downstreamIdx = downstreamIdx;
  pParam->value = NULL;
  pParam->pChildren = NULL;
  pParam->reUse = false;

  if (pChild) {
    pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
    if (NULL == pParam->pChildren) {
      code = terrno;
      freeOperatorParam(pChild, OP_GET_PARAM);
      freeOperatorParam(pParam, OP_GET_PARAM);
      *ppRes = NULL;
      return code;
    }
    if (NULL == taosArrayPush(pParam->pChildren, &pChild)) {
      code = terrno;
      // pChild was not stored in pChildren, so it is released separately.
      freeOperatorParam(pChild, OP_GET_PARAM);
      freeOperatorParam(pParam, OP_GET_PARAM);
      *ppRes = NULL;
      return code;
    }
  }

  SGcOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcOperatorParam));
  if (NULL == pGc) {
    code = terrno;
    freeOperatorParam(pParam, OP_GET_PARAM);
    *ppRes = NULL;
    return code;
  }

  pGc->sessionId = atomic_add_fetch_64(&gSessionId, 1);
  pGc->downstreamIdx = downstreamIdx;
  pGc->vgId = vgId;
  pGc->tbUid = tbUid;
  pGc->needCache = needCache;

  pParam->value = pGc;

  return TSDB_CODE_SUCCESS;
}
453

454

455
/* Builds a group-cache "notify" operator param (no children) and stores it in
 * *ppRes. On failure *ppRes is NULL and the error code taken from terrno is
 * returned.
 *
 * Fixes:
 *  - all SOperatorParam fields are initialised right after allocation, so the
 *    error path no longer passes a param with indeterminate opType/value/reUse
 *    to freeOperatorParam;
 *  - *ppRes is reset to NULL after the param is released on failure, instead of
 *    being left pointing at the released object (matches
 *    buildGroupCacheOperatorParam). */
static int32_t buildGroupCacheNotifyOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid) {
  int32_t         code = TSDB_CODE_SUCCESS;
  SOperatorParam* pParam = taosMemoryMalloc(sizeof(SOperatorParam));

  *ppRes = pParam;
  if (NULL == pParam) {
    return terrno;
  }

  // Put the param into a fully defined state before anything can fail.
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
  pParam->downstreamIdx = downstreamIdx;
  pParam->value = NULL;
  pParam->pChildren = NULL;
  pParam->reUse = false;

  SGcNotifyOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcNotifyOperatorParam));
  if (NULL == pGc) {
    code = terrno;
    freeOperatorParam(pParam, OP_NOTIFY_PARAM);
    *ppRes = NULL;
    return code;
  }

  pGc->downstreamIdx = downstreamIdx;
  pGc->vgId = vgId;
  pGc->tbUid = tbUid;

  pParam->value = pGc;

  return TSDB_CODE_SUCCESS;
}
481

482
static int32_t buildExchangeOperatorBasicParam(SExchangeOperatorBasicParam* pBasic, ENodeType srcOpType,
63,899,776✔
483
                                               EExchangeSourceType exchangeType, int32_t vgId, uint64_t groupId,
484
                                               SArray* pUidList, SOrgTbInfo* pOrgTbInfo, SArray* pTagList,
485
                                               SArray* pOrgTbInfoArray, STimeWindow window,
486
                                               SDownstreamSourceNode* pDownstreamSourceNode,
487
                                               bool tableSeq, bool isNewParam, bool isNewDeployed) {
488
  int32_t code = TSDB_CODE_SUCCESS;
63,899,776✔
489
  int32_t lino = 0;
63,899,776✔
490

491
  qDebug("buildExchangeOperatorBasicParam, srcOpType:%d, exchangeType:%d, vgId:%d, groupId:%" PRIu64 ", tableSeq:%d, "
63,899,776✔
492
         "isNewParam:%d, isNewDeployed:%d", srcOpType, exchangeType, vgId, groupId, tableSeq, isNewParam, isNewDeployed);
493

494
  pBasic->paramType = DYN_TYPE_EXCHANGE_PARAM;
63,899,776✔
495
  pBasic->srcOpType = srcOpType;
63,899,776✔
496
  pBasic->vgId = vgId;
63,899,776✔
497
  pBasic->groupid = groupId;
63,899,776✔
498
  pBasic->window = window;
63,899,776✔
499
  pBasic->tableSeq = tableSeq;
63,899,776✔
500
  pBasic->type = exchangeType;
63,899,776✔
501
  pBasic->isNewParam = isNewParam;
63,899,776✔
502

503
  if (pDownstreamSourceNode) {
63,899,776✔
504
    pBasic->isNewDeployed = true;
2,376✔
505
    pBasic->newDeployedSrc.type = QUERY_NODE_DOWNSTREAM_SOURCE;
2,376✔
506
    pBasic->newDeployedSrc.clientId = pDownstreamSourceNode->clientId;// current task's taskid
2,376✔
507
    pBasic->newDeployedSrc.taskId = pDownstreamSourceNode->taskId;
2,376✔
508
    pBasic->newDeployedSrc.fetchMsgType = TDMT_STREAM_FETCH;
2,376✔
509
    pBasic->newDeployedSrc.localExec = false;
2,376✔
510
    pBasic->newDeployedSrc.addr.nodeId = pDownstreamSourceNode->addr.nodeId;
2,376✔
511
    memcpy(&pBasic->newDeployedSrc.addr.epSet, &pDownstreamSourceNode->addr.epSet, sizeof(SEpSet));
2,376✔
512
  } else {
513
    pBasic->isNewDeployed = false;
63,897,400✔
514
    pBasic->newDeployedSrc = (SDownstreamSourceNode){0};
63,897,400✔
515
  }
516

517
  if (pUidList) {
63,899,776✔
518
    pBasic->uidList = taosArrayDup(pUidList, NULL);
6,659,881✔
519
    QUERY_CHECK_NULL(pBasic->uidList, code, lino, _return, terrno)
6,659,881✔
520
  } else {
521
    pBasic->uidList = taosArrayInit(1, sizeof(int64_t));
57,239,895✔
522
    QUERY_CHECK_NULL(pBasic->uidList, code, lino, _return, terrno)
57,239,895✔
523
  }
524

525
  if (pOrgTbInfo) {
63,899,776✔
526
    code = copyOrgTbInfo(pOrgTbInfo, &pBasic->orgTbInfo);
9,699,016✔
527
    QUERY_CHECK_CODE(code, lino, _return);
9,699,016✔
528
  } else {
529
    pBasic->orgTbInfo = NULL;
54,200,760✔
530
  }
531

532
  if (pTagList) {
63,899,776✔
533
    code = buildTagListForExchangeBasicParam(pBasic, pTagList);
2,350,264✔
534
    QUERY_CHECK_CODE(code, lino, _return);
2,350,264✔
535
  } else {
536
    pBasic->tagList = NULL;
61,549,512✔
537
  }
538

539
  if (pOrgTbInfoArray) {
63,899,776✔
540
    code = buildBatchOrgTbInfoForExchangeBasicParam(pBasic, pOrgTbInfoArray);
4,867,391✔
541
    QUERY_CHECK_CODE(code, lino, _return);
4,867,391✔
542
  } else {
543
    pBasic->batchOrgTbInfo = NULL;
59,032,385✔
544
  }
545
  return code;
63,899,776✔
546

UNCOV
547
_return:
×
548
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
×
549
  freeExchangeGetBasicOperatorParam(pBasic);
×
550
  return code;
×
551
}
552

553
static int32_t buildExchangeOperatorParamImpl(SOperatorParam** ppRes, int32_t downstreamIdx, ENodeType srcOpType,
58,803,756✔
554
                                              EExchangeSourceType exchangeType, int32_t vgId, uint64_t groupId,
555
                                              SArray* pUidList, SOrgTbInfo* pOrgTbInfo, SArray* pTagList,
556
                                              SArray* pOrgTbInfoArray, STimeWindow window,
557
                                              SDownstreamSourceNode* pDownstreamSourceNode,
558
                                              bool tableSeq, bool isNewParam, bool reUse, bool isNewDeployed) {
559

560
  int32_t                      code = TSDB_CODE_SUCCESS;
58,803,756✔
561
  int32_t                      lino = 0;
58,803,756✔
562
  SOperatorParam*              pParam = NULL;
58,803,756✔
563
  SExchangeOperatorParam*      pExc = NULL;
58,803,756✔
564

565
  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
58,803,756✔
566
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)
58,803,756✔
567

568
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
58,803,756✔
569
  pParam->downstreamIdx = downstreamIdx;
58,803,756✔
570
  pParam->reUse = reUse;
58,803,756✔
571
  pParam->pChildren = NULL;
58,803,756✔
572
  pParam->value = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
58,803,756✔
573
  QUERY_CHECK_NULL(pParam->value, code, lino, _return, terrno)
58,803,756✔
574

575
  pExc = (SExchangeOperatorParam*)pParam->value;
58,803,756✔
576
  pExc->multiParams = false;
58,803,756✔
577

578
  code = buildExchangeOperatorBasicParam(&pExc->basic, srcOpType, exchangeType, vgId, groupId,
58,803,756✔
579
                                         pUidList, pOrgTbInfo, pTagList, pOrgTbInfoArray,
580
                                         window, pDownstreamSourceNode, tableSeq, isNewParam, isNewDeployed);
581

582
  *ppRes = pParam;
58,803,756✔
583
  return code;
58,803,756✔
UNCOV
584
_return:
×
585
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
×
586
  if (pParam) {
×
587
    freeOperatorParam(pParam, OP_GET_PARAM);
×
588
  }
UNCOV
589
  return code;
×
590
}
591

592
/* Builds an exchange param for a super-table join table scan of the single
 * table *pUid in vgroup *pVgId. A temporary one-element uid list is created for
 * the call and always destroyed before returning. */
static int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, const int32_t* pVgId, int64_t* pUid) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SArray* pSingleUid = taosArrayInit(1, sizeof(int64_t));

  QUERY_CHECK_NULL(pSingleUid, code, lino, _return, terrno)
  QUERY_CHECK_NULL(taosArrayPush(pSingleUid, pUid), code, lino, _return, terrno);

  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, EX_SRC_TYPE_STB_JOIN_SCAN,
                                        *pVgId, 0, pSingleUid, NULL, NULL, NULL, (STimeWindow){0}, NULL, true, false, false, false);

_return:
  /* Shared exit: report a failure, then drop the temporary list. */
  if (TSDB_CODE_SUCCESS != code) {
    qError("failed to build exchange operator param, code:%d", code);
  }
  taosArrayDestroy(pSingleUid);
  return code;
}
612

613
/* Builds an exchange param for a virtual super-table window scan restricted to
 * the time window `win`. No uid list, org-table info or tag list is attached. */
static int32_t buildExchangeOperatorParamForExternalWindow(SOperatorParam** ppRes, int32_t downstreamIdx, STimeWindow win) {
  int32_t code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, EX_SRC_TYPE_VSTB_WIN_SCAN,
                                                0, 0, NULL, NULL, NULL, NULL, win, NULL, true, true, true, false);

  if (TSDB_CODE_SUCCESS != code) {
    qError("failed to build exchange operator param for external window, code:%d", code);
  }
  return code;
}
626

627
/* Builds an exchange param for a virtual super-table tag scan of table `uid` in
 * vgroup `vgId`. A temporary one-element uid list is created for the call and
 * always destroyed before returning. */
static int32_t buildExchangeOperatorParamForVTagScan(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, tb_uid_t uid) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SArray* pSingleUid = taosArrayInit(1, sizeof(int64_t));

  QUERY_CHECK_NULL(pSingleUid, code, lino, _return, terrno)
  QUERY_CHECK_NULL(taosArrayPush(pSingleUid, &uid), code, lino, _return, terrno)

  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, EX_SRC_TYPE_VSTB_TAG_SCAN,
                                        vgId, 0, pSingleUid, NULL, NULL, NULL, (STimeWindow){0}, NULL, false, false, true, false);

_return:
  /* Shared exit: report a failure, then drop the temporary list. */
  if (TSDB_CODE_SUCCESS != code) {
    qError("failed to build exchange operator param for tag scan, code:%d", code);
  }
  taosArrayDestroy(pSingleUid);
  return code;
}
648

649
static int32_t buildExchangeOperatorParamForVScan(SOperatorParam** ppRes, int32_t downstreamIdx, SOrgTbInfo* pOrgTbInfo,
9,699,016✔
650
                                                  SDownstreamSourceNode* pNewSource) {
651
  int32_t                      code = TSDB_CODE_SUCCESS;
9,699,016✔
652
  int32_t                      lino = 0;
9,699,016✔
653

654
  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, EX_SRC_TYPE_VSTB_SCAN,
9,699,016✔
655
                                        pOrgTbInfo->vgId, 0, NULL, pOrgTbInfo, NULL, NULL, (STimeWindow){0}, pNewSource, false, true, true, true);
9,699,016✔
656
  QUERY_CHECK_CODE(code, lino, _return);
9,699,016✔
657

658
  return code;
9,699,016✔
UNCOV
659
_return:
×
660
  qError("failed to build exchange operator param for vscan, code:%d", code);
×
661
  return code;
×
662
}
663

664
/* Builds a multi-source ("batch") exchange param from pVg, a hash that maps a
 * vgId to an SArray* of table uids, and stores it in *ppRes.
 *
 * For every entry one basic param is built (which duplicates the uid list) and
 * stored in pExc->pBatchs under the vgId. The source uid list is destroyed and
 * its slot in pVg is cleared as soon as the copy exists.
 * On failure everything built so far is released and the error is returned.
 *
 * Fixes:
 *  - the uid count used for tracing is read BEFORE the list is destroyed (the
 *    old code called taosArrayGetSize on the already destroyed list);
 *  - the slot in pVg is cleared right after the list is destroyed, so a later
 *    failure cannot leave a dangling pointer behind for pVg's own cleanup;
 *  - a tSimpleHashPut failure now goes through _return instead of returning
 *    directly, which leaked pParam and the contents of `basic`;
 *  - when buildExchangeOperatorBasicParam fails it has already released the
 *    members of `basic`, so `basic` is zeroed before the shared cleanup runs
 *    to avoid releasing them twice. */
static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
  int32_t                       code = TSDB_CODE_SUCCESS;
  int32_t                       line = 0;
  SOperatorParam*               pParam = NULL;
  SExchangeOperatorBatchParam*  pExc = NULL;
  SExchangeOperatorBasicParam   basic = {0};

  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(pParam, code, line, _return, terrno);

  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  pParam->downstreamIdx = downstreamIdx;
  pParam->reUse = false;
  pParam->pChildren = NULL;
  pParam->value = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  QUERY_CHECK_NULL(pParam->value, code, line, _return, terrno);

  pExc = pParam->value;
  pExc->multiParams = true;
  pExc->pBatchs = tSimpleHashInit(tSimpleHashGetSize(pVg), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  QUERY_CHECK_NULL(pExc->pBatchs, code, line, _return, terrno)

  tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);

  int32_t iter = 0;
  void*   p = NULL;
  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
    int32_t* pVgId = tSimpleHashGetKey(p, NULL);
    SArray*  pUidList = *(SArray**)p;
    // Captured up front: pUidList is destroyed before the trace line below.
    int64_t  uidNum = (int64_t)taosArrayGetSize(pUidList);

    code = buildExchangeOperatorBasicParam(&basic, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
                                           EX_SRC_TYPE_STB_JOIN_SCAN, *pVgId, 0,
                                           pUidList, NULL, NULL, NULL,
                                           (STimeWindow){0}, NULL, false, false, false);
    if (TSDB_CODE_SUCCESS != code) {
      // The callee already released what it had put into `basic`.
      basic = (SExchangeOperatorBasicParam){0};
      line = __LINE__;
      goto _return;
    }

    // `basic` holds its own copy of the list; release the source and clear the
    // slot so that pVg never sees the destroyed array again.
    taosArrayDestroy(pUidList);
    *(SArray**)p = NULL;

    code = tSimpleHashPut(pExc->pBatchs, pVgId, sizeof(*pVgId), &basic, sizeof(basic));
    QUERY_CHECK_CODE(code, line, _return);

    // Ownership of basic's members moved into pBatchs.
    basic = (SExchangeOperatorBasicParam){0};
    qTrace("build downstreamIdx %d batch scan, vgId:%d, uidNum:%" PRId64, downstreamIdx, *pVgId, uidNum);
  }
  *ppRes = pParam;

  return code;

_return:
  qError("failed to build batch exchange operator param, code:%d", code);
  freeOperatorParam(pParam, OP_GET_PARAM);
  freeExchangeGetBasicOperatorParam(&basic);
  return code;
}
719

720
/*
 * Build an exchange operator param that carries one basic scan param per
 * entry of pBatchMaps (virtual super table aggregate scan).
 *
 * Each entry of pBatchMaps is read as: key = int32_t vgId, value = SArray*
 * of origin table info. For every entry an EX_SRC_TYPE_VSTB_AGG_SCAN basic
 * param is built and stored in the batch hash under its vgId.
 *
 * ppRes          [out] receives the new param on success; untouched on failure
 * downstreamIdx  downstream index recorded in the param
 * pTagList       forwarded to every basic param (callers in this file pass NULL
 *                or a tag list)
 * groupid        forwarded to every basic param
 * pBatchMaps     map that is iterated to produce the batch entries
 *
 * Returns TSDB_CODE_SUCCESS, or the code of the first failing step.
 */
static int32_t buildBatchExchangeOperatorParamForVSAgg(SOperatorParam** ppRes, int32_t downstreamIdx, SArray* pTagList, uint64_t groupid,  SHashObj* pBatchMaps) {
  int32_t                       code = TSDB_CODE_SUCCESS;
  int32_t                       lino = 0;
  SOperatorParam*               pParam = NULL;
  SExchangeOperatorBatchParam*  pExc = NULL;
  SExchangeOperatorBasicParam   basic = {0};
  size_t                        keyLen = 0;
  void*                         pIter = NULL;

  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)

  // The storage comes from malloc, so every header field is indeterminate
  // until assigned. Fill them in now so the error path below never hands
  // freeOperatorParam() a struct with indeterminate members.
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  pParam->downstreamIdx = downstreamIdx;
  pParam->pChildren = NULL;
  pParam->reUse = false;

  pParam->value = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  QUERY_CHECK_NULL(pParam->value, code, lino, _return, terrno)

  pExc = pParam->value;
  pExc->multiParams = true;

  pExc->pBatchs = tSimpleHashInit(taosHashGetSize(pBatchMaps), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  QUERY_CHECK_NULL(pExc->pBatchs, code, lino, _return, terrno)
  tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);

  pIter = taosHashIterate(pBatchMaps, NULL);
  while (pIter != NULL) {
    SArray*          pOrgTbInfoArray = *(SArray**)pIter;
    int32_t*         vgId = (int32_t*)taosHashGetKey(pIter, &keyLen);
    STimeWindow      win = {.skey = INT64_MAX, .ekey = INT64_MIN};

    code = buildExchangeOperatorBasicParam(&basic, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
                                           EX_SRC_TYPE_VSTB_AGG_SCAN, *vgId, groupid,
                                           NULL, NULL, pTagList, pOrgTbInfoArray,
                                           win, NULL, false, true, false);
    QUERY_CHECK_CODE(code, lino, _return);

    code = tSimpleHashPut(pExc->pBatchs, vgId, sizeof(*vgId), &basic, sizeof(basic));
    QUERY_CHECK_CODE(code, lino, _return);

    // The hash now holds a copy of `basic`; reset the local so the error path
    // does not release the same contents a second time.
    basic = (SExchangeOperatorBasicParam){0};
    pIter = taosHashIterate(pBatchMaps, pIter);
  }

  *ppRes = pParam;
  return code;

_return:
  // pIter is non-NULL only when the walk above was abandoned half way;
  // release the iterator's hold on the current entry in that case.
  if (pIter) {
    taosHashCancelIterate(pBatchMaps, pIter);
  }
  qError("failed to build exchange operator param for vscan, code:%d", code);
  freeOperatorParam(pParam, OP_GET_PARAM);
  freeExchangeGetBasicOperatorParam(&basic);
  return code;
}
774

775
/*
 * Build a merge-join "get" param with two children.
 *
 * ppRes      [out] new param on success, NULL on failure
 * initParam  stored as SSortMergeJoinOperatorParam.initDownstream
 * ppChild0/1 [in/out] child params; each pointer is set to NULL right after
 *            it has been pushed into the new param's child array, so the
 *            caller can tell which children it still has to release
 *
 * Returns TSDB_CODE_SUCCESS or the terrno of the failing allocation/push.
 */
static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initParam, SOperatorParam** ppChild0, SOperatorParam** ppChild1) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  SSortMergeJoinOperatorParam* pJoin = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }

  // malloc leaves the struct indeterminate: give every header field a defined
  // value before any path can pass the struct to freeOperatorParam().
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->value = NULL;
  (*ppRes)->reUse = false;

  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
  if (NULL == (*ppRes)->pChildren) {
    code = terrno;
    goto _return;
  }

  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild0)) {
    code = terrno;
    goto _return;
  }
  *ppChild0 = NULL;

  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild1)) {
    code = terrno;
    goto _return;
  }
  *ppChild1 = NULL;

  pJoin = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam));
  if (NULL == pJoin) {
    code = terrno;
    goto _return;
  }

  pJoin->initDownstream = initParam;
  (*ppRes)->value = pJoin;

  return TSDB_CODE_SUCCESS;

_return:
  freeOperatorParam(*ppRes, OP_GET_PARAM);
  *ppRes = NULL;
  return code;
}
820

UNCOV
821
/*
 * Build a merge-join "notify" param wrapping up to two child notify params.
 *
 * ppRes    [out] new param on success, NULL on failure
 * pChild0  optional first child; skipped when NULL
 * pChild1  optional second child; skipped when NULL
 *
 * On failure the children that have not been attached to the new param are
 * released here with freeOperatorParam(..., OP_NOTIFY_PARAM), so the caller
 * must not release them again.
 *
 * Returns TSDB_CODE_SUCCESS or the terrno of the failing allocation/push.
 */
static int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) {
  int32_t code = TSDB_CODE_SUCCESS;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    code = terrno;
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    return code;
  }

  // Define every header field up front; the storage comes from malloc and the
  // failure branches below pass the struct to freeOperatorParam().
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->value = NULL;
  (*ppRes)->reUse = false;

  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
  // Check the array that was just created (the previous code re-tested *ppRes,
  // which can never be NULL here, so an allocation failure went unnoticed).
  if (NULL == (*ppRes)->pChildren) {
    code = terrno;
    taosMemoryFreeClear(*ppRes);
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    return code;
  }

  if (pChild0 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) {
    code = terrno;
    // Nothing was attached yet: release the shell and both children.
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    *ppRes = NULL;
    return code;
  }

  if (pChild1 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) {
    code = terrno;
    // pChild0 (if any) is already in the child array of *ppRes, so only
    // pChild1 is released separately.
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    *ppRes = NULL;
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
860

861
/*
 * Build a table-scan param from a vgroup hash that must hold exactly one entry.
 *
 * ppRes          [out] filled by buildTableScanOperatorParam()
 * downstreamIdx  not referenced by this function
 * pVg            hash whose values are read as SArray* uid lists; each list is
 *                destroyed and its slot set to NULL once it has been consumed
 *
 * Returns TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the hash does not hold
 * exactly one vgroup, otherwise the result of buildTableScanOperatorParam().
 */
static int32_t buildBatchTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
  int32_t numOfVgroups = tSimpleHashGetSize(pVg);
  if (1 != numOfVgroups) {
    qError("Invalid vgroup num %d to build table scan operator param", numOfVgroups);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  int32_t cursor = 0;
  void*   pEntry = tSimpleHashIterate(pVg, NULL, &cursor);
  while (NULL != pEntry) {
    SArray** ppUidList = (SArray**)pEntry;

    int32_t code = buildTableScanOperatorParam(ppRes, *ppUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false);
    if (0 != code) {
      return code;
    }

    // The uid list is no longer needed once the param has been built.
    taosArrayDestroy(*ppUidList);
    *ppUidList = NULL;

    pEntry = tSimpleHashIterate(pVg, pEntry, &cursor);
  }

  return TSDB_CODE_SUCCESS;
}
884

UNCOV
885
/*
 * Build a table-scan param for a single table uid.
 *
 * ppRes          [out] filled by buildTableScanOperatorParam()
 * downstreamIdx  not referenced by this function
 * pVgId          not referenced by this function
 * pUid           points at the uid that is placed in a temporary one-element list
 *
 * The temporary uid list is always destroyed before returning.
 * Returns TSDB_CODE_SUCCESS, terrno of a failed allocation/push, or the code
 * from buildTableScanOperatorParam().
 */
static int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) {
  int32_t code = TSDB_CODE_SUCCESS;
  SArray* pUidList = taosArrayInit(1, sizeof(int64_t));
  if (NULL == pUidList) {
    return terrno;
  }

  if (NULL == taosArrayPush(pUidList, pUid)) {
    // Capture terrno before cleanup, then release the list (it used to leak here).
    code = terrno;
    taosArrayDestroy(pUidList);
    return code;
  }

  code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, true);
  taosArrayDestroy(pUidList);

  return code;
}
902

903
/*
 * Assemble the merge-join param for the table pair at the current read index
 * of the head table list: source params -> group cache params -> merge join.
 *
 * pInfo    dynamic query control operator info (stbJoin section is used)
 * pPrev    prev-join context; supplies the table list and the vgroup hashes
 * pPost    post-join context; supplies the left/right "need cache" flags
 * ppParam  [out] merge-join param on success; released and set to NULL here
 *          if it is non-NULL when a failure is detected
 *
 * In batch-fetch mode the source params are built only while pPrev->leftHash
 * is set; once both sides are built the two hashes are cleaned up and cleared.
 *
 * Returns TSDB_CODE_SUCCESS or the code of the first failing step.
 */
static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, SOperatorParam** ppParam) {
  int64_t         rowIdx = pPrev->pListHead->readIdx;
  int32_t*        leftVg = pPrev->pListHead->pLeftVg + rowIdx;
  int64_t*        leftUid = pPrev->pListHead->pLeftUid + rowIdx;
  int32_t*        rightVg = pPrev->pListHead->pRightVg + rowIdx;
  int64_t*        rightUid = pPrev->pListHead->pRightUid + rowIdx;
  SOperatorParam* pLeftSrc = NULL;
  SOperatorParam* pRightSrc = NULL;
  SOperatorParam* pLeftGc = NULL;
  SOperatorParam* pRightGc = NULL;
  int32_t         code = TSDB_CODE_SUCCESS;

  qDebug("start %" PRId64 ":%" PRId64 "th stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64, 
      rowIdx, pPrev->tableNum, *leftVg, *leftUid, *rightVg, *rightUid);

  QRY_ERR_RET(updatePostJoinCurrTableInfo(&pInfo->stbJoin));

  if (!pInfo->stbJoin.basic.batchFetch) {
    // One table per side.
    if (pInfo->stbJoin.basic.srcScan[0]) {
      code = buildSingleTableScanOperatorParam(&pLeftSrc, 0, leftVg, leftUid);
    } else {
      code = buildExchangeOperatorParam(&pLeftSrc, 0, leftVg, leftUid);
    }
    if (TSDB_CODE_SUCCESS == code) {
      if (pInfo->stbJoin.basic.srcScan[1]) {
        code = buildSingleTableScanOperatorParam(&pRightSrc, 1, rightVg, rightUid);
      } else {
        code = buildExchangeOperatorParam(&pRightSrc, 1, rightVg, rightUid);
      }
    }
  } else if (pPrev->leftHash) {
    // Batch mode: the per-vgroup hashes are consumed here.
    if (pInfo->stbJoin.basic.srcScan[0]) {
      code = buildBatchTableScanOperatorParam(&pLeftSrc, 0, pPrev->leftHash);
    } else {
      code = buildBatchExchangeOperatorParam(&pLeftSrc, 0, pPrev->leftHash);
    }
    if (TSDB_CODE_SUCCESS == code) {
      if (pInfo->stbJoin.basic.srcScan[1]) {
        code = buildBatchTableScanOperatorParam(&pRightSrc, 1, pPrev->rightHash);
      } else {
        code = buildBatchExchangeOperatorParam(&pRightSrc, 1, pPrev->rightHash);
      }
    }
    if (TSDB_CODE_SUCCESS == code) {
      tSimpleHashCleanup(pPrev->leftHash);
      tSimpleHashCleanup(pPrev->rightHash);
      pPrev->leftHash = NULL;
      pPrev->rightHash = NULL;
    }
  }

  // Sampled before the source params are handed to the group cache builders.
  bool initParam = (NULL != pLeftSrc);

  do {
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }

    code = buildGroupCacheOperatorParam(&pLeftGc, 0, *leftVg, *leftUid, pPost->leftNeedCache, pLeftSrc);
    // The local handle is dropped whatever the result of the call.
    pLeftSrc = NULL;
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }

    code = buildGroupCacheOperatorParam(&pRightGc, 1, *rightVg, *rightUid, pPost->rightNeedCache, pRightSrc);
    pRightSrc = NULL;
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }

    code = buildMergeJoinOperatorParam(ppParam, initParam, &pLeftGc, &pRightGc);
  } while (0);

  if (TSDB_CODE_SUCCESS == code) {
    return code;
  }

  // Failure: release whatever is still held locally.
  if (pLeftSrc) {
    freeOperatorParam(pLeftSrc, OP_GET_PARAM);
  }
  if (pRightSrc) {
    freeOperatorParam(pRightSrc, OP_GET_PARAM);
  }
  if (pLeftGc) {
    freeOperatorParam(pLeftGc, OP_GET_PARAM);
  }
  if (pRightGc) {
    freeOperatorParam(pRightGc, OP_GET_PARAM);
  }
  if (*ppParam) {
    freeOperatorParam(*ppParam, OP_GET_PARAM);
    *ppParam = NULL;
  }

  return code;
}
973

974
/*
 * Build a virtual-table-scan param for the table identified by uid.
 *
 * pInfo  supplies vtbScan.window, copied into the scan param
 * ppRes  [out] new param on success, NULL on failure
 * uid    stored in the scan param
 *
 * The result has an empty child array (capacity 1) and a
 * SVTableScanOperatorParam value with an empty pOpParamArray.
 *
 * Returns TSDB_CODE_SUCCESS or the terrno of the failing allocation.
 */
static int32_t buildVtbScanOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, uint64_t uid) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SVTableScanOperatorParam* pScanParam = NULL;
  SOperatorParam*           pOpParam = NULL;

  pOpParam = taosMemoryMalloc(sizeof(SOperatorParam));
  *ppRes = pOpParam;
  QUERY_CHECK_NULL(pOpParam, code, lino, _return, terrno)

  pOpParam->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL(pOpParam->pChildren, code, lino, _return, terrno)

  pScanParam = taosMemoryMalloc(sizeof(SVTableScanOperatorParam));
  QUERY_CHECK_NULL(pScanParam, code, lino, _return, terrno)

  pScanParam->pOpParamArray = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL(pScanParam->pOpParamArray, code, lino, _return, terrno)

  pScanParam->uid = uid;
  pScanParam->window = pInfo->vtbScan.window;

  pOpParam->opType = QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN;
  pOpParam->downstreamIdx = 0;
  pOpParam->value = pScanParam;
  pOpParam->reUse = false;

  return TSDB_CODE_SUCCESS;

_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pScanParam) {
    taosArrayDestroy(pScanParam->pOpParamArray);
    taosMemoryFree(pScanParam);
  }
  if (pOpParam) {
    taosArrayDestroy(pOpParam->pChildren);
    taosMemoryFree(pOpParam);
    *ppRes = NULL;
  }
  return code;
}
1009

1010
/*
 * Build an external-window param with one exchange child.
 *
 * pInfo  not referenced by this function
 * ppRes  [out] new param on success, NULL on failure
 * pWins  array read as SExtWinTimeWindow; it is duplicated into the param, and
 *        the exchange child gets the window [first.tw.skey, last.tw.ekey]
 * idx    stored as the param's downstreamIdx
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the
 * first/last window cannot be fetched from pWins, or the code of the failing
 * allocation / builder.
 */
static int32_t buildExternalWindowOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins, int32_t idx) {
  int32_t                       code = TSDB_CODE_SUCCESS;
  int32_t                       lino = 0;
  SExternalWindowOperatorParam* pExtWinOp = NULL;
  SOperatorParam*               pExchangeOperator = NULL;
  SExtWinTimeWindow*            firstWin = NULL;
  SExtWinTimeWindow*            lastWin = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)

  // The cleanup code below inspects pChildren, and the next two steps can fail
  // before it is assigned; give it a defined value first.
  (*ppRes)->pChildren = NULL;

  pExtWinOp = taosMemoryMalloc(sizeof(SExternalWindowOperatorParam));
  QUERY_CHECK_NULL(pExtWinOp, code, lino, _return, terrno)

  pExtWinOp->ExtWins = taosArrayDup(pWins, NULL);
  QUERY_CHECK_NULL(pExtWinOp->ExtWins, code, lino, _return, terrno)

  firstWin = (SExtWinTimeWindow *)taosArrayGet(pWins, 0);
  lastWin = (SExtWinTimeWindow *)taosArrayGet(pWins, taosArrayGetSize(pWins) - 1);
  // Both pointers are dereferenced below; fail cleanly instead of crashing if
  // either lookup yields nothing (e.g. an empty window list).
  QUERY_CHECK_NULL(firstWin, code, lino, _return, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR)
  QUERY_CHECK_NULL(lastWin, code, lino, _return, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR)

  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)

  STimeWindow twin = {.skey = firstWin->tw.skey, .ekey = lastWin->tw.ekey};
  code = buildExchangeOperatorParamForExternalWindow(&pExchangeOperator, 0, twin);
  QUERY_CHECK_CODE(code, lino, _return);

  if (NULL == taosArrayPush((*ppRes)->pChildren, &pExchangeOperator)) {
    code = terrno;
    lino = __LINE__;
    // The exchange param was built but never attached, so nothing else will
    // release it.
    freeOperatorParam(pExchangeOperator, OP_GET_PARAM);
    goto _return;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW;
  (*ppRes)->downstreamIdx = idx;
  (*ppRes)->value = pExtWinOp;
  (*ppRes)->reUse = false;

  return code;

_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pExtWinOp) {
    if (pExtWinOp->ExtWins) {
      taosArrayDestroy(pExtWinOp->ExtWins);
    }
    taosMemoryFree(pExtWinOp);
  }
  if (*ppRes) {
    if ((*ppRes)->pChildren) {
      taosArrayDestroy((*ppRes)->pChildren);
    }
    taosMemoryFree(*ppRes);
    *ppRes = NULL;
  }
  return code;
}
1059

1060
/*
 * Release an external-window param produced by
 * buildExternalWindowOperatorParam(): its SExternalWindowOperatorParam value
 * (including the duplicated window array), its child params, its child array
 * and finally the param itself. Used only on buildMergeOperatorParam()'s
 * failure paths. NULL is accepted.
 */
static void freeExtWinParamOnBuildFailure(SOperatorParam* pParam) {
  if (NULL == pParam) {
    return;
  }

  SExternalWindowOperatorParam* pExtWinOp = (SExternalWindowOperatorParam*)pParam->value;
  if (pExtWinOp) {
    if (pExtWinOp->ExtWins) {
      taosArrayDestroy(pExtWinOp->ExtWins);
    }
    taosMemoryFree(pExtWinOp);
  }

  if (pParam->pChildren) {
    int32_t childNum = (int32_t)taosArrayGetSize(pParam->pChildren);
    for (int32_t i = 0; i < childNum; i++) {
      // The array stores pointers, so each slot is a SOperatorParam*.
      SOperatorParam* pChild = *(SOperatorParam**)taosArrayGet(pParam->pChildren, i);
      if (pChild) {
        freeOperatorParam(pChild, OP_GET_PARAM);
      }
    }
    taosArrayDestroy(pParam->pChildren);
  }

  taosMemoryFree(pParam);
}

/*
 * Build a merge param whose children are numOfDownstream external-window
 * params, all built from the same window list.
 *
 * pInfo            forwarded to buildExternalWindowOperatorParam()
 * ppRes            [out] new param on success, NULL on failure
 * pWins            window list forwarded to every child
 * numOfDownstream  number of children to create (child i gets downstreamIdx i)
 * numOfWins        stored as SMergeOperatorParam.winNum
 *
 * Returns TSDB_CODE_SUCCESS or the code of the first failing step.
 */
static int32_t buildMergeOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins,
                                       int32_t numOfDownstream, int32_t numOfWins) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SMergeOperatorParam*      pMergeOp = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)

  (*ppRes)->pChildren = taosArrayInit(numOfDownstream, POINTER_BYTES);
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)

  pMergeOp = taosMemoryMalloc(sizeof(SMergeOperatorParam));
  QUERY_CHECK_NULL(pMergeOp, code, lino, _return, terrno)

  pMergeOp->winNum = numOfWins;

  for (int32_t i = 0; i < numOfDownstream; i++) {
    SOperatorParam* pExternalWinParam = NULL;
    code = buildExternalWindowOperatorParam(pInfo, &pExternalWinParam, pWins, i);
    QUERY_CHECK_CODE(code, lino, _return);

    if (NULL == taosArrayPush((*ppRes)->pChildren, &pExternalWinParam)) {
      code = terrno;
      lino = __LINE__;
      // Not attached to the child array, so the cleanup loop below will not
      // see it; release it here.
      freeExtWinParamOnBuildFailure(pExternalWinParam);
      goto _return;
    }
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->value = pMergeOp;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;

_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pMergeOp) {
    taosMemoryFree(pMergeOp);
  }
  if (*ppRes) {
    if ((*ppRes)->pChildren) {
      int32_t childNum = (int32_t)taosArrayGetSize((*ppRes)->pChildren);
      for (int32_t i = 0; i < childNum; i++) {
        // taosArrayGet() yields the address of the slot; the slot holds the
        // child pointer, so dereference once to get the param itself.
        SOperatorParam* pChildParam = *(SOperatorParam**)taosArrayGet((*ppRes)->pChildren, i);
        freeExtWinParamOnBuildFailure(pChildParam);
      }
      taosArrayDestroy((*ppRes)->pChildren);
    }
    taosMemoryFree(*ppRes);
    *ppRes = NULL;
  }
  return code;
}
1117

1118
/*
 * Build a hash-agg param whose single child is a batch exchange param built
 * from pInfo->vtbScan.otbVgIdToOtbInfoArrayMap.
 *
 * pInfo  supplies the vtbScan section
 * ppRes  [out] receives the new param on success; untouched on failure
 *
 * Returns TSDB_CODE_SUCCESS or the code of the first failing step.
 */
static int32_t buildAggOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SOperatorParam*           pParam = NULL;
  SOperatorParam*           pExchangeParam = NULL;
  SVtbScanDynCtrlInfo*      pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  bool                      freeExchange = false;

  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)

  // The struct comes from malloc; define the header before any failure can
  // route it to freeOperatorParam() in the cleanup code.
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
  pParam->downstreamIdx = 0;
  pParam->value = NULL;
  pParam->reUse = false;

  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno)

  // NOTE(review): the SAggOperatorParam contents are not initialised here,
  // same as before this change; confirm no consumer reads them.
  pParam->value = taosMemoryMalloc(sizeof(SAggOperatorParam));
  QUERY_CHECK_NULL(pParam->value, code, lino, _return, terrno)

  code = buildBatchExchangeOperatorParamForVSAgg(&pExchangeParam, 0, NULL, 0, pVtbScan->otbVgIdToOtbInfoArrayMap);
  QUERY_CHECK_CODE(code, lino, _return);

  // The exchange param is held only by this function until the push succeeds.
  freeExchange = true;

  QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pExchangeParam), code, lino, _return, terrno)

  freeExchange = false;

  *ppRes = pParam;

  return code;
_return:
  if (freeExchange) {
    freeOperatorParam(pExchangeParam, OP_GET_PARAM);
  }
  if (pParam) {
    freeOperatorParam(pParam, OP_GET_PARAM);
  }
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  return code;
}
1161

1162
/*
 * Build a hash-agg param for one group id.
 *
 * The group id is looked up in pInfo->vtbScan.vtbGroupIdToVgIdMapMap; the
 * stored value is read as a SHashObj* and used to build the single batch
 * exchange child.
 *
 * pInfo    supplies the vtbScan section
 * groupid  lookup key, also forwarded to the exchange builder
 * ppRes    [out] new param on success; NULL (with TSDB_CODE_SUCCESS) when the
 *          group id is not in the map; untouched on failure
 *
 * Returns TSDB_CODE_SUCCESS or the code of the first failing step.
 */
static int32_t buildAggOperatorParamWithGroupId(SDynQueryCtrlOperatorInfo* pInfo, uint64_t groupid, SOperatorParam** ppRes) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SVtbScanDynCtrlInfo*      pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SOperatorParam*           pParam = NULL;
  SOperatorParam*           pExchangeParam = NULL;
  SHashObj*                 otbVgIdToOtbInfoArrayMap = NULL;
  bool                      freeExchange = false;
  void*                     pIter = taosHashGet(pVtbScan->vtbGroupIdToVgIdMapMap, &groupid, sizeof(groupid));

  if (!pIter) {
    *ppRes = NULL;
    return code;
  }

  otbVgIdToOtbInfoArrayMap = *(SHashObj**)pIter;

  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)

  // The struct comes from malloc; define the header before any failure can
  // route it to freeOperatorParam() in the cleanup code.
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
  pParam->downstreamIdx = 0;
  pParam->value = NULL;
  pParam->reUse = false;

  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno)

  code = buildBatchExchangeOperatorParamForVSAgg(&pExchangeParam, 0, NULL, groupid, otbVgIdToOtbInfoArrayMap);
  QUERY_CHECK_CODE(code, lino, _return);

  // The exchange param is held only by this function until the push succeeds.
  freeExchange = true;

  QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pExchangeParam), code, lino, _return, terrno)

  freeExchange = false;

  *ppRes = pParam;

  return code;
_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (freeExchange) {
    freeOperatorParam(pExchangeParam, OP_GET_PARAM);
  }
  if (pParam) {
    freeOperatorParam(pParam, OP_GET_PARAM);
  }
  return code;
}
1212

1213
/*
 * Build the batch exchange param for a single child table.
 *
 * The uid is looked up in pInfo->vtbScan.vtbUidToVgIdMapMap; the stored value
 * is read as a SHashObj* and handed to
 * buildBatchExchangeOperatorParamForVSAgg().
 *
 * pInfo     supplies the vtbScan section
 * uid       lookup key
 * groupid   forwarded to the exchange builder
 * pTagList  forwarded to the exchange builder
 * ppRes     [out] new param on success; NULL (with TSDB_CODE_SUCCESS) when the
 *           uid is not in the map; untouched on failure
 *
 * Returns TSDB_CODE_SUCCESS or the code from the exchange builder.
 */
static int32_t buildAggOperatorParamForSingleChild(SDynQueryCtrlOperatorInfo* pInfo, tb_uid_t uid, uint64_t groupid, SArray* pTagList, SOperatorParam** ppRes) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SVtbScanDynCtrlInfo*      pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SOperatorParam*           pParam = NULL;
  // Single lookup: the result is both the presence test and the value source
  // (the map used to be queried a second time and that result dereferenced
  // without a check).
  SHashObj**                ppOtbVgIdToOtbInfoArrayMap = taosHashGet(pVtbScan->vtbUidToVgIdMapMap, &uid, sizeof(uid));

  if (NULL == ppOtbVgIdToOtbInfoArrayMap) {
    *ppRes = NULL;
    return code;
  }

  code = buildBatchExchangeOperatorParamForVSAgg(&pParam, 0, pTagList, groupid, *ppOtbVgIdToOtbInfoArrayMap);
  QUERY_CHECK_CODE(code, lino, _return);

  *ppRes = pParam;

  return code;
_return:
  if (pParam) {
    freeOperatorParam(pParam, OP_GET_PARAM);
  }
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  return code;
}
1240

1241
/*
 * Start the post-join retrieval for the current table pair: build the
 * merge-join param and fetch the first block from downstream[1] with it.
 *
 * pOperator  dynamic query control operator
 * ppRes      [out] block returned by the downstream operator (may be NULL)
 *
 * A failure to build the param, or a block that fails blockDataCheck(), is
 * stored in pTaskInfo->code and raised with T_LONG_JMP.
 */
static void seqJoinLaunchNewRetrieveImpl(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SStbJoinDynCtrlInfo*       pJoinInfo = (SStbJoinDynCtrlInfo*)&pCtrl->stbJoin;
  SOperatorParam*            pJoinParam = NULL;

  int32_t code = buildSeqStbJoinOperatorParam(pCtrl, &pJoinInfo->ctx.prev, &pJoinInfo->ctx.post, &pJoinParam);
  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  qDebug("%s dynamic post task begin", GET_TASKID(pOperator->pTaskInfo));
  code = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pJoinParam, ppRes);

  // NOTE(review): a non-zero code from getNextExtFn is only logged as an
  // "empty" result here and not propagated - confirm this is intended.
  if (NULL == *ppRes || 0 != code) {
    qDebug("%s Empty join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
    return;
  }

  code = blockDataCheck(*ppRes);
  if (code) {
    qError("Invalid block data, blockDataCheck failed, error:%s", tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  // A valid block arrived: mark the post join as running and update the stats.
  pJoinInfo->ctx.post.isStarted = true;
  pJoinInfo->execInfo.postBlkNum++;
  pJoinInfo->execInfo.postBlkRows += (*ppRes)->info.rows;
  qDebug("%s join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
}
3,701,480✔
1270

1271

UNCOV
1272
/*
 * Notify downstream[1] that the cache of the current left or right table is
 * finished.
 *
 * pOperator  dynamic query control operator
 * pPost      post-join context holding the current vgId/uid of both sides
 * leftTable  true: use the left table (downstream id 0); false: right (id 1)
 *
 * Builds a group-cache notify param, wraps it in a merge-join notify param
 * and passes that to optrDefaultNotifyFn().
 *
 * Returns the code of the first failing builder, otherwise the result of
 * optrDefaultNotifyFn().
 */
static int32_t notifySeqJoinTableCacheEnd(SOperatorInfo* pOperator, SStbJoinPostJoinCtx* pPost, bool leftTable) {
  SOperatorParam* pCacheNotify = NULL;
  SOperatorParam* pJoinNotify = NULL;
  int32_t         downstreamId = 0;
  int32_t         vgId = 0;
  int64_t         uid = 0;

  if (leftTable) {
    downstreamId = 0;
    vgId = pPost->leftVgId;
    uid = pPost->leftCurrUid;
  } else {
    downstreamId = 1;
    vgId = pPost->rightVgId;
    uid = pPost->rightCurrUid;
  }

  qDebug("notify table %" PRIu64 " in vgId %d downstreamId %d cache end", uid, vgId, downstreamId);

  int32_t code = buildGroupCacheNotifyOperatorParam(&pCacheNotify, downstreamId, vgId, uid);
  if (TSDB_CODE_SUCCESS == code) {
    code = buildMergeJoinNotifyOperatorParam(&pJoinNotify, pCacheNotify, NULL);
  }
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  return optrDefaultNotifyFn(pOperator->pDownstream[1], pJoinNotify);
}
1292

1293
/*
 * Bookkeeping after the post join of the current table pair has returned its
 * last block.
 *
 * pOperator  dynamic query control operator (used for the cache-end notify)
 * pStbJoin   stb-join control info
 *
 * Always clears ctx.post.isStarted. Nothing else is done in batch-fetch mode.
 * Otherwise:
 *  - when leftNeedCache is set and the current left uid has an entry in
 *    ctx.prev.leftCache, the entry's counter is decremented; on reaching zero
 *    the entry is removed and a left cache-end notification is sent;
 *  - when rightNeedCache is NOT set and the current right uid has an entry in
 *    ctx.prev.rightCache, the entry is removed and a right cache-end
 *    notification is sent.
 *
 * Returns TSDB_CODE_SUCCESS or the code of the failing remove / notify.
 */
static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCtrlInfo* pStbJoin) {
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
  int32_t              code = 0;

  pPost->isStarted = false;

  if (pStbJoin->basic.batchFetch) {
    return TSDB_CODE_SUCCESS;
  }

  if (pPost->leftNeedCache) {
    uint32_t* pRemain = tSimpleHashGet(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
    if (NULL != pRemain) {
      --(*pRemain);
      if (*pRemain <= 0) {
        code = tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
        if (code) {
          qError("tSimpleHashRemove leftCurrUid %" PRId64 " from leftCache failed, error:%s", pPost->leftCurrUid, tstrerror(code));
          QRY_ERR_RET(code);
        }
        QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, true));
      }
    }
  }

  if (!pPost->rightNeedCache &&
      NULL != tSimpleHashGet(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid))) {
    code = tSimpleHashRemove(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
    if (code) {
      qError("tSimpleHashRemove rightCurrUid %" PRId64 " from rightCache failed, error:%s", pPost->rightCurrUid, tstrerror(code));
      QRY_ERR_RET(code);
    }
    QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, false));
  }

  return TSDB_CODE_SUCCESS;
}
1329

1330

1331
/*
 * Fetch the next block of an already started post join from downstream 1.
 *
 * pOperator  dynamic query control operator
 * ppRes      [out] next block; left untouched when no post join is running
 *
 * When the downstream is exhausted (NULL block) the end-of-retrieve handling
 * runs and the head table list's read index advances by one; otherwise the
 * post block statistics are updated.
 *
 * Returns TSDB_CODE_SUCCESS or the code from handleSeqJoinCurrRetrieveEnd().
 */
static FORCE_INLINE int32_t seqJoinContinueCurrRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;

  if (!pCtrl->stbJoin.ctx.post.isStarted) {
    return TSDB_CODE_SUCCESS;
  }

  qDebug("%s dynQueryCtrl continue to retrieve block from post op", GET_TASKID(pOperator->pTaskInfo));

  *ppRes = getNextBlockFromDownstream(pOperator, 1);
  if (NULL != *ppRes) {
    pCtrl->stbJoin.execInfo.postBlkNum++;
    pCtrl->stbJoin.execInfo.postBlkRows += (*ppRes)->info.rows;
    return TSDB_CODE_SUCCESS;
  }

  // Current pair is done: finish it and move on to the next row of the list.
  QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, &pCtrl->stbJoin));
  pCtrl->stbJoin.ctx.prev.pListHead->readIdx++;

  return TSDB_CODE_SUCCESS;
}
1353

1354
/*
 * Append pVal to the array stored under pKey in pHash, creating the array
 * (initial capacity 10, element size valSize) when the key is new.
 *
 * pHash    hash whose values are SArray* pointers
 * pKey     key bytes
 * keySize  key length in bytes
 * pVal     element to append
 * valSize  element size used when a new array has to be created
 *
 * Returns TSDB_CODE_SUCCESS, terrno of a failed array allocation/push, or the
 * code returned by tSimpleHashPut().
 */
static FORCE_INLINE int32_t addToJoinVgroupHash(SSHashObj* pHash, void* pKey, int32_t keySize, void* pVal, int32_t valSize) {
  int32_t  code = TSDB_CODE_SUCCESS;
  SArray** ppArray = tSimpleHashGet(pHash, pKey, keySize);

  if (NULL != ppArray) {
    if (NULL == taosArrayPush(*ppArray, pVal)) {
      return terrno;
    }
    return TSDB_CODE_SUCCESS;
  }

  SArray* pArray = taosArrayInit(10, valSize);
  if (NULL == pArray) {
    return terrno;
  }

  if (NULL == taosArrayPush(pArray, pVal)) {
    // Read terrno before the cleanup call so the reported code is the one set
    // by the failed push.
    code = terrno;
    taosArrayDestroy(pArray);
    return code;
  }

  // Report the code tSimpleHashPut() itself returned rather than whatever
  // terrno happens to hold after the array has been destroyed.
  code = tSimpleHashPut(pHash, pKey, keySize, &pArray, POINTER_BYTES);
  if (code) {
    taosArrayDestroy(pArray);
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
1378

1379
/*
 * Count one more occurrence of pKey in pHash and keep pOnceHash in step.
 *
 * pHash      key -> uint32_t counter
 * pOnceHash  receives the key (with an empty value) the first time it is seen
 *            and loses it when the counter moves from 1 to 2
 * pKey       key bytes
 * keySize    key length in bytes
 *
 * Counter handling for a key that already exists:
 *   0           left unchanged
 *   UINT32_MAX  reset to 0
 *   otherwise   incremented (and the key is removed from pOnceHash first when
 *               the counter is exactly 1)
 *
 * Returns TSDB_CODE_SUCCESS or the code of the failing hash operation.
 */
static FORCE_INLINE int32_t addToJoinTableHash(SSHashObj* pHash, SSHashObj* pOnceHash, void* pKey, int32_t keySize) {
  int32_t   code = TSDB_CODE_SUCCESS;
  uint32_t* pCount = tSimpleHashGet(pHash, pKey, keySize);

  if (NULL == pCount) {
    // First occurrence: counter starts at 1 and the key is in the once-hash.
    uint32_t first = 1;
    code = tSimpleHashPut(pHash, pKey, keySize, &first, sizeof(first));
    if (code) {
      return code;
    }
    code = tSimpleHashPut(pOnceHash, pKey, keySize, NULL, 0);
    if (code) {
      return code;
    }
    return TSDB_CODE_SUCCESS;
  }

  if (0 == *pCount) {
    return TSDB_CODE_SUCCESS;
  }

  if (UINT32_MAX == *pCount) {
    *pCount = 0;
    return TSDB_CODE_SUCCESS;
  }

  if (1 == *pCount) {
    // Second occurrence: the key no longer belongs in the once-hash.
    code = tSimpleHashRemove(pOnceHash, pKey, keySize);
    if (code) {
      qError("tSimpleHashRemove failed in addToJoinTableHash, error:%s", tstrerror(code));
      QRY_ERR_RET(code);
    }
  }
  (*pCount)++;

  return TSDB_CODE_SUCCESS;
}
1415

1416

1417
/*
 * Release one table list node: its four vgId/uid arrays and the node itself.
 * The pNext link is not followed. NULL is accepted and ignored.
 */
static void freeStbJoinTableList(SStbJoinTableList* pList) {
  if (pList) {
    taosMemoryFree(pList->pLeftVg);
    taosMemoryFree(pList->pLeftUid);
    taosMemoryFree(pList->pRightVg);
    taosMemoryFree(pList->pRightUid);
    taosMemoryFree(pList);
  }
}
1427

1428
/*
 * Copy `rows` entries of the four id arrays into a newly allocated node and
 * append that node to the tail of the list kept in pCtx.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when an allocation fails (in which case
 * nothing is appended and everything allocated so far is released).
 */
static int32_t appendStbJoinTableList(SStbJoinPrevJoinCtx* pCtx, int64_t rows, int32_t* pLeftVg, int64_t* pLeftUid, int32_t* pRightVg, int64_t* pRightUid) {
  SStbJoinTableList* pNode = taosMemoryCalloc(1, sizeof(SStbJoinTableList));
  if (NULL == pNode) {
    return terrno;
  }

  // allocate the four arrays in order; `||` stops at the first failure
  if (NULL == (pNode->pLeftVg = taosMemoryMalloc(rows * sizeof(*pLeftVg))) ||
      NULL == (pNode->pLeftUid = taosMemoryMalloc(rows * sizeof(*pLeftUid))) ||
      NULL == (pNode->pRightVg = taosMemoryMalloc(rows * sizeof(*pRightVg))) ||
      NULL == (pNode->pRightUid = taosMemoryMalloc(rows * sizeof(*pRightUid)))) {
    // read terrno before freeing anything
    int32_t code = terrno;
    freeStbJoinTableList(pNode);
    return code;
  }

  TAOS_MEMCPY(pNode->pLeftVg, pLeftVg, rows * sizeof(*pLeftVg));
  TAOS_MEMCPY(pNode->pLeftUid, pLeftUid, rows * sizeof(*pLeftUid));
  TAOS_MEMCPY(pNode->pRightVg, pRightVg, rows * sizeof(*pRightVg));
  TAOS_MEMCPY(pNode->pRightUid, pRightUid, rows * sizeof(*pRightUid));

  pNode->readIdx = 0;
  pNode->uidNum = rows;
  pNode->pNext = NULL;

  if (NULL == pCtx->pListTail) {
    // empty list: the new node is both head and tail
    pCtx->pListHead = pNode;
  } else {
    pCtx->pListTail->pNext = pNode;
  }
  pCtx->pListTail = pNode;

  return TSDB_CODE_SUCCESS;
}
1478

1479
/*
 * Feed one downstream block into the join bookkeeping.
 *
 * The block is expected to carry the vgroup-id and uid columns of both join
 * sides at the slots recorded in pStbJoin->basic. Depending on
 * basic.batchFetch, each row is added either to the per-vgroup hashes
 * (left and right) or to the left-table reference counter. Afterwards the
 * raw id columns are copied into a new table-list node and tableNum is
 * advanced by the number of rows.
 *
 * On any failure the code is stored in pTaskInfo->code and control leaves
 * through T_LONG_JMP.
 */
static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pCtrl->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;

  SColumnInfoData* pVg0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[0]);
  if (NULL == pVg0) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  SColumnInfoData* pVg1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[1]);
  if (NULL == pVg1) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  SColumnInfoData* pUid0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[0]);
  if (NULL == pUid0) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  SColumnInfoData* pUid1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[1]);
  if (NULL == pUid1) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  if (!pStbJoin->basic.batchFetch) {
    // sequential fetch: only count how often each left uid shows up
    for (int32_t row = 0; row < pBlock->info.rows; ++row) {
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * row);

      code = addToJoinTableHash(pPrev->leftCache, pPrev->onceTable, leftUid, sizeof(*leftUid));
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
    }
  } else {
    // batch fetch: group the uids of both sides by vgroup id
    for (int32_t row = 0; row < pBlock->info.rows; ++row) {
      int32_t* leftVg = (int32_t*)(pVg0->pData + pVg0->info.bytes * row);
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * row);
      int32_t* rightVg = (int32_t*)(pVg1->pData + pVg1->info.bytes * row);
      int64_t* rightUid = (int64_t*)(pUid1->pData + pUid1->info.bytes * row);

      code = addToJoinVgroupHash(pPrev->leftHash, leftVg, sizeof(*leftVg), leftUid, sizeof(*leftUid));
      if (TSDB_CODE_SUCCESS == code) {
        code = addToJoinVgroupHash(pPrev->rightHash, rightVg, sizeof(*rightVg), rightUid, sizeof(*rightUid));
      }
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = appendStbJoinTableList(pPrev, pBlock->info.rows, (int32_t*)pVg0->pData, (int64_t*)pUid0->pData,
                                  (int32_t*)pVg1->pData, (int64_t*)pUid1->pData);
    if (TSDB_CODE_SUCCESS == code) {
      pPrev->tableNum += pBlock->info.rows;
    }
  }

_return:

  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }
}
71,179✔
1541

1542

1543
/*
 * Trim the left-table reference cache once all downstream blocks were read.
 *
 * Nothing is done in batch-fetch mode. Otherwise:
 *   - if leftCache and onceTable have the same size, leftCache is cleared;
 *   - else every entry yielded by iterating onceTable is removed from
 *     leftCache and the remaining size is recorded in execInfo.leftCacheNum.
 * A failed removal is only logged.
 */
static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pCtrl->stbJoin;

  if (pStbJoin->basic.batchFetch) {
    return;
  }

  if (tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache) == tSimpleHashGetSize(pStbJoin->ctx.prev.onceTable)) {
    tSimpleHashClear(pStbJoin->ctx.prev.leftCache);
    return;
  }

  int32_t   iter = 0;
  uint64_t* pUid = tSimpleHashIterate(pStbJoin->ctx.prev.onceTable, NULL, &iter);
  while (NULL != pUid) {
    int32_t code = tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, pUid, sizeof(*pUid));
    if (code) {
      qError("tSimpleHashRemove failed in postProcessStbJoinTableHash, error:%s", tstrerror(code));
    }
    pUid = tSimpleHashIterate(pStbJoin->ctx.prev.onceTable, pUid, &iter);
  }

  pStbJoin->execInfo.leftCacheNum = tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache);
  qDebug("more than 1 ref build table num %" PRId64, (int64_t)tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache));
}
1578

1579
/*
 * Drain downstream operator 0, feeding every returned block into
 * doBuildStbJoinTableHash() and counting blocks/rows in execInfo, then
 * post-process the collected hashes and mark the build phase as finished.
 */
static void buildStbJoinTableList(SOperatorInfo* pOperator) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pCtrl->stbJoin;
  SSDataBlock*               pBlock = NULL;

  while (NULL != (pBlock = getNextBlockFromDownstream(pOperator, 0))) {
    pStbJoin->execInfo.prevBlkNum++;
    pStbJoin->execInfo.prevBlkRows += pBlock->info.rows;

    doBuildStbJoinTableHash(pOperator, pBlock);
  }

  postProcessStbJoinTableHash(pOperator);

  pStbJoin->ctx.prev.joinBuild = true;
}
1,151,141✔
1599

1600
/*
 * Walk the pending table list and launch retrievals until one produces a
 * result block.
 *
 * Fully consumed nodes (readIdx >= uidNum) are unlinked from the head of the
 * list and freed. For the current head, seqJoinLaunchNewRetrieveImpl() is
 * invoked; if it yields a block, that block is returned through ppRes.
 * Otherwise the current retrieval is finished and the head's readIdx advances.
 * When the list is exhausted, *ppRes is set to NULL and the operator is
 * marked completed.
 *
 * Returns TSDB_CODE_SUCCESS, or the error from handleSeqJoinCurrRetrieveEnd().
 */
static int32_t seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
  SStbJoinTableList*         pNode = pPrev->pListHead;

  while (pNode) {
    if (pNode->readIdx >= pNode->uidNum) {
      pPrev->pListHead = pNode->pNext;
      if (NULL == pPrev->pListHead) {
        // the freed node was also the tail; do not leave pListTail pointing
        // at released memory, a later append dereferences it
        pPrev->pListTail = NULL;
      }
      freeStbJoinTableList(pNode);
      pNode = pPrev->pListHead;
      continue;
    }

    seqJoinLaunchNewRetrieveImpl(pOperator, ppRes);
    if (*ppRes) {
      return TSDB_CODE_SUCCESS;
    }

    QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, pStbJoin));
    pPrev->pListHead->readIdx++;
  }

  *ppRes = NULL;
  setOperatorCompleted(pOperator);

  return TSDB_CODE_SUCCESS;
}
1628

1629
/*
 * Shape a result block according to the operator's output descriptor.
 *
 * The block id is set to the descriptor's dataBlockId, and for every slot of
 * the descriptor beyond the columns the block already has, a new column of
 * the slot's type is created with capacity for the block's rows and appended.
 *
 * pBlock may be NULL (nothing to do). Returns TSDB_CODE_SUCCESS,
 * TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the descriptor or a slot is
 * missing, or the error of the column helpers.
 */
static int32_t seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) {
  if (NULL == pBlock) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == pStbJoin || NULL == pStbJoin->pOutputDataBlockDesc) {
    qError("seqStableJoinComposeRes: pStbJoin or pOutputDataBlockDesc is NULL");
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  pBlock->info.id.blockId = pStbJoin->pOutputDataBlockDesc->dataBlockId;

  // no column array or no slot list: there is nothing to pad
  if (NULL == pBlock->pDataBlock || NULL == pStbJoin->pOutputDataBlockDesc->pSlots) {
    return TSDB_CODE_SUCCESS;
  }

  for (int i = (int)pBlock->pDataBlock->size; i < pStbJoin->pOutputDataBlockDesc->pSlots->length; i++) {
    SSlotDescNode* pSlot = (SSlotDescNode*)nodesListGetNode(pStbJoin->pOutputDataBlockDesc->pSlots, i);
    if (pSlot == NULL) {
      qError("seqStableJoinComposeRes: pSlot is NULL, i:%d", i);
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }

    SColumnInfoData colInfo = createColumnInfoData(pSlot->dataType.type, pSlot->dataType.bytes, pSlot->slotId);
    int32_t         code = colInfoDataEnsureCapacity(&colInfo, pBlock->info.rows, true);
    if (code == TSDB_CODE_SUCCESS) {
      code = blockDataAppendColInfo(pBlock, &colInfo);
    }
    if (code != TSDB_CODE_SUCCESS) {
      // NOTE(review): assumes a failed blockDataAppendColInfo() leaves the
      // column buffers owned by the caller - confirm against tdatablock.c
      colDataDestroy(&colInfo);
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
1658

1659
/*
 * Produce the next result block of the sequential super-table join.
 *
 * On the first call the table list is built from downstream; if downstream
 * delivered no rows the operator is completed right away. Otherwise the
 * retrieval in progress is continued and, when that yields nothing, a new one
 * is launched. While cost.openCost is still 0 the elapsed time of this call
 * is stored into it (in milliseconds).
 *
 * On failure the code is stored in pTaskInfo->code and control leaves through
 * T_LONG_JMP; on success the block in *pRes (possibly NULL) is passed to
 * seqStableJoinComposeRes() and its code is returned.
 */
int32_t seqStableJoin(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pCtrl->stbJoin;

  QRY_PARAM_CHECK(pRes);
  if (OP_EXEC_DONE == pOperator->status) {
    return code;
  }

  int64_t startUs = (pOperator->cost.openCost == 0) ? taosGetTimestampUs() : 0;
  bool    nothingToJoin = false;

  if (!pStbJoin->ctx.prev.joinBuild) {
    buildStbJoinTableList(pOperator);
    if (pStbJoin->execInfo.prevBlkRows <= 0) {
      setOperatorCompleted(pOperator);
      nothingToJoin = true;
    }
  }

  if (!nothingToJoin) {
    QRY_ERR_JRET(seqJoinContinueCurrRetrieve(pOperator, pRes));
    if (NULL == *pRes) {
      QRY_ERR_JRET(seqJoinLaunchNewRetrieve(pOperator, pRes));
    }
  }

_return:
  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - startUs) / 1000.0;
  }

  if (code) {
    qError("%s failed since %s", __func__, tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  } else {
    code = seqStableJoinComposeRes(pStbJoin, *pRes);
  }
  return code;
}
1703

1704
int32_t dynProcessUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
2,831,623✔
1705
  int32_t                    lino = 0;
2,831,623✔
1706
  SOperatorInfo*             operator=(SOperatorInfo*) param;
2,831,623✔
1707
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)operator->info;
2,831,623✔
1708

1709
  if (TSDB_CODE_SUCCESS != code) {
2,831,623✔
UNCOV
1710
    operator->pTaskInfo->code = rpcCvtErrCode(code);
×
1711
    if (operator->pTaskInfo->code != code) {
×
1712
      qError("load systable rsp received, error:%s, cvted error:%s", tstrerror(code),
×
1713
             tstrerror(operator->pTaskInfo->code));
1714
    } else {
UNCOV
1715
      qError("load systable rsp received, error:%s", tstrerror(code));
×
1716
    }
UNCOV
1717
    goto _return;
×
1718
  }
1719

1720
  pScanResInfo->vtbScan.pRsp = taosMemoryMalloc(sizeof(SUseDbRsp));
2,831,623✔
1721
  QUERY_CHECK_NULL(pScanResInfo->vtbScan.pRsp, code, lino, _return, terrno)
2,831,623✔
1722

1723
  code = tDeserializeSUseDbRsp(pMsg->pData, (int32_t)pMsg->len, pScanResInfo->vtbScan.pRsp);
2,831,623✔
1724
  QUERY_CHECK_CODE(code, lino, _return);
2,831,623✔
1725

1726
  taosMemoryFreeClear(pMsg->pData);
2,831,623✔
1727

1728
  code = tsem_post(&pScanResInfo->vtbScan.ready);
2,831,623✔
1729
  QUERY_CHECK_CODE(code, lino, _return);
2,831,623✔
1730

1731
  return code;
2,831,623✔
UNCOV
1732
_return:
×
1733
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1734
  return code;
×
1735
}
1736

1737
static int32_t buildDbVgInfoMap(SOperatorInfo* pOperator, SMsgCb* pMsgCb, SName* name, SExecTaskInfo* pTaskInfo, SUseDbOutput* output) {
2,831,623✔
1738
  int32_t                    code = TSDB_CODE_SUCCESS;
2,831,623✔
1739
  int32_t                    lino = 0;
2,831,623✔
1740
  char*                      buf1 = NULL;
2,831,623✔
1741
  SUseDbReq*                 pReq = NULL;
2,831,623✔
1742
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)pOperator->info;
2,831,623✔
1743

1744
  pReq = taosMemoryMalloc(sizeof(SUseDbReq));
2,831,623✔
1745
  QUERY_CHECK_NULL(pReq, code, lino, _return, terrno)
2,831,623✔
1746
  code = tNameGetFullDbName(name, pReq->db);
2,831,623✔
1747
  QUERY_CHECK_CODE(code, lino, _return);
2,831,623✔
1748
  int32_t contLen = tSerializeSUseDbReq(NULL, 0, pReq);
2,831,623✔
1749
  buf1 = taosMemoryCalloc(1, contLen);
2,831,623✔
1750
  QUERY_CHECK_NULL(buf1, code, lino, _return, terrno)
2,831,623✔
1751
  int32_t tempRes = tSerializeSUseDbReq(buf1, contLen, pReq);
2,831,623✔
1752
  if (tempRes < 0) {
2,831,623✔
UNCOV
1753
    QUERY_CHECK_CODE(terrno, lino, _return);
×
1754
  }
1755

1756
  // send the fetch remote task result request
1757
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
2,831,623✔
1758
  QUERY_CHECK_NULL(pMsgSendInfo, code, lino, _return, terrno)
2,831,623✔
1759

1760
  pMsgSendInfo->param = pOperator;
2,831,623✔
1761
  pMsgSendInfo->msgInfo.pData = buf1;
2,831,623✔
1762
  pMsgSendInfo->msgInfo.len = contLen;
2,831,623✔
1763
  pMsgSendInfo->msgType = TDMT_MND_GET_DB_INFO;
2,831,623✔
1764
  pMsgSendInfo->fp = dynProcessUseDbRsp;
2,831,623✔
1765
  pMsgSendInfo->requestId = pTaskInfo->id.queryId;
2,831,623✔
1766

1767
  code = asyncSendMsgToServer(pMsgCb->clientRpc, &pScanResInfo->vtbScan.epSet, NULL, pMsgSendInfo);
2,831,623✔
1768
  QUERY_CHECK_CODE(code, lino, _return);
2,831,623✔
1769

1770
  code = tsem_wait(&pScanResInfo->vtbScan.ready);
2,831,623✔
1771
  QUERY_CHECK_CODE(code, lino, _return);
2,831,623✔
1772

1773
  code = queryBuildUseDbOutput(output, pScanResInfo->vtbScan.pRsp);
2,831,623✔
1774
  QUERY_CHECK_CODE(code, lino, _return);
2,831,623✔
1775

1776
_return:
2,831,623✔
1777
  if (code) {
2,831,623✔
UNCOV
1778
     qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1779
     taosMemoryFree(buf1);
×
1780
  }
1781
  taosMemoryFree(pReq);
2,831,623✔
1782
  tFreeSUsedbRsp(pScanResInfo->vtbScan.pRsp);
2,831,623✔
1783
  taosMemoryFreeClear(pScanResInfo->vtbScan.pRsp);
2,831,623✔
1784
  return code;
2,831,623✔
1785
}
1786

1787
int dynVgInfoComp(const void* lp, const void* rp) {
5,652,682✔
1788
  SVgroupInfo* pLeft = (SVgroupInfo*)lp;
5,652,682✔
1789
  SVgroupInfo* pRight = (SVgroupInfo*)rp;
5,652,682✔
1790
  if (pLeft->hashBegin < pRight->hashBegin) {
5,652,682✔
1791
    return -1;
5,652,682✔
UNCOV
1792
  } else if (pLeft->hashBegin > pRight->hashBegin) {
×
1793
    return 1;
×
1794
  }
1795

UNCOV
1796
  return 0;
×
1797
}
1798

1799
/*
 * Lazily build dbInfo->vgArray: a copy of all entries of dbInfo->vgHash,
 * sorted with sort_func.
 *
 * Nothing is done when dbInfo is NULL, when there is no hash, or when the
 * array already exists. The array is attached to dbInfo only after it has
 * been filled and sorted completely, so a failed build never leaves a
 * partial array behind that later calls would mistake for a finished one.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when an allocation fails.
 */
int32_t dynMakeVgArraySortBy(SDBVgInfo* dbInfo, __compar_fn_t sort_func) {
  if (NULL == dbInfo) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == dbInfo->vgHash || NULL != dbInfo->vgArray) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t vgSize = taosHashGetSize(dbInfo->vgHash);
  SArray* pVgArray = taosArrayInit(vgSize, sizeof(SVgroupInfo));
  if (NULL == pVgArray) {
    return terrno;
  }

  void* pIter = taosHashIterate(dbInfo->vgHash, NULL);
  while (pIter) {
    if (NULL == taosArrayPush(pVgArray, pIter)) {
      // read terrno before the cleanup calls get a chance to change it
      int32_t code = terrno;
      taosHashCancelIterate(dbInfo->vgHash, pIter);
      taosArrayDestroy(pVgArray);
      return code;
    }

    pIter = taosHashIterate(dbInfo->vgHash, pIter);
  }

  taosArraySort(pVgArray, sort_func);
  dbInfo->vgArray = pVgArray;

  return TSDB_CODE_SUCCESS;
}
1826

1827
int32_t dynHashValueComp(void const* lp, void const* rp) {
25,630,754✔
1828
  uint32_t*    key = (uint32_t*)lp;
25,630,754✔
1829
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
25,630,754✔
1830

1831
  if (*key < pVg->hashBegin) {
25,630,754✔
UNCOV
1832
    return -1;
×
1833
  } else if (*key > pVg->hashEnd) {
25,630,754✔
1834
    return 1;
8,699,054✔
1835
  }
1836

1837
  return 0;
16,931,700✔
1838
}
1839

1840
/*
 * Resolve the vgroup that owns table `tbName` of database `dbFName`.
 *
 * The sorted vgroup array of dbInfo is built on demand, the hash of
 * "<dbFName>.<tbName>" is computed with the database's hash settings, and the
 * vgroup whose hash range contains that value is looked up.
 *
 * dbInfo   vgroup information of the database
 * dbFName  full database name
 * vgId     [out] id of the owning vgroup, written only on success
 * tbName   table name
 *
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
int32_t getVgId(SDBVgInfo* dbInfo, char* dbFName, int32_t* vgId, char *tbName) {
  int32_t code = 0;
  int32_t lino = 0;

  // dynMakeVgArraySortBy() accepts a NULL dbInfo, but everything below
  // dereferences it, so reject it (and the other pointers) up front
  if (NULL == dbInfo || NULL == dbFName || NULL == vgId || NULL == tbName) {
    qError("%s got an invalid NULL parameter", __func__);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  code = dynMakeVgArraySortBy(dbInfo, dynVgInfoComp);
  QUERY_CHECK_CODE(code, lino, _return);

  int32_t vgNum = (int32_t)taosArrayGetSize(dbInfo->vgArray);
  if (vgNum <= 0) {
    qError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum);
    QUERY_CHECK_CODE(code = TSDB_CODE_TSC_DB_NOT_SELECTED, lino, _return);
  }

  SVgroupInfo* vgInfo = NULL;
  char         tbFullName[TSDB_TABLE_FNAME_LEN];
  int32_t      fullLen = snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbFName, tbName);
  if (fullLen < 0 || fullLen >= (int32_t)sizeof(tbFullName)) {
    // hashing a truncated name could only resolve to the wrong vgroup
    qError("table full name too long, db:%s, table:%s", dbFName, tbName);
    QUERY_CHECK_CODE(code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR, lino, _return);
  }

  uint32_t hashValue = taosGetTbHashVal(tbFullName, fullLen, dbInfo->hashMethod,
                                        dbInfo->hashPrefix, dbInfo->hashSuffix);

  vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, dynHashValueComp, TD_EQ);
  if (NULL == vgInfo) {
    qError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName,
           (int32_t)taosArrayGetSize(dbInfo->vgArray));
    return TSDB_CODE_CTG_INTERNAL_ERROR;
  }

  *vgId = vgInfo->vgId;

_return:
  return code;
}
1873

1874
/*
 * Tell whether column `colId` has to be scanned: always when scanAllCols is
 * set, otherwise only when the id is listed in readColList.
 */
bool colNeedScan(SOperatorInfo* pOperator, col_id_t colId) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pCtrl->vtbScan;
  SArray*                    pWanted = pVtbScan->readColList;

  if (pVtbScan->scanAllCols) {
    return true;
  }

  int32_t idx = 0;
  while (idx < taosArrayGetSize(pWanted)) {
    col_id_t* pCur = (col_id_t*)taosArrayGet(pWanted, idx);
    if (*pCur == colId) {
      return true;
    }
    idx++;
  }

  return false;
}
1888

1889
/*
 * Return the vgroup information of the database `name->dbname`.
 *
 * Results are cached in vtbScan.dbVgInfoMap keyed by database name. On a
 * cache miss the information is fetched through buildDbVgInfoMap() and the
 * new entry is stored in the map.
 *
 * pOperator  dynamic query control operator owning the cache
 * name       table name whose dbname selects the database
 * dbVgInfo   [out] vgroup information of that database
 *
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
int32_t getDbVgInfo(SOperatorInfo* pOperator, SName *name, SDBVgInfo **dbVgInfo) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SMsgCb*                    pMsgCb = pVtbScan->pMsgCb;
  SUseDbOutput*              output = NULL;
  SUseDbOutput**             find = NULL;

  QRY_PARAM_CHECK(dbVgInfo);

  find = (SUseDbOutput**)taosHashGet(pInfo->vtbScan.dbVgInfoMap, name->dbname, strlen(name->dbname));
  if (find == NULL) {
    // zero-initialized so the error path never releases indeterminate pointers
    output = taosMemoryCalloc(1, sizeof(SUseDbOutput));
    QUERY_CHECK_NULL(output, code, line, _return, terrno)
    code = buildDbVgInfoMap(pOperator, pMsgCb, name, pTaskInfo, output);
    QUERY_CHECK_CODE(code, line, _return);
    code = taosHashPut(pInfo->vtbScan.dbVgInfoMap, name->dbname, strlen(name->dbname), &output, POINTER_BYTES);
    QUERY_CHECK_CODE(code, line, _return);
  } else {
    output = *find;
  }

  *dbVgInfo = output->dbVgroup;
  return code;
_return:
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  if (NULL != output) {
    // NOTE(review): freeUseDbOutput() is defined outside this chunk; confirm it
    // expects the SUseDbOutput* itself rather than the address of a stored
    // pointer, as hash free callbacks in this file do
    freeUseDbOutput(output);
  }
  return code;
}
1918

1919
int32_t extractColRefName(const char *colref, char **refDb, char** refTb, char** refCol) {
47,266,252✔
1920
  int32_t     code = TSDB_CODE_SUCCESS;
47,266,252✔
1921
  int32_t     line = 0;
47,266,252✔
1922

1923
  const char *first_dot = strchr(colref, '.');
47,266,252✔
1924
  QUERY_CHECK_NULL(first_dot, code, line, _return, terrno)
47,266,252✔
1925

1926
  const char *second_dot = strchr(first_dot + 1, '.');
47,266,252✔
1927
  QUERY_CHECK_NULL(second_dot, code, line, _return, terrno)
47,266,252✔
1928

1929
  size_t db_len = first_dot - colref;
47,266,252✔
1930
  size_t table_len = second_dot - first_dot - 1;
47,266,252✔
1931
  size_t col_len = strlen(second_dot + 1);
47,266,252✔
1932

1933
  *refDb = taosMemoryMalloc(db_len + 1);
47,266,252✔
1934
  *refTb = taosMemoryMalloc(table_len + 1);
47,266,252✔
1935
  *refCol = taosMemoryMalloc(col_len + 1);
47,266,252✔
1936
  QUERY_CHECK_NULL(*refDb, code, line, _return, terrno)
47,266,252✔
1937
  QUERY_CHECK_NULL(*refTb, code, line, _return, terrno)
47,266,252✔
1938
  QUERY_CHECK_NULL(*refCol, code, line, _return, terrno)
47,266,252✔
1939

1940
  tstrncpy(*refDb, colref, db_len + 1);
47,266,252✔
1941
  tstrncpy(*refTb, first_dot + 1, table_len + 1);
47,266,252✔
1942
  tstrncpy(*refCol, second_dot + 1, col_len + 1);
47,266,252✔
1943

1944
  return TSDB_CODE_SUCCESS;
47,266,252✔
UNCOV
1945
_return:
×
1946
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
1947
  if (*refDb) {
×
1948
    taosMemoryFree(*refDb);
×
1949
    *refDb = NULL;
×
1950
  }
UNCOV
1951
  if (*refTb) {
×
1952
    taosMemoryFree(*refTb);
×
1953
    *refTb = NULL;
×
1954
  }
UNCOV
1955
  if (*refCol) {
×
1956
    taosMemoryFree(*refCol);
×
1957
    *refCol = NULL;
×
1958
  }
UNCOV
1959
  return code;
×
1960
}
1961

1962
/*
 * Check whether a (dbName, tbName) pair equals the expected names.
 *
 * dbName and tbName are read through the varData accessors, the expected
 * names are plain C strings. Both the bytes and the lengths must match.
 */
bool tableInfoNeedCollect(char *dbName, char *tbName, char *expectDbName, char *expectTbName) {
  if (strncmp(varDataVal(tbName), expectTbName, varDataLen(tbName)) != 0) {
    return false;
  }
  if (strlen(expectTbName) != varDataLen(tbName)) {
    return false;
  }
  if (strncmp(varDataVal(dbName), expectDbName, varDataLen(dbName)) != 0) {
    return false;
  }
  return strlen(expectDbName) == varDataLen(dbName);
}
1971

1972
/*
 * Fill pInfo from row `index` of a result block.
 *
 * Columns 3..8 of pDataBlock are used as: column name, uid, column id,
 * column reference, vgroup id and reference version (the last one is only
 * checked for presence). colName and, when not NULL in the row, colrefName
 * are copied into newly allocated NUL-terminated strings. uid, colId and
 * vgId are written only when the corresponding cell is not NULL.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when a column is missing or an
 * allocation fails.
 */
int32_t getColRefInfo(SColRefInfo *pInfo, SArray* pDataBlock, int32_t index) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          line = 0;

  SColumnInfoData *pColNameCol = taosArrayGet(pDataBlock, 3);
  SColumnInfoData *pUidCol = taosArrayGet(pDataBlock, 4);
  SColumnInfoData *pColIdCol = taosArrayGet(pDataBlock, 5);
  SColumnInfoData *pRefCol = taosArrayGet(pDataBlock, 6);
  SColumnInfoData *pVgIdCol = taosArrayGet(pDataBlock, 7);
  SColumnInfoData *pRefVerCol = taosArrayGet(pDataBlock, 8);

  QUERY_CHECK_NULL(pColNameCol, code, line, _return, terrno)
  QUERY_CHECK_NULL(pUidCol, code, line, _return, terrno)
  QUERY_CHECK_NULL(pColIdCol, code, line, _return, terrno)
  QUERY_CHECK_NULL(pRefCol, code, line, _return, terrno)
  QUERY_CHECK_NULL(pVgIdCol, code, line, _return, terrno)
  QUERY_CHECK_NULL(pRefVerCol, code, line, _return, terrno)

  if (!colDataIsNull_s(pRefCol, index)) {
    char* pRefCell = colDataGetData(pRefCol, index);

    // varDataTLen() bytes leave room for the terminating NUL
    pInfo->colrefName = taosMemoryCalloc(varDataTLen(pRefCell), 1);
    QUERY_CHECK_NULL(pInfo->colrefName, code, line, _return, terrno)
    memcpy(pInfo->colrefName, varDataVal(pRefCell), varDataLen(pRefCell));
    pInfo->colrefName[varDataLen(pRefCell)] = 0;
  } else {
    pInfo->colrefName = NULL;
  }

  char* pNameCell = colDataGetData(pColNameCol, index);
  pInfo->colName = taosMemoryCalloc(varDataTLen(pNameCell), 1);
  QUERY_CHECK_NULL(pInfo->colName, code, line, _return, terrno)
  memcpy(pInfo->colName, varDataVal(pNameCell), varDataLen(pNameCell));
  pInfo->colName[varDataLen(pNameCell)] = 0;

  if (!colDataIsNull_s(pUidCol, index)) {
    GET_TYPED_DATA(pInfo->uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, index), 0);
  }
  if (!colDataIsNull_s(pColIdCol, index)) {
    GET_TYPED_DATA(pInfo->colId, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pColIdCol, index), 0);
  }
  if (!colDataIsNull_s(pVgIdCol, index)) {
    GET_TYPED_DATA(pInfo->vgId, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pVgIdCol, index), 0);
  }

_return:
  return code;
}
2017

2018
/*
 * Detect vgroups that newly appeared in curOrgTbVg compared to existOrgTbVg
 * and, if there are any, request a redeploy through the stream runtime info.
 *
 * Nothing is done without stream runtime info. On the first call the current
 * set simply becomes the existing set. Otherwise the ids of all vgroups in
 * curOrgTbVg that are missing from existOrgTbVg are collected; if that list
 * is not empty:
 *   - when vtableDeployInfo.addVgIds is still NULL, the node ids of a pending
 *     addedVgInfo list are merged in and the list is published as addVgIds;
 *   - vtableDeployInfo.uid and vtableDeployGot are updated, curOrgTbVg is
 *     cleared, needRedeploy/rversion are set.
 *
 * Returns TSDB_CODE_SUCCESS when nothing changed,
 * TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY when a redeploy was requested, or
 * terrno when an allocation fails.
 */
int32_t processOrgTbVg(SVtbScanDynCtrlInfo* pVtbScan, SExecTaskInfo* pTaskInfo, int32_t rversion) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  void*                      pCurIter = NULL;
  SArray*                    tmpArray = NULL;
  SArray*                    ownedExpired = NULL;  // addedVgInfo list after it was detached by this call

  if (pTaskInfo->pStreamRuntimeInfo == NULL) {
    return code;
  }

  if (pVtbScan->existOrgTbVg == NULL) {
    pVtbScan->existOrgTbVg = pVtbScan->curOrgTbVg;
    pVtbScan->curOrgTbVg = NULL;
  }

  if (pVtbScan->curOrgTbVg == NULL) {
    return code;
  }

  // which means rversion has changed
  while ((pCurIter = taosHashIterate(pVtbScan->curOrgTbVg, pCurIter))) {
    int32_t* vgId = (int32_t*)taosHashGetKey(pCurIter, NULL);
    if (taosHashGet(pVtbScan->existOrgTbVg, vgId, sizeof(int32_t)) == NULL) {
      if (tmpArray == NULL) {
        tmpArray = taosArrayInit(1, sizeof(int32_t));
        QUERY_CHECK_NULL(tmpArray, code, line, _return, terrno)
      }
      QUERY_CHECK_NULL(taosArrayPush(tmpArray, vgId), code, line, _return, terrno)
    }
  }
  if (tmpArray == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  if (pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addVgIds == NULL) {
    SArray* expiredInfo = atomic_load_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo);
    if (expiredInfo && expiredInfo == atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo, expiredInfo, NULL)) {
      // the exchange succeeded, so this call now owns the detached list
      ownedExpired = expiredInfo;
      for (int32_t i = 0; i < taosArrayGetSize(ownedExpired); i++) {
        SStreamTaskAddr* vgInfo = (SStreamTaskAddr*)taosArrayGet(ownedExpired, i);
        QUERY_CHECK_NULL(taosArrayPush(tmpArray, &vgInfo->nodeId), code, line, _return, terrno)
      }
      taosArrayDestroy(ownedExpired);
      ownedExpired = NULL;
    }
    if (atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addVgIds, NULL, tmpArray)) {
      // somebody else published a list in the meantime
      taosArrayDestroy(tmpArray);
    }
    tmpArray = NULL;
  } else {
    // a list is already pending; ours is not handed over, so release it
    taosArrayDestroy(tmpArray);
    tmpArray = NULL;
  }

  atomic_store_64(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.uid, (int64_t)(pVtbScan->isSuperTable ? pVtbScan->suid : pVtbScan->uid));
  (void)atomic_val_compare_exchange_8(pTaskInfo->pStreamRuntimeInfo->vtableDeployGot, 0, 1);
  taosHashClear(pVtbScan->curOrgTbVg);
  pVtbScan->needRedeploy = true;
  pVtbScan->rversion = rversion;
  return TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY;

_return:
  qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  if (NULL != pCurIter) {
    // the hash iteration was left early
    taosHashCancelIterate(pVtbScan->curOrgTbVg, pCurIter);
  }
  if (NULL != ownedExpired) {
    taosArrayDestroy(ownedExpired);
  }
  if (NULL != tmpArray) {
    taosArrayDestroy(tmpArray);
  }
  return code;
}
2073

2074
/*
 * Resolve the vgroup id of the table named in a "<db>.<table>.<column>"
 * column reference.
 *
 * The reference is split into its parts, the database's vgroup information
 * is obtained through getDbVgInfo(), and getVgId() maps the table name to
 * its vgroup. The temporary name strings are always released.
 *
 * Returns TSDB_CODE_SUCCESS or the error of the failing step.
 */
int32_t getVgIdFromColref(SOperatorInfo* pOperator, const char* colRef, int32_t* vgId) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  char*                      pDbPart = NULL;
  char*                      pTbPart = NULL;
  char*                      pColPart = NULL;
  SDBVgInfo*                 pDbVgInfo = NULL;
  SName                      tbName = {0};
  char                       fullDbName[TSDB_DB_FNAME_LEN] = {0};
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;

  code = extractColRefName(colRef, &pDbPart, &pTbPart, &pColPart);
  QUERY_CHECK_CODE(code, line, _return);

  toName(pCtrl->vtbScan.acctId, pDbPart, pTbPart, &tbName);

  code = getDbVgInfo(pOperator, &tbName, &pDbVgInfo);
  QUERY_CHECK_CODE(code, line, _return);

  code = tNameGetFullDbName(&tbName, fullDbName);
  QUERY_CHECK_CODE(code, line, _return);

  code = getVgId(pDbVgInfo, fullDbName, vgId, tbName.tname);
  QUERY_CHECK_CODE(code, line, _return);

_return:
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  taosMemoryFree(pDbPart);
  taosMemoryFree(pTbPart);
  taosMemoryFree(pColPart);
  return code;
}
2108

2109
/*
 * Build an array of STagVal from row rowIdx of the tag block pTagVal and
 * store it in vtbUidTagListMap under key uid.
 *
 * Every column except the last one is converted (the last column is the uid).
 * Variable-length values are copied into a newly allocated buffer; other
 * values are copied into the i64 slot; NULL cells are stored with pData NULL
 * and nData 0.
 *
 * On success the array pointer has been put into the hash. On failure the
 * partially built array and any pending buffer are released here.
 */
static int32_t generateTagArrayByTagBlockAndSave(SHashObj* vtbUidTagListMap, tb_uid_t uid, SSDataBlock *pTagVal, int32_t rowIdx) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  STagVal cur = {0};

  SArray* pTags = taosArrayInit(1, sizeof(STagVal));
  QUERY_CHECK_NULL(pTags, code, lino, _return, terrno)

  // last col is uid, so it is excluded from the tag list
  for (int32_t colIdx = 0; colIdx < taosArrayGetSize(pTagVal->pDataBlock) - 1; colIdx++) {
    SColumnInfoData* pCol = taosArrayGet(pTagVal->pDataBlock, colIdx);
    QUERY_CHECK_NULL(pCol, code, lino, _return, terrno)

    cur.type = pCol->info.type;
    cur.cid = pCol->info.colId;

    if (colDataIsNull_s(pCol, rowIdx)) {
      cur.pData = NULL;
      cur.nData = 0;
    } else {
      char* pRaw = colDataGetData(pCol, rowIdx);
      if (IS_VAR_DATA_TYPE(pCol->info.type)) {
        cur.nData = varDataLen(pRaw);
        cur.pData = taosMemoryMalloc(cur.nData);
        QUERY_CHECK_NULL(cur.pData, code, lino, _return, terrno)
        memcpy(cur.pData, varDataVal(pRaw), varDataLen(pRaw));
      } else {
        memcpy(&cur.i64, pRaw, tDataTypes[pCol->info.type].bytes);
      }
    }

    // the array takes a bitwise copy; a pushed pData buffer now belongs to it
    QUERY_CHECK_NULL(taosArrayPush(pTags, &cur), code, lino, _return, terrno)
    cur = (STagVal){0};
  }

  code = taosHashPut(vtbUidTagListMap, &uid, sizeof(uid), &pTags, POINTER_BYTES);
  QUERY_CHECK_CODE(code, lino, _return);

  return code;

_return:
  // cur.pData is only non-NULL here when a buffer was allocated but not pushed
  if (cur.pData) {
    taosMemoryFreeClear(cur.pData);
  }
  if (pTags) {
    taosArrayDestroyEx(pTags, destroyTagVal);
  }
  qError("%s failed since %s, line %d", __func__, tstrerror(code), lino);
  return code;
}
2156

2157
/*
 * Walk the column-reference entries of one virtual table and register every
 * referenced column that needs scanning in pVtbScan->otbNameToOtbInfoMap,
 * keyed by the full name of the origin table (whole zero-padded buffer).
 *
 * @param pOperator    dyn-query-ctrl operator; its info holds the vtbScan state
 * @param pColRefInfo  array of SColRefInfo for one virtual table
 * @param uid          out: uid of the last entry visited
 * @param vgId         out: vgId of the last entry visited
 * @return TSDB_CODE_SUCCESS, or the code of the first failing step
 *
 * The strings produced by extractColRefName() and a colMap that has not yet
 * been stored in the hash are released on every exit path. Previously they
 * leaked whenever a check jumped to _return from inside the loop.
 */
int32_t virtualTableScanProcessColRefInfo(SOperatorInfo* pOperator, SArray* pColRefInfo, tb_uid_t* uid, int32_t* vgId) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SDBVgInfo*                 dbVgInfo = NULL;
  // declared at function scope so that _return can release them
  char*                      refDbName = NULL;
  char*                      refTbName = NULL;
  char*                      refColName = NULL;
  SArray*                    pPendingColMap = NULL;  // colMap not yet owned by otbNameToOtbInfoMap

  for (int32_t j = 0; j < taosArrayGetSize(pColRefInfo); j++) {
    SColRefInfo* pKV = (SColRefInfo*)taosArrayGet(pColRefInfo, j);
    *uid = pKV->uid;
    *vgId = pKV->vgId;
    if (pKV->colrefName == NULL || !colNeedScan(pOperator, pKV->colId)) {
      continue;
    }

    SName        name = {0};
    char         dbFname[TSDB_DB_FNAME_LEN] = {0};
    char         orgTbFName[TSDB_TABLE_FNAME_LEN] = {0};
    SColIdNameKV colIdNameKV = {0};

    code = extractColRefName(pKV->colrefName, &refDbName, &refTbName, &refColName);
    QUERY_CHECK_CODE(code, line, _return);

    toName(pInfo->vtbScan.acctId, refDbName, refTbName, &name);

    code = getDbVgInfo(pOperator, &name, &dbVgInfo);
    QUERY_CHECK_CODE(code, line, _return);
    code = tNameGetFullDbName(&name, dbFname);
    QUERY_CHECK_CODE(code, line, _return);
    code = tNameGetFullTableName(&name, orgTbFName);
    QUERY_CHECK_CODE(code, line, _return);

    colIdNameKV.colId = pKV->colId;
    tstrncpy(colIdNameKV.colName, refColName, sizeof(colIdNameKV.colName));

    void* pVal = taosHashGet(pVtbScan->otbNameToOtbInfoMap, orgTbFName, sizeof(orgTbFName));
    if (!pVal) {
      // first column seen for this origin table: create its entry
      SOrgTbInfo orgTbInfo = {0};
      code = getVgId(dbVgInfo, dbFname, &orgTbInfo.vgId, name.tname);
      QUERY_CHECK_CODE(code, line, _return);
      tstrncpy(orgTbInfo.tbName, orgTbFName, sizeof(orgTbInfo.tbName));

      pPendingColMap = taosArrayInit(10, sizeof(SColIdNameKV));
      QUERY_CHECK_NULL(pPendingColMap, code, line, _return, terrno)
      QUERY_CHECK_NULL(taosArrayPush(pPendingColMap, &colIdNameKV), code, line, _return, terrno)

      orgTbInfo.colMap = pPendingColMap;
      code = taosHashPut(pVtbScan->otbNameToOtbInfoMap, orgTbFName, sizeof(orgTbFName), &orgTbInfo, sizeof(orgTbInfo));
      QUERY_CHECK_CODE(code, line, _return);
      pPendingColMap = NULL;  // the stored SOrgTbInfo copy now references the array
    } else {
      SOrgTbInfo* tbInfo = (SOrgTbInfo*)pVal;
      QUERY_CHECK_NULL(taosArrayPush(tbInfo->colMap, &colIdNameKV), code, line, _return, terrno)
    }

    // reset to NULL so a later failure cannot free these a second time
    taosMemoryFreeClear(refDbName);
    taosMemoryFreeClear(refTbName);
    taosMemoryFreeClear(refColName);
  }

_return:
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  // all of these are NULL on the success path
  taosArrayDestroy(pPendingColMap);
  taosMemoryFree(refDbName);
  taosMemoryFree(refTbName);
  taosMemoryFree(refColName);
  return code;
}
2221

2222
/*
 * Drain the tag-scan downstream (pDownstream[1]) and record, per table uid,
 * the tag values of each returned row.
 *
 * Without partitioning, tag lists go into pVtbScan->vtbUidTagListMap.
 * With partitioning, each block's groupId gets its own uid->tagList map stored
 * in pVtbScan->vtbGroupIdTagListMap, and every uid is also mapped to its
 * groupId in pVtbScan->vtbUidToGroupIdMap.
 *
 * The uid is read from the last column of each block; a NULL uid cell is
 * recorded as uid 0.
 *
 * @return TSDB_CODE_SUCCESS, or the code of the first failing step. A
 *         per-group map that could not be stored is released before returning.
 */
static int32_t getTagBlockAndProcess(SOperatorInfo* pOperator, bool hasPartition) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SOperatorInfo*             pTagScanOp = pOperator->pDownstream[1];
  SHashObj*                  pNewGroupMap = NULL;  // per-group map not yet owned by vtbGroupIdTagListMap

  pVtbScan->vtbUidTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  QUERY_CHECK_NULL(pVtbScan->vtbUidTagListMap, code, line, _return, terrno)
  taosHashSetFreeFp(pVtbScan->vtbUidTagListMap, destroyTagList);

  if (hasPartition) {
    // check each allocation right away so terrno still belongs to the failing call
    pVtbScan->vtbUidToGroupIdMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
    QUERY_CHECK_NULL(pVtbScan->vtbUidToGroupIdMap, code, line, _return, terrno)
    pVtbScan->vtbGroupIdTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
    QUERY_CHECK_NULL(pVtbScan->vtbGroupIdTagListMap, code, line, _return, terrno)
    taosHashSetFreeFp(pVtbScan->vtbGroupIdTagListMap, destroyVtbUidTagListMap);
  }

  while (true) {
    SSDataBlock* pTagVal = NULL;
    code = pTagScanOp->fpSet.getNextFn(pTagScanOp, &pTagVal);
    QUERY_CHECK_CODE(code, line, _return);
    if (pTagVal == NULL) {
      break;
    }

    // choose the uid->tagList map this block's rows belong to
    SHashObj* vtbUidTagListMap = NULL;
    if (hasPartition) {
      void* pIter = taosHashGet(pVtbScan->vtbGroupIdTagListMap, &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId));
      if (pIter) {
        vtbUidTagListMap = *(SHashObj**)pIter;
      } else {
        pNewGroupMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
        QUERY_CHECK_NULL(pNewGroupMap, code, line, _return, terrno)
        taosHashSetFreeFp(pNewGroupMap, destroyTagList);

        code = taosHashPut(pVtbScan->vtbGroupIdTagListMap, &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId), &pNewGroupMap, POINTER_BYTES);
        QUERY_CHECK_CODE(code, line, _return);

        vtbUidTagListMap = pNewGroupMap;
        pNewGroupMap = NULL;  // now reachable through vtbGroupIdTagListMap
      }
    } else {
      vtbUidTagListMap = pVtbScan->vtbUidTagListMap;
    }

    SColumnInfoData* pUidCol = taosArrayGetLast(pTagVal->pDataBlock);
    QUERY_CHECK_NULL(pUidCol, code, line, _return, terrno)
    for (int32_t i = 0; i < pTagVal->info.rows; i++) {
      tb_uid_t uid = 0;
      if (!colDataIsNull_s(pUidCol, i)) {
        GET_TYPED_DATA(uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, i), 0);
      }

      code = generateTagArrayByTagBlockAndSave(vtbUidTagListMap, uid, pTagVal, i);
      QUERY_CHECK_CODE(code, line, _return);

      if (hasPartition) {
        code = taosHashPut(pVtbScan->vtbUidToGroupIdMap, &uid, sizeof(uid), &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId));
        QUERY_CHECK_CODE(code, line, _return);
      }
    }
  }

  return code;

_return:
  // only non-NULL when the map was created but could not be stored
  if (pNewGroupMap != NULL) {
    taosHashCleanup(pNewGroupMap);
  }
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  return code;
}
2292

2293
/*
 * For every virtual child table in pVtbScan->childTableList, build a map
 * vgId -> array of SOrgTbInfo describing the origin tables it references, and
 * store that map in pVtbScan->vtbUidToVgIdMapMap under the table's uid.
 *
 * Per table, virtualTableScanProcessColRefInfo() fills
 * pVtbScan->otbNameToOtbInfoMap; this function then drains that hash: each
 * entry is copied by value into the array for its vgId and removed from the
 * hash.
 *
 * @return TSDB_CODE_SUCCESS, or the code of the first failing step.
 *
 * Error paths cancel an in-progress hash iteration and release containers
 * that were never stored; previously both were skipped.
 */
static int32_t processChildTableListAndGenerateOrgTbInfoMap(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SHashObj*                  otbVgIdToOtbInfoArrayMap = NULL;  // per-table map not yet stored in vtbUidToVgIdMapMap
  SArray*                    pNewOrgTbInfoArray = NULL;        // array not yet stored in the per-table map
  void*                      pOrgTbInfo = NULL;                // current iterator position, NULL when idle

  pVtbScan->vtbUidToVgIdMapMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  QUERY_CHECK_NULL(pVtbScan->vtbUidToVgIdMapMap, code, line, _return, terrno)

  for (int32_t i = 0; i < taosArrayGetSize(pVtbScan->childTableList); i++) {
    otbVgIdToOtbInfoArrayMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
    QUERY_CHECK_NULL(otbVgIdToOtbInfoArrayMap, code, line, _return, terrno)

    SArray* pColRefInfo = (SArray*)taosArrayGetP(pVtbScan->childTableList, i);
    QUERY_CHECK_NULL(pColRefInfo, code, line, _return, terrno)

    tb_uid_t uid = 0;
    int32_t  vgId = 0;
    code = virtualTableScanProcessColRefInfo(pOperator, pColRefInfo, &uid, &vgId);
    QUERY_CHECK_CODE(code, line, _return);

    size_t len = 0;
    pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, NULL);
    while (pOrgTbInfo != NULL) {
      char*       key = taosHashGetKey(pOrgTbInfo, &len);
      SOrgTbInfo* orgTbInfo = (SOrgTbInfo*)pOrgTbInfo;

      void* pIter = taosHashGet(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId));
      if (!pIter) {
        pNewOrgTbInfoArray = taosArrayInit(1, sizeof(SOrgTbInfo));
        QUERY_CHECK_NULL(pNewOrgTbInfoArray, code, line, _return, terrno)
        QUERY_CHECK_NULL(taosArrayPush(pNewOrgTbInfoArray, orgTbInfo), code, line, _return, terrno)
        code = taosHashPut(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId), &pNewOrgTbInfoArray, POINTER_BYTES);
        QUERY_CHECK_CODE(code, line, _return);
        pNewOrgTbInfoArray = NULL;  // stored in the per-table map
      } else {
        SArray* pOrgTbInfoArray = *(SArray**)pIter;
        QUERY_CHECK_NULL(pOrgTbInfoArray, code, line, _return, terrno)
        QUERY_CHECK_NULL(taosArrayPush(pOrgTbInfoArray, orgTbInfo), code, line, _return, terrno)
      }

      // advance before removing so the iterator never sits on a removed node
      pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, pOrgTbInfo);

      code = taosHashRemove(pVtbScan->otbNameToOtbInfoMap, key, len);
      if (code != TSDB_CODE_SUCCESS) {
        // The entry was copied by value, so its colMap is still referenced by
        // otbNameToOtbInfoMap as well. Leave the per-table map alone, as the
        // original code did, rather than destroy shared data.
        otbVgIdToOtbInfoArrayMap = NULL;
      }
      QUERY_CHECK_CODE(code, line, _return);
    }

    code = taosHashPut(pVtbScan->vtbUidToVgIdMapMap, &uid, sizeof(uid), &otbVgIdToOtbInfoArrayMap, POINTER_BYTES);
    QUERY_CHECK_CODE(code, line, _return);
    otbVgIdToOtbInfoArrayMap = NULL;  // stored in vtbUidToVgIdMapMap
  }

  return code;

_return:
  // an iteration abandoned midway must be cancelled explicitly
  if (pOrgTbInfo != NULL) {
    taosHashCancelIterate(pVtbScan->otbNameToOtbInfoMap, pOrgTbInfo);
  }
  // Container only: its single element, if any, is a by-value copy of an entry
  // that is still present in otbNameToOtbInfoMap.
  taosArrayDestroy(pNewOrgTbInfoArray);
  if (otbVgIdToOtbInfoArrayMap != NULL) {
    // Same destructor this file registers as the free callback for maps of
    // this kind; it receives a pointer to the stored SHashObj*.
    destroyOtbVgIdToOtbInfoArrayMap(&otbVgIdToOtbInfoArrayMap);
  }
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  return code;
}
2352

2353
/*
 * Non-batch preparation for a virtual-table aggregate query: first build the
 * per-table origin-table maps, then collect the tag values.
 *
 * Returns TSDB_CODE_SUCCESS, or the code of whichever step failed first; the
 * failure is logged with the line of the failing check.
 */
static int32_t buildOrgTbInfoSingle(SOperatorInfo* pOperator, bool hasPartition) {
  int32_t lino = 0;
  int32_t code = processChildTableListAndGenerateOrgTbInfoMap(pOperator);
  QUERY_CHECK_CODE(code, lino, _return);

  // process tag
  code = getTagBlockAndProcess(pOperator, hasPartition);
  QUERY_CHECK_CODE(code, lino, _return);

_return:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
2369

2370
/*
 * Batch preparation for a virtual-table aggregate query.
 *
 * With partitioning:
 *   - the tag-scan downstream (pDownstream[1]) is drained; every uid (last
 *     column of each block) is mapped to the block's groupId in
 *     vtbUidToGroupIdMap, and each groupId is recorded as a key in
 *     vtbGroupIdTagListMap;
 *   - origin tables of each child table are collected into a
 *     vgId -> SOrgTbInfo-array map kept per groupId in vtbGroupIdToVgIdMapMap.
 * Without partitioning, a single vgId -> SOrgTbInfo-array map
 * (pVtbScan->otbVgIdToOtbInfoArrayMap) is shared by all child tables.
 *
 * Each child table's entries in otbNameToOtbInfoMap are copied by value into
 * the target array and then removed from that hash.
 *
 * @return TSDB_CODE_SUCCESS, or the code of the first failing step.
 *
 * Error paths cancel an in-progress hash iteration and release containers
 * that were created but never stored.
 */
static int32_t buildOrgTbInfoBatch(SOperatorInfo* pOperator, bool hasPartition) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SOperatorInfo*             pTagScanOp = pOperator->pDownstream[1];
  SHashObj*                  pNewVgMap = NULL;           // per-group map not yet stored
  SArray*                    pNewOrgTbInfoArray = NULL;  // array not yet stored in a vg map
  void*                      pOrgTbInfo = NULL;          // current iterator position, NULL when idle

  if (hasPartition) {
    // check each allocation right away so terrno still belongs to the failing call
    pVtbScan->vtbUidToGroupIdMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
    QUERY_CHECK_NULL(pVtbScan->vtbUidToGroupIdMap, code, line, _return, terrno)
    pVtbScan->vtbGroupIdTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
    QUERY_CHECK_NULL(pVtbScan->vtbGroupIdTagListMap, code, line, _return, terrno)
    pVtbScan->vtbGroupIdToVgIdMapMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
    QUERY_CHECK_NULL(pVtbScan->vtbGroupIdToVgIdMapMap, code, line, _return, terrno)
    taosHashSetFreeFp(pVtbScan->vtbGroupIdToVgIdMapMap, destroyOtbVgIdToOtbInfoArrayMap);
  } else {
    pVtbScan->otbVgIdToOtbInfoArrayMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
    QUERY_CHECK_NULL(pVtbScan->otbVgIdToOtbInfoArrayMap, code, line, _return, terrno)
  }

  // the tag scan is only consumed when partitioning is requested
  while (hasPartition) {
    SSDataBlock* pTagVal = NULL;
    code = pTagScanOp->fpSet.getNextFn(pTagScanOp, &pTagVal);
    QUERY_CHECK_CODE(code, line, _return);
    if (pTagVal == NULL) {
      break;
    }

    SColumnInfoData* pUidCol = taosArrayGetLast(pTagVal->pDataBlock);
    QUERY_CHECK_NULL(pUidCol, code, line, _return, terrno)
    for (int32_t i = 0; i < pTagVal->info.rows; i++) {
      tb_uid_t uid = 0;  // a NULL uid cell is recorded as uid 0
      if (!colDataIsNull_s(pUidCol, i)) {
        GET_TYPED_DATA(uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, i), 0);
      }
      code = taosHashPut(pVtbScan->vtbUidToGroupIdMap, &uid, sizeof(uid), &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId));
      QUERY_CHECK_CODE(code, line, _return);
    }
    code = taosHashPut(pVtbScan->vtbGroupIdTagListMap, &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId), NULL, 0);
    QUERY_CHECK_CODE(code, line, _return);
  }

  for (int32_t i = 0; i < taosArrayGetSize(pVtbScan->childTableList); i++) {
    SArray* pColRefInfo = (SArray*)taosArrayGetP(pVtbScan->childTableList, i);
    QUERY_CHECK_NULL(pColRefInfo, code, line, _return, terrno)
    tb_uid_t uid = 0;
    int32_t  vgId = 0;
    code = virtualTableScanProcessColRefInfo(pOperator, pColRefInfo, &uid, &vgId);
    QUERY_CHECK_CODE(code, line, _return);

    // pick the vgId -> array map this child table contributes to
    SHashObj* otbVgIdToOtbInfoArrayMap = NULL;
    if (hasPartition) {
      // NOTE(review): a missing uid is reported with whatever terrno holds at
      // this point, unchanged from the original; confirm taosHashGet sets it.
      uint64_t* groupId = (uint64_t*)taosHashGet(pVtbScan->vtbUidToGroupIdMap, &uid, sizeof(uid));
      QUERY_CHECK_NULL(groupId, code, line, _return, terrno)

      void* pHashIter = taosHashGet(pVtbScan->vtbGroupIdToVgIdMapMap, groupId, sizeof(*groupId));
      if (pHashIter) {
        otbVgIdToOtbInfoArrayMap = *(SHashObj**)pHashIter;
      } else {
        pNewVgMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
        QUERY_CHECK_NULL(pNewVgMap, code, line, _return, terrno)
        code = taosHashPut(pVtbScan->vtbGroupIdToVgIdMapMap, groupId, sizeof(*groupId), &pNewVgMap, POINTER_BYTES);
        QUERY_CHECK_CODE(code, line, _return);
        otbVgIdToOtbInfoArrayMap = pNewVgMap;
        pNewVgMap = NULL;  // stored in vtbGroupIdToVgIdMapMap
      }
    } else {
      otbVgIdToOtbInfoArrayMap = pVtbScan->otbVgIdToOtbInfoArrayMap;
    }

    size_t len = 0;
    pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, NULL);
    while (pOrgTbInfo != NULL) {
      char*       key = taosHashGetKey(pOrgTbInfo, &len);
      SOrgTbInfo* orgTbInfo = (SOrgTbInfo*)pOrgTbInfo;
      void*       pIter = taosHashGet(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId));
      if (!pIter) {
        pNewOrgTbInfoArray = taosArrayInit(1, sizeof(SOrgTbInfo));
        QUERY_CHECK_NULL(pNewOrgTbInfoArray, code, line, _return, terrno)
        QUERY_CHECK_NULL(taosArrayPush(pNewOrgTbInfoArray, orgTbInfo), code, line, _return, terrno)
        code = taosHashPut(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId), &pNewOrgTbInfoArray, POINTER_BYTES);
        QUERY_CHECK_CODE(code, line, _return);
        pNewOrgTbInfoArray = NULL;  // stored in the vg map
      } else {
        SArray* pOrgTbInfoArray = *(SArray**)pIter;
        QUERY_CHECK_NULL(pOrgTbInfoArray, code, line, _return, terrno)
        QUERY_CHECK_NULL(taosArrayPush(pOrgTbInfoArray, orgTbInfo), code, line, _return, terrno)
      }

      // advance before removing so the iterator never sits on a removed node
      pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, pOrgTbInfo);

      code = taosHashRemove(pVtbScan->otbNameToOtbInfoMap, key, len);
      QUERY_CHECK_CODE(code, line, _return);
    }
  }
  return code;

_return:
  // an iteration abandoned midway must be cancelled explicitly
  if (pOrgTbInfo != NULL) {
    taosHashCancelIterate(pVtbScan->otbNameToOtbInfoMap, pOrgTbInfo);
  }
  // Container only: its single element, if any, is a by-value copy of an entry
  // that is still present in otbNameToOtbInfoMap.
  taosArrayDestroy(pNewOrgTbInfoArray);
  // only non-NULL when the map was created but could not be stored; it is empty
  if (pNewVgMap != NULL) {
    taosHashCleanup(pNewVgMap);
  }
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  return code;
}
2472

2473
/*
 * Read the system-table scan downstream and group the column-reference rows
 * of the target virtual super table by child table.
 *
 * Result:
 *   - pVtbScan->childTableList: one SArray of SColRefInfo per child table;
 *   - pVtbScan->childTableMap:  child table name -> index into childTableList.
 *
 * The system-table scan is pDownstream[0] for DYN_QTYPE_VTB_AGG and
 * pDownstream[1] otherwise. For DYN_QTYPE_VTB_SCAN, rows whose uid differs
 * from a non-zero dynTbUid are skipped, and when stream runtime info is
 * present the vgId of every referenced origin table is recorded in
 * pVtbScan->curOrgTbVg. Once all rows are read, the query-type specific
 * post-processing runs.
 *
 * @return TSDB_CODE_SUCCESS, or the code of the first failing step.
 *
 * A decoded SColRefInfo that has not been stored yet, and an array that has
 * not been appended to childTableList yet, are released on error paths;
 * previously both leaked.
 */
int32_t buildVirtualSuperTableScanChildTableMap(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SOperatorInfo*             pSystableScanOp = NULL;
  SArray*                    pNewColRefArray = NULL;  // array not yet stored in childTableList
  SColRefInfo                info = {0};
  bool                       infoPending = false;     // true while info's contents are owned only by this function

  pVtbScan->childTableMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  QUERY_CHECK_NULL(pVtbScan->childTableMap, code, line, _return, terrno)

  if (pInfo->qType == DYN_QTYPE_VTB_AGG) {
    pVtbScan->otbNameToOtbInfoMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
    QUERY_CHECK_NULL(pVtbScan->otbNameToOtbInfoMap, code, line, _return, terrno)
    pSystableScanOp = pOperator->pDownstream[0];
  } else {
    pSystableScanOp = pOperator->pDownstream[1];
  }

  while (true) {
    SSDataBlock* pChildInfo = NULL;
    code = pSystableScanOp->fpSet.getNextFn(pSystableScanOp, &pChildInfo);
    QUERY_CHECK_CODE(code, line, _return);
    if (pChildInfo == NULL) {
      break;
    }
    SColumnInfoData* pTableNameCol = taosArrayGet(pChildInfo->pDataBlock, 0);
    SColumnInfoData* pStbNameCol = taosArrayGet(pChildInfo->pDataBlock, 1);
    SColumnInfoData* pDbNameCol = taosArrayGet(pChildInfo->pDataBlock, 2);

    QUERY_CHECK_NULL(pTableNameCol, code, line, _return, terrno)
    QUERY_CHECK_NULL(pStbNameCol, code, line, _return, terrno)
    QUERY_CHECK_NULL(pDbNameCol, code, line, _return, terrno)

    for (int32_t i = 0; i < pChildInfo->info.rows; i++) {
      if (colDataIsNull_s(pStbNameCol, i)) {
        continue;
      }
      char* stbrawname = colDataGetData(pStbNameCol, i);
      char* dbrawname = colDataGetData(pDbNameCol, i);
      char* ctbName = colDataGetData(pTableNameCol, i);

      if (!tableInfoNeedCollect(dbrawname, stbrawname, pInfo->vtbScan.dbName, pInfo->vtbScan.tbName)) {
        continue;
      }

      info = (SColRefInfo){0};
      code = getColRefInfo(&info, pChildInfo->pDataBlock, i);
      QUERY_CHECK_CODE(code, line, _return);
      infoPending = true;

      if (pInfo->qType == DYN_QTYPE_VTB_SCAN) {
        if (pInfo->vtbScan.dynTbUid != 0 && info.uid != pInfo->vtbScan.dynTbUid) {
          qTrace("dynQueryCtrl tb uid filter, info uid:%" PRIu64 ", dyn tb uid:%" PRIu64, info.uid,
                 pInfo->vtbScan.dynTbUid);
          destroyColRefInfo(&info);
          infoPending = false;
          continue;
        }

        if (pTaskInfo->pStreamRuntimeInfo) {
          if (pVtbScan->curOrgTbVg == NULL) {
            pVtbScan->curOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
            QUERY_CHECK_NULL(pVtbScan->curOrgTbVg, code, line, _return, terrno)
          }

          if (info.colrefName) {
            int32_t vgId = 0;
            code = getVgIdFromColref(pOperator, info.colrefName, &vgId);
            QUERY_CHECK_CODE(code, line, _return);
            code = taosHashPut(pVtbScan->curOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
            QUERY_CHECK_CODE(code, line, _return);
          }
        }
      }

      // one lookup decides whether this child table has been seen before
      int32_t* pTableIdx = (int32_t*)taosHashGet(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName));
      if (pTableIdx == NULL) {
        pNewColRefArray = taosArrayInit(1, sizeof(SColRefInfo));
        QUERY_CHECK_NULL(pNewColRefArray, code, line, _return, terrno)
        QUERY_CHECK_NULL(taosArrayPush(pNewColRefArray, &info), code, line, _return, terrno)
        int32_t tableIdx = (int32_t)taosArrayGetSize(pVtbScan->childTableList);
        QUERY_CHECK_NULL(taosArrayPush(pVtbScan->childTableList, &pNewColRefArray), code, line, _return, terrno)
        // from here on the array and the copied info belong to childTableList
        pNewColRefArray = NULL;
        infoPending = false;
        code = taosHashPut(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName), &tableIdx, sizeof(tableIdx));
        QUERY_CHECK_CODE(code, line, _return);
      } else {
        SArray* pColRefArray = (SArray*)taosArrayGetP(pVtbScan->childTableList, *pTableIdx);
        QUERY_CHECK_NULL(pColRefArray, code, line, _return, terrno)
        QUERY_CHECK_NULL(taosArrayPush(pColRefArray, &info), code, line, _return, terrno)
        infoPending = false;
      }
    }
  }

  switch (pInfo->qType) {
    case DYN_QTYPE_VTB_AGG: {
      if (pVtbScan->batchProcessChild) {
        code = buildOrgTbInfoBatch(pOperator, pVtbScan->hasPartition);
      } else {
        code = buildOrgTbInfoSingle(pOperator, pVtbScan->hasPartition);
      }
      break;
    }
    case DYN_QTYPE_VTB_SCAN: {
      code = processOrgTbVg(pVtbScan, pTaskInfo, 1);
      break;
    }
    default: {
      code = TSDB_CODE_PLAN_INVALID_DYN_CTRL_TYPE;
      break;
    }
  }

  QUERY_CHECK_CODE(code, line, _return);

_return:
  // Both are idle (false / NULL) on the success path. When set, the array holds
  // at most a bitwise copy of info, so info is destroyed once and the array is
  // released as a bare container.
  if (infoPending) {
    destroyColRefInfo(&info);
  }
  taosArrayDestroy(pNewColRefArray);
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
2590

2591
/*
 * Read the system-table scan downstream (pDownstream[1]) and collect the
 * column-reference rows of the target virtual normal/child table into
 * pInfo->vtbScan.colRefInfo.
 *
 * The reference version is read from column 8 of each row. When it differs
 * from pVtbScan->rversion, or existOrgTbVg has not been built yet, the vgId of
 * every referenced origin table is recorded in pVtbScan->curOrgTbVg. Finally
 * processOrgTbVg() is called with the last reference version seen.
 *
 * @return TSDB_CODE_SUCCESS, or the code of the first failing step.
 *
 * A failure reported by the downstream getNextFn now aborts the function, as
 * in buildVirtualSuperTableScanChildTableMap; previously that code was
 * overwritten by the later processOrgTbVg() call. A decoded SColRefInfo that
 * was not appended is released on error paths.
 */
int32_t buildVirtualNormalChildTableScanChildTableMap(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[1];
  int32_t                    rversion = 0;
  SColRefInfo                info = {0};
  bool                       infoPending = false;  // true while info's contents are owned only by this function

  // NOTE(review): any array already stored in vtbScan.colRefInfo is overwritten
  // here, unchanged from the original; confirm callers release it beforehand.
  pInfo->vtbScan.colRefInfo = taosArrayInit(1, sizeof(SColRefInfo));
  QUERY_CHECK_NULL(pInfo->vtbScan.colRefInfo, code, line, _return, terrno)

  while (true) {
    SSDataBlock* pTableInfo = NULL;
    code = pSystableScanOp->fpSet.getNextFn(pSystableScanOp, &pTableInfo);
    QUERY_CHECK_CODE(code, line, _return);
    if (pTableInfo == NULL) {
      break;
    }

    SColumnInfoData* pTableNameCol = taosArrayGet(pTableInfo->pDataBlock, 0);
    SColumnInfoData* pDbNameCol = taosArrayGet(pTableInfo->pDataBlock, 2);
    SColumnInfoData* pRefVerCol = taosArrayGet(pTableInfo->pDataBlock, 8);

    QUERY_CHECK_NULL(pTableNameCol, code, line, _return, terrno)
    QUERY_CHECK_NULL(pDbNameCol, code, line, _return, terrno)
    QUERY_CHECK_NULL(pRefVerCol, code, line, _return, terrno)

    for (int32_t i = 0; i < pTableInfo->info.rows; i++) {
      // rversion keeps its previous value when this row's cell is NULL
      if (!colDataIsNull_s(pRefVerCol, i)) {
        GET_TYPED_DATA(rversion, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pRefVerCol, i), 0);
      }

      if (colDataIsNull_s(pTableNameCol, i)) {
        continue;
      }
      char* tbrawname = colDataGetData(pTableNameCol, i);
      char* dbrawname = colDataGetData(pDbNameCol, i);
      QUERY_CHECK_NULL(tbrawname, code, line, _return, terrno)
      QUERY_CHECK_NULL(dbrawname, code, line, _return, terrno)

      if (!tableInfoNeedCollect(dbrawname, tbrawname, pInfo->vtbScan.dbName, pInfo->vtbScan.tbName)) {
        continue;
      }

      info = (SColRefInfo){0};
      code = getColRefInfo(&info, pTableInfo->pDataBlock, i);
      QUERY_CHECK_CODE(code, line, _return);
      infoPending = true;

      if ((rversion != pVtbScan->rversion || pVtbScan->existOrgTbVg == NULL) && info.colrefName) {
        if (pVtbScan->curOrgTbVg == NULL) {
          pVtbScan->curOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
          QUERY_CHECK_NULL(pVtbScan->curOrgTbVg, code, line, _return, terrno)
        }
        int32_t vgId = 0;
        code = getVgIdFromColref(pOperator, info.colrefName, &vgId);
        QUERY_CHECK_CODE(code, line, _return);
        code = taosHashPut(pVtbScan->curOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
        QUERY_CHECK_CODE(code, line, _return);
      }

      QUERY_CHECK_NULL(taosArrayPush(pInfo->vtbScan.colRefInfo, &info), code, line, _return, terrno)
      infoPending = false;  // the appended copy now lives in colRefInfo
    }
  }
  code = processOrgTbVg(pVtbScan, pTaskInfo, rversion);
  QUERY_CHECK_CODE(code, line, _return);

_return:
  // only set when a decoded row could not be appended
  if (infoPending) {
    destroyColRefInfo(&info);
  }
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
2661

2662
/*
 * Consume the list of newly added vgroup task addresses published in
 * pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo.
 *
 * If a list is present and this caller detaches it (compare-exchange to
 * NULL), every node id is added to pVtbScan->existOrgTbVg, every address is
 * stored in pVtbScan->newAddedVgInfo (created on demand), needRedeploy is
 * cleared, and the list is destroyed.
 *
 * Otherwise TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY is returned.
 *
 * @return TSDB_CODE_SUCCESS when a list was consumed, the code of a failing
 *         hash operation, or TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY.
 *
 * The list is destroyed only when the compare-exchange succeeded. Previously
 * a non-NULL snapshot was destroyed even when the exchange was lost, i.e.
 * when this caller never took ownership of it.
 */
int32_t virtualTableScanCheckNeedRedeploy(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;

  SArray* pOwned = NULL;  // set only after the list has been detached by this caller
  SArray* pSeen = atomic_load_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo);
  if (pSeen && pSeen == atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo, pSeen, NULL)) {
    pOwned = pSeen;
    for (int32_t i = 0; i < taosArrayGetSize(pOwned); i++) {
      SStreamTaskAddr* pTaskAddr = (SStreamTaskAddr*)taosArrayGet(pOwned, i);
      code = taosHashPut(pVtbScan->existOrgTbVg, &pTaskAddr->nodeId, sizeof(pTaskAddr->nodeId), NULL, 0);
      QUERY_CHECK_CODE(code, line, _return);
      if (pVtbScan->newAddedVgInfo == NULL) {
        pVtbScan->newAddedVgInfo = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
        QUERY_CHECK_NULL(pVtbScan->newAddedVgInfo, code, line, _return, terrno)
      }
      code = taosHashPut(pVtbScan->newAddedVgInfo, &pTaskAddr->nodeId, sizeof(pTaskAddr->nodeId), pTaskAddr, sizeof(SStreamTaskAddr));
      QUERY_CHECK_CODE(code, line, _return);
    }
    pVtbScan->needRedeploy = false;
  } else {
    // nothing published yet, or the exchange was lost
    code = TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY;
    QUERY_CHECK_CODE(code, line, _return);
  }

_return:
  // NULL unless this caller detached the list above
  taosArrayDestroy(pOwned);
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
2697

2698
// Builds the operator parameter passed to the downstream virtual table scan for one table.
//
// A fresh vtbScanParam is created for `uid`, then one exchange parameter is appended to its
// pOpParamArray for every entry of otbNameToOtbInfoMap. When the entry's vgId is found in
// newAddedVgInfo, the exchange parameter is built with a downstream source filled from that
// task address and the vgId is removed from newAddedVgInfo; otherwise it is built with a
// NULL source. Finally a tag-scan exchange parameter for (vgId, uid) is stored in pTagScanOp.
//
// pOperator  dyn-query-ctrl operator that owns the vtbScan state
// uid        table uid forwarded to the scan and tag-scan parameter builders
// vgId       vgroup id forwarded to the tag-scan parameter builder
//
// Returns TSDB_CODE_SUCCESS or the first failing error code (logged with qError).
int32_t virtualTableScanBuildDownStreamOpParam(SOperatorInfo* pOperator, tb_uid_t uid, int32_t vgId) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  void*                      pIter = NULL;  // live hash iterator; NULL once the walk has finished

  pVtbScan->vtbScanParam = NULL;
  code = buildVtbScanOperatorParam(pInfo, &pVtbScan->vtbScanParam, uid);
  QUERY_CHECK_CODE(code, line, _return);

  pIter = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, NULL);
  while (pIter != NULL) {
    SOrgTbInfo*      pMap = (SOrgTbInfo*)pIter;
    SOperatorParam*  pExchangeParam = NULL;
    SStreamTaskAddr* addr = taosHashGet(pVtbScan->newAddedVgInfo, &pMap->vgId, sizeof(pMap->vgId));
    if (addr != NULL) {
      SDownstreamSourceNode newSource = {0};
      newSource.type = QUERY_NODE_DOWNSTREAM_SOURCE;
      newSource.clientId = pTaskInfo->id.taskId;  // current task's taskid
      newSource.taskId = addr->taskId;
      newSource.fetchMsgType = TDMT_STREAM_FETCH;
      newSource.localExec = false;
      newSource.addr.nodeId = addr->nodeId;
      memcpy(&newSource.addr.epSet, &addr->epset, sizeof(SEpSet));

      code = buildExchangeOperatorParamForVScan(&pExchangeParam, 0, pMap, &newSource);
      QUERY_CHECK_CODE(code, line, _return);
      code = taosHashRemove(pVtbScan->newAddedVgInfo, &pMap->vgId, sizeof(pMap->vgId));
      QUERY_CHECK_CODE(code, line, _return);
    } else {
      code = buildExchangeOperatorParamForVScan(&pExchangeParam, 0, pMap, NULL);
      QUERY_CHECK_CODE(code, line, _return);
    }
    // NOTE(review): pExchangeParam is not released in this function when a later step
    // fails -- confirm whether some other cleanup path covers it.
    QUERY_CHECK_NULL(taosArrayPush(((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pOpParamArray, &pExchangeParam), code, line, _return, terrno);
    pIter = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, pIter);
  }

  SOperatorParam* pTagExchangeParam = NULL;
  code = buildExchangeOperatorParamForVTagScan(&pTagExchangeParam, 0, vgId, uid);
  QUERY_CHECK_CODE(code, line, _return);
  ((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pTagScanOp = pTagExchangeParam;

_return:
  if (pIter != NULL) {
    // An error left the loop before the iteration reached its end; cancel it so the
    // hash table does not keep the iteration open.
    taosHashCancelIterate(pVtbScan->otbNameToOtbInfoMap, pIter);
  }
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
2747

2748
// Fetches the next result block for the virtual table scan.
//
// A temporary otbNameToOtbInfoMap is created for the duration of the call and released
// before returning. While curTableIdx equals lastTableIdx the downstream operator is simply
// asked for more data. Otherwise the column reference info for the current table is
// processed, a new downstream parameter is built, the downstream status is reset to
// OP_NOT_OPENED and it is queried through getNextExtFn. A NULL result advances to the next
// table; the operator is marked completed once the table list is exhausted (or right away
// when this is not a super table).
//
// Returns TSDB_CODE_SUCCESS or the first failing error code (logged with qError).
int32_t virtualTableScanGetNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SOperatorInfo*             pDownstream = pOperator->pDownstream[0];

  pVtbScan->otbNameToOtbInfoMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  QUERY_CHECK_NULL(pVtbScan->otbNameToOtbInfoMap, code, line, _return, terrno);
  taosHashSetFreeFp(pVtbScan->otbNameToOtbInfoMap, destroySOrgTbInfo);

  for (;;) {
    if (pVtbScan->curTableIdx != pVtbScan->lastTableIdx) {
      // Switching to a table that has not been read yet: rebuild the downstream parameter.
      taosHashClear(pVtbScan->otbNameToOtbInfoMap);

      SArray* pRefs = pVtbScan->isSuperTable
                          ? (SArray*)taosArrayGetP(pVtbScan->childTableList, pVtbScan->curTableIdx)
                          : pInfo->vtbScan.colRefInfo;
      QUERY_CHECK_NULL(pRefs, code, line, _return, terrno);

      tb_uid_t tbUid = 0;
      int32_t  tbVgId = 0;
      code = virtualTableScanProcessColRefInfo(pOperator, pRefs, &tbUid, &tbVgId);
      QUERY_CHECK_CODE(code, line, _return);

      code = virtualTableScanBuildDownStreamOpParam(pOperator, tbUid, tbVgId);
      QUERY_CHECK_CODE(code, line, _return);

      // reset downstream operator's status
      pDownstream->status = OP_NOT_OPENED;
      code = pDownstream->fpSet.getNextExtFn(pDownstream, pVtbScan->vtbScanParam, pRes);
      QUERY_CHECK_CODE(code, line, _return);
    } else {
      // Still on the table that produced the previous block.
      code = pDownstream->fpSet.getNextFn(pDownstream, pRes);
      QUERY_CHECK_CODE(code, line, _return);
    }

    if (*pRes) {
      // has result, still read data from this table.
      pVtbScan->lastTableIdx = pVtbScan->curTableIdx;
      break;
    }

    // no result, read next table.
    pVtbScan->curTableIdx++;
    if (!pVtbScan->isSuperTable || pVtbScan->curTableIdx >= taosArrayGetSize(pVtbScan->childTableList)) {
      setOperatorCompleted(pOperator);
      break;
    }
  }

_return:
  taosHashCleanup(pVtbScan->otbNameToOtbInfoMap);
  pVtbScan->otbNameToOtbInfoMap = NULL;
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
2814

2815
// Opens the virtual table scan controller.
//
// Does nothing when the operator is already opened. Otherwise the child table map is built
// (super table or normal/child table variant, chosen by isSuperTable) and the operator is
// flagged as opened. The elapsed time is stored in cost.openCost (milliseconds) while that
// field is still zero. On failure the error is logged, stored in the task info and a long
// jump is made to the task environment.
int32_t vtbScanOpen(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  int64_t                    startUs = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;

  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  if (pOperator->cost.openCost == 0) {
    startUs = taosGetTimestampUs();
  }

  code = pVtbScan->isSuperTable ? buildVirtualSuperTableScanChildTableMap(pOperator)
                                : buildVirtualNormalChildTableScanChildTableMap(pOperator);
  QUERY_CHECK_CODE(code, line, _return);

  OPTR_SET_OPENED(pOperator);

_return:
  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - startUs) / 1000.0;
  }
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }
  return code;
}
2851

2852
// Produces the next block of the virtual table scan controller.
//
// Without a pending get-param, a finished operator returns immediately and the scan window
// is set to [INT64_MAX, INT64_MIN]. With a pending get-param, a finished operator is put
// back to OP_OPENED, the table cursor is rewound, the window is taken from the parameter
// and the parameter pointer is cleared. After that the redeploy check (if needed), the open
// function and virtualTableScanGetNext run in this order. A super table scan with an empty
// child table list completes right after opening.
//
// On failure the error is logged, stored in the task info and a long jump is made to the
// task environment.
int32_t vtbScanNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;

  QRY_PARAM_CHECK(pRes);

  if (NULL == pOperator->pOperatorGetParam) {
    if (OP_EXEC_DONE == pOperator->status) {
      return code;
    }
    pVtbScan->window.skey = INT64_MAX;
    pVtbScan->window.ekey = INT64_MIN;
  } else {
    if (OP_EXEC_DONE == pOperator->status) {
      pOperator->status = OP_OPENED;
    }
    pVtbScan->curTableIdx = 0;
    pVtbScan->lastTableIdx = -1;
    pVtbScan->window = ((SDynQueryCtrlOperatorParam *)(pOperator->pOperatorGetParam)->value)->window;
    pOperator->pOperatorGetParam = NULL;
  }

  if (pVtbScan->needRedeploy) {
    code = virtualTableScanCheckNeedRedeploy(pOperator);
    QUERY_CHECK_CODE(code, line, _return);
  }

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, line, _return);

  if (pVtbScan->isSuperTable && 0 == taosArrayGetSize(pVtbScan->childTableList)) {
    // nothing to scan
    setOperatorCompleted(pOperator);
    return code;
  }

  code = virtualTableScanGetNext(pOperator, pRes);
  QUERY_CHECK_CODE(code, line, _return);

  return code;

_return:
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }
  return code;
}
2901

2902
// Creates the simple hash tables used by the sequential super table join context.
//
// batchFetch == true  : leftHash and rightHash (INT hash function)
// batchFetch == false : leftCache, rightCache and onceTable (BIGINT hash function)
//
// Returns TSDB_CODE_SUCCESS, or terrno as soon as one table cannot be created.
int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) {
  if (batchFetch) {
    pPrev->leftHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
    if (pPrev->leftHash == NULL) {
      return terrno;
    }

    pPrev->rightHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
    if (pPrev->rightHash == NULL) {
      return terrno;
    }

    return TSDB_CODE_SUCCESS;
  }

  pPrev->leftCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
  if (pPrev->leftCache == NULL) {
    return terrno;
  }

  pPrev->rightCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
  if (pPrev->rightCache == NULL) {
    return terrno;
  }

  pPrev->onceTable = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
  if (pPrev->onceTable == NULL) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
2929

UNCOV
2930
// Refreshes pVtbScan->dynTbUid from the stream partition column values.
//
// Walks funcInfo.pStreamPartColVals and stops at the first entry that is flagged isTbname
// and whose uid differs from the current dynTbUid; that uid becomes the new dynTbUid.
// Nothing happens when pStreamRuntimeInfo is NULL or no such entry exists.
static void updateDynTbUidIfNeeded(SVtbScanDynCtrlInfo* pVtbScan, SStreamRuntimeInfo* pStreamRuntimeInfo) {
  if (NULL == pStreamRuntimeInfo) {
    return;
  }

  SArray* pPartVals = pStreamRuntimeInfo->funcInfo.pStreamPartColVals;
  for (int32_t idx = 0; idx < taosArrayGetSize(pPartVals); ++idx) {
    SStreamGroupValue* pEntry = taosArrayGet(pPartVals, idx);
    if (pEntry == NULL || !pEntry->isTbname || pEntry->uid == pVtbScan->dynTbUid) {
      continue;
    }

    qTrace("dynQueryCtrl dyn tb uid:%" PRIu64 " reset to:%" PRIu64, pVtbScan->dynTbUid, pEntry->uid);

    pVtbScan->dynTbUid = pEntry->uid;
    break;
  }
}
2946

2947
static int32_t initVtbScanInfo(SDynQueryCtrlOperatorInfo* pInfo, SMsgCb* pMsgCb,
2,468,316✔
2948
                               SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) {
2949
  int32_t      code = TSDB_CODE_SUCCESS;
2,468,316✔
2950
  int32_t      line = 0;
2,468,316✔
2951

2952
  code = tsem_init(&pInfo->vtbScan.ready, 0, 0);
2,468,316✔
2953
  QUERY_CHECK_CODE(code, line, _return);
2,468,316✔
2954

2955
  pInfo->vtbScan.genNewParam = true;
2,468,316✔
2956
  pInfo->vtbScan.batchProcessChild = pPhyciNode->vtbScan.batchProcessChild;
2,468,316✔
2957
  pInfo->vtbScan.hasPartition = pPhyciNode->vtbScan.hasPartition;
2,468,316✔
2958
  pInfo->vtbScan.scanAllCols = pPhyciNode->vtbScan.scanAllCols;
2,468,316✔
2959
  pInfo->vtbScan.isSuperTable = pPhyciNode->vtbScan.isSuperTable;
2,468,316✔
2960
  pInfo->vtbScan.rversion = pPhyciNode->vtbScan.rversion;
2,468,316✔
2961
  pInfo->vtbScan.uid = pPhyciNode->vtbScan.uid;
2,468,316✔
2962
  pInfo->vtbScan.suid = pPhyciNode->vtbScan.suid;
2,468,316✔
2963
  pInfo->vtbScan.epSet = pPhyciNode->vtbScan.mgmtEpSet;
2,468,316✔
2964
  pInfo->vtbScan.acctId = pPhyciNode->vtbScan.accountId;
2,468,316✔
2965
  pInfo->vtbScan.needRedeploy = false;
2,468,316✔
2966
  pInfo->vtbScan.pMsgCb = pMsgCb;
2,468,316✔
2967
  pInfo->vtbScan.curTableIdx = 0;
2,468,316✔
2968
  pInfo->vtbScan.lastTableIdx = -1;
2,468,316✔
2969
  pInfo->vtbScan.dynTbUid = 0;
2,468,316✔
2970
  pInfo->vtbScan.dbName = taosStrdup(pPhyciNode->vtbScan.dbName);
2,468,316✔
2971
  pInfo->vtbScan.tbName = taosStrdup(pPhyciNode->vtbScan.tbName);
2,468,316✔
2972
  QUERY_CHECK_NULL(pInfo->vtbScan.dbName, code, line, _return, terrno)
2,468,316✔
2973
  QUERY_CHECK_NULL(pInfo->vtbScan.tbName, code, line, _return, terrno)
2,468,316✔
2974
  pInfo->vtbScan.existOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
2,468,316✔
2975
  QUERY_CHECK_NULL(pInfo->vtbScan.existOrgTbVg, code, line, _return, terrno)
2,468,316✔
2976
  for (int32_t i = 0; i < LIST_LENGTH(pPhyciNode->vtbScan.pOrgVgIds); ++i) {
9,202,757✔
2977
    SValueNode* valueNode = (SValueNode*)nodesListGetNode(pPhyciNode->vtbScan.pOrgVgIds, i);
6,734,441✔
2978
    int32_t vgId = (int32_t)valueNode->datum.i;
6,734,441✔
2979
    code = taosHashPut(pInfo->vtbScan.existOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
6,734,441✔
2980
    QUERY_CHECK_CODE(code, line, _return);
6,734,441✔
2981
  }
2982

2983
  if (pPhyciNode->dynTbname && pTaskInfo) {
2,468,316✔
UNCOV
2984
    updateDynTbUidIfNeeded(&pInfo->vtbScan, pTaskInfo->pStreamRuntimeInfo);
×
2985
  }
2986

2987
  pInfo->vtbScan.readColList = taosArrayInit(LIST_LENGTH(pPhyciNode->vtbScan.pScanCols), sizeof(col_id_t));
2,468,316✔
2988
  QUERY_CHECK_NULL(pInfo->vtbScan.readColList, code, line, _return, terrno)
2,468,316✔
2989

2990
  for (int32_t i = 0; i < LIST_LENGTH(pPhyciNode->vtbScan.pScanCols); ++i) {
21,651,468✔
2991
    SColumnNode* pNode = (SColumnNode*)nodesListGetNode(pPhyciNode->vtbScan.pScanCols, i);
19,183,152✔
2992
    QUERY_CHECK_NULL(pNode, code, line, _return, terrno)
19,183,152✔
2993
    QUERY_CHECK_NULL(taosArrayPush(pInfo->vtbScan.readColList, &pNode->colId), code, line, _return, terrno)
38,366,304✔
2994
  }
2995

2996
  pInfo->vtbScan.childTableList = taosArrayInit(10, POINTER_BYTES);
2,468,316✔
2997
  QUERY_CHECK_NULL(pInfo->vtbScan.childTableList, code, line, _return, terrno)
2,468,316✔
2998

2999
  pInfo->vtbScan.dbVgInfoMap = taosHashInit(taosArrayGetSize(pInfo->vtbScan.childTableList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
2,468,316✔
3000
  QUERY_CHECK_NULL(pInfo->vtbScan.dbVgInfoMap, code, line, _return, terrno)
2,468,316✔
3001

3002
  pInfo->vtbScan.otbNameToOtbInfoMap = NULL;
2,468,316✔
3003
  pInfo->vtbScan.otbVgIdToOtbInfoArrayMap = NULL;
2,468,316✔
3004
  pInfo->vtbScan.vtbUidToVgIdMapMap = NULL;
2,468,316✔
3005
  pInfo->vtbScan.vtbGroupIdToVgIdMapMap = NULL;
2,468,316✔
3006
  pInfo->vtbScan.vtbUidTagListMap = NULL;
2,468,316✔
3007
  pInfo->vtbScan.vtbGroupIdTagListMap = NULL;
2,468,316✔
3008
  pInfo->vtbScan.vtbUidToGroupIdMap = NULL;
2,468,316✔
3009

3010
  return code;
2,468,316✔
UNCOV
3011
_return:
×
3012
  // no need to destroy array and hashmap allocated in this function,
3013
  // since the operator's destroy function will take care of it
UNCOV
3014
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
3015
  return code;
×
3016
}
3017

3018
static int32_t initVtbWindowInfo(SDynQueryCtrlOperatorInfo* pInfo, SDynQueryCtrlPhysiNode* pPhyciNode,
270,600✔
3019
                                 SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) {
3020
  int32_t              code = TSDB_CODE_SUCCESS;
270,600✔
3021
  int32_t              line = 0;
270,600✔
3022
  SDataBlockDescNode*  pDescNode = pPhyciNode->node.pOutputDataBlockDesc;
270,600✔
3023

3024
  pInfo->vtbWindow.wstartSlotId = pPhyciNode->vtbWindow.wstartSlotId;
270,600✔
3025
  pInfo->vtbWindow.wendSlotId = pPhyciNode->vtbWindow.wendSlotId;
270,600✔
3026
  pInfo->vtbWindow.wdurationSlotId = pPhyciNode->vtbWindow.wdurationSlotId;
270,600✔
3027
  pInfo->vtbWindow.pTargets = pPhyciNode->vtbWindow.pTargets;
270,600✔
3028
  pInfo->vtbWindow.isVstb = pPhyciNode->vtbWindow.isVstb;
270,600✔
3029
  pInfo->vtbWindow.singleWinMode = pPhyciNode->vtbWindow.singleWinMode;
270,600✔
3030
  pInfo->vtbWindow.extendOption = pPhyciNode->vtbWindow.extendOption;
270,600✔
3031

3032
  pInfo->vtbWindow.pRes = createDataBlockFromDescNode(pDescNode);
270,600✔
3033
  QUERY_CHECK_NULL(pInfo->vtbWindow.pRes, code, line, _return, terrno)
270,600✔
3034

3035
  pInfo->vtbWindow.pWins = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
270,600✔
3036
  QUERY_CHECK_NULL(pInfo->vtbWindow.pWins, code, line, _return, terrno)
270,600✔
3037

3038
  pInfo->vtbWindow.outputWstartSlotId = -1;
270,600✔
3039
  pInfo->vtbWindow.outputWendSlotId = -1;
270,600✔
3040
  pInfo->vtbWindow.outputWdurationSlotId = -1;
270,600✔
3041
  pInfo->vtbWindow.curWinBatchIdx = 0;
270,600✔
3042

3043
  initResultSizeInfo(&pOperator->resultInfo, 1);
270,600✔
3044
  code = blockDataEnsureCapacity(pInfo->vtbWindow.pRes, pOperator->resultInfo.capacity);
270,600✔
3045
  QUERY_CHECK_CODE(code, line, _return);
270,600✔
3046

3047
  return code;
270,600✔
UNCOV
3048
_return:
×
3049
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
3050
  return code;
×
3051
}
3052

3053
// Exposes the data buffer of column `slotId` of pBlock as a timestamp array.
//
// *ppTsCols is left untouched when the block has no column array or its data is not loaded.
// Otherwise it is pointed at the column's pData, and when the first value is non-zero while
// the block window is still {0, 0}, the block's timestamp window is updated from that column.
//
// Returns TSDB_CODE_SUCCESS or the failing error code (logged with qError).
static int32_t extractTsCol(SSDataBlock* pBlock, int32_t slotId, TSKEY** ppTsCols) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (pBlock->pDataBlock == NULL || !pBlock->info.dataLoad) {
    return code;
  }

  SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
  QUERY_CHECK_NULL(pCol, code, lino, _return, terrno);

  *ppTsCols = (int64_t*)pCol->pData;

  bool windowUnset = (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0);
  if ((*ppTsCols)[0] != 0 && windowUnset) {
    code = blockDataUpdateTsWindow(pBlock, slotId);
    QUERY_CHECK_CODE(code, lino, _return);
  }

  return code;

_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  return code;
}
3074

3075
// Opens the virtual table window controller by reading every block from downstream 0 and
// turning its (wstart, wend) columns into window batches stored in vtbWindow.pWins.
//
// - On the first block the output slot ids for wstart/wend/wduration are resolved by
//   matching the target column nodes against the block id and the configured slot ids.
// - singleWinMode: every row becomes its own one-window batch.
//   otherwise   : every block becomes one batch holding one window per row.
//   Each window is [wstart, wend + 1] with winOutIdx = -1.
// - After reading, the last window's end key is set to INT64_MAX for the BACKWARD extend
//   option and the first window's start key to INT64_MIN for the FORWARD extend option.
//
// Does nothing when the operator is already opened. On failure the error is logged, stored
// in the task info and a long jump is made to the task environment.
int32_t vtbWindowOpen(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    lino = 0;
  SDynQueryCtrlOperatorInfo* pDynInfo = pOperator->info;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SVtbWindowDynCtrlInfo*     pInfo = &pDynInfo->vtbWindow;
  int64_t                    st = 0;
  // Batch currently being filled. It is owned by this function until it has been pushed
  // into pWins, so it is reset to NULL after each successful push and destroyed on error.
  SArray*                    pWin = NULL;

  if (OPTR_IS_OPENED(pOperator)) {
    return code;
  }

  if (pOperator->cost.openCost == 0) {
    st = taosGetTimestampUs();
  }

  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      break;
    }

    if (pInfo->outputWendSlotId == -1 && pInfo->outputWstartSlotId == -1 && pInfo->outputWdurationSlotId == -1) {
      for (int32_t i = 0; i < LIST_LENGTH(pInfo->pTargets); ++i) {
        STargetNode* pNode = (STargetNode*)nodesListGetNode(pInfo->pTargets, i);
        if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode->pExpr)->dataBlockId == pBlock->info.id.blockId) {
          if (((SColumnNode*)pNode->pExpr)->slotId == pDynInfo->vtbWindow.wstartSlotId) {
            pInfo->outputWstartSlotId = i;
          } else if (((SColumnNode*)pNode->pExpr)->slotId == pDynInfo->vtbWindow.wendSlotId) {
            pInfo->outputWendSlotId = i;
          } else if (((SColumnNode*)pNode->pExpr)->slotId == pDynInfo->vtbWindow.wdurationSlotId) {
            pInfo->outputWdurationSlotId = i;
          }
        }
      }
    }

    TSKEY* wstartCol = NULL;
    TSKEY* wendCol = NULL;

    code = extractTsCol(pBlock, pDynInfo->vtbWindow.wstartSlotId, &wstartCol);
    QUERY_CHECK_CODE(code, lino, _return);
    code = extractTsCol(pBlock, pDynInfo->vtbWindow.wendSlotId, &wendCol);
    QUERY_CHECK_CODE(code, lino, _return);

    if (pDynInfo->vtbWindow.singleWinMode) {
      for (int32_t i = 0; i < pBlock->info.rows; i++) {
        // Each batch holds exactly one window, so a capacity of one is requested instead
        // of sizing every per-row array for the whole block.
        pWin = taosArrayInit(1, sizeof(SExtWinTimeWindow));
        QUERY_CHECK_NULL(pWin, code, lino, _return, terrno);

        QUERY_CHECK_NULL(taosArrayReserve(pWin, 1), code, lino, _return, terrno);

        SExtWinTimeWindow* pWindow = taosArrayGet(pWin, 0);
        QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno);
        pWindow->tw.skey = wstartCol[i];
        pWindow->tw.ekey = wendCol[i] + 1;
        pWindow->winOutIdx = -1;

        QUERY_CHECK_NULL(taosArrayPush(pDynInfo->vtbWindow.pWins, &pWin), code, lino, _return, terrno);
        pWin = NULL;  // now referenced from pWins
      }
    } else {
      pWin = taosArrayInit(pBlock->info.rows, sizeof(SExtWinTimeWindow));
      QUERY_CHECK_NULL(pWin, code, lino, _return, terrno);

      QUERY_CHECK_NULL(taosArrayReserve(pWin, pBlock->info.rows), code, lino, _return, terrno);

      for (int32_t i = 0; i < pBlock->info.rows; i++) {
        SExtWinTimeWindow* pWindow = taosArrayGet(pWin, i);
        QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno);
        pWindow->tw.skey = wstartCol[i];
        pWindow->tw.ekey = wendCol[i] + 1;
        pWindow->winOutIdx = -1;
      }

      QUERY_CHECK_NULL(taosArrayPush(pDynInfo->vtbWindow.pWins, &pWin), code, lino, _return, terrno);
      pWin = NULL;  // now referenced from pWins
    }
  }

  // handle first window's start key and last window's end key
  SArray* firstBatch = (SArray*)taosArrayGetP(pDynInfo->vtbWindow.pWins, 0);
  SArray* lastBatch = (SArray*)taosArrayGetP(pDynInfo->vtbWindow.pWins, taosArrayGetSize(pDynInfo->vtbWindow.pWins) - 1);

  QUERY_CHECK_NULL(firstBatch, code, lino, _return, terrno);
  QUERY_CHECK_NULL(lastBatch, code, lino, _return, terrno);

  SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(firstBatch, 0);
  SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(lastBatch);

  QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno);
  QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno);

  if (pInfo->extendOption == STATE_WIN_EXTEND_OPTION_BACKWARD) {
    lastWin->tw.ekey = INT64_MAX;
  }
  if (pInfo->extendOption == STATE_WIN_EXTEND_OPTION_FORWARD) {
    firstWin->tw.skey = INT64_MIN;
  }

  OPTR_SET_OPENED(pOperator);

  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
  }

_return:
  if (code != TSDB_CODE_SUCCESS) {
    // Release a batch that was allocated but never handed over to pWins; the long jump
    // below leaves this frame for good.
    taosArrayDestroy(pWin);
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
3187

3188
// Allocates an operator parameter of type QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL whose
// value is an SDynQueryCtrlOperatorParam carrying the window [skey, ekey].
//
// ppRes          receives the new parameter on success and NULL on failure
// downstreamIdx  not read by this function; the parameter's downstreamIdx is always 0
// skey, ekey     window bounds stored in the parameter value
//
// The parameter gets an empty pChildren array and reUse = false.
// Returns TSDB_CODE_SUCCESS, or terrno when an allocation fails (everything allocated so
// far is released and the failure is logged with qError).
static int32_t buildDynQueryCtrlOperatorParamForExternalWindow(SOperatorParam** ppRes, int32_t downstreamIdx, int64_t skey, int64_t ekey) {
  int32_t                     code = TSDB_CODE_SUCCESS;
  int32_t                     lino = 0;
  SOperatorParam*             pParam = NULL;
  SDynQueryCtrlOperatorParam* pCtrlValue = NULL;

  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
  *ppRes = pParam;
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno);

  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno);

  pCtrlValue = taosMemoryMalloc(sizeof(SDynQueryCtrlOperatorParam));
  QUERY_CHECK_NULL(pCtrlValue, code, lino, _return, terrno);

  pCtrlValue->window.skey = skey;
  pCtrlValue->window.ekey = ekey;

  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL;
  pParam->downstreamIdx = 0;
  pParam->reUse = false;
  pParam->value = pCtrlValue;

  return code;

_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pCtrlValue != NULL) {
    taosMemoryFree(pCtrlValue);
  }
  if (pParam != NULL) {
    if (pParam->pChildren != NULL) {
      taosArrayDestroy(pParam->pChildren);
    }
    taosMemoryFree(pParam);
    *ppRes = NULL;
  }
  return code;
}
3225

3226
/*
 * Build the SOperatorParam passed to an external-window operator for one batch of windows.
 *
 * The param's value is a SExternalWindowOperatorParam holding a copy of pWins. Its single
 * child is a dyn-query-ctrl param spanning from the first window's skey to the last
 * window's ekey.
 *
 * pInfo - not used by this function.
 * ppRes - out: the new param on success, NULL on failure.
 * pWins - array of SExtWinTimeWindow; must contain at least one element.
 * idx   - stored as the param's downstreamIdx.
 *
 * Returns TSDB_CODE_SUCCESS or the error captured at the failing step. On failure every
 * allocation made here, including a child param that was built but not yet attached, is released.
 */
static int32_t buildExternalWindowOperatorParamEx(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins, int32_t idx) {
  int32_t                       code = TSDB_CODE_SUCCESS;
  int32_t                       lino = 0;
  SExternalWindowOperatorParam* pExtWinOp = NULL;
  SOperatorParam*               pDynQueryCtrlParam = NULL;
  SExtWinTimeWindow*            firstWin = NULL;
  SExtWinTimeWindow*            lastWin = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
  // The struct comes from malloc: clear the field the cleanup path inspects before any
  // later step can fail.
  (*ppRes)->pChildren = NULL;

  pExtWinOp = taosMemoryMalloc(sizeof(SExternalWindowOperatorParam));
  QUERY_CHECK_NULL(pExtWinOp, code, lino, _return, terrno)

  pExtWinOp->ExtWins = taosArrayDup(pWins, NULL);
  QUERY_CHECK_NULL(pExtWinOp->ExtWins, code, lino, _return, terrno)

  // Reject an empty window list instead of dereferencing NULL below.
  firstWin = (SExtWinTimeWindow *)taosArrayGet(pWins, 0);
  QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
  lastWin = (SExtWinTimeWindow *)taosArrayGet(pWins, taosArrayGetSize(pWins) - 1);
  QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)

  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)

  code = buildDynQueryCtrlOperatorParamForExternalWindow(&pDynQueryCtrlParam, 0, firstWin->tw.skey, lastWin->tw.ekey);
  QUERY_CHECK_CODE(code, lino, _return);
  QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pDynQueryCtrlParam), code, lino, _return, terrno)
  // The child is now owned by pChildren; forget the local so cleanup cannot free it twice.
  pDynQueryCtrlParam = NULL;

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW;
  (*ppRes)->downstreamIdx = idx;
  (*ppRes)->value = pExtWinOp;
  (*ppRes)->reUse = false;

  return code;

_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pDynQueryCtrlParam) {
    // Child was built but never attached to pChildren.
    taosMemoryFree(pDynQueryCtrlParam->value);
    if (pDynQueryCtrlParam->pChildren) {
      taosArrayDestroy(pDynQueryCtrlParam->pChildren);
    }
    taosMemoryFree(pDynQueryCtrlParam);
  }
  if (pExtWinOp) {
    if (pExtWinOp->ExtWins) {
      taosArrayDestroy(pExtWinOp->ExtWins);
    }
    taosMemoryFree(pExtWinOp);
  }
  if (*ppRes) {
    if ((*ppRes)->pChildren) {
      for (int32_t i = 0; i < taosArrayGetSize((*ppRes)->pChildren); i++) {
        // pChildren stores SOperatorParam pointers, so fetch the stored pointer itself
        // rather than the address of its slot.
        SOperatorParam* pChildParam = (SOperatorParam*)taosArrayGetP((*ppRes)->pChildren, i);
        if (pChildParam) {
          taosMemoryFree(pChildParam->value);
          if (pChildParam->pChildren) {
            taosArrayDestroy(pChildParam->pChildren);
          }
          taosMemoryFree(pChildParam);
        }
      }
      taosArrayDestroy((*ppRes)->pChildren);
    }
    taosMemoryFree(*ppRes);
    *ppRes = NULL;
  }
  return code;
}
3284

3285
int32_t vtbWindowNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
15,814,788✔
3286
  int32_t                    code = TSDB_CODE_SUCCESS;
15,814,788✔
3287
  int32_t                    lino = 0;
15,814,788✔
3288
  SDynQueryCtrlOperatorInfo* pDynInfo = pOperator->info;
15,814,788✔
3289
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
15,814,788✔
3290
  int64_t                    st = taosGetTimestampUs();
15,814,788✔
3291
  int32_t                    numOfWins = 0;
15,814,788✔
3292
  SOperatorInfo*             mergeOp = NULL;
15,814,788✔
3293
  SOperatorInfo*             extWinOp = NULL;
15,814,788✔
3294
  SOperatorParam*            pMergeParam = NULL;
15,814,788✔
3295
  SOperatorParam*            pExtWinParam = NULL;
15,814,788✔
3296
  SVtbWindowDynCtrlInfo*     pInfo = &pDynInfo->vtbWindow;
15,814,788✔
3297
  SSDataBlock*               pRes = pInfo->pRes;
15,814,788✔
3298

3299
  code = pOperator->fpSet._openFn(pOperator);
15,814,788✔
3300
  QUERY_CHECK_CODE(code, lino, _return);
15,814,788✔
3301

3302
  if (pInfo->curWinBatchIdx >= taosArrayGetSize(pInfo->pWins)) {
15,814,788✔
3303
    *ppRes = NULL;
6,600✔
3304
    return code;
6,600✔
3305
  }
3306

3307
  SArray* pWinArray = (SArray*)taosArrayGetP(pInfo->pWins, pInfo->curWinBatchIdx);
15,808,188✔
3308
  QUERY_CHECK_NULL(pWinArray, code, lino, _return, terrno)
15,808,188✔
3309

3310
  numOfWins = (int32_t)taosArrayGetSize(pWinArray);
15,808,188✔
3311

3312
  if (pInfo->isVstb) {
15,808,188✔
3313
    extWinOp = pOperator->pDownstream[1];
139,392✔
3314
    code = buildExternalWindowOperatorParamEx(pDynInfo, &pExtWinParam, pWinArray, extWinOp->numOfDownstream);
139,392✔
3315
    QUERY_CHECK_CODE(code, lino, _return);
139,392✔
3316

3317
    SSDataBlock* pExtWinBlock = NULL;
139,392✔
3318
    code = extWinOp->fpSet.getNextExtFn(extWinOp, pExtWinParam, &pExtWinBlock);
139,392✔
3319
    QUERY_CHECK_CODE(code, lino, _return);
139,392✔
3320
    setOperatorCompleted(extWinOp);
139,392✔
3321

3322
    blockDataCleanup(pRes);
139,392✔
3323
    code = blockDataEnsureCapacity(pRes, numOfWins);
139,392✔
3324
    QUERY_CHECK_CODE(code, lino, _return);
139,392✔
3325

3326
    if (pExtWinBlock) {
139,392✔
3327
      code = copyColumnsValue(pInfo->pTargets, pExtWinBlock->info.id.blockId, pRes, pExtWinBlock, numOfWins);
139,392✔
3328
      QUERY_CHECK_CODE(code, lino, _return);
139,392✔
3329

3330
      if (pInfo->curWinBatchIdx == 0) {
139,392✔
3331
        // first batch, get _wstart from pMergedBlock
3332
        SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(taosArrayGetP(pInfo->pWins, 0), 0);
129,888✔
3333
        QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
129,888✔
3334

3335
        firstWin->tw.skey = pExtWinBlock->info.window.skey;
129,888✔
3336
      }
3337
      if (pInfo->curWinBatchIdx == taosArrayGetSize(pInfo->pWins) - 1) {
139,392✔
3338
        // last batch, get _wend from pMergedBlock
3339
        SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(taosArrayGetP(pInfo->pWins, taosArrayGetSize(pInfo->pWins) - 1));
3,168✔
3340
        QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)
3,168✔
3341

3342
        lastWin->tw.ekey = pExtWinBlock->info.window.ekey + 1;
3,168✔
3343
      }
3344
    }
3345
  } else {
3346
    mergeOp = pOperator->pDownstream[1];
15,668,796✔
3347
    code = buildMergeOperatorParam(pDynInfo, &pMergeParam, pWinArray, mergeOp->numOfDownstream, numOfWins);
15,668,796✔
3348
    QUERY_CHECK_CODE(code, lino, _return);
15,668,796✔
3349

3350
    SSDataBlock* pMergedBlock = NULL;
15,668,796✔
3351
    code = mergeOp->fpSet.getNextExtFn(mergeOp, pMergeParam, &pMergedBlock);
15,668,796✔
3352
    QUERY_CHECK_CODE(code, lino, _return);
15,668,796✔
3353

3354
    blockDataCleanup(pRes);
15,668,796✔
3355
    code = blockDataEnsureCapacity(pRes, numOfWins);
15,668,796✔
3356
    QUERY_CHECK_CODE(code, lino, _return);
15,668,796✔
3357

3358
    if (pMergedBlock) {
15,668,796✔
3359
      code = copyColumnsValue(pInfo->pTargets, pMergedBlock->info.id.blockId, pRes, pMergedBlock, numOfWins);
15,668,796✔
3360
      QUERY_CHECK_CODE(code, lino, _return);
15,668,796✔
3361

3362
      if (pInfo->curWinBatchIdx == 0) {
15,668,796✔
3363
        // first batch, get _wstart from pMergedBlock
3364
        SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(taosArrayGetP(pInfo->pWins, 0), 0);
140,712✔
3365
        QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
140,712✔
3366

3367
        firstWin->tw.skey = pMergedBlock->info.window.skey;
140,712✔
3368
      }
3369
      if (pInfo->curWinBatchIdx == taosArrayGetSize(pInfo->pWins) - 1) {
15,668,796✔
3370
        // last batch, get _wend from pMergedBlock
3371
        SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(taosArrayGetP(pInfo->pWins, taosArrayGetSize(pInfo->pWins) - 1));
3,432✔
3372
        QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)
3,432✔
3373

3374
        lastWin->tw.ekey = pMergedBlock->info.window.ekey + 1;
3,432✔
3375
      }
3376
    }
3377
  }
3378

3379

3380
  if (pInfo->outputWstartSlotId != -1) {
15,808,188✔
3381
    SColumnInfoData* pWstartCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWstartSlotId);
12,984,444✔
3382
    QUERY_CHECK_NULL(pWstartCol, code, lino, _return, terrno)
12,984,444✔
3383

3384
    for (int32_t i = 0; i < numOfWins; i++) {
426,595,620✔
3385
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
413,611,176✔
3386
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
413,611,176✔
3387
      code = colDataSetVal(pWstartCol, i, (const char*)&pWindow->tw.skey, false);
413,611,176✔
3388
      QUERY_CHECK_CODE(code, lino, _return);
413,611,176✔
3389
    }
3390
  }
3391
  if (pInfo->outputWendSlotId != -1) {
15,808,188✔
3392
    SColumnInfoData* pWendCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWendSlotId);
12,984,444✔
3393
    QUERY_CHECK_NULL(pWendCol, code, lino, _return, terrno)
12,984,444✔
3394

3395
    for (int32_t i = 0; i < numOfWins; i++) {
426,595,620✔
3396
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
413,611,176✔
3397
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
413,611,176✔
3398
      TSKEY ekey = pWindow->tw.ekey - 1;
413,611,176✔
3399
      code = colDataSetVal(pWendCol, i, (const char*)&ekey, false);
413,611,176✔
3400
      QUERY_CHECK_CODE(code, lino, _return);
413,611,176✔
3401
    }
3402
  }
3403
  if (pInfo->outputWdurationSlotId != -1) {
15,808,188✔
3404
    SColumnInfoData* pWdurationCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWdurationSlotId);
11,572,572✔
3405
    QUERY_CHECK_NULL(pWdurationCol, code, lino, _return, terrno)
11,572,572✔
3406

3407
    for (int32_t i = 0; i < numOfWins; i++) {
303,781,764✔
3408
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
292,209,192✔
3409
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
292,209,192✔
3410
      int64_t duration = pWindow->tw.ekey - 1 - pWindow->tw.skey;
292,209,192✔
3411
      code = colDataSetVal(pWdurationCol, i, (const char*)&duration, false);
292,209,192✔
3412
      QUERY_CHECK_CODE(code, lino, _return);
292,209,192✔
3413
    }
3414
  }
3415

3416
  pRes->info.rows = numOfWins;
15,808,188✔
3417
  *ppRes = pRes;
15,808,188✔
3418
  pInfo->curWinBatchIdx++;
15,808,188✔
3419

3420
  return code;
15,808,188✔
3421

UNCOV
3422
_return:
×
3423
  if (code != TSDB_CODE_SUCCESS) {
×
3424
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3425
    pTaskInfo->code = code;
×
3426
    T_LONG_JMP(pTaskInfo->env, code);
×
3427
  }
UNCOV
3428
  return code;
×
3429
}
3430

3431
/*
 * Reset-state callback of the dyn-query-ctrl operator.
 *
 * Marks the operator as not opened and clears the per-query state that belongs to its qType:
 *   - DYN_QTYPE_STB_HASH:   tears down the stb-join state and re-initialises the table hash;
 *   - DYN_QTYPE_VTB_SCAN:   drops cached maps/arrays and rewinds the table cursors;
 *   - DYN_QTYPE_VTB_WINDOW: destroys the result block and window batches, resets slot ids.
 * Any other qType is only logged.
 * NOTE(review): DYN_QTYPE_VTB_AGG is not listed here and therefore takes the default
 * branch - confirm that is intended.
 *
 * Returns the initSeqStbJoinTableHash error for the STB_HASH case, otherwise success.
 */
static int32_t resetDynQueryCtrlOperState(SOperatorInfo* pOper) {
  SDynQueryCtrlOperatorInfo*    pCtrl = pOper->info;
  SDynQueryCtrlPhysiNode const* pPlan = pOper->pPhyNode;
  SExecTaskInfo*                pTask = pOper->pTaskInfo;

  pOper->status = OP_NOT_OPENED;

  switch (pCtrl->qType) {
    case DYN_QTYPE_STB_HASH: {
      SStbJoinDynCtrlInfo* pJoin = &pCtrl->stbJoin;

      pJoin->execInfo = (SDynQueryCtrlExecInfo){0};
      destroyStbJoinDynCtrlInfo(pJoin);

      int32_t rc = initSeqStbJoinTableHash(&pJoin->ctx.prev, pJoin->basic.batchFetch);
      if (TSDB_CODE_SUCCESS != rc) {
        qError("initSeqStbJoinTableHash failed since %s", tstrerror(rc));
        return rc;
      }

      // start again with an empty table list and a blank post-join context
      pJoin->ctx.prev.pListHead = NULL;
      pJoin->ctx.prev.joinBuild = false;
      pJoin->ctx.prev.pListTail = NULL;
      pJoin->ctx.prev.tableNum = 0;
      pJoin->ctx.post = (SStbJoinPostJoinCtx){0};
      break;
    }

    case DYN_QTYPE_VTB_SCAN: {
      SVtbScanDynCtrlInfo* pScan = &pCtrl->vtbScan;

      if (pScan->otbNameToOtbInfoMap) {
        taosHashSetFreeFp(pScan->otbNameToOtbInfoMap, destroySOrgTbInfo);
        taosHashCleanup(pScan->otbNameToOtbInfoMap);
        pScan->otbNameToOtbInfoMap = NULL;
      }
      if (pScan->pRsp) {
        tFreeSUsedbRsp(pScan->pRsp);
        taosMemoryFreeClear(pScan->pRsp);
      }
      if (pScan->colRefInfo) {
        taosArrayDestroyEx(pScan->colRefInfo, destroyColRefInfo);
        pScan->colRefInfo = NULL;
      }
      if (pScan->childTableMap) {
        taosHashCleanup(pScan->childTableMap);
        pScan->childTableMap = NULL;
      }
      if (pScan->childTableList) {
        // the list itself is kept; only its elements are released
        taosArrayClearEx(pScan->childTableList, destroyColRefArray);
      }
      if (pPlan->dynTbname && pTask) {
        updateDynTbUidIfNeeded(pScan, pTask->pStreamRuntimeInfo);
      }

      pScan->curTableIdx = 0;
      pScan->lastTableIdx = -1;
      break;
    }

    case DYN_QTYPE_VTB_WINDOW: {
      SVtbWindowDynCtrlInfo* pWin = &pCtrl->vtbWindow;

      if (pWin->pRes) {
        blockDataDestroy(pWin->pRes);
        pWin->pRes = NULL;
      }
      if (pWin->pWins) {
        taosArrayDestroyEx(pWin->pWins, destroyWinArray);
        pWin->pWins = NULL;
      }

      pWin->outputWdurationSlotId = -1;
      pWin->outputWendSlotId = -1;
      pWin->outputWstartSlotId = -1;
      pWin->curWinBatchIdx = 0;
      break;
    }

    default:
      qError("unsupported dynamic query ctrl type: %d", pCtrl->qType);
      break;
  }

  return 0;
}
3509

3510
/*
 * Open function of the virtual-table aggregate flavour of the dyn-query-ctrl operator.
 *
 * Does nothing when the operator is already opened. Otherwise it builds the child-table map
 * through buildVirtualSuperTableScanChildTableMap() and marks the operator opened. While
 * cost.openCost is still 0, the elapsed time is stored there in milliseconds.
 *
 * On failure the error is stored in the task and control leaves through T_LONG_JMP.
 */
int32_t vtbAggOpen(SOperatorInfo* pOperator) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t openStartUs = 0;

  if (OPTR_IS_OPENED(pOperator)) {
    return code;
  }

  // only time the open call while no cost has been recorded yet
  if (pOperator->cost.openCost == 0) {
    openStartUs = taosGetTimestampUs();
  }

  code = buildVirtualSuperTableScanChildTableMap(pOperator);
  QUERY_CHECK_CODE(code, lino, _return);
  OPTR_SET_OPENED(pOperator);

_return:
  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - openStartUs) / 1000.0;
  }
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), lino);
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }
  return code;
}
3539

3540
/*
 * Fetch the next aggregate result block from the last downstream operator.
 *
 * Four modes, selected by vtbScan.hasPartition and vtbScan.batchProcessChild:
 *   - partition + batch:   one param per group id; a group is removed from
 *                          vtbGroupIdTagListMap once it has produced a block.
 *   - partition, no batch: walks each group's uid map child by child; a child is removed once
 *                          the downstream returns no block for it, and a group once all of
 *                          its children are gone.
 *   - no partition + batch: a single param for everything, then the operator is completed.
 *   - no partition, no batch: walks vtbUidTagListMap child by child as above.
 * genNewParam records whether the next call has to build a fresh param (true) or keep
 * pulling from the child that produced the previous block (false).
 *
 * Any hash iterator that is still open when the function leaves a loop early - because a
 * block was produced or a step failed - is released with taosHashCancelIterate().
 *
 * pRes - in/out: receives the produced block; in the partitioned modes its groupId is set.
 * Returns TSDB_CODE_SUCCESS or the first error reported by a step (logged here).
 */
int32_t virtualTableAggGetNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SOperatorInfo*             pAggOp = pOperator->pDownstream[pOperator->numOfDownstream - 1];
  SOperatorParam*            pAggParam = NULL;
  // Iteration state lives at function scope so the common exit can cancel open iterators.
  SHashObj*                  pOuterMap = NULL;
  void*                      pOuterIter = NULL;
  SHashObj*                  pInnerMap = NULL;
  void*                      pInnerIter = NULL;

  if (pInfo->vtbScan.hasPartition) {
    pOuterMap = pVtbScan->vtbGroupIdTagListMap;
    pOuterIter = taosHashIterate(pOuterMap, NULL);

    if (pInfo->vtbScan.batchProcessChild) {
      while (pOuterIter) {
        size_t   keyLen = 0;
        uint64_t groupid = *(uint64_t*)taosHashGetKey(pOuterIter, &keyLen);

        code = buildAggOperatorParamWithGroupId(pInfo, groupid, &pAggParam);
        QUERY_CHECK_CODE(code, line, _return);

        if (pAggParam) {
          code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
          QUERY_CHECK_CODE(code, line, _return);
        } else {
          *pRes = NULL;
        }

        // advance first: the current entry may be removed just below
        pOuterIter = taosHashIterate(pOuterMap, pOuterIter);

        if (*pRes) {
          (*pRes)->info.id.groupId = groupid;
          code = taosHashRemove(pOuterMap, &groupid, keyLen);
          QUERY_CHECK_CODE(code, line, _return);
          break;
        }
      }
    } else {
      while (pOuterIter) {
        size_t    keyLen = 0;
        uint64_t* groupid = (uint64_t*)taosHashGetKey(pOuterIter, &keyLen);

        pInnerMap = *(SHashObj**)pOuterIter;
        pInnerIter = taosHashIterate(pInnerMap, NULL);
        while (pInnerIter) {
          size_t   keyLen2 = 0;
          tb_uid_t uid = *(tb_uid_t*)taosHashGetKey(pInnerIter, &keyLen2);
          SArray*  pTagList = *(SArray**)pInnerIter;

          if (pVtbScan->genNewParam) {
            code = buildAggOperatorParamForSingleChild(pInfo, uid, *groupid, pTagList, &pAggParam);
            QUERY_CHECK_CODE(code, line, _return);
            if (pAggParam) {
              code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
              QUERY_CHECK_CODE(code, line, _return);
            } else {
              *pRes = NULL;
            }
          } else {
            code = pAggOp->fpSet.getNextFn(pAggOp, pRes);
            QUERY_CHECK_CODE(code, line, _return);
          }

          if (*pRes) {
            pVtbScan->genNewParam = false;
            (*pRes)->info.id.groupId = *groupid;
            break;
          }
          pVtbScan->genNewParam = true;
          pInnerIter = taosHashIterate(pInnerMap, pInnerIter);
          // the uid key has its own length; do not reuse the group-id key length
          code = taosHashRemove(pInnerMap, &uid, keyLen2);
          QUERY_CHECK_CODE(code, line, _return);
        }
        if (*pRes) {
          break;
        }
        pOuterIter = taosHashIterate(pOuterMap, pOuterIter);
        code = taosHashRemove(pOuterMap, groupid, keyLen);
        QUERY_CHECK_CODE(code, line, _return);
      }
    }
  } else {
    if (pInfo->vtbScan.batchProcessChild) {
      code = buildAggOperatorParam(pInfo, &pAggParam);
      QUERY_CHECK_CODE(code, line, _return);

      code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
      QUERY_CHECK_CODE(code, line, _return);
      setOperatorCompleted(pOperator);
    } else {
      pOuterMap = pVtbScan->vtbUidTagListMap;
      pOuterIter = taosHashIterate(pOuterMap, NULL);
      while (pOuterIter) {
        size_t   keyLen = 0;
        tb_uid_t uid = *(tb_uid_t*)taosHashGetKey(pOuterIter, &keyLen);
        SArray*  pTagList = *(SArray**)pOuterIter;

        if (pVtbScan->genNewParam) {
          code = buildAggOperatorParamForSingleChild(pInfo, uid, 0, pTagList, &pAggParam);
          QUERY_CHECK_CODE(code, line, _return);

          if (pAggParam) {
            code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
            QUERY_CHECK_CODE(code, line, _return);
          } else {
            *pRes = NULL;
          }
        } else {
          code = pAggOp->fpSet.getNextFn(pAggOp, pRes);
          QUERY_CHECK_CODE(code, line, _return);
        }

        if (*pRes) {
          pVtbScan->genNewParam = false;
          break;
        }
        pVtbScan->genNewParam = true;
        pOuterIter = taosHashIterate(pOuterMap, pOuterIter);
        code = taosHashRemove(pOuterMap, &uid, keyLen);
        QUERY_CHECK_CODE(code, line, _return);
      }
    }
  }

_return:
  // A loop that ran to completion leaves its iterator NULL; anything else is still open.
  if (pInnerIter != NULL) {
    taosHashCancelIterate(pInnerMap, pInnerIter);
  }
  if (pOuterIter != NULL) {
    taosHashCancelIterate(pOuterMap, pOuterIter);
  }
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
3668

3669
/*
 * Next function of the virtual-table aggregate flavour of the dyn-query-ctrl operator.
 *
 * Returns immediately once the operator is in OP_EXEC_DONE. Otherwise it opens the operator
 * through its _openFn, completes it right away when it is a super table with an empty
 * child-table list, and else delegates to virtualTableAggGetNext().
 *
 * On failure the error is stored in the task and control leaves through T_LONG_JMP.
 */
int32_t vtbAggNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    lino = 0;
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SVtbScanDynCtrlInfo*       pScan = (SVtbScanDynCtrlInfo*)&pCtrl->vtbScan;

  QRY_PARAM_CHECK(pRes);
  if (OP_EXEC_DONE == pOperator->status) {
    return code;
  }

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _return);

  // a super table without any child table has nothing to aggregate
  if (pScan->isSuperTable && taosArrayGetSize(pScan->childTableList) == 0) {
    setOperatorCompleted(pOperator);
    return code;
  }

  code = virtualTableAggGetNext(pOperator, pRes);
  QUERY_CHECK_CODE(code, lino, _return);

_return:
  // reached on success too; only a non-zero code triggers the jump
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), lino);
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }
  return code;
}
3701

3702
int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
3,889,557✔
3703
                                       SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo,
3704
                                       SMsgCb* pMsgCb, SOperatorInfo** pOptrInfo) {
3705
  QRY_PARAM_CHECK(pOptrInfo);
3,889,557✔
3706

3707
  int32_t                    code = TSDB_CODE_SUCCESS;
3,889,557✔
3708
  int32_t                    line = 0;
3,889,557✔
3709
  __optr_fn_t                nextFp = NULL;
3,889,557✔
3710
  __optr_open_fn_t           openFp = NULL;
3,889,557✔
3711
  SOperatorInfo*             pOperator = NULL;
3,889,557✔
3712
  SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo));
3,889,557✔
3713
  QUERY_CHECK_NULL(pInfo, code, line, _error, terrno)
3,889,557✔
3714

3715
  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
3,889,557✔
3716
  QUERY_CHECK_NULL(pOperator, code, line, _error, terrno)
3,889,557✔
3717

3718
  pOperator->pPhyNode = pPhyciNode;
3,889,557✔
3719
  pTaskInfo->dynamicTask = (int8_t)pPhyciNode->node.dynamicOp;
3,889,557✔
3720

3721
  code = appendDownstream(pOperator, pDownstream, numOfDownstream);
3,889,557✔
3722
  QUERY_CHECK_CODE(code, line, _error);
3,889,557✔
3723

3724
  setOperatorInfo(pOperator, "DynQueryCtrlOperator", QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, false, OP_NOT_OPENED,
3,889,557✔
3725
                  pInfo, pTaskInfo);
3726

3727
  pInfo->qType = pPhyciNode->qType;
3,889,557✔
3728
  switch (pInfo->qType) {
3,889,557✔
3729
    case DYN_QTYPE_STB_HASH:
1,150,641✔
3730
      TAOS_MEMCPY(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
1,150,641✔
3731
      pInfo->stbJoin.pOutputDataBlockDesc = pPhyciNode->node.pOutputDataBlockDesc;
1,150,641✔
3732
      code = initSeqStbJoinTableHash(&pInfo->stbJoin.ctx.prev, pInfo->stbJoin.basic.batchFetch);
1,150,641✔
3733
      QUERY_CHECK_CODE(code, line, _error);
1,150,641✔
3734
      nextFp = seqStableJoin;
1,150,641✔
3735
      openFp = optrDummyOpenFn;
1,150,641✔
3736
      break;
1,150,641✔
3737
    case DYN_QTYPE_VTB_SCAN:
1,661,126✔
3738
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
1,661,126✔
3739
      QUERY_CHECK_CODE(code, line, _error);
1,661,126✔
3740
      nextFp = vtbScanNext;
1,661,126✔
3741
      openFp = vtbScanOpen;
1,661,126✔
3742
      break;
1,661,126✔
3743
    case DYN_QTYPE_VTB_WINDOW:
270,600✔
3744
      code = initVtbWindowInfo(pInfo, pPhyciNode, pTaskInfo, pOperator);
270,600✔
3745
      QUERY_CHECK_CODE(code, line, _error);
270,600✔
3746
      nextFp = vtbWindowNext;
270,600✔
3747
      openFp = vtbWindowOpen;
270,600✔
3748
      break;
270,600✔
3749
    case DYN_QTYPE_VTB_AGG:
807,190✔
3750
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
807,190✔
3751
      QUERY_CHECK_CODE(code, line, _error);
807,190✔
3752
      nextFp = vtbAggNext;
807,190✔
3753
      openFp = vtbAggOpen;
807,190✔
3754
      break;
807,190✔
UNCOV
3755
    default:
×
3756
      qError("unsupported dynamic query ctrl type: %d", pInfo->qType);
×
3757
      code = TSDB_CODE_INVALID_PARA;
×
3758
      goto _error;
×
3759
  }
3760

3761
  pOperator->fpSet = createOperatorFpSet(openFp, nextFp, NULL, destroyDynQueryCtrlOperator, optrDefaultBufFn,
3,889,557✔
3762
                                         NULL, optrDefaultGetNextExtFn, NULL);
3763

3764
  setOperatorResetStateFn(pOperator, resetDynQueryCtrlOperState);
3,889,557✔
3765
  *pOptrInfo = pOperator;
3,889,557✔
3766
  return TSDB_CODE_SUCCESS;
3,889,557✔
3767

UNCOV
3768
_error:
×
3769
  if (pInfo != NULL) {
×
3770
    destroyDynQueryCtrlOperator(pInfo);
×
3771
  }
UNCOV
3772
  qError("failed to create dyn query ctrl operator, %s code:%s, line:%d", __func__, tstrerror(code), line);
×
3773
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
3774
  pTaskInfo->code = code;
×
3775
  return code;
×
3776
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc