• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3663

19 Mar 2025 09:21AM UTC coverage: 61.664% (-0.6%) from 62.28%
#3663

push

travis-ci

web-flow
docs: add defination of tmq_config_res_t & fix spell error (#30271)

153169 of 318241 branches covered (48.13%)

Branch coverage included in aggregate %.

239405 of 318390 relevant lines covered (75.19%)

5762846.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

35.43
/source/libs/executor/src/dynqueryctrloperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "nodes.h"
19
#include "operator.h"
20
#include "os.h"
21
#include "plannodes.h"
22
#include "query.h"
23
#include "querynodes.h"
24
#include "querytask.h"
25
#include "tarray.h"
26
#include "tcompare.h"
27
#include "tdatablock.h"
28
#include "thash.h"
29
#include "tmsg.h"
30
#include "trpc.h"
31
#include "ttypes.h"
32
#include "dynqueryctrl.h"
33

34
int64_t gSessionId = 0;
35

36
void freeVgTableList(void* ptr) { 
×
37
  taosArrayDestroy(*(SArray**)ptr); 
×
38
}
×
39

40
static void destroyStbJoinTableList(SStbJoinTableList* pListHead) {
11,035✔
41
  SStbJoinTableList* pNext = NULL;
11,035✔
42
  
43
  while (pListHead) {
11,036✔
44
    taosMemoryFree(pListHead->pLeftVg);
1!
45
    taosMemoryFree(pListHead->pLeftUid);
1!
46
    taosMemoryFree(pListHead->pRightVg);
1!
47
    taosMemoryFree(pListHead->pRightUid);
1!
48
    pNext = pListHead->pNext;
1✔
49
    taosMemoryFree(pListHead);
1!
50
    pListHead = pNext;
1✔
51
  }
52
}
11,035✔
53

54
static void destroyStbJoinDynCtrlInfo(SStbJoinDynCtrlInfo* pStbJoin) {
11,035✔
55
  qDebug("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64 ", leftCacheNum:%" PRId64 ", rightCacheNum:%" PRId64, 
11,035✔
56
         pStbJoin->execInfo.prevBlkNum, pStbJoin->execInfo.prevBlkRows, pStbJoin->execInfo.postBlkNum, 
57
         pStbJoin->execInfo.postBlkRows, pStbJoin->execInfo.leftCacheNum, pStbJoin->execInfo.rightCacheNum);
58

59
  if (pStbJoin->basic.batchFetch) {
11,035✔
60
    if (pStbJoin->ctx.prev.leftHash) {
11,023✔
61
      tSimpleHashSetFreeFp(pStbJoin->ctx.prev.leftHash, freeVgTableList);
10,705✔
62
      tSimpleHashCleanup(pStbJoin->ctx.prev.leftHash);
10,705✔
63
    }
64
    if (pStbJoin->ctx.prev.rightHash) {
11,023✔
65
      tSimpleHashSetFreeFp(pStbJoin->ctx.prev.rightHash, freeVgTableList);
10,705✔
66
      tSimpleHashCleanup(pStbJoin->ctx.prev.rightHash);
10,705✔
67
    }
68
  } else {
69
    if (pStbJoin->ctx.prev.leftCache) {
12!
70
      tSimpleHashCleanup(pStbJoin->ctx.prev.leftCache);
12✔
71
    }
72
    if (pStbJoin->ctx.prev.rightCache) {
12!
73
      tSimpleHashCleanup(pStbJoin->ctx.prev.rightCache);
12✔
74
    }
75
    if (pStbJoin->ctx.prev.onceTable) {
12!
76
      tSimpleHashCleanup(pStbJoin->ctx.prev.onceTable);
12✔
77
    }
78
  }
79

80
  destroyStbJoinTableList(pStbJoin->ctx.prev.pListHead);
11,035✔
81
}
11,035✔
82

83
void destroyOrgTbInfo(void *info) {
×
84
  SOrgTbInfo *pOrgTbInfo = (SOrgTbInfo *)info;
×
85
  if (pOrgTbInfo) {
×
86
    taosArrayDestroy(pOrgTbInfo->colMap);
×
87
  }
88
}
×
89

90
void freeUseDbOutput(void* pOutput) {
×
91
  SUseDbOutput *pOut = *(SUseDbOutput**)pOutput;
×
92
  if (NULL == pOutput) {
×
93
    return;
×
94
  }
95

96
  if (pOut->dbVgroup) {
×
97
    freeVgInfo(pOut->dbVgroup);
×
98
  }
99
  taosMemFree(pOut);
×
100
}
101

102
static void destroyVtbScanDynCtrlInfo(SVtbScanDynCtrlInfo* pVtbScan) {
×
103
  if (pVtbScan->childTableList) {
×
104
    taosArrayDestroy(pVtbScan->childTableList);
×
105
  }
106
  if (pVtbScan->readColList) {
×
107
    taosArrayDestroy(pVtbScan->readColList);
×
108
  }
109
  if (pVtbScan->dbVgInfoMap) {
×
110
    taosHashSetFreeFp(pVtbScan->dbVgInfoMap, freeUseDbOutput);
×
111
    taosHashCleanup(pVtbScan->dbVgInfoMap);
×
112
  }
113
  if (pVtbScan->orgTbVgColMap) {
×
114
    taosHashSetFreeFp(pVtbScan->orgTbVgColMap, destroyOrgTbInfo);
×
115
    taosHashCleanup(pVtbScan->orgTbVgColMap);
×
116
  }
117
  if (pVtbScan->pRsp) {
×
118
    tFreeSUsedbRsp(pVtbScan->pRsp);
×
119
    taosMemoryFreeClear(pVtbScan->pRsp);
×
120
  }
121
}
×
122

123
static void destroyDynQueryCtrlOperator(void* param) {
11,035✔
124
  SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param;
11,035✔
125

126
  switch (pDyn->qType) {
11,035!
127
    case DYN_QTYPE_STB_HASH:
11,035✔
128
      destroyStbJoinDynCtrlInfo(&pDyn->stbJoin);
11,035✔
129
      break;
11,035✔
130
    case DYN_QTYPE_VTB_SCAN:
×
131
      destroyVtbScanDynCtrlInfo(&pDyn->vtbScan);
×
132
      break;
×
133
    default:
×
134
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
×
135
      break;
×
136
  }
137

138
  taosMemoryFreeClear(param);
11,035!
139
}
11,035✔
140

141
static FORCE_INLINE bool tableNeedCache(int64_t uid, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, bool rightTable, bool batchFetch) {
142
  if (batchFetch) {
42,112✔
143
    return true;
42,064✔
144
  }
145
  
146
  if (rightTable) {
48!
147
    return pPost->rightCurrUid == pPost->rightNextUid;
24✔
148
  }
149

150
  uint32_t* num = tSimpleHashGet(pPrev->leftCache, &uid, sizeof(uid));
24✔
151

152
  return (NULL == num) ? false : true;
24✔
153
}
154

155
static int32_t updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo*          pStbJoin) {
21,056✔
156
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
21,056✔
157
  SStbJoinPostJoinCtx*       pPost = &pStbJoin->ctx.post;
21,056✔
158
  SStbJoinTableList*         pNode = pPrev->pListHead;
21,056✔
159
  int32_t*                   leftVgId = pNode->pLeftVg + pNode->readIdx;
21,056✔
160
  int32_t*                   rightVgId = pNode->pRightVg + pNode->readIdx;
21,056✔
161
  int64_t*                   leftUid = pNode->pLeftUid + pNode->readIdx;
21,056✔
162
  int64_t*                   rightUid = pNode->pRightUid + pNode->readIdx;
21,056✔
163
  int64_t                    readIdx = pNode->readIdx + 1;
21,056✔
164
  int64_t                    rightPrevUid = pPost->rightCurrUid;
21,056✔
165

166
  pPost->leftCurrUid = *leftUid;
21,056✔
167
  pPost->rightCurrUid = *rightUid;
21,056✔
168

169
  pPost->leftVgId = *leftVgId;
21,056✔
170
  pPost->rightVgId = *rightVgId;
21,056✔
171

172
  while (true) {
173
    if (readIdx < pNode->uidNum) {
21,056✔
174
      pPost->rightNextUid = *(pNode->pRightUid + readIdx);
20,727✔
175
      break;
20,727✔
176
    }
177
    
178
    pNode = pNode->pNext;
329✔
179
    if (NULL == pNode) {
329!
180
      pPost->rightNextUid = 0;
329✔
181
      break;
329✔
182
    }
183
    
184
    rightUid = pNode->pRightUid;
×
185
    readIdx = 0;
×
186
  }
187

188
  pPost->leftNeedCache = tableNeedCache(*leftUid, pPrev, pPost, false, pStbJoin->basic.batchFetch);
21,056✔
189
  pPost->rightNeedCache = tableNeedCache(*rightUid, pPrev, pPost, true, pStbJoin->basic.batchFetch);
21,056✔
190

191
  if (!pStbJoin->basic.batchFetch && pPost->rightNeedCache && rightPrevUid != pPost->rightCurrUid) {
21,056!
192
    QRY_ERR_RET(tSimpleHashPut(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid), NULL, 0));
×
193
    pStbJoin->execInfo.rightCacheNum++;
×
194
  }  
195

196
  return TSDB_CODE_SUCCESS;
21,056✔
197
}
198

199

200
static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, bool needCache, SOperatorParam* pChild) {
42,112✔
201
  int32_t code = TSDB_CODE_SUCCESS;
42,112✔
202
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
42,112!
203
  if (NULL == *ppRes) {
42,112!
204
    code = terrno;
×
205
    freeOperatorParam(pChild, OP_GET_PARAM);
×
206
    return code;
×
207
  }
208
  if (pChild) {
42,112✔
209
    (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
684✔
210
    if (NULL == (*ppRes)->pChildren) {
684!
211
      code = terrno;
×
212
      freeOperatorParam(pChild, OP_GET_PARAM);
×
213
      freeOperatorParam(*ppRes, OP_GET_PARAM);
×
214
      *ppRes = NULL;
×
215
      return code;
×
216
    }
217
    if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild)) {
1,368!
218
      code = terrno;
×
219
      freeOperatorParam(pChild, OP_GET_PARAM);
×
220
      freeOperatorParam(*ppRes, OP_GET_PARAM);
×
221
      *ppRes = NULL;
×
222
      return code;
×
223
    }
224
  } else {
225
    (*ppRes)->pChildren = NULL;
41,428✔
226
  }
227

228
  SGcOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcOperatorParam));
42,112!
229
  if (NULL == pGc) {
42,112!
230
    code = terrno;
×
231
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
232
    *ppRes = NULL;
×
233
    return code;
×
234
  }
235

236
  pGc->sessionId = atomic_add_fetch_64(&gSessionId, 1);
42,112✔
237
  pGc->downstreamIdx = downstreamIdx;
42,112✔
238
  pGc->vgId = vgId;
42,112✔
239
  pGc->tbUid = tbUid;
42,112✔
240
  pGc->needCache = needCache;
42,112✔
241

242
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
42,112✔
243
  (*ppRes)->downstreamIdx = downstreamIdx;
42,112✔
244
  (*ppRes)->value = pGc;
42,112✔
245
  (*ppRes)->reUse = false;
42,112✔
246

247
  return TSDB_CODE_SUCCESS;
42,112✔
248
}
249

250

251
static int32_t buildGroupCacheNotifyOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid) {
×
252
  int32_t code = TSDB_CODE_SUCCESS;
×
253
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
×
254
  if (NULL == *ppRes) {
×
255
    return terrno;
×
256
  }
257
  (*ppRes)->pChildren = NULL;
×
258

259
  SGcNotifyOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcNotifyOperatorParam));
×
260
  if (NULL == pGc) {
×
261
    code = terrno;
×
262
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
×
263
    return code;
×
264
  }
265

266
  pGc->downstreamIdx = downstreamIdx;
×
267
  pGc->vgId = vgId;
×
268
  pGc->tbUid = tbUid;
×
269

270
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
×
271
  (*ppRes)->downstreamIdx = downstreamIdx;
×
272
  (*ppRes)->value = pGc;
×
273
  (*ppRes)->reUse = false;
×
274

275
  return TSDB_CODE_SUCCESS;
×
276
}
277

278

279
static int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, const int32_t* pVgId, int64_t* pUid) {
48✔
280
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
48!
281
  if (NULL == *ppRes) {
48!
282
    return terrno;
×
283
  }
284
  (*ppRes)->pChildren = NULL;
48✔
285
  
286
  SExchangeOperatorParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
48!
287
  if (NULL == pExc) {
48!
288
    return terrno;
×
289
  }
290

291
  pExc->multiParams = false;
48✔
292
  pExc->basic.vgId = *pVgId;
48✔
293
  pExc->basic.tableSeq = true;
48✔
294
  pExc->basic.isVtbRefScan = false;
48✔
295
  pExc->basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
48✔
296
  pExc->basic.colMap = NULL;
48✔
297
  pExc->basic.uidList = taosArrayInit(1, sizeof(int64_t));
48✔
298
  if (NULL == pExc->basic.uidList) {
48!
299
    taosMemoryFree(pExc);
×
300
    return terrno;
×
301
  }
302
  if (NULL == taosArrayPush(pExc->basic.uidList, pUid)) {
96!
303
    taosArrayDestroy(pExc->basic.uidList);
×
304
    taosMemoryFree(pExc);
×
305
    return terrno;
×
306
  }
307

308
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
48✔
309
  (*ppRes)->downstreamIdx = downstreamIdx;
48✔
310
  (*ppRes)->value = pExc;
48✔
311
  (*ppRes)->reUse = false;
48✔
312

313
  return TSDB_CODE_SUCCESS;
48✔
314
}
315

316
static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
587✔
317
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
587!
318
  if (NULL == *ppRes) {
587!
319
    return terrno;
×
320
  }
321
  (*ppRes)->pChildren = NULL;
587✔
322
  
323
  SExchangeOperatorBatchParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorBatchParam));
587!
324
  if (NULL == pExc) {
587!
325
    taosMemoryFreeClear(*ppRes);
×
326
    return terrno;
×
327
  }
328

329
  pExc->multiParams = true;
587✔
330
  pExc->pBatchs = tSimpleHashInit(tSimpleHashGetSize(pVg), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
587✔
331
  if (NULL == pExc->pBatchs) {
587!
332
    taosMemoryFree(pExc);
×
333
    taosMemoryFreeClear(*ppRes);
×
334
    return terrno;
×
335
  }
336
  tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);
587✔
337
  
338
  SExchangeOperatorBasicParam basic;
339
  basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
587✔
340

341
  int32_t iter = 0;
587✔
342
  void* p = NULL;
587✔
343
  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
1,543✔
344
    int32_t* pVgId = tSimpleHashGetKey(p, NULL);
956✔
345
    SArray* pUidList = *(SArray**)p;
956✔
346
    basic.vgId = *pVgId;
956✔
347
    basic.uidList = pUidList;
956✔
348
    basic.colMap = NULL;
956✔
349
    basic.tableSeq = false;
956✔
350
    basic.isVtbRefScan = false;
956✔
351
    
352
    QRY_ERR_RET(tSimpleHashPut(pExc->pBatchs, pVgId, sizeof(*pVgId), &basic, sizeof(basic)));   
956!
353

354
    qTrace("build downstreamIdx %d batch scan, vgId:%d, uidNum:%" PRId64, downstreamIdx, *pVgId, (int64_t)taosArrayGetSize(pUidList));
956!
355
    *(SArray**)p = NULL;
956✔
356
  }
357

358
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
587✔
359
  (*ppRes)->downstreamIdx = downstreamIdx;
587✔
360
  (*ppRes)->value = pExc;
587✔
361
  (*ppRes)->reUse = false;
587✔
362

363
  return TSDB_CODE_SUCCESS;
587✔
364
}
365

366
static int32_t buildExchangeOperatorParamForVScan(SOperatorParam** ppRes, int32_t downstreamIdx, SOrgTbInfo* pMap) {
×
367
  int32_t                      code = TSDB_CODE_SUCCESS;
×
368
  int32_t                      lino = 0;
×
369
  SExchangeOperatorParam*      pExc = NULL;
×
370
  SExchangeOperatorBasicParam* basic = NULL;
×
371

372
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
×
373
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
×
374
  (*ppRes)->pChildren = NULL;
×
375

376
  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
×
377
  QUERY_CHECK_NULL(pExc, code, lino, _return, terrno);
×
378

379
  pExc->multiParams = false;
×
380

381
  basic = &pExc->basic;
×
382
  basic->srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
×
383

384
  basic->vgId = pMap->vgId;
×
385
  basic->tableSeq = false;
×
386
  basic->isVtbRefScan = true;
×
387
  basic->colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
×
388
  QUERY_CHECK_NULL(basic->colMap, code, lino, _return, terrno);
×
389
  basic->colMap->vgId = pMap->vgId;
×
390
  tstrncpy(basic->colMap->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);
×
391
  basic->colMap->colMap = taosArrayDup(pMap->colMap, NULL);
×
392
  QUERY_CHECK_NULL(basic->colMap->colMap, code, lino, _return, terrno);
×
393

394
  basic->uidList = taosArrayInit(1, sizeof(int64_t));
×
395
  QUERY_CHECK_NULL(basic->uidList, code, lino, _return, terrno);
×
396

397
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
×
398
  (*ppRes)->downstreamIdx = downstreamIdx;
×
399
  (*ppRes)->value = pExc;
×
400
  (*ppRes)->reUse = true;
×
401

402
  return TSDB_CODE_SUCCESS;
×
403

404
_return:
×
405
  qError("failed to build exchange operator param for vscan, code:%d", code);
×
406
  taosMemoryFreeClear(*ppRes);
×
407
  if (basic) {
×
408
    if (basic->colMap) {
×
409
      taosArrayDestroy(basic->colMap->colMap);
×
410
      taosMemoryFreeClear(basic->colMap);
×
411
    }
412
    if (basic->uidList) {
×
413
      taosArrayDestroy(basic->uidList);
×
414
    }
415
    taosMemoryFreeClear(basic);
×
416
  }
417
  taosMemoryFreeClear(pExc);
×
418
  return code;
×
419
}
420

421
static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initParam, SOperatorParam** ppChild0, SOperatorParam** ppChild1) {
21,056✔
422
  int32_t code = TSDB_CODE_SUCCESS;
21,056✔
423
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
21,056!
424
  if (NULL == *ppRes) {
21,056!
425
    code = terrno;
×
426
    return code;
×
427
  }
428
  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
21,056✔
429
  if (NULL == (*ppRes)->pChildren) {
21,056!
430
    code = terrno;
×
431
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
432
    *ppRes = NULL;
×
433
    return code;
×
434
  }
435
  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild0)) {
42,112!
436
    code = terrno;
×
437
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
438
    *ppRes = NULL;
×
439
    return code;
×
440
  }
441
  *ppChild0 = NULL;
21,056✔
442
  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild1)) {
42,112!
443
    code = terrno;
×
444
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
445
    *ppRes = NULL;
×
446
    return code;
×
447
  }
448
  *ppChild1 = NULL;
21,056✔
449
  
450
  SSortMergeJoinOperatorParam* pJoin = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam));
21,056!
451
  if (NULL == pJoin) {
21,056!
452
    code = terrno;
×
453
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
454
    *ppRes = NULL;
×
455
    return code;
×
456
  }
457

458
  pJoin->initDownstream = initParam;
21,056✔
459
  
460
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
21,056✔
461
  (*ppRes)->value = pJoin;
21,056✔
462
  (*ppRes)->reUse = false;
21,056✔
463

464
  return TSDB_CODE_SUCCESS;
21,056✔
465
}
466

467
static int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) {
×
468
  int32_t code = TSDB_CODE_SUCCESS;
×
469
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
×
470
  if (NULL == *ppRes) {
×
471
    code = terrno;
×
472
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
×
473
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
474
    return code;
×
475
  }
476
  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
×
477
  if (NULL == *ppRes) {
×
478
    code = terrno;
×
479
    taosMemoryFreeClear(*ppRes);
×
480
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
×
481
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
482
    return code;
×
483
  }
484
  if (pChild0 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) {
×
485
    code = terrno;
×
486
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
×
487
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
×
488
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
489
    *ppRes = NULL;
×
490
    return code;
×
491
  }
492
  if (pChild1 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) {
×
493
    code = terrno;
×
494
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
×
495
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
496
    *ppRes = NULL;
×
497
    return code;
×
498
  }
499
  
500
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
×
501
  (*ppRes)->value = NULL;
×
502
  (*ppRes)->reUse = false;
×
503

504
  return TSDB_CODE_SUCCESS;
×
505
}
506

507
static int32_t buildBatchTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
49✔
508
  int32_t code = TSDB_CODE_SUCCESS;
49✔
509
  int32_t vgNum = tSimpleHashGetSize(pVg);
49✔
510
  if (vgNum <= 0 || vgNum > 1) {
49!
511
    qError("Invalid vgroup num %d to build table scan operator param", vgNum);
×
512
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
513
  }
514

515
  int32_t iter = 0;
49✔
516
  void* p = NULL;
49✔
517
  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
98✔
518
    int32_t* pVgId = tSimpleHashGetKey(p, NULL);
49✔
519
    SArray* pUidList = *(SArray**)p;
49✔
520

521
    code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false);
49✔
522
    if (code) {
49!
523
      return code;
×
524
    }
525
    taosArrayDestroy(pUidList);
49✔
526
    *(SArray**)p = NULL;
49✔
527
  }
528
  
529
  return TSDB_CODE_SUCCESS;
49✔
530
}
531

532

533
static int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) {
×
534
  SArray* pUidList = taosArrayInit(1, sizeof(int64_t));
×
535
  if (NULL == pUidList) {
×
536
    return terrno;
×
537
  }
538
  if (NULL == taosArrayPush(pUidList, pUid)) {
×
539
    return terrno;
×
540
  }
541

542
  int32_t code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, true);
×
543
  taosArrayDestroy(pUidList);
×
544
  if (code) {
×
545
    return code;
×
546
  }
547
  
548
  return TSDB_CODE_SUCCESS;
×
549
}
550

551
static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, SOperatorParam** ppParam) {
21,056✔
552
  int64_t                     rowIdx = pPrev->pListHead->readIdx;
21,056✔
553
  SOperatorParam*             pSrcParam0 = NULL;
21,056✔
554
  SOperatorParam*             pSrcParam1 = NULL;
21,056✔
555
  SOperatorParam*             pGcParam0 = NULL;
21,056✔
556
  SOperatorParam*             pGcParam1 = NULL;  
21,056✔
557
  int32_t*                    leftVg = pPrev->pListHead->pLeftVg + rowIdx;
21,056✔
558
  int64_t*                    leftUid = pPrev->pListHead->pLeftUid + rowIdx;
21,056✔
559
  int32_t*                    rightVg = pPrev->pListHead->pRightVg + rowIdx;
21,056✔
560
  int64_t*                    rightUid = pPrev->pListHead->pRightUid + rowIdx;
21,056✔
561
  int32_t                     code = TSDB_CODE_SUCCESS;
21,056✔
562

563
  qDebug("start %" PRId64 ":%" PRId64 "th stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64, 
21,056✔
564
      rowIdx, pPrev->tableNum, *leftVg, *leftUid, *rightVg, *rightUid);
565

566
  QRY_ERR_RET(updatePostJoinCurrTableInfo(&pInfo->stbJoin));
21,056!
567
  
568
  if (pInfo->stbJoin.basic.batchFetch) {
21,056✔
569
    if (pPrev->leftHash) {
21,032✔
570
      code = pInfo->stbJoin.basic.srcScan[0] ? buildBatchTableScanOperatorParam(&pSrcParam0, 0, pPrev->leftHash) : buildBatchExchangeOperatorParam(&pSrcParam0, 0, pPrev->leftHash);
318✔
571
      if (TSDB_CODE_SUCCESS == code) {
318!
572
        code = pInfo->stbJoin.basic.srcScan[1] ? buildBatchTableScanOperatorParam(&pSrcParam1, 1, pPrev->rightHash) : buildBatchExchangeOperatorParam(&pSrcParam1, 1, pPrev->rightHash);
318✔
573
      }
574
      if (TSDB_CODE_SUCCESS == code) {
318!
575
        tSimpleHashCleanup(pPrev->leftHash);
318✔
576
        tSimpleHashCleanup(pPrev->rightHash);
318✔
577
        pPrev->leftHash = NULL;
318✔
578
        pPrev->rightHash = NULL;
318✔
579
      }
580
    }
581
  } else {
582
    code = pInfo->stbJoin.basic.srcScan[0] ? buildSingleTableScanOperatorParam(&pSrcParam0, 0, leftVg, leftUid) : buildExchangeOperatorParam(&pSrcParam0, 0, leftVg, leftUid);
24!
583
    if (TSDB_CODE_SUCCESS == code) {
24!
584
      code = pInfo->stbJoin.basic.srcScan[1] ? buildSingleTableScanOperatorParam(&pSrcParam1, 1, rightVg, rightUid) : buildExchangeOperatorParam(&pSrcParam1, 1, rightVg, rightUid);
24!
585
    }
586
  }
587

588
  bool initParam = pSrcParam0 ? true : false;
21,056✔
589
  if (TSDB_CODE_SUCCESS == code) {
21,056!
590
    code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pPost->leftNeedCache, pSrcParam0);
21,056✔
591
    pSrcParam0 = NULL;
21,056✔
592
  }
593
  if (TSDB_CODE_SUCCESS == code) {
21,056!
594
    code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pPost->rightNeedCache, pSrcParam1);
21,056✔
595
    pSrcParam1 = NULL;
21,056✔
596
  }
597
  if (TSDB_CODE_SUCCESS == code) {
21,056!
598
    code = buildMergeJoinOperatorParam(ppParam, initParam, &pGcParam0, &pGcParam1);
21,056✔
599
  }
600
  if (TSDB_CODE_SUCCESS != code) {
21,056!
601
    if (pSrcParam0) {
×
602
      freeOperatorParam(pSrcParam0, OP_GET_PARAM);
×
603
    }
604
    if (pSrcParam1) {
×
605
      freeOperatorParam(pSrcParam1, OP_GET_PARAM);
×
606
    }
607
    if (pGcParam0) {
×
608
      freeOperatorParam(pGcParam0, OP_GET_PARAM);
×
609
    }
610
    if (pGcParam1) {
×
611
      freeOperatorParam(pGcParam1, OP_GET_PARAM);
×
612
    }
613
    if (*ppParam) {
×
614
      freeOperatorParam(*ppParam, OP_GET_PARAM);
×
615
      *ppParam = NULL;
×
616
    }
617
  }
618
  
619
  return code;
21,056✔
620
}
621

622
static void seqJoinLaunchNewRetrieveImpl(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
21,056✔
623
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
21,056✔
624
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
21,056✔
625
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
21,056✔
626
  SStbJoinPostJoinCtx*       pPost = &pStbJoin->ctx.post;
21,056✔
627
  SOperatorParam*            pParam = NULL;
21,056✔
628
  int32_t                    code  = buildSeqStbJoinOperatorParam(pInfo, pPrev, pPost, &pParam);
21,056✔
629
  if (TSDB_CODE_SUCCESS != code) {
21,056!
630
    pOperator->pTaskInfo->code = code;
×
631
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
×
632
  }
633

634
  qDebug("%s dynamic post task begin", GET_TASKID(pOperator->pTaskInfo));
21,056✔
635
  code = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam, ppRes);
21,056✔
636
  if (*ppRes && (code == 0)) {
21,056!
637
    code = blockDataCheck(*ppRes);
734✔
638
    if (code) {
734!
639
      qError("Invalid block data, blockDataCheck failed, error:%s", tstrerror(code));
×
640
      pOperator->pTaskInfo->code = code;
×
641
      T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
×
642
    }
643
    pPost->isStarted = true;
734✔
644
    pStbJoin->execInfo.postBlkNum++;
734✔
645
    pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows;
734✔
646
    qDebug("%s join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
734✔
647
  } else {
648
    qDebug("%s Empty join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
20,322✔
649
  }
650
}
21,056✔
651

652

653
static int32_t notifySeqJoinTableCacheEnd(SOperatorInfo* pOperator, SStbJoinPostJoinCtx* pPost, bool leftTable) {
×
654
  SOperatorParam* pGcParam = NULL;
×
655
  SOperatorParam* pMergeJoinParam = NULL;
×
656
  int32_t         downstreamId = leftTable ? 0 : 1;
×
657
  int32_t         vgId = leftTable ? pPost->leftVgId : pPost->rightVgId;
×
658
  int64_t         uid = leftTable ? pPost->leftCurrUid : pPost->rightCurrUid;
×
659

660
  qDebug("notify table %" PRIu64 " in vgId %d downstreamId %d cache end", uid, vgId, downstreamId);
×
661

662
  int32_t code = buildGroupCacheNotifyOperatorParam(&pGcParam, downstreamId, vgId, uid);
×
663
  if (TSDB_CODE_SUCCESS != code) {
×
664
    return code;
×
665
  }
666
  code = buildMergeJoinNotifyOperatorParam(&pMergeJoinParam, pGcParam, NULL);
×
667
  if (TSDB_CODE_SUCCESS != code) {
×
668
    return code;
×
669
  }
670

671
  return optrDefaultNotifyFn(pOperator->pDownstream[1], pMergeJoinParam);
×
672
}
673

674
static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCtrlInfo*          pStbJoin) {
21,055✔
675
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
21,055✔
676
  int32_t code = 0;
21,055✔
677
  
678
  pPost->isStarted = false;
21,055✔
679
  
680
  if (pStbJoin->basic.batchFetch) {
21,055✔
681
    return TSDB_CODE_SUCCESS;
21,031✔
682
  }
683
  
684
  if (pPost->leftNeedCache) {
24!
685
    uint32_t* num = tSimpleHashGet(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
×
686
    if (num && --(*num) <= 0) {
×
687
      code = tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
×
688
      if (code) {
×
689
        qError("tSimpleHashRemove leftCurrUid %" PRId64 " from leftCache failed, error:%s", pPost->leftCurrUid, tstrerror(code));
×
690
        QRY_ERR_RET(code);
×
691
      }
692
      QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, true));
×
693
    }
694
  }
695
  
696
  if (!pPost->rightNeedCache) {
24!
697
    void* v = tSimpleHashGet(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
24✔
698
    if (NULL != v) {
24!
699
      code = tSimpleHashRemove(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
×
700
      if (code) {
×
701
        qError("tSimpleHashRemove rightCurrUid %" PRId64 " from rightCache failed, error:%s", pPost->rightCurrUid, tstrerror(code));
×
702
        QRY_ERR_RET(code);
×
703
      }
704
      QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, false));
×
705
    }
706
  }
707

708
  return TSDB_CODE_SUCCESS;
24✔
709
}
710

711

712
static FORCE_INLINE int32_t seqJoinContinueCurrRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
713
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
1,063✔
714
  SStbJoinPostJoinCtx*       pPost = &pInfo->stbJoin.ctx.post;
1,063✔
715
  SStbJoinPrevJoinCtx*       pPrev = &pInfo->stbJoin.ctx.prev;
1,063✔
716

717
  if (!pPost->isStarted) {
1,063✔
718
    return TSDB_CODE_SUCCESS;
330✔
719
  }
720
  
721
  qDebug("%s dynQueryCtrl continue to retrieve block from post op", GET_TASKID(pOperator->pTaskInfo));
733✔
722
  
723
  *ppRes = getNextBlockFromDownstream(pOperator, 1);
733✔
724
  if (NULL == *ppRes) {
733!
725
    QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, &pInfo->stbJoin));
733!
726
    pPrev->pListHead->readIdx++;
733✔
727
  } else {
728
    pInfo->stbJoin.execInfo.postBlkNum++;
×
729
    pInfo->stbJoin.execInfo.postBlkRows += (*ppRes)->info.rows;
×
730
  }
731

732
  return TSDB_CODE_SUCCESS;
733✔
733
}
734

735
static FORCE_INLINE int32_t addToJoinVgroupHash(SSHashObj* pHash, void* pKey, int32_t keySize, void* pVal, int32_t valSize) {
736
  SArray** ppArray = tSimpleHashGet(pHash, pKey, keySize);
42,072✔
737
  if (NULL == ppArray) {
42,072✔
738
    SArray* pArray = taosArrayInit(10, valSize);
1,005✔
739
    if (NULL == pArray) {
1,005!
740
      return terrno;
×
741
    }
742
    if (NULL == taosArrayPush(pArray, pVal)) {
2,010!
743
      taosArrayDestroy(pArray);
×
744
      return terrno;
×
745
    }
746
    if (tSimpleHashPut(pHash, pKey, keySize, &pArray, POINTER_BYTES)) {
1,005!
747
      taosArrayDestroy(pArray);      
×
748
      return terrno;
×
749
    }
750
    return TSDB_CODE_SUCCESS;
1,005✔
751
  }
752

753
  if (NULL == taosArrayPush(*ppArray, pVal)) {
82,134!
754
    return terrno;
×
755
  }
756
  
757
  return TSDB_CODE_SUCCESS;
41,067✔
758
}
759

760
static FORCE_INLINE int32_t addToJoinTableHash(SSHashObj* pHash, SSHashObj* pOnceHash, void* pKey, int32_t keySize) {
761
  int32_t code = TSDB_CODE_SUCCESS;
24✔
762
  uint32_t* pNum = tSimpleHashGet(pHash, pKey, keySize);
24✔
763
  if (NULL == pNum) {
24!
764
    uint32_t n = 1;
24✔
765
    code = tSimpleHashPut(pHash, pKey, keySize, &n, sizeof(n));
24✔
766
    if (code) {
24!
767
      return code;
×
768
    }
769
    code = tSimpleHashPut(pOnceHash, pKey, keySize, NULL, 0);
24✔
770
    if (code) {
24!
771
      return code;
×
772
    }
773
    return TSDB_CODE_SUCCESS;
24✔
774
  }
775

776
  switch (*pNum) {
×
777
    case 0:
×
778
      break;
×
779
    case UINT32_MAX:
×
780
      *pNum = 0;
×
781
      break;
×
782
    default:
×
783
      if (1 == (*pNum)) {
×
784
        code = tSimpleHashRemove(pOnceHash, pKey, keySize);
×
785
        if (code) {
×
786
          qError("tSimpleHashRemove failed in addToJoinTableHash, error:%s", tstrerror(code));
×
787
          QRY_ERR_RET(code);
×
788
        }
789
      }
790
      (*pNum)++;
×
791
      break;
×
792
  }
793
  
794
  return TSDB_CODE_SUCCESS;
×
795
}
796

797

798
static void freeStbJoinTableList(SStbJoinTableList* pList) {
329✔
799
  if (NULL == pList) {
329!
800
    return;
×
801
  }
802
  taosMemoryFree(pList->pLeftVg);
329!
803
  taosMemoryFree(pList->pLeftUid);
329!
804
  taosMemoryFree(pList->pRightVg);
329!
805
  taosMemoryFree(pList->pRightUid);
329!
806
  taosMemoryFree(pList);
329!
807
}
808

809
static int32_t appendStbJoinTableList(SStbJoinPrevJoinCtx* pCtx, int64_t rows, int32_t* pLeftVg, int64_t* pLeftUid, int32_t* pRightVg, int64_t* pRightUid) {
330✔
810
  int32_t code = TSDB_CODE_SUCCESS;
330✔
811
  SStbJoinTableList* pNew = taosMemoryCalloc(1, sizeof(SStbJoinTableList));
330!
812
  if (NULL == pNew) {
330!
813
    return terrno;
×
814
  }
815
  pNew->pLeftVg = taosMemoryMalloc(rows * sizeof(*pLeftVg));
330!
816
  if (NULL == pNew->pLeftVg) {
330!
817
    code = terrno;
×
818
    freeStbJoinTableList(pNew);
×
819
    return code;
×
820
  }
821
  pNew->pLeftUid = taosMemoryMalloc(rows * sizeof(*pLeftUid));
330!
822
  if (NULL == pNew->pLeftUid) {
330!
823
    code = terrno;
×
824
    freeStbJoinTableList(pNew);
×
825
    return code;
×
826
  }
827
  pNew->pRightVg = taosMemoryMalloc(rows * sizeof(*pRightVg));
330!
828
  if (NULL == pNew->pRightVg) {
330!
829
    code = terrno;
×
830
    freeStbJoinTableList(pNew);
×
831
    return code;
×
832
  }
833
  pNew->pRightUid = taosMemoryMalloc(rows * sizeof(*pRightUid));
330!
834
  if (NULL == pNew->pRightUid) {
330!
835
    code = terrno;
×
836
    freeStbJoinTableList(pNew);
×
837
    return code;
×
838
  }
839

840
  TAOS_MEMCPY(pNew->pLeftVg, pLeftVg, rows * sizeof(*pLeftVg));
330✔
841
  TAOS_MEMCPY(pNew->pLeftUid, pLeftUid, rows * sizeof(*pLeftUid));
330✔
842
  TAOS_MEMCPY(pNew->pRightVg, pRightVg, rows * sizeof(*pRightVg));
330✔
843
  TAOS_MEMCPY(pNew->pRightUid, pRightUid, rows * sizeof(*pRightUid));
330✔
844

845
  pNew->readIdx = 0;
330✔
846
  pNew->uidNum = rows;
330✔
847
  pNew->pNext = NULL;
330✔
848
  
849
  if (pCtx->pListTail) {
330!
850
    pCtx->pListTail->pNext = pNew;
×
851
    pCtx->pListTail = pNew;
×
852
  } else {
853
    pCtx->pListHead = pNew;
330✔
854
    pCtx->pListTail= pNew;
330✔
855
  }
856

857
  return TSDB_CODE_SUCCESS;
330✔
858
}
859

860
static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
330✔
861
  int32_t                    code = TSDB_CODE_SUCCESS;
330✔
862
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
330✔
863
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
330✔
864
  SColumnInfoData*           pVg0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[0]);
330✔
865
  if (NULL == pVg0) {
330!
866
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
867
  }
868
  SColumnInfoData*           pVg1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[1]);
330✔
869
  if (NULL == pVg1) {
330!
870
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
871
  }
872
  SColumnInfoData*           pUid0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[0]);
330✔
873
  if (NULL == pUid0) {
330!
874
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
875
  }
876
  SColumnInfoData*           pUid1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[1]);
330✔
877
  if (NULL == pUid1) {
330!
878
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
879
  }
880

881
  if (pStbJoin->basic.batchFetch) {
330✔
882
    for (int32_t i = 0; i < pBlock->info.rows; ++i) {
21,354✔
883
      int32_t* leftVg = (int32_t*)(pVg0->pData + pVg0->info.bytes * i);
21,036✔
884
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i);
21,036✔
885
      int32_t* rightVg = (int32_t*)(pVg1->pData + pVg1->info.bytes * i);
21,036✔
886
      int64_t* rightUid = (int64_t*)(pUid1->pData + pUid1->info.bytes * i);
21,036✔
887

888
      code = addToJoinVgroupHash(pStbJoin->ctx.prev.leftHash, leftVg, sizeof(*leftVg), leftUid, sizeof(*leftUid));
21,036✔
889
      if (TSDB_CODE_SUCCESS != code) {
21,036!
890
        break;
×
891
      }
892
      code = addToJoinVgroupHash(pStbJoin->ctx.prev.rightHash, rightVg, sizeof(*rightVg), rightUid, sizeof(*rightUid));
21,036✔
893
      if (TSDB_CODE_SUCCESS != code) {
21,036!
894
        break;
×
895
      }
896
    }
897
  } else {
898
    for (int32_t i = 0; i < pBlock->info.rows; ++i) {
36✔
899
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i);
24✔
900
    
901
      code = addToJoinTableHash(pStbJoin->ctx.prev.leftCache, pStbJoin->ctx.prev.onceTable, leftUid, sizeof(*leftUid));
24✔
902
      if (TSDB_CODE_SUCCESS != code) {
24!
903
        break;
×
904
      }
905
    }
906
  }
907

908
  if (TSDB_CODE_SUCCESS == code) {
330!
909
    code = appendStbJoinTableList(&pStbJoin->ctx.prev, pBlock->info.rows, (int32_t*)pVg0->pData, (int64_t*)pUid0->pData, (int32_t*)pVg1->pData, (int64_t*)pUid1->pData);
330✔
910
    if (TSDB_CODE_SUCCESS == code) {
330!
911
      pStbJoin->ctx.prev.tableNum += pBlock->info.rows;
330✔
912
    }
913
  }
914

915
_return:
×
916

917
  if (TSDB_CODE_SUCCESS != code) {
330!
918
    pOperator->pTaskInfo->code = code;
×
919
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
×
920
  }
921
}
330✔
922

923

924
static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) {
11,035✔
925
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
11,035✔
926
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
11,035✔
927

928
  if (pStbJoin->basic.batchFetch) {
11,035✔
929
    return;
11,035✔
930
  }
931

932
  if (tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache) == tSimpleHashGetSize(pStbJoin->ctx.prev.onceTable)) {
12!
933
    tSimpleHashClear(pStbJoin->ctx.prev.leftCache);
12✔
934
    return;
12✔
935
  }
936

937
  uint64_t* pUid = NULL;
×
938
  int32_t iter = 0;
×
939
  int32_t code = 0;
×
940
  while (NULL != (pUid = tSimpleHashIterate(pStbJoin->ctx.prev.onceTable, pUid, &iter))) {
×
941
    code = tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, pUid, sizeof(*pUid));
×
942
    if (code) {
×
943
      qError("tSimpleHashRemove failed in postProcessStbJoinTableHash, error:%s", tstrerror(code));
×
944
    }
945
  }
946

947
  pStbJoin->execInfo.leftCacheNum = tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache);
×
948
  qDebug("more than 1 ref build table num %" PRId64, (int64_t)tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache));
×
949

950
/*
951
  // debug only
952
  iter = 0;
953
  uint32_t* num = NULL;
954
  while (NULL != (num = tSimpleHashIterate(pStbJoin->ctx.prev.leftCache, num, &iter))) {
955
    A S S E R T(*num > 1);
956
  }
957
*/  
958
}
959

960
static void buildStbJoinTableList(SOperatorInfo* pOperator) {
11,035✔
961
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
11,035✔
962
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
11,035✔
963

964
  while (true) {
330✔
965
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
11,365✔
966
    if (NULL == pBlock) {
11,365✔
967
      break;
11,035✔
968
    }
969

970
    pStbJoin->execInfo.prevBlkNum++;
330✔
971
    pStbJoin->execInfo.prevBlkRows += pBlock->info.rows;
330✔
972
    
973
    doBuildStbJoinTableHash(pOperator, pBlock);
330✔
974
  }
975

976
  postProcessStbJoinTableHash(pOperator);
11,035✔
977

978
  pStbJoin->ctx.prev.joinBuild = true;
11,035✔
979
}
11,035✔
980

981
static int32_t seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
1,063✔
982
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
1,063✔
983
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
1,063✔
984
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
1,063✔
985
  SStbJoinTableList*         pNode = pPrev->pListHead;
1,063✔
986

987
  while (pNode) {
21,714✔
988
    if (pNode->readIdx >= pNode->uidNum) {
21,385✔
989
      pPrev->pListHead = pNode->pNext;
329✔
990
      freeStbJoinTableList(pNode);
329✔
991
      pNode = pPrev->pListHead;
329✔
992
      continue;
329✔
993
    }
994
    
995
    seqJoinLaunchNewRetrieveImpl(pOperator, ppRes);
21,056✔
996
    if (*ppRes) {
21,056✔
997
      return TSDB_CODE_SUCCESS;
734✔
998
    }
999

1000
    QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, pStbJoin));
20,322!
1001
    pPrev->pListHead->readIdx++;
20,322✔
1002
  }
1003

1004
  *ppRes = NULL;
329✔
1005
  setOperatorCompleted(pOperator);
329✔
1006

1007
  return TSDB_CODE_SUCCESS;
329✔
1008
}
1009

1010
static int32_t seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) {
11,768✔
1011
  if (pBlock) {
11,768✔
1012
    if (pStbJoin && pStbJoin->pOutputDataBlockDesc) {
734!
1013
      pBlock->info.id.blockId = pStbJoin->pOutputDataBlockDesc->dataBlockId;
734✔
1014
      if (!pBlock->pDataBlock) return TSDB_CODE_SUCCESS;
734!
1015

1016
      for (int i = pBlock->pDataBlock->size; i < pStbJoin->pOutputDataBlockDesc->pSlots->length; i++) {
752✔
1017
        SSlotDescNode* pSlot = (SSlotDescNode*)nodesListGetNode(pStbJoin->pOutputDataBlockDesc->pSlots, i);
18✔
1018
        if (pSlot == NULL) {
18!
1019
          qError("seqStableJoinComposeRes: pSlot is NULL, i:%d", i);
×
1020
          return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1021
        }
1022
        SColumnInfoData colInfo = createColumnInfoData(pSlot->dataType.type, pSlot->dataType.bytes, pSlot->slotId);
18✔
1023
        int32_t code = colInfoDataEnsureCapacity(&colInfo, pBlock->info.rows, true);
18✔
1024
        if (code != TSDB_CODE_SUCCESS) {
18!
1025
          return code;
×
1026
        }
1027
        code = blockDataAppendColInfo(pBlock, &colInfo);
18✔
1028
        if (code != TSDB_CODE_SUCCESS) {
18!
1029
          return code;
×
1030
        }
1031
      }
1032
    } else {
1033
      qError("seqStableJoinComposeRes: pBlock or pStbJoin is NULL");
×
1034
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1035
    }
1036
  }
1037
  return TSDB_CODE_SUCCESS;
11,768✔
1038
}
1039

1040
int32_t seqStableJoin(SOperatorInfo* pOperator, SSDataBlock** pRes) {
12,044✔
1041
  int32_t                    code = TSDB_CODE_SUCCESS;
12,044✔
1042
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
12,044✔
1043
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
12,044✔
1044

1045
  QRY_PARAM_CHECK(pRes);
12,044!
1046
  if (pOperator->status == OP_EXEC_DONE) {
12,044✔
1047
    return code;
276✔
1048
  }
1049

1050
  int64_t st = 0;
11,768✔
1051
  if (pOperator->cost.openCost == 0) {
11,768✔
1052
    st = taosGetTimestampUs();
11,035✔
1053
  }
1054

1055
  if (!pStbJoin->ctx.prev.joinBuild) {
11,768✔
1056
    buildStbJoinTableList(pOperator);
11,035✔
1057
    if (pStbJoin->execInfo.prevBlkRows <= 0) {
11,035✔
1058
      setOperatorCompleted(pOperator);
10,705✔
1059
      goto _return;
10,705✔
1060
    }
1061
  }
1062

1063
  QRY_ERR_JRET(seqJoinContinueCurrRetrieve(pOperator, pRes));
1,063!
1064
  if (*pRes) {
1,063!
1065
    goto _return;
×
1066
  }
1067

1068
  QRY_ERR_JRET(seqJoinLaunchNewRetrieve(pOperator, pRes));
1,063!
1069

1070
_return:
1,063✔
1071
  if (pOperator->cost.openCost == 0) {
11,768✔
1072
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
11,035✔
1073
  }
1074

1075
  if (code) {
11,768!
1076
    qError("%s failed since %s", __func__, tstrerror(code));
×
1077
    pOperator->pTaskInfo->code = code;
×
1078
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
1079
  } else {
1080
    code = seqStableJoinComposeRes(pStbJoin, *pRes);
11,768✔
1081
  }
1082
  return code;
11,768✔
1083
}
1084

1085
static int32_t buildVtbScanOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, uint64_t uid) {
×
1086
  int32_t                   code = TSDB_CODE_SUCCESS;
×
1087
  int32_t                   lino = 0;
×
1088
  SVTableScanOperatorParam* pVScan = NULL;
×
1089
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
×
1090
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
×
1091

1092
  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
×
1093
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno);
×
1094

1095
  pVScan = taosMemoryMalloc(sizeof(SVTableScanOperatorParam));
×
1096
  QUERY_CHECK_NULL(pVScan, code, lino, _return, terrno);
×
1097
  pVScan->pOpParamArray = taosArrayInit(1, POINTER_BYTES);
×
1098
  QUERY_CHECK_NULL(pVScan->pOpParamArray, code, lino, _return, terrno);
×
1099
  pVScan->uid = uid;
×
1100

1101
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN;
×
1102
  (*ppRes)->downstreamIdx = 0;
×
1103
  (*ppRes)->value = pVScan;
×
1104
  (*ppRes)->reUse = false;
×
1105

1106
  return TSDB_CODE_SUCCESS;
×
1107
_return:
×
1108
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1109
  if (pVScan) {
×
1110
    taosArrayDestroy(pVScan->pOpParamArray);
×
1111
    taosMemoryFreeClear(pVScan);
×
1112
  }
1113
  if (*ppRes) {
×
1114
    taosArrayDestroy((*ppRes)->pChildren);
×
1115
    taosMemoryFreeClear(*ppRes);
×
1116
  }
1117
  return code;
×
1118
}
1119

1120
int32_t dynProcessUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
×
1121
  int32_t                    lino = 0;
×
1122
  SOperatorInfo*             operator=(SOperatorInfo*) param;
×
1123
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)operator->info;
×
1124

1125
  if (TSDB_CODE_SUCCESS != code) {
×
1126
    operator->pTaskInfo->code = rpcCvtErrCode(code);
×
1127
    if (operator->pTaskInfo->code != code) {
×
1128
      qError("load systable rsp received, error:%s, cvted error:%s", tstrerror(code),
×
1129
             tstrerror(operator->pTaskInfo->code));
1130
    } else {
1131
      qError("load systable rsp received, error:%s", tstrerror(code));
×
1132
    }
1133
    goto _return;
×
1134
  }
1135

1136
  pScanResInfo->vtbScan.pRsp = taosMemoryMalloc(sizeof(SUseDbRsp));
×
1137
  QUERY_CHECK_NULL(pScanResInfo->vtbScan.pRsp, code, lino, _return, terrno);
×
1138

1139
  QUERY_CHECK_CODE(tDeserializeSUseDbRsp(pMsg->pData, (int32_t)pMsg->len, pScanResInfo->vtbScan.pRsp), lino, _return);
×
1140

1141
  taosMemoryFreeClear(pMsg->pData);
×
1142

1143
  QUERY_CHECK_CODE(tsem_post(&pScanResInfo->vtbScan.ready), lino, _return);
×
1144

1145
  return TSDB_CODE_SUCCESS;
×
1146
_return:
×
1147
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1148
  return code;
×
1149
}
1150

1151
static int32_t buildDbVgInfoMap(SOperatorInfo* pOperator, SReadHandle* pHandle, SName* name, SExecTaskInfo* pTaskInfo, SUseDbOutput* output) {
×
1152
  int32_t                    code = TSDB_CODE_SUCCESS;
×
1153
  int32_t                    lino = 0;
×
1154
  char*                      buf1 = NULL;
×
1155
  SUseDbReq*                 pReq = NULL;
×
1156
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)pOperator->info;
×
1157

1158
  pReq = taosMemoryMalloc(sizeof(SUseDbReq));
×
1159
  QUERY_CHECK_NULL(pReq, code, lino, _return, terrno);
×
1160
  QUERY_CHECK_CODE(tNameGetFullDbName(name, pReq->db), lino, _return);
×
1161
  int32_t contLen = tSerializeSUseDbReq(NULL, 0, pReq);
×
1162
  buf1 = taosMemoryCalloc(1, contLen);
×
1163
  QUERY_CHECK_NULL(buf1, code, lino, _return, terrno);
×
1164
  int32_t tempRes = tSerializeSUseDbReq(buf1, contLen, pReq);
×
1165
  if (tempRes < 0) {
×
1166
    QUERY_CHECK_CODE(terrno, lino, _return);
×
1167
  }
1168

1169
  // send the fetch remote task result request
1170
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
×
1171
  QUERY_CHECK_NULL(pMsgSendInfo, code, lino, _return, terrno);
×
1172

1173
  pMsgSendInfo->param = pOperator;
×
1174
  pMsgSendInfo->msgInfo.pData = buf1;
×
1175
  pMsgSendInfo->msgInfo.len = contLen;
×
1176
  pMsgSendInfo->msgType = TDMT_MND_GET_DB_INFO;
×
1177
  pMsgSendInfo->fp = dynProcessUseDbRsp;
×
1178
  pMsgSendInfo->requestId = pTaskInfo->id.queryId;
×
1179

1180
  QUERY_CHECK_CODE(asyncSendMsgToServer(pHandle->pMsgCb->clientRpc, &pScanResInfo->vtbScan.epSet, NULL, pMsgSendInfo), lino, _return);
×
1181

1182
  QUERY_CHECK_CODE(tsem_wait(&pScanResInfo->vtbScan.ready), lino, _return);
×
1183

1184
  QUERY_CHECK_CODE(queryBuildUseDbOutput(output, pScanResInfo->vtbScan.pRsp), lino, _return);
×
1185

1186
_return:
×
1187
  if (code) {
×
1188
     qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1189
     taosMemoryFree(buf1);
×
1190
  }
1191
  taosMemoryFree(pReq);
×
1192
  tFreeSUsedbRsp(pScanResInfo->vtbScan.pRsp);
×
1193
  taosMemoryFreeClear(pScanResInfo->vtbScan.pRsp);
×
1194
  return code;
×
1195
}
1196

1197
int dynVgInfoComp(const void* lp, const void* rp) {
×
1198
  SVgroupInfo* pLeft = (SVgroupInfo*)lp;
×
1199
  SVgroupInfo* pRight = (SVgroupInfo*)rp;
×
1200
  if (pLeft->hashBegin < pRight->hashBegin) {
×
1201
    return -1;
×
1202
  } else if (pLeft->hashBegin > pRight->hashBegin) {
×
1203
    return 1;
×
1204
  }
1205

1206
  return 0;
×
1207
}
1208

1209
int32_t dynMakeVgArraySortBy(SDBVgInfo* dbInfo, __compar_fn_t sort_func) {
×
1210
  if (NULL == dbInfo) {
×
1211
    return TSDB_CODE_SUCCESS;
×
1212
  }
1213

1214
  if (dbInfo->vgHash && NULL == dbInfo->vgArray) {
×
1215
    int32_t vgSize = taosHashGetSize(dbInfo->vgHash);
×
1216
    dbInfo->vgArray = taosArrayInit(vgSize, sizeof(SVgroupInfo));
×
1217
    if (NULL == dbInfo->vgArray) {
×
1218
      return terrno;
×
1219
    }
1220

1221
    void* pIter = taosHashIterate(dbInfo->vgHash, NULL);
×
1222
    while (pIter) {
×
1223
      if (NULL == taosArrayPush(dbInfo->vgArray, pIter)) {
×
1224
        taosHashCancelIterate(dbInfo->vgHash, pIter);
×
1225
        return terrno;
×
1226
      }
1227

1228
      pIter = taosHashIterate(dbInfo->vgHash, pIter);
×
1229
    }
1230

1231
    taosArraySort(dbInfo->vgArray, sort_func);
×
1232
  }
1233

1234
  return TSDB_CODE_SUCCESS;
×
1235
}
1236

1237
int32_t dynHashValueComp(void const* lp, void const* rp) {
×
1238
  uint32_t*    key = (uint32_t*)lp;
×
1239
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
×
1240

1241
  if (*key < pVg->hashBegin) {
×
1242
    return -1;
×
1243
  } else if (*key > pVg->hashEnd) {
×
1244
    return 1;
×
1245
  }
1246

1247
  return 0;
×
1248
}
1249

1250
int32_t getVgId(SDBVgInfo* dbInfo, char* dbFName, int32_t* vgId, char *tbName) {
×
1251
  int32_t code = 0;
×
1252
  int32_t lino = 0;
×
1253
  QUERY_CHECK_CODE(dynMakeVgArraySortBy(dbInfo, dynVgInfoComp), lino, _return);
×
1254

1255
  int32_t vgNum = (int32_t)taosArrayGetSize(dbInfo->vgArray);
×
1256
  if (vgNum <= 0) {
×
1257
    qError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum);
×
1258
    QUERY_CHECK_CODE(TSDB_CODE_TSC_DB_NOT_SELECTED, lino, _return);
×
1259
  }
1260

1261
  SVgroupInfo* vgInfo = NULL;
×
1262
  char         tbFullName[TSDB_TABLE_FNAME_LEN];
1263
  (void)snprintf(tbFullName, sizeof(tbFullName), "%s.", dbFName);
×
1264
  int32_t offset = (int32_t)strlen(tbFullName);
×
1265

1266
  (void)snprintf(tbFullName + offset, sizeof(tbFullName) - offset, "%s", tbName);
×
1267
  uint32_t hashValue = taosGetTbHashVal(tbFullName, (int32_t)strlen(tbFullName), dbInfo->hashMethod,
×
1268
                                        dbInfo->hashPrefix, dbInfo->hashSuffix);
×
1269

1270
  vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, dynHashValueComp, TD_EQ);
×
1271
  if (NULL == vgInfo) {
×
1272
    qError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName,
×
1273
           (int32_t)taosArrayGetSize(dbInfo->vgArray));
1274
    return TSDB_CODE_CTG_INTERNAL_ERROR;
×
1275
  }
1276

1277
  *vgId = vgInfo->vgId;
×
1278

1279
_return:
×
1280
  return code;
×
1281
}
1282

1283
bool colNeedScan(SOperatorInfo* pOperator, col_id_t colId) {
×
1284
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
×
1285
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
×
1286
  SArray *                   pColList = pVtbScan->readColList;
×
1287
  if (pVtbScan->scanAllCols) {
×
1288
    return true;
×
1289
  }
1290
  for (int32_t i = 0; i < taosArrayGetSize(pColList); i++) {
×
1291
    if (colId == *(col_id_t*)taosArrayGet(pColList, i)) {
×
1292
      return true;
×
1293
    }
1294
  }
1295
  return false;
×
1296
}
1297

1298
int32_t getDbVgInfo(SOperatorInfo* pOperator, SName *name, SDBVgInfo **dbVgInfo) {
×
1299
  int32_t                    code = TSDB_CODE_SUCCESS;
×
1300
  int32_t                    line = 0;
×
1301
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
×
1302
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
×
1303
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
×
1304
  SReadHandle*               pHandle = &pVtbScan->readHandle;
×
1305
  SUseDbOutput*              output = NULL;
×
1306
  SUseDbOutput**             find = (SUseDbOutput**)taosHashGet(pInfo->vtbScan.dbVgInfoMap, name->dbname, strlen(name->dbname));
×
1307

1308
  QRY_PARAM_CHECK(dbVgInfo);
×
1309

1310
  if (find == NULL) {
×
1311
    output = taosMemoryMalloc(sizeof(SUseDbOutput));
×
1312
    QUERY_CHECK_CODE(buildDbVgInfoMap(pOperator, pHandle, name, pTaskInfo, output), line, _return);
×
1313
    QUERY_CHECK_CODE(taosHashPut(pInfo->vtbScan.dbVgInfoMap, name->dbname, strlen(name->dbname), &output, POINTER_BYTES), line, _return);
×
1314
  } else {
1315
    output = *find;
×
1316
  }
1317

1318
  *dbVgInfo = output->dbVgroup;
×
1319
  return code;
×
1320
_return:
×
1321
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
1322
  freeUseDbOutput(output);
×
1323
  return code;
×
1324
}
1325

1326
int32_t vtbScan(SOperatorInfo* pOperator, SSDataBlock** pRes) {
×
1327
  int32_t                    code = TSDB_CODE_SUCCESS;
×
1328
  int32_t                    line = 0;
×
1329
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
×
1330
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
×
1331
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
×
1332
  SReadHandle*               pHandle = &pVtbScan->readHandle;
×
1333
  SMetaReader                mr = {0};
×
1334
  SDBVgInfo*                 dbVgInfo = NULL;
×
1335

1336
  QRY_PARAM_CHECK(pRes);
×
1337
  if (pOperator->status == OP_EXEC_DONE) {
×
1338
    return code;
×
1339
  }
1340

1341
  int64_t st = 0;
×
1342
  if (pOperator->cost.openCost == 0) {
×
1343
    st = taosGetTimestampUs();
×
1344
  }
1345

1346
  size_t num = taosArrayGetSize(pVtbScan->childTableList);
×
1347

1348
  // no child table, return
1349
  if (num == 0) {
×
1350
    setOperatorCompleted(pOperator);
×
1351
    return code;
×
1352
  }
1353

1354
  pVtbScan->orgTbVgColMap = taosHashInit(num * 64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
×
1355
  QUERY_CHECK_NULL(pVtbScan->orgTbVgColMap, code, line, _return, terrno);
×
1356
  taosHashSetFreeFp(pVtbScan->orgTbVgColMap, destroyOrgTbInfo);
×
1357

1358
  while (true) {
1359
    if (pVtbScan->curTableIdx == pVtbScan->lastTableIdx) {
×
1360
      QUERY_CHECK_CODE(pOperator->pDownstream[0]->fpSet.getNextFn(pOperator->pDownstream[0], pRes), line, _return);
×
1361
    } else {
1362
      uint64_t* id = taosArrayGet(pVtbScan->childTableList, pVtbScan->curTableIdx);
×
1363
      QUERY_CHECK_NULL(id, code, line, _return, terrno);
×
1364
      pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, META_READER_LOCK, &pHandle->api.metaFn);
×
1365
      QUERY_CHECK_CODE(pHandle->api.metaReaderFn.getTableEntryByUid(&mr, *id), line, _return);
×
1366

1367
      for (int32_t j = 0; j < mr.me.colRef.nCols; j++) {
×
1368
        if (mr.me.colRef.pColRef[j].hasRef && colNeedScan(pOperator, mr.me.colRef.pColRef[j].id)) {
×
1369
          SName   name = {0};
×
1370
          char    dbFname[TSDB_DB_FNAME_LEN] = {0};
×
1371
          char    orgTbFName[TSDB_TABLE_FNAME_LEN] = {0};
×
1372

1373
          toName(pInfo->vtbScan.acctId, mr.me.colRef.pColRef[j].refDbName, mr.me.colRef.pColRef[j].refTableName, &name);
×
1374
          QUERY_CHECK_CODE(getDbVgInfo(pOperator, &name, &dbVgInfo), line, _return);
×
1375
          QUERY_CHECK_CODE(tNameGetFullDbName(&name, dbFname), line, _return);
×
1376
          QUERY_CHECK_CODE(tNameGetFullTableName(&name, orgTbFName), line, _return);
×
1377

1378
          void *pVal = taosHashGet(pVtbScan->orgTbVgColMap, orgTbFName, sizeof(orgTbFName));
×
1379
          if (!pVal) {
×
1380
            SOrgTbInfo map = {0};
×
1381
            QUERY_CHECK_CODE(getVgId(dbVgInfo, dbFname, &map.vgId, name.tname), line, _return);
×
1382
            tstrncpy(map.tbName, orgTbFName, sizeof(map.tbName));
×
1383
            map.colMap = taosArrayInit(10, sizeof(SColIdNameKV));
×
1384
            QUERY_CHECK_NULL(map.colMap, code, line, _return, terrno);
×
1385
            SColIdNameKV colIdNameKV = {0};
×
1386
            colIdNameKV.colId = mr.me.colRef.pColRef[j].id;
×
1387
            tstrncpy(colIdNameKV.colName, mr.me.colRef.pColRef[j].refColName, sizeof(colIdNameKV.colName));
×
1388
            QUERY_CHECK_NULL(taosArrayPush(map.colMap, &colIdNameKV), code, line, _return, terrno);
×
1389
            QUERY_CHECK_CODE(taosHashPut(pVtbScan->orgTbVgColMap, orgTbFName, sizeof(orgTbFName), &map, sizeof(map)), line, _return);
×
1390
          } else {
1391
            SOrgTbInfo *tbInfo = (SOrgTbInfo *)pVal;
×
1392
            SColIdNameKV colIdNameKV = {0};
×
1393
            colIdNameKV.colId = mr.me.colRef.pColRef[j].id;
×
1394
            tstrncpy(colIdNameKV.colName, mr.me.colRef.pColRef[j].refColName, sizeof(colIdNameKV.colName));
×
1395
            QUERY_CHECK_NULL(taosArrayPush(tbInfo->colMap, &colIdNameKV), code, line, _return, terrno);
×
1396
          }
1397
        }
1398
      }
1399

1400
      pVtbScan->vtbScanParam = NULL;
×
1401
      QUERY_CHECK_CODE(buildVtbScanOperatorParam(pInfo, &pVtbScan->vtbScanParam, *id), line, _return);
×
1402

1403
      void* pIter = taosHashIterate(pVtbScan->orgTbVgColMap, NULL);
×
1404
      while (pIter != NULL) {
×
1405
        SOrgTbInfo*      pMap = (SOrgTbInfo*)pIter;
×
1406
        SOperatorParam*  pExchangeParam = NULL;
×
1407
        QUERY_CHECK_CODE(buildExchangeOperatorParamForVScan(&pExchangeParam, 0, pMap), line, _return);
×
1408
        QUERY_CHECK_NULL(taosArrayPush(((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pOpParamArray, &pExchangeParam), code, line, _return, terrno);
×
1409
        pIter = taosHashIterate(pVtbScan->orgTbVgColMap, pIter);
×
1410
      }
1411
      pHandle->api.metaReaderFn.clearReader(&mr);
×
1412

1413
      // reset downstream operator's status
1414
      pOperator->pDownstream[0]->status = OP_NOT_OPENED;
×
1415
      QUERY_CHECK_CODE(pOperator->pDownstream[0]->fpSet.getNextExtFn(pOperator->pDownstream[0], pVtbScan->vtbScanParam, pRes), line, _return);
×
1416
    }
1417

1418
    if (*pRes) {
×
1419
      // has result, still read data from this table.
1420
      pVtbScan->lastTableIdx = pVtbScan->curTableIdx;
×
1421
      break;
×
1422
    } else {
1423
      // no result, read next table.
1424
      pVtbScan->curTableIdx++;
×
1425
      if (pVtbScan->curTableIdx >= taosArrayGetSize(pVtbScan->childTableList)) {
×
1426
        setOperatorCompleted(pOperator);
×
1427
        break;
×
1428
      }
1429
    }
1430
  }
1431

1432
_return:
×
1433
  taosHashCleanup(pVtbScan->orgTbVgColMap);
×
1434
  pVtbScan->orgTbVgColMap = NULL;
×
1435
  if (pOperator->cost.openCost == 0) {
×
1436
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
×
1437
  }
1438

1439
  if (code) {
×
1440
    qError("%s failed since %s", __func__, tstrerror(code));
×
1441
    pOperator->pTaskInfo->code = code;
×
1442
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
1443
  }
1444

1445
  return code;
×
1446
}
1447

1448
int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) {
11,035✔
1449
  if (batchFetch) {
11,035✔
1450
    pPrev->leftHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
11,023✔
1451
    if (NULL == pPrev->leftHash) {
11,023!
1452
      return terrno;
×
1453
    }
1454
    pPrev->rightHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
11,023✔
1455
    if (NULL == pPrev->rightHash) {
11,023!
1456
      return terrno;
×
1457
    }
1458
  } else {
1459
    pPrev->leftCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
12✔
1460
    if (NULL == pPrev->leftCache) {
12!
1461
      return terrno;
×
1462
    }
1463
    pPrev->rightCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
12✔
1464
    if (NULL == pPrev->rightCache) {
12!
1465
      return terrno;
×
1466
    }
1467
    pPrev->onceTable = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
12✔
1468
    if (NULL == pPrev->onceTable) {
12!
1469
      return terrno;
×
1470
    }
1471
  }
1472

1473
  return TSDB_CODE_SUCCESS;
11,035✔
1474
}
1475

1476
static int32_t initVtbScanInfo(SOperatorInfo* pOperator, SDynQueryCtrlOperatorInfo* pInfo, SReadHandle* pHandle,
×
1477
                               SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) {
1478
  int32_t      code = TSDB_CODE_SUCCESS;
×
1479
  int32_t      line = 0;
×
1480

1481
  QUERY_CHECK_CODE(tsem_init(&pInfo->vtbScan.ready, 0, 0), line, _return);
×
1482

1483
  pInfo->vtbScan.scanAllCols = pPhyciNode->vtbScan.scanAllCols;
×
1484
  pInfo->vtbScan.suid = pPhyciNode->vtbScan.suid;
×
1485
  pInfo->vtbScan.epSet = pPhyciNode->vtbScan.mgmtEpSet;
×
1486
  pInfo->vtbScan.acctId = pPhyciNode->vtbScan.accountId;
×
1487
  pInfo->vtbScan.readHandle = *pHandle;
×
1488
  pInfo->vtbScan.curTableIdx = 0;
×
1489
  pInfo->vtbScan.lastTableIdx = -1;
×
1490

1491
  pInfo->vtbScan.readColList = taosArrayInit(LIST_LENGTH(pPhyciNode->vtbScan.pScanCols), sizeof(col_id_t));
×
1492
  QUERY_CHECK_NULL(pInfo->vtbScan.readColList, code, line, _return, terrno);
×
1493

1494
  for (int32_t i = 0; i < LIST_LENGTH(pPhyciNode->vtbScan.pScanCols); ++i) {
×
1495
    SColumnNode* pNode = (SColumnNode*)nodesListGetNode(pPhyciNode->vtbScan.pScanCols, i);
×
1496
    QUERY_CHECK_NULL(pNode, code, line, _return, terrno);
×
1497
    QUERY_CHECK_NULL(taosArrayPush(pInfo->vtbScan.readColList, &pNode->colId), code, line, _return, terrno);
×
1498
  }
1499

1500
  pInfo->vtbScan.childTableList = taosArrayInit(10, sizeof(uint64_t));
×
1501
  QUERY_CHECK_NULL(pInfo->vtbScan.childTableList, code, line, _return, terrno);
×
1502
  QUERY_CHECK_CODE(pHandle->api.metaFn.getChildTableList(pHandle->vnode, pInfo->vtbScan.suid, pInfo->vtbScan.childTableList), line, _return);
×
1503

1504
  pInfo->vtbScan.dbVgInfoMap = taosHashInit(taosArrayGetSize(pInfo->vtbScan.childTableList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
×
1505
  QUERY_CHECK_NULL(pInfo->vtbScan.dbVgInfoMap, code, line, _return, terrno);
×
1506

1507
  return code;
×
1508
_return:
×
1509
  // no need to destroy array and hashmap allocated in this function,
1510
  // since the operator's destroy function will take care of it
1511
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
1512
  return code;
×
1513
}
1514

1515
int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
11,035✔
1516
                                       SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo,
1517
                                       SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
1518
  QRY_PARAM_CHECK(pOptrInfo);
11,035!
1519

1520
  int32_t                    code = TSDB_CODE_SUCCESS;
11,035✔
1521
  __optr_fn_t                nextFp = NULL;
11,035✔
1522
  SOperatorInfo*             pOperator = NULL;
11,035✔
1523
  SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo));
11,035!
1524
  if (pInfo == NULL) {
11,035!
1525
    code = terrno;
×
1526
    goto _error;
×
1527
  }
1528

1529
  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
11,035!
1530
  if (pOperator == NULL) {
11,035!
1531
    code = terrno;
×
1532
    goto _error;
×
1533
  }
1534

1535
  pTaskInfo->dynamicTask = (int8_t)pPhyciNode->node.dynamicOp;
11,035✔
1536

1537
  code = appendDownstream(pOperator, pDownstream, numOfDownstream);
11,035✔
1538
  if (TSDB_CODE_SUCCESS != code) {
11,035!
1539
    goto _error;
×
1540
  }
1541

1542
  setOperatorInfo(pOperator, "DynQueryCtrlOperator", QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, false, OP_NOT_OPENED,
11,035✔
1543
                  pInfo, pTaskInfo);
1544

1545
  pInfo->qType = pPhyciNode->qType;
11,035✔
1546
  switch (pInfo->qType) {
11,035!
1547
    case DYN_QTYPE_STB_HASH:
11,035✔
1548
      TAOS_MEMCPY(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
11,035✔
1549
      pInfo->stbJoin.pOutputDataBlockDesc = pPhyciNode->node.pOutputDataBlockDesc;
11,035✔
1550
      code = initSeqStbJoinTableHash(&pInfo->stbJoin.ctx.prev, pInfo->stbJoin.basic.batchFetch);
11,035✔
1551
      if (TSDB_CODE_SUCCESS != code) {
11,035!
1552
        goto _error;
×
1553
      }
1554
      nextFp = seqStableJoin;
11,035✔
1555
      break;
11,035✔
1556
    case DYN_QTYPE_VTB_SCAN:
×
1557
      QUERY_CHECK_CODE(initVtbScanInfo(pOperator, pInfo, pHandle, pPhyciNode, pTaskInfo), code, _error);
×
1558
      nextFp = vtbScan;
×
1559
      break;
×
1560
    default:
×
1561
      qError("unsupported dynamic query ctrl type: %d", pInfo->qType);
×
1562
      code = TSDB_CODE_INVALID_PARA;
×
1563
      goto _error;
×
1564
  }
1565

1566
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, nextFp, NULL, destroyDynQueryCtrlOperator, optrDefaultBufFn,
11,035✔
1567
                                         NULL, optrDefaultGetNextExtFn, NULL);
1568

1569
  *pOptrInfo = pOperator;
11,035✔
1570
  return TSDB_CODE_SUCCESS;
11,035✔
1571

1572
_error:
×
1573
  if (pInfo != NULL) {
×
1574
    destroyDynQueryCtrlOperator(pInfo);
×
1575
  }
1576

1577
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
1578
  pTaskInfo->code = code;
×
1579
  return code;
×
1580
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc