• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3911

24 Apr 2025 11:36PM UTC coverage: 53.735% (-1.6%) from 55.295%
#3911

push

travis-ci

happyguoxy
Sync branches at 2025-04-25 07:35

170049 of 316459 relevant lines covered (53.73%)

1192430.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

8.05
/source/libs/executor/src/mergejoin.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "operator.h"
20
#include "os.h"
21
#include "querynodes.h"
22
#include "querytask.h"
23
#include "tcompare.h"
24
#include "tdatablock.h"
25
#include "thash.h"
26
#include "tmsg.h"
27
#include "ttypes.h"
28
#include "mergejoin.h"
29

30
/*
 * Computes the row capacity used for the join's final output block.
 *
 * The base value is the larger of MJOIN_DEFAULT_BLK_ROWS_NUM and the number of
 * output rows that fit into MJOIN_BLK_SIZE_LIMIT. When a row limit is set
 * (limit != INT64_MAX) and there is no final filter, the capacity is further
 * capped at limit / MJOIN_BLK_THRESHOLD_RATIO + 1. Semi and anti joins are
 * additionally capped at MJOIN_SEMI_ANTI_BLK_ROWS_NUM.
 *
 * @param pJoin     join operator; its merge-context limit and final filter are read
 * @param pJoinNode physical plan node; its output row size and sub type are read
 * @return the maximum number of rows for the final block
 */
static uint32_t mJoinGetFinBlkCapacity(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) {
  uint32_t maxRows = TMAX(MJOIN_DEFAULT_BLK_ROWS_NUM, MJOIN_BLK_SIZE_LIMIT/pJoinNode->node.pOutputDataBlockDesc->totalRowSize);

  if (INT64_MAX != pJoin->ctx.mergeCtx.limit && NULL == pJoin->pFinFilter) {
    // Keep the limit-derived bound in 64 bits until it has been compared: narrowing it to
    // uint32_t first would wrap (or be undefined for a floating-point ratio) for very
    // large limits and could shrink the block capacity to a meaningless value.
    int64_t limitMaxRows = pJoin->ctx.mergeCtx.limit / MJOIN_BLK_THRESHOLD_RATIO + 1;
    if (limitMaxRows > 0 && limitMaxRows < (int64_t)maxRows) {
      maxRows = (uint32_t)limitMaxRows;
    }
  }

  if (JOIN_STYPE_SEMI == pJoinNode->subType || JOIN_STYPE_ANTI == pJoinNode->subType) {
    maxRows = TMIN(MJOIN_SEMI_ANTI_BLK_ROWS_NUM, maxRows);
  }

  return maxRows;
}
43

44
/*
 * Tells whether a result block holding blkRows rows should be handed to the caller.
 *
 * The block is ready once blkRows reaches the merge context's blkThreshold. When a
 * row limit is set (limit != INT64_MAX) and no final filter exists, it is also ready
 * as soon as the rows already returned (execInfo.resRows) plus blkRows reach the limit.
 */
static FORCE_INLINE bool mJoinBlkReachThreshold(SMJoinOperatorInfo* pInfo, int64_t blkRows) {
  bool limitApplies = (INT64_MAX != pInfo->ctx.mergeCtx.limit) && (NULL == pInfo->pFinFilter);

  if (limitApplies && (pInfo->execInfo.resRows + blkRows) >= pInfo->ctx.mergeCtx.limit) {
    return true;
  }

  return blkRows >= pInfo->ctx.mergeCtx.blkThreshold;
}
51

52

53
/*
 * Joins the current probe group of a window join against the cached build groups and
 * appends the result to pCtx->finBlk, never writing more rows than the block has free.
 *
 * If the cache holds no groups or no rows, the probe group is passed to mJoinNonEqCart
 * instead. Otherwise the probe rows are combined with the cached groups either in one
 * pass (when everything fits and no partial read is pending) or row by row, recording
 * the resume position in cache->grpIdx and the groups' readIdx.
 *
 * On the cartesian paths pCtx->grpRemains is set to tell whether probe rows are left.
 *
 * @return TSDB_CODE_SUCCESS, or the error code reported by a helper
 */
int32_t mWinJoinDumpGrpCache(SMJoinWindowCtx* pCtx) {
  SMJoinWinCache* pCache = &pCtx->cache;
  int64_t         freeRows = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows;
  int32_t         cachedGrpNum = taosArrayGetSize(pCache->grps);
  int64_t         cachedRows = TMIN(pCache->rowNum, pCtx->jLimit);

  pCtx->finBlk->info.id.groupId = pCtx->seqWinGrp ? pCtx->pJoin->outGrpId : 0;

  // Nothing usable in the cache: take the non-equal path for the probe group.
  if (cachedGrpNum <= 0 || cachedRows <= 0) {
    MJ_ERR_RET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeGrp, true, pCtx->seqWinGrp));
    if (pCtx->seqWinGrp) {
      pCtx->pJoin->outGrpId++;
    }
    return TSDB_CODE_SUCCESS;
  }

  SMJoinGrpRows* pProbe = &pCtx->probeGrp;
  int32_t        probeRemain = GRP_REMAIN_ROWS(pProbe);
  int32_t        savedProbeEnd = pProbe->endIdx;

  // Single-pass path: only when not in sequential-window mode, the cache cursor is at the
  // first group and the whole product fits into the free rows.
  if ((!pCtx->seqWinGrp) && 0 == pCache->grpIdx && probeRemain * cachedRows <= freeRows) {
    SMJoinGrpRows* pHead = taosArrayGet(pCache->grps, 0);
    if (NULL == pHead) {
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }

    // A first group that was already partially read must go through the row-wise path.
    if (pHead->readIdx == pHead->beginIdx) {
      while (pCache->grpIdx < cachedGrpNum) {
        SMJoinGrpRows* pBuild = taosArrayGet(pCache->grps, pCache->grpIdx);
        if (NULL == pBuild) {
          MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
        }
        MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, pProbe, pBuild));
        pBuild->readIdx = pBuild->beginIdx;
        ++pCache->grpIdx;
      }

      pCache->grpIdx = 0;
      pCtx->grpRemains = false;
      return TSDB_CODE_SUCCESS;
    }
  }

  // Row-wise path: one probe row at a time against the cached groups.
  while (!GRP_DONE(pProbe)) {
    pProbe->endIdx = pProbe->readIdx;  // temporarily restrict the probe group to one row

    for (; pCache->grpIdx < cachedGrpNum && freeRows > 0; ++pCache->grpIdx) {
      SMJoinGrpRows* pBuild = taosArrayGet(pCache->grps, pCache->grpIdx);
      if (NULL == pBuild) {
        MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
      }

      if (freeRows >= GRP_REMAIN_ROWS(pBuild)) {
        MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, pProbe, pBuild));
        freeRows -= GRP_REMAIN_ROWS(pBuild);
        pBuild->readIdx = pBuild->beginIdx;
        continue;
      }

      // Only part of this build group fits: emit that part and remember where to resume.
      int32_t savedBuildEnd = pBuild->endIdx;
      pBuild->endIdx = pBuild->readIdx + freeRows - 1;
      MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, pProbe, pBuild));
      pBuild->readIdx += freeRows;
      pBuild->endIdx = savedBuildEnd;
      freeRows = 0;
      break;
    }
    pProbe->endIdx = savedProbeEnd;

    if (pCache->grpIdx >= cachedGrpNum) {
      // All cached groups consumed for this probe row: advance to the next probe row.
      pCache->grpIdx = 0;
      ++pProbe->readIdx;
      if (pCtx->seqWinGrp) {
        pCtx->pJoin->outGrpId++;
        break;
      }
    }

    if (freeRows <= 0) {
      break;
    }
  }

  pProbe->endIdx = savedProbeEnd;

  pCtx->grpRemains = pProbe->readIdx <= pProbe->endIdx;

  return TSDB_CODE_SUCCESS;
}
139

140
/*
 * Outer-join hash cartesian step used when no pre-filter is present (see mLeftJoinHashCart).
 *
 * First resumes a build hash group left unfinished by a previous call (grpRowIdx >= 0).
 * Then, for each remaining probe row while the final block is not full, looks the row's
 * key up in build->pGrpHash: rows for which mJoinCopyKeyColsDataToBuf returns non-zero or
 * whose key is not in the hash are emitted through mJoinNonEqGrpCart; the others are
 * joined with the matching hash group through mJoinHashGrpCart.
 *
 * Sets pCtx->grpRemains to tell whether probe rows are left in the group.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the probe group
 *         cannot be fetched, or the error code reported by a helper
 */
static int32_t mOuterJoinHashFullCart(SMJoinMergeCtx* pCtx) {
  SMJoinTableCtx* probe = pCtx->pJoin->probe;
  SMJoinTableCtx* build = pCtx->pJoin->build;
  SMJoinGrpRows*  probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
  if (NULL == probeGrp) {
    // Same guard as the sibling cartesian functions: never dereference a missing group.
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  bool contLoop = true;

  // Resume the hash group that was only partially emitted by the previous call.
  if (build->grpRowIdx >= 0) {
    contLoop = false;
    MJ_ERR_RET(mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build, &contLoop));
    if (build->grpRowIdx < 0) {
      probeGrp->readIdx++;
    }
  }

  if (contLoop) {
    size_t  bufLen = 0;
    int32_t probeEndIdx = probeGrp->endIdx;

    for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk); ++probeGrp->readIdx) {
      void* pGrp = NULL;
      if (!mJoinCopyKeyColsDataToBuf(probe, probeGrp->readIdx, &bufLen)) {
        pGrp = tSimpleHashGet(build->pGrpHash, probe->keyData, bufLen);
      }

      if (NULL == pGrp) {
        // No build group for this probe row: emit it alone through the non-equal path.
        probeGrp->endIdx = probeGrp->readIdx;
        MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true));
        probeGrp->endIdx = probeEndIdx;
        continue;
      }

      if (build->rowBitmapSize > 0) {
        build->pHashCurGrp = ((SMJoinHashGrpRows*)pGrp)->pRows;
        build->pHashGrpRows = pGrp;
        build->pHashGrpRows->allRowsMatch = true;
      } else {
        build->pHashCurGrp = *(SArray**)pGrp;
      }

      build->grpRowIdx = 0;
      contLoop = false;
      MJ_ERR_RET(mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build, &contLoop));
      if (!contLoop) {
        if (build->grpRowIdx < 0) {
          probeGrp->readIdx++;
        }
        break;
      }
    }
  }

  pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;

  return TSDB_CODE_SUCCESS;
}
200

201

202
/*
 * Outer-join merge cartesian step used when no full pre-filter is present
 * (see mLeftJoinMergeCart).
 *
 * Combines the first probe equal-group with all build equal-groups into pCtx->finBlk.
 * When the whole product fits into the free rows and nothing was partially read, it is
 * emitted in one pass; otherwise it is emitted one probe row at a time, and the resume
 * position is kept in build->grpIdx and the groups' readIdx.
 *
 * Sets pCtx->grpRemains to tell whether probe rows are left in the group.
 *
 * @return TSDB_CODE_SUCCESS, or an error code from a failed lookup or helper
 */
static int32_t mOuterJoinMergeFullCart(SMJoinMergeCtx* pCtx) {
  SMJoinTableCtx* probe = pCtx->pJoin->probe;
  SMJoinTableCtx* build = pCtx->pJoin->build;
  int32_t         freeRows = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows;

  SMJoinGrpRows* pProbeGrp = taosArrayGet(probe->eqGrps, 0);
  if (NULL == pProbeGrp) {
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  int32_t buildGrpCnt = taosArrayGetSize(build->eqGrps);
  int32_t probeRemain = GRP_REMAIN_ROWS(pProbeGrp);
  int32_t savedProbeEnd = pProbeGrp->endIdx;

  // Single-pass path: the complete product fits into the block.
  if (0 == build->grpIdx && probeRemain * build->grpTotalRows <= freeRows) {
    SMJoinGrpRows* pHead = taosArrayGet(build->eqGrps, 0);
    if (NULL == pHead) {
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }

    if (pHead->readIdx == pHead->beginIdx) {
      while (build->grpIdx < buildGrpCnt) {
        SMJoinGrpRows* pBuildGrp = taosArrayGet(build->eqGrps, build->grpIdx);
        if (NULL == pBuildGrp) {
          MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
        }

        MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, pProbeGrp, pBuildGrp));
        pBuildGrp->readIdx = pBuildGrp->beginIdx;
        ++build->grpIdx;
      }

      pCtx->grpRemains = false;
      return TSDB_CODE_SUCCESS;
    }
  }

  // Row-wise path: one probe row at a time until the block is full.
  while (!GRP_DONE(pProbeGrp)) {
    pProbeGrp->endIdx = pProbeGrp->readIdx;

    for (; build->grpIdx < buildGrpCnt && freeRows > 0; ++build->grpIdx) {
      SMJoinGrpRows* pBuildGrp = taosArrayGet(build->eqGrps, build->grpIdx);
      if (NULL == pBuildGrp) {
        MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
      }

      if (freeRows >= GRP_REMAIN_ROWS(pBuildGrp)) {
        MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, pProbeGrp, pBuildGrp));
        freeRows -= GRP_REMAIN_ROWS(pBuildGrp);
        pBuildGrp->readIdx = pBuildGrp->beginIdx;
        continue;
      }

      // Partial fit: emit as many build rows as fit and remember the resume position.
      int32_t savedBuildEnd = pBuildGrp->endIdx;
      pBuildGrp->endIdx = pBuildGrp->readIdx + freeRows - 1;
      MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, pProbeGrp, pBuildGrp));
      pBuildGrp->readIdx += freeRows;
      pBuildGrp->endIdx = savedBuildEnd;
      freeRows = 0;
      break;
    }
    pProbeGrp->endIdx = savedProbeEnd;

    if (build->grpIdx >= buildGrpCnt) {
      build->grpIdx = 0;
      ++pProbeGrp->readIdx;
    }

    if (freeRows <= 0) {
      break;
    }
  }

  pProbeGrp->endIdx = savedProbeEnd;

  pCtx->grpRemains = pProbeGrp->readIdx <= pProbeGrp->endIdx;

  return TSDB_CODE_SUCCESS;
}
278

279
/*
 * Outer-join merge cartesian step used when a full pre-filter (pFPreFilter) is present
 * (see mLeftJoinMergeCart).
 *
 * Processes the current probe equal-group one row at a time: the row is combined with
 * the build equal-groups into the intermediate block (midBlk), the pre-filter is applied
 * to midBlk, and the surviving rows are merged into finBlk through mJoinCopyMergeMidBlk.
 * A probe row for which no combination survives the filter is emitted through
 * mJoinNonEqGrpCart once all build groups have been visited.
 *
 * Sets pCtx->grpRemains to tell whether probe rows are left in the group.
 *
 * @return TSDB_CODE_SUCCESS, or an error code from a failed lookup or helper
 */
static int32_t mOuterJoinMergeSeqCart(SMJoinMergeCtx* pCtx) {
  SMJoinTableCtx* probe = pCtx->pJoin->probe;
  SMJoinTableCtx* build = pCtx->pJoin->build;
  SMJoinGrpRows*  probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
  if (NULL == probeGrp) {
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  int32_t buildGrpNum = taosArrayGetSize(build->eqGrps);
  int32_t probeEndIdx = probeGrp->endIdx;
  bool    contLoop = true;

  do {
    // The increment clause moves to the next probe row and resets the per-row state; it
    // runs only on `continue`, i.e. when the current probe row is completely finished.
    for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk);
         ++probeGrp->readIdx, probeGrp->readMatch = false, probeGrp->endIdx = probeEndIdx, build->grpIdx = 0) {
      probeGrp->endIdx = probeGrp->readIdx;  // restrict the probe group to the current row

      int32_t rowsLeft = pCtx->midBlk->info.capacity;
      int32_t startGrpIdx = build->grpIdx;
      int32_t startRowIdx = -1;

      for (; build->grpIdx < buildGrpNum && rowsLeft > 0; ++build->grpIdx) {
        SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, build->grpIdx);
        if (NULL == buildGrp) {
          MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
        }

        // Remember the first build row of this pass; it is handed to the row-marking filter.
        if (startRowIdx < 0) {
          startRowIdx = buildGrp->readIdx;
        }

        if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) {
          MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp));
          rowsLeft -= GRP_REMAIN_ROWS(buildGrp);
          buildGrp->readIdx = buildGrp->beginIdx;
          continue;
        }

        // Only part of the build group fits into midBlk: emit that part and stop here.
        int32_t savedBuildEnd = buildGrp->endIdx;
        buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1;
        MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp));
        buildGrp->readIdx += rowsLeft;
        buildGrp->endIdx = savedBuildEnd;
        break;
      }

      if (pCtx->midBlk->info.rows > 0) {
        if (build->rowBitmapSize > 0) {
          MJ_ERR_RET(mJoinFilterAndMarkRows(pCtx->midBlk, pCtx->pJoin->pFPreFilter, build, startGrpIdx, startRowIdx));
        } else {
          MJ_ERR_RET(doFilter(pCtx->midBlk, pCtx->pJoin->pFPreFilter, NULL));
        }

        if (pCtx->midBlk->info.rows > 0) {
          probeGrp->readMatch = true;
        }
      }

      if (0 != pCtx->midBlk->info.rows) {
        MJ_ERR_RET(mJoinCopyMergeMidBlk(pCtx, &pCtx->midBlk, &pCtx->finBlk));

        if (pCtx->midRemains) {
          contLoop = false;
        } else if (build->grpIdx == buildGrpNum) {
          continue;
        }
      } else if (build->grpIdx == buildGrpNum) {
        // Every build group visited and nothing survived in this pass.
        if (!probeGrp->readMatch) {
          MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true));
        }

        continue;
      }

      // The current probe row is not finished in this pass: leave the for loop without
      // running its increment clause.
      probeGrp->endIdx = probeEndIdx;

      if (build->grpIdx >= buildGrpNum) {
        build->grpIdx = 0;
        ++probeGrp->readIdx;
        probeGrp->readMatch = false;
      }

      break;
    }

    if (GRP_DONE(probeGrp) || BLK_IS_FULL(pCtx->finBlk)) {
      break;
    }
  } while (contLoop);

  pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;

  return TSDB_CODE_SUCCESS;
}
383

384
/*
 * Joins the current probe group with the current build hash group through the
 * intermediate block, applying the pre-filter (pPreFilter) to every batch.
 *
 * Each round fills midBlk through mJoinHashGrpCart, filters it, and merges surviving
 * rows into finBlk. The loop ends when the hash group is exhausted (grpRowIdx < 0) or
 * when mJoinCopyMergeMidBlk leaves rows behind (midRemains). If the hash group is
 * exhausted and no row of the probe group matched, the probe group is emitted through
 * mJoinNonEqGrpCart.
 *
 * @param pCtx     merge join context
 * @param contLoop out: false when rows remain in midBlk and the caller must stop,
 *                 true otherwise
 * @return TSDB_CODE_SUCCESS, or an error code from a failed lookup or helper
 */
static int32_t mOuterJoinHashGrpCartFilter(SMJoinMergeCtx* pCtx, bool* contLoop) {
  SMJoinTableCtx* probe = pCtx->pJoin->probe;
  SMJoinTableCtx* build = pCtx->pJoin->build;
  SMJoinGrpRows*  probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
  if (NULL == probeGrp) {
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  for (;;) {
    // First hash-group row of this batch; handed to the row-marking filter below.
    int32_t batchStartIdx = build->grpRowIdx;
    MJ_ERR_RET(mJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build, NULL));

    if (pCtx->midBlk->info.rows > 0) {
      if (build->rowBitmapSize > 0) {
        MJ_ERR_RET(mJoinFilterAndMarkHashRows(pCtx->midBlk, pCtx->pJoin->pPreFilter, build, batchStartIdx));
      } else {
        MJ_ERR_RET(doFilter(pCtx->midBlk, pCtx->pJoin->pPreFilter, NULL));
      }
      if (pCtx->midBlk->info.rows > 0) {
        probeGrp->readMatch = true;
      }
    }

    if (0 == pCtx->midBlk->info.rows) {
      if (build->grpRowIdx >= 0) {
        continue;  // more rows left in the hash group: try the next batch
      }

      if (!probeGrp->readMatch) {
        MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true));
      }
      break;
    }

    MJ_ERR_RET(mJoinCopyMergeMidBlk(pCtx, &pCtx->midBlk, &pCtx->finBlk));

    if (pCtx->midRemains) {
      pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;
      *contLoop = false;
      return TSDB_CODE_SUCCESS;
    }

    if (build->grpRowIdx < 0) {
      break;
    }
  }

  *contLoop = true;
  return TSDB_CODE_SUCCESS;
}
441

442

443
/*
 * Outer-join hash cartesian step used when a pre-filter (pPreFilter) is present
 * (see mLeftJoinHashCart).
 *
 * First resumes a hash group left unfinished by a previous call (grpRowIdx >= 0). Then,
 * for each remaining probe row while the final block is not full, looks up the row's key
 * in build->pGrpHash. Rows for which mJoinCopyKeyColsDataToBuf returns non-zero or whose
 * key is not in the hash are emitted through mJoinNonEqGrpCart; the others are joined
 * and filtered through mOuterJoinHashGrpCartFilter.
 *
 * Sets pCtx->grpRemains to tell whether probe rows are left in the group.
 *
 * @return TSDB_CODE_SUCCESS, or an error code from a failed lookup or helper
 */
static int32_t mOuterJoinHashSeqCart(SMJoinMergeCtx* pCtx) {
  SMJoinTableCtx* probe = pCtx->pJoin->probe;
  SMJoinTableCtx* build = pCtx->pJoin->build;
  SMJoinGrpRows*  probeGrp = taosArrayGet(probe->eqGrps, 0);
  if (NULL == probeGrp) {
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  bool contLoop = false;
  bool scanProbeRows = true;

  // Resume the hash group that was only partially processed by the previous call.
  if (build->grpRowIdx >= 0) {
    MJ_ERR_RET(mOuterJoinHashGrpCartFilter(pCtx, &contLoop));
    if (build->grpRowIdx < 0) {
      probeGrp->readIdx++;
      probeGrp->readMatch = false;
    }

    scanProbeRows = contLoop;
  }

  if (scanProbeRows) {
    size_t  bufLen = 0;
    int32_t probeEndIdx = probeGrp->endIdx;

    while (!GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk)) {
      void* pGrp = NULL;
      if (!mJoinCopyKeyColsDataToBuf(probe, probeGrp->readIdx, &bufLen)) {
        pGrp = tSimpleHashGet(build->pGrpHash, probe->keyData, bufLen);
      }

      if (NULL == pGrp) {
        // No build group for this probe row: emit it alone through the non-equal path.
        probeGrp->endIdx = probeGrp->readIdx;
        MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true));
        probeGrp->endIdx = probeEndIdx;
        probeGrp->readIdx++;
        probeGrp->readMatch = false;
        continue;
      }

      if (build->rowBitmapSize > 0) {
        build->pHashCurGrp = ((SMJoinHashGrpRows*)pGrp)->pRows;
        build->pHashGrpRows = pGrp;
        // An offset of 0 is treated as "not assigned yet" here.
        if (0 == build->pHashGrpRows->rowBitmapOffset) {
          MJ_ERR_RET(mJoinGetRowBitmapOffset(build, taosArrayGetSize(build->pHashCurGrp), &build->pHashGrpRows->rowBitmapOffset));
        }
      } else {
        build->pHashCurGrp = *(SArray**)pGrp;
      }

      build->grpRowIdx = 0;

      probeGrp->endIdx = probeGrp->readIdx;
      MJ_ERR_RET(mOuterJoinHashGrpCartFilter(pCtx, &contLoop));
      probeGrp->endIdx = probeEndIdx;
      if (build->grpRowIdx < 0) {
        probeGrp->readIdx++;
        probeGrp->readMatch = false;
      }

      if (!contLoop) {
        break;
      }
    }
  }

  pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;

  return TSDB_CODE_SUCCESS;
}
518

519

520
/*
 * Merge cartesian entry point for left join: uses the sequential (filtering) variant
 * when the join has a full pre-filter, and the plain variant otherwise.
 */
static int32_t mLeftJoinMergeCart(SMJoinMergeCtx* pCtx) {
  if (NULL != pCtx->pJoin->pFPreFilter) {
    return mOuterJoinMergeSeqCart(pCtx);
  }

  return mOuterJoinMergeFullCart(pCtx);
}
523

524

525

526
/*
 * Makes sure a probe block, and if possible a usable build block, is available for the
 * next left-join round.
 *
 * Without a probe block the function returns false; the operator is additionally marked
 * done unless this is a group join that still has probe->remainInBlk set. Build blocks
 * that MJOIN_BUILD_BLK_OOR reports as out of range for the current probe row are skipped
 * entirely and the next build block is fetched.
 *
 * NOTE(review): MJ_ERR_RET is used in this bool function, so an error code would be
 * returned converted to bool - confirm against the macro definition that this is intended.
 *
 * @return true when a probe block is available, false otherwise
 */
static bool mLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) {
  bool hasProbe = mJoinRetrieveBlk(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);
  bool hasBuild = false;

  for (;;) {
    if (hasProbe || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) {
      hasBuild = mJoinRetrieveBlk(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
    }

    if (!hasProbe) {
      if (!pCtx->groupJoin || NULL == pJoin->probe->remainInBlk) {
        mJoinSetDone(pOperator);
      }

      return false;
    }

    if (!hasBuild) {
      break;
    }

    SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCtx.targetSlotId);
    if (NULL == pProbeCol) {
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }

    SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCtx.targetSlotId);
    if (NULL == pBuildCol) {
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }

    if (MJOIN_BUILD_BLK_OOR(pCtx->ascTs, pProbeCol->pData, pJoin->probe->blkRowIdx, pBuildCol->pData, pJoin->build->blk->info.rows)) {
      // The whole build block is out of range: mark it consumed and fetch the next one.
      pJoin->build->blkRowIdx = pJoin->build->blk->info.rows;
      hasBuild = false;
      continue;
    }

    break;
  }

  return true;
}
566

567
/*
 * Hash cartesian entry point for left join: uses the sequential (filtering) variant
 * when the join has a pre-filter, and the plain variant otherwise.
 */
static int32_t mLeftJoinHashCart(SMJoinMergeCtx* pCtx) {
  if (NULL != pCtx->pJoin->pPreFilter) {
    return mOuterJoinHashSeqCart(pCtx);
  }

  return mOuterJoinHashFullCart(pCtx);
}
570

571
/*
 * Continues emitting the group left unfinished by the previous left-join call.
 *
 * If the last group was an equal group, the configured hash or merge cartesian callback
 * is invoked; otherwise the pending non-equal probe group is emitted.
 */
static FORCE_INLINE int32_t mLeftJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) {
  if (!pCtx->lastEqGrp) {
    return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false);
  }

  if (pCtx->hashJoin) {
    return (*pCtx->hashCartFp)(pCtx);
  }

  return (*pCtx->mergeCartFp)(pCtx);
}
578

579
SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) {
59✔
580
  SMJoinOperatorInfo* pJoin = pOperator->info;
59✔
581
  SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
59✔
582
  int32_t code = TSDB_CODE_SUCCESS;
59✔
583
  int64_t probeTs = 0;
59✔
584
  int64_t buildTs = 0;
59✔
585
  SColumnInfoData* pBuildCol = NULL;
59✔
586
  SColumnInfoData* pProbeCol = NULL;
59✔
587

588
  blockDataCleanup(pCtx->finBlk);
59✔
589

590
  if (pCtx->midRemains) {
59✔
591
    MJ_ERR_JRET(mJoinHandleMidRemains(pCtx));
×
592
    if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
593
      return pCtx->finBlk;
×
594
    }
595
    pCtx->midRemains = false;
×
596
  }
597

598
  if (pCtx->grpRemains) {
59✔
599
    MJ_ERR_JRET(mLeftJoinHandleGrpRemains(pCtx));
×
600
    if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
601
      return pCtx->finBlk;
×
602
    }
603
    pCtx->grpRemains = false;
×
604
  }
605

606
  do {
607
    if (!mLeftJoinRetrieve(pOperator, pJoin, pCtx)) {
792✔
608
      if (pCtx->groupJoin && pCtx->finBlk->info.rows <= 0 && !mJoinIsDone(pOperator)) {
1✔
609
        continue;
×
610
      }
611

612
      break;
1✔
613
    }
614

615
    MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
791✔
616
    MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
791✔
617
    
618
    if (probeTs == pCtx->lastEqTs) {
791✔
619
      MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, true));
×
620
      if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
621
        return pCtx->finBlk;
×
622
      }
623

624
      if (MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) {
×
625
        continue;
×
626
      } else {
627
        MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
×
628
      }
629
    }
630

631
    while (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
3,000,733✔
632
      if (probeTs == buildTs) {
3,000,000✔
633
        pCtx->lastEqTs = probeTs;
3,000,000✔
634
        MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false));
3,000,000✔
635
        if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
6,000,000✔
636
          return pCtx->finBlk;
58✔
637
        }
638

639
        MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
2,999,942✔
640
        MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
2,999,942✔
641
      } else if (PROBE_TS_NMATCH(pCtx->ascTs, probeTs, buildTs)) {
×
642
        MJ_ERR_JRET(mJoinProcessLowerGrp(pCtx, pJoin->probe, pProbeCol, &probeTs, &buildTs));
×
643
        if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
644
          return pCtx->finBlk;
×
645
        }
646
      } else {
647
        while (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) {
×
648
          MJOIN_GET_TB_CUR_TS(pBuildCol, buildTs, pJoin->build);
×
649
          if (PROBE_TS_NREACH(pCtx->ascTs, probeTs, buildTs)) {
×
650
            continue;
×
651
          }
652
          
653
          break;
×
654
        }
655
      }
656
    }
657

658
    if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && MJOIN_TB_GRP_ROWS_DONE(pJoin->build, pCtx->groupJoin)) {
733✔
659
      pCtx->probeNEqGrp.blk = pJoin->probe->blk;
×
660
      pCtx->probeNEqGrp.beginIdx = pJoin->probe->blkRowIdx;
×
661
      pCtx->probeNEqGrp.readIdx = pCtx->probeNEqGrp.beginIdx;
×
662
      pCtx->probeNEqGrp.endIdx = pJoin->probe->blk->info.rows - 1;
×
663
      
664
      pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows;
×
665
            
666
      MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false));
×
667
      if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
668
        return pCtx->finBlk;
×
669
      }
670
    }
671
  } while (true);
672

673
_return:
1✔
674

675
  if (code) {
1✔
676
    pJoin->errCode = code;
×
677
    return NULL;
×
678
  }
679

680
  return pCtx->finBlk;
1✔
681
}
682

683
/*
 * Resets the per-group state of a left join: clears the merge-context flags, sets the
 * last equal timestamp back to INT64_MIN and resets both table contexts.
 */
void mLeftJoinGroupReset(SMJoinOperatorInfo* pJoin) {
  SMJoinMergeCtx* pMergeCtx = &pJoin->ctx.mergeCtx;

  pMergeCtx->lastEqTs = INT64_MIN;
  pMergeCtx->lastEqGrp = false;
  pMergeCtx->lastProbeGrp = false;
  pMergeCtx->hashCan = false;
  pMergeCtx->midRemains = false;

  mJoinResetGroupTableCtx(pJoin->probe);
  mJoinResetGroupTableCtx(pJoin->build);
}
×
695

696

697
/*
 * Inner-join merge cartesian step.
 *
 * Combines the first probe equal-group with all build equal-groups into pCtx->finBlk.
 * If the complete product fits into the free rows of the block and no build group was
 * partially read, everything is emitted in one pass. Otherwise the product is emitted
 * one probe row at a time until the block is full; build->grpIdx and the groups'
 * readIdx record where the next call has to resume.
 *
 * Sets pCtx->grpRemains to tell whether probe rows are left in the group.
 *
 * @return TSDB_CODE_SUCCESS, or an error code from a failed lookup or helper
 */
static int32_t mInnerJoinMergeCart(SMJoinMergeCtx* pCtx) {
  int32_t         capacityLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows;
  SMJoinTableCtx* probe = pCtx->pJoin->probe;
  SMJoinTableCtx* build = pCtx->pJoin->build;

  SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0);
  if (NULL == probeGrp) {
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  int32_t grpCount = taosArrayGetSize(build->eqGrps);
  int32_t probeRowCnt = GRP_REMAIN_ROWS(probeGrp);
  int32_t origProbeEnd = probeGrp->endIdx;

  if (0 == build->grpIdx && probeRowCnt * build->grpTotalRows <= capacityLeft) {
    SMJoinGrpRows* firstBuildGrp = taosArrayGet(build->eqGrps, 0);
    if (NULL == firstBuildGrp) {
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }

    // One pass is only possible when the first build group has not been partially read.
    if (firstBuildGrp->readIdx == firstBuildGrp->beginIdx) {
      for (; build->grpIdx < grpCount; ++build->grpIdx) {
        SMJoinGrpRows* curBuildGrp = taosArrayGet(build->eqGrps, build->grpIdx);
        if (NULL == curBuildGrp) {
          MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
        }

        MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, curBuildGrp));
        curBuildGrp->readIdx = curBuildGrp->beginIdx;
      }

      pCtx->grpRemains = false;
      return TSDB_CODE_SUCCESS;
    }
  }

  while (!GRP_DONE(probeGrp)) {
    // Restrict the probe group to its current row for the duration of the inner loop.
    probeGrp->endIdx = probeGrp->readIdx;

    for (; build->grpIdx < grpCount && capacityLeft > 0; ++build->grpIdx) {
      SMJoinGrpRows* curBuildGrp = taosArrayGet(build->eqGrps, build->grpIdx);
      if (NULL == curBuildGrp) {
        MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
      }

      if (capacityLeft >= GRP_REMAIN_ROWS(curBuildGrp)) {
        MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, curBuildGrp));
        capacityLeft -= GRP_REMAIN_ROWS(curBuildGrp);
        curBuildGrp->readIdx = curBuildGrp->beginIdx;
        continue;
      }

      // The build group does not fit completely: emit the part that fits and stop.
      int32_t origBuildEnd = curBuildGrp->endIdx;
      curBuildGrp->endIdx = curBuildGrp->readIdx + capacityLeft - 1;
      MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, curBuildGrp));
      curBuildGrp->readIdx += capacityLeft;
      curBuildGrp->endIdx = origBuildEnd;
      capacityLeft = 0;
      break;
    }
    probeGrp->endIdx = origProbeEnd;

    if (build->grpIdx >= grpCount) {
      build->grpIdx = 0;
      ++probeGrp->readIdx;
    }

    if (capacityLeft <= 0) {
      break;
    }
  }

  probeGrp->endIdx = origProbeEnd;

  pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;

  return TSDB_CODE_SUCCESS;
}
773

774

775
/*
 * Inner-join hash cartesian step.
 *
 * First resumes a build hash group left unfinished by a previous call (grpRowIdx >= 0).
 * Then, for each remaining probe row while the final block is not full, looks the row's
 * key up in build->pGrpHash and joins the row with the matching hash group through
 * mJoinHashGrpCart. Probe rows for which mJoinCopyKeyColsDataToBuf returns non-zero or
 * whose key is not in the hash are skipped.
 *
 * Sets pCtx->grpRemains to tell whether probe rows are left in the group.
 *
 * @return TSDB_CODE_SUCCESS, or an error code from a failed lookup or helper
 */
static int32_t mInnerJoinHashCart(SMJoinMergeCtx* pCtx) {
  SMJoinTableCtx* probe = pCtx->pJoin->probe;
  SMJoinTableCtx* build = pCtx->pJoin->build;
  SMJoinGrpRows*  probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
  if (NULL == probeGrp) {
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  bool contLoop = true;

  // Resume the hash group that was only partially emitted by the previous call.
  if (build->grpRowIdx >= 0) {
    contLoop = false;
    MJ_ERR_RET(mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build, &contLoop));
    if (build->grpRowIdx < 0) {
      probeGrp->readIdx++;
    }
  }

  if (contLoop) {
    size_t bufLen = 0;

    for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk); ++probeGrp->readIdx) {
      if (mJoinCopyKeyColsDataToBuf(probe, probeGrp->readIdx, &bufLen)) {
        continue;
      }

      SArray** pGrp = tSimpleHashGet(build->pGrpHash, probe->keyData, bufLen);
      if (NULL == pGrp) {
        continue;
      }

      build->pHashCurGrp = *pGrp;
      build->grpRowIdx = 0;
      contLoop = false;
      MJ_ERR_RET(mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build, &contLoop));
      if (!contLoop) {
        // Stop here; only step past the probe row if its hash group was fully consumed.
        if (build->grpRowIdx < 0) {
          probeGrp->readIdx++;
        }
        break;
      }
    }
  }

  pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;

  return TSDB_CODE_SUCCESS;
}
823

824
/*
 * Continues emitting the equal group left unfinished by the previous inner-join call,
 * through the configured hash or merge cartesian callback.
 */
static FORCE_INLINE int32_t mInnerJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) {
  if (pCtx->hashJoin) {
    return (*pCtx->hashCartFp)(pCtx);
  }

  return (*pCtx->mergeCartFp)(pCtx);
}
827

828

829
/*
 * Makes sure both sides have a current block for the inner/semi join loop.
 *
 * The probe block is fetched once. The build block is fetched when a probe
 * block was obtained or MJOIN_DS_NEED_INIT() holds for the build side, and it
 * is fetched again for as long as MJOIN_BUILD_BLK_OOR() reports the fetched
 * build block as out of range for the current probe row (the block is then
 * marked fully consumed by moving blkRowIdx to its row count).
 *
 * Returns false (after mJoinSetDone) when no probe block is available, true
 * otherwise.
 *
 * NOTE(review): MJ_ERR_RET is used inside a bool function; if it returns the
 * error code, a lookup failure would be reported as `true` -- confirm against
 * the macro definition.
 */
static bool mInnerJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) {
  bool buildGot = false;
  bool probeGot = mJoinRetrieveBlk(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);

  for (;;) {
    if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) {
      buildGot = mJoinRetrieveBlk(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
    }

    if (!probeGot) {
      mJoinSetDone(pOperator);
      return false;
    }

    if (!buildGot) {
      return true;
    }

    SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCtx.targetSlotId);
    if (NULL == pProbeCol) {
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }

    SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCtx.targetSlotId);
    if (NULL == pBuildCol) {
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }

    if (!MJOIN_BUILD_BLK_OOR(pCtx->ascTs, pProbeCol->pData, pJoin->probe->blkRowIdx, pBuildCol->pData, pJoin->build->blk->info.rows)) {
      return true;
    }

    // the whole build block is out of range: drop it and fetch the next one
    pJoin->build->blkRowIdx = pJoin->build->blk->info.rows;
    buildGot = false;
  }
}
866

867

868
SSDataBlock* mInnerJoinDo(struct SOperatorInfo* pOperator) {
79✔
869
  SMJoinOperatorInfo* pJoin = pOperator->info;
79✔
870
  SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
79✔
871
  int32_t code = TSDB_CODE_SUCCESS;
79✔
872
  int64_t probeTs = 0;
79✔
873
  int64_t buildTs = 0;
79✔
874
  SColumnInfoData* pBuildCol = NULL;
79✔
875
  SColumnInfoData* pProbeCol = NULL;
79✔
876

877
  blockDataCleanup(pCtx->finBlk);
79✔
878

879
  if (pCtx->grpRemains) {
79✔
880
    MJ_ERR_JRET(mInnerJoinHandleGrpRemains(pCtx));
×
881
    if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
882
      return pCtx->finBlk;
×
883
    }
884
    pCtx->grpRemains = false;
×
885
  }
886

887
  do {
888
    if (!mInnerJoinRetrieve(pOperator, pJoin, pCtx)) {
832✔
889
      break;
21✔
890
    }
891

892
    MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
811✔
893
    MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
811✔
894
    
895
    if (probeTs == pCtx->lastEqTs) {
811✔
896
      MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, true));
×
897
      if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
898
        return pCtx->finBlk;
×
899
      }
900

901
      if (MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) || MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
×
902
        continue;
×
903
      } 
904

905
      MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
×
906
    } else if (MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
811✔
907
      mJoinSetDone(pOperator);
×
908
      break;
×
909
    }
910

911
    do {
912
      if (probeTs == buildTs) {
3,000,200✔
913
        pCtx->lastEqTs = probeTs;
3,000,200✔
914
        MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false));
3,000,200✔
915
        if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
6,000,400✔
916
          return pCtx->finBlk;
58✔
917
        }
918

919
        if (MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) || MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
3,000,142✔
920
          break;
921
        }
922
        
923
        MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
2,999,389✔
924
        MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
2,999,389✔
925
        continue;
2,999,389✔
926
      }
927

928
      if (PROBE_TS_NMATCH(pCtx->ascTs, probeTs, buildTs)) {
×
929
        if (++pJoin->probe->blkRowIdx < pJoin->probe->blk->info.rows) {
×
930
          MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
×
931
          continue;
×
932
        }
933
      } else {
934
        if (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) {
×
935
          MJOIN_GET_TB_CUR_TS(pBuildCol, buildTs, pJoin->build);
×
936
          continue;
×
937
        }
938
      }
939
      
940
      break;
×
941
    } while (true);
942
  } while (true);
943

944
_return:
21✔
945

946
  if (code) {
21✔
947
    pJoin->errCode = code;
×
948
    return NULL;
×
949
  }
950

951
  return pCtx->finBlk;
21✔
952
}
953

954
// Resumes the unfinished group of a full join: an equal group goes to the hash/merge
// cartesian callback, otherwise the pending non-equal group (probe side when
// lastProbeGrp is set, build side if not) is passed to mJoinNonEqCart().
static FORCE_INLINE int32_t mFullJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) {
  if (!pCtx->lastEqGrp) {
    if (pCtx->lastProbeGrp) {
      return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false);
    }

    return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->buildNEqGrp, false, false);
  }

  if (pCtx->hashJoin) {
    return (*pCtx->hashCartFp)(pCtx);
  }

  return (*pCtx->mergeCartFp)(pCtx);
}
961

962
// Fetches the current block for both sides of a full join (both calls are always made).
// Returns true when at least one side delivered a block. pOperator is not used here.
static bool mFullJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) {
  bool probeGot = mJoinRetrieveBlk(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);
  bool buildGot = mJoinRetrieveBlk(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);

  return probeGot || buildGot;
}
972

973
// Hash cartesian product for a full join: the sequential variant is used when a
// pre-filter (pPreFilter) exists, the full variant otherwise.
static FORCE_INLINE int32_t mFullJoinHashCart(SMJoinMergeCtx* pCtx) {
  if (NULL != pCtx->pJoin->pPreFilter) {
    return mOuterJoinHashSeqCart(pCtx);
  }

  return mOuterJoinHashFullCart(pCtx);
}
976

977
// Merge cartesian product for a full join: the sequential variant is used when a
// pre-filter (pFPreFilter) exists, the full variant otherwise.
static int32_t mFullJoinMergeCart(SMJoinMergeCtx* pCtx) {
  if (NULL != pCtx->pJoin->pFPreFilter) {
    return mOuterJoinMergeSeqCart(pCtx);
  }

  return mOuterJoinMergeFullCart(pCtx);
}
980

981
// Outputs one row of a hash group: the row position stored at `idx` in pGrpRows->pRows is
// wrapped in a single-row group and passed to mJoinNonEqGrpCart() with pCtx->finBlk.
static FORCE_INLINE int32_t mFullJoinOutputHashRow(SMJoinMergeCtx* pCtx, SMJoinHashGrpRows* pGrpRows, int32_t idx) {
  SMJoinRowPos* pPos = taosArrayGet(pGrpRows->pRows, idx);
  if (NULL == pPos) {
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  // single-row group: readIdx == endIdx == position of the row inside its block
  SMJoinGrpRows oneRow = {0};
  oneRow.blk = pPos->pBlk;
  oneRow.endIdx = pPos->pos;
  oneRow.readIdx = pPos->pos;

  return mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, &oneRow, false);
}
993

994
// Outputs the rows of a hash group starting at pNMatch->rowIdx until the group is
// exhausted or pCtx->finBlk is full. When the group is exhausted *grpDone is set to
// true and pNMatch->rowIdx is reset to 0; otherwise *grpDone is left untouched.
static int32_t mFullJoinOutputHashGrpRows(SMJoinMergeCtx* pCtx, SMJoinHashGrpRows* pGrpRows, SMJoinNMatchCtx* pNMatch, bool* grpDone) {
  int32_t totalRows = taosArrayGetSize(pGrpRows->pRows);

  while (pNMatch->rowIdx < totalRows && !BLK_IS_FULL(pCtx->finBlk)) {
    MJ_ERR_RET(mFullJoinOutputHashRow(pCtx, pGrpRows, pNMatch->rowIdx));
    ++pNMatch->rowIdx;
  }

  if (pNMatch->rowIdx >= totalRows) {
    pNMatch->rowIdx = 0;
    *grpDone = true;
  }

  return TSDB_CODE_SUCCESS;
}
1007

1008
/*
 * Hash-join path: walks the build hash groups (build->pGrpHash) and outputs the
 * build rows that are still flagged in build->pRowBitmap.
 *
 * The walk is resumable through build->nMatchCtx (pGrp/iter/bitIdx/rowIdx). When
 * pCtx->finBlk fills up the function returns with pCtx->nmatchRemains = true; once
 * every group has been visited it clears nmatchRemains and lastEqGrp.
 *
 * Returns TSDB_CODE_SUCCESS, or the error propagated by MJ_ERR_RET.
 */
static int32_t mFullJoinHandleHashGrpRemains(SMJoinMergeCtx* pCtx) {
  // Indexed by (isolated lowest set bit of a bitmap byte) % 11. The powers of two 1..128
  // hit the residues 1,2,4,8,5,10,9,7, mapped to 7..0; 32 pads the unused residues 0,3,6.
  static const uint8_t lowest_bit_bitmap[] = {32, 7, 6, 32, 5, 3, 32, 0, 4, 1, 2};
  SMJoinTableCtx*  build = pCtx->pJoin->build;
  SMJoinNMatchCtx* pNMatch = &build->nMatchCtx;

  if (NULL == pNMatch->pGrp) {
    pNMatch->pGrp = tSimpleHashIterate(build->pGrpHash, pNMatch->pGrp, &pNMatch->iter);
    pNMatch->bitIdx = 0;
  }

  while (NULL != pNMatch->pGrp) {
    SMJoinHashGrpRows* pGrpRows = (SMJoinHashGrpRows*)pNMatch->pGrp;

    if (!pGrpRows->allRowsMatch) {
      if (pGrpRows->rowMatchNum <= 0 || pGrpRows->allRowsNMatch) {
        // no row of this group was matched: output the whole group
        bool grpDone = false;
        pGrpRows->allRowsNMatch = true;

        MJ_ERR_RET(mFullJoinOutputHashGrpRows(pCtx, pGrpRows, pNMatch, &grpDone));
        if (BLK_IS_FULL(pCtx->finBlk)) {
          if (grpDone) {
            pNMatch->pGrp = tSimpleHashIterate(build->pGrpHash, pNMatch->pGrp, &pNMatch->iter);
            pNMatch->bitIdx = 0;
          }

          pCtx->nmatchRemains = true;
          return TSDB_CODE_SUCCESS;
        }
      } else {
        // partially matched group: scan its bitmap bytes for rows still flagged
        int32_t grpRowNum = taosArrayGetSize(pGrpRows->pRows);
        int32_t bitBytes = BitmapLen(grpRowNum);

        for (; pNMatch->bitIdx < bitBytes; ++pNMatch->bitIdx) {
          char* pByte = &build->pRowBitmap[pGrpRows->rowBitmapOffset + pNMatch->bitIdx];
          if (0 == *pByte) {
            continue;
          }

          int32_t firstRow = 8 * pNMatch->bitIdx;
          // NOTE(review): the loop relies on MJOIN_SET_ROW_BITMAP changing *pByte so that it
          // eventually becomes 0 -- confirm against the macro definition
          while (*pByte && !BLK_IS_FULL(pCtx->finBlk)) {
            uint8_t bitPos = lowest_bit_bitmap[((*pByte & (*pByte - 1)) ^ *pByte) % 11];
            if (firstRow + bitPos < grpRowNum) {
              MJ_ERR_RET(mFullJoinOutputHashRow(pCtx, pGrpRows, firstRow + bitPos));
              MJOIN_SET_ROW_BITMAP(build->pRowBitmap, pGrpRows->rowBitmapOffset + pNMatch->bitIdx, bitPos);
              if (++pGrpRows->rowMatchNum == taosArrayGetSize(pGrpRows->pRows)) {
                pGrpRows->allRowsMatch = true;
                pNMatch->bitIdx = bitBytes;
                break;
              }
            } else {
              // bit lies beyond the last row of the group: only update the bitmap
              MJOIN_SET_ROW_BITMAP(build->pRowBitmap, pGrpRows->rowBitmapOffset + pNMatch->bitIdx, bitPos);
            }
          }

          if (BLK_IS_FULL(pCtx->finBlk)) {
            if (pNMatch->bitIdx == bitBytes) {
              pNMatch->pGrp = tSimpleHashIterate(build->pGrpHash, pNMatch->pGrp, &pNMatch->iter);
              pNMatch->bitIdx = 0;
            }

            pCtx->nmatchRemains = true;
            return TSDB_CODE_SUCCESS;
          }
        }
      }
    }

    // group finished (or nothing to output for it): advance to the next hash group
    pNMatch->pGrp = tSimpleHashIterate(build->pGrpHash, pNMatch->pGrp, &pNMatch->iter);
    pNMatch->bitIdx = 0;
  }

  pCtx->lastEqGrp = false;
  pCtx->nmatchRemains = false;

  return TSDB_CODE_SUCCESS;
}
1091

1092
// Outputs row `idx` of the block referenced by pGrpRows: the row is wrapped in a
// single-row group and passed to mJoinNonEqGrpCart() with pCtx->finBlk.
static FORCE_INLINE int32_t mFullJoinOutputMergeRow(SMJoinMergeCtx* pCtx, SMJoinGrpRows* pGrpRows, int32_t idx) {
  SMJoinGrpRows oneRow = {0};

  oneRow.endIdx = idx;
  oneRow.readIdx = idx;
  oneRow.blk = pGrpRows->blk;

  return mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, &oneRow, false);
}
1099

1100

1101
// Outputs the rows of a merge group from pNMatch->rowIdx up to pGrpRows->endIdx, stopping
// early when pCtx->finBlk is full. Once rowIdx has passed endIdx, *grpDone is set to true
// and pNMatch->rowIdx is reset to 0; otherwise *grpDone is left untouched.
static int32_t mFullJoinOutputMergeGrpRows(SMJoinMergeCtx* pCtx, SMJoinGrpRows* pGrpRows, SMJoinNMatchCtx* pNMatch, bool* grpDone) {
  while (pNMatch->rowIdx <= pGrpRows->endIdx && !BLK_IS_FULL(pCtx->finBlk)) {
    MJ_ERR_RET(mFullJoinOutputMergeRow(pCtx, pGrpRows, pNMatch->rowIdx));
    ++pNMatch->rowIdx;
  }

  if (pNMatch->rowIdx > pGrpRows->endIdx) {
    pNMatch->rowIdx = 0;
    *grpDone = true;
  }

  return TSDB_CODE_SUCCESS;
}
1113

1114

1115
/*
 * Merge-join path: walks the build equal groups (build->eqGrps) and outputs the
 * build rows that are still flagged in build->pRowBitmap.
 *
 * The walk is resumable through build->nMatchCtx (grpIdx/bitIdx/rowIdx). When
 * pCtx->finBlk fills up the function returns with pCtx->nmatchRemains = true; once
 * every group has been visited it clears nmatchRemains and lastEqGrp.
 *
 * Returns TSDB_CODE_SUCCESS, or the error propagated by MJ_ERR_RET.
 */
static int32_t mFullJoinHandleMergeGrpRemains(SMJoinMergeCtx* pCtx) {
  // Indexed by (isolated lowest set bit of a bitmap byte) % 11. The powers of two 1..128
  // hit the residues 1,2,4,8,5,10,9,7, mapped to 7..0; 32 pads the unused residues 0,3,6.
  static const uint8_t lowest_bit_bitmap[] = {32, 7, 6, 32, 5, 3, 32, 0, 4, 1, 2};
  SMJoinTableCtx*  build = pCtx->pJoin->build;
  SMJoinNMatchCtx* pNMatch = &build->nMatchCtx;
  int32_t          grpNum = taosArrayGetSize(build->eqGrps);

  for (; pNMatch->grpIdx < grpNum; ++pNMatch->grpIdx, pNMatch->bitIdx = 0) {
    SMJoinGrpRows* pGrpRows = taosArrayGet(build->eqGrps, pNMatch->grpIdx);
    if (NULL == pGrpRows) {
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }

    if (pGrpRows->allRowsMatch) {
      continue;
    }

    if (pGrpRows->rowMatchNum <= 0 || pGrpRows->allRowsNMatch) {
      // no row of this group was matched: output the whole group, starting at beginIdx
      // the first time the group is visited
      bool grpDone = false;
      if (!pGrpRows->allRowsNMatch) {
        pGrpRows->allRowsNMatch = true;
        pNMatch->rowIdx = pGrpRows->beginIdx;
      }

      MJ_ERR_RET(mFullJoinOutputMergeGrpRows(pCtx, pGrpRows, pNMatch, &grpDone));

      if (BLK_IS_FULL(pCtx->finBlk)) {
        if (grpDone) {
          ++pNMatch->grpIdx;
          pNMatch->bitIdx = 0;
        }

        pCtx->nmatchRemains = true;
        return TSDB_CODE_SUCCESS;
      }

      continue;
    }

    // partially matched group: scan its bitmap bytes for rows still flagged
    int32_t rowNum = pGrpRows->endIdx - pGrpRows->beginIdx + 1;
    int32_t bitBytes = BitmapLen(pGrpRows->endIdx - pGrpRows->beginIdx + 1);

    for (; pNMatch->bitIdx < bitBytes; ++pNMatch->bitIdx) {
      char* pByte = &build->pRowBitmap[pGrpRows->rowBitmapOffset + pNMatch->bitIdx];
      if (0 == *pByte) {
        continue;
      }

      int32_t firstRow = 8 * pNMatch->bitIdx;
      // NOTE(review): the loop relies on MJOIN_SET_ROW_BITMAP changing *pByte so that it
      // eventually becomes 0 -- confirm against the macro definition
      while (*pByte && !BLK_IS_FULL(pCtx->finBlk)) {
        uint8_t bitPos = lowest_bit_bitmap[((*pByte & (*pByte - 1)) ^ *pByte) % 11];
        if (pGrpRows->beginIdx + firstRow + bitPos <= pGrpRows->endIdx) {
          MJ_ERR_RET(mFullJoinOutputMergeRow(pCtx, pGrpRows, pGrpRows->beginIdx + firstRow + bitPos));

          MJOIN_SET_ROW_BITMAP(build->pRowBitmap, pGrpRows->rowBitmapOffset + pNMatch->bitIdx, bitPos);
          if (++pGrpRows->rowMatchNum == rowNum) {
            pGrpRows->allRowsMatch = true;
            pNMatch->bitIdx = bitBytes;
            break;
          }
        } else {
          // bit lies beyond the last row of the group: only update the bitmap
          MJOIN_SET_ROW_BITMAP(build->pRowBitmap, pGrpRows->rowBitmapOffset + pNMatch->bitIdx, bitPos);
        }
      }

      if (BLK_IS_FULL(pCtx->finBlk)) {
        break;
      }
    }

    if (BLK_IS_FULL(pCtx->finBlk)) {
      // the group counts as finished only when its bitmap was scanned to the end
      if (pNMatch->bitIdx >= bitBytes) {
        ++pNMatch->grpIdx;
        pNMatch->bitIdx = 0;
      }

      pCtx->nmatchRemains = true;
      return TSDB_CODE_SUCCESS;
    }
  }

  pCtx->lastEqGrp = false;
  pCtx->nmatchRemains = false;

  return TSDB_CODE_SUCCESS;
}
1203

1204
// Outputs the remaining build rows of a full join through the hash or merge variant,
// chosen by pCtx->hashJoin.
static int32_t mFullJoinHandleBuildTableRemains(SMJoinMergeCtx* pCtx) {
  if (pCtx->hashJoin) {
    return mFullJoinHandleHashGrpRemains(pCtx);
  }

  return mFullJoinHandleMergeGrpRemains(pCtx);
}
1207

1208
SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) {
×
1209
  SMJoinOperatorInfo* pJoin = pOperator->info;
×
1210
  SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
×
1211
  int32_t code = TSDB_CODE_SUCCESS;
×
1212
  int64_t probeTs = 0;
×
1213
  int64_t buildTs = 0;
×
1214
  SColumnInfoData* pBuildCol = NULL;
×
1215
  SColumnInfoData* pProbeCol = NULL;
×
1216

1217
  blockDataCleanup(pCtx->finBlk);
×
1218

1219
  if (pCtx->midRemains) {
×
1220
    MJ_ERR_JRET(mJoinHandleMidRemains(pCtx));
×
1221
    if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
1222
      return pCtx->finBlk;
×
1223
    }
1224
    pCtx->midRemains = false;
×
1225
  }
1226

1227
  if (pCtx->grpRemains) {
×
1228
    MJ_ERR_JRET(mFullJoinHandleGrpRemains(pCtx));
×
1229
    if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
1230
      return pCtx->finBlk;
×
1231
    }
1232
    pCtx->grpRemains = false;
×
1233
  }
1234

1235
  if (pCtx->nmatchRemains) {
×
1236
    MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx));
×
1237
    if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
1238
      return pCtx->finBlk;
×
1239
    }
1240
  }
1241

1242
  do {
1243
    if (!mFullJoinRetrieve(pOperator, pJoin)) {
×
1244
      if (pCtx->lastEqGrp && pJoin->build->rowBitmapSize > 0) {
×
1245
        MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx));
×
1246
        if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
1247
          return pCtx->finBlk;
×
1248
        }
1249
      }
1250

1251
      mJoinSetDone(pOperator);      
×
1252
      break;
×
1253
    }
1254

1255
    MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
×
1256
    MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
×
1257
    
1258
    if (probeTs == pCtx->lastEqTs) {
×
1259
      MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, true));
×
1260
      if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
1261
        return pCtx->finBlk;
×
1262
      }
1263

1264
      if (FJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) {
×
1265
        continue;
×
1266
      } else {
1267
        MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
×
1268
      }
1269
    }
1270

1271
    if (pCtx->lastEqGrp && pJoin->build->rowBitmapSize > 0) {
×
1272
      MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx));
×
1273
      if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
1274
        return pCtx->finBlk;
×
1275
      }
1276
    }
1277

1278
    while (!FJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
×
1279
      if (probeTs == buildTs) {
×
1280
        pCtx->lastEqTs = probeTs;
×
1281
        MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false));
×
1282
        if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
1283
          return pCtx->finBlk;
×
1284
        }
1285

1286
        MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
×
1287
        MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
×
1288

1289
        if (!FJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && probeTs != pCtx->lastEqTs && pJoin->build->rowBitmapSize > 0) {
×
1290
          MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx));
×
1291
          if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
1292
            return pCtx->finBlk;
×
1293
          }
1294
        }
1295

1296
        continue;
×
1297
      }
1298

1299
      if (PROBE_TS_NMATCH(pCtx->ascTs, probeTs, buildTs)) {
×
1300
        MJ_ERR_JRET(mJoinProcessLowerGrp(pCtx, pJoin->probe, pProbeCol, &probeTs, &buildTs));
×
1301
      } else {
1302
        MJ_ERR_JRET(mJoinProcessGreaterGrp(pCtx, pJoin->build, pBuildCol, &probeTs, &buildTs));
×
1303
      }
1304

1305
      if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
1306
        return pCtx->finBlk;
×
1307
      }
1308
    }
1309

1310
    if (pJoin->build->dsFetchDone && !FJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) {
×
1311
      if (pCtx->lastEqGrp && pJoin->build->rowBitmapSize > 0) {
×
1312
        MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx));
×
1313
        if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
1314
          return pCtx->finBlk;
×
1315
        }
1316
      }
1317
      
1318
      pCtx->probeNEqGrp.blk = pJoin->probe->blk;
×
1319
      pCtx->probeNEqGrp.beginIdx = pJoin->probe->blkRowIdx;
×
1320
      pCtx->probeNEqGrp.readIdx = pCtx->probeNEqGrp.beginIdx;
×
1321
      pCtx->probeNEqGrp.endIdx = pJoin->probe->blk->info.rows - 1;
×
1322
      
1323
      pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows;
×
1324
            
1325
      MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false));
×
1326
      if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
1327
        return pCtx->finBlk;
×
1328
      }
1329
    }
1330

1331
    if (pJoin->probe->dsFetchDone && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
×
1332
      if (pCtx->lastEqGrp && pJoin->build->rowBitmapSize > 0) {
×
1333
        MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx));
×
1334
        if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
1335
          return pCtx->finBlk;
×
1336
        }
1337
      }
1338

1339
      pCtx->buildNEqGrp.blk = pJoin->build->blk;
×
1340
      pCtx->buildNEqGrp.beginIdx = pJoin->build->blkRowIdx;
×
1341
      pCtx->buildNEqGrp.readIdx = pCtx->buildNEqGrp.beginIdx;
×
1342
      pCtx->buildNEqGrp.endIdx = pJoin->build->blk->info.rows - 1;
×
1343
      
1344
      pJoin->build->blkRowIdx = pJoin->build->blk->info.rows;
×
1345
            
1346
      MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->buildNEqGrp, false, false));
×
1347
      if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
1348
        return pCtx->finBlk;
×
1349
      }
1350
    }
1351

1352
  } while (true);
1353

1354
_return:
×
1355

1356
  if (code) {
×
1357
    pJoin->errCode = code;
×
1358
    return NULL;
×
1359
  }
1360

1361
  return pCtx->finBlk;
×
1362
}
1363

1364

1365
/*
 * Semi join, hash path with pre-filter: joins the probe row described by probeGrp
 * with the current build hash group into pCtx->midBlk, one batch at a time, and
 * applies mJoinFilterAndKeepSingleRow(). The first batch that leaves rows in
 * midBlk is merged into finBlk via mJoinCopyMergeMidBlk() and ends the search;
 * the search also ends when build->grpRowIdx turns negative with nothing kept.
 */
static int32_t mSemiJoinHashGrpCartFilter(SMJoinMergeCtx* pCtx, SMJoinGrpRows* probeGrp) {
  SMJoinTableCtx* probe = pCtx->pJoin->probe;
  SMJoinTableCtx* build = pCtx->pJoin->build;

  for (;;) {
    blockDataCleanup(pCtx->midBlk);

    MJ_ERR_RET(mJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build, NULL));

    if (pCtx->midBlk->info.rows > 0) {
      MJ_ERR_RET(mJoinFilterAndKeepSingleRow(pCtx->midBlk, pCtx->pJoin->pPreFilter));
    }

    if (pCtx->midBlk->info.rows > 0) {
      // the original code notes (as disabled assertions) that exactly one row is expected
      // here and that midRemains stays false after the merge
      MJ_ERR_RET(mJoinCopyMergeMidBlk(pCtx, &pCtx->midBlk, &pCtx->finBlk));
      return TSDB_CODE_SUCCESS;
    }

    if (build->grpRowIdx < 0) {
      // build group exhausted without any row passing the filter
      return TSDB_CODE_SUCCESS;
    }
  }
}
1395

1396

1397
/*
 * Semi join, hash path with pre-filter: processes the rows of the first probe
 * equal group one by one. For each row with a hash hit the group is temporarily
 * narrowed to that single row (endIdx = readIdx) while
 * mSemiJoinHashGrpCartFilter() runs, then endIdx is restored.
 *
 * On exit pCtx->grpRemains tells whether probe rows are still pending.
 */
static int32_t mSemiJoinHashSeqCart(SMJoinMergeCtx* pCtx) {
  SMJoinTableCtx* probe = pCtx->pJoin->probe;
  SMJoinTableCtx* build = pCtx->pJoin->build;
  SMJoinGrpRows*  probeGrp = taosArrayGet(probe->eqGrps, 0);
  if (NULL == probeGrp) {
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  int32_t savedEndIdx = probeGrp->endIdx;
  size_t  bufLen = 0;

  while (!GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk)) {
    // a truthy result means this probe row is skipped (presumably a NULL key -- verify)
    if (!mJoinCopyKeyColsDataToBuf(probe, probeGrp->readIdx, &bufLen)) {
      void* pGrp = tSimpleHashGet(build->pGrpHash, probe->keyData, bufLen);
      if (NULL != pGrp) {
        build->pHashCurGrp = *(SArray**)pGrp;
        build->grpRowIdx = 0;

        probeGrp->endIdx = probeGrp->readIdx;
        MJ_ERR_RET(mSemiJoinHashGrpCartFilter(pCtx, probeGrp));
        probeGrp->endIdx = savedEndIdx;
      }
    }

    probeGrp->readIdx++;
  }

  pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;

  return TSDB_CODE_SUCCESS;
}
1429

1430

1431
/*
 * Semi join, hash path without pre-filter: for every remaining row of the probe
 * equal group selected by probe->grpIdx that has a hash hit, the join result is
 * appended to pCtx->finBlk via mJoinHashGrpCart().
 *
 * On exit pCtx->grpRemains tells whether probe rows are still pending.
 */
static int32_t mSemiJoinHashFullCart(SMJoinMergeCtx* pCtx) {
  SMJoinTableCtx* probe = pCtx->pJoin->probe;
  SMJoinTableCtx* build = pCtx->pJoin->build;
  SMJoinGrpRows*  probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
  if (NULL == probeGrp) {
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  size_t bufLen = 0;

  while (!GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk)) {
    // a truthy result means this probe row is skipped (presumably a NULL key -- verify)
    if (!mJoinCopyKeyColsDataToBuf(probe, probeGrp->readIdx, &bufLen)) {
      void* pGrp = tSimpleHashGet(build->pGrpHash, probe->keyData, bufLen);
      if (NULL != pGrp) {
        // the original code notes (as disabled assertions) that the hash group is expected
        // to hold exactly one row and that grpRowIdx is negative after the call
        build->pHashCurGrp = *(SArray**)pGrp;
        build->grpRowIdx = 0;
        MJ_ERR_RET(mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build, NULL));
      }
    }

    ++probeGrp->readIdx;
  }

  pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;

  return TSDB_CODE_SUCCESS;
}
1462

1463

1464
/*
 * Semi join, merge path with pre-filter: processes the probe equal group one row
 * at a time. Each probe row is joined with the build equal groups into
 * pCtx->midBlk (at most midBlk capacity rows per batch) and filtered with
 * mJoinFilterAndKeepSingleRow(); a surviving row is merged into finBlk and the
 * next probe row is started with build->grpIdx reset to 0. A batch that keeps
 * nothing while build groups are left is followed by another batch for the same
 * probe row.
 *
 * On exit pCtx->grpRemains tells whether probe rows are still pending.
 */
static int32_t mSemiJoinMergeSeqCart(SMJoinMergeCtx* pCtx) {
  SMJoinTableCtx* probe = pCtx->pJoin->probe;
  SMJoinTableCtx* build = pCtx->pJoin->build;
  SMJoinGrpRows*  probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
  if (NULL == probeGrp) {
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  SMJoinGrpRows* buildGrp = NULL;
  int32_t        buildGrpNum = taosArrayGetSize(build->eqGrps);
  int32_t        savedProbeEnd = probeGrp->endIdx;
  int32_t        rowsLeft = pCtx->midBlk->info.capacity;

  for (;;) {
    // the increment part moves to the next probe row: restore endIdx and restart the build groups
    for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk);
         ++probeGrp->readIdx, probeGrp->endIdx = savedProbeEnd, build->grpIdx = 0) {
      // narrow the probe group to the current row only
      probeGrp->endIdx = probeGrp->readIdx;

      rowsLeft = pCtx->midBlk->info.capacity;

      blockDataCleanup(pCtx->midBlk);
      for (; build->grpIdx < buildGrpNum && rowsLeft > 0; ++build->grpIdx) {
        buildGrp = taosArrayGet(build->eqGrps, build->grpIdx);
        if (NULL == buildGrp) {
          MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
        }

        if (rowsLeft < GRP_REMAIN_ROWS(buildGrp)) {
          // only part of this build group fits: join rowsLeft rows and remember the position
          int32_t savedBuildEnd = buildGrp->endIdx;
          buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1;
          MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp));
          buildGrp->readIdx += rowsLeft;
          buildGrp->endIdx = savedBuildEnd;
          break;
        }

        // the whole remainder of this build group fits
        MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp));
        rowsLeft -= GRP_REMAIN_ROWS(buildGrp);
        buildGrp->readIdx = buildGrp->beginIdx;
      }

      if (pCtx->midBlk->info.rows > 0) {
        MJ_ERR_RET(mJoinFilterAndKeepSingleRow(pCtx->midBlk, pCtx->pJoin->pFPreFilter));
      }

      if (0 != pCtx->midBlk->info.rows) {
        // a row survived the filter (the original code notes, as disabled assertions, that
        // exactly one row is expected and that midRemains stays false after the merge)
        MJ_ERR_RET(mJoinCopyMergeMidBlk(pCtx, &pCtx->midBlk, &pCtx->finBlk));

        if (build->grpIdx != buildGrpNum) {
          // rewind the partially read build group for the next probe row
          buildGrp->readIdx = buildGrp->beginIdx;
        }
        continue;
      }

      if (build->grpIdx == buildGrpNum) {
        // nothing kept and every build group tried: go to the next probe row
        continue;
      }

      // nothing kept but build groups remain: leave the for-loop without advancing the
      // probe row so that the enclosing loop runs another batch for it
      probeGrp->endIdx = savedProbeEnd;
      break;
    }

    if (GRP_DONE(probeGrp) || BLK_IS_FULL(pCtx->finBlk)) {
      break;
    }
  }

  pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;

  return TSDB_CODE_SUCCESS;
}
1543

1544

1545
/*
 * Semi join, merge path without pre-filter: joins the first probe equal group with
 * the first build equal group into pCtx->finBlk. When the remaining probe rows do
 * not fit into the free space of finBlk only that many rows are joined and
 * pCtx->grpRemains is set so the rest is handled by a later call.
 *
 * The original code notes (as disabled assertions) that a single build group
 * holding a single row is expected.
 */
static int32_t mSemiJoinMergeFullCart(SMJoinMergeCtx* pCtx) {
  SMJoinTableCtx* probe = pCtx->pJoin->probe;
  SMJoinTableCtx* build = pCtx->pJoin->build;
  int32_t         rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows;
  SMJoinGrpRows*  probeGrp = taosArrayGet(probe->eqGrps, 0);
  SMJoinGrpRows*  buildGrp = taosArrayGet(build->eqGrps, 0);
  if (NULL == buildGrp || NULL == probeGrp) {
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  int32_t probeRows = GRP_REMAIN_ROWS(probeGrp);
  int32_t savedEndIdx = probeGrp->endIdx;

  if (probeRows > rowsLeft) {
    // partial output: shrink the group to the rows that fit, then restore its end
    probeGrp->endIdx = probeGrp->readIdx + rowsLeft - 1;
    MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
    probeGrp->readIdx = probeGrp->endIdx + 1;
    probeGrp->endIdx = savedEndIdx;

    pCtx->grpRemains = true;

    return TSDB_CODE_SUCCESS;
  }

  MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));

  pCtx->grpRemains = false;

  return TSDB_CODE_SUCCESS;
}
1577

1578

1579
// Hash cartesian product for a semi join: the sequential variant is used when a
// pre-filter (pPreFilter) exists, the full variant otherwise.
static int32_t mSemiJoinHashCart(SMJoinMergeCtx* pCtx) {
  if (NULL != pCtx->pJoin->pPreFilter) {
    return mSemiJoinHashSeqCart(pCtx);
  }

  return mSemiJoinHashFullCart(pCtx);
}
1582

1583
// Merge cartesian product for a semi join: the sequential variant is used when a
// pre-filter (pFPreFilter) exists, the full variant otherwise.
static int32_t mSemiJoinMergeCart(SMJoinMergeCtx* pCtx) {
  if (NULL != pCtx->pJoin->pFPreFilter) {
    return mSemiJoinMergeSeqCart(pCtx);
  }

  return mSemiJoinMergeFullCart(pCtx);
}
1586

1587
// Resumes the unfinished equal group of a semi join by dispatching to the
// hash or merge cartesian callback stored in the context.
static FORCE_INLINE int32_t mSemiJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) {
  if (pCtx->hashJoin) {
    return (*pCtx->hashCartFp)(pCtx);
  }

  return (*pCtx->mergeCartFp)(pCtx);
}
1590

1591

1592
SSDataBlock* mSemiJoinDo(struct SOperatorInfo* pOperator) {
×
1593
  SMJoinOperatorInfo* pJoin = pOperator->info;
×
1594
  SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
×
1595
  int32_t code = TSDB_CODE_SUCCESS;
×
1596
  int64_t probeTs = 0;
×
1597
  int64_t buildTs = 0;
×
1598
  SColumnInfoData* pBuildCol = NULL;
×
1599
  SColumnInfoData* pProbeCol = NULL;
×
1600

1601
  blockDataCleanup(pCtx->finBlk);
×
1602

1603
  if (pCtx->grpRemains) {
×
1604
    MJ_ERR_JRET(mSemiJoinHandleGrpRemains(pCtx));
×
1605
    if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
1606
      return pCtx->finBlk;
×
1607
    }
1608
    pCtx->grpRemains = false;
×
1609
  }
1610

1611
  do {
1612
    if (!mInnerJoinRetrieve(pOperator, pJoin, pCtx)) {
×
1613
      break;
×
1614
    }
1615

1616
    MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
×
1617
    MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
×
1618
    
1619
    if (probeTs == pCtx->lastEqTs) {
×
1620
      MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, true));
×
1621
      if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
1622
        return pCtx->finBlk;
×
1623
      }
1624

1625
      if (MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) || MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
×
1626
        continue;
×
1627
      } 
1628

1629
      MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
×
1630
    } else if (MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
×
1631
      mJoinSetDone(pOperator);
×
1632
      break;
×
1633
    }
1634

1635
    do {
1636
      if (probeTs == buildTs) {
×
1637
        pCtx->lastEqTs = probeTs;
×
1638
        MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false));
×
1639
        if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
1640
          return pCtx->finBlk;
×
1641
        }
1642

1643
        if (MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) || MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
×
1644
          break;
1645
        }
1646
        
1647
        MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
×
1648
        MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
×
1649
        continue;
×
1650
      }
1651

1652
      if (PROBE_TS_NMATCH(pCtx->ascTs, probeTs, buildTs)) {
×
1653
        if (++pJoin->probe->blkRowIdx < pJoin->probe->blk->info.rows) {
×
1654
          MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
×
1655
          continue;
×
1656
        }
1657
      } else {
1658
        if (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) {
×
1659
          MJOIN_GET_TB_CUR_TS(pBuildCol, buildTs, pJoin->build);
×
1660
          continue;
×
1661
        }
1662
      }
1663
      
1664
      break;
×
1665
    } while (true);
1666
  } while (true);
1667

1668
_return:
×
1669

1670
  if (code) {
×
1671
    pJoin->errCode = code;
×
1672
    return NULL;
×
1673
  }
1674

1675
  return pCtx->finBlk;
×
1676
}
1677

1678

1679
// Resumes the unfinished group of an anti join: an equal group goes to the hash/merge
// cartesian callback, otherwise the pending probe non-equal group is passed to
// mJoinNonEqCart().
static FORCE_INLINE int32_t mAntiJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) {
  if (!pCtx->lastEqGrp) {
    return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false);
  }

  if (pCtx->hashJoin) {
    return (*pCtx->hashCartFp)(pCtx);
  }

  return (*pCtx->mergeCartFp)(pCtx);
}
1686

1687
// Hash-based anti join over the current probe group (probe->eqGrps[probe->grpIdx]).
// Each remaining probe row is emitted into pCtx->finBlk via mJoinNonEqGrpCart() when
// either mJoinCopyKeyColsDataToBuf() returns non-zero for it or its key is not found
// in build->pGrpHash. Iteration stops once the group is done or finBlk is full;
// pCtx->grpRemains then tells whether probe rows are still pending.
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR if the probe group
// cannot be fetched, or the error of a failing helper.
static int32_t mAntiJoinHashFullCart(SMJoinMergeCtx* pCtx) {
  SMJoinTableCtx* probe = pCtx->pJoin->probe;
  SMJoinTableCtx* build = pCtx->pJoin->build;

  SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
  if (NULL == probeGrp) {
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  size_t  keyLen = 0;
  int32_t savedEndIdx = probeGrp->endIdx;

  while (!GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk)) {
    // The hash lookup is only attempted when the key copy returned zero.
    bool unmatched = mJoinCopyKeyColsDataToBuf(probe, probeGrp->readIdx, &keyLen) ||
                     (NULL == tSimpleHashGet(build->pGrpHash, probe->keyData, keyLen));

    if (unmatched) {
      // Temporarily narrow the group to the single current row while emitting it.
      probeGrp->endIdx = probeGrp->readIdx;
      MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true));
      probeGrp->endIdx = savedEndIdx;
    }

    ++probeGrp->readIdx;
  }

  pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;

  return TSDB_CODE_SUCCESS;
}
1718

1719

1720
// Joins the given probe group against the currently selected hash group, batch by
// batch, through pCtx->midBlk and applies pPreFilter to every non-empty batch.
// The loop ends as soon as a batch keeps at least one row after filtering (nothing is
// emitted then), or when build->grpRowIdx has gone negative without any surviving
// row, in which case the probe group is emitted into finBlk via mJoinNonEqGrpCart().
static int32_t mAntiJoinHashGrpCartFilter(SMJoinMergeCtx* pCtx, SMJoinGrpRows* probeGrp) {
  SMJoinTableCtx* probe = pCtx->pJoin->probe;
  SMJoinTableCtx* build = pCtx->pJoin->build;

  for (;;) {
    blockDataCleanup(pCtx->midBlk);

    MJ_ERR_RET(mJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build, NULL));

    if (pCtx->midBlk->info.rows > 0) {
      MJ_ERR_RET(mJoinFilterAndNoKeepRows(pCtx->midBlk, pCtx->pJoin->pPreFilter));
    }

    if (0 != pCtx->midBlk->info.rows) {
      // A row survived the filter: stop without emitting the probe group.
      return TSDB_CODE_SUCCESS;
    }

    if (build->grpRowIdx < 0) {
      MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true));
      return TSDB_CODE_SUCCESS;
    }
  }
}
1747

1748

1749
// Hash-based anti join used when a pre-filter exists (see mAntiJoinHashCart()).
// Works on probe->eqGrps[0], one probe row at a time:
//   - if mJoinCopyKeyColsDataToBuf() returns non-zero or the key is absent from
//     build->pGrpHash, the row is emitted via mJoinNonEqGrpCart();
//   - otherwise the matching hash group becomes build->pHashCurGrp and the row is
//     handed to mAntiJoinHashGrpCartFilter().
// Stops when the group is done or finBlk is full and records in pCtx->grpRemains
// whether probe rows are left.
// NOTE(review): mAntiJoinHashFullCart() indexes eqGrps with probe->grpIdx while this
// function uses index 0 - confirm that is intentional.
static int32_t mAntiJoinHashSeqCart(SMJoinMergeCtx* pCtx) {
  SMJoinTableCtx* probe = pCtx->pJoin->probe;
  SMJoinTableCtx* build = pCtx->pJoin->build;

  SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0);
  if (NULL == probeGrp) {
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  size_t  keyLen = 0;
  int32_t savedEndIdx = probeGrp->endIdx;

  while (!GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk)) {
    void* pHit = NULL;
    if (!mJoinCopyKeyColsDataToBuf(probe, probeGrp->readIdx, &keyLen)) {
      pHit = tSimpleHashGet(build->pGrpHash, probe->keyData, keyLen);
    }

    // Restrict the group to the current row for the duration of the helper call.
    probeGrp->endIdx = probeGrp->readIdx;

    if (NULL == pHit) {
      MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true));
    } else {
      build->pHashCurGrp = *(SArray**)pHit;
      build->grpRowIdx = 0;
      MJ_ERR_RET(mAntiJoinHashGrpCartFilter(pCtx, probeGrp));
    }

    probeGrp->endIdx = savedEndIdx;
    probeGrp->readIdx++;
  }

  pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;

  return TSDB_CODE_SUCCESS;
}
1788

1789
// Merge cart used when no pFPreFilter is set (see mAntiJoinMergeCart()):
// performs no work and always reports success.
static int32_t mAntiJoinMergeFullCart(SMJoinMergeCtx* pCtx) {
  (void)pCtx;  // intentionally unused

  return TSDB_CODE_SUCCESS;
}
1792

1793
// Merge-based anti join with a pre-filter (pFPreFilter), processed one probe row at
// a time for the group probe->eqGrps[probe->grpIdx].
//
// For the current probe row, build groups starting at build->grpIdx are joined into
// pCtx->midBlk (at most midBlk capacity rows per pass) and filtered:
//   - a surviving row means the probe row has a match: skip it;
//   - no surviving row and all build groups consumed: emit the probe row into finBlk
//     through mJoinNonEqGrpCart();
//   - no surviving row but build rows remain: run another pass for the SAME probe row,
//     continuing from the saved build->grpIdx / buildGrp->readIdx position.
// Ends when the probe group is done or finBlk is full; pCtx->grpRemains reports
// whether probe rows are left.
static int32_t mAntiJoinMergeSeqCart(SMJoinMergeCtx* pCtx) {
  SMJoinTableCtx* probe = pCtx->pJoin->probe;
  SMJoinTableCtx* build = pCtx->pJoin->build;

  SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
  if (NULL == probeGrp) {
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  SMJoinGrpRows* buildGrp = NULL;
  int32_t        buildGrpNum = taosArrayGetSize(build->eqGrps);
  int32_t        savedProbeEnd = probeGrp->endIdx;

  while (!GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk)) {
    // Narrow the probe group to the single current row for this pass.
    probeGrp->endIdx = probeGrp->readIdx;

    int32_t rowsLeft = pCtx->midBlk->info.capacity;
    blockDataCleanup(pCtx->midBlk);

    for (; build->grpIdx < buildGrpNum && rowsLeft > 0; ++build->grpIdx) {
      buildGrp = taosArrayGet(build->eqGrps, build->grpIdx);
      if (NULL == buildGrp) {
        MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
      }

      if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) {
        // The whole remainder of this build group fits: join it and rewind the group.
        MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp));
        rowsLeft -= GRP_REMAIN_ROWS(buildGrp);
        buildGrp->readIdx = buildGrp->beginIdx;
        continue;
      }

      // Only part of the build group fits: join rowsLeft rows and remember the position.
      int32_t savedBuildEnd = buildGrp->endIdx;
      buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1;
      MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp));
      buildGrp->readIdx += rowsLeft;
      buildGrp->endIdx = savedBuildEnd;
      break;
    }

    if (pCtx->midBlk->info.rows > 0) {
      MJ_ERR_RET(mJoinFilterAndNoKeepRows(pCtx->midBlk, pCtx->pJoin->pFPreFilter));
    }

    bool moveToNextProbeRow = false;
    if (pCtx->midBlk->info.rows > 0) {
      // Match found: rewind a partially consumed build group and skip this probe row.
      if (build->grpIdx < buildGrpNum) {
        buildGrp->readIdx = buildGrp->beginIdx;
      }
      moveToNextProbeRow = true;
    } else if (build->grpIdx >= buildGrpNum) {
      // Every build row was tried and none survived: output the probe row.
      MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true));
      moveToNextProbeRow = true;
    }

    if (moveToNextProbeRow) {
      ++probeGrp->readIdx;
      build->grpIdx = 0;
    }

    // Restore the real group end before the loop condition is re-evaluated.
    probeGrp->endIdx = savedProbeEnd;
  }

  pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;

  return TSDB_CODE_SUCCESS;
}
1868

1869

1870
// Dispatches the hash cart: the "full" variant when the join has no pPreFilter,
// the row-by-row "seq" variant otherwise.
static int32_t mAntiJoinHashCart(SMJoinMergeCtx* pCtx) {
  if (NULL != pCtx->pJoin->pPreFilter) {
    return mAntiJoinHashSeqCart(pCtx);
  }

  return mAntiJoinHashFullCart(pCtx);
}
1873

1874
// Dispatches the merge cart: the "full" variant when the join has no pFPreFilter,
// the row-by-row "seq" variant otherwise.
static int32_t mAntiJoinMergeCart(SMJoinMergeCtx* pCtx) {
  if (NULL != pCtx->pJoin->pFPreFilter) {
    return mAntiJoinMergeSeqCart(pCtx);
  }

  return mAntiJoinMergeFullCart(pCtx);
}
1877

1878
SSDataBlock* mAntiJoinDo(struct SOperatorInfo* pOperator) {
×
1879
  SMJoinOperatorInfo* pJoin = pOperator->info;
×
1880
  SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
×
1881
  int32_t code = TSDB_CODE_SUCCESS;
×
1882
  int64_t probeTs = 0;
×
1883
  int64_t buildTs = 0;
×
1884
  SColumnInfoData* pBuildCol = NULL;
×
1885
  SColumnInfoData* pProbeCol = NULL;
×
1886

1887
  blockDataCleanup(pCtx->finBlk);
×
1888

1889
  if (pCtx->grpRemains) {
×
1890
    MJ_ERR_JRET(mAntiJoinHandleGrpRemains(pCtx));
×
1891
    if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
1892
      return pCtx->finBlk;
×
1893
    }
1894
    pCtx->grpRemains = false;
×
1895
  }
1896

1897
  do {
1898
    if (!mLeftJoinRetrieve(pOperator, pJoin, pCtx)) {
×
1899
      break;
×
1900
    }
1901

1902
    MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
×
1903
    MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
×
1904
    
1905
    if (probeTs == pCtx->lastEqTs) {
×
1906
      MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, true));
×
1907
      if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
1908
        return pCtx->finBlk;
×
1909
      }
1910

1911
      if (MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) {
×
1912
        continue;
×
1913
      } else {
1914
        MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
×
1915
      }
1916
    }
1917

1918
    while (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
×
1919
      if (probeTs == buildTs) {
×
1920
        pCtx->lastEqTs = probeTs;
×
1921
        MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false));
×
1922
        if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
1923
          return pCtx->finBlk;
×
1924
        }
1925

1926
        MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
×
1927
        MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
×
1928
      } else if (PROBE_TS_NMATCH(pCtx->ascTs, probeTs, buildTs)) {
×
1929
        MJ_ERR_JRET(mJoinProcessLowerGrp(pCtx, pJoin->probe, pProbeCol, &probeTs, &buildTs));
×
1930
        if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
1931
          return pCtx->finBlk;
×
1932
        }
1933
      } else {
1934
        while (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) {
×
1935
          MJOIN_GET_TB_CUR_TS(pBuildCol, buildTs, pJoin->build);
×
1936
          if (PROBE_TS_NREACH(pCtx->ascTs, probeTs, buildTs)) {
×
1937
            continue;
×
1938
          }
1939
          
1940
          break;
×
1941
        }
1942
      }
1943
    }
1944

1945
    if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && pJoin->build->dsFetchDone) {
×
1946
      pCtx->probeNEqGrp.blk = pJoin->probe->blk;
×
1947
      pCtx->probeNEqGrp.beginIdx = pJoin->probe->blkRowIdx;
×
1948
      pCtx->probeNEqGrp.readIdx = pCtx->probeNEqGrp.beginIdx;
×
1949
      pCtx->probeNEqGrp.endIdx = pJoin->probe->blk->info.rows - 1;
×
1950
      
1951
      pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows;
×
1952
            
1953
      MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false));
×
1954
      if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
1955
        return pCtx->finBlk;
×
1956
      }
1957
    }
1958
  } while (true);
1959

1960
_return:
×
1961

1962
  if (code) {
×
1963
    pJoin->errCode = code;
×
1964
    return NULL;
×
1965
  }
1966

1967
  return pCtx->finBlk;
×
1968
}
1969

1970

1971
// Decides how many of `newRows` incoming rows should be appended to the cache block
// and how many rows already in it must be dropped so the total stays within jLimit.
//   pCache    - cache whose outBlk row count is examined
//   jLimit    - maximum number of rows the cache may hold
//   newRows   - number of candidate rows
//   evictRows - [out] number of rows to remove from the cache first
// Returns the number of rows to append.
int32_t mAsofBackwardCalcRowNum(SMJoinWinCache* pCache, int64_t jLimit, int32_t newRows, int32_t* evictRows) {
  int64_t cachedRows = pCache->outBlk->info.rows;

  *evictRows = 0;

  if (cachedRows <= 0) {
    // Empty cache: nothing to evict, just cap the incoming rows.
    return TMIN(jLimit, newRows);
  }

  if ((cachedRows + newRows) <= jLimit) {
    return newRows;
  }

  if (newRows >= jLimit) {
    // The new rows alone fill the limit: evict everything cached.
    *evictRows = cachedRows;
    return jLimit;
  }

  *evictRows = cachedRows + newRows - jLimit;
  return newRows;
}
1990

1991
// Appends rows of group pGrp ([beginIdx, endIdx] of pGrp->blk) to the window cache,
// first trimming as many leading cache rows as mAsofBackwardCalcRowNum() requests.
// When fromBegin is true the rows are taken from the start of the group, otherwise
// the last `rows` rows of the group are taken.
// Returns the status of the trim/merge helpers.
int32_t mAsofBackwardAddRowsToCache(SMJoinWindowCtx* pCtx, SMJoinGrpRows* pGrp, bool fromBegin) {
  SMJoinWinCache* pCache = &pCtx->cache;
  int32_t         toEvict = 0;
  int32_t         grpRows = pGrp->endIdx - pGrp->beginIdx + 1;
  int32_t         toAppend = mAsofBackwardCalcRowNum(pCache, pCtx->jLimit, grpRows, &toEvict);

  if (toEvict > 0) {
    MJ_ERR_RET(blockDataTrimFirstRows(pCache->outBlk, toEvict));
  }

  int32_t firstRow;
  if (fromBegin) {
    firstRow = pGrp->beginIdx;
  } else {
    firstRow = pGrp->endIdx - toAppend + 1;
  }

  return blockDataMergeNRows(pCache->outBlk, pGrp->blk, firstRow, toAppend);
}
2002

2003

2004
// Scans pTable from its current row and adds every row whose primary timestamp equals
// `timestamp` to the window cache, following the equal run across block boundaries.
//   pOperator - used only for the task id in the debug log
//   pCtx      - window context owning the cache and jLimit
//   pTable    - table context to scan; blk/blkRowIdx are advanced past the equal rows
//   timestamp - timestamp value to match
// At most pCtx->jLimit equal rows are pushed to the cache; further equal rows are
// only skipped. Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR if
// the timestamp column is missing, or the error of the cache helper.
// NOTE(review): pTable->blk is dereferenced without a NULL check on entry - callers
// presumably guarantee a current block; confirm.
int32_t mAsofBackwardAddEqRowsToCache(struct SOperatorInfo* pOperator, SMJoinWindowCtx* pCtx, SMJoinTableCtx* pTable, int64_t timestamp) {
  int64_t eqRowsNum = 0;
  // Zero-initialise so no field of the group handed to the helper is indeterminate.
  SMJoinGrpRows grp = {0};

  do {
    grp.blk = pTable->blk;

    SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCtx.targetSlotId);
    if (NULL == pCol) {
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }

    // The equal run ended before (or exactly at) the start of this block.
    if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) {
      return TSDB_CODE_SUCCESS;
    }

    grp.beginIdx = pTable->blkRowIdx;
    grp.readIdx = grp.beginIdx;

    char* pEndVal = colDataGetNumData(pCol, pTable->blk->info.rows - 1);
    if (timestamp != *(int64_t*)pEndVal) {
      // The run ends inside this block: advance to the first row with a different ts.
      for (; pTable->blkRowIdx < pTable->blk->info.rows; ++pTable->blkRowIdx) {
        char* pNextVal = colDataGetNumData(pCol, pTable->blkRowIdx);
        if (timestamp == *(int64_t*)pNextVal) {
          continue;
        }

        break;
      }

      grp.endIdx = pTable->blkRowIdx - 1;
    } else {
      // The last row still matches: the run covers the rest of the block.
      grp.endIdx = pTable->blk->info.rows - 1;
      pTable->blkRowIdx = pTable->blk->info.rows;
    }

    if (eqRowsNum < pCtx->jLimit) {
      // Clamp the group so that no more than jLimit equal rows are cached in total.
      grp.endIdx = grp.beginIdx + TMIN(grp.endIdx - grp.beginIdx + 1, pCtx->jLimit - eqRowsNum) - 1;
      MJ_ERR_RET(mAsofBackwardAddRowsToCache(pCtx, &grp, true));
    }

    eqRowsNum += grp.endIdx - grp.beginIdx + 1;

    if (pTable->blkRowIdx == pTable->blk->info.rows && !pTable->dsFetchDone) {
      // Block exhausted while still inside the run: fetch the next block and go on.
      pTable->blk = (*pCtx->pJoin->retrieveFp)(pCtx->pJoin, pTable);
      qDebug("%s merge join %s table got block for same ts, rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), MJOIN_TBTYPE(pTable->type), pTable->blk ? pTable->blk->info.rows : 0);

      pTable->blkRowIdx = 0;
      pCtx->buildGrp.blk = pTable->blk;

      if (NULL == pTable->blk) {
        break;
      }
    } else {
      break;
    }
  } while (true);

  return TSDB_CODE_SUCCESS;
}
2063

2064
// Emits the join of the current probe group (pCtx->probeGrp) with the cached build
// rows (pCtx->cache.outBlk) into pCtx->finBlk.
//
// With an empty or missing cache the probe rows go through mJoinNonEqCart().
// Otherwise every probe row is paired with all cached rows; cache.outRowIdx is the
// cache row at which the first pending probe row resumes. If the whole result fits
// into the free space of finBlk it is produced at once and grpRemains is cleared;
// if not, as much as fits is produced and probeGrp->readIdx / cache.outRowIdx record
// where to resume, with grpRemains telling whether probe rows are left.
int32_t mAsofBackwardDumpGrpCache(SMJoinWindowCtx* pCtx) {
  if (NULL == pCtx->cache.outBlk || pCtx->cache.outBlk->info.rows <= 0) {
    return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeGrp, true, false);
  }

  SMJoinGrpRows* probeGrp = &pCtx->probeGrp;
  int64_t        cacheRows = pCtx->cache.outBlk->info.rows;
  int32_t        rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows;
  SMJoinGrpRows  buildGrp = {.blk = pCtx->cache.outBlk, .readIdx = pCtx->cache.outRowIdx, .endIdx = cacheRows - 1};
  int32_t        probeRows = GRP_REMAIN_ROWS(probeGrp);
  int32_t        probeEndIdx = probeGrp->endIdx;

  int64_t totalResRows;
  if (0 == pCtx->cache.outRowIdx) {
    totalResRows = probeRows * cacheRows;
  } else {
    // The first probe row only needs the cache rows from outRowIdx on.
    totalResRows = cacheRows - pCtx->cache.outRowIdx + (probeRows - 1) * cacheRows;
  }

  if (totalResRows <= rowsLeft) {
    if (0 != pCtx->cache.outRowIdx) {
      // Finish the partially emitted probe row, then pair the others with the full cache.
      probeGrp->endIdx = probeGrp->readIdx;
      MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp));
      if (++probeGrp->readIdx <= probeEndIdx) {
        probeGrp->endIdx = probeEndIdx;
        buildGrp.readIdx = 0;
        MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp));
      }
    } else {
      MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp));
    }

    pCtx->grpRemains = false;
    pCtx->cache.outRowIdx = 0;
    return TSDB_CODE_SUCCESS;
  }

  while (!GRP_DONE(probeGrp) && rowsLeft > 0) {
    // Number of complete probe rows (each producing cacheRows results) that still fit.
    int32_t wholeGrps = (0 == pCtx->cache.outRowIdx) ? (rowsLeft / cacheRows) : 0;
    if (wholeGrps > 0) {
      probeGrp->endIdx = probeGrp->readIdx + wholeGrps - 1;
      buildGrp.readIdx = 0;
      MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp));
      rowsLeft -= wholeGrps * cacheRows;
      probeGrp->readIdx += wholeGrps;
      probeGrp->endIdx = probeEndIdx;
      continue;
    }

    // Handle a single probe row against the cache rows from outRowIdx on.
    probeGrp->endIdx = probeGrp->readIdx;
    buildGrp.readIdx = pCtx->cache.outRowIdx;

    int32_t grpRemainRows = cacheRows - pCtx->cache.outRowIdx;
    if (rowsLeft < grpRemainRows) {
      // Only part of the cache fits: emit rowsLeft rows and remember the position.
      buildGrp.endIdx = buildGrp.readIdx + rowsLeft - 1;
      MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp));
      pCtx->cache.outRowIdx += rowsLeft;
      break;
    }

    MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp));
    rowsLeft -= grpRemainRows;
    pCtx->cache.outRowIdx = 0;
    probeGrp->readIdx++;
    probeGrp->endIdx = probeEndIdx;
  }

  probeGrp->endIdx = probeEndIdx;
  pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;

  return TSDB_CODE_SUCCESS;
}
2137

2138
// Handles an equal-timestamp probe group for the backward ASOF join.
// The order of "dump the cache" and "add the build rows with the same timestamp to
// the cache" depends on pCtx->eqRowsAcq:
//   - eqRowsAcq clear: dump first; if output is still pending (grpRemains) stop here,
//     otherwise add the equal build rows afterwards (unless skipped, see below);
//   - eqRowsAcq set: add the equal build rows first, then dump.
// Equal build rows are added at most once per group (eqPostDone), never when
// lastBuildGrp is set, and when eqRowsAcq is clear only if skipEqPost is false.
int32_t mAsofBackwardDumpUpdateEqRows(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, bool lastBuildGrp, bool skipEqPost) {
  if (!pCtx->eqRowsAcq) {
    MJ_ERR_RET(mAsofBackwardDumpGrpCache(pCtx));

    pCtx->lastEqGrp = true;
    if (pCtx->grpRemains) {
      return TSDB_CODE_SUCCESS;
    }
  }

  bool needEqPost = !pCtx->eqPostDone && !lastBuildGrp && (pCtx->eqRowsAcq || !skipEqPost);
  if (needEqPost) {
    pCtx->eqPostDone = true;
    MJ_ERR_RET(mAsofBackwardAddEqRowsToCache(pJoin->pOperator, pCtx, pJoin->build, pCtx->lastTs));
  }

  if (pCtx->eqRowsAcq) {
    MJ_ERR_RET(mAsofBackwardDumpGrpCache(pCtx));

    pCtx->lastEqGrp = true;
  }

  return TSDB_CODE_SUCCESS;
}
2163

2164
// Builds the probe group of rows carrying `timestamp` (mJoinBuildEqGrp) and passes it
// to mAsofBackwardDumpUpdateEqRows(). eqPostDone is reset unless lastBuildGrp is set.
// The wholeBlk flag filled in by mJoinBuildEqGrp() is forwarded as skipEqPost.
int32_t mAsofBackwardProcessEqualGrp(SMJoinWindowCtx* pCtx, int64_t timestamp, bool lastBuildGrp) {
  SMJoinOperatorInfo* pJoin = pCtx->pJoin;
  bool                probeGrpFillsBlk = false;

  if (!lastBuildGrp) {
    pCtx->eqPostDone = false;
  }

  MJ_ERR_RET(mJoinBuildEqGrp(pJoin->probe, timestamp, &probeGrpFillsBlk, &pCtx->probeGrp));

  MJ_ERR_RET(mAsofBackwardDumpUpdateEqRows(pCtx, pJoin, lastBuildGrp, probeGrpFillsBlk));

  return TSDB_CODE_SUCCESS;
}
2178

2179

2180
// Collects the run of probe rows, starting at the current probe row, for which
// PROBE_TS_NMATCH(ascTs, probeTs, buildTs) holds, stores it in pCtx->probeGrp and
// dumps it against the cache. The probe cursor and *probeTs are left at the first
// row that ends the run (or at the end of the block). Clears lastEqGrp.
int32_t mAsofBackwardHandleClosedGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, SColumnInfoData* pCol,  int64_t* probeTs, int64_t* buildTs) {
  SMJoinGrpRows* pGrp = &pCtx->probeGrp;

  pCtx->lastEqGrp = false;

  // The group starts as the single current probe row.
  pGrp->beginIdx = pJoin->probe->blkRowIdx;
  pGrp->readIdx = pGrp->beginIdx;
  pGrp->endIdx = pGrp->beginIdx;

  while (++pJoin->probe->blkRowIdx < pJoin->probe->blk->info.rows) {
    MJOIN_GET_TB_CUR_TS(pCol, *probeTs, pJoin->probe);
    if (!(PROBE_TS_NMATCH(pCtx->ascTs, *probeTs, *buildTs))) {
      break;
    }

    pGrp->endIdx = pJoin->probe->blkRowIdx;
  }

  return mAsofBackwardDumpGrpCache(pCtx);
}
2199

2200
// Collects the run of build rows, starting at the current build row, for which
// PROBE_TS_NREACH(ascTs, probeTs, buildTs) holds, stores it in pCtx->buildGrp and adds
// it to the cache taking rows from the END of the run (fromBegin == false).
// pCtx->probeGrp is reset to the single current probe row. Clears lastEqGrp.
int32_t mAsofBackwardHandleUnclosedGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, SColumnInfoData* pCol,  int64_t* probeTs, int64_t* buildTs) {
  SMJoinGrpRows* pBuildGrp = &pCtx->buildGrp;
  SMJoinGrpRows* pProbeGrp = &pCtx->probeGrp;

  pCtx->lastEqGrp = false;

  pBuildGrp->beginIdx = pJoin->build->blkRowIdx;
  pBuildGrp->readIdx = pBuildGrp->beginIdx;
  pBuildGrp->endIdx = pBuildGrp->beginIdx;

  while (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) {
    MJOIN_GET_TB_CUR_TS(pCol, *buildTs, pJoin->build);
    if (!(PROBE_TS_NREACH(pCtx->ascTs, *probeTs, *buildTs))) {
      break;
    }

    pBuildGrp->endIdx = pJoin->build->blkRowIdx;
  }

  pProbeGrp->beginIdx = pJoin->probe->blkRowIdx;
  pProbeGrp->readIdx = pProbeGrp->beginIdx;
  pProbeGrp->endIdx = pProbeGrp->beginIdx;

  return mAsofBackwardAddRowsToCache(pCtx, pBuildGrp, false);
}
2223

2224
// Resumes output of a group left unfinished by the previous call: through
// mAsofBackwardDumpUpdateEqRows() when lastEqGrp is set, otherwise by dumping the
// group against the cache directly.
int32_t mAsofBackwardHandleGrpRemains(SMJoinWindowCtx* pCtx) {
  if (pCtx->lastEqGrp) {
    return mAsofBackwardDumpUpdateEqRows(pCtx, pCtx->pJoin, false, true);
  }

  return mAsofBackwardDumpGrpCache(pCtx);
}
2227

2228
// Fetches the next probe block and, when one was obtained (or the build source still
// needs initialisation), the next build block.
//   newBlock - [out] true when a probe block is available for processing
// Without a probe block the operator is marked done, unless this is a group join that
// still has probe->remainInBlk pending. The cache block (cache.outBlk) is created
// lazily from the first build block and sized to hold jLimit rows.
// Returns TSDB_CODE_SUCCESS or the error of the block helpers.
static int32_t mAsofBackwardRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx, bool* newBlock) {
  *newBlock = false;

  bool gotProbe = mJoinRetrieveBlk(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);
  bool gotBuild = false;

  if (gotProbe || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) {
    gotBuild = mJoinRetrieveBlk(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
  }

  if (!gotProbe) {
    if (!pCtx->groupJoin || NULL == pJoin->probe->remainInBlk) {
      mJoinSetDone(pOperator);
    }

    return TSDB_CODE_SUCCESS;
  }

  if (gotBuild && NULL == pCtx->cache.outBlk) {
    MJ_ERR_RET(createOneDataBlock(pJoin->build->blk, false, &pCtx->cache.outBlk));
    MJ_ERR_RET(blockDataEnsureCapacity(pCtx->cache.outBlk, pCtx->jLimit));
  }

  pCtx->probeGrp.blk = pJoin->probe->blk;
  pCtx->buildGrp.blk = pJoin->build->blk;

  *newBlock = true;

  return TSDB_CODE_SUCCESS;
}
2267

2268

2269
SSDataBlock* mAsofBackwardJoinDo(struct SOperatorInfo* pOperator) {
×
2270
  SMJoinOperatorInfo* pJoin = pOperator->info;
×
2271
  SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx;
×
2272
  int32_t code = TSDB_CODE_SUCCESS;
×
2273
  int64_t probeTs = 0;
×
2274
  int64_t buildTs = 0;
×
2275
  SColumnInfoData* pBuildCol = NULL;
×
2276
  SColumnInfoData* pProbeCol = NULL;
×
2277
  bool newBlock = false;
×
2278

2279
  blockDataCleanup(pCtx->finBlk);
×
2280

2281
  if (pCtx->grpRemains) {
×
2282
    MJ_ERR_JRET(mAsofBackwardHandleGrpRemains(pCtx));
×
2283
    if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
2284
      return pCtx->finBlk;
×
2285
    }
2286
    pCtx->grpRemains = false;
×
2287
  }
2288

2289
  do {
2290
    MJ_ERR_JRET(mAsofBackwardRetrieve(pOperator, pJoin, pCtx, &newBlock));
×
2291
    if (!newBlock) {
×
2292
      if (pCtx->groupJoin && pCtx->finBlk->info.rows <= 0 && !mJoinIsDone(pOperator)) {
×
2293
        continue;
×
2294
      }
2295
      
2296
      break;
×
2297
    }
2298

2299
    MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
×
2300
    MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
×
2301
    
2302
    if (probeTs == pCtx->lastTs) {
×
2303
      MJ_ERR_JRET(mAsofBackwardProcessEqualGrp(pCtx, probeTs, true));
×
2304
      if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
2305
        return pCtx->finBlk;
×
2306
      }
2307

2308
      if (MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) {
×
2309
        continue;
×
2310
      } else {
2311
        MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
×
2312
      }
2313
    }
2314

2315
    if (pCtx->lastEqGrp && !pCtx->eqPostDone) {
×
2316
      pCtx->eqPostDone = true;
×
2317
      MJ_ERR_JRET(mAsofBackwardAddEqRowsToCache(pJoin->pOperator, pCtx, pJoin->build, pCtx->lastTs));
×
2318
      MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
×
2319
    }
2320

2321
    while (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
×
2322
      if (probeTs == buildTs) {
×
2323
        pCtx->lastTs = probeTs;
×
2324
        MJ_ERR_JRET(mAsofBackwardProcessEqualGrp(pCtx, probeTs, false));
×
2325
        if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
2326
          return pCtx->finBlk;
×
2327
        }
2328

2329
        MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
×
2330
        MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
×
2331
        continue;
×
2332
      }
2333

2334
      if (PROBE_TS_NMATCH(pCtx->ascTs, probeTs, buildTs)) {
×
2335
        MJ_ERR_JRET(mAsofBackwardHandleClosedGrp(pCtx, pJoin, pProbeCol, &probeTs, &buildTs));
×
2336
      } else {
2337
        MJ_ERR_JRET(mAsofBackwardHandleUnclosedGrp(pCtx, pJoin, pBuildCol, &probeTs, &buildTs));
×
2338
      }
2339

2340
      if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
2341
        return pCtx->finBlk;
×
2342
      }
2343
    }
2344

2345
    if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && MJOIN_TB_GRP_ROWS_DONE(pJoin->build, pCtx->groupJoin)) {
×
2346
      pCtx->probeGrp.beginIdx = pJoin->probe->blkRowIdx;
×
2347
      pCtx->probeGrp.readIdx = pCtx->probeGrp.beginIdx;
×
2348
      pCtx->probeGrp.endIdx = pJoin->probe->blk->info.rows - 1;
×
2349
      
2350
      MJ_ERR_JRET(mAsofBackwardDumpGrpCache(pCtx));
×
2351
      pCtx->lastEqGrp = false;
×
2352
      
2353
      pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows;
×
2354
            
2355
      if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
2356
        return pCtx->finBlk;
×
2357
      }
2358
    }
2359
  } while (true);
2360

2361
_return:
×
2362

2363
  if (code) {
×
2364
    pJoin->errCode = code;
×
2365
    return NULL;
×
2366
  }
2367

2368
  return pCtx->finBlk;
×
2369
}
2370

2371
// If the first cached group lives in cache.outBlk and the build cursor has advanced
// inside it, drops the rows before the cursor from that block, resets the cursor to 0
// and saves the build table position into the cache (MJOIN_SAVE_TB_BLK).
// Does nothing when the cache holds no groups.
int32_t mAsofForwardTrimCacheBlk(SMJoinWindowCtx* pCtx) {
  if (taosArrayGetSize(pCtx->cache.grps) <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  SMJoinGrpRows* pFirst = taosArrayGet(pCtx->cache.grps, 0);
  if (NULL == pFirst) {
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  if (pFirst->blk != pCtx->cache.outBlk || pCtx->pJoin->build->blkRowIdx <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  MJ_ERR_RET(blockDataTrimFirstRows(pFirst->blk, pCtx->pJoin->build->blkRowIdx));
  pCtx->pJoin->build->blkRowIdx = 0;
  MJOIN_SAVE_TB_BLK(&pCtx->cache, pCtx->pJoin->build);

  return TSDB_CODE_SUCCESS;
}
2390

2391
// Tops up the build-row cache until it holds at least jLimit rows or the build source
// runs dry. Nothing is done when the cache already has jLimit rows or the build side
// has finished fetching.
//
// Steps:
//   1. trim already consumed rows from the cache block (mAsofForwardTrimCacheBlk);
//   2. if the last cached group still refers to a block other than cache.outBlk, copy
//      its rows into outBlk; with a single group that group is re-pointed at outBlk,
//      otherwise the last group is popped;
//   3. fetch further build blocks: a block that reaches jLimit is pushed to the cache
//      as its own group, smaller blocks are merged into outBlk;
//   4. restore the build table position from the cache (MJOIN_RESTORE_TB_BLK).
int32_t mAsofForwardChkFillGrpCache(SMJoinWindowCtx* pCtx) {
  if (pCtx->cache.rowNum >= pCtx->jLimit || pCtx->pJoin->build->dsFetchDone) {
    return TSDB_CODE_SUCCESS;
  }

  MJ_ERR_RET(mAsofForwardTrimCacheBlk(pCtx));

  SMJoinTableCtx* build = pCtx->pJoin->build;
  SMJoinWinCache* pCache = &pCtx->cache;
  int32_t         grpNum = taosArrayGetSize(pCache->grps);

  if (grpNum >= 1) {
    SMJoinGrpRows* pLast = taosArrayGet(pCache->grps, grpNum - 1);
    if (NULL == pLast) {
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }

    if (pLast->blk != pCache->outBlk) {
      bool    soleGrp = (1 == grpNum);
      // A sole group starts at the build cursor; a trailing second group is copied whole.
      int32_t copyFrom = soleGrp ? build->blkRowIdx : 0;
      MJ_ERR_RET(blockDataMergeNRows(pCache->outBlk, pLast->blk, copyFrom, pLast->blk->info.rows - copyFrom));

      if (soleGrp) {
        pLast->blk = pCache->outBlk;
        pLast->beginIdx = 0;
        pLast->readIdx = 0;
      } else {
        if (NULL == taosArrayPop(pCache->grps)) {
          MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
        }

        // The remaining first group must still be retrievable.
        if (NULL == taosArrayGet(pCache->grps, 0)) {
          MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
        }
      }
    }
  }

  do {
    build->blk = (*pCtx->pJoin->retrieveFp)(pCtx->pJoin, build);
    qDebug("%s merge join %s table got block to fill grp, rows:%" PRId64, GET_TASKID(pCtx->pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(build->type), build->blk ? build->blk->info.rows : 0);

    build->blkRowIdx = 0;

    if (NULL == build->blk) {
      break;
    }

    if ((pCache->rowNum + build->blk->info.rows) >= pCtx->jLimit) {
      // This block completes the limit: keep it as a separate cached group.
      MJOIN_PUSH_BLK_TO_CACHE(pCache, build->blk);
      break;
    }

    MJ_ERR_RET(blockDataMergeNRows(pCache->outBlk, build->blk, 0, build->blk->info.rows));
    pCache->rowNum += build->blk->info.rows;
  } while (pCache->rowNum < pCtx->jLimit);

  MJOIN_RESTORE_TB_BLK(pCache, build);

  return TSDB_CODE_SUCCESS;
}
2462

2463
// Sets endIdx of the cached build groups so that together they expose at most jLimit
// rows. With one group its end is capped at jLimit rows from beginIdx. With more
// groups the first one runs to the end of its block and the second one receives the
// remaining quota.
// NOTE(review): the second group's cap uses blk->info.rows rather than
// rows - beginIdx; that is only equivalent when its beginIdx is 0 - confirm.
int32_t mAsofForwardUpdateBuildGrpEndIdx(SMJoinWindowCtx* pCtx) {
  int32_t grpNum = taosArrayGetSize(pCtx->cache.grps);
  if (grpNum <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  SMJoinGrpRows* pFirst = taosArrayGet(pCtx->cache.grps, 0);
  if (NULL == pFirst) {
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  if (1 == grpNum) {
    pFirst->endIdx = pFirst->beginIdx + TMIN(pFirst->blk->info.rows - pFirst->beginIdx, pCtx->jLimit) - 1;
    return TSDB_CODE_SUCCESS;
  }

  pFirst->endIdx = pFirst->blk->info.rows - 1;

  int64_t remainRows = pCtx->jLimit - (pFirst->endIdx - pFirst->beginIdx + 1);

  SMJoinGrpRows* pSecond = taosArrayGet(pCtx->cache.grps, 1);
  if (NULL == pSecond) {
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  pSecond->endIdx = pSecond->beginIdx + TMIN(pSecond->blk->info.rows, remainRows) - 1;

  return TSDB_CODE_SUCCESS;
}
2493

2494
// Refills the build cache (skipped when lastBuildGrp is set), recomputes the end
// indexes of the cached groups and dumps the cache via mWinJoinDumpGrpCache().
int32_t mAsofForwardFillDumpGrpCache(SMJoinWindowCtx* pCtx, bool lastBuildGrp) {
  if (!lastBuildGrp) {
    // Remember the build table position before the cache is topped up.
    MJOIN_SAVE_TB_BLK(&pCtx->cache, pCtx->pJoin->build);

    int32_t fillCode = mAsofForwardChkFillGrpCache(pCtx);
    MJ_ERR_RET(fillCode);
  }

  int32_t updCode = mAsofForwardUpdateBuildGrpEndIdx(pCtx);
  MJ_ERR_RET(updCode);

  return mWinJoinDumpGrpCache(pCtx);
}
2504

2505
// Skips the rows of pTable's current block whose primary timestamp equals `timestamp`,
// starting at pTable->blkRowIdx, and decrements cache.rowNum once per skipped row.
//   wholeBlk - [out] true when the equal run reaches the end of the block, false when
//              a row with a different timestamp was found (the cursor then points at it)
// Note: when the run reaches the end of the block, the cursor is only advanced past
// the first equal row; the caller is expected to discard the block.
int32_t mAsofForwardSkipEqRows(SMJoinWindowCtx* pCtx, SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk) {
  SColumnInfoData* pTsCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCtx.targetSlotId);
  if (NULL == pTsCol) {
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  *wholeBlk = false;

  if (*(int64_t*)colDataGetNumData(pTsCol, pTable->blkRowIdx) != timestamp) {
    return TSDB_CODE_SUCCESS;
  }

  pTable->blkRowIdx++;
  pCtx->cache.rowNum--;

  char* pLastVal = colDataGetNumData(pTsCol, pTable->blk->info.rows - 1);
  if (timestamp == *(int64_t*)pLastVal) {
    // The last row matches too: account for all remaining rows in one step.
    pCtx->cache.rowNum -= (pTable->blk->info.rows - pTable->blkRowIdx);
  } else {
    while (pTable->blkRowIdx < pTable->blk->info.rows) {
      char* pCurVal = colDataGetNumData(pTsCol, pTable->blkRowIdx);
      if (timestamp != *(int64_t*)pCurVal) {
        return TSDB_CODE_SUCCESS;
      }

      pCtx->cache.rowNum--;
      ++pTable->blkRowIdx;
    }
  }

  *wholeBlk = true;

  return TSDB_CODE_SUCCESS;
}
2539

2540
/*
 * Skips every build-table row whose timestamp equals `timestamp`, walking
 * through the cached blocks first and then fetching further blocks through
 * pJoin->retrieveFp until a different timestamp shows up, the data source is
 * flagged done (dsFetchDone), or no more block is returned.
 *
 * Returns TSDB_CODE_SUCCESS unless a helper reports an error via MJ_ERR_RET.
 */
int32_t mAsofForwardSkipAllEqRows(SMJoinWindowCtx* pCtx, int64_t timestamp) {
  SMJoinTableCtx* pBuild = pCtx->pJoin->build;
  SMJoinWinCache* pCache = &pCtx->cache;
  bool blkExhausted = false;

  for (;;) {
    // Drain the blocks currently held by the build table / cache.
    do {
      MJ_ERR_RET(mAsofForwardSkipEqRows(pCtx, pBuild, timestamp, &blkExhausted));
      if (!blkExhausted) {
        return TSDB_CODE_SUCCESS;
      }

      // NOTE(review): both macros are defined elsewhere; presumably they drop
      // the consumed block from the cache and re-point the table at the next
      // cached block -- confirm.
      MJOIN_POP_TB_BLK(pCache);
      MJOIN_RESTORE_TB_BLK(pCache, pBuild);
    } while (!MJOIN_BUILD_TB_ROWS_DONE(pBuild));

    //A S S E R T(pCtx->cache.rowNum == 0);
    //A S S E R T(taosArrayGetSize(pCtx->cache.grps) == 0);

    if (pBuild->dsFetchDone) {
      return TSDB_CODE_SUCCESS;
    }

    pBuild->blk = (*pCtx->pJoin->retrieveFp)(pCtx->pJoin, pBuild);
    qDebug("%s merge join %s table got block to skip eq ts, rows:%" PRId64, GET_TASKID(pCtx->pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(pBuild->type), pBuild->blk ? pBuild->blk->info.rows : 0);

    pBuild->blkRowIdx = 0;

    if (NULL == pBuild->blk) {
      return TSDB_CODE_SUCCESS;
    }

    MJOIN_PUSH_BLK_TO_CACHE(pCache, pBuild->blk);
  }

  return TSDB_CODE_SUCCESS;
}
2577

2578

2579
/*
 * Skips all build rows equal to `timestamp` when neither pCtx->eqRowsAcq nor
 * lastBuildGrp is set, then fills and dumps the group cache.
 *
 * Returns the result of mAsofForwardFillDumpGrpCache(); a failure of the skip
 * step is routed through MJ_ERR_RET.
 */
int32_t mAsofForwardUpdateDumpEqRows(SMJoinWindowCtx* pCtx, int64_t timestamp, bool lastBuildGrp) {
  const bool keepEqRows = pCtx->eqRowsAcq || lastBuildGrp;

  if (!keepEqRows) {
    MJ_ERR_RET(mAsofForwardSkipAllEqRows(pCtx, timestamp));
  }

  return mAsofForwardFillDumpGrpCache(pCtx, lastBuildGrp);
}
2586

2587

2588
/*
 * Handles a probe group whose timestamp equals `timestamp`: marks the context
 * as being on an equal group, builds the probe equal-group into
 * pCtx->probeGrp and then updates/dumps the matching build rows.
 *
 * Returns the result of mAsofForwardUpdateDumpEqRows(); a failure of
 * mJoinBuildEqGrp() is routed through MJ_ERR_RET.
 */
int32_t mAsofForwardProcessEqualGrp(SMJoinWindowCtx* pCtx, int64_t timestamp, bool lastBuildGrp) {
  pCtx->lastEqGrp = true;

  MJ_ERR_RET(mJoinBuildEqGrp(pCtx->pJoin->probe, timestamp, NULL, &pCtx->probeGrp));

  return mAsofForwardUpdateDumpEqRows(pCtx, timestamp, lastBuildGrp);
}
2597

2598
/*
 * Builds a probe group starting at the probe table's current row and extends
 * it over the following rows for as long as PROBE_TS_NMATCH() holds against
 * *buildTs, then fills and dumps the group cache.
 *
 * pCol     primary timestamp column of the probe block.
 * probeTs  (in/out) updated with the timestamp of each probe row visited.
 * buildTs  (in) current build timestamp used for the comparison.
 *
 * On return pJoin->probe->blkRowIdx points at the first row that is not part
 * of the group (or equals the block's row count).
 */
int32_t mAsofForwardHandleProbeGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, SColumnInfoData* pCol,  int64_t* probeTs, int64_t* buildTs) {
  SMJoinTableCtx* pProbe = pJoin->probe;

  pCtx->lastEqGrp = false;

  // The group initially covers only the current probe row.
  pCtx->probeGrp.beginIdx = pProbe->blkRowIdx;
  pCtx->probeGrp.readIdx = pCtx->probeGrp.beginIdx;
  pCtx->probeGrp.endIdx = pCtx->probeGrp.beginIdx;

  for (++pProbe->blkRowIdx; pProbe->blkRowIdx < pProbe->blk->info.rows; ++pProbe->blkRowIdx) {
    MJOIN_GET_TB_CUR_TS(pCol, *probeTs, pProbe);
    if (PROBE_TS_NMATCH(pCtx->ascTs, *probeTs, *buildTs)) {
      pCtx->probeGrp.endIdx = pProbe->blkRowIdx;
      continue;
    }

    break;
  }

  return mAsofForwardFillDumpGrpCache(pCtx, false);
}
2617

2618
/*
 * Advances the build table past rows for which PROBE_TS_NREACH() holds
 * against *probeTs, decrementing pCtx->cache.rowNum for each skipped row and
 * moving on to the next cached block whenever the current one is exhausted.
 *
 * pCol     (in/out) build primary timestamp column; refreshed through
 *          MJOIN_GET_TB_COL_TS after switching blocks.
 * buildTs  (in/out) updated with the timestamp of each build row visited.
 *
 * Always returns TSDB_CODE_SUCCESS from the visible code paths.
 */
int32_t mAsofForwardSkipBuildGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, SColumnInfoData** pCol,  int64_t* probeTs, int64_t* buildTs) {
  SMJoinWinCache* pCache = &pCtx->cache;
  SMJoinTableCtx* pBuild = pJoin->build;

  do {
    MJOIN_GET_TB_CUR_TS(*pCol, *buildTs, pBuild);
    if (!PROBE_TS_NREACH(pCtx->ascTs, *probeTs, *buildTs)) {
      break;
    }

    // The current row is skipped; keep scanning the rest of this block.
    pCache->rowNum--;
    for (++pBuild->blkRowIdx; pBuild->blkRowIdx < pBuild->blk->info.rows; ++pBuild->blkRowIdx) {
      MJOIN_GET_TB_CUR_TS(*pCol, *buildTs, pBuild);
      if (PROBE_TS_NREACH(pCtx->ascTs, *probeTs, *buildTs)) {
        pCache->rowNum--;
        continue;
      }

      return TSDB_CODE_SUCCESS;
    }

    // Block exhausted: switch to the next cached block (macros defined elsewhere).
    MJOIN_POP_TB_BLK(pCache);
    MJOIN_RESTORE_TB_BLK(pCache, pBuild);
    MJOIN_GET_TB_COL_TS(*pCol, *buildTs, pBuild);
  } while (!MJOIN_BUILD_TB_ROWS_DONE(pBuild));

  return TSDB_CODE_SUCCESS;
}
2643

2644
/*
 * Fetches the next probe block and, while the cache holds fewer than
 * pCtx->jLimit rows, the next build block. Build blocks for which
 * MJOIN_BUILD_BLK_OOR() holds are dropped and the fetch is retried.
 *
 * newBlock (out)  set to true only when a probe block is available; in that
 *                 case pCtx->probeGrp.blk is pointed at it.
 *
 * When no probe block is available the operator is marked done unless this is
 * a group join with rows remaining in the probe block (remainInBlk != NULL).
 *
 * Returns TSDB_CODE_SUCCESS, or an error routed through MJ_ERR_RET.
 */
static int32_t mAsofForwardRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx, bool* newBlock) {
  bool buildGot = false;

  *newBlock = false;

  bool probeGot = mJoinRetrieveBlk(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);

  for (;;) {
    if ((probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) && pCtx->cache.rowNum < pCtx->jLimit) {
      pJoin->build->newBlk = false;
      MJOIN_SAVE_TB_BLK(&pCtx->cache, pCtx->pJoin->build);
      //A S S E R T(taosArrayGetSize(pCtx->cache.grps) <= 1);
      buildGot = mJoinRetrieveBlk(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
    }

    if (!probeGot) {
      if (!pCtx->groupJoin || NULL == pJoin->probe->remainInBlk) {
        mJoinSetDone(pOperator);
      }

      return TSDB_CODE_SUCCESS;
    }

    if (!buildGot) {
      break;
    }

    SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCtx.targetSlotId);
    SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCtx.targetSlotId);
    if (NULL == pProbeCol || NULL == pBuildCol) {
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }

    if (MJOIN_BUILD_BLK_OOR(pCtx->ascTs, pProbeCol->pData, pJoin->probe->blkRowIdx, pBuildCol->pData, pJoin->build->blk->info.rows)) {
      // Whole build block is out of range: mark it consumed and fetch another.
      pJoin->build->blkRowIdx = pJoin->build->blk->info.rows;
      MJOIN_POP_TB_BLK(&pCtx->cache);
      buildGot = false;
      continue;
    }

    break;
  }

  if (buildGot && pJoin->build->newBlk) {
    // Lazily create the cache output block from the first new build block.
    if (NULL == pCtx->cache.outBlk) {
      int32_t code = createOneDataBlock(pJoin->build->blk, false, &pCtx->cache.outBlk);
      if (code) {
        MJ_ERR_RET(code);
      }

      MJ_ERR_RET(blockDataEnsureCapacity(pCtx->cache.outBlk, pCtx->jLimit));
    }

    MJOIN_PUSH_BLK_TO_CACHE(&pCtx->cache, pJoin->build->blk);
    MJOIN_RESTORE_TB_BLK(&pCtx->cache, pJoin->build);
  }

  pCtx->probeGrp.blk = pJoin->probe->blk;
  *newBlock = true;

  return TSDB_CODE_SUCCESS;
}
2704

2705

2706
SSDataBlock* mAsofForwardJoinDo(struct SOperatorInfo* pOperator) {
×
2707
  SMJoinOperatorInfo* pJoin = pOperator->info;
×
2708
  SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx;
×
2709
  int32_t code = TSDB_CODE_SUCCESS;
×
2710
  int64_t probeTs = 0;
×
2711
  int64_t buildTs = 0;
×
2712
  SColumnInfoData* pBuildCol = NULL;
×
2713
  SColumnInfoData* pProbeCol = NULL;
×
2714
  bool newBlock = false;
×
2715

2716
  blockDataCleanup(pCtx->finBlk);
×
2717

2718
  if (pCtx->grpRemains) {
×
2719
    MJ_ERR_JRET(mWinJoinDumpGrpCache(pCtx));
×
2720
    if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
2721
      return pCtx->finBlk;
×
2722
    }
2723
    pCtx->grpRemains = false;
×
2724
  }
2725

2726
  do {
2727
    MJ_ERR_JRET(mAsofForwardRetrieve(pOperator, pJoin, pCtx, &newBlock));
×
2728
    if (!newBlock) {
×
2729
      if (pCtx->groupJoin && pCtx->finBlk->info.rows <= 0 && !mJoinIsDone(pOperator)) {
×
2730
        continue;
×
2731
      }
2732

2733
      break;
×
2734
    }
2735

2736
    MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
×
2737
    MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
×
2738
    
2739
    if (probeTs == pCtx->lastTs) {
×
2740
      MJ_ERR_JRET(mAsofForwardProcessEqualGrp(pCtx, probeTs, true));
×
2741
      if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
2742
        return pCtx->finBlk;
×
2743
      }
2744

2745
      if (MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) {
×
2746
        continue;
×
2747
      } else {
2748
        MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
×
2749
      }
2750
    }
2751

2752
    while (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
×
2753
      if (probeTs == buildTs) {
×
2754
        pCtx->lastTs = probeTs;
×
2755
        MJ_ERR_JRET(mAsofForwardProcessEqualGrp(pCtx, probeTs, false));
×
2756
        if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
2757
          return pCtx->finBlk;
×
2758
        }
2759

2760
        MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
×
2761
        MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
×
2762
        continue;
×
2763
      }
2764

2765
      if (PROBE_TS_NMATCH(pCtx->ascTs, probeTs, buildTs)) {
×
2766
        MJ_ERR_JRET(mAsofForwardHandleProbeGrp(pCtx, pJoin, pProbeCol, &probeTs, &buildTs));
×
2767
        MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);        
×
2768
      } else {
2769
        MJ_ERR_JRET(mAsofForwardSkipBuildGrp(pCtx, pJoin, &pBuildCol, &probeTs, &buildTs));
×
2770
      }
2771

2772
      if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
2773
        return pCtx->finBlk;
×
2774
      }
2775
    }
2776

2777
    if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && MJOIN_TB_GRP_ROWS_DONE(pJoin->build, pCtx->groupJoin)) {
×
2778
      pCtx->probeGrp.beginIdx = pJoin->probe->blkRowIdx;
×
2779
      pCtx->probeGrp.readIdx = pCtx->probeGrp.beginIdx;
×
2780
      pCtx->probeGrp.endIdx = pJoin->probe->blk->info.rows - 1;
×
2781
      
2782
      MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeGrp, true, false));
×
2783
      
2784
      pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows;
×
2785
            
2786
      if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
2787
        return pCtx->finBlk;
×
2788
      }
2789
    }
2790
  } while (true);
2791

2792
_return:
×
2793

2794
  if (code) {
×
2795
    pJoin->errCode = code;
×
2796
    return NULL;
×
2797
  }
2798

2799
  return pCtx->finBlk;
×
2800
}
2801

2802
/*
 * Resets the per-group state of the window context: clears the group flags,
 * rewinds lastTs to INT64_MIN, resets the window cache and then the probe and
 * build table contexts.
 */
void mAsofJoinGroupReset(SMJoinOperatorInfo* pJoin) {
  SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx;

  pCtx->lastTs = INT64_MIN;
  pCtx->eqPostDone = false;
  pCtx->lastProbeGrp = false;
  pCtx->lastEqGrp = false;

  mWinJoinResetWindowCache(pCtx, &pCtx->cache);

  mJoinResetGroupTableCtx(pJoin->probe);
  mJoinResetGroupTableCtx(pJoin->build);
}
×
2816

2817
/*
 * Removes the front entry of pCtx->cache.grps.
 *
 * pGrp supplies the bookkeeping for the removed entry: the rows from
 * pGrp->beginIdx to the end of its block are subtracted from cache.rowNum,
 * and its block is cleaned (when it is the shared cache.outBlk) or destroyed
 * (when it is a clone owned by the group).
 *
 * NOTE(review): the array pop always targets index 0, so callers are expected
 * to pass the front group -- confirm against callers.
 */
static FORCE_INLINE void mWinJoinPopFrontGroup(SMJoinWindowCtx* pCtx, SMJoinGrpRows* pGrp) {
  SMJoinWinCache* pCache = &pCtx->cache;
  SSDataBlock* pBlk = pGrp->blk;

  pCache->rowNum -= (pBlk->info.rows - pGrp->beginIdx);

  if (pBlk != pCache->outBlk) {
    if (pGrp->clonedBlk) {
      (void)blockDataDestroy(pBlk);
    }
  } else {
    blockDataCleanup(pBlk);
  }

  taosArrayPopFrontBatch(pCache->grps, 1);
}
×
2827

2828
/*
 * Makes the last cached group own a private copy of its data block.
 *
 * The group array is taken from cache.grps, or from cache.grpsQueue when
 * cache.grps is NULL. Nothing is done when the array is empty or when the
 * last group is already flagged clonedBlk.
 *
 * When the group starts at row 0 the whole block is copied; otherwise only
 * the rows from beginIdx to the end are extracted and the group's
 * beginIdx/readIdx/endIdx are rebased so that they index the extracted block.
 *
 * The copy is produced into a temporary and the group is updated only after
 * the copy succeeded, so a failed clone leaves the group (block pointer and
 * indices) exactly as it was.
 *
 * Returns TSDB_CODE_SUCCESS, or the failure code routed through MJ_ERR_RET.
 */
static int32_t mWinJoinCloneCacheBlk(SMJoinWindowCtx* pCtx) {
  SMJoinWinCache* pCache = &pCtx->cache;
  SArray* pGrpArray = (NULL != pCache->grps) ? pCache->grps : pCache->grpsQueue;
  int32_t grpNum = taosArrayGetSize(pGrpArray);
  if (grpNum <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  SMJoinGrpRows* pGrp = (SMJoinGrpRows*)taosArrayGetLast(pGrpArray);
  if (NULL == pGrp) {
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  if (pGrp->clonedBlk) {
    return TSDB_CODE_SUCCESS;
  }

  SSDataBlock* pCloned = NULL;
  int32_t code = 0;
  bool partial = (0 != pGrp->beginIdx);

  if (partial) {
    code = blockDataExtractBlock(pGrp->blk, pGrp->beginIdx, pGrp->blk->info.rows - pGrp->beginIdx, &pCloned);
  } else {
    code = createOneDataBlock(pGrp->blk, true, &pCloned);
  }

  if (code) {
    MJ_ERR_RET(code);
  }

  // Commit only after a successful copy.
  if (partial) {
    pGrp->endIdx -= pGrp->beginIdx;
    pGrp->beginIdx = 0;
    pGrp->readIdx = 0;
  }

  pGrp->blk = pCloned;
  pGrp->clonedBlk = true;

  return TSDB_CODE_SUCCESS;
}
2865

2866
/*
 * Fetches the next probe block and, when a probe block was obtained or the
 * build data source still needs initialisation, the next build block. Before
 * the build fetch, the last cached group's block is cloned if the build table
 * currently holds no block.
 *
 * With pCtx->forwardRowsAcq set, build blocks for which MJOIN_BUILD_BLK_OOR()
 * holds are marked consumed and the fetch is retried.
 *
 * newBlock (out)  true only when a probe block is available; in that case
 *                 pCtx->probeGrp.blk is pointed at it.
 *
 * Returns TSDB_CODE_SUCCESS, or an error routed through MJ_ERR_RET.
 */
static int32_t mWinJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx, bool* newBlock) {
  bool buildGot = false;

  *newBlock = false;

  bool probeGot = mJoinRetrieveBlk(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);

  for (;;) {
    if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) {
      if (NULL == pJoin->build->blk) {
        MJ_ERR_RET(mWinJoinCloneCacheBlk(pCtx));
      }

      buildGot = mJoinRetrieveBlk(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
    }

    if (!probeGot) {
      // No probe data: finish unless a group join still has rows pending.
      if (!pCtx->groupJoin || NULL == pJoin->probe->remainInBlk) {
        mJoinSetDone(pOperator);
      }

      return TSDB_CODE_SUCCESS;
    }

    if (!(buildGot && pCtx->forwardRowsAcq)) {
      break;
    }

    SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCtx.targetSlotId);
    SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCtx.targetSlotId);
    if (NULL == pProbeCol || NULL == pBuildCol) {
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }

    if (MJOIN_BUILD_BLK_OOR(pCtx->ascTs, pProbeCol->pData, pJoin->probe->blkRowIdx, pBuildCol->pData, pJoin->build->blk->info.rows)) {
      pJoin->build->blkRowIdx = pJoin->build->blk->info.rows;
      buildGot = false;
      continue;
    }

    break;
  }

  pCtx->probeGrp.blk = pJoin->probe->blk;
  *newBlock = true;

  return TSDB_CODE_SUCCESS;
}
2911

2912
/*
 * Tries to locate the first in-window row inside build->blk and, when found,
 * pushes a one-row group for it onto pCache->grps.
 *
 * The scan direction follows pCtx->ascTs: for ascending input a row is
 * "before" the window when ts < winBeginTs and "inside" when ts <= winEndTs;
 * for descending input the comparisons are mirrored (ts > winEndTs /
 * ts >= winBeginTs).
 *
 * Outcomes:
 *  - last row of the block is before the window: *winEnd = false,
 *    build->blk = NULL, rowNum = 0 (caller is expected to fetch more);
 *  - first row of the block is already past the window: *winEnd = true,
 *    rowNum = 0, block kept;
 *  - first non-"before" row is inside the window: group pushed,
 *    build->blk = NULL, rowNum = 1, *winEnd = true;
 *  - first non-"before" row is past the window: rowNum = 0, *winEnd = true.
 *
 * Returns TSDB_CODE_SUCCESS for the outcomes above, terrno (via MJ_ERR_RET)
 * when the push fails, and TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the
 * column is missing or the scan runs off the block.
 */
int32_t mWinJoinTryAddWinBeginBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, SMJoinTableCtx* build, bool* winEnd) {
  SSDataBlock* pBlk = build->blk;
  SColumnInfoData* pCol = taosArrayGet(pBlk->pDataBlock, build->primCtx.targetSlotId);
  if (NULL == pCol) {
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  int64_t* pTs = (int64_t*)pCol->pData;
  bool asc = pCtx->ascTs ? true : false;

  // Whole block lies before the window.
  int64_t lastTs = pTs[pBlk->info.rows - 1];
  if (asc ? (lastTs < pCtx->winBeginTs) : (lastTs > pCtx->winEndTs)) {
    *winEnd = false;
    build->blk = NULL;
    pCache->rowNum = 0;
    return TSDB_CODE_SUCCESS;
  }

  // Whole block lies past the window.
  int64_t firstTs = pTs[0];
  if (asc ? (firstTs > pCtx->winEndTs) : (firstTs < pCtx->winBeginTs)) {
    *winEnd = true;
    pCache->rowNum = 0;
    return TSDB_CODE_SUCCESS;
  }

  for (; build->blkRowIdx < pBlk->info.rows; build->blkRowIdx++) {
    int64_t ts = pTs[build->blkRowIdx];

    if (asc ? (ts < pCtx->winBeginTs) : (ts > pCtx->winEndTs)) {
      continue;
    }

    if (asc ? (ts <= pCtx->winEndTs) : (ts >= pCtx->winBeginTs)) {
      SMJoinGrpRows newGrp = {.blk = pBlk, .beginIdx = build->blkRowIdx};
      SMJoinGrpRows* pGrp = taosArrayPush(pCache->grps, &newGrp);
      if (NULL == pGrp) {
        MJ_ERR_RET(terrno);
      }

      pGrp->readIdx = pGrp->beginIdx;
      pGrp->endIdx = pGrp->beginIdx;

      build->blk = NULL;
      pCache->rowNum = 1;
    } else {
      pCache->rowNum = 0;
    }

    *winEnd = true;
    return TSDB_CODE_SUCCESS;
  }

  return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
3003

3004

3005

3006
/*
 * Finds the build block that contains the window begin: first tries the block
 * the build table currently holds, then keeps retrieving blocks through
 * pJoin->retrieveFp until mWinJoinTryAddWinBeginBlk() reports the window end,
 * a group was added to the cache, or no more block is returned.
 *
 * The dsFetchDone flag is checked once, before the retrieval loop.
 *
 * Returns TSDB_CODE_SUCCESS, or an error routed through MJ_ERR_RET.
 */
int32_t mWinJoinAddWinBeginBlk(SMJoinWindowCtx* pCtx) {
  SMJoinOperatorInfo* pJoin = pCtx->pJoin;
  SMJoinTableCtx* build = pJoin->build;
  SMJoinWinCache* pCache = &pCtx->cache;
  bool winEnd = false;

  if (NULL != build->blk) {
    MJ_ERR_RET(mWinJoinTryAddWinBeginBlk(pCtx, pCache, build, &winEnd));
    if (winEnd || taosArrayGetSize(pCache->grps) > 0) {
      return TSDB_CODE_SUCCESS;
    }
  }

  if (build->dsFetchDone) {
    return TSDB_CODE_SUCCESS;
  }

  for (;;) {
    build->blk = (*pJoin->retrieveFp)(pJoin, pJoin->build);
    qDebug("%s merge join %s table got block to start win, rows:%" PRId64, GET_TASKID(pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(build->type), build->blk ? build->blk->info.rows : 0);

    build->blkRowIdx = 0;

    if (NULL == build->blk) {
      break;
    }

    MJ_ERR_RET(mWinJoinTryAddWinBeginBlk(pCtx, pCache, build, &winEnd));
    if (winEnd || taosArrayGetSize(pCache->grps) > 0) {
      break;
    }
  }

  return TSDB_CODE_SUCCESS;
}
3041

3042

3043
/*
 * Ascending-order variant: moves the begin of the cached window forward to
 * pCtx->winBeginTs.
 *
 * Cached groups whose last row is older than winBeginTs are popped. In the
 * first remaining group, beginIdx is advanced past rows older than
 * winBeginTs; then
 *  - if that row is inside the window (ts <= winEndTs) the group's
 *    readIdx/endIdx and cache.rowNum are adjusted and the function returns;
 *  - otherwise the row is past the window: rowNum is set to 0 and the group
 *    array is swapped into grpsQueue (TSWAP) before returning.
 * When the array is exhausted and grpsQueue is set, grpsQueue is moved back
 * into grps (rowNum = 1) and the scan restarts. If nothing is left, the begin
 * block is searched through mWinJoinAddWinBeginBlk().
 */
int32_t mWinJoinMoveAscWinBegin(SMJoinWindowCtx* pCtx) {
  SMJoinWinCache* pCache = &pCtx->cache;

  for (;;) {
    int32_t grpNum = taosArrayGetSize(pCache->grps);
    for (int32_t i = 0; i < grpNum; ++i) {
      SMJoinGrpRows* pGrp = taosArrayGet(pCache->grps, i);
      if (NULL == pGrp) {
        MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
      }

      SColumnInfoData* pCol = taosArrayGet(pGrp->blk->pDataBlock, pCtx->pJoin->build->primCtx.targetSlotId);
      if (NULL == pCol) {
        MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
      }

      int64_t* pTs = (int64_t*)pCol->pData;

      // Entire group block precedes the window: drop it and retry slot i.
      if (pTs[pGrp->blk->info.rows - 1] < pCtx->winBeginTs) {
        mWinJoinPopFrontGroup(pCtx, pGrp);
        grpNum--;
        i--;
        continue;
      }

      int32_t startIdx = pGrp->beginIdx;
      while (pGrp->beginIdx < pGrp->blk->info.rows && pTs[pGrp->beginIdx] < pCtx->winBeginTs) {
        pGrp->beginIdx++;
      }

      if (pGrp->beginIdx >= pGrp->blk->info.rows) {
        continue;
      }

      if (pTs[pGrp->beginIdx] > pCtx->winEndTs) {
        // First candidate row is already past the window end.
        pGrp->endIdx = pGrp->beginIdx;
        pCache->rowNum = 0;
        TSWAP(pCache->grps, pCache->grpsQueue);
        return TSDB_CODE_SUCCESS;
      }

      pGrp->readIdx = pGrp->beginIdx;
      if (pGrp->endIdx < pGrp->beginIdx) {
        pGrp->endIdx = pGrp->beginIdx;
        pCache->rowNum = 1;
      } else {
        pCache->rowNum -= (pGrp->beginIdx - startIdx);
      }
      return TSDB_CODE_SUCCESS;
    }

    if (NULL == pCache->grpsQueue) {
      break;
    }

    // Re-activate the queued group array and scan it.
    pCache->grps = pCache->grpsQueue;
    pCache->rowNum = 1;
    pCache->grpsQueue = NULL;
  }

  return mWinJoinAddWinBeginBlk(pCtx);
}
3103

3104
/*
 * Descending-order variant: moves the begin of the cached window to the first
 * row whose timestamp is not newer than pCtx->winEndTs.
 *
 * Cached groups whose last row is newer than winEndTs are popped. In the
 * first remaining group, beginIdx is advanced past rows newer than winEndTs;
 * then
 *  - if that row is inside the window (ts >= winBeginTs) the group's
 *    readIdx/endIdx and cache.rowNum are adjusted and the function returns;
 *  - otherwise the row is past the window: rowNum is set to 0 and the group
 *    array is swapped into grpsQueue (TSWAP) before returning.
 * When the array is exhausted and grpsQueue is set, grpsQueue is moved back
 * into grps (rowNum = 1) and the scan restarts. If nothing is left, the begin
 * block is searched through mWinJoinAddWinBeginBlk().
 */
int32_t mWinJoinMoveDescWinBegin(SMJoinWindowCtx* pCtx) {
  SMJoinWinCache* pCache = &pCtx->cache;

  for (;;) {
    int32_t grpNum = taosArrayGetSize(pCache->grps);
    for (int32_t i = 0; i < grpNum; ++i) {
      SMJoinGrpRows* pGrp = taosArrayGet(pCache->grps, i);
      if (NULL == pGrp) {
        MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
      }

      SColumnInfoData* pCol = taosArrayGet(pGrp->blk->pDataBlock, pCtx->pJoin->build->primCtx.targetSlotId);
      if (NULL == pCol) {
        MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
      }

      int64_t* pTs = (int64_t*)pCol->pData;

      // Entire group block is newer than the window: drop it and retry slot i.
      if (pTs[pGrp->blk->info.rows - 1] > pCtx->winEndTs) {
        mWinJoinPopFrontGroup(pCtx, pGrp);

        grpNum--;
        i--;
        continue;
      }

      int32_t startIdx = pGrp->beginIdx;
      while (pGrp->beginIdx < pGrp->blk->info.rows && pTs[pGrp->beginIdx] > pCtx->winEndTs) {
        pGrp->beginIdx++;
      }

      if (pGrp->beginIdx >= pGrp->blk->info.rows) {
        continue;
      }

      if (pTs[pGrp->beginIdx] < pCtx->winBeginTs) {
        // First candidate row is already older than the window begin.
        pGrp->endIdx = pGrp->beginIdx;
        pCache->rowNum = 0;
        TSWAP(pCache->grps, pCache->grpsQueue);
        return TSDB_CODE_SUCCESS;
      }

      pGrp->readIdx = pGrp->beginIdx;
      if (pGrp->endIdx < pGrp->beginIdx) {
        pGrp->endIdx = pGrp->beginIdx;
        pCache->rowNum = 1;
      } else {
        pCache->rowNum -= (pGrp->beginIdx - startIdx);
      }
      return TSDB_CODE_SUCCESS;
    }

    if (NULL == pCache->grpsQueue) {
      break;
    }

    // Re-activate the queued group array and scan it.
    pCache->grps = pCache->grpsQueue;
    pCache->rowNum = 1;
    pCache->grpsQueue = NULL;
  }

  return mWinJoinAddWinBeginBlk(pCtx);
}
3165

3166
/*
 * Drops groups from the front of pCtx->cache.grps while the cache holds more
 * than pCtx->jLimit rows and removing the front group would still leave at
 * least jLimit rows. Stops as soon as the array is empty (taosArrayGet
 * returns NULL) or removing the front group would go below jLimit.
 */
void mWinJoinRemoveOverflowGrp(SMJoinWindowCtx* pCtx) {
  SMJoinWinCache* pCache = &pCtx->cache;

  if (pCache->rowNum <= pCtx->jLimit) {
    return;
  }

  for (;;) {
    // Always inspect the current front group.
    SMJoinGrpRows* pFront = taosArrayGet(pCache->grps, 0);
    if (NULL == pFront) {
      return;
    }

    if ((pCache->rowNum - (pFront->blk->info.rows - pFront->beginIdx)) < pCtx->jLimit) {
      return;
    }

    mWinJoinPopFrontGroup(pCtx, pFront);
  }
}
3186

3187
/*
 * Tries to extend the cached window with rows from build->blk, starting at
 * build->blkRowIdx, and reports through *winEnd whether the window end was
 * reached.
 *
 * Ascending input (pCtx->ascTs):
 *  - current row already past winEndTs: *winEnd = true, block kept;
 *  - last row before winBeginTs: *winEnd = false, block released;
 *  - last row inside the window: the rest of the block becomes a group; when
 *    that reaches jLimit rows the group is truncated, rowNum is clamped to
 *    jLimit and *winEnd = true, otherwise *winEnd = false;
 *  - otherwise rows are counted up to winEndTs / jLimit, a group ending at
 *    the last counted row is pushed and *winEnd = true.
 *
 * Descending input mirrors the comparisons and, instead of clamping, calls
 * mWinJoinRemoveOverflowGrp() after pushing a group.
 *
 * Except for the "already past the window" early returns, build->blk is set
 * to NULL before returning.
 *
 * Returns TSDB_CODE_SUCCESS, terrno (via MJ_ERR_RET) when a push fails, or an
 * internal error when the primary column is missing.
 */
int32_t mWinJoinTryAddWinEndBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, SMJoinTableCtx* build, bool* winEnd) {
  SSDataBlock* pBlk = build->blk;
  SColumnInfoData* pCol = taosArrayGet(pBlk->pDataBlock, build->primCtx.targetSlotId);
  if (NULL == pCol) {
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  int64_t* pTs = (int64_t*)pCol->pData;
  SMJoinGrpRows newGrp = {.blk = pBlk, .beginIdx = build->blkRowIdx};
  SMJoinGrpRows* pGrp = NULL;

  if (pCtx->ascTs) {
    if (pTs[build->blkRowIdx] > pCtx->winEndTs) {
      *winEnd = true;
      return TSDB_CODE_SUCCESS;
    }

    if (pTs[pBlk->info.rows - 1] < pCtx->winBeginTs) {
      *winEnd = false;
      goto _return;
    }

    if (pTs[pBlk->info.rows - 1] <= pCtx->winEndTs) {
      // The remainder of the block is entirely inside the window.
      pGrp = taosArrayPush(pCache->grps, &newGrp);
      if (NULL == pGrp) {
        MJ_ERR_RET(terrno);
      }

      pGrp->readIdx = pGrp->beginIdx;
      pGrp->endIdx = pBlk->info.rows - 1;

      pCache->rowNum += (pGrp->endIdx - pGrp->beginIdx + 1);
      if (pCache->rowNum >= pCtx->jLimit) {
        // Cut the group so that the cache holds exactly jLimit rows.
        pGrp->endIdx = pBlk->info.rows - 1 + pCtx->jLimit - pCache->rowNum;
        pCache->rowNum = pCtx->jLimit;
        *winEnd = true;
      } else {
        *winEnd = false;
      }

      goto _return;
    }

    while (build->blkRowIdx < pBlk->info.rows && pCache->rowNum < pCtx->jLimit &&
           pTs[build->blkRowIdx] <= pCtx->winEndTs) {
      pCache->rowNum++;
      build->blkRowIdx++;
    }

    pGrp = taosArrayPush(pCache->grps, &newGrp);
    if (NULL == pGrp) {
      MJ_ERR_RET(terrno);
    }

    pGrp->readIdx = pGrp->beginIdx;
    pGrp->endIdx = build->blkRowIdx - 1;

    *winEnd = true;
  } else {
    if (pTs[build->blkRowIdx] < pCtx->winBeginTs) {
      *winEnd = true;
      return TSDB_CODE_SUCCESS;
    }

    if (pTs[pBlk->info.rows - 1] > pCtx->winEndTs) {
      *winEnd = false;
      goto _return;
    }

    if (pTs[pBlk->info.rows - 1] >= pCtx->winBeginTs) {
      // The remainder of the block is entirely inside the window.
      pGrp = taosArrayPush(pCache->grps, &newGrp);
      if (NULL == pGrp) {
        MJ_ERR_RET(terrno);
      }

      pGrp->readIdx = pGrp->beginIdx;
      pGrp->endIdx = pBlk->info.rows - 1;

      pCache->rowNum += (pGrp->endIdx - pGrp->beginIdx + 1);

      mWinJoinRemoveOverflowGrp(pCtx);

      *winEnd = false;
      goto _return;
    }

    while (build->blkRowIdx < pBlk->info.rows && pTs[build->blkRowIdx] >= pCtx->winBeginTs) {
      pCache->rowNum++;
      build->blkRowIdx++;
    }

    pGrp = taosArrayPush(pCache->grps, &newGrp);
    if (NULL == pGrp) {
      MJ_ERR_RET(terrno);
    }

    pGrp->readIdx = pGrp->beginIdx;
    pGrp->endIdx = build->blkRowIdx - 1;

    mWinJoinRemoveOverflowGrp(pCtx);

    *winEnd = true;
  }

_return:

  build->blk = NULL;

  return TSDB_CODE_SUCCESS;
}
3304

3305
/*
 * Extends the cached window towards its end: first with the block the build
 * table currently holds, then with blocks retrieved through
 * pJoin->retrieveFp, until mWinJoinTryAddWinEndBlk() reports the window end
 * or no more block is returned.
 *
 * Before each retrieval the last cached group's block is cloned through
 * mWinJoinCloneCacheBlk(). The dsFetchDone flag is checked once, before the
 * retrieval loop.
 *
 * The debug message names the "end win" phase so that it can be told apart
 * from the identical-looking message emitted while locating the window begin.
 *
 * Returns TSDB_CODE_SUCCESS, or an error routed through MJ_ERR_RET.
 */
int32_t mWinJoinAddWinEndBlk(SMJoinWindowCtx* pCtx) {
  SMJoinTableCtx* build = pCtx->pJoin->build;
  bool winEnd = false;

  if (NULL != build->blk) {
    MJ_ERR_RET(mWinJoinTryAddWinEndBlk(pCtx, &pCtx->cache, build, &winEnd));
    if (winEnd) {
      return TSDB_CODE_SUCCESS;
    }
  }

  if (build->dsFetchDone) {
    return TSDB_CODE_SUCCESS;
  }

  do {
    MJ_ERR_RET(mWinJoinCloneCacheBlk(pCtx));

    build->blk = (*pCtx->pJoin->retrieveFp)(pCtx->pJoin, pCtx->pJoin->build);
    qDebug("%s merge join %s table got block to end win, rows:%" PRId64, GET_TASKID(pCtx->pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(build->type), build->blk ? build->blk->info.rows : 0);

    build->blkRowIdx = 0;

    if (NULL == build->blk) {
      break;
    }

    MJ_ERR_RET(mWinJoinTryAddWinEndBlk(pCtx, &pCtx->cache, build, &winEnd));
    if (winEnd) {
      return TSDB_CODE_SUCCESS;
    }
  } while (true);

  return TSDB_CODE_SUCCESS;
}
3341

3342
/*
 * Ascending-order variant: grows the end of the cached window up to
 * pCtx->winEndTs, never beyond pCtx->jLimit cached rows.
 *
 * Nothing is done when the cache has no group or already holds jLimit rows.
 * Otherwise the last cached group is extended:
 *  - if its block ends inside the window, the whole remainder is taken
 *    (truncated so that rowNum == jLimit when the limit is hit, in which case
 *    the function returns); otherwise the search continues in following
 *    blocks through mWinJoinAddWinEndBlk();
 *  - if its block ends past the window, endIdx is advanced row by row until
 *    the window end or jLimit is reached, and the function returns.
 */
int32_t mWinJoinMoveAscWinEnd(SMJoinWindowCtx* pCtx) {
  SMJoinWinCache* pCache = &pCtx->cache;
  int32_t grpNum = taosArrayGetSize(pCache->grps);
  if (grpNum <= 0 || pCache->rowNum >= pCtx->jLimit) {
    return TSDB_CODE_SUCCESS;
  }

  SMJoinGrpRows* pGrp = taosArrayGetLast(pCache->grps);
  if (NULL == pGrp) {
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  SColumnInfoData* pCol = taosArrayGet(pGrp->blk->pDataBlock, pCtx->pJoin->build->primCtx.targetSlotId);
  if (NULL == pCol) {
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  int64_t* pTs = (int64_t*)pCol->pData;

  if (pTs[pGrp->blk->info.rows - 1] > pCtx->winEndTs) {
    // The window ends inside this block: walk forward row by row.
    while (pCache->rowNum < pCtx->jLimit && ++pGrp->endIdx < pGrp->blk->info.rows) {
      if (pTs[pGrp->endIdx] > pCtx->winEndTs) {
        // Overshot by one row; step back onto the last in-window row.
        pGrp->endIdx--;
        break;
      }

      pCache->rowNum++;
      if ((pGrp->endIdx + 1) >= pGrp->blk->info.rows) {
        break;
      }
    }

    return TSDB_CODE_SUCCESS;
  }

  // The rest of this block is inside the window.
  pCache->rowNum += pGrp->blk->info.rows - pGrp->endIdx - 1;
  if (pCache->rowNum >= pCtx->jLimit) {
    pGrp->endIdx = pGrp->blk->info.rows - 1 + pCtx->jLimit - pCache->rowNum;
    pCache->rowNum = pCtx->jLimit;

    return TSDB_CODE_SUCCESS;
  }

  pGrp->endIdx = pGrp->blk->info.rows - 1;

  return mWinJoinAddWinEndBlk(pCtx);
}
3392

3393
/*
 * Descending-order variant: grows the end of the cached window down to
 * pCtx->winBeginTs. Unlike the ascending variant no jLimit check is applied
 * here.
 *
 * Nothing is done when the cache has no group. Otherwise the last cached
 * group is extended:
 *  - if its block ends inside the window, the whole remainder is taken and
 *    the search continues in following blocks through
 *    mWinJoinAddWinEndBlk();
 *  - if its block ends past the window, endIdx is advanced row by row until
 *    the first row older than winBeginTs, and the function returns.
 */
int32_t mWinJoinMoveDescWinEnd(SMJoinWindowCtx* pCtx) {
  SMJoinWinCache* pCache = &pCtx->cache;
  int32_t grpNum = taosArrayGetSize(pCache->grps);
  if (grpNum <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  SMJoinGrpRows* pGrp = taosArrayGetLast(pCache->grps);
  if (NULL == pGrp) {
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  SColumnInfoData* pCol = taosArrayGet(pGrp->blk->pDataBlock, pCtx->pJoin->build->primCtx.targetSlotId);
  if (NULL == pCol) {
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  int64_t* pTs = (int64_t*)pCol->pData;

  if (pTs[pGrp->blk->info.rows - 1] < pCtx->winBeginTs) {
    // The window ends inside this block: walk forward row by row.
    while (++pGrp->endIdx < pGrp->blk->info.rows) {
      if (pTs[pGrp->endIdx] < pCtx->winBeginTs) {
        // Overshot by one row; step back onto the last in-window row.
        pGrp->endIdx--;
        break;
      }

      pCache->rowNum++;
      if ((pGrp->endIdx + 1) >= pGrp->blk->info.rows) {
        break;
      }
    }

    return TSDB_CODE_SUCCESS;
  }

  // The rest of this block is inside the window.
  pCache->rowNum += pGrp->blk->info.rows - pGrp->endIdx - 1;
  pGrp->endIdx = pGrp->blk->info.rows - 1;

  return mWinJoinAddWinEndBlk(pCtx);
}
3436

3437

3438
/*
 * Repositions the cached window for the current probe timestamp by invoking
 * the configured move-begin callback followed by the move-end callback.
 *
 * Returns TSDB_CODE_SUCCESS; callback failures are routed through MJ_ERR_RET.
 */
int32_t mWinJoinMoveFillWinCache(SMJoinWindowCtx* pCtx) {
  // Begin must be moved before end.
  MJ_ERR_RET(pCtx->moveWinBeginFp(pCtx));
  MJ_ERR_RET(pCtx->moveWinEndFp(pCtx));

  return TSDB_CODE_SUCCESS;
}
3444

3445
/*
 * For descending input, trims the cached window down to pCtx->jLimit rows by
 * discarding rows from the front of the group list (whole groups are popped,
 * the first surviving group has beginIdx/readIdx advanced), then dumps the
 * cache through mWinJoinDumpGrpCache(). Ascending input is dumped untouched.
 *
 * Returns the result of mWinJoinDumpGrpCache(), or an internal error routed
 * through MJ_ERR_RET when a cached group cannot be fetched.
 */
int32_t mWinJoinTrimDumpGrpCache(SMJoinWindowCtx* pCtx) {
  SMJoinWinCache* pCache = &pCtx->cache;

  if (!pCtx->ascTs && pCache->rowNum > pCtx->jLimit) {
    int32_t skipRows = pCache->rowNum - pCtx->jLimit;
    int32_t grpsLeft = taosArrayGetSize(pCache->grps);

    while (grpsLeft > 0 && skipRows > 0) {
      // Popping shifts the list, so the group to examine is always the front.
      SMJoinGrpRows* pFront = taosArrayGet(pCache->grps, 0);
      if (NULL == pFront) {
        MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
      }

      if (skipRows < GRP_REMAIN_ROWS(pFront)) {
        pFront->beginIdx += skipRows;
        pFront->readIdx = pFront->beginIdx;
        break;
      }

      skipRows -= GRP_REMAIN_ROWS(pFront);
      mWinJoinPopFrontGroup(pCtx, pFront);
      grpsLeft--;
    }

    pCache->rowNum = pCtx->jLimit;
  }

  return mWinJoinDumpGrpCache(pCtx);
}
3476

3477
SSDataBlock* mWinJoinDo(struct SOperatorInfo* pOperator) {
×
3478
  SMJoinOperatorInfo* pJoin = pOperator->info;
×
3479
  SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx;
×
3480
  int32_t code = TSDB_CODE_SUCCESS;
×
3481
  int64_t probeTs = 0;
×
3482
  SColumnInfoData* pProbeCol = NULL;
×
3483
  bool newBlock = false;
×
3484

3485
  blockDataCleanup(pCtx->finBlk);
×
3486

3487
  if (pCtx->grpRemains) {
×
3488
    MJ_ERR_JRET(mWinJoinDumpGrpCache(pCtx));
×
3489
    if ((mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) || (pCtx->finBlk->info.rows > 0 && pCtx->seqWinGrp)) {
×
3490
      return pCtx->finBlk;
×
3491
    }
3492
    pCtx->grpRemains = false;
×
3493
  }
3494

3495
  do {
3496
    MJ_ERR_JRET(mWinJoinRetrieve(pOperator, pJoin, pCtx, &newBlock));
×
3497
    if (!newBlock) {
×
3498
      if (pCtx->groupJoin && pCtx->finBlk->info.rows <= 0 && !mJoinIsDone(pOperator)) {
×
3499
        continue;
×
3500
      }
3501
      
3502
      break;
×
3503
    }
3504

3505
    MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
×
3506

3507
    while (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) {
×
3508
      MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
×
3509

3510
      MJ_ERR_JRET(mJoinBuildEqGrp(pJoin->probe, probeTs, NULL, &pCtx->probeGrp));
×
3511
      
3512
      if (probeTs != pCtx->lastTs) {
×
3513
        pCtx->lastTs = probeTs;
×
3514
        pCtx->winBeginTs = probeTs + pCtx->winBeginOffset;
×
3515
        pCtx->winEndTs = probeTs + pCtx->winEndOffset;
×
3516
        MJ_ERR_JRET(mWinJoinMoveFillWinCache(pCtx));
×
3517
      }
3518

3519
      MJ_ERR_JRET(mWinJoinTrimDumpGrpCache(pCtx));
×
3520
      
3521
      if ((mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) || (pCtx->finBlk->info.rows > 0 && pCtx->seqWinGrp)) {
×
3522
        return pCtx->finBlk;
×
3523
      }
3524
    }
3525
  } while (true);
3526

3527
_return:
×
3528

3529
  if (code) {
×
3530
    pJoin->errCode = code;
×
3531
    return NULL;
×
3532
  }
3533

3534
  return pCtx->finBlk;
×
3535
}
3536

3537
/*
 * Resets the window-join state for a new group: clears the per-group flags,
 * rewinds lastTs to INT64_MIN, resets the window cache and then the group
 * context of both the probe and the build table.
 */
void mWinJoinGroupReset(SMJoinOperatorInfo* pJoin) {
  SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx;

  pCtx->lastTs = INT64_MIN;
  pCtx->eqPostDone = false;
  pCtx->lastProbeGrp = false;
  pCtx->lastEqGrp = false;

  mWinJoinResetWindowCache(pCtx, &pCtx->cache);

  mJoinResetGroupTableCtx(pJoin->probe);
  mJoinResetGroupTableCtx(pJoin->build);
}
×
3551

3552
/*
 * Prepares a window cache: sets its page limit and column count (taken from
 * the build table's finNum) and allocates the group-rows array with an
 * initial capacity of 2.
 *
 * pCtx is accepted but not used here.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the array cannot be allocated.
 */
int32_t mJoinInitWindowCache(SMJoinWinCache* pCache, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) {
  pCache->pageLimit = MJOIN_BLK_SIZE_LIMIT;
  pCache->colNum = pJoin->build->finNum;
  pCache->grps = taosArrayInit(2, sizeof(SMJoinGrpRows));

  return (NULL != pCache->grps) ? TSDB_CODE_SUCCESS : terrno;
}
3564

3565
/*
 * Releases everything owned by the operator's window-join context: resets the
 * window cache, then destroys the final block, the cache's output block and
 * the cache's group array.
 *
 * Every released pointer is set to NULL afterwards so the context never holds
 * a dangling pointer.
 */
void mJoinDestroyWindowCtx(SMJoinOperatorInfo* pJoin) {
  SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx;

  // Reset first: it operates on the cache while its members are still alive.
  mWinJoinResetWindowCache(pCtx, &pCtx->cache);

  blockDataDestroy(pCtx->finBlk);
  pCtx->finBlk = NULL;
  blockDataDestroy(pCtx->cache.outBlk);
  pCtx->cache.outBlk = NULL;

  taosArrayDestroy(pCtx->cache.grps);
  pCtx->cache.grps = NULL;  // previously left dangling, unlike the two blocks above
}
×
3577

3578
/*
 * Initializes the window-join context (ASOF and WINDOW join sub-types) of a
 * merge-join operator from its physical plan node.
 *
 * - Records the back pointer, the sequential-window-group flag (which also
 *   sets pJoin->outGrpId to 1) and the timestamp order.
 * - ASOF: takes the operator type, the JLIMIT (default 1), derives which rows
 *   (equal / lower / greater) are acquired and picks the backward or forward
 *   join function plus the group-reset function.
 * - WINDOW: takes the window offsets, the JLIMIT (default INT64_MAX), derives
 *   which rows are acquired and picks the window-move functions for the
 *   timestamp order.
 * - Creates the final block, sizes it, computes the block threshold and
 *   initializes the window cache.
 *
 * Returns TSDB_CODE_SUCCESS, or the error reported through MJ_ERR_RET when
 * block creation, capacity reservation or cache initialization fails.
 */
int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) {
  SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx;
  SLimitNode*      pJLimit = (SLimitNode*)pJoinNode->pJLimit;
  bool             hasJLimit = (pJLimit && pJLimit->limit);

  pCtx->pJoin = pJoin;
  pCtx->lastTs = INT64_MIN;
  pCtx->seqWinGrp = pJoinNode->seqWinGroup;
  if (pCtx->seqWinGrp) {
    pJoin->outGrpId = 1;
  }

  // Must be settled before the switch: both cases below read pCtx->ascTs.
  // (The original repeated this same assignment after the switch; that copy
  // was redundant and has been removed.)
  if (pJoinNode->node.inputTsOrder != ORDER_DESC) {
    pCtx->ascTs = true;
  }

  switch (pJoinNode->subType) {
    case JOIN_STYPE_ASOF:
      pCtx->asofOpType = pJoinNode->asofOpType;
      pCtx->jLimit = hasJLimit ? pJLimit->limit->datum.i : 1;
      pCtx->eqRowsAcq = ASOF_EQ_ROW_INCLUDED(pCtx->asofOpType);
      // For a right join the lower/greater roles are swapped.
      pCtx->lowerRowsAcq = (JOIN_TYPE_RIGHT != pJoin->joinType) ? ASOF_LOWER_ROW_INCLUDED(pCtx->asofOpType) : ASOF_GREATER_ROW_INCLUDED(pCtx->asofOpType);
      pCtx->greaterRowsAcq = (JOIN_TYPE_RIGHT != pJoin->joinType) ? ASOF_GREATER_ROW_INCLUDED(pCtx->asofOpType) : ASOF_LOWER_ROW_INCLUDED(pCtx->asofOpType);

      if ((pCtx->ascTs && pCtx->lowerRowsAcq) || (!pCtx->ascTs && pCtx->greaterRowsAcq) ) {
        pJoin->joinFp = mAsofBackwardJoinDo;
      } else {
        pJoin->joinFp = mAsofForwardJoinDo;
        pCtx->forwardRowsAcq = true;
      }
      pJoin->grpResetFp = mAsofJoinGroupReset;
      break;
    case JOIN_STYPE_WIN: {
      SWindowOffsetNode* pOffsetNode = (SWindowOffsetNode*)pJoinNode->pWindowOffset;
      SValueNode* pWinBegin = (SValueNode*)pOffsetNode->pStartOffset;
      SValueNode* pWinEnd = (SValueNode*)pOffsetNode->pEndOffset;
      pCtx->jLimit = hasJLimit ? pJLimit->limit->datum.i : INT64_MAX;
      pCtx->winBeginOffset = pWinBegin->datum.i;
      pCtx->winEndOffset = pWinEnd->datum.i;
      // Equal-timestamp rows are acquired only when the offsets straddle zero.
      pCtx->eqRowsAcq = (pCtx->winBeginOffset <= 0 && pCtx->winEndOffset >= 0);
      pCtx->lowerRowsAcq = pCtx->winBeginOffset < 0;
      pCtx->greaterRowsAcq = pCtx->winEndOffset > 0;
      pCtx->moveWinBeginFp = (joinMoveWin)(pCtx->ascTs ? mWinJoinMoveAscWinBegin : mWinJoinMoveDescWinBegin);
      pCtx->moveWinEndFp = (joinMoveWin)(pCtx->ascTs ? mWinJoinMoveAscWinEnd : mWinJoinMoveDescWinEnd);
      if ((pCtx->ascTs && !pCtx->lowerRowsAcq) || (!pCtx->ascTs && !pCtx->greaterRowsAcq) ) {
        pCtx->forwardRowsAcq = true;
      }
      break;
    }
    default:
      break;
  }

  pCtx->finBlk = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc);
  if (NULL == pCtx->finBlk) {
    MJ_ERR_RET(terrno);
  }

  MJ_ERR_RET(blockDataEnsureCapacity(pCtx->finBlk, mJoinGetFinBlkCapacity(pJoin, pJoinNode)));

  pCtx->blkThreshold = pCtx->finBlk->info.capacity * MJOIN_BLK_THRESHOLD_RATIO;

  MJ_ERR_RET(mJoinInitWindowCache(&pCtx->cache, pJoin, pCtx));
  
  return TSDB_CODE_SUCCESS;
}
3646

3647
/*
 * Frees the final and intermediate blocks of the operator's merge-join
 * context and clears both pointers.
 */
void mJoinDestroyMergeCtx(SMJoinOperatorInfo* pJoin) {
  SMJoinMergeCtx* pMerge = &pJoin->ctx.mergeCtx;

  blockDataDestroy(pMerge->finBlk);
  pMerge->finBlk = NULL;

  blockDataDestroy(pMerge->midBlk);
  pMerge->midBlk = NULL;
}
26✔
3655

3656

3657
/*
 * Initializes the merge-join context of an operator from its physical node.
 *
 * - ASOF / WINDOW sub-types are handled as an outer join here: the JLIMIT
 *   (default 1) becomes the build table's equal-row limit and the left-join
 *   group reset is installed. All other sub-types get jLimit = -1.
 * - Creates and sizes the final block; when a pre-filter is present, an
 *   intermediate block with the same capacity is created too.
 * - Selects the hash/merge cartesian-product functions by join type and,
 *   for left/right joins, by sub-type.
 *
 * Returns TSDB_CODE_SUCCESS, or the error reported through MJ_ERR_RET when a
 * block cannot be created or sized.
 */
int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) {
  SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
  SLimitNode*     pJLimit = (SLimitNode*)pJoinNode->pJLimit;
  bool            asofOrWin = (JOIN_STYPE_ASOF == pJoinNode->subType || JOIN_STYPE_WIN == pJoinNode->subType);

  pCtx->pJoin = pJoin;
  pCtx->lastEqTs = INT64_MIN;
  pCtx->hashCan = pJoin->probe->keyNum > 0;

  if (!asofOrWin) {
    pCtx->jLimit = -1;
  } else {
    pCtx->jLimit = (pJLimit && pJLimit->limit) ? pJLimit->limit->datum.i : 1;
    pJoin->subType = JOIN_STYPE_OUTER;
    pJoin->build->eqRowLimit = pCtx->jLimit;
    pJoin->grpResetFp = mLeftJoinGroupReset;
  }

  if (ORDER_DESC != pJoinNode->node.inputTsOrder) {
    pCtx->ascTs = true;
  }

  pCtx->finBlk = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc);
  if (NULL == pCtx->finBlk) {
    MJ_ERR_RET(terrno);
  }

  MJ_ERR_RET(blockDataEnsureCapacity(pCtx->finBlk, mJoinGetFinBlkCapacity(pJoin, pJoinNode)));

  // The intermediate block is only needed when a pre-filter is configured.
  if (pJoin->pFPreFilter) {
    pCtx->midBlk = NULL;
    int32_t rc = createOneDataBlock(pCtx->finBlk, false, &pCtx->midBlk);
    if (rc) {
      MJ_ERR_RET(rc);
    }
    MJ_ERR_RET(blockDataEnsureCapacity(pCtx->midBlk, pCtx->finBlk->info.capacity));
  }

  pCtx->blkThreshold = pCtx->finBlk->info.capacity * MJOIN_BLK_THRESHOLD_RATIO;

  switch (pJoin->joinType) {
    case JOIN_TYPE_INNER:
      pCtx->hashCartFp = (joinCartFp)mInnerJoinHashCart;
      pCtx->mergeCartFp = (joinCartFp)mInnerJoinMergeCart;
      break;
    case JOIN_TYPE_LEFT:
    case JOIN_TYPE_RIGHT:
      // Uses pJoin->subType, which may have been overridden to OUTER above.
      if (JOIN_STYPE_OUTER == pJoin->subType) {
        pCtx->hashCartFp = (joinCartFp)mLeftJoinHashCart;
        pCtx->mergeCartFp = (joinCartFp)mLeftJoinMergeCart;
      } else if (JOIN_STYPE_SEMI == pJoin->subType) {
        pCtx->hashCartFp = (joinCartFp)mSemiJoinHashCart;
        pCtx->mergeCartFp = (joinCartFp)mSemiJoinMergeCart;
      } else if (JOIN_STYPE_ANTI == pJoin->subType) {
        pCtx->hashCartFp = (joinCartFp)mAntiJoinHashCart;
        pCtx->mergeCartFp = (joinCartFp)mAntiJoinMergeCart;
      }
      break;
    case JOIN_TYPE_FULL:
      pCtx->hashCartFp = (joinCartFp)mFullJoinHashCart;
      pCtx->mergeCartFp = (joinCartFp)mFullJoinMergeCart;
      break;
    default:
      break;
  }

  return TSDB_CODE_SUCCESS;
}
3732

3733

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc