• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4308

14 Jun 2025 02:06PM UTC coverage: 62.454% (-0.3%) from 62.777%
#4308

push

travis-ci

web-flow
fix: taosdump windows pthread_mutex_unlock crash(3.0) (#31357)

* fix: windows pthread_mutex_unlock crash

* enh: sync from main fix taosdump crash windows

* fix: restore .github action branch to main

153985 of 315105 branches covered (48.87%)

Branch coverage included in aggregate %.

238120 of 312727 relevant lines covered (76.14%)

6462519.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.1
/source/libs/executor/src/mergejoin.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "operator.h"
20
#include "os.h"
21
#include "querynodes.h"
22
#include "querytask.h"
23
#include "tcompare.h"
24
#include "tdatablock.h"
25
#include "thash.h"
26
#include "tmsg.h"
27
#include "ttypes.h"
28
#include "mergejoin.h"
29

30
static uint32_t mJoinGetFinBlkCapacity(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) {
103,143✔
31
  uint32_t maxRows = TMAX(MJOIN_DEFAULT_BLK_ROWS_NUM, MJOIN_BLK_SIZE_LIMIT/pJoinNode->node.pOutputDataBlockDesc->totalRowSize);
103,143✔
32
  if (INT64_MAX != pJoin->ctx.mergeCtx.limit && NULL == pJoin->pFinFilter) {
103,143✔
33
    uint32_t limitMaxRows = pJoin->ctx.mergeCtx.limit / MJOIN_BLK_THRESHOLD_RATIO + 1;
819✔
34
    maxRows = TMIN(maxRows, limitMaxRows);
819✔
35
  }
36

37
  if (JOIN_STYPE_SEMI == pJoinNode->subType || JOIN_STYPE_ANTI == pJoinNode->subType) {
103,143✔
38
    maxRows = TMIN(MJOIN_SEMI_ANTI_BLK_ROWS_NUM, maxRows);
971✔
39
  }
40

41
  return maxRows;
103,143✔
42
}
43

44
static FORCE_INLINE bool mJoinBlkReachThreshold(SMJoinOperatorInfo* pInfo, int64_t blkRows) {
45
  if (INT64_MAX == pInfo->ctx.mergeCtx.limit || pInfo->pFinFilter != NULL) {
4,692,681!
46
    return blkRows >= pInfo->ctx.mergeCtx.blkThreshold;
5,027,615✔
47
  }
48
  
49
  return (pInfo->execInfo.resRows + blkRows) >= pInfo->ctx.mergeCtx.limit || blkRows >= pInfo->ctx.mergeCtx.blkThreshold;
4,000,881!
50
}
51

52

53
int32_t mWinJoinDumpGrpCache(SMJoinWindowCtx* pCtx) {
6,690✔
54
  int64_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows;
6,690✔
55
  SMJoinWinCache* cache = &pCtx->cache;
6,690✔
56
  int32_t buildGrpNum = taosArrayGetSize(cache->grps);
6,690✔
57
  int64_t buildTotalRows = TMIN(cache->rowNum, pCtx->jLimit);
6,690✔
58

59
  pCtx->finBlk->info.id.groupId = pCtx->seqWinGrp ? pCtx->pJoin->outGrpId : 0;
6,690✔
60

61
  if (buildGrpNum <= 0 || buildTotalRows <= 0) {
6,690!
62
    MJ_ERR_RET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeGrp, true, pCtx->seqWinGrp));   
282!
63
    if (pCtx->seqWinGrp) {
282✔
64
      pCtx->pJoin->outGrpId++;
159✔
65
    }
66
    return TSDB_CODE_SUCCESS;
282✔
67
  }
68
  
69
  SMJoinGrpRows* probeGrp = &pCtx->probeGrp;
6,408✔
70
  int32_t probeRows = GRP_REMAIN_ROWS(probeGrp);
6,408✔
71
  int32_t probeEndIdx = probeGrp->endIdx;
6,408✔
72

73
  if ((!pCtx->seqWinGrp) && 0 == cache->grpIdx && probeRows * buildTotalRows <= rowsLeft) {
6,408!
74
    SMJoinGrpRows* pFirstBuild = taosArrayGet(cache->grps, 0);
4,047✔
75
    if (NULL == pFirstBuild) {
4,047!
76
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
77
    }
78
    if (pFirstBuild->readIdx == pFirstBuild->beginIdx) {
4,047!
79
      for (; cache->grpIdx < buildGrpNum; ++cache->grpIdx) {
8,094✔
80
        SMJoinGrpRows* buildGrp = taosArrayGet(cache->grps, cache->grpIdx);
4,047✔
81
        if (NULL == buildGrp) {
4,047!
82
          MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
83
        }
84
        MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
4,047!
85
        buildGrp->readIdx = buildGrp->beginIdx;
4,047✔
86
      }
87

88
      cache->grpIdx = 0;
4,047✔
89
      pCtx->grpRemains = false;
4,047✔
90
      return TSDB_CODE_SUCCESS;
4,047✔
91
    }
92
  }
93

94
  for (; !GRP_DONE(probeGrp); ) {
2,361!
95
    probeGrp->endIdx = probeGrp->readIdx;
2,361✔
96
    for (; cache->grpIdx < buildGrpNum && rowsLeft > 0; ++cache->grpIdx) {
4,722!
97
      SMJoinGrpRows* buildGrp = taosArrayGet(cache->grps, cache->grpIdx);
2,361✔
98
      if (NULL == buildGrp) {
2,361!
99
        MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
100
      }
101

102
      if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) {
2,361!
103
        MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
2,361!
104
        rowsLeft -= GRP_REMAIN_ROWS(buildGrp);
2,361✔
105
        buildGrp->readIdx = buildGrp->beginIdx;
2,361✔
106
        continue;
2,361✔
107
      }
108
      
109
      int32_t buildEndIdx = buildGrp->endIdx;
×
110
      buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1;
×
111
      MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
×
112
      buildGrp->readIdx += rowsLeft;
×
113
      buildGrp->endIdx = buildEndIdx;
×
114
      rowsLeft = 0;
×
115
      break;
×
116
    }
117
    probeGrp->endIdx = probeEndIdx;
2,361✔
118

119
    if (cache->grpIdx >= buildGrpNum) {
2,361!
120
      cache->grpIdx = 0;
2,361✔
121
      ++probeGrp->readIdx; 
2,361✔
122
      if (pCtx->seqWinGrp) {
2,361!
123
        pCtx->pJoin->outGrpId++;
2,361✔
124
        break;
2,361✔
125
      }
126
    }
127

128
    if (rowsLeft <= 0) {
×
129
      break;
×
130
    }
131
  }
132

133
  probeGrp->endIdx = probeEndIdx;        
2,361✔
134

135
  pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;
2,361✔
136
  
137
  return TSDB_CODE_SUCCESS;  
2,361✔
138
}
139

140
static int32_t mOuterJoinHashFullCart(SMJoinMergeCtx* pCtx) {
×
141
  SMJoinTableCtx* probe = pCtx->pJoin->probe;
×
142
  SMJoinTableCtx* build = pCtx->pJoin->build;
×
143
  SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
×
144

145
  if (build->grpRowIdx >= 0) {
×
146
    bool contLoop = false;
×
147
    MJ_ERR_RET(mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build, &contLoop));
×
148
    if (build->grpRowIdx < 0) {
×
149
      probeGrp->readIdx++;
×
150
    }
151
    
152
    if (!contLoop) {
×
153
      goto _return;
×
154
    }
155
  }
156

157
  size_t bufLen = 0;
×
158
  int32_t probeEndIdx = probeGrp->endIdx;
×
159
  for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk); ++probeGrp->readIdx) {
×
160
    if (mJoinCopyKeyColsDataToBuf(probe, probeGrp->readIdx, &bufLen)) {
×
161
      probeGrp->endIdx = probeGrp->readIdx;
×
162
      MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true));
×
163
      probeGrp->endIdx = probeEndIdx;
×
164
      continue;
×
165
    }
166

167
    void* pGrp = tSimpleHashGet(build->pGrpHash, probe->keyData, bufLen);
×
168
    if (NULL == pGrp) {
×
169
      probeGrp->endIdx = probeGrp->readIdx;
×
170
      MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true));
×
171
      probeGrp->endIdx = probeEndIdx;
×
172
      continue;
×
173
    }
174

175
    if (build->rowBitmapSize > 0) {
×
176
      build->pHashCurGrp = ((SMJoinHashGrpRows*)pGrp)->pRows;
×
177
      build->pHashGrpRows = pGrp;
×
178
      build->pHashGrpRows->allRowsMatch = true;
×
179
    } else {
180
      build->pHashCurGrp = *(SArray**)pGrp;
×
181
    }
182
    
183
    build->grpRowIdx = 0;
×
184
    bool contLoop = false;
×
185
    MJ_ERR_RET(mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build, &contLoop));
×
186
    if (!contLoop) {
×
187
      if (build->grpRowIdx < 0) {
×
188
        probeGrp->readIdx++;
×
189
      }
190
      goto _return;
×
191
    }  
192
  }
193

194
_return:
×
195

196
  pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;
×
197

198
  return TSDB_CODE_SUCCESS;
×
199
}
200

201

202
static int32_t mOuterJoinMergeFullCart(SMJoinMergeCtx* pCtx) {
3,004,026✔
203
  int32_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows;
3,004,026✔
204
  SMJoinTableCtx* probe = pCtx->pJoin->probe;
3,004,026✔
205
  SMJoinTableCtx* build = pCtx->pJoin->build;
3,004,026✔
206
  SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0);
3,004,026✔
207
  if (NULL == probeGrp) {
3,004,026!
208
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
209
  }
210

211
  int32_t buildGrpNum = taosArrayGetSize(build->eqGrps);
3,004,026✔
212
  int32_t probeRows = GRP_REMAIN_ROWS(probeGrp);
3,004,026✔
213
  int32_t probeEndIdx = probeGrp->endIdx;
3,004,026✔
214

215
  if (0 == build->grpIdx && probeRows * build->grpTotalRows <= rowsLeft) {
3,004,026!
216
    SMJoinGrpRows* pFirstBuild = taosArrayGet(build->eqGrps, 0);
3,004,026✔
217
    if (NULL == pFirstBuild) {
3,004,026!
218
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
219
    }
220

221
    if (pFirstBuild->readIdx == pFirstBuild->beginIdx) {
3,004,026!
222
      for (; build->grpIdx < buildGrpNum; ++build->grpIdx) {
6,008,052✔
223
        SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, build->grpIdx);
3,004,026✔
224
        if (NULL == buildGrp) {
3,004,026!
225
          MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
226
        }
227

228
        MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
3,004,026!
229
        buildGrp->readIdx = buildGrp->beginIdx;
3,004,026✔
230
      }
231

232
      pCtx->grpRemains = false;
3,004,026✔
233
      return TSDB_CODE_SUCCESS;
3,004,026✔
234
    }
235
  }
236

237
  for (; !GRP_DONE(probeGrp); ) {
×
238
    probeGrp->endIdx = probeGrp->readIdx;
×
239
    for (; build->grpIdx < buildGrpNum && rowsLeft > 0; ++build->grpIdx) {
×
240
      SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, build->grpIdx);
×
241
      if (NULL == buildGrp) {
×
242
        MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
243
      }
244

245
      if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) {
×
246
        MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
×
247
        rowsLeft -= GRP_REMAIN_ROWS(buildGrp);
×
248
        buildGrp->readIdx = buildGrp->beginIdx;
×
249
        continue;
×
250
      }
251
      
252
      int32_t buildEndIdx = buildGrp->endIdx;
×
253
      buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1;
×
254
      MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
×
255
      buildGrp->readIdx += rowsLeft;
×
256
      buildGrp->endIdx = buildEndIdx;
×
257
      rowsLeft = 0;
×
258
      break;
×
259
    }
260
    probeGrp->endIdx = probeEndIdx;
×
261

262
    if (build->grpIdx >= buildGrpNum) {
×
263
      build->grpIdx = 0;
×
264
      ++probeGrp->readIdx; 
×
265
    }
266

267
    if (rowsLeft <= 0) {
×
268
      break;
×
269
    }
270
  }
271

272
  probeGrp->endIdx = probeEndIdx;        
×
273

274
  pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;
×
275
  
276
  return TSDB_CODE_SUCCESS;  
×
277
}
278

279
static int32_t mOuterJoinMergeSeqCart(SMJoinMergeCtx* pCtx) {
567✔
280
  SMJoinTableCtx* probe = pCtx->pJoin->probe;
567✔
281
  SMJoinTableCtx* build = pCtx->pJoin->build;
567✔
282
  SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
567✔
283
  if (NULL == probeGrp) {
567!
284
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
285
  }
286

287
  int32_t buildGrpNum = taosArrayGetSize(build->eqGrps);
567✔
288
  int32_t probeEndIdx = probeGrp->endIdx;
567✔
289
  int32_t rowsLeft = pCtx->midBlk->info.capacity;  
567✔
290
  bool contLoop = true;
567✔
291
  int32_t startGrpIdx = 0;
567✔
292
  int32_t startRowIdx = -1;
567✔
293

294
  //blockDataCleanup(pCtx->midBlk);
295

296
  do {
297
    for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk); 
1,572!
298
      ++probeGrp->readIdx, probeGrp->readMatch = false, probeGrp->endIdx = probeEndIdx, build->grpIdx = 0) {
1,005✔
299
      probeGrp->endIdx = probeGrp->readIdx;
1,005✔
300
      
301
      rowsLeft = pCtx->midBlk->info.capacity;
1,005✔
302
      startGrpIdx = build->grpIdx;
1,005✔
303
      startRowIdx = -1;
1,005✔
304
      
305
      for (; build->grpIdx < buildGrpNum && rowsLeft > 0; ++build->grpIdx) {
2,010!
306
        SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, build->grpIdx);
1,005✔
307
        if (NULL == buildGrp) {
1,005!
308
          MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
309
        }
310

311
        if (startRowIdx < 0) {
1,005!
312
          startRowIdx = buildGrp->readIdx;
1,005✔
313
        }
314

315
        if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) {
1,005!
316
          MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp));
1,005!
317
          rowsLeft -= GRP_REMAIN_ROWS(buildGrp);
1,005✔
318
          buildGrp->readIdx = buildGrp->beginIdx;
1,005✔
319
          continue;
1,005✔
320
        }
321
        
322
        int32_t buildEndIdx = buildGrp->endIdx;
×
323
        buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1;
×
324
        //A S S E R T(buildGrp->endIdx >= buildGrp->readIdx);
325
        MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp));
×
326
        buildGrp->readIdx += rowsLeft;
×
327
        buildGrp->endIdx = buildEndIdx;
×
328
        break;
×
329
      }
330

331
      if (pCtx->midBlk->info.rows > 0) {
1,005!
332
        if (build->rowBitmapSize > 0) {
1,005✔
333
          MJ_ERR_RET(mJoinFilterAndMarkRows(pCtx->midBlk, pCtx->pJoin->pFPreFilter, build, startGrpIdx, startRowIdx));
306!
334
        } else {
335
          MJ_ERR_RET(doFilter(pCtx->midBlk, pCtx->pJoin->pFPreFilter, NULL));
699!
336
        }
337

338
        if (pCtx->midBlk->info.rows > 0) {
1,005✔
339
          probeGrp->readMatch = true;
576✔
340
        }
341
      } 
342

343
      if (0 == pCtx->midBlk->info.rows) {
1,005✔
344
        if (build->grpIdx == buildGrpNum) {
429!
345
          if (!probeGrp->readMatch) {
429!
346
            MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true));
429!
347
          }
348

349
          continue;
429✔
350
        }
351
      } else {
352
        MJ_ERR_RET(mJoinCopyMergeMidBlk(pCtx, &pCtx->midBlk, &pCtx->finBlk));
576!
353
        
354
        if (pCtx->midRemains) {
576!
355
          contLoop = false;
×
356
        } else if (build->grpIdx == buildGrpNum) {
576!
357
          continue;
576✔
358
        }
359
      }
360

361
      //need break
362

363
      probeGrp->endIdx = probeEndIdx;
×
364
      
365
      if (build->grpIdx >= buildGrpNum) {
×
366
        build->grpIdx = 0;
×
367
        ++probeGrp->readIdx;
×
368
        probeGrp->readMatch = false;
×
369
      }      
370

371
      break;
×
372
    }
373

374
    if (GRP_DONE(probeGrp) || BLK_IS_FULL(pCtx->finBlk)) {
567!
375
      break;
376
    }
377
  } while (contLoop);
×
378

379
  pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;
567✔
380

381
  return TSDB_CODE_SUCCESS;
567✔
382
}
383

384
static int32_t mOuterJoinHashGrpCartFilter(SMJoinMergeCtx* pCtx, bool* contLoop) {
×
385
  SMJoinTableCtx* probe = pCtx->pJoin->probe;
×
386
  SMJoinTableCtx* build = pCtx->pJoin->build;
×
387
  SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
×
388
  if (NULL == probeGrp) {
×
389
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
390
  }
391

392
  int32_t startRowIdx = 0;
×
393
  
394
  //blockDataCleanup(pCtx->midBlk);
395

396
  do {
397
    startRowIdx = build->grpRowIdx;
×
398
    MJ_ERR_RET(mJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build, NULL));
×
399

400
    if (pCtx->midBlk->info.rows > 0) {
×
401
      if (build->rowBitmapSize > 0) {
×
402
        MJ_ERR_RET(mJoinFilterAndMarkHashRows(pCtx->midBlk, pCtx->pJoin->pPreFilter, build, startRowIdx));
×
403
      } else {
404
        MJ_ERR_RET(doFilter(pCtx->midBlk, pCtx->pJoin->pPreFilter, NULL));
×
405
      }
406
      if (pCtx->midBlk->info.rows > 0) {
×
407
        probeGrp->readMatch = true;
×
408
      }
409
    } 
410

411
    if (0 == pCtx->midBlk->info.rows) {
×
412
      if (build->grpRowIdx < 0) {
×
413
        if (!probeGrp->readMatch) {
×
414
          MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true));
×
415
        }
416

417
        break;
×
418
      }
419
      
420
      continue;
×
421
    } else {
422
      MJ_ERR_RET(mJoinCopyMergeMidBlk(pCtx, &pCtx->midBlk, &pCtx->finBlk));
×
423
      
424
      if (pCtx->midRemains) {
×
425
        pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;
×
426
        *contLoop = false;
×
427
        return TSDB_CODE_SUCCESS;
×
428
      }
429

430
      if (build->grpRowIdx < 0) {
×
431
        break;
×
432
      }
433

434
      continue;
×
435
    }
436
  } while (true);
437

438
  *contLoop = true;
×
439
  return TSDB_CODE_SUCCESS;
×
440
}
441

442

443
static int32_t mOuterJoinHashSeqCart(SMJoinMergeCtx* pCtx) {
×
444
  SMJoinTableCtx* probe = pCtx->pJoin->probe;
×
445
  SMJoinTableCtx* build = pCtx->pJoin->build;
×
446
  SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0);
×
447
  if (NULL == probeGrp) {
×
448
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
449
  }
450

451
  bool contLoop = false;
×
452

453
  if (build->grpRowIdx >= 0) {
×
454
    MJ_ERR_RET(mOuterJoinHashGrpCartFilter(pCtx, &contLoop));
×
455
    if (build->grpRowIdx < 0) {
×
456
      probeGrp->readIdx++;
×
457
      probeGrp->readMatch = false;
×
458
    }
459

460
    if (!contLoop) {
×
461
      goto _return;
×
462
    }
463
  }
464

465
  size_t bufLen = 0;
×
466
  int32_t probeEndIdx = probeGrp->endIdx;
×
467
  for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk);) {
×
468
    if (mJoinCopyKeyColsDataToBuf(probe, probeGrp->readIdx, &bufLen)) {
×
469
      probeGrp->endIdx = probeGrp->readIdx;
×
470
      MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true));
×
471
      probeGrp->endIdx = probeEndIdx;
×
472
      probeGrp->readIdx++;
×
473
      probeGrp->readMatch = false;
×
474
      continue;
×
475
    }
476

477
    void* pGrp = tSimpleHashGet(build->pGrpHash, probe->keyData, bufLen);
×
478
    if (NULL == pGrp) {
×
479
      probeGrp->endIdx = probeGrp->readIdx;
×
480
      MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true));
×
481
      probeGrp->endIdx = probeEndIdx;
×
482
      probeGrp->readIdx++;
×
483
      probeGrp->readMatch = false;
×
484
      continue;
×
485
    }
486

487
    if (build->rowBitmapSize > 0) {
×
488
      build->pHashCurGrp = ((SMJoinHashGrpRows*)pGrp)->pRows;
×
489
      build->pHashGrpRows = pGrp;
×
490
      if (0 == build->pHashGrpRows->rowBitmapOffset) {
×
491
        MJ_ERR_RET(mJoinGetRowBitmapOffset(build, taosArrayGetSize(build->pHashCurGrp), &build->pHashGrpRows->rowBitmapOffset));
×
492
      }
493
    } else {
494
      build->pHashCurGrp = *(SArray**)pGrp;
×
495
    }
496
    
497
    build->grpRowIdx = 0;
×
498

499
    probeGrp->endIdx = probeGrp->readIdx;      
×
500
    MJ_ERR_RET(mOuterJoinHashGrpCartFilter(pCtx, &contLoop));
×
501
    probeGrp->endIdx = probeEndIdx;
×
502
    if (build->grpRowIdx < 0) {
×
503
      probeGrp->readIdx++;
×
504
      probeGrp->readMatch = false;
×
505
    }
506

507
    if (!contLoop) {
×
508
      break;
×
509
    }
510
  }
511

512
_return:
×
513

514
  pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;
×
515

516
  return TSDB_CODE_SUCCESS;
×
517
}
518

519

520
static int32_t mLeftJoinMergeCart(SMJoinMergeCtx* pCtx) {
3,003,727✔
521
  return (NULL == pCtx->pJoin->pFPreFilter) ? mOuterJoinMergeFullCart(pCtx) : mOuterJoinMergeSeqCart(pCtx);
3,003,727✔
522
}
523

524

525

526
static bool mLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) {
3,901✔
527
  bool probeGot = mJoinRetrieveBlk(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);
3,901✔
528
  bool buildGot = false;
3,901✔
529

530
  do {
531
    if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) {  
3,952!
532
      buildGot = mJoinRetrieveBlk(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
2,410✔
533
    }
534
    
535
    if (!probeGot) {
3,952✔
536
      if (!pCtx->groupJoin || NULL == pJoin->probe->remainInBlk) {
1,542✔
537
        mJoinSetDone(pOperator);
1,275✔
538
      }
539

540
      return false;
1,542✔
541
    }
542

543
    if (buildGot) {
2,410✔
544
      SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCtx.targetSlotId);
2,314✔
545
      if (NULL == pProbeCol) {
2,314!
546
        MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
547
      }
548

549
      SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCtx.targetSlotId);
2,314✔
550
      if (NULL == pBuildCol) {
2,314!
551
        MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
552
      }
553

554
      if (MJOIN_BUILD_BLK_OOR(pCtx->ascTs, pProbeCol->pData, pJoin->probe->blkRowIdx, pBuildCol->pData, pJoin->build->blk->info.rows)) {
2,314!
555
        pJoin->build->blkRowIdx = pJoin->build->blk->info.rows;
51✔
556
        buildGot = false;
51✔
557
        continue;
51✔
558
      }
559
    }
560
    
561
    break;
2,359✔
562
  } while (true);
563

564
  return true;
2,359✔
565
}
566

567
static int32_t mLeftJoinHashCart(SMJoinMergeCtx* pCtx) {
×
568
  return (NULL == pCtx->pJoin->pPreFilter) ? mOuterJoinHashFullCart(pCtx) : mOuterJoinHashSeqCart(pCtx);
×
569
}
570

571
static FORCE_INLINE int32_t mLeftJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) {
572
  if (pCtx->lastEqGrp) {
×
573
    return (pCtx->hashJoin) ? (*pCtx->hashCartFp)(pCtx) : (*pCtx->mergeCartFp)(pCtx);
×
574
  }
575
  
576
  return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false);
×
577
}
578

579
SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) {
1,122✔
580
  SMJoinOperatorInfo* pJoin = pOperator->info;
1,122✔
581
  SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
1,122✔
582
  int32_t code = TSDB_CODE_SUCCESS;
1,122✔
583
  int64_t probeTs = 0;
1,122✔
584
  int64_t buildTs = 0;
1,122✔
585
  SColumnInfoData* pBuildCol = NULL;
1,122✔
586
  SColumnInfoData* pProbeCol = NULL;
1,122✔
587

588
  blockDataCleanup(pCtx->finBlk);
1,122✔
589

590
  if (pCtx->midRemains) {
1,122!
591
    MJ_ERR_JRET(mJoinHandleMidRemains(pCtx));
×
592
    if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
593
      return pCtx->finBlk;
×
594
    }
595
    pCtx->midRemains = false;
×
596
  }
597

598
  if (pCtx->grpRemains) {
1,122!
599
    MJ_ERR_JRET(mLeftJoinHandleGrpRemains(pCtx));
×
600
    if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
601
      return pCtx->finBlk;
×
602
    }
603
    pCtx->grpRemains = false;
×
604
  }
605

606
  do {
607
    if (!mLeftJoinRetrieve(pOperator, pJoin, pCtx)) {
2,939✔
608
      if (pCtx->groupJoin && pCtx->finBlk->info.rows <= 0 && !mJoinIsDone(pOperator)) {
1,064!
609
        continue;
×
610
      }
611

612
      break;
1,064✔
613
    }
614

615
    MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
1,875!
616
    MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
1,875!
617
    
618
    if (probeTs == pCtx->lastEqTs) {
1,875!
619
      MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, true));
×
620
      if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
621
        return pCtx->finBlk;
×
622
      }
623

624
      if (MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) {
×
625
        continue;
×
626
      } else {
627
        MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
×
628
      }
629
    }
630

631
    while (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
3,005,937✔
632
      if (probeTs == buildTs) {
3,004,120✔
633
        pCtx->lastEqTs = probeTs;
3,003,727✔
634
        MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false));
3,003,727!
635
        if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
6,007,454✔
636
          return pCtx->finBlk;
58✔
637
        }
638

639
        MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
3,003,669!
640
        MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
3,003,669!
641
      } else if (PROBE_TS_NMATCH(pCtx->ascTs, probeTs, buildTs)) {
393✔
642
        MJ_ERR_JRET(mJoinProcessLowerGrp(pCtx, pJoin->probe, pProbeCol, &probeTs, &buildTs));
243!
643
        if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
486!
644
          return pCtx->finBlk;
×
645
        }
646
      } else {
647
        while (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) {
162✔
648
          MJOIN_GET_TB_CUR_TS(pBuildCol, buildTs, pJoin->build);
141✔
649
          if (PROBE_TS_NREACH(pCtx->ascTs, probeTs, buildTs)) {
141!
650
            continue;
12✔
651
          }
652
          
653
          break;
129✔
654
        }
655
      }
656
    }
657

658
    if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && MJOIN_TB_GRP_ROWS_DONE(pJoin->build, pCtx->groupJoin)) {
1,817!
659
      pCtx->probeNEqGrp.blk = pJoin->probe->blk;
78✔
660
      pCtx->probeNEqGrp.beginIdx = pJoin->probe->blkRowIdx;
78✔
661
      pCtx->probeNEqGrp.readIdx = pCtx->probeNEqGrp.beginIdx;
78✔
662
      pCtx->probeNEqGrp.endIdx = pJoin->probe->blk->info.rows - 1;
78✔
663
      
664
      pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows;
78✔
665
            
666
      MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false));
78!
667
      if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
156!
668
        return pCtx->finBlk;
×
669
      }
670
    }
671
  } while (true);
672

673
_return:
1,064✔
674

675
  if (code) {
1,064!
676
    pJoin->errCode = code;
×
677
    return NULL;
×
678
  }
679

680
  return pCtx->finBlk;
1,064✔
681
}
682

683
void mLeftJoinGroupReset(SMJoinOperatorInfo* pJoin) {
267✔
684
  SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
267✔
685

686
  pCtx->lastEqGrp = false;
267✔
687
  pCtx->lastProbeGrp = false;
267✔
688
  pCtx->hashCan = false;
267✔
689
  pCtx->midRemains = false;
267✔
690
  pCtx->lastEqTs = INT64_MIN;
267✔
691

692
  mJoinResetGroupTableCtx(pJoin->probe);
267✔
693
  mJoinResetGroupTableCtx(pJoin->build);    
267✔
694
}
267✔
695

696

697
static int32_t mInnerJoinMergeCart(SMJoinMergeCtx* pCtx) {
6,010,003✔
698
  int32_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows;
6,010,003✔
699
  SMJoinTableCtx* probe = pCtx->pJoin->probe;
6,010,003✔
700
  SMJoinTableCtx* build = pCtx->pJoin->build;
6,010,003✔
701
  SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0);
6,010,003✔
702
  if (NULL == probeGrp) {
6,010,007!
703
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
704
  }
705

706
  int32_t buildGrpNum = taosArrayGetSize(build->eqGrps);
6,010,007✔
707
  int32_t probeRows = GRP_REMAIN_ROWS(probeGrp);
6,010,007✔
708
  int32_t probeEndIdx = probeGrp->endIdx;
6,010,007✔
709

710
  if (0 == build->grpIdx && probeRows * build->grpTotalRows <= rowsLeft) {
6,010,007!
711
    SMJoinGrpRows* pFirstBuild = taosArrayGet(build->eqGrps, 0);
6,009,209✔
712
    if (NULL == pFirstBuild) {
6,009,209!
713
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
714
    }
715

716
    if (pFirstBuild->readIdx == pFirstBuild->beginIdx) {
6,009,209!
717
      for (; build->grpIdx < buildGrpNum; ++build->grpIdx) {
12,025,462✔
718
        SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, build->grpIdx);
6,016,245✔
719
        if (NULL == buildGrp) {
6,016,243!
720
          MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
721
        }
722

723
        MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
6,016,243!
724
        buildGrp->readIdx = buildGrp->beginIdx;
6,016,253✔
725
      }
726

727
      pCtx->grpRemains = false;
6,009,217✔
728
      return TSDB_CODE_SUCCESS;
6,009,217✔
729
    }
730
  }
731

732
  for (; !GRP_DONE(probeGrp); ) {
798!
733
    probeGrp->endIdx = probeGrp->readIdx;
800✔
734
    for (; build->grpIdx < buildGrpNum && rowsLeft > 0; ++build->grpIdx) {
800!
735
      SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, build->grpIdx);
800✔
736
      if (NULL == buildGrp) {
800!
737
        MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
738
      }
739

740
      if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) {
800!
741
        MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
×
742
        rowsLeft -= GRP_REMAIN_ROWS(buildGrp);
×
743
        buildGrp->readIdx = buildGrp->beginIdx;
×
744
        continue;
×
745
      }
746
      
747
      int32_t buildEndIdx = buildGrp->endIdx;
800✔
748
      buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1;
800✔
749
      MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
800!
750
      buildGrp->readIdx += rowsLeft;
800✔
751
      buildGrp->endIdx = buildEndIdx;
800✔
752
      rowsLeft = 0;
800✔
753
      break;
800✔
754
    }
755
    probeGrp->endIdx = probeEndIdx;
800✔
756

757
    if (build->grpIdx >= buildGrpNum) {
800!
758
      build->grpIdx = 0;
×
759
      ++probeGrp->readIdx; 
×
760
    }
761

762
    if (rowsLeft <= 0) {
800!
763
      break;
800✔
764
    }
765
  }
766

767
  probeGrp->endIdx = probeEndIdx;        
798✔
768

769
  pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;
798✔
770
  
771
  return TSDB_CODE_SUCCESS;  
798✔
772
}
773

774

775
static int32_t mInnerJoinHashCart(SMJoinMergeCtx* pCtx) {
×
776
  SMJoinTableCtx* probe = pCtx->pJoin->probe;
×
777
  SMJoinTableCtx* build = pCtx->pJoin->build;
×
778
  SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
×
779
  if (NULL == probeGrp) {
×
780
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
781
  }
782

783
  if (build->grpRowIdx >= 0) {
×
784
    bool contLoop = false;
×
785
    MJ_ERR_RET(mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build, &contLoop));
×
786
    if (build->grpRowIdx < 0) {
×
787
      probeGrp->readIdx++;
×
788
    }
789
    
790
    if (!contLoop) {
×
791
      goto _return;
×
792
    }
793
  }
794

795
  size_t bufLen = 0;
×
796
  int32_t probeEndIdx = probeGrp->endIdx;
×
797
  for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk); ++probeGrp->readIdx) {
×
798
    if (mJoinCopyKeyColsDataToBuf(probe, probeGrp->readIdx, &bufLen)) {
×
799
      continue;
×
800
    }
801

802
    SArray** pGrp = tSimpleHashGet(build->pGrpHash, probe->keyData, bufLen);
×
803
    if (NULL != pGrp) {
×
804
      build->pHashCurGrp = *pGrp;
×
805
      build->grpRowIdx = 0;
×
806
      bool contLoop = false;
×
807
      MJ_ERR_RET(mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build, &contLoop));
×
808
      if (!contLoop) {
×
809
        if (build->grpRowIdx < 0) {
×
810
          probeGrp->readIdx++;
×
811
        }
812
        goto _return;
×
813
      }  
814
    }
815
  }
816

817
_return:
×
818

819
  pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;
×
820

821
  return TSDB_CODE_SUCCESS;
×
822
}
823

824
static FORCE_INLINE int32_t mInnerJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) {
825
  return (pCtx->hashJoin) ? (*pCtx->hashCartFp)(pCtx) : (*pCtx->mergeCartFp)(pCtx);
×
826
}
827

828

829
static bool mInnerJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) {
370,397✔
830
  bool probeGot = mJoinRetrieveBlk(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);
370,397✔
831
  bool buildGot = false;
370,361✔
832

833
  do {
834
    if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) {  
402,563✔
835
      buildGot = mJoinRetrieveBlk(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
325,619✔
836
    }
837
    
838
    if (!probeGot) {
402,593✔
839
      mJoinSetDone(pOperator);
76,970✔
840
      return false;
76,971✔
841
    }
842

843
    if (buildGot) {
325,623✔
844
      SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCtx.targetSlotId);
279,942✔
845
      if (NULL == pProbeCol) {
279,959!
846
        MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
847
      }
848

849
      SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCtx.targetSlotId);
279,959✔
850
      if (NULL == pBuildCol) {
279,956!
851
        MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
852
      }
853

854
      if (MJOIN_BUILD_BLK_OOR(pCtx->ascTs, pProbeCol->pData, pJoin->probe->blkRowIdx, pBuildCol->pData, pJoin->build->blk->info.rows)) {
279,956!
855
        pJoin->build->blkRowIdx = pJoin->build->blk->info.rows;
32,202✔
856
        buildGot = false;
32,202✔
857
        continue;
32,202✔
858
      }
859
    }
860

861
    break;
293,435✔
862
  } while (true);
863

864
  return true;
293,435✔
865
}
866

867

868
SSDataBlock* mInnerJoinDo(struct SOperatorInfo* pOperator) {
123,351✔
869
  SMJoinOperatorInfo* pJoin = pOperator->info;
123,351✔
870
  SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
123,351✔
871
  int32_t code = TSDB_CODE_SUCCESS;
123,351✔
872
  int64_t probeTs = 0;
123,351✔
873
  int64_t buildTs = 0;
123,351✔
874
  SColumnInfoData* pBuildCol = NULL;
123,351✔
875
  SColumnInfoData* pProbeCol = NULL;
123,351✔
876

877
  blockDataCleanup(pCtx->finBlk);
123,351✔
878

879
  if (pCtx->grpRemains) {
123,359!
880
    MJ_ERR_JRET(mInnerJoinHandleGrpRemains(pCtx));
×
881
    if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
882
      return pCtx->finBlk;
×
883
    }
884
    pCtx->grpRemains = false;
×
885
  }
886

887
  do {
888
    if (!mInnerJoinRetrieve(pOperator, pJoin, pCtx)) {
369,459✔
889
      break;
76,521✔
890
    }
891

892
    MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
292,917✔
893
    MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
292,918!
894
    
895
    if (probeTs == pCtx->lastEqTs) {
292,921✔
896
      MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, true));
3,522!
897
      if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
7,044!
898
        return pCtx->finBlk;
×
899
      }
900

901
      if (MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) || MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
3,522!
902
        continue;
×
903
      } 
904

905
      MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
3,522✔
906
    } else if (MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
289,399!
907
      mJoinSetDone(pOperator);
45,629✔
908
      break;
45,646✔
909
    }
910

911
    do {
912
      if (probeTs == buildTs) {
11,762,833✔
913
        pCtx->lastEqTs = probeTs;
6,006,482✔
914
        MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false));
6,006,482!
915
        if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
12,012,966✔
916
          return pCtx->finBlk;
1,191✔
917
        }
918

919
        if (MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) || MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
6,005,292!
920
          break;
921
        }
922
        
923
        MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
5,947,551!
924
        MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
5,947,552!
925
        continue;
5,947,549✔
926
      }
927

928
      if (PROBE_TS_NMATCH(pCtx->ascTs, probeTs, buildTs)) {
5,756,351✔
929
        if (++pJoin->probe->blkRowIdx < pJoin->probe->blk->info.rows) {
2,965,973✔
930
          MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
2,816,875✔
931
          continue;
2,816,875✔
932
        }
933
      } else {
934
        if (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) {
2,790,378✔
935
          MJOIN_GET_TB_CUR_TS(pBuildCol, buildTs, pJoin->build);
2,751,117✔
936
          continue;
2,751,117✔
937
        }
938
      }
939
      
940
      break;
188,359✔
941
    } while (true);
942
  } while (true);
943

944
_return:
122,167✔
945

946
  if (code) {
122,167!
947
    pJoin->errCode = code;
×
948
    return NULL;
×
949
  }
950

951
  return pCtx->finBlk;
122,167✔
952
}
953

954
static FORCE_INLINE int32_t mFullJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) {
955
  if (pCtx->lastEqGrp) {
×
956
    return (pCtx->hashJoin) ? (*pCtx->hashCartFp)(pCtx) : (*pCtx->mergeCartFp)(pCtx);
×
957
  }
958
  
959
  return pCtx->lastProbeGrp ? mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false) : mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->buildNEqGrp, false, false);
×
960
}
961

962
static bool mFullJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) {
432✔
963
  bool probeGot = mJoinRetrieveBlk(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);
432✔
964
  bool buildGot = mJoinRetrieveBlk(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
432✔
965
  
966
  if (!probeGot && !buildGot) {
432✔
967
    return false;
189✔
968
  }
969

970
  return true;
243✔
971
}
972

973
static FORCE_INLINE int32_t mFullJoinHashCart(SMJoinMergeCtx* pCtx) {
×
974
  return (NULL == pCtx->pJoin->pPreFilter) ? mOuterJoinHashFullCart(pCtx) : mOuterJoinHashSeqCart(pCtx);
×
975
}
976

977
static int32_t mFullJoinMergeCart(SMJoinMergeCtx* pCtx) {
866✔
978
  return (NULL == pCtx->pJoin->pFPreFilter) ? mOuterJoinMergeFullCart(pCtx) : mOuterJoinMergeSeqCart(pCtx);
866✔
979
}
980

981
static FORCE_INLINE int32_t mFullJoinOutputHashRow(SMJoinMergeCtx* pCtx, SMJoinHashGrpRows* pGrpRows, int32_t idx) {
982
  SMJoinGrpRows grp = {0};
×
983
  SMJoinRowPos* pPos = taosArrayGet(pGrpRows->pRows, idx);
×
984
  if (NULL == pPos) {
×
985
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
986
  }
987

988
  grp.blk = pPos->pBlk;
×
989
  grp.readIdx = pPos->pos;
×
990
  grp.endIdx = pPos->pos;
×
991
  return mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, &grp, false);
×
992
}
993

994
static int32_t mFullJoinOutputHashGrpRows(SMJoinMergeCtx* pCtx, SMJoinHashGrpRows* pGrpRows, SMJoinNMatchCtx* pNMatch, bool* grpDone) {
×
995
  int32_t rowNum = taosArrayGetSize(pGrpRows->pRows);
×
996
  for (; pNMatch->rowIdx < rowNum && !BLK_IS_FULL(pCtx->finBlk); ++pNMatch->rowIdx) {
×
997
    MJ_ERR_RET(mFullJoinOutputHashRow(pCtx, pGrpRows, pNMatch->rowIdx));
×
998
  }
999

1000
  if (pNMatch->rowIdx >= rowNum) {
×
1001
    *grpDone = true;
×
1002
    pNMatch->rowIdx = 0;
×
1003
  }
1004
  
1005
  return TSDB_CODE_SUCCESS;
×
1006
}
1007

1008
static int32_t mFullJoinHandleHashGrpRemains(SMJoinMergeCtx* pCtx) {
×
1009
  static const uint8_t lowest_bit_bitmap[] = {32, 7, 6, 32, 5, 3, 32, 0, 4, 1, 2};
1010
  SMJoinTableCtx* build = pCtx->pJoin->build;
×
1011
  SMJoinNMatchCtx* pNMatch = &build->nMatchCtx;
×
1012
  if (NULL == pNMatch->pGrp) {
×
1013
    pNMatch->pGrp = tSimpleHashIterate(build->pGrpHash, pNMatch->pGrp, &pNMatch->iter);
×
1014
    pNMatch->bitIdx = 0;
×
1015
  }
1016

1017
  int32_t baseIdx = 0;
×
1018
  while (NULL != pNMatch->pGrp) {
×
1019
    SMJoinHashGrpRows* pGrpRows = (SMJoinHashGrpRows*)pNMatch->pGrp;
×
1020
    if (pGrpRows->allRowsMatch) {
×
1021
      pNMatch->pGrp = tSimpleHashIterate(build->pGrpHash, pNMatch->pGrp, &pNMatch->iter);
×
1022
      pNMatch->bitIdx = 0;
×
1023
      continue;
×
1024
    }
1025
  
1026
    if (pGrpRows->rowMatchNum <= 0 || pGrpRows->allRowsNMatch) {
×
1027
      pGrpRows->allRowsNMatch = true;
×
1028

1029
      bool grpDone = false;      
×
1030
      MJ_ERR_RET(mFullJoinOutputHashGrpRows(pCtx, pGrpRows, pNMatch, &grpDone));
×
1031
      if (BLK_IS_FULL(pCtx->finBlk)) {
×
1032
        if (grpDone) {
×
1033
          pNMatch->pGrp = tSimpleHashIterate(build->pGrpHash, pNMatch->pGrp, &pNMatch->iter);
×
1034
          pNMatch->bitIdx = 0;      
×
1035
        }
1036
        
1037
        pCtx->nmatchRemains = true;
×
1038
        return TSDB_CODE_SUCCESS;
×
1039
      }
1040

1041
      pNMatch->pGrp = tSimpleHashIterate(build->pGrpHash, pNMatch->pGrp, &pNMatch->iter);
×
1042
      pNMatch->bitIdx = 0;      
×
1043
      continue;
×
1044
    }
1045

1046
    int32_t grpRowNum = taosArrayGetSize(pGrpRows->pRows);
×
1047
    int32_t bitBytes = BitmapLen(grpRowNum);
×
1048
    for (; pNMatch->bitIdx < bitBytes; ++pNMatch->bitIdx) {
×
1049
      if (0 == build->pRowBitmap[pGrpRows->rowBitmapOffset + pNMatch->bitIdx]) {
×
1050
        continue;
×
1051
      }
1052

1053
      baseIdx = 8 * pNMatch->bitIdx;
×
1054
      char *v = &build->pRowBitmap[pGrpRows->rowBitmapOffset + pNMatch->bitIdx];
×
1055
      while (*v && !BLK_IS_FULL(pCtx->finBlk)) {
×
1056
        uint8_t n = lowest_bit_bitmap[((*v & (*v - 1)) ^ *v) % 11];
×
1057
        if (baseIdx + n >= grpRowNum) {
×
1058
          MJOIN_SET_ROW_BITMAP(build->pRowBitmap, pGrpRows->rowBitmapOffset + pNMatch->bitIdx, n);
×
1059
          continue;
×
1060
        }
1061

1062
        MJ_ERR_RET(mFullJoinOutputHashRow(pCtx, pGrpRows, baseIdx + n));
×
1063
        MJOIN_SET_ROW_BITMAP(build->pRowBitmap, pGrpRows->rowBitmapOffset + pNMatch->bitIdx, n);
×
1064
        if (++pGrpRows->rowMatchNum == taosArrayGetSize(pGrpRows->pRows)) {
×
1065
          pGrpRows->allRowsMatch = true;
×
1066
          pNMatch->bitIdx = bitBytes;
×
1067
          break;
×
1068
        }
1069
      }
1070
  
1071
      if (BLK_IS_FULL(pCtx->finBlk)) {
×
1072
        if (pNMatch->bitIdx == bitBytes) {
×
1073
          pNMatch->pGrp = tSimpleHashIterate(build->pGrpHash, pNMatch->pGrp, &pNMatch->iter);
×
1074
          pNMatch->bitIdx = 0;      
×
1075
        }
1076

1077
        pCtx->nmatchRemains = true;
×
1078
        return TSDB_CODE_SUCCESS;
×
1079
      }
1080
    }
1081

1082
    pNMatch->pGrp = tSimpleHashIterate(build->pGrpHash, pNMatch->pGrp, &pNMatch->iter);
×
1083
    pNMatch->bitIdx = 0;
×
1084
  }
1085
  
1086
  pCtx->nmatchRemains = false;
×
1087
  pCtx->lastEqGrp = false;
×
1088
  
1089
  return TSDB_CODE_SUCCESS;
×
1090
}
1091

1092
static FORCE_INLINE int32_t mFullJoinOutputMergeRow(SMJoinMergeCtx* pCtx, SMJoinGrpRows* pGrpRows, int32_t idx) {
1093
  SMJoinGrpRows grp = {0};
123✔
1094
  grp.blk = pGrpRows->blk;
123✔
1095
  grp.readIdx = idx;
123✔
1096
  grp.endIdx = idx;
123✔
1097
  return mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, &grp, false);
123✔
1098
}
1099

1100

1101
static int32_t mFullJoinOutputMergeGrpRows(SMJoinMergeCtx* pCtx, SMJoinGrpRows* pGrpRows, SMJoinNMatchCtx* pNMatch, bool* grpDone) {
87✔
1102
  for (; pNMatch->rowIdx <= pGrpRows->endIdx && !BLK_IS_FULL(pCtx->finBlk); ++pNMatch->rowIdx) {
186!
1103
    MJ_ERR_RET(mFullJoinOutputMergeRow(pCtx, pGrpRows, pNMatch->rowIdx));
198!
1104
  }
1105

1106
  if (pNMatch->rowIdx > pGrpRows->endIdx) {
87!
1107
    *grpDone = true;
87✔
1108
    pNMatch->rowIdx = 0;
87✔
1109
  }
1110
  
1111
  return TSDB_CODE_SUCCESS;
87✔
1112
}
1113

1114

1115
static int32_t mFullJoinHandleMergeGrpRemains(SMJoinMergeCtx* pCtx) {
174✔
1116
  static const uint8_t lowest_bit_bitmap[] = {32, 7, 6, 32, 5, 3, 32, 0, 4, 1, 2};
1117
  SMJoinTableCtx* build = pCtx->pJoin->build;
174✔
1118
  SMJoinNMatchCtx* pNMatch = &build->nMatchCtx;
174✔
1119
  bool grpDone = false;
174✔
1120
  int32_t baseIdx = 0;
174✔
1121
  int32_t rowNum = 0;
174✔
1122
  int32_t grpNum = taosArrayGetSize(build->eqGrps);
174✔
1123
  for (; pNMatch->grpIdx < grpNum; ++pNMatch->grpIdx, pNMatch->bitIdx = 0) {
348✔
1124
    grpDone = false;
174✔
1125
    
1126
    SMJoinGrpRows* pGrpRows = taosArrayGet(build->eqGrps, pNMatch->grpIdx);
174✔
1127
    if (NULL == pGrpRows) {
174!
1128
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1129
    }
1130

1131
    if (pGrpRows->allRowsMatch) {
174✔
1132
      continue;
63✔
1133
    }
1134

1135
    if (pGrpRows->rowMatchNum <= 0 || pGrpRows->allRowsNMatch) {
111!
1136
      if (!pGrpRows->allRowsNMatch) {
87!
1137
        pGrpRows->allRowsNMatch = true;
87✔
1138
        pNMatch->rowIdx = pGrpRows->beginIdx;
87✔
1139
      }
1140
      
1141
      MJ_ERR_RET(mFullJoinOutputMergeGrpRows(pCtx, pGrpRows, pNMatch, &grpDone));
87!
1142

1143
      if (BLK_IS_FULL(pCtx->finBlk)) {
87!
1144
        if (grpDone) {
×
1145
          ++pNMatch->grpIdx;
×
1146
          pNMatch->bitIdx = 0;
×
1147
        }
1148
        
1149
        pCtx->nmatchRemains = true;
×
1150
        return TSDB_CODE_SUCCESS;
×
1151
      }
1152

1153
      continue;
87✔
1154
    }
1155

1156
    int32_t bitBytes = BitmapLen(pGrpRows->endIdx - pGrpRows->beginIdx + 1);
24✔
1157
    rowNum = pGrpRows->endIdx - pGrpRows->beginIdx + 1;
24✔
1158
    for (; pNMatch->bitIdx < bitBytes; ++pNMatch->bitIdx) {
48✔
1159
      if (0 == build->pRowBitmap[pGrpRows->rowBitmapOffset + pNMatch->bitIdx]) {
24!
1160
        continue;
×
1161
      }
1162

1163
      baseIdx = 8 * pNMatch->bitIdx;
24✔
1164
      char *v = &build->pRowBitmap[pGrpRows->rowBitmapOffset + pNMatch->bitIdx];
24✔
1165
      while (*v && !BLK_IS_FULL(pCtx->finBlk)) {
168!
1166
        uint8_t n = lowest_bit_bitmap[((*v & (*v - 1)) ^ *v) % 11];
168✔
1167
        if (pGrpRows->beginIdx + baseIdx + n > pGrpRows->endIdx) {
168✔
1168
          MJOIN_SET_ROW_BITMAP(build->pRowBitmap, pGrpRows->rowBitmapOffset + pNMatch->bitIdx, n);
144✔
1169
          continue;
144✔
1170
        }
1171
        
1172
        MJ_ERR_RET(mFullJoinOutputMergeRow(pCtx, pGrpRows, pGrpRows->beginIdx + baseIdx + n));
48!
1173

1174
        MJOIN_SET_ROW_BITMAP(build->pRowBitmap, pGrpRows->rowBitmapOffset + pNMatch->bitIdx, n);
24✔
1175
        if (++pGrpRows->rowMatchNum == rowNum) {
24!
1176
          pGrpRows->allRowsMatch = true;
24✔
1177
          pNMatch->bitIdx = bitBytes;
24✔
1178
          break;
24✔
1179
        }
1180
      }
1181

1182
      if (BLK_IS_FULL(pCtx->finBlk)) {
24!
1183
        break;
×
1184
      }
1185
    }
1186

1187
    if (BLK_IS_FULL(pCtx->finBlk)) {
24!
1188
      if (pNMatch->bitIdx >= bitBytes) {
×
1189
        ++pNMatch->grpIdx;
×
1190
        pNMatch->bitIdx = 0;
×
1191
      }
1192
      
1193
      pCtx->nmatchRemains = true;
×
1194
      return TSDB_CODE_SUCCESS;
×
1195
    }      
1196
  }
1197

1198
  pCtx->nmatchRemains = false;
174✔
1199
  pCtx->lastEqGrp = false;  
174✔
1200
  
1201
  return TSDB_CODE_SUCCESS;  
174✔
1202
}
1203

1204
static int32_t mFullJoinHandleBuildTableRemains(SMJoinMergeCtx* pCtx) {
174✔
1205
  return pCtx->hashJoin ? mFullJoinHandleHashGrpRemains(pCtx) : mFullJoinHandleMergeGrpRemains(pCtx);
174!
1206
}
1207

1208
SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) {
189✔
1209
  SMJoinOperatorInfo* pJoin = pOperator->info;
189✔
1210
  SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
189✔
1211
  int32_t code = TSDB_CODE_SUCCESS;
189✔
1212
  int64_t probeTs = 0;
189✔
1213
  int64_t buildTs = 0;
189✔
1214
  SColumnInfoData* pBuildCol = NULL;
189✔
1215
  SColumnInfoData* pProbeCol = NULL;
189✔
1216

1217
  blockDataCleanup(pCtx->finBlk);
189✔
1218

1219
  if (pCtx->midRemains) {
189!
1220
    MJ_ERR_JRET(mJoinHandleMidRemains(pCtx));
×
1221
    if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
1222
      return pCtx->finBlk;
×
1223
    }
1224
    pCtx->midRemains = false;
×
1225
  }
1226

1227
  if (pCtx->grpRemains) {
189!
1228
    MJ_ERR_JRET(mFullJoinHandleGrpRemains(pCtx));
×
1229
    if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
1230
      return pCtx->finBlk;
×
1231
    }
1232
    pCtx->grpRemains = false;
×
1233
  }
1234

1235
  if (pCtx->nmatchRemains) {
189!
1236
    MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx));
×
1237
    if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
1238
      return pCtx->finBlk;
×
1239
    }
1240
  }
1241

1242
  do {
1243
    if (!mFullJoinRetrieve(pOperator, pJoin)) {
433✔
1244
      if (pCtx->lastEqGrp && pJoin->build->rowBitmapSize > 0) {
189✔
1245
        MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx));
24!
1246
        if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
48!
1247
          return pCtx->finBlk;
×
1248
        }
1249
      }
1250

1251
      mJoinSetDone(pOperator);      
189✔
1252
      break;
189✔
1253
    }
1254

1255
    MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
243!
1256
    MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
243!
1257
    
1258
    if (probeTs == pCtx->lastEqTs) {
243!
1259
      MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, true));
×
1260
      if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
1261
        return pCtx->finBlk;
×
1262
      }
1263

1264
      if (FJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) {
×
1265
        continue;
×
1266
      } else {
1267
        MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
×
1268
      }
1269
    }
1270

1271
    if (pCtx->lastEqGrp && pJoin->build->rowBitmapSize > 0) {
243!
1272
      MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx));
12!
1273
      if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
24!
1274
        return pCtx->finBlk;
×
1275
      }
1276
    }
1277

1278
    while (!FJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
1,223✔
1279
      if (probeTs == buildTs) {
979✔
1280
        pCtx->lastEqTs = probeTs;
865✔
1281
        MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false));
865!
1282
        if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
1,732!
1283
          return pCtx->finBlk;
×
1284
        }
1285

1286
        MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
866!
1287
        MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
866!
1288

1289
        if (!FJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && probeTs != pCtx->lastEqTs && pJoin->build->rowBitmapSize > 0) {
866!
1290
          MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx));
138!
1291
          if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
276!
1292
            return pCtx->finBlk;
×
1293
          }
1294
        }
1295

1296
        continue;
866✔
1297
      }
1298

1299
      if (PROBE_TS_NMATCH(pCtx->ascTs, probeTs, buildTs)) {
114!
1300
        MJ_ERR_JRET(mJoinProcessLowerGrp(pCtx, pJoin->probe, pProbeCol, &probeTs, &buildTs));
75!
1301
      } else {
1302
        MJ_ERR_JRET(mJoinProcessGreaterGrp(pCtx, pJoin->build, pBuildCol, &probeTs, &buildTs));
39!
1303
      }
1304

1305
      if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
228!
1306
        return pCtx->finBlk;
×
1307
      }
1308
    }
1309

1310
    if (pJoin->build->dsFetchDone && !FJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) {
244!
1311
      if (pCtx->lastEqGrp && pJoin->build->rowBitmapSize > 0) {
3!
1312
        MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx));
×
1313
        if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
1314
          return pCtx->finBlk;
×
1315
        }
1316
      }
1317
      
1318
      pCtx->probeNEqGrp.blk = pJoin->probe->blk;
3✔
1319
      pCtx->probeNEqGrp.beginIdx = pJoin->probe->blkRowIdx;
3✔
1320
      pCtx->probeNEqGrp.readIdx = pCtx->probeNEqGrp.beginIdx;
3✔
1321
      pCtx->probeNEqGrp.endIdx = pJoin->probe->blk->info.rows - 1;
3✔
1322
      
1323
      pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows;
3✔
1324
            
1325
      MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false));
3!
1326
      if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
6!
1327
        return pCtx->finBlk;
×
1328
      }
1329
    }
1330

1331
    if (pJoin->probe->dsFetchDone && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
244!
1332
      if (pCtx->lastEqGrp && pJoin->build->rowBitmapSize > 0) {
51!
1333
        MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx));
×
1334
        if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
1335
          return pCtx->finBlk;
×
1336
        }
1337
      }
1338

1339
      pCtx->buildNEqGrp.blk = pJoin->build->blk;
51✔
1340
      pCtx->buildNEqGrp.beginIdx = pJoin->build->blkRowIdx;
51✔
1341
      pCtx->buildNEqGrp.readIdx = pCtx->buildNEqGrp.beginIdx;
51✔
1342
      pCtx->buildNEqGrp.endIdx = pJoin->build->blk->info.rows - 1;
51✔
1343
      
1344
      pJoin->build->blkRowIdx = pJoin->build->blk->info.rows;
51✔
1345
            
1346
      MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->buildNEqGrp, false, false));
51!
1347
      if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
102!
1348
        return pCtx->finBlk;
×
1349
      }
1350
    }
1351

1352
  } while (true);
1353

1354
_return:
189✔
1355

1356
  if (code) {
189!
1357
    pJoin->errCode = code;
×
1358
    return NULL;
×
1359
  }
1360

1361
  return pCtx->finBlk;
189✔
1362
}
1363

1364

1365
static int32_t mSemiJoinHashGrpCartFilter(SMJoinMergeCtx* pCtx, SMJoinGrpRows* probeGrp) {
×
1366
  SMJoinTableCtx* probe = pCtx->pJoin->probe;
×
1367
  SMJoinTableCtx* build = pCtx->pJoin->build;
×
1368
  
1369
  do {
1370
    blockDataCleanup(pCtx->midBlk);
×
1371

1372
    MJ_ERR_RET(mJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build, NULL));
×
1373

1374
    if (pCtx->midBlk->info.rows > 0) {
×
1375
      MJ_ERR_RET(mJoinFilterAndKeepSingleRow(pCtx->midBlk, pCtx->pJoin->pPreFilter));
×
1376
    }
1377

1378
    if (pCtx->midBlk->info.rows <= 0) {
×
1379
      if (build->grpRowIdx < 0) {
×
1380
        break;
×
1381
      }
1382
      
1383
      continue;
×
1384
    }
1385

1386
    //A S S E R T(1 == pCtx->midBlk->info.rows);
1387
    MJ_ERR_RET(mJoinCopyMergeMidBlk(pCtx, &pCtx->midBlk, &pCtx->finBlk));
×
1388
    //A S S E R T(false == pCtx->midRemains);
1389
    
1390
    break;
×
1391
  } while (true);
1392

1393
  return TSDB_CODE_SUCCESS;
×
1394
}
1395

1396

1397
static int32_t mSemiJoinHashSeqCart(SMJoinMergeCtx* pCtx) {
×
1398
  SMJoinTableCtx* probe = pCtx->pJoin->probe;
×
1399
  SMJoinTableCtx* build = pCtx->pJoin->build;
×
1400
  SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0);
×
1401
  if (NULL == probeGrp) {
×
1402
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1403
  }
1404

1405
  size_t bufLen = 0;
×
1406
  int32_t probeEndIdx = probeGrp->endIdx;
×
1407
  for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk); probeGrp->readIdx++) {
×
1408
    if (mJoinCopyKeyColsDataToBuf(probe, probeGrp->readIdx, &bufLen)) {
×
1409
      continue;
×
1410
    }
1411

1412
    void* pGrp = tSimpleHashGet(build->pGrpHash, probe->keyData, bufLen);
×
1413
    if (NULL == pGrp) {
×
1414
      continue;
×
1415
    }
1416

1417
    build->pHashCurGrp = *(SArray**)pGrp;
×
1418
    build->grpRowIdx = 0;
×
1419

1420
    probeGrp->endIdx = probeGrp->readIdx;      
×
1421
    MJ_ERR_RET(mSemiJoinHashGrpCartFilter(pCtx, probeGrp));
×
1422
    probeGrp->endIdx = probeEndIdx;
×
1423
  }
1424

1425
  pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;
×
1426

1427
  return TSDB_CODE_SUCCESS;
×
1428
}
1429

1430

1431
static int32_t mSemiJoinHashFullCart(SMJoinMergeCtx* pCtx) {
×
1432
  SMJoinTableCtx* probe = pCtx->pJoin->probe;
×
1433
  SMJoinTableCtx* build = pCtx->pJoin->build;
×
1434
  SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
×
1435
  if (NULL == probeGrp) {
×
1436
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1437
  }
1438

1439
  size_t bufLen = 0;
×
1440

1441
  for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk); ++probeGrp->readIdx) {
×
1442
    if (mJoinCopyKeyColsDataToBuf(probe, probeGrp->readIdx, &bufLen)) {
×
1443
      continue;
×
1444
    }
1445

1446
    void* pGrp = tSimpleHashGet(build->pGrpHash, probe->keyData, bufLen);
×
1447
    if (NULL == pGrp) {
×
1448
      continue;
×
1449
    }
1450

1451
    build->pHashCurGrp = *(SArray**)pGrp;
×
1452
    //A S S E R T(1 == taosArrayGetSize(build->pHashCurGrp));
1453
    build->grpRowIdx = 0;
×
1454
    MJ_ERR_RET(mJoinHashGrpCart(pCtx->finBlk, probeGrp, true, probe, build, NULL));
×
1455
    //A S S E R T(build->grpRowIdx < 0);
1456
  }
1457

1458
  pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;
×
1459

1460
  return TSDB_CODE_SUCCESS;
×
1461
}
1462

1463

1464
static int32_t mSemiJoinMergeSeqCart(SMJoinMergeCtx* pCtx) {
438✔
1465
  SMJoinTableCtx* probe = pCtx->pJoin->probe;
438✔
1466
  SMJoinTableCtx* build = pCtx->pJoin->build;
438✔
1467
  SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
438✔
1468
  if (NULL == probeGrp) {
438!
1469
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1470
  }
1471

1472
  SMJoinGrpRows* buildGrp = NULL;
438✔
1473
  int32_t buildGrpNum = taosArrayGetSize(build->eqGrps);
438✔
1474
  int32_t probeEndIdx = probeGrp->endIdx;
438✔
1475
  int32_t rowsLeft = pCtx->midBlk->info.capacity;  
438✔
1476

1477
  do {
1478
    for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk); 
1,338!
1479
      ++probeGrp->readIdx, probeGrp->endIdx = probeEndIdx, build->grpIdx = 0) {
900✔
1480
      probeGrp->endIdx = probeGrp->readIdx;
900✔
1481
      
1482
      rowsLeft = pCtx->midBlk->info.capacity;
900✔
1483

1484
      blockDataCleanup(pCtx->midBlk);      
900✔
1485
      for (; build->grpIdx < buildGrpNum && rowsLeft > 0; ++build->grpIdx) {
1,800!
1486
        buildGrp = taosArrayGet(build->eqGrps, build->grpIdx);
900✔
1487
        if (NULL == buildGrp) {
900!
1488
          MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1489
        }
1490

1491
        if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) {
900!
1492
          MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp));
900!
1493
          rowsLeft -= GRP_REMAIN_ROWS(buildGrp);
900✔
1494
          buildGrp->readIdx = buildGrp->beginIdx;
900✔
1495
          continue;
900✔
1496
        }
1497
        
1498
        int32_t buildEndIdx = buildGrp->endIdx;
×
1499
        buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1;
×
1500
        //A S S E R T(buildGrp->endIdx >= buildGrp->readIdx);
1501
        MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp));
×
1502
        buildGrp->readIdx += rowsLeft;
×
1503
        buildGrp->endIdx = buildEndIdx;
×
1504
        break;
×
1505
      }
1506

1507
      if (pCtx->midBlk->info.rows > 0) {
900!
1508
        MJ_ERR_RET(mJoinFilterAndKeepSingleRow(pCtx->midBlk, pCtx->pJoin->pFPreFilter));
900!
1509
      } 
1510

1511
      if (0 == pCtx->midBlk->info.rows) {
900✔
1512
        if (build->grpIdx == buildGrpNum) {
324!
1513
          continue;
324✔
1514
        }
1515
      } else {
1516
        //A S S E R T(1 == pCtx->midBlk->info.rows);
1517
        MJ_ERR_RET(mJoinCopyMergeMidBlk(pCtx, &pCtx->midBlk, &pCtx->finBlk));
576!
1518
        //A S S E R T(false == pCtx->midRemains);
1519

1520
        if (build->grpIdx == buildGrpNum) {
576!
1521
          continue;
576✔
1522
        }
1523

1524
        buildGrp->readIdx = buildGrp->beginIdx;        
×
1525
        continue;
×
1526
      }
1527

1528
      //need break
1529

1530
      probeGrp->endIdx = probeEndIdx;
×
1531
      break;
×
1532
    }
1533

1534
    if (GRP_DONE(probeGrp) || BLK_IS_FULL(pCtx->finBlk)) {
438!
1535
      break;
1536
    }
1537
  } while (true);
1538

1539
  pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;
438✔
1540

1541
  return TSDB_CODE_SUCCESS;
438✔
1542
}
1543

1544

1545
static int32_t mSemiJoinMergeFullCart(SMJoinMergeCtx* pCtx) {
1,600✔
1546
  int32_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows;
1,600✔
1547
  SMJoinTableCtx* probe = pCtx->pJoin->probe;
1,600✔
1548
  SMJoinTableCtx* build = pCtx->pJoin->build;
1,600✔
1549
  SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0);
1,600✔
1550
  SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, 0);
1,600✔
1551
  if (NULL == buildGrp || NULL == probeGrp) {
1,600!
1552
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1553
  }
1554

1555
  int32_t probeRows = GRP_REMAIN_ROWS(probeGrp);
1,600✔
1556
  int32_t probeEndIdx = probeGrp->endIdx;
1,600✔
1557

1558
  //A S S E R T(1 == taosArrayGetSize(build->eqGrps));
1559
  //A S S E R T(buildGrp->beginIdx == buildGrp->endIdx);
1560

1561
  if (probeRows <= rowsLeft) {
1,600!
1562
    MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
1,600!
1563

1564
    pCtx->grpRemains = false;
1,600✔
1565
    return TSDB_CODE_SUCCESS;
1,600✔
1566
  }
1567

1568
  probeGrp->endIdx = probeGrp->readIdx + rowsLeft - 1;
×
1569
  MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
×
1570
  probeGrp->readIdx = probeGrp->endIdx + 1; 
×
1571
  probeGrp->endIdx = probeEndIdx;
×
1572

1573
  pCtx->grpRemains = true;
×
1574
  
1575
  return TSDB_CODE_SUCCESS;  
×
1576
}
1577

1578

1579
static int32_t mSemiJoinHashCart(SMJoinMergeCtx* pCtx) {
×
1580
  return (NULL == pCtx->pJoin->pPreFilter) ? mSemiJoinHashFullCart(pCtx) : mSemiJoinHashSeqCart(pCtx);
×
1581
}
1582

1583
static int32_t mSemiJoinMergeCart(SMJoinMergeCtx* pCtx) {
2,038✔
1584
  return (NULL == pCtx->pJoin->pFPreFilter) ? mSemiJoinMergeFullCart(pCtx) : mSemiJoinMergeSeqCart(pCtx);
2,038✔
1585
}
1586

1587
static FORCE_INLINE int32_t mSemiJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) {
1588
  return (pCtx->hashJoin) ? (*pCtx->hashCartFp)(pCtx) : (*pCtx->mergeCartFp)(pCtx);
×
1589
}
1590

1591

1592
SSDataBlock* mSemiJoinDo(struct SOperatorInfo* pOperator) {
489✔
1593
  SMJoinOperatorInfo* pJoin = pOperator->info;
489✔
1594
  SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
489✔
1595
  int32_t code = TSDB_CODE_SUCCESS;
489✔
1596
  int64_t probeTs = 0;
489✔
1597
  int64_t buildTs = 0;
489✔
1598
  SColumnInfoData* pBuildCol = NULL;
489✔
1599
  SColumnInfoData* pProbeCol = NULL;
489✔
1600

1601
  blockDataCleanup(pCtx->finBlk);
489✔
1602

1603
  if (pCtx->grpRemains) {
490!
1604
    MJ_ERR_JRET(mSemiJoinHandleGrpRemains(pCtx));
×
1605
    if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
1606
      return pCtx->finBlk;
×
1607
    }
1608
    pCtx->grpRemains = false;
×
1609
  }
1610

1611
  do {
1612
    if (!mInnerJoinRetrieve(pOperator, pJoin, pCtx)) {
956✔
1613
      break;
448✔
1614
    }
1615

1616
    MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
508!
1617
    MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
508!
1618
    
1619
    if (probeTs == pCtx->lastEqTs) {
508!
1620
      MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, true));
×
1621
      if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
1622
        return pCtx->finBlk;
×
1623
      }
1624

1625
      if (MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) || MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
×
1626
        continue;
×
1627
      } 
1628

1629
      MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
×
1630
    } else if (MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
508!
1631
      mJoinSetDone(pOperator);
42✔
1632
      break;
42✔
1633
    }
1634

1635
    do {
1636
      if (probeTs == buildTs) {
2,404✔
1637
        pCtx->lastEqTs = probeTs;
2,038✔
1638
        MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false));
2,038!
1639
        if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
4,076!
1640
          return pCtx->finBlk;
×
1641
        }
1642

1643
        if (MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) || MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
2,038!
1644
          break;
1645
        }
1646
        
1647
        MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
1,623!
1648
        MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
1,623!
1649
        continue;
1,623✔
1650
      }
1651

1652
      if (PROBE_TS_NMATCH(pCtx->ascTs, probeTs, buildTs)) {
366✔
1653
        if (++pJoin->probe->blkRowIdx < pJoin->probe->blk->info.rows) {
228✔
1654
          MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
183✔
1655
          continue;
183✔
1656
        }
1657
      } else {
1658
        if (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) {
138✔
1659
          MJOIN_GET_TB_CUR_TS(pBuildCol, buildTs, pJoin->build);
132✔
1660
          continue;
132✔
1661
        }
1662
      }
1663
      
1664
      break;
51✔
1665
    } while (true);
1666
  } while (true);
1667

1668
_return:
490✔
1669

1670
  if (code) {
490!
1671
    pJoin->errCode = code;
×
1672
    return NULL;
×
1673
  }
1674

1675
  return pCtx->finBlk;
490✔
1676
}
1677

1678

1679
static FORCE_INLINE int32_t mAntiJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) {
1680
  if (pCtx->lastEqGrp) {
×
1681
    return (pCtx->hashJoin) ? (*pCtx->hashCartFp)(pCtx) : (*pCtx->mergeCartFp)(pCtx);
×
1682
  }
1683
  
1684
  return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false);
×
1685
}
1686

1687
static int32_t mAntiJoinHashFullCart(SMJoinMergeCtx* pCtx) {
×
1688
  SMJoinTableCtx* probe = pCtx->pJoin->probe;
×
1689
  SMJoinTableCtx* build = pCtx->pJoin->build;
×
1690
  SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
×
1691
  if (NULL == probeGrp) {
×
1692
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1693
  }
1694

1695
  size_t bufLen = 0;
×
1696
  int32_t probeEndIdx = probeGrp->endIdx;
×
1697

1698
  for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk); ++probeGrp->readIdx) {
×
1699
    if (mJoinCopyKeyColsDataToBuf(probe, probeGrp->readIdx, &bufLen)) {
×
1700
      probeGrp->endIdx = probeGrp->readIdx;
×
1701
      MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true));
×
1702
      probeGrp->endIdx = probeEndIdx;
×
1703
      continue;
×
1704
    }
1705

1706
    void* pGrp = tSimpleHashGet(build->pGrpHash, probe->keyData, bufLen);
×
1707
    if (NULL == pGrp) {
×
1708
      probeGrp->endIdx = probeGrp->readIdx;
×
1709
      MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true));
×
1710
      probeGrp->endIdx = probeEndIdx;
×
1711
    }
1712
  }
1713

1714
  pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;
×
1715

1716
  return TSDB_CODE_SUCCESS;
×
1717
}
1718

1719

1720
static int32_t mAntiJoinHashGrpCartFilter(SMJoinMergeCtx* pCtx, SMJoinGrpRows* probeGrp) {
×
1721
  SMJoinTableCtx* probe = pCtx->pJoin->probe;
×
1722
  SMJoinTableCtx* build = pCtx->pJoin->build;
×
1723
  
1724
  do {
1725
    blockDataCleanup(pCtx->midBlk);
×
1726

1727
    MJ_ERR_RET(mJoinHashGrpCart(pCtx->midBlk, probeGrp, true, probe, build, NULL));
×
1728

1729
    if (pCtx->midBlk->info.rows > 0) {
×
1730
      MJ_ERR_RET(mJoinFilterAndNoKeepRows(pCtx->midBlk, pCtx->pJoin->pPreFilter));
×
1731
    } 
1732

1733
    if (pCtx->midBlk->info.rows) {
×
1734
      break;
×
1735
    }
1736
    
1737
    if (build->grpRowIdx < 0) {
×
1738
      MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true));
×
1739
      break;
×
1740
    }
1741
    
1742
    continue;
×
1743
  } while (true);
1744

1745
  return TSDB_CODE_SUCCESS;
×
1746
}
1747

1748

1749
static int32_t mAntiJoinHashSeqCart(SMJoinMergeCtx* pCtx) {
×
1750
  SMJoinTableCtx* probe = pCtx->pJoin->probe;
×
1751
  SMJoinTableCtx* build = pCtx->pJoin->build;
×
1752
  SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, 0);
×
1753
  if (NULL == probeGrp) {
×
1754
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1755
  }
1756

1757
  size_t bufLen = 0;
×
1758
  int32_t probeEndIdx = probeGrp->endIdx;
×
1759

1760
  for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk); probeGrp->readIdx++) {
×
1761
    if (mJoinCopyKeyColsDataToBuf(probe, probeGrp->readIdx, &bufLen)) {
×
1762
      probeGrp->endIdx = probeGrp->readIdx;
×
1763
      MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true));
×
1764
      probeGrp->endIdx = probeEndIdx;
×
1765
      continue;
×
1766
    }
1767

1768
    void* pGrp = tSimpleHashGet(build->pGrpHash, probe->keyData, bufLen);
×
1769
    if (NULL == pGrp) {
×
1770
      probeGrp->endIdx = probeGrp->readIdx;
×
1771
      MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true));
×
1772
      probeGrp->endIdx = probeEndIdx;
×
1773
      continue;
×
1774
    }
1775

1776
    build->pHashCurGrp = *(SArray**)pGrp;
×
1777
    build->grpRowIdx = 0;
×
1778

1779
    probeGrp->endIdx = probeGrp->readIdx;      
×
1780
    MJ_ERR_RET(mAntiJoinHashGrpCartFilter(pCtx, probeGrp));
×
1781
    probeGrp->endIdx = probeEndIdx;
×
1782
  }
1783

1784
  pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;
×
1785

1786
  return TSDB_CODE_SUCCESS;
×
1787
}
1788

1789
static int32_t mAntiJoinMergeFullCart(SMJoinMergeCtx* pCtx) {
1,474✔
1790
  return TSDB_CODE_SUCCESS;
1,474✔
1791
}
1792

1793
static int32_t mAntiJoinMergeSeqCart(SMJoinMergeCtx* pCtx) {
534✔
1794
  SMJoinTableCtx* probe = pCtx->pJoin->probe;
534✔
1795
  SMJoinTableCtx* build = pCtx->pJoin->build;
534✔
1796
  SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
534✔
1797
  if (NULL == probeGrp) {
534!
1798
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1799
  }
1800

1801
  SMJoinGrpRows* buildGrp = NULL;
534✔
1802
  int32_t buildGrpNum = taosArrayGetSize(build->eqGrps);
534✔
1803
  int32_t probeEndIdx = probeGrp->endIdx;
534✔
1804
  int32_t rowsLeft = pCtx->midBlk->info.capacity;  
534✔
1805

1806
  do {
1807
    for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk); 
1,584!
1808
      ++probeGrp->readIdx, probeGrp->endIdx = probeEndIdx, build->grpIdx = 0) {
1,050✔
1809
      probeGrp->endIdx = probeGrp->readIdx;
1,050✔
1810
      
1811
      rowsLeft = pCtx->midBlk->info.capacity;
1,050✔
1812

1813
      blockDataCleanup(pCtx->midBlk);      
1,050✔
1814
      for (; build->grpIdx < buildGrpNum && rowsLeft > 0; ++build->grpIdx) {
2,100!
1815
        buildGrp = taosArrayGet(build->eqGrps, build->grpIdx);
1,050✔
1816
        if (NULL == buildGrp) {
1,050!
1817
          MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1818
        }
1819

1820
        if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) {
1,050!
1821
          MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp));
1,050!
1822
          rowsLeft -= GRP_REMAIN_ROWS(buildGrp);
1,050✔
1823
          buildGrp->readIdx = buildGrp->beginIdx;
1,050✔
1824
          continue;
1,050✔
1825
        }
1826
        
1827
        int32_t buildEndIdx = buildGrp->endIdx;
×
1828
        buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1;
×
1829
        //A S S E R T(buildGrp->endIdx >= buildGrp->readIdx);
1830
        MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->midBlk, true, probeGrp, buildGrp));
×
1831
        buildGrp->readIdx += rowsLeft;
×
1832
        buildGrp->endIdx = buildEndIdx;
×
1833
        break;
×
1834
      }
1835

1836
      if (pCtx->midBlk->info.rows > 0) {
1,050!
1837
        MJ_ERR_RET(mJoinFilterAndNoKeepRows(pCtx->midBlk, pCtx->pJoin->pFPreFilter));
1,050!
1838
      } 
1839

1840
      if (pCtx->midBlk->info.rows > 0) {
1,050✔
1841
        if (build->grpIdx < buildGrpNum) {
609!
1842
          buildGrp->readIdx = buildGrp->beginIdx;        
×
1843
        }
1844

1845
        continue;
609✔
1846
      }
1847
      
1848
      if (build->grpIdx >= buildGrpNum) {
441!
1849
        MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, true));
441!
1850
        continue;
441✔
1851
      }
1852

1853
      //need break
1854

1855
      probeGrp->endIdx = probeEndIdx;
×
1856
      break;
×
1857
    }
1858

1859
    if (GRP_DONE(probeGrp) || BLK_IS_FULL(pCtx->finBlk)) {
534!
1860
      break;
1861
    }
1862
  } while (true);
1863

1864
  pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;
534✔
1865

1866
  return TSDB_CODE_SUCCESS;
534✔
1867
}
1868

1869

1870
static int32_t mAntiJoinHashCart(SMJoinMergeCtx* pCtx) {
×
1871
  return (NULL == pCtx->pJoin->pPreFilter) ? mAntiJoinHashFullCart(pCtx) : mAntiJoinHashSeqCart(pCtx);
×
1872
}
1873

1874
static int32_t mAntiJoinMergeCart(SMJoinMergeCtx* pCtx) {
2,008✔
1875
  return (NULL == pCtx->pJoin->pFPreFilter) ? mAntiJoinMergeFullCart(pCtx) : mAntiJoinMergeSeqCart(pCtx);
2,008✔
1876
}
1877

1878
SSDataBlock* mAntiJoinDo(struct SOperatorInfo* pOperator) {
478✔
1879
  SMJoinOperatorInfo* pJoin = pOperator->info;
478✔
1880
  SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
478✔
1881
  int32_t code = TSDB_CODE_SUCCESS;
478✔
1882
  int64_t probeTs = 0;
478✔
1883
  int64_t buildTs = 0;
478✔
1884
  SColumnInfoData* pBuildCol = NULL;
478✔
1885
  SColumnInfoData* pProbeCol = NULL;
478✔
1886

1887
  blockDataCleanup(pCtx->finBlk);
478✔
1888

1889
  if (pCtx->grpRemains) {
478!
1890
    MJ_ERR_JRET(mAntiJoinHandleGrpRemains(pCtx));
×
1891
    if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
1892
      return pCtx->finBlk;
×
1893
    }
1894
    pCtx->grpRemains = false;
×
1895
  }
1896

1897
  do {
1898
    if (!mLeftJoinRetrieve(pOperator, pJoin, pCtx)) {
962✔
1899
      break;
478✔
1900
    }
1901

1902
    MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
484!
1903
    MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
484!
1904
    
1905
    if (probeTs == pCtx->lastEqTs) {
484!
1906
      MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, true));
×
1907
      if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
1908
        return pCtx->finBlk;
×
1909
      }
1910

1911
      if (MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) {
×
1912
        continue;
×
1913
      } else {
1914
        MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
×
1915
      }
1916
    }
1917

1918
    while (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
2,657✔
1919
      if (probeTs == buildTs) {
2,173✔
1920
        pCtx->lastEqTs = probeTs;
2,008✔
1921
        MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false));
2,008!
1922
        if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
4,016!
1923
          return pCtx->finBlk;
×
1924
        }
1925

1926
        MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
2,008!
1927
        MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
2,008!
1928
      } else if (PROBE_TS_NMATCH(pCtx->ascTs, probeTs, buildTs)) {
165✔
1929
        MJ_ERR_JRET(mJoinProcessLowerGrp(pCtx, pJoin->probe, pProbeCol, &probeTs, &buildTs));
99!
1930
        if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
198!
1931
          return pCtx->finBlk;
×
1932
        }
1933
      } else {
1934
        while (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) {
66✔
1935
          MJOIN_GET_TB_CUR_TS(pBuildCol, buildTs, pJoin->build);
60✔
1936
          if (PROBE_TS_NREACH(pCtx->ascTs, probeTs, buildTs)) {
60!
1937
            continue;
×
1938
          }
1939
          
1940
          break;
60✔
1941
        }
1942
      }
1943
    }
1944

1945
    if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && pJoin->build->dsFetchDone) {
484✔
1946
      pCtx->probeNEqGrp.blk = pJoin->probe->blk;
51✔
1947
      pCtx->probeNEqGrp.beginIdx = pJoin->probe->blkRowIdx;
51✔
1948
      pCtx->probeNEqGrp.readIdx = pCtx->probeNEqGrp.beginIdx;
51✔
1949
      pCtx->probeNEqGrp.endIdx = pJoin->probe->blk->info.rows - 1;
51✔
1950
      
1951
      pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows;
51✔
1952
            
1953
      MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false));
51!
1954
      if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
102!
1955
        return pCtx->finBlk;
×
1956
      }
1957
    }
1958
  } while (true);
1959

1960
_return:
478✔
1961

1962
  if (code) {
478!
1963
    pJoin->errCode = code;
×
1964
    return NULL;
×
1965
  }
1966

1967
  return pCtx->finBlk;
478✔
1968
}
1969

1970

1971
int32_t mAsofBackwardCalcRowNum(SMJoinWinCache* pCache, int64_t jLimit, int32_t newRows, int32_t* evictRows) {
1,869✔
1972
  if (pCache->outBlk->info.rows <= 0) {
1,869✔
1973
    *evictRows = 0;
693✔
1974
    return TMIN(jLimit, newRows);
693✔
1975
  }
1976

1977
  if ((pCache->outBlk->info.rows + newRows) <= jLimit) {
1,176✔
1978
    *evictRows = 0;
27✔
1979
    return newRows;
27✔
1980
  }
1981

1982
  if (newRows >= jLimit) {
1,149✔
1983
    *evictRows = pCache->outBlk->info.rows;
1,110✔
1984
    return jLimit;
1,110✔
1985
  }
1986

1987
  *evictRows = pCache->outBlk->info.rows + newRows - jLimit;
39✔
1988
  return newRows;
39✔
1989
}
1990

1991
int32_t mAsofBackwardAddRowsToCache(SMJoinWindowCtx* pCtx, SMJoinGrpRows* pGrp, bool fromBegin) {
1,869✔
1992
  int32_t evictRows = 0;
1,869✔
1993
  SMJoinWinCache* pCache = &pCtx->cache;
1,869✔
1994
  int32_t rows = mAsofBackwardCalcRowNum(pCache, pCtx->jLimit, pGrp->endIdx - pGrp->beginIdx + 1, &evictRows);
1,869✔
1995
  if (evictRows > 0) {
1,869✔
1996
    MJ_ERR_RET(blockDataTrimFirstRows(pCache->outBlk, evictRows));
1,149!
1997
  }
1998

1999
  int32_t startIdx = fromBegin ? pGrp->beginIdx : pGrp->endIdx - rows + 1;
1,869✔
2000
  return blockDataMergeNRows(pCache->outBlk, pGrp->blk, startIdx, rows);
1,869✔
2001
}
2002

2003

2004
int32_t mAsofBackwardAddEqRowsToCache(struct SOperatorInfo* pOperator, SMJoinWindowCtx* pCtx, SMJoinTableCtx* pTable, int64_t timestamp) {
1,731✔
2005
  int64_t eqRowsNum = 0;
1,731✔
2006
  SMJoinGrpRows grp;
2007

2008
  do {
×
2009
      grp.blk = pTable->blk;
1,731✔
2010
      
2011
      SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCtx.targetSlotId);
1,731✔
2012
      if (NULL == pCol) {
1,731!
2013
        MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
2014
      }
2015

2016
      if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) {
1,731!
2017
        return TSDB_CODE_SUCCESS;
×
2018
      }
2019

2020
      grp.beginIdx = pTable->blkRowIdx;
1,731✔
2021
      
2022
      char* pEndVal = colDataGetNumData(pCol, pTable->blk->info.rows - 1);
1,731✔
2023
      if (timestamp != *(int64_t*)pEndVal) {
1,731✔
2024
        for (; pTable->blkRowIdx < pTable->blk->info.rows; ++pTable->blkRowIdx) {
2,679!
2025
          char* pNextVal = colDataGetNumData(pCol, pTable->blkRowIdx);
2,679✔
2026
          if (timestamp == *(int64_t*)pNextVal) {
2,679✔
2027
            continue;
1,512✔
2028
          }
2029

2030
          break;
1,167✔
2031
        }
2032

2033
        grp.endIdx = pTable->blkRowIdx - 1;
1,167✔
2034
      } else {
2035
        grp.endIdx = pTable->blk->info.rows - 1;
564✔
2036
        pTable->blkRowIdx = pTable->blk->info.rows;
564✔
2037
      }
2038

2039
      if (eqRowsNum < pCtx->jLimit) {
1,731!
2040
        grp.endIdx = grp.beginIdx + TMIN(grp.endIdx - grp.beginIdx + 1, pCtx->jLimit - eqRowsNum) - 1;
1,731✔
2041
        MJ_ERR_RET(mAsofBackwardAddRowsToCache(pCtx, &grp, true));
1,731!
2042
      }
2043
      
2044
      eqRowsNum += grp.endIdx - grp.beginIdx + 1;
1,731✔
2045

2046
    if (pTable->blkRowIdx == pTable->blk->info.rows && !pTable->dsFetchDone) {
1,731!
2047
      pTable->blk = (*pCtx->pJoin->retrieveFp)(pCtx->pJoin, pTable);
564✔
2048
      qDebug("%s merge join %s table got block for same ts, rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), MJOIN_TBTYPE(pTable->type), pTable->blk ? pTable->blk->info.rows : 0);
564!
2049

2050
      pTable->blkRowIdx = 0;
564✔
2051
      pCtx->buildGrp.blk = pTable->blk;
564✔
2052

2053
      if (NULL == pTable->blk) {
564!
2054
        break;
564✔
2055
      }    
2056
    } else {
2057
      break;
2058
    }
2059
  } while (true);
2060

2061
  return TSDB_CODE_SUCCESS;
1,731✔
2062
}
2063

2064
int32_t mAsofBackwardDumpGrpCache(SMJoinWindowCtx* pCtx) {
1,992✔
2065
  if (NULL == pCtx->cache.outBlk || pCtx->cache.outBlk->info.rows <= 0) {
1,992✔
2066
    return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeGrp, true, false);
147✔
2067
  }
2068

2069
  int32_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows;
1,845✔
2070
  SMJoinGrpRows* probeGrp = &pCtx->probeGrp;
1,845✔
2071
  SMJoinGrpRows buildGrp = {.blk = pCtx->cache.outBlk, .readIdx = pCtx->cache.outRowIdx, .endIdx = pCtx->cache.outBlk->info.rows - 1};
1,845✔
2072
  int32_t probeRows = GRP_REMAIN_ROWS(probeGrp);
1,845✔
2073
  int32_t probeEndIdx = probeGrp->endIdx;
1,845✔
2074
  int64_t totalResRows = (0 == pCtx->cache.outRowIdx) ? (probeRows * pCtx->cache.outBlk->info.rows) : 
1,845!
2075
    (pCtx->cache.outBlk->info.rows - pCtx->cache.outRowIdx + (probeRows - 1) * pCtx->cache.outBlk->info.rows);
×
2076

2077
  if (totalResRows <= rowsLeft) {
1,845!
2078
    if (0 == pCtx->cache.outRowIdx) {
1,845!
2079
      MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp));
1,845!
2080

2081
      pCtx->grpRemains = false;
1,845✔
2082
      pCtx->cache.outRowIdx = 0;
1,845✔
2083
      return TSDB_CODE_SUCCESS;
1,845✔
2084
    }
2085

2086
    probeGrp->endIdx = probeGrp->readIdx;
×
2087
    MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp));
×
2088
    if (++probeGrp->readIdx <= probeEndIdx) {
×
2089
      probeGrp->endIdx = probeEndIdx;
×
2090
      buildGrp.readIdx = 0;
×
2091
      MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp));
×
2092
    }
2093
    
2094
    pCtx->grpRemains = false;
×
2095
    pCtx->cache.outRowIdx = 0;
×
2096
    return TSDB_CODE_SUCCESS;
×
2097
  }
2098

2099
  for (; !GRP_DONE(probeGrp) && rowsLeft > 0; ) {
×
2100
    if (0 == pCtx->cache.outRowIdx) {
×
2101
      int32_t grpNum = rowsLeft / pCtx->cache.outBlk->info.rows;
×
2102
      if (grpNum > 0) {
×
2103
        probeGrp->endIdx = probeGrp->readIdx + grpNum - 1;
×
2104
        buildGrp.readIdx = 0;
×
2105
        MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp));
×
2106
        rowsLeft -= grpNum * pCtx->cache.outBlk->info.rows;
×
2107
        probeGrp->readIdx += grpNum;
×
2108
        probeGrp->endIdx = probeEndIdx;
×
2109
        continue;
×
2110
      }
2111
    }
2112
    
2113
    probeGrp->endIdx = probeGrp->readIdx;
×
2114
    buildGrp.readIdx = pCtx->cache.outRowIdx;
×
2115
    
2116
    int32_t grpRemainRows = pCtx->cache.outBlk->info.rows - pCtx->cache.outRowIdx;
×
2117
    if (rowsLeft >= grpRemainRows) {
×
2118
      MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp));
×
2119
      rowsLeft -= grpRemainRows;
×
2120
      pCtx->cache.outRowIdx = 0;
×
2121
      probeGrp->readIdx++;
×
2122
      probeGrp->endIdx = probeEndIdx;
×
2123
      continue;
×
2124
    }
2125
    
2126
    buildGrp.endIdx = buildGrp.readIdx + rowsLeft - 1;
×
2127
    MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp));
×
2128
    pCtx->cache.outRowIdx += rowsLeft;
×
2129
    break;
×
2130
  }
2131

2132
  probeGrp->endIdx = probeEndIdx;
×
2133
  pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;
×
2134
  
2135
  return TSDB_CODE_SUCCESS;  
×
2136
}
2137

2138
int32_t mAsofBackwardDumpUpdateEqRows(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, bool lastBuildGrp, bool skipEqPost) {
1,797✔
2139
  if (!pCtx->eqRowsAcq) {
1,797✔
2140
    MJ_ERR_RET(mAsofBackwardDumpGrpCache(pCtx));
261!
2141

2142
    pCtx->lastEqGrp = true;
261✔
2143
    if (pCtx->grpRemains) {
261!
2144
      return TSDB_CODE_SUCCESS;
×
2145
    }
2146
  }
2147

2148
  if (!pCtx->eqPostDone && !lastBuildGrp && (pCtx->eqRowsAcq || !skipEqPost)) {
1,797!
2149
    pCtx->eqPostDone = true;
1,731✔
2150
    MJ_ERR_RET(mAsofBackwardAddEqRowsToCache(pJoin->pOperator, pCtx, pJoin->build, pCtx->lastTs));
1,731!
2151
  }
2152

2153
  if (!pCtx->eqRowsAcq) {
1,797✔
2154
    return TSDB_CODE_SUCCESS;
261✔
2155
  }
2156

2157
  MJ_ERR_RET(mAsofBackwardDumpGrpCache(pCtx));
1,536!
2158

2159
  pCtx->lastEqGrp = true;
1,536✔
2160

2161
  return TSDB_CODE_SUCCESS;
1,536✔
2162
}
2163

2164
int32_t mAsofBackwardProcessEqualGrp(SMJoinWindowCtx* pCtx, int64_t timestamp, bool lastBuildGrp) {
1,797✔
2165
  SMJoinOperatorInfo* pJoin = pCtx->pJoin;
1,797✔
2166

2167
  if (!lastBuildGrp) {
1,797!
2168
    pCtx->eqPostDone = false;
1,797✔
2169
  }
2170

2171
  bool wholeBlk = false;
1,797✔
2172
  MJ_ERR_RET(mJoinBuildEqGrp(pJoin->probe, timestamp, &wholeBlk, &pCtx->probeGrp));
1,797!
2173

2174
  MJ_ERR_RET(mAsofBackwardDumpUpdateEqRows(pCtx, pJoin, lastBuildGrp, wholeBlk));
1,797!
2175
  
2176
  return TSDB_CODE_SUCCESS;
1,797✔
2177
}
2178

2179

2180
int32_t mAsofBackwardHandleClosedGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, SColumnInfoData* pCol,  int64_t* probeTs, int64_t* buildTs) {
117✔
2181
  pCtx->lastEqGrp = false;
117✔
2182
  
2183
  pCtx->probeGrp.beginIdx = pJoin->probe->blkRowIdx;
117✔
2184
  pCtx->probeGrp.readIdx = pCtx->probeGrp.beginIdx;
117✔
2185
  pCtx->probeGrp.endIdx = pCtx->probeGrp.beginIdx;
117✔
2186
  
2187
  while (++pJoin->probe->blkRowIdx < pJoin->probe->blk->info.rows) {
168✔
2188
    MJOIN_GET_TB_CUR_TS(pCol, *probeTs, pJoin->probe);
114✔
2189
    if (PROBE_TS_NMATCH(pCtx->ascTs, *probeTs, *buildTs)) {
114!
2190
      pCtx->probeGrp.endIdx = pJoin->probe->blkRowIdx;
51✔
2191
      continue;
51✔
2192
    }
2193
    
2194
    break;
63✔
2195
  }
2196

2197
  return mAsofBackwardDumpGrpCache(pCtx);
117✔
2198
}
2199

2200
int32_t mAsofBackwardHandleUnclosedGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, SColumnInfoData* pCol,  int64_t* probeTs, int64_t* buildTs) {
138✔
2201
  pCtx->lastEqGrp = false;
138✔
2202

2203
  pCtx->buildGrp.beginIdx = pJoin->build->blkRowIdx;
138✔
2204
  pCtx->buildGrp.readIdx = pCtx->buildGrp.beginIdx;
138✔
2205
  pCtx->buildGrp.endIdx = pCtx->buildGrp.beginIdx;
138✔
2206
  
2207
  while (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) {
174✔
2208
    MJOIN_GET_TB_CUR_TS(pCol, *buildTs, pJoin->build);
126✔
2209
    if (PROBE_TS_NREACH(pCtx->ascTs, *probeTs, *buildTs)) {
126!
2210
      pCtx->buildGrp.endIdx = pJoin->build->blkRowIdx;
36✔
2211
      continue;
36✔
2212
    }
2213
    
2214
    break;
90✔
2215
  }
2216

2217
  pCtx->probeGrp.beginIdx = pJoin->probe->blkRowIdx;
138✔
2218
  pCtx->probeGrp.readIdx = pCtx->probeGrp.beginIdx;
138✔
2219
  pCtx->probeGrp.endIdx = pCtx->probeGrp.beginIdx;
138✔
2220

2221
  return mAsofBackwardAddRowsToCache(pCtx, &pCtx->buildGrp, false);
138✔
2222
}
2223

2224
int32_t mAsofBackwardHandleGrpRemains(SMJoinWindowCtx* pCtx) {
×
2225
  return (pCtx->lastEqGrp) ? mAsofBackwardDumpUpdateEqRows(pCtx, pCtx->pJoin, false, true) : mAsofBackwardDumpGrpCache(pCtx);
×
2226
}
2227

2228
static int32_t mAsofBackwardRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx, bool* newBlock) {
1,578✔
2229
  *newBlock = false;
1,578✔
2230
  
2231
  bool probeGot = mJoinRetrieveBlk(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);
1,578✔
2232
  bool buildGot = false;
1,578✔
2233

2234
  do {
2235
    if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) {  
1,578!
2236
      buildGot = mJoinRetrieveBlk(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
813✔
2237
    }
2238
    
2239
    if (!probeGot) {
1,578✔
2240
      if (!pCtx->groupJoin || NULL == pJoin->probe->remainInBlk) {
765✔
2241
        mJoinSetDone(pOperator);
372✔
2242
      }
2243

2244
      return TSDB_CODE_SUCCESS;
765✔
2245
    }
2246
    
2247
    break;
813✔
2248
  } while (true);
2249

2250
  if (buildGot && NULL == pCtx->cache.outBlk) {
813✔
2251
    pCtx->cache.outBlk = NULL;
369✔
2252
    int32_t code = createOneDataBlock(pJoin->build->blk, false, &pCtx->cache.outBlk);
369✔
2253
    if (code) {
369!
2254
      MJ_ERR_RET(code);
×
2255
    }
2256

2257
    MJ_ERR_RET(blockDataEnsureCapacity(pCtx->cache.outBlk, pCtx->jLimit));
369!
2258
  }
2259

2260
  pCtx->probeGrp.blk = pJoin->probe->blk;
813✔
2261
  pCtx->buildGrp.blk = pJoin->build->blk;
813✔
2262

2263
  *newBlock = true;
813✔
2264
  
2265
  return TSDB_CODE_SUCCESS;
813✔
2266
}
2267

2268

2269
SSDataBlock* mAsofBackwardJoinDo(struct SOperatorInfo* pOperator) {
765✔
2270
  SMJoinOperatorInfo* pJoin = pOperator->info;
765✔
2271
  SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx;
765✔
2272
  int32_t code = TSDB_CODE_SUCCESS;
765✔
2273
  int64_t probeTs = 0;
765✔
2274
  int64_t buildTs = 0;
765✔
2275
  SColumnInfoData* pBuildCol = NULL;
765✔
2276
  SColumnInfoData* pProbeCol = NULL;
765✔
2277
  bool newBlock = false;
765✔
2278

2279
  blockDataCleanup(pCtx->finBlk);
765✔
2280

2281
  if (pCtx->grpRemains) {
765!
2282
    MJ_ERR_JRET(mAsofBackwardHandleGrpRemains(pCtx));
×
2283
    if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
2284
      return pCtx->finBlk;
×
2285
    }
2286
    pCtx->grpRemains = false;
×
2287
  }
2288

2289
  do {
2290
    MJ_ERR_JRET(mAsofBackwardRetrieve(pOperator, pJoin, pCtx, &newBlock));
1,578!
2291
    if (!newBlock) {
1,578✔
2292
      if (pCtx->groupJoin && pCtx->finBlk->info.rows <= 0 && !mJoinIsDone(pOperator)) {
765!
2293
        continue;
×
2294
      }
2295
      
2296
      break;
765✔
2297
    }
2298

2299
    MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
813!
2300
    MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
813!
2301
    
2302
    if (probeTs == pCtx->lastTs) {
813!
2303
      MJ_ERR_JRET(mAsofBackwardProcessEqualGrp(pCtx, probeTs, true));
×
2304
      if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
2305
        return pCtx->finBlk;
×
2306
      }
2307

2308
      if (MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) {
×
2309
        continue;
×
2310
      } else {
2311
        MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
×
2312
      }
2313
    }
2314

2315
    if (pCtx->lastEqGrp && !pCtx->eqPostDone) {
813!
2316
      pCtx->eqPostDone = true;
×
2317
      MJ_ERR_JRET(mAsofBackwardAddEqRowsToCache(pJoin->pOperator, pCtx, pJoin->build, pCtx->lastTs));
×
2318
      MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
×
2319
    }
2320

2321
    while (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
2,865✔
2322
      if (probeTs == buildTs) {
2,052✔
2323
        pCtx->lastTs = probeTs;
1,797✔
2324
        MJ_ERR_JRET(mAsofBackwardProcessEqualGrp(pCtx, probeTs, false));
1,797!
2325
        if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
3,594!
2326
          return pCtx->finBlk;
×
2327
        }
2328

2329
        MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
1,797!
2330
        MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
1,797!
2331
        continue;
1,797✔
2332
      }
2333

2334
      if (PROBE_TS_NMATCH(pCtx->ascTs, probeTs, buildTs)) {
255!
2335
        MJ_ERR_JRET(mAsofBackwardHandleClosedGrp(pCtx, pJoin, pProbeCol, &probeTs, &buildTs));
117!
2336
      } else {
2337
        MJ_ERR_JRET(mAsofBackwardHandleUnclosedGrp(pCtx, pJoin, pBuildCol, &probeTs, &buildTs));
138!
2338
      }
2339

2340
      if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
510!
2341
        return pCtx->finBlk;
×
2342
      }
2343
    }
2344

2345
    if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && MJOIN_TB_GRP_ROWS_DONE(pJoin->build, pCtx->groupJoin)) {
813!
2346
      pCtx->probeGrp.beginIdx = pJoin->probe->blkRowIdx;
78✔
2347
      pCtx->probeGrp.readIdx = pCtx->probeGrp.beginIdx;
78✔
2348
      pCtx->probeGrp.endIdx = pJoin->probe->blk->info.rows - 1;
78✔
2349
      
2350
      MJ_ERR_JRET(mAsofBackwardDumpGrpCache(pCtx));
78!
2351
      pCtx->lastEqGrp = false;
78✔
2352
      
2353
      pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows;
78✔
2354
            
2355
      if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
156!
2356
        return pCtx->finBlk;
×
2357
      }
2358
    }
2359
  } while (true);
2360

2361
_return:
765✔
2362

2363
  if (code) {
765!
2364
    pJoin->errCode = code;
×
2365
    return NULL;
×
2366
  }
2367

2368
  return pCtx->finBlk;
765✔
2369
}
2370

2371
int32_t mAsofForwardTrimCacheBlk(SMJoinWindowCtx* pCtx) {
162✔
2372
  if (taosArrayGetSize(pCtx->cache.grps) <= 0) {
162✔
2373
    return TSDB_CODE_SUCCESS;
15✔
2374
  }
2375
  
2376
  SMJoinGrpRows* pGrp = taosArrayGet(pCtx->cache.grps, 0);
147✔
2377
  if (NULL == pGrp) {
147!
2378
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
2379
  }
2380

2381
  if (pGrp->blk == pCtx->cache.outBlk && pCtx->pJoin->build->blkRowIdx > 0) {
147!
2382
    MJ_ERR_RET(blockDataTrimFirstRows(pGrp->blk, pCtx->pJoin->build->blkRowIdx));
×
2383
    pCtx->pJoin->build->blkRowIdx = 0;
×
2384
    //A S S E R T(pCtx->pJoin->build->blk == pGrp->blk);
2385
    MJOIN_SAVE_TB_BLK(&pCtx->cache, pCtx->pJoin->build);
×
2386
  }
2387

2388
  return TSDB_CODE_SUCCESS;
147✔
2389
}
2390

2391
int32_t mAsofForwardChkFillGrpCache(SMJoinWindowCtx* pCtx) {
495✔
2392
  if (pCtx->cache.rowNum >= pCtx->jLimit || pCtx->pJoin->build->dsFetchDone) {
495✔
2393
    return TSDB_CODE_SUCCESS;
333✔
2394
  }
2395

2396
  MJ_ERR_RET(mAsofForwardTrimCacheBlk(pCtx));
162!
2397

2398
  SMJoinTableCtx* build = pCtx->pJoin->build;
162✔
2399
  SMJoinWinCache* pCache = &pCtx->cache;
162✔
2400
  int32_t grpNum = taosArrayGetSize(pCache->grps);
162✔
2401
  if (grpNum >= 1) {
162✔
2402
    SMJoinGrpRows* pGrp = taosArrayGet(pCache->grps, grpNum - 1);
147✔
2403
    if (NULL == pGrp) {
147!
2404
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
2405
    }
2406

2407
    if (pGrp->blk != pCache->outBlk) {
147!
2408
      int32_t beginIdx = (1 == grpNum) ? build->blkRowIdx : 0;
147!
2409
      MJ_ERR_RET(blockDataMergeNRows(pCache->outBlk, pGrp->blk, beginIdx, pGrp->blk->info.rows - beginIdx));
147!
2410
      if (1 == grpNum) {
147!
2411
        pGrp->blk = pCache->outBlk;
147✔
2412
        pGrp->beginIdx = 0;
147✔
2413
        pGrp->readIdx = 0;
147✔
2414
        //pGrp->endIdx = pGrp->blk->info.rows - 1;
2415
      } else {
2416
        if (NULL == taosArrayPop(pCache->grps)) {
×
2417
          MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
2418
        }
2419
        
2420
        pGrp = taosArrayGet(pCache->grps, 0);
×
2421
        if (NULL == pGrp) {
×
2422
          MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
2423
        }
2424

2425
        //A S S E R T(pGrp->blk == pCache->outBlk);
2426
        //pGrp->endIdx = pGrp->blk->info.rows - pGrp->beginIdx;
2427
      }
2428
      
2429
      //A S S E R T((pGrp->endIdx - pGrp->beginIdx + 1) == pCtx->cache.rowNum);
2430
    }
2431

2432
    
2433
    //A S S E R T(taosArrayGetSize(pCache->grps) == 1);
2434
    //A S S E R T(pGrp->blk->info.rows - pGrp->beginIdx == pCtx->cache.rowNum);
2435
  }
2436
  
2437
  do {
2438
    build->blk = (*pCtx->pJoin->retrieveFp)(pCtx->pJoin, build);
162✔
2439
    qDebug("%s merge join %s table got block to fill grp, rows:%" PRId64, GET_TASKID(pCtx->pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(build->type), build->blk ? build->blk->info.rows : 0);
162!
2440
    
2441
    build->blkRowIdx = 0;
162✔
2442
    
2443
    if (NULL == build->blk) {
162!
2444
      break;
162✔
2445
    }
2446

2447
    if ((pCache->rowNum + build->blk->info.rows) >= pCtx->jLimit) {
×
2448
      MJOIN_PUSH_BLK_TO_CACHE(pCache, build->blk);
×
2449
      break;
×
2450
    }
2451
    
2452
    MJ_ERR_RET(blockDataMergeNRows(pCache->outBlk, build->blk, 0, build->blk->info.rows));
×
2453
    pCache->rowNum += build->blk->info.rows;
×
2454
    
2455
    //pGrp->endIdx = pGrp->blk->info.rows - pGrp->beginIdx;
2456
  } while (pCache->rowNum < pCtx->jLimit);
×
2457

2458
  MJOIN_RESTORE_TB_BLK(pCache, build);
162✔
2459

2460
  return TSDB_CODE_SUCCESS;
162✔
2461
}
2462

2463
int32_t mAsofForwardUpdateBuildGrpEndIdx(SMJoinWindowCtx* pCtx) {
495✔
2464
  int32_t grpNum = taosArrayGetSize(pCtx->cache.grps);
495✔
2465
  if (grpNum <= 0) {
495✔
2466
    return TSDB_CODE_SUCCESS;
30✔
2467
  }
2468

2469
  SMJoinGrpRows* pGrp = taosArrayGet(pCtx->cache.grps, 0);  
465✔
2470
  if (NULL == pGrp) {
465!
2471
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
2472
  }
2473

2474
  if (1 == grpNum) {
465!
2475
    pGrp->endIdx = pGrp->beginIdx + TMIN(pGrp->blk->info.rows - pGrp->beginIdx, pCtx->jLimit) - 1;
465✔
2476
    return TSDB_CODE_SUCCESS;
465✔
2477
  }
2478

2479
  //A S S E R T(pCtx->jLimit > (pGrp->blk->info.rows - pGrp->beginIdx));
2480
  pGrp->endIdx = pGrp->blk->info.rows - 1;
×
2481
  
2482
  int64_t remainRows = pCtx->jLimit - (pGrp->endIdx - pGrp->beginIdx + 1);
×
2483
  
2484
  pGrp = taosArrayGet(pCtx->cache.grps, 1); 
×
2485
  if (NULL == pGrp) {
×
2486
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
2487
  }
2488
  
2489
  pGrp->endIdx = pGrp->beginIdx + TMIN(pGrp->blk->info.rows, remainRows) - 1;
×
2490

2491
  return TSDB_CODE_SUCCESS;  
×
2492
}
2493

2494
int32_t mAsofForwardFillDumpGrpCache(SMJoinWindowCtx* pCtx, bool lastBuildGrp) {
495✔
2495
  if (!lastBuildGrp) {
495!
2496
    MJOIN_SAVE_TB_BLK(&pCtx->cache, pCtx->pJoin->build);
495✔
2497
    MJ_ERR_RET(mAsofForwardChkFillGrpCache(pCtx));
495!
2498
  }
2499

2500
  MJ_ERR_RET(mAsofForwardUpdateBuildGrpEndIdx(pCtx));
495!
2501
  
2502
  return mWinJoinDumpGrpCache(pCtx);
495✔
2503
}
2504

2505
int32_t mAsofForwardSkipEqRows(SMJoinWindowCtx* pCtx, SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk) {
138✔
2506
  SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCtx.targetSlotId);
138✔
2507
  if (NULL == pCol) {
138!
2508
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
2509
  }
2510
  
2511
  if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) {
138!
2512
    *wholeBlk = false;
×
2513
    return TSDB_CODE_SUCCESS;
×
2514
  }
2515

2516
  pTable->blkRowIdx++;
138✔
2517
  pCtx->cache.rowNum--;
138✔
2518
  
2519
  char* pEndVal = colDataGetNumData(pCol, pTable->blk->info.rows - 1);
138✔
2520
  if (timestamp != *(int64_t*)pEndVal) {
138✔
2521
    for (; pTable->blkRowIdx < pTable->blk->info.rows; ++pTable->blkRowIdx) {
126!
2522
      char* pNextVal = colDataGetNumData(pCol, pTable->blkRowIdx);
126✔
2523
      if (timestamp == *(int64_t*)pNextVal) {
126✔
2524
        pCtx->cache.rowNum--;
18✔
2525
        continue;
18✔
2526
      }
2527

2528
      *wholeBlk = false;  
108✔
2529
      return TSDB_CODE_SUCCESS;
108✔
2530
    }
2531
  } else {
2532
    pCtx->cache.rowNum -= (pTable->blk->info.rows - pTable->blkRowIdx);
30✔
2533
  }
2534

2535
  *wholeBlk = true;
30✔
2536
  
2537
  return TSDB_CODE_SUCCESS;
30✔
2538
}
2539

2540
int32_t mAsofForwardSkipAllEqRows(SMJoinWindowCtx* pCtx, int64_t timestamp) {
138✔
2541
  SMJoinWinCache* cache = &pCtx->cache;
138✔
2542
  SMJoinTableCtx* pTable = pCtx->pJoin->build;
138✔
2543
  bool wholeBlk = false;
138✔
2544

2545
  do {
×
2546
    do {
2547
      MJ_ERR_RET(mAsofForwardSkipEqRows(pCtx, pTable, timestamp, &wholeBlk));
138!
2548
      if (!wholeBlk) {
138✔
2549
        return TSDB_CODE_SUCCESS;
108✔
2550
      }
2551

2552
      MJOIN_POP_TB_BLK(cache);
30!
2553
      MJOIN_RESTORE_TB_BLK(cache, pTable);
30!
2554
    } while (!MJOIN_BUILD_TB_ROWS_DONE(pTable));
30!
2555

2556
    //A S S E R T(pCtx->cache.rowNum == 0);
2557
    //A S S E R T(taosArrayGetSize(pCtx->cache.grps) == 0);
2558

2559
    if (pTable->dsFetchDone) {
30✔
2560
      return TSDB_CODE_SUCCESS;
6✔
2561
    }
2562
    
2563
    pTable->blk = (*pCtx->pJoin->retrieveFp)(pCtx->pJoin, pTable);
24✔
2564
    qDebug("%s merge join %s table got block to skip eq ts, rows:%" PRId64, GET_TASKID(pCtx->pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(pTable->type), pTable->blk ? pTable->blk->info.rows : 0);
24!
2565

2566
    pTable->blkRowIdx = 0;
24✔
2567

2568
    if (NULL == pTable->blk) {
24!
2569
      return TSDB_CODE_SUCCESS;
24✔
2570
    }
2571

2572
    MJOIN_PUSH_BLK_TO_CACHE(cache, pTable->blk);
×
2573
  } while (true);
2574

2575
  return TSDB_CODE_SUCCESS;
2576
}
2577

2578

2579
int32_t mAsofForwardUpdateDumpEqRows(SMJoinWindowCtx* pCtx, int64_t timestamp, bool lastBuildGrp) {
438✔
2580
  if (!pCtx->eqRowsAcq && !lastBuildGrp) {
438!
2581
    MJ_ERR_RET(mAsofForwardSkipAllEqRows(pCtx, timestamp));
138!
2582
  }
2583

2584
  return mAsofForwardFillDumpGrpCache(pCtx, lastBuildGrp);
438✔
2585
}
2586

2587

2588
int32_t mAsofForwardProcessEqualGrp(SMJoinWindowCtx* pCtx, int64_t timestamp, bool lastBuildGrp) {
438✔
2589
  SMJoinOperatorInfo* pJoin = pCtx->pJoin;
438✔
2590

2591
  pCtx->lastEqGrp = true;
438✔
2592

2593
  MJ_ERR_RET(mJoinBuildEqGrp(pJoin->probe, timestamp, NULL, &pCtx->probeGrp));
438!
2594

2595
  return mAsofForwardUpdateDumpEqRows(pCtx, timestamp, lastBuildGrp);
438✔
2596
}
2597

2598
int32_t mAsofForwardHandleProbeGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, SColumnInfoData* pCol,  int64_t* probeTs, int64_t* buildTs) {
57✔
2599
  pCtx->lastEqGrp = false;
57✔
2600
  
2601
  pCtx->probeGrp.beginIdx = pJoin->probe->blkRowIdx;
57✔
2602
  pCtx->probeGrp.readIdx = pCtx->probeGrp.beginIdx;
57✔
2603
  pCtx->probeGrp.endIdx = pCtx->probeGrp.beginIdx;
57✔
2604
  
2605
  while (++pJoin->probe->blkRowIdx < pJoin->probe->blk->info.rows) {
57✔
2606
    MJOIN_GET_TB_CUR_TS(pCol, *probeTs, pJoin->probe);
39✔
2607
    if (PROBE_TS_NMATCH(pCtx->ascTs, *probeTs, *buildTs)) {
39!
2608
      pCtx->probeGrp.endIdx = pJoin->probe->blkRowIdx;
×
2609
      continue;
×
2610
    }
2611
    
2612
    break;
39✔
2613
  }
2614

2615
  return mAsofForwardFillDumpGrpCache(pCtx, false);
57✔
2616
}
2617

2618
int32_t mAsofForwardSkipBuildGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJoin, SColumnInfoData** pCol,  int64_t* probeTs, int64_t* buildTs) {
192✔
2619
  do {
2620
    MJOIN_GET_TB_CUR_TS(*pCol, *buildTs, pJoin->build);
192✔
2621
    if (!PROBE_TS_NREACH(pCtx->ascTs, *probeTs, *buildTs)) {
192!
2622
      break;
2623
    }
2624

2625
    pCtx->cache.rowNum--;
192✔
2626
    while (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) {
231✔
2627
      MJOIN_GET_TB_CUR_TS(*pCol, *buildTs, pJoin->build);
210✔
2628
      if (PROBE_TS_NREACH(pCtx->ascTs, *probeTs, *buildTs)) {
210✔
2629
        pCtx->cache.rowNum--;
39✔
2630
        continue;
39✔
2631
      }
2632
      
2633
      return TSDB_CODE_SUCCESS;
171✔
2634
    }
2635

2636
    MJOIN_POP_TB_BLK(&pCtx->cache);
21!
2637
    MJOIN_RESTORE_TB_BLK(&pCtx->cache, pJoin->build);
21!
2638
    MJOIN_GET_TB_COL_TS(*pCol, *buildTs, pJoin->build);
21!
2639
  } while (!MJOIN_BUILD_TB_ROWS_DONE(pJoin->build));
21!
2640

2641
  return TSDB_CODE_SUCCESS;
21✔
2642
}
2643

2644
static int32_t mAsofForwardRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx, bool* newBlock) {
468✔
2645
  *newBlock = false;
468✔
2646

2647
  bool probeGot = mJoinRetrieveBlk(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);
468✔
2648
  bool buildGot = false;
468✔
2649

2650
  do {
2651
    if ((probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) && pCtx->cache.rowNum < pCtx->jLimit) { 
474!
2652
      pJoin->build->newBlk = false;
249✔
2653
      MJOIN_SAVE_TB_BLK(&pCtx->cache, pCtx->pJoin->build);
249!
2654
      //A S S E R T(taosArrayGetSize(pCtx->cache.grps) <= 1);
2655
      buildGot = mJoinRetrieveBlk(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
249✔
2656
    }
2657
    
2658
    if (!probeGot) {
474✔
2659
      if (!pCtx->groupJoin || NULL == pJoin->probe->remainInBlk) {
225✔
2660
        mJoinSetDone(pOperator);
99✔
2661
      }
2662

2663
      return TSDB_CODE_SUCCESS;
225✔
2664
    }
2665

2666
    if (buildGot) {
249✔
2667
      SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCtx.targetSlotId);
225✔
2668
      SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCtx.targetSlotId);
225✔
2669
      if (NULL == pProbeCol || NULL == pBuildCol) {
225!
2670
        MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
2671
      }
2672

2673
      if (MJOIN_BUILD_BLK_OOR(pCtx->ascTs, pProbeCol->pData, pJoin->probe->blkRowIdx, pBuildCol->pData, pJoin->build->blk->info.rows)) {
225!
2674
        pJoin->build->blkRowIdx = pJoin->build->blk->info.rows;
6✔
2675
        MJOIN_POP_TB_BLK(&pCtx->cache);
6!
2676
        buildGot = false;
6✔
2677
        continue;
6✔
2678
      }
2679
    }
2680
    
2681
    break;
243✔
2682
  } while (true);
2683

2684
  if (buildGot && pJoin->build->newBlk) {
243!
2685
    if (NULL == pCtx->cache.outBlk) {
219✔
2686
      pCtx->cache.outBlk = NULL;
93✔
2687
      int32_t code = createOneDataBlock(pJoin->build->blk, false, &pCtx->cache.outBlk);
93✔
2688
      if (code) {
93!
2689
        MJ_ERR_RET(code);
×
2690
      }
2691

2692
      MJ_ERR_RET(blockDataEnsureCapacity(pCtx->cache.outBlk, pCtx->jLimit));
93!
2693
    }
2694
    
2695
    MJOIN_PUSH_BLK_TO_CACHE(&pCtx->cache, pJoin->build->blk);
219✔
2696
    MJOIN_RESTORE_TB_BLK(&pCtx->cache, pJoin->build);
219!
2697
  }
2698

2699
  pCtx->probeGrp.blk = pJoin->probe->blk;
243✔
2700
  *newBlock = true;
243✔
2701

2702
  return TSDB_CODE_SUCCESS;
243✔
2703
}
2704

2705

2706
SSDataBlock* mAsofForwardJoinDo(struct SOperatorInfo* pOperator) {
225✔
2707
  SMJoinOperatorInfo* pJoin = pOperator->info;
225✔
2708
  SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx;
225✔
2709
  int32_t code = TSDB_CODE_SUCCESS;
225✔
2710
  int64_t probeTs = 0;
225✔
2711
  int64_t buildTs = 0;
225✔
2712
  SColumnInfoData* pBuildCol = NULL;
225✔
2713
  SColumnInfoData* pProbeCol = NULL;
225✔
2714
  bool newBlock = false;
225✔
2715

2716
  blockDataCleanup(pCtx->finBlk);
225✔
2717

2718
  if (pCtx->grpRemains) {
225!
2719
    MJ_ERR_JRET(mWinJoinDumpGrpCache(pCtx));
×
2720
    if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
2721
      return pCtx->finBlk;
×
2722
    }
2723
    pCtx->grpRemains = false;
×
2724
  }
2725

2726
  do {
2727
    MJ_ERR_JRET(mAsofForwardRetrieve(pOperator, pJoin, pCtx, &newBlock));
468!
2728
    if (!newBlock) {
468✔
2729
      if (pCtx->groupJoin && pCtx->finBlk->info.rows <= 0 && !mJoinIsDone(pOperator)) {
225!
2730
        continue;
×
2731
      }
2732

2733
      break;
225✔
2734
    }
2735

2736
    MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
243!
2737
    MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
243!
2738
    
2739
    if (probeTs == pCtx->lastTs) {
243!
2740
      MJ_ERR_JRET(mAsofForwardProcessEqualGrp(pCtx, probeTs, true));
×
2741
      if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
×
2742
        return pCtx->finBlk;
×
2743
      }
2744

2745
      if (MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) {
×
2746
        continue;
×
2747
      } else {
2748
        MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
×
2749
      }
2750
    }
2751

2752
    while (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
930!
2753
      if (probeTs == buildTs) {
687✔
2754
        pCtx->lastTs = probeTs;
438✔
2755
        MJ_ERR_JRET(mAsofForwardProcessEqualGrp(pCtx, probeTs, false));
438!
2756
        if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
876!
2757
          return pCtx->finBlk;
×
2758
        }
2759

2760
        MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
438!
2761
        MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
438!
2762
        continue;
438✔
2763
      }
2764

2765
      if (PROBE_TS_NMATCH(pCtx->ascTs, probeTs, buildTs)) {
249!
2766
        MJ_ERR_JRET(mAsofForwardHandleProbeGrp(pCtx, pJoin, pProbeCol, &probeTs, &buildTs));
57!
2767
        MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);        
57!
2768
      } else {
2769
        MJ_ERR_JRET(mAsofForwardSkipBuildGrp(pCtx, pJoin, &pBuildCol, &probeTs, &buildTs));
192!
2770
      }
2771

2772
      if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
498!
2773
        return pCtx->finBlk;
×
2774
      }
2775
    }
2776

2777
    if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && MJOIN_TB_GRP_ROWS_DONE(pJoin->build, pCtx->groupJoin)) {
243!
2778
      pCtx->probeGrp.beginIdx = pJoin->probe->blkRowIdx;
27✔
2779
      pCtx->probeGrp.readIdx = pCtx->probeGrp.beginIdx;
27✔
2780
      pCtx->probeGrp.endIdx = pJoin->probe->blk->info.rows - 1;
27✔
2781
      
2782
      MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeGrp, true, false));
27!
2783
      
2784
      pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows;
27✔
2785
            
2786
      if (mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) {
54!
2787
        return pCtx->finBlk;
×
2788
      }
2789
    }
2790
  } while (true);
2791

2792
_return:
225✔
2793

2794
  if (code) {
225!
2795
    pJoin->errCode = code;
×
2796
    return NULL;
×
2797
  }
2798

2799
  return pCtx->finBlk;
225✔
2800
}
2801

2802
void mAsofJoinGroupReset(SMJoinOperatorInfo* pJoin) {
519✔
2803
  SMJoinWindowCtx* pWin = &pJoin->ctx.windowCtx;
519✔
2804
  SMJoinWinCache* pCache = &pWin->cache;
519✔
2805

2806
  pWin->lastEqGrp = false;
519✔
2807
  pWin->lastProbeGrp = false;
519✔
2808
  pWin->eqPostDone = false;
519✔
2809
  pWin->lastTs = INT64_MIN;
519✔
2810

2811
  mWinJoinResetWindowCache(pWin, pCache);
519✔
2812

2813
  mJoinResetGroupTableCtx(pJoin->probe);
519✔
2814
  mJoinResetGroupTableCtx(pJoin->build);    
519✔
2815
}
519✔
2816

2817
static FORCE_INLINE void mWinJoinPopFrontGroup(SMJoinWindowCtx* pCtx, SMJoinGrpRows* pGrp) {
2818
  pCtx->cache.rowNum -= (pGrp->blk->info.rows - pGrp->beginIdx);
39✔
2819
  if (pGrp->blk == pCtx->cache.outBlk) {
39!
2820
    blockDataCleanup(pGrp->blk);
×
2821
  } else if (pGrp->clonedBlk) {
39!
2822
    (void)blockDataDestroy(pGrp->blk);
36✔
2823
  }
2824
  
2825
  taosArrayPopFrontBatch(pCtx->cache.grps, 1);
39✔
2826
}
39✔
2827

2828
static int32_t mWinJoinCloneCacheBlk(SMJoinWindowCtx* pCtx) {
4,851✔
2829
  SMJoinWinCache* pCache = &pCtx->cache;
4,851✔
2830
  SArray* pGrpArray = (NULL != pCache->grps) ? pCache->grps : pCache->grpsQueue;
4,851!
2831
  int32_t grpNum = taosArrayGetSize(pGrpArray);
4,851✔
2832
  if (grpNum <= 0) {
4,851✔
2833
    return TSDB_CODE_SUCCESS;
1,965✔
2834
  }
2835

2836
  SMJoinGrpRows* pGrp = (SMJoinGrpRows*)taosArrayGetLast(pGrpArray);
2,886✔
2837
  if (NULL == pGrp) {
2,886!
2838
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
2839
  }
2840

2841
  if (!pGrp->clonedBlk) {
2,886✔
2842
    int32_t code = 0;
1,551✔
2843
    if (0 == pGrp->beginIdx) {
1,551✔
2844
      SSDataBlock* p = NULL;
1,075✔
2845
      code = createOneDataBlock(pGrp->blk, true, &p);
1,075✔
2846
      if (code) {
1,075!
2847
        MJ_ERR_RET(code);
×
2848
      }
2849
      pGrp->blk = p;
1,075✔
2850
    } else {
2851
      code = blockDataExtractBlock(pGrp->blk, pGrp->beginIdx, pGrp->blk->info.rows - pGrp->beginIdx, &pGrp->blk);
476✔
2852
      pGrp->endIdx -= pGrp->beginIdx;
476✔
2853
      pGrp->beginIdx = 0;
476✔
2854
      pGrp->readIdx = 0;
476✔
2855
    }
2856
    if (code) {
1,551!
2857
      MJ_ERR_RET(code);
×
2858
    }
2859

2860
    pGrp->clonedBlk = true;
1,551✔
2861
  }
2862

2863
  return TSDB_CODE_SUCCESS;
2,886✔
2864
}
2865

2866
static int32_t mWinJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx, bool* newBlock) {
5,244✔
2867
  *newBlock = false;
5,244✔
2868
  
2869
  bool probeGot = mJoinRetrieveBlk(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);
5,244✔
2870
  bool buildGot = false;
5,244✔
2871

2872
  do {
2873
    if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) { 
5,244!
2874
      if (NULL == pJoin->build->blk) {
3,405!
2875
        MJ_ERR_RET(mWinJoinCloneCacheBlk(pCtx));
3,405!
2876
      }
2877
      
2878
      buildGot = mJoinRetrieveBlk(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
3,405✔
2879
    }
2880
    
2881
    if (!probeGot) {
5,244✔
2882
      if (!pCtx->groupJoin || NULL == pJoin->probe->remainInBlk) {
1,839✔
2883
        mJoinSetDone(pOperator);
966✔
2884
      }
2885
      
2886
      return TSDB_CODE_SUCCESS;
1,839✔
2887
    }
2888

2889
    if (buildGot && pCtx->forwardRowsAcq) {
3,405✔
2890
      SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCtx.targetSlotId);
96✔
2891
      SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCtx.targetSlotId);
96✔
2892
      if (NULL == pProbeCol || NULL == pBuildCol) {
96!
2893
        MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
2894
      }
2895

2896
      if (MJOIN_BUILD_BLK_OOR(pCtx->ascTs, pProbeCol->pData, pJoin->probe->blkRowIdx, pBuildCol->pData, pJoin->build->blk->info.rows)) {
96!
2897
        pJoin->build->blkRowIdx = pJoin->build->blk->info.rows;
×
2898
        buildGot = false;
×
2899
        continue;
×
2900
      }
2901
    }
2902
    
2903
    break;
3,405✔
2904
  } while (true);
2905

2906
  pCtx->probeGrp.blk = pJoin->probe->blk;
3,405✔
2907
  *newBlock = true;
3,405✔
2908
  
2909
  return TSDB_CODE_SUCCESS;
3,405✔
2910
}
2911

2912
int32_t mWinJoinTryAddWinBeginBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, SMJoinTableCtx* build, bool* winEnd) {
1,827✔
2913
  SSDataBlock* pBlk = build->blk;
1,827✔
2914
  SColumnInfoData* pCol = taosArrayGet(pBlk->pDataBlock, build->primCtx.targetSlotId);
1,827✔
2915
  if (NULL == pCol) {
1,827!
2916
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
2917
  }
2918

2919
  if (pCtx->ascTs) {
1,827✔
2920
    if (*((int64_t*)pCol->pData + pBlk->info.rows - 1) < pCtx->winBeginTs) {      
1,791!
2921
      *winEnd = false;
×
2922
      build->blk = NULL;
×
2923
      goto _return;
×
2924
    }
2925

2926
    if (*(int64_t*)pCol->pData > pCtx->winEndTs) {
1,791✔
2927
      *winEnd = true;
33✔
2928
      goto _return;
33✔
2929
    }
2930

2931
    for (; build->blkRowIdx < pBlk->info.rows; build->blkRowIdx++) {
1,823!
2932
      if (*((int64_t*)pCol->pData + build->blkRowIdx) < pCtx->winBeginTs) {
1,823✔
2933
        continue;
65✔
2934
      }
2935
    
2936
      if (*((int64_t*)pCol->pData + build->blkRowIdx) <= pCtx->winEndTs) {
1,758!
2937
        SMJoinGrpRows grp = {.blk = pBlk, .beginIdx = build->blkRowIdx};
1,758✔
2938
        SMJoinGrpRows* pGrp = taosArrayPush(pCache->grps, &grp);
1,758✔
2939
        if (NULL == pGrp) {
1,758!
2940
          MJ_ERR_RET(terrno);
×
2941
        }
2942
    
2943
        pGrp->readIdx = pGrp->beginIdx;
1,758✔
2944
        pGrp->endIdx = pGrp->beginIdx;
1,758✔
2945
    
2946
        build->blk = NULL;
1,758✔
2947
        pCache->rowNum = 1;
1,758✔
2948
      } else {
2949
        pCache->rowNum = 0;
×
2950
      }
2951
    
2952
      *winEnd = true;  
1,758✔
2953
      return TSDB_CODE_SUCCESS;
1,758✔
2954
    }
2955

2956
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2957
  }
2958
  
2959
  if (*((int64_t*)pCol->pData + pBlk->info.rows - 1) > pCtx->winEndTs) {
36!
2960
    *winEnd = false;
×
2961
    build->blk = NULL;
×
2962
    goto _return;
×
2963
  }
2964

2965
  if (*(int64_t*)pCol->pData < pCtx->winBeginTs) {
36✔
2966
    *winEnd = true;
6✔
2967
    goto _return;
6✔
2968
  }
2969

2970
  for (; build->blkRowIdx < pBlk->info.rows; build->blkRowIdx++) {
36!
2971
    if (*((int64_t*)pCol->pData + build->blkRowIdx) > pCtx->winEndTs) {
36✔
2972
      continue;
6✔
2973
    }
2974
  
2975
    if (*((int64_t*)pCol->pData + build->blkRowIdx) >= pCtx->winBeginTs) {
30!
2976
      SMJoinGrpRows grp = {.blk = pBlk, .beginIdx = build->blkRowIdx};
30✔
2977
      SMJoinGrpRows* pGrp = taosArrayPush(pCache->grps, &grp);
30✔
2978
      if (NULL == pGrp) {
30!
2979
        MJ_ERR_RET(terrno);
×
2980
      }
2981
      
2982
      pGrp->readIdx = pGrp->beginIdx;
30✔
2983
      pGrp->endIdx = pGrp->beginIdx;
30✔
2984
  
2985
      build->blk = NULL;
30✔
2986
      pCache->rowNum = 1;
30✔
2987
    } else {
2988
      pCache->rowNum = 0;
×
2989
    }
2990
  
2991
    *winEnd = true;  
30✔
2992
    return TSDB_CODE_SUCCESS;
30✔
2993
  }  
2994

2995
  return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
2996

2997
_return:
39✔
2998

2999
  pCache->rowNum = 0;
39✔
3000

3001
  return TSDB_CODE_SUCCESS;
39✔
3002
}
3003

3004

3005

3006
int32_t mWinJoinAddWinBeginBlk(SMJoinWindowCtx* pCtx) {
2,025✔
3007
  SMJoinWinCache* pCache = &pCtx->cache;
2,025✔
3008
  SMJoinTableCtx* build = pCtx->pJoin->build;
2,025✔
3009
  bool winEnd = false;
2,025✔
3010
  if (NULL != build->blk) {
2,025✔
3011
    MJ_ERR_RET(mWinJoinTryAddWinBeginBlk(pCtx, &pCtx->cache, build, &winEnd));
1,827!
3012
    if (winEnd || taosArrayGetSize(pCache->grps) > 0) {
1,827!
3013
      return TSDB_CODE_SUCCESS;
1,827✔
3014
    }
3015
  }
3016

3017
  if (build->dsFetchDone) {
198✔
3018
    goto _return;
132✔
3019
  }
3020
  
3021
  do {
3022
    build->blk = (*pCtx->pJoin->retrieveFp)(pCtx->pJoin, pCtx->pJoin->build);
66✔
3023
    qDebug("%s merge join %s table got block to start win, rows:%" PRId64, GET_TASKID(pCtx->pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(build->type), build->blk ? build->blk->info.rows : 0);
66!
3024
    
3025
    build->blkRowIdx = 0;
66✔
3026
    
3027
    if (NULL == build->blk) {
66!
3028
      break;
66✔
3029
    }
3030

3031
    MJ_ERR_RET(mWinJoinTryAddWinBeginBlk(pCtx, &pCtx->cache, build, &winEnd));
×
3032
    if (winEnd || taosArrayGetSize(pCache->grps) > 0) {
×
3033
      return TSDB_CODE_SUCCESS;
×
3034
    }
3035
  } while (true);
3036

3037
_return:
198✔
3038

3039
  return TSDB_CODE_SUCCESS;
198✔
3040
}
3041

3042

3043
int32_t mWinJoinMoveAscWinBegin(SMJoinWindowCtx* pCtx) {
5,838✔
3044
  SMJoinWinCache* pCache = &pCtx->cache;
5,838✔
3045
  
3046
  do {
12✔
3047
    int32_t grpNum = taosArrayGetSize(pCache->grps);
5,850✔
3048
    for (int32_t i = 0; i < grpNum; ++i) {
5,883✔
3049
      SMJoinGrpRows* pGrp = taosArrayGet(pCache->grps, i);
3,888✔
3050
      if (NULL == pGrp) {
3,888!
3051
        MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
3052
      }
3053

3054
      SColumnInfoData* pCol = taosArrayGet(pGrp->blk->pDataBlock, pCtx->pJoin->build->primCtx.targetSlotId);
3,888✔
3055
      if (NULL == pCol) {
3,888!
3056
        MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
3057
      }
3058

3059
      if (*((int64_t*)pCol->pData + pGrp->blk->info.rows - 1) < pCtx->winBeginTs) {
3,888✔
3060
        mWinJoinPopFrontGroup(pCtx, pGrp);
3061
        grpNum--;
33✔
3062
        i--;
33✔
3063
        continue;
33✔
3064
      }
3065

3066
      int32_t startIdx = pGrp->beginIdx;
3,855✔
3067
      for (; pGrp->beginIdx < pGrp->blk->info.rows; pGrp->beginIdx++) {
7,539!
3068
        if (*((int64_t*)pCol->pData + pGrp->beginIdx) < pCtx->winBeginTs) {
7,539✔
3069
          continue;
3,684✔
3070
        }
3071

3072
        if (*((int64_t*)pCol->pData + pGrp->beginIdx) <= pCtx->winEndTs) {
3,855✔
3073
          pGrp->readIdx = pGrp->beginIdx;
3,840✔
3074
          if (pGrp->endIdx < pGrp->beginIdx) {
3,840✔
3075
            pGrp->endIdx = pGrp->beginIdx;
1,005✔
3076
            pCache->rowNum = 1;
1,005✔
3077
          } else {
3078
            pCache->rowNum -= (pGrp->beginIdx - startIdx);
2,835✔
3079
          }
3080
          return TSDB_CODE_SUCCESS;
3,840✔
3081
        }
3082

3083
        pGrp->endIdx = pGrp->beginIdx;
15✔
3084
        pCache->rowNum = 0;
15✔
3085
        TSWAP(pCache->grps, pCache->grpsQueue);
15✔
3086
        return TSDB_CODE_SUCCESS;
15✔
3087
      }
3088
    }
3089

3090
    if (NULL != pCache->grpsQueue) {
1,995✔
3091
      pCache->grps = pCache->grpsQueue;
12✔
3092
      pCache->rowNum = 1;
12✔
3093
      pCache->grpsQueue = NULL;
12✔
3094
      
3095
      continue;
12✔
3096
    }
3097

3098
    break;
1,983✔
3099
  } while (true);
3100

3101
  return mWinJoinAddWinBeginBlk(pCtx);
1,983✔
3102
}
3103

3104
int32_t mWinJoinMoveDescWinBegin(SMJoinWindowCtx* pCtx) {
174✔
3105
  SMJoinWinCache* pCache = &pCtx->cache;
174✔
3106
  
3107
  do {
×
3108
    int32_t grpNum = taosArrayGetSize(pCache->grps);
174✔
3109
    for (int32_t i = 0; i < grpNum; ++i) {
180✔
3110
      SMJoinGrpRows* pGrp = taosArrayGet(pCache->grps, i);
138✔
3111
      if (NULL == pGrp) {
138!
3112
        MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
3113
      }
3114

3115
      SColumnInfoData* pCol = taosArrayGet(pGrp->blk->pDataBlock, pCtx->pJoin->build->primCtx.targetSlotId);
138✔
3116
      if (NULL == pCol) {
138!
3117
        MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
3118
      }
3119

3120
      if (*((int64_t*)pCol->pData + pGrp->blk->info.rows - 1) > pCtx->winEndTs) {
138✔
3121
        mWinJoinPopFrontGroup(pCtx, pGrp);
3122

3123
        grpNum--;
6✔
3124
        i--;
6✔
3125
        continue;
6✔
3126
      }
3127

3128
      int32_t startIdx = pGrp->beginIdx;
132✔
3129
      for (; pGrp->beginIdx < pGrp->blk->info.rows; pGrp->beginIdx++) {
246!
3130
        if (*((int64_t*)pCol->pData + pGrp->beginIdx) > pCtx->winEndTs) {
246✔
3131
          continue;
114✔
3132
        }
3133

3134
        if (*((int64_t*)pCol->pData + pGrp->beginIdx) >= pCtx->winBeginTs) {
132!
3135
          pGrp->readIdx = pGrp->beginIdx;
132✔
3136
          if (pGrp->endIdx < pGrp->beginIdx) {
132✔
3137
            pGrp->endIdx = pGrp->beginIdx;
15✔
3138
            pCache->rowNum = 1;
15✔
3139
          } else {
3140
            pCache->rowNum -= (pGrp->beginIdx - startIdx);
117✔
3141
          }
3142
          return TSDB_CODE_SUCCESS;
132✔
3143
        }
3144

3145
        pGrp->endIdx = pGrp->beginIdx;
×
3146
        pCache->rowNum = 0;
×
3147
        TSWAP(pCache->grps, pCache->grpsQueue);
×
3148
        return TSDB_CODE_SUCCESS;
×
3149
      }
3150
    }
3151

3152
    if (NULL != pCache->grpsQueue) {
42!
3153
      pCache->grps = pCache->grpsQueue;
×
3154
      pCache->rowNum = 1;
×
3155
      pCache->grpsQueue = NULL;
×
3156
      
3157
      continue;
×
3158
    }
3159

3160
    break;
42✔
3161
  } while (true);
3162

3163
  return mWinJoinAddWinBeginBlk(pCtx);
42✔
3164
}
3165

3166
void mWinJoinRemoveOverflowGrp(SMJoinWindowCtx* pCtx) {
×
3167
  if (pCtx->cache.rowNum <= pCtx->jLimit) {
×
3168
    return;
×
3169
  }
3170

3171
  int32_t i = 0;
×
3172
  while (true) {
×
3173
    SMJoinGrpRows* pGrp = taosArrayGet(pCtx->cache.grps, i++);
×
3174
    if (NULL == pGrp) {
×
3175
      return;
×
3176
    }
3177

3178
    if ((pCtx->cache.rowNum - (pGrp->blk->info.rows - pGrp->beginIdx)) < pCtx->jLimit) {
×
3179
      return;
×
3180
    }
3181

3182
    mWinJoinPopFrontGroup(pCtx, pGrp);
3183
    i--;
×
3184
  }
3185
}
3186

3187
int32_t mWinJoinTryAddWinEndBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, SMJoinTableCtx* build, bool* winEnd) {
×
3188
  SSDataBlock* pBlk = build->blk;
×
3189
  SColumnInfoData* pCol = taosArrayGet(pBlk->pDataBlock, build->primCtx.targetSlotId);
×
3190
  if (NULL == pCol) {
×
3191
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
3192
  }
3193

3194
  SMJoinGrpRows grp = {.blk = pBlk, .beginIdx = build->blkRowIdx};
×
3195

3196
  if (pCtx->ascTs) {
×
3197
    if (*((int64_t*)pCol->pData + build->blkRowIdx) > pCtx->winEndTs) {
×
3198
      *winEnd = true;
×
3199
      return TSDB_CODE_SUCCESS;
×
3200
    }
3201

3202
    if (*((int64_t*)pCol->pData +  pBlk->info.rows - 1) < pCtx->winBeginTs) {
×
3203
      *winEnd = false;
×
3204
      goto _return;
×
3205
    }
3206

3207
    if (*((int64_t*)pCol->pData + pBlk->info.rows - 1) <= pCtx->winEndTs) {
×
3208
      SMJoinGrpRows* pGrp = taosArrayPush(pCache->grps, &grp);
×
3209
      if (NULL == pGrp) {
×
3210
        MJ_ERR_RET(terrno);
×
3211
      }
3212
      
3213
      pGrp->readIdx = pGrp->beginIdx;
×
3214
      pGrp->endIdx = pBlk->info.rows - 1;
×
3215

3216
      pCache->rowNum += (pGrp->endIdx - pGrp->beginIdx + 1);
×
3217
      if (pCache->rowNum >= pCtx->jLimit) {
×
3218
        pGrp->endIdx = pBlk->info.rows - 1 + pCtx->jLimit - pCache->rowNum;
×
3219
        pCache->rowNum = pCtx->jLimit;
×
3220

3221
        *winEnd = true;
×
3222
        goto _return;
×
3223
      }
3224
      
3225
      *winEnd = false;
×
3226
      goto _return;
×
3227
    }
3228

3229
    for (; build->blkRowIdx < pBlk->info.rows && pCache->rowNum < pCtx->jLimit; build->blkRowIdx++) {
×
3230
      if (*((int64_t*)pCol->pData + build->blkRowIdx) <= pCtx->winEndTs) {
×
3231
        pCache->rowNum++;
×
3232
        continue;
×
3233
      }
3234

3235
      break;
×
3236
    }
3237

3238
    SMJoinGrpRows* pGrp = taosArrayPush(pCache->grps, &grp);
×
3239
    if (NULL == pGrp) {
×
3240
      MJ_ERR_RET(terrno);
×
3241
    }
3242
    
3243
    pGrp->readIdx = pGrp->beginIdx;
×
3244
    pGrp->endIdx = build->blkRowIdx - 1;
×
3245
    
3246
    *winEnd = true;  
×
3247
    goto _return;
×
3248
  }
3249

3250
  if (*((int64_t*)pCol->pData + build->blkRowIdx) < pCtx->winBeginTs) {
×
3251
    *winEnd = true;
×
3252
    return TSDB_CODE_SUCCESS;
×
3253
  }
3254
  
3255
  if (*((int64_t*)pCol->pData +  pBlk->info.rows - 1) > pCtx->winEndTs) {
×
3256
    *winEnd = false;
×
3257
    goto _return;
×
3258
  }
3259
  
3260
  if (*((int64_t*)pCol->pData + pBlk->info.rows - 1) >= pCtx->winBeginTs) {
×
3261
    SMJoinGrpRows* pGrp = taosArrayPush(pCache->grps, &grp);
×
3262
    if (NULL == pGrp) {
×
3263
      MJ_ERR_RET(terrno);
×
3264
    }
3265
    
3266
    pGrp->readIdx = pGrp->beginIdx;
×
3267
    pGrp->endIdx = pBlk->info.rows - 1;
×
3268
  
3269
    pCache->rowNum += (pGrp->endIdx - pGrp->beginIdx + 1);
×
3270

3271
    mWinJoinRemoveOverflowGrp(pCtx);
×
3272
    
3273
    *winEnd = false;
×
3274
    goto _return;
×
3275
  }
3276
  
3277
  for (; build->blkRowIdx < pBlk->info.rows; build->blkRowIdx++) {
×
3278
    if (*((int64_t*)pCol->pData + build->blkRowIdx) >= pCtx->winBeginTs) {
×
3279
      pCache->rowNum++;
×
3280
      continue;
×
3281
    }
3282
  
3283
    break;
×
3284
  }
3285
  
3286
  SMJoinGrpRows* pGrp = taosArrayPush(pCache->grps, &grp);
×
3287
  if (NULL == pGrp) {
×
3288
    MJ_ERR_RET(terrno);
×
3289
  }
3290
    
3291
  pGrp->readIdx = pGrp->beginIdx;
×
3292
  pGrp->endIdx = build->blkRowIdx - 1;
×
3293

3294
  mWinJoinRemoveOverflowGrp(pCtx);
×
3295
  
3296
  *winEnd = true;  
×
3297

3298
_return:
×
3299

3300
  build->blk = NULL;
×
3301

3302
  return TSDB_CODE_SUCCESS;
×
3303
}
3304

3305
int32_t mWinJoinAddWinEndBlk(SMJoinWindowCtx* pCtx) {
2,346✔
3306
  SMJoinTableCtx* build = pCtx->pJoin->build;
2,346✔
3307
  bool winEnd = false;
2,346✔
3308
  if (NULL != build->blk) {
2,346!
3309
    MJ_ERR_RET(mWinJoinTryAddWinEndBlk(pCtx, &pCtx->cache, build, &winEnd));
×
3310
    if (winEnd) {
×
3311
      return TSDB_CODE_SUCCESS;
×
3312
    }
3313
  }
3314

3315
  if (build->dsFetchDone) {
2,346✔
3316
    goto _return;
900✔
3317
  }
3318

3319
  do {
3320
    MJ_ERR_RET(mWinJoinCloneCacheBlk(pCtx));
1,446!
3321
    
3322
    build->blk = (*pCtx->pJoin->retrieveFp)(pCtx->pJoin, pCtx->pJoin->build);
1,446✔
3323
    qDebug("%s merge join %s table got block to start win, rows:%" PRId64, GET_TASKID(pCtx->pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(build->type), build->blk ? build->blk->info.rows : 0);
1,446!
3324
    
3325
    build->blkRowIdx = 0;
1,446✔
3326
    
3327
    if (NULL == build->blk) {
1,446!
3328
      break;
1,446✔
3329
    }
3330

3331
    MJ_ERR_RET(mWinJoinTryAddWinEndBlk(pCtx, &pCtx->cache, build, &winEnd));
×
3332
    if (winEnd) {
×
3333
      return TSDB_CODE_SUCCESS;
×
3334
    }
3335
  } while (true);
3336

3337
_return:
2,346✔
3338

3339
  return TSDB_CODE_SUCCESS;
2,346✔
3340
}
3341

3342
int32_t mWinJoinMoveAscWinEnd(SMJoinWindowCtx* pCtx) {
5,838✔
3343
  SMJoinWinCache* pCache = &pCtx->cache;
5,838✔
3344
  int32_t grpNum = taosArrayGetSize(pCache->grps);
5,838✔
3345
  if (grpNum <= 0 || pCache->rowNum >= pCtx->jLimit) {
5,838✔
3346
    return TSDB_CODE_SUCCESS;
849✔
3347
  }
3348
  
3349
  SMJoinGrpRows* pGrp = taosArrayGetLast(pCache->grps);
4,989✔
3350
  if (NULL == pGrp) {
4,989!
3351
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
3352
  }
3353

3354
  SColumnInfoData* pCol = taosArrayGet(pGrp->blk->pDataBlock, pCtx->pJoin->build->primCtx.targetSlotId);
4,989✔
3355
  if (NULL == pCol) {
4,989!
3356
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
3357
  }
3358

3359
  if (*((int64_t*)pCol->pData + pGrp->blk->info.rows - 1) <= pCtx->winEndTs) {
4,989✔
3360
    pCache->rowNum += pGrp->blk->info.rows - pGrp->endIdx - 1;
2,313✔
3361
    if (pCache->rowNum >= pCtx->jLimit) {
2,313✔
3362
      pGrp->endIdx = pGrp->blk->info.rows - 1 + pCtx->jLimit - pCache->rowNum;
12✔
3363
      pCache->rowNum = pCtx->jLimit;
12✔
3364

3365
      return TSDB_CODE_SUCCESS;
12✔
3366
    } else {
3367
      pGrp->endIdx = pGrp->blk->info.rows - 1;
2,301✔
3368
    }
3369
  } else {
3370
    int32_t startIdx = pGrp->endIdx;
2,676✔
3371
    for (; pCache->rowNum < pCtx->jLimit && ++pGrp->endIdx < pGrp->blk->info.rows; ) {
5,271!
3372
      if (*((int64_t*)pCol->pData + pGrp->endIdx) <= pCtx->winEndTs) {
5,253✔
3373
        pCache->rowNum++;
2,595✔
3374
        if ((pGrp->endIdx + 1) >= pGrp->blk->info.rows) {
2,595!
3375
          break;
×
3376
        }
3377
        
3378
        continue;
2,595✔
3379
      }
3380

3381
      //A S S E R T(pGrp->endIdx > startIdx);
3382
      
3383
      pGrp->endIdx--;
2,658✔
3384
      break;
2,658✔
3385
    }
3386

3387
    return TSDB_CODE_SUCCESS;
2,676✔
3388
  }
3389

3390
  return mWinJoinAddWinEndBlk(pCtx);
2,301✔
3391
}
3392

3393
int32_t mWinJoinMoveDescWinEnd(SMJoinWindowCtx* pCtx) {
174✔
3394
  SMJoinWinCache* pCache = &pCtx->cache;
174✔
3395
  int32_t grpNum = taosArrayGetSize(pCache->grps);
174✔
3396
  if (grpNum <= 0) {
174✔
3397
    return TSDB_CODE_SUCCESS;
12✔
3398
  }
3399
  
3400
  SMJoinGrpRows* pGrp = taosArrayGetLast(pCache->grps);
162✔
3401
  if (NULL == pGrp) {
162!
3402
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
3403
  }
3404

3405
  SColumnInfoData* pCol = taosArrayGet(pGrp->blk->pDataBlock, pCtx->pJoin->build->primCtx.targetSlotId);
162✔
3406
  if (NULL == pCol) {
162!
3407
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
3408
  }
3409

3410
  if (*((int64_t*)pCol->pData + pGrp->blk->info.rows - 1) >= pCtx->winBeginTs) {
162✔
3411
    pCache->rowNum += pGrp->blk->info.rows - pGrp->endIdx - 1;
45✔
3412
    pGrp->endIdx = pGrp->blk->info.rows - 1;
45✔
3413
  } else {
3414
    int32_t startIdx = pGrp->endIdx;
117✔
3415
    for (; ++pGrp->endIdx < pGrp->blk->info.rows; ) {
240!
3416
      if (*((int64_t*)pCol->pData + pGrp->endIdx) >= pCtx->winBeginTs) {
240✔
3417
        pCache->rowNum++;
123✔
3418
        if ((pGrp->endIdx + 1) >= pGrp->blk->info.rows) {
123!
3419
          break;
×
3420
        }
3421
        
3422
        continue;
123✔
3423
      }
3424

3425
      //A S S E R T(pGrp->endIdx > startIdx);
3426
      
3427
      pGrp->endIdx--;
117✔
3428
      break;
117✔
3429
    }
3430

3431
    return TSDB_CODE_SUCCESS;
117✔
3432
  }
3433

3434
  return mWinJoinAddWinEndBlk(pCtx);
45✔
3435
}
3436

3437

3438
int32_t mWinJoinMoveFillWinCache(SMJoinWindowCtx* pCtx) {
6,012✔
3439
  MJ_ERR_RET((*pCtx->moveWinBeginFp)(pCtx));
6,012!
3440
  MJ_ERR_RET((*pCtx->moveWinEndFp)(pCtx));
6,012!
3441

3442
  return TSDB_CODE_SUCCESS;
6,012✔
3443
}
3444

3445
int32_t mWinJoinTrimDumpGrpCache(SMJoinWindowCtx* pCtx) {
6,012✔
3446
  if (!pCtx->ascTs) {
6,012✔
3447
    SMJoinWinCache* cache = &pCtx->cache;
174✔
3448
    if (cache->rowNum > pCtx->jLimit) {
174✔
3449
      int32_t skipRows = cache->rowNum - pCtx->jLimit;
24✔
3450
      int32_t buildGrpNum = taosArrayGetSize(cache->grps);
24✔
3451
      for (int32_t i = 0; i < buildGrpNum && skipRows > 0; ++i) {
24!
3452
        SMJoinGrpRows* buildGrp = taosArrayGet(cache->grps, i);
24✔
3453
        if (NULL == buildGrp) {
24!
3454
          MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
3455
        }
3456

3457
        if (skipRows >= GRP_REMAIN_ROWS(buildGrp)) {
24!
3458
          skipRows -= GRP_REMAIN_ROWS(buildGrp);
×
3459
          mWinJoinPopFrontGroup(pCtx, buildGrp);
3460
          buildGrpNum--;
×
3461
          i--;
×
3462
          continue;
×
3463
        } else {
3464
          buildGrp->beginIdx += skipRows;
24✔
3465
          buildGrp->readIdx = buildGrp->beginIdx;
24✔
3466
          break;
24✔
3467
        }
3468
      }
3469

3470
      cache->rowNum = pCtx->jLimit;
24✔
3471
    }
3472
  }
3473

3474
  return mWinJoinDumpGrpCache(pCtx);
6,012✔
3475
}
3476

3477
SSDataBlock* mWinJoinDo(struct SOperatorInfo* pOperator) {
3,927✔
3478
  SMJoinOperatorInfo* pJoin = pOperator->info;
3,927✔
3479
  SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx;
3,927✔
3480
  int32_t code = TSDB_CODE_SUCCESS;
3,927✔
3481
  int64_t probeTs = 0;
3,927✔
3482
  SColumnInfoData* pProbeCol = NULL;
3,927✔
3483
  bool newBlock = false;
3,927✔
3484

3485
  blockDataCleanup(pCtx->finBlk);
3,927✔
3486

3487
  if (pCtx->grpRemains) {
3,927✔
3488
    MJ_ERR_JRET(mWinJoinDumpGrpCache(pCtx));
183!
3489
    if ((mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) || (pCtx->finBlk->info.rows > 0 && pCtx->seqWinGrp)) {
366!
3490
      return pCtx->finBlk;
183✔
3491
    }
3492
    pCtx->grpRemains = false;
×
3493
  }
3494

3495
  do {
3496
    MJ_ERR_JRET(mWinJoinRetrieve(pOperator, pJoin, pCtx, &newBlock));
5,244!
3497
    if (!newBlock) {
5,244✔
3498
      if (pCtx->groupJoin && pCtx->finBlk->info.rows <= 0 && !mJoinIsDone(pOperator)) {
1,839✔
3499
        continue;
432✔
3500
      }
3501
      
3502
      break;
1,407✔
3503
    }
3504

3505
    MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
3,405!
3506

3507
    while (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) {
7,080✔
3508
      MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
6,012✔
3509

3510
      MJ_ERR_JRET(mJoinBuildEqGrp(pJoin->probe, probeTs, NULL, &pCtx->probeGrp));
6,012!
3511
      
3512
      if (probeTs != pCtx->lastTs) {
6,012!
3513
        pCtx->lastTs = probeTs;
6,012✔
3514
        pCtx->winBeginTs = probeTs + pCtx->winBeginOffset;
6,012✔
3515
        pCtx->winEndTs = probeTs + pCtx->winEndOffset;
6,012✔
3516
        MJ_ERR_JRET(mWinJoinMoveFillWinCache(pCtx));
6,012!
3517
      }
3518

3519
      MJ_ERR_JRET(mWinJoinTrimDumpGrpCache(pCtx));
6,012!
3520
      
3521
      if ((mJoinBlkReachThreshold(pJoin, pCtx->finBlk->info.rows)) || (pCtx->finBlk->info.rows > 0 && pCtx->seqWinGrp)) {
12,024!
3522
        return pCtx->finBlk;
2,337✔
3523
      }
3524
    }
3525
  } while (true);
3526

3527
_return:
1,407✔
3528

3529
  if (code) {
1,407!
3530
    pJoin->errCode = code;
×
3531
    return NULL;
×
3532
  }
3533

3534
  return pCtx->finBlk;
1,407✔
3535
}
3536

3537
void mWinJoinGroupReset(SMJoinOperatorInfo* pJoin) {
873✔
3538
  SMJoinWindowCtx* pWin = &pJoin->ctx.windowCtx;
873✔
3539
  SMJoinWinCache* pCache = &pWin->cache;
873✔
3540

3541
  pWin->lastEqGrp = false;
873✔
3542
  pWin->lastProbeGrp = false;
873✔
3543
  pWin->eqPostDone = false;
873✔
3544
  pWin->lastTs = INT64_MIN;
873✔
3545

3546
  mWinJoinResetWindowCache(pWin, pCache);
873✔
3547
  
3548
  mJoinResetGroupTableCtx(pJoin->probe);
873✔
3549
  mJoinResetGroupTableCtx(pJoin->build);  
873✔
3550
}
873✔
3551

3552
int32_t mJoinInitWindowCache(SMJoinWinCache* pCache, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) {
1,473✔
3553
  pCache->pageLimit = MJOIN_BLK_SIZE_LIMIT;
1,473✔
3554
  pCache->colNum = pJoin->build->finNum;
1,473✔
3555
  
3556
  pCache->grps = taosArrayInit(2, sizeof(SMJoinGrpRows));
1,473✔
3557
  if (NULL == pCache->grps) {
1,473!
3558
    return terrno;
×
3559
  }
3560
  //taosArrayReserve(pTable->eqGrps, 1);
3561
  
3562
  return TSDB_CODE_SUCCESS;
1,473✔
3563
}
3564

3565
void mJoinDestroyWindowCtx(SMJoinOperatorInfo* pJoin) {
1,473✔
3566
  SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx;
1,473✔
3567

3568
  mWinJoinResetWindowCache(pCtx, &pCtx->cache);
1,473✔
3569

3570
  blockDataDestroy(pCtx->finBlk);
1,473✔
3571
  pCtx->finBlk = NULL;
1,473✔
3572
  blockDataDestroy(pCtx->cache.outBlk);
1,473✔
3573
  pCtx->cache.outBlk = NULL;
1,473✔
3574

3575
  taosArrayDestroy(pCtx->cache.grps);
1,473✔
3576
}
1,473✔
3577

3578
int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) {
1,473✔
3579
  SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx;
1,473✔
3580
  
3581
  pCtx->pJoin = pJoin;
1,473✔
3582
  pCtx->lastTs = INT64_MIN;
1,473✔
3583
  pCtx->seqWinGrp = pJoinNode->seqWinGroup;
1,473✔
3584
  if (pCtx->seqWinGrp) {
1,473✔
3585
    pJoin->outGrpId = 1;
375✔
3586
  }
3587

3588
  if (pJoinNode->node.inputTsOrder != ORDER_DESC) {
1,473✔
3589
    pCtx->ascTs = true;
1,410✔
3590
  }
3591

3592
  switch (pJoinNode->subType) {
1,473!
3593
    case JOIN_STYPE_ASOF:
471✔
3594
      pCtx->asofOpType = pJoinNode->asofOpType;
471✔
3595
      pCtx->jLimit = (pJoinNode->pJLimit && ((SLimitNode*)pJoinNode->pJLimit)->limit) ? ((SLimitNode*)pJoinNode->pJLimit)->limit->datum.i : 1;
471!
3596
      pCtx->eqRowsAcq = ASOF_EQ_ROW_INCLUDED(pCtx->asofOpType);
471!
3597
      pCtx->lowerRowsAcq = (JOIN_TYPE_RIGHT != pJoin->joinType) ? ASOF_LOWER_ROW_INCLUDED(pCtx->asofOpType) : ASOF_GREATER_ROW_INCLUDED(pCtx->asofOpType);
471✔
3598
      pCtx->greaterRowsAcq = (JOIN_TYPE_RIGHT != pJoin->joinType) ? ASOF_GREATER_ROW_INCLUDED(pCtx->asofOpType) : ASOF_LOWER_ROW_INCLUDED(pCtx->asofOpType);
471✔
3599

3600
      if ((pCtx->ascTs && pCtx->lowerRowsAcq) || (!pCtx->ascTs && pCtx->greaterRowsAcq) ) {
471✔
3601
        pJoin->joinFp = mAsofBackwardJoinDo;
372✔
3602
      } else {
3603
        pJoin->joinFp = mAsofForwardJoinDo;
99✔
3604
        pCtx->forwardRowsAcq = true;
99✔
3605
      }
3606
      pJoin->grpResetFp = mAsofJoinGroupReset;
471✔
3607
      break;
471✔
3608
    case JOIN_STYPE_WIN: {
1,002✔
3609
      SWindowOffsetNode* pOffsetNode = (SWindowOffsetNode*)pJoinNode->pWindowOffset;
1,002✔
3610
      SValueNode* pWinBegin = (SValueNode*)pOffsetNode->pStartOffset;
1,002✔
3611
      SValueNode* pWinEnd = (SValueNode*)pOffsetNode->pEndOffset;
1,002✔
3612
      pCtx->jLimit = (pJoinNode->pJLimit && ((SLimitNode*)pJoinNode->pJLimit)->limit) ? ((SLimitNode*)pJoinNode->pJLimit)->limit->datum.i : INT64_MAX;
1,002!
3613
      pCtx->winBeginOffset = pWinBegin->datum.i;
1,002✔
3614
      pCtx->winEndOffset = pWinEnd->datum.i;
1,002✔
3615
      pCtx->eqRowsAcq = (pCtx->winBeginOffset <= 0 && pCtx->winEndOffset >= 0);
1,002✔
3616
      pCtx->lowerRowsAcq = pCtx->winBeginOffset < 0;
1,002✔
3617
      pCtx->greaterRowsAcq = pCtx->winEndOffset > 0;
1,002✔
3618
      pCtx->moveWinBeginFp = (joinMoveWin)(pCtx->ascTs ? mWinJoinMoveAscWinBegin : mWinJoinMoveDescWinBegin);
1,002✔
3619
      pCtx->moveWinEndFp = (joinMoveWin)(pCtx->ascTs ? mWinJoinMoveAscWinEnd : mWinJoinMoveDescWinEnd);
1,002✔
3620
      if ((pCtx->ascTs && !pCtx->lowerRowsAcq) || (!pCtx->ascTs && !pCtx->greaterRowsAcq) ) {
1,002✔
3621
        pCtx->forwardRowsAcq = true;
84✔
3622
      }
3623
      break;
1,002✔
3624
    }
3625
    default:
×
3626
      break;
×
3627
  }
3628

3629
  if (pJoinNode->node.inputTsOrder != ORDER_DESC) {
1,473✔
3630
    pCtx->ascTs = true;
1,410✔
3631
  }
3632

3633
  pCtx->finBlk = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc);
1,473✔
3634
  if (NULL == pCtx->finBlk) {
1,473!
3635
    MJ_ERR_RET(terrno);
×
3636
  }
3637

3638
  MJ_ERR_RET(blockDataEnsureCapacity(pCtx->finBlk, mJoinGetFinBlkCapacity(pJoin, pJoinNode)));
1,473!
3639

3640
  pCtx->blkThreshold = pCtx->finBlk->info.capacity * MJOIN_BLK_THRESHOLD_RATIO;
1,473✔
3641

3642
  MJ_ERR_RET(mJoinInitWindowCache(&pCtx->cache, pJoin, pCtx));
1,473!
3643
  
3644
  return TSDB_CODE_SUCCESS;
1,473✔
3645
}
3646

3647
void mJoinDestroyMergeCtx(SMJoinOperatorInfo* pJoin) {
101,662✔
3648
  SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
101,662✔
3649
  blockDataDestroy(pCtx->finBlk);
101,662✔
3650
  blockDataDestroy(pCtx->midBlk);
101,674✔
3651

3652
  pCtx->finBlk = NULL;
101,669✔
3653
  pCtx->midBlk = NULL;
101,669✔
3654
}
101,669✔
3655

3656

3657
int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) {
101,661✔
3658
  SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
101,661✔
3659

3660
  pCtx->pJoin = pJoin;
101,661✔
3661
  pCtx->lastEqTs = INT64_MIN;
101,661✔
3662
  pCtx->hashCan = pJoin->probe->keyNum > 0;
101,661✔
3663

3664
  if (JOIN_STYPE_ASOF == pJoinNode->subType || JOIN_STYPE_WIN == pJoinNode->subType) {
101,661!
3665
    pCtx->jLimit = (pJoinNode->pJLimit && ((SLimitNode*)pJoinNode->pJLimit)->limit) ? ((SLimitNode*)pJoinNode->pJLimit)->limit->datum.i : 1;
346!
3666
    pJoin->subType = JOIN_STYPE_OUTER;
346✔
3667
    pJoin->build->eqRowLimit = pCtx->jLimit;
346✔
3668
    pJoin->grpResetFp = mLeftJoinGroupReset;
346✔
3669
  } else {
3670
    pCtx->jLimit = -1;
101,315✔
3671
  }
3672
    
3673
  if (pJoinNode->node.inputTsOrder != ORDER_DESC) {
101,661✔
3674
    pCtx->ascTs = true;
100,631✔
3675
  }
3676

3677
  pCtx->finBlk = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc);
101,661✔
3678
  if (NULL == pCtx->finBlk) {
101,672!
3679
    MJ_ERR_RET(terrno);
×
3680
  }
3681

3682
  //A S S E R T(pJoinNode->node.pOutputDataBlockDesc->totalRowSize > 0);
3683

3684
  MJ_ERR_RET(blockDataEnsureCapacity(pCtx->finBlk, mJoinGetFinBlkCapacity(pJoin, pJoinNode)));
101,672!
3685
  
3686
  if (pJoin->pFPreFilter) {
101,672✔
3687
    pCtx->midBlk = NULL;
453✔
3688
    int32_t code = createOneDataBlock(pCtx->finBlk, false, &pCtx->midBlk);
453✔
3689
    if (code) {
453!
3690
      MJ_ERR_RET(code);
×
3691
    }
3692
    MJ_ERR_RET(blockDataEnsureCapacity(pCtx->midBlk, pCtx->finBlk->info.capacity));
453!
3693
  }
3694

3695
  pCtx->blkThreshold = pCtx->finBlk->info.capacity * MJOIN_BLK_THRESHOLD_RATIO;
101,673✔
3696

3697
  switch (pJoin->joinType) {
101,673!
3698
    case JOIN_TYPE_INNER:
99,714✔
3699
      pCtx->hashCartFp = (joinCartFp)mInnerJoinHashCart;
99,714✔
3700
      pCtx->mergeCartFp = (joinCartFp)mInnerJoinMergeCart;
99,714✔
3701
      break;
99,714✔
3702
    case JOIN_TYPE_LEFT:
1,770✔
3703
    case JOIN_TYPE_RIGHT: {
3704
      switch (pJoin->subType) {
1,770!
3705
        case JOIN_STYPE_OUTER:          
799✔
3706
          pCtx->hashCartFp = (joinCartFp)mLeftJoinHashCart;
799✔
3707
          pCtx->mergeCartFp = (joinCartFp)mLeftJoinMergeCart;
799✔
3708
          break;
799✔
3709
        case JOIN_STYPE_SEMI: 
490✔
3710
          pCtx->hashCartFp = (joinCartFp)mSemiJoinHashCart;
490✔
3711
          pCtx->mergeCartFp = (joinCartFp)mSemiJoinMergeCart;
490✔
3712
          break;
490✔
3713
        case JOIN_STYPE_ANTI:
481✔
3714
          pCtx->hashCartFp = (joinCartFp)mAntiJoinHashCart;
481✔
3715
          pCtx->mergeCartFp = (joinCartFp)mAntiJoinMergeCart;
481✔
3716
          break;
481✔
3717
        default:
×
3718
          break;
×
3719
      }
3720
      break;
1,770✔
3721
    }
3722
    case JOIN_TYPE_FULL:
189✔
3723
      pCtx->hashCartFp = (joinCartFp)mFullJoinHashCart;
189✔
3724
      pCtx->mergeCartFp = (joinCartFp)mFullJoinMergeCart;
189✔
3725
      break;
189✔
3726
    default:
×
3727
      break;
×
3728
  }
3729
  
3730
  return TSDB_CODE_SUCCESS;
101,673✔
3731
}
3732

3733

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc