• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3646

12 Mar 2025 12:34PM UTC coverage: 28.375% (-27.8%) from 56.156%
#3646

push

travis-ci

web-flow
Merge pull request #30119 from taosdata/ciup30

ci: Update workflow to fix param issue of run_tdgpt_test

59085 of 286935 branches covered (20.59%)

Branch coverage included in aggregate %.

102775 of 283490 relevant lines covered (36.25%)

55149.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.82
/source/libs/executor/src/hashjoin.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "operator.h"
20
#include "os.h"
21
#include "querynodes.h"
22
#include "querytask.h"
23
#include "tcompare.h"
24
#include "tdatablock.h"
25
#include "thash.h"
26
#include "tmsg.h"
27
#include "ttypes.h"
28
#include "hashjoin.h"
29

30

31

32
int32_t hInnerJoinDo(struct SOperatorInfo* pOperator) {
188✔
33
  SHJoinOperatorInfo* pJoin = pOperator->info;
188✔
34
  SHJoinTableCtx* pProbe = pJoin->pProbe;
188✔
35
  SHJoinCtx* pCtx = &pJoin->ctx;
188✔
36
  SSDataBlock* pRes = pJoin->finBlk;
188✔
37
  size_t bufLen = 0;
188✔
38
  int32_t code = 0;
188✔
39
  bool allFetched = false;
188✔
40

41
  if (pJoin->ctx.pBuildRow) {
188!
42
    hJoinAppendResToBlock(pOperator, pRes, &allFetched);
×
43
    if (pRes->info.rows >= pRes->info.capacity) {
×
44
      if (allFetched) {
×
45
        ++pCtx->probeStartIdx;
×
46
      }
47
      
48
      return code;
×
49
    } else {
50
      ++pCtx->probeStartIdx;
×
51
    }
52
  }
53

54
  for (; pCtx->probeStartIdx <= pCtx->probeEndIdx; ++pCtx->probeStartIdx) {
448✔
55
    if (hJoinCopyKeyColsDataToBuf(pProbe, pCtx->probeStartIdx, &bufLen)) {
260!
56
      continue;
×
57
    }
58
    
59
    SGroupData* pGroup = tSimpleHashGet(pJoin->pKeyHash, pProbe->keyData, bufLen);
260✔
60
/*
61
    size_t keySize = 0;
62
    int32_t* pKey = tSimpleHashGetKey(pGroup, &keySize);
63
    A S S E R T(keySize == bufLen && 0 == memcmp(pKey, pProbe->keyData, bufLen));
64
    int64_t rows = getSingleKeyRowsNum(pGroup->rows);
65
    pJoin->execInfo.expectRows += rows;    
66
    qTrace("hash_key:%d, rows:%" PRId64, *pKey, rows);
67
*/
68
    if (pGroup) {
260✔
69
      pCtx->pBuildRow = pGroup->rows;
254✔
70
      hJoinAppendResToBlock(pOperator, pRes, &allFetched);
254✔
71
      if (pRes->info.rows >= pRes->info.capacity) {
254!
72
        if (allFetched) {
×
73
          ++pCtx->probeStartIdx;
×
74
        }
75
        
76
        return code;
×
77
      }
78
    }
79
  }
80

81
  pCtx->rowRemains = false;
188✔
82

83
  return code;
188✔
84
}
85

86
#ifdef HASH_JOIN_FULL
87

88
int32_t hLeftJoinHandleSeqRowRemains(struct SOperatorInfo* pOperator, SHJoinOperatorInfo* pJoin, bool* loopCont) {
89
  bool allFetched = false;
90
  SHJoinCtx* pCtx = &pJoin->ctx;
91
  
92
  while (!allFetched) {
93
    hJoinAppendResToBlock(pOperator, pJoin->midBlk, &allFetched);
94
    if (pJoin->midBlk->info.rows > 0) {
95
      HJ_ERR_RET(doFilter(pJoin->midBlk, pJoin->pPreFilter, NULL));
96
      if (pJoin->midBlk->info.rows > 0) {
97
        pCtx->readMatch = true;
98
        HJ_ERR_RET(hJoinCopyMergeMidBlk(pCtx, &pJoin->midBlk, &pJoin->finBlk));
99
        
100
        if (pCtx->midRemains) {
101
          if (allFetched) {
102
            ++pCtx->probeStartIdx;
103
          }
104

105
          *loopCont = false;
106
          return TSDB_CODE_SUCCESS;
107
        }
108
      }
109
    }
110
  
111
    if (allFetched && !pCtx->readMatch) {
112
      HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, pCtx->probeStartIdx, 1));
113
    }    
114
    
115
    if (hJoinBlkReachThreshold(pJoin, pJoin->finBlk->info.rows)) {
116
      if (allFetched) {
117
        ++pCtx->probeStartIdx;
118
      }
119

120
      *loopCont = false;      
121
      return TSDB_CODE_SUCCESS;
122
    }
123
  }
124
  
125
  ++pCtx->probeStartIdx;
126
  *loopCont = true;
127

128
  return TSDB_CODE_SUCCESS;
129
}
130

131
int32_t hLeftJoinHandleSeqProbeRows(struct SOperatorInfo* pOperator, SHJoinOperatorInfo* pJoin, bool* loopCont) {
132
  SHJoinTableCtx* pProbe = pJoin->pProbe;
133
  SHJoinCtx* pCtx = &pJoin->ctx;
134
  size_t bufLen = 0;
135
  bool allFetched = false;
136

137
  if (hJoinBlkReachThreshold(pJoin, pJoin->finBlk->info.rows)) {
138
    return TSDB_CODE_SUCCESS;
139
  }
140

141
  for (; pCtx->probeStartIdx <= pCtx->probeEndIdx; ++pCtx->probeStartIdx) {
142
    if (hJoinCopyKeyColsDataToBuf(pProbe, pCtx->probeStartIdx, &bufLen)) {
143
      continue;
144
    }
145
    
146
    SGroupData* pGroup = tSimpleHashGet(pJoin->pKeyHash, pProbe->keyData, bufLen);
147
/*
148
    size_t keySize = 0;
149
    int32_t* pKey = tSimpleHashGetKey(pGroup, &keySize);
150
    A S S E R T(keySize == bufLen && 0 == memcmp(pKey, pProbe->keyData, bufLen));
151
    int64_t rows = getSingleKeyRowsNum(pGroup->rows);
152
    pJoin->execInfo.expectRows += rows;    
153
    qTrace("hash_key:%d, rows:%" PRId64, *pKey, rows);
154
*/
155

156
    if (NULL == pGroup) {
157
      HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, pCtx->probeStartIdx, 1));
158
      if (hJoinBlkReachThreshold(pJoin, pJoin->finBlk->info.rows)) {
159
        ++pCtx->probeStartIdx;
160
        *loopCont = false;
161
        
162
        return TSDB_CODE_SUCCESS;
163
      }
164

165
      continue;
166
    }
167
    
168
    pCtx->readMatch = false;
169
    pCtx->pBuildRow = pGroup->rows;
170
    allFetched = false;
171

172
    while (!allFetched) {
173
      hJoinAppendResToBlock(pOperator, pJoin->midBlk, &allFetched);
174
      if (pJoin->midBlk->info.rows > 0) {
175
        HJ_ERR_RET(doFilter(pJoin->midBlk, pJoin->pPreFilter, NULL));
176
        if (pJoin->midBlk->info.rows > 0) {
177
          pCtx->readMatch = true;
178
          HJ_ERR_RET(hJoinCopyMergeMidBlk(pCtx, &pJoin->midBlk, &pJoin->finBlk));
179
          
180
          if (pCtx->midRemains) {
181
            if (allFetched) {
182
              ++pCtx->probeStartIdx;
183
            }
184

185
            *loopCont = false;
186
            
187
            return TSDB_CODE_SUCCESS;
188
          }
189
        }
190
      }
191
      
192
      if (allFetched && !pCtx->readMatch) {
193
        HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, pCtx->probeStartIdx, 1));
194
      }    
195
      
196
      if (hJoinBlkReachThreshold(pJoin, pJoin->finBlk->info.rows)) {
197
        if (allFetched) {
198
          ++pCtx->probeStartIdx;
199
        }
200

201
        *loopCont = false;
202
        
203
        return TSDB_CODE_SUCCESS;
204
      }
205
    }
206
  }
207

208
  pCtx->probePhase = E_JOIN_PHASE_POST;
209
  *loopCont = true;
210

211
  return TSDB_CODE_SUCCESS;
212
}
213

214

215
int32_t hLeftJoinHandleRowRemains(struct SOperatorInfo* pOperator, SHJoinOperatorInfo* pJoin, bool* loopCont) {
216
  bool allFetched = false;
217
  SHJoinCtx* pCtx = &pJoin->ctx;
218
  
219
  hJoinAppendResToBlock(pOperator, pJoin->finBlk, &allFetched);
220
  
221
  if (hJoinBlkReachThreshold(pJoin, pJoin->finBlk->info.rows)) {
222
    if (allFetched) {
223
      ++pCtx->probeStartIdx;
224
    }
225

226
    *loopCont = false;
227
    return TSDB_CODE_SUCCESS;
228
  } else {
229
    ++pCtx->probeStartIdx;
230
  }
231

232
  *loopCont = true;
233

234
  return TSDB_CODE_SUCCESS;
235
}
236

237

238
int32_t hLeftJoinHandleProbeRows(struct SOperatorInfo* pOperator, SHJoinOperatorInfo* pJoin, bool* loopCont) {
239
  SHJoinTableCtx* pProbe = pJoin->pProbe;
240
  SHJoinCtx* pCtx = &pJoin->ctx;
241
  size_t bufLen = 0;
242
  bool allFetched = false;
243

244
  for (; pCtx->probeStartIdx <= pCtx->probeEndIdx; ++pCtx->probeStartIdx) {
245
    if (hJoinCopyKeyColsDataToBuf(pProbe, pCtx->probeStartIdx, &bufLen)) {
246
      continue;
247
    }
248
    
249
    SGroupData* pGroup = tSimpleHashGet(pJoin->pKeyHash, pProbe->keyData, bufLen);
250
/*
251
    size_t keySize = 0;
252
    int32_t* pKey = tSimpleHashGetKey(pGroup, &keySize);
253
    A S S E R T(keySize == bufLen && 0 == memcmp(pKey, pProbe->keyData, bufLen));
254
    int64_t rows = getSingleKeyRowsNum(pGroup->rows);
255
    pJoin->execInfo.expectRows += rows;    
256
    qTrace("hash_key:%d, rows:%" PRId64, *pKey, rows);
257
*/
258

259
    if (NULL == pGroup) {
260
      HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, pCtx->probeStartIdx, 1));
261
      if (hJoinBlkReachThreshold(pJoin, pJoin->finBlk->info.rows)) {
262
        ++pCtx->probeStartIdx;
263
        *loopCont = false;
264
        
265
        return TSDB_CODE_SUCCESS;
266
      }
267

268
      continue;
269
    }
270
    
271
    pCtx->pBuildRow = pGroup->rows;
272

273
    hJoinAppendResToBlock(pOperator, pJoin->finBlk, &allFetched);
274
    if (hJoinBlkReachThreshold(pJoin, pJoin->finBlk->info.rows)) {
275
      if (allFetched) {
276
        ++pCtx->probeStartIdx;
277
      }
278
      *loopCont = false;
279
      
280
      return TSDB_CODE_SUCCESS;
281
    }
282
  }
283

284
  pCtx->probePhase = E_JOIN_PHASE_POST;
285
  *loopCont = true;
286

287
  return TSDB_CODE_SUCCESS;
288
}
289

290

291

292
int32_t hLeftJoinDo(struct SOperatorInfo* pOperator) {
293
  SHJoinOperatorInfo* pJoin = pOperator->info;
294
  SHJoinCtx* pCtx = &pJoin->ctx;
295

296
  while (pCtx->rowRemains) {
297
    switch (pCtx->probePhase) {
298
      case E_JOIN_PHASE_PRE: {
299
        int32_t rows = pCtx->probeStartIdx - pCtx->probePreIdx;
300
        int32_t rowsLeft = pJoin->finBlk->info.capacity - pJoin->finBlk->info.rows;
301
        if (rows <= rowsLeft) {
302
          HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, 0, rows));        
303
          pCtx->probePhase = E_JOIN_PHASE_CUR;
304
        } else {
305
          HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, 0, rowsLeft));
306
          pJoin->ctx.probePreIdx += rowsLeft;
307
          
308
          return TSDB_CODE_SUCCESS;
309
        }
310
        break;
311
      }
312
      case E_JOIN_PHASE_CUR: {
313
        bool loopCont = false;
314
        if (NULL == pJoin->ctx.pBuildRow) {
315
          HJ_ERR_RET(pJoin->pPreFilter ? hLeftJoinHandleSeqProbeRows(pOperator, pJoin, &loopCont) : hLeftJoinHandleProbeRows(pOperator, pJoin, &loopCont));
316
        } else {
317
          HJ_ERR_RET(pJoin->pPreFilter ? hLeftJoinHandleSeqRowRemains(pOperator, pJoin, &loopCont) : hLeftJoinHandleRowRemains(pOperator, pJoin, &loopCont));
318
        }
319

320
        if (!loopCont) {
321
          return TSDB_CODE_SUCCESS;
322
        }
323
        break;
324
      }
325
      case E_JOIN_PHASE_POST: {
326
        if (pCtx->probeEndIdx < (pCtx->pProbeData->info.rows - 1) && pCtx->probePostIdx <= (pCtx->pProbeData->info.rows - 1)) {
327
          int32_t rowsLeft = pJoin->finBlk->info.capacity - pJoin->finBlk->info.rows;
328
          int32_t rows = pCtx->pProbeData->info.rows - pCtx->probePostIdx;
329
          if (rows <= rowsLeft) {
330
            HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, pJoin->ctx.probePostIdx, rows));
331
            pCtx->rowRemains = false;
332
          } else {
333
            HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, pJoin->ctx.probePostIdx, rowsLeft));
334
            pCtx->probePostIdx += rowsLeft;
335
            
336
            return TSDB_CODE_SUCCESS;
337
          }
338
        } else {
339
          pJoin->ctx.rowRemains = false;
340
        }
341
        break;
342
      }
343
      default:
344
        return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
345
    }
346
  }
347

348
  return TSDB_CODE_SUCCESS;
349
}
350

351
#endif
352

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc