• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4881

14 Dec 2025 03:48AM UTC coverage: 60.617% (+0.5%) from 60.092%
#4881

push

travis-ci

web-flow
test: update coverage workflow time (#33918)

156854 of 258761 relevant lines covered (60.62%)

75258957.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.85
/source/libs/executor/src/hashjoinoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "operator.h"
20
#include "os.h"
21
#include "querynodes.h"
22
#include "querytask.h"
23
#include "tcompare.h"
24
#include "tdatablock.h"
25
#include "thash.h"
26
#include "tmsg.h"
27
#include "ttypes.h"
28
#include "hashjoin.h"
29
#include "functionMgt.h"
30

31

32
/*
 * Tell whether a result block holding blkRows rows should be returned now.
 *
 * When no limit is configured (limit == INT64_MAX) or a final filter exists,
 * blkRows is compared with the block threshold. Otherwise the rows already
 * produced plus blkRows are compared with the limit.
 */
bool hJoinBlkReachThreshold(SHJoinOperatorInfo* pInfo, int64_t blkRows) {
  bool limitApplies = (INT64_MAX != pInfo->ctx.limit) && (NULL == pInfo->pFinFilter);
  if (limitApplies) {
    return (pInfo->execInfo.resRows + blkRows) >= pInfo->ctx.limit;
  }

  return blkRows >= pInfo->blkThreshold;
}
39

40
/*
 * Exchange the operator's mid block and final block and clear the
 * "mid remains" flag in the join context. Always returns TSDB_CODE_SUCCESS.
 */
int32_t hJoinHandleMidRemains(SHJoinOperatorInfo* pJoin, SHJoinCtx* pCtx) {
  pCtx->midRemains = false;
  TSWAP(pJoin->midBlk, pJoin->finBlk);
  return TSDB_CODE_SUCCESS;
}
47

48
/*
 * Move rows from the mid block (*ppMid) into the final block (*ppFin).
 *
 * If everything fits into the final block's capacity, all rows are merged,
 * the mid block is cleaned up and pCtx->midRemains is cleared. Otherwise only
 * as many rows as fit are merged (taken from the tail of the mid block), the
 * mid block is shrunk by that amount and pCtx->midRemains is set.
 *
 * Returns TSDB_CODE_SUCCESS, or the error raised by the merge helpers.
 */
int32_t hJoinCopyMergeMidBlk(SHJoinCtx* pCtx, SSDataBlock** ppMid, SSDataBlock** ppFin) {
  SSDataBlock* pSrc = *ppMid;
  SSDataBlock* pDst = *ppFin;

  int32_t totalRows = pDst->info.rows + pSrc->info.rows;
  if (totalRows > pDst->info.capacity) {
    // only the free space of the destination can be filled
    int32_t copyRows = pDst->info.capacity - pDst->info.rows;
    HJ_ERR_RET(blockDataMergeNRows(pDst, pSrc, pSrc->info.rows - copyRows, copyRows));
    blockDataShrinkNRows(pSrc, copyRows);
    pCtx->midRemains = true;
    return TSDB_CODE_SUCCESS;
  }

  HJ_ERR_RET(blockDataMerge(pDst, pSrc));
  blockDataCleanup(pSrc);
  pCtx->midRemains = false;

  return TSDB_CODE_SUCCESS;
}
82

83
/*
 * Select the join implementation callback for the operator.
 *
 * Only JOIN_TYPE_INNER installs a callback (hInnerJoinDo). For every other
 * join type, including LEFT/RIGHT outer, joinFp is left untouched; the outer
 * join implementation (hLeftJoinDo) is not enabled yet (TOOPEN).
 */
int32_t hJoinSetImplFp(SHJoinOperatorInfo* pJoin) {
  if (JOIN_TYPE_INNER == pJoin->joinType) {
    pJoin->joinFp = hInnerJoinDo;
  }

  return TSDB_CODE_SUCCESS;
}
105

106

107
/*
 * Evaluate the primary-key expression for rows [startIdx, endIdx] (inclusive)
 * of pBlock and store the result in the expression's target column.
 *
 * Nothing is done when the table has no primary-key expression. Otherwise each
 * int64 value of the source primary column is truncated to a multiple of
 * primCtx.truncateUnit; when primCtx.timezoneUnit is non-zero the timezone
 * offset is added before taking the remainder.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when
 * either column cannot be found in the block.
 */
int32_t hJoinLaunchPrimExpr(SSDataBlock* pBlock, SHJoinTableCtx* pTable, int32_t startIdx, int32_t endIdx) {
  if (NULL == pTable->primExpr) {
    return TSDB_CODE_SUCCESS;
  }

  SHJoinPrimExprCtx* pCtx = &pTable->primCtx;
  SColumnInfoData* pPrimIn = taosArrayGet(pBlock->pDataBlock, pTable->primCol->srcSlot);
  SColumnInfoData* pPrimOut = taosArrayGet(pBlock->pDataBlock, pTable->primCtx.targetSlotId);
  // same defensive check the key/value column setters perform on taosArrayGet results
  if (NULL == pPrimIn || NULL == pPrimOut) {
    qError("fail to get prim expr col, srcSlot:%d, targetSlot:%d, total:%d", (int32_t)pTable->primCol->srcSlot,
           (int32_t)pTable->primCtx.targetSlotId, (int32_t)taosArrayGetSize(pBlock->pDataBlock));
    QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  int64_t* pIn = (int64_t*)pPrimIn->pData;
  int64_t* pOut = (int64_t*)pPrimOut->pData;

  if (0 != pCtx->timezoneUnit) {
    for (int32_t i = startIdx; i <= endIdx; ++i) {
      pOut[i] = pIn[i] - (pIn[i] + pCtx->timezoneUnit) % pCtx->truncateUnit;
    }
  } else {
    for (int32_t i = startIdx; i <= endIdx; ++i) {
      pOut[i] = pIn[i] / pCtx->truncateUnit * pCtx->truncateUnit;
    }
  }

  return TSDB_CODE_SUCCESS;
}
127

128

129
// Count the rows in the singly linked list that starts at pRow (0 for NULL).
static int64_t hJoinGetSingleKeyRowsNum(SBufRowInfo* pRow) {
  int64_t count = 0;
  for (SBufRowInfo* pCur = pRow; NULL != pCur; pCur = pCur->next) {
    ++count;
  }
  return count;
}
137

138
/*
 * Sum the number of buffered rows over every key group stored in pHash.
 * Each group's rows form a linked list that is counted by
 * hJoinGetSingleKeyRowsNum.
 */
static int64_t hJoinGetRowsNumOfKeyHash(SSHashObj* pHash) {
  SGroupData* pGroup = NULL;
  int32_t     iter = 0;
  int64_t     rowsNum = 0;

  while (NULL != (pGroup = tSimpleHashIterate(pHash, pGroup, &iter))) {
    rowsNum += hJoinGetSingleKeyRowsNum(pGroup->rows);
  }

  return rowsNum;
}
152

153
/*
 * Build the join key column descriptors of pTable from the column nodes in
 * pList (source slot, var-data flag, byte width). When more than one key
 * column exists, a key buffer sized to the sum of the column widths is also
 * allocated.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when an allocation fails.
 */
static int32_t hJoinInitKeyColsInfo(SHJoinTableCtx* pTable, SNodeList* pList) {
  pTable->keyNum = LIST_LENGTH(pList);

  pTable->keyCols = taosMemoryMalloc(pTable->keyNum * sizeof(SHJoinColInfo));
  if (NULL == pTable->keyCols) {
    return terrno;
  }

  int64_t totalBytes = 0;
  int32_t idx = 0;
  SNode*  pNode = NULL;
  FOREACH(pNode, pList) {
    SColumnNode*   pKeyNode = (SColumnNode*)pNode;
    SHJoinColInfo* pKey = &pTable->keyCols[idx];

    pKey->srcSlot = pKeyNode->slotId;
    pKey->vardata = IS_VAR_DATA_TYPE(pKeyNode->node.resType.type);
    pKey->bytes = pKeyNode->node.resType.bytes;

    totalBytes += pKeyNode->node.resType.bytes;
    ++idx;
  }

  // a single key is used in place; only composite keys need a buffer
  if (pTable->keyNum <= 1) {
    return TSDB_CODE_SUCCESS;
  }

  pTable->keyBuf = taosMemoryMalloc(totalBytes);
  if (NULL == pTable->keyBuf) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
182

183
// Store in *colNum how many target columns of pList come from data block blkId.
static void hJoinGetValColsNum(SNodeList* pList, int32_t blkId, int32_t* colNum) {
  *colNum = 0;

  SNode* pNode = NULL;
  FOREACH(pNode, pList) {
    SColumnNode* pSrcCol = (SColumnNode*)((STargetNode*)pNode)->pExpr;
    if (blkId == pSrcCol->dataBlockId) {
      *colNum += 1;
    }
  }
}
195

196
/*
 * Look for slotId among the first keyNum key columns in pKeys.
 * On a match the key index is written to *pKeyIdx and true is returned;
 * otherwise *pKeyIdx is left untouched and false is returned.
 */
static bool hJoinIsValColInKeyCols(int16_t slotId, int32_t keyNum, SHJoinColInfo* pKeys, int32_t* pKeyIdx) {
  int32_t k = 0;
  while (k < keyNum) {
    if (slotId == pKeys[k].srcSlot) {
      *pKeyIdx = k;
      return true;
    }
    ++k;
  }

  return false;
}
206

207
/*
 * Build the value column descriptors of pTable from the target list pList.
 *
 * Only targets whose source column belongs to this table's data block are
 * considered. A value column that is also a key column is flagged keyCol and
 * its srcSlot receives the index of the matching key column; other columns
 * keep their source slot id. Indexes of non-key var-data columns are recorded
 * in valVarCols, and valBufSize accumulates the fixed-width non-key columns
 * plus the null bitmap.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when an allocation fails.
 */
static int32_t hJoinInitValColsInfo(SHJoinTableCtx* pTable, SNodeList* pList) {
  hJoinGetValColsNum(pList, pTable->blkId, &pTable->valNum);
  if (0 == pTable->valNum) {
    return TSDB_CODE_SUCCESS;
  }

  pTable->valCols = taosMemoryMalloc(pTable->valNum * sizeof(SHJoinColInfo));
  if (NULL == pTable->valCols) {
    return terrno;
  }

  int32_t valIdx = 0;
  int32_t nonKeyNum = 0;  // number of columns stored in the row buffer (drives the bitmap size)
  SNode*  pNode = NULL;
  FOREACH(pNode, pList) {
    STargetNode* pTarget = (STargetNode*)pNode;
    SColumnNode* pColNode = (SColumnNode*)pTarget->pExpr;
    if (pColNode->dataBlockId == pTable->blkId) {
      SHJoinColInfo* pVal = &pTable->valCols[valIdx];

      if (hJoinIsValColInKeyCols(pColNode->slotId, pTable->keyNum, pTable->keyCols, &pVal->srcSlot)) {
        pVal->keyCol = true;
      } else {
        pVal->keyCol = false;
        pVal->srcSlot = pColNode->slotId;
        pTable->valColExist = true;
        nonKeyNum++;
      }

      pVal->dstSlot = pTarget->slotId;
      pVal->vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type);

      if (pVal->vardata && !pVal->keyCol) {
        // lazily create the list of var-data value column indexes
        if (NULL == pTable->valVarCols) {
          pTable->valVarCols = taosArrayInit(pTable->valNum, sizeof(int32_t));
          if (NULL == pTable->valVarCols) {
            return terrno;
          }
        }
        if (NULL == taosArrayPush(pTable->valVarCols, &valIdx)) {
          return terrno;
        }
      }

      pVal->bytes = pColNode->node.resType.bytes;
      if (!pVal->keyCol && !pVal->vardata) {
        pTable->valBufSize += pColNode->node.resType.bytes;
      }

      valIdx++;
    }
  }

  pTable->valBitMapSize = BitmapLen(nonKeyNum);
  pTable->valBufSize += pTable->valBitMapSize;

  return TSDB_CODE_SUCCESS;
}
259

260
/*
 * Allocate the primary column mapping of pTable and record slotId as its
 * source slot. Returns terrno when the allocation fails.
 */
static int32_t hJoinInitPrimKeyInfo(SHJoinTableCtx* pTable, int32_t slotId) {
  SHJoinColMap* pMap = taosMemoryMalloc(sizeof(SHJoinColMap));
  pTable->primCol = pMap;
  if (NULL == pMap) {
    return terrno;
  }

  pMap->srcSlot = slotId;
  return TSDB_CODE_SUCCESS;
}
270

271

272
/*
 * Initialise the primary-key expression context of a table.
 *
 * Without an expression node the target slot is simply the primary column's
 * source slot. Otherwise the node must be a target node wrapping a
 * TIMETRUNCATE function with 4 or 5 parameters; the truncate unit is read from
 * parameter 1, and a timezone offset is computed when the unit is at least one
 * day and either no "current timezone" parameter exists or its value is 1.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the
 * node does not have the expected shape.
 */
static int32_t hJoinInitPrimExprCtx(SNode* pNode, SHJoinPrimExprCtx* pCtx, SHJoinTableCtx* pTable) {
  if (NULL == pNode) {
    pCtx->targetSlotId = pTable->primCol->srcSlot;
    return TSDB_CODE_SUCCESS;
  }

  if (QUERY_NODE_TARGET != nodeType(pNode)) {
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  STargetNode* pTarget = (STargetNode*)pNode;
  if (QUERY_NODE_FUNCTION != nodeType(pTarget->pExpr)) {
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  SFunctionNode* pFunc = (SFunctionNode*)pTarget->pExpr;
  if (FUNCTION_TYPE_TIMETRUNCATE != pFunc->funcType) {
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  bool hasCurrTz = (5 == pFunc->pParameterList->length);
  if (!hasCurrTz && 4 != pFunc->pParameterList->length) {
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  SValueNode* pUnit = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
  SValueNode* pCurrTz = NULL;
  SValueNode* pTimeZone = NULL;
  if (hasCurrTz) {
    // 5-parameter form: the timezone moves to index 4
    pCurrTz = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 2);
    pTimeZone = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 4);
  } else {
    pTimeZone = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 3);
  }

  pCtx->truncateUnit = pUnit->typeData;

  bool useTz = (NULL == pCurrTz) || (1 == pCurrTz->typeData);
  if (useTz && pCtx->truncateUnit >= (86400 * TSDB_TICK_PER_SECOND(pFunc->node.resType.precision))) {
    pCtx->timezoneUnit = offsetFromTz(varDataVal(pTimeZone->datum.p), TSDB_TICK_PER_SECOND(pFunc->node.resType.precision));
  }

  pCtx->targetSlotId = pTarget->slotId;

  return TSDB_CODE_SUCCESS;
}
309

310

311
/*
 * Initialise the table context pJoin->tbs[idx] from the physical plan node.
 *
 * idx 0 uses the left side's key list / primary slot and bit 0x1 of
 * timeRangeTarget; any other idx uses the right side's and bit 0x2. The
 * function then sets up the primary key mapping, key columns, value columns,
 * copies the input statistics and initialises the primary expression context.
 *
 * Returns TSDB_CODE_SUCCESS or the first error reported by a helper.
 */
static int32_t hJoinInitTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNode, SOperatorInfo** pDownstream, int32_t idx, SQueryStat* pStat) {
  const bool      isLeft = (0 == idx);
  SHJoinTableCtx* pTable = &pJoin->tbs[idx];
  SNodeList*      pKeyList = NULL;

  pTable->downStream = pDownstream[idx];
  pTable->blkId = pDownstream[idx]->resultDataBlockId;

  if (isLeft) {
    pKeyList = pJoinNode->pOnLeft;
    pTable->hasTimeRange = pJoinNode->timeRangeTarget & 0x1;
  } else {
    pKeyList = pJoinNode->pOnRight;
    pTable->hasTimeRange = pJoinNode->timeRangeTarget & 0x2;
  }

  HJ_ERR_RET(hJoinInitPrimKeyInfo(pTable, isLeft ? pJoinNode->leftPrimSlotId : pJoinNode->rightPrimSlotId));

  int32_t code = hJoinInitKeyColsInfo(pTable, pKeyList);
  if (code) {
    return code;
  }

  code = hJoinInitValColsInfo(pTable, pJoinNode->pTargets);
  if (code) {
    return code;
  }

  TAOS_MEMCPY(&pTable->inputStat, pStat, sizeof(*pStat));

  HJ_ERR_RET(hJoinInitPrimExprCtx(pTable->primExpr, &pTable->primCtx, pTable));

  return TSDB_CODE_SUCCESS;
}
341

342
/*
 * Copy the join type/subtype from the plan node and decide which of the two
 * tables is the build side and which is the probe side.
 *
 *   INNER / FULL : table 0 builds unless it has more input rows than table 1
 *   LEFT         : table 1 builds, table 0 probes
 *   RIGHT / other: table 0 builds, table 1 probes
 *
 * The primary-key expressions of the plan node are then attached to the
 * build/probe contexts according to that choice.
 */
static void hJoinSetBuildAndProbeTable(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJoinNode) {
  bool buildOnRight = false;

  pInfo->joinType = pJoinNode->joinType;
  pInfo->subType = pJoinNode->subType;

  switch (pInfo->joinType) {
    case JOIN_TYPE_INNER:
    case JOIN_TYPE_FULL:
      buildOnRight = !(pInfo->tbs[0].inputStat.inputRowNum <= pInfo->tbs[1].inputStat.inputRowNum);
      break;
    case JOIN_TYPE_LEFT:
      buildOnRight = true;
      break;
    case JOIN_TYPE_RIGHT:
    default:
      break;
  }

  int32_t buildIdx = buildOnRight ? 1 : 0;
  int32_t probeIdx = buildOnRight ? 0 : 1;

  pInfo->pBuild = &pInfo->tbs[buildIdx];
  pInfo->pProbe = &pInfo->tbs[probeIdx];

  pInfo->pBuild->downStreamIdx = buildIdx;
  pInfo->pProbe->downStreamIdx = probeIdx;

  if (buildOnRight) {
    pInfo->pBuild->primExpr = pJoinNode->rightPrimExpr;
    pInfo->pProbe->primExpr = pJoinNode->leftPrimExpr;
  } else {
    pInfo->pBuild->primExpr = pJoinNode->leftPrimExpr;
    pInfo->pProbe->primExpr = pJoinNode->rightPrimExpr;
  }
}
386

387
/*
 * Allocate the result column map: one int8_t per target column, set to 1 when
 * the target's source column belongs to the build table's data block and left
 * 0 otherwise. Returns terrno when the allocation fails.
 */
static int32_t hJoinBuildResColsMap(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJoinNode) {
  pInfo->pResColNum = pJoinNode->pTargets->length;
  pInfo->pResColMap = taosMemoryCalloc(pJoinNode->pTargets->length, sizeof(int8_t));
  if (NULL == pInfo->pResColMap) {
    return terrno;
  }

  int32_t colIdx = 0;
  SNode*  pNode = NULL;
  FOREACH(pNode, pJoinNode->pTargets) {
    SColumnNode* pSrcCol = (SColumnNode*)((STargetNode*)pNode)->pExpr;
    if (pInfo->pBuild->blkId == pSrcCol->dataBlockId) {
      pInfo->pResColMap[colIdx] = 1;
    }
    ++colIdx;
  }

  return TSDB_CODE_SUCCESS;
}
408

409

410
/*
 * Allocate a new buffer page of HASH_JOIN_DEFAULT_PAGE_SIZE bytes and append
 * its descriptor to pRowBufs.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the page allocation or the array
 * push fails. On a failed push the freshly allocated page is released so it
 * does not leak.
 */
static FORCE_INLINE int32_t hJoinAddPageToBufs(SArray* pRowBufs) {
  SBufPageInfo page;
  page.pageSize = HASH_JOIN_DEFAULT_PAGE_SIZE;
  page.offset = 0;
  page.data = taosMemoryMalloc(page.pageSize);
  if (NULL == page.data) {
    return terrno;
  }

  if (NULL == taosArrayPush(pRowBufs, &page)) {
    // capture the error before freeing, then drop the page nobody owns
    int32_t code = terrno;
    taosMemoryFree(page.data);
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
424

425
/*
 * Create the row buffer page array of the operator and add the first page to
 * it. Returns terrno when the array cannot be created, otherwise the result of
 * adding the page.
 */
static int32_t hJoinInitBufPages(SHJoinOperatorInfo* pInfo) {
  SArray* pPages = taosArrayInit(32, sizeof(SBufPageInfo));
  pInfo->pRowBufs = pPages;
  if (NULL == pPages) {
    return terrno;
  }

  return hJoinAddPageToBufs(pPages);
}
433

434
/*
 * Release everything a table context owns: key column descriptors, key buffer,
 * value column descriptors, the var-data column index list and the primary
 * column mapping. Every pointer is reset to NULL afterwards so a repeated call
 * on the same context does not free anything twice.
 */
static void hJoinFreeTableInfo(SHJoinTableCtx* pTable) {
  taosMemoryFreeClear(pTable->keyCols);
  taosMemoryFreeClear(pTable->keyBuf);
  taosMemoryFreeClear(pTable->valCols);
  taosArrayDestroy(pTable->valVarCols);
  pTable->valVarCols = NULL;
  taosMemoryFreeClear(pTable->primCol);
}
441

442
static void hJoinFreeBufPage(void* param) {
216,989✔
443
  SBufPageInfo* pInfo = (SBufPageInfo*)param;
216,989✔
444
  taosMemoryFree(pInfo->data);
216,989✔
445
}
216,989✔
446

447
/*
 * Free every row node linked from each group in the key hash, destroy the hash
 * itself and reset *ppHash to NULL. A NULL ppHash or a NULL *ppHash is ignored.
 */
static void hJoinDestroyKeyHash(SSHashObj** ppHash) {
  if (NULL == ppHash || NULL == (*ppHash)) {
    return;
  }

  int32_t iter = 0;
  void*   pIte = tSimpleHashIterate(*ppHash, NULL, &iter);
  while (NULL != pIte) {
    SGroupData* pGroup = pIte;
    for (SBufRowInfo* pRow = pGroup->rows; pRow;) {
      // remember the successor before the node goes away
      SBufRowInfo* pNext = pRow->next;
      taosMemoryFree(pRow);
      pRow = pNext;
    }
    pIte = tSimpleHashIterate(*ppHash, pIte, &iter);
  }

  tSimpleHashCleanup(*ppHash);
  *ppHash = NULL;
}
468

469
/*
 * Resolve the buffered value data of a row.
 *
 * *ppData is set to NULL when the row carries no page (pageId == (uint16_t)-1),
 * otherwise to the row's offset inside its page. Returns
 * TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR (via QRY_ERR_RET) when the page id
 * cannot be found in pRowBufs.
 */
static FORCE_INLINE int32_t hJoinRetrieveColDataFromRowBufs(SArray* pRowBufs, SBufRowInfo* pRow, char** ppData) {
  *ppData = NULL;

  if ((uint16_t)-1 != pRow->pageId) {
    SBufPageInfo *pPage = taosArrayGet(pRowBufs, pRow->pageId);
    if (NULL == pPage) {
      qError("fail to get %d page, total:%d", pRow->pageId, (int32_t)taosArrayGetSize(pRowBufs));
      QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }

    *ppData = pPage->data + pRow->offset;
  }

  return TSDB_CODE_SUCCESS;
}
485

486
/*
 * Append rowNum joined rows to pRes, starting at pRes->info.rows (the caller
 * updates the row count afterwards).
 *
 * Build-side columns are filled row by row from the linked list starting at
 * pStart: key columns come from the probe table's current key data, other
 * columns from the row's buffered value data, honouring the per-row null
 * bitmap stored in front of the values. Probe-side columns are filled once,
 * while handling the first row, by repeating the current probe row rowNum
 * times.
 *
 * Returns TSDB_CODE_SUCCESS or the first error reported by a helper.
 */
static int32_t hJoinCopyResRowsToBlock(SHJoinOperatorInfo* pJoin, int32_t rowNum, SBufRowInfo* pStart, SSDataBlock* pRes) {
  SHJoinTableCtx* pBuild = pJoin->pBuild;
  SHJoinTableCtx* pProbe = pJoin->pProbe;
  SBufRowInfo*    pRow = pStart;
  char*           pData = NULL;
  int32_t         code = 0;

  for (int32_t r = 0; r < rowNum; ++r) {
    HJ_ERR_RET(hJoinRetrieveColDataFromRowBufs(pJoin->pRowBufs, pRow, &pData));

    char*   pValData = pData + pBuild->valBitMapSize;  // values follow the null bitmap
    char*   pKeyData = pProbe->keyData;
    int32_t buildIdx = 0;
    int32_t buildValIdx = 0;
    int32_t probeIdx = 0;

    for (int32_t i = 0; i < pJoin->pResColNum; ++i) {
      if (pJoin->pResColMap[i]) {
        SHJoinColInfo*   pValCol = &pBuild->valCols[buildIdx];
        SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pValCol->dstSlot);

        if (pValCol->keyCol) {
          code = colDataSetVal(pDst, pRes->info.rows + r, pKeyData, false);
          if (code) {
            return code;
          }
          pKeyData += pValCol->vardata ? varDataTLen(pKeyData) : pValCol->bytes;
        } else {
          if (BMIsNull(pData, buildValIdx)) {
            code = colDataSetVal(pDst, pRes->info.rows + r, NULL, true);
            if (code) {
              return code;
            }
          } else {
            code = colDataSetVal(pDst, pRes->info.rows + r, pValData, false);
            if (code) {
              return code;
            }
            pValData += pValCol->vardata ? varDataTLen(pValData) : pValCol->bytes;
          }
          buildValIdx++;
        }

        buildIdx++;
      } else if (0 == r) {
        SHJoinColInfo*   pProbeCol = &pProbe->valCols[probeIdx];
        SColumnInfoData* pSrc = taosArrayGet(pJoin->ctx.pProbeData->pDataBlock, pProbeCol->srcSlot);
        SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pProbeCol->dstSlot);

        code = colDataCopyNItems(pDst, pRes->info.rows, colDataGetData(pSrc, pJoin->ctx.probeStartIdx), rowNum, colDataIsNull_s(pSrc, pJoin->ctx.probeStartIdx));
        if (code) {
          return code;
        }

        probeIdx++;
      }
    }

    pRow = pRow->next;
  }

  return TSDB_CODE_SUCCESS;
}
542

543
/*
 * Append `rows` non-matching probe rows, starting at probe row startIdx, to
 * pRes. Build-side result columns are filled with NULLs, probe-side columns
 * are copied from the current probe block, and pRes->info.rows is advanced by
 * `rows`.
 *
 * Returns TSDB_CODE_SUCCESS or the error raised while copying a probe column.
 */
int32_t hJoinCopyNMatchRowsToBlock(SHJoinOperatorInfo* pJoin, SSDataBlock* pRes, int32_t startIdx, int32_t rows) {
  SHJoinTableCtx* pBuild = pJoin->pBuild;
  SHJoinTableCtx* pProbe = pJoin->pProbe;
  int32_t         nextBuild = 0;
  int32_t         nextProbe = 0;
  int32_t         code = 0;

  for (int32_t i = 0; i < pJoin->pResColNum; ++i) {
    if (!pJoin->pResColMap[i]) {
      SColumnInfoData* pSrc = taosArrayGet(pJoin->ctx.pProbeData->pDataBlock, pProbe->valCols[nextProbe].srcSlot);
      SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pProbe->valCols[nextProbe].dstSlot);

      QRY_ERR_RET(colDataAssignNRows(pDst, pRes->info.rows, pSrc, startIdx, rows));

      nextProbe++;
    } else {
      SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pBuild->valCols[nextBuild].dstSlot);
      colDataSetNItemsNull(pDst, pRes->info.rows, rows);

      nextBuild++;
    }
  }

  pRes->info.rows += rows;

  return TSDB_CODE_SUCCESS;
}
570

571

572

573
/*
 * Append as many of the pending build rows (ctx.pBuildRow list) as still fit
 * into pRes, joined with the current probe row.
 *
 * ctx.pBuildRow is advanced past the consumed rows, execInfo.resRows and
 * pRes->info.rows are updated, and *allFetched reports whether the list has
 * been exhausted. On a copy error the task code is set and the function
 * long-jumps out through the task environment.
 */
void hJoinAppendResToBlock(struct SOperatorInfo* pOperator, SSDataBlock* pRes, bool* allFetched) {
  SHJoinOperatorInfo* pJoin = pOperator->info;
  SHJoinCtx*          pCtx = &pJoin->ctx;
  SBufRowInfo*        pFirst = pCtx->pBuildRow;
  int32_t             rowNum = 0;
  int32_t             resNum = pRes->info.rows;

  // walk the list only to learn how many rows fit; the copy happens below
  for (; pCtx->pBuildRow && (resNum < pRes->info.capacity); pCtx->pBuildRow = pCtx->pBuildRow->next) {
    rowNum++;
    resNum++;
  }

  pJoin->execInfo.resRows += rowNum;

  int32_t code = hJoinCopyResRowsToBlock(pJoin, rowNum, pFirst, pRes);
  if (code) {
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }

  pRes->info.rows = resNum;
  *allFetched = (NULL == pCtx->pBuildRow);
}
597

598

599
/*
 * Prepare the join key of row rowIdx for the given table.
 *
 * With a single key column, keyData points straight into the column data; with
 * several, the column values are concatenated into keyBuf and keyData points
 * there. The key length is stored in *pBufLen when pBufLen is not NULL.
 *
 * Returns true when any key column is NULL for this row (no key is produced),
 * false otherwise.
 */
bool hJoinCopyKeyColsDataToBuf(SHJoinTableCtx* pTable, int32_t rowIdx, size_t *pBufLen) {
  char*  pSrc = NULL;
  size_t keyLen = 0;

  if (1 != pTable->keyNum) {
    for (int32_t i = 0; i < pTable->keyNum; ++i) {
      SHJoinColInfo* pKey = &pTable->keyCols[i];
      if (colDataIsNull_s(pKey->colData, rowIdx)) {
        return true;
      }

      if (pKey->vardata) {
        pSrc = pKey->data + pKey->offset[rowIdx];
        TAOS_MEMCPY(pTable->keyBuf + keyLen, pSrc, varDataTLen(pSrc));
        keyLen += varDataTLen(pSrc);
      } else {
        pSrc = pKey->data + pKey->bytes * rowIdx;
        TAOS_MEMCPY(pTable->keyBuf + keyLen, pSrc, pKey->bytes);
        keyLen += pKey->bytes;
      }
    }
    pTable->keyData = pTable->keyBuf;
  } else {
    SHJoinColInfo* pKey = &pTable->keyCols[0];
    if (colDataIsNull_s(pKey->colData, rowIdx)) {
      return true;
    }

    if (pKey->vardata) {
      pSrc = pKey->data + pKey->offset[rowIdx];
      keyLen = varDataTLen(pSrc);
    } else {
      pSrc = pKey->data + pKey->bytes * rowIdx;
      keyLen = pKey->bytes;
    }
    // single key: no copy needed, reference the column data directly
    pTable->keyData = pSrc;
  }

  if (NULL != pBufLen) {
    *pBufLen = keyLen;
  }

  return false;
}
639

640
/*
 * Bind the key column descriptors of pTable to the columns of pBlock.
 *
 * Each key column is looked up by its source slot and validated against the
 * recorded var-data flag and byte width before its data pointer, var-data
 * offsets (var-data columns only) and column handle are stored.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when a
 * column is missing, or TSDB_CODE_INVALID_PARA on a type/width mismatch.
 */
static int32_t hJoinSetKeyColsData(SSDataBlock* pBlock, SHJoinTableCtx* pTable) {
  for (int32_t i = 0; i < pTable->keyNum; ++i) {
    SHJoinColInfo*   pKey = &pTable->keyCols[i];
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pKey->srcSlot);
    if (NULL == pCol) {
      qError("fail to get %d col, total:%d", pTable->keyCols[i].srcSlot, (int32_t)taosArrayGetSize(pBlock->pDataBlock));
      QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }

    if (pKey->vardata != IS_VAR_DATA_TYPE(pCol->info.type))  {
      qError("column type mismatch, idx:%d, slotId:%d, type:%d, vardata:%d", i, pTable->keyCols[i].srcSlot, pCol->info.type, pTable->keyCols[i].vardata);
      return TSDB_CODE_INVALID_PARA;
    }

    if (pKey->bytes != pCol->info.bytes)  {
      qError("column bytes mismatch, idx:%d, slotId:%d, bytes:%d, %d", i, pTable->keyCols[i].srcSlot, pCol->info.bytes, pTable->keyCols[i].bytes);
      return TSDB_CODE_INVALID_PARA;
    }

    pKey->data = pCol->pData;
    if (pKey->vardata) {
      pKey->offset = pCol->varmeta.offset;
    }
    pKey->colData = pCol;
  }

  return TSDB_CODE_SUCCESS;
}
664

665
/*
 * Bind the non-key value column descriptors of pTable to the columns of
 * pBlock. Nothing is done when the table has no non-key value column.
 *
 * Each column is looked up by its source slot and validated against the
 * recorded var-data flag and byte width. Fixed-width columns get the null
 * bitmap, var-data columns get the offset array; both get the data pointer.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when a
 * column is missing, or TSDB_CODE_INVALID_PARA on a type/width mismatch.
 */
static int32_t hJoinSetValColsData(SSDataBlock* pBlock, SHJoinTableCtx* pTable) {
  if (!pTable->valColExist) {
    return TSDB_CODE_SUCCESS;
  }

  for (int32_t i = 0; i < pTable->valNum; ++i) {
    SHJoinColInfo* pVal = &pTable->valCols[i];
    if (pVal->keyCol) {
      continue;
    }

    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pVal->srcSlot);
    if (NULL == pCol) {
      qError("fail to get %d col, total:%d", pTable->valCols[i].srcSlot, (int32_t)taosArrayGetSize(pBlock->pDataBlock));
      QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }

    if (pVal->vardata != IS_VAR_DATA_TYPE(pCol->info.type))  {
      qError("column type mismatch, idx:%d, slotId:%d, type:%d, vardata:%d", i, pTable->valCols[i].srcSlot, pCol->info.type, pTable->valCols[i].vardata);
      return TSDB_CODE_INVALID_PARA;
    }

    if (pVal->bytes != pCol->info.bytes)  {
      qError("column bytes mismatch, idx:%d, slotId:%d, bytes:%d, %d", i, pTable->valCols[i].srcSlot, pCol->info.bytes, pTable->valCols[i].bytes);
      return TSDB_CODE_INVALID_PARA;
    }

    pVal->data = pCol->pData;
    if (pVal->vardata) {
      pVal->offset = pCol->varmeta.offset;
    } else {
      pVal->bitMap = pCol->nullbitmap;
    }
  }

  return TSDB_CODE_SUCCESS;
}
697

698

699

700
/*
 * Serialise the non-key value columns of row rowIdx into pTable->valData.
 *
 * Layout: a null bitmap of valBitMapSize bytes (one bit per non-key value
 * column, in column order) followed by the payload of every non-NULL column.
 * NULL columns only set their bit and contribute no payload. Nothing is done
 * when the table has no non-key value column.
 */
static FORCE_INLINE void hJoinCopyValColsDataToBuf(SHJoinTableCtx* pTable, int32_t rowIdx) {
  if (!pTable->valColExist) {
    return;
  }

  char*  pSrc = NULL;
  size_t writePos = pTable->valBitMapSize;
  TAOS_MEMSET(pTable->valData, 0, pTable->valBitMapSize);

  int32_t bitIdx = 0;  // position of the column inside the null bitmap
  for (int32_t i = 0; i < pTable->valNum; ++i) {
    SHJoinColInfo* pVal = &pTable->valCols[i];
    if (pVal->keyCol) {
      continue;
    }

    if (pVal->vardata) {
      if (-1 == pVal->offset[rowIdx]) {
        colDataSetNull_f(pTable->valData, bitIdx);
      } else {
        pSrc = pVal->data + pVal->offset[rowIdx];
        TAOS_MEMCPY(pTable->valData + writePos, pSrc, varDataTLen(pSrc));
        writePos += varDataTLen(pSrc);
      }
    } else {
      if (BMIsNull(pVal->bitMap, rowIdx)) {
        colDataSetNull_f(pTable->valData, bitIdx);
      } else {
        pSrc = pVal->data + pVal->bytes * rowIdx;
        TAOS_MEMCPY(pTable->valData + writePos, pSrc, pVal->bytes);
        writePos += pVal->bytes;
      }
    }

    bitIdx++;
  }
}
732

733

734
/*
 * Reserve bufSize bytes for a row's value data in the last buffer page,
 * appending new pages until one has enough free space.
 *
 * On success *pBuf points at the reserved bytes and pRow records the page id
 * and offset. A bufSize of 0 reserves nothing and marks the row with page id
 * -1 (*pBuf is not touched). Sizes above HASH_JOIN_DEFAULT_PAGE_SIZE are
 * rejected with TSDB_CODE_INVALID_PARA; page allocation errors are returned
 * as-is.
 */
static FORCE_INLINE int32_t hJoinGetValBufFromPages(SArray* pPages, int32_t bufSize, char** pBuf, SBufRowInfo* pRow) {
  if (0 == bufSize) {
    pRow->pageId = -1;
    return TSDB_CODE_SUCCESS;
  }

  if (bufSize > HASH_JOIN_DEFAULT_PAGE_SIZE) {
    qError("invalid join value buf size:%d", bufSize);
    return TSDB_CODE_INVALID_PARA;
  }

  for (;;) {
    SBufPageInfo* pLast = taosArrayGetLast(pPages);
    if ((pLast->pageSize - pLast->offset) < bufSize) {
      // not enough room left: add a fresh page and try again
      int32_t code = hJoinAddPageToBufs(pPages);
      if (code) {
        return code;
      }
      continue;
    }

    *pBuf = pLast->data + pLast->offset;
    pRow->pageId = taosArrayGetSize(pPages) - 1;
    pRow->offset = pLast->offset;
    pLast->offset += bufSize;
    return TSDB_CODE_SUCCESS;
  }
}
761

762
/*
 * Compute how many bytes row rowIdx needs in the value buffer.
 *
 * The fixed part (valBufSize: null bitmap plus fixed-width non-key columns) is
 * extended by the total length of every non-NULL var-data value column. A
 * var-data value whose offset is -1 is NULL: hJoinCopyValColsDataToBuf stores
 * no payload for it, and there is no valid data to read its length from, so it
 * contributes nothing here.
 */
static FORCE_INLINE int32_t hJoinGetValBufSize(SHJoinTableCtx* pTable, int32_t rowIdx) {
  if (NULL == pTable->valVarCols) {
    return pTable->valBufSize;
  }

  int32_t bufLen = pTable->valBufSize;
  int32_t varColNum = taosArrayGetSize(pTable->valVarCols);
  for (int32_t i = 0; i < varColNum; ++i) {
    int32_t*       varColIdx = taosArrayGet(pTable->valVarCols, i);
    SHJoinColInfo* pVal = &pTable->valCols[*varColIdx];
    if (-1 == pVal->offset[rowIdx]) {
      // NULL value: skip instead of reading a length from data[-1]
      continue;
    }

    char* pData = pVal->data + pVal->offset[rowIdx];
    bufLen += varDataTLen(pData);
  }

  return bufLen;
}
778

779

780
/*
 * Create the row node for build row rowIdx and link it into the key hash.
 *
 * A value buffer for the row is reserved from the page list (pTable->valData
 * is pointed at it). When pGroup is NULL the key is new: a group whose list
 * holds only this row is inserted into the hash under the table's current key
 * data. Otherwise the row is pushed at the head of the existing group's list.
 *
 * Returns TSDB_CODE_SUCCESS, the buffer reservation error, or terrno when an
 * allocation or the hash insertion fails; the row node is freed on failure.
 */
static int32_t hJoinAddRowToHashImpl(SHJoinOperatorInfo* pJoin, SGroupData* pGroup, SHJoinTableCtx* pTable, size_t keyLen, int32_t rowIdx) {
  SBufRowInfo* pRow = taosMemoryMalloc(sizeof(SBufRowInfo));
  if (NULL == pRow) {
    return terrno;
  }

  int32_t code = hJoinGetValBufFromPages(pJoin->pRowBufs, hJoinGetValBufSize(pTable, rowIdx), &pTable->valData, pRow);
  if (code) {
    taosMemoryFree(pRow);
    return code;
  }

  if (NULL != pGroup) {
    // existing key: prepend to the group's row list
    pRow->next = pGroup->rows;
    pGroup->rows = pRow;
    return TSDB_CODE_SUCCESS;
  }

  SGroupData group = {0};
  group.rows = pRow;
  pRow->next = NULL;
  if (tSimpleHashPut(pJoin->pKeyHash, pTable->keyData, keyLen, &group, sizeof(group))) {
    taosMemoryFree(pRow);
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
816

817
/*
 * Insert build-side row `rowIdx` of pBlock into the key hash.
 *
 * The key of the row is expected to be already serialized in
 * pJoin->pBuild->keyData with length keyLen. The function binds the value
 * columns of pBlock, looks up the group for the key, lets
 * hJoinAddRowToHashImpl() create the row entry, and finally copies the row's
 * value columns via hJoinCopyValColsDataToBuf().
 */
static int32_t hJoinAddRowToHash(SHJoinOperatorInfo* pJoin, SSDataBlock* pBlock, size_t keyLen, int32_t rowIdx) {
  SHJoinTableCtx* pBuildTb = pJoin->pBuild;

  int32_t rc = hJoinSetValColsData(pBlock, pBuildTb);
  if (rc) {
    return rc;
  }

  // NULL when this key has not been seen before
  SGroupData* pHit = tSimpleHashGet(pJoin->pKeyHash, pBuildTb->keyData, keyLen);

  rc = hJoinAddRowToHashImpl(pJoin, pHit, pBuildTb, keyLen, rowIdx);
  if (rc) {
    return rc;
  }

  hJoinCopyValColsDataToBuf(pBuildTb, rowIdx);

  return TSDB_CODE_SUCCESS;
}
834

835
/*
 * Intersect the rows of pBlock with the time window pRange, using the primary
 * timestamp column stored in slot `primSlot`.
 *
 * Returns false when the column cannot be fetched or when the block lies
 * completely outside the window; *startIdx / *endIdx are left untouched then.
 * Otherwise returns true and stores the first / last row index to process.
 *
 * The first and last rows are taken as the smallest and largest timestamps and
 * the column is binary-searched, so the column is presumably sorted ascending
 * -- NOTE(review): confirm against the callers.
 */
static bool hJoinFilterTimeRange(SSDataBlock* pBlock, STimeWindow* pRange, int32_t primSlot, int32_t* startIdx, int32_t* endIdx) {
  SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, primSlot);
  if (NULL == pTsCol) {
    qError("hash join can't get prim col, slot:%d, slotNum:%d", primSlot, (int32_t)taosArrayGetSize(pBlock->pDataBlock));
    return false;
  }

  TSKEY firstTs = *(TSKEY*)colDataGetData(pTsCol, 0);
  TSKEY lastTs = *(TSKEY*)colDataGetData(pTsCol, (pBlock->info.rows - 1));

  // no overlap at all
  if (lastTs < pRange->skey || firstTs > pRange->ekey) {
    return false;
  }

  // the head must be clipped only if the block starts before the window
  if (firstTs < pRange->skey) {
    TSKEY* pFirstIn = (TSKEY*)taosbsearch(&pRange->skey, pTsCol->pData, pBlock->info.rows, sizeof(TSKEY), compareInt64Val, TD_GE);
    *startIdx = ((uint64_t)pFirstIn - (uint64_t)pTsCol->pData) / sizeof(int64_t);
  } else {
    *startIdx = 0;
  }

  // the tail must be clipped only if the block ends after the window
  if (lastTs > pRange->ekey) {
    TSKEY* pLastIn = (TSKEY*)taosbsearch(&pRange->ekey, pTsCol->pData, pBlock->info.rows, sizeof(TSKEY), compareInt64Val, TD_LE);
    *endIdx = ((uint64_t)pLastIn - (uint64_t)pTsCol->pData) / sizeof(int64_t);
  } else {
    *endIdx = pBlock->info.rows - 1;
  }

  return true;
}
876

877
/*
 * Add the rows of one build-side block to the key hash.
 *
 * If the build table has a time range, the block is first narrowed to the rows
 * inside pJoin->tblTimeRange; a block with no row in range is ignored. Rows for
 * which hJoinCopyKeyColsDataToBuf() returns non-zero are skipped.
 *
 * Returns TSDB_CODE_SUCCESS or the first error reported by a helper.
 */
static int32_t hJoinAddBlockRowsToHash(SSDataBlock* pBlock, SHJoinOperatorInfo* pJoin) {
  SHJoinTableCtx* pBuildTb = pJoin->pBuild;
  int32_t         firstRow = 0;
  int32_t         lastRow = pBlock->info.rows - 1;

  if (pBuildTb->hasTimeRange && !hJoinFilterTimeRange(pBlock, &pJoin->tblTimeRange, pBuildTb->primCol->srcSlot, &firstRow, &lastRow)) {
    return TSDB_CODE_SUCCESS;
  }

  HJ_ERR_RET(hJoinLaunchPrimExpr(pBlock, pBuildTb, firstRow, lastRow));

  int32_t rc = hJoinSetKeyColsData(pBlock, pBuildTb);
  if (rc) {
    return rc;
  }

  size_t  keyLen = 0;
  int32_t row = firstRow;
  while (row <= lastRow) {
    // a zero return means the key of this row was serialized and can be hashed
    if (!hJoinCopyKeyColsDataToBuf(pBuildTb, row, &keyLen)) {
      rc = hJoinAddRowToHash(pJoin, pBlock, keyLen, row);
      if (rc) {
        return rc;
      }
    }
    ++row;
  }

  return rc;
}
904

905
static int32_t hJoinBuildHash(struct SOperatorInfo* pOperator, bool* queryDone) {
216,989✔
906
  SHJoinOperatorInfo* pJoin = pOperator->info;
216,989✔
907
  SSDataBlock*        pBlock = NULL;
216,989✔
908
  int32_t             code = TSDB_CODE_SUCCESS;
216,989✔
909

910
  while (true) {
911
    pBlock = getNextBlockFromDownstream(pOperator, pJoin->pBuild->downStreamIdx);
634,079✔
912
    if (NULL == pBlock) {
634,079✔
913
      break;
216,989✔
914
    }
915

916
    pJoin->execInfo.buildBlkNum++;
417,090✔
917
    pJoin->execInfo.buildBlkRows += pBlock->info.rows;
417,090✔
918

919
    code = hJoinAddBlockRowsToHash(pBlock, pJoin);
417,090✔
920
    if (code) {
417,090✔
921
      return code;
×
922
    }
923
  }
924

925
  if (IS_INNER_NONE_JOIN(pJoin->joinType, pJoin->subType) && tSimpleHashGetSize(pJoin->pKeyHash) <= 0) {
216,989✔
926
    hJoinSetDone(pOperator);
2,290✔
927
    *queryDone = true;
2,290✔
928
  }
929
  
930
  //qTrace("build table rows:%" PRId64, hJoinGetRowsNumOfKeyHash(pJoin->pKeyHash));
931

932
  return TSDB_CODE_SUCCESS;
216,989✔
933
}
934

935
/*
 * Set up the join context for a freshly fetched probe block and run the join
 * function on it once.
 *
 * If the probe table has a time range and the block has no row inside it:
 *   - inner join: the block is dropped;
 *   - other joins: the context is put into E_JOIN_PHASE_POST with
 *     probeEndIdx = -1 and probePostIdx = 0 before the join function is invoked.
 * Otherwise key and value columns are bound to the block, the probe window
 * [startIdx, endIdx] is stored in the context, and the phase is
 * E_JOIN_PHASE_PRE for a non-inner join with startIdx > 0, E_JOIN_PHASE_CUR
 * otherwise.
 */
static int32_t hJoinPrepareStart(struct SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  SHJoinOperatorInfo* pJoin = pOperator->info;
  SHJoinTableCtx*     pProbeTb = pJoin->pProbe;
  SHJoinCtx*          pCtx = &pJoin->ctx;
  int32_t             firstRow = 0;
  int32_t             lastRow = pBlock->info.rows - 1;

  if (pProbeTb->hasTimeRange && !hJoinFilterTimeRange(pBlock, &pJoin->tblTimeRange, pProbeTb->primCol->srcSlot, &firstRow, &lastRow)) {
    if (IS_INNER_NONE_JOIN(pJoin->joinType, pJoin->subType)) {
      return TSDB_CODE_SUCCESS;
    }

    pCtx->probeEndIdx = -1;
    pCtx->probePostIdx = 0;
    pCtx->pProbeData = pBlock;
    pCtx->rowRemains = true;
    pCtx->probePhase = E_JOIN_PHASE_POST;

    HJ_ERR_RET((*pJoin->joinFp)(pOperator));

    return TSDB_CODE_SUCCESS;
  }

  HJ_ERR_RET(hJoinLaunchPrimExpr(pBlock, pProbeTb, firstRow, lastRow));

  int32_t rc = hJoinSetKeyColsData(pBlock, pProbeTb);
  if (rc) {
    return rc;
  }

  rc = hJoinSetValColsData(pBlock, pProbeTb);
  if (rc) {
    return rc;
  }

  pCtx->probeStartIdx = firstRow;
  pCtx->probeEndIdx = lastRow;
  pCtx->pBuildRow = NULL;
  pCtx->pProbeData = pBlock;
  pCtx->rowRemains = true;
  pCtx->probePreIdx = 0;
  pCtx->probePostIdx = lastRow + 1;

  pCtx->probePhase = (!IS_INNER_NONE_JOIN(pJoin->joinType, pJoin->subType) && firstRow > 0)
                         ? E_JOIN_PHASE_PRE
                         : E_JOIN_PHASE_CUR;

  HJ_ERR_RET((*pJoin->joinFp)(pOperator));

  return TSDB_CODE_SUCCESS;
}
982

983
void hJoinSetDone(struct SOperatorInfo* pOperator) {
216,989✔
984
  setOperatorCompleted(pOperator);
216,989✔
985

986
  SHJoinOperatorInfo* pInfo = pOperator->info;
216,989✔
987
  hJoinDestroyKeyHash(&pInfo->pKeyHash);
216,989✔
988

989
  qDebug("hash Join done");  
216,989✔
990
}
216,989✔
991

992
/*
 * Main "get next" entry of the hash join operator.
 *
 * On the first call the build side is fully consumed into the key hash. Then
 * any rows left over from the previous probe block are emitted; after that
 * probe blocks are pulled and joined until the result block reaches the output
 * threshold or the probe side is exhausted. The final filter (pFinFilter), if
 * any, is applied to the result block before it is handed out.
 *
 * pResBlock receives pJoin->finBlk when it holds at least one row, and is left
 * as initialised by QRY_PARAM_CHECK otherwise.
 *
 * On error the code is stored in pTaskInfo->code and control leaves through
 * T_LONG_JMP.
 */
static int32_t hJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
  SHJoinOperatorInfo* pJoin = pOperator->info;
  SExecTaskInfo*      pTaskInfo = pOperator->pTaskInfo;
  int32_t             code = TSDB_CODE_SUCCESS;
  int32_t             lino = 0;
  SSDataBlock*        pRes = pJoin->finBlk;
  int64_t             st = 0;

  QRY_PARAM_CHECK(pResBlock);
  if (pOperator->cost.openCost == 0) {
    // open cost is measured only while it has not been recorded yet
    st = taosGetTimestampUs();
  }

  if (pOperator->status == OP_EXEC_DONE) {
    pRes->info.rows = 0;
    goto _end;
  }

  if (!pJoin->keyHashBuilt) {
    pJoin->keyHashBuilt = true;

    bool queryDone = false;
    code = hJoinBuildHash(pOperator, &queryDone);
    QUERY_CHECK_CODE(code, lino, _end);

    if (queryDone) {
      goto _end;
    }
  }

  blockDataCleanup(pRes);

  // finish the probe block that could not be fully emitted by the previous call
  if (pJoin->ctx.rowRemains) {
    code = (*pJoin->joinFp)(pOperator);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pRes->info.rows > 0 && pJoin->pFinFilter != NULL) {
      code = doFilter(pRes, pJoin->pFinFilter, NULL, NULL);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    if (pRes->info.rows > 0) {
      *pResBlock = pRes;
      return code;
    }
  }

  while (true) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, pJoin->pProbe->downStreamIdx);
    if (NULL == pBlock) {
      hJoinSetDone(pOperator);

      // Rows gathered while the block was still below the threshold have not
      // been filtered yet (the loop skips the filter via `continue`), so the
      // final filter must run here before the tail is returned.
      if (pRes->info.rows > 0 && pJoin->pFinFilter != NULL) {
        code = doFilter(pRes, pJoin->pFinFilter, NULL, NULL);
        QUERY_CHECK_CODE(code, lino, _end);
      }
      break;
    }

    pJoin->execInfo.probeBlkNum++;
    pJoin->execInfo.probeBlkRows += pBlock->info.rows;

    code = hJoinPrepareStart(pOperator, pBlock);
    QUERY_CHECK_CODE(code, lino, _end);

    // keep accumulating until the result block is worth returning
    if (!hJoinBlkReachThreshold(pJoin, pRes->info.rows)) {
      continue;
    }

    if (pRes->info.rows > 0 && pJoin->pFinFilter != NULL) {
      code = doFilter(pRes, pJoin->pFinFilter, NULL, NULL);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    // NOTE(review): if the filter removed every row while ctx.rowRemains is
    // still set, the next iteration starts a new probe block -- confirm the
    // join functions cannot leave rows pending in that situation.
    if (pRes->info.rows > 0) {
      break;
    }
  }

_end:
  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
  }
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  if (pRes->info.rows > 0) {
    *pResBlock = pRes;
  }

  return code;
}
1081

1082
static void destroyHashJoinOperator(void* param) {
216,989✔
1083
  SHJoinOperatorInfo* pJoinOperator = (SHJoinOperatorInfo*)param;
216,989✔
1084
  qDebug("hashJoin exec info, buildBlk:%" PRId64 ", buildRows:%" PRId64 ", probeBlk:%" PRId64 ", probeRows:%" PRId64 ", resRows:%" PRId64, 
216,989✔
1085
         pJoinOperator->execInfo.buildBlkNum, pJoinOperator->execInfo.buildBlkRows, pJoinOperator->execInfo.probeBlkNum, 
1086
         pJoinOperator->execInfo.probeBlkRows, pJoinOperator->execInfo.resRows);
1087

1088
  hJoinDestroyKeyHash(&pJoinOperator->pKeyHash);
216,989✔
1089

1090
  hJoinFreeTableInfo(&pJoinOperator->tbs[0]);
216,989✔
1091
  hJoinFreeTableInfo(&pJoinOperator->tbs[1]);
216,989✔
1092
  blockDataDestroy(pJoinOperator->finBlk);
216,989✔
1093
  pJoinOperator->finBlk = NULL;
216,989✔
1094
  taosMemoryFreeClear(pJoinOperator->pResColMap);
216,989✔
1095
  taosArrayDestroyEx(pJoinOperator->pRowBufs, hJoinFreeBufPage);
216,989✔
1096

1097
  taosMemoryFreeClear(param);
216,989✔
1098
}
216,989✔
1099

1100
/*
 * Build the filters of the join from the conditions of the physical plan node.
 *
 * INNER join      : the full ON condition and the node conditions are merged
 *                   (when both exist) and become the final filter pFinFilter.
 * LEFT/RIGHT/FULL : the full ON condition becomes pPreFilter and the node
 *                   conditions become pFinFilter, each only when present.
 * Any other join type leaves the filters untouched.
 *
 * Returns TSDB_CODE_SUCCESS or the error propagated through HJ_ERR_RET.
 */
int32_t hJoinHandleConds(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) {
  if (JOIN_TYPE_INNER == pJoin->joinType) {
    SNode* pFinCond = NULL;

    if (NULL == pJoinNode->pFullOnCond) {
      // may be NULL as well, in which case filterInitFromNode gets a NULL condition
      pFinCond = pJoinNode->node.pConditions;
    } else {
      if (NULL != pJoinNode->node.pConditions) {
        HJ_ERR_RET(mergeJoinConds(&pJoinNode->pFullOnCond, &pJoinNode->node.pConditions));
      }
      pFinCond = pJoinNode->pFullOnCond;
    }

    HJ_ERR_RET(filterInitFromNode(pFinCond, &pJoin->pFinFilter, 0, pTaskInfo->pStreamRuntimeInfo));
    return TSDB_CODE_SUCCESS;
  }

  if (JOIN_TYPE_LEFT == pJoin->joinType || JOIN_TYPE_RIGHT == pJoin->joinType || JOIN_TYPE_FULL == pJoin->joinType) {
    if (NULL != pJoinNode->pFullOnCond) {
      HJ_ERR_RET(filterInitFromNode(pJoinNode->pFullOnCond, &pJoin->pPreFilter, 0,
                                    pTaskInfo->pStreamRuntimeInfo));
    }
    if (NULL != pJoinNode->node.pConditions) {
      HJ_ERR_RET(filterInitFromNode(pJoinNode->node.pConditions, &pJoin->pFinFilter, 0,
                                    pTaskInfo->pStreamRuntimeInfo));
    }
  }

  return TSDB_CODE_SUCCESS;
}
1134

1135
/*
 * Decide how many rows the final result block should be able to hold.
 *
 * The base capacity is the larger of HJOIN_DEFAULT_BLK_ROWS_NUM and
 * HJOIN_BLK_SIZE_LIMIT divided by the output row size. When a limit is set and
 * there is no final filter, the capacity is additionally capped by
 * limit / HJOIN_BLK_THRESHOLD_RATIO + 1.
 *
 * The limit-derived bound is compared before it is narrowed to uint32_t, so a
 * large limit can no longer overflow the conversion and produce a bogus small
 * capacity.
 */
static uint32_t hJoinGetFinBlkCapacity(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNode) {
  uint32_t maxRows = TMAX(HJOIN_DEFAULT_BLK_ROWS_NUM, HJOIN_BLK_SIZE_LIMIT/pJoinNode->node.pOutputDataBlockDesc->totalRowSize);
  if (INT64_MAX != pJoin->ctx.limit && NULL == pJoin->pFinFilter) {
    double limitMaxRows = pJoin->ctx.limit / HJOIN_BLK_THRESHOLD_RATIO + 1;
    // only narrow once the value is known to be below maxRows
    if (limitMaxRows < (double)maxRows) {
      return (uint32_t)limitMaxRows;
    }
  }

  return maxRows;
}
1144

1145

1146
/*
 * Create the result blocks of the join.
 *
 * finBlk is created from the output data block descriptor and sized by
 * hJoinGetFinBlkCapacity(). When a pre-filter exists, midBlk is created as
 * another block modelled on finBlk with the same capacity. blkThreshold is
 * set to finBlk's capacity scaled by HJOIN_BLK_THRESHOLD_RATIO.
 */
int32_t hJoinInitResBlocks(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNode) {
  pJoin->finBlk = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc);
  if (NULL == pJoin->finBlk) {
    QRY_ERR_RET(terrno);
  }

  uint32_t finCapacity = hJoinGetFinBlkCapacity(pJoin, pJoinNode);
  int32_t  rc = blockDataEnsureCapacity(pJoin->finBlk, finCapacity);
  if (TSDB_CODE_SUCCESS != rc) {
    QRY_ERR_RET(rc);
  }

  if (NULL != pJoin->pPreFilter) {
    // the intermediate block is only needed when a pre-filter is configured
    pJoin->midBlk = NULL;
    rc = createOneDataBlock(pJoin->finBlk, false, &pJoin->midBlk);
    if (rc) {
      QRY_ERR_RET(rc);
    }

    rc = blockDataEnsureCapacity(pJoin->midBlk, pJoin->finBlk->info.capacity);
    if (TSDB_CODE_SUCCESS != rc) {
      QRY_ERR_RET(rc);
    }
  }

  pJoin->blkThreshold = pJoin->finBlk->info.capacity * HJOIN_BLK_THRESHOLD_RATIO;
  return TSDB_CODE_SUCCESS;
}
1173

1174
/*
 * Bring the hash join operator back to its not-opened state so it can be
 * executed again.
 *
 * Result blocks are emptied, execution statistics are zeroed, every row
 * descriptor referenced from the key hash is freed, the key hash and the row
 * buffer pages are recreated, and the join context is cleared while keeping
 * its limit.
 *
 * Returns terrno if the key hash cannot be recreated, otherwise the result of
 * hJoinInitBufPages().
 */
static int32_t resetHashJoinOperState(SOperatorInfo* pOper) {
  SHJoinOperatorInfo* pHjOper = pOper->info;
  pHjOper->keyHashBuilt = false;
  // midBlk is only allocated when a pre-filter exists, so it may be NULL here
  if (pHjOper->midBlk != NULL) {
    blockDataCleanup(pHjOper->midBlk);
  }
  blockDataCleanup(pHjOper->finBlk);
  pOper->status = OP_NOT_OPENED;

  pHjOper->execInfo = (SHJoinExecInfo){0};

  // free the row descriptor list hanging off every group before dropping the hash
  void*   pIte = NULL;
  int32_t iter = 0;
  while ((pIte = tSimpleHashIterate(pHjOper->pKeyHash, pIte, &iter)) != NULL) {
    SGroupData* pGroup = pIte;
    SBufRowInfo* pRow = pGroup->rows;
    SBufRowInfo* pNext = NULL;
    while (pRow) {
      pNext = pRow->next;
      taosMemoryFree(pRow);
      pRow = pNext;
    }
  }
  tSimpleHashCleanup(pHjOper->pKeyHash);
  size_t hashCap = pHjOper->pBuild->inputStat.inputRowNum > 0 ? (pHjOper->pBuild->inputStat.inputRowNum * 1.5) : 1024;
  pHjOper->pKeyHash = tSimpleHashInit(hashCap, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
  if (pHjOper->pKeyHash == NULL) {
    return terrno; 
  }

  taosArrayDestroyEx(pHjOper->pRowBufs, hJoinFreeBufPage);
  // avoid a dangling pointer (and a later double free) if re-initialisation fails
  pHjOper->pRowBufs = NULL;
  int32_t code = hJoinInitBufPages(pHjOper);

  // clear the per-run context but keep the configured limit
  int64_t limit = pHjOper->ctx.limit;
  pHjOper->ctx = (SHJoinCtx){0};
  pHjOper->ctx.limit = limit;
  return code;
}
1208

1209
int32_t createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
216,989✔
1210
                                           SHashJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
1211
  QRY_PARAM_CHECK(pOptrInfo);
216,989✔
1212

1213
  int32_t             code = TSDB_CODE_SUCCESS;
216,989✔
1214
  SHJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SHJoinOperatorInfo));
216,989✔
1215
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
216,989✔
1216
  if (pOperator == NULL || pInfo == NULL) {
216,989✔
1217
    code = terrno;
×
1218
    goto _return;
×
1219
  }
1220

1221
  pInfo->tblTimeRange.skey = pJoinNode->timeRange.skey;
216,989✔
1222
  pInfo->tblTimeRange.ekey = pJoinNode->timeRange.ekey;
216,989✔
1223
  
1224
  pInfo->ctx.limit = (pJoinNode->node.pLimit && ((SLimitNode*)pJoinNode->node.pLimit)->limit) ? ((SLimitNode*)pJoinNode->node.pLimit)->limit->datum.i : INT64_MAX;
216,989✔
1225

1226
  setOperatorInfo(pOperator, "HashJoinOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
216,989✔
1227

1228
  HJ_ERR_JRET(hJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0]));
216,989✔
1229
  HJ_ERR_JRET(hJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1]));
216,989✔
1230

1231
  hJoinSetBuildAndProbeTable(pInfo, pJoinNode);
216,989✔
1232
  
1233
  HJ_ERR_JRET(hJoinBuildResColsMap(pInfo, pJoinNode));
216,989✔
1234

1235
  HJ_ERR_JRET(hJoinInitBufPages(pInfo));
216,989✔
1236

1237
  size_t hashCap = pInfo->pBuild->inputStat.inputRowNum > 0 ? (pInfo->pBuild->inputStat.inputRowNum * 1.5) : 1024;
216,989✔
1238
  pInfo->pKeyHash = tSimpleHashInit(hashCap, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
216,989✔
1239
  if (pInfo->pKeyHash == NULL) {
216,989✔
1240
    code = terrno;
×
1241
    goto _return;
×
1242
  }
1243

1244
  HJ_ERR_JRET(hJoinHandleConds(pInfo, pJoinNode, pTaskInfo));
216,989✔
1245

1246
  HJ_ERR_JRET(hJoinInitResBlocks(pInfo, pJoinNode));
216,989✔
1247

1248
  HJ_ERR_JRET(hJoinSetImplFp(pInfo));
216,989✔
1249

1250
  HJ_ERR_JRET(appendDownstream(pOperator, pDownstream, numOfDownstream));
216,989✔
1251

1252
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hJoinMainProcess, NULL, destroyHashJoinOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
216,989✔
1253
  setOperatorResetStateFn(pOperator, resetHashJoinOperState);
216,989✔
1254

1255
  qDebug("create hash Join operator done");
216,989✔
1256

1257
  *pOptrInfo = pOperator;
216,989✔
1258
  return TSDB_CODE_SUCCESS;
216,989✔
1259

1260
_return:
×
1261

1262
  if (pInfo != NULL) {
×
1263
    destroyHashJoinOperator(pInfo);
×
1264
  }
1265
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
1266
  pTaskInfo->code = code;
×
1267
  return code;
×
1268
}
1269

1270

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc