• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4956

09 Feb 2026 01:16AM UTC coverage: 66.85% (-0.03%) from 66.884%
#4956

push

travis-ci

web-flow
docs: add support for recording STMT to CSV files (#34276)

* docs: add support for recording STMT to CSV files

* docs: update version for STMT recording feature in CSV files

205696 of 307696 relevant lines covered (66.85%)

127754527.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.3
/source/libs/executor/src/mergejoinoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
// clang-format off
17
#include "executorInt.h"
18
#include "filter.h"
19
#include "function.h"
20
#include "operator.h"
21
#include "os.h"
22
#include "querynodes.h"
23
#include "querytask.h"
24
#include "tcompare.h"
25
#include "tdatablock.h"
26
#include "thash.h"
27
#include "tmsg.h"
28
#include "ttypes.h"
29
#include "functionMgt.h"
30
#include "mergejoin.h"
31
// clang-format on
32

33
int32_t mJoinBuildEqGrp(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, SMJoinGrpRows* pGrp) {
2,845,674✔
34
  SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCtx.targetSlotId);
2,845,674✔
35
  if (NULL == pCol) {
2,845,674✔
36
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
37
  }
38

39
  pGrp->beginIdx = pTable->blkRowIdx;
2,845,674✔
40
  pGrp->readIdx = pTable->blkRowIdx;
2,845,674✔
41

42
  pTable->blkRowIdx++;
2,845,674✔
43
  char* pEndVal = colDataGetNumData(pCol, pTable->blk->info.rows - 1);
2,845,674✔
44
  if (timestamp != *(int64_t*)pEndVal) {
2,845,674✔
45
    for (; pTable->blkRowIdx < pTable->blk->info.rows; ++pTable->blkRowIdx) {
2,359,982✔
46
      char* pNextVal = colDataGetNumData(pCol, pTable->blkRowIdx);
2,359,982✔
47
      if (timestamp == *(int64_t*)pNextVal) {
2,359,982✔
48
        continue;
331,376✔
49
      }
50

51
      pGrp->endIdx = pTable->blkRowIdx - 1;
2,028,606✔
52
      return TSDB_CODE_SUCCESS;
2,028,606✔
53
    }
54
  }
55

56
  pGrp->endIdx = pTable->blk->info.rows - 1;
817,068✔
57
  pTable->blkRowIdx = pTable->blk->info.rows;
817,068✔
58

59
  if (wholeBlk) {
817,068✔
60
    *wholeBlk = true;
202,710✔
61
  }
62

63
  return TSDB_CODE_SUCCESS;
817,068✔
64
}
65

66
int32_t mJoinTrimKeepFirstRow(SSDataBlock* pBlock) {
170,312✔
67
  int32_t bmLen = BitmapLen(pBlock->info.rows);
170,312✔
68
  size_t  numOfCols = taosArrayGetSize(pBlock->pDataBlock);
170,312✔
69

70
  for (int32_t i = 0; i < numOfCols; ++i) {
918,868✔
71
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
748,556✔
72
    if (NULL == pDst) {
748,556✔
73
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
74
    }
75

76
    // it is a reserved column for scalar function, and no data in this column yet.
77
    if (pDst->pData == NULL || (IS_VAR_DATA_TYPE(pDst->info.type) && pDst->varmeta.length == 0)) {
748,556✔
78
      continue;
×
79
    }
80

81
    if (IS_VAR_DATA_TYPE(pDst->info.type)) {
748,556✔
82
      pDst->varmeta.length = 0;
69,936✔
83

84
      if (!colDataIsNull_var(pDst, 0)) {
69,936✔
85
        char* p1 = colDataGetVarData(pDst, 0);
69,936✔
86
        // int32_t len = calcStrBytesByType(pDst->info.type, p1);
87
        //  if (pDst->info.type == TSDB_DATA_TYPE_JSON) {
88
        //    len = getJsonValueLen(p1);
89
        //  } else if (IS_STR_DATA_BLOB(pDst->info.type)) {
90
        //    len = blobDataTLen(p1);
91
        //  } else {
92
        //    len = varDataTLen(p1);
93
        //  }
94
        pDst->varmeta.length = calcStrBytesByType(pDst->info.type, p1);
69,936✔
95
      }
96
    } else {
97
      bool isNull = colDataIsNull_f(pDst, 0);
678,620✔
98

99
      TAOS_MEMSET(pDst->nullbitmap, 0, bmLen);
678,620✔
100
      if (isNull) {
678,620✔
101
        colDataSetNull_f(pDst->nullbitmap, 0);
×
102
      }
103
    }
104
  }
105

106
  pBlock->info.rows = 1;
170,312✔
107

108
  return TSDB_CODE_SUCCESS;
170,312✔
109
}
110

111
int32_t mJoinTrimKeepOneRow(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList) {
85,206✔
112
  //  int32_t totalRows = pBlock->info.rows;
113
  int32_t code = TSDB_CODE_SUCCESS;
85,206✔
114
  int32_t bmLen = BitmapLen(totalRows);
85,206✔
115
  char*   pBitmap = NULL;
85,206✔
116
  int32_t maxRows = 0;
85,206✔
117
  size_t  numOfCols = taosArrayGetSize(pBlock->pDataBlock);
85,206✔
118

119
  for (int32_t i = 0; i < numOfCols; ++i) {
527,824✔
120
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
442,618✔
121
    if (NULL == pDst) {
442,618✔
122
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
123
    }
124

125
    // it is a reserved column for scalar function, and no data in this column yet.
126
    if (pDst->pData == NULL || (IS_VAR_DATA_TYPE(pDst->info.type) && pDst->varmeta.length == 0)) {
442,618✔
127
      continue;
×
128
    }
129

130
    int32_t numOfRows = 0;
442,618✔
131
    if (IS_VAR_DATA_TYPE(pDst->info.type)) {
450,158✔
132
      int32_t j = 0;
7,540✔
133
      pDst->varmeta.length = 0;
7,540✔
134

135
      while (j < totalRows) {
12,064✔
136
        if (pBoolList[j] == 0) {
12,064✔
137
          j += 1;
4,524✔
138
          continue;
4,524✔
139
        }
140

141
        if (colDataIsNull_var(pDst, j)) {
7,540✔
142
          colDataSetNull_var(pDst, numOfRows);
×
143
        } else {
144
          // fix address sanitizer error. p1 may point to memory that will change during realloc of colDataSetVal, first
145
          // copy it to p2
146
          char*   p1 = colDataGetVarData(pDst, j);
7,540✔
147
          int32_t len = calcStrBytesByType(pDst->info.type, p1);
7,540✔
148
          // if (pDst->info.type == TSDB_DATA_TYPE_JSON) {
149
          //   len = getJsonValueLen(p1);
150
          // } else if (IS_STR_DATA_BLOB(pDst->info.type)) {
151
          //   len = blobDataTLen(p1);
152
          // } else {
153
          //   len = varDataTLen(p1);
154
          // }
155
          char* p2 = taosMemoryMalloc(len);
7,540✔
156
          if (NULL == p2) {
7,540✔
157
            MJ_ERR_RET(terrno);
×
158
          }
159
          TAOS_MEMCPY(p2, p1, len);
7,540✔
160
          code = colDataSetVal(pDst, numOfRows, p2, false);
7,540✔
161
          if (code) {
7,540✔
162
            taosMemoryFreeClear(p2);
×
163
            MJ_ERR_RET(terrno);
×
164
          }
165
          taosMemoryFree(p2);
7,540✔
166
        }
167
        numOfRows += 1;
7,540✔
168
        j += 1;
7,540✔
169
        break;
7,540✔
170
      }
171

172
      if (maxRows < numOfRows) {
7,540✔
173
        maxRows = numOfRows;
1,508✔
174
      }
175
    } else {
176
      if (pBitmap == NULL) {
435,078✔
177
        pBitmap = taosMemoryCalloc(1, bmLen);
85,206✔
178
        if (NULL == pBitmap) {
85,206✔
179
          MJ_ERR_RET(terrno);
×
180
        }
181
      }
182

183
      TAOS_MEMCPY(pBitmap, pDst->nullbitmap, bmLen);
435,078✔
184
      TAOS_MEMSET(pDst->nullbitmap, 0, bmLen);
435,078✔
185

186
      int32_t j = 0;
435,078✔
187

188
      switch (pDst->info.type) {
435,078✔
189
        case TSDB_DATA_TYPE_BIGINT:
124,422✔
190
        case TSDB_DATA_TYPE_UBIGINT:
191
        case TSDB_DATA_TYPE_DOUBLE:
192
        case TSDB_DATA_TYPE_TIMESTAMP:
193
          while (j < totalRows) {
187,004✔
194
            if (pBoolList[j] == 0) {
187,004✔
195
              j += 1;
62,582✔
196
              continue;
62,582✔
197
            }
198

199
            if (BMIsNull(pBitmap, j)) {
124,422✔
200
              colDataSetNull_f(pDst->nullbitmap, numOfRows);
×
201
            } else {
202
              ((int64_t*)pDst->pData)[numOfRows] = ((int64_t*)pDst->pData)[j];
124,422✔
203
            }
204
            numOfRows += 1;
124,422✔
205
            j += 1;
124,422✔
206
            break;
124,422✔
207
          }
208
          break;
124,422✔
209
        case TSDB_DATA_TYPE_FLOAT:
310,656✔
210
        case TSDB_DATA_TYPE_INT:
211
        case TSDB_DATA_TYPE_UINT:
212
          while (j < totalRows) {
472,758✔
213
            if (pBoolList[j] == 0) {
472,758✔
214
              j += 1;
162,102✔
215
              continue;
162,102✔
216
            }
217
            if (BMIsNull(pBitmap, j)) {
310,656✔
218
              colDataSetNull_f(pDst->nullbitmap, numOfRows);
×
219
            } else {
220
              ((int32_t*)pDst->pData)[numOfRows] = ((int32_t*)pDst->pData)[j];
310,656✔
221
            }
222
            numOfRows += 1;
310,656✔
223
            j += 1;
310,656✔
224
            break;
310,656✔
225
          }
226
          break;
310,656✔
227
        case TSDB_DATA_TYPE_SMALLINT:
×
228
        case TSDB_DATA_TYPE_USMALLINT:
229
          while (j < totalRows) {
×
230
            if (pBoolList[j] == 0) {
×
231
              j += 1;
×
232
              continue;
×
233
            }
234
            if (BMIsNull(pBitmap, j)) {
×
235
              colDataSetNull_f(pDst->nullbitmap, numOfRows);
×
236
            } else {
237
              ((int16_t*)pDst->pData)[numOfRows] = ((int16_t*)pDst->pData)[j];
×
238
            }
239
            numOfRows += 1;
×
240
            j += 1;
×
241
            break;
×
242
          }
243
          break;
×
244
        case TSDB_DATA_TYPE_BOOL:
×
245
        case TSDB_DATA_TYPE_TINYINT:
246
        case TSDB_DATA_TYPE_UTINYINT:
247
          while (j < totalRows) {
×
248
            if (pBoolList[j] == 0) {
×
249
              j += 1;
×
250
              continue;
×
251
            }
252
            if (BMIsNull(pBitmap, j)) {
×
253
              colDataSetNull_f(pDst->nullbitmap, numOfRows);
×
254
            } else {
255
              ((int8_t*)pDst->pData)[numOfRows] = ((int8_t*)pDst->pData)[j];
×
256
            }
257
            numOfRows += 1;
×
258
            j += 1;
×
259
            break;
×
260
          }
261
          break;
×
262
      }
263
    }
264

265
    if (maxRows < numOfRows) {
442,618✔
266
      maxRows = numOfRows;
83,698✔
267
    }
268
  }
269

270
  pBlock->info.rows = maxRows;
85,206✔
271
  if (pBitmap != NULL) {
85,206✔
272
    taosMemoryFree(pBitmap);
85,206✔
273
  }
274

275
  return TSDB_CODE_SUCCESS;
85,206✔
276
}
277

278
int32_t mJoinFilterAndMarkHashRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SMJoinTableCtx* build,
×
279
                                   int32_t startRowIdx) {
280
  if (pFilterInfo == NULL || pBlock->info.rows == 0) {
×
281
    return TSDB_CODE_SUCCESS;
×
282
  }
283

284
  SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
×
285
  SColumnInfoData*   p = NULL;
×
286

287
  int32_t code = filterSetDataFromSlotId(pFilterInfo, &param1);
×
288
  if (code != TSDB_CODE_SUCCESS) {
×
289
    goto _err;
×
290
  }
291

292
  int32_t status = 0;
×
293
  code = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status);
×
294
  if (code != TSDB_CODE_SUCCESS) {
×
295
    goto _err;
×
296
  }
297

298
  if (!build->pHashGrpRows->allRowsMatch &&
×
299
      (status == FILTER_RESULT_ALL_QUALIFIED || status == FILTER_RESULT_PARTIAL_QUALIFIED)) {
×
300
    if (status == FILTER_RESULT_ALL_QUALIFIED && taosArrayGetSize(build->pHashCurGrp) == pBlock->info.rows) {
×
301
      build->pHashGrpRows->allRowsMatch = true;
×
302
    } else {
303
      bool* pRes = (bool*)p->pData;
×
304
      for (int32_t i = 0; i < pBlock->info.rows; ++i) {
×
305
        if ((status == FILTER_RESULT_PARTIAL_QUALIFIED && false == *(pRes + i)) ||
×
306
            MJOIN_ROW_BITMAP_SET(build->pRowBitmap, build->pHashGrpRows->rowBitmapOffset, startRowIdx + i)) {
×
307
          continue;
×
308
        }
309

310
        MJOIN_SET_ROW_BITMAP(build->pRowBitmap, build->pHashGrpRows->rowBitmapOffset, startRowIdx + i);
×
311
        build->pHashGrpRows->rowMatchNum++;
×
312
      }
313

314
      if (build->pHashGrpRows->rowMatchNum == taosArrayGetSize(build->pHashGrpRows->pRows)) {
×
315
        build->pHashGrpRows->allRowsMatch = true;
×
316
      }
317
    }
318
  }
319

320
  code = extractQualifiedTupleByFilterResult(pBlock, p, status);
×
321

322
_err:
×
323
  colDataDestroy(p);
×
324
  taosMemoryFree(p);
×
325

326
  return code;
×
327
}
328

329
int32_t mJoinFilterAndMarkRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SMJoinTableCtx* build,
166,286✔
330
                               int32_t startGrpIdx, int32_t startRowIdx) {
331
  if (pFilterInfo == NULL || pBlock->info.rows == 0) {
166,286✔
332
    return TSDB_CODE_SUCCESS;
×
333
  }
334

335
  int32_t            code = TSDB_CODE_SUCCESS;
166,286✔
336
  SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
166,286✔
337
  SColumnInfoData*   p = NULL;
166,286✔
338

339
  code = filterSetDataFromSlotId(pFilterInfo, &param1);
166,286✔
340
  if (code != TSDB_CODE_SUCCESS) {
166,286✔
341
    goto _return;
×
342
  }
343

344
  int32_t status = 0;
166,286✔
345
  code = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status);
166,286✔
346
  if (code != TSDB_CODE_SUCCESS) {
166,286✔
347
    goto _return;
×
348
  }
349

350
  int32_t rowNum = 0;
166,286✔
351
  bool*   pRes = (bool*)p->pData;
166,286✔
352
  int32_t grpNum = taosArrayGetSize(build->eqGrps);
166,286✔
353
  if (status == FILTER_RESULT_ALL_QUALIFIED || status == FILTER_RESULT_PARTIAL_QUALIFIED) {
166,286✔
354
    for (int32_t i = startGrpIdx; i < grpNum && rowNum < pBlock->info.rows; startRowIdx = 0, ++i) {
221,586✔
355
      SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, i);
110,793✔
356
      if (NULL == buildGrp) {
110,793✔
357
        MJ_ERR_JRET(terrno);
×
358
      }
359
      if (buildGrp->allRowsMatch) {
110,793✔
360
        rowNum += buildGrp->endIdx - startRowIdx + 1;
6,829✔
361
        continue;
6,829✔
362
      }
363

364
      if (status == FILTER_RESULT_ALL_QUALIFIED && startRowIdx == buildGrp->beginIdx &&
103,964✔
365
          ((pBlock->info.rows - rowNum) >= (buildGrp->endIdx - startRowIdx + 1))) {
80,945✔
366
        buildGrp->allRowsMatch = true;
80,945✔
367
        rowNum += buildGrp->endIdx - startRowIdx + 1;
80,945✔
368
        continue;
80,945✔
369
      }
370

371
      for (int32_t m = startRowIdx; m <= buildGrp->endIdx && rowNum < pBlock->info.rows; ++m, ++rowNum) {
77,395✔
372
        if ((status == FILTER_RESULT_PARTIAL_QUALIFIED && false == *(pRes + rowNum)) ||
54,376✔
373
            MJOIN_ROW_BITMAP_SET(build->pRowBitmap, buildGrp->rowBitmapOffset, m - buildGrp->beginIdx)) {
28,704✔
374
          continue;
33,619✔
375
        }
376

377
        MJOIN_SET_ROW_BITMAP(build->pRowBitmap, buildGrp->rowBitmapOffset, m - buildGrp->beginIdx);
20,757✔
378
        buildGrp->rowMatchNum++;
20,757✔
379
      }
380

381
      if (buildGrp->rowMatchNum == (buildGrp->endIdx - buildGrp->beginIdx + 1)) {
23,019✔
382
        buildGrp->allRowsMatch = true;
5,278✔
383
      }
384
    }
385
  }
386

387
  code = extractQualifiedTupleByFilterResult(pBlock, p, status);
166,286✔
388

389
_return:
166,286✔
390
  colDataDestroy(p);
166,286✔
391
  taosMemoryFree(p);
166,286✔
392

393
  return code;
166,286✔
394
}
395

396
int32_t mJoinFilterAndKeepSingleRow(SSDataBlock* pBlock, SFilterInfo* pFilterInfo) {
347,684✔
397
  if (pFilterInfo == NULL || pBlock->info.rows == 0) {
347,684✔
398
    return TSDB_CODE_SUCCESS;
×
399
  }
400

401
  SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
347,684✔
402
  SColumnInfoData*   p = NULL;
347,684✔
403

404
  int32_t code = filterSetDataFromSlotId(pFilterInfo, &param1);
347,684✔
405
  if (code != TSDB_CODE_SUCCESS) {
347,684✔
406
    goto _return;
×
407
  }
408

409
  int32_t status = 0;
347,684✔
410
  code = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status);
347,684✔
411
  if (code != TSDB_CODE_SUCCESS) {
347,684✔
412
    goto _return;
×
413
  }
414

415
  if (status == FILTER_RESULT_ALL_QUALIFIED) {
347,684✔
416
    pBlock->info.rows = 1;
170,312✔
417
    MJ_ERR_JRET(mJoinTrimKeepFirstRow(pBlock));
170,312✔
418
  } else if (status == FILTER_RESULT_NONE_QUALIFIED) {
177,372✔
419
    pBlock->info.rows = 0;
92,166✔
420
  } else if (status == FILTER_RESULT_PARTIAL_QUALIFIED) {
85,206✔
421
    MJ_ERR_JRET(mJoinTrimKeepOneRow(pBlock, pBlock->info.rows, (bool*)p->pData));
85,206✔
422
  }
423

424
  code = TSDB_CODE_SUCCESS;
347,684✔
425

426
_return:
347,684✔
427

428
  colDataDestroy(p);
347,684✔
429
  taosMemoryFree(p);
347,684✔
430

431
  return code;
347,684✔
432
}
433

434
int32_t mJoinFilterAndNoKeepRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo) {
433,936✔
435
  if (pFilterInfo == NULL || pBlock->info.rows == 0) {
433,936✔
436
    return TSDB_CODE_SUCCESS;
×
437
  }
438

439
  SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
433,936✔
440
  SColumnInfoData*   p = NULL;
433,936✔
441

442
  int32_t code = filterSetDataFromSlotId(pFilterInfo, &param1);
433,936✔
443
  if (code != TSDB_CODE_SUCCESS) {
433,936✔
444
    goto _err;
×
445
  }
446

447
  int32_t status = 0;
433,936✔
448
  code = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status);
433,936✔
449
  if (code != TSDB_CODE_SUCCESS) {
433,936✔
450
    goto _err;
×
451
  }
452

453
  if (status == FILTER_RESULT_NONE_QUALIFIED) {
433,936✔
454
    pBlock->info.rows = 0;
138,895✔
455
  }
456

457
  code = TSDB_CODE_SUCCESS;
433,936✔
458

459
_err:
433,936✔
460

461
  colDataDestroy(p);
433,936✔
462
  taosMemoryFree(p);
433,936✔
463

464
  return code;
433,936✔
465
}
466

467
int32_t mJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBlock** ppFin) {
884,634✔
468
  SSDataBlock* pLess = *ppMid;
884,634✔
469
  SSDataBlock* pMore = *ppFin;
884,634✔
470

471
  /*
472
    if ((*ppMid)->info.rows < (*ppFin)->info.rows) {
473
      pLess = (*ppMid);
474
      pMore = (*ppFin);
475
    } else {
476
      pLess = (*ppFin);
477
      pMore = (*ppMid);
478
    }
479
  */
480

481
  int32_t totalRows = pMore->info.rows + pLess->info.rows;
884,634✔
482
  if (totalRows <= pMore->info.capacity) {
884,634✔
483
    MJ_ERR_RET(blockDataMerge(pMore, pLess));
884,634✔
484
    blockDataCleanup(pLess);
884,634✔
485
    pCtx->midRemains = false;
884,634✔
486
  } else {
487
    int32_t copyRows = pMore->info.capacity - pMore->info.rows;
×
488
    if (copyRows > 0) {
×
489
      MJ_ERR_RET(blockDataMergeNRows(pMore, pLess, pLess->info.rows - copyRows, copyRows));
×
490
      blockDataShrinkNRows(pLess, copyRows);
×
491
    }
492

493
    pCtx->midRemains = true;
×
494
  }
495

496
  /*
497
    if (pMore != (*ppFin)) {
498
      TSWAP(*ppMid, *ppFin);
499
    }
500
  */
501

502
  return TSDB_CODE_SUCCESS;
884,634✔
503
}
504

505
int32_t mJoinHandleMidRemains(SMJoinMergeCtx* pCtx) {
×
506
  TSWAP(pCtx->midBlk, pCtx->finBlk);
×
507

508
  pCtx->midRemains = false;
×
509

510
  return TSDB_CODE_SUCCESS;
×
511
}
512

513
int32_t mJoinNonEqGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pGrp,
1,543,750✔
514
                          bool probeGrp) {
515
  SMJoinTableCtx* probe = probeGrp ? pJoin->probe : pJoin->build;
1,543,750✔
516
  SMJoinTableCtx* build = probeGrp ? pJoin->build : pJoin->probe;
1,543,750✔
517
  int32_t         currRows = append ? pRes->info.rows : 0;
1,543,750✔
518
  int32_t         firstRows = GRP_REMAIN_ROWS(pGrp);
1,543,750✔
519

520
  for (int32_t c = 0; c < probe->finNum; ++c) {
4,643,298✔
521
    SMJoinColMap*    pFirstCol = probe->finCols + c;
3,099,548✔
522
    SColumnInfoData* pInCol = taosArrayGet(pGrp->blk->pDataBlock, pFirstCol->srcSlot);
3,099,548✔
523
    SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pFirstCol->dstSlot);
3,099,548✔
524
    if (NULL == pInCol || NULL == pOutCol) {
3,099,548✔
525
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
526
    }
527

528
    MJ_ERR_RET(colDataAssignNRows(pOutCol, currRows, pInCol, pGrp->readIdx, firstRows));
3,099,548✔
529
  }
530

531
  for (int32_t c = 0; c < build->finNum; ++c) {
4,158,000✔
532
    SMJoinColMap*    pSecondCol = build->finCols + c;
2,614,250✔
533
    SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pSecondCol->dstSlot);
2,614,250✔
534
    if (NULL == pOutCol) {
2,614,250✔
535
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
536
    }
537

538
    colDataSetNItemsNull(pOutCol, currRows, firstRows);
2,614,250✔
539
  }
540

541
  pRes->info.rows = append ? (pRes->info.rows + firstRows) : firstRows;
1,543,750✔
542
  return TSDB_CODE_SUCCESS;
1,543,750✔
543
}
544

545
int32_t mJoinNonEqCart(SMJoinCommonCtx* pCtx, SMJoinGrpRows* pGrp, bool probeGrp, bool singleProbeRow) {
1,196,475✔
546
  pCtx->lastEqGrp = false;
1,196,475✔
547
  pCtx->lastProbeGrp = probeGrp;
1,196,475✔
548

549
  int32_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows;
1,196,475✔
550
  if (rowsLeft <= 0) {
1,196,475✔
551
    pCtx->grpRemains = pGrp->readIdx <= pGrp->endIdx;
×
552
    return TSDB_CODE_SUCCESS;
×
553
  }
554

555
  if (probeGrp && singleProbeRow) {
1,196,475✔
556
    rowsLeft = 1;
52,370✔
557
  }
558

559
  if (GRP_REMAIN_ROWS(pGrp) <= rowsLeft) {
1,196,475✔
560
    MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, pGrp, probeGrp));
1,196,475✔
561
    pGrp->readIdx = pGrp->endIdx + 1;
1,196,475✔
562
    pCtx->grpRemains = false;
1,196,475✔
563
  } else {
564
    int32_t endIdx = pGrp->endIdx;
×
565
    pGrp->endIdx = pGrp->readIdx + rowsLeft - 1;
×
566
    MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, pGrp, probeGrp));
×
567
    pGrp->readIdx = pGrp->endIdx + 1;
×
568
    pGrp->endIdx = endIdx;
×
569
    pCtx->grpRemains = true;
×
570
  }
571

572
  return TSDB_CODE_SUCCESS;
1,196,475✔
573
}
574

575
int32_t mJoinMergeGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pFirst,
2,147,483,647✔
576
                          SMJoinGrpRows* pSecond) {
577
  SMJoinTableCtx* probe = pJoin->probe;
2,147,483,647✔
578
  SMJoinTableCtx* build = pJoin->build;
2,147,483,647✔
579
  int32_t         currRows = append ? pRes->info.rows : 0;
2,147,483,647✔
580
  int32_t         firstRows = GRP_REMAIN_ROWS(pFirst);
2,147,483,647✔
581
  int32_t         secondRows = GRP_REMAIN_ROWS(pSecond);
2,147,483,647✔
582

583
  for (int32_t c = 0; c < probe->finNum; ++c) {
2,147,483,647✔
584
    SMJoinColMap*    pFirstCol = probe->finCols + c;
2,147,483,647✔
585
    SColumnInfoData* pInCol = taosArrayGet(pFirst->blk->pDataBlock, pFirstCol->srcSlot);
2,147,483,647✔
586
    SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pFirstCol->dstSlot);
2,147,483,647✔
587
    if (NULL == pInCol || NULL == pOutCol) {
2,147,483,647✔
588
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
589
    }
590

591
    for (int32_t r = 0; r < firstRows; ++r) {
2,147,483,647✔
592
      if (colDataIsNull_s(pInCol, pFirst->readIdx + r)) {
2,147,483,647✔
593
        colDataSetNItemsNull(pOutCol, currRows + r * secondRows, secondRows);
67,549,932✔
594
      } else {
595
        if (pRes->info.capacity < (pRes->info.rows + firstRows * secondRows)) {
2,147,483,647✔
596
          qError("capacity:%d not enough, rows:%" PRId64 ", firstRows:%d, secondRows:%d", pRes->info.capacity,
×
597
                 pRes->info.rows, firstRows, secondRows);
598
          MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
599
        }
600
        uint32_t startOffset = (IS_VAR_DATA_TYPE(pOutCol->info.type))
2,147,483,647✔
601
                                   ? pOutCol->varmeta.length
602
                                   : ((currRows + r * secondRows) * pOutCol->info.bytes);
2,147,483,647✔
603
        if ((startOffset + 1 * pOutCol->info.bytes) > pRes->info.capacity * pOutCol->info.bytes) {
2,147,483,647✔
604
          qError("col buff not enough, startOffset:%d, bytes:%d, capacity:%d", startOffset, pOutCol->info.bytes,
×
605
                 pRes->info.capacity);
606
          MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
607
        }
608
        MJ_ERR_RET(colDataSetNItems(pOutCol, currRows + r * secondRows, colDataGetData(pInCol, pFirst->readIdx + r),
2,147,483,647✔
609
                                    secondRows, 1, true));
610
      }
611
    }
612
  }
613

614
  for (int32_t c = 0; c < build->finNum; ++c) {
2,147,483,647✔
615
    SMJoinColMap*    pSecondCol = build->finCols + c;
1,072,077,911✔
616
    SColumnInfoData* pInCol = taosArrayGet(pSecond->blk->pDataBlock, pSecondCol->srcSlot);
1,072,077,911✔
617
    SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pSecondCol->dstSlot);
1,072,077,911✔
618
    if (NULL == pInCol || NULL == pOutCol) {
1,072,077,911✔
619
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
620
    }
621

622
    for (int32_t r = 0; r < firstRows; ++r) {
2,147,483,647✔
623
      MJ_ERR_RET(colDataAssignNRows(pOutCol, currRows + r * secondRows, pInCol, pSecond->readIdx, secondRows));
1,184,262,078✔
624
    }
625
  }
626

627
  pRes->info.rows = append ? (pRes->info.rows + firstRows * secondRows) : firstRows * secondRows;
2,147,483,647✔
628

629
  return TSDB_CODE_SUCCESS;
2,147,483,647✔
630
}
631

632
int32_t mJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append, SMJoinTableCtx* probe,
×
633
                         SMJoinTableCtx* build, bool* cont) {
634
  if (NULL != cont) {
×
635
    *cont = false;
×
636
  }
637

638
  int32_t rowsLeft = append ? (pBlk->info.capacity - pBlk->info.rows) : pBlk->info.capacity;
×
639
  if (rowsLeft <= 0) {
×
640
    return TSDB_CODE_SUCCESS;
×
641
  }
642

643
  int32_t buildGrpRows = taosArrayGetSize(build->pHashCurGrp);
×
644
  int32_t grpRows = buildGrpRows - build->grpRowIdx;
×
645
  if (grpRows <= 0 || build->grpRowIdx < 0) {
×
646
    build->grpRowIdx = -1;
×
647
    if (NULL != cont) {
×
648
      *cont = true;
×
649
    }
650
    return TSDB_CODE_SUCCESS;
×
651
  }
652

653
  int32_t actRows = TMIN(grpRows, rowsLeft);
×
654
  int32_t currRows = append ? pBlk->info.rows : 0;
×
655

656
  for (int32_t c = 0; c < probe->finNum; ++c) {
×
657
    SMJoinColMap*    pFirstCol = probe->finCols + c;
×
658
    SColumnInfoData* pInCol = taosArrayGet(probeGrp->blk->pDataBlock, pFirstCol->srcSlot);
×
659
    SColumnInfoData* pOutCol = taosArrayGet(pBlk->pDataBlock, pFirstCol->dstSlot);
×
660
    if (NULL == pInCol || NULL == pOutCol) {
×
661
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
662
    }
663

664
    if (colDataIsNull_s(pInCol, probeGrp->readIdx)) {
×
665
      colDataSetNItemsNull(pOutCol, currRows, actRows);
×
666
    } else {
667
      MJ_ERR_RET(colDataSetNItems(pOutCol, currRows, colDataGetData(pInCol, probeGrp->readIdx), actRows, 1, true));
×
668
    }
669
  }
670

671
  for (int32_t c = 0; c < build->finNum; ++c) {
×
672
    SMJoinColMap*    pSecondCol = build->finCols + c;
×
673
    SColumnInfoData* pOutCol = taosArrayGet(pBlk->pDataBlock, pSecondCol->dstSlot);
×
674
    for (int32_t r = 0; r < actRows; ++r) {
×
675
      SMJoinRowPos* pRow = taosArrayGet(build->pHashCurGrp, build->grpRowIdx + r);
×
676
      if (NULL == pRow) {
×
677
        MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
678
      }
679

680
      SColumnInfoData* pInCol = taosArrayGet(pRow->pBlk->pDataBlock, pSecondCol->srcSlot);
×
681
      if (NULL == pInCol) {
×
682
        MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
683
      }
684

685
      MJ_ERR_RET(colDataAssignNRows(pOutCol, currRows + r, pInCol, pRow->pos, 1));
×
686
    }
687
  }
688

689
  pBlk->info.rows += actRows;
×
690

691
  if (actRows == grpRows) {
×
692
    build->grpRowIdx = -1;
×
693
  } else {
694
    build->grpRowIdx += actRows;
×
695
  }
696

697
  if (actRows == rowsLeft) {
×
698
    return TSDB_CODE_SUCCESS;
×
699
  }
700

701
  if (NULL != cont) {
×
702
    *cont = true;
×
703
  }
704

705
  return TSDB_CODE_SUCCESS;
×
706
}
707

708
int32_t mJoinAllocGrpRowBitmap(SMJoinTableCtx* pTb) {
126,597✔
709
  int32_t grpNum = taosArrayGetSize(pTb->eqGrps);
126,597✔
710
  for (int32_t i = 0; i < grpNum; ++i) {
253,194✔
711
    SMJoinGrpRows* pGrp = (SMJoinGrpRows*)taosArrayGet(pTb->eqGrps, i);
126,597✔
712
    if (NULL == pGrp) {
126,597✔
713
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
714
    }
715

716
    MJ_ERR_RET(mJoinGetRowBitmapOffset(pTb, pGrp->endIdx - pGrp->beginIdx + 1, &pGrp->rowBitmapOffset));
126,597✔
717
    pGrp->rowMatchNum = 0;
126,597✔
718
  }
719

720
  return TSDB_CODE_SUCCESS;
126,597✔
721
}
722

723
int32_t mJoinProcessEqualGrp(SMJoinMergeCtx* pCtx, int64_t timestamp, bool lastBuildGrp) {
2,147,483,647✔
724
  SMJoinOperatorInfo* pJoin = pCtx->pJoin;
2,147,483,647✔
725

726
  pCtx->lastEqGrp = true;
2,147,483,647✔
727

728
  MJ_ERR_RET(mJoinBuildEqGroups(pJoin->probe, timestamp, NULL, true));
2,147,483,647✔
729
  if (!lastBuildGrp) {
2,147,483,647✔
730
    MJ_ERR_RET(mJoinRetrieveEqGrpRows(pJoin, pJoin->build, timestamp));
2,147,483,647✔
731
  } else {
732
    pJoin->build->grpIdx = 0;
377,404✔
733
  }
734

735
  if (pCtx->hashCan && REACH_HJOIN_THRESHOLD(pJoin->probe, pJoin->build)) {
2,147,483,647✔
736
    if (!lastBuildGrp || !pCtx->hashJoin) {
×
737
      if (pJoin->build->rowBitmapSize > 0) {
×
738
        MJ_ERR_RET(mJoinCreateFullBuildTbHash(pJoin, pJoin->build));
×
739
      } else {
740
        MJ_ERR_RET(mJoinCreateBuildTbHash(pJoin, pJoin->build));
×
741
      }
742
    }
743

744
    if (pJoin->probe->newBlk) {
×
745
      MJ_ERR_RET(mJoinSetKeyColsData(pJoin->probe->blk, pJoin->probe));
×
746
      pJoin->probe->newBlk = false;
×
747
    }
748

749
    pCtx->hashJoin = true;
×
750

751
    return (*pCtx->hashCartFp)(pCtx);
×
752
  }
753

754
  pCtx->hashJoin = false;
2,147,483,647✔
755

756
  if (!lastBuildGrp && pJoin->build->rowBitmapSize > 0) {
2,147,483,647✔
757
    MJ_ERR_RET(mJoinAllocGrpRowBitmap(pJoin->build));
126,597✔
758
  }
759

760
  return (*pCtx->mergeCartFp)(pCtx);
2,147,483,647✔
761
}
762

763
int32_t mJoinProcessLowerGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs,
606,344✔
764
                             int64_t* buildTs) {
765
  pCtx->probeNEqGrp.blk = pTb->blk;
606,344✔
766
  pCtx->probeNEqGrp.beginIdx = pTb->blkRowIdx;
606,344✔
767
  pCtx->probeNEqGrp.readIdx = pCtx->probeNEqGrp.beginIdx;
606,344✔
768
  pCtx->probeNEqGrp.endIdx = pCtx->probeNEqGrp.beginIdx;
606,344✔
769

770
  while (++pTb->blkRowIdx < pTb->blk->info.rows) {
961,676✔
771
    MJOIN_GET_TB_CUR_TS(pCol, *probeTs, pTb);
804,268✔
772
    if (PROBE_TS_NMATCH(pCtx->ascTs, *probeTs, *buildTs)) {
804,268✔
773
      pCtx->probeNEqGrp.endIdx = pTb->blkRowIdx;
355,332✔
774
      continue;
355,332✔
775
    }
776

777
    break;
448,936✔
778
  }
779

780
  return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false);
606,344✔
781
}
782

783
int32_t mJoinProcessGreaterGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs,
94,053✔
784
                               int64_t* buildTs) {
785
  pCtx->buildNEqGrp.blk = pTb->blk;
94,053✔
786
  pCtx->buildNEqGrp.beginIdx = pTb->blkRowIdx;
94,053✔
787
  pCtx->buildNEqGrp.readIdx = pCtx->buildNEqGrp.beginIdx;
94,053✔
788
  pCtx->buildNEqGrp.endIdx = pCtx->buildNEqGrp.beginIdx;
94,053✔
789

790
  while (++pTb->blkRowIdx < pTb->blk->info.rows) {
100,111✔
791
    MJOIN_GET_TB_CUR_TS(pCol, *buildTs, pTb);
97,841✔
792
    if (PROBE_TS_NREACH(pCtx->ascTs, *probeTs, *buildTs)) {
97,841✔
793
      pCtx->buildNEqGrp.endIdx = pTb->blkRowIdx;
6,058✔
794
      continue;
6,058✔
795
    }
796

797
    break;
91,783✔
798
  }
799

800
  return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->buildNEqGrp, false, false);
94,053✔
801
}
802

803
SOperatorInfo** mJoinBuildDownstreams(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream) {
1,159,928✔
804
  SOperatorInfo** p = taosMemoryMalloc(2 * POINTER_BYTES);
1,159,928✔
805
  if (p) {
1,159,928✔
806
    p[0] = pDownstream[0];
1,159,928✔
807
    p[1] = pDownstream[0];
1,159,928✔
808
  }
809

810
  return p;
1,159,928✔
811
}
812

813
int32_t mJoinInitDownstreamInfo(SMJoinOperatorInfo* pInfo, SOperatorInfo*** pDownstream, int32_t* numOfDownstream,
12,238,787✔
814
                                bool* newDownstreams) {
815
  if (1 == *numOfDownstream) {
12,238,787✔
816
    *newDownstreams = true;
1,160,301✔
817
    *pDownstream = mJoinBuildDownstreams(pInfo, *pDownstream);
1,159,928✔
818
    if (NULL == *pDownstream) {
1,159,928✔
819
      return terrno;
×
820
    }
821
    *numOfDownstream = 2;
1,159,928✔
822
  }
823

824
  return TSDB_CODE_SUCCESS;
12,238,787✔
825
}
826

827
static int32_t mJoinInitPrimKeyInfo(SMJoinTableCtx* pTable, int32_t slotId) {
24,476,970✔
828
  pTable->primCol = taosMemoryMalloc(sizeof(SMJoinColInfo));
24,476,970✔
829
  if (NULL == pTable->primCol) {
24,474,192✔
830
    return terrno;
×
831
  }
832

833
  pTable->primCol->srcSlot = slotId;
24,475,517✔
834

835
  return TSDB_CODE_SUCCESS;
24,476,018✔
836
}
837

838
static int32_t mJoinInitColsInfo(int32_t* colNum, int64_t* rowSize, SMJoinColInfo** pCols, SNodeList* pList) {
24,476,970✔
839
  *colNum = LIST_LENGTH(pList);
24,476,970✔
840

841
  *pCols = taosMemoryMalloc((*colNum) * sizeof(SMJoinColInfo));
24,476,469✔
842
  if (NULL == *pCols) {
24,475,517✔
843
    return terrno;
×
844
  }
845

846
  *rowSize = 0;
24,475,517✔
847

848
  int32_t i = 0;
24,475,517✔
849
  SNode*  pNode = NULL;
24,475,517✔
850
  FOREACH(pNode, pList) {
24,684,420✔
851
    SColumnNode* pColNode = (SColumnNode*)pNode;
209,752✔
852
    (*pCols)[i].srcSlot = pColNode->slotId;
209,752✔
853
    (*pCols)[i].jsonData = TSDB_DATA_TYPE_JSON == pColNode->node.resType.type;
209,752✔
854
    (*pCols)[i].vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type);
209,752✔
855
    (*pCols)[i].bytes = pColNode->node.resType.bytes;
209,752✔
856
    *rowSize += pColNode->node.resType.bytes;
209,752✔
857
    ++i;
208,903✔
858
  }
859

860
  return TSDB_CODE_SUCCESS;
24,475,041✔
861
}
862

863
static int32_t mJoinInitKeyColsInfo(SMJoinTableCtx* pTable, SNodeList* pList, bool allocKeyBuf) {
24,476,970✔
864
  int64_t rowSize = 0;
24,476,970✔
865
  MJ_ERR_RET(mJoinInitColsInfo(&pTable->keyNum, &rowSize, &pTable->keyCols, pList));
24,477,844✔
866

867
  if (pTable->keyNum > 1 || allocKeyBuf) {
24,475,041✔
868
    if (rowSize > 1) {
187,540✔
869
      pTable->keyNullSize = 1;
27,896✔
870
    } else {
871
      pTable->keyNullSize = 2;
159,644✔
872
    }
873

874
    pTable->keyBuf = taosMemoryMalloc(TMAX(rowSize, pTable->keyNullSize));
187,540✔
875
    if (NULL == pTable->keyBuf) {
187,540✔
876
      return terrno;
×
877
    }
878
  }
879

880
  return TSDB_CODE_SUCCESS;
24,475,041✔
881
}
882

883
static int32_t mJoinInitFinColsInfo(SMJoinTableCtx* pTable, SNodeList* pList) {
24,475,517✔
884
  pTable->finCols = taosMemoryMalloc(LIST_LENGTH(pList) * sizeof(SMJoinColMap));
24,475,517✔
885
  if (NULL == pTable->finCols) {
24,475,517✔
886
    return terrno;
×
887
  }
888

889
  int32_t i = 0;
24,475,993✔
890
  SNode*  pNode = NULL;
24,475,993✔
891
  FOREACH(pNode, pList) {
93,119,642✔
892
    STargetNode* pTarget = (STargetNode*)pNode;
68,643,070✔
893
    SColumnNode* pColumn = (SColumnNode*)pTarget->pExpr;
68,643,070✔
894
    if (pColumn->dataBlockId == pTable->blkId) {
68,643,070✔
895
      pTable->finCols[i].srcSlot = pColumn->slotId;
34,322,487✔
896
      pTable->finCols[i].dstSlot = pTarget->slotId;
34,320,638✔
897
      pTable->finCols[i].bytes = pColumn->node.resType.bytes;
34,320,162✔
898
      pTable->finCols[i].vardata = IS_VAR_DATA_TYPE(pColumn->node.resType.type);
34,321,139✔
899
      ++i;
34,322,139✔
900
    }
901
  }
902

903
  pTable->finNum = i;
24,477,844✔
904

905
  return TSDB_CODE_SUCCESS;
24,478,320✔
906
}
907

908
static int32_t mJoinInitFuncPrimExprCtx(SMJoinPrimExprCtx* pCtx, STargetNode* pTarget) {
427,398✔
909
  SFunctionNode* pFunc = (SFunctionNode*)pTarget->pExpr;
427,398✔
910
  if (FUNCTION_TYPE_TIMETRUNCATE != pFunc->funcType) {
427,398✔
911
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
912
  }
913

914
  if (4 != pFunc->pParameterList->length && 5 != pFunc->pParameterList->length) {
427,398✔
915
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
916
  }
917

918
  SValueNode* pUnit = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
427,398✔
919
  if (NULL == pUnit) {
427,398✔
920
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
921
  }
922
  SValueNode* pCurrTz = NULL;
427,398✔
923
  if (5 == pFunc->pParameterList->length) {
427,398✔
924
    pCurrTz = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 2);
141,286✔
925
    if (NULL == pCurrTz) {
141,286✔
926
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
927
    }
928
  }
929
  SValueNode* pTimeZone = (5 == pFunc->pParameterList->length)
427,398✔
930
                              ? (SValueNode*)nodesListGetNode(pFunc->pParameterList, 4)
141,286✔
931
                              : (SValueNode*)nodesListGetNode(pFunc->pParameterList, 3);
427,398✔
932
  if (NULL == pTimeZone) {
427,398✔
933
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
934
  }
935

936
  pCtx->truncateUnit = pUnit->typeData;
427,398✔
937
  if ((NULL == pCurrTz || 1 == pCurrTz->typeData) &&
850,272✔
938
      pCtx->truncateUnit >= (86400 * TSDB_TICK_PER_SECOND(pFunc->node.resType.precision))) {
422,874✔
939
    pCtx->timezoneUnit =
172,954✔
940
        offsetFromTz(varDataVal(pTimeZone->datum.p), TSDB_TICK_PER_SECOND(pFunc->node.resType.precision));
172,954✔
941
  }
942

943
  pCtx->type = E_PRIM_TIMETRUNCATE;
427,398✔
944

945
  return TSDB_CODE_SUCCESS;
427,398✔
946
}
947

948
static int32_t mJoinInitValPrimExprCtx(SMJoinPrimExprCtx* pCtx, STargetNode* pTarget) {
134,096✔
949
  SValueNode* pVal = (SValueNode*)pTarget->pExpr;
134,096✔
950
  if (TSDB_DATA_TYPE_TIMESTAMP != pVal->node.resType.type) {
134,096✔
951
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
952
  }
953

954
  pCtx->constTs = pVal->datum.i;
134,096✔
955
  pCtx->type = E_PRIM_VALUE;
134,096✔
956

957
  return TSDB_CODE_SUCCESS;
134,096✔
958
}
959

960
static int32_t mJoinInitPrimExprCtx(SNode* pNode, SMJoinPrimExprCtx* pCtx, SMJoinTableCtx* pTable) {
24,474,590✔
961
  if (NULL == pNode) {
24,474,590✔
962
    pCtx->targetSlotId = pTable->primCol->srcSlot;
23,913,096✔
963
    return TSDB_CODE_SUCCESS;
23,915,025✔
964
  }
965

966
  if (QUERY_NODE_TARGET != nodeType(pNode)) {
561,494✔
967
    qError("primary expr node is not target, type:%d", nodeType(pNode));
×
968
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
969
  }
970

971
  STargetNode* pTarget = (STargetNode*)pNode;
561,494✔
972
  if (QUERY_NODE_FUNCTION != nodeType(pTarget->pExpr) && QUERY_NODE_VALUE != nodeType(pTarget->pExpr)) {
561,494✔
973
    qError("Invalid primary expr node type:%d", nodeType(pTarget->pExpr));
×
974
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
975
  }
976

977
  if (QUERY_NODE_FUNCTION == nodeType(pTarget->pExpr)) {
561,494✔
978
    MJ_ERR_RET(mJoinInitFuncPrimExprCtx(pCtx, pTarget));
427,398✔
979
  } else if (QUERY_NODE_VALUE == nodeType(pTarget->pExpr)) {
134,096✔
980
    MJ_ERR_RET(mJoinInitValPrimExprCtx(pCtx, pTarget));
134,096✔
981
  }
982

983
  pCtx->targetSlotId = pTarget->slotId;
561,494✔
984

985
  return TSDB_CODE_SUCCESS;
561,494✔
986
}
987

988
static int32_t mJoinInitTableInfo(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode,
24,475,645✔
989
                                  SOperatorInfo** pDownstream, int32_t idx, SQueryStat* pStat, bool sameDs) {
990
  SMJoinTableCtx* pTable = &pJoin->tbs[idx];
24,475,645✔
991
  pTable->downStream = pDownstream[idx];
24,476,018✔
992
  pTable->blkId = getOperatorResultBlockId(pDownstream[idx], sameDs ? idx : 0);
24,476,867✔
993
  MJ_ERR_RET(mJoinInitPrimKeyInfo(pTable, (0 == idx) ? pJoinNode->leftPrimSlotId : pJoinNode->rightPrimSlotId));
24,477,343✔
994

995
  MJ_ERR_RET(mJoinInitKeyColsInfo(pTable, (0 == idx) ? pJoinNode->pEqLeft : pJoinNode->pEqRight,
24,476,018✔
996
                                  JOIN_TYPE_FULL == pJoin->joinType));
997
  MJ_ERR_RET(mJoinInitFinColsInfo(pTable, pJoinNode->pTargets));
24,475,517✔
998

999
  TAOS_MEMCPY(&pTable->inputStat, pStat, sizeof(*pStat));
24,478,320✔
1000

1001
  pTable->eqGrps = taosArrayInit(8, sizeof(SMJoinGrpRows));
24,477,343✔
1002
  if (NULL == pTable->eqGrps) {
24,475,670✔
1003
    return terrno;
×
1004
  }
1005

1006
  if (E_JOIN_TB_BUILD == pTable->type) {
24,475,915✔
1007
    pTable->createdBlks = taosArrayInit(8, POINTER_BYTES);
12,238,208✔
1008
    if (NULL == pTable->createdBlks) {
12,236,835✔
1009
      return terrno;
×
1010
    }
1011
    pTable->pGrpArrays = taosArrayInit(32, POINTER_BYTES);
12,236,835✔
1012
    if (NULL == pTable->pGrpArrays) {
12,236,860✔
1013
      return terrno;
×
1014
    }
1015
    pTable->pGrpHash = tSimpleHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
12,236,835✔
1016
    if (NULL == pTable->pGrpHash) {
12,238,185✔
1017
      return terrno;
×
1018
    }
1019

1020
    if (pJoin->pFPreFilter && IS_FULL_OUTER_JOIN(pJoin->joinType, pJoin->subType)) {
12,237,812✔
1021
      pTable->rowBitmapSize = MJOIN_ROW_BITMAP_SIZE;
37,968✔
1022
      pTable->pRowBitmap = taosMemoryMalloc(pTable->rowBitmapSize);
37,968✔
1023
      if (NULL == pTable->pRowBitmap) {
37,968✔
1024
        return terrno;
×
1025
      }
1026
    }
1027

1028
    pTable->noKeepEqGrpRows = (JOIN_STYPE_ANTI == pJoin->subType && NULL == pJoin->pFPreFilter);
12,237,732✔
1029
    pTable->multiEqGrpRows =
12,238,185✔
1030
        !((JOIN_STYPE_SEMI == pJoin->subType || JOIN_STYPE_ANTI == pJoin->subType) && NULL == pJoin->pFPreFilter);
12,238,185✔
1031
    pTable->multiRowsGrp =
12,237,359✔
1032
        !((JOIN_STYPE_SEMI == pJoin->subType || JOIN_STYPE_ANTI == pJoin->subType) && NULL == pJoin->pPreFilter);
12,237,812✔
1033
    if (JOIN_STYPE_ASOF == pJoinNode->subType) {
12,237,359✔
1034
      pTable->eqRowLimit = (pJoinNode->pJLimit && ((SLimitNode*)pJoinNode->pJLimit)->limit)
662,556✔
1035
                               ? ((SLimitNode*)pJoinNode->pJLimit)->limit->datum.i
61,372✔
1036
                               : 1;
361,964✔
1037
    }
1038
  } else {
1039
    pTable->multiEqGrpRows = true;
12,237,334✔
1040
  }
1041

1042
  MJ_ERR_RET(mJoinInitPrimExprCtx(pTable->primExpr, &pTable->primCtx, pTable));
24,475,169✔
1043

1044
  return TSDB_CODE_SUCCESS;
24,475,645✔
1045
}
1046

1047
static void mJoinSetBuildAndProbeTable(SMJoinOperatorInfo* pInfo, SSortMergeJoinPhysiNode* pJoinNode) {
12,238,787✔
1048
  int32_t buildIdx = 0;
12,238,787✔
1049
  int32_t probeIdx = 1;
12,238,787✔
1050

1051
  pInfo->joinType = pJoinNode->joinType;
12,238,787✔
1052
  pInfo->subType = pJoinNode->subType;
12,239,160✔
1053

1054
  switch (pInfo->joinType) {
12,238,787✔
1055
    case JOIN_TYPE_INNER:
10,823,116✔
1056
    case JOIN_TYPE_FULL:
1057
      buildIdx = 1;
10,823,116✔
1058
      probeIdx = 0;
10,823,116✔
1059
      break;
10,823,116✔
1060
    case JOIN_TYPE_LEFT:
865,531✔
1061
      buildIdx = 1;
865,531✔
1062
      probeIdx = 0;
865,531✔
1063
      break;
865,531✔
1064
    case JOIN_TYPE_RIGHT:
550,513✔
1065
      buildIdx = 0;
550,513✔
1066
      probeIdx = 1;
550,513✔
1067
      break;
550,513✔
1068
    default:
×
1069
      break;
×
1070
  }
1071

1072
  pInfo->build = &pInfo->tbs[buildIdx];
12,239,160✔
1073
  pInfo->probe = &pInfo->tbs[probeIdx];
12,238,787✔
1074

1075
  pInfo->build->downStreamIdx = buildIdx;
12,238,787✔
1076
  pInfo->probe->downStreamIdx = probeIdx;
12,239,160✔
1077

1078
  if (0 == buildIdx) {
12,238,787✔
1079
    pInfo->build->primExpr = pJoinNode->leftPrimExpr;
550,513✔
1080
    pInfo->probe->primExpr = pJoinNode->rightPrimExpr;
550,513✔
1081
  } else {
1082
    pInfo->build->primExpr = pJoinNode->rightPrimExpr;
11,688,274✔
1083
    pInfo->probe->primExpr = pJoinNode->leftPrimExpr;
11,688,647✔
1084
  }
1085

1086
  pInfo->build->type = E_JOIN_TB_BUILD;
12,238,787✔
1087
  pInfo->probe->type = E_JOIN_TB_PROBE;
12,239,160✔
1088
}
12,238,286✔
1089

1090
int32_t mJoinLaunchPrimExpr(SSDataBlock* pBlock, SMJoinTableCtx* pTable) {
52,490,653✔
1091
  if (NULL == pTable->primExpr) {
52,490,653✔
1092
    return TSDB_CODE_SUCCESS;
51,836,390✔
1093
  }
1094

1095
  SColumnInfoData* pPrimOut = taosArrayGet(pBlock->pDataBlock, pTable->primCtx.targetSlotId);
656,618✔
1096
  if (NULL == pPrimOut) {
656,618✔
1097
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1098
  }
1099

1100
  SMJoinPrimExprCtx* pCtx = &pTable->primCtx;
656,618✔
1101
  switch (pCtx->type) {
656,618✔
1102
    case E_PRIM_TIMETRUNCATE: {
533,469✔
1103
      SColumnInfoData* pPrimIn = taosArrayGet(pBlock->pDataBlock, pTable->primCol->srcSlot);
533,469✔
1104
      if (NULL == pPrimIn) {
533,469✔
1105
        return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1106
      }
1107

1108
      if (0 != pCtx->timezoneUnit) {
533,469✔
1109
        for (int32_t i = 0; i < pBlock->info.rows; ++i) {
138,465,522✔
1110
          ((int64_t*)pPrimOut->pData)[i] =
276,450,636✔
1111
              ((int64_t*)pPrimIn->pData)[i] - (((int64_t*)pPrimIn->pData)[i] + pCtx->timezoneUnit) % pCtx->truncateUnit;
276,450,636✔
1112
        }
1113
      } else {
1114
        for (int32_t i = 0; i < pBlock->info.rows; ++i) {
1,945,409✔
1115
          ((int64_t*)pPrimOut->pData)[i] = ((int64_t*)pPrimIn->pData)[i] / pCtx->truncateUnit * pCtx->truncateUnit;
1,652,144✔
1116
        }
1117
      }
1118
      break;
533,469✔
1119
    }
1120
    case E_PRIM_VALUE: {
123,149✔
1121
      MJ_ERR_RET(colDataSetNItems(pPrimOut, 0, (char*)&pCtx->constTs, pBlock->info.rows, 1, false));
123,149✔
1122
      break;
123,149✔
1123
    }
1124
    default:
×
1125
      break;
×
1126
  }
1127

1128
  return TSDB_CODE_SUCCESS;
656,618✔
1129
}
1130

1131
SSDataBlock* mJoinGrpRetrieveImpl(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) {
2,916,220✔
1132
  SSDataBlock* pTmp = NULL;
2,916,220✔
1133
  int32_t      code = TSDB_CODE_SUCCESS;
2,916,220✔
1134
  int32_t      dsIdx = pTable->downStreamIdx;
2,916,220✔
1135
  if (E_JOIN_TB_PROBE == pTable->type) {
2,916,220✔
1136
    if (pTable->remainInBlk) {
1,382,852✔
1137
      pTmp = pTable->remainInBlk;
501,938✔
1138
      pTable->remainInBlk = NULL;
501,938✔
1139
      (*pJoin->grpResetFp)(pJoin);
501,938✔
1140
      pTable->lastInGid = pTmp->info.id.groupId;
501,938✔
1141
      goto _return;
501,938✔
1142
    }
1143

1144
    if (pTable->dsFetchDone) {
880,914✔
1145
      return NULL;
×
1146
    }
1147

1148
    pTmp = getNextBlockFromDownstreamRemain(pJoin->pOperator, dsIdx);
880,914✔
1149
    if (NULL == pTmp) {
880,914✔
1150
      pTable->dsFetchDone = true;
197,760✔
1151
      return NULL;
197,760✔
1152
    }
1153

1154
    if (0 == pTable->lastInGid) {
683,154✔
1155
      pTable->lastInGid = pTmp->info.id.groupId;
181,216✔
1156
      goto _return;
181,216✔
1157
    }
1158

1159
    if (pTable->lastInGid == pTmp->info.id.groupId) {
501,938✔
1160
      goto _return;
×
1161
    }
1162

1163
    pTable->remainInBlk = pTmp;
501,938✔
1164
    return NULL;
501,938✔
1165
  }
1166

1167
  SMJoinTableCtx* pProbe = pJoin->probe;
1,533,368✔
1168

1169
  while (true) {
640,280✔
1170
    if (pTable->remainInBlk) {
2,173,648✔
1171
      if (pTable->remainInBlk->info.id.groupId == pProbe->lastInGid) {
1,257,986✔
1172
        pTmp = pTable->remainInBlk;
603,004✔
1173
        pTable->remainInBlk = NULL;
603,004✔
1174
        pTable->lastInGid = pTmp->info.id.groupId;
603,004✔
1175
        goto _return;
603,004✔
1176
      }
1177

1178
      if (pTable->remainInBlk->info.id.groupId > pProbe->lastInGid) {
654,982✔
1179
        return NULL;
628,258✔
1180
      }
1181

1182
      pTable->remainInBlk = NULL;
26,724✔
1183
    }
1184

1185
    if (pTable->dsFetchDone) {
942,386✔
1186
      return NULL;
149,902✔
1187
    }
1188

1189
    SSDataBlock* pTmp = getNextBlockFromDownstreamRemain(pJoin->pOperator, dsIdx);
792,484✔
1190
    if (NULL == pTmp) {
792,484✔
1191
      pTable->dsFetchDone = true;
152,204✔
1192
      return NULL;
152,204✔
1193
    }
1194

1195
    pTable->remainInBlk = pTmp;
640,280✔
1196
  }
1197

1198
_return:
1,286,158✔
1199

1200
  code = mJoinLaunchPrimExpr(pTmp, pTable);
1,286,158✔
1201
  if (code) {
1,286,158✔
1202
    pJoin->errCode = code;
×
1203
    T_LONG_JMP(pJoin->pOperator->pTaskInfo->env, pJoin->errCode);
×
1204
  }
1205

1206
  return pTmp;
1,286,158✔
1207
}
1208

1209
static FORCE_INLINE SSDataBlock* mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) {
67,526,245✔
1210
  if (pTable->dsFetchDone) {
67,526,245✔
1211
    return NULL;
293,315✔
1212
  }
1213

1214
  SSDataBlock* pTmp = getNextBlockFromDownstreamRemain(pJoin->pOperator, pTable->downStreamIdx);
67,232,930✔
1215
  if (NULL == pTmp) {
67,221,906✔
1216
    pTable->dsFetchDone = true;
16,016,459✔
1217
  } else {
1218
    int32_t code = mJoinLaunchPrimExpr(pTmp, pTable);
51,205,447✔
1219
    if (code) {
51,205,898✔
1220
      pJoin->errCode = code;
×
1221
      T_LONG_JMP(pJoin->pOperator->pTaskInfo->env, pJoin->errCode);
×
1222
    }
1223
  }
1224

1225
  return pTmp;
67,222,858✔
1226
}
1227

1228
static int32_t mJoinInitCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) {
12,237,812✔
1229
  pJoin->ctx.mergeCtx.groupJoin = pJoinNode->grpJoin;
12,237,812✔
1230
  pJoin->ctx.mergeCtx.limit = (pJoinNode->node.pLimit && ((SLimitNode*)pJoinNode->node.pLimit)->limit)
25,293,068✔
1231
                                  ? ((SLimitNode*)pJoinNode->node.pLimit)->limit->datum.i
817,444✔
1232
                                  : INT64_MAX;
13,055,256✔
1233
  pJoin->retrieveFp = pJoinNode->grpJoin ? mJoinGrpRetrieveImpl : mJoinRetrieveImpl;
12,236,860✔
1234
  pJoin->outBlkId = pJoinNode->node.pOutputDataBlockDesc->dataBlockId;
12,235,432✔
1235

1236
  if ((JOIN_STYPE_ASOF == pJoin->subType &&
12,238,684✔
1237
       (ASOF_LOWER_ROW_INCLUDED(pJoinNode->asofOpType) || ASOF_GREATER_ROW_INCLUDED(pJoinNode->asofOpType))) ||
300,592✔
1238
      (JOIN_STYPE_WIN == pJoin->subType)) {
12,029,590✔
1239
    pJoin->ctx.mergeCtxInUse = false;
521,522✔
1240
    return mJoinInitWindowCtx(pJoin, pJoinNode);
519,618✔
1241
  }
1242

1243
  pJoin->ctx.mergeCtxInUse = true;
11,716,290✔
1244
  return mJoinInitMergeCtx(pJoin, pJoinNode);
11,718,091✔
1245
}
1246

1247
static void mJoinDestroyCtx(SMJoinOperatorInfo* pJoin) {
12,239,160✔
1248
  if (JOIN_STYPE_ASOF == pJoin->subType || JOIN_STYPE_WIN == pJoin->subType) {
12,239,160✔
1249
    return mJoinDestroyWindowCtx(pJoin);
519,618✔
1250
  }
1251

1252
  return mJoinDestroyMergeCtx(pJoin);
11,719,542✔
1253
}
1254

1255
bool mJoinIsDone(SOperatorInfo* pOperator) { return (OP_EXEC_DONE == pOperator->status); }
182,414✔
1256

1257
void mJoinSetDone(SOperatorInfo* pOperator) {
14,601,553✔
1258
  setOperatorCompleted(pOperator);
14,601,553✔
1259
  if (pOperator->pDownstreamGetParams) {
14,601,553✔
1260
    freeOperatorParam(pOperator->pDownstreamGetParams[0], OP_GET_PARAM);
3,680,158✔
1261
    freeOperatorParam(pOperator->pDownstreamGetParams[1], OP_GET_PARAM);
3,680,158✔
1262
    pOperator->pDownstreamGetParams[0] = NULL;
3,680,158✔
1263
    pOperator->pDownstreamGetParams[1] = NULL;
3,680,158✔
1264
  }
1265
}
14,601,553✔
1266

1267
bool mJoinRetrieveBlk(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** ppBlk, SMJoinTableCtx* pTb) {
84,820,163✔
1268
  if (NULL == (*ppBlk) || *pIdx >= (*ppBlk)->info.rows) {
84,820,163✔
1269
    (*ppBlk) = (*pJoin->retrieveFp)(pJoin, pTb);
65,241,009✔
1270
    pTb->dsInitDone = true;
65,229,587✔
1271

1272
    qDebug("%s merge join %s table got %" PRId64 " rows block", GET_TASKID(pJoin->pOperator->pTaskInfo),
65,230,539✔
1273
           MJOIN_TBTYPE(pTb->type), (*ppBlk) ? (*ppBlk)->info.rows : 0);
1274

1275
    *pIdx = 0;
65,232,443✔
1276
    if (NULL != (*ppBlk)) {
65,231,015✔
1277
      pTb->newBlk = true;
49,315,776✔
1278
    }
1279

1280
    return ((*ppBlk) == NULL) ? false : true;
65,231,015✔
1281
  }
1282

1283
  return true;
19,580,028✔
1284
}
1285

1286
static void mJoinDestroyCreatedBlks(SArray* pCreatedBlks) {
2,147,483,647✔
1287
  int32_t blkNum = taosArrayGetSize(pCreatedBlks);
2,147,483,647✔
1288
  for (int32_t i = 0; i < blkNum; ++i) {
2,147,483,647✔
1289
    (void)blockDataDestroy(*(SSDataBlock**)TARRAY_GET_ELEM(pCreatedBlks, i));
4,467,401✔
1290
  }
1291
  taosArrayClear(pCreatedBlks);
2,147,483,647✔
1292
}
2,147,483,647✔
1293

1294
int32_t mJoinGetRowBitmapOffset(SMJoinTableCtx* pTable, int32_t rowNum, int32_t* rowBitmapOffset) {
126,597✔
1295
  int32_t bitmapLen = BitmapLen(rowNum);
126,597✔
1296
  int64_t reqSize = pTable->rowBitmapOffset + bitmapLen;
126,597✔
1297
  if (reqSize > pTable->rowBitmapSize) {
126,597✔
1298
    int64_t newSize = reqSize * 1.1;
×
1299
    pTable->pRowBitmap = taosMemoryRealloc(pTable->pRowBitmap, newSize);
×
1300
    if (NULL == pTable->pRowBitmap) {
×
1301
      return terrno;
×
1302
    }
1303
    pTable->rowBitmapSize = newSize;
×
1304
  }
1305

1306
  TAOS_MEMSET(pTable->pRowBitmap + pTable->rowBitmapOffset, 0xFFFFFFFF, bitmapLen);
126,597✔
1307

1308
  *rowBitmapOffset = pTable->rowBitmapOffset;
126,597✔
1309
  pTable->rowBitmapOffset += bitmapLen;
126,597✔
1310

1311
  return TSDB_CODE_SUCCESS;
126,597✔
1312
}
1313

1314
void mJoinResetForBuildTable(SMJoinTableCtx* pTable) {
2,147,483,647✔
1315
  pTable->grpTotalRows = 0;
2,147,483,647✔
1316
  pTable->grpIdx = 0;
2,147,483,647✔
1317
  pTable->eqRowNum = 0;
2,147,483,647✔
1318
  mJoinDestroyCreatedBlks(pTable->createdBlks);
2,147,483,647✔
1319
  taosArrayClear(pTable->eqGrps);
2,147,483,647✔
1320
  if (pTable->rowBitmapSize > 0) {
2,147,483,647✔
1321
    pTable->rowBitmapOffset = 1;
126,597✔
1322
    TAOS_MEMSET(&pTable->nMatchCtx, 0, sizeof(pTable->nMatchCtx));
126,597✔
1323
  }
1324
}
2,147,483,647✔
1325

1326
int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, bool restart) {
2,147,483,647✔
1327
  SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCtx.targetSlotId);
2,147,483,647✔
1328
  if (NULL == pCol) {
2,147,483,647✔
1329
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1330
  }
1331

1332
  SMJoinGrpRows* pGrp = NULL;
2,147,483,647✔
1333
  int32_t        code = TSDB_CODE_SUCCESS;
2,147,483,647✔
1334

1335
  if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) {
2,147,483,647✔
1336
    return TSDB_CODE_SUCCESS;
2,733,079✔
1337
  }
1338

1339
  if (restart) {
2,147,483,647✔
1340
    mJoinResetForBuildTable(pTable);
2,147,483,647✔
1341
  }
1342

1343
  bool keepGrp = true;
2,147,483,647✔
1344
  pGrp = taosArrayReserve(pTable->eqGrps, 1);
2,147,483,647✔
1345
  if (NULL == pGrp) {
2,147,483,647✔
1346
    MJ_ERR_RET(terrno);
×
1347
  }
1348

1349
  pGrp->beginIdx = pTable->blkRowIdx++;
2,147,483,647✔
1350
  pGrp->readIdx = pGrp->beginIdx;
2,147,483,647✔
1351
  pGrp->endIdx = pGrp->beginIdx;
2,147,483,647✔
1352
  pGrp->readMatch = false;
2,147,483,647✔
1353
  pGrp->blk = pTable->blk;
2,147,483,647✔
1354

1355
  char* pEndVal = colDataGetNumData(pCol, pTable->blk->info.rows - 1);
2,147,483,647✔
1356
  if (timestamp == *(int64_t*)pEndVal) {
2,147,483,647✔
1357
    if (pTable->multiEqGrpRows) {
13,379,043✔
1358
      pGrp->endIdx = pTable->blk->info.rows - 1;
13,233,440✔
1359
    } else {
1360
      pGrp->endIdx = pGrp->beginIdx;
145,603✔
1361
    }
1362

1363
    pTable->blkRowIdx = pTable->blk->info.rows;
13,379,043✔
1364
  } else {
1365
    for (; pTable->blkRowIdx < pTable->blk->info.rows; ++pTable->blkRowIdx) {
2,147,483,647✔
1366
      char* pNextVal = colDataGetNumData(pCol, pTable->blkRowIdx);
2,147,483,647✔
1367
      if (timestamp == *(int64_t*)pNextVal) {
2,147,483,647✔
1368
        pGrp->endIdx++;
848,694,460✔
1369
        continue;
848,730,994✔
1370
      }
1371

1372
      if (!pTable->multiEqGrpRows) {
2,147,483,647✔
1373
        pGrp->endIdx = pGrp->beginIdx;
966,812✔
1374
      } else if (0 == pTable->eqRowLimit) {
2,147,483,647✔
1375
        // DO NOTHING
1376
      } else if (pTable->eqRowLimit == pTable->eqRowNum) {
297,038✔
1377
        keepGrp = false;
×
1378
      } else {
1379
        int64_t rowNum = TMIN(pGrp->endIdx - pGrp->beginIdx + 1, pTable->eqRowLimit - pTable->eqRowNum);
297,038✔
1380
        pGrp->endIdx = pGrp->beginIdx + rowNum - 1;
297,038✔
1381
        pTable->eqRowNum += rowNum;
297,038✔
1382
      }
1383

1384
      goto _return;
2,147,483,647✔
1385
    }
1386
  }
1387

1388
  if (wholeBlk && (pTable->multiEqGrpRows || restart)) {
13,379,043✔
1389
    *wholeBlk = true;
4,536,054✔
1390

1391
    if (pTable->noKeepEqGrpRows || !keepGrp) {
4,536,054✔
1392
      goto _return;
68,653✔
1393
    }
1394

1395
    if (0 == pGrp->beginIdx && pTable->multiEqGrpRows && 0 == pTable->eqRowLimit) {
4,467,401✔
1396
      pGrp->blk = NULL;
1,455,829✔
1397
      code = createOneDataBlock(pTable->blk, true, &pGrp->blk);
1,455,829✔
1398
      if (code) {
1,455,829✔
1399
        MJ_ERR_RET(code);
×
1400
      }
1401

1402
      if (NULL == taosArrayPush(pTable->createdBlks, &pGrp->blk)) {
2,911,658✔
1403
        MJ_ERR_RET(terrno);
×
1404
      }
1405
    } else {
1406
      if (!pTable->multiEqGrpRows) {
3,011,572✔
1407
        pGrp->endIdx = pGrp->beginIdx;
76,950✔
1408
      }
1409

1410
      int64_t rowNum = 0;
3,011,572✔
1411
      if (!pTable->multiEqGrpRows) {
3,011,572✔
1412
        rowNum = 1;
76,950✔
1413
        pGrp->endIdx = pGrp->beginIdx;
76,950✔
1414
      } else if (0 == pTable->eqRowLimit) {
2,934,622✔
1415
        rowNum = pGrp->endIdx - pGrp->beginIdx + 1;
2,826,802✔
1416
      } else if (pTable->eqRowLimit == pTable->eqRowNum) {
107,820✔
1417
        keepGrp = false;
×
1418
      } else {
1419
        rowNum = TMIN(pGrp->endIdx - pGrp->beginIdx + 1, pTable->eqRowLimit - pTable->eqRowNum);
107,820✔
1420
        pGrp->endIdx = pGrp->beginIdx + rowNum - 1;
107,820✔
1421
      }
1422

1423
      if (keepGrp && rowNum > 0) {
3,011,572✔
1424
        pTable->eqRowNum += rowNum;
3,011,572✔
1425
        code = blockDataExtractBlock(pTable->blk, pGrp->beginIdx, rowNum, &pGrp->blk);
3,011,572✔
1426
        if (code) {
3,011,572✔
1427
          MJ_ERR_RET(code);
×
1428
        }
1429

1430
        pGrp->endIdx -= pGrp->beginIdx;
3,011,572✔
1431
        pGrp->beginIdx = 0;
3,011,572✔
1432
        pGrp->readIdx = 0;
3,011,572✔
1433
        if (NULL == taosArrayPush(pTable->createdBlks, &pGrp->blk)) {
6,022,763✔
1434
          MJ_ERR_RET(terrno);
×
1435
        }
1436
      }
1437
    }
1438
  }
1439

1440
_return:
11,854,180✔
1441

1442
  if (pTable->noKeepEqGrpRows || !keepGrp || (!pTable->multiEqGrpRows && !restart)) {
2,147,483,647✔
1443
    if (NULL == taosArrayPop(pTable->eqGrps)) {
477,765✔
1444
      code = terrno;
×
1445
    }
1446
  } else {
1447
    pTable->grpTotalRows += pGrp->endIdx - pGrp->beginIdx + 1;
2,147,483,647✔
1448
  }
1449

1450
  return code;
2,147,483,647✔
1451
}
1452

1453
int32_t mJoinRetrieveEqGrpRows(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable, int64_t timestamp) {
2,147,483,647✔
1454
  bool wholeBlk = false;
2,147,483,647✔
1455

1456
  MJ_ERR_RET(mJoinBuildEqGroups(pTable, timestamp, &wholeBlk, true));
2,147,483,647✔
1457

1458
  while (wholeBlk && !pTable->dsFetchDone) {
2,147,483,647✔
1459
    pTable->blk = (*pJoin->retrieveFp)(pJoin, pTable);
4,536,054✔
1460
    qDebug("%s merge join %s table got block for same ts, rows:%" PRId64, GET_TASKID(pJoin->pOperator->pTaskInfo),
4,536,054✔
1461
           MJOIN_TBTYPE(pTable->type), pTable->blk ? pTable->blk->info.rows : 0);
1462

1463
    pTable->blkRowIdx = 0;
4,536,054✔
1464

1465
    if (NULL == pTable->blk) {
4,536,054✔
1466
      break;
1,358,321✔
1467
    }
1468

1469
    wholeBlk = false;
3,177,733✔
1470
    MJ_ERR_RET(mJoinBuildEqGroups(pTable, timestamp, &wholeBlk, false));
3,177,733✔
1471
  }
1472

1473
  return TSDB_CODE_SUCCESS;
2,147,483,647✔
1474
}
1475

1476
int32_t mJoinSetKeyColsData(SSDataBlock* pBlock, SMJoinTableCtx* pTable) {
×
1477
  for (int32_t i = 0; i < pTable->keyNum; ++i) {
×
1478
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pTable->keyCols[i].srcSlot);
×
1479
    if (NULL == pCol) {
×
1480
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1481
    }
1482

1483
    if (pTable->keyCols[i].vardata != IS_VAR_DATA_TYPE(pCol->info.type)) {
×
1484
      qError("column type mismatch, idx:%d, slotId:%d, type:%d, vardata:%d", i, pTable->keyCols[i].srcSlot,
×
1485
             pCol->info.type, pTable->keyCols[i].vardata);
1486
      return TSDB_CODE_INVALID_PARA;
×
1487
    }
1488
    if (pTable->keyCols[i].bytes != pCol->info.bytes) {
×
1489
      qError("column bytes mismatch, idx:%d, slotId:%d, bytes:%d, %d", i, pTable->keyCols[i].srcSlot, pCol->info.bytes,
×
1490
             pTable->keyCols[i].bytes);
1491
      return TSDB_CODE_INVALID_PARA;
×
1492
    }
1493
    pTable->keyCols[i].data = pCol->pData;
×
1494
    if (pTable->keyCols[i].vardata) {
×
1495
      pTable->keyCols[i].offset = pCol->varmeta.offset;
×
1496
    }
1497
    pTable->keyCols[i].colData = pCol;
×
1498
  }
1499

1500
  return TSDB_CODE_SUCCESS;
×
1501
}
1502

1503
bool mJoinCopyKeyColsDataToBuf(SMJoinTableCtx* pTable, int32_t rowIdx, size_t* pBufLen) {
×
1504
  char*  pData = NULL;
×
1505
  size_t bufLen = 0;
×
1506

1507
  if (1 == pTable->keyNum) {
×
1508
    if (colDataIsNull_s(pTable->keyCols[0].colData, rowIdx)) {
×
1509
      return true;
×
1510
    }
1511
    if (pTable->keyCols[0].jsonData) {
×
1512
      pData = pTable->keyCols[0].data + pTable->keyCols[0].offset[rowIdx];
×
1513
      bufLen = getJsonValueLen(pData);
×
1514
    } else if (pTable->keyCols[0].vardata) {
×
1515
      pData = pTable->keyCols[0].data + pTable->keyCols[0].offset[rowIdx];
×
1516
      bufLen = varDataTLen(pData);
×
1517
    } else {
1518
      pData = pTable->keyCols[0].data + pTable->keyCols[0].bytes * rowIdx;
×
1519
      bufLen = pTable->keyCols[0].bytes;
×
1520
    }
1521
    pTable->keyData = pData;
×
1522
  } else {
1523
    for (int32_t i = 0; i < pTable->keyNum; ++i) {
×
1524
      if (colDataIsNull_s(pTable->keyCols[i].colData, rowIdx)) {
×
1525
        return true;
×
1526
      }
1527
      if (pTable->keyCols[0].jsonData) {
×
1528
        pData = pTable->keyCols[i].data + pTable->keyCols[i].offset[rowIdx];
×
1529
        TAOS_MEMCPY(pTable->keyBuf + bufLen, pData, getJsonValueLen(pData));
×
1530
        bufLen += getJsonValueLen(pData);
×
1531
      } else if (pTable->keyCols[i].vardata) {
×
1532
        pData = pTable->keyCols[i].data + pTable->keyCols[i].offset[rowIdx];
×
1533
        TAOS_MEMCPY(pTable->keyBuf + bufLen, pData, varDataTLen(pData));
×
1534
        bufLen += varDataTLen(pData);
×
1535
      } else {
1536
        pData = pTable->keyCols[i].data + pTable->keyCols[i].bytes * rowIdx;
×
1537
        TAOS_MEMCPY(pTable->keyBuf + bufLen, pData, pTable->keyCols[i].bytes);
×
1538
        bufLen += pTable->keyCols[i].bytes;
×
1539
      }
1540
    }
1541
    pTable->keyData = pTable->keyBuf;
×
1542
  }
1543

1544
  if (pBufLen) {
×
1545
    *pBufLen = bufLen;
×
1546
  }
1547

1548
  return false;
×
1549
}
1550

1551
static int32_t mJoinGetAvailableGrpArray(SMJoinTableCtx* pTable, SArray** ppRes) {
×
1552
  do {
×
1553
    if (pTable->grpArrayIdx < taosArrayGetSize(pTable->pGrpArrays)) {
×
1554
      *ppRes = taosArrayGetP(pTable->pGrpArrays, pTable->grpArrayIdx++);
×
1555
      if (NULL == *ppRes) {
×
1556
        return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1557
      }
1558
      taosArrayClear(*ppRes);
×
1559
      return TSDB_CODE_SUCCESS;
×
1560
    }
1561

1562
    SArray* pNew = taosArrayInit(4, sizeof(SMJoinRowPos));
×
1563
    if (NULL == pNew) {
×
1564
      return terrno;
×
1565
    }
1566
    if (NULL == taosArrayPush(pTable->pGrpArrays, &pNew)) {
×
1567
      return terrno;
×
1568
    }
1569
  } while (true);
1570

1571
  return TSDB_CODE_SUCCESS;
1572
}
1573

1574
static int32_t mJoinAddRowToHash(SMJoinOperatorInfo* pJoin, size_t keyLen, SSDataBlock* pBlock, int32_t rowIdx) {
×
1575
  SMJoinTableCtx* pBuild = pJoin->build;
×
1576
  SMJoinRowPos    pos = {pBlock, rowIdx};
×
1577
  SArray**        pGrpRows = tSimpleHashGet(pBuild->pGrpHash, pBuild->keyData, keyLen);
×
1578
  if (!pGrpRows) {
×
1579
    SArray* pNewGrp = NULL;
×
1580
    MJ_ERR_RET(mJoinGetAvailableGrpArray(pBuild, &pNewGrp));
×
1581

1582
    if (NULL == taosArrayPush(pNewGrp, &pos)) {
×
1583
      return terrno;
×
1584
    }
1585
    MJ_ERR_RET(tSimpleHashPut(pBuild->pGrpHash, pBuild->keyData, keyLen, &pNewGrp, POINTER_BYTES));
×
1586
  } else if (pBuild->multiRowsGrp) {
×
1587
    if (NULL == taosArrayPush(*pGrpRows, &pos)) {
×
1588
      return terrno;
×
1589
    }
1590
  }
1591

1592
  return TSDB_CODE_SUCCESS;
×
1593
}
1594

1595
static int32_t mJoinAddRowToFullHash(SMJoinOperatorInfo* pJoin, size_t keyLen, SSDataBlock* pBlock, int32_t rowIdx) {
×
1596
  SMJoinTableCtx*    pBuild = pJoin->build;
×
1597
  SMJoinRowPos       pos = {pBlock, rowIdx};
×
1598
  SMJoinHashGrpRows* pGrpRows = (SMJoinHashGrpRows*)tSimpleHashGet(pBuild->pGrpHash, pBuild->keyData, keyLen);
×
1599
  if (!pGrpRows) {
×
1600
    SMJoinHashGrpRows pNewGrp = {0};
×
1601
    MJ_ERR_RET(mJoinGetAvailableGrpArray(pBuild, &pNewGrp.pRows));
×
1602

1603
    if (NULL == taosArrayPush(pNewGrp.pRows, &pos)) {
×
1604
      return terrno;
×
1605
    }
1606
    MJ_ERR_RET(tSimpleHashPut(pBuild->pGrpHash, pBuild->keyData, keyLen, &pNewGrp, sizeof(pNewGrp)));
×
1607
  } else {
1608
    if (NULL == taosArrayPush(pGrpRows->pRows, &pos)) {
×
1609
      return terrno;
×
1610
    }
1611
  }
1612

1613
  return TSDB_CODE_SUCCESS;
×
1614
}
1615

1616
int32_t mJoinCreateFullBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) {
×
1617
  size_t bufLen = 0;
×
1618

1619
  tSimpleHashClear(pJoin->build->pGrpHash);
×
1620
  pJoin->build->grpArrayIdx = 0;
×
1621

1622
  pJoin->build->grpRowIdx = -1;
×
1623

1624
  int32_t grpNum = taosArrayGetSize(pTable->eqGrps);
×
1625
  for (int32_t g = 0; g < grpNum; ++g) {
×
1626
    SMJoinGrpRows* pGrp = taosArrayGet(pTable->eqGrps, g);
×
1627
    if (NULL == pGrp) {
×
1628
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1629
    }
1630
    MJ_ERR_RET(mJoinSetKeyColsData(pGrp->blk, pTable));
×
1631

1632
    int32_t grpRows = GRP_REMAIN_ROWS(pGrp);
×
1633
    for (int32_t r = 0; r < grpRows; ++r) {
×
1634
      if (mJoinCopyKeyColsDataToBuf(pTable, pGrp->beginIdx + r, &bufLen)) {
×
1635
        *(int16_t*)pTable->keyBuf = 0;
×
1636
        pTable->keyData = pTable->keyBuf;
×
1637
        bufLen = pTable->keyNullSize;
×
1638
      }
1639

1640
      MJ_ERR_RET(mJoinAddRowToFullHash(pJoin, bufLen, pGrp->blk, pGrp->beginIdx + r));
×
1641
    }
1642
  }
1643

1644
  return TSDB_CODE_SUCCESS;
×
1645
}
1646

1647
int32_t mJoinCreateBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) {
×
1648
  size_t bufLen = 0;
×
1649

1650
  tSimpleHashClear(pJoin->build->pGrpHash);
×
1651
  pJoin->build->grpArrayIdx = 0;
×
1652

1653
  pJoin->build->grpRowIdx = -1;
×
1654

1655
  int32_t grpNum = taosArrayGetSize(pTable->eqGrps);
×
1656
  for (int32_t g = 0; g < grpNum; ++g) {
×
1657
    SMJoinGrpRows* pGrp = taosArrayGet(pTable->eqGrps, g);
×
1658
    if (NULL == pGrp) {
×
1659
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1660
    }
1661

1662
    MJ_ERR_RET(mJoinSetKeyColsData(pGrp->blk, pTable));
×
1663

1664
    int32_t grpRows = GRP_REMAIN_ROWS(pGrp);
×
1665
    for (int32_t r = 0; r < grpRows; ++r) {
×
1666
      if (mJoinCopyKeyColsDataToBuf(pTable, pGrp->beginIdx + r, &bufLen)) {
×
1667
        continue;
×
1668
      }
1669

1670
      MJ_ERR_RET(mJoinAddRowToHash(pJoin, bufLen, pGrp->blk, pGrp->beginIdx + r));
×
1671
    }
1672
  }
1673

1674
  return TSDB_CODE_SUCCESS;
×
1675
}
1676

1677
void mJoinResetGroupTableCtx(SMJoinTableCtx* pCtx) {
8,219,612✔
1678
  pCtx->blk = NULL;
8,219,612✔
1679
  pCtx->blkRowIdx = 0;
8,219,612✔
1680
  pCtx->newBlk = false;
8,219,612✔
1681

1682
  mJoinDestroyCreatedBlks(pCtx->createdBlks);
8,219,612✔
1683
  tSimpleHashClear(pCtx->pGrpHash);
8,219,612✔
1684
}
8,219,612✔
1685

1686
void mJoinResetTableCtx(SMJoinTableCtx* pCtx) {
7,215,736✔
1687
  pCtx->dsInitDone = false;
7,215,736✔
1688
  pCtx->dsFetchDone = false;
7,215,736✔
1689
  pCtx->lastInGid = 0;
7,215,736✔
1690
  pCtx->remainInBlk = NULL;
7,215,736✔
1691

1692
  mJoinResetGroupTableCtx(pCtx);
7,215,736✔
1693
}
7,215,736✔
1694

1695
void mJoinResetMergeCtx(SMJoinMergeCtx* pCtx) {
3,607,868✔
1696
  pCtx->grpRemains = false;
3,607,868✔
1697
  pCtx->midRemains = false;
3,607,868✔
1698
  pCtx->lastEqGrp = false;
3,607,868✔
1699

1700
  pCtx->lastEqTs = INT64_MIN;
3,607,868✔
1701
  pCtx->hashJoin = false;
3,607,868✔
1702
}
3,607,868✔
1703

1704
void mWinJoinResetWindowCache(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache) {
954,069✔
1705
  pCache->outRowIdx = 0;
954,069✔
1706
  pCache->rowNum = 0;
954,069✔
1707
  pCache->grpIdx = 0;
954,069✔
1708

1709
  if (pCache->grpsQueue) {
954,069✔
1710
    TSWAP(pCache->grps, pCache->grpsQueue);
1,506✔
1711
  }
1712

1713
  int32_t grpNum = taosArrayGetSize(pCache->grps);
954,069✔
1714

1715
  for (int32_t i = 0; i < grpNum; ++i) {
1,488,647✔
1716
    SMJoinGrpRows* pGrp = taosArrayGet(pCache->grps, i);
534,197✔
1717
    if (NULL == pGrp) {
534,578✔
1718
      continue;
×
1719
    }
1720
    if (pGrp->blk != pCtx->cache.outBlk && pGrp->clonedBlk) {
534,578✔
1721
      (void)blockDataDestroy(pGrp->blk);
417,369✔
1722
    }
1723
  }
1724

1725
  taosArrayClear(pCache->grps);
954,450✔
1726

1727
  if (pCache->outBlk) {
954,450✔
1728
    blockDataCleanup(pCache->outBlk);
339,804✔
1729
  }
1730
}
954,450✔
1731

1732
void mJoinResetWindowCtx(SMJoinWindowCtx* pCtx) {
×
1733
  pCtx->grpRemains = false;
×
1734
  pCtx->lastEqGrp = false;
×
1735
  pCtx->lastProbeGrp = false;
×
1736
  pCtx->eqPostDone = false;
×
1737
  pCtx->lastTs = INT64_MIN;
×
1738

1739
  mWinJoinResetWindowCache(pCtx, &pCtx->cache);
×
1740
}
×
1741

1742
void mJoinResetCtx(SMJoinOperatorInfo* pJoin) {
3,607,868✔
1743
  if (pJoin->ctx.mergeCtxInUse) {
3,607,868✔
1744
    mJoinResetMergeCtx(&pJoin->ctx.mergeCtx);
3,607,868✔
1745
  } else {
1746
    mJoinResetWindowCtx(&pJoin->ctx.windowCtx);
×
1747
  }
1748
}
3,607,868✔
1749

1750
void mJoinResetOperator(struct SOperatorInfo* pOperator) {
3,607,868✔
1751
  SMJoinOperatorInfo* pJoin = pOperator->info;
3,607,868✔
1752

1753
  mJoinResetTableCtx(pJoin->build);
3,607,868✔
1754
  mJoinResetTableCtx(pJoin->probe);
3,607,868✔
1755

1756
  mJoinResetCtx(pJoin);
3,607,868✔
1757

1758
  pJoin->errCode = 0;
3,607,868✔
1759
  pJoin->execInfo = (SMJoinExecInfo){0};
3,607,868✔
1760

1761
  pOperator->status = OP_OPENED;
3,607,868✔
1762
}
3,607,868✔
1763

1764
int32_t mJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
23,093,495✔
1765
  SMJoinOperatorInfo* pJoin = pOperator->info;
23,093,495✔
1766
  int32_t             code = TSDB_CODE_SUCCESS;
23,093,868✔
1767
  if (pOperator->status == OP_EXEC_DONE) {
23,093,868✔
1768
    if (NULL == pOperator->pDownstreamGetParams || NULL == pOperator->pDownstreamGetParams[0] ||
10,930,644✔
1769
        NULL == pOperator->pDownstreamGetParams[1]) {
3,607,112✔
1770
      qDebug("%s merge join done", GET_TASKID(pOperator->pTaskInfo));
7,323,532✔
1771
      return code;
7,323,532✔
1772
    } else {
1773
      mJoinResetOperator(pOperator);
3,607,112✔
1774
      qDebug("%s start new round merge join", GET_TASKID(pOperator->pTaskInfo));
3,607,112✔
1775
    }
1776
  }
1777

1778
  int64_t st = 0;
15,770,336✔
1779
  if (pOperator->cost.openCost == 0) {
15,770,336✔
1780
    st = taosGetTimestampUs();
11,151,020✔
1781
  }
1782

1783
  SSDataBlock* pBlock = NULL;
15,770,336✔
1784
  while (true) {
1785
    pBlock = (*pJoin->joinFp)(pOperator);
15,873,230✔
1786
    if (NULL == pBlock) {
15,863,236✔
1787
      if (pJoin->errCode) {
12,624✔
1788
        T_LONG_JMP(pOperator->pTaskInfo->env, pJoin->errCode);
12,624✔
1789
      }
1790
      break;
×
1791
    }
1792

1793
    pBlock->info.id.blockId = pJoin->outBlkId;
15,850,612✔
1794
    if (pJoin->pFinFilter != NULL) {
15,850,612✔
1795
      code = doFilter(pBlock, pJoin->pFinFilter, NULL, NULL);
3,077,801✔
1796
      if (code) {
3,077,801✔
1797
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1798
        pJoin->errCode = code;
×
1799
        T_LONG_JMP(pOperator->pTaskInfo->env, pJoin->errCode);
×
1800
      }
1801
    }
1802

1803
    if (pBlock->info.rows > 0 || pOperator->status == OP_EXEC_DONE) {
15,850,612✔
1804
      pBlock->info.dataLoad = 1;
15,747,718✔
1805
      break;
15,747,718✔
1806
    }
1807
  }
1808

1809
  if (pOperator->cost.openCost == 0) {
15,747,718✔
1810
    pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
11,128,402✔
1811
  }
1812

1813
  pJoin->execInfo.resRows += pBlock ? pBlock->info.rows : 0;
15,747,718✔
1814
  if (pBlock && pBlock->info.rows > 0) {
15,747,718✔
1815
    *pResBlock = pBlock;
9,008,878✔
1816
  }
1817

1818
  return code;
15,747,718✔
1819
}
1820

1821
void destroyGrpArray(void* ppArray) {
×
1822
  SArray* pArray = *(SArray**)ppArray;
×
1823
  taosArrayDestroy(pArray);
×
1824
}
×
1825

1826
void destroyMergeJoinTableCtx(SMJoinTableCtx* pTable) {
24,478,320✔
1827
  if (NULL == pTable) {
24,478,320✔
1828
    return;
×
1829
  }
1830
  mJoinDestroyCreatedBlks(pTable->createdBlks);
24,478,320✔
1831
  taosArrayDestroy(pTable->createdBlks);
24,478,320✔
1832
  tSimpleHashCleanup(pTable->pGrpHash);
24,478,320✔
1833

1834
  taosMemoryFree(pTable->primCol);
24,478,320✔
1835
  taosMemoryFree(pTable->finCols);
24,478,320✔
1836
  taosMemoryFree(pTable->keyCols);
24,478,320✔
1837
  taosMemoryFree(pTable->keyBuf);
24,478,320✔
1838
  taosMemoryFree(pTable->pRowBitmap);
24,478,320✔
1839

1840
  taosArrayDestroy(pTable->eqGrps);
24,478,320✔
1841
  taosArrayDestroyEx(pTable->pGrpArrays, destroyGrpArray);
24,478,320✔
1842
}
1843

1844
void destroyMergeJoinOperator(void* param) {
12,239,160✔
1845
  SMJoinOperatorInfo* pJoin = (SMJoinOperatorInfo*)param;
12,239,160✔
1846

1847
  mJoinDestroyCtx(pJoin);
12,239,160✔
1848

1849
  if (pJoin->pFPreFilter != NULL) {
12,239,160✔
1850
    filterFreeInfo(pJoin->pFPreFilter);
385,488✔
1851
    pJoin->pFPreFilter = NULL;
385,488✔
1852
  }
1853
  if (pJoin->pPreFilter != NULL) {
12,239,160✔
1854
    filterFreeInfo(pJoin->pPreFilter);
350,812✔
1855
    pJoin->pPreFilter = NULL;
350,812✔
1856
  }
1857
  if (pJoin->pFinFilter != NULL) {
12,239,160✔
1858
    filterFreeInfo(pJoin->pFinFilter);
3,664,251✔
1859
    pJoin->pFinFilter = NULL;
3,664,251✔
1860
  }
1861

1862
  destroyMergeJoinTableCtx(pJoin->probe);
12,239,160✔
1863
  destroyMergeJoinTableCtx(pJoin->build);
12,239,160✔
1864

1865
  taosMemoryFreeClear(pJoin);
12,239,160✔
1866
}
12,239,160✔
1867

1868
int32_t mJoinHandleConds(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) {
12,238,787✔
1869
  switch (pJoin->joinType) {
12,238,787✔
1870
    case JOIN_TYPE_INNER: {
10,736,886✔
1871
      SNode* pCond = NULL;
10,736,886✔
1872
      if (pJoinNode->pFullOnCond != NULL) {
10,736,886✔
1873
        if (pJoinNode->node.pConditions != NULL) {
3,381,071✔
1874
          MJ_ERR_RET(mergeJoinConds(&pJoinNode->pFullOnCond, &pJoinNode->node.pConditions));
1,880✔
1875
        }
1876
        pCond = pJoinNode->pFullOnCond;
3,381,071✔
1877
      } else if (pJoinNode->node.pConditions != NULL) {
7,355,815✔
1878
        pCond = pJoinNode->node.pConditions;
6,682✔
1879
      }
1880

1881
      MJ_ERR_RET(filterInitFromNode(pCond, &pJoin->pFinFilter, 0, pTaskInfo->pStreamRuntimeInfo));
10,736,886✔
1882
      break;
10,736,886✔
1883
    }
1884
    case JOIN_TYPE_LEFT:
1,501,901✔
1885
    case JOIN_TYPE_RIGHT:
1886
    case JOIN_TYPE_FULL:
1887
      if (pJoinNode->pFullOnCond != NULL) {
1,501,901✔
1888
        MJ_ERR_RET(filterInitFromNode(pJoinNode->pFullOnCond, &pJoin->pFPreFilter, 0,
385,488✔
1889
                                      pTaskInfo->pStreamRuntimeInfo));
1890
      }
1891
      if (pJoinNode->pColOnCond != NULL) {
1,501,901✔
1892
        MJ_ERR_RET(
350,812✔
1893
            filterInitFromNode(pJoinNode->pColOnCond, &pJoin->pPreFilter, 0, pTaskInfo->pStreamRuntimeInfo));
1894
      }
1895
      if (pJoinNode->node.pConditions != NULL) {
1,502,274✔
1896
        MJ_ERR_RET(filterInitFromNode(pJoinNode->node.pConditions, &pJoin->pFinFilter, 0,
276,498✔
1897
                                      pTaskInfo->pStreamRuntimeInfo));
1898
      }
1899
      break;
1,501,901✔
1900
    default:
×
1901
      break;
×
1902
  }
1903

1904
  return TSDB_CODE_SUCCESS;
12,238,787✔
1905
}
1906

1907
int32_t mJoinSetImplFp(SMJoinOperatorInfo* pJoin) {
12,239,160✔
1908
  switch (pJoin->joinType) {
12,239,160✔
1909
    case JOIN_TYPE_INNER:
10,736,886✔
1910
      pJoin->joinFp = mInnerJoinDo;
10,736,886✔
1911
      break;
10,736,886✔
1912
    case JOIN_TYPE_LEFT:
1,416,044✔
1913
    case JOIN_TYPE_RIGHT: {
1914
      switch (pJoin->subType) {
1,416,044✔
1915
        case JOIN_STYPE_OUTER:
440,864✔
1916
          pJoin->joinFp = mLeftJoinDo;
440,864✔
1917
          pJoin->grpResetFp = mLeftJoinGroupReset;
440,864✔
1918
          break;
440,864✔
1919
        case JOIN_STYPE_SEMI:
239,816✔
1920
          pJoin->joinFp = mSemiJoinDo;
239,816✔
1921
          break;
239,816✔
1922
        case JOIN_STYPE_ANTI:
215,746✔
1923
          pJoin->joinFp = mAntiJoinDo;
215,746✔
1924
          break;
215,746✔
1925
        case JOIN_STYPE_WIN:
311,000✔
1926
          pJoin->joinFp = mWinJoinDo;
311,000✔
1927
          pJoin->grpResetFp = mWinJoinGroupReset;
311,000✔
1928
          break;
311,000✔
1929
        default:
208,618✔
1930
          break;
208,618✔
1931
      }
1932
      break;
1,416,044✔
1933
    }
1934
    case JOIN_TYPE_FULL:
86,230✔
1935
      pJoin->joinFp = mFullJoinDo;
86,230✔
1936
      break;
86,230✔
1937
    default:
×
1938
      break;
×
1939
  }
1940

1941
  return TSDB_CODE_SUCCESS;
12,239,160✔
1942
}
1943

1944
static int32_t resetMergeJoinOperState(SOperatorInfo* pOper) {
756✔
1945
  mJoinResetOperator(pOper);
756✔
1946
  return 0;
756✔
1947
}
1948

1949
int32_t createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
12,238,286✔
1950
                                    SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo,
1951
                                    SOperatorInfo** pOptrInfo) {
1952
  QRY_PARAM_CHECK(pOptrInfo);
12,238,286✔
1953

1954
  int32_t             oldNum = numOfDownstream;
12,238,286✔
1955
  bool                newDownstreams = false;
12,238,286✔
1956
  int32_t             code = TSDB_CODE_SUCCESS;
12,238,286✔
1957
  SOperatorInfo*      pOperator = NULL;
12,238,286✔
1958
  SMJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMJoinOperatorInfo));
12,238,286✔
1959
  if (pInfo == NULL) {
12,238,659✔
1960
    code = terrno;
×
1961
    goto _return;
×
1962
  }
1963

1964
  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
12,238,659✔
1965
  if (pOperator == NULL) {
12,239,160✔
1966
    code = terrno;
×
1967
    goto _return;
×
1968
  }
1969

1970
  pInfo->pOperator = pOperator;
12,239,160✔
1971
  MJ_ERR_JRET(mJoinInitDownstreamInfo(pInfo, &pDownstream, &numOfDownstream, &newDownstreams));
12,238,787✔
1972

1973
  setOperatorInfo(pOperator, "MergeJoinOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, false, OP_NOT_OPENED, pInfo,
12,238,787✔
1974
                  pTaskInfo);
1975

1976
  mJoinSetBuildAndProbeTable(pInfo, pJoinNode);
12,239,160✔
1977

1978
  MJ_ERR_JRET(mJoinHandleConds(pInfo, pJoinNode, pTaskInfo));
12,238,286✔
1979

1980
  MJ_ERR_JRET(mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0], newDownstreams));
12,238,787✔
1981
  MJ_ERR_JRET(mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1], newDownstreams));
12,237,309✔
1982

1983
  MJ_ERR_JRET(mJoinInitCtx(pInfo, pJoinNode));
12,237,835✔
1984
  MJ_ERR_JRET(mJoinSetImplFp(pInfo));
12,239,160✔
1985

1986
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mJoinMainProcess, NULL, destroyMergeJoinOperator,
12,239,160✔
1987
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1988

1989
  setOperatorResetStateFn(pOperator, resetMergeJoinOperState);
12,239,160✔
1990
  MJ_ERR_JRET(appendDownstream(pOperator, pDownstream, numOfDownstream));
12,238,787✔
1991

1992
  if (newDownstreams) {
12,238,288✔
1993
    taosMemoryFree(pDownstream);
1,159,928✔
1994
    pOperator->numOfRealDownstream = 1;
1,159,928✔
1995
  } else {
1996
    pOperator->numOfRealDownstream = 2;
11,078,360✔
1997
  }
1998

1999
  *pOptrInfo = pOperator;
12,239,160✔
2000
  return code;
12,239,160✔
2001

2002
_return:
×
2003

2004
  if (pInfo != NULL) {
×
2005
    destroyMergeJoinOperator(pInfo);
×
2006
  }
2007
  destroyOperatorAndDownstreams(pOperator, pDownstream, oldNum);
×
2008
  if (newDownstreams) {
×
2009
    taosMemoryFree(pDownstream);
×
2010
  }
2011
  pTaskInfo->code = code;
×
2012

2013
  return code;
×
2014
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc