• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3534

21 Nov 2024 07:36AM UTC coverage: 60.825% (+2.0%) from 58.848%
#3534

push

travis-ci

web-flow
Merge pull request #28810 from taosdata/ehn/add-sync-heartbeat-sent-time-to-log

ehn:add-sync-heartbeat-sent-time-to-log

120023 of 252376 branches covered (47.56%)

Branch coverage included in aggregate %.

43 of 47 new or added lines in 3 files covered. (91.49%)

2254 existing lines in 162 files now uncovered.

200876 of 275203 relevant lines covered (72.99%)

16110754.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.37
/source/libs/executor/src/exchangeoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "operator.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "tdatablock.h"
23
#include "thash.h"
24
#include "tmsg.h"
25
#include "tref.h"
26
#include "trpc.h"
27

28
/*
 * Context handed to the fetch-response callback.
 *
 * exchangeId carries the reference id of the owning exchange operator (the value stored in
 * SExchangeInfo.self, which is assigned from the int64_t returned by taosAddRef). It is kept as a
 * 64-bit value so that the id is not truncated before it is passed back to taosAcquireRef /
 * taosReleaseRef.
 */
typedef struct SFetchRspHandleWrapper {
  int64_t exchangeId;   // reference id of the exchange operator in exchangeObjRefPool
  int32_t sourceIndex;  // index of the downstream source this request was sent to
} SFetchRspHandleWrapper;
32

33
typedef struct SSourceDataInfo {
34
  int32_t            index;
35
  SRetrieveTableRsp* pRsp;
36
  uint64_t           totalRows;
37
  int64_t            startTime;
38
  int32_t            code;
39
  EX_SOURCE_STATUS   status;
40
  const char*        taskId;
41
  SArray*            pSrcUidList;
42
  int32_t            srcOpType;
43
  bool               tableSeq;
44
  char*              decompBuf;
45
  int32_t            decompBufSize;
46
} SSourceDataInfo;
47

48
static void destroyExchangeOperatorInfo(void* param);
49
static void freeBlock(void* pParam);
50
static void freeSourceDataInfo(void* param);
51
static void setAllSourcesCompleted(SOperatorInfo* pOperator);
52

53
static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code);
54
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex);
55
static int32_t getCompletedSources(const SArray* pArray, int32_t* pRes);
56
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator);
57
static int32_t seqLoadRemoteData(SOperatorInfo* pOperator);
58
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator);
59
static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock,
60
                                 bool holdDataInBuf);
61
static int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo);
62

63
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo);
64

65
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
6,048,111✔
66
                                           SExecTaskInfo* pTaskInfo) {
67
  int32_t code = 0;
6,048,111✔
68
  int32_t lino = 0;
6,048,111✔
69
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
6,048,111✔
70
  int32_t completed = 0;
6,048,087✔
71
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
6,048,087✔
72
  if (code != TSDB_CODE_SUCCESS) {
6,048,110!
73
    pTaskInfo->code = code;
×
74
    T_LONG_JMP(pTaskInfo->env, code);
×
75
  }
76
  if (completed == totalSources) {
6,048,110✔
77
    setAllSourcesCompleted(pOperator);
1,987,372✔
78
    return;
6,048,279✔
79
  }
80

81
  SSourceDataInfo* pDataInfo = NULL;
4,060,738✔
82

83
  while (1) {
277,002✔
84
    qDebug("prepare wait for ready, %p, %s", pExchangeInfo, GET_TASKID(pTaskInfo));
4,337,740✔
85
    code = exchangeWait(pOperator, pExchangeInfo);
4,337,742✔
86

87
    if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
4,337,966!
UNCOV
88
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
89
    }
90

91
    for (int32_t i = 0; i < totalSources; ++i) {
5,244,648✔
92
      pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
5,244,589✔
93
      QUERY_CHECK_NULL(pDataInfo, code, lino, _error, terrno);
5,244,484✔
94
      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
5,244,480✔
95
        continue;
736,947✔
96
      }
97

98
      if (pDataInfo->status != EX_SOURCE_DATA_READY) {
4,507,533✔
99
        continue;
169,761✔
100
      }
101

102
      if (pDataInfo->code != TSDB_CODE_SUCCESS) {
4,337,772✔
103
        code = pDataInfo->code;
2✔
104
        goto _error;
2✔
105
      }
106

107
      tmemory_barrier();
4,337,770✔
108
      SRetrieveTableRsp*     pRsp = pDataInfo->pRsp;
4,337,770✔
109
      SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
4,337,770✔
110
      QUERY_CHECK_NULL(pSource, code, lino, _error, terrno);
4,337,816!
111

112
      // todo
113
      SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
4,337,902✔
114
      if (pRsp->numOfRows == 0) {
4,337,902✔
115
        if (NULL != pDataInfo->pSrcUidList) {
1,883,130!
116
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
×
117
          code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
×
118
          if (code != TSDB_CODE_SUCCESS) {
×
119
            taosMemoryFreeClear(pDataInfo->pRsp);
×
120
            goto _error;
×
121
          }
122
        } else {
123
          pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
1,883,130✔
124
          qDebug("%s vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
1,883,130✔
125
                 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", try next %d/%" PRIzu,
126
                 GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
127
                 pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows, i + 1, totalSources);
128
          taosMemoryFreeClear(pDataInfo->pRsp);
1,883,130!
129
        }
130
        break;
1,883,144✔
131
      }
132

133
      code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
2,454,772✔
134
      if (code != TSDB_CODE_SUCCESS) {
2,454,720!
UNCOV
135
        goto _error;
×
136
      }
137

138
      SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
2,454,720✔
139
      updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pDataInfo->startTime, pOperator);
2,454,720✔
140
      pDataInfo->totalRows += pRetrieveRsp->numOfRows;
2,454,746✔
141

142
      if (pRsp->completed == 1) {
2,454,746✔
143
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
2,404,168✔
144
        qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
2,404,168✔
145
               " execId:%d index:%d completed, blocks:%d, numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64
146
               ", totalRows:%" PRIu64 ", total:%.2f Kb, try next %d/%" PRIzu,
147
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
148
               pRsp->numOfBlocks, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows,
149
               pLoadInfo->totalSize / 1024.0, i + 1, totalSources);
150
      } else {
151
        qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
50,578✔
152
               " execId:%d blocks:%d, numOfRows:%" PRId64 ", totalRows:%" PRIu64 ", total:%.2f Kb",
153
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
154
               pRsp->numOfBlocks, pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
155
      }
156

157
      taosMemoryFreeClear(pDataInfo->pRsp);
2,454,751✔
158

159
      if (pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED || NULL != pDataInfo->pSrcUidList) {
2,454,810!
160
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
50,603✔
161
        code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
50,603✔
162
        if (code != TSDB_CODE_SUCCESS) {
50,575!
UNCOV
163
          taosMemoryFreeClear(pDataInfo->pRsp);
×
UNCOV
164
          goto _error;
×
165
        }
166
      }
167
      return;
4,060,891✔
168
    }  // end loop
169

170
    int32_t complete1 = 0;
1,883,203✔
171
    code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &complete1);
1,883,203✔
172
    if (code != TSDB_CODE_SUCCESS) {
1,883,100!
173
      pTaskInfo->code = code;
×
174
      T_LONG_JMP(pTaskInfo->env, code);
×
175
    }
176
    if (complete1 == totalSources) {
1,883,110✔
177
      qDebug("all sources are completed, %s", GET_TASKID(pTaskInfo));
1,606,108✔
178
      return;
1,606,109✔
179
    }
180
  }
181

182
_error:
2✔
183
  pTaskInfo->code = code;
2✔
184
}
185

186
/*
 * Return the next data block of the exchange operator, or NULL when none is available.
 *
 * A block already waiting in pResultBlockList is handed out first. Otherwise data is loaded from the
 * downstream sources (sequentially or concurrently, depending on seqLoadData) and the first block of
 * the refreshed list is returned. Every returned block is also appended to pRecycledBlocks.
 * Failures leave through T_LONG_JMP with the error recorded in the task.
 */
static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
  SExchangeInfo*       pExchangeInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
  int32_t              code = TSDB_CODE_SUCCESS;
  size_t               totalSources = taosArrayGetSize(pExchangeInfo->pSources);

  if (pOperator->status == OP_EXEC_DONE) {
    qDebug("%s all %" PRIzu " source(s) are exhausted, total rows:%" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
           GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize,
           pLoadInfo->totalElapsed / 1000.0);
    return NULL;
  }

  // fast path: a previously retrieved block is still buffered
  SSDataBlock* pBuffered = NULL;
  if (taosArrayGetSize(pExchangeInfo->pResultBlockList) > 0) {
    pBuffered = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
    taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
  }

  if (pBuffered != NULL) {
    if (taosArrayPush(pExchangeInfo->pRecycledBlocks, &pBuffered) == NULL) {
      code = terrno;
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      pTaskInfo->code = code;
      T_LONG_JMP(pTaskInfo->env, code);
    }
    return pBuffered;
  }

  // slow path: pull more data from the downstream sources
  if (!pExchangeInfo->seqLoadData) {
    concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
  } else {
    code = seqLoadRemoteData(pOperator);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      pTaskInfo->code = code;
      T_LONG_JMP(pTaskInfo->env, code);
    }
  }

  // the concurrent loader reports its errors through the task code only
  if (pOperator->pTaskInfo->code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(pOperator->pTaskInfo->code));
    T_LONG_JMP(pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  if (taosArrayGetSize(pExchangeInfo->pResultBlockList) == 0) {
    return NULL;
  }

  SSDataBlock* pLoaded = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
  taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
  if (taosArrayPush(pExchangeInfo->pRecycledBlocks, &pLoaded) == NULL) {
    code = terrno;
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return pLoaded;
}
248

249
/*
 * "Get next" entry point of the exchange operator.
 *
 * Opens the operator, then keeps pulling blocks from doLoadRemoteDataImpl() until one survives the
 * filter and the limit/offset handling. *ppRes receives that block, or NULL when no more data is
 * available. On failure the error is recorded in the task and the function leaves via T_LONG_JMP.
 */
static int32_t loadRemoteDataNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t        lino = 0;
  int32_t        code = TSDB_CODE_SUCCESS;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SExchangeInfo* pExchangeInfo = pOperator->info;

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);

  if (pOperator->status == OP_EXEC_DONE) {
    *ppRes = NULL;
    return code;
  }

  for (;;) {
    SSDataBlock* pBlock = doLoadRemoteDataImpl(pOperator);
    if (pBlock == NULL) {
      *ppRes = NULL;
      return code;
    }

    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    // everything was filtered out, try the next block
    if (blockDataGetNumOfRows(pBlock) == 0) {
      continue;
    }

    SLimitInfo* pLimitInfo = &pExchangeInfo->limitInfo;
    if (!hasLimitOffsetInfo(pLimitInfo)) {
      *ppRes = pBlock;
      return code;
    }

    int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock, false);
    if (status == PROJECT_RETRIEVE_CONTINUE) {
      continue;
    }

    if (status == PROJECT_RETRIEVE_DONE) {
      if (pBlock->info.rows == 0) {
        setOperatorCompleted(pOperator);
        *ppRes = NULL;
      } else {
        *ppRes = pBlock;
      }
      return code;
    }
    // any other status: keep pulling blocks
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  *ppRes = NULL;
  return code;
}
307

308
/*
 * Create the per-source fetch state array of an exchange operator.
 *
 * For dynamic operators only the (empty) array is created. Otherwise the task id string is copied
 * into pInfo->pTaskId and one NOT_READY entry is appended for each of the numOfSources sources.
 *
 * Returns TSDB_CODE_SUCCESS, or the terrno of the failing allocation. Whatever has been stored in
 * pInfo stays owned by pInfo and is released by doDestroyExchangeOperatorInfo().
 */
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
  pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
  if (pInfo->pSourceDataInfo == NULL) {
    return terrno;
  }

  if (pInfo->dynamicOp) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t len = strlen(id) + 1;
  pInfo->pTaskId = taosMemoryCalloc(1, len);
  if (!pInfo->pTaskId) {
    return terrno;
  }
  tstrncpy(pInfo->pTaskId, id, len);

  for (int32_t i = 0; i < numOfSources; ++i) {
    SSourceDataInfo dataInfo = {0};
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
    dataInfo.taskId = pInfo->pTaskId;  // shared pointer, owned by pInfo
    dataInfo.index = i;
    SSourceDataInfo* pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
    if (pDs == NULL) {
      // capture the error before any cleanup call gets a chance to touch terrno
      int32_t code = terrno;
      taosArrayDestroyEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
      // clear the field so the owner's destructor does not destroy the array a second time
      pInfo->pSourceDataInfo = NULL;
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
338

339
/*
 * Populate an SExchangeInfo from its physical plan node.
 *
 * Allocates the rpc handle array, copies every downstream source node into pInfo->pSources (and, for
 * dynamic operators, registers it in pInfo->pHashSources keyed by node id), initializes the limit
 * info, registers pInfo in exchangeObjRefPool and finally builds the per-source fetch state.
 *
 * Returns TSDB_CODE_SUCCESS or an error code; partially built members remain attached to pInfo.
 */
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo, const char* id) {
  size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);

  if (numOfSources == 0) {
    qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
    return TSDB_CODE_INVALID_PARA;
  }
  pInfo->pFetchRpcHandles = taosArrayInit(numOfSources, sizeof(int64_t));
  if (!pInfo->pFetchRpcHandles) {
    return terrno;
  }
  void* ret = taosArrayReserve(pInfo->pFetchRpcHandles, numOfSources);
  if (!ret) {
    return terrno;
  }

  pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
  if (pInfo->pSources == NULL) {
    // log the same code that is returned instead of a hard-coded out-of-memory message
    int32_t code = terrno;
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    return code;
  }

  if (pExNode->node.dynamicOp) {
    pInfo->pHashSources = tSimpleHashInit(numOfSources * 2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
    if (NULL == pInfo->pHashSources) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  for (int32_t i = 0; i < numOfSources; ++i) {
    SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
    if (!pNode) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    void* tmp = taosArrayPush(pInfo->pSources, pNode);
    if (!tmp) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return terrno;
    }

    // the hash only exists for dynamic operators; skip the lookup table entirely otherwise
    if (pInfo->pHashSources != NULL) {
      SExchangeSrcIndex idx = {.srcIdx = i, .inUseIdx = -1};
      int32_t           code =
          tSimpleHashPut(pInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        return code;
      }
    }
  }

  initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);
  int64_t refId = taosAddRef(exchangeObjRefPool, pInfo);
  if (refId < 0) {
    int32_t code = terrno;
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    return code;
  } else {
    pInfo->self = refId;
  }

  return initDataSource(numOfSources, pInfo, id);
}
401

402
int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo,
3,670,410✔
403
                                   SOperatorInfo** pOptrInfo) {
404
  QRY_PARAM_CHECK(pOptrInfo);
3,670,410!
405

406
  int32_t        code = 0;
3,670,410✔
407
  int32_t        lino = 0;
3,670,410✔
408
  SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
3,670,410✔
409
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
3,670,391✔
410
  if (pInfo == NULL || pOperator == NULL) {
3,670,416!
411
    code = terrno;
5✔
UNCOV
412
    goto _error;
×
413
  }
414

415
  pInfo->dynamicOp = pExNode->node.dynamicOp;
3,670,411✔
416
  code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo));
3,670,411✔
417
  QUERY_CHECK_CODE(code, lino, _error);
3,670,389!
418

419
  code = tsem_init(&pInfo->ready, 0, 0);
3,670,389✔
420
  QUERY_CHECK_CODE(code, lino, _error);
3,670,352!
421

422
  pInfo->pDummyBlock = createDataBlockFromDescNode(pExNode->node.pOutputDataBlockDesc);
3,670,352✔
423
  QUERY_CHECK_NULL(pInfo->pDummyBlock, code, lino, _error, terrno);
3,670,472!
424

425
  pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES);
3,670,472✔
426
  QUERY_CHECK_NULL(pInfo->pResultBlockList, code, lino, _error, terrno);
3,670,377!
427
  pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES);
3,670,377✔
428
  QUERY_CHECK_NULL(pInfo->pRecycledBlocks, code, lino, _error, terrno);
3,670,378!
429

430
  SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self};
3,670,378✔
431
  code = qAppendTaskStopInfo(pTaskInfo, &stopInfo);
3,670,378✔
432
  QUERY_CHECK_CODE(code, lino, _error);
3,670,505!
433

434
  pInfo->seqLoadData = pExNode->seqRecvData;
3,670,505✔
435
  pInfo->pTransporter = pTransporter;
3,670,505✔
436

437
  setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo,
3,670,505✔
438
                  pTaskInfo);
439
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
3,670,455✔
440

441
  code = filterInitFromNode((SNode*)pExNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
3,670,445✔
442
  QUERY_CHECK_CODE(code, lino, _error);
3,670,437!
443

444
  pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteDataNext, NULL, destroyExchangeOperatorInfo,
3,670,437✔
445
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
446
  *pOptrInfo = pOperator;
3,670,392✔
447
  return TSDB_CODE_SUCCESS;
3,670,392✔
448

UNCOV
449
_error:
×
UNCOV
450
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
451
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
452
    pTaskInfo->code = code;
×
453
  }
UNCOV
454
  if (pInfo != NULL) {
×
UNCOV
455
    doDestroyExchangeOperatorInfo(pInfo);
×
456
  }
457

UNCOV
458
  if (pOperator != NULL) {
×
UNCOV
459
    pOperator->info = NULL;
×
UNCOV
460
    destroyOperator(pOperator);
×
461
  }
UNCOV
462
  return code;
×
463
}
464

465
void destroyExchangeOperatorInfo(void* param) {
3,670,471✔
466
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
3,670,471✔
467
  int32_t        code = taosRemoveRef(exchangeObjRefPool, pExInfo->self);
3,670,471✔
468
  if (code != TSDB_CODE_SUCCESS) {
3,670,546✔
469
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
104,314!
470
  }
471
}
3,670,546✔
472

473
void freeBlock(void* pParam) {
5,716,491✔
474
  SSDataBlock* pBlock = *(SSDataBlock**)pParam;
5,716,491✔
475
  blockDataDestroy(pBlock);
5,716,491✔
476
}
5,716,586✔
477

478
void freeSourceDataInfo(void* p) {
4,119,659✔
479
  SSourceDataInfo* pInfo = (SSourceDataInfo*)p;
4,119,659✔
480
  taosMemoryFreeClear(pInfo->decompBuf);
4,119,659!
481
  taosMemoryFreeClear(pInfo->pRsp);
4,119,659✔
482

483
  pInfo->decompBufSize = 0;
4,119,659✔
484
}
4,119,659✔
485

486
void doDestroyExchangeOperatorInfo(void* param) {
3,566,186✔
487
  if (param == NULL) {
3,566,186!
488
    return;
×
489
  }
490
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
3,566,186✔
491
  if (pExInfo->pFetchRpcHandles) {
3,566,186!
492
    for (int32_t i = 0; i < pExInfo->pFetchRpcHandles->size; ++i) {
7,721,597✔
493
      int64_t* pRpcHandle = taosArrayGet(pExInfo->pFetchRpcHandles, i);
4,155,510✔
494
      if (*pRpcHandle > 0) {
4,155,369✔
495
        SDownstreamSourceNode* pSource = taosArrayGet(pExInfo->pSources, i);
269✔
496
        (void)asyncFreeConnById(pExInfo->pTransporter, *pRpcHandle);
269✔
497
      }
498
    }
499
    taosArrayDestroy(pExInfo->pFetchRpcHandles);
3,566,087✔
500
  }
501

502
  taosArrayDestroy(pExInfo->pSources);
3,566,190✔
503
  taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo);
3,566,206✔
504

505
  taosArrayDestroyEx(pExInfo->pResultBlockList, freeBlock);
3,566,197✔
506
  taosArrayDestroyEx(pExInfo->pRecycledBlocks, freeBlock);
3,566,205✔
507

508
  blockDataDestroy(pExInfo->pDummyBlock);
3,566,217✔
509
  tSimpleHashCleanup(pExInfo->pHashSources);
3,566,219✔
510

511
  int32_t code = tsem_destroy(&pExInfo->ready);
3,566,210✔
512
  if (code != TSDB_CODE_SUCCESS) {
3,566,152!
513
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
514
  }
515
  taosMemoryFreeClear(pExInfo->pTaskId);
3,566,152✔
516

517
  taosMemoryFreeClear(param);
3,566,215✔
518
}
519

520
/*
 * Response callback of a fetch request.
 *
 * param is the SFetchRspHandleWrapper created when the request was issued; it is only read here.
 * pMsg->pEpSet is always freed. pMsg->pData is either stored as the source's pending response (on
 * success) or freed (on every other path). The rpc handle recorded for the source is released and
 * reset to -1, the source is marked READY and the operator's ready semaphore is posted.
 *
 * The exchange operator is pinned with taosAcquireRef() for the duration of the callback; every
 * path that acquired the reference releases it again before returning.
 *
 * Returns TSDB_CODE_SUCCESS when the operator is already gone, otherwise the result of the lookup,
 * of tsem_post() or of taosReleaseRef().
 */
int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
  SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;

  taosMemoryFreeClear(pMsg->pEpSet);
  SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
  if (pExchangeInfo == NULL) {
    qWarn("failed to acquire exchange operator, since it may have been released, %p", pExchangeInfo);
    taosMemoryFree(pMsg->pData);
    return TSDB_CODE_SUCCESS;
  }

  int32_t          index = pWrapper->sourceIndex;
  SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);

  int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, index);
  if (pRpcHandle != NULL) {
    int32_t ret = asyncFreeConnById(pExchangeInfo->pTransporter, *pRpcHandle);
    if (ret != 0) {
      qDebug("failed to free rpc handle, code:%s, %p", tstrerror(ret), pExchangeInfo);
    }
    *pRpcHandle = -1;
  }

  if (!pSourceDataInfo) {
    int32_t lookupCode = terrno;
    // nobody can consume the payload, and the reference taken above must not be leaked
    taosMemoryFree(pMsg->pData);
    int32_t releaseCode = taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
    if (releaseCode != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(releaseCode));
    }
    return lookupCode;
  }

  if (code == TSDB_CODE_SUCCESS) {
    pSourceDataInfo->pRsp = pMsg->pData;

    // convert the header fields of the response in place
    SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
    pRsp->numOfRows = htobe64(pRsp->numOfRows);
    pRsp->compLen = htonl(pRsp->compLen);
    pRsp->payloadLen = htonl(pRsp->payloadLen);
    pRsp->numOfCols = htonl(pRsp->numOfCols);
    pRsp->useconds = htobe64(pRsp->useconds);
    pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);

    qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%" PRId64 ", %p", pSourceDataInfo->taskId, index,
           pRsp->numOfBlocks, pRsp->numOfRows, pExchangeInfo);
  } else {
    taosMemoryFree(pMsg->pData);
    pSourceDataInfo->code = rpcCvtErrCode(code);
    if (pSourceDataInfo->code != code) {
      qError("%s fetch rsp received, index:%d, error:%s, cvted error: %s, %p", pSourceDataInfo->taskId, index,
             tstrerror(code), tstrerror(pSourceDataInfo->code), pExchangeInfo);
    } else {
      qError("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code),
             pExchangeInfo);
    }
  }

  // publish pRsp/code before the READY status becomes visible to the consumer
  tmemory_barrier();
  pSourceDataInfo->status = EX_SOURCE_DATA_READY;
  code = tsem_post(&pExchangeInfo->ready);
  if (code != TSDB_CODE_SUCCESS) {
    qError("failed to invoke post when fetch rsp is ready, code:%s, %p", tstrerror(code), pExchangeInfo);
    // still drop the reference taken at the top of the callback
    int32_t releaseCode = taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
    if (releaseCode != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(releaseCode));
    }
    return code;
  }

  code = taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
  }
  return code;
}
586

587
/*
 * Allocate an operator param that wraps a table scan param.
 *
 * The scan param receives a duplicate of pUidList and the tableSeq flag; the outer param gets
 * srcOpType as its opType, downstream index 0 and no children. On success *ppRes owns both
 * allocations. On failure *ppRes is NULL and terrno is returned.
 */
int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, bool tableSeq) {
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (*ppRes == NULL) {
    return terrno;
  }

  STableScanOperatorParam* pScanParam = taosMemoryMalloc(sizeof(STableScanOperatorParam));
  if (pScanParam == NULL) {
    taosMemoryFreeClear(*ppRes);
    return terrno;
  }

  SArray* pUidCopy = taosArrayDup(pUidList, NULL);
  pScanParam->pUidList = pUidCopy;
  if (pUidCopy == NULL) {
    taosMemoryFree(pScanParam);
    taosMemoryFreeClear(*ppRes);
    return terrno;
  }
  pScanParam->tableSeq = tableSeq;

  SOperatorParam* pParam = *ppRes;
  pParam->opType = srcOpType;
  pParam->downstreamIdx = 0;
  pParam->value = pScanParam;
  pParam->pChildren = NULL;

  return TSDB_CODE_SUCCESS;
}
614

615
/*
 * Issue a fetch request for one downstream source of an exchange operator.
 *
 * Nothing is sent unless the source is in the NOT_READY state; in that case the source is moved to
 * STARTED and either
 *   - the task's local fetch function is invoked and its result is fed straight into
 *     loadRemoteDataCallback() (local execution), or
 *   - an SResFetchReq is serialized and sent asynchronously, with loadRemoteDataCallback()
 *     registered as response handler and the transport id recorded in pFetchRpcHandles.
 * A pending uid list of the source is converted into the request's operator param and consumed.
 *
 * Returns TSDB_CODE_SUCCESS or an error code. Failures while building the request are additionally
 * stored in pTaskInfo->code.
 */
int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
  if (!pDataInfo) {
    return terrno;
  }

  if (EX_SOURCE_DATA_NOT_READY != pDataInfo->status) {
    return TSDB_CODE_SUCCESS;
  }

  pDataInfo->status = EX_SOURCE_DATA_STARTED;
  SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
  if (!pSource) {
    return terrno;
  }

  pDataInfo->startTime = taosGetTimestampUs();
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);

  SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
  QUERY_CHECK_NULL(pWrapper, code, lino, _end, terrno);
  pWrapper->exchangeId = pExchangeInfo->self;
  pWrapper->sourceIndex = sourceIndex;

  if (pSource->localExec) {
    SDataBuf pBuf = {0};
    int32_t  fetchCode = (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->schedId,
                                                    pTaskInfo->id.queryId, pSource->clientId, pSource->taskId, 0,
                                                    pSource->execId, &pBuf.pData, pTaskInfo->localFetch.explainRes);
    // The callback result is stored in the function-level code so that a failure is actually
    // returned to the caller; the wrapper is only read by the callback and is released here on
    // both the success and the failure path.
    code = loadRemoteDataCallback(pWrapper, &pBuf, fetchCode);
    taosMemoryFree(pWrapper);
    QUERY_CHECK_CODE(code, lino, _end);
  } else {
    SResFetchReq req = {0};
    req.header.vgId = pSource->addr.nodeId;
    req.sId = pSource->schedId;
    req.clientId = pSource->clientId;
    req.taskId = pSource->taskId;
    req.queryId = pTaskInfo->id.queryId;
    req.execId = pSource->execId;
    if (pDataInfo->pSrcUidList) {
      code =
          buildTableScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->tableSeq);
      // the uid list is consumed by this request whether or not the param could be built
      taosArrayDestroy(pDataInfo->pSrcUidList);
      pDataInfo->pSrcUidList = NULL;
      if (TSDB_CODE_SUCCESS != code) {
        pTaskInfo->code = code;
        taosMemoryFree(pWrapper);
        return pTaskInfo->code;
      }
    }

    // first pass computes the serialized size, second pass fills the buffer
    int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req);
    if (msgSize < 0) {
      pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
      taosMemoryFree(pWrapper);
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
      return pTaskInfo->code;
    }

    void* msg = taosMemoryCalloc(1, msgSize);
    if (NULL == msg) {
      pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
      taosMemoryFree(pWrapper);
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
      return pTaskInfo->code;
    }

    if (tSerializeSResFetchReq(msg, msgSize, &req) < 0) {
      pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
      taosMemoryFree(pWrapper);
      taosMemoryFree(msg);
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
      return pTaskInfo->code;
    }

    freeOperatorParam(req.pOpParam, OP_GET_PARAM);

    qDebug("%s build fetch msg and send to vgId:%d, ep:%s, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
           ", execId:%d, %p, %d/%" PRIzu,
           GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->clientId,
           pSource->taskId, pSource->execId, pExchangeInfo, sourceIndex, totalSources);

    // send the fetch remote task result request
    SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
    if (NULL == pMsgSendInfo) {
      taosMemoryFreeClear(msg);
      taosMemoryFree(pWrapper);
      qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
      pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
      return pTaskInfo->code;
    }

    // from here on the wrapper and the message buffer travel with pMsgSendInfo
    pMsgSendInfo->param = pWrapper;
    pMsgSendInfo->paramFreeFp = taosMemoryFree;
    pMsgSendInfo->msgInfo.pData = msg;
    pMsgSendInfo->msgInfo.len = msgSize;
    pMsgSendInfo->msgType = pSource->fetchMsgType;
    pMsgSendInfo->fp = loadRemoteDataCallback;

    int64_t transporterId = 0;
    code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
    QUERY_CHECK_CODE(code, lino, _end);

    // remember the transport id for the response callback / destructor; the request is already in
    // flight, so a missing slot is tolerated rather than turned into an error
    int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, sourceIndex);
    if (pRpcHandle != NULL) {
      *pRpcHandle = transporterId;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
730

731
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs,
2,472,582✔
732
                          SOperatorInfo* pOperator) {
733
  pInfo->totalRows += numOfRows;
2,472,582✔
734
  pInfo->totalSize += dataLen;
2,472,582✔
735
  pInfo->totalElapsed += (taosGetTimestampUs() - startTs);
2,472,624✔
736
  pOperator->resultInfo.totalRows += numOfRows;
2,472,624✔
737
}
2,472,624✔
738

739
// Decode one serialized data block from a fetch response into pRes.
//
// Two layouts are handled:
//   * pColList == NULL: pData is decoded straight into pRes (after pRes is cleaned up). pNextStart is handed to
//     blockDecode() in this case only.
//   * pColList != NULL: pData starts with a 32-bit column count (converted with htonl), followed by that many
//     SSysTableSchema entries whose colId/bytes are converted in place (htons/htonl), followed by the encoded block.
//     The block is decoded into a temporary block and its columns are relocated into pRes according to pColList.
//     pNextStart is not written in this case.
//
// Returns TSDB_CODE_SUCCESS or the error code of the failing step. The temporary block is always released.
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SSDataBlock* pTmpBlock = NULL;

  if (pColList != NULL) {  // extract data according to pColList
    char* pCursor = pData;

    int32_t numOfCols = htonl(*(int32_t*)pCursor);
    pCursor += sizeof(int32_t);

    // todo refactor:extract method
    // the schema array is converted to host byte order in place, before anything else can fail
    SSysTableSchema* pSchema = (SSysTableSchema*)pCursor;
    for (int32_t i = 0; i < numOfCols; ++i) {
      SSysTableSchema* pCol = (SSysTableSchema*)pCursor;

      pCol->colId = htons(pCol->colId);
      pCol->bytes = htonl(pCol->bytes);
      pCursor += sizeof(SSysTableSchema);
    }

    code = createDataBlock(&pTmpBlock);
    QUERY_CHECK_CODE(code, lino, _end);

    // build the column layout of the temporary block from the received schema
    for (int32_t i = 0; i < numOfCols; ++i) {
      SColumnInfoData colInfo = createColumnInfoData(pSchema[i].type, pSchema[i].bytes, pSchema[i].colId);
      code = blockDataAppendColInfo(pTmpBlock, &colInfo);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    // pCursor now points just behind the schema array, i.e. at the encoded block
    const char* pUnused = NULL;
    code = blockDecode(pTmpBlock, pCursor, &pUnused);
    QUERY_CHECK_CODE(code, lino, _end);

    code = blockDataEnsureCapacity(pRes, pTmpBlock->info.rows);
    QUERY_CHECK_CODE(code, lino, _end);

    // data from mnode
    pRes->info.dataLoad = 1;
    pRes->info.rows = pTmpBlock->info.rows;
    pRes->info.scanFlag = MAIN_SCAN;
    code = relocateColumnData(pRes, pColList, pTmpBlock->pDataBlock, false);
    QUERY_CHECK_CODE(code, lino, _end);

    blockDataDestroy(pTmpBlock);
    pTmpBlock = NULL;
  } else {  // data from other sources
    blockDataCleanup(pRes);
    code = blockDecode(pRes, pData, (const char**)pNextStart);
    if (code) {
      // returned directly: no temporary block exists yet and nothing is logged here
      return code;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    blockDataDestroy(pTmpBlock);
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
800

801
// Log the accumulated load statistics (source count, rows, size in Kb, elapsed time in ms) and mark the
// operator as completed.
void setAllSourcesCompleted(SOperatorInfo* pOperator) {
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
  SExchangeInfo*       pExchangeInfo = pOperator->info;
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;

  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);

  qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 ", %.2f Kb, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
         pLoadInfo->totalElapsed / 1000.0);

  setOperatorCompleted(pOperator);
}
1,988,227✔
813

814
int32_t getCompletedSources(const SArray* pArray, int32_t* pRes) {
7,931,160✔
815
  int32_t code = TSDB_CODE_SUCCESS;
7,931,160✔
816
  int32_t lino = 0;
7,931,160✔
817
  size_t  total = taosArrayGetSize(pArray);
7,931,160✔
818

819
  int32_t completed = 0;
7,931,010✔
820
  for (int32_t k = 0; k < total; ++k) {
18,315,062✔
821
    SSourceDataInfo* p = taosArrayGet(pArray, k);
10,384,223✔
822
    QUERY_CHECK_NULL(p, code, lino, _end, terrno);
10,384,036!
823
    if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
10,384,052✔
824
      completed += 1;
5,134,815✔
825
    }
826
  }
827

828
  *pRes = completed;
7,930,839✔
829
_end:
7,930,839✔
830
  if (code != TSDB_CODE_SUCCESS) {
7,930,839!
831
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
832
  }
833
  return code;
7,930,757✔
834
}
835

836
// Send a fetch request to every entry of pSourceDataInfo without waiting for any response.
//
// On the first failed send the error is stored in pTaskInfo->code and returned. After all requests are sent the
// operator status becomes OP_RES_TO_RETURN and the open cost is recorded; if the task has been killed in the
// meantime, control leaves through T_LONG_JMP with pTaskInfo->code.
int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SExchangeInfo* pExchangeInfo = pOperator->info;

  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
  int64_t startTs = taosGetTimestampUs();

  // Asynchronously send all fetch requests to all sources.
  for (int32_t srcIdx = 0; srcIdx < totalSources; ++srcIdx) {
    int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, srcIdx);
    if (TSDB_CODE_SUCCESS != code) {
      pTaskInfo->code = code;
      return code;
    }
  }

  int64_t endTs = taosGetTimestampUs();
  qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTaskInfo),
         totalSources, (endTs - startTs) / 1000.0);

  pOperator->status = OP_RES_TO_RETURN;
  pOperator->cost.openCost = taosGetTimestampUs() - startTs;

  if (isTaskKilled(pTaskInfo)) {
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
  }

  return TSDB_CODE_SUCCESS;
}
864

865
// Decode every block carried by pDataInfo->pRsp and append the decoded blocks to pExchangeInfo->pResultBlockList.
//
// Each block in the response payload is laid out as: int32 compLen, int32 rawLen, then compLen bytes of data.
// When the response is flagged as compressed and compLen < rawLen, the data is first decompressed into
// pDataInfo->decompBuf, which is allocated/grown here to pRetrieveRsp->payloadLen bytes.
// Result blocks are taken from pExchangeInfo->pRecycledBlocks when available, otherwise created from pDummyBlock.
//
// Returns TSDB_CODE_SUCCESS or an error code. On a decode failure pDataInfo->pRsp is released here; a block that
// has not yet been pushed to the result list is destroyed on every error path.
int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo) {
  int32_t            code = TSDB_CODE_SUCCESS;
  int32_t            lino = 0;
  SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
  SSDataBlock*       pb = NULL;

  char* pNextStart = pRetrieveRsp->data;
  char* pStart = pNextStart;

  int32_t index = 0;

  if (pRetrieveRsp->compressed) {  // make sure the decompress buffer can hold payloadLen bytes
    if (pDataInfo->decompBuf == NULL) {
      pDataInfo->decompBuf = taosMemoryMalloc(pRetrieveRsp->payloadLen);
      QUERY_CHECK_NULL(pDataInfo->decompBuf, code, lino, _end, terrno);
      pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
    } else if (pDataInfo->decompBufSize < pRetrieveRsp->payloadLen) {
      // keep the old buffer until the realloc is known to have succeeded
      char* p = taosMemoryRealloc(pDataInfo->decompBuf, pRetrieveRsp->payloadLen);
      QUERY_CHECK_NULL(p, code, lino, _end, terrno);
      pDataInfo->decompBuf = p;
      pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
    }
  }

  while (index++ < pRetrieveRsp->numOfBlocks) {
    pStart = pNextStart;

    if (taosArrayGetSize(pExchangeInfo->pRecycledBlocks) > 0) {
      pb = *(SSDataBlock**)taosArrayPop(pExchangeInfo->pRecycledBlocks);
      blockDataCleanup(pb);
    } else {
      code = createOneDataBlock(pExchangeInfo->pDummyBlock, false, &pb);
      // report the real failure reason first; the NULL check is kept as a safety net
      QUERY_CHECK_CODE(code, lino, _end);
      QUERY_CHECK_NULL(pb, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
    }

    int32_t compLen = *(int32_t*)pStart;
    pStart += sizeof(int32_t);

    int32_t rawLen = *(int32_t*)pStart;
    pStart += sizeof(int32_t);
    // a non-positive compLen would make pNextStart stand still or move backwards
    QUERY_CHECK_CONDITION((compLen > 0 && compLen <= rawLen), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

    pNextStart = pStart + compLen;
    if (pRetrieveRsp->compressed && (compLen < rawLen)) {
      int32_t t = tsDecompressString(pStart, compLen, 1, pDataInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
      QUERY_CHECK_CONDITION((t == rawLen), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
      pStart = pDataInfo->decompBuf;
    }

    code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart);
    if (code != 0) {
      lino = __LINE__;  // so the error log below does not report "line 0"
      taosMemoryFreeClear(pDataInfo->pRsp);
      goto _end;
    }

    void* tmp = taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    pb = NULL;  // now referenced by the result list
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    blockDataDestroy(pb);
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
936

937
// Load data from the sources one at a time, starting at pExchangeInfo->current.
//
// For the current source a fetch request is sent and the call blocks in exchangeWait() until the response arrives.
//   * An empty response (numOfRows == 0) marks the source exhausted and the loop moves on to the next source.
//   * Otherwise the blocks are extracted into the result list, the load statistics are updated and the function
//     returns; a response flagged as completed additionally marks the source exhausted and advances `current`.
// When `current` has passed the last source, the operator is marked completed.
//
// Returns TSDB_CODE_SUCCESS, or an error code (also stored in the task) when the source reported an error or the
// blocks could not be extracted. Failures to look up the source, to send the request, or to wait, as well as a
// killed task, leave through T_LONG_JMP instead of returning.
int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
  SExchangeInfo* pExchangeInfo = pOperator->info;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  int32_t code = 0;
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);
  int64_t startTs = taosGetTimestampUs();

  while (1) {
    if (pExchangeInfo->current >= totalSources) {
      setAllSourcesCompleted(pOperator);
      return TSDB_CODE_SUCCESS;
    }

    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
    if (!pDataInfo) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      pTaskInfo->code = terrno;
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;

    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      pTaskInfo->code = code;
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }

    // block until the response for the current source has been delivered
    code = exchangeWait(pOperator, pExchangeInfo);
    if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }

    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
    if (!pSource) {
      // `code` is TSDB_CODE_SUCCESS here; the failure reason is in terrno, which is also what the task receives
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      pTaskInfo->code = terrno;
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }

    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
             tstrerror(pDataInfo->code));
      pOperator->pTaskInfo->code = pDataInfo->code;
      return pOperator->pTaskInfo->code;
    }

    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;

    if (pRsp->numOfRows == 0) {
      qDebug("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
             " execId:%d %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next",
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);

      // nothing to return from this source, try the next one
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
      pExchangeInfo->current += 1;
      taosMemoryFreeClear(pDataInfo->pRsp);
      continue;
    }

    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }

    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
    if (pRsp->completed == 1) {
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
             pExchangeInfo->current + 1, totalSources);

      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
      pExchangeInfo->current += 1;
    } else {
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
             pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
    }

    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;

    taosMemoryFreeClear(pDataInfo->pRsp);
    return TSDB_CODE_SUCCESS;
  }

_error:
  pTaskInfo->code = code;
  return code;
}
1034

1035
// Register (or re-arm) the exchange source that serves the vgroup named in pBasicParam.
//
// The source index entry is looked up in pHashSources by vgId.
//   * inUseIdx < 0: a new SSourceDataInfo is appended to pSourceDataInfo and its position is stored in inUseIdx.
//   * otherwise: the existing SSourceDataInfo is reused; an exhausted source is reset to EX_SOURCE_DATA_NOT_READY.
// In both cases the uid list is duplicated from pBasicParam and srcOpType/tableSeq are copied.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when the vgId is unknown, or terrno on allocation/array
// failures.
int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasicParam* pBasicParam) {
  SExchangeInfo*     pExchangeInfo = pOperator->info;
  SExchangeSrcIndex* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
  if (NULL == pIdx) {
    qError("No exchange source for vgId: %d", pBasicParam->vgId);
    return TSDB_CODE_INVALID_PARA;
  }

  if (pIdx->inUseIdx < 0) {
    SSourceDataInfo dataInfo = {0};
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
    dataInfo.taskId = pExchangeInfo->pTaskId;
    dataInfo.index = pIdx->srcIdx;
    dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
    if (dataInfo.pSrcUidList == NULL) {
      return terrno;
    }

    dataInfo.srcOpType = pBasicParam->srcOpType;
    dataInfo.tableSeq = pBasicParam->tableSeq;

    void* tmp = taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo);
    if (!tmp) {
      // capture the reason before any cleanup, and log the same code that is returned
      int32_t code = terrno;
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      // dataInfo was not copied into the array, so the duplicated uid list still belongs to this function
      taosArrayDestroy(dataInfo.pSrcUidList);
      return code;
    }
    pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
  } else {
    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
    if (!pDataInfo) {
      return terrno;
    }
    if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
      pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
    }
    // NOTE(review): presumably the previous pSrcUidList has already been released by its consumer - confirm
    pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
    if (pDataInfo->pSrcUidList == NULL) {
      return terrno;
    }

    pDataInfo->srcOpType = pBasicParam->srcOpType;
    pDataInfo->tableSeq = pBasicParam->tableSeq;
  }

  return TSDB_CODE_SUCCESS;
}
1081

1082
// Add the exchange source(s) described by the operator's pending get-param.
//
// The param either carries a batch of basic params (multiParams set; iterated from pBatch->pBatchs) or a single
// basic param. Each one is handed to addSingleExchangeSource().
//
// Returns the first error reported by addSingleExchangeSource(), or TSDB_CODE_SUCCESS. When a batch entry fails the
// function returns immediately and the get-param stays attached to the operator; in every other case the get-param
// is released and cleared before returning.
int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
  SExchangeInfo*               pExchangeInfo = pOperator->info;
  int32_t                      code = TSDB_CODE_SUCCESS;
  SExchangeOperatorBasicParam* pBasicParam = NULL;
  SExchangeOperatorParam*      pParam = (SExchangeOperatorParam*)pOperator->pOperatorGetParam->value;
  if (pParam->multiParams) {
    SExchangeOperatorBatchParam* pBatch = (SExchangeOperatorBatchParam*)pOperator->pOperatorGetParam->value;
    int32_t                      iter = 0;
    while (NULL != (pBasicParam = tSimpleHashIterate(pBatch->pBatchs, pBasicParam, &iter))) {
      code = addSingleExchangeSource(pOperator, pBasicParam);
      if (code) {
        return code;
      }
    }
  } else {
    pBasicParam = &pParam->basic;
    code = addSingleExchangeSource(pOperator, pBasicParam);
  }

  freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
  pOperator->pOperatorGetParam = NULL;

  // propagate a failure of the single-param path instead of reporting success unconditionally
  return code;
}
1106

1107
// Open the exchange operator: register dynamic sources if needed and, unless data is loaded sequentially,
// send the fetch requests to all sources.
//
// Nothing is done when a non-dynamic operator is already opened, or when a dynamic operator has no pending
// get-param. On failure the error is stored in the task and control leaves through T_LONG_JMP, so the only value
// ever returned is TSDB_CODE_SUCCESS.
int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
  SExchangeInfo* pExchangeInfo = pOperator->info;
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;

  if (pExchangeInfo->dynamicOp) {
    // a dynamic operator only has work to do when a new get-param is pending
    if (NULL == pOperator->pOperatorGetParam) {
      return TSDB_CODE_SUCCESS;
    }
  } else if (OPTR_IS_OPENED(pOperator)) {
    return TSDB_CODE_SUCCESS;
  }

  if (pExchangeInfo->dynamicOp) {
    code = addDynamicExchangeSource(pOperator);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  int64_t openStartTs = taosGetTimestampUs();

  if (!pExchangeInfo->seqLoadData) {
    code = prepareConcurrentlyLoad(pOperator);
    QUERY_CHECK_CODE(code, lino, _end);
    pExchangeInfo->openedTs = taosGetTimestampUs();
  }

  OPTR_SET_OPENED(pOperator);
  pOperator->cost.openCost = (taosGetTimestampUs() - openStartTs) / 1000.0;

_end:
  if (TSDB_CODE_SUCCESS != code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }
  return TSDB_CODE_SUCCESS;
}
1140

1141
// Apply the group offset/limit (soffset/slimit) and row limit/offset rules to pBlock.
//
// Returns PROJECT_RETRIEVE_CONTINUE when the caller should keep fetching (the block was skipped, became empty, or
// is still being accumulated) and PROJECT_RETRIEVE_DONE when the current output should be returned. The operator
// is marked done/completed when the group limit has been reached.
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  // step 1: skip whole groups while there is a group offset left
  if (pLimitInfo->remainGroupOffset > 0) {
    if (pLimitInfo->currentGroupId == 0) {  // it is the first group
      pLimitInfo->currentGroupId = pBlock->info.id.groupId;
      blockDataCleanup(pBlock);
      return PROJECT_RETRIEVE_CONTINUE;
    }

    if (pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
      // now it is the data from a new group
      pLimitInfo->remainGroupOffset -= 1;

      // ignore data block in current group
      if (pLimitInfo->remainGroupOffset > 0) {
        blockDataCleanup(pBlock);
        return PROJECT_RETRIEVE_CONTINUE;
      }
    }

    // set current group id of the project operator
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
  }

  // step 2: here check for a new group data, we need to handle the data of the previous group.
  bool isNewGroup = (pLimitInfo->currentGroupId != 0) && (pLimitInfo->currentGroupId != pBlock->info.id.groupId);
  if (isNewGroup) {
    pLimitInfo->numOfOutputGroups += 1;

    bool groupLimitReached =
        (pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups);
    if (groupLimitReached) {
      pOperator->status = OP_EXEC_DONE;
      blockDataCleanup(pBlock);

      return PROJECT_RETRIEVE_DONE;
    }

    // reset the value for a new group data
    resetLimitInfoForNextGroup(pLimitInfo);

    // existing rows that belongs to previous group.
    if (pBlock->info.rows > 0) {
      return PROJECT_RETRIEVE_DONE;
    }
  }

  // step 3: here we reach the start position, according to the limit/offset requirements.

  // set current group id
  pLimitInfo->currentGroupId = pBlock->info.id.groupId;

  bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pTaskInfo);
  if (pBlock->info.rows == 0) {
    return PROJECT_RETRIEVE_CONTINUE;
  }

  if (limitReached && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
    setOperatorCompleted(pOperator);
    return PROJECT_RETRIEVE_DONE;
  }

  // todo optimize performance
  // If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the
  // they may not belong to the same group the limit/offset value is not valid in this case.
  if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || hasSlimitOffsetInfo(pLimitInfo)) {
    return PROJECT_RETRIEVE_DONE;
  }

  // not full enough, continue to accumulate the output data in the buffer.
  return PROJECT_RETRIEVE_CONTINUE;
}
1206

1207
// Block on the exchange operator's `ready` semaphore.
//
// When the task has a worker callback, beforeBlocking() is invoked before the wait and
// afterRecoverFromBlocking() after it, both with the callback's pool. Any failing step stores its error code in
// the task and returns it; a failed semaphore wait is also logged.
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo) {
  SExecTaskInfo* pTask = pOperator->pTaskInfo;
  int32_t        code = TSDB_CODE_SUCCESS;

  if (pTask->pWorkerCb) {
    code = pTask->pWorkerCb->beforeBlocking(pTask->pWorkerCb->pPool);
    if (TSDB_CODE_SUCCESS != code) {
      goto _failed;
    }
  }

  code = tsem_wait(&pExchangeInfo->ready);
  if (TSDB_CODE_SUCCESS != code) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    goto _failed;
  }

  if (pTask->pWorkerCb) {
    code = pTask->pWorkerCb->afterRecoverFromBlocking(pTask->pWorkerCb->pPool);
    if (TSDB_CODE_SUCCESS != code) {
      goto _failed;
    }
  }

  return TSDB_CODE_SUCCESS;

_failed:
  // every failure is recorded in the task before it is reported to the caller
  pTask->code = code;
  return pTask->code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc