• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3533

20 Nov 2024 07:11AM UTC coverage: 58.848% (-1.9%) from 60.78%
#3533

push

travis-ci

web-flow
Merge pull request #28823 from taosdata/fix/3.0/TD-32587

fix:[TD-32587]fix stmt segmentation fault

115578 of 252434 branches covered (45.79%)

Branch coverage included in aggregate %.

1 of 4 new or added lines in 1 file covered. (25.0%)

8038 existing lines in 233 files now uncovered.

194926 of 275199 relevant lines covered (70.83%)

1494459.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

8.33
/source/libs/executor/src/hashjoin.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "operator.h"
20
#include "os.h"
21
#include "querynodes.h"
22
#include "querytask.h"
23
#include "tcompare.h"
24
#include "tdatablock.h"
25
#include "thash.h"
26
#include "tmsg.h"
27
#include "ttypes.h"
28
#include "hashjoin.h"
29

30

31

32
int32_t hInnerJoinDo(struct SOperatorInfo* pOperator) {
1,752✔
33
  SHJoinOperatorInfo* pJoin = pOperator->info;
1,752✔
34
  SHJoinTableCtx* pProbe = pJoin->pProbe;
1,752✔
35
  SHJoinCtx* pCtx = &pJoin->ctx;
1,752✔
36
  SSDataBlock* pRes = pJoin->finBlk;
1,752✔
37
  size_t bufLen = 0;
1,752✔
38
  int32_t code = 0;
1,752✔
39
  bool allFetched = false;
1,752✔
40

41
  if (pJoin->ctx.pBuildRow) {
1,752!
42
    hJoinAppendResToBlock(pOperator, pRes, &allFetched);
×
43
    if (pRes->info.rows >= pRes->info.capacity) {
×
44
      if (allFetched) {
×
45
        ++pCtx->probeStartIdx;
×
46
      }
47
      
48
      return code;
×
49
    } else {
50
      ++pCtx->probeStartIdx;
×
51
    }
52
  }
53

54
  for (; pCtx->probeStartIdx <= pCtx->probeEndIdx; ++pCtx->probeStartIdx) {
3,728✔
55
    if (hJoinCopyKeyColsDataToBuf(pProbe, pCtx->probeStartIdx, &bufLen)) {
1,976!
UNCOV
56
      continue;
×
57
    }
58
    
59
    SGroupData* pGroup = tSimpleHashGet(pJoin->pKeyHash, pProbe->keyData, bufLen);
1,976✔
60
/*
61
    size_t keySize = 0;
62
    int32_t* pKey = tSimpleHashGetKey(pGroup, &keySize);
63
    A S S E R T(keySize == bufLen && 0 == memcmp(pKey, pProbe->keyData, bufLen));
64
    int64_t rows = getSingleKeyRowsNum(pGroup->rows);
65
    pJoin->execInfo.expectRows += rows;    
66
    qTrace("hash_key:%d, rows:%" PRId64, *pKey, rows);
67
*/
68
    if (pGroup) {
1,976✔
69
      pCtx->pBuildRow = pGroup->rows;
527✔
70
      hJoinAppendResToBlock(pOperator, pRes, &allFetched);
527✔
71
      if (pRes->info.rows >= pRes->info.capacity) {
527!
72
        if (allFetched) {
×
73
          ++pCtx->probeStartIdx;
×
74
        }
75
        
76
        return code;
×
77
      }
78
    }
79
  }
80

81
  pCtx->rowRemains = false;
1,752✔
82

83
  return code;
1,752✔
84
}
85

86
/*
 * Left join with a pre-filter: continue emitting the build rows that are still pending for the
 * current probe row (ctx.probeStartIdx).
 *
 * Each round appends joined rows into pJoin->midBlk, runs pJoin->pPreFilter over it and, when
 * rows survive, marks ctx.readMatch and merges them into pJoin->finBlk via
 * hJoinCopyMergeMidBlk(). Once the group is exhausted without any surviving row, the probe row
 * is emitted through hJoinCopyNMatchRowsToBlock().
 *
 * midBlk/finBlk are always re-read through pJoin because hJoinCopyMergeMidBlk() receives their
 * addresses. NOTE(review): it presumably may swap the two blocks - confirm.
 *
 * *loopCont is set to false when the caller should return its result block now (mid block has
 * leftovers, or the final block reached its threshold) and to true when the group was fully
 * processed. Errors from the helpers are propagated through HJ_ERR_RET.
 */
int32_t hLeftJoinHandleSeqRowRemains(struct SOperatorInfo* pOperator, SHJoinOperatorInfo* pJoin, bool* loopCont) {
  SHJoinCtx* pJoinCtx = &pJoin->ctx;
  bool       groupDone = false;
  bool       suspend = false;

  do {
    hJoinAppendResToBlock(pOperator, pJoin->midBlk, &groupDone);

    if (pJoin->midBlk->info.rows > 0) {
      HJ_ERR_RET(doFilter(pJoin->midBlk, pJoin->pPreFilter, NULL));
    }

    // Re-check after filtering: only rows that passed the pre-filter count as a match.
    if (pJoin->midBlk->info.rows > 0) {
      pJoinCtx->readMatch = true;
      HJ_ERR_RET(hJoinCopyMergeMidBlk(pJoinCtx, &pJoin->midBlk, &pJoin->finBlk));

      if (pJoinCtx->midRemains) {
        suspend = true;
        break;
      }
    }

    if (groupDone && !pJoinCtx->readMatch) {
      HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, pJoinCtx->probeStartIdx, 1));
    }

    if (hJoinBlkReachThreshold(pJoin, pJoin->finBlk->info.rows)) {
      suspend = true;
      break;
    }
  } while (!groupDone);

  // Advance to the next probe row unless we stopped in the middle of the current group.
  if (!suspend || groupDone) {
    ++pJoinCtx->probeStartIdx;
  }

  *loopCont = !suspend;

  return TSDB_CODE_SUCCESS;
}
128

129
/*
 * Left join with a pre-filter: process probe rows in [ctx.probeStartIdx, ctx.probeEndIdx].
 *
 * For every probe row the key is looked up in pJoin->pKeyHash:
 *   - no group found: the probe row is emitted through hJoinCopyNMatchRowsToBlock();
 *   - group found:    joined rows are produced into pJoin->midBlk, filtered with
 *                     pJoin->pPreFilter and merged into pJoin->finBlk; if nothing survives the
 *                     filter for the whole group, the probe row is emitted through
 *                     hJoinCopyNMatchRowsToBlock() instead.
 *
 * *loopCont is written on every successful return:
 *   - false: the caller should hand back the result block now (final block at threshold, or
 *            the mid block still holds rows);
 *   - true:  the probe range is exhausted and ctx.probePhase has been moved to
 *            E_JOIN_PHASE_POST.
 *
 * Errors from the helpers are propagated through HJ_ERR_RET.
 */
int32_t hLeftJoinHandleSeqProbeRows(struct SOperatorInfo* pOperator, SHJoinOperatorInfo* pJoin, bool* loopCont) {
  SHJoinTableCtx* pProbe = pJoin->pProbe;
  SHJoinCtx* pCtx = &pJoin->ctx;
  size_t bufLen = 0;
  bool allFetched = false;

  if (hJoinBlkReachThreshold(pJoin, pJoin->finBlk->info.rows)) {
    // Result block is already full: set the out-parameter explicitly instead of relying on
    // whatever value the caller initialised it with.
    *loopCont = false;
    return TSDB_CODE_SUCCESS;
  }

  for (; pCtx->probeStartIdx <= pCtx->probeEndIdx; ++pCtx->probeStartIdx) {
    // A non-zero result means the key could not be built for this row; the row is skipped.
    if (hJoinCopyKeyColsDataToBuf(pProbe, pCtx->probeStartIdx, &bufLen)) {
      continue;
    }

    SGroupData* pGroup = tSimpleHashGet(pJoin->pKeyHash, pProbe->keyData, bufLen);

    if (NULL == pGroup) {
      HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, pCtx->probeStartIdx, 1));
      if (hJoinBlkReachThreshold(pJoin, pJoin->finBlk->info.rows)) {
        // This row is done; resume from the next one.
        ++pCtx->probeStartIdx;
        *loopCont = false;

        return TSDB_CODE_SUCCESS;
      }

      continue;
    }

    pCtx->readMatch = false;
    pCtx->pBuildRow = pGroup->rows;
    allFetched = false;

    while (!allFetched) {
      hJoinAppendResToBlock(pOperator, pJoin->midBlk, &allFetched);
      if (pJoin->midBlk->info.rows > 0) {
        HJ_ERR_RET(doFilter(pJoin->midBlk, pJoin->pPreFilter, NULL));
        if (pJoin->midBlk->info.rows > 0) {
          // At least one joined row passed the pre-filter for this probe row.
          pCtx->readMatch = true;
          HJ_ERR_RET(hJoinCopyMergeMidBlk(pCtx, &pJoin->midBlk, &pJoin->finBlk));

          if (pCtx->midRemains) {
            // Only step to the next probe row if the current group is finished.
            if (allFetched) {
              ++pCtx->probeStartIdx;
            }

            *loopCont = false;

            return TSDB_CODE_SUCCESS;
          }
        }
      }

      // Group exhausted and nothing survived the filter: emit the probe row as unmatched.
      if (allFetched && !pCtx->readMatch) {
        HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, pCtx->probeStartIdx, 1));
      }

      if (hJoinBlkReachThreshold(pJoin, pJoin->finBlk->info.rows)) {
        if (allFetched) {
          ++pCtx->probeStartIdx;
        }

        *loopCont = false;

        return TSDB_CODE_SUCCESS;
      }
    }
  }

  pCtx->probePhase = E_JOIN_PHASE_POST;
  *loopCont = true;

  return TSDB_CODE_SUCCESS;
}
211

212

213
/*
 * Left join without a pre-filter: continue emitting the build rows still pending for the current
 * probe row directly into pJoin->finBlk.
 *
 * *loopCont is false when the final block reached its threshold (the caller should return it)
 * and true otherwise. ctx.probeStartIdx is advanced unless the block filled up before the
 * current group was completely fetched. Always returns TSDB_CODE_SUCCESS.
 */
int32_t hLeftJoinHandleRowRemains(struct SOperatorInfo* pOperator, SHJoinOperatorInfo* pJoin, bool* loopCont) {
  SHJoinCtx* pJoinCtx = &pJoin->ctx;
  bool       groupDone = false;

  hJoinAppendResToBlock(pOperator, pJoin->finBlk, &groupDone);

  bool blkFull = hJoinBlkReachThreshold(pJoin, pJoin->finBlk->info.rows);

  // Stay on the same probe row only when we were cut short in the middle of its group.
  if (!blkFull || groupDone) {
    ++pJoinCtx->probeStartIdx;
  }

  *loopCont = !blkFull;

  return TSDB_CODE_SUCCESS;
}
234

235

236
/*
 * Left join without a pre-filter: process probe rows in [ctx.probeStartIdx, ctx.probeEndIdx].
 *
 * A probe row whose key has no group in pJoin->pKeyHash is emitted through
 * hJoinCopyNMatchRowsToBlock(); otherwise the joined rows are appended to pJoin->finBlk.
 *
 * *loopCont is false when the final block reached its threshold (the caller should return it)
 * and true when the whole probe range was consumed, in which case ctx.probePhase becomes
 * E_JOIN_PHASE_POST. Errors from the helpers are propagated through HJ_ERR_RET.
 */
int32_t hLeftJoinHandleProbeRows(struct SOperatorInfo* pOperator, SHJoinOperatorInfo* pJoin, bool* loopCont) {
  SHJoinCtx*      pJoinCtx = &pJoin->ctx;
  SHJoinTableCtx* pProbeTbl = pJoin->pProbe;
  size_t          keyLen = 0;
  bool            groupDone = false;

  for (; pJoinCtx->probeStartIdx <= pJoinCtx->probeEndIdx; ++pJoinCtx->probeStartIdx) {
    // A non-zero result means the key could not be built for this row; the row is skipped.
    if (hJoinCopyKeyColsDataToBuf(pProbeTbl, pJoinCtx->probeStartIdx, &keyLen)) {
      continue;
    }

    // Whether the current probe row is finished if we have to stop after it.
    bool advance;

    SGroupData* pHit = tSimpleHashGet(pJoin->pKeyHash, pProbeTbl->keyData, keyLen);
    if (NULL == pHit) {
      HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, pJoinCtx->probeStartIdx, 1));
      advance = true;
    } else {
      pJoinCtx->pBuildRow = pHit->rows;
      hJoinAppendResToBlock(pOperator, pJoin->finBlk, &groupDone);
      advance = groupDone;
    }

    if (hJoinBlkReachThreshold(pJoin, pJoin->finBlk->info.rows)) {
      if (advance) {
        ++pJoinCtx->probeStartIdx;
      }

      *loopCont = false;

      return TSDB_CODE_SUCCESS;
    }
  }

  pJoinCtx->probePhase = E_JOIN_PHASE_POST;
  *loopCont = true;

  return TSDB_CODE_SUCCESS;
}
287

288

289

290
int32_t hLeftJoinDo(struct SOperatorInfo* pOperator) {
×
291
  SHJoinOperatorInfo* pJoin = pOperator->info;
×
292
  SHJoinCtx* pCtx = &pJoin->ctx;
×
293

294
  while (pCtx->rowRemains) {
×
295
    switch (pCtx->probePhase) {
×
296
      case E_JOIN_PHASE_PRE: {
×
297
        int32_t rows = pCtx->probeStartIdx - pCtx->probePreIdx;
×
298
        int32_t rowsLeft = pJoin->finBlk->info.capacity - pJoin->finBlk->info.rows;
×
299
        if (rows <= rowsLeft) {
×
300
          HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, 0, rows));        
×
301
          pCtx->probePhase = E_JOIN_PHASE_CUR;
×
302
        } else {
303
          HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, 0, rowsLeft));
×
304
          pJoin->ctx.probePreIdx += rowsLeft;
×
305
          
306
          return TSDB_CODE_SUCCESS;
×
307
        }
308
        break;
×
309
      }
310
      case E_JOIN_PHASE_CUR: {
×
311
        bool loopCont = false;
×
312
        if (NULL == pJoin->ctx.pBuildRow) {
×
313
          HJ_ERR_RET(pJoin->pPreFilter ? hLeftJoinHandleSeqProbeRows(pOperator, pJoin, &loopCont) : hLeftJoinHandleProbeRows(pOperator, pJoin, &loopCont));
×
314
        } else {
315
          HJ_ERR_RET(pJoin->pPreFilter ? hLeftJoinHandleSeqRowRemains(pOperator, pJoin, &loopCont) : hLeftJoinHandleRowRemains(pOperator, pJoin, &loopCont));
×
316
        }
317

318
        if (!loopCont) {
×
319
          return TSDB_CODE_SUCCESS;
×
320
        }
321
        break;
×
322
      }
323
      case E_JOIN_PHASE_POST: {
×
324
        if (pCtx->probeEndIdx < (pCtx->pProbeData->info.rows - 1) && pCtx->probePostIdx <= (pCtx->pProbeData->info.rows - 1)) {
×
325
          int32_t rowsLeft = pJoin->finBlk->info.capacity - pJoin->finBlk->info.rows;
×
326
          int32_t rows = pCtx->pProbeData->info.rows - pCtx->probePostIdx;
×
327
          if (rows <= rowsLeft) {
×
328
            HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, pJoin->ctx.probePostIdx, rows));
×
329
            pCtx->rowRemains = false;
×
330
          } else {
331
            HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, pJoin->ctx.probePostIdx, rowsLeft));
×
332
            pCtx->probePostIdx += rowsLeft;
×
333
            
334
            return TSDB_CODE_SUCCESS;
×
335
          }
336
        } else {
337
          pJoin->ctx.rowRemains = false;
×
338
        }
339
        break;
×
340
      }
341
      default:
×
342
        return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
343
    }
344
  }
345

346
  return TSDB_CODE_SUCCESS;
×
347
}
348

349

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc