• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4932

19 Jan 2026 12:29PM UTC coverage: 66.646% (-0.1%) from 66.749%
#4932

push

travis-ci

web-flow
chore: upgrade taospy (#34272)

202981 of 304565 relevant lines covered (66.65%)

126831443.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.42
/source/libs/executor/src/mergeoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "operator.h"
19
#include "querytask.h"
20
#include "tdatablock.h"
21

22
typedef struct SSortMergeInfo {
23
  SArray*        pSortInfo;
24
  SSortHandle*   pSortHandle;
25
  STupleHandle*  prefetchedTuple;
26
  int32_t        bufPageSize;
27
  uint32_t       sortBufSize;  // max buffer size for in-memory sort
28
  SSDataBlock*   pIntermediateBlock;   // to hold the intermediate result
29
  SSDataBlock*   pInputBlock;
30
  SColMatchInfo  matchInfo;
31
} SSortMergeInfo;
32

33
/* State of the non-sorting merge path (see openNonSortMergeOperator / doNonSortMerge). */
typedef struct SNonSortMergeInfo {
  int32_t  lastSourceIdx;  // cursor into pSourceStatus, advanced by NON_SORT_NEXT_SRC
  int32_t  sourceWorkIdx;  // entries of pSourceStatus below this index belong to exhausted sources
  int32_t  sourceNum;      // number of downstream operators
  int32_t* pSourceStatus;  // downstream indexes; initialised to 0..sourceNum-1 on open
} SNonSortMergeInfo;
39

40
typedef struct SColsMergeInfo {
41
  SNodeList* pTargets;
42
  size_t     sourceNum;
43
  uint64_t*  srcBlkIds;
44
} SColsMergeInfo;
45

46
typedef struct SMultiwayMergeOperatorInfo {
47
  SOptrBasicInfo binfo;
48
  EMergeType     type;
49
  union {
50
    SSortMergeInfo    sortMergeInfo;
51
    SNonSortMergeInfo nsortMergeInfo;
52
    SColsMergeInfo    colsMergeInfo;
53
  };
54
  SLimitInfo     limitInfo;
55
  bool           groupMerge;
56
  bool           ignoreGroupId;
57
  uint64_t       groupId;
58
  bool           inputWithGroupId;
59
} SMultiwayMergeOperatorInfo;
60

61
/* Forward declarations of the file-local merge routines. */
static int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock);
static int32_t doMultiwayMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
static int32_t doSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
static int32_t doNonSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
static int32_t doColsMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
66

67
/**
 * Block-fetch callback installed on the sort handle by openSortMergeOperator().
 *
 * @param param    operator to pull from; openSortMergeOperator() stores the downstream
 *                 operator in each sort source's `param` field
 * @param ppBlock  receives the block produced by that operator
 * @return 0 on success, otherwise the error code of the fetch or of blockDataCheck()
 */
int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock) {
  SOperatorInfo* pOperator = (SOperatorInfo*)param;
  int32_t        code = pOperator->fpSet.getNextFn(pOperator, ppBlock);
  if (code) {
    qError("failed to get next data block from upstream, %s code:%s", __func__, tstrerror(code));
    // Return right away: validating the block would overwrite the fetch error.
    return code;
  }

  code = blockDataCheck(*ppBlock);
  if (code) {
    qError("failed to check data block got from upstream, %s code:%s", __func__, tstrerror(code));
  }
  return code;
}
79

80
/**
 * Open the sort-merge path: create the sort handle, register every downstream
 * operator as a sort source and open the handle.
 *
 * @param pOperator  the multiway merge operator
 * @return 0 on success, otherwise the first error code encountered
 */
int32_t openSortMergeOperator(SOperatorInfo* pOperator) {
  SExecTaskInfo*              pTaskInfo = pOperator->pTaskInfo;
  SMultiwayMergeOperatorInfo* pMerge = pOperator->info;
  SSortMergeInfo*             pSort = &pMerge->sortMergeInfo;

  int32_t pageCount = pSort->sortBufSize / pSort->bufPageSize;

  pSort->pSortHandle = NULL;
  int32_t code = tsortCreateSortHandle(pSort->pSortInfo, SORT_MULTISOURCE_MERGE, pSort->bufPageSize, pageCount,
                                       pSort->pInputBlock, pTaskInfo->id.str, 0, 0, 0, &pSort->pSortHandle);
  if (code != 0) {
    return code;
  }

  tsortSetFetchRawDataFp(pSort->pSortHandle, sortMergeloadNextDataBlock, NULL, NULL);
  tsortSetCompareGroupId(pSort->pSortHandle, pMerge->groupMerge);

  int32_t i = 0;
  while (i < pOperator->numOfDownstream) {
    SOperatorInfo* pChild = pOperator->pDownstream[i];

    // exchange operators are opened explicitly before being registered as a source
    if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == pChild->operatorType) {
      code = pChild->fpSet._openFn(pChild);
      if (code != 0) {
        return code;
      }
    }

    SSortSource* pSource = taosMemoryCalloc(1, sizeof(SSortSource));
    if (NULL == pSource) {
      return terrno;
    }
    pSource->onlyRef = true;
    pSource->param = pChild;

    // NOTE(review): ownership of pSource when tsortAddSource fails is not visible here - confirm
    code = tsortAddSource(pSort->pSortHandle, pSource);
    if (code != 0) {
      return code;
    }

    ++i;
  }

  return tsortOpen(pSort->pSortHandle);
}
122

123
static int32_t doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity,
57,421,868✔
124
                                 SSDataBlock* p, bool* newgroup) {
125
  SSortMergeInfo* pSortMergeInfo = &pInfo->sortMergeInfo;
57,421,868✔
126
  *newgroup = false;
57,421,868✔
127
  int32_t code = 0;
57,421,868✔
128

129
  while (1) {
2,147,483,647✔
130
    STupleHandle* pTupleHandle = NULL;
2,147,483,647✔
131
    if (pInfo->groupMerge || pInfo->inputWithGroupId) {
2,147,483,647✔
132
      if (pSortMergeInfo->prefetchedTuple == NULL) {
2,147,483,647✔
133
        pTupleHandle = NULL;
2,147,483,647✔
134
        code = tsortNextTuple(pHandle, &pTupleHandle);
2,147,483,647✔
135
        if (code) {
136
          // todo handle error
137
        }
138
      } else {
139
        pTupleHandle = pSortMergeInfo->prefetchedTuple;
15,665,699✔
140
        pSortMergeInfo->prefetchedTuple = NULL;
15,665,699✔
141
        uint64_t gid = tsortGetGroupId(pTupleHandle);
15,665,699✔
142
        if (gid != pInfo->groupId) {
15,665,699✔
143
          *newgroup = true;
15,665,699✔
144
          pInfo->groupId = gid;
15,665,699✔
145
        }
146
      }
147
    } else {
148
      code = tsortNextTuple(pHandle, &pTupleHandle);
2,099,396,707✔
149
      pInfo->groupId = 0;
2,098,144,892✔
150
    }
151

152
    if (pTupleHandle == NULL || (code != 0)) {
2,147,483,647✔
153
      break;
154
    }
155

156
    if (pInfo->groupMerge || pInfo->inputWithGroupId) {
2,147,483,647✔
157
      uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle);
2,147,483,647✔
158
      if (pInfo->groupId == 0 || pInfo->groupId == tupleGroupId) {
2,147,483,647✔
159
        code = appendOneRowToDataBlock(p, pTupleHandle);
2,147,483,647✔
160
        if (code) {
2,147,483,647✔
161
          return code;
×
162
        }
163

164
        p->info.id.groupId = tupleGroupId;
2,147,483,647✔
165
        pInfo->groupId = tupleGroupId;
2,147,483,647✔
166
      } else {
167
        if (p->info.rows == 0) {
15,865,697✔
168
          code = appendOneRowToDataBlock(p, pTupleHandle);
×
169
          if (code) {
×
170
            return code;
×
171
          }
172

173
          p->info.id.groupId = pInfo->groupId = tupleGroupId;
×
174
        } else {
175
          pSortMergeInfo->prefetchedTuple = pTupleHandle;
15,865,697✔
176
          break;
15,865,697✔
177
        }
178
      }
179
    } else {
180
      code = appendOneRowToDataBlock(p, pTupleHandle);
2,085,046,331✔
181
      if (code) {
2,085,887,283✔
182
        return code;
×
183
      }
184
    }
185

186
    if (p->info.rows >= capacity) {
2,147,483,647✔
187
      break;
13,950,071✔
188
    }
189
  }
190

191
  return code;
57,421,868✔
192
}
193

194
/**
 * Produce the next result block of the sort-merge path.
 *
 * Rows are first collected into the intermediate block, limit/offset is applied,
 * and the surviving rows are copied into the operator's result block following
 * the column match info.
 *
 * @param pOperator  the multiway merge operator
 * @param pResBlock  set to the result block, or to NULL when it holds no rows; left
 *                   untouched on the early error returns
 * @return 0 on success, otherwise an error code
 */
int32_t doSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
  SExecTaskInfo*              pTaskInfo = pOperator->pTaskInfo;
  SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
  SSortMergeInfo*             pSortMergeInfo = &pInfo->sortMergeInfo;
  SSortHandle*                pHandle = pSortMergeInfo->pSortHandle;
  SSDataBlock*                pDataBlock = pInfo->binfo.pRes;
  SArray*                     pColMatchInfo = pSortMergeInfo->matchInfo.pList;
  int32_t                     capacity = pOperator->resultInfo.capacity;
  int32_t                     code = 0;
  bool                        newgroup = false;

  qDebug("start to merge final sorted rows, %s", GET_TASKID(pTaskInfo));
  blockDataCleanup(pDataBlock);

  if (pSortMergeInfo->pIntermediateBlock == NULL) {
    // first call: obtain the intermediate block from the sort handle
    code = tsortGetSortedDataBlock(pHandle, &pSortMergeInfo->pIntermediateBlock);
    if (pSortMergeInfo->pIntermediateBlock == NULL || code != 0) {
      return code;
    }

    code = blockDataEnsureCapacity(pSortMergeInfo->pIntermediateBlock, capacity);
    if (code) {
      return code;
    }
  } else {
    blockDataCleanup(pSortMergeInfo->pIntermediateBlock);
  }

  SSDataBlock* p = pSortMergeInfo->pIntermediateBlock;
  while (1) {
    code = doGetSortedBlockData(pInfo, pHandle, capacity, p, &newgroup);
    if (code) {
      return code;
    }

    if (p->info.rows == 0) {
      break;
    }

    if (newgroup) {
      resetLimitInfoForNextGroup(&pInfo->limitInfo);
    }

    // The return value is intentionally ignored: the loop only stops on a
    // non-empty block or when no more rows can be fetched.
    (void)applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo);

    if (p->info.rows > 0) {
      break;
    }
  }

  if (p->info.rows > 0) {
    int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
    for (int32_t i = 0; i < numOfCols; ++i) {
      SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i);
      if (pmInfo == NULL) {
        code = terrno;
        return code;
      }

      SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
      if (pSrc == NULL) {
        code = terrno;
        return code;
      }

      SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId);
      if (pDst == NULL) {
        code = terrno;
        return code;
      }

      code = colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info);
      if (code) {
        return code;
      }
    }

    pDataBlock->info.rows = p->info.rows;
    pDataBlock->info.scanFlag = p->info.scanFlag;
    if (pInfo->ignoreGroupId) {
      pDataBlock->info.id.groupId = 0;
    } else {
      pDataBlock->info.id.groupId = pInfo->groupId;
    }
    pDataBlock->info.dataLoad = 1;
  }

  qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%" PRId64 , GET_TASKID(pTaskInfo), pDataBlock->info.id.groupId,
         pDataBlock->info.rows);

  *pResBlock = (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
  return code;
}
289

290
/**
 * Return a heap-allocated copy of the sort handle's execution info for EXPLAIN.
 *
 * @param pOptr         the multiway merge operator
 * @param pOptrExplain  receives the allocated SSortExecInfo
 * @param len           receives sizeof(SSortExecInfo)
 * @return TSDB_CODE_SUCCESS, or terrno when the allocation fails (also stored in the task code)
 */
int32_t getSortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  SSortExecInfo* pExecInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo));
  if (NULL == pExecInfo) {
    pOptr->pTaskInfo->code = terrno;
    return terrno;
  }

  SMultiwayMergeOperatorInfo* pMerge = (SMultiwayMergeOperatorInfo*)pOptr->info;
  *pExecInfo = tsortGetSortExecInfo(pMerge->sortMergeInfo.pSortHandle);

  *pOptrExplain = pExecInfo;
  *len = sizeof(SSortExecInfo);
  return TSDB_CODE_SUCCESS;
}
306

307

308
void destroySortMergeOperatorInfo(void* param) {
16,830,699✔
309
  SSortMergeInfo* pSortMergeInfo = param;
16,830,699✔
310
  blockDataDestroy(pSortMergeInfo->pInputBlock);
16,830,699✔
311
  pSortMergeInfo->pInputBlock = NULL;
16,830,699✔
312

313
  blockDataDestroy(pSortMergeInfo->pIntermediateBlock);
16,830,699✔
314
  pSortMergeInfo->pIntermediateBlock = NULL;
16,830,699✔
315

316
  taosArrayDestroy(pSortMergeInfo->matchInfo.pList);
16,830,699✔
317

318
  tsortDestroySortHandle(pSortMergeInfo->pSortHandle);
16,830,699✔
319
  taosArrayDestroy(pSortMergeInfo->pSortInfo);
16,830,699✔
320
}
16,830,198✔
321

322
/*
 * Pre-increments _idx in place and yields the new value; once the incremented
 * value reaches (_info)->sourceNum it yields (_info)->sourceWorkIdx instead
 * (while _idx keeps the incremented value). _idx is evaluated more than once
 * and modified, so it must be a plain lvalue.
 */
#define NON_SORT_NEXT_SRC(_info, _idx) ((++(_idx) >= (_info)->sourceNum) ? ((_info)->sourceWorkIdx) : (_idx))
323

324
/**
 * Open the non-sorting merge path: reset the cursors and allocate the source
 * index table, filled with 0..numOfDownstream-1.
 *
 * @param pOperator  the multiway merge operator
 * @return 0 on success, or terrno when the allocation fails (also stored in the task code)
 */
int32_t openNonSortMergeOperator(SOperatorInfo* pOperator) {
  SExecTaskInfo*              pTaskInfo = pOperator->pTaskInfo;
  SMultiwayMergeOperatorInfo* pMerge = pOperator->info;
  SNonSortMergeInfo*          pNs = &pMerge->nsortMergeInfo;

  pNs->lastSourceIdx = -1;
  pNs->sourceWorkIdx = 0;
  pNs->sourceNum = pOperator->numOfDownstream;

  pNs->pSourceStatus = taosMemoryCalloc(pOperator->numOfDownstream, sizeof(*pNs->pSourceStatus));
  if (pNs->pSourceStatus == NULL) {
    pTaskInfo->code = terrno;
    return pTaskInfo->code;
  }

  int32_t i = 0;
  while (i < pOperator->numOfDownstream) {
    pNs->pSourceStatus[i] = i;
    ++i;
  }

  return 0;
}
345

346
/**
 * Produce the next block of the non-sorting merge path.
 *
 * Sources are read through the pSourceStatus index table. A source that returns
 * no block is swapped into the exhausted prefix (below sourceWorkIdx) and the
 * next one is tried; a source that returns a block is read again on the next call.
 *
 * @param pOperator  the multiway merge operator
 * @param pResBlock  set to the operator's result block holding a copy of the fetched
 *                   block; not assigned after QRY_PARAM_CHECK when no source returns one
 * @return 0, or the error code of copyDataBlock()
 */
int32_t doNonSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
  QRY_PARAM_CHECK(pResBlock);

  SExecTaskInfo*              pTaskInfo = pOperator->pTaskInfo;
  SMultiwayMergeOperatorInfo* pMerge = pOperator->info;
  SNonSortMergeInfo*          pNs = &pMerge->nsortMergeInfo;
  SSDataBlock*                pOut = pMerge->binfo.pRes;
  SSDataBlock*                pFetched = NULL;
  int32_t                     code = 0;

  qDebug("start to merge no sorted rows, %s", GET_TASKID(pTaskInfo));

  int32_t idx = NON_SORT_NEXT_SRC(pNs, pNs->lastSourceIdx);
  while (idx < pNs->sourceNum) {
    pFetched = getNextBlockFromDownstream(pOperator, pNs->pSourceStatus[idx]);
    if (pFetched != NULL) {
      // step the cursor back so the next call re-reads the same source
      pNs->lastSourceIdx = idx - 1;
      break;
    }

    // source exhausted: move it into the finished prefix and try the next one
    TSWAP(pNs->pSourceStatus[pNs->sourceWorkIdx], pNs->pSourceStatus[idx]);
    pNs->sourceWorkIdx++;
    idx = NON_SORT_NEXT_SRC(pNs, pNs->lastSourceIdx);
  }

  if (pFetched == NULL) {  // null data
    return code;
  }

  code = copyDataBlock(pOut, pFetched);
  *pResBlock = pOut;
  return code;
}
380

381
void destroyNonSortMergeOperatorInfo(void* param) {
934✔
382
  SNonSortMergeInfo* pNonSortMerge = param;
934✔
383
  taosMemoryFree(pNonSortMerge->pSourceStatus);
934✔
384
}
934✔
385

386
/**
 * EXPLAIN hook of the non-sorting merge path; it has no execution info to report.
 *
 * The out-parameters are set to "nothing" explicitly so a caller never reads
 * stale or uninitialised values after a successful return.
 *
 * @param pOptr         unused
 * @param pOptrExplain  set to NULL when non-NULL
 * @param len           set to 0 when non-NULL
 * @return TSDB_CODE_SUCCESS
 */
int32_t getNonSortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  (void)pOptr;
  if (pOptrExplain != NULL) {
    *pOptrExplain = NULL;
  }
  if (len != NULL) {
    *len = 0;
  }
  return TSDB_CODE_SUCCESS;
}
389

390

391
/**
 * Open hook of the column-merge path; there is nothing to prepare.
 *
 * @param pOperator  unused
 * @return TSDB_CODE_SUCCESS
 */
int32_t openColsMergeOperator(SOperatorInfo* pOperator) {
  (void)pOperator;
  return TSDB_CODE_SUCCESS;
}
394

395
/**
 * Produce the single result block of the column-merge path by copying the
 * target columns of every downstream block into the operator's result block.
 *
 * When the operator carries a get-param, the row count comes from its winNum and
 * downstreams are read through getNextExtFn with their per-downstream params;
 * otherwise one row is produced and downstreams are read with
 * getNextBlockFromDownstream().
 *
 * @param pOperator  the multiway merge operator
 * @param pResBlock  set to the result block unless every downstream returned no block
 * @return 0 on success, otherwise the first error code encountered
 */
int32_t doColsMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
  QRY_PARAM_CHECK(pResBlock);

  SExecTaskInfo*              pTaskInfo = pOperator->pTaskInfo;
  SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
  SColsMergeInfo*             pCols = &pInfo->colsMergeInfo;
  SSDataBlock*                pRes = pInfo->binfo.pRes;
  SSDataBlock*                pSrc = NULL;
  STimeWindow                 win = {.skey = INT64_MAX, .ekey = INT64_MIN};
  const bool                  isVtableMerge = (pOperator->pOperatorGetParam != NULL);
  bool                        gotBlock = false;
  int32_t                     numOfRows = 1;
  int32_t                     code = 0;
  int32_t                     lino = 0;

  if (isVtableMerge) {
    if (pOperator->status == OP_EXEC_DONE) {
      pOperator->status = OP_OPENED;
    }
    numOfRows = ((SMergeOperatorParam*)(pOperator->pOperatorGetParam)->value)->winNum;
    freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
    pOperator->pOperatorGetParam = NULL;
  }

  qDebug("start to merge columns, %s", GET_TASKID(pTaskInfo));

  blockDataCleanup(pRes);
  code = blockDataEnsureCapacity(pRes, numOfRows);
  QUERY_CHECK_CODE(code, lino, _return);

  for (int32_t i = 0; i < pCols->sourceNum; ++i) {
    if (!isVtableMerge) {
      pSrc = getNextBlockFromDownstream(pOperator, i);
      if (pSrc != NULL) {
        code = copyColumnsValue(pCols->pTargets, pCols->srcBlkIds[i], pRes, pSrc, numOfRows);
        QUERY_CHECK_CODE(code, lino, _return);
        gotBlock = true;
      }
      continue;
    }

    SOperatorInfo*  pExtWinOp = pOperator->pDownstream[i];
    SOperatorParam* pParam = pOperator->pDownstreamGetParams[i];
    code = pExtWinOp->fpSet.getNextExtFn(pExtWinOp, pParam, &pSrc);
    setOperatorCompleted(pExtWinOp);
    pOperator->pDownstreamGetParams[i] = NULL;
    QUERY_CHECK_CODE(code, lino, _return);

    if (pSrc != NULL) {
      printDataBlock(pSrc, __func__, "colsMerge", pTaskInfo->id.queryId);
    }

    // copyColumnsValue is invoked for a missing block as well (pSrc == NULL)
    code = copyColumnsValue(pCols->pTargets, pCols->srcBlkIds[i], pRes, pSrc, numOfRows);
    QUERY_CHECK_CODE(code, lino, _return);

    if (pSrc != NULL) {
      win.skey = TMIN(win.skey, pSrc->info.window.skey);
      win.ekey = TMAX(win.ekey, pSrc->info.window.ekey);
      gotBlock = true;
    }
  }

  setOperatorCompleted(pOperator);
  if (!gotBlock) {
    return code;
  }

  pRes->info.window.skey = win.skey;
  pRes->info.window.ekey = win.ekey;
  pRes->info.rows = numOfRows;
  *pResBlock = pRes;

  return code;
_return:
  qError("failed to merge columns, line:%d code:%s", lino, tstrerror(code));
  return code;
}
473

474
void destroyColsMergeOperatorInfo(void* param) {
315,828✔
475
  SColsMergeInfo* pColsMergeInfo = param;
315,828✔
476
  taosMemoryFreeClear(pColsMergeInfo->srcBlkIds);
315,828✔
477
}
315,828✔
478

479
/**
 * EXPLAIN hook of the column-merge path; it has no execution info to report.
 *
 * The out-parameters are set to "nothing" explicitly so a caller never reads
 * stale or uninitialised values after a successful return.
 *
 * @param pOptr         unused
 * @param pOptrExplain  set to NULL when non-NULL
 * @param len           set to 0 when non-NULL
 * @return TSDB_CODE_SUCCESS
 */
int32_t getColsMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  (void)pOptr;
  if (pOptrExplain != NULL) {
    *pOptrExplain = NULL;
  }
  if (len != NULL) {
    *len = 0;
  }
  return TSDB_CODE_SUCCESS;
}
482

483

484
SOperatorFpSet gMultiwayMergeFps[MERGE_TYPE_MAX_VALUE] = {
485
  {0},
486
  {._openFn = openSortMergeOperator, .getNextFn = doSortMerge, .closeFn = destroySortMergeOperatorInfo, .getExplainFn = getSortMergeExplainExecInfo},
487
  {._openFn = openNonSortMergeOperator, .getNextFn = doNonSortMerge, .closeFn = destroyNonSortMergeOperatorInfo, .getExplainFn = getNonSortMergeExplainExecInfo},
488
  {._openFn = openColsMergeOperator, .getNextFn = doColsMerge, .closeFn = destroyColsMergeOperatorInfo, .getExplainFn = getColsMergeExplainExecInfo},
489
};
490

491

492
/**
 * Open the merge operator once by dispatching to the type-specific open
 * function, and record the time spent in cost.openCost.
 *
 * On failure the error is stored in the task and control leaves through
 * T_LONG_JMP with that same error code.
 *
 * @param pOperator  the multiway merge operator
 * @return TSDB_CODE_SUCCESS (also when the operator was already opened)
 */
int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) {
  int32_t                     code = 0;
  SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*              pTaskInfo = pOperator->pTaskInfo;

  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  int64_t startTs = taosGetTimestampUs();

  if (NULL != gMultiwayMergeFps[pInfo->type]._openFn) {
    code = (*gMultiwayMergeFps[pInfo->type]._openFn)(pOperator);
  }

  pOperator->cost.openCost = (taosGetTimestampUs() - startTs) / 1000.0;
  pOperator->status = OP_RES_TO_RETURN;

  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    pTaskInfo->code = code;
    // Jump with the code that actually failed: terrno may be 0 or hold an
    // unrelated error when the open function reported its failure by return value.
    T_LONG_JMP(pTaskInfo->env, code);
  }

  OPTR_SET_OPENED(pOperator);
  return code;
}
519

520
/**
 * getNext entry point of the merge operator: opens the operator if needed,
 * dispatches to the type-specific getNext function, and validates the block.
 *
 * When no block is produced the operator is marked completed. On failure the
 * error is stored in the task and control leaves through T_LONG_JMP.
 *
 * @param pOperator  the multiway merge operator
 * @param pResBlock  receives the produced block
 * @return 0 on success
 */
int32_t doMultiwayMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
  QRY_PARAM_CHECK(pResBlock);
  int32_t lino = 0;
  int32_t code = TSDB_CODE_SUCCESS;

  // a finished operator is only re-run when a new get-param is pending
  const bool finished = (OP_EXEC_DONE == pOperator->status) && (NULL == pOperator->pOperatorGetParam);
  if (finished) {
    return 0;
  }

  SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*              pTaskInfo = pOperator->pTaskInfo;

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);

  SOperatorFpSet* pFps = &gMultiwayMergeFps[pInfo->type];
  if (pFps->getNextFn != NULL) {
    code = (*pFps->getNextFn)(pOperator, pResBlock);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  if (NULL == (*pResBlock)) {
    setOperatorCompleted(pOperator);
  } else {
    pOperator->resultInfo.totalRows += (*pResBlock)->info.rows;
    code = blockDataCheck(*pResBlock);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
556

557
void destroyMultiwayMergeOperatorInfo(void* param) {
17,147,461✔
558
  SMultiwayMergeOperatorInfo* pInfo = (SMultiwayMergeOperatorInfo*)param;
17,147,461✔
559
  blockDataDestroy(pInfo->binfo.pRes);
17,147,461✔
560
  pInfo->binfo.pRes = NULL;
17,147,461✔
561

562
  if (NULL != gMultiwayMergeFps[pInfo->type].closeFn) {
17,147,461✔
563
    (*gMultiwayMergeFps[pInfo->type].closeFn)(&pInfo->sortMergeInfo);
17,147,461✔
564
  }
565

566
  taosMemoryFreeClear(param);
17,146,960✔
567
}
17,147,461✔
568

569
/**
 * EXPLAIN hook of the merge operator: forwards to the type-specific hook.
 *
 * @param pOptr         the multiway merge operator
 * @param pOptrExplain  passed through to the type-specific hook
 * @param len           passed through to the type-specific hook
 * @return the hook's return code, or 0 when the type has no hook
 */
int32_t getMultiwayMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  SMultiwayMergeOperatorInfo* pMerge = (SMultiwayMergeOperatorInfo*)pOptr->info;

  if (gMultiwayMergeFps[pMerge->type].getExplainFn == NULL) {
    return 0;
  }
  return (*gMultiwayMergeFps[pMerge->type].getExplainFn)(pOptr, pOptrExplain, len);
}
579

580
/**
 * Reset the merge operator to its not-opened state so it can be opened and run again.
 *
 * The type-specific open function re-creates whatever is released here.
 *
 * @param pOper  the multiway merge operator
 * @return 0
 */
static int32_t resetMultiwayMergeOperState(SOperatorInfo* pOper) {
  SMultiwayMergeOperatorInfo* pInfo = pOper->info;
  SMergePhysiNode*            pPhynode = (SMergePhysiNode*)pOper->pPhyNode;
  pOper->status = OP_NOT_OPENED;

  resetBasicOperatorState(&pInfo->binfo);
  pInfo->groupId = 0;

  OPTR_CLR_OPENED(pOper);

  switch (pInfo->type) {
    case MERGE_TYPE_SORT: {
      SSortMergeInfo* pSortMergeInfo = &pInfo->sortMergeInfo;

      blockDataCleanup(pSortMergeInfo->pInputBlock);

      blockDataDestroy(pSortMergeInfo->pIntermediateBlock);
      pSortMergeInfo->pIntermediateBlock = NULL;

      tsortDestroySortHandle(pSortMergeInfo->pSortHandle);
      pSortMergeInfo->pSortHandle = NULL;
      pSortMergeInfo->prefetchedTuple = NULL;

      pInfo->limitInfo = (SLimitInfo){0};
      initLimitInfo(pPhynode->node.pLimit, pPhynode->node.pSlimit, &pInfo->limitInfo);
      break;
    }
    case MERGE_TYPE_NON_SORT: {
      // openNonSortMergeOperator() allocates a fresh table on re-open; release
      // the current one first, otherwise zeroing the struct leaks it.
      taosMemoryFreeClear(pInfo->nsortMergeInfo.pSourceStatus);
      pInfo->nsortMergeInfo = (SNonSortMergeInfo){0};
      break;
    }
    case MERGE_TYPE_COLUMNS: {
      break;
    }
    default:
      qError("Invalid merge type: %d", pInfo->type);
  }

  return 0;
}
622

623
int32_t createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numStreams, SMergePhysiNode* pMergePhyNode,
17,147,461✔
624
                                        SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
625
  QRY_PARAM_CHECK(pOptrInfo);
17,147,461✔
626

627
  SPhysiNode*                 pPhyNode = (SPhysiNode*)pMergePhyNode;
17,147,461✔
628
  int32_t                     lino = 0;
17,147,461✔
629
  int32_t                     code = TSDB_CODE_SUCCESS;
17,147,461✔
630
  SMultiwayMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMultiwayMergeOperatorInfo));
17,147,461✔
631
  SOperatorInfo*              pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
17,147,461✔
632
  SDataBlockDescNode*         pDescNode = pPhyNode->pOutputDataBlockDesc;
17,147,461✔
633
  if (pInfo == NULL || pOperator == NULL) {
17,147,461✔
634
    code = terrno;
×
635
    goto _error;
×
636
  }
637

638
  pOperator->pPhyNode = pPhyNode;
17,147,461✔
639
  pInfo->groupMerge = pMergePhyNode->groupSort;
17,147,461✔
640
  pInfo->ignoreGroupId = pMergePhyNode->ignoreGroupId;
17,147,461✔
641
  pInfo->binfo.inputTsOrder = pMergePhyNode->node.inputTsOrder;
17,147,461✔
642
  pInfo->binfo.outputTsOrder = pMergePhyNode->node.outputTsOrder;
17,147,461✔
643
  pInfo->inputWithGroupId = pMergePhyNode->inputWithGroupId;
17,147,461✔
644

645
  pInfo->type = pMergePhyNode->type;
17,147,461✔
646
  switch (pInfo->type) {
17,147,461✔
647
    case MERGE_TYPE_SORT: {
16,830,699✔
648
      SSortMergeInfo* pSortMergeInfo = &pInfo->sortMergeInfo;
16,830,699✔
649
      initLimitInfo(pMergePhyNode->node.pLimit, pMergePhyNode->node.pSlimit, &pInfo->limitInfo);
16,830,699✔
650
      pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
16,830,699✔
651
      TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
16,830,699✔
652

653
      SPhysiNode*  pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
16,830,699✔
654
      SSDataBlock* pInputBlock = createDataBlockFromDescNode(pChildNode->pOutputDataBlockDesc);
16,830,699✔
655
      TSDB_CHECK_NULL(pInputBlock, code, lino, _error, terrno);
16,830,699✔
656
      pSortMergeInfo->pInputBlock = pInputBlock;
16,830,699✔
657

658
      initResultSizeInfo(&pOperator->resultInfo, 1024);
16,830,699✔
659
      code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
16,830,699✔
660
      TSDB_CHECK_CODE(code, lino, _error);
16,830,699✔
661

662
      size_t  numOfCols = taosArrayGetSize(pInfo->binfo.pRes->pDataBlock);
16,830,699✔
663
      int32_t rowSize = pInfo->binfo.pRes->info.rowSize;
16,830,699✔
664
      int32_t numOfOutputCols = 0;
16,830,699✔
665
      pSortMergeInfo->pSortInfo = createSortInfo(pMergePhyNode->pMergeKeys);
16,830,699✔
666
      pSortMergeInfo->bufPageSize = getProperSortPageSize(rowSize, numOfCols);
16,830,699✔
667
      pSortMergeInfo->sortBufSize =
16,830,699✔
668
          pSortMergeInfo->bufPageSize * (numStreams + 1);  // one additional is reserved for merged result.
16,830,699✔
669
      code = extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID,
16,830,699✔
670
                                 &pSortMergeInfo->matchInfo);
671
      if (code != TSDB_CODE_SUCCESS) {
16,830,699✔
672
        goto _error;
×
673
      }
674
      break;
16,830,699✔
675
    }
676
    case MERGE_TYPE_NON_SORT: {
934✔
677
      SNonSortMergeInfo* pNonSortMerge = &pInfo->nsortMergeInfo;
934✔
678
      pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
934✔
679
      TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
934✔
680

681
      initResultSizeInfo(&pOperator->resultInfo, 1024);
934✔
682
      code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
934✔
683
      TSDB_CHECK_CODE(code, lino, _error);
934✔
684

685
      break;
934✔
686
    }
687
    case MERGE_TYPE_COLUMNS: {
315,828✔
688
      SColsMergeInfo* pColsMerge = &pInfo->colsMergeInfo;
315,828✔
689
      pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
315,828✔
690
      TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
315,828✔
691

692
      initResultSizeInfo(&pOperator->resultInfo, 1);
315,828✔
693
      code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
315,828✔
694
      TSDB_CHECK_CODE(code, lino, _error);
315,828✔
695

696
      pColsMerge->pTargets = pMergePhyNode->pTargets;
315,828✔
697
      pColsMerge->sourceNum = numStreams;
315,828✔
698
      pColsMerge->srcBlkIds = taosMemoryCalloc(numStreams, sizeof(uint64_t));
315,828✔
699
      for (size_t i = 0; i < numStreams; ++i) {
1,058,364✔
700
        pColsMerge->srcBlkIds[i] = getOperatorResultBlockId(downStreams[i], 0);
742,536✔
701
      }
702
      break;
315,828✔
703
    }
704
    default:
×
705
      qError("Invalid merge type: %d", pInfo->type);
×
706
      code = TSDB_CODE_INVALID_PARA;
×
707
      goto _error;
×
708
  }
709

710
  setOperatorInfo(pOperator, "MultiwayMergeOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE, false, OP_NOT_OPENED, pInfo,
17,147,461✔
711
                  pTaskInfo);
712
  pOperator->fpSet =
713
      createOperatorFpSet(openMultiwayMergeOperator, doMultiwayMerge, NULL, destroyMultiwayMergeOperatorInfo,
17,147,461✔
714
                          optrDefaultBufFn, getMultiwayMergeExplainExecInfo, optrDefaultGetNextExtFn, NULL);
715

716
  setOperatorResetStateFn(pOperator, resetMultiwayMergeOperState);
17,147,461✔
717
  code = appendDownstream(pOperator, downStreams, numStreams);
17,147,461✔
718
  if (code != TSDB_CODE_SUCCESS) {
17,147,461✔
719
    goto _error;
×
720
  }
721

722
  *pOptrInfo = pOperator;
17,147,461✔
723
  return TSDB_CODE_SUCCESS;
17,147,461✔
724

725
_error:
×
726
  if (pInfo != NULL) {
×
727
    destroyMultiwayMergeOperatorInfo(pInfo);
×
728
  }
729
  pTaskInfo->code = code;
×
730
  destroyOperatorAndDownstreams(pOperator, downStreams, numStreams);
×
731
  return code;
×
732
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc