• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5012

03 Apr 2026 03:59PM UTC coverage: 72.305% (+0.005%) from 72.3%
#5012

push

travis-ci

web-flow
merge: from main to 3.0 branch #35067

4053 of 5985 new or added lines in 68 files covered. (67.72%)

13105 existing lines in 173 files now uncovered.

257446 of 356056 relevant lines covered (72.3%)

131779375.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.5
/source/libs/executor/src/dynqueryctrloperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "nodes.h"
18
#include "operator.h"
19
#include "os.h"
20
#include "plannodes.h"
21
#include "query.h"
22
#include "querynodes.h"
23
#include "querytask.h"
24
#include "tarray.h"
25
#include "tcompare.h"
26
#include "tdatablock.h"
27
#include "thash.h"
28
#include "tmsg.h"
29
#include "trpc.h"
30
#include "ttypes.h"
31
#include "tdataformat.h"
32
#include "dynqueryctrl.h"
33

34
int64_t gSessionId = 0;
35

36
void freeVgTableList(void* ptr) { 
×
37
  taosArrayDestroy(*(SArray**)ptr); 
×
38
}
×
39

40
static void destroyStbJoinTableList(SStbJoinTableList* pListHead) {
1,223,955✔
41
  SStbJoinTableList* pNext = NULL;
1,223,955✔
42
  
43
  while (pListHead) {
1,224,486✔
44
    taosMemoryFree(pListHead->pLeftVg);
531✔
45
    taosMemoryFree(pListHead->pLeftUid);
531✔
46
    taosMemoryFree(pListHead->pRightVg);
531✔
47
    taosMemoryFree(pListHead->pRightUid);
531✔
48
    pNext = pListHead->pNext;
531✔
49
    taosMemoryFree(pListHead);
531✔
50
    pListHead = pNext;
531✔
51
  }
52
}
1,223,955✔
53

54
static void destroyStbJoinDynCtrlInfo(SStbJoinDynCtrlInfo* pStbJoin) {
1,223,955✔
55
  qDebug("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64 ", leftCacheNum:%" PRId64 ", rightCacheNum:%" PRId64, 
1,223,955✔
56
         pStbJoin->execInfo.prevBlkNum, pStbJoin->execInfo.prevBlkRows, pStbJoin->execInfo.postBlkNum, 
57
         pStbJoin->execInfo.postBlkRows, pStbJoin->execInfo.leftCacheNum, pStbJoin->execInfo.rightCacheNum);
58

59
  if (pStbJoin->basic.batchFetch) {
1,223,955✔
60
    if (pStbJoin->ctx.prev.leftHash) {
1,222,773✔
61
      tSimpleHashSetFreeFp(pStbJoin->ctx.prev.leftHash, freeVgTableList);
1,146,688✔
62
      tSimpleHashCleanup(pStbJoin->ctx.prev.leftHash);
1,146,688✔
63
    }
64
    if (pStbJoin->ctx.prev.rightHash) {
1,222,773✔
65
      tSimpleHashSetFreeFp(pStbJoin->ctx.prev.rightHash, freeVgTableList);
1,146,688✔
66
      tSimpleHashCleanup(pStbJoin->ctx.prev.rightHash);
1,146,688✔
67
    }
68
  } else {
69
    if (pStbJoin->ctx.prev.leftCache) {
1,182✔
70
      tSimpleHashCleanup(pStbJoin->ctx.prev.leftCache);
1,182✔
71
    }
72
    if (pStbJoin->ctx.prev.rightCache) {
1,182✔
73
      tSimpleHashCleanup(pStbJoin->ctx.prev.rightCache);
1,182✔
74
    }
75
    if (pStbJoin->ctx.prev.onceTable) {
1,182✔
76
      tSimpleHashCleanup(pStbJoin->ctx.prev.onceTable);
1,182✔
77
    }
78
  }
79

80
  destroyStbJoinTableList(pStbJoin->ctx.prev.pListHead);
1,223,955✔
81
}
1,223,955✔
82

83
void destroyColRefInfo(void *info) {
171,925,935✔
84
  SColRefInfo *pColRefInfo = (SColRefInfo *)info;
171,925,935✔
85
  if (pColRefInfo) {
171,925,935✔
86
    taosMemoryFree(pColRefInfo->colName);
171,925,935✔
87
    taosMemoryFree(pColRefInfo->colrefName);
171,925,935✔
88
  }
89
}
171,925,935✔
90

91
void destroyColRefArray(void *info) {
8,628,593✔
92
  SArray *pColRefArray = *(SArray **)info;
8,628,593✔
93
  if (pColRefArray) {
8,628,593✔
94
    taosArrayDestroyEx(pColRefArray, destroyColRefInfo);
8,628,593✔
95
  }
96
}
8,628,593✔
97

98
void freeUseDbOutput(void* pOutput) {
3,397,776✔
99
  SUseDbOutput *pOut = *(SUseDbOutput**)pOutput;
3,397,776✔
100
  if (NULL == pOutput) {
3,397,776✔
101
    return;
×
102
  }
103

104
  if (pOut->dbVgroup) {
3,397,776✔
105
    freeVgInfo(pOut->dbVgroup);
3,397,776✔
106
  }
107
  taosMemFree(pOut);
3,397,776✔
108
}
109

110
void destroyOtbInfoArray(void *info) {
2,882,732✔
111
  SArray *pOtbInfoArray = *(SArray **)info;
2,882,732✔
112
  if (pOtbInfoArray) {
2,882,732✔
113
    taosArrayDestroyEx(pOtbInfoArray, destroySOrgTbInfo);
2,882,732✔
114
  }
115
}
2,882,732✔
116

117
void destroyOtbVgIdToOtbInfoArrayMap(void *info) {
1,227,489✔
118
  SHashObj* pOtbVgIdToOtbInfoArrayMap = *(SHashObj **)info;
1,227,489✔
119
  if (pOtbVgIdToOtbInfoArrayMap) {
1,227,489✔
120
    taosHashSetFreeFp(pOtbVgIdToOtbInfoArrayMap, destroyOtbInfoArray);
1,227,489✔
121
    taosHashCleanup(pOtbVgIdToOtbInfoArrayMap);
1,227,489✔
122
  }
123
}
1,227,489✔
124

125
void destroyTagList(void *info) {
695,060✔
126
  SArray *pTagList = *(SArray **)info;
695,060✔
127
  if (pTagList) {
695,060✔
128
    taosArrayDestroyEx(pTagList, destroyTagVal);
695,060✔
129
  }
130
}
695,060✔
131

132
static void destroyRefColIdGroup(void *info) {
7,683✔
133
  SRefColIdGroup *pGroup = (SRefColIdGroup *)info;
7,683✔
134
  if (pGroup && pGroup->pSlotIdList) {
7,683✔
135
    taosArrayDestroy(pGroup->pSlotIdList);
7,683✔
136
    pGroup->pSlotIdList = NULL;
7,683✔
137
  }
138
}
7,683✔
139

140
void destroyVtbUidTagListMap(void *info) {
255,204✔
141
  SHashObj* pVtbUidTagListMap = *(SHashObj **)info;
255,204✔
142
  if (pVtbUidTagListMap) {
255,204✔
143
    taosHashSetFreeFp(pVtbUidTagListMap, destroyTagList);
255,204✔
144
    taosHashCleanup(pVtbUidTagListMap);
255,204✔
145
  }
146
}
255,204✔
147

148
static void destroyVtbScanDynCtrlInfo(SVtbScanDynCtrlInfo* pVtbScan) {
2,492,061✔
149
  if (pVtbScan->dbName) {
2,492,061✔
150
    taosMemoryFreeClear(pVtbScan->dbName);
2,492,061✔
151
  }
152
  if (pVtbScan->tbName) {
2,492,061✔
153
    taosMemoryFreeClear(pVtbScan->tbName);
2,492,061✔
154
  }
155
  if (pVtbScan->refColGroups) {
2,492,061✔
156
    taosArrayDestroyEx(pVtbScan->refColGroups, destroyRefColIdGroup);
1,283✔
157
    pVtbScan->refColGroups = NULL;
1,283✔
158
  }
159
  if (pVtbScan->childTableList) {
2,492,061✔
160
    taosArrayDestroyEx(pVtbScan->childTableList, destroyColRefArray);
2,492,061✔
161
  }
162
  if (pVtbScan->colRefInfo) {
2,492,061✔
163
    taosArrayDestroyEx(pVtbScan->colRefInfo, destroyColRefInfo);
6,073✔
164
    pVtbScan->colRefInfo = NULL;
6,073✔
165
  }
166
  if (pVtbScan->childTableMap) {
2,492,061✔
167
    taosHashCleanup(pVtbScan->childTableMap);
2,142,212✔
168
  }
169
  if (pVtbScan->readColList) {
2,492,061✔
170
    taosArrayDestroy(pVtbScan->readColList);
2,492,061✔
171
  }
172
  if (pVtbScan->readColSet) {
2,492,061✔
173
    taosHashCleanup(pVtbScan->readColSet);
2,492,061✔
174
  }
175
  if (pVtbScan->dbVgInfoMap) {
2,492,061✔
176
    taosHashSetFreeFp(pVtbScan->dbVgInfoMap, freeUseDbOutput);
2,492,061✔
177
    taosHashCleanup(pVtbScan->dbVgInfoMap);
2,492,061✔
178
  }
179
  if (pVtbScan->otbNameToOtbInfoMap) {
2,492,061✔
180
    taosHashSetFreeFp(pVtbScan->otbNameToOtbInfoMap, destroySOrgTbInfo);
640,917✔
181
    taosHashCleanup(pVtbScan->otbNameToOtbInfoMap);
640,917✔
182
  }
183
  if (pVtbScan->pRsp) {
2,492,061✔
184
    tFreeSUsedbRsp(pVtbScan->pRsp);
×
185
    taosMemoryFreeClear(pVtbScan->pRsp);
×
186
  }
187
  if (pVtbScan->existOrgTbVg) {
2,492,061✔
188
    taosHashCleanup(pVtbScan->existOrgTbVg);
2,492,061✔
189
  }
190
  if (pVtbScan->curOrgTbVg) {
2,492,061✔
191
    taosHashCleanup(pVtbScan->curOrgTbVg);
6,536✔
192
  }
193
  if (pVtbScan->newAddedVgInfo) {
2,492,061✔
194
    taosHashCleanup(pVtbScan->newAddedVgInfo);
936✔
195
  }
196
  if (pVtbScan->otbVgIdToOtbInfoArrayMap) {
2,492,061✔
197
    taosHashSetFreeFp(pVtbScan->otbVgIdToOtbInfoArrayMap, destroyOtbInfoArray);
436,588✔
198
    taosHashCleanup(pVtbScan->otbVgIdToOtbInfoArrayMap);
436,588✔
199
  }
200
  if (pVtbScan->vtbUidToVgIdMapMap) {
2,492,061✔
201
    taosHashSetFreeFp(pVtbScan->vtbUidToVgIdMapMap, destroyOtbVgIdToOtbInfoArrayMap);
79,940✔
202
    taosHashCleanup(pVtbScan->vtbUidToVgIdMapMap);
79,940✔
203
  }
204
  if (pVtbScan->vtbGroupIdToVgIdMapMap) {
2,492,061✔
205
    taosHashSetFreeFp(pVtbScan->vtbGroupIdToVgIdMapMap, destroyOtbVgIdToOtbInfoArrayMap);
123,095✔
206
    taosHashCleanup(pVtbScan->vtbGroupIdToVgIdMapMap);
123,095✔
207
  }
208
  if (pVtbScan->vtbUidTagListMap) {
2,492,061✔
209
    taosHashSetFreeFp(pVtbScan->vtbUidTagListMap, destroyTagList);
79,940✔
210
    taosHashCleanup(pVtbScan->vtbUidTagListMap);
79,940✔
211
  }
212
  if (pVtbScan->vtbGroupIdTagListMap) {
2,492,061✔
213
    taosHashCleanup(pVtbScan->vtbGroupIdTagListMap);
183,143✔
214
  }
215
  if (pVtbScan->vtbUidToGroupIdMap) {
2,492,061✔
216
    taosHashCleanup(pVtbScan->vtbUidToGroupIdMap);
183,143✔
217
  }
218
}
2,492,061✔
219

220
void destroyWinArray(void *info) {
1,779,923✔
221
  SArray *pWinArray = *(SArray **)info;
1,779,923✔
222
  if (pWinArray) {
1,779,923✔
223
    taosArrayDestroy(pWinArray);
1,779,923✔
224
  }
225
}
1,779,923✔
226

227
static void destroyVtbWindowDynCtrlInfo(SVtbWindowDynCtrlInfo* pVtbWindow) {
612,671✔
228
  if (pVtbWindow->pRes) {
612,671✔
229
    blockDataDestroy(pVtbWindow->pRes);
612,671✔
230
  }
231
  if (pVtbWindow->pWins) {
612,671✔
232
    taosArrayDestroyEx(pVtbWindow->pWins, destroyWinArray);
612,671✔
233
  }
234
}
612,671✔
235

236
static void destroyDynQueryCtrlOperator(void* param) {
3,715,185✔
237
  SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param;
3,715,185✔
238

239
  switch (pDyn->qType) {
3,715,185✔
240
    case DYN_QTYPE_STB_HASH:
1,223,124✔
241
      destroyStbJoinDynCtrlInfo(&pDyn->stbJoin);
1,223,124✔
242
      break;
1,223,124✔
243
    case DYN_QTYPE_VTB_WINDOW:
612,671✔
244
      destroyVtbWindowDynCtrlInfo(&pDyn->vtbWindow);
612,671✔
245
      destroyVtbScanDynCtrlInfo(&pDyn->vtbScan);
612,671✔
246
      break;
612,671✔
247
    case DYN_QTYPE_VTB_INTERVAL:
1,879,390✔
248
    case DYN_QTYPE_VTB_AGG:
249
    case DYN_QTYPE_VTB_SCAN:
250
    case DYN_QTYPE_VTB_TS_SCAN:
251
      destroyVtbScanDynCtrlInfo(&pDyn->vtbScan);
1,879,390✔
252
      break;
1,879,390✔
253
    default:
×
254
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
×
255
      break;
×
256
  }
257

258
  taosMemoryFreeClear(param);
3,715,185✔
259
}
3,715,185✔
260

261
static FORCE_INLINE bool tableNeedCache(int64_t uid, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, bool rightTable, bool batchFetch) {
262
  if (batchFetch) {
7,804,648✔
263
    return true;
7,799,920✔
264
  }
265
  
266
  if (rightTable) {
4,728✔
267
    return pPost->rightCurrUid == pPost->rightNextUid;
2,364✔
268
  }
269

270
  uint32_t* num = tSimpleHashGet(pPrev->leftCache, &uid, sizeof(uid));
2,364✔
271

272
  return (NULL == num) ? false : true;
2,364✔
273
}
274

275
static int32_t updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo*          pStbJoin) {
3,902,324✔
276
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
3,902,324✔
277
  SStbJoinPostJoinCtx*       pPost = &pStbJoin->ctx.post;
3,902,324✔
278
  SStbJoinTableList*         pNode = pPrev->pListHead;
3,902,324✔
279
  int32_t*                   leftVgId = pNode->pLeftVg + pNode->readIdx;
3,902,324✔
280
  int32_t*                   rightVgId = pNode->pRightVg + pNode->readIdx;
3,902,324✔
281
  int64_t*                   leftUid = pNode->pLeftUid + pNode->readIdx;
3,902,324✔
282
  int64_t*                   rightUid = pNode->pRightUid + pNode->readIdx;
3,902,324✔
283
  int64_t                    readIdx = pNode->readIdx + 1;
3,902,324✔
284
  int64_t                    rightPrevUid = pPost->rightCurrUid;
3,902,324✔
285

286
  pPost->leftCurrUid = *leftUid;
3,902,324✔
287
  pPost->rightCurrUid = *rightUid;
3,902,324✔
288

289
  pPost->leftVgId = *leftVgId;
3,902,324✔
290
  pPost->rightVgId = *rightVgId;
3,902,324✔
291

292
  while (true) {
293
    if (readIdx < pNode->uidNum) {
3,902,324✔
294
      pPost->rightNextUid = *(pNode->pRightUid + readIdx);
3,825,588✔
295
      break;
3,825,588✔
296
    }
297
    
298
    pNode = pNode->pNext;
76,736✔
299
    if (NULL == pNode) {
76,736✔
300
      pPost->rightNextUid = 0;
76,736✔
301
      break;
76,736✔
302
    }
303
    
304
    rightUid = pNode->pRightUid;
×
305
    readIdx = 0;
×
306
  }
307

308
  pPost->leftNeedCache = tableNeedCache(*leftUid, pPrev, pPost, false, pStbJoin->basic.batchFetch);
7,804,648✔
309
  pPost->rightNeedCache = tableNeedCache(*rightUid, pPrev, pPost, true, pStbJoin->basic.batchFetch);
7,804,648✔
310

311
  if (!pStbJoin->basic.batchFetch && pPost->rightNeedCache && rightPrevUid != pPost->rightCurrUid) {
3,902,324✔
312
    QRY_ERR_RET(tSimpleHashPut(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid), NULL, 0));
×
313
    pStbJoin->execInfo.rightCacheNum++;
×
314
  }  
315

316
  return TSDB_CODE_SUCCESS;
3,902,324✔
317
}
318

319
static int32_t copyOrgTbInfo(SOrgTbInfo* pSrc, SOrgTbInfo** ppDst) {
11,734,132✔
320
  int32_t     code = TSDB_CODE_SUCCESS;
11,734,132✔
321
  int32_t     lino = 0;
11,734,132✔
322
  SOrgTbInfo* pTbInfo = NULL;
11,734,132✔
323

324
  qDebug("start to copy org table info, vgId:%d, tbName:%s", pSrc->vgId, pSrc->tbName);
11,734,132✔
325

326
  pTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
11,734,132✔
327
  QUERY_CHECK_NULL(pTbInfo, code, lino, _return, terrno)
11,734,132✔
328

329
  pTbInfo->vgId = pSrc->vgId;
11,734,132✔
330
  tstrncpy(pTbInfo->tbName, pSrc->tbName, TSDB_TABLE_FNAME_LEN);
11,734,132✔
331

332
  pTbInfo->colMap = taosArrayDup(pSrc->colMap, NULL);
11,734,132✔
333
  QUERY_CHECK_NULL(pTbInfo->colMap, code, lino, _return, terrno)
11,734,132✔
334

335
  *ppDst = pTbInfo;
11,734,132✔
336

337
  return code;
11,734,132✔
338
_return:
×
339
  qError("failed to copy org table info, code:%d, line:%d", code, lino);
×
340
  if (pTbInfo) {
×
341
    if (pTbInfo->colMap) {
×
342
      taosArrayDestroy(pTbInfo->colMap);
×
343
    }
344
    taosMemoryFreeClear(pTbInfo);
×
345
  }
346
  return code;
×
347
}
348

349
static int32_t buildTagListForExchangeBasicParam(SExchangeOperatorBasicParam* pBasic, SArray* pTagList) {
839,748✔
350
  int32_t  code = TSDB_CODE_SUCCESS;
839,748✔
351
  int32_t  lino = 0;
839,748✔
352
  STagVal  tmpTag;
839,748✔
353

354
  pBasic->tagList = taosArrayInit(1, sizeof(STagVal));
839,748✔
355
  QUERY_CHECK_NULL(pBasic->tagList, code, lino, _return, terrno)
839,748✔
356

357
  for (int32_t i = 0; i < taosArrayGetSize(pTagList); ++i) {
5,201,664✔
358
    STagVal* pSrcTag = (STagVal*)taosArrayGet(pTagList, i);
4,361,916✔
359
    QUERY_CHECK_NULL(pSrcTag, code, lino, _return, terrno)
4,361,916✔
360
    tmpTag.type = pSrcTag->type;
4,361,916✔
361
    tmpTag.cid = pSrcTag->cid;
4,361,916✔
362
    if (IS_VAR_DATA_TYPE(pSrcTag->type)) {
4,361,916✔
363
      tmpTag.nData = pSrcTag->nData;
1,893,564✔
364
      tmpTag.pData = taosMemoryMalloc(tmpTag.nData);
1,893,564✔
365
      QUERY_CHECK_NULL(tmpTag.pData, code, lino, _return, terrno)
1,893,564✔
366
      memcpy(tmpTag.pData, pSrcTag->pData, tmpTag.nData);
1,893,564✔
367
    } else {
368
      tmpTag.i64 = pSrcTag->i64;
2,468,352✔
369
    }
370

371
    QUERY_CHECK_NULL(taosArrayPush(pBasic->tagList, &tmpTag), code, lino, _return, terrno)
8,723,832✔
372
    tmpTag = (STagVal){0};
4,361,916✔
373
  }
374

375
  return code;
839,748✔
376
_return:
×
377
  if (pBasic->tagList) {
×
378
    taosArrayDestroyEx(pBasic->tagList, destroyTagVal);
×
379
    pBasic->tagList = NULL;
×
380
  }
381
  if (tmpTag.pData) {
×
382
    taosMemoryFree(tmpTag.pData);
×
383
  }
384
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
×
385
  return code;
×
386
}
387

388
static int32_t buildBatchOrgTbInfoForExchangeBasicParam(SExchangeOperatorBasicParam* pBasic, SArray* pOrgTbInfoArray) {
4,192,782✔
389
  int32_t     code = TSDB_CODE_SUCCESS;
4,192,782✔
390
  int32_t     lino = 0;
4,192,782✔
391
  SOrgTbInfo  batchInfo;
4,192,782✔
392

393
  pBasic->batchOrgTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
4,192,782✔
394
  QUERY_CHECK_NULL(pBasic->batchOrgTbInfo, code, lino, _return, terrno)
4,192,782✔
395

396
  for (int32_t i = 0; i < taosArrayGetSize(pOrgTbInfoArray); ++i) {
12,459,705✔
397
    SOrgTbInfo* pSrc = (SOrgTbInfo*)taosArrayGet(pOrgTbInfoArray, i);
8,266,923✔
398
    QUERY_CHECK_NULL(pSrc, code, lino, _return, terrno)
8,266,923✔
399
    batchInfo.vgId = pSrc->vgId;
8,266,923✔
400
    tstrncpy(batchInfo.tbName, pSrc->tbName, TSDB_TABLE_FNAME_LEN);
8,266,923✔
401
    batchInfo.colMap = taosArrayDup(pSrc->colMap, NULL);
8,266,923✔
402
    QUERY_CHECK_NULL(batchInfo.colMap, code, lino, _return, terrno)
8,266,923✔
403
    QUERY_CHECK_NULL(taosArrayPush(pBasic->batchOrgTbInfo, &batchInfo), code, lino, _return, terrno)
16,533,846✔
404
    batchInfo = (SOrgTbInfo){0};
8,266,923✔
405
  }
406

407
  return code;
4,192,782✔
408
_return:
×
409
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
×
410
  if (pBasic->batchOrgTbInfo) {
×
411
    taosArrayDestroyEx(pBasic->batchOrgTbInfo, destroySOrgTbInfo);
×
412
    pBasic->batchOrgTbInfo = NULL;
×
413
  }
414
  if (batchInfo.colMap) {
×
415
    taosArrayDestroy(batchInfo.colMap);
×
416
    batchInfo.colMap = NULL;
×
417
  }
418
  return code;
×
419
}
420

421
static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, bool needCache, SOperatorParam* pChild) {
7,804,648✔
422
  int32_t code = TSDB_CODE_SUCCESS;
7,804,648✔
423
  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
7,804,648✔
424
  if (NULL == *ppRes) {
7,804,648✔
425
    code = terrno;
×
426
    freeOperatorParam(pChild, OP_GET_PARAM);
×
427
    return code;
×
428
  }
429
  if (pChild) {
7,804,648✔
430
    (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
156,898✔
431
    if (NULL == (*ppRes)->pChildren) {
156,898✔
432
      code = terrno;
×
433
      freeOperatorParam(pChild, OP_GET_PARAM);
×
434
      freeOperatorParam(*ppRes, OP_GET_PARAM);
×
435
      *ppRes = NULL;
×
436
      return code;
×
437
    }
438
    if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild)) {
313,796✔
439
      code = terrno;
×
440
      freeOperatorParam(pChild, OP_GET_PARAM);
×
441
      freeOperatorParam(*ppRes, OP_GET_PARAM);
×
442
      *ppRes = NULL;
×
443
      return code;
×
444
    }
445
  } else {
446
    (*ppRes)->pChildren = NULL;
7,647,750✔
447
  }
448

449
  SGcOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcOperatorParam));
7,804,648✔
450
  if (NULL == pGc) {
7,804,648✔
451
    code = terrno;
×
452
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
453
    *ppRes = NULL;
×
454
    return code;
×
455
  }
456

457
  pGc->sessionId = atomic_add_fetch_64(&gSessionId, 1);
7,804,648✔
458
  pGc->downstreamIdx = downstreamIdx;
7,804,648✔
459
  pGc->vgId = vgId;
7,804,648✔
460
  pGc->tbUid = tbUid;
7,804,648✔
461
  pGc->needCache = needCache;
7,804,648✔
462

463
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
7,804,648✔
464
  (*ppRes)->downstreamIdx = downstreamIdx;
7,804,648✔
465
  (*ppRes)->value = pGc;
7,804,648✔
466
  (*ppRes)->reUse = false;
7,804,648✔
467

468
  return TSDB_CODE_SUCCESS;
7,804,648✔
469
}
470

471

472
static int32_t buildGroupCacheNotifyOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid) {
×
473
  int32_t code = TSDB_CODE_SUCCESS;
×
474
  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
×
475
  if (NULL == *ppRes) {
×
476
    return terrno;
×
477
  }
478
  (*ppRes)->pChildren = NULL;
×
479

480
  SGcNotifyOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcNotifyOperatorParam));
×
481
  if (NULL == pGc) {
×
482
    code = terrno;
×
483
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
×
484
    return code;
×
485
  }
486

487
  pGc->downstreamIdx = downstreamIdx;
×
488
  pGc->vgId = vgId;
×
489
  pGc->tbUid = tbUid;
×
490

491
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
×
492
  (*ppRes)->downstreamIdx = downstreamIdx;
×
493
  (*ppRes)->value = pGc;
×
494
  (*ppRes)->reUse = false;
×
495

496
  return TSDB_CODE_SUCCESS;
×
497
}
498

499
static int32_t buildExchangeOperatorBasicParam(SExchangeOperatorBasicParam* pBasic, ENodeType srcOpType,
22,032,211✔
500
                                               EExchangeSourceType exchangeType, int32_t vgId, uint64_t groupId,
501
                                               SArray* pUidList, SOrgTbInfo* pOrgTbInfo, SArray* pTagList,
502
                                               SArray* pOrgTbInfoArray, STimeWindow window,
503
                                               SDownstreamSourceNode* pDownstreamSourceNode,
504
                                               bool tableSeq, bool isNewParam, bool isNewDeployed) {
505
  int32_t code = TSDB_CODE_SUCCESS;
22,032,211✔
506
  int32_t lino = 0;
22,032,211✔
507

508
  qDebug("buildExchangeOperatorBasicParam, srcOpType:%d, exchangeType:%d, vgId:%d, groupId:%" PRIu64 ", tableSeq:%d, "
22,032,211✔
509
         "isNewParam:%d, isNewDeployed:%d", srcOpType, exchangeType, vgId, groupId, tableSeq, isNewParam, isNewDeployed);
510

511
  pBasic->paramType = DYN_TYPE_EXCHANGE_PARAM;
22,032,211✔
512
  pBasic->srcOpType = srcOpType;
22,032,211✔
513
  pBasic->vgId = vgId;
22,032,211✔
514
  pBasic->groupid = groupId;
22,032,211✔
515
  pBasic->window = window;
22,032,211✔
516
  pBasic->tableSeq = tableSeq;
22,032,211✔
517
  pBasic->type = exchangeType;
22,032,211✔
518
  pBasic->isNewParam = isNewParam;
22,032,211✔
519

520
  if (pDownstreamSourceNode) {
22,032,211✔
521
    pBasic->isNewDeployed = true;
2,574✔
522
    pBasic->newDeployedSrc.type = QUERY_NODE_DOWNSTREAM_SOURCE;
2,574✔
523
    pBasic->newDeployedSrc.clientId = pDownstreamSourceNode->clientId;// current task's taskid
2,574✔
524
    pBasic->newDeployedSrc.taskId = pDownstreamSourceNode->taskId;
2,574✔
525
    pBasic->newDeployedSrc.fetchMsgType = TDMT_STREAM_FETCH;
2,574✔
526
    pBasic->newDeployedSrc.localExec = false;
2,574✔
527
    pBasic->newDeployedSrc.addr.nodeId = pDownstreamSourceNode->addr.nodeId;
2,574✔
528
    memcpy(&pBasic->newDeployedSrc.addr.epSet, &pDownstreamSourceNode->addr.epSet, sizeof(SEpSet));
2,574✔
529
  } else {
530
    pBasic->isNewDeployed = false;
22,029,637✔
531
    pBasic->newDeployedSrc = (SDownstreamSourceNode){0};
22,029,637✔
532
  }
533

534
  if (pUidList) {
22,032,211✔
535
    pBasic->uidList = taosArrayDup(pUidList, NULL);
5,353,251✔
536
    QUERY_CHECK_NULL(pBasic->uidList, code, lino, _return, terrno)
5,353,251✔
537
  } else {
538
    pBasic->uidList = taosArrayInit(1, sizeof(int64_t));
16,678,960✔
539
    QUERY_CHECK_NULL(pBasic->uidList, code, lino, _return, terrno)
16,678,960✔
540
  }
541

542
  if (pOrgTbInfo) {
22,032,211✔
543
    code = copyOrgTbInfo(pOrgTbInfo, &pBasic->orgTbInfo);
11,734,132✔
544
    QUERY_CHECK_CODE(code, lino, _return);
11,734,132✔
545
  } else {
546
    pBasic->orgTbInfo = NULL;
10,298,079✔
547
  }
548

549
  if (pTagList) {
22,032,211✔
550
    code = buildTagListForExchangeBasicParam(pBasic, pTagList);
839,748✔
551
    QUERY_CHECK_CODE(code, lino, _return);
839,748✔
552
  } else {
553
    pBasic->tagList = NULL;
21,192,463✔
554
  }
555

556
  if (pOrgTbInfoArray) {
22,032,211✔
557
    code = buildBatchOrgTbInfoForExchangeBasicParam(pBasic, pOrgTbInfoArray);
4,192,782✔
558
    QUERY_CHECK_CODE(code, lino, _return);
4,192,782✔
559
  } else {
560
    pBasic->batchOrgTbInfo = NULL;
17,839,429✔
561
  }
562
  return code;
22,032,211✔
563

564
_return:
×
565
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
×
566
  freeExchangeGetBasicOperatorParam(pBasic);
×
567
  return code;
×
568
}
569

570
static int32_t buildExchangeOperatorParamImpl(SOperatorParam** ppRes, int32_t downstreamIdx, ENodeType srcOpType,
17,591,018✔
571
                                              EExchangeSourceType exchangeType, int32_t vgId, uint64_t groupId,
572
                                              SArray* pUidList, SOrgTbInfo* pOrgTbInfo, SArray* pTagList,
573
                                              SArray* pOrgTbInfoArray, STimeWindow window,
574
                                              SDownstreamSourceNode* pDownstreamSourceNode,
575
                                              bool tableSeq, bool isNewParam, bool reUse, bool isNewDeployed) {
576

577
  int32_t                      code = TSDB_CODE_SUCCESS;
17,591,018✔
578
  int32_t                      lino = 0;
17,591,018✔
579
  SOperatorParam*              pParam = NULL;
17,591,018✔
580
  SExchangeOperatorParam*      pExc = NULL;
17,591,018✔
581

582
  *ppRes = NULL;
17,591,018✔
583

584
  pParam = taosMemoryCalloc(1, sizeof(SOperatorParam));
17,591,018✔
585
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)
17,591,018✔
586

587
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
17,591,018✔
588
  pParam->downstreamIdx = downstreamIdx;
17,591,018✔
589
  pParam->reUse = reUse;
17,591,018✔
590
  pParam->pChildren = NULL;
17,591,018✔
591
  pParam->value = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
17,591,018✔
592
  QUERY_CHECK_NULL(pParam->value, code, lino, _return, terrno)
17,591,018✔
593

594
  pExc = (SExchangeOperatorParam*)pParam->value;
17,591,018✔
595
  pExc->multiParams = false;
17,591,018✔
596

597
  code = buildExchangeOperatorBasicParam(&pExc->basic, srcOpType, exchangeType, vgId, groupId,
17,591,018✔
598
                                         pUidList, pOrgTbInfo, pTagList, pOrgTbInfoArray,
599
                                         window, pDownstreamSourceNode, tableSeq, isNewParam, isNewDeployed);
600
  QUERY_CHECK_CODE(code, lino, _return);
17,591,018✔
601

602
  *ppRes = pParam;
17,591,018✔
603
  return code;
17,591,018✔
604
_return:
×
605
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
×
606
  if (pParam) {
×
607
    freeOperatorParam(pParam, OP_GET_PARAM);
×
608
  }
609
  return code;
×
610
}
611

612
static int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, const int32_t* pVgId, int64_t* pUid) {
4,728✔
613
  int32_t code = TSDB_CODE_SUCCESS;
4,728✔
614
  int32_t lino = 0;
4,728✔
615

616
  SArray* pUidList = taosArrayInit(1, sizeof(int64_t));
4,728✔
617
  QUERY_CHECK_NULL(pUidList, code, lino, _return, terrno)
4,728✔
618

619
  QUERY_CHECK_NULL(taosArrayPush(pUidList, pUid), code, lino, _return, terrno);
4,728✔
620

621
  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, EX_SRC_TYPE_STB_JOIN_SCAN,
4,728✔
622
                                        *pVgId, 0, pUidList, NULL, NULL, NULL, (STimeWindow){0}, NULL, true, false, false, false);
4,728✔
623
  QUERY_CHECK_CODE(code, lino, _return);
4,728✔
624

625
_return:
4,728✔
626
  if (code) {
4,728✔
627
    qError("failed to build exchange operator param, code:%d", code);
×
628
  }
629
  taosArrayDestroy(pUidList);
4,728✔
630
  return code;
4,728✔
631
}
632

633
static int32_t buildExchangeOperatorParamForExternalWindow(SOperatorParam** ppRes, int32_t downstreamIdx, STimeWindow win) {
752,046✔
634
  int32_t                   code = TSDB_CODE_SUCCESS;
752,046✔
635
  int32_t                   lino = 0;
752,046✔
636

637
  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, EX_SRC_TYPE_VTB_WIN_SCAN,
752,046✔
638
                                        0, 0, NULL, NULL, NULL, NULL, win, NULL, true, true, true, false);
639
  QUERY_CHECK_CODE(code, lino, _return);
752,046✔
640

641
  return code;
752,046✔
642
_return:
×
643
  qError("failed to build exchange operator param for external window, code:%d", code);
×
644
  return code;
×
645
}
646

647
static int32_t buildExchangeOperatorParamForVTagScan(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, tb_uid_t uid) {
5,100,112✔
648
  int32_t                      code = TSDB_CODE_SUCCESS;
5,100,112✔
649
  int32_t                      lino = 0;
5,100,112✔
650
  SArray*                      pUidList = NULL;
5,100,112✔
651

652
  pUidList = taosArrayInit(1, sizeof(int64_t));
5,100,112✔
653
  QUERY_CHECK_NULL(pUidList, code, lino, _return, terrno)
5,100,112✔
654

655
  QUERY_CHECK_NULL(taosArrayPush(pUidList, &uid), code, lino, _return, terrno)
5,100,112✔
656

657
  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, EX_SRC_TYPE_VSTB_TAG_SCAN,
5,100,112✔
658
                                        vgId, 0, pUidList, NULL, NULL, NULL, (STimeWindow){0}, NULL, false, false, true, false);
5,100,112✔
659
  QUERY_CHECK_CODE(code, lino, _return);
5,100,112✔
660

661
_return:
5,100,112✔
662
  if (code) {
5,100,112✔
663
    qError("failed to build exchange operator param for tag scan, code:%d", code);
×
664
  }
665
  taosArrayDestroy(pUidList);
5,100,112✔
666
  return code;
5,100,112✔
667
}
668

669
static int32_t buildExchangeOperatorParamForVScan(SOperatorParam** ppRes, int32_t downstreamIdx, SOrgTbInfo* pOrgTbInfo,
11,734,132✔
670
                                                  SDownstreamSourceNode* pNewSource) {
671
  int32_t                      code = TSDB_CODE_SUCCESS;
11,734,132✔
672
  int32_t                      lino = 0;
11,734,132✔
673

674
  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, EX_SRC_TYPE_VSTB_SCAN,
11,734,132✔
675
                                        pOrgTbInfo->vgId, 0, NULL, pOrgTbInfo, NULL, NULL, (STimeWindow){0}, pNewSource, false, true, true, true);
11,734,132✔
676
  QUERY_CHECK_CODE(code, lino, _return);
11,734,132✔
677

678
  return code;
11,734,132✔
679
_return:
×
680
  qError("failed to build exchange operator param for vscan, code:%d", code);
×
681
  return code;
×
682
}
683

684
static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
139,846✔
685
  int32_t                       code = TSDB_CODE_SUCCESS;
139,846✔
686
  int32_t                       line = 0;
139,846✔
687
  SOperatorParam*               pParam = NULL;
139,846✔
688
  SExchangeOperatorBatchParam*  pExc = NULL;
139,846✔
689
  SExchangeOperatorBasicParam   basic = {0};
139,846✔
690

691
  pParam = taosMemoryCalloc(1, sizeof(SOperatorParam));
139,846✔
692
  QUERY_CHECK_NULL(pParam, code, line, _return, terrno);
139,846✔
693

694
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
139,846✔
695
  pParam->downstreamIdx = downstreamIdx;
139,846✔
696
  pParam->reUse = false;
139,846✔
697
  pParam->pChildren = NULL;
139,846✔
698
  pParam->value = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
139,846✔
699
  QUERY_CHECK_NULL(pParam->value, code, line, _return, terrno);
139,846✔
700

701
  pExc = pParam->value;
139,846✔
702
  pExc->multiParams = true;
139,846✔
703
  pExc->pBatchs = tSimpleHashInit(tSimpleHashGetSize(pVg), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
139,846✔
704
  QUERY_CHECK_NULL(pExc->pBatchs, code, line, _return, terrno)
139,846✔
705

706
  tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);
139,846✔
707

708
  int32_t iter = 0;
139,846✔
709
  void*   p = NULL;
139,846✔
710
  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
388,257✔
711
    int32_t* pVgId = tSimpleHashGetKey(p, NULL);
248,411✔
712
    SArray*  pUidList = *(SArray**)p;
248,411✔
713

714
    code = buildExchangeOperatorBasicParam(&basic, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
248,411✔
715
                                           EX_SRC_TYPE_STB_JOIN_SCAN, *pVgId, 0,
716
                                           pUidList, NULL, NULL, NULL,
717
                                           (STimeWindow){0}, NULL, false, false, false);
248,411✔
718
    QUERY_CHECK_CODE(code, line, _return);
248,411✔
719

720
    QRY_ERR_RET(tSimpleHashPut(pExc->pBatchs, pVgId, sizeof(*pVgId), &basic, sizeof(basic)));
248,411✔
721

722
    basic = (SExchangeOperatorBasicParam){0};
248,411✔
723
    qTrace("build downstreamIdx %d batch scan, vgId:%d, uidNum:%" PRId64, downstreamIdx, *pVgId, (int64_t)taosArrayGetSize(pUidList));
248,411✔
724

725
    // already transferred to batch param, can free here
726
    taosArrayDestroy(pUidList);
248,411✔
727

728
    *(SArray**)p = NULL;
248,411✔
729
  }
730
  *ppRes = pParam;
139,846✔
731

732
  return code;
139,846✔
733
  
734
_return:
×
735
  qError("failed to build batch exchange operator param, code:%d", code);
×
736
  freeOperatorParam(pParam, OP_GET_PARAM);
×
737
  freeExchangeGetBasicOperatorParam(&basic);
×
738
  return code;
×
739
}
740

741
/*
742
 * Build one batch-exchange get-param for virtual-table dynamic execution.
743
 *
744
 * @param ppRes Output operator param.
745
 * @param downstreamIdx Downstream operator index to fetch from.
746
 * @param pTagList Optional tag values bound to each source table.
747
 * @param groupid Group id used by downstream operators.
748
 * @param pBatchMaps Hash map from vgroup id to source-table metadata array.
749
 * @param window Time window forwarded to downstream scan.
750
 * @param type Exchange source type.
751
 * @param srcOpType Physical scan operator type for each batch source.
752
 *
753
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
754
 */
755
static int32_t buildBatchExchangeOperatorParamForVirtual(SOperatorParam** ppRes, int32_t downstreamIdx,
2,410,415✔
756
                                                         SArray* pTagList, uint64_t groupid, SHashObj* pBatchMaps,
757
                                                         STimeWindow window, EExchangeSourceType type,
758
                                                         ENodeType srcOpType) {
759
  int32_t                       code = TSDB_CODE_SUCCESS;
2,410,415✔
760
  int32_t                       lino = 0;
2,410,415✔
761
  SOperatorParam*               pParam = NULL;
2,410,415✔
762
  SExchangeOperatorBatchParam*  pExc = NULL;
2,410,415✔
763
  SExchangeOperatorBasicParam   basic = {0};
2,410,415✔
764

765
  pParam = taosMemoryCalloc(1, sizeof(SOperatorParam));
2,410,415✔
766
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)
2,410,415✔
767

768
  pParam->value = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
2,410,415✔
769
  QUERY_CHECK_NULL(pParam->value, code, lino, _return, terrno)
2,410,415✔
770

771
  pExc = pParam->value;
2,410,415✔
772
  pExc->multiParams = true;
2,410,415✔
773

774
  pExc->pBatchs = tSimpleHashInit(taosHashGetSize(pBatchMaps), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
2,410,415✔
775
  QUERY_CHECK_NULL(pExc->pBatchs, code, lino, _return, terrno)
2,410,415✔
776
  tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);
2,410,415✔
777

778
  size_t keyLen = 0;
2,410,415✔
779
  void*  pIter = taosHashIterate(pBatchMaps, NULL);
2,410,415✔
780
  while (pIter != NULL) {
6,603,197✔
781
    SArray*          pOrgTbInfoArray = *(SArray**)pIter;
4,192,782✔
782
    int32_t*         vgId = (int32_t*)taosHashGetKey(pIter, &keyLen);
4,192,782✔
783

784
    code = buildExchangeOperatorBasicParam(&basic, srcOpType,
4,192,782✔
785
                                           type, *vgId, groupid,
786
                                           NULL, NULL, pTagList, pOrgTbInfoArray,
787
                                           window, NULL, false, true, false);
788
    QUERY_CHECK_CODE(code, lino, _return);
4,192,782✔
789

790
    code = tSimpleHashPut(pExc->pBatchs, vgId, sizeof(*vgId), &basic, sizeof(basic));
4,192,782✔
791
    QUERY_CHECK_CODE(code, lino, _return);
4,192,782✔
792

793
    basic = (SExchangeOperatorBasicParam){0};
4,192,782✔
794
    pIter = taosHashIterate(pBatchMaps, pIter);
4,192,782✔
795
  }
796

797
  pParam->pChildren = NULL;
2,410,415✔
798
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
2,410,415✔
799
  pParam->downstreamIdx = downstreamIdx;
2,410,415✔
800
  pParam->reUse = false;
2,410,415✔
801

802
  *ppRes = pParam;
2,410,415✔
803
  return code;
2,410,415✔
804

805
_return:
×
806
  qError("failed to build exchange operator param for vscan, code:%d", code);
×
807
  freeOperatorParam(pParam, OP_GET_PARAM);
×
808
  freeExchangeGetBasicOperatorParam(&basic);
×
809
  return code;
×
810
}
811

812
static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initParam, SOperatorParam** ppChild0, SOperatorParam** ppChild1) {
3,902,324✔
813
  int32_t code = TSDB_CODE_SUCCESS;
3,902,324✔
814
  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
3,902,324✔
815
  if (NULL == *ppRes) {
3,902,324✔
816
    code = terrno;
×
817
    return code;
×
818
  }
819
  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
3,902,324✔
820
  if (NULL == (*ppRes)->pChildren) {
3,902,324✔
821
    code = terrno;
×
822
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
823
    *ppRes = NULL;
×
824
    return code;
×
825
  }
826
  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild0)) {
7,804,648✔
827
    code = terrno;
×
828
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
829
    *ppRes = NULL;
×
830
    return code;
×
831
  }
832
  *ppChild0 = NULL;
3,902,324✔
833
  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild1)) {
7,804,648✔
834
    code = terrno;
×
835
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
836
    *ppRes = NULL;
×
837
    return code;
×
838
  }
839
  *ppChild1 = NULL;
3,902,324✔
840
  
841
  SSortMergeJoinOperatorParam* pJoin = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam));
3,902,324✔
842
  if (NULL == pJoin) {
3,902,324✔
843
    code = terrno;
×
844
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
845
    *ppRes = NULL;
×
846
    return code;
×
847
  }
848

849
  pJoin->initDownstream = initParam;
3,902,324✔
850
  
851
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
3,902,324✔
852
  (*ppRes)->value = pJoin;
3,902,324✔
853
  (*ppRes)->reUse = false;
3,902,324✔
854

855
  return TSDB_CODE_SUCCESS;
3,902,324✔
856
}
857

858
static int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) {
×
859
  int32_t code = TSDB_CODE_SUCCESS;
×
860
  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
×
861
  if (NULL == *ppRes) {
×
862
    code = terrno;
×
863
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
×
864
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
865
    return code;
×
866
  }
867
  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
×
868
  if (NULL == *ppRes) {
×
869
    code = terrno;
×
870
    taosMemoryFreeClear(*ppRes);
×
871
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
×
872
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
873
    return code;
×
874
  }
875
  if (pChild0 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) {
×
876
    code = terrno;
×
877
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
×
878
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
×
879
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
880
    *ppRes = NULL;
×
881
    return code;
×
882
  }
883
  if (pChild1 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) {
×
884
    code = terrno;
×
885
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
×
886
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
887
    *ppRes = NULL;
×
888
    return code;
×
889
  }
890
  
891
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
×
892
  (*ppRes)->value = NULL;
×
893
  (*ppRes)->reUse = false;
×
894

895
  return TSDB_CODE_SUCCESS;
×
896
}
897

898
static int32_t buildBatchTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
12,324✔
899
  int32_t code = TSDB_CODE_SUCCESS;
12,324✔
900
  int32_t vgNum = tSimpleHashGetSize(pVg);
12,324✔
901
  if (vgNum <= 0 || vgNum > 1) {
12,324✔
902
    qError("Invalid vgroup num %d to build table scan operator param", vgNum);
×
903
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
904
  }
905

906
  int32_t iter = 0;
12,324✔
907
  void* p = NULL;
12,324✔
908
  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
24,648✔
909
    SArray* pUidList = *(SArray**)p;
12,324✔
910

911
    code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false);
12,324✔
912
    if (code) {
12,324✔
913
      return code;
×
914
    }
915
    taosArrayDestroy(pUidList);
12,324✔
916
    *(SArray**)p = NULL;
12,324✔
917
  }
918
  
919
  return TSDB_CODE_SUCCESS;
12,324✔
920
}
921

922
static int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) {
×
923
  SArray* pUidList = taosArrayInit(1, sizeof(int64_t));
×
924
  if (NULL == pUidList) {
×
925
    return terrno;
×
926
  }
927
  if (NULL == taosArrayPush(pUidList, pUid)) {
×
928
    return terrno;
×
929
  }
930

931
  int32_t code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, true);
×
932
  taosArrayDestroy(pUidList);
×
933
  if (code) {
×
934
    return code;
×
935
  }
936
  
937
  return TSDB_CODE_SUCCESS;
×
938
}
939

940
static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, SOperatorParam** ppParam) {
3,902,324✔
941
  int64_t                     rowIdx = pPrev->pListHead->readIdx;
3,902,324✔
942
  SOperatorParam*             pSrcParam0 = NULL;
3,902,324✔
943
  SOperatorParam*             pSrcParam1 = NULL;
3,902,324✔
944
  SOperatorParam*             pGcParam0 = NULL;
3,902,324✔
945
  SOperatorParam*             pGcParam1 = NULL;  
3,902,324✔
946
  int32_t*                    leftVg = pPrev->pListHead->pLeftVg + rowIdx;
3,902,324✔
947
  int64_t*                    leftUid = pPrev->pListHead->pLeftUid + rowIdx;
3,902,324✔
948
  int32_t*                    rightVg = pPrev->pListHead->pRightVg + rowIdx;
3,902,324✔
949
  int64_t*                    rightUid = pPrev->pListHead->pRightUid + rowIdx;
3,902,324✔
950
  int32_t                     code = TSDB_CODE_SUCCESS;
3,902,324✔
951

952
  qDebug("start %" PRId64 ":%" PRId64 "th stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64, 
3,902,324✔
953
      rowIdx, pPrev->tableNum, *leftVg, *leftUid, *rightVg, *rightUid);
954

955
  QRY_ERR_RET(updatePostJoinCurrTableInfo(&pInfo->stbJoin));
3,902,324✔
956
  
957
  if (pInfo->stbJoin.basic.batchFetch) {
3,902,324✔
958
    if (pPrev->leftHash) {
3,899,960✔
959
      code = pInfo->stbJoin.basic.srcScan[0] ? buildBatchTableScanOperatorParam(&pSrcParam0, 0, pPrev->leftHash) : buildBatchExchangeOperatorParam(&pSrcParam0, 0, pPrev->leftHash);
76,085✔
960
      if (TSDB_CODE_SUCCESS == code) {
76,085✔
961
        code = pInfo->stbJoin.basic.srcScan[1] ? buildBatchTableScanOperatorParam(&pSrcParam1, 1, pPrev->rightHash) : buildBatchExchangeOperatorParam(&pSrcParam1, 1, pPrev->rightHash);
76,085✔
962
      }
963
      if (TSDB_CODE_SUCCESS == code) {
76,085✔
964
        tSimpleHashCleanup(pPrev->leftHash);
76,085✔
965
        tSimpleHashCleanup(pPrev->rightHash);
76,085✔
966
        pPrev->leftHash = NULL;
76,085✔
967
        pPrev->rightHash = NULL;
76,085✔
968
      }
969
    }
970
  } else {
971
    code = pInfo->stbJoin.basic.srcScan[0] ? buildSingleTableScanOperatorParam(&pSrcParam0, 0, leftVg, leftUid) : buildExchangeOperatorParam(&pSrcParam0, 0, leftVg, leftUid);
2,364✔
972
    if (TSDB_CODE_SUCCESS == code) {
2,364✔
973
      code = pInfo->stbJoin.basic.srcScan[1] ? buildSingleTableScanOperatorParam(&pSrcParam1, 1, rightVg, rightUid) : buildExchangeOperatorParam(&pSrcParam1, 1, rightVg, rightUid);
2,364✔
974
    }
975
  }
976

977
  bool initParam = pSrcParam0 ? true : false;
3,902,324✔
978
  if (TSDB_CODE_SUCCESS == code) {
3,902,324✔
979
    code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pPost->leftNeedCache, pSrcParam0);
3,902,324✔
980
    pSrcParam0 = NULL;
3,902,324✔
981
  }
982
  if (TSDB_CODE_SUCCESS == code) {
3,902,324✔
983
    code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pPost->rightNeedCache, pSrcParam1);
3,902,324✔
984
    pSrcParam1 = NULL;
3,902,324✔
985
  }
986
  if (TSDB_CODE_SUCCESS == code) {
3,902,324✔
987
    code = buildMergeJoinOperatorParam(ppParam, initParam, &pGcParam0, &pGcParam1);
3,902,324✔
988
  }
989
  if (TSDB_CODE_SUCCESS != code) {
3,902,324✔
990
    if (pSrcParam0) {
×
991
      freeOperatorParam(pSrcParam0, OP_GET_PARAM);
×
992
    }
993
    if (pSrcParam1) {
×
994
      freeOperatorParam(pSrcParam1, OP_GET_PARAM);
×
995
    }
996
    if (pGcParam0) {
×
997
      freeOperatorParam(pGcParam0, OP_GET_PARAM);
×
998
    }
999
    if (pGcParam1) {
×
1000
      freeOperatorParam(pGcParam1, OP_GET_PARAM);
×
1001
    }
1002
    if (*ppParam) {
×
1003
      freeOperatorParam(*ppParam, OP_GET_PARAM);
×
1004
      *ppParam = NULL;
×
1005
    }
1006
  }
1007
  
1008
  return code;
3,902,324✔
1009
}
1010

1011
static int32_t buildVtbScanOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, uint64_t uid) {
5,100,112✔
1012
  int32_t                   code = TSDB_CODE_SUCCESS;
5,100,112✔
1013
  int32_t                   lino = 0;
5,100,112✔
1014
  SVTableScanOperatorParam* pVScan = NULL;
5,100,112✔
1015
  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
5,100,112✔
1016
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
5,100,112✔
1017

1018
  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
5,100,112✔
1019
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)
5,100,112✔
1020

1021
  pVScan = taosMemoryMalloc(sizeof(SVTableScanOperatorParam));
5,100,112✔
1022
  QUERY_CHECK_NULL(pVScan, code, lino, _return, terrno)
5,100,112✔
1023
  pVScan->pOpParamArray = taosArrayInit(1, POINTER_BYTES);
5,100,112✔
1024
  QUERY_CHECK_NULL(pVScan->pOpParamArray, code, lino, _return, terrno)
5,100,112✔
1025
  pVScan->uid = uid;
5,100,112✔
1026
  pVScan->window = pInfo->vtbScan.window;
5,100,112✔
1027
  if (pInfo->vtbScan.refColGroups) {
5,100,112✔
1028
    pVScan->pRefColGroups = taosArrayInit(taosArrayGetSize(pInfo->vtbScan.refColGroups), sizeof(SRefColIdGroup));
5,132✔
1029
    QUERY_CHECK_NULL(pVScan->pRefColGroups, code, lino, _return, terrno)
5,132✔
1030
    for (int32_t i = 0; i < taosArrayGetSize(pInfo->vtbScan.refColGroups); i++) {
13,459✔
1031
      SRefColIdGroup* pSrc = (SRefColIdGroup*)taosArrayGet(pInfo->vtbScan.refColGroups, i);
8,327✔
1032
      SRefColIdGroup  dst = {0};
8,327✔
1033
      QUERY_CHECK_NULL(pSrc, code, lino, _return, terrno)
8,327✔
1034
      dst.pSlotIdList = taosArrayDup(pSrc->pSlotIdList, NULL);
8,327✔
1035
      QUERY_CHECK_NULL(dst.pSlotIdList, code, lino, _return, terrno)
8,327✔
1036
      void* px = taosArrayPush(pVScan->pRefColGroups, &dst);
8,327✔
1037
      if (NULL == px) {
8,327✔
1038
        taosArrayDestroy(dst.pSlotIdList);
×
1039
        dst.pSlotIdList = NULL;
×
1040
      }
1041
      QUERY_CHECK_NULL(px, code, lino, _return, terrno)
8,327✔
1042
    }
1043
  } else {
1044
    pVScan->pRefColGroups = NULL;
5,094,980✔
1045
  }
1046

1047
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN;
5,100,112✔
1048
  (*ppRes)->downstreamIdx = 0;
5,100,112✔
1049
  (*ppRes)->value = pVScan;
5,100,112✔
1050
  (*ppRes)->reUse = false;
5,100,112✔
1051

1052
  return TSDB_CODE_SUCCESS;
5,100,112✔
1053
_return:
×
1054
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1055
  if (pVScan) {
×
1056
    if (pVScan->pRefColGroups) {
×
1057
      taosArrayDestroyEx(pVScan->pRefColGroups, destroyRefColIdGroup);
×
1058
      pVScan->pRefColGroups = NULL;
×
1059
    }
1060
    taosArrayDestroy(pVScan->pOpParamArray);
×
1061
    taosMemoryFreeClear(pVScan);
×
1062
  }
1063
  if (*ppRes) {
×
1064
    taosArrayDestroy((*ppRes)->pChildren);
×
1065
    taosMemoryFreeClear(*ppRes);
×
1066
  }
1067
  return code;
×
1068
}
1069

1070
static int32_t addRefColIdToRefMap(SHashObj* refMap, const char* colrefName, col_id_t colId) {
19,290,705✔
1071
  int32_t  code = TSDB_CODE_SUCCESS;
19,290,705✔
1072
  int32_t  line = 0;
19,290,705✔
1073
  SArray** sameRefColIdList = NULL;
19,290,705✔
1074

1075
  if (colrefName == NULL || colrefName[0] == '\0') {
19,290,705✔
1076
    return code;
×
1077
  }
1078

1079
  sameRefColIdList = (SArray**)taosHashGet(refMap, colrefName, strlen(colrefName));
19,290,705✔
1080
  if (sameRefColIdList == NULL) {
19,290,705✔
1081
    SArray* list = taosArrayInit(2, sizeof(col_id_t));
19,277,910✔
1082
    QUERY_CHECK_NULL(list, code, line, _return, terrno)
19,277,910✔
1083
    QUERY_CHECK_CODE(taosHashPut(refMap, colrefName, strlen(colrefName), &list, POINTER_BYTES), line, _return);
19,277,910✔
1084
    sameRefColIdList = (SArray**)taosHashGet(refMap, colrefName, strlen(colrefName));
19,277,910✔
1085
    QUERY_CHECK_NULL(sameRefColIdList, code, line, _return, terrno)
19,277,910✔
1086
  }
1087

1088
  for (int32_t i = 0; i < taosArrayGetSize(*sameRefColIdList); i++) {
19,309,890✔
1089
    col_id_t existing = *(col_id_t*)taosArrayGet(*sameRefColIdList, i);
19,185✔
1090
    if (existing == colId) {
19,185✔
1091
      return code;
×
1092
    }
1093
  }
1094
  QUERY_CHECK_NULL(taosArrayPush(*sameRefColIdList, &colId), code, line, _return, terrno)
38,581,410✔
1095
  return code;
19,290,705✔
1096

1097
_return:
×
1098
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
1099
  return code;
×
1100
}
1101

1102
static int32_t buildRefSlotGroupsFromRefMap(SHashObj* refMap, SArray* readColList, SArray** ppGroups) {
5,100,112✔
1103
  int32_t   code = TSDB_CODE_SUCCESS;
5,100,112✔
1104
  int32_t   line = 0;
5,100,112✔
1105
  SArray*   groups = NULL;
5,100,112✔
1106
  SHashObj* colIdToSlot = NULL;
5,100,112✔
1107

1108
  if (refMap == NULL || readColList == NULL) {
5,100,112✔
1109
    return code;
1,439,770✔
1110
  }
1111

1112
  if (*ppGroups) {
3,660,342✔
1113
    taosArrayDestroyEx(*ppGroups, destroyRefColIdGroup);
3,205✔
1114
    *ppGroups = NULL;
3,205✔
1115
  }
1116

1117
  colIdToSlot = taosHashInit(taosArrayGetSize(readColList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false,
3,660,342✔
1118
                             HASH_NO_LOCK);
1119
  QUERY_CHECK_NULL(colIdToSlot, code, line, _return, terrno)
3,660,342✔
1120

1121
  // Build a quick colId -> slotId lookup for columns actually read.
1122
  for (int32_t i = 0; i < taosArrayGetSize(readColList); i++) {
29,057,561✔
1123
    col_id_t colId = *(col_id_t*)taosArrayGet(readColList, i);
25,397,219✔
1124
    int32_t  slotId = i;
25,397,219✔
1125
    code = taosHashPut(colIdToSlot, &colId, sizeof(colId), &slotId, sizeof(slotId));
25,397,219✔
1126
    QUERY_CHECK_CODE(code, line, _return);
25,397,219✔
1127
  }
1128

1129
  groups = taosArrayInit(1, sizeof(SRefColIdGroup));
3,660,342✔
1130
  QUERY_CHECK_NULL(groups, code, line, _return, terrno)
3,660,342✔
1131

1132
  // Group columns that share the same ref name into slotId lists.
1133
  void* pIter = taosHashIterate(refMap, NULL);
3,660,342✔
1134
  while (pIter != NULL) {
22,938,252✔
1135
    SArray* pList = *(SArray**)pIter;  // colId list
19,277,910✔
1136
    if (pList && taosArrayGetSize(pList) > 1) {
19,277,910✔
1137
      SArray* slotList = taosArrayInit(taosArrayGetSize(pList), sizeof(int32_t));
7,683✔
1138
      QUERY_CHECK_NULL(slotList, code, line, _return, terrno)
7,683✔
1139
      for (int32_t i = 0; i < taosArrayGetSize(pList); i++) {
28,161✔
1140
        col_id_t colId = *(col_id_t*)taosArrayGet(pList, i);
20,478✔
1141
        int32_t* slotId = taosHashGet(colIdToSlot, &colId, sizeof(colId));
20,478✔
1142
        if (slotId) {
20,478✔
1143
          QUERY_CHECK_NULL(taosArrayPush(slotList, slotId), code, line, _return, terrno)
20,478✔
1144
        }
1145
      }
1146
      if (taosArrayGetSize(slotList) > 1) {
7,683✔
1147
        SRefColIdGroup g = {.pSlotIdList = slotList};
7,683✔
1148
        QUERY_CHECK_NULL(taosArrayPush(groups, &g), code, line, _return, terrno)
7,683✔
1149
      } else {
1150
        taosArrayDestroy(slotList);
×
1151
      }
1152
    }
1153
    if (pList) {
19,277,910✔
1154
      taosArrayDestroy(pList);
19,277,910✔
1155
    }
1156
    pIter = taosHashIterate(refMap, pIter);
19,277,910✔
1157
  }
1158

1159
  if (taosArrayGetSize(groups) == 0) {
3,660,342✔
1160
    taosArrayDestroy(groups);
3,655,854✔
1161
    groups = NULL;
3,655,854✔
1162
  }
1163

1164
_return:
4,488✔
1165
  if (code) {
3,660,342✔
1166
    qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
1167
    if (groups) {
×
1168
      taosArrayDestroyEx(groups, destroyRefColIdGroup);
×
1169
    }
1170
  }
1171
  if (refMap) {
3,660,342✔
1172
    taosHashCleanup(refMap);
3,660,342✔
1173
  }
1174
  if (colIdToSlot) {
3,660,342✔
1175
    taosHashCleanup(colIdToSlot);
3,660,342✔
1176
  }
1177
  *ppGroups = groups;
3,660,342✔
1178
  return code;
3,660,342✔
1179
}
1180

1181
bool colNeedScan(SOperatorInfo* pOperator, col_id_t colId) {
116,198,177✔
1182
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
116,198,177✔
1183
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
116,198,177✔
1184

1185
  if (pVtbScan->scanAllCols) {
116,198,177✔
1186
    return true;
6,722,981✔
1187
  }
1188

1189
  // if readColSet exists, use it to check whether colId is needed, otherwise use readColList
1190
  if (pVtbScan->readColSet) {
109,475,196✔
1191
    return taosHashGet(pVtbScan->readColSet, &colId, sizeof(colId)) != NULL;
109,475,196✔
1192
  }
1193

1194
  for (int32_t i = 0; i < taosArrayGetSize(pVtbScan->readColList); i++) {
×
1195
    if (colId == *(col_id_t*)taosArrayGet(pVtbScan->readColList, i)) {
×
1196
      return true;
×
1197
    }
1198
  }
1199
  return false;
×
1200
}
1201

1202
static int32_t buildExternalWindowOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins, int32_t idx) {
752,046✔
1203
  int32_t                       code = TSDB_CODE_SUCCESS;
752,046✔
1204
  int32_t                       lino = 0;
752,046✔
1205
  SExternalWindowOperatorParam* pExtWinOp = NULL;
752,046✔
1206

1207
  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
752,046✔
1208
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
752,046✔
1209

1210
  pExtWinOp = taosMemoryMalloc(sizeof(SExternalWindowOperatorParam));
752,046✔
1211
  QUERY_CHECK_NULL(pExtWinOp, code, lino, _return, terrno)
752,046✔
1212

1213
  pExtWinOp->ExtWins = taosArrayDup(pWins, NULL);
752,046✔
1214
  QUERY_CHECK_NULL(pExtWinOp->ExtWins, code, lino, _return, terrno)
752,046✔
1215

1216
  SExtWinTimeWindow *firstWin = (SExtWinTimeWindow *)taosArrayGet(pWins, 0);
752,046✔
1217
  SExtWinTimeWindow *lastWin = (SExtWinTimeWindow *)taosArrayGet(pWins, taosArrayGetSize(pWins) - 1);
752,046✔
1218

1219
  QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
752,046✔
1220
  QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)
752,046✔
1221

1222
  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
752,046✔
1223
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)
752,046✔
1224

1225
  SOperatorParam* pExchangeOperator = NULL;
752,046✔
1226
  STimeWindow     twin = {.skey = firstWin->tw.skey, .ekey = lastWin->tw.ekey};
752,046✔
1227
  code = buildExchangeOperatorParamForExternalWindow(&pExchangeOperator, 0, twin);
752,046✔
1228
  QUERY_CHECK_CODE(code, lino, _return);
752,046✔
1229
  QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pExchangeOperator), code, lino, _return, terrno)
1,504,092✔
1230

1231
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW;
752,046✔
1232
  (*ppRes)->downstreamIdx = idx;
752,046✔
1233
  (*ppRes)->value = pExtWinOp;
752,046✔
1234
  (*ppRes)->reUse = false;
752,046✔
1235

1236
  return code;
752,046✔
1237
_return:
×
1238
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1239
  if (pExtWinOp) {
×
1240
    if (pExtWinOp->ExtWins) {
×
1241
      taosArrayDestroy(pExtWinOp->ExtWins);
×
1242
    }
1243
    taosMemoryFree(pExtWinOp);
×
1244
  }
1245
  if (*ppRes) {
×
1246
    if ((*ppRes)->pChildren) {
×
1247
      for (int32_t i = 0; i < taosArrayGetSize((*ppRes)->pChildren); ++i) {
×
1248
        SOperatorParam* pChildParam = taosArrayGetP((*ppRes)->pChildren, i);
×
1249
        if (pChildParam) {
×
1250
          freeOperatorParam(pChildParam, OP_GET_PARAM);
×
1251
        }
1252
      }
1253
      taosArrayDestroy((*ppRes)->pChildren);
×
1254
    }
1255
    taosMemoryFree(*ppRes);
×
1256
    *ppRes = NULL;
×
1257
  }
1258
  return code;
×
1259
}
1260

1261
static int32_t buildMergeOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins,
312,732✔
1262
                                       int32_t numOfDownstream, int32_t numOfWins) {
1263
  int32_t                   code = TSDB_CODE_SUCCESS;
312,732✔
1264
  int32_t                   lino = 0;
312,732✔
1265
  SMergeOperatorParam*      pMergeOp = NULL;
312,732✔
1266

1267
  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
312,732✔
1268
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
312,732✔
1269

1270
  (*ppRes)->pChildren = taosArrayInit(numOfDownstream, POINTER_BYTES);
312,732✔
1271
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)
312,732✔
1272

1273
  pMergeOp = taosMemoryMalloc(sizeof(SMergeOperatorParam));
312,732✔
1274
  QUERY_CHECK_NULL(pMergeOp, code, lino, _return, terrno)
312,732✔
1275

1276
  pMergeOp->winNum = numOfWins;
312,732✔
1277

1278
  for (int32_t i = 0; i < numOfDownstream; i++) {
1,064,778✔
1279
    SOperatorParam* pExternalWinParam = NULL;
752,046✔
1280
    code = buildExternalWindowOperatorParam(pInfo, &pExternalWinParam, pWins, i);
752,046✔
1281
    QUERY_CHECK_CODE(code, lino, _return);
752,046✔
1282
    QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pExternalWinParam), code, lino, _return, terrno)
1,504,092✔
1283
  }
1284

1285
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE;
312,732✔
1286
  (*ppRes)->downstreamIdx = 0;
312,732✔
1287
  (*ppRes)->value = pMergeOp;
312,732✔
1288
  (*ppRes)->reUse = false;
312,732✔
1289

1290
  return TSDB_CODE_SUCCESS;
312,732✔
1291
_return:
×
1292
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1293
  if (pMergeOp) {
×
1294
    taosMemoryFree(pMergeOp);
×
1295
  }
1296
  if (*ppRes) {
×
1297
    if ((*ppRes)->pChildren) {
×
1298
      for (int32_t i = 0; i < taosArrayGetSize((*ppRes)->pChildren); i++) {
×
1299
        SOperatorParam* pChildParam = taosArrayGetP((*ppRes)->pChildren, i);
×
1300
        if (pChildParam) {
×
1301
          freeOperatorParam(pChildParam, OP_GET_PARAM);
×
1302
        }
1303
      }
1304
      taosArrayDestroy((*ppRes)->pChildren);
×
1305
    }
1306
    taosMemoryFree(*ppRes);
×
1307
    *ppRes = NULL;
×
1308
  }
1309
  return code;
×
1310
}
1311

1312
/*
1313
 * Build merge operator params for vtable ts-scan mode.
1314
 *
1315
 * @param pInfo Dynamic-query control operator runtime info.
1316
 * @param ppRes Output merge operator param.
1317
 *
1318
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
1319
 */
1320
static int32_t buildMergeOperatorParamForTsScan(SDynQueryCtrlOperatorInfo* pInfo, int32_t numOfDownstream,
48,917✔
1321
                                                SOperatorParam** ppRes) {
1322
  int32_t                   code = TSDB_CODE_SUCCESS;
48,917✔
1323
  int32_t                   lino = 0;
48,917✔
1324
  SOperatorParam*           pParam = NULL;
48,917✔
1325
  SOperatorParam*           pExchangeParam = NULL;
48,917✔
1326
  SVtbScanDynCtrlInfo*      pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
48,917✔
1327

1328
  pParam = taosMemoryCalloc(1, sizeof(SOperatorParam));
48,917✔
1329
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)
48,917✔
1330

1331
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE;
48,917✔
1332
  pParam->downstreamIdx = 0;
48,917✔
1333
  pParam->reUse = false;
48,917✔
1334
  pParam->pChildren = taosArrayInit(numOfDownstream, POINTER_BYTES);
48,917✔
1335
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno)
48,917✔
1336

1337
  pParam->value = taosMemoryMalloc(sizeof(SMergeOperatorParam));
48,917✔
1338
  QUERY_CHECK_NULL(pParam->value, code, lino, _return, terrno)
48,917✔
1339

1340
  for (int32_t i = 0; i < numOfDownstream; i++) {
333,903✔
1341
    code = buildBatchExchangeOperatorParamForVirtual(&pExchangeParam, i, NULL, 0, pVtbScan->otbVgIdToOtbInfoArrayMap,
284,986✔
1342
                                                     (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN},
284,986✔
1343
                                                     EX_SRC_TYPE_VSTB_TS_SCAN,
1344
                                                     QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN);
1345
    QUERY_CHECK_CODE(code, lino, _return);
284,986✔
1346
    QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pExchangeParam), code, lino, _return, terrno)
569,972✔
1347
    pExchangeParam = NULL;
284,986✔
1348
  }
1349

1350
  *ppRes = pParam;
48,917✔
1351

1352
  return code;
48,917✔
UNCOV
1353
_return:
×
1354
  if (pExchangeParam) {
×
1355
    freeOperatorParam(pExchangeParam, OP_GET_PARAM);
×
1356
  }
UNCOV
1357
  if (pParam) {
×
1358
    freeOperatorParam(pParam, OP_GET_PARAM);
×
1359
  }
UNCOV
1360
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1361
  return code;
×
1362
}
1363

1364
static int32_t buildAggOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes) {
44,061✔
1365
  int32_t                   code = TSDB_CODE_SUCCESS;
44,061✔
1366
  int32_t                   lino = 0;
44,061✔
1367
  SOperatorParam*           pParam = NULL;
44,061✔
1368
  SOperatorParam*           pExchangeParam = NULL;
44,061✔
1369
  SVtbScanDynCtrlInfo*      pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
44,061✔
1370
  bool                      freeExchange = false;
44,061✔
1371

1372
  pParam = taosMemoryCalloc(1, sizeof(SOperatorParam));
44,061✔
1373
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)
44,061✔
1374

1375
  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
44,061✔
1376
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno)
44,061✔
1377

1378
  pParam->value = taosMemoryMalloc(sizeof(SAggOperatorParam));
44,061✔
1379
  QUERY_CHECK_NULL(pParam->value, code, lino, _return, terrno)
44,061✔
1380

1381
  code = buildBatchExchangeOperatorParamForVirtual(
44,061✔
1382
      &pExchangeParam, 0, NULL, 0, pVtbScan->otbVgIdToOtbInfoArrayMap,
1383
      (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN}, EX_SRC_TYPE_VSTB_AGG_SCAN,
44,061✔
1384
      QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
1385
  QUERY_CHECK_CODE(code, lino, _return);
44,061✔
1386

1387
  freeExchange = true;
44,061✔
1388

1389
  QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pExchangeParam), code, lino, _return, terrno)
88,122✔
1390

1391
  freeExchange = false;
44,061✔
1392

1393
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
44,061✔
1394
  pParam->downstreamIdx = 0;
44,061✔
1395
  pParam->reUse = false;
44,061✔
1396

1397
  *ppRes = pParam;
44,061✔
1398

1399
  return code;
44,061✔
UNCOV
1400
_return:
×
1401
  if (freeExchange) {
×
1402
    freeOperatorParam(pExchangeParam, OP_GET_PARAM);
×
1403
  }
UNCOV
1404
  if (pParam) {
×
1405
    freeOperatorParam(pParam, OP_GET_PARAM);
×
1406
  }
UNCOV
1407
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1408
  return code;
×
1409
}
1410

1411
static int32_t buildAggOperatorParamWithGroupId(SDynQueryCtrlOperatorInfo* pInfo, uint64_t groupid,
838,187✔
1412
                                                SOperatorParam** ppRes) {
1413
  int32_t                   code = TSDB_CODE_SUCCESS;
838,187✔
1414
  int32_t                   lino = 0;
838,187✔
1415
  SVtbScanDynCtrlInfo*      pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
838,187✔
1416
  SOperatorParam*           pParam = NULL;
838,187✔
1417
  SOperatorParam*           pExchangeParam = NULL;
838,187✔
1418
  SHashObj*                 otbVgIdToOtbInfoArrayMap = NULL;
838,187✔
1419
  bool                      freeExchange = false;
838,187✔
1420
  void*                     pIter = taosHashGet(pVtbScan->vtbGroupIdToVgIdMapMap, &groupid, sizeof(groupid));
838,187✔
1421

1422
  if (!pIter) {
838,187✔
UNCOV
1423
    *ppRes = NULL;
×
1424
    return code;
×
1425
  }
1426

1427
  otbVgIdToOtbInfoArrayMap = *(SHashObj**)pIter;
838,187✔
1428

1429
  pParam = taosMemoryCalloc(1, sizeof(SOperatorParam));
838,187✔
1430
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)
838,187✔
1431

1432
  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
838,187✔
1433
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno)
838,187✔
1434

1435
  code = buildBatchExchangeOperatorParamForVirtual(
838,187✔
1436
      &pExchangeParam, 0, NULL, groupid, otbVgIdToOtbInfoArrayMap,
1437
      (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN}, EX_SRC_TYPE_VSTB_AGG_SCAN,
838,187✔
1438
      QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
1439
  QUERY_CHECK_CODE(code, lino, _return);
838,187✔
1440

1441
  freeExchange = true;
838,187✔
1442

1443
  QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pExchangeParam), code, lino, _return, terrno)
1,676,374✔
1444

1445
  freeExchange = false;
838,187✔
1446

1447
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
838,187✔
1448
  pParam->downstreamIdx = 0;
838,187✔
1449
  pParam->value = NULL;
838,187✔
1450
  pParam->reUse = false;
838,187✔
1451

1452
  *ppRes = pParam;
838,187✔
1453

1454
  return code;
838,187✔
UNCOV
1455
_return:
×
1456
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1457
  if (freeExchange) {
×
1458
    freeOperatorParam(pExchangeParam, OP_GET_PARAM);
×
1459
  }
UNCOV
1460
  if (pParam) {
×
1461
    freeOperatorParam(pParam, OP_GET_PARAM);
×
1462
  }
UNCOV
1463
  return code;
×
1464
}
1465

1466
static int32_t buildAggOperatorParamForSingleChild(SDynQueryCtrlOperatorInfo* pInfo, tb_uid_t uid,
695,060✔
1467
                                                   uint64_t groupid, SArray* pTagList, SOperatorParam** ppRes) {
1468
  int32_t                   code = TSDB_CODE_SUCCESS;
695,060✔
1469
  int32_t                   lino = 0;
695,060✔
1470
  SVtbScanDynCtrlInfo*      pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
695,060✔
1471
  SOperatorParam*           pParam = NULL;
695,060✔
1472
  SHashObj*                 pOtbVgIdToOtbInfoArrayMap = NULL;
695,060✔
1473
  void*                     pIter = taosHashGet(pVtbScan->vtbUidToVgIdMapMap, &uid, sizeof(uid));
695,060✔
1474

1475
  if (pIter) {
695,060✔
1476
    pOtbVgIdToOtbInfoArrayMap = *(SHashObj**)taosHashGet(pVtbScan->vtbUidToVgIdMapMap, &uid, sizeof(uid));
695,060✔
1477

1478
    code = buildBatchExchangeOperatorParamForVirtual(
695,060✔
1479
        &pParam, 0, pTagList, groupid, pOtbVgIdToOtbInfoArrayMap,
1480
        (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN}, EX_SRC_TYPE_VSTB_AGG_SCAN,
695,060✔
1481
        QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
1482
    QUERY_CHECK_CODE(code, lino, _return);
695,060✔
1483

1484
    *ppRes = pParam;
695,060✔
1485
  } else {
UNCOV
1486
    *ppRes = NULL;
×
1487
  }
1488

1489
  return code;
695,060✔
UNCOV
1490
_return:
×
1491
  if (pParam) {
×
1492
    freeOperatorParam(pParam, OP_GET_PARAM);
×
1493
  }
UNCOV
1494
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1495
  return code;
×
1496
}
1497

1498
static void seqJoinLaunchNewRetrieveImpl(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
3,902,324✔
1499
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
3,902,324✔
1500
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
3,902,324✔
1501
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
3,902,324✔
1502
  SStbJoinPostJoinCtx*       pPost = &pStbJoin->ctx.post;
3,902,324✔
1503
  SOperatorParam*            pParam = NULL;
3,902,324✔
1504
  int32_t                    code  = buildSeqStbJoinOperatorParam(pInfo, pPrev, pPost, &pParam);
3,902,324✔
1505
  if (TSDB_CODE_SUCCESS != code) {
3,902,324✔
UNCOV
1506
    pOperator->pTaskInfo->code = code;
×
1507
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
×
1508
  }
1509

1510
  qDebug("%s dynamic post task begin", GET_TASKID(pOperator->pTaskInfo));
3,902,324✔
1511
  code = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam, ppRes);
3,902,324✔
1512
  if (*ppRes && (code == 0)) {
3,902,324✔
1513
    code = blockDataCheck(*ppRes);
226,620✔
1514
    if (code) {
226,620✔
UNCOV
1515
      qError("Invalid block data, blockDataCheck failed, error:%s", tstrerror(code));
×
1516
      pOperator->pTaskInfo->code = code;
×
1517
      T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
×
1518
    }
1519
    pPost->isStarted = true;
226,620✔
1520
    pStbJoin->execInfo.postBlkNum++;
226,620✔
1521
    pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows;
226,620✔
1522
    qDebug("%s join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
226,620✔
1523
  } else {
1524
    qDebug("%s Empty join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
3,675,704✔
1525
  }
1526
}
3,902,324✔
1527

1528

UNCOV
1529
static int32_t notifySeqJoinTableCacheEnd(SOperatorInfo* pOperator, SStbJoinPostJoinCtx* pPost, bool leftTable) {
×
1530
  SOperatorParam* pGcParam = NULL;
×
1531
  SOperatorParam* pMergeJoinParam = NULL;
×
1532
  int32_t         downstreamId = leftTable ? 0 : 1;
×
1533
  int32_t         vgId = leftTable ? pPost->leftVgId : pPost->rightVgId;
×
1534
  int64_t         uid = leftTable ? pPost->leftCurrUid : pPost->rightCurrUid;
×
1535

UNCOV
1536
  qDebug("notify table %" PRIu64 " in vgId %d downstreamId %d cache end", uid, vgId, downstreamId);
×
1537

UNCOV
1538
  int32_t code = buildGroupCacheNotifyOperatorParam(&pGcParam, downstreamId, vgId, uid);
×
1539
  if (TSDB_CODE_SUCCESS != code) {
×
1540
    return code;
×
1541
  }
UNCOV
1542
  code = buildMergeJoinNotifyOperatorParam(&pMergeJoinParam, pGcParam, NULL);
×
1543
  if (TSDB_CODE_SUCCESS != code) {
×
1544
    return code;
×
1545
  }
1546

UNCOV
1547
  return optrDefaultNotifyFn(pOperator->pDownstream[1], pMergeJoinParam);
×
1548
}
1549

1550
static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCtrlInfo*          pStbJoin) {
3,901,793✔
1551
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
3,901,793✔
1552
  int32_t code = 0;
3,901,793✔
1553
  
1554
  pPost->isStarted = false;
3,901,793✔
1555
  
1556
  if (pStbJoin->basic.batchFetch) {
3,901,793✔
1557
    return TSDB_CODE_SUCCESS;
3,899,429✔
1558
  }
1559
  
1560
  if (pPost->leftNeedCache) {
2,364✔
UNCOV
1561
    uint32_t* num = tSimpleHashGet(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
×
1562
    if (num && --(*num) <= 0) {
×
1563
      code = tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
×
1564
      if (code) {
×
1565
        qError("tSimpleHashRemove leftCurrUid %" PRId64 " from leftCache failed, error:%s", pPost->leftCurrUid, tstrerror(code));
×
1566
        QRY_ERR_RET(code);
×
1567
      }
UNCOV
1568
      QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, true));
×
1569
    }
1570
  }
1571
  
1572
  if (!pPost->rightNeedCache) {
2,364✔
1573
    void* v = tSimpleHashGet(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
2,364✔
1574
    if (NULL != v) {
2,364✔
UNCOV
1575
      code = tSimpleHashRemove(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
×
1576
      if (code) {
×
1577
        qError("tSimpleHashRemove rightCurrUid %" PRId64 " from rightCache failed, error:%s", pPost->rightCurrUid, tstrerror(code));
×
1578
        QRY_ERR_RET(code);
×
1579
      }
UNCOV
1580
      QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, false));
×
1581
    }
1582
  }
1583

1584
  return TSDB_CODE_SUCCESS;
2,364✔
1585
}
1586

1587

1588
static FORCE_INLINE int32_t seqJoinContinueCurrRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
1589
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
303,356✔
1590
  SStbJoinPostJoinCtx*       pPost = &pInfo->stbJoin.ctx.post;
303,356✔
1591
  SStbJoinPrevJoinCtx*       pPrev = &pInfo->stbJoin.ctx.prev;
303,356✔
1592

1593
  if (!pPost->isStarted) {
303,356✔
1594
    return TSDB_CODE_SUCCESS;
77,267✔
1595
  }
1596
  
1597
  qDebug("%s dynQueryCtrl continue to retrieve block from post op", GET_TASKID(pOperator->pTaskInfo));
226,089✔
1598
  
1599
  *ppRes = getNextBlockFromDownstream(pOperator, 1);
226,089✔
1600
  if (NULL == *ppRes) {
226,089✔
1601
    QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, &pInfo->stbJoin));
226,089✔
1602
    pPrev->pListHead->readIdx++;
226,089✔
1603
  } else {
UNCOV
1604
    pInfo->stbJoin.execInfo.postBlkNum++;
×
1605
    pInfo->stbJoin.execInfo.postBlkRows += (*ppRes)->info.rows;
×
1606
  }
1607

1608
  return TSDB_CODE_SUCCESS;
226,089✔
1609
}
1610

1611
static FORCE_INLINE int32_t addToJoinVgroupHash(SSHashObj* pHash, void* pKey, int32_t keySize, void* pVal, int32_t valSize) {
1612
  SArray** ppArray = tSimpleHashGet(pHash, pKey, keySize);
7,804,168✔
1613
  if (NULL == ppArray) {
7,804,168✔
1614
    SArray* pArray = taosArrayInit(10, valSize);
260,735✔
1615
    if (NULL == pArray) {
260,735✔
UNCOV
1616
      return terrno;
×
1617
    }
1618
    if (NULL == taosArrayPush(pArray, pVal)) {
521,470✔
UNCOV
1619
      taosArrayDestroy(pArray);
×
1620
      return terrno;
×
1621
    }
1622
    if (tSimpleHashPut(pHash, pKey, keySize, &pArray, POINTER_BYTES)) {
260,735✔
UNCOV
1623
      taosArrayDestroy(pArray);      
×
1624
      return terrno;
×
1625
    }
1626
    return TSDB_CODE_SUCCESS;
260,735✔
1627
  }
1628

1629
  if (NULL == taosArrayPush(*ppArray, pVal)) {
15,086,866✔
UNCOV
1630
    return terrno;
×
1631
  }
1632
  
1633
  return TSDB_CODE_SUCCESS;
7,543,433✔
1634
}
1635

1636
static FORCE_INLINE int32_t addToJoinTableHash(SSHashObj* pHash, SSHashObj* pOnceHash, void* pKey, int32_t keySize) {
1637
  int32_t code = TSDB_CODE_SUCCESS;
2,364✔
1638
  uint32_t* pNum = tSimpleHashGet(pHash, pKey, keySize);
2,364✔
1639
  if (NULL == pNum) {
2,364✔
1640
    uint32_t n = 1;
2,364✔
1641
    code = tSimpleHashPut(pHash, pKey, keySize, &n, sizeof(n));
2,364✔
1642
    if (code) {
2,364✔
UNCOV
1643
      return code;
×
1644
    }
1645
    code = tSimpleHashPut(pOnceHash, pKey, keySize, NULL, 0);
2,364✔
1646
    if (code) {
2,364✔
UNCOV
1647
      return code;
×
1648
    }
1649
    return TSDB_CODE_SUCCESS;
2,364✔
1650
  }
1651

UNCOV
1652
  switch (*pNum) {
×
1653
    case 0:
×
1654
      break;
×
1655
    case UINT32_MAX:
×
1656
      *pNum = 0;
×
1657
      break;
×
1658
    default:
×
1659
      if (1 == (*pNum)) {
×
1660
        code = tSimpleHashRemove(pOnceHash, pKey, keySize);
×
1661
        if (code) {
×
1662
          qError("tSimpleHashRemove failed in addToJoinTableHash, error:%s", tstrerror(code));
×
1663
          QRY_ERR_RET(code);
×
1664
        }
1665
      }
UNCOV
1666
      (*pNum)++;
×
1667
      break;
×
1668
  }
1669
  
UNCOV
1670
  return TSDB_CODE_SUCCESS;
×
1671
}
1672

1673

1674
static void freeStbJoinTableList(SStbJoinTableList* pList) {
76,736✔
1675
  if (NULL == pList) {
76,736✔
UNCOV
1676
    return;
×
1677
  }
1678
  taosMemoryFree(pList->pLeftVg);
76,736✔
1679
  taosMemoryFree(pList->pLeftUid);
76,736✔
1680
  taosMemoryFree(pList->pRightVg);
76,736✔
1681
  taosMemoryFree(pList->pRightUid);
76,736✔
1682
  taosMemoryFree(pList);
76,736✔
1683
}
1684

1685
static int32_t appendStbJoinTableList(SStbJoinPrevJoinCtx* pCtx, int64_t rows, int32_t* pLeftVg, int64_t* pLeftUid, int32_t* pRightVg, int64_t* pRightUid) {
77,267✔
1686
  int32_t code = TSDB_CODE_SUCCESS;
77,267✔
1687
  SStbJoinTableList* pNew = taosMemoryCalloc(1, sizeof(SStbJoinTableList));
77,267✔
1688
  if (NULL == pNew) {
77,267✔
UNCOV
1689
    return terrno;
×
1690
  }
1691
  pNew->pLeftVg = taosMemoryMalloc(rows * sizeof(*pLeftVg));
77,267✔
1692
  if (NULL == pNew->pLeftVg) {
77,267✔
UNCOV
1693
    code = terrno;
×
1694
    freeStbJoinTableList(pNew);
×
1695
    return code;
×
1696
  }
1697
  pNew->pLeftUid = taosMemoryMalloc(rows * sizeof(*pLeftUid));
77,267✔
1698
  if (NULL == pNew->pLeftUid) {
77,267✔
UNCOV
1699
    code = terrno;
×
1700
    freeStbJoinTableList(pNew);
×
1701
    return code;
×
1702
  }
1703
  pNew->pRightVg = taosMemoryMalloc(rows * sizeof(*pRightVg));
77,267✔
1704
  if (NULL == pNew->pRightVg) {
77,267✔
UNCOV
1705
    code = terrno;
×
1706
    freeStbJoinTableList(pNew);
×
1707
    return code;
×
1708
  }
1709
  pNew->pRightUid = taosMemoryMalloc(rows * sizeof(*pRightUid));
77,267✔
1710
  if (NULL == pNew->pRightUid) {
77,267✔
UNCOV
1711
    code = terrno;
×
1712
    freeStbJoinTableList(pNew);
×
1713
    return code;
×
1714
  }
1715

1716
  TAOS_MEMCPY(pNew->pLeftVg, pLeftVg, rows * sizeof(*pLeftVg));
77,267✔
1717
  TAOS_MEMCPY(pNew->pLeftUid, pLeftUid, rows * sizeof(*pLeftUid));
77,267✔
1718
  TAOS_MEMCPY(pNew->pRightVg, pRightVg, rows * sizeof(*pRightVg));
77,267✔
1719
  TAOS_MEMCPY(pNew->pRightUid, pRightUid, rows * sizeof(*pRightUid));
77,267✔
1720

1721
  pNew->readIdx = 0;
77,267✔
1722
  pNew->uidNum = rows;
77,267✔
1723
  pNew->pNext = NULL;
77,267✔
1724
  
1725
  if (pCtx->pListTail) {
77,267✔
UNCOV
1726
    pCtx->pListTail->pNext = pNew;
×
1727
    pCtx->pListTail = pNew;
×
1728
  } else {
1729
    pCtx->pListHead = pNew;
77,267✔
1730
    pCtx->pListTail= pNew;
77,267✔
1731
  }
1732

1733
  return TSDB_CODE_SUCCESS;
77,267✔
1734
}
1735

1736
static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
77,267✔
1737
  int32_t                    code = TSDB_CODE_SUCCESS;
77,267✔
1738
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
77,267✔
1739
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
77,267✔
1740
  SColumnInfoData*           pVg0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[0]);
77,267✔
1741
  if (NULL == pVg0) {
77,267✔
UNCOV
1742
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1743
  }
1744
  SColumnInfoData*           pVg1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[1]);
77,267✔
1745
  if (NULL == pVg1) {
77,267✔
UNCOV
1746
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1747
  }
1748
  SColumnInfoData*           pUid0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[0]);
77,267✔
1749
  if (NULL == pUid0) {
77,267✔
UNCOV
1750
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1751
  }
1752
  SColumnInfoData*           pUid1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[1]);
77,267✔
1753
  if (NULL == pUid1) {
77,267✔
UNCOV
1754
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1755
  }
1756

1757
  if (pStbJoin->basic.batchFetch) {
77,267✔
1758
    for (int32_t i = 0; i < pBlock->info.rows; ++i) {
3,978,169✔
1759
      int32_t* leftVg = (int32_t*)(pVg0->pData + pVg0->info.bytes * i);
3,902,084✔
1760
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i);
3,902,084✔
1761
      int32_t* rightVg = (int32_t*)(pVg1->pData + pVg1->info.bytes * i);
3,902,084✔
1762
      int64_t* rightUid = (int64_t*)(pUid1->pData + pUid1->info.bytes * i);
3,902,084✔
1763

1764
      code = addToJoinVgroupHash(pStbJoin->ctx.prev.leftHash, leftVg, sizeof(*leftVg), leftUid, sizeof(*leftUid));
3,902,084✔
1765
      if (TSDB_CODE_SUCCESS != code) {
3,902,084✔
UNCOV
1766
        break;
×
1767
      }
1768
      code = addToJoinVgroupHash(pStbJoin->ctx.prev.rightHash, rightVg, sizeof(*rightVg), rightUid, sizeof(*rightUid));
3,902,084✔
1769
      if (TSDB_CODE_SUCCESS != code) {
3,902,084✔
UNCOV
1770
        break;
×
1771
      }
1772
    }
1773
  } else {
1774
    for (int32_t i = 0; i < pBlock->info.rows; ++i) {
3,546✔
1775
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i);
2,364✔
1776
    
1777
      code = addToJoinTableHash(pStbJoin->ctx.prev.leftCache, pStbJoin->ctx.prev.onceTable, leftUid, sizeof(*leftUid));
2,364✔
1778
      if (TSDB_CODE_SUCCESS != code) {
2,364✔
UNCOV
1779
        break;
×
1780
      }
1781
    }
1782
  }
1783

1784
  if (TSDB_CODE_SUCCESS == code) {
77,267✔
1785
    code = appendStbJoinTableList(&pStbJoin->ctx.prev, pBlock->info.rows, (int32_t*)pVg0->pData, (int64_t*)pUid0->pData, (int32_t*)pVg1->pData, (int64_t*)pUid1->pData);
77,267✔
1786
    if (TSDB_CODE_SUCCESS == code) {
77,267✔
1787
      pStbJoin->ctx.prev.tableNum += pBlock->info.rows;
77,267✔
1788
    }
1789
  }
1790

UNCOV
1791
_return:
×
1792

1793
  if (TSDB_CODE_SUCCESS != code) {
77,267✔
UNCOV
1794
    pOperator->pTaskInfo->code = code;
×
1795
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
×
1796
  }
1797
}
77,267✔
1798

1799

1800
static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) {
1,223,678✔
1801
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
1,223,678✔
1802
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
1,223,678✔
1803

1804
  if (pStbJoin->basic.batchFetch) {
1,223,678✔
1805
    return;
1,222,496✔
1806
  }
1807

1808
  if (tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache) == tSimpleHashGetSize(pStbJoin->ctx.prev.onceTable)) {
1,182✔
1809
    tSimpleHashClear(pStbJoin->ctx.prev.leftCache);
1,182✔
1810
    return;
1,182✔
1811
  }
1812

UNCOV
1813
  uint64_t* pUid = NULL;
×
1814
  int32_t iter = 0;
×
1815
  int32_t code = 0;
×
1816
  while (NULL != (pUid = tSimpleHashIterate(pStbJoin->ctx.prev.onceTable, pUid, &iter))) {
×
1817
    code = tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, pUid, sizeof(*pUid));
×
1818
    if (code) {
×
1819
      qError("tSimpleHashRemove failed in postProcessStbJoinTableHash, error:%s", tstrerror(code));
×
1820
    }
1821
  }
1822

UNCOV
1823
  pStbJoin->execInfo.leftCacheNum = tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache);
×
1824
  qDebug("more than 1 ref build table num %" PRId64, (int64_t)tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache));
×
1825

1826
/*
1827
  // debug only
1828
  iter = 0;
1829
  uint32_t* num = NULL;
1830
  while (NULL != (num = tSimpleHashIterate(pStbJoin->ctx.prev.leftCache, num, &iter))) {
1831
    A S S E R T(*num > 1);
1832
  }
1833
*/  
1834
}
1835

1836
static void buildStbJoinTableList(SOperatorInfo* pOperator) {
1,223,678✔
1837
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
1,223,678✔
1838
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
1,223,678✔
1839

1840
  while (true) {
77,267✔
1841
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
1,300,945✔
1842
    if (NULL == pBlock) {
1,300,945✔
1843
      break;
1,223,678✔
1844
    }
1845

1846
    pStbJoin->execInfo.prevBlkNum++;
77,267✔
1847
    pStbJoin->execInfo.prevBlkRows += pBlock->info.rows;
77,267✔
1848
    
1849
    doBuildStbJoinTableHash(pOperator, pBlock);
77,267✔
1850
  }
1851

1852
  postProcessStbJoinTableHash(pOperator);
1,223,678✔
1853

1854
  pStbJoin->ctx.prev.joinBuild = true;
1,223,678✔
1855
}
1,223,678✔
1856

1857
static int32_t seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
303,356✔
1858
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
303,356✔
1859
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
303,356✔
1860
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
303,356✔
1861
  SStbJoinTableList*         pNode = pPrev->pListHead;
303,356✔
1862

1863
  while (pNode) {
4,055,796✔
1864
    if (pNode->readIdx >= pNode->uidNum) {
3,979,060✔
1865
      pPrev->pListHead = pNode->pNext;
76,736✔
1866
      freeStbJoinTableList(pNode);
76,736✔
1867
      pNode = pPrev->pListHead;
76,736✔
1868
      continue;
76,736✔
1869
    }
1870
    
1871
    seqJoinLaunchNewRetrieveImpl(pOperator, ppRes);
3,902,324✔
1872
    if (*ppRes) {
3,902,324✔
1873
      return TSDB_CODE_SUCCESS;
226,620✔
1874
    }
1875

1876
    QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, pStbJoin));
3,675,704✔
1877
    pPrev->pListHead->readIdx++;
3,675,704✔
1878
  }
1879

1880
  *ppRes = NULL;
76,736✔
1881
  setOperatorCompleted(pOperator);
76,736✔
1882

1883
  return TSDB_CODE_SUCCESS;
76,736✔
1884
}
1885

1886
static int32_t seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) {
1,449,767✔
1887
  if (pBlock) {
1,449,767✔
1888
    if (pStbJoin && pStbJoin->pOutputDataBlockDesc) {
226,620✔
1889
      pBlock->info.id.blockId = pStbJoin->pOutputDataBlockDesc->dataBlockId;
226,620✔
1890
      if (!pBlock->pDataBlock) return TSDB_CODE_SUCCESS;
226,620✔
1891

1892
      for (int i = (int)pBlock->pDataBlock->size; i < pStbJoin->pOutputDataBlockDesc->pSlots->length; i++) {
228,984✔
1893
        SSlotDescNode* pSlot = (SSlotDescNode*)nodesListGetNode(pStbJoin->pOutputDataBlockDesc->pSlots, i);
2,364✔
1894
        if (pSlot == NULL) {
2,364✔
UNCOV
1895
          qError("seqStableJoinComposeRes: pSlot is NULL, i:%d", i);
×
1896
          return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1897
        }
1898
        SColumnInfoData colInfo = createColumnInfoData(pSlot->dataType.type, pSlot->dataType.bytes, pSlot->slotId);
2,364✔
1899
        int32_t code = colInfoDataEnsureCapacity(&colInfo, pBlock->info.rows, true);
2,364✔
1900
        if (code != TSDB_CODE_SUCCESS) {
2,364✔
UNCOV
1901
          return code;
×
1902
        }
1903
        code = blockDataAppendColInfo(pBlock, &colInfo);
2,364✔
1904
        if (code != TSDB_CODE_SUCCESS) {
2,364✔
UNCOV
1905
          return code;
×
1906
        }
1907
      }
1908
    } else {
UNCOV
1909
      qError("seqStableJoinComposeRes: pBlock or pStbJoin is NULL");
×
1910
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1911
    }
1912
  }
1913
  return TSDB_CODE_SUCCESS;
1,449,767✔
1914
}
1915

1916
int32_t seqStableJoin(SOperatorInfo* pOperator, SSDataBlock** pRes) {
1,477,487✔
1917
  int32_t                    code = TSDB_CODE_SUCCESS;
1,477,487✔
1918
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
1,477,487✔
1919
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
1,477,487✔
1920

1921
  QRY_PARAM_CHECK(pRes);
1,477,487✔
1922
  if (pOperator->status == OP_EXEC_DONE) {
1,477,487✔
1923
    return code;
27,720✔
1924
  }
1925

1926
  if (!pStbJoin->ctx.prev.joinBuild) {
1,449,767✔
1927
    buildStbJoinTableList(pOperator);
1,223,678✔
1928
    if (pStbJoin->execInfo.prevBlkRows <= 0) {
1,223,678✔
1929
      setOperatorCompleted(pOperator);
1,146,411✔
1930
      goto _return;
1,146,411✔
1931
    }
1932
  }
1933

1934
  QRY_ERR_JRET(seqJoinContinueCurrRetrieve(pOperator, pRes));
303,356✔
1935
  if (*pRes) {
303,356✔
UNCOV
1936
    goto _return;
×
1937
  }
1938

1939
  QRY_ERR_JRET(seqJoinLaunchNewRetrieve(pOperator, pRes));
303,356✔
1940

1941
_return:
303,356✔
1942
  if (code) {
1,449,767✔
UNCOV
1943
    qError("%s failed since %s", __func__, tstrerror(code));
×
1944
    pOperator->pTaskInfo->code = code;
×
1945
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
1946
  } else {
1947
    code = seqStableJoinComposeRes(pStbJoin, *pRes);
1,449,767✔
1948
  }
1949
  return code;
1,449,767✔
1950
}
1951

1952
int32_t dynProcessUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
3,397,776✔
1953
  int32_t                    lino = 0;
3,397,776✔
1954
  SOperatorInfo*             operator=(SOperatorInfo*) param;
3,397,776✔
1955
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)operator->info;
3,397,776✔
1956

1957
  if (TSDB_CODE_SUCCESS != code) {
3,397,776✔
UNCOV
1958
    operator->pTaskInfo->code = rpcCvtErrCode(code);
×
1959
    if (operator->pTaskInfo->code != code) {
×
1960
      qError("load systable rsp received, error:%s, cvted error:%s", tstrerror(code),
×
1961
             tstrerror(operator->pTaskInfo->code));
1962
    } else {
UNCOV
1963
      qError("load systable rsp received, error:%s", tstrerror(code));
×
1964
    }
UNCOV
1965
    goto _return;
×
1966
  }
1967

1968
  pScanResInfo->vtbScan.pRsp = taosMemoryMalloc(sizeof(SUseDbRsp));
3,397,776✔
1969
  QUERY_CHECK_NULL(pScanResInfo->vtbScan.pRsp, code, lino, _return, terrno)
3,397,776✔
1970

1971
  code = tDeserializeSUseDbRsp(pMsg->pData, (int32_t)pMsg->len, pScanResInfo->vtbScan.pRsp);
3,397,776✔
1972
  QUERY_CHECK_CODE(code, lino, _return);
3,397,776✔
1973

1974
  taosMemoryFreeClear(pMsg->pData);
3,397,776✔
1975

1976
  code = tsem_post(&pScanResInfo->vtbScan.ready);
3,397,776✔
1977
  QUERY_CHECK_CODE(code, lino, _return);
3,397,776✔
1978

1979
  return code;
3,397,776✔
UNCOV
1980
_return:
×
1981
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1982
  return code;
×
1983
}
1984

1985
static int32_t buildDbVgInfoMap(SOperatorInfo* pOperator, SMsgCb* pMsgCb, SName* name, SExecTaskInfo* pTaskInfo, SUseDbOutput* output) {
3,397,776✔
1986
  int32_t                    code = TSDB_CODE_SUCCESS;
3,397,776✔
1987
  int32_t                    lino = 0;
3,397,776✔
1988
  char*                      buf1 = NULL;
3,397,776✔
1989
  SUseDbReq*                 pReq = NULL;
3,397,776✔
1990
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)pOperator->info;
3,397,776✔
1991

1992
  pReq = taosMemoryMalloc(sizeof(SUseDbReq));
3,397,776✔
1993
  QUERY_CHECK_NULL(pReq, code, lino, _return, terrno)
3,397,776✔
1994
  code = tNameGetFullDbName(name, pReq->db);
3,397,776✔
1995
  QUERY_CHECK_CODE(code, lino, _return);
3,397,776✔
1996
  int32_t contLen = tSerializeSUseDbReq(NULL, 0, pReq);
3,397,776✔
1997
  buf1 = taosMemoryCalloc(1, contLen);
3,397,776✔
1998
  QUERY_CHECK_NULL(buf1, code, lino, _return, terrno)
3,397,776✔
1999
  int32_t tempRes = tSerializeSUseDbReq(buf1, contLen, pReq);
3,397,776✔
2000
  if (tempRes < 0) {
3,397,776✔
UNCOV
2001
    QUERY_CHECK_CODE(terrno, lino, _return);
×
2002
  }
2003

2004
  // send the fetch remote task result request
2005
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
3,397,776✔
2006
  QUERY_CHECK_NULL(pMsgSendInfo, code, lino, _return, terrno)
3,397,776✔
2007

2008
  pMsgSendInfo->param = pOperator;
3,397,776✔
2009
  pMsgSendInfo->msgInfo.pData = buf1;
3,397,776✔
2010
  pMsgSendInfo->msgInfo.len = contLen;
3,397,776✔
2011
  pMsgSendInfo->msgType = TDMT_MND_GET_DB_INFO;
3,397,776✔
2012
  pMsgSendInfo->fp = dynProcessUseDbRsp;
3,397,776✔
2013
  pMsgSendInfo->requestId = pTaskInfo->id.queryId;
3,397,776✔
2014

2015
  code = asyncSendMsgToServer(pMsgCb->clientRpc, &pScanResInfo->vtbScan.epSet, NULL, pMsgSendInfo);
3,397,776✔
2016
  QUERY_CHECK_CODE(code, lino, _return);
3,397,776✔
2017

2018
  code = tsem_wait(&pScanResInfo->vtbScan.ready);
3,397,776✔
2019
  QUERY_CHECK_CODE(code, lino, _return);
3,397,776✔
2020

2021
  code = queryBuildUseDbOutput(output, pScanResInfo->vtbScan.pRsp);
3,397,776✔
2022
  QUERY_CHECK_CODE(code, lino, _return);
3,397,776✔
2023

2024
_return:
3,397,776✔
2025
  if (code) {
3,397,776✔
UNCOV
2026
     qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2027
     taosMemoryFree(buf1);
×
2028
  }
2029
  taosMemoryFree(pReq);
3,397,776✔
2030
  tFreeSUsedbRsp(pScanResInfo->vtbScan.pRsp);
3,397,776✔
2031
  taosMemoryFreeClear(pScanResInfo->vtbScan.pRsp);
3,397,776✔
2032
  return code;
3,397,776✔
2033
}
2034

2035
int dynVgInfoComp(const void* lp, const void* rp) {
7,156,626✔
2036
  SVgroupInfo* pLeft = (SVgroupInfo*)lp;
7,156,626✔
2037
  SVgroupInfo* pRight = (SVgroupInfo*)rp;
7,156,626✔
2038
  if (pLeft->hashBegin < pRight->hashBegin) {
7,156,626✔
2039
    return -1;
7,148,644✔
2040
  } else if (pLeft->hashBegin > pRight->hashBegin) {
7,982✔
2041
    return 1;
7,982✔
2042
  }
2043

UNCOV
2044
  return 0;
×
2045
}
2046

2047
int32_t dynMakeVgArraySortBy(SDBVgInfo* dbInfo, __compar_fn_t sort_func) {
17,265,726✔
2048
  if (NULL == dbInfo) {
17,265,726✔
UNCOV
2049
    return TSDB_CODE_SUCCESS;
×
2050
  }
2051

2052
  if (dbInfo->vgHash && NULL == dbInfo->vgArray) {
17,265,726✔
2053
    int32_t vgSize = taosHashGetSize(dbInfo->vgHash);
3,397,776✔
2054
    dbInfo->vgArray = taosArrayInit(vgSize, sizeof(SVgroupInfo));
3,397,776✔
2055
    if (NULL == dbInfo->vgArray) {
3,397,776✔
UNCOV
2056
      return terrno;
×
2057
    }
2058

2059
    void* pIter = taosHashIterate(dbInfo->vgHash, NULL);
3,397,776✔
2060
    while (pIter) {
10,335,026✔
2061
      if (NULL == taosArrayPush(dbInfo->vgArray, pIter)) {
13,874,500✔
UNCOV
2062
        taosHashCancelIterate(dbInfo->vgHash, pIter);
×
2063
        return terrno;
×
2064
      }
2065

2066
      pIter = taosHashIterate(dbInfo->vgHash, pIter);
6,937,250✔
2067
    }
2068

2069
    taosArraySort(dbInfo->vgArray, sort_func);
3,397,776✔
2070
  }
2071

2072
  return TSDB_CODE_SUCCESS;
17,265,726✔
2073
}
2074

2075
int32_t dynHashValueComp(void const* lp, void const* rp) {
26,419,701✔
2076
  uint32_t*    key = (uint32_t*)lp;
26,419,701✔
2077
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
26,419,701✔
2078

2079
  if (*key < pVg->hashBegin) {
26,419,701✔
2080
    return -1;
338,604✔
2081
  } else if (*key > pVg->hashEnd) {
26,081,097✔
2082
    return 1;
8,815,371✔
2083
  }
2084

2085
  return 0;
17,265,726✔
2086
}
2087

2088
int32_t getVgId(SDBVgInfo* dbInfo, char* dbFName, int32_t* vgId, char *tbName) {
17,265,726✔
2089
  int32_t code = 0;
17,265,726✔
2090
  int32_t lino = 0;
17,265,726✔
2091
  code = dynMakeVgArraySortBy(dbInfo, dynVgInfoComp);
17,265,726✔
2092
  QUERY_CHECK_CODE(code, lino, _return);
17,265,726✔
2093

2094
  int32_t vgNum = (int32_t)taosArrayGetSize(dbInfo->vgArray);
17,265,726✔
2095
  if (vgNum <= 0) {
17,265,726✔
UNCOV
2096
    qError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum);
×
2097
    QUERY_CHECK_CODE(code = TSDB_CODE_TSC_DB_NOT_SELECTED, lino, _return);
×
2098
  }
2099

2100
  SVgroupInfo* vgInfo = NULL;
17,265,726✔
2101
  char         tbFullName[TSDB_TABLE_FNAME_LEN];
17,265,726✔
2102
  (void)snprintf(tbFullName, sizeof(tbFullName), "%s.", dbFName);
17,265,726✔
2103
  int32_t offset = (int32_t)strlen(tbFullName);
17,265,726✔
2104

2105
  (void)snprintf(tbFullName + offset, sizeof(tbFullName) - offset, "%s", tbName);
17,265,726✔
2106
  uint32_t hashValue = taosGetTbHashVal(tbFullName, (int32_t)strlen(tbFullName), dbInfo->hashMethod,
34,531,452✔
2107
                                        dbInfo->hashPrefix, dbInfo->hashSuffix);
17,265,726✔
2108

2109
  vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, dynHashValueComp, TD_EQ);
17,265,726✔
2110
  if (NULL == vgInfo) {
17,265,726✔
UNCOV
2111
    qError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName,
×
2112
           (int32_t)taosArrayGetSize(dbInfo->vgArray));
UNCOV
2113
    return TSDB_CODE_CTG_INTERNAL_ERROR;
×
2114
  }
2115

2116
  *vgId = vgInfo->vgId;
17,265,726✔
2117

2118
_return:
17,265,726✔
2119
  return code;
17,265,726✔
2120
}
2121

2122
int32_t getDbVgInfo(SOperatorInfo* pOperator, SName *name, SDBVgInfo **dbVgInfo) {
29,749,261✔
2123
  int32_t                    code = TSDB_CODE_SUCCESS;
29,749,261✔
2124
  int32_t                    line = 0;
29,749,261✔
2125
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
29,749,261✔
2126
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
29,749,261✔
2127
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
29,749,261✔
2128
  SMsgCb*                    pMsgCb = pVtbScan->pMsgCb;
29,749,261✔
2129
  SUseDbOutput*              output = NULL;
29,749,261✔
2130
  SUseDbOutput**             find = (SUseDbOutput**)taosHashGet(pInfo->vtbScan.dbVgInfoMap, name->dbname, strlen(name->dbname));
29,749,261✔
2131

2132
  QRY_PARAM_CHECK(dbVgInfo);
29,749,261✔
2133

2134
  if (find == NULL) {
29,749,261✔
2135
    output = taosMemoryMalloc(sizeof(SUseDbOutput));
3,397,776✔
2136
    code = buildDbVgInfoMap(pOperator, pMsgCb, name, pTaskInfo, output);
3,397,776✔
2137
    QUERY_CHECK_CODE(code, line, _return);
3,397,776✔
2138
    code = taosHashPut(pInfo->vtbScan.dbVgInfoMap, name->dbname, strlen(name->dbname), &output, POINTER_BYTES);
3,397,776✔
2139
    QUERY_CHECK_CODE(code, line, _return);
3,397,776✔
2140
  } else {
2141
    output = *find;
26,351,485✔
2142
  }
2143

2144
  *dbVgInfo = output->dbVgroup;
29,749,261✔
2145
  return code;
29,749,261✔
UNCOV
2146
_return:
×
2147
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
2148
  freeUseDbOutput(output);
×
2149
  return code;
×
2150
}
2151

2152
int32_t extractColRefName(const char *colref, char **refDb, char** refTb, char** refCol) {
29,749,261✔
2153
  int32_t     code = TSDB_CODE_SUCCESS;
29,749,261✔
2154
  int32_t     line = 0;
29,749,261✔
2155

2156
  const char *first_dot = strchr(colref, '.');
29,749,261✔
2157
  QUERY_CHECK_NULL(first_dot, code, line, _return, terrno)
29,749,261✔
2158

2159
  const char *second_dot = strchr(first_dot + 1, '.');
29,749,261✔
2160
  QUERY_CHECK_NULL(second_dot, code, line, _return, terrno)
29,749,261✔
2161

2162
  size_t db_len = first_dot - colref;
29,749,261✔
2163
  size_t table_len = second_dot - first_dot - 1;
29,749,261✔
2164
  size_t col_len = strlen(second_dot + 1);
29,749,261✔
2165

2166
  *refDb = taosMemoryMalloc(db_len + 1);
29,749,261✔
2167
  *refTb = taosMemoryMalloc(table_len + 1);
29,749,261✔
2168
  *refCol = taosMemoryMalloc(col_len + 1);
29,749,261✔
2169
  QUERY_CHECK_NULL(*refDb, code, line, _return, terrno)
29,749,261✔
2170
  QUERY_CHECK_NULL(*refTb, code, line, _return, terrno)
29,749,261✔
2171
  QUERY_CHECK_NULL(*refCol, code, line, _return, terrno)
29,749,261✔
2172

2173
  tstrncpy(*refDb, colref, db_len + 1);
29,749,261✔
2174
  tstrncpy(*refTb, first_dot + 1, table_len + 1);
29,749,261✔
2175
  tstrncpy(*refCol, second_dot + 1, col_len + 1);
29,749,261✔
2176

2177
  return TSDB_CODE_SUCCESS;
29,749,261✔
UNCOV
2178
_return:
×
2179
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
2180
  if (*refDb) {
×
2181
    taosMemoryFree(*refDb);
×
2182
    *refDb = NULL;
×
2183
  }
UNCOV
2184
  if (*refTb) {
×
2185
    taosMemoryFree(*refTb);
×
2186
    *refTb = NULL;
×
2187
  }
UNCOV
2188
  if (*refCol) {
×
2189
    taosMemoryFree(*refCol);
×
2190
    *refCol = NULL;
×
2191
  }
UNCOV
2192
  return code;
×
2193
}
2194

2195
bool tableInfoNeedCollect(char *dbName, char *tbName, char *expectDbName, char *expectTbName) {
280,585,767✔
2196
  if (strncmp(varDataVal(tbName), expectTbName, varDataLen(tbName)) == 0 &&
280,585,767✔
2197
      strlen(expectTbName) == varDataLen(tbName) &&
171,925,935✔
2198
      strncmp(varDataVal(dbName), expectDbName, varDataLen(dbName)) == 0 &&
171,925,935✔
2199
      strlen(expectDbName) == varDataLen(dbName)) {
171,925,935✔
2200
    return true;
171,925,935✔
2201
  }
2202
  return false;
108,661,097✔
2203
}
2204

2205
int32_t getColRefInfo(SColRefInfo *pInfo, SArray* pDataBlock, int32_t index) {
171,925,935✔
2206
  int32_t          code = TSDB_CODE_SUCCESS;
171,925,935✔
2207
  int32_t          line = 0;
171,925,935✔
2208

2209
  SColumnInfoData *pColNameCol = taosArrayGet(pDataBlock, 3);
171,925,935✔
2210
  SColumnInfoData *pUidCol = taosArrayGet(pDataBlock, 4);
171,925,935✔
2211
  SColumnInfoData *pColIdCol = taosArrayGet(pDataBlock, 5);
171,925,935✔
2212
  SColumnInfoData *pRefCol = taosArrayGet(pDataBlock, 6);
171,925,935✔
2213
  SColumnInfoData *pVgIdCol = taosArrayGet(pDataBlock, 7);
171,925,935✔
2214
  SColumnInfoData *pRefVerCol = taosArrayGet(pDataBlock, 8);
171,925,935✔
2215

2216
  QUERY_CHECK_NULL(pColNameCol, code, line, _return, terrno)
171,925,935✔
2217
  QUERY_CHECK_NULL(pUidCol, code, line, _return, terrno)
171,925,935✔
2218
  QUERY_CHECK_NULL(pColIdCol, code, line, _return, terrno)
171,925,935✔
2219
  QUERY_CHECK_NULL(pRefCol, code, line, _return, terrno)
171,925,935✔
2220
  QUERY_CHECK_NULL(pVgIdCol, code, line, _return, terrno)
171,925,935✔
2221
  QUERY_CHECK_NULL(pRefVerCol, code, line, _return, terrno)
171,925,935✔
2222

2223
  if (colDataIsNull_s(pRefCol, index)) {
343,851,870✔
2224
    pInfo->colrefName = NULL;
55,706,164✔
2225
  } else {
2226
    pInfo->colrefName = taosMemoryCalloc(varDataTLen(colDataGetData(pRefCol, index)), 1);
116,219,771✔
2227
    QUERY_CHECK_NULL(pInfo->colrefName, code, line, _return, terrno)
116,219,771✔
2228
    memcpy(pInfo->colrefName, varDataVal(colDataGetData(pRefCol, index)), varDataLen(colDataGetData(pRefCol, index)));
116,219,771✔
2229
    pInfo->colrefName[varDataLen(colDataGetData(pRefCol, index))] = 0;
116,219,771✔
2230
  }
2231

2232
  pInfo->colName = taosMemoryCalloc(varDataTLen(colDataGetData(pColNameCol, index)), 1);
171,925,935✔
2233
  QUERY_CHECK_NULL(pInfo->colName, code, line, _return, terrno)
171,925,935✔
2234
  memcpy(pInfo->colName, varDataVal(colDataGetData(pColNameCol, index)), varDataLen(colDataGetData(pColNameCol, index)));
171,925,935✔
2235
  pInfo->colName[varDataLen(colDataGetData(pColNameCol, index))] = 0;
171,925,935✔
2236

2237
  if (!colDataIsNull_s(pUidCol, index)) {
343,851,870✔
2238
    GET_TYPED_DATA(pInfo->uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, index), 0);
171,925,935✔
2239
  }
2240
  if (!colDataIsNull_s(pColIdCol, index)) {
343,851,870✔
2241
    GET_TYPED_DATA(pInfo->colId, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pColIdCol, index), 0);
116,219,771✔
2242
  }
2243
  if (!colDataIsNull_s(pVgIdCol, index)) {
343,851,870✔
2244
    GET_TYPED_DATA(pInfo->vgId, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pVgIdCol, index), 0);
171,925,935✔
2245
  }
2246

UNCOV
2247
_return:
×
2248
  return code;
171,925,935✔
2249
}
2250

2251
int32_t processOrgTbVg(SVtbScanDynCtrlInfo* pVtbScan, SExecTaskInfo* pTaskInfo, int32_t rversion) {
1,663,395✔
2252
  int32_t                    code = TSDB_CODE_SUCCESS;
1,663,395✔
2253
  int32_t                    line = 0;
1,663,395✔
2254

2255
  if (pTaskInfo->pStreamRuntimeInfo == NULL) {
1,663,395✔
2256
    return code;
1,497,937✔
2257
  }
2258

2259
  if (pVtbScan->existOrgTbVg == NULL) {
165,458✔
UNCOV
2260
    pVtbScan->existOrgTbVg = pVtbScan->curOrgTbVg;
×
2261
    pVtbScan->curOrgTbVg = NULL;
×
2262
  }
2263

2264
  if (pVtbScan->curOrgTbVg != NULL) {
165,458✔
2265
    // which means rversion has changed
2266
    void*   pCurIter = NULL;
15,440✔
2267
    SArray* tmpArray = NULL;
15,440✔
2268
    while ((pCurIter = taosHashIterate(pVtbScan->curOrgTbVg, pCurIter))) {
44,874✔
2269
      int32_t* vgId = (int32_t*)taosHashGetKey(pCurIter, NULL);
29,434✔
2270
      if (taosHashGet(pVtbScan->existOrgTbVg, vgId, sizeof(int32_t)) == NULL) {
29,434✔
2271
        if (tmpArray == NULL) {
2,574✔
2272
          tmpArray = taosArrayInit(1, sizeof(int32_t));
2,574✔
2273
          QUERY_CHECK_NULL(tmpArray, code, line, _return, terrno)
2,574✔
2274
        }
2275
        QUERY_CHECK_NULL(taosArrayPush(tmpArray, vgId), code, line, _return, terrno)
2,574✔
2276
      }
2277
    }
2278
    if (tmpArray == NULL) {
15,440✔
2279
      return TSDB_CODE_SUCCESS;
12,866✔
2280
    }
2281
    if (tmpArray != NULL && pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addVgIds == NULL) {
2,574✔
2282
      SArray* expiredInfo = atomic_load_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo);
2,574✔
2283
      if (expiredInfo && expiredInfo == atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo, expiredInfo, NULL)) {
2,574✔
UNCOV
2284
        for (int32_t i = 0; i < taosArrayGetSize(expiredInfo); i++) {
×
2285
          SStreamTaskAddr* vgInfo = (SStreamTaskAddr*)taosArrayGet(expiredInfo, i);
×
2286
          QUERY_CHECK_NULL(taosArrayPush(tmpArray, &vgInfo->nodeId), code, line, _return, terrno)
×
2287
        }
UNCOV
2288
        taosArrayDestroy(expiredInfo);
×
2289
      }
2290
      if (atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addVgIds, NULL, tmpArray)) {
2,574✔
UNCOV
2291
        taosArrayDestroy(tmpArray);
×
2292
      }
2293
    }
2294
    atomic_store_64(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.uid, (int64_t)(pVtbScan->isSuperTable ? pVtbScan->suid : pVtbScan->uid));
2,574✔
2295
    (void)atomic_val_compare_exchange_8(pTaskInfo->pStreamRuntimeInfo->vtableDeployGot, 0, 1);
2,574✔
2296
    taosHashClear(pVtbScan->curOrgTbVg);
2,574✔
2297
    pVtbScan->needRedeploy = true;
2,574✔
2298
    pVtbScan->rversion = rversion;
2,574✔
2299
    return TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY;
2,574✔
2300
  }
2301
  return code;
150,018✔
UNCOV
2302
_return:
×
2303
  qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
2304
  return code;
×
2305
}
2306

2307
int32_t getVgIdFromColref(SOperatorInfo* pOperator, const char* colRef, int32_t* vgId) {
46,364✔
2308
  int32_t                    code =TSDB_CODE_SUCCESS;
46,364✔
2309
  int32_t                    line = 0;
46,364✔
2310
  char*                      refDbName = NULL;
46,364✔
2311
  char*                      refTbName = NULL;
46,364✔
2312
  char*                      refColName = NULL;
46,364✔
2313
  SDBVgInfo*                 dbVgInfo = NULL;
46,364✔
2314
  SName                      name = {0};
46,364✔
2315
  char                       dbFname[TSDB_DB_FNAME_LEN] = {0};
46,364✔
2316
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
46,364✔
2317

2318
  code = extractColRefName(colRef, &refDbName, &refTbName, &refColName);
46,364✔
2319
  QUERY_CHECK_CODE(code, line, _return);
46,364✔
2320

2321
  toName(pInfo->vtbScan.acctId, refDbName, refTbName, &name);
46,364✔
2322

2323
  code = getDbVgInfo(pOperator, &name, &dbVgInfo);
46,364✔
2324
  QUERY_CHECK_CODE(code, line, _return);
46,364✔
2325

2326
  code = tNameGetFullDbName(&name, dbFname);
46,364✔
2327
  QUERY_CHECK_CODE(code, line, _return);
46,364✔
2328

2329
  code = getVgId(dbVgInfo, dbFname, vgId, name.tname);
46,364✔
2330
  QUERY_CHECK_CODE(code, line, _return);
46,364✔
2331

2332
_return:
46,364✔
2333
  if (code) {
46,364✔
UNCOV
2334
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
2335
  }
2336
  taosMemoryFree(refDbName);
46,364✔
2337
  taosMemoryFree(refTbName);
46,364✔
2338
  taosMemoryFree(refColName);
46,364✔
2339
  return code;
46,364✔
2340
}
2341

2342
static int32_t generateTagArrayByTagBlockAndSave(SHashObj* vtbUidTagListMap, tb_uid_t uid, SSDataBlock *pTagVal, int32_t rowIdx) {
695,060✔
2343
  int32_t code = TSDB_CODE_SUCCESS;
695,060✔
2344
  int32_t line = 0;
695,060✔
2345
  STagVal tagVal = {0};
695,060✔
2346
  // last col is uid
2347

2348
  SArray* pTagList = taosArrayInit(1, sizeof(STagVal));
695,060✔
2349
  QUERY_CHECK_NULL(pTagList, code, line, _return, terrno)
695,060✔
2350

2351
  for (int32_t k = 0; k < taosArrayGetSize(pTagVal->pDataBlock) - 1; k++) {
4,677,748✔
2352
    SColumnInfoData *pTagCol = taosArrayGet(pTagVal->pDataBlock, k);
3,982,688✔
2353
    QUERY_CHECK_NULL(pTagCol, code, line, _return, terrno)
3,982,688✔
2354
    tagVal.type = pTagCol->info.type;
3,982,688✔
2355
    tagVal.cid = pTagCol->info.colId;
3,982,688✔
2356
    if (!colDataIsNull_s(pTagCol, rowIdx)) {
7,965,376✔
2357
      char*   pData = colDataGetData(pTagCol, rowIdx);
3,982,688✔
2358
      if (IS_VAR_DATA_TYPE(pTagCol->info.type)) {
3,982,688✔
2359
        tagVal.nData = varDataLen(pData);
1,756,404✔
2360
        tagVal.pData = taosMemoryMalloc(tagVal.nData);
1,756,404✔
2361
        QUERY_CHECK_NULL(tagVal.pData, code, line, _return, terrno)
1,756,404✔
2362
        memcpy(tagVal.pData, varDataVal(pData), varDataLen(pData));
1,756,404✔
2363
        QUERY_CHECK_NULL(taosArrayPush(pTagList, &tagVal), code, line, _return, terrno)
3,512,808✔
2364
      } else {
2365
        memcpy(&tagVal.i64, pData, tDataTypes[pTagCol->info.type].bytes);
2,226,284✔
2366
        QUERY_CHECK_NULL(taosArrayPush(pTagList, &tagVal), code, line, _return, terrno)
4,452,568✔
2367
      }
2368
    } else {
UNCOV
2369
      tagVal.pData = NULL;
×
2370
      tagVal.nData = 0;
×
2371
      QUERY_CHECK_NULL(taosArrayPush(pTagList, &tagVal), code, line, _return, terrno)
×
2372
    }
2373
    tagVal = (STagVal){0};
3,982,688✔
2374
  }
2375
  code = taosHashPut(vtbUidTagListMap, &uid, sizeof(uid), &pTagList, POINTER_BYTES);
695,060✔
2376
  QUERY_CHECK_CODE(code, line, _return);
695,060✔
2377

2378
  return code;
695,060✔
UNCOV
2379
_return:
×
2380
  if (tagVal.pData) {
×
2381
    taosMemoryFreeClear(tagVal.pData);
×
2382
  }
UNCOV
2383
  if (pTagList) {
×
2384
    taosArrayDestroyEx(pTagList, destroyTagVal);
×
2385
  }
UNCOV
2386
  qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
2387
  return code;
×
2388
}
2389

2390
int32_t virtualTableScanProcessColRefInfo(SOperatorInfo* pOperator, SArray* pColRefInfo, tb_uid_t* uid, int32_t* vgId,
8,778,164✔
2391
                                          SHashObj** ppRefMap) {
2392
  int32_t                    code = TSDB_CODE_SUCCESS;
8,778,164✔
2393
  int32_t                    line = 0;
8,778,164✔
2394
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
8,778,164✔
2395
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
8,778,164✔
2396
  SDBVgInfo*                 dbVgInfo = NULL;
8,778,164✔
2397
  SHashObj*                  refMap = NULL;
8,778,164✔
2398

2399
  for (int32_t j = 0; j < taosArrayGetSize(pColRefInfo); j++) {
180,661,379✔
2400
    SColRefInfo *pKV = (SColRefInfo*)taosArrayGet(pColRefInfo, j);
171,883,215✔
2401
    *uid = pKV->uid;
171,883,215✔
2402
    *vgId = pKV->vgId;
171,883,215✔
2403
    if (pKV->colrefName != NULL && colNeedScan(pOperator, pKV->colId)) {
171,883,215✔
2404
      char*   refDbName = NULL;
29,702,897✔
2405
      char*   refTbName = NULL;
29,702,897✔
2406
      char*   refColName = NULL;
29,702,897✔
2407
      SName   name = {0};
29,702,897✔
2408
      char    dbFname[TSDB_DB_FNAME_LEN] = {0};
29,702,897✔
2409
      char    orgTbFName[TSDB_TABLE_FNAME_LEN] = {0};
29,702,897✔
2410

2411
      if (ppRefMap != NULL) {
29,702,897✔
2412
        // Track colref -> colId mapping for later slot grouping.
2413
        if (refMap == NULL) {
19,290,705✔
2414
          refMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
3,660,342✔
2415
          QUERY_CHECK_NULL(refMap, code, line, _return, terrno)
3,660,342✔
2416
        }
2417
        code = addRefColIdToRefMap(refMap, pKV->colrefName, pKV->colId);
19,290,705✔
2418
        QUERY_CHECK_CODE(code, line, _return);
19,290,705✔
2419
      }
2420

2421
      // Parse db/tb/col ref and resolve source table vgId.
2422
      code = extractColRefName(pKV->colrefName, &refDbName, &refTbName, &refColName);
29,702,897✔
2423
      QUERY_CHECK_CODE(code, line, _return);
29,702,897✔
2424

2425
      toName(pInfo->vtbScan.acctId, refDbName, refTbName, &name);
29,702,897✔
2426

2427
      code = getDbVgInfo(pOperator, &name, &dbVgInfo);
29,702,897✔
2428
      QUERY_CHECK_CODE(code, line, _return);
29,702,897✔
2429
      code = tNameGetFullDbName(&name, dbFname);
29,702,897✔
2430
      QUERY_CHECK_CODE(code, line, _return);
29,702,897✔
2431
      code = tNameGetFullTableName(&name, orgTbFName);
29,702,897✔
2432
      QUERY_CHECK_CODE(code, line, _return);
29,702,897✔
2433

2434
      void *pVal = taosHashGet(pVtbScan->otbNameToOtbInfoMap, orgTbFName, sizeof(orgTbFName));
29,702,897✔
2435
      if (!pVal) {
29,702,897✔
2436
        SOrgTbInfo orgTbInfo = {0};
17,219,362✔
2437
        code = getVgId(dbVgInfo, dbFname, &orgTbInfo.vgId, name.tname);
17,219,362✔
2438
        QUERY_CHECK_CODE(code, line, _return);
17,219,362✔
2439
        tstrncpy(orgTbInfo.tbName, orgTbFName, sizeof(orgTbInfo.tbName));
17,219,362✔
2440
        orgTbInfo.colMap = taosArrayInit(10, sizeof(SColIdNameKV));
17,219,362✔
2441
        QUERY_CHECK_NULL(orgTbInfo.colMap, code, line, _return, terrno)
17,219,362✔
2442
        SColIdNameKV colIdNameKV = {0};
17,219,362✔
2443
        colIdNameKV.colId = pKV->colId;
17,219,362✔
2444
        tstrncpy(colIdNameKV.colName, refColName, sizeof(colIdNameKV.colName));
17,219,362✔
2445
        QUERY_CHECK_NULL(taosArrayPush(orgTbInfo.colMap, &colIdNameKV), code, line, _return, terrno)
34,438,724✔
2446
        code = taosHashPut(pVtbScan->otbNameToOtbInfoMap, orgTbFName, sizeof(orgTbFName), &orgTbInfo, sizeof(orgTbInfo));
17,219,362✔
2447
        QUERY_CHECK_CODE(code, line, _return);
17,219,362✔
2448
      } else {
2449
        SOrgTbInfo *tbInfo = (SOrgTbInfo *)pVal;
12,483,535✔
2450
        SColIdNameKV colIdNameKV = {0};
12,483,535✔
2451
        colIdNameKV.colId = pKV->colId;
12,483,535✔
2452
        tstrncpy(colIdNameKV.colName, refColName, sizeof(colIdNameKV.colName));
12,483,535✔
2453
        QUERY_CHECK_NULL(taosArrayPush(tbInfo->colMap, &colIdNameKV), code, line, _return, terrno)
24,967,070✔
2454
      }
2455
      taosMemoryFree(refDbName);
29,702,897✔
2456
      taosMemoryFree(refTbName);
29,702,897✔
2457
      taosMemoryFree(refColName);
29,702,897✔
2458
    }
2459
  }
2460

2461
  if (ppRefMap != NULL) {
8,778,164✔
2462
    *ppRefMap = refMap;
5,100,112✔
2463
  }
2464

2465
  return code;
8,778,164✔
2466

UNCOV
2467
_return:
×
2468
  qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
2469
  if (refMap) {
×
2470
    taosHashCleanup(refMap);
×
2471
  }
UNCOV
2472
  return code;
×
2473
}
2474

2475
static int32_t getTagBlockAndProcess(SOperatorInfo* pOperator, bool hasPartition) {
79,940✔
2476
  int32_t                    code = TSDB_CODE_SUCCESS;
79,940✔
2477
  int32_t                    line = 0;
79,940✔
2478
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
79,940✔
2479
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
79,940✔
2480
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
79,940✔
2481
  SArray*                    pColRefArray = NULL;
79,940✔
2482
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[0];
79,940✔
2483
  SOperatorInfo*             pTagScanOp = pOperator->pDownstream[1];
79,940✔
2484

2485
  pVtbScan->vtbUidTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
79,940✔
2486
  QUERY_CHECK_NULL(pVtbScan->vtbUidTagListMap, code, line, _return, terrno)
79,940✔
2487
  taosHashSetFreeFp(pVtbScan->vtbUidTagListMap, destroyTagList);
79,940✔
2488
  if (hasPartition) {
79,940✔
2489
    pVtbScan->vtbUidToGroupIdMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
60,048✔
2490
    pVtbScan->vtbGroupIdTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
60,048✔
2491
    QUERY_CHECK_NULL(pVtbScan->vtbUidToGroupIdMap, code, line, _return, terrno)
60,048✔
2492
    QUERY_CHECK_NULL(pVtbScan->vtbGroupIdTagListMap, code, line, _return, terrno)
60,048✔
2493
    taosHashSetFreeFp(pVtbScan->vtbGroupIdTagListMap, destroyVtbUidTagListMap);
60,048✔
2494
  }
2495

2496
  while (true) {
294,988✔
2497
    SSDataBlock *pTagVal = NULL;
374,928✔
2498
    code = pTagScanOp->fpSet.getNextFn(pTagScanOp, &pTagVal);
374,928✔
2499
    QUERY_CHECK_CODE(code, line, _return);
374,928✔
2500
    if (pTagVal == NULL) {
374,928✔
2501
      break;
79,940✔
2502
    }
2503
    SHashObj *vtbUidTagListMap = NULL;
294,988✔
2504
    if (hasPartition) {
294,988✔
2505
      void* pIter = taosHashGet(pVtbScan->vtbGroupIdTagListMap, &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId));
255,204✔
2506
      if (pIter) {
255,204✔
UNCOV
2507
        vtbUidTagListMap = *(SHashObj**)pIter;
×
2508
      } else {
2509
        vtbUidTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
255,204✔
2510
        QUERY_CHECK_NULL(vtbUidTagListMap, code, line, _return, terrno)
255,204✔
2511
        taosHashSetFreeFp(vtbUidTagListMap, destroyTagList);
255,204✔
2512

2513
        code = taosHashPut(pVtbScan->vtbGroupIdTagListMap, &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId), &vtbUidTagListMap, POINTER_BYTES);
255,204✔
2514
        QUERY_CHECK_CODE(code, line, _return);
255,204✔
2515
      }
2516
    } else {
2517
      vtbUidTagListMap = pVtbScan->vtbUidTagListMap;
39,784✔
2518
    }
2519

2520
    SColumnInfoData *pUidCol = taosArrayGetLast(pTagVal->pDataBlock);
294,988✔
2521
    QUERY_CHECK_NULL(pUidCol, code, line, _return, terrno)
294,988✔
2522
    for (int32_t i = 0; i < pTagVal->info.rows; i++) {
990,048✔
2523
      tb_uid_t uid = 0;
695,060✔
2524
      if (!colDataIsNull_s(pUidCol, i)) {
1,390,120✔
2525
        GET_TYPED_DATA(uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, i), 0);
695,060✔
2526
        QUERY_CHECK_CODE(code, line, _return);
695,060✔
2527
      }
2528

2529
      code = generateTagArrayByTagBlockAndSave(vtbUidTagListMap, uid, pTagVal, i);
695,060✔
2530
      QUERY_CHECK_CODE(code, line, _return);
695,060✔
2531

2532
      if (hasPartition) {
695,060✔
2533
        code = taosHashPut(pVtbScan->vtbUidToGroupIdMap, &uid, sizeof(uid), &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId));
540,432✔
2534
        QUERY_CHECK_CODE(code, line, _return);
540,432✔
2535
      }
2536
    }
2537
  }
2538

2539
  return code;
79,940✔
2540

UNCOV
2541
_return:
×
2542
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
2543
  return code;
×
2544
}
2545

2546
static int32_t processChildTableListAndGenerateOrgTbInfoMap(SOperatorInfo* pOperator) {
79,940✔
2547
  int32_t                    code = TSDB_CODE_SUCCESS;
79,940✔
2548
  int32_t                    line = 0;
79,940✔
2549
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
79,940✔
2550
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
79,940✔
2551
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
79,940✔
2552
  SArray*                    pColRefArray = NULL;
79,940✔
2553
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[0];
79,940✔
2554
  SOperatorInfo*             pTagScanOp = pOperator->pDownstream[1];
79,940✔
2555

2556
  pVtbScan->vtbUidToVgIdMapMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
79,940✔
2557
  QUERY_CHECK_NULL(pVtbScan->vtbUidToVgIdMapMap, code, line, _return, terrno)
79,940✔
2558

2559
  for (int32_t i = 0; i < taosArrayGetSize(pVtbScan->childTableList); i++) {
775,000✔
2560
    SHashObj* otbVgIdToOtbInfoArrayMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
695,060✔
2561
    QUERY_CHECK_NULL(otbVgIdToOtbInfoArrayMap, code, line, _return, terrno)
695,060✔
2562

2563
    SArray* pColRefInfo = (SArray*)taosArrayGetP(pVtbScan->childTableList, i);
695,060✔
2564
    QUERY_CHECK_NULL(pColRefInfo, code, line, _return, terrno)
695,060✔
2565

2566
    tb_uid_t uid = 0;
695,060✔
2567
    int32_t  vgId = 0;
695,060✔
2568
    code = virtualTableScanProcessColRefInfo(pOperator, pColRefInfo, &uid, &vgId, NULL);
695,060✔
2569
    QUERY_CHECK_CODE(code, line, _return);
695,060✔
2570

2571
    size_t len = 0;
695,060✔
2572
    void*  pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, NULL);
695,060✔
2573
    while (pOrgTbInfo != NULL) {
1,708,422✔
2574
      char*       key = taosHashGetKey(pOrgTbInfo, &len);
1,013,362✔
2575
      SOrgTbInfo* orgTbInfo = (SOrgTbInfo*)pOrgTbInfo;
1,013,362✔
2576

2577
      void* pIter = taosHashGet(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId));
1,013,362✔
2578
      if (!pIter) {
1,013,362✔
2579
        SArray* pOrgTbInfoArray = taosArrayInit(1, sizeof(SOrgTbInfo));
839,748✔
2580
        QUERY_CHECK_NULL(pOrgTbInfoArray, code, line, _return, terrno)
839,748✔
2581
        QUERY_CHECK_NULL(taosArrayPush(pOrgTbInfoArray, orgTbInfo), code, line, _return, terrno)
1,679,496✔
2582
        code = taosHashPut(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId), &pOrgTbInfoArray, POINTER_BYTES);
839,748✔
2583
        QUERY_CHECK_CODE(code, line, _return);
839,748✔
2584
      } else {
2585
        SArray* pOrgTbInfoArray = *(SArray**)pIter;
173,614✔
2586
        QUERY_CHECK_NULL(pOrgTbInfoArray, code, line, _return, terrno)
173,614✔
2587
        QUERY_CHECK_NULL(taosArrayPush(pOrgTbInfoArray, orgTbInfo), code, line, _return, terrno)
173,614✔
2588
      }
2589

2590
      pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, pOrgTbInfo);
1,013,362✔
2591

2592
      code = taosHashRemove(pVtbScan->otbNameToOtbInfoMap, key, len);
1,013,362✔
2593
      QUERY_CHECK_CODE(code, line, _return);
1,013,362✔
2594
    }
2595

2596
    code = taosHashPut(pVtbScan->vtbUidToVgIdMapMap, &uid, sizeof(uid), &otbVgIdToOtbInfoArrayMap, POINTER_BYTES);
695,060✔
2597
    QUERY_CHECK_CODE(code, line, _return);
695,060✔
2598
  }
2599

2600
  return code;
79,940✔
UNCOV
2601
_return:
×
2602
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
2603
  return code;
×
2604
}
2605

2606
static int32_t buildOrgTbInfoSingle(SOperatorInfo* pOperator, bool hasPartition) {
79,940✔
2607
  int32_t                    code = TSDB_CODE_SUCCESS;
79,940✔
2608
  int32_t                    line = 0;
79,940✔
2609

2610
  code = processChildTableListAndGenerateOrgTbInfoMap(pOperator);
79,940✔
2611
  QUERY_CHECK_CODE(code, line, _return);
79,940✔
2612

2613
  // process tag
2614
  code = getTagBlockAndProcess(pOperator, hasPartition);
79,940✔
2615
  QUERY_CHECK_CODE(code, line, _return);
79,940✔
2616

2617
  return code;
79,940✔
UNCOV
2618
_return:
×
2619
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
2620
  return code;
×
2621
}
2622

2623
static int32_t buildOrgTbInfoBatch(SOperatorInfo* pOperator, bool hasPartition) {
559,683✔
2624
  int32_t                    code = TSDB_CODE_SUCCESS;
559,683✔
2625
  int32_t                    line = 0;
559,683✔
2626
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
559,683✔
2627
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
559,683✔
2628
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
559,683✔
2629
  SArray*                    pColRefArray = NULL;
559,683✔
2630
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[0];
559,683✔
2631
  SOperatorInfo*             pTagScanOp = pOperator->pDownstream[1];
559,683✔
2632

2633
  if (hasPartition) {
559,683✔
2634
    pVtbScan->vtbUidToGroupIdMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
123,095✔
2635
    pVtbScan->vtbGroupIdTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
123,095✔
2636
    pVtbScan->vtbGroupIdToVgIdMapMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
123,095✔
2637

2638
    QUERY_CHECK_NULL(pVtbScan->vtbGroupIdToVgIdMapMap, code, line, _return, terrno)
123,095✔
2639
    QUERY_CHECK_NULL(pVtbScan->vtbUidToGroupIdMap, code, line, _return, terrno)
123,095✔
2640
    QUERY_CHECK_NULL(pVtbScan->vtbGroupIdTagListMap, code, line, _return, terrno)
123,095✔
2641
    taosHashSetFreeFp(pVtbScan->vtbGroupIdToVgIdMapMap, destroyOtbVgIdToOtbInfoArrayMap);
123,095✔
2642
  } else {
2643
    pVtbScan->otbVgIdToOtbInfoArrayMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
436,588✔
2644
    QUERY_CHECK_NULL(pVtbScan->otbVgIdToOtbInfoArrayMap, code, line, _return, terrno)
436,588✔
2645
  }
2646

2647
  while (true && hasPartition) {
1,092,112✔
2648
    SSDataBlock* pTagVal = NULL;
655,524✔
2649
    code = pTagScanOp->fpSet.getNextFn(pTagScanOp, &pTagVal);
655,524✔
2650
    QUERY_CHECK_CODE(code, line, _return);
655,524✔
2651
    if (pTagVal == NULL) {
655,524✔
2652
      break;
123,095✔
2653
    }
2654

2655
    SColumnInfoData *pUidCol = taosArrayGetLast(pTagVal->pDataBlock);
532,429✔
2656
    QUERY_CHECK_NULL(pUidCol, code, line, _return, terrno)
532,429✔
2657
    for (int32_t i = 0; i < pTagVal->info.rows; i++) {
1,615,864✔
2658
      tb_uid_t uid = 0;
1,083,435✔
2659
      if (!colDataIsNull_s(pUidCol, i)) {
2,166,870✔
2660
        GET_TYPED_DATA(uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, i), 0);
1,083,435✔
2661
        QUERY_CHECK_CODE(code, line, _return);
1,083,435✔
2662
      }
2663
      code = taosHashPut(pVtbScan->vtbUidToGroupIdMap, &uid, sizeof(uid), &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId));
1,083,435✔
2664
      QUERY_CHECK_CODE(code, line, _return);
1,083,435✔
2665
    }
2666
    code = taosHashPut(pVtbScan->vtbGroupIdTagListMap, &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId), NULL, 0);
532,429✔
2667
    QUERY_CHECK_CODE(code, line, _return);
532,429✔
2668
  }
2669

2670
  for (int32_t i = 0; i < taosArrayGetSize(pVtbScan->childTableList); i++) {
3,542,675✔
2671
    SArray* pColRefInfo = (SArray*)taosArrayGetP(pVtbScan->childTableList, i);
2,982,992✔
2672
    QUERY_CHECK_NULL(pColRefInfo, code, line, _return, terrno)
2,982,992✔
2673
    tb_uid_t uid = 0;
2,982,992✔
2674
    int32_t  vgId = 0;
2,982,992✔
2675
    code = virtualTableScanProcessColRefInfo(pOperator, pColRefInfo, &uid, &vgId, NULL);
2,982,992✔
2676
    QUERY_CHECK_CODE(code, line, _return);
2,982,992✔
2677

2678
    SHashObj* otbVgIdToOtbInfoArrayMap = NULL;
2,982,992✔
2679
    if (hasPartition) {
2,982,992✔
2680
      uint64_t* groupId = (uint64_t *)taosHashGet(pVtbScan->vtbUidToGroupIdMap, &uid, sizeof(uid));
1,083,435✔
2681
      QUERY_CHECK_NULL(groupId, code, line, _return, terrno)
1,083,435✔
2682

2683
      void* pHashIter = taosHashGet(pVtbScan->vtbGroupIdToVgIdMapMap, groupId, sizeof(*groupId));
1,083,435✔
2684
      if (pHashIter) {
1,083,435✔
2685
        otbVgIdToOtbInfoArrayMap = *(SHashObj**)pHashIter;
551,006✔
2686
      } else {
2687
        otbVgIdToOtbInfoArrayMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
532,429✔
2688
        QUERY_CHECK_NULL(otbVgIdToOtbInfoArrayMap, code, line, _return, terrno)
532,429✔
2689
        code = taosHashPut(pVtbScan->vtbGroupIdToVgIdMapMap, groupId, sizeof(*groupId), &otbVgIdToOtbInfoArrayMap, POINTER_BYTES);
532,429✔
2690
        QUERY_CHECK_CODE(code, line, _return);
532,429✔
2691
      }
2692
    } else {
2693
      otbVgIdToOtbInfoArrayMap = pVtbScan->otbVgIdToOtbInfoArrayMap;
1,899,557✔
2694
    }
2695

2696
    size_t len = 0;
2,982,992✔
2697
    void*  pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, NULL);
2,982,992✔
2698
    while (pOrgTbInfo != NULL) {
7,454,860✔
2699
      char*       key = taosHashGetKey(pOrgTbInfo, &len);
4,471,868✔
2700
      SOrgTbInfo* orgTbInfo = (SOrgTbInfo*)pOrgTbInfo;
4,471,868✔
2701
      void* pIter = taosHashGet(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId));
4,471,868✔
2702
      if (!pIter) {
4,471,868✔
2703
        SArray* pOrgTbInfoArray = taosArrayInit(1, sizeof(SOrgTbInfo));
2,042,984✔
2704
        QUERY_CHECK_NULL(pOrgTbInfoArray, code, line, _return, terrno)
2,042,984✔
2705
        QUERY_CHECK_NULL(taosArrayPush(pOrgTbInfoArray, orgTbInfo), code, line, _return, terrno)
4,085,968✔
2706
        code = taosHashPut(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId), &pOrgTbInfoArray, POINTER_BYTES);
2,042,984✔
2707
        QUERY_CHECK_CODE(code, line, _return);
2,042,984✔
2708
      } else {
2709
        SArray* pOrgTbInfoArray = *(SArray**)pIter;
2,428,884✔
2710
        QUERY_CHECK_NULL(pOrgTbInfoArray, code, line, _return, terrno)
2,428,884✔
2711
        QUERY_CHECK_NULL(taosArrayPush(pOrgTbInfoArray, orgTbInfo), code, line, _return, terrno)
2,428,884✔
2712
      }
2713

2714
      pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, pOrgTbInfo);
4,471,868✔
2715

2716
      code = taosHashRemove(pVtbScan->otbNameToOtbInfoMap, key, len);
4,471,868✔
2717
      QUERY_CHECK_CODE(code, line, _return);
4,471,868✔
2718
    }
2719
  }
2720
  return code;
559,683✔
UNCOV
2721
_return:
×
2722
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
2723
  return code;
×
2724
}
2725

2726
int32_t buildVirtualSuperTableScanChildTableMap(SOperatorInfo* pOperator) {
2,148,320✔
2727
  int32_t                    code = TSDB_CODE_SUCCESS;
2,148,320✔
2728
  int32_t                    line = 0;
2,148,320✔
2729
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
2,148,320✔
2730
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
2,148,320✔
2731
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
2,148,320✔
2732
  SArray*                    pColRefArray = NULL;
2,148,320✔
2733
  SOperatorInfo*             pSystableScanOp = NULL;
2,148,320✔
2734
  
2735
  pVtbScan->childTableMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
2,148,320✔
2736
  QUERY_CHECK_NULL(pVtbScan->childTableMap, code, line, _return, terrno)
2,148,320✔
2737

2738
  if (pInfo->qType == DYN_QTYPE_VTB_AGG || pInfo->qType == DYN_QTYPE_VTB_INTERVAL) {
2,148,320✔
2739
    pVtbScan->otbNameToOtbInfoMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
283,938✔
2740
    QUERY_CHECK_NULL(pVtbScan->otbNameToOtbInfoMap, code, line, _return, terrno)
283,938✔
2741
    pSystableScanOp = pOperator->pDownstream[0];
283,938✔
2742
  } else if (pInfo->qType == DYN_QTYPE_VTB_WINDOW || pInfo->qType == DYN_QTYPE_VTB_TS_SCAN) {
1,864,382✔
2743
    pVtbScan->otbNameToOtbInfoMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
355,685✔
2744
    QUERY_CHECK_NULL(pVtbScan->otbNameToOtbInfoMap, code, line, _return, terrno)
355,685✔
2745
    pSystableScanOp = pOperator->pDownstream[1];
355,685✔
2746
  } else {
2747
    pSystableScanOp = pOperator->pDownstream[1];
1,508,697✔
2748
  }
2749

2750
  while (true) {
4,238,288✔
2751
    SSDataBlock *pChildInfo = NULL;
6,386,608✔
2752
    code = pSystableScanOp->fpSet.getNextFn(pSystableScanOp, &pChildInfo);
6,386,608✔
2753
    QUERY_CHECK_CODE(code, line, _return);
6,386,608✔
2754
    if (pChildInfo == NULL) {
6,386,608✔
2755
      break;
2,148,320✔
2756
    }
2757
    SColumnInfoData *pTableNameCol = taosArrayGet(pChildInfo->pDataBlock, 0);
4,238,288✔
2758
    SColumnInfoData *pStbNameCol = taosArrayGet(pChildInfo->pDataBlock, 1);
4,238,288✔
2759
    SColumnInfoData *pDbNameCol = taosArrayGet(pChildInfo->pDataBlock, 2);
4,238,288✔
2760

2761
    QUERY_CHECK_NULL(pTableNameCol, code, line, _return, terrno)
4,238,288✔
2762
    QUERY_CHECK_NULL(pStbNameCol, code, line, _return, terrno)
4,238,288✔
2763
    QUERY_CHECK_NULL(pDbNameCol, code, line, _return, terrno)
4,238,288✔
2764

2765
    for (int32_t i = 0; i < pChildInfo->info.rows; i++) {
274,065,000✔
2766
      if (!colDataIsNull_s(pStbNameCol, i)) {
539,653,424✔
2767
        char* stbrawname = colDataGetData(pStbNameCol, i);
269,826,712✔
2768
        char* dbrawname = colDataGetData(pDbNameCol, i);
269,826,712✔
2769
        char *ctbName = colDataGetData(pTableNameCol, i);
269,826,712✔
2770

2771
        if (tableInfoNeedCollect(dbrawname, stbrawname, pInfo->vtbScan.dbName, pInfo->vtbScan.tbName)) {
269,826,712✔
2772
          SColRefInfo info = {0};
171,167,912✔
2773
          code = getColRefInfo(&info, pChildInfo->pDataBlock, i);
171,167,912✔
2774
          QUERY_CHECK_CODE(code, line, _return);
171,167,912✔
2775

2776
          if (pInfo->qType == DYN_QTYPE_VTB_SCAN) {
171,167,912✔
2777
            if (pInfo->vtbScan.dynTbUid != 0 && info.uid != pInfo->vtbScan.dynTbUid) {
112,319,080✔
UNCOV
2778
              qTrace("dynQueryCtrl tb uid filter, info uid:%" PRIu64 ", dyn tb uid:%" PRIu64, info.uid,
×
2779
                     pInfo->vtbScan.dynTbUid);
UNCOV
2780
              destroyColRefInfo(&info);
×
2781
              continue;
×
2782
            }
2783

2784
            if (pTaskInfo->pStreamRuntimeInfo) {
112,319,080✔
2785
              if (pVtbScan->curOrgTbVg == NULL) {
65,544✔
2786
                pVtbScan->curOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
6,068✔
2787
                QUERY_CHECK_NULL(pVtbScan->curOrgTbVg, code, line, _return, terrno)
6,068✔
2788
              }
2789

2790
              if (info.colrefName) {
65,544✔
2791
                int32_t vgId;
38,876✔
2792
                code = getVgIdFromColref(pOperator, info.colrefName, &vgId);
38,876✔
2793
                QUERY_CHECK_CODE(code, line, _return);
38,876✔
2794
                code = taosHashPut(pVtbScan->curOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
38,876✔
2795
                QUERY_CHECK_CODE(code, line, _return);
38,876✔
2796
              }
2797
            }
2798
          }
2799

2800
          if (taosHashGet(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName)) == NULL) {
171,167,912✔
2801
            pColRefArray = taosArrayInit(1, sizeof(SColRefInfo));
8,628,593✔
2802
            QUERY_CHECK_NULL(pColRefArray, code, line, _return, terrno)
8,628,593✔
2803
            QUERY_CHECK_NULL(taosArrayPush(pColRefArray, &info), code, line, _return, terrno)
17,257,186✔
2804
            int32_t tableIdx = (int32_t)taosArrayGetSize(pVtbScan->childTableList);
8,628,593✔
2805
            QUERY_CHECK_NULL(taosArrayPush(pVtbScan->childTableList, &pColRefArray), code, line, _return, terrno)
17,257,186✔
2806
            code = taosHashPut(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName), &tableIdx, sizeof(tableIdx));
8,628,593✔
2807
            QUERY_CHECK_CODE(code, line, _return);
8,628,593✔
2808
          } else {
2809
            int32_t *tableIdx = (int32_t*)taosHashGet(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName));
162,539,319✔
2810
            QUERY_CHECK_NULL(tableIdx, code, line, _return, terrno)
162,539,319✔
2811
            pColRefArray = (SArray *)taosArrayGetP(pVtbScan->childTableList, *tableIdx);
162,539,319✔
2812
            QUERY_CHECK_NULL(pColRefArray, code, line, _return, terrno)
162,539,319✔
2813
            QUERY_CHECK_NULL(taosArrayPush(pColRefArray, &info), code, line, _return, terrno)
325,078,638✔
2814
          }
2815
        }
2816
      }
2817
    }
2818
  }
2819

2820
  switch (pInfo->qType) {
2,148,320✔
2821
    case DYN_QTYPE_VTB_TS_SCAN:
392,527✔
2822
    case DYN_QTYPE_VTB_WINDOW:
2823
    case DYN_QTYPE_VTB_INTERVAL: {
2824
      code = buildOrgTbInfoBatch(pOperator, false);
392,527✔
2825
      break;
392,527✔
2826
    }
2827
    case DYN_QTYPE_VTB_AGG: {
247,096✔
2828
      if (pVtbScan->batchProcessChild) {
247,096✔
2829
        code = buildOrgTbInfoBatch(pOperator, pVtbScan->hasPartition);
167,156✔
2830
      } else {
2831
        code = buildOrgTbInfoSingle(pOperator, pVtbScan->hasPartition);
79,940✔
2832
      }
2833
      break;
247,096✔
2834
    }
2835
    case DYN_QTYPE_VTB_SCAN: {
1,508,697✔
2836
      code = processOrgTbVg(pVtbScan, pTaskInfo, 1);
1,508,697✔
2837
      break;
1,508,697✔
2838
    }
UNCOV
2839
    default: {
×
2840
      code = TSDB_CODE_PLAN_INVALID_DYN_CTRL_TYPE;
×
2841
      break;
×
2842
    }
2843
  }
2844

2845
  QUERY_CHECK_CODE(code, line, _return);
2,148,320✔
2846

2847
_return:
2,148,320✔
2848
  if (code) {
2,148,320✔
2849
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
1,170✔
2850
  }
2851
  return code;
2,148,320✔
2852
}
2853

2854
int32_t buildVirtualNormalChildTableScanChildTableMap(SOperatorInfo* pOperator) {
154,698✔
2855
  int32_t                    code = TSDB_CODE_SUCCESS;
154,698✔
2856
  int32_t                    line = 0;
154,698✔
2857
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
154,698✔
2858
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
154,698✔
2859
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
154,698✔
2860
  SArray*                    pColRefInfo = pInfo->vtbScan.colRefInfo;
154,698✔
2861
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[1];
154,698✔
2862
  int32_t                    rversion = 0;
154,698✔
2863

2864
  pInfo->vtbScan.colRefInfo = taosArrayInit(1, sizeof(SColRefInfo));
154,698✔
2865
  QUERY_CHECK_NULL(pInfo->vtbScan.colRefInfo, code, line, _return, terrno)
154,698✔
2866

2867
  while (true) {
319,050✔
2868
    SSDataBlock *pTableInfo = NULL;
473,748✔
2869
    code = pSystableScanOp->fpSet.getNextFn(pSystableScanOp, &pTableInfo);
473,748✔
2870
    if (pTableInfo == NULL) {
473,748✔
2871
      break;
154,698✔
2872
    }
2873

2874
    SColumnInfoData *pTableNameCol = taosArrayGet(pTableInfo->pDataBlock, 0);
319,050✔
2875
    SColumnInfoData *pDbNameCol = taosArrayGet(pTableInfo->pDataBlock, 2);
319,050✔
2876
    SColumnInfoData *pRefVerCol = taosArrayGet(pTableInfo->pDataBlock, 8);
319,050✔
2877

2878
    QUERY_CHECK_NULL(pTableNameCol, code, line, _return, terrno)
319,050✔
2879
    QUERY_CHECK_NULL(pDbNameCol, code, line, _return, terrno)
319,050✔
2880
    QUERY_CHECK_NULL(pRefVerCol, code, line, _return, terrno)
319,050✔
2881

2882
    for (int32_t i = 0; i < pTableInfo->info.rows; i++) {
11,079,370✔
2883
      if (!colDataIsNull_s(pRefVerCol, i)) {
21,520,640✔
2884
        GET_TYPED_DATA(rversion, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pRefVerCol, i), 0);
10,760,320✔
2885
      }
2886

2887
      if (!colDataIsNull_s(pTableNameCol, i)) {
21,520,640✔
2888
        char* tbrawname = colDataGetData(pTableNameCol, i);
10,760,320✔
2889
        char* dbrawname = colDataGetData(pDbNameCol, i);
10,760,320✔
2890
        QUERY_CHECK_NULL(tbrawname, code, line, _return, terrno)
10,760,320✔
2891
        QUERY_CHECK_NULL(dbrawname, code, line, _return, terrno)
10,760,320✔
2892

2893
        if (tableInfoNeedCollect(dbrawname, tbrawname, pInfo->vtbScan.dbName, pInfo->vtbScan.tbName)) {
10,760,320✔
2894
          SColRefInfo info = {0};
758,023✔
2895
          code = getColRefInfo(&info, pTableInfo->pDataBlock, i);
758,023✔
2896
          QUERY_CHECK_CODE(code, line, _return);
758,023✔
2897

2898
          if ((rversion != pVtbScan->rversion || pVtbScan->existOrgTbVg == NULL) && info.colrefName) {
758,023✔
2899
            if (pVtbScan->curOrgTbVg == NULL) {
7,488✔
2900
              pVtbScan->curOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
468✔
2901
              QUERY_CHECK_NULL(pVtbScan->curOrgTbVg, code, line, _return, terrno)
468✔
2902
            }
2903
            int32_t vgId;
7,488✔
2904
            code = getVgIdFromColref(pOperator, info.colrefName, &vgId);
7,488✔
2905
            QUERY_CHECK_CODE(code, line, _return);
7,488✔
2906
            code = taosHashPut(pVtbScan->curOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
7,488✔
2907
            QUERY_CHECK_CODE(code, line, _return);
7,488✔
2908
          }
2909

2910
          QUERY_CHECK_NULL(taosArrayPush(pInfo->vtbScan.colRefInfo, &info), code, line, _return, terrno)
1,516,046✔
2911
        }
2912
      }
2913
    }
2914
  }
2915
  code = processOrgTbVg(pVtbScan, pTaskInfo, rversion);
154,698✔
2916
  QUERY_CHECK_CODE(code, line, _return);
154,698✔
2917

2918
_return:
153,294✔
2919
  if (code) {
154,698✔
2920
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
1,404✔
2921
  }
2922
  return code;
154,698✔
2923
}
2924

2925
int32_t virtualTableScanCheckNeedRedeploy(SOperatorInfo* pOperator) {
2,059,902✔
2926
  int32_t                    code = TSDB_CODE_SUCCESS;
2,059,902✔
2927
  int32_t                    line = 0;
2,059,902✔
2928
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
2,059,902✔
2929
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
2,059,902✔
2930
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
2,059,902✔
2931

2932
  SArray *tmpArray = NULL;
2,059,902✔
2933
  tmpArray = atomic_load_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo);
2,059,902✔
2934
  if (tmpArray && tmpArray == atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo, tmpArray, NULL)) {
2,059,902✔
2935
    for (int32_t i = 0; i < taosArrayGetSize(tmpArray); i++) {
5,148✔
2936
      SStreamTaskAddr* pTaskAddr = (SStreamTaskAddr*)taosArrayGet(tmpArray, i);
2,574✔
2937
      code = taosHashPut(pVtbScan->existOrgTbVg, &pTaskAddr->nodeId, sizeof(pTaskAddr->nodeId), NULL, 0);
2,574✔
2938
      QUERY_CHECK_CODE(code, line, _return);
2,574✔
2939
      if (pVtbScan->newAddedVgInfo == NULL) {
2,574✔
2940
        pVtbScan->newAddedVgInfo = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
936✔
2941
        QUERY_CHECK_NULL(pVtbScan->newAddedVgInfo, code, line, _return, terrno)
936✔
2942
      }
2943
      code = taosHashPut(pVtbScan->newAddedVgInfo, &pTaskAddr->nodeId, sizeof(pTaskAddr->nodeId), pTaskAddr, sizeof(SStreamTaskAddr));
2,574✔
2944
      QUERY_CHECK_CODE(code, line, _return);
2,574✔
2945
    }
2946
    pVtbScan->needRedeploy = false;
2,574✔
2947
  } else {
2948
    code = TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY;
2,057,328✔
2949
    QUERY_CHECK_CODE(code, line, _return);
2,057,328✔
2950
  }
2951

UNCOV
2952
_return:
×
2953
  taosArrayClear(tmpArray);
2,059,902✔
2954
  taosArrayDestroy(tmpArray);
2,059,902✔
2955
  if (code) {
2,059,902✔
2956
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
2,057,328✔
2957
  }
2958
  return code;
2,059,668✔
2959
}
2960

2961
int32_t virtualTableScanBuildDownStreamOpParam(SOperatorInfo* pOperator, tb_uid_t uid, int32_t vgId) {
5,100,112✔
2962
  int32_t                    code = TSDB_CODE_SUCCESS;
5,100,112✔
2963
  int32_t                    line = 0;
5,100,112✔
2964
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
5,100,112✔
2965
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
5,100,112✔
2966
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
5,100,112✔
2967

2968
  pVtbScan->vtbScanParam = NULL;
5,100,112✔
2969
  code = buildVtbScanOperatorParam(pInfo, &pVtbScan->vtbScanParam, uid);
5,100,112✔
2970
  QUERY_CHECK_CODE(code, line, _return);
5,100,112✔
2971

2972
  void* pIter = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, NULL);
5,100,112✔
2973
  while (pIter != NULL) {
16,834,244✔
2974
    SOrgTbInfo*      pMap = (SOrgTbInfo*)pIter;
11,734,132✔
2975
    SOperatorParam*  pExchangeParam = NULL;
11,734,132✔
2976
    SStreamTaskAddr* addr = taosHashGet(pVtbScan->newAddedVgInfo, &pMap->vgId, sizeof(pMap->vgId));
11,734,132✔
2977
    if (addr != NULL) {
11,734,132✔
2978
      SDownstreamSourceNode newSource = {0};
2,574✔
2979
      newSource.type = QUERY_NODE_DOWNSTREAM_SOURCE;
2,574✔
2980
      newSource.clientId = pTaskInfo->id.taskId;// current task's taskid
2,574✔
2981
      newSource.taskId = addr->taskId;
2,574✔
2982
      newSource.fetchMsgType = TDMT_STREAM_FETCH;
2,574✔
2983
      newSource.localExec = false;
2,574✔
2984
      newSource.addr.nodeId = addr->nodeId;
2,574✔
2985
      memcpy(&newSource.addr.epSet, &addr->epset, sizeof(SEpSet));
2,574✔
2986

2987
      code = buildExchangeOperatorParamForVScan(&pExchangeParam, 0, pMap, &newSource);
2,574✔
2988
      QUERY_CHECK_CODE(code, line, _return);
2,574✔
2989
      code = taosHashRemove(pVtbScan->newAddedVgInfo, &pMap->vgId, sizeof(pMap->vgId));
2,574✔
2990
      QUERY_CHECK_CODE(code, line, _return);
2,574✔
2991
    } else {
2992
      code = buildExchangeOperatorParamForVScan(&pExchangeParam, 0, pMap, NULL);
11,731,558✔
2993
      QUERY_CHECK_CODE(code, line, _return);
11,731,558✔
2994
    }
2995
    QUERY_CHECK_NULL(taosArrayPush(((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pOpParamArray, &pExchangeParam), code, line, _return, terrno)
23,468,264✔
2996
    pIter = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, pIter);
11,734,132✔
2997
  }
2998

2999
  SOperatorParam*  pExchangeParam = NULL;
5,100,112✔
3000
  code = buildExchangeOperatorParamForVTagScan(&pExchangeParam, 0, vgId, uid);
5,100,112✔
3001
  QUERY_CHECK_CODE(code, line, _return);
5,100,112✔
3002
  ((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pTagScanOp = pExchangeParam;
5,100,112✔
3003

3004
_return:
5,100,112✔
3005
  if (code) {
5,100,112✔
UNCOV
3006
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
3007
  }
3008
  return code;
5,100,112✔
3009
}
3010

3011
int32_t virtualTableScanGetNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
13,203,140✔
3012
  int32_t                    code = TSDB_CODE_SUCCESS;
13,203,140✔
3013
  int32_t                    line = 0;
13,203,140✔
3014
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
13,203,140✔
3015
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
13,203,140✔
3016
  SOperatorInfo*             pVtbScanOp = pOperator->pDownstream[0];
13,203,140✔
3017

3018
  pVtbScan->otbNameToOtbInfoMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
13,203,140✔
3019
  QUERY_CHECK_NULL(pVtbScan->otbNameToOtbInfoMap, code, line, _return, terrno)
13,203,140✔
3020
  taosHashSetFreeFp(pVtbScan->otbNameToOtbInfoMap, destroySOrgTbInfo);
13,203,140✔
3021

3022
  while (true) {
3023
    if (pVtbScan->curTableIdx == pVtbScan->lastTableIdx) {
16,685,343✔
3024
      code = pVtbScanOp->fpSet.getNextFn(pVtbScanOp, pRes);
11,585,231✔
3025
      QUERY_CHECK_CODE(code, line, _return);
11,585,231✔
3026
    } else {
3027
      taosHashClear(pVtbScan->otbNameToOtbInfoMap);
5,100,112✔
3028
      SArray* pColRefInfo = NULL;
5,100,112✔
3029
      if (pVtbScan->isSuperTable) {
5,100,112✔
3030
        pColRefInfo = (SArray*)taosArrayGetP(pVtbScan->childTableList, pVtbScan->curTableIdx);
4,946,818✔
3031
      } else {
3032
        pColRefInfo = pInfo->vtbScan.colRefInfo;
153,294✔
3033
      }
3034
      QUERY_CHECK_NULL(pColRefInfo, code, line, _return, terrno)
5,100,112✔
3035

3036
      tb_uid_t  uid = 0;
5,100,112✔
3037
      int32_t   vgId = 0;
5,100,112✔
3038
      SHashObj* refMap = NULL;
5,100,112✔
3039
      code = virtualTableScanProcessColRefInfo(pOperator, pColRefInfo, &uid, &vgId, &refMap);
5,100,112✔
3040
      QUERY_CHECK_CODE(code, line, _return);
5,100,112✔
3041

3042
      qDebug("virtual table scan process subtable idx:%d uid:%" PRIu64 " vgId:%d", pVtbScan->curTableIdx, uid, vgId);
5,100,112✔
3043

3044
      code = buildRefSlotGroupsFromRefMap(refMap, pVtbScan->readColList, &pVtbScan->refColGroups);
5,100,112✔
3045
      QUERY_CHECK_CODE(code, line, _return);
5,100,112✔
3046

3047
      code = virtualTableScanBuildDownStreamOpParam(pOperator, uid, vgId);
5,100,112✔
3048
      QUERY_CHECK_CODE(code, line, _return);
5,100,112✔
3049

3050
      // reset downstream operator's status
3051
      pVtbScanOp->status = OP_NOT_OPENED;
5,100,112✔
3052
      code = pVtbScanOp->fpSet.getNextExtFn(pVtbScanOp, pVtbScan->vtbScanParam, pRes);
5,100,112✔
3053
      QUERY_CHECK_CODE(code, line, _return);
5,098,818✔
3054
    }
3055

3056
    if (*pRes) {
16,684,049✔
3057
      // has result, still read data from this table.
3058
      pVtbScan->lastTableIdx = pVtbScan->curTableIdx;
11,590,111✔
3059
      break;
11,590,111✔
3060
    } else {
3061
      // no result, read next table.
3062
      pVtbScan->curTableIdx++;
5,093,938✔
3063
      if (pVtbScan->isSuperTable) {
5,093,938✔
3064
        if (pVtbScan->curTableIdx >= taosArrayGetSize(pVtbScan->childTableList)) {
4,940,644✔
3065
          setOperatorCompleted(pOperator);
1,458,441✔
3066
          break;
1,458,441✔
3067
        }
3068
      } else {
3069
        setOperatorCompleted(pOperator);
153,294✔
3070
        break;
153,294✔
3071
      }
3072
    }
3073
  }
3074

3075
_return:
13,201,846✔
3076
  taosHashCleanup(pVtbScan->otbNameToOtbInfoMap);
13,201,846✔
3077
  pVtbScan->otbNameToOtbInfoMap = NULL;
13,201,846✔
3078
  if (code) {
13,201,846✔
UNCOV
3079
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
3080
  }
3081
  return code;
13,201,846✔
3082
}
3083

3084
int32_t vtbScanOpen(SOperatorInfo* pOperator) {
13,248,626✔
3085
  int32_t                    code = TSDB_CODE_SUCCESS;
13,248,626✔
3086
  int32_t                    line = 0;
13,248,626✔
3087
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
13,248,626✔
3088
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
13,248,626✔
3089

3090
  if (OPTR_IS_OPENED(pOperator)) {
13,248,626✔
3091
    return code;
11,585,231✔
3092
  }
3093

3094
  if (pVtbScan->isSuperTable) {
1,663,395✔
3095
    code = buildVirtualSuperTableScanChildTableMap(pOperator);
1,508,697✔
3096
    QUERY_CHECK_CODE(code, line, _return);
1,508,697✔
3097
  } else {
3098
    code = buildVirtualNormalChildTableScanChildTableMap(pOperator);
154,698✔
3099
    QUERY_CHECK_CODE(code, line, _return);
154,698✔
3100
  }
3101

3102
  OPTR_SET_OPENED(pOperator);
1,660,821✔
3103

3104
_return:
1,663,395✔
3105
  if (code) {
1,663,395✔
3106
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
2,574✔
3107
    pOperator->pTaskInfo->code = code;
2,574✔
3108
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
2,574✔
3109
  }
3110
  return code;
1,660,821✔
3111
}
3112

3113
int32_t vtbScanNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
15,305,954✔
3114
  int32_t                    code = TSDB_CODE_SUCCESS;
15,305,954✔
3115
  int32_t                    line = 0;
15,305,954✔
3116
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
15,305,954✔
3117
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
15,305,954✔
3118

3119
  QRY_PARAM_CHECK(pRes);
15,305,954✔
3120
  if (pOperator->status == OP_EXEC_DONE && !pOperator->pOperatorGetParam) {
15,305,954✔
UNCOV
3121
    return code;
×
3122
  }
3123
  if (pOperator->pOperatorGetParam) {
15,305,954✔
UNCOV
3124
    if (pOperator->status == OP_EXEC_DONE) {
×
3125
      pOperator->status = OP_OPENED;
×
3126
    }
UNCOV
3127
    pVtbScan->curTableIdx = 0;
×
3128
    pVtbScan->lastTableIdx = -1;
×
3129
    pVtbScan->window = ((SDynQueryCtrlOperatorParam *)(pOperator->pOperatorGetParam)->value)->window;
×
3130
    freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
×
3131
    pOperator->pOperatorGetParam = NULL;
×
3132
  } else {
3133
    pVtbScan->window.skey = INT64_MAX;
15,305,954✔
3134
    pVtbScan->window.ekey = INT64_MIN;
15,305,954✔
3135
  }
3136

3137
  if (pVtbScan->needRedeploy) {
15,305,954✔
3138
    code = virtualTableScanCheckNeedRedeploy(pOperator);
2,059,902✔
3139
    QUERY_CHECK_CODE(code, line, _return);
2,059,902✔
3140
  }
3141

3142
  code = pOperator->fpSet._openFn(pOperator);
13,248,626✔
3143
  QUERY_CHECK_CODE(code, line, _return);
13,246,052✔
3144

3145
  if (pVtbScan->isSuperTable && taosArrayGetSize(pVtbScan->childTableList) == 0) {
13,246,052✔
3146
    setOperatorCompleted(pOperator);
42,912✔
3147
    return code;
42,912✔
3148
  }
3149

3150
  code = virtualTableScanGetNext(pOperator, pRes);
13,203,140✔
3151
  QUERY_CHECK_CODE(code, line, _return);
13,201,846✔
3152

3153
  return code;
13,201,846✔
3154

3155
_return:
2,057,328✔
3156
  if (code) {
2,057,328✔
3157
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
2,057,328✔
3158
    pOperator->pTaskInfo->code = code;
2,057,328✔
3159
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
2,057,328✔
3160
  }
UNCOV
3161
  return code;
×
3162
}
3163

3164
/*
3165
 * Open dynamic vtable operator and build child-table mapping once.
3166
 *
3167
 * @param pOperator Dynamic-query control operator.
3168
 *
3169
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
3170
 */
3171
static int32_t vtbDefaultOpen(SOperatorInfo* pOperator) {
3,956,087✔
3172
  int32_t                    code = TSDB_CODE_SUCCESS;
3,956,087✔
3173
  int32_t                    line = 0;
3,956,087✔
3174
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
3,956,087✔
3175
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
3,956,087✔
3176

3177
  if (OPTR_IS_OPENED(pOperator)) {
3,956,087✔
3178
    return code;
3,623,232✔
3179
  }
3180

3181
  code = buildVirtualSuperTableScanChildTableMap(pOperator);
332,855✔
3182
  QUERY_CHECK_CODE(code, line, _return);
332,855✔
3183

3184
  OPTR_SET_OPENED(pOperator);
332,855✔
3185

3186
_return:
332,855✔
3187
  if (code) {
332,855✔
UNCOV
3188
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
3189
    pOperator->pTaskInfo->code = code;
×
3190
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
3191
  }
3192
  return code;
332,855✔
3193
}
3194

3195
/*
3196
 * Get next result block for virtual-table ts-scan workflow.
3197
 *
3198
 * @param pOperator Dynamic-query control operator.
3199
 * @param pRes Output result data block pointer.
3200
 *
3201
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
3202
 */
3203
int32_t vtbTsScanNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
2,447,639✔
3204
  int32_t                    code = TSDB_CODE_SUCCESS;
2,447,639✔
3205
  int32_t                    line = 0;
2,447,639✔
3206
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
2,447,639✔
3207
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
2,447,639✔
3208
  SOperatorInfo*             pMergeOp = pOperator->pDownstream[0];
2,447,639✔
3209
  SOperatorParam*            pMergeParam = NULL;
2,447,639✔
3210
  SSDataBlock*               pResult = NULL;
2,447,639✔
3211

3212
  QRY_PARAM_CHECK(pRes);
2,447,639✔
3213
  if (pOperator->status == OP_EXEC_DONE) {
2,447,639✔
UNCOV
3214
    return code;
×
3215
  }
3216

3217
  code = pOperator->fpSet._openFn(pOperator);
2,447,639✔
3218
  QUERY_CHECK_CODE(code, line, _return);
2,447,639✔
3219

3220
  if (pVtbScan->genNewParam) {
2,447,639✔
3221
    code = buildMergeOperatorParamForTsScan(pInfo, pMergeOp->numOfDownstream, &pMergeParam);
48,917✔
3222
    QUERY_CHECK_CODE(code, line, _return);
48,917✔
3223

3224
    code = pMergeOp->fpSet.getNextExtFn(pMergeOp, pMergeParam, &pResult);
48,917✔
3225
    QUERY_CHECK_CODE(code, line, _return);
48,917✔
3226
    pVtbScan->genNewParam = false;
48,917✔
3227
  } else {
3228
    code = pMergeOp->fpSet.getNextFn(pMergeOp, &pResult);
2,398,722✔
3229
    QUERY_CHECK_CODE(code, line, _return);
2,398,722✔
3230
  }
3231

3232
  if (pResult) {
2,447,639✔
3233
    *pRes = pResult;
2,398,722✔
3234
  } else {
3235
    *pRes = NULL;
48,917✔
3236
    setOperatorCompleted(pOperator);
48,917✔
3237
  }
3238

3239
_return:
2,447,639✔
3240
  if (code) {
2,447,639✔
UNCOV
3241
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
3242
    pOperator->pTaskInfo->code = code;
×
3243
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
3244
  }
3245
  return code;
2,447,639✔
3246
}
3247

3248
int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) {
1,223,955✔
3249
  if (batchFetch) {
1,223,955✔
3250
    pPrev->leftHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
1,222,773✔
3251
    if (NULL == pPrev->leftHash) {
1,222,773✔
UNCOV
3252
      return terrno;
×
3253
    }
3254
    pPrev->rightHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
1,222,773✔
3255
    if (NULL == pPrev->rightHash) {
1,222,773✔
UNCOV
3256
      return terrno;
×
3257
    }
3258
  } else {
3259
    pPrev->leftCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
1,182✔
3260
    if (NULL == pPrev->leftCache) {
1,182✔
UNCOV
3261
      return terrno;
×
3262
    }
3263
    pPrev->rightCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
1,182✔
3264
    if (NULL == pPrev->rightCache) {
1,182✔
UNCOV
3265
      return terrno;
×
3266
    }
3267
    pPrev->onceTable = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
1,182✔
3268
    if (NULL == pPrev->onceTable) {
1,182✔
UNCOV
3269
      return terrno;
×
3270
    }
3271
  }
3272

3273
  return TSDB_CODE_SUCCESS;
1,223,955✔
3274
}
3275

UNCOV
3276
static void updateDynTbUidIfNeeded(SVtbScanDynCtrlInfo* pVtbScan, SStreamRuntimeInfo* pStreamRuntimeInfo) {
×
3277
  if (pStreamRuntimeInfo == NULL) {
×
3278
    return;
×
3279
  }
3280

UNCOV
3281
  SArray* vals = pStreamRuntimeInfo->funcInfo.pStreamPartColVals;
×
3282
  for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
×
3283
    SStreamGroupValue* pValue = taosArrayGet(vals, i);
×
3284
    if (pValue != NULL && pValue->isTbname && pValue->uid != pVtbScan->dynTbUid) {
×
3285
      qTrace("dynQueryCtrl dyn tb uid:%" PRIu64 " reset to:%" PRIu64, pVtbScan->dynTbUid, pValue->uid);
×
3286

UNCOV
3287
      pVtbScan->dynTbUid = pValue->uid;
×
3288
      break;
×
3289
    }
3290
  }
3291
}
3292

3293
static int32_t initVtbScanInfo(SDynQueryCtrlOperatorInfo* pInfo, SMsgCb* pMsgCb,
2,492,061✔
3294
                               SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) {
3295
  int32_t      code = TSDB_CODE_SUCCESS;
2,492,061✔
3296
  int32_t      line = 0;
2,492,061✔
3297

3298
  code = tsem_init(&pInfo->vtbScan.ready, 0, 0);
2,492,061✔
3299
  QUERY_CHECK_CODE(code, line, _return);
2,492,061✔
3300

3301
  pInfo->vtbScan.genNewParam = true;
2,492,061✔
3302
  pInfo->vtbScan.batchProcessChild = pPhyciNode->vtbScan.batchProcessChild;
2,492,061✔
3303
  pInfo->vtbScan.hasPartition = pPhyciNode->vtbScan.hasPartition;
2,492,061✔
3304
  pInfo->vtbScan.scanAllCols = pPhyciNode->vtbScan.scanAllCols;
2,492,061✔
3305
  pInfo->vtbScan.isSuperTable = pPhyciNode->vtbScan.isSuperTable;
2,492,061✔
3306
  pInfo->vtbScan.rversion = pPhyciNode->vtbScan.rversion;
2,492,061✔
3307
  pInfo->vtbScan.uid = pPhyciNode->vtbScan.uid;
2,492,061✔
3308
  pInfo->vtbScan.suid = pPhyciNode->vtbScan.suid;
2,492,061✔
3309
  pInfo->vtbScan.epSet = pPhyciNode->vtbScan.mgmtEpSet;
2,492,061✔
3310
  pInfo->vtbScan.acctId = pPhyciNode->vtbScan.accountId;
2,492,061✔
3311
  pInfo->vtbScan.needRedeploy = false;
2,492,061✔
3312
  pInfo->vtbScan.pMsgCb = pMsgCb;
2,492,061✔
3313
  pInfo->vtbScan.curTableIdx = 0;
2,492,061✔
3314
  pInfo->vtbScan.lastTableIdx = -1;
2,492,061✔
3315
  pInfo->vtbScan.dynTbUid = 0;
2,492,061✔
3316
  pInfo->vtbScan.dbName = taosStrdup(pPhyciNode->vtbScan.dbName);
2,492,061✔
3317
  pInfo->vtbScan.tbName = taosStrdup(pPhyciNode->vtbScan.tbName);
2,492,061✔
3318
  QUERY_CHECK_NULL(pInfo->vtbScan.dbName, code, line, _return, terrno)
2,492,061✔
3319
  QUERY_CHECK_NULL(pInfo->vtbScan.tbName, code, line, _return, terrno)
2,492,061✔
3320
  pInfo->vtbScan.existOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
2,492,061✔
3321
  QUERY_CHECK_NULL(pInfo->vtbScan.existOrgTbVg, code, line, _return, terrno)
2,492,061✔
3322
  SNode* node = NULL;
2,492,061✔
3323
  FOREACH(node, pPhyciNode->vtbScan.pOrgVgIds) {
13,580,950✔
3324
    SValueNode* valueNode = (SValueNode*)node;
11,088,889✔
3325
    int32_t vgId = (int32_t)valueNode->datum.i;
11,088,889✔
3326
    code = taosHashPut(pInfo->vtbScan.existOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
11,088,889✔
3327
    QUERY_CHECK_CODE(code, line, _return);
11,088,889✔
3328
  }
3329

3330
  if (pPhyciNode->dynTbname && pTaskInfo) {
2,492,061✔
UNCOV
3331
    updateDynTbUidIfNeeded(&pInfo->vtbScan, pTaskInfo->pStreamRuntimeInfo);
×
3332
  }
3333

3334
  pInfo->vtbScan.readColList = taosArrayInit(LIST_LENGTH(pPhyciNode->vtbScan.pScanCols), sizeof(col_id_t));
2,492,061✔
3335
  QUERY_CHECK_NULL(pInfo->vtbScan.readColList, code, line, _return, terrno)
2,492,061✔
3336

3337
  SNode* colNode = NULL;
2,492,061✔
3338
  FOREACH(colNode, pPhyciNode->vtbScan.pScanCols) {
18,113,426✔
3339
    SColumnNode* pNode = (SColumnNode*)colNode;
15,621,365✔
3340
    QUERY_CHECK_NULL(pNode, code, line, _return, terrno)
15,621,365✔
3341
    QUERY_CHECK_NULL(taosArrayPush(pInfo->vtbScan.readColList, &pNode->colId), code, line, _return, terrno)
31,242,730✔
3342
  }
3343

3344
  pInfo->vtbScan.readColSet =
2,492,061✔
3345
      taosHashInit(taosArrayGetSize(pInfo->vtbScan.readColList) > 0 ? taosArrayGetSize(pInfo->vtbScan.readColList) : 1,
2,492,061✔
3346
                   taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), true, HASH_NO_LOCK);
3347
  QUERY_CHECK_NULL(pInfo->vtbScan.readColSet, code, line, _return, terrno)
2,492,061✔
3348
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->vtbScan.readColList); i++) {
18,113,426✔
3349
    col_id_t colId = *(col_id_t*)taosArrayGet(pInfo->vtbScan.readColList, i);
15,621,365✔
3350
    code = taosHashPut(pInfo->vtbScan.readColSet, &colId, sizeof(colId), NULL, 0);
15,621,365✔
3351
    QUERY_CHECK_CODE(code, line, _return);
15,621,365✔
3352
  }
3353

3354
  pInfo->vtbScan.refColGroups = NULL;
2,492,061✔
3355

3356
  pInfo->vtbScan.childTableList = taosArrayInit(10, POINTER_BYTES);
2,492,061✔
3357
  QUERY_CHECK_NULL(pInfo->vtbScan.childTableList, code, line, _return, terrno)
2,492,061✔
3358

3359
  pInfo->vtbScan.dbVgInfoMap = taosHashInit(taosArrayGetSize(pInfo->vtbScan.childTableList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
2,492,061✔
3360
  QUERY_CHECK_NULL(pInfo->vtbScan.dbVgInfoMap, code, line, _return, terrno)
2,492,061✔
3361

3362
  pInfo->vtbScan.otbNameToOtbInfoMap = NULL;
2,492,061✔
3363
  pInfo->vtbScan.otbVgIdToOtbInfoArrayMap = NULL;
2,492,061✔
3364
  pInfo->vtbScan.vtbUidToVgIdMapMap = NULL;
2,492,061✔
3365
  pInfo->vtbScan.vtbGroupIdToVgIdMapMap = NULL;
2,492,061✔
3366
  pInfo->vtbScan.vtbUidTagListMap = NULL;
2,492,061✔
3367
  pInfo->vtbScan.vtbGroupIdTagListMap = NULL;
2,492,061✔
3368
  pInfo->vtbScan.vtbUidToGroupIdMap = NULL;
2,492,061✔
3369

3370
  return code;
2,492,061✔
UNCOV
3371
_return:
×
3372
  // no need to destroy array and hashmap allocated in this function,
3373
  // since the operator's destroy function will take care of it
UNCOV
3374
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
3375
  return code;
×
3376
}
3377

3378
static int32_t initVtbWindowInfo(SDynQueryCtrlOperatorInfo* pInfo, SDynQueryCtrlPhysiNode* pPhyciNode,
612,671✔
3379
                                 SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) {
3380
  int32_t              code = TSDB_CODE_SUCCESS;
612,671✔
3381
  int32_t              line = 0;
612,671✔
3382
  SDataBlockDescNode*  pDescNode = pPhyciNode->node.pOutputDataBlockDesc;
612,671✔
3383

3384
  pInfo->vtbWindow.wstartSlotId = pPhyciNode->vtbWindow.wstartSlotId;
612,671✔
3385
  pInfo->vtbWindow.wendSlotId = pPhyciNode->vtbWindow.wendSlotId;
612,671✔
3386
  pInfo->vtbWindow.wdurationSlotId = pPhyciNode->vtbWindow.wdurationSlotId;
612,671✔
3387
  pInfo->vtbWindow.pTargets = pPhyciNode->vtbWindow.pTargets;
612,671✔
3388
  pInfo->vtbWindow.isVstb = pPhyciNode->vtbWindow.isVstb;
612,671✔
3389
  pInfo->vtbWindow.extendOption = pPhyciNode->vtbWindow.extendOption;
612,671✔
3390

3391
  pInfo->vtbWindow.pRes = createDataBlockFromDescNode(pDescNode);
612,671✔
3392
  QUERY_CHECK_NULL(pInfo->vtbWindow.pRes, code, line, _return, terrno)
612,671✔
3393

3394
  pInfo->vtbWindow.pWins = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
612,671✔
3395
  QUERY_CHECK_NULL(pInfo->vtbWindow.pWins, code, line, _return, terrno)
612,671✔
3396

3397
  pInfo->vtbWindow.outputWstartSlotId = -1;
612,671✔
3398
  pInfo->vtbWindow.outputWendSlotId = -1;
612,671✔
3399
  pInfo->vtbWindow.outputWdurationSlotId = -1;
612,671✔
3400
  pInfo->vtbWindow.curWinBatchIdx = 0;
612,671✔
3401

3402
  initResultSizeInfo(&pOperator->resultInfo, 1);
612,671✔
3403
  code = blockDataEnsureCapacity(pInfo->vtbWindow.pRes, pOperator->resultInfo.capacity);
612,671✔
3404
  QUERY_CHECK_CODE(code, line, _return);
612,671✔
3405

3406
  return code;
612,671✔
UNCOV
3407
_return:
×
3408
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
3409
  return code;
×
3410
}
3411

3412
static int32_t extractTsCol(SSDataBlock* pBlock, int32_t slotId, TSKEY** ppTsCols) {
3,559,846✔
3413
  int32_t code = TSDB_CODE_SUCCESS;
3,559,846✔
3414
  int32_t lino = 0;
3,559,846✔
3415

3416
  if (pBlock->pDataBlock != NULL && pBlock->info.dataLoad) {
3,559,846✔
3417
    SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, slotId);
3,559,846✔
3418
    QUERY_CHECK_NULL(pColDataInfo, code, lino, _return, terrno)
3,559,846✔
3419

3420
    *ppTsCols = (int64_t*)pColDataInfo->pData;
3,559,846✔
3421

3422
    if ((*ppTsCols)[0] != 0 && (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0)) {
3,559,846✔
3423
      code = blockDataUpdateTsWindow(pBlock, slotId);
400,225✔
3424
      QUERY_CHECK_CODE(code, lino, _return);
400,225✔
3425
    }
3426
  }
3427

3428
  return code;
3,559,846✔
UNCOV
3429
_return:
×
3430
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3431
  return code;
×
3432
}
3433

3434
int32_t vtbWindowOpen(SOperatorInfo* pOperator) {
680,952✔
3435
  int32_t                    code = TSDB_CODE_SUCCESS;
680,952✔
3436
  int32_t                    lino = 0;
680,952✔
3437
  SDynQueryCtrlOperatorInfo* pDynInfo = pOperator->info;
680,952✔
3438
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
680,952✔
3439
  SVtbWindowDynCtrlInfo*     pInfo = &pDynInfo->vtbWindow;
680,952✔
3440

3441
  if (OPTR_IS_OPENED(pOperator)) {
680,952✔
3442
    return code;
68,281✔
3443
  }
3444

3445
  while (1) {
1,779,923✔
3446
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
2,392,594✔
3447
    if (pBlock == NULL) {
2,391,977✔
3448
      break;
612,054✔
3449
    }
3450

3451
    if (pInfo->outputWendSlotId == -1 && pInfo->outputWstartSlotId == -1 && pInfo->outputWdurationSlotId == -1) {
1,779,923✔
3452
      for (int32_t i = 0; i < LIST_LENGTH(pInfo->pTargets); ++i) {
4,328,296✔
3453
        STargetNode* pNode = (STargetNode*)nodesListGetNode(pInfo->pTargets, i);
3,716,873✔
3454
        if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode->pExpr)->dataBlockId == pBlock->info.id.blockId) {
3,716,873✔
3455
          if (((SColumnNode*)pNode->pExpr)->slotId == pDynInfo->vtbWindow.wstartSlotId) {
1,024,906✔
3456
            pInfo->outputWstartSlotId = i;
399,137✔
3457
          } else if (((SColumnNode*)pNode->pExpr)->slotId == pDynInfo->vtbWindow.wendSlotId) {
625,769✔
3458
            pInfo->outputWendSlotId = i;
399,137✔
3459
          } else if (((SColumnNode*)pNode->pExpr)->slotId == pDynInfo->vtbWindow.wdurationSlotId) {
226,632✔
3460
            pInfo->outputWdurationSlotId = i;
226,632✔
3461
          }
3462
        }
3463
      }
3464
    }
3465

3466
    TSKEY* wstartCol = NULL;
1,779,923✔
3467
    TSKEY* wendCol = NULL;
1,779,923✔
3468

3469
    code = extractTsCol(pBlock, pDynInfo->vtbWindow.wstartSlotId, &wstartCol);
1,779,923✔
3470
    QUERY_CHECK_CODE(code, lino, _return);
1,779,923✔
3471
    code = extractTsCol(pBlock, pDynInfo->vtbWindow.wendSlotId, &wendCol);
1,779,923✔
3472
    QUERY_CHECK_CODE(code, lino, _return);
1,779,923✔
3473

3474
    SArray* pWin = taosArrayInit(pBlock->info.rows, sizeof(SExtWinTimeWindow));
1,779,923✔
3475
    QUERY_CHECK_NULL(pWin, code, lino, _return, terrno)
1,779,923✔
3476

3477
    QUERY_CHECK_NULL(taosArrayReserve(pWin, pBlock->info.rows), code, lino, _return, terrno);
1,779,923✔
3478

3479
    for (int32_t i = 0; i < pBlock->info.rows; i++) {
2,147,483,647✔
3480
      SExtWinTimeWindow* pWindow = taosArrayGet(pWin, i);
2,147,483,647✔
3481
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
2,147,483,647✔
3482
      pWindow->tw.skey = wstartCol[i];
2,147,483,647✔
3483
      pWindow->tw.ekey = wendCol[i] + 1;
2,147,483,647✔
3484
      pWindow->resWinIdx = -1;
2,147,483,647✔
3485
    }
3486

3487
    QUERY_CHECK_NULL(taosArrayPush(pDynInfo->vtbWindow.pWins, &pWin), code, lino, _return, terrno);
3,559,846✔
3488
  }
3489

3490
  // handle first window's start key and last window's end key
3491
  int32_t winBatchNum = (int32_t)taosArrayGetSize(pDynInfo->vtbWindow.pWins);
612,054✔
3492
  if (winBatchNum > 0) {
612,054✔
3493
    SArray* firstBatch = (SArray*)taosArrayGetP(pDynInfo->vtbWindow.pWins, 0);
611,423✔
3494
    SArray* lastBatch = (SArray*)taosArrayGetP(pDynInfo->vtbWindow.pWins, winBatchNum - 1);
611,423✔
3495

3496
    QUERY_CHECK_NULL(firstBatch, code, lino, _return, terrno)
611,423✔
3497
    QUERY_CHECK_NULL(lastBatch, code, lino, _return, terrno)
611,423✔
3498

3499
    SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(firstBatch, 0);
611,423✔
3500
    SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(lastBatch);
611,423✔
3501

3502
    QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
611,423✔
3503
    QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)
611,423✔
3504

3505
    if (pInfo->extendOption == STATE_WIN_EXTEND_OPTION_BACKWARD) {
611,423✔
3506
      lastWin->tw.ekey = INT64_MAX;
172,487✔
3507
    }
3508
    if (pInfo->extendOption == STATE_WIN_EXTEND_OPTION_FORWARD) {
611,423✔
3509
      firstWin->tw.skey = INT64_MIN;
172,487✔
3510
    }
3511
  }
3512

3513
  if (pInfo->isVstb) {
612,054✔
3514
    code = buildVirtualSuperTableScanChildTableMap(pOperator);
306,768✔
3515
    QUERY_CHECK_CODE(code, lino, _return);
306,768✔
3516
  }
3517

3518
  OPTR_SET_OPENED(pOperator);
612,054✔
3519
_return:
612,054✔
3520
  if (code != TSDB_CODE_SUCCESS) {
612,054✔
UNCOV
3521
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3522
    pTaskInfo->code = code;
×
3523
    T_LONG_JMP(pTaskInfo->env, code);
×
3524
  }
3525
  return code;
612,054✔
3526
}
3527

3528
static int32_t buildExternalWindowOperatorParamEx(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins, int32_t idx) {
334,073✔
3529
  int32_t                       code = TSDB_CODE_SUCCESS;
334,073✔
3530
  int32_t                       lino = 0;
334,073✔
3531
  SExternalWindowOperatorParam* pExtWinOp = NULL;
334,073✔
3532

3533
  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
334,073✔
3534
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
334,073✔
3535

3536
  pExtWinOp = taosMemoryMalloc(sizeof(SExternalWindowOperatorParam));
334,073✔
3537
  QUERY_CHECK_NULL(pExtWinOp, code, lino, _return, terrno)
334,073✔
3538

3539
  pExtWinOp->ExtWins = taosArrayDup(pWins, NULL);
334,073✔
3540
  QUERY_CHECK_NULL(pExtWinOp->ExtWins, code, lino, _return, terrno)
334,073✔
3541

3542
  SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(pWins, 0);
334,073✔
3543
  SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(pWins);
334,073✔
3544

3545
  QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno);
334,073✔
3546
  QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno);
334,073✔
3547

3548
  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
334,073✔
3549
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)
334,073✔
3550

3551
  SOperatorParam* pExchangeParam = NULL;
334,073✔
3552
  code = buildBatchExchangeOperatorParamForVirtual(
334,073✔
3553
      &pExchangeParam, 0, NULL, 0, pInfo->vtbScan.otbVgIdToOtbInfoArrayMap,
3554
      (STimeWindow){.skey = firstWin->tw.skey, .ekey = lastWin->tw.ekey}, EX_SRC_TYPE_VSTB_WIN_SCAN,
334,073✔
3555
      QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
3556
  QUERY_CHECK_CODE(code, lino, _return);
334,073✔
3557

3558
  QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pExchangeParam), code, lino, _return, terrno)
668,146✔
3559

3560
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW;
334,073✔
3561
  (*ppRes)->downstreamIdx = idx;
334,073✔
3562
  (*ppRes)->value = pExtWinOp;
334,073✔
3563
  (*ppRes)->reUse = false;
334,073✔
3564

3565
  return code;
334,073✔
UNCOV
3566
_return:
×
3567
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3568
  if (pExtWinOp) {
×
3569
    if (pExtWinOp->ExtWins) {
×
3570
      taosArrayDestroy(pExtWinOp->ExtWins);
×
3571
    }
UNCOV
3572
    taosMemoryFree(pExtWinOp);
×
3573
  }
UNCOV
3574
  if (*ppRes) {
×
3575
    if ((*ppRes)->pChildren) {
×
3576
      for (int32_t i = 0; i < taosArrayGetSize((*ppRes)->pChildren); i++) {
×
3577
        SOperatorParam* pChildParam = taosArrayGetP((*ppRes)->pChildren, i);
×
3578
        if (pChildParam) {
×
3579
          freeOperatorParam(pChildParam, OP_GET_PARAM);
×
3580
        }
3581
      }
UNCOV
3582
      taosArrayDestroy((*ppRes)->pChildren);
×
3583
    }
UNCOV
3584
    taosMemoryFree(*ppRes);
×
3585
    *ppRes = NULL;
×
3586
  }
UNCOV
3587
  return code;
×
3588
}
3589

3590
int32_t vtbWindowNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
680,952✔
3591
  int32_t                    code = TSDB_CODE_SUCCESS;
680,952✔
3592
  int32_t                    lino = 0;
680,952✔
3593
  SDynQueryCtrlOperatorInfo* pDynInfo = pOperator->info;
680,952✔
3594
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
680,952✔
3595
  int32_t                    numOfWins = 0;
680,952✔
3596
  SOperatorInfo*             mergeOp = NULL;
680,952✔
3597
  SOperatorInfo*             extWinOp = NULL;
680,952✔
3598
  SOperatorParam*            pMergeParam = NULL;
680,952✔
3599
  SOperatorParam*            pExtWinParam = NULL;
680,952✔
3600
  SVtbWindowDynCtrlInfo*     pInfo = &pDynInfo->vtbWindow;
680,952✔
3601
  SSDataBlock*               pRes = pInfo->pRes;
680,952✔
3602

3603
  code = pOperator->fpSet._openFn(pOperator);
680,952✔
3604
  QUERY_CHECK_CODE(code, lino, _return);
680,335✔
3605

3606
  if (pInfo->curWinBatchIdx >= taosArrayGetSize(pInfo->pWins)) {
680,335✔
3607
    *ppRes = NULL;
33,530✔
3608
    return code;
33,530✔
3609
  }
3610

3611
  SArray* pWinArray = (SArray*)taosArrayGetP(pInfo->pWins, pInfo->curWinBatchIdx);
646,805✔
3612
  QUERY_CHECK_NULL(pWinArray, code, lino, _return, terrno)
646,805✔
3613

3614
  numOfWins = (int32_t)taosArrayGetSize(pWinArray);
646,805✔
3615

3616
  if (pInfo->isVstb) {
646,805✔
3617
    extWinOp = pOperator->pDownstream[2];
334,073✔
3618
    code = buildExternalWindowOperatorParamEx(pDynInfo, &pExtWinParam, pWinArray, extWinOp->numOfDownstream);
334,073✔
3619
    QUERY_CHECK_CODE(code, lino, _return);
334,073✔
3620

3621
    SSDataBlock* pExtWinBlock = NULL;
334,073✔
3622
    code = extWinOp->fpSet.getNextExtFn(extWinOp, pExtWinParam, &pExtWinBlock);
334,073✔
3623
    QUERY_CHECK_CODE(code, lino, _return);
332,839✔
3624
    setOperatorCompleted(extWinOp);
332,839✔
3625
    // Free the parameter after operator completes, as it's been saved to the operator
3626
    if (extWinOp->pOperatorGetParam) {
332,839✔
UNCOV
3627
      freeOperatorParam(extWinOp->pOperatorGetParam, OP_GET_PARAM);
×
3628
      extWinOp->pOperatorGetParam = NULL;
×
3629
    }
3630
    // Also free downstream params if any
3631
    if (extWinOp->pDownstreamGetParams) {
332,839✔
3632
      for (int32_t i = 0; i < extWinOp->numOfDownstream; i++) {
665,678✔
3633
        if (extWinOp->pDownstreamGetParams[i]) {
332,839✔
UNCOV
3634
          freeOperatorParam(extWinOp->pDownstreamGetParams[i], OP_GET_PARAM);
×
3635
          extWinOp->pDownstreamGetParams[i] = NULL;
×
3636
        }
3637
      }
3638
    }
3639

3640
    blockDataCleanup(pRes);
332,839✔
3641
    code = blockDataEnsureCapacity(pRes, numOfWins);
332,839✔
3642
    QUERY_CHECK_CODE(code, lino, _return);
332,839✔
3643

3644
    if (pExtWinBlock) {
332,839✔
3645
      code = copyColumnsValue(pInfo->pTargets, pExtWinBlock->info.id.blockId, pRes, pExtWinBlock, numOfWins);
332,839✔
3646
      QUERY_CHECK_CODE(code, lino, _return);
332,839✔
3647

3648
      if (pInfo->curWinBatchIdx == 0) {
332,839✔
3649
        // first batch, bound _wstart by upstream window range
3650
        SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(taosArrayGetP(pInfo->pWins, 0), 0);
304,903✔
3651
        QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
304,903✔
3652

3653
        firstWin->tw.skey = TMAX(firstWin->tw.skey, pExtWinBlock->info.window.skey);
304,903✔
3654
      }
3655
      if (pInfo->curWinBatchIdx == taosArrayGetSize(pInfo->pWins) - 1) {
332,839✔
3656
        // last batch, bound _wend by upstream window range
3657
        SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(taosArrayGetP(pInfo->pWins, taosArrayGetSize(pInfo->pWins) - 1));
39,253✔
3658
        QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)
39,253✔
3659

3660
        lastWin->tw.ekey = TMIN(lastWin->tw.ekey, pExtWinBlock->info.window.ekey + 1);
39,253✔
3661
      }
3662
    }
3663
  } else {
3664
    mergeOp = pOperator->pDownstream[1];
312,732✔
3665
    code = buildMergeOperatorParam(pDynInfo, &pMergeParam, pWinArray, mergeOp->numOfDownstream, numOfWins);
312,732✔
3666
    QUERY_CHECK_CODE(code, lino, _return);
312,732✔
3667

3668
    SSDataBlock* pMergedBlock = NULL;
312,732✔
3669
    code = mergeOp->fpSet.getNextExtFn(mergeOp, pMergeParam, &pMergedBlock);
312,732✔
3670
    QUERY_CHECK_CODE(code, lino, _return);
312,732✔
3671
    // Free the parameter after operator completes, as it's been saved to the operator
3672
    if (mergeOp->pOperatorGetParam) {
312,732✔
UNCOV
3673
      freeOperatorParam(mergeOp->pOperatorGetParam, OP_GET_PARAM);
×
3674
      mergeOp->pOperatorGetParam = NULL;
×
3675
    }
3676
    // Also free downstream params if any
3677
    if (mergeOp->pDownstreamGetParams) {
312,732✔
3678
      for (int32_t i = 0; i < mergeOp->numOfDownstream; i++) {
1,064,778✔
3679
        if (mergeOp->pDownstreamGetParams[i]) {
752,046✔
UNCOV
3680
          freeOperatorParam(mergeOp->pDownstreamGetParams[i], OP_GET_PARAM);
×
3681
          mergeOp->pDownstreamGetParams[i] = NULL;
×
3682
        }
3683
      }
3684
    }
3685

3686
    blockDataCleanup(pRes);
312,732✔
3687
    code = blockDataEnsureCapacity(pRes, numOfWins);
312,732✔
3688
    QUERY_CHECK_CODE(code, lino, _return);
312,732✔
3689

3690
    if (pMergedBlock) {
312,732✔
3691
      code = copyColumnsValue(pInfo->pTargets, pMergedBlock->info.id.blockId, pRes, pMergedBlock, numOfWins);
312,732✔
3692
      QUERY_CHECK_CODE(code, lino, _return);
312,732✔
3693

3694
      if (pInfo->curWinBatchIdx == 0) {
312,732✔
3695
        // first batch, bound _wstart by upstream window range
3696
        SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(taosArrayGetP(pInfo->pWins, 0), 0);
305,286✔
3697
        QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
305,286✔
3698

3699
        firstWin->tw.skey = TMAX(firstWin->tw.skey, pMergedBlock->info.window.skey);
305,286✔
3700
      }
3701
      if (pInfo->curWinBatchIdx == taosArrayGetSize(pInfo->pWins) - 1) {
312,732✔
3702
        // last batch, bound _wend by upstream window range
3703
        SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(taosArrayGetP(pInfo->pWins, taosArrayGetSize(pInfo->pWins) - 1));
7,446✔
3704
        QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)
7,446✔
3705

3706
        lastWin->tw.ekey = TMIN(lastWin->tw.ekey, pMergedBlock->info.window.ekey + 1);
7,446✔
3707
      }
3708
    }
3709
  }
3710

3711

3712
  if (pInfo->outputWstartSlotId != -1) {
645,571✔
3713
    SColumnInfoData* pWstartCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWstartSlotId);
433,285✔
3714
    QUERY_CHECK_NULL(pWstartCol, code, lino, _return, terrno)
433,285✔
3715

3716
    for (int32_t i = 0; i < numOfWins; i++) {
1,242,152,702✔
3717
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
1,241,719,417✔
3718
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
1,241,719,417✔
3719
      code = colDataSetVal(pWstartCol, i, (const char*)&pWindow->tw.skey, false);
1,241,719,417✔
3720
      QUERY_CHECK_CODE(code, lino, _return);
1,241,719,417✔
3721
    }
3722
  }
3723
  if (pInfo->outputWendSlotId != -1) {
645,571✔
3724
    SColumnInfoData* pWendCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWendSlotId);
433,285✔
3725
    QUERY_CHECK_NULL(pWendCol, code, lino, _return, terrno)
433,285✔
3726

3727
    for (int32_t i = 0; i < numOfWins; i++) {
1,242,152,702✔
3728
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
1,241,719,417✔
3729
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
1,241,719,417✔
3730
      TSKEY ekey = pWindow->tw.ekey - 1;
1,241,719,417✔
3731
      code = colDataSetVal(pWendCol, i, (const char*)&ekey, false);
1,241,719,417✔
3732
      QUERY_CHECK_CODE(code, lino, _return);
1,241,719,417✔
3733
    }
3734
  }
3735
  if (pInfo->outputWdurationSlotId != -1) {
645,571✔
3736
    SColumnInfoData* pWdurationCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWdurationSlotId);
258,228✔
3737
    QUERY_CHECK_NULL(pWdurationCol, code, lino, _return, terrno)
258,228✔
3738

3739
    for (int32_t i = 0; i < numOfWins; i++) {
773,517,864✔
3740
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
773,259,636✔
3741
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
773,259,636✔
3742
      int64_t duration = pWindow->tw.ekey - 1 - pWindow->tw.skey;
773,259,636✔
3743
      code = colDataSetVal(pWdurationCol, i, (const char*)&duration, false);
773,259,636✔
3744
      QUERY_CHECK_CODE(code, lino, _return);
773,259,636✔
3745
    }
3746
  }
3747

3748
  pRes->info.rows = numOfWins;
645,571✔
3749
  *ppRes = pRes;
645,571✔
3750
  pInfo->curWinBatchIdx++;
645,571✔
3751

3752
  return code;
645,571✔
3753

UNCOV
3754
_return:
×
3755
  if (code != TSDB_CODE_SUCCESS) {
×
3756
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3757
    pTaskInfo->code = code;
×
3758
    T_LONG_JMP(pTaskInfo->env, code);
×
3759
  }
UNCOV
3760
  return code;
×
3761
}
3762

3763
static int32_t resetDynQueryCtrlOperState(SOperatorInfo* pOper) {
2,323,422✔
3764
  SDynQueryCtrlOperatorInfo*    pDyn = pOper->info;
2,323,422✔
3765
  SDynQueryCtrlPhysiNode const* pPhyciNode = pOper->pPhyNode;
2,324,592✔
3766
  SExecTaskInfo*                pTaskInfo = pOper->pTaskInfo;
2,325,294✔
3767

3768
  pOper->status = OP_NOT_OPENED;
2,325,294✔
3769

3770
  switch (pDyn->qType) {
2,325,294✔
3771
    case DYN_QTYPE_STB_HASH:{
831✔
3772
      pDyn->stbJoin.execInfo = (SDynQueryCtrlExecInfo){0};
831✔
3773
      SStbJoinDynCtrlInfo* pStbJoin = &pDyn->stbJoin;
831✔
3774
      destroyStbJoinDynCtrlInfo(&pDyn->stbJoin);
831✔
3775
      
3776
      int32_t code = initSeqStbJoinTableHash(&pDyn->stbJoin.ctx.prev, pDyn->stbJoin.basic.batchFetch);
831✔
3777
      if (TSDB_CODE_SUCCESS != code) {
831✔
UNCOV
3778
        qError("initSeqStbJoinTableHash failed since %s", tstrerror(code));
×
3779
        return code;
×
3780
      }
3781
      pStbJoin->ctx.prev.pListHead = NULL;
831✔
3782
      pStbJoin->ctx.prev.joinBuild = false;
831✔
3783
      pStbJoin->ctx.prev.pListTail = NULL;
831✔
3784
      pStbJoin->ctx.prev.tableNum = 0;
831✔
3785

3786
      pStbJoin->ctx.post = (SStbJoinPostJoinCtx){0};
831✔
3787
      break; 
831✔
3788
    }
3789
    case DYN_QTYPE_VTB_SCAN:
2,323,761✔
3790
    case DYN_QTYPE_VTB_TS_SCAN:
3791
    case DYN_QTYPE_VTB_AGG:
3792
    case DYN_QTYPE_VTB_INTERVAL: {
3793
      SVtbScanDynCtrlInfo* pVtbScan = &pDyn->vtbScan;
2,323,761✔
3794
      
3795
      if (pVtbScan->otbNameToOtbInfoMap) {
2,323,761✔
UNCOV
3796
        taosHashSetFreeFp(pVtbScan->otbNameToOtbInfoMap, destroySOrgTbInfo);
×
3797
        taosHashCleanup(pVtbScan->otbNameToOtbInfoMap);
×
3798
        pVtbScan->otbNameToOtbInfoMap = NULL;
×
3799
      }
3800
      if (pVtbScan->pRsp) {
2,323,761✔
UNCOV
3801
        tFreeSUsedbRsp(pVtbScan->pRsp);
×
3802
        taosMemoryFreeClear(pVtbScan->pRsp);
×
3803
      }
3804
      if (pVtbScan->colRefInfo) {
2,323,059✔
3805
        taosArrayDestroyEx(pVtbScan->colRefInfo, destroyColRefInfo);
148,625✔
3806
        pVtbScan->colRefInfo = NULL;
148,625✔
3807
      }
3808
      if (pVtbScan->childTableMap) {
2,323,293✔
3809
        taosHashCleanup(pVtbScan->childTableMap);
6,108✔
3810
        pVtbScan->childTableMap = NULL;
6,108✔
3811
      }
3812
      if (pVtbScan->childTableList) {
2,322,123✔
3813
        taosArrayClearEx(pVtbScan->childTableList, destroyColRefArray);
2,322,123✔
3814
      }
3815
      if (pPhyciNode->dynTbname && pTaskInfo) {
2,323,761✔
UNCOV
3816
        updateDynTbUidIfNeeded(pVtbScan, pTaskInfo->pStreamRuntimeInfo);
×
3817
      }
3818
      pVtbScan->curTableIdx = 0;
2,323,995✔
3819
      pVtbScan->lastTableIdx = -1;
2,323,527✔
3820
      break;
2,323,761✔
3821
    }
UNCOV
3822
    case DYN_QTYPE_VTB_WINDOW: {
×
3823
      SVtbWindowDynCtrlInfo* pVtbWindow = &pDyn->vtbWindow;
×
3824
      if (pVtbWindow->pRes) {
×
3825
        blockDataDestroy(pVtbWindow->pRes);
×
3826
        pVtbWindow->pRes = NULL;
×
3827
      }
UNCOV
3828
      if (pVtbWindow->pWins) {
×
3829
        taosArrayDestroyEx(pVtbWindow->pWins, destroyWinArray);
×
3830
        pVtbWindow->pWins = NULL;
×
3831
      }
UNCOV
3832
      pVtbWindow->outputWdurationSlotId = -1;
×
3833
      pVtbWindow->outputWendSlotId = -1;
×
3834
      pVtbWindow->outputWstartSlotId = -1;
×
3835
      pVtbWindow->curWinBatchIdx = 0;
×
3836
      break;
×
3837
    }
UNCOV
3838
    default:
×
3839
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
×
3840
      break;
×
3841
  }
3842
  return 0;
2,324,124✔
3843
}
3844

3845
int32_t virtualTableAggGetNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
1,426,756✔
3846
  int32_t                    code = TSDB_CODE_SUCCESS;
1,426,756✔
3847
  int32_t                    line = 0;
1,426,756✔
3848
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
1,426,756✔
3849
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
1,426,756✔
3850
  SOperatorInfo*             pAggOp = pOperator->pDownstream[pOperator->numOfDownstream - 1];
1,426,756✔
3851
  SOperatorInfo*             pTagScanOp = pOperator->pDownstream[1];
1,426,756✔
3852
  SOperatorParam*            pAggParam = NULL;
1,426,756✔
3853

3854
  if (pInfo->vtbScan.hasPartition) {
1,426,756✔
3855
    if (pInfo->vtbScan.batchProcessChild) {
1,147,967✔
3856
      void* pIter = taosHashIterate(pVtbScan->vtbGroupIdTagListMap, NULL);
463,007✔
3857
      while (pIter) {
960,665✔
3858
        size_t     keyLen = 0;
838,187✔
3859
        uint64_t   groupid = *(uint64_t*)taosHashGetKey(pIter, &keyLen);
838,187✔
3860

3861
        code = buildAggOperatorParamWithGroupId(pInfo, groupid, &pAggParam);
838,187✔
3862
        QUERY_CHECK_CODE(code, line, _return);
838,187✔
3863

3864
        if (pAggParam) {
838,187✔
3865
          code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
838,187✔
3866
          QUERY_CHECK_CODE(code, line, _return);
837,570✔
3867
        } else {
UNCOV
3868
          *pRes = NULL;
×
3869
        }
3870

3871
        pIter = taosHashIterate(pVtbScan->vtbGroupIdTagListMap, pIter);
837,570✔
3872

3873
        if (*pRes) {
837,570✔
3874
          (*pRes)->info.id.groupId = groupid;
339,912✔
3875
          code = taosHashRemove(pVtbScan->vtbGroupIdTagListMap, &groupid, keyLen);
339,912✔
3876
          QUERY_CHECK_CODE(code, line, _return);
339,912✔
3877
          break;
339,912✔
3878
        }
3879
      }
3880
    } else {
3881
      void *pIter = taosHashIterate(pVtbScan->vtbGroupIdTagListMap, NULL);
684,960✔
3882
      while (pIter) {
940,164✔
3883
        size_t     keyLen = 0;
880,116✔
3884
        uint64_t*  groupid = (uint64_t*)taosHashGetKey(pIter, &keyLen);
880,116✔
3885
        SHashObj*  vtbUidTagListMap = *(SHashObj**)pIter;
880,116✔
3886

3887
        void* pIter2 = taosHashIterate(vtbUidTagListMap, NULL);
880,116✔
3888
        while (pIter2) {
1,420,548✔
3889
          size_t   keyLen2 = 0;
1,165,344✔
3890
          tb_uid_t uid = *(tb_uid_t*)taosHashGetKey(pIter2, &keyLen2);
1,165,344✔
3891
          SArray*  pTagList = *(SArray**)pIter2;
1,165,344✔
3892

3893
          if (pVtbScan->genNewParam) {
1,165,344✔
3894
            code = buildAggOperatorParamForSingleChild(pInfo, uid, *groupid, pTagList, &pAggParam);
540,432✔
3895
            QUERY_CHECK_CODE(code, line, _return);
540,432✔
3896
            if (pAggParam) {
540,432✔
3897
              code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
540,432✔
3898
              QUERY_CHECK_CODE(code, line, _return);
540,432✔
3899
            } else {
UNCOV
3900
              *pRes = NULL;
×
3901
            }
3902
          } else {
3903
            code = pAggOp->fpSet.getNextFn(pAggOp, pRes);
624,912✔
3904
            QUERY_CHECK_CODE(code, line, _return);
624,912✔
3905
          }
3906

3907
          if (*pRes) {
1,165,344✔
3908
            pVtbScan->genNewParam = false;
624,912✔
3909
            (*pRes)->info.id.groupId = *groupid;
624,912✔
3910
            break;
624,912✔
3911
          }
3912
          pVtbScan->genNewParam = true;
540,432✔
3913
          pIter2 = taosHashIterate(vtbUidTagListMap, pIter2);
540,432✔
3914
          code = taosHashRemove(vtbUidTagListMap, &uid, keyLen);
540,432✔
3915
          QUERY_CHECK_CODE(code, line, _return);
540,432✔
3916
        }
3917
        if (*pRes) {
880,116✔
3918
          break;
624,912✔
3919
        }
3920
        pIter = taosHashIterate(pVtbScan->vtbGroupIdTagListMap, pIter);
255,204✔
3921
        code = taosHashRemove(pVtbScan->vtbGroupIdTagListMap, groupid, keyLen);
255,204✔
3922
        QUERY_CHECK_CODE(code, line, _return);
255,204✔
3923
      }
3924
    }
3925

3926
  } else {
3927
    if (pInfo->vtbScan.batchProcessChild) {
278,789✔
3928
      code = buildAggOperatorParam(pInfo, &pAggParam);
44,061✔
3929
      QUERY_CHECK_CODE(code, line, _return);
44,061✔
3930

3931
      code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
44,061✔
3932
      QUERY_CHECK_CODE(code, line, _return);
43,444✔
3933
      setOperatorCompleted(pOperator);
43,444✔
3934
    } else {
3935
      void* pIter = taosHashIterate(pVtbScan->vtbUidTagListMap, NULL);
234,728✔
3936
      while (pIter) {
389,356✔
3937
        size_t   keyLen = 0;
369,464✔
3938
        tb_uid_t uid = *(tb_uid_t*)taosHashGetKey(pIter, &keyLen);
369,464✔
3939
        SArray*  pTagList = *(SArray**)pIter;
369,464✔
3940

3941
        if (pVtbScan->genNewParam) {
369,464✔
3942
          code = buildAggOperatorParamForSingleChild(pInfo, uid, 0, pTagList, &pAggParam);
154,628✔
3943
          QUERY_CHECK_CODE(code, line, _return);
154,628✔
3944

3945
          if (pAggParam) {
154,628✔
3946
            code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
154,628✔
3947
            QUERY_CHECK_CODE(code, line, _return);
154,628✔
3948
          } else {
UNCOV
3949
            *pRes = NULL;
×
3950
          }
3951
        } else {
3952
          code = pAggOp->fpSet.getNextFn(pAggOp, pRes);
214,836✔
3953
          QUERY_CHECK_CODE(code, line, _return);
214,836✔
3954
        }
3955

3956
        if (*pRes) {
369,464✔
3957
          pVtbScan->genNewParam = false;
214,836✔
3958
          break;
214,836✔
3959
        }
3960
        pVtbScan->genNewParam = true;
154,628✔
3961
        pIter = taosHashIterate(pVtbScan->vtbUidTagListMap, pIter);
154,628✔
3962
        code = taosHashRemove(pVtbScan->vtbUidTagListMap, &uid, keyLen);
154,628✔
3963
        QUERY_CHECK_CODE(code, line, _return);
154,628✔
3964
      }
3965
    }
3966
  }
3967
_return:
1,425,522✔
3968
  if (code) {
1,425,522✔
UNCOV
3969
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
3970
  }
3971
  return code;
1,425,522✔
3972
}
3973

3974
int32_t vtbAggNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
1,470,200✔
3975
  int32_t                    code = TSDB_CODE_SUCCESS;
1,470,200✔
3976
  int32_t                    line = 0;
1,470,200✔
3977
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
1,470,200✔
3978
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
1,470,200✔
3979

3980
  QRY_PARAM_CHECK(pRes);
1,470,200✔
3981
  if (pOperator->status == OP_EXEC_DONE) {
1,470,200✔
3982
    return code;
43,444✔
3983
  }
3984

3985
  code = pOperator->fpSet._openFn(pOperator);
1,426,756✔
3986
  QUERY_CHECK_CODE(code, line, _return);
1,426,756✔
3987

3988
  if (pVtbScan->isSuperTable && taosArrayGetSize(pVtbScan->childTableList) == 0) {
1,426,756✔
UNCOV
3989
    setOperatorCompleted(pOperator);
×
3990
    return code;
×
3991
  }
3992

3993
  code = virtualTableAggGetNext(pOperator, pRes);
1,426,756✔
3994
  QUERY_CHECK_CODE(code, line, _return);
1,425,522✔
3995

3996
  return code;
1,425,522✔
3997

UNCOV
3998
_return:
×
3999
  if (code) {
×
4000
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
4001
    pOperator->pTaskInfo->code = code;
×
4002
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
4003
  }
UNCOV
4004
  return code;
×
4005
}
4006

4007
/*
4008
 * Build hash-interval operator params for interval dynamic query.
4009
 *
4010
 * @param pInfo Dynamic-query control operator runtime info.
4011
 * @param ppRes Output interval operator param.
4012
 *
4013
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
4014
 */
UNCOV
4015
static int32_t buildHashIntervalOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes) {
×
4016
  int32_t                   code = TSDB_CODE_SUCCESS;
×
4017
  int32_t                   lino = 0;
×
4018
  SOperatorParam*           pParam = NULL;
×
4019
  SOperatorParam*           pExchangeParam = NULL;
×
4020
  SVtbScanDynCtrlInfo*      pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
×
4021

UNCOV
4022
  pParam = taosMemoryCalloc(1, sizeof(SOperatorParam));
×
4023
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)
×
4024

UNCOV
4025
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL;
×
4026
  pParam->downstreamIdx = 0;
×
4027
  pParam->reUse = false;
×
4028
  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
×
4029
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno)
×
4030

UNCOV
4031
  pParam->value = taosMemoryMalloc(sizeof(SAggOperatorParam));
×
4032
  QUERY_CHECK_NULL(pParam->value, code, lino, _return, terrno)
×
4033

UNCOV
4034
  code = buildBatchExchangeOperatorParamForVirtual(
×
4035
      &pExchangeParam, 0, NULL, 0, pVtbScan->otbVgIdToOtbInfoArrayMap,
UNCOV
4036
      (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN}, EX_SRC_TYPE_VSTB_INTERVAL_SCAN,
×
4037
      QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
UNCOV
4038
  QUERY_CHECK_CODE(code, lino, _return);
×
4039

UNCOV
4040
  QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pExchangeParam), code, lino, _return, terrno)
×
4041
  pExchangeParam = NULL;
×
4042

UNCOV
4043
  *ppRes = pParam;
×
4044

UNCOV
4045
  return code;
×
4046
_return:
×
4047
  if (pExchangeParam) {
×
4048
    freeOperatorParam(pExchangeParam, OP_GET_PARAM);
×
4049
  }
UNCOV
4050
  if (pParam) {
×
4051
    freeOperatorParam(pParam, OP_GET_PARAM);
×
4052
  }
UNCOV
4053
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
4054
  return code;
×
4055
}
4056

4057
/*
4058
 * Build merge operator params for split-interval execution.
4059
 *
4060
 * @param pInfo Dynamic-query control operator runtime info.
4061
 * @param pMergeOp Merge operator used by interval execution.
4062
 * @param ppRes Output merge operator param.
4063
 *
4064
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
4065
 */
4066
static int32_t buildSplitIntervalMergeOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorInfo* pMergeOp,
36,842✔
4067
                                                    SOperatorParam** ppRes) {
4068
  int32_t              code = TSDB_CODE_SUCCESS;
36,842✔
4069
  int32_t              lino = 0;
36,842✔
4070
  SVtbScanDynCtrlInfo* pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
36,842✔
4071
  SOperatorParam*      pExchangeParam = NULL;
36,842✔
4072
  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
36,842✔
4073
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
36,842✔
4074

4075
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE;
36,842✔
4076
  (*ppRes)->downstreamIdx = 0;
36,842✔
4077
  (*ppRes)->reUse = false;
36,842✔
4078
  (*ppRes)->value = taosMemoryCalloc(1, sizeof(SMergeOperatorParam));
36,842✔
4079
  QUERY_CHECK_NULL((*ppRes)->value, code, lino, _return, terrno)
36,842✔
4080

4081
  (*ppRes)->pChildren = taosArrayInit(pMergeOp->numOfDownstream, POINTER_BYTES);
36,842✔
4082
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)
36,842✔
4083

4084
  for (int32_t i = 0; i < pMergeOp->numOfDownstream; ++i) {
250,890✔
4085
    code = buildBatchExchangeOperatorParamForVirtual(
214,048✔
4086
        &pExchangeParam, i, NULL, 0, pVtbScan->otbVgIdToOtbInfoArrayMap,
4087
        (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN}, EX_SRC_TYPE_VSTB_PART_INTERVAL_SCAN,
214,048✔
4088
        QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
4089
    QUERY_CHECK_CODE(code, lino, _return);
214,048✔
4090
    QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pExchangeParam), code, lino, _return, terrno)
428,096✔
4091
    pExchangeParam = NULL;
214,048✔
4092
  }
4093

4094
  return code;
36,842✔
4095

UNCOV
4096
_return:
×
4097
  if (pExchangeParam) {
×
4098
    freeOperatorParam(pExchangeParam, OP_GET_PARAM);
×
4099
  }
UNCOV
4100
  freeOperatorParam(*ppRes, OP_GET_PARAM);
×
4101
  *ppRes = NULL;
×
4102
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
4103
  return code;
×
4104
}
4105

4106
/*
4107
 * Build split-interval root operator params with one merge child.
4108
 *
4109
 * @param pInfo Dynamic-query control operator runtime info.
4110
 * @param pIntervalOp Interval operator whose downstream contains merge node.
4111
 * @param ppRes Output split-interval operator param.
4112
 *
4113
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
4114
 */
4115
static int32_t buildSplitIntervalOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorInfo* pIntervalOp,
36,842✔
4116
                                               SOperatorParam** ppRes) {
4117
  int32_t         code = TSDB_CODE_SUCCESS;
36,842✔
4118
  int32_t         lino = 0;
36,842✔
4119
  SOperatorInfo*  pMergeOp = NULL;
36,842✔
4120
  SOperatorParam* pMergeParam = NULL;
36,842✔
4121

4122
  QUERY_CHECK_NULL(pIntervalOp->pDownstream, code, lino, _return, TSDB_CODE_INVALID_PARA)
36,842✔
4123
  pMergeOp = pIntervalOp->pDownstream[0];
36,842✔
4124
  QUERY_CHECK_NULL(pMergeOp, code, lino, _return, TSDB_CODE_INVALID_PARA)
36,842✔
4125
  if (QUERY_NODE_PHYSICAL_PLAN_MERGE != pMergeOp->operatorType) {
36,842✔
UNCOV
4126
    qError("%s invalid downstream operator type %d for split interval", __func__, pMergeOp->operatorType);
×
4127
    return TSDB_CODE_INVALID_PARA;
×
4128
  }
4129

4130
  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
36,842✔
4131
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
36,842✔
4132

4133
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL;
36,842✔
4134
  (*ppRes)->downstreamIdx = 0;
36,842✔
4135
  (*ppRes)->reUse = false;
36,842✔
4136
  (*ppRes)->value = taosMemoryCalloc(1, sizeof(SAggOperatorParam));
36,842✔
4137
  QUERY_CHECK_NULL((*ppRes)->value, code, lino, _return, terrno)
36,842✔
4138

4139
  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
36,842✔
4140
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)
36,842✔
4141

4142
  code = buildSplitIntervalMergeOperatorParam(pInfo, pMergeOp, &pMergeParam);
36,842✔
4143
  QUERY_CHECK_CODE(code, lino, _return);
36,842✔
4144
  QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pMergeParam), code, lino, _return, terrno)
73,684✔
4145
  pMergeParam = NULL;
36,842✔
4146

4147
  return code;
36,842✔
4148

UNCOV
4149
_return:
×
4150
  if (pMergeParam) {
×
4151
    freeOperatorParam(pMergeParam, OP_GET_PARAM);
×
4152
  }
UNCOV
4153
  freeOperatorParam(*ppRes, OP_GET_PARAM);
×
4154
  *ppRes = NULL;
×
4155
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
4156
  return code;
×
4157
}
4158

4159
/*
4160
 * Dispatch interval-param builder by interval operator physical type.
4161
 *
4162
 * @param pInfo Dynamic-query control operator runtime info.
4163
 * @param pIntervalOp Interval operator to build params for.
4164
 * @param ppRes Output interval operator param.
4165
 *
4166
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
4167
 */
4168
static int32_t buildIntervalOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorInfo* pIntervalOp,
36,842✔
4169
                                          SOperatorParam** ppRes) {
4170
  if (NULL == pIntervalOp) {
36,842✔
UNCOV
4171
    return TSDB_CODE_INVALID_PARA;
×
4172
  }
4173

4174
  switch (pIntervalOp->operatorType) {
36,842✔
UNCOV
4175
    case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL:
×
4176
      return buildHashIntervalOperatorParam(pInfo, ppRes);
×
4177
    case QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL:
36,842✔
4178
      return buildSplitIntervalOperatorParam(pInfo, pIntervalOp, ppRes);
36,842✔
UNCOV
4179
    default:
×
4180
      qError("%s unsupported interval operator type %d", __func__, pIntervalOp->operatorType);
×
4181
      return TSDB_CODE_INVALID_PARA;
×
4182
  }
4183
}
4184

4185
/*
4186
 * Get next result block for virtual-table interval workflow.
4187
 *
4188
 * @param pOperator Dynamic-query control operator.
4189
 * @param pRes Output result data block pointer.
4190
 *
4191
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
4192
 */
4193
int32_t vtbIntervalNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
81,692✔
4194
  int32_t                    code = TSDB_CODE_SUCCESS;
81,692✔
4195
  int32_t                    line = 0;
81,692✔
4196
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
81,692✔
4197
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
81,692✔
4198
  SOperatorInfo*             pInterval = pOperator->pDownstream[pOperator->numOfDownstream - 1];
81,692✔
4199
  SOperatorParam*            pIntervalParam = NULL;
81,692✔
4200
  SSDataBlock*               pResult = NULL;
81,692✔
4201

4202
  QRY_PARAM_CHECK(pRes);
81,692✔
4203
  if (pOperator->status == OP_EXEC_DONE) {
81,692✔
UNCOV
4204
    return code;
×
4205
  }
4206

4207
  code = pOperator->fpSet._openFn(pOperator);
81,692✔
4208
  QUERY_CHECK_CODE(code, line, _return);
81,692✔
4209

4210
  if (pVtbScan->genNewParam) {
81,692✔
4211
    qDebug("%s vtb interval split start, intervalOp:%s type:%d", GET_TASKID(pOperator->pTaskInfo), pInterval->name,
36,842✔
4212
           pInterval->operatorType);
4213
    code = buildIntervalOperatorParam(pInfo, pInterval, &pIntervalParam);
36,842✔
4214
    QUERY_CHECK_CODE(code, line, _return);
36,842✔
4215

4216
    code = pInterval->fpSet.getNextExtFn(pInterval, pIntervalParam, &pResult);
36,842✔
4217
    QUERY_CHECK_CODE(code, line, _return);
36,225✔
4218

4219
    pVtbScan->genNewParam = false;
36,225✔
4220
  } else {
4221
    code = pInterval->fpSet.getNextFn(pInterval, &pResult);
44,850✔
4222
    QUERY_CHECK_CODE(code, line, _return);
44,850✔
4223
  }
4224

4225
  if (pResult) {
81,075✔
4226
    qDebug("%s vtb interval got result rows:%" PRId64 " status:%d", GET_TASKID(pOperator->pTaskInfo),
44,850✔
4227
           pResult->info.rows, pInterval->status);
4228
    *pRes = pResult;
44,850✔
4229
  } else {
4230
    qDebug("%s vtb interval got empty result, interval status:%d", GET_TASKID(pOperator->pTaskInfo), pInterval->status);
36,225✔
4231
    *pRes = NULL;
36,225✔
4232
    setOperatorCompleted(pOperator);
36,225✔
4233
  }
4234

4235
  return code;
81,075✔
4236

UNCOV
4237
_return:
×
4238
  if (code) {
×
4239
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
4240
    pOperator->pTaskInfo->code = code;
×
4241
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
4242
  }
UNCOV
4243
  return code;
×
4244
}
4245

4246
int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
3,715,185✔
4247
                                       SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo,
4248
                                       SMsgCb* pMsgCb, SOperatorInfo** pOptrInfo) {
4249
  QRY_PARAM_CHECK(pOptrInfo);
3,715,185✔
4250

4251
  int32_t                    code = TSDB_CODE_SUCCESS;
3,715,185✔
4252
  int32_t                    line = 0;
3,715,185✔
4253
  __optr_fn_t                nextFp = NULL;
3,715,185✔
4254
  __optr_open_fn_t           openFp = NULL;
3,715,185✔
4255
  SOperatorInfo*             pOperator = NULL;
3,715,185✔
4256
  SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo));
3,715,185✔
4257
  QUERY_CHECK_NULL(pInfo, code, line, _error, terrno)
3,715,185✔
4258

4259
  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
3,715,185✔
4260
  QUERY_CHECK_NULL(pOperator, code, line, _error, terrno)
3,715,185✔
4261
  initOperatorCostInfo(pOperator);
3,715,185✔
4262

4263
  pOperator->pPhyNode = pPhyciNode;
3,715,185✔
4264
  pTaskInfo->dynamicTask = (int8_t)pPhyciNode->node.dynamicOp;
3,715,185✔
4265

4266
  code = appendDownstream(pOperator, pDownstream, numOfDownstream);
3,715,185✔
4267
  QUERY_CHECK_CODE(code, line, _error);
3,715,185✔
4268

4269
  setOperatorInfo(pOperator, "DynQueryCtrlOperator", QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, false, OP_NOT_OPENED,
3,715,185✔
4270
                  pInfo, pTaskInfo);
4271

4272
  pInfo->qType = pPhyciNode->qType;
3,715,185✔
4273
  switch (pInfo->qType) {
3,715,185✔
4274
    case DYN_QTYPE_STB_HASH:
1,223,124✔
4275
      TAOS_MEMCPY(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
1,223,124✔
4276
      pInfo->stbJoin.pOutputDataBlockDesc = pPhyciNode->node.pOutputDataBlockDesc;
1,223,124✔
4277
      code = initSeqStbJoinTableHash(&pInfo->stbJoin.ctx.prev, pInfo->stbJoin.basic.batchFetch);
1,223,124✔
4278
      QUERY_CHECK_CODE(code, line, _error);
1,223,124✔
4279
      nextFp = seqStableJoin;
1,223,124✔
4280
      openFp = optrDummyOpenFn;
1,223,124✔
4281
      break;
1,223,124✔
4282
    case DYN_QTYPE_VTB_SCAN:
1,546,535✔
4283
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
1,546,535✔
4284
      QUERY_CHECK_CODE(code, line, _error);
1,546,535✔
4285
      nextFp = vtbScanNext;
1,546,535✔
4286
      openFp = vtbScanOpen;
1,546,535✔
4287
      break;
1,546,535✔
4288
    case DYN_QTYPE_VTB_TS_SCAN:
48,917✔
4289
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
48,917✔
4290
      QUERY_CHECK_CODE(code, line, _error);
48,917✔
4291
      nextFp = vtbTsScanNext;
48,917✔
4292
      openFp = vtbDefaultOpen;
48,917✔
4293
      break;
48,917✔
4294
    case DYN_QTYPE_VTB_WINDOW:
612,671✔
4295
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
612,671✔
4296
      QUERY_CHECK_CODE(code, line, _error);
612,671✔
4297
      code = initVtbWindowInfo(pInfo, pPhyciNode, pTaskInfo, pOperator);
612,671✔
4298
      QUERY_CHECK_CODE(code, line, _error);
612,671✔
4299
      nextFp = vtbWindowNext;
612,671✔
4300
      openFp = vtbWindowOpen;
612,671✔
4301
      break;
612,671✔
4302
    case DYN_QTYPE_VTB_AGG:
247,096✔
4303
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
247,096✔
4304
      QUERY_CHECK_CODE(code, line, _error);
247,096✔
4305
      nextFp = vtbAggNext;
247,096✔
4306
      openFp = vtbDefaultOpen;
247,096✔
4307
      break;
247,096✔
4308
    case DYN_QTYPE_VTB_INTERVAL:
36,842✔
4309
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
36,842✔
4310
      QUERY_CHECK_CODE(code, line, _error);
36,842✔
4311
      nextFp = vtbIntervalNext;
36,842✔
4312
      openFp = vtbDefaultOpen;
36,842✔
4313
      break;
36,842✔
UNCOV
4314
    default:
×
4315
      qError("unsupported dynamic query ctrl type: %d", pInfo->qType);
×
4316
      code = TSDB_CODE_INVALID_PARA;
×
4317
      goto _error;
×
4318
  }
4319

4320
  pOperator->fpSet = createOperatorFpSet(openFp, nextFp, NULL, destroyDynQueryCtrlOperator, optrDefaultBufFn,
3,715,185✔
4321
                                         NULL, optrDefaultGetNextExtFn, NULL);
4322

4323
  setOperatorResetStateFn(pOperator, resetDynQueryCtrlOperState);
3,715,185✔
4324
  *pOptrInfo = pOperator;
3,715,185✔
4325
  return TSDB_CODE_SUCCESS;
3,715,185✔
4326

UNCOV
4327
_error:
×
4328
  if (pInfo != NULL) {
×
4329
    destroyDynQueryCtrlOperator(pInfo);
×
4330
  }
UNCOV
4331
  qError("failed to create dyn query ctrl operator, %s code:%s, line:%d", __func__, tstrerror(code), line);
×
4332
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
4333
  pTaskInfo->code = code;
×
4334
  return code;
×
4335
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc