• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3522

07 Nov 2024 05:59AM UTC coverage: 58.216% (+1.3%) from 56.943%
#3522

push

travis-ci

web-flow
Merge pull request #28663 from taosdata/fix/3_liaohj

fix(stream): stop the underlying scan operations for stream

111884 of 248391 branches covered (45.04%)

Branch coverage included in aggregate %.

3 of 4 new or added lines in 1 file covered. (75.0%)

1164 existing lines in 134 files now uncovered.

191720 of 273118 relevant lines covered (70.2%)

13088725.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.27
/source/libs/executor/src/exchangeoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "operator.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "tdatablock.h"
23
#include "thash.h"
24
#include "tmsg.h"
25
#include "tref.h"
26
#include "trpc.h"
27

28
typedef struct SFetchRspHandleWrapper {
29
  uint32_t exchangeId;
30
  int32_t  sourceIndex;
31
} SFetchRspHandleWrapper;
32

33
typedef struct SSourceDataInfo {
34
  int32_t            index;
35
  SRetrieveTableRsp* pRsp;
36
  uint64_t           totalRows;
37
  int64_t            startTime;
38
  int32_t            code;
39
  EX_SOURCE_STATUS   status;
40
  const char*        taskId;
41
  SArray*            pSrcUidList;
42
  int32_t            srcOpType;
43
  bool               tableSeq;
44
  char*              decompBuf;
45
  int32_t            decompBufSize;
46
} SSourceDataInfo;
47

48
static void destroyExchangeOperatorInfo(void* param);
49
static void freeBlock(void* pParam);
50
static void freeSourceDataInfo(void* param);
51
static void setAllSourcesCompleted(SOperatorInfo* pOperator);
52

53
static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code);
54
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex);
55
static int32_t getCompletedSources(const SArray* pArray, int32_t* pRes);
56
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator);
57
static int32_t seqLoadRemoteData(SOperatorInfo* pOperator);
58
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator);
59
static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock,
60
                                 bool holdDataInBuf);
61
static int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo);
62

63
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo);
64

65
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
4,595,865✔
66
                                           SExecTaskInfo* pTaskInfo) {
67
  int32_t code = 0;
4,595,865✔
68
  int32_t lino = 0;
4,595,865✔
69
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
4,595,865✔
70
  int32_t completed = 0;
4,595,814✔
71
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
4,595,814✔
72
  if (code != TSDB_CODE_SUCCESS) {
4,595,847!
73
    pTaskInfo->code = code;
×
74
    T_LONG_JMP(pTaskInfo->env, code);
×
75
  }
76
  if (completed == totalSources) {
4,595,847✔
77
    setAllSourcesCompleted(pOperator);
1,499,105✔
78
    return;
4,595,984✔
79
  }
80

81
  SSourceDataInfo* pDataInfo = NULL;
3,096,742✔
82

83
  while (1) {
199,445✔
84
    qDebug("prepare wait for ready, %p, %s", pExchangeInfo, GET_TASKID(pTaskInfo));
3,296,187✔
85
    code = exchangeWait(pOperator, pExchangeInfo);
3,296,187✔
86

87
    if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
3,296,359!
88
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
4!
89
    }
90

91
    for (int32_t i = 0; i < totalSources; ++i) {
3,732,391✔
92
      pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
3,732,351✔
93
      QUERY_CHECK_NULL(pDataInfo, code, lino, _error, terrno);
3,732,293✔
94
      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
3,732,291✔
95
        continue;
364,012✔
96
      }
97

98
      if (pDataInfo->status != EX_SOURCE_DATA_READY) {
3,368,279✔
99
        continue;
72,042✔
100
      }
101

102
      if (pDataInfo->code != TSDB_CODE_SUCCESS) {
3,296,237!
103
        code = pDataInfo->code;
×
104
        goto _error;
×
105
      }
106

107
      tmemory_barrier();
3,296,237✔
108
      SRetrieveTableRsp*     pRsp = pDataInfo->pRsp;
3,296,237✔
109
      SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
3,296,237✔
110
      QUERY_CHECK_NULL(pSource, code, lino, _error, terrno);
3,296,269!
111

112
      // todo
113
      SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
3,296,308✔
114
      if (pRsp->numOfRows == 0) {
3,296,308✔
115
        if (NULL != pDataInfo->pSrcUidList) {
1,618,706!
116
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
×
117
          code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
×
118
          if (code != TSDB_CODE_SUCCESS) {
×
119
            taosMemoryFreeClear(pDataInfo->pRsp);
×
120
            goto _error;
×
121
          }
122
        } else {
123
          pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
1,618,706✔
124
          qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64
1,618,706✔
125
                 ", totalRows:%" PRIu64 ", try next %d/%" PRIzu,
126
                 GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows,
127
                 pExchangeInfo->loadInfo.totalRows, i + 1, totalSources);
128
          taosMemoryFreeClear(pDataInfo->pRsp);
1,618,704✔
129
        }
130
        break;
1,618,731✔
131
      }
132

133
      code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
1,677,602✔
134
      if (code != TSDB_CODE_SUCCESS) {
1,677,607!
135
        goto _error;
×
136
      }
137

138
      SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
1,677,607✔
139
      updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pDataInfo->startTime, pOperator);
1,677,607✔
140
      pDataInfo->totalRows += pRetrieveRsp->numOfRows;
1,677,609✔
141

142
      if (pRsp->completed == 1) {
1,677,609✔
143
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
1,662,288✔
144
        qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
1,662,288✔
145
               " execId:%d index:%d completed, blocks:%d, numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64
146
               ", totalRows:%" PRIu64 ", total:%.2f Kb, try next %d/%" PRIzu,
147
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks,
148
               pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, i + 1,
149
               totalSources);
150
      } else {
151
        qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d blocks:%d, numOfRows:%" PRId64
15,321✔
152
               ", totalRows:%" PRIu64 ", total:%.2f Kb",
153
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfBlocks,
154
               pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
155
      }
156

157
      taosMemoryFreeClear(pDataInfo->pRsp);
1,677,611✔
158

159
      if (pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED || NULL != pDataInfo->pSrcUidList) {
1,677,631✔
160
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
15,327✔
161
        code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
15,327✔
162
        if (code != TSDB_CODE_SUCCESS) {
15,318!
163
          taosMemoryFreeClear(pDataInfo->pRsp);
×
164
          goto _error;
×
165
        }
166
      }
167
      return;
3,096,867✔
168
    }  // end loop
169

170
    int32_t complete1 = 0;
1,618,771✔
171
    code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &complete1);
1,618,771✔
172
    if (code != TSDB_CODE_SUCCESS) {
1,618,681!
173
      pTaskInfo->code = code;
×
174
      T_LONG_JMP(pTaskInfo->env, code);
×
175
    }
176
    if (complete1 == totalSources) {
1,618,690✔
177
      qDebug("all sources are completed, %s", GET_TASKID(pTaskInfo));
1,419,245✔
178
      return;
1,419,245✔
179
    }
180
  }
181

182
_error:
×
183
  pTaskInfo->code = code;
×
184
}
185

186
static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
10,454,733✔
187
  int32_t        code = TSDB_CODE_SUCCESS;
10,454,733✔
188
  SExchangeInfo* pExchangeInfo = pOperator->info;
10,454,733✔
189
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
10,454,733✔
190

191
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
10,454,733✔
192

193
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
10,454,680✔
194
  if (pOperator->status == OP_EXEC_DONE) {
10,454,680!
195
    qDebug("%s all %" PRIzu " source(s) are exhausted, total rows:%" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
×
196
           GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize,
197
           pLoadInfo->totalElapsed / 1000.0);
198
    return NULL;
×
199
  }
200

201
  // we have buffered retrieved datablock, return it directly
202
  SSDataBlock* p = NULL;
10,454,680✔
203
  if (taosArrayGetSize(pExchangeInfo->pResultBlockList) > 0) {
10,454,680✔
204
    p = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
5,858,286✔
205
    taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
5,858,272✔
206
  }
207

208
  if (p != NULL) {
10,454,735✔
209
    void* tmp = taosArrayPush(pExchangeInfo->pRecycledBlocks, &p);
5,858,274✔
210
    if (!tmp) {
5,858,268!
211
      code = terrno;
×
212
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
213
      pTaskInfo->code = code;
×
214
      T_LONG_JMP(pTaskInfo->env, code);
×
215
    }
216
    return p;
5,858,268✔
217
  } else {
218
    if (pExchangeInfo->seqLoadData) {
4,596,461✔
219
      code = seqLoadRemoteData(pOperator);
584✔
220
      if (code != TSDB_CODE_SUCCESS) {
584!
221
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
222
        pTaskInfo->code = code;
×
223
        T_LONG_JMP(pTaskInfo->env, code);
×
224
      }
225
    } else {
226
      concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
4,595,877✔
227
    }
228
    if (TSDB_CODE_SUCCESS != pOperator->pTaskInfo->code) {
4,596,461!
229
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(pOperator->pTaskInfo->code));
×
230
      T_LONG_JMP(pTaskInfo->env, pOperator->pTaskInfo->code);
×
231
    }
232
    if (taosArrayGetSize(pExchangeInfo->pResultBlockList) == 0) {
4,596,461✔
233
      return NULL;
2,918,476✔
234
    } else {
235
      p = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
1,677,943✔
236
      taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
1,677,997✔
237
      void* tmp = taosArrayPush(pExchangeInfo->pRecycledBlocks, &p);
1,677,989✔
238
      if (!tmp) {
1,677,977!
239
        code = terrno;
×
240
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
241
        pTaskInfo->code = code;
×
242
        T_LONG_JMP(pTaskInfo->env, code);
×
243
      }
244
      return p;
1,677,977✔
245
    }
246
  }
247
}
248

249
static int32_t loadRemoteDataNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
10,453,332✔
250
  int32_t        code = TSDB_CODE_SUCCESS;
10,453,332✔
251
  int32_t        lino = 0;
10,453,332✔
252
  SExchangeInfo* pExchangeInfo = pOperator->info;
10,453,332✔
253
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
10,453,332✔
254

255
  code = pOperator->fpSet._openFn(pOperator);
10,453,332✔
256
  QUERY_CHECK_CODE(code, lino, _end);
10,453,312!
257

258
  if (pOperator->status == OP_EXEC_DONE) {
10,453,312✔
259
    (*ppRes) = NULL;
12✔
260
    return code;
12✔
261
  }
262

263
  while (1) {
1,494✔
264
    SSDataBlock* pBlock = doLoadRemoteDataImpl(pOperator);
10,454,794✔
265
    if (pBlock == NULL) {
10,454,398✔
266
      (*ppRes) = NULL;
2,918,481✔
267
      return code;
2,918,481✔
268
    }
269

270
    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
7,535,917✔
271
    QUERY_CHECK_CODE(code, lino, _end);
7,536,171!
272

273
    if (blockDataGetNumOfRows(pBlock) == 0) {
7,536,171✔
274
      continue;
2✔
275
    }
276

277
    SLimitInfo* pLimitInfo = &pExchangeInfo->limitInfo;
7,536,154✔
278
    if (hasLimitOffsetInfo(pLimitInfo)) {
7,536,154✔
279
      int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock, false);
3,864✔
280
      if (status == PROJECT_RETRIEVE_CONTINUE) {
3,857✔
281
        continue;
1,492✔
282
      } else if (status == PROJECT_RETRIEVE_DONE) {
2,365!
283
        if (pBlock->info.rows == 0) {
2,365!
284
          setOperatorCompleted(pOperator);
×
285
          (*ppRes) = NULL;
×
286
          return code;
×
287
        } else {
288
          (*ppRes) = pBlock;
2,365✔
289
          return code;
2,365✔
290
        }
291
      }
292
    } else {
293
      (*ppRes) = pBlock;
7,532,281✔
294
      return code;
7,532,281✔
295
    }
296
  }
297

298
_end:
×
299
  if (code != TSDB_CODE_SUCCESS) {
×
300
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
301
    pTaskInfo->code = code;
×
302
    T_LONG_JMP(pTaskInfo->env, code);
×
303
  }
304
  (*ppRes) = NULL;
×
305
  return code;
×
306
}
307

308
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
2,928,459✔
309
  pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
2,928,459✔
310
  if (pInfo->pSourceDataInfo == NULL) {
2,928,354✔
311
    return terrno;
2✔
312
  }
313

314
  if (pInfo->dynamicOp) {
2,928,352✔
315
    return TSDB_CODE_SUCCESS;
4,012✔
316
  }
317

318
  int32_t len = strlen(id) + 1;
2,924,340✔
319
  pInfo->pTaskId = taosMemoryCalloc(1, len);
2,924,340✔
320
  if (!pInfo->pTaskId) {
2,924,455✔
321
    return terrno;
7✔
322
  }
323
  tstrncpy(pInfo->pTaskId, id, len);
2,924,448✔
324
  for (int32_t i = 0; i < numOfSources; ++i) {
6,205,694✔
325
    SSourceDataInfo dataInfo = {0};
3,281,249✔
326
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
3,281,249✔
327
    dataInfo.taskId = pInfo->pTaskId;
3,281,249✔
328
    dataInfo.index = i;
3,281,249✔
329
    SSourceDataInfo* pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
3,281,249✔
330
    if (pDs == NULL) {
3,281,246!
331
      taosArrayDestroyEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
×
332
      return terrno;
×
333
    }
334
  }
335

336
  return TSDB_CODE_SUCCESS;
2,924,445✔
337
}
338

339
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo, const char* id) {
2,928,405✔
340
  size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);
2,928,405!
341

342
  if (numOfSources == 0) {
2,928,405!
343
    qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
×
344
    return TSDB_CODE_INVALID_PARA;
×
345
  }
346
  pInfo->pFetchRpcHandles = taosArrayInit(numOfSources, sizeof(int64_t));
2,928,405✔
347
  if (!pInfo->pFetchRpcHandles) {
2,928,460!
348
    return terrno;
×
349
  }
350
  void* ret = taosArrayReserve(pInfo->pFetchRpcHandles, numOfSources);
2,928,460✔
351
  if (!ret) {
2,928,428!
352
    return terrno;
×
353
  }
354

355
  pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
2,928,428✔
356
  if (pInfo->pSources == NULL) {
2,928,289!
357
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
×
358
    return terrno;
×
359
  }
360

361
  if (pExNode->node.dynamicOp) {
2,928,289✔
362
    pInfo->pHashSources = tSimpleHashInit(numOfSources * 2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
4,012✔
363
    if (NULL == pInfo->pHashSources) {
4,020!
364
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
×
365
      return TSDB_CODE_OUT_OF_MEMORY;
×
366
    }
367
  }
368

369
  for (int32_t i = 0; i < numOfSources; ++i) {
6,217,734✔
370
    SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
3,289,300✔
371
    if (!pNode) {
3,289,327!
372
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
373
      return TSDB_CODE_OUT_OF_MEMORY;
×
374
    }
375
    void* tmp = taosArrayPush(pInfo->pSources, pNode);
3,289,327✔
376
    if (!tmp) {
3,289,431!
377
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
378
      return terrno;
×
379
    }
380
    SExchangeSrcIndex idx = {.srcIdx = i, .inUseIdx = -1};
3,289,431✔
381
    int32_t           code =
382
        tSimpleHashPut(pInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
3,289,431✔
383
    if (pInfo->pHashSources && code != TSDB_CODE_SUCCESS) {
3,289,437!
384
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
385
      return code;
×
386
    }
387
  }
388

389
  initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);
2,928,434✔
390
  int64_t refId = taosAddRef(exchangeObjRefPool, pInfo);
2,928,418✔
391
  if (refId < 0) {
2,928,472!
392
    int32_t code = terrno;
×
393
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
394
    return code;
×
395
  } else {
396
    pInfo->self = refId;
2,928,472✔
397
  }
398

399
  return initDataSource(numOfSources, pInfo, id);
2,928,472✔
400
}
401

402
int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo,
2,928,454✔
403
                                   SOperatorInfo** pOptrInfo) {
404
  QRY_PARAM_CHECK(pOptrInfo);
2,928,454!
405

406
  int32_t        code = 0;
2,928,454✔
407
  int32_t        lino = 0;
2,928,454✔
408
  SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
2,928,454✔
409
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2,928,386✔
410
  if (pInfo == NULL || pOperator == NULL) {
2,928,396!
411
    code = terrno;
1✔
412
    goto _error;
×
413
  }
414

415
  pInfo->dynamicOp = pExNode->node.dynamicOp;
2,928,395✔
416
  code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo));
2,928,395✔
417
  QUERY_CHECK_CODE(code, lino, _error);
2,928,427!
418

419
  code = tsem_init(&pInfo->ready, 0, 0);
2,928,427✔
420
  QUERY_CHECK_CODE(code, lino, _error);
2,928,413!
421

422
  pInfo->pDummyBlock = createDataBlockFromDescNode(pExNode->node.pOutputDataBlockDesc);
2,928,413✔
423
  QUERY_CHECK_NULL(pInfo->pDummyBlock, code, lino, _error, terrno);
2,928,452!
424

425
  pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES);
2,928,452✔
426
  QUERY_CHECK_NULL(pInfo->pResultBlockList, code, lino, _error, terrno);
2,928,406!
427
  pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES);
2,928,406✔
428
  QUERY_CHECK_NULL(pInfo->pRecycledBlocks, code, lino, _error, terrno);
2,928,398!
429

430
  SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self};
2,928,398✔
431
  code = qAppendTaskStopInfo(pTaskInfo, &stopInfo);
2,928,398✔
432
  QUERY_CHECK_CODE(code, lino, _error);
2,928,479!
433

434
  pInfo->seqLoadData = pExNode->seqRecvData;
2,928,479✔
435
  pInfo->pTransporter = pTransporter;
2,928,479✔
436

437
  setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo,
2,928,479✔
438
                  pTaskInfo);
439
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
2,928,460✔
440

441
  code = filterInitFromNode((SNode*)pExNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
2,928,441✔
442
  QUERY_CHECK_CODE(code, lino, _error);
2,928,419!
443

444
  pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteDataNext, NULL, destroyExchangeOperatorInfo,
2,928,419✔
445
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
446
  *pOptrInfo = pOperator;
2,928,451✔
447
  return TSDB_CODE_SUCCESS;
2,928,451✔
448

449
_error:
×
450
  if (code != TSDB_CODE_SUCCESS) {
×
451
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
452
    pTaskInfo->code = code;
×
453
  }
454
  if (pInfo != NULL) {
×
455
    doDestroyExchangeOperatorInfo(pInfo);
×
456
  }
457

458
  if (pOperator != NULL) {
×
459
    pOperator->info = NULL;
×
460
    destroyOperator(pOperator);
×
461
  }
462
  return code;
×
463
}
464

465
void destroyExchangeOperatorInfo(void* param) {
2,928,425✔
466
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
2,928,425✔
467
  int32_t        code = taosRemoveRef(exchangeObjRefPool, pExInfo->self);
2,928,425✔
468
  if (code != TSDB_CODE_SUCCESS) {
2,928,474✔
469
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
124,898!
470
  }
471
}
2,928,474✔
472

473
void freeBlock(void* pParam) {
5,336,178✔
474
  SSDataBlock* pBlock = *(SSDataBlock**)pParam;
5,336,178✔
475
  blockDataDestroy(pBlock);
5,336,178✔
476
}
5,336,292✔
477

478
void freeSourceDataInfo(void* p) {
3,066,296✔
479
  SSourceDataInfo* pInfo = (SSourceDataInfo*)p;
3,066,296✔
480
  taosMemoryFreeClear(pInfo->decompBuf);
3,066,296!
481
  taosMemoryFreeClear(pInfo->pRsp);
3,066,296✔
482

483
  pInfo->decompBufSize = 0;
3,066,296✔
484
}
3,066,296✔
485

486
void doDestroyExchangeOperatorInfo(void* param) {
2,803,537✔
487
  if (param == NULL) {
2,803,537!
488
    return;
×
489
  }
490
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
2,803,537✔
491
  if (pExInfo->pFetchRpcHandles) {
2,803,537!
492
    for (int32_t i = 0; i < pExInfo->pFetchRpcHandles->size; ++i) {
5,869,974✔
493
      int64_t* pRpcHandle = taosArrayGet(pExInfo->pFetchRpcHandles, i);
3,066,508✔
494
      if (*pRpcHandle > 0) {
3,066,406✔
495
        SDownstreamSourceNode* pSource = taosArrayGet(pExInfo->pSources, i);
97✔
496
        (void)asyncFreeConnById(pExInfo->pTransporter, *pRpcHandle);
97✔
497
      }
498
    }
499
    taosArrayDestroy(pExInfo->pFetchRpcHandles);
2,803,466✔
500
  }
501

502
  taosArrayDestroy(pExInfo->pSources);
2,803,538✔
503
  taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo);
2,803,556✔
504

505
  taosArrayDestroyEx(pExInfo->pResultBlockList, freeBlock);
2,803,558✔
506
  taosArrayDestroyEx(pExInfo->pRecycledBlocks, freeBlock);
2,803,565✔
507

508
  blockDataDestroy(pExInfo->pDummyBlock);
2,803,572✔
509
  tSimpleHashCleanup(pExInfo->pHashSources);
2,803,563✔
510

511
  int32_t code = tsem_destroy(&pExInfo->ready);
2,803,551✔
512
  if (code != TSDB_CODE_SUCCESS) {
2,803,522!
513
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
514
  }
515
  taosMemoryFreeClear(pExInfo->pTaskId);
2,803,522✔
516

517
  taosMemoryFreeClear(param);
2,803,576✔
518
}
519

520
int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
3,295,570✔
521
  SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;
3,295,570✔
522

523
  taosMemoryFreeClear(pMsg->pEpSet);
3,295,570✔
524
  SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
3,295,570✔
525
  if (pExchangeInfo == NULL) {
3,296,812!
526
    qWarn("failed to acquire exchange operator, since it may have been released, %p", pExchangeInfo);
×
527
    taosMemoryFree(pMsg->pData);
×
528
    return TSDB_CODE_SUCCESS;
×
529
  }
530

531
  int32_t          index = pWrapper->sourceIndex;
3,296,812✔
532
  SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);
3,296,812✔
533

534
  int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, index);
3,293,896✔
535
  if (pRpcHandle != NULL) {
3,290,919!
536
    int32_t ret = asyncFreeConnById(pExchangeInfo->pTransporter, *pRpcHandle);
3,293,693✔
537
    if (ret != 0) {
3,297,020✔
538
      qDebug("failed to free rpc handle, code:%s, %p", tstrerror(ret), pExchangeInfo);
21,779✔
539
    }
540
    *pRpcHandle = -1;
3,296,836✔
541
  }
542

543
  if (!pSourceDataInfo) {
3,294,062!
544
    return terrno;
×
545
  }
546

547
  if (code == TSDB_CODE_SUCCESS) {
3,294,062!
548
    pSourceDataInfo->pRsp = pMsg->pData;
3,294,473✔
549

550
    SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
3,294,473✔
551
    pRsp->numOfRows = htobe64(pRsp->numOfRows);
3,294,473✔
552
    pRsp->compLen = htonl(pRsp->compLen);
3,292,575✔
553
    pRsp->payloadLen = htonl(pRsp->payloadLen);
3,292,575✔
554
    pRsp->numOfCols = htonl(pRsp->numOfCols);
3,292,575✔
555
    pRsp->useconds = htobe64(pRsp->useconds);
3,292,575✔
556
    pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);
3,295,274✔
557

558
    qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%" PRId64 ", %p", pSourceDataInfo->taskId, index,
3,295,274✔
559
           pRsp->numOfBlocks, pRsp->numOfRows, pExchangeInfo);
560
  } else {
UNCOV
561
    taosMemoryFree(pMsg->pData);
×
562
    pSourceDataInfo->code = rpcCvtErrCode(code);
35✔
563
    if (pSourceDataInfo->code != code) {
35!
564
      qError("%s fetch rsp received, index:%d, error:%s, cvted error: %s, %p", pSourceDataInfo->taskId, index,
×
565
             tstrerror(code), tstrerror(pSourceDataInfo->code), pExchangeInfo);
566
    } else {
567
      qError("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code),
35!
568
             pExchangeInfo);
569
    }
570
  }
571

572
  tmemory_barrier();
3,295,312✔
573
  pSourceDataInfo->status = EX_SOURCE_DATA_READY;
3,295,312✔
574
  code = tsem_post(&pExchangeInfo->ready);
3,295,312✔
575
  if (code != TSDB_CODE_SUCCESS) {
3,296,347!
576
    qError("failed to invoke post when fetch rsp is ready, code:%s, %p", tstrerror(code), pExchangeInfo);
×
577
    return code;
×
578
  }
579

580
  code = taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
3,296,347✔
581
  if (code != TSDB_CODE_SUCCESS) {
3,296,954!
582
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
583
  }
584
  return code;
3,296,947✔
585
}
586

587
int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, bool tableSeq) {
682✔
588
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
682✔
589
  if (NULL == *ppRes) {
682!
590
    return terrno;
×
591
  }
592

593
  STableScanOperatorParam* pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
682✔
594
  if (NULL == pScan) {
682!
595
    taosMemoryFreeClear(*ppRes);
×
596
    return terrno;
×
597
  }
598

599
  pScan->pUidList = taosArrayDup(pUidList, NULL);
682✔
600
  if (NULL == pScan->pUidList) {
682!
601
    taosMemoryFree(pScan);
×
602
    taosMemoryFreeClear(*ppRes);
×
603
    return terrno;
×
604
  }
605
  pScan->tableSeq = tableSeq;
682✔
606

607
  (*ppRes)->opType = srcOpType;
682✔
608
  (*ppRes)->downstreamIdx = 0;
682✔
609
  (*ppRes)->value = pScan;
682✔
610
  (*ppRes)->pChildren = NULL;
682✔
611

612
  return TSDB_CODE_SUCCESS;
682✔
613
}
614

615
int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
3,297,247✔
616
  int32_t          code = TSDB_CODE_SUCCESS;
3,297,247✔
617
  int32_t          lino = 0;
3,297,247✔
618
  SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
3,297,247✔
619
  if (!pDataInfo) {
3,297,190!
620
    return terrno;
×
621
  }
622

623
  if (EX_SOURCE_DATA_NOT_READY != pDataInfo->status) {
3,297,192!
624
    return TSDB_CODE_SUCCESS;
×
625
  }
626

627
  pDataInfo->status = EX_SOURCE_DATA_STARTED;
3,297,192✔
628
  SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
3,297,192✔
629
  if (!pSource) {
3,297,190!
630
    return terrno;
×
631
  }
632

633
  pDataInfo->startTime = taosGetTimestampUs();
3,297,205✔
634
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
3,297,205✔
635

636
  SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
3,297,207✔
637
  QUERY_CHECK_NULL(pWrapper, code, lino, _end, terrno);
3,297,307!
638
  pWrapper->exchangeId = pExchangeInfo->self;
3,297,307✔
639
  pWrapper->sourceIndex = sourceIndex;
3,297,307✔
640

641
  if (pSource->localExec) {
3,297,307✔
642
    SDataBuf pBuf = {0};
21,682✔
643
    int32_t  code =
644
        (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->schedId, pTaskInfo->id.queryId,
21,682✔
645
                                    pSource->taskId, 0, pSource->execId, &pBuf.pData, pTaskInfo->localFetch.explainRes);
646
    code = loadRemoteDataCallback(pWrapper, &pBuf, code);
21,682✔
647
    QUERY_CHECK_CODE(code, lino, _end);
21,682!
648
    taosMemoryFree(pWrapper);
21,682✔
649
  } else {
650
    SResFetchReq req = {0};
3,275,625✔
651
    req.header.vgId = pSource->addr.nodeId;
3,275,625✔
652
    req.sId = pSource->schedId;
3,275,625✔
653
    req.taskId = pSource->taskId;
3,275,625✔
654
    req.queryId = pTaskInfo->id.queryId;
3,275,625✔
655
    req.execId = pSource->execId;
3,275,625✔
656
    if (pDataInfo->pSrcUidList) {
3,275,625✔
657
      int32_t code =
658
          buildTableScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->tableSeq);
680✔
659
      taosArrayDestroy(pDataInfo->pSrcUidList);
680✔
660
      pDataInfo->pSrcUidList = NULL;
680✔
661
      if (TSDB_CODE_SUCCESS != code) {
680!
662
        pTaskInfo->code = code;
×
663
        taosMemoryFree(pWrapper);
×
664
        return pTaskInfo->code;
×
665
      }
666
    }
667

668
    int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req);
3,275,625✔
669
    if (msgSize < 0) {
3,275,560!
670
      pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
×
671
      taosMemoryFree(pWrapper);
×
672
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
673
      return pTaskInfo->code;
×
674
    }
675

676
    void* msg = taosMemoryCalloc(1, msgSize);
3,275,560✔
677
    if (NULL == msg) {
3,275,616!
678
      pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
×
679
      taosMemoryFree(pWrapper);
×
680
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
681
      return pTaskInfo->code;
×
682
    }
683

684
    if (tSerializeSResFetchReq(msg, msgSize, &req) < 0) {
3,275,616!
685
      pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
×
686
      taosMemoryFree(pWrapper);
×
687
      taosMemoryFree(msg);
×
688
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
689
      return pTaskInfo->code;
×
690
    }
691

692
    freeOperatorParam(req.pOpParam, OP_GET_PARAM);
3,275,561✔
693

694
    qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %p, %d/%" PRIzu,
3,275,525✔
695
           GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId,
696
           pSource->execId, pExchangeInfo, sourceIndex, totalSources);
697

698
    // send the fetch remote task result reques
699
    SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
3,275,529✔
700
    if (NULL == pMsgSendInfo) {
3,275,568!
701
      taosMemoryFreeClear(msg);
×
702
      taosMemoryFree(pWrapper);
×
703
      qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
×
704
      pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
×
705
      return pTaskInfo->code;
×
706
    }
707

708
    pMsgSendInfo->param = pWrapper;
3,275,568✔
709
    pMsgSendInfo->paramFreeFp = taosMemoryFree;
3,275,568✔
710
    pMsgSendInfo->msgInfo.pData = msg;
3,275,568✔
711
    pMsgSendInfo->msgInfo.len = msgSize;
3,275,568✔
712
    pMsgSendInfo->msgType = pSource->fetchMsgType;
3,275,568✔
713
    pMsgSendInfo->fp = loadRemoteDataCallback;
3,275,568✔
714

715
    int64_t transporterId = 0;
3,275,568✔
716
    code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
3,275,568✔
717
    QUERY_CHECK_CODE(code, lino, _end);
3,275,627!
718
    int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, sourceIndex);
3,275,627✔
719
    *pRpcHandle = transporterId;
3,275,589✔
720
  }
721

722
_end:
3,297,271✔
723
  if (code != TSDB_CODE_SUCCESS) {
3,297,271!
724
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
725
  }
726
  return code;
3,297,277✔
727
}
728

729
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs,
1,684,131✔
730
                          SOperatorInfo* pOperator) {
731
  pInfo->totalRows += numOfRows;
1,684,131✔
732
  pInfo->totalSize += dataLen;
1,684,131✔
733
  pInfo->totalElapsed += (taosGetTimestampUs() - startTs);
1,684,138✔
734
  pOperator->resultInfo.totalRows += numOfRows;
1,684,138✔
735
}
1,684,138✔
736

737
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart) {
7,571,411✔
738
  int32_t      code = TSDB_CODE_SUCCESS;
7,571,411✔
739
  int32_t      lino = 0;
7,571,411✔
740
  SSDataBlock* pBlock = NULL;
7,571,411✔
741
  if (pColList == NULL) {  // data from other sources
7,571,411✔
742
    blockDataCleanup(pRes);
7,565,300✔
743
    code = blockDecode(pRes, pData, (const char**)pNextStart);
7,565,297✔
744
    if (code) {
7,565,202!
745
      return code;
×
746
    }
747
  } else {  // extract data according to pColList
748
    char* pStart = pData;
6,111✔
749

750
    int32_t numOfCols = htonl(*(int32_t*)pStart);
6,111✔
751
    pStart += sizeof(int32_t);
6,111✔
752

753
    // todo refactor:extract method
754
    SSysTableSchema* pSchema = (SSysTableSchema*)pStart;
6,111✔
755
    for (int32_t i = 0; i < numOfCols; ++i) {
106,036✔
756
      SSysTableSchema* p = (SSysTableSchema*)pStart;
99,925✔
757

758
      p->colId = htons(p->colId);
99,925✔
759
      p->bytes = htonl(p->bytes);
99,925✔
760
      pStart += sizeof(SSysTableSchema);
99,925✔
761
    }
762

763
    pBlock = NULL;
6,111✔
764
    code = createDataBlock(&pBlock);
6,111✔
765
    QUERY_CHECK_CODE(code, lino, _end);
6,117!
766

767
    for (int32_t i = 0; i < numOfCols; ++i) {
106,042✔
768
      SColumnInfoData idata = createColumnInfoData(pSchema[i].type, pSchema[i].bytes, pSchema[i].colId);
99,925✔
769
      code = blockDataAppendColInfo(pBlock, &idata);
99,925✔
770
      QUERY_CHECK_CODE(code, lino, _end);
99,925!
771
    }
772

773
    const char* pDummy = NULL;
6,117✔
774
    code = blockDecode(pBlock, pStart, &pDummy);
6,117✔
775
    QUERY_CHECK_CODE(code, lino, _end);
6,117!
776

777
    code = blockDataEnsureCapacity(pRes, pBlock->info.rows);
6,117✔
778
    QUERY_CHECK_CODE(code, lino, _end);
6,117!
779

780
    // data from mnode
781
    pRes->info.dataLoad = 1;
6,117✔
782
    pRes->info.rows = pBlock->info.rows;
6,117✔
783
    pRes->info.scanFlag = MAIN_SCAN;
6,117✔
784
    code = relocateColumnData(pRes, pColList, pBlock->pDataBlock, false);
6,117✔
785
    QUERY_CHECK_CODE(code, lino, _end);
6,117!
786

787
    blockDataDestroy(pBlock);
6,117✔
788
    pBlock = NULL;
6,117✔
789
  }
790

791
_end:
7,571,319✔
792
  if (code != TSDB_CODE_SUCCESS) {
7,571,319!
793
    blockDataDestroy(pBlock);
×
794
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
795
  }
796
  return code;
7,571,313✔
797
}
798

799
void setAllSourcesCompleted(SOperatorInfo* pOperator) {
1,499,279✔
800
  SExchangeInfo* pExchangeInfo = pOperator->info;
1,499,279✔
801
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
1,499,279✔
802

803
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
1,499,279✔
804
  size_t               totalSources = taosArrayGetSize(pExchangeInfo->pSources);
1,499,279✔
805
  qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 ", %.2f Kb, elapsed:%.2f ms",
1,499,280✔
806
         GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
807
         pLoadInfo->totalElapsed / 1000.0);
808

809
  setOperatorCompleted(pOperator);
1,499,281✔
810
}
1,499,290✔
811

812
int32_t getCompletedSources(const SArray* pArray, int32_t* pRes) {
6,214,455✔
813
  int32_t code = TSDB_CODE_SUCCESS;
6,214,455✔
814
  int32_t lino = 0;
6,214,455✔
815
  size_t  total = taosArrayGetSize(pArray);
6,214,455✔
816

817
  int32_t completed = 0;
6,214,319✔
818
  for (int32_t k = 0; k < total; ++k) {
13,656,410✔
819
    SSourceDataInfo* p = taosArrayGet(pArray, k);
7,442,206✔
820
    QUERY_CHECK_NULL(p, code, lino, _end, terrno);
7,442,077!
821
    if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
7,442,091✔
822
      completed += 1;
3,709,306✔
823
    }
824
  }
825

826
  *pRes = completed;
6,214,204✔
827
_end:
6,214,204✔
828
  if (code != TSDB_CODE_SUCCESS) {
6,214,204!
829
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
830
  }
831
  return code;
6,214,149✔
832
}
833

834
int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
2,924,472✔
835
  SExchangeInfo* pExchangeInfo = pOperator->info;
2,924,472✔
836
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
2,924,472✔
837

838
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
2,924,472✔
839
  int64_t startTs = taosGetTimestampUs();
2,924,456✔
840

841
  // Asynchronously send all fetch requests to all sources.
842
  for (int32_t i = 0; i < totalSources; ++i) {
6,205,803✔
843
    int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
3,281,331✔
844
    if (code != TSDB_CODE_SUCCESS) {
3,281,347!
845
      pTaskInfo->code = code;
×
846
      return code;
×
847
    }
848
  }
849

850
  int64_t endTs = taosGetTimestampUs();
2,924,482✔
851
  qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTaskInfo),
2,924,482✔
852
         totalSources, (endTs - startTs) / 1000.0);
853

854
  pOperator->status = OP_RES_TO_RETURN;
2,924,482✔
855
  pOperator->cost.openCost = taosGetTimestampUs() - startTs;
2,924,476✔
856
  if (isTaskKilled(pTaskInfo)) {
2,924,476✔
857
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
7!
858
  }
859

860
  return TSDB_CODE_SUCCESS;
2,924,474✔
861
}
862

863
int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo) {
1,678,012✔
864
  int32_t            code = TSDB_CODE_SUCCESS;
1,678,012✔
865
  int32_t            lino = 0;
1,678,012✔
866
  SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
1,678,012✔
867
  SSDataBlock*       pb = NULL;
1,678,012✔
868

869
  char* pNextStart = pRetrieveRsp->data;
1,678,012✔
870
  char* pStart = pNextStart;
1,678,012✔
871

872
  int32_t index = 0;
1,678,012✔
873

874
  if (pRetrieveRsp->compressed) {  // decompress the data
1,678,012!
875
    if (pDataInfo->decompBuf == NULL) {
×
876
      pDataInfo->decompBuf = taosMemoryMalloc(pRetrieveRsp->payloadLen);
×
877
      QUERY_CHECK_NULL(pDataInfo->decompBuf, code, lino, _end, terrno);
×
878
      pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
×
879
    } else {
880
      if (pDataInfo->decompBufSize < pRetrieveRsp->payloadLen) {
×
881
        char* p = taosMemoryRealloc(pDataInfo->decompBuf, pRetrieveRsp->payloadLen);
×
882
        QUERY_CHECK_NULL(p, code, lino, _end, terrno);
×
883
        if (p != NULL) {
×
884
          pDataInfo->decompBuf = p;
×
885
          pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
×
886
        }
887
      }
888
    }
889
  }
890

891
  while (index++ < pRetrieveRsp->numOfBlocks) {
9,243,248✔
892
    pStart = pNextStart;
7,565,253✔
893

894
    if (taosArrayGetSize(pExchangeInfo->pRecycledBlocks) > 0) {
7,565,253✔
895
      pb = *(SSDataBlock**)taosArrayPop(pExchangeInfo->pRecycledBlocks);
1,918,425✔
896
      blockDataCleanup(pb);
1,918,425✔
897
    } else {
898
      code = createOneDataBlock(pExchangeInfo->pDummyBlock, false, &pb);
5,646,817✔
899
      QUERY_CHECK_NULL(pb, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
5,646,882!
900
    }
901

902
    int32_t compLen = *(int32_t*)pStart;
7,565,289✔
903
    pStart += sizeof(int32_t);
7,565,289✔
904

905
    int32_t rawLen = *(int32_t*)pStart;
7,565,289✔
906
    pStart += sizeof(int32_t);
7,565,289✔
907
    QUERY_CHECK_CONDITION((compLen <= rawLen && compLen != 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
7,565,289!
908

909
    pNextStart = pStart + compLen;
7,565,289✔
910
    if (pRetrieveRsp->compressed && (compLen < rawLen)) {
7,565,289!
911
      int32_t t = tsDecompressString(pStart, compLen, 1, pDataInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
×
912
      QUERY_CHECK_CONDITION((t == rawLen), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
913
      pStart = pDataInfo->decompBuf;
×
914
    }
915

916
    code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart);
7,565,289✔
917
    if (code != 0) {
7,565,206!
918
      taosMemoryFreeClear(pDataInfo->pRsp);
×
919
      goto _end;
×
920
    }
921

922
    void* tmp = taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
7,565,206✔
923
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
7,565,240!
924
    pb = NULL;
7,565,240✔
925
  }
926

927
_end:
1,677,995✔
928
  if (code != TSDB_CODE_SUCCESS) {
1,677,995!
929
    blockDataDestroy(pb);
×
930
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
931
  }
932
  return code;
1,678,016✔
933
}
934

935
int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
584✔
936
  SExchangeInfo* pExchangeInfo = pOperator->info;
584✔
937
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
584✔
938

939
  int32_t code = 0;
584✔
940
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);
584✔
941
  int64_t startTs = taosGetTimestampUs();
584✔
942

943
  while (1) {
193✔
944
    if (pExchangeInfo->current >= totalSources) {
777✔
945
      setAllSourcesCompleted(pOperator);
173✔
946
      return TSDB_CODE_SUCCESS;
173✔
947
    }
948

949
    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
604✔
950
    if (!pDataInfo) {
604!
951
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
952
      pTaskInfo->code = terrno;
×
953
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
954
    }
955
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
604✔
956

957
    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
604✔
958
    if (code != TSDB_CODE_SUCCESS) {
604!
959
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
960
      pTaskInfo->code = code;
×
961
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
962
    }
963

964
    code = exchangeWait(pOperator, pExchangeInfo);
604✔
965
    if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
604!
966
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
967
    }
968

969
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
604✔
970
    if (!pSource) {
604!
971
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
972
      pTaskInfo->code = terrno;
×
973
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
974
    }
975

976
    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
604!
977
      qError("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d error happens, code:%s", GET_TASKID(pTaskInfo),
×
978
             pSource->addr.nodeId, pSource->taskId, pSource->execId, tstrerror(pDataInfo->code));
979
      pOperator->pTaskInfo->code = pDataInfo->code;
×
980
      return pOperator->pTaskInfo->code;
×
981
    }
982

983
    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
604✔
984
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
604✔
985

986
    if (pRsp->numOfRows == 0) {
604✔
987
      qDebug("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d %d of total completed, rowsOfSource:%" PRIu64
193✔
988
             ", totalRows:%" PRIu64 " try next",
989
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pExchangeInfo->current + 1,
990
             pDataInfo->totalRows, pLoadInfo->totalRows);
991

992
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
193✔
993
      pExchangeInfo->current += 1;
193✔
994
      taosMemoryFreeClear(pDataInfo->pRsp);
193!
995
      continue;
193✔
996
    }
997

998
    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
411✔
999
    if (code != TSDB_CODE_SUCCESS) {
411!
1000
      goto _error;
×
1001
    }
1002

1003
    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
411✔
1004
    if (pRsp->completed == 1) {
411✔
1005
      qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
240✔
1006
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
1007
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows,
1008
             pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1,
1009
             totalSources);
1010

1011
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
240✔
1012
      pExchangeInfo->current += 1;
240✔
1013
    } else {
1014
      qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64 ", totalRows:%" PRIu64
171!
1015
             ", totalBytes:%" PRIu64,
1016
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows,
1017
             pLoadInfo->totalRows, pLoadInfo->totalSize);
1018
    }
1019

1020
    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
411✔
1021
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;
411✔
1022

1023
    taosMemoryFreeClear(pDataInfo->pRsp);
411!
1024
    return TSDB_CODE_SUCCESS;
411✔
1025
  }
1026

1027
_error:
×
1028
  pTaskInfo->code = code;
×
1029
  return code;
×
1030
}
1031

1032
int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasicParam* pBasicParam) {
680✔
1033
  SExchangeInfo*     pExchangeInfo = pOperator->info;
680✔
1034
  SExchangeSrcIndex* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
680✔
1035
  if (NULL == pIdx) {
680!
1036
    qError("No exchange source for vgId: %d", pBasicParam->vgId);
×
1037
    return TSDB_CODE_INVALID_PARA;
×
1038
  }
1039

1040
  if (pIdx->inUseIdx < 0) {
680✔
1041
    SSourceDataInfo dataInfo = {0};
674✔
1042
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
674✔
1043
    dataInfo.taskId = pExchangeInfo->pTaskId;
674✔
1044
    dataInfo.index = pIdx->srcIdx;
674✔
1045
    dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
674✔
1046
    if (dataInfo.pSrcUidList == NULL) {
674!
1047
      return terrno;
×
1048
    }
1049

1050
    dataInfo.srcOpType = pBasicParam->srcOpType;
674✔
1051
    dataInfo.tableSeq = pBasicParam->tableSeq;
674✔
1052

1053
    void* tmp = taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo);
674✔
1054
    if (!tmp) {
674!
1055
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
×
1056
      return terrno;
×
1057
    }
1058
    pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
674✔
1059
  } else {
1060
    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
6✔
1061
    if (!pDataInfo) {
6!
1062
      return terrno;
×
1063
    }
1064
    if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
6!
1065
      pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
6✔
1066
    }
1067
    pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
6✔
1068
    if (pDataInfo->pSrcUidList == NULL) {
6!
1069
      return terrno;
×
1070
    }
1071

1072
    pDataInfo->srcOpType = pBasicParam->srcOpType;
6✔
1073
    pDataInfo->tableSeq = pBasicParam->tableSeq;
6✔
1074
  }
1075

1076
  return TSDB_CODE_SUCCESS;
680✔
1077
}
1078

1079
int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
370✔
1080
  SExchangeInfo*               pExchangeInfo = pOperator->info;
370✔
1081
  int32_t                      code = TSDB_CODE_SUCCESS;
370✔
1082
  SExchangeOperatorBasicParam* pBasicParam = NULL;
370✔
1083
  SExchangeOperatorParam*      pParam = (SExchangeOperatorParam*)pOperator->pOperatorGetParam->value;
370✔
1084
  if (pParam->multiParams) {
370✔
1085
    SExchangeOperatorBatchParam* pBatch = (SExchangeOperatorBatchParam*)pOperator->pOperatorGetParam->value;
358✔
1086
    int32_t                      iter = 0;
358✔
1087
    while (NULL != (pBasicParam = tSimpleHashIterate(pBatch->pBatchs, pBasicParam, &iter))) {
1,026✔
1088
      code = addSingleExchangeSource(pOperator, pBasicParam);
668✔
1089
      if (code) {
668!
1090
        return code;
×
1091
      }
1092
    }
1093
  } else {
1094
    pBasicParam = &pParam->basic;
12✔
1095
    code = addSingleExchangeSource(pOperator, pBasicParam);
12✔
1096
  }
1097

1098
  freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
370✔
1099
  pOperator->pOperatorGetParam = NULL;
370✔
1100

1101
  return TSDB_CODE_SUCCESS;
370✔
1102
}
1103

1104
int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
13,040,440✔
1105
  SExchangeInfo* pExchangeInfo = pOperator->info;
13,040,440✔
1106
  int32_t        code = TSDB_CODE_SUCCESS;
13,040,440✔
1107
  int32_t        lino = 0;
13,040,440✔
1108
  if ((OPTR_IS_OPENED(pOperator) && !pExchangeInfo->dynamicOp) ||
13,040,440✔
1109
      (pExchangeInfo->dynamicOp && NULL == pOperator->pOperatorGetParam)) {
2,925,813✔
1110
    return TSDB_CODE_SUCCESS;
10,115,957✔
1111
  }
1112

1113
  if (pExchangeInfo->dynamicOp) {
2,924,483✔
1114
    code = addDynamicExchangeSource(pOperator);
370✔
1115
    QUERY_CHECK_CODE(code, lino, _end);
370!
1116
  }
1117

1118
  int64_t st = taosGetTimestampUs();
2,924,663✔
1119

1120
  if (!pExchangeInfo->seqLoadData) {
2,924,663✔
1121
    code = prepareConcurrentlyLoad(pOperator);
2,924,480✔
1122
    QUERY_CHECK_CODE(code, lino, _end);
2,924,466!
1123
    pExchangeInfo->openedTs = taosGetTimestampUs();
2,924,470✔
1124
  }
1125

1126
  OPTR_SET_OPENED(pOperator);
2,924,653✔
1127
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
2,924,659✔
1128

1129
_end:
2,924,659✔
1130
  if (code != TSDB_CODE_SUCCESS) {
2,924,659!
1131
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1132
    pOperator->pTaskInfo->code = code;
×
1133
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
1134
  }
1135
  return TSDB_CODE_SUCCESS;
2,924,659✔
1136
}
1137

1138
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
3,857✔
1139
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
3,857✔
1140

1141
  if (pLimitInfo->remainGroupOffset > 0) {
3,857!
1142
    if (pLimitInfo->currentGroupId == 0) {  // it is the first group
×
1143
      pLimitInfo->currentGroupId = pBlock->info.id.groupId;
×
1144
      blockDataCleanup(pBlock);
×
1145
      return PROJECT_RETRIEVE_CONTINUE;
×
1146
    } else if (pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
×
1147
      // now it is the data from a new group
1148
      pLimitInfo->remainGroupOffset -= 1;
×
1149

1150
      // ignore data block in current group
1151
      if (pLimitInfo->remainGroupOffset > 0) {
×
1152
        blockDataCleanup(pBlock);
×
1153
        return PROJECT_RETRIEVE_CONTINUE;
×
1154
      }
1155
    }
1156

1157
    // set current group id of the project operator
1158
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
×
1159
  }
1160

1161
  // here check for a new group data, we need to handle the data of the previous group.
1162
  if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
3,857✔
1163
    pLimitInfo->numOfOutputGroups += 1;
223✔
1164
    if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
223!
1165
      pOperator->status = OP_EXEC_DONE;
×
1166
      blockDataCleanup(pBlock);
×
1167

1168
      return PROJECT_RETRIEVE_DONE;
×
1169
    }
1170

1171
    // reset the value for a new group data
1172
    resetLimitInfoForNextGroup(pLimitInfo);
223✔
1173
    // existing rows that belongs to previous group.
1174
    if (pBlock->info.rows > 0) {
223!
1175
      return PROJECT_RETRIEVE_DONE;
223✔
1176
    }
1177
  }
1178

1179
  // here we reach the start position, according to the limit/offset requirements.
1180

1181
  // set current group id
1182
  pLimitInfo->currentGroupId = pBlock->info.id.groupId;
3,634✔
1183

1184
  bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pTaskInfo);
3,634✔
1185
  if (pBlock->info.rows == 0) {
3,634✔
1186
    return PROJECT_RETRIEVE_CONTINUE;
1,492✔
1187
  } else {
1188
    if (limitReached && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
2,142!
1189
      setOperatorCompleted(pOperator);
×
1190
      return PROJECT_RETRIEVE_DONE;
×
1191
    }
1192
  }
1193

1194
  // todo optimize performance
1195
  // If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the
1196
  // they may not belong to the same group the limit/offset value is not valid in this case.
1197
  if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || hasSlimitOffsetInfo(pLimitInfo)) {
2,142!
1198
    return PROJECT_RETRIEVE_DONE;
2,142✔
1199
  } else {  // not full enough, continue to accumulate the output data in the buffer.
1200
    return PROJECT_RETRIEVE_CONTINUE;
×
1201
  }
1202
}
1203

1204
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo) {
3,296,765✔
1205
  SExecTaskInfo* pTask = pOperator->pTaskInfo;
3,296,765✔
1206
  int32_t        code = TSDB_CODE_SUCCESS;
3,296,765✔
1207
  if (pTask->pWorkerCb) {
3,296,765!
1208
    code = pTask->pWorkerCb->beforeBlocking(pTask->pWorkerCb->pPool);
3,296,790✔
1209
    if (code != TSDB_CODE_SUCCESS) {
3,296,971!
1210
      pTask->code = code;
×
1211
      return pTask->code;
×
1212
    }
1213
  }
1214

1215
  code = tsem_wait(&pExchangeInfo->ready);
3,296,946✔
1216
  if (code != TSDB_CODE_SUCCESS) {
3,296,932!
1217
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1218
    pTask->code = code;
×
1219
    return pTask->code;
×
1220
  }
1221

1222
  if (pTask->pWorkerCb) {
3,296,935!
1223
    code = pTask->pWorkerCb->afterRecoverFromBlocking(pTask->pWorkerCb->pPool);
3,296,935✔
1224
    if (code != TSDB_CODE_SUCCESS) {
3,296,972!
1225
      pTask->code = code;
×
1226
      return pTask->code;
×
1227
    }
1228
  }
1229
  return TSDB_CODE_SUCCESS;
3,296,972✔
1230
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc