• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4895

23 Dec 2025 01:08PM UTC coverage: 65.513% (-0.2%) from 65.72%
#4895

push

travis-ci

web-flow
fix: mem leak (#34023)

6 of 9 new or added lines in 1 file covered. (66.67%)

7770 existing lines in 123 files now uncovered.

184705 of 281937 relevant lines covered (65.51%)

112009834.14 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.75
/source/libs/executor/src/hashjoinoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "operator.h"
20
#include "os.h"
21
#include "querynodes.h"
22
#include "querytask.h"
23
#include "tcompare.h"
24
#include "tdatablock.h"
25
#include "thash.h"
26
#include "tmsg.h"
27
#include "ttypes.h"
28
#include "hashjoin.h"
29
#include "functionMgt.h"
30

31

32
/*
 * Decide whether enough output rows have accumulated to emit a result block.
 *
 * When there is no LIMIT (ctx.limit == INT64_MAX) or a final filter is
 * attached, only the block-size threshold matters. Presumably the filter may
 * still drop rows, so the LIMIT cannot be compared directly. Otherwise the
 * block is ready once the rows already returned plus blkRows reach the LIMIT.
 */
bool hJoinBlkReachThreshold(SHJoinOperatorInfo* pInfo, int64_t blkRows) {
  bool useLimit = (INT64_MAX != pInfo->ctx.limit) && (NULL == pInfo->pFinFilter);
  if (!useLimit) {
    return blkRows >= pInfo->blkThreshold;
  }

  return (pInfo->execInfo.resRows + blkRows) >= pInfo->ctx.limit;
}
39

40
/*
 * Hand the pending intermediate rows over as the final block.
 * The mid and fin block pointers are swapped and the "mid rows remain" flag
 * is cleared. Always returns TSDB_CODE_SUCCESS.
 */
int32_t hJoinHandleMidRemains(SHJoinOperatorInfo* pJoin, SHJoinCtx* pCtx) {
  pCtx->midRemains = false;
  TSWAP(pJoin->midBlk, pJoin->finBlk);
  return TSDB_CODE_SUCCESS;
}
47

48
/*
 * Move rows from the intermediate block (*ppMid) into the final block (*ppFin).
 *
 * If all mid rows fit within the final block's capacity, they are merged in,
 * the mid block is cleaned up, and ctx->midRemains is cleared.
 *
 * Otherwise only as many rows as the final block can still hold are moved.
 * They are taken from the tail of the mid block, and the mid block is shrunk
 * by that count. ctx->midRemains is then set so the caller knows rows are left.
 *
 * Returns TSDB_CODE_SUCCESS, or the error reported by the block merge helpers.
 */
int32_t hJoinCopyMergeMidBlk(SHJoinCtx* pCtx, SSDataBlock** ppMid, SSDataBlock** ppFin) {
  SSDataBlock* pSrc = *ppMid;
  SSDataBlock* pDst = *ppFin;

  int32_t totalRows = pDst->info.rows + pSrc->info.rows;
  if (totalRows > pDst->info.capacity) {
    // Partial move: fill the remaining room with the last rows of the mid block.
    int32_t copyRows = pDst->info.capacity - pDst->info.rows;
    HJ_ERR_RET(blockDataMergeNRows(pDst, pSrc, pSrc->info.rows - copyRows, copyRows));
    blockDataShrinkNRows(pSrc, copyRows);
    pCtx->midRemains = true;
    return TSDB_CODE_SUCCESS;
  }

  HJ_ERR_RET(blockDataMerge(pDst, pSrc));
  blockDataCleanup(pSrc);
  pCtx->midRemains = false;
  return TSDB_CODE_SUCCESS;
}
82

83
/*
 * Pick the join implementation for the operator's join type.
 *
 * Only inner joins have an implementation today (hInnerJoinDo). For every
 * other join type, pJoin->joinFp is left untouched. The left/right outer
 * variant (hLeftJoinDo) is still disabled, as marked by "TOOPEN".
 * Always returns TSDB_CODE_SUCCESS.
 *
 * NOTE(review): callers should not assume joinFp is set for non-inner joins.
 */
int32_t hJoinSetImplFp(SHJoinOperatorInfo* pJoin) {
  if (JOIN_TYPE_INNER == pJoin->joinType) {
    pJoin->joinFp = hInnerJoinDo;
  }
  // JOIN_TYPE_LEFT / JOIN_TYPE_RIGHT with JOIN_STYPE_OUTER would select
  // hLeftJoinDo here once it is enabled (TOOPEN).

  return TSDB_CODE_SUCCESS;
}
105

106

107
/*
 * Evaluate the table's primary-key expression for rows [startIdx, endIdx] of
 * pBlock. That expression is the TIMETRUNCATE accepted by
 * hJoinInitPrimExprCtx. The truncated timestamps are written to the
 * expression's target slot.
 *
 * With a timezone offset configured (timezoneUnit != 0), each value is
 * aligned to the offset-shifted unit boundary. Otherwise it is truncated to a
 * multiple of truncateUnit using C integer division, which truncates toward
 * zero.
 *
 * Returns TSDB_CODE_SUCCESS when the table has no primary expression or the
 * evaluation succeeds. Returns TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR if the
 * source or target column slot does not exist in pBlock.
 */
int32_t hJoinLaunchPrimExpr(SSDataBlock* pBlock, SHJoinTableCtx* pTable, int32_t startIdx, int32_t endIdx) {
  if (NULL == pTable->primExpr) {
    return TSDB_CODE_SUCCESS;
  }

  SHJoinPrimExprCtx* pCtx = &pTable->primCtx;
  SColumnInfoData*   pPrimIn = taosArrayGet(pBlock->pDataBlock, pTable->primCol->srcSlot);
  SColumnInfoData*   pPrimOut = taosArrayGet(pBlock->pDataBlock, pCtx->targetSlotId);
  // taosArrayGet returns NULL for an out-of-range slot; never dereference that.
  if (NULL == pPrimIn || NULL == pPrimOut) {
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  int64_t* pIn = (int64_t*)pPrimIn->pData;
  int64_t* pOut = (int64_t*)pPrimOut->pData;  // may alias pIn when targetSlotId == srcSlot
  if (0 != pCtx->timezoneUnit) {
    for (int32_t i = startIdx; i <= endIdx; ++i) {
      pOut[i] = pIn[i] - (pIn[i] + pCtx->timezoneUnit) % pCtx->truncateUnit;
    }
  } else {
    for (int32_t i = startIdx; i <= endIdx; ++i) {
      pOut[i] = pIn[i] / pCtx->truncateUnit * pCtx->truncateUnit;
    }
  }

  return TSDB_CODE_SUCCESS;
}
127

128

129
/* Count the build rows chained under a single hash key (the SBufRowInfo list). */
static int64_t hJoinGetSingleKeyRowsNum(SBufRowInfo* pRow) {
  int64_t count = 0;
  for (SBufRowInfo* pCur = pRow; NULL != pCur; pCur = pCur->next) {
    ++count;
  }
  return count;
}
137

138
/*
 * Total number of build rows stored in the key hash. This is the sum of the
 * row-chain lengths of every group in pHash.
 *
 * The previous version fetched each group's key into an unused local, which
 * was only referenced by a commented-out trace; that dead lookup is dropped.
 */
static int64_t hJoinGetRowsNumOfKeyHash(SSHashObj* pHash) {
  SGroupData* pGroup = NULL;
  int32_t     iter = 0;
  int64_t     rowsNum = 0;

  while (NULL != (pGroup = tSimpleHashIterate(pHash, pGroup, &iter))) {
    rowsNum += hJoinGetSingleKeyRowsNum(pGroup->rows);
  }

  return rowsNum;
}
152

153
/*
 * Build the join-key column descriptors for one side from the ON column list.
 *
 * For each key column, the source slot, var-data flag and byte width are
 * recorded. A multi-column key also needs keyBuf: a scratch buffer sized for
 * the sum of the key columns' byte widths, into which key values are
 * concatenated. A single-column key is hashed directly from the column data.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno on allocation failure.
 */
static int32_t hJoinInitKeyColsInfo(SHJoinTableCtx* pTable, SNodeList* pList) {
  pTable->keyNum = LIST_LENGTH(pList);

  // Zero-initialize the descriptors. For single-key joins,
  // hJoinCopyKeyColsDataToBuf never writes keyCols[0].bufOffset, yet
  // hJoinCopyResRowsToBlock reads it to locate key values. With malloc that
  // field held heap garbage; calloc makes it the correct offset, 0.
  pTable->keyCols = taosMemoryCalloc(pTable->keyNum, sizeof(SHJoinColInfo));
  if (NULL == pTable->keyCols) {
    return terrno;
  }

  int64_t bufSize = 0;
  int32_t i = 0;
  SNode*  pNode = NULL;
  FOREACH(pNode, pList) {
    SColumnNode* pColNode = (SColumnNode*)pNode;
    pTable->keyCols[i].srcSlot = pColNode->slotId;
    pTable->keyCols[i].vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type);
    pTable->keyCols[i].bytes = pColNode->node.resType.bytes;
    bufSize += pColNode->node.resType.bytes;
    ++i;
  }

  if (pTable->keyNum > 1) {
    pTable->keyBuf = taosMemoryMalloc(bufSize);
    if (NULL == pTable->keyBuf) {
      return terrno;
    }
  }

  return TSDB_CODE_SUCCESS;
}
182

183
/*
 * Count the join targets in pList whose source column belongs to data block
 * blkId. The count is stored in *colNum.
 */
static void hJoinGetValColsNum(SNodeList* pList, int32_t blkId, int32_t* colNum) {
  int32_t matched = 0;
  SNode*  pNode = NULL;

  FOREACH(pNode, pList) {
    SColumnNode* pCol = (SColumnNode*)((STargetNode*)pNode)->pExpr;
    if (blkId == pCol->dataBlockId) {
      ++matched;
    }
  }

  *colNum = matched;
}
195

196
/*
 * Search the first keyNum key columns for one whose source slot is slotId.
 * On a match, the index of the first matching key column is written to
 * *pKeyIdx and true is returned. Otherwise *pKeyIdx is set to -1 and false is
 * returned.
 */
static bool hJoinIsValColInKeyCols(int16_t slotId, int32_t keyNum, SHJoinColInfo* pKeys, int32_t* pKeyIdx) {
  int32_t found = -1;

  for (int32_t k = 0; k < keyNum && found < 0; ++k) {
    if (slotId == pKeys[k].srcSlot) {
      found = k;
    }
  }

  *pKeyIdx = found;
  return found >= 0;
}
207

208
/*
 * Build the value-column descriptors for one side. These are the join targets
 * whose source column lives in this table's data block.
 *
 * For each value column this records:
 *   - its source/destination slot and var-data flag;
 *   - keyColIdx, the index of the join-key column with the same slot, or -1.
 *
 * A value column that is also a key column is later read from the key data
 * rather than stored. Its bookkeeping differs as follows:
 *   - valColExist is set as soon as one non-key value column exists;
 *   - valVarCols lists the indexes of var-data, non-key value columns;
 *   - valBufSize accumulates the fixed widths of fixed-size, non-key columns,
 *     plus valBitMapSize, the null bitmap covering all non-key columns.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno on allocation/array failure.
 */
static int32_t hJoinInitValColsInfo(SHJoinTableCtx* pTable, SNodeList* pList) {
  hJoinGetValColsNum(pList, pTable->blkId, &pTable->valNum);
  if (0 == pTable->valNum) {
    return TSDB_CODE_SUCCESS;
  }

  pTable->valCols = taosMemoryMalloc(pTable->valNum * sizeof(SHJoinColInfo));
  if (NULL == pTable->valCols) {
    return terrno;
  }

  int32_t idx = 0;        // next slot in valCols
  int32_t nonKeyNum = 0;  // value columns not shared with the join key
  SNode*  pNode = NULL;
  FOREACH(pNode, pList) {
    STargetNode* pTarget = (STargetNode*)pNode;
    SColumnNode* pColNode = (SColumnNode*)pTarget->pExpr;
    if (pColNode->dataBlockId == pTable->blkId) {
      SHJoinColInfo* pVal = &pTable->valCols[idx];

      if (!hJoinIsValColInKeyCols(pColNode->slotId, pTable->keyNum, pTable->keyCols, &pVal->keyColIdx)) {
        pTable->valColExist = true;
        nonKeyNum++;
      }
      pVal->srcSlot = pColNode->slotId;
      pVal->dstSlot = pTarget->slotId;
      pVal->vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type);

      bool isKey = IS_HASH_JOIN_KEY_COL(pVal->keyColIdx);
      if (pVal->vardata && !isKey) {
        if (NULL == pTable->valVarCols) {
          pTable->valVarCols = taosArrayInit(pTable->valNum, sizeof(int32_t));
          if (NULL == pTable->valVarCols) {
            return terrno;
          }
        }
        if (NULL == taosArrayPush(pTable->valVarCols, &idx)) {
          return terrno;
        }
      }

      pVal->bytes = pColNode->node.resType.bytes;
      if (!isKey && !pVal->vardata) {
        pTable->valBufSize += pColNode->node.resType.bytes;
      }
      idx++;
    }
  }

  pTable->valBitMapSize = BitmapLen(nonKeyNum);
  pTable->valBufSize += pTable->valBitMapSize;

  return TSDB_CODE_SUCCESS;
}
257

258
/*
 * Allocate the table's primary-key column descriptor and record its source
 * slot. Returns TSDB_CODE_SUCCESS, or terrno if the allocation fails.
 */
static int32_t hJoinInitPrimKeyInfo(SHJoinTableCtx* pTable, int32_t slotId) {
  if (NULL == (pTable->primCol = taosMemoryMalloc(sizeof(SHJoinColMap)))) {
    return terrno;
  }

  pTable->primCol->srcSlot = slotId;
  return TSDB_CODE_SUCCESS;
}
268

269

270
/*
 * Initialize the primary-key expression context for one table.
 *
 * With no expression (pNode == NULL), the target slot is simply the
 * primary-key source slot.
 *
 * Otherwise pNode must be a target node wrapping a TIMETRUNCATE function with
 * 4 or 5 parameters. The parameter indexes read below are:
 *   - 1: the truncation unit;
 *   - 2 (5-parameter form only): the current-timezone flag;
 *   - last (4 or 3): the timezone string.
 * When the flag is absent or 1 and the unit is at least one day, a timezone
 * offset is derived so truncation aligns to local day boundaries.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR for any
 * malformed expression. A missing sub-node or a non-positive unit counts as
 * malformed; the unit is later used as a divisor in hJoinLaunchPrimExpr.
 */
static int32_t hJoinInitPrimExprCtx(SNode* pNode, SHJoinPrimExprCtx* pCtx, SHJoinTableCtx* pTable) {
  if (NULL == pNode) {
    pCtx->targetSlotId = pTable->primCol->srcSlot;
    return TSDB_CODE_SUCCESS;
  }

  if (QUERY_NODE_TARGET != nodeType(pNode)) {
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  STargetNode* pTarget = (STargetNode*)pNode;
  if (NULL == pTarget->pExpr || QUERY_NODE_FUNCTION != nodeType(pTarget->pExpr)) {
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  SFunctionNode* pFunc = (SFunctionNode*)pTarget->pExpr;
  if (FUNCTION_TYPE_TIMETRUNCATE != pFunc->funcType) {
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  if (NULL == pFunc->pParameterList ||
      (4 != pFunc->pParameterList->length && 5 != pFunc->pParameterList->length)) {
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  bool        hasCurrTz = (5 == pFunc->pParameterList->length);
  SValueNode* pUnit = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
  SValueNode* pCurrTz = hasCurrTz ? (SValueNode*)nodesListGetNode(pFunc->pParameterList, 2) : NULL;
  SValueNode* pTimeZone = (SValueNode*)nodesListGetNode(pFunc->pParameterList, hasCurrTz ? 4 : 3);
  if (NULL == pUnit || NULL == pTimeZone) {
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  pCtx->truncateUnit = pUnit->typeData;
  if (pCtx->truncateUnit <= 0) {
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  if ((NULL == pCurrTz || 1 == pCurrTz->typeData) &&
      pCtx->truncateUnit >= (86400 * TSDB_TICK_PER_SECOND(pFunc->node.resType.precision))) {
    pCtx->timezoneUnit = offsetFromTz(varDataVal(pTimeZone->datum.p), TSDB_TICK_PER_SECOND(pFunc->node.resType.precision));
  }

  pCtx->targetSlotId = pTarget->slotId;

  return TSDB_CODE_SUCCESS;
}
307

308

309
/*
 * Initialize pJoin->tbs[idx] from the physical plan. idx 0 is the left input
 * and idx 1 the right input.
 *
 * The function records, in order:
 *   - the downstream operator and its result block id;
 *   - the side's time-range bit (0x1 left, 0x2 right);
 *   - the primary-key, join-key and value-column descriptors;
 *   - a copy of the input statistics;
 *   - the primary-key expression context.
 *
 * Returns TSDB_CODE_SUCCESS or the first error from the init helpers.
 *
 * NOTE(review): pTable->primExpr is assigned in hJoinSetBuildAndProbeTable.
 * If that runs after this function, the prim-expr context is initialized from
 * a NULL expression — confirm the call order in the operator constructor.
 */
static int32_t hJoinInitTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNode, SOperatorInfo** pDownstream, int32_t idx, SQueryStat* pStat) {
  SHJoinTableCtx* pTable = &pJoin->tbs[idx];
  bool            isLeft = (0 == idx);

  pTable->downStream = pDownstream[idx];
  pTable->blkId = pDownstream[idx]->resultDataBlockId;

  SNodeList* pKeyList = isLeft ? pJoinNode->pOnLeft : pJoinNode->pOnRight;
  pTable->hasTimeRange = pJoinNode->timeRangeTarget & (isLeft ? 0x1 : 0x2);

  HJ_ERR_RET(hJoinInitPrimKeyInfo(pTable, isLeft ? pJoinNode->leftPrimSlotId : pJoinNode->rightPrimSlotId));

  int32_t code = hJoinInitKeyColsInfo(pTable, pKeyList);
  if (TSDB_CODE_SUCCESS == code) {
    code = hJoinInitValColsInfo(pTable, pJoinNode->pTargets);
  }
  if (code) {
    return code;
  }

  TAOS_MEMCPY(&pTable->inputStat, pStat, sizeof(*pStat));

  HJ_ERR_RET(hJoinInitPrimExprCtx(pTable->primExpr, &pTable->primCtx, pTable));

  return TSDB_CODE_SUCCESS;
}
339

340
/*
 * Choose which input is hashed (build) and which is streamed (probe).
 *
 * - Inner and full joins build on the side with fewer estimated input rows;
 *   on a tie, the left side builds.
 * - Left joins build on the right input (tbs[1]).
 * - Right joins and any other join type build on the left input (tbs[0]).
 *
 * The chosen tables also receive their downstream index and the matching
 * primary-key expression from the plan.
 */
static void hJoinSetBuildAndProbeTable(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJoinNode) {
  pInfo->joinType = pJoinNode->joinType;
  pInfo->subType = pJoinNode->subType;

  int32_t buildIdx = 0;
  if (JOIN_TYPE_INNER == pInfo->joinType || JOIN_TYPE_FULL == pInfo->joinType) {
    buildIdx = (pInfo->tbs[0].inputStat.inputRowNum <= pInfo->tbs[1].inputStat.inputRowNum) ? 0 : 1;
  } else if (JOIN_TYPE_LEFT == pInfo->joinType) {
    buildIdx = 1;
  }
  int32_t probeIdx = 1 - buildIdx;

  pInfo->pBuild = &pInfo->tbs[buildIdx];
  pInfo->pProbe = &pInfo->tbs[probeIdx];

  pInfo->pBuild->downStreamIdx = buildIdx;
  pInfo->pProbe->downStreamIdx = probeIdx;

  bool leftIsBuild = (0 == buildIdx);
  pInfo->pBuild->primExpr = leftIsBuild ? pJoinNode->leftPrimExpr : pJoinNode->rightPrimExpr;
  pInfo->pProbe->primExpr = leftIsBuild ? pJoinNode->rightPrimExpr : pJoinNode->leftPrimExpr;
}
384

385
/*
 * Build the per-target "comes from the build side" map.
 * pResColMap[i] is 1 when target i reads a column from the build table's data
 * block, and 0 when it reads from the probe side.
 * Returns TSDB_CODE_SUCCESS, or terrno if the map cannot be allocated.
 */
static int32_t hJoinBuildResColsMap(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJoinNode) {
  pInfo->pResColNum = pJoinNode->pTargets->length;
  pInfo->pResColMap = taosMemoryCalloc(pJoinNode->pTargets->length, sizeof(int8_t));
  if (NULL == pInfo->pResColMap) {
    return terrno;
  }

  int32_t pos = 0;
  SNode*  pNode = NULL;
  FOREACH(pNode, pJoinNode->pTargets) {
    SColumnNode* pCol = (SColumnNode*)((STargetNode*)pNode)->pExpr;
    if (pInfo->pBuild->blkId == pCol->dataBlockId) {
      pInfo->pResColMap[pos] = 1;
    }
    ++pos;
  }

  return TSDB_CODE_SUCCESS;
}
406

407

408
/*
 * Append a fresh, empty row-buffer page of HASH_JOIN_DEFAULT_PAGE_SIZE bytes
 * to pRowBufs.
 *
 * Ownership of the page data moves to the array only when the push succeeds.
 * If the push fails, the page data is freed here; previously it was leaked.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno on allocation/push failure.
 */
static FORCE_INLINE int32_t hJoinAddPageToBufs(SArray* pRowBufs) {
  SBufPageInfo page = {0};
  page.pageSize = HASH_JOIN_DEFAULT_PAGE_SIZE;
  page.offset = 0;
  page.data = taosMemoryMalloc(page.pageSize);
  if (NULL == page.data) {
    return terrno;
  }

  if (NULL == taosArrayPush(pRowBufs, &page)) {
    int32_t code = terrno;  // capture before freeing in case the allocator touches it
    taosMemoryFree(page.data);
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
422

423
/*
 * Create the row-buffer page array (initial capacity 32) and seed it with one
 * empty page. Returns TSDB_CODE_SUCCESS, or the failure from array creation
 * or page allocation.
 */
static int32_t hJoinInitBufPages(SHJoinOperatorInfo* pInfo) {
  SArray* pPages = taosArrayInit(32, sizeof(SBufPageInfo));
  pInfo->pRowBufs = pPages;
  if (NULL == pPages) {
    return terrno;
  }

  return hJoinAddPageToBufs(pPages);
}
431

432
/*
 * Release every allocation owned by a table context:
 *   - the key/value column descriptors;
 *   - the multi-key scratch buffer;
 *   - the var-column index array;
 *   - the primary-key descriptor.
 *
 * Every pointer is reset to NULL. The old code left primCol and valVarCols
 * dangling, so calling this twice double-freed them. Now the function is
 * idempotent.
 */
static void hJoinFreeTableInfo(SHJoinTableCtx* pTable) {
  taosMemoryFreeClear(pTable->keyCols);
  taosMemoryFreeClear(pTable->keyBuf);
  taosMemoryFreeClear(pTable->valCols);
  taosArrayDestroy(pTable->valVarCols);
  pTable->valVarCols = NULL;
  taosMemoryFreeClear(pTable->primCol);
}
439

440
static void hJoinFreeBufPage(void* param) {
524,490✔
441
  SBufPageInfo* pInfo = (SBufPageInfo*)param;
524,490✔
442
  taosMemoryFree(pInfo->data);
524,490✔
443
}
524,490✔
444

445
/*
 * Destroy the key hash: free each group's chain of SBufRowInfo nodes, clean
 * up the hash itself and reset *ppHash to NULL. Safe to call with a NULL
 * handle or an already-destroyed hash.
 */
static void hJoinDestroyKeyHash(SSHashObj** ppHash) {
  if (NULL == ppHash || NULL == *ppHash) {
    return;
  }

  SSHashObj* pHash = *ppHash;
  int32_t    iter = 0;
  for (SGroupData* pGroup = tSimpleHashIterate(pHash, NULL, &iter); NULL != pGroup;
       pGroup = tSimpleHashIterate(pHash, pGroup, &iter)) {
    SBufRowInfo* pRow = pGroup->rows;
    while (NULL != pRow) {
      SBufRowInfo* pNext = pRow->next;
      taosMemoryFree(pRow);
      pRow = pNext;
    }
  }

  tSimpleHashCleanup(pHash);
  *ppHash = NULL;
}
466

467
/*
 * Resolve a build row's value buffer: *ppData is set to the bytes at
 * (page pRow->pageId, offset pRow->offset).
 *
 * A pageId of (uint16_t)-1 marks a row stored without a value buffer; for
 * such a row *ppData stays NULL. Returns TSDB_CODE_SUCCESS, or
 * TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR if the page does not exist.
 */
static FORCE_INLINE int32_t hJoinRetrieveColDataFromRowBufs(SArray* pRowBufs, SBufRowInfo* pRow, char** ppData) {
  *ppData = NULL;

  if ((uint16_t)-1 != pRow->pageId) {
    SBufPageInfo* pPage = taosArrayGet(pRowBufs, pRow->pageId);
    if (NULL == pPage) {
      qError("fail to get %d page, total:%d", pRow->pageId, (int32_t)taosArrayGetSize(pRowBufs));
      QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }
    *ppData = pPage->data + pRow->offset;
  }

  return TSDB_CODE_SUCCESS;
}
483

484
/*
 * Write rowNum joined rows into pRes, starting at row pRes->info.rows.
 * pRes->info.rows itself is not advanced here.
 *
 * Build-side rows come from the chain starting at pStart. Each row's value
 * buffer is laid out as [null bitmap (valBitMapSize bytes)][packed non-null
 * values]. Build columns that are also join keys are read from the probe
 * table's current key data instead.
 *
 * Probe-side columns are written once, on the first row: rowNum copies of the
 * value at ctx.probeStartIdx.
 *
 * Returns TSDB_CODE_SUCCESS or the first error from buffer lookup or column
 * writes.
 */
static int32_t hJoinCopyResRowsToBlock(SHJoinOperatorInfo* pJoin, int32_t rowNum, SBufRowInfo* pStart, SSDataBlock* pRes) {
  SHJoinTableCtx* pBuild = pJoin->pBuild;
  SHJoinTableCtx* pProbe = pJoin->pProbe;
  SBufRowInfo*    pRow = pStart;

  for (int32_t r = 0; r < rowNum; ++r, pRow = pRow->next) {
    char* pData = NULL;
    HJ_ERR_RET(hJoinRetrieveColDataFromRowBufs(pJoin->pRowBufs, pRow, &pData));

    char*   pValData = pData + pBuild->valBitMapSize;  // cursor over the packed values
    char*   pKeyData = pProbe->keyData;
    int32_t buildIdx = 0;
    int32_t buildValIdx = 0;  // bit position in this row's null bitmap
    int32_t probeIdx = 0;

    for (int32_t i = 0; i < pJoin->pResColNum; ++i) {
      int32_t code = TSDB_CODE_SUCCESS;

      if (!pJoin->pResColMap[i]) {
        // Probe column: same value for the whole batch, filled on the first row only.
        if (0 != r) {
          continue;
        }
        SHJoinColInfo*   pCol = &pProbe->valCols[probeIdx++];
        SColumnInfoData* pSrc = taosArrayGet(pJoin->ctx.pProbeData->pDataBlock, pCol->srcSlot);
        SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pCol->dstSlot);
        if (colDataIsNull_s(pSrc, pJoin->ctx.probeStartIdx)) {
          code = colDataCopyNItems(pDst, pRes->info.rows, NULL, rowNum, true);
        } else {
          code = colDataCopyNItems(pDst, pRes->info.rows, colDataGetData(pSrc, pJoin->ctx.probeStartIdx), rowNum, false);
        }
        if (code) {
          return code;
        }
        continue;
      }

      SHJoinColInfo*   pCol = &pBuild->valCols[buildIdx++];
      SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pCol->dstSlot);
      if (IS_HASH_JOIN_KEY_COL(pCol->keyColIdx)) {
        int32_t bufOffset = pProbe->keyCols[pCol->keyColIdx].bufOffset;
        code = colDataSetVal(pDst, pRes->info.rows + r, pKeyData + bufOffset, false);
      } else {
        if (BMIsNull(pData, buildValIdx)) {
          code = colDataSetVal(pDst, pRes->info.rows + r, NULL, true);
        } else {
          code = colDataSetVal(pDst, pRes->info.rows + r, pValData, false);
          if (TSDB_CODE_SUCCESS == code) {
            pValData += pCol->vardata ? varDataTLen(pValData) : pCol->bytes;
          }
        }
        buildValIdx++;
      }
      if (code) {
        return code;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}
544

545
/*
 * Append `rows` probe rows that found no build match, starting at probe row
 * startIdx. Build-side result columns are filled with NULLs, and probe-side
 * columns are copied from the probe block. pRes->info.rows is advanced by
 * `rows`. Returns TSDB_CODE_SUCCESS or the error from the column copy.
 */
int32_t hJoinCopyNMatchRowsToBlock(SHJoinOperatorInfo* pJoin, SSDataBlock* pRes, int32_t startIdx, int32_t rows) {
  SHJoinTableCtx* pBuild = pJoin->pBuild;
  SHJoinTableCtx* pProbe = pJoin->pProbe;
  int32_t         nextBuild = 0;
  int32_t         nextProbe = 0;

  for (int32_t i = 0; i < pJoin->pResColNum; ++i) {
    if (!pJoin->pResColMap[i]) {
      SHJoinColInfo*   pCol = &pProbe->valCols[nextProbe++];
      SColumnInfoData* pSrc = taosArrayGet(pJoin->ctx.pProbeData->pDataBlock, pCol->srcSlot);
      SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pCol->dstSlot);
      QRY_ERR_RET(colDataAssignNRows(pDst, pRes->info.rows, pSrc, startIdx, rows));
    } else {
      SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pBuild->valCols[nextBuild++].dstSlot);
      colDataSetNItemsNull(pDst, pRes->info.rows, rows);
    }
  }

  pRes->info.rows += rows;

  return TSDB_CODE_SUCCESS;
}
572

573

574

575
/*
 * Append as many pending build rows (ctx.pBuildRow onward) as fit within
 * pRes's capacity, joined with the current probe row.
 *
 * ctx.pBuildRow is advanced past the consumed rows, and execInfo.resRows and
 * pRes->info.rows grow by the number written. *allFetched is set to true once
 * the build chain is exhausted. On a copy error, the task code is set and
 * control long-jumps to the task's error environment.
 */
void hJoinAppendResToBlock(struct SOperatorInfo* pOperator, SSDataBlock* pRes, bool* allFetched) {
  SHJoinOperatorInfo* pJoin = pOperator->info;
  SHJoinCtx*          pCtx = &pJoin->ctx;
  SBufRowInfo*        pFirst = pCtx->pBuildRow;
  int32_t             resNum = pRes->info.rows;
  int32_t             rowNum = 0;

  for (; NULL != pCtx->pBuildRow && resNum < pRes->info.capacity; pCtx->pBuildRow = pCtx->pBuildRow->next) {
    ++rowNum;
    ++resNum;
  }

  pJoin->execInfo.resRows += rowNum;

  int32_t code = hJoinCopyResRowsToBlock(pJoin, rowNum, pFirst, pRes);
  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }

  pRes->info.rows = resNum;
  *allFetched = (NULL == pCtx->pBuildRow);
}
599

600

601
/*
 * Prepare the hash key for row rowIdx and point pTable->keyData at it.
 *
 * A single-column key is used in place, straight from the column data.
 * A multi-column key is concatenated into pTable->keyBuf, and each column's
 * start offset is recorded in keyCols[i].bufOffset. A var-data value
 * contributes its header plus payload; a fixed-size value contributes its
 * byte width.
 *
 * Returns true if any key column is NULL at rowIdx; the row cannot match and
 * keyData/*pBufLen are then not meaningful. Otherwise returns false and, when
 * pBufLen is non-NULL, stores the key length there.
 */
bool hJoinCopyKeyColsDataToBuf(SHJoinTableCtx* pTable, int32_t rowIdx, size_t *pBufLen) {
  size_t keyLen = 0;

  if (1 == pTable->keyNum) {
    SHJoinColInfo* pKey = &pTable->keyCols[0];
    if (colDataIsNull_s(pKey->colData, rowIdx)) {
      return true;
    }

    char* pValue = NULL;
    if (pKey->vardata) {
      pValue = pKey->data + pKey->offset[rowIdx];
      keyLen = varDataTLen(pValue);
    } else {
      pValue = pKey->data + pKey->bytes * rowIdx;
      keyLen = pKey->bytes;
    }
    pTable->keyData = pValue;
  } else {
    for (int32_t i = 0; i < pTable->keyNum; ++i) {
      SHJoinColInfo* pKey = &pTable->keyCols[i];
      if (colDataIsNull_s(pKey->colData, rowIdx)) {
        return true;
      }

      pKey->bufOffset = keyLen;

      char*  pValue = NULL;
      size_t valueLen = 0;
      if (pKey->vardata) {
        pValue = pKey->data + pKey->offset[rowIdx];
        valueLen = varDataTLen(pValue);
      } else {
        pValue = pKey->data + pKey->bytes * rowIdx;
        valueLen = pKey->bytes;
      }
      TAOS_MEMCPY(pTable->keyBuf + keyLen, pValue, valueLen);
      keyLen += valueLen;
    }
    pTable->keyData = pTable->keyBuf;
  }

  if (NULL != pBufLen) {
    *pBufLen = keyLen;
  }

  return false;
}
644

645
/*
 * Bind each key column descriptor to its column in pBlock. This records the
 * column, its data pointer, and for var-data columns the offset array.
 *
 * The column's var-data flag and byte width are checked against the plan.
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR for a
 * missing slot, or TSDB_CODE_INVALID_PARA on a type/width mismatch.
 */
static int32_t hJoinSetKeyColsData(SSDataBlock* pBlock, SHJoinTableCtx* pTable) {
  for (int32_t i = 0; i < pTable->keyNum; ++i) {
    SHJoinColInfo*   pKey = &pTable->keyCols[i];
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pKey->srcSlot);
    if (NULL == pCol) {
      qError("fail to get %d col, total:%d", pKey->srcSlot, (int32_t)taosArrayGetSize(pBlock->pDataBlock));
      QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }
    if (pKey->vardata != IS_VAR_DATA_TYPE(pCol->info.type)) {
      qError("column type mismatch, idx:%d, slotId:%d, type:%d, vardata:%d", i, pKey->srcSlot, pCol->info.type, pKey->vardata);
      return TSDB_CODE_INVALID_PARA;
    }
    if (pKey->bytes != pCol->info.bytes) {
      qError("column bytes mismatch, idx:%d, slotId:%d, bytes:%d, %d", i, pKey->srcSlot, pCol->info.bytes, pKey->bytes);
      return TSDB_CODE_INVALID_PARA;
    }

    pKey->data = pCol->pData;
    if (pKey->vardata) {
      pKey->offset = pCol->varmeta.offset;
    }
    pKey->colData = pCol;
  }

  return TSDB_CODE_SUCCESS;
}
669

670
/*
 * Bind each non-key value column descriptor to its column in pBlock.
 * Var-data columns get the data pointer and offset array; fixed-size columns
 * get the data pointer and null bitmap.
 *
 * Columns that are also join keys are skipped, since their values are taken
 * from the key data. Nothing happens when the table has no non-key value
 * columns.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR for a
 * missing slot, or TSDB_CODE_INVALID_PARA on a type/width mismatch.
 */
static int32_t hJoinSetValColsData(SSDataBlock* pBlock, SHJoinTableCtx* pTable) {
  if (!pTable->valColExist) {
    return TSDB_CODE_SUCCESS;
  }

  for (int32_t i = 0; i < pTable->valNum; ++i) {
    SHJoinColInfo* pVal = &pTable->valCols[i];
    if (IS_HASH_JOIN_KEY_COL(pVal->keyColIdx)) {
      continue;
    }

    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pVal->srcSlot);
    if (NULL == pCol) {
      qError("fail to get %d col, total:%d", pVal->srcSlot, (int32_t)taosArrayGetSize(pBlock->pDataBlock));
      QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }
    if (pVal->vardata != IS_VAR_DATA_TYPE(pCol->info.type)) {
      qError("column type mismatch, idx:%d, slotId:%d, type:%d, vardata:%d", i, pVal->srcSlot, pCol->info.type, pVal->vardata);
      return TSDB_CODE_INVALID_PARA;
    }
    if (pVal->bytes != pCol->info.bytes) {
      qError("column bytes mismatch, idx:%d, slotId:%d, bytes:%d, %d", i, pVal->srcSlot, pCol->info.bytes, pVal->bytes);
      return TSDB_CODE_INVALID_PARA;
    }

    pVal->data = pCol->pData;
    if (pVal->vardata) {
      pVal->offset = pCol->varmeta.offset;
    } else {
      pVal->bitMap = pCol->nullbitmap;
    }
  }

  return TSDB_CODE_SUCCESS;
}
702

703

704

705
/*
 * Serialize row rowIdx's non-key value columns into pTable->valData.
 *
 * The layout is a zeroed null bitmap of valBitMapSize bytes, followed by the
 * packed values of the non-NULL columns. A var-data value is written with its
 * header; a fixed-size value is written at its byte width. A NULL column only
 * sets its bit: a var-data offset of -1 or a set null-bitmap bit marks NULL.
 * Does nothing when the table has no non-key value columns.
 */
static FORCE_INLINE void hJoinCopyValColsDataToBuf(SHJoinTableCtx* pTable, int32_t rowIdx) {
  if (!pTable->valColExist) {
    return;
  }

  TAOS_MEMSET(pTable->valData, 0, pTable->valBitMapSize);
  char*   pWrite = pTable->valData + pTable->valBitMapSize;
  int32_t bit = 0;  // position among non-key columns in the null bitmap

  for (int32_t i = 0; i < pTable->valNum; ++i) {
    SHJoinColInfo* pVal = &pTable->valCols[i];
    if (IS_HASH_JOIN_KEY_COL(pVal->keyColIdx)) {
      continue;
    }

    bool isNull = pVal->vardata ? (-1 == pVal->offset[rowIdx]) : BMIsNull(pVal->bitMap, rowIdx);
    if (isNull) {
      colDataSetNull_f(pTable->valData, bit);
    } else {
      char*  pValue = NULL;
      size_t valueLen = 0;
      if (pVal->vardata) {
        pValue = pVal->data + pVal->offset[rowIdx];
        valueLen = varDataTLen(pValue);
      } else {
        pValue = pVal->data + pVal->bytes * rowIdx;
        valueLen = pVal->bytes;
      }
      TAOS_MEMCPY(pWrite, pValue, valueLen);
      pWrite += valueLen;
    }
    bit++;
  }
}
737

738

739
/*
 * Reserve bufSize bytes for one row's value buffer in the row-buffer pages.
 *
 * The space is carved from the tail of the last page; a new page is appended
 * when the tail is too small. *pBuf receives the reserved address, and
 * pRow->pageId/offset record where it lives.
 *
 * A zero size reserves nothing and marks the row with pageId -1. A size above
 * HASH_JOIN_DEFAULT_PAGE_SIZE can never fit and is rejected with
 * TSDB_CODE_INVALID_PARA. Page allocation failures are propagated.
 */
static FORCE_INLINE int32_t hJoinGetValBufFromPages(SArray* pPages, int32_t bufSize, char** pBuf, SBufRowInfo* pRow) {
  if (0 == bufSize) {
    pRow->pageId = -1;
    return TSDB_CODE_SUCCESS;
  }

  if (bufSize > HASH_JOIN_DEFAULT_PAGE_SIZE) {
    qError("invalid join value buf size:%d", bufSize);
    return TSDB_CODE_INVALID_PARA;
  }

  for (;;) {
    SBufPageInfo* pLast = taosArrayGetLast(pPages);
    if ((pLast->pageSize - pLast->offset) >= bufSize) {
      *pBuf = pLast->data + pLast->offset;
      pRow->pageId = taosArrayGetSize(pPages) - 1;
      pRow->offset = pLast->offset;
      pLast->offset += bufSize;
      return TSDB_CODE_SUCCESS;
    }

    int32_t code = hJoinAddPageToBufs(pPages);
    if (code) {
      return code;
    }
  }
}
766

767
/*
 * Bytes needed to store row rowIdx's value buffer.
 *
 * This is the precomputed fixed part (bitmap plus fixed-size non-key columns)
 * plus the actual length, header included, of every non-NULL var-data
 * non-key column at rowIdx. A var-data offset of -1 marks NULL.
 */
static FORCE_INLINE int32_t hJoinGetValBufSize(SHJoinTableCtx* pTable, int32_t rowIdx) {
  int32_t total = pTable->valBufSize;
  if (NULL == pTable->valVarCols) {
    return total;
  }

  int32_t varColNum = taosArrayGetSize(pTable->valVarCols);
  for (int32_t i = 0; i < varColNum; ++i) {
    int32_t*       pColIdx = taosArrayGet(pTable->valVarCols, i);
    SHJoinColInfo* pVal = &pTable->valCols[*pColIdx];
    if (-1 != pVal->offset[rowIdx]) {
      char* pValue = pVal->data + pVal->offset[rowIdx];
      total += varDataTLen(pValue);
    }
  }

  return total;
}
786

787

788
/*
 * Attach build row rowIdx (whose key is already serialized in pTable->keyData, keyLen bytes)
 * to the key hash.
 *
 * Every row gets its own heap-allocated SBufRowInfo, plus space for its value columns
 * reserved from pJoin->pRowBufs; pTable->valData is pointed at that space.
 * - pGroup == NULL: the key is new. A group whose only row is this one is inserted.
 * - pGroup != NULL: the row is pushed onto the front of the existing group's chain.
 *
 * Returns TSDB_CODE_SUCCESS or an error code. On failure the row descriptor is freed.
 */
static int32_t hJoinAddRowToHashImpl(SHJoinOperatorInfo* pJoin, SGroupData* pGroup, SHJoinTableCtx* pTable, size_t keyLen, int32_t rowIdx) {
  SBufRowInfo* pRow = taosMemoryMalloc(sizeof(SBufRowInfo));
  if (NULL == pRow) {
    return terrno;
  }

  SGroupData newGroup = {0};
  if (NULL == pGroup) {
    newGroup.rows = pRow;
  }

  int32_t code = hJoinGetValBufFromPages(pJoin->pRowBufs, hJoinGetValBufSize(pTable, rowIdx), &pTable->valData, pRow);
  if (TSDB_CODE_SUCCESS != code) {
    taosMemoryFree(pRow);
    return code;
  }

  if (NULL != pGroup) {
    // Existing key: prepend to the group's row chain.
    pRow->next = pGroup->rows;
    pGroup->rows = pRow;
    return TSDB_CODE_SUCCESS;
  }

  // New key: the local group struct is handed to the hash by address and size
  // (the hash presumably stores its own copy).
  pRow->next = NULL;
  if (tSimpleHashPut(pJoin->pKeyHash, pTable->keyData, keyLen, &newGroup, sizeof(newGroup))) {
    taosMemoryFree(pRow);
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
824

825
/*
 * Insert row rowIdx of build block pBlock into the key hash.
 *
 * The row's key must already be serialized in pJoin->pBuild->keyData (keyLen bytes).
 * The build table's value-column sources are bound to pBlock, and the row is added
 * to its key's group (a new group is created when the key is absent). Its value
 * columns are then copied into the buffer reserved for it.
 */
static int32_t hJoinAddRowToHash(SHJoinOperatorInfo* pJoin, SSDataBlock* pBlock, size_t keyLen, int32_t rowIdx) {
  SHJoinTableCtx* pBuild = pJoin->pBuild;

  int32_t code = hJoinSetValColsData(pBlock, pBuild);
  if (TSDB_CODE_SUCCESS == code) {
    SGroupData* pExisting = tSimpleHashGet(pJoin->pKeyHash, pBuild->keyData, keyLen);
    code = hJoinAddRowToHashImpl(pJoin, pExisting, pBuild, keyLen, rowIdx);
  }
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  hJoinCopyValColsDataToBuf(pBuild, rowIdx);
  return TSDB_CODE_SUCCESS;
}
842

843
/*
 * Restrict pBlock to the rows whose primary timestamp lies in [pRange->skey, pRange->ekey].
 *
 * The primary timestamp column (slot primSlot) is treated as sorted ascending: row 0 holds
 * the smallest and the last row the largest timestamp, and binary searches locate the
 * boundaries.
 *
 * Returns false when the block can be skipped entirely: the primary column is missing, the
 * block is empty, or the block is disjoint from the range. Otherwise returns true and writes
 * the first/last row indexes to process into *startIdx / *endIdx.
 *
 * When the block straddles the range but no row falls inside it, *startIdx can end up
 * greater than *endIdx.
 */
static bool hJoinFilterTimeRange(SSDataBlock* pBlock, STimeWindow* pRange, int32_t primSlot, int32_t* startIdx, int32_t* endIdx) {
  SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, primSlot);
  if (NULL == pCol) {
    qError("hash join can't get prim col, slot:%d, slotNum:%d", primSlot, (int32_t)taosArrayGetSize(pBlock->pDataBlock));
    return false;
  }

  // With zero rows, reading the row at index rows - 1 would access memory before the column buffer.
  if (pBlock->info.rows <= 0) {
    return false;
  }

  TSKEY* pTs = (TSKEY*)pCol->pData;
  TSKEY  skey = *(TSKEY*)colDataGetData(pCol, 0);
  TSKEY  ekey = *(TSKEY*)colDataGetData(pCol, (pBlock->info.rows - 1));

  if (ekey < pRange->skey || skey > pRange->ekey) {
    return false;
  }

  // Start with the whole block, then trim whichever end sticks out of the range.
  int32_t first = 0;
  int32_t last = pBlock->info.rows - 1;

  if (skey < pRange->skey) {
    TSKEY* pStart = (TSKEY*)taosbsearch(&pRange->skey, pTs, pBlock->info.rows, sizeof(TSKEY), compareInt64Val, TD_GE);
    if (NULL == pStart) {
      // Not expected: ekey >= pRange->skey guarantees a match. Skip the block rather than compute a bogus index.
      return false;
    }
    first = (int32_t)(pStart - pTs);
  }

  if (ekey > pRange->ekey) {
    TSKEY* pEnd = (TSKEY*)taosbsearch(&pRange->ekey, pTs, pBlock->info.rows, sizeof(TSKEY), compareInt64Val, TD_LE);
    if (NULL == pEnd) {
      // Not expected: skey <= pRange->ekey guarantees a match.
      return false;
    }
    last = (int32_t)(pEnd - pTs);
  }

  *startIdx = first;
  *endIdx = last;
  return true;
}
884

885
/*
 * Insert the rows of build-side block pBlock into the key hash.
 *
 * When the build table has a time range, only the rows inside it are considered.
 * A block entirely outside the range is skipped with success. The primary-key
 * expression is evaluated for the selected rows and the key columns are bound.
 * Then every row whose key hJoinCopyKeyColsDataToBuf does not reject is added
 * (presumably the rejected rows are those with null keys — verify against that helper).
 * Stops at, and returns, the first error.
 */
static int32_t hJoinAddBlockRowsToHash(SSDataBlock* pBlock, SHJoinOperatorInfo* pJoin) {
  SHJoinTableCtx* pBuild = pJoin->pBuild;
  int32_t         firstRow = 0;
  int32_t         lastRow = pBlock->info.rows - 1;

  if (pBuild->hasTimeRange) {
    bool inRange = hJoinFilterTimeRange(pBlock, &pJoin->tblTimeRange, pBuild->primCol->srcSlot, &firstRow, &lastRow);
    if (!inRange) {
      return TSDB_CODE_SUCCESS;
    }
  }

  HJ_ERR_RET(hJoinLaunchPrimExpr(pBlock, pBuild, firstRow, lastRow));

  int32_t code = hJoinSetKeyColsData(pBlock, pBuild);
  if (code) {
    return code;
  }

  size_t keyLen = 0;
  for (int32_t row = firstRow; row <= lastRow && TSDB_CODE_SUCCESS == code; ++row) {
    // A true result from hJoinCopyKeyColsDataToBuf means "skip this row".
    if (!hJoinCopyKeyColsDataToBuf(pBuild, row, &keyLen)) {
      code = hJoinAddRowToHash(pJoin, pBlock, keyLen, row);
    }
  }

  return code;
}
912

913
static int32_t hJoinBuildHash(struct SOperatorInfo* pOperator, bool* queryDone) {
524,048✔
914
  SHJoinOperatorInfo* pJoin = pOperator->info;
524,048✔
915
  SSDataBlock*        pBlock = NULL;
524,048✔
916
  int32_t             code = TSDB_CODE_SUCCESS;
524,048✔
917

918
  while (true) {
919
    pBlock = getNextBlockFromDownstream(pOperator, pJoin->pBuild->downStreamIdx);
1,533,754✔
920
    if (NULL == pBlock) {
1,533,754✔
921
      break;
524,048✔
922
    }
923

924
    pJoin->execInfo.buildBlkNum++;
1,009,706✔
925
    pJoin->execInfo.buildBlkRows += pBlock->info.rows;
1,009,706✔
926

927
    code = hJoinAddBlockRowsToHash(pBlock, pJoin);
1,009,706✔
928
    if (code) {
1,009,706✔
UNCOV
929
      return code;
×
930
    }
931
  }
932

933
  if (IS_INNER_NONE_JOIN(pJoin->joinType, pJoin->subType) && tSimpleHashGetSize(pJoin->pKeyHash) <= 0) {
524,048✔
934
    hJoinSetDone(pOperator);
6,972✔
935
    *queryDone = true;
6,972✔
936
  }
937
  
938
  //qTrace("build table rows:%" PRId64, hJoinGetRowsNumOfKeyHash(pJoin->pKeyHash));
939

940
  return TSDB_CODE_SUCCESS;
524,048✔
941
}
942

943
/*
 * Prepare the join context for a new probe-side block and run the join function on it.
 *
 * If the probe table has a time range and the block lies entirely outside it:
 * - an inner join skips the block;
 * - an outer join hands the whole block to the join function in the POST phase
 *   (probeEndIdx = -1, probePostIdx = 0).
 *
 * Otherwise the primary-key expression is evaluated and the key/value columns are bound.
 * The context gets the in-range rows [probeStartIdx, probeEndIdx]. An outer join whose
 * range does not start at row 0 begins in the PRE phase so the leading rows are handled
 * first; all other cases begin in the CUR phase.
 */
static int32_t hJoinPrepareStart(struct SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  SHJoinOperatorInfo* pJoin = pOperator->info;
  SHJoinTableCtx*     pProbe = pJoin->pProbe;
  bool                innerJoin = IS_INNER_NONE_JOIN(pJoin->joinType, pJoin->subType);
  int32_t             firstRow = 0;
  int32_t             lastRow = pBlock->info.rows - 1;

  if (pProbe->hasTimeRange &&
      !hJoinFilterTimeRange(pBlock, &pJoin->tblTimeRange, pProbe->primCol->srcSlot, &firstRow, &lastRow)) {
    if (innerJoin) {
      return TSDB_CODE_SUCCESS;
    }

    pJoin->ctx.probeEndIdx = -1;
    pJoin->ctx.probePostIdx = 0;
    pJoin->ctx.pProbeData = pBlock;
    pJoin->ctx.rowRemains = true;
    pJoin->ctx.probePhase = E_JOIN_PHASE_POST;

    HJ_ERR_RET((*pJoin->joinFp)(pOperator));
    return TSDB_CODE_SUCCESS;
  }

  HJ_ERR_RET(hJoinLaunchPrimExpr(pBlock, pProbe, firstRow, lastRow));

  int32_t code = hJoinSetKeyColsData(pBlock, pProbe);
  if (TSDB_CODE_SUCCESS == code) {
    code = hJoinSetValColsData(pBlock, pProbe);
  }
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  pJoin->ctx.probeStartIdx = firstRow;
  pJoin->ctx.probeEndIdx = lastRow;
  pJoin->ctx.pBuildRow = NULL;
  pJoin->ctx.pProbeData = pBlock;
  pJoin->ctx.rowRemains = true;
  pJoin->ctx.probePreIdx = 0;
  pJoin->ctx.probePostIdx = lastRow + 1;
  pJoin->ctx.probePhase = (!innerJoin && firstRow > 0) ? E_JOIN_PHASE_PRE : E_JOIN_PHASE_CUR;

  HJ_ERR_RET((*pJoin->joinFp)(pOperator));

  return TSDB_CODE_SUCCESS;
}
990

991
void hJoinSetDone(struct SOperatorInfo* pOperator) {
524,048✔
992
  setOperatorCompleted(pOperator);
524,048✔
993

994
  SHJoinOperatorInfo* pInfo = pOperator->info;
524,048✔
995
  hJoinDestroyKeyHash(&pInfo->pKeyHash);
524,048✔
996

997
  qDebug("hash Join done");  
524,048✔
998
}
524,048✔
999

1000
/*
 * Next-block entry point of the hash join operator.
 *
 * On the first call the key hash is built from the build side. Each call then:
 * - finishes any rows left over from the previous probe block (ctx.rowRemains);
 * - feeds probe-side blocks through hJoinPrepareStart until pJoin->finBlk reaches its
 *   output threshold or the probe side is exhausted.
 *
 * pJoin->pFinFilter, when present, is applied to finBlk before it is returned. This
 * includes the last partial block produced when the probe side runs out.
 *
 * *pResBlock is set to finBlk only when it holds rows. Errors are recorded in
 * pTaskInfo->code and raised through T_LONG_JMP.
 */
static int32_t hJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
  SHJoinOperatorInfo* pJoin = pOperator->info;
  SExecTaskInfo*      pTaskInfo = pOperator->pTaskInfo;
  int32_t             code = TSDB_CODE_SUCCESS;
  int32_t             lino = 0;
  SSDataBlock*        pRes = pJoin->finBlk;
  int64_t             st = 0;

  QRY_PARAM_CHECK(pResBlock);
  if (pOperator->cost.openCost == 0) {
    st = taosGetTimestampUs();
  }

  if (pOperator->status == OP_EXEC_DONE) {
    pRes->info.rows = 0;
    goto _end;
  }

  if (!pJoin->keyHashBuilt) {
    pJoin->keyHashBuilt = true;

    bool queryDone = false;
    code = hJoinBuildHash(pOperator, &queryDone);
    QUERY_CHECK_CODE(code, lino, _end);

    if (queryDone) {
      goto _end;
    }
  }

  blockDataCleanup(pRes);

  if (pJoin->ctx.rowRemains) {
    code = (*pJoin->joinFp)(pOperator);
    QUERY_CHECK_CODE(code, lino, _end);

    if (pRes->info.rows > 0 && pJoin->pFinFilter != NULL) {
      code = doFilter(pRes, pJoin->pFinFilter, NULL, NULL);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    if (pRes->info.rows > 0) {
      // Go through _end so the cost accounting and debug output happen for this path too.
      goto _end;
    }
  }

  while (true) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, pJoin->pProbe->downStreamIdx);
    if (NULL == pBlock) {
      hJoinSetDone(pOperator);

      // Rows accumulated since the last threshold check have not been filtered yet;
      // filter them before they are returned.
      if (pRes->info.rows > 0 && pJoin->pFinFilter != NULL) {
        code = doFilter(pRes, pJoin->pFinFilter, NULL, NULL);
        QUERY_CHECK_CODE(code, lino, _end);
      }
      break;
    }

    pJoin->execInfo.probeBlkNum++;
    pJoin->execInfo.probeBlkRows += pBlock->info.rows;

    code = hJoinPrepareStart(pOperator, pBlock);
    QUERY_CHECK_CODE(code, lino, _end);

    if (!hJoinBlkReachThreshold(pJoin, pRes->info.rows)) {
      continue;
    }

    if (pRes->info.rows > 0 && pJoin->pFinFilter != NULL) {
      code = doFilter(pRes, pJoin->pFinFilter, NULL, NULL);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    if (pRes->info.rows > 0) {
      break;
    }
  }

_end:
  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
  }
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  if (pRes->info.rows > 0) {
    *pResBlock = pRes;
    qDebug("%s %s output %" PRId64 " rows final res", GET_TASKID(pTaskInfo), __func__, pRes->info.rows);
  }

  return code;
}
1090

1091
static void destroyHashJoinOperator(void* param) {
523,164✔
1092
  SHJoinOperatorInfo* pJoinOperator = (SHJoinOperatorInfo*)param;
523,164✔
1093
  qDebug("hashJoin exec info, buildBlk:%" PRId64 ", buildRows:%" PRId64 ", probeBlk:%" PRId64 ", probeRows:%" PRId64 ", resRows:%" PRId64, 
523,164✔
1094
         pJoinOperator->execInfo.buildBlkNum, pJoinOperator->execInfo.buildBlkRows, pJoinOperator->execInfo.probeBlkNum, 
1095
         pJoinOperator->execInfo.probeBlkRows, pJoinOperator->execInfo.resRows);
1096

1097
  hJoinDestroyKeyHash(&pJoinOperator->pKeyHash);
523,164✔
1098

1099
  hJoinFreeTableInfo(&pJoinOperator->tbs[0]);
523,164✔
1100
  hJoinFreeTableInfo(&pJoinOperator->tbs[1]);
523,164✔
1101
  blockDataDestroy(pJoinOperator->finBlk);
523,164✔
1102
  pJoinOperator->finBlk = NULL;
523,164✔
1103
  taosMemoryFreeClear(pJoinOperator->pResColMap);
523,164✔
1104
  taosArrayDestroyEx(pJoinOperator->pRowBufs, hJoinFreeBufPage);
523,164✔
1105

1106
  taosMemoryFreeClear(param);
523,164✔
1107
}
523,164✔
1108

1109
/*
 * Build the operator's filters from the plan's join conditions.
 *
 * - INNER: the full ON condition and the WHERE conditions are merged (via mergeJoinConds,
 *   which rewrites pJoinNode->pFullOnCond) and become pJoin->pFinFilter. Whichever one
 *   exists is used alone if only one is present.
 * - LEFT / RIGHT / FULL: the full ON condition becomes pJoin->pPreFilter and the WHERE
 *   conditions become pJoin->pFinFilter, each only when present.
 * - Any other join type gets no filters.
 */
int32_t hJoinHandleConds(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) {
  if (JOIN_TYPE_INNER == pJoin->joinType) {
    SNode* pCond = pJoinNode->node.pConditions;
    if (NULL != pJoinNode->pFullOnCond) {
      if (NULL != pJoinNode->node.pConditions) {
        HJ_ERR_RET(mergeJoinConds(&pJoinNode->pFullOnCond, &pJoinNode->node.pConditions));
      }
      pCond = pJoinNode->pFullOnCond;
    }

    HJ_ERR_RET(filterInitFromNode(pCond, &pJoin->pFinFilter, 0, pTaskInfo->pStreamRuntimeInfo));
    return TSDB_CODE_SUCCESS;
  }

  bool outerJoin = (JOIN_TYPE_LEFT == pJoin->joinType || JOIN_TYPE_RIGHT == pJoin->joinType ||
                    JOIN_TYPE_FULL == pJoin->joinType);
  if (!outerJoin) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL != pJoinNode->pFullOnCond) {
    HJ_ERR_RET(filterInitFromNode(pJoinNode->pFullOnCond, &pJoin->pPreFilter, 0,
                                  pTaskInfo->pStreamRuntimeInfo));
  }
  if (NULL != pJoinNode->node.pConditions) {
    HJ_ERR_RET(filterInitFromNode(pJoinNode->node.pConditions, &pJoin->pFinFilter, 0,
                                  pTaskInfo->pStreamRuntimeInfo));
  }

  return TSDB_CODE_SUCCESS;
}
1143

1144
/*
 * Row capacity for the operator's final output block.
 *
 * The base value is the larger of HJOIN_DEFAULT_BLK_ROWS_NUM and the number of output rows
 * that fit into HJOIN_BLK_SIZE_LIMIT bytes. When a LIMIT is set and there is no final
 * filter, the capacity is capped near limit / HJOIN_BLK_THRESHOLD_RATIO + 1 rows, so the
 * block does not grow far past the limit.
 */
static uint32_t hJoinGetFinBlkCapacity(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNode) {
  uint32_t maxRows = HJOIN_DEFAULT_BLK_ROWS_NUM;
  // Guard the division: a non-positive row size would be a malformed descriptor.
  if (pJoinNode->node.pOutputDataBlockDesc->totalRowSize > 0) {
    maxRows = TMAX(HJOIN_DEFAULT_BLK_ROWS_NUM, HJOIN_BLK_SIZE_LIMIT / pJoinNode->node.pOutputDataBlockDesc->totalRowSize);
  }

  if (INT64_MAX != pJoin->ctx.limit && NULL == pJoin->pFinFilter) {
    // Compare in floating point before narrowing: for large LIMIT values the quotient exceeds
    // UINT32_MAX, and converting such a value directly to uint32_t is undefined behavior.
    double limitMaxRows = (double)pJoin->ctx.limit / HJOIN_BLK_THRESHOLD_RATIO + 1;
    if (limitMaxRows < (double)maxRows) {
      return (uint32_t)limitMaxRows;
    }
  }

  return maxRows;
}
1153

1154

1155
/*
 * Create the operator's result blocks from the plan's output descriptor.
 *
 * - finBlk is sized by hJoinGetFinBlkCapacity.
 * - When a pre-filter exists, midBlk is created as a copy of finBlk's layout with the
 *   same capacity.
 * - blkThreshold (the row count at which a result block is emitted) is set to
 *   HJOIN_BLK_THRESHOLD_RATIO of finBlk's capacity.
 */
int32_t hJoinInitResBlocks(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNode) {
  SSDataBlock* pFin = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc);
  pJoin->finBlk = pFin;
  if (NULL == pFin) {
    QRY_ERR_RET(terrno);
  }

  int32_t code = blockDataEnsureCapacity(pFin, hJoinGetFinBlkCapacity(pJoin, pJoinNode));
  if (TSDB_CODE_SUCCESS != code) {
    QRY_ERR_RET(code);
  }

  if (NULL != pJoin->pPreFilter) {
    pJoin->midBlk = NULL;
    code = createOneDataBlock(pFin, false, &pJoin->midBlk);
    if (TSDB_CODE_SUCCESS == code) {
      code = blockDataEnsureCapacity(pJoin->midBlk, pFin->info.capacity);
    }
    if (TSDB_CODE_SUCCESS != code) {
      QRY_ERR_RET(code);
    }
  }

  pJoin->blkThreshold = pFin->info.capacity * HJOIN_BLK_THRESHOLD_RATIO;
  return TSDB_CODE_SUCCESS;
}
1182

1183
/*
 * Reset a hash join operator so it can be executed again from scratch.
 *
 * Steps:
 * - Clear the result blocks, the status and the execution statistics.
 * - Free every row descriptor chained in the key hash (each was allocated individually
 *   when the hash was built), then recreate the hash with the same capacity rule as at
 *   creation time.
 * - Rebuild the row buffer pages.
 * - Zero the join context while keeping its limit.
 *
 * Returns terrno if the hash cannot be recreated, otherwise the result of hJoinInitBufPages.
 */
static int32_t resetHashJoinOperState(SOperatorInfo* pOper) {
  SHJoinOperatorInfo* pHjOper = pOper->info;
  pHjOper->keyHashBuilt = false;
  blockDataCleanup(pHjOper->midBlk);
  blockDataCleanup(pHjOper->finBlk);
  pOper->status = OP_NOT_OPENED;

  pHjOper->execInfo = (SHJoinExecInfo){0};

  void*   pIte = NULL;
  int32_t iter = 0;
  while ((pIte = tSimpleHashIterate(pHjOper->pKeyHash, pIte, &iter)) != NULL) {
    SGroupData*  pGroup = pIte;
    SBufRowInfo* pRow = pGroup->rows;
    while (pRow) {
      SBufRowInfo* pNext = pRow->next;
      taosMemoryFree(pRow);
      pRow = pNext;
    }
  }
  tSimpleHashCleanup(pHjOper->pKeyHash);

  size_t hashCap = pHjOper->pBuild->inputStat.inputRowNum > 0 ? (pHjOper->pBuild->inputStat.inputRowNum * 1.5) : 1024;
  pHjOper->pKeyHash = tSimpleHashInit(hashCap, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
  if (pHjOper->pKeyHash == NULL) {
    return terrno; 
  }

  taosArrayDestroyEx(pHjOper->pRowBufs, hJoinFreeBufPage);
  // Drop the freed pointer so a failing re-init cannot leave it dangling for
  // destroyHashJoinOperator, which would otherwise free it a second time.
  pHjOper->pRowBufs = NULL;
  int32_t code = hJoinInitBufPages(pHjOper);

  int64_t limit = pHjOper->ctx.limit;
  pHjOper->ctx = (SHJoinCtx){0};
  pHjOper->ctx.limit = limit;
  return code;
}
1217

1218
int32_t createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
523,164✔
1219
                                           SHashJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
1220
  QRY_PARAM_CHECK(pOptrInfo);
523,164✔
1221

1222
  int32_t             code = TSDB_CODE_SUCCESS;
523,164✔
1223
  SHJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SHJoinOperatorInfo));
523,164✔
1224
  SOperatorInfo*      pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
523,164✔
1225
  if (pOperator == NULL || pInfo == NULL) {
523,164✔
UNCOV
1226
    code = terrno;
×
UNCOV
1227
    goto _return;
×
1228
  }
1229

1230
  pInfo->tblTimeRange.skey = pJoinNode->timeRange.skey;
523,164✔
1231
  pInfo->tblTimeRange.ekey = pJoinNode->timeRange.ekey;
523,164✔
1232
  
1233
  pInfo->ctx.limit = (pJoinNode->node.pLimit && ((SLimitNode*)pJoinNode->node.pLimit)->limit) ? ((SLimitNode*)pJoinNode->node.pLimit)->limit->datum.i : INT64_MAX;
523,164✔
1234

1235
  setOperatorInfo(pOperator, "HashJoinOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
523,164✔
1236

1237
  HJ_ERR_JRET(hJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0]));
523,164✔
1238
  HJ_ERR_JRET(hJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1]));
523,164✔
1239

1240
  hJoinSetBuildAndProbeTable(pInfo, pJoinNode);
523,164✔
1241
  
1242
  HJ_ERR_JRET(hJoinBuildResColsMap(pInfo, pJoinNode));
523,164✔
1243

1244
  HJ_ERR_JRET(hJoinInitBufPages(pInfo));
523,164✔
1245

1246
  size_t hashCap = pInfo->pBuild->inputStat.inputRowNum > 0 ? (pInfo->pBuild->inputStat.inputRowNum * 1.5) : 1024;
523,164✔
1247
  pInfo->pKeyHash = tSimpleHashInit(hashCap, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
523,164✔
1248
  if (pInfo->pKeyHash == NULL) {
523,164✔
UNCOV
1249
    code = terrno;
×
UNCOV
1250
    goto _return;
×
1251
  }
1252

1253
  HJ_ERR_JRET(hJoinHandleConds(pInfo, pJoinNode, pTaskInfo));
523,164✔
1254

1255
  HJ_ERR_JRET(hJoinInitResBlocks(pInfo, pJoinNode));
523,164✔
1256

1257
  HJ_ERR_JRET(hJoinSetImplFp(pInfo));
523,164✔
1258

1259
  HJ_ERR_JRET(appendDownstream(pOperator, pDownstream, numOfDownstream));
523,164✔
1260

1261
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hJoinMainProcess, NULL, destroyHashJoinOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
523,164✔
1262
  setOperatorResetStateFn(pOperator, resetHashJoinOperState);
523,164✔
1263

1264
  qDebug("create hash Join operator done");
523,164✔
1265

1266
  *pOptrInfo = pOperator;
523,164✔
1267
  return TSDB_CODE_SUCCESS;
523,164✔
1268

UNCOV
1269
_return:
×
1270

UNCOV
1271
  if (pInfo != NULL) {
×
UNCOV
1272
    destroyHashJoinOperator(pInfo);
×
1273
  }
UNCOV
1274
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
UNCOV
1275
  pTaskInfo->code = code;
×
UNCOV
1276
  return code;
×
1277
}
1278

1279

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc