• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3809

01 Apr 2025 03:03AM UTC coverage: 34.048% (+0.02%) from 34.033%
#3809

push

travis-ci

happyguoxy
test:alter gcda dir

148452 of 599532 branches covered (24.76%)

Branch coverage included in aggregate %.

222312 of 489411 relevant lines covered (45.42%)

761122.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

24.94
/source/libs/executor/src/dynqueryctrloperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "nodes.h"
19
#include "operator.h"
20
#include "os.h"
21
#include "plannodes.h"
22
#include "query.h"
23
#include "querynodes.h"
24
#include "querytask.h"
25
#include "tarray.h"
26
#include "tcompare.h"
27
#include "tdatablock.h"
28
#include "thash.h"
29
#include "tmsg.h"
30
#include "trpc.h"
31
#include "ttypes.h"
32
#include "dynqueryctrl.h"
33

34
/* Global counter; buildGroupCacheOperatorParam() atomically increments it to obtain the sessionId of each group-cache param. */
int64_t gSessionId = 0;
35

36
void freeVgTableList(void* ptr) { 
×
37
  taosArrayDestroy(*(SArray**)ptr); 
×
38
}
×
39

40
/*
 * Free every node of a singly linked SStbJoinTableList chain, including the
 * four vg/uid arrays each node owns. A NULL head is a no-op.
 */
static void destroyStbJoinTableList(SStbJoinTableList* pListHead) {
  for (SStbJoinTableList* pCur = pListHead; NULL != pCur;) {
    // Remember the successor before the node itself is released.
    SStbJoinTableList* pFollowing = pCur->pNext;

    taosMemoryFree(pCur->pLeftVg);
    taosMemoryFree(pCur->pLeftUid);
    taosMemoryFree(pCur->pRightVg);
    taosMemoryFree(pCur->pRightUid);
    taosMemoryFree(pCur);

    pCur = pFollowing;
  }
}
1✔
53

54
/*
 * Tear down the super-table join control state.
 * Logs the execution counters, then releases either the uid caches
 * (non-batch mode) or the per-vgroup hashes (batch mode), and finally frees
 * the pending table list.
 */
static void destroyStbJoinDynCtrlInfo(SStbJoinDynCtrlInfo* pStbJoin) {
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;

  qDebug("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64 ", leftCacheNum:%" PRId64 ", rightCacheNum:%" PRId64, 
         pStbJoin->execInfo.prevBlkNum, pStbJoin->execInfo.prevBlkRows, pStbJoin->execInfo.postBlkNum, 
         pStbJoin->execInfo.postBlkRows, pStbJoin->execInfo.leftCacheNum, pStbJoin->execInfo.rightCacheNum);

  if (!pStbJoin->basic.batchFetch) {
    if (pPrev->leftCache) {
      tSimpleHashCleanup(pPrev->leftCache);
    }
    if (pPrev->rightCache) {
      tSimpleHashCleanup(pPrev->rightCache);
    }
    if (pPrev->onceTable) {
      tSimpleHashCleanup(pPrev->onceTable);
    }
  } else {
    // Batch mode: hash values are SArray* lists, so install the free callback first.
    if (pPrev->leftHash) {
      tSimpleHashSetFreeFp(pPrev->leftHash, freeVgTableList);
      tSimpleHashCleanup(pPrev->leftHash);
    }
    if (pPrev->rightHash) {
      tSimpleHashSetFreeFp(pPrev->rightHash, freeVgTableList);
      tSimpleHashCleanup(pPrev->rightHash);
    }
  }

  destroyStbJoinTableList(pPrev->pListHead);
}
1✔
82

83
void destroyOrgTbInfo(void *info) {
×
84
  SOrgTbInfo *pOrgTbInfo = (SOrgTbInfo *)info;
×
85
  if (pOrgTbInfo) {
×
86
    taosArrayDestroy(pOrgTbInfo->colMap);
×
87
  }
88
}
×
89

90
void freeUseDbOutput(void* pOutput) {
×
91
  SUseDbOutput *pOut = *(SUseDbOutput**)pOutput;
×
92
  if (NULL == pOutput) {
×
93
    return;
×
94
  }
95

96
  if (pOut->dbVgroup) {
×
97
    freeVgInfo(pOut->dbVgroup);
×
98
  }
99
  taosMemFree(pOut);
×
100
}
101

102
/*
 * Release everything owned by the virtual-table scan control state:
 * the db name, the child-table and read-column arrays, the two hash maps
 * (with their per-entry free callbacks) and the cached use-db response.
 * Each member is released only when it is non-NULL.
 */
static void destroyVtbScanDynCtrlInfo(SVtbScanDynCtrlInfo* pVtbScan) {
  if (NULL != pVtbScan->dbName) {
    taosMemoryFreeClear(pVtbScan->dbName);
  }

  if (NULL != pVtbScan->childTableList) {
    taosArrayDestroy(pVtbScan->childTableList);
  }

  if (NULL != pVtbScan->readColList) {
    taosArrayDestroy(pVtbScan->readColList);
  }

  if (NULL != pVtbScan->dbVgInfoMap) {
    // Values are SUseDbOutput* entries released by freeUseDbOutput.
    taosHashSetFreeFp(pVtbScan->dbVgInfoMap, freeUseDbOutput);
    taosHashCleanup(pVtbScan->dbVgInfoMap);
  }

  if (NULL != pVtbScan->orgTbVgColMap) {
    taosHashSetFreeFp(pVtbScan->orgTbVgColMap, destroyOrgTbInfo);
    taosHashCleanup(pVtbScan->orgTbVgColMap);
  }

  if (NULL != pVtbScan->pRsp) {
    // Free the response contents first, then the response object itself.
    tFreeSUsedbRsp(pVtbScan->pRsp);
    taosMemoryFreeClear(pVtbScan->pRsp);
  }
}
×
125

126
static void destroyDynQueryCtrlOperator(void* param) {
1✔
127
  SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param;
1✔
128

129
  switch (pDyn->qType) {
1!
130
    case DYN_QTYPE_STB_HASH:
1✔
131
      destroyStbJoinDynCtrlInfo(&pDyn->stbJoin);
1✔
132
      break;
1✔
133
    case DYN_QTYPE_VTB_SCAN:
×
134
      destroyVtbScanDynCtrlInfo(&pDyn->vtbScan);
×
135
      break;
×
136
    default:
×
137
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
×
138
      break;
×
139
  }
140

141
  taosMemoryFreeClear(param);
1!
142
}
1✔
143

144
/*
 * Decide whether the data of a table should be cached.
 *  - batch fetch: always true;
 *  - right table: true when the current and next right uids are equal;
 *  - left table: true when uid has an entry in pPrev->leftCache.
 */
static FORCE_INLINE bool tableNeedCache(int64_t uid, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, bool rightTable, bool batchFetch) {
  if (batchFetch) {
    return true;
  }

  if (rightTable) {
    return pPost->rightCurrUid == pPost->rightNextUid;
  }

  return NULL != tSimpleHashGet(pPrev->leftCache, &uid, sizeof(uid));
}
157

158
/*
 * Load the table pair at the head list's read index into the post-join context.
 *
 * Copies the current left/right uid and vgId, looks ahead (possibly into the
 * following list nodes) for the next right uid (0 when there is none), and
 * recomputes the left/right need-cache flags. In non-batch mode a right table
 * that needs caching and differs from the previous right uid is recorded in
 * pPrev->rightCache.
 *
 * Returns TSDB_CODE_SUCCESS, or the error propagated by QRY_ERR_RET.
 */
static int32_t updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo* pStbJoin) {
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
  SStbJoinTableList*   pScan = pPrev->pListHead;

  int64_t* pCurLeftUid = pScan->pLeftUid + pScan->readIdx;
  int64_t* pCurRightUid = pScan->pRightUid + pScan->readIdx;
  int32_t* pCurLeftVg = pScan->pLeftVg + pScan->readIdx;
  int32_t* pCurRightVg = pScan->pRightVg + pScan->readIdx;
  int64_t  peekIdx = pScan->readIdx + 1;
  int64_t  lastRightUid = pPost->rightCurrUid;

  pPost->leftCurrUid = *pCurLeftUid;
  pPost->rightCurrUid = *pCurRightUid;

  pPost->leftVgId = *pCurLeftVg;
  pPost->rightVgId = *pCurRightVg;

  // Look ahead for the next right uid, walking into later nodes if needed.
  for (;;) {
    if (peekIdx < pScan->uidNum) {
      pPost->rightNextUid = *(pScan->pRightUid + peekIdx);
      break;
    }

    pScan = pScan->pNext;
    if (NULL == pScan) {
      pPost->rightNextUid = 0;
      break;
    }

    pCurRightUid = pScan->pRightUid;
    peekIdx = 0;
  }

  pPost->leftNeedCache = tableNeedCache(*pCurLeftUid, pPrev, pPost, false, pStbJoin->basic.batchFetch);
  pPost->rightNeedCache = tableNeedCache(*pCurRightUid, pPrev, pPost, true, pStbJoin->basic.batchFetch);

  if (!pStbJoin->basic.batchFetch && pPost->rightNeedCache && lastRightUid != pPost->rightCurrUid) {
    QRY_ERR_RET(tSimpleHashPut(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid), NULL, 0));
    pStbJoin->execInfo.rightCacheNum++;
  }

  return TSDB_CODE_SUCCESS;
}
201

202

203
/*
 * Build a GROUP_CACHE get-param and return it through *ppRes.
 *
 * pChild (may be NULL) becomes the single child of the new param. On every
 * failure path pChild is released with freeOperatorParam(), except when the
 * payload allocation fails: by then pChild is already stored in pChildren and
 * is handed to freeOperatorParam() together with the param. *ppRes is NULL on
 * failure.
 *
 * Returns TSDB_CODE_SUCCESS or the terrno captured at the failing call.
 */
static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, bool needCache, SOperatorParam* pChild) {
  int32_t         code = TSDB_CODE_SUCCESS;
  SOperatorParam* pParam = taosMemoryMalloc(sizeof(SOperatorParam));

  *ppRes = pParam;
  if (NULL == pParam) {
    code = terrno;
    freeOperatorParam(pChild, OP_GET_PARAM);
    return code;
  }

  pParam->pChildren = NULL;
  if (NULL != pChild) {
    pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
    bool attached = (NULL != pParam->pChildren) && (NULL != taosArrayPush(pParam->pChildren, &pChild));
    if (!attached) {
      // Array creation or the push failed: the child is still owned here.
      code = terrno;
      freeOperatorParam(pChild, OP_GET_PARAM);
      freeOperatorParam(pParam, OP_GET_PARAM);
      *ppRes = NULL;
      return code;
    }
  }

  SGcOperatorParam* pGcValue = taosMemoryMalloc(sizeof(SGcOperatorParam));
  if (NULL == pGcValue) {
    code = terrno;
    freeOperatorParam(pParam, OP_GET_PARAM);
    *ppRes = NULL;
    return code;
  }

  pGcValue->sessionId = atomic_add_fetch_64(&gSessionId, 1);
  pGcValue->downstreamIdx = downstreamIdx;
  pGcValue->vgId = vgId;
  pGcValue->tbUid = tbUid;
  pGcValue->needCache = needCache;

  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
  pParam->downstreamIdx = downstreamIdx;
  pParam->value = pGcValue;
  pParam->reUse = false;

  return TSDB_CODE_SUCCESS;
}
252

253

254
/*
 * Build a GROUP_CACHE notify-param (no children) and return it via *ppRes.
 *
 * The payload records downstreamIdx, vgId and tbUid. On failure *ppRes is
 * NULL; the original left *ppRes pointing at the already-freed param when the
 * payload allocation failed, unlike buildGroupCacheOperatorParam().
 *
 * Returns TSDB_CODE_SUCCESS or the terrno captured at the failing allocation.
 */
static int32_t buildGroupCacheNotifyOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid) {
  int32_t code = TSDB_CODE_SUCCESS;
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }
  (*ppRes)->pChildren = NULL;

  SGcNotifyOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcNotifyOperatorParam));
  if (NULL == pGc) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    *ppRes = NULL;  // do not hand a dangling pointer back to the caller
    return code;
  }

  pGc->downstreamIdx = downstreamIdx;
  pGc->vgId = vgId;
  pGc->tbUid = tbUid;

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pGc;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;
}
280

281

282
/*
 * Build a single-table EXCHANGE get-param and return it via *ppRes.
 *
 * The payload targets vgroup *pVgId with a one-element uid list holding *pUid
 * (tableSeq = true, not a virtual-table ref scan).
 *
 * On failure every allocation made so far is released and *ppRes is NULL. The
 * original leaked *ppRes on all three failure paths and left it pointing at a
 * half-initialised param. terrno is captured before cleanup so the cleanup
 * calls cannot disturb the returned code.
 */
static int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, const int32_t* pVgId, int64_t* pUid) {
  int32_t code = TSDB_CODE_SUCCESS;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }
  (*ppRes)->pChildren = NULL;
  
  SExchangeOperatorParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  if (NULL == pExc) {
    code = terrno;
    taosMemoryFreeClear(*ppRes);
    return code;
  }

  pExc->multiParams = false;
  pExc->basic.vgId = *pVgId;
  pExc->basic.tableSeq = true;
  pExc->basic.isVtbRefScan = false;
  pExc->basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
  pExc->basic.colMap = NULL;
  pExc->basic.uidList = taosArrayInit(1, sizeof(int64_t));
  if (NULL == pExc->basic.uidList) {
    code = terrno;
    taosMemoryFree(pExc);
    taosMemoryFreeClear(*ppRes);
    return code;
  }
  if (NULL == taosArrayPush(pExc->basic.uidList, pUid)) {
    code = terrno;
    taosArrayDestroy(pExc->basic.uidList);
    taosMemoryFree(pExc);
    taosMemoryFreeClear(*ppRes);
    return code;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;
}
318

319
/*
 * Build a multi-vgroup EXCHANGE get-param from a vgId -> SArray* uid-list hash.
 *
 * Each entry of pVg is copied into pExc->pBatchs as a basic param. After a
 * successful put the uid list is owned by pBatchs and the slot in pVg is set
 * to NULL so it is not released twice.
 *
 * On failure everything allocated here is released and *ppRes is NULL. The
 * original returned straight out of the loop when tSimpleHashPut failed,
 * leaking pExc, pBatchs and *ppRes and leaving *ppRes half-initialised.
 * NOTE(review): cleanup relies on the free callback installed on pBatchs
 * (freeExchangeGetBasicOperatorParam) releasing the uid lists already moved
 * into it - confirm against that helper.
 */
static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
  int32_t code = TSDB_CODE_SUCCESS;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }
  (*ppRes)->pChildren = NULL;
  
  SExchangeOperatorBatchParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorBatchParam));
  if (NULL == pExc) {
    code = terrno;
    taosMemoryFreeClear(*ppRes);
    return code;
  }

  pExc->multiParams = true;
  pExc->pBatchs = tSimpleHashInit(tSimpleHashGetSize(pVg), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  if (NULL == pExc->pBatchs) {
    code = terrno;
    taosMemoryFree(pExc);
    taosMemoryFreeClear(*ppRes);
    return code;
  }
  tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);
  
  // Zero-initialise so no indeterminate bytes are copied into the hash.
  SExchangeOperatorBasicParam basic = {0};
  basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;

  int32_t iter = 0;
  void* p = NULL;
  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
    int32_t* pVgId = tSimpleHashGetKey(p, NULL);
    SArray* pUidList = *(SArray**)p;
    basic.vgId = *pVgId;
    basic.uidList = pUidList;
    basic.colMap = NULL;
    basic.tableSeq = false;
    basic.isVtbRefScan = false;
    
    code = tSimpleHashPut(pExc->pBatchs, pVgId, sizeof(*pVgId), &basic, sizeof(basic));
    if (TSDB_CODE_SUCCESS != code) {
      // The failing entry is still owned by pVg; only release what was moved.
      tSimpleHashCleanup(pExc->pBatchs);
      taosMemoryFree(pExc);
      taosMemoryFreeClear(*ppRes);
      return code;
    }

    qTrace("build downstreamIdx %d batch scan, vgId:%d, uidNum:%" PRId64, downstreamIdx, *pVgId, (int64_t)taosArrayGetSize(pUidList));
    *(SArray**)p = NULL;  // ownership of the uid list moved to pBatchs
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;
}
368

369
/*
 * Build a reusable EXCHANGE get-param for a virtual-table reference scan.
 *
 * The payload copies vgId, tbName and a duplicate of the column map from pMap
 * and starts with an empty uid list. The resulting param has reUse = true.
 *
 * Fixes over the original cleanup path:
 *  - `basic` points inside pExc (&pExc->basic); it must not be freed on its
 *    own, only pExc is released;
 *  - colMap/uidList are set to NULL as soon as pExc exists, so the cleanup
 *    code never inspects indeterminate pointers from the malloc'ed struct.
 *
 * Returns TSDB_CODE_SUCCESS, or the failing code with *ppRes set to NULL.
 */
static int32_t buildExchangeOperatorParamForVScan(SOperatorParam** ppRes, int32_t downstreamIdx, SOrgTbInfo* pMap) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  SExchangeOperatorParam*      pExc = NULL;
  SExchangeOperatorBasicParam* basic = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
  (*ppRes)->pChildren = NULL;

  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  QUERY_CHECK_NULL(pExc, code, lino, _return, terrno);

  pExc->multiParams = false;

  basic = &pExc->basic;
  basic->colMap = NULL;   // known state for the cleanup path
  basic->uidList = NULL;
  basic->srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;

  basic->vgId = pMap->vgId;
  basic->tableSeq = false;
  basic->isVtbRefScan = true;
  basic->colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
  QUERY_CHECK_NULL(basic->colMap, code, lino, _return, terrno);
  basic->colMap->vgId = pMap->vgId;
  tstrncpy(basic->colMap->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);
  basic->colMap->colMap = taosArrayDup(pMap->colMap, NULL);
  QUERY_CHECK_NULL(basic->colMap->colMap, code, lino, _return, terrno);

  basic->uidList = taosArrayInit(1, sizeof(int64_t));
  QUERY_CHECK_NULL(basic->uidList, code, lino, _return, terrno);

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = true;

  return TSDB_CODE_SUCCESS;

_return:
  qError("failed to build exchange operator param for vscan, code:%d", code);
  taosMemoryFreeClear(*ppRes);
  if (basic) {
    if (basic->colMap) {
      taosArrayDestroy(basic->colMap->colMap);
      taosMemoryFreeClear(basic->colMap);
    }
    if (basic->uidList) {
      taosArrayDestroy(basic->uidList);
    }
    // `basic` is a member of *pExc and is released together with pExc below.
  }
  taosMemoryFreeClear(pExc);
  return code;
}
423

424
/*
 * Build a MERGE_JOIN get-param with two children and return it via *ppRes.
 *
 * *ppChild0 and *ppChild1 are pushed into the child array in that order; each
 * is set to NULL in the caller once its push has succeeded. The payload
 * records initParam as initDownstream.
 *
 * On failure the partially built param is released with freeOperatorParam()
 * and *ppRes is NULL; the terrno seen at the failing call is returned.
 */
static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initParam, SOperatorParam** ppChild0, SOperatorParam** ppChild1) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  SSortMergeJoinOperatorParam* pJoinValue = NULL;
  SOperatorParam*              pMj = taosMemoryMalloc(sizeof(SOperatorParam));

  *ppRes = pMj;
  if (NULL == pMj) {
    return terrno;
  }

  pMj->pChildren = taosArrayInit(2, POINTER_BYTES);
  if (NULL == pMj->pChildren) {
    goto _fail;
  }

  if (NULL == taosArrayPush(pMj->pChildren, ppChild0)) {
    goto _fail;
  }
  *ppChild0 = NULL;

  if (NULL == taosArrayPush(pMj->pChildren, ppChild1)) {
    goto _fail;
  }
  *ppChild1 = NULL;

  pJoinValue = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam));
  if (NULL == pJoinValue) {
    goto _fail;
  }

  pJoinValue->initDownstream = initParam;

  pMj->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
  pMj->value = pJoinValue;
  pMj->reUse = false;

  return TSDB_CODE_SUCCESS;

_fail:
  // Capture terrno before releasing anything.
  code = terrno;
  freeOperatorParam(pMj, OP_GET_PARAM);
  *ppRes = NULL;
  return code;
}
469

470
/*
 * Build a MERGE_JOIN notify-param whose children are pChild0 and/or pChild1
 * (either may be NULL and is then skipped). The param carries no payload.
 *
 * On failure the children not yet stored in the child array are released with
 * freeOperatorParam() and the partially built param is released too.
 *
 * Fix: the check after taosArrayInit() tested *ppRes (already known non-NULL)
 * instead of (*ppRes)->pChildren, so an array allocation failure went
 * undetected and the following taosArrayPush() received a NULL array.
 */
static int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) {
  int32_t code = TSDB_CODE_SUCCESS;
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    code = terrno;
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    return code;
  }
  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
  if (NULL == (*ppRes)->pChildren) {
    code = terrno;
    taosMemoryFreeClear(*ppRes);
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    return code;
  }
  if (pChild0 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    *ppRes = NULL;
    return code;
  }
  if (pChild1 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) {
    // pChild0 (if any) is already in the child array and goes with *ppRes.
    code = terrno;
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    *ppRes = NULL;
    return code;
  }
  
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
  (*ppRes)->value = NULL;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;
}
509

510
/*
 * Build a table-scan get-param from a vgroup hash that must contain exactly
 * one vgroup; any other size is reported as an executor internal error.
 *
 * After the param is built from an entry's uid list, that list is destroyed
 * and its slot in pVg is set to NULL. downstreamIdx is not used here.
 */
static int32_t buildBatchTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
  int32_t vgNum = tSimpleHashGetSize(pVg);
  if (1 != vgNum) {
    qError("Invalid vgroup num %d to build table scan operator param", vgNum);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  int32_t iter = 0;
  void*   pEntry = tSimpleHashIterate(pVg, NULL, &iter);

  while (NULL != pEntry) {
    int32_t* pVgId = tSimpleHashGetKey(pEntry, NULL);
    SArray** ppUidList = (SArray**)pEntry;
    SArray*  pUidList = *ppUidList;

    code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false);
    if (code) {
      return code;
    }

    taosArrayDestroy(pUidList);
    *ppUidList = NULL;

    pEntry = tSimpleHashIterate(pVg, pEntry, &iter);
  }

  return TSDB_CODE_SUCCESS;
}
534

535

536
/*
 * Build a table-scan get-param for the single table uid *pUid.
 *
 * A temporary one-element uid list is created, passed to
 * buildTableScanOperatorParam() and destroyed afterwards in every case.
 * downstreamIdx and pVgId are not used here.
 *
 * Fix: the temporary list was leaked when taosArrayPush() failed.
 */
static int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) {
  SArray* pUidList = taosArrayInit(1, sizeof(int64_t));
  if (NULL == pUidList) {
    return terrno;
  }
  if (NULL == taosArrayPush(pUidList, pUid)) {
    int32_t pushCode = terrno;  // capture before cleanup
    taosArrayDestroy(pUidList);
    return pushCode;
  }

  int32_t code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, true);
  taosArrayDestroy(pUidList);
  if (code) {
    return code;
  }
  
  return TSDB_CODE_SUCCESS;
}
553

554
/*
 * Assemble the parameter tree for the next sequential super-table join:
 *
 *   merge-join param
 *     +- group-cache param (left)  -> optional source param (scan/exchange)
 *     +- group-cache param (right) -> optional source param (scan/exchange)
 *
 * The table pair is taken from the head list node at its current read index.
 * In batch mode source params are built only while pPrev->leftHash exists,
 * and both hashes are cleaned up once both source params were built.
 *
 * On failure every param still held here is released and *ppParam, if set,
 * is freed and cleared. Returns the first error code encountered.
 */
static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, SOperatorParam** ppParam) {
  int64_t         rowIdx = pPrev->pListHead->readIdx;
  int32_t*        leftVg = pPrev->pListHead->pLeftVg + rowIdx;
  int64_t*        leftUid = pPrev->pListHead->pLeftUid + rowIdx;
  int32_t*        rightVg = pPrev->pListHead->pRightVg + rowIdx;
  int64_t*        rightUid = pPrev->pListHead->pRightUid + rowIdx;
  SOperatorParam* pSrcParam0 = NULL;
  SOperatorParam* pSrcParam1 = NULL;
  SOperatorParam* pGcParam0 = NULL;
  SOperatorParam* pGcParam1 = NULL;
  int32_t         code = TSDB_CODE_SUCCESS;

  qDebug("start %" PRId64 ":%" PRId64 "th stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64, 
      rowIdx, pPrev->tableNum, *leftVg, *leftUid, *rightVg, *rightUid);

  QRY_ERR_RET(updatePostJoinCurrTableInfo(&pInfo->stbJoin));

  // Step 1: source params (table scan when srcScan[i] is set, exchange otherwise).
  if (!pInfo->stbJoin.basic.batchFetch) {
    if (pInfo->stbJoin.basic.srcScan[0]) {
      code = buildSingleTableScanOperatorParam(&pSrcParam0, 0, leftVg, leftUid);
    } else {
      code = buildExchangeOperatorParam(&pSrcParam0, 0, leftVg, leftUid);
    }
    if (TSDB_CODE_SUCCESS == code) {
      if (pInfo->stbJoin.basic.srcScan[1]) {
        code = buildSingleTableScanOperatorParam(&pSrcParam1, 1, rightVg, rightUid);
      } else {
        code = buildExchangeOperatorParam(&pSrcParam1, 1, rightVg, rightUid);
      }
    }
  } else if (pPrev->leftHash) {
    if (pInfo->stbJoin.basic.srcScan[0]) {
      code = buildBatchTableScanOperatorParam(&pSrcParam0, 0, pPrev->leftHash);
    } else {
      code = buildBatchExchangeOperatorParam(&pSrcParam0, 0, pPrev->leftHash);
    }
    if (TSDB_CODE_SUCCESS == code) {
      if (pInfo->stbJoin.basic.srcScan[1]) {
        code = buildBatchTableScanOperatorParam(&pSrcParam1, 1, pPrev->rightHash);
      } else {
        code = buildBatchExchangeOperatorParam(&pSrcParam1, 1, pPrev->rightHash);
      }
    }
    if (TSDB_CODE_SUCCESS == code) {
      tSimpleHashCleanup(pPrev->leftHash);
      tSimpleHashCleanup(pPrev->rightHash);
      pPrev->leftHash = NULL;
      pPrev->rightHash = NULL;
    }
  }

  // Downstream initialisation is requested only when a left source param exists.
  bool initParam = (NULL != pSrcParam0);

  // Step 2: group-cache params. The source param is handed over to the
  // builder, so the local pointer is cleared whatever the result is.
  if (TSDB_CODE_SUCCESS == code) {
    code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pPost->leftNeedCache, pSrcParam0);
    pSrcParam0 = NULL;
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pPost->rightNeedCache, pSrcParam1);
    pSrcParam1 = NULL;
  }

  // Step 3: merge-join param on top.
  if (TSDB_CODE_SUCCESS == code) {
    code = buildMergeJoinOperatorParam(ppParam, initParam, &pGcParam0, &pGcParam1);
  }

  if (TSDB_CODE_SUCCESS == code) {
    return code;
  }

  // Failure: release whatever is still held locally.
  if (pSrcParam0) {
    freeOperatorParam(pSrcParam0, OP_GET_PARAM);
  }
  if (pSrcParam1) {
    freeOperatorParam(pSrcParam1, OP_GET_PARAM);
  }
  if (pGcParam0) {
    freeOperatorParam(pGcParam0, OP_GET_PARAM);
  }
  if (pGcParam1) {
    freeOperatorParam(pGcParam1, OP_GET_PARAM);
  }
  if (*ppParam) {
    freeOperatorParam(*ppParam, OP_GET_PARAM);
    *ppParam = NULL;
  }

  return code;
}
624

625
/*
 * Start the join for the next table pair: build the parameter tree and pull
 * the first block from downstream[1] through getNextExtFn.
 *
 * A parameter build failure or an invalid result block stores the code in the
 * task info and long-jumps out. When a block is returned with code 0 the
 * post-join context is marked started and the post counters are updated;
 * otherwise only an "Empty" debug line is logged.
 */
static void seqJoinLaunchNewRetrieveImpl(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
  SStbJoinPostJoinCtx*       pPost = &pStbJoin->ctx.post;
  SOperatorParam*            pParam = NULL;

  int32_t code = buildSeqStbJoinOperatorParam(pInfo, pPrev, pPost, &pParam);
  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  qDebug("%s dynamic post task begin", GET_TASKID(pOperator->pTaskInfo));
  code = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam, ppRes);

  if (NULL == *ppRes || 0 != code) {
    qDebug("%s Empty join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
    return;
  }

  code = blockDataCheck(*ppRes);
  if (code) {
    qError("Invalid block data, blockDataCheck failed, error:%s", tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  pPost->isStarted = true;
  pStbJoin->execInfo.postBlkNum++;
  pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows;
  qDebug("%s join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
}
2✔
654

655

656
/*
 * Notify downstream[1] that caching of the current left or right table ended.
 *
 * Builds a group-cache notify param for the table selected by leftTable
 * (downstream 0 for left, 1 for right), wraps it in a merge-join notify param
 * and passes it to optrDefaultNotifyFn(). Returns the first error hit.
 */
static int32_t notifySeqJoinTableCacheEnd(SOperatorInfo* pOperator, SStbJoinPostJoinCtx* pPost, bool leftTable) {
  SOperatorParam* pGcParam = NULL;
  SOperatorParam* pMergeJoinParam = NULL;
  int32_t         downstreamId;
  int32_t         vgId;
  int64_t         uid;

  if (leftTable) {
    downstreamId = 0;
    vgId = pPost->leftVgId;
    uid = pPost->leftCurrUid;
  } else {
    downstreamId = 1;
    vgId = pPost->rightVgId;
    uid = pPost->rightCurrUid;
  }

  qDebug("notify table %" PRIu64 " in vgId %d downstreamId %d cache end", uid, vgId, downstreamId);

  int32_t code = buildGroupCacheNotifyOperatorParam(&pGcParam, downstreamId, vgId, uid);
  if (TSDB_CODE_SUCCESS == code) {
    code = buildMergeJoinNotifyOperatorParam(&pMergeJoinParam, pGcParam, NULL);
  }
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  return optrDefaultNotifyFn(pOperator->pDownstream[1], pMergeJoinParam);
}
676

677
/*
 * Bookkeeping once the join of the current table pair has produced its last
 * block. Marks the post-join context as not started. Batch mode needs nothing
 * more. Otherwise:
 *  - left table: when it needs caching, its counter in leftCache is
 *    decremented; on reaching zero the entry is removed and downstream is
 *    notified that the cache ended;
 *  - right table: when it does not need caching but still has an entry in
 *    rightCache, that entry is removed and downstream is notified.
 */
static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCtrlInfo* pStbJoin) {
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
  int32_t              code = 0;

  pPost->isStarted = false;

  if (pStbJoin->basic.batchFetch) {
    return TSDB_CODE_SUCCESS;
  }

  if (pPost->leftNeedCache) {
    uint32_t* pRemain = tSimpleHashGet(pPrev->leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
    if (NULL != pRemain) {
      --(*pRemain);
      if (0 == *pRemain) {
        code = tSimpleHashRemove(pPrev->leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
        if (code) {
          qError("tSimpleHashRemove leftCurrUid %" PRId64 " from leftCache failed, error:%s", pPost->leftCurrUid, tstrerror(code));
          QRY_ERR_RET(code);
        }
        QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, true));
      }
    }
  }

  if (pPost->rightNeedCache) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == tSimpleHashGet(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid))) {
    return TSDB_CODE_SUCCESS;
  }

  code = tSimpleHashRemove(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
  if (code) {
    qError("tSimpleHashRemove rightCurrUid %" PRId64 " from rightCache failed, error:%s", pPost->rightCurrUid, tstrerror(code));
    QRY_ERR_RET(code);
  }
  QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, false));

  return TSDB_CODE_SUCCESS;
}
713

714

715
/*
 * Continue pulling blocks for the join that is already in progress.
 *
 * Does nothing when no join is started. Otherwise fetches the next block from
 * downstream 1: a block updates the post counters, while NULL runs the
 * end-of-table handling and advances the head list's read index.
 */
static FORCE_INLINE int32_t seqJoinContinueCurrRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinPostJoinCtx*       pPost = &pInfo->stbJoin.ctx.post;
  SStbJoinPrevJoinCtx*       pPrev = &pInfo->stbJoin.ctx.prev;

  if (!pPost->isStarted) {
    return TSDB_CODE_SUCCESS;
  }

  qDebug("%s dynQueryCtrl continue to retrieve block from post op", GET_TASKID(pOperator->pTaskInfo));

  *ppRes = getNextBlockFromDownstream(pOperator, 1);
  if (NULL != *ppRes) {
    pInfo->stbJoin.execInfo.postBlkNum++;
    pInfo->stbJoin.execInfo.postBlkRows += (*ppRes)->info.rows;
    return TSDB_CODE_SUCCESS;
  }

  // Current pair exhausted: finish it and move on to the next row.
  QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, &pInfo->stbJoin));
  pPrev->pListHead->readIdx++;

  return TSDB_CODE_SUCCESS;
}
737

738
/*
 * Append pVal (valSize bytes) to the array stored under pKey in pHash,
 * creating the array (initial capacity 10) when the key is not present yet.
 *
 * Fixes:
 *  - a tSimpleHashPut() failure now returns the code that call reported; the
 *    original returned terrno, which is only correct if the callee sets it;
 *  - terrno is captured before taosArrayDestroy() runs, so cleanup cannot
 *    disturb the value returned after a failed push.
 */
static FORCE_INLINE int32_t addToJoinVgroupHash(SSHashObj* pHash, void* pKey, int32_t keySize, void* pVal, int32_t valSize) {
  int32_t  code = TSDB_CODE_SUCCESS;
  SArray** ppArray = tSimpleHashGet(pHash, pKey, keySize);
  if (NULL == ppArray) {
    SArray* pArray = taosArrayInit(10, valSize);
    if (NULL == pArray) {
      return terrno;
    }
    if (NULL == taosArrayPush(pArray, pVal)) {
      code = terrno;
      taosArrayDestroy(pArray);
      return code;
    }
    code = tSimpleHashPut(pHash, pKey, keySize, &pArray, POINTER_BYTES);
    if (code) {
      // The hash did not take the array, so it is still owned here.
      taosArrayDestroy(pArray);
      return code;
    }
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == taosArrayPush(*ppArray, pVal)) {
    return terrno;
  }
  
  return TSDB_CODE_SUCCESS;
}
762

763
/*
 * Count one more occurrence of the table key in pHash.
 *
 *  - Unknown key: stored with count 1 and also recorded in pOnceHash.
 *  - Count 0: left untouched.
 *  - Count UINT32_MAX: reset to 0.
 *  - Otherwise: incremented; when the count was exactly 1 the key is first
 *    removed from pOnceHash.
 */
static FORCE_INLINE int32_t addToJoinTableHash(SSHashObj* pHash, SSHashObj* pOnceHash, void* pKey, int32_t keySize) {
  int32_t   code = TSDB_CODE_SUCCESS;
  uint32_t* pCount = tSimpleHashGet(pHash, pKey, keySize);

  if (NULL == pCount) {
    uint32_t first = 1;
    code = tSimpleHashPut(pHash, pKey, keySize, &first, sizeof(first));
    if (code) {
      return code;
    }
    code = tSimpleHashPut(pOnceHash, pKey, keySize, NULL, 0);
    if (code) {
      return code;
    }
    return TSDB_CODE_SUCCESS;
  }

  if (0 == *pCount) {
    return TSDB_CODE_SUCCESS;
  }

  if (UINT32_MAX == *pCount) {
    *pCount = 0;
    return TSDB_CODE_SUCCESS;
  }

  if (1 == *pCount) {
    // Second sighting: the table no longer belongs in the seen-once set.
    code = tSimpleHashRemove(pOnceHash, pKey, keySize);
    if (code) {
      qError("tSimpleHashRemove failed in addToJoinTableHash, error:%s", tstrerror(code));
      QRY_ERR_RET(code);
    }
  }
  (*pCount)++;

  return TSDB_CODE_SUCCESS;
}
799

800

801
/*
 * Free a single SStbJoinTableList node together with its four vg/uid arrays.
 * The node's pNext is not followed. NULL is accepted.
 */
static void freeStbJoinTableList(SStbJoinTableList* pList) {
  if (NULL != pList) {
    taosMemoryFree(pList->pLeftVg);
    taosMemoryFree(pList->pLeftUid);
    taosMemoryFree(pList->pRightVg);
    taosMemoryFree(pList->pRightUid);
    taosMemoryFree(pList);
  }
}
811

812
/*
 * Copy `rows` (vgId, uid) pairs of both join sides into a new list node and
 * append that node to the tail of pCtx's table list.
 *
 * The node keeps private copies of the four input arrays, so the caller's
 * buffers are not referenced after the call.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when an allocation fails (nothing is
 * appended in that case).
 */
static int32_t appendStbJoinTableList(SStbJoinPrevJoinCtx* pCtx, int64_t rows, int32_t* pLeftVg, int64_t* pLeftUid, int32_t* pRightVg, int64_t* pRightUid) {
  int32_t            code = TSDB_CODE_SUCCESS;
  SStbJoinTableList* pNew = taosMemoryCalloc(1, sizeof(SStbJoinTableList));
  if (NULL == pNew) {
    return terrno;
  }

  // allocate the four arrays, bailing out to the shared cleanup on the first failure
  pNew->pLeftVg = taosMemoryMalloc(rows * sizeof(*pLeftVg));
  if (NULL == pNew->pLeftVg) {
    goto _nomem;
  }
  pNew->pLeftUid = taosMemoryMalloc(rows * sizeof(*pLeftUid));
  if (NULL == pNew->pLeftUid) {
    goto _nomem;
  }
  pNew->pRightVg = taosMemoryMalloc(rows * sizeof(*pRightVg));
  if (NULL == pNew->pRightVg) {
    goto _nomem;
  }
  pNew->pRightUid = taosMemoryMalloc(rows * sizeof(*pRightUid));
  if (NULL == pNew->pRightUid) {
    goto _nomem;
  }

  TAOS_MEMCPY(pNew->pLeftVg, pLeftVg, rows * sizeof(*pLeftVg));
  TAOS_MEMCPY(pNew->pLeftUid, pLeftUid, rows * sizeof(*pLeftUid));
  TAOS_MEMCPY(pNew->pRightVg, pRightVg, rows * sizeof(*pRightVg));
  TAOS_MEMCPY(pNew->pRightUid, pRightUid, rows * sizeof(*pRightUid));

  pNew->readIdx = 0;
  pNew->uidNum = rows;
  pNew->pNext = NULL;

  // link behind the current tail, or start the list when it is empty
  if (pCtx->pListTail) {
    pCtx->pListTail->pNext = pNew;
  } else {
    pCtx->pListHead = pNew;
  }
  pCtx->pListTail = pNew;

  return TSDB_CODE_SUCCESS;

_nomem:
  // capture terrno before the frees below get a chance to touch it
  code = terrno;
  freeStbJoinTableList(pNew);
  return code;
}
862

863
/*
 * Feed one downstream block into the stable-join bookkeeping.
 *
 * The block is expected to carry four columns (left/right vgroup id and
 * left/right table uid) at the slots configured in pStbJoin->basic.
 *   - batch fetch mode: every row's uid is grouped by vgroup id in leftHash / rightHash
 *   - otherwise       : every left uid is counted in leftCache / onceTable
 * Afterwards the raw columns are appended to the table list and tableNum is
 * advanced by the number of rows.
 *
 * Any failure is stored in the task info and raised with T_LONG_JMP.
 */
static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SColumnInfoData*           pLeftVgCol = NULL;
  SColumnInfoData*           pRightVgCol = NULL;
  SColumnInfoData*           pLeftUidCol = NULL;
  SColumnInfoData*           pRightUidCol = NULL;

  pLeftVgCol = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[0]);
  if (NULL == pLeftVgCol) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  pRightVgCol = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[1]);
  if (NULL == pRightVgCol) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  pLeftUidCol = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[0]);
  if (NULL == pLeftUidCol) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  pRightUidCol = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[1]);
  if (NULL == pRightUidCol) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  if (!pStbJoin->basic.batchFetch) {
    // single fetch: only count how often each left table shows up
    for (int32_t row = 0; row < pBlock->info.rows; ++row) {
      int64_t* leftUid = (int64_t*)(pLeftUidCol->pData + pLeftUidCol->info.bytes * row);

      code = addToJoinTableHash(pStbJoin->ctx.prev.leftCache, pStbJoin->ctx.prev.onceTable, leftUid, sizeof(*leftUid));
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
    }
  } else {
    // batch fetch: group the uids of both sides by their vgroup
    for (int32_t row = 0; row < pBlock->info.rows; ++row) {
      int32_t* leftVg = (int32_t*)(pLeftVgCol->pData + pLeftVgCol->info.bytes * row);
      int64_t* leftUid = (int64_t*)(pLeftUidCol->pData + pLeftUidCol->info.bytes * row);
      int32_t* rightVg = (int32_t*)(pRightVgCol->pData + pRightVgCol->info.bytes * row);
      int64_t* rightUid = (int64_t*)(pRightUidCol->pData + pRightUidCol->info.bytes * row);

      code = addToJoinVgroupHash(pStbJoin->ctx.prev.leftHash, leftVg, sizeof(*leftVg), leftUid, sizeof(*leftUid));
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
      code = addToJoinVgroupHash(pStbJoin->ctx.prev.rightHash, rightVg, sizeof(*rightVg), rightUid, sizeof(*rightUid));
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = appendStbJoinTableList(&pStbJoin->ctx.prev, pBlock->info.rows, (int32_t*)pLeftVgCol->pData,
                                  (int64_t*)pLeftUidCol->pData, (int32_t*)pRightVgCol->pData,
                                  (int64_t*)pRightUidCol->pData);
    if (TSDB_CODE_SUCCESS == code) {
      pStbJoin->ctx.prev.tableNum += pBlock->info.rows;
    }
  }

_return:

  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }
}
1✔
925

926

927
/*
 * Trim leftCache after all downstream blocks were consumed (non batch-fetch mode only).
 *
 * If every cached table was seen exactly once (leftCache and onceTable have the
 * same size) the cache is simply cleared. Otherwise each key found in onceTable
 * is removed from leftCache, and the remaining size is recorded in
 * execInfo.leftCacheNum. Removal failures are logged and otherwise ignored.
 */
static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;

  if (pStbJoin->basic.batchFetch) {
    return;
  }

  if (tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache) == tSimpleHashGetSize(pStbJoin->ctx.prev.onceTable)) {
    tSimpleHashClear(pStbJoin->ctx.prev.leftCache);
    return;
  }

  // NOTE(review): the iterated pointer is used as the uid key; presumably valid because
  // onceTable entries are stored with zero-length values - confirm against tsimplehash.
  int32_t iter = 0;
  for (uint64_t* pUid = tSimpleHashIterate(pStbJoin->ctx.prev.onceTable, NULL, &iter); NULL != pUid;
       pUid = tSimpleHashIterate(pStbJoin->ctx.prev.onceTable, pUid, &iter)) {
    int32_t code = tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, pUid, sizeof(*pUid));
    if (code) {
      qError("tSimpleHashRemove failed in postProcessStbJoinTableHash, error:%s", tstrerror(code));
    }
  }

  pStbJoin->execInfo.leftCacheNum = tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache);
  qDebug("more than 1 ref build table num %" PRId64, (int64_t)tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache));

/*
  // debug only
  iter = 0;
  uint32_t* num = NULL;
  while (NULL != (num = tSimpleHashIterate(pStbJoin->ctx.prev.leftCache, num, &iter))) {
    A S S E R T(*num > 1);
  }
*/
}
962

963
/*
 * Drain downstream 0 and build the join table list/hashes from every block it
 * returns, then post-process the hashes and mark the build phase as finished.
 */
static void buildStbJoinTableList(SOperatorInfo* pOperator) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SSDataBlock*               pBlock = NULL;

  // a NULL block signals that the downstream is exhausted
  while (NULL != (pBlock = getNextBlockFromDownstream(pOperator, 0))) {
    pStbJoin->execInfo.prevBlkNum++;
    pStbJoin->execInfo.prevBlkRows += pBlock->info.rows;

    doBuildStbJoinTableHash(pOperator, pBlock);
  }

  postProcessStbJoinTableHash(pOperator);

  pStbJoin->ctx.prev.joinBuild = true;
}
1✔
983

984
/*
 * Start retrieving the next table pair of the sequential stable join.
 *
 * Walks the table list from its head: fully consumed nodes are unlinked and
 * freed, and for the first node with unread entries a new retrieve is launched.
 * When that retrieve yields a block it is returned through ppRes; otherwise the
 * current retrieve is finished and the node's read index advances.
 *
 * When the list is exhausted *ppRes is set to NULL and the operator is marked
 * completed.
 *
 * Returns TSDB_CODE_SUCCESS, or the error from handleSeqJoinCurrRetrieveEnd.
 */
static int32_t seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
  SStbJoinTableList*         pNode = pPrev->pListHead;

  while (pNode) {
    if (pNode->readIdx >= pNode->uidNum) {
      pPrev->pListHead = pNode->pNext;
      // the tail must not keep pointing at a node that is about to be freed,
      // appendStbJoinTableList() links new nodes through it
      if (pPrev->pListTail == pNode) {
        pPrev->pListTail = NULL;
      }
      freeStbJoinTableList(pNode);
      pNode = pPrev->pListHead;
      continue;
    }

    seqJoinLaunchNewRetrieveImpl(pOperator, ppRes);
    if (*ppRes) {
      return TSDB_CODE_SUCCESS;
    }

    QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, pStbJoin));
    pPrev->pListHead->readIdx++;
  }

  *ppRes = NULL;
  setOperatorCompleted(pOperator);

  return TSDB_CODE_SUCCESS;
}
1012

1013
/*
 * Shape a join result block so that it matches the operator's output descriptor.
 *
 * The block id is set to the descriptor's dataBlockId, and for every output slot
 * beyond the columns the block already has, a column sized for the block's rows
 * is created and appended.
 *
 * A NULL pBlock is accepted and leaves nothing to do.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when pStbJoin,
 * its output descriptor or a slot is missing, or the error of the failing column
 * operation.
 */
static int32_t seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) {
  if (NULL == pBlock) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == pStbJoin || NULL == pStbJoin->pOutputDataBlockDesc) {
    qError("seqStableJoinComposeRes: pStbJoin or its output data block desc is NULL");
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  pBlock->info.id.blockId = pStbJoin->pOutputDataBlockDesc->dataBlockId;
  if (NULL == pBlock->pDataBlock) {
    return TSDB_CODE_SUCCESS;
  }

  // LIST_LENGTH tolerates a descriptor without a slot list
  int32_t slotNum = LIST_LENGTH(pStbJoin->pOutputDataBlockDesc->pSlots);
  for (int32_t i = (int32_t)taosArrayGetSize(pBlock->pDataBlock); i < slotNum; i++) {
    SSlotDescNode* pSlot = (SSlotDescNode*)nodesListGetNode(pStbJoin->pOutputDataBlockDesc->pSlots, i);
    if (pSlot == NULL) {
      qError("seqStableJoinComposeRes: pSlot is NULL, i:%d", i);
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }

    SColumnInfoData colInfo = createColumnInfoData(pSlot->dataType.type, pSlot->dataType.bytes, pSlot->slotId);
    int32_t         code = colInfoDataEnsureCapacity(&colInfo, pBlock->info.rows, true);
    if (code == TSDB_CODE_SUCCESS) {
      code = blockDataAppendColInfo(pBlock, &colInfo);
    }
    if (code != TSDB_CODE_SUCCESS) {
      // the block did not take over the column, so its buffers must be released here
      colDataDestroy(&colInfo);
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
1042

1043
/*
 * Produce the next result block of the sequential stable join.
 *
 * On the first call the table list is built from the downstream; an empty
 * downstream completes the operator right away. Afterwards the current retrieve
 * is continued, and if it yields nothing a new retrieve is launched. The
 * resulting block (possibly NULL) is adjusted to the output descriptor.
 *
 * The time spent in the first call is recorded as the operator's open cost.
 *
 * Returns TSDB_CODE_SUCCESS or the error of composing the result; retrieve
 * errors are stored in the task info and raised with T_LONG_JMP.
 */
int32_t seqStableJoin(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;

  QRY_PARAM_CHECK(pRes);
  if (OP_EXEC_DONE == pOperator->status) {
    return code;
  }

  // only the very first call is timed
  int64_t startUs = (0 == pOperator->cost.openCost) ? taosGetTimestampUs() : 0;

  if (!pStbJoin->ctx.prev.joinBuild) {
    buildStbJoinTableList(pOperator);
    if (pStbJoin->execInfo.prevBlkRows <= 0) {
      setOperatorCompleted(pOperator);
      goto _return;
    }
  }

  QRY_ERR_JRET(seqJoinContinueCurrRetrieve(pOperator, pRes));
  if (NULL == *pRes) {
    QRY_ERR_JRET(seqJoinLaunchNewRetrieve(pOperator, pRes));
  }

_return:
  if (0 == pOperator->cost.openCost) {
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - startUs) / 1000.0;
  }

  if (TSDB_CODE_SUCCESS == code) {
    return seqStableJoinComposeRes(pStbJoin, *pRes);
  }

  qError("%s failed since %s", __func__, tstrerror(code));
  pOperator->pTaskInfo->code = code;
  T_LONG_JMP(pOperator->pTaskInfo->env, code);
  return code;
}
1087

1088
/*
 * Build the operator parameter handed to the virtual table scan for table `uid`.
 *
 * The parameter owns an (initially empty) children array and a
 * SVTableScanOperatorParam value, which carries `uid` and an (initially empty)
 * array of operator params.
 *
 * On success *ppRes receives the new parameter; on failure everything allocated
 * here is released, *ppRes is NULL and the error code is returned.
 */
static int32_t buildVtbScanOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, uint64_t uid) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SOperatorParam*           pParam = NULL;
  SVTableScanOperatorParam* pVScan = NULL;

  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno);

  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno);

  pVScan = taosMemoryMalloc(sizeof(SVTableScanOperatorParam));
  QUERY_CHECK_NULL(pVScan, code, lino, _return, terrno);
  pVScan->pOpParamArray = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL(pVScan->pOpParamArray, code, lino, _return, terrno);
  pVScan->uid = uid;

  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN;
  pParam->downstreamIdx = 0;
  pParam->value = pVScan;
  pParam->reUse = false;

  // hand the finished parameter over to the caller
  *ppRes = pParam;
  return TSDB_CODE_SUCCESS;

_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pVScan) {
    taosArrayDestroy(pVScan->pOpParamArray);
    taosMemoryFreeClear(pVScan);
  }
  if (pParam) {
    taosArrayDestroy(pParam->pChildren);
    taosMemoryFreeClear(pParam);
  }
  *ppRes = NULL;
  return code;
}
1122

1123
int32_t dynProcessUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
×
1124
  int32_t                    lino = 0;
×
1125
  SOperatorInfo*             operator=(SOperatorInfo*) param;
×
1126
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)operator->info;
×
1127

1128
  if (TSDB_CODE_SUCCESS != code) {
×
1129
    operator->pTaskInfo->code = rpcCvtErrCode(code);
×
1130
    if (operator->pTaskInfo->code != code) {
×
1131
      qError("load systable rsp received, error:%s, cvted error:%s", tstrerror(code),
×
1132
             tstrerror(operator->pTaskInfo->code));
1133
    } else {
1134
      qError("load systable rsp received, error:%s", tstrerror(code));
×
1135
    }
1136
    goto _return;
×
1137
  }
1138

1139
  pScanResInfo->vtbScan.pRsp = taosMemoryMalloc(sizeof(SUseDbRsp));
×
1140
  QUERY_CHECK_NULL(pScanResInfo->vtbScan.pRsp, code, lino, _return, terrno);
×
1141

1142
  code = tDeserializeSUseDbRsp(pMsg->pData, (int32_t)pMsg->len, pScanResInfo->vtbScan.pRsp);
×
1143
  QUERY_CHECK_CODE(code, lino, _return);
×
1144

1145
  taosMemoryFreeClear(pMsg->pData);
×
1146

1147
  code = tsem_post(&pScanResInfo->vtbScan.ready);
×
1148
  QUERY_CHECK_CODE(code, lino, _return);
×
1149

1150
  return code;
×
1151
_return:
×
1152
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1153
  return code;
×
1154
}
1155

1156
static int32_t buildDbVgInfoMap(SOperatorInfo* pOperator, SReadHandle* pHandle, SName* name, SExecTaskInfo* pTaskInfo, SUseDbOutput* output) {
×
1157
  int32_t                    code = TSDB_CODE_SUCCESS;
×
1158
  int32_t                    lino = 0;
×
1159
  char*                      buf1 = NULL;
×
1160
  SUseDbReq*                 pReq = NULL;
×
1161
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)pOperator->info;
×
1162

1163
  pReq = taosMemoryMalloc(sizeof(SUseDbReq));
×
1164
  QUERY_CHECK_NULL(pReq, code, lino, _return, terrno);
×
1165
  code = tNameGetFullDbName(name, pReq->db);
×
1166
  QUERY_CHECK_CODE(code, lino, _return);
×
1167
  int32_t contLen = tSerializeSUseDbReq(NULL, 0, pReq);
×
1168
  buf1 = taosMemoryCalloc(1, contLen);
×
1169
  QUERY_CHECK_NULL(buf1, code, lino, _return, terrno);
×
1170
  int32_t tempRes = tSerializeSUseDbReq(buf1, contLen, pReq);
×
1171
  if (tempRes < 0) {
×
1172
    QUERY_CHECK_CODE(terrno, lino, _return);
×
1173
  }
1174

1175
  // send the fetch remote task result request
1176
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
×
1177
  QUERY_CHECK_NULL(pMsgSendInfo, code, lino, _return, terrno);
×
1178

1179
  pMsgSendInfo->param = pOperator;
×
1180
  pMsgSendInfo->msgInfo.pData = buf1;
×
1181
  pMsgSendInfo->msgInfo.len = contLen;
×
1182
  pMsgSendInfo->msgType = TDMT_MND_GET_DB_INFO;
×
1183
  pMsgSendInfo->fp = dynProcessUseDbRsp;
×
1184
  pMsgSendInfo->requestId = pTaskInfo->id.queryId;
×
1185

1186
  code = asyncSendMsgToServer(pHandle->pMsgCb->clientRpc, &pScanResInfo->vtbScan.epSet, NULL, pMsgSendInfo);
×
1187
  QUERY_CHECK_CODE(code, lino, _return);
×
1188

1189
  code = tsem_wait(&pScanResInfo->vtbScan.ready);
×
1190
  QUERY_CHECK_CODE(code, lino, _return);
×
1191

1192
  code = queryBuildUseDbOutput(output, pScanResInfo->vtbScan.pRsp);
×
1193
  QUERY_CHECK_CODE(code, lino, _return);
×
1194

1195
_return:
×
1196
  if (code) {
×
1197
     qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1198
     taosMemoryFree(buf1);
×
1199
  }
1200
  taosMemoryFree(pReq);
×
1201
  tFreeSUsedbRsp(pScanResInfo->vtbScan.pRsp);
×
1202
  taosMemoryFreeClear(pScanResInfo->vtbScan.pRsp);
×
1203
  return code;
×
1204
}
1205

1206
int dynVgInfoComp(const void* lp, const void* rp) {
×
1207
  SVgroupInfo* pLeft = (SVgroupInfo*)lp;
×
1208
  SVgroupInfo* pRight = (SVgroupInfo*)rp;
×
1209
  if (pLeft->hashBegin < pRight->hashBegin) {
×
1210
    return -1;
×
1211
  } else if (pLeft->hashBegin > pRight->hashBegin) {
×
1212
    return 1;
×
1213
  }
1214

1215
  return 0;
×
1216
}
1217

1218
/*
 * Make sure dbInfo->vgArray exists: a copy of all entries of dbInfo->vgHash,
 * sorted with `sort_func`.
 *
 * Nothing is done when dbInfo is NULL, when there is no vgHash, or when the
 * array was already built. The array is only published once it is complete and
 * sorted, so a failed build never leaves a partial array behind that later calls
 * would mistake for a finished one.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the array cannot be allocated or filled.
 */
int32_t dynMakeVgArraySortBy(SDBVgInfo* dbInfo, __compar_fn_t sort_func) {
  if (NULL == dbInfo) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == dbInfo->vgHash || NULL != dbInfo->vgArray) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t vgSize = taosHashGetSize(dbInfo->vgHash);
  SArray* pArray = taosArrayInit(vgSize, sizeof(SVgroupInfo));
  if (NULL == pArray) {
    return terrno;
  }

  void* pIter = taosHashIterate(dbInfo->vgHash, NULL);
  while (pIter) {
    if (NULL == taosArrayPush(pArray, pIter)) {
      // remember the reason before the cleanup calls can overwrite terrno
      int32_t code = terrno;
      taosHashCancelIterate(dbInfo->vgHash, pIter);
      taosArrayDestroy(pArray);
      return code;
    }

    pIter = taosHashIterate(dbInfo->vgHash, pIter);
  }

  taosArraySort(pArray, sort_func);
  dbInfo->vgArray = pArray;

  return TSDB_CODE_SUCCESS;
}
1245

1246
int32_t dynHashValueComp(void const* lp, void const* rp) {
×
1247
  uint32_t*    key = (uint32_t*)lp;
×
1248
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
×
1249

1250
  if (*key < pVg->hashBegin) {
×
1251
    return -1;
×
1252
  } else if (*key > pVg->hashEnd) {
×
1253
    return 1;
×
1254
  }
1255

1256
  return 0;
×
1257
}
1258

1259
/*
 * Look up the vgroup that table `tbName` of database `dbFName` hashes into.
 *
 * The sorted vgroup array of dbInfo is built on demand, the hash value of
 * "<dbFName>.<tbName>" is computed with the database's hash settings, and the
 * vgroup whose hash range contains that value is searched.
 *
 * On success *vgId receives the vgroup id.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR for a NULL
 * argument, TSDB_CODE_TSC_DB_NOT_SELECTED when the database has no vgroups,
 * TSDB_CODE_CTG_INTERNAL_ERROR when no hash range matches, or the error of
 * building the vgroup array.
 */
int32_t getVgId(SDBVgInfo* dbInfo, char* dbFName, int32_t* vgId, char *tbName) {
  int32_t code = 0;
  int32_t lino = 0;

  // dynMakeVgArraySortBy() silently accepts a NULL dbInfo, so it has to be rejected here
  if (NULL == dbInfo || NULL == dbFName || NULL == vgId || NULL == tbName) {
    qError("%s failed since invalid input parameter", __func__);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  code = dynMakeVgArraySortBy(dbInfo, dynVgInfoComp);
  QUERY_CHECK_CODE(code, lino, _return);

  int32_t vgNum = (int32_t)taosArrayGetSize(dbInfo->vgArray);
  if (vgNum <= 0) {
    qError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum);
    QUERY_CHECK_CODE(code = TSDB_CODE_TSC_DB_NOT_SELECTED, lino, _return);
  }

  SVgroupInfo* vgInfo = NULL;
  char         tbFullName[TSDB_TABLE_FNAME_LEN];
  (void)snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbFName, tbName);

  uint32_t hashValue = taosGetTbHashVal(tbFullName, (int32_t)strlen(tbFullName), dbInfo->hashMethod,
                                        dbInfo->hashPrefix, dbInfo->hashSuffix);

  vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, dynHashValueComp, TD_EQ);
  if (NULL == vgInfo) {
    qError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName,
           (int32_t)taosArrayGetSize(dbInfo->vgArray));
    return TSDB_CODE_CTG_INTERNAL_ERROR;
  }

  *vgId = vgInfo->vgId;

_return:
  return code;
}
1292

1293
/*
 * Tell whether column `colId` has to be read by the virtual table scan:
 * true when all columns are scanned or when the id is listed in readColList.
 */
bool colNeedScan(SOperatorInfo* pOperator, col_id_t colId) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;

  if (pVtbScan->scanAllCols) {
    return true;
  }

  SArray* pReadCols = pVtbScan->readColList;
  size_t  colNum = taosArrayGetSize(pReadCols);
  for (size_t idx = 0; idx < colNum; ++idx) {
    col_id_t* pListed = (col_id_t*)taosArrayGet(pReadCols, idx);
    if (*pListed == colId) {
      return true;
    }
  }

  return false;
}
1307

1308
/*
 * Return the vgroup information of database `name->dbname` through *dbVgInfo.
 *
 * Results are cached in vtbScan.dbVgInfoMap, keyed by database name. On a cache
 * miss the information is fetched with buildDbVgInfoMap() and stored in the map.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered; on failure the
 * freshly allocated output is released.
 */
int32_t getDbVgInfo(SOperatorInfo* pOperator, SName *name, SDBVgInfo **dbVgInfo) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SReadHandle*               pHandle = &pVtbScan->readHandle;
  SUseDbOutput*              output = NULL;
  SUseDbOutput**             find = (SUseDbOutput**)taosHashGet(pInfo->vtbScan.dbVgInfoMap, name->dbname, strlen(name->dbname));

  QRY_PARAM_CHECK(dbVgInfo);

  if (find == NULL) {
    // zero-initialized so that the cleanup below never sees garbage pointers
    output = taosMemoryCalloc(1, sizeof(SUseDbOutput));
    QUERY_CHECK_NULL(output, code, line, _return, terrno);
    code = buildDbVgInfoMap(pOperator, pHandle, name, pTaskInfo, output);
    QUERY_CHECK_CODE(code, line, _return);
    code = taosHashPut(pInfo->vtbScan.dbVgInfoMap, name->dbname, strlen(name->dbname), &output, POINTER_BYTES);
    QUERY_CHECK_CODE(code, line, _return);
  } else {
    output = *find;
  }

  *dbVgInfo = output->dbVgroup;
  return code;
_return:
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  // NOTE(review): confirm that freeUseDbOutput() expects the SUseDbOutput pointer itself
  // here and not the address of the pointer, as a hash free callback would receive.
  if (output) {
    freeUseDbOutput(output);
  }
  return code;
}
1337

1338
/*
 * Produce the next result block of the dynamic virtual table scan.
 *
 * The child tables in childTableList are processed one after another:
 *   - for a table that is visited for the first time, its meta entry is read,
 *     every referenced column that needs scanning is grouped by its origin table
 *     in orgTbVgColMap, one exchange parameter per origin table is attached to a
 *     new virtual-table-scan parameter, and the downstream is restarted with it
 *   - for the table that produced the previous block, the downstream is simply
 *     asked for its next block
 * A table without (more) results makes the scan move on to the next table; the
 * operator completes after the last table.
 *
 * orgTbVgColMap only lives for the duration of one call.
 *
 * Returns TSDB_CODE_SUCCESS; errors are stored in the task info and raised with
 * T_LONG_JMP.
 */
int32_t vtbScan(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SReadHandle*               pHandle = &pVtbScan->readHandle;
  SMetaReader                mr = {0};
  SDBVgInfo*                 dbVgInfo = NULL;
  bool                       readerInit = false;

  QRY_PARAM_CHECK(pRes);
  if (pOperator->status == OP_EXEC_DONE) {
    return code;
  }

  int64_t st = 0;
  if (pOperator->cost.openCost == 0) {
    st = taosGetTimestampUs();
  }

  size_t num = taosArrayGetSize(pVtbScan->childTableList);

  // no child table, return
  if (num == 0) {
    setOperatorCompleted(pOperator);
    return code;
  }

  pVtbScan->orgTbVgColMap = taosHashInit(num * 64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  QUERY_CHECK_NULL(pVtbScan->orgTbVgColMap, code, line, _return, terrno);
  taosHashSetFreeFp(pVtbScan->orgTbVgColMap, destroyOrgTbInfo);

  while (true) {
    if (pVtbScan->curTableIdx == pVtbScan->lastTableIdx) {
      // same table as last time: keep pulling from the running downstream
      code = pOperator->pDownstream[0]->fpSet.getNextFn(pOperator->pDownstream[0], pRes);
      QUERY_CHECK_CODE(code, line, _return);
    } else {
      uint64_t* id = taosArrayGet(pVtbScan->childTableList, pVtbScan->curTableIdx);
      QUERY_CHECK_NULL(id, code, line, _return, terrno);
      pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, META_READER_LOCK, &pHandle->api.metaFn);
      readerInit = true;
      code = pHandle->api.metaReaderFn.getTableEntryByUid(&mr, *id);
      QUERY_CHECK_CODE(code, line, _return);

      for (int32_t j = 0; j < mr.me.colRef.nCols; j++) {
        if (mr.me.colRef.pColRef[j].hasRef && colNeedScan(pOperator, mr.me.colRef.pColRef[j].id)) {
          SName   name = {0};
          char    dbFname[TSDB_DB_FNAME_LEN] = {0};
          char    orgTbFName[TSDB_TABLE_FNAME_LEN] = {0};
          toName(pInfo->vtbScan.acctId, mr.me.colRef.pColRef[j].refDbName, mr.me.colRef.pColRef[j].refTableName, &name);
          code = getDbVgInfo(pOperator, &name, &dbVgInfo);
          QUERY_CHECK_CODE(code, line, _return);
          code = tNameGetFullDbName(&name, dbFname);
          QUERY_CHECK_CODE(code, line, _return);
          // NOTE(review): assumes tNameGetFullTableName returns an int32_t status like
          // tNameGetFullDbName does - confirm.
          code = tNameGetFullTableName(&name, orgTbFName);
          QUERY_CHECK_CODE(code, line, _return);

          // column id -> referenced column name, recorded for the origin table below
          SColIdNameKV colIdNameKV = {0};
          colIdNameKV.colId = mr.me.colRef.pColRef[j].id;
          tstrncpy(colIdNameKV.colName, mr.me.colRef.pColRef[j].refColName, sizeof(colIdNameKV.colName));

          void *pVal = taosHashGet(pVtbScan->orgTbVgColMap, orgTbFName, sizeof(orgTbFName));
          if (!pVal) {
            SOrgTbInfo map = {0};
            code = getVgId(dbVgInfo, dbFname, &map.vgId, name.tname);
            QUERY_CHECK_CODE(code, line, _return);
            tstrncpy(map.tbName, orgTbFName, sizeof(map.tbName));
            map.colMap = taosArrayInit(10, sizeof(SColIdNameKV));
            QUERY_CHECK_NULL(map.colMap, code, line, _return, terrno);

            // until the hash has accepted `map`, its column array is still owned here
            if (NULL == taosArrayPush(map.colMap, &colIdNameKV)) {
              code = terrno;
              line = __LINE__;
              taosArrayDestroy(map.colMap);
              goto _return;
            }
            code = taosHashPut(pVtbScan->orgTbVgColMap, orgTbFName, sizeof(orgTbFName), &map, sizeof(map));
            if (TSDB_CODE_SUCCESS != code) {
              line = __LINE__;
              taosArrayDestroy(map.colMap);
              goto _return;
            }
          } else {
            SOrgTbInfo *tbInfo = (SOrgTbInfo *)pVal;
            QUERY_CHECK_NULL(taosArrayPush(tbInfo->colMap, &colIdNameKV), code, line, _return, terrno);
          }
        }
      }

      pVtbScan->vtbScanParam = NULL;
      code = buildVtbScanOperatorParam(pInfo, &pVtbScan->vtbScanParam, *id);
      QUERY_CHECK_CODE(code, line, _return);

      void* pIter = taosHashIterate(pVtbScan->orgTbVgColMap, NULL);
      while (pIter != NULL) {
        SOrgTbInfo*      pMap = (SOrgTbInfo*)pIter;
        SOperatorParam*  pExchangeParam = NULL;
        code = buildExchangeOperatorParamForVScan(&pExchangeParam, 0, pMap);
        if (TSDB_CODE_SUCCESS != code) {
          line = __LINE__;
          // leave the iteration properly before bailing out
          taosHashCancelIterate(pVtbScan->orgTbVgColMap, pIter);
          goto _return;
        }
        if (NULL == taosArrayPush(((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pOpParamArray, &pExchangeParam)) {
          // NOTE(review): pExchangeParam is not released on this path - confirm how
          // operator params are supposed to be freed.
          code = terrno;
          line = __LINE__;
          taosHashCancelIterate(pVtbScan->orgTbVgColMap, pIter);
          goto _return;
        }
        pIter = taosHashIterate(pVtbScan->orgTbVgColMap, pIter);
      }
      pHandle->api.metaReaderFn.clearReader(&mr);
      readerInit = false;

      // reset downstream operator's status
      pOperator->pDownstream[0]->status = OP_NOT_OPENED;
      code = pOperator->pDownstream[0]->fpSet.getNextExtFn(pOperator->pDownstream[0], pVtbScan->vtbScanParam, pRes);
      QUERY_CHECK_CODE(code, line, _return);
    }

    if (*pRes) {
      // has result, still read data from this table.
      pVtbScan->lastTableIdx = pVtbScan->curTableIdx;
      break;
    } else {
      // no result, read next table.
      pVtbScan->curTableIdx++;
      if (pVtbScan->curTableIdx >= taosArrayGetSize(pVtbScan->childTableList)) {
        setOperatorCompleted(pOperator);
        break;
      }
    }
  }

_return:
  if (readerInit) {
    pHandle->api.metaReaderFn.clearReader(&mr);
  }
  taosHashCleanup(pVtbScan->orgTbVgColMap);
  pVtbScan->orgTbVgColMap = NULL;
  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
  }

  if (code) {
    qError("%s failed since %s", __func__, tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }

  return code;
}
1474

1475
/*
 * Create the hash tables the sequential stable join needs for its build phase.
 *   - batch fetch : leftHash and rightHash, hashed as TSDB_DATA_TYPE_INT
 *   - otherwise   : leftCache, rightCache and onceTable, hashed as TSDB_DATA_TYPE_BIGINT
 *
 * Returns TSDB_CODE_SUCCESS, or terrno as soon as one table cannot be created
 * (tables created before the failure are left in pPrev).
 */
int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) {
  if (!batchFetch) {
    if (NULL == (pPrev->leftCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)))) {
      return terrno;
    }
    if (NULL == (pPrev->rightCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)))) {
      return terrno;
    }
    if (NULL == (pPrev->onceTable = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)))) {
      return terrno;
    }
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == (pPrev->leftHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)))) {
    return terrno;
  }
  if (NULL == (pPrev->rightHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)))) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
1502

1503
static int32_t initVtbScanInfo(SOperatorInfo* pOperator, SDynQueryCtrlOperatorInfo* pInfo, SReadHandle* pHandle,
×
1504
                               SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) {
1505
  int32_t      code = TSDB_CODE_SUCCESS;
×
1506
  int32_t      line = 0;
×
1507

1508
  code = tsem_init(&pInfo->vtbScan.ready, 0, 0);
×
1509
  QUERY_CHECK_CODE(code, line, _return);
×
1510

1511
  pInfo->vtbScan.scanAllCols = pPhyciNode->vtbScan.scanAllCols;
×
1512
  pInfo->vtbScan.suid = pPhyciNode->vtbScan.suid;
×
1513
  pInfo->vtbScan.epSet = pPhyciNode->vtbScan.mgmtEpSet;
×
1514
  pInfo->vtbScan.acctId = pPhyciNode->vtbScan.accountId;
×
1515
  pInfo->vtbScan.readHandle = *pHandle;
×
1516
  pInfo->vtbScan.curTableIdx = 0;
×
1517
  pInfo->vtbScan.lastTableIdx = -1;
×
1518
  pInfo->vtbScan.dbName = taosStrdup(pPhyciNode->vtbScan.dbName);
×
1519
  QUERY_CHECK_NULL(pInfo->vtbScan.dbName, code, line, _return, terrno);
×
1520

1521
  pInfo->vtbScan.readColList = taosArrayInit(LIST_LENGTH(pPhyciNode->vtbScan.pScanCols), sizeof(col_id_t));
×
1522
  QUERY_CHECK_NULL(pInfo->vtbScan.readColList, code, line, _return, terrno);
×
1523

1524
  for (int32_t i = 0; i < LIST_LENGTH(pPhyciNode->vtbScan.pScanCols); ++i) {
×
1525
    SColumnNode* pNode = (SColumnNode*)nodesListGetNode(pPhyciNode->vtbScan.pScanCols, i);
×
1526
    QUERY_CHECK_NULL(pNode, code, line, _return, terrno);
×
1527
    QUERY_CHECK_NULL(taosArrayPush(pInfo->vtbScan.readColList, &pNode->colId), code, line, _return, terrno);
×
1528
  }
1529

1530
  pInfo->vtbScan.childTableList = taosArrayInit(10, sizeof(uint64_t));
×
1531
  QUERY_CHECK_NULL(pInfo->vtbScan.childTableList, code, line, _return, terrno);
×
1532
  code = pHandle->api.metaFn.getChildTableList(pHandle->vnode, pInfo->vtbScan.suid, pInfo->vtbScan.childTableList);
×
1533
  QUERY_CHECK_CODE(code, line, _return);
×
1534

1535
  pInfo->vtbScan.dbVgInfoMap = taosHashInit(taosArrayGetSize(pInfo->vtbScan.childTableList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
×
1536
  QUERY_CHECK_NULL(pInfo->vtbScan.dbVgInfoMap, code, line, _return, terrno);
×
1537

1538
  return code;
×
1539
_return:
×
1540
  // no need to destroy array and hashmap allocated in this function,
1541
  // since the operator's destroy function will take care of it
1542
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
1543
  return code;
×
1544
}
1545

1546
int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
1✔
1547
                                       SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo,
1548
                                       SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
1549
  QRY_PARAM_CHECK(pOptrInfo);
1!
1550

1551
  int32_t                    code = TSDB_CODE_SUCCESS;
1✔
1552
  int32_t                    line = 0;
1✔
1553
  __optr_fn_t                nextFp = NULL;
1✔
1554
  SOperatorInfo*             pOperator = NULL;
1✔
1555
  SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo));
1!
1556
  if (pInfo == NULL) {
1!
1557
    code = terrno;
×
1558
    goto _error;
×
1559
  }
1560

1561
  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
1!
1562
  if (pOperator == NULL) {
1!
1563
    code = terrno;
×
1564
    goto _error;
×
1565
  }
1566

1567
  pTaskInfo->dynamicTask = (int8_t)pPhyciNode->node.dynamicOp;
1✔
1568

1569
  code = appendDownstream(pOperator, pDownstream, numOfDownstream);
1✔
1570
  if (TSDB_CODE_SUCCESS != code) {
1!
1571
    goto _error;
×
1572
  }
1573

1574
  setOperatorInfo(pOperator, "DynQueryCtrlOperator", QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, false, OP_NOT_OPENED,
1✔
1575
                  pInfo, pTaskInfo);
1576

1577
  pInfo->qType = pPhyciNode->qType;
1✔
1578
  switch (pInfo->qType) {
1!
1579
    case DYN_QTYPE_STB_HASH:
1✔
1580
      TAOS_MEMCPY(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
1✔
1581
      pInfo->stbJoin.pOutputDataBlockDesc = pPhyciNode->node.pOutputDataBlockDesc;
1✔
1582
      code = initSeqStbJoinTableHash(&pInfo->stbJoin.ctx.prev, pInfo->stbJoin.basic.batchFetch);
1✔
1583
      if (TSDB_CODE_SUCCESS != code) {
1!
1584
        goto _error;
×
1585
      }
1586
      nextFp = seqStableJoin;
1✔
1587
      break;
1✔
1588
    case DYN_QTYPE_VTB_SCAN:
×
1589
      code = initVtbScanInfo(pOperator, pInfo, pHandle, pPhyciNode, pTaskInfo);
×
1590
      QUERY_CHECK_CODE(code, line, _error);
×
1591
      nextFp = vtbScan;
×
1592
      break;
×
1593
    default:
×
1594
      qError("unsupported dynamic query ctrl type: %d", pInfo->qType);
×
1595
      code = TSDB_CODE_INVALID_PARA;
×
1596
      goto _error;
×
1597
  }
1598

1599
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, nextFp, NULL, destroyDynQueryCtrlOperator, optrDefaultBufFn,
1✔
1600
                                         NULL, optrDefaultGetNextExtFn, NULL);
1601

1602
  *pOptrInfo = pOperator;
1✔
1603
  return TSDB_CODE_SUCCESS;
1✔
1604

1605
_error:
×
1606
  if (pInfo != NULL) {
×
1607
    destroyDynQueryCtrlOperator(pInfo);
×
1608
  }
1609

1610
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
1611
  pTaskInfo->code = code;
×
1612
  return code;
×
1613
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc