• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3842

07 Apr 2025 11:21AM UTC coverage: 62.696% (-0.3%) from 63.027%
#3842

push

travis-ci

web-flow
merge: from main to 3.0 branch (#30679)

154855 of 315075 branches covered (49.15%)

Branch coverage included in aggregate %.

6 of 8 new or added lines in 5 files covered. (75.0%)

2309 existing lines in 130 files now uncovered.

240176 of 314995 relevant lines covered (76.25%)

19119980.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.76
/source/libs/executor/src/exchangeoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "operator.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "tdatablock.h"
23
#include "thash.h"
24
#include "tmsg.h"
25
#include "tref.h"
26
#include "trpc.h"
27

28
typedef struct SFetchRspHandleWrapper {
29
  uint32_t exchangeId;
30
  int32_t  sourceIndex;
31
} SFetchRspHandleWrapper;
32

33
typedef struct SSourceDataInfo {
34
  int32_t            index;
35
  SRetrieveTableRsp* pRsp;
36
  uint64_t           totalRows;
37
  int64_t            startTime;
38
  int32_t            code;
39
  EX_SOURCE_STATUS   status;
40
  const char*        taskId;
41
  SArray*            pSrcUidList;
42
  int32_t            srcOpType;
43
  bool               tableSeq;
44
  char*              decompBuf;
45
  int32_t            decompBufSize;
46
  SOrgTbInfo*     colMap;
47
  bool               isVtbRefScan;
48
  STimeWindow        window;
49
} SSourceDataInfo;
50

51
static void destroyExchangeOperatorInfo(void* param);
52
static void freeBlock(void* pParam);
53
static void freeSourceDataInfo(void* param);
54
static void setAllSourcesCompleted(SOperatorInfo* pOperator);
55

56
static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code);
57
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex);
58
static int32_t getCompletedSources(const SArray* pArray, int32_t* pRes);
59
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator);
60
static int32_t seqLoadRemoteData(SOperatorInfo* pOperator);
61
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator);
62
static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock,
63
                                 bool holdDataInBuf);
64
static int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo);
65

66
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo);
67

68
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
7,332,514✔
69
                                           SExecTaskInfo* pTaskInfo) {
70
  int32_t code = 0;
7,332,514✔
71
  int32_t lino = 0;
7,332,514✔
72
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
7,332,514✔
73
  int32_t completed = 0;
7,332,034✔
74
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
7,332,034✔
75
  if (code != TSDB_CODE_SUCCESS) {
7,332,998!
76
    pTaskInfo->code = code;
×
77
    T_LONG_JMP(pTaskInfo->env, code);
×
78
  }
79
  if (completed == totalSources) {
7,332,998✔
80
    setAllSourcesCompleted(pOperator);
1,956,514✔
81
    return;
7,333,481✔
82
  }
83

84
  SSourceDataInfo* pDataInfo = NULL;
5,376,484✔
85

86
  while (1) {
305,225✔
87
    qDebug("prepare wait for ready, %p, %s", pExchangeInfo, GET_TASKID(pTaskInfo));
5,681,709✔
88
    code = exchangeWait(pOperator, pExchangeInfo);
5,681,709✔
89

90
    if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
5,682,790!
91
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
25!
92
    }
93

94
    for (int32_t i = 0; i < totalSources; ++i) {
28,177,804✔
95
      pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
28,163,683✔
96
      QUERY_CHECK_NULL(pDataInfo, code, lino, _error, terrno);
28,157,003!
97
      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
28,175,335✔
98
        continue;
21,629,602✔
99
      }
100

101
      if (pDataInfo->status != EX_SOURCE_DATA_READY) {
6,545,733✔
102
        continue;
865,532✔
103
      }
104

105
      if (pDataInfo->code != TSDB_CODE_SUCCESS) {
5,680,201✔
106
        code = pDataInfo->code;
6✔
107
        goto _error;
6✔
108
      }
109

110
      tmemory_barrier();
5,680,195✔
111
      SRetrieveTableRsp*     pRsp = pDataInfo->pRsp;
5,680,195✔
112
      SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
5,680,195✔
113
      QUERY_CHECK_NULL(pSource, code, lino, _error, terrno);
5,682,055!
114

115
      // todo
116
      SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
5,682,184✔
117
      if (pRsp->numOfRows == 0) {
5,682,184✔
118
        if (NULL != pDataInfo->pSrcUidList && (!pDataInfo->isVtbRefScan)) {
2,047,824!
119
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
×
120
          code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
×
121
          if (code != TSDB_CODE_SUCCESS) {
×
122
            taosMemoryFreeClear(pDataInfo->pRsp);
×
123
            goto _error;
×
124
          }
125
        } else {
126
          pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
2,047,824✔
127
          qDebug("%s vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
2,047,824✔
128
                 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", try next %d/%" PRIzu,
129
                 GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
130
                 pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows, i + 1, totalSources);
131
          taosMemoryFreeClear(pDataInfo->pRsp);
2,047,819!
132
        }
133
        break;
2,047,835✔
134
      }
135

136
      code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
3,634,360✔
137
      if (code != TSDB_CODE_SUCCESS) {
3,633,297!
138
        goto _error;
×
139
      }
140

141
      SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
3,633,297✔
142
      updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pDataInfo->startTime, pOperator);
3,633,297✔
143
      pDataInfo->totalRows += pRetrieveRsp->numOfRows;
3,633,643✔
144

145
      if (pRsp->completed == 1) {
3,633,643✔
146
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
3,316,148✔
147
        qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
3,316,148✔
148
               " execId:%d index:%d completed, blocks:%d, numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64
149
               ", totalRows:%" PRIu64 ", total:%.2f Kb, try next %d/%" PRIzu,
150
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
151
               pRsp->numOfBlocks, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows,
152
               pLoadInfo->totalSize / 1024.0, i + 1, totalSources);
153
      } else {
154
        qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
317,495✔
155
               " execId:%d blocks:%d, numOfRows:%" PRId64 ", totalRows:%" PRIu64 ", total:%.2f Kb",
156
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
157
               pRsp->numOfBlocks, pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
158
      }
159

160
      taosMemoryFreeClear(pDataInfo->pRsp);
3,633,548!
161

162
      if ((pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED || NULL != pDataInfo->pSrcUidList) && !pDataInfo->isVtbRefScan) {
3,634,380!
163
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
317,513✔
164
        code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
317,513✔
165
        if (code != TSDB_CODE_SUCCESS) {
317,513!
166
          taosMemoryFreeClear(pDataInfo->pRsp);
×
167
          goto _error;
×
168
        }
169
      }
170
      return;
5,376,947✔
171
    }  // end loop
172

173
    int32_t complete1 = 0;
2,061,956✔
174
    code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &complete1);
2,061,956✔
175
    if (code != TSDB_CODE_SUCCESS) {
2,047,778!
176
      pTaskInfo->code = code;
×
177
      T_LONG_JMP(pTaskInfo->env, code);
×
178
    }
179
    if (complete1 == totalSources) {
2,047,788✔
180
      qDebug("all sources are completed, %s", GET_TASKID(pTaskInfo));
1,742,563✔
181
      return;
1,742,567✔
182
    }
183
  }
184

185
_error:
6✔
186
  pTaskInfo->code = code;
6✔
187
}
188

189
static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
16,872,914✔
190
  int32_t        code = TSDB_CODE_SUCCESS;
16,872,914✔
191
  SExchangeInfo* pExchangeInfo = pOperator->info;
16,872,914✔
192
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
16,872,914✔
193

194
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
16,872,914✔
195

196
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
16,872,639✔
197
  if (pOperator->status == OP_EXEC_DONE) {
16,872,639!
198
    qDebug("%s all %" PRIzu " source(s) are exhausted, total rows:%" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
×
199
           GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize,
200
           pLoadInfo->totalElapsed / 1000.0);
201
    return NULL;
×
202
  }
203

204
  // we have buffered retrieved datablock, return it directly
205
  SSDataBlock* p = NULL;
16,872,639✔
206
  if (taosArrayGetSize(pExchangeInfo->pResultBlockList) > 0) {
16,872,639✔
207
    p = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
9,538,104✔
208
    taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
9,537,942✔
209
  }
210

211
  if (p != NULL) {
16,873,065✔
212
    void* tmp = taosArrayPush(pExchangeInfo->pRecycledBlocks, &p);
9,538,304✔
213
    if (!tmp) {
9,538,319!
214
      code = terrno;
×
215
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
216
      pTaskInfo->code = code;
×
217
      T_LONG_JMP(pTaskInfo->env, code);
×
218
    }
219
    return p;
9,538,319✔
220
  } else {
221
    if (pExchangeInfo->seqLoadData) {
7,334,761✔
222
      code = seqLoadRemoteData(pOperator);
2,239✔
223
      if (code != TSDB_CODE_SUCCESS) {
2,239!
224
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
225
        pTaskInfo->code = code;
×
226
        T_LONG_JMP(pTaskInfo->env, code);
×
227
      }
228
    } else {
229
      concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
7,332,522✔
230
    }
231
    if (TSDB_CODE_SUCCESS != pOperator->pTaskInfo->code) {
7,335,469✔
232
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(pOperator->pTaskInfo->code));
6!
233
      T_LONG_JMP(pTaskInfo->env, pOperator->pTaskInfo->code);
6!
234
    }
235
    if (taosArrayGetSize(pExchangeInfo->pResultBlockList) == 0) {
7,335,463✔
236
      return NULL;
3,699,872✔
237
    } else {
238
      p = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
3,635,360✔
239
      taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
3,634,721✔
240
      void* tmp = taosArrayPush(pExchangeInfo->pRecycledBlocks, &p);
3,634,502✔
241
      if (!tmp) {
3,634,473!
242
        code = terrno;
×
243
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
244
        pTaskInfo->code = code;
×
245
        T_LONG_JMP(pTaskInfo->env, code);
×
246
      }
247
      return p;
3,634,473✔
248
    }
249
  }
250
}
251

252
static int32_t loadRemoteDataNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
16,864,616✔
253
  int32_t        code = TSDB_CODE_SUCCESS;
16,864,616✔
254
  int32_t        lino = 0;
16,864,616✔
255
  SExchangeInfo* pExchangeInfo = pOperator->info;
16,864,616✔
256
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
16,864,616✔
257

258
  code = pOperator->fpSet._openFn(pOperator);
16,864,616✔
259
  QUERY_CHECK_CODE(code, lino, _end);
16,864,634!
260

261
  if (pOperator->status == OP_EXEC_DONE) {
16,864,634✔
262
    (*ppRes) = NULL;
63✔
263
    return code;
63✔
264
  }
265

266
  while (1) {
8,464✔
267
    SSDataBlock* pBlock = doLoadRemoteDataImpl(pOperator);
16,873,035✔
268
    if (pBlock == NULL) {
16,871,728✔
269
      (*ppRes) = NULL;
3,699,873✔
270
      return code;
3,699,873✔
271
    }
272

273
    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
13,171,855✔
274
    QUERY_CHECK_CODE(code, lino, _end);
13,172,466!
275

276
    if (blockDataGetNumOfRows(pBlock) == 0) {
13,172,466✔
277
      continue;
2✔
278
    }
279

280
    SLimitInfo* pLimitInfo = &pExchangeInfo->limitInfo;
13,172,507✔
281
    if (hasLimitOffsetInfo(pLimitInfo)) {
13,172,507✔
282
      int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock, false);
22,071✔
283
      if (status == PROJECT_RETRIEVE_CONTINUE) {
22,039✔
284
        continue;
8,462✔
285
      } else if (status == PROJECT_RETRIEVE_DONE) {
13,577!
286
        if (pBlock->info.rows == 0) {
13,577!
287
          setOperatorCompleted(pOperator);
×
288
          (*ppRes) = NULL;
×
289
          return code;
×
290
        } else {
291
          (*ppRes) = pBlock;
13,577✔
292
          return code;
13,577✔
293
        }
294
      }
295
    } else {
296
      (*ppRes) = pBlock;
13,150,580✔
297
      return code;
13,150,580✔
298
    }
299
  }
300

301
_end:
×
302
  if (code != TSDB_CODE_SUCCESS) {
×
303
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
304
    pTaskInfo->code = code;
×
305
    T_LONG_JMP(pTaskInfo->env, code);
×
306
  }
307
  (*ppRes) = NULL;
×
308
  return code;
×
309
}
310

311
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
3,782,304✔
312
  pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
3,782,304✔
313
  if (pInfo->pSourceDataInfo == NULL) {
3,782,171!
314
    return terrno;
×
315
  }
316

317
  if (pInfo->dynamicOp) {
3,782,195✔
318
    return TSDB_CODE_SUCCESS;
22,046✔
319
  }
320

321
  int32_t len = strlen(id) + 1;
3,760,149✔
322
  pInfo->pTaskId = taosMemoryCalloc(1, len);
3,760,149!
323
  if (!pInfo->pTaskId) {
3,760,357!
324
    return terrno;
×
325
  }
326
  tstrncpy(pInfo->pTaskId, id, len);
3,760,357✔
327
  for (int32_t i = 0; i < numOfSources; ++i) {
9,126,514✔
328
    SSourceDataInfo dataInfo = {0};
5,366,213✔
329
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
5,366,213✔
330
    dataInfo.taskId = pInfo->pTaskId;
5,366,213✔
331
    dataInfo.index = i;
5,366,213✔
332
    SSourceDataInfo* pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
5,366,213✔
333
    if (pDs == NULL) {
5,366,157!
334
      taosArrayDestroyEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
×
335
      return terrno;
×
336
    }
337
  }
338

339
  return TSDB_CODE_SUCCESS;
3,760,301✔
340
}
341

342
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo, const char* id) {
3,782,285✔
343
  size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);
3,782,285!
344

345
  if (numOfSources == 0) {
3,782,285!
346
    qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
×
347
    return TSDB_CODE_INVALID_PARA;
×
348
  }
349
  pInfo->pFetchRpcHandles = taosArrayInit(numOfSources, sizeof(int64_t));
3,782,285✔
350
  if (!pInfo->pFetchRpcHandles) {
3,782,439!
351
    return terrno;
×
352
  }
353
  void* ret = taosArrayReserve(pInfo->pFetchRpcHandles, numOfSources);
3,782,439✔
354
  if (!ret) {
3,782,317!
355
    return terrno;
×
356
  }
357

358
  pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
3,782,317✔
359
  if (pInfo->pSources == NULL) {
3,782,150!
360
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
361
    return terrno;
×
362
  }
363

364
  if (pExNode->node.dynamicOp) {
3,782,150✔
365
    pInfo->pHashSources = tSimpleHashInit(numOfSources * 2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
22,046✔
366
    if (NULL == pInfo->pHashSources) {
22,054!
367
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
368
      return terrno;
×
369
    }
370
  }
371

372
  for (int32_t i = 0; i < numOfSources; ++i) {
9,192,709✔
373
    SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
5,410,358✔
374
    if (!pNode) {
5,410,983!
375
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
376
      return terrno;
×
377
    }
378
    void* tmp = taosArrayPush(pInfo->pSources, pNode);
5,410,983✔
379
    if (!tmp) {
5,410,187!
380
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
381
      return terrno;
×
382
    }
383
    SExchangeSrcIndex idx = {.srcIdx = i, .inUseIdx = -1};
5,410,187✔
384
    int32_t           code =
385
        tSimpleHashPut(pInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
5,410,187✔
386
    if (pInfo->pHashSources && code != TSDB_CODE_SUCCESS) {
5,410,551!
387
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
388
      return code;
×
389
    }
390
  }
391

392
  initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);
3,782,351✔
393
  int64_t refId = taosAddRef(exchangeObjRefPool, pInfo);
3,782,306✔
394
  if (refId < 0) {
3,782,332!
395
    int32_t code = terrno;
×
396
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
397
    return code;
×
398
  } else {
399
    pInfo->self = refId;
3,782,332✔
400
  }
401

402
  return initDataSource(numOfSources, pInfo, id);
3,782,332✔
403
}
404

405
int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo,
3,782,294✔
406
                                   SOperatorInfo** pOptrInfo) {
407
  QRY_PARAM_CHECK(pOptrInfo);
3,782,294!
408

409
  int32_t        code = 0;
3,782,294✔
410
  int32_t        lino = 0;
3,782,294✔
411
  SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
3,782,294!
412
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
3,782,266!
413
  if (pInfo == NULL || pOperator == NULL) {
3,782,279!
414
    code = terrno;
×
415
    goto _error;
×
416
  }
417

418
  pInfo->dynamicOp = pExNode->node.dynamicOp;
3,782,285✔
419
  code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo));
3,782,285✔
420
  QUERY_CHECK_CODE(code, lino, _error);
3,782,204!
421

422
  code = tsem_init(&pInfo->ready, 0, 0);
3,782,204✔
423
  QUERY_CHECK_CODE(code, lino, _error);
3,782,222!
424

425
  pInfo->pDummyBlock = createDataBlockFromDescNode(pExNode->node.pOutputDataBlockDesc);
3,782,222✔
426
  QUERY_CHECK_NULL(pInfo->pDummyBlock, code, lino, _error, terrno);
3,782,344!
427

428
  pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES);
3,782,344✔
429
  QUERY_CHECK_NULL(pInfo->pResultBlockList, code, lino, _error, terrno);
3,782,264!
430
  pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES);
3,782,264✔
431
  QUERY_CHECK_NULL(pInfo->pRecycledBlocks, code, lino, _error, terrno);
3,782,248!
432

433
  SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self};
3,782,248✔
434
  code = qAppendTaskStopInfo(pTaskInfo, &stopInfo);
3,782,248✔
435
  QUERY_CHECK_CODE(code, lino, _error);
3,782,419!
436

437
  pInfo->seqLoadData = pExNode->seqRecvData;
3,782,419✔
438
  pInfo->pTransporter = pTransporter;
3,782,419✔
439

440
  setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo,
3,782,419✔
441
                  pTaskInfo);
442
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
3,782,369✔
443

444
  code = filterInitFromNode((SNode*)pExNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
3,782,320✔
445
  QUERY_CHECK_CODE(code, lino, _error);
3,782,282!
446

447
  pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteDataNext, NULL, destroyExchangeOperatorInfo,
3,782,282✔
448
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
449
  *pOptrInfo = pOperator;
3,782,260✔
450
  return TSDB_CODE_SUCCESS;
3,782,260✔
451

452
_error:
×
453
  if (code != TSDB_CODE_SUCCESS) {
×
454
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
455
    pTaskInfo->code = code;
×
456
  }
457
  if (pInfo != NULL) {
×
458
    doDestroyExchangeOperatorInfo(pInfo);
×
459
  }
460

461
  if (pOperator != NULL) {
×
462
    pOperator->info = NULL;
×
463
    destroyOperator(pOperator);
×
464
  }
465
  return code;
×
466
}
467

468
void destroyExchangeOperatorInfo(void* param) {
3,782,363✔
469
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
3,782,363✔
470
  int32_t        code = taosRemoveRef(exchangeObjRefPool, pExInfo->self);
3,782,363✔
471
  if (code != TSDB_CODE_SUCCESS) {
3,782,477!
472
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
473
  }
474
}
3,782,477✔
475

476
void freeBlock(void* pParam) {
6,386,571✔
477
  SSDataBlock* pBlock = *(SSDataBlock**)pParam;
6,386,571✔
478
  blockDataDestroy(pBlock);
6,386,571✔
479
}
6,386,620✔
480

481
void freeSourceDataInfo(void* p) {
5,367,994✔
482
  SSourceDataInfo* pInfo = (SSourceDataInfo*)p;
5,367,994✔
483
  taosMemoryFreeClear(pInfo->decompBuf);
5,367,994!
484
  taosMemoryFreeClear(pInfo->pRsp);
5,367,994!
485

486
  pInfo->decompBufSize = 0;
5,367,994✔
487
}
5,367,994✔
488

489
void doDestroyExchangeOperatorInfo(void* param) {
3,782,429✔
490
  if (param == NULL) {
3,782,429!
491
    return;
×
492
  }
493
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
3,782,429✔
494
  if (pExInfo->pFetchRpcHandles) {
3,782,429!
495
    for (int32_t i = 0; i < pExInfo->pFetchRpcHandles->size; ++i) {
9,192,736✔
496
      int64_t* pRpcHandle = taosArrayGet(pExInfo->pFetchRpcHandles, i);
5,410,397✔
497
      if (*pRpcHandle > 0) {
5,410,264✔
498
        SDownstreamSourceNode* pSource = taosArrayGet(pExInfo->pSources, i);
3,853✔
499
        (void)asyncFreeConnById(pExInfo->pTransporter, *pRpcHandle);
3,853✔
500
      }
501
    }
502
    taosArrayDestroy(pExInfo->pFetchRpcHandles);
3,782,339✔
503
  }
504

505
  taosArrayDestroy(pExInfo->pSources);
3,782,329✔
506
  taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo);
3,782,418✔
507

508
  taosArrayDestroyEx(pExInfo->pResultBlockList, freeBlock);
3,782,367✔
509
  taosArrayDestroyEx(pExInfo->pRecycledBlocks, freeBlock);
3,782,435✔
510

511
  blockDataDestroy(pExInfo->pDummyBlock);
3,782,443✔
512
  tSimpleHashCleanup(pExInfo->pHashSources);
3,782,463✔
513

514
  int32_t code = tsem_destroy(&pExInfo->ready);
3,782,440✔
515
  if (code != TSDB_CODE_SUCCESS) {
3,782,394!
516
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
517
  }
518
  taosMemoryFreeClear(pExInfo->pTaskId);
3,782,394!
519

520
  taosMemoryFreeClear(param);
3,782,393!
521
}
522

523
int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
5,681,732✔
524
  SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;
5,681,732✔
525

526
  taosMemoryFreeClear(pMsg->pEpSet);
5,681,732!
527
  SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
5,681,732✔
528
  if (pExchangeInfo == NULL) {
5,683,226✔
529
    qWarn("failed to acquire exchange operator, since it may have been released, %p", pExchangeInfo);
69!
530
    taosMemoryFree(pMsg->pData);
69!
531
    return TSDB_CODE_SUCCESS;
69✔
532
  }
533

534
  int32_t          index = pWrapper->sourceIndex;
5,683,157✔
535
  SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);
5,683,157✔
536

537
  int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, index);
5,676,886✔
538
  if (pRpcHandle != NULL) {
5,673,577!
539
    int32_t ret = asyncFreeConnById(pExchangeInfo->pTransporter, *pRpcHandle);
5,673,779✔
540
    if (ret != 0) {
5,684,797✔
541
      qDebug("failed to free rpc handle, code:%s, %p", tstrerror(ret), pExchangeInfo);
19,473✔
542
    }
543
    *pRpcHandle = -1;
5,684,085✔
544
  }
545

546
  if (!pSourceDataInfo) {
5,683,883!
547
    return terrno;
×
548
  }
549

550
  if (code == TSDB_CODE_SUCCESS) {
5,683,883✔
551
    pSourceDataInfo->pRsp = pMsg->pData;
5,681,000✔
552

553
    SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
5,681,000✔
554
    pRsp->numOfRows = htobe64(pRsp->numOfRows);
5,681,000✔
555
    pRsp->compLen = htonl(pRsp->compLen);
5,679,684✔
556
    pRsp->payloadLen = htonl(pRsp->payloadLen);
5,679,684✔
557
    pRsp->numOfCols = htonl(pRsp->numOfCols);
5,679,684✔
558
    pRsp->useconds = htobe64(pRsp->useconds);
5,679,684✔
559
    pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);
5,682,745✔
560

561
    qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%" PRId64 ", %p", pSourceDataInfo->taskId, index,
5,682,745✔
562
           pRsp->numOfBlocks, pRsp->numOfRows, pExchangeInfo);
563
  } else {
564
    taosMemoryFree(pMsg->pData);
2,883!
565
    pSourceDataInfo->code = rpcCvtErrCode(code);
6✔
566
    if (pSourceDataInfo->code != code) {
6!
567
      qError("%s fetch rsp received, index:%d, error:%s, cvted error: %s, %p", pSourceDataInfo->taskId, index,
×
568
             tstrerror(code), tstrerror(pSourceDataInfo->code), pExchangeInfo);
569
    } else {
570
      qError("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code),
6!
571
             pExchangeInfo);
572
    }
573
  }
574

575
  tmemory_barrier();
5,682,763✔
576
  pSourceDataInfo->status = EX_SOURCE_DATA_READY;
5,682,763✔
577
  code = tsem_post(&pExchangeInfo->ready);
5,682,763✔
578
  if (code != TSDB_CODE_SUCCESS) {
5,683,812!
579
    qError("failed to invoke post when fetch rsp is ready, code:%s, %p", tstrerror(code), pExchangeInfo);
×
580
    return code;
×
581
  }
582

583
  code = taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
5,683,812✔
584
  if (code != TSDB_CODE_SUCCESS) {
5,684,267!
585
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
586
  }
587
  return code;
5,684,062✔
588
}
589

590
int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, bool tableSeq) {
1,043✔
591
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
1,043!
592
  if (NULL == *ppRes) {
1,043!
593
    return terrno;
×
594
  }
595

596
  STableScanOperatorParam* pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
1,043!
597
  if (NULL == pScan) {
1,043!
598
    taosMemoryFreeClear(*ppRes);
×
599
    return terrno;
×
600
  }
601

602
  pScan->pUidList = taosArrayDup(pUidList, NULL);
1,043✔
603
  if (NULL == pScan->pUidList) {
1,043!
604
    taosMemoryFree(pScan);
×
605
    taosMemoryFreeClear(*ppRes);
×
606
    return terrno;
×
607
  }
608
  pScan->tableSeq = tableSeq;
1,043✔
609
  pScan->pOrgTbInfo = NULL;
1,043✔
610
  pScan->window.skey = INT64_MAX;
1,043✔
611
  pScan->window.ekey = INT64_MIN;
1,043✔
612

613
  (*ppRes)->opType = srcOpType;
1,043✔
614
  (*ppRes)->downstreamIdx = 0;
1,043✔
615
  (*ppRes)->value = pScan;
1,043✔
616
  (*ppRes)->pChildren = NULL;
1,043✔
617
  (*ppRes)->reUse = false;
1,043✔
618

619
  return TSDB_CODE_SUCCESS;
1,043✔
620
}
621

622
int32_t buildTableScanOperatorParamEx(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, SOrgTbInfo *pMap, bool tableSeq, STimeWindow *window) {
×
623
  int32_t                  code = TSDB_CODE_SUCCESS;
×
624
  int32_t                  lino = 0;
×
625
  STableScanOperatorParam* pScan = NULL;
×
626

627
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
×
628
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
×
629

630
  pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
×
631
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);
×
632

633
  pScan->pUidList = taosArrayDup(pUidList, NULL);
×
634
  QUERY_CHECK_NULL(pScan->pUidList, code, lino, _return, terrno);
×
635

636
  pScan->pOrgTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
×
637
  QUERY_CHECK_NULL(pScan->pOrgTbInfo, code, lino, _return, terrno);
×
638

639
  pScan->pOrgTbInfo->vgId = pMap->vgId;
×
640
  tstrncpy(pScan->pOrgTbInfo->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);
×
641

642
  pScan->pOrgTbInfo->colMap = taosArrayDup(pMap->colMap, NULL);
×
643
  QUERY_CHECK_NULL(pScan->pOrgTbInfo->colMap, code, lino, _return, terrno);
×
644

645
  pScan->tableSeq = tableSeq;
×
646
  pScan->window.skey = window->skey;
×
647
  pScan->window.ekey = window->ekey;
×
648

649
  (*ppRes)->opType = srcOpType;
×
650
  (*ppRes)->downstreamIdx = 0;
×
651
  (*ppRes)->value = pScan;
×
652
  (*ppRes)->pChildren = NULL;
×
653
  (*ppRes)->reUse = false;
×
654

655
  return code;
×
656
_return:
×
657
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
658
  taosMemoryFreeClear(*ppRes);
×
659
  if (pScan) {
×
660
    taosArrayDestroy(pScan->pUidList);
×
661
    if (pScan->pOrgTbInfo) {
×
662
      taosArrayDestroy(pScan->pOrgTbInfo->colMap);
×
663
      taosMemoryFreeClear(pScan->pOrgTbInfo);
×
664
    }
665
    taosMemoryFree(pScan);
×
666
  }
667
  return code;
×
668
}
669

670
int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
5,685,360✔
671
  int32_t          code = TSDB_CODE_SUCCESS;
5,685,360✔
672
  int32_t          lino = 0;
5,685,360✔
673
  SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
5,685,360✔
674
  if (!pDataInfo) {
5,684,850!
675
    return terrno;
×
676
  }
677

678
  if (EX_SOURCE_DATA_NOT_READY != pDataInfo->status) {
5,684,937!
679
    return TSDB_CODE_SUCCESS;
×
680
  }
681

682
  pDataInfo->status = EX_SOURCE_DATA_STARTED;
5,684,937✔
683
  SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
5,684,937✔
684
  if (!pSource) {
5,684,584!
685
    return terrno;
×
686
  }
687

688
  pDataInfo->startTime = taosGetTimestampUs();
5,685,318✔
689
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
5,685,318✔
690

691
  SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
5,685,330!
692
  QUERY_CHECK_NULL(pWrapper, code, lino, _end, terrno);
5,685,958!
693
  pWrapper->exchangeId = pExchangeInfo->self;
5,685,958✔
694
  pWrapper->sourceIndex = sourceIndex;
5,685,958✔
695

696
  if (pSource->localExec) {
5,685,958✔
697
    SDataBuf pBuf = {0};
16,045✔
698
    int32_t  code = (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->sId, pTaskInfo->id.queryId,
16,045✔
699
                                               pSource->clientId, pSource->taskId, 0, pSource->execId, &pBuf.pData,
700
                                               pTaskInfo->localFetch.explainRes);
701
    code = loadRemoteDataCallback(pWrapper, &pBuf, code);
16,045✔
702
    taosMemoryFree(pWrapper);
16,045!
703
    QUERY_CHECK_CODE(code, lino, _end);
16,045!
704
  } else {
705
    SResFetchReq req = {0};
5,669,913✔
706
    req.header.vgId = pSource->addr.nodeId;
5,669,913✔
707
    req.sId = pSource->sId;
5,669,913✔
708
    req.clientId = pSource->clientId;
5,669,913✔
709
    req.taskId = pSource->taskId;
5,669,913✔
710
    req.queryId = pTaskInfo->id.queryId;
5,669,913✔
711
    req.execId = pSource->execId;
5,669,913✔
712
    if (pDataInfo->isVtbRefScan) {
5,669,913!
713
      code = buildTableScanOperatorParamEx(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->colMap, pDataInfo->tableSeq, &pDataInfo->window);
×
714
      taosArrayDestroy(pDataInfo->colMap->colMap);
×
715
      taosMemoryFreeClear(pDataInfo->colMap);
×
716
      taosArrayDestroy(pDataInfo->pSrcUidList);
×
717
      pDataInfo->pSrcUidList = NULL;
×
718
      if (TSDB_CODE_SUCCESS != code) {
×
719
        pTaskInfo->code = code;
×
720
        taosMemoryFree(pWrapper);
×
721
        return pTaskInfo->code;
×
722
      }
723
    } else {
724
      if (pDataInfo->pSrcUidList) {
5,669,913✔
725
        code = buildTableScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->tableSeq);
994✔
726
        taosArrayDestroy(pDataInfo->pSrcUidList);
994✔
727
        pDataInfo->pSrcUidList = NULL;
994✔
728
        if (TSDB_CODE_SUCCESS != code) {
994!
729
          pTaskInfo->code = code;
×
730
          taosMemoryFree(pWrapper);
×
731
          return pTaskInfo->code;
×
732
        }
733
      }
734
    }
735

736
    int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req);
5,669,913✔
737
    if (msgSize < 0) {
5,669,547!
738
      pTaskInfo->code = msgSize;
×
739
      taosMemoryFree(pWrapper);
×
740
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
741
      return pTaskInfo->code;
×
742
    }
743

744
    void* msg = taosMemoryCalloc(1, msgSize);
5,669,547!
745
    if (NULL == msg) {
5,669,722!
746
      pTaskInfo->code = terrno;
×
747
      taosMemoryFree(pWrapper);
×
748
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
749
      return pTaskInfo->code;
×
750
    }
751

752
    msgSize = tSerializeSResFetchReq(msg, msgSize, &req);
5,669,722✔
753
    if (msgSize < 0) {
5,669,111!
754
      pTaskInfo->code = msgSize;
×
755
      taosMemoryFree(pWrapper);
×
756
      taosMemoryFree(msg);
×
757
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
758
      return pTaskInfo->code;
×
759
    }
760

761
    freeOperatorParam(req.pOpParam, OP_GET_PARAM);
5,669,111✔
762

763
    qDebug("%s build fetch msg and send to vgId:%d, ep:%s, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
5,669,070✔
764
           ", execId:%d, %p, %d/%" PRIzu,
765
           GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->clientId,
766
           pSource->taskId, pSource->execId, pExchangeInfo, sourceIndex, totalSources);
767

768
    // send the fetch remote task result reques
769
    SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
5,669,077!
770
    if (NULL == pMsgSendInfo) {
5,669,454!
771
      taosMemoryFreeClear(msg);
×
772
      taosMemoryFree(pWrapper);
×
773
      qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
×
774
      pTaskInfo->code = terrno;
×
775
      return pTaskInfo->code;
×
776
    }
777

778
    pMsgSendInfo->param = pWrapper;
5,669,454✔
779
    pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
5,669,454✔
780
    pMsgSendInfo->msgInfo.pData = msg;
5,669,454✔
781
    pMsgSendInfo->msgInfo.len = msgSize;
5,669,454✔
782
    pMsgSendInfo->msgType = pSource->fetchMsgType;
5,669,454✔
783
    pMsgSendInfo->fp = loadRemoteDataCallback;
5,669,454✔
784

785
    int64_t transporterId = 0;
5,669,454✔
786
    void* poolHandle = NULL;
5,669,454✔
787
    code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
5,669,454✔
788
    QUERY_CHECK_CODE(code, lino, _end);
5,670,002!
789
    int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, sourceIndex);
5,670,002✔
790
    *pRpcHandle = transporterId;
5,669,542✔
791
  }
792

793
_end:
5,685,587✔
794
  if (code != TSDB_CODE_SUCCESS) {
5,685,587!
795
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
796
  }
797
  return code;
5,685,546✔
798
}
799

800
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs,
3,830,920✔
801
                          SOperatorInfo* pOperator) {
802
  pInfo->totalRows += numOfRows;
3,830,920✔
803
  pInfo->totalSize += dataLen;
3,830,920✔
804
  pInfo->totalElapsed += (taosGetTimestampUs() - startTs);
3,831,294✔
805
  pOperator->resultInfo.totalRows += numOfRows;
3,831,294✔
806
}
3,831,294✔
807

808
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart) {
13,478,607✔
809
  int32_t      code = TSDB_CODE_SUCCESS;
13,478,607✔
810
  int32_t      lino = 0;
13,478,607✔
811
  SSDataBlock* pBlock = NULL;
13,478,607✔
812
  if (pColList == NULL) {  // data from other sources
13,478,607✔
813
    blockDataCleanup(pRes);
13,282,626✔
814
    code = blockDecode(pRes, pData, (const char**)pNextStart);
13,282,140✔
815
    if (code) {
13,281,386!
816
      return code;
×
817
    }
818
  } else {  // extract data according to pColList
819
    char* pStart = pData;
195,981✔
820

821
    int32_t numOfCols = htonl(*(int32_t*)pStart);
195,981✔
822
    pStart += sizeof(int32_t);
195,981✔
823

824
    // todo refactor:extract method
825
    SSysTableSchema* pSchema = (SSysTableSchema*)pStart;
195,981✔
826
    for (int32_t i = 0; i < numOfCols; ++i) {
3,354,589✔
827
      SSysTableSchema* p = (SSysTableSchema*)pStart;
3,158,608✔
828

829
      p->colId = htons(p->colId);
3,158,608✔
830
      p->bytes = htonl(p->bytes);
3,158,608✔
831
      pStart += sizeof(SSysTableSchema);
3,158,608✔
832
    }
833

834
    pBlock = NULL;
195,981✔
835
    code = createDataBlock(&pBlock);
195,981✔
836
    QUERY_CHECK_CODE(code, lino, _end);
196,233!
837

838
    for (int32_t i = 0; i < numOfCols; ++i) {
3,346,593✔
839
      SColumnInfoData idata = createColumnInfoData(pSchema[i].type, pSchema[i].bytes, pSchema[i].colId);
3,150,469✔
840
      code = blockDataAppendColInfo(pBlock, &idata);
3,159,140✔
841
      QUERY_CHECK_CODE(code, lino, _end);
3,150,360!
842
    }
843

844
    const char* pDummy = NULL;
196,124✔
845
    code = blockDecode(pBlock, pStart, &pDummy);
196,124✔
846
    QUERY_CHECK_CODE(code, lino, _end);
196,160!
847

848
    code = blockDataEnsureCapacity(pRes, pBlock->info.rows);
196,160✔
849
    QUERY_CHECK_CODE(code, lino, _end);
196,195!
850

851
    // data from mnode
852
    pRes->info.dataLoad = 1;
196,195✔
853
    pRes->info.rows = pBlock->info.rows;
196,195✔
854
    pRes->info.scanFlag = MAIN_SCAN;
196,195✔
855
    code = relocateColumnData(pRes, pColList, pBlock->pDataBlock, false);
196,195✔
856
    QUERY_CHECK_CODE(code, lino, _end);
196,172!
857

858
    blockDataDestroy(pBlock);
196,172✔
859
    pBlock = NULL;
196,288✔
860
  }
861

862
_end:
13,477,674✔
863
  if (code != TSDB_CODE_SUCCESS) {
13,477,674!
864
    blockDataDestroy(pBlock);
×
865
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
866
  }
867
  return code;
13,477,602✔
868
}
869

870
void setAllSourcesCompleted(SOperatorInfo* pOperator) {
1,957,354✔
871
  SExchangeInfo* pExchangeInfo = pOperator->info;
1,957,354✔
872
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
1,957,354✔
873

874
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
1,957,354✔
875
  size_t               totalSources = taosArrayGetSize(pExchangeInfo->pSources);
1,957,354✔
876
  qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 ", %.2f Kb, elapsed:%.2f ms",
1,957,360✔
877
         GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
878
         pLoadInfo->totalElapsed / 1000.0);
879

880
  setOperatorCompleted(pOperator);
1,957,361✔
881
}
1,957,379✔
882

883
int32_t getCompletedSources(const SArray* pArray, int32_t* pRes) {
9,379,760✔
884
  int32_t code = TSDB_CODE_SUCCESS;
9,379,760✔
885
  int32_t lino = 0;
9,379,760✔
886
  size_t  total = taosArrayGetSize(pArray);
9,379,760✔
887

888
  int32_t completed = 0;
9,379,177✔
889
  for (int32_t k = 0; k < total; ++k) {
65,048,918✔
890
    SSourceDataInfo* p = taosArrayGet(pArray, k);
55,677,482✔
891
    QUERY_CHECK_NULL(p, code, lino, _end, terrno);
55,658,639!
892
    if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
55,669,741✔
893
      completed += 1;
27,742,670✔
894
    }
895
  }
896

897
  *pRes = completed;
9,371,436✔
898
_end:
9,371,436✔
899
  if (code != TSDB_CODE_SUCCESS) {
9,371,436!
900
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
901
  }
902
  return code;
9,380,161✔
903
}
904

905
int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
3,759,316✔
906
  SExchangeInfo* pExchangeInfo = pOperator->info;
3,759,316✔
907
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
3,759,316✔
908

909
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
3,759,316✔
910
  int64_t startTs = taosGetTimestampUs();
3,759,450✔
911

912
  // Asynchronously send all fetch requests to all sources.
913
  for (int32_t i = 0; i < totalSources; ++i) {
9,125,305✔
914
    int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
5,365,830✔
915
    if (code != TSDB_CODE_SUCCESS) {
5,365,855!
916
      pTaskInfo->code = code;
×
917
      return code;
×
918
    }
919
  }
920

921
  int64_t endTs = taosGetTimestampUs();
3,759,524✔
922
  qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTaskInfo),
3,759,524✔
923
         totalSources, (endTs - startTs) / 1000.0);
924

925
  pOperator->status = OP_RES_TO_RETURN;
3,759,525✔
926
  pOperator->cost.openCost = taosGetTimestampUs() - startTs;
3,759,488✔
927
  if (isTaskKilled(pTaskInfo)) {
3,759,488!
UNCOV
928
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
929
  }
930

931
  return TSDB_CODE_SUCCESS;
3,759,532✔
932
}
933

934
int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo) {
3,635,708✔
935
  int32_t            code = TSDB_CODE_SUCCESS;
3,635,708✔
936
  int32_t            lino = 0;
3,635,708✔
937
  SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
3,635,708✔
938
  SSDataBlock*       pb = NULL;
3,635,708✔
939

940
  char* pNextStart = pRetrieveRsp->data;
3,635,708✔
941
  char* pStart = pNextStart;
3,635,708✔
942

943
  int32_t index = 0;
3,635,708✔
944

945
  if (pRetrieveRsp->compressed) {  // decompress the data
3,635,708!
946
    if (pDataInfo->decompBuf == NULL) {
×
947
      pDataInfo->decompBuf = taosMemoryMalloc(pRetrieveRsp->payloadLen);
×
948
      QUERY_CHECK_NULL(pDataInfo->decompBuf, code, lino, _end, terrno);
×
949
      pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
×
950
    } else {
951
      if (pDataInfo->decompBufSize < pRetrieveRsp->payloadLen) {
×
952
        char* p = taosMemoryRealloc(pDataInfo->decompBuf, pRetrieveRsp->payloadLen);
×
953
        QUERY_CHECK_NULL(p, code, lino, _end, terrno);
×
954
        if (p != NULL) {
×
955
          pDataInfo->decompBuf = p;
×
956
          pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
×
957
        }
958
      }
959
    }
960
  }
961

962
  while (index++ < pRetrieveRsp->numOfBlocks) {
16,917,221✔
963
    pStart = pNextStart;
13,282,629✔
964

965
    if (taosArrayGetSize(pExchangeInfo->pRecycledBlocks) > 0) {
13,282,629✔
966
      pb = *(SSDataBlock**)taosArrayPop(pExchangeInfo->pRecycledBlocks);
6,897,672✔
967
      blockDataCleanup(pb);
6,897,425✔
968
    } else {
969
      code = createOneDataBlock(pExchangeInfo->pDummyBlock, false, &pb);
6,385,086✔
970
      QUERY_CHECK_NULL(pb, code, lino, _end, code);
6,385,408!
971
    }
972

973
    int32_t compLen = *(int32_t*)pStart;
13,282,353✔
974
    pStart += sizeof(int32_t);
13,282,353✔
975

976
    int32_t rawLen = *(int32_t*)pStart;
13,282,353✔
977
    pStart += sizeof(int32_t);
13,282,353✔
978
    QUERY_CHECK_CONDITION((compLen <= rawLen && compLen != 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
13,282,353!
979

980
    pNextStart = pStart + compLen;
13,282,353✔
981
    if (pRetrieveRsp->compressed && (compLen < rawLen)) {
13,282,353!
982
      int32_t t = tsDecompressString(pStart, compLen, 1, pDataInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
×
983
      QUERY_CHECK_CONDITION((t == rawLen), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
984
      pStart = pDataInfo->decompBuf;
×
985
    }
986

987
    code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart);
13,282,353✔
988
    if (code != 0) {
13,281,477!
989
      taosMemoryFreeClear(pDataInfo->pRsp);
×
990
      goto _end;
×
991
    }
992

993
    void* tmp = taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
13,281,477✔
994
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
13,281,513!
995
    pb = NULL;
13,281,513✔
996
  }
997

998
_end:
3,634,592✔
999
  if (code != TSDB_CODE_SUCCESS) {
3,634,592!
1000
    blockDataDestroy(pb);
×
1001
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1002
  }
1003
  return code;
3,634,692✔
1004
}
1005

1006
int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
2,239✔
1007
  SExchangeInfo* pExchangeInfo = pOperator->info;
2,239✔
1008
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
2,239✔
1009

1010
  int32_t code = 0;
2,239✔
1011
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);
2,239✔
1012
  int64_t startTs = taosGetTimestampUs();
2,239✔
1013

1014
  while (1) {
771✔
1015
    if (pExchangeInfo->current >= totalSources) {
3,010✔
1016
      setAllSourcesCompleted(pOperator);
845✔
1017
      return TSDB_CODE_SUCCESS;
845✔
1018
    }
1019

1020
    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
2,165✔
1021
    if (!pDataInfo) {
2,165!
1022
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1023
      pTaskInfo->code = terrno;
×
1024
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1025
    }
1026
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
2,165✔
1027

1028
    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
2,165✔
1029
    if (code != TSDB_CODE_SUCCESS) {
2,165!
1030
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1031
      pTaskInfo->code = code;
×
1032
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1033
    }
1034

1035
    code = exchangeWait(pOperator, pExchangeInfo);
2,165✔
1036
    if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
2,165!
1037
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1038
    }
1039

1040
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
2,165✔
1041
    if (!pSource) {
2,165!
1042
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1043
      pTaskInfo->code = terrno;
×
1044
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1045
    }
1046

1047
    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
2,165!
1048
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
×
1049
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1050
             tstrerror(pDataInfo->code));
1051
      pOperator->pTaskInfo->code = pDataInfo->code;
×
1052
      return pOperator->pTaskInfo->code;
×
1053
    }
1054

1055
    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
2,165✔
1056
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
2,165✔
1057

1058
    if (pRsp->numOfRows == 0) {
2,165✔
1059
      qDebug("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
771✔
1060
             " execId:%d %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next",
1061
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1062
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);
1063

1064
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
771✔
1065
      if (pDataInfo->isVtbRefScan) {
771!
1066
        pExchangeInfo->current = totalSources;
×
1067
      } else {
1068
        pExchangeInfo->current += 1;
771✔
1069
      }
1070
      taosMemoryFreeClear(pDataInfo->pRsp);
771!
1071
      continue;
771✔
1072
    }
1073

1074
    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
1,394✔
1075
    if (code != TSDB_CODE_SUCCESS) {
1,394!
1076
      goto _error;
×
1077
    }
1078

1079
    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
1,394✔
1080
    if (pRsp->completed == 1) {
1,394✔
1081
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
1,181✔
1082
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
1083
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1084
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
1085
             pExchangeInfo->current + 1, totalSources);
1086

1087
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
1,181✔
1088
      if (pDataInfo->isVtbRefScan) {
1,181!
1089
        pExchangeInfo->current = totalSources;
×
1090
      } else {
1091
        pExchangeInfo->current += 1;
1,181✔
1092
      }
1093
    } else {
1094
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
213!
1095
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
1096
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1097
             pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
1098
    }
1099
    if (pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData) {
1,394!
1100
      taosArrayClear(pExchangeInfo->pSourceDataInfo);
×
1101
    }
1102
    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
1,394✔
1103
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;
1,394✔
1104

1105
    taosMemoryFreeClear(pDataInfo->pRsp);
1,394!
1106
    return TSDB_CODE_SUCCESS;
1,394✔
1107
  }
1108

1109
_error:
×
1110
  pTaskInfo->code = code;
×
1111
  return code;
×
1112
}
1113

1114
void clearVtbScanDataInfo(void* pItem) {
×
1115
  SSourceDataInfo *pInfo = (SSourceDataInfo *)pItem;
×
1116
  if (pInfo->colMap) {
×
1117
    taosArrayDestroy(pInfo->colMap->colMap);
×
1118
    taosMemoryFreeClear(pInfo->colMap);
×
1119
  }
1120
  taosArrayDestroy(pInfo->pSrcUidList);
×
1121
}
×
1122

1123
int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasicParam* pBasicParam) {
994✔
1124
  SExchangeInfo*     pExchangeInfo = pOperator->info;
994✔
1125
  SExchangeSrcIndex* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
994✔
1126
  if (NULL == pIdx) {
994!
1127
    qError("No exchange source for vgId: %d", pBasicParam->vgId);
×
1128
    return TSDB_CODE_INVALID_PARA;
×
1129
  }
1130

1131
  if (pBasicParam->isVtbRefScan) {
994!
1132
    SSourceDataInfo dataInfo = {0};
×
1133
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
×
1134
    dataInfo.taskId = pExchangeInfo->pTaskId;
×
1135
    dataInfo.index = pIdx->srcIdx;
×
1136
    dataInfo.window = pBasicParam->window;
×
1137
    dataInfo.colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
×
1138
    dataInfo.colMap->vgId = pBasicParam->colMap->vgId;
×
1139
    tstrncpy(dataInfo.colMap->tbName, pBasicParam->colMap->tbName, TSDB_TABLE_FNAME_LEN);
×
1140
    dataInfo.colMap->colMap = taosArrayDup(pBasicParam->colMap->colMap, NULL);
×
1141

1142
    dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
×
1143
    if (dataInfo.pSrcUidList == NULL) {
×
1144
      return terrno;
×
1145
    }
1146

1147
    dataInfo.isVtbRefScan = pBasicParam->isVtbRefScan;
×
1148
    dataInfo.srcOpType = pBasicParam->srcOpType;
×
1149
    dataInfo.tableSeq = pBasicParam->tableSeq;
×
1150

1151
    taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
×
1152
    void* tmp = taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo);
×
1153
    if (!tmp) {
×
1154
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1155
      return terrno;
×
1156
    }
1157
  } else {
1158
    if (pIdx->inUseIdx < 0) {
994✔
1159
      SSourceDataInfo dataInfo = {0};
970✔
1160
      dataInfo.status = EX_SOURCE_DATA_NOT_READY;
970✔
1161
      dataInfo.taskId = pExchangeInfo->pTaskId;
970✔
1162
      dataInfo.index = pIdx->srcIdx;
970✔
1163
      if (pBasicParam->isVtbRefScan) {
970!
1164
        dataInfo.window = pBasicParam->window;
×
1165
        dataInfo.colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
×
1166
        dataInfo.colMap->vgId = pBasicParam->colMap->vgId;
×
1167
        tstrncpy(dataInfo.colMap->tbName, pBasicParam->colMap->tbName, TSDB_TABLE_FNAME_LEN);
×
1168
        dataInfo.colMap->colMap = taosArrayDup(pBasicParam->colMap->colMap, NULL);
×
1169
      }
1170

1171
      dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
970✔
1172
      if (dataInfo.pSrcUidList == NULL) {
970!
1173
        return terrno;
×
1174
      }
1175

1176
      dataInfo.isVtbRefScan = pBasicParam->isVtbRefScan;
970✔
1177
      dataInfo.srcOpType = pBasicParam->srcOpType;
970✔
1178
      dataInfo.tableSeq = pBasicParam->tableSeq;
970✔
1179

1180
      void* tmp = taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo);
970✔
1181
      if (!tmp) {
970!
1182
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1183
        return terrno;
×
1184
      }
1185
      pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
970✔
1186
    } else {
1187
      SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
24✔
1188
      if (!pDataInfo) {
24!
1189
        return terrno;
×
1190
      }
1191
      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
24!
1192
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
24✔
1193
      }
1194

1195
      if (pBasicParam->isVtbRefScan) {
24!
1196
        pDataInfo->window = pBasicParam->window;
×
1197
        if (!pDataInfo->colMap) {
×
1198
          pDataInfo->colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
×
1199
        }
1200
        pDataInfo->colMap->vgId = pBasicParam->colMap->vgId;
×
1201
        tstrncpy(pDataInfo->colMap->tbName, pBasicParam->colMap->tbName, TSDB_TABLE_FNAME_LEN);
×
1202
        pDataInfo->colMap->colMap = taosArrayDup(pBasicParam->colMap->colMap, NULL);
×
1203
      }
1204

1205
      pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
24✔
1206
      if (pDataInfo->pSrcUidList == NULL) {
24!
1207
        return terrno;
×
1208
      }
1209

1210
      pDataInfo->isVtbRefScan = pBasicParam->isVtbRefScan;
24✔
1211

1212
      pDataInfo->srcOpType = pBasicParam->srcOpType;
24✔
1213
      pDataInfo->tableSeq = pBasicParam->tableSeq;
24✔
1214
    }
1215
  }
1216

1217
  return TSDB_CODE_SUCCESS;
994✔
1218
}
1219

1220
int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
597✔
1221
  SExchangeInfo*               pExchangeInfo = pOperator->info;
597✔
1222
  int32_t                      code = TSDB_CODE_SUCCESS;
597✔
1223
  SExchangeOperatorBasicParam* pBasicParam = NULL;
597✔
1224
  SExchangeOperatorParam*      pParam = (SExchangeOperatorParam*)pOperator->pOperatorGetParam->value;
597✔
1225
  if (pParam->multiParams) {
597✔
1226
    SExchangeOperatorBatchParam* pBatch = (SExchangeOperatorBatchParam*)pOperator->pOperatorGetParam->value;
549✔
1227
    int32_t                      iter = 0;
549✔
1228
    while (NULL != (pBasicParam = tSimpleHashIterate(pBatch->pBatchs, pBasicParam, &iter))) {
1,495✔
1229
      code = addSingleExchangeSource(pOperator, pBasicParam);
946✔
1230
      if (code) {
946!
1231
        return code;
×
1232
      }
1233
    }
1234
  } else {
1235
    pBasicParam = &pParam->basic;
48✔
1236
    code = addSingleExchangeSource(pOperator, pBasicParam);
48✔
1237
  }
1238

1239
  freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
597✔
1240
  pOperator->pOperatorGetParam = NULL;
597✔
1241

1242
  return code;
597✔
1243
}
1244

1245
int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
19,733,565✔
1246
  SExchangeInfo* pExchangeInfo = pOperator->info;
19,733,565✔
1247
  int32_t        code = TSDB_CODE_SUCCESS;
19,733,565✔
1248
  int32_t        lino = 0;
19,733,565✔
1249
  if ((OPTR_IS_OPENED(pOperator) && !pExchangeInfo->dynamicOp) ||
19,733,565✔
1250
      (pExchangeInfo->dynamicOp && NULL == pOperator->pOperatorGetParam)) {
3,762,416✔
1251
    return TSDB_CODE_SUCCESS;
15,973,689✔
1252
  }
1253

1254
  if (pExchangeInfo->dynamicOp) {
3,759,876✔
1255
    code = addDynamicExchangeSource(pOperator);
597✔
1256
    QUERY_CHECK_CODE(code, lino, _end);
597!
1257
  }
1258

1259
  if (pOperator->status == OP_NOT_OPENED && pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData) {
3,759,876!
1260
    pExchangeInfo->current = 0;
×
1261
  }
1262

1263
  int64_t st = taosGetTimestampUs();
3,760,336✔
1264

1265
  if (!pExchangeInfo->seqLoadData) {
3,760,336✔
1266
    code = prepareConcurrentlyLoad(pOperator);
3,759,441✔
1267
    QUERY_CHECK_CODE(code, lino, _end);
3,759,513!
1268
    pExchangeInfo->openedTs = taosGetTimestampUs();
3,759,514✔
1269
  }
1270

1271
  OPTR_SET_OPENED(pOperator);
3,760,409✔
1272
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
3,760,385✔
1273

1274
_end:
3,760,385✔
1275
  if (code != TSDB_CODE_SUCCESS) {
3,760,385!
1276
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1277
    pOperator->pTaskInfo->code = code;
×
1278
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
1279
  }
1280
  return TSDB_CODE_SUCCESS;
3,760,385✔
1281
}
1282

1283
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
22,039✔
1284
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
22,039✔
1285

1286
  if (pLimitInfo->remainGroupOffset > 0) {
22,039!
1287
    if (pLimitInfo->currentGroupId == 0) {  // it is the first group
×
1288
      pLimitInfo->currentGroupId = pBlock->info.id.groupId;
×
1289
      blockDataCleanup(pBlock);
×
1290
      return PROJECT_RETRIEVE_CONTINUE;
×
1291
    } else if (pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
×
1292
      // now it is the data from a new group
1293
      pLimitInfo->remainGroupOffset -= 1;
×
1294

1295
      // ignore data block in current group
1296
      if (pLimitInfo->remainGroupOffset > 0) {
×
1297
        blockDataCleanup(pBlock);
×
1298
        return PROJECT_RETRIEVE_CONTINUE;
×
1299
      }
1300
    }
1301

1302
    // set current group id of the project operator
1303
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
×
1304
  }
1305

1306
  // here check for a new group data, we need to handle the data of the previous group.
1307
  if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
22,039✔
1308
    pLimitInfo->numOfOutputGroups += 1;
839✔
1309
    if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
839!
1310
      pOperator->status = OP_EXEC_DONE;
×
1311
      blockDataCleanup(pBlock);
×
1312

1313
      return PROJECT_RETRIEVE_DONE;
×
1314
    }
1315

1316
    // reset the value for a new group data
1317
    resetLimitInfoForNextGroup(pLimitInfo);
839✔
1318
    // existing rows that belongs to previous group.
1319
    if (pBlock->info.rows > 0) {
839!
1320
      return PROJECT_RETRIEVE_DONE;
839✔
1321
    }
1322
  }
1323

1324
  // here we reach the start position, according to the limit/offset requirements.
1325

1326
  // set current group id
1327
  pLimitInfo->currentGroupId = pBlock->info.id.groupId;
21,200✔
1328

1329
  bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pTaskInfo);
21,200✔
1330
  if (pBlock->info.rows == 0) {
21,200✔
1331
    return PROJECT_RETRIEVE_CONTINUE;
8,462✔
1332
  } else {
1333
    if (limitReached && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
12,738!
1334
      setOperatorCompleted(pOperator);
×
1335
      return PROJECT_RETRIEVE_DONE;
×
1336
    }
1337
  }
1338

1339
  // todo optimize performance
1340
  // If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the
1341
  // they may not belong to the same group the limit/offset value is not valid in this case.
1342
  if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || hasSlimitOffsetInfo(pLimitInfo)) {
12,738!
1343
    return PROJECT_RETRIEVE_DONE;
12,738✔
1344
  } else {  // not full enough, continue to accumulate the output data in the buffer.
1345
    return PROJECT_RETRIEVE_CONTINUE;
×
1346
  }
1347
}
1348

1349
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo) {
5,683,913✔
1350
  SExecTaskInfo* pTask = pOperator->pTaskInfo;
5,683,913✔
1351
  int32_t        code = TSDB_CODE_SUCCESS;
5,683,913✔
1352
  if (pTask->pWorkerCb) {
5,683,913!
1353
    code = pTask->pWorkerCb->beforeBlocking(pTask->pWorkerCb->pPool);
5,684,013✔
1354
    if (code != TSDB_CODE_SUCCESS) {
5,684,943!
1355
      pTask->code = code;
×
1356
      return pTask->code;
×
1357
    }
1358
  }
1359

1360
  code = tsem_wait(&pExchangeInfo->ready);
5,684,843✔
1361
  if (code != TSDB_CODE_SUCCESS) {
5,684,794!
1362
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1363
    pTask->code = code;
×
1364
    return pTask->code;
×
1365
  }
1366

1367
  if (pTask->pWorkerCb) {
5,684,846!
1368
    code = pTask->pWorkerCb->afterRecoverFromBlocking(pTask->pWorkerCb->pPool);
5,684,859✔
1369
    if (code != TSDB_CODE_SUCCESS) {
5,684,979!
1370
      pTask->code = code;
×
1371
      return pTask->code;
×
1372
    }
1373
  }
1374
  return TSDB_CODE_SUCCESS;
5,684,966✔
1375
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc