• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5016

03 Apr 2026 03:59PM UTC coverage: 72.299% (+0.01%) from 72.289%
#5016

push

travis-ci

web-flow
merge: from main to 3.0 branch #35067

4055 of 5985 new or added lines in 68 files covered. (67.75%)

13126 existing lines in 156 files now uncovered.

257424 of 356056 relevant lines covered (72.3%)

133108577.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.07
/source/libs/executor/src/mergeoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "operator.h"
19
#include "querytask.h"
20
#include "tdatablock.h"
21

22
typedef struct SSortMergeInfo {
23
  SArray*        pSortInfo;
24
  SSortHandle*   pSortHandle;
25
  STupleHandle*  prefetchedTuple;
26
  int32_t        bufPageSize;
27
  uint32_t       sortBufSize;  // max buffer size for in-memory sort
28
  SSDataBlock*   pIntermediateBlock;   // to hold the intermediate result
29
  SSDataBlock*   pInputBlock;
30
  SColMatchInfo  matchInfo;
31
} SSortMergeInfo;
32

33
typedef struct SNonSortMergeInfo {
34
  int32_t  lastSourceIdx;
35
  int32_t  sourceWorkIdx;
36
  int32_t  sourceNum;
37
  int32_t* pSourceStatus;
38
} SNonSortMergeInfo;
39

40
typedef struct SColsMergeInfo {
41
  SNodeList* pTargets;
42
  size_t     sourceNum;
43
  int64_t*   srcBlkIds;
44
} SColsMergeInfo;
45

46
typedef struct SMultiwayMergeOperatorInfo {
47
  SOptrBasicInfo binfo;
48
  EMergeType     type;
49
  union {
50
    SSortMergeInfo    sortMergeInfo;
51
    SNonSortMergeInfo nsortMergeInfo;
52
    SColsMergeInfo    colsMergeInfo;
53
  };
54
  SLimitInfo     limitInfo;
55
  bool           groupMerge;
56
  bool           ignoreGroupId;
57
  uint64_t       groupId;
58
  bool           inputWithGroupId;
59
} SMultiwayMergeOperatorInfo;
60

61
static int32_t doSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
62
static int32_t doMultiwayMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
63
static int32_t doNonSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
64
static int32_t doColsMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
65
static int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock);
66

67
int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock) {
95,668,280✔
68
  SOperatorInfo* pOperator = (SOperatorInfo*)param;
95,668,280✔
69
  int32_t        code = TSDB_CODE_SUCCESS;
95,668,280✔
70
  int32_t        lino = 0;
95,668,280✔
71

72
  code = pOperator->fpSet.getNextFn(pOperator, ppBlock);
95,668,280✔
73
  QUERY_CHECK_CODE(code, lino, _return);
95,670,244✔
74

75
  code = blockDataCheck(*ppBlock);
95,670,244✔
76
  QUERY_CHECK_CODE(code, lino, _return);
95,670,628✔
77

78
  return code;
95,670,628✔
79
_return:
×
80
  qError("%s failed to load next data block from upstream, %s line: %d, code:%s", __func__, GET_TASKID(pOperator->pTaskInfo),
×
81
         lino, tstrerror(code));
82
  return code;
×
83
}
84

85
int32_t openSortMergeOperator(SOperatorInfo* pOperator) {
13,392,056✔
86
  int32_t                     code = TSDB_CODE_SUCCESS;
13,392,056✔
87
  int32_t                     lino = 0;
13,392,056✔
88
  SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
13,392,056✔
89
  SExecTaskInfo*              pTaskInfo = pOperator->pTaskInfo;
13,393,753✔
90
  SSortMergeInfo*             pSortMergeInfo = &pInfo->sortMergeInfo;
13,393,362✔
91
  int32_t                     numOfBufPage = (int32_t)pSortMergeInfo->sortBufSize / pSortMergeInfo->bufPageSize;
13,393,223✔
92

93
  pSortMergeInfo->pSortHandle = NULL;
13,391,274✔
94
  code = tsortCreateSortHandle(pSortMergeInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pSortMergeInfo->bufPageSize,
26,780,442✔
95
                               numOfBufPage, pSortMergeInfo->pInputBlock, pTaskInfo->id.str, 0, 0, 0, &pSortMergeInfo->pSortHandle);
13,388,821✔
96
  QUERY_CHECK_CODE(code, lino, _return);
13,389,556✔
97

98
  tsortSetFetchRawDataFp(pSortMergeInfo->pSortHandle, sortMergeloadNextDataBlock, NULL, NULL);
13,389,556✔
99
  tsortSetCompareGroupId(pSortMergeInfo->pSortHandle, pInfo->groupMerge);
13,386,729✔
100

101
  for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
45,057,964✔
102
    SOperatorInfo* pDownstream = pOperator->pDownstream[i];
31,665,105✔
103
    if (pDownstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) {
31,664,040✔
104
      code = pDownstream->fpSet._openFn(pDownstream);
31,666,796✔
105
      QUERY_CHECK_CODE(code, lino, _return);
31,668,249✔
106
      if (pOperator->pDownstreamGetParams && pOperator->pDownstreamGetParams[i]) {
31,668,249✔
107
        pDownstream->pOperatorGetParam = pOperator->pDownstreamGetParams[i];
297,652✔
108
        pOperator->pDownstreamGetParams[i] = NULL;
297,652✔
109
      }
110
    }
111

112
    SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
31,665,882✔
113
    QUERY_CHECK_NULL(ps, code, lino, _return, terrno);
31,666,955✔
114

115
    ps->param = pDownstream;
31,666,955✔
116
    ps->onlyRef = true;
31,666,955✔
117

118
    code = tsortAddSource(pSortMergeInfo->pSortHandle, ps);
31,666,955✔
119
    QUERY_CHECK_CODE(code, lino, _return);
31,667,067✔
120
  }
121

122
  code = tsortOpen(pSortMergeInfo->pSortHandle);
13,393,369✔
123
  QUERY_CHECK_CODE(code, lino, _return);
13,393,134✔
124

125
_return:
13,393,134✔
126
  if (code) {
13,393,134✔
127
    qError("%s failed to open sort merge operator, %s at line %d, code:%s", __func__, GET_TASKID(pTaskInfo), lino, tstrerror(code));
×
128
  }
129
  return code;
13,392,366✔
130
}
131

132
static int32_t doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity,
46,351,905✔
133
                                 SSDataBlock* p, bool* newgroup) {
134
  SSortMergeInfo* pSortMergeInfo = &pInfo->sortMergeInfo;
46,351,905✔
135
  *newgroup = false;
46,351,905✔
136
  int32_t code = 0;
46,351,905✔
137

138
  while (1) {
2,147,483,647✔
139
    STupleHandle* pTupleHandle = NULL;
2,147,483,647✔
140
    if (pInfo->groupMerge || pInfo->inputWithGroupId) {
2,147,483,647✔
141
      if (pSortMergeInfo->prefetchedTuple == NULL) {
2,147,483,647✔
142
        pTupleHandle = NULL;
2,147,483,647✔
143
        code = tsortNextTuple(pHandle, &pTupleHandle);
2,147,483,647✔
144
        if (code) {
145
          // todo handle error
146
        }
147
      } else {
148
        pTupleHandle = pSortMergeInfo->prefetchedTuple;
6,340,135✔
149
        pSortMergeInfo->prefetchedTuple = NULL;
6,340,135✔
150
        uint64_t gid = 0, baseGid = 0;
6,340,135✔
151
        tsortGetGroupId(pTupleHandle, &gid, &baseGid);
6,340,135✔
152
        if (gid != pInfo->groupId) {
6,340,135✔
153
          *newgroup = true;
6,340,135✔
154
          pInfo->groupId = gid;
6,340,135✔
155
        }
156
      }
157
    } else {
158
      code = tsortNextTuple(pHandle, &pTupleHandle);
2,147,483,647✔
159
      pInfo->groupId = 0;
2,147,483,647✔
160
    }
161

162
    if (pTupleHandle == NULL || (code != 0)) {
2,147,483,647✔
163
      break;
164
    }
165

166
    if (pInfo->groupMerge || pInfo->inputWithGroupId) {
2,147,483,647✔
167
      uint64_t tupleGroupId = 0, tupleBaseGid = 0;
2,147,483,647✔
168
      tsortGetGroupId(pTupleHandle, &tupleGroupId, &tupleBaseGid);
2,147,483,647✔
169
      if (pInfo->groupId == 0 || pInfo->groupId == tupleGroupId) {
2,147,483,647✔
170
        code = appendOneRowToDataBlock(p, pTupleHandle);
2,147,483,647✔
171
        if (code) {
2,147,483,647✔
UNCOV
172
          return code;
×
173
        }
174

175
        p->info.id.groupId = tupleGroupId;
2,147,483,647✔
176
        p->info.id.baseGId = tupleBaseGid;
2,147,483,647✔
177
        pInfo->groupId = tupleGroupId;
2,147,483,647✔
178
      } else {
179
        if (p->info.rows == 0) {
6,363,456✔
UNCOV
180
          code = appendOneRowToDataBlock(p, pTupleHandle);
×
UNCOV
181
          if (code) {
×
UNCOV
182
            return code;
×
183
          }
184

185
          p->info.id.groupId = pInfo->groupId = tupleGroupId;
×
NEW
186
          p->info.id.baseGId = tupleBaseGid;
×
187
        } else {
188
          pSortMergeInfo->prefetchedTuple = pTupleHandle;
6,363,456✔
189
          break;
6,363,456✔
190
        }
191
      }
192
    } else {
193
      code = appendOneRowToDataBlock(p, pTupleHandle);
2,147,483,647✔
194
      if (code) {
2,147,483,647✔
UNCOV
195
        return code;
×
196
      }
197
    }
198

199
    if (p->info.rows >= capacity) {
2,147,483,647✔
200
      break;
16,915,071✔
201
    }
202
  }
203

204
  return code;
46,351,905✔
205
}
206

207
int32_t doSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
46,334,823✔
208
  SExecTaskInfo*              pTaskInfo = pOperator->pTaskInfo;
46,334,823✔
209
  SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
46,334,823✔
210
  SSortMergeInfo*             pSortMergeInfo = &pInfo->sortMergeInfo;
46,334,040✔
211
  SSortHandle*                pHandle = pSortMergeInfo->pSortHandle;
46,334,439✔
212
  SSDataBlock*                pDataBlock = pInfo->binfo.pRes;
46,334,055✔
213
  SArray*                     pColMatchInfo = pSortMergeInfo->matchInfo.pList;
46,334,040✔
214
  int32_t                     capacity = pOperator->resultInfo.capacity;
46,334,439✔
215
  int32_t                     code = 0;
46,334,055✔
216
  bool                        newgroup = false;
46,334,055✔
217

218
  qDebug("start to merge final sorted rows, %s", GET_TASKID(pTaskInfo));
46,334,040✔
219
  blockDataCleanup(pDataBlock);
46,334,439✔
220

221
  if (pSortMergeInfo->pIntermediateBlock == NULL) {
46,334,823✔
222
    pSortMergeInfo->pIntermediateBlock = NULL;
13,393,134✔
223
    code = tsortGetSortedDataBlock(pHandle, &pSortMergeInfo->pIntermediateBlock);
13,393,134✔
224
    if (pSortMergeInfo->pIntermediateBlock == NULL || code != 0) {
13,393,134✔
UNCOV
225
      return code;
×
226
    }
227

228
    code = blockDataEnsureCapacity(pSortMergeInfo->pIntermediateBlock, capacity);
13,393,134✔
229
    if (code) {
13,393,134✔
UNCOV
230
      return code;
×
231
    }
232

233
  } else {
234
    blockDataCleanup(pSortMergeInfo->pIntermediateBlock);
32,941,689✔
235
  }
236

237
  SSDataBlock* p = pSortMergeInfo->pIntermediateBlock;
46,334,823✔
238
  while (1) {
17,082✔
239
    code = doGetSortedBlockData(pInfo, pHandle, capacity, p, &newgroup);
46,351,905✔
240
    if (code) {
46,351,905✔
UNCOV
241
      return code;
×
242
    }
243

244
    if (p->info.rows == 0) {
46,351,905✔
245
      break;
10,792,391✔
246
    }
247

248
    if (newgroup) {
35,559,514✔
249
      resetLimitInfoForNextGroup(&pInfo->limitInfo);
6,340,135✔
250
    }
251

252
    bool limitReached = applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo);
35,559,514✔
253

254
    if (p->info.rows > 0) {
35,559,514✔
255
      break;
35,542,432✔
256
    }
257
  }
258

259
  if (p->info.rows > 0) {
46,334,823✔
260
    int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
35,542,432✔
261
    for (int32_t i = 0; i < numOfCols; ++i) {
133,118,946✔
262
      SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i);
97,577,455✔
263
      if (pmInfo == NULL) {
97,577,455✔
UNCOV
264
        code = terrno;
×
UNCOV
265
        return code;
×
266
      }
267

268
      SColumnInfoData* pSrc = getDataBlockColBySlotId(p, pmInfo->srcSlotId, NULL);
97,577,455✔
269
      if (pSrc == NULL) {
97,577,455✔
UNCOV
270
        code = terrno;
×
UNCOV
271
        return code;
×
272
      }
273

274
      SColumnInfoData* pDst = getDataBlockColBySlotId(pDataBlock, pmInfo->dstSlotId, NULL);
97,577,455✔
275
      if (pDst == NULL) {
97,577,455✔
UNCOV
276
        code = terrno;
×
UNCOV
277
        return code;
×
278
      }
279

280
      code = colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info);
97,577,455✔
281
      if (code) {
97,576,514✔
UNCOV
282
        return code;
×
283
      }
284
    }
285

286
    pDataBlock->info.rows = p->info.rows;
35,541,491✔
287
    pDataBlock->info.scanFlag = p->info.scanFlag;
35,541,491✔
288
    if (pInfo->ignoreGroupId) {
35,541,491✔
289
      pDataBlock->info.id.groupId = 0;
3,352✔
290
      pDataBlock->info.id.baseGId = p->info.id.baseGId;
3,352✔
291
    } else {
292
      pDataBlock->info.id.groupId = pInfo->groupId;
35,538,139✔
293
      pDataBlock->info.id.baseGId = p->info.id.baseGId;
35,538,668✔
294
    }
295
    pDataBlock->info.dataLoad = 1;
35,542,020✔
296
  }
297

298
  qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%" PRId64 , GET_TASKID(pTaskInfo), pDataBlock->info.id.groupId,
46,334,411✔
299
         pDataBlock->info.rows);
300

301
  *pResBlock = (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
46,334,411✔
302
  return code;
46,334,823✔
303
}
304

305
int32_t getSortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
479,426✔
306
  SSortExecInfo* pSortExecInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo));
479,426✔
307
  if (pSortExecInfo == NULL) {
479,426✔
UNCOV
308
    pOptr->pTaskInfo->code = terrno;
×
UNCOV
309
    return terrno;
×
310
  }
311

312
  SMultiwayMergeOperatorInfo* pInfo = (SMultiwayMergeOperatorInfo*)pOptr->info;
479,426✔
313
  SSortMergeInfo* pSortMergeInfo = &pInfo->sortMergeInfo;
479,426✔
314

315
  *pSortExecInfo = tsortGetSortExecInfo(pSortMergeInfo->pSortHandle);
479,426✔
316
  *pOptrExplain = pSortExecInfo;
479,825✔
317

318
  *len = sizeof(SSortExecInfo);
479,825✔
319
  return TSDB_CODE_SUCCESS;
479,426✔
320
}
321

322

323
void destroySortMergeOperatorInfo(void* param) {
13,496,493✔
324
  SSortMergeInfo* pSortMergeInfo = param;
13,496,493✔
325
  blockDataDestroy(pSortMergeInfo->pInputBlock);
13,496,493✔
326
  pSortMergeInfo->pInputBlock = NULL;
13,495,964✔
327

328
  blockDataDestroy(pSortMergeInfo->pIntermediateBlock);
13,495,964✔
329
  pSortMergeInfo->pIntermediateBlock = NULL;
13,495,964✔
330

331
  taosArrayDestroy(pSortMergeInfo->matchInfo.pList);
13,495,964✔
332

333
  tsortDestroySortHandle(pSortMergeInfo->pSortHandle);
13,495,964✔
334
  taosArrayDestroy(pSortMergeInfo->pSortInfo);
13,496,493✔
335
}
13,496,090✔
336

337
#define NON_SORT_NEXT_SRC(_info, _idx) ((++(_idx) >= (_info)->sourceNum) ? ((_info)->sourceWorkIdx) : (_idx))
338

339
int32_t openNonSortMergeOperator(SOperatorInfo* pOperator) {
46,676✔
340
  SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
46,676✔
341
  SExecTaskInfo*              pTaskInfo = pOperator->pTaskInfo;
46,676✔
342
  SNonSortMergeInfo*          pNonSortMergeInfo = &pInfo->nsortMergeInfo;
46,676✔
343

344
  pNonSortMergeInfo->sourceWorkIdx = 0;
46,676✔
345
  pNonSortMergeInfo->sourceNum = pOperator->numOfDownstream;
46,676✔
346
  pNonSortMergeInfo->lastSourceIdx = -1;
46,676✔
347
  pNonSortMergeInfo->pSourceStatus =
46,676✔
348
      taosMemoryCalloc(pOperator->numOfDownstream, sizeof(*pNonSortMergeInfo->pSourceStatus));
46,676✔
349
  if (NULL == pNonSortMergeInfo->pSourceStatus) {
46,676✔
UNCOV
350
    pTaskInfo->code = terrno;
×
UNCOV
351
    return pTaskInfo->code;
×
352
  }
353

354
  for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
241,125✔
355
    pNonSortMergeInfo->pSourceStatus[i] = i;
194,449✔
356
  }
357

358
  return 0;
46,676✔
359
}
360

361
int32_t doNonSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
266,149✔
362
  QRY_PARAM_CHECK(pResBlock);
266,149✔
363

364
  SExecTaskInfo*              pTaskInfo = pOperator->pTaskInfo;
266,149✔
365
  SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
266,149✔
366
  SNonSortMergeInfo*          pNonSortMerge = &pInfo->nsortMergeInfo;
266,149✔
367
  SSDataBlock*                pBlock = NULL;
266,149✔
368
  SSDataBlock*                pRes = pInfo->binfo.pRes;
266,149✔
369
  int32_t                     code = 0;
266,149✔
370

371
  qDebug("start to merge no sorted rows, %s", GET_TASKID(pTaskInfo));
266,149✔
372

373
  int32_t idx = NON_SORT_NEXT_SRC(pNonSortMerge, pNonSortMerge->lastSourceIdx);
266,149✔
374
  while (idx < pNonSortMerge->sourceNum) {
460,598✔
375
    pBlock = getNextBlockFromDownstream(pOperator, pNonSortMerge->pSourceStatus[idx]);
413,922✔
376
    if (NULL == pBlock) {
413,922✔
377
      TSWAP(pNonSortMerge->pSourceStatus[pNonSortMerge->sourceWorkIdx], pNonSortMerge->pSourceStatus[idx]);
194,449✔
378
      pNonSortMerge->sourceWorkIdx++;
194,449✔
379
      idx = NON_SORT_NEXT_SRC(pNonSortMerge, pNonSortMerge->lastSourceIdx);
194,449✔
380
      continue;
194,449✔
381
    }
382

383
    pNonSortMerge->lastSourceIdx = idx - 1;
219,473✔
384
    break;
219,473✔
385
  }
386

387
  if (!pBlock) {  // null data
266,149✔
388
    return code;
46,676✔
389
  }
390

391
  code = copyDataBlock(pRes, pBlock);
219,473✔
392
  *pResBlock = pRes;
219,473✔
393
  return code;
219,473✔
394
}
395

396
void destroyNonSortMergeOperatorInfo(void* param) {
46,676✔
397
  SNonSortMergeInfo* pNonSortMerge = param;
46,676✔
398
  taosMemoryFree(pNonSortMerge->pSourceStatus);
46,676✔
399
}
46,676✔
400

UNCOV
401
int32_t getNonSortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
×
UNCOV
402
  return TSDB_CODE_SUCCESS;
×
403
}
404

405

406
int32_t openColsMergeOperator(SOperatorInfo* pOperator) {
342,454✔
407
  return TSDB_CODE_SUCCESS;
342,454✔
408
}
409

410
int32_t doColsMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
349,972✔
411
  QRY_PARAM_CHECK(pResBlock);
349,972✔
412

413
  SExecTaskInfo*              pTaskInfo = pOperator->pTaskInfo;
349,972✔
414
  SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
349,972✔
415
  SSDataBlock*                pBlock = NULL;
349,972✔
416
  SColsMergeInfo*             pColsMerge = &pInfo->colsMergeInfo;
349,972✔
417
  int32_t                     code = 0;
349,972✔
418
  int32_t                     numOfRows = 0;
349,972✔
419
  int32_t                     lino = 0;
349,972✔
420
  STimeWindow                 timeWindow = {.skey = INT64_MAX, .ekey = INT64_MIN};
349,972✔
421
  bool                        allNull = true;
349,972✔
422
  bool                        isVtableMerge = false;
349,972✔
423

424
  if (pOperator->pOperatorGetParam) {
349,972✔
425
    if (pOperator->status == OP_EXEC_DONE) {
315,756✔
426
      pOperator->status = OP_OPENED;
7,518✔
427
    }
428
    numOfRows = ((SMergeOperatorParam*)(pOperator->pOperatorGetParam)->value)->winNum;
315,756✔
429
    freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
315,756✔
430
    pOperator->pOperatorGetParam = NULL;
315,756✔
431
    isVtableMerge = true;
315,756✔
432
  } else {
433
    numOfRows = 1;
34,216✔
434
    isVtableMerge = false;
34,216✔
435
  }
436

437
  qDebug("start to merge columns, %s", GET_TASKID(pTaskInfo));
349,972✔
438
  blockDataCleanup(pInfo->binfo.pRes);
349,972✔
439
  code = blockDataEnsureCapacity(pInfo->binfo.pRes, numOfRows);
349,972✔
440
  QUERY_CHECK_CODE(code, lino, _return);
349,972✔
441

442
  for (int32_t i = 0; i < pColsMerge->sourceNum; ++i) {
1,177,722✔
443
    if (isVtableMerge) {
827,750✔
444
      SOperatorInfo*  pExtWinOp = pOperator->pDownstream[i];
759,318✔
445
      SOperatorParam* pParam = pOperator->pDownstreamGetParams[i];
759,318✔
446
      code = pExtWinOp->fpSet.getNextExtFn(pExtWinOp, pParam, &pBlock);
759,318✔
447
      setOperatorCompleted(pExtWinOp);
759,318✔
448
      pOperator->pDownstreamGetParams[i] = NULL;
759,318✔
449
      QUERY_CHECK_CODE(code, lino, _return);
759,318✔
450
      if (pBlock) {
759,318✔
451
        printDataBlock(pBlock, __func__, "colsMerge", pTaskInfo->id.queryId);
759,318✔
452
        code = copyColumnsValue(pColsMerge->pTargets, pColsMerge->srcBlkIds[i], pInfo->binfo.pRes, pBlock, numOfRows);
759,318✔
453
        QUERY_CHECK_CODE(code, lino, _return);
759,318✔
454

455
        timeWindow.skey = TMIN(timeWindow.skey, pBlock->info.window.skey);
759,318✔
456
        timeWindow.ekey = TMAX(timeWindow.ekey, pBlock->info.window.ekey);
759,318✔
457
        allNull = false;
759,318✔
458
      } else {
UNCOV
459
        code = copyColumnsValue(pColsMerge->pTargets, pColsMerge->srcBlkIds[i], pInfo->binfo.pRes, pBlock, numOfRows);
×
UNCOV
460
        QUERY_CHECK_CODE(code, lino, _return);
×
461
      }
462
    } else {
463
      pBlock = getNextBlockFromDownstream(pOperator, i);
68,432✔
464
      if (pBlock) {
68,432✔
465
        code = copyColumnsValue(pColsMerge->pTargets, pColsMerge->srcBlkIds[i], pInfo->binfo.pRes, pBlock, numOfRows);
68,432✔
466
        QUERY_CHECK_CODE(code, lino, _return);
68,432✔
467
        allNull = false;
68,432✔
468
      }
469
    }
470
  }
471

472
  setOperatorCompleted(pOperator);
349,972✔
473
  if (allNull) {
349,972✔
UNCOV
474
    return code;
×
475
  }
476

477
  pInfo->binfo.pRes->info.window.skey = timeWindow.skey;
349,972✔
478
  pInfo->binfo.pRes->info.window.ekey = timeWindow.ekey;
349,972✔
479
  pInfo->binfo.pRes->info.rows = numOfRows;
349,972✔
480
  *pResBlock = pInfo->binfo.pRes;
349,972✔
481

482
  return code;
349,972✔
UNCOV
483
_return:
×
UNCOV
484
  qError("failed to merge columns, line:%d code:%s", lino, tstrerror(code));
×
UNCOV
485
  return code;
×
486
}
487

488
void destroyColsMergeOperatorInfo(void* param) {
342,454✔
489
  SColsMergeInfo* pColsMergeInfo = param;
342,454✔
490
  taosMemoryFreeClear(pColsMergeInfo->srcBlkIds);
342,454✔
491
}
342,454✔
492

UNCOV
493
int32_t getColsMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
×
UNCOV
494
  return TSDB_CODE_SUCCESS;
×
495
}
496

497

498
SOperatorFpSet gMultiwayMergeFps[MERGE_TYPE_MAX_VALUE] = {
499
  {0},
500
  {._openFn = openSortMergeOperator, .getNextFn = doSortMerge, .closeFn = destroySortMergeOperatorInfo, .getExplainFn = getSortMergeExplainExecInfo},
501
  {._openFn = openNonSortMergeOperator, .getNextFn = doNonSortMerge, .closeFn = destroyNonSortMergeOperatorInfo, .getExplainFn = getNonSortMergeExplainExecInfo},
502
  {._openFn = openColsMergeOperator, .getNextFn = doColsMerge, .closeFn = destroyColsMergeOperatorInfo, .getExplainFn = getColsMergeExplainExecInfo},
503
};
504

505

506
int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) {
46,949,866✔
507
  int32_t code = 0;
46,949,866✔
508
  SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
46,949,866✔
509
  SExecTaskInfo*              pTaskInfo = pOperator->pTaskInfo;
46,950,270✔
510

511
  if (OPTR_IS_OPENED(pOperator)) {
46,949,874✔
512
    return TSDB_CODE_SUCCESS;
33,168,680✔
513
  }
514

515
  if (NULL != gMultiwayMergeFps[pInfo->type]._openFn) {
13,779,364✔
516
    code = (*gMultiwayMergeFps[pInfo->type]._openFn)(pOperator);
13,779,498✔
517
  }
518

519
  pOperator->status = OP_RES_TO_RETURN;
13,779,806✔
520

521
  if (code != TSDB_CODE_SUCCESS) {
13,781,496✔
UNCOV
522
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
UNCOV
523
    pOperator->pTaskInfo->code = code;
×
UNCOV
524
    T_LONG_JMP(pTaskInfo->env, terrno);
×
525
  }
526

527
  OPTR_SET_OPENED(pOperator);
13,781,496✔
528
  return code;
13,782,264✔
529
}
530

531
int32_t doMultiwayMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
46,986,890✔
532
  QRY_PARAM_CHECK(pResBlock);
46,986,890✔
533
  int32_t code = TSDB_CODE_SUCCESS;
46,988,057✔
534
  int32_t lino = 0;
46,988,057✔
535

536
  if (pOperator->status == OP_EXEC_DONE && !pOperator->pOperatorGetParam) {
46,988,057✔
537
    return 0;
37,553✔
538
  }
539

540
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
46,949,591✔
541
  SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
46,946,999✔
542

543
  code = pOperator->fpSet._openFn(pOperator);
46,947,913✔
544
  QUERY_CHECK_CODE(code, lino, _end);
46,950,944✔
545

546
  if (NULL != gMultiwayMergeFps[pInfo->type].getNextFn) {
46,950,944✔
547
    code = (*gMultiwayMergeFps[pInfo->type].getNextFn)(pOperator, pResBlock);
46,950,545✔
548
    QUERY_CHECK_CODE(code, lino, _end);
46,950,944✔
549
  }
550

551
  if ((*pResBlock) != NULL) {
46,950,959✔
552
    code = blockDataCheck(*pResBlock);
36,111,877✔
553
    QUERY_CHECK_CODE(code, lino, _end);
36,111,877✔
554
  } else {
555
    setOperatorCompleted(pOperator);
10,839,067✔
556
  }
557

558
_end:
46,950,944✔
559
  if (code != TSDB_CODE_SUCCESS) {
46,950,944✔
UNCOV
560
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
561
    pTaskInfo->code = code;
×
UNCOV
562
    T_LONG_JMP(pTaskInfo->env, code);
×
563
  }
564
  return code;
46,950,944✔
565
}
566

567
void destroyMultiwayMergeOperatorInfo(void* param) {
13,885,623✔
568
  SMultiwayMergeOperatorInfo* pInfo = (SMultiwayMergeOperatorInfo*)param;
13,885,623✔
569
  blockDataDestroy(pInfo->binfo.pRes);
13,885,623✔
570
  pInfo->binfo.pRes = NULL;
13,885,623✔
571

572
  if (NULL != gMultiwayMergeFps[pInfo->type].closeFn) {
13,885,623✔
573
    (*gMultiwayMergeFps[pInfo->type].closeFn)(&pInfo->sortMergeInfo);
13,885,623✔
574
  }
575

576
  taosMemoryFreeClear(param);
13,885,220✔
577
}
13,885,220✔
578

579
int32_t getMultiwayMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
479,426✔
580
  int32_t code = 0;
479,426✔
581
  SMultiwayMergeOperatorInfo* pInfo = (SMultiwayMergeOperatorInfo*)pOptr->info;
479,426✔
582

583
  if (NULL != gMultiwayMergeFps[pInfo->type].getExplainFn) {
479,426✔
584
    code = (*gMultiwayMergeFps[pInfo->type].getExplainFn)(pOptr, pOptrExplain, len);
479,426✔
585
  }
586

587
  return code;
479,426✔
588
}
589

590
static int32_t resetMultiwayMergeOperState(SOperatorInfo* pOper) {
32,825✔
591
  SMultiwayMergeOperatorInfo* pInfo = pOper->info;
32,825✔
592
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
32,825✔
593
  SMergePhysiNode* pPhynode = (SMergePhysiNode*)pOper->pPhyNode;
32,825✔
594
  pOper->status = OP_NOT_OPENED;
32,825✔
595

596
  resetBasicOperatorState(&pInfo->binfo);
32,825✔
597
  pInfo->groupId = 0;
32,825✔
598

599
  OPTR_CLR_OPENED(pOper);
32,825✔
600

601
  switch (pInfo->type) {
32,825✔
602
    case MERGE_TYPE_SORT: {
32,825✔
603

604
      SSortMergeInfo* pSortMergeInfo = &pInfo->sortMergeInfo;
32,825✔
605

606
      blockDataCleanup(pSortMergeInfo->pInputBlock);
32,825✔
607

608
      blockDataDestroy(pSortMergeInfo->pIntermediateBlock);
32,825✔
609
      pSortMergeInfo->pIntermediateBlock = NULL;
32,825✔
610

611
      tsortDestroySortHandle(pSortMergeInfo->pSortHandle);
32,825✔
612
      pSortMergeInfo->pSortHandle = NULL;
32,825✔
613
      pSortMergeInfo->prefetchedTuple = NULL;
32,825✔
614

615
      pInfo->limitInfo = (SLimitInfo){0};
32,825✔
616
      initLimitInfo(pPhynode->node.pLimit, pPhynode->node.pSlimit, &pInfo->limitInfo);
32,825✔
617
      break;
32,825✔
618
    }
UNCOV
619
    case MERGE_TYPE_NON_SORT: {
×
UNCOV
620
      pInfo->nsortMergeInfo = (SNonSortMergeInfo){0};
×
UNCOV
621
      break;
×
622
    }
UNCOV
623
    case MERGE_TYPE_COLUMNS: {
×
624
      break;
×
625
    }
626
    default:
×
UNCOV
627
      qError("Invalid merge type: %d", pInfo->type);
×
628
  }
629

630
  return 0;
32,825✔
631
}
632

633
int32_t createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numStreams, SMergePhysiNode* pMergePhyNode,
13,884,709✔
634
                                        SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
635
  QRY_PARAM_CHECK(pOptrInfo);
13,884,709✔
636

637
  SPhysiNode*                 pPhyNode = (SPhysiNode*)pMergePhyNode;
13,885,623✔
638
  int32_t                     lino = 0;
13,885,623✔
639
  int32_t                     code = TSDB_CODE_SUCCESS;
13,885,623✔
640
  SMultiwayMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMultiwayMergeOperatorInfo));
13,885,623✔
641
  SOperatorInfo*              pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
13,884,180✔
642
  SDataBlockDescNode*         pDescNode = pPhyNode->pOutputDataBlockDesc;
13,885,093✔
643
  if (pInfo == NULL || pOperator == NULL) {
13,885,093✔
UNCOV
644
    code = terrno;
×
UNCOV
645
    goto _error;
×
646
  }
647
  initOperatorCostInfo(pOperator);
13,885,093✔
648

649
  pOperator->pPhyNode = pPhyNode;
13,885,623✔
650
  pInfo->groupMerge = pMergePhyNode->groupSort;
13,885,623✔
651
  pInfo->ignoreGroupId = pMergePhyNode->ignoreGroupId;
13,885,093✔
652
  pInfo->binfo.inputTsOrder = pMergePhyNode->node.inputTsOrder;
13,884,180✔
653
  pInfo->binfo.outputTsOrder = pMergePhyNode->node.outputTsOrder;
13,885,093✔
654
  pInfo->inputWithGroupId = pMergePhyNode->inputWithGroupId;
13,884,709✔
655

656
  pInfo->type = pMergePhyNode->type;
13,884,180✔
657
  switch (pInfo->type) {
13,885,239✔
658
    case MERGE_TYPE_SORT: {
13,495,964✔
659
      SSortMergeInfo* pSortMergeInfo = &pInfo->sortMergeInfo;
13,495,964✔
660
      initLimitInfo(pMergePhyNode->node.pLimit, pMergePhyNode->node.pSlimit, &pInfo->limitInfo);
13,495,963✔
661
      pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
13,496,109✔
662
      TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
13,496,109✔
663

664
      SPhysiNode*  pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
13,495,579✔
665
      SSDataBlock* pInputBlock = createDataBlockFromDescNode(pChildNode->pOutputDataBlockDesc);
13,495,963✔
666
      TSDB_CHECK_NULL(pInputBlock, code, lino, _error, terrno);
13,496,493✔
667
      pSortMergeInfo->pInputBlock = pInputBlock;
13,496,493✔
668

669
      initResultSizeInfo(&pOperator->resultInfo, 1024);
13,495,963✔
670
      code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
13,495,579✔
671
      TSDB_CHECK_CODE(code, lino, _error);
13,496,094✔
672

673
      size_t  numOfCols = taosArrayGetSize(pInfo->binfo.pRes->pDataBlock);
13,496,094✔
674
      int32_t rowSize = pInfo->binfo.pRes->info.rowSize;
13,495,710✔
675
      int32_t numOfOutputCols = 0;
13,496,094✔
676
      pSortMergeInfo->pSortInfo = createSortInfo(pMergePhyNode->pMergeKeys);
13,496,094✔
677
      pSortMergeInfo->bufPageSize = getProperSortPageSize(rowSize, numOfCols);
13,495,964✔
678
      pSortMergeInfo->sortBufSize =
13,496,109✔
679
          pSortMergeInfo->bufPageSize * (numStreams + 1);  // one additional is reserved for merged result.
13,496,109✔
680
      code = extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID,
13,495,580✔
681
                                 &pSortMergeInfo->matchInfo);
682
      if (code != TSDB_CODE_SUCCESS) {
13,495,196✔
UNCOV
683
        goto _error;
×
684
      }
685
      break;
13,495,196✔
686
    }
687
    case MERGE_TYPE_NON_SORT: {
46,676✔
688
      SNonSortMergeInfo* pNonSortMerge = &pInfo->nsortMergeInfo;
46,676✔
689
      pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
46,676✔
690
      TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
46,676✔
691

692
      initResultSizeInfo(&pOperator->resultInfo, 1024);
46,676✔
693
      code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
46,676✔
694
      TSDB_CHECK_CODE(code, lino, _error);
46,676✔
695

696
      break;
46,676✔
697
    }
698
    case MERGE_TYPE_COLUMNS: {
342,454✔
699
      SColsMergeInfo* pColsMerge = &pInfo->colsMergeInfo;
342,454✔
700
      pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
342,454✔
701
      TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
342,454✔
702

703
      initResultSizeInfo(&pOperator->resultInfo, 1);
342,454✔
704
      code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
342,454✔
705
      TSDB_CHECK_CODE(code, lino, _error);
342,454✔
706

707
      pColsMerge->pTargets = pMergePhyNode->pTargets;
342,454✔
708
      pColsMerge->sourceNum = numStreams;
342,454✔
709
      pColsMerge->srcBlkIds = taosMemoryCalloc(numStreams, sizeof(int64_t));
342,454✔
710
      for (size_t i = 0; i < numStreams; ++i) {
1,147,650✔
711
        pColsMerge->srcBlkIds[i] = getOperatorResultBlockId(downStreams[i], 0);
805,196✔
712
      }
713
      break;
342,454✔
714
    }
UNCOV
715
    default:
×
UNCOV
716
      qError("Invalid merge type: %d", pInfo->type);
×
UNCOV
717
      code = TSDB_CODE_INVALID_PARA;
×
UNCOV
718
      goto _error;
×
719
  }
720

721
  setOperatorInfo(pOperator, "MultiwayMergeOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE, false, OP_NOT_OPENED, pInfo,
13,884,710✔
722
                  pTaskInfo);
723
  pOperator->fpSet =
724
      createOperatorFpSet(openMultiwayMergeOperator, doMultiwayMerge, NULL, destroyMultiwayMergeOperatorInfo,
13,883,796✔
725
                          optrDefaultBufFn, getMultiwayMergeExplainExecInfo, optrDefaultGetNextExtFn, NULL);
726

727
  setOperatorResetStateFn(pOperator, resetMultiwayMergeOperState);
13,883,266✔
728
  code = appendDownstream(pOperator, downStreams, numStreams);
13,883,121✔
729
  if (code != TSDB_CODE_SUCCESS) {
13,882,738✔
UNCOV
730
    goto _error;
×
731
  }
732

733
  *pOptrInfo = pOperator;
13,882,738✔
734
  return TSDB_CODE_SUCCESS;
13,882,738✔
735

UNCOV
736
_error:
×
UNCOV
737
  if (pInfo != NULL) {
×
UNCOV
738
    destroyMultiwayMergeOperatorInfo(pInfo);
×
739
  }
UNCOV
740
  pTaskInfo->code = code;
×
741
  destroyOperatorAndDownstreams(pOperator, downStreams, numStreams);
×
742
  return code;
×
743
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc