• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3858

17 Apr 2025 01:40PM UTC coverage: 62.968% (+0.5%) from 62.513%
#3858

push

travis-ci

web-flow
docs(opc): add persist data support (#30783)

156194 of 316378 branches covered (49.37%)

Branch coverage included in aggregate %.

242021 of 316027 relevant lines covered (76.58%)

19473613.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.45
/source/libs/executor/src/mergejoinoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "operator.h"
20
#include "os.h"
21
#include "querynodes.h"
22
#include "querytask.h"
23
#include "tcompare.h"
24
#include "tdatablock.h"
25
#include "thash.h"
26
#include "tmsg.h"
27
#include "ttypes.h"
28
#include "functionMgt.h"
29
#include "mergejoin.h"
30

31
// Collects the run of consecutive rows in pTable->blk whose primary timestamp equals `timestamp`,
// beginning at pTable->blkRowIdx.
//
// The row at pTable->blkRowIdx is taken as the start of the group without being compared; the scan
// begins at the following row. On success:
//   - pGrp->beginIdx and pGrp->readIdx hold the starting row, pGrp->endIdx the last row of the run;
//   - pTable->blkRowIdx is the first row after the run (== block row count when the run hits the end);
//   - if the run reaches the last row of the block and `wholeBlk` is non-NULL, *wholeBlk is set to true
//     (it is never written otherwise).
// Returns TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the timestamp column cannot be fetched.
int32_t mJoinBuildEqGrp(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, SMJoinGrpRows* pGrp) {
  SColumnInfoData* pTsCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCtx.targetSlotId);
  if (NULL == pTsCol) {
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  pGrp->beginIdx = pTable->blkRowIdx;
  pGrp->readIdx = pTable->blkRowIdx;
  pTable->blkRowIdx++;

  // Shortcut: when the last row of the block carries the same timestamp, every remaining row belongs
  // to the group and no per-row scan is performed.
  char*   pLastVal = colDataGetNumData(pTsCol, pTable->blk->info.rows - 1);
  int64_t lastTs = *(int64_t*)pLastVal;
  if (lastTs != timestamp) {
    while (pTable->blkRowIdx < pTable->blk->info.rows) {
      char*   pCurVal = colDataGetNumData(pTsCol, pTable->blkRowIdx);
      int64_t curTs = *(int64_t*)pCurVal;
      if (curTs != timestamp) {
        // First row with a different timestamp: the group ends on the previous row.
        pGrp->endIdx = pTable->blkRowIdx - 1;
        return TSDB_CODE_SUCCESS;
      }

      ++pTable->blkRowIdx;
    }
  }

  // The group extends to the end of the block.
  pGrp->endIdx = pTable->blk->info.rows - 1;
  pTable->blkRowIdx = pTable->blk->info.rows;

  if (NULL != wholeBlk) {
    *wholeBlk = true;
  }

  return TSDB_CODE_SUCCESS;
}
63

64

65
// Truncates pBlock in place so that only its first row remains.
//
// For variable-length columns, varmeta.length is reset to the length of the first value (0 when the
// first value is NULL). For fixed-length columns the null bitmap is cleared over BitmapLen(rows) bytes,
// using the row count on entry, and the null flag of row 0 is restored. Columns without data are left
// untouched. pBlock->info.rows is set to 1.
// Returns TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR if a column cannot be fetched.
int32_t mJoinTrimKeepFirstRow(SSDataBlock* pBlock) {
  int32_t bitmapBytes = BitmapLen(pBlock->info.rows);
  size_t  colCnt = taosArrayGetSize(pBlock->pDataBlock);

  for (size_t colIdx = 0; colIdx < colCnt; ++colIdx) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx);
    if (NULL == pCol) {
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }

    // it is a reserved column for scalar function, and no data in this column yet.
    if (pCol->pData == NULL || (IS_VAR_DATA_TYPE(pCol->info.type) && pCol->varmeta.length == 0)) {
      continue;
    }

    if (!IS_VAR_DATA_TYPE(pCol->info.type)) {
      // Remember the null flag of row 0 before wiping the bitmap, then put it back.
      bool firstIsNull = colDataIsNull_f(pCol->nullbitmap, 0);

      TAOS_MEMSET(pCol->nullbitmap, 0, bitmapBytes);
      if (firstIsNull) {
        colDataSetNull_f(pCol->nullbitmap, 0);
      }
      continue;
    }

    pCol->varmeta.length = 0;
    if (colDataIsNull_var(pCol, 0)) {
      continue;
    }

    char*   pFirstVal = colDataGetVarData(pCol, 0);
    int32_t firstLen = 0;
    if (TSDB_DATA_TYPE_JSON == pCol->info.type) {
      firstLen = getJsonValueLen(pFirstVal);
    } else {
      firstLen = varDataTLen(pFirstVal);
    }
    pCol->varmeta.length = firstLen;
  }

  pBlock->info.rows = 1;

  return TSDB_CODE_SUCCESS;
}
107

108

109
// Trims pBlock in place so that it keeps only the first row whose entry in pBoolList is true.
//
// Parameters:
//   pBlock    - block to trim; its columns are rewritten in place.
//   totalRows - number of rows described by pBoolList (and by the null bitmaps).
//   pBoolList - per-row filter result; the first non-zero entry selects the row to keep.
//
// For each column that holds data, the selected row is moved to row 0:
//   - variable-length columns: varmeta.length is reset and the value (or NULL) is re-appended at row 0;
//   - fixed-length columns: the null bitmap is cleared over BitmapLen(totalRows) bytes, then the value
//     (or the NULL flag) is written to row 0. Only the 1/2/4/8-byte types listed in the switch are
//     moved; any other fixed-length type just has its bitmap cleared and contributes no row.
// pBlock->info.rows becomes 1 when at least one column kept a row, otherwise 0.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when a column cannot be fetched,
// terrno when the temporary copy buffer cannot be allocated, or the error code of colDataSetVal().
//
// Differences from the previous implementation: the selected row is located once instead of once
// per column, the null flag of that row is read directly before the bitmap is cleared (so no heap
// scratch bitmap exists that could leak on an error return), and a colDataSetVal() failure returns
// its own error code rather than whatever terrno happens to contain.
int32_t mJoinTrimKeepOneRow(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t bmLen = BitmapLen(totalRows);
  int32_t maxRows = 0;
  size_t  numOfCols = taosArrayGetSize(pBlock->pDataBlock);

  // The row to keep does not depend on the column, so find it once.
  int32_t keepIdx = -1;
  for (int32_t j = 0; j < totalRows; ++j) {
    if (pBoolList[j]) {
      keepIdx = j;
      break;
    }
  }

  for (size_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
    if (NULL == pDst) {
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }

    // it is a reserved column for scalar function, and no data in this column yet.
    if (pDst->pData == NULL || (IS_VAR_DATA_TYPE(pDst->info.type) && pDst->varmeta.length == 0)) {
      continue;
    }

    int32_t numOfRows = 0;
    if (IS_VAR_DATA_TYPE(pDst->info.type)) {
      pDst->varmeta.length = 0;

      if (keepIdx >= 0) {
        if (colDataIsNull_var(pDst, keepIdx)) {
          colDataSetNull_var(pDst, 0);
        } else {
          // The source value lives inside the column buffer, which colDataSetVal() may reallocate;
          // copy it to a temporary buffer before re-inserting it at row 0.
          char*   p1 = colDataGetVarData(pDst, keepIdx);
          int32_t len = 0;
          if (pDst->info.type == TSDB_DATA_TYPE_JSON) {
            len = getJsonValueLen(p1);
          } else {
            len = varDataTLen(p1);
          }

          char* p2 = taosMemoryMalloc(len);
          if (NULL == p2) {
            return terrno;
          }
          TAOS_MEMCPY(p2, p1, len);

          code = colDataSetVal(pDst, 0, p2, false);
          taosMemoryFree(p2);
          if (code) {
            MJ_ERR_RET(code);
          }
        }
        numOfRows = 1;
      }
    } else {
      // Read the null flag of the selected row before the bitmap is wiped.
      bool keepIsNull = false;
      if (keepIdx >= 0 && colDataIsNull_f(pDst->nullbitmap, keepIdx)) {
        keepIsNull = true;
      }

      TAOS_MEMSET(pDst->nullbitmap, 0, bmLen);

      if (keepIdx >= 0) {
        switch (pDst->info.type) {
          case TSDB_DATA_TYPE_BIGINT:
          case TSDB_DATA_TYPE_UBIGINT:
          case TSDB_DATA_TYPE_DOUBLE:
          case TSDB_DATA_TYPE_TIMESTAMP:
            if (!keepIsNull) {
              ((int64_t*)pDst->pData)[0] = ((int64_t*)pDst->pData)[keepIdx];
            }
            numOfRows = 1;
            break;
          case TSDB_DATA_TYPE_FLOAT:
          case TSDB_DATA_TYPE_INT:
          case TSDB_DATA_TYPE_UINT:
            if (!keepIsNull) {
              ((int32_t*)pDst->pData)[0] = ((int32_t*)pDst->pData)[keepIdx];
            }
            numOfRows = 1;
            break;
          case TSDB_DATA_TYPE_SMALLINT:
          case TSDB_DATA_TYPE_USMALLINT:
            if (!keepIsNull) {
              ((int16_t*)pDst->pData)[0] = ((int16_t*)pDst->pData)[keepIdx];
            }
            numOfRows = 1;
            break;
          case TSDB_DATA_TYPE_BOOL:
          case TSDB_DATA_TYPE_TINYINT:
          case TSDB_DATA_TYPE_UTINYINT:
            if (!keepIsNull) {
              ((int8_t*)pDst->pData)[0] = ((int8_t*)pDst->pData)[keepIdx];
            }
            numOfRows = 1;
            break;
          default:
            // Other fixed-length types are not moved (unchanged from the previous behavior).
            break;
        }

        if (numOfRows > 0 && keepIsNull) {
          colDataSetNull_f(pDst->nullbitmap, 0);
        }
      }
    }

    if (maxRows < numOfRows) {
      maxRows = numOfRows;
    }
  }

  pBlock->info.rows = maxRows;

  return TSDB_CODE_SUCCESS;
}
273

274

275
// Applies pFilterInfo to pBlock, marks the build rows of the current hash group that passed the
// filter, and then reduces pBlock to the qualified rows.
//
// Marking is skipped when build->pHashGrpRows->allRowsMatch is already set or when no row qualified.
// Otherwise either the whole group is flagged as matched (all rows qualified and the block row count
// equals the size of build->pHashCurGrp), or each qualifying row at bitmap position startRowIdx + i
// is set in build->pRowBitmap and counted in rowMatchNum.
// Does nothing and returns success when pFilterInfo is NULL or the block is empty.
// Returns the first error reported by the filter helpers or by extractQualifiedTupleByFilterResult().
int32_t mJoinFilterAndMarkHashRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SMJoinTableCtx* build, int32_t startRowIdx) {
  if (NULL == pFilterInfo || 0 == pBlock->info.rows) {
    return TSDB_CODE_SUCCESS;
  }

  SFilterColumnParam colParam = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
  SColumnInfoData*   pFilterRes = NULL;
  int32_t            status = 0;

  int32_t code = filterSetDataFromSlotId(pFilterInfo, &colParam);
  if (TSDB_CODE_SUCCESS == code) {
    code = filterExecute(pFilterInfo, pBlock, &pFilterRes, NULL, colParam.numOfCols, &status);
  }

  if (TSDB_CODE_SUCCESS == code) {
    if (!build->pHashGrpRows->allRowsMatch && (status == FILTER_RESULT_ALL_QUALIFIED || status == FILTER_RESULT_PARTIAL_QUALIFIED)) {
      if (status == FILTER_RESULT_ALL_QUALIFIED && taosArrayGetSize(build->pHashCurGrp) == pBlock->info.rows) {
        build->pHashGrpRows->allRowsMatch = true;
      } else {
        bool* pQualified = (bool*)pFilterRes->pData;
        for (int32_t row = 0; row < pBlock->info.rows; ++row) {
          // Skip rows rejected by a partial filter result.
          bool rejected = (status == FILTER_RESULT_PARTIAL_QUALIFIED && false == pQualified[row]);
          // Also skip rows that were already marked earlier.
          if (rejected || MJOIN_ROW_BITMAP_SET(build->pRowBitmap, build->pHashGrpRows->rowBitmapOffset, startRowIdx + row)) {
            continue;
          }

          MJOIN_SET_ROW_BITMAP(build->pRowBitmap, build->pHashGrpRows->rowBitmapOffset, startRowIdx + row);
          build->pHashGrpRows->rowMatchNum++;
        }

        if (build->pHashGrpRows->rowMatchNum == taosArrayGetSize(build->pHashGrpRows->pRows)) {
          build->pHashGrpRows->allRowsMatch = true;
        }
      }
    }

    code = extractQualifiedTupleByFilterResult(pBlock, pFilterRes, status);
  }

  // Release the filter result column on both the success and the error path.
  colDataDestroy(pFilterRes);
  taosMemoryFree(pFilterRes);

  return code;
}
322

323
// Applies pFilterInfo to pBlock, records which rows of the build-side equal groups passed the
// filter, and then reduces pBlock to the qualified rows.
//
// The walk starts at group startGrpIdx / row startRowIdx of build->eqGrps and consumes one block row
// per build row (later groups are walked from row 0). NOTE(review): this presumes the rows of pBlock
// are laid out in the same order as the build groups - confirm against the callers.
// For every group that is not yet fully matched, qualifying rows are set in build->pRowBitmap and
// counted in rowMatchNum; allRowsMatch is raised once all rows of the group have matched.
// Does nothing and returns success when pFilterInfo is NULL or the block is empty.
int32_t mJoinFilterAndMarkRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SMJoinTableCtx* build, int32_t startGrpIdx, int32_t startRowIdx) {
  if (NULL == pFilterInfo || 0 == pBlock->info.rows) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t            code = TSDB_CODE_SUCCESS;
  SFilterColumnParam colParam = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
  SColumnInfoData*   p = NULL;

  code = filterSetDataFromSlotId(pFilterInfo, &colParam);
  if (code != TSDB_CODE_SUCCESS) {
    goto _return;
  }

  int32_t status = 0;
  code = filterExecute(pFilterInfo, pBlock, &p, NULL, colParam.numOfCols, &status);
  if (code != TSDB_CODE_SUCCESS) {
    goto _return;
  }

  int32_t rowNum = 0;  // number of block rows already attributed to build groups
  bool*   pQualified = (bool*)p->pData;
  int32_t grpNum = taosArrayGetSize(build->eqGrps);

  if (status == FILTER_RESULT_ALL_QUALIFIED || status == FILTER_RESULT_PARTIAL_QUALIFIED) {
    int32_t grpIdx = startGrpIdx;
    while (grpIdx < grpNum && rowNum < pBlock->info.rows) {
      SMJoinGrpRows* pBuildGrp = taosArrayGet(build->eqGrps, grpIdx);
      if (NULL == pBuildGrp) {
        MJ_ERR_JRET(terrno);
      }

      if (pBuildGrp->allRowsMatch) {
        // Nothing left to mark; just skip over this group's rows.
        rowNum += pBuildGrp->endIdx - startRowIdx + 1;
      } else if (status == FILTER_RESULT_ALL_QUALIFIED && startRowIdx == pBuildGrp->beginIdx &&
                 ((pBlock->info.rows - rowNum) >= (pBuildGrp->endIdx - startRowIdx + 1))) {
        // The block covers the whole group and every row qualified.
        pBuildGrp->allRowsMatch = true;
        rowNum += pBuildGrp->endIdx - startRowIdx + 1;
      } else {
        for (int32_t m = startRowIdx; m <= pBuildGrp->endIdx && rowNum < pBlock->info.rows; ++m, ++rowNum) {
          bool rejected = (status == FILTER_RESULT_PARTIAL_QUALIFIED && false == pQualified[rowNum]);
          if (rejected || MJOIN_ROW_BITMAP_SET(build->pRowBitmap, pBuildGrp->rowBitmapOffset, m - pBuildGrp->beginIdx)) {
            continue;
          }

          MJOIN_SET_ROW_BITMAP(build->pRowBitmap, pBuildGrp->rowBitmapOffset, m - pBuildGrp->beginIdx);
          pBuildGrp->rowMatchNum++;
        }

        if (pBuildGrp->rowMatchNum == (pBuildGrp->endIdx - pBuildGrp->beginIdx + 1)) {
          pBuildGrp->allRowsMatch = true;
        }
      }

      // Only the first visited group starts at the caller-provided row.
      startRowIdx = 0;
      ++grpIdx;
    }
  }

  code = extractQualifiedTupleByFilterResult(pBlock, p, status);

_return:
  colDataDestroy(p);
  taosMemoryFree(p);

  return code;
}
386

387
// Applies pFilterInfo to pBlock and keeps at most one qualifying row:
//   - all rows qualified  -> the first row is kept (mJoinTrimKeepFirstRow);
//   - no row qualified    -> the block row count is set to 0;
//   - some rows qualified -> the first qualifying row is kept (mJoinTrimKeepOneRow).
// Does nothing and returns success when pFilterInfo is NULL or the block is empty.
// Returns the first error reported by the filter helpers or the trim helpers.
int32_t mJoinFilterAndKeepSingleRow(SSDataBlock* pBlock, SFilterInfo* pFilterInfo) {
  if (NULL == pFilterInfo || 0 == pBlock->info.rows) {
    return TSDB_CODE_SUCCESS;
  }

  SFilterColumnParam colParam = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
  SColumnInfoData*   p = NULL;

  int32_t code = filterSetDataFromSlotId(pFilterInfo, &colParam);
  if (code != TSDB_CODE_SUCCESS) {
    goto _return;
  }

  int32_t status = 0;
  code = filterExecute(pFilterInfo, pBlock, &p, NULL, colParam.numOfCols, &status);
  if (code != TSDB_CODE_SUCCESS) {
    goto _return;
  }

  switch (status) {
    case FILTER_RESULT_ALL_QUALIFIED:
      pBlock->info.rows = 1;
      MJ_ERR_JRET(mJoinTrimKeepFirstRow(pBlock));
      break;
    case FILTER_RESULT_NONE_QUALIFIED:
      pBlock->info.rows = 0;
      break;
    case FILTER_RESULT_PARTIAL_QUALIFIED:
      MJ_ERR_JRET(mJoinTrimKeepOneRow(pBlock, pBlock->info.rows, (bool*)p->pData));
      break;
    default:
      break;
  }

  code = TSDB_CODE_SUCCESS;

_return:
  // The filter result column is released on both the success and the error path.
  colDataDestroy(p);
  taosMemoryFree(p);

  return code;
}
424

425
// Applies pFilterInfo to pBlock only to learn whether any row qualifies: when none does, the block
// row count is set to 0; otherwise the block is left as it is (rows are not trimmed here).
// Does nothing and returns success when pFilterInfo is NULL or the block is empty.
// Returns the first error reported by the filter helpers.
int32_t mJoinFilterAndNoKeepRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo) {
  if (NULL == pFilterInfo || 0 == pBlock->info.rows) {
    return TSDB_CODE_SUCCESS;
  }

  SFilterColumnParam colParam = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
  SColumnInfoData*   pFilterRes = NULL;
  int32_t            status = 0;

  int32_t code = filterSetDataFromSlotId(pFilterInfo, &colParam);
  if (TSDB_CODE_SUCCESS == code) {
    code = filterExecute(pFilterInfo, pBlock, &pFilterRes, NULL, colParam.numOfCols, &status);
    if (TSDB_CODE_SUCCESS == code && FILTER_RESULT_NONE_QUALIFIED == status) {
      pBlock->info.rows = 0;
    }
  }

  // The filter result column is released on both the success and the error path.
  colDataDestroy(pFilterRes);
  taosMemoryFree(pFilterRes);

  return code;
}
457

458

459
// Moves rows from the intermediate block (*ppMid) into the final block (*ppFin).
//
// When everything fits into the final block's capacity, all rows are merged, the intermediate block
// is cleaned up and pCtx->midRemains is cleared. Otherwise only as many rows as still fit are merged
// (taken from the tail of the intermediate block, which is then shrunk by that amount) and
// pCtx->midRemains is set so the caller knows rows are left behind.
// Returns the error of blockDataMerge()/blockDataMergeNRows() on failure.
int32_t mJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBlock** ppFin) {
  SSDataBlock* pSrc = *ppMid;
  SSDataBlock* pDst = *ppFin;

  int32_t totalRows = pDst->info.rows + pSrc->info.rows;
  if (totalRows <= pDst->info.capacity) {
    MJ_ERR_RET(blockDataMerge(pDst, pSrc));
    blockDataCleanup(pSrc);
    pCtx->midRemains = false;
    return TSDB_CODE_SUCCESS;
  }

  int32_t copyRows = pDst->info.capacity - pDst->info.rows;
  if (copyRows > 0) {
    MJ_ERR_RET(blockDataMergeNRows(pDst, pSrc, pSrc->info.rows - copyRows, copyRows));
    blockDataShrinkNRows(pSrc, copyRows);
  }

  pCtx->midRemains = true;

  return TSDB_CODE_SUCCESS;
}
496

497

498
// Swaps the intermediate and final blocks of pCtx and clears the midRemains flag.
// Always returns TSDB_CODE_SUCCESS.
int32_t mJoinHandleMidRemains(SMJoinMergeCtx* pCtx) {
  pCtx->midRemains = false;

  TSWAP(pCtx->midBlk, pCtx->finBlk);

  return TSDB_CODE_SUCCESS;
}
505

506
// Emits the remaining rows of a non-matching group into pRes: the columns of the table that owns the
// group are copied from pGrp->blk, and the columns of the other table are filled with NULL.
//
// probeGrp selects which side owns the group (true: probe table, false: build table).
// Rows are written after the existing rows when `append` is true, otherwise from row 0; the row count
// of pRes is updated accordingly.
// Returns TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR if a column cannot be fetched, or the error of
// colDataAssignNRows().
int32_t mJoinNonEqGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pGrp, bool probeGrp) {
  SMJoinTableCtx* pOwner = probeGrp ? pJoin->probe : pJoin->build;
  SMJoinTableCtx* pOther = probeGrp ? pJoin->build : pJoin->probe;
  int32_t         baseRow = append ? pRes->info.rows : 0;
  int32_t         grpRows = GRP_REMAIN_ROWS(pGrp);

  // Copy the group's own columns.
  for (int32_t c = 0; c < pOwner->finNum; ++c) {
    SMJoinColMap*    pMap = pOwner->finCols + c;
    SColumnInfoData* pSrcCol = taosArrayGet(pGrp->blk->pDataBlock, pMap->srcSlot);
    SColumnInfoData* pDstCol = taosArrayGet(pRes->pDataBlock, pMap->dstSlot);
    if (NULL == pSrcCol || NULL == pDstCol) {
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }

    MJ_ERR_RET(colDataAssignNRows(pDstCol, baseRow, pSrcCol, pGrp->readIdx, grpRows));
  }

  // The other side has no matching rows: pad its columns with NULL.
  for (int32_t c = 0; c < pOther->finNum; ++c) {
    SMJoinColMap*    pMap = pOther->finCols + c;
    SColumnInfoData* pDstCol = taosArrayGet(pRes->pDataBlock, pMap->dstSlot);
    if (NULL == pDstCol) {
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }

    colDataSetNItemsNull(pDstCol, baseRow, grpRows);
  }

  if (append) {
    pRes->info.rows = pRes->info.rows + grpRows;
  } else {
    pRes->info.rows = grpRows;
  }

  return TSDB_CODE_SUCCESS;
}
536

537

538
// Appends as many remaining rows of the non-matching group pGrp to pCtx->finBlk as the block can
// still take and records in pCtx->grpRemains whether rows of the group are left over.
//
// When probeGrp and singleProbeRow are both true, at most one row is emitted per call.
// pGrp->readIdx is advanced past the rows that were emitted; pGrp->endIdx is unchanged on success.
// If the final block is already full, nothing is emitted and grpRemains reflects whether the group
// still has unread rows.
int32_t mJoinNonEqCart(SMJoinCommonCtx* pCtx, SMJoinGrpRows* pGrp, bool probeGrp, bool singleProbeRow) {
  pCtx->lastEqGrp = false;
  pCtx->lastProbeGrp = probeGrp;

  int32_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows;
  if (rowsLeft <= 0) {
    pCtx->grpRemains = pGrp->readIdx <= pGrp->endIdx;
    return TSDB_CODE_SUCCESS;
  }

  if (probeGrp && singleProbeRow) {
    rowsLeft = 1;
  }

  bool    partial = GRP_REMAIN_ROWS(pGrp) > rowsLeft;
  int32_t savedEndIdx = pGrp->endIdx;
  if (partial) {
    // Temporarily narrow the group to the rows that fit; the real end is restored below.
    pGrp->endIdx = pGrp->readIdx + rowsLeft - 1;
  }

  MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, pGrp, probeGrp));

  pGrp->readIdx = pGrp->endIdx + 1;
  pGrp->endIdx = savedEndIdx;
  pCtx->grpRemains = partial;

  return TSDB_CODE_SUCCESS;
}
567

568
// Writes the cartesian product of the remaining rows of pFirst (probe side) and pSecond (build side)
// into pRes.
//
// Each probe row r occupies the secondRows consecutive output rows starting at
// currRows + r * secondRows: probe columns repeat the probe value (or NULL), build columns receive a
// copy of the build group's rows. Output starts after the existing rows when `append` is true,
// otherwise at row 0, and the row count of pRes is updated accordingly.
// Returns TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR if a column cannot be fetched or if the capacity
// checks fail, or the error of colDataSetNItems()/colDataAssignNRows().
int32_t mJoinMergeGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pFirst, SMJoinGrpRows* pSecond) {
  SMJoinTableCtx* probe = pJoin->probe;
  SMJoinTableCtx* build = pJoin->build;
  int32_t         baseRow = append ? pRes->info.rows : 0;
  int32_t         probeRows = GRP_REMAIN_ROWS(pFirst);
  int32_t         buildRows = GRP_REMAIN_ROWS(pSecond);

  // Probe side: every probe value is repeated buildRows times.
  for (int32_t c = 0; c < probe->finNum; ++c) {
    SMJoinColMap*    pMap = probe->finCols + c;
    SColumnInfoData* pSrcCol = taosArrayGet(pFirst->blk->pDataBlock, pMap->srcSlot);
    SColumnInfoData* pDstCol = taosArrayGet(pRes->pDataBlock, pMap->dstSlot);
    if (NULL == pSrcCol || NULL == pDstCol) {
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }

    for (int32_t r = 0; r < probeRows; ++r) {
      int32_t srcRow = pFirst->readIdx + r;
      int32_t dstRow = baseRow + r * buildRows;

      if (colDataIsNull_s(pSrcCol, srcRow)) {
        colDataSetNItemsNull(pDstCol, dstRow, buildRows);
        continue;
      }

      // Sanity checks before writing non-NULL values into the output column.
      if (pRes->info.capacity < (pRes->info.rows + probeRows * buildRows)) {
        qError("capacity:%d not enough, rows:%" PRId64 ", firstRows:%d, secondRows:%d", pRes->info.capacity, pRes->info.rows, probeRows, buildRows);
        MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
      }

      uint32_t startOffset = (IS_VAR_DATA_TYPE(pDstCol->info.type)) ? pDstCol->varmeta.length : (dstRow * pDstCol->info.bytes);
      if ((startOffset + 1 * pDstCol->info.bytes) > pRes->info.capacity * pDstCol->info.bytes) {
        qError("col buff not enough, startOffset:%d, bytes:%d, capacity:%d", startOffset, pDstCol->info.bytes, pRes->info.capacity);
        MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
      }

      MJ_ERR_RET(colDataSetNItems(pDstCol, dstRow, colDataGetData(pSrcCol, srcRow), buildRows, true));
    }
  }

  // Build side: the whole build group is copied once per probe row.
  for (int32_t c = 0; c < build->finNum; ++c) {
    SMJoinColMap*    pMap = build->finCols + c;
    SColumnInfoData* pSrcCol = taosArrayGet(pSecond->blk->pDataBlock, pMap->srcSlot);
    SColumnInfoData* pDstCol = taosArrayGet(pRes->pDataBlock, pMap->dstSlot);
    if (NULL == pSrcCol || NULL == pDstCol) {
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }

    for (int32_t r = 0; r < probeRows; ++r) {
      MJ_ERR_RET(colDataAssignNRows(pDstCol, baseRow + r * buildRows, pSrcCol, pSecond->readIdx, buildRows));
    }
  }

  if (append) {
    pRes->info.rows = pRes->info.rows + probeRows * buildRows;
  } else {
    pRes->info.rows = probeRows * buildRows;
  }

  return TSDB_CODE_SUCCESS;
}
618

619

620

621
// Joins the current probe row (probeGrp->readIdx) with the rows of the current build hash group
// (build->pHashCurGrp, starting at build->grpRowIdx) and writes the result rows into pBlk.
//
// Parameters:
//   pBlk     - output block; rows are written after the existing rows when `append` is true,
//              otherwise from row 0.
//   probeGrp - probe group whose row at readIdx is repeated for every emitted build row.
//   append   - see pBlk.
//   probe    - probe table context (provides the probe column mapping).
//   build    - build table context; grpRowIdx is advanced by the number of emitted rows, or set to -1
//              once the hash group is exhausted.
//   cont     - optional; set to true when the caller may continue with the next probe row (the hash
//              group was exhausted and the block still has room), false otherwise.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR if a column or row position cannot
// be fetched, or the error of colDataSetNItems()/colDataAssignNRows().
//
// Differences from the previous implementation: the build-side output column is now NULL-checked like
// every other column lookup in this file, and the resulting row count honors `append` (it used to be
// incremented unconditionally although rows are written from row 0 when `append` is false).
int32_t mJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append, SMJoinTableCtx* probe, SMJoinTableCtx* build, bool* cont) {
  if (NULL != cont) {
    *cont = false;
  }

  int32_t rowsLeft = append ? (pBlk->info.capacity - pBlk->info.rows) : pBlk->info.capacity;
  if (rowsLeft <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t buildGrpRows = taosArrayGetSize(build->pHashCurGrp);
  int32_t grpRows = buildGrpRows - build->grpRowIdx;
  if (grpRows <= 0 || build->grpRowIdx < 0) {
    // Nothing (left) to emit for this hash group.
    build->grpRowIdx = -1;
    if (NULL != cont) {
      *cont = true;
    }
    return TSDB_CODE_SUCCESS;
  }

  int32_t actRows = TMIN(grpRows, rowsLeft);
  int32_t currRows = append ? pBlk->info.rows : 0;

  // Probe side: repeat the current probe row for every emitted build row.
  for (int32_t c = 0; c < probe->finNum; ++c) {
    SMJoinColMap*    pFirstCol = probe->finCols + c;
    SColumnInfoData* pInCol = taosArrayGet(probeGrp->blk->pDataBlock, pFirstCol->srcSlot);
    SColumnInfoData* pOutCol = taosArrayGet(pBlk->pDataBlock, pFirstCol->dstSlot);
    if (NULL == pInCol || NULL == pOutCol) {
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }

    if (colDataIsNull_s(pInCol, probeGrp->readIdx)) {
      colDataSetNItemsNull(pOutCol, currRows, actRows);
    } else {
      MJ_ERR_RET(colDataSetNItems(pOutCol, currRows, colDataGetData(pInCol, probeGrp->readIdx), actRows, true));
    }
  }

  // Build side: copy one row per recorded row position of the hash group.
  for (int32_t c = 0; c < build->finNum; ++c) {
    SMJoinColMap*    pSecondCol = build->finCols + c;
    SColumnInfoData* pOutCol = taosArrayGet(pBlk->pDataBlock, pSecondCol->dstSlot);
    if (NULL == pOutCol) {
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }

    for (int32_t r = 0; r < actRows; ++r) {
      SMJoinRowPos* pRow = taosArrayGet(build->pHashCurGrp, build->grpRowIdx + r);
      if (NULL == pRow) {
        MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
      }

      SColumnInfoData* pInCol = taosArrayGet(pRow->pBlk->pDataBlock, pSecondCol->srcSlot);
      if (NULL == pInCol) {
        MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
      }

      MJ_ERR_RET(colDataAssignNRows(pOutCol, currRows + r, pInCol, pRow->pos, 1));
    }
  }

  // Rows were written starting at currRows, so the new row count is relative to it.
  pBlk->info.rows = currRows + actRows;

  if (actRows == grpRows) {
    build->grpRowIdx = -1;
  } else {
    build->grpRowIdx += actRows;
  }

  // The block is full: leave *cont false so the caller stops here.
  if (actRows == rowsLeft) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL != cont) {
    *cont = true;
  }

  return TSDB_CODE_SUCCESS;
}
695

696
// For every equal group of pTb, obtains a row-bitmap offset sized for the group's row count via
// mJoinGetRowBitmapOffset() and resets the group's matched-row counter.
// Returns TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR if a group cannot be fetched, or the error of
// mJoinGetRowBitmapOffset().
int32_t mJoinAllocGrpRowBitmap(SMJoinTableCtx* pTb) {
  int32_t numGrps = taosArrayGetSize(pTb->eqGrps);

  for (int32_t grpIdx = 0; grpIdx < numGrps; ++grpIdx) {
    SMJoinGrpRows* pEqGrp = (SMJoinGrpRows*)taosArrayGet(pTb->eqGrps, grpIdx);
    if (NULL == pEqGrp) {
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }

    MJ_ERR_RET(mJoinGetRowBitmapOffset(pTb, pEqGrp->endIdx - pEqGrp->beginIdx + 1, &pEqGrp->rowBitmapOffset));
    pEqGrp->rowMatchNum = 0;
  }

  return TSDB_CODE_SUCCESS;
}
710

711

712
// Handles a pair of probe/build groups that share `timestamp`.
//
// Steps, in order:
//   1. mark the context as being on an equal group and build the probe-side equal groups;
//   2. unless lastBuildGrp is set, retrieve the build-side rows for the timestamp; otherwise restart
//      from the first build group (grpIdx = 0);
//   3. if hashing is allowed (pCtx->hashCan) and REACH_HJOIN_THRESHOLD holds, (re)build the build-side
//      hash table when needed, refresh the probe key columns for a new probe block, and dispatch to
//      pCtx->hashCartFp;
//   4. otherwise allocate the per-group row bitmaps when the build table tracks them and this is not
//      a reused build group, and dispatch to pCtx->mergeCartFp.
// Returns the result of the dispatched function or the first error encountered before it.
int32_t mJoinProcessEqualGrp(SMJoinMergeCtx* pCtx, int64_t timestamp, bool lastBuildGrp) {
  SMJoinOperatorInfo* pJoin = pCtx->pJoin;

  pCtx->lastEqGrp = true;

  MJ_ERR_RET(mJoinBuildEqGroups(pJoin->probe, timestamp, NULL, true));
  if (lastBuildGrp) {
    pJoin->build->grpIdx = 0;
  } else {
    MJ_ERR_RET(mJoinRetrieveEqGrpRows(pJoin, pJoin->build, timestamp));
  }

  if (!(pCtx->hashCan && REACH_HJOIN_THRESHOLD(pJoin->probe, pJoin->build))) {
    // Merge (nested loop) path.
    pCtx->hashJoin = false;

    if (!lastBuildGrp && pJoin->build->rowBitmapSize > 0) {
      MJ_ERR_RET(mJoinAllocGrpRowBitmap(pJoin->build));
    }

    return (*pCtx->mergeCartFp)(pCtx);
  }

  // Hash path: the hash table is rebuilt unless the same build group was already hashed.
  if (!lastBuildGrp || !pCtx->hashJoin) {
    if (pJoin->build->rowBitmapSize > 0) {
      MJ_ERR_RET(mJoinCreateFullBuildTbHash(pJoin, pJoin->build));
    } else {
      MJ_ERR_RET(mJoinCreateBuildTbHash(pJoin, pJoin->build));
    }
  }

  if (pJoin->probe->newBlk) {
    MJ_ERR_RET(mJoinSetKeyColsData(pJoin->probe->blk, pJoin->probe));
    pJoin->probe->newBlk = false;
  }

  pCtx->hashJoin = true;

  return (*pCtx->hashCartFp)(pCtx);
}
751

752
// Collects the run of probe rows, starting at pTb->blkRowIdx, for which PROBE_TS_NMATCH holds against
// *buildTs, stores it in pCtx->probeNEqGrp and emits it through mJoinNonEqCart() as a probe-side
// non-matching group.
//
// The starting row is always part of the group; following rows are added while the condition holds.
// *probeTs is refreshed for every row examined, and pTb->blkRowIdx ends on the first row that was not
// added (or on the block row count).
int32_t mJoinProcessLowerGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) {
  pCtx->probeNEqGrp.blk = pTb->blk;
  pCtx->probeNEqGrp.beginIdx = pTb->blkRowIdx;
  pCtx->probeNEqGrp.readIdx = pCtx->probeNEqGrp.beginIdx;
  pCtx->probeNEqGrp.endIdx = pCtx->probeNEqGrp.beginIdx;

  for (++pTb->blkRowIdx; pTb->blkRowIdx < pTb->blk->info.rows; ++pTb->blkRowIdx) {
    MJOIN_GET_TB_CUR_TS(pCol, *probeTs, pTb);
    if (PROBE_TS_NMATCH(pCtx->ascTs, *probeTs, *buildTs)) {
      // Still no match on the build side: extend the group by this row.
      pCtx->probeNEqGrp.endIdx = pTb->blkRowIdx;
    } else {
      break;
    }
  }

  return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false);
}
770

771
// Collects the run of build rows, starting at pTb->blkRowIdx, for which PROBE_TS_NREACH holds against
// *probeTs, stores it in pCtx->buildNEqGrp and emits it through mJoinNonEqCart() as a build-side
// non-matching group.
//
// The starting row is always part of the group; following rows are added while the condition holds.
// *buildTs is refreshed for every row examined, and pTb->blkRowIdx ends on the first row that was not
// added (or on the block row count).
int32_t mJoinProcessGreaterGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) {
  pCtx->buildNEqGrp.blk = pTb->blk;
  pCtx->buildNEqGrp.beginIdx = pTb->blkRowIdx;
  pCtx->buildNEqGrp.readIdx = pCtx->buildNEqGrp.beginIdx;
  pCtx->buildNEqGrp.endIdx = pCtx->buildNEqGrp.beginIdx;

  for (++pTb->blkRowIdx; pTb->blkRowIdx < pTb->blk->info.rows; ++pTb->blkRowIdx) {
    MJOIN_GET_TB_CUR_TS(pCol, *buildTs, pTb);
    if (PROBE_TS_NREACH(pCtx->ascTs, *probeTs, *buildTs)) {
      // The probe timestamp is still not reached: extend the group by this row.
      pCtx->buildNEqGrp.endIdx = pTb->blkRowIdx;
    } else {
      break;
    }
  }

  return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->buildNEqGrp, false, false);
}
789

790

791
// Allocates a two-entry downstream array in which both entries refer to pDownstream[0].
// Returns the new array, or NULL when the allocation fails. pInfo is not used.
// NOTE(review): the caller presumably owns and frees the returned array - confirm.
SOperatorInfo** mJoinBuildDownstreams(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream) {
  SOperatorInfo** pPair = taosMemoryMalloc(2 * POINTER_BYTES);
  if (NULL == pPair) {
    return pPair;
  }

  // Both join inputs read from the single available downstream operator.
  pPair[0] = pDownstream[0];
  pPair[1] = pDownstream[0];

  return pPair;
}
800

801
// Ensures the join has two downstream entries. When exactly one downstream is supplied,
// *newDownstreams is set to true, *pDownstream is replaced by a newly allocated two-entry array
// (see mJoinBuildDownstreams) and *numOfDownstream becomes 2. Any other count is left untouched.
// Returns terrno when the array cannot be allocated; in that case *pDownstream has already been
// overwritten with NULL and *newDownstreams is true.
int32_t mJoinInitDownstreamInfo(SMJoinOperatorInfo* pInfo, SOperatorInfo*** pDownstream, int32_t* numOfDownstream, bool* newDownstreams) {
  if (1 != *numOfDownstream) {
    return TSDB_CODE_SUCCESS;
  }

  *newDownstreams = true;
  *pDownstream = mJoinBuildDownstreams(pInfo, *pDownstream);
  if (NULL == *pDownstream) {
    return terrno;
  }

  *numOfDownstream = 2;

  return TSDB_CODE_SUCCESS;
}
813

814
// Allocates pTable->primCol and records slotId as its source slot. Only srcSlot is initialized here;
// the remaining fields of the column info are left as allocated.
// Returns terrno when the allocation fails.
static int32_t mJoinInitPrimKeyInfo(SMJoinTableCtx* pTable, int32_t slotId) {
  SMJoinColInfo* pPrim = taosMemoryMalloc(sizeof(SMJoinColInfo));

  pTable->primCol = pPrim;
  if (NULL == pPrim) {
    return terrno;
  }

  pPrim->srcSlot = slotId;

  return TSDB_CODE_SUCCESS;
}
824

825
// Builds an array of column descriptors from the column nodes in pList.
//
// Outputs:
//   *colNum  - number of nodes in pList (written before the allocation is attempted);
//   *pCols   - newly allocated array with one SMJoinColInfo per node (slot id, JSON flag,
//              variable-length flag and byte width taken from the node's result type);
//   *rowSize - sum of the byte widths of all columns.
// Returns terrno when the allocation fails.
static int32_t mJoinInitColsInfo(int32_t* colNum, int64_t* rowSize, SMJoinColInfo** pCols, SNodeList* pList) {
  *colNum = LIST_LENGTH(pList);

  *pCols = taosMemoryMalloc((*colNum) * sizeof(SMJoinColInfo));
  if (NULL == *pCols) {
    return terrno;
  }

  *rowSize = 0;

  int32_t colIdx = 0;
  SNode*  pNode = NULL;
  FOREACH(pNode, pList) {
    SColumnNode*   pColNode = (SColumnNode*)pNode;
    SMJoinColInfo* pColInfo = &(*pCols)[colIdx];

    pColInfo->srcSlot = pColNode->slotId;
    pColInfo->jsonData = TSDB_DATA_TYPE_JSON == pColNode->node.resType.type;
    pColInfo->vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type);
    pColInfo->bytes = pColNode->node.resType.bytes;

    *rowSize += pColNode->node.resType.bytes;
    ++colIdx;
  }

  return TSDB_CODE_SUCCESS;
}
849

850

851
/*
 * Initialise the equality-key columns of a table and, when needed, the buffer
 * used to hold a serialised key.
 *
 * The key buffer is only allocated when there is more than one key column or
 * when the caller explicitly asks for it (allocKeyBuf). keyNullSize becomes 1
 * when the summed key width exceeds one byte and 2 otherwise; the buffer is
 * sized to the larger of the summed key width and keyNullSize.
 *
 * Returns TSDB_CODE_SUCCESS, or the error of the failing step.
 */
static int32_t mJoinInitKeyColsInfo(SMJoinTableCtx* pTable, SNodeList* pList, bool allocKeyBuf) {
  int64_t keyRowSize = 0;
  MJ_ERR_RET(mJoinInitColsInfo(&pTable->keyNum, &keyRowSize, &pTable->keyCols, pList));

  if (pTable->keyNum <= 1 && !allocKeyBuf) {
    return TSDB_CODE_SUCCESS;
  }

  pTable->keyNullSize = (keyRowSize > 1) ? 1 : 2;

  pTable->keyBuf = taosMemoryMalloc(TMAX(keyRowSize, pTable->keyNullSize));
  if (NULL == pTable->keyBuf) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
870

871

872
/*
 * Build the source-slot -> output-slot mapping for the join targets that come
 * from this table.
 *
 * The map is allocated with one entry per target in pList, but only targets
 * whose column belongs to this table's data block (dataBlockId == blkId) are
 * recorded; finNum receives the number of recorded entries.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the allocation fails.
 */
static int32_t mJoinInitFinColsInfo(SMJoinTableCtx* pTable, SNodeList* pList) {
  // Never request zero bytes: such a request may legally return NULL and
  // would then be mistaken for an allocation failure.
  int32_t targetNum = LIST_LENGTH(pList);
  int32_t allocNum = (targetNum > 0) ? targetNum : 1;

  pTable->finCols = taosMemoryMalloc(allocNum * sizeof(SMJoinColMap));
  if (NULL == pTable->finCols) {
    return terrno;
  }

  int32_t i = 0;
  SNode*  pNode = NULL;
  FOREACH(pNode, pList) {
    STargetNode* pTarget = (STargetNode*)pNode;
    SColumnNode* pColumn = (SColumnNode*)pTarget->pExpr;
    if (pColumn->dataBlockId != pTable->blkId) {
      continue;  // target is fed by the other table
    }

    SMJoinColMap* pMap = &pTable->finCols[i];
    pMap->srcSlot = pColumn->slotId;
    pMap->dstSlot = pTarget->slotId;
    pMap->bytes = pColumn->node.resType.bytes;
    pMap->vardata = IS_VAR_DATA_TYPE(pColumn->node.resType.type);
    ++i;
  }

  pTable->finNum = i;

  return TSDB_CODE_SUCCESS;
}
896

897
/*
 * Initialise a primary-expression context from a TIMETRUNCATE function target.
 *
 * The function must carry 4 or 5 parameters. Parameter 1 is the truncate
 * unit. With 5 parameters, parameter 2 is read as the "use current time zone"
 * switch and parameter 4 as the time zone; with 4 parameters the time zone is
 * parameter 3.
 *
 * A time-zone offset is only computed when the switch is absent or equal to 1
 * and the unit is at least one day in the function's result precision;
 * otherwise timezoneUnit is 0.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the
 * target does not have the expected shape.
 */
static int32_t mJoinInitFuncPrimExprCtx(SMJoinPrimExprCtx* pCtx, STargetNode* pTarget) {
  SFunctionNode* pFunc = (SFunctionNode*)pTarget->pExpr;
  if (FUNCTION_TYPE_TIMETRUNCATE != pFunc->funcType) {
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  // The parameter list is dereferenced below; reject a missing list up front.
  if (NULL == pFunc->pParameterList) {
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  int32_t paramNum = pFunc->pParameterList->length;
  if (4 != paramNum && 5 != paramNum) {
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  SValueNode* pUnit = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
  if (NULL == pUnit) {
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  SValueNode* pCurrTz = NULL;
  if (5 == paramNum) {
    pCurrTz = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 2);
    if (NULL == pCurrTz) {
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }
  }

  SValueNode* pTimeZone = (SValueNode*)nodesListGetNode(pFunc->pParameterList, (5 == paramNum) ? 4 : 3);
  if (NULL == pTimeZone) {
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  // The unit is later used as a divisor when the expression is evaluated per
  // row, so a non-positive value must never be accepted.
  if (pUnit->typeData <= 0) {
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  pCtx->truncateUnit = pUnit->typeData;
  pCtx->timezoneUnit = 0;

  bool useTimeZone = (NULL == pCurrTz || 1 == pCurrTz->typeData);
  if (useTimeZone && pCtx->truncateUnit >= (86400 * TSDB_TICK_PER_SECOND(pFunc->node.resType.precision))) {
    if (NULL == pTimeZone->datum.p) {
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }
    pCtx->timezoneUnit = offsetFromTz(varDataVal(pTimeZone->datum.p), TSDB_TICK_PER_SECOND(pFunc->node.resType.precision));
  }

  pCtx->type = E_PRIM_TIMETRUNCATE;

  return TSDB_CODE_SUCCESS;
}
932

933
/*
 * Initialise a primary-expression context from a constant value target.
 *
 * The value must be of timestamp type; its integer datum becomes the constant
 * timestamp of the context.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR for a
 * value of any other type.
 */
static int32_t mJoinInitValPrimExprCtx(SMJoinPrimExprCtx* pCtx, STargetNode* pTarget) {
  SValueNode* pConst = (SValueNode*)pTarget->pExpr;

  if (TSDB_DATA_TYPE_TIMESTAMP == pConst->node.resType.type) {
    pCtx->constTs = pConst->datum.i;
    pCtx->type = E_PRIM_VALUE;
    return TSDB_CODE_SUCCESS;
  }

  return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
944

945

946
/*
 * Initialise the primary-expression context of a table.
 *
 * Without an expression node the primary key column itself is the target.
 * Otherwise the node must be a target whose expression is either a function
 * or a value; the matching initialiser is run and the target slot of the node
 * is recorded.
 *
 * Returns TSDB_CODE_SUCCESS, or an error for an unexpected node shape or a
 * failing initialiser.
 */
static int32_t mJoinInitPrimExprCtx(SNode* pNode, SMJoinPrimExprCtx* pCtx, SMJoinTableCtx* pTable) {
  if (NULL == pNode) {
    pCtx->targetSlotId = pTable->primCol->srcSlot;
    return TSDB_CODE_SUCCESS;
  }

  if (QUERY_NODE_TARGET != nodeType(pNode)) {
    qError("primary expr node is not target, type:%d", nodeType(pNode));
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  STargetNode* pTarget = (STargetNode*)pNode;

  switch (nodeType(pTarget->pExpr)) {
    case QUERY_NODE_FUNCTION:
      MJ_ERR_RET(mJoinInitFuncPrimExprCtx(pCtx, pTarget));
      break;
    case QUERY_NODE_VALUE:
      MJ_ERR_RET(mJoinInitValPrimExprCtx(pCtx, pTarget));
      break;
    default:
      qError("Invalid primary expr node type:%d", nodeType(pTarget->pExpr));
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  pCtx->targetSlotId = pTarget->slotId;

  return TSDB_CODE_SUCCESS;
}
973

974
/*
 * Initialise the per-table context of one join input.
 *
 * idx selects the input (0 = left, 1 = right). The table's build/probe role
 * (pTable->type) and primExpr are read here, so they must already be set.
 *
 * Steps: bind the downstream and its result block id, set up primary key, key
 * columns and output column map, copy the input statistics, create the
 * equal-group array, and for a build table create its auxiliary containers
 * and derive the row-keeping flags from join sub type and pre-filters.
 *
 * Returns TSDB_CODE_SUCCESS, or the error of the first failing step.
 */
static int32_t mJoinInitTableInfo(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode, SOperatorInfo** pDownstream, int32_t idx, SQueryStat* pStat, bool sameDs) {
  SMJoinTableCtx* pTable = &pJoin->tbs[idx];
  const bool      isLeft = (0 == idx);

  pTable->downStream = pDownstream[idx];
  pTable->blkId = getOperatorResultBlockId(pDownstream[idx], sameDs ? idx : 0);

  MJ_ERR_RET(mJoinInitPrimKeyInfo(pTable, isLeft ? pJoinNode->leftPrimSlotId : pJoinNode->rightPrimSlotId));
  MJ_ERR_RET(mJoinInitKeyColsInfo(pTable, isLeft ? pJoinNode->pEqLeft : pJoinNode->pEqRight, JOIN_TYPE_FULL == pJoin->joinType));
  MJ_ERR_RET(mJoinInitFinColsInfo(pTable, pJoinNode->pTargets));

  TAOS_MEMCPY(&pTable->inputStat, pStat, sizeof(*pStat));

  pTable->eqGrps = taosArrayInit(8, sizeof(SMJoinGrpRows));
  if (NULL == pTable->eqGrps) {
    return terrno;
  }

  if (E_JOIN_TB_BUILD != pTable->type) {
    pTable->multiEqGrpRows = true;
  } else {
    pTable->createdBlks = taosArrayInit(8, POINTER_BYTES);
    if (NULL == pTable->createdBlks) {
      return terrno;
    }

    pTable->pGrpArrays = taosArrayInit(32, POINTER_BYTES);
    if (NULL == pTable->pGrpArrays) {
      return terrno;
    }

    pTable->pGrpHash = tSimpleHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
    if (NULL == pTable->pGrpHash) {
      return terrno;
    }

    // The row bitmap is only allocated for full outer joins with a pFPreFilter.
    if (pJoin->pFPreFilter && IS_FULL_OUTER_JOIN(pJoin->joinType, pJoin->subType)) {
      pTable->rowBitmapSize = MJOIN_ROW_BITMAP_SIZE;
      pTable->pRowBitmap = taosMemoryMalloc(pTable->rowBitmapSize);
      if (NULL == pTable->pRowBitmap) {
        return terrno;
      }
    }

    const bool isAnti = (JOIN_STYPE_ANTI == pJoin->subType);
    const bool semiOrAnti = (JOIN_STYPE_SEMI == pJoin->subType || isAnti);

    pTable->noKeepEqGrpRows = (isAnti && NULL == pJoin->pFPreFilter);
    pTable->multiEqGrpRows = !(semiOrAnti && NULL == pJoin->pFPreFilter);
    // NOTE: this flag is keyed on pPreFilter, not pFPreFilter.
    pTable->multiRowsGrp = !(semiOrAnti && NULL == pJoin->pPreFilter);

    if (JOIN_STYPE_ASOF == pJoinNode->subType) {
      // Without an explicit JLIMIT value the per-timestamp row limit is 1.
      SLimitNode* pJLimit = (SLimitNode*)pJoinNode->pJLimit;
      if (pJLimit && pJLimit->limit) {
        pTable->eqRowLimit = pJLimit->limit->datum.i;
      } else {
        pTable->eqRowLimit = 1;
      }
    }
  }

  MJ_ERR_RET(mJoinInitPrimExprCtx(pTable->primExpr, &pTable->primCtx, pTable));

  return TSDB_CODE_SUCCESS;
}
1026

1027
/*
 * Copy join type and sub type from the plan node and decide which input is
 * the build table and which is the probe table.
 *
 * INNER, FULL and LEFT joins build on the right input (index 1) and probe the
 * left one (index 0); RIGHT joins, and any join type not listed, build on the
 * left input and probe the right one.
 */
static void mJoinSetBuildAndProbeTable(SMJoinOperatorInfo* pInfo, SSortMergeJoinPhysiNode* pJoinNode) {
  int32_t buildIdx;
  int32_t probeIdx;

  pInfo->joinType = pJoinNode->joinType;
  pInfo->subType = pJoinNode->subType;

  switch (pInfo->joinType) {
    case JOIN_TYPE_INNER:
    case JOIN_TYPE_FULL:
    case JOIN_TYPE_LEFT:
      buildIdx = 1;
      probeIdx = 0;
      break;
    case JOIN_TYPE_RIGHT:
    default:
      buildIdx = 0;
      probeIdx = 1;
      break;
  }

  SMJoinTableCtx* pBuild = &pInfo->tbs[buildIdx];
  SMJoinTableCtx* pProbe = &pInfo->tbs[probeIdx];

  pInfo->build = pBuild;
  pInfo->probe = pProbe;

  pBuild->downStreamIdx = buildIdx;
  pProbe->downStreamIdx = probeIdx;

  // Whatever the roles are, the left input (slot 0) always receives the left
  // primary expression and the right input (slot 1) the right one.
  pInfo->tbs[0].primExpr = pJoinNode->leftPrimExpr;
  pInfo->tbs[1].primExpr = pJoinNode->rightPrimExpr;

  pBuild->type = E_JOIN_TB_BUILD;
  pProbe->type = E_JOIN_TB_PROBE;
}
101,678✔
1069

1070
/*
 * Evaluate the table's primary expression for every row of pBlock and store
 * the result in the expression's target column.
 *
 * Nothing is done when the table has no primary expression. For a
 * time-truncate expression each primary key value is truncated to the
 * configured unit (taking the time-zone offset into account when it is
 * non-zero); for a constant expression the target column is filled with the
 * constant timestamp. Any other expression type leaves the block unchanged.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when a
 * required column is missing, or the error of the column fill.
 */
int32_t mJoinLaunchPrimExpr(SSDataBlock* pBlock, SMJoinTableCtx* pTable) {
  if (NULL == pTable->primExpr) {
    return TSDB_CODE_SUCCESS;
  }

  SMJoinPrimExprCtx* pCtx = &pTable->primCtx;
  SColumnInfoData*   pPrimOut = taosArrayGet(pBlock->pDataBlock, pCtx->targetSlotId);
  if (NULL == pPrimOut) {
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  if (E_PRIM_VALUE == pCtx->type) {
    MJ_ERR_RET(colDataSetNItems(pPrimOut, 0, (char*)&pCtx->constTs, pBlock->info.rows, false));
    return TSDB_CODE_SUCCESS;
  }

  if (E_PRIM_TIMETRUNCATE != pCtx->type) {
    return TSDB_CODE_SUCCESS;
  }

  SColumnInfoData* pPrimIn = taosArrayGet(pBlock->pDataBlock, pTable->primCol->srcSlot);
  if (NULL == pPrimIn) {
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  int64_t* pSrc = (int64_t*)pPrimIn->pData;
  int64_t* pDst = (int64_t*)pPrimOut->pData;
  int32_t  row = 0;

  if (0 == pCtx->timezoneUnit) {
    while (row < pBlock->info.rows) {
      pDst[row] = pSrc[row] / pCtx->truncateUnit * pCtx->truncateUnit;
      ++row;
    }
  } else {
    while (row < pBlock->info.rows) {
      pDst[row] = pSrc[row] - (pSrc[row] + pCtx->timezoneUnit) % pCtx->truncateUnit;
      ++row;
    }
  }

  return TSDB_CODE_SUCCESS;
}
1109

1110
/*
 * Group-aware block retrieval for one join input.
 *
 * Probe table: returns blocks of the current group only. A block belonging to
 * a different group is parked in remainInBlk and NULL is returned; on the
 * next call the parked block is handed out after the group reset callback of
 * the join has run.
 *
 * Build table: returns the next block whose group id equals the probe table's
 * current group id. Blocks with a smaller group id are skipped; a block with
 * a larger group id stays parked and NULL is returned.
 *
 * NULL is also returned once the downstream is exhausted. Before a block is
 * returned the primary expression is evaluated on it; a failure there is
 * recorded in pJoin->errCode and raised via T_LONG_JMP.
 */
SSDataBlock* mJoinGrpRetrieveImpl(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) {
  const int32_t dsIdx = pTable->downStreamIdx;
  SSDataBlock*  pRes = NULL;

  if (E_JOIN_TB_PROBE == pTable->type) {
    if (NULL != pTable->remainInBlk) {
      // A parked block opens the next group.
      pRes = pTable->remainInBlk;
      pTable->remainInBlk = NULL;
      (*pJoin->grpResetFp)(pJoin);
      pTable->lastInGid = pRes->info.id.groupId;
    } else {
      if (pTable->dsFetchDone) {
        return NULL;
      }

      pRes = getNextBlockFromDownstreamRemain(pJoin->pOperator, dsIdx);
      if (NULL == pRes) {
        pTable->dsFetchDone = true;
        return NULL;
      }

      if (0 == pTable->lastInGid) {
        pTable->lastInGid = pRes->info.id.groupId;
      } else if (pTable->lastInGid != pRes->info.id.groupId) {
        // Group boundary: keep the block for the next round.
        pTable->remainInBlk = pRes;
        return NULL;
      }
    }
  } else {
    SMJoinTableCtx* pProbe = pJoin->probe;

    for (;;) {
      SSDataBlock* pParked = pTable->remainInBlk;
      if (NULL != pParked) {
        if (pParked->info.id.groupId == pProbe->lastInGid) {
          pRes = pParked;
          pTable->remainInBlk = NULL;
          pTable->lastInGid = pRes->info.id.groupId;
          break;
        }

        if (pParked->info.id.groupId > pProbe->lastInGid) {
          return NULL;
        }

        // Parked block is behind the probe group: drop the reference.
        pTable->remainInBlk = NULL;
      }

      if (pTable->dsFetchDone) {
        return NULL;
      }

      SSDataBlock* pNext = getNextBlockFromDownstreamRemain(pJoin->pOperator, dsIdx);
      if (NULL == pNext) {
        pTable->dsFetchDone = true;
        return NULL;
      }

      pTable->remainInBlk = pNext;
    }
  }

  int32_t code = mJoinLaunchPrimExpr(pRes, pTable);
  if (code) {
    pJoin->errCode = code;
    T_LONG_JMP(pJoin->pOperator->pTaskInfo->env, pJoin->errCode);
  }

  return pRes;
}
1187

1188
/*
 * Plain (non-grouped) block retrieval for one join input.
 *
 * Returns the next downstream block after evaluating the primary expression
 * on it, or NULL once the downstream is exhausted (dsFetchDone is then set).
 * A primary expression failure is recorded in pJoin->errCode and raised via
 * T_LONG_JMP.
 */
static FORCE_INLINE SSDataBlock* mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) {
  if (pTable->dsFetchDone) {
    return NULL;
  }

  SSDataBlock* pBlk = getNextBlockFromDownstreamRemain(pJoin->pOperator, pTable->downStreamIdx);
  if (NULL == pBlk) {
    pTable->dsFetchDone = true;
    return pBlk;
  }

  int32_t code = mJoinLaunchPrimExpr(pBlk, pTable);
  if (code) {
    pJoin->errCode = code;
    T_LONG_JMP(pJoin->pOperator->pTaskInfo->env, pJoin->errCode);
  }

  return pBlk;
}
1206

1207

1208
/*
 * Initialise the execution context of the join operator.
 *
 * Records the group-join flag, the row limit (INT64_MAX when the plan carries
 * no limit value), the retrieval callback and the output block id, then
 * initialises either the window context or the merge context.
 *
 * The window context is used for WINDOW joins and for ASOF joins whose
 * operator includes lower or greater rows; every other join uses the merge
 * context. ctx.mergeCtxInUse records the choice.
 */
static int32_t mJoinInitCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) {
  SLimitNode* pLimit = (SLimitNode*)pJoinNode->node.pLimit;

  pJoin->ctx.mergeCtx.groupJoin = pJoinNode->grpJoin;

  if (pLimit && pLimit->limit) {
    pJoin->ctx.mergeCtx.limit = pLimit->limit->datum.i;
  } else {
    pJoin->ctx.mergeCtx.limit = INT64_MAX;
  }

  if (pJoinNode->grpJoin) {
    pJoin->retrieveFp = mJoinGrpRetrieveImpl;
  } else {
    pJoin->retrieveFp = mJoinRetrieveImpl;
  }

  pJoin->outBlkId = pJoinNode->node.pOutputDataBlockDesc->dataBlockId;

  bool asofWindow = (JOIN_STYPE_ASOF == pJoin->subType) &&
                    (ASOF_LOWER_ROW_INCLUDED(pJoinNode->asofOpType) || ASOF_GREATER_ROW_INCLUDED(pJoinNode->asofOpType));
  bool useWindowCtx = asofWindow || (JOIN_STYPE_WIN == pJoin->subType);

  pJoin->ctx.mergeCtxInUse = !useWindowCtx;

  if (useWindowCtx) {
    return mJoinInitWindowCtx(pJoin, pJoinNode);
  }

  return mJoinInitMergeCtx(pJoin, pJoinNode);
}
1223

1224
/*
 * Release the execution context of the join operator.
 *
 * The destructor has to match the context that was actually initialised.
 * mJoinInitCtx() sets ctx.mergeCtxInUse when it builds the merge context,
 * which it also does for ASOF joins whose operator includes neither lower nor
 * greater rows. Selecting the destructor by sub type alone would tear such a
 * join down with the window destructor, so the flag is consulted first.
 *
 * When the flag is clear (window context in use, or initialisation never
 * reached) the sub type decides, as before.
 */
static void mJoinDestroyCtx(SMJoinOperatorInfo* pJoin) {
  bool windowSubType = (JOIN_STYPE_ASOF == pJoin->subType || JOIN_STYPE_WIN == pJoin->subType);

  if (windowSubType && !pJoin->ctx.mergeCtxInUse) {
    mJoinDestroyWindowCtx(pJoin);
    return;
  }

  mJoinDestroyMergeCtx(pJoin);
}
1231

1232
/* Tell whether the operator has reached the OP_EXEC_DONE state. */
bool mJoinIsDone(SOperatorInfo* pOperator) {
  if (OP_EXEC_DONE != pOperator->status) {
    return false;
  }

  return true;
}
1235

1236
/*
 * Mark the operator as completed and release the get-parameters kept for its
 * two downstreams, if any. The two slots are reset to NULL afterwards.
 */
void mJoinSetDone(SOperatorInfo* pOperator) {
  setOperatorCompleted(pOperator);

  if (!pOperator->pDownstreamGetParams) {
    return;
  }

  freeOperatorParam(pOperator->pDownstreamGetParams[0], OP_GET_PARAM);
  freeOperatorParam(pOperator->pDownstreamGetParams[1], OP_GET_PARAM);

  for (int32_t i = 0; i < 2; ++i) {
    pOperator->pDownstreamGetParams[i] = NULL;
  }
}
124,098✔
1245

1246
/*
 * Make sure *ppBlk holds a block with unread rows.
 *
 * When there is no current block, or the row cursor *pIdx has moved past its
 * last row, the next block is fetched through the join's retrieve callback
 * and the cursor is reset to 0. The table is flagged as initialised on every
 * fetch, and as having a new block when the fetch produced one.
 *
 * Returns true when *ppBlk is usable afterwards, false when the fetch
 * returned no block.
 */
bool mJoinRetrieveBlk(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** ppBlk, SMJoinTableCtx* pTb) {
  bool hasUnreadRows = (NULL != (*ppBlk)) && (*pIdx < (*ppBlk)->info.rows);
  if (hasUnreadRows) {
    return true;
  }

  (*ppBlk) = (*pJoin->retrieveFp)(pJoin, pTb);
  pTb->dsInitDone = true;

  qDebug("%s merge join %s table got %" PRId64 " rows block", GET_TASKID(pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(pTb->type), (*ppBlk) ? (*ppBlk)->info.rows : 0);

  *pIdx = 0;

  if (NULL == (*ppBlk)) {
    return false;
  }

  pTb->newBlk = true;
  return true;
}
1263

1264
/*
 * Destroy every data block referenced by the pointer array pCreatedBlks and
 * empty the array afterwards.
 */
static void mJoinDestroyCreatedBlks(SArray* pCreatedBlks) {
  int32_t remaining = taosArrayGetSize(pCreatedBlks);
  int32_t idx = 0;

  while (idx < remaining) {
    SSDataBlock** ppBlk = (SSDataBlock**)TARRAY_GET_ELEM(pCreatedBlks, idx);
    (void)blockDataDestroy(*ppBlk);
    ++idx;
  }

  taosArrayClear(pCreatedBlks);
}
18,303,734✔
1271

1272
/*
 * Reserve a bitmap for rowNum rows inside the table's row bitmap buffer.
 *
 * The buffer grows (to 110% of the required size) when the reservation does
 * not fit. The reserved bytes are filled with 0xFF, the offset of the
 * reservation is returned through *rowBitmapOffset, and the table's running
 * offset is advanced past it.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the buffer cannot be grown. On
 * that failure the existing buffer and its recorded size are left intact.
 */
int32_t mJoinGetRowBitmapOffset(SMJoinTableCtx* pTable, int32_t rowNum, int32_t *rowBitmapOffset) {
  int32_t bitmapLen = BitmapLen(rowNum);
  int64_t reqSize = pTable->rowBitmapOffset + bitmapLen;

  if (reqSize > pTable->rowBitmapSize) {
    int64_t newSize = reqSize * 1.1;

    // Grow through a temporary: assigning the result straight back would lose
    // (and leak) the current buffer when the reallocation fails.
    void* pGrown = taosMemoryRealloc(pTable->pRowBitmap, newSize);
    if (NULL == pGrown) {
      return terrno;
    }

    pTable->pRowBitmap = pGrown;
    pTable->rowBitmapSize = newSize;
  }

  // The fill value is a single byte; every bit of the reservation starts set.
  TAOS_MEMSET(pTable->pRowBitmap + pTable->rowBitmapOffset, 0xFF, bitmapLen);

  *rowBitmapOffset = pTable->rowBitmapOffset;
  pTable->rowBitmapOffset += bitmapLen;

  return TSDB_CODE_SUCCESS;
}
1291

1292
/*
 * Reset the per-timestamp state of a build table: group counters, the blocks
 * created for earlier groups, the equal-group list and, when a row bitmap is
 * in use, the bitmap cursor (restarted at offset 1) and the non-match context.
 */
void mJoinResetForBuildTable(SMJoinTableCtx* pTable) {
  pTable->eqRowNum = 0;
  pTable->grpIdx = 0;
  pTable->grpTotalRows = 0;

  mJoinDestroyCreatedBlks(pTable->createdBlks);
  taosArrayClear(pTable->eqGrps);

  if (pTable->rowBitmapSize <= 0) {
    return;
  }

  pTable->rowBitmapOffset = 1;
  TAOS_MEMSET(&pTable->nMatchCtx, 0, sizeof(pTable->nMatchCtx));
}
18,029,637✔
1303

1304
/*
 * Collect the rows of the table's current block whose primary timestamp
 * equals `timestamp` into a new equal-group entry.
 *
 * pTable     table whose current block (blk) and row cursor (blkRowIdx) are read
 * timestamp  timestamp the group is built for
 * wholeBlk   [out, optional] set to true when the group reaches the last row
 *            of the block, i.e. the next block may continue the group
 * restart    true for the first block of a timestamp: the build-table state
 *            is reset before the group is added
 *
 * Nothing happens when the row under the cursor has a different timestamp.
 * When the group reaches the end of the block and is kept, its rows are
 * copied into a block of their own, which is registered in createdBlks.
 *
 * Returns TSDB_CODE_SUCCESS or an error code. A block that was created but
 * could not be registered in createdBlks is destroyed before the error is
 * returned, since nothing else would release it.
 */
int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, bool restart) {
  SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCtx.targetSlotId);
  if (NULL == pCol) {
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  SMJoinGrpRows* pGrp = NULL;
  int32_t        code = TSDB_CODE_SUCCESS;

  if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) {
    return TSDB_CODE_SUCCESS;
  }

  if (restart) {
    mJoinResetForBuildTable(pTable);
  }

  bool keepGrp = true;
  pGrp = taosArrayReserve(pTable->eqGrps, 1);
  if (NULL == pGrp) {
    MJ_ERR_RET(terrno);
  }

  // The group starts as the single row under the cursor.
  pGrp->beginIdx = pTable->blkRowIdx++;
  pGrp->readIdx = pGrp->beginIdx;
  pGrp->endIdx = pGrp->beginIdx;
  pGrp->readMatch = false;
  pGrp->blk = pTable->blk;

  char* pEndVal = colDataGetNumData(pCol, pTable->blk->info.rows - 1);
  if (timestamp == *(int64_t*)pEndVal) {
    // The last row carries the same timestamp: the group runs to the block end.
    if (pTable->multiEqGrpRows) {
      pGrp->endIdx = pTable->blk->info.rows - 1;
    } else {
      pGrp->endIdx = pGrp->beginIdx;
    }

    pTable->blkRowIdx = pTable->blk->info.rows;
  } else {
    // The group ends inside this block: scan for the first different timestamp.
    for (; pTable->blkRowIdx < pTable->blk->info.rows; ++pTable->blkRowIdx) {
      char* pNextVal = colDataGetNumData(pCol, pTable->blkRowIdx);
      if (timestamp == *(int64_t*)pNextVal) {
        pGrp->endIdx++;
        continue;
      }

      if (!pTable->multiEqGrpRows) {
        pGrp->endIdx = pGrp->beginIdx;
      } else if (0 == pTable->eqRowLimit) {
        // DO NOTHING
      } else if (pTable->eqRowLimit == pTable->eqRowNum) {
        keepGrp = false;
      } else {
        int64_t rowNum = TMIN(pGrp->endIdx - pGrp->beginIdx + 1, pTable->eqRowLimit - pTable->eqRowNum);
        pGrp->endIdx = pGrp->beginIdx + rowNum - 1;
        pTable->eqRowNum += rowNum;
      }

      goto _return;
    }
  }

  if (wholeBlk && (pTable->multiEqGrpRows || restart)) {
    *wholeBlk = true;

    if (pTable->noKeepEqGrpRows || !keepGrp) {
      goto _return;
    }

    if (0 == pGrp->beginIdx && pTable->multiEqGrpRows && 0 == pTable->eqRowLimit) {
      // The whole block belongs to the group: duplicate it as is.
      pGrp->blk = NULL;
      code = createOneDataBlock(pTable->blk, true, &pGrp->blk);
      if (code) {
        MJ_ERR_RET(code);
      }

      if (NULL == taosArrayPush(pTable->createdBlks, &pGrp->blk)) {
        code = terrno;
        (void)blockDataDestroy(pGrp->blk);
        pGrp->blk = NULL;
        MJ_ERR_RET(code);
      }
    } else {
      int64_t rowNum = 0;
      if (!pTable->multiEqGrpRows) {
        rowNum = 1;
        pGrp->endIdx = pGrp->beginIdx;
      } else if (0 == pTable->eqRowLimit) {
        rowNum = pGrp->endIdx - pGrp->beginIdx + 1;
      } else if (pTable->eqRowLimit == pTable->eqRowNum) {
        keepGrp = false;
      } else {
        rowNum = TMIN(pGrp->endIdx - pGrp->beginIdx + 1, pTable->eqRowLimit - pTable->eqRowNum);
        pGrp->endIdx = pGrp->beginIdx + rowNum - 1;
      }

      if (keepGrp && rowNum > 0) {
        pTable->eqRowNum += rowNum;
        code = blockDataExtractBlock(pTable->blk, pGrp->beginIdx, rowNum, &pGrp->blk);
        if (code) {
          MJ_ERR_RET(code);
        }

        // The extracted block starts at row 0, so rebase the group indexes.
        pGrp->endIdx -= pGrp->beginIdx;
        pGrp->beginIdx = 0;
        pGrp->readIdx = 0;
        if (NULL == taosArrayPush(pTable->createdBlks, &pGrp->blk)) {
          code = terrno;
          (void)blockDataDestroy(pGrp->blk);
          pGrp->blk = NULL;
          MJ_ERR_RET(code);
        }
      }
    }
  }

_return:

  if (pTable->noKeepEqGrpRows || !keepGrp || (!pTable->multiEqGrpRows && !restart)) {
    // The group is not kept: remove the entry reserved above.
    if (NULL == taosArrayPop(pTable->eqGrps)) {
      code = terrno;
    }
  } else {
    pTable->grpTotalRows += pGrp->endIdx - pGrp->beginIdx + 1;
  }

  return code;
}
1431

1432

1433
/*
 * Gather all rows of the table that carry `timestamp`, following the group
 * across block boundaries.
 *
 * The first group is built from the current block with a restart. As long as
 * a group reaches the end of its block and the downstream is not exhausted,
 * the next block is fetched and scanned for a continuation of the group.
 *
 * Returns TSDB_CODE_SUCCESS, or the error of a failing group build.
 */
int32_t mJoinRetrieveEqGrpRows(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable, int64_t timestamp) {
  bool reachedBlkEnd = false;

  MJ_ERR_RET(mJoinBuildEqGroups(pTable, timestamp, &reachedBlkEnd, true));

  for (;;) {
    if (!reachedBlkEnd || pTable->dsFetchDone) {
      break;
    }

    pTable->blk = (*pJoin->retrieveFp)(pJoin, pTable);
    qDebug("%s merge join %s table got block for same ts, rows:%" PRId64, GET_TASKID(pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(pTable->type), pTable->blk ? pTable->blk->info.rows : 0);

    pTable->blkRowIdx = 0;

    if (NULL == pTable->blk) {
      break;
    }

    reachedBlkEnd = false;
    MJ_ERR_RET(mJoinBuildEqGroups(pTable, timestamp, &reachedBlkEnd, false));
  }

  return TSDB_CODE_SUCCESS;
}
1454

1455
/*
 * Bind the table's key column descriptors to the columns of pBlock.
 *
 * For every key column the block column in the descriptor's source slot is
 * looked up and checked against the descriptor (variable-length flag and byte
 * width); then the descriptor's data pointer, offset array (variable-length
 * columns only) and column pointer are set.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when a
 * column is missing, or TSDB_CODE_INVALID_PARA on a type/width mismatch.
 */
int32_t mJoinSetKeyColsData(SSDataBlock* pBlock, SMJoinTableCtx* pTable) {
  int32_t i = 0;

  while (i < pTable->keyNum) {
    SMJoinColInfo*   pKey = &pTable->keyCols[i];
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pKey->srcSlot);
    if (NULL == pCol) {
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }

    if (pKey->vardata != IS_VAR_DATA_TYPE(pCol->info.type))  {
      qError("column type mismatch, idx:%d, slotId:%d, type:%d, vardata:%d", i, pKey->srcSlot, pCol->info.type, pKey->vardata);
      return TSDB_CODE_INVALID_PARA;
    }

    if (pKey->bytes != pCol->info.bytes)  {
      qError("column bytes mismatch, idx:%d, slotId:%d, bytes:%d, %d", i, pKey->srcSlot, pCol->info.bytes, pKey->bytes);
      return TSDB_CODE_INVALID_PARA;
    }

    pKey->data = pCol->pData;
    if (pKey->vardata) {
      pKey->offset = pCol->varmeta.offset;
    }
    pKey->colData = pCol;

    ++i;
  }

  return TSDB_CODE_SUCCESS;
}
1479

1480
/*
 * Produce the join key of row rowIdx from the bound key columns.
 *
 * With a single key column the key is used in place: keyData points into the
 * column data. With several key columns their values are concatenated into
 * keyBuf and keyData points at keyBuf.
 *
 * pBufLen  [out, optional] receives the key length in bytes
 *
 * Returns true when any key column is NULL for the row (no key is produced
 * and *pBufLen is not written), false otherwise.
 */
bool mJoinCopyKeyColsDataToBuf(SMJoinTableCtx* pTable, int32_t rowIdx, size_t *pBufLen) {
  char  *pData = NULL;
  size_t bufLen = 0;

  if (1 == pTable->keyNum) {
    SMJoinColInfo* pKey = &pTable->keyCols[0];
    if (colDataIsNull_s(pKey->colData, rowIdx)) {
      return true;
    }

    if (pKey->jsonData) {
      pData = pKey->data + pKey->offset[rowIdx];
      bufLen = getJsonValueLen(pData);
    } else if (pKey->vardata) {
      pData = pKey->data + pKey->offset[rowIdx];
      bufLen = varDataTLen(pData);
    } else {
      pData = pKey->data + pKey->bytes * rowIdx;
      bufLen = pKey->bytes;
    }

    pTable->keyData = pData;
  } else {
    for (int32_t i = 0; i < pTable->keyNum; ++i) {
      SMJoinColInfo* pKey = &pTable->keyCols[i];
      if (colDataIsNull_s(pKey->colData, rowIdx)) {
        return true;
      }

      // Each column is serialised according to its OWN kind; the JSON test
      // must therefore look at column i, not at the first key column.
      size_t valLen = 0;
      if (pKey->jsonData) {
        pData = pKey->data + pKey->offset[rowIdx];
        valLen = getJsonValueLen(pData);
      } else if (pKey->vardata) {
        pData = pKey->data + pKey->offset[rowIdx];
        valLen = varDataTLen(pData);
      } else {
        pData = pKey->data + pKey->bytes * rowIdx;
        valLen = pKey->bytes;
      }

      TAOS_MEMCPY(pTable->keyBuf + bufLen, pData, valLen);
      bufLen += valLen;
    }

    pTable->keyData = pTable->keyBuf;
  }

  if (pBufLen) {
    *pBufLen = bufLen;
  }

  return false;
}
1527

1528
/*
 * Hand out the next reusable row-position array from the table's pool.
 *
 * The pool (pGrpArrays) is grown by one freshly created array whenever the
 * cursor (grpArrayIdx) has run past its end. The array under the cursor is
 * emptied, returned through *ppRes, and the cursor advances.
 *
 * Returns TSDB_CODE_SUCCESS, terrno when the pool cannot be grown, or
 * TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the pool slot holds no array.
 */
static int32_t mJoinGetAvailableGrpArray(SMJoinTableCtx* pTable, SArray** ppRes) {
  while (pTable->grpArrayIdx >= taosArrayGetSize(pTable->pGrpArrays)) {
    SArray* pNew = taosArrayInit(4, sizeof(SMJoinRowPos));
    if (NULL == pNew) {
      return terrno;
    }

    if (NULL == taosArrayPush(pTable->pGrpArrays, &pNew)) {
      // The pool did not take ownership, so release the array here.
      int32_t code = terrno;
      taosArrayDestroy(pNew);
      return code;
    }
  }

  *ppRes = taosArrayGetP(pTable->pGrpArrays, pTable->grpArrayIdx++);
  if (NULL == *ppRes) {
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  taosArrayClear(*ppRes);
  return TSDB_CODE_SUCCESS;
}
1550

1551
/*
 * Add the row (pBlock, rowIdx) to the build table's key hash under the key
 * currently held in keyData (keyLen bytes).
 *
 * A key seen for the first time gets a row array from the pool, holding this
 * row. For a known key the row is appended only when the build table keeps
 * several rows per key (multiRowsGrp).
 *
 * Returns TSDB_CODE_SUCCESS, or the error of the failing step.
 */
static int32_t mJoinAddRowToHash(SMJoinOperatorInfo* pJoin, size_t keyLen, SSDataBlock* pBlock, int32_t rowIdx) {
  SMJoinTableCtx* pBuild = pJoin->build;
  SMJoinRowPos    pos = {pBlock, rowIdx};
  SArray**        ppRows = tSimpleHashGet(pBuild->pGrpHash, pBuild->keyData, keyLen);

  if (NULL != ppRows) {
    if (pBuild->multiRowsGrp && NULL == taosArrayPush(*ppRows, &pos)) {
      return terrno;
    }

    return TSDB_CODE_SUCCESS;
  }

  SArray* pRows = NULL;
  MJ_ERR_RET(mJoinGetAvailableGrpArray(pBuild, &pRows));

  if (NULL == taosArrayPush(pRows, &pos)) {
    return terrno;
  }

  MJ_ERR_RET(tSimpleHashPut(pBuild->pGrpHash, pBuild->keyData, keyLen, &pRows, POINTER_BYTES));

  return TSDB_CODE_SUCCESS;
}
1571

1572

1573
/*
 * Add the row (pBlock, rowIdx) to the build table's key hash used by full
 * joins. Hash values are SMJoinHashGrpRows entries; every row is appended to
 * the entry of its key, and a new entry (zero-initialised, with a row array
 * from the pool) is created for a key seen for the first time.
 *
 * Returns TSDB_CODE_SUCCESS, or the error of the failing step.
 */
static int32_t mJoinAddRowToFullHash(SMJoinOperatorInfo* pJoin, size_t keyLen, SSDataBlock* pBlock, int32_t rowIdx) {
  SMJoinTableCtx*    pBuild = pJoin->build;
  SMJoinRowPos       pos = {pBlock, rowIdx};
  SMJoinHashGrpRows* pEntry = (SMJoinHashGrpRows*)tSimpleHashGet(pBuild->pGrpHash, pBuild->keyData, keyLen);

  if (NULL != pEntry) {
    if (NULL == taosArrayPush(pEntry->pRows, &pos)) {
      return terrno;
    }

    return TSDB_CODE_SUCCESS;
  }

  SMJoinHashGrpRows newEntry = {0};
  MJ_ERR_RET(mJoinGetAvailableGrpArray(pBuild, &newEntry.pRows));

  if (NULL == taosArrayPush(newEntry.pRows, &pos)) {
    return terrno;
  }

  MJ_ERR_RET(tSimpleHashPut(pBuild->pGrpHash, pBuild->keyData, keyLen, &newEntry, sizeof(newEntry)));

  return TSDB_CODE_SUCCESS;
}
1593

1594
/*
 * Rebuild the build table's key hash from the equal groups of pTable, for
 * full joins.
 *
 * The hash and the row-array pool cursor are reset first. Every remaining row
 * of every group is then hashed by its key. Rows with a NULL key are not
 * skipped: they are hashed under a zeroed key of keyNullSize bytes.
 *
 * Returns TSDB_CODE_SUCCESS, or the error of the failing step.
 */
int32_t mJoinCreateFullBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) {
  SMJoinTableCtx* pBuild = pJoin->build;
  size_t          keyLen = 0;

  tSimpleHashClear(pBuild->pGrpHash);
  pBuild->grpArrayIdx = 0;
  pBuild->grpRowIdx = -1;

  int32_t grpNum = taosArrayGetSize(pTable->eqGrps);
  for (int32_t g = 0; g < grpNum; ++g) {
    SMJoinGrpRows* pGrp = taosArrayGet(pTable->eqGrps, g);
    if (NULL == pGrp) {
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }

    MJ_ERR_RET(mJoinSetKeyColsData(pGrp->blk, pTable));

    int32_t grpRows = GRP_REMAIN_ROWS(pGrp);
    for (int32_t r = 0; r < grpRows; ++r) {
      int32_t rowIdx = pGrp->beginIdx + r;

      if (mJoinCopyKeyColsDataToBuf(pTable, rowIdx, &keyLen)) {
        // NULL key: substitute the dedicated zeroed key.
        *(int16_t *)pTable->keyBuf = 0;
        pTable->keyData = pTable->keyBuf;
        keyLen = pTable->keyNullSize;
      }

      MJ_ERR_RET(mJoinAddRowToFullHash(pJoin, keyLen, pGrp->blk, rowIdx));
    }
  }

  return TSDB_CODE_SUCCESS;
}
1624

1625
/*
 * Rebuild the build table's key hash from the equal groups of pTable.
 *
 * The hash and the row-array pool cursor are reset first. Every remaining row
 * of every group is then hashed by its key; rows with a NULL key are skipped.
 *
 * Returns TSDB_CODE_SUCCESS, or the error of the failing step.
 */
int32_t mJoinCreateBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) {
  SMJoinTableCtx* pBuild = pJoin->build;
  size_t          keyLen = 0;

  tSimpleHashClear(pBuild->pGrpHash);
  pBuild->grpArrayIdx = 0;
  pBuild->grpRowIdx = -1;

  int32_t grpNum = taosArrayGetSize(pTable->eqGrps);
  for (int32_t g = 0; g < grpNum; ++g) {
    SMJoinGrpRows* pGrp = taosArrayGet(pTable->eqGrps, g);
    if (NULL == pGrp) {
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }

    MJ_ERR_RET(mJoinSetKeyColsData(pGrp->blk, pTable));

    int32_t grpRows = GRP_REMAIN_ROWS(pGrp);
    for (int32_t r = 0; r < grpRows; ++r) {
      int32_t rowIdx = pGrp->beginIdx + r;

      if (!mJoinCopyKeyColsDataToBuf(pTable, rowIdx, &keyLen)) {
        MJ_ERR_RET(mJoinAddRowToHash(pJoin, keyLen, pGrp->blk, rowIdx));
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}
1654

1655
/*
 * Reset the per-group state of one join table context: release the blocks
 * tracked in createdBlks, empty the group hash, and rewind the current-block
 * cursor (blk / blkRowIdx / newBlk).
 */
void mJoinResetGroupTableCtx(SMJoinTableCtx* pCtx) {
  // forget the current block and the read position inside it
  pCtx->newBlk = false;
  pCtx->blkRowIdx = 0;
  pCtx->blk = NULL;

  mJoinDestroyCreatedBlks(pCtx->createdBlks);
  tSimpleHashClear(pCtx->pGrpHash);
}
70,708✔
1663

1664

1665
/*
 * Fully reset one join table context: clear the downstream fetch flags,
 * the last input group id and the leftover input block, then reset the
 * per-group state via mJoinResetGroupTableCtx().
 */
void mJoinResetTableCtx(SMJoinTableCtx* pCtx) {
  pCtx->remainInBlk = NULL;
  pCtx->lastInGid = 0;
  pCtx->dsFetchDone = false;
  pCtx->dsInitDone = false;

  mJoinResetGroupTableCtx(pCtx);
}
68,496✔
1673

1674
/*
 * Put a merge-join context back into its initial state: no pending group or
 * mid-block remainder, no last-equal group, hash join off, and lastEqTs set
 * to INT64_MIN.
 */
void mJoinResetMergeCtx(SMJoinMergeCtx* pCtx) {
  pCtx->hashJoin = false;
  pCtx->lastEqTs = INT64_MIN;

  pCtx->lastEqGrp = false;
  pCtx->midRemains = false;
  pCtx->grpRemains = false;
}
34,248✔
1682

1683
/*
 * Empty a window-join cache.
 *
 * Rewinds the cache counters, swaps grps with grpsQueue when a queue exists,
 * destroys every cloned group block that is not the context's cache output
 * block, clears the group array and finally cleans up the cache's own output
 * block (if any).
 *
 * NOTE(review): the ownership test compares against pCtx->cache.outBlk while
 * the final cleanup uses pCache->outBlk; the caller visible in this file
 * passes &pCtx->cache, so both refer to the same block there.
 */
void mWinJoinResetWindowCache(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache) {
  pCache->grpIdx = 0;
  pCache->rowNum = 0;
  pCache->outRowIdx = 0;

  if (pCache->grpsQueue) {
    TSWAP(pCache->grps, pCache->grpsQueue);
  }

  int32_t numOfGrps = taosArrayGetSize(pCache->grps);
  for (int32_t idx = 0; idx < numOfGrps; ++idx) {
    SMJoinGrpRows* pGrp = taosArrayGet(pCache->grps, idx);

    // only blocks this cache cloned itself are freed; the shared output block is kept
    if (NULL != pGrp && pGrp->clonedBlk && pGrp->blk != pCtx->cache.outBlk) {
      (void)blockDataDestroy(pGrp->blk);
    }
  }

  taosArrayClear(pCache->grps);

  if (pCache->outBlk) {
    blockDataCleanup(pCache->outBlk);
  }
}
1,910✔
1710

1711
/*
 * Put a window-join context back into its initial state: clear the progress
 * flags, set lastTs to INT64_MIN and empty the context's window cache.
 */
void mJoinResetWindowCtx(SMJoinWindowCtx* pCtx) {
  pCtx->lastTs = INT64_MIN;

  pCtx->eqPostDone = false;
  pCtx->lastProbeGrp = false;
  pCtx->lastEqGrp = false;
  pCtx->grpRemains = false;

  mWinJoinResetWindowCache(pCtx, &pCtx->cache);
}
×
1720

1721
/*
 * Reset whichever join context the operator is using: the merge context when
 * ctx.mergeCtxInUse is set, otherwise the window context.
 */
void mJoinResetCtx(SMJoinOperatorInfo* pJoin) {
  if (!pJoin->ctx.mergeCtxInUse) {
    mJoinResetWindowCtx(&pJoin->ctx.windowCtx);
    return;
  }

  mJoinResetMergeCtx(&pJoin->ctx.mergeCtx);
}
34,248✔
1728

1729
void mJoinResetOperator(struct SOperatorInfo* pOperator) {
34,248✔
1730
  SMJoinOperatorInfo* pJoin = pOperator->info;
34,248✔
1731

1732
  mJoinResetTableCtx(pJoin->build);
34,248✔
1733
  mJoinResetTableCtx(pJoin->probe);
34,248✔
1734

1735
  mJoinResetCtx(pJoin);
34,248✔
1736

1737
  pOperator->status = OP_OPENED;
34,248✔
1738
}
34,248✔
1739

1740
/*
 * Produce the next result block of the merge join.
 *
 * pOperator  the merge-join operator; pOperator->info is its SMJoinOperatorInfo.
 * pResBlock  out: set to the produced block only when that block has rows;
 *            it is left untouched otherwise.
 *
 * When the operator is already OP_EXEC_DONE it either returns immediately
 * (no downstream get-params for both inputs) or resets itself and starts a
 * new round. The join implementation (joinFp) is then called repeatedly,
 * applying the final filter when one exists, until it yields a block with
 * rows, the operator reaches OP_EXEC_DONE, or it returns NULL.
 *
 * Errors from the join implementation (pJoin->errCode) or from doFilter() are
 * raised with T_LONG_JMP on the task's env and do not return here. The value
 * returned normally is the last status code, TSDB_CODE_SUCCESS on the paths
 * that reach the end.
 */
int32_t mJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
  SMJoinOperatorInfo* pJoin = pOperator->info;
  int32_t             code = TSDB_CODE_SUCCESS;

  if (OP_EXEC_DONE == pOperator->status) {
    // another round is possible only when get-params exist for both downstreams
    bool canRestart = (NULL != pOperator->pDownstreamGetParams) && (NULL != pOperator->pDownstreamGetParams[0]) &&
                      (NULL != pOperator->pDownstreamGetParams[1]);
    if (!canRestart) {
      qDebug("%s merge join done", GET_TASKID(pOperator->pTaskInfo));
      return code;
    }

    mJoinResetOperator(pOperator);
    qDebug("%s start new round merge join", GET_TASKID(pOperator->pTaskInfo));
  }

  // openCost is measured only while it is still zero
  int64_t startUs = 0;
  if (pOperator->cost.openCost == 0) {
    startUs = taosGetTimestampUs();
  }

  SSDataBlock* pBlock = NULL;
  do {
    pBlock = (*pJoin->joinFp)(pOperator);
    if (NULL == pBlock) {
      if (pJoin->errCode) {
        T_LONG_JMP(pOperator->pTaskInfo->env, pJoin->errCode);
      }
      break;
    }

    pBlock->info.id.blockId = pJoin->outBlkId;
    if (pJoin->pFinFilter != NULL) {
      code = doFilter(pBlock, pJoin->pFinFilter, NULL);
      if (code) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        pJoin->errCode = code;
        T_LONG_JMP(pOperator->pTaskInfo->env, pJoin->errCode);
      }
    }

    // keep pulling while the filter left nothing and the join is not finished
  } while (!(pBlock->info.rows > 0 || pOperator->status == OP_EXEC_DONE));

  if (NULL != pBlock) {
    pBlock->info.dataLoad = 1;
  }

  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (taosGetTimestampUs() - startUs) / 1000.0;
  }

  if (NULL != pBlock) {
    pJoin->execInfo.resRows += pBlock->info.rows;
    if (pBlock->info.rows > 0) {
      *pResBlock = pBlock;
    }
  }

  return code;
}
1795

1796
void destroyGrpArray(void* ppArray) {
×
1797
  SArray* pArray = *(SArray**)ppArray;
×
1798
  taosArrayDestroy(pArray);
×
1799
}
×
1800

1801
/*
 * Release everything owned by one join table context: the created blocks and
 * their array, the group hash, the column/key/bitmap buffers, the equal-group
 * array and the array of group arrays. The context struct itself is not
 * freed here. A NULL pTable is a no-op.
 */
void destroyMergeJoinTableCtx(SMJoinTableCtx* pTable) {
  if (pTable != NULL) {
    // the blocks must be destroyed before the array that tracks them
    mJoinDestroyCreatedBlks(pTable->createdBlks);
    taosArrayDestroy(pTable->createdBlks);
    tSimpleHashCleanup(pTable->pGrpHash);

    taosMemoryFree(pTable->primCol);
    taosMemoryFree(pTable->finCols);
    taosMemoryFree(pTable->keyCols);
    taosMemoryFree(pTable->keyBuf);
    taosMemoryFree(pTable->pRowBitmap);

    taosArrayDestroy(pTable->eqGrps);
    taosArrayDestroyEx(pTable->pGrpArrays, destroyGrpArray);
  }
}
1818

1819
void destroyMergeJoinOperator(void* param) {
101,690✔
1820
  SMJoinOperatorInfo* pJoin = (SMJoinOperatorInfo*)param;
101,690✔
1821

1822
  mJoinDestroyCtx(pJoin);
101,690✔
1823

1824
  if (pJoin->pFPreFilter != NULL) {
101,693✔
1825
    filterFreeInfo(pJoin->pFPreFilter);
302✔
1826
    pJoin->pFPreFilter = NULL;
302✔
1827
  }
1828
  if (pJoin->pPreFilter != NULL) {
101,693✔
1829
    filterFreeInfo(pJoin->pPreFilter);
218✔
1830
    pJoin->pPreFilter = NULL;
218✔
1831
  }
1832
  if (pJoin->pFinFilter != NULL) {
101,693✔
1833
    filterFreeInfo(pJoin->pFinFilter);
33,025✔
1834
    pJoin->pFinFilter = NULL;
33,025✔
1835
  }
1836

1837
  destroyMergeJoinTableCtx(pJoin->probe);
101,693✔
1838
  destroyMergeJoinTableCtx(pJoin->build);
101,695✔
1839

1840
  taosMemoryFreeClear(pJoin);
101,694!
1841
}
101,695✔
1842

1843
/*
 * Build the operator's filters from the physical plan node's conditions.
 *
 * INNER join: the full ON condition and the node's own conditions are merged
 * (via mergeJoinConds) when both exist, and the resulting single condition --
 * possibly NULL -- initializes pFinFilter.
 *
 * LEFT / RIGHT / FULL join: the full ON condition, the column ON condition
 * and the node's conditions initialize pFPreFilter, pPreFilter and
 * pFinFilter respectively, each only when present.
 *
 * Any other join type creates no filter. Returns TSDB_CODE_SUCCESS or the
 * error propagated by MJ_ERR_RET.
 */
int32_t mJoinHandleConds(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) {
  if (JOIN_TYPE_INNER == pJoin->joinType) {
    // without a full ON condition the node's own conditions (maybe NULL) are used
    SNode* pFinCond = pJoinNode->node.pConditions;

    if (pJoinNode->pFullOnCond != NULL) {
      if (pJoinNode->node.pConditions != NULL) {
        MJ_ERR_RET(mergeJoinConds(&pJoinNode->pFullOnCond, &pJoinNode->node.pConditions));
      }
      pFinCond = pJoinNode->pFullOnCond;
    }

    MJ_ERR_RET(filterInitFromNode(pFinCond, &pJoin->pFinFilter, 0));
    return TSDB_CODE_SUCCESS;
  }

  if (JOIN_TYPE_LEFT == pJoin->joinType || JOIN_TYPE_RIGHT == pJoin->joinType || JOIN_TYPE_FULL == pJoin->joinType) {
    if (pJoinNode->pFullOnCond != NULL) {
      MJ_ERR_RET(filterInitFromNode(pJoinNode->pFullOnCond, &pJoin->pFPreFilter, 0));
    }
    if (pJoinNode->pColOnCond != NULL) {
      MJ_ERR_RET(filterInitFromNode(pJoinNode->pColOnCond, &pJoin->pPreFilter, 0));
    }
    if (pJoinNode->node.pConditions != NULL) {
      MJ_ERR_RET(filterInitFromNode(pJoinNode->node.pConditions, &pJoin->pFinFilter, 0));
    }
  }

  return TSDB_CODE_SUCCESS;
}
1878

1879
/*
 * Select the join implementation (joinFp) and, where one exists, the group
 * reset callback (grpResetFp) from the operator's join type and subtype.
 *
 * Join types and LEFT/RIGHT subtypes not listed here leave both pointers
 * exactly as they were on entry. Always returns TSDB_CODE_SUCCESS.
 */
int32_t mJoinSetImplFp(SMJoinOperatorInfo* pJoin) {
  if (JOIN_TYPE_INNER == pJoin->joinType) {
    pJoin->joinFp = mInnerJoinDo;
  } else if (JOIN_TYPE_FULL == pJoin->joinType) {
    pJoin->joinFp = mFullJoinDo;
  } else if (JOIN_TYPE_LEFT == pJoin->joinType || JOIN_TYPE_RIGHT == pJoin->joinType) {
    switch (pJoin->subType) {
      case JOIN_STYPE_OUTER:
        pJoin->joinFp = mLeftJoinDo;
        pJoin->grpResetFp = mLeftJoinGroupReset;
        break;
      case JOIN_STYPE_SEMI:
        pJoin->joinFp = mSemiJoinDo;
        break;
      case JOIN_STYPE_ANTI:
        pJoin->joinFp = mAntiJoinDo;
        break;
      case JOIN_STYPE_WIN:
        pJoin->joinFp = mWinJoinDo;
        pJoin->grpResetFp = mWinJoinGroupReset;
        break;
      default:
        // other subtypes keep whatever joinFp was already installed
        break;
    }
  }

  return TSDB_CODE_SUCCESS;
}
1915

1916
int32_t createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
101,669✔
1917
                                           SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
1918
  QRY_PARAM_CHECK(pOptrInfo);
101,669!
1919

1920
  int32_t oldNum = numOfDownstream;
101,669✔
1921
  bool newDownstreams = false;
101,669✔
1922
  int32_t code = TSDB_CODE_SUCCESS;
101,669✔
1923
  SOperatorInfo* pOperator = NULL;
101,669✔
1924
  SMJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMJoinOperatorInfo));
101,669!
1925
  if (pInfo == NULL) {
101,671!
1926
    code = terrno;
×
1927
    goto _return;
×
1928
  }
1929

1930
  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
101,671!
1931
  if (pOperator == NULL) {
101,687!
1932
    code = terrno;
×
1933
    goto _return;
×
1934
  }
1935

1936
  pInfo->pOperator = pOperator;
101,687✔
1937
  MJ_ERR_JRET(mJoinInitDownstreamInfo(pInfo, &pDownstream, &numOfDownstream, &newDownstreams));
101,687!
1938

1939
  setOperatorInfo(pOperator, "MergeJoinOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
101,687✔
1940

1941
  mJoinSetBuildAndProbeTable(pInfo, pJoinNode);
101,686✔
1942

1943
  MJ_ERR_JRET(mJoinHandleConds(pInfo, pJoinNode));
101,687!
1944

1945
  MJ_ERR_JRET(mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0], newDownstreams));
101,668!
1946
  MJ_ERR_JRET(mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1], newDownstreams));
101,687!
1947

1948
  MJ_ERR_JRET(mJoinInitCtx(pInfo, pJoinNode));
101,684!
1949
  MJ_ERR_JRET(mJoinSetImplFp(pInfo));
101,693!
1950

1951
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mJoinMainProcess, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
101,691✔
1952

1953
  MJ_ERR_JRET(appendDownstream(pOperator, pDownstream, numOfDownstream));
101,691!
1954

1955
  if (newDownstreams) {
101,695✔
1956
    taosMemoryFree(pDownstream);
11,073!
1957
    pOperator->numOfRealDownstream = 1;
11,073✔
1958
  } else {
1959
    pOperator->numOfRealDownstream = 2;
90,622✔
1960
  }
1961

1962
  *pOptrInfo = pOperator;
101,695✔
1963
  return code;
101,695✔
1964

1965
_return:
×
1966

1967
  if (pInfo != NULL) {
×
1968
    destroyMergeJoinOperator(pInfo);
×
1969
  }
1970
  destroyOperatorAndDownstreams(pOperator, pDownstream, oldNum);
×
1971
  if (newDownstreams) {
×
1972
    taosMemoryFree(pDownstream);
×
1973
  }
1974
  pTaskInfo->code = code;
×
1975
  
1976
  return code;
×
1977
}
1978

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc