• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3534

21 Nov 2024 07:36AM UTC coverage: 60.825% (+2.0%) from 58.848%
#3534

push

travis-ci

web-flow
Merge pull request #28810 from taosdata/ehn/add-sync-heartbeat-sent-time-to-log

ehn:add-sync-heartbeat-sent-time-to-log

120023 of 252376 branches covered (47.56%)

Branch coverage included in aggregate %.

43 of 47 new or added lines in 3 files covered. (91.49%)

2254 existing lines in 162 files now uncovered.

200876 of 275203 relevant lines covered (72.99%)

16110754.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.04
/source/libs/executor/src/mergeoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "operator.h"
19
#include "querytask.h"
20
#include "tdatablock.h"
21

22
typedef struct SSortMergeInfo {
23
  SArray*        pSortInfo;
24
  SSortHandle*   pSortHandle;
25
  STupleHandle*  prefetchedTuple;
26
  int32_t        bufPageSize;
27
  uint32_t       sortBufSize;  // max buffer size for in-memory sort
28
  SSDataBlock*   pIntermediateBlock;   // to hold the intermediate result
29
  SSDataBlock*   pInputBlock;
30
  SColMatchInfo  matchInfo;
31
} SSortMergeInfo;
32

33
/* State used by the non-sort (MERGE_TYPE_NON_SORT) variant of the multiway merge operator. */
typedef struct SNonSortMergeInfo {
  int32_t  lastSourceIdx;  // cursor into pSourceStatus; advanced through NON_SORT_NEXT_SRC
  int32_t  sourceWorkIdx;  // slots below this index hold sources that returned no more blocks
  int32_t  sourceNum;      // number of downstream operators
  int32_t* pSourceStatus;  // downstream indices; exhausted ones are swapped towards the front
} SNonSortMergeInfo;
39

40
typedef struct SColsMergeInfo {
41
  SNodeList* pTargets;
42
  uint64_t   srcBlkIds[2]; 
43
} SColsMergeInfo;
44

45
typedef struct SMultiwayMergeOperatorInfo {
46
  SOptrBasicInfo binfo;
47
  EMergeType     type;
48
  union {
49
    SSortMergeInfo    sortMergeInfo;
50
    SNonSortMergeInfo nsortMergeInfo;
51
    SColsMergeInfo    colsMergeInfo;
52
  };
53
  SLimitInfo     limitInfo;
54
  bool           groupMerge;
55
  bool           ignoreGroupId;
56
  uint64_t       groupId;
57
  bool           inputWithGroupId;
58
} SMultiwayMergeOperatorInfo;
59

60
static int32_t doSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
61
static int32_t doMultiwayMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
62
static int32_t doNonSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
63
static int32_t doColsMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
64
static int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock);
65

66
/**
 * Fetch callback registered on the sort handle: pulls the next block from one
 * downstream operator and validates it.
 *
 * @param param    the downstream SOperatorInfo to read from
 * @param ppBlock  receives the block produced by the downstream operator
 * @return 0 on success, otherwise the error reported by the downstream
 *         operator or by blockDataCheck()
 */
int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock) {
  SOperatorInfo* pOperator = (SOperatorInfo*)param;

  int32_t code = pOperator->fpSet.getNextFn(pOperator, ppBlock);
  if (code) {
    qError("failed to get next data block from upstream, %s code:%s", __func__, tstrerror(code));
    // Report the upstream failure as-is; validating the block would overwrite the error code.
    return code;
  }

  code = blockDataCheck(*ppBlock);
  if (code) {
    qError("failed to check data block got from upstream, %s code:%s", __func__, tstrerror(code));
  }

  return code;
}
78

79
/**
 * Open step of the sort merge: creates the multi-source sort handle, registers
 * every downstream operator as a sort source and opens the handle.
 *
 * @param pOperator  the multiway merge operator
 * @return 0 on success, otherwise the first error code encountered
 */
int32_t openSortMergeOperator(SOperatorInfo* pOperator) {
  SMultiwayMergeOperatorInfo* pMergeOp = pOperator->info;
  SSortMergeInfo*             pMerge = &pMergeOp->sortMergeInfo;
  SExecTaskInfo*              pTask = pOperator->pTaskInfo;

  int32_t pageCount = pMerge->sortBufSize / pMerge->bufPageSize;

  pMerge->pSortHandle = NULL;
  int32_t code = tsortCreateSortHandle(pMerge->pSortInfo, SORT_MULTISOURCE_MERGE, pMerge->bufPageSize, pageCount,
                                       pMerge->pInputBlock, pTask->id.str, 0, 0, 0, &pMerge->pSortHandle);
  if (code != 0) {
    return code;
  }

  tsortSetFetchRawDataFp(pMerge->pSortHandle, sortMergeloadNextDataBlock, NULL, NULL);
  tsortSetCompareGroupId(pMerge->pSortHandle, pMergeOp->groupMerge);

  for (int32_t srcIdx = 0; srcIdx < pOperator->numOfDownstream; ++srcIdx) {
    SOperatorInfo* pChild = pOperator->pDownstream[srcIdx];

    // Only exchange operators are opened here, before being registered as a source.
    if (pChild->operatorType == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) {
      code = pChild->fpSet._openFn(pChild);
      if (code != 0) {
        return code;
      }
    }

    SSortSource* pSource = taosMemoryCalloc(1, sizeof(SSortSource));
    if (NULL == pSource) {
      return terrno;
    }

    pSource->param = pChild;
    pSource->onlyRef = true;

    // NOTE(review): if tsortAddSource() does not take ownership on failure, pSource leaks on this path - confirm.
    code = tsortAddSource(pMerge->pSortHandle, pSource);
    if (code != 0) {
      return code;
    }
  }

  return tsortOpen(pMerge->pSortHandle);
}
121

122
static int32_t doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity,
2,630,932✔
123
                                 SSDataBlock* p, bool* newgroup) {
124
  SSortMergeInfo* pSortMergeInfo = &pInfo->sortMergeInfo;
2,630,932✔
125
  *newgroup = false;
2,630,932✔
126
  int32_t code = 0;
2,630,932✔
127

128
  while (1) {
522,970,939✔
129
    STupleHandle* pTupleHandle = NULL;
525,601,871✔
130
    if (pInfo->groupMerge || pInfo->inputWithGroupId) {
525,601,871✔
131
      if (pSortMergeInfo->prefetchedTuple == NULL) {
502,562,855✔
132
        pTupleHandle = NULL;
502,455,387✔
133
        code = tsortNextTuple(pHandle, &pTupleHandle);
502,455,387✔
134
        if (code) {
135
          // todo handle error
136
        }
137
      } else {
138
        pTupleHandle = pSortMergeInfo->prefetchedTuple;
107,468✔
139
        pSortMergeInfo->prefetchedTuple = NULL;
107,468✔
140
        uint64_t gid = tsortGetGroupId(pTupleHandle);
107,468✔
141
        if (gid != pInfo->groupId) {
107,468!
142
          *newgroup = true;
107,468✔
143
          pInfo->groupId = gid;
107,468✔
144
        }
145
      }
146
    } else {
147
      code = tsortNextTuple(pHandle, &pTupleHandle);
23,039,016✔
148
      pInfo->groupId = 0;
22,854,173✔
149
    }
150

151
    if (pTupleHandle == NULL || (code != 0)) {
524,888,587!
152
      break;
153
    }
154

155
    if (pInfo->groupMerge || pInfo->inputWithGroupId) {
1,022,724,518✔
156
      uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle);
499,754,030✔
157
      if (pInfo->groupId == 0 || pInfo->groupId == tupleGroupId) {
499,603,695✔
158
        code = appendOneRowToDataBlock(p, pTupleHandle);
499,496,112✔
159
        if (code) {
500,194,224!
UNCOV
160
          return code;
×
161
        }
162

163
        p->info.id.groupId = tupleGroupId;
500,194,224✔
164
        pInfo->groupId = tupleGroupId;
500,194,224✔
165
      } else {
166
        if (p->info.rows == 0) {
107,583✔
167
          code = appendOneRowToDataBlock(p, pTupleHandle);
14✔
168
          if (code) {
14!
169
            return code;
×
170
          }
171

172
          p->info.id.groupId = pInfo->groupId = tupleGroupId;
14✔
173
        } else {
174
          pSortMergeInfo->prefetchedTuple = pTupleHandle;
107,569✔
175
          break;
107,569✔
176
        }
177
      }
178
    } else {
179
      code = appendOneRowToDataBlock(p, pTupleHandle);
22,776,250✔
180
      if (code) {
22,959,482!
UNCOV
181
        return code;
×
182
      }
183
    }
184

185
    if (p->info.rows >= capacity) {
523,153,720✔
186
      break;
182,781✔
187
    }
188
  }
189

190
  return code;
2,648,657✔
191
}
192

193
/**
 * Produces the next output block of the sort merge.
 *
 * Rows are first collected into the intermediate block, limit/offset is
 * applied, and the surviving rows are copied column by column into the
 * operator's result block according to the column match list.
 *
 * @param pOperator  the multiway merge operator
 * @param pResBlock  receives the result block, or NULL when no rows were produced;
 *                   left untouched on the early error returns
 * @return 0 on success, otherwise an error code
 */
int32_t doSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
  SExecTaskInfo*              pTaskInfo = pOperator->pTaskInfo;
  SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
  SSortMergeInfo*             pSortMergeInfo = &pInfo->sortMergeInfo;
  SSortHandle*                pHandle = pSortMergeInfo->pSortHandle;
  SSDataBlock*                pDataBlock = pInfo->binfo.pRes;
  SArray*                     pColMatchInfo = pSortMergeInfo->matchInfo.pList;
  int32_t                     capacity = pOperator->resultInfo.capacity;
  int32_t                     code = 0;
  bool                        newgroup = false;

  qDebug("start to merge final sorted rows, %s", GET_TASKID(pTaskInfo));
  blockDataCleanup(pDataBlock);

  if (pSortMergeInfo->pIntermediateBlock == NULL) {
    // First call: obtain the intermediate block from the sort handle and size it.
    code = tsortGetSortedDataBlock(pHandle, &pSortMergeInfo->pIntermediateBlock);
    if (pSortMergeInfo->pIntermediateBlock == NULL || code != 0) {
      return code;
    }

    code = blockDataEnsureCapacity(pSortMergeInfo->pIntermediateBlock, capacity);
    if (code) {
      return code;
    }
  } else {
    blockDataCleanup(pSortMergeInfo->pIntermediateBlock);
  }

  SSDataBlock* p = pSortMergeInfo->pIntermediateBlock;
  while (1) {
    code = doGetSortedBlockData(pInfo, pHandle, capacity, p, &newgroup);
    if (code) {
      return code;
    }

    if (p->info.rows == 0) {
      break;
    }

    if (newgroup) {
      resetLimitInfoForNextGroup(&pInfo->limitInfo);
    }

    // Only the effect on p matters here; the returned flag is not used.
    (void)applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo);

    // Keep fetching while limit/offset removed every row of the batch.
    if (p->info.rows > 0) {
      break;
    }
  }

  if (p->info.rows > 0) {
    int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
    for (int32_t i = 0; i < numOfCols; ++i) {
      SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i);
      if (pmInfo == NULL) {
        return terrno;
      }

      SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
      if (pSrc == NULL) {
        return terrno;
      }

      SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId);
      if (pDst == NULL) {
        return terrno;
      }

      code = colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info);
      if (code) {
        return code;
      }
    }

    pDataBlock->info.rows = p->info.rows;
    pDataBlock->info.scanFlag = p->info.scanFlag;
    pDataBlock->info.id.groupId = pInfo->ignoreGroupId ? 0 : pInfo->groupId;
    pDataBlock->info.dataLoad = 1;
  }

  qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%" PRId64 , GET_TASKID(pTaskInfo), pDataBlock->info.id.groupId,
         pDataBlock->info.rows);

  *pResBlock = (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
  return code;
}
288

289
/**
 * Allocates an SSortExecInfo, fills it from the sort handle and hands it to the caller.
 *
 * @param pOptr         the multiway merge operator
 * @param pOptrExplain  receives the newly allocated SSortExecInfo
 * @param len           receives sizeof(SSortExecInfo)
 * @return TSDB_CODE_SUCCESS, or terrno when the allocation fails (also stored in the task's code)
 */
int32_t getSortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  SSortExecInfo* pExecInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo));
  if (NULL == pExecInfo) {
    pOptr->pTaskInfo->code = terrno;
    return terrno;
  }

  SMultiwayMergeOperatorInfo* pMergeOp = (SMultiwayMergeOperatorInfo*)pOptr->info;

  *pExecInfo = tsortGetSortExecInfo(pMergeOp->sortMergeInfo.pSortHandle);
  *pOptrExplain = pExecInfo;
  *len = sizeof(SSortExecInfo);

  return TSDB_CODE_SUCCESS;
}
305

306

307
void destroySortMergeOperatorInfo(void* param) {
1,398,994✔
308
  SSortMergeInfo* pSortMergeInfo = param;
1,398,994✔
309
  blockDataDestroy(pSortMergeInfo->pInputBlock);
1,398,994✔
310
  pSortMergeInfo->pInputBlock = NULL;
1,398,996✔
311

312
  blockDataDestroy(pSortMergeInfo->pIntermediateBlock);
1,398,996✔
313
  pSortMergeInfo->pIntermediateBlock = NULL;
1,399,006✔
314

315
  taosArrayDestroy(pSortMergeInfo->matchInfo.pList);
1,399,006✔
316

317
  tsortDestroySortHandle(pSortMergeInfo->pSortHandle);
1,398,996✔
318
  taosArrayDestroy(pSortMergeInfo->pSortInfo);
1,399,000✔
319
}
1,399,000✔
320

321
// Pre-increments _idx (a side effect on the caller's lvalue) and yields the new value; once that
// value reaches (_info)->sourceNum it yields (_info)->sourceWorkIdx instead, leaving _idx incremented.
// _idx is evaluated more than once, so it must be a plain lvalue without side effects of its own.
#define NON_SORT_NEXT_SRC(_info, _idx) ((++(_idx) >= (_info)->sourceNum) ? ((_info)->sourceWorkIdx) : (_idx))
322

323
/**
 * Open step of the non-sort merge: resets the source cursors and builds the
 * source table, where slot i initially refers to downstream i.
 *
 * @param pOperator  the multiway merge operator
 * @return 0 on success, TSDB_CODE_OUT_OF_MEMORY (also stored in the task's code)
 *         when the source table cannot be allocated
 */
int32_t openNonSortMergeOperator(SOperatorInfo* pOperator) {
  SMultiwayMergeOperatorInfo* pMergeOp = pOperator->info;
  SNonSortMergeInfo*          pMerge = &pMergeOp->nsortMergeInfo;
  SExecTaskInfo*              pTask = pOperator->pTaskInfo;

  pMerge->sourceWorkIdx = 0;
  pMerge->sourceNum = pOperator->numOfDownstream;
  pMerge->lastSourceIdx = -1;

  pMerge->pSourceStatus = taosMemoryCalloc(pOperator->numOfDownstream, sizeof(*pMerge->pSourceStatus));
  if (pMerge->pSourceStatus == NULL) {
    pTask->code = TSDB_CODE_OUT_OF_MEMORY;
    return pTask->code;
  }

  int32_t slot = 0;
  while (slot < pOperator->numOfDownstream) {
    pMerge->pSourceStatus[slot] = slot;
    ++slot;
  }

  return 0;
}
344

345
/**
 * Produces the next output block of the non-sort merge by copying the next
 * available block from one of the downstream operators into the result block.
 *
 * A source that returns no block is swapped to the front of pSourceStatus and
 * excluded via sourceWorkIdx. After a successful read the cursor is set so that
 * the next call starts with the same slot again.
 *
 * @param pOperator  the multiway merge operator
 * @param pResBlock  receives the result block; not assigned by this function when no block was read
 * @return 0 when nothing was read, otherwise the result of copyDataBlock()
 */
int32_t doNonSortMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
  QRY_PARAM_CHECK(pResBlock);

  SExecTaskInfo*              pTask = pOperator->pTaskInfo;
  SMultiwayMergeOperatorInfo* pMergeOp = pOperator->info;
  SNonSortMergeInfo*          pNonSortMerge = &pMergeOp->nsortMergeInfo;
  SSDataBlock*                pOut = pMergeOp->binfo.pRes;
  SSDataBlock*                pFetched = NULL;
  int32_t                     code = 0;

  qDebug("start to merge no sorted rows, %s", GET_TASKID(pTask));

  int32_t idx = NON_SORT_NEXT_SRC(pNonSortMerge, pNonSortMerge->lastSourceIdx);
  while (idx < pNonSortMerge->sourceNum) {
    pFetched = getNextBlockFromDownstream(pOperator, pNonSortMerge->pSourceStatus[idx]);
    if (pFetched != NULL) {
      // Step the cursor back by one so the next call begins at this slot again.
      pNonSortMerge->lastSourceIdx = idx - 1;
      break;
    }

    // No block from this source: move it into the exhausted region and try the next slot.
    TSWAP(pNonSortMerge->pSourceStatus[pNonSortMerge->sourceWorkIdx], pNonSortMerge->pSourceStatus[idx]);
    pNonSortMerge->sourceWorkIdx++;
    idx = NON_SORT_NEXT_SRC(pNonSortMerge, pNonSortMerge->lastSourceIdx);
  }

  if (pFetched == NULL) {  // null data
    return code;
  }

  code = copyDataBlock(pOut, pFetched);
  *pResBlock = pOut;
  return code;
}
379

380
void destroyNonSortMergeOperatorInfo(void* param) {
1,854✔
381
  SNonSortMergeInfo* pNonSortMerge = param;
1,854✔
382
  taosMemoryFree(pNonSortMerge->pSourceStatus);
1,854✔
383
}
1,854✔
384

385
/**
 * Explain hook of the non-sort merge. It writes nothing to pOptrExplain or len
 * and always returns TSDB_CODE_SUCCESS.
 */
int32_t getNonSortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  (void)pOptr;
  (void)pOptrExplain;
  (void)len;
  return TSDB_CODE_SUCCESS;
}
388

389

390
/**
 * Open step of the columns merge. Nothing is prepared here; it always returns TSDB_CODE_SUCCESS.
 */
int32_t openColsMergeOperator(SOperatorInfo* pOperator) {
  (void)pOperator;
  return TSDB_CODE_SUCCESS;
}
393

394
/**
 * Fills row 0 of the destination block for every target that is a column
 * reference into the block identified by targetBlkId.
 *
 * When the source block is NULL or has no rows, the destination columns are
 * set to NULL; otherwise one row is assigned from the referenced source column.
 *
 * @param pNodeList    list of STargetNode entries
 * @param targetBlkId  data block id the column references must match
 * @param pDst         destination block
 * @param pSrc         source block, may be NULL
 * @return 0 on success, terrno when a column cannot be looked up, or the error
 *         from colDataSetVal()/colDataAssign()
 */
int32_t copyColumnsValue(SNodeList* pNodeList, uint64_t targetBlkId, SSDataBlock* pDst, SSDataBlock* pSrc) {
  const bool srcEmpty = (pSrc == NULL) || (pSrc->info.rows <= 0);
  size_t     targetNum = LIST_LENGTH(pNodeList);
  int32_t    code = 0;

  for (int32_t i = 0; i < targetNum; ++i) {
    STargetNode* pTarget = (STargetNode*)nodesListGetNode(pNodeList, i);

    // Skip targets that are not columns of the requested block.
    if (nodeType(pTarget->pExpr) != QUERY_NODE_COLUMN) {
      continue;
    }
    if (((SColumnNode*)pTarget->pExpr)->dataBlockId != targetBlkId) {
      continue;
    }

    SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, pTarget->slotId);
    if (NULL == pDstCol) {
      return terrno;
    }

    if (srcEmpty) {
      code = colDataSetVal(pDstCol, 0, NULL, true);
    } else {
      SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, ((SColumnNode*)pTarget->pExpr)->slotId);
      if (NULL == pSrcCol) {
        return terrno;
      }

      code = colDataAssign(pDstCol, pSrcCol, 1, &pDst->info);
    }

    if (code) {
      return code;
    }
  }

  return code;
}
427

428
/**
 * Produces the single output row of the columns merge by combining at most one
 * row from each of the two downstream operators.
 *
 * The operator is marked completed after the first call. If both downstream
 * operators returned no block, no result block is assigned.
 *
 * @param pOperator  the multiway merge operator
 * @param pResBlock  receives the one-row result block
 * @return 0 on success, or the error from copyColumnsValue(); long-jumps with
 *         TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when a downstream block has more than one row
 */
int32_t doColsMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
  QRY_PARAM_CHECK(pResBlock);

  SExecTaskInfo*              pTask = pOperator->pTaskInfo;
  SMultiwayMergeOperatorInfo* pMergeOp = pOperator->info;
  SColsMergeInfo*             pCols = &pMergeOp->colsMergeInfo;
  int32_t                     missingBlocks = 0;
  int32_t                     code = 0;

  qDebug("start to merge columns, %s", GET_TASKID(pTask));

  for (int32_t side = 0; side < 2; ++side) {
    SSDataBlock* pSide = getNextBlockFromDownstream(pOperator, side);
    if (NULL == pSide) {
      missingBlocks++;
    } else if (pSide->info.rows > 1) {
      qError("more than 1 row returned from downstream, rows:%" PRId64, pSide->info.rows);
      T_LONG_JMP(pTask->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }

    code = copyColumnsValue(pCols->pTargets, pCols->srcBlkIds[side], pMergeOp->binfo.pRes, pSide);
    if (code) {
      return code;
    }
  }

  setOperatorCompleted(pOperator);

  if (missingBlocks != 2) {
    pMergeOp->binfo.pRes->info.rows = 1;
    *pResBlock = pMergeOp->binfo.pRes;
  }

  return code;
}
465

466
/**
 * Close hook of the columns merge. It releases nothing.
 */
void destroyColsMergeOperatorInfo(void* param) {
  (void)param;
}
244✔
468

469
/**
 * Explain hook of the columns merge. It writes nothing to pOptrExplain or len
 * and always returns TSDB_CODE_SUCCESS.
 */
int32_t getColsMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  (void)pOptr;
  (void)pOptrExplain;
  (void)len;
  return TSDB_CODE_SUCCESS;
}
472

473

474
SOperatorFpSet gMultiwayMergeFps[MERGE_TYPE_MAX_VALUE] = {
475
  {0},
476
  {._openFn = openSortMergeOperator, .getNextFn = doSortMerge, .closeFn = destroySortMergeOperatorInfo, .getExplainFn = getSortMergeExplainExecInfo},
477
  {._openFn = openNonSortMergeOperator, .getNextFn = doNonSortMerge, .closeFn = destroyNonSortMergeOperatorInfo, .getExplainFn = getNonSortMergeExplainExecInfo},
478
  {._openFn = openColsMergeOperator, .getNextFn = doColsMerge, .closeFn = destroyColsMergeOperatorInfo, .getExplainFn = getColsMergeExplainExecInfo},
479
};
480

481

482
/**
 * Opens the multiway merge operator once by dispatching to the open callback
 * of the configured merge type and recording the time spent.
 *
 * @param pOperator  the multiway merge operator
 * @return TSDB_CODE_SUCCESS when the operator is (already) open. On failure the
 *         error is stored in the task's code and control leaves through
 *         T_LONG_JMP with that same error code.
 */
int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) {
  int32_t                     code = 0;
  SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*              pTaskInfo = pOperator->pTaskInfo;

  if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  int64_t startTs = taosGetTimestampUs();

  if (NULL != gMultiwayMergeFps[pInfo->type]._openFn) {
    code = (*gMultiwayMergeFps[pInfo->type]._openFn)(pOperator);
  }

  pOperator->cost.openCost = (taosGetTimestampUs() - startTs) / 1000.0;
  pOperator->status = OP_RES_TO_RETURN;

  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    pTaskInfo->code = code;
    // Jump with the code that actually failed; terrno may be stale or zero at this point.
    T_LONG_JMP(pTaskInfo->env, code);
  }

  OPTR_SET_OPENED(pOperator);
  return code;
}
509

510
/**
 * getNext entry point of the multiway merge operator: opens the operator if
 * needed, dispatches to the getNext callback of the configured merge type and
 * validates the produced block.
 *
 * @param pOperator  the multiway merge operator
 * @param pResBlock  receives the next result block; when it ends up NULL the operator is marked completed
 * @return 0 on success or when the operator is already done; on failure the
 *         error is stored in the task's code and control leaves through T_LONG_JMP
 */
int32_t doMultiwayMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
  QRY_PARAM_CHECK(pResBlock);
  int32_t lino = 0;
  int32_t code = TSDB_CODE_SUCCESS;

  if (OP_EXEC_DONE == pOperator->status) {
    return 0;
  }

  SMultiwayMergeOperatorInfo* pMergeOp = pOperator->info;
  SExecTaskInfo*              pTask = pOperator->pTaskInfo;

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);

  if (gMultiwayMergeFps[pMergeOp->type].getNextFn != NULL) {
    code = (*gMultiwayMergeFps[pMergeOp->type].getNextFn)(pOperator, pResBlock);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  if (NULL == (*pResBlock)) {
    setOperatorCompleted(pOperator);
  } else {
    pOperator->resultInfo.totalRows += (*pResBlock)->info.rows;
    code = blockDataCheck(*pResBlock);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTask->code = code;
    T_LONG_JMP(pTask->env, code);
  }
  return code;
}
546

547
void destroyMultiwayMergeOperatorInfo(void* param) {
1,401,088✔
548
  SMultiwayMergeOperatorInfo* pInfo = (SMultiwayMergeOperatorInfo*)param;
1,401,088✔
549
  blockDataDestroy(pInfo->binfo.pRes);
1,401,088✔
550
  pInfo->binfo.pRes = NULL;
1,401,100✔
551

552
  if (NULL != gMultiwayMergeFps[pInfo->type].closeFn) {
1,401,100✔
553
    (*gMultiwayMergeFps[pInfo->type].closeFn)(&pInfo->sortMergeInfo);
1,401,096✔
554
  }
555

556
  taosMemoryFreeClear(param);
1,401,102✔
557
}
1,401,103✔
558

559
/**
 * Explain entry point of the multiway merge operator: forwards to the explain
 * callback of the configured merge type.
 *
 * @param pOptr         the multiway merge operator
 * @param pOptrExplain  passed through to the variant's callback
 * @param len           passed through to the variant's callback
 * @return the callback's result, or 0 when the variant has no explain callback
 */
int32_t getMultiwayMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
  SMultiwayMergeOperatorInfo* pMergeOp = (SMultiwayMergeOperatorInfo*)pOptr->info;

  if (gMultiwayMergeFps[pMergeOp->type].getExplainFn == NULL) {
    return 0;
  }

  return (*gMultiwayMergeFps[pMergeOp->type].getExplainFn)(pOptr, pOptrExplain, len);
}
569

570
int32_t createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numStreams, SMergePhysiNode* pMergePhyNode,
1,401,093✔
571
                                        SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
572
  QRY_PARAM_CHECK(pOptrInfo);
1,401,093!
573

574
  SPhysiNode*                 pPhyNode = (SPhysiNode*)pMergePhyNode;
1,401,093✔
575
  int32_t                     lino = 0;
1,401,093✔
576
  int32_t                     code = TSDB_CODE_SUCCESS;
1,401,093✔
577
  SMultiwayMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMultiwayMergeOperatorInfo));
1,401,093✔
578
  SOperatorInfo*              pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
1,401,080✔
579
  SDataBlockDescNode*         pDescNode = pPhyNode->pOutputDataBlockDesc;
1,401,071✔
580
  if (pInfo == NULL || pOperator == NULL) {
1,401,071!
UNCOV
581
    code = terrno;
×
UNCOV
582
    goto _error;
×
583
  }
584

585
  pInfo->groupMerge = pMergePhyNode->groupSort;
1,401,072✔
586
  pInfo->ignoreGroupId = pMergePhyNode->ignoreGroupId;
1,401,072✔
587
  pInfo->binfo.inputTsOrder = pMergePhyNode->node.inputTsOrder;
1,401,072✔
588
  pInfo->binfo.outputTsOrder = pMergePhyNode->node.outputTsOrder;
1,401,072✔
589
  pInfo->inputWithGroupId = pMergePhyNode->inputWithGroupId;
1,401,072✔
590

591
  pInfo->type = pMergePhyNode->type;
1,401,072✔
592
  switch (pInfo->type) {
1,401,072!
593
    case MERGE_TYPE_SORT: {
1,398,978✔
594
      SSortMergeInfo* pSortMergeInfo = &pInfo->sortMergeInfo;
1,398,978✔
595
      initLimitInfo(pMergePhyNode->node.pLimit, pMergePhyNode->node.pSlimit, &pInfo->limitInfo);
1,398,978✔
596
      pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
1,398,981✔
597
      TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
1,399,002!
598

599
      SPhysiNode*  pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
1,399,002✔
600
      SSDataBlock* pInputBlock = createDataBlockFromDescNode(pChildNode->pOutputDataBlockDesc);
1,399,001✔
601
      TSDB_CHECK_NULL(pInputBlock, code, lino, _error, terrno);
1,398,991!
602
      pSortMergeInfo->pInputBlock = pInputBlock;
1,398,991✔
603

604
      initResultSizeInfo(&pOperator->resultInfo, 1024);
1,398,991✔
605
      code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
1,398,991✔
606
      TSDB_CHECK_CODE(code, lino, _error);
1,398,995!
607

608
      size_t  numOfCols = taosArrayGetSize(pInfo->binfo.pRes->pDataBlock);
1,398,995✔
609
      int32_t rowSize = pInfo->binfo.pRes->info.rowSize;
1,398,994✔
610
      int32_t numOfOutputCols = 0;
1,398,994✔
611
      pSortMergeInfo->pSortInfo = createSortInfo(pMergePhyNode->pMergeKeys);
1,398,994✔
612
      pSortMergeInfo->bufPageSize = getProperSortPageSize(rowSize, numOfCols);
1,398,983✔
613
      pSortMergeInfo->sortBufSize =
1,398,983✔
614
          pSortMergeInfo->bufPageSize * (numStreams + 1);  // one additional is reserved for merged result.
1,398,983✔
615
      code = extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID,
1,398,983✔
616
                                 &pSortMergeInfo->matchInfo);
617
      if (code != TSDB_CODE_SUCCESS) {
1,398,957!
UNCOV
618
        goto _error;
×
619
      }
620
      break;
1,398,957✔
621
    }
622
    case MERGE_TYPE_NON_SORT: {
1,854✔
623
      SNonSortMergeInfo* pNonSortMerge = &pInfo->nsortMergeInfo;
1,854✔
624
      pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
1,854✔
625
      TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
1,854!
626

627
      initResultSizeInfo(&pOperator->resultInfo, 1024);
1,854✔
628
      code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
1,854✔
629
      TSDB_CHECK_CODE(code, lino, _error);
1,854!
630

631
      break;
1,854✔
632
    }
633
    case MERGE_TYPE_COLUMNS: {
244✔
634
      SColsMergeInfo* pColsMerge = &pInfo->colsMergeInfo;
244✔
635
      pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
244✔
636
      TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
244!
637

638
      initResultSizeInfo(&pOperator->resultInfo, 1);
244✔
639
      code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
244✔
640
      TSDB_CHECK_CODE(code, lino, _error);
244!
641

642
      pColsMerge->pTargets = pMergePhyNode->pTargets;
244✔
643
      pColsMerge->srcBlkIds[0] = getOperatorResultBlockId(downStreams[0], 0);
244✔
644
      pColsMerge->srcBlkIds[1] = getOperatorResultBlockId(downStreams[1], 0);
244✔
645
      break;
244✔
646
    }
647
    default:
×
648
      qError("Invalid merge type: %d", pInfo->type);
×
649
      code = TSDB_CODE_INVALID_PARA;
×
650
      goto _error;
×
651
  }
652

653
  setOperatorInfo(pOperator, "MultiwayMergeOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE, false, OP_NOT_OPENED, pInfo,
1,401,055✔
654
                  pTaskInfo);
655
  pOperator->fpSet =
656
      createOperatorFpSet(openMultiwayMergeOperator, doMultiwayMerge, NULL, destroyMultiwayMergeOperatorInfo,
1,401,075✔
657
                          optrDefaultBufFn, getMultiwayMergeExplainExecInfo, optrDefaultGetNextExtFn, NULL);
658

659
  code = appendDownstream(pOperator, downStreams, numStreams);
1,401,092✔
660
  if (code != TSDB_CODE_SUCCESS) {
1,401,102!
661
    goto _error;
×
662
  }
663

664
  *pOptrInfo = pOperator;
1,401,102✔
665
  return TSDB_CODE_SUCCESS;
1,401,102✔
666

UNCOV
667
_error:
×
UNCOV
668
  if (pInfo != NULL) {
×
UNCOV
669
    destroyMultiwayMergeOperatorInfo(pInfo);
×
670
  }
UNCOV
671
  pTaskInfo->code = code;
×
UNCOV
672
  destroyOperatorAndDownstreams(pOperator, downStreams, numStreams);
×
UNCOV
673
  return code;
×
674
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc