• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4473

08 Jul 2025 09:38AM UTC coverage: 62.922% (+0.7%) from 62.22%
#4473

push

travis-ci

web-flow
Merge pull request #31712 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

158525 of 321496 branches covered (49.31%)

Branch coverage included in aggregate %.

56 of 60 new or added lines in 13 files covered. (93.33%)

1333 existing lines in 67 files now uncovered.

245526 of 320647 relevant lines covered (76.57%)

17689640.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.61
/source/libs/executor/src/exchangeoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "operator.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "tdatablock.h"
23
#include "thash.h"
24
#include "tmsg.h"
25
#include "tref.h"
26
#include "trpc.h"
27

28
typedef struct SFetchRspHandleWrapper {
29
  uint32_t exchangeId;
30
  int32_t  sourceIndex;
31
} SFetchRspHandleWrapper;
32

33
typedef struct SSourceDataInfo {
34
  int32_t            index;
35
  SRetrieveTableRsp* pRsp;
36
  uint64_t           totalRows;
37
  int64_t            startTime;
38
  int32_t            code;
39
  EX_SOURCE_STATUS   status;
40
  const char*        taskId;
41
  SArray*            pSrcUidList;
42
  int32_t            srcOpType;
43
  bool               tableSeq;
44
  char*              decompBuf;
45
  int32_t            decompBufSize;
46
  SOrgTbInfo*     colMap;
47
  bool               isVtbRefScan;
48
  STimeWindow        window;
49
} SSourceDataInfo;
50

51
static void destroyExchangeOperatorInfo(void* param);
52
static void freeBlock(void* pParam);
53
static void freeSourceDataInfo(void* param);
54
static void setAllSourcesCompleted(SOperatorInfo* pOperator);
55

56
static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code);
57
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex);
58
static int32_t getCompletedSources(const SArray* pArray, int32_t* pRes);
59
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator);
60
static int32_t seqLoadRemoteData(SOperatorInfo* pOperator);
61
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator);
62
static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock,
63
                                 bool holdDataInBuf);
64
static int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo);
65

66
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo);
67

68
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
7,171,661✔
69
                                           SExecTaskInfo* pTaskInfo) {
70
  int32_t code = 0;
7,171,661✔
71
  int32_t lino = 0;
7,171,661✔
72
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
7,171,661✔
73
  int32_t completed = 0;
7,170,743✔
74
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
7,170,743✔
75
  if (code != TSDB_CODE_SUCCESS) {
7,171,227!
76
    pTaskInfo->code = code;
×
77
    T_LONG_JMP(pTaskInfo->env, code);
×
78
  }
79
  if (completed == totalSources) {
7,171,227✔
80
    setAllSourcesCompleted(pOperator);
1,817,706✔
81
    return;
7,173,483✔
82
  }
83

84
  SSourceDataInfo* pDataInfo = NULL;
5,353,521✔
85

86
  while (1) {
314,487✔
87
    qDebug("prepare wait for ready, %p, %s", pExchangeInfo, GET_TASKID(pTaskInfo));
5,668,008✔
88
    code = exchangeWait(pOperator, pExchangeInfo);
5,668,011✔
89

90
    if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
5,670,737!
91
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
29!
92
    }
93

94
    for (int32_t i = 0; i < totalSources; ++i) {
31,871,527✔
95
      pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
31,838,839✔
96
      QUERY_CHECK_NULL(pDataInfo, code, lino, _error, terrno);
31,814,847!
97
      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
31,864,615✔
98
        continue;
24,706,332✔
99
      }
100

101
      if (pDataInfo->status != EX_SOURCE_DATA_READY) {
7,158,283✔
102
        continue;
1,494,549✔
103
      }
104

105
      if (pDataInfo->code != TSDB_CODE_SUCCESS) {
5,663,734✔
106
        code = pDataInfo->code;
15✔
107
        goto _error;
15✔
108
      }
109

110
      tmemory_barrier();
5,663,719✔
111
      SRetrieveTableRsp*     pRsp = pDataInfo->pRsp;
5,663,719✔
112
      SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
5,663,719✔
113
      QUERY_CHECK_NULL(pSource, code, lino, _error, terrno);
5,668,503!
114

115
      // todo
116
      SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
5,669,073✔
117
      if (pRsp->numOfRows == 0) {
5,669,073✔
118
        if (NULL != pDataInfo->pSrcUidList && (!pDataInfo->isVtbRefScan)) {
2,114,593!
119
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
×
120
          code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
×
121
          if (code != TSDB_CODE_SUCCESS) {
×
122
            taosMemoryFreeClear(pDataInfo->pRsp);
×
123
            goto _error;
×
124
          }
125
        } else {
126
          pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
2,114,593✔
127
          qDebug("%s vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
2,114,593✔
128
                 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", try next %d/%" PRIzu,
129
                 GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
130
                 pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows, i + 1, totalSources);
131
          taosMemoryFreeClear(pDataInfo->pRsp);
2,114,585!
132
        }
133
        break;
2,114,747✔
134
      }
135

136
      code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
3,554,480✔
137
      if (code != TSDB_CODE_SUCCESS) {
3,554,928!
138
        goto _error;
×
139
      }
140

141
      SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
3,554,928✔
142
      updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pDataInfo->startTime, pOperator);
3,554,928✔
143
      pDataInfo->totalRows += pRetrieveRsp->numOfRows;
3,555,257✔
144

145
      if (pRsp->completed == 1) {
3,555,257✔
146
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
3,253,015✔
147
        qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
3,253,015✔
148
               " execId:%d index:%d completed, blocks:%d, numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64
149
               ", totalRows:%" PRIu64 ", total:%.2f Kb, try next %d/%" PRIzu,
150
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
151
               pRsp->numOfBlocks, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows,
152
               pLoadInfo->totalSize / 1024.0, i + 1, totalSources);
153
      } else {
154
        qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
302,242✔
155
               " execId:%d blocks:%d, numOfRows:%" PRId64 ", totalRows:%" PRIu64 ", total:%.2f Kb",
156
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
157
               pRsp->numOfBlocks, pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
158
      }
159

160
      taosMemoryFreeClear(pDataInfo->pRsp);
3,555,219!
161

162
      if ((pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED || NULL != pDataInfo->pSrcUidList) && !pDataInfo->isVtbRefScan) {
3,555,629!
163
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
302,253✔
164
        code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
302,253✔
165
        if (code != TSDB_CODE_SUCCESS) {
302,252!
166
          taosMemoryFreeClear(pDataInfo->pRsp);
×
167
          goto _error;
×
168
        }
169
      }
170
      return;
5,355,681✔
171
    }  // end loop
172

173
    int32_t complete1 = 0;
2,147,435✔
174
    code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &complete1);
2,147,435✔
175
    if (code != TSDB_CODE_SUCCESS) {
2,114,484!
UNCOV
176
      pTaskInfo->code = code;
×
UNCOV
177
      T_LONG_JMP(pTaskInfo->env, code);
×
178
    }
179
    if (complete1 == totalSources) {
2,114,537✔
180
      qDebug("all sources are completed, %s", GET_TASKID(pTaskInfo));
1,800,050✔
181
      return;
1,800,053✔
182
    }
183
  }
184

185
_error:
15✔
186
  pTaskInfo->code = code;
15✔
187
}
188

189
static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
18,674,001✔
190
  int32_t        code = TSDB_CODE_SUCCESS;
18,674,001✔
191
  SExchangeInfo* pExchangeInfo = pOperator->info;
18,674,001✔
192
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
18,674,001✔
193

194
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
18,674,001✔
195

196
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
18,671,891✔
197
  if (pOperator->status == OP_EXEC_DONE) {
18,671,891!
198
    qDebug("%s all %" PRIzu " source(s) are exhausted, total rows:%" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
×
199
           GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize,
200
           pLoadInfo->totalElapsed / 1000.0);
201
    return NULL;
×
202
  }
203

204
  // we have buffered retrieved datablock, return it directly
205
  SSDataBlock* p = NULL;
18,671,891✔
206
  if (taosArrayGetSize(pExchangeInfo->pResultBlockList) > 0) {
18,671,891✔
207
    p = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
11,498,442✔
208
    taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
11,498,183✔
209
  }
210

211
  if (p != NULL) {
18,672,579✔
212
    void* tmp = taosArrayPush(pExchangeInfo->pRecycledBlocks, &p);
11,498,422✔
213
    if (!tmp) {
11,498,581!
214
      code = terrno;
×
215
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
216
      pTaskInfo->code = code;
×
217
      T_LONG_JMP(pTaskInfo->env, code);
×
218
    }
219
    return p;
11,498,581✔
220
  } else {
221
    if (pExchangeInfo->seqLoadData) {
7,174,157✔
222
      code = seqLoadRemoteData(pOperator);
2,452✔
223
      if (code != TSDB_CODE_SUCCESS) {
2,452!
224
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
225
        pTaskInfo->code = code;
×
226
        T_LONG_JMP(pTaskInfo->env, code);
×
227
      }
228
    } else {
229
      concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
7,171,705✔
230
    }
231
    if (TSDB_CODE_SUCCESS != pOperator->pTaskInfo->code) {
7,175,180✔
232
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(pOperator->pTaskInfo->code));
15!
233
      T_LONG_JMP(pTaskInfo->env, pOperator->pTaskInfo->code);
15!
234
    }
235
    if (taosArrayGetSize(pExchangeInfo->pResultBlockList) == 0) {
7,175,165✔
236
      return NULL;
3,618,390✔
237
    } else {
238
      p = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
3,555,562✔
239
      taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
3,555,064✔
240
      void* tmp = taosArrayPush(pExchangeInfo->pRecycledBlocks, &p);
3,556,018✔
241
      if (!tmp) {
3,556,557!
242
        code = terrno;
×
243
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
244
        pTaskInfo->code = code;
×
245
        T_LONG_JMP(pTaskInfo->env, code);
×
246
      }
247
      return p;
3,556,557✔
248
    }
249
  }
250
}
251

252
static int32_t loadRemoteDataNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
18,664,556✔
253
  int32_t        code = TSDB_CODE_SUCCESS;
18,664,556✔
254
  int32_t        lino = 0;
18,664,556✔
255
  SExchangeInfo* pExchangeInfo = pOperator->info;
18,664,556✔
256
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
18,664,556✔
257

258
  code = pOperator->fpSet._openFn(pOperator);
18,664,556✔
259
  QUERY_CHECK_CODE(code, lino, _end);
18,664,635!
260

261
  if (pOperator->status == OP_EXEC_DONE) {
18,664,635✔
262
    (*ppRes) = NULL;
63✔
263
    return code;
63✔
264
  }
265

266
  while (1) {
9,546✔
267
    SSDataBlock* pBlock = doLoadRemoteDataImpl(pOperator);
18,674,118✔
268
    if (pBlock == NULL) {
18,671,580✔
269
      (*ppRes) = NULL;
3,618,503✔
270
      return code;
3,618,503✔
271
    }
272

273
    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
15,053,077✔
274
    QUERY_CHECK_CODE(code, lino, _end);
15,054,755!
275

276
    if (blockDataGetNumOfRows(pBlock) == 0) {
15,054,755✔
277
      continue;
4✔
278
    }
279

280
    SLimitInfo* pLimitInfo = &pExchangeInfo->limitInfo;
15,054,782✔
281
    if (hasLimitOffsetInfo(pLimitInfo)) {
15,054,782✔
282
      int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock, false);
24,550✔
283
      if (status == PROJECT_RETRIEVE_CONTINUE) {
24,530✔
284
        continue;
9,542✔
285
      } else if (status == PROJECT_RETRIEVE_DONE) {
14,988!
286
        if (pBlock->info.rows == 0) {
14,988!
287
          setOperatorCompleted(pOperator);
×
288
          (*ppRes) = NULL;
×
289
          return code;
×
290
        } else {
291
          (*ppRes) = pBlock;
14,988✔
292
          return code;
14,988✔
293
        }
294
      }
295
    } else {
296
      (*ppRes) = pBlock;
15,030,280✔
297
      return code;
15,030,280✔
298
    }
299
  }
300

301
_end:
×
302
  if (code != TSDB_CODE_SUCCESS) {
×
303
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
304
    pTaskInfo->code = code;
×
305
    T_LONG_JMP(pTaskInfo->env, code);
×
306
  }
307
  (*ppRes) = NULL;
×
308
  return code;
×
309
}
310

311
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
3,697,567✔
312
  pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
3,697,567✔
313
  if (pInfo->pSourceDataInfo == NULL) {
3,697,332!
314
    return terrno;
×
315
  }
316

317
  if (pInfo->dynamicOp) {
3,697,356✔
318
    return TSDB_CODE_SUCCESS;
22,191✔
319
  }
320

321
  int32_t len = strlen(id) + 1;
3,675,165✔
322
  pInfo->pTaskId = taosMemoryCalloc(1, len);
3,675,165!
323
  if (!pInfo->pTaskId) {
3,675,411!
324
    return terrno;
×
325
  }
326
  tstrncpy(pInfo->pTaskId, id, len);
3,675,411✔
327
  for (int32_t i = 0; i < numOfSources; ++i) {
9,046,003✔
328
    SSourceDataInfo dataInfo = {0};
5,370,546✔
329
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
5,370,546✔
330
    dataInfo.taskId = pInfo->pTaskId;
5,370,546✔
331
    dataInfo.index = i;
5,370,546✔
332
    SSourceDataInfo* pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
5,370,546✔
333
    if (pDs == NULL) {
5,370,592!
334
      taosArrayDestroyEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
×
335
      return terrno;
×
336
    }
337
  }
338

339
  return TSDB_CODE_SUCCESS;
3,675,457✔
340
}
341

342
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo, const char* id) {
3,697,475✔
343
  size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);
3,697,475!
344

345
  if (numOfSources == 0) {
3,697,475!
346
    qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
×
347
    return TSDB_CODE_INVALID_PARA;
×
348
  }
349
  pInfo->pFetchRpcHandles = taosArrayInit(numOfSources, sizeof(int64_t));
3,697,475✔
350
  if (!pInfo->pFetchRpcHandles) {
3,697,656!
351
    return terrno;
×
352
  }
353
  void* ret = taosArrayReserve(pInfo->pFetchRpcHandles, numOfSources);
3,697,656✔
354
  if (!ret) {
3,697,553!
355
    return terrno;
×
356
  }
357

358
  pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
3,697,553✔
359
  if (pInfo->pSources == NULL) {
3,697,315!
360
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
361
    return terrno;
×
362
  }
363

364
  if (pExNode->node.dynamicOp) {
3,697,315✔
365
    pInfo->pHashSources = tSimpleHashInit(numOfSources * 2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
22,191✔
366
    if (NULL == pInfo->pHashSources) {
22,184!
367
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
368
      return terrno;
×
369
    }
370
  }
371

372
  for (int32_t i = 0; i < numOfSources; ++i) {
9,112,531✔
373
    SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
5,414,990✔
374
    if (!pNode) {
5,414,984!
375
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
376
      return terrno;
×
377
    }
378
    void* tmp = taosArrayPush(pInfo->pSources, pNode);
5,414,984✔
379
    if (!tmp) {
5,415,140!
380
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
381
      return terrno;
×
382
    }
383
    SExchangeSrcIndex idx = {.srcIdx = i, .inUseIdx = -1};
5,415,140✔
384
    int32_t           code =
385
        tSimpleHashPut(pInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
5,415,140✔
386
    if (pInfo->pHashSources && code != TSDB_CODE_SUCCESS) {
5,415,223!
387
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
388
      return code;
×
389
    }
390
  }
391

392
  initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);
3,697,541✔
393
  int64_t refId = taosAddRef(exchangeObjRefPool, pInfo);
3,697,569✔
394
  if (refId < 0) {
3,697,598!
395
    int32_t code = terrno;
×
396
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
397
    return code;
×
398
  } else {
399
    pInfo->self = refId;
3,697,598✔
400
  }
401

402
  return initDataSource(numOfSources, pInfo, id);
3,697,598✔
403
}
404

405
int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo,
3,697,521✔
406
                                   SOperatorInfo** pOptrInfo) {
407
  QRY_PARAM_CHECK(pOptrInfo);
3,697,521!
408

409
  int32_t        code = 0;
3,697,521✔
410
  int32_t        lino = 0;
3,697,521✔
411
  SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
3,697,521!
412
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
3,697,466!
413
  if (pInfo == NULL || pOperator == NULL) {
3,697,455!
414
    code = terrno;
×
415
    goto _error;
×
416
  }
417

418
  pInfo->dynamicOp = pExNode->node.dynamicOp;
3,697,467✔
419
  code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo));
3,697,467✔
420
  QUERY_CHECK_CODE(code, lino, _error);
3,697,417!
421

422
  code = tsem_init(&pInfo->ready, 0, 0);
3,697,417✔
423
  QUERY_CHECK_CODE(code, lino, _error);
3,697,511!
424

425
  pInfo->pDummyBlock = createDataBlockFromDescNode(pExNode->node.pOutputDataBlockDesc);
3,697,511✔
426
  QUERY_CHECK_NULL(pInfo->pDummyBlock, code, lino, _error, terrno);
3,697,638!
427

428
  pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES);
3,697,638✔
429
  QUERY_CHECK_NULL(pInfo->pResultBlockList, code, lino, _error, terrno);
3,697,471!
430
  pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES);
3,697,471✔
431
  QUERY_CHECK_NULL(pInfo->pRecycledBlocks, code, lino, _error, terrno);
3,697,386!
432

433
  SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self};
3,697,386✔
434
  code = qAppendTaskStopInfo(pTaskInfo, &stopInfo);
3,697,386✔
435
  QUERY_CHECK_CODE(code, lino, _error);
3,697,676!
436

437
  pInfo->seqLoadData = pExNode->seqRecvData;
3,697,676✔
438
  pInfo->pTransporter = pTransporter;
3,697,676✔
439

440
  setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo,
3,697,676✔
441
                  pTaskInfo);
442
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
3,697,587✔
443

444
  code = filterInitFromNode((SNode*)pExNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
3,697,362✔
445
  QUERY_CHECK_CODE(code, lino, _error);
3,697,356!
446

447
  pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteDataNext, NULL, destroyExchangeOperatorInfo,
3,697,356✔
448
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
449
  *pOptrInfo = pOperator;
3,697,485✔
450
  return TSDB_CODE_SUCCESS;
3,697,485✔
451

452
_error:
×
453
  if (code != TSDB_CODE_SUCCESS) {
×
454
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
455
    pTaskInfo->code = code;
×
456
  }
457
  if (pInfo != NULL) {
×
458
    doDestroyExchangeOperatorInfo(pInfo);
×
459
  }
460

461
  if (pOperator != NULL) {
×
462
    pOperator->info = NULL;
×
463
    destroyOperator(pOperator);
×
464
  }
465
  return code;
×
466
}
467

468
void destroyExchangeOperatorInfo(void* param) {
3,697,630✔
469
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
3,697,630✔
470
  int32_t        code = taosRemoveRef(exchangeObjRefPool, pExInfo->self);
3,697,630✔
471
  if (code != TSDB_CODE_SUCCESS) {
3,697,735!
472
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
473
  }
474
}
3,697,735✔
475

476
void freeBlock(void* pParam) {
7,185,909✔
477
  SSDataBlock* pBlock = *(SSDataBlock**)pParam;
7,185,909✔
478
  blockDataDestroy(pBlock);
7,185,909✔
479
}
7,185,970✔
480

481
void freeSourceDataInfo(void* p) {
5,372,566✔
482
  SSourceDataInfo* pInfo = (SSourceDataInfo*)p;
5,372,566✔
483
  taosMemoryFreeClear(pInfo->decompBuf);
5,372,566!
484
  taosMemoryFreeClear(pInfo->pRsp);
5,372,566!
485

486
  pInfo->decompBufSize = 0;
5,372,566✔
487
}
5,372,566✔
488

489
void doDestroyExchangeOperatorInfo(void* param) {
3,697,667✔
490
  if (param == NULL) {
3,697,667!
491
    return;
×
492
  }
493
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
3,697,667✔
494
  if (pExInfo->pFetchRpcHandles) {
3,697,667!
495
    for (int32_t i = 0; i < pExInfo->pFetchRpcHandles->size; ++i) {
9,110,675✔
496
      int64_t* pRpcHandle = taosArrayGet(pExInfo->pFetchRpcHandles, i);
5,413,518✔
497
      if (*pRpcHandle > 0) {
5,412,933✔
498
        SDownstreamSourceNode* pSource = taosArrayGet(pExInfo->pSources, i);
6,912✔
499
        (void)asyncFreeConnById(pExInfo->pTransporter, *pRpcHandle);
6,912✔
500
      }
501
    }
502
    taosArrayDestroy(pExInfo->pFetchRpcHandles);
3,697,157✔
503
  }
504

505
  taosArrayDestroy(pExInfo->pSources);
3,697,554✔
506
  taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo);
3,697,681✔
507

508
  taosArrayDestroyEx(pExInfo->pResultBlockList, freeBlock);
3,697,625✔
509
  taosArrayDestroyEx(pExInfo->pRecycledBlocks, freeBlock);
3,697,676✔
510

511
  blockDataDestroy(pExInfo->pDummyBlock);
3,697,719✔
512
  tSimpleHashCleanup(pExInfo->pHashSources);
3,697,726✔
513

514
  int32_t code = tsem_destroy(&pExInfo->ready);
3,697,691✔
515
  if (code != TSDB_CODE_SUCCESS) {
3,697,645!
516
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
517
  }
518
  taosMemoryFreeClear(pExInfo->pTaskId);
3,697,645!
519

520
  taosMemoryFreeClear(param);
3,697,680!
521
}
522

523
int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
5,670,573✔
524
  SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;
5,670,573✔
525

526
  taosMemoryFreeClear(pMsg->pEpSet);
5,670,573!
527
  SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
5,670,573✔
528
  if (pExchangeInfo == NULL) {
5,673,080✔
529
    qWarn("failed to acquire exchange operator, since it may have been released, %p", pExchangeInfo);
107!
530
    taosMemoryFree(pMsg->pData);
107!
531
    return TSDB_CODE_SUCCESS;
107✔
532
  }
533

534
  int32_t          index = pWrapper->sourceIndex;
5,672,973✔
535
  SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);
5,672,973✔
536

537
  int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, index);
5,649,049✔
538
  if (pRpcHandle != NULL) {
5,643,306✔
539
    int32_t ret = asyncFreeConnById(pExchangeInfo->pTransporter, *pRpcHandle);
5,643,172✔
540
    if (ret != 0) {
5,671,624✔
541
      qDebug("failed to free rpc handle, code:%s, %p", tstrerror(ret), pExchangeInfo);
22,500✔
542
    }
543
    *pRpcHandle = -1;
5,671,642✔
544
  }
545

546
  if (!pSourceDataInfo) {
5,671,776!
547
    return terrno;
×
548
  }
549

550
  if (code == TSDB_CODE_SUCCESS) {
5,671,776✔
551
    pSourceDataInfo->pRsp = pMsg->pData;
5,670,174✔
552

553
    SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
5,670,174✔
554
    pRsp->numOfRows = htobe64(pRsp->numOfRows);
5,670,174✔
555
    pRsp->compLen = htonl(pRsp->compLen);
5,668,632✔
556
    pRsp->payloadLen = htonl(pRsp->payloadLen);
5,668,632✔
557
    pRsp->numOfCols = htonl(pRsp->numOfCols);
5,668,632✔
558
    pRsp->useconds = htobe64(pRsp->useconds);
5,668,632✔
559
    pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);
5,671,528✔
560

561
    qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%" PRId64 ", %p", pSourceDataInfo->taskId, index,
5,671,528✔
562
           pRsp->numOfBlocks, pRsp->numOfRows, pExchangeInfo);
563
  } else {
564
    taosMemoryFree(pMsg->pData);
1,602!
565
    pSourceDataInfo->code = rpcCvtErrCode(code);
17✔
566
    if (pSourceDataInfo->code != code) {
17!
567
      qError("%s fetch rsp received, index:%d, error:%s, cvted error: %s, %p", pSourceDataInfo->taskId, index,
×
568
             tstrerror(code), tstrerror(pSourceDataInfo->code), pExchangeInfo);
569
    } else {
570
      qError("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code),
17!
571
             pExchangeInfo);
572
    }
573
  }
574

575
  tmemory_barrier();
5,671,550✔
576
  pSourceDataInfo->status = EX_SOURCE_DATA_READY;
5,671,550✔
577
  code = tsem_post(&pExchangeInfo->ready);
5,671,550✔
578
  if (code != TSDB_CODE_SUCCESS) {
5,672,044!
579
    qError("failed to invoke post when fetch rsp is ready, code:%s, %p", tstrerror(code), pExchangeInfo);
×
580
    return code;
×
581
  }
582

583
  code = taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
5,672,044✔
584
  if (code != TSDB_CODE_SUCCESS) {
5,672,956!
585
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
586
  }
587
  return code;
5,672,698✔
588
}
589

590
int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, bool tableSeq) {
1,381✔
591
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
1,381!
592
  if (NULL == *ppRes) {
1,381!
593
    return terrno;
×
594
  }
595

596
  STableScanOperatorParam* pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
1,381!
597
  if (NULL == pScan) {
1,381!
598
    taosMemoryFreeClear(*ppRes);
×
599
    return terrno;
×
600
  }
601

602
  pScan->pUidList = taosArrayDup(pUidList, NULL);
1,381✔
603
  if (NULL == pScan->pUidList) {
1,381!
604
    taosMemoryFree(pScan);
×
605
    taosMemoryFreeClear(*ppRes);
×
606
    return terrno;
×
607
  }
608
  pScan->tableSeq = tableSeq;
1,381✔
609
  pScan->pOrgTbInfo = NULL;
1,381✔
610
  pScan->window.skey = INT64_MAX;
1,381✔
611
  pScan->window.ekey = INT64_MIN;
1,381✔
612

613
  (*ppRes)->opType = srcOpType;
1,381✔
614
  (*ppRes)->downstreamIdx = 0;
1,381✔
615
  (*ppRes)->value = pScan;
1,381✔
616
  (*ppRes)->pChildren = NULL;
1,381✔
617
  (*ppRes)->reUse = false;
1,381✔
618

619
  return TSDB_CODE_SUCCESS;
1,381✔
620
}
621

622
int32_t buildTableScanOperatorParamEx(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, SOrgTbInfo *pMap, bool tableSeq, STimeWindow *window) {
228✔
623
  int32_t                  code = TSDB_CODE_SUCCESS;
228✔
624
  int32_t                  lino = 0;
228✔
625
  STableScanOperatorParam* pScan = NULL;
228✔
626

627
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
228!
628
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
228!
629

630
  pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
228!
631
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);
228!
632

633
  pScan->pUidList = taosArrayDup(pUidList, NULL);
228✔
634
  QUERY_CHECK_NULL(pScan->pUidList, code, lino, _return, terrno);
228!
635

636
  pScan->pOrgTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
228!
637
  QUERY_CHECK_NULL(pScan->pOrgTbInfo, code, lino, _return, terrno);
228!
638

639
  pScan->pOrgTbInfo->vgId = pMap->vgId;
228✔
640
  tstrncpy(pScan->pOrgTbInfo->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);
228✔
641

642
  pScan->pOrgTbInfo->colMap = taosArrayDup(pMap->colMap, NULL);
228✔
643
  QUERY_CHECK_NULL(pScan->pOrgTbInfo->colMap, code, lino, _return, terrno);
228!
644

645
  pScan->tableSeq = tableSeq;
228✔
646
  pScan->window.skey = window->skey;
228✔
647
  pScan->window.ekey = window->ekey;
228✔
648

649
  (*ppRes)->opType = srcOpType;
228✔
650
  (*ppRes)->downstreamIdx = 0;
228✔
651
  (*ppRes)->value = pScan;
228✔
652
  (*ppRes)->pChildren = NULL;
228✔
653
  (*ppRes)->reUse = false;
228✔
654

655
  return code;
228✔
656
_return:
×
657
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
658
  taosMemoryFreeClear(*ppRes);
×
659
  if (pScan) {
×
660
    taosArrayDestroy(pScan->pUidList);
×
661
    if (pScan->pOrgTbInfo) {
×
662
      taosArrayDestroy(pScan->pOrgTbInfo->colMap);
×
663
      taosMemoryFreeClear(pScan->pOrgTbInfo);
×
664
    }
665
    taosMemoryFree(pScan);
×
666
  }
667
  return code;
×
668
}
669

670
int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
5,672,437✔
671
  int32_t          code = TSDB_CODE_SUCCESS;
5,672,437✔
672
  int32_t          lino = 0;
5,672,437✔
673
  SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
5,672,437✔
674
  if (!pDataInfo) {
5,670,627!
675
    return terrno;
×
676
  }
677

678
  if (EX_SOURCE_DATA_NOT_READY != pDataInfo->status) {
5,670,781!
679
    return TSDB_CODE_SUCCESS;
×
680
  }
681

682
  pDataInfo->status = EX_SOURCE_DATA_STARTED;
5,670,781✔
683
  SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
5,670,781✔
684
  if (!pSource) {
5,669,855!
685
    return terrno;
×
686
  }
687

688
  pDataInfo->startTime = taosGetTimestampUs();
5,672,898✔
689
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
5,672,898✔
690

691
  SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
5,673,404!
692
  QUERY_CHECK_NULL(pWrapper, code, lino, _end, terrno);
5,674,864!
693
  pWrapper->exchangeId = pExchangeInfo->self;
5,674,864✔
694
  pWrapper->sourceIndex = sourceIndex;
5,674,864✔
695

696
  if (pSource->localExec) {
5,674,864✔
697
    SDataBuf pBuf = {0};
16,199✔
698
    int32_t  code = (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->sId, pTaskInfo->id.queryId,
16,199✔
699
                                               pSource->clientId, pSource->taskId, 0, pSource->execId, &pBuf.pData,
700
                                               pTaskInfo->localFetch.explainRes);
701
    code = loadRemoteDataCallback(pWrapper, &pBuf, code);
16,199✔
702
    taosMemoryFree(pWrapper);
16,199!
703
    QUERY_CHECK_CODE(code, lino, _end);
16,199!
704
  } else {
705
    SResFetchReq req = {0};
5,658,665✔
706
    req.header.vgId = pSource->addr.nodeId;
5,658,665✔
707
    req.sId = pSource->sId;
5,658,665✔
708
    req.clientId = pSource->clientId;
5,658,665✔
709
    req.taskId = pSource->taskId;
5,658,665✔
710
    req.queryId = pTaskInfo->id.queryId;
5,658,665✔
711
    req.execId = pSource->execId;
5,658,665✔
712
    if (pDataInfo->isVtbRefScan) {
5,658,665✔
713
      code = buildTableScanOperatorParamEx(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->colMap, pDataInfo->tableSeq, &pDataInfo->window);
228✔
714
      taosArrayDestroy(pDataInfo->colMap->colMap);
228✔
715
      taosMemoryFreeClear(pDataInfo->colMap);
228!
716
      taosArrayDestroy(pDataInfo->pSrcUidList);
228✔
717
      pDataInfo->pSrcUidList = NULL;
228✔
718
      if (TSDB_CODE_SUCCESS != code) {
228!
719
        pTaskInfo->code = code;
×
720
        taosMemoryFree(pWrapper);
×
721
        return pTaskInfo->code;
×
722
      }
723
    } else {
724
      if (pDataInfo->pSrcUidList) {
5,658,437✔
725
        code = buildTableScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->tableSeq);
1,303✔
726
        taosArrayDestroy(pDataInfo->pSrcUidList);
1,303✔
727
        pDataInfo->pSrcUidList = NULL;
1,303✔
728
        if (TSDB_CODE_SUCCESS != code) {
1,303!
729
          pTaskInfo->code = code;
×
730
          taosMemoryFree(pWrapper);
×
731
          return pTaskInfo->code;
×
732
        }
733
      }
734
    }
735

736
    int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req);
5,658,665✔
737
    if (msgSize < 0) {
5,658,338!
738
      pTaskInfo->code = msgSize;
×
739
      taosMemoryFree(pWrapper);
×
740
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
741
      return pTaskInfo->code;
×
742
    }
743

744
    void* msg = taosMemoryCalloc(1, msgSize);
5,658,338!
745
    if (NULL == msg) {
5,658,543!
746
      pTaskInfo->code = terrno;
×
747
      taosMemoryFree(pWrapper);
×
748
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
749
      return pTaskInfo->code;
×
750
    }
751

752
    msgSize = tSerializeSResFetchReq(msg, msgSize, &req);
5,658,543✔
753
    if (msgSize < 0) {
5,658,056!
754
      pTaskInfo->code = msgSize;
×
755
      taosMemoryFree(pWrapper);
×
756
      taosMemoryFree(msg);
×
757
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
758
      return pTaskInfo->code;
×
759
    }
760

761
    freeOperatorParam(req.pOpParam, OP_GET_PARAM);
5,658,056✔
762

763
    qDebug("%s build fetch msg and send to vgId:%d, ep:%s, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
5,658,042✔
764
           ", execId:%d, %p, %d/%" PRIzu,
765
           GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->clientId,
766
           pSource->taskId, pSource->execId, pExchangeInfo, sourceIndex, totalSources);
767

768
    // send the fetch remote task result reques
769
    SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
5,658,050!
770
    if (NULL == pMsgSendInfo) {
5,658,215!
771
      taosMemoryFreeClear(msg);
×
772
      taosMemoryFree(pWrapper);
×
773
      qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
×
774
      pTaskInfo->code = terrno;
×
775
      return pTaskInfo->code;
×
776
    }
777

778
    pMsgSendInfo->param = pWrapper;
5,658,215✔
779
    pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
5,658,215✔
780
    pMsgSendInfo->msgInfo.pData = msg;
5,658,215✔
781
    pMsgSendInfo->msgInfo.len = msgSize;
5,658,215✔
782
    pMsgSendInfo->msgType = pSource->fetchMsgType;
5,658,215✔
783
    pMsgSendInfo->fp = loadRemoteDataCallback;
5,658,215✔
784

785
    int64_t transporterId = 0;
5,658,215✔
786
    void* poolHandle = NULL;
5,658,215✔
787
    code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
5,658,215✔
788
    QUERY_CHECK_CODE(code, lino, _end);
5,658,774!
789
    int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, sourceIndex);
5,658,774✔
790
    *pRpcHandle = transporterId;
5,656,042✔
791
  }
792

793
_end:
5,672,241✔
794
  if (code != TSDB_CODE_SUCCESS) {
5,672,241!
795
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
796
  }
797
  return code;
5,672,230✔
798
}
799

800
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs,
3,755,292✔
801
                          SOperatorInfo* pOperator) {
802
  pInfo->totalRows += numOfRows;
3,755,292✔
803
  pInfo->totalSize += dataLen;
3,755,292✔
804
  pInfo->totalElapsed += (taosGetTimestampUs() - startTs);
3,755,626✔
805
  pOperator->resultInfo.totalRows += numOfRows;
3,755,626✔
806
}
3,755,626✔
807

808
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart) {
15,365,452✔
809
  int32_t      code = TSDB_CODE_SUCCESS;
15,365,452✔
810
  int32_t      lino = 0;
15,365,452✔
811
  SSDataBlock* pBlock = NULL;
15,365,452✔
812
  if (pColList == NULL) {  // data from other sources
15,365,452✔
813
    blockDataCleanup(pRes);
15,167,400✔
814
    code = blockDecode(pRes, pData, (const char**)pNextStart);
15,166,497✔
815
    if (code) {
15,163,604!
816
      return code;
×
817
    }
818
  } else {  // extract data according to pColList
819
    char* pStart = pData;
198,052✔
820

821
    int32_t numOfCols = htonl(*(int32_t*)pStart);
198,052✔
822
    pStart += sizeof(int32_t);
198,052✔
823

824
    // todo refactor:extract method
825
    SSysTableSchema* pSchema = (SSysTableSchema*)pStart;
198,052✔
826
    for (int32_t i = 0; i < numOfCols; ++i) {
3,478,362✔
827
      SSysTableSchema* p = (SSysTableSchema*)pStart;
3,280,310✔
828

829
      p->colId = htons(p->colId);
3,280,310✔
830
      p->bytes = htonl(p->bytes);
3,280,310✔
831
      pStart += sizeof(SSysTableSchema);
3,280,310✔
832
    }
833

834
    pBlock = NULL;
198,052✔
835
    code = createDataBlock(&pBlock);
198,052✔
836
    QUERY_CHECK_CODE(code, lino, _end);
198,725!
837

838
    for (int32_t i = 0; i < numOfCols; ++i) {
3,474,557✔
839
      SColumnInfoData idata = createColumnInfoData(pSchema[i].type, pSchema[i].bytes, pSchema[i].colId);
3,275,792✔
840
      code = blockDataAppendColInfo(pBlock, &idata);
3,281,657✔
841
      QUERY_CHECK_CODE(code, lino, _end);
3,275,832!
842
    }
843

844
    const char* pDummy = NULL;
198,765✔
845
    code = blockDecode(pBlock, pStart, &pDummy);
198,765✔
846
    QUERY_CHECK_CODE(code, lino, _end);
198,673!
847

848
    code = blockDataEnsureCapacity(pRes, pBlock->info.rows);
198,673✔
849
    QUERY_CHECK_CODE(code, lino, _end);
198,728!
850

851
    // data from mnode
852
    pRes->info.dataLoad = 1;
198,728✔
853
    pRes->info.rows = pBlock->info.rows;
198,728✔
854
    pRes->info.scanFlag = MAIN_SCAN;
198,728✔
855
    code = relocateColumnData(pRes, pColList, pBlock->pDataBlock, false);
198,728✔
856
    QUERY_CHECK_CODE(code, lino, _end);
198,685!
857

858
    blockDataDestroy(pBlock);
198,685✔
859
    pBlock = NULL;
198,764✔
860
  }
861

862
_end:
15,362,368✔
863
  if (code != TSDB_CODE_SUCCESS) {
15,362,368!
864
    blockDataDestroy(pBlock);
×
865
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
866
  }
867
  return code;
15,362,288✔
868
}
869

870
void setAllSourcesCompleted(SOperatorInfo* pOperator) {
1,818,548✔
871
  SExchangeInfo* pExchangeInfo = pOperator->info;
1,818,548✔
872
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
1,818,548✔
873

874
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
1,818,548✔
875
  size_t               totalSources = taosArrayGetSize(pExchangeInfo->pSources);
1,818,548✔
876
  qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 ", %.2f Kb, elapsed:%.2f ms",
1,818,563✔
877
         GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
878
         pLoadInfo->totalElapsed / 1000.0);
879

880
  setOperatorCompleted(pOperator);
1,818,563✔
881
}
1,818,637✔
882

883
int32_t getCompletedSources(const SArray* pArray, int32_t* pRes) {
9,285,272✔
884
  int32_t code = TSDB_CODE_SUCCESS;
9,285,272✔
885
  int32_t lino = 0;
9,285,272✔
886
  size_t  total = taosArrayGetSize(pArray);
9,285,272✔
887

888
  int32_t completed = 0;
9,283,746✔
889
  for (int32_t k = 0; k < total; ++k) {
71,894,554✔
890
    SSourceDataInfo* p = taosArrayGet(pArray, k);
62,622,525✔
891
    QUERY_CHECK_NULL(p, code, lino, _end, terrno);
62,583,193!
892
    if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
62,610,808✔
893
      completed += 1;
31,374,969✔
894
    }
895
  }
896

897
  *pRes = completed;
9,272,029✔
898
_end:
9,272,029✔
899
  if (code != TSDB_CODE_SUCCESS) {
9,272,029!
900
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
901
  }
902
  return code;
9,283,059✔
903
}
904

905
int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
3,674,661✔
906
  SExchangeInfo* pExchangeInfo = pOperator->info;
3,674,661✔
907
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
3,674,661✔
908

909
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
3,674,661✔
910
  int64_t startTs = taosGetTimestampUs();
3,674,651✔
911

912
  // Asynchronously send all fetch requests to all sources.
913
  for (int32_t i = 0; i < totalSources; ++i) {
9,042,284✔
914
    int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
5,367,857✔
915
    if (code != TSDB_CODE_SUCCESS) {
5,367,633!
916
      pTaskInfo->code = code;
×
917
      return code;
×
918
    }
919
  }
920

921
  int64_t endTs = taosGetTimestampUs();
3,674,708✔
922
  qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTaskInfo),
3,674,708✔
923
         totalSources, (endTs - startTs) / 1000.0);
924

925
  pOperator->status = OP_RES_TO_RETURN;
3,674,707✔
926
  pOperator->cost.openCost = taosGetTimestampUs() - startTs;
3,674,740✔
927
  if (isTaskKilled(pTaskInfo)) {
3,674,740!
928
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
929
  }
930

931
  return TSDB_CODE_SUCCESS;
3,674,859✔
932
}
933

934
int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo) {
3,556,065✔
935
  int32_t            code = TSDB_CODE_SUCCESS;
3,556,065✔
936
  int32_t            lino = 0;
3,556,065✔
937
  SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
3,556,065✔
938
  SSDataBlock*       pb = NULL;
3,556,065✔
939

940
  char* pNextStart = pRetrieveRsp->data;
3,556,065✔
941
  char* pStart = pNextStart;
3,556,065✔
942

943
  int32_t index = 0;
3,556,065✔
944

945
  if (pRetrieveRsp->compressed) {  // decompress the data
3,556,065!
946
    if (pDataInfo->decompBuf == NULL) {
×
947
      pDataInfo->decompBuf = taosMemoryMalloc(pRetrieveRsp->payloadLen);
×
948
      QUERY_CHECK_NULL(pDataInfo->decompBuf, code, lino, _end, terrno);
×
949
      pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
×
950
    } else {
951
      if (pDataInfo->decompBufSize < pRetrieveRsp->payloadLen) {
×
952
        char* p = taosMemoryRealloc(pDataInfo->decompBuf, pRetrieveRsp->payloadLen);
×
953
        QUERY_CHECK_NULL(p, code, lino, _end, terrno);
×
954
        if (p != NULL) {
×
955
          pDataInfo->decompBuf = p;
×
956
          pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
×
957
        }
958
      }
959
    }
960
  }
961

962
  while (index++ < pRetrieveRsp->numOfBlocks) {
18,722,452✔
963
    pStart = pNextStart;
15,165,862✔
964

965
    if (taosArrayGetSize(pExchangeInfo->pRecycledBlocks) > 0) {
15,165,862✔
966
      pb = *(SSDataBlock**)taosArrayPop(pExchangeInfo->pRecycledBlocks);
7,981,406✔
967
      blockDataCleanup(pb);
7,981,378✔
968
    } else {
969
      code = createOneDataBlock(pExchangeInfo->pDummyBlock, false, &pb);
7,184,575✔
970
      QUERY_CHECK_NULL(pb, code, lino, _end, code);
7,185,551!
971
    }
972

973
    int32_t compLen = *(int32_t*)pStart;
15,166,486✔
974
    pStart += sizeof(int32_t);
15,166,486✔
975

976
    int32_t rawLen = *(int32_t*)pStart;
15,166,486✔
977
    pStart += sizeof(int32_t);
15,166,486✔
978
    QUERY_CHECK_CONDITION((compLen <= rawLen && compLen != 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
15,166,486!
979

980
    pNextStart = pStart + compLen;
15,166,486✔
981
    if (pRetrieveRsp->compressed && (compLen < rawLen)) {
15,166,486!
982
      int32_t t = tsDecompressString(pStart, compLen, 1, pDataInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
×
983
      QUERY_CHECK_CONDITION((t == rawLen), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
984
      pStart = pDataInfo->decompBuf;
×
985
    }
986

987
    code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart);
15,166,486✔
988
    if (code != 0) {
15,164,168!
989
      taosMemoryFreeClear(pDataInfo->pRsp);
×
990
      goto _end;
×
991
    }
992

993
    void* tmp = taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
15,164,168✔
994
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
15,166,387!
995
    pb = NULL;
15,166,387✔
996
  }
997

998
_end:
3,556,590✔
999
  if (code != TSDB_CODE_SUCCESS) {
3,556,590!
1000
    blockDataDestroy(pb);
×
1001
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1002
  }
1003
  return code;
3,556,569✔
1004
}
1005

1006
int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
2,452✔
1007
  SExchangeInfo* pExchangeInfo = pOperator->info;
2,452✔
1008
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
2,452✔
1009

1010
  int32_t code = 0;
2,452✔
1011
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);
2,452✔
1012
  int64_t startTs = taosGetTimestampUs();
2,452✔
1013

1014
  while (1) {
727✔
1015
    if (pExchangeInfo->current >= totalSources) {
3,179✔
1016
      setAllSourcesCompleted(pOperator);
837✔
1017
      return TSDB_CODE_SUCCESS;
837✔
1018
    }
1019

1020
    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
2,342✔
1021
    if (!pDataInfo) {
2,342!
1022
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1023
      pTaskInfo->code = terrno;
×
1024
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1025
    }
1026
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
2,342✔
1027

1028
    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
2,342✔
1029
    if (code != TSDB_CODE_SUCCESS) {
2,342!
1030
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1031
      pTaskInfo->code = code;
×
1032
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1033
    }
1034

1035
    code = exchangeWait(pOperator, pExchangeInfo);
2,342✔
1036
    if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
2,342!
1037
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1038
    }
1039

1040
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
2,342✔
1041
    if (!pSource) {
2,342!
1042
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1043
      pTaskInfo->code = terrno;
×
1044
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1045
    }
1046

1047
    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
2,342!
1048
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
×
1049
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1050
             tstrerror(pDataInfo->code));
1051
      pOperator->pTaskInfo->code = pDataInfo->code;
×
1052
      return pOperator->pTaskInfo->code;
×
1053
    }
1054

1055
    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
2,342✔
1056
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
2,342✔
1057

1058
    if (pRsp->numOfRows == 0) {
2,342✔
1059
      qDebug("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
727✔
1060
             " execId:%d %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next",
1061
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1062
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);
1063

1064
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
727✔
1065
      if (pDataInfo->isVtbRefScan) {
727✔
1066
        pExchangeInfo->current = totalSources;
57✔
1067
      } else {
1068
        pExchangeInfo->current += 1;
670✔
1069
      }
1070
      taosMemoryFreeClear(pDataInfo->pRsp);
727!
1071
      continue;
727✔
1072
    }
1073

1074
    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
1,615✔
1075
    if (code != TSDB_CODE_SUCCESS) {
1,615!
1076
      goto _error;
×
1077
    }
1078

1079
    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
1,615✔
1080
    if (pRsp->completed == 1) {
1,615✔
1081
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
1,015✔
1082
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
1083
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1084
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
1085
             pExchangeInfo->current + 1, totalSources);
1086

1087
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
1,015✔
1088
      if (pDataInfo->isVtbRefScan) {
1,015!
1089
        pExchangeInfo->current = totalSources;
×
1090
      } else {
1091
        pExchangeInfo->current += 1;
1,015✔
1092
      }
1093
    } else {
1094
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
600✔
1095
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
1096
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1097
             pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
1098
    }
1099
    if (pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData) {
1,615!
1100
      taosArrayClear(pExchangeInfo->pSourceDataInfo);
171✔
1101
    }
1102
    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
1,615✔
1103
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;
1,615✔
1104

1105
    taosMemoryFreeClear(pDataInfo->pRsp);
1,615!
1106
    return TSDB_CODE_SUCCESS;
1,615✔
1107
  }
1108

1109
_error:
×
1110
  pTaskInfo->code = code;
×
1111
  return code;
×
1112
}
1113

1114
void clearVtbScanDataInfo(void* pItem) {
51✔
1115
  SSourceDataInfo *pInfo = (SSourceDataInfo *)pItem;
51✔
1116
  if (pInfo->colMap) {
51!
1117
    taosArrayDestroy(pInfo->colMap->colMap);
×
1118
    taosMemoryFreeClear(pInfo->colMap);
×
1119
  }
1120
  taosArrayDestroy(pInfo->pSrcUidList);
51✔
1121
}
51✔
1122

1123
int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasicParam* pBasicParam) {
1,531✔
1124
  SExchangeInfo*     pExchangeInfo = pOperator->info;
1,531✔
1125
  SExchangeSrcIndex* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
1,531✔
1126
  if (NULL == pIdx) {
1,531!
1127
    qError("No exchange source for vgId: %d", pBasicParam->vgId);
×
1128
    return TSDB_CODE_INVALID_PARA;
×
1129
  }
1130

1131
  if (pBasicParam->isVtbRefScan) {
1,531✔
1132
    SSourceDataInfo dataInfo = {0};
228✔
1133
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
228✔
1134
    dataInfo.taskId = pExchangeInfo->pTaskId;
228✔
1135
    dataInfo.index = pIdx->srcIdx;
228✔
1136
    dataInfo.window = pBasicParam->window;
228✔
1137
    dataInfo.colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
228!
1138
    dataInfo.colMap->vgId = pBasicParam->colMap->vgId;
228✔
1139
    tstrncpy(dataInfo.colMap->tbName, pBasicParam->colMap->tbName, TSDB_TABLE_FNAME_LEN);
228✔
1140
    dataInfo.colMap->colMap = taosArrayDup(pBasicParam->colMap->colMap, NULL);
228✔
1141

1142
    dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
228✔
1143
    if (dataInfo.pSrcUidList == NULL) {
228!
1144
      return terrno;
×
1145
    }
1146

1147
    dataInfo.isVtbRefScan = pBasicParam->isVtbRefScan;
228✔
1148
    dataInfo.srcOpType = pBasicParam->srcOpType;
228✔
1149
    dataInfo.tableSeq = pBasicParam->tableSeq;
228✔
1150

1151
    taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
228✔
1152
    void* tmp = taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo);
228✔
1153
    if (!tmp) {
228!
1154
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1155
      return terrno;
×
1156
    }
1157
  } else {
1158
    if (pIdx->inUseIdx < 0) {
1,303✔
1159
      SSourceDataInfo dataInfo = {0};
1,279✔
1160
      dataInfo.status = EX_SOURCE_DATA_NOT_READY;
1,279✔
1161
      dataInfo.taskId = pExchangeInfo->pTaskId;
1,279✔
1162
      dataInfo.index = pIdx->srcIdx;
1,279✔
1163
      if (pBasicParam->isVtbRefScan) {
1,279!
1164
        dataInfo.window = pBasicParam->window;
×
1165
        dataInfo.colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
×
1166
        dataInfo.colMap->vgId = pBasicParam->colMap->vgId;
×
1167
        tstrncpy(dataInfo.colMap->tbName, pBasicParam->colMap->tbName, TSDB_TABLE_FNAME_LEN);
×
1168
        dataInfo.colMap->colMap = taosArrayDup(pBasicParam->colMap->colMap, NULL);
×
1169
      }
1170

1171
      dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
1,279✔
1172
      if (dataInfo.pSrcUidList == NULL) {
1,279!
1173
        return terrno;
×
1174
      }
1175

1176
      dataInfo.isVtbRefScan = pBasicParam->isVtbRefScan;
1,279✔
1177
      dataInfo.srcOpType = pBasicParam->srcOpType;
1,279✔
1178
      dataInfo.tableSeq = pBasicParam->tableSeq;
1,279✔
1179

1180
      void* tmp = taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo);
1,279✔
1181
      if (!tmp) {
1,279!
1182
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1183
        return terrno;
×
1184
      }
1185
      pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
1,279✔
1186
    } else {
1187
      SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
24✔
1188
      if (!pDataInfo) {
24!
1189
        return terrno;
×
1190
      }
1191
      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
24!
1192
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
24✔
1193
      }
1194

1195
      if (pBasicParam->isVtbRefScan) {
24!
1196
        pDataInfo->window = pBasicParam->window;
×
1197
        if (!pDataInfo->colMap) {
×
1198
          pDataInfo->colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
×
1199
        }
1200
        pDataInfo->colMap->vgId = pBasicParam->colMap->vgId;
×
1201
        tstrncpy(pDataInfo->colMap->tbName, pBasicParam->colMap->tbName, TSDB_TABLE_FNAME_LEN);
×
1202
        pDataInfo->colMap->colMap = taosArrayDup(pBasicParam->colMap->colMap, NULL);
×
1203
      }
1204

1205
      pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
24✔
1206
      if (pDataInfo->pSrcUidList == NULL) {
24!
1207
        return terrno;
×
1208
      }
1209

1210
      pDataInfo->isVtbRefScan = pBasicParam->isVtbRefScan;
24✔
1211

1212
      pDataInfo->srcOpType = pBasicParam->srcOpType;
24✔
1213
      pDataInfo->tableSeq = pBasicParam->tableSeq;
24✔
1214
    }
1215
  }
1216

1217
  return TSDB_CODE_SUCCESS;
1,531✔
1218
}
1219

1220
int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
1,038✔
1221
  SExchangeInfo*               pExchangeInfo = pOperator->info;
1,038✔
1222
  int32_t                      code = TSDB_CODE_SUCCESS;
1,038✔
1223
  SExchangeOperatorBasicParam* pBasicParam = NULL;
1,038✔
1224
  SExchangeOperatorParam*      pParam = (SExchangeOperatorParam*)pOperator->pOperatorGetParam->value;
1,038✔
1225
  if (pParam->multiParams) {
1,038✔
1226
    SExchangeOperatorBatchParam* pBatch = (SExchangeOperatorBatchParam*)pOperator->pOperatorGetParam->value;
762✔
1227
    int32_t                      iter = 0;
762✔
1228
    while (NULL != (pBasicParam = tSimpleHashIterate(pBatch->pBatchs, pBasicParam, &iter))) {
2,017✔
1229
      code = addSingleExchangeSource(pOperator, pBasicParam);
1,255✔
1230
      if (code) {
1,255!
1231
        return code;
×
1232
      }
1233
    }
1234
  } else {
1235
    pBasicParam = &pParam->basic;
276✔
1236
    code = addSingleExchangeSource(pOperator, pBasicParam);
276✔
1237
  }
1238

1239
  freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
1,038✔
1240
  pOperator->pOperatorGetParam = NULL;
1,038✔
1241

1242
  return code;
1,038✔
1243
}
1244

1245
int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
21,436,014✔
1246
  SExchangeInfo* pExchangeInfo = pOperator->info;
21,436,014✔
1247
  int32_t        code = TSDB_CODE_SUCCESS;
21,436,014✔
1248
  int32_t        lino = 0;
21,436,014✔
1249
  if ((OPTR_IS_OPENED(pOperator) && !pExchangeInfo->dynamicOp) ||
21,436,014✔
1250
      (pExchangeInfo->dynamicOp && NULL == pOperator->pOperatorGetParam)) {
3,678,641✔
1251
    return TSDB_CODE_SUCCESS;
17,760,802✔
1252
  }
1253

1254
  if (pExchangeInfo->dynamicOp) {
3,675,212✔
1255
    code = addDynamicExchangeSource(pOperator);
1,038✔
1256
    QUERY_CHECK_CODE(code, lino, _end);
1,038!
1257
  }
1258

1259
  if (pOperator->status == OP_NOT_OPENED && pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData) {
3,675,212!
1260
    pExchangeInfo->current = 0;
228✔
1261
  }
1262

1263
  int64_t st = taosGetTimestampUs();
3,675,858✔
1264

1265
  if (!pExchangeInfo->seqLoadData) {
3,675,858✔
1266
    code = prepareConcurrentlyLoad(pOperator);
3,674,771✔
1267
    QUERY_CHECK_CODE(code, lino, _end);
3,674,830!
1268
    pExchangeInfo->openedTs = taosGetTimestampUs();
3,674,871✔
1269
  }
1270

1271
  OPTR_SET_OPENED(pOperator);
3,675,958✔
1272
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
3,675,932✔
1273

1274
_end:
3,675,932✔
1275
  if (code != TSDB_CODE_SUCCESS) {
3,675,932!
1276
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1277
    pOperator->pTaskInfo->code = code;
×
1278
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
1279
  }
1280
  return TSDB_CODE_SUCCESS;
3,675,932✔
1281
}
1282

1283
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
24,530✔
1284
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
24,530✔
1285

1286
  if (pLimitInfo->remainGroupOffset > 0) {
24,530!
1287
    if (pLimitInfo->currentGroupId == 0) {  // it is the first group
×
1288
      pLimitInfo->currentGroupId = pBlock->info.id.groupId;
×
1289
      blockDataCleanup(pBlock);
×
1290
      return PROJECT_RETRIEVE_CONTINUE;
×
1291
    } else if (pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
×
1292
      // now it is the data from a new group
1293
      pLimitInfo->remainGroupOffset -= 1;
×
1294

1295
      // ignore data block in current group
1296
      if (pLimitInfo->remainGroupOffset > 0) {
×
1297
        blockDataCleanup(pBlock);
×
1298
        return PROJECT_RETRIEVE_CONTINUE;
×
1299
      }
1300
    }
1301

1302
    // set current group id of the project operator
1303
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
×
1304
  }
1305

1306
  // here check for a new group data, we need to handle the data of the previous group.
1307
  if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
24,530✔
1308
    pLimitInfo->numOfOutputGroups += 1;
916✔
1309
    if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
916!
1310
      pOperator->status = OP_EXEC_DONE;
×
1311
      blockDataCleanup(pBlock);
×
1312

1313
      return PROJECT_RETRIEVE_DONE;
×
1314
    }
1315

1316
    // reset the value for a new group data
1317
    resetLimitInfoForNextGroup(pLimitInfo);
916✔
1318
    // existing rows that belongs to previous group.
1319
    if (pBlock->info.rows > 0) {
916!
1320
      return PROJECT_RETRIEVE_DONE;
916✔
1321
    }
1322
  }
1323

1324
  // here we reach the start position, according to the limit/offset requirements.
1325

1326
  // set current group id
1327
  pLimitInfo->currentGroupId = pBlock->info.id.groupId;
23,614✔
1328

1329
  bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pTaskInfo);
23,614✔
1330
  if (pBlock->info.rows == 0) {
23,614✔
1331
    return PROJECT_RETRIEVE_CONTINUE;
9,542✔
1332
  } else {
1333
    if (limitReached && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
14,072!
1334
      setOperatorCompleted(pOperator);
×
1335
      return PROJECT_RETRIEVE_DONE;
×
1336
    }
1337
  }
1338

1339
  // todo optimize performance
1340
  // If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the
1341
  // they may not belong to the same group the limit/offset value is not valid in this case.
1342
  if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || hasSlimitOffsetInfo(pLimitInfo)) {
14,072!
1343
    return PROJECT_RETRIEVE_DONE;
14,072✔
1344
  } else {  // not full enough, continue to accumulate the output data in the buffer.
1345
    return PROJECT_RETRIEVE_CONTINUE;
×
1346
  }
1347
}
1348

1349
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo) {
5,670,860✔
1350
  SExecTaskInfo* pTask = pOperator->pTaskInfo;
5,670,860✔
1351
  int32_t        code = TSDB_CODE_SUCCESS;
5,670,860✔
1352
  if (pTask->pWorkerCb) {
5,670,860!
1353
    code = pTask->pWorkerCb->beforeBlocking(pTask->pWorkerCb->pPool);
5,670,969✔
1354
    if (code != TSDB_CODE_SUCCESS) {
5,673,104!
1355
      pTask->code = code;
×
1356
      return pTask->code;
×
1357
    }
1358
  }
1359

1360
  code = tsem_wait(&pExchangeInfo->ready);
5,672,995✔
1361
  if (code != TSDB_CODE_SUCCESS) {
5,672,920!
1362
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1363
    pTask->code = code;
×
1364
    return pTask->code;
×
1365
  }
1366

1367
  if (pTask->pWorkerCb) {
5,673,007!
1368
    code = pTask->pWorkerCb->afterRecoverFromBlocking(pTask->pWorkerCb->pPool);
5,673,021✔
1369
    if (code != TSDB_CODE_SUCCESS) {
5,673,103!
1370
      pTask->code = code;
×
1371
      return pTask->code;
×
1372
    }
1373
  }
1374
  return TSDB_CODE_SUCCESS;
5,673,089✔
1375
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc