• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4696

29 Aug 2025 06:36AM UTC coverage: 58.25% (+0.2%) from 58.041%
#4696

push

travis-ci

web-flow
fix(gpt): fix race-condition in preparing tmp files (#32800)

133424 of 291873 branches covered (45.71%)

Branch coverage included in aggregate %.

5 of 34 new or added lines in 6 files covered. (14.71%)

444 existing lines in 69 files now uncovered.

201767 of 283561 relevant lines covered (71.15%)

17907122.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.56
/source/libs/executor/src/exchangeoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "operator.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "tdatablock.h"
23
#include "thash.h"
24
#include "tmsg.h"
25
#include "tref.h"
26
#include "trpc.h"
27

28
typedef struct SFetchRspHandleWrapper {
29
  uint32_t exchangeId;
30
  int32_t  sourceIndex;
31
  int64_t  seqId;
32
} SFetchRspHandleWrapper;
33

34
typedef struct SSourceDataInfo {
35
  int32_t            index;
36
  int64_t            seqId;
37
  SRWLatch           lock;
38
  SRetrieveTableRsp* pRsp;
39
  uint64_t           totalRows;
40
  int64_t            startTime;
41
  int32_t            code;
42
  EX_SOURCE_STATUS   status;
43
  const char*        taskId;
44
  SArray*            pSrcUidList;
45
  int32_t            srcOpType;
46
  bool               tableSeq;
47
  char*              decompBuf;
48
  int32_t            decompBufSize;
49
  SOrgTbInfo*        colMap;
50
  bool               isVtbRefScan;
51
  bool               isVtbTagScan;
52
  STimeWindow        window;
53
  bool               fetchSent; // need reset
54
} SSourceDataInfo;
55

56
static void destroyExchangeOperatorInfo(void* param);
57
static void freeBlock(void* pParam);
58
static void freeSourceDataInfo(void* param);
59
static void setAllSourcesCompleted(SOperatorInfo* pOperator);
60

61
static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code);
62
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex);
63
static int32_t getCompletedSources(const SArray* pArray, int32_t* pRes);
64
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator);
65
static int32_t seqLoadRemoteData(SOperatorInfo* pOperator);
66
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator);
67
static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock,
68
                                 bool holdDataInBuf);
69
static int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo);
70

71
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo);
72

73

74
static void streamConcurrentlyLoadRemoteData(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
33,682✔
75
                                           SExecTaskInfo* pTaskInfo) {
76
  int32_t code = 0;
33,682✔
77
  int32_t lino = 0;
67,364✔
78
  int64_t startTs = taosGetTimestampUs();  
33,683✔
79
  int32_t  totalSources = (int32_t)taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
33,683✔
80
  int32_t completed = 0;
33,684✔
81
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
33,684✔
82
  if (code != TSDB_CODE_SUCCESS) {
33,682!
83
    pTaskInfo->code = code;
×
84
    T_LONG_JMP(pTaskInfo->env, code);
×
85
  }
86
  if (completed == totalSources) {
33,682✔
87
    qDebug("%s no load since all sources completed, completed:%d, totalSources:%d", pTaskInfo->id.str, completed, totalSources);
9,309✔
88
    setAllSourcesCompleted(pOperator);
9,309✔
89
    return;
33,684✔
90
  }
91

92
  SSourceDataInfo* pDataInfo = NULL;
24,373✔
93

94
  while (1) {
11,731✔
95
    if (pExchangeInfo->current < 0) {
36,104!
96
      qDebug("current %d and all sources complted, totalSources:%d", pExchangeInfo->current, totalSources);
×
97
      setAllSourcesCompleted(pOperator);
×
98
      return;
×
99
    }
100
    
101
    if (pExchangeInfo->current >= totalSources) {
36,104✔
102
      completed = 0;
15,065✔
103
      code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
15,065✔
104
      if (code != TSDB_CODE_SUCCESS) {
15,063!
105
        pTaskInfo->code = code;
×
106
        T_LONG_JMP(pTaskInfo->env, code);
×
107
      }
108
      if (completed == totalSources) {
15,063✔
109
        qDebug("stop to load since all sources complted, completed:%d, totalSources:%d", completed, totalSources);
11,731✔
110
        setAllSourcesCompleted(pOperator);
11,731✔
111
        return;
11,731✔
112
      }
113
      
114
      pExchangeInfo->current = 0;
3,332✔
115
    }
116

117
    qDebug("%s start stream exchange %p idx:%d fetch", GET_TASKID(pTaskInfo), pExchangeInfo, pExchangeInfo->current);
24,371✔
118

119
    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
24,371✔
120
    if (!pDataInfo) {
24,371!
121
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
122
      pTaskInfo->code = terrno;
×
123
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
124
    }
125

126
    if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
24,371!
127
      pExchangeInfo->current++;
×
128
      continue;
×
129
    }
130

131
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
24,371✔
132

133
    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
24,371✔
134
    if (code != TSDB_CODE_SUCCESS) {
24,375!
135
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
136
      pTaskInfo->code = code;
×
137
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
138
    }
139

140
    while (true) {
×
141
      code = exchangeWait(pOperator, pExchangeInfo);
24,375✔
142
      if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
24,376!
143
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
144
      }
145

146
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
24,376✔
147
      if (pDataInfo->seqId != currSeqId) {
24,376!
148
        qDebug("%s seq rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", 
×
149
            GET_TASKID(pTaskInfo), pDataInfo->seqId, pExchangeInfo, currSeqId);
150
        taosMemoryFreeClear(pDataInfo->pRsp);
×
151
        continue;
×
152
      }
153

154
      break;
24,376✔
155
    }
156

157
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
24,376✔
158
    if (!pSource) {
24,376!
159
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
160
      pTaskInfo->code = terrno;
×
161
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
162
    }
163

164
    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
24,376!
165
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
×
166
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
167
             tstrerror(pDataInfo->code));
168
      pTaskInfo->code = pDataInfo->code;
×
169
      T_LONG_JMP(pTaskInfo->env, code);
×
170
    }
171

172
    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
24,376✔
173
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
24,376✔
174

175
    if (pRsp->numOfRows == 0) {
24,376✔
176
      qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
11,731✔
177
             " execId:%d idx %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", pDataInfo,
178
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
179
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);
180

181
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
11,731✔
182
      if (pDataInfo->isVtbRefScan || pDataInfo->isVtbTagScan) {
11,731!
183
        pExchangeInfo->current = -1;
×
184
      } else {
185
        pExchangeInfo->current += 1;
11,731✔
186
      }
187
      taosMemoryFreeClear(pDataInfo->pRsp);
11,731!
188
      continue;
11,731✔
189
    }
190

191
    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
12,645✔
192
    if (code != TSDB_CODE_SUCCESS) {
12,643!
193
      goto _exit;
×
194
    }
195

196
    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
12,643✔
197
    if (pRsp->completed == 1) {
12,643✔
198
      qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
9,308✔
199
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%d", pDataInfo,
200
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
201
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
202
             pExchangeInfo->current + 1, totalSources);
203

204
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
9,308✔
205
      if (pDataInfo->isVtbRefScan) {
9,308!
206
        pExchangeInfo->current = -1;
×
207
        taosMemoryFreeClear(pDataInfo->pRsp);
×
208
        continue;
×
209
      }
210
    } else {
211
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d idx:%d numOfRows:%" PRId64
3,335✔
212
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
213
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
214
             pExchangeInfo->current, pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
215
    }
216

217
    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
12,643✔
218
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;
12,643✔
219

220
    pExchangeInfo->current++;
12,643✔
221

222
    taosMemoryFreeClear(pDataInfo->pRsp);
12,643!
223
    return;
12,644✔
224
  }
225

226
_exit:
×
227

228
  if (code) {
×
229
    pTaskInfo->code = code;
×
230
    qError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
×
231
  }
232
}
233

234

235
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
6,864,016✔
236
                                           SExecTaskInfo* pTaskInfo) {
237
  int32_t code = 0;
6,864,016✔
238
  int32_t lino = 0;
6,864,016✔
239
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
6,864,016✔
240
  int32_t completed = 0;
6,862,542✔
241
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
6,862,542✔
242
  if (code != TSDB_CODE_SUCCESS) {
6,863,775!
243
    pTaskInfo->code = code;
×
244
    T_LONG_JMP(pTaskInfo->env, code);
×
245
  }
246
  if (completed == totalSources) {
6,863,775✔
247
    setAllSourcesCompleted(pOperator);
1,831,572✔
248
    return;
6,866,151✔
249
  }
250

251
  SSourceDataInfo* pDataInfo = NULL;
5,032,203✔
252

253
  while (1) {
249,385✔
254
    qDebug("prepare wait for ready, %p, %s", pExchangeInfo, GET_TASKID(pTaskInfo));
5,281,588✔
255
    code = exchangeWait(pOperator, pExchangeInfo);
5,281,589✔
256

257
    if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
5,284,415!
258
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
259
    }
260

261
    for (int32_t i = 0; i < totalSources; ++i) {
31,168,693✔
262
      pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
31,136,146✔
263
      QUERY_CHECK_NULL(pDataInfo, code, lino, _exit, terrno);
31,106,447!
264
      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
31,161,593✔
265
        continue;
24,958,133✔
266
      }
267

268
      if (pDataInfo->status != EX_SOURCE_DATA_READY) {
6,203,460✔
269
        continue;
926,169✔
270
      }
271

272
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
5,277,291✔
273
      if (pDataInfo->seqId != currSeqId) {
5,280,696!
274
        qDebug("concurrent rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pDataInfo->seqId, pExchangeInfo, currSeqId);
×
275
        taosMemoryFreeClear(pDataInfo->pRsp);
×
276
        break;
×
277
      }
278

279
      if (pDataInfo->code != TSDB_CODE_SUCCESS) {
5,280,696✔
280
        code = pDataInfo->code;
4✔
281
        TAOS_CHECK_EXIT(code);
4!
282
      }
283

284
      tmemory_barrier();
5,280,692✔
285
      SRetrieveTableRsp*     pRsp = pDataInfo->pRsp;
5,280,692✔
286
      SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
5,280,692✔
287
      QUERY_CHECK_NULL(pSource, code, lino, _exit, terrno);
5,281,129!
288

289
      // todo
290
      SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
5,281,776✔
291
      if (pRsp->numOfRows == 0) {
5,281,776✔
292
        if (NULL != pDataInfo->pSrcUidList && (!pDataInfo->isVtbRefScan)) {
1,807,567!
293
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
×
294
          code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
×
295
          if (code != TSDB_CODE_SUCCESS) {
×
296
            taosMemoryFreeClear(pDataInfo->pRsp);
×
297
            TAOS_CHECK_EXIT(code);
×
298
          }
299
        } else {
300
          pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
1,807,567✔
301
          qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
1,807,567✔
302
                 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", try next %d/%" PRIzu, pDataInfo,
303
                 GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
304
                 pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows, i + 1, totalSources);
305
          taosMemoryFreeClear(pDataInfo->pRsp);
1,807,565!
306
        }
307
        break;
1,807,726✔
308
      }
309

310
      TAOS_CHECK_EXIT(doExtractResultBlocks(pExchangeInfo, pDataInfo));
3,474,209!
311

312
      SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
3,475,515✔
313
      updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pDataInfo->startTime, pOperator);
3,475,515✔
314
      pDataInfo->totalRows += pRetrieveRsp->numOfRows;
3,475,755✔
315

316
      if (pRsp->completed == 1) {
3,475,755✔
317
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
3,194,875✔
318
        qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
3,194,875✔
319
               " execId:%d index:%d completed, blocks:%d, numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64
320
               ", totalRows:%" PRIu64 ", total:%.2f Kb, try next %d/%" PRIzu, pDataInfo,
321
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
322
               pRsp->numOfBlocks, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows,
323
               pLoadInfo->totalSize / 1024.0, i + 1, totalSources);
324
      } else {
325
        qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
280,880✔
326
               " execId:%d blocks:%d, numOfRows:%" PRId64 ", totalRows:%" PRIu64 ", total:%.2f Kb",
327
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
328
               pRsp->numOfBlocks, pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
329
      }
330

331
      taosMemoryFreeClear(pDataInfo->pRsp);
3,475,707!
332

333
      if ((pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED || NULL != pDataInfo->pSrcUidList) && !pDataInfo->isVtbRefScan) {
3,476,318!
334
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
280,853✔
335
        code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
280,853✔
336
        if (code != TSDB_CODE_SUCCESS) {
280,853!
337
          taosMemoryFreeClear(pDataInfo->pRsp);
×
338
          TAOS_CHECK_EXIT(code);
×
339
        }
340
      }
341
      
342
      return;
5,034,466✔
343
    }  // end loop
344

345
    int32_t complete1 = 0;
1,840,273✔
346
    code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &complete1);
1,840,273✔
347
    if (code != TSDB_CODE_SUCCESS) {
1,807,475!
UNCOV
348
      pTaskInfo->code = code;
×
UNCOV
349
      T_LONG_JMP(pTaskInfo->env, code);
×
350
    }
351
    if (complete1 == totalSources) {
1,807,527✔
352
      qDebug("all sources are completed, %s", GET_TASKID(pTaskInfo));
1,558,142✔
353
      return;
1,558,148✔
354
    }
355
  }
356

357
_exit:
4✔
358

359
  if (code) {
4!
360
    pTaskInfo->code = code;
4✔
361
    qError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
4!
362
  }
363
}
364

365
static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
14,120,745✔
366
  int32_t        code = TSDB_CODE_SUCCESS;
14,120,745✔
367
  SExchangeInfo* pExchangeInfo = pOperator->info;
14,120,745✔
368
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
14,120,745✔
369

370
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
14,120,745✔
371

372
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
14,118,044✔
373
  if (pOperator->status == OP_EXEC_DONE) {
14,118,044!
374
    qDebug("%s all %" PRIzu " source(s) are exhausted, total rows:%" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
×
375
           GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize,
376
           pLoadInfo->totalElapsed / 1000.0);
377
    return NULL;
×
378
  }
379

380
  // we have buffered retrieved datablock, return it directly
381
  SSDataBlock* p = NULL;
14,118,044✔
382
  if (taosArrayGetSize(pExchangeInfo->pResultBlockList) > 0) {
14,118,044✔
383
    p = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
7,219,825✔
384
    taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
7,219,574✔
385
  }
386

387
  if (p != NULL) {
14,118,659✔
388
    void* tmp = taosArrayPush(pExchangeInfo->pRecycledBlocks, &p);
7,219,887✔
389
    if (!tmp) {
7,219,979!
390
      code = terrno;
×
391
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
392
      pTaskInfo->code = code;
×
393
      T_LONG_JMP(pTaskInfo->env, code);
×
394
    }
395
    return p;
7,219,979✔
396
  } else {
397
    if (pExchangeInfo->seqLoadData) {
6,898,772✔
398
      code = seqLoadRemoteData(pOperator);
1,279✔
399
      if (code != TSDB_CODE_SUCCESS) {
1,279!
400
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
401
        pTaskInfo->code = code;
×
402
        T_LONG_JMP(pTaskInfo->env, code);
×
403
      }
404
    } else if (IS_STREAM_MODE(pOperator->pTaskInfo))   {
6,897,493✔
405
      streamConcurrentlyLoadRemoteData(pOperator, pExchangeInfo, pTaskInfo);
33,683✔
406
    } else {
407
      concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
6,863,810✔
408
    }
409
    if (TSDB_CODE_SUCCESS != pOperator->pTaskInfo->code) {
6,900,419✔
410
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(pOperator->pTaskInfo->code));
4!
411
      T_LONG_JMP(pTaskInfo->env, pOperator->pTaskInfo->code);
4!
412
    }
413
    
414
    if (taosArrayGetSize(pExchangeInfo->pResultBlockList) == 0) {
6,900,415✔
415
      qDebug("empty resultBlockList");
3,410,933✔
416
      return NULL;
3,410,939✔
417
    } else {
418
      p = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
3,487,793✔
419
      taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
3,487,019✔
420
      void* tmp = taosArrayPush(pExchangeInfo->pRecycledBlocks, &p);
3,488,163✔
421
      if (!tmp) {
3,488,978!
422
        code = terrno;
×
423
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
424
        pTaskInfo->code = code;
×
425
        T_LONG_JMP(pTaskInfo->env, code);
×
426
      }
427

428
      qDebug("block with rows:%" PRId64 " loaded", p->info.rows);
3,488,978✔
429
      return p;
3,488,942✔
430
    }
431
}
432
}
433

434
static int32_t loadRemoteDataNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
14,112,416✔
435
  int32_t        code = TSDB_CODE_SUCCESS;
14,112,416✔
436
  int32_t        lino = 0;
14,112,416✔
437
  SExchangeInfo* pExchangeInfo = pOperator->info;
14,112,416✔
438
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
14,112,416✔
439

440
  qDebug("%s start to load from exchange %p", pTaskInfo->id.str, pExchangeInfo);
14,112,416✔
441

442
  code = pOperator->fpSet._openFn(pOperator);
14,112,509✔
443
  QUERY_CHECK_CODE(code, lino, _end);
14,112,654!
444

445
  if (pOperator->status == OP_EXEC_DONE) {
14,112,654✔
446
    (*ppRes) = NULL;
39✔
447
    return code;
39✔
448
  }
449

450
  while (1) {
8,213✔
451
    SSDataBlock* pBlock = doLoadRemoteDataImpl(pOperator);
14,120,828✔
452
    if (pBlock == NULL) {
14,118,530✔
453
      (*ppRes) = NULL;
3,411,054✔
454
      return code;
3,411,054✔
455
    }
456

457
    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
10,707,476✔
458
    QUERY_CHECK_CODE(code, lino, _end);
10,708,643!
459

460
    if (blockDataGetNumOfRows(pBlock) == 0) {
10,708,643✔
461
      qDebug("rows 0 block got, continue next load");
2!
462
      continue;
2✔
463
    }
464

465
    SLimitInfo* pLimitInfo = &pExchangeInfo->limitInfo;
10,708,596✔
466
    if (hasLimitOffsetInfo(pLimitInfo)) {
10,708,596✔
467
      int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock, false);
18,711✔
468
      if (status == PROJECT_RETRIEVE_CONTINUE) {
18,711✔
469
        qDebug("limit retrieve continue");
8,211✔
470
        continue;
8,211✔
471
      } else if (status == PROJECT_RETRIEVE_DONE) {
10,500!
472
        if (pBlock->info.rows == 0) {
10,500!
473
          setOperatorCompleted(pOperator);
×
474
          (*ppRes) = NULL;
×
475
          return code;
×
476
        } else {
477
          (*ppRes) = pBlock;
10,500✔
478
          return code;
10,500✔
479
        }
480
      }
481
    } else {
482
      (*ppRes) = pBlock;
10,689,857✔
483
      qDebug("block with rows %" PRId64 " returned in exechange", pBlock->info.rows);
10,689,857✔
484
      return code;
10,689,843✔
485
    }
486
  }
487

488
_end:
×
489

490
  if (code != TSDB_CODE_SUCCESS) {
×
491
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
492
    pTaskInfo->code = code;
×
493
    T_LONG_JMP(pTaskInfo->env, code);
×
494
  } else {
495
    qDebug("empty block returned in exchange");
×
496
  }
497
  
498
  (*ppRes) = NULL;
×
499
  return code;
×
500
}
501

502
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
3,457,881✔
503
  pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
3,457,881✔
504
  if (pInfo->pSourceDataInfo == NULL) {
3,457,661✔
505
    return terrno;
1✔
506
  }
507

508
  if (pInfo->dynamicOp) {
3,457,660✔
509
    return TSDB_CODE_SUCCESS;
12,136✔
510
  }
511

512
  int32_t len = strlen(id) + 1;
3,445,524✔
513
  pInfo->pTaskId = taosMemoryCalloc(1, len);
3,445,524!
514
  if (!pInfo->pTaskId) {
3,445,731!
515
    return terrno;
×
516
  }
517
  tstrncpy(pInfo->pTaskId, id, len);
3,445,731✔
518
  for (int32_t i = 0; i < numOfSources; ++i) {
8,450,950✔
519
    SSourceDataInfo dataInfo = {0};
5,005,178✔
520
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
5,005,178✔
521
    dataInfo.taskId = pInfo->pTaskId;
5,005,178✔
522
    dataInfo.index = i;
5,005,178✔
523
    SSourceDataInfo* pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
5,005,178✔
524
    if (pDs == NULL) {
5,005,212!
525
      taosArrayDestroyEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
×
526
      return terrno;
×
527
    }
528
    qDebug("init source data info %d, pDs:%p, status:%d", i, pDs, pDs->status);
5,005,212✔
529
  }
530

531
  return TSDB_CODE_SUCCESS;
3,445,772✔
532
}
533

534
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo, const char* id) {
3,457,792✔
535
  size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);
3,457,792!
536

537
  if (numOfSources == 0) {
3,457,792!
538
    qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
×
539
    return TSDB_CODE_INVALID_PARA;
×
540
  }
541
  pInfo->pFetchRpcHandles = taosArrayInit(numOfSources, sizeof(int64_t));
3,457,792✔
542
  if (!pInfo->pFetchRpcHandles) {
3,457,873!
543
    return terrno;
×
544
  }
545
  void* ret = taosArrayReserve(pInfo->pFetchRpcHandles, numOfSources);
3,457,873✔
546
  if (!ret) {
3,457,811!
547
    return terrno;
×
548
  }
549

550
  pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
3,457,811✔
551
  if (pInfo->pSources == NULL) {
3,457,658!
552
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
553
    return terrno;
×
554
  }
555

556
  if (pExNode->node.dynamicOp) {
3,457,658✔
557
    pInfo->pHashSources = tSimpleHashInit(numOfSources * 2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
12,136✔
558
    if (NULL == pInfo->pHashSources) {
12,136!
559
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
560
      return terrno;
×
561
    }
562
  }
563

564
  for (int32_t i = 0; i < numOfSources; ++i) {
8,487,010✔
565
    SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
5,029,167✔
566
    if (!pNode) {
5,029,493!
567
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
568
      return terrno;
×
569
    }
570
    void* tmp = taosArrayPush(pInfo->pSources, pNode);
5,029,493✔
571
    if (!tmp) {
5,029,315!
572
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
573
      return terrno;
×
574
    }
575
    SExchangeSrcIndex idx = {.srcIdx = i, .inUseIdx = -1};
5,029,315✔
576
    int32_t           code =
577
        tSimpleHashPut(pInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
5,029,315✔
578
    if (pInfo->pHashSources && code != TSDB_CODE_SUCCESS) {
5,029,352!
579
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
580
      return code;
×
581
    }
582
  }
583

584
  initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);
3,457,843✔
585
  int64_t refId = taosAddRef(exchangeObjRefPool, pInfo);
3,457,819✔
586
  if (refId < 0) {
3,457,909!
587
    int32_t code = terrno;
×
588
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
589
    return code;
×
590
  } else {
591
    pInfo->self = refId;
3,457,909✔
592
  }
593

594
  return initDataSource(numOfSources, pInfo, id);
3,457,909✔
595
}
596

597
int32_t resetExchangeOperState(SOperatorInfo* pOper) {
21,333✔
598
  SExchangeInfo* pInfo = pOper->info;
21,333✔
599
  SExchangePhysiNode* pPhynode = (SExchangePhysiNode*)pOper->pPhyNode;
21,333✔
600

601
  qDebug("%s reset exchange op:%p info:%p", pOper->pTaskInfo->id.str, pOper, pInfo);
21,333✔
602

603
  atomic_add_fetch_64(&pInfo->seqId, 1);
21,333✔
604
  pOper->status = OP_NOT_OPENED;
21,333✔
605
  pInfo->current = 0;
21,333✔
606
  pInfo->loadInfo.totalElapsed = 0;
21,333✔
607
  pInfo->loadInfo.totalRows = 0;
21,333✔
608
  pInfo->loadInfo.totalSize = 0;
21,333✔
609
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pSourceDataInfo); ++i) {
42,666✔
610
    SSourceDataInfo* pDataInfo = taosArrayGet(pInfo->pSourceDataInfo, i);
21,333✔
611
    taosWLockLatch(&pDataInfo->lock);
21,333✔
612
    taosMemoryFreeClear(pDataInfo->decompBuf);
21,333!
613
    taosMemoryFreeClear(pDataInfo->pRsp);
21,333!
614

615
    pDataInfo->totalRows = 0;
21,333✔
616
    pDataInfo->code = 0;
21,333✔
617
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
21,333✔
618
    pDataInfo->fetchSent = false;
21,333✔
619
    taosWUnLockLatch(&pDataInfo->lock);
21,333✔
620
  }
621

622
  if (pInfo->dynamicOp) {
21,333!
623
    taosArrayClearEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
×
624
  } 
625

626
  taosArrayClearEx(pInfo->pResultBlockList, freeBlock);
21,333✔
627
  taosArrayClearEx(pInfo->pRecycledBlocks, freeBlock);
21,333✔
628

629
  blockDataCleanup(pInfo->pDummyBlock);
21,333✔
630

631
  void   *data = NULL;
21,333✔
632
  int32_t iter = 0;
21,333✔
633
  while ((data = tSimpleHashIterate(pInfo->pHashSources, data, &iter))) {
21,333!
634
    ((SExchangeSrcIndex *)data)->inUseIdx = -1;
×
635
  }
636
  
637
  pInfo->limitInfo = (SLimitInfo){0};
21,333✔
638
  initLimitInfo(pPhynode->node.pLimit, pPhynode->node.pSlimit, &pInfo->limitInfo);
21,333✔
639

640
  return 0;
21,333✔
641
}
642

643
int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo,
3,457,867✔
644
                                   SOperatorInfo** pOptrInfo) {
645
  QRY_PARAM_CHECK(pOptrInfo);
3,457,867!
646

647
  int32_t        code = 0;
3,457,867✔
648
  int32_t        lino = 0;
3,457,867✔
649
  SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
3,457,867!
650
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
3,457,783!
651
  if (pInfo == NULL || pOperator == NULL) {
3,457,778!
652
    code = terrno;
1✔
653
    goto _error;
×
654
  }
655

656
  pOperator->pPhyNode = pExNode;
3,457,777✔
657
  pInfo->dynamicOp = pExNode->node.dynamicOp;
3,457,777✔
658
  code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo));
3,457,777✔
659
  QUERY_CHECK_CODE(code, lino, _error);
3,457,798!
660

661
  code = tsem_init(&pInfo->ready, 0, 0);
3,457,798✔
662
  QUERY_CHECK_CODE(code, lino, _error);
3,457,776!
663

664
  pInfo->pDummyBlock = createDataBlockFromDescNode(pExNode->node.pOutputDataBlockDesc);
3,457,776✔
665
  QUERY_CHECK_NULL(pInfo->pDummyBlock, code, lino, _error, terrno);
3,457,852!
666

667
  pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES);
3,457,852✔
668
  QUERY_CHECK_NULL(pInfo->pResultBlockList, code, lino, _error, terrno);
3,457,758!
669
  pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES);
3,457,758✔
670
  QUERY_CHECK_NULL(pInfo->pRecycledBlocks, code, lino, _error, terrno);
3,457,724!
671

672
  SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self};
3,457,724✔
673
  code = qAppendTaskStopInfo(pTaskInfo, &stopInfo);
3,457,724✔
674
  QUERY_CHECK_CODE(code, lino, _error);
3,457,903!
675

676
  pInfo->seqLoadData = pExNode->seqRecvData;
3,457,903✔
677
  pInfo->dynTbname = pExNode->dynTbname;
3,457,903✔
678
  if (pInfo->dynTbname) {
3,457,903✔
679
    pInfo->seqLoadData = true;
6✔
680
  }
681
  pInfo->pTransporter = pTransporter;
3,457,903✔
682

683
  setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo,
3,457,903✔
684
                  pTaskInfo);
685
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
3,457,845✔
686

687
  code = filterInitFromNode((SNode*)pExNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
3,457,541✔
688
                            pTaskInfo->pStreamRuntimeInfo);
3,457,541✔
689
  QUERY_CHECK_CODE(code, lino, _error);
3,457,572!
690
  qTrace("%s exchange op:%p", __func__, pOperator);
3,457,572!
691
  pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteDataNext, NULL, destroyExchangeOperatorInfo,
3,457,572✔
692
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
693
  setOperatorResetStateFn(pOperator, resetExchangeOperState);
3,457,828✔
694
  *pOptrInfo = pOperator;
3,457,817✔
695
  return TSDB_CODE_SUCCESS;
3,457,817✔
696

697
_error:
×
698
  if (code != TSDB_CODE_SUCCESS) {
×
699
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
700
    pTaskInfo->code = code;
×
701
  }
702
  if (pInfo != NULL) {
×
703
    doDestroyExchangeOperatorInfo(pInfo);
×
704
  }
705

706
  if (pOperator != NULL) {
×
707
    pOperator->info = NULL;
×
708
    destroyOperator(pOperator);
×
709
  }
710
  return code;
×
711
}
712

713
void destroyExchangeOperatorInfo(void* param) {
3,457,885✔
714
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
3,457,885✔
715
  int32_t        code = taosRemoveRef(exchangeObjRefPool, pExInfo->self);
3,457,885✔
716
  if (code != TSDB_CODE_SUCCESS) {
3,457,948!
717
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
718
  }
719
}
3,457,948✔
720

721
void freeBlock(void* pParam) {
5,195,545✔
722
  SSDataBlock* pBlock = *(SSDataBlock**)pParam;
5,195,545✔
723
  blockDataDestroy(pBlock);
5,195,545✔
724
}
5,195,591✔
725

726
void freeSourceDataInfo(void* p) {
5,006,512✔
727
  SSourceDataInfo* pInfo = (SSourceDataInfo*)p;
5,006,512✔
728
  taosMemoryFreeClear(pInfo->decompBuf);
5,006,512!
729
  taosMemoryFreeClear(pInfo->pRsp);
5,006,512!
730

731
  pInfo->decompBufSize = 0;
5,006,512✔
732
}
5,006,512✔
733

734
void doDestroyExchangeOperatorInfo(void* param) {
3,457,918✔
735
  if (param == NULL) {
3,457,918!
736
    return;
×
737
  }
738
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
3,457,918✔
739
  if (pExInfo->pFetchRpcHandles) {
3,457,918!
740
    for (int32_t i = 0; i < pExInfo->pFetchRpcHandles->size; ++i) {
8,485,021✔
741
      int64_t* pRpcHandle = taosArrayGet(pExInfo->pFetchRpcHandles, i);
5,027,647✔
742
      if (*pRpcHandle > 0) {
5,027,081✔
743
        SDownstreamSourceNode* pSource = taosArrayGet(pExInfo->pSources, i);
3,938✔
744
        (void)asyncFreeConnById(pExInfo->pTransporter, *pRpcHandle);
3,938✔
745
      }
746
    }
747
    taosArrayDestroy(pExInfo->pFetchRpcHandles);
3,457,374✔
748
  }
749

750
  taosArrayDestroy(pExInfo->pSources);
3,457,841✔
751
  taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo);
3,457,895✔
752

753
  taosArrayDestroyEx(pExInfo->pResultBlockList, freeBlock);
3,457,854✔
754
  taosArrayDestroyEx(pExInfo->pRecycledBlocks, freeBlock);
3,457,880✔
755

756
  blockDataDestroy(pExInfo->pDummyBlock);
3,457,908✔
757
  tSimpleHashCleanup(pExInfo->pHashSources);
3,457,936✔
758

759
  int32_t code = tsem_destroy(&pExInfo->ready);
3,457,925✔
760
  if (code != TSDB_CODE_SUCCESS) {
3,457,924!
761
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
762
  }
763
  taosMemoryFreeClear(pExInfo->pTaskId);
3,457,924!
764

765
  taosMemoryFreeClear(param);
3,457,911!
766
}
767

768
int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
5,307,098✔
769
  SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;
5,307,098✔
770

771
  taosMemoryFreeClear(pMsg->pEpSet);
5,307,098!
772
  SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
5,307,098✔
773
  if (pExchangeInfo == NULL) {
5,308,706✔
774
    qWarn("failed to acquire exchange operator, since it may have been released, %p", pExchangeInfo);
18!
775
    taosMemoryFree(pMsg->pData);
18!
776
    return TSDB_CODE_SUCCESS;
18✔
777
  }
778

779
  int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
5,308,688✔
780
  if (pWrapper->seqId != currSeqId) {
5,305,734!
781
    qDebug("rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pWrapper->seqId, pExchangeInfo, currSeqId);
×
782
    taosMemoryFree(pMsg->pData);
×
783
    code = taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
×
784
    if (code != TSDB_CODE_SUCCESS) {
×
785
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
786
    }
787
    return TSDB_CODE_SUCCESS;
×
788
  }
789

790
  int32_t          index = pWrapper->sourceIndex;
5,305,734✔
791

792
  qDebug("%s exchange %p %dth source got rsp, code:%d, rsp:%p", pExchangeInfo->pTaskId, pExchangeInfo, index, code, pMsg->pData);
5,305,734✔
793

794
  int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, index);
5,305,743✔
795
  if (pRpcHandle != NULL) {
5,278,184✔
796
    int32_t ret = asyncFreeConnById(pExchangeInfo->pTransporter, *pRpcHandle);
5,277,965✔
797
    if (ret != 0) {
5,308,193✔
798
      qDebug("failed to free rpc handle, code:%s, %p", tstrerror(ret), pExchangeInfo);
3,815✔
799
    }
800
    *pRpcHandle = -1;
5,307,852✔
801
  }
802

803
  SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);
5,308,071✔
804
  if (!pSourceDataInfo) {
5,280,313!
805
    return terrno;
×
806
  }
807

808
  if (0 == code && NULL == pMsg->pData) {
5,280,313!
809
    qError("invalid rsp msg, msgType:%d, len:%d", pMsg->msgType, pMsg->len);
×
810
    code = TSDB_CODE_QRY_INVALID_MSG;
×
811
  }
812

813
  taosWLockLatch(&pSourceDataInfo->lock);
5,280,313✔
814
  if (code == TSDB_CODE_SUCCESS) {
5,305,083!
815
    pSourceDataInfo->seqId = pWrapper->seqId;
5,305,159✔
816
    pSourceDataInfo->pRsp = pMsg->pData;
5,305,159✔
817

818
    SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
5,305,159✔
819
    pRsp->numOfRows = htobe64(pRsp->numOfRows);
5,305,159✔
820
    pRsp->compLen = htonl(pRsp->compLen);
5,299,245✔
821
    pRsp->payloadLen = htonl(pRsp->payloadLen);
5,299,245✔
822
    pRsp->numOfCols = htonl(pRsp->numOfCols);
5,299,245✔
823
    pRsp->useconds = htobe64(pRsp->useconds);
5,299,245✔
824
    pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);
5,304,104✔
825

826
    qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%" PRId64 ", %p", pSourceDataInfo->taskId, index,
5,304,104✔
827
           pRsp->numOfBlocks, pRsp->numOfRows, pExchangeInfo);
828
  } else {
829
    taosMemoryFree(pMsg->pData);
×
830
    pSourceDataInfo->code = rpcCvtErrCode(code);
4✔
831
    if (pSourceDataInfo->code != code) {
4!
832
      qError("%s fetch rsp received, index:%d, error:%s, cvted error: %s, %p", pSourceDataInfo->taskId, index,
×
833
             tstrerror(code), tstrerror(pSourceDataInfo->code), pExchangeInfo);
834
    } else {
835
      qError("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code),
4!
836
             pExchangeInfo);
837
    }
838
  }
839

840
  tmemory_barrier();
5,304,121✔
841
  pSourceDataInfo->status = EX_SOURCE_DATA_READY;
5,304,121✔
842
  taosWUnLockLatch(&pSourceDataInfo->lock);
5,304,121✔
843
  
844
  code = tsem_post(&pExchangeInfo->ready);
5,307,316✔
845
  if (code != TSDB_CODE_SUCCESS) {
5,308,814!
846
    qError("failed to invoke post when fetch rsp is ready, code:%s, %p", tstrerror(code), pExchangeInfo);
×
847
    return code;
×
848
  }
849

850
  code = taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
5,308,814✔
851
  if (code != TSDB_CODE_SUCCESS) {
5,309,264!
852
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
853
  }
854
  return code;
5,309,324✔
855
}
856

857
int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, bool tableSeq) {
1,009✔
858
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
1,009!
859
  if (NULL == *ppRes) {
1,009!
860
    return terrno;
×
861
  }
862

863
  STableScanOperatorParam* pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
1,009!
864
  if (NULL == pScan) {
1,009!
865
    taosMemoryFreeClear(*ppRes);
×
866
    return terrno;
×
867
  }
868

869
  pScan->pUidList = taosArrayDup(pUidList, NULL);
1,009✔
870
  if (NULL == pScan->pUidList) {
1,009!
871
    taosMemoryFree(pScan);
×
872
    taosMemoryFreeClear(*ppRes);
×
873
    return terrno;
×
874
  }
875
  pScan->tableSeq = tableSeq;
1,009✔
876
  pScan->pOrgTbInfo = NULL;
1,009✔
877
  pScan->window.skey = INT64_MAX;
1,009✔
878
  pScan->window.ekey = INT64_MIN;
1,009✔
879

880
  (*ppRes)->opType = srcOpType;
1,009✔
881
  (*ppRes)->downstreamIdx = 0;
1,009✔
882
  (*ppRes)->value = pScan;
1,009✔
883
  (*ppRes)->pChildren = NULL;
1,009✔
884
  (*ppRes)->reUse = false;
1,009✔
885

886
  return TSDB_CODE_SUCCESS;
1,009✔
887
}
888

889
int32_t buildTableScanOperatorParamEx(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, SOrgTbInfo *pMap, bool tableSeq, STimeWindow *window) {
228✔
890
  int32_t                  code = TSDB_CODE_SUCCESS;
228✔
891
  int32_t                  lino = 0;
228✔
892
  STableScanOperatorParam* pScan = NULL;
228✔
893

894
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
228!
895
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
228!
896

897
  pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
228!
898
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);
228!
899

900
  pScan->pUidList = taosArrayDup(pUidList, NULL);
228✔
901
  QUERY_CHECK_NULL(pScan->pUidList, code, lino, _return, terrno);
228!
902

903
  pScan->pOrgTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
228!
904
  QUERY_CHECK_NULL(pScan->pOrgTbInfo, code, lino, _return, terrno);
228!
905

906
  pScan->pOrgTbInfo->vgId = pMap->vgId;
228✔
907
  tstrncpy(pScan->pOrgTbInfo->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);
228✔
908

909
  pScan->pOrgTbInfo->colMap = taosArrayDup(pMap->colMap, NULL);
228✔
910
  QUERY_CHECK_NULL(pScan->pOrgTbInfo->colMap, code, lino, _return, terrno);
228!
911

912
  pScan->tableSeq = tableSeq;
228✔
913
  pScan->window.skey = window->skey;
228✔
914
  pScan->window.ekey = window->ekey;
228✔
915

916
  (*ppRes)->opType = srcOpType;
228✔
917
  (*ppRes)->downstreamIdx = 0;
228✔
918
  (*ppRes)->value = pScan;
228✔
919
  (*ppRes)->pChildren = NULL;
228✔
920
  (*ppRes)->reUse = false;
228✔
921

922
  return code;
228✔
923
_return:
×
924
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
925
  taosMemoryFreeClear(*ppRes);
×
926
  if (pScan) {
×
927
    taosArrayDestroy(pScan->pUidList);
×
928
    if (pScan->pOrgTbInfo) {
×
929
      taosArrayDestroy(pScan->pOrgTbInfo->colMap);
×
930
      taosMemoryFreeClear(pScan->pOrgTbInfo);
×
931
    }
932
    taosMemoryFree(pScan);
×
933
  }
934
  return code;
×
935
}
936

937
int32_t buildTagScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType) {
24✔
938
  int32_t                  code = TSDB_CODE_SUCCESS;
24✔
939
  int32_t                  lino = 0;
24✔
940
  STagScanOperatorParam*   pScan = NULL;
24✔
941

942
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
24!
943
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
24!
944

945
  pScan = taosMemoryMalloc(sizeof(STagScanOperatorParam));
24!
946
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);
24!
947
  pScan->vcUid = *(tb_uid_t*)taosArrayGet(pUidList, 0);
24✔
948

949
  (*ppRes)->opType = srcOpType;
24✔
950
  (*ppRes)->downstreamIdx = 0;
24✔
951
  (*ppRes)->value = pScan;
24✔
952
  (*ppRes)->pChildren = NULL;
24✔
953
  (*ppRes)->reUse = false;
24✔
954

955
  return code;
24✔
956
_return:
×
957
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
958
  taosMemoryFreeClear(*ppRes);
×
959
  if (pScan) {
×
960
    taosMemoryFree(pScan);
×
961
  }
962
  return code;
×
963
}
964

965
int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
5,307,844✔
966
  int32_t          code = TSDB_CODE_SUCCESS;
5,307,844✔
967
  int32_t          lino = 0;
5,307,844✔
968
  SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
5,307,844✔
969
  if (!pDataInfo) {
5,305,283!
UNCOV
970
    return terrno;
×
971
  }
972

973
  if (EX_SOURCE_DATA_NOT_READY != pDataInfo->status) {
5,305,352!
974
    return TSDB_CODE_SUCCESS;
×
975
  }
976

977
  pDataInfo->status = EX_SOURCE_DATA_STARTED;
5,305,352✔
978
  SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
5,305,352✔
979
  if (!pSource) {
5,303,706!
980
    return terrno;
×
981
  }
982

983
  pDataInfo->startTime = taosGetTimestampUs();
5,307,529✔
984
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
5,307,529✔
985

986
  SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
5,308,810!
987
  QUERY_CHECK_NULL(pWrapper, code, lino, _end, terrno);
5,310,701!
988
  pWrapper->exchangeId = pExchangeInfo->self;
5,310,701✔
989
  pWrapper->sourceIndex = sourceIndex;
5,310,701✔
990
  pWrapper->seqId = pExchangeInfo->seqId;
5,310,701✔
991

992
  if (pSource->localExec) {
5,310,701!
993
    SDataBuf pBuf = {0};
×
994
    int32_t  code = (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->sId, pTaskInfo->id.queryId,
×
995
                                               pSource->clientId, pSource->taskId, 0, pSource->execId, &pBuf.pData,
996
                                               pTaskInfo->localFetch.explainRes);
997
    code = loadRemoteDataCallback(pWrapper, &pBuf, code);
×
998
    taosMemoryFree(pWrapper);
×
999
    QUERY_CHECK_CODE(code, lino, _end);
×
1000
  } else {
1001
    SResFetchReq req = {0};
5,310,701✔
1002
    req.header.vgId = pSource->addr.nodeId;
5,310,701✔
1003
    req.sId = pSource->sId;
5,310,701✔
1004
    req.clientId = pSource->clientId;
5,310,701✔
1005
    req.taskId = pSource->taskId;
5,310,701✔
1006
    req.queryId = pTaskInfo->id.queryId;
5,310,701✔
1007
    req.execId = pSource->execId;
5,310,701✔
1008
    if (pTaskInfo->pStreamRuntimeInfo) {
5,310,701✔
1009
      req.dynTbname = pExchangeInfo->dynTbname;
24,382✔
1010
      req.execId = pTaskInfo->pStreamRuntimeInfo->execId;
24,382✔
1011
      if (pSource->fetchMsgType == TDMT_STREAM_FETCH_FROM_RUNNER)
24,382!
1012
        qDebug("doSendFetchDataRequest to execId:%d, %p", req.execId, pTaskInfo->pStreamRuntimeInfo);
×
1013
      req.pStRtFuncInfo = &pTaskInfo->pStreamRuntimeInfo->funcInfo;
24,469✔
1014
      if (!pDataInfo->fetchSent) {
24,469✔
1015
        req.reset = pDataInfo->fetchSent = true;
21,046✔
1016
      }
1017
    }
1018
    if (pDataInfo->isVtbRefScan) {
5,310,788✔
1019
      code = buildTableScanOperatorParamEx(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->colMap, pDataInfo->tableSeq, &pDataInfo->window);
228✔
1020
      taosArrayDestroy(pDataInfo->colMap->colMap);
228✔
1021
      taosMemoryFreeClear(pDataInfo->colMap);
228!
1022
      taosArrayDestroy(pDataInfo->pSrcUidList);
228✔
1023
      pDataInfo->pSrcUidList = NULL;
228✔
1024
      if (TSDB_CODE_SUCCESS != code) {
228!
1025
        pTaskInfo->code = code;
×
1026
        taosMemoryFree(pWrapper);
×
1027
        return pTaskInfo->code;
×
1028
      }
1029
    } else if (pDataInfo->isVtbTagScan) {
5,310,560✔
1030
      code = buildTagScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType);
24✔
1031
      taosArrayDestroy(pDataInfo->pSrcUidList);
24✔
1032
      pDataInfo->pSrcUidList = NULL;
24✔
1033
      if (TSDB_CODE_SUCCESS != code) {
24!
1034
        pTaskInfo->code = code;
×
1035
        taosMemoryFree(pWrapper);
×
1036
        return pTaskInfo->code;
×
1037
      }
1038
    } else {
1039
      if (pDataInfo->pSrcUidList) {
5,310,536✔
1040
        code = buildTableScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->tableSeq);
942✔
1041
        taosArrayDestroy(pDataInfo->pSrcUidList);
942✔
1042
        pDataInfo->pSrcUidList = NULL;
942✔
1043
        if (TSDB_CODE_SUCCESS != code) {
942!
1044
          pTaskInfo->code = code;
×
1045
          taosMemoryFree(pWrapper);
×
1046
          return pTaskInfo->code;
×
1047
        }
1048
      }
1049
    }
1050

1051
    int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req);
5,310,788✔
1052
    if (msgSize < 0) {
5,310,120!
1053
      pTaskInfo->code = msgSize;
×
1054
      taosMemoryFree(pWrapper);
×
1055
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
1056
      return pTaskInfo->code;
×
1057
    }
1058

1059
    void* msg = taosMemoryCalloc(1, msgSize);
5,310,120!
1060
    if (NULL == msg) {
5,310,552!
1061
      pTaskInfo->code = terrno;
×
1062
      taosMemoryFree(pWrapper);
×
1063
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
1064
      return pTaskInfo->code;
×
1065
    }
1066

1067
    msgSize = tSerializeSResFetchReq(msg, msgSize, &req);
5,310,552✔
1068
    if (msgSize < 0) {
5,310,158!
1069
      pTaskInfo->code = msgSize;
×
1070
      taosMemoryFree(pWrapper);
×
1071
      taosMemoryFree(msg);
×
1072
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
1073
      return pTaskInfo->code;
×
1074
    }
1075

1076
    freeOperatorParam(req.pOpParam, OP_GET_PARAM);
5,310,158✔
1077

1078
    qDebug("%s build fetch msg and send to vgId:%d, ep:%s, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
5,310,147✔
1079
           ", seqId:%" PRId64 ", execId:%d, %p, %d/%" PRIzu,
1080
           GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->clientId,
1081
           pSource->taskId, pExchangeInfo->seqId, pSource->execId, pExchangeInfo, sourceIndex, totalSources);
1082

1083
    // send the fetch remote task result reques
1084
    SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
5,310,151!
1085
    if (NULL == pMsgSendInfo) {
5,310,296!
1086
      taosMemoryFreeClear(msg);
×
1087
      taosMemoryFree(pWrapper);
×
1088
      qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
×
1089
      pTaskInfo->code = terrno;
×
1090
      return pTaskInfo->code;
×
1091
    }
1092

1093
    pMsgSendInfo->param = pWrapper;
5,310,296✔
1094
    pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
5,310,296✔
1095
    pMsgSendInfo->msgInfo.pData = msg;
5,310,296✔
1096
    pMsgSendInfo->msgInfo.len = msgSize;
5,310,296✔
1097
    pMsgSendInfo->msgType = pSource->fetchMsgType;
5,310,296✔
1098
    pMsgSendInfo->fp = loadRemoteDataCallback;
5,310,296✔
1099
    pMsgSendInfo->requestId = pTaskInfo->id.queryId;
5,310,296✔
1100

1101
    int64_t transporterId = 0;
5,310,296✔
1102
    void* poolHandle = NULL;
5,310,296✔
1103
    code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
5,310,296✔
1104
    QUERY_CHECK_CODE(code, lino, _end);
5,310,744!
1105
    int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, sourceIndex);
5,310,744✔
1106
    *pRpcHandle = transporterId;
5,307,393✔
1107
  }
1108

1109
_end:
5,307,393✔
1110
  if (code != TSDB_CODE_SUCCESS) {
5,307,393!
1111
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1112
  }
1113
  return code;
5,307,395✔
1114
}
1115

1116
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs,
3,659,552✔
1117
                          SOperatorInfo* pOperator) {
1118
  pInfo->totalRows += numOfRows;
3,659,552✔
1119
  pInfo->totalSize += dataLen;
3,659,552✔
1120
  pInfo->totalElapsed += (taosGetTimestampUs() - startTs);
3,659,774✔
1121
  pOperator->resultInfo.totalRows += numOfRows;
3,659,774✔
1122
}
3,659,774✔
1123

1124
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart) {
10,925,568✔
1125
  int32_t      code = TSDB_CODE_SUCCESS;
10,925,568✔
1126
  int32_t      lino = 0;
10,925,568✔
1127
  SSDataBlock* pBlock = NULL;
10,925,568✔
1128
  if (pColList == NULL) {  // data from other sources
10,925,568✔
1129
    blockDataCleanup(pRes);
10,755,642✔
1130
    code = blockDecode(pRes, pData, (const char**)pNextStart);
10,754,286✔
1131
    if (code) {
10,751,553!
1132
      return code;
×
1133
    }
1134
  } else {  // extract data according to pColList
1135
    char* pStart = pData;
169,926✔
1136

1137
    int32_t numOfCols = htonl(*(int32_t*)pStart);
169,926✔
1138
    pStart += sizeof(int32_t);
169,926✔
1139

1140
    // todo refactor:extract method
1141
    SSysTableSchema* pSchema = (SSysTableSchema*)pStart;
169,926✔
1142
    for (int32_t i = 0; i < numOfCols; ++i) {
2,779,722✔
1143
      SSysTableSchema* p = (SSysTableSchema*)pStart;
2,609,796✔
1144

1145
      p->colId = htons(p->colId);
2,609,796✔
1146
      p->bytes = htonl(p->bytes);
2,609,796✔
1147
      pStart += sizeof(SSysTableSchema);
2,609,796✔
1148
    }
1149

1150
    pBlock = NULL;
169,926✔
1151
    code = createDataBlock(&pBlock);
169,926✔
1152
    QUERY_CHECK_CODE(code, lino, _end);
170,518!
1153

1154
    for (int32_t i = 0; i < numOfCols; ++i) {
2,776,957✔
1155
      SColumnInfoData idata = createColumnInfoData(pSchema[i].type, pSchema[i].bytes, pSchema[i].colId);
2,606,470✔
1156
      code = blockDataAppendColInfo(pBlock, &idata);
2,611,109✔
1157
      QUERY_CHECK_CODE(code, lino, _end);
2,606,439!
1158
    }
1159

1160
    code = blockDecode(pBlock, pStart, NULL);
170,487✔
1161
    QUERY_CHECK_CODE(code, lino, _end);
170,432!
1162

1163
    code = blockDataEnsureCapacity(pRes, pBlock->info.rows);
170,432✔
1164
    QUERY_CHECK_CODE(code, lino, _end);
170,490!
1165

1166
    // data from mnode
1167
    pRes->info.dataLoad = 1;
170,490✔
1168
    pRes->info.rows = pBlock->info.rows;
170,490✔
1169
    pRes->info.scanFlag = MAIN_SCAN;
170,490✔
1170
    code = relocateColumnData(pRes, pColList, pBlock->pDataBlock, false);
170,490✔
1171
    QUERY_CHECK_CODE(code, lino, _end);
170,467!
1172

1173
    blockDataDestroy(pBlock);
170,467✔
1174
    pBlock = NULL;
170,560✔
1175
  }
1176

1177
_end:
10,922,113✔
1178
  if (code != TSDB_CODE_SUCCESS) {
10,922,113!
1179
    blockDataDestroy(pBlock);
×
1180
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1181
  }
1182
  return code;
10,922,762✔
1183
}
1184

1185
void setAllSourcesCompleted(SOperatorInfo* pOperator) {
1,853,029✔
1186
  SExchangeInfo* pExchangeInfo = pOperator->info;
1,853,029✔
1187
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
1,853,029✔
1188

1189
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
1,853,029✔
1190
  size_t               totalSources = taosArrayGetSize(pExchangeInfo->pSources);
1,853,029✔
1191
  qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 ", %.2f Kb, elapsed:%.2f ms",
1,853,029✔
1192
         GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
1193
         pLoadInfo->totalElapsed / 1000.0);
1194

1195
  setOperatorCompleted(pOperator);
1,853,029✔
1196
}
1,853,137✔
1197

1198
int32_t getCompletedSources(const SArray* pArray, int32_t* pRes) {
8,718,931✔
1199
  int32_t code = TSDB_CODE_SUCCESS;
8,718,931✔
1200
  int32_t lino = 0;
8,718,931✔
1201
  size_t  total = taosArrayGetSize(pArray);
8,718,931✔
1202

1203
  int32_t completed = 0;
8,717,253✔
1204
  for (int32_t k = 0; k < total; ++k) {
69,645,404✔
1205
    SSourceDataInfo* p = taosArrayGet(pArray, k);
60,947,352✔
1206
    QUERY_CHECK_NULL(p, code, lino, _end, terrno);
60,897,616!
1207
    if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
60,926,943✔
1208
      qDebug("source %d is completed, info:%p %p", k, pArray, p);
30,639,344✔
1209
      completed += 1;
30,640,552✔
1210
    }
1211
  }
1212

1213
  *pRes = completed;
8,698,052✔
1214
_end:
8,698,052✔
1215
  if (code != TSDB_CODE_SUCCESS) {
8,698,052!
1216
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1217
  }
1218
  return code;
8,717,464✔
1219
}
1220

1221
int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
3,444,368✔
1222
  SExchangeInfo* pExchangeInfo = pOperator->info;
3,444,368✔
1223
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
3,444,368✔
1224

1225
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
3,444,368✔
1226
  int64_t startTs = taosGetTimestampUs();
3,444,175✔
1227

1228
  // Asynchronously send all fetch requests to all sources.
1229
  for (int32_t i = 0; i < totalSources; ++i) {
8,445,163✔
1230
    int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
5,001,285✔
1231
    if (code != TSDB_CODE_SUCCESS) {
5,000,988!
1232
      pTaskInfo->code = code;
×
1233
      return code;
×
1234
    }
1235
  }
1236

1237
  int64_t endTs = taosGetTimestampUs();
3,444,170✔
1238
  qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTaskInfo),
3,444,170✔
1239
         totalSources, (endTs - startTs) / 1000.0);
1240

1241
  pOperator->status = OP_RES_TO_RETURN;
3,444,171✔
1242
  pOperator->cost.openCost = taosGetTimestampUs() - startTs;
3,444,201✔
1243
  if (isTaskKilled(pTaskInfo)) {
3,444,201!
1244
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1245
  }
1246

1247
  return TSDB_CODE_SUCCESS;
3,444,368✔
1248
}
1249

1250
int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo) {
3,487,705✔
1251
  int32_t            code = TSDB_CODE_SUCCESS;
3,487,705✔
1252
  int32_t            lino = 0;
3,487,705✔
1253
  SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
3,487,705✔
1254
  SSDataBlock*       pb = NULL;
3,487,705✔
1255

1256
  char* pNextStart = pRetrieveRsp->data;
3,487,705✔
1257
  char* pStart = pNextStart;
3,487,705✔
1258

1259
  int32_t index = 0;
3,487,705✔
1260

1261
  if (pRetrieveRsp->compressed) {  // decompress the data
3,487,705!
1262
    if (pDataInfo->decompBuf == NULL) {
×
1263
      pDataInfo->decompBuf = taosMemoryMalloc(pRetrieveRsp->payloadLen);
×
1264
      QUERY_CHECK_NULL(pDataInfo->decompBuf, code, lino, _end, terrno);
×
1265
      pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
×
1266
    } else {
1267
      if (pDataInfo->decompBufSize < pRetrieveRsp->payloadLen) {
×
1268
        char* p = taosMemoryRealloc(pDataInfo->decompBuf, pRetrieveRsp->payloadLen);
×
1269
        QUERY_CHECK_NULL(p, code, lino, _end, terrno);
×
1270
        if (p != NULL) {
×
1271
          pDataInfo->decompBuf = p;
×
1272
          pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
×
1273
        }
1274
      }
1275
    }
1276
  }
1277

1278
  while (index++ < pRetrieveRsp->numOfBlocks) {
14,242,410✔
1279
    pStart = pNextStart;
10,753,032✔
1280

1281
    if (taosArrayGetSize(pExchangeInfo->pRecycledBlocks) > 0) {
10,753,032✔
1282
      pb = *(SSDataBlock**)taosArrayPop(pExchangeInfo->pRecycledBlocks);
5,559,229✔
1283
      blockDataCleanup(pb);
5,559,169✔
1284
    } else {
1285
      code = createOneDataBlock(pExchangeInfo->pDummyBlock, false, &pb);
5,194,130✔
1286
      QUERY_CHECK_NULL(pb, code, lino, _end, code);
5,195,256!
1287
    }
1288

1289
    int32_t compLen = *(int32_t*)pStart;
10,754,140✔
1290
    pStart += sizeof(int32_t);
10,754,140✔
1291

1292
    int32_t rawLen = *(int32_t*)pStart;
10,754,140✔
1293
    pStart += sizeof(int32_t);
10,754,140✔
1294
    QUERY_CHECK_CONDITION((compLen <= rawLen && compLen != 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
10,754,140!
1295

1296
    pNextStart = pStart + compLen;
10,754,140✔
1297
    if (pRetrieveRsp->compressed && (compLen < rawLen)) {
10,754,140!
1298
      int32_t t = tsDecompressString(pStart, compLen, 1, pDataInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
×
1299
      QUERY_CHECK_CONDITION((t == rawLen), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1300
      pStart = pDataInfo->decompBuf;
×
1301
    }
1302

1303
    code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart);
10,754,140✔
1304
    if (code != 0) {
10,753,030!
1305
      taosMemoryFreeClear(pDataInfo->pRsp);
×
1306
      goto _end;
×
1307
    }
1308

1309
    void* tmp = taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
10,753,030✔
1310
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
10,754,491!
1311
    qDebug("%dth block added to resultBlockList, rows:%" PRId64, index, pb->info.rows);
10,754,491✔
1312
    pb = NULL;
10,754,705✔
1313
  }
1314

1315
_end:
3,489,378✔
1316
  if (code != TSDB_CODE_SUCCESS) {
3,489,378!
1317
    blockDataDestroy(pb);
×
1318
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1319
  }
1320
  return code;
3,489,017✔
1321
}
1322

1323
int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
1,279✔
1324
  SExchangeInfo* pExchangeInfo = pOperator->info;
1,279✔
1325
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
1,279✔
1326

1327
  int32_t code = 0;
1,279✔
1328
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);
1,279✔
1329
  int64_t startTs = taosGetTimestampUs();
1,279✔
1330

1331
  int32_t vgId = 0;
1,279✔
1332
  if (pExchangeInfo->dynTbname) {
1,279✔
1333
    SArray* vals = pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPartColVals;
12✔
1334
    for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
12!
1335
      SStreamGroupValue* pValue = taosArrayGet(vals, i);
12✔
1336
      if (pValue != NULL && pValue->isTbname) {
12!
1337
        vgId = pValue->vgId;
12✔
1338
        break;
12✔
1339
      }
1340
    }
1341
  }
1342

1343
  while (1) {
331✔
1344
    if (pExchangeInfo->current >= totalSources) {
1,610✔
1345
      setAllSourcesCompleted(pOperator);
411✔
1346
      return TSDB_CODE_SUCCESS;
411✔
1347
    }
1348

1349
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
1,199✔
1350
    if (!pSource) {
1,199!
1351
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1352
      pTaskInfo->code = terrno;
×
1353
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1354
    }
1355

1356
    if (vgId != 0 && pSource->addr.nodeId != vgId){
1,199!
1357
      pExchangeInfo->current += 1;
×
1358
      continue;
×
1359
    }
1360

1361
    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
1,199✔
1362
    if (!pDataInfo) {
1,199!
1363
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1364
      pTaskInfo->code = terrno;
×
1365
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1366
    }
1367
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
1,199✔
1368

1369
    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
1,199✔
1370
    if (code != TSDB_CODE_SUCCESS) {
1,199!
1371
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1372
      pTaskInfo->code = code;
×
1373
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1374
    }
1375

1376
    while (true) {
×
1377
      code = exchangeWait(pOperator, pExchangeInfo);
1,199✔
1378
      if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
1,199!
1379
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1380
      }
1381

1382
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
1,199✔
1383
      if (pDataInfo->seqId != currSeqId) {
1,199!
1384
        qDebug("seq rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pDataInfo->seqId, pExchangeInfo, currSeqId);
×
1385
        taosMemoryFreeClear(pDataInfo->pRsp);
×
1386
        continue;
×
1387
      }
1388

1389
      break;
1,199✔
1390
    }
1391

1392
    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
1,199!
1393
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
×
1394
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1395
             tstrerror(pDataInfo->code));
1396
      pOperator->pTaskInfo->code = pDataInfo->code;
×
1397
      return pOperator->pTaskInfo->code;
×
1398
    }
1399

1400
    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
1,199✔
1401
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
1,199✔
1402

1403
    if (pRsp->numOfRows == 0) {
1,199✔
1404
      qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
331!
1405
             " execId:%d %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", pDataInfo,
1406
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1407
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);
1408

1409
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
331✔
1410
      if (pDataInfo->isVtbRefScan || pDataInfo->isVtbTagScan) {
331!
1411
        pExchangeInfo->current = totalSources;
57✔
1412
      } else {
1413
        pExchangeInfo->current += 1;
274✔
1414
      }
1415
      taosMemoryFreeClear(pDataInfo->pRsp);
331!
1416
      continue;
331✔
1417
    }
1418

1419
    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
868✔
1420
    if (code != TSDB_CODE_SUCCESS) {
868!
1421
      goto _error;
×
1422
    }
1423

1424
    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
868✔
1425
    if (pRsp->completed == 1) {
868✔
1426
      qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
477!
1427
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, pDataInfo,
1428
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1429
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
1430
             pExchangeInfo->current + 1, totalSources);
1431

1432
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
477✔
1433
      if (pDataInfo->isVtbRefScan) {
477!
1434
        pExchangeInfo->current = totalSources;
×
1435
      } else {
1436
        pExchangeInfo->current += 1;
477✔
1437
      }
1438
    } else {
1439
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
391!
1440
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
1441
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1442
             pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
1443
    }
1444
    if (pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData) {
868!
1445
      taosArrayClear(pExchangeInfo->pSourceDataInfo);
195✔
1446
    }
1447
    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
868✔
1448
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;
868✔
1449

1450
    taosMemoryFreeClear(pDataInfo->pRsp);
868!
1451
    return TSDB_CODE_SUCCESS;
868✔
1452
  }
1453

1454
_error:
×
1455
  pTaskInfo->code = code;
×
1456
  return code;
×
1457
}
1458

1459
void clearVtbScanDataInfo(void* pItem) {
51✔
1460
  SSourceDataInfo *pInfo = (SSourceDataInfo *)pItem;
51✔
1461
  if (pInfo->colMap) {
51!
1462
    taosArrayDestroy(pInfo->colMap->colMap);
×
1463
    taosMemoryFreeClear(pInfo->colMap);
×
1464
  }
1465
  taosArrayDestroy(pInfo->pSrcUidList);
51✔
1466
}
51✔
1467

1468
int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasicParam* pBasicParam) {
1,194✔
1469
  SExchangeInfo*     pExchangeInfo = pOperator->info;
1,194✔
1470
  SExchangeSrcIndex* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
1,194✔
1471
  if (NULL == pIdx) {
1,194!
1472
    qError("No exchange source for vgId: %d", pBasicParam->vgId);
×
1473
    return TSDB_CODE_INVALID_PARA;
×
1474
  }
1475

1476
  qDebug("start to add single exchange source");
1,194!
1477

1478
  if (pBasicParam->isVtbRefScan) {
1,194✔
1479
    SSourceDataInfo dataInfo = {0};
228✔
1480
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
228✔
1481
    dataInfo.taskId = pExchangeInfo->pTaskId;
228✔
1482
    dataInfo.index = pIdx->srcIdx;
228✔
1483
    dataInfo.window = pBasicParam->window;
228✔
1484
    dataInfo.colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
228!
1485
    dataInfo.colMap->vgId = pBasicParam->colMap->vgId;
228✔
1486
    tstrncpy(dataInfo.colMap->tbName, pBasicParam->colMap->tbName, TSDB_TABLE_FNAME_LEN);
228✔
1487
    dataInfo.colMap->colMap = taosArrayDup(pBasicParam->colMap->colMap, NULL);
228✔
1488

1489
    dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
228✔
1490
    if (dataInfo.pSrcUidList == NULL) {
228!
1491
      return terrno;
×
1492
    }
1493

1494
    dataInfo.isVtbRefScan = pBasicParam->isVtbRefScan;
228✔
1495
    dataInfo.isVtbTagScan = pBasicParam->isVtbTagScan;
228✔
1496
    dataInfo.srcOpType = pBasicParam->srcOpType;
228✔
1497
    dataInfo.tableSeq = pBasicParam->tableSeq;
228✔
1498

1499
    taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
228✔
1500
    void* tmp = taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo);
228✔
1501
    if (!tmp) {
228!
1502
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1503
      return terrno;
×
1504
    }
1505
  } else if (pBasicParam->isVtbTagScan) {
966✔
1506
    SSourceDataInfo dataInfo = {0};
24✔
1507
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
24✔
1508
    dataInfo.taskId = pExchangeInfo->pTaskId;
24✔
1509
    dataInfo.index = pIdx->srcIdx;
24✔
1510
    dataInfo.window = pBasicParam->window;
24✔
1511
    dataInfo.colMap = NULL;
24✔
1512

1513
    dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
24✔
1514
    if (dataInfo.pSrcUidList == NULL) {
24!
1515
      return terrno;
×
1516
    }
1517

1518
    dataInfo.isVtbRefScan = pBasicParam->isVtbRefScan;
24✔
1519
    dataInfo.isVtbTagScan = pBasicParam->isVtbTagScan;
24✔
1520
    dataInfo.srcOpType = pBasicParam->srcOpType;
24✔
1521
    dataInfo.tableSeq = pBasicParam->tableSeq;
24✔
1522

1523
    taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
24✔
1524
    void* tmp = taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo);
24✔
1525
    if (!tmp) {
24!
1526
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1527
      return terrno;
×
1528
    }
1529
  } else {
1530
    if (pIdx->inUseIdx < 0) {
942✔
1531
      SSourceDataInfo dataInfo = {0};
924✔
1532
      dataInfo.status = EX_SOURCE_DATA_NOT_READY;
924✔
1533
      dataInfo.taskId = pExchangeInfo->pTaskId;
924✔
1534
      dataInfo.index = pIdx->srcIdx;
924✔
1535
      if (pBasicParam->isVtbRefScan) {
924!
1536
        dataInfo.window = pBasicParam->window;
×
1537
        dataInfo.colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
×
1538
        dataInfo.colMap->vgId = pBasicParam->colMap->vgId;
×
1539
        tstrncpy(dataInfo.colMap->tbName, pBasicParam->colMap->tbName, TSDB_TABLE_FNAME_LEN);
×
1540
        dataInfo.colMap->colMap = taosArrayDup(pBasicParam->colMap->colMap, NULL);
×
1541
      }
1542

1543
      dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
924✔
1544
      if (dataInfo.pSrcUidList == NULL) {
924!
1545
        return terrno;
×
1546
      }
1547

1548
      dataInfo.isVtbRefScan = pBasicParam->isVtbRefScan;
924✔
1549
      dataInfo.isVtbTagScan = pBasicParam->isVtbTagScan;
924✔
1550
      dataInfo.srcOpType = pBasicParam->srcOpType;
924✔
1551
      dataInfo.tableSeq = pBasicParam->tableSeq;
924✔
1552

1553
      void* tmp = taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo);
924✔
1554
      if (!tmp) {
924!
1555
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1556
        return terrno;
×
1557
      }
1558
      pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
924✔
1559
    } else {
1560
      SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
18✔
1561
      if (!pDataInfo) {
18!
1562
        return terrno;
×
1563
      }
1564
      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
18!
1565
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
18✔
1566
      }
1567

1568
      if (pBasicParam->isVtbRefScan) {
18!
1569
        pDataInfo->window = pBasicParam->window;
×
1570
        if (!pDataInfo->colMap) {
×
1571
          pDataInfo->colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
×
1572
        }
1573
        pDataInfo->colMap->vgId = pBasicParam->colMap->vgId;
×
1574
        tstrncpy(pDataInfo->colMap->tbName, pBasicParam->colMap->tbName, TSDB_TABLE_FNAME_LEN);
×
1575
        pDataInfo->colMap->colMap = taosArrayDup(pBasicParam->colMap->colMap, NULL);
×
1576
      }
1577

1578
      pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
18✔
1579
      if (pDataInfo->pSrcUidList == NULL) {
18!
1580
        return terrno;
×
1581
      }
1582

1583
      pDataInfo->isVtbRefScan = pBasicParam->isVtbRefScan;
18✔
1584
      pDataInfo->isVtbTagScan = pBasicParam->isVtbTagScan;
18✔
1585
      pDataInfo->srcOpType = pBasicParam->srcOpType;
18✔
1586
      pDataInfo->tableSeq = pBasicParam->tableSeq;
18✔
1587
    }
1588
  }
1589

1590
  return TSDB_CODE_SUCCESS;
1,194✔
1591
}
1592

1593
int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
831✔
1594
  SExchangeInfo*               pExchangeInfo = pOperator->info;
831✔
1595
  int32_t                      code = TSDB_CODE_SUCCESS;
831✔
1596
  SExchangeOperatorBasicParam* pBasicParam = NULL;
831✔
1597
  SExchangeOperatorParam*      pParam = (SExchangeOperatorParam*)pOperator->pOperatorGetParam->value;
831✔
1598
  if (pParam->multiParams) {
831✔
1599
    SExchangeOperatorBatchParam* pBatch = (SExchangeOperatorBatchParam*)pOperator->pOperatorGetParam->value;
543✔
1600
    int32_t                      iter = 0;
543✔
1601
    while (NULL != (pBasicParam = tSimpleHashIterate(pBatch->pBatchs, pBasicParam, &iter))) {
1,449✔
1602
      code = addSingleExchangeSource(pOperator, pBasicParam);
906✔
1603
      if (code) {
906!
1604
        return code;
×
1605
      }
1606
    }
1607
  } else {
1608
    pBasicParam = &pParam->basic;
288✔
1609
    code = addSingleExchangeSource(pOperator, pBasicParam);
288✔
1610
  }
1611

1612
  freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
831✔
1613
  pOperator->pOperatorGetParam = NULL;
831✔
1614

1615
  return code;
831✔
1616
}
1617

1618
int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
16,794,256✔
1619
  SExchangeInfo* pExchangeInfo = pOperator->info;
16,794,256✔
1620
  int32_t        code = TSDB_CODE_SUCCESS;
16,794,256✔
1621
  int32_t        lino = 0;
16,794,256✔
1622
  
1623
  if ((OPTR_IS_OPENED(pOperator) && !pExchangeInfo->dynamicOp) ||
16,794,256✔
1624
      (pExchangeInfo->dynamicOp && NULL == pOperator->pOperatorGetParam)) {
3,468,369✔
1625
    qDebug("skip prepare, opened:%d, dynamicOp:%d, getParam:%p", OPTR_IS_OPENED(pOperator), pExchangeInfo->dynamicOp, pOperator->pOperatorGetParam);
13,328,603✔
1626
    return TSDB_CODE_SUCCESS;
13,328,685✔
1627
  }
1628

1629
  if (pExchangeInfo->dynamicOp) {
3,465,653✔
1630
    code = addDynamicExchangeSource(pOperator);
831✔
1631
    QUERY_CHECK_CODE(code, lino, _end);
831!
1632
  }
1633

1634
  if (pOperator->status == OP_NOT_OPENED && (pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData) || IS_STREAM_MODE(pOperator->pTaskInfo)) {
3,465,653!
1635
    pExchangeInfo->current = 0;
21,280✔
1636
  }
1637

1638
  int64_t st = taosGetTimestampUs();
3,466,068✔
1639

1640
  if (!IS_STREAM_MODE(pOperator->pTaskInfo) && !pExchangeInfo->seqLoadData) {
3,466,068✔
1641
    code = prepareConcurrentlyLoad(pOperator);
3,444,391✔
1642
    QUERY_CHECK_CODE(code, lino, _end);
3,444,356!
1643
    pExchangeInfo->openedTs = taosGetTimestampUs();
3,444,378✔
1644
  }
1645

1646
  OPTR_SET_OPENED(pOperator);
3,466,055✔
1647
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
3,466,051✔
1648

1649
  qDebug("%s prepare load complete", pOperator->pTaskInfo->id.str);
3,466,051✔
1650

1651
_end:
2,954,958✔
1652
  if (code != TSDB_CODE_SUCCESS) {
3,466,065!
1653
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1654
    pOperator->pTaskInfo->code = code;
×
1655
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
1656
  }
1657
  return TSDB_CODE_SUCCESS;
3,466,065✔
1658
}
1659

1660
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
18,711✔
1661
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
18,711✔
1662

1663
  if (pLimitInfo->remainGroupOffset > 0) {
18,711!
1664
    if (pLimitInfo->currentGroupId == 0) {  // it is the first group
×
1665
      pLimitInfo->currentGroupId = pBlock->info.id.groupId;
×
1666
      blockDataCleanup(pBlock);
×
1667
      return PROJECT_RETRIEVE_CONTINUE;
×
1668
    } else if (pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
×
1669
      // now it is the data from a new group
1670
      pLimitInfo->remainGroupOffset -= 1;
×
1671

1672
      // ignore data block in current group
1673
      if (pLimitInfo->remainGroupOffset > 0) {
×
1674
        blockDataCleanup(pBlock);
×
1675
        return PROJECT_RETRIEVE_CONTINUE;
×
1676
      }
1677
    }
1678

1679
    // set current group id of the project operator
1680
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
×
1681
  }
1682

1683
  // here check for a new group data, we need to handle the data of the previous group.
1684
  if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
18,711✔
1685
    pLimitInfo->numOfOutputGroups += 1;
495✔
1686
    if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
495!
1687
      pOperator->status = OP_EXEC_DONE;
×
1688
      blockDataCleanup(pBlock);
×
1689

1690
      return PROJECT_RETRIEVE_DONE;
×
1691
    }
1692

1693
    // reset the value for a new group data
1694
    resetLimitInfoForNextGroup(pLimitInfo);
495✔
1695
    // existing rows that belongs to previous group.
1696
    if (pBlock->info.rows > 0) {
495!
1697
      return PROJECT_RETRIEVE_DONE;
495✔
1698
    }
1699
  }
1700

1701
  // here we reach the start position, according to the limit/offset requirements.
1702

1703
  // set current group id
1704
  pLimitInfo->currentGroupId = pBlock->info.id.groupId;
18,216✔
1705

1706
  bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pTaskInfo);
18,216✔
1707
  if (pBlock->info.rows == 0) {
18,216✔
1708
    return PROJECT_RETRIEVE_CONTINUE;
8,211✔
1709
  } else {
1710
    if (limitReached && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
10,005!
1711
      setOperatorCompleted(pOperator);
×
1712
      return PROJECT_RETRIEVE_DONE;
×
1713
    }
1714
  }
1715

1716
  // todo optimize performance
1717
  // If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the
1718
  // they may not belong to the same group the limit/offset value is not valid in this case.
1719
  if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || hasSlimitOffsetInfo(pLimitInfo)) {
10,005!
1720
    return PROJECT_RETRIEVE_DONE;
10,005✔
1721
  } else {  // not full enough, continue to accumulate the output data in the buffer.
1722
    return PROJECT_RETRIEVE_CONTINUE;
×
1723
  }
1724
}
1725

1726
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo) {
5,307,779✔
1727
  SExecTaskInfo* pTask = pOperator->pTaskInfo;
5,307,779✔
1728
  int32_t        code = TSDB_CODE_SUCCESS;
5,307,779✔
1729
  if (pTask->pWorkerCb) {
5,307,779!
1730
    code = pTask->pWorkerCb->beforeBlocking(pTask->pWorkerCb->pPool);
5,307,835✔
1731
    if (code != TSDB_CODE_SUCCESS) {
5,310,024!
1732
      pTask->code = code;
×
1733
      return pTask->code;
×
1734
    }
1735
  }
1736

1737
  code = tsem_wait(&pExchangeInfo->ready);
5,309,968✔
1738
  if (code != TSDB_CODE_SUCCESS) {
5,309,917!
1739
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1740
    pTask->code = code;
×
1741
    return pTask->code;
×
1742
  }
1743

1744
  if (pTask->pWorkerCb) {
5,309,922✔
1745
    code = pTask->pWorkerCb->afterRecoverFromBlocking(pTask->pWorkerCb->pPool);
5,309,921✔
1746
    if (code != TSDB_CODE_SUCCESS) {
5,310,035!
1747
      pTask->code = code;
×
1748
      return pTask->code;
×
1749
    }
1750
  }
1751
  return TSDB_CODE_SUCCESS;
5,310,036✔
1752
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc