• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4940

27 Jan 2026 10:23AM UTC coverage: 66.832% (-0.1%) from 66.931%
#4940

push

travis-ci

web-flow
fix: asan invalid write issue (#34400)

7 of 8 new or added lines in 2 files covered. (87.5%)

822 existing lines in 141 files now uncovered.

204293 of 305680 relevant lines covered (66.83%)

124534723.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.75
/source/libs/executor/src/dynqueryctrloperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "nodes.h"
18
#include "operator.h"
19
#include "os.h"
20
#include "plannodes.h"
21
#include "query.h"
22
#include "querynodes.h"
23
#include "querytask.h"
24
#include "tarray.h"
25
#include "tcompare.h"
26
#include "tdatablock.h"
27
#include "thash.h"
28
#include "tmsg.h"
29
#include "trpc.h"
30
#include "ttypes.h"
31
#include "tdataformat.h"
32
#include "dynqueryctrl.h"
33

34
int64_t gSessionId = 0;
35

36
void freeVgTableList(void* ptr) { 
×
37
  taosArrayDestroy(*(SArray**)ptr); 
×
38
}
×
39

40
// Walks the singly linked table list from `pListHead`, freeing the four
// per-node vg/uid buffers and then the node itself.
static void destroyStbJoinTableList(SStbJoinTableList* pListHead) {
  SStbJoinTableList* pCur = pListHead;

  while (NULL != pCur) {
    // Remember the successor before the node's memory is released.
    SStbJoinTableList* pFollowing = pCur->pNext;

    taosMemoryFree(pCur->pLeftVg);
    taosMemoryFree(pCur->pLeftUid);
    taosMemoryFree(pCur->pRightVg);
    taosMemoryFree(pCur->pRightUid);
    taosMemoryFree(pCur);

    pCur = pFollowing;
  }
}
1,144,720✔
53

54
// Logs the execution counters, then releases the hash tables and the pending
// table list held in the "prev" join context. Which hash tables are released
// depends on basic.batchFetch.
static void destroyStbJoinDynCtrlInfo(SStbJoinDynCtrlInfo* pStbJoin) {
  qDebug("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64 ", leftCacheNum:%" PRId64 ", rightCacheNum:%" PRId64,
         pStbJoin->execInfo.prevBlkNum, pStbJoin->execInfo.prevBlkRows, pStbJoin->execInfo.postBlkNum,
         pStbJoin->execInfo.postBlkRows, pStbJoin->execInfo.leftCacheNum, pStbJoin->execInfo.rightCacheNum);

  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;

  if (!pStbJoin->basic.batchFetch) {
    // Non-batch mode: plain caches, cleaned up without a free callback.
    if (NULL != pPrev->leftCache) {
      tSimpleHashCleanup(pPrev->leftCache);
    }
    if (NULL != pPrev->rightCache) {
      tSimpleHashCleanup(pPrev->rightCache);
    }
    if (NULL != pPrev->onceTable) {
      tSimpleHashCleanup(pPrev->onceTable);
    }
  } else {
    // Batch mode: values are SArray* lists, released through freeVgTableList.
    if (NULL != pPrev->leftHash) {
      tSimpleHashSetFreeFp(pPrev->leftHash, freeVgTableList);
      tSimpleHashCleanup(pPrev->leftHash);
    }
    if (NULL != pPrev->rightHash) {
      tSimpleHashSetFreeFp(pPrev->rightHash, freeVgTableList);
      tSimpleHashCleanup(pPrev->rightHash);
    }
  }

  destroyStbJoinTableList(pPrev->pListHead);
}
1,144,720✔
82

83
void destroyColRefInfo(void *info) {
63,429,148✔
84
  SColRefInfo *pColRefInfo = (SColRefInfo *)info;
63,429,148✔
85
  if (pColRefInfo) {
63,429,148✔
86
    taosMemoryFree(pColRefInfo->colName);
63,429,148✔
87
    taosMemoryFree(pColRefInfo->colrefName);
63,429,148✔
88
  }
89
}
63,429,148✔
90

91
void destroyColRefArray(void *info) {
3,921,868✔
92
  SArray *pColRefArray = *(SArray **)info;
3,921,868✔
93
  if (pColRefArray) {
3,921,868✔
94
    taosArrayDestroyEx(pColRefArray, destroyColRefInfo);
3,921,868✔
95
  }
96
}
3,921,868✔
97

98
void freeUseDbOutput(void* pOutput) {
1,135,941✔
99
  SUseDbOutput *pOut = *(SUseDbOutput**)pOutput;
1,135,941✔
100
  if (NULL == pOutput) {
1,135,941✔
101
    return;
×
102
  }
103

104
  if (pOut->dbVgroup) {
1,135,941✔
105
    freeVgInfo(pOut->dbVgroup);
1,135,941✔
106
  }
107
  taosMemFree(pOut);
1,135,941✔
108
}
109

110
void destroyOtbInfoArray(void *info) {
258,648✔
111
  SArray *pOtbInfoArray = *(SArray **)info;
258,648✔
112
  if (pOtbInfoArray) {
258,648✔
113
    taosArrayDestroyEx(pOtbInfoArray, destroySOrgTbInfo);
258,648✔
114
  }
115
}
258,648✔
116

117
void destroyOtbVgIdToOtbInfoArrayMap(void *info) {
36,352✔
118
  SHashObj* pOtbVgIdToOtbInfoArrayMap = *(SHashObj **)info;
36,352✔
119
  if (pOtbVgIdToOtbInfoArrayMap) {
36,352✔
120
    taosHashSetFreeFp(pOtbVgIdToOtbInfoArrayMap, destroyOtbInfoArray);
36,352✔
121
    taosHashCleanup(pOtbVgIdToOtbInfoArrayMap);
36,352✔
122
  }
123
}
36,352✔
124

125
void destroyTagList(void *info) {
18,064✔
126
  SArray *pTagList = *(SArray **)info;
18,064✔
127
  if (pTagList) {
18,064✔
128
    taosArrayDestroyEx(pTagList, destroyTagVal);
18,064✔
129
  }
130
}
18,064✔
131

UNCOV
132
void destroyVtbUidTagListMap(void *info) {
×
UNCOV
133
  SHashObj* pVtbUidTagListMap = *(SHashObj **)info;
×
UNCOV
134
  if (pVtbUidTagListMap) {
×
UNCOV
135
    taosHashSetFreeFp(pVtbUidTagListMap, destroyTagList);
×
UNCOV
136
    taosHashCleanup(pVtbUidTagListMap);
×
137
  }
UNCOV
138
}
×
139

140
// Releases every resource referenced by a virtual-table scan control context.
// Each member is released only when it is non-NULL. Hash tables whose values
// own further allocations get a free callback installed before cleanup.
static void destroyVtbScanDynCtrlInfo(SVtbScanDynCtrlInfo* pVtbScan) {
  // --- names ---
  if (NULL != pVtbScan->dbName) {
    taosMemoryFreeClear(pVtbScan->dbName);
  }
  if (NULL != pVtbScan->tbName) {
    taosMemoryFreeClear(pVtbScan->tbName);
  }

  // --- child table / column reference bookkeeping ---
  if (NULL != pVtbScan->childTableList) {
    taosArrayDestroyEx(pVtbScan->childTableList, destroyColRefArray);
  }
  if (NULL != pVtbScan->colRefInfo) {
    taosArrayDestroyEx(pVtbScan->colRefInfo, destroyColRefInfo);
    pVtbScan->colRefInfo = NULL;
  }
  if (NULL != pVtbScan->childTableMap) {
    taosHashCleanup(pVtbScan->childTableMap);
  }
  if (NULL != pVtbScan->readColList) {
    taosArrayDestroy(pVtbScan->readColList);
  }

  // --- db / origin-table lookups ---
  if (NULL != pVtbScan->dbVgInfoMap) {
    taosHashSetFreeFp(pVtbScan->dbVgInfoMap, freeUseDbOutput);
    taosHashCleanup(pVtbScan->dbVgInfoMap);
  }
  if (NULL != pVtbScan->otbNameToOtbInfoMap) {
    taosHashSetFreeFp(pVtbScan->otbNameToOtbInfoMap, destroySOrgTbInfo);
    taosHashCleanup(pVtbScan->otbNameToOtbInfoMap);
  }
  if (NULL != pVtbScan->pRsp) {
    tFreeSUsedbRsp(pVtbScan->pRsp);
    taosMemoryFreeClear(pVtbScan->pRsp);
  }

  // --- vgroup tracking sets (no free callback installed) ---
  if (NULL != pVtbScan->existOrgTbVg) {
    taosHashCleanup(pVtbScan->existOrgTbVg);
  }
  if (NULL != pVtbScan->curOrgTbVg) {
    taosHashCleanup(pVtbScan->curOrgTbVg);
  }
  if (NULL != pVtbScan->newAddedVgInfo) {
    taosHashCleanup(pVtbScan->newAddedVgInfo);
  }

  // --- nested maps ---
  if (NULL != pVtbScan->otbVgIdToOtbInfoArrayMap) {
    taosHashSetFreeFp(pVtbScan->otbVgIdToOtbInfoArrayMap, destroyOtbInfoArray);
    taosHashCleanup(pVtbScan->otbVgIdToOtbInfoArrayMap);
  }
  if (NULL != pVtbScan->vtbUidToVgIdMapMap) {
    taosHashSetFreeFp(pVtbScan->vtbUidToVgIdMapMap, destroyOtbVgIdToOtbInfoArrayMap);
    taosHashCleanup(pVtbScan->vtbUidToVgIdMapMap);
  }
  if (NULL != pVtbScan->vtbGroupIdToVgIdMapMap) {
    taosHashSetFreeFp(pVtbScan->vtbGroupIdToVgIdMapMap, destroyOtbVgIdToOtbInfoArrayMap);
    taosHashCleanup(pVtbScan->vtbGroupIdToVgIdMapMap);
  }
  if (NULL != pVtbScan->vtbUidTagListMap) {
    taosHashSetFreeFp(pVtbScan->vtbUidTagListMap, destroyTagList);
    taosHashCleanup(pVtbScan->vtbUidTagListMap);
  }
  // NOTE(review): unlike vtbUidTagListMap, this map is cleaned up without a
  // free callback - confirm its values do not own tag lists.
  if (NULL != pVtbScan->vtbGroupIdTagListMap) {
    taosHashCleanup(pVtbScan->vtbGroupIdTagListMap);
  }
  if (NULL != pVtbScan->vtbUidToGroupIdMap) {
    taosHashCleanup(pVtbScan->vtbUidToGroupIdMap);
  }
}
1,517,744✔
204

205
void destroyWinArray(void *info) {
935,620✔
206
  SArray *pWinArray = *(SArray **)info;
935,620✔
207
  if (pWinArray) {
935,620✔
208
    taosArrayDestroy(pWinArray);
935,620✔
209
  }
210
}
935,620✔
211

212
// Releases the result block and the array of window arrays held by a
// virtual-table window control context.
static void destroyVtbWindowDynCtrlInfo(SVtbWindowDynCtrlInfo* pVtbWindow) {
  if (NULL != pVtbWindow->pRes) {
    blockDataDestroy(pVtbWindow->pRes);
  }

  if (NULL != pVtbWindow->pWins) {
    // Each element is itself an SArray*, released by destroyWinArray.
    taosArrayDestroyEx(pVtbWindow->pWins, destroyWinArray);
  }
}
374,740✔
220

221
static void destroyDynQueryCtrlOperator(void* param) {
2,661,729✔
222
  SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param;
2,661,729✔
223

224
  switch (pDyn->qType) {
2,661,729✔
225
    case DYN_QTYPE_STB_HASH:
1,143,985✔
226
      destroyStbJoinDynCtrlInfo(&pDyn->stbJoin);
1,143,985✔
227
      break;
1,143,985✔
228
    case DYN_QTYPE_VTB_WINDOW:
374,740✔
229
      destroyVtbWindowDynCtrlInfo(&pDyn->vtbWindow);
374,740✔
230
    case DYN_QTYPE_VTB_AGG:
1,517,744✔
231
    case DYN_QTYPE_VTB_SCAN:
232
      destroyVtbScanDynCtrlInfo(&pDyn->vtbScan);
1,517,744✔
233
      break;
1,517,744✔
234
    default:
×
235
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
×
236
      break;
×
237
  }
238

239
  taosMemoryFreeClear(param);
2,661,729✔
240
}
2,661,729✔
241

242
// Decides whether a table's data should be cached:
//  - batch fetch: always;
//  - right table: only when the current and next right uid are equal;
//  - left table: only when `uid` is present in the left cache hash.
static FORCE_INLINE bool tableNeedCache(int64_t uid, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, bool rightTable, bool batchFetch) {
  if (batchFetch) {
    return true;
  }

  if (rightTable) {
    return pPost->rightCurrUid == pPost->rightNextUid;
  }

  uint32_t* pHit = tSimpleHashGet(pPrev->leftCache, &uid, sizeof(uid));
  return NULL != pHit;
}
255

256
// Loads the table pair at the head node's readIdx into the post-join context
// (uids and vgIds), looks ahead for the next right-table uid (0 when the list
// is exhausted), and recomputes the left/right needCache flags. In non-batch
// mode a newly seen right uid that needs caching is recorded in rightCache.
// Returns TSDB_CODE_SUCCESS, or the error propagated by QRY_ERR_RET.
static int32_t updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo*          pStbJoin) {
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
  SStbJoinTableList*   pNode = pPrev->pListHead;
  int64_t*             leftUid = &pNode->pLeftUid[pNode->readIdx];
  int64_t*             rightUid = &pNode->pRightUid[pNode->readIdx];
  int64_t              nextIdx = pNode->readIdx + 1;
  int64_t              rightPrevUid = pPost->rightCurrUid;

  pPost->leftCurrUid = *leftUid;
  pPost->rightCurrUid = *rightUid;

  pPost->leftVgId = pNode->pLeftVg[pNode->readIdx];
  pPost->rightVgId = pNode->pRightVg[pNode->readIdx];

  // Look ahead for the next right uid, moving to following nodes when the
  // current one has no entry left.
  for (;;) {
    if (nextIdx < pNode->uidNum) {
      pPost->rightNextUid = pNode->pRightUid[nextIdx];
      break;
    }

    pNode = pNode->pNext;
    if (NULL == pNode) {
      pPost->rightNextUid = 0;
      break;
    }

    rightUid = pNode->pRightUid;
    nextIdx = 0;
  }

  pPost->leftNeedCache = tableNeedCache(*leftUid, pPrev, pPost, false, pStbJoin->basic.batchFetch);
  pPost->rightNeedCache = tableNeedCache(*rightUid, pPrev, pPost, true, pStbJoin->basic.batchFetch);

  if (!pStbJoin->basic.batchFetch && pPost->rightNeedCache && rightPrevUid != pPost->rightCurrUid) {
    QRY_ERR_RET(tSimpleHashPut(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid), NULL, 0));
    pStbJoin->execInfo.rightCacheNum++;
  }

  return TSDB_CODE_SUCCESS;
}
299

300
// Allocates a new SOrgTbInfo, copies vgId and tbName from `pSrc`, duplicates
// its colMap array, and stores the copy in *ppDst.
// On failure nothing is stored in *ppDst, the partial copy is released, and
// the error code is returned.
static int32_t copyOrgTbInfo(SOrgTbInfo* pSrc, SOrgTbInfo** ppDst) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  SOrgTbInfo* pCopy = NULL;

  qDebug("start to copy org table info, vgId:%d, tbName:%s", pSrc->vgId, pSrc->tbName);

  pCopy = taosMemoryMalloc(sizeof(SOrgTbInfo));
  QUERY_CHECK_NULL(pCopy, code, lino, _return, terrno)

  tstrncpy(pCopy->tbName, pSrc->tbName, TSDB_TABLE_FNAME_LEN);
  pCopy->vgId = pSrc->vgId;

  pCopy->colMap = taosArrayDup(pSrc->colMap, NULL);
  QUERY_CHECK_NULL(pCopy->colMap, code, lino, _return, terrno)

  *ppDst = pCopy;
  return code;

_return:
  qError("failed to copy org table info, code:%d, line:%d", code, lino);
  if (NULL != pCopy) {
    if (NULL != pCopy->colMap) {
      taosArrayDestroy(pCopy->colMap);
    }
    taosMemoryFreeClear(pCopy);
  }
  return code;
}
329

330
// Deep-copies every STagVal of `pTagList` into a freshly created
// pBasic->tagList. Variable-length tags get their own copy of the payload.
// On failure pBasic->tagList is destroyed and reset to NULL, any payload that
// was allocated but not yet pushed is freed, and the error code is returned.
static int32_t buildTagListForExchangeBasicParam(SExchangeOperatorBasicParam* pBasic, SArray* pTagList) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  // Zero-initialized so the error path never inspects an indeterminate
  // type/pData when a failure happens before the first tag is filled in.
  STagVal  tmpTag = {0};

  pBasic->tagList = taosArrayInit(1, sizeof(STagVal));
  QUERY_CHECK_NULL(pBasic->tagList, code, lino, _return, terrno)

  for (int32_t i = 0; i < taosArrayGetSize(pTagList); ++i) {
    STagVal* pSrcTag = (STagVal*)taosArrayGet(pTagList, i);
    QUERY_CHECK_NULL(pSrcTag, code, lino, _return, terrno)
    tmpTag.type = pSrcTag->type;
    tmpTag.cid = pSrcTag->cid;
    if (IS_VAR_DATA_TYPE(pSrcTag->type)) {
      tmpTag.nData = pSrcTag->nData;
      tmpTag.pData = taosMemoryMalloc(tmpTag.nData);
      QUERY_CHECK_NULL(tmpTag.pData, code, lino, _return, terrno)
      memcpy(tmpTag.pData, pSrcTag->pData, tmpTag.nData);
    } else {
      tmpTag.i64 = pSrcTag->i64;
    }

    QUERY_CHECK_NULL(taosArrayPush(pBasic->tagList, &tmpTag), code, lino, _return, terrno)
    // The payload is now owned by the array element; forget it locally.
    tmpTag = (STagVal){0};
  }

  return code;
_return:
  if (pBasic->tagList) {
    taosArrayDestroyEx(pBasic->tagList, destroyTagVal);
    pBasic->tagList = NULL;
  }
  // Only a variable-length tag owns a heap payload through pData.
  if (IS_VAR_DATA_TYPE(tmpTag.type) && tmpTag.pData) {
    taosMemoryFree(tmpTag.pData);
  }
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
  return code;
}
368

369
// Deep-copies every SOrgTbInfo of `pOrgTbInfoArray` (vgId, tbName and a
// duplicate of colMap) into a freshly created pBasic->batchOrgTbInfo.
// On failure pBasic->batchOrgTbInfo is destroyed and reset to NULL, a colMap
// that was duplicated but not yet pushed is destroyed, and the error code is
// returned.
static int32_t buildBatchOrgTbInfoForExchangeBasicParam(SExchangeOperatorBasicParam* pBasic, SArray* pOrgTbInfoArray) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  // Zero-initialized so the error path never destroys an indeterminate colMap
  // when a failure happens before the first element is filled in.
  SOrgTbInfo  batchInfo = {0};

  pBasic->batchOrgTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
  QUERY_CHECK_NULL(pBasic->batchOrgTbInfo, code, lino, _return, terrno)

  for (int32_t i = 0; i < taosArrayGetSize(pOrgTbInfoArray); ++i) {
    SOrgTbInfo* pSrc = (SOrgTbInfo*)taosArrayGet(pOrgTbInfoArray, i);
    QUERY_CHECK_NULL(pSrc, code, lino, _return, terrno)
    batchInfo.vgId = pSrc->vgId;
    tstrncpy(batchInfo.tbName, pSrc->tbName, TSDB_TABLE_FNAME_LEN);
    batchInfo.colMap = taosArrayDup(pSrc->colMap, NULL);
    QUERY_CHECK_NULL(batchInfo.colMap, code, lino, _return, terrno)
    QUERY_CHECK_NULL(taosArrayPush(pBasic->batchOrgTbInfo, &batchInfo), code, lino, _return, terrno)
    // colMap is now owned by the array element; forget it locally.
    batchInfo = (SOrgTbInfo){0};
  }

  return code;
_return:
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
  if (pBasic->batchOrgTbInfo) {
    taosArrayDestroyEx(pBasic->batchOrgTbInfo, destroySOrgTbInfo);
    pBasic->batchOrgTbInfo = NULL;
  }
  if (batchInfo.colMap) {
    taosArrayDestroy(batchInfo.colMap);
    batchInfo.colMap = NULL;
  }
  return code;
}
401

402
// Builds a group-cache GET parameter in *ppRes, optionally wrapping `pChild`
// as its only child.
//
// Ownership: `pChild` is consumed in every case. On success it is stored in
// the result's pChildren; on failure it is released here. On failure after
// the outer allocation succeeded, *ppRes is reset to NULL.
// Returns TSDB_CODE_SUCCESS or the terrno of the failed allocation.
static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, bool needCache, SOperatorParam* pChild) {
  int32_t code = TSDB_CODE_SUCCESS;
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    code = terrno;
    freeOperatorParam(pChild, OP_GET_PARAM);
    return code;
  }

  // Fill in every field right away: the error paths below pass *ppRes to
  // freeOperatorParam(), which must not be given indeterminate members.
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = NULL;
  (*ppRes)->reUse = false;
  (*ppRes)->pChildren = NULL;

  if (pChild) {
    (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
    if (NULL == (*ppRes)->pChildren) {
      code = terrno;
      freeOperatorParam(pChild, OP_GET_PARAM);
      freeOperatorParam(*ppRes, OP_GET_PARAM);
      *ppRes = NULL;
      return code;
    }
    if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild)) {
      code = terrno;
      // The push failed, so pChild is not in pChildren and is released separately.
      freeOperatorParam(pChild, OP_GET_PARAM);
      freeOperatorParam(*ppRes, OP_GET_PARAM);
      *ppRes = NULL;
      return code;
    }
  }

  SGcOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcOperatorParam));
  if (NULL == pGc) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_GET_PARAM);
    *ppRes = NULL;
    return code;
  }

  pGc->sessionId = atomic_add_fetch_64(&gSessionId, 1);
  pGc->downstreamIdx = downstreamIdx;
  pGc->vgId = vgId;
  pGc->tbUid = tbUid;
  pGc->needCache = needCache;

  (*ppRes)->value = pGc;

  return TSDB_CODE_SUCCESS;
}
451

452

453
// Builds a group-cache NOTIFY parameter (no children) in *ppRes.
// On failure after the outer allocation succeeded, the partially built
// parameter is released and *ppRes is reset to NULL.
// Returns TSDB_CODE_SUCCESS or the terrno of the failed allocation.
static int32_t buildGroupCacheNotifyOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid) {
  int32_t code = TSDB_CODE_SUCCESS;
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }

  // Fill in every field right away: the error path below passes *ppRes to
  // freeOperatorParam(), which must not be given indeterminate members.
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = NULL;
  (*ppRes)->reUse = false;
  (*ppRes)->pChildren = NULL;

  SGcNotifyOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcNotifyOperatorParam));
  if (NULL == pGc) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    // Do not leave the caller holding a pointer to released memory.
    *ppRes = NULL;
    return code;
  }

  pGc->downstreamIdx = downstreamIdx;
  pGc->vgId = vgId;
  pGc->tbUid = tbUid;

  (*ppRes)->value = pGc;

  return TSDB_CODE_SUCCESS;
}
479

480
static int32_t buildExchangeOperatorBasicParam(SExchangeOperatorBasicParam* pBasic, ENodeType srcOpType,
10,320,741✔
481
                                               EExchangeSourceType exchangeType, int32_t vgId, uint64_t groupId,
482
                                               SArray* pUidList, SOrgTbInfo* pOrgTbInfo, SArray* pTagList,
483
                                               SArray* pOrgTbInfoArray, STimeWindow window,
484
                                               SDownstreamSourceNode* pDownstreamSourceNode,
485
                                               bool tableSeq, bool isNewParam, bool isNewDeployed) {
486
  int32_t code = TSDB_CODE_SUCCESS;
10,320,741✔
487
  int32_t lino = 0;
10,320,741✔
488

489
  qDebug("buildExchangeOperatorBasicParam, srcOpType:%d, exchangeType:%d, vgId:%d, groupId:%" PRIu64 ", tableSeq:%d, "
10,320,741✔
490
         "isNewParam:%d, isNewDeployed:%d", srcOpType, exchangeType, vgId, groupId, tableSeq, isNewParam, isNewDeployed);
491

492
  pBasic->paramType = DYN_TYPE_EXCHANGE_PARAM;
10,320,741✔
493
  pBasic->srcOpType = srcOpType;
10,320,741✔
494
  pBasic->vgId = vgId;
10,320,741✔
495
  pBasic->groupid = groupId;
10,320,741✔
496
  pBasic->window = window;
10,320,741✔
497
  pBasic->tableSeq = tableSeq;
10,320,741✔
498
  pBasic->type = exchangeType;
10,320,741✔
499
  pBasic->isNewParam = isNewParam;
10,320,741✔
500

501
  if (pDownstreamSourceNode) {
10,320,741✔
502
    pBasic->isNewDeployed = true;
2,354✔
503
    pBasic->newDeployedSrc.type = QUERY_NODE_DOWNSTREAM_SOURCE;
2,354✔
504
    pBasic->newDeployedSrc.clientId = pDownstreamSourceNode->clientId;// current task's taskid
2,354✔
505
    pBasic->newDeployedSrc.taskId = pDownstreamSourceNode->taskId;
2,354✔
506
    pBasic->newDeployedSrc.fetchMsgType = TDMT_STREAM_FETCH;
2,354✔
507
    pBasic->newDeployedSrc.localExec = false;
2,354✔
508
    pBasic->newDeployedSrc.addr.nodeId = pDownstreamSourceNode->addr.nodeId;
2,354✔
509
    memcpy(&pBasic->newDeployedSrc.addr.epSet, &pDownstreamSourceNode->addr.epSet, sizeof(SEpSet));
2,354✔
510
  } else {
511
    pBasic->isNewDeployed = false;
10,318,387✔
512
    pBasic->newDeployedSrc = (SDownstreamSourceNode){0};
10,318,387✔
513
  }
514

515
  if (pUidList) {
10,320,741✔
516
    pBasic->uidList = taosArrayDup(pUidList, NULL);
3,861,042✔
517
    QUERY_CHECK_NULL(pBasic->uidList, code, lino, _return, terrno)
3,861,042✔
518
  } else {
519
    pBasic->uidList = taosArrayInit(1, sizeof(int64_t));
6,459,699✔
520
    QUERY_CHECK_NULL(pBasic->uidList, code, lino, _return, terrno)
6,459,699✔
521
  }
522

523
  if (pOrgTbInfo) {
10,320,741✔
524
    code = copyOrgTbInfo(pOrgTbInfo, &pBasic->orgTbInfo);
5,493,561✔
525
    QUERY_CHECK_CODE(code, lino, _return);
5,493,561✔
526
  } else {
527
    pBasic->orgTbInfo = NULL;
4,827,180✔
528
  }
529

530
  if (pTagList) {
10,320,741✔
531
    code = buildTagListForExchangeBasicParam(pBasic, pTagList);
27,096✔
532
    QUERY_CHECK_CODE(code, lino, _return);
27,096✔
533
  } else {
534
    pBasic->tagList = NULL;
10,293,645✔
535
  }
536

537
  if (pOrgTbInfoArray) {
10,320,741✔
538
    code = buildBatchOrgTbInfoForExchangeBasicParam(pBasic, pOrgTbInfoArray);
272,268✔
539
    QUERY_CHECK_CODE(code, lino, _return);
272,268✔
540
  } else {
541
    pBasic->batchOrgTbInfo = NULL;
10,048,473✔
542
  }
543
  return code;
10,320,741✔
544

545
_return:
×
546
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
×
547
  freeExchangeGetBasicOperatorParam(pBasic);
×
548
  return code;
×
549
}
550

551
static int32_t buildExchangeOperatorParamImpl(SOperatorParam** ppRes, int32_t downstreamIdx, ENodeType srcOpType,
9,823,388✔
552
                                              EExchangeSourceType exchangeType, int32_t vgId, uint64_t groupId,
553
                                              SArray* pUidList, SOrgTbInfo* pOrgTbInfo, SArray* pTagList,
554
                                              SArray* pOrgTbInfoArray, STimeWindow window,
555
                                              SDownstreamSourceNode* pDownstreamSourceNode,
556
                                              bool tableSeq, bool isNewParam, bool reUse, bool isNewDeployed) {
557

558
  int32_t                      code = TSDB_CODE_SUCCESS;
9,823,388✔
559
  int32_t                      lino = 0;
9,823,388✔
560
  SOperatorParam*              pParam = NULL;
9,823,388✔
561
  SExchangeOperatorParam*      pExc = NULL;
9,823,388✔
562

563
  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
9,823,388✔
564
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)
9,823,388✔
565

566
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
9,823,388✔
567
  pParam->downstreamIdx = downstreamIdx;
9,823,388✔
568
  pParam->reUse = reUse;
9,823,388✔
569
  pParam->pChildren = NULL;
9,823,388✔
570
  pParam->value = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
9,823,388✔
571
  QUERY_CHECK_NULL(pParam->value, code, lino, _return, terrno)
9,823,388✔
572

573
  pExc = (SExchangeOperatorParam*)pParam->value;
9,823,388✔
574
  pExc->multiParams = false;
9,823,388✔
575

576
  code = buildExchangeOperatorBasicParam(&pExc->basic, srcOpType, exchangeType, vgId, groupId,
9,823,388✔
577
                                         pUidList, pOrgTbInfo, pTagList, pOrgTbInfoArray,
578
                                         window, pDownstreamSourceNode, tableSeq, isNewParam, isNewDeployed);
579

580
  *ppRes = pParam;
9,823,388✔
581
  return code;
9,823,388✔
582
_return:
×
583
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
×
584
  if (pParam) {
×
585
    freeOperatorParam(pParam, OP_GET_PARAM);
×
586
  }
587
  return code;
×
588
}
589

590
// Builds an exchange parameter for a super-table join table scan of the single
// table *pUid in vgroup *pVgId. The temporary one-element uid list is always
// destroyed before returning.
static int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, const int32_t* pVgId, int64_t* pUid) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SArray* pSingleUid = taosArrayInit(1, sizeof(int64_t));

  QUERY_CHECK_NULL(pSingleUid, code, lino, _return, terrno)
  QUERY_CHECK_NULL(taosArrayPush(pSingleUid, pUid), code, lino, _return, terrno);

  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, EX_SRC_TYPE_STB_JOIN_SCAN,
                                        *pVgId, /* groupId */ 0, pSingleUid, /* pOrgTbInfo */ NULL, /* pTagList */ NULL,
                                        /* pOrgTbInfoArray */ NULL, (STimeWindow){0}, /* pDownstreamSourceNode */ NULL,
                                        /* tableSeq */ true, /* isNewParam */ false, /* reUse */ false,
                                        /* isNewDeployed */ false);
  QUERY_CHECK_CODE(code, lino, _return);

_return:
  if (0 != code) {
    qError("failed to build exchange operator param, code:%d", code);
  }
  taosArrayDestroy(pSingleUid);
  return code;
}
610

611
// Builds an exchange parameter for a virtual-table window scan restricted to
// the time window `win`, with no uid list, origin-table info or tags.
static int32_t buildExchangeOperatorParamForExternalWindow(SOperatorParam** ppRes, int32_t downstreamIdx, STimeWindow win) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, EX_SRC_TYPE_VTB_WIN_SCAN,
                                        /* vgId */ 0, /* groupId */ 0, /* pUidList */ NULL, /* pOrgTbInfo */ NULL,
                                        /* pTagList */ NULL, /* pOrgTbInfoArray */ NULL, win,
                                        /* pDownstreamSourceNode */ NULL, /* tableSeq */ true, /* isNewParam */ true,
                                        /* reUse */ true, /* isNewDeployed */ false);
  QUERY_CHECK_CODE(code, lino, _return);

  return code;

_return:
  qError("failed to build exchange operator param for external window, code:%d", code);
  return code;
}
624

625
// Builds an exchange parameter for a virtual super-table tag scan of the
// single table `uid` in vgroup `vgId`. The temporary one-element uid list is
// always destroyed before returning.
static int32_t buildExchangeOperatorParamForVTagScan(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, tb_uid_t uid) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SArray* pSingleUid = NULL;

  pSingleUid = taosArrayInit(1, sizeof(int64_t));
  QUERY_CHECK_NULL(pSingleUid, code, lino, _return, terrno)
  QUERY_CHECK_NULL(taosArrayPush(pSingleUid, &uid), code, lino, _return, terrno)

  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, EX_SRC_TYPE_VSTB_TAG_SCAN,
                                        vgId, /* groupId */ 0, pSingleUid, /* pOrgTbInfo */ NULL, /* pTagList */ NULL,
                                        /* pOrgTbInfoArray */ NULL, (STimeWindow){0}, /* pDownstreamSourceNode */ NULL,
                                        /* tableSeq */ false, /* isNewParam */ false, /* reUse */ true,
                                        /* isNewDeployed */ false);
  QUERY_CHECK_CODE(code, lino, _return);

_return:
  if (0 != code) {
    qError("failed to build exchange operator param for tag scan, code:%d", code);
  }
  taosArrayDestroy(pSingleUid);
  return code;
}
646

647
static int32_t buildExchangeOperatorParamForVScan(SOperatorParam** ppRes, int32_t downstreamIdx, SOrgTbInfo* pOrgTbInfo,
5,493,561✔
648
                                                  SDownstreamSourceNode* pNewSource) {
649
  int32_t                      code = TSDB_CODE_SUCCESS;
5,493,561✔
650
  int32_t                      lino = 0;
5,493,561✔
651

652
  code = buildExchangeOperatorParamImpl(ppRes, downstreamIdx, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, EX_SRC_TYPE_VSTB_SCAN,
5,493,561✔
653
                                        pOrgTbInfo->vgId, 0, NULL, pOrgTbInfo, NULL, NULL, (STimeWindow){0}, pNewSource, false, true, true, true);
5,493,561✔
654
  QUERY_CHECK_CODE(code, lino, _return);
5,493,561✔
655

656
  return code;
5,493,561✔
657
_return:
×
658
  qError("failed to build exchange operator param for vscan, code:%d", code);
×
659
  return code;
×
660
}
661

662
// Builds a multi-source exchange GET parameter with one basic param per
// vgroup found in `pVg` (key: vgId, value: SArray* of uids).
//
// Each uid list is duplicated into its basic param; the original list is then
// destroyed and its slot in `pVg` cleared, so `pVg` no longer owns it.
// On failure the partially built parameter and any basic param not yet stored
// in the batch hash are released, and the error code is returned.
static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
  int32_t                       code = TSDB_CODE_SUCCESS;
  int32_t                       line = 0;
  SOperatorParam*               pParam = NULL;
  SExchangeOperatorBatchParam*  pExc = NULL;
  SExchangeOperatorBasicParam   basic = {0};

  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(pParam, code, line, _return, terrno);

  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  pParam->downstreamIdx = downstreamIdx;
  pParam->reUse = false;
  pParam->pChildren = NULL;
  pParam->value = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  QUERY_CHECK_NULL(pParam->value, code, line, _return, terrno);

  pExc = pParam->value;
  pExc->multiParams = true;
  pExc->pBatchs = tSimpleHashInit(tSimpleHashGetSize(pVg), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  QUERY_CHECK_NULL(pExc->pBatchs, code, line, _return, terrno)

  tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);

  int32_t iter = 0;
  void*   p = NULL;
  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
    int32_t* pVgId = tSimpleHashGetKey(p, NULL);
    SArray*  pUidList = *(SArray**)p;

    code = buildExchangeOperatorBasicParam(&basic, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
                                           EX_SRC_TYPE_STB_JOIN_SCAN, *pVgId, 0,
                                           pUidList, NULL, NULL, NULL,
                                           (STimeWindow){0}, NULL, false, false, false);
    if (TSDB_CODE_SUCCESS != code) {
      // The callee has already released `basic` on failure; clear it so the
      // cleanup at _return does not release the same members again.
      basic = (SExchangeOperatorBasicParam){0};
    }
    QUERY_CHECK_CODE(code, line, _return);

    // The uids were duplicated into `basic`. Record the count for the trace
    // below, then destroy the source list and clear its slot in one step, so
    // that no later failure leaves `pVg` pointing at freed memory.
    int64_t uidNum = (int64_t)taosArrayGetSize(pUidList);
    taosArrayDestroy(pUidList);
    *(SArray**)p = NULL;

    code = tSimpleHashPut(pExc->pBatchs, pVgId, sizeof(*pVgId), &basic, sizeof(basic));
    QUERY_CHECK_CODE(code, line, _return);

    // The batch hash now holds the copy that owns basic's members.
    basic = (SExchangeOperatorBasicParam){0};
    qTrace("build downstreamIdx %d batch scan, vgId:%d, uidNum:%" PRId64, downstreamIdx, *pVgId, uidNum);
  }
  *ppRes = pParam;

  return code;

_return:
  qError("failed to build batch exchange operator param, code:%d", code);
  freeOperatorParam(pParam, OP_GET_PARAM);
  freeExchangeGetBasicOperatorParam(&basic);
  return code;
}
717

718
/*
 * Build an exchange operator param holding one basic param per vgroup for a
 * virtual table scan.
 *
 * ppRes         - receives the new param on success; left untouched on failure
 * downstreamIdx - stored in the resulting param
 * pTagList, groupid, window, type
 *               - forwarded unchanged to buildExchangeOperatorBasicParam()
 * pBatchMaps    - hash whose keys are read as int32_t vgIds and whose values
 *                 are read as SArray* and handed to the basic param builder
 *
 * Returns TSDB_CODE_SUCCESS, or the error code of the step that failed.
 */
static int32_t buildBatchExchangeOperatorParamForVirtual(SOperatorParam** ppRes, int32_t downstreamIdx, SArray* pTagList, uint64_t groupid,  SHashObj* pBatchMaps, STimeWindow window, EExchangeSourceType type) {
  int32_t                       code = TSDB_CODE_SUCCESS;
  int32_t                       lino = 0;
  SOperatorParam*               pParam = NULL;
  SExchangeOperatorBatchParam*  pExc = NULL;
  SExchangeOperatorBasicParam   basic = {0};
  void*                         pIter = NULL;
  size_t                        keyLen = 0;

  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)

  // The block comes from malloc, so fill in the header before the first step
  // that can fail: the error path hands pParam to freeOperatorParam(), which
  // must not see indeterminate fields.
  pParam->pChildren = NULL;
  pParam->value = NULL;
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  pParam->downstreamIdx = downstreamIdx;
  pParam->reUse = false;

  pParam->value = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  QUERY_CHECK_NULL(pParam->value, code, lino, _return, terrno)

  pExc = pParam->value;
  pExc->multiParams = true;

  pExc->pBatchs = tSimpleHashInit(taosHashGetSize(pBatchMaps), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  QUERY_CHECK_NULL(pExc->pBatchs, code, lino, _return, terrno)
  tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);

  pIter = taosHashIterate(pBatchMaps, NULL);
  while (pIter != NULL) {
    SArray*          pOrgTbInfoArray = *(SArray**)pIter;
    int32_t*         vgId = (int32_t*)taosHashGetKey(pIter, &keyLen);

    code = buildExchangeOperatorBasicParam(&basic, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
                                           type, *vgId, groupid,
                                           NULL, NULL, pTagList, pOrgTbInfoArray,
                                           window, NULL, false, true, false);
    QUERY_CHECK_CODE(code, lino, _return);

    code = tSimpleHashPut(pExc->pBatchs, vgId, sizeof(*vgId), &basic, sizeof(basic));
    QUERY_CHECK_CODE(code, lino, _return);

    // The hash now holds the copy; reset the local so the error path does not
    // release the same members a second time.
    basic = (SExchangeOperatorBasicParam){0};
    pIter = taosHashIterate(pBatchMaps, pIter);
  }

  *ppRes = pParam;
  return code;

_return:
  qError("failed to build exchange operator param for vscan, code:%d", code);
  if (pIter != NULL) {
    // Leaving the loop early: tell the hash we are done with this iterator.
    taosHashCancelIterate(pBatchMaps, pIter);
  }
  freeOperatorParam(pParam, OP_GET_PARAM);
  freeExchangeGetBasicOperatorParam(&basic);
  return code;
}
771

772
/*
 * Build the merge-join operator param that wraps the two child params.
 *
 * ppRes     - receives the new param; set to NULL on every failure
 * initParam - copied into the join param's initDownstream flag
 * ppChild0, ppChild1
 *           - child params; each is set to NULL once it has been pushed into
 *             the new param's children array (ownership moved). A child that
 *             was not pushed yet is left for the caller to release.
 *
 * Returns TSDB_CODE_SUCCESS or the terrno of the failing allocation.
 */
static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initParam, SOperatorParam** ppChild0, SOperatorParam** ppChild1) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  SSortMergeJoinOperatorParam* pJoin = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }

  // Fill in the header right away: every error path below passes *ppRes to
  // freeOperatorParam(), and malloc'ed memory is otherwise indeterminate.
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->value = NULL;
  (*ppRes)->reUse = false;

  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
  if (NULL == (*ppRes)->pChildren) {
    code = terrno;
    goto _return;
  }

  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild0)) {
    code = terrno;
    goto _return;
  }
  *ppChild0 = NULL;

  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild1)) {
    code = terrno;
    goto _return;
  }
  *ppChild1 = NULL;

  pJoin = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam));
  if (NULL == pJoin) {
    code = terrno;
    goto _return;
  }
  pJoin->initDownstream = initParam;
  (*ppRes)->value = pJoin;

  return TSDB_CODE_SUCCESS;

_return:
  freeOperatorParam(*ppRes, OP_GET_PARAM);
  *ppRes = NULL;
  return code;
}
817

818
/*
 * Build the merge-join notify param that wraps up to two child notify params.
 *
 * ppRes   - receives the new param; NULL on failure
 * pChild0 - optional first child; consumed by this function on every path
 * pChild1 - optional second child; consumed by this function on every path
 *
 * On failure both children are released here, so the caller must not free them.
 * Returns TSDB_CODE_SUCCESS or the terrno of the failing allocation.
 */
static int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) {
  int32_t code = TSDB_CODE_SUCCESS;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    code = terrno;
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    return code;
  }

  // Initialise the header before anything can fail so that freeOperatorParam()
  // on the error paths never sees indeterminate fields.
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
  (*ppRes)->value = NULL;
  (*ppRes)->reUse = false;

  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
  // Check the array that was just created, not the param allocated above.
  if (NULL == (*ppRes)->pChildren) {
    code = terrno;
    taosMemoryFreeClear(*ppRes);
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    return code;
  }

  if (pChild0 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) {
    code = terrno;
    // Neither child is in the array yet, so both are released separately.
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    *ppRes = NULL;
    return code;
  }

  if (pChild1 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) {
    code = terrno;
    // pChild0 (if any) already sits in the children array and goes with *ppRes.
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    *ppRes = NULL;
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
857

858
/*
 * Build a table scan operator param from a vgroup hash that must contain
 * exactly one entry.
 *
 * ppRes         - receives the param built by buildTableScanOperatorParam()
 * downstreamIdx - not used by this function
 * pVg           - hash whose values are read as SArray* uid lists; each list is
 *                 destroyed and its slot cleared once the param is built
 *
 * Returns TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the hash size is not 1,
 * the builder's error code on failure, TSDB_CODE_SUCCESS otherwise.
 */
static int32_t buildBatchTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
  int32_t vgNum = tSimpleHashGetSize(pVg);
  if (1 != vgNum) {
    qError("Invalid vgroup num %d to build table scan operator param", vgNum);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  int32_t iter = 0;
  void*   pEntry = tSimpleHashIterate(pVg, NULL, &iter);
  while (NULL != pEntry) {
    SArray** ppUidList = (SArray**)pEntry;

    int32_t code = buildTableScanOperatorParam(ppRes, *ppUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false);
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }

    // The list is no longer needed; clear the slot so the hash owner does not
    // destroy it again.
    taosArrayDestroy(*ppUidList);
    *ppUidList = NULL;

    pEntry = tSimpleHashIterate(pVg, pEntry, &iter);
  }

  return TSDB_CODE_SUCCESS;
}
881

882
/*
 * Build a table scan operator param for a single table uid.
 *
 * ppRes         - receives the param built by buildTableScanOperatorParam()
 * downstreamIdx - not used by this function
 * pVgId         - not used by this function
 * pUid          - uid placed into a temporary one-element list
 *
 * The temporary uid list is destroyed on every path, including when the push
 * into it fails.
 * Returns TSDB_CODE_SUCCESS, terrno of a failed allocation, or the builder's
 * error code.
 */
static int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) {
  int32_t code = TSDB_CODE_SUCCESS;
  SArray* pUidList = taosArrayInit(1, sizeof(int64_t));
  if (NULL == pUidList) {
    return terrno;
  }

  if (NULL == taosArrayPush(pUidList, pUid)) {
    // Capture terrno before the cleanup call below has a chance to change it.
    code = terrno;
  } else {
    code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, true);
  }

  taosArrayDestroy(pUidList);
  return code;
}
899

900
/*
 * Build the full param tree (source -> group cache -> merge join) for the
 * table pair at the current read index of the head table list.
 *
 * pInfo   - operator info; stbJoin.basic.batchFetch and srcScan[] choose which
 *           source param builder is used for each side
 * pPrev   - previous-join context supplying the table list and vgroup hashes
 * pPost   - post-join context supplying the need-cache flags
 * ppParam - receives the merge-join param on success; released and set to NULL
 *           on failure if it is non-NULL at that point
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, SOperatorParam** ppParam) {
  SStbJoinTableList* pList = pPrev->pListHead;
  int64_t            rowIdx = pList->readIdx;
  int32_t*           leftVg = pList->pLeftVg + rowIdx;
  int64_t*           leftUid = pList->pLeftUid + rowIdx;
  int32_t*           rightVg = pList->pRightVg + rowIdx;
  int64_t*           rightUid = pList->pRightUid + rowIdx;
  SOperatorParam*    pSrcParam0 = NULL;
  SOperatorParam*    pSrcParam1 = NULL;
  SOperatorParam*    pGcParam0 = NULL;
  SOperatorParam*    pGcParam1 = NULL;
  int32_t            code = TSDB_CODE_SUCCESS;

  qDebug("start %" PRId64 ":%" PRId64 "th stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64, 
      rowIdx, pPrev->tableNum, *leftVg, *leftUid, *rightVg, *rightUid);

  QRY_ERR_RET(updatePostJoinCurrTableInfo(&pInfo->stbJoin));

  if (!pInfo->stbJoin.basic.batchFetch) {
    // One table per side: build a single-table source param for each.
    if (pInfo->stbJoin.basic.srcScan[0]) {
      code = buildSingleTableScanOperatorParam(&pSrcParam0, 0, leftVg, leftUid);
    } else {
      code = buildExchangeOperatorParam(&pSrcParam0, 0, leftVg, leftUid);
    }
    if (TSDB_CODE_SUCCESS == code) {
      if (pInfo->stbJoin.basic.srcScan[1]) {
        code = buildSingleTableScanOperatorParam(&pSrcParam1, 1, rightVg, rightUid);
      } else {
        code = buildExchangeOperatorParam(&pSrcParam1, 1, rightVg, rightUid);
      }
    }
  } else if (pPrev->leftHash) {
    // Batch mode with pending vgroup hashes: build the batch source params.
    if (pInfo->stbJoin.basic.srcScan[0]) {
      code = buildBatchTableScanOperatorParam(&pSrcParam0, 0, pPrev->leftHash);
    } else {
      code = buildBatchExchangeOperatorParam(&pSrcParam0, 0, pPrev->leftHash);
    }
    if (TSDB_CODE_SUCCESS == code) {
      if (pInfo->stbJoin.basic.srcScan[1]) {
        code = buildBatchTableScanOperatorParam(&pSrcParam1, 1, pPrev->rightHash);
      } else {
        code = buildBatchExchangeOperatorParam(&pSrcParam1, 1, pPrev->rightHash);
      }
    }
    if (TSDB_CODE_SUCCESS == code) {
      // Both sides were built, so the hashes are dropped.
      tSimpleHashCleanup(pPrev->leftHash);
      tSimpleHashCleanup(pPrev->rightHash);
      pPrev->leftHash = NULL;
      pPrev->rightHash = NULL;
    }
  }

  // Must be sampled before pSrcParam0 is handed over and cleared below.
  bool initParam = (NULL != pSrcParam0);

  if (TSDB_CODE_SUCCESS == code) {
    code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pPost->leftNeedCache, pSrcParam0);
    pSrcParam0 = NULL;
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pPost->rightNeedCache, pSrcParam1);
    pSrcParam1 = NULL;
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = buildMergeJoinOperatorParam(ppParam, initParam, &pGcParam0, &pGcParam1);
  }

  if (TSDB_CODE_SUCCESS == code) {
    return code;
  }

  // Failure: release whatever is still referenced from this frame.
  if (pSrcParam0) {
    freeOperatorParam(pSrcParam0, OP_GET_PARAM);
  }
  if (pSrcParam1) {
    freeOperatorParam(pSrcParam1, OP_GET_PARAM);
  }
  if (pGcParam0) {
    freeOperatorParam(pGcParam0, OP_GET_PARAM);
  }
  if (pGcParam1) {
    freeOperatorParam(pGcParam1, OP_GET_PARAM);
  }
  if (*ppParam) {
    freeOperatorParam(*ppParam, OP_GET_PARAM);
    *ppParam = NULL;
  }

  return code;
}
970

971
/*
 * Build a virtual table scan operator param for the given table uid.
 *
 * pInfo - operator info; vtbScan.window is copied into the scan param
 * ppRes - receives the new param on success, NULL on failure
 * uid   - stored in the scan param
 *
 * Returns TSDB_CODE_SUCCESS or the terrno of the failing allocation.
 */
static int32_t buildVtbScanOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, uint64_t uid) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SOperatorParam*           pRes = NULL;
  SVTableScanOperatorParam* pVScan = NULL;

  pRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(pRes, code, lino, _return, terrno)

  pRes->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL(pRes->pChildren, code, lino, _return, terrno)

  pVScan = taosMemoryMalloc(sizeof(SVTableScanOperatorParam));
  QUERY_CHECK_NULL(pVScan, code, lino, _return, terrno)

  pVScan->pOpParamArray = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL(pVScan->pOpParamArray, code, lino, _return, terrno)

  pVScan->uid = uid;
  pVScan->window = pInfo->vtbScan.window;

  pRes->opType = QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN;
  pRes->downstreamIdx = 0;
  pRes->value = pVScan;
  pRes->reUse = false;

  *ppRes = pRes;
  return TSDB_CODE_SUCCESS;

_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pVScan) {
    taosArrayDestroy(pVScan->pOpParamArray);
    taosMemoryFree(pVScan);
  }
  if (pRes) {
    taosArrayDestroy(pRes->pChildren);
    taosMemoryFree(pRes);
  }
  // The caller always sees NULL after a failure.
  *ppRes = NULL;
  return code;
}
1006

1007
/*
 * Build an external window operator param with one exchange child whose time
 * range spans from the first window's skey to the last window's ekey.
 *
 * pInfo - not used by this function
 * ppRes - receives the new param on success, NULL on failure
 * pWins - array of SExtWinTimeWindow; duplicated into the param. An array with
 *         no readable first/last element is rejected.
 * idx   - stored as the param's downstreamIdx
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the
 * first or last window cannot be read, or the error code of the failing step.
 */
static int32_t buildExternalWindowOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins, int32_t idx) {
  int32_t                       code = TSDB_CODE_SUCCESS;
  int32_t                       lino = 0;
  SExternalWindowOperatorParam* pExtWinOp = NULL;
  SOperatorParam*               pExchangeOperator = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
  // The cleanup path inspects pChildren, so it must not stay indeterminate.
  (*ppRes)->pChildren = NULL;

  pExtWinOp = taosMemoryMalloc(sizeof(SExternalWindowOperatorParam));
  QUERY_CHECK_NULL(pExtWinOp, code, lino, _return, terrno)

  pExtWinOp->ExtWins = taosArrayDup(pWins, NULL);
  QUERY_CHECK_NULL(pExtWinOp->ExtWins, code, lino, _return, terrno)

  SExtWinTimeWindow *firstWin = (SExtWinTimeWindow *)taosArrayGet(pWins, 0);
  QUERY_CHECK_NULL(firstWin, code, lino, _return, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR)
  SExtWinTimeWindow *lastWin = (SExtWinTimeWindow *)taosArrayGet(pWins, taosArrayGetSize(pWins) - 1);
  QUERY_CHECK_NULL(lastWin, code, lino, _return, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR)

  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)

  STimeWindow     twin = {.skey = firstWin->tw.skey, .ekey = lastWin->tw.ekey};
  code = buildExchangeOperatorParamForExternalWindow(&pExchangeOperator, 0, twin);
  QUERY_CHECK_CODE(code, lino, _return);

  if (NULL == taosArrayPush((*ppRes)->pChildren, &pExchangeOperator)) {
    code = terrno;
    lino = __LINE__;
    // The exchange param never made it into the children array, so nothing
    // else would release it.
    freeOperatorParam(pExchangeOperator, OP_GET_PARAM);
    goto _return;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW;
  (*ppRes)->downstreamIdx = idx;
  (*ppRes)->value = pExtWinOp;
  (*ppRes)->reUse = false;

  return code;

_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pExtWinOp) {
    if (pExtWinOp->ExtWins) {
      taosArrayDestroy(pExtWinOp->ExtWins);
    }
    taosMemoryFree(pExtWinOp);
  }
  if (*ppRes) {
    if ((*ppRes)->pChildren) {
      taosArrayDestroy((*ppRes)->pChildren);
    }
    taosMemoryFree(*ppRes);
    *ppRes = NULL;
  }
  return code;
}
1056

1057
/*
 * Release one external-window child param (as produced by
 * buildExternalWindowOperatorParam) together with the params stored under it.
 */
static void freeExtWinParamOnBuildFailure(SOperatorParam* pParam) {
  if (NULL == pParam) {
    return;
  }

  SExternalWindowOperatorParam* pExtWinOp = (SExternalWindowOperatorParam*)pParam->value;
  if (pExtWinOp) {
    if (pExtWinOp->ExtWins) {
      taosArrayDestroy(pExtWinOp->ExtWins);
    }
    taosMemoryFree(pExtWinOp);
  }

  if (pParam->pChildren) {
    for (int32_t i = 0; i < taosArrayGetSize(pParam->pChildren); i++) {
      // Elements are pointers, so taosArrayGet() yields the slot address.
      SOperatorParam** ppGrandChild = (SOperatorParam**)taosArrayGet(pParam->pChildren, i);
      if (ppGrandChild) {
        freeOperatorParam(*ppGrandChild, OP_GET_PARAM);
      }
    }
    taosArrayDestroy(pParam->pChildren);
  }

  taosMemoryFree(pParam);
}

/*
 * Build a merge operator param with one external-window child per downstream.
 *
 * pInfo           - forwarded to buildExternalWindowOperatorParam()
 * ppRes           - receives the new param on success, NULL on failure
 * pWins           - window array forwarded to every child builder
 * numOfDownstream - number of children to create
 * numOfWins       - stored as winNum in the merge param
 *
 * Returns TSDB_CODE_SUCCESS or the error code of the failing step. On failure
 * every child built so far is released.
 */
static int32_t buildMergeOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins,
                                       int32_t numOfDownstream, int32_t numOfWins) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SMergeOperatorParam*      pMergeOp = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)

  (*ppRes)->pChildren = taosArrayInit(numOfDownstream, POINTER_BYTES);
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)

  pMergeOp = taosMemoryMalloc(sizeof(SMergeOperatorParam));
  QUERY_CHECK_NULL(pMergeOp, code, lino, _return, terrno)

  pMergeOp->winNum = numOfWins;

  for (int32_t i = 0; i < numOfDownstream; i++) {
    SOperatorParam* pExternalWinParam = NULL;
    code = buildExternalWindowOperatorParam(pInfo, &pExternalWinParam, pWins, i);
    QUERY_CHECK_CODE(code, lino, _return);

    if (NULL == taosArrayPush((*ppRes)->pChildren, &pExternalWinParam)) {
      code = terrno;
      lino = __LINE__;
      // Not in the children array, so the loop in the cleanup path below
      // cannot reach it.
      freeExtWinParamOnBuildFailure(pExternalWinParam);
      goto _return;
    }
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->value = pMergeOp;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;

_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pMergeOp) {
    taosMemoryFree(pMergeOp);
  }
  if (*ppRes) {
    if ((*ppRes)->pChildren) {
      for (int32_t i = 0; i < taosArrayGetSize((*ppRes)->pChildren); i++) {
        // Dereference the slot to get the stored child pointer.
        SOperatorParam** ppChildParam = (SOperatorParam**)taosArrayGet((*ppRes)->pChildren, i);
        if (ppChildParam) {
          freeExtWinParamOnBuildFailure(*ppChildParam);
        }
      }
      taosArrayDestroy((*ppRes)->pChildren);
    }
    taosMemoryFree(*ppRes);
    *ppRes = NULL;
  }
  return code;
}
1114

1115
/*
 * Build a hash-agg operator param with a single batch exchange child built
 * from pInfo->vtbScan.otbVgIdToOtbInfoArrayMap.
 *
 * pInfo - operator info supplying the vgId -> origin table info map
 * ppRes - receives the new param on success; left untouched on failure
 *
 * Returns TSDB_CODE_SUCCESS or the error code of the failing step.
 */
static int32_t buildAggOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SOperatorParam*           pParam = NULL;
  SOperatorParam*           pExchangeParam = NULL;
  SVtbScanDynCtrlInfo*      pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  bool                      freeExchange = false;

  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)

  // Fill in the header before any step that can jump to _return: the cleanup
  // hands pParam to freeOperatorParam(), and malloc'ed fields are otherwise
  // indeterminate.
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
  pParam->downstreamIdx = 0;
  pParam->value = NULL;
  pParam->reUse = false;

  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno)

  pParam->value = taosMemoryMalloc(sizeof(SAggOperatorParam));
  QUERY_CHECK_NULL(pParam->value, code, lino, _return, terrno)

  code = buildBatchExchangeOperatorParamForVirtual(&pExchangeParam, 0, NULL, 0, pVtbScan->otbVgIdToOtbInfoArrayMap, (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN}, EX_SRC_TYPE_VSTB_AGG_SCAN);
  QUERY_CHECK_CODE(code, lino, _return);

  // Between a successful build and a successful push the exchange param is
  // owned by this frame only.
  freeExchange = true;

  QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pExchangeParam), code, lino, _return, terrno)

  freeExchange = false;

  *ppRes = pParam;

  return code;
_return:
  if (freeExchange) {
    freeOperatorParam(pExchangeParam, OP_GET_PARAM);
  }
  if (pParam) {
    freeOperatorParam(pParam, OP_GET_PARAM);
  }
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  return code;
}
1158

1159
/*
 * Build a hash-agg operator param for one group, with a single batch exchange
 * child built from the map stored under groupid in
 * pInfo->vtbScan.vtbGroupIdToVgIdMapMap.
 *
 * pInfo   - operator info supplying the groupid -> map lookup table
 * groupid - lookup key, also forwarded to the exchange param builder
 * ppRes   - receives the new param on success; set to NULL (with a success
 *           return) when groupid has no entry; left untouched on failure
 *
 * Returns TSDB_CODE_SUCCESS or the error code of the failing step.
 */
static int32_t buildAggOperatorParamWithGroupId(SDynQueryCtrlOperatorInfo* pInfo, uint64_t groupid, SOperatorParam** ppRes) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SVtbScanDynCtrlInfo*      pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SOperatorParam*           pParam = NULL;
  SOperatorParam*           pExchangeParam = NULL;
  SHashObj*                 otbVgIdToOtbInfoArrayMap = NULL;
  bool                      freeExchange = false;
  void*                     pIter = taosHashGet(pVtbScan->vtbGroupIdToVgIdMapMap, &groupid, sizeof(groupid));

  if (!pIter) {
    *ppRes = NULL;
    return code;
  }

  otbVgIdToOtbInfoArrayMap = *(SHashObj**)pIter;

  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)

  // Fill in the header before any step that can jump to _return: the cleanup
  // hands pParam to freeOperatorParam(), and malloc'ed fields are otherwise
  // indeterminate.
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
  pParam->downstreamIdx = 0;
  pParam->value = NULL;
  pParam->reUse = false;

  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno)

  code = buildBatchExchangeOperatorParamForVirtual(&pExchangeParam, 0, NULL, groupid, otbVgIdToOtbInfoArrayMap, (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN}, EX_SRC_TYPE_VSTB_AGG_SCAN);
  QUERY_CHECK_CODE(code, lino, _return);

  // Between a successful build and a successful push the exchange param is
  // owned by this frame only.
  freeExchange = true;

  QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pExchangeParam), code, lino, _return, terrno)

  freeExchange = false;

  *ppRes = pParam;

  return code;
_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (freeExchange) {
    freeOperatorParam(pExchangeParam, OP_GET_PARAM);
  }
  if (pParam) {
    freeOperatorParam(pParam, OP_GET_PARAM);
  }
  return code;
}
1209

1210
/*
 * Build the batch exchange param for a single child table, using the map
 * stored under uid in pInfo->vtbScan.vtbUidToVgIdMapMap.
 *
 * pInfo    - operator info supplying the uid -> map lookup table
 * uid      - lookup key
 * groupid  - forwarded to the exchange param builder
 * pTagList - forwarded to the exchange param builder
 * ppRes    - receives the new param on success; set to NULL (with a success
 *            return) when uid has no entry; left untouched on failure
 *
 * Returns TSDB_CODE_SUCCESS or the builder's error code.
 */
static int32_t buildAggOperatorParamForSingleChild(SDynQueryCtrlOperatorInfo* pInfo, tb_uid_t uid, uint64_t groupid, SArray* pTagList, SOperatorParam** ppRes) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SVtbScanDynCtrlInfo*      pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SOperatorParam*           pParam = NULL;
  // Look the entry up once and reuse the result; a second, unchecked lookup
  // would be dereferenced blindly.
  SHashObj**                ppOtbVgIdToOtbInfoArrayMap = (SHashObj**)taosHashGet(pVtbScan->vtbUidToVgIdMapMap, &uid, sizeof(uid));

  if (NULL == ppOtbVgIdToOtbInfoArrayMap) {
    *ppRes = NULL;
    return code;
  }

  code = buildBatchExchangeOperatorParamForVirtual(&pParam, 0, pTagList, groupid, *ppOtbVgIdToOtbInfoArrayMap, (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MIN}, EX_SRC_TYPE_VSTB_AGG_SCAN);
  QUERY_CHECK_CODE(code, lino, _return);

  *ppRes = pParam;

  return code;
_return:
  if (pParam) {
    freeOperatorParam(pParam, OP_GET_PARAM);
  }
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  return code;
}
1237

1238
/*
 * Build the join param for the current table pair and fetch the first result
 * block from downstream[1].
 *
 * pOperator - dynamic query control operator
 * ppRes     - receives the block returned by the downstream operator
 *
 * Any failure (param build, downstream fetch, block check) is stored in
 * pTaskInfo->code and raised with T_LONG_JMP; the function does not return in
 * that case.
 */
static void seqJoinLaunchNewRetrieveImpl(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
  SStbJoinPostJoinCtx*       pPost = &pStbJoin->ctx.post;
  SOperatorParam*            pParam = NULL;
  int32_t                    code  = buildSeqStbJoinOperatorParam(pInfo, pPrev, pPost, &pParam);
  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  qDebug("%s dynamic post task begin", GET_TASKID(pOperator->pTaskInfo));
  code = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam, ppRes);
  if (TSDB_CODE_SUCCESS != code) {
    // A failed fetch must not be mistaken for an empty result; report it the
    // same way as the other failures in this function.
    qError("%s dynamic post task failed, error:%s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  if (*ppRes) {
    code = blockDataCheck(*ppRes);
    if (code) {
      qError("Invalid block data, blockDataCheck failed, error:%s", tstrerror(code));
      pOperator->pTaskInfo->code = code;
      T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
    }
    pPost->isStarted = true;
    pStbJoin->execInfo.postBlkNum++;
    pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows;
    qDebug("%s join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
  } else {
    qDebug("%s Empty join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
  }
}
3,582,503✔
1267

1268

1269
/*
 * Notify downstream[1] that caching has ended for the current left or right
 * table.
 *
 * pOperator - dynamic query control operator
 * pPost     - post-join context supplying the current vgId/uid of each side
 * leftTable - true to notify for the left table (downstream 0), false for the
 *             right table (downstream 1)
 *
 * Returns the first error from the param builders, otherwise the result of
 * optrDefaultNotifyFn().
 */
static int32_t notifySeqJoinTableCacheEnd(SOperatorInfo* pOperator, SStbJoinPostJoinCtx* pPost, bool leftTable) {
  SOperatorParam* pGcParam = NULL;
  SOperatorParam* pMergeJoinParam = NULL;
  int32_t         downstreamId = 1;
  int32_t         vgId = pPost->rightVgId;
  int64_t         uid = pPost->rightCurrUid;

  if (leftTable) {
    downstreamId = 0;
    vgId = pPost->leftVgId;
    uid = pPost->leftCurrUid;
  }

  qDebug("notify table %" PRIu64 " in vgId %d downstreamId %d cache end", uid, vgId, downstreamId);

  int32_t code = buildGroupCacheNotifyOperatorParam(&pGcParam, downstreamId, vgId, uid);
  if (TSDB_CODE_SUCCESS == code) {
    code = buildMergeJoinNotifyOperatorParam(&pMergeJoinParam, pGcParam, NULL);
  }
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  return optrDefaultNotifyFn(pOperator->pDownstream[1], pMergeJoinParam);
}
1289

1290
/*
 * Finish the retrieval for the current table pair.
 *
 * Clears the started flag. In non-batch mode it then updates the left/right
 * cache hashes for the current uids and sends a cache-end notification for
 * each entry that gets removed.
 *
 * pOperator - dynamic query control operator
 * pStbJoin  - join control info holding the prev/post contexts
 *
 * Returns TSDB_CODE_SUCCESS or the first error from the hash removal or the
 * notification.
 */
static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCtrlInfo*          pStbJoin) {
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
  int32_t              code = 0;

  pPost->isStarted = false;

  if (pStbJoin->basic.batchFetch) {
    return TSDB_CODE_SUCCESS;
  }

  if (pPost->leftNeedCache) {
    uint32_t* pRemain = tSimpleHashGet(pPrev->leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
    // The counter is unsigned, so "<= 0" after the decrement means "== 0".
    if (pRemain && 0 == --(*pRemain)) {
      code = tSimpleHashRemove(pPrev->leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
      if (code) {
        qError("tSimpleHashRemove leftCurrUid %" PRId64 " from leftCache failed, error:%s", pPost->leftCurrUid, tstrerror(code));
        QRY_ERR_RET(code);
      }
      QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, true));
    }
  }

  if (pPost->rightNeedCache) {
    return TSDB_CODE_SUCCESS;
  }

  void* pRightEntry = tSimpleHashGet(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
  if (NULL == pRightEntry) {
    return TSDB_CODE_SUCCESS;
  }

  code = tSimpleHashRemove(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
  if (code) {
    qError("tSimpleHashRemove rightCurrUid %" PRId64 " from rightCache failed, error:%s", pPost->rightCurrUid, tstrerror(code));
    QRY_ERR_RET(code);
  }
  QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, false));

  return TSDB_CODE_SUCCESS;
}
1326

1327

1328
/*
 * Continue fetching result blocks for the table pair that is already started.
 *
 * Does nothing when no pair is started. When downstream[1] yields no block,
 * the current pair is finished and the head list's read index advances;
 * otherwise the post-join block statistics are updated.
 *
 * pOperator - dynamic query control operator
 * ppRes     - receives the next block, or NULL when the pair is exhausted
 *
 * Returns TSDB_CODE_SUCCESS or the error from handleSeqJoinCurrRetrieveEnd().
 */
static FORCE_INLINE int32_t seqJoinContinueCurrRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = &pInfo->stbJoin;

  if (!pStbJoin->ctx.post.isStarted) {
    return TSDB_CODE_SUCCESS;
  }

  qDebug("%s dynQueryCtrl continue to retrieve block from post op", GET_TASKID(pOperator->pTaskInfo));

  *ppRes = getNextBlockFromDownstream(pOperator, 1);
  if (NULL != *ppRes) {
    pStbJoin->execInfo.postBlkNum++;
    pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows;
    return TSDB_CODE_SUCCESS;
  }

  QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, pStbJoin));
  pStbJoin->ctx.prev.pListHead->readIdx++;

  return TSDB_CODE_SUCCESS;
}
1350

1351
/*
 * Append pVal to the array stored under pKey in pHash, creating the array (and
 * the hash entry) when the key is not present yet.
 *
 * pHash   - hash whose values are SArray* pointers
 * pKey    - key bytes
 * keySize - key length in bytes
 * pVal    - element to append
 * valSize - element size used when a new array has to be created
 *
 * Returns TSDB_CODE_SUCCESS, the terrno of a failed array operation, or the
 * code returned by tSimpleHashPut(). A newly created array is destroyed when
 * it cannot be stored in the hash.
 */
static FORCE_INLINE int32_t addToJoinVgroupHash(SSHashObj* pHash, void* pKey, int32_t keySize, void* pVal, int32_t valSize) {
  SArray** ppArray = tSimpleHashGet(pHash, pKey, keySize);
  if (NULL != ppArray) {
    if (NULL == taosArrayPush(*ppArray, pVal)) {
      return terrno;
    }
    return TSDB_CODE_SUCCESS;
  }

  SArray* pArray = taosArrayInit(10, valSize);
  if (NULL == pArray) {
    return terrno;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  if (NULL == taosArrayPush(pArray, pVal)) {
    // Read terrno before the destroy call below can overwrite it.
    code = terrno;
  } else {
    // Report the code returned by the put itself rather than whatever terrno
    // happens to hold afterwards.
    code = tSimpleHashPut(pHash, pKey, keySize, &pArray, POINTER_BYTES);
  }

  if (TSDB_CODE_SUCCESS != code) {
    taosArrayDestroy(pArray);
  }

  return code;
}
1375

1376
/*
 * Record one more occurrence of pKey in pHash and keep pOnceHash in step.
 *
 * First occurrence: the count is stored as 1 and the key is also added to
 * pOnceHash. Later occurrences: a stored count of 0 is left alone, a count of
 * UINT32_MAX is reset to 0, any other count is incremented, and the key is
 * removed from pOnceHash when the count was exactly 1.
 *
 * pHash     - key -> uint32_t count
 * pOnceHash - keys inserted with an empty value
 * pKey      - key bytes
 * keySize   - key length in bytes
 *
 * Returns TSDB_CODE_SUCCESS or the error code of the failing hash operation.
 */
static FORCE_INLINE int32_t addToJoinTableHash(SSHashObj* pHash, SSHashObj* pOnceHash, void* pKey, int32_t keySize) {
  int32_t   code = TSDB_CODE_SUCCESS;
  uint32_t* pNum = tSimpleHashGet(pHash, pKey, keySize);

  if (NULL == pNum) {
    uint32_t first = 1;
    code = tSimpleHashPut(pHash, pKey, keySize, &first, sizeof(first));
    if (TSDB_CODE_SUCCESS == code) {
      code = tSimpleHashPut(pOnceHash, pKey, keySize, NULL, 0);
    }
    return code;
  }

  if (0 == *pNum) {
    return TSDB_CODE_SUCCESS;
  }

  if (UINT32_MAX == *pNum) {
    *pNum = 0;
    return TSDB_CODE_SUCCESS;
  }

  if (1 == *pNum) {
    // Second occurrence: the key is no longer a "seen once" key.
    code = tSimpleHashRemove(pOnceHash, pKey, keySize);
    if (code) {
      qError("tSimpleHashRemove failed in addToJoinTableHash, error:%s", tstrerror(code));
      QRY_ERR_RET(code);
    }
  }
  (*pNum)++;

  return TSDB_CODE_SUCCESS;
}
1412

1413

1414
/*
 * Release one table list node: its four vg/uid arrays, then the node itself.
 * A NULL node is ignored.
 */
static void freeStbJoinTableList(SStbJoinTableList* pList) {
  if (NULL != pList) {
    taosMemoryFree(pList->pLeftVg);
    taosMemoryFree(pList->pLeftUid);
    taosMemoryFree(pList->pRightVg);
    taosMemoryFree(pList->pRightUid);
    taosMemoryFree(pList);
  }
}
1424

1425
/*
 * Copy `rows` entries of the four id arrays into a freshly allocated node and
 * link that node at the tail of pCtx's list (it also becomes the head when
 * the list has no tail yet).
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when an allocation fails; on failure
 * nothing is linked and the partially built node is released.
 */
static int32_t appendStbJoinTableList(SStbJoinPrevJoinCtx* pCtx, int64_t rows, int32_t* pLeftVg, int64_t* pLeftUid, int32_t* pRightVg, int64_t* pRightUid) {
  SStbJoinTableList* pNode = taosMemoryCalloc(1, sizeof(SStbJoinTableList));
  if (NULL == pNode) {
    return terrno;
  }

  // The allocations run in order and stop at the first failure.
  if (NULL == (pNode->pLeftVg = taosMemoryMalloc(rows * sizeof(*pLeftVg))) ||
      NULL == (pNode->pLeftUid = taosMemoryMalloc(rows * sizeof(*pLeftUid))) ||
      NULL == (pNode->pRightVg = taosMemoryMalloc(rows * sizeof(*pRightVg))) ||
      NULL == (pNode->pRightUid = taosMemoryMalloc(rows * sizeof(*pRightUid)))) {
    int32_t allocCode = terrno;  // capture before the frees below can touch terrno
    freeStbJoinTableList(pNode);
    return allocCode;
  }

  TAOS_MEMCPY(pNode->pLeftVg, pLeftVg, rows * sizeof(*pLeftVg));
  TAOS_MEMCPY(pNode->pLeftUid, pLeftUid, rows * sizeof(*pLeftUid));
  TAOS_MEMCPY(pNode->pRightVg, pRightVg, rows * sizeof(*pRightVg));
  TAOS_MEMCPY(pNode->pRightUid, pRightUid, rows * sizeof(*pRightUid));

  pNode->readIdx = 0;
  pNode->uidNum = rows;
  pNode->pNext = NULL;

  if (NULL == pCtx->pListTail) {
    pCtx->pListHead = pNode;
  } else {
    pCtx->pListTail->pNext = pNode;
  }
  pCtx->pListTail = pNode;

  return TSDB_CODE_SUCCESS;
}
1475

1476
/*
 * Feed one downstream block into the stable-join build state.
 *
 * The vgroup-id and uid columns of both join sides are looked up through the
 * configured slots. In batch-fetch mode every row is added to the left and
 * right vgroup hashes; otherwise only the left uid is counted in
 * leftCache/onceTable. When that succeeds, the raw columns are appended to
 * the table list and tableNum grows by the block's row count.
 *
 * Any failure is stored in pTaskInfo->code and reported via T_LONG_JMP.
 */
static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SColumnInfoData*           pVg0 = NULL;
  SColumnInfoData*           pVg1 = NULL;
  SColumnInfoData*           pUid0 = NULL;
  SColumnInfoData*           pUid1 = NULL;

  pVg0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[0]);
  if (NULL == pVg0) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  pVg1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[1]);
  if (NULL == pVg1) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  pUid0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[0]);
  if (NULL == pUid0) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  pUid1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[1]);
  if (NULL == pUid1) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  if (!pStbJoin->basic.batchFetch) {
    for (int32_t row = 0; row < pBlock->info.rows; ++row) {
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * row);

      code = addToJoinTableHash(pStbJoin->ctx.prev.leftCache, pStbJoin->ctx.prev.onceTable, leftUid, sizeof(*leftUid));
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
    }
  } else {
    for (int32_t row = 0; row < pBlock->info.rows; ++row) {
      int32_t* leftVg = (int32_t*)(pVg0->pData + pVg0->info.bytes * row);
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * row);
      int32_t* rightVg = (int32_t*)(pVg1->pData + pVg1->info.bytes * row);
      int64_t* rightUid = (int64_t*)(pUid1->pData + pUid1->info.bytes * row);

      code = addToJoinVgroupHash(pStbJoin->ctx.prev.leftHash, leftVg, sizeof(*leftVg), leftUid, sizeof(*leftUid));
      if (TSDB_CODE_SUCCESS == code) {
        code = addToJoinVgroupHash(pStbJoin->ctx.prev.rightHash, rightVg, sizeof(*rightVg), rightUid, sizeof(*rightUid));
      }
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = appendStbJoinTableList(&pStbJoin->ctx.prev, pBlock->info.rows, (int32_t*)pVg0->pData, (int64_t*)pUid0->pData, (int32_t*)pVg1->pData, (int64_t*)pUid1->pData);
    if (TSDB_CODE_SUCCESS == code) {
      pStbJoin->ctx.prev.tableNum += pBlock->info.rows;
    }
  }

_return:

  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }
}
70,042✔
1538

1539

1540
/*
 * Finish the non-batch build phase: strip from leftCache every key that is
 * still in onceTable, so leftCache keeps only the remaining keys.
 *
 * Nothing is done in batch-fetch mode. If both hashes have the same size,
 * leftCache is simply cleared. Removal failures are logged and otherwise
 * ignored.
 */
static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;

  if (pStbJoin->basic.batchFetch) {
    return;
  }

  SSHashObj* pCache = pStbJoin->ctx.prev.leftCache;
  SSHashObj* pOnce = pStbJoin->ctx.prev.onceTable;

  if (tSimpleHashGetSize(pCache) == tSimpleHashGetSize(pOnce)) {
    tSimpleHashClear(pCache);
    return;
  }

  // NOTE(review): the iterator result is passed straight to the removal as the
  // key; confirm that this is the uid key for onceTable's zero-length values.
  int32_t iter = 0;
  for (uint64_t* pUid = tSimpleHashIterate(pOnce, NULL, &iter); NULL != pUid;
       pUid = tSimpleHashIterate(pOnce, pUid, &iter)) {
    int32_t rmCode = tSimpleHashRemove(pCache, pUid, sizeof(*pUid));
    if (rmCode) {
      qError("tSimpleHashRemove failed in postProcessStbJoinTableHash, error:%s", tstrerror(rmCode));
    }
  }

  pStbJoin->execInfo.leftCacheNum = tSimpleHashGetSize(pCache);
  qDebug("more than 1 ref build table num %" PRId64, (int64_t)tSimpleHashGetSize(pCache));
}
1575

1576
/*
 * Drain downstream 0, feeding every block into the build state and keeping
 * the block/row statistics, then post-process the hashes and mark the build
 * phase as done (ctx.prev.joinBuild).
 */
static void buildStbJoinTableList(SOperatorInfo* pOperator) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SSDataBlock*               pBlock = NULL;

  while (NULL != (pBlock = getNextBlockFromDownstream(pOperator, 0))) {
    pStbJoin->execInfo.prevBlkNum++;
    pStbJoin->execInfo.prevBlkRows += pBlock->info.rows;

    doBuildStbJoinTableHash(pOperator, pBlock);
  }

  postProcessStbJoinTableHash(pOperator);

  pStbJoin->ctx.prev.joinBuild = true;
}
1,144,475✔
1596

1597
/*
 * Walk the pending table list and launch retrieves until one produces a block.
 *
 * Fully consumed head nodes (readIdx >= uidNum) are unlinked and freed. For
 * the current head, seqJoinLaunchNewRetrieveImpl() is invoked; if it yields a
 * block in *ppRes the function returns immediately. Otherwise the current
 * retrieve is finished via handleSeqJoinCurrRetrieveEnd() and the head's
 * readIdx advances. When the list is exhausted, *ppRes is set to NULL and the
 * operator is marked completed.
 *
 * Returns TSDB_CODE_SUCCESS, or the error from handleSeqJoinCurrRetrieveEnd().
 */
static int32_t seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
  SStbJoinTableList*         pNode = pPrev->pListHead;

  while (pNode) {
    if (pNode->readIdx >= pNode->uidNum) {
      pPrev->pListHead = pNode->pNext;
      if (pPrev->pListTail == pNode) {
        // The freed node was also the tail: clear it so a later
        // appendStbJoinTableList() does not write through a dangling pointer.
        pPrev->pListTail = NULL;
      }
      freeStbJoinTableList(pNode);
      pNode = pPrev->pListHead;
      continue;
    }

    seqJoinLaunchNewRetrieveImpl(pOperator, ppRes);
    if (*ppRes) {
      return TSDB_CODE_SUCCESS;
    }

    QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, pStbJoin));
    pPrev->pListHead->readIdx++;
  }

  *ppRes = NULL;
  setOperatorCompleted(pOperator);

  return TSDB_CODE_SUCCESS;
}
1625

1626
/*
 * Stamp the result block with the output descriptor's block id and pad it
 * with extra columns until it has one column per output slot.
 *
 * A NULL block is accepted and left alone. A non-NULL block requires both
 * pStbJoin and its pOutputDataBlockDesc; otherwise an internal error is
 * returned. Columns are only appended for slot positions at or beyond the
 * block's column count on entry, each sized for the block's row count.
 */
static int32_t seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) {
  if (NULL == pBlock) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == pStbJoin || NULL == pStbJoin->pOutputDataBlockDesc) {
    qError("seqStableJoinComposeRes: pBlock or pStbJoin is NULL");
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  pBlock->info.id.blockId = pStbJoin->pOutputDataBlockDesc->dataBlockId;
  if (!pBlock->pDataBlock) {
    return TSDB_CODE_SUCCESS;
  }

  int slotIdx = (int)pBlock->pDataBlock->size;
  while (slotIdx < pStbJoin->pOutputDataBlockDesc->pSlots->length) {
    SSlotDescNode* pSlot = (SSlotDescNode*)nodesListGetNode(pStbJoin->pOutputDataBlockDesc->pSlots, slotIdx);
    if (NULL == pSlot) {
      qError("seqStableJoinComposeRes: pSlot is NULL, i:%d", slotIdx);
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }

    SColumnInfoData newCol = createColumnInfoData(pSlot->dataType.type, pSlot->dataType.bytes, pSlot->slotId);
    int32_t         code = colInfoDataEnsureCapacity(&newCol, pBlock->info.rows, true);
    if (TSDB_CODE_SUCCESS == code) {
      code = blockDataAppendColInfo(pBlock, &newCol);
    }
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }

    slotIdx++;
  }

  return TSDB_CODE_SUCCESS;
}
1655

1656
/*
 * Produce the next result block of the sequential stable join.
 *
 * The first call runs the build phase; if it collected no rows the operator
 * completes right away. Otherwise the current retrieve is continued and, if
 * it yields nothing, a new one is launched. The open cost is recorded once,
 * while cost.openCost is still 0.
 *
 * On error the code is logged, stored in pTaskInfo->code and raised through
 * T_LONG_JMP; on success the result block is post-processed by
 * seqStableJoinComposeRes() and its code is returned.
 */
int32_t seqStableJoin(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;

  QRY_PARAM_CHECK(pRes);
  if (OP_EXEC_DONE == pOperator->status) {
    return code;
  }

  int64_t st = (pOperator->cost.openCost == 0) ? taosGetTimestampUs() : 0;

  if (!pStbJoin->ctx.prev.joinBuild) {
    buildStbJoinTableList(pOperator);
    if (pStbJoin->execInfo.prevBlkRows <= 0) {
      setOperatorCompleted(pOperator);
      goto _return;
    }
  }

  QRY_ERR_JRET(seqJoinContinueCurrRetrieve(pOperator, pRes));
  if (NULL == *pRes) {
    QRY_ERR_JRET(seqJoinLaunchNewRetrieve(pOperator, pRes));
  }

_return:
  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
  }

  if (!code) {
    code = seqStableJoinComposeRes(pStbJoin, *pRes);
  } else {
    qError("%s failed since %s", __func__, tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }
  return code;
}
1700

1701
/*
 * Response callback for the TDMT_MND_GET_DB_INFO request issued by
 * buildDbVgInfoMap().
 *
 * param is the SOperatorInfo the request was sent for; pMsg carries the raw
 * response payload, which this callback releases on every path. On success
 * the payload is deserialized into vtbScan.pRsp and vtbScan.ready is posted.
 * When the incoming code reports a failure, the converted error is stored in
 * pTaskInfo->code.
 *
 * NOTE(review): none of the failure paths posts vtbScan.ready, so a thread
 * blocked in tsem_wait() on it is presumably never woken. Fixing that also
 * needs the waiter to handle a missing pRsp; confirm and address together.
 *
 * Returns the resulting code (the incoming code when it already was an error).
 */
int32_t dynProcessUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
  int32_t                    lino = 0;
  SOperatorInfo*             pOperator = (SOperatorInfo*)param;
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)pOperator->info;

  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = rpcCvtErrCode(code);
    if (pOperator->pTaskInfo->code != code) {
      qError("load systable rsp received, error:%s, cvted error:%s", tstrerror(code),
             tstrerror(pOperator->pTaskInfo->code));
    } else {
      qError("load systable rsp received, error:%s", tstrerror(code));
    }
    goto _return;
  }

  pScanResInfo->vtbScan.pRsp = taosMemoryMalloc(sizeof(SUseDbRsp));
  QUERY_CHECK_NULL(pScanResInfo->vtbScan.pRsp, code, lino, _return, terrno)

  code = tDeserializeSUseDbRsp(pMsg->pData, (int32_t)pMsg->len, pScanResInfo->vtbScan.pRsp);
  QUERY_CHECK_CODE(code, lino, _return);

  taosMemoryFreeClear(pMsg->pData);

  code = tsem_post(&pScanResInfo->vtbScan.ready);
  QUERY_CHECK_CODE(code, lino, _return);

  return code;
_return:
  // The payload is only freed on the success path above; release it here so
  // the failure paths do not leak it (already cleared to NULL if freed).
  if (NULL != pMsg) {
    taosMemoryFreeClear(pMsg->pData);
  }
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  return code;
}
1733

1734
/*
 * Fetch the vgroup layout of the database named by `name` and build `output`
 * from it.
 *
 * A TDMT_MND_GET_DB_INFO request is serialized and sent asynchronously to
 * vtbScan.epSet with dynProcessUseDbRsp() as the callback. The function then
 * blocks on vtbScan.ready and converts vtbScan.pRsp into `output` with
 * queryBuildUseDbOutput(). The request struct and the response are released
 * before returning.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t buildDbVgInfoMap(SOperatorInfo* pOperator, SMsgCb* pMsgCb, SName* name, SExecTaskInfo* pTaskInfo, SUseDbOutput* output) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    lino = 0;
  char*                      buf1 = NULL;
  SUseDbReq*                 pReq = NULL;
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)pOperator->info;

  // Zero the request: only `db` is filled in below, and every other field is
  // serialized as well, so it must not carry uninitialized bytes.
  pReq = taosMemoryCalloc(1, sizeof(SUseDbReq));
  QUERY_CHECK_NULL(pReq, code, lino, _return, terrno)
  code = tNameGetFullDbName(name, pReq->db);
  QUERY_CHECK_CODE(code, lino, _return);

  int32_t contLen = tSerializeSUseDbReq(NULL, 0, pReq);
  if (contLen < 0) {
    code = terrno ? terrno : TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    lino = __LINE__;
    goto _return;
  }
  buf1 = taosMemoryCalloc(1, contLen);
  QUERY_CHECK_NULL(buf1, code, lino, _return, terrno)
  int32_t tempRes = tSerializeSUseDbReq(buf1, contLen, pReq);
  if (tempRes < 0) {
    // QUERY_CHECK_CODE only tests its argument, so the error has to be stored
    // in `code` explicitly or the failure would be reported as success.
    code = terrno ? terrno : TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    lino = __LINE__;
    goto _return;
  }

  // send the fetch remote task result request
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  QUERY_CHECK_NULL(pMsgSendInfo, code, lino, _return, terrno)

  pMsgSendInfo->param = pOperator;
  pMsgSendInfo->msgInfo.pData = buf1;
  pMsgSendInfo->msgInfo.len = contLen;
  pMsgSendInfo->msgType = TDMT_MND_GET_DB_INFO;
  pMsgSendInfo->fp = dynProcessUseDbRsp;
  pMsgSendInfo->requestId = pTaskInfo->id.queryId;

  // The buffer now belongs to pMsgSendInfo (the success path never frees it
  // here), so drop the local reference to keep the error path from freeing it
  // a second time.
  // NOTE(review): assumes asyncSendMsgToServer releases the send info and its
  // payload when it fails — confirm.
  buf1 = NULL;

  code = asyncSendMsgToServer(pMsgCb->clientRpc, &pScanResInfo->vtbScan.epSet, NULL, pMsgSendInfo);
  QUERY_CHECK_CODE(code, lino, _return);

  code = tsem_wait(&pScanResInfo->vtbScan.ready);
  QUERY_CHECK_CODE(code, lino, _return);

  code = queryBuildUseDbOutput(output, pScanResInfo->vtbScan.pRsp);
  QUERY_CHECK_CODE(code, lino, _return);

_return:
  if (code) {
     qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
     taosMemoryFree(buf1);
  }
  taosMemoryFree(pReq);
  // Early failures reach this point before any response exists; do not hand a
  // NULL response to tFreeSUsedbRsp().
  if (NULL != pScanResInfo->vtbScan.pRsp) {
    tFreeSUsedbRsp(pScanResInfo->vtbScan.pRsp);
    taosMemoryFreeClear(pScanResInfo->vtbScan.pRsp);
  }
  return code;
}
1783

1784
int dynVgInfoComp(const void* lp, const void* rp) {
2,262,006✔
1785
  SVgroupInfo* pLeft = (SVgroupInfo*)lp;
2,262,006✔
1786
  SVgroupInfo* pRight = (SVgroupInfo*)rp;
2,262,006✔
1787
  if (pLeft->hashBegin < pRight->hashBegin) {
2,262,006✔
1788
    return -1;
2,262,006✔
1789
  } else if (pLeft->hashBegin > pRight->hashBegin) {
×
1790
    return 1;
×
1791
  }
1792

1793
  return 0;
×
1794
}
1795

1796
/*
 * Lazily materialize dbInfo->vgArray: copy every entry of dbInfo->vgHash into
 * a new array and sort it with sort_func.
 *
 * Nothing happens when dbInfo is NULL, when there is no vgHash, or when
 * vgArray already exists. The array is attached to dbInfo only once it is
 * complete and sorted, so a failed build leaves vgArray NULL and a later call
 * can retry instead of finding a partial, unsorted array.
 *
 * Returns TSDB_CODE_SUCCESS, or the terrno of the failed array operation.
 */
int32_t dynMakeVgArraySortBy(SDBVgInfo* dbInfo, __compar_fn_t sort_func) {
  if (NULL == dbInfo || NULL == dbInfo->vgHash || NULL != dbInfo->vgArray) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t vgSize = taosHashGetSize(dbInfo->vgHash);
  SArray* pSorted = taosArrayInit(vgSize, sizeof(SVgroupInfo));
  if (NULL == pSorted) {
    return terrno;
  }

  void* pIter = taosHashIterate(dbInfo->vgHash, NULL);
  while (pIter) {
    if (NULL == taosArrayPush(pSorted, pIter)) {
      int32_t code = terrno;  // capture before the cleanup calls can overwrite it
      taosHashCancelIterate(dbInfo->vgHash, pIter);
      taosArrayDestroy(pSorted);
      return code;
    }

    pIter = taosHashIterate(dbInfo->vgHash, pIter);
  }

  taosArraySort(pSorted, sort_func);
  dbInfo->vgArray = pSorted;

  return TSDB_CODE_SUCCESS;
}
1823

1824
int32_t dynHashValueComp(void const* lp, void const* rp) {
9,713,067✔
1825
  uint32_t*    key = (uint32_t*)lp;
9,713,067✔
1826
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
9,713,067✔
1827

1828
  if (*key < pVg->hashBegin) {
9,713,067✔
1829
    return -1;
×
1830
  } else if (*key > pVg->hashEnd) {
9,713,067✔
1831
    return 1;
3,444,145✔
1832
  }
1833

1834
  return 0;
6,268,922✔
1835
}
1836

1837
/*
 * Resolve the vgroup that owns table `tbName` of database `dbFName`.
 *
 * The sorted vgroup array is built on demand, the name "<dbFName>.<tbName>"
 * is hashed with the database's hash settings, and the array is searched for
 * the vgroup whose hash range contains that value. On success *vgId receives
 * the vgroup id.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_DB_NOT_SELECTED when the database
 * has no vgroups, TSDB_CODE_CTG_INTERNAL_ERROR when no range matches, or the
 * error from building the array.
 */
int32_t getVgId(SDBVgInfo* dbInfo, char* dbFName, int32_t* vgId, char *tbName) {
  int32_t code = 0;
  int32_t lino = 0;
  char    tbFullName[TSDB_TABLE_FNAME_LEN];

  code = dynMakeVgArraySortBy(dbInfo, dynVgInfoComp);
  QUERY_CHECK_CODE(code, lino, _return);

  int32_t vgNum = (int32_t)taosArrayGetSize(dbInfo->vgArray);
  if (vgNum <= 0) {
    qError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum);
    code = TSDB_CODE_TSC_DB_NOT_SELECTED;
    QUERY_CHECK_CODE(code, lino, _return);
  }

  // "<db>.<table>", truncated to the buffer size if necessary
  (void)snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbFName, tbName);

  int32_t  nameLen = (int32_t)strlen(tbFullName);
  uint32_t hashValue = taosGetTbHashVal(tbFullName, nameLen, dbInfo->hashMethod,
                                        dbInfo->hashPrefix, dbInfo->hashSuffix);

  SVgroupInfo* pFound = taosArraySearch(dbInfo->vgArray, &hashValue, dynHashValueComp, TD_EQ);
  if (NULL == pFound) {
    qError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName,
           (int32_t)taosArrayGetSize(dbInfo->vgArray));
    return TSDB_CODE_CTG_INTERNAL_ERROR;
  }

  *vgId = pFound->vgId;

_return:
  return code;
}
1870

1871
/*
 * Tell whether column `colId` has to be scanned: always when scanAllCols is
 * set, otherwise only when the id appears in the operator's readColList.
 */
bool colNeedScan(SOperatorInfo* pOperator, col_id_t colId) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;

  if (pVtbScan->scanAllCols) {
    return true;
  }

  SArray* pReadCols = pVtbScan->readColList;
  int32_t idx = 0;
  while (idx < taosArrayGetSize(pReadCols)) {
    col_id_t* pId = (col_id_t*)taosArrayGet(pReadCols, idx);
    if (*pId == colId) {
      return true;
    }
    idx++;
  }
  return false;
}
1885

1886
/*
 * Return the vgroup info of the database `name` refers to, using the
 * operator's dbVgInfoMap as a cache keyed by db name.
 *
 * On a cache miss a new SUseDbOutput is allocated, filled through
 * buildDbVgInfoMap() and stored in the map. On success *dbVgInfo points at
 * the cached output's dbVgroup.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
int32_t getDbVgInfo(SOperatorInfo* pOperator, SName *name, SDBVgInfo **dbVgInfo) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SMsgCb*                    pMsgCb = pVtbScan->pMsgCb;
  SUseDbOutput*              output = NULL;
  SUseDbOutput**             find = (SUseDbOutput**)taosHashGet(pInfo->vtbScan.dbVgInfoMap, name->dbname, strlen(name->dbname));

  QRY_PARAM_CHECK(dbVgInfo);

  if (find == NULL) {
    // Zero-initialized so that the failure path below never hands
    // freeUseDbOutput() a structure holding indeterminate pointers.
    output = taosMemoryCalloc(1, sizeof(SUseDbOutput));
    if (NULL == output) {
      // Nothing to release yet: report the allocation failure directly.
      code = terrno;
      line = __LINE__;
      qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
      return code;
    }
    code = buildDbVgInfoMap(pOperator, pMsgCb, name, pTaskInfo, output);
    QUERY_CHECK_CODE(code, line, _return);
    code = taosHashPut(pInfo->vtbScan.dbVgInfoMap, name->dbname, strlen(name->dbname), &output, POINTER_BYTES);
    QUERY_CHECK_CODE(code, line, _return);
  } else {
    output = *find;
  }

  *dbVgInfo = output->dbVgroup;
  return code;
_return:
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  freeUseDbOutput(output);
  return code;
}
1915

1916
int32_t extractColRefName(const char *colref, char **refDb, char** refTb, char** refCol) {
17,625,030✔
1917
  int32_t     code = TSDB_CODE_SUCCESS;
17,625,030✔
1918
  int32_t     line = 0;
17,625,030✔
1919

1920
  const char *first_dot = strchr(colref, '.');
17,625,030✔
1921
  QUERY_CHECK_NULL(first_dot, code, line, _return, terrno)
17,625,030✔
1922

1923
  const char *second_dot = strchr(first_dot + 1, '.');
17,625,030✔
1924
  QUERY_CHECK_NULL(second_dot, code, line, _return, terrno)
17,625,030✔
1925

1926
  size_t db_len = first_dot - colref;
17,625,030✔
1927
  size_t table_len = second_dot - first_dot - 1;
17,625,030✔
1928
  size_t col_len = strlen(second_dot + 1);
17,625,030✔
1929

1930
  *refDb = taosMemoryMalloc(db_len + 1);
17,625,030✔
1931
  *refTb = taosMemoryMalloc(table_len + 1);
17,625,030✔
1932
  *refCol = taosMemoryMalloc(col_len + 1);
17,625,030✔
1933
  QUERY_CHECK_NULL(*refDb, code, line, _return, terrno)
17,625,030✔
1934
  QUERY_CHECK_NULL(*refTb, code, line, _return, terrno)
17,625,030✔
1935
  QUERY_CHECK_NULL(*refCol, code, line, _return, terrno)
17,625,030✔
1936

1937
  tstrncpy(*refDb, colref, db_len + 1);
17,625,030✔
1938
  tstrncpy(*refTb, first_dot + 1, table_len + 1);
17,625,030✔
1939
  tstrncpy(*refCol, second_dot + 1, col_len + 1);
17,625,030✔
1940

1941
  return TSDB_CODE_SUCCESS;
17,625,030✔
1942
_return:
×
1943
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
1944
  if (*refDb) {
×
1945
    taosMemoryFree(*refDb);
×
1946
    *refDb = NULL;
×
1947
  }
1948
  if (*refTb) {
×
1949
    taosMemoryFree(*refTb);
×
1950
    *refTb = NULL;
×
1951
  }
1952
  if (*refCol) {
×
1953
    taosMemoryFree(*refCol);
×
1954
    *refCol = NULL;
×
1955
  }
1956
  return code;
×
1957
}
1958

1959
/*
 * Check whether a (db, table) pair matches the expected pair.
 *
 * dbName and tbName are read through the varData accessors; expectDbName and
 * expectTbName are plain C strings. A name matches when its bytes equal the
 * expected string and both have the same length. The table name is compared
 * first, the db name only if the table matched.
 */
bool tableInfoNeedCollect(char *dbName, char *tbName, char *expectDbName, char *expectTbName) {
  bool tbMatches = (0 == strncmp(varDataVal(tbName), expectTbName, varDataLen(tbName))) &&
                   (strlen(expectTbName) == varDataLen(tbName));
  if (!tbMatches) {
    return false;
  }

  bool dbMatches = (0 == strncmp(varDataVal(dbName), expectDbName, varDataLen(dbName))) &&
                   (strlen(expectDbName) == varDataLen(dbName));
  return dbMatches ? true : false;
}
1968

1969
/*
 * Fill pInfo from row `index` of a column-reference result block.
 *
 * Columns 3..8 of pDataBlock are used as: column name, uid, column id,
 * reference name, vgroup id and reference version (the last one is only
 * checked for presence). colName is always copied into a new NUL-terminated
 * string; colrefName is copied the same way, or set to NULL when the
 * reference cell is NULL. uid, colId and vgId are only written when their
 * cells are not NULL.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when a column is missing or an
 * allocation fails.
 */
int32_t getColRefInfo(SColRefInfo *pInfo, SArray* pDataBlock, int32_t index) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          line = 0;

  SColumnInfoData *pColNameCol = taosArrayGet(pDataBlock, 3);
  SColumnInfoData *pUidCol = taosArrayGet(pDataBlock, 4);
  SColumnInfoData *pColIdCol = taosArrayGet(pDataBlock, 5);
  SColumnInfoData *pRefCol = taosArrayGet(pDataBlock, 6);
  SColumnInfoData *pVgIdCol = taosArrayGet(pDataBlock, 7);
  SColumnInfoData *pRefVerCol = taosArrayGet(pDataBlock, 8);

  QUERY_CHECK_NULL(pColNameCol, code, line, _return, terrno)
  QUERY_CHECK_NULL(pUidCol, code, line, _return, terrno)
  QUERY_CHECK_NULL(pColIdCol, code, line, _return, terrno)
  QUERY_CHECK_NULL(pRefCol, code, line, _return, terrno)
  QUERY_CHECK_NULL(pVgIdCol, code, line, _return, terrno)
  QUERY_CHECK_NULL(pRefVerCol, code, line, _return, terrno)

  if (!colDataIsNull_s(pRefCol, index)) {
    char* pRefData = colDataGetData(pRefCol, index);

    // the buffer spans the whole var-data cell, which leaves room for the terminator
    pInfo->colrefName = taosMemoryCalloc(varDataTLen(pRefData), 1);
    QUERY_CHECK_NULL(pInfo->colrefName, code, line, _return, terrno)
    memcpy(pInfo->colrefName, varDataVal(pRefData), varDataLen(pRefData));
    pInfo->colrefName[varDataLen(pRefData)] = 0;
  } else {
    pInfo->colrefName = NULL;
  }

  char* pNameData = colDataGetData(pColNameCol, index);
  pInfo->colName = taosMemoryCalloc(varDataTLen(pNameData), 1);
  QUERY_CHECK_NULL(pInfo->colName, code, line, _return, terrno)
  memcpy(pInfo->colName, varDataVal(pNameData), varDataLen(pNameData));
  pInfo->colName[varDataLen(pNameData)] = 0;

  if (!colDataIsNull_s(pUidCol, index)) {
    GET_TYPED_DATA(pInfo->uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, index), 0);
  }
  if (!colDataIsNull_s(pColIdCol, index)) {
    GET_TYPED_DATA(pInfo->colId, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pColIdCol, index), 0);
  }
  if (!colDataIsNull_s(pVgIdCol, index)) {
    GET_TYPED_DATA(pInfo->vgId, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pVgIdCol, index), 0);
  }

_return:
  return code;
}
2014

2015
/*
 * Compare the vgroups seen in the current pass (curOrgTbVg) with the known
 * ones (existOrgTbVg) and request a redeploy when new vgroups showed up.
 *
 * Does nothing when the task has no stream runtime info. If no known set
 * exists yet, the current set becomes the known set. Otherwise every vgId in
 * curOrgTbVg that is missing from existOrgTbVg is collected. If there are
 * any, then:
 *   - when vtableDeployInfo.addVgIds is unset, the node ids of a claimed
 *     addedVgInfo array are appended and the list is published as addVgIds;
 *   - the uid/suid is stored, vtableDeployGot is flagged, curOrgTbVg is
 *     cleared, needRedeploy and rversion are updated.
 *
 * Returns TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY when a redeploy was
 * requested, TSDB_CODE_SUCCESS when nothing changed, or terrno on an
 * allocation failure.
 */
int32_t processOrgTbVg(SVtbScanDynCtrlInfo* pVtbScan, SExecTaskInfo* pTaskInfo, int32_t rversion) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  void*                      pCurIter = NULL;
  SArray*                    tmpArray = NULL;      // vgIds in curOrgTbVg but not in existOrgTbVg
  SArray*                    ownedExpired = NULL;  // addedVgInfo array once this call has claimed it

  if (pTaskInfo->pStreamRuntimeInfo == NULL) {
    return code;
  }

  if (pVtbScan->existOrgTbVg == NULL) {
    pVtbScan->existOrgTbVg = pVtbScan->curOrgTbVg;
    pVtbScan->curOrgTbVg = NULL;
  }

  if (pVtbScan->curOrgTbVg == NULL) {
    return code;
  }

  // which means rversion has changed
  while ((pCurIter = taosHashIterate(pVtbScan->curOrgTbVg, pCurIter))) {
    int32_t* vgId = (int32_t*)taosHashGetKey(pCurIter, NULL);
    if (taosHashGet(pVtbScan->existOrgTbVg, vgId, sizeof(int32_t)) == NULL) {
      if (tmpArray == NULL) {
        tmpArray = taosArrayInit(1, sizeof(int32_t));
        QUERY_CHECK_NULL(tmpArray, code, line, _return, terrno)
      }
      QUERY_CHECK_NULL(taosArrayPush(tmpArray, vgId), code, line, _return, terrno)
    }
  }
  if (tmpArray == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  if (pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addVgIds == NULL) {
    SArray* expiredInfo = atomic_load_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo);
    if (expiredInfo && expiredInfo == atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo, expiredInfo, NULL)) {
      // The swap succeeded, so this call now owns the array and must free it.
      ownedExpired = expiredInfo;
      for (int32_t i = 0; i < taosArrayGetSize(ownedExpired); i++) {
        SStreamTaskAddr* vgInfo = (SStreamTaskAddr*)taosArrayGet(ownedExpired, i);
        QUERY_CHECK_NULL(taosArrayPush(tmpArray, &vgInfo->nodeId), code, line, _return, terrno)
      }
      taosArrayDestroy(ownedExpired);
      ownedExpired = NULL;
    }
    if (atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addVgIds, NULL, tmpArray)) {
      // addVgIds was set in the meantime, so the list was not published
      taosArrayDestroy(tmpArray);
    }
  } else {
    // addVgIds is already set: the freshly built list is not needed
    taosArrayDestroy(tmpArray);
  }
  tmpArray = NULL;  // either published or destroyed above

  atomic_store_64(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.uid, (int64_t)(pVtbScan->isSuperTable ? pVtbScan->suid : pVtbScan->uid));
  (void)atomic_val_compare_exchange_8(pTaskInfo->pStreamRuntimeInfo->vtableDeployGot, 0, 1);
  taosHashClear(pVtbScan->curOrgTbVg);
  pVtbScan->needRedeploy = true;
  pVtbScan->rversion = rversion;
  return TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY;

_return:
  // Release whatever this call still holds: an unfinished hash iteration and
  // any array that was neither published nor destroyed.
  if (pCurIter != NULL) {
    taosHashCancelIterate(pVtbScan->curOrgTbVg, pCurIter);
  }
  if (tmpArray != NULL) {
    taosArrayDestroy(tmpArray);
  }
  if (ownedExpired != NULL) {
    taosArrayDestroy(ownedExpired);
  }
  qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  return code;
}
2070

2071
/*
 * Resolve the vgroup id of the table a "db.table.column" reference points to.
 *
 * The reference is split into its parts, an SName is built for the operator's
 * account, the database's vgroup info is obtained through getDbVgInfo(), and
 * getVgId() maps the table name to *vgId. The temporary name strings are
 * released on every path.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
int32_t getVgIdFromColref(SOperatorInfo* pOperator, const char* colRef, int32_t* vgId) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SName                      name = {0};
  SDBVgInfo*                 dbVgInfo = NULL;
  char                       dbFname[TSDB_DB_FNAME_LEN] = {0};
  char*                      refDbName = NULL;
  char*                      refTbName = NULL;
  char*                      refColName = NULL;

  code = extractColRefName(colRef, &refDbName, &refTbName, &refColName);
  QUERY_CHECK_CODE(code, line, _return);

  toName(pInfo->vtbScan.acctId, refDbName, refTbName, &name);

  code = getDbVgInfo(pOperator, &name, &dbVgInfo);
  QUERY_CHECK_CODE(code, line, _return);

  code = tNameGetFullDbName(&name, dbFname);
  QUERY_CHECK_CODE(code, line, _return);

  code = getVgId(dbVgInfo, dbFname, vgId, name.tname);
  QUERY_CHECK_CODE(code, line, _return);

_return:
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }

  // the three name strings belong to this function
  taosMemoryFree(refDbName);
  taosMemoryFree(refTbName);
  taosMemoryFree(refColName);
  return code;
}
2105

2106
/*
 * Build the tag value list of one row of a tag block and store it in
 * vtbUidTagListMap under the given uid.
 *
 * Every column of pTagVal except the last one is treated as a tag column (the
 * last column carries the uid). For each tag column one STagVal is appended to
 * a new array:
 *   - NULL cell:            pData == NULL, nData == 0
 *   - variable-length type: payload copied into a freshly allocated buffer
 *   - fixed-length type:    value copied into the i64 member
 * The array pointer is then put into the map (value size POINTER_BYTES).
 *
 * Returns TSDB_CODE_SUCCESS, or an error code; on error the partially built
 * array and any pending payload buffer are released.
 */
static int32_t generateTagArrayByTagBlockAndSave(SHashObj* vtbUidTagListMap, tb_uid_t uid, SSDataBlock *pTagVal, int32_t rowIdx) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t line = 0;
  STagVal tagVal = {0};

  SArray* pTagList = taosArrayInit(1, sizeof(STagVal));
  QUERY_CHECK_NULL(pTagList, code, line, _return, terrno)

  // Last column is the uid. Compute the tag column count once, as a signed
  // value, so that a block without columns yields no iterations instead of
  // wrapping around in unsigned arithmetic.
  int32_t numOfTagCols = (int32_t)taosArrayGetSize(pTagVal->pDataBlock) - 1;

  for (int32_t k = 0; k < numOfTagCols; k++) {
    SColumnInfoData *pTagCol = taosArrayGet(pTagVal->pDataBlock, k);
    QUERY_CHECK_NULL(pTagCol, code, line, _return, terrno)

    // tagVal is all-zero here, so a NULL cell needs no extra handling below.
    tagVal.type = pTagCol->info.type;
    tagVal.cid = pTagCol->info.colId;

    if (!colDataIsNull_s(pTagCol, rowIdx)) {
      char* pData = colDataGetData(pTagCol, rowIdx);
      if (IS_VAR_DATA_TYPE(pTagCol->info.type)) {
        tagVal.nData = varDataLen(pData);
        tagVal.pData = taosMemoryMalloc(tagVal.nData);
        QUERY_CHECK_NULL(tagVal.pData, code, line, _return, terrno)
        memcpy(tagVal.pData, varDataVal(pData), varDataLen(pData));
      } else {
        memcpy(&tagVal.i64, pData, tDataTypes[pTagCol->info.type].bytes);
      }
    }

    QUERY_CHECK_NULL(taosArrayPush(pTagList, &tagVal), code, line, _return, terrno)

    // The array now holds the payload pointer; clear the local copy so the
    // error path below cannot free it a second time.
    tagVal = (STagVal){0};
  }

  code = taosHashPut(vtbUidTagListMap, &uid, sizeof(uid), &pTagList, POINTER_BYTES);
  QUERY_CHECK_CODE(code, line, _return);

  return code;

_return:
  // Payload allocated for the current column but not yet pushed.
  if (tagVal.pData) {
    taosMemoryFreeClear(tagVal.pData);
  }
  if (pTagList) {
    taosArrayDestroyEx(pTagList, destroyTagVal);
  }
  qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  return code;
}
2153

2154
/*
 * Walk the column reference entries of one virtual table and record, per
 * referenced original table, which columns have to be read.
 *
 * For every entry, *uid and *vgId are overwritten with the entry's uid/vgId,
 * so after the call they hold the values of the last entry visited. Entries
 * without a column reference, or whose column colNeedScan() rejects, are
 * otherwise skipped.
 *
 * For the remaining entries the reference is split into db/table/column
 * names and the full table name is used as key into
 * pVtbScan->otbNameToOtbInfoMap (key length is the whole zero-padded buffer):
 *   - unknown table: a new SOrgTbInfo (vgId, table name, colMap with this
 *     column) is inserted;
 *   - known table:   the column is appended to the existing colMap.
 *
 * Returns TSDB_CODE_SUCCESS or the first error code encountered. All
 * temporaries are released on every path, including failures.
 */
int32_t virtualTableScanProcessColRefInfo(SOperatorInfo* pOperator, SArray* pColRefInfo, tb_uid_t* uid, int32_t* vgId) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SDBVgInfo*                 dbVgInfo = NULL;
  // Kept at function scope so that the error path can release them.
  char*                      refDbName = NULL;
  char*                      refTbName = NULL;
  char*                      refColName = NULL;
  SArray*                    pPendingColMap = NULL;  // colMap not yet stored in the hash

  for (int32_t j = 0; j < taosArrayGetSize(pColRefInfo); j++) {
    SColRefInfo *pKV = (SColRefInfo*)taosArrayGet(pColRefInfo, j);
    *uid = pKV->uid;
    *vgId = pKV->vgId;
    if (pKV->colrefName == NULL || !colNeedScan(pOperator, pKV->colId)) {
      continue;
    }

    SName   name = {0};
    char    dbFname[TSDB_DB_FNAME_LEN] = {0};
    char    orgTbFName[TSDB_TABLE_FNAME_LEN] = {0};

    code = extractColRefName(pKV->colrefName, &refDbName, &refTbName, &refColName);
    QUERY_CHECK_CODE(code, line, _return);

    toName(pInfo->vtbScan.acctId, refDbName, refTbName, &name);

    code = getDbVgInfo(pOperator, &name, &dbVgInfo);
    QUERY_CHECK_CODE(code, line, _return);
    code = tNameGetFullDbName(&name, dbFname);
    QUERY_CHECK_CODE(code, line, _return);
    code = tNameGetFullTableName(&name, orgTbFName);
    QUERY_CHECK_CODE(code, line, _return);

    // The (column id, original column name) pair is needed in both branches.
    SColIdNameKV colIdNameKV = {0};
    colIdNameKV.colId = pKV->colId;
    tstrncpy(colIdNameKV.colName, refColName, sizeof(colIdNameKV.colName));

    SOrgTbInfo *pExisting = (SOrgTbInfo *)taosHashGet(pVtbScan->otbNameToOtbInfoMap, orgTbFName, sizeof(orgTbFName));
    if (pExisting != NULL) {
      QUERY_CHECK_NULL(taosArrayPush(pExisting->colMap, &colIdNameKV), code, line, _return, terrno)
    } else {
      SOrgTbInfo orgTbInfo = {0};
      code = getVgId(dbVgInfo, dbFname, &orgTbInfo.vgId, name.tname);
      QUERY_CHECK_CODE(code, line, _return);
      tstrncpy(orgTbInfo.tbName, orgTbFName, sizeof(orgTbInfo.tbName));

      pPendingColMap = taosArrayInit(10, sizeof(SColIdNameKV));
      QUERY_CHECK_NULL(pPendingColMap, code, line, _return, terrno)
      QUERY_CHECK_NULL(taosArrayPush(pPendingColMap, &colIdNameKV), code, line, _return, terrno)

      orgTbInfo.colMap = pPendingColMap;
      code = taosHashPut(pVtbScan->otbNameToOtbInfoMap, orgTbFName, sizeof(orgTbFName), &orgTbInfo, sizeof(orgTbInfo));
      QUERY_CHECK_CODE(code, line, _return);
      // The hash entry now references the array.
      pPendingColMap = NULL;
    }

    taosMemoryFreeClear(refDbName);
    taosMemoryFreeClear(refTbName);
    taosMemoryFreeClear(refColName);
  }

_return:
  // On success these are all NULL; on failure they hold whatever the
  // interrupted iteration had allocated.
  taosMemoryFree(refDbName);
  taosMemoryFree(refTbName);
  taosMemoryFree(refColName);
  taosArrayDestroy(pPendingColMap);
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
2218

2219
/*
 * Drain the tag scan operator (pOperator->pDownstream[1]) and record the tag
 * values of every returned row.
 *
 * pVtbScan->vtbUidTagListMap is always (re)created. With hasPartition set,
 * vtbUidToGroupIdMap and vtbGroupIdTagListMap are created as well; tag lists
 * are then stored in a per-group uid->tag-list map kept inside
 * vtbGroupIdTagListMap, and each uid is additionally mapped to the block's
 * group id. Without partitioning, tag lists go straight into
 * vtbUidTagListMap.
 *
 * The uid of a row is read from the last column of the block; a NULL uid cell
 * is recorded as uid 0.
 *
 * Returns TSDB_CODE_SUCCESS or the first error code encountered.
 */
static int32_t getTagBlockAndProcess(SOperatorInfo* pOperator, bool hasPartition) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SOperatorInfo*             pTagScanOp = pOperator->pDownstream[1];

  pVtbScan->vtbUidTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  QUERY_CHECK_NULL(pVtbScan->vtbUidTagListMap, code, line, _return, terrno)
  taosHashSetFreeFp(pVtbScan->vtbUidTagListMap, destroyTagList);
  if (hasPartition) {
    pVtbScan->vtbUidToGroupIdMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
    pVtbScan->vtbGroupIdTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
    QUERY_CHECK_NULL(pVtbScan->vtbUidToGroupIdMap, code, line, _return, terrno)
    QUERY_CHECK_NULL(pVtbScan->vtbGroupIdTagListMap, code, line, _return, terrno)
    taosHashSetFreeFp(pVtbScan->vtbGroupIdTagListMap, destroyVtbUidTagListMap);
  }

  while (true) {
    SSDataBlock *pTagVal = NULL;
    code = pTagScanOp->fpSet.getNextFn(pTagScanOp, &pTagVal);
    QUERY_CHECK_CODE(code, line, _return);
    if (pTagVal == NULL) {
      break;
    }

    // Pick the map that receives this block's tag lists.
    SHashObj *vtbUidTagListMap = NULL;
    if (hasPartition) {
      void* pIter = taosHashGet(pVtbScan->vtbGroupIdTagListMap, &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId));
      if (pIter) {
        vtbUidTagListMap = *(SHashObj**)pIter;
      } else {
        vtbUidTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
        QUERY_CHECK_NULL(vtbUidTagListMap, code, line, _return, terrno)
        taosHashSetFreeFp(vtbUidTagListMap, destroyTagList);

        code = taosHashPut(pVtbScan->vtbGroupIdTagListMap, &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId), &vtbUidTagListMap, POINTER_BYTES);
        if (code != TSDB_CODE_SUCCESS) {
          // Nothing references the new (still empty) map yet; release it here.
          taosHashCleanup(vtbUidTagListMap);
        }
        QUERY_CHECK_CODE(code, line, _return);
      }
    } else {
      vtbUidTagListMap = pVtbScan->vtbUidTagListMap;
    }

    SColumnInfoData *pUidCol = taosArrayGetLast(pTagVal->pDataBlock);
    QUERY_CHECK_NULL(pUidCol, code, line, _return, terrno)
    for (int32_t i = 0; i < pTagVal->info.rows; i++) {
      tb_uid_t uid = 0;
      if (!colDataIsNull_s(pUidCol, i)) {
        GET_TYPED_DATA(uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, i), 0);
        QUERY_CHECK_CODE(code, line, _return);
      }

      code = generateTagArrayByTagBlockAndSave(vtbUidTagListMap, uid, pTagVal, i);
      QUERY_CHECK_CODE(code, line, _return);

      if (hasPartition) {
        code = taosHashPut(pVtbScan->vtbUidToGroupIdMap, &uid, sizeof(uid), &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId));
        QUERY_CHECK_CODE(code, line, _return);
      }
    }
  }

  return code;

_return:
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  return code;
}
2289

2290
/*
 * For every virtual child table in pVtbScan->childTableList, group the
 * original tables it references by vgroup id.
 *
 * Per child table:
 *   1. virtualTableScanProcessColRefInfo() fills
 *      pVtbScan->otbNameToOtbInfoMap and reports the table's uid.
 *   2. Every entry of otbNameToOtbInfoMap is copied (by value) into an array
 *      stored under its vgId in a fresh vgId -> SArray<SOrgTbInfo> map, and is
 *      then removed from otbNameToOtbInfoMap, leaving that map empty for the
 *      next table.
 *   3. The fresh map is stored in pVtbScan->vtbUidToVgIdMapMap under the uid.
 *
 * Returns TSDB_CODE_SUCCESS or the first error code encountered.
 */
static int32_t processChildTableListAndGenerateOrgTbInfoMap(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  void*                      pOrgTbInfo = NULL;  // current iterator position in otbNameToOtbInfoMap
  SArray*                    pNewArray = NULL;   // array created but not yet stored in a map

  pVtbScan->vtbUidToVgIdMapMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  QUERY_CHECK_NULL(pVtbScan->vtbUidToVgIdMapMap, code, line, _return, terrno)

  for (int32_t i = 0; i < taosArrayGetSize(pVtbScan->childTableList); i++) {
    SArray* pColRefInfo = (SArray*)taosArrayGetP(pVtbScan->childTableList, i);
    QUERY_CHECK_NULL(pColRefInfo, code, line, _return, terrno)

    tb_uid_t uid = 0;
    int32_t  vgId = 0;
    code = virtualTableScanProcessColRefInfo(pOperator, pColRefInfo, &uid, &vgId);
    QUERY_CHECK_CODE(code, line, _return);

    // Allocated only after the steps above succeeded, so that their failure
    // cannot strand an unreferenced map.
    SHashObj* otbVgIdToOtbInfoArrayMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
    QUERY_CHECK_NULL(otbVgIdToOtbInfoArrayMap, code, line, _return, terrno)
    // NOTE(review): if a later step in this iteration fails, this map and the
    // arrays already stored in it are not released; the SOrgTbInfo copies share
    // colMap with otbNameToOtbInfoMap until the entry is removed, so a safe
    // deep cleanup needs the ownership rules confirmed.

    size_t len = 0;
    pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, NULL);
    while (pOrgTbInfo != NULL) {
      char*       key = taosHashGetKey(pOrgTbInfo, &len);
      SOrgTbInfo* orgTbInfo = (SOrgTbInfo*)pOrgTbInfo;

      void* pIter = taosHashGet(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId));
      if (!pIter) {
        pNewArray = taosArrayInit(1, sizeof(SOrgTbInfo));
        QUERY_CHECK_NULL(pNewArray, code, line, _return, terrno)
        QUERY_CHECK_NULL(taosArrayPush(pNewArray, orgTbInfo), code, line, _return, terrno)
        code = taosHashPut(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId), &pNewArray, POINTER_BYTES);
        QUERY_CHECK_CODE(code, line, _return);
        pNewArray = NULL;  // now referenced by the map
      } else {
        SArray* pOrgTbInfoArray = *(SArray**)pIter;
        QUERY_CHECK_NULL(pOrgTbInfoArray, code, line, _return, terrno)
        QUERY_CHECK_NULL(taosArrayPush(pOrgTbInfoArray, orgTbInfo), code, line, _return, terrno)
      }

      // Advance first, then drop the entry that was just copied.
      pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, pOrgTbInfo);

      code = taosHashRemove(pVtbScan->otbNameToOtbInfoMap, key, len);
      QUERY_CHECK_CODE(code, line, _return);
    }

    code = taosHashPut(pVtbScan->vtbUidToVgIdMapMap, &uid, sizeof(uid), &otbVgIdToOtbInfoArrayMap, POINTER_BYTES);
    QUERY_CHECK_CODE(code, line, _return);
  }

  return code;

_return:
  // Abandon an iteration that was interrupted part-way (no-op when NULL).
  taosHashCancelIterate(pVtbScan->otbNameToOtbInfoMap, pOrgTbInfo);
  // Frees only the element buffer; the copied SOrgTbInfo still lives in
  // otbNameToOtbInfoMap at this point.
  taosArrayDestroy(pNewArray);
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  return code;
}
2349

2350
/*
 * Non-batch preparation path: runs
 * processChildTableListAndGenerateOrgTbInfoMap() and then
 * getTagBlockAndProcess(), stopping at the first failure.
 *
 * hasPartition is forwarded unchanged to getTagBlockAndProcess().
 * Returns TSDB_CODE_SUCCESS or the code of the step that failed.
 */
static int32_t buildOrgTbInfoSingle(SOperatorInfo* pOperator, bool hasPartition) {
  int32_t lino = 0;

  // Step 1: original-table info of every child table.
  int32_t code = processChildTableListAndGenerateOrgTbInfoMap(pOperator);
  QUERY_CHECK_CODE(code, lino, _failed);

  // Step 2: tag values.
  code = getTagBlockAndProcess(pOperator, hasPartition);
  QUERY_CHECK_CODE(code, lino, _failed);

  return code;

_failed:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  return code;
}
2366

2367
/*
 * Batch preparation path: group the original tables referenced by all
 * virtual child tables by vgroup id.
 *
 * Without partitioning a single map, pVtbScan->otbVgIdToOtbInfoArrayMap
 * (vgId -> SArray<SOrgTbInfo>), collects every original table.
 *
 * With hasPartition set, the tag scan operator (pOperator->pDownstream[1]) is
 * drained first to fill vtbUidToGroupIdMap (uid -> group id) and
 * vtbGroupIdTagListMap (used as a set of group ids), and each child table's
 * original tables go into the vgId map of its group, kept in
 * vtbGroupIdToVgIdMapMap.
 *
 * For every child table, virtualTableScanProcessColRefInfo() fills
 * pVtbScan->otbNameToOtbInfoMap; each entry is copied by value into the
 * selected vgId map and then removed from otbNameToOtbInfoMap.
 *
 * Returns TSDB_CODE_SUCCESS or the first error code encountered.
 */
static int32_t buildOrgTbInfoBatch(SOperatorInfo* pOperator, bool hasPartition) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SOperatorInfo*             pTagScanOp = pOperator->pDownstream[1];
  void*                      pOrgTbInfo = NULL;  // current iterator position in otbNameToOtbInfoMap
  SArray*                    pNewArray = NULL;   // array created but not yet stored in a map

  if (hasPartition) {
    pVtbScan->vtbUidToGroupIdMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
    pVtbScan->vtbGroupIdTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
    pVtbScan->vtbGroupIdToVgIdMapMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);

    QUERY_CHECK_NULL(pVtbScan->vtbGroupIdToVgIdMapMap, code, line, _return, terrno)
    QUERY_CHECK_NULL(pVtbScan->vtbUidToGroupIdMap, code, line, _return, terrno)
    QUERY_CHECK_NULL(pVtbScan->vtbGroupIdTagListMap, code, line, _return, terrno)
    taosHashSetFreeFp(pVtbScan->vtbGroupIdToVgIdMapMap, destroyOtbVgIdToOtbInfoArrayMap);
  } else {
    pVtbScan->otbVgIdToOtbInfoArrayMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
    QUERY_CHECK_NULL(pVtbScan->otbVgIdToOtbInfoArrayMap, code, line, _return, terrno)
  }

  // Only the partitioned case needs the tag scan output.
  while (hasPartition) {
    SSDataBlock* pTagVal = NULL;
    code = pTagScanOp->fpSet.getNextFn(pTagScanOp, &pTagVal);
    QUERY_CHECK_CODE(code, line, _return);
    if (pTagVal == NULL) {
      break;
    }

    // The uid is carried by the last column; a NULL cell is recorded as uid 0.
    SColumnInfoData *pUidCol = taosArrayGetLast(pTagVal->pDataBlock);
    QUERY_CHECK_NULL(pUidCol, code, line, _return, terrno)
    for (int32_t i = 0; i < pTagVal->info.rows; i++) {
      tb_uid_t uid = 0;
      if (!colDataIsNull_s(pUidCol, i)) {
        GET_TYPED_DATA(uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, i), 0);
        QUERY_CHECK_CODE(code, line, _return);
      }
      code = taosHashPut(pVtbScan->vtbUidToGroupIdMap, &uid, sizeof(uid), &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId));
      QUERY_CHECK_CODE(code, line, _return);
    }
    code = taosHashPut(pVtbScan->vtbGroupIdTagListMap, &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId), NULL, 0);
    QUERY_CHECK_CODE(code, line, _return);
  }

  for (int32_t i = 0; i < taosArrayGetSize(pVtbScan->childTableList); i++) {
    SArray* pColRefInfo = (SArray*)taosArrayGetP(pVtbScan->childTableList, i);
    QUERY_CHECK_NULL(pColRefInfo, code, line, _return, terrno)
    tb_uid_t uid = 0;
    int32_t  vgId = 0;
    code = virtualTableScanProcessColRefInfo(pOperator, pColRefInfo, &uid, &vgId);
    QUERY_CHECK_CODE(code, line, _return);

    // Select the vgId map that receives this table's original tables.
    SHashObj* otbVgIdToOtbInfoArrayMap = NULL;
    if (hasPartition) {
      uint64_t* groupId = (uint64_t *)taosHashGet(pVtbScan->vtbUidToGroupIdMap, &uid, sizeof(uid));
      // NOTE(review): on a lookup miss the returned code is whatever terrno
      // currently holds — confirm the lookup sets it, otherwise this can
      // report a stale value.
      QUERY_CHECK_NULL(groupId, code, line, _return, terrno)

      void* pHashIter = taosHashGet(pVtbScan->vtbGroupIdToVgIdMapMap, groupId, sizeof(*groupId));
      if (pHashIter) {
        otbVgIdToOtbInfoArrayMap = *(SHashObj**)pHashIter;
      } else {
        otbVgIdToOtbInfoArrayMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
        QUERY_CHECK_NULL(otbVgIdToOtbInfoArrayMap, code, line, _return, terrno)
        code = taosHashPut(pVtbScan->vtbGroupIdToVgIdMapMap, groupId, sizeof(*groupId), &otbVgIdToOtbInfoArrayMap, POINTER_BYTES);
        if (code != TSDB_CODE_SUCCESS) {
          // Nothing references the new (still empty) map yet; release it here.
          taosHashCleanup(otbVgIdToOtbInfoArrayMap);
        }
        QUERY_CHECK_CODE(code, line, _return);
      }
    } else {
      otbVgIdToOtbInfoArrayMap = pVtbScan->otbVgIdToOtbInfoArrayMap;
    }

    size_t len = 0;
    pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, NULL);
    while (pOrgTbInfo != NULL) {
      char*       key = taosHashGetKey(pOrgTbInfo, &len);
      SOrgTbInfo* orgTbInfo = (SOrgTbInfo*)pOrgTbInfo;
      void* pIter = taosHashGet(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId));
      if (!pIter) {
        pNewArray = taosArrayInit(1, sizeof(SOrgTbInfo));
        QUERY_CHECK_NULL(pNewArray, code, line, _return, terrno)
        QUERY_CHECK_NULL(taosArrayPush(pNewArray, orgTbInfo), code, line, _return, terrno)
        code = taosHashPut(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId), &pNewArray, POINTER_BYTES);
        QUERY_CHECK_CODE(code, line, _return);
        pNewArray = NULL;  // now referenced by the map
      } else {
        SArray* pOrgTbInfoArray = *(SArray**)pIter;
        QUERY_CHECK_NULL(pOrgTbInfoArray, code, line, _return, terrno)
        QUERY_CHECK_NULL(taosArrayPush(pOrgTbInfoArray, orgTbInfo), code, line, _return, terrno)
      }

      // Advance first, then drop the entry that was just copied.
      pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, pOrgTbInfo);

      code = taosHashRemove(pVtbScan->otbNameToOtbInfoMap, key, len);
      QUERY_CHECK_CODE(code, line, _return);
    }
  }
  return code;

_return:
  // Abandon an iteration that was interrupted part-way (no-op when NULL).
  taosHashCancelIterate(pVtbScan->otbNameToOtbInfoMap, pOrgTbInfo);
  // Frees only the element buffer; the copied SOrgTbInfo still lives in
  // otbNameToOtbInfoMap at this point.
  taosArrayDestroy(pNewArray);
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  return code;
}
2469

2470
/*
 * Collect the column references of all child tables of the virtual super
 * table being queried and run the query-type specific post-processing.
 *
 * Rows are pulled from the system-table scan operator (pDownstream[0] for
 * DYN_QTYPE_VTB_AGG, pDownstream[1] otherwise). Columns 0, 1 and 2 of each
 * block are read as child table name, super table name and db name. Rows
 * accepted by tableInfoNeedCollect() are converted with getColRefInfo() and
 * appended to the per-child-table array in pVtbScan->childTableList;
 * pVtbScan->childTableMap maps the child table name to its index in that
 * list.
 *
 * For DYN_QTYPE_VTB_SCAN:
 *   - when vtbScan.dynTbUid is non-zero, rows of other uids are dropped;
 *   - when the task has stream runtime info, the vgroup id of every column
 *     reference is added to pVtbScan->curOrgTbVg.
 *
 * Afterwards buildOrgTbInfoBatch(), buildOrgTbInfoSingle() or
 * processOrgTbVg() is called depending on the query type.
 *
 * Returns TSDB_CODE_SUCCESS or the first error code encountered. A column
 * reference that could not be handed over to childTableList is destroyed
 * before returning.
 */
int32_t buildVirtualSuperTableScanChildTableMap(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SOperatorInfo*             pSystableScanOp = NULL;
  SArray*                    pNewColRefArray = NULL;  // per-table array not yet in childTableList
  SColRefInfo                info = {0};
  bool                       infoPending = false;     // info holds resources nobody else references

  pVtbScan->childTableMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  QUERY_CHECK_NULL(pVtbScan->childTableMap, code, line, _return, terrno)

  if (pInfo->qType == DYN_QTYPE_VTB_AGG) {
    pVtbScan->otbNameToOtbInfoMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
    QUERY_CHECK_NULL(pVtbScan->otbNameToOtbInfoMap, code, line, _return, terrno)
    pSystableScanOp = pOperator->pDownstream[0];
  } else if (pInfo->qType == DYN_QTYPE_VTB_WINDOW) {
    pVtbScan->otbNameToOtbInfoMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
    QUERY_CHECK_NULL(pVtbScan->otbNameToOtbInfoMap, code, line, _return, terrno)
    pSystableScanOp = pOperator->pDownstream[1];
  } else {
    pSystableScanOp = pOperator->pDownstream[1];
  }

  while (true) {
    SSDataBlock *pChildInfo = NULL;
    code = pSystableScanOp->fpSet.getNextFn(pSystableScanOp, &pChildInfo);
    QUERY_CHECK_CODE(code, line, _return);
    if (pChildInfo == NULL) {
      break;
    }
    SColumnInfoData *pTableNameCol = taosArrayGet(pChildInfo->pDataBlock, 0);
    SColumnInfoData *pStbNameCol = taosArrayGet(pChildInfo->pDataBlock, 1);
    SColumnInfoData *pDbNameCol = taosArrayGet(pChildInfo->pDataBlock, 2);

    QUERY_CHECK_NULL(pTableNameCol, code, line, _return, terrno)
    QUERY_CHECK_NULL(pStbNameCol, code, line, _return, terrno)
    QUERY_CHECK_NULL(pDbNameCol, code, line, _return, terrno)

    for (int32_t i = 0; i < pChildInfo->info.rows; i++) {
      if (colDataIsNull_s(pStbNameCol, i)) {
        continue;
      }

      char* stbrawname = colDataGetData(pStbNameCol, i);
      char* dbrawname = colDataGetData(pDbNameCol, i);
      char* ctbName = colDataGetData(pTableNameCol, i);

      if (!tableInfoNeedCollect(dbrawname, stbrawname, pInfo->vtbScan.dbName, pInfo->vtbScan.tbName)) {
        continue;
      }

      info = (SColRefInfo){0};
      code = getColRefInfo(&info, pChildInfo->pDataBlock, i);
      QUERY_CHECK_CODE(code, line, _return);
      // From here until it is stored, this function must destroy info.
      infoPending = true;

      if (pInfo->qType == DYN_QTYPE_VTB_SCAN) {
        if (pInfo->vtbScan.dynTbUid != 0 && info.uid != pInfo->vtbScan.dynTbUid) {
          qTrace("dynQueryCtrl tb uid filter, info uid:%" PRIu64 ", dyn tb uid:%" PRIu64, info.uid,
                 pInfo->vtbScan.dynTbUid);
          destroyColRefInfo(&info);
          infoPending = false;
          continue;
        }

        if (pTaskInfo->pStreamRuntimeInfo) {
          if (pVtbScan->curOrgTbVg == NULL) {
            pVtbScan->curOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
            QUERY_CHECK_NULL(pVtbScan->curOrgTbVg, code, line, _return, terrno)
          }

          if (info.colrefName) {
            int32_t vgId;
            code = getVgIdFromColref(pOperator, info.colrefName, &vgId);
            QUERY_CHECK_CODE(code, line, _return);
            // curOrgTbVg is used as a set: key only, no value.
            code = taosHashPut(pVtbScan->curOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
            QUERY_CHECK_CODE(code, line, _return);
          }
        }
      }

      int32_t *pTableIdx = (int32_t*)taosHashGet(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName));
      if (pTableIdx == NULL) {
        // First row of this child table: start a new array and register it.
        pNewColRefArray = taosArrayInit(1, sizeof(SColRefInfo));
        QUERY_CHECK_NULL(pNewColRefArray, code, line, _return, terrno)
        QUERY_CHECK_NULL(taosArrayPush(pNewColRefArray, &info), code, line, _return, terrno)
        int32_t tableIdx = (int32_t)taosArrayGetSize(pVtbScan->childTableList);
        QUERY_CHECK_NULL(taosArrayPush(pVtbScan->childTableList, &pNewColRefArray), code, line, _return, terrno)
        // childTableList now references the array and, through it, info.
        pNewColRefArray = NULL;
        infoPending = false;
        code = taosHashPut(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName), &tableIdx, sizeof(tableIdx));
        QUERY_CHECK_CODE(code, line, _return);
      } else {
        SArray* pColRefArray = (SArray *)taosArrayGetP(pVtbScan->childTableList, *pTableIdx);
        QUERY_CHECK_NULL(pColRefArray, code, line, _return, terrno)
        QUERY_CHECK_NULL(taosArrayPush(pColRefArray, &info), code, line, _return, terrno)
        infoPending = false;
      }
    }
  }

  switch (pInfo->qType) {
    case DYN_QTYPE_VTB_WINDOW: {
      code = buildOrgTbInfoBatch(pOperator, false);
      break;
    }
    case DYN_QTYPE_VTB_AGG: {
      if (pVtbScan->batchProcessChild) {
        code = buildOrgTbInfoBatch(pOperator, pVtbScan->hasPartition);
      } else {
        code = buildOrgTbInfoSingle(pOperator, pVtbScan->hasPartition);
      }
      break;
    }
    case DYN_QTYPE_VTB_SCAN: {
      code = processOrgTbVg(pVtbScan, pTaskInfo, 1);
      break;
    }
    default: {
      code = TSDB_CODE_PLAN_INVALID_DYN_CTRL_TYPE;
      break;
    }
  }

  QUERY_CHECK_CODE(code, line, _return);

_return:
  // Both are no-ops on the success path.
  if (infoPending) {
    destroyColRefInfo(&info);
  }
  // Frees only the element buffer; a copied info is released via the local above.
  taosArrayDestroy(pNewColRefArray);
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
2595

2596
/*
 * Collect the column references of the single virtual normal/child table
 * being queried into pInfo->vtbScan.colRefInfo.
 *
 * Rows are pulled from the system-table scan operator (pDownstream[1]).
 * Columns 0, 2 and 8 of each block are read as table name, db name and
 * reference version. Rows accepted by tableInfoNeedCollect() are converted
 * with getColRefInfo() and appended to colRefInfo.
 *
 * When the reference version read from the rows differs from
 * pVtbScan->rversion, or pVtbScan->existOrgTbVg is NULL, the vgroup id of
 * every column reference is added to pVtbScan->curOrgTbVg. Finally
 * processOrgTbVg() is called with the last reference version seen.
 *
 * Returns TSDB_CODE_SUCCESS or the first error code encountered, including a
 * failure reported by the downstream scan operator.
 */
int32_t buildVirtualNormalChildTableScanChildTableMap(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[1];
  int32_t                    rversion = 0;
  SColRefInfo                info = {0};
  bool                       infoPending = false;  // info holds resources nobody else references

  // NOTE(review): an array already stored in colRefInfo is overwritten here
  // without being released — confirm callers clear it beforehand.
  pInfo->vtbScan.colRefInfo = taosArrayInit(1, sizeof(SColRefInfo));
  QUERY_CHECK_NULL(pInfo->vtbScan.colRefInfo, code, line, _return, terrno)

  while (true) {
    SSDataBlock *pTableInfo = NULL;
    code = pSystableScanOp->fpSet.getNextFn(pSystableScanOp, &pTableInfo);
    // Without this check a scan failure would be overwritten by the result of
    // processOrgTbVg() below.
    QUERY_CHECK_CODE(code, line, _return);
    if (pTableInfo == NULL) {
      break;
    }

    SColumnInfoData *pTableNameCol = taosArrayGet(pTableInfo->pDataBlock, 0);
    SColumnInfoData *pDbNameCol = taosArrayGet(pTableInfo->pDataBlock, 2);
    SColumnInfoData *pRefVerCol = taosArrayGet(pTableInfo->pDataBlock, 8);

    QUERY_CHECK_NULL(pTableNameCol, code, line, _return, terrno)
    QUERY_CHECK_NULL(pDbNameCol, code, line, _return, terrno)
    QUERY_CHECK_NULL(pRefVerCol, code, line, _return, terrno)

    for (int32_t i = 0; i < pTableInfo->info.rows; i++) {
      // rversion keeps its previous value when the cell is NULL.
      if (!colDataIsNull_s(pRefVerCol, i)) {
        GET_TYPED_DATA(rversion, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pRefVerCol, i), 0);
      }

      if (colDataIsNull_s(pTableNameCol, i)) {
        continue;
      }

      char* tbrawname = colDataGetData(pTableNameCol, i);
      char* dbrawname = colDataGetData(pDbNameCol, i);
      QUERY_CHECK_NULL(tbrawname, code, line, _return, terrno)
      QUERY_CHECK_NULL(dbrawname, code, line, _return, terrno)

      if (!tableInfoNeedCollect(dbrawname, tbrawname, pInfo->vtbScan.dbName, pInfo->vtbScan.tbName)) {
        continue;
      }

      info = (SColRefInfo){0};
      code = getColRefInfo(&info, pTableInfo->pDataBlock, i);
      QUERY_CHECK_CODE(code, line, _return);
      // From here until it is stored, this function must destroy info.
      infoPending = true;

      if ((rversion != pVtbScan->rversion || pVtbScan->existOrgTbVg == NULL) && info.colrefName) {
        if (pVtbScan->curOrgTbVg == NULL) {
          pVtbScan->curOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
          QUERY_CHECK_NULL(pVtbScan->curOrgTbVg, code, line, _return, terrno)
        }
        int32_t vgId;
        code = getVgIdFromColref(pOperator, info.colrefName, &vgId);
        QUERY_CHECK_CODE(code, line, _return);
        // curOrgTbVg is used as a set: key only, no value.
        code = taosHashPut(pVtbScan->curOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
        QUERY_CHECK_CODE(code, line, _return);
      }

      QUERY_CHECK_NULL(taosArrayPush(pInfo->vtbScan.colRefInfo, &info), code, line, _return, terrno)
      infoPending = false;
    }
  }
  code = processOrgTbVg(pVtbScan, pTaskInfo, rversion);
  QUERY_CHECK_CODE(code, line, _return);

_return:
  // No-op on the success path.
  if (infoPending) {
    destroyColRefInfo(&info);
  }
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
2666

2667
/*
 * Consume the list of newly added vgroups published in
 * pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo.
 *
 * The list pointer is loaded atomically and then swapped to NULL with a
 * compare-exchange. Only when that swap succeeds does this call own the list:
 * every SStreamTaskAddr in it is recorded in pVtbScan->existOrgTbVg (key only)
 * and pVtbScan->newAddedVgInfo (created on demand), needRedeploy is cleared,
 * and the list is destroyed.
 *
 * When no list is published, or the swap does not succeed,
 * TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY is returned and the list is left
 * untouched, since this call never took ownership of it.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY, or the
 * first error code reported while recording the entries.
 */
int32_t virtualTableScanCheckNeedRedeploy(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SArray*                    pOwned = NULL;  // the list, once this call has taken it over

  SArray *pSeen = atomic_load_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo);
  if (pSeen && pSeen == atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo, pSeen, NULL)) {
    pOwned = pSeen;
  }

  if (pOwned == NULL) {
    code = TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY;
    QUERY_CHECK_CODE(code, line, _return);
  }

  for (int32_t i = 0; i < taosArrayGetSize(pOwned); i++) {
    SStreamTaskAddr* pTaskAddr = (SStreamTaskAddr*)taosArrayGet(pOwned, i);
    code = taosHashPut(pVtbScan->existOrgTbVg, &pTaskAddr->nodeId, sizeof(pTaskAddr->nodeId), NULL, 0);
    QUERY_CHECK_CODE(code, line, _return);
    if (pVtbScan->newAddedVgInfo == NULL) {
      pVtbScan->newAddedVgInfo = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
      QUERY_CHECK_NULL(pVtbScan->newAddedVgInfo, code, line, _return, terrno)
    }
    code = taosHashPut(pVtbScan->newAddedVgInfo, &pTaskAddr->nodeId, sizeof(pTaskAddr->nodeId), pTaskAddr, sizeof(SStreamTaskAddr));
    QUERY_CHECK_CODE(code, line, _return);
  }
  pVtbScan->needRedeploy = false;

_return:
  // NULL unless the compare-exchange above handed the list to this call.
  taosArrayDestroy(pOwned);
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
2702

2703
// Builds the operator parameter that is handed to the downstream virtual-table scan for one table.
//
// Steps, in order:
//   1. buildVtbScanOperatorParam() creates pVtbScan->vtbScanParam for `uid`.
//   2. For every entry of otbNameToOtbInfoMap an exchange parameter is built and appended to the
//      scan parameter's pOpParamArray. If the entry's vgId is present in newAddedVgInfo, a
//      downstream source node describing that address is passed along and the vgId is removed
//      from newAddedVgInfo; otherwise no source node is passed.
//   3. An exchange parameter for the tag scan (vgId/uid) is built and stored in pTagScanOp.
//
// Returns TSDB_CODE_SUCCESS or the first error code encountered (logged with the failing line).
int32_t virtualTableScanBuildDownStreamOpParam(SOperatorInfo* pOperator, tb_uid_t uid, int32_t vgId) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  void*                      pIter = NULL;
  SOperatorParam*            pTagExchangeParam = NULL;

  pVtbScan->vtbScanParam = NULL;
  code = buildVtbScanOperatorParam(pInfo, &pVtbScan->vtbScanParam, uid);
  QUERY_CHECK_CODE(code, line, _return);

  pIter = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, NULL);
  while (pIter != NULL) {
    SOrgTbInfo*      pMap = (SOrgTbInfo*)pIter;
    SOperatorParam*  pExchangeParam = NULL;
    SStreamTaskAddr* addr = taosHashGet(pVtbScan->newAddedVgInfo, &pMap->vgId, sizeof(pMap->vgId));

    if (addr != NULL) {
      SDownstreamSourceNode newSource = {0};
      newSource.type = QUERY_NODE_DOWNSTREAM_SOURCE;
      newSource.clientId = pTaskInfo->id.taskId;  // current task's taskid
      newSource.taskId = addr->taskId;
      newSource.fetchMsgType = TDMT_STREAM_FETCH;
      newSource.localExec = false;
      newSource.addr.nodeId = addr->nodeId;
      memcpy(&newSource.addr.epSet, &addr->epset, sizeof(SEpSet));

      code = buildExchangeOperatorParamForVScan(&pExchangeParam, 0, pMap, &newSource);
      QUERY_CHECK_CODE(code, line, _return);
      code = taosHashRemove(pVtbScan->newAddedVgInfo, &pMap->vgId, sizeof(pMap->vgId));
      QUERY_CHECK_CODE(code, line, _return);
    } else {
      code = buildExchangeOperatorParamForVScan(&pExchangeParam, 0, pMap, NULL);
      QUERY_CHECK_CODE(code, line, _return);
    }

    // NOTE(review): if one of the checks above/below fails after pExchangeParam was built, the
    // parameter is not released here - confirm whether it needs an explicit free.
    QUERY_CHECK_NULL(taosArrayPush(((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pOpParamArray, &pExchangeParam), code, line, _return, terrno)
    pIter = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, pIter);
  }

  code = buildExchangeOperatorParamForVTagScan(&pTagExchangeParam, 0, vgId, uid);
  QUERY_CHECK_CODE(code, line, _return);
  ((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pTagScanOp = pTagExchangeParam;

_return:
  if (pIter != NULL) {
    // pIter is still set only when the loop above was abandoned on an error; an iteration that
    // does not run to completion has to be cancelled explicitly.
    taosHashCancelIterate(pVtbScan->otbNameToOtbInfoMap, pIter);
  }
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
2752

2753
// Fetches the next result block from the downstream virtual-table scan, moving on to the next
// table whenever the current one yields nothing.
//
// A scratch otbNameToOtbInfoMap is created on entry and torn down (and reset to NULL) at _return.
// While curTableIdx equals lastTableIdx the downstream is simply asked for more data; otherwise
// the column-reference info of the current table is processed, fresh downstream parameters are
// built, the downstream is reset to OP_NOT_OPENED and queried with those parameters.
//
// *pRes receives the block (or stays empty when the operator has been marked completed).
// Returns TSDB_CODE_SUCCESS or an error code (logged with the failing line).
int32_t virtualTableScanGetNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SVtbScanDynCtrlInfo*       pScan = (SVtbScanDynCtrlInfo*)&pCtrl->vtbScan;
  SOperatorInfo*             pDownstream = pOperator->pDownstream[0];

  pScan->otbNameToOtbInfoMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  QUERY_CHECK_NULL(pScan->otbNameToOtbInfoMap, code, line, _return, terrno)
  taosHashSetFreeFp(pScan->otbNameToOtbInfoMap, destroySOrgTbInfo);

  for (;;) {
    if (pScan->curTableIdx != pScan->lastTableIdx) {
      // A new table: rebuild the per-table map and the downstream parameters from scratch.
      taosHashClear(pScan->otbNameToOtbInfoMap);

      SArray* pRefs = pScan->isSuperTable ? (SArray*)taosArrayGetP(pScan->childTableList, pScan->curTableIdx)
                                          : pCtrl->vtbScan.colRefInfo;
      QUERY_CHECK_NULL(pRefs, code, line, _return, terrno)

      tb_uid_t tbUid = 0;
      int32_t  tbVgId = 0;
      code = virtualTableScanProcessColRefInfo(pOperator, pRefs, &tbUid, &tbVgId);
      QUERY_CHECK_CODE(code, line, _return);

      code = virtualTableScanBuildDownStreamOpParam(pOperator, tbUid, tbVgId);
      QUERY_CHECK_CODE(code, line, _return);

      // reset downstream operator's status
      pDownstream->status = OP_NOT_OPENED;
      code = pDownstream->fpSet.getNextExtFn(pDownstream, pScan->vtbScanParam, pRes);
      QUERY_CHECK_CODE(code, line, _return);
    } else {
      // Same table as the previous call: keep draining it.
      code = pDownstream->fpSet.getNextFn(pDownstream, pRes);
      QUERY_CHECK_CODE(code, line, _return);
    }

    if (*pRes != NULL) {
      // has result, still read data from this table.
      pScan->lastTableIdx = pScan->curTableIdx;
      break;
    }

    // no result, read next table.
    pScan->curTableIdx++;
    if (!pScan->isSuperTable || pScan->curTableIdx >= taosArrayGetSize(pScan->childTableList)) {
      setOperatorCompleted(pOperator);
      break;
    }
  }

_return:
  taosHashCleanup(pScan->otbNameToOtbInfoMap);
  pScan->otbNameToOtbInfoMap = NULL;
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
2819

2820
// Open callback of the virtual-table scan controller.
//
// Does nothing when the operator is already opened. Otherwise builds the child-table map (the
// super-table or the normal/child-table variant, chosen by isSuperTable) and marks the operator
// opened. The open cost is recorded while cost.openCost is still 0. On failure the error is
// logged, stored in the task info and propagated with T_LONG_JMP.
int32_t vtbScanOpen(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  int64_t                    startUs = 0;
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SVtbScanDynCtrlInfo*       pScan = (SVtbScanDynCtrlInfo*)&pCtrl->vtbScan;

  if (OPTR_IS_OPENED(pOperator)) {
    return code;
  }

  if (pOperator->cost.openCost == 0) {
    startUs = taosGetTimestampUs();
  }

  code = pScan->isSuperTable ? buildVirtualSuperTableScanChildTableMap(pOperator)
                             : buildVirtualNormalChildTableScanChildTableMap(pOperator);
  QUERY_CHECK_CODE(code, line, _return);

  OPTR_SET_OPENED(pOperator);

_return:
  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - startUs) / 1000.0;
  }
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }
  return code;
}
2856

2857
// getNext callback of the virtual-table scan controller.
//
// Without a pending get-param: returns immediately when the operator is already done, otherwise
// resets the scan window to (INT64_MAX, INT64_MIN). With a pending get-param: re-arms a finished
// operator, restarts the table cursor, takes the window from the parameter and consumes it.
// Afterwards a pending redeploy is checked, the operator is opened, and the next block is fetched
// (a super table with no child tables completes right away).
//
// On failure the error is logged, stored in the task info and propagated with T_LONG_JMP.
int32_t vtbScanNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SVtbScanDynCtrlInfo*       pScan = (SVtbScanDynCtrlInfo*)&pCtrl->vtbScan;

  QRY_PARAM_CHECK(pRes);

  if (pOperator->pOperatorGetParam == NULL) {
    if (pOperator->status == OP_EXEC_DONE) {
      return code;
    }
    pScan->window.skey = INT64_MAX;
    pScan->window.ekey = INT64_MIN;
  } else {
    if (pOperator->status == OP_EXEC_DONE) {
      pOperator->status = OP_OPENED;
    }
    pScan->curTableIdx = 0;
    pScan->lastTableIdx = -1;
    pScan->window = ((SDynQueryCtrlOperatorParam *)(pOperator->pOperatorGetParam)->value)->window;
    pOperator->pOperatorGetParam = NULL;
  }

  if (pScan->needRedeploy) {
    code = virtualTableScanCheckNeedRedeploy(pOperator);
    QUERY_CHECK_CODE(code, line, _return);
  }

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, line, _return);

  if (pScan->isSuperTable && taosArrayGetSize(pScan->childTableList) == 0) {
    setOperatorCompleted(pOperator);
    return code;
  }

  code = virtualTableScanGetNext(pOperator, pRes);
  QUERY_CHECK_CODE(code, line, _return);

_return:
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }
  return code;
}
2906

2907
// Creates the simple hash tables used by the sequential super-table join context.
//
// batchFetch == true : leftHash and rightHash (INT hash function).
// batchFetch == false: leftCache, rightCache and onceTable (BIGINT hash function).
//
// Returns TSDB_CODE_SUCCESS, or terrno as soon as one tSimpleHashInit() call yields NULL.
int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) {
  if (!batchFetch) {
    pPrev->leftCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pPrev->leftCache == NULL) {
      return terrno;
    }

    pPrev->rightCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pPrev->rightCache == NULL) {
      return terrno;
    }

    pPrev->onceTable = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pPrev->onceTable == NULL) {
      return terrno;
    }

    return TSDB_CODE_SUCCESS;
  }

  pPrev->leftHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  if (pPrev->leftHash == NULL) {
    return terrno;
  }

  pPrev->rightHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  if (pPrev->rightHash == NULL) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
2934

2935
// Synchronises pVtbScan->dynTbUid with the stream partition column values.
//
// Walks funcInfo.pStreamPartColVals and, at the first entry that is flagged isTbname and whose
// uid differs from the current dynTbUid, traces the change, stores the new uid and stops.
// A NULL pStreamRuntimeInfo makes this a no-op.
static void updateDynTbUidIfNeeded(SVtbScanDynCtrlInfo* pVtbScan, SStreamRuntimeInfo* pStreamRuntimeInfo) {
  if (pStreamRuntimeInfo != NULL) {
    SArray* pPartVals = pStreamRuntimeInfo->funcInfo.pStreamPartColVals;
    int32_t idx = 0;

    while (idx < taosArrayGetSize(pPartVals)) {
      SStreamGroupValue* pVal = taosArrayGet(pPartVals, idx);
      ++idx;

      if (pVal == NULL || !pVal->isTbname || pVal->uid == pVtbScan->dynTbUid) {
        continue;
      }

      qTrace("dynQueryCtrl dyn tb uid:%" PRIu64 " reset to:%" PRIu64, pVtbScan->dynTbUid, pVal->uid);

      pVtbScan->dynTbUid = pVal->uid;
      break;
    }
  }
}
2951

2952
static int32_t initVtbScanInfo(SDynQueryCtrlOperatorInfo* pInfo, SMsgCb* pMsgCb,
1,517,744✔
2953
                               SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) {
2954
  int32_t      code = TSDB_CODE_SUCCESS;
1,517,744✔
2955
  int32_t      line = 0;
1,517,744✔
2956

2957
  code = tsem_init(&pInfo->vtbScan.ready, 0, 0);
1,517,744✔
2958
  QUERY_CHECK_CODE(code, line, _return);
1,517,744✔
2959

2960
  pInfo->vtbScan.genNewParam = true;
1,517,744✔
2961
  pInfo->vtbScan.batchProcessChild = pPhyciNode->vtbScan.batchProcessChild;
1,517,744✔
2962
  pInfo->vtbScan.hasPartition = pPhyciNode->vtbScan.hasPartition;
1,517,744✔
2963
  pInfo->vtbScan.scanAllCols = pPhyciNode->vtbScan.scanAllCols;
1,517,744✔
2964
  pInfo->vtbScan.isSuperTable = pPhyciNode->vtbScan.isSuperTable;
1,517,744✔
2965
  pInfo->vtbScan.rversion = pPhyciNode->vtbScan.rversion;
1,517,744✔
2966
  pInfo->vtbScan.uid = pPhyciNode->vtbScan.uid;
1,517,744✔
2967
  pInfo->vtbScan.suid = pPhyciNode->vtbScan.suid;
1,517,744✔
2968
  pInfo->vtbScan.epSet = pPhyciNode->vtbScan.mgmtEpSet;
1,517,744✔
2969
  pInfo->vtbScan.acctId = pPhyciNode->vtbScan.accountId;
1,517,744✔
2970
  pInfo->vtbScan.needRedeploy = false;
1,517,744✔
2971
  pInfo->vtbScan.pMsgCb = pMsgCb;
1,517,744✔
2972
  pInfo->vtbScan.curTableIdx = 0;
1,517,744✔
2973
  pInfo->vtbScan.lastTableIdx = -1;
1,517,744✔
2974
  pInfo->vtbScan.dynTbUid = 0;
1,517,744✔
2975
  pInfo->vtbScan.dbName = taosStrdup(pPhyciNode->vtbScan.dbName);
1,517,744✔
2976
  pInfo->vtbScan.tbName = taosStrdup(pPhyciNode->vtbScan.tbName);
1,517,744✔
2977
  QUERY_CHECK_NULL(pInfo->vtbScan.dbName, code, line, _return, terrno)
1,517,744✔
2978
  QUERY_CHECK_NULL(pInfo->vtbScan.tbName, code, line, _return, terrno)
1,517,744✔
2979
  pInfo->vtbScan.existOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
1,517,744✔
2980
  QUERY_CHECK_NULL(pInfo->vtbScan.existOrgTbVg, code, line, _return, terrno)
1,517,744✔
2981
  for (int32_t i = 0; i < LIST_LENGTH(pPhyciNode->vtbScan.pOrgVgIds); ++i) {
3,950,447✔
2982
    SValueNode* valueNode = (SValueNode*)nodesListGetNode(pPhyciNode->vtbScan.pOrgVgIds, i);
2,432,703✔
2983
    int32_t vgId = (int32_t)valueNode->datum.i;
2,432,703✔
2984
    code = taosHashPut(pInfo->vtbScan.existOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
2,432,703✔
2985
    QUERY_CHECK_CODE(code, line, _return);
2,432,703✔
2986
  }
2987

2988
  if (pPhyciNode->dynTbname && pTaskInfo) {
1,517,744✔
2989
    updateDynTbUidIfNeeded(&pInfo->vtbScan, pTaskInfo->pStreamRuntimeInfo);
×
2990
  }
2991

2992
  pInfo->vtbScan.readColList = taosArrayInit(LIST_LENGTH(pPhyciNode->vtbScan.pScanCols), sizeof(col_id_t));
1,517,744✔
2993
  QUERY_CHECK_NULL(pInfo->vtbScan.readColList, code, line, _return, terrno)
1,517,744✔
2994

2995
  for (int32_t i = 0; i < LIST_LENGTH(pPhyciNode->vtbScan.pScanCols); ++i) {
12,001,267✔
2996
    SColumnNode* pNode = (SColumnNode*)nodesListGetNode(pPhyciNode->vtbScan.pScanCols, i);
10,483,523✔
2997
    QUERY_CHECK_NULL(pNode, code, line, _return, terrno)
10,483,523✔
2998
    QUERY_CHECK_NULL(taosArrayPush(pInfo->vtbScan.readColList, &pNode->colId), code, line, _return, terrno)
20,967,046✔
2999
  }
3000

3001
  pInfo->vtbScan.childTableList = taosArrayInit(10, POINTER_BYTES);
1,517,744✔
3002
  QUERY_CHECK_NULL(pInfo->vtbScan.childTableList, code, line, _return, terrno)
1,517,744✔
3003

3004
  pInfo->vtbScan.dbVgInfoMap = taosHashInit(taosArrayGetSize(pInfo->vtbScan.childTableList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
1,517,744✔
3005
  QUERY_CHECK_NULL(pInfo->vtbScan.dbVgInfoMap, code, line, _return, terrno)
1,517,744✔
3006

3007
  pInfo->vtbScan.otbNameToOtbInfoMap = NULL;
1,517,744✔
3008
  pInfo->vtbScan.otbVgIdToOtbInfoArrayMap = NULL;
1,517,744✔
3009
  pInfo->vtbScan.vtbUidToVgIdMapMap = NULL;
1,517,744✔
3010
  pInfo->vtbScan.vtbGroupIdToVgIdMapMap = NULL;
1,517,744✔
3011
  pInfo->vtbScan.vtbUidTagListMap = NULL;
1,517,744✔
3012
  pInfo->vtbScan.vtbGroupIdTagListMap = NULL;
1,517,744✔
3013
  pInfo->vtbScan.vtbUidToGroupIdMap = NULL;
1,517,744✔
3014

3015
  return code;
1,517,744✔
3016
_return:
×
3017
  // no need to destroy array and hashmap allocated in this function,
3018
  // since the operator's destroy function will take care of it
3019
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
3020
  return code;
×
3021
}
3022

3023
static int32_t initVtbWindowInfo(SDynQueryCtrlOperatorInfo* pInfo, SDynQueryCtrlPhysiNode* pPhyciNode,
374,740✔
3024
                                 SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) {
3025
  int32_t              code = TSDB_CODE_SUCCESS;
374,740✔
3026
  int32_t              line = 0;
374,740✔
3027
  SDataBlockDescNode*  pDescNode = pPhyciNode->node.pOutputDataBlockDesc;
374,740✔
3028

3029
  pInfo->vtbWindow.wstartSlotId = pPhyciNode->vtbWindow.wstartSlotId;
374,740✔
3030
  pInfo->vtbWindow.wendSlotId = pPhyciNode->vtbWindow.wendSlotId;
374,740✔
3031
  pInfo->vtbWindow.wdurationSlotId = pPhyciNode->vtbWindow.wdurationSlotId;
374,740✔
3032
  pInfo->vtbWindow.pTargets = pPhyciNode->vtbWindow.pTargets;
374,740✔
3033
  pInfo->vtbWindow.isVstb = pPhyciNode->vtbWindow.isVstb;
374,740✔
3034
  pInfo->vtbWindow.extendOption = pPhyciNode->vtbWindow.extendOption;
374,740✔
3035

3036
  pInfo->vtbWindow.pRes = createDataBlockFromDescNode(pDescNode);
374,740✔
3037
  QUERY_CHECK_NULL(pInfo->vtbWindow.pRes, code, line, _return, terrno)
374,740✔
3038

3039
  pInfo->vtbWindow.pWins = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
374,740✔
3040
  QUERY_CHECK_NULL(pInfo->vtbWindow.pWins, code, line, _return, terrno)
374,740✔
3041

3042
  pInfo->vtbWindow.outputWstartSlotId = -1;
374,740✔
3043
  pInfo->vtbWindow.outputWendSlotId = -1;
374,740✔
3044
  pInfo->vtbWindow.outputWdurationSlotId = -1;
374,740✔
3045
  pInfo->vtbWindow.curWinBatchIdx = 0;
374,740✔
3046

3047
  initResultSizeInfo(&pOperator->resultInfo, 1);
374,740✔
3048
  code = blockDataEnsureCapacity(pInfo->vtbWindow.pRes, pOperator->resultInfo.capacity);
374,740✔
3049
  QUERY_CHECK_CODE(code, line, _return);
374,740✔
3050

3051
  return code;
374,740✔
3052
_return:
×
3053
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
3054
  return code;
×
3055
}
3056

3057
// Exposes the raw timestamp buffer of column `slotId` of pBlock through *ppTsCols.
//
// Nothing is done (and *ppTsCols is left untouched) when the block has no column array or its
// data is not loaded. Otherwise *ppTsCols is pointed at the column's pData. If the first
// timestamp is non-zero while the block window is still (0, 0), the block's ts window is
// refreshed via blockDataUpdateTsWindow().
//
// The first timestamp is only inspected when the buffer is non-NULL and the block has at least
// one row, so an empty block cannot cause a NULL or out-of-range read.
//
// Returns TSDB_CODE_SUCCESS or an error code (logged with the failing line).
static int32_t extractTsCol(SSDataBlock* pBlock, int32_t slotId, TSKEY** ppTsCols) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (pBlock->pDataBlock == NULL || !pBlock->info.dataLoad) {
    return code;
  }

  SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, slotId);
  QUERY_CHECK_NULL(pColDataInfo, code, lino, _return, terrno)

  *ppTsCols = (int64_t*)pColDataInfo->pData;

  if (*ppTsCols == NULL || pBlock->info.rows <= 0) {
    // No timestamp to look at; leave the block window alone.
    return code;
  }

  if ((*ppTsCols)[0] != 0 && (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0)) {
    code = blockDataUpdateTsWindow(pBlock, slotId);
    QUERY_CHECK_CODE(code, lino, _return);
  }

  return code;
_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  return code;
}
3078

3079
// Open callback of the virtual-table window controller.
//
// Drains downstream 0 and turns every returned block into one batch of SExtWinTimeWindow
// entries (skey = window start column, ekey = window end column + 1, winOutIdx = -1); the
// batches are appended to vtbWindow.pWins. On the first block the positions of the window
// start/end/duration columns inside pTargets are resolved. After draining, the outermost window
// bounds are widened according to extendOption, the child-table map is built for virtual super
// tables, and the operator is marked opened.
//
// Error handling: a batch that has been allocated but not yet handed over to pWins is released
// on failure. When the downstream produced no block at all there are no boundary windows to
// adjust, so that step is skipped instead of indexing into an empty list.
//
// On failure the error is logged, stored in the task info and propagated with T_LONG_JMP.
int32_t vtbWindowOpen(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    lino = 0;
  SDynQueryCtrlOperatorInfo* pDynInfo = pOperator->info;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SVtbWindowDynCtrlInfo*     pInfo = &pDynInfo->vtbWindow;
  int64_t                    st = 0;
  SArray*                    pWin = NULL;  // batch being built; owned here until pushed into pWins

  if (OPTR_IS_OPENED(pOperator)) {
    return code;
  }

  if (pOperator->cost.openCost == 0) {
    st = taosGetTimestampUs();
  }

  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      break;
    }

    if (pInfo->outputWendSlotId == -1 && pInfo->outputWstartSlotId == -1 && pInfo->outputWdurationSlotId == -1) {
      // Map the window pseudo columns of this block to their index in the target list.
      for (int32_t i = 0; i < LIST_LENGTH(pInfo->pTargets); ++i) {
        STargetNode* pNode = (STargetNode*)nodesListGetNode(pInfo->pTargets, i);
        if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode->pExpr)->dataBlockId == pBlock->info.id.blockId) {
          if (((SColumnNode*)pNode->pExpr)->slotId == pDynInfo->vtbWindow.wstartSlotId) {
            pInfo->outputWstartSlotId = i;
          } else if (((SColumnNode*)pNode->pExpr)->slotId == pDynInfo->vtbWindow.wendSlotId) {
            pInfo->outputWendSlotId = i;
          } else if (((SColumnNode*)pNode->pExpr)->slotId == pDynInfo->vtbWindow.wdurationSlotId) {
            pInfo->outputWdurationSlotId = i;
          }
        }
      }
    }

    TSKEY* wstartCol = NULL;
    TSKEY* wendCol = NULL;

    // NOTE(review): extractTsCol() leaves the output pointer untouched when the block has no
    // loaded data; the loop below assumes both pointers are valid - confirm with the producer.
    code = extractTsCol(pBlock, pDynInfo->vtbWindow.wstartSlotId, &wstartCol);
    QUERY_CHECK_CODE(code, lino, _return);
    code = extractTsCol(pBlock, pDynInfo->vtbWindow.wendSlotId, &wendCol);
    QUERY_CHECK_CODE(code, lino, _return);

    pWin = taosArrayInit(pBlock->info.rows, sizeof(SExtWinTimeWindow));
    QUERY_CHECK_NULL(pWin, code, lino, _return, terrno)

    QUERY_CHECK_NULL(taosArrayReserve(pWin, pBlock->info.rows), code, lino, _return, terrno);

    for (int32_t i = 0; i < pBlock->info.rows; i++) {
      SExtWinTimeWindow* pWindow = taosArrayGet(pWin, i);
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
      pWindow->tw.skey = wstartCol[i];
      pWindow->tw.ekey = wendCol[i] + 1;
      pWindow->winOutIdx = -1;
    }

    QUERY_CHECK_NULL(taosArrayPush(pDynInfo->vtbWindow.pWins, &pWin), code, lino, _return, terrno);
    pWin = NULL;  // ownership moved to pWins
  }

  if (taosArrayGetSize(pDynInfo->vtbWindow.pWins) > 0) {
    // handle first window's start key and last window's end key
    SArray* firstBatch = (SArray*)taosArrayGetP(pDynInfo->vtbWindow.pWins, 0);
    SArray* lastBatch = (SArray*)taosArrayGetP(pDynInfo->vtbWindow.pWins, taosArrayGetSize(pDynInfo->vtbWindow.pWins) - 1);

    QUERY_CHECK_NULL(firstBatch, code, lino, _return, terrno)
    QUERY_CHECK_NULL(lastBatch, code, lino, _return, terrno)

    SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(firstBatch, 0);
    SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(lastBatch);

    QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
    QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)

    if (pInfo->extendOption == STATE_WIN_EXTEND_OPTION_BACKWARD) {
      lastWin->tw.ekey = INT64_MAX;
    }
    if (pInfo->extendOption == STATE_WIN_EXTEND_OPTION_FORWARD) {
      firstWin->tw.skey = INT64_MIN;
    }
  }

  if (pInfo->isVstb) {
    code = buildVirtualSuperTableScanChildTableMap(pOperator);
    QUERY_CHECK_CODE(code, lino, _return);
  }

  OPTR_SET_OPENED(pOperator);

  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
  }

_return:
  if (code != TSDB_CODE_SUCCESS) {
    // Release a batch that never made it into pWins before jumping out.
    taosArrayDestroy(pWin);
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
3179

3180
// Builds the operator parameter passed to the external-window operator for one batch of windows.
//
// *ppRes is allocated here and, on success, carries:
//   - value     : an SExternalWindowOperatorParam holding a copy of pWins,
//   - pChildren : one batch exchange parameter covering [first window skey, last window ekey],
//   - opType    : QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW, downstreamIdx = idx, reUse = false.
//
// pWins must contain at least one window; an empty list is reported as an error.
// On failure everything allocated so far that is reachable from *ppRes / pExtWinOp is released
// and *ppRes is reset to NULL.
static int32_t buildExternalWindowOperatorParamEx(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins, int32_t idx) {
  int32_t                       code = TSDB_CODE_SUCCESS;
  int32_t                       lino = 0;
  SExternalWindowOperatorParam* pExtWinOp = NULL;
  SExtWinTimeWindow*            firstWin = NULL;
  SExtWinTimeWindow*            lastWin = NULL;
  SOperatorParam*               pExchangeParam = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
  // The cleanup path inspects pChildren, so it must not hold whatever the allocator returned.
  (*ppRes)->pChildren = NULL;
  (*ppRes)->value = NULL;

  pExtWinOp = taosMemoryMalloc(sizeof(SExternalWindowOperatorParam));
  QUERY_CHECK_NULL(pExtWinOp, code, lino, _return, terrno)

  pExtWinOp->ExtWins = taosArrayDup(pWins, NULL);
  QUERY_CHECK_NULL(pExtWinOp->ExtWins, code, lino, _return, terrno)

  firstWin = (SExtWinTimeWindow *)taosArrayGet(pWins, 0);
  QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
  lastWin = (SExtWinTimeWindow *)taosArrayGetLast(pWins);
  QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)

  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)

  code = buildBatchExchangeOperatorParamForVirtual(&pExchangeParam, 0, NULL, 0, pInfo->vtbScan.otbVgIdToOtbInfoArrayMap, (STimeWindow){.skey = firstWin->tw.skey, .ekey = lastWin->tw.ekey}, EX_SRC_TYPE_VSTB_WIN_SCAN);
  QUERY_CHECK_CODE(code, lino, _return);

  // NOTE(review): if this push fails, pExchangeParam is not in pChildren and is not released by
  // the cleanup below - confirm which project helper should free it.
  QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pExchangeParam), code, lino, _return, terrno)

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW;
  (*ppRes)->downstreamIdx = idx;
  (*ppRes)->value = pExtWinOp;
  (*ppRes)->reUse = false;

  return code;
_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pExtWinOp) {
    if (pExtWinOp->ExtWins) {
      taosArrayDestroy(pExtWinOp->ExtWins);
    }
    taosMemoryFree(pExtWinOp);
  }
  if (*ppRes) {
    if ((*ppRes)->pChildren) {
      for (int32_t i = 0; i < taosArrayGetSize((*ppRes)->pChildren); i++) {
        // pChildren stores SOperatorParam pointers, so fetch the stored pointer itself rather
        // than the address of the array slot.
        SOperatorParam* pChildParam = (SOperatorParam*)taosArrayGetP((*ppRes)->pChildren, i);
        if (pChildParam) {
          SDynQueryCtrlOperatorParam* pDynParam = (SDynQueryCtrlOperatorParam*)pChildParam->value;
          if (pDynParam) {
            taosMemoryFree(pDynParam);
          }
          taosMemoryFree(pChildParam);
        }
      }
      taosArrayDestroy((*ppRes)->pChildren);
    }
    taosMemoryFree(*ppRes);
    *ppRes = NULL;
  }
  return code;
}
3239

3240
int32_t vtbWindowNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
397,560✔
3241
  int32_t                    code = TSDB_CODE_SUCCESS;
397,560✔
3242
  int32_t                    lino = 0;
397,560✔
3243
  SDynQueryCtrlOperatorInfo* pDynInfo = pOperator->info;
397,560✔
3244
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
397,560✔
3245
  int64_t                    st = taosGetTimestampUs();
397,560✔
3246
  int32_t                    numOfWins = 0;
397,560✔
3247
  SOperatorInfo*             mergeOp = NULL;
397,560✔
3248
  SOperatorInfo*             extWinOp = NULL;
397,560✔
3249
  SOperatorParam*            pMergeParam = NULL;
397,560✔
3250
  SOperatorParam*            pExtWinParam = NULL;
397,560✔
3251
  SVtbWindowDynCtrlInfo*     pInfo = &pDynInfo->vtbWindow;
397,560✔
3252
  SSDataBlock*               pRes = pInfo->pRes;
397,560✔
3253

3254
  code = pOperator->fpSet._openFn(pOperator);
397,560✔
3255
  QUERY_CHECK_CODE(code, lino, _return);
397,560✔
3256

3257
  if (pInfo->curWinBatchIdx >= taosArrayGetSize(pInfo->pWins)) {
397,560✔
3258
    *ppRes = NULL;
9,140✔
3259
    return code;
9,140✔
3260
  }
3261

3262
  SArray* pWinArray = (SArray*)taosArrayGetP(pInfo->pWins, pInfo->curWinBatchIdx);
388,420✔
3263
  QUERY_CHECK_NULL(pWinArray, code, lino, _return, terrno)
388,420✔
3264

3265
  numOfWins = (int32_t)taosArrayGetSize(pWinArray);
388,420✔
3266

3267
  if (pInfo->isVstb) {
388,420✔
3268
    extWinOp = pOperator->pDownstream[2];
99,880✔
3269
    code = buildExternalWindowOperatorParamEx(pDynInfo, &pExtWinParam, pWinArray, extWinOp->numOfDownstream);
99,880✔
3270
    QUERY_CHECK_CODE(code, lino, _return);
99,880✔
3271

3272
    SSDataBlock* pExtWinBlock = NULL;
99,880✔
3273
    code = extWinOp->fpSet.getNextExtFn(extWinOp, pExtWinParam, &pExtWinBlock);
99,880✔
3274
    QUERY_CHECK_CODE(code, lino, _return);
99,880✔
3275
    setOperatorCompleted(extWinOp);
99,880✔
3276

3277
    blockDataCleanup(pRes);
99,880✔
3278
    code = blockDataEnsureCapacity(pRes, numOfWins);
99,880✔
3279
    QUERY_CHECK_CODE(code, lino, _return);
99,880✔
3280

3281
    if (pExtWinBlock) {
99,880✔
3282
      code = copyColumnsValue(pInfo->pTargets, pExtWinBlock->info.id.blockId, pRes, pExtWinBlock, numOfWins);
99,880✔
3283
      QUERY_CHECK_CODE(code, lino, _return);
99,880✔
3284

3285
      if (pInfo->curWinBatchIdx == 0) {
99,880✔
3286
        // first batch, get _wstart from pMergedBlock
3287
        SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(taosArrayGetP(pInfo->pWins, 0), 0);
93,070✔
3288
        QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
93,070✔
3289

3290
        firstWin->tw.skey = pExtWinBlock->info.window.skey;
93,070✔
3291
      }
3292
      if (pInfo->curWinBatchIdx == taosArrayGetSize(pInfo->pWins) - 1) {
99,880✔
3293
        // last batch, get _wend from pMergedBlock
3294
        SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(taosArrayGetP(pInfo->pWins, taosArrayGetSize(pInfo->pWins) - 1));
2,270✔
3295
        QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)
2,270✔
3296

3297
        lastWin->tw.ekey = pExtWinBlock->info.window.ekey + 1;
2,270✔
3298
      }
3299
    }
3300
  } else {
3301
    mergeOp = pOperator->pDownstream[1];
288,540✔
3302
    code = buildMergeOperatorParam(pDynInfo, &pMergeParam, pWinArray, mergeOp->numOfDownstream, numOfWins);
288,540✔
3303
    QUERY_CHECK_CODE(code, lino, _return);
288,540✔
3304

3305
    SSDataBlock* pMergedBlock = NULL;
288,540✔
3306
    code = mergeOp->fpSet.getNextExtFn(mergeOp, pMergeParam, &pMergedBlock);
288,540✔
3307
    QUERY_CHECK_CODE(code, lino, _return);
288,540✔
3308

3309
    blockDataCleanup(pRes);
288,540✔
3310
    code = blockDataEnsureCapacity(pRes, numOfWins);
288,540✔
3311
    QUERY_CHECK_CODE(code, lino, _return);
288,540✔
3312

3313
    if (pMergedBlock) {
288,540✔
3314
      code = copyColumnsValue(pInfo->pTargets, pMergedBlock->info.id.blockId, pRes, pMergedBlock, numOfWins);
288,540✔
3315
      QUERY_CHECK_CODE(code, lino, _return);
288,540✔
3316

3317
      if (pInfo->curWinBatchIdx == 0) {
288,540✔
3318
        // first batch, get _wstart from pMergedBlock
3319
        SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(taosArrayGetP(pInfo->pWins, 0), 0);
281,670✔
3320
        QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
281,670✔
3321

3322
        firstWin->tw.skey = pMergedBlock->info.window.skey;
281,670✔
3323
      }
3324
      if (pInfo->curWinBatchIdx == taosArrayGetSize(pInfo->pWins) - 1) {
288,540✔
3325
        // last batch, get _wend from pMergedBlock
3326
        SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(taosArrayGetP(pInfo->pWins, taosArrayGetSize(pInfo->pWins) - 1));
6,870✔
3327
        QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)
6,870✔
3328

3329
        lastWin->tw.ekey = pMergedBlock->info.window.ekey + 1;
6,870✔
3330
      }
3331
    }
3332
  }
3333

3334

3335
  if (pInfo->outputWstartSlotId != -1) {
388,420✔
3336
    SColumnInfoData* pWstartCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWstartSlotId);
242,180✔
3337
    QUERY_CHECK_NULL(pWstartCol, code, lino, _return, terrno)
242,180✔
3338

3339
    for (int32_t i = 0; i < numOfWins; i++) {
731,963,670✔
3340
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
731,721,490✔
3341
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
731,721,490✔
3342
      code = colDataSetVal(pWstartCol, i, (const char*)&pWindow->tw.skey, false);
731,721,490✔
3343
      QUERY_CHECK_CODE(code, lino, _return);
731,721,490✔
3344
    }
3345
  }
3346
  if (pInfo->outputWendSlotId != -1) {
388,420✔
3347
    SColumnInfoData* pWendCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWendSlotId);
242,180✔
3348
    QUERY_CHECK_NULL(pWendCol, code, lino, _return, terrno)
242,180✔
3349

3350
    for (int32_t i = 0; i < numOfWins; i++) {
731,963,670✔
3351
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
731,721,490✔
3352
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
731,721,490✔
3353
      TSKEY ekey = pWindow->tw.ekey - 1;
731,721,490✔
3354
      code = colDataSetVal(pWendCol, i, (const char*)&ekey, false);
731,721,490✔
3355
      QUERY_CHECK_CODE(code, lino, _return);
731,721,490✔
3356
    }
3357
  }
3358
  if (pInfo->outputWdurationSlotId != -1) {
388,420✔
3359
    SColumnInfoData* pWdurationCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWdurationSlotId);
169,060✔
3360
    QUERY_CHECK_NULL(pWdurationCol, code, lino, _return, terrno)
169,060✔
3361

3362
    for (int32_t i = 0; i < numOfWins; i++) {
507,265,910✔
3363
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
507,096,850✔
3364
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
507,096,850✔
3365
      int64_t duration = pWindow->tw.ekey - 1 - pWindow->tw.skey;
507,096,850✔
3366
      code = colDataSetVal(pWdurationCol, i, (const char*)&duration, false);
507,096,850✔
3367
      QUERY_CHECK_CODE(code, lino, _return);
507,096,850✔
3368
    }
3369
  }
3370

3371
  pRes->info.rows = numOfWins;
388,420✔
3372
  *ppRes = pRes;
388,420✔
3373
  pInfo->curWinBatchIdx++;
388,420✔
3374

3375
  return code;
388,420✔
3376

3377
_return:
×
3378
  if (code != TSDB_CODE_SUCCESS) {
×
3379
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
3380
    pTaskInfo->code = code;
×
3381
    T_LONG_JMP(pTaskInfo->env, code);
×
3382
  }
3383
  return code;
×
3384
}
3385

3386
/*
 * Reset-state callback of the dynamic query control operator.
 *
 * Puts the operator back into OP_NOT_OPENED and discards the runtime state that
 * belongs to its query type:
 *   - DYN_QTYPE_STB_HASH:   zeroes the exec info, destroys the stable-join control
 *                           info, re-creates the prev-join table hash and clears
 *                           the prev/post join contexts.
 *   - DYN_QTYPE_VTB_SCAN:   releases the org-table map, the cached response, the
 *                           column reference info and the child table map, clears
 *                           the child table list and rewinds the table cursors.
 *   - DYN_QTYPE_VTB_WINDOW: destroys the result block and the window array and
 *                           resets the output slot ids and the window batch index.
 * Every other type is only reported through qError().
 *
 * NOTE(review): DYN_QTYPE_VTB_AGG has no case here and therefore lands in the
 * default branch -- confirm whether that type needs its own reset logic.
 *
 * Returns the error of initSeqStbJoinTableHash() when the hash cannot be rebuilt,
 * TSDB_CODE_SUCCESS in all other cases (including the unsupported-type branch).
 */
static int32_t resetDynQueryCtrlOperState(SOperatorInfo* pOper) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOper->info;

  pOper->status = OP_NOT_OPENED;

  switch (pCtrl->qType) {
    case DYN_QTYPE_STB_HASH: {
      SStbJoinDynCtrlInfo* pJoin = &pCtrl->stbJoin;

      pJoin->execInfo = (SDynQueryCtrlExecInfo){0};
      destroyStbJoinDynCtrlInfo(pJoin);

      int32_t rc = initSeqStbJoinTableHash(&pJoin->ctx.prev, pJoin->basic.batchFetch);
      if (rc != TSDB_CODE_SUCCESS) {
        qError("initSeqStbJoinTableHash failed since %s", tstrerror(rc));
        return rc;
      }

      // start the next run with an empty table list and a pristine post-join context
      pJoin->ctx.prev.pListHead = NULL;
      pJoin->ctx.prev.pListTail = NULL;
      pJoin->ctx.prev.tableNum = 0;
      pJoin->ctx.prev.joinBuild = false;
      pJoin->ctx.post = (SStbJoinPostJoinCtx){0};
      break;
    }

    case DYN_QTYPE_VTB_SCAN: {
      SVtbScanDynCtrlInfo*          pScan = &pCtrl->vtbScan;
      SDynQueryCtrlPhysiNode const* pNode = pOper->pPhyNode;
      SExecTaskInfo*                pTask = pOper->pTaskInfo;

      if (pScan->otbNameToOtbInfoMap != NULL) {
        taosHashSetFreeFp(pScan->otbNameToOtbInfoMap, destroySOrgTbInfo);
        taosHashCleanup(pScan->otbNameToOtbInfoMap);
        pScan->otbNameToOtbInfoMap = NULL;
      }

      if (pScan->pRsp != NULL) {
        tFreeSUsedbRsp(pScan->pRsp);
        taosMemoryFreeClear(pScan->pRsp);
      }

      if (pScan->colRefInfo != NULL) {
        taosArrayDestroyEx(pScan->colRefInfo, destroyColRefInfo);
        pScan->colRefInfo = NULL;
      }

      if (pScan->childTableMap != NULL) {
        taosHashCleanup(pScan->childTableMap);
        pScan->childTableMap = NULL;
      }

      // the list object itself is kept, only its entries are released
      if (pScan->childTableList != NULL) {
        taosArrayClearEx(pScan->childTableList, destroyColRefArray);
      }

      if (pNode->dynTbname && pTask) {
        updateDynTbUidIfNeeded(pScan, pTask->pStreamRuntimeInfo);
      }

      pScan->curTableIdx = 0;
      pScan->lastTableIdx = -1;
      break;
    }

    case DYN_QTYPE_VTB_WINDOW: {
      SVtbWindowDynCtrlInfo* pWin = &pCtrl->vtbWindow;

      if (pWin->pRes != NULL) {
        blockDataDestroy(pWin->pRes);
        pWin->pRes = NULL;
      }

      if (pWin->pWins != NULL) {
        taosArrayDestroyEx(pWin->pWins, destroyWinArray);
        pWin->pWins = NULL;
      }

      pWin->outputWdurationSlotId = -1;
      pWin->outputWendSlotId = -1;
      pWin->outputWstartSlotId = -1;
      pWin->curWinBatchIdx = 0;
      break;
    }

    default:
      qError("unsupported dynamic query ctrl type: %d", pCtrl->qType);
      break;
  }

  return TSDB_CODE_SUCCESS;
}
3464

3465
/*
 * Open function of the virtual-table aggregate flavour of the operator.
 *
 * Returns immediately when the operator is already opened. Otherwise it builds
 * the child table map through buildVirtualSuperTableScanChildTableMap() and, on
 * success, flags the operator as opened.
 *
 * While cost.openCost is still 0 the elapsed time is measured with
 * taosGetTimestampUs() and stored as the microsecond delta divided by 1000.
 *
 * On failure the error is logged, stored in the task info and propagated with
 * T_LONG_JMP(); on success TSDB_CODE_SUCCESS is returned.
 */
int32_t vtbAggOpen(SOperatorInfo* pOperator) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int64_t startUs = 0;

  if (OPTR_IS_OPENED(pOperator)) {
    return code;
  }

  // only the first open is timed
  if (pOperator->cost.openCost == 0) {
    startUs = taosGetTimestampUs();
  }

  code = buildVirtualSuperTableScanChildTableMap(pOperator);
  QUERY_CHECK_CODE(code, lino, _return);

  OPTR_SET_OPENED(pOperator);

_return:
  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - startUs) / 1000.0;
  }

  if (code) {
    SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

    qError("%s failed since %s, line %d", __func__, tstrerror(code), lino);
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  return code;
}
3494

3495
/*
 * Pull the next aggregate result block from the last downstream operator.
 *
 * The way the downstream aggregate is driven depends on two flags of the
 * virtual table scan info:
 *
 *   hasPartition && batchProcessChild
 *       Walk vtbGroupIdTagListMap, build one parameter per group id and fetch
 *       with it. The first group that yields a block is tagged with its group id,
 *       removed from the map and returned.
 *   hasPartition && !batchProcessChild
 *       Walk vtbGroupIdTagListMap and, per group, its uid -> tag-list map. A new
 *       parameter is built for a child only while genNewParam is set; otherwise
 *       the aggregate is simply asked for its next block. Children (and then
 *       groups) that yield nothing are removed from their map.
 *   !hasPartition && batchProcessChild
 *       Build a single parameter, fetch once and mark this operator completed.
 *   !hasPartition && !batchProcessChild
 *       Same per-child protocol as above, driven by vtbUidTagListMap.
 *
 * Whenever a hash walk is left early because a block was produced, the iterator
 * that is still held is released with taosHashCancelIterate() so the node it
 * refers to does not stay referenced across calls.
 *
 * pOperator  the dynamic query control operator
 * pRes       out: the produced block; the caller is expected to have set *pRes
 *            to NULL beforehand, it stays NULL when nothing was produced
 *
 * Returns TSDB_CODE_SUCCESS or the first error reported by a helper / the
 * downstream operator. Errors are logged here but not long-jumped.
 */
int32_t virtualTableAggGetNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SOperatorInfo*             pAggOp = pOperator->pDownstream[pOperator->numOfDownstream - 1];
  SOperatorParam*            pAggParam = NULL;

  if (pInfo->vtbScan.hasPartition) {
    if (pInfo->vtbScan.batchProcessChild) {
      void* pIter = taosHashIterate(pVtbScan->vtbGroupIdTagListMap, NULL);
      while (pIter) {
        size_t   keyLen = 0;
        uint64_t groupid = *(uint64_t*)taosHashGetKey(pIter, &keyLen);

        code = buildAggOperatorParamWithGroupId(pInfo, groupid, &pAggParam);
        QUERY_CHECK_CODE(code, line, _return);

        if (pAggParam) {
          code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
          QUERY_CHECK_CODE(code, line, _return);
        } else {
          *pRes = NULL;
        }

        // step past the current group first so it is no longer the iterated node when removed
        pIter = taosHashIterate(pVtbScan->vtbGroupIdTagListMap, pIter);

        if (*pRes) {
          (*pRes)->info.id.groupId = groupid;
          // we stop walking here: give back the iterator on the following group
          taosHashCancelIterate(pVtbScan->vtbGroupIdTagListMap, pIter);
          code = taosHashRemove(pVtbScan->vtbGroupIdTagListMap, &groupid, keyLen);
          QUERY_CHECK_CODE(code, line, _return);
          break;
        }
      }
    } else {
      void* pIter = taosHashIterate(pVtbScan->vtbGroupIdTagListMap, NULL);
      while (pIter) {
        size_t    keyLen = 0;
        // copy the key: the node that stores it is removed further down
        uint64_t  groupid = *(uint64_t*)taosHashGetKey(pIter, &keyLen);
        SHashObj* vtbUidTagListMap = *(SHashObj**)pIter;

        void* pIter2 = taosHashIterate(vtbUidTagListMap, NULL);
        while (pIter2) {
          size_t   keyLen2 = 0;
          tb_uid_t uid = *(tb_uid_t*)taosHashGetKey(pIter2, &keyLen2);
          SArray*  pTagList = *(SArray**)pIter2;

          if (pVtbScan->genNewParam) {
            code = buildAggOperatorParamForSingleChild(pInfo, uid, groupid, pTagList, &pAggParam);
            QUERY_CHECK_CODE(code, line, _return);
            if (pAggParam) {
              code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
              QUERY_CHECK_CODE(code, line, _return);
            } else {
              *pRes = NULL;
            }
          } else {
            code = pAggOp->fpSet.getNextFn(pAggOp, pRes);
            QUERY_CHECK_CODE(code, line, _return);
          }

          if (*pRes) {
            // this child may have more blocks: keep it and continue without a new param next time
            pVtbScan->genNewParam = false;
            (*pRes)->info.id.groupId = groupid;
            taosHashCancelIterate(vtbUidTagListMap, pIter2);
            break;
          }

          // child exhausted: advance, then drop it (with the length of the uid key)
          pVtbScan->genNewParam = true;
          pIter2 = taosHashIterate(vtbUidTagListMap, pIter2);
          code = taosHashRemove(vtbUidTagListMap, &uid, keyLen2);
          QUERY_CHECK_CODE(code, line, _return);
        }

        if (*pRes) {
          taosHashCancelIterate(pVtbScan->vtbGroupIdTagListMap, pIter);
          break;
        }

        // group exhausted: advance, then drop it
        pIter = taosHashIterate(pVtbScan->vtbGroupIdTagListMap, pIter);
        code = taosHashRemove(pVtbScan->vtbGroupIdTagListMap, &groupid, keyLen);
        QUERY_CHECK_CODE(code, line, _return);
      }
    }
  } else {
    if (pInfo->vtbScan.batchProcessChild) {
      code = buildAggOperatorParam(pInfo, &pAggParam);
      QUERY_CHECK_CODE(code, line, _return);

      code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
      QUERY_CHECK_CODE(code, line, _return);
      setOperatorCompleted(pOperator);
    } else {
      void* pIter = taosHashIterate(pVtbScan->vtbUidTagListMap, NULL);
      while (pIter) {
        size_t   keyLen = 0;
        tb_uid_t uid = *(tb_uid_t*)taosHashGetKey(pIter, &keyLen);
        SArray*  pTagList = *(SArray**)pIter;

        if (pVtbScan->genNewParam) {
          code = buildAggOperatorParamForSingleChild(pInfo, uid, 0, pTagList, &pAggParam);
          QUERY_CHECK_CODE(code, line, _return);

          if (pAggParam) {
            code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
            QUERY_CHECK_CODE(code, line, _return);
          } else {
            *pRes = NULL;
          }
        } else {
          code = pAggOp->fpSet.getNextFn(pAggOp, pRes);
          QUERY_CHECK_CODE(code, line, _return);
        }

        if (*pRes) {
          pVtbScan->genNewParam = false;
          taosHashCancelIterate(pVtbScan->vtbUidTagListMap, pIter);
          break;
        }

        // child exhausted: advance, then drop it
        pVtbScan->genNewParam = true;
        pIter = taosHashIterate(pVtbScan->vtbUidTagListMap, pIter);
        code = taosHashRemove(pVtbScan->vtbUidTagListMap, &uid, keyLen);
        QUERY_CHECK_CODE(code, line, _return);
      }
    }
  }

_return:
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
3623

3624
/*
 * Next function of the virtual-table aggregate flavour of the operator.
 *
 * After validating pRes it returns right away when the operator has already
 * reached OP_EXEC_DONE. Otherwise the operator is opened through its _openFn,
 * a super table with an empty child table list completes the operator without
 * producing a block, and in every other case the result is obtained from
 * virtualTableAggGetNext().
 *
 * On failure the error is logged, stored in the task info and propagated with
 * T_LONG_JMP().
 */
int32_t vtbAggNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    lino = 0;
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SVtbScanDynCtrlInfo*       pScan = (SVtbScanDynCtrlInfo*)&pCtrl->vtbScan;

  QRY_PARAM_CHECK(pRes);

  if (OP_EXEC_DONE == pOperator->status) {
    return code;
  }

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _return);

  // a virtual super table without any child table has nothing to aggregate
  if (pScan->isSuperTable && taosArrayGetSize(pScan->childTableList) == 0) {
    setOperatorCompleted(pOperator);
    return code;
  }

  code = virtualTableAggGetNext(pOperator, pRes);
  QUERY_CHECK_CODE(code, lino, _return);

  return code;

_return:
  if (code) {
    SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

    qError("%s failed since %s, line %d", __func__, tstrerror(code), lino);
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
3656

3657
int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
2,661,729✔
3658
                                       SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo,
3659
                                       SMsgCb* pMsgCb, SOperatorInfo** pOptrInfo) {
3660
  QRY_PARAM_CHECK(pOptrInfo);
2,661,729✔
3661

3662
  int32_t                    code = TSDB_CODE_SUCCESS;
2,661,729✔
3663
  int32_t                    line = 0;
2,661,729✔
3664
  __optr_fn_t                nextFp = NULL;
2,661,729✔
3665
  __optr_open_fn_t           openFp = NULL;
2,661,729✔
3666
  SOperatorInfo*             pOperator = NULL;
2,661,729✔
3667
  SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo));
2,661,729✔
3668
  QUERY_CHECK_NULL(pInfo, code, line, _error, terrno)
2,661,729✔
3669

3670
  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2,661,729✔
3671
  QUERY_CHECK_NULL(pOperator, code, line, _error, terrno)
2,661,729✔
3672

3673
  pOperator->pPhyNode = pPhyciNode;
2,661,729✔
3674
  pTaskInfo->dynamicTask = (int8_t)pPhyciNode->node.dynamicOp;
2,661,729✔
3675

3676
  code = appendDownstream(pOperator, pDownstream, numOfDownstream);
2,661,729✔
3677
  QUERY_CHECK_CODE(code, line, _error);
2,661,729✔
3678

3679
  setOperatorInfo(pOperator, "DynQueryCtrlOperator", QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, false, OP_NOT_OPENED,
2,661,729✔
3680
                  pInfo, pTaskInfo);
3681

3682
  pInfo->qType = pPhyciNode->qType;
2,661,729✔
3683
  switch (pInfo->qType) {
2,661,729✔
3684
    case DYN_QTYPE_STB_HASH:
1,143,985✔
3685
      TAOS_MEMCPY(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
1,143,985✔
3686
      pInfo->stbJoin.pOutputDataBlockDesc = pPhyciNode->node.pOutputDataBlockDesc;
1,143,985✔
3687
      code = initSeqStbJoinTableHash(&pInfo->stbJoin.ctx.prev, pInfo->stbJoin.basic.batchFetch);
1,143,985✔
3688
      QUERY_CHECK_CODE(code, line, _error);
1,143,985✔
3689
      nextFp = seqStableJoin;
1,143,985✔
3690
      openFp = optrDummyOpenFn;
1,143,985✔
3691
      break;
1,143,985✔
3692
    case DYN_QTYPE_VTB_SCAN:
1,121,497✔
3693
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
1,121,497✔
3694
      QUERY_CHECK_CODE(code, line, _error);
1,121,497✔
3695
      nextFp = vtbScanNext;
1,121,497✔
3696
      openFp = vtbScanOpen;
1,121,497✔
3697
      break;
1,121,497✔
3698
    case DYN_QTYPE_VTB_WINDOW:
374,740✔
3699
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
374,740✔
3700
      QUERY_CHECK_CODE(code, line, _error);
374,740✔
3701
      code = initVtbWindowInfo(pInfo, pPhyciNode, pTaskInfo, pOperator);
374,740✔
3702
      QUERY_CHECK_CODE(code, line, _error);
374,740✔
3703
      nextFp = vtbWindowNext;
374,740✔
3704
      openFp = vtbWindowOpen;
374,740✔
3705
      break;
374,740✔
3706
    case DYN_QTYPE_VTB_AGG:
21,507✔
3707
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
21,507✔
3708
      QUERY_CHECK_CODE(code, line, _error);
21,507✔
3709
      nextFp = vtbAggNext;
21,507✔
3710
      openFp = vtbAggOpen;
21,507✔
3711
      break;
21,507✔
3712
    default:
×
3713
      qError("unsupported dynamic query ctrl type: %d", pInfo->qType);
×
3714
      code = TSDB_CODE_INVALID_PARA;
×
3715
      goto _error;
×
3716
  }
3717

3718
  pOperator->fpSet = createOperatorFpSet(openFp, nextFp, NULL, destroyDynQueryCtrlOperator, optrDefaultBufFn,
2,661,729✔
3719
                                         NULL, optrDefaultGetNextExtFn, NULL);
3720

3721
  setOperatorResetStateFn(pOperator, resetDynQueryCtrlOperState);
2,661,729✔
3722
  *pOptrInfo = pOperator;
2,661,729✔
3723
  return TSDB_CODE_SUCCESS;
2,661,729✔
3724

3725
_error:
×
3726
  if (pInfo != NULL) {
×
3727
    destroyDynQueryCtrlOperator(pInfo);
×
3728
  }
3729
  qError("failed to create dyn query ctrl operator, %s code:%s, line:%d", __func__, tstrerror(code), line);
×
3730
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
3731
  pTaskInfo->code = code;
×
3732
  return code;
×
3733
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc