• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4747

21 Sep 2025 11:53PM UTC coverage: 58.002% (-1.1%) from 59.065%
#4747

push

travis-ci

web-flow
fix: refine python taos error log matching in checkAsan.sh (#33029)

* fix: refine python taos error log matching in checkAsan.sh

* fix: improve python taos error log matching in checkAsan.sh

133398 of 293157 branches covered (45.5%)

Branch coverage included in aggregate %.

201778 of 284713 relevant lines covered (70.87%)

5539418.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.04
/source/libs/executor/src/exchangeoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "operator.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "tdatablock.h"
23
#include "thash.h"
24
#include "tmsg.h"
25
#include "tref.h"
26
#include "trpc.h"
27

28
typedef struct SFetchRspHandleWrapper {
29
  uint32_t exchangeId;
30
  int32_t  sourceIndex;
31
  int64_t  seqId;
32
} SFetchRspHandleWrapper;
33

34
typedef struct SSourceDataInfo {
35
  int32_t            index;
36
  int64_t            seqId;
37
  SRWLatch           lock;
38
  SRetrieveTableRsp* pRsp;
39
  uint64_t           totalRows;
40
  int64_t            startTime;
41
  int32_t            code;
42
  EX_SOURCE_STATUS   status;
43
  const char*        taskId;
44
  SArray*            pSrcUidList;
45
  int32_t            srcOpType;
46
  bool               tableSeq;
47
  char*              decompBuf;
48
  int32_t            decompBufSize;
49
  SOrgTbInfo*        colMap;
50
  bool               isVtbRefScan;
51
  bool               isVtbTagScan;
52
  STimeWindow        window;
53
  bool               fetchSent; // need reset
54
} SSourceDataInfo;
55

56
static void destroyExchangeOperatorInfo(void* param);
57
static void freeBlock(void* pParam);
58
static void freeSourceDataInfo(void* param);
59
static void setAllSourcesCompleted(SOperatorInfo* pOperator);
60

61
static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code);
62
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex);
63
static int32_t getCompletedSources(const SArray* pArray, int32_t* pRes);
64
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator);
65
static int32_t seqLoadRemoteData(SOperatorInfo* pOperator);
66
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator);
67
static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock,
68
                                 bool holdDataInBuf);
69
static int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo);
70

71
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo);
72

73

74
static void streamConcurrentlyLoadRemoteData(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
37,914✔
75
                                           SExecTaskInfo* pTaskInfo) {
76
  int32_t code = 0;
37,914✔
77
  int32_t lino = 0;
75,828✔
78
  int64_t startTs = taosGetTimestampUs();  
37,917✔
79
  int32_t  totalSources = (int32_t)taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
37,917✔
80
  int32_t completed = 0;
37,918✔
81
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
37,918✔
82
  if (code != TSDB_CODE_SUCCESS) {
37,916!
83
    pTaskInfo->code = code;
×
84
    T_LONG_JMP(pTaskInfo->env, code);
×
85
  }
86
  if (completed == totalSources) {
37,916✔
87
    qDebug("%s no load since all sources completed, completed:%d, totalSources:%d", pTaskInfo->id.str, completed, totalSources);
9,498✔
88
    setAllSourcesCompleted(pOperator);
9,498✔
89
    return;
37,909✔
90
  }
91

92
  SSourceDataInfo* pDataInfo = NULL;
28,418✔
93

94
  while (1) {
13,978✔
95
    if (pExchangeInfo->current < 0) {
42,396!
96
      qDebug("current %d and all sources complted, totalSources:%d", pExchangeInfo->current, totalSources);
×
97
      setAllSourcesCompleted(pOperator);
×
98
      return;
×
99
    }
100
    
101
    if (pExchangeInfo->current >= totalSources) {
42,396✔
102
      completed = 0;
18,883✔
103
      code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
18,883✔
104
      if (code != TSDB_CODE_SUCCESS) {
18,883!
105
        pTaskInfo->code = code;
×
106
        T_LONG_JMP(pTaskInfo->env, code);
×
107
      }
108
      if (completed == totalSources) {
18,883✔
109
        qDebug("stop to load since all sources complted, completed:%d, totalSources:%d", completed, totalSources);
13,967✔
110
        setAllSourcesCompleted(pOperator);
13,967✔
111
        return;
13,967✔
112
      }
113
      
114
      pExchangeInfo->current = 0;
4,916✔
115
    }
116

117
    qDebug("%s start stream exchange %p idx:%d fetch", GET_TASKID(pTaskInfo), pExchangeInfo, pExchangeInfo->current);
28,429✔
118

119
    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
28,429✔
120
    if (!pDataInfo) {
28,430!
121
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
122
      pTaskInfo->code = terrno;
×
123
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
124
    }
125

126
    if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
28,431!
127
      pExchangeInfo->current++;
×
128
      continue;
×
129
    }
130

131
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
28,431✔
132

133
    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
28,431✔
134
    if (code != TSDB_CODE_SUCCESS) {
28,434!
135
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
136
      pTaskInfo->code = code;
×
137
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
138
    }
139

140
    while (true) {
3✔
141
      code = exchangeWait(pOperator, pExchangeInfo);
28,437✔
142
      if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
28,437!
143
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
3!
144
      }
145

146
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
28,434✔
147
      if (pDataInfo->seqId != currSeqId) {
28,434✔
148
        qDebug("%s seq rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", 
3✔
149
            GET_TASKID(pTaskInfo), pDataInfo->seqId, pExchangeInfo, currSeqId);
150
        taosMemoryFreeClear(pDataInfo->pRsp);
3!
151
        continue;
3✔
152
      }
153

154
      break;
28,431✔
155
    }
156

157
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
28,431✔
158
    if (!pSource) {
28,430!
159
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
160
      pTaskInfo->code = terrno;
×
161
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
162
    }
163

164
    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
28,430✔
165
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
7!
166
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
167
             tstrerror(pDataInfo->code));
168
      pTaskInfo->code = pDataInfo->code;
7✔
169
      T_LONG_JMP(pTaskInfo->env, code);
7!
170
    }
171

172
    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
28,423✔
173
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
28,423✔
174

175
    if (pRsp->numOfRows == 0) {
28,423✔
176
      qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
13,979✔
177
             " execId:%d idx %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", pDataInfo,
178
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
179
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);
180

181
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
13,979✔
182
      if (pDataInfo->isVtbRefScan || pDataInfo->isVtbTagScan) {
13,979!
183
        pExchangeInfo->current = -1;
×
184
      } else {
185
        pExchangeInfo->current += 1;
13,979✔
186
      }
187
      taosMemoryFreeClear(pDataInfo->pRsp);
13,979!
188
      continue;
13,979✔
189
    }
190

191
    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
14,444✔
192
    if (code != TSDB_CODE_SUCCESS) {
14,440!
193
      goto _exit;
×
194
    }
195

196
    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
14,440✔
197
    if (pRsp->completed == 1) {
14,440✔
198
      qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
9,526✔
199
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%d", pDataInfo,
200
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
201
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
202
             pExchangeInfo->current + 1, totalSources);
203

204
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
9,526✔
205
      if (pDataInfo->isVtbRefScan) {
9,526!
206
        pExchangeInfo->current = -1;
×
207
        taosMemoryFreeClear(pDataInfo->pRsp);
×
208
        continue;
×
209
      }
210
    } else {
211
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d idx:%d numOfRows:%" PRId64
4,914✔
212
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
213
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
214
             pExchangeInfo->current, pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
215
    }
216

217
    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
14,440✔
218
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;
14,441✔
219

220
    pExchangeInfo->current++;
14,441✔
221

222
    taosMemoryFreeClear(pDataInfo->pRsp);
14,441!
223
    return;
14,444✔
224
  }
225

226
_exit:
×
227

228
  if (code) {
×
229
    pTaskInfo->code = code;
×
230
    qError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
×
231
  }
232
}
233

234

235
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
1,201,429✔
236
                                           SExecTaskInfo* pTaskInfo) {
237
  int32_t code = 0;
1,201,429✔
238
  int32_t lino = 0;
1,201,429✔
239
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
1,201,429✔
240
  int32_t completed = 0;
1,201,429✔
241
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
1,201,429✔
242
  if (code != TSDB_CODE_SUCCESS) {
1,201,424!
243
    pTaskInfo->code = code;
×
244
    T_LONG_JMP(pTaskInfo->env, code);
×
245
  }
246
  if (completed == totalSources) {
1,201,424✔
247
    setAllSourcesCompleted(pOperator);
406,649✔
248
    return;
1,201,432✔
249
  }
250

251
  SSourceDataInfo* pDataInfo = NULL;
794,775✔
252

253
  while (1) {
82,709✔
254
    qDebug("prepare wait for ready, %p, %s", pExchangeInfo, GET_TASKID(pTaskInfo));
877,484✔
255
    code = exchangeWait(pOperator, pExchangeInfo);
877,488✔
256

257
    if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
877,492!
258
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
259
    }
260

261
    for (int32_t i = 0; i < totalSources; ++i) {
1,243,151✔
262
      pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
1,243,146✔
263
      QUERY_CHECK_NULL(pDataInfo, code, lino, _exit, terrno);
1,243,145!
264
      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
1,243,148✔
265
        continue;
303,686✔
266
      }
267

268
      if (pDataInfo->status != EX_SOURCE_DATA_READY) {
939,462✔
269
        continue;
61,973✔
270
      }
271

272
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
877,489✔
273
      if (pDataInfo->seqId != currSeqId) {
877,482!
274
        qDebug("concurrent rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pDataInfo->seqId, pExchangeInfo, currSeqId);
×
275
        taosMemoryFreeClear(pDataInfo->pRsp);
×
276
        break;
×
277
      }
278

279
      if (pDataInfo->code != TSDB_CODE_SUCCESS) {
877,482!
280
        code = pDataInfo->code;
×
281
        TAOS_CHECK_EXIT(code);
×
282
      }
283

284
      tmemory_barrier();
877,482✔
285
      SRetrieveTableRsp*     pRsp = pDataInfo->pRsp;
877,482✔
286
      SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
877,482✔
287
      QUERY_CHECK_NULL(pSource, code, lino, _exit, terrno);
877,492!
288

289
      // todo
290
      SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
877,493✔
291
      if (pRsp->numOfRows == 0) {
877,493✔
292
        if (NULL != pDataInfo->pSrcUidList && (!pDataInfo->isVtbRefScan)) {
185,232!
293
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
×
294
          code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
×
295
          if (code != TSDB_CODE_SUCCESS) {
×
296
            taosMemoryFreeClear(pDataInfo->pRsp);
×
297
            TAOS_CHECK_EXIT(code);
×
298
          }
299
        } else {
300
          pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
185,232✔
301
          qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
185,232✔
302
                 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", try next %d/%" PRIzu, pDataInfo,
303
                 GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
304
                 pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows, i + 1, totalSources);
305
          taosMemoryFreeClear(pDataInfo->pRsp);
185,232!
306
        }
307
        break;
185,232✔
308
      }
309

310
      TAOS_CHECK_EXIT(doExtractResultBlocks(pExchangeInfo, pDataInfo));
692,261!
311

312
      SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
692,260✔
313
      updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pDataInfo->startTime, pOperator);
692,260✔
314
      pDataInfo->totalRows += pRetrieveRsp->numOfRows;
692,261✔
315

316
      if (pRsp->completed == 1) {
692,261✔
317
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
638,608✔
318
        qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
638,608✔
319
               " execId:%d index:%d completed, blocks:%d, numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64
320
               ", totalRows:%" PRIu64 ", total:%.2f Kb, try next %d/%" PRIzu, pDataInfo,
321
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
322
               pRsp->numOfBlocks, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows,
323
               pLoadInfo->totalSize / 1024.0, i + 1, totalSources);
324
      } else {
325
        qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
53,653!
326
               " execId:%d blocks:%d, numOfRows:%" PRId64 ", totalRows:%" PRIu64 ", total:%.2f Kb",
327
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
328
               pRsp->numOfBlocks, pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
329
      }
330

331
      taosMemoryFreeClear(pDataInfo->pRsp);
692,262!
332

333
      if ((pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED || NULL != pDataInfo->pSrcUidList) && !pDataInfo->isVtbRefScan) {
692,260!
334
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
53,654✔
335
        code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
53,654✔
336
        if (code != TSDB_CODE_SUCCESS) {
53,654!
337
          taosMemoryFreeClear(pDataInfo->pRsp);
×
338
          TAOS_CHECK_EXIT(code);
×
339
        }
340
      }
341
      
342
      return;
794,783✔
343
    }  // end loop
344

345
    int32_t complete1 = 0;
185,237✔
346
    code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &complete1);
185,237✔
347
    if (code != TSDB_CODE_SUCCESS) {
185,232!
348
      pTaskInfo->code = code;
×
349
      T_LONG_JMP(pTaskInfo->env, code);
×
350
    }
351
    if (complete1 == totalSources) {
185,232✔
352
      qDebug("all sources are completed, %s", GET_TASKID(pTaskInfo));
102,523✔
353
      return;
102,523✔
354
    }
355
  }
356

357
_exit:
×
358

359
  if (code) {
×
360
    pTaskInfo->code = code;
×
361
    qError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
×
362
  }
363
}
364

365
static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
2,778,069✔
366
  int32_t        code = TSDB_CODE_SUCCESS;
2,778,069✔
367
  SExchangeInfo* pExchangeInfo = pOperator->info;
2,778,069✔
368
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
2,778,069✔
369

370
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
2,778,069✔
371

372
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
2,778,075✔
373
  if (pOperator->status == OP_EXEC_DONE) {
2,778,075!
374
    qDebug("%s all %" PRIzu " source(s) are exhausted, total rows:%" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
×
375
           GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize,
376
           pLoadInfo->totalElapsed / 1000.0);
377
    return NULL;
×
378
  }
379

380
  // we have buffered retrieved datablock, return it directly
381
  SSDataBlock* p = NULL;
2,778,075✔
382
  if (taosArrayGetSize(pExchangeInfo->pResultBlockList) > 0) {
2,778,075✔
383
    p = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
1,537,392✔
384
    taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
1,537,391✔
385
  }
386

387
  if (p != NULL) {
2,778,042✔
388
    void* tmp = taosArrayPush(pExchangeInfo->pRecycledBlocks, &p);
1,537,367✔
389
    if (!tmp) {
1,537,367!
390
      code = terrno;
×
391
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
392
      pTaskInfo->code = code;
×
393
      T_LONG_JMP(pTaskInfo->env, code);
×
394
    }
395
    return p;
1,537,367✔
396
  } else {
397
    if (pExchangeInfo->seqLoadData) {
1,240,675✔
398
      code = seqLoadRemoteData(pOperator);
1,328✔
399
      if (code != TSDB_CODE_SUCCESS) {
1,328!
400
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
401
        pTaskInfo->code = code;
×
402
        T_LONG_JMP(pTaskInfo->env, code);
×
403
      }
404
    } else if (IS_STREAM_MODE(pOperator->pTaskInfo))   {
1,239,347✔
405
      streamConcurrentlyLoadRemoteData(pOperator, pExchangeInfo, pTaskInfo);
37,915✔
406
    } else {
407
      concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
1,201,432✔
408
    }
409
    if (TSDB_CODE_SUCCESS != pOperator->pTaskInfo->code) {
1,240,664!
410
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(pOperator->pTaskInfo->code));
×
411
      T_LONG_JMP(pTaskInfo->env, pOperator->pTaskInfo->code);
×
412
    }
413
    
414
    if (taosArrayGetSize(pExchangeInfo->pResultBlockList) == 0) {
1,240,664✔
415
      qDebug("empty resultBlockList");
533,064✔
416
      return NULL;
533,064✔
417
    } else {
418
      p = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
707,598✔
419
      taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
707,604✔
420
      void* tmp = taosArrayPush(pExchangeInfo->pRecycledBlocks, &p);
707,605✔
421
      if (!tmp) {
707,606!
422
        code = terrno;
×
423
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
424
        pTaskInfo->code = code;
×
425
        T_LONG_JMP(pTaskInfo->env, code);
×
426
      }
427

428
      qDebug("block with rows:%" PRId64 " loaded", p->info.rows);
707,606✔
429
      return p;
707,605✔
430
    }
431
}
432
}
433

434
static int32_t loadRemoteDataNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
2,770,152✔
435
  int32_t        code = TSDB_CODE_SUCCESS;
2,770,152✔
436
  int32_t        lino = 0;
2,770,152✔
437
  SExchangeInfo* pExchangeInfo = pOperator->info;
2,770,152✔
438
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
2,770,152✔
439

440
  qDebug("%s start to load from exchange %p", pTaskInfo->id.str, pExchangeInfo);
2,770,152✔
441

442
  code = pOperator->fpSet._openFn(pOperator);
2,770,262✔
443
  QUERY_CHECK_CODE(code, lino, _end);
2,770,281!
444

445
  if (pOperator->status == OP_EXEC_DONE) {
2,770,281✔
446
    (*ppRes) = NULL;
199✔
447
    return code;
199✔
448
  }
449

450
  while (1) {
7,996✔
451
    SSDataBlock* pBlock = doLoadRemoteDataImpl(pOperator);
2,778,078✔
452
    if (pBlock == NULL) {
2,778,028✔
453
      (*ppRes) = NULL;
533,064✔
454
      return code;
533,064✔
455
    }
456

457
    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
2,244,964✔
458
    QUERY_CHECK_CODE(code, lino, _end);
2,244,965!
459

460
    if (blockDataGetNumOfRows(pBlock) == 0) {
2,244,965✔
461
      qDebug("rows 0 block got, continue next load");
2!
462
      continue;
2✔
463
    }
464

465
    SLimitInfo* pLimitInfo = &pExchangeInfo->limitInfo;
2,244,964✔
466
    if (hasLimitOffsetInfo(pLimitInfo)) {
2,244,964✔
467
      int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock, false);
18,116✔
468
      if (status == PROJECT_RETRIEVE_CONTINUE) {
18,116✔
469
        qDebug("limit retrieve continue");
7,994✔
470
        continue;
7,994✔
471
      } else if (status == PROJECT_RETRIEVE_DONE) {
10,122!
472
        if (pBlock->info.rows == 0) {
10,122!
473
          setOperatorCompleted(pOperator);
×
474
          (*ppRes) = NULL;
×
475
          return code;
×
476
        } else {
477
          (*ppRes) = pBlock;
10,122✔
478
          return code;
10,122✔
479
        }
480
      }
481
    } else {
482
      (*ppRes) = pBlock;
2,226,849✔
483
      qDebug("block with rows %" PRId64 " returned in exechange", pBlock->info.rows);
2,226,849✔
484
      return code;
2,226,850✔
485
    }
486
  }
487

488
_end:
×
489

490
  if (code != TSDB_CODE_SUCCESS) {
×
491
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
492
    pTaskInfo->code = code;
×
493
    T_LONG_JMP(pTaskInfo->env, code);
×
494
  } else {
495
    qDebug("empty block returned in exchange");
×
496
  }
497
  
498
  (*ppRes) = NULL;
×
499
  return code;
×
500
}
501

502
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
579,341✔
503
  pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
579,341✔
504
  if (pInfo->pSourceDataInfo == NULL) {
579,327!
505
    return terrno;
×
506
  }
507

508
  if (pInfo->dynamicOp) {
579,327✔
509
    return TSDB_CODE_SUCCESS;
12,506✔
510
  }
511

512
  int32_t len = strlen(id) + 1;
566,821✔
513
  pInfo->pTaskId = taosMemoryCalloc(1, len);
566,821!
514
  if (!pInfo->pTaskId) {
566,846!
515
    return terrno;
×
516
  }
517
  tstrncpy(pInfo->pTaskId, id, len);
566,846✔
518
  for (int32_t i = 0; i < numOfSources; ++i) {
1,395,506✔
519
    SSourceDataInfo dataInfo = {0};
828,629✔
520
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
828,629✔
521
    dataInfo.taskId = pInfo->pTaskId;
828,629✔
522
    dataInfo.index = i;
828,629✔
523
    SSourceDataInfo* pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
828,629✔
524
    if (pDs == NULL) {
828,629!
525
      taosArrayDestroyEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
×
526
      return terrno;
×
527
    }
528
    qDebug("init source data info %d, pDs:%p, status:%d", i, pDs, pDs->status);
828,629✔
529
  }
530

531
  return TSDB_CODE_SUCCESS;
566,877✔
532
}
533

534
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo, const char* id) {
579,341✔
535
  size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);
579,341!
536

537
  if (numOfSources == 0) {
579,341!
538
    qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
×
539
    return TSDB_CODE_INVALID_PARA;
×
540
  }
541
  pInfo->pFetchRpcHandles = taosArrayInit(numOfSources, sizeof(int64_t));
579,341✔
542
  if (!pInfo->pFetchRpcHandles) {
579,347!
543
    return terrno;
×
544
  }
545
  void* ret = taosArrayReserve(pInfo->pFetchRpcHandles, numOfSources);
579,347✔
546
  if (!ret) {
579,343!
547
    return terrno;
×
548
  }
549

550
  pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
579,343✔
551
  if (pInfo->pSources == NULL) {
579,334!
552
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
553
    return terrno;
×
554
  }
555

556
  if (pExNode->node.dynamicOp) {
579,334✔
557
    pInfo->pHashSources = tSimpleHashInit(numOfSources * 2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
12,506✔
558
    if (NULL == pInfo->pHashSources) {
12,505!
559
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
560
      return terrno;
×
561
    }
562
  }
563

564
  for (int32_t i = 0; i < numOfSources; ++i) {
1,433,155✔
565
    SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
853,811✔
566
    if (!pNode) {
853,812!
567
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
568
      return terrno;
×
569
    }
570
    void* tmp = taosArrayPush(pInfo->pSources, pNode);
853,812✔
571
    if (!tmp) {
853,824!
572
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
573
      return terrno;
×
574
    }
575
    SExchangeSrcIndex idx = {.srcIdx = i, .inUseIdx = -1};
853,824✔
576
    int32_t           code =
577
        tSimpleHashPut(pInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
853,824✔
578
    if (pInfo->pHashSources && code != TSDB_CODE_SUCCESS) {
853,822!
579
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
580
      return code;
×
581
    }
582
  }
583

584
  initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);
579,344✔
585
  int64_t refId = taosAddRef(exchangeObjRefPool, pInfo);
579,347✔
586
  if (refId < 0) {
579,348!
587
    int32_t code = terrno;
×
588
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
589
    return code;
×
590
  } else {
591
    pInfo->self = refId;
579,348✔
592
  }
593

594
  return initDataSource(numOfSources, pInfo, id);
579,348✔
595
}
596

597
int32_t resetExchangeOperState(SOperatorInfo* pOper) {
23,986✔
598
  SExchangeInfo* pInfo = pOper->info;
23,986✔
599
  SExchangePhysiNode* pPhynode = (SExchangePhysiNode*)pOper->pPhyNode;
23,986✔
600

601
  qDebug("%s reset exchange op:%p info:%p", pOper->pTaskInfo->id.str, pOper, pInfo);
23,986✔
602

603
  atomic_add_fetch_64(&pInfo->seqId, 1);
23,986✔
604
  pOper->status = OP_NOT_OPENED;
23,986✔
605
  pInfo->current = 0;
23,986✔
606
  pInfo->loadInfo.totalElapsed = 0;
23,986✔
607
  pInfo->loadInfo.totalRows = 0;
23,986✔
608
  pInfo->loadInfo.totalSize = 0;
23,986✔
609
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pSourceDataInfo); ++i) {
48,044✔
610
    SSourceDataInfo* pDataInfo = taosArrayGet(pInfo->pSourceDataInfo, i);
24,058✔
611
    taosWLockLatch(&pDataInfo->lock);
24,058✔
612
    taosMemoryFreeClear(pDataInfo->decompBuf);
24,058!
613
    taosMemoryFreeClear(pDataInfo->pRsp);
24,058!
614

615
    pDataInfo->totalRows = 0;
24,058✔
616
    pDataInfo->code = 0;
24,058✔
617
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
24,058✔
618
    pDataInfo->fetchSent = false;
24,058✔
619
    taosWUnLockLatch(&pDataInfo->lock);
24,058✔
620
  }
621

622
  if (pInfo->dynamicOp) {
23,986✔
623
    taosArrayClearEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
16✔
624
  } 
625

626
  taosArrayClearEx(pInfo->pResultBlockList, freeBlock);
23,986✔
627
  taosArrayClearEx(pInfo->pRecycledBlocks, freeBlock);
23,986✔
628

629
  blockDataCleanup(pInfo->pDummyBlock);
23,986✔
630

631
  void   *data = NULL;
23,986✔
632
  int32_t iter = 0;
23,986✔
633
  while ((data = tSimpleHashIterate(pInfo->pHashSources, data, &iter))) {
24,002✔
634
    ((SExchangeSrcIndex *)data)->inUseIdx = -1;
16✔
635
  }
636
  
637
  pInfo->limitInfo = (SLimitInfo){0};
23,986✔
638
  initLimitInfo(pPhynode->node.pLimit, pPhynode->node.pSlimit, &pInfo->limitInfo);
23,986✔
639

640
  return 0;
23,986✔
641
}
642

643
int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo,
579,342✔
644
                                   SOperatorInfo** pOptrInfo) {
645
  QRY_PARAM_CHECK(pOptrInfo);
579,342!
646

647
  int32_t        code = 0;
579,342✔
648
  int32_t        lino = 0;
579,342✔
649
  SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
579,342!
650
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
579,345!
651
  if (pInfo == NULL || pOperator == NULL) {
579,341!
652
    code = terrno;
×
653
    goto _error;
×
654
  }
655

656
  pOperator->pPhyNode = pExNode;
579,341✔
657
  pInfo->dynamicOp = pExNode->node.dynamicOp;
579,341✔
658
  code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo));
579,341✔
659
  QUERY_CHECK_CODE(code, lino, _error);
579,348!
660

661
  code = tsem_init(&pInfo->ready, 0, 0);
579,348✔
662
  QUERY_CHECK_CODE(code, lino, _error);
579,349!
663

664
  pInfo->pDummyBlock = createDataBlockFromDescNode(pExNode->node.pOutputDataBlockDesc);
579,349✔
665
  QUERY_CHECK_NULL(pInfo->pDummyBlock, code, lino, _error, terrno);
579,352!
666

667
  pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES);
579,352✔
668
  QUERY_CHECK_NULL(pInfo->pResultBlockList, code, lino, _error, terrno);
579,345!
669
  pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES);
579,345✔
670
  QUERY_CHECK_NULL(pInfo->pRecycledBlocks, code, lino, _error, terrno);
579,339!
671

672
  SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self};
579,339✔
673
  code = qAppendTaskStopInfo(pTaskInfo, &stopInfo);
579,339✔
674
  QUERY_CHECK_CODE(code, lino, _error);
579,350!
675

676
  pInfo->seqLoadData = pExNode->seqRecvData;
579,350✔
677
  pInfo->dynTbname = pExNode->dynTbname;
579,350✔
678
  if (pInfo->dynTbname) {
579,350✔
679
    pInfo->seqLoadData = true;
6✔
680
  }
681
  pInfo->pTransporter = pTransporter;
579,350✔
682

683
  setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo,
579,350✔
684
                  pTaskInfo);
685
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
579,345✔
686

687
  code = filterInitFromNode((SNode*)pExNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
579,349✔
688
                            pTaskInfo->pStreamRuntimeInfo);
579,349✔
689
  QUERY_CHECK_CODE(code, lino, _error);
579,348!
690
  qTrace("%s exchange op:%p", __func__, pOperator);
579,348✔
691
  pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteDataNext, NULL, destroyExchangeOperatorInfo,
579,348✔
692
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
693
  setOperatorResetStateFn(pOperator, resetExchangeOperState);
579,345✔
694
  *pOptrInfo = pOperator;
579,344✔
695
  return TSDB_CODE_SUCCESS;
579,344✔
696

697
_error:
×
698
  if (code != TSDB_CODE_SUCCESS) {
×
699
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
700
    pTaskInfo->code = code;
×
701
  }
702
  if (pInfo != NULL) {
×
703
    doDestroyExchangeOperatorInfo(pInfo);
×
704
  }
705

706
  if (pOperator != NULL) {
×
707
    pOperator->info = NULL;
×
708
    destroyOperator(pOperator);
×
709
  }
710
  return code;
×
711
}
712

713
void destroyExchangeOperatorInfo(void* param) {
579,347✔
714
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
579,347✔
715
  int32_t        code = taosRemoveRef(exchangeObjRefPool, pExInfo->self);
579,347✔
716
  if (code != TSDB_CODE_SUCCESS) {
579,352!
717
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
718
  }
719
}
579,352✔
720

721
void freeBlock(void* pParam) {
1,509,953✔
722
  SSDataBlock* pBlock = *(SSDataBlock**)pParam;
1,509,953✔
723
  blockDataDestroy(pBlock);
1,509,953✔
724
}
1,509,962✔
725

726
void freeSourceDataInfo(void* p) {
829,587✔
727
  SSourceDataInfo* pInfo = (SSourceDataInfo*)p;
829,587✔
728
  taosMemoryFreeClear(pInfo->decompBuf);
829,587!
729
  taosMemoryFreeClear(pInfo->pRsp);
829,587!
730

731
  pInfo->decompBufSize = 0;
829,587✔
732
}
829,587✔
733

734
void doDestroyExchangeOperatorInfo(void* param) {
579,352✔
735
  if (param == NULL) {
579,352!
736
    return;
×
737
  }
738
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
579,352✔
739
  if (pExInfo->pFetchRpcHandles) {
579,352!
740
    for (int32_t i = 0; i < pExInfo->pFetchRpcHandles->size; ++i) {
1,433,177✔
741
      int64_t* pRpcHandle = taosArrayGet(pExInfo->pFetchRpcHandles, i);
853,826✔
742
      if (*pRpcHandle > 0) {
853,825✔
743
        SDownstreamSourceNode* pSource = taosArrayGet(pExInfo->pSources, i);
1,811✔
744
        (void)asyncFreeConnById(pExInfo->pTransporter, *pRpcHandle);
1,811✔
745
      }
746
    }
747
    taosArrayDestroy(pExInfo->pFetchRpcHandles);
579,351✔
748
  }
749

750
  taosArrayDestroy(pExInfo->pSources);
579,350✔
751
  taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo);
579,351✔
752

753
  taosArrayDestroyEx(pExInfo->pResultBlockList, freeBlock);
579,350✔
754
  taosArrayDestroyEx(pExInfo->pRecycledBlocks, freeBlock);
579,351✔
755

756
  blockDataDestroy(pExInfo->pDummyBlock);
579,353✔
757
  tSimpleHashCleanup(pExInfo->pHashSources);
579,352✔
758

759
  int32_t code = tsem_destroy(&pExInfo->ready);
579,352✔
760
  if (code != TSDB_CODE_SUCCESS) {
579,350!
761
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
762
  }
763
  taosMemoryFreeClear(pExInfo->pTaskId);
579,350!
764

765
  taosMemoryFreeClear(param);
579,353!
766
}
767

768
int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
908,487✔
769
  SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;
908,487✔
770

771
  taosMemoryFreeClear(pMsg->pEpSet);
908,487!
772
  SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
908,487✔
773
  if (pExchangeInfo == NULL) {
908,494✔
774
    qWarn("failed to acquire exchange operator, since it may have been released, %p", pExchangeInfo);
50!
775
    taosMemoryFree(pMsg->pData);
50!
776
    return TSDB_CODE_SUCCESS;
50✔
777
  }
778

779
  int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
908,444✔
780
  if (pWrapper->seqId != currSeqId) {
908,423!
781
    qDebug("rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pWrapper->seqId, pExchangeInfo, currSeqId);
×
782
    taosMemoryFree(pMsg->pData);
×
783
    code = taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
×
784
    if (code != TSDB_CODE_SUCCESS) {
×
785
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
786
    }
787
    return TSDB_CODE_SUCCESS;
×
788
  }
789

790
  int32_t          index = pWrapper->sourceIndex;
908,423✔
791

792
  qDebug("%s exchange %p %dth source got rsp, code:%d, rsp:%p", pExchangeInfo->pTaskId, pExchangeInfo, index, code, pMsg->pData);
908,423✔
793

794
  int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, index);
908,437✔
795
  if (pRpcHandle != NULL) {
908,412✔
796
    int32_t ret = asyncFreeConnById(pExchangeInfo->pTransporter, *pRpcHandle);
908,386✔
797
    if (ret != 0) {
908,444!
798
      qDebug("failed to free rpc handle, code:%s, %p", tstrerror(ret), pExchangeInfo);
×
799
    }
800
    *pRpcHandle = -1;
908,446✔
801
  }
802

803
  SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);
908,472✔
804
  if (!pSourceDataInfo) {
908,438!
805
    return terrno;
×
806
  }
807

808
  if (0 == code && NULL == pMsg->pData) {
908,438!
809
    qError("invalid rsp msg, msgType:%d, len:%d", pMsg->msgType, pMsg->len);
×
810
    code = TSDB_CODE_QRY_INVALID_MSG;
×
811
  }
812

813
  taosWLockLatch(&pSourceDataInfo->lock);
908,438✔
814
  if (code == TSDB_CODE_SUCCESS) {
908,443✔
815
    pSourceDataInfo->seqId = pWrapper->seqId;
908,436✔
816
    pSourceDataInfo->pRsp = pMsg->pData;
908,436✔
817

818
    SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
908,436✔
819
    pRsp->numOfRows = htobe64(pRsp->numOfRows);
908,436✔
820
    pRsp->compLen = htonl(pRsp->compLen);
908,383✔
821
    pRsp->payloadLen = htonl(pRsp->payloadLen);
908,383✔
822
    pRsp->numOfCols = htonl(pRsp->numOfCols);
908,383✔
823
    pRsp->useconds = htobe64(pRsp->useconds);
908,383✔
824
    pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);
908,424✔
825

826
    qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%" PRId64 ", %p", pSourceDataInfo->taskId, index,
908,424✔
827
           pRsp->numOfBlocks, pRsp->numOfRows, pExchangeInfo);
828
  } else {
829
    taosMemoryFree(pMsg->pData);
7!
830
    pSourceDataInfo->code = rpcCvtErrCode(code);
10✔
831
    if (pSourceDataInfo->code != code) {
10!
832
      qError("%s fetch rsp received, index:%d, error:%s, cvted error: %s, %p", pSourceDataInfo->taskId, index,
×
833
             tstrerror(code), tstrerror(pSourceDataInfo->code), pExchangeInfo);
834
    } else {
835
      qError("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code),
10!
836
             pExchangeInfo);
837
    }
838
  }
839

840
  tmemory_barrier();
908,444✔
841
  pSourceDataInfo->status = EX_SOURCE_DATA_READY;
908,444✔
842
  taosWUnLockLatch(&pSourceDataInfo->lock);
908,444✔
843
  
844
  code = tsem_post(&pExchangeInfo->ready);
908,421✔
845
  if (code != TSDB_CODE_SUCCESS) {
908,407!
846
    qError("failed to invoke post when fetch rsp is ready, code:%s, %p", tstrerror(code), pExchangeInfo);
×
847
    return code;
×
848
  }
849

850
  code = taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
908,407✔
851
  if (code != TSDB_CODE_SUCCESS) {
908,457!
852
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
853
  }
854
  return code;
908,458✔
855
}
856

857
int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, bool tableSeq) {
999✔
858
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
999!
859
  if (NULL == *ppRes) {
999!
860
    return terrno;
×
861
  }
862

863
  STableScanOperatorParam* pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
999!
864
  if (NULL == pScan) {
999!
865
    taosMemoryFreeClear(*ppRes);
×
866
    return terrno;
×
867
  }
868

869
  pScan->pUidList = taosArrayDup(pUidList, NULL);
999✔
870
  if (NULL == pScan->pUidList) {
999!
871
    taosMemoryFree(pScan);
×
872
    taosMemoryFreeClear(*ppRes);
×
873
    return terrno;
×
874
  }
875
  pScan->tableSeq = tableSeq;
999✔
876
  pScan->pOrgTbInfo = NULL;
999✔
877
  pScan->window.skey = INT64_MAX;
999✔
878
  pScan->window.ekey = INT64_MIN;
999✔
879

880
  (*ppRes)->opType = srcOpType;
999✔
881
  (*ppRes)->downstreamIdx = 0;
999✔
882
  (*ppRes)->value = pScan;
999✔
883
  (*ppRes)->pChildren = NULL;
999✔
884
  (*ppRes)->reUse = false;
999✔
885

886
  return TSDB_CODE_SUCCESS;
999✔
887
}
888

889
int32_t buildTableScanOperatorParamEx(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, SOrgTbInfo *pMap, bool tableSeq, STimeWindow *window) {
260✔
890
  int32_t                  code = TSDB_CODE_SUCCESS;
260✔
891
  int32_t                  lino = 0;
260✔
892
  STableScanOperatorParam* pScan = NULL;
260✔
893

894
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
260!
895
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
260!
896

897
  pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
260!
898
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);
260!
899

900
  pScan->pUidList = taosArrayDup(pUidList, NULL);
260✔
901
  QUERY_CHECK_NULL(pScan->pUidList, code, lino, _return, terrno);
260!
902

903
  pScan->pOrgTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
260!
904
  QUERY_CHECK_NULL(pScan->pOrgTbInfo, code, lino, _return, terrno);
260!
905

906
  pScan->pOrgTbInfo->vgId = pMap->vgId;
260✔
907
  tstrncpy(pScan->pOrgTbInfo->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);
260✔
908

909
  pScan->pOrgTbInfo->colMap = taosArrayDup(pMap->colMap, NULL);
260✔
910
  QUERY_CHECK_NULL(pScan->pOrgTbInfo->colMap, code, lino, _return, terrno);
260!
911

912
  pScan->tableSeq = tableSeq;
260✔
913
  pScan->window.skey = window->skey;
260✔
914
  pScan->window.ekey = window->ekey;
260✔
915

916
  (*ppRes)->opType = srcOpType;
260✔
917
  (*ppRes)->downstreamIdx = 0;
260✔
918
  (*ppRes)->value = pScan;
260✔
919
  (*ppRes)->pChildren = NULL;
260✔
920
  (*ppRes)->reUse = false;
260✔
921

922
  return code;
260✔
923
_return:
×
924
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
925
  taosMemoryFreeClear(*ppRes);
×
926
  if (pScan) {
×
927
    taosArrayDestroy(pScan->pUidList);
×
928
    if (pScan->pOrgTbInfo) {
×
929
      taosArrayDestroy(pScan->pOrgTbInfo->colMap);
×
930
      taosMemoryFreeClear(pScan->pOrgTbInfo);
×
931
    }
932
    taosMemoryFree(pScan);
×
933
  }
934
  return code;
×
935
}
936

937
int32_t buildTagScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType) {
40✔
938
  int32_t                  code = TSDB_CODE_SUCCESS;
40✔
939
  int32_t                  lino = 0;
40✔
940
  STagScanOperatorParam*   pScan = NULL;
40✔
941

942
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
40!
943
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
40!
944

945
  pScan = taosMemoryMalloc(sizeof(STagScanOperatorParam));
40!
946
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);
40!
947
  pScan->vcUid = *(tb_uid_t*)taosArrayGet(pUidList, 0);
40✔
948

949
  (*ppRes)->opType = srcOpType;
40✔
950
  (*ppRes)->downstreamIdx = 0;
40✔
951
  (*ppRes)->value = pScan;
40✔
952
  (*ppRes)->pChildren = NULL;
40✔
953
  (*ppRes)->reUse = false;
40✔
954

955
  return code;
40✔
956
_return:
×
957
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
958
  taosMemoryFreeClear(*ppRes);
×
959
  if (pScan) {
×
960
    taosMemoryFree(pScan);
×
961
  }
962
  return code;
×
963
}
964

965
int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
910,267✔
966
  int32_t          code = TSDB_CODE_SUCCESS;
910,267✔
967
  int32_t          lino = 0;
910,267✔
968
  SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
910,267✔
969
  if (!pDataInfo) {
910,268!
970
    return terrno;
×
971
  }
972

973
  if (EX_SOURCE_DATA_NOT_READY != pDataInfo->status) {
910,268!
974
    return TSDB_CODE_SUCCESS;
×
975
  }
976

977
  pDataInfo->status = EX_SOURCE_DATA_STARTED;
910,268✔
978
  SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
910,268✔
979
  if (!pSource) {
910,269!
980
    return terrno;
×
981
  }
982

983
  pDataInfo->startTime = taosGetTimestampUs();
910,272✔
984
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
910,272✔
985

986
  SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
910,272!
987
  QUERY_CHECK_NULL(pWrapper, code, lino, _end, terrno);
910,277!
988
  pWrapper->exchangeId = pExchangeInfo->self;
910,277✔
989
  pWrapper->sourceIndex = sourceIndex;
910,277✔
990
  pWrapper->seqId = pExchangeInfo->seqId;
910,277✔
991

992
  if (pSource->localExec) {
910,277!
993
    SDataBuf pBuf = {0};
×
994
    int32_t  code = (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->sId, pTaskInfo->id.queryId,
×
995
                                               pSource->clientId, pSource->taskId, 0, pSource->execId, &pBuf.pData,
996
                                               pTaskInfo->localFetch.explainRes);
997
    code = loadRemoteDataCallback(pWrapper, &pBuf, code);
×
998
    taosMemoryFree(pWrapper);
×
999
    QUERY_CHECK_CODE(code, lino, _end);
×
1000
  } else {
1001
    SResFetchReq req = {0};
910,277✔
1002
    req.header.vgId = pSource->addr.nodeId;
910,277✔
1003
    req.sId = pSource->sId;
910,277✔
1004
    req.clientId = pSource->clientId;
910,277✔
1005
    req.taskId = pSource->taskId;
910,277✔
1006
    req.queryId = pTaskInfo->id.queryId;
910,277✔
1007
    req.execId = pSource->execId;
910,277✔
1008
    if (pTaskInfo->pStreamRuntimeInfo) {
910,277✔
1009
      req.dynTbname = pExchangeInfo->dynTbname;
28,488✔
1010
      req.execId = pTaskInfo->pStreamRuntimeInfo->execId;
28,488✔
1011
      if (pSource->fetchMsgType == TDMT_STREAM_FETCH_FROM_RUNNER)
28,488✔
1012
        qDebug("doSendFetchDataRequest to execId:%d, %p", req.execId, pTaskInfo->pStreamRuntimeInfo);
11!
1013
      req.pStRtFuncInfo = &pTaskInfo->pStreamRuntimeInfo->funcInfo;
28,484✔
1014
      if (!pDataInfo->fetchSent) {
28,484✔
1015
        req.reset = pDataInfo->fetchSent = true;
23,569✔
1016
      }
1017
    }
1018
    if (pDataInfo->isVtbRefScan) {
910,273✔
1019
      code = buildTableScanOperatorParamEx(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->colMap, pDataInfo->tableSeq, &pDataInfo->window);
260✔
1020
      taosArrayDestroy(pDataInfo->colMap->colMap);
260✔
1021
      taosMemoryFreeClear(pDataInfo->colMap);
260!
1022
      taosArrayDestroy(pDataInfo->pSrcUidList);
260✔
1023
      pDataInfo->pSrcUidList = NULL;
260✔
1024
      if (TSDB_CODE_SUCCESS != code) {
260!
1025
        pTaskInfo->code = code;
×
1026
        taosMemoryFree(pWrapper);
×
1027
        return pTaskInfo->code;
×
1028
      }
1029
    } else if (pDataInfo->isVtbTagScan) {
910,013✔
1030
      code = buildTagScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType);
40✔
1031
      taosArrayDestroy(pDataInfo->pSrcUidList);
40✔
1032
      pDataInfo->pSrcUidList = NULL;
40✔
1033
      if (TSDB_CODE_SUCCESS != code) {
40!
1034
        pTaskInfo->code = code;
×
1035
        taosMemoryFree(pWrapper);
×
1036
        return pTaskInfo->code;
×
1037
      }
1038
    } else {
1039
      if (pDataInfo->pSrcUidList) {
909,973✔
1040
        code = buildTableScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->tableSeq);
932✔
1041
        taosArrayDestroy(pDataInfo->pSrcUidList);
932✔
1042
        pDataInfo->pSrcUidList = NULL;
932✔
1043
        if (TSDB_CODE_SUCCESS != code) {
932!
1044
          pTaskInfo->code = code;
×
1045
          taosMemoryFree(pWrapper);
×
1046
          return pTaskInfo->code;
×
1047
        }
1048
      }
1049
    }
1050

1051
    int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req);
910,273✔
1052
    if (msgSize < 0) {
910,267!
1053
      pTaskInfo->code = msgSize;
×
1054
      taosMemoryFree(pWrapper);
×
1055
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
1056
      return pTaskInfo->code;
×
1057
    }
1058

1059
    void* msg = taosMemoryCalloc(1, msgSize);
910,267!
1060
    if (NULL == msg) {
910,275!
1061
      pTaskInfo->code = terrno;
×
1062
      taosMemoryFree(pWrapper);
×
1063
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
1064
      return pTaskInfo->code;
×
1065
    }
1066

1067
    msgSize = tSerializeSResFetchReq(msg, msgSize, &req);
910,275✔
1068
    if (msgSize < 0) {
910,270!
1069
      pTaskInfo->code = msgSize;
×
1070
      taosMemoryFree(pWrapper);
×
1071
      taosMemoryFree(msg);
×
1072
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
1073
      return pTaskInfo->code;
×
1074
    }
1075

1076
    freeOperatorParam(req.pOpParam, OP_GET_PARAM);
910,270✔
1077

1078
    qDebug("%s build fetch msg and send to vgId:%d, ep:%s, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
910,264✔
1079
           ", seqId:%" PRId64 ", execId:%d, %p, %d/%" PRIzu,
1080
           GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->clientId,
1081
           pSource->taskId, pExchangeInfo->seqId, pSource->execId, pExchangeInfo, sourceIndex, totalSources);
1082

1083
    // send the fetch remote task result reques
1084
    SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
910,268!
1085
    if (NULL == pMsgSendInfo) {
910,277!
1086
      taosMemoryFreeClear(msg);
×
1087
      taosMemoryFree(pWrapper);
×
1088
      qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
×
1089
      pTaskInfo->code = terrno;
×
1090
      return pTaskInfo->code;
×
1091
    }
1092

1093
    pMsgSendInfo->param = pWrapper;
910,277✔
1094
    pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
910,277✔
1095
    pMsgSendInfo->msgInfo.pData = msg;
910,277✔
1096
    pMsgSendInfo->msgInfo.len = msgSize;
910,277✔
1097
    pMsgSendInfo->msgType = pSource->fetchMsgType;
910,277✔
1098
    pMsgSendInfo->fp = loadRemoteDataCallback;
910,277✔
1099
    pMsgSendInfo->requestId = pTaskInfo->id.queryId;
910,277✔
1100

1101
    int64_t transporterId = 0;
910,277✔
1102
    void* poolHandle = NULL;
910,277✔
1103
    code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
910,277✔
1104
    QUERY_CHECK_CODE(code, lino, _end);
910,278!
1105
    int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, sourceIndex);
910,278✔
1106
    *pRpcHandle = transporterId;
910,276✔
1107
  }
1108

1109
_end:
910,276✔
1110
  if (code != TSDB_CODE_SUCCESS) {
910,276!
1111
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1112
  }
1113
  return code;
910,275✔
1114
}
1115

1116
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs,
725,186✔
1117
                          SOperatorInfo* pOperator) {
1118
  pInfo->totalRows += numOfRows;
725,186✔
1119
  pInfo->totalSize += dataLen;
725,186✔
1120
  pInfo->totalElapsed += (taosGetTimestampUs() - startTs);
725,189✔
1121
  pOperator->resultInfo.totalRows += numOfRows;
725,189✔
1122
}
725,189✔
1123

1124
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart) {
2,311,808✔
1125
  int32_t      code = TSDB_CODE_SUCCESS;
2,311,808✔
1126
  int32_t      lino = 0;
2,311,808✔
1127
  SSDataBlock* pBlock = NULL;
2,311,808✔
1128
  if (pColList == NULL) {  // data from other sources
2,311,808✔
1129
    blockDataCleanup(pRes);
2,294,229✔
1130
    code = blockDecode(pRes, pData, (const char**)pNextStart);
2,294,214✔
1131
    if (code) {
2,294,077!
1132
      return code;
×
1133
    }
1134
  } else {  // extract data according to pColList
1135
    char* pStart = pData;
17,579✔
1136

1137
    int32_t numOfCols = htonl(*(int32_t*)pStart);
17,579✔
1138
    pStart += sizeof(int32_t);
17,579✔
1139

1140
    // todo refactor:extract method
1141
    SSysTableSchema* pSchema = (SSysTableSchema*)pStart;
17,579✔
1142
    for (int32_t i = 0; i < numOfCols; ++i) {
227,855✔
1143
      SSysTableSchema* p = (SSysTableSchema*)pStart;
210,276✔
1144

1145
      p->colId = htons(p->colId);
210,276✔
1146
      p->bytes = htonl(p->bytes);
210,276✔
1147
      pStart += sizeof(SSysTableSchema);
210,276✔
1148
    }
1149

1150
    pBlock = NULL;
17,579✔
1151
    code = createDataBlock(&pBlock);
17,579✔
1152
    QUERY_CHECK_CODE(code, lino, _end);
17,586!
1153

1154
    for (int32_t i = 0; i < numOfCols; ++i) {
227,862✔
1155
      SColumnInfoData idata = createColumnInfoData(pSchema[i].type, pSchema[i].bytes, pSchema[i].colId);
210,276✔
1156
      code = blockDataAppendColInfo(pBlock, &idata);
210,276✔
1157
      QUERY_CHECK_CODE(code, lino, _end);
210,276!
1158
    }
1159

1160
    code = blockDecode(pBlock, pStart, NULL);
17,586✔
1161
    QUERY_CHECK_CODE(code, lino, _end);
17,586!
1162

1163
    code = blockDataEnsureCapacity(pRes, pBlock->info.rows);
17,586✔
1164
    QUERY_CHECK_CODE(code, lino, _end);
17,586!
1165

1166
    // data from mnode
1167
    pRes->info.dataLoad = 1;
17,586✔
1168
    pRes->info.rows = pBlock->info.rows;
17,586✔
1169
    pRes->info.scanFlag = MAIN_SCAN;
17,586✔
1170
    code = relocateColumnData(pRes, pColList, pBlock->pDataBlock, false);
17,586✔
1171
    QUERY_CHECK_CODE(code, lino, _end);
17,586!
1172

1173
    blockDataDestroy(pBlock);
17,586✔
1174
    pBlock = NULL;
17,586✔
1175
  }
1176

1177
_end:
2,311,663✔
1178
  if (code != TSDB_CODE_SUCCESS) {
2,311,663!
1179
    blockDataDestroy(pBlock);
×
1180
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1181
  }
1182
  return code;
2,311,663✔
1183
}
1184

1185
void setAllSourcesCompleted(SOperatorInfo* pOperator) {
430,541✔
1186
  SExchangeInfo* pExchangeInfo = pOperator->info;
430,541✔
1187
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
430,541✔
1188

1189
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
430,541✔
1190
  size_t               totalSources = taosArrayGetSize(pExchangeInfo->pSources);
430,541✔
1191
  qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 ", %.2f Kb, elapsed:%.2f ms",
430,541✔
1192
         GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
1193
         pLoadInfo->totalElapsed / 1000.0);
1194

1195
  setOperatorCompleted(pOperator);
430,541✔
1196
}
430,541✔
1197

1198
int32_t getCompletedSources(const SArray* pArray, int32_t* pRes) {
1,443,459✔
1199
  int32_t code = TSDB_CODE_SUCCESS;
1,443,459✔
1200
  int32_t lino = 0;
1,443,459✔
1201
  size_t  total = taosArrayGetSize(pArray);
1,443,459✔
1202

1203
  int32_t completed = 0;
1,443,458✔
1204
  for (int32_t k = 0; k < total; ++k) {
3,878,040✔
1205
    SSourceDataInfo* p = taosArrayGet(pArray, k);
2,434,578✔
1206
    QUERY_CHECK_NULL(p, code, lino, _end, terrno);
2,434,579!
1207
    if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
2,434,582✔
1208
      qDebug("source %d is completed, info:%p %p", k, pArray, p);
1,153,854✔
1209
      completed += 1;
1,153,854✔
1210
    }
1211
  }
1212

1213
  *pRes = completed;
1,443,462✔
1214
_end:
1,443,462✔
1215
  if (code != TSDB_CODE_SUCCESS) {
1,443,462!
1216
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1217
  }
1218
  return code;
1,443,456✔
1219
}
1220

1221
int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
565,273✔
1222
  SExchangeInfo* pExchangeInfo = pOperator->info;
565,273✔
1223
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
565,273✔
1224

1225
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
565,273✔
1226
  int64_t startTs = taosGetTimestampUs();
565,280✔
1227

1228
  // Asynchronously send all fetch requests to all sources.
1229
  for (int32_t i = 0; i < totalSources; ++i) {
1,392,214✔
1230
    int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
826,938✔
1231
    if (code != TSDB_CODE_SUCCESS) {
826,934!
1232
      pTaskInfo->code = code;
×
1233
      return code;
×
1234
    }
1235
  }
1236

1237
  int64_t endTs = taosGetTimestampUs();
565,278✔
1238
  qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTaskInfo),
565,278✔
1239
         totalSources, (endTs - startTs) / 1000.0);
1240

1241
  pOperator->status = OP_RES_TO_RETURN;
565,281✔
1242
  pOperator->cost.openCost = taosGetTimestampUs() - startTs;
565,281✔
1243
  if (isTaskKilled(pTaskInfo)) {
565,281!
1244
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1245
  }
1246

1247
  return TSDB_CODE_SUCCESS;
565,281✔
1248
}
1249

1250
int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo) {
707,605✔
1251
  int32_t            code = TSDB_CODE_SUCCESS;
707,605✔
1252
  int32_t            lino = 0;
707,605✔
1253
  SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
707,605✔
1254
  SSDataBlock*       pb = NULL;
707,605✔
1255

1256
  char* pNextStart = pRetrieveRsp->data;
707,605✔
1257
  char* pStart = pNextStart;
707,605✔
1258

1259
  int32_t index = 0;
707,605✔
1260

1261
  if (pRetrieveRsp->compressed) {  // decompress the data
707,605!
1262
    if (pDataInfo->decompBuf == NULL) {
×
1263
      pDataInfo->decompBuf = taosMemoryMalloc(pRetrieveRsp->payloadLen);
×
1264
      QUERY_CHECK_NULL(pDataInfo->decompBuf, code, lino, _end, terrno);
×
1265
      pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
×
1266
    } else {
1267
      if (pDataInfo->decompBufSize < pRetrieveRsp->payloadLen) {
×
1268
        char* p = taosMemoryRealloc(pDataInfo->decompBuf, pRetrieveRsp->payloadLen);
×
1269
        QUERY_CHECK_NULL(p, code, lino, _end, terrno);
×
1270
        if (p != NULL) {
×
1271
          pDataInfo->decompBuf = p;
×
1272
          pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
×
1273
        }
1274
      }
1275
    }
1276
  }
1277

1278
  while (index++ < pRetrieveRsp->numOfBlocks) {
3,001,882✔
1279
    pStart = pNextStart;
2,294,262✔
1280

1281
    if (taosArrayGetSize(pExchangeInfo->pRecycledBlocks) > 0) {
2,294,262✔
1282
      pb = *(SSDataBlock**)taosArrayPop(pExchangeInfo->pRecycledBlocks);
784,345✔
1283
      blockDataCleanup(pb);
784,345✔
1284
    } else {
1285
      code = createOneDataBlock(pExchangeInfo->pDummyBlock, false, &pb);
1,509,914✔
1286
      QUERY_CHECK_NULL(pb, code, lino, _end, code);
1,509,897!
1287
    }
1288

1289
    int32_t compLen = *(int32_t*)pStart;
2,294,239✔
1290
    pStart += sizeof(int32_t);
2,294,239✔
1291

1292
    int32_t rawLen = *(int32_t*)pStart;
2,294,239✔
1293
    pStart += sizeof(int32_t);
2,294,239✔
1294
    QUERY_CHECK_CONDITION((compLen <= rawLen && compLen != 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
2,294,239!
1295

1296
    pNextStart = pStart + compLen;
2,294,239✔
1297
    if (pRetrieveRsp->compressed && (compLen < rawLen)) {
2,294,239!
1298
      int32_t t = tsDecompressString(pStart, compLen, 1, pDataInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
×
1299
      QUERY_CHECK_CONDITION((t == rawLen), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1300
      pStart = pDataInfo->decompBuf;
×
1301
    }
1302

1303
    code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart);
2,294,239✔
1304
    if (code != 0) {
2,294,080!
1305
      taosMemoryFreeClear(pDataInfo->pRsp);
×
1306
      goto _end;
×
1307
    }
1308

1309
    void* tmp = taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
2,294,080✔
1310
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
2,294,089!
1311
    qDebug("%dth block added to resultBlockList, rows:%" PRId64, index, pb->info.rows);
2,294,089✔
1312
    pb = NULL;
2,294,277✔
1313
  }
1314

1315
_end:
707,620✔
1316
  if (code != TSDB_CODE_SUCCESS) {
707,620!
1317
    blockDataDestroy(pb);
×
1318
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1319
  }
1320
  return code;
707,602✔
1321
}
1322

1323
int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
1,328✔
1324
  SExchangeInfo* pExchangeInfo = pOperator->info;
1,328✔
1325
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
1,328✔
1326

1327
  int32_t code = 0;
1,328✔
1328
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);
1,328✔
1329
  int64_t startTs = taosGetTimestampUs();
1,328✔
1330

1331
  int32_t vgId = 0;
1,328✔
1332
  if (pExchangeInfo->dynTbname) {
1,328✔
1333
    SArray* vals = pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPartColVals;
12✔
1334
    for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
12!
1335
      SStreamGroupValue* pValue = taosArrayGet(vals, i);
12✔
1336
      if (pValue != NULL && pValue->isTbname) {
12!
1337
        vgId = pValue->vgId;
12✔
1338
        break;
12✔
1339
      }
1340
    }
1341
  }
1342

1343
  while (1) {
352✔
1344
    if (pExchangeInfo->current >= totalSources) {
1,680✔
1345
      setAllSourcesCompleted(pOperator);
427✔
1346
      return TSDB_CODE_SUCCESS;
427✔
1347
    }
1348

1349
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
1,253✔
1350
    if (!pSource) {
1,253!
1351
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1352
      pTaskInfo->code = terrno;
×
1353
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1354
    }
1355

1356
    if (vgId != 0 && pSource->addr.nodeId != vgId){
1,253!
1357
      pExchangeInfo->current += 1;
×
1358
      continue;
×
1359
    }
1360

1361
    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
1,253✔
1362
    if (!pDataInfo) {
1,253!
1363
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1364
      pTaskInfo->code = terrno;
×
1365
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1366
    }
1367
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
1,253✔
1368

1369
    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
1,253✔
1370
    if (code != TSDB_CODE_SUCCESS) {
1,253!
1371
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1372
      pTaskInfo->code = code;
×
1373
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1374
    }
1375

1376
    while (true) {
×
1377
      code = exchangeWait(pOperator, pExchangeInfo);
1,253✔
1378
      if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
1,253!
1379
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1380
      }
1381

1382
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
1,253✔
1383
      if (pDataInfo->seqId != currSeqId) {
1,253!
1384
        qDebug("seq rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pDataInfo->seqId, pExchangeInfo, currSeqId);
×
1385
        taosMemoryFreeClear(pDataInfo->pRsp);
×
1386
        continue;
×
1387
      }
1388

1389
      break;
1,253✔
1390
    }
1391

1392
    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
1,253!
1393
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
×
1394
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1395
             tstrerror(pDataInfo->code));
1396
      pOperator->pTaskInfo->code = pDataInfo->code;
×
1397
      return pOperator->pTaskInfo->code;
×
1398
    }
1399

1400
    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
1,253✔
1401
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
1,253✔
1402

1403
    if (pRsp->numOfRows == 0) {
1,253✔
1404
      qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
352✔
1405
             " execId:%d %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", pDataInfo,
1406
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1407
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);
1408

1409
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
352✔
1410
      if (pDataInfo->isVtbRefScan || pDataInfo->isVtbTagScan) {
352!
1411
        pExchangeInfo->current = totalSources;
73✔
1412
      } else {
1413
        pExchangeInfo->current += 1;
279✔
1414
      }
1415
      taosMemoryFreeClear(pDataInfo->pRsp);
352!
1416
      continue;
352✔
1417
    }
1418

1419
    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
901✔
1420
    if (code != TSDB_CODE_SUCCESS) {
901!
1421
      goto _error;
×
1422
    }
1423

1424
    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
901✔
1425
    if (pRsp->completed == 1) {
901✔
1426
      qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
472!
1427
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, pDataInfo,
1428
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1429
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
1430
             pExchangeInfo->current + 1, totalSources);
1431

1432
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
472✔
1433
      if (pDataInfo->isVtbRefScan) {
472!
1434
        pExchangeInfo->current = totalSources;
×
1435
      } else {
1436
        pExchangeInfo->current += 1;
472✔
1437
      }
1438
    } else {
1439
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
429✔
1440
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
1441
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1442
             pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
1443
    }
1444
    if (pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData) {
901!
1445
      taosArrayClear(pExchangeInfo->pSourceDataInfo);
227✔
1446
    }
1447
    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
901✔
1448
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;
901✔
1449

1450
    taosMemoryFreeClear(pDataInfo->pRsp);
901!
1451
    return TSDB_CODE_SUCCESS;
901✔
1452
  }
1453

1454
_error:
×
1455
  pTaskInfo->code = code;
×
1456
  return code;
×
1457
}
1458

1459
void clearVtbScanDataInfo(void* pItem) {
59✔
1460
  SSourceDataInfo *pInfo = (SSourceDataInfo *)pItem;
59✔
1461
  if (pInfo->colMap) {
59!
1462
    taosArrayDestroy(pInfo->colMap->colMap);
×
1463
    taosMemoryFreeClear(pInfo->colMap);
×
1464
  }
1465
  taosArrayDestroy(pInfo->pSrcUidList);
59✔
1466
}
59✔
1467

1468
int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasicParam* pBasicParam) {
1,232✔
1469
  SExchangeInfo*     pExchangeInfo = pOperator->info;
1,232✔
1470
  SExchangeSrcIndex* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
1,232✔
1471
  if (NULL == pIdx) {
1,232!
1472
    qError("No exchange source for vgId: %d", pBasicParam->vgId);
×
1473
    return TSDB_CODE_INVALID_PARA;
×
1474
  }
1475

1476
  qDebug("start to add single exchange source");
1,232✔
1477

1478
  if (pBasicParam->isVtbRefScan) {
1,232✔
1479
    SSourceDataInfo dataInfo = {0};
260✔
1480
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
260✔
1481
    dataInfo.taskId = pExchangeInfo->pTaskId;
260✔
1482
    dataInfo.index = pIdx->srcIdx;
260✔
1483
    dataInfo.window = pBasicParam->window;
260✔
1484
    dataInfo.colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
260!
1485
    dataInfo.colMap->vgId = pBasicParam->colMap->vgId;
260✔
1486
    tstrncpy(dataInfo.colMap->tbName, pBasicParam->colMap->tbName, TSDB_TABLE_FNAME_LEN);
260✔
1487
    dataInfo.colMap->colMap = taosArrayDup(pBasicParam->colMap->colMap, NULL);
260✔
1488

1489
    dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
260✔
1490
    if (dataInfo.pSrcUidList == NULL) {
260!
1491
      return terrno;
×
1492
    }
1493

1494
    dataInfo.isVtbRefScan = pBasicParam->isVtbRefScan;
260✔
1495
    dataInfo.isVtbTagScan = pBasicParam->isVtbTagScan;
260✔
1496
    dataInfo.srcOpType = pBasicParam->srcOpType;
260✔
1497
    dataInfo.tableSeq = pBasicParam->tableSeq;
260✔
1498

1499
    taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
260✔
1500
    void* tmp = taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo);
260✔
1501
    if (!tmp) {
260!
1502
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1503
      return terrno;
×
1504
    }
1505
  } else if (pBasicParam->isVtbTagScan) {
972✔
1506
    SSourceDataInfo dataInfo = {0};
40✔
1507
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
40✔
1508
    dataInfo.taskId = pExchangeInfo->pTaskId;
40✔
1509
    dataInfo.index = pIdx->srcIdx;
40✔
1510
    dataInfo.window = pBasicParam->window;
40✔
1511
    dataInfo.colMap = NULL;
40✔
1512

1513
    dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
40✔
1514
    if (dataInfo.pSrcUidList == NULL) {
40!
1515
      return terrno;
×
1516
    }
1517

1518
    dataInfo.isVtbRefScan = pBasicParam->isVtbRefScan;
40✔
1519
    dataInfo.isVtbTagScan = pBasicParam->isVtbTagScan;
40✔
1520
    dataInfo.srcOpType = pBasicParam->srcOpType;
40✔
1521
    dataInfo.tableSeq = pBasicParam->tableSeq;
40✔
1522

1523
    taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
40✔
1524
    void* tmp = taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo);
40✔
1525
    if (!tmp) {
40!
1526
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1527
      return terrno;
×
1528
    }
1529
  } else {
1530
    if (pIdx->inUseIdx < 0) {
932✔
1531
      SSourceDataInfo dataInfo = {0};
914✔
1532
      dataInfo.status = EX_SOURCE_DATA_NOT_READY;
914✔
1533
      dataInfo.taskId = pExchangeInfo->pTaskId;
914✔
1534
      dataInfo.index = pIdx->srcIdx;
914✔
1535
      if (pBasicParam->isVtbRefScan) {
914!
1536
        dataInfo.window = pBasicParam->window;
×
1537
        dataInfo.colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
×
1538
        dataInfo.colMap->vgId = pBasicParam->colMap->vgId;
×
1539
        tstrncpy(dataInfo.colMap->tbName, pBasicParam->colMap->tbName, TSDB_TABLE_FNAME_LEN);
×
1540
        dataInfo.colMap->colMap = taosArrayDup(pBasicParam->colMap->colMap, NULL);
×
1541
      }
1542

1543
      dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
914✔
1544
      if (dataInfo.pSrcUidList == NULL) {
914!
1545
        return terrno;
×
1546
      }
1547

1548
      dataInfo.isVtbRefScan = pBasicParam->isVtbRefScan;
914✔
1549
      dataInfo.isVtbTagScan = pBasicParam->isVtbTagScan;
914✔
1550
      dataInfo.srcOpType = pBasicParam->srcOpType;
914✔
1551
      dataInfo.tableSeq = pBasicParam->tableSeq;
914✔
1552

1553
      void* tmp = taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo);
914✔
1554
      if (!tmp) {
914!
1555
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1556
        return terrno;
×
1557
      }
1558
      pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
914✔
1559
    } else {
1560
      SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
18✔
1561
      if (!pDataInfo) {
18!
1562
        return terrno;
×
1563
      }
1564
      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
18!
1565
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
18✔
1566
      }
1567

1568
      if (pBasicParam->isVtbRefScan) {
18!
1569
        pDataInfo->window = pBasicParam->window;
×
1570
        if (!pDataInfo->colMap) {
×
1571
          pDataInfo->colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
×
1572
        }
1573
        pDataInfo->colMap->vgId = pBasicParam->colMap->vgId;
×
1574
        tstrncpy(pDataInfo->colMap->tbName, pBasicParam->colMap->tbName, TSDB_TABLE_FNAME_LEN);
×
1575
        pDataInfo->colMap->colMap = taosArrayDup(pBasicParam->colMap->colMap, NULL);
×
1576
      }
1577

1578
      pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
18✔
1579
      if (pDataInfo->pSrcUidList == NULL) {
18!
1580
        return terrno;
×
1581
      }
1582

1583
      pDataInfo->isVtbRefScan = pBasicParam->isVtbRefScan;
18✔
1584
      pDataInfo->isVtbTagScan = pBasicParam->isVtbTagScan;
18✔
1585
      pDataInfo->srcOpType = pBasicParam->srcOpType;
18✔
1586
      pDataInfo->tableSeq = pBasicParam->tableSeq;
18✔
1587
    }
1588
  }
1589

1590
  return TSDB_CODE_SUCCESS;
1,232✔
1591
}
1592

1593
int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
869✔
1594
  SExchangeInfo*               pExchangeInfo = pOperator->info;
869✔
1595
  int32_t                      code = TSDB_CODE_SUCCESS;
869✔
1596
  SExchangeOperatorBasicParam* pBasicParam = NULL;
869✔
1597
  SExchangeOperatorParam*      pParam = (SExchangeOperatorParam*)pOperator->pOperatorGetParam->value;
869✔
1598
  if (pParam->multiParams) {
869✔
1599
    SExchangeOperatorBatchParam* pBatch = (SExchangeOperatorBatchParam*)pOperator->pOperatorGetParam->value;
533✔
1600
    int32_t                      iter = 0;
533✔
1601
    while (NULL != (pBasicParam = tSimpleHashIterate(pBatch->pBatchs, pBasicParam, &iter))) {
1,429✔
1602
      code = addSingleExchangeSource(pOperator, pBasicParam);
896✔
1603
      if (code) {
896!
1604
        return code;
×
1605
      }
1606
    }
1607
  } else {
1608
    pBasicParam = &pParam->basic;
336✔
1609
    code = addSingleExchangeSource(pOperator, pBasicParam);
336✔
1610
  }
1611

1612
  freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
869✔
1613
  pOperator->pOperatorGetParam = NULL;
869✔
1614

1615
  return code;
869✔
1616
}
1617

1618
int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
2,908,258✔
1619
  SExchangeInfo* pExchangeInfo = pOperator->info;
2,908,258✔
1620
  int32_t        code = TSDB_CODE_SUCCESS;
2,908,258✔
1621
  int32_t        lino = 0;
2,908,258✔
1622
  
1623
  if ((OPTR_IS_OPENED(pOperator) && !pExchangeInfo->dynamicOp) ||
2,908,258✔
1624
      (pExchangeInfo->dynamicOp && NULL == pOperator->pOperatorGetParam)) {
592,137✔
1625
    qDebug("skip prepare, opened:%d, dynamicOp:%d, getParam:%p", OPTR_IS_OPENED(pOperator), pExchangeInfo->dynamicOp, pOperator->pOperatorGetParam);
2,318,839✔
1626
    return TSDB_CODE_SUCCESS;
2,318,843✔
1627
  }
1628

1629
  if (pExchangeInfo->dynamicOp) {
589,419✔
1630
    code = addDynamicExchangeSource(pOperator);
869✔
1631
    QUERY_CHECK_CODE(code, lino, _end);
869!
1632
  }
1633

1634
  if (pOperator->status == OP_NOT_OPENED && (pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData) || IS_STREAM_MODE(pOperator->pTaskInfo)) {
589,419✔
1635
    pExchangeInfo->current = 0;
23,761✔
1636
  }
1637

1638
  int64_t st = taosGetTimestampUs();
589,430✔
1639

1640
  if (!IS_STREAM_MODE(pOperator->pTaskInfo) && !pExchangeInfo->seqLoadData) {
589,430✔
1641
    code = prepareConcurrentlyLoad(pOperator);
565,280✔
1642
    QUERY_CHECK_CODE(code, lino, _end);
565,279!
1643
    pExchangeInfo->openedTs = taosGetTimestampUs();
565,278✔
1644
  }
1645

1646
  OPTR_SET_OPENED(pOperator);
589,428✔
1647
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
589,431✔
1648

1649
  qDebug("%s prepare load complete", pOperator->pTaskInfo->id.str);
589,431✔
1650

1651
_end:
49,813✔
1652
  if (code != TSDB_CODE_SUCCESS) {
589,434!
1653
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1654
    pOperator->pTaskInfo->code = code;
×
1655
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
1656
  }
1657
  return TSDB_CODE_SUCCESS;
589,434✔
1658
}
1659

1660
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
18,116✔
1661
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
18,116✔
1662

1663
  if (pLimitInfo->remainGroupOffset > 0) {
18,116!
1664
    if (pLimitInfo->currentGroupId == 0) {  // it is the first group
×
1665
      pLimitInfo->currentGroupId = pBlock->info.id.groupId;
×
1666
      blockDataCleanup(pBlock);
×
1667
      return PROJECT_RETRIEVE_CONTINUE;
×
1668
    } else if (pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
×
1669
      // now it is the data from a new group
1670
      pLimitInfo->remainGroupOffset -= 1;
×
1671

1672
      // ignore data block in current group
1673
      if (pLimitInfo->remainGroupOffset > 0) {
×
1674
        blockDataCleanup(pBlock);
×
1675
        return PROJECT_RETRIEVE_CONTINUE;
×
1676
      }
1677
    }
1678

1679
    // set current group id of the project operator
1680
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
×
1681
  }
1682

1683
  // here check for a new group data, we need to handle the data of the previous group.
1684
  if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
18,116✔
1685
    pLimitInfo->numOfOutputGroups += 1;
493✔
1686
    if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
493!
1687
      pOperator->status = OP_EXEC_DONE;
×
1688
      blockDataCleanup(pBlock);
×
1689

1690
      return PROJECT_RETRIEVE_DONE;
×
1691
    }
1692

1693
    // reset the value for a new group data
1694
    resetLimitInfoForNextGroup(pLimitInfo);
493✔
1695
    // existing rows that belongs to previous group.
1696
    if (pBlock->info.rows > 0) {
493!
1697
      return PROJECT_RETRIEVE_DONE;
493✔
1698
    }
1699
  }
1700

1701
  // here we reach the start position, according to the limit/offset requirements.
1702

1703
  // set current group id
1704
  pLimitInfo->currentGroupId = pBlock->info.id.groupId;
17,623✔
1705

1706
  bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pTaskInfo);
17,623✔
1707
  if (pBlock->info.rows == 0) {
17,623✔
1708
    return PROJECT_RETRIEVE_CONTINUE;
7,994✔
1709
  } else {
1710
    if (limitReached && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
9,629!
1711
      setOperatorCompleted(pOperator);
×
1712
      return PROJECT_RETRIEVE_DONE;
×
1713
    }
1714
  }
1715

1716
  // todo optimize performance
1717
  // If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the
1718
  // they may not belong to the same group the limit/offset value is not valid in this case.
1719
  if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || hasSlimitOffsetInfo(pLimitInfo)) {
9,629!
1720
    return PROJECT_RETRIEVE_DONE;
9,629✔
1721
  } else {  // not full enough, continue to accumulate the output data in the buffer.
1722
    return PROJECT_RETRIEVE_CONTINUE;
×
1723
  }
1724
}
1725

1726
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo) {
907,177✔
1727
  SExecTaskInfo* pTask = pOperator->pTaskInfo;
907,177✔
1728
  int32_t        code = TSDB_CODE_SUCCESS;
907,177✔
1729
  if (pTask->pWorkerCb) {
907,177!
1730
    code = pTask->pWorkerCb->beforeBlocking(pTask->pWorkerCb->pPool);
907,180✔
1731
    if (code != TSDB_CODE_SUCCESS) {
907,184!
1732
      pTask->code = code;
×
1733
      return pTask->code;
×
1734
    }
1735
  }
1736

1737
  code = tsem_wait(&pExchangeInfo->ready);
907,181✔
1738
  if (code != TSDB_CODE_SUCCESS) {
907,175!
1739
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1740
    pTask->code = code;
×
1741
    return pTask->code;
×
1742
  }
1743

1744
  if (pTask->pWorkerCb) {
907,177!
1745
    code = pTask->pWorkerCb->afterRecoverFromBlocking(pTask->pWorkerCb->pPool);
907,178✔
1746
    if (code != TSDB_CODE_SUCCESS) {
907,183!
1747
      pTask->code = code;
×
1748
      return pTask->code;
×
1749
    }
1750
  }
1751
  return TSDB_CODE_SUCCESS;
907,182✔
1752
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc