• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4066

12 May 2025 05:35AM UTC coverage: 62.547% (+0.04%) from 62.508%
#4066

push

travis-ci

web-flow
Merge pull request #31053 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

155777 of 317858 branches covered (49.01%)

Branch coverage included in aggregate %.

382 of 573 new or added lines in 31 files covered. (66.67%)

648 existing lines in 129 files now uncovered.

241270 of 316936 relevant lines covered (76.13%)

6462449.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.01
/source/libs/executor/src/exchangeoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "operator.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "tdatablock.h"
23
#include "thash.h"
24
#include "tmsg.h"
25
#include "tref.h"
26
#include "trpc.h"
27

28
typedef struct SFetchRspHandleWrapper {
29
  uint32_t exchangeId;
30
  int32_t  sourceIndex;
31
} SFetchRspHandleWrapper;
32

33
typedef struct SSourceDataInfo {
34
  int32_t            index;
35
  SRetrieveTableRsp* pRsp;
36
  uint64_t           totalRows;
37
  int64_t            startTime;
38
  int32_t            code;
39
  EX_SOURCE_STATUS   status;
40
  const char*        taskId;
41
  SArray*            pSrcUidList;
42
  int32_t            srcOpType;
43
  bool               tableSeq;
44
  char*              decompBuf;
45
  int32_t            decompBufSize;
46
  SOrgTbInfo*     colMap;
47
  bool               isVtbRefScan;
48
  STimeWindow        window;
49
} SSourceDataInfo;
50

51
static void destroyExchangeOperatorInfo(void* param);
52
static void freeBlock(void* pParam);
53
static void freeSourceDataInfo(void* param);
54
static void setAllSourcesCompleted(SOperatorInfo* pOperator);
55

56
static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code);
57
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex);
58
static int32_t getCompletedSources(const SArray* pArray, int32_t* pRes);
59
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator);
60
static int32_t seqLoadRemoteData(SOperatorInfo* pOperator);
61
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator);
62
static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock,
63
                                 bool holdDataInBuf);
64
static int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo);
65

66
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo);
67

68
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
1,678,501✔
69
                                           SExecTaskInfo* pTaskInfo) {
70
  int32_t code = 0;
1,678,501✔
71
  int32_t lino = 0;
1,678,501✔
72
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
1,678,501✔
73
  int32_t completed = 0;
1,678,550✔
74
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
1,678,550✔
75
  if (code != TSDB_CODE_SUCCESS) {
1,678,528!
76
    pTaskInfo->code = code;
×
77
    T_LONG_JMP(pTaskInfo->env, code);
×
78
  }
79
  if (completed == totalSources) {
1,678,528✔
80
    setAllSourcesCompleted(pOperator);
555,991✔
81
    return;
1,678,591✔
82
  }
83

84
  SSourceDataInfo* pDataInfo = NULL;
1,122,537✔
85

86
  while (1) {
120,733✔
87
    qDebug("prepare wait for ready, %p, %s", pExchangeInfo, GET_TASKID(pTaskInfo));
1,243,270✔
88
    code = exchangeWait(pOperator, pExchangeInfo);
1,243,271✔
89

90
    if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
1,243,361!
91
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
2!
92
    }
93

94
    for (int32_t i = 0; i < totalSources; ++i) {
1,820,290✔
95
      pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
1,820,248✔
96
      QUERY_CHECK_NULL(pDataInfo, code, lino, _error, terrno);
1,820,240!
97
      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
1,820,262✔
98
        continue;
477,723✔
99
      }
100

101
      if (pDataInfo->status != EX_SOURCE_DATA_READY) {
1,342,539✔
102
        continue;
99,220✔
103
      }
104

105
      if (pDataInfo->code != TSDB_CODE_SUCCESS) {
1,243,319!
UNCOV
106
        code = pDataInfo->code;
×
UNCOV
107
        goto _error;
×
108
      }
109

110
      tmemory_barrier();
1,243,319✔
111
      SRetrieveTableRsp*     pRsp = pDataInfo->pRsp;
1,243,319✔
112
      SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
1,243,319✔
113
      QUERY_CHECK_NULL(pSource, code, lino, _error, terrno);
1,243,328!
114

115
      // todo
116
      SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
1,243,342✔
117
      if (pRsp->numOfRows == 0) {
1,243,342✔
118
        if (NULL != pDataInfo->pSrcUidList && (!pDataInfo->isVtbRefScan)) {
303,131!
119
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
×
120
          code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
×
121
          if (code != TSDB_CODE_SUCCESS) {
×
122
            taosMemoryFreeClear(pDataInfo->pRsp);
×
123
            goto _error;
×
124
          }
125
        } else {
126
          pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
303,131✔
127
          qDebug("%s vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
303,131✔
128
                 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", try next %d/%" PRIzu,
129
                 GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
130
                 pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows, i + 1, totalSources);
131
          taosMemoryFreeClear(pDataInfo->pRsp);
303,130!
132
        }
133
        break;
303,128✔
134
      }
135

136
      code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
940,211✔
137
      if (code != TSDB_CODE_SUCCESS) {
940,174!
138
        goto _error;
×
139
      }
140

141
      SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
940,174✔
142
      updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pDataInfo->startTime, pOperator);
940,174✔
143
      pDataInfo->totalRows += pRetrieveRsp->numOfRows;
940,182✔
144

145
      if (pRsp->completed == 1) {
940,182✔
146
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
881,502✔
147
        qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
881,502✔
148
               " execId:%d index:%d completed, blocks:%d, numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64
149
               ", totalRows:%" PRIu64 ", total:%.2f Kb, try next %d/%" PRIzu,
150
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
151
               pRsp->numOfBlocks, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows,
152
               pLoadInfo->totalSize / 1024.0, i + 1, totalSources);
153
      } else {
154
        qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
58,680✔
155
               " execId:%d blocks:%d, numOfRows:%" PRId64 ", totalRows:%" PRIu64 ", total:%.2f Kb",
156
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
157
               pRsp->numOfBlocks, pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
158
      }
159

160
      taosMemoryFreeClear(pDataInfo->pRsp);
940,188!
161

162
      if ((pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED || NULL != pDataInfo->pSrcUidList) && !pDataInfo->isVtbRefScan) {
940,212!
163
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
58,688✔
164
        code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
58,688✔
165
        if (code != TSDB_CODE_SUCCESS) {
58,687!
166
          taosMemoryFreeClear(pDataInfo->pRsp);
×
167
          goto _error;
×
168
        }
169
      }
170
      return;
1,122,599✔
171
    }  // end loop
172

173
    int32_t complete1 = 0;
303,170✔
174
    code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &complete1);
303,170✔
175
    if (code != TSDB_CODE_SUCCESS) {
303,120✔
176
      pTaskInfo->code = code;
1✔
177
      T_LONG_JMP(pTaskInfo->env, code);
1!
178
    }
179
    if (complete1 == totalSources) {
303,119✔
180
      qDebug("all sources are completed, %s", GET_TASKID(pTaskInfo));
182,386✔
181
      return;
182,388✔
182
    }
183
  }
184

UNCOV
185
_error:
×
UNCOV
186
  pTaskInfo->code = code;
×
187
}
188

189
static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
4,753,503✔
190
  int32_t        code = TSDB_CODE_SUCCESS;
4,753,503✔
191
  SExchangeInfo* pExchangeInfo = pOperator->info;
4,753,503✔
192
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
4,753,503✔
193

194
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
4,753,503✔
195

196
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
4,753,527✔
197
  if (pOperator->status == OP_EXEC_DONE) {
4,753,527!
198
    qDebug("%s all %" PRIzu " source(s) are exhausted, total rows:%" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
×
199
           GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize,
200
           pLoadInfo->totalElapsed / 1000.0);
201
    return NULL;
×
202
  }
203

204
  // we have buffered retrieved datablock, return it directly
205
  SSDataBlock* p = NULL;
4,753,527✔
206
  if (taosArrayGetSize(pExchangeInfo->pResultBlockList) > 0) {
4,753,527✔
207
    p = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
3,073,028✔
208
    taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
3,073,002✔
209
  }
210

211
  if (p != NULL) {
4,753,538✔
212
    void* tmp = taosArrayPush(pExchangeInfo->pRecycledBlocks, &p);
3,072,999✔
213
    if (!tmp) {
3,073,004!
214
      code = terrno;
×
215
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
216
      pTaskInfo->code = code;
×
217
      T_LONG_JMP(pTaskInfo->env, code);
×
218
    }
219
    return p;
3,073,004✔
220
  } else {
221
    if (pExchangeInfo->seqLoadData) {
1,680,539✔
222
      code = seqLoadRemoteData(pOperator);
2,014✔
223
      if (code != TSDB_CODE_SUCCESS) {
2,014!
224
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
225
        pTaskInfo->code = code;
×
226
        T_LONG_JMP(pTaskInfo->env, code);
×
227
      }
228
    } else {
229
      concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
1,678,525✔
230
    }
231
    if (TSDB_CODE_SUCCESS != pOperator->pTaskInfo->code) {
1,680,571!
UNCOV
232
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(pOperator->pTaskInfo->code));
×
UNCOV
233
      T_LONG_JMP(pTaskInfo->env, pOperator->pTaskInfo->code);
×
234
    }
235
    if (taosArrayGetSize(pExchangeInfo->pResultBlockList) == 0) {
1,680,571✔
236
      return NULL;
739,112✔
237
    } else {
238
      p = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
941,460✔
239
      taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
941,469✔
240
      void* tmp = taosArrayPush(pExchangeInfo->pRecycledBlocks, &p);
941,470✔
241
      if (!tmp) {
941,471!
242
        code = terrno;
×
243
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
244
        pTaskInfo->code = code;
×
245
        T_LONG_JMP(pTaskInfo->env, code);
×
246
      }
247
      return p;
941,471✔
248
    }
249
  }
250
}
251

252
static int32_t loadRemoteDataNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
4,744,392✔
253
  int32_t        code = TSDB_CODE_SUCCESS;
4,744,392✔
254
  int32_t        lino = 0;
4,744,392✔
255
  SExchangeInfo* pExchangeInfo = pOperator->info;
4,744,392✔
256
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
4,744,392✔
257

258
  code = pOperator->fpSet._openFn(pOperator);
4,744,392✔
259
  QUERY_CHECK_CODE(code, lino, _end);
4,744,639!
260

261
  if (pOperator->status == OP_EXEC_DONE) {
4,744,639✔
262
    (*ppRes) = NULL;
63✔
263
    return code;
63✔
264
  }
265

266
  while (1) {
8,986✔
267
    SSDataBlock* pBlock = doLoadRemoteDataImpl(pOperator);
4,753,562✔
268
    if (pBlock == NULL) {
4,753,438✔
269
      (*ppRes) = NULL;
739,110✔
270
      return code;
739,110✔
271
    }
272

273
    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
4,014,328✔
274
    QUERY_CHECK_CODE(code, lino, _end);
4,014,364!
275

276
    if (blockDataGetNumOfRows(pBlock) == 0) {
4,014,364✔
277
      continue;
2✔
278
    }
279

280
    SLimitInfo* pLimitInfo = &pExchangeInfo->limitInfo;
4,014,370✔
281
    if (hasLimitOffsetInfo(pLimitInfo)) {
4,014,370✔
282
      int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock, false);
23,219✔
283
      if (status == PROJECT_RETRIEVE_CONTINUE) {
23,216✔
284
        continue;
8,984✔
285
      } else if (status == PROJECT_RETRIEVE_DONE) {
14,232!
286
        if (pBlock->info.rows == 0) {
14,232!
287
          setOperatorCompleted(pOperator);
×
288
          (*ppRes) = NULL;
×
289
          return code;
×
290
        } else {
291
          (*ppRes) = pBlock;
14,232✔
292
          return code;
14,232✔
293
        }
294
      }
295
    } else {
296
      (*ppRes) = pBlock;
3,991,179✔
297
      return code;
3,991,179✔
298
    }
299
  }
300

301
_end:
×
302
  if (code != TSDB_CODE_SUCCESS) {
×
303
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
304
    pTaskInfo->code = code;
×
305
    T_LONG_JMP(pTaskInfo->env, code);
×
306
  }
307
  (*ppRes) = NULL;
×
308
  return code;
×
309
}
310

311
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
822,736✔
312
  pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
822,736✔
313
  if (pInfo->pSourceDataInfo == NULL) {
822,755!
314
    return terrno;
×
315
  }
316

317
  if (pInfo->dynamicOp) {
822,784✔
318
    return TSDB_CODE_SUCCESS;
22,169✔
319
  }
320

321
  int32_t len = strlen(id) + 1;
800,615✔
322
  pInfo->pTaskId = taosMemoryCalloc(1, len);
800,615!
323
  if (!pInfo->pTaskId) {
800,682!
324
    return terrno;
×
325
  }
326
  tstrncpy(pInfo->pTaskId, id, len);
800,682✔
327
  for (int32_t i = 0; i < numOfSources; ++i) {
1,987,482✔
328
    SSourceDataInfo dataInfo = {0};
1,186,726✔
329
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
1,186,726✔
330
    dataInfo.taskId = pInfo->pTaskId;
1,186,726✔
331
    dataInfo.index = i;
1,186,726✔
332
    SSourceDataInfo* pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
1,186,726✔
333
    if (pDs == NULL) {
1,186,800!
334
      taosArrayDestroyEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
×
335
      return terrno;
×
336
    }
337
  }
338

339
  return TSDB_CODE_SUCCESS;
800,756✔
340
}
341

342
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo, const char* id) {
822,754✔
343
  size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);
822,754!
344

345
  if (numOfSources == 0) {
822,754!
346
    qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
×
347
    return TSDB_CODE_INVALID_PARA;
×
348
  }
349
  pInfo->pFetchRpcHandles = taosArrayInit(numOfSources, sizeof(int64_t));
822,754✔
350
  if (!pInfo->pFetchRpcHandles) {
822,894!
351
    return terrno;
×
352
  }
353
  void* ret = taosArrayReserve(pInfo->pFetchRpcHandles, numOfSources);
822,894✔
354
  if (!ret) {
822,781!
355
    return terrno;
×
356
  }
357

358
  pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
822,781✔
359
  if (pInfo->pSources == NULL) {
822,720!
360
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
361
    return terrno;
×
362
  }
363

364
  if (pExNode->node.dynamicOp) {
822,720✔
365
    pInfo->pHashSources = tSimpleHashInit(numOfSources * 2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
22,169✔
366
    if (NULL == pInfo->pHashSources) {
22,176!
367
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
368
      return terrno;
×
369
    }
370
  }
371

372
  for (int32_t i = 0; i < numOfSources; ++i) {
2,054,028✔
373
    SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
1,231,246✔
374
    if (!pNode) {
1,231,320!
375
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
376
      return terrno;
×
377
    }
378
    void* tmp = taosArrayPush(pInfo->pSources, pNode);
1,231,320✔
379
    if (!tmp) {
1,231,373!
380
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
381
      return terrno;
×
382
    }
383
    SExchangeSrcIndex idx = {.srcIdx = i, .inUseIdx = -1};
1,231,373✔
384
    int32_t           code =
385
        tSimpleHashPut(pInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
1,231,373✔
386
    if (pInfo->pHashSources && code != TSDB_CODE_SUCCESS) {
1,231,301!
387
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
388
      return code;
×
389
    }
390
  }
391

392
  initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);
822,782✔
393
  int64_t refId = taosAddRef(exchangeObjRefPool, pInfo);
822,819✔
394
  if (refId < 0) {
822,774!
395
    int32_t code = terrno;
×
396
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
397
    return code;
×
398
  } else {
399
    pInfo->self = refId;
822,774✔
400
  }
401

402
  return initDataSource(numOfSources, pInfo, id);
822,774✔
403
}
404

405
int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo,
822,692✔
406
                                   SOperatorInfo** pOptrInfo) {
407
  QRY_PARAM_CHECK(pOptrInfo);
822,692!
408

409
  int32_t        code = 0;
822,692✔
410
  int32_t        lino = 0;
822,692✔
411
  SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
822,692!
412
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
822,757!
413
  if (pInfo == NULL || pOperator == NULL) {
822,750!
414
    code = terrno;
×
415
    goto _error;
×
416
  }
417

418
  pInfo->dynamicOp = pExNode->node.dynamicOp;
822,763✔
419
  code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo));
822,763✔
420
  QUERY_CHECK_CODE(code, lino, _error);
822,685!
421

422
  code = tsem_init(&pInfo->ready, 0, 0);
822,685✔
423
  QUERY_CHECK_CODE(code, lino, _error);
822,786!
424

425
  pInfo->pDummyBlock = createDataBlockFromDescNode(pExNode->node.pOutputDataBlockDesc);
822,786✔
426
  QUERY_CHECK_NULL(pInfo->pDummyBlock, code, lino, _error, terrno);
822,849!
427

428
  pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES);
822,849✔
429
  QUERY_CHECK_NULL(pInfo->pResultBlockList, code, lino, _error, terrno);
822,771!
430
  pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES);
822,771✔
431
  QUERY_CHECK_NULL(pInfo->pRecycledBlocks, code, lino, _error, terrno);
822,778!
432

433
  SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self};
822,778✔
434
  code = qAppendTaskStopInfo(pTaskInfo, &stopInfo);
822,778✔
435
  QUERY_CHECK_CODE(code, lino, _error);
822,850!
436

437
  pInfo->seqLoadData = pExNode->seqRecvData;
822,850✔
438
  pInfo->pTransporter = pTransporter;
822,850✔
439

440
  setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo,
822,850✔
441
                  pTaskInfo);
442
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
822,791✔
443

444
  code = filterInitFromNode((SNode*)pExNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
822,816✔
445
  QUERY_CHECK_CODE(code, lino, _error);
822,798!
446

447
  pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteDataNext, NULL, destroyExchangeOperatorInfo,
822,798✔
448
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
449
  *pOptrInfo = pOperator;
822,736✔
450
  return TSDB_CODE_SUCCESS;
822,736✔
451

452
_error:
×
453
  if (code != TSDB_CODE_SUCCESS) {
×
454
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
455
    pTaskInfo->code = code;
×
456
  }
457
  if (pInfo != NULL) {
×
458
    doDestroyExchangeOperatorInfo(pInfo);
×
459
  }
460

461
  if (pOperator != NULL) {
×
462
    pOperator->info = NULL;
×
463
    destroyOperator(pOperator);
×
464
  }
465
  return code;
×
466
}
467

468
void destroyExchangeOperatorInfo(void* param) {
822,842✔
469
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
822,842✔
470
  int32_t        code = taosRemoveRef(exchangeObjRefPool, pExInfo->self);
822,842✔
471
  if (code != TSDB_CODE_SUCCESS) {
822,906!
472
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
473
  }
474
}
822,906✔
475

476
void freeBlock(void* pParam) {
2,445,187✔
477
  SSDataBlock* pBlock = *(SSDataBlock**)pParam;
2,445,187✔
478
  blockDataDestroy(pBlock);
2,445,187✔
479
}
2,445,215✔
480

481
void freeSourceDataInfo(void* p) {
1,188,059✔
482
  SSourceDataInfo* pInfo = (SSourceDataInfo*)p;
1,188,059✔
483
  taosMemoryFreeClear(pInfo->decompBuf);
1,188,059!
484
  taosMemoryFreeClear(pInfo->pRsp);
1,188,059!
485

486
  pInfo->decompBufSize = 0;
1,188,059✔
487
}
1,188,059✔
488

489
void doDestroyExchangeOperatorInfo(void* param) {
822,865✔
490
  if (param == NULL) {
822,865!
491
    return;
×
492
  }
493
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
822,865✔
494
  if (pExInfo->pFetchRpcHandles) {
822,865!
495
    for (int32_t i = 0; i < pExInfo->pFetchRpcHandles->size; ++i) {
2,054,365✔
496
      int64_t* pRpcHandle = taosArrayGet(pExInfo->pFetchRpcHandles, i);
1,231,469✔
497
      if (*pRpcHandle > 0) {
1,231,475✔
498
        SDownstreamSourceNode* pSource = taosArrayGet(pExInfo->pSources, i);
584✔
499
        (void)asyncFreeConnById(pExInfo->pTransporter, *pRpcHandle);
584✔
500
      }
501
    }
502
    taosArrayDestroy(pExInfo->pFetchRpcHandles);
822,896✔
503
  }
504

505
  taosArrayDestroy(pExInfo->pSources);
822,816✔
506
  taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo);
822,892✔
507

508
  taosArrayDestroyEx(pExInfo->pResultBlockList, freeBlock);
822,873✔
509
  taosArrayDestroyEx(pExInfo->pRecycledBlocks, freeBlock);
822,888✔
510

511
  blockDataDestroy(pExInfo->pDummyBlock);
822,891✔
512
  tSimpleHashCleanup(pExInfo->pHashSources);
822,892✔
513

514
  int32_t code = tsem_destroy(&pExInfo->ready);
822,874✔
515
  if (code != TSDB_CODE_SUCCESS) {
822,859!
516
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
517
  }
518
  taosMemoryFreeClear(pExInfo->pTaskId);
822,859!
519

520
  taosMemoryFreeClear(param);
822,873!
521
}
522

523
int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
1,245,708✔
524
  SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;
1,245,708✔
525

526
  taosMemoryFreeClear(pMsg->pEpSet);
1,245,708!
527
  SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
1,245,708✔
528
  if (pExchangeInfo == NULL) {
1,245,916✔
529
    qWarn("failed to acquire exchange operator, since it may have been released, %p", pExchangeInfo);
63!
530
    taosMemoryFree(pMsg->pData);
63!
531
    return TSDB_CODE_SUCCESS;
63✔
532
  }
533

534
  int32_t          index = pWrapper->sourceIndex;
1,245,853✔
535
  SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);
1,245,853✔
536

537
  int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, index);
1,245,543✔
538
  if (pRpcHandle != NULL) {
1,245,446!
539
    int32_t ret = asyncFreeConnById(pExchangeInfo->pTransporter, *pRpcHandle);
1,245,446✔
540
    if (ret != 0) {
1,245,842✔
541
      qDebug("failed to free rpc handle, code:%s, %p", tstrerror(ret), pExchangeInfo);
16,225✔
542
    }
543
    *pRpcHandle = -1;
1,245,781✔
544
  }
545

546
  if (!pSourceDataInfo) {
1,245,781!
547
    return terrno;
×
548
  }
549

550
  if (code == TSDB_CODE_SUCCESS) {
1,245,781✔
551
    pSourceDataInfo->pRsp = pMsg->pData;
1,245,539✔
552

553
    SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
1,245,539✔
554
    pRsp->numOfRows = htobe64(pRsp->numOfRows);
1,245,539✔
555
    pRsp->compLen = htonl(pRsp->compLen);
1,245,518✔
556
    pRsp->payloadLen = htonl(pRsp->payloadLen);
1,245,518✔
557
    pRsp->numOfCols = htonl(pRsp->numOfCols);
1,245,518✔
558
    pRsp->useconds = htobe64(pRsp->useconds);
1,245,518✔
559
    pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);
1,245,771✔
560

561
    qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%" PRId64 ", %p", pSourceDataInfo->taskId, index,
1,245,771✔
562
           pRsp->numOfBlocks, pRsp->numOfRows, pExchangeInfo);
563
  } else {
564
    taosMemoryFree(pMsg->pData);
242!
UNCOV
565
    pSourceDataInfo->code = rpcCvtErrCode(code);
×
UNCOV
566
    if (pSourceDataInfo->code != code) {
×
567
      qError("%s fetch rsp received, index:%d, error:%s, cvted error: %s, %p", pSourceDataInfo->taskId, index,
×
568
             tstrerror(code), tstrerror(pSourceDataInfo->code), pExchangeInfo);
569
    } else {
UNCOV
570
      qError("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code),
×
571
             pExchangeInfo);
572
    }
573
  }
574

575
  tmemory_barrier();
1,245,782✔
576
  pSourceDataInfo->status = EX_SOURCE_DATA_READY;
1,245,782✔
577
  code = tsem_post(&pExchangeInfo->ready);
1,245,782✔
578
  if (code != TSDB_CODE_SUCCESS) {
1,245,741!
579
    qError("failed to invoke post when fetch rsp is ready, code:%s, %p", tstrerror(code), pExchangeInfo);
×
580
    return code;
×
581
  }
582

583
  code = taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
1,245,741✔
584
  if (code != TSDB_CODE_SUCCESS) {
1,245,869!
585
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
586
  }
587
  return code;
1,245,856✔
588
}
589

590
int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, bool tableSeq) {
1,281✔
591
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
1,281!
592
  if (NULL == *ppRes) {
1,281!
593
    return terrno;
×
594
  }
595

596
  STableScanOperatorParam* pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
1,281!
597
  if (NULL == pScan) {
1,281!
598
    taosMemoryFreeClear(*ppRes);
×
599
    return terrno;
×
600
  }
601

602
  pScan->pUidList = taosArrayDup(pUidList, NULL);
1,281✔
603
  if (NULL == pScan->pUidList) {
1,281!
604
    taosMemoryFree(pScan);
×
605
    taosMemoryFreeClear(*ppRes);
×
606
    return terrno;
×
607
  }
608
  pScan->tableSeq = tableSeq;
1,281✔
609
  pScan->pOrgTbInfo = NULL;
1,281✔
610
  pScan->window.skey = INT64_MAX;
1,281✔
611
  pScan->window.ekey = INT64_MIN;
1,281✔
612

613
  (*ppRes)->opType = srcOpType;
1,281✔
614
  (*ppRes)->downstreamIdx = 0;
1,281✔
615
  (*ppRes)->value = pScan;
1,281✔
616
  (*ppRes)->pChildren = NULL;
1,281✔
617
  (*ppRes)->reUse = false;
1,281✔
618

619
  return TSDB_CODE_SUCCESS;
1,281✔
620
}
621

622
int32_t buildTableScanOperatorParamEx(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, SOrgTbInfo *pMap, bool tableSeq, STimeWindow *window) {
×
623
  int32_t                  code = TSDB_CODE_SUCCESS;
×
624
  int32_t                  lino = 0;
×
625
  STableScanOperatorParam* pScan = NULL;
×
626

627
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
×
628
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
×
629

630
  pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
×
631
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);
×
632

633
  pScan->pUidList = taosArrayDup(pUidList, NULL);
×
634
  QUERY_CHECK_NULL(pScan->pUidList, code, lino, _return, terrno);
×
635

636
  pScan->pOrgTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
×
637
  QUERY_CHECK_NULL(pScan->pOrgTbInfo, code, lino, _return, terrno);
×
638

639
  pScan->pOrgTbInfo->vgId = pMap->vgId;
×
640
  tstrncpy(pScan->pOrgTbInfo->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);
×
641

642
  pScan->pOrgTbInfo->colMap = taosArrayDup(pMap->colMap, NULL);
×
643
  QUERY_CHECK_NULL(pScan->pOrgTbInfo->colMap, code, lino, _return, terrno);
×
644

645
  pScan->tableSeq = tableSeq;
×
646
  pScan->window.skey = window->skey;
×
647
  pScan->window.ekey = window->ekey;
×
648

649
  (*ppRes)->opType = srcOpType;
×
650
  (*ppRes)->downstreamIdx = 0;
×
651
  (*ppRes)->value = pScan;
×
652
  (*ppRes)->pChildren = NULL;
×
653
  (*ppRes)->reUse = false;
×
654

655
  return code;
×
656
_return:
×
657
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
658
  taosMemoryFreeClear(*ppRes);
×
659
  if (pScan) {
×
660
    taosArrayDestroy(pScan->pUidList);
×
661
    if (pScan->pOrgTbInfo) {
×
662
      taosArrayDestroy(pScan->pOrgTbInfo->colMap);
×
663
      taosMemoryFreeClear(pScan->pOrgTbInfo);
×
664
    }
665
    taosMemoryFree(pScan);
×
666
  }
667
  return code;
×
668
}
669

670
int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
1,246,200✔
671
  int32_t          code = TSDB_CODE_SUCCESS;
1,246,200✔
672
  int32_t          lino = 0;
1,246,200✔
673
  SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
1,246,200✔
674
  if (!pDataInfo) {
1,246,257!
675
    return terrno;
×
676
  }
677

678
  if (EX_SOURCE_DATA_NOT_READY != pDataInfo->status) {
1,246,303!
679
    return TSDB_CODE_SUCCESS;
×
680
  }
681

682
  pDataInfo->status = EX_SOURCE_DATA_STARTED;
1,246,303✔
683
  SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
1,246,303✔
684
  if (!pSource) {
1,246,353!
685
    return terrno;
×
686
  }
687

688
  pDataInfo->startTime = taosGetTimestampUs();
1,246,321✔
689
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
1,246,321✔
690

691
  SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
1,246,389!
692
  QUERY_CHECK_NULL(pWrapper, code, lino, _end, terrno);
1,246,442!
693
  pWrapper->exchangeId = pExchangeInfo->self;
1,246,442✔
694
  pWrapper->sourceIndex = sourceIndex;
1,246,442✔
695

696
  if (pSource->localExec) {
1,246,442✔
697
    SDataBuf pBuf = {0};
16,199✔
698
    int32_t  code = (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->sId, pTaskInfo->id.queryId,
16,199✔
699
                                               pSource->clientId, pSource->taskId, 0, pSource->execId, &pBuf.pData,
700
                                               pTaskInfo->localFetch.explainRes);
701
    code = loadRemoteDataCallback(pWrapper, &pBuf, code);
16,199✔
702
    taosMemoryFree(pWrapper);
16,199!
703
    QUERY_CHECK_CODE(code, lino, _end);
16,199!
704
  } else {
705
    SResFetchReq req = {0};
1,230,243✔
706
    req.header.vgId = pSource->addr.nodeId;
1,230,243✔
707
    req.sId = pSource->sId;
1,230,243✔
708
    req.clientId = pSource->clientId;
1,230,243✔
709
    req.taskId = pSource->taskId;
1,230,243✔
710
    req.queryId = pTaskInfo->id.queryId;
1,230,243✔
711
    req.execId = pSource->execId;
1,230,243✔
712
    if (pDataInfo->isVtbRefScan) {
1,230,243!
713
      code = buildTableScanOperatorParamEx(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->colMap, pDataInfo->tableSeq, &pDataInfo->window);
×
714
      taosArrayDestroy(pDataInfo->colMap->colMap);
×
715
      taosMemoryFreeClear(pDataInfo->colMap);
×
716
      taosArrayDestroy(pDataInfo->pSrcUidList);
×
717
      pDataInfo->pSrcUidList = NULL;
×
718
      if (TSDB_CODE_SUCCESS != code) {
×
719
        pTaskInfo->code = code;
×
720
        taosMemoryFree(pWrapper);
×
721
        return pTaskInfo->code;
×
722
      }
723
    } else {
724
      if (pDataInfo->pSrcUidList) {
1,230,243✔
725
        code = buildTableScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->tableSeq);
1,203✔
726
        taosArrayDestroy(pDataInfo->pSrcUidList);
1,203✔
727
        pDataInfo->pSrcUidList = NULL;
1,203✔
728
        if (TSDB_CODE_SUCCESS != code) {
1,203!
729
          pTaskInfo->code = code;
×
730
          taosMemoryFree(pWrapper);
×
731
          return pTaskInfo->code;
×
732
        }
733
      }
734
    }
735

736
    int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req);
1,230,243✔
737
    if (msgSize < 0) {
1,229,951!
738
      pTaskInfo->code = msgSize;
×
739
      taosMemoryFree(pWrapper);
×
740
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
741
      return pTaskInfo->code;
×
742
    }
743

744
    void* msg = taosMemoryCalloc(1, msgSize);
1,229,951!
745
    if (NULL == msg) {
1,230,148!
746
      pTaskInfo->code = terrno;
×
747
      taosMemoryFree(pWrapper);
×
748
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
749
      return pTaskInfo->code;
×
750
    }
751

752
    msgSize = tSerializeSResFetchReq(msg, msgSize, &req);
1,230,148✔
753
    if (msgSize < 0) {
1,229,968!
754
      pTaskInfo->code = msgSize;
×
755
      taosMemoryFree(pWrapper);
×
756
      taosMemoryFree(msg);
×
757
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
758
      return pTaskInfo->code;
×
759
    }
760

761
    freeOperatorParam(req.pOpParam, OP_GET_PARAM);
1,229,968✔
762

763
    qDebug("%s build fetch msg and send to vgId:%d, ep:%s, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
1,229,962✔
764
           ", execId:%d, %p, %d/%" PRIzu,
765
           GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->clientId,
766
           pSource->taskId, pSource->execId, pExchangeInfo, sourceIndex, totalSources);
767

768
    // send the fetch remote task result reques
769
    SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
1,229,971!
770
    if (NULL == pMsgSendInfo) {
1,230,130!
771
      taosMemoryFreeClear(msg);
×
772
      taosMemoryFree(pWrapper);
×
773
      qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
×
774
      pTaskInfo->code = terrno;
×
775
      return pTaskInfo->code;
×
776
    }
777

778
    pMsgSendInfo->param = pWrapper;
1,230,130✔
779
    pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
1,230,130✔
780
    pMsgSendInfo->msgInfo.pData = msg;
1,230,130✔
781
    pMsgSendInfo->msgInfo.len = msgSize;
1,230,130✔
782
    pMsgSendInfo->msgType = pSource->fetchMsgType;
1,230,130✔
783
    pMsgSendInfo->fp = loadRemoteDataCallback;
1,230,130✔
784

785
    int64_t transporterId = 0;
1,230,130✔
786
    void* poolHandle = NULL;
1,230,130✔
787
    code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
1,230,130✔
788
    QUERY_CHECK_CODE(code, lino, _end);
1,230,248!
789
    int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, sourceIndex);
1,230,248✔
790
    *pRpcHandle = transporterId;
1,230,248✔
791
  }
792

793
_end:
1,246,447✔
794
  if (code != TSDB_CODE_SUCCESS) {
1,246,447!
795
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
796
  }
797
  return code;
1,246,449✔
798
}
799

800
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs,
957,150✔
801
                          SOperatorInfo* pOperator) {
802
  pInfo->totalRows += numOfRows;
957,150✔
803
  pInfo->totalSize += dataLen;
957,150✔
804
  pInfo->totalElapsed += (taosGetTimestampUs() - startTs);
957,159✔
805
  pOperator->resultInfo.totalRows += numOfRows;
957,159✔
806
}
957,159✔
807

808
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart) {
4,142,016✔
809
  int32_t      code = TSDB_CODE_SUCCESS;
4,142,016✔
810
  int32_t      lino = 0;
4,142,016✔
811
  SSDataBlock* pBlock = NULL;
4,142,016✔
812
  if (pColList == NULL) {  // data from other sources
4,142,016✔
813
    blockDataCleanup(pRes);
4,126,325✔
814
    code = blockDecode(pRes, pData, (const char**)pNextStart);
4,126,240✔
815
    if (code) {
4,125,944!
816
      return code;
×
817
    }
818
  } else {  // extract data according to pColList
819
    char* pStart = pData;
15,691✔
820

821
    int32_t numOfCols = htonl(*(int32_t*)pStart);
15,691✔
822
    pStart += sizeof(int32_t);
15,691✔
823

824
    // todo refactor:extract method
825
    SSysTableSchema* pSchema = (SSysTableSchema*)pStart;
15,691✔
826
    for (int32_t i = 0; i < numOfCols; ++i) {
260,554✔
827
      SSysTableSchema* p = (SSysTableSchema*)pStart;
244,863✔
828

829
      p->colId = htons(p->colId);
244,863✔
830
      p->bytes = htonl(p->bytes);
244,863✔
831
      pStart += sizeof(SSysTableSchema);
244,863✔
832
    }
833

834
    pBlock = NULL;
15,691✔
835
    code = createDataBlock(&pBlock);
15,691✔
836
    QUERY_CHECK_CODE(code, lino, _end);
15,705!
837

838
    for (int32_t i = 0; i < numOfCols; ++i) {
260,568✔
839
      SColumnInfoData idata = createColumnInfoData(pSchema[i].type, pSchema[i].bytes, pSchema[i].colId);
244,863✔
840
      code = blockDataAppendColInfo(pBlock, &idata);
244,863✔
841
      QUERY_CHECK_CODE(code, lino, _end);
244,863!
842
    }
843

844
    const char* pDummy = NULL;
15,705✔
845
    code = blockDecode(pBlock, pStart, &pDummy);
15,705✔
846
    QUERY_CHECK_CODE(code, lino, _end);
15,705!
847

848
    code = blockDataEnsureCapacity(pRes, pBlock->info.rows);
15,705✔
849
    QUERY_CHECK_CODE(code, lino, _end);
15,705!
850

851
    // data from mnode
852
    pRes->info.dataLoad = 1;
15,705✔
853
    pRes->info.rows = pBlock->info.rows;
15,705✔
854
    pRes->info.scanFlag = MAIN_SCAN;
15,705✔
855
    code = relocateColumnData(pRes, pColList, pBlock->pDataBlock, false);
15,705✔
856
    QUERY_CHECK_CODE(code, lino, _end);
15,705!
857

858
    blockDataDestroy(pBlock);
15,705✔
859
    pBlock = NULL;
15,705✔
860
  }
861

862
_end:
4,141,649✔
863
  if (code != TSDB_CODE_SUCCESS) {
4,141,649!
864
    blockDataDestroy(pBlock);
×
865
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
866
  }
867
  return code;
4,141,644✔
868
}
869

870
void setAllSourcesCompleted(SOperatorInfo* pOperator) {
556,723✔
871
  SExchangeInfo* pExchangeInfo = pOperator->info;
556,723✔
872
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
556,723✔
873

874
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
556,723✔
875
  size_t               totalSources = taosArrayGetSize(pExchangeInfo->pSources);
556,723✔
876
  qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 ", %.2f Kb, elapsed:%.2f ms",
556,724✔
877
         GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
878
         pLoadInfo->totalElapsed / 1000.0);
879

880
  setOperatorCompleted(pOperator);
556,724✔
881
}
556,728✔
882

883
int32_t getCompletedSources(const SArray* pArray, int32_t* pRes) {
1,981,632✔
884
  int32_t code = TSDB_CODE_SUCCESS;
1,981,632✔
885
  int32_t lino = 0;
1,981,632✔
886
  size_t  total = taosArrayGetSize(pArray);
1,981,632✔
887

888
  int32_t completed = 0;
1,981,646✔
889
  for (int32_t k = 0; k < total; ++k) {
5,501,595✔
890
    SSourceDataInfo* p = taosArrayGet(pArray, k);
3,519,964✔
891
    QUERY_CHECK_NULL(p, code, lino, _end, terrno);
3,519,946!
892
    if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
3,519,949✔
893
      completed += 1;
1,693,523✔
894
    }
895
  }
896

897
  *pRes = completed;
1,981,631✔
898
_end:
1,981,631✔
899
  if (code != TSDB_CODE_SUCCESS) {
1,981,631!
900
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
901
  }
902
  return code;
1,981,614✔
903
}
904

905
int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
799,875✔
906
  SExchangeInfo* pExchangeInfo = pOperator->info;
799,875✔
907
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
799,875✔
908

909
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
799,875✔
910
  int64_t startTs = taosGetTimestampUs();
800,100✔
911

912
  // Asynchronously send all fetch requests to all sources.
913
  for (int32_t i = 0; i < totalSources; ++i) {
1,985,975✔
914
    int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
1,185,839✔
915
    if (code != TSDB_CODE_SUCCESS) {
1,185,875!
916
      pTaskInfo->code = code;
×
917
      return code;
×
918
    }
919
  }
920

921
  int64_t endTs = taosGetTimestampUs();
800,127✔
922
  qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTaskInfo),
800,127✔
923
         totalSources, (endTs - startTs) / 1000.0);
924

925
  pOperator->status = OP_RES_TO_RETURN;
800,128✔
926
  pOperator->cost.openCost = taosGetTimestampUs() - startTs;
800,131✔
927
  if (isTaskKilled(pTaskInfo)) {
800,131✔
928
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
3!
929
  }
930

931
  return TSDB_CODE_SUCCESS;
800,135✔
932
}
933

934
int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo) {
941,472✔
935
  int32_t            code = TSDB_CODE_SUCCESS;
941,472✔
936
  int32_t            lino = 0;
941,472✔
937
  SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
941,472✔
938
  SSDataBlock*       pb = NULL;
941,472✔
939

940
  char* pNextStart = pRetrieveRsp->data;
941,472✔
941
  char* pStart = pNextStart;
941,472✔
942

943
  int32_t index = 0;
941,472✔
944

945
  if (pRetrieveRsp->compressed) {  // decompress the data
941,472!
946
    if (pDataInfo->decompBuf == NULL) {
×
947
      pDataInfo->decompBuf = taosMemoryMalloc(pRetrieveRsp->payloadLen);
×
948
      QUERY_CHECK_NULL(pDataInfo->decompBuf, code, lino, _end, terrno);
×
949
      pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
×
950
    } else {
951
      if (pDataInfo->decompBufSize < pRetrieveRsp->payloadLen) {
×
952
        char* p = taosMemoryRealloc(pDataInfo->decompBuf, pRetrieveRsp->payloadLen);
×
953
        QUERY_CHECK_NULL(p, code, lino, _end, terrno);
×
954
        if (p != NULL) {
×
955
          pDataInfo->decompBuf = p;
×
956
          pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
×
957
        }
958
      }
959
    }
960
  }
961

962
  while (index++ < pRetrieveRsp->numOfBlocks) {
5,067,492✔
963
    pStart = pNextStart;
4,126,125✔
964

965
    if (taosArrayGetSize(pExchangeInfo->pRecycledBlocks) > 0) {
4,126,125✔
966
      pb = *(SSDataBlock**)taosArrayPop(pExchangeInfo->pRecycledBlocks);
1,681,423✔
967
      blockDataCleanup(pb);
1,681,422✔
968
    } else {
969
      code = createOneDataBlock(pExchangeInfo->pDummyBlock, false, &pb);
2,444,732✔
970
      QUERY_CHECK_NULL(pb, code, lino, _end, code);
2,444,931!
971
    }
972

973
    int32_t compLen = *(int32_t*)pStart;
4,126,352✔
974
    pStart += sizeof(int32_t);
4,126,352✔
975

976
    int32_t rawLen = *(int32_t*)pStart;
4,126,352✔
977
    pStart += sizeof(int32_t);
4,126,352✔
978
    QUERY_CHECK_CONDITION((compLen <= rawLen && compLen != 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
4,126,352!
979

980
    pNextStart = pStart + compLen;
4,126,352✔
981
    if (pRetrieveRsp->compressed && (compLen < rawLen)) {
4,126,352!
982
      int32_t t = tsDecompressString(pStart, compLen, 1, pDataInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
×
983
      QUERY_CHECK_CONDITION((t == rawLen), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
984
      pStart = pDataInfo->decompBuf;
×
985
    }
986

987
    code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart);
4,126,352✔
988
    if (code != 0) {
4,125,950!
989
      taosMemoryFreeClear(pDataInfo->pRsp);
×
990
      goto _end;
×
991
    }
992

993
    void* tmp = taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
4,125,950✔
994
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
4,126,020!
995
    pb = NULL;
4,126,020✔
996
  }
997

998
_end:
941,367✔
999
  if (code != TSDB_CODE_SUCCESS) {
941,367!
1000
    blockDataDestroy(pb);
×
1001
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1002
  }
1003
  return code;
941,453✔
1004
}
1005

1006
int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
2,014✔
1007
  SExchangeInfo* pExchangeInfo = pOperator->info;
2,014✔
1008
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
2,014✔
1009

1010
  int32_t code = 0;
2,014✔
1011
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);
2,014✔
1012
  int64_t startTs = taosGetTimestampUs();
2,014✔
1013

1014
  while (1) {
599✔
1015
    if (pExchangeInfo->current >= totalSources) {
2,613✔
1016
      setAllSourcesCompleted(pOperator);
737✔
1017
      return TSDB_CODE_SUCCESS;
737✔
1018
    }
1019

1020
    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
1,876✔
1021
    if (!pDataInfo) {
1,876!
1022
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1023
      pTaskInfo->code = terrno;
×
1024
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1025
    }
1026
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
1,876✔
1027

1028
    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
1,876✔
1029
    if (code != TSDB_CODE_SUCCESS) {
1,876!
1030
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1031
      pTaskInfo->code = code;
×
1032
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1033
    }
1034

1035
    code = exchangeWait(pOperator, pExchangeInfo);
1,876✔
1036
    if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
1,876!
1037
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1038
    }
1039

1040
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
1,876✔
1041
    if (!pSource) {
1,876!
1042
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1043
      pTaskInfo->code = terrno;
×
1044
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1045
    }
1046

1047
    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
1,876!
1048
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
×
1049
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1050
             tstrerror(pDataInfo->code));
1051
      pOperator->pTaskInfo->code = pDataInfo->code;
×
1052
      return pOperator->pTaskInfo->code;
×
1053
    }
1054

1055
    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
1,876✔
1056
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
1,876✔
1057

1058
    if (pRsp->numOfRows == 0) {
1,876✔
1059
      qDebug("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
599✔
1060
             " execId:%d %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next",
1061
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1062
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);
1063

1064
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
599✔
1065
      if (pDataInfo->isVtbRefScan) {
599!
1066
        pExchangeInfo->current = totalSources;
×
1067
      } else {
1068
        pExchangeInfo->current += 1;
599✔
1069
      }
1070
      taosMemoryFreeClear(pDataInfo->pRsp);
599!
1071
      continue;
599✔
1072
    }
1073

1074
    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
1,277✔
1075
    if (code != TSDB_CODE_SUCCESS) {
1,277!
1076
      goto _error;
×
1077
    }
1078

1079
    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
1,277✔
1080
    if (pRsp->completed == 1) {
1,277✔
1081
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
962✔
1082
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
1083
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1084
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
1085
             pExchangeInfo->current + 1, totalSources);
1086

1087
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
962✔
1088
      if (pDataInfo->isVtbRefScan) {
962!
1089
        pExchangeInfo->current = totalSources;
×
1090
      } else {
1091
        pExchangeInfo->current += 1;
962✔
1092
      }
1093
    } else {
1094
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
315✔
1095
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
1096
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1097
             pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
1098
    }
1099
    if (pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData) {
1,277!
1100
      taosArrayClear(pExchangeInfo->pSourceDataInfo);
×
1101
    }
1102
    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
1,277✔
1103
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;
1,277✔
1104

1105
    taosMemoryFreeClear(pDataInfo->pRsp);
1,277!
1106
    return TSDB_CODE_SUCCESS;
1,277✔
1107
  }
1108

1109
_error:
×
1110
  pTaskInfo->code = code;
×
1111
  return code;
×
1112
}
1113

1114
void clearVtbScanDataInfo(void* pItem) {
×
1115
  SSourceDataInfo *pInfo = (SSourceDataInfo *)pItem;
×
1116
  if (pInfo->colMap) {
×
1117
    taosArrayDestroy(pInfo->colMap->colMap);
×
1118
    taosMemoryFreeClear(pInfo->colMap);
×
1119
  }
1120
  taosArrayDestroy(pInfo->pSrcUidList);
×
1121
}
×
1122

1123
int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasicParam* pBasicParam) {
1,203✔
1124
  SExchangeInfo*     pExchangeInfo = pOperator->info;
1,203✔
1125
  SExchangeSrcIndex* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
1,203✔
1126
  if (NULL == pIdx) {
1,203!
1127
    qError("No exchange source for vgId: %d", pBasicParam->vgId);
×
1128
    return TSDB_CODE_INVALID_PARA;
×
1129
  }
1130

1131
  if (pBasicParam->isVtbRefScan) {
1,203!
1132
    SSourceDataInfo dataInfo = {0};
×
1133
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
×
1134
    dataInfo.taskId = pExchangeInfo->pTaskId;
×
1135
    dataInfo.index = pIdx->srcIdx;
×
1136
    dataInfo.window = pBasicParam->window;
×
1137
    dataInfo.colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
×
1138
    dataInfo.colMap->vgId = pBasicParam->colMap->vgId;
×
1139
    tstrncpy(dataInfo.colMap->tbName, pBasicParam->colMap->tbName, TSDB_TABLE_FNAME_LEN);
×
1140
    dataInfo.colMap->colMap = taosArrayDup(pBasicParam->colMap->colMap, NULL);
×
1141

1142
    dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
×
1143
    if (dataInfo.pSrcUidList == NULL) {
×
1144
      return terrno;
×
1145
    }
1146

1147
    dataInfo.isVtbRefScan = pBasicParam->isVtbRefScan;
×
1148
    dataInfo.srcOpType = pBasicParam->srcOpType;
×
1149
    dataInfo.tableSeq = pBasicParam->tableSeq;
×
1150

1151
    taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
×
1152
    void* tmp = taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo);
×
1153
    if (!tmp) {
×
1154
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1155
      return terrno;
×
1156
    }
1157
  } else {
1158
    if (pIdx->inUseIdx < 0) {
1,203✔
1159
      SSourceDataInfo dataInfo = {0};
1,179✔
1160
      dataInfo.status = EX_SOURCE_DATA_NOT_READY;
1,179✔
1161
      dataInfo.taskId = pExchangeInfo->pTaskId;
1,179✔
1162
      dataInfo.index = pIdx->srcIdx;
1,179✔
1163
      if (pBasicParam->isVtbRefScan) {
1,179!
1164
        dataInfo.window = pBasicParam->window;
×
1165
        dataInfo.colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
×
1166
        dataInfo.colMap->vgId = pBasicParam->colMap->vgId;
×
1167
        tstrncpy(dataInfo.colMap->tbName, pBasicParam->colMap->tbName, TSDB_TABLE_FNAME_LEN);
×
1168
        dataInfo.colMap->colMap = taosArrayDup(pBasicParam->colMap->colMap, NULL);
×
1169
      }
1170

1171
      dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
1,179✔
1172
      if (dataInfo.pSrcUidList == NULL) {
1,179!
1173
        return terrno;
×
1174
      }
1175

1176
      dataInfo.isVtbRefScan = pBasicParam->isVtbRefScan;
1,179✔
1177
      dataInfo.srcOpType = pBasicParam->srcOpType;
1,179✔
1178
      dataInfo.tableSeq = pBasicParam->tableSeq;
1,179✔
1179

1180
      void* tmp = taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo);
1,179✔
1181
      if (!tmp) {
1,179!
1182
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1183
        return terrno;
×
1184
      }
1185
      pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
1,179✔
1186
    } else {
1187
      SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
24✔
1188
      if (!pDataInfo) {
24!
1189
        return terrno;
×
1190
      }
1191
      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
24!
1192
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
24✔
1193
      }
1194

1195
      if (pBasicParam->isVtbRefScan) {
24!
1196
        pDataInfo->window = pBasicParam->window;
×
1197
        if (!pDataInfo->colMap) {
×
1198
          pDataInfo->colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
×
1199
        }
1200
        pDataInfo->colMap->vgId = pBasicParam->colMap->vgId;
×
1201
        tstrncpy(pDataInfo->colMap->tbName, pBasicParam->colMap->tbName, TSDB_TABLE_FNAME_LEN);
×
1202
        pDataInfo->colMap->colMap = taosArrayDup(pBasicParam->colMap->colMap, NULL);
×
1203
      }
1204

1205
      pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
24✔
1206
      if (pDataInfo->pSrcUidList == NULL) {
24!
1207
        return terrno;
×
1208
      }
1209

1210
      pDataInfo->isVtbRefScan = pBasicParam->isVtbRefScan;
24✔
1211

1212
      pDataInfo->srcOpType = pBasicParam->srcOpType;
24✔
1213
      pDataInfo->tableSeq = pBasicParam->tableSeq;
24✔
1214
    }
1215
  }
1216

1217
  return TSDB_CODE_SUCCESS;
1,203✔
1218
}
1219

1220
int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
718✔
1221
  SExchangeInfo*               pExchangeInfo = pOperator->info;
718✔
1222
  int32_t                      code = TSDB_CODE_SUCCESS;
718✔
1223
  SExchangeOperatorBasicParam* pBasicParam = NULL;
718✔
1224
  SExchangeOperatorParam*      pParam = (SExchangeOperatorParam*)pOperator->pOperatorGetParam->value;
718✔
1225
  if (pParam->multiParams) {
718✔
1226
    SExchangeOperatorBatchParam* pBatch = (SExchangeOperatorBatchParam*)pOperator->pOperatorGetParam->value;
670✔
1227
    int32_t                      iter = 0;
670✔
1228
    while (NULL != (pBasicParam = tSimpleHashIterate(pBatch->pBatchs, pBasicParam, &iter))) {
1,825✔
1229
      code = addSingleExchangeSource(pOperator, pBasicParam);
1,155✔
1230
      if (code) {
1,155!
1231
        return code;
×
1232
      }
1233
    }
1234
  } else {
1235
    pBasicParam = &pParam->basic;
48✔
1236
    code = addSingleExchangeSource(pOperator, pBasicParam);
48✔
1237
  }
1238

1239
  freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
718✔
1240
  pOperator->pOperatorGetParam = NULL;
718✔
1241

1242
  return code;
718✔
1243
}
1244

1245
int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
4,996,252✔
1246
  SExchangeInfo* pExchangeInfo = pOperator->info;
4,996,252✔
1247
  int32_t        code = TSDB_CODE_SUCCESS;
4,996,252✔
1248
  int32_t        lino = 0;
4,996,252✔
1249
  if ((OPTR_IS_OPENED(pOperator) && !pExchangeInfo->dynamicOp) ||
4,996,252✔
1250
      (pExchangeInfo->dynamicOp && NULL == pOperator->pOperatorGetParam)) {
804,009✔
1251
    return TSDB_CODE_SUCCESS;
4,195,667✔
1252
  }
1253

1254
  if (pExchangeInfo->dynamicOp) {
800,585✔
1255
    code = addDynamicExchangeSource(pOperator);
718✔
1256
    QUERY_CHECK_CODE(code, lino, _end);
718!
1257
  }
1258

1259
  if (pOperator->status == OP_NOT_OPENED && pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData) {
800,585!
1260
    pExchangeInfo->current = 0;
×
1261
  }
1262

1263
  int64_t st = taosGetTimestampUs();
800,849✔
1264

1265
  if (!pExchangeInfo->seqLoadData) {
800,849✔
1266
    code = prepareConcurrentlyLoad(pOperator);
800,037✔
1267
    QUERY_CHECK_CODE(code, lino, _end);
800,116!
1268
    pExchangeInfo->openedTs = taosGetTimestampUs();
800,137✔
1269
  }
1270

1271
  OPTR_SET_OPENED(pOperator);
800,949✔
1272
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
800,908✔
1273

1274
_end:
800,908✔
1275
  if (code != TSDB_CODE_SUCCESS) {
800,908!
1276
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1277
    pOperator->pTaskInfo->code = code;
×
1278
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
1279
  }
1280
  return TSDB_CODE_SUCCESS;
800,908✔
1281
}
1282

1283
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
23,216✔
1284
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
23,216✔
1285

1286
  if (pLimitInfo->remainGroupOffset > 0) {
23,216!
1287
    if (pLimitInfo->currentGroupId == 0) {  // it is the first group
×
1288
      pLimitInfo->currentGroupId = pBlock->info.id.groupId;
×
1289
      blockDataCleanup(pBlock);
×
1290
      return PROJECT_RETRIEVE_CONTINUE;
×
1291
    } else if (pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
×
1292
      // now it is the data from a new group
1293
      pLimitInfo->remainGroupOffset -= 1;
×
1294

1295
      // ignore data block in current group
1296
      if (pLimitInfo->remainGroupOffset > 0) {
×
1297
        blockDataCleanup(pBlock);
×
1298
        return PROJECT_RETRIEVE_CONTINUE;
×
1299
      }
1300
    }
1301

1302
    // set current group id of the project operator
1303
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
×
1304
  }
1305

1306
  // here check for a new group data, we need to handle the data of the previous group.
1307
  if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
23,216✔
1308
    pLimitInfo->numOfOutputGroups += 1;
889✔
1309
    if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
889!
1310
      pOperator->status = OP_EXEC_DONE;
×
1311
      blockDataCleanup(pBlock);
×
1312

1313
      return PROJECT_RETRIEVE_DONE;
×
1314
    }
1315

1316
    // reset the value for a new group data
1317
    resetLimitInfoForNextGroup(pLimitInfo);
889✔
1318
    // existing rows that belongs to previous group.
1319
    if (pBlock->info.rows > 0) {
889!
1320
      return PROJECT_RETRIEVE_DONE;
889✔
1321
    }
1322
  }
1323

1324
  // here we reach the start position, according to the limit/offset requirements.
1325

1326
  // set current group id
1327
  pLimitInfo->currentGroupId = pBlock->info.id.groupId;
22,327✔
1328

1329
  bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pTaskInfo);
22,327✔
1330
  if (pBlock->info.rows == 0) {
22,327✔
1331
    return PROJECT_RETRIEVE_CONTINUE;
8,984✔
1332
  } else {
1333
    if (limitReached && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
13,343!
1334
      setOperatorCompleted(pOperator);
×
1335
      return PROJECT_RETRIEVE_DONE;
×
1336
    }
1337
  }
1338

1339
  // todo optimize performance
1340
  // If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the
1341
  // they may not belong to the same group the limit/offset value is not valid in this case.
1342
  if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || hasSlimitOffsetInfo(pLimitInfo)) {
13,343!
1343
    return PROJECT_RETRIEVE_DONE;
13,343✔
1344
  } else {  // not full enough, continue to accumulate the output data in the buffer.
1345
    return PROJECT_RETRIEVE_CONTINUE;
×
1346
  }
1347
}
1348

1349
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo) {
1,245,117✔
1350
  SExecTaskInfo* pTask = pOperator->pTaskInfo;
1,245,117✔
1351
  int32_t        code = TSDB_CODE_SUCCESS;
1,245,117✔
1352
  if (pTask->pWorkerCb) {
1,245,117!
1353
    code = pTask->pWorkerCb->beforeBlocking(pTask->pWorkerCb->pPool);
1,245,167✔
1354
    if (code != TSDB_CODE_SUCCESS) {
1,245,233!
1355
      pTask->code = code;
×
1356
      return pTask->code;
×
1357
    }
1358
  }
1359

1360
  code = tsem_wait(&pExchangeInfo->ready);
1,245,183✔
1361
  if (code != TSDB_CODE_SUCCESS) {
1,245,181!
1362
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1363
    pTask->code = code;
×
1364
    return pTask->code;
×
1365
  }
1366

1367
  if (pTask->pWorkerCb) {
1,245,224!
1368
    code = pTask->pWorkerCb->afterRecoverFromBlocking(pTask->pWorkerCb->pPool);
1,245,224✔
1369
    if (code != TSDB_CODE_SUCCESS) {
1,245,238!
1370
      pTask->code = code;
×
1371
      return pTask->code;
×
1372
    }
1373
  }
1374
  return TSDB_CODE_SUCCESS;
1,245,238✔
1375
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc