• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4469

08 Jul 2025 09:38AM UTC coverage: 62.22% (-1.2%) from 63.381%
#4469

push

travis-ci

web-flow
Merge pull request #31712 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

153678 of 316510 branches covered (48.55%)

Branch coverage included in aggregate %.

56 of 60 new or added lines in 13 files covered. (93.33%)

5035 existing lines in 221 files now uncovered.

238955 of 314529 relevant lines covered (75.97%)

6273248.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.58
/source/libs/executor/src/mergeoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "operator.h"
19
#include "querytask.h"
20
#include "tdatablock.h"
21

22
typedef struct SSortMergeInfo {
23
  SArray*        pSortInfo;
24
  SSortHandle*   pSortHandle;
25
  STupleHandle*  prefetchedTuple;
26
  int32_t        bufPageSize;
27
  uint32_t       sortBufSize;  // max buffer size for in-memory sort
28
  SSDataBlock*   pIntermediateBlock;   // to hold the intermediate result
29
  SSDataBlock*   pInputBlock;
30
  SColMatchInfo  matchInfo;
31
} SSortMergeInfo;
32

33
typedef struct SNonSortMergeInfo {
34
  int32_t  lastSourceIdx;
35
  int32_t  sourceWorkIdx;
36
  int32_t  sourceNum;
37
  int32_t* pSourceStatus;
38
} SNonSortMergeInfo;
39

40
typedef struct SColsMergeInfo {
41
  SNodeList* pTargets;
42
  size_t     sourceNum;
43
  uint64_t*  srcBlkIds;
44
} SColsMergeInfo;
45

46
typedef struct SMultiwayMergeOperatorInfo {
47
  SOptrBasicInfo binfo;
48
  EMergeType     type;
49
  union {
50
    SSortMergeInfo    sortMergeInfo;
51
    SNonSortMergeInfo nsortMergeInfo;
52
    SColsMergeInfo    colsMergeInfo;
53
  };
54
  SLimitInfo     limitInfo;
55
  bool           groupMerge;
56
  bool           ignoreGroupId;
57
  uint64_t       groupId;
58
  bool           inputWithGroupId;
59
} SMultiwayMergeOperatorInfo;
60

61
static int32_t doSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
62
static int32_t doMultiwayMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
63
static int32_t doNonSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
64
static int32_t doColsMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
65
static int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock);
66

67
int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock) {
950,037✔
68
  SOperatorInfo* pOperator = (SOperatorInfo*)param;
950,037✔
69
  int32_t        code = pOperator->fpSet.getNextFn(pOperator, ppBlock);
950,037✔
70
  if (code) {
950,035!
71
    qError("failed to get next data block from upstream, %s code:%s", __func__, tstrerror(code));
×
72
  }
73
  code = blockDataCheck(*ppBlock);
950,035✔
74
  if (code) {
950,015!
75
    qError("failed to check data block got from upstream, %s code:%s", __func__, tstrerror(code));
×
76
  }
77
  return code;
950,017✔
78
}
79

80
int32_t openSortMergeOperator(SOperatorInfo* pOperator) {
95,865✔
81
  SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
95,865✔
82
  SExecTaskInfo*              pTaskInfo = pOperator->pTaskInfo;
95,865✔
83
  SSortMergeInfo*             pSortMergeInfo = &pInfo->sortMergeInfo;
95,865✔
84

85
  int32_t numOfBufPage = pSortMergeInfo->sortBufSize / pSortMergeInfo->bufPageSize;
95,865✔
86

87
  pSortMergeInfo->pSortHandle = NULL;
95,865✔
88
  int32_t code = tsortCreateSortHandle(pSortMergeInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pSortMergeInfo->bufPageSize,
95,865✔
89
                                       numOfBufPage, pSortMergeInfo->pInputBlock, pTaskInfo->id.str, 0, 0, 0, &pSortMergeInfo->pSortHandle);
95,865✔
90
  if (code) {
95,884!
91
    return code;
×
92
  }
93

94
  tsortSetFetchRawDataFp(pSortMergeInfo->pSortHandle, sortMergeloadNextDataBlock, NULL, NULL);
95,884✔
95
  tsortSetCompareGroupId(pSortMergeInfo->pSortHandle, pInfo->groupMerge);
95,881✔
96

97
  for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
347,097✔
98
    SOperatorInfo* pDownstream = pOperator->pDownstream[i];
251,212✔
99
    if (pDownstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) {
251,212!
100
      code = pDownstream->fpSet._openFn(pDownstream);
251,220✔
101
      if (code) {
251,221!
102
        return code;
×
103
      }
104
    }
105

106
    SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
251,213!
107
    if (ps == NULL) {
251,231!
108
      return terrno;
×
109
    }
110

111
    ps->param = pDownstream;
251,231✔
112
    ps->onlyRef = true;
251,231✔
113

114
    code = tsortAddSource(pSortMergeInfo->pSortHandle, ps);
251,231✔
115
    if (code) {
251,217!
116
      return code;
×
117
    }
118
  }
119

120
  return tsortOpen(pSortMergeInfo->pSortHandle);
95,885✔
121
}
122

123
static int32_t doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity,
365,431✔
124
                                 SSDataBlock* p, bool* newgroup) {
125
  SSortMergeInfo* pSortMergeInfo = &pInfo->sortMergeInfo;
365,431✔
126
  *newgroup = false;
365,431✔
127
  int32_t code = 0;
365,431✔
128

129
  while (1) {
91,033,596✔
130
    STupleHandle* pTupleHandle = NULL;
91,399,027✔
131
    if (pInfo->groupMerge || pInfo->inputWithGroupId) {
91,399,027✔
132
      if (pSortMergeInfo->prefetchedTuple == NULL) {
74,235,109✔
133
        pTupleHandle = NULL;
74,091,446✔
134
        code = tsortNextTuple(pHandle, &pTupleHandle);
74,091,446✔
135
        if (code) {
136
          // todo handle error
137
        }
138
      } else {
139
        pTupleHandle = pSortMergeInfo->prefetchedTuple;
143,663✔
140
        pSortMergeInfo->prefetchedTuple = NULL;
143,663✔
141
        uint64_t gid = tsortGetGroupId(pTupleHandle);
143,663✔
142
        if (gid != pInfo->groupId) {
143,663!
143
          *newgroup = true;
143,663✔
144
          pInfo->groupId = gid;
143,663✔
145
        }
146
      }
147
    } else {
148
      code = tsortNextTuple(pHandle, &pTupleHandle);
17,163,918✔
149
      pInfo->groupId = 0;
17,052,671✔
150
    }
151

152
    if (pTupleHandle == NULL || (code != 0)) {
91,269,609!
153
      break;
154
    }
155

156
    if (pInfo->groupMerge || pInfo->inputWithGroupId) {
165,106,706✔
157
      uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle);
74,124,208✔
158
      if (pInfo->groupId == 0 || pInfo->groupId == tupleGroupId) {
74,092,475✔
159
        code = appendOneRowToDataBlock(p, pTupleHandle);
73,948,686✔
160
        if (code) {
73,992,671!
161
          return code;
×
162
        }
163

164
        p->info.id.groupId = tupleGroupId;
73,992,671✔
165
        pInfo->groupId = tupleGroupId;
73,992,671✔
166
      } else {
167
        if (p->info.rows == 0) {
143,789✔
168
          code = appendOneRowToDataBlock(p, pTupleHandle);
12✔
169
          if (code) {
12!
170
            return code;
×
171
          }
172

173
          p->info.id.groupId = pInfo->groupId = tupleGroupId;
12✔
174
        } else {
175
          pSortMergeInfo->prefetchedTuple = pTupleHandle;
143,777✔
176
          break;
143,777✔
177
        }
178
      }
179
    } else {
180
      code = appendOneRowToDataBlock(p, pTupleHandle);
16,989,815✔
181
      if (code) {
17,097,325!
182
        return code;
×
183
      }
184
    }
185

186
    if (p->info.rows >= capacity) {
91,090,008✔
187
      break;
56,412✔
188
    }
189
  }
190

191
  return code;
355,775✔
192
}
193

194
int32_t doSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
365,337✔
195
  SExecTaskInfo*              pTaskInfo = pOperator->pTaskInfo;
365,337✔
196
  SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
365,337✔
197
  SSortMergeInfo*             pSortMergeInfo = &pInfo->sortMergeInfo;
365,337✔
198
  SSortHandle*                pHandle = pSortMergeInfo->pSortHandle;
365,337✔
199
  SSDataBlock*                pDataBlock = pInfo->binfo.pRes;
365,337✔
200
  SArray*                     pColMatchInfo = pSortMergeInfo->matchInfo.pList;
365,337✔
201
  int32_t                     capacity = pOperator->resultInfo.capacity;
365,337✔
202
  int32_t                     code = 0;
365,337✔
203
  bool                        newgroup = false;
365,337✔
204

205
  qDebug("start to merge final sorted rows, %s", GET_TASKID(pTaskInfo));
365,337✔
206
  blockDataCleanup(pDataBlock);
365,338✔
207

208
  if (pSortMergeInfo->pIntermediateBlock == NULL) {
365,336✔
209
    pSortMergeInfo->pIntermediateBlock = NULL;
95,883✔
210
    code = tsortGetSortedDataBlock(pHandle, &pSortMergeInfo->pIntermediateBlock);
95,883✔
211
    if (pSortMergeInfo->pIntermediateBlock == NULL || code != 0) {
95,884!
212
      return code;
×
213
    }
214

215
    code = blockDataEnsureCapacity(pSortMergeInfo->pIntermediateBlock, capacity);
95,884✔
216
    if (code) {
95,885!
217
      return code;
×
218
    }
219

220
  } else {
221
    blockDataCleanup(pSortMergeInfo->pIntermediateBlock);
269,453✔
222
  }
223

224
  SSDataBlock* p = pSortMergeInfo->pIntermediateBlock;
365,338✔
225
  while (1) {
93✔
226
    code = doGetSortedBlockData(pInfo, pHandle, capacity, p, &newgroup);
365,431✔
227
    if (code) {
365,431!
228
      return code;
×
229
    }
230

231
    if (p->info.rows == 0) {
365,431✔
232
      break;
76,592✔
233
    }
234

235
    if (newgroup) {
288,839✔
236
      resetLimitInfoForNextGroup(&pInfo->limitInfo);
143,663✔
237
    }
238

239
    bool limitReached = applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo);
288,839✔
240

241
    if (p->info.rows > 0) {
288,840✔
242
      break;
288,747✔
243
    }
244
  }
245

246
  if (p->info.rows > 0) {
365,339✔
247
    int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
288,747✔
248
    for (int32_t i = 0; i < numOfCols; ++i) {
1,334,143✔
249
      SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i);
1,045,400✔
250
      if (pmInfo == NULL) {
1,045,395!
251
        code = terrno;
×
252
        return code;
×
253
      }
254

255
      SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
1,045,395✔
256
      if (pSrc == NULL) {
1,045,393!
257
        code = terrno;
×
258
        return code;
×
259
      }
260

261
      SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId);
1,045,393✔
262
      if (pDst == NULL) {
1,045,390!
263
        code = terrno;
×
264
        return code;
×
265
      }
266

267
      code = colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info);
1,045,390✔
268
      if (code) {
1,045,396!
269
        return code;
×
270
      }
271
    }
272

273
    pDataBlock->info.rows = p->info.rows;
288,743✔
274
    pDataBlock->info.scanFlag = p->info.scanFlag;
288,743✔
275
    if (pInfo->ignoreGroupId) {
288,743✔
276
      pDataBlock->info.id.groupId = 0;
8✔
277
    } else {
278
      pDataBlock->info.id.groupId = pInfo->groupId;
288,735✔
279
    }
280
    pDataBlock->info.dataLoad = 1;
288,743✔
281
  }
282

283
  qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%" PRId64 , GET_TASKID(pTaskInfo), pDataBlock->info.id.groupId,
365,335✔
284
         pDataBlock->info.rows);
285

286
  *pResBlock = (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
365,336✔
287
  return code;
365,336✔
288
}
289

290
int32_t getSortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
1,644✔
291
  SSortExecInfo* pSortExecInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo));
1,644!
292
  if (pSortExecInfo == NULL) {
1,644!
293
    pOptr->pTaskInfo->code = terrno;
×
294
    return terrno;
×
295
  }
296

297
  SMultiwayMergeOperatorInfo* pInfo = (SMultiwayMergeOperatorInfo*)pOptr->info;
1,644✔
298
  SSortMergeInfo* pSortMergeInfo = &pInfo->sortMergeInfo;
1,644✔
299

300
  *pSortExecInfo = tsortGetSortExecInfo(pSortMergeInfo->pSortHandle);
1,644✔
301
  *pOptrExplain = pSortExecInfo;
1,644✔
302

303
  *len = sizeof(SSortExecInfo);
1,644✔
304
  return TSDB_CODE_SUCCESS;
1,644✔
305
}
306

307

308
void destroySortMergeOperatorInfo(void* param) {
96,063✔
309
  SSortMergeInfo* pSortMergeInfo = param;
96,063✔
310
  blockDataDestroy(pSortMergeInfo->pInputBlock);
96,063✔
311
  pSortMergeInfo->pInputBlock = NULL;
96,063✔
312

313
  blockDataDestroy(pSortMergeInfo->pIntermediateBlock);
96,063✔
314
  pSortMergeInfo->pIntermediateBlock = NULL;
96,064✔
315

316
  taosArrayDestroy(pSortMergeInfo->matchInfo.pList);
96,064✔
317

318
  tsortDestroySortHandle(pSortMergeInfo->pSortHandle);
96,064✔
319
  taosArrayDestroy(pSortMergeInfo->pSortInfo);
96,062✔
320
}
96,063✔
321

322
#define NON_SORT_NEXT_SRC(_info, _idx) ((++(_idx) >= (_info)->sourceNum) ? ((_info)->sourceWorkIdx) : (_idx))
323

324
int32_t openNonSortMergeOperator(SOperatorInfo* pOperator) {
89✔
325
  SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
89✔
326
  SExecTaskInfo*              pTaskInfo = pOperator->pTaskInfo;
89✔
327
  SNonSortMergeInfo*          pNonSortMergeInfo = &pInfo->nsortMergeInfo;
89✔
328

329
  pNonSortMergeInfo->sourceWorkIdx = 0;
89✔
330
  pNonSortMergeInfo->sourceNum = pOperator->numOfDownstream;
89✔
331
  pNonSortMergeInfo->lastSourceIdx = -1;
89✔
332
  pNonSortMergeInfo->pSourceStatus =
178✔
333
      taosMemoryCalloc(pOperator->numOfDownstream, sizeof(*pNonSortMergeInfo->pSourceStatus));
89!
334
  if (NULL == pNonSortMergeInfo->pSourceStatus) {
89!
335
    pTaskInfo->code = terrno;
×
336
    return pTaskInfo->code;
×
337
  }
338

339
  for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
557✔
340
    pNonSortMergeInfo->pSourceStatus[i] = i;
468✔
341
  }
342

343
  return 0;
89✔
344
}
345

346
int32_t doNonSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
1,008✔
347
  QRY_PARAM_CHECK(pResBlock);
1,008!
348

349
  SExecTaskInfo*              pTaskInfo = pOperator->pTaskInfo;
1,008✔
350
  SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
1,008✔
351
  SNonSortMergeInfo*          pNonSortMerge = &pInfo->nsortMergeInfo;
1,008✔
352
  SSDataBlock*                pBlock = NULL;
1,008✔
353
  SSDataBlock*                pRes = pInfo->binfo.pRes;
1,008✔
354
  int32_t                     code = 0;
1,008✔
355

356
  qDebug("start to merge no sorted rows, %s", GET_TASKID(pTaskInfo));
1,008✔
357

358
  int32_t idx = NON_SORT_NEXT_SRC(pNonSortMerge, pNonSortMerge->lastSourceIdx);
1,008!
359
  while (idx < pNonSortMerge->sourceNum) {
1,476✔
360
    pBlock = getNextBlockFromDownstream(pOperator, pNonSortMerge->pSourceStatus[idx]);
1,387✔
361
    if (NULL == pBlock) {
1,387✔
362
      TSWAP(pNonSortMerge->pSourceStatus[pNonSortMerge->sourceWorkIdx], pNonSortMerge->pSourceStatus[idx]);
468✔
363
      pNonSortMerge->sourceWorkIdx++;
468✔
364
      idx = NON_SORT_NEXT_SRC(pNonSortMerge, pNonSortMerge->lastSourceIdx);
468✔
365
      continue;
468✔
366
    }
367

368
    pNonSortMerge->lastSourceIdx = idx - 1;
919✔
369
    break;
919✔
370
  }
371

372
  if (!pBlock) {  // null data
1,008✔
373
    return code;
89✔
374
  }
375

376
  code = copyDataBlock(pRes, pBlock);
919✔
377
  *pResBlock = pRes;
919✔
378
  return code;
919✔
379
}
380

381
void destroyNonSortMergeOperatorInfo(void* param) {
89✔
382
  SNonSortMergeInfo* pNonSortMerge = param;
89✔
383
  taosMemoryFree(pNonSortMerge->pSourceStatus);
89!
384
}
89✔
385

386
int32_t getNonSortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
×
387
  return TSDB_CODE_SUCCESS;
×
388
}
389

390

391
int32_t openColsMergeOperator(SOperatorInfo* pOperator) {
254✔
392
  return TSDB_CODE_SUCCESS;
254✔
393
}
394

395
int32_t copyColumnsValue(SNodeList* pNodeList, uint64_t targetBlkId, SSDataBlock* pDst, SSDataBlock* pSrc) {
508✔
396
  bool    isNull = (NULL == pSrc || pSrc->info.rows <= 0);
508!
397
  size_t  numOfCols = LIST_LENGTH(pNodeList);
508!
398
  int32_t code = 0;
508✔
399

400
  for (int32_t i = 0; i < numOfCols; ++i) {
5,146✔
401
    STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
4,638✔
402
    if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode->pExpr)->dataBlockId == targetBlkId) {
4,638!
403
      SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, pNode->slotId);
2,319✔
404
      if (pDstCol == NULL) {
2,319!
405
        return terrno;
×
406
      }
407

408
      if (isNull) {
2,319!
409
        code = colDataSetVal(pDstCol, 0, NULL, true);
×
410
      } else {
411
        SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, ((SColumnNode*)pNode->pExpr)->slotId);
2,319✔
412
        if (pSrcCol == NULL) {
2,319!
413
          code = terrno;
×
414
          return code;
×
415
        }
416

417
        code = colDataAssign(pDstCol, pSrcCol, 1, &pDst->info);
2,319✔
418
      }
419

420
      if (code) {
2,319!
421
        break;
×
422
      }
423
    }
424
  }
425

426
  return code;
508✔
427
}
428

429
int32_t doColsMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
254✔
430
  QRY_PARAM_CHECK(pResBlock);
254!
431

432
  SExecTaskInfo*              pTaskInfo = pOperator->pTaskInfo;
254✔
433
  SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
254✔
434
  SSDataBlock*                pBlock = NULL;
254✔
435
  SColsMergeInfo*             pColsMerge = &pInfo->colsMergeInfo;
254✔
436
  int32_t                     nullBlkNum = 0;
254✔
437
  int32_t                     code = 0;
254✔
438

439
  qDebug("start to merge columns, %s", GET_TASKID(pTaskInfo));
254✔
440

441
  for (int32_t i = 0; i < pColsMerge->sourceNum; ++i) {
762✔
442
    pBlock = getNextBlockFromDownstream(pOperator, i);
508✔
443
    if (pBlock && pBlock->info.rows > 1) {
508!
444
      qError("more than 1 row returned from downstream, rows:%" PRId64, pBlock->info.rows);
×
445
      T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
446
    } else if (NULL == pBlock) {
508!
447
      nullBlkNum++;
×
448
    }
449
    
450
    code = copyColumnsValue(pColsMerge->pTargets, pColsMerge->srcBlkIds[i], pInfo->binfo.pRes, pBlock);
508✔
451
    if (code) {
508!
452
      return code;
×
453
    }
454
  }
455

456
  setOperatorCompleted(pOperator);
254✔
457
  if (pColsMerge->sourceNum == nullBlkNum) {
254!
458
    return code;
×
459
  }
460

461
  pInfo->binfo.pRes->info.rows = 1;
254✔
462
  *pResBlock = pInfo->binfo.pRes;
254✔
463

464
  return code;
254✔
465
}
466

467
void destroyColsMergeOperatorInfo(void* param) {
254✔
468
  SColsMergeInfo* pColsMergeInfo = param;
254✔
469
  taosMemoryFreeClear(pColsMergeInfo->srcBlkIds);
254!
470
}
254✔
471

472
int32_t getColsMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
×
473
  return TSDB_CODE_SUCCESS;
×
474
}
475

476

477
SOperatorFpSet gMultiwayMergeFps[MERGE_TYPE_MAX_VALUE] = {
478
  {0},
479
  {._openFn = openSortMergeOperator, .getNextFn = doSortMerge, .closeFn = destroySortMergeOperatorInfo, .getExplainFn = getSortMergeExplainExecInfo},
480
  {._openFn = openNonSortMergeOperator, .getNextFn = doNonSortMerge, .closeFn = destroyNonSortMergeOperatorInfo, .getExplainFn = getNonSortMergeExplainExecInfo},
481
  {._openFn = openColsMergeOperator, .getNextFn = doColsMerge, .closeFn = destroyColsMergeOperatorInfo, .getExplainFn = getColsMergeExplainExecInfo},
482
};
483

484

485
int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) {
366,577✔
486
  int32_t code = 0;
366,577✔
487
  SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
366,577✔
488
  SExecTaskInfo*              pTaskInfo = pOperator->pTaskInfo;
366,577✔
489

490
  if (OPTR_IS_OPENED(pOperator)) {
366,577✔
491
    return TSDB_CODE_SUCCESS;
270,371✔
492
  }
493

494
  int64_t startTs = taosGetTimestampUs();
96,221✔
495
  
496
  if (NULL != gMultiwayMergeFps[pInfo->type]._openFn) {
96,221!
497
    code = (*gMultiwayMergeFps[pInfo->type]._openFn)(pOperator);
96,225✔
498
  }
499

500
  pOperator->cost.openCost = (taosGetTimestampUs() - startTs) / 1000.0;
96,228✔
501
  pOperator->status = OP_RES_TO_RETURN;
96,228✔
502

503
  if (code != TSDB_CODE_SUCCESS) {
96,228!
504
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
505
    pOperator->pTaskInfo->code = code;
×
506
    T_LONG_JMP(pTaskInfo->env, terrno);
×
507
  }
508

509
  OPTR_SET_OPENED(pOperator);
96,228✔
510
  return code;
96,228✔
511
}
512

513
int32_t doMultiwayMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
366,873✔
514
  QRY_PARAM_CHECK(pResBlock);
366,873!
515
  int32_t code = TSDB_CODE_SUCCESS;
366,873✔
516
  int32_t lino = 0;
366,873✔
517

518
  if (pOperator->status == OP_EXEC_DONE) {
366,873✔
519
    return 0;
293✔
520
  }
521

522
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
366,580✔
523
  SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
366,580✔
524

525
  code = pOperator->fpSet._openFn(pOperator);
366,580✔
526
  QUERY_CHECK_CODE(code, lino, _end);
366,599!
527

528
  if (NULL != gMultiwayMergeFps[pInfo->type].getNextFn) {
366,599!
529
    code = (*gMultiwayMergeFps[pInfo->type].getNextFn)(pOperator, pResBlock);
366,599✔
530
    QUERY_CHECK_CODE(code, lino, _end);
366,596!
531
  }
532

533
  if ((*pResBlock) != NULL) {
366,596✔
534
    pOperator->resultInfo.totalRows += (*pResBlock)->info.rows;
289,919✔
535
    code = blockDataCheck(*pResBlock);
289,919✔
536
    QUERY_CHECK_CODE(code, lino, _end);
289,919!
537
  } else {
538
    setOperatorCompleted(pOperator);
76,677✔
539
  }
540

541
_end:
366,596✔
542
  if (code != TSDB_CODE_SUCCESS) {
366,596!
543
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
544
    pTaskInfo->code = code;
×
545
    T_LONG_JMP(pTaskInfo->env, code);
×
546
  }
547
  return code;
366,596✔
548
}
549

550
void destroyMultiwayMergeOperatorInfo(void* param) {
96,407✔
551
  SMultiwayMergeOperatorInfo* pInfo = (SMultiwayMergeOperatorInfo*)param;
96,407✔
552
  blockDataDestroy(pInfo->binfo.pRes);
96,407✔
553
  pInfo->binfo.pRes = NULL;
96,407✔
554

555
  if (NULL != gMultiwayMergeFps[pInfo->type].closeFn) {
96,407!
556
    (*gMultiwayMergeFps[pInfo->type].closeFn)(&pInfo->sortMergeInfo);
96,407✔
557
  }
558

559
  taosMemoryFreeClear(param);
96,406!
560
}
96,407✔
561

562
int32_t getMultiwayMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
1,644✔
563
  int32_t code = 0;
1,644✔
564
  SMultiwayMergeOperatorInfo* pInfo = (SMultiwayMergeOperatorInfo*)pOptr->info;
1,644✔
565

566
  if (NULL != gMultiwayMergeFps[pInfo->type].getExplainFn) {
1,644!
567
    code = (*gMultiwayMergeFps[pInfo->type].getExplainFn)(pOptr, pOptrExplain, len);
1,644✔
568
  }
569

570
  return code;
1,644✔
571
}
572

573
int32_t createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numStreams, SMergePhysiNode* pMergePhyNode,
96,395✔
574
                                        SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
575
  QRY_PARAM_CHECK(pOptrInfo);
96,395!
576

577
  SPhysiNode*                 pPhyNode = (SPhysiNode*)pMergePhyNode;
96,395✔
578
  int32_t                     lino = 0;
96,395✔
579
  int32_t                     code = TSDB_CODE_SUCCESS;
96,395✔
580
  SMultiwayMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMultiwayMergeOperatorInfo));
96,395!
581
  SOperatorInfo*              pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
96,399!
582
  SDataBlockDescNode*         pDescNode = pPhyNode->pOutputDataBlockDesc;
96,395✔
583
  if (pInfo == NULL || pOperator == NULL) {
96,395!
584
    code = terrno;
×
585
    goto _error;
×
586
  }
587

588
  pInfo->groupMerge = pMergePhyNode->groupSort;
96,398✔
589
  pInfo->ignoreGroupId = pMergePhyNode->ignoreGroupId;
96,398✔
590
  pInfo->binfo.inputTsOrder = pMergePhyNode->node.inputTsOrder;
96,398✔
591
  pInfo->binfo.outputTsOrder = pMergePhyNode->node.outputTsOrder;
96,398✔
592
  pInfo->inputWithGroupId = pMergePhyNode->inputWithGroupId;
96,398✔
593

594
  pInfo->type = pMergePhyNode->type;
96,398✔
595
  switch (pInfo->type) {
96,398!
596
    case MERGE_TYPE_SORT: {
96,058✔
597
      SSortMergeInfo* pSortMergeInfo = &pInfo->sortMergeInfo;
96,058✔
598
      initLimitInfo(pMergePhyNode->node.pLimit, pMergePhyNode->node.pSlimit, &pInfo->limitInfo);
96,058✔
599
      pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
96,058✔
600
      TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
96,060!
601

602
      SPhysiNode*  pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
96,060✔
603
      SSDataBlock* pInputBlock = createDataBlockFromDescNode(pChildNode->pOutputDataBlockDesc);
96,060✔
604
      TSDB_CHECK_NULL(pInputBlock, code, lino, _error, terrno);
96,061!
605
      pSortMergeInfo->pInputBlock = pInputBlock;
96,061✔
606

607
      initResultSizeInfo(&pOperator->resultInfo, 1024);
96,061✔
608
      code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
96,055✔
609
      TSDB_CHECK_CODE(code, lino, _error);
96,061!
610

611
      size_t  numOfCols = taosArrayGetSize(pInfo->binfo.pRes->pDataBlock);
96,061✔
612
      int32_t rowSize = pInfo->binfo.pRes->info.rowSize;
96,061✔
613
      int32_t numOfOutputCols = 0;
96,061✔
614
      pSortMergeInfo->pSortInfo = createSortInfo(pMergePhyNode->pMergeKeys);
96,061✔
615
      pSortMergeInfo->bufPageSize = getProperSortPageSize(rowSize, numOfCols);
96,061✔
616
      pSortMergeInfo->sortBufSize =
96,060✔
617
          pSortMergeInfo->bufPageSize * (numStreams + 1);  // one additional is reserved for merged result.
96,060✔
618
      code = extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID,
96,060✔
619
                                 &pSortMergeInfo->matchInfo);
620
      if (code != TSDB_CODE_SUCCESS) {
96,051!
621
        goto _error;
×
622
      }
623
      break;
96,051✔
624
    }
625
    case MERGE_TYPE_NON_SORT: {
89✔
626
      SNonSortMergeInfo* pNonSortMerge = &pInfo->nsortMergeInfo;
89✔
627
      pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
89✔
628
      TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
89!
629

630
      initResultSizeInfo(&pOperator->resultInfo, 1024);
89✔
631
      code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
89✔
632
      TSDB_CHECK_CODE(code, lino, _error);
89!
633

634
      break;
89✔
635
    }
636
    case MERGE_TYPE_COLUMNS: {
254✔
637
      SColsMergeInfo* pColsMerge = &pInfo->colsMergeInfo;
254✔
638
      pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
254✔
639
      TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
254!
640

641
      initResultSizeInfo(&pOperator->resultInfo, 1);
254✔
642
      code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
254✔
643
      TSDB_CHECK_CODE(code, lino, _error);
254!
644

645
      pColsMerge->pTargets = pMergePhyNode->pTargets;
254✔
646
      pColsMerge->sourceNum = numStreams;
254✔
647
      pColsMerge->srcBlkIds = taosMemoryCalloc(numStreams, sizeof(uint64_t));
254!
648
      for (size_t i = 0; i < numStreams; ++i) {
762✔
649
        pColsMerge->srcBlkIds[i] = getOperatorResultBlockId(downStreams[i], 0);
508✔
650
      }
651
      break;
254✔
652
    }
UNCOV
653
    default:
×
UNCOV
654
      qError("Invalid merge type: %d", pInfo->type);
×
655
      code = TSDB_CODE_INVALID_PARA;
×
656
      goto _error;
×
657
  }
658

659
  setOperatorInfo(pOperator, "MultiwayMergeOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE, false, OP_NOT_OPENED, pInfo,
96,394✔
660
                  pTaskInfo);
661
  pOperator->fpSet =
662
      createOperatorFpSet(openMultiwayMergeOperator, doMultiwayMerge, NULL, destroyMultiwayMergeOperatorInfo,
96,388✔
663
                          optrDefaultBufFn, getMultiwayMergeExplainExecInfo, optrDefaultGetNextExtFn, NULL);
664

665
  code = appendDownstream(pOperator, downStreams, numStreams);
96,390✔
666
  if (code != TSDB_CODE_SUCCESS) {
96,395!
667
    goto _error;
×
668
  }
669

670
  *pOptrInfo = pOperator;
96,395✔
671
  return TSDB_CODE_SUCCESS;
96,395✔
672

673
_error:
×
674
  if (pInfo != NULL) {
×
675
    destroyMultiwayMergeOperatorInfo(pInfo);
×
676
  }
677
  pTaskInfo->code = code;
×
678
  destroyOperatorAndDownstreams(pOperator, downStreams, numStreams);
×
679
  return code;
×
680
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc