• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4943

30 Jan 2026 06:19AM UTC coverage: 66.718% (-0.07%) from 66.788%
#4943

push

travis-ci

web-flow
merge: from main to 3.0 #34453

1122 of 2018 new or added lines in 72 files covered. (55.6%)

823 existing lines in 156 files now uncovered.

204811 of 306978 relevant lines covered (66.72%)

123993567.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.75
/source/libs/executor/src/dynqueryctrloperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "nodes.h"
18
#include "operator.h"
19
#include "os.h"
20
#include "plannodes.h"
21
#include "query.h"
22
#include "querynodes.h"
23
#include "querytask.h"
24
#include "tarray.h"
25
#include "tcompare.h"
26
#include "tdatablock.h"
27
#include "thash.h"
28
#include "tmsg.h"
29
#include "trpc.h"
30
#include "ttypes.h"
31
#include "tdataformat.h"
32
#include "dynqueryctrl.h"
33

34
int64_t gSessionId = 0;
35

36
void freeVgTableList(void* ptr) { 
×
37
  taosArrayDestroy(*(SArray**)ptr); 
×
38
}
×
39

40
static void destroyStbJoinTableList(SStbJoinTableList* pListHead) {
1,140,780✔
41
  SStbJoinTableList* pNext = NULL;
1,140,780✔
42
  
43
  while (pListHead) {
1,141,273✔
44
    taosMemoryFree(pListHead->pLeftVg);
493✔
45
    taosMemoryFree(pListHead->pLeftUid);
493✔
46
    taosMemoryFree(pListHead->pRightVg);
493✔
47
    taosMemoryFree(pListHead->pRightUid);
493✔
48
    pNext = pListHead->pNext;
493✔
49
    taosMemoryFree(pListHead);
493✔
50
    pListHead = pNext;
493✔
51
  }
52
}
1,140,780✔
53

54
static void destroyStbJoinDynCtrlInfo(SStbJoinDynCtrlInfo* pStbJoin) {
1,140,780✔
55
  qDebug("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64 ", leftCacheNum:%" PRId64 ", rightCacheNum:%" PRId64, 
1,140,780✔
56
         pStbJoin->execInfo.prevBlkNum, pStbJoin->execInfo.prevBlkRows, pStbJoin->execInfo.postBlkNum, 
57
         pStbJoin->execInfo.postBlkRows, pStbJoin->execInfo.leftCacheNum, pStbJoin->execInfo.rightCacheNum);
58

59
  if (pStbJoin->basic.batchFetch) {
1,140,780✔
60
    if (pStbJoin->ctx.prev.leftHash) {
1,139,679✔
61
      tSimpleHashSetFreeFp(pStbJoin->ctx.prev.leftHash, freeVgTableList);
1,069,101✔
62
      tSimpleHashCleanup(pStbJoin->ctx.prev.leftHash);
1,069,101✔
63
    }
64
    if (pStbJoin->ctx.prev.rightHash) {
1,139,679✔
65
      tSimpleHashSetFreeFp(pStbJoin->ctx.prev.rightHash, freeVgTableList);
1,069,101✔
66
      tSimpleHashCleanup(pStbJoin->ctx.prev.rightHash);
1,069,101✔
67
    }
68
  } else {
69
    if (pStbJoin->ctx.prev.leftCache) {
1,101✔
70
      tSimpleHashCleanup(pStbJoin->ctx.prev.leftCache);
1,101✔
71
    }
72
    if (pStbJoin->ctx.prev.rightCache) {
1,101✔
73
      tSimpleHashCleanup(pStbJoin->ctx.prev.rightCache);
1,101✔
74
    }
75
    if (pStbJoin->ctx.prev.onceTable) {
1,101✔
76
      tSimpleHashCleanup(pStbJoin->ctx.prev.onceTable);
1,101✔
77
    }
78
  }
79

80
  destroyStbJoinTableList(pStbJoin->ctx.prev.pListHead);
1,140,780✔
81
}
1,140,780✔
82

83
void destroyColRefInfo(void *info) {
63,738,411✔
84
  SColRefInfo *pColRefInfo = (SColRefInfo *)info;
63,738,411✔
85
  if (pColRefInfo) {
63,738,411✔
86
    taosMemoryFree(pColRefInfo->colName);
63,738,411✔
87
    taosMemoryFree(pColRefInfo->colrefName);
63,738,411✔
88
  }
89
}
63,738,411✔
90

91
void destroyColRefArray(void *info) {
3,953,974✔
92
  SArray *pColRefArray = *(SArray **)info;
3,953,974✔
93
  if (pColRefArray) {
3,953,974✔
94
    taosArrayDestroyEx(pColRefArray, destroyColRefInfo);
3,953,974✔
95
  }
96
}
3,953,974✔
97

98
void freeUseDbOutput(void* pOutput) {
1,145,954✔
99
  SUseDbOutput *pOut = *(SUseDbOutput**)pOutput;
1,145,954✔
100
  if (NULL == pOutput) {
1,145,954✔
101
    return;
×
102
  }
103

104
  if (pOut->dbVgroup) {
1,145,954✔
105
    freeVgInfo(pOut->dbVgroup);
1,145,954✔
106
  }
107
  taosMemFree(pOut);
1,145,954✔
108
}
109

110
void destroyOtbInfoArray(void *info) {
260,556✔
111
  SArray *pOtbInfoArray = *(SArray **)info;
260,556✔
112
  if (pOtbInfoArray) {
260,556✔
113
    taosArrayDestroyEx(pOtbInfoArray, destroySOrgTbInfo);
260,556✔
114
  }
115
}
260,556✔
116

117
void destroyOtbVgIdToOtbInfoArrayMap(void *info) {
36,608✔
118
  SHashObj* pOtbVgIdToOtbInfoArrayMap = *(SHashObj **)info;
36,608✔
119
  if (pOtbVgIdToOtbInfoArrayMap) {
36,608✔
120
    taosHashSetFreeFp(pOtbVgIdToOtbInfoArrayMap, destroyOtbInfoArray);
36,608✔
121
    taosHashCleanup(pOtbVgIdToOtbInfoArrayMap);
36,608✔
122
  }
123
}
36,608✔
124

125
void destroyTagList(void *info) {
18,240✔
126
  SArray *pTagList = *(SArray **)info;
18,240✔
127
  if (pTagList) {
18,240✔
128
    taosArrayDestroyEx(pTagList, destroyTagVal);
18,240✔
129
  }
130
}
18,240✔
131

UNCOV
132
void destroyVtbUidTagListMap(void *info) {
×
UNCOV
133
  SHashObj* pVtbUidTagListMap = *(SHashObj **)info;
×
UNCOV
134
  if (pVtbUidTagListMap) {
×
UNCOV
135
    taosHashSetFreeFp(pVtbUidTagListMap, destroyTagList);
×
UNCOV
136
    taosHashCleanup(pVtbUidTagListMap);
×
137
  }
UNCOV
138
}
×
139

140
static void destroyVtbScanDynCtrlInfo(SVtbScanDynCtrlInfo* pVtbScan) {
1,530,988✔
141
  if (pVtbScan->dbName) {
1,530,988✔
142
    taosMemoryFreeClear(pVtbScan->dbName);
1,530,988✔
143
  }
144
  if (pVtbScan->tbName) {
1,530,988✔
145
    taosMemoryFreeClear(pVtbScan->tbName);
1,530,988✔
146
  }
147
  if (pVtbScan->childTableList) {
1,530,988✔
148
    taosArrayDestroyEx(pVtbScan->childTableList, destroyColRefArray);
1,530,988✔
149
  }
150
  if (pVtbScan->colRefInfo) {
1,530,988✔
151
    taosArrayDestroyEx(pVtbScan->colRefInfo, destroyColRefInfo);
×
152
    pVtbScan->colRefInfo = NULL;
×
153
  }
154
  if (pVtbScan->childTableMap) {
1,530,988✔
155
    taosHashCleanup(pVtbScan->childTableMap);
1,206,684✔
156
  }
157
  if (pVtbScan->readColList) {
1,530,988✔
158
    taosArrayDestroy(pVtbScan->readColList);
1,530,988✔
159
  }
160
  if (pVtbScan->dbVgInfoMap) {
1,530,988✔
161
    taosHashSetFreeFp(pVtbScan->dbVgInfoMap, freeUseDbOutput);
1,530,988✔
162
    taosHashCleanup(pVtbScan->dbVgInfoMap);
1,530,988✔
163
  }
164
  if (pVtbScan->otbNameToOtbInfoMap) {
1,530,988✔
165
    taosHashSetFreeFp(pVtbScan->otbNameToOtbInfoMap, destroySOrgTbInfo);
116,039✔
166
    taosHashCleanup(pVtbScan->otbNameToOtbInfoMap);
116,039✔
167
  }
168
  if (pVtbScan->pRsp) {
1,530,988✔
169
    tFreeSUsedbRsp(pVtbScan->pRsp);
×
170
    taosMemoryFreeClear(pVtbScan->pRsp);
×
171
  }
172
  if (pVtbScan->existOrgTbVg) {
1,530,988✔
173
    taosHashCleanup(pVtbScan->existOrgTbVg);
1,530,988✔
174
  }
175
  if (pVtbScan->curOrgTbVg) {
1,530,988✔
176
    taosHashCleanup(pVtbScan->curOrgTbVg);
1,728✔
177
  }
178
  if (pVtbScan->newAddedVgInfo) {
1,530,988✔
179
    taosHashCleanup(pVtbScan->newAddedVgInfo);
848✔
180
  }
181
  if (pVtbScan->otbVgIdToOtbInfoArrayMap) {
1,530,988✔
182
    taosHashSetFreeFp(pVtbScan->otbVgIdToOtbInfoArrayMap, destroyOtbInfoArray);
106,266✔
183
    taosHashCleanup(pVtbScan->otbVgIdToOtbInfoArrayMap);
106,266✔
184
  }
185
  if (pVtbScan->vtbUidToVgIdMapMap) {
1,530,988✔
186
    taosHashSetFreeFp(pVtbScan->vtbUidToVgIdMapMap, destroyOtbVgIdToOtbInfoArrayMap);
4,560✔
187
    taosHashCleanup(pVtbScan->vtbUidToVgIdMapMap);
4,560✔
188
  }
189
  if (pVtbScan->vtbGroupIdToVgIdMapMap) {
1,530,988✔
190
    taosHashSetFreeFp(pVtbScan->vtbGroupIdToVgIdMapMap, destroyOtbVgIdToOtbInfoArrayMap);
4,592✔
191
    taosHashCleanup(pVtbScan->vtbGroupIdToVgIdMapMap);
4,592✔
192
  }
193
  if (pVtbScan->vtbUidTagListMap) {
1,530,988✔
194
    taosHashSetFreeFp(pVtbScan->vtbUidTagListMap, destroyTagList);
4,560✔
195
    taosHashCleanup(pVtbScan->vtbUidTagListMap);
4,560✔
196
  }
197
  if (pVtbScan->vtbGroupIdTagListMap) {
1,530,988✔
198
    taosHashCleanup(pVtbScan->vtbGroupIdTagListMap);
4,592✔
199
  }
200
  if (pVtbScan->vtbUidToGroupIdMap) {
1,530,988✔
201
    taosHashCleanup(pVtbScan->vtbUidToGroupIdMap);
4,592✔
202
  }
203
}
1,530,988✔
204

205
void destroyWinArray(void *info) {
942,672✔
206
  SArray *pWinArray = *(SArray **)info;
942,672✔
207
  if (pWinArray) {
942,672✔
208
    taosArrayDestroy(pWinArray);
942,672✔
209
  }
210
}
942,672✔
211

212
static void destroyVtbWindowDynCtrlInfo(SVtbWindowDynCtrlInfo* pVtbWindow) {
377,610✔
213
  if (pVtbWindow->pRes) {
377,610✔
214
    blockDataDestroy(pVtbWindow->pRes);
377,610✔
215
  }
216
  if (pVtbWindow->pWins) {
377,610✔
217
    taosArrayDestroyEx(pVtbWindow->pWins, destroyWinArray);
377,610✔
218
  }
219
}
377,610✔
220

221
static void destroyDynQueryCtrlOperator(void* param) {
2,671,021✔
222
  SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param;
2,671,021✔
223

224
  switch (pDyn->qType) {
2,671,021✔
225
    case DYN_QTYPE_STB_HASH:
1,140,033✔
226
      destroyStbJoinDynCtrlInfo(&pDyn->stbJoin);
1,140,033✔
227
      break;
1,140,033✔
228
    case DYN_QTYPE_VTB_WINDOW:
377,610✔
229
      destroyVtbWindowDynCtrlInfo(&pDyn->vtbWindow);
377,610✔
230
    case DYN_QTYPE_VTB_AGG:
1,530,988✔
231
    case DYN_QTYPE_VTB_SCAN:
232
      destroyVtbScanDynCtrlInfo(&pDyn->vtbScan);
1,530,988✔
233
      break;
1,530,988✔
234
    default:
×
235
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
×
236
      break;
×
237
  }
238

239
  taosMemoryFreeClear(param);
2,671,021✔
240
}
2,671,021✔
241

242
static FORCE_INLINE bool tableNeedCache(int64_t uid, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, bool rightTable, bool batchFetch) {
243
  if (batchFetch) {
7,242,612✔
244
    return true;
7,238,208✔
245
  }
246
  
247
  if (rightTable) {
4,404✔
248
    return pPost->rightCurrUid == pPost->rightNextUid;
2,202✔
249
  }
250

251
  uint32_t* num = tSimpleHashGet(pPrev->leftCache, &uid, sizeof(uid));
2,202✔
252

253
  return (NULL == num) ? false : true;
2,202✔
254
}
255

256
static int32_t updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo*          pStbJoin) {
3,621,306✔
257
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
3,621,306✔
258
  SStbJoinPostJoinCtx*       pPost = &pStbJoin->ctx.post;
3,621,306✔
259
  SStbJoinTableList*         pNode = pPrev->pListHead;
3,621,306✔
260
  int32_t*                   leftVgId = pNode->pLeftVg + pNode->readIdx;
3,621,306✔
261
  int32_t*                   rightVgId = pNode->pRightVg + pNode->readIdx;
3,621,306✔
262
  int64_t*                   leftUid = pNode->pLeftUid + pNode->readIdx;
3,621,306✔
263
  int64_t*                   rightUid = pNode->pRightUid + pNode->readIdx;
3,621,306✔
264
  int64_t                    readIdx = pNode->readIdx + 1;
3,621,306✔
265
  int64_t                    rightPrevUid = pPost->rightCurrUid;
3,621,306✔
266

267
  pPost->leftCurrUid = *leftUid;
3,621,306✔
268
  pPost->rightCurrUid = *rightUid;
3,621,306✔
269

270
  pPost->leftVgId = *leftVgId;
3,621,306✔
271
  pPost->rightVgId = *rightVgId;
3,621,306✔
272

273
  while (true) {
274
    if (readIdx < pNode->uidNum) {
3,621,306✔
275
      pPost->rightNextUid = *(pNode->pRightUid + readIdx);
3,550,120✔
276
      break;
3,550,120✔
277
    }
278
    
279
    pNode = pNode->pNext;
71,186✔
280
    if (NULL == pNode) {
71,186✔
281
      pPost->rightNextUid = 0;
71,186✔
282
      break;
71,186✔
283
    }
284
    
285
    rightUid = pNode->pRightUid;
×
286
    readIdx = 0;
×
287
  }
288

289
  pPost->leftNeedCache = tableNeedCache(*leftUid, pPrev, pPost, false, pStbJoin->basic.batchFetch);
7,242,612✔
290
  pPost->rightNeedCache = tableNeedCache(*rightUid, pPrev, pPost, true, pStbJoin->basic.batchFetch);
7,242,612✔
291

292
  if (!pStbJoin->basic.batchFetch && pPost->rightNeedCache && rightPrevUid != pPost->rightCurrUid) {
3,621,306✔
293
    QRY_ERR_RET(tSimpleHashPut(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid), NULL, 0));
×
294
    pStbJoin->execInfo.rightCacheNum++;
×
295
  }  
296

297
  return TSDB_CODE_SUCCESS;
3,621,306✔
298
}
299

300
static int32_t copyOrgTbInfo(SOrgTbInfo* pSrc, SOrgTbInfo** ppDst) {
5,497,359✔
301
  int32_t     code = TSDB_CODE_SUCCESS;
5,497,359✔
302
  int32_t     lino = 0;
5,497,359✔
303
  SOrgTbInfo* pTbInfo = NULL;
5,497,359✔
304

305
  qDebug("start to copy org table info, vgId:%d, tbName:%s", pSrc->vgId, pSrc->tbName);
5,497,359✔
306

307
  pTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
5,497,359✔
308
  QUERY_CHECK_NULL(pTbInfo, code, lino, _return, terrno)
5,497,359✔
309

310
  pTbInfo->vgId = pSrc->vgId;
5,497,359✔
311
  tstrncpy(pTbInfo->tbName, pSrc->tbName, TSDB_TABLE_FNAME_LEN);
5,497,359✔
312

313
  pTbInfo->colMap = taosArrayDup(pSrc->colMap, NULL);
5,497,359✔
314
  QUERY_CHECK_NULL(pTbInfo->colMap, code, lino, _return, terrno)
5,497,359✔
315

316
  *ppDst = pTbInfo;
5,497,359✔
317

318
  return code;
5,497,359✔
319
_return:
×
320
  qError("failed to copy org table info, code:%d, line:%d", code, lino);
×
321
  if (pTbInfo) {
×
322
    if (pTbInfo->colMap) {
×
323
      taosArrayDestroy(pTbInfo->colMap);
×
324
    }
325
    taosMemoryFreeClear(pTbInfo);
×
326
  }
327
  return code;
×
328
}
329

330
static int32_t buildTagListForExchangeBasicParam(SExchangeOperatorBasicParam* pBasic, SArray* pTagList) {
27,360✔
331
  int32_t  code = TSDB_CODE_SUCCESS;
27,360✔
332
  int32_t  lino = 0;
27,360✔
333
  STagVal  tmpTag;
27,360✔
334

335
  pBasic->tagList = taosArrayInit(1, sizeof(STagVal));
27,360✔
336
  QUERY_CHECK_NULL(pBasic->tagList, code, lino, _return, terrno)
27,360✔
337

338
  for (int32_t i = 0; i < taosArrayGetSize(pTagList); ++i) {
54,720✔
339
    STagVal* pSrcTag = (STagVal*)taosArrayGet(pTagList, i);
27,360✔
340
    QUERY_CHECK_NULL(pSrcTag, code, lino, _return, terrno)
27,360✔
341
    tmpTag.type = pSrcTag->type;
27,360✔
342
    tmpTag.cid = pSrcTag->cid;
27,360✔
343
    if (IS_VAR_DATA_TYPE(pSrcTag->type)) {
27,360✔
UNCOV
344
      tmpTag.nData = pSrcTag->nData;
×
UNCOV
345
      tmpTag.pData = taosMemoryMalloc(tmpTag.nData);
×
UNCOV
346
      QUERY_CHECK_NULL(tmpTag.pData, code, lino, _return, terrno)
×
UNCOV
347
      memcpy(tmpTag.pData, pSrcTag->pData, tmpTag.nData);
×
348
    } else {
349
      tmpTag.i64 = pSrcTag->i64;
27,360✔
350
    }
351

352
    QUERY_CHECK_NULL(taosArrayPush(pBasic->tagList, &tmpTag), code, lino, _return, terrno)
54,720✔
353
    tmpTag = (STagVal){0};
27,360✔
354
  }
355

356
  return code;
27,360✔
357
_return:
×
358
  if (pBasic->tagList) {
×
359
    taosArrayDestroyEx(pBasic->tagList, destroyTagVal);
×
360
    pBasic->tagList = NULL;
×
361
  }
362
  if (tmpTag.pData) {
×
363
    taosMemoryFree(tmpTag.pData);
×
364
  }
365
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
×
366
  return code;
×
367
}
368

369
static int32_t buildBatchOrgTbInfoForExchangeBasicParam(SExchangeOperatorBasicParam* pBasic, SArray* pOrgTbInfoArray) {
274,272✔
370
  int32_t     code = TSDB_CODE_SUCCESS;
274,272✔
371
  int32_t     lino = 0;
274,272✔
372
  SOrgTbInfo  batchInfo;
274,272✔
373

374
  pBasic->batchOrgTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
274,272✔
375
  QUERY_CHECK_NULL(pBasic->batchOrgTbInfo, code, lino, _return, terrno)
274,272✔
376

377
  for (int32_t i = 0; i < taosArrayGetSize(pOrgTbInfoArray); ++i) {
1,091,430✔
378
    SOrgTbInfo* pSrc = (SOrgTbInfo*)taosArrayGet(pOrgTbInfoArray, i);
817,158✔
379
    QUERY_CHECK_NULL(pSrc, code, lino, _return, terrno)
817,158✔
380
    batchInfo.vgId = pSrc->vgId;
817,158✔
381
    tstrncpy(batchInfo.tbName, pSrc->tbName, TSDB_TABLE_FNAME_LEN);
817,158✔
382
    batchInfo.colMap = taosArrayDup(pSrc->colMap, NULL);
817,158✔
383
    QUERY_CHECK_NULL(batchInfo.colMap, code, lino, _return, terrno)
817,158✔
384
    QUERY_CHECK_NULL(taosArrayPush(pBasic->batchOrgTbInfo, &batchInfo), code, lino, _return, terrno)
1,634,316✔
385
    batchInfo = (SOrgTbInfo){0};
817,158✔
386
  }
387

388
  return code;
274,272✔
389
_return:
×
390
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
×
391
  if (pBasic->batchOrgTbInfo) {
×
392
    taosArrayDestroyEx(pBasic->batchOrgTbInfo, destroySOrgTbInfo);
×
393
    pBasic->batchOrgTbInfo = NULL;
×
394
  }
395
  if (batchInfo.colMap) {
×
396
    taosArrayDestroy(batchInfo.colMap);
×
397
    batchInfo.colMap = NULL;
×
398
  }
399
  return code;
×
400
}
401

402
static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, bool needCache, SOperatorParam* pChild) {
7,242,612✔
403
  int32_t code = TSDB_CODE_SUCCESS;
7,242,612✔
404
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
7,242,612✔
405
  if (NULL == *ppRes) {
7,242,612✔
406
    code = terrno;
×
407
    freeOperatorParam(pChild, OP_GET_PARAM);
×
408
    return code;
×
409
  }
410
  if (pChild) {
7,242,612✔
411
    (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
145,560✔
412
    if (NULL == (*ppRes)->pChildren) {
145,560✔
413
      code = terrno;
×
414
      freeOperatorParam(pChild, OP_GET_PARAM);
×
415
      freeOperatorParam(*ppRes, OP_GET_PARAM);
×
416
      *ppRes = NULL;
×
417
      return code;
×
418
    }
419
    if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild)) {
291,120✔
420
      code = terrno;
×
421
      freeOperatorParam(pChild, OP_GET_PARAM);
×
422
      freeOperatorParam(*ppRes, OP_GET_PARAM);
×
423
      *ppRes = NULL;
×
424
      return code;
×
425
    }
426
  } else {
427
    (*ppRes)->pChildren = NULL;
7,097,052✔
428
  }
429

430
  SGcOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcOperatorParam));
7,242,612✔
431
  if (NULL == pGc) {
7,242,612✔
432
    code = terrno;
×
433
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
434
    *ppRes = NULL;
×
435
    return code;
×
436
  }
437

438
  pGc->sessionId = atomic_add_fetch_64(&gSessionId, 1);
7,242,612✔
439
  pGc->downstreamIdx = downstreamIdx;
7,242,612✔
440
  pGc->vgId = vgId;
7,242,612✔
441
  pGc->tbUid = tbUid;
7,242,612✔
442
  pGc->needCache = needCache;
7,242,612✔
443

444
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
7,242,612✔
445
  (*ppRes)->downstreamIdx = downstreamIdx;
7,242,612✔
446
  (*ppRes)->value = pGc;
7,242,612✔
447
  (*ppRes)->reUse = false;
7,242,612✔
448

449
  return TSDB_CODE_SUCCESS;
7,242,612✔
450
}
451

452

453
static int32_t buildGroupCacheNotifyOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid) {
×
454
  int32_t code = TSDB_CODE_SUCCESS;
×
455
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
×
456
  if (NULL == *ppRes) {
×
457
    return terrno;
×
458
  }
459
  (*ppRes)->pChildren = NULL;
×
460

461
  SGcNotifyOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcNotifyOperatorParam));
×
462
  if (NULL == pGc) {
×
463
    code = terrno;
×
464
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
×
465
    return code;
×
466
  }
467

468
  pGc->downstreamIdx = downstreamIdx;
×
469
  pGc->vgId = vgId;
×
470
  pGc->tbUid = tbUid;
×
471

472
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
×
473
  (*ppRes)->downstreamIdx = downstreamIdx;
×
474
  (*ppRes)->value = pGc;
×
475
  (*ppRes)->reUse = false;
×
476

477
  return TSDB_CODE_SUCCESS;
×
478
}
479

480
static int32_t buildExchangeOperatorBasicParam(SExchangeOperatorBasicParam* pBasic, ENodeType srcOpType,
10,325,397✔
481
                                               EExchangeSourceType exchangeType, int32_t vgId, uint64_t groupId,
482
                                               SArray* pUidList, SOrgTbInfo* pOrgTbInfo, SArray* pTagList,
483
                                               SArray* pOrgTbInfoArray, STimeWindow window,
484
                                               SDownstreamSourceNode* pDownstreamSourceNode,
485
                                               bool tableSeq, bool isNewParam, bool isNewDeployed) {
486
  int32_t code = TSDB_CODE_SUCCESS;
10,325,397✔
487
  int32_t lino = 0;
10,325,397✔
488

489
  qDebug("buildExchangeOperatorBasicParam, srcOpType:%d, exchangeType:%d, vgId:%d, groupId:%" PRIu64 ", tableSeq:%d, "
10,325,397✔
490
         "isNewParam:%d, isNewDeployed:%d", srcOpType, exchangeType, vgId, groupId, tableSeq, isNewParam, isNewDeployed);
491

492
  pBasic->paramType = DYN_TYPE_EXCHANGE_PARAM;
10,325,397✔
493
  pBasic->srcOpType = srcOpType;
10,325,397✔
494
  pBasic->vgId = vgId;
10,325,397✔
495
  pBasic->groupid = groupId;
10,325,397✔
496
  pBasic->window = window;
10,325,397✔
497
  pBasic->tableSeq = tableSeq;
10,325,397✔
498
  pBasic->type = exchangeType;
10,325,397✔
499
  pBasic->isNewParam = isNewParam;
10,325,397✔
500

501
  if (pDownstreamSourceNode) {
10,325,397✔
502
    pBasic->isNewDeployed = true;
2,332✔
503
    pBasic->newDeployedSrc.type = QUERY_NODE_DOWNSTREAM_SOURCE;
2,332✔
504
    pBasic->newDeployedSrc.clientId = pDownstreamSourceNode->clientId;// current task's taskid
2,332✔
505
    pBasic->newDeployedSrc.taskId = pDownstreamSourceNode->taskId;
2,332✔
506
    pBasic->newDeployedSrc.fetchMsgType = TDMT_STREAM_FETCH;
2,332✔
507
    pBasic->newDeployedSrc.localExec = false;
2,332✔
508
    pBasic->newDeployedSrc.addr.nodeId = pDownstreamSourceNode->addr.nodeId;
2,332✔
509
    memcpy(&pBasic->newDeployedSrc.addr.epSet, &pDownstreamSourceNode->addr.epSet, sizeof(SEpSet));
2,332✔
510
  } else {
511
    pBasic->isNewDeployed = false;
10,323,065✔
512
    pBasic->newDeployedSrc = (SDownstreamSourceNode){0};
10,323,065✔
513
  }
514

515
  if (pUidList) {
10,325,397✔
516
    pBasic->uidList = taosArrayDup(pUidList, NULL);
3,854,442✔
517
    QUERY_CHECK_NULL(pBasic->uidList, code, lino, _return, terrno)
3,854,442✔
518
  } else {
519
    pBasic->uidList = taosArrayInit(1, sizeof(int64_t));
6,470,955✔
520
    QUERY_CHECK_NULL(pBasic->uidList, code, lino, _return, terrno)
6,470,955✔
521
  }
522

523
  if (pOrgTbInfo) {
10,325,397✔
524
    code = copyOrgTbInfo(pOrgTbInfo, &pBasic->orgTbInfo);
5,497,359✔
525
    QUERY_CHECK_CODE(code, lino, _return);
5,497,359✔
526
  } else {
527
    pBasic->orgTbInfo = NULL;
4,828,038✔
528
  }
529

530
  if (pTagList) {
10,325,397✔
531
    code = buildTagListForExchangeBasicParam(pBasic, pTagList);
27,360✔
532
    QUERY_CHECK_CODE(code, lino, _return);
27,360✔
533
  } else {
534
    pBasic->tagList = NULL;
10,298,037✔
535
  }
536

537
  if (pOrgTbInfoArray) {
10,325,397✔
538
    code = buildBatchOrgTbInfoForExchangeBasicParam(pBasic, pOrgTbInfoArray);
274,272✔
539
    QUERY_CHECK_CODE(code, lino, _return);
274,272✔
540
  } else {
541
    pBasic->batchOrgTbInfo = NULL;
10,051,125✔
542
  }
543
  return code;
10,325,397✔
544

545
_return:
×
546
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
×
547
  freeExchangeGetBasicOperatorParam(pBasic);
×
548
  return code;
×
549
}
550

551
static int32_t buildExchangeOperatorParamImpl(SOperatorParam** ppRes, int32_t downstreamIdx, ENodeType srcOpType,
9,820,810✔
552
                                              EExchangeSourceType exchangeType, int32_t vgId, uint64_t groupId,
553
                                              SArray* pUidList, SOrgTbInfo* pOrgTbInfo, SArray* pTagList,
554
                                              SArray* pOrgTbInfoArray, STimeWindow window,
555
                                              SDownstreamSourceNode* pDownstreamSourceNode,
556
                                              bool tableSeq, bool isNewParam, bool reUse, bool isNewDeployed) {
557

558
  int32_t                      code = TSDB_CODE_SUCCESS;
9,820,810✔
559
  int32_t                      lino = 0;
9,820,810✔
560
  SOperatorParam*              pParam = NULL;
9,820,810✔
561
  SExchangeOperatorParam*      pExc = NULL;
9,820,810✔
562

563
  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
9,820,810✔
564
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)
9,820,810✔
565

566
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
9,820,810✔
567
  pParam->downstreamIdx = downstreamIdx;
9,820,810✔
568
  pParam->reUse = reUse;
9,820,810✔
569
  pParam->pChildren = NULL;
9,820,810✔
570
  pParam->value = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
9,820,810✔
571
  QUERY_CHECK_NULL(pParam->value, code, lino, _return, terrno)
9,820,810✔
572

573
  pExc = (SExchangeOperatorParam*)pParam->value;
9,820,810✔
574
  pExc->multiParams = false;
9,820,810✔
575

576
  code = buildExchangeOperatorBasicParam(&pExc->basic, srcOpType, exchangeType, vgId, groupId,
9,820,810✔
577
                                         pUidList, pOrgTbInfo, pTagList, pOrgTbInfoArray,
578
                                         window, pDownstreamSourceNode, tableSeq, isNewParam, isNewDeployed);
579

580
  *ppRes = pParam;
9,820,810✔
581
  return code;
9,820,810✔
582
_return:
×
583
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
×
584
  if (pParam) {
×
585
    freeOperatorParam(pParam, OP_GET_PARAM);
×
586
  }
587
  return code;
×
588
}
589

590
static int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, const int32_t* pVgId, int64_t* pUid) {
4,404✔
591
  int32_t code = TSDB_CODE_SUCCESS;
4,404✔
592
  int32_t lino = 0;
4,404✔
593

594
  SArray* pUidList = taosArrayInit(1, sizeof(int64_t));
4,404✔
595
  QUERY_CHECK_NULL(pUidList, code, lino, _return, terrno)
4,404✔
596

597
  QUERY_CHECK_NULL(taosArrayPush(pUidList, pUid), code, lino, _return, terrno);
4,404✔
598

599
  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, EX_SRC_TYPE_STB_JOIN_SCAN,
4,404✔
600
                                        *pVgId, 0, pUidList, NULL, NULL, NULL, (STimeWindow){0}, NULL, true, false, false, false);
4,404✔
601
  QUERY_CHECK_CODE(code, lino, _return);
4,404✔
602

603
_return:
4,404✔
604
  if (code) {
4,404✔
605
    qError("failed to build exchange operator param, code:%d", code);
×
606
  }
607
  taosArrayDestroy(pUidList);
4,404✔
608
  return code;
4,404✔
609
}
610

611
static int32_t buildExchangeOperatorParamForExternalWindow(SOperatorParam** ppRes, int32_t downstreamIdx, STimeWindow win) {
699,324✔
612
  int32_t                   code = TSDB_CODE_SUCCESS;
699,324✔
613
  int32_t                   lino = 0;
699,324✔
614

615
  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, EX_SRC_TYPE_VTB_WIN_SCAN,
699,324✔
616
                                        0, 0, NULL, NULL, NULL, NULL, win, NULL, true, true, true, false);
617
  QUERY_CHECK_CODE(code, lino, _return);
699,324✔
618

619
  return code;
699,324✔
620
_return:
×
621
  qError("failed to build exchange operator param for external window, code:%d", code);
×
622
  return code;
×
623
}
624

625
static int32_t buildExchangeOperatorParamForVTagScan(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, tb_uid_t uid) {
3,619,723✔
626
  int32_t                      code = TSDB_CODE_SUCCESS;
3,619,723✔
627
  int32_t                      lino = 0;
3,619,723✔
628
  SArray*                      pUidList = NULL;
3,619,723✔
629

630
  pUidList = taosArrayInit(1, sizeof(int64_t));
3,619,723✔
631
  QUERY_CHECK_NULL(pUidList, code, lino, _return, terrno)
3,619,723✔
632

633
  QUERY_CHECK_NULL(taosArrayPush(pUidList, &uid), code, lino, _return, terrno)
3,619,723✔
634

635
  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, EX_SRC_TYPE_VSTB_TAG_SCAN,
3,619,723✔
636
                                        vgId, 0, pUidList, NULL, NULL, NULL, (STimeWindow){0}, NULL, false, false, true, false);
3,619,723✔
637
  QUERY_CHECK_CODE(code, lino, _return);
3,619,723✔
638

639
_return:
3,619,723✔
640
  if (code) {
3,619,723✔
641
    qError("failed to build exchange operator param for tag scan, code:%d", code);
×
642
  }
643
  taosArrayDestroy(pUidList);
3,619,723✔
644
  return code;
3,619,723✔
645
}
646

647
static int32_t buildExchangeOperatorParamForVScan(SOperatorParam** ppRes, int32_t downstreamIdx, SOrgTbInfo* pOrgTbInfo,
5,497,359✔
648
                                                  SDownstreamSourceNode* pNewSource) {
649
  int32_t                      code = TSDB_CODE_SUCCESS;
5,497,359✔
650
  int32_t                      lino = 0;
5,497,359✔
651

652
  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, EX_SRC_TYPE_VSTB_SCAN,
5,497,359✔
653
                                        pOrgTbInfo->vgId, 0, NULL, pOrgTbInfo, NULL, NULL, (STimeWindow){0}, pNewSource, false, true, true, true);
5,497,359✔
654
  QUERY_CHECK_CODE(code, lino, _return);
5,497,359✔
655

656
  return code;
5,497,359✔
657
_return:
×
658
  qError("failed to build exchange operator param for vscan, code:%d", code);
×
659
  return code;
×
660
}
661

662
static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
129,680✔
663
  int32_t                       code = TSDB_CODE_SUCCESS;
129,680✔
664
  int32_t                       line = 0;
129,680✔
665
  SOperatorParam*               pParam = NULL;
129,680✔
666
  SExchangeOperatorBatchParam*  pExc = NULL;
129,680✔
667
  SExchangeOperatorBasicParam   basic = {0};
129,680✔
668

669
  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
129,680✔
670
  QUERY_CHECK_NULL(pParam, code, line, _return, terrno);
129,680✔
671

672
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
129,680✔
673
  pParam->downstreamIdx = downstreamIdx;
129,680✔
674
  pParam->reUse = false;
129,680✔
675
  pParam->pChildren = NULL;
129,680✔
676
  pParam->value = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
129,680✔
677
  QUERY_CHECK_NULL(pParam->value, code, line, _return, terrno);
129,680✔
678

679
  pExc = pParam->value;
129,680✔
680
  pExc->multiParams = true;
129,680✔
681
  pExc->pBatchs = tSimpleHashInit(tSimpleHashGetSize(pVg), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
129,680✔
682
  QUERY_CHECK_NULL(pExc->pBatchs, code, line, _return, terrno)
129,680✔
683

684
  tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);
129,680✔
685

686
  int32_t iter = 0;
129,680✔
687
  void*   p = NULL;
129,680✔
688
  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
359,995✔
689
    int32_t* pVgId = tSimpleHashGetKey(p, NULL);
230,315✔
690
    SArray*  pUidList = *(SArray**)p;
230,315✔
691

692
    code = buildExchangeOperatorBasicParam(&basic, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
230,315✔
693
                                           EX_SRC_TYPE_STB_JOIN_SCAN, *pVgId, 0,
694
                                           pUidList, NULL, NULL, NULL,
695
                                           (STimeWindow){0}, NULL, false, false, false);
230,315✔
696
    QUERY_CHECK_CODE(code, line, _return);
230,315✔
697

698
    QRY_ERR_RET(tSimpleHashPut(pExc->pBatchs, pVgId, sizeof(*pVgId), &basic, sizeof(basic)));
230,315✔
699

700
    basic = (SExchangeOperatorBasicParam){0};
230,315✔
701
    qTrace("build downstreamIdx %d batch scan, vgId:%d, uidNum:%" PRId64, downstreamIdx, *pVgId, (int64_t)taosArrayGetSize(pUidList));
230,315✔
702

703
    // already transferred to batch param, can free here
704
    taosArrayDestroy(pUidList);
230,315✔
705

706
    *(SArray**)p = NULL;
230,315✔
707
  }
708
  *ppRes = pParam;
129,680✔
709

710
  return code;
129,680✔
711
  
712
_return:
×
713
  qError("failed to build batch exchange operator param, code:%d", code);
×
714
  freeOperatorParam(pParam, OP_GET_PARAM);
×
715
  freeExchangeGetBasicOperatorParam(&basic);
×
716
  return code;
×
717
}
718

719
static int32_t buildBatchExchangeOperatorParamForVirtual(SOperatorParam** ppRes, int32_t downstreamIdx, SArray* pTagList, uint64_t groupid,  SHashObj* pBatchMaps, STimeWindow window, EExchangeSourceType type) {
154,324✔
720
  int32_t                       code = TSDB_CODE_SUCCESS;
154,324✔
721
  int32_t                       lino = 0;
154,324✔
722
  SOperatorParam*               pParam = NULL;
154,324✔
723
  SExchangeOperatorBatchParam*  pExc = NULL;
154,324✔
724
  SExchangeOperatorBasicParam   basic = {0};
154,324✔
725

726
  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
154,324✔
727
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)
154,324✔
728

729
  pParam->value = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
154,324✔
730
  QUERY_CHECK_NULL(pParam->value, code, lino, _return, terrno)
154,324✔
731

732
  pExc = pParam->value;
154,324✔
733
  pExc->multiParams = true;
154,324✔
734

735
  pExc->pBatchs = tSimpleHashInit(taosHashGetSize(pBatchMaps), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
154,324✔
736
  QUERY_CHECK_NULL(pExc->pBatchs, code, lino, _return, terrno)
154,324✔
737
  tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);
154,324✔
738

739
  size_t keyLen = 0;
154,324✔
740
  void*  pIter = taosHashIterate(pBatchMaps, NULL);
154,324✔
741
  while (pIter != NULL) {
428,596✔
742
    SArray*          pOrgTbInfoArray = *(SArray**)pIter;
274,272✔
743
    int32_t*         vgId = (int32_t*)taosHashGetKey(pIter, &keyLen);
274,272✔
744

745
    code = buildExchangeOperatorBasicParam(&basic, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
274,272✔
746
                                           type, *vgId, groupid,
747
                                           NULL, NULL, pTagList, pOrgTbInfoArray,
748
                                           window, NULL, false, true, false);
749
    QUERY_CHECK_CODE(code, lino, _return);
274,272✔
750

751
    code = tSimpleHashPut(pExc->pBatchs, vgId, sizeof(*vgId), &basic, sizeof(basic));
274,272✔
752
    QUERY_CHECK_CODE(code, lino, _return);
274,272✔
753

754
    basic = (SExchangeOperatorBasicParam){0};
274,272✔
755
    pIter = taosHashIterate(pBatchMaps, pIter);
274,272✔
756
  }
757

758
  pParam->pChildren = NULL;
154,324✔
759
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
154,324✔
760
  pParam->downstreamIdx = downstreamIdx;
154,324✔
761
  pParam->reUse = false;
154,324✔
762

763
  *ppRes = pParam;
154,324✔
764
  return code;
154,324✔
765

766
_return:
×
767
  qError("failed to build exchange operator param for vscan, code:%d", code);
×
768
  freeOperatorParam(pParam, OP_GET_PARAM);
×
769
  freeExchangeGetBasicOperatorParam(&basic);
×
770
  return code;
×
771
}
772

773
static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initParam, SOperatorParam** ppChild0, SOperatorParam** ppChild1) {
3,621,306✔
774
  int32_t code = TSDB_CODE_SUCCESS;
3,621,306✔
775
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
3,621,306✔
776
  if (NULL == *ppRes) {
3,621,306✔
777
    code = terrno;
×
778
    return code;
×
779
  }
780
  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
3,621,306✔
781
  if (NULL == (*ppRes)->pChildren) {
3,621,306✔
782
    code = terrno;
×
783
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
784
    *ppRes = NULL;
×
785
    return code;
×
786
  }
787
  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild0)) {
7,242,612✔
788
    code = terrno;
×
789
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
790
    *ppRes = NULL;
×
791
    return code;
×
792
  }
793
  *ppChild0 = NULL;
3,621,306✔
794
  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild1)) {
7,242,612✔
795
    code = terrno;
×
796
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
797
    *ppRes = NULL;
×
798
    return code;
×
799
  }
800
  *ppChild1 = NULL;
3,621,306✔
801
  
802
  SSortMergeJoinOperatorParam* pJoin = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam));
3,621,306✔
803
  if (NULL == pJoin) {
3,621,306✔
804
    code = terrno;
×
805
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
806
    *ppRes = NULL;
×
807
    return code;
×
808
  }
809

810
  pJoin->initDownstream = initParam;
3,621,306✔
811
  
812
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
3,621,306✔
813
  (*ppRes)->value = pJoin;
3,621,306✔
814
  (*ppRes)->reUse = false;
3,621,306✔
815

816
  return TSDB_CODE_SUCCESS;
3,621,306✔
817
}
818

819
static int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) {
×
820
  int32_t code = TSDB_CODE_SUCCESS;
×
821
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
×
822
  if (NULL == *ppRes) {
×
823
    code = terrno;
×
824
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
×
825
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
826
    return code;
×
827
  }
828
  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
×
829
  if (NULL == *ppRes) {
×
830
    code = terrno;
×
831
    taosMemoryFreeClear(*ppRes);
×
832
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
×
833
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
834
    return code;
×
835
  }
836
  if (pChild0 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) {
×
837
    code = terrno;
×
838
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
×
839
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
×
840
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
841
    *ppRes = NULL;
×
842
    return code;
×
843
  }
844
  if (pChild1 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) {
×
845
    code = terrno;
×
846
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
×
847
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
848
    *ppRes = NULL;
×
849
    return code;
×
850
  }
851
  
852
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
×
853
  (*ppRes)->value = NULL;
×
854
  (*ppRes)->reUse = false;
×
855

856
  return TSDB_CODE_SUCCESS;
×
857
}
858

859
static int32_t buildBatchTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
11,476✔
860
  int32_t code = TSDB_CODE_SUCCESS;
11,476✔
861
  int32_t vgNum = tSimpleHashGetSize(pVg);
11,476✔
862
  if (vgNum <= 0 || vgNum > 1) {
11,476✔
863
    qError("Invalid vgroup num %d to build table scan operator param", vgNum);
×
864
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
865
  }
866

867
  int32_t iter = 0;
11,476✔
868
  void* p = NULL;
11,476✔
869
  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
22,952✔
870
    SArray* pUidList = *(SArray**)p;
11,476✔
871

872
    code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false);
11,476✔
873
    if (code) {
11,476✔
874
      return code;
×
875
    }
876
    taosArrayDestroy(pUidList);
11,476✔
877
    *(SArray**)p = NULL;
11,476✔
878
  }
879
  
880
  return TSDB_CODE_SUCCESS;
11,476✔
881
}
882

883
static int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) {
×
884
  SArray* pUidList = taosArrayInit(1, sizeof(int64_t));
×
885
  if (NULL == pUidList) {
×
886
    return terrno;
×
887
  }
888
  if (NULL == taosArrayPush(pUidList, pUid)) {
×
889
    return terrno;
×
890
  }
891

892
  int32_t code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, true);
×
893
  taosArrayDestroy(pUidList);
×
894
  if (code) {
×
895
    return code;
×
896
  }
897
  
898
  return TSDB_CODE_SUCCESS;
×
899
}
900

901
static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, SOperatorParam** ppParam) {
3,621,306✔
902
  int64_t                     rowIdx = pPrev->pListHead->readIdx;
3,621,306✔
903
  SOperatorParam*             pSrcParam0 = NULL;
3,621,306✔
904
  SOperatorParam*             pSrcParam1 = NULL;
3,621,306✔
905
  SOperatorParam*             pGcParam0 = NULL;
3,621,306✔
906
  SOperatorParam*             pGcParam1 = NULL;  
3,621,306✔
907
  int32_t*                    leftVg = pPrev->pListHead->pLeftVg + rowIdx;
3,621,306✔
908
  int64_t*                    leftUid = pPrev->pListHead->pLeftUid + rowIdx;
3,621,306✔
909
  int32_t*                    rightVg = pPrev->pListHead->pRightVg + rowIdx;
3,621,306✔
910
  int64_t*                    rightUid = pPrev->pListHead->pRightUid + rowIdx;
3,621,306✔
911
  int32_t                     code = TSDB_CODE_SUCCESS;
3,621,306✔
912

913
  qDebug("start %" PRId64 ":%" PRId64 "th stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64, 
3,621,306✔
914
      rowIdx, pPrev->tableNum, *leftVg, *leftUid, *rightVg, *rightUid);
915

916
  QRY_ERR_RET(updatePostJoinCurrTableInfo(&pInfo->stbJoin));
3,621,306✔
917
  
918
  if (pInfo->stbJoin.basic.batchFetch) {
3,621,306✔
919
    if (pPrev->leftHash) {
3,619,104✔
920
      code = pInfo->stbJoin.basic.srcScan[0] ? buildBatchTableScanOperatorParam(&pSrcParam0, 0, pPrev->leftHash) : buildBatchExchangeOperatorParam(&pSrcParam0, 0, pPrev->leftHash);
70,578✔
921
      if (TSDB_CODE_SUCCESS == code) {
70,578✔
922
        code = pInfo->stbJoin.basic.srcScan[1] ? buildBatchTableScanOperatorParam(&pSrcParam1, 1, pPrev->rightHash) : buildBatchExchangeOperatorParam(&pSrcParam1, 1, pPrev->rightHash);
70,578✔
923
      }
924
      if (TSDB_CODE_SUCCESS == code) {
70,578✔
925
        tSimpleHashCleanup(pPrev->leftHash);
70,578✔
926
        tSimpleHashCleanup(pPrev->rightHash);
70,578✔
927
        pPrev->leftHash = NULL;
70,578✔
928
        pPrev->rightHash = NULL;
70,578✔
929
      }
930
    }
931
  } else {
932
    code = pInfo->stbJoin.basic.srcScan[0] ? buildSingleTableScanOperatorParam(&pSrcParam0, 0, leftVg, leftUid) : buildExchangeOperatorParam(&pSrcParam0, 0, leftVg, leftUid);
2,202✔
933
    if (TSDB_CODE_SUCCESS == code) {
2,202✔
934
      code = pInfo->stbJoin.basic.srcScan[1] ? buildSingleTableScanOperatorParam(&pSrcParam1, 1, rightVg, rightUid) : buildExchangeOperatorParam(&pSrcParam1, 1, rightVg, rightUid);
2,202✔
935
    }
936
  }
937

938
  bool initParam = pSrcParam0 ? true : false;
3,621,306✔
939
  if (TSDB_CODE_SUCCESS == code) {
3,621,306✔
940
    code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pPost->leftNeedCache, pSrcParam0);
3,621,306✔
941
    pSrcParam0 = NULL;
3,621,306✔
942
  }
943
  if (TSDB_CODE_SUCCESS == code) {
3,621,306✔
944
    code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pPost->rightNeedCache, pSrcParam1);
3,621,306✔
945
    pSrcParam1 = NULL;
3,621,306✔
946
  }
947
  if (TSDB_CODE_SUCCESS == code) {
3,621,306✔
948
    code = buildMergeJoinOperatorParam(ppParam, initParam, &pGcParam0, &pGcParam1);
3,621,306✔
949
  }
950
  if (TSDB_CODE_SUCCESS != code) {
3,621,306✔
951
    if (pSrcParam0) {
×
952
      freeOperatorParam(pSrcParam0, OP_GET_PARAM);
×
953
    }
954
    if (pSrcParam1) {
×
955
      freeOperatorParam(pSrcParam1, OP_GET_PARAM);
×
956
    }
957
    if (pGcParam0) {
×
958
      freeOperatorParam(pGcParam0, OP_GET_PARAM);
×
959
    }
960
    if (pGcParam1) {
×
961
      freeOperatorParam(pGcParam1, OP_GET_PARAM);
×
962
    }
963
    if (*ppParam) {
×
964
      freeOperatorParam(*ppParam, OP_GET_PARAM);
×
965
      *ppParam = NULL;
×
966
    }
967
  }
968
  
969
  return code;
3,621,306✔
970
}
971

972
static int32_t buildVtbScanOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, uint64_t uid) {
3,619,723✔
973
  int32_t                   code = TSDB_CODE_SUCCESS;
3,619,723✔
974
  int32_t                   lino = 0;
3,619,723✔
975
  SVTableScanOperatorParam* pVScan = NULL;
3,619,723✔
976
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
3,619,723✔
977
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
3,619,723✔
978

979
  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
3,619,723✔
980
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)
3,619,723✔
981

982
  pVScan = taosMemoryMalloc(sizeof(SVTableScanOperatorParam));
3,619,723✔
983
  QUERY_CHECK_NULL(pVScan, code, lino, _return, terrno)
3,619,723✔
984
  pVScan->pOpParamArray = taosArrayInit(1, POINTER_BYTES);
3,619,723✔
985
  QUERY_CHECK_NULL(pVScan->pOpParamArray, code, lino, _return, terrno)
3,619,723✔
986
  pVScan->uid = uid;
3,619,723✔
987
  pVScan->window = pInfo->vtbScan.window;
3,619,723✔
988

989
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN;
3,619,723✔
990
  (*ppRes)->downstreamIdx = 0;
3,619,723✔
991
  (*ppRes)->value = pVScan;
3,619,723✔
992
  (*ppRes)->reUse = false;
3,619,723✔
993

994
  return TSDB_CODE_SUCCESS;
3,619,723✔
995
_return:
×
996
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
997
  if (pVScan) {
×
998
    taosArrayDestroy(pVScan->pOpParamArray);
×
999
    taosMemoryFreeClear(pVScan);
×
1000
  }
1001
  if (*ppRes) {
×
1002
    taosArrayDestroy((*ppRes)->pChildren);
×
1003
    taosMemoryFreeClear(*ppRes);
×
1004
  }
1005
  return code;
×
1006
}
1007

1008
static int32_t buildExternalWindowOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins, int32_t idx) {
699,324✔
1009
  int32_t                       code = TSDB_CODE_SUCCESS;
699,324✔
1010
  int32_t                       lino = 0;
699,324✔
1011
  SExternalWindowOperatorParam* pExtWinOp = NULL;
699,324✔
1012

1013
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
699,324✔
1014
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
699,324✔
1015

1016
  pExtWinOp = taosMemoryMalloc(sizeof(SExternalWindowOperatorParam));
699,324✔
1017
  QUERY_CHECK_NULL(pExtWinOp, code, lino, _return, terrno)
699,324✔
1018

1019
  pExtWinOp->ExtWins = taosArrayDup(pWins, NULL);
699,324✔
1020
  QUERY_CHECK_NULL(pExtWinOp->ExtWins, code, lino, _return, terrno)
699,324✔
1021

1022
  SExtWinTimeWindow *firstWin = (SExtWinTimeWindow *)taosArrayGet(pWins, 0);
699,324✔
1023
  SExtWinTimeWindow *lastWin = (SExtWinTimeWindow *)taosArrayGet(pWins, taosArrayGetSize(pWins) - 1);
699,324✔
1024

1025
  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
699,324✔
1026
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)
699,324✔
1027

1028
  SOperatorParam* pExchangeOperator = NULL;
699,324✔
1029
  STimeWindow     twin = {.skey = firstWin->tw.skey, .ekey = lastWin->tw.ekey};
699,324✔
1030
  code = buildExchangeOperatorParamForExternalWindow(&pExchangeOperator, 0, twin);
699,324✔
1031
  QUERY_CHECK_CODE(code, lino, _return);
699,324✔
1032
  QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pExchangeOperator), code, lino, _return, terrno)
1,398,648✔
1033

1034
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW;
699,324✔
1035
  (*ppRes)->downstreamIdx = idx;
699,324✔
1036
  (*ppRes)->value = pExtWinOp;
699,324✔
1037
  (*ppRes)->reUse = false;
699,324✔
1038

1039
  return code;
699,324✔
1040
_return:
×
1041
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1042
  if (pExtWinOp) {
×
1043
    if (pExtWinOp->ExtWins) {
×
1044
      taosArrayDestroy(pExtWinOp->ExtWins);
×
1045
    }
1046
    taosMemoryFree(pExtWinOp);
×
1047
  }
1048
  if (*ppRes) {
×
1049
    if ((*ppRes)->pChildren) {
×
1050
      taosArrayDestroy((*ppRes)->pChildren);
×
1051
    }
1052
    taosMemoryFree(*ppRes);
×
1053
    *ppRes = NULL;
×
1054
  }
1055
  return code;
×
1056
}
1057

1058
static int32_t buildMergeOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins,
290,808✔
1059
                                       int32_t numOfDownstream, int32_t numOfWins) {
1060
  int32_t                   code = TSDB_CODE_SUCCESS;
290,808✔
1061
  int32_t                   lino = 0;
290,808✔
1062
  SMergeOperatorParam*      pMergeOp = NULL;
290,808✔
1063

1064
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
290,808✔
1065
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
290,808✔
1066

1067
  (*ppRes)->pChildren = taosArrayInit(numOfDownstream, POINTER_BYTES);
290,808✔
1068
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)
290,808✔
1069

1070
  pMergeOp = taosMemoryMalloc(sizeof(SMergeOperatorParam));
290,808✔
1071
  QUERY_CHECK_NULL(pMergeOp, code, lino, _return, terrno)
290,808✔
1072

1073
  pMergeOp->winNum = numOfWins;
290,808✔
1074

1075
  for (int32_t i = 0; i < numOfDownstream; i++) {
990,132✔
1076
    SOperatorParam* pExternalWinParam = NULL;
699,324✔
1077
    code = buildExternalWindowOperatorParam(pInfo, &pExternalWinParam, pWins, i);
699,324✔
1078
    QUERY_CHECK_CODE(code, lino, _return);
699,324✔
1079
    QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pExternalWinParam), code, lino, _return, terrno)
1,398,648✔
1080
  }
1081

1082
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE;
290,808✔
1083
  (*ppRes)->downstreamIdx = 0;
290,808✔
1084
  (*ppRes)->value = pMergeOp;
290,808✔
1085
  (*ppRes)->reUse = false;
290,808✔
1086

1087
  return TSDB_CODE_SUCCESS;
290,808✔
1088
_return:
×
1089
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1090
  if (pMergeOp) {
×
1091
    taosMemoryFree(pMergeOp);
×
1092
  }
1093
  if (*ppRes) {
×
1094
    if ((*ppRes)->pChildren) {
×
1095
      for (int32_t i = 0; i < taosArrayGetSize((*ppRes)->pChildren); i++) {
×
1096
        SOperatorParam* pChildParam = (SOperatorParam*)taosArrayGet((*ppRes)->pChildren, i);
×
1097
        if (pChildParam) {
×
1098
          SExternalWindowOperatorParam* pExtWinOp = (SExternalWindowOperatorParam*)pChildParam->value;
×
1099
          if (pExtWinOp) {
×
1100
            if (pExtWinOp->ExtWins) {
×
1101
              taosArrayDestroy(pExtWinOp->ExtWins);
×
1102
            }
1103
            taosMemoryFree(pExtWinOp);
×
1104
          }
1105
          taosMemoryFree(pChildParam);
×
1106
        }
1107
      }
1108
      taosArrayDestroy((*ppRes)->pChildren);
×
1109
    }
1110
    taosMemoryFree(*ppRes);
×
1111
    *ppRes = NULL;
×
1112
  }
1113
  return code;
×
1114
}
1115

1116
static int32_t buildAggOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes) {
12,540✔
1117
  int32_t                   code = TSDB_CODE_SUCCESS;
12,540✔
1118
  int32_t                   lino = 0;
12,540✔
1119
  SOperatorParam*           pParam = NULL;
12,540✔
1120
  SOperatorParam*           pExchangeParam = NULL;
12,540✔
1121
  SVtbScanDynCtrlInfo*      pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
12,540✔
1122
  bool                      freeExchange = false;
12,540✔
1123

1124
  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
12,540✔
1125
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)
12,540✔
1126

1127
  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
12,540✔
1128
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno)
12,540✔
1129

1130
  pParam->value = taosMemoryMalloc(sizeof(SAggOperatorParam));
12,540✔
1131
  QUERY_CHECK_NULL(pParam->value, code, lino, _return, terrno)
12,540✔
1132

1133
  code = buildBatchExchangeOperatorParamForVirtual(&pExchangeParam, 0, NULL, 0, pVtbScan->otbVgIdToOtbInfoArrayMap, (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN}, EX_SRC_TYPE_VSTB_AGG_SCAN);
12,540✔
1134
  QUERY_CHECK_CODE(code, lino, _return);
12,540✔
1135

1136
  freeExchange = true;
12,540✔
1137

1138
  QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pExchangeParam), code, lino, _return, terrno)
25,080✔
1139

1140
  freeExchange = false;
12,540✔
1141

1142
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
12,540✔
1143
  pParam->downstreamIdx = 0;
12,540✔
1144
  pParam->reUse = false;
12,540✔
1145

1146
  *ppRes = pParam;
12,540✔
1147

1148
  return code;
12,540✔
1149
_return:
×
1150
  if (freeExchange) {
×
1151
    freeOperatorParam(pExchangeParam, OP_GET_PARAM);
×
1152
  }
1153
  if (pParam) {
×
1154
    freeOperatorParam(pParam, OP_GET_PARAM);
×
1155
  }
1156
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1157
  return code;
×
1158
}
1159

1160
static int32_t buildAggOperatorParamWithGroupId(SDynQueryCtrlOperatorInfo* pInfo, uint64_t groupid, SOperatorParam** ppRes) {
22,960✔
1161
  int32_t                   code = TSDB_CODE_SUCCESS;
22,960✔
1162
  int32_t                   lino = 0;
22,960✔
1163
  SVtbScanDynCtrlInfo*      pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
22,960✔
1164
  SOperatorParam*           pParam = NULL;
22,960✔
1165
  SOperatorParam*           pExchangeParam = NULL;
22,960✔
1166
  SHashObj*                 otbVgIdToOtbInfoArrayMap = NULL;
22,960✔
1167
  bool                      freeExchange = false;
22,960✔
1168
  void*                     pIter = taosHashGet(pVtbScan->vtbGroupIdToVgIdMapMap, &groupid, sizeof(groupid));
22,960✔
1169

1170
  if (!pIter) {
22,960✔
UNCOV
1171
    *ppRes = NULL;
×
UNCOV
1172
    return code;
×
1173
  }
1174

1175
  otbVgIdToOtbInfoArrayMap = *(SHashObj**)pIter;
22,960✔
1176

1177
  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
22,960✔
1178
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)
22,960✔
1179

1180
  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
22,960✔
1181
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno)
22,960✔
1182

1183
  code = buildBatchExchangeOperatorParamForVirtual(&pExchangeParam, 0, NULL, groupid, otbVgIdToOtbInfoArrayMap, (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN}, EX_SRC_TYPE_VSTB_AGG_SCAN);
22,960✔
1184
  QUERY_CHECK_CODE(code, lino, _return);
22,960✔
1185

1186
  freeExchange = true;
22,960✔
1187

1188
  QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pExchangeParam), code, lino, _return, terrno)
45,920✔
1189

1190
  freeExchange = false;
22,960✔
1191

1192
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
22,960✔
1193
  pParam->downstreamIdx = 0;
22,960✔
1194
  pParam->value = NULL;
22,960✔
1195
  pParam->reUse = false;
22,960✔
1196

1197
  *ppRes = pParam;
22,960✔
1198

1199
  return code;
22,960✔
1200
_return:
×
1201
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1202
  if (freeExchange) {
×
1203
    freeOperatorParam(pExchangeParam, OP_GET_PARAM);
×
1204
  }
1205
  if (pParam) {
×
1206
    freeOperatorParam(pParam, OP_GET_PARAM);
×
1207
  }
1208
  return code;
×
1209
}
1210

1211
static int32_t buildAggOperatorParamForSingleChild(SDynQueryCtrlOperatorInfo* pInfo, tb_uid_t uid, uint64_t groupid, SArray* pTagList, SOperatorParam** ppRes) {
18,240✔
1212
  int32_t                   code = TSDB_CODE_SUCCESS;
18,240✔
1213
  int32_t                   lino = 0;
18,240✔
1214
  SVtbScanDynCtrlInfo*      pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
18,240✔
1215
  SOperatorParam*           pParam = NULL;
18,240✔
1216
  SHashObj*                 pOtbVgIdToOtbInfoArrayMap = NULL;
18,240✔
1217
  void*                     pIter = taosHashGet(pVtbScan->vtbUidToVgIdMapMap, &uid, sizeof(uid));
18,240✔
1218

1219
  if (pIter) {
18,240✔
1220
    pOtbVgIdToOtbInfoArrayMap = *(SHashObj**)taosHashGet(pVtbScan->vtbUidToVgIdMapMap, &uid, sizeof(uid));
18,240✔
1221

1222
    code = buildBatchExchangeOperatorParamForVirtual(&pParam, 0, pTagList, groupid, pOtbVgIdToOtbInfoArrayMap, (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN}, EX_SRC_TYPE_VSTB_AGG_SCAN);
18,240✔
1223
    QUERY_CHECK_CODE(code, lino, _return);
18,240✔
1224

1225
    *ppRes = pParam;
18,240✔
1226
  } else {
UNCOV
1227
    *ppRes = NULL;
×
1228
  }
1229

1230
  return code;
18,240✔
1231
_return:
×
1232
  if (pParam) {
×
1233
    freeOperatorParam(pParam, OP_GET_PARAM);
×
1234
  }
1235
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1236
  return code;
×
1237
}
1238

1239
static void seqJoinLaunchNewRetrieveImpl(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
3,621,306✔
1240
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
3,621,306✔
1241
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
3,621,306✔
1242
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
3,621,306✔
1243
  SStbJoinPostJoinCtx*       pPost = &pStbJoin->ctx.post;
3,621,306✔
1244
  SOperatorParam*            pParam = NULL;
3,621,306✔
1245
  int32_t                    code  = buildSeqStbJoinOperatorParam(pInfo, pPrev, pPost, &pParam);
3,621,306✔
1246
  if (TSDB_CODE_SUCCESS != code) {
3,621,306✔
1247
    pOperator->pTaskInfo->code = code;
×
1248
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
×
1249
  }
1250

1251
  qDebug("%s dynamic post task begin", GET_TASKID(pOperator->pTaskInfo));
3,621,306✔
1252
  code = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam, ppRes);
3,621,306✔
1253
  if (*ppRes && (code == 0)) {
3,621,306✔
1254
    code = blockDataCheck(*ppRes);
209,863✔
1255
    if (code) {
209,863✔
1256
      qError("Invalid block data, blockDataCheck failed, error:%s", tstrerror(code));
×
1257
      pOperator->pTaskInfo->code = code;
×
1258
      T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
×
1259
    }
1260
    pPost->isStarted = true;
209,863✔
1261
    pStbJoin->execInfo.postBlkNum++;
209,863✔
1262
    pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows;
209,863✔
1263
    qDebug("%s join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
209,863✔
1264
  } else {
1265
    qDebug("%s Empty join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
3,411,443✔
1266
  }
1267
}
3,621,306✔
1268

1269

1270
static int32_t notifySeqJoinTableCacheEnd(SOperatorInfo* pOperator, SStbJoinPostJoinCtx* pPost, bool leftTable) {
×
1271
  SOperatorParam* pGcParam = NULL;
×
1272
  SOperatorParam* pMergeJoinParam = NULL;
×
1273
  int32_t         downstreamId = leftTable ? 0 : 1;
×
1274
  int32_t         vgId = leftTable ? pPost->leftVgId : pPost->rightVgId;
×
1275
  int64_t         uid = leftTable ? pPost->leftCurrUid : pPost->rightCurrUid;
×
1276

1277
  qDebug("notify table %" PRIu64 " in vgId %d downstreamId %d cache end", uid, vgId, downstreamId);
×
1278

1279
  int32_t code = buildGroupCacheNotifyOperatorParam(&pGcParam, downstreamId, vgId, uid);
×
1280
  if (TSDB_CODE_SUCCESS != code) {
×
1281
    return code;
×
1282
  }
1283
  code = buildMergeJoinNotifyOperatorParam(&pMergeJoinParam, pGcParam, NULL);
×
1284
  if (TSDB_CODE_SUCCESS != code) {
×
1285
    return code;
×
1286
  }
1287

1288
  return optrDefaultNotifyFn(pOperator->pDownstream[1], pMergeJoinParam);
×
1289
}
1290

1291
static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCtrlInfo*          pStbJoin) {
3,620,813✔
1292
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
3,620,813✔
1293
  int32_t code = 0;
3,620,813✔
1294
  
1295
  pPost->isStarted = false;
3,620,813✔
1296
  
1297
  if (pStbJoin->basic.batchFetch) {
3,620,813✔
1298
    return TSDB_CODE_SUCCESS;
3,618,611✔
1299
  }
1300
  
1301
  if (pPost->leftNeedCache) {
2,202✔
1302
    uint32_t* num = tSimpleHashGet(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
×
1303
    if (num && --(*num) <= 0) {
×
1304
      code = tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
×
1305
      if (code) {
×
1306
        qError("tSimpleHashRemove leftCurrUid %" PRId64 " from leftCache failed, error:%s", pPost->leftCurrUid, tstrerror(code));
×
1307
        QRY_ERR_RET(code);
×
1308
      }
1309
      QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, true));
×
1310
    }
1311
  }
1312
  
1313
  if (!pPost->rightNeedCache) {
2,202✔
1314
    void* v = tSimpleHashGet(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
2,202✔
1315
    if (NULL != v) {
2,202✔
1316
      code = tSimpleHashRemove(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
×
1317
      if (code) {
×
1318
        qError("tSimpleHashRemove rightCurrUid %" PRId64 " from rightCache failed, error:%s", pPost->rightCurrUid, tstrerror(code));
×
1319
        QRY_ERR_RET(code);
×
1320
      }
1321
      QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, false));
×
1322
    }
1323
  }
1324

1325
  return TSDB_CODE_SUCCESS;
2,202✔
1326
}
1327

1328

1329
static FORCE_INLINE int32_t seqJoinContinueCurrRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
1330
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
281,049✔
1331
  SStbJoinPostJoinCtx*       pPost = &pInfo->stbJoin.ctx.post;
281,049✔
1332
  SStbJoinPrevJoinCtx*       pPrev = &pInfo->stbJoin.ctx.prev;
281,049✔
1333

1334
  if (!pPost->isStarted) {
281,049✔
1335
    return TSDB_CODE_SUCCESS;
71,679✔
1336
  }
1337
  
1338
  qDebug("%s dynQueryCtrl continue to retrieve block from post op", GET_TASKID(pOperator->pTaskInfo));
209,370✔
1339
  
1340
  *ppRes = getNextBlockFromDownstream(pOperator, 1);
209,370✔
1341
  if (NULL == *ppRes) {
209,370✔
1342
    QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, &pInfo->stbJoin));
209,370✔
1343
    pPrev->pListHead->readIdx++;
209,370✔
1344
  } else {
1345
    pInfo->stbJoin.execInfo.postBlkNum++;
×
1346
    pInfo->stbJoin.execInfo.postBlkRows += (*ppRes)->info.rows;
×
1347
  }
1348

1349
  return TSDB_CODE_SUCCESS;
209,370✔
1350
}
1351

1352
static FORCE_INLINE int32_t addToJoinVgroupHash(SSHashObj* pHash, void* pKey, int32_t keySize, void* pVal, int32_t valSize) {
1353
  SArray** ppArray = tSimpleHashGet(pHash, pKey, keySize);
7,242,152✔
1354
  if (NULL == ppArray) {
7,242,152✔
1355
    SArray* pArray = taosArrayInit(10, valSize);
241,791✔
1356
    if (NULL == pArray) {
241,791✔
1357
      return terrno;
×
1358
    }
1359
    if (NULL == taosArrayPush(pArray, pVal)) {
483,582✔
1360
      taosArrayDestroy(pArray);
×
1361
      return terrno;
×
1362
    }
1363
    if (tSimpleHashPut(pHash, pKey, keySize, &pArray, POINTER_BYTES)) {
241,791✔
1364
      taosArrayDestroy(pArray);      
×
1365
      return terrno;
×
1366
    }
1367
    return TSDB_CODE_SUCCESS;
241,791✔
1368
  }
1369

1370
  if (NULL == taosArrayPush(*ppArray, pVal)) {
14,000,722✔
1371
    return terrno;
×
1372
  }
1373
  
1374
  return TSDB_CODE_SUCCESS;
7,000,361✔
1375
}
1376

1377
static FORCE_INLINE int32_t addToJoinTableHash(SSHashObj* pHash, SSHashObj* pOnceHash, void* pKey, int32_t keySize) {
1378
  int32_t code = TSDB_CODE_SUCCESS;
2,202✔
1379
  uint32_t* pNum = tSimpleHashGet(pHash, pKey, keySize);
2,202✔
1380
  if (NULL == pNum) {
2,202✔
1381
    uint32_t n = 1;
2,202✔
1382
    code = tSimpleHashPut(pHash, pKey, keySize, &n, sizeof(n));
2,202✔
1383
    if (code) {
2,202✔
1384
      return code;
×
1385
    }
1386
    code = tSimpleHashPut(pOnceHash, pKey, keySize, NULL, 0);
2,202✔
1387
    if (code) {
2,202✔
1388
      return code;
×
1389
    }
1390
    return TSDB_CODE_SUCCESS;
2,202✔
1391
  }
1392

1393
  switch (*pNum) {
×
1394
    case 0:
×
1395
      break;
×
1396
    case UINT32_MAX:
×
1397
      *pNum = 0;
×
1398
      break;
×
1399
    default:
×
1400
      if (1 == (*pNum)) {
×
1401
        code = tSimpleHashRemove(pOnceHash, pKey, keySize);
×
1402
        if (code) {
×
1403
          qError("tSimpleHashRemove failed in addToJoinTableHash, error:%s", tstrerror(code));
×
1404
          QRY_ERR_RET(code);
×
1405
        }
1406
      }
1407
      (*pNum)++;
×
1408
      break;
×
1409
  }
1410
  
1411
  return TSDB_CODE_SUCCESS;
×
1412
}
1413

1414

1415
static void freeStbJoinTableList(SStbJoinTableList* pList) {
71,186✔
1416
  if (NULL == pList) {
71,186✔
1417
    return;
×
1418
  }
1419
  taosMemoryFree(pList->pLeftVg);
71,186✔
1420
  taosMemoryFree(pList->pLeftUid);
71,186✔
1421
  taosMemoryFree(pList->pRightVg);
71,186✔
1422
  taosMemoryFree(pList->pRightUid);
71,186✔
1423
  taosMemoryFree(pList);
71,186✔
1424
}
1425

1426
static int32_t appendStbJoinTableList(SStbJoinPrevJoinCtx* pCtx, int64_t rows, int32_t* pLeftVg, int64_t* pLeftUid, int32_t* pRightVg, int64_t* pRightUid) {
71,679✔
1427
  int32_t code = TSDB_CODE_SUCCESS;
71,679✔
1428
  SStbJoinTableList* pNew = taosMemoryCalloc(1, sizeof(SStbJoinTableList));
71,679✔
1429
  if (NULL == pNew) {
71,679✔
1430
    return terrno;
×
1431
  }
1432
  pNew->pLeftVg = taosMemoryMalloc(rows * sizeof(*pLeftVg));
71,679✔
1433
  if (NULL == pNew->pLeftVg) {
71,679✔
1434
    code = terrno;
×
1435
    freeStbJoinTableList(pNew);
×
1436
    return code;
×
1437
  }
1438
  pNew->pLeftUid = taosMemoryMalloc(rows * sizeof(*pLeftUid));
71,679✔
1439
  if (NULL == pNew->pLeftUid) {
71,679✔
1440
    code = terrno;
×
1441
    freeStbJoinTableList(pNew);
×
1442
    return code;
×
1443
  }
1444
  pNew->pRightVg = taosMemoryMalloc(rows * sizeof(*pRightVg));
71,679✔
1445
  if (NULL == pNew->pRightVg) {
71,679✔
1446
    code = terrno;
×
1447
    freeStbJoinTableList(pNew);
×
1448
    return code;
×
1449
  }
1450
  pNew->pRightUid = taosMemoryMalloc(rows * sizeof(*pRightUid));
71,679✔
1451
  if (NULL == pNew->pRightUid) {
71,679✔
1452
    code = terrno;
×
1453
    freeStbJoinTableList(pNew);
×
1454
    return code;
×
1455
  }
1456

1457
  TAOS_MEMCPY(pNew->pLeftVg, pLeftVg, rows * sizeof(*pLeftVg));
71,679✔
1458
  TAOS_MEMCPY(pNew->pLeftUid, pLeftUid, rows * sizeof(*pLeftUid));
71,679✔
1459
  TAOS_MEMCPY(pNew->pRightVg, pRightVg, rows * sizeof(*pRightVg));
71,679✔
1460
  TAOS_MEMCPY(pNew->pRightUid, pRightUid, rows * sizeof(*pRightUid));
71,679✔
1461

1462
  pNew->readIdx = 0;
71,679✔
1463
  pNew->uidNum = rows;
71,679✔
1464
  pNew->pNext = NULL;
71,679✔
1465
  
1466
  if (pCtx->pListTail) {
71,679✔
1467
    pCtx->pListTail->pNext = pNew;
×
1468
    pCtx->pListTail = pNew;
×
1469
  } else {
1470
    pCtx->pListHead = pNew;
71,679✔
1471
    pCtx->pListTail= pNew;
71,679✔
1472
  }
1473

1474
  return TSDB_CODE_SUCCESS;
71,679✔
1475
}
1476

1477
static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
71,679✔
1478
  int32_t                    code = TSDB_CODE_SUCCESS;
71,679✔
1479
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
71,679✔
1480
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
71,679✔
1481
  SColumnInfoData*           pVg0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[0]);
71,679✔
1482
  if (NULL == pVg0) {
71,679✔
1483
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1484
  }
1485
  SColumnInfoData*           pVg1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[1]);
71,679✔
1486
  if (NULL == pVg1) {
71,679✔
1487
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1488
  }
1489
  SColumnInfoData*           pUid0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[0]);
71,679✔
1490
  if (NULL == pUid0) {
71,679✔
1491
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1492
  }
1493
  SColumnInfoData*           pUid1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[1]);
71,679✔
1494
  if (NULL == pUid1) {
71,679✔
1495
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1496
  }
1497

1498
  if (pStbJoin->basic.batchFetch) {
71,679✔
1499
    for (int32_t i = 0; i < pBlock->info.rows; ++i) {
3,691,654✔
1500
      int32_t* leftVg = (int32_t*)(pVg0->pData + pVg0->info.bytes * i);
3,621,076✔
1501
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i);
3,621,076✔
1502
      int32_t* rightVg = (int32_t*)(pVg1->pData + pVg1->info.bytes * i);
3,621,076✔
1503
      int64_t* rightUid = (int64_t*)(pUid1->pData + pUid1->info.bytes * i);
3,621,076✔
1504

1505
      code = addToJoinVgroupHash(pStbJoin->ctx.prev.leftHash, leftVg, sizeof(*leftVg), leftUid, sizeof(*leftUid));
3,621,076✔
1506
      if (TSDB_CODE_SUCCESS != code) {
3,621,076✔
1507
        break;
×
1508
      }
1509
      code = addToJoinVgroupHash(pStbJoin->ctx.prev.rightHash, rightVg, sizeof(*rightVg), rightUid, sizeof(*rightUid));
3,621,076✔
1510
      if (TSDB_CODE_SUCCESS != code) {
3,621,076✔
1511
        break;
×
1512
      }
1513
    }
1514
  } else {
1515
    for (int32_t i = 0; i < pBlock->info.rows; ++i) {
3,303✔
1516
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i);
2,202✔
1517
    
1518
      code = addToJoinTableHash(pStbJoin->ctx.prev.leftCache, pStbJoin->ctx.prev.onceTable, leftUid, sizeof(*leftUid));
2,202✔
1519
      if (TSDB_CODE_SUCCESS != code) {
2,202✔
1520
        break;
×
1521
      }
1522
    }
1523
  }
1524

1525
  if (TSDB_CODE_SUCCESS == code) {
71,679✔
1526
    code = appendStbJoinTableList(&pStbJoin->ctx.prev, pBlock->info.rows, (int32_t*)pVg0->pData, (int64_t*)pUid0->pData, (int32_t*)pVg1->pData, (int64_t*)pUid1->pData);
71,679✔
1527
    if (TSDB_CODE_SUCCESS == code) {
71,679✔
1528
      pStbJoin->ctx.prev.tableNum += pBlock->info.rows;
71,679✔
1529
    }
1530
  }
1531

1532
_return:
×
1533

1534
  if (TSDB_CODE_SUCCESS != code) {
71,679✔
1535
    pOperator->pTaskInfo->code = code;
×
1536
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
×
1537
  }
1538
}
71,679✔
1539

1540

1541
static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) {
1,140,531✔
1542
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
1,140,531✔
1543
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
1,140,531✔
1544

1545
  if (pStbJoin->basic.batchFetch) {
1,140,531✔
1546
    return;
1,139,430✔
1547
  }
1548

1549
  if (tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache) == tSimpleHashGetSize(pStbJoin->ctx.prev.onceTable)) {
1,101✔
1550
    tSimpleHashClear(pStbJoin->ctx.prev.leftCache);
1,101✔
1551
    return;
1,101✔
1552
  }
1553

1554
  uint64_t* pUid = NULL;
×
1555
  int32_t iter = 0;
×
1556
  int32_t code = 0;
×
1557
  while (NULL != (pUid = tSimpleHashIterate(pStbJoin->ctx.prev.onceTable, pUid, &iter))) {
×
1558
    code = tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, pUid, sizeof(*pUid));
×
1559
    if (code) {
×
1560
      qError("tSimpleHashRemove failed in postProcessStbJoinTableHash, error:%s", tstrerror(code));
×
1561
    }
1562
  }
1563

1564
  pStbJoin->execInfo.leftCacheNum = tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache);
×
1565
  qDebug("more than 1 ref build table num %" PRId64, (int64_t)tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache));
×
1566

1567
/*
1568
  // debug only
1569
  iter = 0;
1570
  uint32_t* num = NULL;
1571
  while (NULL != (num = tSimpleHashIterate(pStbJoin->ctx.prev.leftCache, num, &iter))) {
1572
    A S S E R T(*num > 1);
1573
  }
1574
*/  
1575
}
1576

1577
static void buildStbJoinTableList(SOperatorInfo* pOperator) {
1,140,531✔
1578
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
1,140,531✔
1579
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
1,140,531✔
1580

1581
  while (true) {
71,679✔
1582
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
1,212,210✔
1583
    if (NULL == pBlock) {
1,212,210✔
1584
      break;
1,140,531✔
1585
    }
1586

1587
    pStbJoin->execInfo.prevBlkNum++;
71,679✔
1588
    pStbJoin->execInfo.prevBlkRows += pBlock->info.rows;
71,679✔
1589
    
1590
    doBuildStbJoinTableHash(pOperator, pBlock);
71,679✔
1591
  }
1592

1593
  postProcessStbJoinTableHash(pOperator);
1,140,531✔
1594

1595
  pStbJoin->ctx.prev.joinBuild = true;
1,140,531✔
1596
}
1,140,531✔
1597

1598
static int32_t seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
281,049✔
1599
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
281,049✔
1600
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
281,049✔
1601
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
281,049✔
1602
  SStbJoinTableList*         pNode = pPrev->pListHead;
281,049✔
1603

1604
  while (pNode) {
3,763,678✔
1605
    if (pNode->readIdx >= pNode->uidNum) {
3,692,492✔
1606
      pPrev->pListHead = pNode->pNext;
71,186✔
1607
      freeStbJoinTableList(pNode);
71,186✔
1608
      pNode = pPrev->pListHead;
71,186✔
1609
      continue;
71,186✔
1610
    }
1611
    
1612
    seqJoinLaunchNewRetrieveImpl(pOperator, ppRes);
3,621,306✔
1613
    if (*ppRes) {
3,621,306✔
1614
      return TSDB_CODE_SUCCESS;
209,863✔
1615
    }
1616

1617
    QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, pStbJoin));
3,411,443✔
1618
    pPrev->pListHead->readIdx++;
3,411,443✔
1619
  }
1620

1621
  *ppRes = NULL;
71,186✔
1622
  setOperatorCompleted(pOperator);
71,186✔
1623

1624
  return TSDB_CODE_SUCCESS;
71,186✔
1625
}
1626

1627
static int32_t seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) {
1,349,901✔
1628
  if (pBlock) {
1,349,901✔
1629
    if (pStbJoin && pStbJoin->pOutputDataBlockDesc) {
209,863✔
1630
      pBlock->info.id.blockId = pStbJoin->pOutputDataBlockDesc->dataBlockId;
209,863✔
1631
      if (!pBlock->pDataBlock) return TSDB_CODE_SUCCESS;
209,863✔
1632

1633
      for (int i = (int)pBlock->pDataBlock->size; i < pStbJoin->pOutputDataBlockDesc->pSlots->length; i++) {
212,065✔
1634
        SSlotDescNode* pSlot = (SSlotDescNode*)nodesListGetNode(pStbJoin->pOutputDataBlockDesc->pSlots, i);
2,202✔
1635
        if (pSlot == NULL) {
2,202✔
1636
          qError("seqStableJoinComposeRes: pSlot is NULL, i:%d", i);
×
1637
          return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1638
        }
1639
        SColumnInfoData colInfo = createColumnInfoData(pSlot->dataType.type, pSlot->dataType.bytes, pSlot->slotId);
2,202✔
1640
        int32_t code = colInfoDataEnsureCapacity(&colInfo, pBlock->info.rows, true);
2,202✔
1641
        if (code != TSDB_CODE_SUCCESS) {
2,202✔
1642
          return code;
×
1643
        }
1644
        code = blockDataAppendColInfo(pBlock, &colInfo);
2,202✔
1645
        if (code != TSDB_CODE_SUCCESS) {
2,202✔
1646
          return code;
×
1647
        }
1648
      }
1649
    } else {
1650
      qError("seqStableJoinComposeRes: pBlock or pStbJoin is NULL");
×
1651
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1652
    }
1653
  }
1654
  return TSDB_CODE_SUCCESS;
1,349,901✔
1655
}
1656

1657
int32_t seqStableJoin(SOperatorInfo* pOperator, SSDataBlock** pRes) {
1,376,327✔
1658
  int32_t                    code = TSDB_CODE_SUCCESS;
1,376,327✔
1659
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
1,376,327✔
1660
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
1,376,327✔
1661

1662
  QRY_PARAM_CHECK(pRes);
1,376,327✔
1663
  if (pOperator->status == OP_EXEC_DONE) {
1,376,327✔
1664
    return code;
26,426✔
1665
  }
1666

1667
  int64_t st = 0;
1,349,901✔
1668
  if (pOperator->cost.openCost == 0) {
1,349,901✔
1669
    st = taosGetTimestampUs();
1,140,033✔
1670
  }
1671

1672
  if (!pStbJoin->ctx.prev.joinBuild) {
1,349,901✔
1673
    buildStbJoinTableList(pOperator);
1,140,531✔
1674
    if (pStbJoin->execInfo.prevBlkRows <= 0) {
1,140,531✔
1675
      setOperatorCompleted(pOperator);
1,068,852✔
1676
      goto _return;
1,068,852✔
1677
    }
1678
  }
1679

1680
  QRY_ERR_JRET(seqJoinContinueCurrRetrieve(pOperator, pRes));
281,049✔
1681
  if (*pRes) {
281,049✔
1682
    goto _return;
×
1683
  }
1684

1685
  QRY_ERR_JRET(seqJoinLaunchNewRetrieve(pOperator, pRes));
281,049✔
1686

1687
_return:
281,049✔
1688
  if (pOperator->cost.openCost == 0) {
1,349,901✔
1689
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
1,140,033✔
1690
  }
1691

1692
  if (code) {
1,349,901✔
1693
    qError("%s failed since %s", __func__, tstrerror(code));
×
1694
    pOperator->pTaskInfo->code = code;
×
1695
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
1696
  } else {
1697
    code = seqStableJoinComposeRes(pStbJoin, *pRes);
1,349,901✔
1698
  }
1699
  return code;
1,349,901✔
1700
}
1701

1702
int32_t dynProcessUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
1,145,954✔
1703
  int32_t                    lino = 0;
1,145,954✔
1704
  SOperatorInfo*             operator=(SOperatorInfo*) param;
1,145,954✔
1705
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)operator->info;
1,145,954✔
1706

1707
  if (TSDB_CODE_SUCCESS != code) {
1,145,954✔
1708
    operator->pTaskInfo->code = rpcCvtErrCode(code);
×
1709
    if (operator->pTaskInfo->code != code) {
×
1710
      qError("load systable rsp received, error:%s, cvted error:%s", tstrerror(code),
×
1711
             tstrerror(operator->pTaskInfo->code));
1712
    } else {
1713
      qError("load systable rsp received, error:%s", tstrerror(code));
×
1714
    }
1715
    goto _return;
×
1716
  }
1717

1718
  pScanResInfo->vtbScan.pRsp = taosMemoryMalloc(sizeof(SUseDbRsp));
1,145,954✔
1719
  QUERY_CHECK_NULL(pScanResInfo->vtbScan.pRsp, code, lino, _return, terrno)
1,145,954✔
1720

1721
  code = tDeserializeSUseDbRsp(pMsg->pData, (int32_t)pMsg->len, pScanResInfo->vtbScan.pRsp);
1,145,954✔
1722
  QUERY_CHECK_CODE(code, lino, _return);
1,145,954✔
1723

1724
  taosMemoryFreeClear(pMsg->pData);
1,145,954✔
1725

1726
  code = tsem_post(&pScanResInfo->vtbScan.ready);
1,145,954✔
1727
  QUERY_CHECK_CODE(code, lino, _return);
1,145,954✔
1728

1729
  return code;
1,145,954✔
1730
_return:
×
1731
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1732
  return code;
×
1733
}
1734

1735
static int32_t buildDbVgInfoMap(SOperatorInfo* pOperator, SMsgCb* pMsgCb, SName* name, SExecTaskInfo* pTaskInfo, SUseDbOutput* output) {
1,145,954✔
1736
  int32_t                    code = TSDB_CODE_SUCCESS;
1,145,954✔
1737
  int32_t                    lino = 0;
1,145,954✔
1738
  char*                      buf1 = NULL;
1,145,954✔
1739
  SUseDbReq*                 pReq = NULL;
1,145,954✔
1740
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)pOperator->info;
1,145,954✔
1741

1742
  pReq = taosMemoryMalloc(sizeof(SUseDbReq));
1,145,954✔
1743
  QUERY_CHECK_NULL(pReq, code, lino, _return, terrno)
1,145,954✔
1744
  code = tNameGetFullDbName(name, pReq->db);
1,145,954✔
1745
  QUERY_CHECK_CODE(code, lino, _return);
1,145,954✔
1746
  int32_t contLen = tSerializeSUseDbReq(NULL, 0, pReq);
1,145,954✔
1747
  buf1 = taosMemoryCalloc(1, contLen);
1,145,954✔
1748
  QUERY_CHECK_NULL(buf1, code, lino, _return, terrno)
1,145,954✔
1749
  int32_t tempRes = tSerializeSUseDbReq(buf1, contLen, pReq);
1,145,954✔
1750
  if (tempRes < 0) {
1,145,954✔
1751
    QUERY_CHECK_CODE(terrno, lino, _return);
×
1752
  }
1753

1754
  // send the fetch remote task result request
1755
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
1,145,954✔
1756
  QUERY_CHECK_NULL(pMsgSendInfo, code, lino, _return, terrno)
1,145,954✔
1757

1758
  pMsgSendInfo->param = pOperator;
1,145,954✔
1759
  pMsgSendInfo->msgInfo.pData = buf1;
1,145,954✔
1760
  pMsgSendInfo->msgInfo.len = contLen;
1,145,954✔
1761
  pMsgSendInfo->msgType = TDMT_MND_GET_DB_INFO;
1,145,954✔
1762
  pMsgSendInfo->fp = dynProcessUseDbRsp;
1,145,954✔
1763
  pMsgSendInfo->requestId = pTaskInfo->id.queryId;
1,145,954✔
1764

1765
  code = asyncSendMsgToServer(pMsgCb->clientRpc, &pScanResInfo->vtbScan.epSet, NULL, pMsgSendInfo);
1,145,954✔
1766
  QUERY_CHECK_CODE(code, lino, _return);
1,145,954✔
1767

1768
  code = tsem_wait(&pScanResInfo->vtbScan.ready);
1,145,954✔
1769
  QUERY_CHECK_CODE(code, lino, _return);
1,145,954✔
1770

1771
  code = queryBuildUseDbOutput(output, pScanResInfo->vtbScan.pRsp);
1,145,954✔
1772
  QUERY_CHECK_CODE(code, lino, _return);
1,145,954✔
1773

1774
_return:
1,145,954✔
1775
  if (code) {
1,145,954✔
1776
     qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1777
     taosMemoryFree(buf1);
×
1778
  }
1779
  taosMemoryFree(pReq);
1,145,954✔
1780
  tFreeSUsedbRsp(pScanResInfo->vtbScan.pRsp);
1,145,954✔
1781
  taosMemoryFreeClear(pScanResInfo->vtbScan.pRsp);
1,145,954✔
1782
  return code;
1,145,954✔
1783
}
1784

1785
int dynVgInfoComp(const void* lp, const void* rp) {
2,282,468✔
1786
  SVgroupInfo* pLeft = (SVgroupInfo*)lp;
2,282,468✔
1787
  SVgroupInfo* pRight = (SVgroupInfo*)rp;
2,282,468✔
1788
  if (pLeft->hashBegin < pRight->hashBegin) {
2,282,468✔
1789
    return -1;
2,282,468✔
1790
  } else if (pLeft->hashBegin > pRight->hashBegin) {
×
1791
    return 1;
×
1792
  }
1793

1794
  return 0;
×
1795
}
1796

1797
int32_t dynMakeVgArraySortBy(SDBVgInfo* dbInfo, __compar_fn_t sort_func) {
6,278,195✔
1798
  if (NULL == dbInfo) {
6,278,195✔
1799
    return TSDB_CODE_SUCCESS;
×
1800
  }
1801

1802
  if (dbInfo->vgHash && NULL == dbInfo->vgArray) {
6,278,195✔
1803
    int32_t vgSize = taosHashGetSize(dbInfo->vgHash);
1,145,954✔
1804
    dbInfo->vgArray = taosArrayInit(vgSize, sizeof(SVgroupInfo));
1,145,954✔
1805
    if (NULL == dbInfo->vgArray) {
1,145,954✔
1806
      return terrno;
×
1807
    }
1808

1809
    void* pIter = taosHashIterate(dbInfo->vgHash, NULL);
1,145,954✔
1810
    while (pIter) {
3,433,142✔
1811
      if (NULL == taosArrayPush(dbInfo->vgArray, pIter)) {
4,574,376✔
1812
        taosHashCancelIterate(dbInfo->vgHash, pIter);
×
1813
        return terrno;
×
1814
      }
1815

1816
      pIter = taosHashIterate(dbInfo->vgHash, pIter);
2,287,188✔
1817
    }
1818

1819
    taosArraySort(dbInfo->vgArray, sort_func);
1,145,954✔
1820
  }
1821

1822
  return TSDB_CODE_SUCCESS;
6,278,195✔
1823
}
1824

1825
int32_t dynHashValueComp(void const* lp, void const* rp) {
9,750,766✔
1826
  uint32_t*    key = (uint32_t*)lp;
9,750,766✔
1827
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
9,750,766✔
1828

1829
  if (*key < pVg->hashBegin) {
9,750,766✔
1830
    return -1;
×
1831
  } else if (*key > pVg->hashEnd) {
9,750,766✔
1832
    return 1;
3,472,571✔
1833
  }
1834

1835
  return 0;
6,278,195✔
1836
}
1837

1838
int32_t getVgId(SDBVgInfo* dbInfo, char* dbFName, int32_t* vgId, char *tbName) {
6,278,195✔
1839
  int32_t code = 0;
6,278,195✔
1840
  int32_t lino = 0;
6,278,195✔
1841
  code = dynMakeVgArraySortBy(dbInfo, dynVgInfoComp);
6,278,195✔
1842
  QUERY_CHECK_CODE(code, lino, _return);
6,278,195✔
1843

1844
  int32_t vgNum = (int32_t)taosArrayGetSize(dbInfo->vgArray);
6,278,195✔
1845
  if (vgNum <= 0) {
6,278,195✔
1846
    qError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum);
×
1847
    QUERY_CHECK_CODE(code = TSDB_CODE_TSC_DB_NOT_SELECTED, lino, _return);
×
1848
  }
1849

1850
  SVgroupInfo* vgInfo = NULL;
6,278,195✔
1851
  char         tbFullName[TSDB_TABLE_FNAME_LEN];
6,278,195✔
1852
  (void)snprintf(tbFullName, sizeof(tbFullName), "%s.", dbFName);
6,278,195✔
1853
  int32_t offset = (int32_t)strlen(tbFullName);
6,278,195✔
1854

1855
  (void)snprintf(tbFullName + offset, sizeof(tbFullName) - offset, "%s", tbName);
6,278,195✔
1856
  uint32_t hashValue = taosGetTbHashVal(tbFullName, (int32_t)strlen(tbFullName), dbInfo->hashMethod,
12,556,390✔
1857
                                        dbInfo->hashPrefix, dbInfo->hashSuffix);
6,278,195✔
1858

1859
  vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, dynHashValueComp, TD_EQ);
6,278,195✔
1860
  if (NULL == vgInfo) {
6,278,195✔
1861
    qError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName,
×
1862
           (int32_t)taosArrayGetSize(dbInfo->vgArray));
1863
    return TSDB_CODE_CTG_INTERNAL_ERROR;
×
1864
  }
1865

1866
  *vgId = vgInfo->vgId;
6,278,195✔
1867

1868
_return:
6,278,195✔
1869
  return code;
6,278,195✔
1870
}
1871

1872
bool colNeedScan(SOperatorInfo* pOperator, col_id_t colId) {
38,969,802✔
1873
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
38,969,802✔
1874
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
38,969,802✔
1875
  SArray *                   pColList = pVtbScan->readColList;
38,969,802✔
1876
  if (pVtbScan->scanAllCols) {
38,969,802✔
1877
    return true;
4,825,437✔
1878
  }
1879
  for (int32_t i = 0; i < taosArrayGetSize(pColList); i++) {
184,981,997✔
1880
    if (colId == *(col_id_t*)taosArrayGet(pColList, i)) {
163,682,400✔
1881
      return true;
12,844,768✔
1882
    }
1883
  }
1884
  return false;
21,299,597✔
1885
}
1886

1887
int32_t getDbVgInfo(SOperatorInfo* pOperator, SName *name, SDBVgInfo **dbVgInfo) {
17,695,605✔
1888
  int32_t                    code = TSDB_CODE_SUCCESS;
17,695,605✔
1889
  int32_t                    line = 0;
17,695,605✔
1890
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
17,695,605✔
1891
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
17,695,605✔
1892
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
17,695,605✔
1893
  SMsgCb*                    pMsgCb = pVtbScan->pMsgCb;
17,695,605✔
1894
  SUseDbOutput*              output = NULL;
17,695,605✔
1895
  SUseDbOutput**             find = (SUseDbOutput**)taosHashGet(pInfo->vtbScan.dbVgInfoMap, name->dbname, strlen(name->dbname));
17,695,605✔
1896

1897
  QRY_PARAM_CHECK(dbVgInfo);
17,695,605✔
1898

1899
  if (find == NULL) {
17,695,605✔
1900
    output = taosMemoryMalloc(sizeof(SUseDbOutput));
1,145,954✔
1901
    code = buildDbVgInfoMap(pOperator, pMsgCb, name, pTaskInfo, output);
1,145,954✔
1902
    QUERY_CHECK_CODE(code, line, _return);
1,145,954✔
1903
    code = taosHashPut(pInfo->vtbScan.dbVgInfoMap, name->dbname, strlen(name->dbname), &output, POINTER_BYTES);
1,145,954✔
1904
    QUERY_CHECK_CODE(code, line, _return);
1,145,954✔
1905
  } else {
1906
    output = *find;
16,549,651✔
1907
  }
1908

1909
  *dbVgInfo = output->dbVgroup;
17,695,605✔
1910
  return code;
17,695,605✔
1911
_return:
×
1912
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
1913
  freeUseDbOutput(output);
×
1914
  return code;
×
1915
}
1916

1917
int32_t extractColRefName(const char *colref, char **refDb, char** refTb, char** refCol) {
17,695,605✔
1918
  int32_t     code = TSDB_CODE_SUCCESS;
17,695,605✔
1919
  int32_t     line = 0;
17,695,605✔
1920

1921
  const char *first_dot = strchr(colref, '.');
17,695,605✔
1922
  QUERY_CHECK_NULL(first_dot, code, line, _return, terrno)
17,695,605✔
1923

1924
  const char *second_dot = strchr(first_dot + 1, '.');
17,695,605✔
1925
  QUERY_CHECK_NULL(second_dot, code, line, _return, terrno)
17,695,605✔
1926

1927
  size_t db_len = first_dot - colref;
17,695,605✔
1928
  size_t table_len = second_dot - first_dot - 1;
17,695,605✔
1929
  size_t col_len = strlen(second_dot + 1);
17,695,605✔
1930

1931
  *refDb = taosMemoryMalloc(db_len + 1);
17,695,605✔
1932
  *refTb = taosMemoryMalloc(table_len + 1);
17,695,605✔
1933
  *refCol = taosMemoryMalloc(col_len + 1);
17,695,605✔
1934
  QUERY_CHECK_NULL(*refDb, code, line, _return, terrno)
17,695,605✔
1935
  QUERY_CHECK_NULL(*refTb, code, line, _return, terrno)
17,695,605✔
1936
  QUERY_CHECK_NULL(*refCol, code, line, _return, terrno)
17,695,605✔
1937

1938
  tstrncpy(*refDb, colref, db_len + 1);
17,695,605✔
1939
  tstrncpy(*refTb, first_dot + 1, table_len + 1);
17,695,605✔
1940
  tstrncpy(*refCol, second_dot + 1, col_len + 1);
17,695,605✔
1941

1942
  return TSDB_CODE_SUCCESS;
17,695,605✔
1943
_return:
×
1944
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
1945
  if (*refDb) {
×
1946
    taosMemoryFree(*refDb);
×
1947
    *refDb = NULL;
×
1948
  }
1949
  if (*refTb) {
×
1950
    taosMemoryFree(*refTb);
×
1951
    *refTb = NULL;
×
1952
  }
1953
  if (*refCol) {
×
1954
    taosMemoryFree(*refCol);
×
1955
    *refCol = NULL;
×
1956
  }
1957
  return code;
×
1958
}
1959

1960
bool tableInfoNeedCollect(char *dbName, char *tbName, char *expectDbName, char *expectTbName) {
130,213,183✔
1961
  if (strncmp(varDataVal(tbName), expectTbName, varDataLen(tbName)) == 0 &&
130,213,183✔
1962
      strlen(expectTbName) == varDataLen(tbName) &&
63,738,411✔
1963
      strncmp(varDataVal(dbName), expectDbName, varDataLen(dbName)) == 0 &&
63,738,411✔
1964
      strlen(expectDbName) == varDataLen(dbName)) {
63,738,411✔
1965
    return true;
63,738,411✔
1966
  }
1967
  return false;
66,474,772✔
1968
}
1969

1970
int32_t getColRefInfo(SColRefInfo *pInfo, SArray* pDataBlock, int32_t index) {
63,738,411✔
1971
  int32_t          code = TSDB_CODE_SUCCESS;
63,738,411✔
1972
  int32_t          line = 0;
63,738,411✔
1973

1974
  SColumnInfoData *pColNameCol = taosArrayGet(pDataBlock, 3);
63,738,411✔
1975
  SColumnInfoData *pUidCol = taosArrayGet(pDataBlock, 4);
63,738,411✔
1976
  SColumnInfoData *pColIdCol = taosArrayGet(pDataBlock, 5);
63,738,411✔
1977
  SColumnInfoData *pRefCol = taosArrayGet(pDataBlock, 6);
63,738,411✔
1978
  SColumnInfoData *pVgIdCol = taosArrayGet(pDataBlock, 7);
63,738,411✔
1979
  SColumnInfoData *pRefVerCol = taosArrayGet(pDataBlock, 8);
63,738,411✔
1980

1981
  QUERY_CHECK_NULL(pColNameCol, code, line, _return, terrno)
63,738,411✔
1982
  QUERY_CHECK_NULL(pUidCol, code, line, _return, terrno)
63,738,411✔
1983
  QUERY_CHECK_NULL(pColIdCol, code, line, _return, terrno)
63,738,411✔
1984
  QUERY_CHECK_NULL(pRefCol, code, line, _return, terrno)
63,738,411✔
1985
  QUERY_CHECK_NULL(pVgIdCol, code, line, _return, terrno)
63,738,411✔
1986
  QUERY_CHECK_NULL(pRefVerCol, code, line, _return, terrno)
63,738,411✔
1987

1988
  if (colDataIsNull_s(pRefCol, index)) {
127,476,822✔
1989
    pInfo->colrefName = NULL;
24,762,461✔
1990
  } else {
1991
    pInfo->colrefName = taosMemoryCalloc(varDataTLen(colDataGetData(pRefCol, index)), 1);
38,975,950✔
1992
    QUERY_CHECK_NULL(pInfo->colrefName, code, line, _return, terrno)
38,975,950✔
1993
    memcpy(pInfo->colrefName, varDataVal(colDataGetData(pRefCol, index)), varDataLen(colDataGetData(pRefCol, index)));
38,975,950✔
1994
    pInfo->colrefName[varDataLen(colDataGetData(pRefCol, index))] = 0;
38,975,950✔
1995
  }
1996

1997
  pInfo->colName = taosMemoryCalloc(varDataTLen(colDataGetData(pColNameCol, index)), 1);
63,738,411✔
1998
  QUERY_CHECK_NULL(pInfo->colName, code, line, _return, terrno)
63,738,411✔
1999
  memcpy(pInfo->colName, varDataVal(colDataGetData(pColNameCol, index)), varDataLen(colDataGetData(pColNameCol, index)));
63,738,411✔
2000
  pInfo->colName[varDataLen(colDataGetData(pColNameCol, index))] = 0;
63,738,411✔
2001

2002
  if (!colDataIsNull_s(pUidCol, index)) {
127,476,822✔
2003
    GET_TYPED_DATA(pInfo->uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, index), 0);
63,738,411✔
2004
  }
2005
  if (!colDataIsNull_s(pColIdCol, index)) {
127,476,822✔
2006
    GET_TYPED_DATA(pInfo->colId, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pColIdCol, index), 0);
38,975,950✔
2007
  }
2008
  if (!colDataIsNull_s(pVgIdCol, index)) {
127,476,822✔
2009
    GET_TYPED_DATA(pInfo->vgId, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pVgIdCol, index), 0);
63,738,411✔
2010
  }
2011

2012
_return:
×
2013
  return code;
63,738,411✔
2014
}
2015

2016
int32_t processOrgTbVg(SVtbScanDynCtrlInfo* pVtbScan, SExecTaskInfo* pTaskInfo, int32_t rversion) {
1,227,231✔
2017
  int32_t                    code = TSDB_CODE_SUCCESS;
1,227,231✔
2018
  int32_t                    line = 0;
1,227,231✔
2019

2020
  if (pTaskInfo->pStreamRuntimeInfo == NULL) {
1,227,231✔
2021
    return code;
1,091,266✔
2022
  }
2023

2024
  if (pVtbScan->existOrgTbVg == NULL) {
135,965✔
2025
    pVtbScan->existOrgTbVg = pVtbScan->curOrgTbVg;
×
2026
    pVtbScan->curOrgTbVg = NULL;
×
2027
  }
2028

2029
  if (pVtbScan->curOrgTbVg != NULL) {
135,965✔
2030
    // which means rversion has changed
2031
    void*   pCurIter = NULL;
9,816✔
2032
    SArray* tmpArray = NULL;
9,816✔
2033
    while ((pCurIter = taosHashIterate(pVtbScan->curOrgTbVg, pCurIter))) {
28,324✔
2034
      int32_t* vgId = (int32_t*)taosHashGetKey(pCurIter, NULL);
18,508✔
2035
      if (taosHashGet(pVtbScan->existOrgTbVg, vgId, sizeof(int32_t)) == NULL) {
18,508✔
2036
        if (tmpArray == NULL) {
2,332✔
2037
          tmpArray = taosArrayInit(1, sizeof(int32_t));
2,332✔
2038
          QUERY_CHECK_NULL(tmpArray, code, line, _return, terrno)
2,332✔
2039
        }
2040
        QUERY_CHECK_NULL(taosArrayPush(tmpArray, vgId), code, line, _return, terrno)
2,332✔
2041
      }
2042
    }
2043
    if (tmpArray == NULL) {
9,816✔
2044
      return TSDB_CODE_SUCCESS;
7,484✔
2045
    }
2046
    if (tmpArray != NULL && pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addVgIds == NULL) {
2,332✔
2047
      SArray* expiredInfo = atomic_load_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo);
2,332✔
2048
      if (expiredInfo && expiredInfo == atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo, expiredInfo, NULL)) {
2,332✔
2049
        for (int32_t i = 0; i < taosArrayGetSize(expiredInfo); i++) {
×
2050
          SStreamTaskAddr* vgInfo = (SStreamTaskAddr*)taosArrayGet(expiredInfo, i);
×
2051
          QUERY_CHECK_NULL(taosArrayPush(tmpArray, &vgInfo->nodeId), code, line, _return, terrno)
×
2052
        }
2053
        taosArrayDestroy(expiredInfo);
×
2054
      }
2055
      if (atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addVgIds, NULL, tmpArray)) {
2,332✔
2056
        taosArrayDestroy(tmpArray);
×
2057
      }
2058
    }
2059
    atomic_store_64(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.uid, (int64_t)(pVtbScan->isSuperTable ? pVtbScan->suid : pVtbScan->uid));
2,332✔
2060
    (void)atomic_val_compare_exchange_8(pTaskInfo->pStreamRuntimeInfo->vtableDeployGot, 0, 1);
2,332✔
2061
    taosHashClear(pVtbScan->curOrgTbVg);
2,332✔
2062
    pVtbScan->needRedeploy = true;
2,332✔
2063
    pVtbScan->rversion = rversion;
2,332✔
2064
    return TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY;
2,332✔
2065
  }
2066
  return code;
126,149✔
2067
_return:
×
2068
  qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
2069
  return code;
×
2070
}
2071

2072
int32_t getVgIdFromColref(SOperatorInfo* pOperator, const char* colRef, int32_t* vgId) {
25,400✔
2073
  int32_t                    code =TSDB_CODE_SUCCESS;
25,400✔
2074
  int32_t                    line = 0;
25,400✔
2075
  char*                      refDbName = NULL;
25,400✔
2076
  char*                      refTbName = NULL;
25,400✔
2077
  char*                      refColName = NULL;
25,400✔
2078
  SDBVgInfo*                 dbVgInfo = NULL;
25,400✔
2079
  SName                      name = {0};
25,400✔
2080
  char                       dbFname[TSDB_DB_FNAME_LEN] = {0};
25,400✔
2081
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
25,400✔
2082

2083
  code = extractColRefName(colRef, &refDbName, &refTbName, &refColName);
25,400✔
2084
  QUERY_CHECK_CODE(code, line, _return);
25,400✔
2085

2086
  toName(pInfo->vtbScan.acctId, refDbName, refTbName, &name);
25,400✔
2087

2088
  code = getDbVgInfo(pOperator, &name, &dbVgInfo);
25,400✔
2089
  QUERY_CHECK_CODE(code, line, _return);
25,400✔
2090

2091
  code = tNameGetFullDbName(&name, dbFname);
25,400✔
2092
  QUERY_CHECK_CODE(code, line, _return);
25,400✔
2093

2094
  code = getVgId(dbVgInfo, dbFname, vgId, name.tname);
25,400✔
2095
  QUERY_CHECK_CODE(code, line, _return);
25,400✔
2096

2097
_return:
25,400✔
2098
  if (code) {
25,400✔
2099
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
2100
  }
2101
  taosMemoryFree(refDbName);
25,400✔
2102
  taosMemoryFree(refTbName);
25,400✔
2103
  taosMemoryFree(refColName);
25,400✔
2104
  return code;
25,400✔
2105
}
2106

2107
static int32_t generateTagArrayByTagBlockAndSave(SHashObj* vtbUidTagListMap, tb_uid_t uid, SSDataBlock *pTagVal, int32_t rowIdx) {
18,240✔
2108
  int32_t code = TSDB_CODE_SUCCESS;
18,240✔
2109
  int32_t line = 0;
18,240✔
2110
  STagVal tagVal = {0};
18,240✔
2111
  // last col is uid
2112

2113
  SArray* pTagList = taosArrayInit(1, sizeof(STagVal));
18,240✔
2114
  QUERY_CHECK_NULL(pTagList, code, line, _return, terrno)
18,240✔
2115

2116
  for (int32_t k = 0; k < taosArrayGetSize(pTagVal->pDataBlock) - 1; k++) {
36,480✔
2117
    SColumnInfoData *pTagCol = taosArrayGet(pTagVal->pDataBlock, k);
18,240✔
2118
    QUERY_CHECK_NULL(pTagCol, code, line, _return, terrno)
18,240✔
2119
    tagVal.type = pTagCol->info.type;
18,240✔
2120
    tagVal.cid = pTagCol->info.colId;
18,240✔
2121
    if (!colDataIsNull_s(pTagCol, rowIdx)) {
36,480✔
2122
      char*   pData = colDataGetData(pTagCol, rowIdx);
18,240✔
2123
      if (IS_VAR_DATA_TYPE(pTagCol->info.type)) {
18,240✔
UNCOV
2124
        tagVal.nData = varDataLen(pData);
×
UNCOV
2125
        tagVal.pData = taosMemoryMalloc(tagVal.nData);
×
UNCOV
2126
        QUERY_CHECK_NULL(tagVal.pData, code, line, _return, terrno)
×
UNCOV
2127
        memcpy(tagVal.pData, varDataVal(pData), varDataLen(pData));
×
UNCOV
2128
        QUERY_CHECK_NULL(taosArrayPush(pTagList, &tagVal), code, line, _return, terrno)
×
2129
      } else {
2130
        memcpy(&tagVal.i64, pData, tDataTypes[pTagCol->info.type].bytes);
18,240✔
2131
        QUERY_CHECK_NULL(taosArrayPush(pTagList, &tagVal), code, line, _return, terrno)
36,480✔
2132
      }
2133
    } else {
2134
      tagVal.pData = NULL;
×
2135
      tagVal.nData = 0;
×
2136
      QUERY_CHECK_NULL(taosArrayPush(pTagList, &tagVal), code, line, _return, terrno)
×
2137
    }
2138
    tagVal = (STagVal){0};
18,240✔
2139
  }
2140
  code = taosHashPut(vtbUidTagListMap, &uid, sizeof(uid), &pTagList, POINTER_BYTES);
18,240✔
2141
  QUERY_CHECK_CODE(code, line, _return);
18,240✔
2142

2143
  return code;
18,240✔
2144
_return:
×
2145
  if (tagVal.pData) {
×
2146
    taosMemoryFreeClear(tagVal.pData);
×
2147
  }
2148
  if (pTagList) {
×
2149
    taosArrayDestroyEx(pTagList, destroyTagVal);
×
2150
  }
2151
  qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
2152
  return code;
×
2153
}
2154

2155
int32_t virtualTableScanProcessColRefInfo(SOperatorInfo* pOperator, SArray* pColRefInfo, tb_uid_t* uid, int32_t* vgId) {
4,081,395✔
2156
  int32_t                    code = TSDB_CODE_SUCCESS;
4,081,395✔
2157
  int32_t                    line = 0;
4,081,395✔
2158
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
4,081,395✔
2159
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
4,081,395✔
2160
  SDBVgInfo*                 dbVgInfo = NULL;
4,081,395✔
2161

2162
  for (int32_t j = 0; j < taosArrayGetSize(pColRefInfo); j++) {
67,807,934✔
2163
    SColRefInfo *pKV = (SColRefInfo*)taosArrayGet(pColRefInfo, j);
63,726,539✔
2164
    *uid = pKV->uid;
63,726,539✔
2165
    *vgId = pKV->vgId;
63,726,539✔
2166
    if (pKV->colrefName != NULL && colNeedScan(pOperator, pKV->colId)) {
63,726,539✔
2167
      char*   refDbName = NULL;
17,670,205✔
2168
      char*   refTbName = NULL;
17,670,205✔
2169
      char*   refColName = NULL;
17,670,205✔
2170
      SName   name = {0};
17,670,205✔
2171
      char    dbFname[TSDB_DB_FNAME_LEN] = {0};
17,670,205✔
2172
      char    orgTbFName[TSDB_TABLE_FNAME_LEN] = {0};
17,670,205✔
2173

2174
      code = extractColRefName(pKV->colrefName, &refDbName, &refTbName, &refColName);
17,670,205✔
2175
      QUERY_CHECK_CODE(code, line, _return);
17,670,205✔
2176

2177
      toName(pInfo->vtbScan.acctId, refDbName, refTbName, &name);
17,670,205✔
2178

2179
      code = getDbVgInfo(pOperator, &name, &dbVgInfo);
17,670,205✔
2180
      QUERY_CHECK_CODE(code, line, _return);
17,670,205✔
2181
      code = tNameGetFullDbName(&name, dbFname);
17,670,205✔
2182
      QUERY_CHECK_CODE(code, line, _return);
17,670,205✔
2183
      code = tNameGetFullTableName(&name, orgTbFName);
17,670,205✔
2184
      QUERY_CHECK_CODE(code, line, _return);
17,670,205✔
2185

2186
      void *pVal = taosHashGet(pVtbScan->otbNameToOtbInfoMap, orgTbFName, sizeof(orgTbFName));
17,670,205✔
2187
      if (!pVal) {
17,670,205✔
2188
        SOrgTbInfo orgTbInfo = {0};
6,252,795✔
2189
        code = getVgId(dbVgInfo, dbFname, &orgTbInfo.vgId, name.tname);
6,252,795✔
2190
        QUERY_CHECK_CODE(code, line, _return);
6,252,795✔
2191
        tstrncpy(orgTbInfo.tbName, orgTbFName, sizeof(orgTbInfo.tbName));
6,252,795✔
2192
        orgTbInfo.colMap = taosArrayInit(10, sizeof(SColIdNameKV));
6,252,795✔
2193
        QUERY_CHECK_NULL(orgTbInfo.colMap, code, line, _return, terrno)
6,252,795✔
2194
        SColIdNameKV colIdNameKV = {0};
6,252,795✔
2195
        colIdNameKV.colId = pKV->colId;
6,252,795✔
2196
        tstrncpy(colIdNameKV.colName, refColName, sizeof(colIdNameKV.colName));
6,252,795✔
2197
        QUERY_CHECK_NULL(taosArrayPush(orgTbInfo.colMap, &colIdNameKV), code, line, _return, terrno)
12,505,590✔
2198
        code = taosHashPut(pVtbScan->otbNameToOtbInfoMap, orgTbFName, sizeof(orgTbFName), &orgTbInfo, sizeof(orgTbInfo));
6,252,795✔
2199
        QUERY_CHECK_CODE(code, line, _return);
6,252,795✔
2200
      } else {
2201
        SOrgTbInfo *tbInfo = (SOrgTbInfo *)pVal;
11,417,410✔
2202
        SColIdNameKV colIdNameKV = {0};
11,417,410✔
2203
        colIdNameKV.colId = pKV->colId;
11,417,410✔
2204
        tstrncpy(colIdNameKV.colName, refColName, sizeof(colIdNameKV.colName));
11,417,410✔
2205
        QUERY_CHECK_NULL(taosArrayPush(tbInfo->colMap, &colIdNameKV), code, line, _return, terrno)
22,834,820✔
2206
      }
2207
      taosMemoryFree(refDbName);
17,670,205✔
2208
      taosMemoryFree(refTbName);
17,670,205✔
2209
      taosMemoryFree(refColName);
17,670,205✔
2210
    }
2211
  }
2212

2213
_return:
4,081,395✔
2214
  if (code) {
4,081,395✔
2215
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
2216
  }
2217
  return code;
4,081,395✔
2218
}
2219

2220
static int32_t getTagBlockAndProcess(SOperatorInfo* pOperator, bool hasPartition) {
4,560✔
2221
  int32_t                    code = TSDB_CODE_SUCCESS;
4,560✔
2222
  int32_t                    line = 0;
4,560✔
2223
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
4,560✔
2224
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
4,560✔
2225
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
4,560✔
2226
  SArray*                    pColRefArray = NULL;
4,560✔
2227
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[0];
4,560✔
2228
  SOperatorInfo*             pTagScanOp = pOperator->pDownstream[1];
4,560✔
2229

2230
  pVtbScan->vtbUidTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
4,560✔
2231
  QUERY_CHECK_NULL(pVtbScan->vtbUidTagListMap, code, line, _return, terrno)
4,560✔
2232
  taosHashSetFreeFp(pVtbScan->vtbUidTagListMap, destroyTagList);
4,560✔
2233
  if (hasPartition) {
4,560✔
UNCOV
2234
    pVtbScan->vtbUidToGroupIdMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
×
UNCOV
2235
    pVtbScan->vtbGroupIdTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
×
UNCOV
2236
    QUERY_CHECK_NULL(pVtbScan->vtbUidToGroupIdMap, code, line, _return, terrno)
×
UNCOV
2237
    QUERY_CHECK_NULL(pVtbScan->vtbGroupIdTagListMap, code, line, _return, terrno)
×
UNCOV
2238
    taosHashSetFreeFp(pVtbScan->vtbGroupIdTagListMap, destroyVtbUidTagListMap);
×
2239
  }
2240

2241
  while (true) {
9,120✔
2242
    SSDataBlock *pTagVal = NULL;
13,680✔
2243
    code = pTagScanOp->fpSet.getNextFn(pTagScanOp, &pTagVal);
13,680✔
2244
    QUERY_CHECK_CODE(code, line, _return);
13,680✔
2245
    if (pTagVal == NULL) {
13,680✔
2246
      break;
4,560✔
2247
    }
2248
    SHashObj *vtbUidTagListMap = NULL;
9,120✔
2249
    if (hasPartition) {
9,120✔
UNCOV
2250
      void* pIter = taosHashGet(pVtbScan->vtbGroupIdTagListMap, &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId));
×
UNCOV
2251
      if (pIter) {
×
UNCOV
2252
        vtbUidTagListMap = *(SHashObj**)pIter;
×
2253
      } else {
UNCOV
2254
        vtbUidTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
×
UNCOV
2255
        QUERY_CHECK_NULL(vtbUidTagListMap, code, line, _return, terrno)
×
UNCOV
2256
        taosHashSetFreeFp(vtbUidTagListMap, destroyTagList);
×
2257

UNCOV
2258
        code = taosHashPut(pVtbScan->vtbGroupIdTagListMap, &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId), &vtbUidTagListMap, POINTER_BYTES);
×
UNCOV
2259
        QUERY_CHECK_CODE(code, line, _return);
×
2260
      }
2261
    } else {
2262
      vtbUidTagListMap = pVtbScan->vtbUidTagListMap;
9,120✔
2263
    }
2264

2265
    SColumnInfoData *pUidCol = taosArrayGetLast(pTagVal->pDataBlock);
9,120✔
2266
    QUERY_CHECK_NULL(pUidCol, code, line, _return, terrno)
9,120✔
2267
    for (int32_t i = 0; i < pTagVal->info.rows; i++) {
27,360✔
2268
      tb_uid_t uid = 0;
18,240✔
2269
      if (!colDataIsNull_s(pUidCol, i)) {
36,480✔
2270
        GET_TYPED_DATA(uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, i), 0);
18,240✔
2271
        QUERY_CHECK_CODE(code, line, _return);
18,240✔
2272
      }
2273

2274
      code = generateTagArrayByTagBlockAndSave(vtbUidTagListMap, uid, pTagVal, i);
18,240✔
2275
      QUERY_CHECK_CODE(code, line, _return);
18,240✔
2276

2277
      if (hasPartition) {
18,240✔
UNCOV
2278
        code = taosHashPut(pVtbScan->vtbUidToGroupIdMap, &uid, sizeof(uid), &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId));
×
UNCOV
2279
        QUERY_CHECK_CODE(code, line, _return);
×
2280
      }
2281
    }
2282
  }
2283

2284
  return code;
4,560✔
2285

2286
_return:
×
2287
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
2288
  return code;
×
2289
}
2290

2291
static int32_t processChildTableListAndGenerateOrgTbInfoMap(SOperatorInfo* pOperator) {
4,560✔
2292
  int32_t                    code = TSDB_CODE_SUCCESS;
4,560✔
2293
  int32_t                    line = 0;
4,560✔
2294
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
4,560✔
2295
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
4,560✔
2296
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
4,560✔
2297
  SArray*                    pColRefArray = NULL;
4,560✔
2298
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[0];
4,560✔
2299
  SOperatorInfo*             pTagScanOp = pOperator->pDownstream[1];
4,560✔
2300

2301
  pVtbScan->vtbUidToVgIdMapMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
4,560✔
2302
  QUERY_CHECK_NULL(pVtbScan->vtbUidToVgIdMapMap, code, line, _return, terrno)
4,560✔
2303

2304
  for (int32_t i = 0; i < taosArrayGetSize(pVtbScan->childTableList); i++) {
22,800✔
2305
    SHashObj* otbVgIdToOtbInfoArrayMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
18,240✔
2306
    QUERY_CHECK_NULL(otbVgIdToOtbInfoArrayMap, code, line, _return, terrno)
18,240✔
2307

2308
    SArray* pColRefInfo = (SArray*)taosArrayGetP(pVtbScan->childTableList, i);
18,240✔
2309
    QUERY_CHECK_NULL(pColRefInfo, code, line, _return, terrno)
18,240✔
2310

2311
    tb_uid_t uid = 0;
18,240✔
2312
    int32_t  vgId = 0;
18,240✔
2313
    code = virtualTableScanProcessColRefInfo(pOperator, pColRefInfo, &uid, &vgId);
18,240✔
2314
    QUERY_CHECK_CODE(code, line, _return);
18,240✔
2315

2316
    size_t len = 0;
18,240✔
2317
    void*  pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, NULL);
18,240✔
2318
    while (pOrgTbInfo != NULL) {
59,280✔
2319
      char*       key = taosHashGetKey(pOrgTbInfo, &len);
41,040✔
2320
      SOrgTbInfo* orgTbInfo = (SOrgTbInfo*)pOrgTbInfo;
41,040✔
2321

2322
      void* pIter = taosHashGet(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId));
41,040✔
2323
      if (!pIter) {
41,040✔
2324
        SArray* pOrgTbInfoArray = taosArrayInit(1, sizeof(SOrgTbInfo));
27,360✔
2325
        QUERY_CHECK_NULL(pOrgTbInfoArray, code, line, _return, terrno)
27,360✔
2326
        QUERY_CHECK_NULL(taosArrayPush(pOrgTbInfoArray, orgTbInfo), code, line, _return, terrno)
54,720✔
2327
        code = taosHashPut(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId), &pOrgTbInfoArray, POINTER_BYTES);
27,360✔
2328
        QUERY_CHECK_CODE(code, line, _return);
27,360✔
2329
      } else {
2330
        SArray* pOrgTbInfoArray = *(SArray**)pIter;
13,680✔
2331
        QUERY_CHECK_NULL(pOrgTbInfoArray, code, line, _return, terrno)
13,680✔
2332
        QUERY_CHECK_NULL(taosArrayPush(pOrgTbInfoArray, orgTbInfo), code, line, _return, terrno)
13,680✔
2333
      }
2334

2335
      pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, pOrgTbInfo);
41,040✔
2336

2337
      code = taosHashRemove(pVtbScan->otbNameToOtbInfoMap, key, len);
41,040✔
2338
      QUERY_CHECK_CODE(code, line, _return);
41,040✔
2339
    }
2340

2341
    code = taosHashPut(pVtbScan->vtbUidToVgIdMapMap, &uid, sizeof(uid), &otbVgIdToOtbInfoArrayMap, POINTER_BYTES);
18,240✔
2342
    QUERY_CHECK_CODE(code, line, _return);
18,240✔
2343
  }
2344

2345
  return code;
4,560✔
2346
_return:
×
2347
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
2348
  return code;
×
2349
}
2350

2351
static int32_t buildOrgTbInfoSingle(SOperatorInfo* pOperator, bool hasPartition) {
4,560✔
2352
  int32_t                    code = TSDB_CODE_SUCCESS;
4,560✔
2353
  int32_t                    line = 0;
4,560✔
2354

2355
  code = processChildTableListAndGenerateOrgTbInfoMap(pOperator);
4,560✔
2356
  QUERY_CHECK_CODE(code, line, _return);
4,560✔
2357

2358
  // process tag
2359
  code = getTagBlockAndProcess(pOperator, hasPartition);
4,560✔
2360
  QUERY_CHECK_CODE(code, line, _return);
4,560✔
2361

2362
  return code;
4,560✔
2363
_return:
×
2364
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
2365
  return code;
×
2366
}
2367

2368
static int32_t buildOrgTbInfoBatch(SOperatorInfo* pOperator, bool hasPartition) {
110,858✔
2369
  int32_t                    code = TSDB_CODE_SUCCESS;
110,858✔
2370
  int32_t                    line = 0;
110,858✔
2371
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
110,858✔
2372
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
110,858✔
2373
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
110,858✔
2374
  SArray*                    pColRefArray = NULL;
110,858✔
2375
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[0];
110,858✔
2376
  SOperatorInfo*             pTagScanOp = pOperator->pDownstream[1];
110,858✔
2377

2378
  if (hasPartition) {
110,858✔
2379
    pVtbScan->vtbUidToGroupIdMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
4,592✔
2380
    pVtbScan->vtbGroupIdTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
4,592✔
2381
    pVtbScan->vtbGroupIdToVgIdMapMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
4,592✔
2382

2383
    QUERY_CHECK_NULL(pVtbScan->vtbGroupIdToVgIdMapMap, code, line, _return, terrno)
4,592✔
2384
    QUERY_CHECK_NULL(pVtbScan->vtbUidToGroupIdMap, code, line, _return, terrno)
4,592✔
2385
    QUERY_CHECK_NULL(pVtbScan->vtbGroupIdTagListMap, code, line, _return, terrno)
4,592✔
2386
    taosHashSetFreeFp(pVtbScan->vtbGroupIdToVgIdMapMap, destroyOtbVgIdToOtbInfoArrayMap);
4,592✔
2387
  } else {
2388
    pVtbScan->otbVgIdToOtbInfoArrayMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
106,266✔
2389
    QUERY_CHECK_NULL(pVtbScan->otbVgIdToOtbInfoArrayMap, code, line, _return, terrno)
106,266✔
2390
  }
2391

2392
  while (true && hasPartition) {
129,226✔
2393
    SSDataBlock* pTagVal = NULL;
22,960✔
2394
    code = pTagScanOp->fpSet.getNextFn(pTagScanOp, &pTagVal);
22,960✔
2395
    QUERY_CHECK_CODE(code, line, _return);
22,960✔
2396
    if (pTagVal == NULL) {
22,960✔
2397
      break;
4,592✔
2398
    }
2399

2400
    SColumnInfoData *pUidCol = taosArrayGetLast(pTagVal->pDataBlock);
18,368✔
2401
    QUERY_CHECK_NULL(pUidCol, code, line, _return, terrno)
18,368✔
2402
    for (int32_t i = 0; i < pTagVal->info.rows; i++) {
36,736✔
2403
      tb_uid_t uid = 0;
18,368✔
2404
      if (!colDataIsNull_s(pUidCol, i)) {
36,736✔
2405
        GET_TYPED_DATA(uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, i), 0);
18,368✔
2406
        QUERY_CHECK_CODE(code, line, _return);
18,368✔
2407
      }
2408
      code = taosHashPut(pVtbScan->vtbUidToGroupIdMap, &uid, sizeof(uid), &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId));
18,368✔
2409
      QUERY_CHECK_CODE(code, line, _return);
18,368✔
2410
    }
2411
    code = taosHashPut(pVtbScan->vtbGroupIdTagListMap, &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId), NULL, 0);
18,368✔
2412
    QUERY_CHECK_CODE(code, line, _return);
18,368✔
2413
  }
2414

2415
  for (int32_t i = 0; i < taosArrayGetSize(pVtbScan->childTableList); i++) {
554,290✔
2416
    SArray* pColRefInfo = (SArray*)taosArrayGetP(pVtbScan->childTableList, i);
443,432✔
2417
    QUERY_CHECK_NULL(pColRefInfo, code, line, _return, terrno)
443,432✔
2418
    tb_uid_t uid = 0;
443,432✔
2419
    int32_t  vgId = 0;
443,432✔
2420
    code = virtualTableScanProcessColRefInfo(pOperator, pColRefInfo, &uid, &vgId);
443,432✔
2421
    QUERY_CHECK_CODE(code, line, _return);
443,432✔
2422

2423
    SHashObj* otbVgIdToOtbInfoArrayMap = NULL;
443,432✔
2424
    if (hasPartition) {
443,432✔
2425
      uint64_t* groupId = (uint64_t *)taosHashGet(pVtbScan->vtbUidToGroupIdMap, &uid, sizeof(uid));
18,368✔
2426
      QUERY_CHECK_NULL(groupId, code, line, _return, terrno)
18,368✔
2427

2428
      void* pHashIter = taosHashGet(pVtbScan->vtbGroupIdToVgIdMapMap, groupId, sizeof(*groupId));
18,368✔
2429
      if (pHashIter) {
18,368✔
UNCOV
2430
        otbVgIdToOtbInfoArrayMap = *(SHashObj**)pHashIter;
×
2431
      } else {
2432
        otbVgIdToOtbInfoArrayMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
18,368✔
2433
        QUERY_CHECK_NULL(otbVgIdToOtbInfoArrayMap, code, line, _return, terrno)
18,368✔
2434
        code = taosHashPut(pVtbScan->vtbGroupIdToVgIdMapMap, groupId, sizeof(*groupId), &otbVgIdToOtbInfoArrayMap, POINTER_BYTES);
18,368✔
2435
        QUERY_CHECK_CODE(code, line, _return);
18,368✔
2436
      }
2437
    } else {
2438
      otbVgIdToOtbInfoArrayMap = pVtbScan->otbVgIdToOtbInfoArrayMap;
425,064✔
2439
    }
2440

2441
    size_t len = 0;
443,432✔
2442
    void*  pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, NULL);
443,432✔
2443
    while (pOrgTbInfo != NULL) {
1,157,828✔
2444
      char*       key = taosHashGetKey(pOrgTbInfo, &len);
714,396✔
2445
      SOrgTbInfo* orgTbInfo = (SOrgTbInfo*)pOrgTbInfo;
714,396✔
2446
      void* pIter = taosHashGet(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId));
714,396✔
2447
      if (!pIter) {
714,396✔
2448
        SArray* pOrgTbInfoArray = taosArrayInit(1, sizeof(SOrgTbInfo));
233,196✔
2449
        QUERY_CHECK_NULL(pOrgTbInfoArray, code, line, _return, terrno)
233,196✔
2450
        QUERY_CHECK_NULL(taosArrayPush(pOrgTbInfoArray, orgTbInfo), code, line, _return, terrno)
466,392✔
2451
        code = taosHashPut(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId), &pOrgTbInfoArray, POINTER_BYTES);
233,196✔
2452
        QUERY_CHECK_CODE(code, line, _return);
233,196✔
2453
      } else {
2454
        SArray* pOrgTbInfoArray = *(SArray**)pIter;
481,200✔
2455
        QUERY_CHECK_NULL(pOrgTbInfoArray, code, line, _return, terrno)
481,200✔
2456
        QUERY_CHECK_NULL(taosArrayPush(pOrgTbInfoArray, orgTbInfo), code, line, _return, terrno)
481,200✔
2457
      }
2458

2459
      pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, pOrgTbInfo);
714,396✔
2460

2461
      code = taosHashRemove(pVtbScan->otbNameToOtbInfoMap, key, len);
714,396✔
2462
      QUERY_CHECK_CODE(code, line, _return);
714,396✔
2463
    }
2464
  }
2465
  return code;
110,858✔
2466
_return:
×
2467
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
2468
  return code;
×
2469
}
2470

2471
int32_t buildVirtualSuperTableScanChildTableMap(SOperatorInfo* pOperator) {
1,212,260✔
2472
  int32_t                    code = TSDB_CODE_SUCCESS;
1,212,260✔
2473
  int32_t                    line = 0;
1,212,260✔
2474
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
1,212,260✔
2475
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
1,212,260✔
2476
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
1,212,260✔
2477
  SArray*                    pColRefArray = NULL;
1,212,260✔
2478
  SOperatorInfo*             pSystableScanOp = NULL;
1,212,260✔
2479
  
2480
  pVtbScan->childTableMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
1,212,260✔
2481
  QUERY_CHECK_NULL(pVtbScan->childTableMap, code, line, _return, terrno)
1,212,260✔
2482

2483
  if (pInfo->qType == DYN_QTYPE_VTB_AGG) {
1,212,260✔
2484
    pVtbScan->otbNameToOtbInfoMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
21,692✔
2485
    QUERY_CHECK_NULL(pVtbScan->otbNameToOtbInfoMap, code, line, _return, terrno)
21,692✔
2486
    pSystableScanOp = pOperator->pDownstream[0];
21,692✔
2487
  } else if (pInfo->qType == DYN_QTYPE_VTB_WINDOW) {
1,190,568✔
2488
    pVtbScan->otbNameToOtbInfoMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
93,726✔
2489
    QUERY_CHECK_NULL(pVtbScan->otbNameToOtbInfoMap, code, line, _return, terrno)
93,726✔
2490
    pSystableScanOp = pOperator->pDownstream[1];
93,726✔
2491
  } else {
2492
    pSystableScanOp = pOperator->pDownstream[1];
1,096,842✔
2493
  }
2494

2495
  while (true) {
2,426,871✔
2496
    SSDataBlock *pChildInfo = NULL;
3,639,131✔
2497
    code = pSystableScanOp->fpSet.getNextFn(pSystableScanOp, &pChildInfo);
3,639,131✔
2498
    QUERY_CHECK_CODE(code, line, _return);
3,639,131✔
2499
    if (pChildInfo == NULL) {
3,639,131✔
2500
      break;
1,212,260✔
2501
    }
2502
    SColumnInfoData *pTableNameCol = taosArrayGet(pChildInfo->pDataBlock, 0);
2,426,871✔
2503
    SColumnInfoData *pStbNameCol = taosArrayGet(pChildInfo->pDataBlock, 1);
2,426,871✔
2504
    SColumnInfoData *pDbNameCol = taosArrayGet(pChildInfo->pDataBlock, 2);
2,426,871✔
2505

2506
    QUERY_CHECK_NULL(pTableNameCol, code, line, _return, terrno)
2,426,871✔
2507
    QUERY_CHECK_NULL(pStbNameCol, code, line, _return, terrno)
2,426,871✔
2508
    QUERY_CHECK_NULL(pDbNameCol, code, line, _return, terrno)
2,426,871✔
2509

2510
    for (int32_t i = 0; i < pChildInfo->info.rows; i++) {
123,297,319✔
2511
      if (!colDataIsNull_s(pStbNameCol, i)) {
241,740,896✔
2512
        char* stbrawname = colDataGetData(pStbNameCol, i);
120,870,448✔
2513
        char* dbrawname = colDataGetData(pDbNameCol, i);
120,870,448✔
2514
        char *ctbName = colDataGetData(pTableNameCol, i);
120,870,448✔
2515

2516
        if (tableInfoNeedCollect(dbrawname, stbrawname, pInfo->vtbScan.dbName, pInfo->vtbScan.tbName)) {
120,870,448✔
2517
          SColRefInfo info = {0};
63,090,444✔
2518
          code = getColRefInfo(&info, pChildInfo->pDataBlock, i);
63,090,444✔
2519
          QUERY_CHECK_CODE(code, line, _return);
63,090,444✔
2520

2521
          if (pInfo->qType == DYN_QTYPE_VTB_SCAN) {
63,090,444✔
2522
            if (pInfo->vtbScan.dynTbUid != 0 && info.uid != pInfo->vtbScan.dynTbUid) {
55,703,692✔
2523
              qTrace("dynQueryCtrl tb uid filter, info uid:%" PRIu64 ", dyn tb uid:%" PRIu64, info.uid,
×
2524
                     pInfo->vtbScan.dynTbUid);
2525
              destroyColRefInfo(&info);
×
2526
              continue;
×
2527
            }
2528

2529
            if (pTaskInfo->pStreamRuntimeInfo) {
55,703,692✔
2530
              if (pVtbScan->curOrgTbVg == NULL) {
34,432✔
2531
                pVtbScan->curOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
1,304✔
2532
                QUERY_CHECK_NULL(pVtbScan->curOrgTbVg, code, line, _return, terrno)
1,304✔
2533
              }
2534

2535
              if (info.colrefName) {
34,432✔
2536
                int32_t vgId;
18,616✔
2537
                code = getVgIdFromColref(pOperator, info.colrefName, &vgId);
18,616✔
2538
                QUERY_CHECK_CODE(code, line, _return);
18,616✔
2539
                code = taosHashPut(pVtbScan->curOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
18,616✔
2540
                QUERY_CHECK_CODE(code, line, _return);
18,616✔
2541
              }
2542
            }
2543
          }
2544

2545
          if (taosHashGet(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName)) == NULL) {
63,090,444✔
2546
            pColRefArray = taosArrayInit(1, sizeof(SColRefInfo));
3,953,974✔
2547
            QUERY_CHECK_NULL(pColRefArray, code, line, _return, terrno)
3,953,974✔
2548
            QUERY_CHECK_NULL(taosArrayPush(pColRefArray, &info), code, line, _return, terrno)
7,907,948✔
2549
            int32_t tableIdx = (int32_t)taosArrayGetSize(pVtbScan->childTableList);
3,953,974✔
2550
            QUERY_CHECK_NULL(taosArrayPush(pVtbScan->childTableList, &pColRefArray), code, line, _return, terrno)
7,907,948✔
2551
            code = taosHashPut(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName), &tableIdx, sizeof(tableIdx));
3,953,974✔
2552
            QUERY_CHECK_CODE(code, line, _return);
3,953,974✔
2553
          } else {
2554
            int32_t *tableIdx = (int32_t*)taosHashGet(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName));
59,136,470✔
2555
            QUERY_CHECK_NULL(tableIdx, code, line, _return, terrno)
59,136,470✔
2556
            pColRefArray = (SArray *)taosArrayGetP(pVtbScan->childTableList, *tableIdx);
59,136,470✔
2557
            QUERY_CHECK_NULL(pColRefArray, code, line, _return, terrno)
59,136,470✔
2558
            QUERY_CHECK_NULL(taosArrayPush(pColRefArray, &info), code, line, _return, terrno)
118,272,940✔
2559
          }
2560
        }
2561
      }
2562
    }
2563
  }
2564

2565
  switch (pInfo->qType) {
1,212,260✔
2566
    case DYN_QTYPE_VTB_WINDOW: {
93,726✔
2567
      code = buildOrgTbInfoBatch(pOperator, false);
93,726✔
2568
      break;
93,726✔
2569
    }
2570
    case DYN_QTYPE_VTB_AGG: {
21,692✔
2571
      if (pVtbScan->batchProcessChild) {
21,692✔
2572
        code = buildOrgTbInfoBatch(pOperator, pVtbScan->hasPartition);
17,132✔
2573
      } else {
2574
        code = buildOrgTbInfoSingle(pOperator, pVtbScan->hasPartition);
4,560✔
2575
      }
2576
      break;
21,692✔
2577
    }
2578
    case DYN_QTYPE_VTB_SCAN: {
1,096,842✔
2579
      code = processOrgTbVg(pVtbScan, pTaskInfo, 1);
1,096,842✔
2580
      break;
1,096,842✔
2581
    }
2582
    default: {
×
2583
      code = TSDB_CODE_PLAN_INVALID_DYN_CTRL_TYPE;
×
2584
      break;
×
2585
    }
2586
  }
2587

2588
  QUERY_CHECK_CODE(code, line, _return);
1,212,260✔
2589

2590
_return:
1,212,260✔
2591
  if (code) {
1,212,260✔
2592
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
1,060✔
2593
  }
2594
  return code;
1,212,260✔
2595
}
2596

2597
int32_t buildVirtualNormalChildTableScanChildTableMap(SOperatorInfo* pOperator) {
130,389✔
2598
  int32_t                    code = TSDB_CODE_SUCCESS;
130,389✔
2599
  int32_t                    line = 0;
130,389✔
2600
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
130,389✔
2601
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
130,389✔
2602
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
130,177✔
2603
  SArray*                    pColRefInfo = pInfo->vtbScan.colRefInfo;
130,177✔
2604
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[1];
130,177✔
2605
  int32_t                    rversion = 0;
130,177✔
2606

2607
  pInfo->vtbScan.colRefInfo = taosArrayInit(1, sizeof(SColRefInfo));
130,177✔
2608
  QUERY_CHECK_NULL(pInfo->vtbScan.colRefInfo, code, line, _return, terrno)
130,389✔
2609

2610
  while (true) {
257,156✔
2611
    SSDataBlock *pTableInfo = NULL;
387,545✔
2612
    code = pSystableScanOp->fpSet.getNextFn(pSystableScanOp, &pTableInfo);
387,545✔
2613
    if (pTableInfo == NULL) {
387,545✔
2614
      break;
130,389✔
2615
    }
2616

2617
    SColumnInfoData *pTableNameCol = taosArrayGet(pTableInfo->pDataBlock, 0);
257,156✔
2618
    SColumnInfoData *pDbNameCol = taosArrayGet(pTableInfo->pDataBlock, 2);
257,156✔
2619
    SColumnInfoData *pRefVerCol = taosArrayGet(pTableInfo->pDataBlock, 8);
257,156✔
2620

2621
    QUERY_CHECK_NULL(pTableNameCol, code, line, _return, terrno)
257,156✔
2622
    QUERY_CHECK_NULL(pDbNameCol, code, line, _return, terrno)
257,156✔
2623
    QUERY_CHECK_NULL(pRefVerCol, code, line, _return, terrno)
257,156✔
2624

2625
    for (int32_t i = 0; i < pTableInfo->info.rows; i++) {
9,599,891✔
2626
      if (!colDataIsNull_s(pRefVerCol, i)) {
18,685,470✔
2627
        GET_TYPED_DATA(rversion, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pRefVerCol, i), 0);
9,342,735✔
2628
      }
2629

2630
      if (!colDataIsNull_s(pTableNameCol, i)) {
18,685,470✔
2631
        char* tbrawname = colDataGetData(pTableNameCol, i);
9,342,735✔
2632
        char* dbrawname = colDataGetData(pDbNameCol, i);
9,342,735✔
2633
        QUERY_CHECK_NULL(tbrawname, code, line, _return, terrno)
9,342,735✔
2634
        QUERY_CHECK_NULL(dbrawname, code, line, _return, terrno)
9,342,735✔
2635

2636
        if (tableInfoNeedCollect(dbrawname, tbrawname, pInfo->vtbScan.dbName, pInfo->vtbScan.tbName)) {
9,342,735✔
2637
          SColRefInfo info = {0};
647,967✔
2638
          code = getColRefInfo(&info, pTableInfo->pDataBlock, i);
647,967✔
2639
          QUERY_CHECK_CODE(code, line, _return);
647,967✔
2640

2641
          if ((rversion != pVtbScan->rversion || pVtbScan->existOrgTbVg == NULL) && info.colrefName) {
647,967✔
2642
            if (pVtbScan->curOrgTbVg == NULL) {
6,784✔
2643
              pVtbScan->curOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
424✔
2644
              QUERY_CHECK_NULL(pVtbScan->curOrgTbVg, code, line, _return, terrno)
424✔
2645
            }
2646
            int32_t vgId;
6,784✔
2647
            code = getVgIdFromColref(pOperator, info.colrefName, &vgId);
6,784✔
2648
            QUERY_CHECK_CODE(code, line, _return);
6,784✔
2649
            code = taosHashPut(pVtbScan->curOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
6,784✔
2650
            QUERY_CHECK_CODE(code, line, _return);
6,784✔
2651
          }
2652

2653
          QUERY_CHECK_NULL(taosArrayPush(pInfo->vtbScan.colRefInfo, &info), code, line, _return, terrno)
1,295,934✔
2654
        }
2655
      }
2656
    }
2657
  }
2658
  code = processOrgTbVg(pVtbScan, pTaskInfo, rversion);
130,389✔
2659
  QUERY_CHECK_CODE(code, line, _return);
130,389✔
2660

2661
_return:
129,117✔
2662
  if (code) {
130,389✔
2663
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
1,272✔
2664
  }
2665
  return code;
130,389✔
2666
}
2667

2668
int32_t virtualTableScanCheckNeedRedeploy(SOperatorInfo* pOperator) {
855,844✔
2669
  int32_t                    code = TSDB_CODE_SUCCESS;
855,844✔
2670
  int32_t                    line = 0;
855,844✔
2671
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
855,844✔
2672
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
855,844✔
2673
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
856,056✔
2674

2675
  SArray *tmpArray = NULL;
855,844✔
2676
  tmpArray = atomic_load_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo);
855,844✔
2677
  if (tmpArray && tmpArray == atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo, tmpArray, NULL)) {
855,844✔
2678
    for (int32_t i = 0; i < taosArrayGetSize(tmpArray); i++) {
4,664✔
2679
      SStreamTaskAddr* pTaskAddr = (SStreamTaskAddr*)taosArrayGet(tmpArray, i);
2,332✔
2680
      code = taosHashPut(pVtbScan->existOrgTbVg, &pTaskAddr->nodeId, sizeof(pTaskAddr->nodeId), NULL, 0);
2,332✔
2681
      QUERY_CHECK_CODE(code, line, _return);
2,332✔
2682
      if (pVtbScan->newAddedVgInfo == NULL) {
2,332✔
2683
        pVtbScan->newAddedVgInfo = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
848✔
2684
        QUERY_CHECK_NULL(pVtbScan->newAddedVgInfo, code, line, _return, terrno)
848✔
2685
      }
2686
      code = taosHashPut(pVtbScan->newAddedVgInfo, &pTaskAddr->nodeId, sizeof(pTaskAddr->nodeId), pTaskAddr, sizeof(SStreamTaskAddr));
2,332✔
2687
      QUERY_CHECK_CODE(code, line, _return);
2,332✔
2688
    }
2689
    pVtbScan->needRedeploy = false;
2,332✔
2690
  } else {
2691
    code = TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY;
853,512✔
2692
    QUERY_CHECK_CODE(code, line, _return);
853,512✔
2693
  }
2694

2695
_return:
×
2696
  taosArrayClear(tmpArray);
855,844✔
2697
  taosArrayDestroy(tmpArray);
855,844✔
2698
  if (code) {
855,844✔
2699
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
853,724✔
2700
  }
2701
  return code;
856,056✔
2702
}
2703

2704
int32_t virtualTableScanBuildDownStreamOpParam(SOperatorInfo* pOperator, tb_uid_t uid, int32_t vgId) {
3,619,723✔
2705
  int32_t                    code = TSDB_CODE_SUCCESS;
3,619,723✔
2706
  int32_t                    line = 0;
3,619,723✔
2707
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
3,619,723✔
2708
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
3,619,723✔
2709
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
3,619,723✔
2710

2711
  pVtbScan->vtbScanParam = NULL;
3,619,723✔
2712
  code = buildVtbScanOperatorParam(pInfo, &pVtbScan->vtbScanParam, uid);
3,619,723✔
2713
  QUERY_CHECK_CODE(code, line, _return);
3,619,723✔
2714

2715
  void* pIter = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, NULL);
3,619,723✔
2716
  while (pIter != NULL) {
9,117,082✔
2717
    SOrgTbInfo*      pMap = (SOrgTbInfo*)pIter;
5,497,359✔
2718
    SOperatorParam*  pExchangeParam = NULL;
5,497,359✔
2719
    SStreamTaskAddr* addr = taosHashGet(pVtbScan->newAddedVgInfo, &pMap->vgId, sizeof(pMap->vgId));
5,497,359✔
2720
    if (addr != NULL) {
5,497,359✔
2721
      SDownstreamSourceNode newSource = {0};
2,332✔
2722
      newSource.type = QUERY_NODE_DOWNSTREAM_SOURCE;
2,332✔
2723
      newSource.clientId = pTaskInfo->id.taskId;// current task's taskid
2,332✔
2724
      newSource.taskId = addr->taskId;
2,332✔
2725
      newSource.fetchMsgType = TDMT_STREAM_FETCH;
2,332✔
2726
      newSource.localExec = false;
2,332✔
2727
      newSource.addr.nodeId = addr->nodeId;
2,332✔
2728
      memcpy(&newSource.addr.epSet, &addr->epset, sizeof(SEpSet));
2,332✔
2729

2730
      code = buildExchangeOperatorParamForVScan(&pExchangeParam, 0, pMap, &newSource);
2,332✔
2731
      QUERY_CHECK_CODE(code, line, _return);
2,332✔
2732
      code = taosHashRemove(pVtbScan->newAddedVgInfo, &pMap->vgId, sizeof(pMap->vgId));
2,332✔
2733
      QUERY_CHECK_CODE(code, line, _return);
2,332✔
2734
    } else {
2735
      code = buildExchangeOperatorParamForVScan(&pExchangeParam, 0, pMap, NULL);
5,495,027✔
2736
      QUERY_CHECK_CODE(code, line, _return);
5,495,027✔
2737
    }
2738
    QUERY_CHECK_NULL(taosArrayPush(((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pOpParamArray, &pExchangeParam), code, line, _return, terrno)
10,994,718✔
2739
    pIter = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, pIter);
5,497,359✔
2740
  }
2741

2742
  SOperatorParam*  pExchangeParam = NULL;
3,619,723✔
2743
  code = buildExchangeOperatorParamForVTagScan(&pExchangeParam, 0, vgId, uid);
3,619,723✔
2744
  QUERY_CHECK_CODE(code, line, _return);
3,619,723✔
2745
  ((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pTagScanOp = pExchangeParam;
3,619,723✔
2746

2747
_return:
3,619,723✔
2748
  if (code) {
3,619,723✔
2749
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
2750
  }
2751
  return code;
3,619,723✔
2752
}
2753

2754
int32_t virtualTableScanGetNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
9,828,231✔
2755
  int32_t                    code = TSDB_CODE_SUCCESS;
9,828,231✔
2756
  int32_t                    line = 0;
9,828,231✔
2757
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
9,828,231✔
2758
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
9,828,231✔
2759
  SOperatorInfo*             pVtbScanOp = pOperator->pDownstream[0];
9,828,231✔
2760

2761
  pVtbScan->otbNameToOtbInfoMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
9,828,231✔
2762
  QUERY_CHECK_NULL(pVtbScan->otbNameToOtbInfoMap, code, line, _return, terrno)
9,828,231✔
2763
  taosHashSetFreeFp(pVtbScan->otbNameToOtbInfoMap, destroySOrgTbInfo);
9,828,231✔
2764

2765
  while (true) {
2766
    if (pVtbScan->curTableIdx == pVtbScan->lastTableIdx) {
12,263,915✔
2767
      code = pVtbScanOp->fpSet.getNextFn(pVtbScanOp, pRes);
8,644,192✔
2768
      QUERY_CHECK_CODE(code, line, _return);
8,644,192✔
2769
    } else {
2770
      taosHashClear(pVtbScan->otbNameToOtbInfoMap);
3,619,723✔
2771
      SArray* pColRefInfo = NULL;
3,619,723✔
2772
      if (pVtbScan->isSuperTable) {
3,619,723✔
2773
        pColRefInfo = (SArray*)taosArrayGetP(pVtbScan->childTableList, pVtbScan->curTableIdx);
3,490,606✔
2774
      } else {
2775
        pColRefInfo = pInfo->vtbScan.colRefInfo;
129,117✔
2776
      }
2777
      QUERY_CHECK_NULL(pColRefInfo, code, line, _return, terrno)
3,619,723✔
2778

2779
      tb_uid_t uid = 0;
3,619,723✔
2780
      int32_t  vgId = 0;
3,619,723✔
2781
      code = virtualTableScanProcessColRefInfo(pOperator, pColRefInfo, &uid, &vgId);
3,619,723✔
2782
      QUERY_CHECK_CODE(code, line, _return);
3,619,723✔
2783

2784
      code = virtualTableScanBuildDownStreamOpParam(pOperator, uid, vgId);
3,619,723✔
2785
      QUERY_CHECK_CODE(code, line, _return);
3,619,723✔
2786

2787
      // reset downstream operator's status
2788
      pVtbScanOp->status = OP_NOT_OPENED;
3,619,723✔
2789
      code = pVtbScanOp->fpSet.getNextExtFn(pVtbScanOp, pVtbScan->vtbScanParam, pRes);
3,619,723✔
2790
      QUERY_CHECK_CODE(code, line, _return);
3,619,102✔
2791
    }
2792

2793
    if (*pRes) {
12,263,294✔
2794
      // has result, still read data from this table.
2795
      pVtbScan->lastTableIdx = pVtbScan->curTableIdx;
8,648,752✔
2796
      break;
8,648,752✔
2797
    } else {
2798
      // no result, read next table.
2799
      pVtbScan->curTableIdx++;
3,614,542✔
2800
      if (pVtbScan->isSuperTable) {
3,614,542✔
2801
        if (pVtbScan->curTableIdx >= taosArrayGetSize(pVtbScan->childTableList)) {
3,485,425✔
2802
          setOperatorCompleted(pOperator);
1,049,741✔
2803
          break;
1,049,741✔
2804
        }
2805
      } else {
2806
        setOperatorCompleted(pOperator);
129,117✔
2807
        break;
129,117✔
2808
      }
2809
    }
2810
  }
2811

2812
_return:
9,827,610✔
2813
  taosHashCleanup(pVtbScan->otbNameToOtbInfoMap);
9,827,610✔
2814
  pVtbScan->otbNameToOtbInfoMap = NULL;
9,827,610✔
2815
  if (code) {
9,827,610✔
2816
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
2817
  }
2818
  return code;
9,827,610✔
2819
}
2820

2821
int32_t vtbScanOpen(SOperatorInfo* pOperator) {
9,871,423✔
2822
  int32_t                    code = TSDB_CODE_SUCCESS;
9,871,423✔
2823
  int32_t                    line = 0;
9,871,423✔
2824
  int64_t                    st = 0;
9,871,423✔
2825
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
9,871,423✔
2826
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
9,871,423✔
2827

2828
  if (OPTR_IS_OPENED(pOperator)) {
9,871,423✔
2829
    return code;
8,644,192✔
2830
  }
2831

2832
  if (pOperator->cost.openCost == 0) {
1,227,231✔
2833
    st = taosGetTimestampUs();
1,131,686✔
2834
  }
2835

2836
  if (pVtbScan->isSuperTable) {
1,227,231✔
2837
    code = buildVirtualSuperTableScanChildTableMap(pOperator);
1,096,842✔
2838
    QUERY_CHECK_CODE(code, line, _return);
1,096,842✔
2839
  } else {
2840
    code = buildVirtualNormalChildTableScanChildTableMap(pOperator);
130,389✔
2841
    QUERY_CHECK_CODE(code, line, _return);
130,389✔
2842
  }
2843

2844
  OPTR_SET_OPENED(pOperator);
1,224,899✔
2845

2846
_return:
1,227,231✔
2847
  if (pOperator->cost.openCost == 0) {
1,227,231✔
2848
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
1,131,686✔
2849
  }
2850
  if (code) {
1,227,231✔
2851
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
2,332✔
2852
    pOperator->pTaskInfo->code = code;
2,332✔
2853
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
2,332✔
2854
  }
2855
  return code;
1,224,899✔
2856
}
2857

2858
int32_t vtbScanNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
10,724,935✔
2859
  int32_t                    code = TSDB_CODE_SUCCESS;
10,724,935✔
2860
  int32_t                    line = 0;
10,724,935✔
2861
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
10,724,935✔
2862
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
10,725,147✔
2863

2864
  QRY_PARAM_CHECK(pRes);
10,725,147✔
2865
  if (pOperator->status == OP_EXEC_DONE && !pOperator->pOperatorGetParam) {
10,725,147✔
2866
    return code;
×
2867
  }
2868
  if (pOperator->pOperatorGetParam) {
10,725,147✔
2869
    if (pOperator->status == OP_EXEC_DONE) {
×
2870
      pOperator->status = OP_OPENED;
×
2871
    }
2872
    pVtbScan->curTableIdx = 0;
×
2873
    pVtbScan->lastTableIdx = -1;
×
2874
    pVtbScan->window = ((SDynQueryCtrlOperatorParam *)(pOperator->pOperatorGetParam)->value)->window;
×
2875
    pOperator->pOperatorGetParam = NULL;
×
2876
  } else {
2877
    pVtbScan->window.skey = INT64_MAX;
10,725,147✔
2878
    pVtbScan->window.ekey = INT64_MIN;
10,724,935✔
2879
  }
2880

2881
  if (pVtbScan->needRedeploy) {
10,724,935✔
2882
    code = virtualTableScanCheckNeedRedeploy(pOperator);
856,056✔
2883
    QUERY_CHECK_CODE(code, line, _return);
856,056✔
2884
  }
2885

2886
  code = pOperator->fpSet._openFn(pOperator);
9,871,211✔
2887
  QUERY_CHECK_CODE(code, line, _return);
9,869,091✔
2888

2889
  if (pVtbScan->isSuperTable && taosArrayGetSize(pVtbScan->childTableList) == 0) {
9,869,091✔
2890
    setOperatorCompleted(pOperator);
40,860✔
2891
    return code;
40,860✔
2892
  }
2893

2894
  code = virtualTableScanGetNext(pOperator, pRes);
9,828,231✔
2895
  QUERY_CHECK_CODE(code, line, _return);
9,827,610✔
2896

2897
  return code;
9,827,610✔
2898

2899
_return:
853,724✔
2900
  if (code) {
853,724✔
2901
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
853,724✔
2902
    pOperator->pTaskInfo->code = code;
853,724✔
2903
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
853,724✔
2904
  }
2905
  return code;
×
2906
}
2907

2908
int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) {
1,140,780✔
2909
  if (batchFetch) {
1,140,780✔
2910
    pPrev->leftHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
1,139,679✔
2911
    if (NULL == pPrev->leftHash) {
1,139,679✔
2912
      return terrno;
×
2913
    }
2914
    pPrev->rightHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
1,139,679✔
2915
    if (NULL == pPrev->rightHash) {
1,139,679✔
2916
      return terrno;
×
2917
    }
2918
  } else {
2919
    pPrev->leftCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
1,101✔
2920
    if (NULL == pPrev->leftCache) {
1,101✔
2921
      return terrno;
×
2922
    }
2923
    pPrev->rightCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
1,101✔
2924
    if (NULL == pPrev->rightCache) {
1,101✔
2925
      return terrno;
×
2926
    }
2927
    pPrev->onceTable = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
1,101✔
2928
    if (NULL == pPrev->onceTable) {
1,101✔
2929
      return terrno;
×
2930
    }
2931
  }
2932

2933
  return TSDB_CODE_SUCCESS;
1,140,780✔
2934
}
2935

2936
static void updateDynTbUidIfNeeded(SVtbScanDynCtrlInfo* pVtbScan, SStreamRuntimeInfo* pStreamRuntimeInfo) {
×
2937
  if (pStreamRuntimeInfo == NULL) {
×
2938
    return;
×
2939
  }
2940

2941
  SArray* vals = pStreamRuntimeInfo->funcInfo.pStreamPartColVals;
×
2942
  for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
×
2943
    SStreamGroupValue* pValue = taosArrayGet(vals, i);
×
2944
    if (pValue != NULL && pValue->isTbname && pValue->uid != pVtbScan->dynTbUid) {
×
2945
      qTrace("dynQueryCtrl dyn tb uid:%" PRIu64 " reset to:%" PRIu64, pVtbScan->dynTbUid, pValue->uid);
×
2946

2947
      pVtbScan->dynTbUid = pValue->uid;
×
2948
      break;
×
2949
    }
2950
  }
2951
}
2952

2953
static int32_t initVtbScanInfo(SDynQueryCtrlOperatorInfo* pInfo, SMsgCb* pMsgCb,
1,530,988✔
2954
                               SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) {
2955
  int32_t      code = TSDB_CODE_SUCCESS;
1,530,988✔
2956
  int32_t      line = 0;
1,530,988✔
2957

2958
  code = tsem_init(&pInfo->vtbScan.ready, 0, 0);
1,530,988✔
2959
  QUERY_CHECK_CODE(code, line, _return);
1,530,988✔
2960

2961
  pInfo->vtbScan.genNewParam = true;
1,530,988✔
2962
  pInfo->vtbScan.batchProcessChild = pPhyciNode->vtbScan.batchProcessChild;
1,530,988✔
2963
  pInfo->vtbScan.hasPartition = pPhyciNode->vtbScan.hasPartition;
1,530,988✔
2964
  pInfo->vtbScan.scanAllCols = pPhyciNode->vtbScan.scanAllCols;
1,530,988✔
2965
  pInfo->vtbScan.isSuperTable = pPhyciNode->vtbScan.isSuperTable;
1,530,988✔
2966
  pInfo->vtbScan.rversion = pPhyciNode->vtbScan.rversion;
1,530,988✔
2967
  pInfo->vtbScan.uid = pPhyciNode->vtbScan.uid;
1,530,988✔
2968
  pInfo->vtbScan.suid = pPhyciNode->vtbScan.suid;
1,530,988✔
2969
  pInfo->vtbScan.epSet = pPhyciNode->vtbScan.mgmtEpSet;
1,530,988✔
2970
  pInfo->vtbScan.acctId = pPhyciNode->vtbScan.accountId;
1,530,988✔
2971
  pInfo->vtbScan.needRedeploy = false;
1,530,988✔
2972
  pInfo->vtbScan.pMsgCb = pMsgCb;
1,530,988✔
2973
  pInfo->vtbScan.curTableIdx = 0;
1,530,988✔
2974
  pInfo->vtbScan.lastTableIdx = -1;
1,530,988✔
2975
  pInfo->vtbScan.dynTbUid = 0;
1,530,988✔
2976
  pInfo->vtbScan.dbName = taosStrdup(pPhyciNode->vtbScan.dbName);
1,530,988✔
2977
  pInfo->vtbScan.tbName = taosStrdup(pPhyciNode->vtbScan.tbName);
1,530,988✔
2978
  QUERY_CHECK_NULL(pInfo->vtbScan.dbName, code, line, _return, terrno)
1,530,988✔
2979
  QUERY_CHECK_NULL(pInfo->vtbScan.tbName, code, line, _return, terrno)
1,530,988✔
2980
  pInfo->vtbScan.existOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
1,530,988✔
2981
  QUERY_CHECK_NULL(pInfo->vtbScan.existOrgTbVg, code, line, _return, terrno)
1,530,988✔
2982
  for (int32_t i = 0; i < LIST_LENGTH(pPhyciNode->vtbScan.pOrgVgIds); ++i) {
3,984,776✔
2983
    SValueNode* valueNode = (SValueNode*)nodesListGetNode(pPhyciNode->vtbScan.pOrgVgIds, i);
2,453,788✔
2984
    int32_t vgId = (int32_t)valueNode->datum.i;
2,453,788✔
2985
    code = taosHashPut(pInfo->vtbScan.existOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
2,453,788✔
2986
    QUERY_CHECK_CODE(code, line, _return);
2,453,788✔
2987
  }
2988

2989
  if (pPhyciNode->dynTbname && pTaskInfo) {
1,530,988✔
2990
    updateDynTbUidIfNeeded(&pInfo->vtbScan, pTaskInfo->pStreamRuntimeInfo);
×
2991
  }
2992

2993
  pInfo->vtbScan.readColList = taosArrayInit(LIST_LENGTH(pPhyciNode->vtbScan.pScanCols), sizeof(col_id_t));
1,530,988✔
2994
  QUERY_CHECK_NULL(pInfo->vtbScan.readColList, code, line, _return, terrno)
1,530,988✔
2995

2996
  for (int32_t i = 0; i < LIST_LENGTH(pPhyciNode->vtbScan.pScanCols); ++i) {
12,108,073✔
2997
    SColumnNode* pNode = (SColumnNode*)nodesListGetNode(pPhyciNode->vtbScan.pScanCols, i);
10,577,085✔
2998
    QUERY_CHECK_NULL(pNode, code, line, _return, terrno)
10,577,085✔
2999
    QUERY_CHECK_NULL(taosArrayPush(pInfo->vtbScan.readColList, &pNode->colId), code, line, _return, terrno)
21,154,170✔
3000
  }
3001

3002
  pInfo->vtbScan.childTableList = taosArrayInit(10, POINTER_BYTES);
1,530,988✔
3003
  QUERY_CHECK_NULL(pInfo->vtbScan.childTableList, code, line, _return, terrno)
1,530,988✔
3004

3005
  pInfo->vtbScan.dbVgInfoMap = taosHashInit(taosArrayGetSize(pInfo->vtbScan.childTableList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
1,530,988✔
3006
  QUERY_CHECK_NULL(pInfo->vtbScan.dbVgInfoMap, code, line, _return, terrno)
1,530,988✔
3007

3008
  pInfo->vtbScan.otbNameToOtbInfoMap = NULL;
1,530,988✔
3009
  pInfo->vtbScan.otbVgIdToOtbInfoArrayMap = NULL;
1,530,988✔
3010
  pInfo->vtbScan.vtbUidToVgIdMapMap = NULL;
1,530,988✔
3011
  pInfo->vtbScan.vtbGroupIdToVgIdMapMap = NULL;
1,530,988✔
3012
  pInfo->vtbScan.vtbUidTagListMap = NULL;
1,530,988✔
3013
  pInfo->vtbScan.vtbGroupIdTagListMap = NULL;
1,530,988✔
3014
  pInfo->vtbScan.vtbUidToGroupIdMap = NULL;
1,530,988✔
3015

3016
  return code;
1,530,988✔
3017
_return:
×
3018
  // no need to destroy array and hashmap allocated in this function,
3019
  // since the operator's destroy function will take care of it
3020
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
3021
  return code;
×
3022
}
3023

3024
static int32_t initVtbWindowInfo(SDynQueryCtrlOperatorInfo* pInfo, SDynQueryCtrlPhysiNode* pPhyciNode,
377,610✔
3025
                                 SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) {
3026
  int32_t              code = TSDB_CODE_SUCCESS;
377,610✔
3027
  int32_t              line = 0;
377,610✔
3028
  SDataBlockDescNode*  pDescNode = pPhyciNode->node.pOutputDataBlockDesc;
377,610✔
3029

3030
  pInfo->vtbWindow.wstartSlotId = pPhyciNode->vtbWindow.wstartSlotId;
377,610✔
3031
  pInfo->vtbWindow.wendSlotId = pPhyciNode->vtbWindow.wendSlotId;
377,610✔
3032
  pInfo->vtbWindow.wdurationSlotId = pPhyciNode->vtbWindow.wdurationSlotId;
377,610✔
3033
  pInfo->vtbWindow.pTargets = pPhyciNode->vtbWindow.pTargets;
377,610✔
3034
  pInfo->vtbWindow.isVstb = pPhyciNode->vtbWindow.isVstb;
377,610✔
3035
  pInfo->vtbWindow.extendOption = pPhyciNode->vtbWindow.extendOption;
377,610✔
3036

3037
  pInfo->vtbWindow.pRes = createDataBlockFromDescNode(pDescNode);
377,610✔
3038
  QUERY_CHECK_NULL(pInfo->vtbWindow.pRes, code, line, _return, terrno)
377,610✔
3039

3040
  pInfo->vtbWindow.pWins = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
377,610✔
3041
  QUERY_CHECK_NULL(pInfo->vtbWindow.pWins, code, line, _return, terrno)
377,610✔
3042

3043
  pInfo->vtbWindow.outputWstartSlotId = -1;
377,610✔
3044
  pInfo->vtbWindow.outputWendSlotId = -1;
377,610✔
3045
  pInfo->vtbWindow.outputWdurationSlotId = -1;
377,610✔
3046
  pInfo->vtbWindow.curWinBatchIdx = 0;
377,610✔
3047

3048
  initResultSizeInfo(&pOperator->resultInfo, 1);
377,610✔
3049
  code = blockDataEnsureCapacity(pInfo->vtbWindow.pRes, pOperator->resultInfo.capacity);
377,610✔
3050
  QUERY_CHECK_CODE(code, line, _return);
377,610✔
3051

3052
  return code;
377,610✔
3053
_return:
×
3054
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
3055
  return code;
×
3056
}
3057

3058
static int32_t extractTsCol(SSDataBlock* pBlock, int32_t slotId, TSKEY** ppTsCols) {
1,885,344✔
3059
  int32_t code = TSDB_CODE_SUCCESS;
1,885,344✔
3060
  int32_t lino = 0;
1,885,344✔
3061

3062
  if (pBlock->pDataBlock != NULL && pBlock->info.dataLoad) {
1,885,344✔
3063
    SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, slotId);
1,885,344✔
3064
    QUERY_CHECK_NULL(pColDataInfo, code, lino, _return, terrno)
1,885,344✔
3065

3066
    *ppTsCols = (int64_t*)pColDataInfo->pData;
1,885,344✔
3067

3068
    if ((*ppTsCols)[0] != 0 && (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0)) {
1,885,344✔
3069
      code = blockDataUpdateTsWindow(pBlock, slotId);
183,936✔
3070
      QUERY_CHECK_CODE(code, lino, _return);
183,936✔
3071
    }
3072
  }
3073

3074
  return code;
1,885,344✔
3075
_return:
×
3076
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3077
  return code;
×
3078
}
3079

3080
int32_t vtbWindowOpen(SOperatorInfo* pOperator) {
400,602✔
3081
  int32_t                    code = TSDB_CODE_SUCCESS;
400,602✔
3082
  int32_t                    lino = 0;
400,602✔
3083
  SDynQueryCtrlOperatorInfo* pDynInfo = pOperator->info;
400,602✔
3084
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
400,602✔
3085
  SVtbWindowDynCtrlInfo*     pInfo = &pDynInfo->vtbWindow;
400,602✔
3086
  int64_t                    st = 0;
400,602✔
3087

3088
  if (OPTR_IS_OPENED(pOperator)) {
400,602✔
3089
    return code;
22,992✔
3090
  }
3091

3092
  if (pOperator->cost.openCost == 0) {
377,610✔
3093
    st = taosGetTimestampUs();
377,610✔
3094
  }
3095

3096
  while (1) {
942,672✔
3097
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
1,320,282✔
3098
    if (pBlock == NULL) {
1,320,282✔
3099
      break;
377,610✔
3100
    }
3101

3102
    if (pInfo->outputWendSlotId == -1 && pInfo->outputWstartSlotId == -1 && pInfo->outputWdurationSlotId == -1) {
942,672✔
3103
      for (int32_t i = 0; i < LIST_LENGTH(pInfo->pTargets); ++i) {
2,726,160✔
3104
        STargetNode* pNode = (STargetNode*)nodesListGetNode(pInfo->pTargets, i);
2,348,550✔
3105
        if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode->pExpr)->dataBlockId == pBlock->info.id.blockId) {
2,348,550✔
3106
          if (((SColumnNode*)pNode->pExpr)->slotId == pDynInfo->vtbWindow.wstartSlotId) {
617,070✔
3107
            pInfo->outputWstartSlotId = i;
230,250✔
3108
          } else if (((SColumnNode*)pNode->pExpr)->slotId == pDynInfo->vtbWindow.wendSlotId) {
386,820✔
3109
            pInfo->outputWendSlotId = i;
230,250✔
3110
          } else if (((SColumnNode*)pNode->pExpr)->slotId == pDynInfo->vtbWindow.wdurationSlotId) {
156,570✔
3111
            pInfo->outputWdurationSlotId = i;
156,570✔
3112
          }
3113
        }
3114
      }
3115
    }
3116

3117
    TSKEY* wstartCol = NULL;
942,672✔
3118
    TSKEY* wendCol = NULL;
942,672✔
3119

3120
    code = extractTsCol(pBlock, pDynInfo->vtbWindow.wstartSlotId, &wstartCol);
942,672✔
3121
    QUERY_CHECK_CODE(code, lino, _return);
942,672✔
3122
    code = extractTsCol(pBlock, pDynInfo->vtbWindow.wendSlotId, &wendCol);
942,672✔
3123
    QUERY_CHECK_CODE(code, lino, _return);
942,672✔
3124

3125
    SArray* pWin = taosArrayInit(pBlock->info.rows, sizeof(SExtWinTimeWindow));
942,672✔
3126
    QUERY_CHECK_NULL(pWin, code, lino, _return, terrno)
942,672✔
3127

3128
    QUERY_CHECK_NULL(taosArrayReserve(pWin, pBlock->info.rows), code, lino, _return, terrno);
942,672✔
3129

3130
    for (int32_t i = 0; i < pBlock->info.rows; i++) {
2,147,483,647✔
3131
      SExtWinTimeWindow* pWindow = taosArrayGet(pWin, i);
2,147,483,647✔
3132
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
2,147,483,647✔
3133
      pWindow->tw.skey = wstartCol[i];
2,147,483,647✔
3134
      pWindow->tw.ekey = wendCol[i] + 1;
2,147,483,647✔
3135
      pWindow->winOutIdx = -1;
2,147,483,647✔
3136
    }
3137

3138
    QUERY_CHECK_NULL(taosArrayPush(pDynInfo->vtbWindow.pWins, &pWin), code, lino, _return, terrno);
1,885,344✔
3139
  }
3140

3141
  // handle first window's start key and last window's end key
3142
  SArray* firstBatch = (SArray*)taosArrayGetP(pDynInfo->vtbWindow.pWins, 0);
377,610✔
3143
  SArray* lastBatch = (SArray*)taosArrayGetP(pDynInfo->vtbWindow.pWins, taosArrayGetSize(pDynInfo->vtbWindow.pWins) - 1);
377,610✔
3144

3145
  QUERY_CHECK_NULL(firstBatch, code, lino, _return, terrno)
377,610✔
3146
  QUERY_CHECK_NULL(lastBatch, code, lino, _return, terrno)
377,610✔
3147

3148
  SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(firstBatch, 0);
377,610✔
3149
  SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(lastBatch);
377,610✔
3150

3151
  QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
377,610✔
3152
  QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)
377,610✔
3153

3154
  if (pInfo->extendOption == STATE_WIN_EXTEND_OPTION_BACKWARD) {
377,610✔
3155
    lastWin->tw.ekey = INT64_MAX;
94,628✔
3156
  }
3157
  if (pInfo->extendOption == STATE_WIN_EXTEND_OPTION_FORWARD) {
377,610✔
3158
    firstWin->tw.skey = INT64_MIN;
141,491✔
3159
  }
3160

3161
  if (pInfo->isVstb) {
377,610✔
3162
    code = buildVirtualSuperTableScanChildTableMap(pOperator);
93,726✔
3163
    QUERY_CHECK_CODE(code, lino, _return);
93,726✔
3164
  }
3165

3166
  OPTR_SET_OPENED(pOperator);
377,610✔
3167

3168
  if (pOperator->cost.openCost == 0) {
377,610✔
3169
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
377,610✔
3170
  }
3171

3172
_return:
×
3173
  if (code != TSDB_CODE_SUCCESS) {
377,610✔
3174
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3175
    pTaskInfo->code = code;
×
3176
    T_LONG_JMP(pTaskInfo->env, code);
×
3177
  }
3178
  return code;
377,610✔
3179
}
3180

3181
static int32_t buildExternalWindowOperatorParamEx(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins, int32_t idx) {
100,584✔
3182
  int32_t                       code = TSDB_CODE_SUCCESS;
100,584✔
3183
  int32_t                       lino = 0;
100,584✔
3184
  SExternalWindowOperatorParam* pExtWinOp = NULL;
100,584✔
3185

3186
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
100,584✔
3187
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
100,584✔
3188

3189
  pExtWinOp = taosMemoryMalloc(sizeof(SExternalWindowOperatorParam));
100,584✔
3190
  QUERY_CHECK_NULL(pExtWinOp, code, lino, _return, terrno)
100,584✔
3191

3192
  pExtWinOp->ExtWins = taosArrayDup(pWins, NULL);
100,584✔
3193
  QUERY_CHECK_NULL(pExtWinOp->ExtWins, code, lino, _return, terrno)
100,584✔
3194

3195
  SExtWinTimeWindow *firstWin = (SExtWinTimeWindow *)taosArrayGet(pWins, 0);
100,584✔
3196
  SExtWinTimeWindow *lastWin = (SExtWinTimeWindow *)taosArrayGetLast(pWins);
100,584✔
3197

3198
  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
100,584✔
3199
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)
100,584✔
3200

3201
  SOperatorParam* pExchangeParam = NULL;
100,584✔
3202
  code = buildBatchExchangeOperatorParamForVirtual(&pExchangeParam, 0, NULL, 0, pInfo->vtbScan.otbVgIdToOtbInfoArrayMap, (STimeWindow){.skey = firstWin->tw.skey, .ekey = lastWin->tw.ekey}, EX_SRC_TYPE_VSTB_WIN_SCAN);
100,584✔
3203
  QUERY_CHECK_CODE(code, lino, _return);
100,584✔
3204

3205
  QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pExchangeParam), code, lino, _return, terrno)
201,168✔
3206

3207
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW;
100,584✔
3208
  (*ppRes)->downstreamIdx = idx;
100,584✔
3209
  (*ppRes)->value = pExtWinOp;
100,584✔
3210
  (*ppRes)->reUse = false;
100,584✔
3211

3212
  return code;
100,584✔
3213
_return:
×
3214
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3215
  if (pExtWinOp) {
×
3216
    if (pExtWinOp->ExtWins) {
×
3217
      taosArrayDestroy(pExtWinOp->ExtWins);
×
3218
    }
3219
    taosMemoryFree(pExtWinOp);
×
3220
  }
3221
  if (*ppRes) {
×
3222
    if ((*ppRes)->pChildren) {
×
3223
      for (int32_t i = 0; i < taosArrayGetSize((*ppRes)->pChildren); i++) {
×
3224
        SOperatorParam* pChildParam = (SOperatorParam*)taosArrayGet((*ppRes)->pChildren, i);
×
3225
        if (pChildParam) {
×
3226
          SDynQueryCtrlOperatorParam* pDynParam = (SDynQueryCtrlOperatorParam*)pChildParam->value;
×
3227
          if (pDynParam) {
×
3228
            taosMemoryFree(pDynParam);
×
3229
          }
3230
          taosMemoryFree(pChildParam);
×
3231
        }
3232
      }
3233
      taosArrayDestroy((*ppRes)->pChildren);
×
3234
    }
3235
    taosMemoryFree(*ppRes);
×
3236
    *ppRes = NULL;
×
3237
  }
3238
  return code;
×
3239
}
3240

3241
int32_t vtbWindowNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
400,602✔
3242
  int32_t                    code = TSDB_CODE_SUCCESS;
400,602✔
3243
  int32_t                    lino = 0;
400,602✔
3244
  SDynQueryCtrlOperatorInfo* pDynInfo = pOperator->info;
400,602✔
3245
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
400,602✔
3246
  int64_t                    st = taosGetTimestampUs();
400,602✔
3247
  int32_t                    numOfWins = 0;
400,602✔
3248
  SOperatorInfo*             mergeOp = NULL;
400,602✔
3249
  SOperatorInfo*             extWinOp = NULL;
400,602✔
3250
  SOperatorParam*            pMergeParam = NULL;
400,602✔
3251
  SOperatorParam*            pExtWinParam = NULL;
400,602✔
3252
  SVtbWindowDynCtrlInfo*     pInfo = &pDynInfo->vtbWindow;
400,602✔
3253
  SSDataBlock*               pRes = pInfo->pRes;
400,602✔
3254

3255
  code = pOperator->fpSet._openFn(pOperator);
400,602✔
3256
  QUERY_CHECK_CODE(code, lino, _return);
400,602✔
3257

3258
  if (pInfo->curWinBatchIdx >= taosArrayGetSize(pInfo->pWins)) {
400,602✔
3259
    *ppRes = NULL;
9,210✔
3260
    return code;
9,210✔
3261
  }
3262

3263
  SArray* pWinArray = (SArray*)taosArrayGetP(pInfo->pWins, pInfo->curWinBatchIdx);
391,392✔
3264
  QUERY_CHECK_NULL(pWinArray, code, lino, _return, terrno)
391,392✔
3265

3266
  numOfWins = (int32_t)taosArrayGetSize(pWinArray);
391,392✔
3267

3268
  if (pInfo->isVstb) {
391,392✔
3269
    extWinOp = pOperator->pDownstream[2];
100,584✔
3270
    code = buildExternalWindowOperatorParamEx(pDynInfo, &pExtWinParam, pWinArray, extWinOp->numOfDownstream);
100,584✔
3271
    QUERY_CHECK_CODE(code, lino, _return);
100,584✔
3272

3273
    SSDataBlock* pExtWinBlock = NULL;
100,584✔
3274
    code = extWinOp->fpSet.getNextExtFn(extWinOp, pExtWinParam, &pExtWinBlock);
100,584✔
3275
    QUERY_CHECK_CODE(code, lino, _return);
100,584✔
3276
    setOperatorCompleted(extWinOp);
100,584✔
3277

3278
    blockDataCleanup(pRes);
100,584✔
3279
    code = blockDataEnsureCapacity(pRes, numOfWins);
100,584✔
3280
    QUERY_CHECK_CODE(code, lino, _return);
100,584✔
3281

3282
    if (pExtWinBlock) {
100,584✔
3283
      code = copyColumnsValue(pInfo->pTargets, pExtWinBlock->info.id.blockId, pRes, pExtWinBlock, numOfWins);
100,584✔
3284
      QUERY_CHECK_CODE(code, lino, _return);
100,584✔
3285

3286
      if (pInfo->curWinBatchIdx == 0) {
100,584✔
3287
        // first batch, get _wstart from pMergedBlock
3288
        SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(taosArrayGetP(pInfo->pWins, 0), 0);
93,726✔
3289
        QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
93,726✔
3290

3291
        firstWin->tw.skey = pExtWinBlock->info.window.skey;
93,726✔
3292
      }
3293
      if (pInfo->curWinBatchIdx == taosArrayGetSize(pInfo->pWins) - 1) {
100,584✔
3294
        // last batch, get _wend from pMergedBlock
3295
        SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(taosArrayGetP(pInfo->pWins, taosArrayGetSize(pInfo->pWins) - 1));
2,286✔
3296
        QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)
2,286✔
3297

3298
        lastWin->tw.ekey = pExtWinBlock->info.window.ekey + 1;
2,286✔
3299
      }
3300
    }
3301
  } else {
3302
    mergeOp = pOperator->pDownstream[1];
290,808✔
3303
    code = buildMergeOperatorParam(pDynInfo, &pMergeParam, pWinArray, mergeOp->numOfDownstream, numOfWins);
290,808✔
3304
    QUERY_CHECK_CODE(code, lino, _return);
290,808✔
3305

3306
    SSDataBlock* pMergedBlock = NULL;
290,808✔
3307
    code = mergeOp->fpSet.getNextExtFn(mergeOp, pMergeParam, &pMergedBlock);
290,808✔
3308
    QUERY_CHECK_CODE(code, lino, _return);
290,808✔
3309

3310
    blockDataCleanup(pRes);
290,808✔
3311
    code = blockDataEnsureCapacity(pRes, numOfWins);
290,808✔
3312
    QUERY_CHECK_CODE(code, lino, _return);
290,808✔
3313

3314
    if (pMergedBlock) {
290,808✔
3315
      code = copyColumnsValue(pInfo->pTargets, pMergedBlock->info.id.blockId, pRes, pMergedBlock, numOfWins);
290,808✔
3316
      QUERY_CHECK_CODE(code, lino, _return);
290,808✔
3317

3318
      if (pInfo->curWinBatchIdx == 0) {
290,808✔
3319
        // first batch, get _wstart from pMergedBlock
3320
        SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(taosArrayGetP(pInfo->pWins, 0), 0);
283,884✔
3321
        QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
283,884✔
3322

3323
        firstWin->tw.skey = pMergedBlock->info.window.skey;
283,884✔
3324
      }
3325
      if (pInfo->curWinBatchIdx == taosArrayGetSize(pInfo->pWins) - 1) {
290,808✔
3326
        // last batch, get _wend from pMergedBlock
3327
        SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(taosArrayGetP(pInfo->pWins, taosArrayGetSize(pInfo->pWins) - 1));
6,924✔
3328
        QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)
6,924✔
3329

3330
        lastWin->tw.ekey = pMergedBlock->info.window.ekey + 1;
6,924✔
3331
      }
3332
    }
3333
  }
3334

3335

3336
  if (pInfo->outputWstartSlotId != -1) {
391,392✔
3337
    SColumnInfoData* pWstartCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWstartSlotId);
244,032✔
3338
    QUERY_CHECK_NULL(pWstartCol, code, lino, _return, terrno)
244,032✔
3339

3340
    for (int32_t i = 0; i < numOfWins; i++) {
737,562,600✔
3341
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
737,318,568✔
3342
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
737,318,568✔
3343
      code = colDataSetVal(pWstartCol, i, (const char*)&pWindow->tw.skey, false);
737,318,568✔
3344
      QUERY_CHECK_CODE(code, lino, _return);
737,318,568✔
3345
    }
3346
  }
3347
  if (pInfo->outputWendSlotId != -1) {
391,392✔
3348
    SColumnInfoData* pWendCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWendSlotId);
244,032✔
3349
    QUERY_CHECK_NULL(pWendCol, code, lino, _return, terrno)
244,032✔
3350

3351
    for (int32_t i = 0; i < numOfWins; i++) {
737,562,600✔
3352
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
737,318,568✔
3353
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
737,318,568✔
3354
      TSKEY ekey = pWindow->tw.ekey - 1;
737,318,568✔
3355
      code = colDataSetVal(pWendCol, i, (const char*)&ekey, false);
737,318,568✔
3356
      QUERY_CHECK_CODE(code, lino, _return);
737,318,568✔
3357
    }
3358
  }
3359
  if (pInfo->outputWdurationSlotId != -1) {
391,392✔
3360
    SColumnInfoData* pWdurationCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWdurationSlotId);
170,352✔
3361
    QUERY_CHECK_NULL(pWdurationCol, code, lino, _return, terrno)
170,352✔
3362

3363
    for (int32_t i = 0; i < numOfWins; i++) {
511,143,960✔
3364
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
510,973,608✔
3365
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
510,973,608✔
3366
      int64_t duration = pWindow->tw.ekey - 1 - pWindow->tw.skey;
510,973,608✔
3367
      code = colDataSetVal(pWdurationCol, i, (const char*)&duration, false);
510,973,608✔
3368
      QUERY_CHECK_CODE(code, lino, _return);
510,973,608✔
3369
    }
3370
  }
3371

3372
  pRes->info.rows = numOfWins;
391,392✔
3373
  *ppRes = pRes;
391,392✔
3374
  pInfo->curWinBatchIdx++;
391,392✔
3375

3376
  return code;
391,392✔
3377

3378
_return:
×
3379
  if (code != TSDB_CODE_SUCCESS) {
×
3380
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3381
    pTaskInfo->code = code;
×
3382
    T_LONG_JMP(pTaskInfo->env, code);
×
3383
  }
3384
  return code;
×
3385
}
3386

3387
static int32_t resetDynQueryCtrlOperState(SOperatorInfo* pOper) {
1,081,497✔
3388
  SDynQueryCtrlOperatorInfo*    pDyn = pOper->info;
1,081,497✔
3389
  SDynQueryCtrlPhysiNode const* pPhyciNode = pOper->pPhyNode;
1,081,921✔
3390
  SExecTaskInfo*                pTaskInfo = pOper->pTaskInfo;
1,081,921✔
3391

3392
  pOper->status = OP_NOT_OPENED;
1,081,921✔
3393

3394
  switch (pDyn->qType) {
1,081,709✔
3395
    case DYN_QTYPE_STB_HASH:{
747✔
3396
      pDyn->stbJoin.execInfo = (SDynQueryCtrlExecInfo){0};
747✔
3397
      SStbJoinDynCtrlInfo* pStbJoin = &pDyn->stbJoin;
747✔
3398
      destroyStbJoinDynCtrlInfo(&pDyn->stbJoin);
747✔
3399
      
3400
      int32_t code = initSeqStbJoinTableHash(&pDyn->stbJoin.ctx.prev, pDyn->stbJoin.basic.batchFetch);
747✔
3401
      if (TSDB_CODE_SUCCESS != code) {
747✔
3402
        qError("initSeqStbJoinTableHash failed since %s", tstrerror(code));
×
3403
        return code;
×
3404
      }
3405
      pStbJoin->ctx.prev.pListHead = NULL;
747✔
3406
      pStbJoin->ctx.prev.joinBuild = false;
747✔
3407
      pStbJoin->ctx.prev.pListTail = NULL;
747✔
3408
      pStbJoin->ctx.prev.tableNum = 0;
747✔
3409

3410
      pStbJoin->ctx.post = (SStbJoinPostJoinCtx){0};
747✔
3411
      break; 
747✔
3412
    }
3413
    case DYN_QTYPE_VTB_SCAN: {
1,081,174✔
3414
      SVtbScanDynCtrlInfo* pVtbScan = &pDyn->vtbScan;
1,081,174✔
3415
      
3416
      if (pVtbScan->otbNameToOtbInfoMap) {
1,080,962✔
3417
        taosHashSetFreeFp(pVtbScan->otbNameToOtbInfoMap, destroySOrgTbInfo);
×
3418
        taosHashCleanup(pVtbScan->otbNameToOtbInfoMap);
×
3419
        pVtbScan->otbNameToOtbInfoMap = NULL;
×
3420
      }
3421
      if (pVtbScan->pRsp) {
1,081,174✔
3422
        tFreeSUsedbRsp(pVtbScan->pRsp);
×
3423
        taosMemoryFreeClear(pVtbScan->pRsp);
×
3424
      }
3425
      if (pVtbScan->colRefInfo) {
1,080,962✔
3426
        taosArrayDestroyEx(pVtbScan->colRefInfo, destroyColRefInfo);
130,389✔
3427
        pVtbScan->colRefInfo = NULL;
130,389✔
3428
      }
3429
      if (pVtbScan->childTableMap) {
1,080,962✔
3430
        taosHashCleanup(pVtbScan->childTableMap);
5,576✔
3431
        pVtbScan->childTableMap = NULL;
5,576✔
3432
      }
3433
      if (pVtbScan->childTableList) {
1,080,750✔
3434
        taosArrayClearEx(pVtbScan->childTableList, destroyColRefArray);
1,080,750✔
3435
      }
3436
      if (pPhyciNode->dynTbname && pTaskInfo) {
1,081,174✔
3437
        updateDynTbUidIfNeeded(pVtbScan, pTaskInfo->pStreamRuntimeInfo);
×
3438
      }
3439
      pVtbScan->curTableIdx = 0;
1,081,174✔
3440
      pVtbScan->lastTableIdx = -1;
1,081,174✔
3441
      break;
1,081,174✔
3442
    }
3443
    case DYN_QTYPE_VTB_WINDOW: {
×
3444
      SVtbWindowDynCtrlInfo* pVtbWindow = &pDyn->vtbWindow;
×
3445
      if (pVtbWindow->pRes) {
×
3446
        blockDataDestroy(pVtbWindow->pRes);
×
3447
        pVtbWindow->pRes = NULL;
×
3448
      }
3449
      if (pVtbWindow->pWins) {
×
3450
        taosArrayDestroyEx(pVtbWindow->pWins, destroyWinArray);
×
3451
        pVtbWindow->pWins = NULL;
×
3452
      }
3453
      pVtbWindow->outputWdurationSlotId = -1;
×
3454
      pVtbWindow->outputWendSlotId = -1;
×
3455
      pVtbWindow->outputWstartSlotId = -1;
×
3456
      pVtbWindow->curWinBatchIdx = 0;
×
3457
      break;
×
3458
    }
3459
    default:
×
3460
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
×
3461
      break;
×
3462
  }
3463
  return 0;
1,081,497✔
3464
}
3465

3466
int32_t vtbAggOpen(SOperatorInfo* pOperator) {
62,828✔
3467
  int32_t                    code = TSDB_CODE_SUCCESS;
62,828✔
3468
  int32_t                    line = 0;
62,828✔
3469
  int64_t                    st = 0;
62,828✔
3470
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
62,828✔
3471

3472
  if (OPTR_IS_OPENED(pOperator)) {
62,828✔
3473
    return code;
41,136✔
3474
  }
3475

3476
  if (pOperator->cost.openCost == 0) {
21,692✔
3477
    st = taosGetTimestampUs();
21,692✔
3478
  }
3479

3480
  code = buildVirtualSuperTableScanChildTableMap(pOperator);
21,692✔
3481
  QUERY_CHECK_CODE(code, line, _return);
21,692✔
3482
  OPTR_SET_OPENED(pOperator);
21,692✔
3483

3484
_return:
21,692✔
3485
  if (pOperator->cost.openCost == 0) {
21,692✔
3486
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
21,692✔
3487
  }
3488
  if (code) {
21,692✔
3489
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
3490
    pOperator->pTaskInfo->code = code;
×
3491
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
3492
  }
3493
  return code;
21,692✔
3494
}
3495

3496
int32_t virtualTableAggGetNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
62,828✔
3497
  int32_t                    code = TSDB_CODE_SUCCESS;
62,828✔
3498
  int32_t                    line = 0;
62,828✔
3499
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
62,828✔
3500
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
62,828✔
3501
  SOperatorInfo*             pAggOp = pOperator->pDownstream[pOperator->numOfDownstream - 1];
62,828✔
3502
  SOperatorInfo*             pTagScanOp = pOperator->pDownstream[1];
62,828✔
3503
  SOperatorParam*            pAggParam = NULL;
62,828✔
3504

3505
  if (pInfo->vtbScan.hasPartition) {
62,828✔
3506
    if (pInfo->vtbScan.batchProcessChild) {
18,368✔
3507
      void* pIter = taosHashIterate(pVtbScan->vtbGroupIdTagListMap, NULL);
18,368✔
3508
      while (pIter) {
27,552✔
3509
        size_t     keyLen = 0;
22,960✔
3510
        uint64_t   groupid = *(uint64_t*)taosHashGetKey(pIter, &keyLen);
22,960✔
3511

3512
        code = buildAggOperatorParamWithGroupId(pInfo, groupid, &pAggParam);
22,960✔
3513
        QUERY_CHECK_CODE(code, line, _return);
22,960✔
3514

3515
        if (pAggParam) {
22,960✔
3516
          code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
22,960✔
3517
          QUERY_CHECK_CODE(code, line, _return);
22,960✔
3518
        } else {
UNCOV
3519
          *pRes = NULL;
×
3520
        }
3521

3522
        pIter = taosHashIterate(pVtbScan->vtbGroupIdTagListMap, pIter);
22,960✔
3523

3524
        if (*pRes) {
22,960✔
3525
          (*pRes)->info.id.groupId = groupid;
13,776✔
3526
          code = taosHashRemove(pVtbScan->vtbGroupIdTagListMap, &groupid, keyLen);
13,776✔
3527
          QUERY_CHECK_CODE(code, line, _return);
13,776✔
3528
          break;
13,776✔
3529
        }
3530
      }
3531
    } else {
UNCOV
3532
      void *pIter = taosHashIterate(pVtbScan->vtbGroupIdTagListMap, NULL);
×
UNCOV
3533
      while (pIter) {
×
UNCOV
3534
        size_t     keyLen = 0;
×
UNCOV
3535
        uint64_t*  groupid = (uint64_t*)taosHashGetKey(pIter, &keyLen);
×
UNCOV
3536
        SHashObj*  vtbUidTagListMap = *(SHashObj**)pIter;
×
3537

UNCOV
3538
        void* pIter2 = taosHashIterate(vtbUidTagListMap, NULL);
×
UNCOV
3539
        while (pIter2) {
×
UNCOV
3540
          size_t   keyLen2 = 0;
×
UNCOV
3541
          tb_uid_t uid = *(tb_uid_t*)taosHashGetKey(pIter2, &keyLen2);
×
UNCOV
3542
          SArray*  pTagList = *(SArray**)pIter2;
×
3543

UNCOV
3544
          if (pVtbScan->genNewParam) {
×
UNCOV
3545
            code = buildAggOperatorParamForSingleChild(pInfo, uid, *groupid, pTagList, &pAggParam);
×
UNCOV
3546
            QUERY_CHECK_CODE(code, line, _return);
×
UNCOV
3547
            if (pAggParam) {
×
UNCOV
3548
              code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
×
UNCOV
3549
              QUERY_CHECK_CODE(code, line, _return);
×
3550
            } else {
UNCOV
3551
              *pRes = NULL;
×
3552
            }
3553
          } else {
UNCOV
3554
            code = pAggOp->fpSet.getNextFn(pAggOp, pRes);
×
UNCOV
3555
            QUERY_CHECK_CODE(code, line, _return);
×
3556
          }
3557

UNCOV
3558
          if (*pRes) {
×
UNCOV
3559
            pVtbScan->genNewParam = false;
×
UNCOV
3560
            (*pRes)->info.id.groupId = *groupid;
×
UNCOV
3561
            break;
×
3562
          }
UNCOV
3563
          pVtbScan->genNewParam = true;
×
UNCOV
3564
          pIter2 = taosHashIterate(vtbUidTagListMap, pIter2);
×
UNCOV
3565
          code = taosHashRemove(vtbUidTagListMap, &uid, keyLen);
×
UNCOV
3566
          QUERY_CHECK_CODE(code, line, _return);
×
3567
        }
UNCOV
3568
        if (*pRes) {
×
UNCOV
3569
          break;
×
3570
        }
UNCOV
3571
        pIter = taosHashIterate(pVtbScan->vtbGroupIdTagListMap, pIter);
×
UNCOV
3572
        code = taosHashRemove(pVtbScan->vtbGroupIdTagListMap, groupid, keyLen);
×
UNCOV
3573
        QUERY_CHECK_CODE(code, line, _return);
×
3574
      }
3575
    }
3576

3577
  } else {
3578
    if (pInfo->vtbScan.batchProcessChild) {
44,460✔
3579
      code = buildAggOperatorParam(pInfo, &pAggParam);
12,540✔
3580
      QUERY_CHECK_CODE(code, line, _return);
12,540✔
3581

3582
      code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
12,540✔
3583
      QUERY_CHECK_CODE(code, line, _return);
12,540✔
3584
      setOperatorCompleted(pOperator);
12,540✔
3585
    } else {
3586
      void* pIter = taosHashIterate(pVtbScan->vtbUidTagListMap, NULL);
31,920✔
3587
      while (pIter) {
50,160✔
3588
        size_t   keyLen = 0;
45,600✔
3589
        tb_uid_t uid = *(tb_uid_t*)taosHashGetKey(pIter, &keyLen);
45,600✔
3590
        SArray*  pTagList = *(SArray**)pIter;
45,600✔
3591

3592
        if (pVtbScan->genNewParam) {
45,600✔
3593
          code = buildAggOperatorParamForSingleChild(pInfo, uid, 0, pTagList, &pAggParam);
18,240✔
3594
          QUERY_CHECK_CODE(code, line, _return);
18,240✔
3595

3596
          if (pAggParam) {
18,240✔
3597
            code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
18,240✔
3598
            QUERY_CHECK_CODE(code, line, _return);
18,240✔
3599
          } else {
UNCOV
3600
            *pRes = NULL;
×
3601
          }
3602
        } else {
3603
          code = pAggOp->fpSet.getNextFn(pAggOp, pRes);
27,360✔
3604
          QUERY_CHECK_CODE(code, line, _return);
27,360✔
3605
        }
3606

3607
        if (*pRes) {
45,600✔
3608
          pVtbScan->genNewParam = false;
27,360✔
3609
          break;
27,360✔
3610
        }
3611
        pVtbScan->genNewParam = true;
18,240✔
3612
        pIter = taosHashIterate(pVtbScan->vtbUidTagListMap, pIter);
18,240✔
3613
        code = taosHashRemove(pVtbScan->vtbUidTagListMap, &uid, keyLen);
18,240✔
3614
        QUERY_CHECK_CODE(code, line, _return);
18,240✔
3615
      }
3616
    }
3617
  }
3618
_return:
62,828✔
3619
  if (code) {
62,828✔
3620
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
3621
  }
3622
  return code;
62,828✔
3623
}
3624

3625
int32_t vtbAggNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
75,368✔
3626
  int32_t                    code = TSDB_CODE_SUCCESS;
75,368✔
3627
  int32_t                    line = 0;
75,368✔
3628
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
75,368✔
3629
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
75,368✔
3630

3631
  QRY_PARAM_CHECK(pRes);
75,368✔
3632
  if (pOperator->status == OP_EXEC_DONE) {
75,368✔
3633
    return code;
12,540✔
3634
  }
3635

3636
  code = pOperator->fpSet._openFn(pOperator);
62,828✔
3637
  QUERY_CHECK_CODE(code, line, _return);
62,828✔
3638

3639
  if (pVtbScan->isSuperTable && taosArrayGetSize(pVtbScan->childTableList) == 0) {
62,828✔
3640
    setOperatorCompleted(pOperator);
×
3641
    return code;
×
3642
  }
3643

3644
  code = virtualTableAggGetNext(pOperator, pRes);
62,828✔
3645
  QUERY_CHECK_CODE(code, line, _return);
62,828✔
3646

3647
  return code;
62,828✔
3648

3649
_return:
×
3650
  if (code) {
×
3651
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
3652
    pOperator->pTaskInfo->code = code;
×
3653
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
3654
  }
3655
  return code;
×
3656
}
3657

3658
int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
2,671,021✔
3659
                                       SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo,
3660
                                       SMsgCb* pMsgCb, SOperatorInfo** pOptrInfo) {
3661
  QRY_PARAM_CHECK(pOptrInfo);
2,671,021✔
3662

3663
  int32_t                    code = TSDB_CODE_SUCCESS;
2,671,021✔
3664
  int32_t                    line = 0;
2,671,021✔
3665
  __optr_fn_t                nextFp = NULL;
2,671,021✔
3666
  __optr_open_fn_t           openFp = NULL;
2,671,021✔
3667
  SOperatorInfo*             pOperator = NULL;
2,671,021✔
3668
  SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo));
2,671,021✔
3669
  QUERY_CHECK_NULL(pInfo, code, line, _error, terrno)
2,671,021✔
3670

3671
  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2,671,021✔
3672
  QUERY_CHECK_NULL(pOperator, code, line, _error, terrno)
2,671,021✔
3673

3674
  pOperator->pPhyNode = pPhyciNode;
2,671,021✔
3675
  pTaskInfo->dynamicTask = (int8_t)pPhyciNode->node.dynamicOp;
2,671,021✔
3676

3677
  code = appendDownstream(pOperator, pDownstream, numOfDownstream);
2,671,021✔
3678
  QUERY_CHECK_CODE(code, line, _error);
2,671,021✔
3679

3680
  setOperatorInfo(pOperator, "DynQueryCtrlOperator", QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, false, OP_NOT_OPENED,
2,671,021✔
3681
                  pInfo, pTaskInfo);
3682

3683
  pInfo->qType = pPhyciNode->qType;
2,671,021✔
3684
  switch (pInfo->qType) {
2,671,021✔
3685
    case DYN_QTYPE_STB_HASH:
1,140,033✔
3686
      TAOS_MEMCPY(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
1,140,033✔
3687
      pInfo->stbJoin.pOutputDataBlockDesc = pPhyciNode->node.pOutputDataBlockDesc;
1,140,033✔
3688
      code = initSeqStbJoinTableHash(&pInfo->stbJoin.ctx.prev, pInfo->stbJoin.basic.batchFetch);
1,140,033✔
3689
      QUERY_CHECK_CODE(code, line, _error);
1,140,033✔
3690
      nextFp = seqStableJoin;
1,140,033✔
3691
      openFp = optrDummyOpenFn;
1,140,033✔
3692
      break;
1,140,033✔
3693
    case DYN_QTYPE_VTB_SCAN:
1,131,686✔
3694
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
1,131,686✔
3695
      QUERY_CHECK_CODE(code, line, _error);
1,131,686✔
3696
      nextFp = vtbScanNext;
1,131,686✔
3697
      openFp = vtbScanOpen;
1,131,686✔
3698
      break;
1,131,686✔
3699
    case DYN_QTYPE_VTB_WINDOW:
377,610✔
3700
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
377,610✔
3701
      QUERY_CHECK_CODE(code, line, _error);
377,610✔
3702
      code = initVtbWindowInfo(pInfo, pPhyciNode, pTaskInfo, pOperator);
377,610✔
3703
      QUERY_CHECK_CODE(code, line, _error);
377,610✔
3704
      nextFp = vtbWindowNext;
377,610✔
3705
      openFp = vtbWindowOpen;
377,610✔
3706
      break;
377,610✔
3707
    case DYN_QTYPE_VTB_AGG:
21,692✔
3708
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
21,692✔
3709
      QUERY_CHECK_CODE(code, line, _error);
21,692✔
3710
      nextFp = vtbAggNext;
21,692✔
3711
      openFp = vtbAggOpen;
21,692✔
3712
      break;
21,692✔
3713
    default:
×
3714
      qError("unsupported dynamic query ctrl type: %d", pInfo->qType);
×
3715
      code = TSDB_CODE_INVALID_PARA;
×
3716
      goto _error;
×
3717
  }
3718

3719
  pOperator->fpSet = createOperatorFpSet(openFp, nextFp, NULL, destroyDynQueryCtrlOperator, optrDefaultBufFn,
2,671,021✔
3720
                                         NULL, optrDefaultGetNextExtFn, NULL);
3721

3722
  setOperatorResetStateFn(pOperator, resetDynQueryCtrlOperState);
2,671,021✔
3723
  *pOptrInfo = pOperator;
2,671,021✔
3724
  return TSDB_CODE_SUCCESS;
2,671,021✔
3725

3726
_error:
×
3727
  if (pInfo != NULL) {
×
3728
    destroyDynQueryCtrlOperator(pInfo);
×
3729
  }
3730
  qError("failed to create dyn query ctrl operator, %s code:%s, line:%d", __func__, tstrerror(code), line);
×
3731
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
3732
  pTaskInfo->code = code;
×
3733
  return code;
×
3734
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc