• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4945

30 Jan 2026 06:19AM UTC coverage: 66.87% (+0.02%) from 66.849%
#4945

push

travis-ci

web-flow
merge: from main to 3.0 #34453

1126 of 2018 new or added lines in 72 files covered. (55.8%)

13708 existing lines in 159 files now uncovered.

205277 of 306978 relevant lines covered (66.87%)

126353544.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.4
/source/libs/executor/src/dynqueryctrloperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "nodes.h"
18
#include "operator.h"
19
#include "os.h"
20
#include "plannodes.h"
21
#include "query.h"
22
#include "querynodes.h"
23
#include "querytask.h"
24
#include "tarray.h"
25
#include "tcompare.h"
26
#include "tdatablock.h"
27
#include "thash.h"
28
#include "tmsg.h"
29
#include "trpc.h"
30
#include "ttypes.h"
31
#include "tdataformat.h"
32
#include "dynqueryctrl.h"
33

34
int64_t gSessionId = 0;
35

36
void freeVgTableList(void* ptr) { 
×
37
  taosArrayDestroy(*(SArray**)ptr); 
×
38
}
×
39

40
static void destroyStbJoinTableList(SStbJoinTableList* pListHead) {
1,135,705✔
41
  SStbJoinTableList* pNext = NULL;
1,135,705✔
42
  
43
  while (pListHead) {
1,136,197✔
44
    taosMemoryFree(pListHead->pLeftVg);
492✔
45
    taosMemoryFree(pListHead->pLeftUid);
492✔
46
    taosMemoryFree(pListHead->pRightVg);
492✔
47
    taosMemoryFree(pListHead->pRightUid);
492✔
48
    pNext = pListHead->pNext;
492✔
49
    taosMemoryFree(pListHead);
492✔
50
    pListHead = pNext;
492✔
51
  }
52
}
1,135,705✔
53

54
static void destroyStbJoinDynCtrlInfo(SStbJoinDynCtrlInfo* pStbJoin) {
1,135,705✔
55
  qDebug("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64 ", leftCacheNum:%" PRId64 ", rightCacheNum:%" PRId64, 
1,135,705✔
56
         pStbJoin->execInfo.prevBlkNum, pStbJoin->execInfo.prevBlkRows, pStbJoin->execInfo.postBlkNum, 
57
         pStbJoin->execInfo.postBlkRows, pStbJoin->execInfo.leftCacheNum, pStbJoin->execInfo.rightCacheNum);
58

59
  if (pStbJoin->basic.batchFetch) {
1,135,705✔
60
    if (pStbJoin->ctx.prev.leftHash) {
1,134,601✔
61
      tSimpleHashSetFreeFp(pStbJoin->ctx.prev.leftHash, freeVgTableList);
1,058,247✔
62
      tSimpleHashCleanup(pStbJoin->ctx.prev.leftHash);
1,058,247✔
63
    }
64
    if (pStbJoin->ctx.prev.rightHash) {
1,134,601✔
65
      tSimpleHashSetFreeFp(pStbJoin->ctx.prev.rightHash, freeVgTableList);
1,058,247✔
66
      tSimpleHashCleanup(pStbJoin->ctx.prev.rightHash);
1,058,247✔
67
    }
68
  } else {
69
    if (pStbJoin->ctx.prev.leftCache) {
1,104✔
70
      tSimpleHashCleanup(pStbJoin->ctx.prev.leftCache);
1,104✔
71
    }
72
    if (pStbJoin->ctx.prev.rightCache) {
1,104✔
73
      tSimpleHashCleanup(pStbJoin->ctx.prev.rightCache);
1,104✔
74
    }
75
    if (pStbJoin->ctx.prev.onceTable) {
1,104✔
76
      tSimpleHashCleanup(pStbJoin->ctx.prev.onceTable);
1,104✔
77
    }
78
  }
79

80
  destroyStbJoinTableList(pStbJoin->ctx.prev.pListHead);
1,135,705✔
81
}
1,135,705✔
82

83
void destroyColRefInfo(void *info) {
206,064,896✔
84
  SColRefInfo *pColRefInfo = (SColRefInfo *)info;
206,064,896✔
85
  if (pColRefInfo) {
206,064,896✔
86
    taosMemoryFree(pColRefInfo->colName);
206,064,896✔
87
    taosMemoryFree(pColRefInfo->colrefName);
206,064,896✔
88
  }
89
}
206,064,896✔
90

91
void destroyColRefArray(void *info) {
10,726,469✔
92
  SArray *pColRefArray = *(SArray **)info;
10,726,469✔
93
  if (pColRefArray) {
10,726,469✔
94
    taosArrayDestroyEx(pColRefArray, destroyColRefInfo);
10,726,469✔
95
  }
96
}
10,726,469✔
97

98
void freeUseDbOutput(void* pOutput) {
2,504,392✔
99
  SUseDbOutput *pOut = *(SUseDbOutput**)pOutput;
2,504,392✔
100
  if (NULL == pOutput) {
2,504,392✔
101
    return;
×
102
  }
103

104
  if (pOut->dbVgroup) {
2,504,392✔
105
    freeVgInfo(pOut->dbVgroup);
2,504,392✔
106
  }
107
  taosMemFree(pOut);
2,504,392✔
108
}
109

110
void destroyOtbInfoArray(void *info) {
4,898,995✔
111
  SArray *pOtbInfoArray = *(SArray **)info;
4,898,995✔
112
  if (pOtbInfoArray) {
4,898,995✔
113
    taosArrayDestroyEx(pOtbInfoArray, destroySOrgTbInfo);
4,898,995✔
114
  }
115
}
4,898,995✔
116

117
void destroyOtbVgIdToOtbInfoArrayMap(void *info) {
3,457,011✔
118
  SHashObj* pOtbVgIdToOtbInfoArrayMap = *(SHashObj **)info;
3,457,011✔
119
  if (pOtbVgIdToOtbInfoArrayMap) {
3,457,011✔
120
    taosHashSetFreeFp(pOtbVgIdToOtbInfoArrayMap, destroyOtbInfoArray);
3,457,011✔
121
    taosHashCleanup(pOtbVgIdToOtbInfoArrayMap);
3,457,011✔
122
  }
123
}
3,457,011✔
124

125
void destroyTagList(void *info) {
2,602,192✔
126
  SArray *pTagList = *(SArray **)info;
2,602,192✔
127
  if (pTagList) {
2,602,192✔
128
    taosArrayDestroyEx(pTagList, destroyTagVal);
2,602,192✔
129
  }
130
}
2,602,192✔
131

132
void destroyVtbUidTagListMap(void *info) {
981,048✔
133
  SHashObj* pVtbUidTagListMap = *(SHashObj **)info;
981,048✔
134
  if (pVtbUidTagListMap) {
981,048✔
135
    taosHashSetFreeFp(pVtbUidTagListMap, destroyTagList);
981,048✔
136
    taosHashCleanup(pVtbUidTagListMap);
981,048✔
137
  }
138
}
981,048✔
139

140
static void destroyVtbScanDynCtrlInfo(SVtbScanDynCtrlInfo* pVtbScan) {
2,424,425✔
141
  if (pVtbScan->dbName) {
2,424,425✔
142
    taosMemoryFreeClear(pVtbScan->dbName);
2,424,425✔
143
  }
144
  if (pVtbScan->tbName) {
2,424,425✔
145
    taosMemoryFreeClear(pVtbScan->tbName);
2,424,425✔
146
  }
147
  if (pVtbScan->childTableList) {
2,424,425✔
148
    taosArrayDestroyEx(pVtbScan->childTableList, destroyColRefArray);
2,424,425✔
149
  }
150
  if (pVtbScan->colRefInfo) {
2,424,425✔
151
    taosArrayDestroyEx(pVtbScan->colRefInfo, destroyColRefInfo);
×
152
    pVtbScan->colRefInfo = NULL;
×
153
  }
154
  if (pVtbScan->childTableMap) {
2,424,425✔
155
    taosHashCleanup(pVtbScan->childTableMap);
2,115,670✔
156
  }
157
  if (pVtbScan->readColList) {
2,424,425✔
158
    taosArrayDestroy(pVtbScan->readColList);
2,424,425✔
159
  }
160
  if (pVtbScan->dbVgInfoMap) {
2,424,425✔
161
    taosHashSetFreeFp(pVtbScan->dbVgInfoMap, freeUseDbOutput);
2,424,425✔
162
    taosHashCleanup(pVtbScan->dbVgInfoMap);
2,424,425✔
163
  }
164
  if (pVtbScan->otbNameToOtbInfoMap) {
2,424,425✔
165
    taosHashSetFreeFp(pVtbScan->otbNameToOtbInfoMap, destroySOrgTbInfo);
896,514✔
166
    taosHashCleanup(pVtbScan->otbNameToOtbInfoMap);
896,514✔
167
  }
168
  if (pVtbScan->pRsp) {
2,424,425✔
169
    tFreeSUsedbRsp(pVtbScan->pRsp);
×
170
    taosMemoryFreeClear(pVtbScan->pRsp);
×
171
  }
172
  if (pVtbScan->existOrgTbVg) {
2,424,425✔
173
    taosHashCleanup(pVtbScan->existOrgTbVg);
2,424,425✔
174
  }
175
  if (pVtbScan->curOrgTbVg) {
2,424,425✔
176
    taosHashCleanup(pVtbScan->curOrgTbVg);
1,700✔
177
  }
178
  if (pVtbScan->newAddedVgInfo) {
2,424,425✔
179
    taosHashCleanup(pVtbScan->newAddedVgInfo);
840✔
180
  }
181
  if (pVtbScan->otbVgIdToOtbInfoArrayMap) {
2,424,425✔
182
    taosHashSetFreeFp(pVtbScan->otbVgIdToOtbInfoArrayMap, destroyOtbInfoArray);
186,918✔
183
    taosHashCleanup(pVtbScan->otbVgIdToOtbInfoArrayMap);
186,918✔
184
  }
185
  if (pVtbScan->vtbUidToVgIdMapMap) {
2,424,425✔
186
    taosHashSetFreeFp(pVtbScan->vtbUidToVgIdMapMap, destroyOtbVgIdToOtbInfoArrayMap);
291,628✔
187
    taosHashCleanup(pVtbScan->vtbUidToVgIdMapMap);
291,628✔
188
  }
189
  if (pVtbScan->vtbGroupIdToVgIdMapMap) {
2,424,425✔
190
    taosHashSetFreeFp(pVtbScan->vtbGroupIdToVgIdMapMap, destroyOtbVgIdToOtbInfoArrayMap);
417,342✔
191
    taosHashCleanup(pVtbScan->vtbGroupIdToVgIdMapMap);
417,342✔
192
  }
193
  if (pVtbScan->vtbUidTagListMap) {
2,424,425✔
194
    taosHashSetFreeFp(pVtbScan->vtbUidTagListMap, destroyTagList);
291,628✔
195
    taosHashCleanup(pVtbScan->vtbUidTagListMap);
291,628✔
196
  }
197
  if (pVtbScan->vtbGroupIdTagListMap) {
2,424,425✔
198
    taosHashCleanup(pVtbScan->vtbGroupIdTagListMap);
656,622✔
199
  }
200
  if (pVtbScan->vtbUidToGroupIdMap) {
2,424,425✔
201
    taosHashCleanup(pVtbScan->vtbUidToGroupIdMap);
656,622✔
202
  }
203
}
2,424,425✔
204

205
void destroyWinArray(void *info) {
938,572✔
206
  SArray *pWinArray = *(SArray **)info;
938,572✔
207
  if (pWinArray) {
938,572✔
208
    taosArrayDestroy(pWinArray);
938,572✔
209
  }
210
}
938,572✔
211

212
static void destroyVtbWindowDynCtrlInfo(SVtbWindowDynCtrlInfo* pVtbWindow) {
375,478✔
213
  if (pVtbWindow->pRes) {
375,478✔
214
    blockDataDestroy(pVtbWindow->pRes);
375,478✔
215
  }
216
  if (pVtbWindow->pWins) {
375,478✔
217
    taosArrayDestroyEx(pVtbWindow->pWins, destroyWinArray);
375,478✔
218
  }
219
}
375,478✔
220

221
static void destroyDynQueryCtrlOperator(void* param) {
3,559,383✔
222
  SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param;
3,559,383✔
223

224
  switch (pDyn->qType) {
3,559,383✔
225
    case DYN_QTYPE_STB_HASH:
1,134,958✔
226
      destroyStbJoinDynCtrlInfo(&pDyn->stbJoin);
1,134,958✔
227
      break;
1,134,958✔
228
    case DYN_QTYPE_VTB_WINDOW:
375,478✔
229
      destroyVtbWindowDynCtrlInfo(&pDyn->vtbWindow);
375,478✔
230
    case DYN_QTYPE_VTB_AGG:
2,424,425✔
231
    case DYN_QTYPE_VTB_SCAN:
232
      destroyVtbScanDynCtrlInfo(&pDyn->vtbScan);
2,424,425✔
233
      break;
2,424,425✔
234
    default:
×
235
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
×
236
      break;
×
237
  }
238

239
  taosMemoryFreeClear(param);
3,559,383✔
240
}
3,559,383✔
241

242
static FORCE_INLINE bool tableNeedCache(int64_t uid, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, bool rightTable, bool batchFetch) {
243
  if (batchFetch) {
7,281,286✔
244
    return true;
7,276,870✔
245
  }
246
  
247
  if (rightTable) {
4,416✔
248
    return pPost->rightCurrUid == pPost->rightNextUid;
2,208✔
249
  }
250

251
  uint32_t* num = tSimpleHashGet(pPrev->leftCache, &uid, sizeof(uid));
2,208✔
252

253
  return (NULL == num) ? false : true;
2,208✔
254
}
255

256
static int32_t updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo*          pStbJoin) {
3,640,643✔
257
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
3,640,643✔
258
  SStbJoinPostJoinCtx*       pPost = &pStbJoin->ctx.post;
3,640,643✔
259
  SStbJoinTableList*         pNode = pPrev->pListHead;
3,640,643✔
260
  int32_t*                   leftVgId = pNode->pLeftVg + pNode->readIdx;
3,640,643✔
261
  int32_t*                   rightVgId = pNode->pRightVg + pNode->readIdx;
3,640,643✔
262
  int64_t*                   leftUid = pNode->pLeftUid + pNode->readIdx;
3,640,643✔
263
  int64_t*                   rightUid = pNode->pRightUid + pNode->readIdx;
3,640,643✔
264
  int64_t                    readIdx = pNode->readIdx + 1;
3,640,643✔
265
  int64_t                    rightPrevUid = pPost->rightCurrUid;
3,640,643✔
266

267
  pPost->leftCurrUid = *leftUid;
3,640,643✔
268
  pPost->rightCurrUid = *rightUid;
3,640,643✔
269

270
  pPost->leftVgId = *leftVgId;
3,640,643✔
271
  pPost->rightVgId = *rightVgId;
3,640,643✔
272

273
  while (true) {
274
    if (readIdx < pNode->uidNum) {
3,640,643✔
275
      pPost->rightNextUid = *(pNode->pRightUid + readIdx);
3,563,677✔
276
      break;
3,563,677✔
277
    }
278
    
279
    pNode = pNode->pNext;
76,966✔
280
    if (NULL == pNode) {
76,966✔
281
      pPost->rightNextUid = 0;
76,966✔
282
      break;
76,966✔
283
    }
284
    
285
    rightUid = pNode->pRightUid;
×
286
    readIdx = 0;
×
287
  }
288

289
  pPost->leftNeedCache = tableNeedCache(*leftUid, pPrev, pPost, false, pStbJoin->basic.batchFetch);
7,281,286✔
290
  pPost->rightNeedCache = tableNeedCache(*rightUid, pPrev, pPost, true, pStbJoin->basic.batchFetch);
7,281,286✔
291

292
  if (!pStbJoin->basic.batchFetch && pPost->rightNeedCache && rightPrevUid != pPost->rightCurrUid) {
3,640,643✔
293
    QRY_ERR_RET(tSimpleHashPut(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid), NULL, 0));
×
294
    pStbJoin->execInfo.rightCacheNum++;
×
295
  }  
296

297
  return TSDB_CODE_SUCCESS;
3,640,643✔
298
}
299

300
static int32_t copyOrgTbInfo(SOrgTbInfo* pSrc, SOrgTbInfo** ppDst) {
7,024,348✔
301
  int32_t     code = TSDB_CODE_SUCCESS;
7,024,348✔
302
  int32_t     lino = 0;
7,024,348✔
303
  SOrgTbInfo* pTbInfo = NULL;
7,024,348✔
304

305
  qDebug("start to copy org table info, vgId:%d, tbName:%s", pSrc->vgId, pSrc->tbName);
7,024,348✔
306

307
  pTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
7,024,348✔
308
  QUERY_CHECK_NULL(pTbInfo, code, lino, _return, terrno)
7,024,348✔
309

310
  pTbInfo->vgId = pSrc->vgId;
7,024,348✔
311
  tstrncpy(pTbInfo->tbName, pSrc->tbName, TSDB_TABLE_FNAME_LEN);
7,024,348✔
312

313
  pTbInfo->colMap = taosArrayDup(pSrc->colMap, NULL);
7,024,348✔
314
  QUERY_CHECK_NULL(pTbInfo->colMap, code, lino, _return, terrno)
7,024,348✔
315

316
  *ppDst = pTbInfo;
7,024,348✔
317

318
  return code;
7,024,348✔
319
_return:
×
320
  qError("failed to copy org table info, code:%d, line:%d", code, lino);
×
321
  if (pTbInfo) {
×
322
    if (pTbInfo->colMap) {
×
323
      taosArrayDestroy(pTbInfo->colMap);
×
324
    }
325
    taosMemoryFreeClear(pTbInfo);
×
326
  }
327
  return code;
×
328
}
329

330
static int32_t buildTagListForExchangeBasicParam(SExchangeOperatorBasicParam* pBasic, SArray* pTagList) {
2,335,000✔
331
  int32_t  code = TSDB_CODE_SUCCESS;
2,335,000✔
332
  int32_t  lino = 0;
2,335,000✔
333
  STagVal  tmpTag;
2,335,000✔
334

335
  pBasic->tagList = taosArrayInit(1, sizeof(STagVal));
2,335,000✔
336
  QUERY_CHECK_NULL(pBasic->tagList, code, lino, _return, terrno)
2,335,000✔
337

338
  for (int32_t i = 0; i < taosArrayGetSize(pTagList); ++i) {
15,803,264✔
339
    STagVal* pSrcTag = (STagVal*)taosArrayGet(pTagList, i);
13,468,264✔
340
    QUERY_CHECK_NULL(pSrcTag, code, lino, _return, terrno)
13,468,264✔
341
    tmpTag.type = pSrcTag->type;
13,468,264✔
342
    tmpTag.cid = pSrcTag->cid;
13,468,264✔
343
    if (IS_VAR_DATA_TYPE(pSrcTag->type)) {
13,468,264✔
344
      tmpTag.nData = pSrcTag->nData;
5,909,856✔
345
      tmpTag.pData = taosMemoryMalloc(tmpTag.nData);
5,909,856✔
346
      QUERY_CHECK_NULL(tmpTag.pData, code, lino, _return, terrno)
5,909,856✔
347
      memcpy(tmpTag.pData, pSrcTag->pData, tmpTag.nData);
5,909,856✔
348
    } else {
349
      tmpTag.i64 = pSrcTag->i64;
7,558,408✔
350
    }
351

352
    QUERY_CHECK_NULL(taosArrayPush(pBasic->tagList, &tmpTag), code, lino, _return, terrno)
26,936,528✔
353
    tmpTag = (STagVal){0};
13,468,264✔
354
  }
355

356
  return code;
2,335,000✔
357
_return:
×
358
  if (pBasic->tagList) {
×
359
    taosArrayDestroyEx(pBasic->tagList, destroyTagVal);
×
360
    pBasic->tagList = NULL;
×
361
  }
362
  if (tmpTag.pData) {
×
363
    taosMemoryFree(tmpTag.pData);
×
364
  }
365
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
×
366
  return code;
×
367
}
368

369
static int32_t buildBatchOrgTbInfoForExchangeBasicParam(SExchangeOperatorBasicParam* pBasic, SArray* pOrgTbInfoArray) {
5,036,635✔
370
  int32_t     code = TSDB_CODE_SUCCESS;
5,036,635✔
371
  int32_t     lino = 0;
5,036,635✔
372
  SOrgTbInfo  batchInfo;
5,036,635✔
373

374
  pBasic->batchOrgTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
5,036,635✔
375
  QUERY_CHECK_NULL(pBasic->batchOrgTbInfo, code, lino, _return, terrno)
5,036,635✔
376

377
  for (int32_t i = 0; i < taosArrayGetSize(pOrgTbInfoArray); ++i) {
13,051,534✔
378
    SOrgTbInfo* pSrc = (SOrgTbInfo*)taosArrayGet(pOrgTbInfoArray, i);
8,014,899✔
379
    QUERY_CHECK_NULL(pSrc, code, lino, _return, terrno)
8,014,899✔
380
    batchInfo.vgId = pSrc->vgId;
8,014,899✔
381
    tstrncpy(batchInfo.tbName, pSrc->tbName, TSDB_TABLE_FNAME_LEN);
8,014,899✔
382
    batchInfo.colMap = taosArrayDup(pSrc->colMap, NULL);
8,014,899✔
383
    QUERY_CHECK_NULL(batchInfo.colMap, code, lino, _return, terrno)
8,014,899✔
384
    QUERY_CHECK_NULL(taosArrayPush(pBasic->batchOrgTbInfo, &batchInfo), code, lino, _return, terrno)
16,029,798✔
385
    batchInfo = (SOrgTbInfo){0};
8,014,899✔
386
  }
387

388
  return code;
5,036,635✔
389
_return:
×
390
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
×
391
  if (pBasic->batchOrgTbInfo) {
×
392
    taosArrayDestroyEx(pBasic->batchOrgTbInfo, destroySOrgTbInfo);
×
393
    pBasic->batchOrgTbInfo = NULL;
×
394
  }
395
  if (batchInfo.colMap) {
×
396
    taosArrayDestroy(batchInfo.colMap);
×
397
    batchInfo.colMap = NULL;
×
398
  }
399
  return code;
×
400
}
401

402
static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, bool needCache, SOperatorParam* pChild) {
7,281,286✔
403
  int32_t code = TSDB_CODE_SUCCESS;
7,281,286✔
404
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
7,281,286✔
405
  if (NULL == *ppRes) {
7,281,286✔
406
    code = terrno;
×
407
    freeOperatorParam(pChild, OP_GET_PARAM);
×
408
    return code;
×
409
  }
410
  if (pChild) {
7,281,286✔
411
    (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
157,124✔
412
    if (NULL == (*ppRes)->pChildren) {
157,124✔
413
      code = terrno;
×
414
      freeOperatorParam(pChild, OP_GET_PARAM);
×
415
      freeOperatorParam(*ppRes, OP_GET_PARAM);
×
416
      *ppRes = NULL;
×
417
      return code;
×
418
    }
419
    if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild)) {
314,248✔
420
      code = terrno;
×
421
      freeOperatorParam(pChild, OP_GET_PARAM);
×
422
      freeOperatorParam(*ppRes, OP_GET_PARAM);
×
423
      *ppRes = NULL;
×
424
      return code;
×
425
    }
426
  } else {
427
    (*ppRes)->pChildren = NULL;
7,124,162✔
428
  }
429

430
  SGcOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcOperatorParam));
7,281,286✔
431
  if (NULL == pGc) {
7,281,286✔
432
    code = terrno;
×
433
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
434
    *ppRes = NULL;
×
435
    return code;
×
436
  }
437

438
  pGc->sessionId = atomic_add_fetch_64(&gSessionId, 1);
7,281,286✔
439
  pGc->downstreamIdx = downstreamIdx;
7,281,286✔
440
  pGc->vgId = vgId;
7,281,286✔
441
  pGc->tbUid = tbUid;
7,281,286✔
442
  pGc->needCache = needCache;
7,281,286✔
443

444
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
7,281,286✔
445
  (*ppRes)->downstreamIdx = downstreamIdx;
7,281,286✔
446
  (*ppRes)->value = pGc;
7,281,286✔
447
  (*ppRes)->reUse = false;
7,281,286✔
448

449
  return TSDB_CODE_SUCCESS;
7,281,286✔
450
}
451

452

453
static int32_t buildGroupCacheNotifyOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid) {
×
454
  int32_t code = TSDB_CODE_SUCCESS;
×
455
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
×
456
  if (NULL == *ppRes) {
×
457
    return terrno;
×
458
  }
459
  (*ppRes)->pChildren = NULL;
×
460

461
  SGcNotifyOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcNotifyOperatorParam));
×
462
  if (NULL == pGc) {
×
463
    code = terrno;
×
464
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
×
465
    return code;
×
466
  }
467

468
  pGc->downstreamIdx = downstreamIdx;
×
469
  pGc->vgId = vgId;
×
470
  pGc->tbUid = tbUid;
×
471

472
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
×
473
  (*ppRes)->downstreamIdx = downstreamIdx;
×
474
  (*ppRes)->value = pGc;
×
475
  (*ppRes)->reUse = false;
×
476

477
  return TSDB_CODE_SUCCESS;
×
478
}
479

480
static int32_t buildExchangeOperatorBasicParam(SExchangeOperatorBasicParam* pBasic, ENodeType srcOpType,
17,772,091✔
481
                                               EExchangeSourceType exchangeType, int32_t vgId, uint64_t groupId,
482
                                               SArray* pUidList, SOrgTbInfo* pOrgTbInfo, SArray* pTagList,
483
                                               SArray* pOrgTbInfoArray, STimeWindow window,
484
                                               SDownstreamSourceNode* pDownstreamSourceNode,
485
                                               bool tableSeq, bool isNewParam, bool isNewDeployed) {
486
  int32_t code = TSDB_CODE_SUCCESS;
17,772,091✔
487
  int32_t lino = 0;
17,772,091✔
488

489
  qDebug("buildExchangeOperatorBasicParam, srcOpType:%d, exchangeType:%d, vgId:%d, groupId:%" PRIu64 ", tableSeq:%d, "
17,772,091✔
490
         "isNewParam:%d, isNewDeployed:%d", srcOpType, exchangeType, vgId, groupId, tableSeq, isNewParam, isNewDeployed);
491

492
  pBasic->paramType = DYN_TYPE_EXCHANGE_PARAM;
17,772,091✔
493
  pBasic->srcOpType = srcOpType;
17,772,091✔
494
  pBasic->vgId = vgId;
17,772,091✔
495
  pBasic->groupid = groupId;
17,772,091✔
496
  pBasic->window = window;
17,772,091✔
497
  pBasic->tableSeq = tableSeq;
17,772,091✔
498
  pBasic->type = exchangeType;
17,772,091✔
499
  pBasic->isNewParam = isNewParam;
17,772,091✔
500

501
  if (pDownstreamSourceNode) {
17,772,091✔
502
    pBasic->isNewDeployed = true;
2,310✔
503
    pBasic->newDeployedSrc.type = QUERY_NODE_DOWNSTREAM_SOURCE;
2,310✔
504
    pBasic->newDeployedSrc.clientId = pDownstreamSourceNode->clientId;// current task's taskid
2,310✔
505
    pBasic->newDeployedSrc.taskId = pDownstreamSourceNode->taskId;
2,310✔
506
    pBasic->newDeployedSrc.fetchMsgType = TDMT_STREAM_FETCH;
2,310✔
507
    pBasic->newDeployedSrc.localExec = false;
2,310✔
508
    pBasic->newDeployedSrc.addr.nodeId = pDownstreamSourceNode->addr.nodeId;
2,310✔
509
    memcpy(&pBasic->newDeployedSrc.addr.epSet, &pDownstreamSourceNode->addr.epSet, sizeof(SEpSet));
2,310✔
510
  } else {
511
    pBasic->isNewDeployed = false;
17,769,781✔
512
    pBasic->newDeployedSrc = (SDownstreamSourceNode){0};
17,769,781✔
513
  }
514

515
  if (pUidList) {
17,772,091✔
516
    pBasic->uidList = taosArrayDup(pUidList, NULL);
5,017,238✔
517
    QUERY_CHECK_NULL(pBasic->uidList, code, lino, _return, terrno)
5,017,238✔
518
  } else {
519
    pBasic->uidList = taosArrayInit(1, sizeof(int64_t));
12,754,853✔
520
    QUERY_CHECK_NULL(pBasic->uidList, code, lino, _return, terrno)
12,754,853✔
521
  }
522

523
  if (pOrgTbInfo) {
17,772,091✔
524
    code = copyOrgTbInfo(pOrgTbInfo, &pBasic->orgTbInfo);
7,024,348✔
525
    QUERY_CHECK_CODE(code, lino, _return);
7,024,348✔
526
  } else {
527
    pBasic->orgTbInfo = NULL;
10,747,743✔
528
  }
529

530
  if (pTagList) {
17,772,091✔
531
    code = buildTagListForExchangeBasicParam(pBasic, pTagList);
2,335,000✔
532
    QUERY_CHECK_CODE(code, lino, _return);
2,335,000✔
533
  } else {
534
    pBasic->tagList = NULL;
15,437,091✔
535
  }
536

537
  if (pOrgTbInfoArray) {
17,772,091✔
538
    code = buildBatchOrgTbInfoForExchangeBasicParam(pBasic, pOrgTbInfoArray);
5,036,635✔
539
    QUERY_CHECK_CODE(code, lino, _return);
5,036,635✔
540
  } else {
541
    pBasic->batchOrgTbInfo = NULL;
12,735,456✔
542
  }
543
  return code;
17,772,091✔
544

545
_return:
×
546
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
×
547
  freeExchangeGetBasicOperatorParam(pBasic);
×
548
  return code;
×
549
}
550

551
static int32_t buildExchangeOperatorParamImpl(SOperatorParam** ppRes, int32_t downstreamIdx, ENodeType srcOpType,
12,493,733✔
552
                                              EExchangeSourceType exchangeType, int32_t vgId, uint64_t groupId,
553
                                              SArray* pUidList, SOrgTbInfo* pOrgTbInfo, SArray* pTagList,
554
                                              SArray* pOrgTbInfoArray, STimeWindow window,
555
                                              SDownstreamSourceNode* pDownstreamSourceNode,
556
                                              bool tableSeq, bool isNewParam, bool reUse, bool isNewDeployed) {
557

558
  int32_t                      code = TSDB_CODE_SUCCESS;
12,493,733✔
559
  int32_t                      lino = 0;
12,493,733✔
560
  SOperatorParam*              pParam = NULL;
12,493,733✔
561
  SExchangeOperatorParam*      pExc = NULL;
12,493,733✔
562

563
  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
12,493,733✔
564
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)
12,493,733✔
565

566
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
12,493,733✔
567
  pParam->downstreamIdx = downstreamIdx;
12,493,733✔
568
  pParam->reUse = reUse;
12,493,733✔
569
  pParam->pChildren = NULL;
12,493,733✔
570
  pParam->value = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
12,493,733✔
571
  QUERY_CHECK_NULL(pParam->value, code, lino, _return, terrno)
12,493,733✔
572

573
  pExc = (SExchangeOperatorParam*)pParam->value;
12,493,733✔
574
  pExc->multiParams = false;
12,493,733✔
575

576
  code = buildExchangeOperatorBasicParam(&pExc->basic, srcOpType, exchangeType, vgId, groupId,
12,493,733✔
577
                                         pUidList, pOrgTbInfo, pTagList, pOrgTbInfoArray,
578
                                         window, pDownstreamSourceNode, tableSeq, isNewParam, isNewDeployed);
579

580
  *ppRes = pParam;
12,493,733✔
581
  return code;
12,493,733✔
582
_return:
×
583
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
×
584
  if (pParam) {
×
585
    freeOperatorParam(pParam, OP_GET_PARAM);
×
586
  }
587
  return code;
×
588
}
589

590
static int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, const int32_t* pVgId, int64_t* pUid) {
4,416✔
591
  int32_t code = TSDB_CODE_SUCCESS;
4,416✔
592
  int32_t lino = 0;
4,416✔
593

594
  SArray* pUidList = taosArrayInit(1, sizeof(int64_t));
4,416✔
595
  QUERY_CHECK_NULL(pUidList, code, lino, _return, terrno)
4,416✔
596

597
  QUERY_CHECK_NULL(taosArrayPush(pUidList, pUid), code, lino, _return, terrno);
4,416✔
598

599
  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, EX_SRC_TYPE_STB_JOIN_SCAN,
4,416✔
600
                                        *pVgId, 0, pUidList, NULL, NULL, NULL, (STimeWindow){0}, NULL, true, false, false, false);
4,416✔
601
  QUERY_CHECK_CODE(code, lino, _return);
4,416✔
602

603
_return:
4,416✔
604
  if (code) {
4,416✔
605
    qError("failed to build exchange operator param, code:%d", code);
×
606
  }
607
  taosArrayDestroy(pUidList);
4,416✔
608
  return code;
4,416✔
609
}
610

611
static int32_t buildExchangeOperatorParamForExternalWindow(SOperatorParam** ppRes, int32_t downstreamIdx, STimeWindow win) {
693,870✔
612
  int32_t                   code = TSDB_CODE_SUCCESS;
693,870✔
613
  int32_t                   lino = 0;
693,870✔
614

615
  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, EX_SRC_TYPE_VTB_WIN_SCAN,
693,870✔
616
                                        0, 0, NULL, NULL, NULL, NULL, win, NULL, true, true, true, false);
617
  QUERY_CHECK_CODE(code, lino, _return);
693,870✔
618

619
  return code;
693,870✔
620
_return:
×
621
  qError("failed to build exchange operator param for external window, code:%d", code);
×
622
  return code;
×
623
}
624

625
static int32_t buildExchangeOperatorParamForVTagScan(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, tb_uid_t uid) {
4,771,099✔
626
  int32_t                      code = TSDB_CODE_SUCCESS;
4,771,099✔
627
  int32_t                      lino = 0;
4,771,099✔
628
  SArray*                      pUidList = NULL;
4,771,099✔
629

630
  pUidList = taosArrayInit(1, sizeof(int64_t));
4,771,099✔
631
  QUERY_CHECK_NULL(pUidList, code, lino, _return, terrno)
4,771,099✔
632

633
  QUERY_CHECK_NULL(taosArrayPush(pUidList, &uid), code, lino, _return, terrno)
4,771,099✔
634

635
  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, EX_SRC_TYPE_VSTB_TAG_SCAN,
4,771,099✔
636
                                        vgId, 0, pUidList, NULL, NULL, NULL, (STimeWindow){0}, NULL, false, false, true, false);
4,771,099✔
637
  QUERY_CHECK_CODE(code, lino, _return);
4,771,099✔
638

639
_return:
4,771,099✔
640
  if (code) {
4,771,099✔
641
    qError("failed to build exchange operator param for tag scan, code:%d", code);
×
642
  }
643
  taosArrayDestroy(pUidList);
4,771,099✔
644
  return code;
4,771,099✔
645
}
646

647
static int32_t buildExchangeOperatorParamForVScan(SOperatorParam** ppRes, int32_t downstreamIdx, SOrgTbInfo* pOrgTbInfo,
7,024,348✔
648
                                                  SDownstreamSourceNode* pNewSource) {
649
  int32_t                      code = TSDB_CODE_SUCCESS;
7,024,348✔
650
  int32_t                      lino = 0;
7,024,348✔
651

652
  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, EX_SRC_TYPE_VSTB_SCAN,
7,024,348✔
653
                                        pOrgTbInfo->vgId, 0, NULL, pOrgTbInfo, NULL, NULL, (STimeWindow){0}, pNewSource, false, true, true, true);
7,024,348✔
654
  QUERY_CHECK_CODE(code, lino, _return);
7,024,348✔
655

656
  return code;
7,024,348✔
657
_return:
×
658
  qError("failed to build exchange operator param for vscan, code:%d", code);
×
659
  return code;
×
660
}
661

662
static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
141,267✔
663
  int32_t                       code = TSDB_CODE_SUCCESS;
141,267✔
664
  int32_t                       line = 0;
141,267✔
665
  SOperatorParam*               pParam = NULL;
141,267✔
666
  SExchangeOperatorBatchParam*  pExc = NULL;
141,267✔
667
  SExchangeOperatorBasicParam   basic = {0};
141,267✔
668

669
  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
141,267✔
670
  QUERY_CHECK_NULL(pParam, code, line, _return, terrno);
141,267✔
671

672
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
141,267✔
673
  pParam->downstreamIdx = downstreamIdx;
141,267✔
674
  pParam->reUse = false;
141,267✔
675
  pParam->pChildren = NULL;
141,267✔
676
  pParam->value = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
141,267✔
677
  QUERY_CHECK_NULL(pParam->value, code, line, _return, terrno);
141,267✔
678

679
  pExc = pParam->value;
141,267✔
680
  pExc->multiParams = true;
141,267✔
681
  pExc->pBatchs = tSimpleHashInit(tSimpleHashGetSize(pVg), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
141,267✔
682
  QUERY_CHECK_NULL(pExc->pBatchs, code, line, _return, terrno)
141,267✔
683

684
  tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);
141,267✔
685

686
  int32_t iter = 0;
141,267✔
687
  void*   p = NULL;
141,267✔
688
  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
382,990✔
689
    int32_t* pVgId = tSimpleHashGetKey(p, NULL);
241,723✔
690
    SArray*  pUidList = *(SArray**)p;
241,723✔
691

692
    code = buildExchangeOperatorBasicParam(&basic, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
241,723✔
693
                                           EX_SRC_TYPE_STB_JOIN_SCAN, *pVgId, 0,
694
                                           pUidList, NULL, NULL, NULL,
695
                                           (STimeWindow){0}, NULL, false, false, false);
241,723✔
696
    QUERY_CHECK_CODE(code, line, _return);
241,723✔
697

698
    QRY_ERR_RET(tSimpleHashPut(pExc->pBatchs, pVgId, sizeof(*pVgId), &basic, sizeof(basic)));
241,723✔
699

700
    basic = (SExchangeOperatorBasicParam){0};
241,723✔
701
    qTrace("build downstreamIdx %d batch scan, vgId:%d, uidNum:%" PRId64, downstreamIdx, *pVgId, (int64_t)taosArrayGetSize(pUidList));
241,723✔
702

703
    // already transferred to batch param, can free here
704
    taosArrayDestroy(pUidList);
241,723✔
705

706
    *(SArray**)p = NULL;
241,723✔
707
  }
708
  *ppRes = pParam;
141,267✔
709

710
  return code;
141,267✔
711
  
UNCOV
712
_return:
×
713
  qError("failed to build batch exchange operator param, code:%d", code);
×
714
  freeOperatorParam(pParam, OP_GET_PARAM);
×
715
  freeExchangeGetBasicOperatorParam(&basic);
×
716
  return code;
×
717
}
718

719
static int32_t buildBatchExchangeOperatorParamForVirtual(SOperatorParam** ppRes, int32_t downstreamIdx, SArray* pTagList, uint64_t groupid,  SHashObj* pBatchMaps, STimeWindow window, EExchangeSourceType type) {
4,463,944✔
720
  int32_t                       code = TSDB_CODE_SUCCESS;
4,463,944✔
721
  int32_t                       lino = 0;
4,463,944✔
722
  SOperatorParam*               pParam = NULL;
4,463,944✔
723
  SExchangeOperatorBatchParam*  pExc = NULL;
4,463,944✔
724
  SExchangeOperatorBasicParam   basic = {0};
4,463,944✔
725

726
  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
4,463,944✔
727
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)
4,463,944✔
728

729
  pParam->value = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
4,463,944✔
730
  QUERY_CHECK_NULL(pParam->value, code, lino, _return, terrno)
4,463,944✔
731

732
  pExc = pParam->value;
4,463,944✔
733
  pExc->multiParams = true;
4,463,944✔
734

735
  pExc->pBatchs = tSimpleHashInit(taosHashGetSize(pBatchMaps), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
4,463,944✔
736
  QUERY_CHECK_NULL(pExc->pBatchs, code, lino, _return, terrno)
4,463,944✔
737
  tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);
4,463,944✔
738

739
  size_t keyLen = 0;
4,463,944✔
740
  void*  pIter = taosHashIterate(pBatchMaps, NULL);
4,463,944✔
741
  while (pIter != NULL) {
9,500,579✔
742
    SArray*          pOrgTbInfoArray = *(SArray**)pIter;
5,036,635✔
743
    int32_t*         vgId = (int32_t*)taosHashGetKey(pIter, &keyLen);
5,036,635✔
744

745
    code = buildExchangeOperatorBasicParam(&basic, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
5,036,635✔
746
                                           type, *vgId, groupid,
747
                                           NULL, NULL, pTagList, pOrgTbInfoArray,
748
                                           window, NULL, false, true, false);
749
    QUERY_CHECK_CODE(code, lino, _return);
5,036,635✔
750

751
    code = tSimpleHashPut(pExc->pBatchs, vgId, sizeof(*vgId), &basic, sizeof(basic));
5,036,635✔
752
    QUERY_CHECK_CODE(code, lino, _return);
5,036,635✔
753

754
    basic = (SExchangeOperatorBasicParam){0};
5,036,635✔
755
    pIter = taosHashIterate(pBatchMaps, pIter);
5,036,635✔
756
  }
757

758
  pParam->pChildren = NULL;
4,463,944✔
759
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
4,463,944✔
760
  pParam->downstreamIdx = downstreamIdx;
4,463,944✔
761
  pParam->reUse = false;
4,463,944✔
762

763
  *ppRes = pParam;
4,463,944✔
764
  return code;
4,463,944✔
765

UNCOV
766
_return:
×
767
  qError("failed to build exchange operator param for vscan, code:%d", code);
×
768
  freeOperatorParam(pParam, OP_GET_PARAM);
×
769
  freeExchangeGetBasicOperatorParam(&basic);
×
770
  return code;
×
771
}
772

773
static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initParam, SOperatorParam** ppChild0, SOperatorParam** ppChild1) {
3,640,643✔
774
  int32_t code = TSDB_CODE_SUCCESS;
3,640,643✔
775
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
3,640,643✔
776
  if (NULL == *ppRes) {
3,640,643✔
UNCOV
777
    code = terrno;
×
778
    return code;
×
779
  }
780
  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
3,640,643✔
781
  if (NULL == (*ppRes)->pChildren) {
3,640,643✔
UNCOV
782
    code = terrno;
×
783
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
784
    *ppRes = NULL;
×
785
    return code;
×
786
  }
787
  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild0)) {
7,281,286✔
UNCOV
788
    code = terrno;
×
789
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
790
    *ppRes = NULL;
×
791
    return code;
×
792
  }
793
  *ppChild0 = NULL;
3,640,643✔
794
  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild1)) {
7,281,286✔
UNCOV
795
    code = terrno;
×
796
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
797
    *ppRes = NULL;
×
798
    return code;
×
799
  }
800
  *ppChild1 = NULL;
3,640,643✔
801
  
802
  SSortMergeJoinOperatorParam* pJoin = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam));
3,640,643✔
803
  if (NULL == pJoin) {
3,640,643✔
UNCOV
804
    code = terrno;
×
805
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
806
    *ppRes = NULL;
×
807
    return code;
×
808
  }
809

810
  pJoin->initDownstream = initParam;
3,640,643✔
811
  
812
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
3,640,643✔
813
  (*ppRes)->value = pJoin;
3,640,643✔
814
  (*ppRes)->reUse = false;
3,640,643✔
815

816
  return TSDB_CODE_SUCCESS;
3,640,643✔
817
}
818

UNCOV
819
static int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) {
×
820
  int32_t code = TSDB_CODE_SUCCESS;
×
821
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
×
822
  if (NULL == *ppRes) {
×
823
    code = terrno;
×
824
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
×
825
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
826
    return code;
×
827
  }
UNCOV
828
  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
×
829
  if (NULL == *ppRes) {
×
830
    code = terrno;
×
831
    taosMemoryFreeClear(*ppRes);
×
832
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
×
833
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
834
    return code;
×
835
  }
UNCOV
836
  if (pChild0 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) {
×
837
    code = terrno;
×
838
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
×
839
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
×
840
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
841
    *ppRes = NULL;
×
842
    return code;
×
843
  }
UNCOV
844
  if (pChild1 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) {
×
845
    code = terrno;
×
846
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
×
847
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
848
    *ppRes = NULL;
×
849
    return code;
×
850
  }
851
  
UNCOV
852
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
×
853
  (*ppRes)->value = NULL;
×
854
  (*ppRes)->reUse = false;
×
855

UNCOV
856
  return TSDB_CODE_SUCCESS;
×
857
}
858

859
static int32_t buildBatchTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
11,441✔
860
  int32_t code = TSDB_CODE_SUCCESS;
11,441✔
861
  int32_t vgNum = tSimpleHashGetSize(pVg);
11,441✔
862
  if (vgNum <= 0 || vgNum > 1) {
11,441✔
UNCOV
863
    qError("Invalid vgroup num %d to build table scan operator param", vgNum);
×
864
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
865
  }
866

867
  int32_t iter = 0;
11,441✔
868
  void* p = NULL;
11,441✔
869
  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
22,882✔
870
    SArray* pUidList = *(SArray**)p;
11,441✔
871

872
    code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false);
11,441✔
873
    if (code) {
11,441✔
UNCOV
874
      return code;
×
875
    }
876
    taosArrayDestroy(pUidList);
11,441✔
877
    *(SArray**)p = NULL;
11,441✔
878
  }
879
  
880
  return TSDB_CODE_SUCCESS;
11,441✔
881
}
882

UNCOV
883
static int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) {
×
884
  SArray* pUidList = taosArrayInit(1, sizeof(int64_t));
×
885
  if (NULL == pUidList) {
×
886
    return terrno;
×
887
  }
UNCOV
888
  if (NULL == taosArrayPush(pUidList, pUid)) {
×
889
    return terrno;
×
890
  }
891

UNCOV
892
  int32_t code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, true);
×
893
  taosArrayDestroy(pUidList);
×
894
  if (code) {
×
895
    return code;
×
896
  }
897
  
UNCOV
898
  return TSDB_CODE_SUCCESS;
×
899
}
900

901
static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, SOperatorParam** ppParam) {
3,640,643✔
902
  int64_t                     rowIdx = pPrev->pListHead->readIdx;
3,640,643✔
903
  SOperatorParam*             pSrcParam0 = NULL;
3,640,643✔
904
  SOperatorParam*             pSrcParam1 = NULL;
3,640,643✔
905
  SOperatorParam*             pGcParam0 = NULL;
3,640,643✔
906
  SOperatorParam*             pGcParam1 = NULL;  
3,640,643✔
907
  int32_t*                    leftVg = pPrev->pListHead->pLeftVg + rowIdx;
3,640,643✔
908
  int64_t*                    leftUid = pPrev->pListHead->pLeftUid + rowIdx;
3,640,643✔
909
  int32_t*                    rightVg = pPrev->pListHead->pRightVg + rowIdx;
3,640,643✔
910
  int64_t*                    rightUid = pPrev->pListHead->pRightUid + rowIdx;
3,640,643✔
911
  int32_t                     code = TSDB_CODE_SUCCESS;
3,640,643✔
912

913
  qDebug("start %" PRId64 ":%" PRId64 "th stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64, 
3,640,643✔
914
      rowIdx, pPrev->tableNum, *leftVg, *leftUid, *rightVg, *rightUid);
915

916
  QRY_ERR_RET(updatePostJoinCurrTableInfo(&pInfo->stbJoin));
3,640,643✔
917
  
918
  if (pInfo->stbJoin.basic.batchFetch) {
3,640,643✔
919
    if (pPrev->leftHash) {
3,638,435✔
920
      code = pInfo->stbJoin.basic.srcScan[0] ? buildBatchTableScanOperatorParam(&pSrcParam0, 0, pPrev->leftHash) : buildBatchExchangeOperatorParam(&pSrcParam0, 0, pPrev->leftHash);
76,354✔
921
      if (TSDB_CODE_SUCCESS == code) {
76,354✔
922
        code = pInfo->stbJoin.basic.srcScan[1] ? buildBatchTableScanOperatorParam(&pSrcParam1, 1, pPrev->rightHash) : buildBatchExchangeOperatorParam(&pSrcParam1, 1, pPrev->rightHash);
76,354✔
923
      }
924
      if (TSDB_CODE_SUCCESS == code) {
76,354✔
925
        tSimpleHashCleanup(pPrev->leftHash);
76,354✔
926
        tSimpleHashCleanup(pPrev->rightHash);
76,354✔
927
        pPrev->leftHash = NULL;
76,354✔
928
        pPrev->rightHash = NULL;
76,354✔
929
      }
930
    }
931
  } else {
932
    code = pInfo->stbJoin.basic.srcScan[0] ? buildSingleTableScanOperatorParam(&pSrcParam0, 0, leftVg, leftUid) : buildExchangeOperatorParam(&pSrcParam0, 0, leftVg, leftUid);
2,208✔
933
    if (TSDB_CODE_SUCCESS == code) {
2,208✔
934
      code = pInfo->stbJoin.basic.srcScan[1] ? buildSingleTableScanOperatorParam(&pSrcParam1, 1, rightVg, rightUid) : buildExchangeOperatorParam(&pSrcParam1, 1, rightVg, rightUid);
2,208✔
935
    }
936
  }
937

938
  bool initParam = pSrcParam0 ? true : false;
3,640,643✔
939
  if (TSDB_CODE_SUCCESS == code) {
3,640,643✔
940
    code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pPost->leftNeedCache, pSrcParam0);
3,640,643✔
941
    pSrcParam0 = NULL;
3,640,643✔
942
  }
943
  if (TSDB_CODE_SUCCESS == code) {
3,640,643✔
944
    code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pPost->rightNeedCache, pSrcParam1);
3,640,643✔
945
    pSrcParam1 = NULL;
3,640,643✔
946
  }
947
  if (TSDB_CODE_SUCCESS == code) {
3,640,643✔
948
    code = buildMergeJoinOperatorParam(ppParam, initParam, &pGcParam0, &pGcParam1);
3,640,643✔
949
  }
950
  if (TSDB_CODE_SUCCESS != code) {
3,640,643✔
UNCOV
951
    if (pSrcParam0) {
×
952
      freeOperatorParam(pSrcParam0, OP_GET_PARAM);
×
953
    }
UNCOV
954
    if (pSrcParam1) {
×
955
      freeOperatorParam(pSrcParam1, OP_GET_PARAM);
×
956
    }
UNCOV
957
    if (pGcParam0) {
×
958
      freeOperatorParam(pGcParam0, OP_GET_PARAM);
×
959
    }
UNCOV
960
    if (pGcParam1) {
×
961
      freeOperatorParam(pGcParam1, OP_GET_PARAM);
×
962
    }
UNCOV
963
    if (*ppParam) {
×
964
      freeOperatorParam(*ppParam, OP_GET_PARAM);
×
965
      *ppParam = NULL;
×
966
    }
967
  }
968
  
969
  return code;
3,640,643✔
970
}
971

972
static int32_t buildVtbScanOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, uint64_t uid) {
4,771,099✔
973
  int32_t                   code = TSDB_CODE_SUCCESS;
4,771,099✔
974
  int32_t                   lino = 0;
4,771,099✔
975
  SVTableScanOperatorParam* pVScan = NULL;
4,771,099✔
976
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
4,771,099✔
977
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
4,771,099✔
978

979
  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
4,771,099✔
980
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)
4,771,099✔
981

982
  pVScan = taosMemoryMalloc(sizeof(SVTableScanOperatorParam));
4,771,099✔
983
  QUERY_CHECK_NULL(pVScan, code, lino, _return, terrno)
4,771,099✔
984
  pVScan->pOpParamArray = taosArrayInit(1, POINTER_BYTES);
4,771,099✔
985
  QUERY_CHECK_NULL(pVScan->pOpParamArray, code, lino, _return, terrno)
4,771,099✔
986
  pVScan->uid = uid;
4,771,099✔
987
  pVScan->window = pInfo->vtbScan.window;
4,771,099✔
988

989
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN;
4,771,099✔
990
  (*ppRes)->downstreamIdx = 0;
4,771,099✔
991
  (*ppRes)->value = pVScan;
4,771,099✔
992
  (*ppRes)->reUse = false;
4,771,099✔
993

994
  return TSDB_CODE_SUCCESS;
4,771,099✔
UNCOV
995
_return:
×
996
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
997
  if (pVScan) {
×
998
    taosArrayDestroy(pVScan->pOpParamArray);
×
999
    taosMemoryFreeClear(pVScan);
×
1000
  }
UNCOV
1001
  if (*ppRes) {
×
1002
    taosArrayDestroy((*ppRes)->pChildren);
×
1003
    taosMemoryFreeClear(*ppRes);
×
1004
  }
UNCOV
1005
  return code;
×
1006
}
1007

1008
static int32_t buildExternalWindowOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins, int32_t idx) {
693,870✔
1009
  int32_t                       code = TSDB_CODE_SUCCESS;
693,870✔
1010
  int32_t                       lino = 0;
693,870✔
1011
  SExternalWindowOperatorParam* pExtWinOp = NULL;
693,870✔
1012

1013
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
693,870✔
1014
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
693,870✔
1015

1016
  pExtWinOp = taosMemoryMalloc(sizeof(SExternalWindowOperatorParam));
693,870✔
1017
  QUERY_CHECK_NULL(pExtWinOp, code, lino, _return, terrno)
693,870✔
1018

1019
  pExtWinOp->ExtWins = taosArrayDup(pWins, NULL);
693,870✔
1020
  QUERY_CHECK_NULL(pExtWinOp->ExtWins, code, lino, _return, terrno)
693,870✔
1021

1022
  SExtWinTimeWindow *firstWin = (SExtWinTimeWindow *)taosArrayGet(pWins, 0);
693,870✔
1023
  SExtWinTimeWindow *lastWin = (SExtWinTimeWindow *)taosArrayGet(pWins, taosArrayGetSize(pWins) - 1);
693,870✔
1024

1025
  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
693,870✔
1026
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)
693,870✔
1027

1028
  SOperatorParam* pExchangeOperator = NULL;
693,870✔
1029
  STimeWindow     twin = {.skey = firstWin->tw.skey, .ekey = lastWin->tw.ekey};
693,870✔
1030
  code = buildExchangeOperatorParamForExternalWindow(&pExchangeOperator, 0, twin);
693,870✔
1031
  QUERY_CHECK_CODE(code, lino, _return);
693,870✔
1032
  QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pExchangeOperator), code, lino, _return, terrno)
1,387,740✔
1033

1034
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW;
693,870✔
1035
  (*ppRes)->downstreamIdx = idx;
693,870✔
1036
  (*ppRes)->value = pExtWinOp;
693,870✔
1037
  (*ppRes)->reUse = false;
693,870✔
1038

1039
  return code;
693,870✔
UNCOV
1040
_return:
×
1041
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1042
  if (pExtWinOp) {
×
1043
    if (pExtWinOp->ExtWins) {
×
1044
      taosArrayDestroy(pExtWinOp->ExtWins);
×
1045
    }
UNCOV
1046
    taosMemoryFree(pExtWinOp);
×
1047
  }
UNCOV
1048
  if (*ppRes) {
×
1049
    if ((*ppRes)->pChildren) {
×
1050
      taosArrayDestroy((*ppRes)->pChildren);
×
1051
    }
UNCOV
1052
    taosMemoryFree(*ppRes);
×
1053
    *ppRes = NULL;
×
1054
  }
UNCOV
1055
  return code;
×
1056
}
1057

1058
static int32_t buildMergeOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins,
288,540✔
1059
                                       int32_t numOfDownstream, int32_t numOfWins) {
1060
  int32_t                   code = TSDB_CODE_SUCCESS;
288,540✔
1061
  int32_t                   lino = 0;
288,540✔
1062
  SMergeOperatorParam*      pMergeOp = NULL;
288,540✔
1063

1064
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
288,540✔
1065
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
288,540✔
1066

1067
  (*ppRes)->pChildren = taosArrayInit(numOfDownstream, POINTER_BYTES);
288,540✔
1068
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)
288,540✔
1069

1070
  pMergeOp = taosMemoryMalloc(sizeof(SMergeOperatorParam));
288,540✔
1071
  QUERY_CHECK_NULL(pMergeOp, code, lino, _return, terrno)
288,540✔
1072

1073
  pMergeOp->winNum = numOfWins;
288,540✔
1074

1075
  for (int32_t i = 0; i < numOfDownstream; i++) {
982,410✔
1076
    SOperatorParam* pExternalWinParam = NULL;
693,870✔
1077
    code = buildExternalWindowOperatorParam(pInfo, &pExternalWinParam, pWins, i);
693,870✔
1078
    QUERY_CHECK_CODE(code, lino, _return);
693,870✔
1079
    QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pExternalWinParam), code, lino, _return, terrno)
1,387,740✔
1080
  }
1081

1082
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE;
288,540✔
1083
  (*ppRes)->downstreamIdx = 0;
288,540✔
1084
  (*ppRes)->value = pMergeOp;
288,540✔
1085
  (*ppRes)->reUse = false;
288,540✔
1086

1087
  return TSDB_CODE_SUCCESS;
288,540✔
UNCOV
1088
_return:
×
1089
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1090
  if (pMergeOp) {
×
1091
    taosMemoryFree(pMergeOp);
×
1092
  }
UNCOV
1093
  if (*ppRes) {
×
1094
    if ((*ppRes)->pChildren) {
×
1095
      for (int32_t i = 0; i < taosArrayGetSize((*ppRes)->pChildren); i++) {
×
1096
        SOperatorParam* pChildParam = (SOperatorParam*)taosArrayGet((*ppRes)->pChildren, i);
×
1097
        if (pChildParam) {
×
1098
          SExternalWindowOperatorParam* pExtWinOp = (SExternalWindowOperatorParam*)pChildParam->value;
×
1099
          if (pExtWinOp) {
×
1100
            if (pExtWinOp->ExtWins) {
×
1101
              taosArrayDestroy(pExtWinOp->ExtWins);
×
1102
            }
UNCOV
1103
            taosMemoryFree(pExtWinOp);
×
1104
          }
UNCOV
1105
          taosMemoryFree(pChildParam);
×
1106
        }
1107
      }
UNCOV
1108
      taosArrayDestroy((*ppRes)->pChildren);
×
1109
    }
UNCOV
1110
    taosMemoryFree(*ppRes);
×
1111
    *ppRes = NULL;
×
1112
  }
UNCOV
1113
  return code;
×
1114
}
1115

1116
static int32_t buildAggOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes) {
93,110✔
1117
  int32_t                   code = TSDB_CODE_SUCCESS;
93,110✔
1118
  int32_t                   lino = 0;
93,110✔
1119
  SOperatorParam*           pParam = NULL;
93,110✔
1120
  SOperatorParam*           pExchangeParam = NULL;
93,110✔
1121
  SVtbScanDynCtrlInfo*      pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
93,110✔
1122
  bool                      freeExchange = false;
93,110✔
1123

1124
  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
93,110✔
1125
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)
93,110✔
1126

1127
  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
93,110✔
1128
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno)
93,110✔
1129

1130
  pParam->value = taosMemoryMalloc(sizeof(SAggOperatorParam));
93,110✔
1131
  QUERY_CHECK_NULL(pParam->value, code, lino, _return, terrno)
93,110✔
1132

1133
  code = buildBatchExchangeOperatorParamForVirtual(&pExchangeParam, 0, NULL, 0, pVtbScan->otbVgIdToOtbInfoArrayMap, (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN}, EX_SRC_TYPE_VSTB_AGG_SCAN);
93,110✔
1134
  QUERY_CHECK_CODE(code, lino, _return);
93,110✔
1135

1136
  freeExchange = true;
93,110✔
1137

1138
  QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pExchangeParam), code, lino, _return, terrno)
186,220✔
1139

1140
  freeExchange = false;
93,110✔
1141

1142
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
93,110✔
1143
  pParam->downstreamIdx = 0;
93,110✔
1144
  pParam->reUse = false;
93,110✔
1145

1146
  *ppRes = pParam;
93,110✔
1147

1148
  return code;
93,110✔
UNCOV
1149
_return:
×
1150
  if (freeExchange) {
×
1151
    freeOperatorParam(pExchangeParam, OP_GET_PARAM);
×
1152
  }
UNCOV
1153
  if (pParam) {
×
1154
    freeOperatorParam(pParam, OP_GET_PARAM);
×
1155
  }
UNCOV
1156
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1157
  return code;
×
1158
}
1159

1160
static int32_t buildAggOperatorParamWithGroupId(SDynQueryCtrlOperatorInfo* pInfo, uint64_t groupid, SOperatorParam** ppRes) {
2,789,595✔
1161
  int32_t                   code = TSDB_CODE_SUCCESS;
2,789,595✔
1162
  int32_t                   lino = 0;
2,789,595✔
1163
  SVtbScanDynCtrlInfo*      pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
2,789,595✔
1164
  SOperatorParam*           pParam = NULL;
2,789,595✔
1165
  SOperatorParam*           pExchangeParam = NULL;
2,789,595✔
1166
  SHashObj*                 otbVgIdToOtbInfoArrayMap = NULL;
2,789,595✔
1167
  bool                      freeExchange = false;
2,789,595✔
1168
  void*                     pIter = taosHashGet(pVtbScan->vtbGroupIdToVgIdMapMap, &groupid, sizeof(groupid));
2,789,595✔
1169

1170
  if (!pIter) {
2,789,595✔
1171
    *ppRes = NULL;
483,545✔
1172
    return code;
483,545✔
1173
  }
1174

1175
  otbVgIdToOtbInfoArrayMap = *(SHashObj**)pIter;
2,306,050✔
1176

1177
  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
2,306,050✔
1178
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)
2,306,050✔
1179

1180
  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
2,306,050✔
1181
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno)
2,306,050✔
1182

1183
  code = buildBatchExchangeOperatorParamForVirtual(&pExchangeParam, 0, NULL, groupid, otbVgIdToOtbInfoArrayMap, (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN}, EX_SRC_TYPE_VSTB_AGG_SCAN);
2,306,050✔
1184
  QUERY_CHECK_CODE(code, lino, _return);
2,306,050✔
1185

1186
  freeExchange = true;
2,306,050✔
1187

1188
  QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pExchangeParam), code, lino, _return, terrno)
4,612,100✔
1189

1190
  freeExchange = false;
2,306,050✔
1191

1192
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
2,306,050✔
1193
  pParam->downstreamIdx = 0;
2,306,050✔
1194
  pParam->value = NULL;
2,306,050✔
1195
  pParam->reUse = false;
2,306,050✔
1196

1197
  *ppRes = pParam;
2,306,050✔
1198

1199
  return code;
2,306,050✔
UNCOV
1200
_return:
×
1201
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1202
  if (freeExchange) {
×
1203
    freeOperatorParam(pExchangeParam, OP_GET_PARAM);
×
1204
  }
UNCOV
1205
  if (pParam) {
×
1206
    freeOperatorParam(pParam, OP_GET_PARAM);
×
1207
  }
UNCOV
1208
  return code;
×
1209
}
1210

1211
static int32_t buildAggOperatorParamForSingleChild(SDynQueryCtrlOperatorInfo* pInfo, tb_uid_t uid, uint64_t groupid, SArray* pTagList, SOperatorParam** ppRes) {
2,602,192✔
1212
  int32_t                   code = TSDB_CODE_SUCCESS;
2,602,192✔
1213
  int32_t                   lino = 0;
2,602,192✔
1214
  SVtbScanDynCtrlInfo*      pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
2,602,192✔
1215
  SOperatorParam*           pParam = NULL;
2,602,192✔
1216
  SHashObj*                 pOtbVgIdToOtbInfoArrayMap = NULL;
2,602,192✔
1217
  void*                     pIter = taosHashGet(pVtbScan->vtbUidToVgIdMapMap, &uid, sizeof(uid));
2,602,192✔
1218

1219
  if (pIter) {
2,602,192✔
1220
    pOtbVgIdToOtbInfoArrayMap = *(SHashObj**)taosHashGet(pVtbScan->vtbUidToVgIdMapMap, &uid, sizeof(uid));
1,964,112✔
1221

1222
    code = buildBatchExchangeOperatorParamForVirtual(&pParam, 0, pTagList, groupid, pOtbVgIdToOtbInfoArrayMap, (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN}, EX_SRC_TYPE_VSTB_AGG_SCAN);
1,964,112✔
1223
    QUERY_CHECK_CODE(code, lino, _return);
1,964,112✔
1224

1225
    *ppRes = pParam;
1,964,112✔
1226
  } else {
1227
    *ppRes = NULL;
638,080✔
1228
  }
1229

1230
  return code;
2,602,192✔
UNCOV
1231
_return:
×
1232
  if (pParam) {
×
1233
    freeOperatorParam(pParam, OP_GET_PARAM);
×
1234
  }
UNCOV
1235
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1236
  return code;
×
1237
}
1238

1239
static void seqJoinLaunchNewRetrieveImpl(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
3,640,643✔
1240
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
3,640,643✔
1241
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
3,640,643✔
1242
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
3,640,643✔
1243
  SStbJoinPostJoinCtx*       pPost = &pStbJoin->ctx.post;
3,640,643✔
1244
  SOperatorParam*            pParam = NULL;
3,640,643✔
1245
  int32_t                    code  = buildSeqStbJoinOperatorParam(pInfo, pPrev, pPost, &pParam);
3,640,643✔
1246
  if (TSDB_CODE_SUCCESS != code) {
3,640,643✔
UNCOV
1247
    pOperator->pTaskInfo->code = code;
×
1248
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
×
1249
  }
1250

1251
  qDebug("%s dynamic post task begin", GET_TASKID(pOperator->pTaskInfo));
3,640,643✔
1252
  code = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam, ppRes);
3,640,643✔
1253
  if (*ppRes && (code == 0)) {
3,640,643✔
1254
    code = blockDataCheck(*ppRes);
209,890✔
1255
    if (code) {
209,890✔
UNCOV
1256
      qError("Invalid block data, blockDataCheck failed, error:%s", tstrerror(code));
×
1257
      pOperator->pTaskInfo->code = code;
×
1258
      T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
×
1259
    }
1260
    pPost->isStarted = true;
209,890✔
1261
    pStbJoin->execInfo.postBlkNum++;
209,890✔
1262
    pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows;
209,890✔
1263
    qDebug("%s join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
209,890✔
1264
  } else {
1265
    qDebug("%s Empty join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
3,430,753✔
1266
  }
1267
}
3,640,643✔
1268

1269

UNCOV
1270
static int32_t notifySeqJoinTableCacheEnd(SOperatorInfo* pOperator, SStbJoinPostJoinCtx* pPost, bool leftTable) {
×
1271
  SOperatorParam* pGcParam = NULL;
×
1272
  SOperatorParam* pMergeJoinParam = NULL;
×
1273
  int32_t         downstreamId = leftTable ? 0 : 1;
×
1274
  int32_t         vgId = leftTable ? pPost->leftVgId : pPost->rightVgId;
×
1275
  int64_t         uid = leftTable ? pPost->leftCurrUid : pPost->rightCurrUid;
×
1276

UNCOV
1277
  qDebug("notify table %" PRIu64 " in vgId %d downstreamId %d cache end", uid, vgId, downstreamId);
×
1278

UNCOV
1279
  int32_t code = buildGroupCacheNotifyOperatorParam(&pGcParam, downstreamId, vgId, uid);
×
1280
  if (TSDB_CODE_SUCCESS != code) {
×
1281
    return code;
×
1282
  }
UNCOV
1283
  code = buildMergeJoinNotifyOperatorParam(&pMergeJoinParam, pGcParam, NULL);
×
1284
  if (TSDB_CODE_SUCCESS != code) {
×
1285
    return code;
×
1286
  }
1287

UNCOV
1288
  return optrDefaultNotifyFn(pOperator->pDownstream[1], pMergeJoinParam);
×
1289
}
1290

1291
static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCtrlInfo*          pStbJoin) {
3,640,151✔
1292
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
3,640,151✔
1293
  int32_t code = 0;
3,640,151✔
1294
  
1295
  pPost->isStarted = false;
3,640,151✔
1296
  
1297
  if (pStbJoin->basic.batchFetch) {
3,640,151✔
1298
    return TSDB_CODE_SUCCESS;
3,637,943✔
1299
  }
1300
  
1301
  if (pPost->leftNeedCache) {
2,208✔
UNCOV
1302
    uint32_t* num = tSimpleHashGet(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
×
1303
    if (num && --(*num) <= 0) {
×
1304
      code = tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
×
1305
      if (code) {
×
1306
        qError("tSimpleHashRemove leftCurrUid %" PRId64 " from leftCache failed, error:%s", pPost->leftCurrUid, tstrerror(code));
×
1307
        QRY_ERR_RET(code);
×
1308
      }
UNCOV
1309
      QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, true));
×
1310
    }
1311
  }
1312
  
1313
  if (!pPost->rightNeedCache) {
2,208✔
1314
    void* v = tSimpleHashGet(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
2,208✔
1315
    if (NULL != v) {
2,208✔
UNCOV
1316
      code = tSimpleHashRemove(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
×
1317
      if (code) {
×
1318
        qError("tSimpleHashRemove rightCurrUid %" PRId64 " from rightCache failed, error:%s", pPost->rightCurrUid, tstrerror(code));
×
1319
        QRY_ERR_RET(code);
×
1320
      }
UNCOV
1321
      QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, false));
×
1322
    }
1323
  }
1324

1325
  return TSDB_CODE_SUCCESS;
2,208✔
1326
}
1327

1328

1329
static FORCE_INLINE int32_t seqJoinContinueCurrRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
1330
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
286,856✔
1331
  SStbJoinPostJoinCtx*       pPost = &pInfo->stbJoin.ctx.post;
286,856✔
1332
  SStbJoinPrevJoinCtx*       pPrev = &pInfo->stbJoin.ctx.prev;
286,856✔
1333

1334
  if (!pPost->isStarted) {
286,856✔
1335
    return TSDB_CODE_SUCCESS;
77,458✔
1336
  }
1337
  
1338
  qDebug("%s dynQueryCtrl continue to retrieve block from post op", GET_TASKID(pOperator->pTaskInfo));
209,398✔
1339
  
1340
  *ppRes = getNextBlockFromDownstream(pOperator, 1);
209,398✔
1341
  if (NULL == *ppRes) {
209,398✔
1342
    QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, &pInfo->stbJoin));
209,398✔
1343
    pPrev->pListHead->readIdx++;
209,398✔
1344
  } else {
UNCOV
1345
    pInfo->stbJoin.execInfo.postBlkNum++;
×
1346
    pInfo->stbJoin.execInfo.postBlkRows += (*ppRes)->info.rows;
×
1347
  }
1348

1349
  return TSDB_CODE_SUCCESS;
209,398✔
1350
}
1351

1352
static FORCE_INLINE int32_t addToJoinVgroupHash(SSHashObj* pHash, void* pKey, int32_t keySize, void* pVal, int32_t valSize) {
1353
  SArray** ppArray = tSimpleHashGet(pHash, pKey, keySize);
7,280,806✔
1354
  if (NULL == ppArray) {
7,280,806✔
1355
    SArray* pArray = taosArrayInit(10, valSize);
253,164✔
1356
    if (NULL == pArray) {
253,164✔
UNCOV
1357
      return terrno;
×
1358
    }
1359
    if (NULL == taosArrayPush(pArray, pVal)) {
506,328✔
UNCOV
1360
      taosArrayDestroy(pArray);
×
1361
      return terrno;
×
1362
    }
1363
    if (tSimpleHashPut(pHash, pKey, keySize, &pArray, POINTER_BYTES)) {
253,164✔
UNCOV
1364
      taosArrayDestroy(pArray);      
×
1365
      return terrno;
×
1366
    }
1367
    return TSDB_CODE_SUCCESS;
253,164✔
1368
  }
1369

1370
  if (NULL == taosArrayPush(*ppArray, pVal)) {
14,055,284✔
UNCOV
1371
    return terrno;
×
1372
  }
1373
  
1374
  return TSDB_CODE_SUCCESS;
7,027,642✔
1375
}
1376

1377
static FORCE_INLINE int32_t addToJoinTableHash(SSHashObj* pHash, SSHashObj* pOnceHash, void* pKey, int32_t keySize) {
1378
  int32_t code = TSDB_CODE_SUCCESS;
2,208✔
1379
  uint32_t* pNum = tSimpleHashGet(pHash, pKey, keySize);
2,208✔
1380
  if (NULL == pNum) {
2,208✔
1381
    uint32_t n = 1;
2,208✔
1382
    code = tSimpleHashPut(pHash, pKey, keySize, &n, sizeof(n));
2,208✔
1383
    if (code) {
2,208✔
UNCOV
1384
      return code;
×
1385
    }
1386
    code = tSimpleHashPut(pOnceHash, pKey, keySize, NULL, 0);
2,208✔
1387
    if (code) {
2,208✔
UNCOV
1388
      return code;
×
1389
    }
1390
    return TSDB_CODE_SUCCESS;
2,208✔
1391
  }
1392

UNCOV
1393
  switch (*pNum) {
×
1394
    case 0:
×
1395
      break;
×
1396
    case UINT32_MAX:
×
1397
      *pNum = 0;
×
1398
      break;
×
1399
    default:
×
1400
      if (1 == (*pNum)) {
×
1401
        code = tSimpleHashRemove(pOnceHash, pKey, keySize);
×
1402
        if (code) {
×
1403
          qError("tSimpleHashRemove failed in addToJoinTableHash, error:%s", tstrerror(code));
×
1404
          QRY_ERR_RET(code);
×
1405
        }
1406
      }
UNCOV
1407
      (*pNum)++;
×
1408
      break;
×
1409
  }
1410
  
UNCOV
1411
  return TSDB_CODE_SUCCESS;
×
1412
}
1413

1414

1415
static void freeStbJoinTableList(SStbJoinTableList* pList) {
76,966✔
1416
  if (NULL == pList) {
76,966✔
UNCOV
1417
    return;
×
1418
  }
1419
  taosMemoryFree(pList->pLeftVg);
76,966✔
1420
  taosMemoryFree(pList->pLeftUid);
76,966✔
1421
  taosMemoryFree(pList->pRightVg);
76,966✔
1422
  taosMemoryFree(pList->pRightUid);
76,966✔
1423
  taosMemoryFree(pList);
76,966✔
1424
}
1425

1426
static int32_t appendStbJoinTableList(SStbJoinPrevJoinCtx* pCtx, int64_t rows, int32_t* pLeftVg, int64_t* pLeftUid, int32_t* pRightVg, int64_t* pRightUid) {
77,458✔
1427
  int32_t code = TSDB_CODE_SUCCESS;
77,458✔
1428
  SStbJoinTableList* pNew = taosMemoryCalloc(1, sizeof(SStbJoinTableList));
77,458✔
1429
  if (NULL == pNew) {
77,458✔
UNCOV
1430
    return terrno;
×
1431
  }
1432
  pNew->pLeftVg = taosMemoryMalloc(rows * sizeof(*pLeftVg));
77,458✔
1433
  if (NULL == pNew->pLeftVg) {
77,458✔
UNCOV
1434
    code = terrno;
×
1435
    freeStbJoinTableList(pNew);
×
1436
    return code;
×
1437
  }
1438
  pNew->pLeftUid = taosMemoryMalloc(rows * sizeof(*pLeftUid));
77,458✔
1439
  if (NULL == pNew->pLeftUid) {
77,458✔
UNCOV
1440
    code = terrno;
×
1441
    freeStbJoinTableList(pNew);
×
1442
    return code;
×
1443
  }
1444
  pNew->pRightVg = taosMemoryMalloc(rows * sizeof(*pRightVg));
77,458✔
1445
  if (NULL == pNew->pRightVg) {
77,458✔
UNCOV
1446
    code = terrno;
×
1447
    freeStbJoinTableList(pNew);
×
1448
    return code;
×
1449
  }
1450
  pNew->pRightUid = taosMemoryMalloc(rows * sizeof(*pRightUid));
77,458✔
1451
  if (NULL == pNew->pRightUid) {
77,458✔
UNCOV
1452
    code = terrno;
×
1453
    freeStbJoinTableList(pNew);
×
1454
    return code;
×
1455
  }
1456

1457
  TAOS_MEMCPY(pNew->pLeftVg, pLeftVg, rows * sizeof(*pLeftVg));
77,458✔
1458
  TAOS_MEMCPY(pNew->pLeftUid, pLeftUid, rows * sizeof(*pLeftUid));
77,458✔
1459
  TAOS_MEMCPY(pNew->pRightVg, pRightVg, rows * sizeof(*pRightVg));
77,458✔
1460
  TAOS_MEMCPY(pNew->pRightUid, pRightUid, rows * sizeof(*pRightUid));
77,458✔
1461

1462
  pNew->readIdx = 0;
77,458✔
1463
  pNew->uidNum = rows;
77,458✔
1464
  pNew->pNext = NULL;
77,458✔
1465
  
1466
  if (pCtx->pListTail) {
77,458✔
UNCOV
1467
    pCtx->pListTail->pNext = pNew;
×
1468
    pCtx->pListTail = pNew;
×
1469
  } else {
1470
    pCtx->pListHead = pNew;
77,458✔
1471
    pCtx->pListTail= pNew;
77,458✔
1472
  }
1473

1474
  return TSDB_CODE_SUCCESS;
77,458✔
1475
}
1476

1477
static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
77,458✔
1478
  int32_t                    code = TSDB_CODE_SUCCESS;
77,458✔
1479
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
77,458✔
1480
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
77,458✔
1481
  SColumnInfoData*           pVg0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[0]);
77,458✔
1482
  if (NULL == pVg0) {
77,458✔
UNCOV
1483
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1484
  }
1485
  SColumnInfoData*           pVg1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[1]);
77,458✔
1486
  if (NULL == pVg1) {
77,458✔
UNCOV
1487
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1488
  }
1489
  SColumnInfoData*           pUid0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[0]);
77,458✔
1490
  if (NULL == pUid0) {
77,458✔
UNCOV
1491
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1492
  }
1493
  SColumnInfoData*           pUid1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[1]);
77,458✔
1494
  if (NULL == pUid1) {
77,458✔
UNCOV
1495
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1496
  }
1497

1498
  if (pStbJoin->basic.batchFetch) {
77,458✔
1499
    for (int32_t i = 0; i < pBlock->info.rows; ++i) {
3,716,757✔
1500
      int32_t* leftVg = (int32_t*)(pVg0->pData + pVg0->info.bytes * i);
3,640,403✔
1501
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i);
3,640,403✔
1502
      int32_t* rightVg = (int32_t*)(pVg1->pData + pVg1->info.bytes * i);
3,640,403✔
1503
      int64_t* rightUid = (int64_t*)(pUid1->pData + pUid1->info.bytes * i);
3,640,403✔
1504

1505
      code = addToJoinVgroupHash(pStbJoin->ctx.prev.leftHash, leftVg, sizeof(*leftVg), leftUid, sizeof(*leftUid));
3,640,403✔
1506
      if (TSDB_CODE_SUCCESS != code) {
3,640,403✔
UNCOV
1507
        break;
×
1508
      }
1509
      code = addToJoinVgroupHash(pStbJoin->ctx.prev.rightHash, rightVg, sizeof(*rightVg), rightUid, sizeof(*rightUid));
3,640,403✔
1510
      if (TSDB_CODE_SUCCESS != code) {
3,640,403✔
UNCOV
1511
        break;
×
1512
      }
1513
    }
1514
  } else {
1515
    for (int32_t i = 0; i < pBlock->info.rows; ++i) {
3,312✔
1516
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i);
2,208✔
1517
    
1518
      code = addToJoinTableHash(pStbJoin->ctx.prev.leftCache, pStbJoin->ctx.prev.onceTable, leftUid, sizeof(*leftUid));
2,208✔
1519
      if (TSDB_CODE_SUCCESS != code) {
2,208✔
UNCOV
1520
        break;
×
1521
      }
1522
    }
1523
  }
1524

1525
  if (TSDB_CODE_SUCCESS == code) {
77,458✔
1526
    code = appendStbJoinTableList(&pStbJoin->ctx.prev, pBlock->info.rows, (int32_t*)pVg0->pData, (int64_t*)pUid0->pData, (int32_t*)pVg1->pData, (int64_t*)pUid1->pData);
77,458✔
1527
    if (TSDB_CODE_SUCCESS == code) {
77,458✔
1528
      pStbJoin->ctx.prev.tableNum += pBlock->info.rows;
77,458✔
1529
    }
1530
  }
1531

UNCOV
1532
_return:
×
1533

1534
  if (TSDB_CODE_SUCCESS != code) {
77,458✔
UNCOV
1535
    pOperator->pTaskInfo->code = code;
×
1536
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
×
1537
  }
1538
}
77,458✔
1539

1540

1541
static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) {
1,135,456✔
1542
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
1,135,456✔
1543
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
1,135,456✔
1544

1545
  if (pStbJoin->basic.batchFetch) {
1,135,456✔
1546
    return;
1,134,352✔
1547
  }
1548

1549
  if (tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache) == tSimpleHashGetSize(pStbJoin->ctx.prev.onceTable)) {
1,104✔
1550
    tSimpleHashClear(pStbJoin->ctx.prev.leftCache);
1,104✔
1551
    return;
1,104✔
1552
  }
1553

UNCOV
1554
  uint64_t* pUid = NULL;
×
1555
  int32_t iter = 0;
×
1556
  int32_t code = 0;
×
1557
  while (NULL != (pUid = tSimpleHashIterate(pStbJoin->ctx.prev.onceTable, pUid, &iter))) {
×
1558
    code = tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, pUid, sizeof(*pUid));
×
1559
    if (code) {
×
1560
      qError("tSimpleHashRemove failed in postProcessStbJoinTableHash, error:%s", tstrerror(code));
×
1561
    }
1562
  }
1563

UNCOV
1564
  pStbJoin->execInfo.leftCacheNum = tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache);
×
1565
  qDebug("more than 1 ref build table num %" PRId64, (int64_t)tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache));
×
1566

1567
/*
1568
  // debug only
1569
  iter = 0;
1570
  uint32_t* num = NULL;
1571
  while (NULL != (num = tSimpleHashIterate(pStbJoin->ctx.prev.leftCache, num, &iter))) {
1572
    A S S E R T(*num > 1);
1573
  }
1574
*/  
1575
}
1576

1577
static void buildStbJoinTableList(SOperatorInfo* pOperator) {
1,135,456✔
1578
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
1,135,456✔
1579
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
1,135,456✔
1580

1581
  while (true) {
77,458✔
1582
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
1,212,914✔
1583
    if (NULL == pBlock) {
1,212,914✔
1584
      break;
1,135,456✔
1585
    }
1586

1587
    pStbJoin->execInfo.prevBlkNum++;
77,458✔
1588
    pStbJoin->execInfo.prevBlkRows += pBlock->info.rows;
77,458✔
1589
    
1590
    doBuildStbJoinTableHash(pOperator, pBlock);
77,458✔
1591
  }
1592

1593
  postProcessStbJoinTableHash(pOperator);
1,135,456✔
1594

1595
  pStbJoin->ctx.prev.joinBuild = true;
1,135,456✔
1596
}
1,135,456✔
1597

1598
static int32_t seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
286,856✔
1599
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
286,856✔
1600
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
286,856✔
1601
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
286,856✔
1602
  SStbJoinTableList*         pNode = pPrev->pListHead;
286,856✔
1603

1604
  while (pNode) {
3,794,575✔
1605
    if (pNode->readIdx >= pNode->uidNum) {
3,717,609✔
1606
      pPrev->pListHead = pNode->pNext;
76,966✔
1607
      freeStbJoinTableList(pNode);
76,966✔
1608
      pNode = pPrev->pListHead;
76,966✔
1609
      continue;
76,966✔
1610
    }
1611
    
1612
    seqJoinLaunchNewRetrieveImpl(pOperator, ppRes);
3,640,643✔
1613
    if (*ppRes) {
3,640,643✔
1614
      return TSDB_CODE_SUCCESS;
209,890✔
1615
    }
1616

1617
    QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, pStbJoin));
3,430,753✔
1618
    pPrev->pListHead->readIdx++;
3,430,753✔
1619
  }
1620

1621
  *ppRes = NULL;
76,966✔
1622
  setOperatorCompleted(pOperator);
76,966✔
1623

1624
  return TSDB_CODE_SUCCESS;
76,966✔
1625
}
1626

1627
static int32_t seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) {
1,344,854✔
1628
  if (pBlock) {
1,344,854✔
1629
    if (pStbJoin && pStbJoin->pOutputDataBlockDesc) {
209,890✔
1630
      pBlock->info.id.blockId = pStbJoin->pOutputDataBlockDesc->dataBlockId;
209,890✔
1631
      if (!pBlock->pDataBlock) return TSDB_CODE_SUCCESS;
209,890✔
1632

1633
      for (int i = (int)pBlock->pDataBlock->size; i < pStbJoin->pOutputDataBlockDesc->pSlots->length; i++) {
212,098✔
1634
        SSlotDescNode* pSlot = (SSlotDescNode*)nodesListGetNode(pStbJoin->pOutputDataBlockDesc->pSlots, i);
2,208✔
1635
        if (pSlot == NULL) {
2,208✔
UNCOV
1636
          qError("seqStableJoinComposeRes: pSlot is NULL, i:%d", i);
×
1637
          return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1638
        }
1639
        SColumnInfoData colInfo = createColumnInfoData(pSlot->dataType.type, pSlot->dataType.bytes, pSlot->slotId);
2,208✔
1640
        int32_t code = colInfoDataEnsureCapacity(&colInfo, pBlock->info.rows, true);
2,208✔
1641
        if (code != TSDB_CODE_SUCCESS) {
2,208✔
UNCOV
1642
          return code;
×
1643
        }
1644
        code = blockDataAppendColInfo(pBlock, &colInfo);
2,208✔
1645
        if (code != TSDB_CODE_SUCCESS) {
2,208✔
UNCOV
1646
          return code;
×
1647
        }
1648
      }
1649
    } else {
UNCOV
1650
      qError("seqStableJoinComposeRes: pBlock or pStbJoin is NULL");
×
1651
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1652
    }
1653
  }
1654
  return TSDB_CODE_SUCCESS;
1,344,854✔
1655
}
1656

1657
int32_t seqStableJoin(SOperatorInfo* pOperator, SSDataBlock** pRes) {
1,370,026✔
1658
  int32_t                    code = TSDB_CODE_SUCCESS;
1,370,026✔
1659
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
1,370,026✔
1660
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
1,370,026✔
1661

1662
  QRY_PARAM_CHECK(pRes);
1,370,026✔
1663
  if (pOperator->status == OP_EXEC_DONE) {
1,370,026✔
1664
    return code;
25,172✔
1665
  }
1666

1667
  int64_t st = 0;
1,344,854✔
1668
  if (pOperator->cost.openCost == 0) {
1,344,854✔
1669
    st = taosGetTimestampUs();
1,134,958✔
1670
  }
1671

1672
  if (!pStbJoin->ctx.prev.joinBuild) {
1,344,854✔
1673
    buildStbJoinTableList(pOperator);
1,135,456✔
1674
    if (pStbJoin->execInfo.prevBlkRows <= 0) {
1,135,456✔
1675
      setOperatorCompleted(pOperator);
1,057,998✔
1676
      goto _return;
1,057,998✔
1677
    }
1678
  }
1679

1680
  QRY_ERR_JRET(seqJoinContinueCurrRetrieve(pOperator, pRes));
286,856✔
1681
  if (*pRes) {
286,856✔
UNCOV
1682
    goto _return;
×
1683
  }
1684

1685
  QRY_ERR_JRET(seqJoinLaunchNewRetrieve(pOperator, pRes));
286,856✔
1686

1687
_return:
286,856✔
1688
  if (pOperator->cost.openCost == 0) {
1,344,854✔
1689
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
1,134,958✔
1690
  }
1691

1692
  if (code) {
1,344,854✔
UNCOV
1693
    qError("%s failed since %s", __func__, tstrerror(code));
×
1694
    pOperator->pTaskInfo->code = code;
×
1695
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
1696
  } else {
1697
    code = seqStableJoinComposeRes(pStbJoin, *pRes);
1,344,854✔
1698
  }
1699
  return code;
1,344,854✔
1700
}
1701

1702
int32_t dynProcessUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
2,504,392✔
1703
  int32_t                    lino = 0;
2,504,392✔
1704
  SOperatorInfo*             operator=(SOperatorInfo*) param;
2,504,392✔
1705
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)operator->info;
2,504,392✔
1706

1707
  if (TSDB_CODE_SUCCESS != code) {
2,504,392✔
UNCOV
1708
    operator->pTaskInfo->code = rpcCvtErrCode(code);
×
1709
    if (operator->pTaskInfo->code != code) {
×
1710
      qError("load systable rsp received, error:%s, cvted error:%s", tstrerror(code),
×
1711
             tstrerror(operator->pTaskInfo->code));
1712
    } else {
UNCOV
1713
      qError("load systable rsp received, error:%s", tstrerror(code));
×
1714
    }
UNCOV
1715
    goto _return;
×
1716
  }
1717

1718
  pScanResInfo->vtbScan.pRsp = taosMemoryMalloc(sizeof(SUseDbRsp));
2,504,392✔
1719
  QUERY_CHECK_NULL(pScanResInfo->vtbScan.pRsp, code, lino, _return, terrno)
2,504,392✔
1720

1721
  code = tDeserializeSUseDbRsp(pMsg->pData, (int32_t)pMsg->len, pScanResInfo->vtbScan.pRsp);
2,504,392✔
1722
  QUERY_CHECK_CODE(code, lino, _return);
2,504,392✔
1723

1724
  taosMemoryFreeClear(pMsg->pData);
2,504,392✔
1725

1726
  code = tsem_post(&pScanResInfo->vtbScan.ready);
2,504,392✔
1727
  QUERY_CHECK_CODE(code, lino, _return);
2,504,392✔
1728

1729
  return code;
2,504,392✔
UNCOV
1730
_return:
×
1731
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1732
  return code;
×
1733
}
1734

1735
static int32_t buildDbVgInfoMap(SOperatorInfo* pOperator, SMsgCb* pMsgCb, SName* name, SExecTaskInfo* pTaskInfo, SUseDbOutput* output) {
2,504,392✔
1736
  int32_t                    code = TSDB_CODE_SUCCESS;
2,504,392✔
1737
  int32_t                    lino = 0;
2,504,392✔
1738
  char*                      buf1 = NULL;
2,504,392✔
1739
  SUseDbReq*                 pReq = NULL;
2,504,392✔
1740
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)pOperator->info;
2,504,392✔
1741

1742
  pReq = taosMemoryMalloc(sizeof(SUseDbReq));
2,504,392✔
1743
  QUERY_CHECK_NULL(pReq, code, lino, _return, terrno)
2,504,392✔
1744
  code = tNameGetFullDbName(name, pReq->db);
2,504,392✔
1745
  QUERY_CHECK_CODE(code, lino, _return);
2,504,392✔
1746
  int32_t contLen = tSerializeSUseDbReq(NULL, 0, pReq);
2,504,392✔
1747
  buf1 = taosMemoryCalloc(1, contLen);
2,504,392✔
1748
  QUERY_CHECK_NULL(buf1, code, lino, _return, terrno)
2,504,392✔
1749
  int32_t tempRes = tSerializeSUseDbReq(buf1, contLen, pReq);
2,504,392✔
1750
  if (tempRes < 0) {
2,504,392✔
UNCOV
1751
    QUERY_CHECK_CODE(terrno, lino, _return);
×
1752
  }
1753

1754
  // send the fetch remote task result request
1755
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
2,504,392✔
1756
  QUERY_CHECK_NULL(pMsgSendInfo, code, lino, _return, terrno)
2,504,392✔
1757

1758
  pMsgSendInfo->param = pOperator;
2,504,392✔
1759
  pMsgSendInfo->msgInfo.pData = buf1;
2,504,392✔
1760
  pMsgSendInfo->msgInfo.len = contLen;
2,504,392✔
1761
  pMsgSendInfo->msgType = TDMT_MND_GET_DB_INFO;
2,504,392✔
1762
  pMsgSendInfo->fp = dynProcessUseDbRsp;
2,504,392✔
1763
  pMsgSendInfo->requestId = pTaskInfo->id.queryId;
2,504,392✔
1764

1765
  code = asyncSendMsgToServer(pMsgCb->clientRpc, &pScanResInfo->vtbScan.epSet, NULL, pMsgSendInfo);
2,504,392✔
1766
  QUERY_CHECK_CODE(code, lino, _return);
2,504,392✔
1767

1768
  code = tsem_wait(&pScanResInfo->vtbScan.ready);
2,504,392✔
1769
  QUERY_CHECK_CODE(code, lino, _return);
2,504,392✔
1770

1771
  code = queryBuildUseDbOutput(output, pScanResInfo->vtbScan.pRsp);
2,504,392✔
1772
  QUERY_CHECK_CODE(code, lino, _return);
2,504,392✔
1773

1774
_return:
2,504,392✔
1775
  if (code) {
2,504,392✔
UNCOV
1776
     qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1777
     taosMemoryFree(buf1);
×
1778
  }
1779
  taosMemoryFree(pReq);
2,504,392✔
1780
  tFreeSUsedbRsp(pScanResInfo->vtbScan.pRsp);
2,504,392✔
1781
  taosMemoryFreeClear(pScanResInfo->vtbScan.pRsp);
2,504,392✔
1782
  return code;
2,504,392✔
1783
}
1784

1785
int dynVgInfoComp(const void* lp, const void* rp) {
4,999,474✔
1786
  SVgroupInfo* pLeft = (SVgroupInfo*)lp;
4,999,474✔
1787
  SVgroupInfo* pRight = (SVgroupInfo*)rp;
4,999,474✔
1788
  if (pLeft->hashBegin < pRight->hashBegin) {
4,999,474✔
1789
    return -1;
4,999,474✔
UNCOV
1790
  } else if (pLeft->hashBegin > pRight->hashBegin) {
×
1791
    return 1;
×
1792
  }
1793

UNCOV
1794
  return 0;
×
1795
}
1796

1797
int32_t dynMakeVgArraySortBy(SDBVgInfo* dbInfo, __compar_fn_t sort_func) {
14,861,914✔
1798
  if (NULL == dbInfo) {
14,861,914✔
UNCOV
1799
    return TSDB_CODE_SUCCESS;
×
1800
  }
1801

1802
  if (dbInfo->vgHash && NULL == dbInfo->vgArray) {
14,861,914✔
1803
    int32_t vgSize = taosHashGetSize(dbInfo->vgHash);
2,504,392✔
1804
    dbInfo->vgArray = taosArrayInit(vgSize, sizeof(SVgroupInfo));
2,504,392✔
1805
    if (NULL == dbInfo->vgArray) {
2,504,392✔
UNCOV
1806
      return terrno;
×
1807
    }
1808

1809
    void* pIter = taosHashIterate(dbInfo->vgHash, NULL);
2,504,392✔
1810
    while (pIter) {
7,508,521✔
1811
      if (NULL == taosArrayPush(dbInfo->vgArray, pIter)) {
10,008,258✔
UNCOV
1812
        taosHashCancelIterate(dbInfo->vgHash, pIter);
×
1813
        return terrno;
×
1814
      }
1815

1816
      pIter = taosHashIterate(dbInfo->vgHash, pIter);
5,004,129✔
1817
    }
1818

1819
    taosArraySort(dbInfo->vgArray, sort_func);
2,504,392✔
1820
  }
1821

1822
  return TSDB_CODE_SUCCESS;
14,861,914✔
1823
}
1824

1825
int32_t dynHashValueComp(void const* lp, void const* rp) {
22,367,735✔
1826
  uint32_t*    key = (uint32_t*)lp;
22,367,735✔
1827
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
22,367,735✔
1828

1829
  if (*key < pVg->hashBegin) {
22,367,735✔
UNCOV
1830
    return -1;
×
1831
  } else if (*key > pVg->hashEnd) {
22,367,735✔
1832
    return 1;
7,505,821✔
1833
  }
1834

1835
  return 0;
14,861,914✔
1836
}
1837

1838
int32_t getVgId(SDBVgInfo* dbInfo, char* dbFName, int32_t* vgId, char *tbName) {
14,861,914✔
1839
  int32_t code = 0;
14,861,914✔
1840
  int32_t lino = 0;
14,861,914✔
1841
  code = dynMakeVgArraySortBy(dbInfo, dynVgInfoComp);
14,861,914✔
1842
  QUERY_CHECK_CODE(code, lino, _return);
14,861,914✔
1843

1844
  int32_t vgNum = (int32_t)taosArrayGetSize(dbInfo->vgArray);
14,861,914✔
1845
  if (vgNum <= 0) {
14,861,914✔
UNCOV
1846
    qError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum);
×
1847
    QUERY_CHECK_CODE(code = TSDB_CODE_TSC_DB_NOT_SELECTED, lino, _return);
×
1848
  }
1849

1850
  SVgroupInfo* vgInfo = NULL;
14,861,914✔
1851
  char         tbFullName[TSDB_TABLE_FNAME_LEN];
14,861,914✔
1852
  (void)snprintf(tbFullName, sizeof(tbFullName), "%s.", dbFName);
14,861,914✔
1853
  int32_t offset = (int32_t)strlen(tbFullName);
14,861,914✔
1854

1855
  (void)snprintf(tbFullName + offset, sizeof(tbFullName) - offset, "%s", tbName);
14,861,914✔
1856
  uint32_t hashValue = taosGetTbHashVal(tbFullName, (int32_t)strlen(tbFullName), dbInfo->hashMethod,
29,723,828✔
1857
                                        dbInfo->hashPrefix, dbInfo->hashSuffix);
14,861,914✔
1858

1859
  vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, dynHashValueComp, TD_EQ);
14,861,914✔
1860
  if (NULL == vgInfo) {
14,861,914✔
UNCOV
1861
    qError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName,
×
1862
           (int32_t)taosArrayGetSize(dbInfo->vgArray));
UNCOV
1863
    return TSDB_CODE_CTG_INTERNAL_ERROR;
×
1864
  }
1865

1866
  *vgId = vgInfo->vgId;
14,861,914✔
1867

1868
_return:
14,861,914✔
1869
  return code;
14,861,914✔
1870
}
1871

1872
bool colNeedScan(SOperatorInfo* pOperator, col_id_t colId) {
128,104,202✔
1873
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
128,104,202✔
1874
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
128,104,202✔
1875
  SArray *                   pColList = pVtbScan->readColList;
128,104,202✔
1876
  if (pVtbScan->scanAllCols) {
128,104,202✔
1877
    return true;
11,086,080✔
1878
  }
1879
  for (int32_t i = 0; i < taosArrayGetSize(pColList); i++) {
687,423,426✔
1880
    if (colId == *(col_id_t*)taosArrayGet(pColList, i)) {
603,082,271✔
1881
      return true;
32,676,967✔
1882
    }
1883
  }
1884
  return false;
84,341,155✔
1885
}
1886

1887
int32_t getDbVgInfo(SOperatorInfo* pOperator, SName *name, SDBVgInfo **dbVgInfo) {
43,788,067✔
1888
  int32_t                    code = TSDB_CODE_SUCCESS;
43,788,067✔
1889
  int32_t                    line = 0;
43,788,067✔
1890
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
43,788,067✔
1891
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
43,788,067✔
1892
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
43,788,067✔
1893
  SMsgCb*                    pMsgCb = pVtbScan->pMsgCb;
43,788,067✔
1894
  SUseDbOutput*              output = NULL;
43,788,067✔
1895
  SUseDbOutput**             find = (SUseDbOutput**)taosHashGet(pInfo->vtbScan.dbVgInfoMap, name->dbname, strlen(name->dbname));
43,788,067✔
1896

1897
  QRY_PARAM_CHECK(dbVgInfo);
43,788,067✔
1898

1899
  if (find == NULL) {
43,788,067✔
1900
    output = taosMemoryMalloc(sizeof(SUseDbOutput));
2,504,392✔
1901
    code = buildDbVgInfoMap(pOperator, pMsgCb, name, pTaskInfo, output);
2,504,392✔
1902
    QUERY_CHECK_CODE(code, line, _return);
2,504,392✔
1903
    code = taosHashPut(pInfo->vtbScan.dbVgInfoMap, name->dbname, strlen(name->dbname), &output, POINTER_BYTES);
2,504,392✔
1904
    QUERY_CHECK_CODE(code, line, _return);
2,504,392✔
1905
  } else {
1906
    output = *find;
41,283,675✔
1907
  }
1908

1909
  *dbVgInfo = output->dbVgroup;
43,788,067✔
1910
  return code;
43,788,067✔
UNCOV
1911
_return:
×
1912
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
1913
  freeUseDbOutput(output);
×
1914
  return code;
×
1915
}
1916

1917
int32_t extractColRefName(const char *colref, char **refDb, char** refTb, char** refCol) {
43,788,067✔
1918
  int32_t     code = TSDB_CODE_SUCCESS;
43,788,067✔
1919
  int32_t     line = 0;
43,788,067✔
1920

1921
  const char *first_dot = strchr(colref, '.');
43,788,067✔
1922
  QUERY_CHECK_NULL(first_dot, code, line, _return, terrno)
43,788,067✔
1923

1924
  const char *second_dot = strchr(first_dot + 1, '.');
43,788,067✔
1925
  QUERY_CHECK_NULL(second_dot, code, line, _return, terrno)
43,788,067✔
1926

1927
  size_t db_len = first_dot - colref;
43,788,067✔
1928
  size_t table_len = second_dot - first_dot - 1;
43,788,067✔
1929
  size_t col_len = strlen(second_dot + 1);
43,788,067✔
1930

1931
  *refDb = taosMemoryMalloc(db_len + 1);
43,788,067✔
1932
  *refTb = taosMemoryMalloc(table_len + 1);
43,788,067✔
1933
  *refCol = taosMemoryMalloc(col_len + 1);
43,788,067✔
1934
  QUERY_CHECK_NULL(*refDb, code, line, _return, terrno)
43,788,067✔
1935
  QUERY_CHECK_NULL(*refTb, code, line, _return, terrno)
43,788,067✔
1936
  QUERY_CHECK_NULL(*refCol, code, line, _return, terrno)
43,788,067✔
1937

1938
  tstrncpy(*refDb, colref, db_len + 1);
43,788,067✔
1939
  tstrncpy(*refTb, first_dot + 1, table_len + 1);
43,788,067✔
1940
  tstrncpy(*refCol, second_dot + 1, col_len + 1);
43,788,067✔
1941

1942
  return TSDB_CODE_SUCCESS;
43,788,067✔
UNCOV
1943
_return:
×
1944
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
1945
  if (*refDb) {
×
1946
    taosMemoryFree(*refDb);
×
1947
    *refDb = NULL;
×
1948
  }
UNCOV
1949
  if (*refTb) {
×
1950
    taosMemoryFree(*refTb);
×
1951
    *refTb = NULL;
×
1952
  }
UNCOV
1953
  if (*refCol) {
×
1954
    taosMemoryFree(*refCol);
×
1955
    *refCol = NULL;
×
1956
  }
UNCOV
1957
  return code;
×
1958
}
1959

1960
bool tableInfoNeedCollect(char *dbName, char *tbName, char *expectDbName, char *expectTbName) {
313,931,282✔
1961
  if (strncmp(varDataVal(tbName), expectTbName, varDataLen(tbName)) == 0 &&
313,931,282✔
1962
      strlen(expectTbName) == varDataLen(tbName) &&
206,064,896✔
1963
      strncmp(varDataVal(dbName), expectDbName, varDataLen(dbName)) == 0 &&
206,064,896✔
1964
      strlen(expectDbName) == varDataLen(dbName)) {
206,064,896✔
1965
    return true;
206,064,896✔
1966
  }
1967
  return false;
107,866,386✔
1968
}
1969

1970
int32_t getColRefInfo(SColRefInfo *pInfo, SArray* pDataBlock, int32_t index) {
206,064,896✔
1971
  int32_t          code = TSDB_CODE_SUCCESS;
206,064,896✔
1972
  int32_t          line = 0;
206,064,896✔
1973

1974
  SColumnInfoData *pColNameCol = taosArrayGet(pDataBlock, 3);
206,064,896✔
1975
  SColumnInfoData *pUidCol = taosArrayGet(pDataBlock, 4);
206,064,896✔
1976
  SColumnInfoData *pColIdCol = taosArrayGet(pDataBlock, 5);
206,064,896✔
1977
  SColumnInfoData *pRefCol = taosArrayGet(pDataBlock, 6);
206,064,896✔
1978
  SColumnInfoData *pVgIdCol = taosArrayGet(pDataBlock, 7);
206,064,896✔
1979
  SColumnInfoData *pRefVerCol = taosArrayGet(pDataBlock, 8);
206,064,896✔
1980

1981
  QUERY_CHECK_NULL(pColNameCol, code, line, _return, terrno)
206,064,896✔
1982
  QUERY_CHECK_NULL(pUidCol, code, line, _return, terrno)
206,064,896✔
1983
  QUERY_CHECK_NULL(pColIdCol, code, line, _return, terrno)
206,064,896✔
1984
  QUERY_CHECK_NULL(pRefCol, code, line, _return, terrno)
206,064,896✔
1985
  QUERY_CHECK_NULL(pVgIdCol, code, line, _return, terrno)
206,064,896✔
1986
  QUERY_CHECK_NULL(pRefVerCol, code, line, _return, terrno)
206,064,896✔
1987

1988
  if (colDataIsNull_s(pRefCol, index)) {
412,129,792✔
1989
    pInfo->colrefName = NULL;
77,954,604✔
1990
  } else {
1991
    pInfo->colrefName = taosMemoryCalloc(varDataTLen(colDataGetData(pRefCol, index)), 1);
128,110,292✔
1992
    QUERY_CHECK_NULL(pInfo->colrefName, code, line, _return, terrno)
128,110,292✔
1993
    memcpy(pInfo->colrefName, varDataVal(colDataGetData(pRefCol, index)), varDataLen(colDataGetData(pRefCol, index)));
128,110,292✔
1994
    pInfo->colrefName[varDataLen(colDataGetData(pRefCol, index))] = 0;
128,110,292✔
1995
  }
1996

1997
  pInfo->colName = taosMemoryCalloc(varDataTLen(colDataGetData(pColNameCol, index)), 1);
206,064,896✔
1998
  QUERY_CHECK_NULL(pInfo->colName, code, line, _return, terrno)
206,064,896✔
1999
  memcpy(pInfo->colName, varDataVal(colDataGetData(pColNameCol, index)), varDataLen(colDataGetData(pColNameCol, index)));
206,064,896✔
2000
  pInfo->colName[varDataLen(colDataGetData(pColNameCol, index))] = 0;
206,064,896✔
2001

2002
  if (!colDataIsNull_s(pUidCol, index)) {
412,129,792✔
2003
    GET_TYPED_DATA(pInfo->uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, index), 0);
206,064,896✔
2004
  }
2005
  if (!colDataIsNull_s(pColIdCol, index)) {
412,129,792✔
2006
    GET_TYPED_DATA(pInfo->colId, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pColIdCol, index), 0);
128,110,292✔
2007
  }
2008
  if (!colDataIsNull_s(pVgIdCol, index)) {
412,129,792✔
2009
    GET_TYPED_DATA(pInfo->vgId, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pVgIdCol, index), 0);
206,064,896✔
2010
  }
2011

UNCOV
2012
_return:
×
2013
  return code;
206,064,896✔
2014
}
2015

2016
int32_t processOrgTbVg(SVtbScanDynCtrlInfo* pVtbScan, SExecTaskInfo* pTaskInfo, int32_t rversion) {
1,323,979✔
2017
  int32_t                    code = TSDB_CODE_SUCCESS;
1,323,979✔
2018
  int32_t                    line = 0;
1,323,979✔
2019

2020
  if (pTaskInfo->pStreamRuntimeInfo == NULL) {
1,323,979✔
2021
    return code;
1,219,782✔
2022
  }
2023

2024
  if (pVtbScan->existOrgTbVg == NULL) {
104,197✔
UNCOV
2025
    pVtbScan->existOrgTbVg = pVtbScan->curOrgTbVg;
×
2026
    pVtbScan->curOrgTbVg = NULL;
×
2027
  }
2028

2029
  if (pVtbScan->curOrgTbVg != NULL) {
104,197✔
2030
    // which means rversion has changed
2031
    void*   pCurIter = NULL;
9,700✔
2032
    SArray* tmpArray = NULL;
9,700✔
2033
    while ((pCurIter = taosHashIterate(pVtbScan->curOrgTbVg, pCurIter))) {
28,010✔
2034
      int32_t* vgId = (int32_t*)taosHashGetKey(pCurIter, NULL);
18,310✔
2035
      if (taosHashGet(pVtbScan->existOrgTbVg, vgId, sizeof(int32_t)) == NULL) {
18,310✔
2036
        if (tmpArray == NULL) {
2,310✔
2037
          tmpArray = taosArrayInit(1, sizeof(int32_t));
2,310✔
2038
          QUERY_CHECK_NULL(tmpArray, code, line, _return, terrno)
2,310✔
2039
        }
2040
        QUERY_CHECK_NULL(taosArrayPush(tmpArray, vgId), code, line, _return, terrno)
2,310✔
2041
      }
2042
    }
2043
    if (tmpArray == NULL) {
9,700✔
2044
      return TSDB_CODE_SUCCESS;
7,390✔
2045
    }
2046
    if (tmpArray != NULL && pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addVgIds == NULL) {
2,310✔
2047
      SArray* expiredInfo = atomic_load_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo);
2,310✔
2048
      if (expiredInfo && expiredInfo == atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo, expiredInfo, NULL)) {
2,310✔
UNCOV
2049
        for (int32_t i = 0; i < taosArrayGetSize(expiredInfo); i++) {
×
2050
          SStreamTaskAddr* vgInfo = (SStreamTaskAddr*)taosArrayGet(expiredInfo, i);
×
2051
          QUERY_CHECK_NULL(taosArrayPush(tmpArray, &vgInfo->nodeId), code, line, _return, terrno)
×
2052
        }
UNCOV
2053
        taosArrayDestroy(expiredInfo);
×
2054
      }
2055
      if (atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addVgIds, NULL, tmpArray)) {
2,310✔
UNCOV
2056
        taosArrayDestroy(tmpArray);
×
2057
      }
2058
    }
2059
    atomic_store_64(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.uid, (int64_t)(pVtbScan->isSuperTable ? pVtbScan->suid : pVtbScan->uid));
2,310✔
2060
    (void)atomic_val_compare_exchange_8(pTaskInfo->pStreamRuntimeInfo->vtableDeployGot, 0, 1);
2,310✔
2061
    taosHashClear(pVtbScan->curOrgTbVg);
2,310✔
2062
    pVtbScan->needRedeploy = true;
2,310✔
2063
    pVtbScan->rversion = rversion;
2,310✔
2064
    return TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY;
2,310✔
2065
  }
2066
  return code;
94,497✔
UNCOV
2067
_return:
×
2068
  qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
2069
  return code;
×
2070
}
2071

2072
int32_t getVgIdFromColref(SOperatorInfo* pOperator, const char* colRef, int32_t* vgId) {
25,020✔
2073
  int32_t                    code =TSDB_CODE_SUCCESS;
25,020✔
2074
  int32_t                    line = 0;
25,020✔
2075
  char*                      refDbName = NULL;
25,020✔
2076
  char*                      refTbName = NULL;
25,020✔
2077
  char*                      refColName = NULL;
25,020✔
2078
  SDBVgInfo*                 dbVgInfo = NULL;
25,020✔
2079
  SName                      name = {0};
25,020✔
2080
  char                       dbFname[TSDB_DB_FNAME_LEN] = {0};
25,020✔
2081
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
25,020✔
2082

2083
  code = extractColRefName(colRef, &refDbName, &refTbName, &refColName);
25,020✔
2084
  QUERY_CHECK_CODE(code, line, _return);
25,020✔
2085

2086
  toName(pInfo->vtbScan.acctId, refDbName, refTbName, &name);
25,020✔
2087

2088
  code = getDbVgInfo(pOperator, &name, &dbVgInfo);
25,020✔
2089
  QUERY_CHECK_CODE(code, line, _return);
25,020✔
2090

2091
  code = tNameGetFullDbName(&name, dbFname);
25,020✔
2092
  QUERY_CHECK_CODE(code, line, _return);
25,020✔
2093

2094
  code = getVgId(dbVgInfo, dbFname, vgId, name.tname);
25,020✔
2095
  QUERY_CHECK_CODE(code, line, _return);
25,020✔
2096

2097
_return:
25,020✔
2098
  if (code) {
25,020✔
UNCOV
2099
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
2100
  }
2101
  taosMemoryFree(refDbName);
25,020✔
2102
  taosMemoryFree(refTbName);
25,020✔
2103
  taosMemoryFree(refColName);
25,020✔
2104
  return code;
25,020✔
2105
}
2106

2107
static int32_t generateTagArrayByTagBlockAndSave(SHashObj* vtbUidTagListMap, tb_uid_t uid, SSDataBlock *pTagVal, int32_t rowIdx) {
2,602,192✔
2108
  int32_t code = TSDB_CODE_SUCCESS;
2,602,192✔
2109
  int32_t line = 0;
2,602,192✔
2110
  STagVal tagVal = {0};
2,602,192✔
2111
  // last col is uid
2112

2113
  SArray* pTagList = taosArrayInit(1, sizeof(STagVal));
2,602,192✔
2114
  QUERY_CHECK_NULL(pTagList, code, line, _return, terrno)
2,602,192✔
2115

2116
  for (int32_t k = 0; k < taosArrayGetSize(pTagVal->pDataBlock) - 1; k++) {
18,771,560✔
2117
    SColumnInfoData *pTagCol = taosArrayGet(pTagVal->pDataBlock, k);
16,169,368✔
2118
    QUERY_CHECK_NULL(pTagCol, code, line, _return, terrno)
16,169,368✔
2119
    tagVal.type = pTagCol->info.type;
16,169,368✔
2120
    tagVal.cid = pTagCol->info.colId;
16,169,368✔
2121
    if (!colDataIsNull_s(pTagCol, rowIdx)) {
32,338,736✔
2122
      char*   pData = colDataGetData(pTagCol, rowIdx);
16,169,368✔
2123
      if (IS_VAR_DATA_TYPE(pTagCol->info.type)) {
16,169,368✔
2124
        tagVal.nData = varDataLen(pData);
7,142,508✔
2125
        tagVal.pData = taosMemoryMalloc(tagVal.nData);
7,142,508✔
2126
        QUERY_CHECK_NULL(tagVal.pData, code, line, _return, terrno)
7,142,508✔
2127
        memcpy(tagVal.pData, varDataVal(pData), varDataLen(pData));
7,142,508✔
2128
        QUERY_CHECK_NULL(taosArrayPush(pTagList, &tagVal), code, line, _return, terrno)
14,285,016✔
2129
      } else {
2130
        memcpy(&tagVal.i64, pData, tDataTypes[pTagCol->info.type].bytes);
9,026,860✔
2131
        QUERY_CHECK_NULL(taosArrayPush(pTagList, &tagVal), code, line, _return, terrno)
18,053,720✔
2132
      }
2133
    } else {
UNCOV
2134
      tagVal.pData = NULL;
×
2135
      tagVal.nData = 0;
×
2136
      QUERY_CHECK_NULL(taosArrayPush(pTagList, &tagVal), code, line, _return, terrno)
×
2137
    }
2138
    tagVal = (STagVal){0};
16,169,368✔
2139
  }
2140
  code = taosHashPut(vtbUidTagListMap, &uid, sizeof(uid), &pTagList, POINTER_BYTES);
2,602,192✔
2141
  QUERY_CHECK_CODE(code, line, _return);
2,602,192✔
2142

2143
  return code;
2,602,192✔
UNCOV
2144
_return:
×
2145
  if (tagVal.pData) {
×
2146
    taosMemoryFreeClear(tagVal.pData);
×
2147
  }
UNCOV
2148
  if (pTagList) {
×
2149
    taosArrayDestroyEx(pTagList, destroyTagVal);
×
2150
  }
UNCOV
2151
  qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
2152
  return code;
×
2153
}
2154

2155
int32_t virtualTableScanProcessColRefInfo(SOperatorInfo* pOperator, SArray* pColRefInfo, tb_uid_t* uid, int32_t* vgId) {
10,822,226✔
2156
  int32_t                    code = TSDB_CODE_SUCCESS;
10,822,226✔
2157
  int32_t                    line = 0;
10,822,226✔
2158
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
10,822,226✔
2159
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
10,822,226✔
2160
  SDBVgInfo*                 dbVgInfo = NULL;
10,822,226✔
2161

2162
  for (int32_t j = 0; j < taosArrayGetSize(pColRefInfo); j++) {
216,875,362✔
2163
    SColRefInfo *pKV = (SColRefInfo*)taosArrayGet(pColRefInfo, j);
206,053,136✔
2164
    *uid = pKV->uid;
206,053,136✔
2165
    *vgId = pKV->vgId;
206,053,136✔
2166
    if (pKV->colrefName != NULL && colNeedScan(pOperator, pKV->colId)) {
206,053,136✔
2167
      char*   refDbName = NULL;
43,763,047✔
2168
      char*   refTbName = NULL;
43,763,047✔
2169
      char*   refColName = NULL;
43,763,047✔
2170
      SName   name = {0};
43,763,047✔
2171
      char    dbFname[TSDB_DB_FNAME_LEN] = {0};
43,763,047✔
2172
      char    orgTbFName[TSDB_TABLE_FNAME_LEN] = {0};
43,763,047✔
2173

2174
      code = extractColRefName(pKV->colrefName, &refDbName, &refTbName, &refColName);
43,763,047✔
2175
      QUERY_CHECK_CODE(code, line, _return);
43,763,047✔
2176

2177
      toName(pInfo->vtbScan.acctId, refDbName, refTbName, &name);
43,763,047✔
2178

2179
      code = getDbVgInfo(pOperator, &name, &dbVgInfo);
43,763,047✔
2180
      QUERY_CHECK_CODE(code, line, _return);
43,763,047✔
2181
      code = tNameGetFullDbName(&name, dbFname);
43,763,047✔
2182
      QUERY_CHECK_CODE(code, line, _return);
43,763,047✔
2183
      code = tNameGetFullTableName(&name, orgTbFName);
43,763,047✔
2184
      QUERY_CHECK_CODE(code, line, _return);
43,763,047✔
2185

2186
      void *pVal = taosHashGet(pVtbScan->otbNameToOtbInfoMap, orgTbFName, sizeof(orgTbFName));
43,763,047✔
2187
      if (!pVal) {
43,763,047✔
2188
        SOrgTbInfo orgTbInfo = {0};
14,836,894✔
2189
        code = getVgId(dbVgInfo, dbFname, &orgTbInfo.vgId, name.tname);
14,836,894✔
2190
        QUERY_CHECK_CODE(code, line, _return);
14,836,894✔
2191
        tstrncpy(orgTbInfo.tbName, orgTbFName, sizeof(orgTbInfo.tbName));
14,836,894✔
2192
        orgTbInfo.colMap = taosArrayInit(10, sizeof(SColIdNameKV));
14,836,894✔
2193
        QUERY_CHECK_NULL(orgTbInfo.colMap, code, line, _return, terrno)
14,836,894✔
2194
        SColIdNameKV colIdNameKV = {0};
14,836,894✔
2195
        colIdNameKV.colId = pKV->colId;
14,836,894✔
2196
        tstrncpy(colIdNameKV.colName, refColName, sizeof(colIdNameKV.colName));
14,836,894✔
2197
        QUERY_CHECK_NULL(taosArrayPush(orgTbInfo.colMap, &colIdNameKV), code, line, _return, terrno)
29,673,788✔
2198
        code = taosHashPut(pVtbScan->otbNameToOtbInfoMap, orgTbFName, sizeof(orgTbFName), &orgTbInfo, sizeof(orgTbInfo));
14,836,894✔
2199
        QUERY_CHECK_CODE(code, line, _return);
14,836,894✔
2200
      } else {
2201
        SOrgTbInfo *tbInfo = (SOrgTbInfo *)pVal;
28,926,153✔
2202
        SColIdNameKV colIdNameKV = {0};
28,926,153✔
2203
        colIdNameKV.colId = pKV->colId;
28,926,153✔
2204
        tstrncpy(colIdNameKV.colName, refColName, sizeof(colIdNameKV.colName));
28,926,153✔
2205
        QUERY_CHECK_NULL(taosArrayPush(tbInfo->colMap, &colIdNameKV), code, line, _return, terrno)
57,852,306✔
2206
      }
2207
      taosMemoryFree(refDbName);
43,763,047✔
2208
      taosMemoryFree(refTbName);
43,763,047✔
2209
      taosMemoryFree(refColName);
43,763,047✔
2210
    }
2211
  }
2212

2213
_return:
10,822,226✔
2214
  if (code) {
10,822,226✔
UNCOV
2215
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
2216
  }
2217
  return code;
10,822,226✔
2218
}
2219

2220
static int32_t getTagBlockAndProcess(SOperatorInfo* pOperator, bool hasPartition) {
291,628✔
2221
  int32_t                    code = TSDB_CODE_SUCCESS;
291,628✔
2222
  int32_t                    line = 0;
291,628✔
2223
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
291,628✔
2224
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
291,628✔
2225
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
291,628✔
2226
  SArray*                    pColRefArray = NULL;
291,628✔
2227
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[0];
291,628✔
2228
  SOperatorInfo*             pTagScanOp = pOperator->pDownstream[1];
291,628✔
2229

2230
  pVtbScan->vtbUidTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
291,628✔
2231
  QUERY_CHECK_NULL(pVtbScan->vtbUidTagListMap, code, line, _return, terrno)
291,628✔
2232
  taosHashSetFreeFp(pVtbScan->vtbUidTagListMap, destroyTagList);
291,628✔
2233
  if (hasPartition) {
291,628✔
2234
    pVtbScan->vtbUidToGroupIdMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
239,280✔
2235
    pVtbScan->vtbGroupIdTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
239,280✔
2236
    QUERY_CHECK_NULL(pVtbScan->vtbUidToGroupIdMap, code, line, _return, terrno)
239,280✔
2237
    QUERY_CHECK_NULL(pVtbScan->vtbGroupIdTagListMap, code, line, _return, terrno)
239,280✔
2238
    taosHashSetFreeFp(pVtbScan->vtbGroupIdTagListMap, destroyVtbUidTagListMap);
239,280✔
2239
  }
2240

2241
  while (true) {
1,095,714✔
2242
    SSDataBlock *pTagVal = NULL;
1,387,342✔
2243
    code = pTagScanOp->fpSet.getNextFn(pTagScanOp, &pTagVal);
1,387,342✔
2244
    QUERY_CHECK_CODE(code, line, _return);
1,387,342✔
2245
    if (pTagVal == NULL) {
1,387,342✔
2246
      break;
291,628✔
2247
    }
2248
    SHashObj *vtbUidTagListMap = NULL;
1,095,714✔
2249
    if (hasPartition) {
1,095,714✔
2250
      void* pIter = taosHashGet(pVtbScan->vtbGroupIdTagListMap, &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId));
991,018✔
2251
      if (pIter) {
991,018✔
2252
        vtbUidTagListMap = *(SHashObj**)pIter;
9,970✔
2253
      } else {
2254
        vtbUidTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
981,048✔
2255
        QUERY_CHECK_NULL(vtbUidTagListMap, code, line, _return, terrno)
981,048✔
2256
        taosHashSetFreeFp(vtbUidTagListMap, destroyTagList);
981,048✔
2257

2258
        code = taosHashPut(pVtbScan->vtbGroupIdTagListMap, &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId), &vtbUidTagListMap, POINTER_BYTES);
981,048✔
2259
        QUERY_CHECK_CODE(code, line, _return);
981,048✔
2260
      }
2261
    } else {
2262
      vtbUidTagListMap = pVtbScan->vtbUidTagListMap;
104,696✔
2263
    }
2264

2265
    SColumnInfoData *pUidCol = taosArrayGetLast(pTagVal->pDataBlock);
1,095,714✔
2266
    QUERY_CHECK_NULL(pUidCol, code, line, _return, terrno)
1,095,714✔
2267
    for (int32_t i = 0; i < pTagVal->info.rows; i++) {
3,697,906✔
2268
      tb_uid_t uid = 0;
2,602,192✔
2269
      if (!colDataIsNull_s(pUidCol, i)) {
5,204,384✔
2270
        GET_TYPED_DATA(uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, i), 0);
2,602,192✔
2271
        QUERY_CHECK_CODE(code, line, _return);
2,602,192✔
2272
      }
2273

2274
      code = generateTagArrayByTagBlockAndSave(vtbUidTagListMap, uid, pTagVal, i);
2,602,192✔
2275
      QUERY_CHECK_CODE(code, line, _return);
2,602,192✔
2276

2277
      if (hasPartition) {
2,602,192✔
2278
        code = taosHashPut(pVtbScan->vtbUidToGroupIdMap, &uid, sizeof(uid), &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId));
2,153,520✔
2279
        QUERY_CHECK_CODE(code, line, _return);
2,153,520✔
2280
      }
2281
    }
2282
  }
2283

2284
  return code;
291,628✔
2285

UNCOV
2286
_return:
×
2287
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
2288
  return code;
×
2289
}
2290

2291
static int32_t processChildTableListAndGenerateOrgTbInfoMap(SOperatorInfo* pOperator) {
291,628✔
2292
  int32_t                    code = TSDB_CODE_SUCCESS;
291,628✔
2293
  int32_t                    line = 0;
291,628✔
2294
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
291,628✔
2295
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
291,628✔
2296
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
291,628✔
2297
  SArray*                    pColRefArray = NULL;
291,628✔
2298
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[0];
291,628✔
2299
  SOperatorInfo*             pTagScanOp = pOperator->pDownstream[1];
291,628✔
2300

2301
  pVtbScan->vtbUidToVgIdMapMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
291,628✔
2302
  QUERY_CHECK_NULL(pVtbScan->vtbUidToVgIdMapMap, code, line, _return, terrno)
291,628✔
2303

2304
  for (int32_t i = 0; i < taosArrayGetSize(pVtbScan->childTableList); i++) {
2,255,740✔
2305
    SHashObj* otbVgIdToOtbInfoArrayMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
1,964,112✔
2306
    QUERY_CHECK_NULL(otbVgIdToOtbInfoArrayMap, code, line, _return, terrno)
1,964,112✔
2307

2308
    SArray* pColRefInfo = (SArray*)taosArrayGetP(pVtbScan->childTableList, i);
1,964,112✔
2309
    QUERY_CHECK_NULL(pColRefInfo, code, line, _return, terrno)
1,964,112✔
2310

2311
    tb_uid_t uid = 0;
1,964,112✔
2312
    int32_t  vgId = 0;
1,964,112✔
2313
    code = virtualTableScanProcessColRefInfo(pOperator, pColRefInfo, &uid, &vgId);
1,964,112✔
2314
    QUERY_CHECK_CODE(code, line, _return);
1,964,112✔
2315

2316
    size_t len = 0;
1,964,112✔
2317
    void*  pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, NULL);
1,964,112✔
2318
    while (pOrgTbInfo != NULL) {
4,700,428✔
2319
      char*       key = taosHashGetKey(pOrgTbInfo, &len);
2,736,316✔
2320
      SOrgTbInfo* orgTbInfo = (SOrgTbInfo*)pOrgTbInfo;
2,736,316✔
2321

2322
      void* pIter = taosHashGet(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId));
2,736,316✔
2323
      if (!pIter) {
2,736,316✔
2324
        SArray* pOrgTbInfoArray = taosArrayInit(1, sizeof(SOrgTbInfo));
2,335,000✔
2325
        QUERY_CHECK_NULL(pOrgTbInfoArray, code, line, _return, terrno)
2,335,000✔
2326
        QUERY_CHECK_NULL(taosArrayPush(pOrgTbInfoArray, orgTbInfo), code, line, _return, terrno)
4,670,000✔
2327
        code = taosHashPut(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId), &pOrgTbInfoArray, POINTER_BYTES);
2,335,000✔
2328
        QUERY_CHECK_CODE(code, line, _return);
2,335,000✔
2329
      } else {
2330
        SArray* pOrgTbInfoArray = *(SArray**)pIter;
401,316✔
2331
        QUERY_CHECK_NULL(pOrgTbInfoArray, code, line, _return, terrno)
401,316✔
2332
        QUERY_CHECK_NULL(taosArrayPush(pOrgTbInfoArray, orgTbInfo), code, line, _return, terrno)
401,316✔
2333
      }
2334

2335
      pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, pOrgTbInfo);
2,736,316✔
2336

2337
      code = taosHashRemove(pVtbScan->otbNameToOtbInfoMap, key, len);
2,736,316✔
2338
      QUERY_CHECK_CODE(code, line, _return);
2,736,316✔
2339
    }
2340

2341
    code = taosHashPut(pVtbScan->vtbUidToVgIdMapMap, &uid, sizeof(uid), &otbVgIdToOtbInfoArrayMap, POINTER_BYTES);
1,964,112✔
2342
    QUERY_CHECK_CODE(code, line, _return);
1,964,112✔
2343
  }
2344

2345
  return code;
291,628✔
UNCOV
2346
_return:
×
2347
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
2348
  return code;
×
2349
}
2350

2351
static int32_t buildOrgTbInfoSingle(SOperatorInfo* pOperator, bool hasPartition) {
291,628✔
2352
  int32_t                    code = TSDB_CODE_SUCCESS;
291,628✔
2353
  int32_t                    line = 0;
291,628✔
2354

2355
  code = processChildTableListAndGenerateOrgTbInfoMap(pOperator);
291,628✔
2356
  QUERY_CHECK_CODE(code, line, _return);
291,628✔
2357

2358
  // process tag
2359
  code = getTagBlockAndProcess(pOperator, hasPartition);
291,628✔
2360
  QUERY_CHECK_CODE(code, line, _return);
291,628✔
2361

2362
  return code;
291,628✔
UNCOV
2363
_return:
×
2364
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
2365
  return code;
×
2366
}
2367

2368
static int32_t buildOrgTbInfoBatch(SOperatorInfo* pOperator, bool hasPartition) {
604,260✔
2369
  int32_t                    code = TSDB_CODE_SUCCESS;
604,260✔
2370
  int32_t                    line = 0;
604,260✔
2371
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
604,260✔
2372
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
604,260✔
2373
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
604,260✔
2374
  SArray*                    pColRefArray = NULL;
604,260✔
2375
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[0];
604,260✔
2376
  SOperatorInfo*             pTagScanOp = pOperator->pDownstream[1];
604,260✔
2377

2378
  if (hasPartition) {
604,260✔
2379
    pVtbScan->vtbUidToGroupIdMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
417,342✔
2380
    pVtbScan->vtbGroupIdTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
417,342✔
2381
    pVtbScan->vtbGroupIdToVgIdMapMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
417,342✔
2382

2383
    QUERY_CHECK_NULL(pVtbScan->vtbGroupIdToVgIdMapMap, code, line, _return, terrno)
417,342✔
2384
    QUERY_CHECK_NULL(pVtbScan->vtbUidToGroupIdMap, code, line, _return, terrno)
417,342✔
2385
    QUERY_CHECK_NULL(pVtbScan->vtbGroupIdTagListMap, code, line, _return, terrno)
417,342✔
2386
    taosHashSetFreeFp(pVtbScan->vtbGroupIdToVgIdMapMap, destroyOtbVgIdToOtbInfoArrayMap);
417,342✔
2387
  } else {
2388
    pVtbScan->otbVgIdToOtbInfoArrayMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
186,918✔
2389
    QUERY_CHECK_NULL(pVtbScan->otbVgIdToOtbInfoArrayMap, code, line, _return, terrno)
186,918✔
2390
  }
2391

2392
  while (true && hasPartition) {
2,336,439✔
2393
    SSDataBlock* pTagVal = NULL;
2,149,521✔
2394
    code = pTagScanOp->fpSet.getNextFn(pTagScanOp, &pTagVal);
2,149,521✔
2395
    QUERY_CHECK_CODE(code, line, _return);
2,149,521✔
2396
    if (pTagVal == NULL) {
2,149,521✔
2397
      break;
417,342✔
2398
    }
2399

2400
    SColumnInfoData *pUidCol = taosArrayGetLast(pTagVal->pDataBlock);
1,732,179✔
2401
    QUERY_CHECK_NULL(pUidCol, code, line, _return, terrno)
1,732,179✔
2402
    for (int32_t i = 0; i < pTagVal->info.rows; i++) {
5,465,337✔
2403
      tb_uid_t uid = 0;
3,733,158✔
2404
      if (!colDataIsNull_s(pUidCol, i)) {
7,466,316✔
2405
        GET_TYPED_DATA(uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, i), 0);
3,733,158✔
2406
        QUERY_CHECK_CODE(code, line, _return);
3,733,158✔
2407
      }
2408
      code = taosHashPut(pVtbScan->vtbUidToGroupIdMap, &uid, sizeof(uid), &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId));
3,733,158✔
2409
      QUERY_CHECK_CODE(code, line, _return);
3,733,158✔
2410
    }
2411
    code = taosHashPut(pVtbScan->vtbGroupIdTagListMap, &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId), NULL, 0);
1,732,179✔
2412
    QUERY_CHECK_CODE(code, line, _return);
1,732,179✔
2413
  }
2414

2415
  for (int32_t i = 0; i < taosArrayGetSize(pVtbScan->childTableList); i++) {
4,691,275✔
2416
    SArray* pColRefInfo = (SArray*)taosArrayGetP(pVtbScan->childTableList, i);
4,087,015✔
2417
    QUERY_CHECK_NULL(pColRefInfo, code, line, _return, terrno)
4,087,015✔
2418
    tb_uid_t uid = 0;
4,087,015✔
2419
    int32_t  vgId = 0;
4,087,015✔
2420
    code = virtualTableScanProcessColRefInfo(pOperator, pColRefInfo, &uid, &vgId);
4,087,015✔
2421
    QUERY_CHECK_CODE(code, line, _return);
4,087,015✔
2422

2423
    SHashObj* otbVgIdToOtbInfoArrayMap = NULL;
4,087,015✔
2424
    if (hasPartition) {
4,087,015✔
2425
      uint64_t* groupId = (uint64_t *)taosHashGet(pVtbScan->vtbUidToGroupIdMap, &uid, sizeof(uid));
3,095,078✔
2426
      QUERY_CHECK_NULL(groupId, code, line, _return, terrno)
3,095,078✔
2427

2428
      void* pHashIter = taosHashGet(pVtbScan->vtbGroupIdToVgIdMapMap, groupId, sizeof(*groupId));
3,095,078✔
2429
      if (pHashIter) {
3,095,078✔
2430
        otbVgIdToOtbInfoArrayMap = *(SHashObj**)pHashIter;
1,602,179✔
2431
      } else {
2432
        otbVgIdToOtbInfoArrayMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
1,492,899✔
2433
        QUERY_CHECK_NULL(otbVgIdToOtbInfoArrayMap, code, line, _return, terrno)
1,492,899✔
2434
        code = taosHashPut(pVtbScan->vtbGroupIdToVgIdMapMap, groupId, sizeof(*groupId), &otbVgIdToOtbInfoArrayMap, POINTER_BYTES);
1,492,899✔
2435
        QUERY_CHECK_CODE(code, line, _return);
1,492,899✔
2436
      }
2437
    } else {
2438
      otbVgIdToOtbInfoArrayMap = pVtbScan->otbVgIdToOtbInfoArrayMap;
991,937✔
2439
    }
2440

2441
    size_t len = 0;
4,087,015✔
2442
    void*  pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, NULL);
4,087,015✔
2443
    while (pOrgTbInfo != NULL) {
9,163,245✔
2444
      char*       key = taosHashGetKey(pOrgTbInfo, &len);
5,076,230✔
2445
      SOrgTbInfo* orgTbInfo = (SOrgTbInfo*)pOrgTbInfo;
5,076,230✔
2446
      void* pIter = taosHashGet(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId));
5,076,230✔
2447
      if (!pIter) {
5,076,230✔
2448
        SArray* pOrgTbInfoArray = taosArrayInit(1, sizeof(SOrgTbInfo));
2,563,995✔
2449
        QUERY_CHECK_NULL(pOrgTbInfoArray, code, line, _return, terrno)
2,563,995✔
2450
        QUERY_CHECK_NULL(taosArrayPush(pOrgTbInfoArray, orgTbInfo), code, line, _return, terrno)
5,127,990✔
2451
        code = taosHashPut(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId), &pOrgTbInfoArray, POINTER_BYTES);
2,563,995✔
2452
        QUERY_CHECK_CODE(code, line, _return);
2,563,995✔
2453
      } else {
2454
        SArray* pOrgTbInfoArray = *(SArray**)pIter;
2,512,235✔
2455
        QUERY_CHECK_NULL(pOrgTbInfoArray, code, line, _return, terrno)
2,512,235✔
2456
        QUERY_CHECK_NULL(taosArrayPush(pOrgTbInfoArray, orgTbInfo), code, line, _return, terrno)
2,512,235✔
2457
      }
2458

2459
      pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, pOrgTbInfo);
5,076,230✔
2460

2461
      code = taosHashRemove(pVtbScan->otbNameToOtbInfoMap, key, len);
5,076,230✔
2462
      QUERY_CHECK_CODE(code, line, _return);
5,076,230✔
2463
    }
2464
  }
2465
  return code;
604,260✔
UNCOV
2466
_return:
×
2467
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
2468
  return code;
×
2469
}
2470

2471
int32_t buildVirtualSuperTableScanChildTableMap(SOperatorInfo* pOperator) {
2,121,170✔
2472
  int32_t                    code = TSDB_CODE_SUCCESS;
2,121,170✔
2473
  int32_t                    line = 0;
2,121,170✔
2474
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
2,121,170✔
2475
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
2,121,170✔
2476
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
2,121,170✔
2477
  SArray*                    pColRefArray = NULL;
2,121,170✔
2478
  SOperatorInfo*             pSystableScanOp = NULL;
2,121,170✔
2479
  
2480
  pVtbScan->childTableMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
2,121,170✔
2481
  QUERY_CHECK_NULL(pVtbScan->childTableMap, code, line, _return, terrno)
2,121,170✔
2482

2483
  if (pInfo->qType == DYN_QTYPE_VTB_AGG) {
2,121,170✔
2484
    pVtbScan->otbNameToOtbInfoMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
802,080✔
2485
    QUERY_CHECK_NULL(pVtbScan->otbNameToOtbInfoMap, code, line, _return, terrno)
802,080✔
2486
    pSystableScanOp = pOperator->pDownstream[0];
802,080✔
2487
  } else if (pInfo->qType == DYN_QTYPE_VTB_WINDOW) {
1,319,090✔
2488
    pVtbScan->otbNameToOtbInfoMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
93,808✔
2489
    QUERY_CHECK_NULL(pVtbScan->otbNameToOtbInfoMap, code, line, _return, terrno)
93,808✔
2490
    pSystableScanOp = pOperator->pDownstream[1];
93,808✔
2491
  } else {
2492
    pSystableScanOp = pOperator->pDownstream[1];
1,225,282✔
2493
  }
2494

2495
  while (true) {
4,244,596✔
2496
    SSDataBlock *pChildInfo = NULL;
6,365,766✔
2497
    code = pSystableScanOp->fpSet.getNextFn(pSystableScanOp, &pChildInfo);
6,365,766✔
2498
    QUERY_CHECK_CODE(code, line, _return);
6,365,766✔
2499
    if (pChildInfo == NULL) {
6,365,766✔
2500
      break;
2,121,170✔
2501
    }
2502
    SColumnInfoData *pTableNameCol = taosArrayGet(pChildInfo->pDataBlock, 0);
4,244,596✔
2503
    SColumnInfoData *pStbNameCol = taosArrayGet(pChildInfo->pDataBlock, 1);
4,244,596✔
2504
    SColumnInfoData *pDbNameCol = taosArrayGet(pChildInfo->pDataBlock, 2);
4,244,596✔
2505

2506
    QUERY_CHECK_NULL(pTableNameCol, code, line, _return, terrno)
4,244,596✔
2507
    QUERY_CHECK_NULL(pStbNameCol, code, line, _return, terrno)
4,244,596✔
2508
    QUERY_CHECK_NULL(pDbNameCol, code, line, _return, terrno)
4,244,596✔
2509

2510
    for (int32_t i = 0; i < pChildInfo->info.rows; i++) {
311,203,888✔
2511
      if (!colDataIsNull_s(pStbNameCol, i)) {
613,918,584✔
2512
        char* stbrawname = colDataGetData(pStbNameCol, i);
306,959,292✔
2513
        char* dbrawname = colDataGetData(pDbNameCol, i);
306,959,292✔
2514
        char *ctbName = colDataGetData(pTableNameCol, i);
306,959,292✔
2515

2516
        if (tableInfoNeedCollect(dbrawname, stbrawname, pInfo->vtbScan.dbName, pInfo->vtbScan.tbName)) {
306,959,292✔
2517
          SColRefInfo info = {0};
205,575,060✔
2518
          code = getColRefInfo(&info, pChildInfo->pDataBlock, i);
205,575,060✔
2519
          QUERY_CHECK_CODE(code, line, _return);
205,575,060✔
2520

2521
          if (pInfo->qType == DYN_QTYPE_VTB_SCAN) {
205,575,060✔
2522
            if (pInfo->vtbScan.dynTbUid != 0 && info.uid != pInfo->vtbScan.dynTbUid) {
108,757,028✔
UNCOV
2523
              qTrace("dynQueryCtrl tb uid filter, info uid:%" PRIu64 ", dyn tb uid:%" PRIu64, info.uid,
×
2524
                     pInfo->vtbScan.dynTbUid);
UNCOV
2525
              destroyColRefInfo(&info);
×
2526
              continue;
×
2527
            }
2528

2529
            if (pTaskInfo->pStreamRuntimeInfo) {
108,757,028✔
2530
              if (pVtbScan->curOrgTbVg == NULL) {
33,920✔
2531
                pVtbScan->curOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
1,280✔
2532
                QUERY_CHECK_NULL(pVtbScan->curOrgTbVg, code, line, _return, terrno)
1,280✔
2533
              }
2534

2535
              if (info.colrefName) {
33,920✔
2536
                int32_t vgId;
18,300✔
2537
                code = getVgIdFromColref(pOperator, info.colrefName, &vgId);
18,300✔
2538
                QUERY_CHECK_CODE(code, line, _return);
18,300✔
2539
                code = taosHashPut(pVtbScan->curOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
18,300✔
2540
                QUERY_CHECK_CODE(code, line, _return);
18,300✔
2541
              }
2542
            }
2543
          }
2544

2545
          if (taosHashGet(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName)) == NULL) {
205,575,060✔
2546
            pColRefArray = taosArrayInit(1, sizeof(SColRefInfo));
10,726,469✔
2547
            QUERY_CHECK_NULL(pColRefArray, code, line, _return, terrno)
10,726,469✔
2548
            QUERY_CHECK_NULL(taosArrayPush(pColRefArray, &info), code, line, _return, terrno)
21,452,938✔
2549
            int32_t tableIdx = (int32_t)taosArrayGetSize(pVtbScan->childTableList);
10,726,469✔
2550
            QUERY_CHECK_NULL(taosArrayPush(pVtbScan->childTableList, &pColRefArray), code, line, _return, terrno)
21,452,938✔
2551
            code = taosHashPut(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName), &tableIdx, sizeof(tableIdx));
10,726,469✔
2552
            QUERY_CHECK_CODE(code, line, _return);
10,726,469✔
2553
          } else {
2554
            int32_t *tableIdx = (int32_t*)taosHashGet(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName));
194,848,591✔
2555
            QUERY_CHECK_NULL(tableIdx, code, line, _return, terrno)
194,848,591✔
2556
            pColRefArray = (SArray *)taosArrayGetP(pVtbScan->childTableList, *tableIdx);
194,848,591✔
2557
            QUERY_CHECK_NULL(pColRefArray, code, line, _return, terrno)
194,848,591✔
2558
            QUERY_CHECK_NULL(taosArrayPush(pColRefArray, &info), code, line, _return, terrno)
389,697,182✔
2559
          }
2560
        }
2561
      }
2562
    }
2563
  }
2564

2565
  switch (pInfo->qType) {
2,121,170✔
2566
    case DYN_QTYPE_VTB_WINDOW: {
93,808✔
2567
      code = buildOrgTbInfoBatch(pOperator, false);
93,808✔
2568
      break;
93,808✔
2569
    }
2570
    case DYN_QTYPE_VTB_AGG: {
802,080✔
2571
      if (pVtbScan->batchProcessChild) {
802,080✔
2572
        code = buildOrgTbInfoBatch(pOperator, pVtbScan->hasPartition);
510,452✔
2573
      } else {
2574
        code = buildOrgTbInfoSingle(pOperator, pVtbScan->hasPartition);
291,628✔
2575
      }
2576
      break;
802,080✔
2577
    }
2578
    case DYN_QTYPE_VTB_SCAN: {
1,225,282✔
2579
      code = processOrgTbVg(pVtbScan, pTaskInfo, 1);
1,225,282✔
2580
      break;
1,225,282✔
2581
    }
UNCOV
2582
    default: {
×
2583
      code = TSDB_CODE_PLAN_INVALID_DYN_CTRL_TYPE;
×
2584
      break;
×
2585
    }
2586
  }
2587

2588
  QUERY_CHECK_CODE(code, line, _return);
2,121,170✔
2589

2590
_return:
2,121,170✔
2591
  if (code) {
2,121,170✔
2592
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
1,050✔
2593
  }
2594
  return code;
2,121,170✔
2595
}
2596

2597
int32_t buildVirtualNormalChildTableScanChildTableMap(SOperatorInfo* pOperator) {
98,697✔
2598
  int32_t                    code = TSDB_CODE_SUCCESS;
98,697✔
2599
  int32_t                    line = 0;
98,697✔
2600
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
98,697✔
2601
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
98,697✔
2602
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
98,697✔
2603
  SArray*                    pColRefInfo = pInfo->vtbScan.colRefInfo;
98,697✔
2604
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[1];
98,697✔
2605
  int32_t                    rversion = 0;
98,697✔
2606

2607
  pInfo->vtbScan.colRefInfo = taosArrayInit(1, sizeof(SColRefInfo));
98,697✔
2608
  QUERY_CHECK_NULL(pInfo->vtbScan.colRefInfo, code, line, _return, terrno)
98,697✔
2609

2610
  while (true) {
193,805✔
2611
    SSDataBlock *pTableInfo = NULL;
292,502✔
2612
    code = pSystableScanOp->fpSet.getNextFn(pSystableScanOp, &pTableInfo);
292,502✔
2613
    if (pTableInfo == NULL) {
292,502✔
2614
      break;
98,697✔
2615
    }
2616

2617
    SColumnInfoData *pTableNameCol = taosArrayGet(pTableInfo->pDataBlock, 0);
193,805✔
2618
    SColumnInfoData *pDbNameCol = taosArrayGet(pTableInfo->pDataBlock, 2);
193,805✔
2619
    SColumnInfoData *pRefVerCol = taosArrayGet(pTableInfo->pDataBlock, 8);
193,805✔
2620

2621
    QUERY_CHECK_NULL(pTableNameCol, code, line, _return, terrno)
193,805✔
2622
    QUERY_CHECK_NULL(pDbNameCol, code, line, _return, terrno)
193,805✔
2623
    QUERY_CHECK_NULL(pRefVerCol, code, line, _return, terrno)
193,805✔
2624

2625
    for (int32_t i = 0; i < pTableInfo->info.rows; i++) {
7,165,795✔
2626
      if (!colDataIsNull_s(pRefVerCol, i)) {
13,943,980✔
2627
        GET_TYPED_DATA(rversion, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pRefVerCol, i), 0);
6,971,990✔
2628
      }
2629

2630
      if (!colDataIsNull_s(pTableNameCol, i)) {
13,943,980✔
2631
        char* tbrawname = colDataGetData(pTableNameCol, i);
6,971,990✔
2632
        char* dbrawname = colDataGetData(pDbNameCol, i);
6,971,990✔
2633
        QUERY_CHECK_NULL(tbrawname, code, line, _return, terrno)
6,971,990✔
2634
        QUERY_CHECK_NULL(dbrawname, code, line, _return, terrno)
6,971,990✔
2635

2636
        if (tableInfoNeedCollect(dbrawname, tbrawname, pInfo->vtbScan.dbName, pInfo->vtbScan.tbName)) {
6,971,990✔
2637
          SColRefInfo info = {0};
489,836✔
2638
          code = getColRefInfo(&info, pTableInfo->pDataBlock, i);
489,836✔
2639
          QUERY_CHECK_CODE(code, line, _return);
489,836✔
2640

2641
          if ((rversion != pVtbScan->rversion || pVtbScan->existOrgTbVg == NULL) && info.colrefName) {
489,836✔
2642
            if (pVtbScan->curOrgTbVg == NULL) {
6,720✔
2643
              pVtbScan->curOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
420✔
2644
              QUERY_CHECK_NULL(pVtbScan->curOrgTbVg, code, line, _return, terrno)
420✔
2645
            }
2646
            int32_t vgId;
6,720✔
2647
            code = getVgIdFromColref(pOperator, info.colrefName, &vgId);
6,720✔
2648
            QUERY_CHECK_CODE(code, line, _return);
6,720✔
2649
            code = taosHashPut(pVtbScan->curOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
6,720✔
2650
            QUERY_CHECK_CODE(code, line, _return);
6,720✔
2651
          }
2652

2653
          QUERY_CHECK_NULL(taosArrayPush(pInfo->vtbScan.colRefInfo, &info), code, line, _return, terrno)
979,672✔
2654
        }
2655
      }
2656
    }
2657
  }
2658
  code = processOrgTbVg(pVtbScan, pTaskInfo, rversion);
98,697✔
2659
  QUERY_CHECK_CODE(code, line, _return);
98,697✔
2660

2661
_return:
97,437✔
2662
  if (code) {
98,697✔
2663
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
1,260✔
2664
  }
2665
  return code;
98,697✔
2666
}
2667

2668
int32_t virtualTableScanCheckNeedRedeploy(SOperatorInfo* pOperator) {
2,059,260✔
2669
  int32_t                    code = TSDB_CODE_SUCCESS;
2,059,260✔
2670
  int32_t                    line = 0;
2,059,260✔
2671
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
2,059,260✔
2672
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
2,059,260✔
2673
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
2,059,260✔
2674

2675
  SArray *tmpArray = NULL;
2,059,260✔
2676
  tmpArray = atomic_load_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo);
2,059,260✔
2677
  if (tmpArray && tmpArray == atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo, tmpArray, NULL)) {
2,059,260✔
2678
    for (int32_t i = 0; i < taosArrayGetSize(tmpArray); i++) {
4,620✔
2679
      SStreamTaskAddr* pTaskAddr = (SStreamTaskAddr*)taosArrayGet(tmpArray, i);
2,310✔
2680
      code = taosHashPut(pVtbScan->existOrgTbVg, &pTaskAddr->nodeId, sizeof(pTaskAddr->nodeId), NULL, 0);
2,310✔
2681
      QUERY_CHECK_CODE(code, line, _return);
2,310✔
2682
      if (pVtbScan->newAddedVgInfo == NULL) {
2,310✔
2683
        pVtbScan->newAddedVgInfo = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
840✔
2684
        QUERY_CHECK_NULL(pVtbScan->newAddedVgInfo, code, line, _return, terrno)
840✔
2685
      }
2686
      code = taosHashPut(pVtbScan->newAddedVgInfo, &pTaskAddr->nodeId, sizeof(pTaskAddr->nodeId), pTaskAddr, sizeof(SStreamTaskAddr));
2,310✔
2687
      QUERY_CHECK_CODE(code, line, _return);
2,310✔
2688
    }
2689
    pVtbScan->needRedeploy = false;
2,310✔
2690
  } else {
2691
    code = TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY;
2,056,950✔
2692
    QUERY_CHECK_CODE(code, line, _return);
2,056,950✔
2693
  }
2694

UNCOV
2695
_return:
×
2696
  taosArrayClear(tmpArray);
2,059,260✔
2697
  taosArrayDestroy(tmpArray);
2,059,260✔
2698
  if (code) {
2,059,260✔
2699
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
2,056,950✔
2700
  }
2701
  return code;
2,059,260✔
2702
}
2703

2704
int32_t virtualTableScanBuildDownStreamOpParam(SOperatorInfo* pOperator, tb_uid_t uid, int32_t vgId) {
4,771,099✔
2705
  int32_t                    code = TSDB_CODE_SUCCESS;
4,771,099✔
2706
  int32_t                    line = 0;
4,771,099✔
2707
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
4,771,099✔
2708
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
4,771,099✔
2709
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
4,771,099✔
2710

2711
  pVtbScan->vtbScanParam = NULL;
4,771,099✔
2712
  code = buildVtbScanOperatorParam(pInfo, &pVtbScan->vtbScanParam, uid);
4,771,099✔
2713
  QUERY_CHECK_CODE(code, line, _return);
4,771,099✔
2714

2715
  void* pIter = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, NULL);
4,771,099✔
2716
  while (pIter != NULL) {
11,795,447✔
2717
    SOrgTbInfo*      pMap = (SOrgTbInfo*)pIter;
7,024,348✔
2718
    SOperatorParam*  pExchangeParam = NULL;
7,024,348✔
2719
    SStreamTaskAddr* addr = taosHashGet(pVtbScan->newAddedVgInfo, &pMap->vgId, sizeof(pMap->vgId));
7,024,348✔
2720
    if (addr != NULL) {
7,024,348✔
2721
      SDownstreamSourceNode newSource = {0};
2,310✔
2722
      newSource.type = QUERY_NODE_DOWNSTREAM_SOURCE;
2,310✔
2723
      newSource.clientId = pTaskInfo->id.taskId;// current task's taskid
2,310✔
2724
      newSource.taskId = addr->taskId;
2,310✔
2725
      newSource.fetchMsgType = TDMT_STREAM_FETCH;
2,310✔
2726
      newSource.localExec = false;
2,310✔
2727
      newSource.addr.nodeId = addr->nodeId;
2,310✔
2728
      memcpy(&newSource.addr.epSet, &addr->epset, sizeof(SEpSet));
2,310✔
2729

2730
      code = buildExchangeOperatorParamForVScan(&pExchangeParam, 0, pMap, &newSource);
2,310✔
2731
      QUERY_CHECK_CODE(code, line, _return);
2,310✔
2732
      code = taosHashRemove(pVtbScan->newAddedVgInfo, &pMap->vgId, sizeof(pMap->vgId));
2,310✔
2733
      QUERY_CHECK_CODE(code, line, _return);
2,310✔
2734
    } else {
2735
      code = buildExchangeOperatorParamForVScan(&pExchangeParam, 0, pMap, NULL);
7,022,038✔
2736
      QUERY_CHECK_CODE(code, line, _return);
7,022,038✔
2737
    }
2738
    QUERY_CHECK_NULL(taosArrayPush(((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pOpParamArray, &pExchangeParam), code, line, _return, terrno)
14,048,696✔
2739
    pIter = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, pIter);
7,024,348✔
2740
  }
2741

2742
  SOperatorParam*  pExchangeParam = NULL;
4,771,099✔
2743
  code = buildExchangeOperatorParamForVTagScan(&pExchangeParam, 0, vgId, uid);
4,771,099✔
2744
  QUERY_CHECK_CODE(code, line, _return);
4,771,099✔
2745
  ((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pTagScanOp = pExchangeParam;
4,771,099✔
2746

2747
_return:
4,771,099✔
2748
  if (code) {
4,771,099✔
UNCOV
2749
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
2750
  }
2751
  return code;
4,771,099✔
2752
}
2753

2754
int32_t virtualTableScanGetNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
12,756,095✔
2755
  int32_t                    code = TSDB_CODE_SUCCESS;
12,756,095✔
2756
  int32_t                    line = 0;
12,756,095✔
2757
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
12,756,095✔
2758
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
12,756,095✔
2759
  SOperatorInfo*             pVtbScanOp = pOperator->pDownstream[0];
12,756,095✔
2760

2761
  pVtbScan->otbNameToOtbInfoMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
12,756,095✔
2762
  QUERY_CHECK_NULL(pVtbScan->otbNameToOtbInfoMap, code, line, _return, terrno)
12,756,095✔
2763
  taosHashSetFreeFp(pVtbScan->otbNameToOtbInfoMap, destroySOrgTbInfo);
12,756,095✔
2764

2765
  while (true) {
2766
    if (pVtbScan->curTableIdx == pVtbScan->lastTableIdx) {
16,245,665✔
2767
      code = pVtbScanOp->fpSet.getNextFn(pVtbScanOp, pRes);
11,474,566✔
2768
      QUERY_CHECK_CODE(code, line, _return);
11,474,566✔
2769
    } else {
2770
      taosHashClear(pVtbScan->otbNameToOtbInfoMap);
4,771,099✔
2771
      SArray* pColRefInfo = NULL;
4,771,099✔
2772
      if (pVtbScan->isSuperTable) {
4,771,099✔
2773
        pColRefInfo = (SArray*)taosArrayGetP(pVtbScan->childTableList, pVtbScan->curTableIdx);
4,673,662✔
2774
      } else {
2775
        pColRefInfo = pInfo->vtbScan.colRefInfo;
97,437✔
2776
      }
2777
      QUERY_CHECK_NULL(pColRefInfo, code, line, _return, terrno)
4,771,099✔
2778

2779
      tb_uid_t uid = 0;
4,771,099✔
2780
      int32_t  vgId = 0;
4,771,099✔
2781
      code = virtualTableScanProcessColRefInfo(pOperator, pColRefInfo, &uid, &vgId);
4,771,099✔
2782
      QUERY_CHECK_CODE(code, line, _return);
4,771,099✔
2783

2784
      code = virtualTableScanBuildDownStreamOpParam(pOperator, uid, vgId);
4,771,099✔
2785
      QUERY_CHECK_CODE(code, line, _return);
4,771,099✔
2786

2787
      // reset downstream operator's status
2788
      pVtbScanOp->status = OP_NOT_OPENED;
4,771,099✔
2789
      code = pVtbScanOp->fpSet.getNextExtFn(pVtbScanOp, pVtbScan->vtbScanParam, pRes);
4,771,099✔
2790
      QUERY_CHECK_CODE(code, line, _return);
4,770,473✔
2791
    }
2792

2793
    if (*pRes) {
16,245,039✔
2794
      // has result, still read data from this table.
2795
      pVtbScan->lastTableIdx = pVtbScan->curTableIdx;
11,479,058✔
2796
      break;
11,479,058✔
2797
    } else {
2798
      // no result, read next table.
2799
      pVtbScan->curTableIdx++;
4,765,981✔
2800
      if (pVtbScan->isSuperTable) {
4,765,981✔
2801
        if (pVtbScan->curTableIdx >= taosArrayGetSize(pVtbScan->childTableList)) {
4,668,544✔
2802
          setOperatorCompleted(pOperator);
1,178,974✔
2803
          break;
1,178,974✔
2804
        }
2805
      } else {
2806
        setOperatorCompleted(pOperator);
97,437✔
2807
        break;
97,437✔
2808
      }
2809
    }
2810
  }
2811

2812
_return:
12,755,469✔
2813
  taosHashCleanup(pVtbScan->otbNameToOtbInfoMap);
12,755,469✔
2814
  pVtbScan->otbNameToOtbInfoMap = NULL;
12,755,469✔
2815
  if (code) {
12,755,469✔
UNCOV
2816
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
2817
  }
2818
  return code;
12,755,469✔
2819
}
2820

2821
int32_t vtbScanOpen(SOperatorInfo* pOperator) {
12,798,545✔
2822
  int32_t                    code = TSDB_CODE_SUCCESS;
12,798,545✔
2823
  int32_t                    line = 0;
12,798,545✔
2824
  int64_t                    st = 0;
12,798,545✔
2825
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
12,798,545✔
2826
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
12,798,545✔
2827

2828
  if (OPTR_IS_OPENED(pOperator)) {
12,798,545✔
2829
    return code;
11,474,566✔
2830
  }
2831

2832
  if (pOperator->cost.openCost == 0) {
1,323,979✔
2833
    st = taosGetTimestampUs();
1,246,867✔
2834
  }
2835

2836
  if (pVtbScan->isSuperTable) {
1,323,979✔
2837
    code = buildVirtualSuperTableScanChildTableMap(pOperator);
1,225,282✔
2838
    QUERY_CHECK_CODE(code, line, _return);
1,225,282✔
2839
  } else {
2840
    code = buildVirtualNormalChildTableScanChildTableMap(pOperator);
98,697✔
2841
    QUERY_CHECK_CODE(code, line, _return);
98,697✔
2842
  }
2843

2844
  OPTR_SET_OPENED(pOperator);
1,321,669✔
2845

2846
_return:
1,323,979✔
2847
  if (pOperator->cost.openCost == 0) {
1,323,979✔
2848
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
1,246,867✔
2849
  }
2850
  if (code) {
1,323,979✔
2851
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
2,310✔
2852
    pOperator->pTaskInfo->code = code;
2,310✔
2853
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
2,310✔
2854
  }
2855
  return code;
1,321,669✔
2856
}
2857

2858
int32_t vtbScanNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
14,855,495✔
2859
  int32_t                    code = TSDB_CODE_SUCCESS;
14,855,495✔
2860
  int32_t                    line = 0;
14,855,495✔
2861
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
14,855,495✔
2862
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
14,855,495✔
2863

2864
  QRY_PARAM_CHECK(pRes);
14,855,495✔
2865
  if (pOperator->status == OP_EXEC_DONE && !pOperator->pOperatorGetParam) {
14,855,495✔
UNCOV
2866
    return code;
×
2867
  }
2868
  if (pOperator->pOperatorGetParam) {
14,855,495✔
UNCOV
2869
    if (pOperator->status == OP_EXEC_DONE) {
×
2870
      pOperator->status = OP_OPENED;
×
2871
    }
UNCOV
2872
    pVtbScan->curTableIdx = 0;
×
2873
    pVtbScan->lastTableIdx = -1;
×
2874
    pVtbScan->window = ((SDynQueryCtrlOperatorParam *)(pOperator->pOperatorGetParam)->value)->window;
×
2875
    pOperator->pOperatorGetParam = NULL;
×
2876
  } else {
2877
    pVtbScan->window.skey = INT64_MAX;
14,855,495✔
2878
    pVtbScan->window.ekey = INT64_MIN;
14,855,495✔
2879
  }
2880

2881
  if (pVtbScan->needRedeploy) {
14,855,495✔
2882
    code = virtualTableScanCheckNeedRedeploy(pOperator);
2,059,260✔
2883
    QUERY_CHECK_CODE(code, line, _return);
2,059,260✔
2884
  }
2885

2886
  code = pOperator->fpSet._openFn(pOperator);
12,798,545✔
2887
  QUERY_CHECK_CODE(code, line, _return);
12,796,235✔
2888

2889
  if (pVtbScan->isSuperTable && taosArrayGetSize(pVtbScan->childTableList) == 0) {
12,796,235✔
2890
    setOperatorCompleted(pOperator);
40,140✔
2891
    return code;
40,140✔
2892
  }
2893

2894
  code = virtualTableScanGetNext(pOperator, pRes);
12,756,095✔
2895
  QUERY_CHECK_CODE(code, line, _return);
12,755,469✔
2896

2897
  return code;
12,755,469✔
2898

2899
_return:
2,056,950✔
2900
  if (code) {
2,056,950✔
2901
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
2,056,950✔
2902
    pOperator->pTaskInfo->code = code;
2,056,950✔
2903
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
2,056,950✔
2904
  }
UNCOV
2905
  return code;
×
2906
}
2907

2908
int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) {
1,135,705✔
2909
  if (batchFetch) {
1,135,705✔
2910
    pPrev->leftHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
1,134,601✔
2911
    if (NULL == pPrev->leftHash) {
1,134,601✔
UNCOV
2912
      return terrno;
×
2913
    }
2914
    pPrev->rightHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
1,134,601✔
2915
    if (NULL == pPrev->rightHash) {
1,134,601✔
UNCOV
2916
      return terrno;
×
2917
    }
2918
  } else {
2919
    pPrev->leftCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
1,104✔
2920
    if (NULL == pPrev->leftCache) {
1,104✔
UNCOV
2921
      return terrno;
×
2922
    }
2923
    pPrev->rightCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
1,104✔
2924
    if (NULL == pPrev->rightCache) {
1,104✔
UNCOV
2925
      return terrno;
×
2926
    }
2927
    pPrev->onceTable = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
1,104✔
2928
    if (NULL == pPrev->onceTable) {
1,104✔
UNCOV
2929
      return terrno;
×
2930
    }
2931
  }
2932

2933
  return TSDB_CODE_SUCCESS;
1,135,705✔
2934
}
2935

UNCOV
2936
static void updateDynTbUidIfNeeded(SVtbScanDynCtrlInfo* pVtbScan, SStreamRuntimeInfo* pStreamRuntimeInfo) {
×
2937
  if (pStreamRuntimeInfo == NULL) {
×
2938
    return;
×
2939
  }
2940

UNCOV
2941
  SArray* vals = pStreamRuntimeInfo->funcInfo.pStreamPartColVals;
×
2942
  for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
×
2943
    SStreamGroupValue* pValue = taosArrayGet(vals, i);
×
2944
    if (pValue != NULL && pValue->isTbname && pValue->uid != pVtbScan->dynTbUid) {
×
2945
      qTrace("dynQueryCtrl dyn tb uid:%" PRIu64 " reset to:%" PRIu64, pVtbScan->dynTbUid, pValue->uid);
×
2946

UNCOV
2947
      pVtbScan->dynTbUid = pValue->uid;
×
2948
      break;
×
2949
    }
2950
  }
2951
}
2952

2953
static int32_t initVtbScanInfo(SDynQueryCtrlOperatorInfo* pInfo, SMsgCb* pMsgCb,
2,424,425✔
2954
                               SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) {
2955
  int32_t      code = TSDB_CODE_SUCCESS;
2,424,425✔
2956
  int32_t      line = 0;
2,424,425✔
2957

2958
  code = tsem_init(&pInfo->vtbScan.ready, 0, 0);
2,424,425✔
2959
  QUERY_CHECK_CODE(code, line, _return);
2,424,425✔
2960

2961
  pInfo->vtbScan.genNewParam = true;
2,424,425✔
2962
  pInfo->vtbScan.batchProcessChild = pPhyciNode->vtbScan.batchProcessChild;
2,424,425✔
2963
  pInfo->vtbScan.hasPartition = pPhyciNode->vtbScan.hasPartition;
2,424,425✔
2964
  pInfo->vtbScan.scanAllCols = pPhyciNode->vtbScan.scanAllCols;
2,424,425✔
2965
  pInfo->vtbScan.isSuperTable = pPhyciNode->vtbScan.isSuperTable;
2,424,425✔
2966
  pInfo->vtbScan.rversion = pPhyciNode->vtbScan.rversion;
2,424,425✔
2967
  pInfo->vtbScan.uid = pPhyciNode->vtbScan.uid;
2,424,425✔
2968
  pInfo->vtbScan.suid = pPhyciNode->vtbScan.suid;
2,424,425✔
2969
  pInfo->vtbScan.epSet = pPhyciNode->vtbScan.mgmtEpSet;
2,424,425✔
2970
  pInfo->vtbScan.acctId = pPhyciNode->vtbScan.accountId;
2,424,425✔
2971
  pInfo->vtbScan.needRedeploy = false;
2,424,425✔
2972
  pInfo->vtbScan.pMsgCb = pMsgCb;
2,424,425✔
2973
  pInfo->vtbScan.curTableIdx = 0;
2,424,425✔
2974
  pInfo->vtbScan.lastTableIdx = -1;
2,424,425✔
2975
  pInfo->vtbScan.dynTbUid = 0;
2,424,425✔
2976
  pInfo->vtbScan.dbName = taosStrdup(pPhyciNode->vtbScan.dbName);
2,424,425✔
2977
  pInfo->vtbScan.tbName = taosStrdup(pPhyciNode->vtbScan.tbName);
2,424,425✔
2978
  QUERY_CHECK_NULL(pInfo->vtbScan.dbName, code, line, _return, terrno)
2,424,425✔
2979
  QUERY_CHECK_NULL(pInfo->vtbScan.tbName, code, line, _return, terrno)
2,424,425✔
2980
  pInfo->vtbScan.existOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
2,424,425✔
2981
  QUERY_CHECK_NULL(pInfo->vtbScan.existOrgTbVg, code, line, _return, terrno)
2,424,425✔
2982
  for (int32_t i = 0; i < LIST_LENGTH(pPhyciNode->vtbScan.pOrgVgIds); ++i) {
8,497,346✔
2983
    SValueNode* valueNode = (SValueNode*)nodesListGetNode(pPhyciNode->vtbScan.pOrgVgIds, i);
6,072,921✔
2984
    int32_t vgId = (int32_t)valueNode->datum.i;
6,072,921✔
2985
    code = taosHashPut(pInfo->vtbScan.existOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
6,072,921✔
2986
    QUERY_CHECK_CODE(code, line, _return);
6,072,921✔
2987
  }
2988

2989
  if (pPhyciNode->dynTbname && pTaskInfo) {
2,424,425✔
UNCOV
2990
    updateDynTbUidIfNeeded(&pInfo->vtbScan, pTaskInfo->pStreamRuntimeInfo);
×
2991
  }
2992

2993
  pInfo->vtbScan.readColList = taosArrayInit(LIST_LENGTH(pPhyciNode->vtbScan.pScanCols), sizeof(col_id_t));
2,424,425✔
2994
  QUERY_CHECK_NULL(pInfo->vtbScan.readColList, code, line, _return, terrno)
2,424,425✔
2995

2996
  for (int32_t i = 0; i < LIST_LENGTH(pPhyciNode->vtbScan.pScanCols); ++i) {
21,727,234✔
2997
    SColumnNode* pNode = (SColumnNode*)nodesListGetNode(pPhyciNode->vtbScan.pScanCols, i);
19,302,809✔
2998
    QUERY_CHECK_NULL(pNode, code, line, _return, terrno)
19,302,809✔
2999
    QUERY_CHECK_NULL(taosArrayPush(pInfo->vtbScan.readColList, &pNode->colId), code, line, _return, terrno)
38,605,618✔
3000
  }
3001

3002
  pInfo->vtbScan.childTableList = taosArrayInit(10, POINTER_BYTES);
2,424,425✔
3003
  QUERY_CHECK_NULL(pInfo->vtbScan.childTableList, code, line, _return, terrno)
2,424,425✔
3004

3005
  pInfo->vtbScan.dbVgInfoMap = taosHashInit(taosArrayGetSize(pInfo->vtbScan.childTableList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
2,424,425✔
3006
  QUERY_CHECK_NULL(pInfo->vtbScan.dbVgInfoMap, code, line, _return, terrno)
2,424,425✔
3007

3008
  pInfo->vtbScan.otbNameToOtbInfoMap = NULL;
2,424,425✔
3009
  pInfo->vtbScan.otbVgIdToOtbInfoArrayMap = NULL;
2,424,425✔
3010
  pInfo->vtbScan.vtbUidToVgIdMapMap = NULL;
2,424,425✔
3011
  pInfo->vtbScan.vtbGroupIdToVgIdMapMap = NULL;
2,424,425✔
3012
  pInfo->vtbScan.vtbUidTagListMap = NULL;
2,424,425✔
3013
  pInfo->vtbScan.vtbGroupIdTagListMap = NULL;
2,424,425✔
3014
  pInfo->vtbScan.vtbUidToGroupIdMap = NULL;
2,424,425✔
3015

3016
  return code;
2,424,425✔
UNCOV
3017
_return:
×
3018
  // no need to destroy array and hashmap allocated in this function,
3019
  // since the operator's destroy function will take care of it
UNCOV
3020
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
3021
  return code;
×
3022
}
3023

3024
static int32_t initVtbWindowInfo(SDynQueryCtrlOperatorInfo* pInfo, SDynQueryCtrlPhysiNode* pPhyciNode,
375,478✔
3025
                                 SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) {
3026
  int32_t              code = TSDB_CODE_SUCCESS;
375,478✔
3027
  int32_t              line = 0;
375,478✔
3028
  SDataBlockDescNode*  pDescNode = pPhyciNode->node.pOutputDataBlockDesc;
375,478✔
3029

3030
  pInfo->vtbWindow.wstartSlotId = pPhyciNode->vtbWindow.wstartSlotId;
375,478✔
3031
  pInfo->vtbWindow.wendSlotId = pPhyciNode->vtbWindow.wendSlotId;
375,478✔
3032
  pInfo->vtbWindow.wdurationSlotId = pPhyciNode->vtbWindow.wdurationSlotId;
375,478✔
3033
  pInfo->vtbWindow.pTargets = pPhyciNode->vtbWindow.pTargets;
375,478✔
3034
  pInfo->vtbWindow.isVstb = pPhyciNode->vtbWindow.isVstb;
375,478✔
3035
  pInfo->vtbWindow.extendOption = pPhyciNode->vtbWindow.extendOption;
375,478✔
3036

3037
  pInfo->vtbWindow.pRes = createDataBlockFromDescNode(pDescNode);
375,478✔
3038
  QUERY_CHECK_NULL(pInfo->vtbWindow.pRes, code, line, _return, terrno)
375,478✔
3039

3040
  pInfo->vtbWindow.pWins = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
375,478✔
3041
  QUERY_CHECK_NULL(pInfo->vtbWindow.pWins, code, line, _return, terrno)
375,478✔
3042

3043
  pInfo->vtbWindow.outputWstartSlotId = -1;
375,478✔
3044
  pInfo->vtbWindow.outputWendSlotId = -1;
375,478✔
3045
  pInfo->vtbWindow.outputWdurationSlotId = -1;
375,478✔
3046
  pInfo->vtbWindow.curWinBatchIdx = 0;
375,478✔
3047

3048
  initResultSizeInfo(&pOperator->resultInfo, 1);
375,478✔
3049
  code = blockDataEnsureCapacity(pInfo->vtbWindow.pRes, pOperator->resultInfo.capacity);
375,478✔
3050
  QUERY_CHECK_CODE(code, line, _return);
375,478✔
3051

3052
  return code;
375,478✔
UNCOV
3053
_return:
×
3054
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
3055
  return code;
×
3056
}
3057

3058
static int32_t extractTsCol(SSDataBlock* pBlock, int32_t slotId, TSKEY** ppTsCols) {
1,877,144✔
3059
  int32_t code = TSDB_CODE_SUCCESS;
1,877,144✔
3060
  int32_t lino = 0;
1,877,144✔
3061

3062
  if (pBlock->pDataBlock != NULL && pBlock->info.dataLoad) {
1,877,144✔
3063
    SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, slotId);
1,877,144✔
3064
    QUERY_CHECK_NULL(pColDataInfo, code, lino, _return, terrno)
1,877,144✔
3065

3066
    *ppTsCols = (int64_t*)pColDataInfo->pData;
1,877,144✔
3067

3068
    if ((*ppTsCols)[0] != 0 && (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0)) {
1,877,144✔
3069
      code = blockDataUpdateTsWindow(pBlock, slotId);
183,136✔
3070
      QUERY_CHECK_CODE(code, lino, _return);
183,136✔
3071
    }
3072
  }
3073

3074
  return code;
1,877,144✔
UNCOV
3075
_return:
×
3076
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3077
  return code;
×
3078
}
3079

3080
int32_t vtbWindowOpen(SOperatorInfo* pOperator) {
398,370✔
3081
  int32_t                    code = TSDB_CODE_SUCCESS;
398,370✔
3082
  int32_t                    lino = 0;
398,370✔
3083
  SDynQueryCtrlOperatorInfo* pDynInfo = pOperator->info;
398,370✔
3084
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
398,370✔
3085
  SVtbWindowDynCtrlInfo*     pInfo = &pDynInfo->vtbWindow;
398,370✔
3086
  int64_t                    st = 0;
398,370✔
3087

3088
  if (OPTR_IS_OPENED(pOperator)) {
398,370✔
3089
    return code;
22,892✔
3090
  }
3091

3092
  if (pOperator->cost.openCost == 0) {
375,478✔
3093
    st = taosGetTimestampUs();
375,478✔
3094
  }
3095

3096
  while (1) {
938,572✔
3097
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
1,314,050✔
3098
    if (pBlock == NULL) {
1,314,050✔
3099
      break;
375,478✔
3100
    }
3101

3102
    if (pInfo->outputWendSlotId == -1 && pInfo->outputWstartSlotId == -1 && pInfo->outputWdurationSlotId == -1) {
938,572✔
3103
      for (int32_t i = 0; i < LIST_LENGTH(pInfo->pTargets); ++i) {
2,710,768✔
3104
        STargetNode* pNode = (STargetNode*)nodesListGetNode(pInfo->pTargets, i);
2,335,290✔
3105
        if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode->pExpr)->dataBlockId == pBlock->info.id.blockId) {
2,335,290✔
3106
          if (((SColumnNode*)pNode->pExpr)->slotId == pDynInfo->vtbWindow.wstartSlotId) {
613,586✔
3107
            pInfo->outputWstartSlotId = i;
228,950✔
3108
          } else if (((SColumnNode*)pNode->pExpr)->slotId == pDynInfo->vtbWindow.wendSlotId) {
384,636✔
3109
            pInfo->outputWendSlotId = i;
228,950✔
3110
          } else if (((SColumnNode*)pNode->pExpr)->slotId == pDynInfo->vtbWindow.wdurationSlotId) {
155,686✔
3111
            pInfo->outputWdurationSlotId = i;
155,686✔
3112
          }
3113
        }
3114
      }
3115
    }
3116

3117
    TSKEY* wstartCol = NULL;
938,572✔
3118
    TSKEY* wendCol = NULL;
938,572✔
3119

3120
    code = extractTsCol(pBlock, pDynInfo->vtbWindow.wstartSlotId, &wstartCol);
938,572✔
3121
    QUERY_CHECK_CODE(code, lino, _return);
938,572✔
3122
    code = extractTsCol(pBlock, pDynInfo->vtbWindow.wendSlotId, &wendCol);
938,572✔
3123
    QUERY_CHECK_CODE(code, lino, _return);
938,572✔
3124

3125
    SArray* pWin = taosArrayInit(pBlock->info.rows, sizeof(SExtWinTimeWindow));
938,572✔
3126
    QUERY_CHECK_NULL(pWin, code, lino, _return, terrno)
938,572✔
3127

3128
    QUERY_CHECK_NULL(taosArrayReserve(pWin, pBlock->info.rows), code, lino, _return, terrno);
938,572✔
3129

3130
    for (int32_t i = 0; i < pBlock->info.rows; i++) {
2,147,483,647✔
3131
      SExtWinTimeWindow* pWindow = taosArrayGet(pWin, i);
2,147,483,647✔
3132
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
2,147,483,647✔
3133
      pWindow->tw.skey = wstartCol[i];
2,147,483,647✔
3134
      pWindow->tw.ekey = wendCol[i] + 1;
2,147,483,647✔
3135
      pWindow->winOutIdx = -1;
2,147,483,647✔
3136
    }
3137

3138
    QUERY_CHECK_NULL(taosArrayPush(pDynInfo->vtbWindow.pWins, &pWin), code, lino, _return, terrno);
1,877,144✔
3139
  }
3140

3141
  // handle first window's start key and last window's end key
3142
  SArray* firstBatch = (SArray*)taosArrayGetP(pDynInfo->vtbWindow.pWins, 0);
375,478✔
3143
  SArray* lastBatch = (SArray*)taosArrayGetP(pDynInfo->vtbWindow.pWins, taosArrayGetSize(pDynInfo->vtbWindow.pWins) - 1);
375,478✔
3144

3145
  QUERY_CHECK_NULL(firstBatch, code, lino, _return, terrno)
375,478✔
3146
  QUERY_CHECK_NULL(lastBatch, code, lino, _return, terrno)
375,478✔
3147

3148
  SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(firstBatch, 0);
375,478✔
3149
  SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(lastBatch);
375,478✔
3150

3151
  QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
375,478✔
3152
  QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)
375,478✔
3153

3154
  if (pInfo->extendOption == STATE_WIN_EXTEND_OPTION_BACKWARD) {
375,478✔
3155
    lastWin->tw.ekey = INT64_MAX;
93,890✔
3156
  }
3157
  if (pInfo->extendOption == STATE_WIN_EXTEND_OPTION_FORWARD) {
375,478✔
3158
    firstWin->tw.skey = INT64_MIN;
140,794✔
3159
  }
3160

3161
  if (pInfo->isVstb) {
375,478✔
3162
    code = buildVirtualSuperTableScanChildTableMap(pOperator);
93,808✔
3163
    QUERY_CHECK_CODE(code, lino, _return);
93,808✔
3164
  }
3165

3166
  OPTR_SET_OPENED(pOperator);
375,478✔
3167

3168
  if (pOperator->cost.openCost == 0) {
375,478✔
3169
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
375,478✔
3170
  }
3171

UNCOV
3172
_return:
×
3173
  if (code != TSDB_CODE_SUCCESS) {
375,478✔
UNCOV
3174
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3175
    pTaskInfo->code = code;
×
3176
    T_LONG_JMP(pTaskInfo->env, code);
×
3177
  }
3178
  return code;
375,478✔
3179
}
3180

3181
static int32_t buildExternalWindowOperatorParamEx(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins, int32_t idx) {
100,672✔
3182
  int32_t                       code = TSDB_CODE_SUCCESS;
100,672✔
3183
  int32_t                       lino = 0;
100,672✔
3184
  SExternalWindowOperatorParam* pExtWinOp = NULL;
100,672✔
3185

3186
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
100,672✔
3187
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
100,672✔
3188

3189
  pExtWinOp = taosMemoryMalloc(sizeof(SExternalWindowOperatorParam));
100,672✔
3190
  QUERY_CHECK_NULL(pExtWinOp, code, lino, _return, terrno)
100,672✔
3191

3192
  pExtWinOp->ExtWins = taosArrayDup(pWins, NULL);
100,672✔
3193
  QUERY_CHECK_NULL(pExtWinOp->ExtWins, code, lino, _return, terrno)
100,672✔
3194

3195
  SExtWinTimeWindow *firstWin = (SExtWinTimeWindow *)taosArrayGet(pWins, 0);
100,672✔
3196
  SExtWinTimeWindow *lastWin = (SExtWinTimeWindow *)taosArrayGetLast(pWins);
100,672✔
3197

3198
  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
100,672✔
3199
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)
100,672✔
3200

3201
  SOperatorParam* pExchangeParam = NULL;
100,672✔
3202
  code = buildBatchExchangeOperatorParamForVirtual(&pExchangeParam, 0, NULL, 0, pInfo->vtbScan.otbVgIdToOtbInfoArrayMap, (STimeWindow){.skey = firstWin->tw.skey, .ekey = lastWin->tw.ekey}, EX_SRC_TYPE_VSTB_WIN_SCAN);
100,672✔
3203
  QUERY_CHECK_CODE(code, lino, _return);
100,672✔
3204

3205
  QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pExchangeParam), code, lino, _return, terrno)
201,344✔
3206

3207
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW;
100,672✔
3208
  (*ppRes)->downstreamIdx = idx;
100,672✔
3209
  (*ppRes)->value = pExtWinOp;
100,672✔
3210
  (*ppRes)->reUse = false;
100,672✔
3211

3212
  return code;
100,672✔
UNCOV
3213
_return:
×
3214
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3215
  if (pExtWinOp) {
×
3216
    if (pExtWinOp->ExtWins) {
×
3217
      taosArrayDestroy(pExtWinOp->ExtWins);
×
3218
    }
UNCOV
3219
    taosMemoryFree(pExtWinOp);
×
3220
  }
UNCOV
3221
  if (*ppRes) {
×
3222
    if ((*ppRes)->pChildren) {
×
3223
      for (int32_t i = 0; i < taosArrayGetSize((*ppRes)->pChildren); i++) {
×
3224
        SOperatorParam* pChildParam = (SOperatorParam*)taosArrayGet((*ppRes)->pChildren, i);
×
3225
        if (pChildParam) {
×
3226
          SDynQueryCtrlOperatorParam* pDynParam = (SDynQueryCtrlOperatorParam*)pChildParam->value;
×
3227
          if (pDynParam) {
×
3228
            taosMemoryFree(pDynParam);
×
3229
          }
UNCOV
3230
          taosMemoryFree(pChildParam);
×
3231
        }
3232
      }
UNCOV
3233
      taosArrayDestroy((*ppRes)->pChildren);
×
3234
    }
UNCOV
3235
    taosMemoryFree(*ppRes);
×
3236
    *ppRes = NULL;
×
3237
  }
UNCOV
3238
  return code;
×
3239
}
3240

3241
int32_t vtbWindowNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
398,370✔
3242
  int32_t                    code = TSDB_CODE_SUCCESS;
398,370✔
3243
  int32_t                    lino = 0;
398,370✔
3244
  SDynQueryCtrlOperatorInfo* pDynInfo = pOperator->info;
398,370✔
3245
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
398,370✔
3246
  int64_t                    st = taosGetTimestampUs();
398,370✔
3247
  int32_t                    numOfWins = 0;
398,370✔
3248
  SOperatorInfo*             mergeOp = NULL;
398,370✔
3249
  SOperatorInfo*             extWinOp = NULL;
398,370✔
3250
  SOperatorParam*            pMergeParam = NULL;
398,370✔
3251
  SOperatorParam*            pExtWinParam = NULL;
398,370✔
3252
  SVtbWindowDynCtrlInfo*     pInfo = &pDynInfo->vtbWindow;
398,370✔
3253
  SSDataBlock*               pRes = pInfo->pRes;
398,370✔
3254

3255
  code = pOperator->fpSet._openFn(pOperator);
398,370✔
3256
  QUERY_CHECK_CODE(code, lino, _return);
398,370✔
3257

3258
  if (pInfo->curWinBatchIdx >= taosArrayGetSize(pInfo->pWins)) {
398,370✔
3259
    *ppRes = NULL;
9,158✔
3260
    return code;
9,158✔
3261
  }
3262

3263
  SArray* pWinArray = (SArray*)taosArrayGetP(pInfo->pWins, pInfo->curWinBatchIdx);
389,212✔
3264
  QUERY_CHECK_NULL(pWinArray, code, lino, _return, terrno)
389,212✔
3265

3266
  numOfWins = (int32_t)taosArrayGetSize(pWinArray);
389,212✔
3267

3268
  if (pInfo->isVstb) {
389,212✔
3269
    extWinOp = pOperator->pDownstream[2];
100,672✔
3270
    code = buildExternalWindowOperatorParamEx(pDynInfo, &pExtWinParam, pWinArray, extWinOp->numOfDownstream);
100,672✔
3271
    QUERY_CHECK_CODE(code, lino, _return);
100,672✔
3272

3273
    SSDataBlock* pExtWinBlock = NULL;
100,672✔
3274
    code = extWinOp->fpSet.getNextExtFn(extWinOp, pExtWinParam, &pExtWinBlock);
100,672✔
3275
    QUERY_CHECK_CODE(code, lino, _return);
100,672✔
3276
    setOperatorCompleted(extWinOp);
100,672✔
3277

3278
    blockDataCleanup(pRes);
100,672✔
3279
    code = blockDataEnsureCapacity(pRes, numOfWins);
100,672✔
3280
    QUERY_CHECK_CODE(code, lino, _return);
100,672✔
3281

3282
    if (pExtWinBlock) {
100,672✔
3283
      code = copyColumnsValue(pInfo->pTargets, pExtWinBlock->info.id.blockId, pRes, pExtWinBlock, numOfWins);
100,672✔
3284
      QUERY_CHECK_CODE(code, lino, _return);
100,672✔
3285

3286
      if (pInfo->curWinBatchIdx == 0) {
100,672✔
3287
        // first batch, get _wstart from pMergedBlock
3288
        SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(taosArrayGetP(pInfo->pWins, 0), 0);
93,808✔
3289
        QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
93,808✔
3290

3291
        firstWin->tw.skey = pExtWinBlock->info.window.skey;
93,808✔
3292
      }
3293
      if (pInfo->curWinBatchIdx == taosArrayGetSize(pInfo->pWins) - 1) {
100,672✔
3294
        // last batch, get _wend from pMergedBlock
3295
        SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(taosArrayGetP(pInfo->pWins, taosArrayGetSize(pInfo->pWins) - 1));
2,288✔
3296
        QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)
2,288✔
3297

3298
        lastWin->tw.ekey = pExtWinBlock->info.window.ekey + 1;
2,288✔
3299
      }
3300
    }
3301
  } else {
3302
    mergeOp = pOperator->pDownstream[1];
288,540✔
3303
    code = buildMergeOperatorParam(pDynInfo, &pMergeParam, pWinArray, mergeOp->numOfDownstream, numOfWins);
288,540✔
3304
    QUERY_CHECK_CODE(code, lino, _return);
288,540✔
3305

3306
    SSDataBlock* pMergedBlock = NULL;
288,540✔
3307
    code = mergeOp->fpSet.getNextExtFn(mergeOp, pMergeParam, &pMergedBlock);
288,540✔
3308
    QUERY_CHECK_CODE(code, lino, _return);
288,540✔
3309

3310
    blockDataCleanup(pRes);
288,540✔
3311
    code = blockDataEnsureCapacity(pRes, numOfWins);
288,540✔
3312
    QUERY_CHECK_CODE(code, lino, _return);
288,540✔
3313

3314
    if (pMergedBlock) {
288,540✔
3315
      code = copyColumnsValue(pInfo->pTargets, pMergedBlock->info.id.blockId, pRes, pMergedBlock, numOfWins);
288,540✔
3316
      QUERY_CHECK_CODE(code, lino, _return);
288,540✔
3317

3318
      if (pInfo->curWinBatchIdx == 0) {
288,540✔
3319
        // first batch, get _wstart from pMergedBlock
3320
        SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(taosArrayGetP(pInfo->pWins, 0), 0);
281,670✔
3321
        QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
281,670✔
3322

3323
        firstWin->tw.skey = pMergedBlock->info.window.skey;
281,670✔
3324
      }
3325
      if (pInfo->curWinBatchIdx == taosArrayGetSize(pInfo->pWins) - 1) {
288,540✔
3326
        // last batch, get _wend from pMergedBlock
3327
        SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(taosArrayGetP(pInfo->pWins, taosArrayGetSize(pInfo->pWins) - 1));
6,870✔
3328
        QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)
6,870✔
3329

3330
        lastWin->tw.ekey = pMergedBlock->info.window.ekey + 1;
6,870✔
3331
      }
3332
    }
3333
  }
3334

3335

3336
  if (pInfo->outputWstartSlotId != -1) {
389,212✔
3337
    SColumnInfoData* pWstartCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWstartSlotId);
242,684✔
3338
    QUERY_CHECK_NULL(pWstartCol, code, lino, _return, terrno)
242,684✔
3339

3340
    for (int32_t i = 0; i < numOfWins; i++) {
733,472,754✔
3341
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
733,230,070✔
3342
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
733,230,070✔
3343
      code = colDataSetVal(pWstartCol, i, (const char*)&pWindow->tw.skey, false);
733,230,070✔
3344
      QUERY_CHECK_CODE(code, lino, _return);
733,230,070✔
3345
    }
3346
  }
3347
  if (pInfo->outputWendSlotId != -1) {
389,212✔
3348
    SColumnInfoData* pWendCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWendSlotId);
242,684✔
3349
    QUERY_CHECK_NULL(pWendCol, code, lino, _return, terrno)
242,684✔
3350

3351
    for (int32_t i = 0; i < numOfWins; i++) {
733,472,754✔
3352
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
733,230,070✔
3353
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
733,230,070✔
3354
      TSKEY ekey = pWindow->tw.ekey - 1;
733,230,070✔
3355
      code = colDataSetVal(pWendCol, i, (const char*)&ekey, false);
733,230,070✔
3356
      QUERY_CHECK_CODE(code, lino, _return);
733,230,070✔
3357
    }
3358
  }
3359
  if (pInfo->outputWdurationSlotId != -1) {
389,212✔
3360
    SColumnInfoData* pWdurationCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWdurationSlotId);
169,420✔
3361
    QUERY_CHECK_NULL(pWdurationCol, code, lino, _return, terrno)
169,420✔
3362

3363
    for (int32_t i = 0; i < numOfWins; i++) {
508,332,482✔
3364
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
508,163,062✔
3365
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
508,163,062✔
3366
      int64_t duration = pWindow->tw.ekey - 1 - pWindow->tw.skey;
508,163,062✔
3367
      code = colDataSetVal(pWdurationCol, i, (const char*)&duration, false);
508,163,062✔
3368
      QUERY_CHECK_CODE(code, lino, _return);
508,163,062✔
3369
    }
3370
  }
3371

3372
  pRes->info.rows = numOfWins;
389,212✔
3373
  *ppRes = pRes;
389,212✔
3374
  pInfo->curWinBatchIdx++;
389,212✔
3375

3376
  return code;
389,212✔
3377

UNCOV
3378
_return:
×
3379
  if (code != TSDB_CODE_SUCCESS) {
×
3380
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3381
    pTaskInfo->code = code;
×
3382
    T_LONG_JMP(pTaskInfo->env, code);
×
3383
  }
UNCOV
3384
  return code;
×
3385
}
3386

3387
static int32_t resetDynQueryCtrlOperState(SOperatorInfo* pOper) {
2,233,106✔
3388
  SDynQueryCtrlOperatorInfo*    pDyn = pOper->info;
2,233,106✔
3389
  SDynQueryCtrlPhysiNode const* pPhyciNode = pOper->pPhyNode;
2,234,366✔
3390
  SExecTaskInfo*                pTaskInfo = pOper->pTaskInfo;
2,233,526✔
3391

3392
  pOper->status = OP_NOT_OPENED;
2,234,996✔
3393

3394
  switch (pDyn->qType) {
2,234,996✔
3395
    case DYN_QTYPE_STB_HASH:{
747✔
3396
      pDyn->stbJoin.execInfo = (SDynQueryCtrlExecInfo){0};
747✔
3397
      SStbJoinDynCtrlInfo* pStbJoin = &pDyn->stbJoin;
747✔
3398
      destroyStbJoinDynCtrlInfo(&pDyn->stbJoin);
747✔
3399
      
3400
      int32_t code = initSeqStbJoinTableHash(&pDyn->stbJoin.ctx.prev, pDyn->stbJoin.basic.batchFetch);
747✔
3401
      if (TSDB_CODE_SUCCESS != code) {
747✔
UNCOV
3402
        qError("initSeqStbJoinTableHash failed since %s", tstrerror(code));
×
3403
        return code;
×
3404
      }
3405
      pStbJoin->ctx.prev.pListHead = NULL;
747✔
3406
      pStbJoin->ctx.prev.joinBuild = false;
747✔
3407
      pStbJoin->ctx.prev.pListTail = NULL;
747✔
3408
      pStbJoin->ctx.prev.tableNum = 0;
747✔
3409

3410
      pStbJoin->ctx.post = (SStbJoinPostJoinCtx){0};
747✔
3411
      break; 
747✔
3412
    }
3413
    case DYN_QTYPE_VTB_SCAN: {
2,233,619✔
3414
      SVtbScanDynCtrlInfo* pVtbScan = &pDyn->vtbScan;
2,233,619✔
3415
      
3416
      if (pVtbScan->otbNameToOtbInfoMap) {
2,233,619✔
UNCOV
3417
        taosHashSetFreeFp(pVtbScan->otbNameToOtbInfoMap, destroySOrgTbInfo);
×
3418
        taosHashCleanup(pVtbScan->otbNameToOtbInfoMap);
×
3419
        pVtbScan->otbNameToOtbInfoMap = NULL;
×
3420
      }
3421
      if (pVtbScan->pRsp) {
2,233,199✔
UNCOV
3422
        tFreeSUsedbRsp(pVtbScan->pRsp);
×
3423
        taosMemoryFreeClear(pVtbScan->pRsp);
×
3424
      }
3425
      if (pVtbScan->colRefInfo) {
2,233,409✔
3426
        taosArrayDestroyEx(pVtbScan->colRefInfo, destroyColRefInfo);
98,697✔
3427
        pVtbScan->colRefInfo = NULL;
98,697✔
3428
      }
3429
      if (pVtbScan->childTableMap) {
2,233,409✔
3430
        taosHashCleanup(pVtbScan->childTableMap);
5,500✔
3431
        pVtbScan->childTableMap = NULL;
5,500✔
3432
      }
3433
      if (pVtbScan->childTableList) {
2,233,829✔
3434
        taosArrayClearEx(pVtbScan->childTableList, destroyColRefArray);
2,233,199✔
3435
      }
3436
      if (pPhyciNode->dynTbname && pTaskInfo) {
2,234,039✔
UNCOV
3437
        updateDynTbUidIfNeeded(pVtbScan, pTaskInfo->pStreamRuntimeInfo);
×
3438
      }
3439
      pVtbScan->curTableIdx = 0;
2,234,249✔
3440
      pVtbScan->lastTableIdx = -1;
2,234,039✔
3441
      break;
2,234,249✔
3442
    }
UNCOV
3443
    case DYN_QTYPE_VTB_WINDOW: {
×
3444
      SVtbWindowDynCtrlInfo* pVtbWindow = &pDyn->vtbWindow;
×
3445
      if (pVtbWindow->pRes) {
×
3446
        blockDataDestroy(pVtbWindow->pRes);
×
3447
        pVtbWindow->pRes = NULL;
×
3448
      }
UNCOV
3449
      if (pVtbWindow->pWins) {
×
3450
        taosArrayDestroyEx(pVtbWindow->pWins, destroyWinArray);
×
3451
        pVtbWindow->pWins = NULL;
×
3452
      }
UNCOV
3453
      pVtbWindow->outputWdurationSlotId = -1;
×
3454
      pVtbWindow->outputWendSlotId = -1;
×
3455
      pVtbWindow->outputWstartSlotId = -1;
×
3456
      pVtbWindow->curWinBatchIdx = 0;
×
3457
      break;
×
3458
    }
UNCOV
3459
    default:
×
3460
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
×
3461
      break;
×
3462
  }
3463
  return 0;
2,233,526✔
3464
}
3465

3466
int32_t vtbAggOpen(SOperatorInfo* pOperator) {
3,790,093✔
3467
  int32_t                    code = TSDB_CODE_SUCCESS;
3,790,093✔
3468
  int32_t                    line = 0;
3,790,093✔
3469
  int64_t                    st = 0;
3,790,093✔
3470
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
3,790,093✔
3471

3472
  if (OPTR_IS_OPENED(pOperator)) {
3,790,093✔
3473
    return code;
2,988,013✔
3474
  }
3475

3476
  if (pOperator->cost.openCost == 0) {
802,080✔
3477
    st = taosGetTimestampUs();
802,080✔
3478
  }
3479

3480
  code = buildVirtualSuperTableScanChildTableMap(pOperator);
802,080✔
3481
  QUERY_CHECK_CODE(code, line, _return);
802,080✔
3482
  OPTR_SET_OPENED(pOperator);
802,080✔
3483

3484
_return:
802,080✔
3485
  if (pOperator->cost.openCost == 0) {
802,080✔
3486
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
802,080✔
3487
  }
3488
  if (code) {
802,080✔
UNCOV
3489
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
3490
    pOperator->pTaskInfo->code = code;
×
3491
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
3492
  }
3493
  return code;
802,080✔
3494
}
3495

3496
int32_t virtualTableAggGetNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
3,790,093✔
3497
  int32_t                    code = TSDB_CODE_SUCCESS;
3,790,093✔
3498
  int32_t                    line = 0;
3,790,093✔
3499
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
3,790,093✔
3500
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
3,790,093✔
3501
  SOperatorInfo*             pAggOp = pOperator->pDownstream[pOperator->numOfDownstream - 1];
3,790,093✔
3502
  SOperatorInfo*             pTagScanOp = pOperator->pDownstream[1];
3,790,093✔
3503
  SOperatorParam*            pAggParam = NULL;
3,790,093✔
3504

3505
  if (pInfo->vtbScan.hasPartition) {
3,790,093✔
3506
    if (pInfo->vtbScan.batchProcessChild) {
3,312,746✔
3507
      void* pIter = taosHashIterate(pVtbScan->vtbGroupIdTagListMap, NULL);
1,356,310✔
3508
      while (pIter) {
3,206,937✔
3509
        size_t     keyLen = 0;
2,789,595✔
3510
        uint64_t   groupid = *(uint64_t*)taosHashGetKey(pIter, &keyLen);
2,789,595✔
3511

3512
        code = buildAggOperatorParamWithGroupId(pInfo, groupid, &pAggParam);
2,789,595✔
3513
        QUERY_CHECK_CODE(code, line, _return);
2,789,595✔
3514

3515
        if (pAggParam) {
2,789,595✔
3516
          code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
2,306,050✔
3517
          QUERY_CHECK_CODE(code, line, _return);
2,306,050✔
3518
        } else {
3519
          *pRes = NULL;
483,545✔
3520
        }
3521

3522
        pIter = taosHashIterate(pVtbScan->vtbGroupIdTagListMap, pIter);
2,789,595✔
3523

3524
        if (*pRes) {
2,789,595✔
3525
          (*pRes)->info.id.groupId = groupid;
938,968✔
3526
          code = taosHashRemove(pVtbScan->vtbGroupIdTagListMap, &groupid, keyLen);
938,968✔
3527
          QUERY_CHECK_CODE(code, line, _return);
938,968✔
3528
          break;
938,968✔
3529
        }
3530
      }
3531
    } else {
3532
      void *pIter = taosHashIterate(pVtbScan->vtbGroupIdTagListMap, NULL);
1,956,436✔
3533
      while (pIter) {
2,937,484✔
3534
        size_t     keyLen = 0;
2,698,204✔
3535
        uint64_t*  groupid = (uint64_t*)taosHashGetKey(pIter, &keyLen);
2,698,204✔
3536
        SHashObj*  vtbUidTagListMap = *(SHashObj**)pIter;
2,698,204✔
3537

3538
        void* pIter2 = taosHashIterate(vtbUidTagListMap, NULL);
2,698,204✔
3539
        while (pIter2) {
4,851,724✔
3540
          size_t   keyLen2 = 0;
3,870,676✔
3541
          tb_uid_t uid = *(tb_uid_t*)taosHashGetKey(pIter2, &keyLen2);
3,870,676✔
3542
          SArray*  pTagList = *(SArray**)pIter2;
3,870,676✔
3543

3544
          if (pVtbScan->genNewParam) {
3,870,676✔
3545
            code = buildAggOperatorParamForSingleChild(pInfo, uid, *groupid, pTagList, &pAggParam);
2,153,520✔
3546
            QUERY_CHECK_CODE(code, line, _return);
2,153,520✔
3547
            if (pAggParam) {
2,153,520✔
3548
              code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
1,643,056✔
3549
              QUERY_CHECK_CODE(code, line, _return);
1,643,056✔
3550
            } else {
3551
              *pRes = NULL;
510,464✔
3552
            }
3553
          } else {
3554
            code = pAggOp->fpSet.getNextFn(pAggOp, pRes);
1,717,156✔
3555
            QUERY_CHECK_CODE(code, line, _return);
1,717,156✔
3556
          }
3557

3558
          if (*pRes) {
3,870,676✔
3559
            pVtbScan->genNewParam = false;
1,717,156✔
3560
            (*pRes)->info.id.groupId = *groupid;
1,717,156✔
3561
            break;
1,717,156✔
3562
          }
3563
          pVtbScan->genNewParam = true;
2,153,520✔
3564
          pIter2 = taosHashIterate(vtbUidTagListMap, pIter2);
2,153,520✔
3565
          code = taosHashRemove(vtbUidTagListMap, &uid, keyLen);
2,153,520✔
3566
          QUERY_CHECK_CODE(code, line, _return);
2,153,520✔
3567
        }
3568
        if (*pRes) {
2,698,204✔
3569
          break;
1,717,156✔
3570
        }
3571
        pIter = taosHashIterate(pVtbScan->vtbGroupIdTagListMap, pIter);
981,048✔
3572
        code = taosHashRemove(pVtbScan->vtbGroupIdTagListMap, groupid, keyLen);
981,048✔
3573
        QUERY_CHECK_CODE(code, line, _return);
981,048✔
3574
      }
3575
    }
3576

3577
  } else {
3578
    if (pInfo->vtbScan.batchProcessChild) {
477,347✔
3579
      code = buildAggOperatorParam(pInfo, &pAggParam);
93,110✔
3580
      QUERY_CHECK_CODE(code, line, _return);
93,110✔
3581

3582
      code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
93,110✔
3583
      QUERY_CHECK_CODE(code, line, _return);
93,110✔
3584
      setOperatorCompleted(pOperator);
93,110✔
3585
    } else {
3586
      void* pIter = taosHashIterate(pVtbScan->vtbUidTagListMap, NULL);
384,237✔
3587
      while (pIter) {
832,909✔
3588
        size_t   keyLen = 0;
780,561✔
3589
        tb_uid_t uid = *(tb_uid_t*)taosHashGetKey(pIter, &keyLen);
780,561✔
3590
        SArray*  pTagList = *(SArray**)pIter;
780,561✔
3591

3592
        if (pVtbScan->genNewParam) {
780,561✔
3593
          code = buildAggOperatorParamForSingleChild(pInfo, uid, 0, pTagList, &pAggParam);
448,672✔
3594
          QUERY_CHECK_CODE(code, line, _return);
448,672✔
3595

3596
          if (pAggParam) {
448,672✔
3597
            code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
321,056✔
3598
            QUERY_CHECK_CODE(code, line, _return);
321,056✔
3599
          } else {
3600
            *pRes = NULL;
127,616✔
3601
          }
3602
        } else {
3603
          code = pAggOp->fpSet.getNextFn(pAggOp, pRes);
331,889✔
3604
          QUERY_CHECK_CODE(code, line, _return);
331,889✔
3605
        }
3606

3607
        if (*pRes) {
780,561✔
3608
          pVtbScan->genNewParam = false;
331,889✔
3609
          break;
331,889✔
3610
        }
3611
        pVtbScan->genNewParam = true;
448,672✔
3612
        pIter = taosHashIterate(pVtbScan->vtbUidTagListMap, pIter);
448,672✔
3613
        code = taosHashRemove(pVtbScan->vtbUidTagListMap, &uid, keyLen);
448,672✔
3614
        QUERY_CHECK_CODE(code, line, _return);
448,672✔
3615
      }
3616
    }
3617
  }
3618
_return:
3,790,093✔
3619
  if (code) {
3,790,093✔
UNCOV
3620
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
3621
  }
3622
  return code;
3,790,093✔
3623
}
3624

3625
int32_t vtbAggNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
3,883,203✔
3626
  int32_t                    code = TSDB_CODE_SUCCESS;
3,883,203✔
3627
  int32_t                    line = 0;
3,883,203✔
3628
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
3,883,203✔
3629
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
3,883,203✔
3630

3631
  QRY_PARAM_CHECK(pRes);
3,883,203✔
3632
  if (pOperator->status == OP_EXEC_DONE) {
3,883,203✔
3633
    return code;
93,110✔
3634
  }
3635

3636
  code = pOperator->fpSet._openFn(pOperator);
3,790,093✔
3637
  QUERY_CHECK_CODE(code, line, _return);
3,790,093✔
3638

3639
  if (pVtbScan->isSuperTable && taosArrayGetSize(pVtbScan->childTableList) == 0) {
3,790,093✔
UNCOV
3640
    setOperatorCompleted(pOperator);
×
3641
    return code;
×
3642
  }
3643

3644
  code = virtualTableAggGetNext(pOperator, pRes);
3,790,093✔
3645
  QUERY_CHECK_CODE(code, line, _return);
3,790,093✔
3646

3647
  return code;
3,790,093✔
3648

UNCOV
3649
_return:
×
3650
  if (code) {
×
3651
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
3652
    pOperator->pTaskInfo->code = code;
×
3653
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
3654
  }
UNCOV
3655
  return code;
×
3656
}
3657

3658
int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
3,559,383✔
3659
                                       SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo,
3660
                                       SMsgCb* pMsgCb, SOperatorInfo** pOptrInfo) {
3661
  QRY_PARAM_CHECK(pOptrInfo);
3,559,383✔
3662

3663
  int32_t                    code = TSDB_CODE_SUCCESS;
3,559,383✔
3664
  int32_t                    line = 0;
3,559,383✔
3665
  __optr_fn_t                nextFp = NULL;
3,559,383✔
3666
  __optr_open_fn_t           openFp = NULL;
3,559,383✔
3667
  SOperatorInfo*             pOperator = NULL;
3,559,383✔
3668
  SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo));
3,559,383✔
3669
  QUERY_CHECK_NULL(pInfo, code, line, _error, terrno)
3,559,383✔
3670

3671
  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
3,559,383✔
3672
  QUERY_CHECK_NULL(pOperator, code, line, _error, terrno)
3,559,383✔
3673

3674
  pOperator->pPhyNode = pPhyciNode;
3,559,383✔
3675
  pTaskInfo->dynamicTask = (int8_t)pPhyciNode->node.dynamicOp;
3,559,383✔
3676

3677
  code = appendDownstream(pOperator, pDownstream, numOfDownstream);
3,559,383✔
3678
  QUERY_CHECK_CODE(code, line, _error);
3,559,383✔
3679

3680
  setOperatorInfo(pOperator, "DynQueryCtrlOperator", QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, false, OP_NOT_OPENED,
3,559,383✔
3681
                  pInfo, pTaskInfo);
3682

3683
  pInfo->qType = pPhyciNode->qType;
3,559,383✔
3684
  switch (pInfo->qType) {
3,559,383✔
3685
    case DYN_QTYPE_STB_HASH:
1,134,958✔
3686
      TAOS_MEMCPY(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
1,134,958✔
3687
      pInfo->stbJoin.pOutputDataBlockDesc = pPhyciNode->node.pOutputDataBlockDesc;
1,134,958✔
3688
      code = initSeqStbJoinTableHash(&pInfo->stbJoin.ctx.prev, pInfo->stbJoin.basic.batchFetch);
1,134,958✔
3689
      QUERY_CHECK_CODE(code, line, _error);
1,134,958✔
3690
      nextFp = seqStableJoin;
1,134,958✔
3691
      openFp = optrDummyOpenFn;
1,134,958✔
3692
      break;
1,134,958✔
3693
    case DYN_QTYPE_VTB_SCAN:
1,246,867✔
3694
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
1,246,867✔
3695
      QUERY_CHECK_CODE(code, line, _error);
1,246,867✔
3696
      nextFp = vtbScanNext;
1,246,867✔
3697
      openFp = vtbScanOpen;
1,246,867✔
3698
      break;
1,246,867✔
3699
    case DYN_QTYPE_VTB_WINDOW:
375,478✔
3700
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
375,478✔
3701
      QUERY_CHECK_CODE(code, line, _error);
375,478✔
3702
      code = initVtbWindowInfo(pInfo, pPhyciNode, pTaskInfo, pOperator);
375,478✔
3703
      QUERY_CHECK_CODE(code, line, _error);
375,478✔
3704
      nextFp = vtbWindowNext;
375,478✔
3705
      openFp = vtbWindowOpen;
375,478✔
3706
      break;
375,478✔
3707
    case DYN_QTYPE_VTB_AGG:
802,080✔
3708
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
802,080✔
3709
      QUERY_CHECK_CODE(code, line, _error);
802,080✔
3710
      nextFp = vtbAggNext;
802,080✔
3711
      openFp = vtbAggOpen;
802,080✔
3712
      break;
802,080✔
UNCOV
3713
    default:
×
3714
      qError("unsupported dynamic query ctrl type: %d", pInfo->qType);
×
3715
      code = TSDB_CODE_INVALID_PARA;
×
3716
      goto _error;
×
3717
  }
3718

3719
  pOperator->fpSet = createOperatorFpSet(openFp, nextFp, NULL, destroyDynQueryCtrlOperator, optrDefaultBufFn,
3,559,383✔
3720
                                         NULL, optrDefaultGetNextExtFn, NULL);
3721

3722
  setOperatorResetStateFn(pOperator, resetDynQueryCtrlOperState);
3,559,383✔
3723
  *pOptrInfo = pOperator;
3,559,383✔
3724
  return TSDB_CODE_SUCCESS;
3,559,383✔
3725

UNCOV
3726
_error:
×
3727
  if (pInfo != NULL) {
×
3728
    destroyDynQueryCtrlOperator(pInfo);
×
3729
  }
UNCOV
3730
  qError("failed to create dyn query ctrl operator, %s code:%s, line:%d", __func__, tstrerror(code), line);
×
3731
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
3732
  pTaskInfo->code = code;
×
3733
  return code;
×
3734
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc