• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3858

17 Apr 2025 01:40PM UTC coverage: 62.968% (+0.5%) from 62.513%
#3858

push

travis-ci

web-flow
docs(opc): add perssit data support (#30783)

156194 of 316378 branches covered (49.37%)

Branch coverage included in aggregate %.

242021 of 316027 relevant lines covered (76.58%)

19473613.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.93
/source/libs/executor/src/exchangeoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "operator.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "tdatablock.h"
23
#include "thash.h"
24
#include "tmsg.h"
25
#include "tref.h"
26
#include "trpc.h"
27

28
typedef struct SFetchRspHandleWrapper {
29
  uint32_t exchangeId;
30
  int32_t  sourceIndex;
31
} SFetchRspHandleWrapper;
32

33
typedef struct SSourceDataInfo {
34
  int32_t            index;
35
  SRetrieveTableRsp* pRsp;
36
  uint64_t           totalRows;
37
  int64_t            startTime;
38
  int32_t            code;
39
  EX_SOURCE_STATUS   status;
40
  const char*        taskId;
41
  SArray*            pSrcUidList;
42
  int32_t            srcOpType;
43
  bool               tableSeq;
44
  char*              decompBuf;
45
  int32_t            decompBufSize;
46
  SOrgTbInfo*     colMap;
47
  bool               isVtbRefScan;
48
  STimeWindow        window;
49
} SSourceDataInfo;
50

51
static void destroyExchangeOperatorInfo(void* param);
52
static void freeBlock(void* pParam);
53
static void freeSourceDataInfo(void* param);
54
static void setAllSourcesCompleted(SOperatorInfo* pOperator);
55

56
static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code);
57
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex);
58
static int32_t getCompletedSources(const SArray* pArray, int32_t* pRes);
59
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator);
60
static int32_t seqLoadRemoteData(SOperatorInfo* pOperator);
61
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator);
62
static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock,
63
                                 bool holdDataInBuf);
64
static int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo);
65

66
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo);
67

68
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
6,815,730✔
69
                                           SExecTaskInfo* pTaskInfo) {
70
  int32_t code = 0;
6,815,730✔
71
  int32_t lino = 0;
6,815,730✔
72
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
6,815,730✔
73
  int32_t completed = 0;
6,815,371✔
74
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
6,815,371✔
75
  if (code != TSDB_CODE_SUCCESS) {
6,815,635!
76
    pTaskInfo->code = code;
×
77
    T_LONG_JMP(pTaskInfo->env, code);
×
78
  }
79
  if (completed == totalSources) {
6,815,635✔
80
    setAllSourcesCompleted(pOperator);
1,787,388✔
81
    return;
6,816,421✔
82
  }
83

84
  SSourceDataInfo* pDataInfo = NULL;
5,028,247✔
85

86
  while (1) {
292,696✔
87
    qDebug("prepare wait for ready, %p, %s", pExchangeInfo, GET_TASKID(pTaskInfo));
5,320,943✔
88
    code = exchangeWait(pOperator, pExchangeInfo);
5,320,947✔
89

90
    if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
5,322,146!
91
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
32!
92
    }
93

94
    for (int32_t i = 0; i < totalSources; ++i) {
28,326,338✔
95
      pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
28,311,641✔
96
      QUERY_CHECK_NULL(pDataInfo, code, lino, _error, terrno);
28,304,540!
97
      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
28,323,733✔
98
        continue;
21,764,189✔
99
      }
100

101
      if (pDataInfo->status != EX_SOURCE_DATA_READY) {
6,559,544✔
102
        continue;
1,240,108✔
103
      }
104

105
      if (pDataInfo->code != TSDB_CODE_SUCCESS) {
5,319,436✔
106
        code = pDataInfo->code;
9✔
107
        goto _error;
9✔
108
      }
109

110
      tmemory_barrier();
5,319,427✔
111
      SRetrieveTableRsp*     pRsp = pDataInfo->pRsp;
5,319,427✔
112
      SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
5,319,427✔
113
      QUERY_CHECK_NULL(pSource, code, lino, _error, terrno);
5,321,614!
114

115
      // todo
116
      SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
5,321,716✔
117
      if (pRsp->numOfRows == 0) {
5,321,716✔
118
        if (NULL != pDataInfo->pSrcUidList && (!pDataInfo->isVtbRefScan)) {
1,773,665!
119
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
×
120
          code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
×
121
          if (code != TSDB_CODE_SUCCESS) {
×
122
            taosMemoryFreeClear(pDataInfo->pRsp);
×
123
            goto _error;
×
124
          }
125
        } else {
126
          pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
1,773,665✔
127
          qDebug("%s vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
1,773,665✔
128
                 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", try next %d/%" PRIzu,
129
                 GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
130
                 pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows, i + 1, totalSources);
131
          taosMemoryFreeClear(pDataInfo->pRsp);
1,773,667!
132
        }
133
        break;
1,773,684✔
134
      }
135

136
      code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
3,548,051✔
137
      if (code != TSDB_CODE_SUCCESS) {
3,547,359!
138
        goto _error;
×
139
      }
140

141
      SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
3,547,359✔
142
      updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pDataInfo->startTime, pOperator);
3,547,359✔
143
      pDataInfo->totalRows += pRetrieveRsp->numOfRows;
3,547,668✔
144

145
      if (pRsp->completed == 1) {
3,547,668✔
146
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
3,169,942✔
147
        qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
3,169,942✔
148
               " execId:%d index:%d completed, blocks:%d, numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64
149
               ", totalRows:%" PRIu64 ", total:%.2f Kb, try next %d/%" PRIzu,
150
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
151
               pRsp->numOfBlocks, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows,
152
               pLoadInfo->totalSize / 1024.0, i + 1, totalSources);
153
      } else {
154
        qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
377,726✔
155
               " execId:%d blocks:%d, numOfRows:%" PRId64 ", totalRows:%" PRIu64 ", total:%.2f Kb",
156
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
157
               pRsp->numOfBlocks, pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
158
      }
159

160
      taosMemoryFreeClear(pDataInfo->pRsp);
3,547,515!
161

162
      if ((pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED || NULL != pDataInfo->pSrcUidList) && !pDataInfo->isVtbRefScan) {
3,548,058!
163
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
377,725✔
164
        code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
377,725✔
165
        if (code != TSDB_CODE_SUCCESS) {
377,725!
166
          taosMemoryFreeClear(pDataInfo->pRsp);
×
167
          goto _error;
×
168
        }
169
      }
170
      return;
5,029,008✔
171
    }  // end loop
172

173
    int32_t complete1 = 0;
1,788,381✔
174
    code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &complete1);
1,788,381✔
175
    if (code != TSDB_CODE_SUCCESS) {
1,773,632!
176
      pTaskInfo->code = code;
×
177
      T_LONG_JMP(pTaskInfo->env, code);
×
178
    }
179
    if (complete1 == totalSources) {
1,773,647✔
180
      qDebug("all sources are completed, %s", GET_TASKID(pTaskInfo));
1,480,951✔
181
      return;
1,480,950✔
182
    }
183
  }
184

185
_error:
9✔
186
  pTaskInfo->code = code;
9✔
187
}
188

189
static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
20,076,786✔
190
  int32_t        code = TSDB_CODE_SUCCESS;
20,076,786✔
191
  SExchangeInfo* pExchangeInfo = pOperator->info;
20,076,786✔
192
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
20,076,786✔
193

194
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
20,076,786✔
195

196
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
20,076,461✔
197
  if (pOperator->status == OP_EXEC_DONE) {
20,076,461!
198
    qDebug("%s all %" PRIzu " source(s) are exhausted, total rows:%" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
×
199
           GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize,
200
           pLoadInfo->totalElapsed / 1000.0);
201
    return NULL;
×
202
  }
203

204
  // we have buffered retrieved datablock, return it directly
205
  SSDataBlock* p = NULL;
20,076,461✔
206
  if (taosArrayGetSize(pExchangeInfo->pResultBlockList) > 0) {
20,076,461✔
207
    p = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
13,258,338✔
208
    taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
13,257,941✔
209
  }
210

211
  if (p != NULL) {
20,076,337✔
212
    void* tmp = taosArrayPush(pExchangeInfo->pRecycledBlocks, &p);
13,258,248✔
213
    if (!tmp) {
13,258,179!
214
      code = terrno;
×
215
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
216
      pTaskInfo->code = code;
×
217
      T_LONG_JMP(pTaskInfo->env, code);
×
218
    }
219
    return p;
13,258,179✔
220
  } else {
221
    if (pExchangeInfo->seqLoadData) {
6,818,089✔
222
      code = seqLoadRemoteData(pOperator);
2,302✔
223
      if (code != TSDB_CODE_SUCCESS) {
2,302!
224
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
225
        pTaskInfo->code = code;
×
226
        T_LONG_JMP(pTaskInfo->env, code);
×
227
      }
228
    } else {
229
      concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
6,815,787✔
230
    }
231
    if (TSDB_CODE_SUCCESS != pOperator->pTaskInfo->code) {
6,818,447✔
232
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(pOperator->pTaskInfo->code));
9!
233
      T_LONG_JMP(pTaskInfo->env, pOperator->pTaskInfo->code);
9!
234
    }
235
    if (taosArrayGetSize(pExchangeInfo->pResultBlockList) == 0) {
6,818,438✔
236
      return NULL;
3,269,162✔
237
    } else {
238
      p = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
3,549,108✔
239
      taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
3,548,573✔
240
      void* tmp = taosArrayPush(pExchangeInfo->pRecycledBlocks, &p);
3,548,493✔
241
      if (!tmp) {
3,548,655!
242
        code = terrno;
×
243
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
244
        pTaskInfo->code = code;
×
245
        T_LONG_JMP(pTaskInfo->env, code);
×
246
      }
247
      return p;
3,548,655✔
248
    }
249
  }
250
}
251

252
static int32_t loadRemoteDataNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
20,067,687✔
253
  int32_t        code = TSDB_CODE_SUCCESS;
20,067,687✔
254
  int32_t        lino = 0;
20,067,687✔
255
  SExchangeInfo* pExchangeInfo = pOperator->info;
20,067,687✔
256
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
20,067,687✔
257

258
  code = pOperator->fpSet._openFn(pOperator);
20,067,687✔
259
  QUERY_CHECK_CODE(code, lino, _end);
20,067,861!
260

261
  if (pOperator->status == OP_EXEC_DONE) {
20,067,861✔
262
    (*ppRes) = NULL;
63✔
263
    return code;
63✔
264
  }
265

266
  while (1) {
9,172✔
267
    SSDataBlock* pBlock = doLoadRemoteDataImpl(pOperator);
20,076,970✔
268
    if (pBlock == NULL) {
20,074,999✔
269
      (*ppRes) = NULL;
3,269,167✔
270
      return code;
3,269,167✔
271
    }
272

273
    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
16,805,832✔
274
    QUERY_CHECK_CODE(code, lino, _end);
16,806,551!
275

276
    if (blockDataGetNumOfRows(pBlock) == 0) {
16,806,551✔
277
      continue;
2✔
278
    }
279

280
    SLimitInfo* pLimitInfo = &pExchangeInfo->limitInfo;
16,806,557✔
281
    if (hasLimitOffsetInfo(pLimitInfo)) {
16,806,557✔
282
      int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock, false);
23,598✔
283
      if (status == PROJECT_RETRIEVE_CONTINUE) {
23,553✔
284
        continue;
9,170✔
285
      } else if (status == PROJECT_RETRIEVE_DONE) {
14,383!
286
        if (pBlock->info.rows == 0) {
14,383!
287
          setOperatorCompleted(pOperator);
×
288
          (*ppRes) = NULL;
×
289
          return code;
×
290
        } else {
291
          (*ppRes) = pBlock;
14,383✔
292
          return code;
14,383✔
293
        }
294
      }
295
    } else {
296
      (*ppRes) = pBlock;
16,782,964✔
297
      return code;
16,782,964✔
298
    }
299
  }
300

301
_end:
×
302
  if (code != TSDB_CODE_SUCCESS) {
×
303
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
304
    pTaskInfo->code = code;
×
305
    T_LONG_JMP(pTaskInfo->env, code);
×
306
  }
307
  (*ppRes) = NULL;
×
308
  return code;
×
309
}
310

311
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
3,352,900✔
312
  pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
3,352,900✔
313
  if (pInfo->pSourceDataInfo == NULL) {
3,352,819✔
314
    return terrno;
3✔
315
  }
316

317
  if (pInfo->dynamicOp) {
3,352,816✔
318
    return TSDB_CODE_SUCCESS;
22,074✔
319
  }
320

321
  int32_t len = strlen(id) + 1;
3,330,742✔
322
  pInfo->pTaskId = taosMemoryCalloc(1, len);
3,330,742!
323
  if (!pInfo->pTaskId) {
3,330,960!
324
    return terrno;
×
325
  }
326
  tstrncpy(pInfo->pTaskId, id, len);
3,330,960✔
327
  for (int32_t i = 0; i < numOfSources; ++i) {
8,277,213✔
328
    SSourceDataInfo dataInfo = {0};
4,946,062✔
329
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
4,946,062✔
330
    dataInfo.taskId = pInfo->pTaskId;
4,946,062✔
331
    dataInfo.index = i;
4,946,062✔
332
    SSourceDataInfo* pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
4,946,062✔
333
    if (pDs == NULL) {
4,946,253!
334
      taosArrayDestroyEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
×
335
      return terrno;
×
336
    }
337
  }
338

339
  return TSDB_CODE_SUCCESS;
3,331,151✔
340
}
341

342
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo, const char* id) {
3,352,827✔
343
  size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);
3,352,827!
344

345
  if (numOfSources == 0) {
3,352,827!
346
    qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
×
347
    return TSDB_CODE_INVALID_PARA;
×
348
  }
349
  pInfo->pFetchRpcHandles = taosArrayInit(numOfSources, sizeof(int64_t));
3,352,827✔
350
  if (!pInfo->pFetchRpcHandles) {
3,353,013!
351
    return terrno;
×
352
  }
353
  void* ret = taosArrayReserve(pInfo->pFetchRpcHandles, numOfSources);
3,353,013✔
354
  if (!ret) {
3,352,918!
355
    return terrno;
×
356
  }
357

358
  pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
3,352,918✔
359
  if (pInfo->pSources == NULL) {
3,352,736!
360
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
361
    return terrno;
×
362
  }
363

364
  if (pExNode->node.dynamicOp) {
3,352,736✔
365
    pInfo->pHashSources = tSimpleHashInit(numOfSources * 2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
22,074✔
366
    if (NULL == pInfo->pHashSources) {
22,086!
367
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
368
      return terrno;
×
369
    }
370
  }
371

372
  for (int32_t i = 0; i < numOfSources; ++i) {
8,342,918✔
373
    SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
4,989,974✔
374
    if (!pNode) {
4,989,683!
375
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
376
      return terrno;
×
377
    }
378
    void* tmp = taosArrayPush(pInfo->pSources, pNode);
4,989,683✔
379
    if (!tmp) {
4,990,064!
380
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
381
      return terrno;
×
382
    }
383
    SExchangeSrcIndex idx = {.srcIdx = i, .inUseIdx = -1};
4,990,064✔
384
    int32_t           code =
385
        tSimpleHashPut(pInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
4,990,064✔
386
    if (pInfo->pHashSources && code != TSDB_CODE_SUCCESS) {
4,990,170!
387
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
388
      return code;
×
389
    }
390
  }
391

392
  initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);
3,352,944✔
393
  int64_t refId = taosAddRef(exchangeObjRefPool, pInfo);
3,352,931✔
394
  if (refId < 0) {
3,353,020!
395
    int32_t code = terrno;
×
396
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
397
    return code;
×
398
  } else {
399
    pInfo->self = refId;
3,353,020✔
400
  }
401

402
  return initDataSource(numOfSources, pInfo, id);
3,353,020✔
403
}
404

405
int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo,
3,352,837✔
406
                                   SOperatorInfo** pOptrInfo) {
407
  QRY_PARAM_CHECK(pOptrInfo);
3,352,837!
408

409
  int32_t        code = 0;
3,352,837✔
410
  int32_t        lino = 0;
3,352,837✔
411
  SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
3,352,837!
412
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
3,352,822!
413
  if (pInfo == NULL || pOperator == NULL) {
3,352,823!
414
    code = terrno;
×
415
    goto _error;
×
416
  }
417

418
  pInfo->dynamicOp = pExNode->node.dynamicOp;
3,352,835✔
419
  code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo));
3,352,835✔
420
  QUERY_CHECK_CODE(code, lino, _error);
3,352,948!
421

422
  code = tsem_init(&pInfo->ready, 0, 0);
3,352,948✔
423
  QUERY_CHECK_CODE(code, lino, _error);
3,352,859!
424

425
  pInfo->pDummyBlock = createDataBlockFromDescNode(pExNode->node.pOutputDataBlockDesc);
3,352,859✔
426
  QUERY_CHECK_NULL(pInfo->pDummyBlock, code, lino, _error, terrno);
3,352,990!
427

428
  pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES);
3,352,990✔
429
  QUERY_CHECK_NULL(pInfo->pResultBlockList, code, lino, _error, terrno);
3,352,830!
430
  pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES);
3,352,830✔
431
  QUERY_CHECK_NULL(pInfo->pRecycledBlocks, code, lino, _error, terrno);
3,352,848!
432

433
  SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self};
3,352,848✔
434
  code = qAppendTaskStopInfo(pTaskInfo, &stopInfo);
3,352,848✔
435
  QUERY_CHECK_CODE(code, lino, _error);
3,353,038!
436

437
  pInfo->seqLoadData = pExNode->seqRecvData;
3,353,038✔
438
  pInfo->pTransporter = pTransporter;
3,353,038✔
439

440
  setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo,
3,353,038✔
441
                  pTaskInfo);
442
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
3,352,953✔
443

444
  code = filterInitFromNode((SNode*)pExNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
3,352,916✔
445
  QUERY_CHECK_CODE(code, lino, _error);
3,352,891!
446

447
  pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteDataNext, NULL, destroyExchangeOperatorInfo,
3,352,891✔
448
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
449
  *pOptrInfo = pOperator;
3,352,843✔
450
  return TSDB_CODE_SUCCESS;
3,352,843✔
451

452
_error:
×
453
  if (code != TSDB_CODE_SUCCESS) {
×
454
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
455
    pTaskInfo->code = code;
×
456
  }
457
  if (pInfo != NULL) {
×
458
    doDestroyExchangeOperatorInfo(pInfo);
×
459
  }
460

461
  if (pOperator != NULL) {
×
462
    pOperator->info = NULL;
×
463
    destroyOperator(pOperator);
×
464
  }
465
  return code;
×
466
}
467

468
void destroyExchangeOperatorInfo(void* param) {
3,352,997✔
469
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
3,352,997✔
470
  int32_t        code = taosRemoveRef(exchangeObjRefPool, pExInfo->self);
3,352,997✔
471
  if (code != TSDB_CODE_SUCCESS) {
3,353,094!
472
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
473
  }
474
}
3,353,094✔
475

476
void freeBlock(void* pParam) {
7,700,791✔
477
  SSDataBlock* pBlock = *(SSDataBlock**)pParam;
7,700,791✔
478
  blockDataDestroy(pBlock);
7,700,791✔
479
}
7,700,861✔
480

481
void freeSourceDataInfo(void* p) {
4,946,698✔
482
  SSourceDataInfo* pInfo = (SSourceDataInfo*)p;
4,946,698✔
483
  taosMemoryFreeClear(pInfo->decompBuf);
4,946,698!
484
  taosMemoryFreeClear(pInfo->pRsp);
4,946,698!
485

486
  pInfo->decompBufSize = 0;
4,946,698✔
487
}
4,946,698✔
488

489
void doDestroyExchangeOperatorInfo(void* param) {
3,353,051✔
490
  if (param == NULL) {
3,353,051!
491
    return;
×
492
  }
493
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
3,353,051✔
494
  if (pExInfo->pFetchRpcHandles) {
3,353,051!
495
    for (int32_t i = 0; i < pExInfo->pFetchRpcHandles->size; ++i) {
8,342,479✔
496
      int64_t* pRpcHandle = taosArrayGet(pExInfo->pFetchRpcHandles, i);
4,989,513✔
497
      if (*pRpcHandle > 0) {
4,989,304✔
498
        SDownstreamSourceNode* pSource = taosArrayGet(pExInfo->pSources, i);
3,954✔
499
        (void)asyncFreeConnById(pExInfo->pTransporter, *pRpcHandle);
3,954✔
500
      }
501
    }
502
    taosArrayDestroy(pExInfo->pFetchRpcHandles);
3,352,966✔
503
  }
504

505
  taosArrayDestroy(pExInfo->pSources);
3,352,946✔
506
  taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo);
3,353,053✔
507

508
  taosArrayDestroyEx(pExInfo->pResultBlockList, freeBlock);
3,352,997✔
509
  taosArrayDestroyEx(pExInfo->pRecycledBlocks, freeBlock);
3,353,050✔
510

511
  blockDataDestroy(pExInfo->pDummyBlock);
3,353,059✔
512
  tSimpleHashCleanup(pExInfo->pHashSources);
3,353,090✔
513

514
  int32_t code = tsem_destroy(&pExInfo->ready);
3,353,068✔
515
  if (code != TSDB_CODE_SUCCESS) {
3,353,018!
516
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
517
  }
518
  taosMemoryFreeClear(pExInfo->pTaskId);
3,353,018!
519

520
  taosMemoryFreeClear(param);
3,353,047!
521
}
522

523
int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
5,321,024✔
524
  SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;
5,321,024✔
525

526
  taosMemoryFreeClear(pMsg->pEpSet);
5,321,024!
527
  SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
5,321,024✔
528
  if (pExchangeInfo == NULL) {
5,322,380✔
529
    qWarn("failed to acquire exchange operator, since it may have been released, %p", pExchangeInfo);
90!
530
    taosMemoryFree(pMsg->pData);
90!
531
    return TSDB_CODE_SUCCESS;
90✔
532
  }
533

534
  int32_t          index = pWrapper->sourceIndex;
5,322,290✔
535
  SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);
5,322,290✔
536

537
  int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, index);
5,315,787✔
538
  if (pRpcHandle != NULL) {
5,313,737✔
539
    int32_t ret = asyncFreeConnById(pExchangeInfo->pTransporter, *pRpcHandle);
5,313,622✔
540
    if (ret != 0) {
5,324,150✔
541
      qDebug("failed to free rpc handle, code:%s, %p", tstrerror(ret), pExchangeInfo);
19,796✔
542
    }
543
    *pRpcHandle = -1;
5,321,326✔
544
  }
545

546
  if (!pSourceDataInfo) {
5,321,441!
547
    return terrno;
×
548
  }
549

550
  if (code == TSDB_CODE_SUCCESS) {
5,321,441!
551
    pSourceDataInfo->pRsp = pMsg->pData;
5,322,255✔
552

553
    SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
5,322,255✔
554
    pRsp->numOfRows = htobe64(pRsp->numOfRows);
5,322,255✔
555
    pRsp->compLen = htonl(pRsp->compLen);
5,319,003✔
556
    pRsp->payloadLen = htonl(pRsp->payloadLen);
5,319,003✔
557
    pRsp->numOfCols = htonl(pRsp->numOfCols);
5,319,003✔
558
    pRsp->useconds = htobe64(pRsp->useconds);
5,319,003✔
559
    pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);
5,321,974✔
560

561
    qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%" PRId64 ", %p", pSourceDataInfo->taskId, index,
5,321,974✔
562
           pRsp->numOfBlocks, pRsp->numOfRows, pExchangeInfo);
563
  } else {
564
    taosMemoryFree(pMsg->pData);
×
565
    pSourceDataInfo->code = rpcCvtErrCode(code);
22✔
566
    if (pSourceDataInfo->code != code) {
22!
567
      qError("%s fetch rsp received, index:%d, error:%s, cvted error: %s, %p", pSourceDataInfo->taskId, index,
×
568
             tstrerror(code), tstrerror(pSourceDataInfo->code), pExchangeInfo);
569
    } else {
570
      qError("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code),
22!
571
             pExchangeInfo);
572
    }
573
  }
574

575
  tmemory_barrier();
5,322,011✔
576
  pSourceDataInfo->status = EX_SOURCE_DATA_READY;
5,322,011✔
577
  code = tsem_post(&pExchangeInfo->ready);
5,322,011✔
578
  if (code != TSDB_CODE_SUCCESS) {
5,323,368!
579
    qError("failed to invoke post when fetch rsp is ready, code:%s, %p", tstrerror(code), pExchangeInfo);
×
580
    return code;
×
581
  }
582

583
  code = taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
5,323,368✔
584
  if (code != TSDB_CODE_SUCCESS) {
5,324,319!
585
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
586
  }
587
  return code;
5,323,995✔
588
}
589

590
int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, bool tableSeq) {
1,215✔
591
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
1,215!
592
  if (NULL == *ppRes) {
1,215!
593
    return terrno;
×
594
  }
595

596
  STableScanOperatorParam* pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
1,215!
597
  if (NULL == pScan) {
1,215!
598
    taosMemoryFreeClear(*ppRes);
×
599
    return terrno;
×
600
  }
601

602
  pScan->pUidList = taosArrayDup(pUidList, NULL);
1,215✔
603
  if (NULL == pScan->pUidList) {
1,215!
604
    taosMemoryFree(pScan);
×
605
    taosMemoryFreeClear(*ppRes);
×
606
    return terrno;
×
607
  }
608
  pScan->tableSeq = tableSeq;
1,215✔
609
  pScan->pOrgTbInfo = NULL;
1,215✔
610
  pScan->window.skey = INT64_MAX;
1,215✔
611
  pScan->window.ekey = INT64_MIN;
1,215✔
612

613
  (*ppRes)->opType = srcOpType;
1,215✔
614
  (*ppRes)->downstreamIdx = 0;
1,215✔
615
  (*ppRes)->value = pScan;
1,215✔
616
  (*ppRes)->pChildren = NULL;
1,215✔
617
  (*ppRes)->reUse = false;
1,215✔
618

619
  return TSDB_CODE_SUCCESS;
1,215✔
620
}
621

622
int32_t buildTableScanOperatorParamEx(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, SOrgTbInfo *pMap, bool tableSeq, STimeWindow *window) {
×
623
  int32_t                  code = TSDB_CODE_SUCCESS;
×
624
  int32_t                  lino = 0;
×
625
  STableScanOperatorParam* pScan = NULL;
×
626

627
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
×
628
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
×
629

630
  pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
×
631
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);
×
632

633
  pScan->pUidList = taosArrayDup(pUidList, NULL);
×
634
  QUERY_CHECK_NULL(pScan->pUidList, code, lino, _return, terrno);
×
635

636
  pScan->pOrgTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
×
637
  QUERY_CHECK_NULL(pScan->pOrgTbInfo, code, lino, _return, terrno);
×
638

639
  pScan->pOrgTbInfo->vgId = pMap->vgId;
×
640
  tstrncpy(pScan->pOrgTbInfo->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);
×
641

642
  pScan->pOrgTbInfo->colMap = taosArrayDup(pMap->colMap, NULL);
×
643
  QUERY_CHECK_NULL(pScan->pOrgTbInfo->colMap, code, lino, _return, terrno);
×
644

645
  pScan->tableSeq = tableSeq;
×
646
  pScan->window.skey = window->skey;
×
647
  pScan->window.ekey = window->ekey;
×
648

649
  (*ppRes)->opType = srcOpType;
×
650
  (*ppRes)->downstreamIdx = 0;
×
651
  (*ppRes)->value = pScan;
×
652
  (*ppRes)->pChildren = NULL;
×
653
  (*ppRes)->reUse = false;
×
654

655
  return code;
×
656
_return:
×
657
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
658
  taosMemoryFreeClear(*ppRes);
×
659
  if (pScan) {
×
660
    taosArrayDestroy(pScan->pUidList);
×
661
    if (pScan->pOrgTbInfo) {
×
662
      taosArrayDestroy(pScan->pOrgTbInfo->colMap);
×
663
      taosMemoryFreeClear(pScan->pOrgTbInfo);
×
664
    }
665
    taosMemoryFree(pScan);
×
666
  }
667
  return code;
×
668
}
669

670
int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
5,324,744✔
671
  int32_t          code = TSDB_CODE_SUCCESS;
5,324,744✔
672
  int32_t          lino = 0;
5,324,744✔
673
  SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
5,324,744✔
674
  if (!pDataInfo) {
5,324,158!
675
    return terrno;
×
676
  }
677

678
  if (EX_SOURCE_DATA_NOT_READY != pDataInfo->status) {
5,324,173!
679
    return TSDB_CODE_SUCCESS;
×
680
  }
681

682
  pDataInfo->status = EX_SOURCE_DATA_STARTED;
5,324,173✔
683
  SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
5,324,173✔
684
  if (!pSource) {
5,323,858!
685
    return terrno;
×
686
  }
687

688
  pDataInfo->startTime = taosGetTimestampUs();
5,324,574✔
689
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
5,324,574✔
690

691
  SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
5,324,710!
692
  QUERY_CHECK_NULL(pWrapper, code, lino, _end, terrno);
5,325,306!
693
  pWrapper->exchangeId = pExchangeInfo->self;
5,325,306✔
694
  pWrapper->sourceIndex = sourceIndex;
5,325,306✔
695

696
  if (pSource->localExec) {
5,325,306✔
697
    SDataBuf pBuf = {0};
16,205✔
698
    int32_t  code = (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->sId, pTaskInfo->id.queryId,
16,205✔
699
                                               pSource->clientId, pSource->taskId, 0, pSource->execId, &pBuf.pData,
700
                                               pTaskInfo->localFetch.explainRes);
701
    code = loadRemoteDataCallback(pWrapper, &pBuf, code);
16,205✔
702
    taosMemoryFree(pWrapper);
16,205!
703
    QUERY_CHECK_CODE(code, lino, _end);
16,205!
704
  } else {
705
    SResFetchReq req = {0};
5,309,101✔
706
    req.header.vgId = pSource->addr.nodeId;
5,309,101✔
707
    req.sId = pSource->sId;
5,309,101✔
708
    req.clientId = pSource->clientId;
5,309,101✔
709
    req.taskId = pSource->taskId;
5,309,101✔
710
    req.queryId = pTaskInfo->id.queryId;
5,309,101✔
711
    req.execId = pSource->execId;
5,309,101✔
712
    if (pDataInfo->isVtbRefScan) {
5,309,101!
713
      code = buildTableScanOperatorParamEx(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->colMap, pDataInfo->tableSeq, &pDataInfo->window);
×
714
      taosArrayDestroy(pDataInfo->colMap->colMap);
×
715
      taosMemoryFreeClear(pDataInfo->colMap);
×
716
      taosArrayDestroy(pDataInfo->pSrcUidList);
×
717
      pDataInfo->pSrcUidList = NULL;
×
718
      if (TSDB_CODE_SUCCESS != code) {
×
719
        pTaskInfo->code = code;
×
720
        taosMemoryFree(pWrapper);
×
721
        return pTaskInfo->code;
×
722
      }
723
    } else {
724
      if (pDataInfo->pSrcUidList) {
5,309,101✔
725
        code = buildTableScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->tableSeq);
1,148✔
726
        taosArrayDestroy(pDataInfo->pSrcUidList);
1,148✔
727
        pDataInfo->pSrcUidList = NULL;
1,148✔
728
        if (TSDB_CODE_SUCCESS != code) {
1,148!
729
          pTaskInfo->code = code;
×
730
          taosMemoryFree(pWrapper);
×
731
          return pTaskInfo->code;
×
732
        }
733
      }
734
    }
735

736
    int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req);
5,309,101✔
737
    if (msgSize < 0) {
5,308,727!
738
      pTaskInfo->code = msgSize;
×
739
      taosMemoryFree(pWrapper);
×
740
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
741
      return pTaskInfo->code;
×
742
    }
743

744
    void* msg = taosMemoryCalloc(1, msgSize);
5,308,727!
745
    if (NULL == msg) {
5,309,085!
746
      pTaskInfo->code = terrno;
×
747
      taosMemoryFree(pWrapper);
×
748
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
749
      return pTaskInfo->code;
×
750
    }
751

752
    msgSize = tSerializeSResFetchReq(msg, msgSize, &req);
5,309,085✔
753
    if (msgSize < 0) {
5,308,603!
754
      pTaskInfo->code = msgSize;
×
755
      taosMemoryFree(pWrapper);
×
756
      taosMemoryFree(msg);
×
757
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
758
      return pTaskInfo->code;
×
759
    }
760

761
    freeOperatorParam(req.pOpParam, OP_GET_PARAM);
5,308,603✔
762

763
    qDebug("%s build fetch msg and send to vgId:%d, ep:%s, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
5,308,453✔
764
           ", execId:%d, %p, %d/%" PRIzu,
765
           GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->clientId,
766
           pSource->taskId, pSource->execId, pExchangeInfo, sourceIndex, totalSources);
767

768
    // send the fetch remote task result reques
769
    SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
5,308,461!
770
    if (NULL == pMsgSendInfo) {
5,308,695!
771
      taosMemoryFreeClear(msg);
×
772
      taosMemoryFree(pWrapper);
×
773
      qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
×
774
      pTaskInfo->code = terrno;
×
775
      return pTaskInfo->code;
×
776
    }
777

778
    pMsgSendInfo->param = pWrapper;
5,308,695✔
779
    pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
5,308,695✔
780
    pMsgSendInfo->msgInfo.pData = msg;
5,308,695✔
781
    pMsgSendInfo->msgInfo.len = msgSize;
5,308,695✔
782
    pMsgSendInfo->msgType = pSource->fetchMsgType;
5,308,695✔
783
    pMsgSendInfo->fp = loadRemoteDataCallback;
5,308,695✔
784

785
    int64_t transporterId = 0;
5,308,695✔
786
    void* poolHandle = NULL;
5,308,695✔
787
    code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
5,308,695✔
788
    QUERY_CHECK_CODE(code, lino, _end);
5,309,241!
789
    int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, sourceIndex);
5,309,241✔
790
    *pRpcHandle = transporterId;
5,308,787✔
791
  }
792

793
_end:
5,324,992✔
794
  if (code != TSDB_CODE_SUCCESS) {
5,324,992!
795
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
796
  }
797
  return code;
5,324,939✔
798
}
799

800
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs,
3,753,240✔
801
                          SOperatorInfo* pOperator) {
802
  pInfo->totalRows += numOfRows;
3,753,240✔
803
  pInfo->totalSize += dataLen;
3,753,240✔
804
  pInfo->totalElapsed += (taosGetTimestampUs() - startTs);
3,753,641✔
805
  pOperator->resultInfo.totalRows += numOfRows;
3,753,641✔
806
}
3,753,641✔
807

808
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart) {
17,125,945✔
809
  int32_t      code = TSDB_CODE_SUCCESS;
17,125,945✔
810
  int32_t      lino = 0;
17,125,945✔
811
  SSDataBlock* pBlock = NULL;
17,125,945✔
812
  if (pColList == NULL) {  // data from other sources
17,125,945✔
813
    blockDataCleanup(pRes);
16,921,632✔
814
    code = blockDecode(pRes, pData, (const char**)pNextStart);
16,921,088✔
815
    if (code) {
16,919,674!
816
      return code;
×
817
    }
818
  } else {  // extract data according to pColList
819
    char* pStart = pData;
204,313✔
820

821
    int32_t numOfCols = htonl(*(int32_t*)pStart);
204,313✔
822
    pStart += sizeof(int32_t);
204,313✔
823

824
    // todo refactor:extract method
825
    SSysTableSchema* pSchema = (SSysTableSchema*)pStart;
204,313✔
826
    for (int32_t i = 0; i < numOfCols; ++i) {
3,462,701✔
827
      SSysTableSchema* p = (SSysTableSchema*)pStart;
3,258,388✔
828

829
      p->colId = htons(p->colId);
3,258,388✔
830
      p->bytes = htonl(p->bytes);
3,258,388✔
831
      pStart += sizeof(SSysTableSchema);
3,258,388✔
832
    }
833

834
    pBlock = NULL;
204,313✔
835
    code = createDataBlock(&pBlock);
204,313✔
836
    QUERY_CHECK_CODE(code, lino, _end);
204,466!
837

838
    for (int32_t i = 0; i < numOfCols; ++i) {
3,457,385✔
839
      SColumnInfoData idata = createColumnInfoData(pSchema[i].type, pSchema[i].bytes, pSchema[i].colId);
3,252,952✔
840
      code = blockDataAppendColInfo(pBlock, &idata);
3,260,218✔
841
      QUERY_CHECK_CODE(code, lino, _end);
3,252,919!
842
    }
843

844
    const char* pDummy = NULL;
204,433✔
845
    code = blockDecode(pBlock, pStart, &pDummy);
204,433✔
846
    QUERY_CHECK_CODE(code, lino, _end);
204,384!
847

848
    code = blockDataEnsureCapacity(pRes, pBlock->info.rows);
204,384✔
849
    QUERY_CHECK_CODE(code, lino, _end);
204,415!
850

851
    // data from mnode
852
    pRes->info.dataLoad = 1;
204,415✔
853
    pRes->info.rows = pBlock->info.rows;
204,415✔
854
    pRes->info.scanFlag = MAIN_SCAN;
204,415✔
855
    code = relocateColumnData(pRes, pColList, pBlock->pDataBlock, false);
204,415✔
856
    QUERY_CHECK_CODE(code, lino, _end);
204,381!
857

858
    blockDataDestroy(pBlock);
204,381✔
859
    pBlock = NULL;
204,514✔
860
  }
861

862
_end:
17,124,188✔
863
  if (code != TSDB_CODE_SUCCESS) {
17,124,188!
864
    blockDataDestroy(pBlock);
×
865
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
866
  }
867
  return code;
17,124,122✔
868
}
869

870
void setAllSourcesCompleted(SOperatorInfo* pOperator) {
1,788,248✔
871
  SExchangeInfo* pExchangeInfo = pOperator->info;
1,788,248✔
872
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
1,788,248✔
873

874
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
1,788,248✔
875
  size_t               totalSources = taosArrayGetSize(pExchangeInfo->pSources);
1,788,248✔
876
  qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 ", %.2f Kb, elapsed:%.2f ms",
1,788,256✔
877
         GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
878
         pLoadInfo->totalElapsed / 1000.0);
879

880
  setOperatorCompleted(pOperator);
1,788,256✔
881
}
1,788,279✔
882

883
int32_t getCompletedSources(const SArray* pArray, int32_t* pRes) {
8,589,058✔
884
  int32_t code = TSDB_CODE_SUCCESS;
8,589,058✔
885
  int32_t lino = 0;
8,589,058✔
886
  size_t  total = taosArrayGetSize(pArray);
8,589,058✔
887

888
  int32_t completed = 0;
8,588,556✔
889
  for (int32_t k = 0; k < total; ++k) {
64,517,323✔
890
    SSourceDataInfo* p = taosArrayGet(pArray, k);
55,938,997✔
891
    QUERY_CHECK_NULL(p, code, lino, _end, terrno);
55,919,979!
892
    if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
55,928,767✔
893
      completed += 1;
27,834,451✔
894
    }
895
  }
896

897
  *pRes = completed;
8,578,326✔
898
_end:
8,578,326✔
899
  if (code != TSDB_CODE_SUCCESS) {
8,578,326!
900
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
901
  }
902
  return code;
8,588,644✔
903
}
904

905
int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
3,330,101✔
906
  SExchangeInfo* pExchangeInfo = pOperator->info;
3,330,101✔
907
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
3,330,101✔
908

909
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
3,330,101✔
910
  int64_t startTs = taosGetTimestampUs();
3,330,273✔
911

912
  // Asynchronously send all fetch requests to all sources.
913
  for (int32_t i = 0; i < totalSources; ++i) {
8,275,223✔
914
    int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
4,944,997✔
915
    if (code != TSDB_CODE_SUCCESS) {
4,944,950!
916
      pTaskInfo->code = code;
×
917
      return code;
×
918
    }
919
  }
920

921
  int64_t endTs = taosGetTimestampUs();
3,330,283✔
922
  qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTaskInfo),
3,330,283✔
923
         totalSources, (endTs - startTs) / 1000.0);
924

925
  pOperator->status = OP_RES_TO_RETURN;
3,330,283✔
926
  pOperator->cost.openCost = taosGetTimestampUs() - startTs;
3,330,321✔
927
  if (isTaskKilled(pTaskInfo)) {
3,330,321✔
928
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
28!
929
  }
930

931
  return TSDB_CODE_SUCCESS;
3,330,276✔
932
}
933

934
int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo) {
3,549,435✔
935
  int32_t            code = TSDB_CODE_SUCCESS;
3,549,435✔
936
  int32_t            lino = 0;
3,549,435✔
937
  SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
3,549,435✔
938
  SSDataBlock*       pb = NULL;
3,549,435✔
939

940
  char* pNextStart = pRetrieveRsp->data;
3,549,435✔
941
  char* pStart = pNextStart;
3,549,435✔
942

943
  int32_t index = 0;
3,549,435✔
944

945
  if (pRetrieveRsp->compressed) {  // decompress the data
3,549,435!
946
    if (pDataInfo->decompBuf == NULL) {
×
947
      pDataInfo->decompBuf = taosMemoryMalloc(pRetrieveRsp->payloadLen);
×
948
      QUERY_CHECK_NULL(pDataInfo->decompBuf, code, lino, _end, terrno);
×
949
      pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
×
950
    } else {
951
      if (pDataInfo->decompBufSize < pRetrieveRsp->payloadLen) {
×
952
        char* p = taosMemoryRealloc(pDataInfo->decompBuf, pRetrieveRsp->payloadLen);
×
953
        QUERY_CHECK_NULL(p, code, lino, _end, terrno);
×
954
        if (p != NULL) {
×
955
          pDataInfo->decompBuf = p;
×
956
          pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
×
957
        }
958
      }
959
    }
960
  }
961

962
  while (index++ < pRetrieveRsp->numOfBlocks) {
20,469,603✔
963
    pStart = pNextStart;
16,920,992✔
964

965
    if (taosArrayGetSize(pExchangeInfo->pRecycledBlocks) > 0) {
16,920,992✔
966
      pb = *(SSDataBlock**)taosArrayPop(pExchangeInfo->pRecycledBlocks);
9,221,989✔
967
      blockDataCleanup(pb);
9,221,923✔
968
    } else {
969
      code = createOneDataBlock(pExchangeInfo->pDummyBlock, false, &pb);
7,699,063✔
970
      QUERY_CHECK_NULL(pb, code, lino, _end, code);
7,699,946!
971
    }
972

973
    int32_t compLen = *(int32_t*)pStart;
16,921,123✔
974
    pStart += sizeof(int32_t);
16,921,123✔
975

976
    int32_t rawLen = *(int32_t*)pStart;
16,921,123✔
977
    pStart += sizeof(int32_t);
16,921,123✔
978
    QUERY_CHECK_CONDITION((compLen <= rawLen && compLen != 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
16,921,123!
979

980
    pNextStart = pStart + compLen;
16,921,123✔
981
    if (pRetrieveRsp->compressed && (compLen < rawLen)) {
16,921,123!
982
      int32_t t = tsDecompressString(pStart, compLen, 1, pDataInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
×
983
      QUERY_CHECK_CONDITION((t == rawLen), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
984
      pStart = pDataInfo->decompBuf;
×
985
    }
986

987
    code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart);
16,921,123✔
988
    if (code != 0) {
16,919,728!
989
      taosMemoryFreeClear(pDataInfo->pRsp);
×
990
      goto _end;
×
991
    }
992

993
    void* tmp = taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
16,919,728✔
994
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
16,920,168!
995
    pb = NULL;
16,920,168✔
996
  }
997

998
_end:
3,548,611✔
999
  if (code != TSDB_CODE_SUCCESS) {
3,548,611!
1000
    blockDataDestroy(pb);
×
1001
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1002
  }
1003
  return code;
3,548,793✔
1004
}
1005

1006
int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
2,302✔
1007
  SExchangeInfo* pExchangeInfo = pOperator->info;
2,302✔
1008
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
2,302✔
1009

1010
  int32_t code = 0;
2,302✔
1011
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);
2,302✔
1012
  int64_t startTs = taosGetTimestampUs();
2,302✔
1013

1014
  while (1) {
806✔
1015
    if (pExchangeInfo->current >= totalSources) {
3,108✔
1016
      setAllSourcesCompleted(pOperator);
865✔
1017
      return TSDB_CODE_SUCCESS;
865✔
1018
    }
1019

1020
    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
2,243✔
1021
    if (!pDataInfo) {
2,243!
1022
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1023
      pTaskInfo->code = terrno;
×
1024
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1025
    }
1026
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
2,243✔
1027

1028
    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
2,243✔
1029
    if (code != TSDB_CODE_SUCCESS) {
2,243!
1030
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1031
      pTaskInfo->code = code;
×
1032
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1033
    }
1034

1035
    code = exchangeWait(pOperator, pExchangeInfo);
2,243✔
1036
    if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
2,243!
1037
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1038
    }
1039

1040
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
2,243✔
1041
    if (!pSource) {
2,243!
1042
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1043
      pTaskInfo->code = terrno;
×
1044
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1045
    }
1046

1047
    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
2,243!
1048
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
×
1049
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1050
             tstrerror(pDataInfo->code));
1051
      pOperator->pTaskInfo->code = pDataInfo->code;
×
1052
      return pOperator->pTaskInfo->code;
×
1053
    }
1054

1055
    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
2,243✔
1056
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
2,243✔
1057

1058
    if (pRsp->numOfRows == 0) {
2,243✔
1059
      qDebug("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
806✔
1060
             " execId:%d %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next",
1061
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1062
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);
1063

1064
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
806✔
1065
      if (pDataInfo->isVtbRefScan) {
806!
1066
        pExchangeInfo->current = totalSources;
×
1067
      } else {
1068
        pExchangeInfo->current += 1;
806✔
1069
      }
1070
      taosMemoryFreeClear(pDataInfo->pRsp);
806!
1071
      continue;
806✔
1072
    }
1073

1074
    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
1,437✔
1075
    if (code != TSDB_CODE_SUCCESS) {
1,437!
1076
      goto _error;
×
1077
    }
1078

1079
    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
1,437✔
1080
    if (pRsp->completed == 1) {
1,437✔
1081
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
1,230✔
1082
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
1083
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1084
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
1085
             pExchangeInfo->current + 1, totalSources);
1086

1087
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
1,230✔
1088
      if (pDataInfo->isVtbRefScan) {
1,230!
1089
        pExchangeInfo->current = totalSources;
×
1090
      } else {
1091
        pExchangeInfo->current += 1;
1,230✔
1092
      }
1093
    } else {
1094
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
207!
1095
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
1096
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1097
             pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
1098
    }
1099
    if (pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData) {
1,437!
1100
      taosArrayClear(pExchangeInfo->pSourceDataInfo);
×
1101
    }
1102
    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
1,437✔
1103
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;
1,437✔
1104

1105
    taosMemoryFreeClear(pDataInfo->pRsp);
1,437!
1106
    return TSDB_CODE_SUCCESS;
1,437✔
1107
  }
1108

1109
_error:
×
1110
  pTaskInfo->code = code;
×
1111
  return code;
×
1112
}
1113

1114
void clearVtbScanDataInfo(void* pItem) {
×
1115
  SSourceDataInfo *pInfo = (SSourceDataInfo *)pItem;
×
1116
  if (pInfo->colMap) {
×
1117
    taosArrayDestroy(pInfo->colMap->colMap);
×
1118
    taosMemoryFreeClear(pInfo->colMap);
×
1119
  }
1120
  taosArrayDestroy(pInfo->pSrcUidList);
×
1121
}
×
1122

1123
int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasicParam* pBasicParam) {
1,148✔
1124
  SExchangeInfo*     pExchangeInfo = pOperator->info;
1,148✔
1125
  SExchangeSrcIndex* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
1,148✔
1126
  if (NULL == pIdx) {
1,148!
1127
    qError("No exchange source for vgId: %d", pBasicParam->vgId);
×
1128
    return TSDB_CODE_INVALID_PARA;
×
1129
  }
1130

1131
  if (pBasicParam->isVtbRefScan) {
1,148!
1132
    SSourceDataInfo dataInfo = {0};
×
1133
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
×
1134
    dataInfo.taskId = pExchangeInfo->pTaskId;
×
1135
    dataInfo.index = pIdx->srcIdx;
×
1136
    dataInfo.window = pBasicParam->window;
×
1137
    dataInfo.colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
×
1138
    dataInfo.colMap->vgId = pBasicParam->colMap->vgId;
×
1139
    tstrncpy(dataInfo.colMap->tbName, pBasicParam->colMap->tbName, TSDB_TABLE_FNAME_LEN);
×
1140
    dataInfo.colMap->colMap = taosArrayDup(pBasicParam->colMap->colMap, NULL);
×
1141

1142
    dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
×
1143
    if (dataInfo.pSrcUidList == NULL) {
×
1144
      return terrno;
×
1145
    }
1146

1147
    dataInfo.isVtbRefScan = pBasicParam->isVtbRefScan;
×
1148
    dataInfo.srcOpType = pBasicParam->srcOpType;
×
1149
    dataInfo.tableSeq = pBasicParam->tableSeq;
×
1150

1151
    taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
×
1152
    void* tmp = taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo);
×
1153
    if (!tmp) {
×
1154
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1155
      return terrno;
×
1156
    }
1157
  } else {
1158
    if (pIdx->inUseIdx < 0) {
1,148✔
1159
      SSourceDataInfo dataInfo = {0};
1,124✔
1160
      dataInfo.status = EX_SOURCE_DATA_NOT_READY;
1,124✔
1161
      dataInfo.taskId = pExchangeInfo->pTaskId;
1,124✔
1162
      dataInfo.index = pIdx->srcIdx;
1,124✔
1163
      if (pBasicParam->isVtbRefScan) {
1,124!
1164
        dataInfo.window = pBasicParam->window;
×
1165
        dataInfo.colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
×
1166
        dataInfo.colMap->vgId = pBasicParam->colMap->vgId;
×
1167
        tstrncpy(dataInfo.colMap->tbName, pBasicParam->colMap->tbName, TSDB_TABLE_FNAME_LEN);
×
1168
        dataInfo.colMap->colMap = taosArrayDup(pBasicParam->colMap->colMap, NULL);
×
1169
      }
1170

1171
      dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
1,124✔
1172
      if (dataInfo.pSrcUidList == NULL) {
1,124!
1173
        return terrno;
×
1174
      }
1175

1176
      dataInfo.isVtbRefScan = pBasicParam->isVtbRefScan;
1,124✔
1177
      dataInfo.srcOpType = pBasicParam->srcOpType;
1,124✔
1178
      dataInfo.tableSeq = pBasicParam->tableSeq;
1,124✔
1179

1180
      void* tmp = taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo);
1,124✔
1181
      if (!tmp) {
1,124!
1182
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1183
        return terrno;
×
1184
      }
1185
      pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
1,124✔
1186
    } else {
1187
      SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
24✔
1188
      if (!pDataInfo) {
24!
1189
        return terrno;
×
1190
      }
1191
      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
24!
1192
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
24✔
1193
      }
1194

1195
      if (pBasicParam->isVtbRefScan) {
24!
1196
        pDataInfo->window = pBasicParam->window;
×
1197
        if (!pDataInfo->colMap) {
×
1198
          pDataInfo->colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
×
1199
        }
1200
        pDataInfo->colMap->vgId = pBasicParam->colMap->vgId;
×
1201
        tstrncpy(pDataInfo->colMap->tbName, pBasicParam->colMap->tbName, TSDB_TABLE_FNAME_LEN);
×
1202
        pDataInfo->colMap->colMap = taosArrayDup(pBasicParam->colMap->colMap, NULL);
×
1203
      }
1204

1205
      pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
24✔
1206
      if (pDataInfo->pSrcUidList == NULL) {
24!
1207
        return terrno;
×
1208
      }
1209

1210
      pDataInfo->isVtbRefScan = pBasicParam->isVtbRefScan;
24✔
1211

1212
      pDataInfo->srcOpType = pBasicParam->srcOpType;
24✔
1213
      pDataInfo->tableSeq = pBasicParam->tableSeq;
24✔
1214
    }
1215
  }
1216

1217
  return TSDB_CODE_SUCCESS;
1,148✔
1218
}
1219

1220
int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
743✔
1221
  SExchangeInfo*               pExchangeInfo = pOperator->info;
743✔
1222
  int32_t                      code = TSDB_CODE_SUCCESS;
743✔
1223
  SExchangeOperatorBasicParam* pBasicParam = NULL;
743✔
1224
  SExchangeOperatorParam*      pParam = (SExchangeOperatorParam*)pOperator->pOperatorGetParam->value;
743✔
1225
  if (pParam->multiParams) {
743✔
1226
    SExchangeOperatorBatchParam* pBatch = (SExchangeOperatorBatchParam*)pOperator->pOperatorGetParam->value;
695✔
1227
    int32_t                      iter = 0;
695✔
1228
    while (NULL != (pBasicParam = tSimpleHashIterate(pBatch->pBatchs, pBasicParam, &iter))) {
1,795✔
1229
      code = addSingleExchangeSource(pOperator, pBasicParam);
1,100✔
1230
      if (code) {
1,100!
1231
        return code;
×
1232
      }
1233
    }
1234
  } else {
1235
    pBasicParam = &pParam->basic;
48✔
1236
    code = addSingleExchangeSource(pOperator, pBasicParam);
48✔
1237
  }
1238

1239
  freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
743✔
1240
  pOperator->pOperatorGetParam = NULL;
743✔
1241

1242
  return code;
743✔
1243
}
1244

1245
int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
22,504,174✔
1246
  SExchangeInfo* pExchangeInfo = pOperator->info;
22,504,174✔
1247
  int32_t        code = TSDB_CODE_SUCCESS;
22,504,174✔
1248
  int32_t        lino = 0;
22,504,174✔
1249
  if ((OPTR_IS_OPENED(pOperator) && !pExchangeInfo->dynamicOp) ||
22,504,174✔
1250
      (pExchangeInfo->dynamicOp && NULL == pOperator->pOperatorGetParam)) {
3,333,911✔
1251
    return TSDB_CODE_SUCCESS;
19,173,575✔
1252
  }
1253

1254
  if (pExchangeInfo->dynamicOp) {
3,330,599✔
1255
    code = addDynamicExchangeSource(pOperator);
743✔
1256
    QUERY_CHECK_CODE(code, lino, _end);
743!
1257
  }
1258

1259
  if (pOperator->status == OP_NOT_OPENED && pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData) {
3,330,599!
1260
    pExchangeInfo->current = 0;
×
1261
  }
1262

1263
  int64_t st = taosGetTimestampUs();
3,331,121✔
1264

1265
  if (!pExchangeInfo->seqLoadData) {
3,331,121✔
1266
    code = prepareConcurrentlyLoad(pOperator);
3,330,232✔
1267
    QUERY_CHECK_CODE(code, lino, _end);
3,330,280!
1268
    pExchangeInfo->openedTs = taosGetTimestampUs();
3,330,312✔
1269
  }
1270

1271
  OPTR_SET_OPENED(pOperator);
3,331,201✔
1272
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
3,331,190✔
1273

1274
_end:
3,331,190✔
1275
  if (code != TSDB_CODE_SUCCESS) {
3,331,190!
1276
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1277
    pOperator->pTaskInfo->code = code;
×
1278
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
1279
  }
1280
  return TSDB_CODE_SUCCESS;
3,331,190✔
1281
}
1282

1283
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
23,553✔
1284
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
23,553✔
1285

1286
  if (pLimitInfo->remainGroupOffset > 0) {
23,553!
1287
    if (pLimitInfo->currentGroupId == 0) {  // it is the first group
×
1288
      pLimitInfo->currentGroupId = pBlock->info.id.groupId;
×
1289
      blockDataCleanup(pBlock);
×
1290
      return PROJECT_RETRIEVE_CONTINUE;
×
1291
    } else if (pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
×
1292
      // now it is the data from a new group
1293
      pLimitInfo->remainGroupOffset -= 1;
×
1294

1295
      // ignore data block in current group
1296
      if (pLimitInfo->remainGroupOffset > 0) {
×
1297
        blockDataCleanup(pBlock);
×
1298
        return PROJECT_RETRIEVE_CONTINUE;
×
1299
      }
1300
    }
1301

1302
    // set current group id of the project operator
1303
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
×
1304
  }
1305

1306
  // here check for a new group data, we need to handle the data of the previous group.
1307
  if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
23,553✔
1308
    pLimitInfo->numOfOutputGroups += 1;
833✔
1309
    if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
833!
1310
      pOperator->status = OP_EXEC_DONE;
×
1311
      blockDataCleanup(pBlock);
×
1312

1313
      return PROJECT_RETRIEVE_DONE;
×
1314
    }
1315

1316
    // reset the value for a new group data
1317
    resetLimitInfoForNextGroup(pLimitInfo);
833✔
1318
    // existing rows that belongs to previous group.
1319
    if (pBlock->info.rows > 0) {
833!
1320
      return PROJECT_RETRIEVE_DONE;
833✔
1321
    }
1322
  }
1323

1324
  // here we reach the start position, according to the limit/offset requirements.
1325

1326
  // set current group id
1327
  pLimitInfo->currentGroupId = pBlock->info.id.groupId;
22,720✔
1328

1329
  bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pTaskInfo);
22,720✔
1330
  if (pBlock->info.rows == 0) {
22,720✔
1331
    return PROJECT_RETRIEVE_CONTINUE;
9,170✔
1332
  } else {
1333
    if (limitReached && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
13,550!
1334
      setOperatorCompleted(pOperator);
×
1335
      return PROJECT_RETRIEVE_DONE;
×
1336
    }
1337
  }
1338

1339
  // todo optimize performance
1340
  // If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the
1341
  // they may not belong to the same group the limit/offset value is not valid in this case.
1342
  if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || hasSlimitOffsetInfo(pLimitInfo)) {
13,550!
1343
    return PROJECT_RETRIEVE_DONE;
13,550✔
1344
  } else {  // not full enough, continue to accumulate the output data in the buffer.
1345
    return PROJECT_RETRIEVE_CONTINUE;
×
1346
  }
1347
}
1348

1349
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo) {
5,323,359✔
1350
  SExecTaskInfo* pTask = pOperator->pTaskInfo;
5,323,359✔
1351
  int32_t        code = TSDB_CODE_SUCCESS;
5,323,359✔
1352
  if (pTask->pWorkerCb) {
5,323,359!
1353
    code = pTask->pWorkerCb->beforeBlocking(pTask->pWorkerCb->pPool);
5,323,478✔
1354
    if (code != TSDB_CODE_SUCCESS) {
5,324,409!
1355
      pTask->code = code;
×
1356
      return pTask->code;
×
1357
    }
1358
  }
1359

1360
  code = tsem_wait(&pExchangeInfo->ready);
5,324,290✔
1361
  if (code != TSDB_CODE_SUCCESS) {
5,324,287!
1362
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1363
    pTask->code = code;
×
1364
    return pTask->code;
×
1365
  }
1366

1367
  if (pTask->pWorkerCb) {
5,324,315✔
1368
    code = pTask->pWorkerCb->afterRecoverFromBlocking(pTask->pWorkerCb->pPool);
5,324,312✔
1369
    if (code != TSDB_CODE_SUCCESS) {
5,324,427!
1370
      pTask->code = code;
×
1371
      return pTask->code;
×
1372
    }
1373
  }
1374
  return TSDB_CODE_SUCCESS;
5,324,430✔
1375
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc