• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4895

23 Dec 2025 01:08PM UTC coverage: 65.513% (-0.2%) from 65.72%
#4895

push

travis-ci

web-flow
fix: mem leak (#34023)

6 of 9 new or added lines in 1 file covered. (66.67%)

7770 existing lines in 123 files now uncovered.

184705 of 281937 relevant lines covered (65.51%)

112009834.14 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

47.27
/source/libs/executor/src/dynqueryctrloperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "nodes.h"
19
#include "operator.h"
20
#include "os.h"
21
#include "plannodes.h"
22
#include "query.h"
23
#include "querynodes.h"
24
#include "querytask.h"
25
#include "tarray.h"
26
#include "tcompare.h"
27
#include "tdatablock.h"
28
#include "thash.h"
29
#include "tmsg.h"
30
#include "trpc.h"
31
#include "ttypes.h"
32
#include "tdataformat.h"
33
#include "dynqueryctrl.h"
34

35
int64_t gSessionId = 0;
36

37
void freeVgTableList(void* ptr) { 
×
38
  taosArrayDestroy(*(SArray**)ptr); 
×
UNCOV
39
}
×
40

41
// Releases every node of the singly linked table list together with the four
// per-node vgId/uid arrays. A NULL head is a no-op.
static void destroyStbJoinTableList(SStbJoinTableList* pListHead) {
  for (SStbJoinTableList* pCur = pListHead; NULL != pCur;) {
    // Remember the successor before the node itself is released.
    SStbJoinTableList* pFollowing = pCur->pNext;

    taosMemoryFree(pCur->pLeftVg);
    taosMemoryFree(pCur->pLeftUid);
    taosMemoryFree(pCur->pRightVg);
    taosMemoryFree(pCur->pRightUid);
    taosMemoryFree(pCur);

    pCur = pFollowing;
  }
}
518,519✔
54

55
// Logs the execution counters of the super-table join controller and then
// releases the containers it owns: the vgroup hashes when batchFetch is set,
// the uid caches otherwise, and finally the pending table list.
static void destroyStbJoinDynCtrlInfo(SStbJoinDynCtrlInfo* pStbJoin) {
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;

  qDebug("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64 ", leftCacheNum:%" PRId64 ", rightCacheNum:%" PRId64,
         pStbJoin->execInfo.prevBlkNum, pStbJoin->execInfo.prevBlkRows, pStbJoin->execInfo.postBlkNum,
         pStbJoin->execInfo.postBlkRows, pStbJoin->execInfo.leftCacheNum, pStbJoin->execInfo.rightCacheNum);

  if (!pStbJoin->basic.batchFetch) {
    // Sequential mode: the caches carry no free callback.
    if (pPrev->leftCache) {
      tSimpleHashCleanup(pPrev->leftCache);
    }
    if (pPrev->rightCache) {
      tSimpleHashCleanup(pPrev->rightCache);
    }
    if (pPrev->onceTable) {
      tSimpleHashCleanup(pPrev->onceTable);
    }
  } else {
    // Batch mode: each hash value is an SArray* released via freeVgTableList.
    if (pPrev->leftHash) {
      tSimpleHashSetFreeFp(pPrev->leftHash, freeVgTableList);
      tSimpleHashCleanup(pPrev->leftHash);
    }
    if (pPrev->rightHash) {
      tSimpleHashSetFreeFp(pPrev->rightHash, freeVgTableList);
      tSimpleHashCleanup(pPrev->rightHash);
    }
  }

  destroyStbJoinTableList(pPrev->pListHead);
}
518,519✔
83

84
void destroyColRefInfo(void *info) {
33,736,428✔
85
  SColRefInfo *pColRefInfo = (SColRefInfo *)info;
33,736,428✔
86
  if (pColRefInfo) {
33,736,428✔
87
    taosMemoryFree(pColRefInfo->colName);
33,736,428✔
88
    taosMemoryFree(pColRefInfo->colrefName);
33,736,428✔
89
  }
90
}
33,736,428✔
91

92
void destroyColRefArray(void *info) {
411,724✔
93
  SArray *pColRefArray = *(SArray **)info;
411,724✔
94
  if (pColRefArray) {
411,724✔
95
    taosArrayDestroyEx(pColRefArray, destroyColRefInfo);
411,724✔
96
  }
97
}
411,724✔
98

99
void freeUseDbOutput(void* pOutput) {
210,748✔
100
  SUseDbOutput *pOut = *(SUseDbOutput**)pOutput;
210,748✔
101
  if (NULL == pOutput) {
210,748✔
UNCOV
102
    return;
×
103
  }
104

105
  if (pOut->dbVgroup) {
210,748✔
106
    freeVgInfo(pOut->dbVgroup);
210,748✔
107
  }
108
  taosMemFree(pOut);
210,748✔
109
}
110

UNCOV
111
void destroyOtbInfoArray(void *info) {
×
UNCOV
112
  SArray *pOtbInfoArray = *(SArray **)info;
×
UNCOV
113
  if (pOtbInfoArray) {
×
UNCOV
114
    taosArrayDestroyEx(pOtbInfoArray, destroySOrgTbInfo);
×
115
  }
UNCOV
116
}
×
117

UNCOV
118
void destroyOtbVgIdToOtbInfoArrayMap(void *info) {
×
UNCOV
119
  SHashObj* pOtbVgIdToOtbInfoArrayMap = *(SHashObj **)info;
×
UNCOV
120
  if (pOtbVgIdToOtbInfoArrayMap) {
×
UNCOV
121
    taosHashSetFreeFp(pOtbVgIdToOtbInfoArrayMap, destroyOtbInfoArray);
×
UNCOV
122
    taosHashCleanup(pOtbVgIdToOtbInfoArrayMap);
×
123
  }
UNCOV
124
}
×
125

UNCOV
126
void destroyTagList(void *info) {
×
UNCOV
127
  SArray *pTagList = *(SArray **)info;
×
128
  if (pTagList) {
×
129
    taosArrayDestroyEx(pTagList, destroyTagVal);
×
130
  }
UNCOV
131
}
×
132

UNCOV
133
void destroyVtbUidTagListMap(void *info) {
×
UNCOV
134
  SHashObj* pVtbUidTagListMap = *(SHashObj **)info;
×
UNCOV
135
  if (pVtbUidTagListMap) {
×
UNCOV
136
    taosHashSetFreeFp(pVtbUidTagListMap, destroyTagList);
×
UNCOV
137
    taosHashCleanup(pVtbUidTagListMap);
×
138
  }
UNCOV
139
}
×
140

141
// Releases everything owned by the virtual-table scan control info. Each
// member is released only when it is non-NULL; the struct itself is not freed.
static void destroyVtbScanDynCtrlInfo(SVtbScanDynCtrlInfo* pVtbScan) {
  /* names */
  if (NULL != pVtbScan->dbName) {
    taosMemoryFreeClear(pVtbScan->dbName);
  }
  if (NULL != pVtbScan->tbName) {
    taosMemoryFreeClear(pVtbScan->tbName);
  }

  /* column reference containers */
  if (NULL != pVtbScan->childTableList) {
    taosArrayDestroyEx(pVtbScan->childTableList, destroyColRefArray);
  }
  if (NULL != pVtbScan->colRefInfo) {
    taosArrayDestroyEx(pVtbScan->colRefInfo, destroyColRefInfo);
    pVtbScan->colRefInfo = NULL;
  }
  if (NULL != pVtbScan->childTableMap) {
    taosHashCleanup(pVtbScan->childTableMap);
  }
  if (NULL != pVtbScan->readColList) {
    taosArrayDestroy(pVtbScan->readColList);
  }

  /* maps whose values need a dedicated destructor */
  if (NULL != pVtbScan->dbVgInfoMap) {
    taosHashSetFreeFp(pVtbScan->dbVgInfoMap, freeUseDbOutput);
    taosHashCleanup(pVtbScan->dbVgInfoMap);
  }
  if (NULL != pVtbScan->otbNameToOtbInfoMap) {
    taosHashSetFreeFp(pVtbScan->otbNameToOtbInfoMap, destroySOrgTbInfo);
    taosHashCleanup(pVtbScan->otbNameToOtbInfoMap);
  }

  /* cached use-db response */
  if (NULL != pVtbScan->pRsp) {
    tFreeSUsedbRsp(pVtbScan->pRsp);
    taosMemoryFreeClear(pVtbScan->pRsp);
  }

  /* vgroup bookkeeping maps, cleaned up without a value destructor */
  if (NULL != pVtbScan->existOrgTbVg) {
    taosHashCleanup(pVtbScan->existOrgTbVg);
  }
  if (NULL != pVtbScan->curOrgTbVg) {
    taosHashCleanup(pVtbScan->curOrgTbVg);
  }
  if (NULL != pVtbScan->newAddedVgInfo) {
    taosHashCleanup(pVtbScan->newAddedVgInfo);
  }

  /* nested maps */
  if (NULL != pVtbScan->otbVgIdToOtbInfoArrayMap) {
    taosHashSetFreeFp(pVtbScan->otbVgIdToOtbInfoArrayMap, destroyOtbInfoArray);
    taosHashCleanup(pVtbScan->otbVgIdToOtbInfoArrayMap);
  }
  if (NULL != pVtbScan->vtbUidToVgIdMapMap) {
    taosHashSetFreeFp(pVtbScan->vtbUidToVgIdMapMap, destroyOtbVgIdToOtbInfoArrayMap);
    taosHashCleanup(pVtbScan->vtbUidToVgIdMapMap);
  }
  if (NULL != pVtbScan->vtbGroupIdToVgIdMapMap) {
    taosHashSetFreeFp(pVtbScan->vtbGroupIdToVgIdMapMap, destroyOtbVgIdToOtbInfoArrayMap);
    taosHashCleanup(pVtbScan->vtbGroupIdToVgIdMapMap);
  }
  if (NULL != pVtbScan->vtbUidTagListMap) {
    taosHashSetFreeFp(pVtbScan->vtbUidTagListMap, destroyTagList);
    taosHashCleanup(pVtbScan->vtbUidTagListMap);
  }
  // NOTE(review): vtbUidTagListMap installs destroyTagList before cleanup but
  // vtbGroupIdTagListMap does not - confirm its values own no heap memory.
  if (NULL != pVtbScan->vtbGroupIdTagListMap) {
    taosHashCleanup(pVtbScan->vtbGroupIdTagListMap);
  }
  if (NULL != pVtbScan->vtbUidToGroupIdMap) {
    taosHashCleanup(pVtbScan->vtbUidToGroupIdMap);
  }
}
237,436✔
205

UNCOV
206
void destroyWinArray(void *info) {
×
UNCOV
207
  SArray *pWinArray = *(SArray **)info;
×
UNCOV
208
  if (pWinArray) {
×
UNCOV
209
    taosArrayDestroy(pWinArray);
×
210
  }
UNCOV
211
}
×
212

UNCOV
213
// Releases the result block and the array of window arrays owned by the
// virtual-table window control info. The struct itself is not freed.
static void destroyVtbWindowDynCtrlInfo(SVtbWindowDynCtrlInfo* pVtbWindow) {
  if (NULL != pVtbWindow->pRes) {
    blockDataDestroy(pVtbWindow->pRes);
  }

  if (NULL != pVtbWindow->pWins) {
    // Every element of pWins is itself an SArray*, hence destroyWinArray.
    taosArrayDestroyEx(pVtbWindow->pWins, destroyWinArray);
  }
}
×
221

222
static void destroyDynQueryCtrlOperator(void* param) {
754,629✔
223
  SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param;
754,629✔
224

225
  switch (pDyn->qType) {
754,629✔
226
    case DYN_QTYPE_STB_HASH:
517,193✔
227
      destroyStbJoinDynCtrlInfo(&pDyn->stbJoin);
517,193✔
228
      break;
517,193✔
229
    case DYN_QTYPE_VTB_AGG:
237,436✔
230
    case DYN_QTYPE_VTB_SCAN:
231
      destroyVtbScanDynCtrlInfo(&pDyn->vtbScan);
237,436✔
232
      break;
237,436✔
UNCOV
233
    case DYN_QTYPE_VTB_WINDOW:
×
UNCOV
234
      destroyVtbWindowDynCtrlInfo(&pDyn->vtbWindow);
×
UNCOV
235
      break;
×
UNCOV
236
    default:
×
UNCOV
237
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
×
UNCOV
238
      break;
×
239
  }
240

241
  taosMemoryFreeClear(param);
754,629✔
242
}
754,629✔
243

244
// Decides whether the data of a table must be cached by the group cache.
//  - batch fetch: always cache;
//  - right table: cache when the next right uid equals the current one;
//  - left table : cache when `uid` is present in pPrev->leftCache.
static FORCE_INLINE bool tableNeedCache(int64_t uid, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, bool rightTable, bool batchFetch) {
  if (batchFetch) {
    return true;
  }

  if (!rightTable) {
    return NULL != tSimpleHashGet(pPrev->leftCache, &uid, sizeof(uid));
  }

  return pPost->rightCurrUid == pPost->rightNextUid;
}
257

258
// Loads the table pair at the head node's read position into the post-join
// context, looks ahead for the next right-table uid (0 when the list is
// exhausted), and derives the cache flags for both sides.
//
// Returns TSDB_CODE_SUCCESS, or the error from recording the right uid in
// pPrev->rightCache (sequential mode only).
static int32_t updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo* pStbJoin) {
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
  SStbJoinTableList*   pNode = pPrev->pListHead;

  int64_t* pLeftUid = &pNode->pLeftUid[pNode->readIdx];
  int64_t* pRightUid = &pNode->pRightUid[pNode->readIdx];
  int64_t  lastRightUid = pPost->rightCurrUid;

  pPost->leftCurrUid = *pLeftUid;
  pPost->rightCurrUid = *pRightUid;
  pPost->leftVgId = pNode->pLeftVg[pNode->readIdx];
  pPost->rightVgId = pNode->pRightVg[pNode->readIdx];

  // Look ahead: the entry after the current one, possibly in a later node.
  int64_t nextIdx = pNode->readIdx + 1;
  for (;;) {
    if (nextIdx < pNode->uidNum) {
      pPost->rightNextUid = pNode->pRightUid[nextIdx];
      break;
    }

    pNode = pNode->pNext;
    if (NULL == pNode) {
      pPost->rightNextUid = 0;
      break;
    }

    // After hopping to another node pRightUid refers to that node's array.
    // tableNeedCache() does not use the uid argument for the right table.
    pRightUid = pNode->pRightUid;
    nextIdx = 0;
  }

  bool batchFetch = pStbJoin->basic.batchFetch;
  pPost->leftNeedCache = tableNeedCache(*pLeftUid, pPrev, pPost, false, batchFetch);
  pPost->rightNeedCache = tableNeedCache(*pRightUid, pPrev, pPost, true, batchFetch);

  bool rightUidChanged = (lastRightUid != pPost->rightCurrUid);
  if (!pStbJoin->basic.batchFetch && pPost->rightNeedCache && rightUidChanged) {
    QRY_ERR_RET(tSimpleHashPut(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid), NULL, 0));
    pStbJoin->execInfo.rightCacheNum++;
  }

  return TSDB_CODE_SUCCESS;
}
301

302

303
// Builds a GROUP_CACHE get-param for one table and, when `pChild` is given,
// attaches it as the only child.
//
// Ownership: `pChild` is consumed in all cases - it is either linked into the
// result or released on failure. On failure *ppRes is NULL.
//
// The header fields of the new param are filled in immediately after
// allocation, so the error paths never hand freeOperatorParam() a struct with
// indeterminate `value`/`reUse`/`opType` members.
static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, bool needCache, SOperatorParam* pChild) {
  int32_t code = TSDB_CODE_SUCCESS;
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    code = terrno;
    freeOperatorParam(pChild, OP_GET_PARAM);
    return code;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = NULL;
  (*ppRes)->reUse = false;
  (*ppRes)->pChildren = NULL;

  if (pChild) {
    (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
    if (NULL == (*ppRes)->pChildren) {
      code = terrno;
      freeOperatorParam(pChild, OP_GET_PARAM);
      freeOperatorParam(*ppRes, OP_GET_PARAM);
      *ppRes = NULL;
      return code;
    }
    if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild)) {
      // pChild never made it into the array, so it is released separately.
      code = terrno;
      freeOperatorParam(pChild, OP_GET_PARAM);
      freeOperatorParam(*ppRes, OP_GET_PARAM);
      *ppRes = NULL;
      return code;
    }
  }

  SGcOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcOperatorParam));
  if (NULL == pGc) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_GET_PARAM);
    *ppRes = NULL;
    return code;
  }

  pGc->sessionId = atomic_add_fetch_64(&gSessionId, 1);
  pGc->downstreamIdx = downstreamIdx;
  pGc->vgId = vgId;
  pGc->tbUid = tbUid;
  pGc->needCache = needCache;

  (*ppRes)->value = pGc;

  return TSDB_CODE_SUCCESS;
}
352

353

354
// Builds a GROUP_CACHE notify-param (no children) for one table.
//
// On failure *ppRes is NULL and the error code is returned. The header fields
// are set right after allocation so that freeOperatorParam() on the error path
// does not see indeterminate members, and *ppRes is cleared after being freed
// so callers cannot touch a dangling pointer.
static int32_t buildGroupCacheNotifyOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid) {
  int32_t code = TSDB_CODE_SUCCESS;
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = NULL;
  (*ppRes)->reUse = false;
  (*ppRes)->pChildren = NULL;

  SGcNotifyOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcNotifyOperatorParam));
  if (NULL == pGc) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    *ppRes = NULL;
    return code;
  }

  pGc->downstreamIdx = downstreamIdx;
  pGc->vgId = vgId;
  pGc->tbUid = tbUid;

  (*ppRes)->value = pGc;

  return TSDB_CODE_SUCCESS;
}
380

381
static int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid);
UNCOV
382
// Builds a reusable EXCHANGE param describing a virtual super-table window
// scan over [skey, ekey]. The param gets an empty children array and an empty
// uid list. On failure everything allocated here is released.
//
// NOTE(review): `downstreamIdx` is accepted but the param is always created
// with downstreamIdx 0 - confirm this is intended.
static int32_t buildExchangeOperatorParamForExternalWindow(SOperatorParam** ppRes, int32_t downstreamIdx, int64_t skey, int64_t ekey) {
  int32_t                 code = TSDB_CODE_SUCCESS;
  int32_t                 lino = 0;
  SExchangeOperatorParam* pExc = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)

  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)

  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  QUERY_CHECK_NULL(pExc, code, lino, _return, terrno)

  pExc->multiParams = false;

  /* source description */
  pExc->basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
  pExc->basic.type = EX_SRC_TYPE_VSTB_WIN_SCAN;
  pExc->basic.vgId = 0;
  pExc->basic.tableSeq = true;
  pExc->basic.isNewParam = true;

  /* scan window */
  pExc->basic.window.skey = skey;
  pExc->basic.window.ekey = ekey;

  /* optional payloads are unused for this source type */
  pExc->basic.orgTbInfo = NULL;
  pExc->basic.tagList = NULL;
  pExc->basic.batchOrgTbInfo = NULL;

  pExc->basic.uidList = taosArrayInit(1, sizeof(int64_t));
  QUERY_CHECK_NULL(pExc->basic.uidList, code, lino, _return, terrno)

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->reUse = true;
  (*ppRes)->value = pExc;

  return code;

_return:
  qError("failed to build exchange operator param for external window, code:%d, line:%d", code, lino);

  if (NULL != pExc) {
    if (NULL != pExc->basic.uidList) {
      taosArrayDestroy(pExc->basic.uidList);
    }
    taosMemoryFreeClear(pExc);
  }

  if (NULL != *ppRes) {
    if (NULL != (*ppRes)->pChildren) {
      taosArrayDestroy((*ppRes)->pChildren);
    }
    taosMemoryFreeClear(*ppRes);
  }

  return code;
}
433

434
// Builds a single-table EXCHANGE param for a super-table join scan of table
// *pUid located in vgroup *pVgId.
//
// On failure every allocation made here is released, *ppRes is NULL and the
// error code is returned. Compared with the previous version:
//  - *ppRes no longer leaks when a later allocation fails;
//  - terrno is captured before any cleanup call can overwrite it;
//  - the exchange param is zero-initialized, so members that are not set
//    explicitly below are no longer indeterminate.
static int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, const int32_t* pVgId, int64_t* pUid) {
  int32_t                 code = TSDB_CODE_SUCCESS;
  SExchangeOperatorParam* pExc = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }
  (*ppRes)->pChildren = NULL;

  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  if (NULL == pExc) {
    code = terrno;
    goto _return;
  }
  *pExc = (SExchangeOperatorParam){0};

  pExc->multiParams = false;
  pExc->basic.vgId = *pVgId;
  pExc->basic.tableSeq = true;
  pExc->basic.type = EX_SRC_TYPE_STB_JOIN_SCAN;
  pExc->basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
  pExc->basic.orgTbInfo = NULL;
  pExc->basic.tagList = NULL;
  pExc->basic.batchOrgTbInfo = NULL;

  pExc->basic.uidList = taosArrayInit(1, sizeof(int64_t));
  if (NULL == pExc->basic.uidList) {
    code = terrno;
    goto _return;
  }
  if (NULL == taosArrayPush(pExc->basic.uidList, pUid)) {
    code = terrno;
    goto _return;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;

_return:
  if (NULL != pExc) {
    if (NULL != pExc->basic.uidList) {
      taosArrayDestroy(pExc->basic.uidList);
    }
    taosMemoryFree(pExc);
  }
  taosMemoryFreeClear(*ppRes);
  return code;
}
472

473
// Builds a multi-param EXCHANGE param with one basic param per vgroup found in
// `pVg` (vgId -> SArray* of uids).
//
// Ownership: each uid list is moved from `pVg` into the new batch hash - the
// slot in `pVg` is set to NULL once the entry has been stored.
//
// On failure everything allocated here is released and *ppRes is NULL. Lists
// already moved are released together with the batch hash through its free
// callback; lists not yet moved stay in `pVg`. Compared with the previous
// version, a failing tSimpleHashPut() no longer leaks the batch hash, the
// exchange param and *ppRes, and `basic` is zero-initialized so no
// indeterminate bytes are copied into the hash.
static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
  int32_t code = TSDB_CODE_SUCCESS;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }
  (*ppRes)->pChildren = NULL;

  SExchangeOperatorBatchParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorBatchParam));
  if (NULL == pExc) {
    code = terrno;
    taosMemoryFreeClear(*ppRes);
    return code;
  }

  pExc->multiParams = true;
  pExc->pBatchs = tSimpleHashInit(tSimpleHashGetSize(pVg), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  if (NULL == pExc->pBatchs) {
    code = terrno;
    taosMemoryFree(pExc);
    taosMemoryFreeClear(*ppRes);
    return code;
  }
  tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);

  SExchangeOperatorBasicParam basic = {0};
  basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;

  int32_t iter = 0;
  void* p = NULL;
  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
    int32_t* pVgId = tSimpleHashGetKey(p, NULL);
    SArray* pUidList = *(SArray**)p;
    basic.vgId = *pVgId;
    basic.uidList = pUidList;
    basic.orgTbInfo = NULL;
    basic.tableSeq = false;
    basic.type = EX_SRC_TYPE_STB_JOIN_SCAN;
    basic.tagList = NULL;
    basic.batchOrgTbInfo = NULL;

    code = tSimpleHashPut(pExc->pBatchs, pVgId, sizeof(*pVgId), &basic, sizeof(basic));
    if (TSDB_CODE_SUCCESS != code) {
      // pUidList was not stored and is therefore still owned by pVg.
      tSimpleHashCleanup(pExc->pBatchs);
      taosMemoryFree(pExc);
      taosMemoryFreeClear(*ppRes);
      QRY_ERR_RET(code);
    }

    qTrace("build downstreamIdx %d batch scan, vgId:%d, uidNum:%" PRId64, downstreamIdx, *pVgId, (int64_t)taosArrayGetSize(pUidList));
    *(SArray**)p = NULL;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;
}
524

525
// Builds a reusable EXCHANGE param for a virtual super-table tag scan of table
// `uid` in vgroup `vgId`.
//
// On failure every allocation made here is released, *ppRes is NULL and the
// error code is returned.
//
// Fixes relative to the previous version:
//  - `basic` points INTO pExc (&pExc->basic); it was passed to the free macro
//    and pExc was freed again afterwards. Only pExc is freed now.
//  - pExc is zero-initialized so the cleanup path never inspects indeterminate
//    pointers.
//  - the error message named the wrong builder ("vscan").
static int32_t buildExchangeOperatorParamForVTagScan(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, tb_uid_t uid) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  SExchangeOperatorParam*      pExc = NULL;
  SExchangeOperatorBasicParam* basic = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
  (*ppRes)->pChildren = NULL;

  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  QUERY_CHECK_NULL(pExc, code, lino, _return, terrno)
  *pExc = (SExchangeOperatorParam){0};

  pExc->multiParams = false;

  basic = &pExc->basic;
  basic->srcOpType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;

  basic->vgId = vgId;
  basic->tableSeq = false;
  basic->type = EX_SRC_TYPE_VSTB_TAG_SCAN;
  basic->isNewDeployed = false;
  basic->orgTbInfo = NULL;
  basic->tagList = NULL;
  basic->batchOrgTbInfo = NULL;

  basic->uidList = taosArrayInit(1, sizeof(int64_t));
  QUERY_CHECK_NULL(basic->uidList, code, lino, _return, terrno)
  QUERY_CHECK_NULL(taosArrayPush(basic->uidList, &uid), code, lino, _return, terrno)

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = true;

  return TSDB_CODE_SUCCESS;

_return:
  qError("failed to build exchange operator param for vtag scan, code:%d", code);
  taosMemoryFreeClear(*ppRes);
  if (basic) {
    // basic aliases pExc->basic: release what it owns, never basic itself.
    if (basic->uidList) {
      taosArrayDestroy(basic->uidList);
      basic->uidList = NULL;
    }
  }
  taosMemoryFreeClear(pExc);
  return code;
}
580

581
// Builds a reusable EXCHANGE param for scanning one origin table of a virtual
// table. `pMap` supplies the vgroup id, table name and column map; the column
// map is duplicated, so the result does not alias `pMap`. The uid list is
// created empty.
//
// On failure every allocation made here is released, *ppRes is NULL and the
// error code is returned.
//
// Fixes relative to the previous version:
//  - `basic` points INTO pExc (&pExc->basic); it was passed to the free macro
//    and pExc was freed again afterwards. Only pExc is freed now.
//  - pExc is zero-initialized: the cleanup path used to test basic->uidList
//    before it had ever been assigned when orgTbInfo/colMap allocation failed.
static int32_t buildExchangeOperatorParamForVScan(SOperatorParam** ppRes, int32_t downstreamIdx, SOrgTbInfo* pMap) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  SExchangeOperatorParam*      pExc = NULL;
  SExchangeOperatorBasicParam* basic = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
  (*ppRes)->pChildren = NULL;

  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  QUERY_CHECK_NULL(pExc, code, lino, _return, terrno)
  *pExc = (SExchangeOperatorParam){0};

  pExc->multiParams = false;

  basic = &pExc->basic;
  basic->srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;

  basic->vgId = pMap->vgId;
  basic->tableSeq = false;
  basic->type = EX_SRC_TYPE_VSTB_SCAN;
  basic->isNewDeployed = false;
  basic->isNewParam = true;
  basic->tagList = NULL;
  basic->batchOrgTbInfo = NULL;

  basic->orgTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
  QUERY_CHECK_NULL(basic->orgTbInfo, code, lino, _return, terrno)
  basic->orgTbInfo->vgId = pMap->vgId;
  tstrncpy(basic->orgTbInfo->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);
  basic->orgTbInfo->colMap = taosArrayDup(pMap->colMap, NULL);
  QUERY_CHECK_NULL(basic->orgTbInfo->colMap, code, lino, _return, terrno)

  basic->uidList = taosArrayInit(1, sizeof(int64_t));
  QUERY_CHECK_NULL(basic->uidList, code, lino, _return, terrno)

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = true;

  return TSDB_CODE_SUCCESS;

_return:
  qError("failed to build exchange operator param for vscan, code:%d", code);
  taosMemoryFreeClear(*ppRes);
  if (basic) {
    // basic aliases pExc->basic: release what it owns, never basic itself.
    if (basic->orgTbInfo) {
      taosArrayDestroy(basic->orgTbInfo->colMap);
      taosMemoryFreeClear(basic->orgTbInfo);
    }
    if (basic->uidList) {
      taosArrayDestroy(basic->uidList);
      basic->uidList = NULL;
    }
  }
  taosMemoryFreeClear(pExc);
  return code;
}
640

641
// Same as the plain VScan builder, but marks the source as newly deployed and
// fills `newDeployedSrc` from `pTaskAddr` (task id, node id, endpoint set),
// with `taskId` recorded as the client id.
//
// On failure every allocation made here is released, *ppRes is NULL and the
// error code is returned.
//
// Fixes relative to the previous version:
//  - `basic` points INTO pExc (&pExc->basic); it was passed to the free macro
//    and pExc was freed again afterwards. Only pExc is freed now.
//  - pExc is zero-initialized: the cleanup path used to test basic->uidList
//    before it had ever been assigned when orgTbInfo/colMap allocation failed.
static int32_t buildExchangeOperatorParamForVScanEx(SOperatorParam** ppRes, int32_t downstreamIdx, SOrgTbInfo* pMap, uint64_t taskId, SStreamTaskAddr* pTaskAddr) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  SExchangeOperatorParam*      pExc = NULL;
  SExchangeOperatorBasicParam* basic = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
  (*ppRes)->pChildren = NULL;

  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  QUERY_CHECK_NULL(pExc, code, lino, _return, terrno)
  *pExc = (SExchangeOperatorParam){0};

  pExc->multiParams = false;

  basic = &pExc->basic;
  basic->srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;

  basic->vgId = pMap->vgId;
  basic->tableSeq = false;
  basic->type = EX_SRC_TYPE_VSTB_SCAN;
  basic->isNewDeployed = true;
  basic->isNewParam = true;
  basic->batchOrgTbInfo = NULL;
  basic->tagList = NULL;

  basic->newDeployedSrc.type = QUERY_NODE_DOWNSTREAM_SOURCE;
  basic->newDeployedSrc.clientId = taskId;  // current task's taskid
  basic->newDeployedSrc.taskId = pTaskAddr->taskId;
  basic->newDeployedSrc.fetchMsgType = TDMT_STREAM_FETCH;
  basic->newDeployedSrc.localExec = false;
  basic->newDeployedSrc.addr.nodeId = pTaskAddr->nodeId;
  memcpy(&basic->newDeployedSrc.addr.epSet, &pTaskAddr->epset, sizeof(SEpSet));

  basic->orgTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
  QUERY_CHECK_NULL(basic->orgTbInfo, code, lino, _return, terrno)
  basic->orgTbInfo->vgId = pMap->vgId;
  tstrncpy(basic->orgTbInfo->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);
  basic->orgTbInfo->colMap = taosArrayDup(pMap->colMap, NULL);
  QUERY_CHECK_NULL(basic->orgTbInfo->colMap, code, lino, _return, terrno)

  basic->uidList = taosArrayInit(1, sizeof(int64_t));
  QUERY_CHECK_NULL(basic->uidList, code, lino, _return, terrno)

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = true;

  return TSDB_CODE_SUCCESS;

_return:
  qError("failed to build exchange operator param for vscan, code:%d", code);
  taosMemoryFreeClear(*ppRes);
  if (basic) {
    // basic aliases pExc->basic: release what it owns, never basic itself.
    if (basic->orgTbInfo) {
      taosArrayDestroy(basic->orgTbInfo->colMap);
      taosMemoryFreeClear(basic->orgTbInfo);
    }
    if (basic->uidList) {
      taosArrayDestroy(basic->uidList);
      basic->uidList = NULL;
    }
  }
  taosMemoryFreeClear(pExc);
  return code;
}
707

708
// Deep-copies `pTagList` (array of STagVal) into pBasic->tagList. Payloads of
// variable-length tags are duplicated; other tags copy their i64 value.
//
// On failure pBasic->tagList is destroyed and reset to NULL, the payload of a
// tag that was being copied but not yet stored is released, and the error code
// is returned.
//
// `tmpTag` is zero-initialized and its payload is only released for
// variable-length types: the previous version tested tmpTag.pData while it was
// still indeterminate (for example when the initial taosArrayInit failed).
static int32_t buildTagListForExchangeBasicParam(SExchangeOperatorBasicParam* pBasic, SArray* pTagList) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  STagVal  tmpTag = {0};

  pBasic->tagList = taosArrayInit(1, sizeof(STagVal));
  QUERY_CHECK_NULL(pBasic->tagList, code, lino, _return, terrno)

  for (int32_t i = 0; i < taosArrayGetSize(pTagList); ++i) {
    STagVal* pSrcTag = (STagVal*)taosArrayGet(pTagList, i);
    QUERY_CHECK_NULL(pSrcTag, code, lino, _return, terrno)
    tmpTag.type = pSrcTag->type;
    tmpTag.cid = pSrcTag->cid;
    if (IS_VAR_DATA_TYPE(pSrcTag->type)) {
      tmpTag.nData = pSrcTag->nData;
      tmpTag.pData = taosMemoryMalloc(tmpTag.nData);
      QUERY_CHECK_NULL(tmpTag.pData, code, lino, _return, terrno)
      memcpy(tmpTag.pData, pSrcTag->pData, tmpTag.nData);
    } else {
      tmpTag.i64 = pSrcTag->i64;
    }

    QUERY_CHECK_NULL(taosArrayPush(pBasic->tagList, &tmpTag), code, lino, _return, terrno)
    // Ownership of any payload moved into the array; forget it locally.
    tmpTag = (STagVal){0};
  }

  return code;
_return:
  if (pBasic->tagList) {
    taosArrayDestroyEx(pBasic->tagList, destroyTagVal);
    pBasic->tagList = NULL;
  }
  // Only a variable-length tag that was not stored owns a heap payload.
  if (IS_VAR_DATA_TYPE(tmpTag.type) && tmpTag.pData) {
    taosMemoryFree(tmpTag.pData);
  }
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
  return code;
}
745

UNCOV
746
// Deep-copies `pOrgTbInfoArray` (array of SOrgTbInfo) into
// pBasic->batchOrgTbInfo; each entry's column map is duplicated.
//
// On failure pBasic->batchOrgTbInfo is destroyed and reset to NULL, a column
// map that was duplicated but not yet stored is released, and the error code
// is returned.
//
// `batchInfo` is zero-initialized: the previous version tested
// batchInfo.colMap in the cleanup path while it was still indeterminate (for
// example when the initial taosArrayInit failed).
static int32_t buildBatchOrgTbInfoForExchangeBasicParam(SExchangeOperatorBasicParam* pBasic, SArray* pOrgTbInfoArray) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  SOrgTbInfo  batchInfo = {0};

  pBasic->batchOrgTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
  QUERY_CHECK_NULL(pBasic->batchOrgTbInfo, code, lino, _return, terrno)

  for (int32_t i = 0; i < taosArrayGetSize(pOrgTbInfoArray); ++i) {
    SOrgTbInfo* pSrc = (SOrgTbInfo*)taosArrayGet(pOrgTbInfoArray, i);
    QUERY_CHECK_NULL(pSrc, code, lino, _return, terrno)
    batchInfo.vgId = pSrc->vgId;
    tstrncpy(batchInfo.tbName, pSrc->tbName, TSDB_TABLE_FNAME_LEN);
    batchInfo.colMap = taosArrayDup(pSrc->colMap, NULL);
    QUERY_CHECK_NULL(batchInfo.colMap, code, lino, _return, terrno)
    QUERY_CHECK_NULL(taosArrayPush(pBasic->batchOrgTbInfo, &batchInfo), code, lino, _return, terrno)
    // The stored copy now owns colMap; forget it locally.
    batchInfo = (SOrgTbInfo){0};
  }

  return code;
_return:
  qError("%s failed at line: %d, code: %d", __func__, lino, code);
  if (pBasic->batchOrgTbInfo) {
    taosArrayDestroyEx(pBasic->batchOrgTbInfo, destroySOrgTbInfo);
    pBasic->batchOrgTbInfo = NULL;
  }
  if (batchInfo.colMap) {
    taosArrayDestroy(batchInfo.colMap);
    batchInfo.colMap = NULL;
  }
  return code;
}
778

779
/**
 * Build a multi-param exchange operator parameter with one basic param per
 * entry of pBatchMaps.
 *
 * pBatchMaps is iterated with taosHashIterate(); each value is read as an
 * SArray* of SOrgTbInfo and each key as an int32_t vgId. For every entry a
 * SExchangeOperatorBasicParam is filled in (tag list copied from pTagList when
 * given, batchOrgTbInfo copied from the entry, an empty uidList) and stored in
 * pExc->pBatchs under the vgId.
 *
 * @param ppRes         receives the new parameter on success (untouched on failure)
 * @param downstreamIdx stored into the parameter's downstreamIdx
 * @param pTagList      optional tag list; NULL leaves basic.tagList NULL
 * @param groupid       stored into every basic param
 * @param pBatchMaps    hash of vgId -> SArray* of SOrgTbInfo
 * @return TSDB_CODE_SUCCESS or the first failing code.
 */
static int32_t buildExchangeOperatorParamForVSAgg(SOperatorParam** ppRes, int32_t downstreamIdx, SArray* pTagList, uint64_t groupid,  SHashObj* pBatchMaps) {
  int32_t                       code = TSDB_CODE_SUCCESS;
  int32_t                       lino = 0;
  SOperatorParam*               pParam = NULL;
  SExchangeOperatorBatchParam*  pExc = NULL;
  SExchangeOperatorBasicParam   basic = {0};
  // Declared up front (and NULL) so the error path can tell whether an
  // iteration is still in flight and must be cancelled.
  void*                         pIter = NULL;
  size_t                        keyLen = 0;

  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)

  // NOTE(review): the buffer is sized as SExchangeOperatorParam but accessed as
  // SExchangeOperatorBatchParam below - confirm the former is at least as large.
  pParam->value = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  QUERY_CHECK_NULL(pParam->value, code, lino, _return, terrno)

  pExc = pParam->value;
  pExc->multiParams = true;

  pExc->pBatchs = tSimpleHashInit(taosHashGetSize(pBatchMaps), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  QUERY_CHECK_NULL(pExc->pBatchs, code, lino, _return, terrno)

  pIter = taosHashIterate(pBatchMaps, NULL);
  while (pIter != NULL) {
    SArray*          pOrgTbInfoArray = *(SArray**)pIter;
    int32_t*         vgId = (int32_t*)taosHashGetKey(pIter, &keyLen);

    basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
    basic.vgId = *vgId;
    basic.tableSeq = false;
    basic.type = EX_SRC_TYPE_VSTB_AGG_SCAN;
    basic.isNewDeployed = false;
    basic.isNewParam = true;
    basic.groupid = groupid;
    basic.window.skey = INT64_MAX;
    basic.window.ekey = INT64_MIN;

    if (pTagList) {
      code = buildTagListForExchangeBasicParam(&basic, pTagList);
      QUERY_CHECK_CODE(code, lino, _return);
    } else {
      basic.tagList = NULL;
    }

    code = buildBatchOrgTbInfoForExchangeBasicParam(&basic, pOrgTbInfoArray);
    QUERY_CHECK_CODE(code, lino, _return);

    basic.orgTbInfo = NULL;

    basic.uidList = taosArrayInit(1, sizeof(int64_t));
    QUERY_CHECK_NULL(basic.uidList, code, lino, _return, terrno)

    code = tSimpleHashPut(pExc->pBatchs, vgId, sizeof(*vgId), &basic, sizeof(basic));
    QUERY_CHECK_CODE(code, lino, _return);

    // The stored copy now carries basic's arrays; reset the scratch value so
    // the error path does not release them again.
    basic = (SExchangeOperatorBasicParam){0};
    pIter = taosHashIterate(pBatchMaps, pIter);
  }

  pParam->pChildren = NULL;
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  pParam->downstreamIdx = downstreamIdx;
  pParam->reUse = false;

  *ppRes = pParam;
  return code;

_return:
  qError("failed to build exchange operator param for vscan, code:%d", code);
  // Leaving the loop early: the iterator must be cancelled explicitly.
  if (pIter) {
    taosHashCancelIterate(pBatchMaps, pIter);
  }
  freeOperatorParam(pParam, OP_GET_PARAM);
  freeExchangeGetBasicOperatorParam(&basic);
  return code;
}
850

851
/**
 * Create the merge-join operator parameter and attach the two given child
 * parameters to it.
 *
 * Each child pointer is pushed into (*ppRes)->pChildren and the caller's
 * pointer is then cleared. The join value records initParam in
 * initDownstream.
 *
 * @param ppRes     receives the new parameter; set to NULL on any failure
 *                  after the first allocation
 * @param initParam stored into SSortMergeJoinOperatorParam::initDownstream
 * @param ppChild0  first child, cleared once stored
 * @param ppChild1  second child, cleared once stored
 * @return TSDB_CODE_SUCCESS, or the terrno of the failing allocation/push.
 */
static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initParam, SOperatorParam** ppChild0, SOperatorParam** ppChild1) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  SSortMergeJoinOperatorParam* pJoinValue = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }

  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
  if (NULL == (*ppRes)->pChildren) {
    code = terrno;
    goto _failed;
  }

  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild0)) {
    code = terrno;
    goto _failed;
  }
  *ppChild0 = NULL;

  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild1)) {
    code = terrno;
    goto _failed;
  }
  *ppChild1 = NULL;

  pJoinValue = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam));
  if (NULL == pJoinValue) {
    code = terrno;
    goto _failed;
  }
  pJoinValue->initDownstream = initParam;

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
  (*ppRes)->value = pJoinValue;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;

_failed:
  // terrno was captured before this call so cleanup cannot overwrite it.
  freeOperatorParam(*ppRes, OP_GET_PARAM);
  *ppRes = NULL;
  return code;
}
896

897
/**
 * Create the merge-join notify parameter and attach the non-NULL children.
 *
 * On every failure path the children that have not been stored in
 * (*ppRes)->pChildren are released here with freeOperatorParam().
 *
 * @param ppRes   receives the new parameter; NULL on failure
 * @param pChild0 optional first child (skipped when NULL)
 * @param pChild1 optional second child (skipped when NULL)
 * @return TSDB_CODE_SUCCESS, or the terrno of the failing allocation/push.
 */
static int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) {
  int32_t code = TSDB_CODE_SUCCESS;
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    code = terrno;
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    return code;
  }
  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
  // Check the array that was just created; *ppRes itself is known non-NULL here.
  if (NULL == (*ppRes)->pChildren) {
    code = terrno;
    taosMemoryFreeClear(*ppRes);
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    return code;
  }
  if (pChild0 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    *ppRes = NULL;
    return code;
  }
  if (pChild1 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) {
    code = terrno;
    // pChild0, if any, is already inside pChildren at this point and is left
    // to freeOperatorParam(*ppRes) - presumably it releases the children; verify.
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    *ppRes = NULL;
    return code;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
  (*ppRes)->value = NULL;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;
}
936

937
/**
 * Build a table-scan operator parameter from a vgroup hash that must hold
 * exactly one entry.
 *
 * The entry's value is read as an SArray* uid list, handed to
 * buildTableScanOperatorParam(), then destroyed and cleared in the hash.
 *
 * @param ppRes         receives the table-scan parameter
 * @param downstreamIdx not used by this function
 * @param pVg           hash whose values are SArray* uid lists
 * @return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the hash size is not 1,
 *         the builder's error code on failure, otherwise TSDB_CODE_SUCCESS.
 */
static int32_t buildBatchTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
  int32_t vgCount = tSimpleHashGetSize(pVg);
  if (1 != vgCount) {
    qError("Invalid vgroup num %d to build table scan operator param", vgCount);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  int32_t cursor = 0;
  void*   pEntry = tSimpleHashIterate(pVg, NULL, &cursor);
  for (; NULL != pEntry; pEntry = tSimpleHashIterate(pVg, pEntry, &cursor)) {
    SArray** ppUidList = (SArray**)pEntry;

    int32_t code = buildTableScanOperatorParam(ppRes, *ppUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false);
    if (code) {
      return code;
    }

    // The list is released here; clear the slot so the hash no longer points at it.
    taosArrayDestroy(*ppUidList);
    *ppUidList = NULL;
  }

  return TSDB_CODE_SUCCESS;
}
960

961

UNCOV
962
/**
 * Build a table-scan operator parameter for a single table uid.
 *
 * A temporary one-element uid list is created, passed to
 * buildTableScanOperatorParam(), and destroyed again before returning on
 * every path.
 *
 * @param ppRes         receives the table-scan parameter
 * @param downstreamIdx not used by this function
 * @param pVgId         not used by this function
 * @param pUid          uid to place in the list
 * @return TSDB_CODE_SUCCESS, or the failing code.
 */
static int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) {
  int32_t code = TSDB_CODE_SUCCESS;
  SArray* pUidList = taosArrayInit(1, sizeof(int64_t));
  if (NULL == pUidList) {
    return terrno;
  }
  if (NULL == taosArrayPush(pUidList, pUid)) {
    // Capture terrno first, then release the list so it is not leaked.
    code = terrno;
    taosArrayDestroy(pUidList);
    return code;
  }

  code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, true);
  taosArrayDestroy(pUidList);
  if (code) {
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
979

980
/**
 * Build the merge-join parameter tree for the table pair at the current
 * read index of pPrev->pListHead.
 *
 * Steps: refresh the post-join table info, build the two source parameters
 * (batch variants when basic.batchFetch is set and leftHash is still present,
 * single-table variants otherwise), wrap each in a group-cache parameter and
 * finally combine both under a merge-join parameter written to *ppParam.
 *
 * @param pInfo   operator info holding the stbJoin settings
 * @param pPrev   previous-join context (table list and vgroup hashes)
 * @param pPost   post-join context (cache flags)
 * @param ppParam receives the merge-join parameter
 * @return TSDB_CODE_SUCCESS or the first failing code; on failure any
 *         parameter still held locally is released.
 */
static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, SOperatorParam** ppParam) {
  SStbJoinTableList* pList = pPrev->pListHead;
  int64_t            rowIdx = pList->readIdx;
  int32_t*           pLeftVg = &pList->pLeftVg[rowIdx];
  int64_t*           pLeftUid = &pList->pLeftUid[rowIdx];
  int32_t*           pRightVg = &pList->pRightVg[rowIdx];
  int64_t*           pRightUid = &pList->pRightUid[rowIdx];
  SOperatorParam*    pSrc0 = NULL;
  SOperatorParam*    pSrc1 = NULL;
  SOperatorParam*    pCache0 = NULL;
  SOperatorParam*    pCache1 = NULL;
  int32_t            code = TSDB_CODE_SUCCESS;

  qDebug("start %" PRId64 ":%" PRId64 "th stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64, 
      rowIdx, pPrev->tableNum, *pLeftVg, *pLeftUid, *pRightVg, *pRightUid);

  QRY_ERR_RET(updatePostJoinCurrTableInfo(&pInfo->stbJoin));

  if (!pInfo->stbJoin.basic.batchFetch) {
    if (pInfo->stbJoin.basic.srcScan[0]) {
      code = buildSingleTableScanOperatorParam(&pSrc0, 0, pLeftVg, pLeftUid);
    } else {
      code = buildExchangeOperatorParam(&pSrc0, 0, pLeftVg, pLeftUid);
    }
    if (TSDB_CODE_SUCCESS == code) {
      if (pInfo->stbJoin.basic.srcScan[1]) {
        code = buildSingleTableScanOperatorParam(&pSrc1, 1, pRightVg, pRightUid);
      } else {
        code = buildExchangeOperatorParam(&pSrc1, 1, pRightVg, pRightUid);
      }
    }
  } else if (pPrev->leftHash) {
    if (pInfo->stbJoin.basic.srcScan[0]) {
      code = buildBatchTableScanOperatorParam(&pSrc0, 0, pPrev->leftHash);
    } else {
      code = buildBatchExchangeOperatorParam(&pSrc0, 0, pPrev->leftHash);
    }
    if (TSDB_CODE_SUCCESS == code) {
      if (pInfo->stbJoin.basic.srcScan[1]) {
        code = buildBatchTableScanOperatorParam(&pSrc1, 1, pPrev->rightHash);
      } else {
        code = buildBatchExchangeOperatorParam(&pSrc1, 1, pPrev->rightHash);
      }
    }
    if (TSDB_CODE_SUCCESS == code) {
      // Both batch params were built; the vgroup hashes are dropped so later
      // calls skip this branch.
      tSimpleHashCleanup(pPrev->leftHash);
      tSimpleHashCleanup(pPrev->rightHash);
      pPrev->leftHash = NULL;
      pPrev->rightHash = NULL;
    }
  }

  // Sampled before pSrc0 is handed on and cleared below.
  bool initParam = (NULL != pSrc0);

  if (TSDB_CODE_SUCCESS == code) {
    code = buildGroupCacheOperatorParam(&pCache0, 0, *pLeftVg, *pLeftUid, pPost->leftNeedCache, pSrc0);
    // Cleared whatever the result, so the cleanup below never touches it again.
    pSrc0 = NULL;
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = buildGroupCacheOperatorParam(&pCache1, 1, *pRightVg, *pRightUid, pPost->rightNeedCache, pSrc1);
    pSrc1 = NULL;
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = buildMergeJoinOperatorParam(ppParam, initParam, &pCache0, &pCache1);
  }

  if (TSDB_CODE_SUCCESS == code) {
    return code;
  }

  if (pSrc0) {
    freeOperatorParam(pSrc0, OP_GET_PARAM);
  }
  if (pSrc1) {
    freeOperatorParam(pSrc1, OP_GET_PARAM);
  }
  if (pCache0) {
    freeOperatorParam(pCache0, OP_GET_PARAM);
  }
  if (pCache1) {
    freeOperatorParam(pCache1, OP_GET_PARAM);
  }
  if (*ppParam) {
    freeOperatorParam(*ppParam, OP_GET_PARAM);
    *ppParam = NULL;
  }

  return code;
}
1050

1051
/**
 * Start a join for the current table pair and fetch its first result block.
 *
 * Builds the join parameter, calls downstream[1]'s getNextExtFn with it and,
 * when a block comes back with code 0, validates the block and updates the
 * post-join statistics. A build failure or an invalid block stores the code
 * in the task info and long-jumps out via T_LONG_JMP.
 *
 * @param pOperator the dynamic query control operator
 * @param ppRes     receives the block returned by the downstream operator
 */
static void seqJoinLaunchNewRetrieveImpl(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SStbJoinDynCtrlInfo*       pJoin = (SStbJoinDynCtrlInfo*)&pCtrl->stbJoin;
  SOperatorParam*            pJoinParam = NULL;

  int32_t code = buildSeqStbJoinOperatorParam(pCtrl, &pJoin->ctx.prev, &pJoin->ctx.post, &pJoinParam);
  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  qDebug("%s dynamic post task begin", GET_TASKID(pOperator->pTaskInfo));
  code = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pJoinParam, ppRes);

  if (NULL == *ppRes || 0 != code) {
    qDebug("%s Empty join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
    return;
  }

  code = blockDataCheck(*ppRes);
  if (code) {
    qError("Invalid block data, blockDataCheck failed, error:%s", tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  pJoin->ctx.post.isStarted = true;
  pJoin->execInfo.postBlkNum++;
  pJoin->execInfo.postBlkRows += (*ppRes)->info.rows;
  qDebug("%s join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
}
1,593,634✔
1080

1081

UNCOV
1082
/**
 * Notify downstream[1] that the cache of the current left or right table has
 * ended.
 *
 * A group-cache notify parameter is built for the selected table, wrapped in
 * a merge-join notify parameter and passed to optrDefaultNotifyFn().
 *
 * @param pOperator the dynamic query control operator
 * @param pPost     post-join context holding the current vgId/uid per side
 * @param leftTable true selects the left table (downstream id 0), false the
 *                  right table (downstream id 1)
 * @return TSDB_CODE_SUCCESS or the first failing code.
 */
static int32_t notifySeqJoinTableCacheEnd(SOperatorInfo* pOperator, SStbJoinPostJoinCtx* pPost, bool leftTable) {
  SOperatorParam* pCacheNotify = NULL;
  SOperatorParam* pJoinNotify = NULL;
  int32_t         downstreamId = 0;
  int32_t         vgId = 0;
  int64_t         uid = 0;

  if (leftTable) {
    downstreamId = 0;
    vgId = pPost->leftVgId;
    uid = pPost->leftCurrUid;
  } else {
    downstreamId = 1;
    vgId = pPost->rightVgId;
    uid = pPost->rightCurrUid;
  }

  qDebug("notify table %" PRIu64 " in vgId %d downstreamId %d cache end", uid, vgId, downstreamId);

  int32_t code = buildGroupCacheNotifyOperatorParam(&pCacheNotify, downstreamId, vgId, uid);
  if (TSDB_CODE_SUCCESS == code) {
    code = buildMergeJoinNotifyOperatorParam(&pJoinNotify, pCacheNotify, NULL);
  }
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  return optrDefaultNotifyFn(pOperator->pDownstream[1], pJoinNotify);
}
1102

1103
/**
 * Bookkeeping once the join of the current table pair has produced its last
 * block.
 *
 * Always clears pPost->isStarted. Nothing else is done in batch-fetch mode.
 * Otherwise:
 *  - when leftNeedCache is set, the counter stored for leftCurrUid in
 *    leftCache is decremented; on reaching 0 the entry is removed and a
 *    cache-end notification is sent for the left table;
 *  - when rightNeedCache is NOT set and rightCurrUid is present in
 *    rightCache, the entry is removed and a cache-end notification is sent
 *    for the right table.
 *
 * @param pOperator the dynamic query control operator
 * @param pStbJoin  stable-join control info
 * @return TSDB_CODE_SUCCESS or the first failing code.
 */
static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCtrlInfo* pStbJoin) {
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
  int32_t              code = 0;

  pPost->isStarted = false;

  if (pStbJoin->basic.batchFetch) {
    return TSDB_CODE_SUCCESS;
  }

  if (pPost->leftNeedCache) {
    uint32_t* pRemaining = tSimpleHashGet(pPrev->leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
    if (NULL != pRemaining) {
      --(*pRemaining);
      if (0 == *pRemaining) {
        code = tSimpleHashRemove(pPrev->leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
        if (code) {
          qError("tSimpleHashRemove leftCurrUid %" PRId64 " from leftCache failed, error:%s", pPost->leftCurrUid, tstrerror(code));
          QRY_ERR_RET(code);
        }
        QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, true));
      }
    }
  }

  if (!pPost->rightNeedCache) {
    void* pCached = tSimpleHashGet(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
    if (NULL != pCached) {
      code = tSimpleHashRemove(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
      if (code) {
        qError("tSimpleHashRemove rightCurrUid %" PRId64 " from rightCache failed, error:%s", pPost->rightCurrUid, tstrerror(code));
        QRY_ERR_RET(code);
      }
      QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, false));
    }
  }

  return TSDB_CODE_SUCCESS;
}
1139

1140

1141
/**
 * Continue pulling blocks from an already started join of the current table
 * pair.
 *
 * Does nothing when no join is in progress (pPost->isStarted is false).
 * Otherwise the next block is fetched from downstream 1: a non-NULL block
 * updates the post-join statistics, a NULL block runs the end-of-retrieve
 * handling and advances the read index of the list head.
 *
 * @param pOperator the dynamic query control operator
 * @param ppRes     receives the fetched block (NULL when the join is drained)
 * @return TSDB_CODE_SUCCESS or the code from handleSeqJoinCurrRetrieveEnd().
 */
static FORCE_INLINE int32_t seqJoinContinueCurrRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;

  if (!pCtrl->stbJoin.ctx.post.isStarted) {
    return TSDB_CODE_SUCCESS;
  }

  qDebug("%s dynQueryCtrl continue to retrieve block from post op", GET_TASKID(pOperator->pTaskInfo));

  *ppRes = getNextBlockFromDownstream(pOperator, 1);
  if (NULL != *ppRes) {
    pCtrl->stbJoin.execInfo.postBlkNum++;
    pCtrl->stbJoin.execInfo.postBlkRows += (*ppRes)->info.rows;
    return TSDB_CODE_SUCCESS;
  }

  QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, &pCtrl->stbJoin));
  pCtrl->stbJoin.ctx.prev.pListHead->readIdx++;

  return TSDB_CODE_SUCCESS;
}
1163

1164
/**
 * Append pVal to the array stored under pKey in pHash, creating the array
 * (initial capacity 10, element size valSize) when the key is not present.
 *
 * @param pHash   hash whose values are SArray* pointers
 * @param pKey    key bytes
 * @param keySize key length in bytes
 * @param pVal    element to append
 * @param valSize element size used when a new array is created
 * @return TSDB_CODE_SUCCESS, the terrno of a failed array operation, or the
 *         code returned by tSimpleHashPut().
 */
static FORCE_INLINE int32_t addToJoinVgroupHash(SSHashObj* pHash, void* pKey, int32_t keySize, void* pVal, int32_t valSize) {
  int32_t  code = TSDB_CODE_SUCCESS;
  SArray** ppArray = tSimpleHashGet(pHash, pKey, keySize);
  if (NULL == ppArray) {
    SArray* pArray = taosArrayInit(10, valSize);
    if (NULL == pArray) {
      return terrno;
    }
    if (NULL == taosArrayPush(pArray, pVal)) {
      // Read terrno before cleanup so the destroy call cannot disturb it.
      code = terrno;
      taosArrayDestroy(pArray);
      return code;
    }
    // Propagate the code tSimpleHashPut actually returned instead of terrno,
    // which may be stale (or 0) on this path - the same way addToJoinTableHash
    // handles tSimpleHashPut.
    code = tSimpleHashPut(pHash, pKey, keySize, &pArray, POINTER_BYTES);
    if (code) {
      taosArrayDestroy(pArray);
      return code;
    }
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == taosArrayPush(*ppArray, pVal)) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
1188

1189
/**
 * Count one more occurrence of pKey in pHash and keep pOnceHash in step.
 *
 * - Key absent: it is stored in pHash with counter 1 and also added to
 *   pOnceHash (with an empty value).
 * - Counter 0: left unchanged.
 * - Counter UINT32_MAX: reset to 0.
 * - Any other counter: when it is exactly 1 the key is first removed from
 *   pOnceHash, then the counter is incremented.
 *
 * @param pHash     hash of key -> uint32_t counter
 * @param pOnceHash hash holding the keys whose counter is currently 1
 * @param pKey      key bytes
 * @param keySize   key length in bytes
 * @return TSDB_CODE_SUCCESS or the failing hash operation's code.
 */
static FORCE_INLINE int32_t addToJoinTableHash(SSHashObj* pHash, SSHashObj* pOnceHash, void* pKey, int32_t keySize) {
  int32_t   code = TSDB_CODE_SUCCESS;
  uint32_t* pCount = tSimpleHashGet(pHash, pKey, keySize);

  if (NULL == pCount) {
    uint32_t first = 1;
    code = tSimpleHashPut(pHash, pKey, keySize, &first, sizeof(first));
    if (code) {
      return code;
    }
    code = tSimpleHashPut(pOnceHash, pKey, keySize, NULL, 0);
    if (code) {
      return code;
    }
    return TSDB_CODE_SUCCESS;
  }

  if (0 == *pCount) {
    return TSDB_CODE_SUCCESS;
  }

  if (UINT32_MAX == *pCount) {
    *pCount = 0;
    return TSDB_CODE_SUCCESS;
  }

  if (1 == *pCount) {
    code = tSimpleHashRemove(pOnceHash, pKey, keySize);
    if (code) {
      qError("tSimpleHashRemove failed in addToJoinTableHash, error:%s", tstrerror(code));
      QRY_ERR_RET(code);
    }
  }
  (*pCount)++;

  return TSDB_CODE_SUCCESS;
}
1225

1226

1227
/**
 * Release one table-list node: its four vg/uid buffers and the node itself.
 * A NULL node is ignored.
 *
 * @param pList node to free (may be NULL)
 */
static void freeStbJoinTableList(SStbJoinTableList* pList) {
  if (NULL != pList) {
    taosMemoryFree(pList->pLeftVg);
    taosMemoryFree(pList->pLeftUid);
    taosMemoryFree(pList->pRightVg);
    taosMemoryFree(pList->pRightUid);
    taosMemoryFree(pList);
  }
}
1237

1238
/**
 * Append a new node holding copies of the given vg/uid columns to the tail of
 * the table list in pCtx.
 *
 * @param pCtx      previous-join context owning pListHead / pListTail
 * @param rows      number of entries in each of the four input arrays
 * @param pLeftVg   left vgroup ids
 * @param pLeftUid  left table uids
 * @param pRightVg  right vgroup ids
 * @param pRightUid right table uids
 * @return TSDB_CODE_SUCCESS, or terrno when an allocation fails (the partly
 *         built node is freed in that case).
 */
static int32_t appendStbJoinTableList(SStbJoinPrevJoinCtx* pCtx, int64_t rows, int32_t* pLeftVg, int64_t* pLeftUid, int32_t* pRightVg, int64_t* pRightUid) {
  int32_t            code = TSDB_CODE_SUCCESS;
  SStbJoinTableList* pNode = taosMemoryCalloc(1, sizeof(SStbJoinTableList));
  if (NULL == pNode) {
    return terrno;
  }

  pNode->pLeftVg = taosMemoryMalloc(rows * sizeof(*pLeftVg));
  if (NULL == pNode->pLeftVg) {
    goto _alloc_failed;
  }
  pNode->pLeftUid = taosMemoryMalloc(rows * sizeof(*pLeftUid));
  if (NULL == pNode->pLeftUid) {
    goto _alloc_failed;
  }
  pNode->pRightVg = taosMemoryMalloc(rows * sizeof(*pRightVg));
  if (NULL == pNode->pRightVg) {
    goto _alloc_failed;
  }
  pNode->pRightUid = taosMemoryMalloc(rows * sizeof(*pRightUid));
  if (NULL == pNode->pRightUid) {
    goto _alloc_failed;
  }

  TAOS_MEMCPY(pNode->pLeftVg, pLeftVg, rows * sizeof(*pLeftVg));
  TAOS_MEMCPY(pNode->pLeftUid, pLeftUid, rows * sizeof(*pLeftUid));
  TAOS_MEMCPY(pNode->pRightVg, pRightVg, rows * sizeof(*pRightVg));
  TAOS_MEMCPY(pNode->pRightUid, pRightUid, rows * sizeof(*pRightUid));

  pNode->readIdx = 0;
  pNode->uidNum = rows;
  pNode->pNext = NULL;

  if (NULL == pCtx->pListTail) {
    // Empty list: the new node is both head and tail.
    pCtx->pListHead = pNode;
  } else {
    pCtx->pListTail->pNext = pNode;
  }
  pCtx->pListTail = pNode;

  return TSDB_CODE_SUCCESS;

_alloc_failed:
  // Save terrno before freeing; the node was calloc'ed, so buffers that were
  // never allocated are still NULL.
  code = terrno;
  freeStbJoinTableList(pNode);
  return code;
}
1288

1289
/**
 * Feed one block of (vgId, uid) pairs into the join bookkeeping.
 *
 * The four columns are located through basic.vgSlot / basic.uidSlot. In
 * batch-fetch mode every row adds its left and right uid to the per-vgroup
 * hashes; otherwise every left uid is counted in leftCache / onceTable. On
 * success the raw column data is appended to the table list and tableNum is
 * increased by the block's row count. Any failure is stored in the task info
 * and raised through T_LONG_JMP.
 *
 * @param pOperator the dynamic query control operator
 * @param pBlock    block read from downstream 0
 */
static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SStbJoinDynCtrlInfo*       pJoin = (SStbJoinDynCtrlInfo*)&pCtrl->stbJoin;
  SColumnInfoData*           pLeftVgCol = NULL;
  SColumnInfoData*           pRightVgCol = NULL;
  SColumnInfoData*           pLeftUidCol = NULL;
  SColumnInfoData*           pRightUidCol = NULL;

  pLeftVgCol = taosArrayGet(pBlock->pDataBlock, pJoin->basic.vgSlot[0]);
  if (NULL == pLeftVgCol) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  pRightVgCol = taosArrayGet(pBlock->pDataBlock, pJoin->basic.vgSlot[1]);
  if (NULL == pRightVgCol) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  pLeftUidCol = taosArrayGet(pBlock->pDataBlock, pJoin->basic.uidSlot[0]);
  if (NULL == pLeftUidCol) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  pRightUidCol = taosArrayGet(pBlock->pDataBlock, pJoin->basic.uidSlot[1]);
  if (NULL == pRightUidCol) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  if (pJoin->basic.batchFetch) {
    for (int32_t row = 0; row < pBlock->info.rows; ++row) {
      int32_t* pLVg = (int32_t*)(pLeftVgCol->pData + pLeftVgCol->info.bytes * row);
      int64_t* pLUid = (int64_t*)(pLeftUidCol->pData + pLeftUidCol->info.bytes * row);
      int32_t* pRVg = (int32_t*)(pRightVgCol->pData + pRightVgCol->info.bytes * row);
      int64_t* pRUid = (int64_t*)(pRightUidCol->pData + pRightUidCol->info.bytes * row);

      code = addToJoinVgroupHash(pJoin->ctx.prev.leftHash, pLVg, sizeof(*pLVg), pLUid, sizeof(*pLUid));
      if (TSDB_CODE_SUCCESS == code) {
        code = addToJoinVgroupHash(pJoin->ctx.prev.rightHash, pRVg, sizeof(*pRVg), pRUid, sizeof(*pRUid));
      }
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
    }
  } else {
    for (int32_t row = 0; row < pBlock->info.rows; ++row) {
      int64_t* pLUid = (int64_t*)(pLeftUidCol->pData + pLeftUidCol->info.bytes * row);

      code = addToJoinTableHash(pJoin->ctx.prev.leftCache, pJoin->ctx.prev.onceTable, pLUid, sizeof(*pLUid));
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
    }
  }

  if (TSDB_CODE_SUCCESS != code) {
    goto _return;
  }

  code = appendStbJoinTableList(&pJoin->ctx.prev, pBlock->info.rows, (int32_t*)pLeftVgCol->pData,
                                (int64_t*)pLeftUidCol->pData, (int32_t*)pRightVgCol->pData,
                                (int64_t*)pRightUidCol->pData);
  if (TSDB_CODE_SUCCESS == code) {
    pJoin->ctx.prev.tableNum += pBlock->info.rows;
  }

_return:

  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }
}
85,401✔
1351

1352

1353
/**
 * Trim leftCache after all input blocks have been hashed (non-batch mode
 * only; batch-fetch mode returns immediately).
 *
 * When leftCache and onceTable have the same size, leftCache is simply
 * cleared. Otherwise every key found in onceTable is removed from leftCache
 * and the remaining size is recorded in execInfo.leftCacheNum. A failed
 * removal is logged and iteration continues.
 *
 * @param pOperator the dynamic query control operator
 */
static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SStbJoinDynCtrlInfo*       pJoin = (SStbJoinDynCtrlInfo*)&pCtrl->stbJoin;

  if (pJoin->basic.batchFetch) {
    return;
  }

  if (tSimpleHashGetSize(pJoin->ctx.prev.leftCache) == tSimpleHashGetSize(pJoin->ctx.prev.onceTable)) {
    tSimpleHashClear(pJoin->ctx.prev.leftCache);
    return;
  }

  int32_t   cursor = 0;
  int32_t   code = 0;
  uint64_t* pOnceUid = tSimpleHashIterate(pJoin->ctx.prev.onceTable, NULL, &cursor);
  for (; NULL != pOnceUid; pOnceUid = tSimpleHashIterate(pJoin->ctx.prev.onceTable, pOnceUid, &cursor)) {
    code = tSimpleHashRemove(pJoin->ctx.prev.leftCache, pOnceUid, sizeof(*pOnceUid));
    if (code) {
      qError("tSimpleHashRemove failed in postProcessStbJoinTableHash, error:%s", tstrerror(code));
    }
  }

  pJoin->execInfo.leftCacheNum = tSimpleHashGetSize(pJoin->ctx.prev.leftCache);
  qDebug("more than 1 ref build table num %" PRId64, (int64_t)tSimpleHashGetSize(pJoin->ctx.prev.leftCache));

/*
  // debug only
  iter = 0;
  uint32_t* num = NULL;
  while (NULL != (num = tSimpleHashIterate(pStbJoin->ctx.prev.leftCache, num, &iter))) {
    A S S E R T(*num > 1);
  }
*/
}
1388

1389
/**
 * Drain downstream 0 and build the join table list from every block it
 * returns, then post-process the hashes and mark the build as done.
 *
 * @param pOperator the dynamic query control operator
 */
static void buildStbJoinTableList(SOperatorInfo* pOperator) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SStbJoinDynCtrlInfo*       pJoin = (SStbJoinDynCtrlInfo*)&pCtrl->stbJoin;

  for (SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0); NULL != pBlock;
       pBlock = getNextBlockFromDownstream(pOperator, 0)) {
    pJoin->execInfo.prevBlkNum++;
    pJoin->execInfo.prevBlkRows += pBlock->info.rows;

    doBuildStbJoinTableHash(pOperator, pBlock);
  }

  postProcessStbJoinTableHash(pOperator);

  pJoin->ctx.prev.joinBuild = true;
}
518,077✔
1409

1410
/**
 * Walk the pending table list and launch joins until one yields a block.
 *
 * Exhausted head nodes (readIdx >= uidNum) are unlinked and freed. For the
 * current entry a new join is launched; if it returns a block the function
 * returns right away, otherwise the end-of-retrieve handling runs and the
 * head's read index advances. When the list is empty *ppRes is set to NULL
 * and the operator is marked completed.
 *
 * @param pOperator the dynamic query control operator
 * @param ppRes     receives the first block produced, or NULL
 * @return TSDB_CODE_SUCCESS or the code from handleSeqJoinCurrRetrieveEnd().
 */
static int32_t seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SStbJoinDynCtrlInfo*       pJoin = (SStbJoinDynCtrlInfo*)&pCtrl->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pJoin->ctx.prev;
  SStbJoinTableList*         pCur = pPrev->pListHead;

  while (NULL != pCur) {
    if (pCur->readIdx < pCur->uidNum) {
      seqJoinLaunchNewRetrieveImpl(pOperator, ppRes);
      if (*ppRes) {
        return TSDB_CODE_SUCCESS;
      }

      QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, pJoin));
      pPrev->pListHead->readIdx++;
    } else {
      // Node fully consumed: drop it and move on to the next one.
      pPrev->pListHead = pCur->pNext;
      freeStbJoinTableList(pCur);
      pCur = pPrev->pListHead;
    }
  }

  *ppRes = NULL;
  setOperatorCompleted(pOperator);

  return TSDB_CODE_SUCCESS;
}
1438

1439
/**
 * Shape a join result block to the operator's output descriptor.
 *
 * The block id is set to the descriptor's dataBlockId, and for every slot of
 * the descriptor beyond the block's current column count a new column is
 * created, given capacity for the block's rows, and appended.
 *
 * @param pStbJoin stable-join control info (provides pOutputDataBlockDesc)
 * @param pBlock   result block; NULL is accepted and returns success
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the
 *         join info/descriptor or a slot is missing, or the failing column
 *         helper's code.
 */
static int32_t seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) {
  if (NULL == pBlock) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == pStbJoin || NULL == pStbJoin->pOutputDataBlockDesc) {
    qError("seqStableJoinComposeRes: pBlock or pStbJoin is NULL");
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  pBlock->info.id.blockId = pStbJoin->pOutputDataBlockDesc->dataBlockId;
  if (!pBlock->pDataBlock) {
    return TSDB_CODE_SUCCESS;
  }

  for (int i = (int)pBlock->pDataBlock->size; i < pStbJoin->pOutputDataBlockDesc->pSlots->length; i++) {
    SSlotDescNode* pSlotDesc = (SSlotDescNode*)nodesListGetNode(pStbJoin->pOutputDataBlockDesc->pSlots, i);
    if (NULL == pSlotDesc) {
      qError("seqStableJoinComposeRes: pSlot is NULL, i:%d", i);
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }

    SColumnInfoData newCol = createColumnInfoData(pSlotDesc->dataType.type, pSlotDesc->dataType.bytes, pSlotDesc->slotId);

    int32_t code = colInfoDataEnsureCapacity(&newCol, pBlock->info.rows, true);
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }

    code = blockDataAppendColInfo(pBlock, &newCol);
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
1468

1469
/**
 * Produce the next result block of the sequential stable join.
 *
 * On the first call the table list is built from downstream 0; if that
 * yields no rows the operator completes immediately. Afterwards a join that
 * is already in progress is continued, and when it gives no block a new join
 * is launched for the next table pair. The open cost is recorded once, while
 * cost.openCost is still 0. On error the code is stored in the task info and
 * raised through T_LONG_JMP; otherwise the block is shaped by
 * seqStableJoinComposeRes().
 *
 * @param pOperator the dynamic query control operator
 * @param pRes      receives the result block (may end up NULL)
 * @return the code from seqStableJoinComposeRes(), or TSDB_CODE_SUCCESS when
 *         the operator was already done.
 */
int32_t seqStableJoin(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SStbJoinDynCtrlInfo*       pJoin = (SStbJoinDynCtrlInfo*)&pCtrl->stbJoin;

  QRY_PARAM_CHECK(pRes);
  if (OP_EXEC_DONE == pOperator->status) {
    return code;
  }

  int64_t startUs = 0;
  if (0 == pOperator->cost.openCost) {
    startUs = taosGetTimestampUs();
  }

  if (!pJoin->ctx.prev.joinBuild) {
    buildStbJoinTableList(pOperator);
    if (pJoin->execInfo.prevBlkRows <= 0) {
      setOperatorCompleted(pOperator);
      goto _return;
    }
  }

  QRY_ERR_JRET(seqJoinContinueCurrRetrieve(pOperator, pRes));
  if (NULL == *pRes) {
    QRY_ERR_JRET(seqJoinLaunchNewRetrieve(pOperator, pRes));
  }

_return:
  if (0 == pOperator->cost.openCost) {
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - startUs) / 1000.0;
  }

  if (code) {
    qError("%s failed since %s", __func__, tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  } else {
    code = seqStableJoinComposeRes(pJoin, *pRes);
  }
  return code;
}
1513

1514
/*
 * Allocate an operator param that asks a virtual table scan to process table `uid`
 * within the window stored in pInfo->vtbScan.window.
 *
 * pInfo  dynamic query control operator info (source of the scan window)
 * ppRes  receives the new param on success; set to NULL on failure
 * uid    uid stored in the scan param
 *
 * Returns TSDB_CODE_SUCCESS, or the terrno of the allocation that failed. Everything
 * allocated so far is released before an error is returned.
 */
static int32_t buildVtbScanOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, uint64_t uid) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SVTableScanOperatorParam* pScanParam = NULL;
  SOperatorParam*           pParam = NULL;

  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)

  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno)

  pScanParam = taosMemoryMalloc(sizeof(SVTableScanOperatorParam));
  QUERY_CHECK_NULL(pScanParam, code, lino, _return, terrno)
  pScanParam->pOpParamArray = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL(pScanParam->pOpParamArray, code, lino, _return, terrno)
  pScanParam->uid = uid;
  pScanParam->window = pInfo->vtbScan.window;

  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN;
  pParam->downstreamIdx = 0;
  pParam->value = pScanParam;
  pParam->reUse = false;

  *ppRes = pParam;
  return TSDB_CODE_SUCCESS;

_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pScanParam) {
    taosArrayDestroy(pScanParam->pOpParamArray);
    taosMemoryFreeClear(pScanParam);
  }
  if (pParam) {
    taosArrayDestroy(pParam->pChildren);
    taosMemoryFreeClear(pParam);
  }
  // the caller never sees a half-built param
  *ppRes = NULL;
  return code;
}
1549

1550
int32_t dynProcessUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
210,748✔
1551
  int32_t                    lino = 0;
210,748✔
1552
  SOperatorInfo*             operator=(SOperatorInfo*) param;
210,748✔
1553
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)operator->info;
210,748✔
1554

1555
  if (TSDB_CODE_SUCCESS != code) {
210,748✔
UNCOV
1556
    operator->pTaskInfo->code = rpcCvtErrCode(code);
×
UNCOV
1557
    if (operator->pTaskInfo->code != code) {
×
UNCOV
1558
      qError("load systable rsp received, error:%s, cvted error:%s", tstrerror(code),
×
1559
             tstrerror(operator->pTaskInfo->code));
1560
    } else {
UNCOV
1561
      qError("load systable rsp received, error:%s", tstrerror(code));
×
1562
    }
UNCOV
1563
    goto _return;
×
1564
  }
1565

1566
  pScanResInfo->vtbScan.pRsp = taosMemoryMalloc(sizeof(SUseDbRsp));
210,748✔
1567
  QUERY_CHECK_NULL(pScanResInfo->vtbScan.pRsp, code, lino, _return, terrno)
210,748✔
1568

1569
  code = tDeserializeSUseDbRsp(pMsg->pData, (int32_t)pMsg->len, pScanResInfo->vtbScan.pRsp);
210,748✔
1570
  QUERY_CHECK_CODE(code, lino, _return);
210,748✔
1571

1572
  taosMemoryFreeClear(pMsg->pData);
210,748✔
1573

1574
  code = tsem_post(&pScanResInfo->vtbScan.ready);
210,748✔
1575
  QUERY_CHECK_CODE(code, lino, _return);
210,748✔
1576

1577
  return code;
210,748✔
UNCOV
1578
_return:
×
UNCOV
1579
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
1580
  return code;
×
1581
}
1582

1583
static int32_t buildDbVgInfoMap(SOperatorInfo* pOperator, SMsgCb* pMsgCb, SName* name, SExecTaskInfo* pTaskInfo, SUseDbOutput* output) {
210,748✔
1584
  int32_t                    code = TSDB_CODE_SUCCESS;
210,748✔
1585
  int32_t                    lino = 0;
210,748✔
1586
  char*                      buf1 = NULL;
210,748✔
1587
  SUseDbReq*                 pReq = NULL;
210,748✔
1588
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)pOperator->info;
210,748✔
1589

1590
  pReq = taosMemoryMalloc(sizeof(SUseDbReq));
210,748✔
1591
  QUERY_CHECK_NULL(pReq, code, lino, _return, terrno)
210,748✔
1592
  code = tNameGetFullDbName(name, pReq->db);
210,748✔
1593
  QUERY_CHECK_CODE(code, lino, _return);
210,748✔
1594
  int32_t contLen = tSerializeSUseDbReq(NULL, 0, pReq);
210,748✔
1595
  buf1 = taosMemoryCalloc(1, contLen);
210,748✔
1596
  QUERY_CHECK_NULL(buf1, code, lino, _return, terrno)
210,748✔
1597
  int32_t tempRes = tSerializeSUseDbReq(buf1, contLen, pReq);
210,748✔
1598
  if (tempRes < 0) {
210,748✔
1599
    QUERY_CHECK_CODE(terrno, lino, _return);
×
1600
  }
1601

1602
  // send the fetch remote task result request
1603
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
210,748✔
1604
  QUERY_CHECK_NULL(pMsgSendInfo, code, lino, _return, terrno)
210,748✔
1605

1606
  pMsgSendInfo->param = pOperator;
210,748✔
1607
  pMsgSendInfo->msgInfo.pData = buf1;
210,748✔
1608
  pMsgSendInfo->msgInfo.len = contLen;
210,748✔
1609
  pMsgSendInfo->msgType = TDMT_MND_GET_DB_INFO;
210,748✔
1610
  pMsgSendInfo->fp = dynProcessUseDbRsp;
210,748✔
1611
  pMsgSendInfo->requestId = pTaskInfo->id.queryId;
210,748✔
1612

1613
  code = asyncSendMsgToServer(pMsgCb->clientRpc, &pScanResInfo->vtbScan.epSet, NULL, pMsgSendInfo);
210,748✔
1614
  QUERY_CHECK_CODE(code, lino, _return);
210,748✔
1615

1616
  code = tsem_wait(&pScanResInfo->vtbScan.ready);
210,748✔
1617
  QUERY_CHECK_CODE(code, lino, _return);
210,748✔
1618

1619
  code = queryBuildUseDbOutput(output, pScanResInfo->vtbScan.pRsp);
210,748✔
1620
  QUERY_CHECK_CODE(code, lino, _return);
210,748✔
1621

1622
_return:
210,748✔
1623
  if (code) {
210,748✔
UNCOV
1624
     qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
1625
     taosMemoryFree(buf1);
×
1626
  }
1627
  taosMemoryFree(pReq);
210,748✔
1628
  tFreeSUsedbRsp(pScanResInfo->vtbScan.pRsp);
210,748✔
1629
  taosMemoryFreeClear(pScanResInfo->vtbScan.pRsp);
210,748✔
1630
  return code;
210,748✔
1631
}
1632

1633
int dynVgInfoComp(const void* lp, const void* rp) {
402,704✔
1634
  SVgroupInfo* pLeft = (SVgroupInfo*)lp;
402,704✔
1635
  SVgroupInfo* pRight = (SVgroupInfo*)rp;
402,704✔
1636
  if (pLeft->hashBegin < pRight->hashBegin) {
402,704✔
1637
    return -1;
402,704✔
UNCOV
1638
  } else if (pLeft->hashBegin > pRight->hashBegin) {
×
UNCOV
1639
    return 1;
×
1640
  }
1641

UNCOV
1642
  return 0;
×
1643
}
1644

1645
/*
 * Build dbInfo->vgArray from dbInfo->vgHash and sort it with `sort_func`.
 *
 * Nothing is done when dbInfo is NULL, when there is no vgHash, or when vgArray
 * already exists. The array is attached to dbInfo only after it has been completely
 * filled and sorted. A failure therefore never leaves a partial, unsorted array behind
 * that a later call would mistake for a finished one.
 *
 * dbInfo     vgroup info of a database (may be NULL)
 * sort_func  comparator used to sort the SVgroupInfo entries
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when an allocation fails.
 */
int32_t dynMakeVgArraySortBy(SDBVgInfo* dbInfo, __compar_fn_t sort_func) {
  if (NULL == dbInfo) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == dbInfo->vgHash || NULL != dbInfo->vgArray) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t vgSize = taosHashGetSize(dbInfo->vgHash);
  SArray* pVgArray = taosArrayInit(vgSize, sizeof(SVgroupInfo));
  if (NULL == pVgArray) {
    return terrno;
  }

  void* pIter = taosHashIterate(dbInfo->vgHash, NULL);
  while (pIter) {
    if (NULL == taosArrayPush(pVgArray, pIter)) {
      // save the error before the cleanup calls get a chance to overwrite terrno
      int32_t code = terrno;
      taosHashCancelIterate(dbInfo->vgHash, pIter);
      taosArrayDestroy(pVgArray);
      return code;
    }

    pIter = taosHashIterate(dbInfo->vgHash, pIter);
  }

  taosArraySort(pVgArray, sort_func);
  dbInfo->vgArray = pVgArray;

  return TSDB_CODE_SUCCESS;
}
1672

1673
int32_t dynHashValueComp(void const* lp, void const* rp) {
1,336,784✔
1674
  uint32_t*    key = (uint32_t*)lp;
1,336,784✔
1675
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
1,336,784✔
1676

1677
  if (*key < pVg->hashBegin) {
1,336,784✔
UNCOV
1678
    return -1;
×
1679
  } else if (*key > pVg->hashEnd) {
1,336,784✔
1680
    return 1;
425,881✔
1681
  }
1682

1683
  return 0;
910,903✔
1684
}
1685

1686
/*
 * Find the vgroup that owns table `tbName` of database `dbFName`.
 *
 * The sorted vgroup array of dbInfo is built on demand, the hash value of
 * "<dbFName>.<tbName>" is computed with the database's hash settings, and the vgroup
 * whose hash range contains that value is looked up.
 *
 * dbInfo   vgroup info of the database
 * dbFName  full database name
 * vgId     receives the vgroup id; only written on success
 * tbName   table name without database prefix
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR for a NULL argument,
 * TSDB_CODE_TSC_DB_NOT_SELECTED when the database has no vgroups,
 * TSDB_CODE_CTG_INTERNAL_ERROR when no hash range matches, or the error of
 * dynMakeVgArraySortBy().
 */
int32_t getVgId(SDBVgInfo* dbInfo, char* dbFName, int32_t* vgId, char *tbName) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  // dynMakeVgArraySortBy() accepts a NULL dbInfo and reports success, so it has to be
  // rejected here before dbInfo is dereferenced below
  if (NULL == dbInfo || NULL == dbFName || NULL == vgId || NULL == tbName) {
    qError("%s failed since invalid NULL argument", __func__);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  code = dynMakeVgArraySortBy(dbInfo, dynVgInfoComp);
  QUERY_CHECK_CODE(code, lino, _return);

  int32_t vgNum = (int32_t)taosArrayGetSize(dbInfo->vgArray);
  if (vgNum <= 0) {
    qError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum);
    code = TSDB_CODE_TSC_DB_NOT_SELECTED;
    lino = __LINE__;
    goto _return;
  }

  // "<db>.<table>", truncated to the buffer size exactly like the two-step formatting
  // it replaces
  char tbFullName[TSDB_TABLE_FNAME_LEN];
  (void)snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbFName, tbName);

  uint32_t hashValue = taosGetTbHashVal(tbFullName, (int32_t)strlen(tbFullName), dbInfo->hashMethod,
                                        dbInfo->hashPrefix, dbInfo->hashSuffix);

  SVgroupInfo* vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, dynHashValueComp, TD_EQ);
  if (NULL == vgInfo) {
    qError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName,
           (int32_t)taosArrayGetSize(dbInfo->vgArray));
    return TSDB_CODE_CTG_INTERNAL_ERROR;
  }

  *vgId = vgInfo->vgId;

_return:
  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1719

1720
/*
 * Tell whether column `colId` has to be scanned by the virtual table scan.
 *
 * Returns true when vtbScan.scanAllCols is set or when colId is contained in
 * vtbScan.readColList, false otherwise.
 */
bool colNeedScan(SOperatorInfo* pOperator, col_id_t colId) {
  SDynQueryCtrlOperatorInfo* pCtrlInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pCtrlInfo->vtbScan;
  SArray*                    pReadCols = pVtbScan->readColList;

  if (pVtbScan->scanAllCols) {
    return true;
  }

  int32_t idx = 0;
  while (idx < taosArrayGetSize(pReadCols)) {
    col_id_t* pReadColId = (col_id_t*)taosArrayGet(pReadCols, idx);
    if (*pReadColId == colId) {
      return true;
    }
    idx++;
  }

  return false;
}
1734

1735
/*
 * Return the vgroup info of the database that `name` belongs to.
 *
 * Results are cached in vtbScan.dbVgInfoMap keyed by database name. On a cache miss the
 * info is requested through buildDbVgInfoMap() and stored in the map.
 *
 * pOperator  the dynamic query control operator
 * name       table name whose database is looked up
 * dbVgInfo   receives output->dbVgroup of the cached or newly fetched entry
 *
 * Returns TSDB_CODE_SUCCESS or the error of the allocation, the fetch or the map insert.
 */
int32_t getDbVgInfo(SOperatorInfo* pOperator, SName *name, SDBVgInfo **dbVgInfo) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SMsgCb*                    pMsgCb = pVtbScan->pMsgCb;
  SUseDbOutput*              output = NULL;
  SUseDbOutput**             find = (SUseDbOutput**)taosHashGet(pInfo->vtbScan.dbVgInfoMap, name->dbname, strlen(name->dbname));

  QRY_PARAM_CHECK(dbVgInfo);

  if (find == NULL) {
    // Zero-initialised and checked: the struct is handed to freeUseDbOutput() when the
    // fetch fails before anything was written to it, so it must not hold garbage.
    output = taosMemoryCalloc(1, sizeof(SUseDbOutput));
    QUERY_CHECK_NULL(output, code, line, _return, terrno)
    code = buildDbVgInfoMap(pOperator, pMsgCb, name, pTaskInfo, output);
    QUERY_CHECK_CODE(code, line, _return);
    code = taosHashPut(pInfo->vtbScan.dbVgInfoMap, name->dbname, strlen(name->dbname), &output, POINTER_BYTES);
    QUERY_CHECK_CODE(code, line, _return);
  } else {
    output = *find;
  }

  *dbVgInfo = output->dbVgroup;
  return code;

_return:
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  // only reached for an entry created above, never for a cached one
  if (output != NULL) {
    freeUseDbOutput(output);
  }
  return code;
}
1764

1765
int32_t extractColRefName(const char *colref, char **refDb, char** refTb, char** refCol) {
2,069,363✔
1766
  int32_t     code = TSDB_CODE_SUCCESS;
2,069,363✔
1767
  int32_t     line = 0;
2,069,363✔
1768

1769
  const char *first_dot = strchr(colref, '.');
2,069,363✔
1770
  QUERY_CHECK_NULL(first_dot, code, line, _return, terrno)
2,069,363✔
1771

1772
  const char *second_dot = strchr(first_dot + 1, '.');
2,069,363✔
1773
  QUERY_CHECK_NULL(second_dot, code, line, _return, terrno)
2,069,363✔
1774

1775
  size_t db_len = first_dot - colref;
2,069,363✔
1776
  size_t table_len = second_dot - first_dot - 1;
2,069,363✔
1777
  size_t col_len = strlen(second_dot + 1);
2,069,363✔
1778

1779
  *refDb = taosMemoryMalloc(db_len + 1);
2,069,363✔
1780
  *refTb = taosMemoryMalloc(table_len + 1);
2,069,363✔
1781
  *refCol = taosMemoryMalloc(col_len + 1);
2,069,363✔
1782
  QUERY_CHECK_NULL(*refDb, code, line, _return, terrno)
2,069,363✔
1783
  QUERY_CHECK_NULL(*refTb, code, line, _return, terrno)
2,069,363✔
1784
  QUERY_CHECK_NULL(*refCol, code, line, _return, terrno)
2,069,363✔
1785

1786
  tstrncpy(*refDb, colref, db_len + 1);
2,069,363✔
1787
  tstrncpy(*refTb, first_dot + 1, table_len + 1);
2,069,363✔
1788
  tstrncpy(*refCol, second_dot + 1, col_len + 1);
2,069,363✔
1789

1790
  return TSDB_CODE_SUCCESS;
2,069,363✔
UNCOV
1791
_return:
×
UNCOV
1792
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
UNCOV
1793
  if (*refDb) {
×
UNCOV
1794
    taosMemoryFree(*refDb);
×
UNCOV
1795
    *refDb = NULL;
×
1796
  }
UNCOV
1797
  if (*refTb) {
×
UNCOV
1798
    taosMemoryFree(*refTb);
×
1799
    *refTb = NULL;
×
1800
  }
UNCOV
1801
  if (*refCol) {
×
1802
    taosMemoryFree(*refCol);
×
1803
    *refCol = NULL;
×
1804
  }
UNCOV
1805
  return code;
×
1806
}
1807

1808
/*
 * Check whether a (database, table) pair stored as var-data matches the expected names.
 *
 * dbName        database name in var-data layout (read through varDataVal/varDataLen)
 * tbName        table name in var-data layout
 * expectDbName  NUL-terminated database name to match
 * expectTbName  NUL-terminated table name to match
 *
 * Returns true only when both names are equal in content and length.
 */
bool tableInfoNeedCollect(char *dbName, char *tbName, char *expectDbName, char *expectTbName) {
  // table name first, then database name; each check is only evaluated when all
  // previous ones passed
  if (strncmp(varDataVal(tbName), expectTbName, varDataLen(tbName)) != 0) {
    return false;
  }
  if (strlen(expectTbName) != varDataLen(tbName)) {
    return false;
  }
  if (strncmp(varDataVal(dbName), expectDbName, varDataLen(dbName)) != 0) {
    return false;
  }
  return strlen(expectDbName) == varDataLen(dbName);
}
1817

1818
/*
 * Fill `pInfo` from row `index` of a data block.
 *
 * Columns 3 to 8 of the block are fetched; of these, the column name (3), uid (4),
 * column id (5), column reference (6) and vgroup id (7) are copied into pInfo. The two
 * strings are copied into newly allocated, NUL-terminated buffers; colrefName is set
 * to NULL when the reference is NULL in that row. The numeric fields are only written
 * when the corresponding value is not NULL.
 *
 * pInfo       receives the values
 * pDataBlock  array of SColumnInfoData
 * index       row to read
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when a column is missing or an allocation fails.
 */
int32_t getColRefInfo(SColRefInfo *pInfo, SArray* pDataBlock, int32_t index) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          line = 0;
  char*            pRefData = NULL;
  char*            pNameData = NULL;

  SColumnInfoData *pColNameCol = taosArrayGet(pDataBlock, 3);
  SColumnInfoData *pUidCol = taosArrayGet(pDataBlock, 4);
  SColumnInfoData *pColIdCol = taosArrayGet(pDataBlock, 5);
  SColumnInfoData *pRefCol = taosArrayGet(pDataBlock, 6);
  SColumnInfoData *pVgIdCol = taosArrayGet(pDataBlock, 7);
  SColumnInfoData *pRefVerCol = taosArrayGet(pDataBlock, 8);

  QUERY_CHECK_NULL(pColNameCol, code, line, _return, terrno)
  QUERY_CHECK_NULL(pUidCol, code, line, _return, terrno)
  QUERY_CHECK_NULL(pColIdCol, code, line, _return, terrno)
  QUERY_CHECK_NULL(pRefCol, code, line, _return, terrno)
  QUERY_CHECK_NULL(pVgIdCol, code, line, _return, terrno)
  QUERY_CHECK_NULL(pRefVerCol, code, line, _return, terrno)

  // column reference: optional
  if (colDataIsNull_s(pRefCol, index)) {
    pInfo->colrefName = NULL;
  } else {
    pRefData = colDataGetData(pRefCol, index);
    pInfo->colrefName = taosMemoryCalloc(varDataTLen(pRefData), 1);
    QUERY_CHECK_NULL(pInfo->colrefName, code, line, _return, terrno)
    memcpy(pInfo->colrefName, varDataVal(pRefData), varDataLen(pRefData));
    pInfo->colrefName[varDataLen(pRefData)] = 0;
  }

  // column name
  pNameData = colDataGetData(pColNameCol, index);
  pInfo->colName = taosMemoryCalloc(varDataTLen(pNameData), 1);
  QUERY_CHECK_NULL(pInfo->colName, code, line, _return, terrno)
  memcpy(pInfo->colName, varDataVal(pNameData), varDataLen(pNameData));
  pInfo->colName[varDataLen(pNameData)] = 0;

  // numeric fields keep their previous value when the cell is NULL
  if (!colDataIsNull_s(pUidCol, index)) {
    GET_TYPED_DATA(pInfo->uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, index), 0);
  }
  if (!colDataIsNull_s(pColIdCol, index)) {
    GET_TYPED_DATA(pInfo->colId, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pColIdCol, index), 0);
  }
  if (!colDataIsNull_s(pVgIdCol, index)) {
    GET_TYPED_DATA(pInfo->vgId, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pVgIdCol, index), 0);
  }

_return:
  return code;
}
1863

1864
/*
 * Compare the vgroups of the current original-table set (curOrgTbVg) with the ones
 * already known (existOrgTbVg) and request a redeploy when new vgroups appeared.
 *
 * Nothing is done when the task has no stream runtime info. When existOrgTbVg is not
 * set yet, curOrgTbVg simply becomes the known set. Otherwise every vgroup id of
 * curOrgTbVg that is missing from existOrgTbVg is collected. If there are any, they are
 * published through vtableDeployInfo.addVgIds (together with the node ids of a pending
 * addedVgInfo list), the deploy flag is raised and curOrgTbVg is cleared.
 *
 * pVtbScan   virtual table scan control info
 * pTaskInfo  task info holding the stream runtime info
 * rversion   version stored in pVtbScan->rversion when a redeploy is requested
 *
 * Returns TSDB_CODE_SUCCESS when nothing changed, TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY
 * when new vgroups were found, or terrno on allocation failure. All temporary arrays
 * are released and the hash iterator is cancelled on the error path.
 */
int32_t processOrgTbVg(SVtbScanDynCtrlInfo* pVtbScan, SExecTaskInfo* pTaskInfo, int32_t rversion) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  void*                      pCurIter = NULL;
  SArray*                    tmpArray = NULL;     // vgroup ids to add; owned until published
  SArray*                    expiredInfo = NULL;  // owned only after it was detached below

  if (pTaskInfo->pStreamRuntimeInfo == NULL) {
    return code;
  }

  if (pVtbScan->existOrgTbVg == NULL) {
    pVtbScan->existOrgTbVg = pVtbScan->curOrgTbVg;
    pVtbScan->curOrgTbVg = NULL;
  }

  if (pVtbScan->curOrgTbVg == NULL) {
    return code;
  }

  // which means rversion has changed
  while ((pCurIter = taosHashIterate(pVtbScan->curOrgTbVg, pCurIter))) {
    int32_t* vgId = (int32_t*)taosHashGetKey(pCurIter, NULL);
    if (taosHashGet(pVtbScan->existOrgTbVg, vgId, sizeof(int32_t)) == NULL) {
      if (tmpArray == NULL) {
        tmpArray = taosArrayInit(1, sizeof(int32_t));
        QUERY_CHECK_NULL(tmpArray, code, line, _return, terrno)
      }
      QUERY_CHECK_NULL(taosArrayPush(tmpArray, vgId), code, line, _return, terrno)
    }
  }

  if (tmpArray == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  if (pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addVgIds == NULL) {
    SArray* pending = atomic_load_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo);
    if (pending && pending == atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo, pending, NULL)) {
      // the list was detached successfully, so it is ours to release
      expiredInfo = pending;
      for (int32_t i = 0; i < taosArrayGetSize(expiredInfo); i++) {
        SStreamTaskAddr* vgInfo = (SStreamTaskAddr*)taosArrayGet(expiredInfo, i);
        QUERY_CHECK_NULL(taosArrayPush(tmpArray, &vgInfo->nodeId), code, line, _return, terrno)
      }
      taosArrayDestroy(expiredInfo);
      expiredInfo = NULL;
    }
    if (atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addVgIds, NULL, tmpArray)) {
      // somebody else published a list in the meantime
      taosArrayDestroy(tmpArray);
    }
  } else {
    // a list is already published; ours is not handed over and must not leak
    taosArrayDestroy(tmpArray);
  }
  tmpArray = NULL;

  atomic_store_64(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.uid, (int64_t)(pVtbScan->isSuperTable ? pVtbScan->suid : pVtbScan->uid));
  (void)atomic_val_compare_exchange_8(pTaskInfo->pStreamRuntimeInfo->vtableDeployGot, 0, 1);
  taosHashClear(pVtbScan->curOrgTbVg);
  pVtbScan->needRedeploy = true;
  pVtbScan->rversion = rversion;
  return TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY;

_return:
  if (pCurIter != NULL) {
    // the loop was left early, release the iterator
    taosHashCancelIterate(pVtbScan->curOrgTbVg, pCurIter);
  }
  taosArrayDestroy(tmpArray);
  taosArrayDestroy(expiredInfo);
  qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  return code;
}
1919

1920
/*
 * Resolve the vgroup id of the table referenced by a "<db>.<table>.<column>" string.
 *
 * pOperator  the dynamic query control operator (supplies the account id and the
 *            database vgroup cache)
 * colRef     column reference string
 * vgId       receives the vgroup id
 *
 * Returns TSDB_CODE_SUCCESS or the error of the step that failed. The temporary name
 * parts are released on every path.
 */
int32_t getVgIdFromColref(SOperatorInfo* pOperator, const char* colRef, int32_t* vgId) {
  SDynQueryCtrlOperatorInfo* pCtrlInfo = pOperator->info;
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  char*                      pDbPart = NULL;
  char*                      pTbPart = NULL;
  char*                      pColPart = NULL;
  SDBVgInfo*                 pVgInfo = NULL;
  SName                      tbName = {0};
  char                       fullDbName[TSDB_DB_FNAME_LEN] = {0};

  code = extractColRefName(colRef, &pDbPart, &pTbPart, &pColPart);
  QUERY_CHECK_CODE(code, line, _return);

  toName(pCtrlInfo->vtbScan.acctId, pDbPart, pTbPart, &tbName);

  code = getDbVgInfo(pOperator, &tbName, &pVgInfo);
  QUERY_CHECK_CODE(code, line, _return);

  code = tNameGetFullDbName(&tbName, fullDbName);
  QUERY_CHECK_CODE(code, line, _return);

  code = getVgId(pVgInfo, fullDbName, vgId, tbName.tname);
  QUERY_CHECK_CODE(code, line, _return);

_return:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  taosMemoryFree(pDbPart);
  taosMemoryFree(pTbPart);
  taosMemoryFree(pColPart);
  return code;
}
1954

UNCOV
1955
/*
 * Build the tag value list of row `rowIdx` of a tag block and store it in
 * `vtbUidTagListMap` under key `uid`.
 *
 * Every column of the block except the last one is turned into one STagVal. Var-data
 * values are copied into a newly allocated buffer, other values are copied into the
 * i64 member, and NULL cells produce an entry with pData == NULL and nData == 0.
 *
 * vtbUidTagListMap  map from uid to the SArray of STagVal
 * uid               key under which the list is stored
 * pTagVal           tag block
 * rowIdx            row to convert
 *
 * Returns TSDB_CODE_SUCCESS, or the error of the failing allocation or map insert; the
 * partially built list is destroyed in that case.
 */
static int32_t generateTagArrayByTagBlockAndSave(SHashObj* vtbUidTagListMap, tb_uid_t uid, SSDataBlock *pTagVal, int32_t rowIdx) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t line = 0;
  STagVal curTag = {0};

  SArray* pTags = taosArrayInit(1, sizeof(STagVal));
  QUERY_CHECK_NULL(pTags, code, line, _return, terrno)

  // last col is uid
  for (int32_t colIdx = 0; colIdx < taosArrayGetSize(pTagVal->pDataBlock) - 1; colIdx++) {
    SColumnInfoData *pTagCol = taosArrayGet(pTagVal->pDataBlock, colIdx);
    QUERY_CHECK_NULL(pTagCol, code, line, _return, terrno)
    curTag.type = pTagCol->info.type;
    curTag.cid = pTagCol->info.colId;

    if (colDataIsNull_s(pTagCol, rowIdx)) {
      curTag.pData = NULL;
      curTag.nData = 0;
    } else {
      char* pCell = colDataGetData(pTagCol, rowIdx);
      if (IS_VAR_DATA_TYPE(pTagCol->info.type)) {
        curTag.nData = varDataLen(pCell);
        curTag.pData = taosMemoryMalloc(curTag.nData);
        QUERY_CHECK_NULL(curTag.pData, code, line, _return, terrno)
        memcpy(curTag.pData, varDataVal(pCell), varDataLen(pCell));
      } else {
        memcpy(&curTag.i64, pCell, tDataTypes[pTagCol->info.type].bytes);
      }
    }

    QUERY_CHECK_NULL(taosArrayPush(pTags, &curTag), code, line, _return, terrno)
    // the pushed copy now carries the buffer; reset so the cleanup cannot free it twice
    curTag = (STagVal){0};
  }

  code = taosHashPut(vtbUidTagListMap, &uid, sizeof(uid), &pTags, POINTER_BYTES);
  QUERY_CHECK_CODE(code, line, _return);

  return code;

_return:
  if (curTag.pData) {
    taosMemoryFreeClear(curTag.pData);
  }
  if (pTags) {
    taosArrayDestroyEx(pTags, destroyTagVal);
  }
  qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  return code;
}
2002

2003
/*
 * Walk the column references of one virtual table and group the referenced columns by
 * original table in vtbScan.otbNameToOtbInfoMap.
 *
 * For every entry whose colrefName is set and whose column has to be scanned, the
 * reference is split into database, table and column. The full original table name is
 * the map key: a new SOrgTbInfo (with its vgroup id resolved through getVgId()) is
 * inserted when the table is seen for the first time, otherwise the column is appended
 * to the existing entry's colMap.
 *
 * pOperator    the dynamic query control operator
 * pColRefInfo  array of SColRefInfo of one virtual table
 * uid          receives the uid of the last entry visited
 * vgId         receives the vgId of the last entry visited
 *
 * Returns TSDB_CODE_SUCCESS or the first error. The temporary name parts and a colMap
 * that was not handed to the map yet are released on every path.
 */
int32_t virtualTableScanProcessColRefInfo(SOperatorInfo* pOperator, SArray* pColRefInfo, tb_uid_t* uid, int32_t* vgId) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SDBVgInfo*                 dbVgInfo = NULL;
  // kept at function scope so the error path can release them
  char*                      refDbName = NULL;
  char*                      refTbName = NULL;
  char*                      refColName = NULL;
  SArray*                    pNewColMap = NULL;  // colMap not yet owned by the map

  for (int32_t j = 0; j < taosArrayGetSize(pColRefInfo); j++) {
    SColRefInfo *pKV = (SColRefInfo*)taosArrayGet(pColRefInfo, j);
    *uid = pKV->uid;
    *vgId = pKV->vgId;
    if (pKV->colrefName == NULL || !colNeedScan(pOperator, pKV->colId)) {
      continue;
    }

    SName        name = {0};
    char         dbFname[TSDB_DB_FNAME_LEN] = {0};
    char         orgTbFName[TSDB_TABLE_FNAME_LEN] = {0};
    SColIdNameKV colIdNameKV = {0};

    code = extractColRefName(pKV->colrefName, &refDbName, &refTbName, &refColName);
    QUERY_CHECK_CODE(code, line, _return);

    toName(pInfo->vtbScan.acctId, refDbName, refTbName, &name);

    code = getDbVgInfo(pOperator, &name, &dbVgInfo);
    QUERY_CHECK_CODE(code, line, _return);
    code = tNameGetFullDbName(&name, dbFname);
    QUERY_CHECK_CODE(code, line, _return);
    code = tNameGetFullTableName(&name, orgTbFName);
    QUERY_CHECK_CODE(code, line, _return);

    colIdNameKV.colId = pKV->colId;
    tstrncpy(colIdNameKV.colName, refColName, sizeof(colIdNameKV.colName));

    // the whole zero-padded buffer is the key, for lookup and insert alike
    void *pVal = taosHashGet(pVtbScan->otbNameToOtbInfoMap, orgTbFName, sizeof(orgTbFName));
    if (!pVal) {
      SOrgTbInfo orgTbInfo = {0};
      code = getVgId(dbVgInfo, dbFname, &orgTbInfo.vgId, name.tname);
      QUERY_CHECK_CODE(code, line, _return);
      tstrncpy(orgTbInfo.tbName, orgTbFName, sizeof(orgTbInfo.tbName));
      pNewColMap = taosArrayInit(10, sizeof(SColIdNameKV));
      QUERY_CHECK_NULL(pNewColMap, code, line, _return, terrno)
      orgTbInfo.colMap = pNewColMap;
      QUERY_CHECK_NULL(taosArrayPush(orgTbInfo.colMap, &colIdNameKV), code, line, _return, terrno)
      code = taosHashPut(pVtbScan->otbNameToOtbInfoMap, orgTbFName, sizeof(orgTbFName), &orgTbInfo, sizeof(orgTbInfo));
      QUERY_CHECK_CODE(code, line, _return);
      // the stored copy of orgTbInfo now references the array
      pNewColMap = NULL;
    } else {
      SOrgTbInfo *tbInfo = (SOrgTbInfo *)pVal;
      QUERY_CHECK_NULL(taosArrayPush(tbInfo->colMap, &colIdNameKV), code, line, _return, terrno)
    }

    taosMemoryFreeClear(refDbName);
    taosMemoryFreeClear(refTbName);
    taosMemoryFreeClear(refColName);
  }

_return:
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  // all NULL after a clean run; non-NULL only when the loop was left through an error
  taosMemoryFree(refDbName);
  taosMemoryFree(refTbName);
  taosMemoryFree(refColName);
  taosArrayDestroy(pNewColMap);
  return code;
}
2067

UNCOV
2068
/*
 * Drain the tag scan downstream (pDownstream[1]) and record the tag values of every
 * row, keyed by the uid found in the last column of each block.
 *
 * Without partitioning all rows go into vtbScan.vtbUidTagListMap. With partitioning a
 * separate uid -> tag list map is kept per group id in vtbScan.vtbGroupIdTagListMap,
 * and vtbScan.vtbUidToGroupIdMap records the group id of every uid.
 *
 * pOperator     the dynamic query control operator
 * hasPartition  whether the rows are grouped by the block's group id
 *
 * Returns TSDB_CODE_SUCCESS or the first error. The maps stored in pVtbScan are left in
 * place on failure; a per-group map that could not be inserted is released here.
 */
static int32_t getTagBlockAndProcess(SOperatorInfo* pOperator, bool hasPartition) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SOperatorInfo*             pTagScanOp = pOperator->pDownstream[1];
  SHashObj*                  pNewGroupMap = NULL;  // per-group map not yet owned by vtbGroupIdTagListMap

  pVtbScan->vtbUidTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  QUERY_CHECK_NULL(pVtbScan->vtbUidTagListMap, code, line, _return, terrno)
  taosHashSetFreeFp(pVtbScan->vtbUidTagListMap, destroyTagList);
  if (hasPartition) {
    pVtbScan->vtbUidToGroupIdMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
    pVtbScan->vtbGroupIdTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
    QUERY_CHECK_NULL(pVtbScan->vtbUidToGroupIdMap, code, line, _return, terrno)
    QUERY_CHECK_NULL(pVtbScan->vtbGroupIdTagListMap, code, line, _return, terrno)
    taosHashSetFreeFp(pVtbScan->vtbGroupIdTagListMap, destroyVtbUidTagListMap);
  }

  while (true) {
    SSDataBlock *pTagVal = NULL;
    code = pTagScanOp->fpSet.getNextFn(pTagScanOp, &pTagVal);
    QUERY_CHECK_CODE(code, line, _return);
    if (pTagVal == NULL) {
      break;
    }

    // pick the uid -> tag list map this block belongs to
    SHashObj *vtbUidTagListMap = NULL;
    if (hasPartition) {
      void* pIter = taosHashGet(pVtbScan->vtbGroupIdTagListMap, &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId));
      if (pIter) {
        vtbUidTagListMap = *(SHashObj**)pIter;
      } else {
        pNewGroupMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
        QUERY_CHECK_NULL(pNewGroupMap, code, line, _return, terrno)
        taosHashSetFreeFp(pNewGroupMap, destroyTagList);

        code = taosHashPut(pVtbScan->vtbGroupIdTagListMap, &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId), &pNewGroupMap, POINTER_BYTES);
        QUERY_CHECK_CODE(code, line, _return);

        // from here on the group map owns it
        vtbUidTagListMap = pNewGroupMap;
        pNewGroupMap = NULL;
      }
    } else {
      vtbUidTagListMap = pVtbScan->vtbUidTagListMap;
    }

    SColumnInfoData *pUidCol = taosArrayGetLast(pTagVal->pDataBlock);
    QUERY_CHECK_NULL(pUidCol, code, line, _return, terrno)
    for (int32_t i = 0; i < pTagVal->info.rows; i++) {
      // stays 0 when the uid cell is NULL
      tb_uid_t uid = 0;
      if (!colDataIsNull_s(pUidCol, i)) {
        GET_TYPED_DATA(uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, i), 0);
      }

      code = generateTagArrayByTagBlockAndSave(vtbUidTagListMap, uid, pTagVal, i);
      QUERY_CHECK_CODE(code, line, _return);

      if (hasPartition) {
        code = taosHashPut(pVtbScan->vtbUidToGroupIdMap, &uid, sizeof(uid), &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId));
        QUERY_CHECK_CODE(code, line, _return);
      }
    }
  }

  return code;

_return:
  if (pNewGroupMap != NULL) {
    taosHashCleanup(pNewGroupMap);
  }
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  return code;
}
2138

UNCOV
2139
/*
 * For every entry of pVtbScan->childTableList, build a map
 *   vgId -> SArray<SOrgTbInfo>
 * from the entries currently stored in pVtbScan->otbNameToOtbInfoMap, and
 * register that map in pVtbScan->vtbUidToVgIdMapMap under the table uid
 * reported by virtualTableScanProcessColRefInfo().
 *
 * Entries are removed from otbNameToOtbInfoMap as soon as they have been
 * copied, so the name map is drained once per child table.
 * NOTE(review): this presumably relies on virtualTableScanProcessColRefInfo()
 * refilling otbNameToOtbInfoMap for each child table - confirm.
 *
 * On failure, resources that have not yet been handed over to an owning
 * container (the per-table map, a freshly created array) are released and a
 * live hash iterator is cancelled. Array elements are shallow copies of
 * SOrgTbInfo and are released shallowly.
 *
 * Returns TSDB_CODE_SUCCESS or the first error code encountered.
 */
static int32_t processChildTableListAndGenerateOrgTbInfoMap(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SHashObj*                  pPendingMap = NULL;    // per-table map not yet stored in vtbUidToVgIdMapMap
  SArray*                    pPendingArray = NULL;  // array not yet stored in pPendingMap
  void*                      pOrgTbInfo = NULL;     // live iterator over otbNameToOtbInfoMap

  pVtbScan->vtbUidToVgIdMapMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  QUERY_CHECK_NULL(pVtbScan->vtbUidToVgIdMapMap, code, line, _return, terrno)

  for (int32_t i = 0; i < taosArrayGetSize(pVtbScan->childTableList); i++) {
    pPendingMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
    QUERY_CHECK_NULL(pPendingMap, code, line, _return, terrno)

    SArray* pColRefInfo = (SArray*)taosArrayGetP(pVtbScan->childTableList, i);
    QUERY_CHECK_NULL(pColRefInfo, code, line, _return, terrno)

    tb_uid_t uid = 0;
    int32_t  vgId = 0;
    code = virtualTableScanProcessColRefInfo(pOperator, pColRefInfo, &uid, &vgId);
    QUERY_CHECK_CODE(code, line, _return);

    size_t len = 0;
    pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, NULL);
    while (pOrgTbInfo != NULL) {
      char*       key = taosHashGetKey(pOrgTbInfo, &len);
      SOrgTbInfo* orgTbInfo = (SOrgTbInfo*)pOrgTbInfo;

      void* pIter = taosHashGet(pPendingMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId));
      if (!pIter) {
        // first table seen for this vgroup: create its array and hand it to the map
        pPendingArray = taosArrayInit(1, sizeof(SOrgTbInfo));
        QUERY_CHECK_NULL(pPendingArray, code, line, _return, terrno)
        QUERY_CHECK_NULL(taosArrayPush(pPendingArray, orgTbInfo), code, line, _return, terrno)
        code = taosHashPut(pPendingMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId), &pPendingArray, POINTER_BYTES);
        QUERY_CHECK_CODE(code, line, _return);
        pPendingArray = NULL;  // now owned by pPendingMap
      } else {
        SArray* pOrgTbInfoArray = *(SArray**)pIter;
        QUERY_CHECK_NULL(pOrgTbInfoArray, code, line, _return, terrno)
        QUERY_CHECK_NULL(taosArrayPush(pOrgTbInfoArray, orgTbInfo), code, line, _return, terrno)
      }

      // advance first, then drop the entry that was just consumed
      pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, pOrgTbInfo);

      code = taosHashRemove(pVtbScan->otbNameToOtbInfoMap, key, len);
      QUERY_CHECK_CODE(code, line, _return);
    }

    code = taosHashPut(pVtbScan->vtbUidToVgIdMapMap, &uid, sizeof(uid), &pPendingMap, POINTER_BYTES);
    QUERY_CHECK_CODE(code, line, _return);
    pPendingMap = NULL;  // now owned by vtbUidToVgIdMapMap
  }

  return code;
_return:
  if (pOrgTbInfo != NULL) {
    // leaving the iteration early: release the iterator
    taosHashCancelIterate(pVtbScan->otbNameToOtbInfoMap, pOrgTbInfo);
  }
  taosArrayDestroy(pPendingArray);
  if (pPendingMap != NULL) {
    void* pEntry = taosHashIterate(pPendingMap, NULL);
    while (pEntry != NULL) {
      taosArrayDestroy(*(SArray**)pEntry);
      pEntry = taosHashIterate(pPendingMap, pEntry);
    }
    taosHashCleanup(pPendingMap);
  }
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  return code;
}
2198

UNCOV
2199
/*
 * Non-batch preparation: first build the per-child-table map via
 * processChildTableListAndGenerateOrgTbInfoMap(), then run
 * getTagBlockAndProcess() with the given hasPartition flag.
 *
 * Returns TSDB_CODE_SUCCESS, or the error code of the first step that fails
 * (which is also logged).
 */
static int32_t buildOrgTbInfoSingle(SOperatorInfo* pOperator, bool hasPartition) {
  int32_t line = 0;
  int32_t code = processChildTableListAndGenerateOrgTbInfoMap(pOperator);
  QUERY_CHECK_CODE(code, line, _return);

  // tag processing runs only after the child table map has been built
  code = getTagBlockAndProcess(pOperator, hasPartition);
  QUERY_CHECK_CODE(code, line, _return);

  return code;

_return:
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  return code;
}
2215

2216
/*
 * Batch preparation of the vgId -> SArray<SOrgTbInfo> maps.
 *
 * With hasPartition:
 *   1. create vtbUidToGroupIdMap, vtbGroupIdTagListMap and
 *      vtbGroupIdToVgIdMapMap;
 *   2. drain the tag scan operator (pDownstream[1]); for each row record
 *      uid -> block groupId, and register each block's groupId in
 *      vtbGroupIdTagListMap (key only, no value);
 *   3. for each child table, look up its groupId and append the entries of
 *      otbNameToOtbInfoMap to that group's per-vgId map (created on demand).
 * Without hasPartition a single map, pVtbScan->otbVgIdToOtbInfoArrayMap,
 * receives the entries of every child table.
 *
 * Entries are removed from otbNameToOtbInfoMap once copied.
 *
 * On failure, a per-group map or array that has not yet been handed to its
 * owning container is released and a live hash iterator is cancelled.
 *
 * Returns TSDB_CODE_SUCCESS or the first error code encountered.
 */
static int32_t buildOrgTbInfoBatch(SOperatorInfo* pOperator, bool hasPartition) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SOperatorInfo*             pTagScanOp = pOperator->pDownstream[1];
  SHashObj*                  pPendingMap = NULL;    // per-group map not yet stored in vtbGroupIdToVgIdMapMap
  SArray*                    pPendingArray = NULL;  // array not yet stored in a per-vgId map
  void*                      pOrgTbInfo = NULL;     // live iterator over otbNameToOtbInfoMap

  if (hasPartition) {
    pVtbScan->vtbUidToGroupIdMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
    pVtbScan->vtbGroupIdTagListMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
    pVtbScan->vtbGroupIdToVgIdMapMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);

    QUERY_CHECK_NULL(pVtbScan->vtbGroupIdToVgIdMapMap, code, line, _return, terrno)
    QUERY_CHECK_NULL(pVtbScan->vtbUidToGroupIdMap, code, line, _return, terrno)
    QUERY_CHECK_NULL(pVtbScan->vtbGroupIdTagListMap, code, line, _return, terrno)
    taosHashSetFreeFp(pVtbScan->vtbGroupIdToVgIdMapMap, destroyOtbVgIdToOtbInfoArrayMap);
  } else {
    pVtbScan->otbVgIdToOtbInfoArrayMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
    QUERY_CHECK_NULL(pVtbScan->otbVgIdToOtbInfoArrayMap, code, line, _return, terrno)
  }

  // Only partitioned queries need the uid -> groupId mapping from the tag scan.
  while (hasPartition) {
    SSDataBlock* pTagVal = NULL;
    code = pTagScanOp->fpSet.getNextFn(pTagScanOp, &pTagVal);
    QUERY_CHECK_CODE(code, line, _return);
    if (pTagVal == NULL) {
      break;
    }

    // the uid is read from the last column of the tag block
    SColumnInfoData* pUidCol = taosArrayGetLast(pTagVal->pDataBlock);
    QUERY_CHECK_NULL(pUidCol, code, line, _return, terrno)
    for (int32_t i = 0; i < pTagVal->info.rows; i++) {
      tb_uid_t uid = 0;
      if (!colDataIsNull_s(pUidCol, i)) {
        GET_TYPED_DATA(uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, i), 0);
        QUERY_CHECK_CODE(code, line, _return);
      }
      code = taosHashPut(pVtbScan->vtbUidToGroupIdMap, &uid, sizeof(uid), &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId));
      QUERY_CHECK_CODE(code, line, _return);
    }
    code = taosHashPut(pVtbScan->vtbGroupIdTagListMap, &pTagVal->info.id.groupId, sizeof(pTagVal->info.id.groupId), NULL, 0);
    QUERY_CHECK_CODE(code, line, _return);
  }

  for (int32_t i = 0; i < taosArrayGetSize(pVtbScan->childTableList); i++) {
    SArray* pColRefInfo = (SArray*)taosArrayGetP(pVtbScan->childTableList, i);
    QUERY_CHECK_NULL(pColRefInfo, code, line, _return, terrno)
    tb_uid_t uid = 0;
    int32_t  vgId = 0;
    code = virtualTableScanProcessColRefInfo(pOperator, pColRefInfo, &uid, &vgId);
    QUERY_CHECK_CODE(code, line, _return);

    SHashObj* otbVgIdToOtbInfoArrayMap = NULL;
    if (hasPartition) {
      uint64_t* groupId = (uint64_t*)taosHashGet(pVtbScan->vtbUidToGroupIdMap, &uid, sizeof(uid));
      QUERY_CHECK_NULL(groupId, code, line, _return, terrno)

      void* pHashIter = taosHashGet(pVtbScan->vtbGroupIdToVgIdMapMap, groupId, sizeof(*groupId));
      if (pHashIter) {
        otbVgIdToOtbInfoArrayMap = *(SHashObj**)pHashIter;
      } else {
        pPendingMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
        QUERY_CHECK_NULL(pPendingMap, code, line, _return, terrno)
        code = taosHashPut(pVtbScan->vtbGroupIdToVgIdMapMap, groupId, sizeof(*groupId), &pPendingMap, POINTER_BYTES);
        QUERY_CHECK_CODE(code, line, _return);
        otbVgIdToOtbInfoArrayMap = pPendingMap;
        pPendingMap = NULL;  // now owned by vtbGroupIdToVgIdMapMap
      }
    } else {
      otbVgIdToOtbInfoArrayMap = pVtbScan->otbVgIdToOtbInfoArrayMap;
    }

    size_t len = 0;
    pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, NULL);
    while (pOrgTbInfo != NULL) {
      char*       key = taosHashGetKey(pOrgTbInfo, &len);
      SOrgTbInfo* orgTbInfo = (SOrgTbInfo*)pOrgTbInfo;
      void*       pIter = taosHashGet(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId));
      if (!pIter) {
        pPendingArray = taosArrayInit(1, sizeof(SOrgTbInfo));
        QUERY_CHECK_NULL(pPendingArray, code, line, _return, terrno)
        QUERY_CHECK_NULL(taosArrayPush(pPendingArray, orgTbInfo), code, line, _return, terrno)
        code = taosHashPut(otbVgIdToOtbInfoArrayMap, &orgTbInfo->vgId, sizeof(orgTbInfo->vgId), &pPendingArray, POINTER_BYTES);
        QUERY_CHECK_CODE(code, line, _return);
        pPendingArray = NULL;  // now owned by the per-vgId map
      } else {
        SArray* pOrgTbInfoArray = *(SArray**)pIter;
        QUERY_CHECK_NULL(pOrgTbInfoArray, code, line, _return, terrno)
        QUERY_CHECK_NULL(taosArrayPush(pOrgTbInfoArray, orgTbInfo), code, line, _return, terrno)
      }

      // advance first, then drop the entry that was just consumed
      pOrgTbInfo = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, pOrgTbInfo);

      code = taosHashRemove(pVtbScan->otbNameToOtbInfoMap, key, len);
      QUERY_CHECK_CODE(code, line, _return);
    }
  }
  return code;
_return:
  if (pOrgTbInfo != NULL) {
    // leaving the iteration early: release the iterator
    taosHashCancelIterate(pVtbScan->otbNameToOtbInfoMap, pOrgTbInfo);
  }
  taosArrayDestroy(pPendingArray);
  taosHashCleanup(pPendingMap);  // still empty when it is pending
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  return code;
}
2318

2319
/*
 * Drain the system-table scan operator and group the column reference rows
 * of the target virtual super table by child table.
 *
 * Results:
 *   - pVtbScan->childTableList : one SArray<SColRefInfo> per child table
 *   - pVtbScan->childTableMap  : child table name -> index into childTableList
 *
 * The system-table scan is pDownstream[0] for DYN_QTYPE_VTB_AGG (which also
 * creates otbNameToOtbInfoMap) and pDownstream[1] otherwise.
 *
 * For DYN_QTYPE_VTB_SCAN:
 *   - rows whose uid differs from a non-zero dynTbUid are skipped;
 *   - when the task has stream runtime info, the vgId of every referenced
 *     column is recorded in pVtbScan->curOrgTbVg.
 *
 * After the scan, the query type selects the follow-up step
 * (buildOrgTbInfoBatch / buildOrgTbInfoSingle / processOrgTbVg); any other
 * query type yields TSDB_CODE_PLAN_INVALID_DYN_CTRL_TYPE.
 *
 * Returns TSDB_CODE_SUCCESS or the first error code encountered.
 */
int32_t buildVirtualSuperTableScanChildTableMap(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SArray*                    pColRefArray = NULL;
  SOperatorInfo*             pSystableScanOp = NULL;

  pVtbScan->childTableMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  QUERY_CHECK_NULL(pVtbScan->childTableMap, code, line, _return, terrno)

  if (pInfo->qType == DYN_QTYPE_VTB_AGG) {
    pVtbScan->otbNameToOtbInfoMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
    QUERY_CHECK_NULL(pVtbScan->otbNameToOtbInfoMap, code, line, _return, terrno)
    pSystableScanOp = pOperator->pDownstream[0];
  } else {
    pSystableScanOp = pOperator->pDownstream[1];
  }

  while (true) {
    SSDataBlock* pChildInfo = NULL;
    code = pSystableScanOp->fpSet.getNextFn(pSystableScanOp, &pChildInfo);
    QUERY_CHECK_CODE(code, line, _return);
    if (pChildInfo == NULL) {
      break;
    }
    SColumnInfoData* pTableNameCol = taosArrayGet(pChildInfo->pDataBlock, 0);
    SColumnInfoData* pStbNameCol = taosArrayGet(pChildInfo->pDataBlock, 1);
    SColumnInfoData* pDbNameCol = taosArrayGet(pChildInfo->pDataBlock, 2);

    QUERY_CHECK_NULL(pTableNameCol, code, line, _return, terrno)
    QUERY_CHECK_NULL(pStbNameCol, code, line, _return, terrno)
    QUERY_CHECK_NULL(pDbNameCol, code, line, _return, terrno)

    for (int32_t i = 0; i < pChildInfo->info.rows; i++) {
      if (colDataIsNull_s(pStbNameCol, i)) {
        continue;
      }

      char* stbrawname = colDataGetData(pStbNameCol, i);
      char* dbrawname = colDataGetData(pDbNameCol, i);
      char* ctbName = colDataGetData(pTableNameCol, i);

      if (!tableInfoNeedCollect(dbrawname, stbrawname, pInfo->vtbScan.dbName, pInfo->vtbScan.tbName)) {
        continue;
      }

      // NOTE(review): `info` is not released on the error exits below before
      // it has been pushed into an array - confirm whether that needs cleanup.
      SColRefInfo info = {0};
      code = getColRefInfo(&info, pChildInfo->pDataBlock, i);
      QUERY_CHECK_CODE(code, line, _return);

      if (pInfo->qType == DYN_QTYPE_VTB_SCAN) {
        if (pInfo->vtbScan.dynTbUid != 0 && info.uid != pInfo->vtbScan.dynTbUid) {
          qTrace("dynQueryCtrl tb uid filter, info uid:%" PRIu64 ", dyn tb uid:%" PRIu64, info.uid,
                 pInfo->vtbScan.dynTbUid);
          destroyColRefInfo(&info);
          continue;
        }

        if (pTaskInfo->pStreamRuntimeInfo) {
          if (pVtbScan->curOrgTbVg == NULL) {
            pVtbScan->curOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
            QUERY_CHECK_NULL(pVtbScan->curOrgTbVg, code, line, _return, terrno)
          }

          if (info.colrefName) {
            int32_t vgId = 0;
            code = getVgIdFromColref(pOperator, info.colrefName, &vgId);
            QUERY_CHECK_CODE(code, line, _return);
            // key-only put: curOrgTbVg is used as a set of vgIds
            code = taosHashPut(pVtbScan->curOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
            QUERY_CHECK_CODE(code, line, _return);
          }
        }
      }

      // One lookup decides whether this child table already has a slot.
      int32_t* pTableIdx = (int32_t*)taosHashGet(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName));
      if (pTableIdx == NULL) {
        // first row of this child table: append a new array and index it by name
        pColRefArray = taosArrayInit(1, sizeof(SColRefInfo));
        QUERY_CHECK_NULL(pColRefArray, code, line, _return, terrno)
        QUERY_CHECK_NULL(taosArrayPush(pColRefArray, &info), code, line, _return, terrno)
        int32_t tableIdx = (int32_t)taosArrayGetSize(pVtbScan->childTableList);
        QUERY_CHECK_NULL(taosArrayPush(pVtbScan->childTableList, &pColRefArray), code, line, _return, terrno)
        code = taosHashPut(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName), &tableIdx, sizeof(tableIdx));
        QUERY_CHECK_CODE(code, line, _return);
      } else {
        pColRefArray = (SArray*)taosArrayGetP(pVtbScan->childTableList, *pTableIdx);
        QUERY_CHECK_NULL(pColRefArray, code, line, _return, terrno)
        QUERY_CHECK_NULL(taosArrayPush(pColRefArray, &info), code, line, _return, terrno)
      }
    }
  }

  switch (pInfo->qType) {
    case DYN_QTYPE_VTB_AGG: {
      if (pVtbScan->batchProcessChild) {
        code = buildOrgTbInfoBatch(pOperator, pVtbScan->hasPartition);
      } else {
        code = buildOrgTbInfoSingle(pOperator, pVtbScan->hasPartition);
      }
      break;
    }
    case DYN_QTYPE_VTB_SCAN: {
      code = processOrgTbVg(pVtbScan, pTaskInfo, 1);
      break;
    }
    default: {
      code = TSDB_CODE_PLAN_INVALID_DYN_CTRL_TYPE;
      break;
    }
  }

  QUERY_CHECK_CODE(code, line, _return);

_return:
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
2436

2437
/*
 * Drain the system-table scan operator (pDownstream[1]) and collect the
 * column reference rows of the target virtual normal/child table into a new
 * pInfo->vtbScan.colRefInfo array.
 *
 * Columns read from each block: 0 = table name, 2 = db name, 8 = reference
 * version. The last non-NULL reference version seen is passed to
 * processOrgTbVg().
 *
 * When the reference version differs from pVtbScan->rversion, or no
 * existOrgTbVg set exists yet, the vgId of every referenced column is
 * recorded in pVtbScan->curOrgTbVg.
 *
 * An error reported by the downstream getNextFn aborts the function
 * immediately, instead of being overwritten by the later processOrgTbVg()
 * call.
 *
 * Returns TSDB_CODE_SUCCESS or the first error code encountered.
 */
int32_t buildVirtualNormalChildTableScanChildTableMap(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[1];
  int32_t                    rversion = 0;

  // NOTE(review): any array previously stored in colRefInfo is overwritten
  // here without being released - confirm the caller's ownership rules.
  pInfo->vtbScan.colRefInfo = taosArrayInit(1, sizeof(SColRefInfo));
  QUERY_CHECK_NULL(pInfo->vtbScan.colRefInfo, code, line, _return, terrno)

  while (true) {
    SSDataBlock* pTableInfo = NULL;
    code = pSystableScanOp->fpSet.getNextFn(pSystableScanOp, &pTableInfo);
    QUERY_CHECK_CODE(code, line, _return);
    if (pTableInfo == NULL) {
      break;
    }

    SColumnInfoData* pTableNameCol = taosArrayGet(pTableInfo->pDataBlock, 0);
    SColumnInfoData* pDbNameCol = taosArrayGet(pTableInfo->pDataBlock, 2);
    SColumnInfoData* pRefVerCol = taosArrayGet(pTableInfo->pDataBlock, 8);

    QUERY_CHECK_NULL(pTableNameCol, code, line, _return, terrno)
    QUERY_CHECK_NULL(pDbNameCol, code, line, _return, terrno)
    QUERY_CHECK_NULL(pRefVerCol, code, line, _return, terrno)

    for (int32_t i = 0; i < pTableInfo->info.rows; i++) {
      if (!colDataIsNull_s(pRefVerCol, i)) {
        GET_TYPED_DATA(rversion, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pRefVerCol, i), 0);
      }

      if (colDataIsNull_s(pTableNameCol, i)) {
        continue;
      }

      char* tbrawname = colDataGetData(pTableNameCol, i);
      char* dbrawname = colDataGetData(pDbNameCol, i);
      QUERY_CHECK_NULL(tbrawname, code, line, _return, terrno)
      QUERY_CHECK_NULL(dbrawname, code, line, _return, terrno)

      if (!tableInfoNeedCollect(dbrawname, tbrawname, pInfo->vtbScan.dbName, pInfo->vtbScan.tbName)) {
        continue;
      }

      SColRefInfo info = {0};
      code = getColRefInfo(&info, pTableInfo->pDataBlock, i);
      QUERY_CHECK_CODE(code, line, _return);

      if ((rversion != pVtbScan->rversion || pVtbScan->existOrgTbVg == NULL) && info.colrefName) {
        if (pVtbScan->curOrgTbVg == NULL) {
          pVtbScan->curOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
          QUERY_CHECK_NULL(pVtbScan->curOrgTbVg, code, line, _return, terrno)
        }
        int32_t vgId = 0;
        code = getVgIdFromColref(pOperator, info.colrefName, &vgId);
        QUERY_CHECK_CODE(code, line, _return);
        // key-only put: curOrgTbVg is used as a set of vgIds
        code = taosHashPut(pVtbScan->curOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
        QUERY_CHECK_CODE(code, line, _return);
      }

      QUERY_CHECK_NULL(taosArrayPush(pInfo->vtbScan.colRefInfo, &info), code, line, _return, terrno)
    }
  }
  code = processOrgTbVg(pVtbScan, pTaskInfo, rversion);
  QUERY_CHECK_CODE(code, line, _return);

_return:
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
2507

2508
/*
 * Try to take over the list of newly added vgroups published in
 * pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo.
 *
 * The list is claimed by atomically swapping the shared pointer to NULL.
 * Only an array that was successfully claimed is consumed and destroyed
 * here; if the swap loses a race the pointer is treated as "nothing
 * available", because the array is not known to be ours to free.
 *
 * On a successful claim every SStreamTaskAddr is registered in
 * pVtbScan->existOrgTbVg (key only) and pVtbScan->newAddedVgInfo
 * (nodeId -> address copy), and needRedeploy is cleared.
 *
 * Returns TSDB_CODE_SUCCESS when a list was consumed,
 * TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY when none was available, or the
 * first error code of a failing hash operation.
 */
int32_t virtualTableScanCheckNeedRedeploy(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;

  SArray* tmpArray = atomic_load_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo);
  if (tmpArray != NULL &&
      tmpArray != atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo, tmpArray, NULL)) {
    // Lost the race: the array was not claimed, so it must not be freed below.
    tmpArray = NULL;
  }

  if (tmpArray == NULL) {
    code = TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY;
    QUERY_CHECK_CODE(code, line, _return);
  }

  for (int32_t i = 0; i < taosArrayGetSize(tmpArray); i++) {
    SStreamTaskAddr* pTaskAddr = (SStreamTaskAddr*)taosArrayGet(tmpArray, i);
    QUERY_CHECK_NULL(pTaskAddr, code, line, _return, terrno)
    code = taosHashPut(pVtbScan->existOrgTbVg, &pTaskAddr->nodeId, sizeof(pTaskAddr->nodeId), NULL, 0);
    QUERY_CHECK_CODE(code, line, _return);
    if (pVtbScan->newAddedVgInfo == NULL) {
      pVtbScan->newAddedVgInfo = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
      QUERY_CHECK_NULL(pVtbScan->newAddedVgInfo, code, line, _return, terrno)
    }
    code = taosHashPut(pVtbScan->newAddedVgInfo, &pTaskAddr->nodeId, sizeof(pTaskAddr->nodeId), pTaskAddr, sizeof(SStreamTaskAddr));
    QUERY_CHECK_CODE(code, line, _return);
  }
  pVtbScan->needRedeploy = false;

_return:
  // tmpArray is either NULL or an array this call owns exclusively
  taosArrayClear(tmpArray);
  taosArrayDestroy(tmpArray);
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
2543

2544
/*
 * Build the operator parameter passed to the downstream virtual table scan
 * for the table identified by uid.
 *
 * Steps:
 *   1. create pVtbScan->vtbScanParam via buildVtbScanOperatorParam();
 *   2. for every SOrgTbInfo in pVtbScan->otbNameToOtbInfoMap, create an
 *      exchange parameter and append it to the scan parameter's
 *      pOpParamArray. If the entry's vgId is present in
 *      pVtbScan->newAddedVgInfo, the "Ex" builder is used with that address
 *      and the vgId is then removed from newAddedVgInfo;
 *   3. create the tag-scan exchange parameter for (vgId, uid) and store it in
 *      the scan parameter's pTagScanOp.
 *
 * If a step fails while the hash iteration is still in progress, the
 * iterator is cancelled before returning.
 *
 * Returns TSDB_CODE_SUCCESS or the first error code encountered.
 */
int32_t virtualTableScanBuildDownStreamOpParam(SOperatorInfo* pOperator, tb_uid_t uid, int32_t vgId) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  void*                      pIter = NULL;  // non-NULL only while the iteration is in progress
  SOperatorParam*            pTagExchangeParam = NULL;

  pVtbScan->vtbScanParam = NULL;
  code = buildVtbScanOperatorParam(pInfo, &pVtbScan->vtbScanParam, uid);
  QUERY_CHECK_CODE(code, line, _return);

  pIter = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, NULL);
  while (pIter != NULL) {
    SOrgTbInfo*      pMap = (SOrgTbInfo*)pIter;
    SOperatorParam*  pExchangeParam = NULL;
    SStreamTaskAddr* addr = taosHashGet(pVtbScan->newAddedVgInfo, &pMap->vgId, sizeof(pMap->vgId));
    if (addr != NULL) {
      code = buildExchangeOperatorParamForVScanEx(&pExchangeParam, 0, pMap, pTaskInfo->id.taskId, addr);
      QUERY_CHECK_CODE(code, line, _return);
      code = taosHashRemove(pVtbScan->newAddedVgInfo, &pMap->vgId, sizeof(pMap->vgId));
      QUERY_CHECK_CODE(code, line, _return);
    } else {
      code = buildExchangeOperatorParamForVScan(&pExchangeParam, 0, pMap);
      QUERY_CHECK_CODE(code, line, _return);
    }
    QUERY_CHECK_NULL(taosArrayPush(((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pOpParamArray, &pExchangeParam), code, line, _return, terrno)
    pIter = taosHashIterate(pVtbScan->otbNameToOtbInfoMap, pIter);
  }

  code = buildExchangeOperatorParamForVTagScan(&pTagExchangeParam, 0, vgId, uid);
  QUERY_CHECK_CODE(code, line, _return);
  ((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pTagScanOp = pTagExchangeParam;

_return:
  if (code) {
    if (pIter != NULL) {
      // leaving the iteration early: release the iterator
      taosHashCancelIterate(pVtbScan->otbNameToOtbInfoMap, pIter);
    }
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
2584

2585
/*
 * Fetch the next result block from the downstream virtual table scan
 * (pDownstream[0]), moving on to the next table whenever the current one
 * yields no block.
 *
 * curTableIdx == lastTableIdx means the current table has already been
 * started, so the downstream is simply asked for more data. Otherwise new
 * downstream parameters are built for the table at curTableIdx and the
 * downstream is restarted with them.
 *
 * A temporary otbNameToOtbInfoMap is created for the duration of the call
 * and destroyed before returning.
 *
 * *pRes receives the block, or stays empty once the operator has been marked
 * completed. Returns TSDB_CODE_SUCCESS or the first error code encountered.
 */
int32_t virtualTableScanGetNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    line = 0;
  int32_t                    code = TSDB_CODE_SUCCESS;
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SVtbScanDynCtrlInfo*       pScan = (SVtbScanDynCtrlInfo*)&pCtrl->vtbScan;
  SOperatorInfo*             pDownstream = pOperator->pDownstream[0];

  pScan->otbNameToOtbInfoMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  QUERY_CHECK_NULL(pScan->otbNameToOtbInfoMap, code, line, _return, terrno)
  taosHashSetFreeFp(pScan->otbNameToOtbInfoMap, destroySOrgTbInfo);

  for (;;) {
    if (pScan->curTableIdx != pScan->lastTableIdx) {
      // switching tables: start from an empty name map
      taosHashClear(pScan->otbNameToOtbInfoMap);

      SArray* pRefs = NULL;
      if (!pScan->isSuperTable) {
        pRefs = pCtrl->vtbScan.colRefInfo;
      } else {
        pRefs = (SArray*)taosArrayGetP(pScan->childTableList, pScan->curTableIdx);
      }
      QUERY_CHECK_NULL(pRefs, code, line, _return, terrno)

      tb_uid_t tbUid = 0;
      int32_t  tbVgId = 0;
      code = virtualTableScanProcessColRefInfo(pOperator, pRefs, &tbUid, &tbVgId);
      QUERY_CHECK_CODE(code, line, _return);

      code = virtualTableScanBuildDownStreamOpParam(pOperator, tbUid, tbVgId);
      QUERY_CHECK_CODE(code, line, _return);

      // the downstream must reopen with the new parameters
      pDownstream->status = OP_NOT_OPENED;
      code = pDownstream->fpSet.getNextExtFn(pDownstream, pScan->vtbScanParam, pRes);
      QUERY_CHECK_CODE(code, line, _return);
    } else {
      code = pDownstream->fpSet.getNextFn(pDownstream, pRes);
      QUERY_CHECK_CODE(code, line, _return);
    }

    if (*pRes) {
      // got data: keep reading from this table on the next call
      pScan->lastTableIdx = pScan->curTableIdx;
      break;
    }

    // table exhausted: advance, finishing when no table is left
    pScan->curTableIdx++;
    if (!pScan->isSuperTable || pScan->curTableIdx >= taosArrayGetSize(pScan->childTableList)) {
      setOperatorCompleted(pOperator);
      break;
    }
  }

_return:
  taosHashCleanup(pScan->otbNameToOtbInfoMap);
  pScan->otbNameToOtbInfoMap = NULL;
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
2651

2652
/*
 * Open the virtual table scan controller: build the child table map for a
 * super table, or the column reference list for a normal/child table, then
 * mark the operator as opened. Does nothing if the operator is already open.
 *
 * The open cost (in milliseconds) is recorded only while cost.openCost is
 * still 0.
 *
 * On failure the error is logged, stored in the task info, and control
 * leaves through T_LONG_JMP.
 */
int32_t vtbScanOpen(SOperatorInfo* pOperator) {
  int32_t                    line = 0;
  int32_t                    code = TSDB_CODE_SUCCESS;
  int64_t                    startUs = 0;
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SVtbScanDynCtrlInfo*       pScan = (SVtbScanDynCtrlInfo*)&pCtrl->vtbScan;

  if (OPTR_IS_OPENED(pOperator)) {
    return code;
  }

  if (pOperator->cost.openCost == 0) {
    startUs = taosGetTimestampUs();
  }

  if (!pScan->isSuperTable) {
    code = buildVirtualNormalChildTableScanChildTableMap(pOperator);
  } else {
    code = buildVirtualSuperTableScanChildTableMap(pOperator);
  }
  QUERY_CHECK_CODE(code, line, _return);

  OPTR_SET_OPENED(pOperator);

_return:
  if (pOperator->cost.openCost == 0) {
    int64_t elapsedUs = taosGetTimestampUs() - startUs;
    pOperator->cost.openCost = (double)elapsedUs / 1000.0;
  }
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }
  return code;
}
2688

2689
/*
 * "Get next block" entry point of the virtual table scan controller.
 *
 * - Without a pending get-param, a finished operator returns immediately with
 *   no block; otherwise the scan window is set to (INT64_MAX, INT64_MIN).
 * - With a pending get-param, a finished operator is re-armed, the table
 *   cursor is rewound, the window is taken from the parameter, and the
 *   parameter is consumed.
 * - If a redeploy check is pending it runs first, then the operator is opened
 *   through fpSet._openFn.
 * - A super table without child tables completes the operator at once.
 *
 * On failure the error is logged, stored in the task info, and control
 * leaves through T_LONG_JMP.
 */
int32_t vtbScanNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    line = 0;
  int32_t                    code = TSDB_CODE_SUCCESS;
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SVtbScanDynCtrlInfo*       pScan = (SVtbScanDynCtrlInfo*)&pCtrl->vtbScan;

  QRY_PARAM_CHECK(pRes);

  if (!pOperator->pOperatorGetParam) {
    if (pOperator->status == OP_EXEC_DONE) {
      return code;
    }
    pScan->window.skey = INT64_MAX;
    pScan->window.ekey = INT64_MIN;
  } else {
    if (pOperator->status == OP_EXEC_DONE) {
      pOperator->status = OP_OPENED;
    }
    pScan->curTableIdx = 0;
    pScan->lastTableIdx = -1;
    pScan->window = ((SDynQueryCtrlOperatorParam *)(pOperator->pOperatorGetParam)->value)->window;
    pOperator->pOperatorGetParam = NULL;
  }

  if (pScan->needRedeploy) {
    code = virtualTableScanCheckNeedRedeploy(pOperator);
    QUERY_CHECK_CODE(code, line, _return);
  }

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, line, _return);

  if (pScan->isSuperTable && taosArrayGetSize(pScan->childTableList) == 0) {
    // nothing to scan
    setOperatorCompleted(pOperator);
    return code;
  }

  code = virtualTableScanGetNext(pOperator, pRes);
  QUERY_CHECK_CODE(code, line, _return);

  return code;

_return:
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }
  return code;
}
2738

2739
/*
 * Create the hash tables needed by the sequential super table join context.
 *
 * batchFetch == true : leftHash and rightHash (INT hash function).
 * batchFetch == false: leftCache, rightCache and onceTable (BIGINT hash
 *                      function).
 *
 * Returns TSDB_CODE_SUCCESS, or terrno as soon as one table cannot be
 * created; tables created before the failure are left in pPrev.
 */
int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) {
  if (!batchFetch) {
    pPrev->leftCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pPrev->leftCache == NULL) {
      return terrno;
    }

    pPrev->rightCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pPrev->rightCache == NULL) {
      return terrno;
    }

    pPrev->onceTable = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pPrev->onceTable == NULL) {
      return terrno;
    }

    return TSDB_CODE_SUCCESS;
  }

  pPrev->leftHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  if (pPrev->leftHash == NULL) {
    return terrno;
  }

  pPrev->rightHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  if (pPrev->rightHash == NULL) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
2766

2767
/*
 * Refresh pVtbScan->dynTbUid from the stream partition column values.
 *
 * Walks pStreamRuntimeInfo->funcInfo.pStreamPartColVals and, at the first entry that is
 * flagged isTbname and whose uid differs from the current dynTbUid, stores that uid and stops.
 * Does nothing when pStreamRuntimeInfo is NULL or when no such entry exists.
 */
static void updateDynTbUidIfNeeded(SVtbScanDynCtrlInfo* pVtbScan, SStreamRuntimeInfo* pStreamRuntimeInfo) {
  if (NULL == pStreamRuntimeInfo) {
    return;
  }

  SArray* pPartVals = pStreamRuntimeInfo->funcInfo.pStreamPartColVals;
  for (int32_t idx = 0; idx < taosArrayGetSize(pPartVals); ++idx) {
    SStreamGroupValue* pEntry = taosArrayGet(pPartVals, idx);
    if (NULL == pEntry || !pEntry->isTbname || pEntry->uid == pVtbScan->dynTbUid) {
      continue;
    }

    qTrace("dynQueryCtrl dyn tb uid:%" PRIu64 " reset to:%" PRIu64, pVtbScan->dynTbUid, pEntry->uid);

    // only the first differing tbname entry is applied
    pVtbScan->dynTbUid = pEntry->uid;
    return;
  }
}
2783

2784
static int32_t initVtbScanInfo(SDynQueryCtrlOperatorInfo* pInfo, SMsgCb* pMsgCb,
237,436✔
2785
                               SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) {
2786
  int32_t      code = TSDB_CODE_SUCCESS;
237,436✔
2787
  int32_t      line = 0;
237,436✔
2788

2789
  code = tsem_init(&pInfo->vtbScan.ready, 0, 0);
237,436✔
2790
  QUERY_CHECK_CODE(code, line, _return);
237,436✔
2791

2792
  pInfo->vtbScan.genNewParam = true;
237,436✔
2793
  pInfo->vtbScan.batchProcessChild = pPhyciNode->vtbScan.batchProcessChild;
237,436✔
2794
  pInfo->vtbScan.hasPartition = pPhyciNode->vtbScan.hasPartition;
237,436✔
2795
  pInfo->vtbScan.scanAllCols = pPhyciNode->vtbScan.scanAllCols;
237,436✔
2796
  pInfo->vtbScan.isSuperTable = pPhyciNode->vtbScan.isSuperTable;
237,436✔
2797
  pInfo->vtbScan.rversion = pPhyciNode->vtbScan.rversion;
237,436✔
2798
  pInfo->vtbScan.uid = pPhyciNode->vtbScan.uid;
237,436✔
2799
  pInfo->vtbScan.suid = pPhyciNode->vtbScan.suid;
237,436✔
2800
  pInfo->vtbScan.epSet = pPhyciNode->vtbScan.mgmtEpSet;
237,436✔
2801
  pInfo->vtbScan.acctId = pPhyciNode->vtbScan.accountId;
237,436✔
2802
  pInfo->vtbScan.needRedeploy = false;
237,436✔
2803
  pInfo->vtbScan.pMsgCb = pMsgCb;
237,436✔
2804
  pInfo->vtbScan.curTableIdx = 0;
237,436✔
2805
  pInfo->vtbScan.lastTableIdx = -1;
237,436✔
2806
  pInfo->vtbScan.dynTbUid = 0;
237,436✔
2807
  pInfo->vtbScan.dbName = taosStrdup(pPhyciNode->vtbScan.dbName);
237,436✔
2808
  pInfo->vtbScan.tbName = taosStrdup(pPhyciNode->vtbScan.tbName);
237,436✔
2809
  QUERY_CHECK_NULL(pInfo->vtbScan.dbName, code, line, _return, terrno)
237,436✔
2810
  QUERY_CHECK_NULL(pInfo->vtbScan.tbName, code, line, _return, terrno)
237,436✔
2811
  pInfo->vtbScan.existOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
237,436✔
2812
  QUERY_CHECK_NULL(pInfo->vtbScan.existOrgTbVg, code, line, _return, terrno)
237,436✔
2813
  for (int32_t i = 0; i < LIST_LENGTH(pPhyciNode->vtbScan.pOrgVgIds); ++i) {
670,491✔
2814
    SValueNode* valueNode = (SValueNode*)nodesListGetNode(pPhyciNode->vtbScan.pOrgVgIds, i);
433,055✔
2815
    int32_t vgId = (int32_t)valueNode->datum.i;
433,055✔
2816
    code = taosHashPut(pInfo->vtbScan.existOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
433,055✔
2817
    QUERY_CHECK_CODE(code, line, _return);
433,055✔
2818
  }
2819

2820
  if (pPhyciNode->dynTbname && pTaskInfo) {
237,436✔
2821
    updateDynTbUidIfNeeded(&pInfo->vtbScan, pTaskInfo->pStreamRuntimeInfo);
×
2822
  }
2823

2824
  pInfo->vtbScan.readColList = taosArrayInit(LIST_LENGTH(pPhyciNode->vtbScan.pScanCols), sizeof(col_id_t));
237,436✔
2825
  QUERY_CHECK_NULL(pInfo->vtbScan.readColList, code, line, _return, terrno)
237,436✔
2826

2827
  for (int32_t i = 0; i < LIST_LENGTH(pPhyciNode->vtbScan.pScanCols); ++i) {
1,635,174✔
2828
    SColumnNode* pNode = (SColumnNode*)nodesListGetNode(pPhyciNode->vtbScan.pScanCols, i);
1,397,738✔
2829
    QUERY_CHECK_NULL(pNode, code, line, _return, terrno)
1,397,738✔
2830
    QUERY_CHECK_NULL(taosArrayPush(pInfo->vtbScan.readColList, &pNode->colId), code, line, _return, terrno)
2,795,476✔
2831
  }
2832

2833
  pInfo->vtbScan.childTableList = taosArrayInit(10, POINTER_BYTES);
237,436✔
2834
  QUERY_CHECK_NULL(pInfo->vtbScan.childTableList, code, line, _return, terrno)
237,436✔
2835

2836
  pInfo->vtbScan.dbVgInfoMap = taosHashInit(taosArrayGetSize(pInfo->vtbScan.childTableList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
237,436✔
2837
  QUERY_CHECK_NULL(pInfo->vtbScan.dbVgInfoMap, code, line, _return, terrno)
237,436✔
2838

2839
  pInfo->vtbScan.otbNameToOtbInfoMap = NULL;
237,436✔
2840
  pInfo->vtbScan.otbVgIdToOtbInfoArrayMap = NULL;
237,436✔
2841
  pInfo->vtbScan.vtbUidToVgIdMapMap = NULL;
237,436✔
2842
  pInfo->vtbScan.vtbGroupIdToVgIdMapMap = NULL;
237,436✔
2843
  pInfo->vtbScan.vtbUidTagListMap = NULL;
237,436✔
2844
  pInfo->vtbScan.vtbGroupIdTagListMap = NULL;
237,436✔
2845
  pInfo->vtbScan.vtbUidToGroupIdMap = NULL;
237,436✔
2846

2847
  return code;
237,436✔
UNCOV
2848
_return:
×
2849
  // no need to destroy array and hashmap allocated in this function,
2850
  // since the operator's destroy function will take care of it
UNCOV
2851
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
UNCOV
2852
  return code;
×
2853
}
2854

UNCOV
2855
static int32_t initVtbWindowInfo(SDynQueryCtrlOperatorInfo* pInfo, SDynQueryCtrlPhysiNode* pPhyciNode,
×
2856
                                 SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) {
UNCOV
2857
  int32_t              code = TSDB_CODE_SUCCESS;
×
2858
  int32_t              line = 0;
×
2859
  SDataBlockDescNode*  pDescNode = pPhyciNode->node.pOutputDataBlockDesc;
×
2860

UNCOV
2861
  pInfo->vtbWindow.wstartSlotId = pPhyciNode->vtbWindow.wstartSlotId;
×
UNCOV
2862
  pInfo->vtbWindow.wendSlotId = pPhyciNode->vtbWindow.wendSlotId;
×
UNCOV
2863
  pInfo->vtbWindow.wdurationSlotId = pPhyciNode->vtbWindow.wdurationSlotId;
×
UNCOV
2864
  pInfo->vtbWindow.pTargets = pPhyciNode->vtbWindow.pTargets;
×
UNCOV
2865
  pInfo->vtbWindow.isVstb = pPhyciNode->vtbWindow.isVstb;
×
UNCOV
2866
  pInfo->vtbWindow.singleWinMode = pPhyciNode->vtbWindow.singleWinMode;
×
UNCOV
2867
  pInfo->vtbWindow.extendOption = pPhyciNode->vtbWindow.extendOption;
×
2868

UNCOV
2869
  pInfo->vtbWindow.pRes = createDataBlockFromDescNode(pDescNode);
×
UNCOV
2870
  QUERY_CHECK_NULL(pInfo->vtbWindow.pRes, code, line, _return, terrno)
×
2871

UNCOV
2872
  pInfo->vtbWindow.pWins = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
×
2873
  QUERY_CHECK_NULL(pInfo->vtbWindow.pWins, code, line, _return, terrno)
×
2874

2875
  pInfo->vtbWindow.outputWstartSlotId = -1;
×
UNCOV
2876
  pInfo->vtbWindow.outputWendSlotId = -1;
×
UNCOV
2877
  pInfo->vtbWindow.outputWdurationSlotId = -1;
×
2878
  pInfo->vtbWindow.curWinBatchIdx = 0;
×
2879

UNCOV
2880
  initResultSizeInfo(&pOperator->resultInfo, 1);
×
UNCOV
2881
  code = blockDataEnsureCapacity(pInfo->vtbWindow.pRes, pOperator->resultInfo.capacity);
×
UNCOV
2882
  QUERY_CHECK_CODE(code, line, _return);
×
2883

UNCOV
2884
  return code;
×
UNCOV
2885
_return:
×
UNCOV
2886
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
UNCOV
2887
  return code;
×
2888
}
2889

UNCOV
2890
/*
 * Expose the timestamp column stored at `slotId` of `pBlock` as a TSKEY array.
 *
 * When the block has a column array and its data is loaded, *ppTsCols is pointed at the
 * column's raw data buffer. If the first timestamp is non-zero while the block window is
 * still {0, 0}, the block's time window is refreshed via blockDataUpdateTsWindow().
 *
 * When the block has no column array or is not loaded, *ppTsCols is left untouched, so
 * callers must initialise it (e.g. to NULL) before the call and check it afterwards.
 *
 * Returns TSDB_CODE_SUCCESS, terrno if the slot cannot be fetched, or the error returned by
 * blockDataUpdateTsWindow().
 */
static int32_t extractTsCol(SSDataBlock* pBlock, int32_t slotId, TSKEY** ppTsCols) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (pBlock->pDataBlock != NULL && pBlock->info.dataLoad) {
    SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, slotId);
    QUERY_CHECK_NULL(pColDataInfo, code, lino, _return, terrno)

    *ppTsCols = (int64_t*)pColDataInfo->pData;

    // a column without a data buffer has no first timestamp to inspect
    if (*ppTsCols != NULL && (*ppTsCols)[0] != 0 &&
        (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0)) {
      code = blockDataUpdateTsWindow(pBlock, slotId);
      QUERY_CHECK_CODE(code, lino, _return);
    }
  }

  return code;
_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  return code;
}
2911

2912
/*
 * Build the operator param handed to one external-window downstream operator.
 *
 * The resulting param (returned through *ppRes) has:
 *   - opType        = QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW
 *   - downstreamIdx = idx
 *   - value         = a new SExternalWindowOperatorParam holding a copy of pWins
 *   - pChildren     = one exchange param covering [first window skey, last window ekey]
 *
 * pWins must contain at least one SExtWinTimeWindow; an empty list is reported as an error
 * (using terrno, like the other element lookups in this file) instead of being dereferenced.
 * pInfo is not used by this function.
 *
 * On failure everything allocated here is released, *ppRes is set to NULL and the error
 * code is returned.
 * NOTE(review): if the final taosArrayPush fails, the exchange param returned by
 * buildExchangeOperatorParamForExternalWindow is not released here because its layout is
 * not visible from this function — confirm and release it with the proper helper.
 */
static int32_t buildExternalWindowOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins, int32_t idx) {
  int32_t                       code = TSDB_CODE_SUCCESS;
  int32_t                       lino = 0;
  SExternalWindowOperatorParam* pExtWinOp = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)

  // The cleanup path inspects pChildren; the freshly allocated block is not assumed to be
  // zeroed, so give the pointer members a defined value before anything can fail.
  (*ppRes)->pChildren = NULL;
  (*ppRes)->value = NULL;

  pExtWinOp = taosMemoryMalloc(sizeof(SExternalWindowOperatorParam));
  QUERY_CHECK_NULL(pExtWinOp, code, lino, _return, terrno)

  pExtWinOp->ExtWins = taosArrayDup(pWins, NULL);
  QUERY_CHECK_NULL(pExtWinOp->ExtWins, code, lino, _return, terrno)

  // the exchange below is restricted to the overall range spanned by the windows
  SExtWinTimeWindow *firstWin = (SExtWinTimeWindow *)taosArrayGet(pWins, 0);
  QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
  SExtWinTimeWindow *lastWin = (SExtWinTimeWindow *)taosArrayGet(pWins, taosArrayGetSize(pWins) - 1);
  QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)

  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)

  SOperatorParam* pExchangeOperator = NULL;
  code = buildExchangeOperatorParamForExternalWindow(&pExchangeOperator, 0, firstWin->tw.skey, lastWin->tw.ekey);
  QUERY_CHECK_CODE(code, lino, _return);
  QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pExchangeOperator), code, lino, _return, terrno)

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW;
  (*ppRes)->downstreamIdx = idx;
  (*ppRes)->value = pExtWinOp;
  (*ppRes)->reUse = false;

  return code;
_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pExtWinOp) {
    if (pExtWinOp->ExtWins) {
      taosArrayDestroy(pExtWinOp->ExtWins);
    }
    taosMemoryFree(pExtWinOp);
  }
  if (*ppRes) {
    if ((*ppRes)->pChildren) {
      taosArrayDestroy((*ppRes)->pChildren);
    }
    taosMemoryFree(*ppRes);
    *ppRes = NULL;
  }
  return code;
}
2960

UNCOV
2961
/*
 * Release one external-window child param built by buildExternalWindowOperatorParam():
 * its SExternalWindowOperatorParam value (including the copied window list) and the param
 * itself. NULL is accepted.
 * NOTE(review): the child's own pChildren array (holding the exchange param) is not
 * released here, matching the previous cleanup code — confirm the intended owner.
 */
static void freeMergeChildExtWinParam(SOperatorParam* pChildParam) {
  if (pChildParam == NULL) {
    return;
  }

  SExternalWindowOperatorParam* pExtWinOp = (SExternalWindowOperatorParam*)pChildParam->value;
  if (pExtWinOp) {
    if (pExtWinOp->ExtWins) {
      taosArrayDestroy(pExtWinOp->ExtWins);
    }
    taosMemoryFree(pExtWinOp);
  }
  taosMemoryFree(pChildParam);
}

/*
 * Build the operator param for the merge operator that sits above the external-window
 * operators.
 *
 * The resulting param (returned through *ppRes) has:
 *   - opType        = QUERY_NODE_PHYSICAL_PLAN_MERGE
 *   - downstreamIdx = 0
 *   - value         = a new SMergeOperatorParam with winNum = numOfWins
 *   - pChildren     = numOfDownstream external-window params, child i built for
 *                     downstream index i from the same window list pWins
 *
 * On failure everything built so far is released, *ppRes is set to NULL and the error code
 * is returned.
 */
static int32_t buildMergeOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins,
                                       int32_t numOfDownstream, int32_t numOfWins) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SMergeOperatorParam*      pMergeOp = NULL;
  SOperatorParam*           pPendingChild = NULL;  // child built but not yet stored in pChildren

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)

  (*ppRes)->pChildren = taosArrayInit(numOfDownstream, POINTER_BYTES);
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)

  pMergeOp = taosMemoryMalloc(sizeof(SMergeOperatorParam));
  QUERY_CHECK_NULL(pMergeOp, code, lino, _return, terrno)

  pMergeOp->winNum = numOfWins;

  for (int32_t i = 0; i < numOfDownstream; i++) {
    code = buildExternalWindowOperatorParam(pInfo, &pPendingChild, pWins, i);
    QUERY_CHECK_CODE(code, lino, _return);
    QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pPendingChild), code, lino, _return, terrno)
    // the array holds the pointer now
    pPendingChild = NULL;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->value = pMergeOp;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;
_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pMergeOp) {
    taosMemoryFree(pMergeOp);
  }
  // a child that was built but could not be pushed is still owned by this function
  freeMergeChildExtWinParam(pPendingChild);
  if (*ppRes) {
    if ((*ppRes)->pChildren) {
      for (int32_t i = 0; i < taosArrayGetSize((*ppRes)->pChildren); i++) {
        // pChildren stores SOperatorParam* values, so fetch the stored pointer itself
        // (taosArrayGetP), not the address of the array slot (taosArrayGet)
        SOperatorParam* pChildParam = (SOperatorParam*)taosArrayGetP((*ppRes)->pChildren, i);
        freeMergeChildExtWinParam(pChildParam);
      }
      taosArrayDestroy((*ppRes)->pChildren);
    }
    taosMemoryFree(*ppRes);
    *ppRes = NULL;
  }
  return code;
}
3018

UNCOV
3019
/*
 * Open the virtual-table window operator: drain downstream 0, which produces the window
 * boundaries, and turn its rows into batches of SExtWinTimeWindow stored in pInfo->pWins.
 *
 *   - On the first block the output slot ids of _wstart/_wend/_wduration are resolved by
 *     matching the target columns against the configured slot ids.
 *   - singleWinMode: every row becomes its own one-window batch.
 *     otherwise    : every block becomes one batch with one window per row.
 *   - Each window is [wstart, wend + 1) with winOutIdx = -1.
 *   - Blocks without rows are skipped so that no empty batch is ever stored.
 *   - Depending on extendOption, the very first window start is set to INT64_MIN (FORWARD)
 *     or the very last window end is set to INT64_MAX (BACKWARD). This adjustment is skipped
 *     when downstream produced no window at all; the operator then simply opens with an
 *     empty window list.
 *
 * Returns TSDB_CODE_SUCCESS. On error the code is stored in the task and control leaves
 * through T_LONG_JMP; a batch that was allocated but not yet stored is released first.
 *
 * NOTE(review): extractTsCol leaves the column pointer untouched for a block that is not
 * loaded, in which case wstartCol/wendCol stay NULL here — confirm downstream always
 * delivers loaded blocks.
 */
int32_t vtbWindowOpen(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    lino = 0;
  SDynQueryCtrlOperatorInfo* pDynInfo = pOperator->info;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SVtbWindowDynCtrlInfo*     pInfo = &pDynInfo->vtbWindow;
  int64_t                    st = 0;
  SArray*                    pPendingWin = NULL;  // batch allocated but not yet stored in pInfo->pWins

  if (OPTR_IS_OPENED(pOperator)) {
    return code;
  }

  if (pOperator->cost.openCost == 0) {
    st = taosGetTimestampUs();
  }

  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      break;
    }

    if (pInfo->outputWendSlotId == -1 && pInfo->outputWstartSlotId == -1 && pInfo->outputWdurationSlotId == -1) {
      for (int32_t i = 0; i < LIST_LENGTH(pInfo->pTargets); ++i) {
        STargetNode* pNode = (STargetNode*)nodesListGetNode(pInfo->pTargets, i);
        if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode->pExpr)->dataBlockId == pBlock->info.id.blockId) {
          if (((SColumnNode*)pNode->pExpr)->slotId == pInfo->wstartSlotId) {
            pInfo->outputWstartSlotId = i;
          } else if (((SColumnNode*)pNode->pExpr)->slotId == pInfo->wendSlotId) {
            pInfo->outputWendSlotId = i;
          } else if (((SColumnNode*)pNode->pExpr)->slotId == pInfo->wdurationSlotId) {
            pInfo->outputWdurationSlotId = i;
          }
        }
      }
    }

    // nothing to convert; storing an empty batch would break the consumers of pWins
    if (pBlock->info.rows <= 0) {
      continue;
    }

    TSKEY* wstartCol = NULL;
    TSKEY* wendCol = NULL;

    code = extractTsCol(pBlock, pInfo->wstartSlotId, &wstartCol);
    QUERY_CHECK_CODE(code, lino, _return);
    code = extractTsCol(pBlock, pInfo->wendSlotId, &wendCol);
    QUERY_CHECK_CODE(code, lino, _return);

    if (pInfo->singleWinMode) {
      for (int32_t i = 0; i < pBlock->info.rows; i++) {
        // exactly one window per batch, so a capacity of one is enough
        pPendingWin = taosArrayInit(1, sizeof(SExtWinTimeWindow));
        QUERY_CHECK_NULL(pPendingWin, code, lino, _return, terrno)

        QUERY_CHECK_NULL(taosArrayReserve(pPendingWin, 1), code, lino, _return, terrno);

        SExtWinTimeWindow* pWindow = taosArrayGet(pPendingWin, 0);
        QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
        pWindow->tw.skey = wstartCol[i];
        pWindow->tw.ekey = wendCol[i] + 1;
        pWindow->winOutIdx = -1;

        QUERY_CHECK_NULL(taosArrayPush(pInfo->pWins, &pPendingWin), code, lino, _return, terrno);
        pPendingWin = NULL;  // owned by pInfo->pWins from here on
      }
    } else {
      pPendingWin = taosArrayInit(pBlock->info.rows, sizeof(SExtWinTimeWindow));
      QUERY_CHECK_NULL(pPendingWin, code, lino, _return, terrno)

      QUERY_CHECK_NULL(taosArrayReserve(pPendingWin, pBlock->info.rows), code, lino, _return, terrno);

      for (int32_t i = 0; i < pBlock->info.rows; i++) {
        SExtWinTimeWindow* pWindow = taosArrayGet(pPendingWin, i);
        QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
        pWindow->tw.skey = wstartCol[i];
        pWindow->tw.ekey = wendCol[i] + 1;
        pWindow->winOutIdx = -1;
      }

      QUERY_CHECK_NULL(taosArrayPush(pInfo->pWins, &pPendingWin), code, lino, _return, terrno);
      pPendingWin = NULL;  // owned by pInfo->pWins from here on
    }
  }

  // handle first window's start key and last window's end key
  if (taosArrayGetSize(pInfo->pWins) > 0) {
    SArray* firstBatch = (SArray*)taosArrayGetP(pInfo->pWins, 0);
    SArray* lastBatch = (SArray*)taosArrayGetP(pInfo->pWins, taosArrayGetSize(pInfo->pWins) - 1);

    QUERY_CHECK_NULL(firstBatch, code, lino, _return, terrno)
    QUERY_CHECK_NULL(lastBatch, code, lino, _return, terrno)

    SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(firstBatch, 0);
    SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(lastBatch);

    QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
    QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)

    if (pInfo->extendOption == STATE_WIN_EXTEND_OPTION_BACKWARD) {
      lastWin->tw.ekey = INT64_MAX;
    }
    if (pInfo->extendOption == STATE_WIN_EXTEND_OPTION_FORWARD) {
      firstWin->tw.skey = INT64_MIN;
    }
  }

  OPTR_SET_OPENED(pOperator);

  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
  }

_return:
  if (code != TSDB_CODE_SUCCESS) {
    // control does not come back after the long jump, so release the unstored batch now
    if (pPendingWin) {
      taosArrayDestroy(pPendingWin);
    }
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
3131

UNCOV
3132
/*
 * Build an operator param addressed to a dyn-query-ctrl operator that carries the time range
 * [skey, ekey] of an external window.
 *
 * The resulting param (returned through *ppRes) has:
 *   - opType        = QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL
 *   - downstreamIdx = 0 (the downstreamIdx argument is not used)
 *   - value         = a new SDynQueryCtrlOperatorParam with window = {skey, ekey}
 *   - pChildren     = an empty pointer array
 *
 * On failure everything allocated here is released, *ppRes is set to NULL and terrno is
 * returned.
 */
static int32_t buildDynQueryCtrlOperatorParamForExternalWindow(SOperatorParam** ppRes, int32_t downstreamIdx, int64_t skey, int64_t ekey) {
  int32_t                     code = TSDB_CODE_SUCCESS;
  int32_t                     lino = 0;
  SOperatorParam*             pParam = NULL;
  SDynQueryCtrlOperatorParam* pCtrlValue = NULL;

  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)

  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno)

  pCtrlValue = taosMemoryMalloc(sizeof(SDynQueryCtrlOperatorParam));
  QUERY_CHECK_NULL(pCtrlValue, code, lino, _return, terrno);

  pCtrlValue->window.skey = skey;
  pCtrlValue->window.ekey = ekey;

  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL;
  pParam->downstreamIdx = 0;
  pParam->reUse = false;
  pParam->value = pCtrlValue;

  // publish only the fully built param
  *ppRes = pParam;
  return code;

_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pCtrlValue) {
    taosMemoryFree(pCtrlValue);
  }
  if (pParam) {
    if (pParam->pChildren) {
      taosArrayDestroy(pParam->pChildren);
    }
    taosMemoryFree(pParam);
  }
  *ppRes = NULL;
  return code;
}
3169

UNCOV
3170
/*
 * Release a child param built by buildDynQueryCtrlOperatorParamForExternalWindow(): its
 * SDynQueryCtrlOperatorParam value, its pChildren array and the param itself.
 * NULL is accepted.
 */
static void freeExtWinDynCtrlChildParam(SOperatorParam* pChildParam) {
  if (pChildParam == NULL) {
    return;
  }

  if (pChildParam->value) {
    taosMemoryFree(pChildParam->value);
  }
  if (pChildParam->pChildren) {
    taosArrayDestroy(pChildParam->pChildren);
  }
  taosMemoryFree(pChildParam);
}

/*
 * Build the operator param handed to an external-window operator whose child is a
 * dyn-query-ctrl operator (virtual super table case).
 *
 * The resulting param (returned through *ppRes) has:
 *   - opType        = QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW
 *   - downstreamIdx = idx
 *   - value         = a new SExternalWindowOperatorParam holding a copy of pWins
 *   - pChildren     = one dyn-query-ctrl param covering
 *                     [first window skey, last window ekey]
 *
 * pWins must contain at least one SExtWinTimeWindow; an empty list is reported as an error
 * (using terrno, like the other element lookups in this file) instead of being dereferenced.
 * pInfo is not used by this function.
 *
 * On failure everything allocated here is released, *ppRes is set to NULL and the error
 * code is returned.
 */
static int32_t buildExternalWindowOperatorParamEx(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins, int32_t idx) {
  int32_t                       code = TSDB_CODE_SUCCESS;
  int32_t                       lino = 0;
  SExternalWindowOperatorParam* pExtWinOp = NULL;
  SOperatorParam*               pDynQueryCtrlParam = NULL;  // child built but not yet stored in pChildren

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)

  // The cleanup path inspects pChildren; the freshly allocated block is not assumed to be
  // zeroed, so give the pointer members a defined value before anything can fail.
  (*ppRes)->pChildren = NULL;
  (*ppRes)->value = NULL;

  pExtWinOp = taosMemoryMalloc(sizeof(SExternalWindowOperatorParam));
  QUERY_CHECK_NULL(pExtWinOp, code, lino, _return, terrno)

  pExtWinOp->ExtWins = taosArrayDup(pWins, NULL);
  QUERY_CHECK_NULL(pExtWinOp->ExtWins, code, lino, _return, terrno)

  // the child below is restricted to the overall range spanned by the windows
  SExtWinTimeWindow *firstWin = (SExtWinTimeWindow *)taosArrayGet(pWins, 0);
  QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
  SExtWinTimeWindow *lastWin = (SExtWinTimeWindow *)taosArrayGet(pWins, taosArrayGetSize(pWins) - 1);
  QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)

  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)

  code = buildDynQueryCtrlOperatorParamForExternalWindow(&pDynQueryCtrlParam, 0, firstWin->tw.skey, lastWin->tw.ekey);
  QUERY_CHECK_CODE(code, lino, _return);
  QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pDynQueryCtrlParam), code, lino, _return, terrno)
  // the array holds the pointer now
  pDynQueryCtrlParam = NULL;

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW;
  (*ppRes)->downstreamIdx = idx;
  (*ppRes)->value = pExtWinOp;
  (*ppRes)->reUse = false;

  return code;
_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pExtWinOp) {
    if (pExtWinOp->ExtWins) {
      taosArrayDestroy(pExtWinOp->ExtWins);
    }
    taosMemoryFree(pExtWinOp);
  }
  // a child that was built but could not be pushed is still owned by this function
  freeExtWinDynCtrlChildParam(pDynQueryCtrlParam);
  if (*ppRes) {
    if ((*ppRes)->pChildren) {
      for (int32_t i = 0; i < taosArrayGetSize((*ppRes)->pChildren); i++) {
        // pChildren stores SOperatorParam* values, so fetch the stored pointer itself
        // (taosArrayGetP), not the address of the array slot (taosArrayGet)
        SOperatorParam* pChildParam = (SOperatorParam*)taosArrayGetP((*ppRes)->pChildren, i);
        freeExtWinDynCtrlChildParam(pChildParam);
      }
      taosArrayDestroy((*ppRes)->pChildren);
    }
    taosMemoryFree(*ppRes);
    *ppRes = NULL;
  }
  return code;
}
3228

UNCOV
3229
/*
 * Produce the result block for the next batch of windows.
 *
 * After making sure the operator is opened, the batch at pInfo->curWinBatchIdx is fetched.
 * When all batches are consumed *ppRes is set to NULL. Otherwise a param is built for
 * downstream 1 (an external-window param when isVstb is set, a merge param otherwise), the
 * downstream is queried with it, and the returned columns are copied into pInfo->pRes.
 *
 * When the downstream returned a block, the start of the very first window (first batch)
 * and the end of the very last window (last batch) are replaced by the block's time window
 * (skey, ekey + 1). Finally the resolved _wstart/_wend/_wduration output columns are filled
 * from the batch: start = skey, end = ekey - 1, duration = ekey - 1 - skey.
 *
 * Returns TSDB_CODE_SUCCESS with *ppRes set. On error the code is stored in the task and
 * control leaves through T_LONG_JMP.
 */
int32_t vtbWindowNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    lino = 0;
  SDynQueryCtrlOperatorInfo* pDynInfo = pOperator->info;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  int64_t                    st = taosGetTimestampUs();  // sampled on entry; not consumed in this function
  int32_t                    numOfWins = 0;
  SOperatorInfo*             pSrcOp = NULL;
  SOperatorParam*            pSrcParam = NULL;
  SSDataBlock*               pSrcBlock = NULL;
  SVtbWindowDynCtrlInfo*     pInfo = &pDynInfo->vtbWindow;
  SSDataBlock*               pRes = pInfo->pRes;

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _return);

  // every batch has been emitted already
  if (pInfo->curWinBatchIdx >= taosArrayGetSize(pInfo->pWins)) {
    *ppRes = NULL;
    return code;
  }

  SArray* pWinArray = (SArray*)taosArrayGetP(pInfo->pWins, pInfo->curWinBatchIdx);
  QUERY_CHECK_NULL(pWinArray, code, lino, _return, terrno)

  numOfWins = (int32_t)taosArrayGetSize(pWinArray);

  // both modes read from downstream 1; only the kind of param differs
  pSrcOp = pOperator->pDownstream[1];
  if (pInfo->isVstb) {
    code = buildExternalWindowOperatorParamEx(pDynInfo, &pSrcParam, pWinArray, pSrcOp->numOfDownstream);
  } else {
    code = buildMergeOperatorParam(pDynInfo, &pSrcParam, pWinArray, pSrcOp->numOfDownstream, numOfWins);
  }
  QUERY_CHECK_CODE(code, lino, _return);

  code = pSrcOp->fpSet.getNextExtFn(pSrcOp, pSrcParam, &pSrcBlock);
  QUERY_CHECK_CODE(code, lino, _return);

  blockDataCleanup(pRes);
  code = blockDataEnsureCapacity(pRes, numOfWins);
  QUERY_CHECK_CODE(code, lino, _return);

  if (pSrcBlock) {
    code = copyColumnsValue(pInfo->pTargets, pSrcBlock->info.id.blockId, pRes, pSrcBlock, numOfWins);
    QUERY_CHECK_CODE(code, lino, _return);

    if (pInfo->curWinBatchIdx == 0) {
      // first batch, take _wstart from the downstream block
      SExtWinTimeWindow* pHeadWin = (SExtWinTimeWindow*)taosArrayGet(taosArrayGetP(pInfo->pWins, 0), 0);
      QUERY_CHECK_NULL(pHeadWin, code, lino, _return, terrno)

      pHeadWin->tw.skey = pSrcBlock->info.window.skey;
    }
    if (pInfo->curWinBatchIdx == taosArrayGetSize(pInfo->pWins) - 1) {
      // last batch, take _wend from the downstream block
      SExtWinTimeWindow* pTailWin = (SExtWinTimeWindow*)taosArrayGetLast(taosArrayGetP(pInfo->pWins, taosArrayGetSize(pInfo->pWins) - 1));
      QUERY_CHECK_NULL(pTailWin, code, lino, _return, terrno)

      pTailWin->tw.ekey = pSrcBlock->info.window.ekey + 1;
    }
  }

  if (pInfo->outputWstartSlotId != -1) {
    SColumnInfoData* pStartCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWstartSlotId);
    QUERY_CHECK_NULL(pStartCol, code, lino, _return, terrno)

    for (int32_t row = 0; row < numOfWins; row++) {
      SExtWinTimeWindow* pWin = (SExtWinTimeWindow*)taosArrayGet(pWinArray, row);
      QUERY_CHECK_NULL(pWin, code, lino, _return, terrno)
      code = colDataSetVal(pStartCol, row, (const char*)&pWin->tw.skey, false);
      QUERY_CHECK_CODE(code, lino, _return);
    }
  }

  if (pInfo->outputWendSlotId != -1) {
    SColumnInfoData* pEndCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWendSlotId);
    QUERY_CHECK_NULL(pEndCol, code, lino, _return, terrno)

    for (int32_t row = 0; row < numOfWins; row++) {
      SExtWinTimeWindow* pWin = (SExtWinTimeWindow*)taosArrayGet(pWinArray, row);
      QUERY_CHECK_NULL(pWin, code, lino, _return, terrno)
      // stored end keys are exclusive (wend + 1); report the inclusive value
      TSKEY inclusiveEnd = pWin->tw.ekey - 1;
      code = colDataSetVal(pEndCol, row, (const char*)&inclusiveEnd, false);
      QUERY_CHECK_CODE(code, lino, _return);
    }
  }

  if (pInfo->outputWdurationSlotId != -1) {
    SColumnInfoData* pDurationCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWdurationSlotId);
    QUERY_CHECK_NULL(pDurationCol, code, lino, _return, terrno)

    for (int32_t row = 0; row < numOfWins; row++) {
      SExtWinTimeWindow* pWin = (SExtWinTimeWindow*)taosArrayGet(pWinArray, row);
      QUERY_CHECK_NULL(pWin, code, lino, _return, terrno)
      int64_t span = pWin->tw.ekey - 1 - pWin->tw.skey;
      code = colDataSetVal(pDurationCol, row, (const char*)&span, false);
      QUERY_CHECK_CODE(code, lino, _return);
    }
  }

  pRes->info.rows = numOfWins;
  *ppRes = pRes;
  pInfo->curWinBatchIdx++;

  return code;

_return:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
3373

3374
/*
 * Reset-state callback for the dynamic query control operator.
 *
 * Marks the operator as not opened and then clears the per-query-type
 * runtime state held in the operator info so that it can be executed again.
 *
 * Returns 0 on success, or the error code of initSeqStbJoinTableHash() when
 * re-creating the super-table-join hash state fails.
 *
 * NOTE(review): createDynQueryCtrlOperatorInfo() also builds operators of
 * type DYN_QTYPE_VTB_AGG, but there is no case for that type here, so such
 * operators take the default branch (error log, return 0) -- confirm whether
 * they need a dedicated reset.
 */
static int32_t resetDynQueryCtrlOperState(SOperatorInfo* pOper) {
  SDynQueryCtrlOperatorInfo*    pCtrl = pOper->info;
  SDynQueryCtrlPhysiNode const* pNode = pOper->pPhyNode;
  SExecTaskInfo*                pTask = pOper->pTaskInfo;

  pOper->status = OP_NOT_OPENED;

  switch (pCtrl->qType) {
    case DYN_QTYPE_STB_HASH: {
      SStbJoinDynCtrlInfo* pJoin = &pCtrl->stbJoin;

      /* The statistics are cleared before the join state is torn down. */
      pJoin->execInfo = (SDynQueryCtrlExecInfo){0};
      destroyStbJoinDynCtrlInfo(pJoin);

      int32_t rc = initSeqStbJoinTableHash(&pJoin->ctx.prev, pJoin->basic.batchFetch);
      if (rc != TSDB_CODE_SUCCESS) {
        qError("initSeqStbJoinTableHash failed since %s", tstrerror(rc));
        return rc;
      }

      /* Start again with an empty table list and a pristine post-join context. */
      pJoin->ctx.prev.pListHead = NULL;
      pJoin->ctx.prev.pListTail = NULL;
      pJoin->ctx.prev.tableNum = 0;
      pJoin->ctx.prev.joinBuild = false;
      pJoin->ctx.post = (SStbJoinPostJoinCtx){0};
      break;
    }

    case DYN_QTYPE_VTB_SCAN: {
      SVtbScanDynCtrlInfo* pScan = &pCtrl->vtbScan;

      if (pScan->otbNameToOtbInfoMap != NULL) {
        taosHashSetFreeFp(pScan->otbNameToOtbInfoMap, destroySOrgTbInfo);
        taosHashCleanup(pScan->otbNameToOtbInfoMap);
        pScan->otbNameToOtbInfoMap = NULL;
      }

      if (pScan->pRsp != NULL) {
        tFreeSUsedbRsp(pScan->pRsp);
        taosMemoryFreeClear(pScan->pRsp);
      }

      if (pScan->colRefInfo != NULL) {
        taosArrayDestroyEx(pScan->colRefInfo, destroyColRefInfo);
        pScan->colRefInfo = NULL;
      }

      if (pScan->childTableMap != NULL) {
        taosHashCleanup(pScan->childTableMap);
        pScan->childTableMap = NULL;
      }

      /* The child table list itself is kept; only its elements are released. */
      if (pScan->childTableList != NULL) {
        taosArrayClearEx(pScan->childTableList, destroyColRefArray);
      }

      if (pNode->dynTbname && pTask) {
        updateDynTbUidIfNeeded(pScan, pTask->pStreamRuntimeInfo);
      }

      pScan->curTableIdx = 0;
      pScan->lastTableIdx = -1;
      break;
    }

    case DYN_QTYPE_VTB_WINDOW: {
      SVtbWindowDynCtrlInfo* pWin = &pCtrl->vtbWindow;

      if (pWin->pRes != NULL) {
        blockDataDestroy(pWin->pRes);
        pWin->pRes = NULL;
      }

      if (pWin->pWins != NULL) {
        taosArrayDestroyEx(pWin->pWins, destroyWinArray);
        pWin->pWins = NULL;
      }

      pWin->outputWstartSlotId = -1;
      pWin->outputWendSlotId = -1;
      pWin->outputWdurationSlotId = -1;
      pWin->curWinBatchIdx = 0;
      break;
    }

    default:
      qError("unsupported dynamic query ctrl type: %d", pCtrl->qType);
      break;
  }

  return 0;
}
3452

UNCOV
3453
/*
 * Open callback for the virtual-table aggregation flavour of the operator.
 *
 * Does nothing if the operator is already opened. Otherwise it calls
 * buildVirtualSuperTableScanChildTableMap() and marks the operator as opened.
 * While cost.openCost is still zero, the elapsed time of this call is stored
 * there (microsecond timestamps divided by 1000.0).
 *
 * On failure the error is logged, stored in the task info, and control leaves
 * through T_LONG_JMP on the task's environment.
 */
int32_t vtbAggOpen(SOperatorInfo* pOperator) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (OPTR_IS_OPENED(pOperator)) {
    return code;
  }

  /* Only take a start timestamp when the open cost has not been recorded yet. */
  int64_t startUs = (pOperator->cost.openCost == 0) ? taosGetTimestampUs() : 0;

  code = buildVirtualSuperTableScanChildTableMap(pOperator);
  QUERY_CHECK_CODE(code, lino, _return);
  OPTR_SET_OPENED(pOperator);

_return:
  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - startUs) / 1000.0;
  }

  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), lino);
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }

  return code;
}
3482

UNCOV
3483
/*
 * Build the operator parameter handed to the downstream hash-aggregate
 * operator when all child tables are processed in one batch without
 * partitioning.
 *
 * The parameter has one child: an exchange parameter created by
 * buildExchangeOperatorParamForVSAgg() from pVtbScan->otbVgIdToOtbInfoArrayMap.
 *
 * @param pInfo  dynamic query control operator info (its vtbScan part is used)
 * @param ppRes  receives the newly allocated parameter on success; it is left
 *               untouched on failure
 * @return TSDB_CODE_SUCCESS, or the error code of the failing step
 *
 * The parameter is zero-allocated and its scalar fields are filled in before
 * any step that can fail, so that freeOperatorParam() on the error path never
 * reads uninitialised members.
 */
static int32_t buildAggOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SOperatorParam*           pParam = NULL;
  SOperatorParam*           pExchangeParam = NULL;
  SVtbScanDynCtrlInfo*      pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  bool                      freeExchange = false;

  pParam = taosMemoryCalloc(1, sizeof(SOperatorParam));
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)

  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
  pParam->downstreamIdx = 0;
  pParam->reUse = false;

  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno)

  pParam->value = taosMemoryCalloc(1, sizeof(SAggOperatorParam));
  QUERY_CHECK_NULL(pParam->value, code, lino, _return, terrno)

  code = buildExchangeOperatorParamForVSAgg(&pExchangeParam, 0, NULL, 0, pVtbScan->otbVgIdToOtbInfoArrayMap);
  QUERY_CHECK_CODE(code, lino, _return);

  /* Until the push succeeds the exchange parameter is not owned by pParam. */
  freeExchange = true;

  QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pExchangeParam), code, lino, _return, terrno)

  freeExchange = false;

  *ppRes = pParam;

  return code;
_return:
  if (freeExchange) {
    freeOperatorParam(pExchangeParam, OP_GET_PARAM);
  }
  if (pParam) {
    freeOperatorParam(pParam, OP_GET_PARAM);
  }
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  return code;
}
3526

UNCOV
3527
/*
 * Build the hash-aggregate operator parameter for a single partition group.
 *
 * Looks up groupid in pVtbScan->vtbGroupIdToVgIdMapMap. When the group is not
 * present, *ppRes is set to NULL and success is returned. Otherwise a
 * parameter with one exchange child (built by
 * buildExchangeOperatorParamForVSAgg() from the group's map) is returned.
 *
 * @param pInfo    dynamic query control operator info (its vtbScan part is used)
 * @param groupid  group id to build the parameter for
 * @param ppRes    receives the parameter, or NULL when the group is unknown;
 *                 left untouched on failure
 * @return TSDB_CODE_SUCCESS, or the error code of the failing step
 *
 * The parameter is zero-allocated and its scalar fields are filled in before
 * any step that can fail, so that freeOperatorParam() on the error path never
 * reads uninitialised members.
 */
static int32_t buildAggOperatorParamWithGroupId(SDynQueryCtrlOperatorInfo* pInfo, uint64_t groupid, SOperatorParam** ppRes) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SVtbScanDynCtrlInfo*      pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SOperatorParam*           pParam = NULL;
  SOperatorParam*           pExchangeParam = NULL;
  SHashObj*                 otbVgIdToOtbInfoArrayMap = NULL;
  bool                      freeExchange = false;
  void*                     pIter = taosHashGet(pVtbScan->vtbGroupIdToVgIdMapMap, &groupid, sizeof(groupid));

  if (!pIter) {
    *ppRes = NULL;
    return code;
  }

  otbVgIdToOtbInfoArrayMap = *(SHashObj**)pIter;

  pParam = taosMemoryCalloc(1, sizeof(SOperatorParam));
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)

  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
  pParam->downstreamIdx = 0;
  pParam->value = NULL;
  pParam->reUse = false;

  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno)

  code = buildExchangeOperatorParamForVSAgg(&pExchangeParam, 0, NULL, groupid, otbVgIdToOtbInfoArrayMap);
  QUERY_CHECK_CODE(code, lino, _return);

  /* Until the push succeeds the exchange parameter is not owned by pParam. */
  freeExchange = true;

  QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pExchangeParam), code, lino, _return, terrno)

  freeExchange = false;

  *ppRes = pParam;

  return code;
_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (freeExchange) {
    freeOperatorParam(pExchangeParam, OP_GET_PARAM);
  }
  if (pParam) {
    freeOperatorParam(pParam, OP_GET_PARAM);
  }
  return code;
}
3577

UNCOV
3578
/*
 * Build the operator parameter used to aggregate one child table.
 *
 * Looks up uid in pVtbScan->vtbUidToVgIdMapMap. When the uid is not present,
 * *ppRes is set to NULL and success is returned. Otherwise the parameter is
 * created by buildExchangeOperatorParamForVSAgg() from the map stored for
 * that uid, together with the given tag list and group id.
 *
 * @param pInfo     dynamic query control operator info (its vtbScan part is used)
 * @param uid       uid of the child table
 * @param groupid   group id forwarded to the exchange parameter
 * @param pTagList  tag list forwarded to the exchange parameter
 * @param ppRes     receives the parameter, or NULL when the uid is unknown;
 *                  left untouched on failure
 * @return TSDB_CODE_SUCCESS, or the error code of the failing step
 */
static int32_t buildAggOperatorParamForSingleChild(SDynQueryCtrlOperatorInfo* pInfo, tb_uid_t uid, uint64_t groupid, SArray* pTagList, SOperatorParam** ppRes) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SVtbScanDynCtrlInfo*      pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SOperatorParam*           pParam = NULL;
  SHashObj*                 pOtbVgIdToOtbInfoArrayMap = NULL;
  void*                     pIter = taosHashGet(pVtbScan->vtbUidToVgIdMapMap, &uid, sizeof(uid));

  if (pIter) {
    /* Reuse the entry that was just found instead of looking it up again. */
    pOtbVgIdToOtbInfoArrayMap = *(SHashObj**)pIter;

    code = buildExchangeOperatorParamForVSAgg(&pParam, 0, pTagList, groupid, pOtbVgIdToOtbInfoArrayMap);
    QUERY_CHECK_CODE(code, lino, _return);

    *ppRes = pParam;
  } else {
    *ppRes = NULL;
  }

  return code;
_return:
  if (pParam) {
    freeOperatorParam(pParam, OP_GET_PARAM);
  }
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  return code;
}
3605

UNCOV
3606
/*
 * Fetch the next result block from the downstream aggregate operator (the
 * last downstream of pOperator).
 *
 * Four modes, selected by vtbScan.hasPartition and vtbScan.batchProcessChild:
 *  - partition + batch:   one parameter per group id of vtbGroupIdTagListMap;
 *                         a group is removed from the map once it produced a
 *                         block.
 *  - partition + single:  for every group, every child uid of the group's
 *                         uid -> tag-list map is aggregated on its own; a uid
 *                         (and later its group) is removed once the
 *                         downstream returns no block for it.
 *  - no partition + batch: a single parameter for all children; the operator
 *                         is marked completed afterwards.
 *  - no partition + single: like "partition + single" but over
 *                         vtbUidTagListMap with group id 0.
 *
 * In the single-child modes pVtbScan->genNewParam tells whether a new
 * parameter has to be built (getNextExtFn) or whether the downstream keeps
 * being drained with getNextFn.
 *
 * @param pOperator  the dynamic query control operator
 * @param pRes       in/out result block pointer; read after every downstream
 *                   call to decide whether a block was produced
 * @return TSDB_CODE_SUCCESS, or the error code of the failing step (logged)
 *
 * Every hash iterator that is still active when the function leaves (early
 * break or error) is released with taosHashCancelIterate().
 */
int32_t virtualTableAggGetNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SOperatorInfo*             pAggOp = pOperator->pDownstream[pOperator->numOfDownstream - 1];
  SOperatorParam*            pAggParam = NULL;
  SHashObj*                  pOuterMap = NULL;  // map walked by pIter
  void*                      pIter = NULL;
  SHashObj*                  pInnerMap = NULL;  // map walked by pIter2
  void*                      pIter2 = NULL;

  if (pInfo->vtbScan.hasPartition) {
    pOuterMap = pVtbScan->vtbGroupIdTagListMap;

    if (pInfo->vtbScan.batchProcessChild) {
      pIter = taosHashIterate(pOuterMap, NULL);
      while (pIter) {
        size_t     keyLen = 0;
        uint64_t   groupid = *(uint64_t*)taosHashGetKey(pIter, &keyLen);

        code = buildAggOperatorParamWithGroupId(pInfo, groupid, &pAggParam);
        QUERY_CHECK_CODE(code, line, _return);

        if (pAggParam) {
          code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
          QUERY_CHECK_CODE(code, line, _return);
        } else {
          *pRes = NULL;
        }

        // advance first so that the finished group can be removed safely
        pIter = taosHashIterate(pOuterMap, pIter);

        if (*pRes) {
          (*pRes)->info.id.groupId = groupid;
          code = taosHashRemove(pOuterMap, &groupid, keyLen);
          QUERY_CHECK_CODE(code, line, _return);
          break;
        }
      }
    } else {
      pIter = taosHashIterate(pOuterMap, NULL);
      while (pIter) {
        size_t     keyLen = 0;
        // copy the key: the node it lives in is released/removed below
        uint64_t   groupid = *(uint64_t*)taosHashGetKey(pIter, &keyLen);

        pInnerMap = *(SHashObj**)pIter;
        pIter2 = taosHashIterate(pInnerMap, NULL);
        while (pIter2) {
          size_t   keyLen2 = 0;
          tb_uid_t uid = *(tb_uid_t*)taosHashGetKey(pIter2, &keyLen2);
          SArray*  pTagList = *(SArray**)pIter2;

          if (pVtbScan->genNewParam) {
            code = buildAggOperatorParamForSingleChild(pInfo, uid, groupid, pTagList, &pAggParam);
            QUERY_CHECK_CODE(code, line, _return);
            if (pAggParam) {
              code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
              QUERY_CHECK_CODE(code, line, _return);
            } else {
              *pRes = NULL;
            }
          } else {
            code = pAggOp->fpSet.getNextFn(pAggOp, pRes);
            QUERY_CHECK_CODE(code, line, _return);
          }

          if (*pRes) {
            pVtbScan->genNewParam = false;
            (*pRes)->info.id.groupId = groupid;
            break;
          }

          // this child is exhausted: move on and drop it from the group
          pVtbScan->genNewParam = true;
          pIter2 = taosHashIterate(pInnerMap, pIter2);
          code = taosHashRemove(pInnerMap, &uid, keyLen2);
          QUERY_CHECK_CODE(code, line, _return);
        }
        if (*pRes) {
          break;
        }

        // the whole group is exhausted: move on and drop it
        pIter = taosHashIterate(pOuterMap, pIter);
        code = taosHashRemove(pOuterMap, &groupid, keyLen);
        QUERY_CHECK_CODE(code, line, _return);
      }
    }

  } else {
    if (pInfo->vtbScan.batchProcessChild) {
      code = buildAggOperatorParam(pInfo, &pAggParam);
      QUERY_CHECK_CODE(code, line, _return);

      code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
      QUERY_CHECK_CODE(code, line, _return);
      setOperatorCompleted(pOperator);
    } else {
      pOuterMap = pVtbScan->vtbUidTagListMap;

      pIter = taosHashIterate(pOuterMap, NULL);
      while (pIter) {
        size_t   keyLen = 0;
        tb_uid_t uid = *(tb_uid_t*)taosHashGetKey(pIter, &keyLen);
        SArray*  pTagList = *(SArray**)pIter;

        if (pVtbScan->genNewParam) {
          code = buildAggOperatorParamForSingleChild(pInfo, uid, 0, pTagList, &pAggParam);
          QUERY_CHECK_CODE(code, line, _return);

          if (pAggParam) {
            code = pAggOp->fpSet.getNextExtFn(pAggOp, pAggParam, pRes);
            QUERY_CHECK_CODE(code, line, _return);
          } else {
            *pRes = NULL;
          }
        } else {
          code = pAggOp->fpSet.getNextFn(pAggOp, pRes);
          QUERY_CHECK_CODE(code, line, _return);
        }

        if (*pRes) {
          pVtbScan->genNewParam = false;
          break;
        }

        // this child is exhausted: move on and drop it
        pVtbScan->genNewParam = true;
        pIter = taosHashIterate(pOuterMap, pIter);
        code = taosHashRemove(pOuterMap, &uid, keyLen);
        QUERY_CHECK_CODE(code, line, _return);
      }
    }
  }
_return:
  // release iterators left active by an early break or an error
  if (pIter2) {
    taosHashCancelIterate(pInnerMap, pIter2);
  }
  if (pIter) {
    taosHashCancelIterate(pOuterMap, pIter);
  }
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
3734

UNCOV
3735
/*
 * Get-next callback for the virtual-table aggregation flavour of the operator.
 *
 * Returns immediately when the operator status is OP_EXEC_DONE. Otherwise it
 * runs the operator's open function and then either
 *  - marks the operator completed when it works on a super table whose child
 *    table list is empty, or
 *  - delegates to virtualTableAggGetNext() to produce the next block.
 *
 * On failure the error is logged, stored in the task info, and control leaves
 * through T_LONG_JMP on the task's environment.
 */
int32_t vtbAggNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    lino = 0;
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SVtbScanDynCtrlInfo*       pScan = (SVtbScanDynCtrlInfo*)&pCtrl->vtbScan;

  QRY_PARAM_CHECK(pRes);

  if (OP_EXEC_DONE == pOperator->status) {
    return code;
  }

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _return);

  /* Anything other than a super table without children has data to fetch. */
  if (!(pScan->isSuperTable && taosArrayGetSize(pScan->childTableList) == 0)) {
    code = virtualTableAggGetNext(pOperator, pRes);
    QUERY_CHECK_CODE(code, lino, _return);
    return code;
  }

  setOperatorCompleted(pOperator);
  return code;

_return:
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), lino);
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }
  return code;
}
3767

3768
int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
754,629✔
3769
                                       SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo,
3770
                                       SMsgCb* pMsgCb, SOperatorInfo** pOptrInfo) {
3771
  QRY_PARAM_CHECK(pOptrInfo);
754,629✔
3772

3773
  int32_t                    code = TSDB_CODE_SUCCESS;
754,629✔
3774
  int32_t                    line = 0;
754,629✔
3775
  __optr_fn_t                nextFp = NULL;
754,629✔
3776
  __optr_open_fn_t           openFp = NULL;
754,629✔
3777
  SOperatorInfo*             pOperator = NULL;
754,629✔
3778
  SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo));
754,629✔
3779
  QUERY_CHECK_NULL(pInfo, code, line, _error, terrno)
754,629✔
3780

3781
  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
754,629✔
3782
  QUERY_CHECK_NULL(pOperator, code, line, _error, terrno)
754,629✔
3783

3784
  pOperator->pPhyNode = pPhyciNode;
754,629✔
3785
  pTaskInfo->dynamicTask = (int8_t)pPhyciNode->node.dynamicOp;
754,629✔
3786

3787
  code = appendDownstream(pOperator, pDownstream, numOfDownstream);
754,629✔
3788
  QUERY_CHECK_CODE(code, line, _error);
754,629✔
3789

3790
  setOperatorInfo(pOperator, "DynQueryCtrlOperator", QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, false, OP_NOT_OPENED,
754,629✔
3791
                  pInfo, pTaskInfo);
3792

3793
  pInfo->qType = pPhyciNode->qType;
754,629✔
3794
  switch (pInfo->qType) {
754,629✔
3795
    case DYN_QTYPE_STB_HASH:
517,193✔
3796
      TAOS_MEMCPY(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
517,193✔
3797
      pInfo->stbJoin.pOutputDataBlockDesc = pPhyciNode->node.pOutputDataBlockDesc;
517,193✔
3798
      code = initSeqStbJoinTableHash(&pInfo->stbJoin.ctx.prev, pInfo->stbJoin.basic.batchFetch);
517,193✔
3799
      QUERY_CHECK_CODE(code, line, _error);
517,193✔
3800
      nextFp = seqStableJoin;
517,193✔
3801
      openFp = optrDummyOpenFn;
517,193✔
3802
      break;
517,193✔
3803
    case DYN_QTYPE_VTB_SCAN:
237,436✔
3804
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
237,436✔
3805
      QUERY_CHECK_CODE(code, line, _error);
237,436✔
3806
      nextFp = vtbScanNext;
237,436✔
3807
      openFp = vtbScanOpen;
237,436✔
3808
      break;
237,436✔
UNCOV
3809
    case DYN_QTYPE_VTB_WINDOW:
×
UNCOV
3810
      code = initVtbWindowInfo(pInfo, pPhyciNode, pTaskInfo, pOperator);
×
UNCOV
3811
      QUERY_CHECK_CODE(code, line, _error);
×
UNCOV
3812
      nextFp = vtbWindowNext;
×
UNCOV
3813
      openFp = vtbWindowOpen;
×
UNCOV
3814
      break;
×
UNCOV
3815
    case DYN_QTYPE_VTB_AGG:
×
UNCOV
3816
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
×
UNCOV
3817
      QUERY_CHECK_CODE(code, line, _error);
×
UNCOV
3818
      nextFp = vtbAggNext;
×
UNCOV
3819
      openFp = vtbAggOpen;
×
UNCOV
3820
      break;
×
UNCOV
3821
    default:
×
UNCOV
3822
      qError("unsupported dynamic query ctrl type: %d", pInfo->qType);
×
UNCOV
3823
      code = TSDB_CODE_INVALID_PARA;
×
UNCOV
3824
      goto _error;
×
3825
  }
3826

3827
  pOperator->fpSet = createOperatorFpSet(openFp, nextFp, NULL, destroyDynQueryCtrlOperator, optrDefaultBufFn,
754,629✔
3828
                                         NULL, optrDefaultGetNextExtFn, NULL);
3829

3830
  setOperatorResetStateFn(pOperator, resetDynQueryCtrlOperState);
754,629✔
3831
  *pOptrInfo = pOperator;
754,629✔
3832
  return TSDB_CODE_SUCCESS;
754,629✔
3833

UNCOV
3834
_error:
×
UNCOV
3835
  if (pInfo != NULL) {
×
UNCOV
3836
    destroyDynQueryCtrlOperator(pInfo);
×
3837
  }
UNCOV
3838
  qError("failed to create dyn query ctrl operator, %s code:%s, line:%d", __func__, tstrerror(code), line);
×
UNCOV
3839
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
UNCOV
3840
  pTaskInfo->code = code;
×
UNCOV
3841
  return code;
×
3842
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc