• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4768

01 Oct 2025 04:06AM UTC coverage: 57.85% (-0.8%) from 58.606%
#4768

push

travis-ci

web-flow
Merge pull request #33171 from taosdata/merge/3.3.6tomain

merge: from 3.3.6 to main branch

137167 of 302743 branches covered (45.31%)

Branch coverage included in aggregate %.

15 of 20 new or added lines in 2 files covered. (75.0%)

12125 existing lines in 175 files now uncovered.

208282 of 294403 relevant lines covered (70.75%)

5618137.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.44
/source/libs/executor/src/dynqueryctrloperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "nodes.h"
19
#include "operator.h"
20
#include "os.h"
21
#include "plannodes.h"
22
#include "query.h"
23
#include "querynodes.h"
24
#include "querytask.h"
25
#include "tarray.h"
26
#include "tcompare.h"
27
#include "tdatablock.h"
28
#include "thash.h"
29
#include "tmsg.h"
30
#include "trpc.h"
31
#include "ttypes.h"
32
#include "dynqueryctrl.h"
33

34
int64_t gSessionId = 0;
35

36
void freeVgTableList(void* ptr) { 
×
37
  taosArrayDestroy(*(SArray**)ptr); 
×
38
}
×
39

40
static void destroyStbJoinTableList(SStbJoinTableList* pListHead) {
6,279✔
41
  SStbJoinTableList* pNext = NULL;
6,279✔
42
  
43
  while (pListHead) {
6,280✔
44
    taosMemoryFree(pListHead->pLeftVg);
1!
45
    taosMemoryFree(pListHead->pLeftUid);
1!
46
    taosMemoryFree(pListHead->pRightVg);
1!
47
    taosMemoryFree(pListHead->pRightUid);
1!
48
    pNext = pListHead->pNext;
1✔
49
    taosMemoryFree(pListHead);
1!
50
    pListHead = pNext;
1✔
51
  }
52
}
6,279✔
53

54
static void destroyStbJoinDynCtrlInfo(SStbJoinDynCtrlInfo* pStbJoin) {
6,279✔
55
  qDebug("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64 ", leftCacheNum:%" PRId64 ", rightCacheNum:%" PRId64, 
6,279✔
56
         pStbJoin->execInfo.prevBlkNum, pStbJoin->execInfo.prevBlkRows, pStbJoin->execInfo.postBlkNum, 
57
         pStbJoin->execInfo.postBlkRows, pStbJoin->execInfo.leftCacheNum, pStbJoin->execInfo.rightCacheNum);
58

59
  if (pStbJoin->basic.batchFetch) {
6,279✔
60
    if (pStbJoin->ctx.prev.leftHash) {
6,270✔
61
      tSimpleHashSetFreeFp(pStbJoin->ctx.prev.leftHash, freeVgTableList);
5,972✔
62
      tSimpleHashCleanup(pStbJoin->ctx.prev.leftHash);
5,972✔
63
    }
64
    if (pStbJoin->ctx.prev.rightHash) {
6,270✔
65
      tSimpleHashSetFreeFp(pStbJoin->ctx.prev.rightHash, freeVgTableList);
5,972✔
66
      tSimpleHashCleanup(pStbJoin->ctx.prev.rightHash);
5,972✔
67
    }
68
  } else {
69
    if (pStbJoin->ctx.prev.leftCache) {
9!
70
      tSimpleHashCleanup(pStbJoin->ctx.prev.leftCache);
9✔
71
    }
72
    if (pStbJoin->ctx.prev.rightCache) {
9!
73
      tSimpleHashCleanup(pStbJoin->ctx.prev.rightCache);
9✔
74
    }
75
    if (pStbJoin->ctx.prev.onceTable) {
9!
76
      tSimpleHashCleanup(pStbJoin->ctx.prev.onceTable);
9✔
77
    }
78
  }
79

80
  destroyStbJoinTableList(pStbJoin->ctx.prev.pListHead);
6,279✔
81
}
6,279✔
82

83
void destroyOrgTbInfo(void *info) {
11,808✔
84
  SOrgTbInfo *pOrgTbInfo = (SOrgTbInfo *)info;
11,808✔
85
  if (pOrgTbInfo) {
11,808!
86
    taosArrayDestroy(pOrgTbInfo->colMap);
11,808✔
87
  }
88
}
11,808✔
89

90
void destroyColRefInfo(void *info) {
73,650✔
91
  SColRefInfo *pColRefInfo = (SColRefInfo *)info;
73,650✔
92
  if (pColRefInfo) {
73,650!
93
    taosMemoryFree(pColRefInfo->colName);
73,650!
94
    taosMemoryFree(pColRefInfo->colrefName);
73,650!
95
  }
96
}
73,650✔
97

98
void destroyColRefArray(void *info) {
4,522✔
99
  SArray *pColRefArray = *(SArray **)info;
4,522✔
100
  if (pColRefArray) {
4,522!
101
    taosArrayDestroyEx(pColRefArray, destroyColRefInfo);
4,522✔
102
  }
103
}
4,522✔
104

105
void freeUseDbOutput(void* pOutput) {
2,084✔
106
  SUseDbOutput *pOut = *(SUseDbOutput**)pOutput;
2,084✔
107
  if (NULL == pOutput) {
2,084!
108
    return;
×
109
  }
110

111
  if (pOut->dbVgroup) {
2,084!
112
    freeVgInfo(pOut->dbVgroup);
2,084✔
113
  }
114
  taosMemFree(pOut);
2,084✔
115
}
116

117
static void destroyVtbScanDynCtrlInfo(SVtbScanDynCtrlInfo* pVtbScan) {
1,185✔
118
  if (pVtbScan->dbName) {
1,185!
119
    taosMemoryFreeClear(pVtbScan->dbName);
1,185!
120
  }
121
  if (pVtbScan->tbName) {
1,185!
122
    taosMemoryFreeClear(pVtbScan->tbName);
1,185!
123
  }
124
  if (pVtbScan->childTableList) {
1,185!
125
    taosArrayDestroyEx(pVtbScan->childTableList, destroyColRefArray);
1,185✔
126
  }
127
  if (pVtbScan->colRefInfo) {
1,185!
128
    taosArrayDestroyEx(pVtbScan->colRefInfo, destroyColRefInfo);
×
129
    pVtbScan->colRefInfo = NULL;
×
130
  }
131
  if (pVtbScan->childTableMap) {
1,185✔
132
    taosHashCleanup(pVtbScan->childTableMap);
1,120✔
133
  }
134
  if (pVtbScan->readColList) {
1,185!
135
    taosArrayDestroy(pVtbScan->readColList);
1,185✔
136
  }
137
  if (pVtbScan->dbVgInfoMap) {
1,185!
138
    taosHashSetFreeFp(pVtbScan->dbVgInfoMap, freeUseDbOutput);
1,185✔
139
    taosHashCleanup(pVtbScan->dbVgInfoMap);
1,185✔
140
  }
141
  if (pVtbScan->orgTbVgColMap) {
1,185!
142
    taosHashSetFreeFp(pVtbScan->orgTbVgColMap, destroyOrgTbInfo);
×
143
    taosHashCleanup(pVtbScan->orgTbVgColMap);
×
144
  }
145
  if (pVtbScan->pRsp) {
1,185!
146
    tFreeSUsedbRsp(pVtbScan->pRsp);
×
147
    taosMemoryFreeClear(pVtbScan->pRsp);
×
148
  }
149
  if (pVtbScan->existOrgTbVg) {
1,185!
150
    taosHashCleanup(pVtbScan->existOrgTbVg);
1,185✔
151
  }
152
  if (pVtbScan->curOrgTbVg) {
1,185✔
153
    taosHashCleanup(pVtbScan->curOrgTbVg);
16✔
154
  }
155
  if (pVtbScan->newAddedVgInfo) {
1,185✔
156
    taosHashCleanup(pVtbScan->newAddedVgInfo);
10✔
157
  }
158
}
1,185✔
159

160
static void destroyDynQueryCtrlOperator(void* param) {
7,464✔
161
  SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param;
7,464✔
162

163
  switch (pDyn->qType) {
7,464!
164
    case DYN_QTYPE_STB_HASH:
6,279✔
165
      destroyStbJoinDynCtrlInfo(&pDyn->stbJoin);
6,279✔
166
      break;
6,279✔
167
    case DYN_QTYPE_VTB_SCAN:
1,185✔
168
      destroyVtbScanDynCtrlInfo(&pDyn->vtbScan);
1,185✔
169
      break;
1,185✔
170
    default:
×
171
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
×
172
      break;
×
173
  }
174

175
  taosMemoryFreeClear(param);
7,464!
176
}
7,464✔
177

178
static FORCE_INLINE bool tableNeedCache(int64_t uid, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, bool rightTable, bool batchFetch) {
179
  if (batchFetch) {
55,686✔
180
    return true;
55,650✔
181
  }
182
  
183
  if (rightTable) {
36!
184
    return pPost->rightCurrUid == pPost->rightNextUid;
18✔
185
  }
186

187
  uint32_t* num = tSimpleHashGet(pPrev->leftCache, &uid, sizeof(uid));
18✔
188

189
  return (NULL == num) ? false : true;
18✔
190
}
191

192
static int32_t updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo*          pStbJoin) {
27,843✔
193
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
27,843✔
194
  SStbJoinPostJoinCtx*       pPost = &pStbJoin->ctx.post;
27,843✔
195
  SStbJoinTableList*         pNode = pPrev->pListHead;
27,843✔
196
  int32_t*                   leftVgId = pNode->pLeftVg + pNode->readIdx;
27,843✔
197
  int32_t*                   rightVgId = pNode->pRightVg + pNode->readIdx;
27,843✔
198
  int64_t*                   leftUid = pNode->pLeftUid + pNode->readIdx;
27,843✔
199
  int64_t*                   rightUid = pNode->pRightUid + pNode->readIdx;
27,843✔
200
  int64_t                    readIdx = pNode->readIdx + 1;
27,843✔
201
  int64_t                    rightPrevUid = pPost->rightCurrUid;
27,843✔
202

203
  pPost->leftCurrUid = *leftUid;
27,843✔
204
  pPost->rightCurrUid = *rightUid;
27,843✔
205

206
  pPost->leftVgId = *leftVgId;
27,843✔
207
  pPost->rightVgId = *rightVgId;
27,843✔
208

209
  while (true) {
210
    if (readIdx < pNode->uidNum) {
27,843✔
211
      pPost->rightNextUid = *(pNode->pRightUid + readIdx);
27,537✔
212
      break;
27,537✔
213
    }
214
    
215
    pNode = pNode->pNext;
306✔
216
    if (NULL == pNode) {
306!
217
      pPost->rightNextUid = 0;
306✔
218
      break;
306✔
219
    }
220
    
221
    rightUid = pNode->pRightUid;
×
222
    readIdx = 0;
×
223
  }
224

225
  pPost->leftNeedCache = tableNeedCache(*leftUid, pPrev, pPost, false, pStbJoin->basic.batchFetch);
27,843✔
226
  pPost->rightNeedCache = tableNeedCache(*rightUid, pPrev, pPost, true, pStbJoin->basic.batchFetch);
27,843✔
227

228
  if (!pStbJoin->basic.batchFetch && pPost->rightNeedCache && rightPrevUid != pPost->rightCurrUid) {
27,843!
229
    QRY_ERR_RET(tSimpleHashPut(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid), NULL, 0));
×
230
    pStbJoin->execInfo.rightCacheNum++;
×
231
  }  
232

233
  return TSDB_CODE_SUCCESS;
27,843✔
234
}
235

236

237
static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, bool needCache, SOperatorParam* pChild) {
55,686✔
238
  int32_t code = TSDB_CODE_SUCCESS;
55,686✔
239
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
55,686!
240
  if (NULL == *ppRes) {
55,686!
241
    code = terrno;
×
242
    freeOperatorParam(pChild, OP_GET_PARAM);
×
243
    return code;
×
244
  }
245
  if (pChild) {
55,686✔
246
    (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
632✔
247
    if (NULL == (*ppRes)->pChildren) {
632!
248
      code = terrno;
×
249
      freeOperatorParam(pChild, OP_GET_PARAM);
×
250
      freeOperatorParam(*ppRes, OP_GET_PARAM);
×
251
      *ppRes = NULL;
×
252
      return code;
×
253
    }
254
    if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild)) {
1,264!
255
      code = terrno;
×
256
      freeOperatorParam(pChild, OP_GET_PARAM);
×
257
      freeOperatorParam(*ppRes, OP_GET_PARAM);
×
258
      *ppRes = NULL;
×
259
      return code;
×
260
    }
261
  } else {
262
    (*ppRes)->pChildren = NULL;
55,054✔
263
  }
264

265
  SGcOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcOperatorParam));
55,686!
266
  if (NULL == pGc) {
55,686!
267
    code = terrno;
×
268
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
269
    *ppRes = NULL;
×
270
    return code;
×
271
  }
272

273
  pGc->sessionId = atomic_add_fetch_64(&gSessionId, 1);
55,686✔
274
  pGc->downstreamIdx = downstreamIdx;
55,686✔
275
  pGc->vgId = vgId;
55,686✔
276
  pGc->tbUid = tbUid;
55,686✔
277
  pGc->needCache = needCache;
55,686✔
278

279
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
55,686✔
280
  (*ppRes)->downstreamIdx = downstreamIdx;
55,686✔
281
  (*ppRes)->value = pGc;
55,686✔
282
  (*ppRes)->reUse = false;
55,686✔
283

284
  return TSDB_CODE_SUCCESS;
55,686✔
285
}
286

287

288
static int32_t buildGroupCacheNotifyOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid) {
×
289
  int32_t code = TSDB_CODE_SUCCESS;
×
290
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
×
291
  if (NULL == *ppRes) {
×
292
    return terrno;
×
293
  }
294
  (*ppRes)->pChildren = NULL;
×
295

296
  SGcNotifyOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcNotifyOperatorParam));
×
297
  if (NULL == pGc) {
×
298
    code = terrno;
×
299
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
×
300
    return code;
×
301
  }
302

303
  pGc->downstreamIdx = downstreamIdx;
×
304
  pGc->vgId = vgId;
×
305
  pGc->tbUid = tbUid;
×
306

307
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
×
308
  (*ppRes)->downstreamIdx = downstreamIdx;
×
309
  (*ppRes)->value = pGc;
×
310
  (*ppRes)->reUse = false;
×
311

312
  return TSDB_CODE_SUCCESS;
×
313
}
314

315

316
static int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, const int32_t* pVgId, int64_t* pUid) {
36✔
317
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
36!
318
  if (NULL == *ppRes) {
36!
319
    return terrno;
×
320
  }
321
  (*ppRes)->pChildren = NULL;
36✔
322
  
323
  SExchangeOperatorParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
36!
324
  if (NULL == pExc) {
36!
325
    return terrno;
×
326
  }
327

328
  pExc->multiParams = false;
36✔
329
  pExc->basic.vgId = *pVgId;
36✔
330
  pExc->basic.tableSeq = true;
36✔
331
  pExc->basic.isVtbRefScan = false;
36✔
332
  pExc->basic.isVtbTagScan = false;
36✔
333
  pExc->basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
36✔
334
  pExc->basic.colMap = NULL;
36✔
335
  pExc->basic.uidList = taosArrayInit(1, sizeof(int64_t));
36✔
336
  if (NULL == pExc->basic.uidList) {
36!
337
    taosMemoryFree(pExc);
×
338
    return terrno;
×
339
  }
340
  if (NULL == taosArrayPush(pExc->basic.uidList, pUid)) {
72!
341
    taosArrayDestroy(pExc->basic.uidList);
×
342
    taosMemoryFree(pExc);
×
343
    return terrno;
×
344
  }
345

346
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
36✔
347
  (*ppRes)->downstreamIdx = downstreamIdx;
36✔
348
  (*ppRes)->value = pExc;
36✔
349
  (*ppRes)->reUse = false;
36✔
350

351
  return TSDB_CODE_SUCCESS;
36✔
352
}
353

354
static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
529✔
355
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
529!
356
  if (NULL == *ppRes) {
529!
357
    return terrno;
×
358
  }
359
  (*ppRes)->pChildren = NULL;
529✔
360
  
361
  SExchangeOperatorBatchParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorBatchParam));
529!
362
  if (NULL == pExc) {
529!
363
    taosMemoryFreeClear(*ppRes);
×
364
    return terrno;
×
365
  }
366

367
  pExc->multiParams = true;
529✔
368
  pExc->pBatchs = tSimpleHashInit(tSimpleHashGetSize(pVg), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
529✔
369
  if (NULL == pExc->pBatchs) {
529!
370
    taosMemoryFree(pExc);
×
371
    taosMemoryFreeClear(*ppRes);
×
372
    return terrno;
×
373
  }
374
  tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);
529✔
375
  
376
  SExchangeOperatorBasicParam basic;
377
  basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
529✔
378

379
  int32_t iter = 0;
529✔
380
  void* p = NULL;
529✔
381
  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
1,421✔
382
    int32_t* pVgId = tSimpleHashGetKey(p, NULL);
892✔
383
    SArray* pUidList = *(SArray**)p;
892✔
384
    basic.vgId = *pVgId;
892✔
385
    basic.uidList = pUidList;
892✔
386
    basic.colMap = NULL;
892✔
387
    basic.tableSeq = false;
892✔
388
    basic.isVtbRefScan = false;
892✔
389
    basic.isVtbTagScan = false;
892✔
390
    
391
    QRY_ERR_RET(tSimpleHashPut(pExc->pBatchs, pVgId, sizeof(*pVgId), &basic, sizeof(basic)));   
892!
392

393
    qTrace("build downstreamIdx %d batch scan, vgId:%d, uidNum:%" PRId64, downstreamIdx, *pVgId, (int64_t)taosArrayGetSize(pUidList));
892!
394
    *(SArray**)p = NULL;
892✔
395
  }
396

397
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
529✔
398
  (*ppRes)->downstreamIdx = downstreamIdx;
529✔
399
  (*ppRes)->value = pExc;
529✔
400
  (*ppRes)->reUse = false;
529✔
401

402
  return TSDB_CODE_SUCCESS;
529✔
403
}
404

405
static int32_t buildExchangeOperatorParamForVTagScan(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, tb_uid_t uid) {
4,844✔
406
  int32_t                      code = TSDB_CODE_SUCCESS;
4,844✔
407
  int32_t                      lino = 0;
4,844✔
408
  SExchangeOperatorParam*      pExc = NULL;
4,844✔
409
  SExchangeOperatorBasicParam* basic = NULL;
4,844✔
410

411
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
4,844!
412
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
4,844!
413
  (*ppRes)->pChildren = NULL;
4,844✔
414

415
  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
4,844!
416
  QUERY_CHECK_NULL(pExc, code, lino, _return, terrno)
4,844!
417

418
  pExc->multiParams = false;
4,844✔
419

420
  basic = &pExc->basic;
4,844✔
421
  basic->srcOpType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
4,844✔
422

423
  basic->vgId = vgId;
4,844✔
424
  basic->tableSeq = false;
4,844✔
425
  basic->isVtbRefScan = false;
4,844✔
426
  basic->isVtbTagScan = true;
4,844✔
427
  basic->isNewDeployed = false;
4,844✔
428
  basic->colMap = NULL;
4,844✔
429

430
  basic->uidList = taosArrayInit(1, sizeof(int64_t));
4,844✔
431
  QUERY_CHECK_NULL(basic->uidList, code, lino, _return, terrno)
4,844!
432
  QUERY_CHECK_NULL(taosArrayPush(basic->uidList, &uid), code, lino, _return, terrno)
9,688!
433

434
  (*ppRes)->pChildren = NULL;
4,844✔
435

436
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
4,844✔
437
  (*ppRes)->downstreamIdx = downstreamIdx;
4,844✔
438
  (*ppRes)->value = pExc;
4,844✔
439
  (*ppRes)->reUse = true;
4,844✔
440

441
  return TSDB_CODE_SUCCESS;
4,844✔
442

443
_return:
×
444
  qError("failed to build exchange operator param for vscan, code:%d", code);
×
445
  taosMemoryFreeClear(*ppRes);
×
446
  if (basic) {
×
447
    if (basic->colMap) {
×
448
      taosArrayDestroy(basic->colMap->colMap);
×
449
      taosMemoryFreeClear(basic->colMap);
×
450
    }
451
    if (basic->uidList) {
×
452
      taosArrayDestroy(basic->uidList);
×
453
    }
454
    taosMemoryFreeClear(basic);
×
455
  }
456
  taosMemoryFreeClear(pExc);
×
457
  return code;
×
458
}
459

460
static int32_t buildExchangeOperatorParamForVScan(SOperatorParam** ppRes, int32_t downstreamIdx, SOrgTbInfo* pMap) {
11,788✔
461
  int32_t                      code = TSDB_CODE_SUCCESS;
11,788✔
462
  int32_t                      lino = 0;
11,788✔
463
  SExchangeOperatorParam*      pExc = NULL;
11,788✔
464
  SExchangeOperatorBasicParam* basic = NULL;
11,788✔
465

466
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
11,788!
467
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
11,788!
468
  (*ppRes)->pChildren = NULL;
11,788✔
469

470
  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
11,788!
471
  QUERY_CHECK_NULL(pExc, code, lino, _return, terrno)
11,788!
472

473
  pExc->multiParams = false;
11,788✔
474

475
  basic = &pExc->basic;
11,788✔
476
  basic->srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
11,788✔
477

478
  basic->vgId = pMap->vgId;
11,788✔
479
  basic->tableSeq = false;
11,788✔
480
  basic->isVtbRefScan = true;
11,788✔
481
  basic->isVtbTagScan = false;
11,788✔
482
  basic->isNewDeployed = false;
11,788✔
483
  basic->colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
11,788!
484
  QUERY_CHECK_NULL(basic->colMap, code, lino, _return, terrno)
11,788!
485
  basic->colMap->vgId = pMap->vgId;
11,788✔
486
  tstrncpy(basic->colMap->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);
11,788✔
487
  basic->colMap->colMap = taosArrayDup(pMap->colMap, NULL);
11,788✔
488
  QUERY_CHECK_NULL(basic->colMap->colMap, code, lino, _return, terrno)
11,788!
489

490
  basic->uidList = taosArrayInit(1, sizeof(int64_t));
11,788✔
491
  QUERY_CHECK_NULL(basic->uidList, code, lino, _return, terrno)
11,788!
492

493
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
11,788✔
494
  (*ppRes)->downstreamIdx = downstreamIdx;
11,788✔
495
  (*ppRes)->value = pExc;
11,788✔
496
  (*ppRes)->reUse = true;
11,788✔
497

498
  return TSDB_CODE_SUCCESS;
11,788✔
499

500
_return:
×
501
  qError("failed to build exchange operator param for vscan, code:%d", code);
×
502
  taosMemoryFreeClear(*ppRes);
×
503
  if (basic) {
×
504
    if (basic->colMap) {
×
505
      taosArrayDestroy(basic->colMap->colMap);
×
506
      taosMemoryFreeClear(basic->colMap);
×
507
    }
508
    if (basic->uidList) {
×
509
      taosArrayDestroy(basic->uidList);
×
510
    }
511
    taosMemoryFreeClear(basic);
×
512
  }
513
  taosMemoryFreeClear(pExc);
×
514
  return code;
×
515
}
516

517
static int32_t buildExchangeOperatorParamForVScanEx(SOperatorParam** ppRes, int32_t downstreamIdx, SOrgTbInfo* pMap, uint64_t taskId, SStreamTaskAddr* pTaskAddr) {
20✔
518
  int32_t                      code = TSDB_CODE_SUCCESS;
20✔
519
  int32_t                      lino = 0;
20✔
520
  SExchangeOperatorParam*      pExc = NULL;
20✔
521
  SExchangeOperatorBasicParam* basic = NULL;
20✔
522

523
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
20!
524
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
20!
525
  (*ppRes)->pChildren = NULL;
20✔
526

527
  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
20!
528
  QUERY_CHECK_NULL(pExc, code, lino, _return, terrno)
20!
529

530
  pExc->multiParams = false;
20✔
531

532
  basic = &pExc->basic;
20✔
533
  basic->srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
20✔
534

535
  basic->vgId = pMap->vgId;
20✔
536
  basic->tableSeq = false;
20✔
537
  basic->isVtbRefScan = true;
20✔
538
  basic->isVtbTagScan = false;
20✔
539
  basic->isNewDeployed = true;
20✔
540
  basic->newDeployedSrc.type = QUERY_NODE_DOWNSTREAM_SOURCE;
20✔
541
  basic->newDeployedSrc.clientId = taskId;// current task's taskid
20✔
542
  basic->newDeployedSrc.taskId = pTaskAddr->taskId;
20✔
543
  basic->newDeployedSrc.fetchMsgType = TDMT_STREAM_FETCH;
20✔
544
  basic->newDeployedSrc.localExec = false;
20✔
545
  basic->newDeployedSrc.addr.nodeId = pTaskAddr->nodeId;
20✔
546
  memcpy(&basic->newDeployedSrc.addr.epSet, &pTaskAddr->epset, sizeof(SEpSet));
20✔
547
  basic->colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
20!
548
  QUERY_CHECK_NULL(basic->colMap, code, lino, _return, terrno)
20!
549
  basic->colMap->vgId = pMap->vgId;
20✔
550
  tstrncpy(basic->colMap->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);
20✔
551
  basic->colMap->colMap = taosArrayDup(pMap->colMap, NULL);
20✔
552
  QUERY_CHECK_NULL(basic->colMap->colMap, code, lino, _return, terrno)
20!
553

554
  basic->uidList = taosArrayInit(1, sizeof(int64_t));
20✔
555
  QUERY_CHECK_NULL(basic->uidList, code, lino, _return, terrno)
20!
556

557
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
20✔
558
  (*ppRes)->downstreamIdx = downstreamIdx;
20✔
559
  (*ppRes)->value = pExc;
20✔
560
  (*ppRes)->reUse = true;
20✔
561

562
  return TSDB_CODE_SUCCESS;
20✔
563

564
_return:
×
565
  qError("failed to build exchange operator param for vscan, code:%d", code);
×
566
  taosMemoryFreeClear(*ppRes);
×
567
  if (basic) {
×
568
    if (basic->colMap) {
×
569
      taosArrayDestroy(basic->colMap->colMap);
×
570
      taosMemoryFreeClear(basic->colMap);
×
571
    }
572
    if (basic->uidList) {
×
573
      taosArrayDestroy(basic->uidList);
×
574
    }
575
    taosMemoryFreeClear(basic);
×
576
  }
577
  taosMemoryFreeClear(pExc);
×
578
  return code;
×
579
}
580

581
static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initParam, SOperatorParam** ppChild0, SOperatorParam** ppChild1) {
27,843✔
582
  int32_t code = TSDB_CODE_SUCCESS;
27,843✔
583
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
27,843!
584
  if (NULL == *ppRes) {
27,843!
585
    code = terrno;
×
586
    return code;
×
587
  }
588
  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
27,843✔
589
  if (NULL == (*ppRes)->pChildren) {
27,843!
590
    code = terrno;
×
591
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
592
    *ppRes = NULL;
×
593
    return code;
×
594
  }
595
  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild0)) {
55,686!
596
    code = terrno;
×
597
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
598
    *ppRes = NULL;
×
599
    return code;
×
600
  }
601
  *ppChild0 = NULL;
27,843✔
602
  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild1)) {
55,686!
603
    code = terrno;
×
604
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
605
    *ppRes = NULL;
×
606
    return code;
×
607
  }
608
  *ppChild1 = NULL;
27,843✔
609
  
610
  SSortMergeJoinOperatorParam* pJoin = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam));
27,843!
611
  if (NULL == pJoin) {
27,843!
612
    code = terrno;
×
613
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
614
    *ppRes = NULL;
×
615
    return code;
×
616
  }
617

618
  pJoin->initDownstream = initParam;
27,843✔
619
  
620
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
27,843✔
621
  (*ppRes)->value = pJoin;
27,843✔
622
  (*ppRes)->reUse = false;
27,843✔
623

624
  return TSDB_CODE_SUCCESS;
27,843✔
625
}
626

627
static int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) {
×
628
  int32_t code = TSDB_CODE_SUCCESS;
×
629
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
×
630
  if (NULL == *ppRes) {
×
631
    code = terrno;
×
632
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
×
633
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
634
    return code;
×
635
  }
636
  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
×
637
  if (NULL == *ppRes) {
×
638
    code = terrno;
×
639
    taosMemoryFreeClear(*ppRes);
×
640
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
×
641
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
642
    return code;
×
643
  }
644
  if (pChild0 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) {
×
645
    code = terrno;
×
646
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
×
647
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
×
648
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
649
    *ppRes = NULL;
×
650
    return code;
×
651
  }
652
  if (pChild1 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) {
×
653
    code = terrno;
×
654
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
×
655
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
656
    *ppRes = NULL;
×
657
    return code;
×
658
  }
659
  
660
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
×
661
  (*ppRes)->value = NULL;
×
662
  (*ppRes)->reUse = false;
×
663

664
  return TSDB_CODE_SUCCESS;
×
665
}
666

667
static int32_t buildBatchTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
67✔
668
  int32_t code = TSDB_CODE_SUCCESS;
67✔
669
  int32_t vgNum = tSimpleHashGetSize(pVg);
67✔
670
  if (vgNum <= 0 || vgNum > 1) {
67!
671
    qError("Invalid vgroup num %d to build table scan operator param", vgNum);
×
672
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
673
  }
674

675
  int32_t iter = 0;
67✔
676
  void* p = NULL;
67✔
677
  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
134✔
678
    SArray* pUidList = *(SArray**)p;
67✔
679

680
    code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false);
67✔
681
    if (code) {
67!
682
      return code;
×
683
    }
684
    taosArrayDestroy(pUidList);
67✔
685
    *(SArray**)p = NULL;
67✔
686
  }
687
  
688
  return TSDB_CODE_SUCCESS;
67✔
689
}
690

691

692
static int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) {
×
693
  SArray* pUidList = taosArrayInit(1, sizeof(int64_t));
×
694
  if (NULL == pUidList) {
×
695
    return terrno;
×
696
  }
697
  if (NULL == taosArrayPush(pUidList, pUid)) {
×
698
    return terrno;
×
699
  }
700

701
  int32_t code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, true);
×
702
  taosArrayDestroy(pUidList);
×
703
  if (code) {
×
704
    return code;
×
705
  }
706
  
707
  return TSDB_CODE_SUCCESS;
×
708
}
709

710
static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, SOperatorParam** ppParam) {
27,843✔
711
  int64_t                     rowIdx = pPrev->pListHead->readIdx;
27,843✔
712
  SOperatorParam*             pSrcParam0 = NULL;
27,843✔
713
  SOperatorParam*             pSrcParam1 = NULL;
27,843✔
714
  SOperatorParam*             pGcParam0 = NULL;
27,843✔
715
  SOperatorParam*             pGcParam1 = NULL;  
27,843✔
716
  int32_t*                    leftVg = pPrev->pListHead->pLeftVg + rowIdx;
27,843✔
717
  int64_t*                    leftUid = pPrev->pListHead->pLeftUid + rowIdx;
27,843✔
718
  int32_t*                    rightVg = pPrev->pListHead->pRightVg + rowIdx;
27,843✔
719
  int64_t*                    rightUid = pPrev->pListHead->pRightUid + rowIdx;
27,843✔
720
  int32_t                     code = TSDB_CODE_SUCCESS;
27,843✔
721

722
  qDebug("start %" PRId64 ":%" PRId64 "th stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64, 
27,843✔
723
      rowIdx, pPrev->tableNum, *leftVg, *leftUid, *rightVg, *rightUid);
724

725
  QRY_ERR_RET(updatePostJoinCurrTableInfo(&pInfo->stbJoin));
27,843!
726
  
727
  if (pInfo->stbJoin.basic.batchFetch) {
27,843✔
728
    if (pPrev->leftHash) {
27,825✔
729
      code = pInfo->stbJoin.basic.srcScan[0] ? buildBatchTableScanOperatorParam(&pSrcParam0, 0, pPrev->leftHash) : buildBatchExchangeOperatorParam(&pSrcParam0, 0, pPrev->leftHash);
298✔
730
      if (TSDB_CODE_SUCCESS == code) {
298!
731
        code = pInfo->stbJoin.basic.srcScan[1] ? buildBatchTableScanOperatorParam(&pSrcParam1, 1, pPrev->rightHash) : buildBatchExchangeOperatorParam(&pSrcParam1, 1, pPrev->rightHash);
298✔
732
      }
733
      if (TSDB_CODE_SUCCESS == code) {
298!
734
        tSimpleHashCleanup(pPrev->leftHash);
298✔
735
        tSimpleHashCleanup(pPrev->rightHash);
298✔
736
        pPrev->leftHash = NULL;
298✔
737
        pPrev->rightHash = NULL;
298✔
738
      }
739
    }
740
  } else {
741
    code = pInfo->stbJoin.basic.srcScan[0] ? buildSingleTableScanOperatorParam(&pSrcParam0, 0, leftVg, leftUid) : buildExchangeOperatorParam(&pSrcParam0, 0, leftVg, leftUid);
18!
742
    if (TSDB_CODE_SUCCESS == code) {
18!
743
      code = pInfo->stbJoin.basic.srcScan[1] ? buildSingleTableScanOperatorParam(&pSrcParam1, 1, rightVg, rightUid) : buildExchangeOperatorParam(&pSrcParam1, 1, rightVg, rightUid);
18!
744
    }
745
  }
746

747
  bool initParam = pSrcParam0 ? true : false;
27,843✔
748
  if (TSDB_CODE_SUCCESS == code) {
27,843!
749
    code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pPost->leftNeedCache, pSrcParam0);
27,843✔
750
    pSrcParam0 = NULL;
27,843✔
751
  }
752
  if (TSDB_CODE_SUCCESS == code) {
27,843!
753
    code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pPost->rightNeedCache, pSrcParam1);
27,843✔
754
    pSrcParam1 = NULL;
27,843✔
755
  }
756
  if (TSDB_CODE_SUCCESS == code) {
27,843!
757
    code = buildMergeJoinOperatorParam(ppParam, initParam, &pGcParam0, &pGcParam1);
27,843✔
758
  }
759
  if (TSDB_CODE_SUCCESS != code) {
27,843!
760
    if (pSrcParam0) {
×
761
      freeOperatorParam(pSrcParam0, OP_GET_PARAM);
×
762
    }
763
    if (pSrcParam1) {
×
764
      freeOperatorParam(pSrcParam1, OP_GET_PARAM);
×
765
    }
766
    if (pGcParam0) {
×
767
      freeOperatorParam(pGcParam0, OP_GET_PARAM);
×
768
    }
769
    if (pGcParam1) {
×
770
      freeOperatorParam(pGcParam1, OP_GET_PARAM);
×
771
    }
772
    if (*ppParam) {
×
773
      freeOperatorParam(*ppParam, OP_GET_PARAM);
×
774
      *ppParam = NULL;
×
775
    }
776
  }
777
  
778
  return code;
27,843✔
779
}
780

781
static void seqJoinLaunchNewRetrieveImpl(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
27,843✔
782
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
27,843✔
783
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
27,843✔
784
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
27,843✔
785
  SStbJoinPostJoinCtx*       pPost = &pStbJoin->ctx.post;
27,843✔
786
  SOperatorParam*            pParam = NULL;
27,843✔
787
  int32_t                    code  = buildSeqStbJoinOperatorParam(pInfo, pPrev, pPost, &pParam);
27,843✔
788
  if (TSDB_CODE_SUCCESS != code) {
27,843!
789
    pOperator->pTaskInfo->code = code;
×
790
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
×
791
  }
792

793
  qDebug("%s dynamic post task begin", GET_TASKID(pOperator->pTaskInfo));
27,843✔
794
  code = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam, ppRes);
27,843✔
795
  if (*ppRes && (code == 0)) {
27,843!
796
    code = blockDataCheck(*ppRes);
806✔
797
    if (code) {
806!
798
      qError("Invalid block data, blockDataCheck failed, error:%s", tstrerror(code));
×
799
      pOperator->pTaskInfo->code = code;
×
800
      T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
×
801
    }
802
    pPost->isStarted = true;
806✔
803
    pStbJoin->execInfo.postBlkNum++;
806✔
804
    pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows;
806✔
805
    qDebug("%s join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
806!
806
  } else {
807
    qDebug("%s Empty join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
27,037✔
808
  }
809
}
27,843✔
810

811

812
static int32_t notifySeqJoinTableCacheEnd(SOperatorInfo* pOperator, SStbJoinPostJoinCtx* pPost, bool leftTable) {
×
813
  SOperatorParam* pGcParam = NULL;
×
814
  SOperatorParam* pMergeJoinParam = NULL;
×
815
  int32_t         downstreamId = leftTable ? 0 : 1;
×
816
  int32_t         vgId = leftTable ? pPost->leftVgId : pPost->rightVgId;
×
817
  int64_t         uid = leftTable ? pPost->leftCurrUid : pPost->rightCurrUid;
×
818

819
  qDebug("notify table %" PRIu64 " in vgId %d downstreamId %d cache end", uid, vgId, downstreamId);
×
820

821
  int32_t code = buildGroupCacheNotifyOperatorParam(&pGcParam, downstreamId, vgId, uid);
×
822
  if (TSDB_CODE_SUCCESS != code) {
×
823
    return code;
×
824
  }
825
  code = buildMergeJoinNotifyOperatorParam(&pMergeJoinParam, pGcParam, NULL);
×
826
  if (TSDB_CODE_SUCCESS != code) {
×
827
    return code;
×
828
  }
829

830
  return optrDefaultNotifyFn(pOperator->pDownstream[1], pMergeJoinParam);
×
831
}
832

833
static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCtrlInfo*          pStbJoin) {
27,842✔
834
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
27,842✔
835
  int32_t code = 0;
27,842✔
836
  
837
  pPost->isStarted = false;
27,842✔
838
  
839
  if (pStbJoin->basic.batchFetch) {
27,842✔
840
    return TSDB_CODE_SUCCESS;
27,824✔
841
  }
842
  
843
  if (pPost->leftNeedCache) {
18!
844
    uint32_t* num = tSimpleHashGet(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
×
845
    if (num && --(*num) <= 0) {
×
846
      code = tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
×
847
      if (code) {
×
848
        qError("tSimpleHashRemove leftCurrUid %" PRId64 " from leftCache failed, error:%s", pPost->leftCurrUid, tstrerror(code));
×
849
        QRY_ERR_RET(code);
×
850
      }
851
      QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, true));
×
852
    }
853
  }
854
  
855
  if (!pPost->rightNeedCache) {
18!
856
    void* v = tSimpleHashGet(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
18✔
857
    if (NULL != v) {
18!
858
      code = tSimpleHashRemove(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
×
859
      if (code) {
×
860
        qError("tSimpleHashRemove rightCurrUid %" PRId64 " from rightCache failed, error:%s", pPost->rightCurrUid, tstrerror(code));
×
861
        QRY_ERR_RET(code);
×
862
      }
863
      QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, false));
×
864
    }
865
  }
866

867
  return TSDB_CODE_SUCCESS;
18✔
868
}
869

870

871
static FORCE_INLINE int32_t seqJoinContinueCurrRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
872
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
1,112✔
873
  SStbJoinPostJoinCtx*       pPost = &pInfo->stbJoin.ctx.post;
1,112✔
874
  SStbJoinPrevJoinCtx*       pPrev = &pInfo->stbJoin.ctx.prev;
1,112✔
875

876
  if (!pPost->isStarted) {
1,112✔
877
    return TSDB_CODE_SUCCESS;
307✔
878
  }
879
  
880
  qDebug("%s dynQueryCtrl continue to retrieve block from post op", GET_TASKID(pOperator->pTaskInfo));
805!
881
  
882
  *ppRes = getNextBlockFromDownstream(pOperator, 1);
805✔
883
  if (NULL == *ppRes) {
805!
884
    QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, &pInfo->stbJoin));
805!
885
    pPrev->pListHead->readIdx++;
805✔
886
  } else {
887
    pInfo->stbJoin.execInfo.postBlkNum++;
×
888
    pInfo->stbJoin.execInfo.postBlkRows += (*ppRes)->info.rows;
×
889
  }
890

891
  return TSDB_CODE_SUCCESS;
805✔
892
}
893

894
static FORCE_INLINE int32_t addToJoinVgroupHash(SSHashObj* pHash, void* pKey, int32_t keySize, void* pVal, int32_t valSize) {
895
  SArray** ppArray = tSimpleHashGet(pHash, pKey, keySize);
55,658✔
896
  if (NULL == ppArray) {
55,658✔
897
    SArray* pArray = taosArrayInit(10, valSize);
959✔
898
    if (NULL == pArray) {
959!
899
      return terrno;
×
900
    }
901
    if (NULL == taosArrayPush(pArray, pVal)) {
1,918!
902
      taosArrayDestroy(pArray);
×
903
      return terrno;
×
904
    }
905
    if (tSimpleHashPut(pHash, pKey, keySize, &pArray, POINTER_BYTES)) {
959!
906
      taosArrayDestroy(pArray);      
×
907
      return terrno;
×
908
    }
909
    return TSDB_CODE_SUCCESS;
959✔
910
  }
911

912
  if (NULL == taosArrayPush(*ppArray, pVal)) {
109,398!
913
    return terrno;
×
914
  }
915
  
916
  return TSDB_CODE_SUCCESS;
54,699✔
917
}
918

919
static FORCE_INLINE int32_t addToJoinTableHash(SSHashObj* pHash, SSHashObj* pOnceHash, void* pKey, int32_t keySize) {
920
  int32_t code = TSDB_CODE_SUCCESS;
18✔
921
  uint32_t* pNum = tSimpleHashGet(pHash, pKey, keySize);
18✔
922
  if (NULL == pNum) {
18!
923
    uint32_t n = 1;
18✔
924
    code = tSimpleHashPut(pHash, pKey, keySize, &n, sizeof(n));
18✔
925
    if (code) {
18!
926
      return code;
×
927
    }
928
    code = tSimpleHashPut(pOnceHash, pKey, keySize, NULL, 0);
18✔
929
    if (code) {
18!
930
      return code;
×
931
    }
932
    return TSDB_CODE_SUCCESS;
18✔
933
  }
934

935
  switch (*pNum) {
×
936
    case 0:
×
937
      break;
×
938
    case UINT32_MAX:
×
939
      *pNum = 0;
×
940
      break;
×
941
    default:
×
942
      if (1 == (*pNum)) {
×
943
        code = tSimpleHashRemove(pOnceHash, pKey, keySize);
×
944
        if (code) {
×
945
          qError("tSimpleHashRemove failed in addToJoinTableHash, error:%s", tstrerror(code));
×
946
          QRY_ERR_RET(code);
×
947
        }
948
      }
949
      (*pNum)++;
×
950
      break;
×
951
  }
952
  
953
  return TSDB_CODE_SUCCESS;
×
954
}
955

956

957
static void freeStbJoinTableList(SStbJoinTableList* pList) {
306✔
958
  if (NULL == pList) {
306!
959
    return;
×
960
  }
961
  taosMemoryFree(pList->pLeftVg);
306!
962
  taosMemoryFree(pList->pLeftUid);
306!
963
  taosMemoryFree(pList->pRightVg);
306!
964
  taosMemoryFree(pList->pRightUid);
306!
965
  taosMemoryFree(pList);
306!
966
}
967

968
static int32_t appendStbJoinTableList(SStbJoinPrevJoinCtx* pCtx, int64_t rows, int32_t* pLeftVg, int64_t* pLeftUid, int32_t* pRightVg, int64_t* pRightUid) {
307✔
969
  int32_t code = TSDB_CODE_SUCCESS;
307✔
970
  SStbJoinTableList* pNew = taosMemoryCalloc(1, sizeof(SStbJoinTableList));
307!
971
  if (NULL == pNew) {
307!
972
    return terrno;
×
973
  }
974
  pNew->pLeftVg = taosMemoryMalloc(rows * sizeof(*pLeftVg));
307!
975
  if (NULL == pNew->pLeftVg) {
307!
976
    code = terrno;
×
977
    freeStbJoinTableList(pNew);
×
978
    return code;
×
979
  }
980
  pNew->pLeftUid = taosMemoryMalloc(rows * sizeof(*pLeftUid));
307!
981
  if (NULL == pNew->pLeftUid) {
307!
982
    code = terrno;
×
983
    freeStbJoinTableList(pNew);
×
984
    return code;
×
985
  }
986
  pNew->pRightVg = taosMemoryMalloc(rows * sizeof(*pRightVg));
307!
987
  if (NULL == pNew->pRightVg) {
307!
988
    code = terrno;
×
989
    freeStbJoinTableList(pNew);
×
990
    return code;
×
991
  }
992
  pNew->pRightUid = taosMemoryMalloc(rows * sizeof(*pRightUid));
307!
993
  if (NULL == pNew->pRightUid) {
307!
994
    code = terrno;
×
995
    freeStbJoinTableList(pNew);
×
996
    return code;
×
997
  }
998

999
  TAOS_MEMCPY(pNew->pLeftVg, pLeftVg, rows * sizeof(*pLeftVg));
307✔
1000
  TAOS_MEMCPY(pNew->pLeftUid, pLeftUid, rows * sizeof(*pLeftUid));
307✔
1001
  TAOS_MEMCPY(pNew->pRightVg, pRightVg, rows * sizeof(*pRightVg));
307✔
1002
  TAOS_MEMCPY(pNew->pRightUid, pRightUid, rows * sizeof(*pRightUid));
307✔
1003

1004
  pNew->readIdx = 0;
307✔
1005
  pNew->uidNum = rows;
307✔
1006
  pNew->pNext = NULL;
307✔
1007
  
1008
  if (pCtx->pListTail) {
307!
1009
    pCtx->pListTail->pNext = pNew;
×
1010
    pCtx->pListTail = pNew;
×
1011
  } else {
1012
    pCtx->pListHead = pNew;
307✔
1013
    pCtx->pListTail= pNew;
307✔
1014
  }
1015

1016
  return TSDB_CODE_SUCCESS;
307✔
1017
}
1018

1019
static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
307✔
1020
  int32_t                    code = TSDB_CODE_SUCCESS;
307✔
1021
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
307✔
1022
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
307✔
1023
  SColumnInfoData*           pVg0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[0]);
307✔
1024
  if (NULL == pVg0) {
307!
1025
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1026
  }
1027
  SColumnInfoData*           pVg1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[1]);
307✔
1028
  if (NULL == pVg1) {
307!
1029
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1030
  }
1031
  SColumnInfoData*           pUid0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[0]);
307✔
1032
  if (NULL == pUid0) {
307!
1033
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1034
  }
1035
  SColumnInfoData*           pUid1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[1]);
307✔
1036
  if (NULL == pUid1) {
307!
1037
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1038
  }
1039

1040
  if (pStbJoin->basic.batchFetch) {
307✔
1041
    for (int32_t i = 0; i < pBlock->info.rows; ++i) {
28,127✔
1042
      int32_t* leftVg = (int32_t*)(pVg0->pData + pVg0->info.bytes * i);
27,829✔
1043
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i);
27,829✔
1044
      int32_t* rightVg = (int32_t*)(pVg1->pData + pVg1->info.bytes * i);
27,829✔
1045
      int64_t* rightUid = (int64_t*)(pUid1->pData + pUid1->info.bytes * i);
27,829✔
1046

1047
      code = addToJoinVgroupHash(pStbJoin->ctx.prev.leftHash, leftVg, sizeof(*leftVg), leftUid, sizeof(*leftUid));
27,829✔
1048
      if (TSDB_CODE_SUCCESS != code) {
27,829!
1049
        break;
×
1050
      }
1051
      code = addToJoinVgroupHash(pStbJoin->ctx.prev.rightHash, rightVg, sizeof(*rightVg), rightUid, sizeof(*rightUid));
27,829✔
1052
      if (TSDB_CODE_SUCCESS != code) {
27,829!
1053
        break;
×
1054
      }
1055
    }
1056
  } else {
1057
    for (int32_t i = 0; i < pBlock->info.rows; ++i) {
27✔
1058
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i);
18✔
1059
    
1060
      code = addToJoinTableHash(pStbJoin->ctx.prev.leftCache, pStbJoin->ctx.prev.onceTable, leftUid, sizeof(*leftUid));
18✔
1061
      if (TSDB_CODE_SUCCESS != code) {
18!
1062
        break;
×
1063
      }
1064
    }
1065
  }
1066

1067
  if (TSDB_CODE_SUCCESS == code) {
307!
1068
    code = appendStbJoinTableList(&pStbJoin->ctx.prev, pBlock->info.rows, (int32_t*)pVg0->pData, (int64_t*)pUid0->pData, (int32_t*)pVg1->pData, (int64_t*)pUid1->pData);
307✔
1069
    if (TSDB_CODE_SUCCESS == code) {
307!
1070
      pStbJoin->ctx.prev.tableNum += pBlock->info.rows;
307✔
1071
    }
1072
  }
1073

1074
_return:
×
1075

1076
  if (TSDB_CODE_SUCCESS != code) {
307!
1077
    pOperator->pTaskInfo->code = code;
×
1078
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
×
1079
  }
1080
}
307✔
1081

1082

1083
static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) {
6,279✔
1084
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
6,279✔
1085
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
6,279✔
1086

1087
  if (pStbJoin->basic.batchFetch) {
6,279✔
1088
    return;
6,279✔
1089
  }
1090

1091
  if (tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache) == tSimpleHashGetSize(pStbJoin->ctx.prev.onceTable)) {
9!
1092
    tSimpleHashClear(pStbJoin->ctx.prev.leftCache);
9✔
1093
    return;
9✔
1094
  }
1095

1096
  uint64_t* pUid = NULL;
×
1097
  int32_t iter = 0;
×
1098
  int32_t code = 0;
×
1099
  while (NULL != (pUid = tSimpleHashIterate(pStbJoin->ctx.prev.onceTable, pUid, &iter))) {
×
1100
    code = tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, pUid, sizeof(*pUid));
×
1101
    if (code) {
×
1102
      qError("tSimpleHashRemove failed in postProcessStbJoinTableHash, error:%s", tstrerror(code));
×
1103
    }
1104
  }
1105

1106
  pStbJoin->execInfo.leftCacheNum = tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache);
×
1107
  qDebug("more than 1 ref build table num %" PRId64, (int64_t)tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache));
×
1108

1109
/*
1110
  // debug only
1111
  iter = 0;
1112
  uint32_t* num = NULL;
1113
  while (NULL != (num = tSimpleHashIterate(pStbJoin->ctx.prev.leftCache, num, &iter))) {
1114
    A S S E R T(*num > 1);
1115
  }
1116
*/  
1117
}
1118

1119
static void buildStbJoinTableList(SOperatorInfo* pOperator) {
6,279✔
1120
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
6,279✔
1121
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
6,279✔
1122

1123
  while (true) {
307✔
1124
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
6,586✔
1125
    if (NULL == pBlock) {
6,586✔
1126
      break;
6,279✔
1127
    }
1128

1129
    pStbJoin->execInfo.prevBlkNum++;
307✔
1130
    pStbJoin->execInfo.prevBlkRows += pBlock->info.rows;
307✔
1131
    
1132
    doBuildStbJoinTableHash(pOperator, pBlock);
307✔
1133
  }
1134

1135
  postProcessStbJoinTableHash(pOperator);
6,279✔
1136

1137
  pStbJoin->ctx.prev.joinBuild = true;
6,279✔
1138
}
6,279✔
1139

1140
static int32_t seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
1,112✔
1141
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
1,112✔
1142
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
1,112✔
1143
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
1,112✔
1144
  SStbJoinTableList*         pNode = pPrev->pListHead;
1,112✔
1145

1146
  while (pNode) {
28,455✔
1147
    if (pNode->readIdx >= pNode->uidNum) {
28,149✔
1148
      pPrev->pListHead = pNode->pNext;
306✔
1149
      freeStbJoinTableList(pNode);
306✔
1150
      pNode = pPrev->pListHead;
306✔
1151
      continue;
306✔
1152
    }
1153
    
1154
    seqJoinLaunchNewRetrieveImpl(pOperator, ppRes);
27,843✔
1155
    if (*ppRes) {
27,843✔
1156
      return TSDB_CODE_SUCCESS;
806✔
1157
    }
1158

1159
    QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, pStbJoin));
27,037!
1160
    pPrev->pListHead->readIdx++;
27,037✔
1161
  }
1162

1163
  *ppRes = NULL;
306✔
1164
  setOperatorCompleted(pOperator);
306✔
1165

1166
  return TSDB_CODE_SUCCESS;
306✔
1167
}
1168

1169
static int32_t seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) {
7,084✔
1170
  if (pBlock) {
7,084✔
1171
    if (pStbJoin && pStbJoin->pOutputDataBlockDesc) {
806!
1172
      pBlock->info.id.blockId = pStbJoin->pOutputDataBlockDesc->dataBlockId;
806✔
1173
      if (!pBlock->pDataBlock) return TSDB_CODE_SUCCESS;
806!
1174

1175
      for (int i = (int)pBlock->pDataBlock->size; i < pStbJoin->pOutputDataBlockDesc->pSlots->length; i++) {
836✔
1176
        SSlotDescNode* pSlot = (SSlotDescNode*)nodesListGetNode(pStbJoin->pOutputDataBlockDesc->pSlots, i);
30✔
1177
        if (pSlot == NULL) {
30!
1178
          qError("seqStableJoinComposeRes: pSlot is NULL, i:%d", i);
×
1179
          return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1180
        }
1181
        SColumnInfoData colInfo = createColumnInfoData(pSlot->dataType.type, pSlot->dataType.bytes, pSlot->slotId);
30✔
1182
        int32_t code = colInfoDataEnsureCapacity(&colInfo, pBlock->info.rows, true);
30✔
1183
        if (code != TSDB_CODE_SUCCESS) {
30!
1184
          return code;
×
1185
        }
1186
        code = blockDataAppendColInfo(pBlock, &colInfo);
30✔
1187
        if (code != TSDB_CODE_SUCCESS) {
30!
1188
          return code;
×
1189
        }
1190
      }
1191
    } else {
1192
      qError("seqStableJoinComposeRes: pBlock or pStbJoin is NULL");
×
1193
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1194
    }
1195
  }
1196
  return TSDB_CODE_SUCCESS;
7,084✔
1197
}
1198

1199
int32_t seqStableJoin(SOperatorInfo* pOperator, SSDataBlock** pRes) {
7,228✔
1200
  int32_t                    code = TSDB_CODE_SUCCESS;
7,228✔
1201
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
7,228✔
1202
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
7,228✔
1203

1204
  QRY_PARAM_CHECK(pRes);
7,228!
1205
  if (pOperator->status == OP_EXEC_DONE) {
7,228✔
1206
    return code;
144✔
1207
  }
1208

1209
  int64_t st = 0;
7,084✔
1210
  if (pOperator->cost.openCost == 0) {
7,084✔
1211
    st = taosGetTimestampUs();
6,279✔
1212
  }
1213

1214
  if (!pStbJoin->ctx.prev.joinBuild) {
7,084✔
1215
    buildStbJoinTableList(pOperator);
6,279✔
1216
    if (pStbJoin->execInfo.prevBlkRows <= 0) {
6,279✔
1217
      setOperatorCompleted(pOperator);
5,972✔
1218
      goto _return;
5,972✔
1219
    }
1220
  }
1221

1222
  QRY_ERR_JRET(seqJoinContinueCurrRetrieve(pOperator, pRes));
1,112!
1223
  if (*pRes) {
1,112!
1224
    goto _return;
×
1225
  }
1226

1227
  QRY_ERR_JRET(seqJoinLaunchNewRetrieve(pOperator, pRes));
1,112!
1228

1229
_return:
1,112✔
1230
  if (pOperator->cost.openCost == 0) {
7,084✔
1231
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
6,279✔
1232
  }
1233

1234
  if (code) {
7,084!
1235
    qError("%s failed since %s", __func__, tstrerror(code));
×
1236
    pOperator->pTaskInfo->code = code;
×
1237
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
1238
  } else {
1239
    code = seqStableJoinComposeRes(pStbJoin, *pRes);
7,084✔
1240
  }
1241
  return code;
7,084✔
1242
}
1243

1244
static int32_t buildVtbScanOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, uint64_t uid) {
4,844✔
1245
  int32_t                   code = TSDB_CODE_SUCCESS;
4,844✔
1246
  int32_t                   lino = 0;
4,844✔
1247
  SVTableScanOperatorParam* pVScan = NULL;
4,844✔
1248
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
4,844!
1249
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
4,844!
1250

1251
  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
4,844✔
1252
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)
4,844!
1253

1254
  pVScan = taosMemoryMalloc(sizeof(SVTableScanOperatorParam));
4,844!
1255
  QUERY_CHECK_NULL(pVScan, code, lino, _return, terrno)
4,844!
1256
  pVScan->pOpParamArray = taosArrayInit(1, POINTER_BYTES);
4,844✔
1257
  QUERY_CHECK_NULL(pVScan->pOpParamArray, code, lino, _return, terrno)
4,844!
1258
  pVScan->uid = uid;
4,844✔
1259

1260
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN;
4,844✔
1261
  (*ppRes)->downstreamIdx = 0;
4,844✔
1262
  (*ppRes)->value = pVScan;
4,844✔
1263
  (*ppRes)->reUse = false;
4,844✔
1264

1265
  return TSDB_CODE_SUCCESS;
4,844✔
1266
_return:
×
1267
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1268
  if (pVScan) {
×
1269
    taosArrayDestroy(pVScan->pOpParamArray);
×
1270
    taosMemoryFreeClear(pVScan);
×
1271
  }
1272
  if (*ppRes) {
×
1273
    taosArrayDestroy((*ppRes)->pChildren);
×
1274
    taosMemoryFreeClear(*ppRes);
×
1275
  }
1276
  return code;
×
1277
}
1278

1279
int32_t dynProcessUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
2,084✔
1280
  int32_t                    lino = 0;
2,084✔
1281
  SOperatorInfo*             operator=(SOperatorInfo*) param;
2,084✔
1282
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)operator->info;
2,084✔
1283

1284
  if (TSDB_CODE_SUCCESS != code) {
2,084!
1285
    operator->pTaskInfo->code = rpcCvtErrCode(code);
×
1286
    if (operator->pTaskInfo->code != code) {
×
1287
      qError("load systable rsp received, error:%s, cvted error:%s", tstrerror(code),
×
1288
             tstrerror(operator->pTaskInfo->code));
1289
    } else {
1290
      qError("load systable rsp received, error:%s", tstrerror(code));
×
1291
    }
1292
    goto _return;
×
1293
  }
1294

1295
  pScanResInfo->vtbScan.pRsp = taosMemoryMalloc(sizeof(SUseDbRsp));
2,084!
1296
  QUERY_CHECK_NULL(pScanResInfo->vtbScan.pRsp, code, lino, _return, terrno)
2,084!
1297

1298
  code = tDeserializeSUseDbRsp(pMsg->pData, (int32_t)pMsg->len, pScanResInfo->vtbScan.pRsp);
2,084✔
1299
  QUERY_CHECK_CODE(code, lino, _return);
2,084!
1300

1301
  taosMemoryFreeClear(pMsg->pData);
2,084!
1302

1303
  code = tsem_post(&pScanResInfo->vtbScan.ready);
2,084✔
1304
  QUERY_CHECK_CODE(code, lino, _return);
2,084!
1305

1306
  return code;
2,084✔
1307
_return:
×
1308
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1309
  return code;
×
1310
}
1311

1312
static int32_t buildDbVgInfoMap(SOperatorInfo* pOperator, SMsgCb* pMsgCb, SName* name, SExecTaskInfo* pTaskInfo, SUseDbOutput* output) {
2,084✔
1313
  int32_t                    code = TSDB_CODE_SUCCESS;
2,084✔
1314
  int32_t                    lino = 0;
2,084✔
1315
  char*                      buf1 = NULL;
2,084✔
1316
  SUseDbReq*                 pReq = NULL;
2,084✔
1317
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)pOperator->info;
2,084✔
1318

1319
  pReq = taosMemoryMalloc(sizeof(SUseDbReq));
2,084!
1320
  QUERY_CHECK_NULL(pReq, code, lino, _return, terrno)
2,084!
1321
  code = tNameGetFullDbName(name, pReq->db);
2,084✔
1322
  QUERY_CHECK_CODE(code, lino, _return);
2,084!
1323
  int32_t contLen = tSerializeSUseDbReq(NULL, 0, pReq);
2,084✔
1324
  buf1 = taosMemoryCalloc(1, contLen);
2,084!
1325
  QUERY_CHECK_NULL(buf1, code, lino, _return, terrno)
2,084!
1326
  int32_t tempRes = tSerializeSUseDbReq(buf1, contLen, pReq);
2,084✔
1327
  if (tempRes < 0) {
2,084!
1328
    QUERY_CHECK_CODE(terrno, lino, _return);
×
1329
  }
1330

1331
  // send the fetch remote task result request
1332
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
2,084!
1333
  QUERY_CHECK_NULL(pMsgSendInfo, code, lino, _return, terrno)
2,084!
1334

1335
  pMsgSendInfo->param = pOperator;
2,084✔
1336
  pMsgSendInfo->msgInfo.pData = buf1;
2,084✔
1337
  pMsgSendInfo->msgInfo.len = contLen;
2,084✔
1338
  pMsgSendInfo->msgType = TDMT_MND_GET_DB_INFO;
2,084✔
1339
  pMsgSendInfo->fp = dynProcessUseDbRsp;
2,084✔
1340
  pMsgSendInfo->requestId = pTaskInfo->id.queryId;
2,084✔
1341

1342
  code = asyncSendMsgToServer(pMsgCb->clientRpc, &pScanResInfo->vtbScan.epSet, NULL, pMsgSendInfo);
2,084✔
1343
  QUERY_CHECK_CODE(code, lino, _return);
2,084!
1344

1345
  code = tsem_wait(&pScanResInfo->vtbScan.ready);
2,084✔
1346
  QUERY_CHECK_CODE(code, lino, _return);
2,084!
1347

1348
  code = queryBuildUseDbOutput(output, pScanResInfo->vtbScan.pRsp);
2,084✔
1349
  QUERY_CHECK_CODE(code, lino, _return);
2,084!
1350

1351
_return:
2,084✔
1352
  if (code) {
2,084!
1353
     qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1354
     taosMemoryFree(buf1);
×
1355
  }
1356
  taosMemoryFree(pReq);
2,084!
1357
  tFreeSUsedbRsp(pScanResInfo->vtbScan.pRsp);
2,084✔
1358
  taosMemoryFreeClear(pScanResInfo->vtbScan.pRsp);
2,084!
1359
  return code;
2,084✔
1360
}
1361

1362
int dynVgInfoComp(const void* lp, const void* rp) {
2,044✔
1363
  SVgroupInfo* pLeft = (SVgroupInfo*)lp;
2,044✔
1364
  SVgroupInfo* pRight = (SVgroupInfo*)rp;
2,044✔
1365
  if (pLeft->hashBegin < pRight->hashBegin) {
2,044!
1366
    return -1;
2,044✔
1367
  } else if (pLeft->hashBegin > pRight->hashBegin) {
×
1368
    return 1;
×
1369
  }
1370

1371
  return 0;
×
1372
}
1373

1374
int32_t dynMakeVgArraySortBy(SDBVgInfo* dbInfo, __compar_fn_t sort_func) {
11,931✔
1375
  if (NULL == dbInfo) {
11,931!
1376
    return TSDB_CODE_SUCCESS;
×
1377
  }
1378

1379
  if (dbInfo->vgHash && NULL == dbInfo->vgArray) {
11,931!
1380
    int32_t vgSize = taosHashGetSize(dbInfo->vgHash);
2,084✔
1381
    dbInfo->vgArray = taosArrayInit(vgSize, sizeof(SVgroupInfo));
2,084✔
1382
    if (NULL == dbInfo->vgArray) {
2,084!
1383
      return terrno;
×
1384
    }
1385

1386
    void* pIter = taosHashIterate(dbInfo->vgHash, NULL);
2,084✔
1387
    while (pIter) {
6,212✔
1388
      if (NULL == taosArrayPush(dbInfo->vgArray, pIter)) {
8,256!
1389
        taosHashCancelIterate(dbInfo->vgHash, pIter);
×
1390
        return terrno;
×
1391
      }
1392

1393
      pIter = taosHashIterate(dbInfo->vgHash, pIter);
4,128✔
1394
    }
1395

1396
    taosArraySort(dbInfo->vgArray, sort_func);
2,084✔
1397
  }
1398

1399
  return TSDB_CODE_SUCCESS;
11,931✔
1400
}
1401

1402
int32_t dynHashValueComp(void const* lp, void const* rp) {
17,341✔
1403
  uint32_t*    key = (uint32_t*)lp;
17,341✔
1404
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
17,341✔
1405

1406
  if (*key < pVg->hashBegin) {
17,341!
1407
    return -1;
×
1408
  } else if (*key > pVg->hashEnd) {
17,341✔
1409
    return 1;
5,410✔
1410
  }
1411

1412
  return 0;
11,931✔
1413
}
1414

1415
int32_t getVgId(SDBVgInfo* dbInfo, char* dbFName, int32_t* vgId, char *tbName) {
11,931✔
1416
  int32_t code = 0;
11,931✔
1417
  int32_t lino = 0;
11,931✔
1418
  code = dynMakeVgArraySortBy(dbInfo, dynVgInfoComp);
11,931✔
1419
  QUERY_CHECK_CODE(code, lino, _return);
11,931!
1420

1421
  int32_t vgNum = (int32_t)taosArrayGetSize(dbInfo->vgArray);
11,931✔
1422
  if (vgNum <= 0) {
11,931!
1423
    qError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum);
×
1424
    QUERY_CHECK_CODE(code = TSDB_CODE_TSC_DB_NOT_SELECTED, lino, _return);
×
1425
  }
1426

1427
  SVgroupInfo* vgInfo = NULL;
11,931✔
1428
  char         tbFullName[TSDB_TABLE_FNAME_LEN];
1429
  (void)snprintf(tbFullName, sizeof(tbFullName), "%s.", dbFName);
11,931✔
1430
  int32_t offset = (int32_t)strlen(tbFullName);
11,931✔
1431

1432
  (void)snprintf(tbFullName + offset, sizeof(tbFullName) - offset, "%s", tbName);
11,931✔
1433
  uint32_t hashValue = taosGetTbHashVal(tbFullName, (int32_t)strlen(tbFullName), dbInfo->hashMethod,
23,862✔
1434
                                        dbInfo->hashPrefix, dbInfo->hashSuffix);
11,931!
1435

1436
  vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, dynHashValueComp, TD_EQ);
11,931✔
1437
  if (NULL == vgInfo) {
11,931!
1438
    qError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName,
×
1439
           (int32_t)taosArrayGetSize(dbInfo->vgArray));
1440
    return TSDB_CODE_CTG_INTERNAL_ERROR;
×
1441
  }
1442

1443
  *vgId = vgInfo->vgId;
11,931✔
1444

1445
_return:
11,931✔
1446
  return code;
11,931✔
1447
}
1448

1449
bool colNeedScan(SOperatorInfo* pOperator, col_id_t colId) {
44,930✔
1450
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
44,930✔
1451
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
44,930✔
1452
  SArray *                   pColList = pVtbScan->readColList;
44,930✔
1453
  if (pVtbScan->scanAllCols) {
44,930✔
1454
    return true;
2,340✔
1455
  }
1456
  for (int32_t i = 0; i < taosArrayGetSize(pColList); i++) {
240,140✔
1457
    if (colId == *(col_id_t*)taosArrayGet(pColList, i)) {
214,274✔
1458
      return true;
16,724✔
1459
    }
1460
  }
1461
  return false;
25,866✔
1462
}
1463

1464
int32_t getDbVgInfo(SOperatorInfo* pOperator, SName *name, SDBVgInfo **dbVgInfo) {
19,187✔
1465
  int32_t                    code = TSDB_CODE_SUCCESS;
19,187✔
1466
  int32_t                    line = 0;
19,187✔
1467
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
19,187✔
1468
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
19,187✔
1469
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
19,187✔
1470
  SMsgCb*                    pMsgCb = pVtbScan->pMsgCb;
19,187✔
1471
  SUseDbOutput*              output = NULL;
19,187✔
1472
  SUseDbOutput**             find = (SUseDbOutput**)taosHashGet(pInfo->vtbScan.dbVgInfoMap, name->dbname, strlen(name->dbname));
19,187✔
1473

1474
  QRY_PARAM_CHECK(dbVgInfo);
19,187!
1475

1476
  if (find == NULL) {
19,187✔
1477
    output = taosMemoryMalloc(sizeof(SUseDbOutput));
2,084!
1478
    code = buildDbVgInfoMap(pOperator, pMsgCb, name, pTaskInfo, output);
2,084✔
1479
    QUERY_CHECK_CODE(code, line, _return);
2,084!
1480
    code = taosHashPut(pInfo->vtbScan.dbVgInfoMap, name->dbname, strlen(name->dbname), &output, POINTER_BYTES);
2,084✔
1481
    QUERY_CHECK_CODE(code, line, _return);
2,084!
1482
  } else {
1483
    output = *find;
17,103✔
1484
  }
1485

1486
  *dbVgInfo = output->dbVgroup;
19,187✔
1487
  return code;
19,187✔
1488
_return:
×
1489
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
1490
  freeUseDbOutput(output);
×
1491
  return code;
×
1492
}
1493

1494
int32_t extractColRefName(const char *colref, char **refDb, char** refTb, char** refCol) {
19,187✔
1495
  int32_t     code = TSDB_CODE_SUCCESS;
19,187✔
1496
  int32_t     line = 0;
19,187✔
1497

1498
  const char *first_dot = strchr(colref, '.');
19,187✔
1499
  QUERY_CHECK_NULL(first_dot, code, line, _return, terrno)
19,187!
1500

1501
  const char *second_dot = strchr(first_dot + 1, '.');
19,187✔
1502
  QUERY_CHECK_NULL(second_dot, code, line, _return, terrno)
19,187!
1503

1504
  size_t db_len = first_dot - colref;
19,187✔
1505
  size_t table_len = second_dot - first_dot - 1;
19,187✔
1506
  size_t col_len = strlen(second_dot + 1);
19,187✔
1507

1508
  *refDb = taosMemoryMalloc(db_len + 1);
19,187!
1509
  *refTb = taosMemoryMalloc(table_len + 1);
19,187!
1510
  *refCol = taosMemoryMalloc(col_len + 1);
19,187!
1511
  QUERY_CHECK_NULL(*refDb, code, line, _return, terrno)
19,187!
1512
  QUERY_CHECK_NULL(*refTb, code, line, _return, terrno)
19,187!
1513
  QUERY_CHECK_NULL(*refCol, code, line, _return, terrno)
19,187!
1514

1515
  tstrncpy(*refDb, colref, db_len + 1);
19,187✔
1516
  tstrncpy(*refTb, first_dot + 1, table_len + 1);
19,187✔
1517
  tstrncpy(*refCol, second_dot + 1, col_len + 1);
19,187✔
1518

1519
  return TSDB_CODE_SUCCESS;
19,187✔
1520
_return:
×
1521
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
1522
  if (*refDb) {
×
1523
    taosMemoryFree(*refDb);
×
1524
    *refDb = NULL;
×
1525
  }
1526
  if (*refTb) {
×
1527
    taosMemoryFree(*refTb);
×
1528
    *refTb = NULL;
×
1529
  }
1530
  if (*refCol) {
×
1531
    taosMemoryFree(*refCol);
×
1532
    *refCol = NULL;
×
1533
  }
1534
  return code;
×
1535
}
1536

1537
bool tableInfoNeedCollect(char *dbName, char *tbName, char *expectDbName, char *expectTbName) {
152,498✔
1538
  if (strncmp(varDataVal(tbName), expectTbName, varDataLen(tbName)) == 0 &&
152,498✔
1539
      strlen(expectTbName) == varDataLen(tbName) &&
74,106!
1540
      strncmp(varDataVal(dbName), expectDbName, varDataLen(dbName)) == 0 &&
74,106✔
1541
      strlen(expectDbName) == varDataLen(dbName)) {
73,650!
1542
    return true;
73,650✔
1543
  }
1544
  return false;
78,848✔
1545
}
1546

1547
int32_t getColRefInfo(SColRefInfo *pInfo, SArray* pDataBlock, int32_t index) {
73,650✔
1548
  int32_t          code = TSDB_CODE_SUCCESS;
73,650✔
1549
  int32_t          line = 0;
73,650✔
1550

1551
  SColumnInfoData *pColNameCol = taosArrayGet(pDataBlock, 3);
73,650✔
1552
  SColumnInfoData *pUidCol = taosArrayGet(pDataBlock, 4);
73,650✔
1553
  SColumnInfoData *pColIdCol = taosArrayGet(pDataBlock, 5);
73,650✔
1554
  SColumnInfoData *pRefCol = taosArrayGet(pDataBlock, 6);
73,650✔
1555
  SColumnInfoData *pVgIdCol = taosArrayGet(pDataBlock, 7);
73,650✔
1556
  SColumnInfoData *pRefVerCol = taosArrayGet(pDataBlock, 8);
73,650✔
1557

1558
  QUERY_CHECK_NULL(pColNameCol, code, line, _return, terrno)
73,650!
1559
  QUERY_CHECK_NULL(pUidCol, code, line, _return, terrno)
73,650!
1560
  QUERY_CHECK_NULL(pColIdCol, code, line, _return, terrno)
73,650!
1561
  QUERY_CHECK_NULL(pRefCol, code, line, _return, terrno)
73,650!
1562
  QUERY_CHECK_NULL(pVgIdCol, code, line, _return, terrno)
73,650!
1563
  QUERY_CHECK_NULL(pRefVerCol, code, line, _return, terrno)
73,650!
1564

1565
  if (colDataIsNull_s(pRefCol, index)) {
147,300!
1566
    pInfo->colrefName = NULL;
28,475✔
1567
  } else {
1568
    pInfo->colrefName = taosMemoryCalloc(varDataTLen(colDataGetData(pRefCol, index)), 1);
45,175!
1569
    QUERY_CHECK_NULL(pInfo->colrefName, code, line, _return, terrno)
45,175!
1570
    memcpy(pInfo->colrefName, varDataVal(colDataGetData(pRefCol, index)), varDataLen(colDataGetData(pRefCol, index)));
45,175!
1571
    pInfo->colrefName[varDataLen(colDataGetData(pRefCol, index))] = 0;
45,175!
1572
  }
1573

1574
  pInfo->colName = taosMemoryCalloc(varDataTLen(colDataGetData(pColNameCol, index)), 1);
73,650!
1575
  QUERY_CHECK_NULL(pInfo->colName, code, line, _return, terrno)
73,650!
1576
  memcpy(pInfo->colName, varDataVal(colDataGetData(pColNameCol, index)), varDataLen(colDataGetData(pColNameCol, index)));
73,650!
1577
  pInfo->colName[varDataLen(colDataGetData(pColNameCol, index))] = 0;
73,650!
1578

1579
  if (!colDataIsNull_s(pUidCol, index)) {
147,300!
1580
    GET_TYPED_DATA(pInfo->uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, index), 0);
73,650✔
1581
  }
1582
  if (!colDataIsNull_s(pColIdCol, index)) {
147,300!
1583
    GET_TYPED_DATA(pInfo->colId, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pColIdCol, index), 0);
45,175✔
1584
  }
1585
  if (!colDataIsNull_s(pVgIdCol, index)) {
147,300!
1586
    GET_TYPED_DATA(pInfo->vgId, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pVgIdCol, index), 0);
73,650✔
1587
  }
1588

1589
_return:
×
1590
  return code;
73,650✔
1591
}
1592

1593
int32_t processOrgTbVg(SVtbScanDynCtrlInfo* pVtbScan, SExecTaskInfo* pTaskInfo, int32_t rversion) {
1,514✔
1594
  int32_t                    code = TSDB_CODE_SUCCESS;
1,514✔
1595
  int32_t                    line = 0;
1,514✔
1596

1597
  if (pTaskInfo->pStreamRuntimeInfo == NULL) {
1,514✔
1598
    return code;
1,120✔
1599
  }
1600

1601
  if (pVtbScan->existOrgTbVg == NULL) {
394!
1602
    pVtbScan->existOrgTbVg = pVtbScan->curOrgTbVg;
×
1603
    pVtbScan->curOrgTbVg = NULL;
×
1604
  }
1605

1606
  if (pVtbScan->curOrgTbVg != NULL) {
394✔
1607
    // which means rversion has changed
1608
    void*   pCurIter = NULL;
51✔
1609
    SArray* tmpArray = NULL;
51✔
1610
    while ((pCurIter = taosHashIterate(pVtbScan->curOrgTbVg, pCurIter))) {
139✔
1611
      int32_t* vgId = (int32_t*)taosHashGetKey(pCurIter, NULL);
88✔
1612
      if (taosHashGet(pVtbScan->existOrgTbVg, vgId, sizeof(int32_t)) == NULL) {
88✔
1613
        if (tmpArray == NULL) {
20✔
1614
          tmpArray = taosArrayInit(1, sizeof(int32_t));
16✔
1615
          QUERY_CHECK_NULL(tmpArray, code, line, _return, terrno)
16!
1616
        }
1617
        QUERY_CHECK_NULL(taosArrayPush(tmpArray, vgId), code, line, _return, terrno)
20!
1618
      }
1619
    }
1620
    if (tmpArray == NULL) {
51✔
1621
      return TSDB_CODE_SUCCESS;
35✔
1622
    }
1623
    if (tmpArray != NULL && pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addVgIds == NULL) {
16!
1624
      SArray* expiredInfo = atomic_load_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo);
16✔
1625
      if (expiredInfo && expiredInfo == atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo, expiredInfo, NULL)) {
16!
1626
        for (int32_t i = 0; i < taosArrayGetSize(expiredInfo); i++) {
×
1627
          SStreamTaskAddr* vgInfo = (SStreamTaskAddr*)taosArrayGet(expiredInfo, i);
×
1628
          QUERY_CHECK_NULL(taosArrayPush(tmpArray, &vgInfo->nodeId), code, line, _return, terrno)
×
1629
        }
1630
        taosArrayDestroy(expiredInfo);
×
1631
      }
1632
      if (atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addVgIds, NULL, tmpArray)) {
16!
1633
        taosArrayDestroy(tmpArray);
×
1634
      }
1635
    }
1636
    atomic_store_64(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.uid, (int64_t)(pVtbScan->isSuperTable ? pVtbScan->suid : pVtbScan->uid));
16✔
1637
    (void)atomic_val_compare_exchange_8(pTaskInfo->pStreamRuntimeInfo->vtableDeployGot, 0, 1);
16✔
1638
    taosHashClear(pVtbScan->curOrgTbVg);
16✔
1639
    pVtbScan->needRedeploy = true;
16✔
1640
    pVtbScan->rversion = rversion;
16✔
1641
    return TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY;
16✔
1642
  }
1643
  return code;
343✔
1644
_return:
×
1645
  qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
1646
  return code;
×
1647
}
1648

1649
int32_t getVgIdFromColref(SOperatorInfo* pOperator, const char* colRef, int32_t* vgId) {
123✔
1650
  int32_t                    code =TSDB_CODE_SUCCESS;
123✔
1651
  int32_t                    line = 0;
123✔
1652
  char*                      refDbName = NULL;
123✔
1653
  char*                      refTbName = NULL;
123✔
1654
  char*                      refColName = NULL;
123✔
1655
  SDBVgInfo*                 dbVgInfo = NULL;
123✔
1656
  SName                      name = {0};
123✔
1657
  char                       dbFname[TSDB_DB_FNAME_LEN] = {0};
123✔
1658
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
123✔
1659

1660
  code = extractColRefName(colRef, &refDbName, &refTbName, &refColName);
123✔
1661
  QUERY_CHECK_CODE(code, line, _return);
123!
1662

1663
  toName(pInfo->vtbScan.acctId, refDbName, refTbName, &name);
123✔
1664

1665
  code = getDbVgInfo(pOperator, &name, &dbVgInfo);
123✔
1666
  QUERY_CHECK_CODE(code, line, _return);
123!
1667

1668
  code = tNameGetFullDbName(&name, dbFname);
123✔
1669
  QUERY_CHECK_CODE(code, line, _return);
123!
1670

1671
  code = getVgId(dbVgInfo, dbFname, vgId, name.tname);
123✔
1672
  QUERY_CHECK_CODE(code, line, _return);
123!
1673

1674
_return:
123✔
1675
  taosMemoryFree(refDbName);
123!
1676
  taosMemoryFree(refTbName);
123!
1677
  taosMemoryFree(refColName);
123!
1678
  return code;
123✔
1679
}
1680

1681
int32_t buildVirtualSuperTableScanChildTableMap(SOperatorInfo* pOperator) {
1,148✔
1682
  int32_t                    code = TSDB_CODE_SUCCESS;
1,148✔
1683
  int32_t                    line = 0;
1,148✔
1684
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
1,148✔
1685
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
1,148✔
1686
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
1,148✔
1687
  SArray*                    pColRefArray = NULL;
1,148✔
1688
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[1];
1,148✔
1689

1690
  pVtbScan->childTableMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
1,148✔
1691
  QUERY_CHECK_NULL(pVtbScan->childTableMap, code, line, _return, terrno)
1,148!
1692

1693
  while (true) {
2,362✔
1694
    SSDataBlock *pChildInfo = NULL;
3,510✔
1695
    code = pSystableScanOp->fpSet.getNextFn(pSystableScanOp, &pChildInfo);
3,510✔
1696
    if (pChildInfo == NULL) {
3,510✔
1697
      break;
1,148✔
1698
    }
1699
    SColumnInfoData *pTableNameCol = taosArrayGet(pChildInfo->pDataBlock, 0);
2,362✔
1700
    SColumnInfoData *pStbNameCol = taosArrayGet(pChildInfo->pDataBlock, 1);
2,362✔
1701
    SColumnInfoData *pDbNameCol = taosArrayGet(pChildInfo->pDataBlock, 2);
2,362✔
1702

1703
    QUERY_CHECK_NULL(pTableNameCol, code, line, _return, terrno)
2,362!
1704
    QUERY_CHECK_NULL(pStbNameCol, code, line, _return, terrno)
2,362!
1705
    QUERY_CHECK_NULL(pDbNameCol, code, line, _return, terrno)
2,362!
1706

1707
    for (int32_t i = 0; i < pChildInfo->info.rows; i++) {
128,734✔
1708
      if (!colDataIsNull_s(pStbNameCol, i)) {
252,744!
1709
        char* stbrawname = colDataGetData(pStbNameCol, i);
126,372!
1710
        char* dbrawname = colDataGetData(pDbNameCol, i);
126,372!
1711
        char *ctbName = colDataGetData(pTableNameCol, i);
126,372!
1712

1713
        if (tableInfoNeedCollect(dbrawname, stbrawname, pInfo->vtbScan.dbName, pInfo->vtbScan.tbName)) {
126,372✔
1714
          SColRefInfo info = {0};
71,852✔
1715
          code = getColRefInfo(&info, pChildInfo->pDataBlock, i);
71,852✔
1716
          QUERY_CHECK_CODE(code, line, _return);
71,852!
1717

1718
          if (pInfo->vtbScan.dynTbUid != 0 && info.uid != pInfo->vtbScan.dynTbUid) {
71,852!
1719
            continue;
×
1720
          }
1721

1722
          if (pVtbScan->curOrgTbVg == NULL && pTaskInfo->pStreamRuntimeInfo) {
71,852✔
1723
            pVtbScan->curOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
10✔
1724
            QUERY_CHECK_NULL(pVtbScan->curOrgTbVg, code, line, _return, terrno)
10!
1725
          }
1726

1727
          if (info.colrefName && pTaskInfo->pStreamRuntimeInfo) {
71,852✔
1728
            int32_t vgId;
1729
            code = getVgIdFromColref(pOperator, info.colrefName, &vgId);
91✔
1730
            QUERY_CHECK_CODE(code, line, _return);
91!
1731
            code = taosHashPut(pVtbScan->curOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
91✔
1732
            QUERY_CHECK_CODE(code, line, _return);
91!
1733
          }
1734

1735
          if (taosHashGet(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName)) == NULL) {
71,852✔
1736
            pColRefArray = taosArrayInit(1, sizeof(SColRefInfo));
4,522✔
1737
            QUERY_CHECK_NULL(pColRefArray, code, line, _return, terrno)
4,522!
1738
            QUERY_CHECK_NULL(taosArrayPush(pColRefArray, &info), code, line, _return, terrno)
9,044!
1739
            int32_t tableIdx = (int32_t)taosArrayGetSize(pVtbScan->childTableList);
4,522✔
1740
            QUERY_CHECK_NULL(taosArrayPush(pVtbScan->childTableList, &pColRefArray), code, line, _return, terrno)
9,044!
1741
            code = taosHashPut(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName), &tableIdx, sizeof(tableIdx));
4,522✔
1742
            QUERY_CHECK_CODE(code, line, _return);
4,522!
1743
          } else {
1744
            int32_t *tableIdx = (int32_t*)taosHashGet(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName));
67,330✔
1745
            QUERY_CHECK_NULL(tableIdx, code, line, _return, terrno)
67,330!
1746
            pColRefArray = (SArray *)taosArrayGetP(pVtbScan->childTableList, *tableIdx);
67,330✔
1747
            QUERY_CHECK_NULL(pColRefArray, code, line, _return, terrno)
67,330!
1748
            QUERY_CHECK_NULL(taosArrayPush(pColRefArray, &info), code, line, _return, terrno)
134,660!
1749
          }
1750
        }
1751
      }
1752
    }
1753
  }
1754

1755
  code = processOrgTbVg(pVtbScan, pTaskInfo, 1);
1,148✔
1756
  QUERY_CHECK_CODE(code, line, _return);
1,148✔
1757

1758
_return:
1,141✔
1759
  if (code) {
1,148✔
1760
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
7!
1761
  }
1762
  return code;
1,148✔
1763
}
1764

1765
int32_t buildVirtualNormalChildTableScanChildTableMap(SOperatorInfo* pOperator) {
366✔
1766
  int32_t                    code = TSDB_CODE_SUCCESS;
366✔
1767
  int32_t                    line = 0;
366✔
1768
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
366✔
1769
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
366✔
1770
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
366✔
1771
  SArray*                    pColRefInfo = pInfo->vtbScan.colRefInfo;
366✔
1772
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[1];
366✔
1773
  int32_t                    rversion = 0;
366✔
1774

1775
  while (true) {
738✔
1776
    SSDataBlock *pTableInfo = NULL;
1,104✔
1777
    code = pSystableScanOp->fpSet.getNextFn(pSystableScanOp, &pTableInfo);
1,104✔
1778
    if (pTableInfo == NULL) {
1,104✔
1779
      break;
366✔
1780
    }
1781

1782
    SColumnInfoData *pTableNameCol = taosArrayGet(pTableInfo->pDataBlock, 0);
738✔
1783
    SColumnInfoData *pDbNameCol = taosArrayGet(pTableInfo->pDataBlock, 2);
738✔
1784
    SColumnInfoData *pRefVerCol = taosArrayGet(pTableInfo->pDataBlock, 8);
738✔
1785

1786
    QUERY_CHECK_NULL(pTableNameCol, code, line, _return, terrno)
738!
1787
    QUERY_CHECK_NULL(pDbNameCol, code, line, _return, terrno)
738!
1788
    QUERY_CHECK_NULL(pRefVerCol, code, line, _return, terrno)
738!
1789

1790
    for (int32_t i = 0; i < pTableInfo->info.rows; i++) {
26,864✔
1791
      if (!colDataIsNull_s(pRefVerCol, i)) {
52,252!
1792
        GET_TYPED_DATA(rversion, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pRefVerCol, i), 0);
26,126✔
1793
      }
1794

1795
      if (!colDataIsNull_s(pTableNameCol, i)) {
52,252!
1796
        char* tbrawname = colDataGetData(pTableNameCol, i);
26,126!
1797
        char* dbrawname = colDataGetData(pDbNameCol, i);
26,126!
1798
        QUERY_CHECK_NULL(tbrawname, code, line, _return, terrno)
26,126!
1799
        QUERY_CHECK_NULL(dbrawname, code, line, _return, terrno)
26,126!
1800

1801
        if (tableInfoNeedCollect(dbrawname, tbrawname, pInfo->vtbScan.dbName, pInfo->vtbScan.tbName)) {
26,126✔
1802
          SColRefInfo info = {0};
1,798✔
1803
          code = getColRefInfo(&info, pTableInfo->pDataBlock, i);
1,798✔
1804
          QUERY_CHECK_CODE(code, line, _return);
1,798!
1805

1806
          if ((rversion != pVtbScan->rversion || pVtbScan->existOrgTbVg == NULL) && info.colrefName) {
1,798!
1807
            if (pVtbScan->curOrgTbVg == NULL) {
32✔
1808
              pVtbScan->curOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
6✔
1809
              QUERY_CHECK_NULL(pVtbScan->curOrgTbVg, code, line, _return, terrno)
6!
1810
            }
1811
            int32_t vgId;
1812
            code = getVgIdFromColref(pOperator, info.colrefName, &vgId);
32✔
1813
            QUERY_CHECK_CODE(code, line, _return);
32!
1814
            code = taosHashPut(pVtbScan->curOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
32✔
1815
            QUERY_CHECK_CODE(code, line, _return);
32!
1816
          }
1817

1818
          QUERY_CHECK_NULL(taosArrayPush(pColRefInfo, &info), code, line, _return, terrno)
1,798!
1819
        }
1820
      }
1821
    }
1822
  }
1823
  code = processOrgTbVg(pVtbScan, pTaskInfo, rversion);
366✔
1824
  QUERY_CHECK_CODE(code, line, _return);
366✔
1825

1826
_return:
357✔
1827
  if (code) {
366✔
1828
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
9!
1829
  }
1830
  return code;
366✔
1831
}
1832

1833
int32_t virtualTableScanCheckNeedRedeploy(SOperatorInfo* pOperator) {
18,252✔
1834
  int32_t                    code = TSDB_CODE_SUCCESS;
18,252✔
1835
  int32_t                    line = 0;
18,252✔
1836
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
18,252✔
1837
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
18,252✔
1838
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
18,252✔
1839

1840
  SArray *tmpArray = NULL;
18,252✔
1841
  tmpArray = atomic_load_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo);
18,252✔
1842
  if (tmpArray && tmpArray == atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo, tmpArray, NULL)) {
18,252!
1843
    for (int32_t i = 0; i < taosArrayGetSize(tmpArray); i++) {
36✔
1844
      SStreamTaskAddr* pTaskAddr = (SStreamTaskAddr*)taosArrayGet(tmpArray, i);
20✔
1845
      code = taosHashPut(pVtbScan->existOrgTbVg, &pTaskAddr->nodeId, sizeof(pTaskAddr->nodeId), NULL, 0);
20✔
1846
      QUERY_CHECK_CODE(code, line, _return);
20!
1847
      if (pVtbScan->newAddedVgInfo == NULL) {
20✔
1848
        pVtbScan->newAddedVgInfo = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
10✔
1849
        QUERY_CHECK_NULL(pVtbScan->newAddedVgInfo, code, line, _return, terrno)
10!
1850
      }
1851
      code = taosHashPut(pVtbScan->newAddedVgInfo, &pTaskAddr->nodeId, sizeof(pTaskAddr->nodeId), pTaskAddr, sizeof(SStreamTaskAddr));
20✔
1852
      QUERY_CHECK_CODE(code, line, _return);
20!
1853
    }
1854
    pVtbScan->needRedeploy = false;
16✔
1855
  } else {
1856
    code = TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY;
18,236✔
1857
    QUERY_CHECK_CODE(code, line, _return);
18,236!
1858
  }
1859

1860
_return:
×
1861
  taosArrayClear(tmpArray);
18,252✔
1862
  taosArrayDestroy(tmpArray);
18,252✔
1863
  if (code) {
18,252✔
1864
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
18,236!
1865
  }
1866
  return code;
18,252✔
1867
}
1868

1869
int32_t virtualTableScanProcessColRefInfo(SOperatorInfo* pOperator, SArray* pColRefInfo, tb_uid_t* uid, int32_t* vgId) {
4,844✔
1870
  int32_t                    code = TSDB_CODE_SUCCESS;
4,844✔
1871
  int32_t                    line = 0;
4,844✔
1872
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
4,844✔
1873
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
4,844✔
1874
  SDBVgInfo*                 dbVgInfo = NULL;
4,844✔
1875

1876
  for (int32_t j = 0; j < taosArrayGetSize(pColRefInfo); j++) {
78,018✔
1877
    SColRefInfo *pKV = (SColRefInfo*)taosArrayGet(pColRefInfo, j);
73,174✔
1878
    *uid = pKV->uid;
73,174✔
1879
    *vgId = pKV->vgId;
73,174✔
1880
    if (pKV->colrefName != NULL && colNeedScan(pOperator, pKV->colId)) {
73,174✔
1881
      char*   refDbName = NULL;
19,064✔
1882
      char*   refTbName = NULL;
19,064✔
1883
      char*   refColName = NULL;
19,064✔
1884
      SName   name = {0};
19,064✔
1885
      char    dbFname[TSDB_DB_FNAME_LEN] = {0};
19,064✔
1886
      char    orgTbFName[TSDB_TABLE_FNAME_LEN] = {0};
19,064✔
1887

1888
      code = extractColRefName(pKV->colrefName, &refDbName, &refTbName, &refColName);
19,064✔
1889
      QUERY_CHECK_CODE(code, line, _return);
19,064!
1890

1891
      toName(pInfo->vtbScan.acctId, refDbName, refTbName, &name);
19,064✔
1892

1893
      code = getDbVgInfo(pOperator, &name, &dbVgInfo);
19,064✔
1894
      QUERY_CHECK_CODE(code, line, _return);
19,064!
1895
      code = tNameGetFullDbName(&name, dbFname);
19,064✔
1896
      QUERY_CHECK_CODE(code, line, _return);
19,064!
1897
      code = tNameGetFullTableName(&name, orgTbFName);
19,064✔
1898
      QUERY_CHECK_CODE(code, line, _return);
19,064!
1899

1900
      void *pVal = taosHashGet(pVtbScan->orgTbVgColMap, orgTbFName, sizeof(orgTbFName));
19,064✔
1901
      if (!pVal) {
19,064✔
1902
        SOrgTbInfo map = {0};
11,808✔
1903
        code = getVgId(dbVgInfo, dbFname, &map.vgId, name.tname);
11,808✔
1904
        QUERY_CHECK_CODE(code, line, _return);
11,808!
1905
        tstrncpy(map.tbName, orgTbFName, sizeof(map.tbName));
11,808✔
1906
        map.colMap = taosArrayInit(10, sizeof(SColIdNameKV));
11,808✔
1907
        QUERY_CHECK_NULL(map.colMap, code, line, _return, terrno)
11,808!
1908
        SColIdNameKV colIdNameKV = {0};
11,808✔
1909
        colIdNameKV.colId = pKV->colId;
11,808✔
1910
        tstrncpy(colIdNameKV.colName, refColName, sizeof(colIdNameKV.colName));
11,808✔
1911
        QUERY_CHECK_NULL(taosArrayPush(map.colMap, &colIdNameKV), code, line, _return, terrno)
23,616!
1912
        code = taosHashPut(pVtbScan->orgTbVgColMap, orgTbFName, sizeof(orgTbFName), &map, sizeof(map));
11,808✔
1913
        QUERY_CHECK_CODE(code, line, _return);
11,808!
1914
      } else {
1915
        SOrgTbInfo *tbInfo = (SOrgTbInfo *)pVal;
7,256✔
1916
        SColIdNameKV colIdNameKV = {0};
7,256✔
1917
        colIdNameKV.colId = pKV->colId;
7,256✔
1918
        tstrncpy(colIdNameKV.colName, refColName, sizeof(colIdNameKV.colName));
7,256✔
1919
        QUERY_CHECK_NULL(taosArrayPush(tbInfo->colMap, &colIdNameKV), code, line, _return, terrno)
14,512!
1920
      }
1921
      taosMemoryFree(refDbName);
19,064!
1922
      taosMemoryFree(refTbName);
19,064!
1923
      taosMemoryFree(refColName);
19,064!
1924
    }
1925
  }
1926

1927
_return:
4,844✔
1928
  if (code) {
4,844!
1929
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
1930
  }
1931
  return code;
4,844✔
1932
}
1933

1934
int32_t virtualTableScanBuildDownStreamOpParam(SOperatorInfo* pOperator, tb_uid_t uid, int32_t vgId) {
4,844✔
1935
  int32_t                    code = TSDB_CODE_SUCCESS;
4,844✔
1936
  int32_t                    line = 0;
4,844✔
1937
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
4,844✔
1938
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
4,844✔
1939
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
4,844✔
1940

1941
  pVtbScan->vtbScanParam = NULL;
4,844✔
1942
  code = buildVtbScanOperatorParam(pInfo, &pVtbScan->vtbScanParam, uid);
4,844✔
1943
  QUERY_CHECK_CODE(code, line, _return);
4,844!
1944

1945
  void* pIter = taosHashIterate(pVtbScan->orgTbVgColMap, NULL);
4,844✔
1946
  while (pIter != NULL) {
16,652✔
1947
    SOrgTbInfo*      pMap = (SOrgTbInfo*)pIter;
11,808✔
1948
    SOperatorParam*  pExchangeParam = NULL;
11,808✔
1949
    SStreamTaskAddr* addr = taosHashGet(pVtbScan->newAddedVgInfo, &pMap->vgId, sizeof(pMap->vgId));
11,808✔
1950
    if (addr != NULL) {
11,808✔
1951
      code = buildExchangeOperatorParamForVScanEx(&pExchangeParam, 0, pMap, pTaskInfo->id.taskId, addr);
20✔
1952
      QUERY_CHECK_CODE(code, line, _return);
20!
1953
      code = taosHashRemove(pVtbScan->newAddedVgInfo, &pMap->vgId, sizeof(pMap->vgId));
20✔
1954
      QUERY_CHECK_CODE(code, line, _return);
20!
1955
    } else {
1956
      code = buildExchangeOperatorParamForVScan(&pExchangeParam, 0, pMap);
11,788✔
1957
      QUERY_CHECK_CODE(code, line, _return);
11,788!
1958
    }
1959
    QUERY_CHECK_NULL(taosArrayPush(((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pOpParamArray, &pExchangeParam), code, line, _return, terrno)
23,616!
1960
    pIter = taosHashIterate(pVtbScan->orgTbVgColMap, pIter);
11,808✔
1961
  }
1962

1963
  SOperatorParam*  pExchangeParam = NULL;
4,844✔
1964
  code = buildExchangeOperatorParamForVTagScan(&pExchangeParam, 0, vgId, uid);
4,844✔
1965
  QUERY_CHECK_CODE(code, line, _return);
4,844!
1966
  ((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pTagScanOp = pExchangeParam;
4,844✔
1967

1968
_return:
4,844✔
1969
  if (code) {
4,844!
1970
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
1971
  }
1972
  return code;
4,844✔
1973
}
1974

1975
int32_t virtualTableScanGetNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
41,384✔
1976
  int32_t                    code = TSDB_CODE_SUCCESS;
41,384✔
1977
  int32_t                    line = 0;
41,384✔
1978
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
41,384✔
1979
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
41,384✔
1980
  SOperatorInfo*             pVtbScanOp = pOperator->pDownstream[0];
41,384✔
1981

1982
  pVtbScan->orgTbVgColMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
41,384✔
1983
  QUERY_CHECK_NULL(pVtbScan->orgTbVgColMap, code, line, _return, terrno)
41,384!
1984
  taosHashSetFreeFp(pVtbScan->orgTbVgColMap, destroyOrgTbInfo);
41,384✔
1985

1986
  while (true) {
1987
    if (pVtbScan->curTableIdx == pVtbScan->lastTableIdx) {
44,730✔
1988
      code = pVtbScanOp->fpSet.getNextFn(pVtbScanOp, pRes);
39,886✔
1989
      QUERY_CHECK_CODE(code, line, _return);
39,886!
1990
    } else {
1991
      taosHashClear(pVtbScan->orgTbVgColMap);
4,844✔
1992
      SArray* pColRefInfo = NULL;
4,844✔
1993
      if (pVtbScan->isSuperTable) {
4,844✔
1994
        pColRefInfo = (SArray*)taosArrayGetP(pVtbScan->childTableList, pVtbScan->curTableIdx);
4,487✔
1995
      } else {
1996
        pColRefInfo = pInfo->vtbScan.colRefInfo;
357✔
1997
      }
1998
      QUERY_CHECK_NULL(pColRefInfo, code, line, _return, terrno)
4,844!
1999

2000
      tb_uid_t uid = 0;
4,844✔
2001
      int32_t  vgId = 0;
4,844✔
2002
      code = virtualTableScanProcessColRefInfo(pOperator, pColRefInfo, &uid, &vgId);
4,844✔
2003
      QUERY_CHECK_CODE(code, line, _return);
4,844!
2004

2005
      code = virtualTableScanBuildDownStreamOpParam(pOperator, uid, vgId);
4,844✔
2006
      QUERY_CHECK_CODE(code, line, _return);
4,844!
2007

2008
      // reset downstream operator's status
2009
      pVtbScanOp->status = OP_NOT_OPENED;
4,844✔
2010
      code = pVtbScanOp->fpSet.getNextExtFn(pVtbScanOp, pVtbScan->vtbScanParam, pRes);
4,844✔
2011
      QUERY_CHECK_CODE(code, line, _return);
4,844!
2012
    }
2013

2014
    if (*pRes) {
44,730✔
2015
      // has result, still read data from this table.
2016
      pVtbScan->lastTableIdx = pVtbScan->curTableIdx;
39,910✔
2017
      break;
39,910✔
2018
    } else {
2019
      // no result, read next table.
2020
      pVtbScan->curTableIdx++;
4,820✔
2021
      if (pVtbScan->isSuperTable) {
4,820✔
2022
        if (pVtbScan->curTableIdx >= taosArrayGetSize(pVtbScan->childTableList)) {
4,463✔
2023
          setOperatorCompleted(pOperator);
1,117✔
2024
          break;
1,117✔
2025
        }
2026
      } else {
2027
        setOperatorCompleted(pOperator);
357✔
2028
        break;
357✔
2029
      }
2030
    }
2031
  }
2032

2033
_return:
41,384✔
2034
  taosHashCleanup(pVtbScan->orgTbVgColMap);
41,384✔
2035
  pVtbScan->orgTbVgColMap = NULL;
41,384✔
2036
  if (code) {
41,384!
2037
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
×
2038
  }
2039
  return code;
41,384✔
2040
}
2041

2042
int32_t virtualNormalChildTableScan(SOperatorInfo* pOperator, SSDataBlock** pRes) {
678✔
2043
  int32_t                    code = TSDB_CODE_SUCCESS;
678✔
2044
  int32_t                    line = 0;
678✔
2045
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
678✔
2046
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
678✔
2047

2048
  if (pVtbScan->colRefInfo == NULL) {
678✔
2049
    pInfo->vtbScan.colRefInfo = taosArrayInit(1, sizeof(SColRefInfo));
366✔
2050
    QUERY_CHECK_NULL(pInfo->vtbScan.colRefInfo, code, line, _return, terrno)
366!
2051
    code = buildVirtualNormalChildTableScanChildTableMap(pOperator);
366✔
2052
    QUERY_CHECK_CODE(code, line, _return);
366✔
2053
  }
2054

2055
  // no child table, return
2056
  code = virtualTableScanGetNext(pOperator, pRes);
669✔
2057
  QUERY_CHECK_CODE(code, line, _return);
669!
2058

2059
_return:
669✔
2060
  if (code) {
678✔
2061
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
9!
2062
  }
2063
  return code;
678✔
2064
}
2065

2066
int32_t virtualSuperTableScan(SOperatorInfo* pOperator, SSDataBlock** pRes) {
40,722✔
2067
  int32_t                    code = TSDB_CODE_SUCCESS;
40,722✔
2068
  int32_t                    line = 0;
40,722✔
2069
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
40,722✔
2070
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
40,722✔
2071

2072
  if (pVtbScan->childTableMap == NULL) {
40,722✔
2073
    code = buildVirtualSuperTableScanChildTableMap(pOperator);
1,148✔
2074
    QUERY_CHECK_CODE(code, line, _return);
1,148✔
2075
  }
2076

2077
  size_t num = taosArrayGetSize(pVtbScan->childTableList);
40,715✔
2078

2079
  // no child table, return
2080
  if (num == 0) {
40,715!
2081
    setOperatorCompleted(pOperator);
×
2082
    return code;
×
2083
  }
2084

2085
  code = virtualTableScanGetNext(pOperator, pRes);
40,715✔
2086
  QUERY_CHECK_CODE(code, line, _return);
40,715!
2087

2088
_return:
40,715✔
2089
  if (code) {
40,722✔
2090
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
7!
2091
  }
2092
  return code;
40,722✔
2093
}
2094

2095
int32_t vtbScan(SOperatorInfo* pOperator, SSDataBlock** pRes) {
59,636✔
2096
  int32_t                    code = TSDB_CODE_SUCCESS;
59,636✔
2097
  int32_t                    line = 0;
59,636✔
2098
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
59,636✔
2099
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
59,636✔
2100

2101
  QRY_PARAM_CHECK(pRes);
59,636!
2102
  if (pOperator->status == OP_EXEC_DONE) {
59,636!
2103
    return code;
×
2104
  }
2105

2106
  int64_t st = 0;
59,636✔
2107
  if (pOperator->cost.openCost == 0) {
59,636✔
2108
    st = taosGetTimestampUs();
41,345✔
2109
  }
2110

2111
  if (pVtbScan->needRedeploy) {
59,636✔
2112
    code = virtualTableScanCheckNeedRedeploy(pOperator);
18,252✔
2113
    QUERY_CHECK_CODE(code, line, _return);
18,252✔
2114
  }
2115

2116
  if (pVtbScan->isSuperTable) {
41,400✔
2117
    code = virtualSuperTableScan(pOperator, pRes);
40,722✔
2118
  } else {
2119
    code = virtualNormalChildTableScan(pOperator, pRes);
678✔
2120
  }
2121
  QUERY_CHECK_CODE(code, line, _return);
41,400✔
2122

2123
  return code;
41,384✔
2124

2125
_return:
18,252✔
2126
  if (pOperator->cost.openCost == 0) {
18,252✔
2127
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
10✔
2128
  }
2129
  if (code) {
18,252!
2130
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
18,252!
2131
    pOperator->pTaskInfo->code = code;
18,252✔
2132
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
18,252!
2133
  }
2134
  return code;
×
2135
}
2136

2137
int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) {
6,279✔
2138
  if (batchFetch) {
6,279✔
2139
    pPrev->leftHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
6,270✔
2140
    if (NULL == pPrev->leftHash) {
6,270!
2141
      return terrno;
×
2142
    }
2143
    pPrev->rightHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
6,270✔
2144
    if (NULL == pPrev->rightHash) {
6,270!
2145
      return terrno;
×
2146
    }
2147
  } else {
2148
    pPrev->leftCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
9✔
2149
    if (NULL == pPrev->leftCache) {
9!
2150
      return terrno;
×
2151
    }
2152
    pPrev->rightCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
9✔
2153
    if (NULL == pPrev->rightCache) {
9!
2154
      return terrno;
×
2155
    }
2156
    pPrev->onceTable = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
9✔
2157
    if (NULL == pPrev->onceTable) {
9!
2158
      return terrno;
×
2159
    }
2160
  }
2161

2162
  return TSDB_CODE_SUCCESS;
6,279✔
2163
}
2164

2165
static int32_t initVtbScanInfo(SDynQueryCtrlOperatorInfo* pInfo, SMsgCb* pMsgCb,
1,185✔
2166
                               SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) {
2167
  int32_t      code = TSDB_CODE_SUCCESS;
1,185✔
2168
  int32_t      line = 0;
1,185✔
2169

2170
  code = tsem_init(&pInfo->vtbScan.ready, 0, 0);
1,185✔
2171
  QUERY_CHECK_CODE(code, line, _return);
1,185!
2172

2173
  pInfo->vtbScan.scanAllCols = pPhyciNode->vtbScan.scanAllCols;
1,185✔
2174
  pInfo->vtbScan.isSuperTable = pPhyciNode->vtbScan.isSuperTable;
1,185✔
2175
  pInfo->vtbScan.rversion = pPhyciNode->vtbScan.rversion;
1,185✔
2176
  pInfo->vtbScan.uid = pPhyciNode->vtbScan.uid;
1,185✔
2177
  pInfo->vtbScan.suid = pPhyciNode->vtbScan.suid;
1,185✔
2178
  pInfo->vtbScan.epSet = pPhyciNode->vtbScan.mgmtEpSet;
1,185✔
2179
  pInfo->vtbScan.acctId = pPhyciNode->vtbScan.accountId;
1,185✔
2180
  pInfo->vtbScan.needRedeploy = false;
1,185✔
2181
  pInfo->vtbScan.pMsgCb = pMsgCb;
1,185✔
2182
  pInfo->vtbScan.curTableIdx = 0;
1,185✔
2183
  pInfo->vtbScan.lastTableIdx = -1;
1,185✔
2184
  pInfo->vtbScan.dynTbUid = 0;
1,185✔
2185
  pInfo->vtbScan.dbName = taosStrdup(pPhyciNode->vtbScan.dbName);
1,185!
2186
  pInfo->vtbScan.tbName = taosStrdup(pPhyciNode->vtbScan.tbName);
1,185!
2187
  QUERY_CHECK_NULL(pInfo->vtbScan.dbName, code, line, _return, terrno)
1,185!
2188
  QUERY_CHECK_NULL(pInfo->vtbScan.tbName, code, line, _return, terrno)
1,185!
2189
  pInfo->vtbScan.existOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
1,185✔
2190
  QUERY_CHECK_NULL(pInfo->vtbScan.existOrgTbVg, code, line, _return, terrno)
1,185!
2191
  for (int32_t i = 0; i < LIST_LENGTH(pPhyciNode->vtbScan.pOrgVgIds); ++i) {
6,832!
2192
    SValueNode* valueNode = (SValueNode*)nodesListGetNode(pPhyciNode->vtbScan.pOrgVgIds, i);
5,647✔
2193
    int32_t vgId = (int32_t)valueNode->datum.i;
5,647✔
2194
    code = taosHashPut(pInfo->vtbScan.existOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
5,647✔
2195
    QUERY_CHECK_CODE(code, line, _return);
5,647!
2196
  }
2197

2198
  if (pPhyciNode->dynTbname) {
1,185!
2199
    SArray* vals = pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPartColVals;
×
2200
    for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
×
2201
      SStreamGroupValue* pValue = taosArrayGet(vals, i);
×
2202
      if (pValue != NULL && pValue->isTbname) {
×
2203
        pInfo->vtbScan.dynTbUid = pValue->uid;
×
2204
        break;
×
2205
      }
2206
    }
2207
  }
2208

2209
  pInfo->vtbScan.readColList = taosArrayInit(LIST_LENGTH(pPhyciNode->vtbScan.pScanCols), sizeof(col_id_t));
1,185!
2210
  QUERY_CHECK_NULL(pInfo->vtbScan.readColList, code, line, _return, terrno)
1,185!
2211

2212
  for (int32_t i = 0; i < LIST_LENGTH(pPhyciNode->vtbScan.pScanCols); ++i) {
10,717!
2213
    SColumnNode* pNode = (SColumnNode*)nodesListGetNode(pPhyciNode->vtbScan.pScanCols, i);
9,532✔
2214
    QUERY_CHECK_NULL(pNode, code, line, _return, terrno)
9,532!
2215
    QUERY_CHECK_NULL(taosArrayPush(pInfo->vtbScan.readColList, &pNode->colId), code, line, _return, terrno)
19,064!
2216
  }
2217

2218
  pInfo->vtbScan.childTableList = taosArrayInit(10, POINTER_BYTES);
1,185✔
2219
  QUERY_CHECK_NULL(pInfo->vtbScan.childTableList, code, line, _return, terrno)
1,185!
2220

2221
  pInfo->vtbScan.dbVgInfoMap = taosHashInit(taosArrayGetSize(pInfo->vtbScan.childTableList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
1,185✔
2222
  QUERY_CHECK_NULL(pInfo->vtbScan.dbVgInfoMap, code, line, _return, terrno)
1,185!
2223

2224
  return code;
1,185✔
2225
_return:
×
2226
  // no need to destroy array and hashmap allocated in this function,
2227
  // since the operator's destroy function will take care of it
2228
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
2229
  return code;
×
2230
}
2231

2232
static int32_t resetDynQueryCtrlOperState(SOperatorInfo* pOper) {
18,698✔
2233
  SDynQueryCtrlOperatorInfo* pDyn = pOper->info;
18,698✔
2234
  pOper->status = OP_NOT_OPENED;
18,698✔
2235

2236
  switch (pDyn->qType) {
18,698!
2237
    case DYN_QTYPE_STB_HASH:{
×
2238
      pDyn->stbJoin.execInfo = (SDynQueryCtrlExecInfo){0};
×
2239
      SStbJoinDynCtrlInfo* pStbJoin = &pDyn->stbJoin;
×
2240
      destroyStbJoinDynCtrlInfo(&pDyn->stbJoin);
×
2241
      
2242
      int32_t code = initSeqStbJoinTableHash(&pDyn->stbJoin.ctx.prev, pDyn->stbJoin.basic.batchFetch);
×
2243
      if (TSDB_CODE_SUCCESS != code) {
×
2244
        qError("initSeqStbJoinTableHash failed since %s", tstrerror(code));
×
2245
        return code;
×
2246
      }
2247
      pStbJoin->ctx.prev.pListHead = NULL;
×
2248
      pStbJoin->ctx.prev.joinBuild = false;
×
2249
      pStbJoin->ctx.prev.pListTail = NULL;
×
2250
      pStbJoin->ctx.prev.tableNum = 0;
×
2251

2252
      pStbJoin->ctx.post = (SStbJoinPostJoinCtx){0};
×
2253
      break; 
×
2254
    }
2255
    case DYN_QTYPE_VTB_SCAN: {
18,698✔
2256
      SVtbScanDynCtrlInfo* pVtbScan = &pDyn->vtbScan;
18,698✔
2257
      
2258
      if (pVtbScan->orgTbVgColMap) {
18,698!
2259
        taosHashSetFreeFp(pVtbScan->orgTbVgColMap, destroyOrgTbInfo);
×
2260
        taosHashCleanup(pVtbScan->orgTbVgColMap);
×
2261
        pVtbScan->orgTbVgColMap = NULL;
×
2262
      }
2263
      if (pVtbScan->pRsp) {
18,698!
2264
        tFreeSUsedbRsp(pVtbScan->pRsp);
×
2265
        taosMemoryFreeClear(pVtbScan->pRsp);
×
2266
      }
2267
      if (pVtbScan->colRefInfo) {
18,698✔
2268
        taosArrayDestroyEx(pVtbScan->colRefInfo, destroyColRefInfo);
366✔
2269
        pVtbScan->colRefInfo = NULL;
366✔
2270
      }
2271
      if (pVtbScan->childTableMap) {
18,698✔
2272
        taosHashCleanup(pVtbScan->childTableMap);
28✔
2273
        pVtbScan->childTableMap = NULL;
28✔
2274
      }
2275
      if (pVtbScan->childTableList) {
18,698!
2276
        taosArrayClearEx(pVtbScan->childTableList, destroyColRefArray);
18,698✔
2277
      }
2278
      pVtbScan->curTableIdx = 0;
18,698✔
2279
      pVtbScan->lastTableIdx = -1;
18,698✔
2280
      break;
18,698✔
2281
    }
UNCOV
2282
    default:
×
UNCOV
2283
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
×
2284
      break;
×
2285
  }
2286
  return 0;
18,698✔
2287
}
2288

2289
int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
7,464✔
2290
                                       SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo,
2291
                                       SMsgCb* pMsgCb, SOperatorInfo** pOptrInfo) {
2292
  QRY_PARAM_CHECK(pOptrInfo);
7,464!
2293

2294
  int32_t                    code = TSDB_CODE_SUCCESS;
7,464✔
2295
  int32_t                    line = 0;
7,464✔
2296
  __optr_fn_t                nextFp = NULL;
7,464✔
2297
  SOperatorInfo*             pOperator = NULL;
7,464✔
2298
  SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo));
7,464!
2299
  QUERY_CHECK_NULL(pInfo, code, line, _error, terrno)
7,464!
2300

2301
  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
7,464!
2302
  QUERY_CHECK_NULL(pOperator, code, line, _error, terrno)
7,464!
2303

2304
  pTaskInfo->dynamicTask = (int8_t)pPhyciNode->node.dynamicOp;
7,464✔
2305

2306
  code = appendDownstream(pOperator, pDownstream, numOfDownstream);
7,464✔
2307
  QUERY_CHECK_CODE(code, line, _error);
7,464!
2308

2309
  setOperatorInfo(pOperator, "DynQueryCtrlOperator", QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, false, OP_NOT_OPENED,
7,464✔
2310
                  pInfo, pTaskInfo);
2311

2312
  pInfo->qType = pPhyciNode->qType;
7,464✔
2313
  switch (pInfo->qType) {
7,464!
2314
    case DYN_QTYPE_STB_HASH:
6,279✔
2315
      TAOS_MEMCPY(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
6,279✔
2316
      pInfo->stbJoin.pOutputDataBlockDesc = pPhyciNode->node.pOutputDataBlockDesc;
6,279✔
2317
      code = initSeqStbJoinTableHash(&pInfo->stbJoin.ctx.prev, pInfo->stbJoin.basic.batchFetch);
6,279✔
2318
      QUERY_CHECK_CODE(code, line, _error);
6,279!
2319
      nextFp = seqStableJoin;
6,279✔
2320
      break;
6,279✔
2321
    case DYN_QTYPE_VTB_SCAN:
1,185✔
2322
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
1,185✔
2323
      QUERY_CHECK_CODE(code, line, _error);
1,185!
2324
      nextFp = vtbScan;
1,185✔
2325
      break;
1,185✔
2326
    default:
×
2327
      qError("unsupported dynamic query ctrl type: %d", pInfo->qType);
×
2328
      code = TSDB_CODE_INVALID_PARA;
×
2329
      goto _error;
×
2330
  }
2331

2332
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, nextFp, NULL, destroyDynQueryCtrlOperator, optrDefaultBufFn,
7,464✔
2333
                                         NULL, optrDefaultGetNextExtFn, NULL);
2334

2335
  setOperatorResetStateFn(pOperator, resetDynQueryCtrlOperState);
7,464✔
2336
  *pOptrInfo = pOperator;
7,464✔
2337
  return TSDB_CODE_SUCCESS;
7,464✔
2338

2339
_error:
×
2340
  if (pInfo != NULL) {
×
2341
    destroyDynQueryCtrlOperator(pInfo);
×
2342
  }
2343
  qError("failed to create dyn query ctrl operator, %s code:%s, line:%d", __func__, tstrerror(code), line);
×
2344
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
2345
  pTaskInfo->code = code;
×
2346
  return code;
×
2347
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc