• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4991

17 Mar 2026 07:57AM UTC coverage: 69.756% (+0.4%) from 69.348%
#4991

push

travis-ci

web-flow
merge: from main to 3.0 branch #34807

14 of 16 new or added lines in 5 files covered. (87.5%)

3928 existing lines in 138 files now uncovered.

192146 of 275455 relevant lines covered (69.76%)

137208686.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.68
/source/libs/executor/src/dynqueryctrloperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "nodes.h"
18
#include "operator.h"
19
#include "os.h"
20
#include "plannodes.h"
21
#include "query.h"
22
#include "querynodes.h"
23
#include "querytask.h"
24
#include "tarray.h"
25
#include "tcompare.h"
26
#include "tdatablock.h"
27
#include "thash.h"
28
#include "tmsg.h"
29
#include "trpc.h"
30
#include "ttypes.h"
31
#include "tdataformat.h"
32
#include "dynqueryctrl.h"
33

34
// File-scope counter; buildGroupCacheOperatorParam() atomically increments it to stamp each group-cache get-param with a session id.
int64_t gSessionId = 0;
35

36
void freeVgTableList(void* ptr) { 
×
37
  taosArrayDestroy(*(SArray**)ptr); 
×
38
}
×
39

40
/*
 * Release every node of the list starting at pListHead, together with the
 * four per-node buffers (left/right vg ids and left/right uids).
 * A NULL head is a no-op.
 */
static void destroyStbJoinTableList(SStbJoinTableList* pListHead) {
  for (SStbJoinTableList* pCur = pListHead; NULL != pCur;) {
    taosMemoryFree(pCur->pLeftVg);
    taosMemoryFree(pCur->pLeftUid);
    taosMemoryFree(pCur->pRightVg);
    taosMemoryFree(pCur->pRightUid);

    // Read the link before the node itself is released.
    SStbJoinTableList* pFollowing = pCur->pNext;
    taosMemoryFree(pCur);
    pCur = pFollowing;
  }
}
1,207,903✔
53

54
/*
 * Tear down the stb-join part of a dyn-query-ctrl operator: log the execution
 * counters, release the hash containers that belong to the active fetch mode
 * (batch vs. non-batch), then free the pending table list.
 */
static void destroyStbJoinDynCtrlInfo(SStbJoinDynCtrlInfo* pStbJoin) {
  SStbJoinPrevJoinCtx* pPrevCtx = &pStbJoin->ctx.prev;

  qDebug("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64 ", leftCacheNum:%" PRId64 ", rightCacheNum:%" PRId64, 
         pStbJoin->execInfo.prevBlkNum, pStbJoin->execInfo.prevBlkRows, pStbJoin->execInfo.postBlkNum, 
         pStbJoin->execInfo.postBlkRows, pStbJoin->execInfo.leftCacheNum, pStbJoin->execInfo.rightCacheNum);

  if (!pStbJoin->basic.batchFetch) {
    // Non-batch mode: plain caches, no per-entry free callback is installed.
    if (NULL != pPrevCtx->leftCache) {
      tSimpleHashCleanup(pPrevCtx->leftCache);
    }
    if (NULL != pPrevCtx->rightCache) {
      tSimpleHashCleanup(pPrevCtx->rightCache);
    }
    if (NULL != pPrevCtx->onceTable) {
      tSimpleHashCleanup(pPrevCtx->onceTable);
    }
  } else {
    // Batch mode: each hash entry holds an SArray*, released through freeVgTableList.
    if (NULL != pPrevCtx->leftHash) {
      tSimpleHashSetFreeFp(pPrevCtx->leftHash, freeVgTableList);
      tSimpleHashCleanup(pPrevCtx->leftHash);
    }
    if (NULL != pPrevCtx->rightHash) {
      tSimpleHashSetFreeFp(pPrevCtx->rightHash, freeVgTableList);
      tSimpleHashCleanup(pPrevCtx->rightHash);
    }
  }

  destroyStbJoinTableList(pPrevCtx->pListHead);
}
1,207,903✔
82

83
void destroyColRefInfo(void *info) {
271,140,232✔
84
  SColRefInfo *pColRefInfo = (SColRefInfo *)info;
271,140,232✔
85
  if (pColRefInfo) {
271,140,232✔
86
    taosMemoryFree(pColRefInfo->colName);
271,140,232✔
87
    taosMemoryFree(pColRefInfo->colrefName);
271,140,232✔
88
  }
89
}
271,140,232✔
90

91
void destroyColRefArray(void *info) {
14,603,319✔
92
  SArray *pColRefArray = *(SArray **)info;
14,603,319✔
93
  if (pColRefArray) {
14,603,319✔
94
    taosArrayDestroyEx(pColRefArray, destroyColRefInfo);
14,603,319✔
95
  }
96
}
14,603,319✔
97

98
void freeUseDbOutput(void* pOutput) {
3,297,223✔
99
  SUseDbOutput *pOut = *(SUseDbOutput**)pOutput;
3,297,223✔
100
  if (NULL == pOutput) {
3,297,223✔
101
    return;
×
102
  }
103

104
  if (pOut->dbVgroup) {
3,297,223✔
105
    freeVgInfo(pOut->dbVgroup);
3,297,223✔
106
  }
107
  taosMemFree(pOut);
3,297,223✔
108
}
109

110
void destroyOtbInfoArray(void *info) {
7,166,564✔
111
  SArray *pOtbInfoArray = *(SArray **)info;
7,166,564✔
112
  if (pOtbInfoArray) {
7,166,564✔
113
    taosArrayDestroyEx(pOtbInfoArray, destroySOrgTbInfo);
7,166,564✔
114
  }
115
}
7,166,564✔
116

117
void destroyOtbVgIdToOtbInfoArrayMap(void *info) {
4,900,924✔
118
  SHashObj* pOtbVgIdToOtbInfoArrayMap = *(SHashObj **)info;
4,900,924✔
119
  if (pOtbVgIdToOtbInfoArrayMap) {
4,900,924✔
120
    taosHashSetFreeFp(pOtbVgIdToOtbInfoArrayMap, destroyOtbInfoArray);
4,900,924✔
121
    taosHashCleanup(pOtbVgIdToOtbInfoArrayMap);
4,900,924✔
122
  }
123
}
4,900,924✔
124

125
void destroyTagList(void *info) {
3,475,560✔
126
  SArray *pTagList = *(SArray **)info;
3,475,560✔
127
  if (pTagList) {
3,475,560✔
128
    taosArrayDestroyEx(pTagList, destroyTagVal);
3,475,560✔
129
  }
130
}
3,475,560✔
131

132
static void destroyRefColIdGroup(void *info) {
7,425✔
133
  SRefColIdGroup *pGroup = (SRefColIdGroup *)info;
7,425✔
134
  if (pGroup && pGroup->pSlotIdList) {
7,425✔
135
    taosArrayDestroy(pGroup->pSlotIdList);
7,425✔
136
    pGroup->pSlotIdList = NULL;
7,425✔
137
  }
138
}
7,425✔
139

140
void destroyVtbUidTagListMap(void *info) {
1,310,856✔
141
  SHashObj* pVtbUidTagListMap = *(SHashObj **)info;
1,310,856✔
142
  if (pVtbUidTagListMap) {
1,310,856✔
143
    taosHashSetFreeFp(pVtbUidTagListMap, destroyTagList);
1,310,856✔
144
    taosHashCleanup(pVtbUidTagListMap);
1,310,856✔
145
  }
146
}
1,310,856✔
147

148
/*
 * Release everything owned by the virtual-table-scan control info.
 * Each member is released only when non-NULL; containers whose values own
 * further memory get a value destructor installed before cleanup.
 * The SVtbScanDynCtrlInfo storage itself is not freed here.
 */
static void destroyVtbScanDynCtrlInfo(SVtbScanDynCtrlInfo* pVtbScan) {
  /* names */
  if (NULL != pVtbScan->dbName) {
    taosMemoryFreeClear(pVtbScan->dbName);
  }
  if (NULL != pVtbScan->tbName) {
    taosMemoryFreeClear(pVtbScan->tbName);
  }

  /* column reference bookkeeping */
  if (NULL != pVtbScan->refColGroups) {
    taosArrayDestroyEx(pVtbScan->refColGroups, destroyRefColIdGroup);
    pVtbScan->refColGroups = NULL;
  }
  if (NULL != pVtbScan->childTableList) {
    taosArrayDestroyEx(pVtbScan->childTableList, destroyColRefArray);
  }
  if (NULL != pVtbScan->colRefInfo) {
    taosArrayDestroyEx(pVtbScan->colRefInfo, destroyColRefInfo);
    pVtbScan->colRefInfo = NULL;
  }
  if (NULL != pVtbScan->childTableMap) {
    taosHashCleanup(pVtbScan->childTableMap);
  }
  if (NULL != pVtbScan->readColList) {
    taosArrayDestroy(pVtbScan->readColList);
  }
  if (NULL != pVtbScan->readColSet) {
    taosHashCleanup(pVtbScan->readColSet);
  }

  /* db / org-table metadata */
  if (NULL != pVtbScan->dbVgInfoMap) {
    taosHashSetFreeFp(pVtbScan->dbVgInfoMap, freeUseDbOutput);
    taosHashCleanup(pVtbScan->dbVgInfoMap);
  }
  if (NULL != pVtbScan->otbNameToOtbInfoMap) {
    taosHashSetFreeFp(pVtbScan->otbNameToOtbInfoMap, destroySOrgTbInfo);
    taosHashCleanup(pVtbScan->otbNameToOtbInfoMap);
  }
  if (NULL != pVtbScan->pRsp) {
    tFreeSUsedbRsp(pVtbScan->pRsp);
    taosMemoryFreeClear(pVtbScan->pRsp);
  }

  /* vgroup tracking sets */
  if (NULL != pVtbScan->existOrgTbVg) {
    taosHashCleanup(pVtbScan->existOrgTbVg);
  }
  if (NULL != pVtbScan->curOrgTbVg) {
    taosHashCleanup(pVtbScan->curOrgTbVg);
  }
  if (NULL != pVtbScan->newAddedVgInfo) {
    taosHashCleanup(pVtbScan->newAddedVgInfo);
  }

  /* vgId -> org-table-info maps (flat and nested) */
  if (NULL != pVtbScan->otbVgIdToOtbInfoArrayMap) {
    taosHashSetFreeFp(pVtbScan->otbVgIdToOtbInfoArrayMap, destroyOtbInfoArray);
    taosHashCleanup(pVtbScan->otbVgIdToOtbInfoArrayMap);
  }
  if (NULL != pVtbScan->vtbUidToVgIdMapMap) {
    taosHashSetFreeFp(pVtbScan->vtbUidToVgIdMapMap, destroyOtbVgIdToOtbInfoArrayMap);
    taosHashCleanup(pVtbScan->vtbUidToVgIdMapMap);
  }
  if (NULL != pVtbScan->vtbGroupIdToVgIdMapMap) {
    taosHashSetFreeFp(pVtbScan->vtbGroupIdToVgIdMapMap, destroyOtbVgIdToOtbInfoArrayMap);
    taosHashCleanup(pVtbScan->vtbGroupIdToVgIdMapMap);
  }

  /* tag lists and group mapping */
  if (NULL != pVtbScan->vtbUidTagListMap) {
    taosHashSetFreeFp(pVtbScan->vtbUidTagListMap, destroyTagList);
    taosHashCleanup(pVtbScan->vtbUidTagListMap);
  }
  // NOTE(review): no value destructor is installed for this map, unlike vtbUidTagListMap — confirm its values are not owned here.
  if (NULL != pVtbScan->vtbGroupIdTagListMap) {
    taosHashCleanup(pVtbScan->vtbGroupIdTagListMap);
  }
  if (NULL != pVtbScan->vtbUidToGroupIdMap) {
    taosHashCleanup(pVtbScan->vtbUidToGroupIdMap);
  }
}
3,096,307✔
219

220
void destroyWinArray(void *info) {
1,407,410✔
221
  SArray *pWinArray = *(SArray **)info;
1,407,410✔
222
  if (pWinArray) {
1,407,410✔
223
    taosArrayDestroy(pWinArray);
1,407,410✔
224
  }
225
}
1,407,410✔
226

227
/*
 * Release the result block and the array of window arrays held by the
 * virtual-table-window control info. Members that are NULL are skipped.
 */
static void destroyVtbWindowDynCtrlInfo(SVtbWindowDynCtrlInfo* pVtbWindow) {
  if (NULL != pVtbWindow->pRes) {
    blockDataDestroy(pVtbWindow->pRes);
  }

  if (NULL != pVtbWindow->pWins) {
    // Each element is itself an SArray*, released by destroyWinArray.
    taosArrayDestroyEx(pVtbWindow->pWins, destroyWinArray);
  }
}
513,803✔
235

236
static void destroyDynQueryCtrlOperator(void* param) {
4,303,406✔
237
  SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param;
4,303,406✔
238

239
  switch (pDyn->qType) {
4,303,406✔
240
    case DYN_QTYPE_STB_HASH:
1,207,099✔
241
      destroyStbJoinDynCtrlInfo(&pDyn->stbJoin);
1,207,099✔
242
      break;
1,207,099✔
243
    case DYN_QTYPE_VTB_WINDOW:
513,803✔
244
      destroyVtbWindowDynCtrlInfo(&pDyn->vtbWindow);
513,803✔
245
      destroyVtbScanDynCtrlInfo(&pDyn->vtbScan);
513,803✔
246
      break;
513,803✔
247
    case DYN_QTYPE_VTB_INTERVAL:
2,582,504✔
248
    case DYN_QTYPE_VTB_AGG:
249
    case DYN_QTYPE_VTB_SCAN:
250
    case DYN_QTYPE_VTB_TS_SCAN:
251
      destroyVtbScanDynCtrlInfo(&pDyn->vtbScan);
2,582,504✔
252
      break;
2,582,504✔
253
    default:
×
254
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
×
255
      break;
×
256
  }
257

258
  taosMemoryFreeClear(param);
4,303,406✔
259
}
4,303,406✔
260

261
/*
 * Decide whether the data of one join-side table should be cached.
 *
 * - batch fetch: always cache;
 * - right table: cache when the current and the next right uid are equal;
 * - left table : cache when `uid` is present in pPrev->leftCache.
 */
static FORCE_INLINE bool tableNeedCache(int64_t uid, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, bool rightTable, bool batchFetch) {
  if (batchFetch) {
    return true;
  }

  if (rightTable) {
    return pPost->rightCurrUid == pPost->rightNextUid;
  }

  uint32_t* pHit = tSimpleHashGet(pPrev->leftCache, &uid, sizeof(uid));
  return NULL != pHit;
}
274

275
/*
 * Load the table pair at the head node's read index into the post-join
 * context, look ahead for the next right-side uid (0 when the list is
 * exhausted), and refresh the left/right cache flags.
 *
 * In non-batch mode, a right table that needs caching and differs from the
 * previous right uid is recorded in pPrev->rightCache.
 *
 * @return TSDB_CODE_SUCCESS, or the error returned by tSimpleHashPut.
 */
static int32_t updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo* pStbJoin) {
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
  SStbJoinTableList*   pNode = pPrev->pListHead;

  int64_t* pLeftUid = &pNode->pLeftUid[pNode->readIdx];
  int64_t* pRightUid = &pNode->pRightUid[pNode->readIdx];
  int64_t  lookAhead = pNode->readIdx + 1;
  int64_t  lastRightUid = pPost->rightCurrUid;

  pPost->leftCurrUid = *pLeftUid;
  pPost->rightCurrUid = *pRightUid;
  pPost->leftVgId = pNode->pLeftVg[pNode->readIdx];
  pPost->rightVgId = pNode->pRightVg[pNode->readIdx];

  // Find the right uid that follows the current pair, stepping into later nodes if needed.
  for (;;) {
    if (lookAhead < pNode->uidNum) {
      pPost->rightNextUid = pNode->pRightUid[lookAhead];
      break;
    }

    pNode = pNode->pNext;
    if (NULL == pNode) {
      pPost->rightNextUid = 0;
      break;
    }

    pRightUid = pNode->pRightUid;
    lookAhead = 0;
  }

  bool batchFetch = pStbJoin->basic.batchFetch;
  pPost->leftNeedCache = tableNeedCache(*pLeftUid, pPrev, pPost, false, batchFetch);
  pPost->rightNeedCache = tableNeedCache(*pRightUid, pPrev, pPost, true, batchFetch);

  bool rightChanged = (lastRightUid != pPost->rightCurrUid);
  if (!batchFetch && pPost->rightNeedCache && rightChanged) {
    QRY_ERR_RET(tSimpleHashPut(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid), NULL, 0));
    pStbJoin->execInfo.rightCacheNum++;
  }

  return TSDB_CODE_SUCCESS;
}
318

319
/*
 * Allocate a copy of an org-table info (vgId, table name, column map).
 *
 * @param pSrc  Source info.
 * @param ppDst Receives the new copy on success; left untouched on failure.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise the terrno of the failing
 *         allocation.
 */
static int32_t copyOrgTbInfo(SOrgTbInfo* pSrc, SOrgTbInfo** ppDst) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  SOrgTbInfo* pCopy = NULL;

  qDebug("start to copy org table info, vgId:%d, tbName:%s", pSrc->vgId, pSrc->tbName);

  pCopy = taosMemoryMalloc(sizeof(SOrgTbInfo));
  QUERY_CHECK_NULL(pCopy, code, lino, _return, terrno);

  pCopy->vgId = pSrc->vgId;
  tstrncpy(pCopy->tbName, pSrc->tbName, TSDB_TABLE_FNAME_LEN);
  pCopy->colMap = taosArrayDup(pSrc->colMap, NULL);
  QUERY_CHECK_NULL(pCopy->colMap, code, lino, _return, terrno);

  *ppDst = pCopy;
  return code;

_return:
  qError("failed to copy org table info, code:%d, line:%d", code, lino);
  if (NULL != pCopy) {
    if (NULL != pCopy->colMap) {
      taosArrayDestroy(pCopy->colMap);
    }
    taosMemoryFreeClear(pCopy);
  }
  return code;
}
348

349
/*
 * Deep-copy a tag list into pBasic->tagList.
 *
 * Variable-length tags get their payload duplicated; fixed-length tags copy
 * the i64 value.
 *
 * @param pBasic   Exchange basic param whose tagList is (re)created.
 * @param pTagList Source array of STagVal.
 *
 * @return TSDB_CODE_SUCCESS on success. On failure pBasic->tagList is
 *         destroyed and set to NULL, and the error code is returned.
 */
static int32_t buildTagListForExchangeBasicParam(SExchangeOperatorBasicParam* pBasic, SArray* pTagList) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  // Zero-initialised: the error path must never observe indeterminate members.
  STagVal  tmpTag = {0};
  // Payload allocated for the tag being built that has not yet been handed to the array.
  void*    pPendingData = NULL;

  pBasic->tagList = taosArrayInit(1, sizeof(STagVal));
  QUERY_CHECK_NULL(pBasic->tagList, code, lino, _return, terrno)

  for (int32_t i = 0; i < taosArrayGetSize(pTagList); ++i) {
    STagVal* pSrcTag = (STagVal*)taosArrayGet(pTagList, i);
    QUERY_CHECK_NULL(pSrcTag, code, lino, _return, terrno)
    tmpTag.type = pSrcTag->type;
    tmpTag.cid = pSrcTag->cid;
    if (IS_VAR_DATA_TYPE(pSrcTag->type)) {
      tmpTag.nData = pSrcTag->nData;
      tmpTag.pData = taosMemoryMalloc(tmpTag.nData);
      QUERY_CHECK_NULL(tmpTag.pData, code, lino, _return, terrno)
      pPendingData = tmpTag.pData;
      memcpy(tmpTag.pData, pSrcTag->pData, tmpTag.nData);
    } else {
      tmpTag.i64 = pSrcTag->i64;
    }

    QUERY_CHECK_NULL(taosArrayPush(pBasic->tagList, &tmpTag), code, lino, _return, terrno)
    // The array now holds the payload pointer; nothing is pending any more.
    pPendingData = NULL;
    tmpTag = (STagVal){0};
  }

  return code;
_return:
  if (pBasic->tagList) {
    taosArrayDestroyEx(pBasic->tagList, destroyTagVal);
    pBasic->tagList = NULL;
  }
  if (pPendingData) {
    taosMemoryFree(pPendingData);
  }
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
  return code;
}
387

388
/*
 * Deep-copy an array of org-table infos into pBasic->batchOrgTbInfo.
 *
 * @param pBasic          Exchange basic param whose batchOrgTbInfo is (re)created.
 * @param pOrgTbInfoArray Source array of SOrgTbInfo.
 *
 * @return TSDB_CODE_SUCCESS on success. On failure pBasic->batchOrgTbInfo is
 *         destroyed and set to NULL, and the error code is returned.
 */
static int32_t buildBatchOrgTbInfoForExchangeBasicParam(SExchangeOperatorBasicParam* pBasic, SArray* pOrgTbInfoArray) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  // Zero-initialised: the error path inspects batchInfo.colMap even when the
  // failure happens before the first element has been prepared.
  SOrgTbInfo  batchInfo = {0};

  pBasic->batchOrgTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
  QUERY_CHECK_NULL(pBasic->batchOrgTbInfo, code, lino, _return, terrno)

  for (int32_t i = 0; i < taosArrayGetSize(pOrgTbInfoArray); ++i) {
    SOrgTbInfo* pSrc = (SOrgTbInfo*)taosArrayGet(pOrgTbInfoArray, i);
    QUERY_CHECK_NULL(pSrc, code, lino, _return, terrno)
    batchInfo.vgId = pSrc->vgId;
    tstrncpy(batchInfo.tbName, pSrc->tbName, TSDB_TABLE_FNAME_LEN);
    batchInfo.colMap = taosArrayDup(pSrc->colMap, NULL);
    QUERY_CHECK_NULL(batchInfo.colMap, code, lino, _return, terrno)
    QUERY_CHECK_NULL(taosArrayPush(pBasic->batchOrgTbInfo, &batchInfo), code, lino, _return, terrno)
    // Ownership of colMap moved into the array; reset so cleanup does not free it twice.
    batchInfo = (SOrgTbInfo){0};
  }

  return code;
_return:
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
  if (pBasic->batchOrgTbInfo) {
    taosArrayDestroyEx(pBasic->batchOrgTbInfo, destroySOrgTbInfo);
    pBasic->batchOrgTbInfo = NULL;
  }
  if (batchInfo.colMap) {
    taosArrayDestroy(batchInfo.colMap);
    batchInfo.colMap = NULL;
  }
  return code;
}
420

421
/*
 * Build a group-cache get-param, optionally wrapping one child param.
 *
 * @param ppRes         Receives the new param; NULL on failure.
 * @param downstreamIdx Downstream index stored in both the param and its value.
 * @param vgId          Vgroup id stored in the value.
 * @param tbUid         Table uid stored in the value.
 * @param needCache     Cache flag stored in the value.
 * @param pChild        Optional child param. It is consumed by this call: on
 *                      success it is linked under the result, on failure it
 *                      is released.
 *
 * @return TSDB_CODE_SUCCESS, or the terrno of the failing allocation.
 */
static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, bool needCache, SOperatorParam* pChild) {
  int32_t         code = TSDB_CODE_SUCCESS;
  SOperatorParam* pParam = taosMemoryCalloc(1, sizeof(SOperatorParam));

  *ppRes = pParam;
  if (NULL == pParam) {
    code = terrno;
    freeOperatorParam(pChild, OP_GET_PARAM);
    return code;
  }

  if (NULL == pChild) {
    pParam->pChildren = NULL;
  } else {
    pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
    if (NULL == pParam->pChildren || NULL == taosArrayPush(pParam->pChildren, &pChild)) {
      // The child was not linked, so it has to be released separately from the parent.
      code = terrno;
      freeOperatorParam(pChild, OP_GET_PARAM);
      freeOperatorParam(pParam, OP_GET_PARAM);
      *ppRes = NULL;
      return code;
    }
  }

  SGcOperatorParam* pValue = taosMemoryMalloc(sizeof(SGcOperatorParam));
  if (NULL == pValue) {
    code = terrno;
    freeOperatorParam(pParam, OP_GET_PARAM);
    *ppRes = NULL;
    return code;
  }

  pValue->sessionId = atomic_add_fetch_64(&gSessionId, 1);
  pValue->downstreamIdx = downstreamIdx;
  pValue->vgId = vgId;
  pValue->tbUid = tbUid;
  pValue->needCache = needCache;

  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
  pParam->downstreamIdx = downstreamIdx;
  pParam->value = pValue;
  pParam->reUse = false;

  return TSDB_CODE_SUCCESS;
}
470

471

472
/*
 * Build a group-cache notify-param (no children).
 *
 * @param ppRes         Receives the new param; NULL on failure.
 * @param downstreamIdx Downstream index stored in both the param and its value.
 * @param vgId          Vgroup id stored in the value.
 * @param tbUid         Table uid stored in the value.
 *
 * @return TSDB_CODE_SUCCESS, or the terrno of the failing allocation.
 */
static int32_t buildGroupCacheNotifyOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid) {
  int32_t code = TSDB_CODE_SUCCESS;
  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }
  (*ppRes)->pChildren = NULL;

  SGcNotifyOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcNotifyOperatorParam));
  if (NULL == pGc) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    // Do not hand a freed pointer back to the caller (matches buildGroupCacheOperatorParam).
    *ppRes = NULL;
    return code;
  }

  pGc->downstreamIdx = downstreamIdx;
  pGc->vgId = vgId;
  pGc->tbUid = tbUid;

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pGc;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;
}
498

499
static int32_t buildExchangeOperatorBasicParam(SExchangeOperatorBasicParam* pBasic, ENodeType srcOpType,
21,869,239✔
500
                                               EExchangeSourceType exchangeType, int32_t vgId, uint64_t groupId,
501
                                               SArray* pUidList, SOrgTbInfo* pOrgTbInfo, SArray* pTagList,
502
                                               SArray* pOrgTbInfoArray, STimeWindow window,
503
                                               SDownstreamSourceNode* pDownstreamSourceNode,
504
                                               bool tableSeq, bool isNewParam, bool isNewDeployed) {
505
  int32_t code = TSDB_CODE_SUCCESS;
21,869,239✔
506
  int32_t lino = 0;
21,869,239✔
507

508
  qDebug("buildExchangeOperatorBasicParam, srcOpType:%d, exchangeType:%d, vgId:%d, groupId:%" PRIu64 ", tableSeq:%d, "
21,869,239✔
509
         "isNewParam:%d, isNewDeployed:%d", srcOpType, exchangeType, vgId, groupId, tableSeq, isNewParam, isNewDeployed);
510

511
  pBasic->paramType = DYN_TYPE_EXCHANGE_PARAM;
21,869,239✔
512
  pBasic->srcOpType = srcOpType;
21,869,239✔
513
  pBasic->vgId = vgId;
21,869,239✔
514
  pBasic->groupid = groupId;
21,869,239✔
515
  pBasic->window = window;
21,869,239✔
516
  pBasic->tableSeq = tableSeq;
21,869,239✔
517
  pBasic->type = exchangeType;
21,869,239✔
518
  pBasic->isNewParam = isNewParam;
21,869,239✔
519

520
  if (pDownstreamSourceNode) {
21,869,239✔
521
    pBasic->isNewDeployed = true;
2,398✔
522
    pBasic->newDeployedSrc.type = QUERY_NODE_DOWNSTREAM_SOURCE;
2,398✔
523
    pBasic->newDeployedSrc.clientId = pDownstreamSourceNode->clientId;// current task's taskid
2,398✔
524
    pBasic->newDeployedSrc.taskId = pDownstreamSourceNode->taskId;
2,398✔
525
    pBasic->newDeployedSrc.fetchMsgType = TDMT_STREAM_FETCH;
2,398✔
526
    pBasic->newDeployedSrc.localExec = false;
2,398✔
527
    pBasic->newDeployedSrc.addr.nodeId = pDownstreamSourceNode->addr.nodeId;
2,398✔
528
    memcpy(&pBasic->newDeployedSrc.addr.epSet, &pDownstreamSourceNode->addr.epSet, sizeof(SEpSet));
2,398✔
529
  } else {
530
    pBasic->isNewDeployed = false;
21,866,841✔
531
    pBasic->newDeployedSrc = (SDownstreamSourceNode){0};
21,866,841✔
532
  }
533

534
  if (pUidList) {
21,869,239✔
535
    pBasic->uidList = taosArrayDup(pUidList, NULL);
5,803,399✔
536
    QUERY_CHECK_NULL(pBasic->uidList, code, lino, _return, terrno)
5,803,399✔
537
  } else {
538
    pBasic->uidList = taosArrayInit(1, sizeof(int64_t));
16,065,840✔
539
    QUERY_CHECK_NULL(pBasic->uidList, code, lino, _return, terrno)
16,065,840✔
540
  }
541

542
  if (pOrgTbInfo) {
21,869,239✔
543
    code = copyOrgTbInfo(pOrgTbInfo, &pBasic->orgTbInfo);
7,869,679✔
544
    QUERY_CHECK_CODE(code, lino, _return);
7,869,679✔
545
  } else {
546
    pBasic->orgTbInfo = NULL;
13,999,560✔
547
  }
548

549
  if (pTagList) {
21,869,239✔
550
    code = buildTagListForExchangeBasicParam(pBasic, pTagList);
3,293,292✔
551
    QUERY_CHECK_CODE(code, lino, _return);
3,293,292✔
552
  } else {
553
    pBasic->tagList = NULL;
18,575,947✔
554
  }
555

556
  if (pOrgTbInfoArray) {
21,869,239✔
557
    code = buildBatchOrgTbInfoForExchangeBasicParam(pBasic, pOrgTbInfoArray);
7,457,144✔
558
    QUERY_CHECK_CODE(code, lino, _return);
7,457,144✔
559
  } else {
560
    pBasic->batchOrgTbInfo = NULL;
14,412,095✔
561
  }
562
  return code;
21,869,239✔
563

564
_return:
×
565
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
×
566
  freeExchangeGetBasicOperatorParam(pBasic);
×
567
  return code;
×
568
}
569

570
static int32_t buildExchangeOperatorParamImpl(SOperatorParam** ppRes, int32_t downstreamIdx, ENodeType srcOpType,
14,151,455✔
571
                                              EExchangeSourceType exchangeType, int32_t vgId, uint64_t groupId,
572
                                              SArray* pUidList, SOrgTbInfo* pOrgTbInfo, SArray* pTagList,
573
                                              SArray* pOrgTbInfoArray, STimeWindow window,
574
                                              SDownstreamSourceNode* pDownstreamSourceNode,
575
                                              bool tableSeq, bool isNewParam, bool reUse, bool isNewDeployed) {
576

577
  int32_t                      code = TSDB_CODE_SUCCESS;
14,151,455✔
578
  int32_t                      lino = 0;
14,151,455✔
579
  SOperatorParam*              pParam = NULL;
14,151,455✔
580
  SExchangeOperatorParam*      pExc = NULL;
14,151,455✔
581

582
  *ppRes = NULL;
14,151,455✔
583

584
  pParam = taosMemoryCalloc(1, sizeof(SOperatorParam));
14,151,455✔
585
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)
14,151,455✔
586

587
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
14,151,455✔
588
  pParam->downstreamIdx = downstreamIdx;
14,151,455✔
589
  pParam->reUse = reUse;
14,151,455✔
590
  pParam->pChildren = NULL;
14,151,455✔
591
  pParam->value = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
14,151,455✔
592
  QUERY_CHECK_NULL(pParam->value, code, lino, _return, terrno)
14,151,455✔
593

594
  pExc = (SExchangeOperatorParam*)pParam->value;
14,151,455✔
595
  pExc->multiParams = false;
14,151,455✔
596

597
  code = buildExchangeOperatorBasicParam(&pExc->basic, srcOpType, exchangeType, vgId, groupId,
14,151,455✔
598
                                         pUidList, pOrgTbInfo, pTagList, pOrgTbInfoArray,
599
                                         window, pDownstreamSourceNode, tableSeq, isNewParam, isNewDeployed);
600
  QUERY_CHECK_CODE(code, lino, _return);
14,151,455✔
601

602
  *ppRes = pParam;
14,151,455✔
603
  return code;
14,151,455✔
604
_return:
×
605
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
×
606
  if (pParam) {
×
607
    freeOperatorParam(pParam, OP_GET_PARAM);
×
608
  }
609
  return code;
×
610
}
611

612
/*
 * Build an exchange get-param for a single table of a super-table join scan.
 *
 * @param ppRes         Receives the new param.
 * @param downstreamIdx Downstream operator index.
 * @param pVgId         Vgroup id of the table.
 * @param pUid          Uid of the table; wrapped into a one-element list.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise an error code.
 */
static int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, const int32_t* pVgId, int64_t* pUid) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  // Temporary one-element list; the impl duplicates it, so it is always destroyed below.
  SArray* pSingleUid = taosArrayInit(1, sizeof(int64_t));
  QUERY_CHECK_NULL(pSingleUid, code, lino, _return, terrno)

  QUERY_CHECK_NULL(taosArrayPush(pSingleUid, pUid), code, lino, _return, terrno);

  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, EX_SRC_TYPE_STB_JOIN_SCAN,
                                        *pVgId, 0, pSingleUid, NULL, NULL, NULL, (STimeWindow){0}, NULL, true, false, false, false);
  QUERY_CHECK_CODE(code, lino, _return);

_return:
  if (TSDB_CODE_SUCCESS != code) {
    qError("failed to build exchange operator param, code:%d", code);
  }
  taosArrayDestroy(pSingleUid);
  return code;
}
632

633
/*
 * Build an exchange get-param that forwards one time window to a
 * virtual-table window scan (no uid list, no org-table info).
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise the impl's error code.
 */
static int32_t buildExchangeOperatorParamForExternalWindow(SOperatorParam** ppRes, int32_t downstreamIdx, STimeWindow win) {
  int32_t code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, EX_SRC_TYPE_VTB_WIN_SCAN,
                                                0, 0, NULL, NULL, NULL, NULL, win, NULL, true, true, true, false);
  if (TSDB_CODE_SUCCESS != code) {
    qError("failed to build exchange operator param for external window, code:%d", code);
  }

  return code;
}
646

647
/*
 * Build an exchange get-param for a virtual super-table tag scan of one table.
 *
 * @param ppRes         Receives the new param.
 * @param downstreamIdx Downstream operator index.
 * @param vgId          Vgroup id of the table.
 * @param uid           Uid of the table; wrapped into a one-element list.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise an error code.
 */
static int32_t buildExchangeOperatorParamForVTagScan(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, tb_uid_t uid) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  // Temporary one-element list; the impl duplicates it, so it is always destroyed below.
  SArray* pSingleUid = taosArrayInit(1, sizeof(int64_t));
  QUERY_CHECK_NULL(pSingleUid, code, lino, _return, terrno)

  QUERY_CHECK_NULL(taosArrayPush(pSingleUid, &uid), code, lino, _return, terrno)

  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, EX_SRC_TYPE_VSTB_TAG_SCAN,
                                        vgId, 0, pSingleUid, NULL, NULL, NULL, (STimeWindow){0}, NULL, false, false, true, false);
  QUERY_CHECK_CODE(code, lino, _return);

_return:
  if (TSDB_CODE_SUCCESS != code) {
    qError("failed to build exchange operator param for tag scan, code:%d", code);
  }
  taosArrayDestroy(pSingleUid);
  return code;
}
668

669
static int32_t buildExchangeOperatorParamForVScan(SOperatorParam** ppRes, int32_t downstreamIdx, SOrgTbInfo* pOrgTbInfo,
7,869,679✔
670
                                                  SDownstreamSourceNode* pNewSource) {
671
  int32_t                      code = TSDB_CODE_SUCCESS;
7,869,679✔
672
  int32_t                      lino = 0;
7,869,679✔
673

674
  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, EX_SRC_TYPE_VSTB_SCAN,
7,869,679✔
675
                                        pOrgTbInfo->vgId, 0, NULL, pOrgTbInfo, NULL, NULL, (STimeWindow){0}, pNewSource, false, true, true, true);
7,869,679✔
676
  QUERY_CHECK_CODE(code, lino, _return);
7,869,679✔
677

678
  return code;
7,869,679✔
679
_return:
×
680
  qError("failed to build exchange operator param for vscan, code:%d", code);
×
681
  return code;
×
682
}
683

684
/*
 * Build one batch exchange get-param for a super-table join scan.
 *
 * For every (vgId -> uid list) entry of pVg a basic param is built and stored
 * in the batch hash keyed by vgId. After an entry has been stored, its uid
 * list in pVg is destroyed and the slot is set to NULL.
 *
 * @param ppRes         Receives the new param on success; untouched on failure.
 * @param downstreamIdx Downstream operator index.
 * @param pVg           Hash from vgroup id to SArray* of uids; entries are
 *                      consumed as described above.
 *
 * @return TSDB_CODE_SUCCESS on success. On failure the partially built param
 *         is released and the error code is returned.
 */
static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
  int32_t                       code = TSDB_CODE_SUCCESS;
  int32_t                       line = 0;
  SOperatorParam*               pParam = NULL;
  SExchangeOperatorBatchParam*  pExc = NULL;
  SExchangeOperatorBasicParam   basic = {0};

  pParam = taosMemoryCalloc(1, sizeof(SOperatorParam));
  QUERY_CHECK_NULL(pParam, code, line, _return, terrno);

  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  pParam->downstreamIdx = downstreamIdx;
  pParam->reUse = false;
  pParam->pChildren = NULL;
  pParam->value = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  QUERY_CHECK_NULL(pParam->value, code, line, _return, terrno);

  pExc = pParam->value;
  pExc->multiParams = true;
  pExc->pBatchs = tSimpleHashInit(tSimpleHashGetSize(pVg), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  QUERY_CHECK_NULL(pExc->pBatchs, code, line, _return, terrno)

  tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);

  int32_t iter = 0;
  void*   p = NULL;
  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
    int32_t* pVgId = tSimpleHashGetKey(p, NULL);
    SArray*  pUidList = *(SArray**)p;

    code = buildExchangeOperatorBasicParam(&basic, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
                                           EX_SRC_TYPE_STB_JOIN_SCAN, *pVgId, 0,
                                           pUidList, NULL, NULL, NULL,
                                           (STimeWindow){0}, NULL, false, false, false);
    if (TSDB_CODE_SUCCESS != code) {
      // The callee already released `basic` on failure; zero it so the
      // cleanup below does not release the same members a second time.
      basic = (SExchangeOperatorBasicParam){0};
    }
    QUERY_CHECK_CODE(code, line, _return);

    // Route failures through the cleanup label: returning directly here
    // would leak pParam, the batch hash and the members of `basic`.
    code = tSimpleHashPut(pExc->pBatchs, pVgId, sizeof(*pVgId), &basic, sizeof(basic));
    QUERY_CHECK_CODE(code, line, _return);

    // The hash now owns a copy of `basic`, including its pointers.
    basic = (SExchangeOperatorBasicParam){0};
    qTrace("build downstreamIdx %d batch scan, vgId:%d, uidNum:%" PRId64, downstreamIdx, *pVgId, (int64_t)taosArrayGetSize(pUidList));

    // already transferred to batch param, can free here
    taosArrayDestroy(pUidList);

    *(SArray**)p = NULL;
  }
  *ppRes = pParam;

  return code;
  
_return:
  qError("failed to build batch exchange operator param, code:%d", code);
  freeOperatorParam(pParam, OP_GET_PARAM);
  freeExchangeGetBasicOperatorParam(&basic);
  return code;
}
740

741
/*
742
 * Build one batch-exchange get-param for virtual-table dynamic execution.
743
 *
744
 * @param ppRes Output operator param.
745
 * @param downstreamIdx Downstream operator index to fetch from.
746
 * @param pTagList Optional tag values bound to each source table.
747
 * @param groupid Group id used by downstream operators.
748
 * @param pBatchMaps Hash map from vgroup id to source-table metadata array.
749
 * @param window Time window forwarded to downstream scan.
750
 * @param type Exchange source type.
751
 * @param srcOpType Physical scan operator type for each batch source.
752
 *
753
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
754
 */
755
static int32_t buildBatchExchangeOperatorParamForVirtual(SOperatorParam** ppRes, int32_t downstreamIdx,
6,556,058✔
756
                                                         SArray* pTagList, uint64_t groupid, SHashObj* pBatchMaps,
757
                                                         STimeWindow window, EExchangeSourceType type,
758
                                                         ENodeType srcOpType) {
759
  int32_t                       code = TSDB_CODE_SUCCESS;
6,556,058✔
760
  int32_t                       lino = 0;
6,556,058✔
761
  SOperatorParam*               pParam = NULL;
6,556,058✔
762
  SExchangeOperatorBatchParam*  pExc = NULL;
6,556,058✔
763
  SExchangeOperatorBasicParam   basic = {0};
6,556,058✔
764

765
  pParam = taosMemoryCalloc(1, sizeof(SOperatorParam));
6,556,058✔
766
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)
6,556,058✔
767

768
  pParam->value = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
6,556,058✔
769
  QUERY_CHECK_NULL(pParam->value, code, lino, _return, terrno)
6,556,058✔
770

771
  pExc = pParam->value;
6,556,058✔
772
  pExc->multiParams = true;
6,556,058✔
773

774
  pExc->pBatchs = tSimpleHashInit(taosHashGetSize(pBatchMaps), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
6,556,058✔
775
  QUERY_CHECK_NULL(pExc->pBatchs, code, lino, _return, terrno)
6,556,058✔
776
  tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);
6,556,058✔
777

778
  size_t keyLen = 0;
6,556,058✔
779
  void*  pIter = taosHashIterate(pBatchMaps, NULL);
6,556,058✔
780
  while (pIter != NULL) {
14,013,202✔
781
    SArray*          pOrgTbInfoArray = *(SArray**)pIter;
7,457,144✔
782
    int32_t*         vgId = (int32_t*)taosHashGetKey(pIter, &keyLen);
7,457,144✔
783

784
    code = buildExchangeOperatorBasicParam(&basic, srcOpType,
7,457,144✔
785
                                           type, *vgId, groupid,
786
                                           NULL, NULL, pTagList, pOrgTbInfoArray,
787
                                           window, NULL, false, true, false);
788
    QUERY_CHECK_CODE(code, lino, _return);
7,457,144✔
789

790
    code = tSimpleHashPut(pExc->pBatchs, vgId, sizeof(*vgId), &basic, sizeof(basic));
7,457,144✔
791
    QUERY_CHECK_CODE(code, lino, _return);
7,457,144✔
792

793
    basic = (SExchangeOperatorBasicParam){0};
7,457,144✔
794
    pIter = taosHashIterate(pBatchMaps, pIter);
7,457,144✔
795
  }
796

797
  pParam->pChildren = NULL;
6,556,058✔
798
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
6,556,058✔
799
  pParam->downstreamIdx = downstreamIdx;
6,556,058✔
800
  pParam->reUse = false;
6,556,058✔
801

802
  *ppRes = pParam;
6,556,058✔
803
  return code;
6,556,058✔
804

805
_return:
×
806
  qError("failed to build exchange operator param for vscan, code:%d", code);
×
807
  freeOperatorParam(pParam, OP_GET_PARAM);
×
808
  freeExchangeGetBasicOperatorParam(&basic);
×
809
  return code;
×
810
}
811

812
/*
 * Build the get-param of a merge-join operator from two child params.
 *
 * Each child pointer is reset to NULL in the caller right after it has been
 * appended to the children array of the new param.
 *
 * @param ppRes Output operator param; NULL on failure.
 * @param initParam Value stored in the join param's initDownstream flag.
 * @param ppChild0 First child param, cleared once appended.
 * @param ppChild1 Second child param, cleared once appended.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
 */
static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initParam, SOperatorParam** ppChild0, SOperatorParam** ppChild1) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  SSortMergeJoinOperatorParam* pJoinParam = NULL;

  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }

  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
  if (NULL == (*ppRes)->pChildren) {
    code = terrno;
    goto _fail;
  }

  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild0)) {
    code = terrno;
    goto _fail;
  }
  *ppChild0 = NULL;

  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild1)) {
    code = terrno;
    goto _fail;
  }
  *ppChild1 = NULL;

  pJoinParam = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam));
  if (NULL == pJoinParam) {
    code = terrno;
    goto _fail;
  }
  pJoinParam->initDownstream = initParam;

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
  (*ppRes)->value = pJoinParam;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;

_fail:
  // The error code was captured before the release call below.
  freeOperatorParam(*ppRes, OP_GET_PARAM);
  *ppRes = NULL;
  return code;
}
857

858
/*
 * Build the notify-param of a merge-join operator from up to two child params.
 *
 * NULL children are skipped. On every failure path the children that were not
 * yet appended to the new param are released here.
 *
 * @param ppRes Output operator param; NULL on failure.
 * @param pChild0 Optional first child notify-param.
 * @param pChild1 Optional second child notify-param.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
 */
static int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) {
  int32_t code = TSDB_CODE_SUCCESS;
  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    code = terrno;
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    return code;
  }
  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
  // Check the array that was just allocated; *ppRes itself is known to be
  // non-NULL at this point.
  if (NULL == (*ppRes)->pChildren) {
    code = terrno;
    taosMemoryFreeClear(*ppRes);
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    return code;
  }
  if (pChild0 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    *ppRes = NULL;
    return code;
  }
  if (pChild1 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) {
    code = terrno;
    // pChild0, if any, is already inside pChildren and goes with *ppRes.
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    *ppRes = NULL;
    return code;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
  (*ppRes)->value = NULL;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;
}
897

898
/*
 * Build a table-scan get-param from a vgroup hash that holds exactly one entry.
 *
 * The uid list stored in the entry is destroyed and its slot cleared after the
 * param has been built from it.
 *
 * @param ppRes Output operator param.
 * @param downstreamIdx Not used by this function.
 * @param pVg Hash whose values are uid arrays (SArray*).
 *
 * @return TSDB_CODE_SUCCESS on success, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR
 *         when the hash does not hold exactly one entry, otherwise the error
 *         returned by buildTableScanOperatorParam.
 */
static int32_t buildBatchTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
  int32_t vgCount = tSimpleHashGetSize(pVg);
  if (vgCount != 1) {
    qError("Invalid vgroup num %d to build table scan operator param", vgCount);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  int32_t iterPos = 0;
  for (void* pEntry = tSimpleHashIterate(pVg, NULL, &iterPos); NULL != pEntry;
       pEntry = tSimpleHashIterate(pVg, pEntry, &iterPos)) {
    SArray** ppUidList = (SArray**)pEntry;

    int32_t code = buildTableScanOperatorParam(ppRes, *ppUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false);
    if (code) {
      return code;
    }

    // The list is no longer needed once the param has been built from it.
    taosArrayDestroy(*ppUidList);
    *ppUidList = NULL;
  }

  return TSDB_CODE_SUCCESS;
}
921

922
/*
 * Build a table-scan get-param for a single table uid.
 *
 * A temporary one-element uid list is created for the call and destroyed
 * before returning, on success and on every failure path.
 *
 * @param ppRes Output operator param.
 * @param downstreamIdx Not used by this function.
 * @param pVgId Not used by this function.
 * @param pUid Uid of the table to scan.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
 */
static int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) {
  SArray* pUidList = taosArrayInit(1, sizeof(int64_t));
  if (NULL == pUidList) {
    return terrno;
  }
  if (NULL == taosArrayPush(pUidList, pUid)) {
    // Capture the error first, then release the list so it does not leak.
    int32_t pushCode = terrno;
    taosArrayDestroy(pUidList);
    return pushCode;
  }

  int32_t code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, true);
  taosArrayDestroy(pUidList);
  if (code) {
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
939

940
/*
 * Build the merge-join get-param for the table pair at the current read index
 * of the previous-join table list.
 *
 * Source params (table scan or exchange, batch or single) are built first,
 * wrapped into group-cache params, and the two group-cache params become the
 * children of the merge-join param.
 *
 * @param pInfo Dynamic-query control operator runtime info.
 * @param pPrev Previous-join context providing the table list and vgroup hashes.
 * @param pPost Post-join context providing the per-side cache flags.
 * @param ppParam Output merge-join operator param.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
 */
static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, SOperatorParam** ppParam) {
  int32_t         code = TSDB_CODE_SUCCESS;
  int64_t         curRow = pPrev->pListHead->readIdx;
  int32_t*        pLeftVgId = pPrev->pListHead->pLeftVg + curRow;
  int64_t*        pLeftUid = pPrev->pListHead->pLeftUid + curRow;
  int32_t*        pRightVgId = pPrev->pListHead->pRightVg + curRow;
  int64_t*        pRightUid = pPrev->pListHead->pRightUid + curRow;
  SOperatorParam* pLeftSrc = NULL;
  SOperatorParam* pRightSrc = NULL;
  SOperatorParam* pLeftCache = NULL;
  SOperatorParam* pRightCache = NULL;

  qDebug("start %" PRId64 ":%" PRId64 "th stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64, 
      curRow, pPrev->tableNum, *pLeftVgId, *pLeftUid, *pRightVgId, *pRightUid);

  QRY_ERR_RET(updatePostJoinCurrTableInfo(&pInfo->stbJoin));

  if (!pInfo->stbJoin.basic.batchFetch) {
    // Single-table mode: one source param per side for the current row.
    if (pInfo->stbJoin.basic.srcScan[0]) {
      code = buildSingleTableScanOperatorParam(&pLeftSrc, 0, pLeftVgId, pLeftUid);
    } else {
      code = buildExchangeOperatorParam(&pLeftSrc, 0, pLeftVgId, pLeftUid);
    }
    if (TSDB_CODE_SUCCESS == code) {
      if (pInfo->stbJoin.basic.srcScan[1]) {
        code = buildSingleTableScanOperatorParam(&pRightSrc, 1, pRightVgId, pRightUid);
      } else {
        code = buildExchangeOperatorParam(&pRightSrc, 1, pRightVgId, pRightUid);
      }
    }
  } else if (pPrev->leftHash) {
    // Batch mode: source params are built only while the vgroup hashes exist.
    if (pInfo->stbJoin.basic.srcScan[0]) {
      code = buildBatchTableScanOperatorParam(&pLeftSrc, 0, pPrev->leftHash);
    } else {
      code = buildBatchExchangeOperatorParam(&pLeftSrc, 0, pPrev->leftHash);
    }
    if (TSDB_CODE_SUCCESS == code) {
      if (pInfo->stbJoin.basic.srcScan[1]) {
        code = buildBatchTableScanOperatorParam(&pRightSrc, 1, pPrev->rightHash);
      } else {
        code = buildBatchExchangeOperatorParam(&pRightSrc, 1, pPrev->rightHash);
      }
      if (TSDB_CODE_SUCCESS == code) {
        // Both sides were built, so the hashes are dropped.
        tSimpleHashCleanup(pPrev->leftHash);
        tSimpleHashCleanup(pPrev->rightHash);
        pPrev->leftHash = NULL;
        pPrev->rightHash = NULL;
      }
    }
  }

  // Computed before the left source pointer is cleared below.
  bool initParam = (NULL != pLeftSrc);

  if (TSDB_CODE_SUCCESS == code) {
    code = buildGroupCacheOperatorParam(&pLeftCache, 0, *pLeftVgId, *pLeftUid, pPost->leftNeedCache, pLeftSrc);
    pLeftSrc = NULL;
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = buildGroupCacheOperatorParam(&pRightCache, 1, *pRightVgId, *pRightUid, pPost->rightNeedCache, pRightSrc);
    pRightSrc = NULL;
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = buildMergeJoinOperatorParam(ppParam, initParam, &pLeftCache, &pRightCache);
  }

  if (TSDB_CODE_SUCCESS == code) {
    return code;
  }

  // Failure: release whatever is still referenced by the local pointers.
  if (pLeftSrc) {
    freeOperatorParam(pLeftSrc, OP_GET_PARAM);
  }
  if (pRightSrc) {
    freeOperatorParam(pRightSrc, OP_GET_PARAM);
  }
  if (pLeftCache) {
    freeOperatorParam(pLeftCache, OP_GET_PARAM);
  }
  if (pRightCache) {
    freeOperatorParam(pRightCache, OP_GET_PARAM);
  }
  if (*ppParam) {
    freeOperatorParam(*ppParam, OP_GET_PARAM);
    *ppParam = NULL;
  }

  return code;
}
1010

1011
/*
 * Build the get-param of a virtual-table scan operator for one table uid.
 *
 * The scan window is copied from pInfo->vtbScan.window. When
 * pInfo->vtbScan.refColGroups is set, every group's slot-id list is duplicated
 * into the new param.
 *
 * @param pInfo Dynamic-query control operator runtime info.
 * @param ppRes Output operator param; NULL on failure.
 * @param uid Uid stored in the scan param.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
 */
static int32_t buildVtbScanOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, uint64_t uid) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SVTableScanOperatorParam* pVScan = NULL;
  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)

  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)

  // Zero-initialised: the error path inspects pRefColGroups and pOpParamArray,
  // which must not be indeterminate if a later allocation fails.
  pVScan = taosMemoryCalloc(1, sizeof(SVTableScanOperatorParam));
  QUERY_CHECK_NULL(pVScan, code, lino, _return, terrno)
  pVScan->pOpParamArray = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL(pVScan->pOpParamArray, code, lino, _return, terrno)
  pVScan->uid = uid;
  pVScan->window = pInfo->vtbScan.window;
  if (pInfo->vtbScan.refColGroups) {
    pVScan->pRefColGroups = taosArrayInit(taosArrayGetSize(pInfo->vtbScan.refColGroups), sizeof(SRefColIdGroup));
    QUERY_CHECK_NULL(pVScan->pRefColGroups, code, lino, _return, terrno)
    for (int32_t i = 0; i < taosArrayGetSize(pInfo->vtbScan.refColGroups); i++) {
      SRefColIdGroup* pSrc = (SRefColIdGroup*)taosArrayGet(pInfo->vtbScan.refColGroups, i);
      SRefColIdGroup  dst = {0};
      QUERY_CHECK_NULL(pSrc, code, lino, _return, terrno)
      dst.pSlotIdList = taosArrayDup(pSrc->pSlotIdList, NULL);
      QUERY_CHECK_NULL(dst.pSlotIdList, code, lino, _return, terrno)
      void* px = taosArrayPush(pVScan->pRefColGroups, &dst);
      if (NULL == px) {
        // The copy never reached pRefColGroups, so release it here.
        taosArrayDestroy(dst.pSlotIdList);
        dst.pSlotIdList = NULL;
      }
      QUERY_CHECK_NULL(px, code, lino, _return, terrno)
    }
  } else {
    pVScan->pRefColGroups = NULL;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->value = pVScan;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;
_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pVScan) {
    if (pVScan->pRefColGroups) {
      taosArrayDestroyEx(pVScan->pRefColGroups, destroyRefColIdGroup);
      pVScan->pRefColGroups = NULL;
    }
    taosArrayDestroy(pVScan->pOpParamArray);
    taosMemoryFreeClear(pVScan);
  }
  if (*ppRes) {
    taosArrayDestroy((*ppRes)->pChildren);
    taosMemoryFreeClear(*ppRes);
  }
  return code;
}
1069

1070
/*
 * Record that column colId refers to the source column named colrefName.
 *
 * refMap maps a ref name to an SArray* of col_id_t. A list is created on first
 * use of a name; a colId already present in the list is not added again.
 * A NULL or empty colrefName is ignored.
 *
 * @param refMap Hash map from ref name to column-id list.
 * @param colrefName Ref name used as the hash key.
 * @param colId Column id to add to the list of that name.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
 */
static int32_t addRefColIdToRefMap(SHashObj* refMap, const char* colrefName, col_id_t colId) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  line = 0;
  SArray** sameRefColIdList = NULL;
  SArray*  newList = NULL;  // non-NULL only while not yet owned by refMap
  size_t   nameLen = 0;

  if (colrefName == NULL || colrefName[0] == '\0') {
    return code;
  }

  nameLen = strlen(colrefName);
  sameRefColIdList = (SArray**)taosHashGet(refMap, colrefName, nameLen);
  if (sameRefColIdList == NULL) {
    newList = taosArrayInit(2, sizeof(col_id_t));
    QUERY_CHECK_NULL(newList, code, line, _return, terrno)
    // Store the result in code so a failed insert is reported to the caller
    // instead of being logged and then returned as success.
    code = taosHashPut(refMap, colrefName, nameLen, &newList, POINTER_BYTES);
    QUERY_CHECK_CODE(code, line, _return);
    newList = NULL;
    sameRefColIdList = (SArray**)taosHashGet(refMap, colrefName, nameLen);
    QUERY_CHECK_NULL(sameRefColIdList, code, line, _return, terrno)
  }

  for (int32_t i = 0; i < taosArrayGetSize(*sameRefColIdList); i++) {
    col_id_t existing = *(col_id_t*)taosArrayGet(*sameRefColIdList, i);
    if (existing == colId) {
      return code;
    }
  }
  QUERY_CHECK_NULL(taosArrayPush(*sameRefColIdList, &colId), code, line, _return, terrno)
  return code;

_return:
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  if (newList) {
    // The insert failed, so the map never took the list.
    taosArrayDestroy(newList);
  }
  return code;
}
1101

1102
/*
 * Turn a ref-name -> column-id-list map into groups of slot ids.
 *
 * A group is emitted for every ref name whose columns map to more than one
 * slot of readColList (slot id == index in readColList). The function consumes
 * refMap: every list stored in it is destroyed and the map is cleaned up, on
 * success and on failure alike. When refMap or readColList is NULL nothing is
 * touched.
 *
 * @param refMap Hash map from ref name to SArray* of col_id_t; consumed.
 * @param readColList Column ids actually read; index is the slot id.
 * @param ppGroups In: previous groups, destroyed first. Out: new groups, or
 *                 NULL when there are none or on failure.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
 */
static int32_t buildRefSlotGroupsFromRefMap(SHashObj* refMap, SArray* readColList, SArray** ppGroups) {
  int32_t   code = TSDB_CODE_SUCCESS;
  int32_t   line = 0;
  SArray*   groups = NULL;
  SHashObj* colIdToSlot = NULL;
  void*     pIter = NULL;

  if (refMap == NULL || readColList == NULL) {
    return code;
  }

  if (*ppGroups) {
    taosArrayDestroyEx(*ppGroups, destroyRefColIdGroup);
    *ppGroups = NULL;
  }

  colIdToSlot = taosHashInit(taosArrayGetSize(readColList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false,
                             HASH_NO_LOCK);
  QUERY_CHECK_NULL(colIdToSlot, code, line, _drain, terrno)

  // Build a quick colId -> slotId lookup for columns actually read.
  for (int32_t i = 0; i < taosArrayGetSize(readColList); i++) {
    col_id_t colId = *(col_id_t*)taosArrayGet(readColList, i);
    int32_t  slotId = i;
    code = taosHashPut(colIdToSlot, &colId, sizeof(colId), &slotId, sizeof(slotId));
    QUERY_CHECK_CODE(code, line, _drain);
  }

  groups = taosArrayInit(1, sizeof(SRefColIdGroup));
  QUERY_CHECK_NULL(groups, code, line, _drain, terrno)

_drain:
  // Every entry of refMap is visited exactly once, even after an error:
  // groups are only built while code is still success, but each colId list is
  // always destroyed so none of them leaks and the iterator runs to its end.
  pIter = taosHashIterate(refMap, NULL);
  while (pIter != NULL) {
    SArray* pList = *(SArray**)pIter;  // colId list
    if (TSDB_CODE_SUCCESS == code && pList && taosArrayGetSize(pList) > 1) {
      SArray* slotList = taosArrayInit(taosArrayGetSize(pList), sizeof(int32_t));
      if (NULL == slotList) {
        code = terrno;
        line = __LINE__;
      }
      for (int32_t i = 0; TSDB_CODE_SUCCESS == code && i < taosArrayGetSize(pList); i++) {
        col_id_t colId = *(col_id_t*)taosArrayGet(pList, i);
        int32_t* slotId = taosHashGet(colIdToSlot, &colId, sizeof(colId));
        if (slotId && NULL == taosArrayPush(slotList, slotId)) {
          code = terrno;
          line = __LINE__;
        }
      }
      if (TSDB_CODE_SUCCESS == code && taosArrayGetSize(slotList) > 1) {
        SRefColIdGroup g = {.pSlotIdList = slotList};
        if (NULL == taosArrayPush(groups, &g)) {
          code = terrno;
          line = __LINE__;
        } else {
          slotList = NULL;  // now owned by groups
        }
      }
      if (slotList) {
        // Too few slots, or a failure before groups took ownership.
        taosArrayDestroy(slotList);
      }
    }
    if (pList) {
      taosArrayDestroy(pList);
    }
    pIter = taosHashIterate(refMap, pIter);
  }

  if (code) {
    qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
    if (groups) {
      taosArrayDestroyEx(groups, destroyRefColIdGroup);
      // Cleared so the caller never receives a pointer to freed memory.
      groups = NULL;
    }
  } else if (taosArrayGetSize(groups) == 0) {
    taosArrayDestroy(groups);
    groups = NULL;
  }

  taosHashCleanup(refMap);
  if (colIdToSlot) {
    taosHashCleanup(colIdToSlot);
  }
  *ppGroups = groups;
  return code;
}
1180

1181
/*
 * Tell whether a column id has to be scanned by the vtable scan.
 *
 * Checked in order: the scanAllCols flag, then readColSet when it exists,
 * and finally a linear search of readColList.
 *
 * @param pOperator Operator whose info is a SDynQueryCtrlOperatorInfo.
 * @param colId Column id to look up.
 *
 * @return true when the column is needed, false otherwise.
 */
bool colNeedScan(SOperatorInfo* pOperator, col_id_t colId) {
  SDynQueryCtrlOperatorInfo* pCtrlInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pScanInfo = (SVtbScanDynCtrlInfo*)&pCtrlInfo->vtbScan;

  if (pScanInfo->scanAllCols) {
    return true;
  }

  // The set, when present, answers the question without walking the list.
  if (pScanInfo->readColSet) {
    return NULL != taosHashGet(pScanInfo->readColSet, &colId, sizeof(colId));
  }

  int32_t idx = 0;
  while (idx < taosArrayGetSize(pScanInfo->readColList)) {
    col_id_t* pCur = (col_id_t*)taosArrayGet(pScanInfo->readColList, idx);
    if (*pCur == colId) {
      return true;
    }
    ++idx;
  }
  return false;
}
1201

1202
/*
 * Build the get-param of an external-window operator.
 *
 * The window array is duplicated into the param. A single exchange child param
 * is built whose time window spans from the start key of the first window to
 * the end key of the last window.
 *
 * @param pInfo Not used by this function.
 * @param ppRes Output operator param; NULL on failure.
 * @param pWins Array of SExtWinTimeWindow.
 * @param idx Downstream index stored in the new param.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
 */
static int32_t buildExternalWindowOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins, int32_t idx) {
  int32_t                       code = TSDB_CODE_SUCCESS;
  int32_t                       lino = 0;
  SExternalWindowOperatorParam* pExtWinOp = NULL;

  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)

  pExtWinOp = taosMemoryMalloc(sizeof(SExternalWindowOperatorParam));
  QUERY_CHECK_NULL(pExtWinOp, code, lino, _return, terrno)

  pExtWinOp->ExtWins = taosArrayDup(pWins, NULL);
  QUERY_CHECK_NULL(pExtWinOp->ExtWins, code, lino, _return, terrno)

  SExtWinTimeWindow *firstWin = (SExtWinTimeWindow *)taosArrayGet(pWins, 0);
  SExtWinTimeWindow *lastWin = (SExtWinTimeWindow *)taosArrayGet(pWins, taosArrayGetSize(pWins) - 1);

  QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
  QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)

  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)

  SOperatorParam* pExchangeOperator = NULL;
  STimeWindow     twin = {.skey = firstWin->tw.skey, .ekey = lastWin->tw.ekey};
  code = buildExchangeOperatorParamForExternalWindow(&pExchangeOperator, 0, twin);
  QUERY_CHECK_CODE(code, lino, _return);
  if (NULL == taosArrayPush((*ppRes)->pChildren, &pExchangeOperator)) {
    code = terrno;
    lino = __LINE__;
    // The child never reached pChildren, so the cleanup loop below cannot
    // see it; release it here to avoid a leak.
    freeOperatorParam(pExchangeOperator, OP_GET_PARAM);
    goto _return;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW;
  (*ppRes)->downstreamIdx = idx;
  (*ppRes)->value = pExtWinOp;
  (*ppRes)->reUse = false;

  return code;
_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pExtWinOp) {
    if (pExtWinOp->ExtWins) {
      taosArrayDestroy(pExtWinOp->ExtWins);
    }
    taosMemoryFree(pExtWinOp);
  }
  if (*ppRes) {
    if ((*ppRes)->pChildren) {
      for (int32_t i = 0; i < taosArrayGetSize((*ppRes)->pChildren); ++i) {
        SOperatorParam* pChildParam = taosArrayGetP((*ppRes)->pChildren, i);
        if (pChildParam) {
          freeOperatorParam(pChildParam, OP_GET_PARAM);
        }
      }
      taosArrayDestroy((*ppRes)->pChildren);
    }
    taosMemoryFree(*ppRes);
    *ppRes = NULL;
  }
  return code;
}
1260

1261
/*
 * Build the get-param of a merge operator with one external-window child
 * param per downstream.
 *
 * @param pInfo Dynamic-query control operator runtime info, forwarded to the
 *              child builder.
 * @param ppRes Output operator param; NULL on failure.
 * @param pWins Window array forwarded to every external-window child.
 * @param numOfDownstream Number of child params to build.
 * @param numOfWins Value stored in the merge param's winNum field.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
 */
static int32_t buildMergeOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins,
                                       int32_t numOfDownstream, int32_t numOfWins) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SMergeOperatorParam*      pMergeOp = NULL;

  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)

  (*ppRes)->pChildren = taosArrayInit(numOfDownstream, POINTER_BYTES);
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)

  pMergeOp = taosMemoryMalloc(sizeof(SMergeOperatorParam));
  QUERY_CHECK_NULL(pMergeOp, code, lino, _return, terrno)

  pMergeOp->winNum = numOfWins;

  for (int32_t i = 0; i < numOfDownstream; i++) {
    SOperatorParam* pExternalWinParam = NULL;
    code = buildExternalWindowOperatorParam(pInfo, &pExternalWinParam, pWins, i);
    QUERY_CHECK_CODE(code, lino, _return);
    if (NULL == taosArrayPush((*ppRes)->pChildren, &pExternalWinParam)) {
      code = terrno;
      lino = __LINE__;
      // The child is not part of pChildren yet, so the cleanup loop below
      // would miss it; release it here to avoid a leak.
      freeOperatorParam(pExternalWinParam, OP_GET_PARAM);
      goto _return;
    }
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->value = pMergeOp;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;
_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pMergeOp) {
    taosMemoryFree(pMergeOp);
  }
  if (*ppRes) {
    if ((*ppRes)->pChildren) {
      for (int32_t i = 0; i < taosArrayGetSize((*ppRes)->pChildren); i++) {
        SOperatorParam* pChildParam = taosArrayGetP((*ppRes)->pChildren, i);
        if (pChildParam) {
          freeOperatorParam(pChildParam, OP_GET_PARAM);
        }
      }
      taosArrayDestroy((*ppRes)->pChildren);
    }
    taosMemoryFree(*ppRes);
    *ppRes = NULL;
  }
  return code;
}
1311

1312
/*
1313
 * Build merge operator params for vtable ts-scan mode.
1314
 *
1315
 * @param pInfo Dynamic-query control operator runtime info.
1316
 * @param ppRes Output merge operator param.
1317
 *
1318
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
1319
 */
1320
static int32_t buildMergeOperatorParamForTsScan(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes) {
33,404✔
1321
  int32_t                   code = TSDB_CODE_SUCCESS;
33,404✔
1322
  int32_t                   lino = 0;
33,404✔
1323
  SOperatorParam*           pParam = NULL;
33,404✔
1324
  SOperatorParam*           pExchangeParam = NULL;
33,404✔
1325
  SVtbScanDynCtrlInfo*      pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
33,404✔
1326

1327
  pParam = taosMemoryCalloc(1, sizeof(SOperatorParam));
33,404✔
1328
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)
33,404✔
1329

1330
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE;
33,404✔
1331
  pParam->downstreamIdx = 0;
33,404✔
1332
  pParam->reUse = false;
33,404✔
1333
  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
33,404✔
1334
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno)
33,404✔
1335

1336
  pParam->value = taosMemoryMalloc(sizeof(SMergeOperatorParam));
33,404✔
1337
  QUERY_CHECK_NULL(pParam->value, code, lino, _return, terrno)
33,404✔
1338

1339
  for (int32_t i = 0; i < taosHashGetSize(pVtbScan->otbVgIdToOtbInfoArrayMap); i++) {
100,212✔
1340
    code = buildBatchExchangeOperatorParamForVirtual(&pExchangeParam, i, NULL, 0, pVtbScan->otbVgIdToOtbInfoArrayMap,
66,808✔
1341
                                                     (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN},
66,808✔
1342
                                                     EX_SRC_TYPE_VSTB_TS_SCAN,
1343
                                                     QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN);
1344
    QUERY_CHECK_CODE(code, lino, _return);
66,808✔
1345
    QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pExchangeParam), code, lino, _return, terrno)
133,616✔
1346
    pExchangeParam = NULL;
66,808✔
1347
  }
1348

1349
  *ppRes = pParam;
33,404✔
1350

1351
  return code;
33,404✔
1352
_return:
×
1353
  if (pExchangeParam) {
×
1354
    freeOperatorParam(pExchangeParam, OP_GET_PARAM);
×
1355
  }
1356
  if (pParam) {
×
1357
    freeOperatorParam(pParam, OP_GET_PARAM);
×
1358
  }
1359
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1360
  return code;
×
1361
}
1362

1363
/*
 * Build the get-param of a hash-agg operator with a single batch-exchange
 * child built from otbVgIdToOtbInfoArrayMap (group id 0, no tag list).
 *
 * @param pInfo Dynamic-query control operator runtime info.
 * @param ppRes Output operator param; only written on success.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
 */
static int32_t buildAggOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes) {
  int32_t              code = TSDB_CODE_SUCCESS;
  int32_t              lino = 0;
  SVtbScanDynCtrlInfo* pScanCtrl = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SOperatorParam*      pAggParam = NULL;
  SOperatorParam*      pChild = NULL;
  // True only while pChild has been built but is not yet held by pChildren.
  bool                 childPending = false;

  pAggParam = taosMemoryCalloc(1, sizeof(SOperatorParam));
  QUERY_CHECK_NULL(pAggParam, code, lino, _return, terrno)

  pAggParam->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL(pAggParam->pChildren, code, lino, _return, terrno)

  pAggParam->value = taosMemoryMalloc(sizeof(SAggOperatorParam));
  QUERY_CHECK_NULL(pAggParam->value, code, lino, _return, terrno)

  code = buildBatchExchangeOperatorParamForVirtual(
      &pChild, 0, NULL, 0, pScanCtrl->otbVgIdToOtbInfoArrayMap,
      (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN}, EX_SRC_TYPE_VSTB_AGG_SCAN,
      QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
  QUERY_CHECK_CODE(code, lino, _return);

  childPending = true;
  QUERY_CHECK_NULL(taosArrayPush(pAggParam->pChildren, &pChild), code, lino, _return, terrno)
  childPending = false;

  pAggParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
  pAggParam->downstreamIdx = 0;
  pAggParam->reUse = false;

  *ppRes = pAggParam;

  return code;
_return:
  if (childPending) {
    freeOperatorParam(pChild, OP_GET_PARAM);
  }
  if (pAggParam) {
    freeOperatorParam(pAggParam, OP_GET_PARAM);
  }
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  return code;
}
1409

1410
static int32_t buildAggOperatorParamWithGroupId(SDynQueryCtrlOperatorInfo* pInfo, uint64_t groupid,
3,812,710✔
1411
                                                SOperatorParam** ppRes) {
1412
  int32_t                   code = TSDB_CODE_SUCCESS;
3,812,710✔
1413
  int32_t                   lino = 0;
3,812,710✔
1414
  SVtbScanDynCtrlInfo*      pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
3,812,710✔
1415
  SOperatorParam*           pParam = NULL;
3,812,710✔
1416
  SOperatorParam*           pExchangeParam = NULL;
3,812,710✔
1417
  SHashObj*                 otbVgIdToOtbInfoArrayMap = NULL;
3,812,710✔
1418
  bool                      freeExchange = false;
3,812,710✔
1419
  void*                     pIter = taosHashGet(pVtbScan->vtbGroupIdToVgIdMapMap, &groupid, sizeof(groupid));
3,812,710✔
1420

1421
  if (!pIter) {
3,812,710✔
1422
    *ppRes = NULL;
522,830✔
1423
    return code;
522,830✔
1424
  }
1425

1426
  otbVgIdToOtbInfoArrayMap = *(SHashObj**)pIter;
3,289,880✔
1427

1428
  pParam = taosMemoryCalloc(1, sizeof(SOperatorParam));
3,289,880✔
1429
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)
3,289,880✔
1430

1431
  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
3,289,880✔
1432
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno)
3,289,880✔
1433

1434
  code = buildBatchExchangeOperatorParamForVirtual(
3,289,880✔
1435
      &pExchangeParam, 0, NULL, groupid, otbVgIdToOtbInfoArrayMap,
1436
      (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN}, EX_SRC_TYPE_VSTB_AGG_SCAN,
3,289,880✔
1437
      QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
1438
  QUERY_CHECK_CODE(code, lino, _return);
3,289,880✔
1439

1440
  freeExchange = true;
3,289,880✔
1441

1442
  QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pExchangeParam), code, lino, _return, terrno)
6,579,760✔
1443

1444
  freeExchange = false;
3,289,880✔
1445

1446
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
3,289,880✔
1447
  pParam->downstreamIdx = 0;
3,289,880✔
1448
  pParam->value = NULL;
3,289,880✔
1449
  pParam->reUse = false;
3,289,880✔
1450

1451
  *ppRes = pParam;
3,289,880✔
1452

1453
  return code;
3,289,880✔
1454
_return:
×
1455
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1456
  if (freeExchange) {
×
1457
    freeOperatorParam(pExchangeParam, OP_GET_PARAM);
×
1458
  }
1459
  if (pParam) {
×
1460
    freeOperatorParam(pParam, OP_GET_PARAM);
×
1461
  }
1462
  return code;
×
1463
}
1464

1465
/*
 * Build the batch exchange operator param used to aggregate a single child table.
 *
 * Looks up `uid` in vtbScan.vtbUidToVgIdMapMap. When no entry exists, *ppRes is set to NULL
 * and TSDB_CODE_SUCCESS is returned. Otherwise the map stored in the entry is handed to
 * buildBatchExchangeOperatorParamForVirtual() and the resulting param is returned via *ppRes.
 *
 * On failure the partially built param is released, the error is logged and returned;
 * *ppRes is left untouched.
 */
static int32_t buildAggOperatorParamForSingleChild(SDynQueryCtrlOperatorInfo* pInfo, tb_uid_t uid,
                                                   uint64_t groupid, SArray* pTagList, SOperatorParam** ppRes) {
  int32_t              code = TSDB_CODE_SUCCESS;
  int32_t              lino = 0;
  SVtbScanDynCtrlInfo* pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SOperatorParam*      pParam = NULL;
  SHashObj*            pOtbVgIdToOtbInfoArrayMap = NULL;
  void*                pIter = taosHashGet(pVtbScan->vtbUidToVgIdMapMap, &uid, sizeof(uid));

  if (NULL == pIter) {
    *ppRes = NULL;
    return code;
  }

  // Reuse the entry found above instead of hashing the same key a second time and
  // dereferencing an unchecked second lookup result.
  pOtbVgIdToOtbInfoArrayMap = *(SHashObj**)pIter;

  code = buildBatchExchangeOperatorParamForVirtual(
      &pParam, 0, pTagList, groupid, pOtbVgIdToOtbInfoArrayMap,
      (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN}, EX_SRC_TYPE_VSTB_AGG_SCAN,
      QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
  QUERY_CHECK_CODE(code, lino, _return);

  *ppRes = pParam;
  return code;

_return:
  if (pParam) {
    freeOperatorParam(pParam, OP_GET_PARAM);
  }
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  return code;
}
1496

1497
/*
 * Launch the post-join retrieval for the current entry of the sequential stable join.
 *
 * Builds the operator param with buildSeqStbJoinOperatorParam() and requests one block from
 * downstream[1] through its getNextExtFn(). A returned block is validated with
 * blockDataCheck() and accounted in the post-join statistics.
 *
 * Every failure is stored in pTaskInfo->code and raised with T_LONG_JMP. This includes an
 * error code returned by the downstream fetch, which was previously treated the same as
 * "no block" and silently dropped.
 */
static void seqJoinLaunchNewRetrieveImpl(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
  SStbJoinPostJoinCtx*       pPost = &pStbJoin->ctx.post;
  SOperatorParam*            pParam = NULL;
  int32_t                    code = buildSeqStbJoinOperatorParam(pInfo, pPrev, pPost, &pParam);
  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  qDebug("%s dynamic post task begin", GET_TASKID(pOperator->pTaskInfo));
  code = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam, ppRes);
  if (TSDB_CODE_SUCCESS != code) {
    // Do not mistake a failed fetch for an empty result.
    qError("%s dynamic post task failed, error:%s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  if (NULL == *ppRes) {
    qDebug("%s Empty join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
    return;
  }

  code = blockDataCheck(*ppRes);
  if (code) {
    qError("Invalid block data, blockDataCheck failed, error:%s", tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  pPost->isStarted = true;
  pStbJoin->execInfo.postBlkNum++;
  pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows;
  qDebug("%s join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
}
3,798,367✔
1526

1527

1528
/*
 * Notify downstream[1] that caching for the current left or right table has ended.
 *
 * Picks the vgroup id / table uid of the requested side from pPost, wraps them in a group
 * cache notify param, wraps that in a merge join notify param and delivers it with
 * optrDefaultNotifyFn(). Returns the first error encountered.
 */
static int32_t notifySeqJoinTableCacheEnd(SOperatorInfo* pOperator, SStbJoinPostJoinCtx* pPost, bool leftTable) {
  SOperatorParam* pGcParam = NULL;
  SOperatorParam* pMergeJoinParam = NULL;
  int32_t         downstreamId = leftTable ? 0 : 1;
  int32_t         vgId = leftTable ? pPost->leftVgId : pPost->rightVgId;
  int64_t         uid = leftTable ? pPost->leftCurrUid : pPost->rightCurrUid;

  // uid is a signed 64-bit value, so it is printed with PRId64 like the other uid logs here.
  qDebug("notify table %" PRId64 " in vgId %d downstreamId %d cache end", uid, vgId, downstreamId);

  int32_t code = buildGroupCacheNotifyOperatorParam(&pGcParam, downstreamId, vgId, uid);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  // NOTE(review): pGcParam is not released here when the next call fails; presumably
  // buildMergeJoinNotifyOperatorParam owns its children on failure -- confirm.
  code = buildMergeJoinNotifyOperatorParam(&pMergeJoinParam, pGcParam, NULL);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  return optrDefaultNotifyFn(pOperator->pDownstream[1], pMergeJoinParam);
}
1548

1549
/*
 * Finish the retrieval of the current table pair.
 *
 * Always clears pPost->isStarted. In batch-fetch mode nothing else is done. Otherwise:
 *  - left side: when the left table is flagged as cached, its counter in leftCache is
 *    decremented; once it reaches zero the entry is removed and downstream is notified.
 *  - right side: when the right table is NOT flagged as needing cache and an entry exists in
 *    rightCache, that entry is removed and downstream is notified.
 */
static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCtrlInfo* pStbJoin) {
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
  int32_t              code = TSDB_CODE_SUCCESS;

  pPost->isStarted = false;

  if (pStbJoin->basic.batchFetch) {
    return TSDB_CODE_SUCCESS;
  }

  if (pPost->leftNeedCache) {
    uint32_t* pRefNum = tSimpleHashGet(pPrev->leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
    // The counter is unsigned, so "dropped to zero" is the only way the release fires.
    if (NULL != pRefNum && 0 == --(*pRefNum)) {
      code = tSimpleHashRemove(pPrev->leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
      if (TSDB_CODE_SUCCESS != code) {
        qError("tSimpleHashRemove leftCurrUid %" PRId64 " from leftCache failed, error:%s", pPost->leftCurrUid, tstrerror(code));
        QRY_ERR_RET(code);
      }
      QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, true));
    }
  }

  if (pPost->rightNeedCache) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == tSimpleHashGet(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid))) {
    return TSDB_CODE_SUCCESS;
  }

  code = tSimpleHashRemove(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
  if (TSDB_CODE_SUCCESS != code) {
    qError("tSimpleHashRemove rightCurrUid %" PRId64 " from rightCache failed, error:%s", pPost->rightCurrUid, tstrerror(code));
    QRY_ERR_RET(code);
  }
  QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, false));

  return TSDB_CODE_SUCCESS;
}
1585

1586

1587
/*
 * Continue draining the post-join operator for the table pair that is already in progress.
 *
 * Does nothing when no retrieval has been started. Otherwise pulls the next block from
 * downstream 1: a block is counted in the post-join statistics and returned through *ppRes;
 * a NULL block ends the current pair and advances the read index of the list head.
 */
static FORCE_INLINE int32_t seqJoinContinueCurrRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = &pInfo->stbJoin;

  if (!pStbJoin->ctx.post.isStarted) {
    return TSDB_CODE_SUCCESS;
  }

  qDebug("%s dynQueryCtrl continue to retrieve block from post op", GET_TASKID(pOperator->pTaskInfo));

  SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 1);
  *ppRes = pBlock;

  if (NULL != pBlock) {
    pStbJoin->execInfo.postBlkNum++;
    pStbJoin->execInfo.postBlkRows += pBlock->info.rows;
    return TSDB_CODE_SUCCESS;
  }

  // Downstream is exhausted for this pair: release caches, then move to the next entry.
  QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, pStbJoin));
  pStbJoin->ctx.prev.pListHead->readIdx++;

  return TSDB_CODE_SUCCESS;
}
1609

1610
/*
 * Append pVal to the array stored under pKey in pHash, creating the array on first use.
 *
 * pHash maps a key to an SArray* whose elements are valSize bytes each. Returns
 * TSDB_CODE_SUCCESS or the error of the failing step. A newly created array is destroyed
 * again when it cannot be filled or stored.
 */
static FORCE_INLINE int32_t addToJoinVgroupHash(SSHashObj* pHash, void* pKey, int32_t keySize, void* pVal, int32_t valSize) {
  SArray** ppArray = tSimpleHashGet(pHash, pKey, keySize);
  if (NULL != ppArray) {
    if (NULL == taosArrayPush(*ppArray, pVal)) {
      return terrno;
    }
    return TSDB_CODE_SUCCESS;
  }

  SArray* pArray = taosArrayInit(10, valSize);
  if (NULL == pArray) {
    return terrno;
  }

  if (NULL == taosArrayPush(pArray, pVal)) {
    // Capture the error before cleanup so a later call cannot disturb it.
    int32_t pushCode = terrno;
    taosArrayDestroy(pArray);
    return pushCode;
  }

  // Report the hash's own return code, as addToJoinTableHash does. Returning terrno here
  // could yield a stale value, or success, after the array was already destroyed.
  int32_t code = tSimpleHashPut(pHash, pKey, keySize, &pArray, POINTER_BYTES);
  if (TSDB_CODE_SUCCESS != code) {
    taosArrayDestroy(pArray);
  }
  return code;
}
1634

1635
/*
 * Count one more reference to the table identified by pKey.
 *
 * First sighting: the key is stored in pHash with a counter of 1 and also recorded in
 * pOnceHash. Later sightings depend on the stored counter:
 *   0           -> left unchanged
 *   UINT32_MAX  -> reset to 0
 *   otherwise   -> incremented; when it was exactly 1 the key is first dropped from pOnceHash
 */
static FORCE_INLINE int32_t addToJoinTableHash(SSHashObj* pHash, SSHashObj* pOnceHash, void* pKey, int32_t keySize) {
  int32_t   code = TSDB_CODE_SUCCESS;
  uint32_t* pRefCnt = tSimpleHashGet(pHash, pKey, keySize);

  if (NULL == pRefCnt) {
    uint32_t first = 1;
    code = tSimpleHashPut(pHash, pKey, keySize, &first, sizeof(first));
    if (TSDB_CODE_SUCCESS == code) {
      code = tSimpleHashPut(pOnceHash, pKey, keySize, NULL, 0);
    }
    return code;
  }

  if (0 == *pRefCnt) {
    return TSDB_CODE_SUCCESS;
  }

  if (UINT32_MAX == *pRefCnt) {
    *pRefCnt = 0;
    return TSDB_CODE_SUCCESS;
  }

  if (1 == *pRefCnt) {
    // Second reference: the table no longer belongs to the "seen once" set.
    code = tSimpleHashRemove(pOnceHash, pKey, keySize);
    if (code) {
      qError("tSimpleHashRemove failed in addToJoinTableHash, error:%s", tstrerror(code));
      QRY_ERR_RET(code);
    }
  }
  ++(*pRefCnt);

  return TSDB_CODE_SUCCESS;
}
1671

1672

1673
/* Release one table-list node together with its four vgId/uid arrays; NULL is ignored. */
static void freeStbJoinTableList(SStbJoinTableList* pList) {
  if (NULL != pList) {
    taosMemoryFree(pList->pLeftVg);
    taosMemoryFree(pList->pLeftUid);
    taosMemoryFree(pList->pRightVg);
    taosMemoryFree(pList->pRightUid);
    taosMemoryFree(pList);
  }
}
1683

1684
/*
 * Append a node holding private copies of `rows` left/right vgId and uid values to the
 * table list kept in pCtx (pListHead / pListTail).
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when any allocation fails; in that case the partially
 * built node is released and the list is left unchanged.
 */
static int32_t appendStbJoinTableList(SStbJoinPrevJoinCtx* pCtx, int64_t rows, int32_t* pLeftVg, int64_t* pLeftUid, int32_t* pRightVg, int64_t* pRightUid) {
  int32_t            code = TSDB_CODE_SUCCESS;
  SStbJoinTableList* pNode = taosMemoryCalloc(1, sizeof(SStbJoinTableList));
  if (NULL == pNode) {
    return terrno;
  }

  pNode->pLeftVg = taosMemoryMalloc(rows * sizeof(*pLeftVg));
  if (NULL == pNode->pLeftVg) {
    goto _alloc_failed;
  }
  pNode->pLeftUid = taosMemoryMalloc(rows * sizeof(*pLeftUid));
  if (NULL == pNode->pLeftUid) {
    goto _alloc_failed;
  }
  pNode->pRightVg = taosMemoryMalloc(rows * sizeof(*pRightVg));
  if (NULL == pNode->pRightVg) {
    goto _alloc_failed;
  }
  pNode->pRightUid = taosMemoryMalloc(rows * sizeof(*pRightUid));
  if (NULL == pNode->pRightUid) {
    goto _alloc_failed;
  }

  TAOS_MEMCPY(pNode->pLeftVg, pLeftVg, rows * sizeof(*pLeftVg));
  TAOS_MEMCPY(pNode->pLeftUid, pLeftUid, rows * sizeof(*pLeftUid));
  TAOS_MEMCPY(pNode->pRightVg, pRightVg, rows * sizeof(*pRightVg));
  TAOS_MEMCPY(pNode->pRightUid, pRightUid, rows * sizeof(*pRightUid));

  pNode->readIdx = 0;
  pNode->uidNum = rows;
  pNode->pNext = NULL;

  // Link at the tail; an empty list gets the node as both head and tail.
  if (pCtx->pListTail) {
    pCtx->pListTail->pNext = pNode;
  } else {
    pCtx->pListHead = pNode;
  }
  pCtx->pListTail = pNode;

  return TSDB_CODE_SUCCESS;

_alloc_failed:
  // Read terrno before freeing, then drop whatever was allocated so far.
  code = terrno;
  freeStbJoinTableList(pNode);
  return code;
}
1734

1735
/*
 * Digest one block of (vgId, uid) pairs produced by downstream 0.
 *
 * The vgId and uid columns of both sides are located through basic.vgSlot / basic.uidSlot.
 * In batch-fetch mode every row is added to the left and right vgroup hashes; otherwise only
 * the left uid is counted in leftCache / onceTable. Finally the raw column data is appended
 * to the table list and tableNum is increased by the block's row count.
 *
 * Any failure is stored in pTaskInfo->code and raised with T_LONG_JMP.
 */
static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SColumnInfoData*           pVg0 = NULL;
  SColumnInfoData*           pVg1 = NULL;
  SColumnInfoData*           pUid0 = NULL;
  SColumnInfoData*           pUid1 = NULL;

  pVg0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[0]);
  if (NULL == pVg0) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  pVg1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[1]);
  if (NULL == pVg1) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  pUid0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[0]);
  if (NULL == pUid0) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  pUid1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[1]);
  if (NULL == pUid1) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  if (pStbJoin->basic.batchFetch) {
    // The loop stops at the first failing insert; code carries the error out.
    for (int32_t i = 0; TSDB_CODE_SUCCESS == code && i < pBlock->info.rows; ++i) {
      int32_t* leftVg = (int32_t*)(pVg0->pData + pVg0->info.bytes * i);
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i);
      int32_t* rightVg = (int32_t*)(pVg1->pData + pVg1->info.bytes * i);
      int64_t* rightUid = (int64_t*)(pUid1->pData + pUid1->info.bytes * i);

      code = addToJoinVgroupHash(pStbJoin->ctx.prev.leftHash, leftVg, sizeof(*leftVg), leftUid, sizeof(*leftUid));
      if (TSDB_CODE_SUCCESS == code) {
        code = addToJoinVgroupHash(pStbJoin->ctx.prev.rightHash, rightVg, sizeof(*rightVg), rightUid, sizeof(*rightUid));
      }
    }
  } else {
    for (int32_t i = 0; TSDB_CODE_SUCCESS == code && i < pBlock->info.rows; ++i) {
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i);

      code = addToJoinTableHash(pStbJoin->ctx.prev.leftCache, pStbJoin->ctx.prev.onceTable, leftUid, sizeof(*leftUid));
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = appendStbJoinTableList(&pStbJoin->ctx.prev, pBlock->info.rows, (int32_t*)pVg0->pData, (int64_t*)pUid0->pData, (int32_t*)pVg1->pData, (int64_t*)pUid1->pData);
    if (TSDB_CODE_SUCCESS == code) {
      pStbJoin->ctx.prev.tableNum += pBlock->info.rows;
    }
  }

_return:

  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }
}
84,258✔
1797

1798

1799
/*
 * Trim leftCache after all input blocks were digested (non batch-fetch mode only).
 *
 * When leftCache and onceTable have the same size, leftCache is simply cleared. Otherwise
 * every key recorded in onceTable is removed from leftCache (failures are only logged) and
 * the remaining entry count is stored in execInfo.leftCacheNum.
 */
static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;

  if (pStbJoin->basic.batchFetch) {
    return;
  }

  SSHashObj* pLeftCache = pStbJoin->ctx.prev.leftCache;
  SSHashObj* pOnceTable = pStbJoin->ctx.prev.onceTable;

  if (tSimpleHashGetSize(pLeftCache) == tSimpleHashGetSize(pOnceTable)) {
    tSimpleHashClear(pLeftCache);
    return;
  }

  int32_t iter = 0;
  for (uint64_t* pUid = tSimpleHashIterate(pOnceTable, NULL, &iter); NULL != pUid;
       pUid = tSimpleHashIterate(pOnceTable, pUid, &iter)) {
    int32_t code = tSimpleHashRemove(pLeftCache, pUid, sizeof(*pUid));
    if (code) {
      qError("tSimpleHashRemove failed in postProcessStbJoinTableHash, error:%s", tstrerror(code));
    }
  }

  pStbJoin->execInfo.leftCacheNum = tSimpleHashGetSize(pLeftCache);
  qDebug("more than 1 ref build table num %" PRId64, (int64_t)tSimpleHashGetSize(pLeftCache));
}
1834

1835
/*
 * Drain downstream 0 and build the join table list / hashes from every block it returns,
 * then post-process the caches and mark the build phase as finished (ctx.prev.joinBuild).
 */
static void buildStbJoinTableList(SOperatorInfo* pOperator) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SSDataBlock*               pBlock = NULL;

  while (NULL != (pBlock = getNextBlockFromDownstream(pOperator, 0))) {
    pStbJoin->execInfo.prevBlkNum++;
    pStbJoin->execInfo.prevBlkRows += pBlock->info.rows;

    doBuildStbJoinTableHash(pOperator, pBlock);
  }

  postProcessStbJoinTableHash(pOperator);

  pStbJoin->ctx.prev.joinBuild = true;
}
1,207,635✔
1855

1856
/*
 * Start retrievals for the remaining table pairs until one of them yields a block.
 *
 * Walks the table list from its head. Fully consumed nodes are unlinked and freed. For the
 * current entry a new post-join retrieval is launched:
 *   - a non-NULL block is returned to the caller through *ppRes;
 *   - an empty result ends the pair and advances readIdx to the next entry.
 * When the list is exhausted *ppRes is set to NULL and the operator is marked completed.
 */
static int32_t seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
  SStbJoinTableList*         pNode = pPrev->pListHead;

  while (pNode) {
    if (pNode->readIdx >= pNode->uidNum) {
      pPrev->pListHead = pNode->pNext;
      if (NULL == pPrev->pListHead) {
        // The freed node was also the tail. Clear it so a later append cannot write
        // through a dangling pointer.
        pPrev->pListTail = NULL;
      }
      freeStbJoinTableList(pNode);
      pNode = pPrev->pListHead;
      continue;
    }

    seqJoinLaunchNewRetrieveImpl(pOperator, ppRes);
    if (*ppRes) {
      return TSDB_CODE_SUCCESS;
    }

    QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, pStbJoin));
    pPrev->pListHead->readIdx++;
  }

  *ppRes = NULL;
  setOperatorCompleted(pOperator);

  return TSDB_CODE_SUCCESS;
}
1884

1885
/*
 * Shape a join result block according to the operator's output block descriptor.
 *
 * A NULL block is accepted and left alone. Otherwise the block id is taken from
 * pOutputDataBlockDesc, and for every slot of the descriptor beyond the columns already
 * present in the block a new column of the slot's type is allocated for info.rows rows and
 * appended.
 *
 * Returns TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the descriptor (or pStbJoin) is missing
 * or a slot cannot be fetched, or the error of the failing column allocation / append.
 */
static int32_t seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) {
  if (NULL == pBlock) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == pStbJoin || NULL == pStbJoin->pOutputDataBlockDesc) {
    // The old message blamed pBlock, which is known to be non-NULL at this point.
    qError("seqStableJoinComposeRes: pStbJoin or pOutputDataBlockDesc is NULL");
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  pBlock->info.id.blockId = pStbJoin->pOutputDataBlockDesc->dataBlockId;
  if (!pBlock->pDataBlock) {
    return TSDB_CODE_SUCCESS;
  }

  for (int i = (int)pBlock->pDataBlock->size; i < pStbJoin->pOutputDataBlockDesc->pSlots->length; i++) {
    SSlotDescNode* pSlot = (SSlotDescNode*)nodesListGetNode(pStbJoin->pOutputDataBlockDesc->pSlots, i);
    if (pSlot == NULL) {
      qError("seqStableJoinComposeRes: pSlot is NULL, i:%d", i);
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }

    SColumnInfoData colInfo = createColumnInfoData(pSlot->dataType.type, pSlot->dataType.bytes, pSlot->slotId);
    int32_t         code = colInfoDataEnsureCapacity(&colInfo, pBlock->info.rows, true);
    if (TSDB_CODE_SUCCESS == code) {
      code = blockDataAppendColInfo(pBlock, &colInfo);
    }
    if (TSDB_CODE_SUCCESS != code) {
      // The column was not adopted by the block, so its buffers must be released here.
      colDataDestroy(&colInfo);
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
1914

1915
/*
 * Produce the next result block of the sequential stable join.
 *
 * On the first call the table list is built from downstream 0; when that input carried no
 * rows the operator completes immediately. Afterwards the pair already in progress is
 * drained first, and only when it yields nothing is a new retrieval launched.
 *
 * The open cost is recorded once, while cost.openCost is still 0. Errors raised before the
 * result is composed are stored in pTaskInfo->code and thrown with T_LONG_JMP; otherwise the
 * return value is that of seqStableJoinComposeRes().
 */
int32_t seqStableJoin(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;

  QRY_PARAM_CHECK(pRes);
  if (OP_EXEC_DONE == pOperator->status) {
    return code;
  }

  int64_t openStartUs = 0;
  if (pOperator->cost.openCost == 0) {
    openStartUs = taosGetTimestampUs();
  }

  if (!pStbJoin->ctx.prev.joinBuild) {
    buildStbJoinTableList(pOperator);
    if (pStbJoin->execInfo.prevBlkRows <= 0) {
      setOperatorCompleted(pOperator);
      goto _return;
    }
  }

  QRY_ERR_JRET(seqJoinContinueCurrRetrieve(pOperator, pRes));
  if (NULL == *pRes) {
    QRY_ERR_JRET(seqJoinLaunchNewRetrieve(pOperator, pRes));
  }

_return:
  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - openStartUs) / 1000.0;
  }

  if (TSDB_CODE_SUCCESS == code) {
    return seqStableJoinComposeRes(pStbJoin, *pRes);
  }

  qError("%s failed since %s", __func__, tstrerror(code));
  pOperator->pTaskInfo->code = code;
  T_LONG_JMP(pOperator->pTaskInfo->env, code);
  return code;
}
1959

1960
int32_t dynProcessUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
3,297,223✔
1961
  int32_t                    lino = 0;
3,297,223✔
1962
  SOperatorInfo*             operator=(SOperatorInfo*) param;
3,297,223✔
1963
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)operator->info;
3,297,223✔
1964

1965
  if (TSDB_CODE_SUCCESS != code) {
3,297,223✔
1966
    operator->pTaskInfo->code = rpcCvtErrCode(code);
×
1967
    if (operator->pTaskInfo->code != code) {
×
1968
      qError("load systable rsp received, error:%s, cvted error:%s", tstrerror(code),
×
1969
             tstrerror(operator->pTaskInfo->code));
1970
    } else {
1971
      qError("load systable rsp received, error:%s", tstrerror(code));
×
1972
    }
1973
    goto _return;
×
1974
  }
1975

1976
  pScanResInfo->vtbScan.pRsp = taosMemoryMalloc(sizeof(SUseDbRsp));
3,297,223✔
1977
  QUERY_CHECK_NULL(pScanResInfo->vtbScan.pRsp, code, lino, _return, terrno)
3,297,223✔
1978

1979
  code = tDeserializeSUseDbRsp(pMsg->pData, (int32_t)pMsg->len, pScanResInfo->vtbScan.pRsp);
3,297,223✔
1980
  QUERY_CHECK_CODE(code, lino, _return);
3,297,223✔
1981

1982
  taosMemoryFreeClear(pMsg->pData);
3,297,223✔
1983

1984
  code = tsem_post(&pScanResInfo->vtbScan.ready);
3,297,223✔
1985
  QUERY_CHECK_CODE(code, lino, _return);
3,297,223✔
1986

1987
  return code;
3,297,223✔
1988
_return:
×
1989
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1990
  return code;
×
1991
}
1992

1993
static int32_t buildDbVgInfoMap(SOperatorInfo* pOperator, SMsgCb* pMsgCb, SName* name, SExecTaskInfo* pTaskInfo, SUseDbOutput* output) {
3,297,223✔
1994
  int32_t                    code = TSDB_CODE_SUCCESS;
3,297,223✔
1995
  int32_t                    lino = 0;
3,297,223✔
1996
  char*                      buf1 = NULL;
3,297,223✔
1997
  SUseDbReq*                 pReq = NULL;
3,297,223✔
1998
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)pOperator->info;
3,297,223✔
1999

2000
  pReq = taosMemoryMalloc(sizeof(SUseDbReq));
3,297,223✔
2001
  QUERY_CHECK_NULL(pReq, code, lino, _return, terrno)
3,297,223✔
2002
  code = tNameGetFullDbName(name, pReq->db);
3,297,223✔
2003
  QUERY_CHECK_CODE(code, lino, _return);
3,297,223✔
2004
  int32_t contLen = tSerializeSUseDbReq(NULL, 0, pReq);
3,297,223✔
2005
  buf1 = taosMemoryCalloc(1, contLen);
3,297,223✔
2006
  QUERY_CHECK_NULL(buf1, code, lino, _return, terrno)
3,297,223✔
2007
  int32_t tempRes = tSerializeSUseDbReq(buf1, contLen, pReq);
3,297,223✔
2008
  if (tempRes < 0) {
3,297,223✔
2009
    QUERY_CHECK_CODE(terrno, lino, _return);
×
2010
  }
2011

2012
  // send the fetch remote task result request
2013
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
3,297,223✔
2014
  QUERY_CHECK_NULL(pMsgSendInfo, code, lino, _return, terrno)
3,297,223✔
2015

2016
  pMsgSendInfo->param = pOperator;
3,297,223✔
2017
  pMsgSendInfo->msgInfo.pData = buf1;
3,297,223✔
2018
  pMsgSendInfo->msgInfo.len = contLen;
3,297,223✔
2019
  pMsgSendInfo->msgType = TDMT_MND_GET_DB_INFO;
3,297,223✔
2020
  pMsgSendInfo->fp = dynProcessUseDbRsp;
3,297,223✔
2021
  pMsgSendInfo->requestId = pTaskInfo->id.queryId;
3,297,223✔
2022

2023
  code = asyncSendMsgToServer(pMsgCb->clientRpc, &pScanResInfo->vtbScan.epSet, NULL, pMsgSendInfo);
3,297,223✔
2024
  QUERY_CHECK_CODE(code, lino, _return);
3,297,223✔
2025

2026
  code = tsem_wait(&pScanResInfo->vtbScan.ready);
3,297,223✔
2027
  QUERY_CHECK_CODE(code, lino, _return);
3,297,223✔
2028

2029
  code = queryBuildUseDbOutput(output, pScanResInfo->vtbScan.pRsp);
3,297,223✔
2030
  QUERY_CHECK_CODE(code, lino, _return);
3,297,223✔
2031

2032
_return:
3,297,223✔
2033
  if (code) {
3,297,223✔
2034
     qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2035
     taosMemoryFree(buf1);
×
2036
  }
2037
  taosMemoryFree(pReq);
3,297,223✔
2038
  tFreeSUsedbRsp(pScanResInfo->vtbScan.pRsp);
3,297,223✔
2039
  taosMemoryFreeClear(pScanResInfo->vtbScan.pRsp);
3,297,223✔
2040
  return code;
3,297,223✔
2041
}
2042

2043
int dynVgInfoComp(const void* lp, const void* rp) {
6,582,908✔
2044
  SVgroupInfo* pLeft = (SVgroupInfo*)lp;
6,582,908✔
2045
  SVgroupInfo* pRight = (SVgroupInfo*)rp;
6,582,908✔
2046
  if (pLeft->hashBegin < pRight->hashBegin) {
6,582,908✔
2047
    return -1;
6,582,908✔
2048
  } else if (pLeft->hashBegin > pRight->hashBegin) {
×
2049
    return 1;
×
2050
  }
2051

2052
  return 0;
×
2053
}
2054

2055
/*
 * Materialize dbInfo->vgArray from dbInfo->vgHash and sort it with sort_func.
 *
 * Nothing is done (and success is returned) when dbInfo is NULL, when it has no vgHash, or
 * when vgArray already exists. Returns terrno when the array cannot be created or filled.
 */
int32_t dynMakeVgArraySortBy(SDBVgInfo* dbInfo, __compar_fn_t sort_func) {
  if (NULL == dbInfo || NULL == dbInfo->vgHash || NULL != dbInfo->vgArray) {
    return TSDB_CODE_SUCCESS;
  }

  dbInfo->vgArray = taosArrayInit(taosHashGetSize(dbInfo->vgHash), sizeof(SVgroupInfo));
  if (NULL == dbInfo->vgArray) {
    return terrno;
  }

  for (void* pIter = taosHashIterate(dbInfo->vgHash, NULL); NULL != pIter;
       pIter = taosHashIterate(dbInfo->vgHash, pIter)) {
    if (NULL == taosArrayPush(dbInfo->vgArray, pIter)) {
      // Leaving the loop early: the iteration has to be cancelled explicitly.
      taosHashCancelIterate(dbInfo->vgHash, pIter);
      return terrno;
    }
  }

  taosArraySort(dbInfo->vgArray, sort_func);

  return TSDB_CODE_SUCCESS;
}
2082

2083
int32_t dynHashValueComp(void const* lp, void const* rp) {
29,577,220✔
2084
  uint32_t*    key = (uint32_t*)lp;
29,577,220✔
2085
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
29,577,220✔
2086

2087
  if (*key < pVg->hashBegin) {
29,577,220✔
2088
    return -1;
×
2089
  } else if (*key > pVg->hashEnd) {
29,577,220✔
2090
    return 1;
9,955,960✔
2091
  }
2092

2093
  return 0;
19,621,260✔
2094
}
2095

2096
/*
 * Resolve the vgroup id that owns table `tbName` of database `dbFName`.
 *
 * Ensures dbInfo->vgArray is built and sorted by hashBegin, hashes "<dbFName>.<tbName>" with
 * the database's hash settings and searches the array for the range containing the value.
 *
 * Returns TSDB_CODE_SUCCESS and stores the id in *vgId, or:
 *   TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR  a required argument is NULL
 *   TSDB_CODE_TSC_DB_NOT_SELECTED          the database has no vgroups cached
 *   TSDB_CODE_CTG_INTERNAL_ERROR           no vgroup range contains the hash value
 *   or the error of dynMakeVgArraySortBy().
 */
int32_t getVgId(SDBVgInfo* dbInfo, char* dbFName, int32_t* vgId, char *tbName) {
  int32_t code = 0;
  int32_t lino = 0;

  // dynMakeVgArraySortBy() accepts a NULL dbInfo, but the code below dereferences it.
  if (NULL == dbInfo || NULL == dbFName || NULL == vgId || NULL == tbName) {
    qError("%s failed since invalid parameter", __func__);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  code = dynMakeVgArraySortBy(dbInfo, dynVgInfoComp);
  QUERY_CHECK_CODE(code, lino, _return);

  int32_t vgNum = (int32_t)taosArrayGetSize(dbInfo->vgArray);
  if (vgNum <= 0) {
    qError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum);
    QUERY_CHECK_CODE(code = TSDB_CODE_TSC_DB_NOT_SELECTED, lino, _return);
  }

  SVgroupInfo* vgInfo = NULL;
  char         tbFullName[TSDB_TABLE_FNAME_LEN];
  // One bounded format call yields the same (possibly truncated) "<db>.<table>" string as
  // formatting the two parts separately.
  (void)snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbFName, tbName);

  uint32_t hashValue = taosGetTbHashVal(tbFullName, (int32_t)strlen(tbFullName), dbInfo->hashMethod,
                                        dbInfo->hashPrefix, dbInfo->hashSuffix);

  vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, dynHashValueComp, TD_EQ);
  if (NULL == vgInfo) {
    qError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName,
           (int32_t)taosArrayGetSize(dbInfo->vgArray));
    return TSDB_CODE_CTG_INTERNAL_ERROR;
  }

  *vgId = vgInfo->vgId;

_return:
  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
2129

2130
/*
 * Return the vgroup info of database name->dbname through *dbVgInfo.
 *
 * Results are cached in vtbScan.dbVgInfoMap keyed by the database name. On a cache miss a
 * new SUseDbOutput is allocated, filled by buildDbVgInfoMap() and stored in the map.
 *
 * On failure the error is logged, the output allocated by this call is passed to
 * freeUseDbOutput() and the error code is returned; *dbVgInfo stays NULL.
 */
int32_t getDbVgInfo(SOperatorInfo* pOperator, SName *name, SDBVgInfo **dbVgInfo) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SMsgCb*                    pMsgCb = pVtbScan->pMsgCb;
  SUseDbOutput*              output = NULL;
  SUseDbOutput**             find = NULL;

  QRY_PARAM_CHECK(dbVgInfo);

  size_t dbNameLen = strlen(name->dbname);
  find = (SUseDbOutput**)taosHashGet(pInfo->vtbScan.dbVgInfoMap, name->dbname, dbNameLen);

  if (find == NULL) {
    // Zero-initialized and checked: the error path hands this struct to freeUseDbOutput(),
    // which must not see uninitialized members, and the builder must not get a NULL target.
    output = taosMemoryCalloc(1, sizeof(SUseDbOutput));
    QUERY_CHECK_NULL(output, code, line, _return, terrno)
    code = buildDbVgInfoMap(pOperator, pMsgCb, name, pTaskInfo, output);
    QUERY_CHECK_CODE(code, line, _return);
    code = taosHashPut(pInfo->vtbScan.dbVgInfoMap, name->dbname, dbNameLen, &output, POINTER_BYTES);
    QUERY_CHECK_CODE(code, line, _return);
  } else {
    output = *find;
  }

  *dbVgInfo = output->dbVgroup;
  return code;

_return:
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  if (NULL != output) {
    freeUseDbOutput(output);
  }
  return code;
}
2159

2160
int32_t extractColRefName(const char *colref, char **refDb, char** refTb, char** refCol) {
55,704,826✔
2161
  int32_t     code = TSDB_CODE_SUCCESS;
55,704,826✔
2162
  int32_t     line = 0;
55,704,826✔
2163

2164
  const char *first_dot = strchr(colref, '.');
55,704,826✔
2165
  QUERY_CHECK_NULL(first_dot, code, line, _return, terrno)
55,704,826✔
2166

2167
  const char *second_dot = strchr(first_dot + 1, '.');
55,704,826✔
2168
  QUERY_CHECK_NULL(second_dot, code, line, _return, terrno)
55,704,826✔
2169

2170
  size_t db_len = first_dot - colref;
55,704,826✔
2171
  size_t table_len = second_dot - first_dot - 1;
55,704,826✔
2172
  size_t col_len = strlen(second_dot + 1);
55,704,826✔
2173

2174
  *refDb = taosMemoryMalloc(db_len + 1);
55,704,826✔
2175
  *refTb = taosMemoryMalloc(table_len + 1);
55,704,826✔
2176
  *refCol = taosMemoryMalloc(col_len + 1);
55,704,826✔
2177
  QUERY_CHECK_NULL(*refDb, code, line, _return, terrno)
55,704,826✔
2178
  QUERY_CHECK_NULL(*refTb, code, line, _return, terrno)
55,704,826✔
2179
  QUERY_CHECK_NULL(*refCol, code, line, _return, terrno)
55,704,826✔
2180

2181
  tstrncpy(*refDb, colref, db_len + 1);
55,704,826✔
2182
  tstrncpy(*refTb, first_dot + 1, table_len + 1);
55,704,826✔
2183
  tstrncpy(*refCol, second_dot + 1, col_len + 1);
55,704,826✔
2184

2185
  return TSDB_CODE_SUCCESS;
55,704,826✔
2186
_return:
×
2187
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
2188
  if (*refDb) {
×
2189
    taosMemoryFree(*refDb);
×
2190
    *refDb = NULL;
×
2191
  }
2192
  if (*refTb) {
×
2193
    taosMemoryFree(*refTb);
×
2194
    *refTb = NULL;
×
2195
  }
2196
  if (*refCol) {
×
2197
    taosMemoryFree(*refCol);
×
2198
    *refCol = NULL;
×
2199
  }
2200
  return code;
×
2201
}
2202

2203
/**
 * Tell whether a (database, table) pair matches the expected pair.
 *
 * dbName and tbName are read through varDataVal/varDataLen; expectDbName and
 * expectTbName are NUL-terminated C strings. A name matches when its payload
 * equals the expected string over the payload length and both lengths agree.
 *
 * @return true only when both the table name and the database name match.
 */
bool tableInfoNeedCollect(char *dbName, char *tbName, char *expectDbName, char *expectTbName) {
  // Table name: same bytes, then same length.
  if (strncmp(varDataVal(tbName), expectTbName, varDataLen(tbName)) != 0) {
    return false;
  }
  if (strlen(expectTbName) != varDataLen(tbName)) {
    return false;
  }

  // Database name: same bytes, then same length.
  if (strncmp(varDataVal(dbName), expectDbName, varDataLen(dbName)) != 0) {
    return false;
  }
  if (strlen(expectDbName) != varDataLen(dbName)) {
    return false;
  }

  return true;
}
2212

2213
/**
 * Fill one SColRefInfo from row `index` of a column array.
 *
 * Columns are taken by position from pDataBlock: 3 = column name,
 * 4 = uid, 5 = column id, 6 = column reference, 7 = vgroup id, 8 = reference
 * version. Column 8 is only checked for presence here.
 *
 * colName (and colrefName when the reference is not null) are newly
 * allocated NUL-terminated copies of the var-data payload. uid, colId and
 * vgId are written only when the corresponding cell is not null.
 *
 * @return TSDB_CODE_SUCCESS, or terrno when a column is missing or an
 *         allocation fails.
 */
int32_t getColRefInfo(SColRefInfo *pInfo, SArray* pDataBlock, int32_t index) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          line = 0;

  SColumnInfoData *pColNameCol = taosArrayGet(pDataBlock, 3);
  SColumnInfoData *pUidCol = taosArrayGet(pDataBlock, 4);
  SColumnInfoData *pColIdCol = taosArrayGet(pDataBlock, 5);
  SColumnInfoData *pRefCol = taosArrayGet(pDataBlock, 6);
  SColumnInfoData *pVgIdCol = taosArrayGet(pDataBlock, 7);
  SColumnInfoData *pRefVerCol = taosArrayGet(pDataBlock, 8);

  QUERY_CHECK_NULL(pColNameCol, code, line, _return, terrno)
  QUERY_CHECK_NULL(pUidCol, code, line, _return, terrno)
  QUERY_CHECK_NULL(pColIdCol, code, line, _return, terrno)
  QUERY_CHECK_NULL(pRefCol, code, line, _return, terrno)
  QUERY_CHECK_NULL(pVgIdCol, code, line, _return, terrno)
  QUERY_CHECK_NULL(pRefVerCol, code, line, _return, terrno)

  // Column reference: absent for columns that do not point at a source column.
  if (!colDataIsNull_s(pRefCol, index)) {
    char *refCell = colDataGetData(pRefCol, index);
    pInfo->colrefName = taosMemoryCalloc(varDataTLen(refCell), 1);
    QUERY_CHECK_NULL(pInfo->colrefName, code, line, _return, terrno)
    memcpy(pInfo->colrefName, varDataVal(refCell), varDataLen(refCell));
    pInfo->colrefName[varDataLen(refCell)] = 0;
  } else {
    pInfo->colrefName = NULL;
  }

  // Column name is always copied.
  {
    char *nameCell = colDataGetData(pColNameCol, index);
    pInfo->colName = taosMemoryCalloc(varDataTLen(nameCell), 1);
    QUERY_CHECK_NULL(pInfo->colName, code, line, _return, terrno)
    memcpy(pInfo->colName, varDataVal(nameCell), varDataLen(nameCell));
    pInfo->colName[varDataLen(nameCell)] = 0;
  }

  // Numeric fields keep their previous value when the cell is null.
  if (!colDataIsNull_s(pUidCol, index)) {
    GET_TYPED_DATA(pInfo->uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, index), 0);
  }
  if (!colDataIsNull_s(pColIdCol, index)) {
    GET_TYPED_DATA(pInfo->colId, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pColIdCol, index), 0);
  }
  if (!colDataIsNull_s(pVgIdCol, index)) {
    GET_TYPED_DATA(pInfo->vgId, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pVgIdCol, index), 0);
  }

_return:
  return code;
}
2258

2259
/**
 * Compare the vgroups referenced by the current scan (curOrgTbVg) with the
 * ones already known (existOrgTbVg) and request a stream redeploy when new
 * vgroups have appeared.
 *
 * Does nothing when the task has no stream runtime info. On the first call
 * (existOrgTbVg == NULL) the current set simply becomes the known set.
 *
 * When at least one vgroup id in curOrgTbVg is missing from existOrgTbVg, the
 * new ids (plus the node ids of any pending addedVgInfo entries) are published
 * through vtableDeployInfo.addVgIds if that slot is empty, the deploy info is
 * updated, curOrgTbVg is cleared and the scan is flagged for redeploy.
 *
 * @param pVtbScan   virtual table scan control state.
 * @param pTaskInfo  owning task; its pStreamRuntimeInfo is updated.
 * @param rversion   reference version recorded on the scan when redeploy is requested.
 *
 * @return TSDB_CODE_SUCCESS when nothing has to change,
 *         TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY when new vgroups were found,
 *         or terrno on allocation failure.
 */
int32_t processOrgTbVg(SVtbScanDynCtrlInfo* pVtbScan, SExecTaskInfo* pTaskInfo, int32_t rversion) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  void*                      pCurIter = NULL;      // live iterator over curOrgTbVg
  SArray*                    tmpArray = NULL;      // new vgIds, owned here until published
  SArray*                    ownedExpired = NULL;  // addedVgInfo array after we swapped it out

  if (pTaskInfo->pStreamRuntimeInfo == NULL) {
    return code;
  }

  if (pVtbScan->existOrgTbVg == NULL) {
    pVtbScan->existOrgTbVg = pVtbScan->curOrgTbVg;
    pVtbScan->curOrgTbVg = NULL;
  }

  if (pVtbScan->curOrgTbVg == NULL) {
    return code;
  }

  // which means rversion has changed
  while ((pCurIter = taosHashIterate(pVtbScan->curOrgTbVg, pCurIter))) {
    int32_t* vgId = (int32_t*)taosHashGetKey(pCurIter, NULL);
    if (taosHashGet(pVtbScan->existOrgTbVg, vgId, sizeof(int32_t)) == NULL) {
      if (tmpArray == NULL) {
        tmpArray = taosArrayInit(1, sizeof(int32_t));
        QUERY_CHECK_NULL(tmpArray, code, line, _return, terrno)
      }
      QUERY_CHECK_NULL(taosArrayPush(tmpArray, vgId), code, line, _return, terrno)
    }
  }
  if (tmpArray == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  if (pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addVgIds == NULL) {
    SArray* expiredInfo = atomic_load_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo);
    if (expiredInfo && expiredInfo == atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo, expiredInfo, NULL)) {
      // The swap succeeded, so this function is now responsible for the array.
      ownedExpired = expiredInfo;
      for (int32_t i = 0; i < taosArrayGetSize(ownedExpired); i++) {
        SStreamTaskAddr* vgInfo = (SStreamTaskAddr*)taosArrayGet(ownedExpired, i);
        QUERY_CHECK_NULL(taosArrayPush(tmpArray, &vgInfo->nodeId), code, line, _return, terrno)
      }
      taosArrayDestroy(ownedExpired);
      ownedExpired = NULL;
    }
    if (atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addVgIds, NULL, tmpArray)) {
      // Somebody else filled the slot first; our list is not needed.
      taosArrayDestroy(tmpArray);
    }
    tmpArray = NULL;
  } else {
    // The slot is already occupied, so the list cannot be handed over; release
    // it instead of leaking it.
    taosArrayDestroy(tmpArray);
    tmpArray = NULL;
  }
  atomic_store_64(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.uid, (int64_t)(pVtbScan->isSuperTable ? pVtbScan->suid : pVtbScan->uid));
  (void)atomic_val_compare_exchange_8(pTaskInfo->pStreamRuntimeInfo->vtableDeployGot, 0, 1);
  taosHashClear(pVtbScan->curOrgTbVg);
  pVtbScan->needRedeploy = true;
  pVtbScan->rversion = rversion;
  return TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY;

_return:
  // Leaving the iteration loop early: give the iterator back to the hash.
  if (pCurIter) {
    taosHashCancelIterate(pVtbScan->curOrgTbVg, pCurIter);
  }
  if (tmpArray) {
    taosArrayDestroy(tmpArray);
  }
  if (ownedExpired) {
    taosArrayDestroy(ownedExpired);
  }
  qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  return code;
}
2314

2315
/**
 * Resolve the vgroup id of the table named in a "db.table.column" reference.
 *
 * The reference is split with extractColRefName, turned into an SName using
 * the operator's account id, and then looked up through getDbVgInfo and
 * getVgId.
 *
 * @param pOperator  dynamic query control operator (provides vtbScan.acctId).
 * @param colRef     column reference string.
 * @param vgId       [out] receives the vgroup id.
 *
 * @return TSDB_CODE_SUCCESS or the error code of the first failing step.
 */
int32_t getVgIdFromColref(SOperatorInfo* pOperator, const char* colRef, int32_t* vgId) {
  int32_t                    line = 0;
  int32_t                    code = TSDB_CODE_SUCCESS;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SDBVgInfo*                 dbVgInfo = NULL;
  SName                      name = {0};
  char                       dbFname[TSDB_DB_FNAME_LEN] = {0};
  // Pieces of the reference; allocated by extractColRefName, released below.
  char*                      dbPart = NULL;
  char*                      tbPart = NULL;
  char*                      colPart = NULL;

  code = extractColRefName(colRef, &dbPart, &tbPart, &colPart);
  QUERY_CHECK_CODE(code, line, _return);

  toName(pInfo->vtbScan.acctId, dbPart, tbPart, &name);

  code = getDbVgInfo(pOperator, &name, &dbVgInfo);
  QUERY_CHECK_CODE(code, line, _return);

  code = tNameGetFullDbName(&name, dbFname);
  QUERY_CHECK_CODE(code, line, _return);

  code = getVgId(dbVgInfo, dbFname, vgId, name.tname);
  QUERY_CHECK_CODE(code, line, _return);

_return:
  // Shared exit: log on failure, always release the parsed name parts.
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  taosMemoryFree(dbPart);
  taosMemoryFree(tbPart);
  taosMemoryFree(colPart);
  return code;
}
2349

2350
/**
 * Build the tag value list of one row of a tag block and store it in
 * vtbUidTagListMap under `uid`.
 *
 * Every column of pTagVal except the last one (which holds the uid) is turned
 * into one STagVal: variable-length values are copied into a new buffer,
 * fixed-length values are copied into the i64 slot, and null cells produce an
 * entry with pData == NULL and nData == 0.
 *
 * @param vtbUidTagListMap  destination map, uid -> SArray* of STagVal.
 * @param uid               key under which the list is stored.
 * @param pTagVal           tag block to read from.
 * @param rowIdx            row of the block to convert.
 *
 * @return TSDB_CODE_SUCCESS, terrno on allocation/lookup failure, or the
 *         error returned by taosHashPut. On failure the partially built list
 *         is destroyed.
 */
static int32_t generateTagArrayByTagBlockAndSave(SHashObj* vtbUidTagListMap, tb_uid_t uid, SSDataBlock *pTagVal, int32_t rowIdx) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t line = 0;
  STagVal tagVal = {0};

  SArray* pTagList = taosArrayInit(1, sizeof(STagVal));
  QUERY_CHECK_NULL(pTagList, code, line, _return, terrno)

  // The last column is the uid, so it is not converted into a tag value.
  for (int32_t colIdx = 0; colIdx < taosArrayGetSize(pTagVal->pDataBlock) - 1; colIdx++) {
    SColumnInfoData *pTagCol = taosArrayGet(pTagVal->pDataBlock, colIdx);
    QUERY_CHECK_NULL(pTagCol, code, line, _return, terrno)
    tagVal.type = pTagCol->info.type;
    tagVal.cid = pTagCol->info.colId;

    if (colDataIsNull_s(pTagCol, rowIdx)) {
      tagVal.pData = NULL;
      tagVal.nData = 0;
    } else {
      char* pCell = colDataGetData(pTagCol, rowIdx);
      if (IS_VAR_DATA_TYPE(pTagCol->info.type)) {
        tagVal.nData = varDataLen(pCell);
        tagVal.pData = taosMemoryMalloc(tagVal.nData);
        QUERY_CHECK_NULL(tagVal.pData, code, line, _return, terrno)
        memcpy(tagVal.pData, varDataVal(pCell), varDataLen(pCell));
      } else {
        memcpy(&tagVal.i64, pCell, tDataTypes[pTagCol->info.type].bytes);
      }
    }

    // Single hand-over point: after a successful push the list holds the copy.
    QUERY_CHECK_NULL(taosArrayPush(pTagList, &tagVal), code, line, _return, terrno)
    tagVal = (STagVal){0};
  }

  code = taosHashPut(vtbUidTagListMap, &uid, sizeof(uid), &pTagList, POINTER_BYTES);
  QUERY_CHECK_CODE(code, line, _return);

  return code;
_return:
  // tagVal.pData is non-NULL only for a buffer that was not pushed yet.
  if (tagVal.pData) {
    taosMemoryFreeClear(tagVal.pData);
  }
  if (pTagList) {
    taosArrayDestroyEx(pTagList, destroyTagVal);
  }
  qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  return code;
}
2397

2398
int32_t virtualTableScanProcessColRefInfo(SOperatorInfo* pOperator, SArray* pColRefInfo, tb_uid_t* uid, int32_t* vgId,
14,727,569✔
2399
                                          SHashObj** ppRefMap) {
2400
  int32_t                    code = TSDB_CODE_SUCCESS;
14,727,569✔
2401
  int32_t                    line = 0;
14,727,569✔
2402
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
14,727,569✔
2403
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
14,727,569✔
2404
  SDBVgInfo*                 dbVgInfo = NULL;
14,727,569✔
2405
  SHashObj*                  refMap = NULL;
14,727,569✔
2406

2407
  for (int32_t j = 0; j < taosArrayGetSize(pColRefInfo); j++) {
285,855,821✔
2408
    SColRefInfo *pKV = (SColRefInfo*)taosArrayGet(pColRefInfo, j);
271,128,252✔
2409
    *uid = pKV->uid;
271,128,252✔
2410
    *vgId = pKV->vgId;
271,128,252✔
2411
    if (pKV->colrefName != NULL && colNeedScan(pOperator, pKV->colId)) {
271,128,252✔
2412
      char*   refDbName = NULL;
55,678,382✔
2413
      char*   refTbName = NULL;
55,678,382✔
2414
      char*   refColName = NULL;
55,678,382✔
2415
      SName   name = {0};
55,678,382✔
2416
      char    dbFname[TSDB_DB_FNAME_LEN] = {0};
55,678,382✔
2417
      char    orgTbFName[TSDB_TABLE_FNAME_LEN] = {0};
55,678,382✔
2418

2419
      if (ppRefMap != NULL) {
55,678,382✔
2420
        // Track colref -> colId mapping for later slot grouping.
2421
        if (refMap == NULL) {
23,672,821✔
2422
          refMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
3,730,404✔
2423
          QUERY_CHECK_NULL(refMap, code, line, _return, terrno)
3,730,404✔
2424
        }
2425
        code = addRefColIdToRefMap(refMap, pKV->colrefName, pKV->colId);
23,672,821✔
2426
        QUERY_CHECK_CODE(code, line, _return);
23,672,821✔
2427
      }
2428

2429
      // Parse db/tb/col ref and resolve source table vgId.
2430
      code = extractColRefName(pKV->colrefName, &refDbName, &refTbName, &refColName);
55,678,382✔
2431
      QUERY_CHECK_CODE(code, line, _return);
55,678,382✔
2432

2433
      toName(pInfo->vtbScan.acctId, refDbName, refTbName, &name);
55,678,382✔
2434

2435
      code = getDbVgInfo(pOperator, &name, &dbVgInfo);
55,678,382✔
2436
      QUERY_CHECK_CODE(code, line, _return);
55,678,382✔
2437
      code = tNameGetFullDbName(&name, dbFname);
55,678,382✔
2438
      QUERY_CHECK_CODE(code, line, _return);
55,678,382✔
2439
      code = tNameGetFullTableName(&name, orgTbFName);
55,678,382✔
2440
      QUERY_CHECK_CODE(code, line, _return);
55,678,382✔
2441

2442
      void *pVal = taosHashGet(pVtbScan->otbNameToOtbInfoMap, orgTbFName, sizeof(orgTbFName));
55,678,382✔
2443
      if (!pVal) {
55,678,382✔
2444
        SOrgTbInfo orgTbInfo = {0};
19,594,816✔
2445
        code = getVgId(dbVgInfo, dbFname, &orgTbInfo.vgId, name.tname);
19,594,816✔
2446
        QUERY_CHECK_CODE(code, line, _return);
19,594,816✔
2447
        tstrncpy(orgTbInfo.tbName, orgTbFName, sizeof(orgTbInfo.tbName));
19,594,816✔
2448
        orgTbInfo.colMap = taosArrayInit(10, sizeof(SColIdNameKV));
19,594,816✔
2449
        QUERY_CHECK_NULL(orgTbInfo.colMap, code, line, _return, terrno)
19,594,816✔
2450
        SColIdNameKV colIdNameKV = {0};
19,594,816✔
2451
        colIdNameKV.colId = pKV->colId;
19,594,816✔
2452
        tstrncpy(colIdNameKV.colName, refColName, sizeof(colIdNameKV.colName));
19,594,816✔
2453
        QUERY_CHECK_NULL(taosArrayPush(orgTbInfo.colMap, &colIdNameKV), code, line, _return, terrno)
39,189,632✔
2454
        code = taosHashPut(pVtbScan->otbNameToOtbInfoMap, orgTbFName, sizeof(orgTbFName), &orgTbInfo, sizeof(orgTbInfo));
19,594,816✔
2455
        QUERY_CHECK_CODE(code, line, _return);
19,594,816✔
2456
      } else {
2457
        SOrgTbInfo *tbInfo = (SOrgTbInfo *)pVal;
36,083,566✔
2458
        SColIdNameKV colIdNameKV = {0};
36,083,566✔
2459
        colIdNameKV.colId = pKV->colId;
36,083,566✔
2460
        tstrncpy(colIdNameKV.colName, refColName, sizeof(colIdNameKV.colName));
36,083,566✔
2461
        QUERY_CHECK_NULL(taosArrayPush(tbInfo->colMap, &colIdNameKV), code, line, _return, terrno)
72,167,132✔
2462
      }
2463
      taosMemoryFree(refDbName);
55,678,382✔
2464
      taosMemoryFree(refTbName);
55,678,382✔
2465
      taosMemoryFree(refColName);
55,678,382✔
2466
    }
2467
  }
2468

2469
  if (ppRefMap != NULL) {
14,727,569✔
2470
    *ppRefMap = refMap;
5,538,091✔
2471
  }
2472

2473
  return code;
14,727,569✔
2474

2475
_return:
×
2476
  qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
2477
  if (refMap) {
×
2478
    taosHashCleanup(refMap);
×
2479
  }
2480
  return code;
×
2481
}
2482

2483
/**
 * Drain the tag scan operator (pDownstream[1]) and record the tag values of
 * every returned row, keyed by the uid found in the last column of each block.
 *
 * Without partitioning, all tag lists go into pVtbScan->vtbUidTagListMap.
 * With partitioning, each block's groupId gets its own uid -> tag list map,
 * stored in pVtbScan->vtbGroupIdTagListMap, and every uid is also recorded in
 * pVtbScan->vtbUidToGroupIdMap with its groupId.
 *
 * A null uid cell is recorded under uid 0.
 *
 * @param pOperator     dynamic query control operator.
 * @param hasPartition  whether tag blocks are grouped by groupId.
 *
 * @return TSDB_CODE_SUCCESS or the first error encountered. Maps already
 *         attached to pVtbScan stay attached on failure.
 */
static int32_t getTagBlockAndProcess(SOperatorInfo* pOperator, bool hasPartition) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SOperatorInfo*             pTagScanOp = pOperator->pDownstream[1];
  SHashObj*                  pendingGroupMap = NULL;  // per-group map not yet stored in vtbGroupIdTagListMap

  pVtbScan->vtbUidTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  QUERY_CHECK_NULL(pVtbScan->vtbUidTagListMap, code, line, _return, terrno)
  taosHashSetFreeFp(pVtbScan->vtbUidTagListMap, destroyTagList);
  if (hasPartition) {
    pVtbScan->vtbUidToGroupIdMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
    pVtbScan->vtbGroupIdTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
    QUERY_CHECK_NULL(pVtbScan->vtbUidToGroupIdMap, code, line, _return, terrno)
    QUERY_CHECK_NULL(pVtbScan->vtbGroupIdTagListMap, code, line, _return, terrno)
    taosHashSetFreeFp(pVtbScan->vtbGroupIdTagListMap, destroyVtbUidTagListMap);
  }

  while (true) {
    SSDataBlock *pTagVal = NULL;
    code = pTagScanOp->fpSet.getNextFn(pTagScanOp, &pTagVal);
    QUERY_CHECK_CODE(code, line, _return);
    if (pTagVal == NULL) {
      break;
    }

    // Pick the uid -> tag list map this block writes into.
    SHashObj *vtbUidTagListMap = NULL;
    if (hasPartition) {
      void* pIter = taosHashGet(pVtbScan->vtbGroupIdTagListMap, &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId));
      if (pIter) {
        vtbUidTagListMap = *(SHashObj**)pIter;
      } else {
        pendingGroupMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
        QUERY_CHECK_NULL(pendingGroupMap, code, line, _return, terrno)
        taosHashSetFreeFp(pendingGroupMap, destroyTagList);

        code = taosHashPut(pVtbScan->vtbGroupIdTagListMap, &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId), &pendingGroupMap, POINTER_BYTES);
        QUERY_CHECK_CODE(code, line, _return);

        // Stored: from here on it is reachable through vtbGroupIdTagListMap.
        vtbUidTagListMap = pendingGroupMap;
        pendingGroupMap = NULL;
      }
    } else {
      vtbUidTagListMap = pVtbScan->vtbUidTagListMap;
    }

    SColumnInfoData *pUidCol = taosArrayGetLast(pTagVal->pDataBlock);
    QUERY_CHECK_NULL(pUidCol, code, line, _return, terrno)
    for (int32_t i = 0; i < pTagVal->info.rows; i++) {
      tb_uid_t uid = 0;
      if (!colDataIsNull_s(pUidCol, i)) {
        GET_TYPED_DATA(uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, i), 0);
      }

      code = generateTagArrayByTagBlockAndSave(vtbUidTagListMap, uid, pTagVal, i);
      QUERY_CHECK_CODE(code, line, _return);

      if (hasPartition) {
        code = taosHashPut(pVtbScan->vtbUidToGroupIdMap, &uid, sizeof(uid), &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId));
        QUERY_CHECK_CODE(code, line, _return);
      }
    }
  }

  return code;

_return:
  // A per-group map that never made it into vtbGroupIdTagListMap has no other
  // owner; it is still empty at this point.
  if (pendingGroupMap) {
    taosHashCleanup(pendingGroupMap);
  }
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  return code;
}
2553

2554
/**
 * For every child table in pVtbScan->childTableList, group the source tables
 * it references by vgroup and store the result in
 * pVtbScan->vtbUidToVgIdMapMap (uid -> map of vgId -> SArray of SOrgTbInfo).
 *
 * Per child table, virtualTableScanProcessColRefInfo fills
 * pVtbScan->otbNameToOtbInfoMap; each entry of that map is then copied into
 * the array of its vgId and removed from otbNameToOtbInfoMap, so the map is
 * empty again before the next child table is processed.
 *
 * @param pOperator  dynamic query control operator.
 *
 * @return TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t processChildTableListAndGenerateOrgTbInfoMap(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  // Temporaries tracked at function scope so the error path can release them.
  SHashObj*                  otbVgIdToOtbInfoArrayMap = NULL;  // map being built for the current child table
  SArray*                    pNewOrgTbInfoArray = NULL;        // array not yet stored in the map above
  void*                      pOrgTbInfo = NULL;                // live iterator over otbNameToOtbInfoMap

  pVtbScan->vtbUidToVgIdMapMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  QUERY_CHECK_NULL(pVtbScan->vtbUidToVgIdMapMap, code, line, _return, terrno)

  for (int32_t i = 0; i < taosArrayGetSize(pVtbScan->childTableList); i++) {
    otbVgIdToOtbInfoArrayMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
    QUERY_CHECK_NULL(otbVgIdToOtbInfoArrayMap, code, line, _return, terrno)

    SArray* pColRefInfo = (SArray*)taosArrayGetP(pVtbScan->childTableList, i);
    QUERY_CHECK_NULL(pColRefInfo, code, line, _return, terrno)

    tb_uid_t uid = 0;
    int32_t  vgId = 0;
    code = virtualTableScanProcessColRefInfo(pOperator, pColRefInfo, &uid, &vgId, NULL);
    QUERY_CHECK_CODE(code, line, _return);

    size_t len = 0;
    pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, NULL);
    while (pOrgTbInfo != NULL) {
      char*       key = taosHashGetKey(pOrgTbInfo, &len);
      SOrgTbInfo* orgTbInfo = (SOrgTbInfo*)pOrgTbInfo;

      void* pIter = taosHashGet(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId));
      if (!pIter) {
        pNewOrgTbInfoArray = taosArrayInit(1, sizeof(SOrgTbInfo));
        QUERY_CHECK_NULL(pNewOrgTbInfoArray, code, line, _return, terrno)
        QUERY_CHECK_NULL(taosArrayPush(pNewOrgTbInfoArray, orgTbInfo), code, line, _return, terrno)
        code = taosHashPut(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId), &pNewOrgTbInfoArray, POINTER_BYTES);
        QUERY_CHECK_CODE(code, line, _return);
        // Stored in the per-table map; no longer tracked separately.
        pNewOrgTbInfoArray = NULL;
      } else {
        SArray* pOrgTbInfoArray = *(SArray**)pIter;
        QUERY_CHECK_NULL(pOrgTbInfoArray, code, line, _return, terrno)
        QUERY_CHECK_NULL(taosArrayPush(pOrgTbInfoArray, orgTbInfo), code, line, _return, terrno)
      }

      // Advance first, then drop the entry that was just copied.
      pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, pOrgTbInfo);

      code = taosHashRemove(pVtbScan->otbNameToOtbInfoMap, key, len);
      QUERY_CHECK_CODE(code, line, _return);
    }

    code = taosHashPut(pVtbScan->vtbUidToVgIdMapMap, &uid, sizeof(uid), &otbVgIdToOtbInfoArrayMap, POINTER_BYTES);
    QUERY_CHECK_CODE(code, line, _return);
    // Stored in vtbUidToVgIdMapMap; no longer tracked separately.
    otbVgIdToOtbInfoArrayMap = NULL;
  }

  return code;
_return:
  // Leaving the iteration loop early: give the iterator back to the hash.
  if (pOrgTbInfo) {
    taosHashCancelIterate(pVtbScan->otbNameToOtbInfoMap, pOrgTbInfo);
  }
  // The array only holds a shallow copy of an entry that is still present in
  // otbNameToOtbInfoMap, so only the array itself is released.
  if (pNewOrgTbInfoArray) {
    taosArrayDestroy(pNewOrgTbInfoArray);
  }
  // NOTE(review): destroyOtbVgIdToOtbInfoArrayMap is registered elsewhere in
  // this file as a hash free callback for values of this kind, so it is
  // assumed to take a pointer to the stored SHashObj* and release the map and
  // its arrays; confirm against its definition.
  if (otbVgIdToOtbInfoArrayMap) {
    destroyOtbVgIdToOtbInfoArrayMap(&otbVgIdToOtbInfoArrayMap);
  }
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  return code;
}
2613

2614
/**
 * Prepare the per-table scan metadata: first group the referenced source
 * tables of every child table by vgroup, then collect the tag values.
 *
 * @param pOperator     dynamic query control operator.
 * @param hasPartition  forwarded to getTagBlockAndProcess.
 *
 * @return TSDB_CODE_SUCCESS or the error code of the first failing step.
 */
static int32_t buildOrgTbInfoSingle(SOperatorInfo* pOperator, bool hasPartition) {
  int32_t line = 0;
  int32_t code = TSDB_CODE_SUCCESS;

  // Step 1: source table info, grouped by vgroup, for each child table.
  code = processChildTableListAndGenerateOrgTbInfoMap(pOperator);
  QUERY_CHECK_CODE(code, line, _return);

  // Step 2: tag values.
  code = getTagBlockAndProcess(pOperator, hasPartition);
  QUERY_CHECK_CODE(code, line, _return);

  return code;

_return:
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  return code;
}
2630

2631
/*
 * Batch preparation path: for every entry of childTableList, move the entries of
 * otbNameToOtbInfoMap into per-vgId SOrgTbInfo arrays.
 *
 * Without partition all arrays live in pVtbScan->otbVgIdToOtbInfoArrayMap.
 * With partition the tag-scan downstream (pDownstream[1]) is drained first to
 * learn each child uid's group id, and one vgId->array map is kept per group id
 * in pVtbScan->vtbGroupIdToVgIdMapMap.
 *
 * @param pOperator    Dynamic-query control operator.
 * @param hasPartition Whether results are grouped by the tag scan's group id.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
 */
static int32_t buildOrgTbInfoBatch(SOperatorInfo* pOperator, bool hasPartition) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SOperatorInfo*             pTagScanOp = pOperator->pDownstream[1];

  if (hasPartition) {
    pVtbScan->vtbUidToGroupIdMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
    pVtbScan->vtbGroupIdTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
    pVtbScan->vtbGroupIdToVgIdMapMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);

    QUERY_CHECK_NULL(pVtbScan->vtbGroupIdToVgIdMapMap, code, line, _return, terrno)
    QUERY_CHECK_NULL(pVtbScan->vtbUidToGroupIdMap, code, line, _return, terrno)
    QUERY_CHECK_NULL(pVtbScan->vtbGroupIdTagListMap, code, line, _return, terrno)
    taosHashSetFreeFp(pVtbScan->vtbGroupIdToVgIdMapMap, destroyOtbVgIdToOtbInfoArrayMap);
  } else {
    pVtbScan->otbVgIdToOtbInfoArrayMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
    QUERY_CHECK_NULL(pVtbScan->otbVgIdToOtbInfoArrayMap, code, line, _return, terrno)
  }

  // Partitioned only: drain the tag scan, recording uid -> groupId and every groupId seen.
  while (hasPartition) {
    SSDataBlock* pTagVal = NULL;
    code = pTagScanOp->fpSet.getNextFn(pTagScanOp, &pTagVal);
    QUERY_CHECK_CODE(code, line, _return);
    if (pTagVal == NULL) {
      break;
    }

    // the uid is read from the last column of the tag block
    SColumnInfoData *pUidCol = taosArrayGetLast(pTagVal->pDataBlock);
    QUERY_CHECK_NULL(pUidCol, code, line, _return, terrno)
    for (int32_t i = 0; i < pTagVal->info.rows; i++) {
      tb_uid_t uid = 0;  // stays 0 when the uid cell is NULL
      if (!colDataIsNull_s(pUidCol, i)) {
        GET_TYPED_DATA(uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, i), 0);
      }
      code = taosHashPut(pVtbScan->vtbUidToGroupIdMap, &uid, sizeof(uid), &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId));
      QUERY_CHECK_CODE(code, line, _return);
    }
    code = taosHashPut(pVtbScan->vtbGroupIdTagListMap, &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId), NULL, 0);
    QUERY_CHECK_CODE(code, line, _return);
  }

  for (int32_t i = 0; i < taosArrayGetSize(pVtbScan->childTableList); i++) {
    SArray* pColRefInfo = (SArray*)taosArrayGetP(pVtbScan->childTableList, i);
    QUERY_CHECK_NULL(pColRefInfo, code, line, _return, terrno)
    tb_uid_t uid = 0;
    int32_t  vgId = 0;
    code = virtualTableScanProcessColRefInfo(pOperator, pColRefInfo, &uid, &vgId, NULL);
    QUERY_CHECK_CODE(code, line, _return);

    // Pick the vgId -> array map this child table contributes to.
    SHashObj* otbVgIdToOtbInfoArrayMap = NULL;
    if (hasPartition) {
      // NOTE(review): on a miss the error code is whatever terrno holds; confirm taosHashGet sets it.
      uint64_t* groupId = (uint64_t *)taosHashGet(pVtbScan->vtbUidToGroupIdMap, &uid, sizeof(uid));
      QUERY_CHECK_NULL(groupId, code, line, _return, terrno)

      void* pHashIter = taosHashGet(pVtbScan->vtbGroupIdToVgIdMapMap, groupId, sizeof(*groupId));
      if (pHashIter) {
        otbVgIdToOtbInfoArrayMap = *(SHashObj**)pHashIter;
      } else {
        otbVgIdToOtbInfoArrayMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
        QUERY_CHECK_NULL(otbVgIdToOtbInfoArrayMap, code, line, _return, terrno)
        code = taosHashPut(pVtbScan->vtbGroupIdToVgIdMapMap, groupId, sizeof(*groupId), &otbVgIdToOtbInfoArrayMap, POINTER_BYTES);
        if (code != TSDB_CODE_SUCCESS) {
          line = __LINE__;
          // the new map was never registered, so nobody else would release it
          taosHashCleanup(otbVgIdToOtbInfoArrayMap);
          goto _return;
        }
      }
    } else {
      otbVgIdToOtbInfoArrayMap = pVtbScan->otbVgIdToOtbInfoArrayMap;
    }

    // Move every entry of otbNameToOtbInfoMap into the array of its vgId.
    size_t len = 0;
    void*  pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, NULL);
    while (pOrgTbInfo != NULL) {
      char*       key = taosHashGetKey(pOrgTbInfo, &len);
      SOrgTbInfo* orgTbInfo = (SOrgTbInfo*)pOrgTbInfo;
      void* pIter = taosHashGet(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId));
      if (!pIter) {
        SArray* pOrgTbInfoArray = taosArrayInit(1, sizeof(SOrgTbInfo));
        QUERY_CHECK_NULL(pOrgTbInfoArray, code, line, _return, terrno)
        if (taosArrayPush(pOrgTbInfoArray, orgTbInfo) == NULL) {
          code = terrno;
          line = __LINE__;
          // array is not yet stored in the map: release it here
          taosArrayDestroy(pOrgTbInfoArray);
          goto _return;
        }
        code = taosHashPut(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId), &pOrgTbInfoArray, POINTER_BYTES);
        if (code != TSDB_CODE_SUCCESS) {
          line = __LINE__;
          taosArrayDestroy(pOrgTbInfoArray);
          goto _return;
        }
      } else {
        SArray* pOrgTbInfoArray = *(SArray**)pIter;
        QUERY_CHECK_NULL(pOrgTbInfoArray, code, line, _return, terrno)
        QUERY_CHECK_NULL(taosArrayPush(pOrgTbInfoArray, orgTbInfo), code, line, _return, terrno)
      }

      // advance the iterator before removing the entry it currently points at
      pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, pOrgTbInfo);

      code = taosHashRemove(pVtbScan->otbNameToOtbInfoMap, key, len);
      QUERY_CHECK_CODE(code, line, _return);
    }
  }
  return code;
_return:
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  return code;
}
2733

2734
/*
 * Read the system-table scan downstream and collect, per child table of the
 * target virtual super table, the list of its column references.
 *
 * Results are stored in pVtbScan->childTableList (one SArray of SColRefInfo per
 * child table) and pVtbScan->childTableMap (child table name -> index into the
 * list). Afterwards the query-type specific preparation step is run.
 *
 * @param pOperator Dynamic-query control operator.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
 */
int32_t buildVirtualSuperTableScanChildTableMap(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SOperatorInfo*             pSystableScanOp = NULL;
  SColRefInfo                info = {0};
  bool                       infoPending = false;  // true while `info` has not been handed to an array

  pVtbScan->childTableMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  QUERY_CHECK_NULL(pVtbScan->childTableMap, code, line, _return, terrno)

  // The system-table scan is downstream 0 for AGG/INTERVAL and downstream 1 otherwise.
  if (pInfo->qType == DYN_QTYPE_VTB_AGG || pInfo->qType == DYN_QTYPE_VTB_INTERVAL) {
    pVtbScan->otbNameToOtbInfoMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
    QUERY_CHECK_NULL(pVtbScan->otbNameToOtbInfoMap, code, line, _return, terrno)
    pSystableScanOp = pOperator->pDownstream[0];
  } else if (pInfo->qType == DYN_QTYPE_VTB_WINDOW || pInfo->qType == DYN_QTYPE_VTB_TS_SCAN) {
    pVtbScan->otbNameToOtbInfoMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
    QUERY_CHECK_NULL(pVtbScan->otbNameToOtbInfoMap, code, line, _return, terrno)
    pSystableScanOp = pOperator->pDownstream[1];
  } else {
    pSystableScanOp = pOperator->pDownstream[1];
  }

  while (true) {
    SSDataBlock *pChildInfo = NULL;
    code = pSystableScanOp->fpSet.getNextFn(pSystableScanOp, &pChildInfo);
    QUERY_CHECK_CODE(code, line, _return);
    if (pChildInfo == NULL) {
      break;
    }
    // columns 0/1/2 are read as table name, super table name and db name
    SColumnInfoData *pTableNameCol = taosArrayGet(pChildInfo->pDataBlock, 0);
    SColumnInfoData *pStbNameCol = taosArrayGet(pChildInfo->pDataBlock, 1);
    SColumnInfoData *pDbNameCol = taosArrayGet(pChildInfo->pDataBlock, 2);

    QUERY_CHECK_NULL(pTableNameCol, code, line, _return, terrno)
    QUERY_CHECK_NULL(pStbNameCol, code, line, _return, terrno)
    QUERY_CHECK_NULL(pDbNameCol, code, line, _return, terrno)

    for (int32_t i = 0; i < pChildInfo->info.rows; i++) {
      if (!colDataIsNull_s(pStbNameCol, i)) {
        char* stbrawname = colDataGetData(pStbNameCol, i);
        char* dbrawname = colDataGetData(pDbNameCol, i);
        char *ctbName = colDataGetData(pTableNameCol, i);

        if (tableInfoNeedCollect(dbrawname, stbrawname, pInfo->vtbScan.dbName, pInfo->vtbScan.tbName)) {
          memset(&info, 0, sizeof(info));
          code = getColRefInfo(&info, pChildInfo->pDataBlock, i);
          QUERY_CHECK_CODE(code, line, _return);
          // from here on any error exit must release `info` (done at _return)
          infoPending = true;

          if (pInfo->qType == DYN_QTYPE_VTB_SCAN) {
            if (pInfo->vtbScan.dynTbUid != 0 && info.uid != pInfo->vtbScan.dynTbUid) {
              qTrace("dynQueryCtrl tb uid filter, info uid:%" PRIu64 ", dyn tb uid:%" PRIu64, info.uid,
                     pInfo->vtbScan.dynTbUid);
              destroyColRefInfo(&info);
              infoPending = false;
              continue;
            }

            if (pTaskInfo->pStreamRuntimeInfo) {
              if (pVtbScan->curOrgTbVg == NULL) {
                pVtbScan->curOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
                QUERY_CHECK_NULL(pVtbScan->curOrgTbVg, code, line, _return, terrno)
              }

              if (info.colrefName) {
                int32_t vgId;
                code = getVgIdFromColref(pOperator, info.colrefName, &vgId);
                QUERY_CHECK_CODE(code, line, _return);
                // curOrgTbVg is used as a set of vgIds (key only, no value)
                code = taosHashPut(pVtbScan->curOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
                QUERY_CHECK_CODE(code, line, _return);
              }
            }
          }

          // single lookup: NULL means this is the first column reference of the child table
          int32_t* pTableIdx = (int32_t*)taosHashGet(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName));
          if (pTableIdx == NULL) {
            SArray* pColRefArray = taosArrayInit(1, sizeof(SColRefInfo));
            QUERY_CHECK_NULL(pColRefArray, code, line, _return, terrno)
            if (taosArrayPush(pColRefArray, &info) == NULL) {
              code = terrno;
              line = __LINE__;
              taosArrayDestroy(pColRefArray);
              goto _return;
            }
            int32_t tableIdx = (int32_t)taosArrayGetSize(pVtbScan->childTableList);
            if (taosArrayPush(pVtbScan->childTableList, &pColRefArray) == NULL) {
              code = terrno;
              line = __LINE__;
              // array never reached childTableList; `info` is still released at _return
              taosArrayDestroy(pColRefArray);
              goto _return;
            }
            infoPending = false;
            code = taosHashPut(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName), &tableIdx, sizeof(tableIdx));
            QUERY_CHECK_CODE(code, line, _return);
          } else {
            SArray* pColRefArray = (SArray *)taosArrayGetP(pVtbScan->childTableList, *pTableIdx);
            QUERY_CHECK_NULL(pColRefArray, code, line, _return, terrno)
            QUERY_CHECK_NULL(taosArrayPush(pColRefArray, &info), code, line, _return, terrno)
            infoPending = false;
          }
        }
      }
    }
  }

  switch (pInfo->qType) {
    case DYN_QTYPE_VTB_TS_SCAN:
    case DYN_QTYPE_VTB_WINDOW:
    case DYN_QTYPE_VTB_INTERVAL: {
      code = buildOrgTbInfoBatch(pOperator, false);
      break;
    }
    case DYN_QTYPE_VTB_AGG: {
      if (pVtbScan->batchProcessChild) {
        code = buildOrgTbInfoBatch(pOperator, pVtbScan->hasPartition);
      } else {
        code = buildOrgTbInfoSingle(pOperator, pVtbScan->hasPartition);
      }
      break;
    }
    case DYN_QTYPE_VTB_SCAN: {
      code = processOrgTbVg(pVtbScan, pTaskInfo, 1);
      break;
    }
    default: {
      code = TSDB_CODE_PLAN_INVALID_DYN_CTRL_TYPE;
      break;
    }
  }

  QUERY_CHECK_CODE(code, line, _return);

_return:
  if (infoPending) {
    destroyColRefInfo(&info);
  }
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
2861

2862
/*
 * Read the system-table scan downstream (pDownstream[1]) and collect the column
 * references of the target virtual normal/child table into
 * pInfo->vtbScan.colRefInfo, then run processOrgTbVg() with the last non-NULL
 * reference version that was read.
 *
 * @param pOperator Dynamic-query control operator.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
 */
int32_t buildVirtualNormalChildTableScanChildTableMap(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[1];
  int32_t                    rversion = 0;
  SColRefInfo                info = {0};
  bool                       infoPending = false;  // true while `info` has not been handed to colRefInfo

  // NOTE(review): a previously stored colRefInfo array is overwritten here without being released - confirm callers.
  pInfo->vtbScan.colRefInfo = taosArrayInit(1, sizeof(SColRefInfo));
  QUERY_CHECK_NULL(pInfo->vtbScan.colRefInfo, code, line, _return, terrno)

  while (true) {
    SSDataBlock *pTableInfo = NULL;
    code = pSystableScanOp->fpSet.getNextFn(pSystableScanOp, &pTableInfo);
    // a downstream failure must not be overwritten by the processOrgTbVg() result below
    QUERY_CHECK_CODE(code, line, _return);
    if (pTableInfo == NULL) {
      break;
    }

    // columns 0/2/8 are read as table name, db name and reference version
    SColumnInfoData *pTableNameCol = taosArrayGet(pTableInfo->pDataBlock, 0);
    SColumnInfoData *pDbNameCol = taosArrayGet(pTableInfo->pDataBlock, 2);
    SColumnInfoData *pRefVerCol = taosArrayGet(pTableInfo->pDataBlock, 8);

    QUERY_CHECK_NULL(pTableNameCol, code, line, _return, terrno)
    QUERY_CHECK_NULL(pDbNameCol, code, line, _return, terrno)
    QUERY_CHECK_NULL(pRefVerCol, code, line, _return, terrno)

    for (int32_t i = 0; i < pTableInfo->info.rows; i++) {
      // rversion keeps its previous value when the cell is NULL
      if (!colDataIsNull_s(pRefVerCol, i)) {
        GET_TYPED_DATA(rversion, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pRefVerCol, i), 0);
      }

      if (!colDataIsNull_s(pTableNameCol, i)) {
        char* tbrawname = colDataGetData(pTableNameCol, i);
        char* dbrawname = colDataGetData(pDbNameCol, i);
        QUERY_CHECK_NULL(tbrawname, code, line, _return, terrno)
        QUERY_CHECK_NULL(dbrawname, code, line, _return, terrno)

        if (tableInfoNeedCollect(dbrawname, tbrawname, pInfo->vtbScan.dbName, pInfo->vtbScan.tbName)) {
          memset(&info, 0, sizeof(info));
          code = getColRefInfo(&info, pTableInfo->pDataBlock, i);
          QUERY_CHECK_CODE(code, line, _return);
          // from here on any error exit must release `info` (done at _return)
          infoPending = true;

          if ((rversion != pVtbScan->rversion || pVtbScan->existOrgTbVg == NULL) && info.colrefName) {
            if (pVtbScan->curOrgTbVg == NULL) {
              pVtbScan->curOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
              QUERY_CHECK_NULL(pVtbScan->curOrgTbVg, code, line, _return, terrno)
            }
            int32_t vgId;
            code = getVgIdFromColref(pOperator, info.colrefName, &vgId);
            QUERY_CHECK_CODE(code, line, _return);
            // curOrgTbVg is used as a set of vgIds (key only, no value)
            code = taosHashPut(pVtbScan->curOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
            QUERY_CHECK_CODE(code, line, _return);
          }

          QUERY_CHECK_NULL(taosArrayPush(pInfo->vtbScan.colRefInfo, &info), code, line, _return, terrno)
          infoPending = false;
        }
      }
    }
  }
  code = processOrgTbVg(pVtbScan, pTaskInfo, rversion);
  QUERY_CHECK_CODE(code, line, _return);

_return:
  if (infoPending) {
    destroyColRefInfo(&info);
  }
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
2932

2933
/*
 * Consume the stream runtime's addedVgInfo array, if one is available.
 *
 * When the array is successfully taken (pointer swapped to NULL), every node id
 * in it is added to existOrgTbVg and its address is recorded in newAddedVgInfo;
 * needRedeploy is then cleared. When nothing could be taken the function returns
 * TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY.
 *
 * @param pOperator Dynamic-query control operator; pTaskInfo->pStreamRuntimeInfo
 *                  is dereferenced without a NULL check.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
 */
int32_t virtualTableScanCheckNeedRedeploy(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;

  // Only set once the compare-exchange succeeded, i.e. once this call owns the array.
  SArray *ownedArray = NULL;
  SArray *tmpArray = atomic_load_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo);
  if (tmpArray && tmpArray == atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo, tmpArray, NULL)) {
    ownedArray = tmpArray;
    for (int32_t i = 0; i < taosArrayGetSize(ownedArray); i++) {
      SStreamTaskAddr* pTaskAddr = (SStreamTaskAddr*)taosArrayGet(ownedArray, i);
      code = taosHashPut(pVtbScan->existOrgTbVg, &pTaskAddr->nodeId, sizeof(pTaskAddr->nodeId), NULL, 0);
      QUERY_CHECK_CODE(code, line, _return);
      if (pVtbScan->newAddedVgInfo == NULL) {
        pVtbScan->newAddedVgInfo = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
        QUERY_CHECK_NULL(pVtbScan->newAddedVgInfo, code, line, _return, terrno)
      }
      code = taosHashPut(pVtbScan->newAddedVgInfo, &pTaskAddr->nodeId, sizeof(pTaskAddr->nodeId), pTaskAddr, sizeof(SStreamTaskAddr));
      QUERY_CHECK_CODE(code, line, _return);
    }
    pVtbScan->needRedeploy = false;
  } else {
    // Either nothing was published, or the pointer changed between load and exchange.
    // In the latter case the loaded array is not ours and must not be released here.
    code = TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY;
    QUERY_CHECK_CODE(code, line, _return);
  }

_return:
  taosArrayClear(ownedArray);
  taosArrayDestroy(ownedArray);
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
2968

2969
/*
 * Build pVtbScan->vtbScanParam for one child table: the vtable-scan param itself,
 * one exchange param per entry of otbNameToOtbInfoMap (appended to pOpParamArray)
 * and one exchange param for the tag scan (stored in pTagScanOp).
 *
 * For an original table whose vgId is found in newAddedVgInfo, a downstream
 * source built from that address is passed to the exchange param and the entry
 * is removed from newAddedVgInfo.
 *
 * @param pOperator Dynamic-query control operator.
 * @param uid       Uid of the child table to scan.
 * @param vgId      VgId passed to the tag-scan exchange param.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
 */
int32_t virtualTableScanBuildDownStreamOpParam(SOperatorInfo* pOperator, tb_uid_t uid, int32_t vgId) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  void*                      pIter = NULL;          // live hash iterator, non-NULL only inside the loop
  SOperatorParam*            pPendingParam = NULL;  // built exchange param not yet stored anywhere

  pVtbScan->vtbScanParam = NULL;
  code = buildVtbScanOperatorParam(pInfo, &pVtbScan->vtbScanParam, uid);
  QUERY_CHECK_CODE(code, line, _return);

  pIter = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, NULL);
  while (pIter != NULL) {
    SOrgTbInfo*      pMap = (SOrgTbInfo*)pIter;
    SOperatorParam*  pExchangeParam = NULL;
    SStreamTaskAddr* addr = taosHashGet(pVtbScan->newAddedVgInfo, &pMap->vgId, sizeof(pMap->vgId));
    if (addr != NULL) {
      SDownstreamSourceNode newSource = {0};
      newSource.type = QUERY_NODE_DOWNSTREAM_SOURCE;
      newSource.clientId = pTaskInfo->id.taskId;// current task's taskid
      newSource.taskId = addr->taskId;
      newSource.fetchMsgType = TDMT_STREAM_FETCH;
      newSource.localExec = false;
      newSource.addr.nodeId = addr->nodeId;
      memcpy(&newSource.addr.epSet, &addr->epset, sizeof(SEpSet));

      code = buildExchangeOperatorParamForVScan(&pExchangeParam, 0, pMap, &newSource);
      QUERY_CHECK_CODE(code, line, _return);
      pPendingParam = pExchangeParam;
      code = taosHashRemove(pVtbScan->newAddedVgInfo, &pMap->vgId, sizeof(pMap->vgId));
      QUERY_CHECK_CODE(code, line, _return);
    } else {
      code = buildExchangeOperatorParamForVScan(&pExchangeParam, 0, pMap, NULL);
      QUERY_CHECK_CODE(code, line, _return);
      pPendingParam = pExchangeParam;
    }
    QUERY_CHECK_NULL(taosArrayPush(((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pOpParamArray, &pExchangeParam), code, line, _return, terrno)
    // the param array now holds the pointer
    pPendingParam = NULL;
    pIter = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, pIter);
  }

  SOperatorParam*  pTagExchangeParam = NULL;
  code = buildExchangeOperatorParamForVTagScan(&pTagExchangeParam, 0, vgId, uid);
  QUERY_CHECK_CODE(code, line, _return);
  ((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pTagScanOp = pTagExchangeParam;

_return:
  if (pIter != NULL) {
    // left the loop early: release the iterator
    taosHashCancelIterate(pVtbScan->otbNameToOtbInfoMap, pIter);
  }
  if (pPendingParam != NULL) {
    freeOperatorParam(pPendingParam, OP_GET_PARAM);
  }
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
3018

3019
/*
 * Produce the next result block of a virtual table scan.
 *
 * While the current table index equals the last one that produced data, the
 * downstream vtable-scan operator is simply polled again. Otherwise the scan is
 * (re)started for the table at curTableIdx with freshly built parameters. When a
 * table yields no block the index is advanced; the operator is marked completed
 * after the last child table (or immediately for a non-super table).
 *
 * @param pOperator Dynamic-query control operator.
 * @param pRes      Output result data block pointer.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
 */
int32_t virtualTableScanGetNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SVtbScanDynCtrlInfo*       pScan = (SVtbScanDynCtrlInfo*)&pCtrl->vtbScan;
  SOperatorInfo*             pDownOp = pOperator->pDownstream[0];

  // scratch map, created per call and released at _return
  pScan->otbNameToOtbInfoMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  QUERY_CHECK_NULL(pScan->otbNameToOtbInfoMap, code, line, _return, terrno)
  taosHashSetFreeFp(pScan->otbNameToOtbInfoMap, destroySOrgTbInfo);

  for (;;) {
    if (pScan->curTableIdx != pScan->lastTableIdx) {
      // switching to a new table: rebuild the scan parameters from scratch
      taosHashClear(pScan->otbNameToOtbInfoMap);

      SArray* pRefs = pScan->isSuperTable ? (SArray*)taosArrayGetP(pScan->childTableList, pScan->curTableIdx)
                                          : pCtrl->vtbScan.colRefInfo;
      QUERY_CHECK_NULL(pRefs, code, line, _return, terrno)

      tb_uid_t  tbUid = 0;
      int32_t   tbVgId = 0;
      SHashObj* pRefMap = NULL;
      code = virtualTableScanProcessColRefInfo(pOperator, pRefs, &tbUid, &tbVgId, &pRefMap);
      QUERY_CHECK_CODE(code, line, _return);

      qDebug("virtual table scan process subtable idx:%d uid:%" PRIu64 " vgId:%d", pScan->curTableIdx, tbUid, tbVgId);

      code = buildRefSlotGroupsFromRefMap(pRefMap, pScan->readColList, &pScan->refColGroups);
      QUERY_CHECK_CODE(code, line, _return);

      code = virtualTableScanBuildDownStreamOpParam(pOperator, tbUid, tbVgId);
      QUERY_CHECK_CODE(code, line, _return);

      // the downstream operator has to be re-opened with the new parameters
      pDownOp->status = OP_NOT_OPENED;
      code = pDownOp->fpSet.getNextExtFn(pDownOp, pScan->vtbScanParam, pRes);
      QUERY_CHECK_CODE(code, line, _return);
    } else {
      // same table as last time: keep pulling from the running scan
      code = pDownOp->fpSet.getNextFn(pDownOp, pRes);
      QUERY_CHECK_CODE(code, line, _return);
    }

    if (*pRes != NULL) {
      // got data; remember the table so the next call continues with it
      pScan->lastTableIdx = pScan->curTableIdx;
      break;
    }

    // current table is exhausted, move on
    pScan->curTableIdx++;
    if (!pScan->isSuperTable || pScan->curTableIdx >= taosArrayGetSize(pScan->childTableList)) {
      setOperatorCompleted(pOperator);
      break;
    }
  }

_return:
  taosHashCleanup(pScan->otbNameToOtbInfoMap);
  pScan->otbNameToOtbInfoMap = NULL;
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
3091

3092
/*
 * Open the virtual-table scan operator: build the child-table mapping (super
 * table) or the column-reference list (normal/child table) once, and record the
 * open cost the first time.
 *
 * On failure the error is stored in the task info and control leaves through
 * T_LONG_JMP.
 *
 * @param pOperator Dynamic-query control operator.
 *
 * @return TSDB_CODE_SUCCESS when opened or already open.
 */
int32_t vtbScanOpen(SOperatorInfo* pOperator) {
  int32_t                    ret = TSDB_CODE_SUCCESS;
  int32_t                    lino = 0;
  int64_t                    startUs = 0;
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SVtbScanDynCtrlInfo*       pScan = (SVtbScanDynCtrlInfo*)&pCtrl->vtbScan;

  if (OPTR_IS_OPENED(pOperator)) {
    return ret;
  }

  // only time the open while no cost has been recorded yet
  if (pOperator->cost.openCost == 0) {
    startUs = taosGetTimestampUs();
  }

  ret = pScan->isSuperTable ? buildVirtualSuperTableScanChildTableMap(pOperator)
                            : buildVirtualNormalChildTableScanChildTableMap(pOperator);
  QUERY_CHECK_CODE(ret, lino, _finish);

  OPTR_SET_OPENED(pOperator);

_finish:
  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - startUs) / 1000.0;
  }
  if (ret) {
    qError("%s failed since %s, line %d", __func__, tstrerror(ret), lino);
    pOperator->pTaskInfo->code = ret;
    T_LONG_JMP(pOperator->pTaskInfo->env, ret);
  }
  return ret;
}
3128

3129
/*
 * Get next result block for the virtual-table scan workflow.
 *
 * A pending get-param restarts the scan from the first table with the window it
 * carries; without one the window is reset to (INT64_MAX, INT64_MIN). If a
 * redeploy check is pending it is run first, then the operator is opened and the
 * next block is fetched.
 *
 * On failure the error is stored in the task info and control leaves through
 * T_LONG_JMP.
 *
 * @param pOperator Dynamic-query control operator.
 * @param pRes      Output result data block pointer.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
 */
int32_t vtbScanNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SVtbScanDynCtrlInfo*       pScan = (SVtbScanDynCtrlInfo*)&pCtrl->vtbScan;

  QRY_PARAM_CHECK(pRes);

  if (pOperator->pOperatorGetParam == NULL) {
    // finished and nothing asks for a restart
    if (pOperator->status == OP_EXEC_DONE) {
      return code;
    }
    pScan->window.skey = INT64_MAX;
    pScan->window.ekey = INT64_MIN;
  } else {
    if (pOperator->status == OP_EXEC_DONE) {
      pOperator->status = OP_OPENED;
    }
    // restart from the first table with the window carried by the param
    pScan->curTableIdx = 0;
    pScan->lastTableIdx = -1;
    pScan->window = ((SDynQueryCtrlOperatorParam *)(pOperator->pOperatorGetParam)->value)->window;
    freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
    pOperator->pOperatorGetParam = NULL;
  }

  if (pScan->needRedeploy) {
    code = virtualTableScanCheckNeedRedeploy(pOperator);
    QUERY_CHECK_CODE(code, line, _failed);
  }

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, line, _failed);

  // a super table without any child table has nothing to scan
  if (pScan->isSuperTable && taosArrayGetSize(pScan->childTableList) == 0) {
    setOperatorCompleted(pOperator);
    return code;
  }

  code = virtualTableScanGetNext(pOperator, pRes);
  QUERY_CHECK_CODE(code, line, _failed);

  return code;

_failed:
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }
  return code;
}
3179

3180
/*
3181
 * Open dynamic vtable operator and build child-table mapping once.
3182
 *
3183
 * @param pOperator Dynamic-query control operator.
3184
 *
3185
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
3186
 */
3187
static int32_t vtbDefaultOpen(SOperatorInfo* pOperator) {
6,997,792✔
3188
  int32_t                    code = TSDB_CODE_SUCCESS;
6,997,792✔
3189
  int32_t                    line = 0;
6,997,792✔
3190
  int64_t                    st = 0;
6,997,792✔
3191
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
6,997,792✔
3192
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
6,997,792✔
3193

3194
  if (OPTR_IS_OPENED(pOperator)) {
6,997,792✔
3195
    return code;
5,854,026✔
3196
  }
3197

3198
  if (pOperator->cost.openCost == 0) {
1,143,766✔
3199
    st = taosGetTimestampUs();
1,143,766✔
3200
  }
3201

3202
  code = buildVirtualSuperTableScanChildTableMap(pOperator);
1,143,766✔
3203
  QUERY_CHECK_CODE(code, line, _return);
1,143,766✔
3204

3205
  OPTR_SET_OPENED(pOperator);
1,143,766✔
3206

3207
_return:
1,143,766✔
3208
  if (pOperator->cost.openCost == 0) {
1,143,766✔
3209
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
1,143,766✔
3210
  }
3211
  if (code) {
1,143,766✔
3212
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
3213
    pOperator->pTaskInfo->code = code;
×
3214
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
3215
  }
3216
  return code;
1,143,766✔
3217
}
3218

3219
/*
3220
 * Get next result block for virtual-table ts-scan workflow.
3221
 *
3222
 * @param pOperator Dynamic-query control operator.
3223
 * @param pRes Output result data block pointer.
3224
 *
3225
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
3226
 */
3227
int32_t vtbTsScanNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
1,538,970✔
3228
  int32_t                    code = TSDB_CODE_SUCCESS;
1,538,970✔
3229
  int32_t                    line = 0;
1,538,970✔
3230
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
1,538,970✔
3231
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
1,538,970✔
3232
  SOperatorInfo*             pMergeOp = pOperator->pDownstream[0];
1,538,970✔
3233
  SOperatorParam*            pMergeParam = NULL;
1,538,970✔
3234
  SSDataBlock*               pResult = NULL;
1,538,970✔
3235

3236
  QRY_PARAM_CHECK(pRes);
1,538,970✔
3237
  if (pOperator->status == OP_EXEC_DONE) {
1,538,970✔
3238
    return code;
×
3239
  }
3240

3241
  code = pOperator->fpSet._openFn(pOperator);
1,538,970✔
3242
  QUERY_CHECK_CODE(code, line, _return);
1,538,970✔
3243

3244
  if (pVtbScan->genNewParam) {
1,538,970✔
3245
    code = buildMergeOperatorParamForTsScan(pInfo, &pMergeParam);
33,404✔
3246
    QUERY_CHECK_CODE(code, line, _return);
33,404✔
3247

3248
    code = pMergeOp->fpSet.getNextExtFn(pMergeOp, pMergeParam, &pResult);
33,404✔
3249
    QUERY_CHECK_CODE(code, line, _return);
33,404✔
3250
    pVtbScan->genNewParam = false;
33,404✔
3251
  } else {
3252
    code = pMergeOp->fpSet.getNextFn(pMergeOp, &pResult);
1,505,566✔
3253
    QUERY_CHECK_CODE(code, line, _return);
1,505,566✔
3254
  }
3255

3256
  if (pResult) {
1,538,970✔
3257
    *pRes = pResult;
1,505,566✔
3258
  } else {
3259
    *pRes = NULL;
33,404✔
3260
    setOperatorCompleted(pOperator);
33,404✔
3261
  }
3262

3263
_return:
1,538,970✔
3264
  if (code) {
1,538,970✔
3265
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
3266
    pOperator->pTaskInfo->code = code;
×
3267
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
3268
  }
3269
  return code;
1,538,970✔
3270
}
3271

3272
/*
 * Create the hash tables used by the sequential super-table join context.
 *
 * With batchFetch the leftHash/rightHash tables (INT keys) are created;
 * otherwise the leftCache/rightCache/onceTable tables (BIGINT keys) are.
 *
 * @param pPrev Previous-join context that receives the tables.
 * @param batchFetch Selects which set of tables is created.
 *
 * @return TSDB_CODE_SUCCESS, or terrno as soon as one tSimpleHashInit fails.
 *         Tables created before the failing call are left in pPrev.
 */
int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) {
  if (!batchFetch) {
    pPrev->leftCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pPrev->leftCache == NULL) {
      return terrno;
    }

    pPrev->rightCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pPrev->rightCache == NULL) {
      return terrno;
    }

    pPrev->onceTable = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pPrev->onceTable == NULL) {
      return terrno;
    }

    return TSDB_CODE_SUCCESS;
  }

  pPrev->leftHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  if (pPrev->leftHash == NULL) {
    return terrno;
  }

  pPrev->rightHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  if (pPrev->rightHash == NULL) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
3299

3300
/*
 * Refresh pVtbScan->dynTbUid from the stream partition column values.
 *
 * Walks funcInfo.pStreamPartColVals and takes the uid of the first entry that
 * is flagged isTbname and whose uid differs from the current dynTbUid.
 * Does nothing when pStreamRuntimeInfo is NULL.
 */
static void updateDynTbUidIfNeeded(SVtbScanDynCtrlInfo* pVtbScan, SStreamRuntimeInfo* pStreamRuntimeInfo) {
  if (NULL == pStreamRuntimeInfo) {
    return;
  }

  SArray* pPartVals = pStreamRuntimeInfo->funcInfo.pStreamPartColVals;
  int32_t idx = 0;

  while (idx < taosArrayGetSize(pPartVals)) {
    SStreamGroupValue* pEntry = taosArrayGet(pPartVals, idx);
    ++idx;

    if (pEntry == NULL || !pEntry->isTbname || pEntry->uid == pVtbScan->dynTbUid) {
      continue;
    }

    qTrace("dynQueryCtrl dyn tb uid:%" PRIu64 " reset to:%" PRIu64, pVtbScan->dynTbUid, pEntry->uid);

    pVtbScan->dynTbUid = pEntry->uid;
    break;
  }
}
3316

3317
static int32_t initVtbScanInfo(SDynQueryCtrlOperatorInfo* pInfo, SMsgCb* pMsgCb,
3,096,089✔
3318
                               SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) {
3319
  int32_t      code = TSDB_CODE_SUCCESS;
3,096,089✔
3320
  int32_t      line = 0;
3,096,089✔
3321

3322
  code = tsem_init(&pInfo->vtbScan.ready, 0, 0);
3,096,089✔
3323
  QUERY_CHECK_CODE(code, line, _return);
3,096,307✔
3324

3325
  pInfo->vtbScan.genNewParam = true;
3,096,307✔
3326
  pInfo->vtbScan.batchProcessChild = pPhyciNode->vtbScan.batchProcessChild;
3,096,307✔
3327
  pInfo->vtbScan.hasPartition = pPhyciNode->vtbScan.hasPartition;
3,096,307✔
3328
  pInfo->vtbScan.scanAllCols = pPhyciNode->vtbScan.scanAllCols;
3,096,307✔
3329
  pInfo->vtbScan.isSuperTable = pPhyciNode->vtbScan.isSuperTable;
3,096,307✔
3330
  pInfo->vtbScan.rversion = pPhyciNode->vtbScan.rversion;
3,096,307✔
3331
  pInfo->vtbScan.uid = pPhyciNode->vtbScan.uid;
3,096,307✔
3332
  pInfo->vtbScan.suid = pPhyciNode->vtbScan.suid;
3,096,307✔
3333
  pInfo->vtbScan.epSet = pPhyciNode->vtbScan.mgmtEpSet;
3,096,307✔
3334
  pInfo->vtbScan.acctId = pPhyciNode->vtbScan.accountId;
3,096,307✔
3335
  pInfo->vtbScan.needRedeploy = false;
3,096,307✔
3336
  pInfo->vtbScan.pMsgCb = pMsgCb;
3,096,307✔
3337
  pInfo->vtbScan.curTableIdx = 0;
3,096,307✔
3338
  pInfo->vtbScan.lastTableIdx = -1;
3,096,307✔
3339
  pInfo->vtbScan.dynTbUid = 0;
3,096,307✔
3340
  pInfo->vtbScan.dbName = taosStrdup(pPhyciNode->vtbScan.dbName);
3,096,307✔
3341
  pInfo->vtbScan.tbName = taosStrdup(pPhyciNode->vtbScan.tbName);
3,096,307✔
3342
  QUERY_CHECK_NULL(pInfo->vtbScan.dbName, code, line, _return, terrno)
3,096,307✔
3343
  QUERY_CHECK_NULL(pInfo->vtbScan.tbName, code, line, _return, terrno)
3,096,307✔
3344
  pInfo->vtbScan.existOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
3,096,307✔
3345
  QUERY_CHECK_NULL(pInfo->vtbScan.existOrgTbVg, code, line, _return, terrno)
3,096,307✔
3346
  SNode* node = NULL;
3,096,307✔
3347
  FOREACH(node, pPhyciNode->vtbScan.pOrgVgIds) {
11,067,404✔
3348
    SValueNode* valueNode = (SValueNode*)node;
7,971,097✔
3349
    int32_t vgId = (int32_t)valueNode->datum.i;
7,971,097✔
3350
    code = taosHashPut(pInfo->vtbScan.existOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
7,971,097✔
3351
    QUERY_CHECK_CODE(code, line, _return);
7,971,097✔
3352
  }
3353

3354
  if (pPhyciNode->dynTbname && pTaskInfo) {
3,096,307✔
3355
    updateDynTbUidIfNeeded(&pInfo->vtbScan, pTaskInfo->pStreamRuntimeInfo);
×
3356
  }
3357

3358
  pInfo->vtbScan.readColList = taosArrayInit(LIST_LENGTH(pPhyciNode->vtbScan.pScanCols), sizeof(col_id_t));
3,096,307✔
3359
  QUERY_CHECK_NULL(pInfo->vtbScan.readColList, code, line, _return, terrno)
3,096,307✔
3360

3361
  SNode* colNode = NULL;
3,096,307✔
3362
  FOREACH(colNode, pPhyciNode->vtbScan.pScanCols) {
26,774,672✔
3363
    SColumnNode* pNode = (SColumnNode*)colNode;
23,678,365✔
3364
    QUERY_CHECK_NULL(pNode, code, line, _return, terrno)
23,678,365✔
3365
    QUERY_CHECK_NULL(taosArrayPush(pInfo->vtbScan.readColList, &pNode->colId), code, line, _return, terrno)
47,356,730✔
3366
  }
3367

3368
  pInfo->vtbScan.readColSet =
3,096,307✔
3369
      taosHashInit(taosArrayGetSize(pInfo->vtbScan.readColList) > 0 ? taosArrayGetSize(pInfo->vtbScan.readColList) : 1,
3,096,307✔
3370
                   taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), true, HASH_NO_LOCK);
3371
  QUERY_CHECK_NULL(pInfo->vtbScan.readColSet, code, line, _return, terrno)
3,096,307✔
3372
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->vtbScan.readColList); i++) {
26,774,672✔
3373
    col_id_t colId = *(col_id_t*)taosArrayGet(pInfo->vtbScan.readColList, i);
23,678,365✔
3374
    code = taosHashPut(pInfo->vtbScan.readColSet, &colId, sizeof(colId), NULL, 0);
23,678,365✔
3375
    QUERY_CHECK_CODE(code, line, _return);
23,678,365✔
3376
  }
3377

3378
  pInfo->vtbScan.refColGroups = NULL;
3,096,307✔
3379

3380
  pInfo->vtbScan.childTableList = taosArrayInit(10, POINTER_BYTES);
3,096,307✔
3381
  QUERY_CHECK_NULL(pInfo->vtbScan.childTableList, code, line, _return, terrno)
3,096,307✔
3382

3383
  pInfo->vtbScan.dbVgInfoMap = taosHashInit(taosArrayGetSize(pInfo->vtbScan.childTableList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
3,096,307✔
3384
  QUERY_CHECK_NULL(pInfo->vtbScan.dbVgInfoMap, code, line, _return, terrno)
3,096,307✔
3385

3386
  pInfo->vtbScan.otbNameToOtbInfoMap = NULL;
3,096,307✔
3387
  pInfo->vtbScan.otbVgIdToOtbInfoArrayMap = NULL;
3,096,307✔
3388
  pInfo->vtbScan.vtbUidToVgIdMapMap = NULL;
3,096,307✔
3389
  pInfo->vtbScan.vtbGroupIdToVgIdMapMap = NULL;
3,096,307✔
3390
  pInfo->vtbScan.vtbUidTagListMap = NULL;
3,096,307✔
3391
  pInfo->vtbScan.vtbGroupIdTagListMap = NULL;
3,096,307✔
3392
  pInfo->vtbScan.vtbUidToGroupIdMap = NULL;
3,096,307✔
3393

3394
  return code;
3,096,307✔
3395
_return:
×
3396
  // no need to destroy array and hashmap allocated in this function,
3397
  // since the operator's destroy function will take care of it
3398
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
3399
  return code;
×
3400
}
3401

3402
static int32_t initVtbWindowInfo(SDynQueryCtrlOperatorInfo* pInfo, SDynQueryCtrlPhysiNode* pPhyciNode,
513,803✔
3403
                                 SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) {
3404
  int32_t              code = TSDB_CODE_SUCCESS;
513,803✔
3405
  int32_t              line = 0;
513,803✔
3406
  SDataBlockDescNode*  pDescNode = pPhyciNode->node.pOutputDataBlockDesc;
513,803✔
3407

3408
  pInfo->vtbWindow.wstartSlotId = pPhyciNode->vtbWindow.wstartSlotId;
513,803✔
3409
  pInfo->vtbWindow.wendSlotId = pPhyciNode->vtbWindow.wendSlotId;
513,803✔
3410
  pInfo->vtbWindow.wdurationSlotId = pPhyciNode->vtbWindow.wdurationSlotId;
513,803✔
3411
  pInfo->vtbWindow.pTargets = pPhyciNode->vtbWindow.pTargets;
513,803✔
3412
  pInfo->vtbWindow.isVstb = pPhyciNode->vtbWindow.isVstb;
513,803✔
3413
  pInfo->vtbWindow.extendOption = pPhyciNode->vtbWindow.extendOption;
513,803✔
3414

3415
  pInfo->vtbWindow.pRes = createDataBlockFromDescNode(pDescNode);
513,803✔
3416
  QUERY_CHECK_NULL(pInfo->vtbWindow.pRes, code, line, _return, terrno)
513,803✔
3417

3418
  pInfo->vtbWindow.pWins = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
513,803✔
3419
  QUERY_CHECK_NULL(pInfo->vtbWindow.pWins, code, line, _return, terrno)
513,803✔
3420

3421
  pInfo->vtbWindow.outputWstartSlotId = -1;
513,803✔
3422
  pInfo->vtbWindow.outputWendSlotId = -1;
513,803✔
3423
  pInfo->vtbWindow.outputWdurationSlotId = -1;
513,803✔
3424
  pInfo->vtbWindow.curWinBatchIdx = 0;
513,803✔
3425

3426
  initResultSizeInfo(&pOperator->resultInfo, 1);
513,803✔
3427
  code = blockDataEnsureCapacity(pInfo->vtbWindow.pRes, pOperator->resultInfo.capacity);
513,803✔
3428
  QUERY_CHECK_CODE(code, line, _return);
513,803✔
3429

3430
  return code;
513,803✔
3431
_return:
×
3432
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
3433
  return code;
×
3434
}
3435

3436
/*
 * Expose the raw timestamp buffer of one column of a data block.
 *
 * When the block has a column array and its data is loaded, *ppTsCols is set
 * to the pData buffer of column slotId. If the first timestamp is non-zero
 * while the block window is still {0, 0}, the block's ts window is refreshed
 * from that column. Otherwise *ppTsCols is left untouched.
 *
 * @param pBlock   Source data block.
 * @param slotId   Index of the timestamp column in pBlock->pDataBlock.
 * @param ppTsCols Receives the timestamp buffer.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise the failing error code.
 */
static int32_t extractTsCol(SSDataBlock* pBlock, int32_t slotId, TSKEY** ppTsCols) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  // Nothing to expose when the block carries no loaded column data.
  if (pBlock->pDataBlock == NULL || !pBlock->info.dataLoad) {
    return code;
  }

  SColumnInfoData* pTsColumn = taosArrayGet(pBlock->pDataBlock, slotId);
  QUERY_CHECK_NULL(pTsColumn, code, lino, _return, terrno);

  *ppTsCols = (int64_t*)pTsColumn->pData;

  if ((*ppTsCols)[0] != 0) {
    if (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0) {
      code = blockDataUpdateTsWindow(pBlock, slotId);
      QUERY_CHECK_CODE(code, lino, _return);
    }
  }

  return code;

_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  return code;
}
3457

3458
/*
 * Open callback for the vtable-window workflow.
 *
 * Drains downstream 0 and converts every returned block into one batch of
 * SExtWinTimeWindow entries (skey = wstart, ekey = wend + 1, winOutIdx = -1),
 * appended to vtbWindow.pWins. On the first block with all output slots still
 * unresolved, the output slots for wstart/wend/wduration are looked up in
 * pTargets. After draining, the last window's end key is opened to INT64_MAX
 * for the BACKWARD extend option and the first window's start key to
 * INT64_MIN for the FORWARD option. For a virtual super table the child-table
 * map is built as well.
 *
 * @param pOperator Dynamic-query control operator.
 *
 * @return TSDB_CODE_SUCCESS on success. On failure the error is logged,
 *         stored in the task info and raised through T_LONG_JMP.
 */
int32_t vtbWindowOpen(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    lino = 0;
  SDynQueryCtrlOperatorInfo* pDynInfo = pOperator->info;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SVtbWindowDynCtrlInfo*     pInfo = &pDynInfo->vtbWindow;
  int64_t                    st = 0;
  // Batch currently being built. It is owned by this function until it has
  // been pushed into pWins, so the error path must release it.
  SArray*                    pWin = NULL;

  if (OPTR_IS_OPENED(pOperator)) {
    return code;
  }

  if (pOperator->cost.openCost == 0) {
    st = taosGetTimestampUs();
  }

  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      break;
    }

    // Resolve the output slots once, while none of them is known yet.
    if (pInfo->outputWendSlotId == -1 && pInfo->outputWstartSlotId == -1 && pInfo->outputWdurationSlotId == -1) {
      for (int32_t i = 0; i < LIST_LENGTH(pInfo->pTargets); ++i) {
        STargetNode* pNode = (STargetNode*)nodesListGetNode(pInfo->pTargets, i);
        if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode->pExpr)->dataBlockId == pBlock->info.id.blockId) {
          if (((SColumnNode*)pNode->pExpr)->slotId == pDynInfo->vtbWindow.wstartSlotId) {
            pInfo->outputWstartSlotId = i;
          } else if (((SColumnNode*)pNode->pExpr)->slotId == pDynInfo->vtbWindow.wendSlotId) {
            pInfo->outputWendSlotId = i;
          } else if (((SColumnNode*)pNode->pExpr)->slotId == pDynInfo->vtbWindow.wdurationSlotId) {
            pInfo->outputWdurationSlotId = i;
          }
        }
      }
    }

    TSKEY* wstartCol = NULL;
    TSKEY* wendCol = NULL;

    code = extractTsCol(pBlock, pDynInfo->vtbWindow.wstartSlotId, &wstartCol);
    QUERY_CHECK_CODE(code, lino, _return);
    code = extractTsCol(pBlock, pDynInfo->vtbWindow.wendSlotId, &wendCol);
    QUERY_CHECK_CODE(code, lino, _return);

    pWin = taosArrayInit(pBlock->info.rows, sizeof(SExtWinTimeWindow));
    QUERY_CHECK_NULL(pWin, code, lino, _return, terrno)

    QUERY_CHECK_NULL(taosArrayReserve(pWin, pBlock->info.rows), code, lino, _return, terrno);

    for (int32_t i = 0; i < pBlock->info.rows; i++) {
      SExtWinTimeWindow* pWindow = taosArrayGet(pWin, i);
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
      pWindow->tw.skey = wstartCol[i];
      // Stored end key is wend + 1.
      pWindow->tw.ekey = wendCol[i] + 1;
      pWindow->winOutIdx = -1;
    }

    QUERY_CHECK_NULL(taosArrayPush(pDynInfo->vtbWindow.pWins, &pWin), code, lino, _return, terrno);
    // Ownership moved into pWins.
    pWin = NULL;
  }

  // handle first window's start key and last window's end key
  int32_t winBatchNum = (int32_t)taosArrayGetSize(pDynInfo->vtbWindow.pWins);
  if (winBatchNum > 0) {
    SArray* firstBatch = (SArray*)taosArrayGetP(pDynInfo->vtbWindow.pWins, 0);
    SArray* lastBatch = (SArray*)taosArrayGetP(pDynInfo->vtbWindow.pWins, winBatchNum - 1);

    QUERY_CHECK_NULL(firstBatch, code, lino, _return, terrno)
    QUERY_CHECK_NULL(lastBatch, code, lino, _return, terrno)

    SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(firstBatch, 0);
    SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(lastBatch);

    QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
    QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)

    if (pInfo->extendOption == STATE_WIN_EXTEND_OPTION_BACKWARD) {
      lastWin->tw.ekey = INT64_MAX;
    }
    if (pInfo->extendOption == STATE_WIN_EXTEND_OPTION_FORWARD) {
      firstWin->tw.skey = INT64_MIN;
    }
  }

  if (pInfo->isVstb) {
    code = buildVirtualSuperTableScanChildTableMap(pOperator);
    QUERY_CHECK_CODE(code, lino, _return);
  }

  OPTR_SET_OPENED(pOperator);

  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
  }

_return:
  // A batch that never reached pWins would otherwise leak, because the
  // failure path below leaves this frame through T_LONG_JMP.
  if (pWin != NULL) {
    taosArrayDestroy(pWin);
    pWin = NULL;
  }
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
3561

3562
/*
 * Build the GET param handed to the external-window operator for one batch
 * of windows.
 *
 * The param carries a copy of pWins as its value and one child: a batch
 * exchange param (EX_SRC_TYPE_VSTB_WIN_SCAN) whose time range runs from the
 * first window's skey to the last window's ekey.
 *
 * @param pInfo Dynamic-query control operator info (provides
 *              vtbScan.otbVgIdToOtbInfoArrayMap).
 * @param ppRes Receives the new param; set to NULL on failure.
 * @param pWins Windows of the current batch (SExtWinTimeWindow entries).
 * @param idx   Stored as the param's downstreamIdx.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise the failing error code.
 *         On failure everything allocated here is released.
 */
static int32_t buildExternalWindowOperatorParamEx(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins, int32_t idx) {
  int32_t                       code = TSDB_CODE_SUCCESS;
  int32_t                       lino = 0;
  SExternalWindowOperatorParam* pExtWinOp = NULL;

  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)

  pExtWinOp = taosMemoryMalloc(sizeof(SExternalWindowOperatorParam));
  QUERY_CHECK_NULL(pExtWinOp, code, lino, _return, terrno)

  // ExtWins is assigned right after the allocation, so the cleanup below
  // never reads it uninitialised.
  pExtWinOp->ExtWins = taosArrayDup(pWins, NULL);
  QUERY_CHECK_NULL(pExtWinOp->ExtWins, code, lino, _return, terrno)

  SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(pWins, 0);
  SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(pWins);

  QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno);
  QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno);

  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)

  SOperatorParam* pExchangeParam = NULL;
  code = buildBatchExchangeOperatorParamForVirtual(
      &pExchangeParam, 0, NULL, 0, pInfo->vtbScan.otbVgIdToOtbInfoArrayMap,
      (STimeWindow){.skey = firstWin->tw.skey, .ekey = lastWin->tw.ekey}, EX_SRC_TYPE_VSTB_WIN_SCAN,
      QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
  QUERY_CHECK_CODE(code, lino, _return);

  if (NULL == taosArrayPush((*ppRes)->pChildren, &pExchangeParam)) {
    // The exchange param never reached pChildren, so the cleanup loop below
    // cannot see it; release it here to avoid leaking it.
    code = terrno;
    lino = __LINE__;
    freeOperatorParam(pExchangeParam, OP_GET_PARAM);
    goto _return;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW;
  (*ppRes)->downstreamIdx = idx;
  (*ppRes)->value = pExtWinOp;
  (*ppRes)->reUse = false;

  return code;

_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  // pExtWinOp is not yet attached to the param on any failure path, so it is
  // released separately.
  if (pExtWinOp) {
    if (pExtWinOp->ExtWins) {
      taosArrayDestroy(pExtWinOp->ExtWins);
    }
    taosMemoryFree(pExtWinOp);
  }
  if (*ppRes) {
    if ((*ppRes)->pChildren) {
      for (int32_t i = 0; i < taosArrayGetSize((*ppRes)->pChildren); i++) {
        SOperatorParam* pChildParam = taosArrayGetP((*ppRes)->pChildren, i);
        if (pChildParam) {
          freeOperatorParam(pChildParam, OP_GET_PARAM);
        }
      }
      taosArrayDestroy((*ppRes)->pChildren);
    }
    taosMemoryFree(*ppRes);
    *ppRes = NULL;
  }
  return code;
}
3623

3624
int32_t vtbWindowNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
568,754✔
3625
  int32_t                    code = TSDB_CODE_SUCCESS;
568,754✔
3626
  int32_t                    lino = 0;
568,754✔
3627
  SDynQueryCtrlOperatorInfo* pDynInfo = pOperator->info;
568,754✔
3628
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
568,754✔
3629
  int64_t                    st = taosGetTimestampUs();
568,754✔
3630
  int32_t                    numOfWins = 0;
568,754✔
3631
  SOperatorInfo*             mergeOp = NULL;
568,754✔
3632
  SOperatorInfo*             extWinOp = NULL;
568,754✔
3633
  SOperatorParam*            pMergeParam = NULL;
568,754✔
3634
  SOperatorParam*            pExtWinParam = NULL;
568,754✔
3635
  SVtbWindowDynCtrlInfo*     pInfo = &pDynInfo->vtbWindow;
568,754✔
3636
  SSDataBlock*               pRes = pInfo->pRes;
568,754✔
3637

3638
  code = pOperator->fpSet._openFn(pOperator);
568,754✔
3639
  QUERY_CHECK_CODE(code, lino, _return);
568,754✔
3640

3641
  if (pInfo->curWinBatchIdx >= taosArrayGetSize(pInfo->pWins)) {
568,754✔
3642
    *ppRes = NULL;
27,857✔
3643
    return code;
27,857✔
3644
  }
3645

3646
  SArray* pWinArray = (SArray*)taosArrayGetP(pInfo->pWins, pInfo->curWinBatchIdx);
540,897✔
3647
  QUERY_CHECK_NULL(pWinArray, code, lino, _return, terrno)
540,897✔
3648

3649
  numOfWins = (int32_t)taosArrayGetSize(pWinArray);
540,897✔
3650

3651
  if (pInfo->isVstb) {
540,897✔
3652
    extWinOp = pOperator->pDownstream[2];
233,583✔
3653
    code = buildExternalWindowOperatorParamEx(pDynInfo, &pExtWinParam, pWinArray, extWinOp->numOfDownstream);
233,583✔
3654
    QUERY_CHECK_CODE(code, lino, _return);
233,583✔
3655

3656
    SSDataBlock* pExtWinBlock = NULL;
233,583✔
3657
    code = extWinOp->fpSet.getNextExtFn(extWinOp, pExtWinParam, &pExtWinBlock);
233,583✔
3658
    QUERY_CHECK_CODE(code, lino, _return);
233,583✔
3659
    setOperatorCompleted(extWinOp);
233,583✔
3660
    // Free the parameter after operator completes, as it's been saved to the operator
3661
    if (extWinOp->pOperatorGetParam) {
233,583✔
3662
      freeOperatorParam(extWinOp->pOperatorGetParam, OP_GET_PARAM);
×
3663
      extWinOp->pOperatorGetParam = NULL;
×
3664
    }
3665
    // Also free downstream params if any
3666
    if (extWinOp->pDownstreamGetParams) {
233,583✔
3667
      for (int32_t i = 0; i < extWinOp->numOfDownstream; i++) {
467,166✔
3668
        if (extWinOp->pDownstreamGetParams[i]) {
233,583✔
3669
          freeOperatorParam(extWinOp->pDownstreamGetParams[i], OP_GET_PARAM);
×
3670
          extWinOp->pDownstreamGetParams[i] = NULL;
×
3671
        }
3672
      }
3673
    }
3674

3675
    blockDataCleanup(pRes);
233,583✔
3676
    code = blockDataEnsureCapacity(pRes, numOfWins);
233,583✔
3677
    QUERY_CHECK_CODE(code, lino, _return);
233,583✔
3678

3679
    if (pExtWinBlock) {
233,583✔
3680
      code = copyColumnsValue(pInfo->pTargets, pExtWinBlock->info.id.blockId, pRes, pExtWinBlock, numOfWins);
233,583✔
3681
      QUERY_CHECK_CODE(code, lino, _return);
233,583✔
3682

3683
      if (pInfo->curWinBatchIdx == 0) {
233,583✔
3684
        // first batch, bound _wstart by upstream window range
3685
        SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(taosArrayGetP(pInfo->pWins, 0), 0);
213,191✔
3686
        QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
213,191✔
3687

3688
        firstWin->tw.skey = TMAX(firstWin->tw.skey, pExtWinBlock->info.window.skey);
213,191✔
3689
      }
3690
      if (pInfo->curWinBatchIdx == taosArrayGetSize(pInfo->pWins) - 1) {
233,583✔
3691
        // last batch, bound _wend by upstream window range
3692
        SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(taosArrayGetP(pInfo->pWins, taosArrayGetSize(pInfo->pWins) - 1));
29,469✔
3693
        QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)
29,469✔
3694

3695
        lastWin->tw.ekey = TMIN(lastWin->tw.ekey, pExtWinBlock->info.window.ekey + 1);
29,469✔
3696
      }
3697
    }
3698
  } else {
3699
    mergeOp = pOperator->pDownstream[1];
307,314✔
3700
    code = buildMergeOperatorParam(pDynInfo, &pMergeParam, pWinArray, mergeOp->numOfDownstream, numOfWins);
307,314✔
3701
    QUERY_CHECK_CODE(code, lino, _return);
307,314✔
3702

3703
    SSDataBlock* pMergedBlock = NULL;
307,314✔
3704
    code = mergeOp->fpSet.getNextExtFn(mergeOp, pMergeParam, &pMergedBlock);
307,314✔
3705
    QUERY_CHECK_CODE(code, lino, _return);
307,314✔
3706
    // Free the parameter after operator completes, as it's been saved to the operator
3707
    if (mergeOp->pOperatorGetParam) {
307,314✔
3708
      freeOperatorParam(mergeOp->pOperatorGetParam, OP_GET_PARAM);
×
3709
      mergeOp->pOperatorGetParam = NULL;
×
3710
    }
3711
    // Also free downstream params if any
3712
    if (mergeOp->pDownstreamGetParams) {
307,314✔
3713
      for (int32_t i = 0; i < mergeOp->numOfDownstream; i++) {
1,046,331✔
3714
        if (mergeOp->pDownstreamGetParams[i]) {
739,017✔
3715
          freeOperatorParam(mergeOp->pDownstreamGetParams[i], OP_GET_PARAM);
×
3716
          mergeOp->pDownstreamGetParams[i] = NULL;
×
3717
        }
3718
      }
3719
    }
3720

3721
    blockDataCleanup(pRes);
307,314✔
3722
    code = blockDataEnsureCapacity(pRes, numOfWins);
307,314✔
3723
    QUERY_CHECK_CODE(code, lino, _return);
307,314✔
3724

3725
    if (pMergedBlock) {
307,314✔
3726
      code = copyColumnsValue(pInfo->pTargets, pMergedBlock->info.id.blockId, pRes, pMergedBlock, numOfWins);
307,314✔
3727
      QUERY_CHECK_CODE(code, lino, _return);
307,314✔
3728

3729
      if (pInfo->curWinBatchIdx == 0) {
307,314✔
3730
        // first batch, bound _wstart by upstream window range
3731
        SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(taosArrayGetP(pInfo->pWins, 0), 0);
299,997✔
3732
        QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
299,997✔
3733

3734
        firstWin->tw.skey = TMAX(firstWin->tw.skey, pMergedBlock->info.window.skey);
299,997✔
3735
      }
3736
      if (pInfo->curWinBatchIdx == taosArrayGetSize(pInfo->pWins) - 1) {
307,314✔
3737
        // last batch, bound _wend by upstream window range
3738
        SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(taosArrayGetP(pInfo->pWins, taosArrayGetSize(pInfo->pWins) - 1));
7,317✔
3739
        QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)
7,317✔
3740

3741
        lastWin->tw.ekey = TMIN(lastWin->tw.ekey, pMergedBlock->info.window.ekey + 1);
7,317✔
3742
      }
3743
    }
3744
  }
3745

3746

3747
  if (pInfo->outputWstartSlotId != -1) {
540,897✔
3748
    SColumnInfoData* pWstartCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWstartSlotId);
359,403✔
3749
    QUERY_CHECK_NULL(pWstartCol, code, lino, _return, terrno)
359,403✔
3750

3751
    for (int32_t i = 0; i < numOfWins; i++) {
1,036,343,843✔
3752
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
1,035,984,440✔
3753
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
1,035,984,440✔
3754
      code = colDataSetVal(pWstartCol, i, (const char*)&pWindow->tw.skey, false);
1,035,984,440✔
3755
      QUERY_CHECK_CODE(code, lino, _return);
1,035,984,440✔
3756
    }
3757
  }
3758
  if (pInfo->outputWendSlotId != -1) {
540,897✔
3759
    SColumnInfoData* pWendCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWendSlotId);
359,403✔
3760
    QUERY_CHECK_NULL(pWendCol, code, lino, _return, terrno)
359,403✔
3761

3762
    for (int32_t i = 0; i < numOfWins; i++) {
1,036,343,843✔
3763
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
1,035,984,440✔
3764
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
1,035,984,440✔
3765
      TSKEY ekey = pWindow->tw.ekey - 1;
1,035,984,440✔
3766
      code = colDataSetVal(pWendCol, i, (const char*)&ekey, false);
1,035,984,440✔
3767
      QUERY_CHECK_CODE(code, lino, _return);
1,035,984,440✔
3768
    }
3769
  }
3770
  if (pInfo->outputWdurationSlotId != -1) {
540,897✔
3771
    SColumnInfoData* pWdurationCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWdurationSlotId);
217,602✔
3772
    QUERY_CHECK_NULL(pWdurationCol, code, lino, _return, terrno)
217,602✔
3773

3774
    for (int32_t i = 0; i < numOfWins; i++) {
652,560,861✔
3775
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
652,343,259✔
3776
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
652,343,259✔
3777
      int64_t duration = pWindow->tw.ekey - 1 - pWindow->tw.skey;
652,343,259✔
3778
      code = colDataSetVal(pWdurationCol, i, (const char*)&duration, false);
652,343,259✔
3779
      QUERY_CHECK_CODE(code, lino, _return);
652,343,259✔
3780
    }
3781
  }
3782

3783
  pRes->info.rows = numOfWins;
540,897✔
3784
  *ppRes = pRes;
540,897✔
3785
  pInfo->curWinBatchIdx++;
540,897✔
3786

3787
  return code;
540,897✔
3788

3789
_return:
×
3790
  if (code != TSDB_CODE_SUCCESS) {
×
3791
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3792
    pTaskInfo->code = code;
×
3793
    T_LONG_JMP(pTaskInfo->env, code);
×
3794
  }
3795
  return code;
×
3796
}
3797

3798
static int32_t resetDynQueryCtrlOperState(SOperatorInfo* pOper) {
2,259,088✔
3799
  SDynQueryCtrlOperatorInfo*    pDyn = pOper->info;
2,259,088✔
3800
  SDynQueryCtrlPhysiNode const* pPhyciNode = pOper->pPhyNode;
2,259,316✔
3801
  SExecTaskInfo*                pTaskInfo = pOper->pTaskInfo;
2,259,316✔
3802

3803
  pOper->status = OP_NOT_OPENED;
2,259,316✔
3804

3805
  switch (pDyn->qType) {
2,259,316✔
3806
    case DYN_QTYPE_STB_HASH:{
804✔
3807
      pDyn->stbJoin.execInfo = (SDynQueryCtrlExecInfo){0};
804✔
3808
      SStbJoinDynCtrlInfo* pStbJoin = &pDyn->stbJoin;
804✔
3809
      destroyStbJoinDynCtrlInfo(&pDyn->stbJoin);
804✔
3810
      
3811
      int32_t code = initSeqStbJoinTableHash(&pDyn->stbJoin.ctx.prev, pDyn->stbJoin.basic.batchFetch);
804✔
3812
      if (TSDB_CODE_SUCCESS != code) {
804✔
3813
        qError("initSeqStbJoinTableHash failed since %s", tstrerror(code));
×
3814
        return code;
×
3815
      }
3816
      pStbJoin->ctx.prev.pListHead = NULL;
804✔
3817
      pStbJoin->ctx.prev.joinBuild = false;
804✔
3818
      pStbJoin->ctx.prev.pListTail = NULL;
804✔
3819
      pStbJoin->ctx.prev.tableNum = 0;
804✔
3820

3821
      pStbJoin->ctx.post = (SStbJoinPostJoinCtx){0};
804✔
3822
      break; 
804✔
3823
    }
3824
    case DYN_QTYPE_VTB_SCAN:
2,258,512✔
3825
    case DYN_QTYPE_VTB_TS_SCAN:
3826
    case DYN_QTYPE_VTB_AGG:
3827
    case DYN_QTYPE_VTB_INTERVAL: {
3828
      SVtbScanDynCtrlInfo* pVtbScan = &pDyn->vtbScan;
2,258,512✔
3829
      
3830
      if (pVtbScan->otbNameToOtbInfoMap) {
2,258,512✔
3831
        taosHashSetFreeFp(pVtbScan->otbNameToOtbInfoMap, destroySOrgTbInfo);
×
3832
        taosHashCleanup(pVtbScan->otbNameToOtbInfoMap);
×
3833
        pVtbScan->otbNameToOtbInfoMap = NULL;
×
3834
      }
3835
      if (pVtbScan->pRsp) {
2,258,512✔
3836
        tFreeSUsedbRsp(pVtbScan->pRsp);
×
3837
        taosMemoryFreeClear(pVtbScan->pRsp);
×
3838
      }
3839
      if (pVtbScan->colRefInfo) {
2,258,512✔
3840
        taosArrayDestroyEx(pVtbScan->colRefInfo, destroyColRefInfo);
127,302✔
3841
        pVtbScan->colRefInfo = NULL;
127,302✔
3842
      }
3843
      if (pVtbScan->childTableMap) {
2,258,512✔
3844
        taosHashCleanup(pVtbScan->childTableMap);
5,788✔
3845
        pVtbScan->childTableMap = NULL;
5,788✔
3846
      }
3847
      if (pVtbScan->childTableList) {
2,258,512✔
3848
        taosArrayClearEx(pVtbScan->childTableList, destroyColRefArray);
2,258,512✔
3849
      }
3850
      if (pPhyciNode->dynTbname && pTaskInfo) {
2,257,630✔
3851
        updateDynTbUidIfNeeded(pVtbScan, pTaskInfo->pStreamRuntimeInfo);
×
3852
      }
3853
      pVtbScan->curTableIdx = 0;
2,258,076✔
3854
      pVtbScan->lastTableIdx = -1;
2,258,294✔
3855
      break;
2,258,512✔
3856
    }
3857
    case DYN_QTYPE_VTB_WINDOW: {
×
3858
      SVtbWindowDynCtrlInfo* pVtbWindow = &pDyn->vtbWindow;
×
3859
      if (pVtbWindow->pRes) {
×
3860
        blockDataDestroy(pVtbWindow->pRes);
×
3861
        pVtbWindow->pRes = NULL;
×
3862
      }
3863
      if (pVtbWindow->pWins) {
×
3864
        taosArrayDestroyEx(pVtbWindow->pWins, destroyWinArray);
×
3865
        pVtbWindow->pWins = NULL;
×
3866
      }
3867
      pVtbWindow->outputWdurationSlotId = -1;
×
3868
      pVtbWindow->outputWendSlotId = -1;
×
3869
      pVtbWindow->outputWstartSlotId = -1;
×
3870
      pVtbWindow->curWinBatchIdx = 0;
×
3871
      break;
×
3872
    }
3873
    default:
×
3874
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
×
3875
      break;
×
3876
  }
3877
  return 0;
2,258,880✔
3878
}
3879

3880
/*
 * Fetch the next aggregate result block for a virtual-table aggregate query.
 *
 * The last downstream operator is used as the aggregate operator. Depending on
 * `hasPartition` and `batchProcessChild` the function walks the group-id map
 * and/or the uid map, builds an operator param for the current entry and asks
 * the aggregate operator for a block. Entries that yield no block are removed
 * from their map so they are not visited again on the next call.
 *
 * @param pOperator Dynamic-query control operator.
 * @param pRes      Output result block pointer; set to NULL when an entry has no param.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise error code (also logged).
 */
int32_t virtualTableAggGetNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SOperatorInfo*             pAggOp = pOperator->pDownstream[pOperator->numOfDownstream - 1];
  SOperatorParam*            pAggParam = NULL;

  if (pInfo->vtbScan.hasPartition) {
    if (pInfo->vtbScan.batchProcessChild) {
      /* One aggregate request per group id. */
      void* pIter = taosHashIterate(pVtbScan->vtbGroupIdTagListMap, NULL);
      while (pIter) {
        size_t     keyLen = 0;
        uint64_t   groupid = *(uint64_t*)taosHashGetKey(pIter, &keyLen);

        code = buildAggOperatorParamWithGroupId(pInfo, groupid, &pAggParam);
        QUERY_CHECK_CODE(code, line, _return);

        if (pAggParam) {
          code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
          QUERY_CHECK_CODE(code, line, _return);
        } else {
          *pRes = NULL;
        }

        /* Advance before the current group entry is possibly removed below. */
        pIter = taosHashIterate(pVtbScan->vtbGroupIdTagListMap, pIter);

        if (*pRes) {
          (*pRes)->info.id.groupId = groupid;
          code = taosHashRemove(pVtbScan->vtbGroupIdTagListMap, &groupid, keyLen);
          QUERY_CHECK_CODE(code, line, _return);
          break;
        }
      }
    } else {
      /* One aggregate request per child table, children organized by group id. */
      void *pIter = taosHashIterate(pVtbScan->vtbGroupIdTagListMap, NULL);
      while (pIter) {
        size_t     keyLen = 0;
        uint64_t*  groupid = (uint64_t*)taosHashGetKey(pIter, &keyLen);
        SHashObj*  vtbUidTagListMap = *(SHashObj**)pIter;

        void* pIter2 = taosHashIterate(vtbUidTagListMap, NULL);
        while (pIter2) {
          size_t   keyLen2 = 0;
          tb_uid_t uid = *(tb_uid_t*)taosHashGetKey(pIter2, &keyLen2);
          SArray*  pTagList = *(SArray**)pIter2;

          if (pVtbScan->genNewParam) {
            code = buildAggOperatorParamForSingleChild(pInfo, uid, *groupid, pTagList, &pAggParam);
            QUERY_CHECK_CODE(code, line, _return);
            if (pAggParam) {
              code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
              QUERY_CHECK_CODE(code, line, _return);
            } else {
              *pRes = NULL;
            }
          } else {
            /* Keep draining the child whose param was already handed over. */
            code = pAggOp->fpSet.getNextFn(pAggOp, pRes);
            QUERY_CHECK_CODE(code, line, _return);
          }

          if (*pRes) {
            pVtbScan->genNewParam = false;
            (*pRes)->info.id.groupId = *groupid;
            break;
          }
          /* Current child is exhausted: next child needs a fresh param. */
          pVtbScan->genNewParam = true;
          pIter2 = taosHashIterate(vtbUidTagListMap, pIter2);
          /* The uid entry must be removed with its own key length (keyLen2),
           * not with the length of the outer group-id key. */
          code = taosHashRemove(vtbUidTagListMap, &uid, keyLen2);
          QUERY_CHECK_CODE(code, line, _return);
        }
        if (*pRes) {
          break;
        }
        /* All children of this group are exhausted: drop the group entry. */
        pIter = taosHashIterate(pVtbScan->vtbGroupIdTagListMap, pIter);
        code = taosHashRemove(pVtbScan->vtbGroupIdTagListMap, groupid, keyLen);
        QUERY_CHECK_CODE(code, line, _return);
      }
    }

  } else {
    if (pInfo->vtbScan.batchProcessChild) {
      /* Single request covering all children; the operator is done afterwards. */
      code = buildAggOperatorParam(pInfo, &pAggParam);
      QUERY_CHECK_CODE(code, line, _return);

      code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
      QUERY_CHECK_CODE(code, line, _return);
      setOperatorCompleted(pOperator);
    } else {
      /* One aggregate request per child table, no grouping (group id 0). */
      void* pIter = taosHashIterate(pVtbScan->vtbUidTagListMap, NULL);
      while (pIter) {
        size_t   keyLen = 0;
        tb_uid_t uid = *(tb_uid_t*)taosHashGetKey(pIter, &keyLen);
        SArray*  pTagList = *(SArray**)pIter;

        if (pVtbScan->genNewParam) {
          code = buildAggOperatorParamForSingleChild(pInfo, uid, 0, pTagList, &pAggParam);
          QUERY_CHECK_CODE(code, line, _return);

          if (pAggParam) {
            code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
            QUERY_CHECK_CODE(code, line, _return);
          } else {
            *pRes = NULL;
          }
        } else {
          code = pAggOp->fpSet.getNextFn(pAggOp, pRes);
          QUERY_CHECK_CODE(code, line, _return);
        }

        if (*pRes) {
          pVtbScan->genNewParam = false;
          break;
        }
        pVtbScan->genNewParam = true;
        pIter = taosHashIterate(pVtbScan->vtbUidTagListMap, pIter);
        code = taosHashRemove(pVtbScan->vtbUidTagListMap, &uid, keyLen);
        QUERY_CHECK_CODE(code, line, _return);
      }
    }
  }
_return:
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
4008

4009
/*
 * Get next result block for virtual-table aggregate workflow.
 *
 * Opens the operator, finishes immediately when a super table has an empty
 * child table list, and otherwise delegates to virtualTableAggGetNext. On
 * failure the error is stored in the task info and a long jump is taken.
 *
 * @param pOperator Dynamic-query control operator.
 * @param pRes      Output result data block pointer.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
 */
int32_t vtbAggNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SVtbScanDynCtrlInfo*       pScan = &pCtrl->vtbScan;

  QRY_PARAM_CHECK(pRes);
  if (OP_EXEC_DONE == pOperator->status) {
    return code;
  }

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, line, _return);

  /* A super table without any child table has nothing to aggregate. */
  bool noChildTable = pScan->isSuperTable && (0 == taosArrayGetSize(pScan->childTableList));
  if (noChildTable) {
    setOperatorCompleted(pOperator);
    return code;
  }

  code = virtualTableAggGetNext(pOperator, pRes);
  QUERY_CHECK_CODE(code, line, _return);

  return code;

_return:
  if (!code) {
    return code;
  }

  qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  pOperator->pTaskInfo->code = code;
  T_LONG_JMP(pOperator->pTaskInfo->env, code);
  return code;
}
4041

4042
/*
4043
 * Build hash-interval operator params for interval dynamic query.
4044
 *
4045
 * @param pInfo Dynamic-query control operator runtime info.
4046
 * @param ppRes Output interval operator param.
4047
 *
4048
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
4049
 */
4050
static int32_t buildHashIntervalOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes) {
×
4051
  int32_t                   code = TSDB_CODE_SUCCESS;
×
4052
  int32_t                   lino = 0;
×
4053
  SOperatorParam*           pParam = NULL;
×
4054
  SOperatorParam*           pExchangeParam = NULL;
×
4055
  SVtbScanDynCtrlInfo*      pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
×
4056

4057
  pParam = taosMemoryCalloc(1, sizeof(SOperatorParam));
×
4058
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)
×
4059

4060
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL;
×
4061
  pParam->downstreamIdx = 0;
×
4062
  pParam->reUse = false;
×
4063
  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
×
4064
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno)
×
4065

4066
  pParam->value = taosMemoryMalloc(sizeof(SAggOperatorParam));
×
4067
  QUERY_CHECK_NULL(pParam->value, code, lino, _return, terrno)
×
4068

4069
  code = buildBatchExchangeOperatorParamForVirtual(
×
4070
      &pExchangeParam, 0, NULL, 0, pVtbScan->otbVgIdToOtbInfoArrayMap,
4071
      (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN}, EX_SRC_TYPE_VSTB_INTERVAL_SCAN,
×
4072
      QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
4073
  QUERY_CHECK_CODE(code, lino, _return);
×
4074

4075
  QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pExchangeParam), code, lino, _return, terrno)
×
4076
  pExchangeParam = NULL;
×
4077

4078
  *ppRes = pParam;
×
4079

4080
  return code;
×
4081
_return:
×
4082
  if (pExchangeParam) {
×
4083
    freeOperatorParam(pExchangeParam, OP_GET_PARAM);
×
4084
  }
4085
  if (pParam) {
×
4086
    freeOperatorParam(pParam, OP_GET_PARAM);
×
4087
  }
4088
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
4089
  return code;
×
4090
}
4091

4092
/*
4093
 * Build merge operator params for split-interval execution.
4094
 *
4095
 * @param pInfo Dynamic-query control operator runtime info.
4096
 * @param pMergeOp Merge operator used by interval execution.
4097
 * @param ppRes Output merge operator param.
4098
 *
4099
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
4100
 */
4101
static int32_t buildSplitIntervalMergeOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorInfo* pMergeOp,
25,053✔
4102
                                                    SOperatorParam** ppRes) {
4103
  int32_t              code = TSDB_CODE_SUCCESS;
25,053✔
4104
  int32_t              lino = 0;
25,053✔
4105
  SVtbScanDynCtrlInfo* pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
25,053✔
4106
  SOperatorParam*      pExchangeParam = NULL;
25,053✔
4107
  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
25,053✔
4108
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
25,053✔
4109

4110
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE;
25,053✔
4111
  (*ppRes)->downstreamIdx = 0;
25,053✔
4112
  (*ppRes)->reUse = false;
25,053✔
4113
  (*ppRes)->value = taosMemoryCalloc(1, sizeof(SMergeOperatorParam));
25,053✔
4114
  QUERY_CHECK_NULL((*ppRes)->value, code, lino, _return, terrno)
25,053✔
4115

4116
  (*ppRes)->pChildren = taosArrayInit(pMergeOp->numOfDownstream, POINTER_BYTES);
25,053✔
4117
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)
25,053✔
4118

4119
  for (int32_t i = 0; i < pMergeOp->numOfDownstream; ++i) {
75,159✔
4120
    code = buildBatchExchangeOperatorParamForVirtual(
50,106✔
4121
        &pExchangeParam, i, NULL, 0, pVtbScan->otbVgIdToOtbInfoArrayMap,
4122
        (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN}, EX_SRC_TYPE_VSTB_PART_INTERVAL_SCAN,
50,106✔
4123
        QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
4124
    QUERY_CHECK_CODE(code, lino, _return);
50,106✔
4125
    QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pExchangeParam), code, lino, _return, terrno)
100,212✔
4126
    pExchangeParam = NULL;
50,106✔
4127
  }
4128

4129
  return code;
25,053✔
4130

4131
_return:
×
4132
  if (pExchangeParam) {
×
4133
    freeOperatorParam(pExchangeParam, OP_GET_PARAM);
×
4134
  }
4135
  freeOperatorParam(*ppRes, OP_GET_PARAM);
×
4136
  *ppRes = NULL;
×
4137
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
4138
  return code;
×
4139
}
4140

4141
/*
4142
 * Build split-interval root operator params with one merge child.
4143
 *
4144
 * @param pInfo Dynamic-query control operator runtime info.
4145
 * @param pIntervalOp Interval operator whose downstream contains merge node.
4146
 * @param ppRes Output split-interval operator param.
4147
 *
4148
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
4149
 */
4150
static int32_t buildSplitIntervalOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorInfo* pIntervalOp,
25,053✔
4151
                                               SOperatorParam** ppRes) {
4152
  int32_t         code = TSDB_CODE_SUCCESS;
25,053✔
4153
  int32_t         lino = 0;
25,053✔
4154
  SOperatorInfo*  pMergeOp = NULL;
25,053✔
4155
  SOperatorParam* pMergeParam = NULL;
25,053✔
4156

4157
  QUERY_CHECK_NULL(pIntervalOp->pDownstream, code, lino, _return, TSDB_CODE_INVALID_PARA)
25,053✔
4158
  pMergeOp = pIntervalOp->pDownstream[0];
25,053✔
4159
  QUERY_CHECK_NULL(pMergeOp, code, lino, _return, TSDB_CODE_INVALID_PARA)
25,053✔
4160
  if (QUERY_NODE_PHYSICAL_PLAN_MERGE != pMergeOp->operatorType) {
25,053✔
4161
    qError("%s invalid downstream operator type %d for split interval", __func__, pMergeOp->operatorType);
×
4162
    return TSDB_CODE_INVALID_PARA;
×
4163
  }
4164

4165
  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
25,053✔
4166
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
25,053✔
4167

4168
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL;
25,053✔
4169
  (*ppRes)->downstreamIdx = 0;
25,053✔
4170
  (*ppRes)->reUse = false;
25,053✔
4171
  (*ppRes)->value = taosMemoryCalloc(1, sizeof(SAggOperatorParam));
25,053✔
4172
  QUERY_CHECK_NULL((*ppRes)->value, code, lino, _return, terrno)
25,053✔
4173

4174
  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
25,053✔
4175
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)
25,053✔
4176

4177
  code = buildSplitIntervalMergeOperatorParam(pInfo, pMergeOp, &pMergeParam);
25,053✔
4178
  QUERY_CHECK_CODE(code, lino, _return);
25,053✔
4179
  QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pMergeParam), code, lino, _return, terrno)
50,106✔
4180
  pMergeParam = NULL;
25,053✔
4181

4182
  return code;
25,053✔
4183

4184
_return:
×
4185
  if (pMergeParam) {
×
4186
    freeOperatorParam(pMergeParam, OP_GET_PARAM);
×
4187
  }
4188
  freeOperatorParam(*ppRes, OP_GET_PARAM);
×
4189
  *ppRes = NULL;
×
4190
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
4191
  return code;
×
4192
}
4193

4194
/*
4195
 * Dispatch interval-param builder by interval operator physical type.
4196
 *
4197
 * @param pInfo Dynamic-query control operator runtime info.
4198
 * @param pIntervalOp Interval operator to build params for.
4199
 * @param ppRes Output interval operator param.
4200
 *
4201
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
4202
 */
4203
static int32_t buildIntervalOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorInfo* pIntervalOp,
25,053✔
4204
                                          SOperatorParam** ppRes) {
4205
  if (NULL == pIntervalOp) {
25,053✔
4206
    return TSDB_CODE_INVALID_PARA;
×
4207
  }
4208

4209
  switch (pIntervalOp->operatorType) {
25,053✔
4210
    case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL:
×
4211
      return buildHashIntervalOperatorParam(pInfo, ppRes);
×
4212
    case QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL:
25,053✔
4213
      return buildSplitIntervalOperatorParam(pInfo, pIntervalOp, ppRes);
25,053✔
4214
    default:
×
4215
      qError("%s unsupported interval operator type %d", __func__, pIntervalOp->operatorType);
×
4216
      return TSDB_CODE_INVALID_PARA;
×
4217
  }
4218
}
4219

4220
/*
4221
 * Get next result block for virtual-table interval workflow.
4222
 *
4223
 * @param pOperator Dynamic-query control operator.
4224
 * @param pRes Output result data block pointer.
4225
 *
4226
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
4227
 */
4228
int32_t vtbIntervalNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
56,071✔
4229
  int32_t                    code = TSDB_CODE_SUCCESS;
56,071✔
4230
  int32_t                    line = 0;
56,071✔
4231
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
56,071✔
4232
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
56,071✔
4233
  SOperatorInfo*             pInterval = pOperator->pDownstream[pOperator->numOfDownstream - 1];
56,071✔
4234
  SOperatorParam*            pIntervalParam = NULL;
56,071✔
4235
  SSDataBlock*               pResult = NULL;
56,071✔
4236

4237
  QRY_PARAM_CHECK(pRes);
56,071✔
4238
  if (pOperator->status == OP_EXEC_DONE) {
56,071✔
4239
    return code;
×
4240
  }
4241

4242
  code = pOperator->fpSet._openFn(pOperator);
56,071✔
4243
  QUERY_CHECK_CODE(code, line, _return);
56,071✔
4244

4245
  if (pVtbScan->genNewParam) {
56,071✔
4246
    qDebug("%s vtb interval split start, intervalOp:%s type:%d", GET_TASKID(pOperator->pTaskInfo), pInterval->name,
25,053✔
4247
           pInterval->operatorType);
4248
    code = buildIntervalOperatorParam(pInfo, pInterval, &pIntervalParam);
25,053✔
4249
    QUERY_CHECK_CODE(code, line, _return);
25,053✔
4250

4251
    code = pInterval->fpSet.getNextExtFn(pInterval, pIntervalParam, &pResult);
25,053✔
4252
    QUERY_CHECK_CODE(code, line, _return);
25,053✔
4253

4254
    pVtbScan->genNewParam = false;
25,053✔
4255
  } else {
4256
    code = pInterval->fpSet.getNextFn(pInterval, &pResult);
31,018✔
4257
    QUERY_CHECK_CODE(code, line, _return);
31,018✔
4258
  }
4259

4260
  if (pResult) {
56,071✔
4261
    qDebug("%s vtb interval got result rows:%" PRId64 " status:%d", GET_TASKID(pOperator->pTaskInfo),
31,018✔
4262
           pResult->info.rows, pInterval->status);
4263
    *pRes = pResult;
31,018✔
4264
  } else {
4265
    qDebug("%s vtb interval got empty result, interval status:%d", GET_TASKID(pOperator->pTaskInfo), pInterval->status);
25,053✔
4266
    *pRes = NULL;
25,053✔
4267
    setOperatorCompleted(pOperator);
25,053✔
4268
  }
4269

4270
  return code;
56,071✔
4271

4272
_return:
×
4273
  if (code) {
×
4274
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
4275
    pOperator->pTaskInfo->code = code;
×
4276
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
4277
  }
4278
  return code;
×
4279
}
4280

4281
int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
4,303,406✔
4282
                                       SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo,
4283
                                       SMsgCb* pMsgCb, SOperatorInfo** pOptrInfo) {
4284
  QRY_PARAM_CHECK(pOptrInfo);
4,303,406✔
4285

4286
  int32_t                    code = TSDB_CODE_SUCCESS;
4,303,406✔
4287
  int32_t                    line = 0;
4,303,406✔
4288
  __optr_fn_t                nextFp = NULL;
4,303,406✔
4289
  __optr_open_fn_t           openFp = NULL;
4,303,406✔
4290
  SOperatorInfo*             pOperator = NULL;
4,303,406✔
4291
  SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo));
4,303,406✔
4292
  QUERY_CHECK_NULL(pInfo, code, line, _error, terrno)
4,303,406✔
4293

4294
  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
4,303,406✔
4295
  QUERY_CHECK_NULL(pOperator, code, line, _error, terrno)
4,303,406✔
4296

4297
  pOperator->pPhyNode = pPhyciNode;
4,303,406✔
4298
  pTaskInfo->dynamicTask = (int8_t)pPhyciNode->node.dynamicOp;
4,303,406✔
4299

4300
  code = appendDownstream(pOperator, pDownstream, numOfDownstream);
4,303,406✔
4301
  QUERY_CHECK_CODE(code, line, _error);
4,303,406✔
4302

4303
  setOperatorInfo(pOperator, "DynQueryCtrlOperator", QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, false, OP_NOT_OPENED,
4,303,406✔
4304
                  pInfo, pTaskInfo);
4305

4306
  pInfo->qType = pPhyciNode->qType;
4,303,406✔
4307
  switch (pInfo->qType) {
4,303,406✔
4308
    case DYN_QTYPE_STB_HASH:
1,207,099✔
4309
      TAOS_MEMCPY(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
1,207,099✔
4310
      pInfo->stbJoin.pOutputDataBlockDesc = pPhyciNode->node.pOutputDataBlockDesc;
1,207,099✔
4311
      code = initSeqStbJoinTableHash(&pInfo->stbJoin.ctx.prev, pInfo->stbJoin.basic.batchFetch);
1,207,099✔
4312
      QUERY_CHECK_CODE(code, line, _error);
1,207,099✔
4313
      nextFp = seqStableJoin;
1,207,099✔
4314
      openFp = optrDummyOpenFn;
1,207,099✔
4315
      break;
1,207,099✔
4316
    case DYN_QTYPE_VTB_SCAN:
1,438,520✔
4317
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
1,438,520✔
4318
      QUERY_CHECK_CODE(code, line, _error);
1,438,738✔
4319
      nextFp = vtbScanNext;
1,438,738✔
4320
      openFp = vtbScanOpen;
1,438,738✔
4321
      break;
1,438,738✔
4322
    case DYN_QTYPE_VTB_TS_SCAN:
33,404✔
4323
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
33,404✔
4324
      QUERY_CHECK_CODE(code, line, _error);
33,404✔
4325
      nextFp = vtbTsScanNext;
33,404✔
4326
      openFp = vtbDefaultOpen;
33,404✔
4327
      break;
33,404✔
4328
    case DYN_QTYPE_VTB_WINDOW:
513,803✔
4329
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
513,803✔
4330
      QUERY_CHECK_CODE(code, line, _error);
513,803✔
4331
      code = initVtbWindowInfo(pInfo, pPhyciNode, pTaskInfo, pOperator);
513,803✔
4332
      QUERY_CHECK_CODE(code, line, _error);
513,803✔
4333
      nextFp = vtbWindowNext;
513,803✔
4334
      openFp = vtbWindowOpen;
513,803✔
4335
      break;
513,803✔
4336
    case DYN_QTYPE_VTB_AGG:
1,085,309✔
4337
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
1,085,309✔
4338
      QUERY_CHECK_CODE(code, line, _error);
1,085,309✔
4339
      nextFp = vtbAggNext;
1,085,309✔
4340
      openFp = vtbDefaultOpen;
1,085,309✔
4341
      break;
1,085,309✔
4342
    case DYN_QTYPE_VTB_INTERVAL:
25,053✔
4343
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
25,053✔
4344
      QUERY_CHECK_CODE(code, line, _error);
25,053✔
4345
      nextFp = vtbIntervalNext;
25,053✔
4346
      openFp = vtbDefaultOpen;
25,053✔
4347
      break;
25,053✔
4348
    default:
×
4349
      qError("unsupported dynamic query ctrl type: %d", pInfo->qType);
×
4350
      code = TSDB_CODE_INVALID_PARA;
×
4351
      goto _error;
×
4352
  }
4353

4354
  pOperator->fpSet = createOperatorFpSet(openFp, nextFp, NULL, destroyDynQueryCtrlOperator, optrDefaultBufFn,
4,303,406✔
4355
                                         NULL, optrDefaultGetNextExtFn, NULL);
4356

4357
  setOperatorResetStateFn(pOperator, resetDynQueryCtrlOperState);
4,303,406✔
4358
  *pOptrInfo = pOperator;
4,303,406✔
4359
  return TSDB_CODE_SUCCESS;
4,303,406✔
4360

4361
_error:
×
4362
  if (pInfo != NULL) {
×
4363
    destroyDynQueryCtrlOperator(pInfo);
×
4364
  }
4365
  qError("failed to create dyn query ctrl operator, %s code:%s, line:%d", __func__, tstrerror(code), line);
×
4366
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
4367
  pTaskInfo->code = code;
×
4368
  return code;
×
4369
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc