• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5055

17 May 2026 01:15AM UTC coverage: 73.355% (-0.003%) from 73.358%
#5055

push

travis-ci

web-flow
feat (TDgpt): Dynamic Model Synchronization Enhancements (#35344)

* refactor: do some internal refactor.

* fix: fix multiprocess sync issue.

* feat: add dynamic anomaly detection and forecasting services

* fix: log error message for undeploying model in exception handling

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* fix: handle undeploy when model exists only on disk

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/286aafa0-c3ce-4c27-b803-2707571e9dc1

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: guard dynamic registry concurrent access

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: tighten service list locking scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: restore prophet support and update tests per review feedback

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: improve test name and move copy inside lock scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* Potential fix for pull request finding

Co-au... (continued)

281532 of 383795 relevant lines covered (73.35%)

135557734.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.52
/source/libs/executor/src/mergejoinoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
// clang-format off
17
#include "executorInt.h"
18
#include "filter.h"
19
#include "function.h"
20
#include "operator.h"
21
#include "os.h"
22
#include "querynodes.h"
23
#include "querytask.h"
24
#include "tcompare.h"
25
#include "tdatablock.h"
26
#include "thash.h"
27
#include "tmsg.h"
28
#include "ttypes.h"
29
#include "functionMgt.h"
30
#include "mergejoin.h"
31
// clang-format on
32

33
int32_t mJoinBuildEqGrp(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, SMJoinGrpRows* pGrp) {
3,430,006✔
34
  SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCtx.targetSlotId);
3,430,006✔
35
  if (NULL == pCol) {
3,430,006✔
36
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
37
  }
38

39
  pGrp->beginIdx = pTable->blkRowIdx;
3,430,006✔
40
  pGrp->readIdx = pTable->blkRowIdx;
3,430,006✔
41

42
  pTable->blkRowIdx++;
3,430,006✔
43
  char* pEndVal = colDataGetNumData(pCol, pTable->blk->info.rows - 1);
3,430,006✔
44
  if (timestamp != *(int64_t*)pEndVal) {
3,430,006✔
45
    for (; pTable->blkRowIdx < pTable->blk->info.rows; ++pTable->blkRowIdx) {
2,844,608✔
46
      char* pNextVal = colDataGetNumData(pCol, pTable->blkRowIdx);
2,844,608✔
47
      if (timestamp == *(int64_t*)pNextVal) {
2,844,608✔
48
        continue;
399,495✔
49
      }
50

51
      pGrp->endIdx = pTable->blkRowIdx - 1;
2,445,113✔
52
      return TSDB_CODE_SUCCESS;
2,445,113✔
53
    }
54
  }
55

56
  pGrp->endIdx = pTable->blk->info.rows - 1;
984,893✔
57
  pTable->blkRowIdx = pTable->blk->info.rows;
984,893✔
58

59
  if (wholeBlk) {
984,893✔
60
    *wholeBlk = true;
244,347✔
61
  }
62

63
  return TSDB_CODE_SUCCESS;
984,893✔
64
}
65

66
int32_t mJoinTrimKeepFirstRow(SSDataBlock* pBlock) {
219,780✔
67
  int32_t bmLen = BitmapLen(pBlock->info.rows);
219,780✔
68
  size_t  numOfCols = taosArrayGetSize(pBlock->pDataBlock);
219,780✔
69

70
  for (int32_t i = 0; i < numOfCols; ++i) {
1,136,358✔
71
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
916,578✔
72
    if (NULL == pDst) {
916,578✔
73
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
74
    }
75

76
    // it is a reserved column for scalar function, and no data in this column yet.
77
    if (pDst->pData == NULL || (IS_VAR_DATA_TYPE(pDst->info.type) && pDst->varmeta.length == 0)) {
916,578✔
78
      continue;
×
79
    }
80

81
    if (IS_VAR_DATA_TYPE(pDst->info.type)) {
916,578✔
82
      pDst->varmeta.length = 0;
84,258✔
83

84
      if (!colDataIsNull_var(pDst, 0)) {
84,258✔
85
        char* p1 = colDataGetVarData(pDst, 0);
84,258✔
86
        // int32_t len = calcStrBytesByType(pDst->info.type, p1);
87
        //  if (pDst->info.type == TSDB_DATA_TYPE_JSON) {
88
        //    len = getJsonValueLen(p1);
89
        //  } else if (IS_STR_DATA_BLOB(pDst->info.type)) {
90
        //    len = blobDataTLen(p1);
91
        //  } else {
92
        //    len = varDataTLen(p1);
93
        //  }
94
        pDst->varmeta.length = calcStrBytesByType(pDst->info.type, p1);
84,258✔
95
      }
96
    } else {
97
      bool isNull = colDataIsNull_f(pDst, 0);
832,320✔
98

99
      TAOS_MEMSET(pDst->nullbitmap, 0, bmLen);
832,320✔
100
      if (isNull) {
832,320✔
101
        colDataSetNull_f(pDst->nullbitmap, 0);
×
102
      }
103
    }
104
  }
105

106
  pBlock->info.rows = 1;
219,780✔
107

108
  return TSDB_CODE_SUCCESS;
219,780✔
109
}
110

111
int32_t mJoinTrimKeepOneRow(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList) {
102,714✔
112
  //  int32_t totalRows = pBlock->info.rows;
113
  int32_t code = TSDB_CODE_SUCCESS;
102,714✔
114
  int32_t bmLen = BitmapLen(totalRows);
102,714✔
115
  char*   pBitmap = NULL;
102,714✔
116
  int32_t maxRows = 0;
102,714✔
117
  size_t  numOfCols = taosArrayGetSize(pBlock->pDataBlock);
102,714✔
118

119
  for (int32_t i = 0; i < numOfCols; ++i) {
636,282✔
120
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
533,568✔
121
    if (NULL == pDst) {
533,568✔
122
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
123
    }
124

125
    // it is a reserved column for scalar function, and no data in this column yet.
126
    if (pDst->pData == NULL || (IS_VAR_DATA_TYPE(pDst->info.type) && pDst->varmeta.length == 0)) {
533,568✔
127
      continue;
×
128
    }
129

130
    int32_t numOfRows = 0;
533,568✔
131
    if (IS_VAR_DATA_TYPE(pDst->info.type)) {
542,658✔
132
      int32_t j = 0;
9,090✔
133
      pDst->varmeta.length = 0;
9,090✔
134

135
      while (j < totalRows) {
14,544✔
136
        if (pBoolList[j] == 0) {
14,544✔
137
          j += 1;
5,454✔
138
          continue;
5,454✔
139
        }
140

141
        if (colDataIsNull_var(pDst, j)) {
9,090✔
142
          colDataSetNull_var(pDst, numOfRows);
×
143
        } else {
144
          // fix address sanitizer error. p1 may point to memory that will change during realloc of colDataSetVal, first
145
          // copy it to p2
146
          char*   p1 = colDataGetVarData(pDst, j);
9,090✔
147
          int32_t len = calcStrBytesByType(pDst->info.type, p1);
9,090✔
148
          // if (pDst->info.type == TSDB_DATA_TYPE_JSON) {
149
          //   len = getJsonValueLen(p1);
150
          // } else if (IS_STR_DATA_BLOB(pDst->info.type)) {
151
          //   len = blobDataTLen(p1);
152
          // } else {
153
          //   len = varDataTLen(p1);
154
          // }
155
          char* p2 = taosMemoryMalloc(len);
9,090✔
156
          if (NULL == p2) {
9,090✔
157
            MJ_ERR_RET(terrno);
×
158
          }
159
          TAOS_MEMCPY(p2, p1, len);
9,090✔
160
          code = colDataSetVal(pDst, numOfRows, p2, false);
9,090✔
161
          if (code) {
9,090✔
162
            taosMemoryFreeClear(p2);
×
163
            MJ_ERR_RET(terrno);
×
164
          }
165
          taosMemoryFree(p2);
9,090✔
166
        }
167
        numOfRows += 1;
9,090✔
168
        j += 1;
9,090✔
169
        break;
9,090✔
170
      }
171

172
      if (maxRows < numOfRows) {
9,090✔
173
        maxRows = numOfRows;
1,818✔
174
      }
175
    } else {
176
      if (pBitmap == NULL) {
524,478✔
177
        pBitmap = taosMemoryCalloc(1, bmLen);
102,714✔
178
        if (NULL == pBitmap) {
102,714✔
179
          MJ_ERR_RET(terrno);
×
180
        }
181
      }
182

183
      TAOS_MEMCPY(pBitmap, pDst->nullbitmap, bmLen);
524,478✔
184
      TAOS_MEMSET(pDst->nullbitmap, 0, bmLen);
524,478✔
185

186
      int32_t j = 0;
524,478✔
187

188
      switch (pDst->info.type) {
524,478✔
189
        case TSDB_DATA_TYPE_BIGINT:
149,976✔
190
        case TSDB_DATA_TYPE_UBIGINT:
191
        case TSDB_DATA_TYPE_DOUBLE:
192
        case TSDB_DATA_TYPE_TIMESTAMP:
193
          while (j < totalRows) {
224,972✔
194
            if (pBoolList[j] == 0) {
224,972✔
195
              j += 1;
74,996✔
196
              continue;
74,996✔
197
            }
198

199
            if (BMIsNull(pBitmap, j)) {
149,976✔
200
              colDataSetNull_f(pDst->nullbitmap, numOfRows);
×
201
            } else {
202
              ((int64_t*)pDst->pData)[numOfRows] = ((int64_t*)pDst->pData)[j];
149,976✔
203
            }
204
            numOfRows += 1;
149,976✔
205
            j += 1;
149,976✔
206
            break;
149,976✔
207
          }
208
          break;
149,976✔
209
        case TSDB_DATA_TYPE_FLOAT:
374,502✔
210
        case TSDB_DATA_TYPE_INT:
211
        case TSDB_DATA_TYPE_UINT:
212
          while (j < totalRows) {
569,944✔
213
            if (pBoolList[j] == 0) {
569,944✔
214
              j += 1;
195,442✔
215
              continue;
195,442✔
216
            }
217
            if (BMIsNull(pBitmap, j)) {
374,502✔
218
              colDataSetNull_f(pDst->nullbitmap, numOfRows);
×
219
            } else {
220
              ((int32_t*)pDst->pData)[numOfRows] = ((int32_t*)pDst->pData)[j];
374,502✔
221
            }
222
            numOfRows += 1;
374,502✔
223
            j += 1;
374,502✔
224
            break;
374,502✔
225
          }
226
          break;
374,502✔
227
        case TSDB_DATA_TYPE_SMALLINT:
×
228
        case TSDB_DATA_TYPE_USMALLINT:
229
          while (j < totalRows) {
×
230
            if (pBoolList[j] == 0) {
×
231
              j += 1;
×
232
              continue;
×
233
            }
234
            if (BMIsNull(pBitmap, j)) {
×
235
              colDataSetNull_f(pDst->nullbitmap, numOfRows);
×
236
            } else {
237
              ((int16_t*)pDst->pData)[numOfRows] = ((int16_t*)pDst->pData)[j];
×
238
            }
239
            numOfRows += 1;
×
240
            j += 1;
×
241
            break;
×
242
          }
243
          break;
×
244
        case TSDB_DATA_TYPE_BOOL:
×
245
        case TSDB_DATA_TYPE_TINYINT:
246
        case TSDB_DATA_TYPE_UTINYINT:
247
          while (j < totalRows) {
×
248
            if (pBoolList[j] == 0) {
×
249
              j += 1;
×
250
              continue;
×
251
            }
252
            if (BMIsNull(pBitmap, j)) {
×
253
              colDataSetNull_f(pDst->nullbitmap, numOfRows);
×
254
            } else {
255
              ((int8_t*)pDst->pData)[numOfRows] = ((int8_t*)pDst->pData)[j];
×
256
            }
257
            numOfRows += 1;
×
258
            j += 1;
×
259
            break;
×
260
          }
261
          break;
×
262
      }
263
    }
264

265
    if (maxRows < numOfRows) {
533,568✔
266
      maxRows = numOfRows;
100,896✔
267
    }
268
  }
269

270
  pBlock->info.rows = maxRows;
102,714✔
271
  if (pBitmap != NULL) {
102,714✔
272
    taosMemoryFree(pBitmap);
102,714✔
273
  }
274

275
  return TSDB_CODE_SUCCESS;
102,714✔
276
}
277

278
int32_t mJoinFilterAndMarkHashRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SMJoinTableCtx* build,
×
279
                                   int32_t startRowIdx) {
280
  if (pFilterInfo == NULL || pBlock->info.rows == 0) {
×
281
    return TSDB_CODE_SUCCESS;
×
282
  }
283

284
  SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
×
285
  SColumnInfoData*   p = NULL;
×
286

287
  int32_t code = filterSetDataFromSlotId(pFilterInfo, &param1);
×
288
  if (code != TSDB_CODE_SUCCESS) {
×
289
    goto _err;
×
290
  }
291

292
  int32_t status = 0;
×
293
  code = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status);
×
294
  if (code != TSDB_CODE_SUCCESS) {
×
295
    goto _err;
×
296
  }
297

298
  if (!build->pHashGrpRows->allRowsMatch &&
×
299
      (status == FILTER_RESULT_ALL_QUALIFIED || status == FILTER_RESULT_PARTIAL_QUALIFIED)) {
×
300
    if (status == FILTER_RESULT_ALL_QUALIFIED && taosArrayGetSize(build->pHashCurGrp) == pBlock->info.rows) {
×
301
      build->pHashGrpRows->allRowsMatch = true;
×
302
    } else {
303
      bool* pRes = (bool*)p->pData;
×
304
      for (int32_t i = 0; i < pBlock->info.rows; ++i) {
×
305
        if ((status == FILTER_RESULT_PARTIAL_QUALIFIED && false == *(pRes + i)) ||
×
306
            MJOIN_ROW_BITMAP_SET(build->pRowBitmap, build->pHashGrpRows->rowBitmapOffset, startRowIdx + i)) {
×
307
          continue;
×
308
        }
309

310
        MJOIN_SET_ROW_BITMAP(build->pRowBitmap, build->pHashGrpRows->rowBitmapOffset, startRowIdx + i);
×
311
        build->pHashGrpRows->rowMatchNum++;
×
312
      }
313

314
      if (build->pHashGrpRows->rowMatchNum == taosArrayGetSize(build->pHashGrpRows->pRows)) {
×
315
        build->pHashGrpRows->allRowsMatch = true;
×
316
      }
317
    }
318
  }
319

320
  code = extractQualifiedTupleByFilterResult(pBlock, p, status);
×
321

322
_err:
×
323
  colDataDestroy(p);
×
324
  taosMemoryFree(p);
×
325

326
  return code;
×
327
}
328

329
int32_t mJoinFilterAndMarkRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SMJoinTableCtx* build,
214,863✔
330
                               int32_t startGrpIdx, int32_t startRowIdx) {
331
  if (pFilterInfo == NULL || pBlock->info.rows == 0) {
214,863✔
332
    return TSDB_CODE_SUCCESS;
×
333
  }
334

335
  int32_t            code = TSDB_CODE_SUCCESS;
214,863✔
336
  SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
214,863✔
337
  SColumnInfoData*   p = NULL;
214,863✔
338

339
  code = filterSetDataFromSlotId(pFilterInfo, &param1);
214,863✔
340
  if (code != TSDB_CODE_SUCCESS) {
214,863✔
341
    goto _return;
×
342
  }
343

344
  int32_t status = 0;
214,863✔
345
  code = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status);
214,863✔
346
  if (code != TSDB_CODE_SUCCESS) {
214,863✔
347
    goto _return;
×
348
  }
349

350
  int32_t rowNum = 0;
214,863✔
351
  bool*   pRes = (bool*)p->pData;
214,863✔
352
  int32_t grpNum = taosArrayGetSize(build->eqGrps);
214,863✔
353
  if (status == FILTER_RESULT_ALL_QUALIFIED || status == FILTER_RESULT_PARTIAL_QUALIFIED) {
214,863✔
354
    for (int32_t i = startGrpIdx; i < grpNum && rowNum < pBlock->info.rows; startRowIdx = 0, ++i) {
281,424✔
355
      SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, i);
140,712✔
356
      if (NULL == buildGrp) {
140,712✔
357
        MJ_ERR_JRET(terrno);
×
358
      }
359
      if (buildGrp->allRowsMatch) {
140,712✔
360
        rowNum += buildGrp->endIdx - startRowIdx + 1;
11,871✔
361
        continue;
11,871✔
362
      }
363

364
      if (status == FILTER_RESULT_ALL_QUALIFIED && startRowIdx == buildGrp->beginIdx &&
128,841✔
365
          ((pBlock->info.rows - rowNum) >= (buildGrp->endIdx - startRowIdx + 1))) {
101,133✔
366
        buildGrp->allRowsMatch = true;
101,133✔
367
        rowNum += buildGrp->endIdx - startRowIdx + 1;
101,133✔
368
        continue;
101,133✔
369
      }
370

371
      for (int32_t m = startRowIdx; m <= buildGrp->endIdx && rowNum < pBlock->info.rows; ++m, ++rowNum) {
93,090✔
372
        if ((status == FILTER_RESULT_PARTIAL_QUALIFIED && false == *(pRes + rowNum)) ||
65,382✔
373
            MJOIN_ROW_BITMAP_SET(build->pRowBitmap, buildGrp->rowBitmapOffset, m - buildGrp->beginIdx)) {
34,503✔
374
          continue;
40,401✔
375
        }
376

377
        MJOIN_SET_ROW_BITMAP(build->pRowBitmap, buildGrp->rowBitmapOffset, m - buildGrp->beginIdx);
24,981✔
378
        buildGrp->rowMatchNum++;
24,981✔
379
      }
380

381
      if (buildGrp->rowMatchNum == (buildGrp->endIdx - buildGrp->beginIdx + 1)) {
27,708✔
382
        buildGrp->allRowsMatch = true;
6,363✔
383
      }
384
    }
385
  }
386

387
  code = extractQualifiedTupleByFilterResult(pBlock, p, status);
214,863✔
388

389
_return:
214,863✔
390
  colDataDestroy(p);
214,863✔
391
  taosMemoryFree(p);
214,863✔
392

393
  return code;
214,863✔
394
}
395

396
int32_t mJoinFilterAndKeepSingleRow(SSDataBlock* pBlock, SFilterInfo* pFilterInfo) {
448,134✔
397
  if (pFilterInfo == NULL || pBlock->info.rows == 0) {
448,134✔
398
    return TSDB_CODE_SUCCESS;
×
399
  }
400

401
  SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
448,134✔
402
  SColumnInfoData*   p = NULL;
448,134✔
403

404
  int32_t code = filterSetDataFromSlotId(pFilterInfo, &param1);
448,134✔
405
  if (code != TSDB_CODE_SUCCESS) {
448,134✔
406
    goto _return;
×
407
  }
408

409
  int32_t status = 0;
448,134✔
410
  code = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status);
448,134✔
411
  if (code != TSDB_CODE_SUCCESS) {
448,134✔
412
    goto _return;
×
413
  }
414

415
  if (status == FILTER_RESULT_ALL_QUALIFIED) {
448,134✔
416
    pBlock->info.rows = 1;
219,780✔
417
    MJ_ERR_JRET(mJoinTrimKeepFirstRow(pBlock));
219,780✔
418
  } else if (status == FILTER_RESULT_NONE_QUALIFIED) {
228,354✔
419
    pBlock->info.rows = 0;
125,640✔
420
  } else if (status == FILTER_RESULT_PARTIAL_QUALIFIED) {
102,714✔
421
    MJ_ERR_JRET(mJoinTrimKeepOneRow(pBlock, pBlock->info.rows, (bool*)p->pData));
102,714✔
422
  }
423

424
  code = TSDB_CODE_SUCCESS;
448,134✔
425

426
_return:
448,134✔
427

428
  colDataDestroy(p);
448,134✔
429
  taosMemoryFree(p);
448,134✔
430

431
  return code;
448,134✔
432
}
433

434
int32_t mJoinFilterAndNoKeepRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo) {
552,021✔
435
  if (pFilterInfo == NULL || pBlock->info.rows == 0) {
552,021✔
436
    return TSDB_CODE_SUCCESS;
×
437
  }
438

439
  SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
552,021✔
440
  SColumnInfoData*   p = NULL;
552,021✔
441

442
  int32_t code = filterSetDataFromSlotId(pFilterInfo, &param1);
552,021✔
443
  if (code != TSDB_CODE_SUCCESS) {
552,021✔
444
    goto _err;
×
445
  }
446

447
  int32_t status = 0;
552,021✔
448
  code = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status);
552,021✔
449
  if (code != TSDB_CODE_SUCCESS) {
552,021✔
450
    goto _err;
×
451
  }
452

453
  if (status == FILTER_RESULT_NONE_QUALIFIED) {
552,021✔
454
    pBlock->info.rows = 0;
181,929✔
455
  }
456

457
  code = TSDB_CODE_SUCCESS;
552,021✔
458

459
_err:
552,021✔
460

461
  colDataDestroy(p);
552,021✔
462
  taosMemoryFree(p);
552,021✔
463

464
  return code;
552,021✔
465
}
466

467
int32_t mJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBlock** ppFin) {
1,123,800✔
468
  SSDataBlock* pLess = *ppMid;
1,123,800✔
469
  SSDataBlock* pMore = *ppFin;
1,123,800✔
470

471
  /*
472
    if ((*ppMid)->info.rows < (*ppFin)->info.rows) {
473
      pLess = (*ppMid);
474
      pMore = (*ppFin);
475
    } else {
476
      pLess = (*ppFin);
477
      pMore = (*ppMid);
478
    }
479
  */
480

481
  int32_t totalRows = pMore->info.rows + pLess->info.rows;
1,123,800✔
482
  if (totalRows <= pMore->info.capacity) {
1,123,800✔
483
    MJ_ERR_RET(blockDataMerge(pMore, pLess));
1,123,800✔
484
    blockDataCleanup(pLess);
1,123,800✔
485
    pCtx->midRemains = false;
1,123,800✔
486
  } else {
487
    int32_t copyRows = pMore->info.capacity - pMore->info.rows;
×
488
    if (copyRows > 0) {
×
489
      MJ_ERR_RET(blockDataMergeNRows(pMore, pLess, pLess->info.rows - copyRows, copyRows));
×
490
      blockDataShrinkNRows(pLess, copyRows);
×
491
    }
492

493
    pCtx->midRemains = true;
×
494
  }
495

496
  /*
497
    if (pMore != (*ppFin)) {
498
      TSWAP(*ppMid, *ppFin);
499
    }
500
  */
501

502
  return TSDB_CODE_SUCCESS;
1,123,800✔
503
}
504

505
int32_t mJoinHandleMidRemains(SMJoinMergeCtx* pCtx) {
×
506
  TSWAP(pCtx->midBlk, pCtx->finBlk);
×
507

508
  pCtx->midRemains = false;
×
509

510
  return TSDB_CODE_SUCCESS;
×
511
}
512

513
int32_t mJoinNonEqGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pGrp,
1,921,114✔
514
                          bool probeGrp) {
515
  SMJoinTableCtx* probe = probeGrp ? pJoin->probe : pJoin->build;
1,921,114✔
516
  SMJoinTableCtx* build = probeGrp ? pJoin->build : pJoin->probe;
1,921,114✔
517
  int32_t         currRows = append ? pRes->info.rows : 0;
1,921,114✔
518
  int32_t         firstRows = GRP_REMAIN_ROWS(pGrp);
1,921,114✔
519

520
  for (int32_t c = 0; c < probe->finNum; ++c) {
5,699,814✔
521
    SMJoinColMap*    pFirstCol = probe->finCols + c;
3,778,700✔
522
    SColumnInfoData* pInCol = taosArrayGet(pGrp->blk->pDataBlock, pFirstCol->srcSlot);
3,778,700✔
523
    SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pFirstCol->dstSlot);
3,778,700✔
524
    if (NULL == pInCol || NULL == pOutCol) {
3,778,700✔
525
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
526
    }
527

528
    MJ_ERR_RET(colDataAssignNRows(pOutCol, currRows, pInCol, pGrp->readIdx, firstRows));
3,778,700✔
529
  }
530

531
  for (int32_t c = 0; c < build->finNum; ++c) {
5,132,731✔
532
    SMJoinColMap*    pSecondCol = build->finCols + c;
3,211,617✔
533
    SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pSecondCol->dstSlot);
3,211,617✔
534
    if (NULL == pOutCol) {
3,211,617✔
535
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
536
    }
537

538
    colDataSetNItemsNull(pOutCol, currRows, firstRows);
3,211,617✔
539
  }
540

541
  pRes->info.rows = append ? (pRes->info.rows + firstRows) : firstRows;
1,921,114✔
542
  return TSDB_CODE_SUCCESS;
1,921,114✔
543
}
544

545
int32_t mJoinNonEqCart(SMJoinCommonCtx* pCtx, SMJoinGrpRows* pGrp, bool probeGrp, bool singleProbeRow) {
1,444,117✔
546
  pCtx->lastEqGrp = false;
1,444,117✔
547
  pCtx->lastProbeGrp = probeGrp;
1,444,117✔
548

549
  int32_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows;
1,444,117✔
550
  if (rowsLeft <= 0) {
1,444,117✔
551
    pCtx->grpRemains = pGrp->readIdx <= pGrp->endIdx;
×
552
    return TSDB_CODE_SUCCESS;
×
553
  }
554

555
  if (probeGrp && singleProbeRow) {
1,444,117✔
556
    rowsLeft = 1;
63,126✔
557
  }
558

559
  if (GRP_REMAIN_ROWS(pGrp) <= rowsLeft) {
1,444,117✔
560
    MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, pGrp, probeGrp));
1,444,117✔
561
    pGrp->readIdx = pGrp->endIdx + 1;
1,444,117✔
562
    pCtx->grpRemains = false;
1,444,117✔
563
  } else {
564
    int32_t endIdx = pGrp->endIdx;
×
565
    pGrp->endIdx = pGrp->readIdx + rowsLeft - 1;
×
566
    MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, pGrp, probeGrp));
×
567
    pGrp->readIdx = pGrp->endIdx + 1;
×
568
    pGrp->endIdx = endIdx;
×
569
    pCtx->grpRemains = true;
×
570
  }
571

572
  return TSDB_CODE_SUCCESS;
1,444,117✔
573
}
574

575
int32_t mJoinMergeGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pFirst,
2,147,483,647✔
576
                          SMJoinGrpRows* pSecond) {
577
  SMJoinTableCtx* probe = pJoin->probe;
2,147,483,647✔
578
  SMJoinTableCtx* build = pJoin->build;
2,147,483,647✔
579
  int32_t         currRows = append ? pRes->info.rows : 0;
2,147,483,647✔
580
  int32_t         firstRows = GRP_REMAIN_ROWS(pFirst);
2,147,483,647✔
581
  int32_t         secondRows = GRP_REMAIN_ROWS(pSecond);
2,147,483,647✔
582

583
  for (int32_t c = 0; c < probe->finNum; ++c) {
2,147,483,647✔
584
    SMJoinColMap*    pFirstCol = probe->finCols + c;
2,147,483,647✔
585
    SColumnInfoData* pInCol = taosArrayGet(pFirst->blk->pDataBlock, pFirstCol->srcSlot);
2,147,483,647✔
586
    SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pFirstCol->dstSlot);
2,147,483,647✔
587
    if (NULL == pInCol || NULL == pOutCol) {
2,147,483,647✔
588
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
589
    }
590

591
    for (int32_t r = 0; r < firstRows; ++r) {
2,147,483,647✔
592
      if (colDataIsNull_s(pInCol, pFirst->readIdx + r)) {
2,147,483,647✔
593
        colDataSetNItemsNull(pOutCol, currRows + r * secondRows, secondRows);
84,510,732✔
594
      } else {
595
        if (pRes->info.capacity < (pRes->info.rows + firstRows * secondRows)) {
2,147,483,647✔
596
          qError("capacity:%d not enough, rows:%" PRId64 ", firstRows:%d, secondRows:%d", pRes->info.capacity,
×
597
                 pRes->info.rows, firstRows, secondRows);
598
          MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
599
        }
600
        uint32_t startOffset = (IS_VAR_DATA_TYPE(pOutCol->info.type))
2,147,483,647✔
601
                                   ? pOutCol->varmeta.length
602
                                   : ((currRows + r * secondRows) * pOutCol->info.bytes);
2,147,483,647✔
603
        if ((startOffset + 1 * pOutCol->info.bytes) > pRes->info.capacity * pOutCol->info.bytes) {
2,147,483,647✔
604
          qError("col buff not enough, startOffset:%d, bytes:%d, capacity:%d", startOffset, pOutCol->info.bytes,
×
605
                 pRes->info.capacity);
606
          MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
607
        }
608
        MJ_ERR_RET(colDataSetNItems(pOutCol, currRows + r * secondRows, colDataGetData(pInCol, pFirst->readIdx + r),
2,147,483,647✔
609
                                    secondRows, 1, true));
610
      }
611
    }
612
  }
613

614
  for (int32_t c = 0; c < build->finNum; ++c) {
2,147,483,647✔
615
    SMJoinColMap*    pSecondCol = build->finCols + c;
1,191,145,800✔
616
    SColumnInfoData* pInCol = taosArrayGet(pSecond->blk->pDataBlock, pSecondCol->srcSlot);
1,191,145,800✔
617
    SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pSecondCol->dstSlot);
1,191,145,800✔
618
    if (NULL == pInCol || NULL == pOutCol) {
1,191,145,800✔
619
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
620
    }
621

622
    for (int32_t r = 0; r < firstRows; ++r) {
2,147,483,647✔
623
      MJ_ERR_RET(colDataAssignNRows(pOutCol, currRows + r * secondRows, pInCol, pSecond->readIdx, secondRows));
1,321,258,505✔
624
    }
625
  }
626

627
  pRes->info.rows = append ? (pRes->info.rows + firstRows * secondRows) : firstRows * secondRows;
2,147,483,647✔
628

629
  return TSDB_CODE_SUCCESS;
2,147,483,647✔
630
}
631

632
int32_t mJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append, SMJoinTableCtx* probe,
×
633
                         SMJoinTableCtx* build, bool* cont) {
634
  if (NULL != cont) {
×
635
    *cont = false;
×
636
  }
637

638
  int32_t rowsLeft = append ? (pBlk->info.capacity - pBlk->info.rows) : pBlk->info.capacity;
×
639
  if (rowsLeft <= 0) {
×
640
    return TSDB_CODE_SUCCESS;
×
641
  }
642

643
  int32_t buildGrpRows = taosArrayGetSize(build->pHashCurGrp);
×
644
  int32_t grpRows = buildGrpRows - build->grpRowIdx;
×
645
  if (grpRows <= 0 || build->grpRowIdx < 0) {
×
646
    build->grpRowIdx = -1;
×
647
    if (NULL != cont) {
×
648
      *cont = true;
×
649
    }
650
    return TSDB_CODE_SUCCESS;
×
651
  }
652

653
  int32_t actRows = TMIN(grpRows, rowsLeft);
×
654
  int32_t currRows = append ? pBlk->info.rows : 0;
×
655

656
  for (int32_t c = 0; c < probe->finNum; ++c) {
×
657
    SMJoinColMap*    pFirstCol = probe->finCols + c;
×
658
    SColumnInfoData* pInCol = taosArrayGet(probeGrp->blk->pDataBlock, pFirstCol->srcSlot);
×
659
    SColumnInfoData* pOutCol = taosArrayGet(pBlk->pDataBlock, pFirstCol->dstSlot);
×
660
    if (NULL == pInCol || NULL == pOutCol) {
×
661
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
662
    }
663

664
    if (colDataIsNull_s(pInCol, probeGrp->readIdx)) {
×
665
      colDataSetNItemsNull(pOutCol, currRows, actRows);
×
666
    } else {
667
      MJ_ERR_RET(colDataSetNItems(pOutCol, currRows, colDataGetData(pInCol, probeGrp->readIdx), actRows, 1, true));
×
668
    }
669
  }
670

671
  for (int32_t c = 0; c < build->finNum; ++c) {
×
672
    SMJoinColMap*    pSecondCol = build->finCols + c;
×
673
    SColumnInfoData* pOutCol = taosArrayGet(pBlk->pDataBlock, pSecondCol->dstSlot);
×
674
    for (int32_t r = 0; r < actRows; ++r) {
×
675
      SMJoinRowPos* pRow = taosArrayGet(build->pHashCurGrp, build->grpRowIdx + r);
×
676
      if (NULL == pRow) {
×
677
        MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
678
      }
679

680
      SColumnInfoData* pInCol = taosArrayGet(pRow->pBlk->pDataBlock, pSecondCol->srcSlot);
×
681
      if (NULL == pInCol) {
×
682
        MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
683
      }
684

685
      MJ_ERR_RET(colDataAssignNRows(pOutCol, currRows + r, pInCol, pRow->pos, 1));
×
686
    }
687
  }
688

689
  pBlk->info.rows += actRows;
×
690

691
  if (actRows == grpRows) {
×
692
    build->grpRowIdx = -1;
×
693
  } else {
694
    build->grpRowIdx += actRows;
×
695
  }
696

697
  if (actRows == rowsLeft) {
×
698
    return TSDB_CODE_SUCCESS;
×
699
  }
700

701
  if (NULL != cont) {
×
702
    *cont = true;
×
703
  }
704

705
  return TSDB_CODE_SUCCESS;
×
706
}
707

708
int32_t mJoinAllocGrpRowBitmap(SMJoinTableCtx* pTb) {
159,774✔
709
  int32_t grpNum = taosArrayGetSize(pTb->eqGrps);
159,774✔
710
  for (int32_t i = 0; i < grpNum; ++i) {
319,548✔
711
    SMJoinGrpRows* pGrp = (SMJoinGrpRows*)taosArrayGet(pTb->eqGrps, i);
159,774✔
712
    if (NULL == pGrp) {
159,774✔
713
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
714
    }
715

716
    MJ_ERR_RET(mJoinGetRowBitmapOffset(pTb, pGrp->endIdx - pGrp->beginIdx + 1, &pGrp->rowBitmapOffset));
159,774✔
717
    pGrp->rowMatchNum = 0;
159,774✔
718
  }
719

720
  return TSDB_CODE_SUCCESS;
159,774✔
721
}
722

723
int32_t mJoinProcessEqualGrp(SMJoinMergeCtx* pCtx, int64_t timestamp, bool lastBuildGrp) {
2,147,483,647✔
724
  SMJoinOperatorInfo* pJoin = pCtx->pJoin;
2,147,483,647✔
725

726
  pCtx->lastEqGrp = true;
2,147,483,647✔
727

728
  MJ_ERR_RET(mJoinBuildEqGroups(pJoin->probe, timestamp, NULL, true));
2,147,483,647✔
729
  if (!lastBuildGrp) {
2,147,483,647✔
730
    MJ_ERR_RET(mJoinRetrieveEqGrpRows(pJoin, pJoin->build, timestamp));
2,147,483,647✔
731
  } else {
732
    pJoin->build->grpIdx = 0;
437,386✔
733
  }
734

735
  if (pCtx->hashCan && REACH_HJOIN_THRESHOLD(pJoin->probe, pJoin->build)) {
2,147,483,647✔
736
    if (!lastBuildGrp || !pCtx->hashJoin) {
×
737
      if (pJoin->build->rowBitmapSize > 0) {
×
738
        MJ_ERR_RET(mJoinCreateFullBuildTbHash(pJoin, pJoin->build));
×
739
      } else {
740
        MJ_ERR_RET(mJoinCreateBuildTbHash(pJoin, pJoin->build));
×
741
      }
742
    }
743

744
    if (pJoin->probe->newBlk) {
×
745
      MJ_ERR_RET(mJoinSetKeyColsData(pJoin->probe->blk, pJoin->probe));
×
746
      pJoin->probe->newBlk = false;
×
747
    }
748

749
    pCtx->hashJoin = true;
×
750

751
    return (*pCtx->hashCartFp)(pCtx);
×
752
  }
753

754
  pCtx->hashJoin = false;
2,147,483,647✔
755

756
  if (!lastBuildGrp && pJoin->build->rowBitmapSize > 0) {
2,147,483,647✔
757
    MJ_ERR_RET(mJoinAllocGrpRowBitmap(pJoin->build));
159,774✔
758
  }
759

760
  return (*pCtx->mergeCartFp)(pCtx);
2,147,483,647✔
761
}
762

763
int32_t mJoinProcessLowerGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs,
733,639✔
764
                             int64_t* buildTs) {
765
  pCtx->probeNEqGrp.blk = pTb->blk;
733,639✔
766
  pCtx->probeNEqGrp.beginIdx = pTb->blkRowIdx;
733,639✔
767
  pCtx->probeNEqGrp.readIdx = pCtx->probeNEqGrp.beginIdx;
733,639✔
768
  pCtx->probeNEqGrp.endIdx = pCtx->probeNEqGrp.beginIdx;
733,639✔
769

770
  while (++pTb->blkRowIdx < pTb->blk->info.rows) {
1,161,634✔
771
    MJOIN_GET_TB_CUR_TS(pCol, *probeTs, pTb);
970,534✔
772
    if (PROBE_TS_NMATCH(pCtx->ascTs, *probeTs, *buildTs)) {
970,534✔
773
      pCtx->probeNEqGrp.endIdx = pTb->blkRowIdx;
427,995✔
774
      continue;
427,995✔
775
    }
776

777
    break;
542,539✔
778
  }
779

780
  return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false);
733,639✔
781
}
782

783
int32_t mJoinProcessGreaterGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs,
113,289✔
784
                               int64_t* buildTs) {
785
  pCtx->buildNEqGrp.blk = pTb->blk;
113,289✔
786
  pCtx->buildNEqGrp.beginIdx = pTb->blkRowIdx;
113,289✔
787
  pCtx->buildNEqGrp.readIdx = pCtx->buildNEqGrp.beginIdx;
113,289✔
788
  pCtx->buildNEqGrp.endIdx = pCtx->buildNEqGrp.beginIdx;
113,289✔
789

790
  while (++pTb->blkRowIdx < pTb->blk->info.rows) {
120,537✔
791
    MJOIN_GET_TB_CUR_TS(pCol, *buildTs, pTb);
117,816✔
792
    if (PROBE_TS_NREACH(pCtx->ascTs, *probeTs, *buildTs)) {
117,816✔
793
      pCtx->buildNEqGrp.endIdx = pTb->blkRowIdx;
7,248✔
794
      continue;
7,248✔
795
    }
796

797
    break;
110,568✔
798
  }
799

800
  return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->buildNEqGrp, false, false);
113,289✔
801
}
802

803
SOperatorInfo** mJoinBuildDownstreams(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream) {
1,367,923✔
804
  SOperatorInfo** p = taosMemoryMalloc(2 * POINTER_BYTES);
1,367,923✔
805
  if (p) {
1,367,923✔
806
    p[0] = pDownstream[0];
1,367,923✔
807
    p[1] = pDownstream[0];
1,367,923✔
808
  }
809

810
  return p;
1,367,923✔
811
}
812

813
int32_t mJoinInitDownstreamInfo(SMJoinOperatorInfo* pInfo, SOperatorInfo*** pDownstream, int32_t* numOfDownstream,
14,521,202✔
814
                                bool* newDownstreams) {
815
  if (1 == *numOfDownstream) {
14,521,202✔
816
    *newDownstreams = true;
1,367,923✔
817
    *pDownstream = mJoinBuildDownstreams(pInfo, *pDownstream);
1,367,923✔
818
    if (NULL == *pDownstream) {
1,367,923✔
819
      return terrno;
×
820
    }
821
    *numOfDownstream = 2;
1,367,923✔
822
  }
823

824
  return TSDB_CODE_SUCCESS;
14,521,202✔
825
}
826

827
static int32_t mJoinInitPrimKeyInfo(SMJoinTableCtx* pTable, int32_t slotId) {
29,039,678✔
828
  pTable->primCol = taosMemoryMalloc(sizeof(SMJoinColInfo));
29,039,678✔
829
  if (NULL == pTable->primCol) {
29,038,453✔
830
    return terrno;
×
831
  }
832

833
  pTable->primCol->srcSlot = slotId;
29,040,721✔
834

835
  return TSDB_CODE_SUCCESS;
29,039,575✔
836
}
837

838
static int32_t mJoinInitColsInfo(int32_t* colNum, int64_t* rowSize, SMJoinColInfo** pCols, SNodeList* pList) {
29,040,697✔
839
  *colNum = LIST_LENGTH(pList);
29,040,697✔
840

841
  *pCols = taosMemoryMalloc((*colNum) * sizeof(SMJoinColInfo));
29,041,258✔
842
  if (NULL == *pCols) {
29,041,282✔
843
    return terrno;
×
844
  }
845

846
  *rowSize = 0;
29,040,239✔
847

848
  int32_t i = 0;
29,040,239✔
849
  SNode*  pNode = NULL;
29,040,239✔
850
  FOREACH(pNode, pList) {
29,293,237✔
851
    SColumnNode* pColNode = (SColumnNode*)pNode;
252,358✔
852
    (*pCols)[i].srcSlot = pColNode->slotId;
252,358✔
853
    (*pCols)[i].jsonData = TSDB_DATA_TYPE_JSON == pColNode->node.resType.type;
252,358✔
854
    (*pCols)[i].vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type);
252,358✔
855
    (*pCols)[i].bytes = pColNode->node.resType.bytes;
252,358✔
856
    *rowSize += pColNode->node.resType.bytes;
252,358✔
857
    ++i;
252,998✔
858
  }
859

860
  return TSDB_CODE_SUCCESS;
29,040,239✔
861
}
862

863
static int32_t mJoinInitKeyColsInfo(SMJoinTableCtx* pTable, SNodeList* pList, bool allocKeyBuf) {
29,040,800✔
864
  int64_t rowSize = 0;
29,040,800✔
865
  MJ_ERR_RET(mJoinInitColsInfo(&pTable->keyNum, &rowSize, &pTable->keyCols, pList));
29,041,385✔
866

867
  if (pTable->keyNum > 1 || allocKeyBuf) {
29,040,697✔
868
    if (rowSize > 1) {
230,580✔
869
      pTable->keyNullSize = 1;
33,630✔
870
    } else {
871
      pTable->keyNullSize = 2;
196,950✔
872
    }
873

874
    pTable->keyBuf = taosMemoryMalloc(TMAX(rowSize, pTable->keyNullSize));
230,580✔
875
    if (NULL == pTable->keyBuf) {
230,580✔
876
      return terrno;
×
877
    }
878
  }
879

880
  return TSDB_CODE_SUCCESS;
29,040,239✔
881
}
882

883
static int32_t mJoinInitFinColsInfo(SMJoinTableCtx* pTable, SNodeList* pList) {
29,040,239✔
884
  pTable->finCols = taosMemoryMalloc(LIST_LENGTH(pList) * sizeof(SMJoinColMap));
29,040,239✔
885
  if (NULL == pTable->finCols) {
29,039,225✔
886
    return terrno;
×
887
  }
888

889
  int32_t i = 0;
29,039,225✔
890
  SNode*  pNode = NULL;
29,039,225✔
891
  FOREACH(pNode, pList) {
109,077,611✔
892
    STargetNode* pTarget = (STargetNode*)pNode;
80,039,405✔
893
    SColumnNode* pColumn = (SColumnNode*)pTarget->pExpr;
80,039,405✔
894
    if (pColumn->dataBlockId == pTable->blkId) {
80,037,801✔
895
      pTable->finCols[i].srcSlot = pColumn->slotId;
40,019,422✔
896
      pTable->finCols[i].dstSlot = pTarget->slotId;
40,018,861✔
897
      pTable->finCols[i].bytes = pColumn->node.resType.bytes;
40,019,422✔
898
      pTable->finCols[i].vardata = IS_VAR_DATA_TYPE(pColumn->node.resType.type);
40,018,861✔
899
      ++i;
40,018,861✔
900
    }
901
  }
902

903
  pTable->finNum = i;
29,040,721✔
904

905
  return TSDB_CODE_SUCCESS;
29,040,721✔
906
}
907

908
static int32_t mJoinInitFuncPrimExprCtx(SMJoinPrimExprCtx* pCtx, STargetNode* pTarget) {
507,895✔
909
  SFunctionNode* pFunc = (SFunctionNode*)pTarget->pExpr;
507,895✔
910
  if (FUNCTION_TYPE_TIMETRUNCATE != pFunc->funcType) {
507,895✔
911
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
912
  }
913

914
  if (4 != pFunc->pParameterList->length && 5 != pFunc->pParameterList->length) {
507,895✔
915
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
916
  }
917

918
  SValueNode* pUnit = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
507,895✔
919
  if (NULL == pUnit) {
507,895✔
920
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
921
  }
922
  SValueNode* pCurrTz = NULL;
507,895✔
923
  if (5 == pFunc->pParameterList->length) {
507,895✔
924
    pCurrTz = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 2);
163,931✔
925
    if (NULL == pCurrTz) {
163,931✔
926
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
927
    }
928
  }
929
  SValueNode* pTimeZone = (5 == pFunc->pParameterList->length)
507,895✔
930
                              ? (SValueNode*)nodesListGetNode(pFunc->pParameterList, 4)
163,931✔
931
                              : (SValueNode*)nodesListGetNode(pFunc->pParameterList, 3);
507,895✔
932
  if (NULL == pTimeZone) {
507,895✔
933
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
934
  }
935

936
  pCtx->truncateUnit = pUnit->typeData;
507,895✔
937
  if ((NULL == pCurrTz || 1 == pCurrTz->typeData) &&
1,010,336✔
938
      pCtx->truncateUnit >= (86400 * TSDB_TICK_PER_SECOND(pFunc->node.resType.precision))) {
502,441✔
939
    pCtx->timezoneUnit =
202,109✔
940
        offsetFromTz(varDataVal(pTimeZone->datum.p), TSDB_TICK_PER_SECOND(pFunc->node.resType.precision));
202,109✔
941
  }
942

943
  qDebug("%s literal:%s, pCurrTz:%p", __func__, varDataVal(pTimeZone->datum.p), pCurrTz);
507,895✔
944

945
  pCtx->type = E_PRIM_TIMETRUNCATE;
507,895✔
946

947
  return TSDB_CODE_SUCCESS;
507,895✔
948
}
949

950
static int32_t mJoinInitValPrimExprCtx(SMJoinPrimExprCtx* pCtx, STargetNode* pTarget) {
160,392✔
951
  SValueNode* pVal = (SValueNode*)pTarget->pExpr;
160,392✔
952
  if (TSDB_DATA_TYPE_TIMESTAMP != pVal->node.resType.type) {
160,392✔
953
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
954
  }
955

956
  pCtx->constTs = pVal->datum.i;
160,392✔
957
  pCtx->type = E_PRIM_VALUE;
160,392✔
958

959
  return TSDB_CODE_SUCCESS;
160,392✔
960
}
961

962
static int32_t mJoinInitPrimExprCtx(SNode* pNode, SMJoinPrimExprCtx* pCtx, SMJoinTableCtx* pTable) {
29,040,160✔
963
  if (NULL == pNode) {
29,040,160✔
964
    pCtx->targetSlotId = pTable->primCol->srcSlot;
28,371,873✔
965
    return TSDB_CODE_SUCCESS;
28,372,995✔
966
  }
967

968
  if (QUERY_NODE_TARGET != nodeType(pNode)) {
668,287✔
969
    qError("primary expr node is not target, type:%d", nodeType(pNode));
×
970
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
971
  }
972

973
  STargetNode* pTarget = (STargetNode*)pNode;
668,287✔
974
  if (QUERY_NODE_FUNCTION != nodeType(pTarget->pExpr) && QUERY_NODE_VALUE != nodeType(pTarget->pExpr)) {
668,287✔
975
    qError("Invalid primary expr node type:%d", nodeType(pTarget->pExpr));
×
976
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
977
  }
978

979
  if (QUERY_NODE_FUNCTION == nodeType(pTarget->pExpr)) {
668,287✔
980
    MJ_ERR_RET(mJoinInitFuncPrimExprCtx(pCtx, pTarget));
507,895✔
981
  } else if (QUERY_NODE_VALUE == nodeType(pTarget->pExpr)) {
160,392✔
982
    MJ_ERR_RET(mJoinInitValPrimExprCtx(pCtx, pTarget));
160,392✔
983
  }
984

985
  pCtx->targetSlotId = pTarget->slotId;
668,287✔
986

987
  return TSDB_CODE_SUCCESS;
668,287✔
988
}
989

990
static int32_t mJoinInitTableInfo(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode,
29,037,434✔
991
                                  SOperatorInfo** pDownstream, int32_t idx, SQueryStat* pStat, bool sameDs) {
992
  SMJoinTableCtx* pTable = &pJoin->tbs[idx];
29,037,434✔
993
  pTable->downStream = pDownstream[idx];
29,038,556✔
994
  pTable->blkId = getOperatorResultBlockId(pDownstream[idx], sameDs ? idx : 0);
29,039,038✔
995
  MJ_ERR_RET(mJoinInitPrimKeyInfo(pTable, (0 == idx) ? pJoinNode->leftPrimSlotId : pJoinNode->rightPrimSlotId));
29,040,721✔
996

997
  MJ_ERR_RET(mJoinInitKeyColsInfo(pTable, (0 == idx) ? pJoinNode->pEqLeft : pJoinNode->pEqRight,
29,039,014✔
998
                                  JOIN_TYPE_FULL == pJoin->joinType));
999
  MJ_ERR_RET(mJoinInitFinColsInfo(pTable, pJoinNode->pTargets));
29,041,385✔
1000

1001
  TAOS_MEMCPY(&pTable->inputStat, pStat, sizeof(*pStat));
29,041,258✔
1002

1003
  pTable->eqGrps = taosArrayInit(8, sizeof(SMJoinGrpRows));
29,039,575✔
1004
  if (NULL == pTable->eqGrps) {
29,040,160✔
1005
    return terrno;
×
1006
  }
1007

1008
  if (E_JOIN_TB_BUILD == pTable->type) {
29,040,160✔
1009
    pTable->createdBlks = taosArrayInit(8, POINTER_BYTES);
14,520,080✔
1010
    if (NULL == pTable->createdBlks) {
14,520,641✔
1011
      return terrno;
×
1012
    }
1013
    pTable->pGrpArrays = taosArrayInit(32, POINTER_BYTES);
14,520,080✔
1014
    if (NULL == pTable->pGrpArrays) {
14,520,080✔
1015
      return terrno;
×
1016
    }
1017
    pTable->pGrpHash = tSimpleHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
14,519,519✔
1018
    if (NULL == pTable->pGrpHash) {
14,521,202✔
1019
      return terrno;
×
1020
    }
1021

1022
    if (pJoin->pFPreFilter && IS_FULL_OUTER_JOIN(pJoin->joinType, pJoin->subType)) {
14,520,641✔
1023
      pTable->rowBitmapSize = MJOIN_ROW_BITMAP_SIZE;
48,114✔
1024
      pTable->pRowBitmap = taosMemoryMalloc(pTable->rowBitmapSize);
48,114✔
1025
      if (NULL == pTable->pRowBitmap) {
48,114✔
1026
        return terrno;
×
1027
      }
1028
    }
1029

1030
    pTable->noKeepEqGrpRows = (JOIN_STYPE_ANTI == pJoin->subType && NULL == pJoin->pFPreFilter);
14,521,202✔
1031
    pTable->multiEqGrpRows =
14,519,495✔
1032
        !((JOIN_STYPE_SEMI == pJoin->subType || JOIN_STYPE_ANTI == pJoin->subType) && NULL == pJoin->pFPreFilter);
14,520,080✔
1033
    pTable->multiRowsGrp =
14,520,056✔
1034
        !((JOIN_STYPE_SEMI == pJoin->subType || JOIN_STYPE_ANTI == pJoin->subType) && NULL == pJoin->pPreFilter);
14,520,056✔
1035
    if (JOIN_STYPE_ASOF == pJoinNode->subType) {
14,518,934✔
1036
      pTable->eqRowLimit = (pJoinNode->pJLimit && ((SLimitNode*)pJoinNode->pJLimit)->limit)
798,549✔
1037
                               ? ((SLimitNode*)pJoinNode->pJLimit)->limit->datum.i
73,965✔
1038
                               : 1;
436,257✔
1039
    }
1040
  } else {
1041
    pTable->multiEqGrpRows = true;
14,519,622✔
1042
  }
1043

1044
  MJ_ERR_RET(mJoinInitPrimExprCtx(pTable->primExpr, &pTable->primCtx, pTable));
29,039,014✔
1045

1046
  return TSDB_CODE_SUCCESS;
29,041,282✔
1047
}
1048

1049
static void mJoinSetBuildAndProbeTable(SMJoinOperatorInfo* pInfo, SSortMergeJoinPhysiNode* pJoinNode) {
14,520,641✔
1050
  int32_t buildIdx = 0;
14,520,641✔
1051
  int32_t probeIdx = 1;
14,520,641✔
1052

1053
  pInfo->joinType = pJoinNode->joinType;
14,520,641✔
1054
  pInfo->subType = pJoinNode->subType;
14,520,641✔
1055

1056
  switch (pInfo->joinType) {
14,520,056✔
1057
    case JOIN_TYPE_INNER:
12,782,529✔
1058
    case JOIN_TYPE_FULL:
1059
      buildIdx = 1;
12,782,529✔
1060
      probeIdx = 0;
12,782,529✔
1061
      break;
12,782,529✔
1062
    case JOIN_TYPE_LEFT:
1,067,699✔
1063
      buildIdx = 1;
1,067,699✔
1064
      probeIdx = 0;
1,067,699✔
1065
      break;
1,067,699✔
1066
    case JOIN_TYPE_RIGHT:
670,413✔
1067
      buildIdx = 0;
670,413✔
1068
      probeIdx = 1;
670,413✔
1069
      break;
670,413✔
1070
    default:
×
1071
      break;
×
1072
  }
1073

1074
  pInfo->build = &pInfo->tbs[buildIdx];
14,520,641✔
1075
  pInfo->probe = &pInfo->tbs[probeIdx];
14,521,202✔
1076

1077
  pInfo->build->downStreamIdx = buildIdx;
14,520,056✔
1078
  pInfo->probe->downStreamIdx = probeIdx;
14,520,641✔
1079

1080
  if (0 == buildIdx) {
14,519,598✔
1081
    pInfo->build->primExpr = pJoinNode->leftPrimExpr;
669,955✔
1082
    pInfo->probe->primExpr = pJoinNode->rightPrimExpr;
670,413✔
1083
  } else {
1084
    pInfo->build->primExpr = pJoinNode->rightPrimExpr;
13,849,643✔
1085
    pInfo->probe->primExpr = pJoinNode->leftPrimExpr;
13,850,789✔
1086
  }
1087

1088
  pInfo->build->type = E_JOIN_TB_BUILD;
14,520,056✔
1089
  pInfo->probe->type = E_JOIN_TB_PROBE;
14,519,598✔
1090
}
14,520,056✔
1091

1092
int32_t mJoinLaunchPrimExpr(SSDataBlock* pBlock, SMJoinTableCtx* pTable) {
62,114,899✔
1093
  if (NULL == pTable->primExpr) {
62,114,899✔
1094
    return TSDB_CODE_SUCCESS;
61,339,013✔
1095
  }
1096

1097
  SColumnInfoData* pPrimOut = taosArrayGet(pBlock->pDataBlock, pTable->primCtx.targetSlotId);
779,813✔
1098
  if (NULL == pPrimOut) {
779,813✔
1099
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1100
  }
1101

1102
  SMJoinPrimExprCtx* pCtx = &pTable->primCtx;
779,813✔
1103
  switch (pCtx->type) {
779,813✔
1104
    case E_PRIM_TIMETRUNCATE: {
632,591✔
1105
      SColumnInfoData* pPrimIn = taosArrayGet(pBlock->pDataBlock, pTable->primCol->srcSlot);
632,591✔
1106
      if (NULL == pPrimIn) {
632,591✔
1107
        return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1108
      }
1109

1110
      if (0 != pCtx->timezoneUnit) {
632,591✔
1111
        for (int32_t i = 0; i < pBlock->info.rows; ++i) {
160,357,937✔
1112
          ((int64_t*)pPrimOut->pData)[i] =
320,155,906✔
1113
              ((int64_t*)pPrimIn->pData)[i] - (((int64_t*)pPrimIn->pData)[i] + pCtx->timezoneUnit) % pCtx->truncateUnit;
320,155,906✔
1114
        }
1115
      } else {
1116
        for (int32_t i = 0; i < pBlock->info.rows; ++i) {
2,341,099✔
1117
          ((int64_t*)pPrimOut->pData)[i] = ((int64_t*)pPrimIn->pData)[i] / pCtx->truncateUnit * pCtx->truncateUnit;
1,988,492✔
1118
        }
1119
      }
1120
      break;
632,591✔
1121
    }
1122
    case E_PRIM_VALUE: {
147,222✔
1123
      MJ_ERR_RET(colDataSetNItems(pPrimOut, 0, (char*)&pCtx->constTs, pBlock->info.rows, 1, false));
147,222✔
1124
      break;
147,222✔
1125
    }
1126
    default:
×
1127
      break;
×
1128
  }
1129

1130
  return TSDB_CODE_SUCCESS;
779,813✔
1131
}
1132

1133
static int32_t mJoinGetExternalWinIdx(SMJoinTableCtx* pTable, int32_t rowIdx) {
2,200✔
1134
  if (NULL == pTable->pBlkWinIdx || taosArrayGetSize(pTable->pBlkWinIdx) <= 0) {
2,200✔
1135
    return -1;
550✔
1136
  }
1137

1138
  int32_t size = taosArrayGetSize(pTable->pBlkWinIdx);
1,650✔
1139
  for (int32_t i = size - 1; i >= 0; --i) {
2,475✔
1140
    int64_t* pIdx = taosArrayGet(pTable->pBlkWinIdx, i);
2,475✔
1141
    if (NULL == pIdx) {
2,475✔
1142
      continue;
×
1143
    }
1144

1145
    int32_t* pPair = (int32_t*)pIdx;
2,475✔
1146
    if (rowIdx >= pPair[1]) {
2,475✔
1147
      return pPair[0];
1,650✔
1148
    }
1149
  }
1150

1151
  return -1;
×
1152
}
1153

1154
static void mJoinSetExternalWinIdxFromPeer(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) {
81,606,586✔
1155
  SExecTaskInfo* pTaskInfo = pJoin->pOperator->pTaskInfo;
81,606,586✔
1156
  if (NULL == pTaskInfo->pStreamRuntimeInfo || !pTaskInfo->pStreamRuntimeInfo->funcInfo.withExternalWindow) {
81,605,547✔
1157
    return;
81,597,786✔
1158
  }
1159

1160
  SMJoinTableCtx* pPeer = (pTable == pJoin->build) ? pJoin->probe : pJoin->build;
8,800✔
1161
  if (NULL == pPeer || NULL == pPeer->blk || pPeer->blkRowIdx >= pPeer->blk->info.rows) {
8,800✔
1162
    return;
6,600✔
1163
  }
1164

1165
  int32_t winIdx = mJoinGetExternalWinIdx(pPeer, pPeer->blkRowIdx);
2,200✔
1166
  if (winIdx >= 0) {
2,200✔
1167
    pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx = winIdx;
1,650✔
1168
  }
1169
}
1170

1171
static int32_t mJoinSaveExternalWinIdx(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) {
62,158,702✔
1172
  SExecTaskInfo* pTaskInfo = pJoin->pOperator->pTaskInfo;
62,158,702✔
1173
  if (NULL == pTaskInfo->pStreamRuntimeInfo || !pTaskInfo->pStreamRuntimeInfo->funcInfo.withExternalWindow) {
62,163,751✔
1174
    if (pTable->pBlkWinIdx) {
62,157,668✔
1175
      taosArrayClear(pTable->pBlkWinIdx);
×
1176
    }
1177
    return TSDB_CODE_SUCCESS;
62,157,668✔
1178
  }
1179

1180
  SArray* pInputWinIdx = pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamBlkWinIdx;
4,400✔
1181
  int32_t size = (NULL == pInputWinIdx) ? 0 : taosArrayGetSize(pInputWinIdx);
4,400✔
1182
  if (NULL == pTable->pBlkWinIdx) {
4,400✔
1183
    pTable->pBlkWinIdx = taosArrayInit(TMAX(size, 1), sizeof(int64_t));
4,400✔
1184
    if (NULL == pTable->pBlkWinIdx) {
4,400✔
1185
      return terrno;
×
1186
    }
1187
  } else {
1188
    taosArrayClear(pTable->pBlkWinIdx);
×
1189
  }
1190

1191
  if (size > 0 && NULL == taosArrayAddBatch(pTable->pBlkWinIdx, TARRAY_DATA(pInputWinIdx), size)) {
4,400✔
1192
    return terrno;
×
1193
  }
1194

1195
  return TSDB_CODE_SUCCESS;
4,400✔
1196
}
1197

1198
SSDataBlock* mJoinGrpRetrieveImpl(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) {
3,515,301✔
1199
  SSDataBlock* pTmp = NULL;
3,515,301✔
1200
  int32_t      code = TSDB_CODE_SUCCESS;
3,515,301✔
1201
  int32_t      dsIdx = pTable->downStreamIdx;
3,515,301✔
1202
  if (E_JOIN_TB_PROBE == pTable->type) {
3,515,301✔
1203
    if (pTable->remainInBlk) {
1,666,908✔
1204
      pTmp = pTable->remainInBlk;
605,055✔
1205
      pTable->remainInBlk = NULL;
605,055✔
1206
      (*pJoin->grpResetFp)(pJoin);
605,055✔
1207
      pTable->lastInGid = pTmp->info.id.groupId;
605,055✔
1208
      goto _return;
605,055✔
1209
    }
1210

1211
    if (pTable->dsFetchDone) {
1,061,853✔
1212
      return NULL;
×
1213
    }
1214

1215
    mJoinSetExternalWinIdxFromPeer(pJoin, pTable);
1,061,853✔
1216
    pTmp = getNextBlockFromDownstreamRemain(pJoin->pOperator, dsIdx);
1,061,853✔
1217
    if (NULL == pTmp) {
1,061,853✔
1218
      pTable->dsFetchDone = true;
238,365✔
1219
      return NULL;
238,365✔
1220
    }
1221
    code = mJoinSaveExternalWinIdx(pJoin, pTable);
823,488✔
1222
    if (code) {
823,488✔
1223
      pJoin->errCode = code;
×
1224
      T_LONG_JMP(pJoin->pOperator->pTaskInfo->env, pJoin->errCode);
×
1225
    }
1226

1227
    if (0 == pTable->lastInGid) {
823,488✔
1228
      pTable->lastInGid = pTmp->info.id.groupId;
218,433✔
1229
      goto _return;
218,433✔
1230
    }
1231

1232
    if (pTable->lastInGid == pTmp->info.id.groupId) {
605,055✔
1233
      goto _return;
×
1234
    }
1235

1236
    pTable->remainInBlk = pTmp;
605,055✔
1237
    return NULL;
605,055✔
1238
  }
1239

1240
  SMJoinTableCtx* pProbe = pJoin->probe;
1,848,393✔
1241

1242
  while (true) {
771,831✔
1243
    if (pTable->remainInBlk) {
2,620,224✔
1244
      if (pTable->remainInBlk->info.id.groupId == pProbe->lastInGid) {
1,516,461✔
1245
        pTmp = pTable->remainInBlk;
726,906✔
1246
        pTable->remainInBlk = NULL;
726,906✔
1247
        pTable->lastInGid = pTmp->info.id.groupId;
726,906✔
1248
        goto _return;
726,906✔
1249
      }
1250

1251
      if (pTable->remainInBlk->info.id.groupId > pProbe->lastInGid) {
789,555✔
1252
        return NULL;
757,350✔
1253
      }
1254

1255
      pTable->remainInBlk = NULL;
32,205✔
1256
    }
1257

1258
    if (pTable->dsFetchDone) {
1,135,968✔
1259
      return NULL;
180,675✔
1260
    }
1261

1262
    mJoinSetExternalWinIdxFromPeer(pJoin, pTable);
955,293✔
1263
    SSDataBlock* pTmp = getNextBlockFromDownstreamRemain(pJoin->pOperator, dsIdx);
955,293✔
1264
    if (NULL == pTmp) {
955,293✔
1265
      pTable->dsFetchDone = true;
183,462✔
1266
      return NULL;
183,462✔
1267
    }
1268
    code = mJoinSaveExternalWinIdx(pJoin, pTable);
771,831✔
1269
    if (code) {
771,831✔
1270
      pJoin->errCode = code;
×
1271
      T_LONG_JMP(pJoin->pOperator->pTaskInfo->env, pJoin->errCode);
×
1272
    }
1273

1274
    pTable->remainInBlk = pTmp;
771,831✔
1275
  }
1276

1277
_return:
1,550,394✔
1278

1279
  code = mJoinLaunchPrimExpr(pTmp, pTable);
1,550,394✔
1280
  if (code) {
1,550,394✔
1281
    pJoin->errCode = code;
×
1282
    T_LONG_JMP(pJoin->pOperator->pTaskInfo->env, pJoin->errCode);
×
1283
  }
1284

1285
  return pTmp;
1,550,394✔
1286
}
1287

1288
static FORCE_INLINE SSDataBlock* mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) {
79,945,345✔
1289
  if (pTable->dsFetchDone) {
79,945,345✔
1290
    return NULL;
356,356✔
1291
  }
1292

1293
  mJoinSetExternalWinIdxFromPeer(pJoin, pTable);
79,588,989✔
1294
  SSDataBlock* pTmp = getNextBlockFromDownstreamRemain(pJoin->pOperator, pTable->downStreamIdx);
79,589,440✔
1295
  if (NULL == pTmp) {
79,575,511✔
1296
    pTable->dsFetchDone = true;
19,008,201✔
1297
  } else {
1298
    int32_t code = mJoinLaunchPrimExpr(pTmp, pTable);
60,567,310✔
1299
    if (code) {
60,564,505✔
1300
      pJoin->errCode = code;
×
1301
      T_LONG_JMP(pJoin->pOperator->pTaskInfo->env, pJoin->errCode);
×
1302
    }
1303
    code = mJoinSaveExternalWinIdx(pJoin, pTable);
60,564,505✔
1304
    if (code) {
60,563,383✔
1305
      pJoin->errCode = code;
×
1306
      T_LONG_JMP(pJoin->pOperator->pTaskInfo->env, pJoin->errCode);
×
1307
    }
1308
  }
1309

1310
  return pTmp;
79,572,145✔
1311
}
1312

1313
static int32_t mJoinInitCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) {
14,519,598✔
1314
  pJoin->ctx.mergeCtx.groupJoin = pJoinNode->grpJoin;
14,519,598✔
1315
  pJoin->ctx.mergeCtx.limit = (pJoinNode->node.pLimit && ((SLimitNode*)pJoinNode->node.pLimit)->limit)
30,000,034✔
1316
                                  ? ((SLimitNode*)pJoinNode->node.pLimit)->limit->datum.i
961,375✔
1317
                                  : INT64_MAX;
15,480,973✔
1318
  pJoin->retrieveFp = pJoinNode->grpJoin ? mJoinGrpRetrieveImpl : mJoinRetrieveImpl;
14,520,056✔
1319
  pJoin->outBlkId = pJoinNode->node.pOutputDataBlockDesc->dataBlockId;
14,518,476✔
1320

1321
  if ((JOIN_STYPE_ASOF == pJoin->subType &&
14,520,080✔
1322
       (ASOF_LOWER_ROW_INCLUDED(pJoinNode->asofOpType) || ASOF_GREATER_ROW_INCLUDED(pJoinNode->asofOpType))) ||
362,292✔
1323
      (JOIN_STYPE_WIN == pJoin->subType)) {
14,268,647✔
1324
    pJoin->ctx.mergeCtxInUse = false;
626,961✔
1325
    return mJoinInitWindowCtx(pJoin, pJoinNode);
626,297✔
1326
  }
1327

1328
  pJoin->ctx.mergeCtxInUse = true;
13,892,076✔
1329
  return mJoinInitMergeCtx(pJoin, pJoinNode);
13,893,759✔
1330
}
1331

1332
static void mJoinDestroyCtx(SMJoinOperatorInfo* pJoin) {
14,520,641✔
1333
  if (JOIN_STYPE_ASOF == pJoin->subType || JOIN_STYPE_WIN == pJoin->subType) {
14,520,641✔
1334
    return mJoinDestroyWindowCtx(pJoin);
625,278✔
1335
  }
1336

1337
  return mJoinDestroyMergeCtx(pJoin);
13,894,905✔
1338
}
1339

1340
bool mJoinIsDone(SOperatorInfo* pOperator) { return (OP_EXEC_DONE == pOperator->status); }
219,897✔
1341

1342
void mJoinSetDone(SOperatorInfo* pOperator) {
17,266,807✔
1343
  setOperatorCompleted(pOperator);
17,266,807✔
1344
  if (pOperator->pDownstreamGetParams) {
17,266,807✔
1345
    freeOperatorParam(pOperator->pDownstreamGetParams[0], OP_GET_PARAM);
4,297,317✔
1346
    freeOperatorParam(pOperator->pDownstreamGetParams[1], OP_GET_PARAM);
4,297,317✔
1347
    pOperator->pDownstreamGetParams[0] = NULL;
4,297,317✔
1348
    pOperator->pDownstreamGetParams[1] = NULL;
4,297,317✔
1349
  }
1350
}
17,266,807✔
1351

1352
bool mJoinRetrieveBlk(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** ppBlk, SMJoinTableCtx* pTb) {
100,250,736✔
1353
  if (NULL == (*ppBlk) || *pIdx >= (*ppBlk)->info.rows) {
100,250,736✔
1354
    (*ppBlk) = (*pJoin->retrieveFp)(pJoin, pTb);
77,133,562✔
1355
    pTb->dsInitDone = true;
77,117,977✔
1356

1357
    qDebug("%s merge join %s table got %" PRId64 " rows block", GET_TASKID(pJoin->pOperator->pTaskInfo),
77,117,977✔
1358
           MJOIN_TBTYPE(pTb->type), (*ppBlk) ? (*ppBlk)->info.rows : 0);
1359

1360
    *pIdx = 0;
77,123,026✔
1361
    if (NULL != (*ppBlk)) {
77,122,465✔
1362
      pTb->newBlk = true;
58,270,423✔
1363
    }
1364

1365
    return ((*ppBlk) == NULL) ? false : true;
77,122,465✔
1366
  }
1367

1368
  return true;
23,117,147✔
1369
}
1370

1371
static void mJoinDestroyCreatedBlks(SArray* pCreatedBlks) {
2,147,483,647✔
1372
  int32_t blkNum = taosArrayGetSize(pCreatedBlks);
2,147,483,647✔
1373
  for (int32_t i = 0; i < blkNum; ++i) {
2,147,483,647✔
1374
    (void)blockDataDestroy(*(SSDataBlock**)TARRAY_GET_ELEM(pCreatedBlks, i));
5,441,125✔
1375
  }
1376
  taosArrayClear(pCreatedBlks);
2,147,483,647✔
1377
}
2,147,483,647✔
1378

1379
int32_t mJoinGetRowBitmapOffset(SMJoinTableCtx* pTable, int32_t rowNum, int32_t* rowBitmapOffset) {
159,774✔
1380
  int32_t bitmapLen = BitmapLen(rowNum);
159,774✔
1381
  int64_t reqSize = pTable->rowBitmapOffset + bitmapLen;
159,774✔
1382
  if (reqSize > pTable->rowBitmapSize) {
159,774✔
1383
    int64_t newSize = reqSize * 1.1;
×
1384
    pTable->pRowBitmap = taosMemoryRealloc(pTable->pRowBitmap, newSize);
×
1385
    if (NULL == pTable->pRowBitmap) {
×
1386
      return terrno;
×
1387
    }
1388
    pTable->rowBitmapSize = newSize;
×
1389
  }
1390

1391
  TAOS_MEMSET(pTable->pRowBitmap + pTable->rowBitmapOffset, 0xFFFFFFFF, bitmapLen);
159,774✔
1392

1393
  *rowBitmapOffset = pTable->rowBitmapOffset;
159,774✔
1394
  pTable->rowBitmapOffset += bitmapLen;
159,774✔
1395

1396
  return TSDB_CODE_SUCCESS;
159,774✔
1397
}
1398

1399
void mJoinResetForBuildTable(SMJoinTableCtx* pTable) {
2,147,483,647✔
1400
  pTable->grpTotalRows = 0;
2,147,483,647✔
1401
  pTable->grpIdx = 0;
2,147,483,647✔
1402
  pTable->eqRowNum = 0;
2,147,483,647✔
1403
  mJoinDestroyCreatedBlks(pTable->createdBlks);
2,147,483,647✔
1404
  taosArrayClear(pTable->eqGrps);
2,147,483,647✔
1405
  if (pTable->rowBitmapSize > 0) {
2,147,483,647✔
1406
    pTable->rowBitmapOffset = 1;
159,774✔
1407
    TAOS_MEMSET(&pTable->nMatchCtx, 0, sizeof(pTable->nMatchCtx));
159,774✔
1408
  }
1409
}
2,147,483,647✔
1410

1411
int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, bool restart) {
2,147,483,647✔
1412
  SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCtx.targetSlotId);
2,147,483,647✔
1413
  if (NULL == pCol) {
2,147,483,647✔
1414
    MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1415
  }
1416

1417
  SMJoinGrpRows* pGrp = NULL;
2,147,483,647✔
1418
  int32_t        code = TSDB_CODE_SUCCESS;
2,147,483,647✔
1419

1420
  if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) {
2,147,483,647✔
1421
    return TSDB_CODE_SUCCESS;
3,333,703✔
1422
  }
1423

1424
  if (restart) {
2,147,483,647✔
1425
    mJoinResetForBuildTable(pTable);
2,147,483,647✔
1426
  }
1427

1428
  bool keepGrp = true;
2,147,483,647✔
1429
  pGrp = taosArrayReserve(pTable->eqGrps, 1);
2,147,483,647✔
1430
  if (NULL == pGrp) {
2,147,483,647✔
1431
    MJ_ERR_RET(terrno);
×
1432
  }
1433

1434
  pGrp->beginIdx = pTable->blkRowIdx++;
2,147,483,647✔
1435
  pGrp->readIdx = pGrp->beginIdx;
2,147,483,647✔
1436
  pGrp->endIdx = pGrp->beginIdx;
2,147,483,647✔
1437
  pGrp->readMatch = false;
2,147,483,647✔
1438
  pGrp->blk = pTable->blk;
2,147,483,647✔
1439

1440
  char* pEndVal = colDataGetNumData(pCol, pTable->blk->info.rows - 1);
2,147,483,647✔
1441
  if (timestamp == *(int64_t*)pEndVal) {
2,147,483,647✔
1442
    if (pTable->multiEqGrpRows) {
16,064,596✔
1443
      pGrp->endIdx = pTable->blk->info.rows - 1;
15,889,294✔
1444
    } else {
1445
      pGrp->endIdx = pGrp->beginIdx;
175,302✔
1446
    }
1447

1448
    pTable->blkRowIdx = pTable->blk->info.rows;
16,064,596✔
1449
  } else {
1450
    for (; pTable->blkRowIdx < pTable->blk->info.rows; ++pTable->blkRowIdx) {
2,147,483,647✔
1451
      char* pNextVal = colDataGetNumData(pCol, pTable->blkRowIdx);
2,147,483,647✔
1452
      if (timestamp == *(int64_t*)pNextVal) {
2,147,483,647✔
1453
        pGrp->endIdx++;
984,282,657✔
1454
        continue;
984,303,737✔
1455
      }
1456

1457
      if (!pTable->multiEqGrpRows) {
2,147,483,647✔
1458
        pGrp->endIdx = pGrp->beginIdx;
1,164,921✔
1459
      } else if (0 == pTable->eqRowLimit) {
2,147,483,647✔
1460
        // DO NOTHING
1461
      } else if (pTable->eqRowLimit == pTable->eqRowNum) {
357,927✔
1462
        keepGrp = false;
×
1463
      } else {
1464
        int64_t rowNum = TMIN(pGrp->endIdx - pGrp->beginIdx + 1, pTable->eqRowLimit - pTable->eqRowNum);
357,927✔
1465
        pGrp->endIdx = pGrp->beginIdx + rowNum - 1;
357,927✔
1466
        pTable->eqRowNum += rowNum;
357,927✔
1467
      }
1468

1469
      goto _return;
2,147,483,647✔
1470
    }
1471
  }
1472

1473
  if (wholeBlk && (pTable->multiEqGrpRows || restart)) {
16,064,596✔
1474
    *wholeBlk = true;
5,523,781✔
1475

1476
    if (pTable->noKeepEqGrpRows || !keepGrp) {
5,523,781✔
1477
      goto _return;
82,656✔
1478
    }
1479

1480
    if (0 == pGrp->beginIdx && pTable->multiEqGrpRows && 0 == pTable->eqRowLimit) {
5,441,125✔
1481
      pGrp->blk = NULL;
1,778,302✔
1482
      code = createOneDataBlock(pTable->blk, true, &pGrp->blk);
1,778,302✔
1483
      if (code) {
1,778,302✔
1484
        MJ_ERR_RET(code);
×
1485
      }
1486

1487
      if (NULL == taosArrayPush(pTable->createdBlks, &pGrp->blk)) {
3,556,604✔
1488
        MJ_ERR_RET(terrno);
×
1489
      }
1490
    } else {
1491
      if (!pTable->multiEqGrpRows) {
3,662,823✔
1492
        pGrp->endIdx = pGrp->beginIdx;
92,646✔
1493
      }
1494

1495
      int64_t rowNum = 0;
3,662,823✔
1496
      if (!pTable->multiEqGrpRows) {
3,662,823✔
1497
        rowNum = 1;
92,646✔
1498
        pGrp->endIdx = pGrp->beginIdx;
92,646✔
1499
      } else if (0 == pTable->eqRowLimit) {
3,570,177✔
1500
        rowNum = pGrp->endIdx - pGrp->beginIdx + 1;
3,440,211✔
1501
      } else if (pTable->eqRowLimit == pTable->eqRowNum) {
129,966✔
1502
        keepGrp = false;
×
1503
      } else {
1504
        rowNum = TMIN(pGrp->endIdx - pGrp->beginIdx + 1, pTable->eqRowLimit - pTable->eqRowNum);
129,966✔
1505
        pGrp->endIdx = pGrp->beginIdx + rowNum - 1;
129,966✔
1506
      }
1507

1508
      if (keepGrp && rowNum > 0) {
3,662,823✔
1509
        pTable->eqRowNum += rowNum;
3,662,823✔
1510
        code = blockDataExtractBlock(pTable->blk, pGrp->beginIdx, rowNum, &pGrp->blk);
3,662,823✔
1511
        if (code) {
3,662,823✔
1512
          MJ_ERR_RET(code);
×
1513
        }
1514

1515
        pGrp->endIdx -= pGrp->beginIdx;
3,662,823✔
1516
        pGrp->beginIdx = 0;
3,662,823✔
1517
        pGrp->readIdx = 0;
3,662,823✔
1518
        if (NULL == taosArrayPush(pTable->createdBlks, &pGrp->blk)) {
7,325,646✔
1519
          MJ_ERR_RET(terrno);
×
1520
        }
1521
      }
1522
    }
1523
  }
1524

1525
_return:
14,203,638✔
1526

1527
  if (pTable->noKeepEqGrpRows || !keepGrp || (!pTable->multiEqGrpRows && !restart)) {
2,147,483,647✔
1528
    if (NULL == taosArrayPop(pTable->eqGrps)) {
575,598✔
1529
      code = terrno;
×
1530
    }
1531
  } else {
1532
    pTable->grpTotalRows += pGrp->endIdx - pGrp->beginIdx + 1;
2,147,483,647✔
1533
  }
1534

1535
  return code;
2,147,483,647✔
1536
}
1537

1538
int32_t mJoinRetrieveEqGrpRows(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable, int64_t timestamp) {
2,147,483,647✔
1539
  bool wholeBlk = false;
2,147,483,647✔
1540

1541
  MJ_ERR_RET(mJoinBuildEqGroups(pTable, timestamp, &wholeBlk, true));
2,147,483,647✔
1542

1543
  while (wholeBlk && !pTable->dsFetchDone) {
2,147,483,647✔
1544
    pTable->blk = (*pJoin->retrieveFp)(pJoin, pTable);
5,523,781✔
1545
    qDebug("%s merge join %s table got block for same ts, rows:%" PRId64, GET_TASKID(pJoin->pOperator->pTaskInfo),
5,523,781✔
1546
           MJOIN_TBTYPE(pTable->type), pTable->blk ? pTable->blk->info.rows : 0);
1547

1548
    pTable->blkRowIdx = 0;
5,523,781✔
1549

1550
    if (NULL == pTable->blk) {
5,523,781✔
1551
      break;
1,674,817✔
1552
    }
1553

1554
    wholeBlk = false;
3,848,964✔
1555
    MJ_ERR_RET(mJoinBuildEqGroups(pTable, timestamp, &wholeBlk, false));
3,848,964✔
1556
  }
1557

1558
  return TSDB_CODE_SUCCESS;
2,147,483,647✔
1559
}
1560

1561
int32_t mJoinSetKeyColsData(SSDataBlock* pBlock, SMJoinTableCtx* pTable) {
×
1562
  for (int32_t i = 0; i < pTable->keyNum; ++i) {
×
1563
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pTable->keyCols[i].srcSlot);
×
1564
    if (NULL == pCol) {
×
1565
      MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1566
    }
1567

1568
    if (pTable->keyCols[i].vardata != IS_VAR_DATA_TYPE(pCol->info.type)) {
×
1569
      qError("column type mismatch, idx:%d, slotId:%d, type:%d, vardata:%d", i, pTable->keyCols[i].srcSlot,
×
1570
             pCol->info.type, pTable->keyCols[i].vardata);
1571
      return TSDB_CODE_INVALID_PARA;
×
1572
    }
1573
    if (pTable->keyCols[i].bytes != pCol->info.bytes) {
×
1574
      qError("column bytes mismatch, idx:%d, slotId:%d, bytes:%d, %d", i, pTable->keyCols[i].srcSlot, pCol->info.bytes,
×
1575
             pTable->keyCols[i].bytes);
1576
      return TSDB_CODE_INVALID_PARA;
×
1577
    }
1578
    pTable->keyCols[i].data = pCol->pData;
×
1579
    if (pTable->keyCols[i].vardata) {
×
1580
      pTable->keyCols[i].offset = pCol->varmeta.offset;
×
1581
    }
1582
    pTable->keyCols[i].colData = pCol;
×
1583
  }
1584

1585
  return TSDB_CODE_SUCCESS;
×
1586
}
1587

1588
bool mJoinCopyKeyColsDataToBuf(SMJoinTableCtx* pTable, int32_t rowIdx, size_t* pBufLen) {
×
1589
  char*  pData = NULL;
×
1590
  size_t bufLen = 0;
×
1591

1592
  if (1 == pTable->keyNum) {
×
1593
    if (colDataIsNull_s(pTable->keyCols[0].colData, rowIdx)) {
×
1594
      return true;
×
1595
    }
1596
    if (pTable->keyCols[0].jsonData) {
×
1597
      pData = pTable->keyCols[0].data + pTable->keyCols[0].offset[rowIdx];
×
1598
      bufLen = getJsonValueLen(pData);
×
1599
    } else if (pTable->keyCols[0].vardata) {
×
1600
      pData = pTable->keyCols[0].data + pTable->keyCols[0].offset[rowIdx];
×
1601
      bufLen = varDataTLen(pData);
×
1602
    } else {
1603
      pData = pTable->keyCols[0].data + pTable->keyCols[0].bytes * rowIdx;
×
1604
      bufLen = pTable->keyCols[0].bytes;
×
1605
    }
1606
    pTable->keyData = pData;
×
1607
  } else {
1608
    for (int32_t i = 0; i < pTable->keyNum; ++i) {
×
1609
      if (colDataIsNull_s(pTable->keyCols[i].colData, rowIdx)) {
×
1610
        return true;
×
1611
      }
1612
      if (pTable->keyCols[0].jsonData) {
×
1613
        pData = pTable->keyCols[i].data + pTable->keyCols[i].offset[rowIdx];
×
1614
        TAOS_MEMCPY(pTable->keyBuf + bufLen, pData, getJsonValueLen(pData));
×
1615
        bufLen += getJsonValueLen(pData);
×
1616
      } else if (pTable->keyCols[i].vardata) {
×
1617
        pData = pTable->keyCols[i].data + pTable->keyCols[i].offset[rowIdx];
×
1618
        TAOS_MEMCPY(pTable->keyBuf + bufLen, pData, varDataTLen(pData));
×
1619
        bufLen += varDataTLen(pData);
×
1620
      } else {
1621
        pData = pTable->keyCols[i].data + pTable->keyCols[i].bytes * rowIdx;
×
1622
        TAOS_MEMCPY(pTable->keyBuf + bufLen, pData, pTable->keyCols[i].bytes);
×
1623
        bufLen += pTable->keyCols[i].bytes;
×
1624
      }
1625
    }
1626
    pTable->keyData = pTable->keyBuf;
×
1627
  }
1628

1629
  if (pBufLen) {
×
1630
    *pBufLen = bufLen;
×
1631
  }
1632

1633
  return false;
×
1634
}
1635

1636
static int32_t mJoinGetAvailableGrpArray(SMJoinTableCtx* pTable, SArray** ppRes) {
×
1637
  do {
×
1638
    if (pTable->grpArrayIdx < taosArrayGetSize(pTable->pGrpArrays)) {
×
1639
      *ppRes = taosArrayGetP(pTable->pGrpArrays, pTable->grpArrayIdx++);
×
1640
      if (NULL == *ppRes) {
×
1641
        return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1642
      }
1643
      taosArrayClear(*ppRes);
×
1644
      return TSDB_CODE_SUCCESS;
×
1645
    }
1646

1647
    SArray* pNew = taosArrayInit(4, sizeof(SMJoinRowPos));
×
1648
    if (NULL == pNew) {
×
1649
      return terrno;
×
1650
    }
1651
    if (NULL == taosArrayPush(pTable->pGrpArrays, &pNew)) {
×
1652
      return terrno;
×
1653
    }
1654
  } while (true);
1655

1656
  return TSDB_CODE_SUCCESS;
1657
}
1658

1659
static int32_t mJoinAddRowToHash(SMJoinOperatorInfo* pJoin, size_t keyLen, SSDataBlock* pBlock, int32_t rowIdx) {
×
1660
  SMJoinTableCtx* pBuild = pJoin->build;
×
1661
  SMJoinRowPos    pos = {pBlock, rowIdx};
×
1662
  SArray**        pGrpRows = tSimpleHashGet(pBuild->pGrpHash, pBuild->keyData, keyLen);
×
1663
  if (!pGrpRows) {
×
1664
    SArray* pNewGrp = NULL;
×
1665
    MJ_ERR_RET(mJoinGetAvailableGrpArray(pBuild, &pNewGrp));
×
1666

1667
    if (NULL == taosArrayPush(pNewGrp, &pos)) {
×
1668
      return terrno;
×
1669
    }
1670
    MJ_ERR_RET(tSimpleHashPut(pBuild->pGrpHash, pBuild->keyData, keyLen, &pNewGrp, POINTER_BYTES));
×
1671
  } else if (pBuild->multiRowsGrp) {
×
1672
    if (NULL == taosArrayPush(*pGrpRows, &pos)) {
×
1673
      return terrno;
×
1674
    }
1675
  }
1676

1677
  return TSDB_CODE_SUCCESS;
×
1678
}
1679

1680
static int32_t mJoinAddRowToFullHash(SMJoinOperatorInfo* pJoin, size_t keyLen, SSDataBlock* pBlock, int32_t rowIdx) {
×
1681
  SMJoinTableCtx*    pBuild = pJoin->build;
×
1682
  SMJoinRowPos       pos = {pBlock, rowIdx};
×
1683
  SMJoinHashGrpRows* pGrpRows = (SMJoinHashGrpRows*)tSimpleHashGet(pBuild->pGrpHash, pBuild->keyData, keyLen);
×
1684
  if (!pGrpRows) {
×
1685
    SMJoinHashGrpRows pNewGrp = {0};
×
1686
    MJ_ERR_RET(mJoinGetAvailableGrpArray(pBuild, &pNewGrp.pRows));
×
1687

1688
    if (NULL == taosArrayPush(pNewGrp.pRows, &pos)) {
×
1689
      return terrno;
×
1690
    }
1691
    MJ_ERR_RET(tSimpleHashPut(pBuild->pGrpHash, pBuild->keyData, keyLen, &pNewGrp, sizeof(pNewGrp)));
×
1692
  } else {
1693
    if (NULL == taosArrayPush(pGrpRows->pRows, &pos)) {
×
1694
      return terrno;
×
1695
    }
1696
  }
1697

1698
  return TSDB_CODE_SUCCESS;
×
1699
}
1700

1701
int32_t mJoinCreateFullBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) {
×
1702
  size_t bufLen = 0;
×
1703

1704
  tSimpleHashClear(pJoin->build->pGrpHash);
×
1705
  pJoin->build->grpArrayIdx = 0;
×
1706

1707
  pJoin->build->grpRowIdx = -1;
×
1708

1709
  int32_t grpNum = taosArrayGetSize(pTable->eqGrps);
×
1710
  for (int32_t g = 0; g < grpNum; ++g) {
×
1711
    SMJoinGrpRows* pGrp = taosArrayGet(pTable->eqGrps, g);
×
1712
    if (NULL == pGrp) {
×
1713
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1714
    }
1715
    MJ_ERR_RET(mJoinSetKeyColsData(pGrp->blk, pTable));
×
1716

1717
    int32_t grpRows = GRP_REMAIN_ROWS(pGrp);
×
1718
    for (int32_t r = 0; r < grpRows; ++r) {
×
1719
      if (mJoinCopyKeyColsDataToBuf(pTable, pGrp->beginIdx + r, &bufLen)) {
×
1720
        *(int16_t*)pTable->keyBuf = 0;
×
1721
        pTable->keyData = pTable->keyBuf;
×
1722
        bufLen = pTable->keyNullSize;
×
1723
      }
1724

1725
      MJ_ERR_RET(mJoinAddRowToFullHash(pJoin, bufLen, pGrp->blk, pGrp->beginIdx + r));
×
1726
    }
1727
  }
1728

1729
  return TSDB_CODE_SUCCESS;
×
1730
}
1731

1732
int32_t mJoinCreateBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) {
×
1733
  size_t bufLen = 0;
×
1734

1735
  tSimpleHashClear(pJoin->build->pGrpHash);
×
1736
  pJoin->build->grpArrayIdx = 0;
×
1737

1738
  pJoin->build->grpRowIdx = -1;
×
1739

1740
  int32_t grpNum = taosArrayGetSize(pTable->eqGrps);
×
1741
  for (int32_t g = 0; g < grpNum; ++g) {
×
1742
    SMJoinGrpRows* pGrp = taosArrayGet(pTable->eqGrps, g);
×
1743
    if (NULL == pGrp) {
×
1744
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1745
    }
1746

1747
    MJ_ERR_RET(mJoinSetKeyColsData(pGrp->blk, pTable));
×
1748

1749
    int32_t grpRows = GRP_REMAIN_ROWS(pGrp);
×
1750
    for (int32_t r = 0; r < grpRows; ++r) {
×
1751
      if (mJoinCopyKeyColsDataToBuf(pTable, pGrp->beginIdx + r, &bufLen)) {
×
1752
        continue;
×
1753
      }
1754

1755
      MJ_ERR_RET(mJoinAddRowToHash(pJoin, bufLen, pGrp->blk, pGrp->beginIdx + r));
×
1756
    }
1757
  }
1758

1759
  return TSDB_CODE_SUCCESS;
×
1760
}
1761

1762
void mJoinResetGroupTableCtx(SMJoinTableCtx* pCtx) {
9,620,640✔
1763
  pCtx->blk = NULL;
9,620,640✔
1764
  pCtx->blkRowIdx = 0;
9,620,640✔
1765
  pCtx->newBlk = false;
9,620,640✔
1766
  if (pCtx->pBlkWinIdx) {
9,620,640✔
1767
    taosArrayClear(pCtx->pBlkWinIdx);
×
1768
  }
1769

1770
  mJoinDestroyCreatedBlks(pCtx->createdBlks);
9,620,640✔
1771
  tSimpleHashClear(pCtx->pGrpHash);
9,620,640✔
1772
}
9,620,640✔
1773

1774
void mJoinResetTableCtx(SMJoinTableCtx* pCtx) {
8,410,530✔
1775
  pCtx->dsInitDone = false;
8,410,530✔
1776
  pCtx->dsFetchDone = false;
8,410,530✔
1777
  pCtx->lastInGid = 0;
8,410,530✔
1778
  pCtx->remainInBlk = NULL;
8,410,530✔
1779

1780
  mJoinResetGroupTableCtx(pCtx);
8,410,530✔
1781
}
8,410,530✔
1782

1783
void mJoinResetMergeCtx(SMJoinMergeCtx* pCtx) {
4,205,265✔
1784
  pCtx->grpRemains = false;
4,205,265✔
1785
  pCtx->midRemains = false;
4,205,265✔
1786
  pCtx->lastEqGrp = false;
4,205,265✔
1787

1788
  pCtx->lastEqTs = INT64_MIN;
4,205,265✔
1789
  pCtx->hashJoin = false;
4,205,265✔
1790
}
4,205,265✔
1791

1792
void mWinJoinResetWindowCache(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache) {
1,150,451✔
1793
  pCache->outRowIdx = 0;
1,150,451✔
1794
  pCache->rowNum = 0;
1,150,451✔
1795
  pCache->grpIdx = 0;
1,150,451✔
1796

1797
  if (pCache->grpsQueue) {
1,150,451✔
1798
    TSWAP(pCache->grps, pCache->grpsQueue);
1,815✔
1799
  }
1800

1801
  int32_t grpNum = taosArrayGetSize(pCache->grps);
1,150,451✔
1802

1803
  for (int32_t i = 0; i < grpNum; ++i) {
1,794,859✔
1804
    SMJoinGrpRows* pGrp = taosArrayGet(pCache->grps, i);
644,408✔
1805
    if (NULL == pGrp) {
644,408✔
1806
      continue;
×
1807
    }
1808
    if (pGrp->blk != pCtx->cache.outBlk && pGrp->clonedBlk) {
644,408✔
1809
      (void)blockDataDestroy(pGrp->blk);
503,576✔
1810
    }
1811
  }
1812

1813
  taosArrayClear(pCache->grps);
1,150,451✔
1814

1815
  if (pCache->outBlk) {
1,150,451✔
1816
    blockDataCleanup(pCache->outBlk);
409,584✔
1817
  }
1818
}
1,150,451✔
1819

1820
void mJoinResetWindowCtx(SMJoinWindowCtx* pCtx) {
×
1821
  pCtx->grpRemains = false;
×
1822
  pCtx->lastEqGrp = false;
×
1823
  pCtx->lastProbeGrp = false;
×
1824
  pCtx->eqPostDone = false;
×
1825
  pCtx->lastTs = INT64_MIN;
×
1826

1827
  mWinJoinResetWindowCache(pCtx, &pCtx->cache);
×
1828
}
×
1829

1830
void mJoinResetCtx(SMJoinOperatorInfo* pJoin) {
4,205,265✔
1831
  if (pJoin->ctx.mergeCtxInUse) {
4,205,265✔
1832
    mJoinResetMergeCtx(&pJoin->ctx.mergeCtx);
4,205,265✔
1833
  } else {
1834
    mJoinResetWindowCtx(&pJoin->ctx.windowCtx);
×
1835
  }
1836
}
4,205,265✔
1837

1838
void mJoinResetOperator(struct SOperatorInfo* pOperator) {
4,205,265✔
1839
  SMJoinOperatorInfo* pJoin = pOperator->info;
4,205,265✔
1840

1841
  mJoinResetTableCtx(pJoin->build);
4,205,265✔
1842
  mJoinResetTableCtx(pJoin->probe);
4,205,265✔
1843

1844
  mJoinResetCtx(pJoin);
4,205,265✔
1845

1846
  pJoin->errCode = 0;
4,205,265✔
1847
  pJoin->execInfo = (SMJoinExecInfo){0};
4,205,265✔
1848

1849
  pOperator->status = OP_OPENED;
4,205,265✔
1850
}
4,205,265✔
1851

1852
int32_t mJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
27,201,082✔
1853
  SMJoinOperatorInfo* pJoin = pOperator->info;
27,201,082✔
1854
  int32_t             code = TSDB_CODE_SUCCESS;
27,201,082✔
1855
  if (pOperator->status == OP_EXEC_DONE) {
27,201,082✔
1856
    if (NULL == pOperator->pDownstreamGetParams || NULL == pOperator->pDownstreamGetParams[0] ||
12,735,778✔
1857
        NULL == pOperator->pDownstreamGetParams[1]) {
4,204,017✔
1858
      qDebug("%s merge join done", GET_TASKID(pOperator->pTaskInfo));
8,531,761✔
1859
      return code;
8,531,761✔
1860
    } else {
1861
      mJoinResetOperator(pOperator);
4,204,017✔
1862
      qDebug("%s start new round merge join", GET_TASKID(pOperator->pTaskInfo));
4,204,017✔
1863
    }
1864
  }
1865

1866
  SSDataBlock* pBlock = NULL;
18,668,870✔
1867
  while (true) {
1868
    pBlock = (*pJoin->joinFp)(pOperator);
18,792,908✔
1869
    if (NULL == pBlock) {
18,781,674✔
1870
      if (pJoin->errCode) {
14,760✔
1871
        T_LONG_JMP(pOperator->pTaskInfo->env, pJoin->errCode);
14,760✔
1872
      }
1873
      break;
×
1874
    }
1875

1876
    pBlock->info.id.blockId = pJoin->outBlkId;
18,766,914✔
1877
    if (pJoin->pFinFilter != NULL) {
18,766,914✔
1878
      code = doFilter(pBlock, pJoin->pFinFilter, NULL, NULL);
3,668,496✔
1879
      if (code) {
3,668,496✔
1880
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1881
        pJoin->errCode = code;
×
1882
        T_LONG_JMP(pOperator->pTaskInfo->env, pJoin->errCode);
×
1883
      }
1884
    }
1885

1886
    if (pBlock->info.rows > 0 || pOperator->status == OP_EXEC_DONE) {
18,766,914✔
1887
      pBlock->info.dataLoad = 1;
18,642,876✔
1888
      break;
18,642,876✔
1889
    }
1890
  }
1891

1892
  pJoin->execInfo.resRows += pBlock ? pBlock->info.rows : 0;
18,642,876✔
1893
  if (pBlock && pBlock->info.rows > 0) {
18,642,876✔
1894
    *pResBlock = pBlock;
10,714,693✔
1895
  }
1896

1897
  return code;
18,642,876✔
1898
}
1899

1900
void destroyGrpArray(void* ppArray) {
×
1901
  SArray* pArray = *(SArray**)ppArray;
×
1902
  taosArrayDestroy(pArray);
×
1903
}
×
1904

1905
void destroyMergeJoinTableCtx(SMJoinTableCtx* pTable) {
29,042,404✔
1906
  if (NULL == pTable) {
29,042,404✔
1907
    return;
×
1908
  }
1909
  mJoinDestroyCreatedBlks(pTable->createdBlks);
29,042,404✔
1910
  taosArrayDestroy(pTable->createdBlks);
29,041,843✔
1911
  tSimpleHashCleanup(pTable->pGrpHash);
29,041,843✔
1912

1913
  taosMemoryFree(pTable->primCol);
29,041,385✔
1914
  taosMemoryFree(pTable->finCols);
29,041,843✔
1915
  taosMemoryFree(pTable->keyCols);
29,041,843✔
1916
  taosMemoryFree(pTable->keyBuf);
29,041,843✔
1917
  taosMemoryFree(pTable->pRowBitmap);
29,041,843✔
1918

1919
  taosArrayDestroy(pTable->eqGrps);
29,041,843✔
1920
  taosArrayDestroyEx(pTable->pGrpArrays, destroyGrpArray);
29,041,843✔
1921
  taosArrayDestroy(pTable->pBlkWinIdx);
29,040,721✔
1922
}
1923

1924
void destroyMergeJoinOperator(void* param) {
14,520,641✔
1925
  SMJoinOperatorInfo* pJoin = (SMJoinOperatorInfo*)param;
14,520,641✔
1926

1927
  mJoinDestroyCtx(pJoin);
14,520,641✔
1928

1929
  if (pJoin->pFPreFilter != NULL) {
14,521,202✔
1930
    filterFreeInfo(pJoin->pFPreFilter);
498,457✔
1931
    pJoin->pFPreFilter = NULL;
498,457✔
1932
  }
1933
  if (pJoin->pPreFilter != NULL) {
14,521,202✔
1934
    filterFreeInfo(pJoin->pPreFilter);
456,655✔
1935
    pJoin->pPreFilter = NULL;
456,655✔
1936
  }
1937
  if (pJoin->pFinFilter != NULL) {
14,521,202✔
1938
    filterFreeInfo(pJoin->pFinFilter);
4,343,199✔
1939
    pJoin->pFinFilter = NULL;
4,343,199✔
1940
  }
1941

1942
  destroyMergeJoinTableCtx(pJoin->probe);
14,521,202✔
1943
  destroyMergeJoinTableCtx(pJoin->build);
14,521,202✔
1944

1945
  taosMemoryFreeClear(pJoin);
14,520,080✔
1946
}
14,521,202✔
1947

1948
int32_t mJoinHandleConds(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) {
14,519,598✔
1949
  switch (pJoin->joinType) {
14,519,598✔
1950
    case JOIN_TYPE_INNER: {
12,676,305✔
1951
      SNode* pCond = NULL;
12,676,305✔
1952
      if (pJoinNode->pFullOnCond != NULL) {
12,676,305✔
1953
        if (pJoinNode->node.pConditions != NULL) {
3,999,517✔
1954
          MJ_ERR_RET(mergeJoinConds(&pJoinNode->pFullOnCond, &pJoinNode->node.pConditions));
2,265✔
1955
        }
1956
        pCond = pJoinNode->pFullOnCond;
3,999,517✔
1957
      } else if (pJoinNode->node.pConditions != NULL) {
8,676,812✔
1958
        pCond = pJoinNode->node.pConditions;
10,470✔
1959
      }
1960

1961
      MJ_ERR_RET(filterInitFromNode(pCond, &pJoin->pFinFilter, 0, pTaskInfo->pStreamRuntimeInfo));
12,676,890✔
1962
      break;
12,675,744✔
1963
    }
1964
    case JOIN_TYPE_LEFT:
1,844,312✔
1965
    case JOIN_TYPE_RIGHT:
1966
    case JOIN_TYPE_FULL:
1967
      if (pJoinNode->pFullOnCond != NULL) {
1,844,312✔
1968
        MJ_ERR_RET(filterInitFromNode(pJoinNode->pFullOnCond, &pJoin->pFPreFilter, 0,
498,457✔
1969
                                      pTaskInfo->pStreamRuntimeInfo));
1970
      }
1971
      if (pJoinNode->pColOnCond != NULL) {
1,843,854✔
1972
        MJ_ERR_RET(
456,655✔
1973
            filterInitFromNode(pJoinNode->pColOnCond, &pJoin->pPreFilter, 0, pTaskInfo->pStreamRuntimeInfo));
1974
      }
1975
      if (pJoinNode->node.pConditions != NULL) {
1,843,854✔
1976
        MJ_ERR_RET(filterInitFromNode(pJoinNode->node.pConditions, &pJoin->pFinFilter, 0,
333,212✔
1977
                                      pTaskInfo->pStreamRuntimeInfo));
1978
      }
1979
      break;
1,843,854✔
1980
    default:
×
1981
      break;
×
1982
  }
1983

1984
  return TSDB_CODE_SUCCESS;
14,519,598✔
1985
}
1986

1987
int32_t mJoinSetImplFp(SMJoinOperatorInfo* pJoin) {
14,521,202✔
1988
  switch (pJoin->joinType) {
14,521,202✔
1989
    case JOIN_TYPE_INNER:
12,676,890✔
1990
      pJoin->joinFp = mInnerJoinDo;
12,676,890✔
1991
      break;
12,676,890✔
1992
    case JOIN_TYPE_LEFT:
1,738,112✔
1993
    case JOIN_TYPE_RIGHT: {
1994
      switch (pJoin->subType) {
1,738,112✔
1995
        case JOIN_STYPE_OUTER:
553,506✔
1996
          pJoin->joinFp = mLeftJoinDo;
553,506✔
1997
          pJoin->grpResetFp = mLeftJoinGroupReset;
553,506✔
1998
          break;
553,506✔
1999
        case JOIN_STYPE_SEMI:
293,655✔
2000
          pJoin->joinFp = mSemiJoinDo;
293,655✔
2001
          break;
293,655✔
2002
        case JOIN_STYPE_ANTI:
264,654✔
2003
          pJoin->joinFp = mAntiJoinDo;
264,654✔
2004
          break;
264,654✔
2005
        case JOIN_STYPE_WIN:
374,864✔
2006
          pJoin->joinFp = mWinJoinDo;
374,864✔
2007
          pJoin->grpResetFp = mWinJoinGroupReset;
374,864✔
2008
          break;
374,864✔
2009
        default:
251,433✔
2010
          break;
251,433✔
2011
      }
2012
      break;
1,738,112✔
2013
    }
2014
    case JOIN_TYPE_FULL:
106,200✔
2015
      pJoin->joinFp = mFullJoinDo;
106,200✔
2016
      break;
106,200✔
2017
    default:
×
2018
      break;
×
2019
  }
2020

2021
  return TSDB_CODE_SUCCESS;
14,521,202✔
2022
}
2023

2024
static int32_t resetMergeJoinOperState(SOperatorInfo* pOper) {
1,248✔
2025
  mJoinResetOperator(pOper);
1,248✔
2026
  return 0;
1,248✔
2027
}
2028

2029
int32_t createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
14,521,202✔
2030
                                    SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo,
2031
                                    SOperatorInfo** pOptrInfo) {
2032
  QRY_PARAM_CHECK(pOptrInfo);
14,521,202✔
2033

2034
  int32_t             oldNum = numOfDownstream;
14,521,202✔
2035
  bool                newDownstreams = false;
14,521,202✔
2036
  int32_t             code = TSDB_CODE_SUCCESS;
14,521,202✔
2037
  SOperatorInfo*      pOperator = NULL;
14,521,202✔
2038
  SMJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMJoinOperatorInfo));
14,521,202✔
2039
  if (pInfo == NULL) {
14,521,202✔
2040
    code = terrno;
×
2041
    goto _return;
×
2042
  }
2043

2044
  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
14,521,202✔
2045
  if (pOperator == NULL) {
14,521,202✔
2046
    code = terrno;
×
2047
    goto _return;
×
2048
  }
2049
  initOperatorCostInfo(pOperator);
14,521,202✔
2050

2051
  pInfo->pOperator = pOperator;
14,521,202✔
2052
  MJ_ERR_JRET(mJoinInitDownstreamInfo(pInfo, &pDownstream, &numOfDownstream, &newDownstreams));
14,521,202✔
2053

2054
  setOperatorInfo(pOperator, "MergeJoinOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, false, OP_NOT_OPENED, pInfo,
14,521,202✔
2055
                  pTaskInfo);
2056

2057
  mJoinSetBuildAndProbeTable(pInfo, pJoinNode);
14,521,202✔
2058

2059
  MJ_ERR_JRET(mJoinHandleConds(pInfo, pJoinNode, pTaskInfo));
14,520,056✔
2060

2061
  MJ_ERR_JRET(mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0], newDownstreams));
14,519,598✔
2062
  MJ_ERR_JRET(mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1], newDownstreams));
14,520,080✔
2063

2064
  MJ_ERR_JRET(mJoinInitCtx(pInfo, pJoinNode));
14,520,641✔
2065
  MJ_ERR_JRET(mJoinSetImplFp(pInfo));
14,521,202✔
2066

2067
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mJoinMainProcess, NULL, destroyMergeJoinOperator,
14,521,202✔
2068
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2069

2070
  setOperatorResetStateFn(pOperator, resetMergeJoinOperState);
14,521,202✔
2071
  MJ_ERR_JRET(appendDownstream(pOperator, pDownstream, numOfDownstream));
14,521,202✔
2072

2073
  if (newDownstreams) {
14,520,641✔
2074
    taosMemoryFree(pDownstream);
1,367,923✔
2075
    pOperator->numOfRealDownstream = 1;
1,367,923✔
2076
  } else {
2077
    pOperator->numOfRealDownstream = 2;
13,152,718✔
2078
  }
2079

2080
  *pOptrInfo = pOperator;
14,520,641✔
2081
  return code;
14,520,641✔
2082

2083
_return:
×
2084

2085
  if (pInfo != NULL) {
×
2086
    destroyMergeJoinOperator(pInfo);
×
2087
  }
2088
  destroyOperatorAndDownstreams(pOperator, pDownstream, oldNum);
×
2089
  if (newDownstreams) {
×
2090
    taosMemoryFree(pDownstream);
×
2091
  }
2092
  pTaskInfo->code = code;
×
2093

2094
  return code;
×
2095
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc