• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3780

31 Mar 2025 08:30AM UTC coverage: 11.99% (-22.0%) from 34.001%
#3780

push

travis-ci

happyguoxy
test:add case 2

40223 of 491333 branches covered (8.19%)

Branch coverage included in aggregate %.

70904 of 435487 relevant lines covered (16.28%)

1596.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/libs/executor/src/dynqueryctrloperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "nodes.h"
19
#include "operator.h"
20
#include "os.h"
21
#include "plannodes.h"
22
#include "query.h"
23
#include "querynodes.h"
24
#include "querytask.h"
25
#include "tarray.h"
26
#include "tcompare.h"
27
#include "tdatablock.h"
28
#include "thash.h"
29
#include "tmsg.h"
30
#include "trpc.h"
31
#include "ttypes.h"
32
#include "dynqueryctrl.h"
33

34
// Global counter; buildGroupCacheOperatorParam atomically increments it to obtain the sessionId of each group-cache param.
int64_t gSessionId = 0;
35

36
void freeVgTableList(void* ptr) { 
×
37
  taosArrayDestroy(*(SArray**)ptr); 
×
38
}
×
39

40
/*
 * Releases every node of a singly linked SStbJoinTableList chain, together with
 * the four vgId/uid arrays each node owns.
 */
static void destroyStbJoinTableList(SStbJoinTableList* pListHead) {
  for (SStbJoinTableList* pCur = pListHead; NULL != pCur;) {
    // Remember the successor before the current node is released.
    SStbJoinTableList* pFollowing = pCur->pNext;

    taosMemoryFree(pCur->pLeftVg);
    taosMemoryFree(pCur->pLeftUid);
    taosMemoryFree(pCur->pRightVg);
    taosMemoryFree(pCur->pRightUid);
    taosMemoryFree(pCur);

    pCur = pFollowing;
  }
}
×
53

54
/*
 * Tears down the super-table join state of a dynamic query control operator:
 * logs the execution counters, cleans up the hashes that belong to the active
 * fetch mode, then frees the pending table list.
 */
static void destroyStbJoinDynCtrlInfo(SStbJoinDynCtrlInfo* pStbJoin) {
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;

  qDebug("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64 ", leftCacheNum:%" PRId64 ", rightCacheNum:%" PRId64, 
         pStbJoin->execInfo.prevBlkNum, pStbJoin->execInfo.prevBlkRows, pStbJoin->execInfo.postBlkNum, 
         pStbJoin->execInfo.postBlkRows, pStbJoin->execInfo.leftCacheNum, pStbJoin->execInfo.rightCacheNum);

  if (!pStbJoin->basic.batchFetch) {
    // Sequential mode: the caches are cleaned up without a free callback.
    if (NULL != pPrev->leftCache) {
      tSimpleHashCleanup(pPrev->leftCache);
    }
    if (NULL != pPrev->rightCache) {
      tSimpleHashCleanup(pPrev->rightCache);
    }
    if (NULL != pPrev->onceTable) {
      tSimpleHashCleanup(pPrev->onceTable);
    }
  } else {
    // Batch mode: each hash slot stores an SArray*, released through freeVgTableList.
    if (NULL != pPrev->leftHash) {
      tSimpleHashSetFreeFp(pPrev->leftHash, freeVgTableList);
      tSimpleHashCleanup(pPrev->leftHash);
    }
    if (NULL != pPrev->rightHash) {
      tSimpleHashSetFreeFp(pPrev->rightHash, freeVgTableList);
      tSimpleHashCleanup(pPrev->rightHash);
    }
  }

  destroyStbJoinTableList(pPrev->pListHead);
}
×
82

83
void destroyOrgTbInfo(void *info) {
×
84
  SOrgTbInfo *pOrgTbInfo = (SOrgTbInfo *)info;
×
85
  if (pOrgTbInfo) {
×
86
    taosArrayDestroy(pOrgTbInfo->colMap);
×
87
  }
88
}
×
89

90
void freeUseDbOutput(void* pOutput) {
×
91
  SUseDbOutput *pOut = *(SUseDbOutput**)pOutput;
×
92
  if (NULL == pOutput) {
×
93
    return;
×
94
  }
95

96
  if (pOut->dbVgroup) {
×
97
    freeVgInfo(pOut->dbVgroup);
×
98
  }
99
  taosMemFree(pOut);
×
100
}
101

102
/*
 * Releases everything owned by the virtual-table scan control info: the db name,
 * the child table and read column arrays, both hash maps (with their per-entry
 * free callbacks) and the cached use-db response.
 */
static void destroyVtbScanDynCtrlInfo(SVtbScanDynCtrlInfo* pVtbScan) {
  if (NULL != pVtbScan->dbName) {
    taosMemoryFreeClear(pVtbScan->dbName);
  }

  if (NULL != pVtbScan->childTableList) {
    taosArrayDestroy(pVtbScan->childTableList);
  }

  if (NULL != pVtbScan->readColList) {
    taosArrayDestroy(pVtbScan->readColList);
  }

  // The callback must be installed before cleanup so each stored entry is released.
  if (NULL != pVtbScan->dbVgInfoMap) {
    taosHashSetFreeFp(pVtbScan->dbVgInfoMap, freeUseDbOutput);
    taosHashCleanup(pVtbScan->dbVgInfoMap);
  }

  if (NULL != pVtbScan->orgTbVgColMap) {
    taosHashSetFreeFp(pVtbScan->orgTbVgColMap, destroyOrgTbInfo);
    taosHashCleanup(pVtbScan->orgTbVgColMap);
  }

  // Free the response contents first, then the response object itself.
  if (NULL != pVtbScan->pRsp) {
    tFreeSUsedbRsp(pVtbScan->pRsp);
    taosMemoryFreeClear(pVtbScan->pRsp);
  }
}
×
125

126
static void destroyDynQueryCtrlOperator(void* param) {
×
127
  SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param;
×
128

129
  switch (pDyn->qType) {
×
130
    case DYN_QTYPE_STB_HASH:
×
131
      destroyStbJoinDynCtrlInfo(&pDyn->stbJoin);
×
132
      break;
×
133
    case DYN_QTYPE_VTB_SCAN:
×
134
      destroyVtbScanDynCtrlInfo(&pDyn->vtbScan);
×
135
      break;
×
136
    default:
×
137
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
×
138
      break;
×
139
  }
140

141
  taosMemoryFreeClear(param);
×
142
}
×
143

144
/*
 * Decides whether the data of a table should be cached.
 *  - batch fetch: always true;
 *  - right table: true when the current and the next right uid are equal;
 *  - left table:  true when `uid` has an entry in the left cache hash.
 */
static FORCE_INLINE bool tableNeedCache(int64_t uid, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, bool rightTable, bool batchFetch) {
  if (batchFetch) {
    return true;
  }

  if (rightTable) {
    return pPost->rightCurrUid == pPost->rightNextUid;
  }

  uint32_t* pCount = tSimpleHashGet(pPrev->leftCache, &uid, sizeof(uid));
  if (NULL == pCount) {
    return false;
  }
  return true;
}
157

158
/*
 * Loads the left/right vgId and uid at the head node's read index into the post
 * join context, looks ahead for the next right uid (0 when there is none), and
 * refreshes the left/right "need cache" flags. In non-batch mode a right table
 * that needs caching and differs from the previous right uid is recorded in
 * rightCache.
 *
 * Returns TSDB_CODE_SUCCESS, or the error propagated from tSimpleHashPut.
 */
static int32_t updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo*          pStbJoin) {
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
  SStbJoinTableList*   pCur = pPrev->pListHead;
  int64_t*             pLeftUid = pCur->pLeftUid + pCur->readIdx;
  int64_t*             pRightUid = pCur->pRightUid + pCur->readIdx;
  int64_t              lastRightUid = pPost->rightCurrUid;
  int64_t              nextIdx = pCur->readIdx + 1;

  pPost->leftVgId = *(pCur->pLeftVg + pCur->readIdx);
  pPost->rightVgId = *(pCur->pRightVg + pCur->readIdx);
  pPost->leftCurrUid = *pLeftUid;
  pPost->rightCurrUid = *pRightUid;

  // Look ahead for the next right uid, moving on to later list nodes when the current one is exhausted.
  for (;;) {
    if (nextIdx < pCur->uidNum) {
      pPost->rightNextUid = pCur->pRightUid[nextIdx];
      break;
    }

    pCur = pCur->pNext;
    if (NULL == pCur) {
      pPost->rightNextUid = 0;
      break;
    }

    pRightUid = pCur->pRightUid;
    nextIdx = 0;
  }

  pPost->leftNeedCache = tableNeedCache(*pLeftUid, pPrev, pPost, false, pStbJoin->basic.batchFetch);
  pPost->rightNeedCache = tableNeedCache(*pRightUid, pPrev, pPost, true, pStbJoin->basic.batchFetch);

  if (pStbJoin->basic.batchFetch) {
    return TSDB_CODE_SUCCESS;
  }

  if (pPost->rightNeedCache && lastRightUid != pPost->rightCurrUid) {
    QRY_ERR_RET(tSimpleHashPut(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid), NULL, 0));
    pStbJoin->execInfo.rightCacheNum++;
  }

  return TSDB_CODE_SUCCESS;
}
201

202

203
/*
 * Builds a GROUP_CACHE "get" operator param.
 *
 * ppRes         - out: the new param on success, NULL on failure.
 * downstreamIdx - downstream index stored in both the param and its value.
 * vgId, tbUid   - vgroup id and table uid stored in the value.
 * needCache     - stored in the value.
 * pChild        - optional child param. On success it is attached to
 *                 (*ppRes)->pChildren; on failure it is passed to
 *                 freeOperatorParam (directly, or as part of *ppRes once attached
 *                 - presumably freeOperatorParam walks pChildren; confirm).
 *
 * Returns TSDB_CODE_SUCCESS or the terrno of the failed allocation.
 *
 * All scalar members of the new param are set right after allocation, so the
 * error paths never pass a param with indeterminate opType/value/reUse to
 * freeOperatorParam.
 */
static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, bool needCache, SOperatorParam* pChild) {
  int32_t code = TSDB_CODE_SUCCESS;
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    code = terrno;
    freeOperatorParam(pChild, OP_GET_PARAM);
    return code;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = NULL;
  (*ppRes)->reUse = false;
  (*ppRes)->pChildren = NULL;

  if (pChild) {
    (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
    if (NULL == (*ppRes)->pChildren) {
      code = terrno;
      freeOperatorParam(pChild, OP_GET_PARAM);
      freeOperatorParam(*ppRes, OP_GET_PARAM);
      *ppRes = NULL;
      return code;
    }
    if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild)) {
      // pChild was not attached, so it is released separately from *ppRes.
      code = terrno;
      freeOperatorParam(pChild, OP_GET_PARAM);
      freeOperatorParam(*ppRes, OP_GET_PARAM);
      *ppRes = NULL;
      return code;
    }
  }

  SGcOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcOperatorParam));
  if (NULL == pGc) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_GET_PARAM);
    *ppRes = NULL;
    return code;
  }

  pGc->sessionId = atomic_add_fetch_64(&gSessionId, 1);
  pGc->downstreamIdx = downstreamIdx;
  pGc->vgId = vgId;
  pGc->tbUid = tbUid;
  pGc->needCache = needCache;

  (*ppRes)->value = pGc;

  return TSDB_CODE_SUCCESS;
}
252

253

254
/*
 * Builds a GROUP_CACHE "notify" operator param carrying the downstream index,
 * vgroup id and table uid.
 *
 * ppRes - out: the new param on success, NULL on failure.
 *
 * Returns TSDB_CODE_SUCCESS or the terrno of the failed allocation.
 *
 * The param's members are set before the second allocation, so the failure path
 * does not pass indeterminate opType/value/reUse to freeOperatorParam, and *ppRes
 * is reset so the caller never sees a pointer to a released param.
 */
static int32_t buildGroupCacheNotifyOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid) {
  int32_t code = TSDB_CODE_SUCCESS;
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = NULL;
  (*ppRes)->reUse = false;
  (*ppRes)->pChildren = NULL;

  SGcNotifyOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcNotifyOperatorParam));
  if (NULL == pGc) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    *ppRes = NULL;
    return code;
  }

  pGc->downstreamIdx = downstreamIdx;
  pGc->vgId = vgId;
  pGc->tbUid = tbUid;

  (*ppRes)->value = pGc;

  return TSDB_CODE_SUCCESS;
}
280

281

282
/*
 * Builds an EXCHANGE operator param that requests a single table (*pUid) from
 * vgroup *pVgId, with tableSeq enabled.
 *
 * ppRes - out: the new param on success, NULL on failure.
 *
 * Returns TSDB_CODE_SUCCESS or the terrno of the failed step.
 *
 * Every failure after the first allocation releases what was built so far,
 * including *ppRes (previously leaked and left pointing at a half-built param).
 * terrno is captured before any cleanup call can overwrite it.
 */
static int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, const int32_t* pVgId, int64_t* pUid) {
  int32_t                 code = TSDB_CODE_SUCCESS;
  SExchangeOperatorParam* pExc = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }
  (*ppRes)->pChildren = NULL;

  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  if (NULL == pExc) {
    code = terrno;
    goto _return;
  }

  pExc->multiParams = false;
  pExc->basic.vgId = *pVgId;
  pExc->basic.tableSeq = true;
  pExc->basic.isVtbRefScan = false;
  pExc->basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
  pExc->basic.colMap = NULL;
  pExc->basic.uidList = taosArrayInit(1, sizeof(int64_t));
  if (NULL == pExc->basic.uidList) {
    code = terrno;
    goto _return;
  }
  if (NULL == taosArrayPush(pExc->basic.uidList, pUid)) {
    code = terrno;
    goto _return;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;

_return:
  if (pExc) {
    if (pExc->basic.uidList) {
      taosArrayDestroy(pExc->basic.uidList);
    }
    taosMemoryFree(pExc);
  }
  taosMemoryFree(*ppRes);
  *ppRes = NULL;
  return code;
}
318

319
/*
 * Builds a batch EXCHANGE operator param from a vgId -> SArray* (uid list) hash.
 *
 * ppRes - out: the new param on success, NULL on failure.
 * pVg   - hash keyed by vgId whose slots store SArray*. Each uid list that is
 *         successfully put into the batch hash is handed over to it, and the
 *         source slot is set to NULL.
 *
 * Returns TSDB_CODE_SUCCESS, the terrno of a failed allocation, or the error
 * from tSimpleHashPut.
 *
 * A failing tSimpleHashPut now releases the batch hash, pExc and *ppRes instead
 * of leaking them. Lists already handed over are expected to be released by the
 * hash's free callback, freeExchangeGetBasicOperatorParam (TODO confirm). The
 * template entry is zero-initialized so no indeterminate bytes are copied into
 * the hash, and terrno is captured before cleanup.
 */
static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
  int32_t code = TSDB_CODE_SUCCESS;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }
  (*ppRes)->pChildren = NULL;

  SExchangeOperatorBatchParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorBatchParam));
  if (NULL == pExc) {
    code = terrno;
    taosMemoryFreeClear(*ppRes);
    return code;
  }

  pExc->multiParams = true;
  pExc->pBatchs = tSimpleHashInit(tSimpleHashGetSize(pVg), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  if (NULL == pExc->pBatchs) {
    code = terrno;
    taosMemoryFree(pExc);
    taosMemoryFreeClear(*ppRes);
    return code;
  }
  tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);

  SExchangeOperatorBasicParam basic = {0};
  basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;

  int32_t iter = 0;
  void* p = NULL;
  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
    int32_t* pVgId = tSimpleHashGetKey(p, NULL);
    SArray* pUidList = *(SArray**)p;
    basic.vgId = *pVgId;
    basic.uidList = pUidList;
    basic.colMap = NULL;
    basic.tableSeq = false;
    basic.isVtbRefScan = false;

    code = tSimpleHashPut(pExc->pBatchs, pVgId, sizeof(*pVgId), &basic, sizeof(basic));
    if (TSDB_CODE_SUCCESS != code) {
      // The current uid list was not handed over and stays owned by pVg.
      tSimpleHashCleanup(pExc->pBatchs);
      taosMemoryFree(pExc);
      taosMemoryFreeClear(*ppRes);
      QRY_ERR_RET(code);
    }

    qTrace("build downstreamIdx %d batch scan, vgId:%d, uidNum:%" PRId64, downstreamIdx, *pVgId, (int64_t)taosArrayGetSize(pUidList));
    *(SArray**)p = NULL;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;
}
368

369
/*
 * Builds a reusable EXCHANGE operator param for a virtual-table reference scan.
 * The param carries a private copy of the origin table info in `pMap` (vgId,
 * table name, duplicated column map) and an empty uid list.
 *
 * ppRes - out: the new param on success, NULL on failure.
 *
 * Returns TSDB_CODE_SUCCESS or the terrno of the failed allocation.
 *
 * Cleanup notes:
 *  - `basic` points into pExc (it is &pExc->basic), so it must not be freed on
 *    its own; only pExc is released. The previous code freed both.
 *  - colMap and uidList are set to NULL as soon as pExc exists, so the cleanup
 *    path never inspects indeterminate pointers.
 */
static int32_t buildExchangeOperatorParamForVScan(SOperatorParam** ppRes, int32_t downstreamIdx, SOrgTbInfo* pMap) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  SExchangeOperatorParam*      pExc = NULL;
  SExchangeOperatorBasicParam* basic = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
  (*ppRes)->pChildren = NULL;

  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  QUERY_CHECK_NULL(pExc, code, lino, _return, terrno);

  pExc->multiParams = false;

  basic = &pExc->basic;
  basic->colMap = NULL;
  basic->uidList = NULL;
  basic->srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;

  basic->vgId = pMap->vgId;
  basic->tableSeq = false;
  basic->isVtbRefScan = true;
  basic->colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
  QUERY_CHECK_NULL(basic->colMap, code, lino, _return, terrno);
  basic->colMap->vgId = pMap->vgId;
  tstrncpy(basic->colMap->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);
  basic->colMap->colMap = taosArrayDup(pMap->colMap, NULL);
  QUERY_CHECK_NULL(basic->colMap->colMap, code, lino, _return, terrno);

  basic->uidList = taosArrayInit(1, sizeof(int64_t));
  QUERY_CHECK_NULL(basic->uidList, code, lino, _return, terrno);

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = true;

  return TSDB_CODE_SUCCESS;

_return:
  qError("failed to build exchange operator param for vscan, code:%d", code);
  taosMemoryFreeClear(*ppRes);
  if (pExc) {
    if (pExc->basic.colMap) {
      taosArrayDestroy(pExc->basic.colMap->colMap);
      taosMemoryFreeClear(pExc->basic.colMap);
    }
    if (pExc->basic.uidList) {
      taosArrayDestroy(pExc->basic.uidList);
    }
    taosMemoryFreeClear(pExc);
  }
  return code;
}
423

424
/*
 * Builds a MERGE_JOIN "get" operator param with two children.
 *
 * ppRes     - out: the new param on success, NULL on failure.
 * initParam - stored as initDownstream in the join value.
 * ppChild0,
 * ppChild1  - in/out: child params. Each one is set to NULL once it has been
 *             attached to (*ppRes)->pChildren, so the caller can tell which
 *             children it still owns after a failure.
 *
 * Returns TSDB_CODE_SUCCESS or the terrno of the failed step.
 *
 * opType/value/reUse are set right after allocation, so the error paths never
 * pass indeterminate members to freeOperatorParam.
 */
static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initParam, SOperatorParam** ppChild0, SOperatorParam** ppChild1) {
  int32_t code = TSDB_CODE_SUCCESS;
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    code = terrno;
    return code;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
  (*ppRes)->value = NULL;
  (*ppRes)->reUse = false;

  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
  if (NULL == (*ppRes)->pChildren) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_GET_PARAM);
    *ppRes = NULL;
    return code;
  }
  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild0)) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_GET_PARAM);
    *ppRes = NULL;
    return code;
  }
  *ppChild0 = NULL;
  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild1)) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_GET_PARAM);
    *ppRes = NULL;
    return code;
  }
  *ppChild1 = NULL;

  SSortMergeJoinOperatorParam* pJoin = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam));
  if (NULL == pJoin) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_GET_PARAM);
    *ppRes = NULL;
    return code;
  }

  pJoin->initDownstream = initParam;

  (*ppRes)->value = pJoin;

  return TSDB_CODE_SUCCESS;
}
469

470
/*
 * Builds a MERGE_JOIN "notify" operator param with up to two children and no
 * value.
 *
 * ppRes    - out: the new param on success, NULL on failure.
 * pChild0,
 * pChild1  - optional child params; NULL children are skipped. On failure every
 *            child is passed to freeOperatorParam (directly, or as part of
 *            *ppRes once attached - presumably freeOperatorParam walks pChildren;
 *            confirm).
 *
 * Returns TSDB_CODE_SUCCESS or the terrno of the failed step.
 *
 * Fix: the result of taosArrayInit is now checked through (*ppRes)->pChildren.
 * The previous code re-tested *ppRes, which can never be NULL at that point, so
 * an allocation failure went unnoticed. opType/value/reUse are also set right
 * after allocation, so the error paths never pass indeterminate members to
 * freeOperatorParam.
 */
static int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) {
  int32_t code = TSDB_CODE_SUCCESS;
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    code = terrno;
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    return code;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
  (*ppRes)->value = NULL;
  (*ppRes)->reUse = false;

  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
  if (NULL == (*ppRes)->pChildren) {
    code = terrno;
    taosMemoryFreeClear(*ppRes);
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    return code;
  }
  if (pChild0 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) {
    // Neither child is attached yet, so both are released separately.
    code = terrno;
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    *ppRes = NULL;
    return code;
  }
  if (pChild1 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) {
    // pChild0 (if any) is already attached to *ppRes; only pChild1 is released separately.
    code = terrno;
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    *ppRes = NULL;
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
509

510
/*
 * Builds a table scan operator param from a vgId -> SArray* (uid list) hash that
 * must contain exactly one vgroup. After the param is built the uid list is
 * destroyed and its hash slot is set to NULL.
 *
 * Returns TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the hash does not hold
 * exactly one vgroup, the error from buildTableScanOperatorParam, or
 * TSDB_CODE_SUCCESS.
 */
static int32_t buildBatchTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t vgNum = tSimpleHashGetSize(pVg);
  if (1 != vgNum) {
    qError("Invalid vgroup num %d to build table scan operator param", vgNum);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  int32_t iter = 0;
  void*   pSlot = NULL;
  for (;;) {
    pSlot = tSimpleHashIterate(pVg, pSlot, &iter);
    if (NULL == pSlot) {
      break;
    }

    int32_t* pVgId = tSimpleHashGetKey(pSlot, NULL);
    SArray** ppUidList = (SArray**)pSlot;
    SArray*  pUidList = *ppUidList;

    code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false);
    if (code) {
      return code;
    }

    taosArrayDestroy(pUidList);
    *ppUidList = NULL;
  }

  return TSDB_CODE_SUCCESS;
}
534

535

536
/*
 * Builds a table scan operator param for a single table uid.
 *
 * ppRes - out: filled by buildTableScanOperatorParam.
 * pUid  - the table uid to scan; it is copied into a temporary one-element list
 *         that is destroyed before returning.
 *
 * Returns TSDB_CODE_SUCCESS, the terrno of a failed array operation, or the
 * error from buildTableScanOperatorParam.
 *
 * Fix: the temporary uid list is now destroyed when taosArrayPush fails
 * (previously leaked). terrno is captured before the destroy call.
 */
static int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) {
  SArray* pUidList = taosArrayInit(1, sizeof(int64_t));
  if (NULL == pUidList) {
    return terrno;
  }
  if (NULL == taosArrayPush(pUidList, pUid)) {
    int32_t pushCode = terrno;
    taosArrayDestroy(pUidList);
    return pushCode;
  }

  int32_t code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, true);
  taosArrayDestroy(pUidList);
  if (code) {
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
553

554
/*
 * Builds the full operator param tree for the next table pair of a sequential
 * super-table join: source params (table scan or exchange, batch or single),
 * one group-cache param per side wrapping them, and a merge-join param on top.
 *
 * ppParam - out: the merge-join param on success.
 *
 * Returns TSDB_CODE_SUCCESS or the first error met. On error, every intermediate
 * param still held in a local here is passed to freeOperatorParam.
 */
static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, SOperatorParam** ppParam) {
  int32_t         code = TSDB_CODE_SUCCESS;
  int64_t         rowIdx = pPrev->pListHead->readIdx;
  int32_t*        leftVg = pPrev->pListHead->pLeftVg + rowIdx;
  int64_t*        leftUid = pPrev->pListHead->pLeftUid + rowIdx;
  int32_t*        rightVg = pPrev->pListHead->pRightVg + rowIdx;
  int64_t*        rightUid = pPrev->pListHead->pRightUid + rowIdx;
  SOperatorParam* pSrcParam0 = NULL;
  SOperatorParam* pSrcParam1 = NULL;
  SOperatorParam* pGcParam0 = NULL;
  SOperatorParam* pGcParam1 = NULL;

  qDebug("start %" PRId64 ":%" PRId64 "th stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64, 
      rowIdx, pPrev->tableNum, *leftVg, *leftUid, *rightVg, *rightUid);

  QRY_ERR_RET(updatePostJoinCurrTableInfo(&pInfo->stbJoin));

  if (pInfo->stbJoin.basic.batchFetch) {
    // Batch mode: source params are only built while the vgroup hashes are still present.
    if (pPrev->leftHash) {
      if (pInfo->stbJoin.basic.srcScan[0]) {
        code = buildBatchTableScanOperatorParam(&pSrcParam0, 0, pPrev->leftHash);
      } else {
        code = buildBatchExchangeOperatorParam(&pSrcParam0, 0, pPrev->leftHash);
      }

      if (TSDB_CODE_SUCCESS == code) {
        if (pInfo->stbJoin.basic.srcScan[1]) {
          code = buildBatchTableScanOperatorParam(&pSrcParam1, 1, pPrev->rightHash);
        } else {
          code = buildBatchExchangeOperatorParam(&pSrcParam1, 1, pPrev->rightHash);
        }
      }

      if (TSDB_CODE_SUCCESS == code) {
        tSimpleHashCleanup(pPrev->leftHash);
        tSimpleHashCleanup(pPrev->rightHash);
        pPrev->leftHash = NULL;
        pPrev->rightHash = NULL;
      }
    }
  } else {
    if (pInfo->stbJoin.basic.srcScan[0]) {
      code = buildSingleTableScanOperatorParam(&pSrcParam0, 0, leftVg, leftUid);
    } else {
      code = buildExchangeOperatorParam(&pSrcParam0, 0, leftVg, leftUid);
    }

    if (TSDB_CODE_SUCCESS == code) {
      if (pInfo->stbJoin.basic.srcScan[1]) {
        code = buildSingleTableScanOperatorParam(&pSrcParam1, 1, rightVg, rightUid);
      } else {
        code = buildExchangeOperatorParam(&pSrcParam1, 1, rightVg, rightUid);
      }
    }
  }

  // The merge join is told to init its downstream only when a left source param exists.
  bool initParam = (NULL != pSrcParam0);

  if (TSDB_CODE_SUCCESS == code) {
    code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pPost->leftNeedCache, pSrcParam0);
    pSrcParam0 = NULL;  // handed to buildGroupCacheOperatorParam regardless of its result
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pPost->rightNeedCache, pSrcParam1);
    pSrcParam1 = NULL;
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = buildMergeJoinOperatorParam(ppParam, initParam, &pGcParam0, &pGcParam1);
  }

  if (TSDB_CODE_SUCCESS == code) {
    return code;
  }

  if (pSrcParam0) {
    freeOperatorParam(pSrcParam0, OP_GET_PARAM);
  }
  if (pSrcParam1) {
    freeOperatorParam(pSrcParam1, OP_GET_PARAM);
  }
  if (pGcParam0) {
    freeOperatorParam(pGcParam0, OP_GET_PARAM);
  }
  if (pGcParam1) {
    freeOperatorParam(pGcParam1, OP_GET_PARAM);
  }
  if (*ppParam) {
    freeOperatorParam(*ppParam, OP_GET_PARAM);
    *ppParam = NULL;
  }

  return code;
}
624

625
/*
 * Starts the retrieve for the next table pair: builds the operator params, asks
 * downstream operator 1 for a block with those params, validates the block and
 * updates the post-join counters.
 *
 * A failure to build the params or an invalid block is stored in the task code
 * and raised through T_LONG_JMP. When no block comes back, or the downstream
 * call returns a non-zero code, only a debug line is logged.
 */
static void seqJoinLaunchNewRetrieveImpl(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
  SStbJoinPostJoinCtx*       pPost = &pStbJoin->ctx.post;
  SOperatorParam*            pParam = NULL;

  int32_t code = buildSeqStbJoinOperatorParam(pInfo, pPrev, pPost, &pParam);
  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  qDebug("%s dynamic post task begin", GET_TASKID(pOperator->pTaskInfo));
  code = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam, ppRes);

  if (NULL == *ppRes || 0 != code) {
    qDebug("%s Empty join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
    return;
  }

  code = blockDataCheck(*ppRes);
  if (code) {
    qError("Invalid block data, blockDataCheck failed, error:%s", tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  pPost->isStarted = true;
  pStbJoin->execInfo.postBlkNum++;
  pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows;
  qDebug("%s join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
}
×
654

655

656
/*
 * Notifies downstream operator 1 that the current left or right table is done
 * with its cache. Builds a group-cache notify param for that table, wraps it in
 * a merge-join notify param and passes it to optrDefaultNotifyFn.
 *
 * leftTable - true to notify for the current left table, false for the right.
 *
 * Returns the first build error, or the result of optrDefaultNotifyFn.
 */
static int32_t notifySeqJoinTableCacheEnd(SOperatorInfo* pOperator, SStbJoinPostJoinCtx* pPost, bool leftTable) {
  SOperatorParam* pGcParam = NULL;
  SOperatorParam* pMergeJoinParam = NULL;
  int32_t         downstreamId = 1;
  int32_t         vgId = pPost->rightVgId;
  int64_t         uid = pPost->rightCurrUid;

  if (leftTable) {
    downstreamId = 0;
    vgId = pPost->leftVgId;
    uid = pPost->leftCurrUid;
  }

  qDebug("notify table %" PRIu64 " in vgId %d downstreamId %d cache end", uid, vgId, downstreamId);

  int32_t code = buildGroupCacheNotifyOperatorParam(&pGcParam, downstreamId, vgId, uid);
  if (TSDB_CODE_SUCCESS == code) {
    code = buildMergeJoinNotifyOperatorParam(&pMergeJoinParam, pGcParam, NULL);
  }
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  return optrDefaultNotifyFn(pOperator->pDownstream[1], pMergeJoinParam);
}
676

677
/*
 * Bookkeeping once the join of the current table pair has produced its last
 * block. Marks the post join as not started; in non-batch mode it also:
 *  - decrements the left table's counter in leftCache and, when it reaches zero,
 *    removes the entry and sends a cache-end notification for the left table;
 *  - when the right table does not need caching but has an entry in rightCache,
 *    removes the entry and sends a cache-end notification for the right table.
 *
 * Returns TSDB_CODE_SUCCESS or the first error met.
 */
static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCtrlInfo*          pStbJoin) {
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
  int32_t              code = 0;

  pPost->isStarted = false;

  if (pStbJoin->basic.batchFetch) {
    return TSDB_CODE_SUCCESS;
  }

  if (pPost->leftNeedCache) {
    uint32_t* pRemain = tSimpleHashGet(pPrev->leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
    if (NULL != pRemain) {
      --(*pRemain);
      // The counter is unsigned, so the original "<= 0" test is an equality test with zero.
      if (0 == *pRemain) {
        code = tSimpleHashRemove(pPrev->leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
        if (code) {
          qError("tSimpleHashRemove leftCurrUid %" PRId64 " from leftCache failed, error:%s", pPost->leftCurrUid, tstrerror(code));
          QRY_ERR_RET(code);
        }
        QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, true));
      }
    }
  }

  if (pPost->rightNeedCache) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL != tSimpleHashGet(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid))) {
    code = tSimpleHashRemove(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
    if (code) {
      qError("tSimpleHashRemove rightCurrUid %" PRId64 " from rightCache failed, error:%s", pPost->rightCurrUid, tstrerror(code));
      QRY_ERR_RET(code);
    }
    QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, false));
  }

  return TSDB_CODE_SUCCESS;
}
713

714

715
/*
 * Continues pulling blocks for the table pair currently being joined. Does
 * nothing when no post join is in progress. When downstream 1 returns no block,
 * the end-of-pair bookkeeping runs and the head node's read index advances;
 * otherwise the post block counters are updated.
 *
 * Returns TSDB_CODE_SUCCESS or the error from handleSeqJoinCurrRetrieveEnd.
 */
static FORCE_INLINE int32_t seqJoinContinueCurrRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pJoin = &pInfo->stbJoin;

  if (!pJoin->ctx.post.isStarted) {
    return TSDB_CODE_SUCCESS;
  }

  qDebug("%s dynQueryCtrl continue to retrieve block from post op", GET_TASKID(pOperator->pTaskInfo));

  *ppRes = getNextBlockFromDownstream(pOperator, 1);
  if (NULL != *ppRes) {
    pJoin->execInfo.postBlkNum++;
    pJoin->execInfo.postBlkRows += (*ppRes)->info.rows;
    return TSDB_CODE_SUCCESS;
  }

  QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, pJoin));
  pJoin->ctx.prev.pListHead->readIdx++;

  return TSDB_CODE_SUCCESS;
}
737

738
/*
 * Appends `pVal` to the array stored under `pKey` in `pHash`, creating the array
 * (element size `valSize`) and its hash entry on first use.
 *
 * Returns TSDB_CODE_SUCCESS, the terrno of a failed array operation, or the
 * error code returned by tSimpleHashPut.
 *
 * Fix: a failing tSimpleHashPut now reports its own return code. The previous
 * code returned terrno, read after taosArrayDestroy, which may not reflect the
 * put failure. terrno is likewise captured before the destroy call on the
 * push-failure path.
 */
static FORCE_INLINE int32_t addToJoinVgroupHash(SSHashObj* pHash, void* pKey, int32_t keySize, void* pVal, int32_t valSize) {
  SArray** ppArray = tSimpleHashGet(pHash, pKey, keySize);
  if (NULL != ppArray) {
    if (NULL == taosArrayPush(*ppArray, pVal)) {
      return terrno;
    }
    return TSDB_CODE_SUCCESS;
  }

  SArray* pArray = taosArrayInit(10, valSize);
  if (NULL == pArray) {
    return terrno;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  if (NULL == taosArrayPush(pArray, pVal)) {
    code = terrno;
  } else {
    // The hash stores the array pointer itself, hence POINTER_BYTES.
    code = tSimpleHashPut(pHash, pKey, keySize, &pArray, POINTER_BYTES);
  }

  if (TSDB_CODE_SUCCESS != code) {
    taosArrayDestroy(pArray);
  }
  return code;
}
762

763
/*
 * Counts one more occurrence of the key in `pHash`.
 *  - first occurrence: stores a count of 1 and also records the key in pOnceHash;
 *  - stored count 0: left untouched;
 *  - stored count UINT32_MAX: reset to 0;
 *  - otherwise: incremented; on the step from 1 to 2 the key is first removed
 *    from pOnceHash.
 *
 * Returns TSDB_CODE_SUCCESS or the error of the failing hash operation.
 */
static FORCE_INLINE int32_t addToJoinTableHash(SSHashObj* pHash, SSHashObj* pOnceHash, void* pKey, int32_t keySize) {
  int32_t   code = TSDB_CODE_SUCCESS;
  uint32_t* pCount = tSimpleHashGet(pHash, pKey, keySize);

  if (NULL == pCount) {
    uint32_t first = 1;
    code = tSimpleHashPut(pHash, pKey, keySize, &first, sizeof(first));
    if (code) {
      return code;
    }
    code = tSimpleHashPut(pOnceHash, pKey, keySize, NULL, 0);
    if (code) {
      return code;
    }
    return TSDB_CODE_SUCCESS;
  }

  if (0 == *pCount) {
    return TSDB_CODE_SUCCESS;
  }

  if (UINT32_MAX == *pCount) {
    *pCount = 0;
    return TSDB_CODE_SUCCESS;
  }

  if (1 == *pCount) {
    code = tSimpleHashRemove(pOnceHash, pKey, keySize);
    if (code) {
      qError("tSimpleHashRemove failed in addToJoinTableHash, error:%s", tstrerror(code));
      QRY_ERR_RET(code);
    }
  }
  (*pCount)++;

  return TSDB_CODE_SUCCESS;
}
799

800

801
/*
 * Frees a single SStbJoinTableList node and the four arrays it owns.
 * pNext is not followed. A NULL node is ignored.
 */
static void freeStbJoinTableList(SStbJoinTableList* pList) {
  if (NULL != pList) {
    taosMemoryFree(pList->pLeftVg);
    taosMemoryFree(pList->pLeftUid);
    taosMemoryFree(pList->pRightVg);
    taosMemoryFree(pList->pRightUid);
    taosMemoryFree(pList);
  }
}
811

812
/*
 * Appends a new node holding private copies of `rows` left/right vgId and uid
 * values to the tail of the context's table list. The node becomes both head
 * and tail when the list has no tail yet.
 *
 * Returns TSDB_CODE_SUCCESS or the terrno of the failed allocation; a partly
 * built node is released before returning an error.
 */
static int32_t appendStbJoinTableList(SStbJoinPrevJoinCtx* pCtx, int64_t rows, int32_t* pLeftVg, int64_t* pLeftUid, int32_t* pRightVg, int64_t* pRightUid) {
  int32_t code = TSDB_CODE_SUCCESS;

  // Zero-filled, so array members that were never allocated are NULL when the node is released on failure.
  SStbJoinTableList* pNode = taosMemoryCalloc(1, sizeof(SStbJoinTableList));
  if (NULL == pNode) {
    return terrno;
  }

  pNode->pLeftVg = taosMemoryMalloc(rows * sizeof(*pLeftVg));
  if (NULL == pNode->pLeftVg) {
    goto _fail;
  }
  pNode->pLeftUid = taosMemoryMalloc(rows * sizeof(*pLeftUid));
  if (NULL == pNode->pLeftUid) {
    goto _fail;
  }
  pNode->pRightVg = taosMemoryMalloc(rows * sizeof(*pRightVg));
  if (NULL == pNode->pRightVg) {
    goto _fail;
  }
  pNode->pRightUid = taosMemoryMalloc(rows * sizeof(*pRightUid));
  if (NULL == pNode->pRightUid) {
    goto _fail;
  }

  TAOS_MEMCPY(pNode->pLeftVg, pLeftVg, rows * sizeof(*pLeftVg));
  TAOS_MEMCPY(pNode->pLeftUid, pLeftUid, rows * sizeof(*pLeftUid));
  TAOS_MEMCPY(pNode->pRightVg, pRightVg, rows * sizeof(*pRightVg));
  TAOS_MEMCPY(pNode->pRightUid, pRightUid, rows * sizeof(*pRightUid));

  pNode->readIdx = 0;
  pNode->uidNum = rows;
  pNode->pNext = NULL;

  if (NULL == pCtx->pListTail) {
    pCtx->pListHead = pNode;
  } else {
    pCtx->pListTail->pNext = pNode;
  }
  pCtx->pListTail = pNode;

  return TSDB_CODE_SUCCESS;

_fail:
  // Capture terrno before the frees below can touch it.
  code = terrno;
  freeStbJoinTableList(pNode);
  return code;
}
862

863
/*
 * Feed one downstream block into the join bookkeeping.
 *
 * The vgId/uid columns of both join sides are located through the slot ids in
 * pStbJoin->basic. Depending on basic.batchFetch each row is registered either
 * in the per-vgroup hashes (left and right) or in the left table cache. When
 * every row was registered, the block's columns are appended to the pending
 * table list and ctx.prev.tableNum grows by the block's row count.
 *
 * Any failure is stored in pTaskInfo->code and raised through T_LONG_JMP.
 */
static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SStbJoinDynCtrlInfo*       pJoin = (SStbJoinDynCtrlInfo*)&pCtrl->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pJoin->ctx.prev;
  SColumnInfoData*           pLeftVgCol = NULL;
  SColumnInfoData*           pRightVgCol = NULL;
  SColumnInfoData*           pLeftUidCol = NULL;
  SColumnInfoData*           pRightUidCol = NULL;

  pLeftVgCol = taosArrayGet(pBlock->pDataBlock, pJoin->basic.vgSlot[0]);
  if (NULL == pLeftVgCol) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  pRightVgCol = taosArrayGet(pBlock->pDataBlock, pJoin->basic.vgSlot[1]);
  if (NULL == pRightVgCol) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  pLeftUidCol = taosArrayGet(pBlock->pDataBlock, pJoin->basic.uidSlot[0]);
  if (NULL == pLeftUidCol) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  pRightUidCol = taosArrayGet(pBlock->pDataBlock, pJoin->basic.uidSlot[1]);
  if (NULL == pRightUidCol) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  if (pJoin->basic.batchFetch) {
    // Batch mode: group uids by vgroup for each side; stop at the first failure.
    for (int32_t row = 0; TSDB_CODE_SUCCESS == code && row < pBlock->info.rows; ++row) {
      int32_t* lVg = (int32_t*)(pLeftVgCol->pData + pLeftVgCol->info.bytes * row);
      int64_t* lUid = (int64_t*)(pLeftUidCol->pData + pLeftUidCol->info.bytes * row);
      int32_t* rVg = (int32_t*)(pRightVgCol->pData + pRightVgCol->info.bytes * row);
      int64_t* rUid = (int64_t*)(pRightUidCol->pData + pRightUidCol->info.bytes * row);

      code = addToJoinVgroupHash(pPrev->leftHash, lVg, sizeof(*lVg), lUid, sizeof(*lUid));
      if (TSDB_CODE_SUCCESS == code) {
        code = addToJoinVgroupHash(pPrev->rightHash, rVg, sizeof(*rVg), rUid, sizeof(*rUid));
      }
    }
  } else {
    // Non-batch mode: only the left uid is tracked, in leftCache/onceTable.
    for (int32_t row = 0; TSDB_CODE_SUCCESS == code && row < pBlock->info.rows; ++row) {
      int64_t* lUid = (int64_t*)(pLeftUidCol->pData + pLeftUidCol->info.bytes * row);

      code = addToJoinTableHash(pPrev->leftCache, pPrev->onceTable, lUid, sizeof(*lUid));
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = appendStbJoinTableList(pPrev, pBlock->info.rows, (int32_t*)pLeftVgCol->pData, (int64_t*)pLeftUidCol->pData, (int32_t*)pRightVgCol->pData, (int64_t*)pRightUidCol->pData);
  }
  if (TSDB_CODE_SUCCESS == code) {
    pPrev->tableNum += pBlock->info.rows;
  }

_return:

  if (TSDB_CODE_SUCCESS == code) {
    return;
  }

  pOperator->pTaskInfo->code = code;
  T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
}
×
925

926

927
/*
 * Trim the left table cache once all downstream blocks have been consumed.
 *
 * Nothing is done in batch-fetch mode. Otherwise:
 *  - if leftCache and onceTable have the same size, leftCache is cleared;
 *  - else every entry iterated from onceTable is removed from leftCache (a
 *    failed removal is only logged), and the remaining leftCache size is
 *    recorded in execInfo.leftCacheNum.
 */
static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SStbJoinDynCtrlInfo*       pJoin = (SStbJoinDynCtrlInfo*)&pCtrl->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pJoin->ctx.prev;

  if (pJoin->basic.batchFetch) {
    return;
  }

  if (tSimpleHashGetSize(pPrev->leftCache) == tSimpleHashGetSize(pPrev->onceTable)) {
    tSimpleHashClear(pPrev->leftCache);
    return;
  }

  int32_t   iter = 0;
  uint64_t* pKey = tSimpleHashIterate(pPrev->onceTable, NULL, &iter);
  for (; NULL != pKey; pKey = tSimpleHashIterate(pPrev->onceTable, pKey, &iter)) {
    int32_t code = tSimpleHashRemove(pPrev->leftCache, pKey, sizeof(*pKey));
    if (code) {
      qError("tSimpleHashRemove failed in postProcessStbJoinTableHash, error:%s", tstrerror(code));
    }
  }

  pJoin->execInfo.leftCacheNum = tSimpleHashGetSize(pPrev->leftCache);
  qDebug("more than 1 ref build table num %" PRId64, (int64_t)tSimpleHashGetSize(pPrev->leftCache));
}
962

963
/*
 * Drain downstream 0 and build the join table list from every block it yields.
 *
 * For each block the block/row counters in execInfo are updated and the block
 * is handed to doBuildStbJoinTableHash(). After the last block the hash is
 * post-processed and ctx.prev.joinBuild is set.
 */
static void buildStbJoinTableList(SOperatorInfo* pOperator) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SStbJoinDynCtrlInfo*       pJoin = (SStbJoinDynCtrlInfo*)&pCtrl->stbJoin;
  SSDataBlock*               pBlk = NULL;

  while (NULL != (pBlk = getNextBlockFromDownstream(pOperator, 0))) {
    pJoin->execInfo.prevBlkNum += 1;
    pJoin->execInfo.prevBlkRows += pBlk->info.rows;

    doBuildStbJoinTableHash(pOperator, pBlk);
  }

  postProcessStbJoinTableHash(pOperator);

  pJoin->ctx.prev.joinBuild = true;
}
×
983

984
/*
 * Start retrievals for pending table pairs until one of them yields a block.
 *
 * The head node of ctx.prev's table list is consumed entry by entry
 * (readIdx .. uidNum). Exhausted nodes are unlinked and freed. For each entry
 * a new retrieve is launched; if it produces a block the function returns it
 * through ppRes. Otherwise the current retrieve is finished via
 * handleSeqJoinCurrRetrieveEnd() and the next entry is tried.
 *
 * When the list is empty *ppRes is set to NULL and the operator is marked
 * completed.
 *
 * Returns TSDB_CODE_SUCCESS, or the error from handleSeqJoinCurrRetrieveEnd().
 */
static int32_t seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
  SStbJoinTableList*         pNode = pPrev->pListHead;

  while (pNode) {
    if (pNode->readIdx >= pNode->uidNum) {
      pPrev->pListHead = pNode->pNext;
      if (NULL == pPrev->pListHead) {
        // The last node is about to be freed: do not leave pListTail pointing
        // at released memory, appendStbJoinTableList() dereferences it.
        pPrev->pListTail = NULL;
      }
      freeStbJoinTableList(pNode);
      pNode = pPrev->pListHead;
      continue;
    }

    seqJoinLaunchNewRetrieveImpl(pOperator, ppRes);
    if (*ppRes) {
      return TSDB_CODE_SUCCESS;
    }

    QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, pStbJoin));
    pPrev->pListHead->readIdx++;
  }

  *ppRes = NULL;
  setOperatorCompleted(pOperator);

  return TSDB_CODE_SUCCESS;
}
1012

1013
/*
 * Shape a result block according to the operator's output block descriptor.
 *
 * A NULL pBlock is accepted and ignored. Otherwise the block id is taken from
 * pStbJoin->pOutputDataBlockDesc and, if the block already has a column array,
 * one zero-filled column is appended for every descriptor slot beyond the
 * columns the block currently holds.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the
 * descriptor (or a slot) is missing, or the error reported while allocating /
 * appending a column.
 */
static int32_t seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) {
  if (NULL == pBlock) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == pStbJoin || NULL == pStbJoin->pOutputDataBlockDesc) {
    // pBlock is known to be non-NULL here, so name the objects that can actually be missing.
    qError("seqStableJoinComposeRes: pStbJoin or pOutputDataBlockDesc is NULL");
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  pBlock->info.id.blockId = pStbJoin->pOutputDataBlockDesc->dataBlockId;
  if (!pBlock->pDataBlock) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t slotNum = LIST_LENGTH(pStbJoin->pOutputDataBlockDesc->pSlots);
  for (int32_t i = (int32_t)taosArrayGetSize(pBlock->pDataBlock); i < slotNum; i++) {
    SSlotDescNode* pSlot = (SSlotDescNode*)nodesListGetNode(pStbJoin->pOutputDataBlockDesc->pSlots, i);
    if (pSlot == NULL) {
      qError("seqStableJoinComposeRes: pSlot is NULL, i:%d", i);
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }

    SColumnInfoData colInfo = createColumnInfoData(pSlot->dataType.type, pSlot->dataType.bytes, pSlot->slotId);
    int32_t         code = colInfoDataEnsureCapacity(&colInfo, pBlock->info.rows, true);
    if (code == TSDB_CODE_SUCCESS) {
      code = blockDataAppendColInfo(pBlock, &colInfo);
    }
    if (code != TSDB_CODE_SUCCESS) {
      // The column was never handed over to the block, so its buffers are still ours to release.
      colDataDestroy(&colInfo);
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
1042

1043
/*
 * Produce the next result block of the sequential super-table join.
 *
 * On the first call the join table list is built from downstream 0; if that
 * yielded no rows the operator completes immediately. Afterwards the current
 * retrieve is continued and, when it has nothing left, a new one is launched.
 *
 * While cost.openCost is still 0 the elapsed time of this call (timestamp
 * difference divided by 1000.0) is stored in it.
 *
 * On error the code is logged, stored in pTaskInfo->code and raised through
 * T_LONG_JMP. On success the block (possibly NULL) is post-processed by
 * seqStableJoinComposeRes() and that function's result is returned.
 */
int32_t seqStableJoin(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SStbJoinDynCtrlInfo*       pJoin = (SStbJoinDynCtrlInfo*)&pCtrl->stbJoin;

  QRY_PARAM_CHECK(pRes);
  if (OP_EXEC_DONE == pOperator->status) {
    return code;
  }

  int64_t startUs = (pOperator->cost.openCost == 0) ? taosGetTimestampUs() : 0;

  if (!pJoin->ctx.prev.joinBuild) {
    buildStbJoinTableList(pOperator);
    if (pJoin->execInfo.prevBlkRows <= 0) {
      setOperatorCompleted(pOperator);
      goto _return;
    }
  }

  QRY_ERR_JRET(seqJoinContinueCurrRetrieve(pOperator, pRes));
  if (NULL == *pRes) {
    // The running retrieve is exhausted: move on to the next table pair.
    QRY_ERR_JRET(seqJoinLaunchNewRetrieve(pOperator, pRes));
  }

_return:
  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - startUs) / 1000.0;
  }

  if (TSDB_CODE_SUCCESS == code) {
    return seqStableJoinComposeRes(pJoin, *pRes);
  }

  qError("%s failed since %s", __func__, tstrerror(code));
  pOperator->pTaskInfo->code = code;
  T_LONG_JMP(pOperator->pTaskInfo->env, code);
  return code;
}
1087

1088
/*
 * Allocate the operator parameter passed to the virtual table scan downstream.
 *
 * The result is an SOperatorParam of type
 * QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN (downstreamIdx 0, reUse false)
 * whose value is an SVTableScanOperatorParam carrying `uid` and an empty
 * pOpParamArray.
 *
 * ppRes - receives the new parameter on success and NULL on failure
 *
 * Returns TSDB_CODE_SUCCESS or terrno of the failed allocation; everything
 * allocated so far is released on failure.
 */
static int32_t buildVtbScanOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, uint64_t uid) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SOperatorParam*           pParam = NULL;
  SVTableScanOperatorParam* pScanParam = NULL;

  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno);

  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno);

  pScanParam = taosMemoryMalloc(sizeof(SVTableScanOperatorParam));
  QUERY_CHECK_NULL(pScanParam, code, lino, _return, terrno);
  pScanParam->pOpParamArray = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL(pScanParam->pOpParamArray, code, lino, _return, terrno);
  pScanParam->uid = uid;

  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN;
  pParam->downstreamIdx = 0;
  pParam->value = pScanParam;
  pParam->reUse = false;

  *ppRes = pParam;
  return TSDB_CODE_SUCCESS;

_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (NULL != pScanParam) {
    taosArrayDestroy(pScanParam->pOpParamArray);
    taosMemoryFree(pScanParam);
  }
  if (NULL != pParam) {
    taosArrayDestroy(pParam->pChildren);
    taosMemoryFree(pParam);
  }
  *ppRes = NULL;
  return code;
}
1122

1123
/*
 * Response callback for the use-db request sent by buildDbVgInfoMap().
 *
 * param - the SOperatorInfo that issued the request
 * pMsg  - response buffer; its pData payload is released by this callback
 * code  - status of the request
 *
 * On success the payload is deserialized into vtbScan.pRsp and the
 * vtbScan.ready semaphore is posted. On failure the (converted) status is
 * stored in pTaskInfo->code when the request itself failed, the failure is
 * logged and the error is returned.
 *
 * NOTE(review): none of the failure paths posts vtbScan.ready, so a thread
 * blocked in tsem_wait() on it is not woken up by this callback — confirm
 * whether the waiter is released elsewhere.
 */
int32_t dynProcessUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
  int32_t                    lino = 0;
  SOperatorInfo*             pOperator = (SOperatorInfo*)param;
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)pOperator->info;

  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = rpcCvtErrCode(code);
    if (pOperator->pTaskInfo->code != code) {
      qError("load systable rsp received, error:%s, cvted error:%s", tstrerror(code),
             tstrerror(pOperator->pTaskInfo->code));
    } else {
      qError("load systable rsp received, error:%s", tstrerror(code));
    }
    goto _return;
  }

  pScanResInfo->vtbScan.pRsp = taosMemoryMalloc(sizeof(SUseDbRsp));
  QUERY_CHECK_NULL(pScanResInfo->vtbScan.pRsp, code, lino, _return, terrno);

  code = tDeserializeSUseDbRsp(pMsg->pData, (int32_t)pMsg->len, pScanResInfo->vtbScan.pRsp);
  QUERY_CHECK_CODE(code, lino, _return);

  taosMemoryFreeClear(pMsg->pData);

  code = tsem_post(&pScanResInfo->vtbScan.ready);
  QUERY_CHECK_CODE(code, lino, _return);

  return code;
_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  // The success path releases the payload, so failure paths must not leak it.
  // taosMemoryFreeClear() resets the pointer, which keeps a repeated release harmless.
  if (NULL != pMsg) {
    taosMemoryFreeClear(pMsg->pData);
  }
  return code;
}
1155

1156
/*
 * Fetch the vgroup layout of a database and convert it into `output`.
 *
 * A use-db request for the database named by `name` is serialized and sent
 * asynchronously to vtbScan.epSet with dynProcessUseDbRsp() as callback. The
 * function then waits on vtbScan.ready and converts the response stored in
 * vtbScan.pRsp with queryBuildUseDbOutput().
 *
 * The request struct and the response stored in vtbScan.pRsp are released
 * before returning, on success as well as on failure.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 *
 * NOTE(review): the wait on vtbScan.ready has no timeout here; it relies on
 * the callback posting the semaphore.
 */
static int32_t buildDbVgInfoMap(SOperatorInfo* pOperator, SReadHandle* pHandle, SName* name, SExecTaskInfo* pTaskInfo, SUseDbOutput* output) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    lino = 0;
  int32_t                    contLen = 0;
  char*                      buf1 = NULL;
  SUseDbReq*                 pReq = NULL;
  SMsgSendInfo*              pMsgSendInfo = NULL;
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)pOperator->info;

  // Zero-initialize: only `db` is filled in below, yet the whole struct is serialized.
  pReq = taosMemoryCalloc(1, sizeof(SUseDbReq));
  QUERY_CHECK_NULL(pReq, code, lino, _return, terrno);
  code = tNameGetFullDbName(name, pReq->db);
  QUERY_CHECK_CODE(code, lino, _return);

  // First pass computes the encoded size, second pass fills the buffer.
  contLen = tSerializeSUseDbReq(NULL, 0, pReq);
  if (contLen < 0) {
    code = (TSDB_CODE_SUCCESS != terrno) ? terrno : TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    QUERY_CHECK_CODE(code, lino, _return);
  }
  buf1 = taosMemoryCalloc(1, contLen);
  QUERY_CHECK_NULL(buf1, code, lino, _return, terrno);
  if (tSerializeSUseDbReq(buf1, contLen, pReq) < 0) {
    // QUERY_CHECK_CODE does not assign `code`, so the failure has to be stored explicitly.
    code = (TSDB_CODE_SUCCESS != terrno) ? terrno : TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    QUERY_CHECK_CODE(code, lino, _return);
  }

  // send the fetch remote task result request
  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  QUERY_CHECK_NULL(pMsgSendInfo, code, lino, _return, terrno);

  pMsgSendInfo->param = pOperator;
  pMsgSendInfo->msgInfo.pData = buf1;
  pMsgSendInfo->msgInfo.len = contLen;
  pMsgSendInfo->msgType = TDMT_MND_GET_DB_INFO;
  pMsgSendInfo->fp = dynProcessUseDbRsp;
  pMsgSendInfo->requestId = pTaskInfo->id.queryId;

  code = asyncSendMsgToServer(pHandle->pMsgCb->clientRpc, &pScanResInfo->vtbScan.epSet, NULL, pMsgSendInfo);
  // The payload now belongs to pMsgSendInfo (the success path never frees it here),
  // so it must not be freed again by the error handling below.
  // NOTE(review): assumes asyncSendMsgToServer() also disposes of pMsgSendInfo
  // and its payload when it fails — confirm against its implementation.
  buf1 = NULL;
  QUERY_CHECK_CODE(code, lino, _return);

  code = tsem_wait(&pScanResInfo->vtbScan.ready);
  QUERY_CHECK_CODE(code, lino, _return);

  code = queryBuildUseDbOutput(output, pScanResInfo->vtbScan.pRsp);
  QUERY_CHECK_CODE(code, lino, _return);

_return:
  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    taosMemoryFree(buf1);
  }
  taosMemoryFree(pReq);
  // Early failures get here before any response was stored.
  if (NULL != pScanResInfo->vtbScan.pRsp) {
    tFreeSUsedbRsp(pScanResInfo->vtbScan.pRsp);
    taosMemoryFreeClear(pScanResInfo->vtbScan.pRsp);
  }
  return code;
}
1205

1206
int dynVgInfoComp(const void* lp, const void* rp) {
×
1207
  SVgroupInfo* pLeft = (SVgroupInfo*)lp;
×
1208
  SVgroupInfo* pRight = (SVgroupInfo*)rp;
×
1209
  if (pLeft->hashBegin < pRight->hashBegin) {
×
1210
    return -1;
×
1211
  } else if (pLeft->hashBegin > pRight->hashBegin) {
×
1212
    return 1;
×
1213
  }
1214

1215
  return 0;
×
1216
}
1217

1218
/*
 * Build dbInfo->vgArray from dbInfo->vgHash and sort it with sort_func.
 *
 * Nothing happens when dbInfo is NULL, when it has no vgHash, or when vgArray
 * already exists. The array is installed in dbInfo only after every hash
 * entry was copied and the array was sorted, so a failure never leaves a
 * partially filled vgArray behind for later calls to mistake as complete.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the array cannot be created or
 * grown.
 */
int32_t dynMakeVgArraySortBy(SDBVgInfo* dbInfo, __compar_fn_t sort_func) {
  if (NULL == dbInfo || NULL == dbInfo->vgHash || NULL != dbInfo->vgArray) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t vgSize = taosHashGetSize(dbInfo->vgHash);
  SArray* pVgArray = taosArrayInit(vgSize, sizeof(SVgroupInfo));
  if (NULL == pVgArray) {
    return terrno;
  }

  void* pIter = taosHashIterate(dbInfo->vgHash, NULL);
  while (pIter) {
    if (NULL == taosArrayPush(pVgArray, pIter)) {
      // Save terrno first: the cleanup calls below may overwrite it.
      int32_t code = terrno;
      taosHashCancelIterate(dbInfo->vgHash, pIter);
      taosArrayDestroy(pVgArray);
      return code;
    }

    pIter = taosHashIterate(dbInfo->vgHash, pIter);
  }

  taosArraySort(pVgArray, sort_func);
  dbInfo->vgArray = pVgArray;

  return TSDB_CODE_SUCCESS;
}
1245

1246
int32_t dynHashValueComp(void const* lp, void const* rp) {
×
1247
  uint32_t*    key = (uint32_t*)lp;
×
1248
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
×
1249

1250
  if (*key < pVg->hashBegin) {
×
1251
    return -1;
×
1252
  } else if (*key > pVg->hashEnd) {
×
1253
    return 1;
×
1254
  }
1255

1256
  return 0;
×
1257
}
1258

1259
/*
 * Resolve the vgroup that holds table `tbName` of database `dbFName`.
 *
 * The sorted vgroup array of dbInfo is built on demand, the full table name
 * "<dbFName>.<tbName>" is hashed with the database's hash settings and the
 * vgroup whose hash range contains that value is looked up.
 *
 * dbInfo  - vgroup information of the database
 * dbFName - full database name used as prefix of the hashed name
 * vgId    - receives the vgroup id on success
 * tbName  - table name
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR for missing
 * arguments or a name that does not fit the name buffer,
 * TSDB_CODE_TSC_DB_NOT_SELECTED when the database has no vgroups,
 * TSDB_CODE_CTG_INTERNAL_ERROR when no hash range matches, or the error of
 * building the vgroup array.
 */
int32_t getVgId(SDBVgInfo* dbInfo, char* dbFName, int32_t* vgId, char *tbName) {
  int32_t      code = 0;
  int32_t      lino = 0;
  int32_t      vgNum = 0;
  int          nameLen = 0;
  uint32_t     hashValue = 0;
  SVgroupInfo* vgInfo = NULL;
  char         tbFullName[TSDB_TABLE_FNAME_LEN];

  // dynMakeVgArraySortBy() tolerates a NULL dbInfo, but everything after it dereferences it.
  if (NULL == dbInfo || NULL == dbFName || NULL == vgId || NULL == tbName) {
    qError("%s got a NULL argument", __func__);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  code = dynMakeVgArraySortBy(dbInfo, dynVgInfoComp);
  QUERY_CHECK_CODE(code, lino, _return);

  vgNum = (int32_t)taosArrayGetSize(dbInfo->vgArray);
  if (vgNum <= 0) {
    qError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum);
    QUERY_CHECK_CODE(code = TSDB_CODE_TSC_DB_NOT_SELECTED, lino, _return);
  }

  nameLen = snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbFName, tbName);
  if (nameLen < 0 || (size_t)nameLen >= sizeof(tbFullName)) {
    // Hashing a truncated name would silently select the wrong vgroup.
    qError("%s: full table name too long, db:%s", __func__, dbFName);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  hashValue = taosGetTbHashVal(tbFullName, (int32_t)strlen(tbFullName), dbInfo->hashMethod,
                               dbInfo->hashPrefix, dbInfo->hashSuffix);

  vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, dynHashValueComp, TD_EQ);
  if (NULL == vgInfo) {
    qError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName,
           (int32_t)taosArrayGetSize(dbInfo->vgArray));
    return TSDB_CODE_CTG_INTERNAL_ERROR;
  }

  *vgId = vgInfo->vgId;

_return:
  return code;
}
1292

1293
/*
 * Tell whether column `colId` has to be read by the virtual table scan.
 * Returns true when vtbScan.scanAllCols is set or when colId is listed in
 * vtbScan.readColList, false otherwise.
 */
bool colNeedScan(SOperatorInfo* pOperator, col_id_t colId) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SVtbScanDynCtrlInfo*       pScan = (SVtbScanDynCtrlInfo*)&pCtrl->vtbScan;
  SArray*                    pCols = pScan->readColList;

  if (pScan->scanAllCols) {
    return true;
  }

  int32_t idx = 0;
  while (idx < taosArrayGetSize(pCols)) {
    col_id_t* pId = (col_id_t*)taosArrayGet(pCols, idx);
    if (*pId == colId) {
      return true;
    }
    ++idx;
  }

  return false;
}
1307

1308
/*
 * Return the vgroup information of the database referenced by `name`.
 *
 * Results are cached in vtbScan.dbVgInfoMap keyed by name->dbname. On a cache
 * miss a new SUseDbOutput is allocated, filled by buildDbVgInfoMap() and
 * stored in the map.
 *
 * dbVgInfo - receives the dbVgroup pointer of the cached/new entry
 *
 * Returns TSDB_CODE_SUCCESS, terrno when the entry cannot be allocated, or
 * the error from buildDbVgInfoMap()/taosHashPut(). When one of those two
 * fails the new entry is handed to freeUseDbOutput().
 */
int32_t getDbVgInfo(SOperatorInfo* pOperator, SName *name, SDBVgInfo **dbVgInfo) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SReadHandle*               pHandle = &pVtbScan->readHandle;
  SUseDbOutput*              output = NULL;
  SUseDbOutput**             find = (SUseDbOutput**)taosHashGet(pInfo->vtbScan.dbVgInfoMap, name->dbname, strlen(name->dbname));

  QRY_PARAM_CHECK(dbVgInfo);

  if (find == NULL) {
    // Zero-initialize so the cleanup below never sees indeterminate members
    // if buildDbVgInfoMap() fails before filling the struct.
    output = taosMemoryCalloc(1, sizeof(SUseDbOutput));
    if (NULL == output) {
      code = terrno;
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      return code;
    }
    code = buildDbVgInfoMap(pOperator, pHandle, name, pTaskInfo, output);
    QUERY_CHECK_CODE(code, line, _return);
    code = taosHashPut(pInfo->vtbScan.dbVgInfoMap, name->dbname, strlen(name->dbname), &output, POINTER_BYTES);
    QUERY_CHECK_CODE(code, line, _return);
  } else {
    output = *find;
  }

  *dbVgInfo = output->dbVgroup;
  return code;
_return:
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  freeUseDbOutput(output);
  return code;
}
1337

1338
/*
 * Produce the next result block of the dynamic virtual super-table scan.
 *
 * Child tables in vtbScan.childTableList are processed one at a time:
 *  - while curTableIdx == lastTableIdx the downstream operator is simply
 *    asked for its next block;
 *  - otherwise the child table's meta entry is read, every referenced column
 *    that must be scanned is grouped by its origin table in orgTbVgColMap
 *    (together with the origin table's vgroup id), a scan parameter with one
 *    exchange parameter per map entry is built, and the downstream operator
 *    is restarted with it.
 * A table that yields no block makes the scan advance to the next table; when
 * all tables are exhausted the operator is marked completed.
 *
 * Columns referencing a database whose name does not start with
 * vtbScan.dbName are rejected with TSDB_CODE_VTABLE_NOT_SUPPORT_CROSS_DB.
 *
 * On error all resources acquired by this call (meta reader, pending column
 * list, hash iterator, orgTbVgColMap) are released, the code is stored in
 * pTaskInfo->code and raised through T_LONG_JMP.
 *
 * NOTE(review): an exchange parameter that could not be pushed into
 * pOpParamArray is not released here; the matching destructor is not visible
 * from this function.
 */
int32_t vtbScan(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SReadHandle*               pHandle = &pVtbScan->readHandle;
  SMetaReader                mr = {0};
  SDBVgInfo*                 dbVgInfo = NULL;
  bool                       readerOpen = false;     // mr was initialized and not cleared yet
  SArray*                    pPendingColMap = NULL;  // column list not yet owned by orgTbVgColMap
  void*                      pMapIter = NULL;        // active iterator over orgTbVgColMap

  QRY_PARAM_CHECK(pRes);
  if (pOperator->status == OP_EXEC_DONE) {
    return code;
  }

  int64_t st = 0;
  if (pOperator->cost.openCost == 0) {
    st = taosGetTimestampUs();
  }

  size_t num = taosArrayGetSize(pVtbScan->childTableList);

  // no child table, return
  if (num == 0) {
    setOperatorCompleted(pOperator);
    return code;
  }

  pVtbScan->orgTbVgColMap = taosHashInit(num * 64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  QUERY_CHECK_NULL(pVtbScan->orgTbVgColMap, code, line, _return, terrno);
  taosHashSetFreeFp(pVtbScan->orgTbVgColMap, destroyOrgTbInfo);

  while (true) {
    if (pVtbScan->curTableIdx == pVtbScan->lastTableIdx) {
      code = pOperator->pDownstream[0]->fpSet.getNextFn(pOperator->pDownstream[0], pRes);
      QUERY_CHECK_CODE(code, line, _return);
    } else {
      uint64_t* id = taosArrayGet(pVtbScan->childTableList, pVtbScan->curTableIdx);
      QUERY_CHECK_NULL(id, code, line, _return, terrno);
      pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, META_READER_LOCK, &pHandle->api.metaFn);
      readerOpen = true;
      code = pHandle->api.metaReaderFn.getTableEntryByUid(&mr, *id);
      QUERY_CHECK_CODE(code, line, _return);

      for (int32_t j = 0; j < mr.me.colRef.nCols; j++) {
        if (mr.me.colRef.pColRef[j].hasRef && colNeedScan(pOperator, mr.me.colRef.pColRef[j].id)) {
          SName   name = {0};
          char    dbFname[TSDB_DB_FNAME_LEN] = {0};
          char    orgTbFName[TSDB_TABLE_FNAME_LEN] = {0};

          if (strncmp(mr.me.colRef.pColRef[j].refDbName, pVtbScan->dbName, strlen(pVtbScan->dbName)) != 0) {
            QUERY_CHECK_CODE(code = TSDB_CODE_VTABLE_NOT_SUPPORT_CROSS_DB, line, _return);
          }
          toName(pInfo->vtbScan.acctId, mr.me.colRef.pColRef[j].refDbName, mr.me.colRef.pColRef[j].refTableName, &name);
          code = getDbVgInfo(pOperator, &name, &dbVgInfo);
          QUERY_CHECK_CODE(code, line, _return);
          // The result must be stored in `code`, otherwise the check below tests a stale value.
          code = tNameGetFullDbName(&name, dbFname);
          QUERY_CHECK_CODE(code, line, _return);
          // NOTE(review): the result of tNameGetFullTableName() is not inspected;
          // its return type is not visible from here — confirm and check it if it reports errors.
          tNameGetFullTableName(&name, orgTbFName);

          // The whole zero-padded buffer is used as key, for lookup and insertion alike.
          void *pVal = taosHashGet(pVtbScan->orgTbVgColMap, orgTbFName, sizeof(orgTbFName));
          if (!pVal) {
            SOrgTbInfo map = {0};
            code = getVgId(dbVgInfo, dbFname, &map.vgId, name.tname);
            QUERY_CHECK_CODE(code, line, _return);
            tstrncpy(map.tbName, orgTbFName, sizeof(map.tbName));
            map.colMap = taosArrayInit(10, sizeof(SColIdNameKV));
            QUERY_CHECK_NULL(map.colMap, code, line, _return, terrno);
            // Until the entry is stored in the hash, this function owns the column list.
            pPendingColMap = map.colMap;
            SColIdNameKV colIdNameKV = {0};
            colIdNameKV.colId = mr.me.colRef.pColRef[j].id;
            tstrncpy(colIdNameKV.colName, mr.me.colRef.pColRef[j].refColName, sizeof(colIdNameKV.colName));
            QUERY_CHECK_NULL(taosArrayPush(map.colMap, &colIdNameKV), code, line, _return, terrno);
            code = taosHashPut(pVtbScan->orgTbVgColMap, orgTbFName, sizeof(orgTbFName), &map, sizeof(map));
            QUERY_CHECK_CODE(code, line, _return);
            pPendingColMap = NULL;
          } else {
            SOrgTbInfo *tbInfo = (SOrgTbInfo *)pVal;
            SColIdNameKV colIdNameKV = {0};
            colIdNameKV.colId = mr.me.colRef.pColRef[j].id;
            tstrncpy(colIdNameKV.colName, mr.me.colRef.pColRef[j].refColName, sizeof(colIdNameKV.colName));
            QUERY_CHECK_NULL(taosArrayPush(tbInfo->colMap, &colIdNameKV), code, line, _return, terrno);
          }
        }
      }

      pVtbScan->vtbScanParam = NULL;
      code = buildVtbScanOperatorParam(pInfo, &pVtbScan->vtbScanParam, *id);
      QUERY_CHECK_CODE(code, line, _return);

      pMapIter = taosHashIterate(pVtbScan->orgTbVgColMap, NULL);
      while (pMapIter != NULL) {
        SOrgTbInfo*      pMap = (SOrgTbInfo*)pMapIter;
        SOperatorParam*  pExchangeParam = NULL;
        code = buildExchangeOperatorParamForVScan(&pExchangeParam, 0, pMap);
        QUERY_CHECK_CODE(code, line, _return);
        QUERY_CHECK_NULL(taosArrayPush(((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pOpParamArray, &pExchangeParam), code, line, _return, terrno);
        pMapIter = taosHashIterate(pVtbScan->orgTbVgColMap, pMapIter);
      }
      pHandle->api.metaReaderFn.clearReader(&mr);
      readerOpen = false;

      // reset downstream operator's status
      pOperator->pDownstream[0]->status = OP_NOT_OPENED;
      code = pOperator->pDownstream[0]->fpSet.getNextExtFn(pOperator->pDownstream[0], pVtbScan->vtbScanParam, pRes);
      QUERY_CHECK_CODE(code, line, _return);
    }

    if (*pRes) {
      // has result, still read data from this table.
      pVtbScan->lastTableIdx = pVtbScan->curTableIdx;
      break;
    } else {
      // no result, read next table.
      pVtbScan->curTableIdx++;
      if (pVtbScan->curTableIdx >= taosArrayGetSize(pVtbScan->childTableList)) {
        setOperatorCompleted(pOperator);
        break;
      }
    }
  }

_return:
  // Release whatever an early exit left behind; all three are no-ops on the normal path.
  if (readerOpen) {
    pHandle->api.metaReaderFn.clearReader(&mr);
  }
  taosArrayDestroy(pPendingColMap);
  if (NULL != pMapIter) {
    taosHashCancelIterate(pVtbScan->orgTbVgColMap, pMapIter);
  }
  taosHashCleanup(pVtbScan->orgTbVgColMap);
  pVtbScan->orgTbVgColMap = NULL;
  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
  }

  if (code) {
    qError("%s failed since %s", __func__, tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }

  return code;
}
1472

1473
/*
 * Create the hash tables the sequential super-table join keeps in pPrev.
 *
 * batchFetch == true : leftHash and rightHash (INT hash function)
 * batchFetch == false: leftCache, rightCache and onceTable (BIGINT hash function)
 *
 * Returns TSDB_CODE_SUCCESS, or terrno of the first table that could not be
 * created; tables created before the failure stay assigned in pPrev.
 */
int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) {
  if (!batchFetch) {
    pPrev->leftCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (NULL == pPrev->leftCache) {
      return terrno;
    }

    pPrev->rightCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (NULL == pPrev->rightCache) {
      return terrno;
    }

    pPrev->onceTable = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (NULL == pPrev->onceTable) {
      return terrno;
    }

    return TSDB_CODE_SUCCESS;
  }

  pPrev->leftHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  if (NULL == pPrev->leftHash) {
    return terrno;
  }

  pPrev->rightHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  if (NULL == pPrev->rightHash) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
1500

1501
static int32_t initVtbScanInfo(SOperatorInfo* pOperator, SDynQueryCtrlOperatorInfo* pInfo, SReadHandle* pHandle,
×
1502
                               SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) {
1503
  int32_t      code = TSDB_CODE_SUCCESS;
×
1504
  int32_t      line = 0;
×
1505

1506
  code = tsem_init(&pInfo->vtbScan.ready, 0, 0);
×
1507
  QUERY_CHECK_CODE(code, line, _return);
×
1508

1509
  pInfo->vtbScan.scanAllCols = pPhyciNode->vtbScan.scanAllCols;
×
1510
  pInfo->vtbScan.suid = pPhyciNode->vtbScan.suid;
×
1511
  pInfo->vtbScan.epSet = pPhyciNode->vtbScan.mgmtEpSet;
×
1512
  pInfo->vtbScan.acctId = pPhyciNode->vtbScan.accountId;
×
1513
  pInfo->vtbScan.readHandle = *pHandle;
×
1514
  pInfo->vtbScan.curTableIdx = 0;
×
1515
  pInfo->vtbScan.lastTableIdx = -1;
×
1516
  pInfo->vtbScan.dbName = taosStrdup(pPhyciNode->vtbScan.dbName);
×
1517
  QUERY_CHECK_NULL(pInfo->vtbScan.dbName, code, line, _return, terrno);
×
1518

1519
  pInfo->vtbScan.readColList = taosArrayInit(LIST_LENGTH(pPhyciNode->vtbScan.pScanCols), sizeof(col_id_t));
×
1520
  QUERY_CHECK_NULL(pInfo->vtbScan.readColList, code, line, _return, terrno);
×
1521

1522
  for (int32_t i = 0; i < LIST_LENGTH(pPhyciNode->vtbScan.pScanCols); ++i) {
×
1523
    SColumnNode* pNode = (SColumnNode*)nodesListGetNode(pPhyciNode->vtbScan.pScanCols, i);
×
1524
    QUERY_CHECK_NULL(pNode, code, line, _return, terrno);
×
1525
    QUERY_CHECK_NULL(taosArrayPush(pInfo->vtbScan.readColList, &pNode->colId), code, line, _return, terrno);
×
1526
  }
1527

1528
  pInfo->vtbScan.childTableList = taosArrayInit(10, sizeof(uint64_t));
×
1529
  QUERY_CHECK_NULL(pInfo->vtbScan.childTableList, code, line, _return, terrno);
×
1530
  code = pHandle->api.metaFn.getChildTableList(pHandle->vnode, pInfo->vtbScan.suid, pInfo->vtbScan.childTableList);
×
1531
  QUERY_CHECK_CODE(code, line, _return);
×
1532

1533
  pInfo->vtbScan.dbVgInfoMap = taosHashInit(taosArrayGetSize(pInfo->vtbScan.childTableList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
×
1534
  QUERY_CHECK_NULL(pInfo->vtbScan.dbVgInfoMap, code, line, _return, terrno);
×
1535

1536
  return code;
×
1537
_return:
×
1538
  // no need to destroy array and hashmap allocated in this function,
1539
  // since the operator's destroy function will take care of it
1540
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
1541
  return code;
×
1542
}
1543

1544
int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
×
1545
                                       SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo,
1546
                                       SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
1547
  QRY_PARAM_CHECK(pOptrInfo);
×
1548

1549
  int32_t                    code = TSDB_CODE_SUCCESS;
×
1550
  int32_t                    line = 0;
×
1551
  __optr_fn_t                nextFp = NULL;
×
1552
  SOperatorInfo*             pOperator = NULL;
×
1553
  SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo));
×
1554
  if (pInfo == NULL) {
×
1555
    code = terrno;
×
1556
    goto _error;
×
1557
  }
1558

1559
  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
×
1560
  if (pOperator == NULL) {
×
1561
    code = terrno;
×
1562
    goto _error;
×
1563
  }
1564

1565
  pTaskInfo->dynamicTask = (int8_t)pPhyciNode->node.dynamicOp;
×
1566

1567
  code = appendDownstream(pOperator, pDownstream, numOfDownstream);
×
1568
  if (TSDB_CODE_SUCCESS != code) {
×
1569
    goto _error;
×
1570
  }
1571

1572
  setOperatorInfo(pOperator, "DynQueryCtrlOperator", QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, false, OP_NOT_OPENED,
×
1573
                  pInfo, pTaskInfo);
1574

1575
  pInfo->qType = pPhyciNode->qType;
×
1576
  switch (pInfo->qType) {
×
1577
    case DYN_QTYPE_STB_HASH:
×
1578
      TAOS_MEMCPY(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
×
1579
      pInfo->stbJoin.pOutputDataBlockDesc = pPhyciNode->node.pOutputDataBlockDesc;
×
1580
      code = initSeqStbJoinTableHash(&pInfo->stbJoin.ctx.prev, pInfo->stbJoin.basic.batchFetch);
×
1581
      if (TSDB_CODE_SUCCESS != code) {
×
1582
        goto _error;
×
1583
      }
1584
      nextFp = seqStableJoin;
×
1585
      break;
×
1586
    case DYN_QTYPE_VTB_SCAN:
×
1587
      code = initVtbScanInfo(pOperator, pInfo, pHandle, pPhyciNode, pTaskInfo);
×
1588
      QUERY_CHECK_CODE(code, line, _error);
×
1589
      nextFp = vtbScan;
×
1590
      break;
×
1591
    default:
×
1592
      qError("unsupported dynamic query ctrl type: %d", pInfo->qType);
×
1593
      code = TSDB_CODE_INVALID_PARA;
×
1594
      goto _error;
×
1595
  }
1596

1597
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, nextFp, NULL, destroyDynQueryCtrlOperator, optrDefaultBufFn,
×
1598
                                         NULL, optrDefaultGetNextExtFn, NULL);
1599

1600
  *pOptrInfo = pOperator;
×
1601
  return TSDB_CODE_SUCCESS;
×
1602

1603
_error:
×
1604
  if (pInfo != NULL) {
×
1605
    destroyDynQueryCtrlOperator(pInfo);
×
1606
  }
1607

1608
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
1609
  pTaskInfo->code = code;
×
1610
  return code;
×
1611
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc