• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4908

30 Dec 2025 10:52AM UTC coverage: 65.386% (-0.2%) from 65.541%
#4908

push

travis-ci

web-flow
enh: drop multi-stream (#33962)

60 of 106 new or added lines in 4 files covered. (56.6%)

1330 existing lines in 113 files now uncovered.

193461 of 295877 relevant lines covered (65.39%)

115765274.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.5
/source/libs/executor/src/exchangeoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "operator.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "tdatablock.h"
23
#include "thash.h"
24
#include "tmsg.h"
25
#include "tref.h"
26
#include "trpc.h"
27

28
/*
 * Context attached to an outstanding fetch request.
 * NOTE(review): the code that fills and consumes this struct is outside this
 * excerpt; the field notes below are inferred from the names - confirm.
 */
typedef struct SFetchRspHandleWrapper {
  int64_t exchangeId;   // presumably the ref id of the owning exchange operator
  int32_t sourceIndex;  // presumably the index of the source the request was sent to
  int64_t seqId;        // presumably the exchange sequence id at send time
} SFetchRspHandleWrapper;
33

34
typedef struct SSourceDataInfo {
35
  int32_t             index;
36
  int64_t             seqId;
37
  SRWLatch            lock;
38
  SRetrieveTableRsp*  pRsp;
39
  uint64_t            totalRows;
40
  int64_t             startTime;
41
  int32_t             code;
42
  EX_SOURCE_STATUS    status;
43
  const char*         taskId;
44
  SArray*             pSrcUidList;
45
  int32_t             srcOpType;
46
  bool                tableSeq;
47
  char*               decompBuf;
48
  int32_t             decompBufSize;
49
  SOrgTbInfo*         orgTbInfo;
50
  SArray*             batchOrgTbInfo; // SArray<SOrgTbInfo>
51
  SArray*             tagList;
52
  EExchangeSourceType type;
53
  bool                isNewParam;
54
  STimeWindow         window;
55
  uint64_t            groupid;
56
  bool                fetchSent; // need reset
57
} SSourceDataInfo;
58

59
static void destroyExchangeOperatorInfo(void* param);
60
static void freeBlock(void* pParam);
61
static void freeSourceDataInfo(void* param);
62
static void setAllSourcesCompleted(SOperatorInfo* pOperator);
63

64
static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code);
65
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex);
66
static int32_t getCompletedSources(const SArray* pArray, int32_t* pRes);
67
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator);
68
static int32_t seqLoadRemoteData(SOperatorInfo* pOperator);
69
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator);
70
static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock,
71
                                 bool holdDataInBuf);
72
static int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo);
73

74
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo);
75

76
/* True when the source is of type EX_SRC_TYPE_VSTB_SCAN. */
static bool isVstbScan(SSourceDataInfo* pDataInfo) {
  return EX_SRC_TYPE_VSTB_SCAN == pDataInfo->type;
}
12,100,486✔
77
/* True when the source is of type EX_SRC_TYPE_VSTB_WIN_SCAN. */
static bool isVstbWinScan(SSourceDataInfo* pDataInfo) {
  return EX_SRC_TYPE_VSTB_WIN_SCAN == pDataInfo->type;
}
×
78
/* True when the source is of type EX_SRC_TYPE_VSTB_AGG_SCAN. */
static bool isVstbAggScan(SSourceDataInfo* pDataInfo) {
  return EX_SRC_TYPE_VSTB_AGG_SCAN == pDataInfo->type;
}
×
79
/* True when the source is of type EX_SRC_TYPE_VSTB_TAG_SCAN. */
static bool isVstbTagScan(SSourceDataInfo* pDataInfo) {
  return EX_SRC_TYPE_VSTB_TAG_SCAN == pDataInfo->type;
}
10,176,756✔
80
/* True when the source is of type EX_SRC_TYPE_STB_JOIN_SCAN. */
static bool isStbJoinScan(SSourceDataInfo* pDataInfo) {
  return EX_SRC_TYPE_STB_JOIN_SCAN == pDataInfo->type;
}
×
81

82

83
static void streamConcurrentlyLoadRemoteData(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
8,077,648✔
84
                                           SExecTaskInfo* pTaskInfo) {
85
  int32_t code = 0;
8,077,648✔
86
  int32_t lino = 0;
8,077,648✔
87
  int64_t startTs = taosGetTimestampUs();  
8,077,648✔
88
  int32_t  totalSources = (int32_t)taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
8,077,648✔
89
  int32_t completed = 0;
8,077,648✔
90
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
8,077,648✔
91
  if (code != TSDB_CODE_SUCCESS) {
8,077,648✔
92
    pTaskInfo->code = code;
×
93
    T_LONG_JMP(pTaskInfo->env, code);
×
94
  }
95
  if (completed == totalSources) {
8,077,648✔
96
    qDebug("%s no load since all sources completed, completed:%d, totalSources:%d", pTaskInfo->id.str, completed, totalSources);
1,562,123✔
97
    setAllSourcesCompleted(pOperator);
1,562,123✔
98
    return;
1,564,344✔
99
  }
100

101
  SSourceDataInfo* pDataInfo = NULL;
6,515,525✔
102

103
  while (1) {
3,772,450✔
104
    if (pExchangeInfo->current < 0) {
10,287,975✔
105
      qDebug("current %d and all sources complted, totalSources:%d", pExchangeInfo->current, totalSources);
96,637✔
106
      setAllSourcesCompleted(pOperator);
96,637✔
107
      return;
96,637✔
108
    }
109
    
110
    if (pExchangeInfo->current >= totalSources) {
10,191,141✔
111
      completed = 0;
4,696,600✔
112
      code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
4,696,600✔
113
      if (code != TSDB_CODE_SUCCESS) {
4,696,396✔
114
        pTaskInfo->code = code;
×
115
        T_LONG_JMP(pTaskInfo->env, code);
×
116
      }
117
      if (completed == totalSources) {
4,696,396✔
118
        qDebug("stop to load since all sources complted, completed:%d, totalSources:%d", completed, totalSources);
3,272,353✔
119
        setAllSourcesCompleted(pOperator);
3,272,353✔
120
        return;
3,272,353✔
121
      }
122
      
123
      pExchangeInfo->current = 0;
1,424,043✔
124
    }
125

126
    qDebug("%s start stream exchange %p idx:%d fetch", GET_TASKID(pTaskInfo), pExchangeInfo, pExchangeInfo->current);
6,918,584✔
127

128
    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
6,918,781✔
129
    if (!pDataInfo) {
6,918,781✔
130
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
131
      pTaskInfo->code = terrno;
×
132
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
133
    }
134

135
    if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
6,918,781✔
136
      pExchangeInfo->current++;
880✔
137
      continue;
880✔
138
    }
139

140
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
6,917,901✔
141

142
    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
6,917,901✔
143
    if (code != TSDB_CODE_SUCCESS) {
6,917,901✔
144
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
145
      pTaskInfo->code = code;
×
146
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
147
    }
148

149
    while (true) {
705✔
150
      code = exchangeWait(pOperator, pExchangeInfo);
6,918,606✔
151
      if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
6,918,402✔
152
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
921✔
153
      }
154

155
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
6,917,685✔
156
      if (pDataInfo->seqId != currSeqId) {
6,916,958✔
157
        qDebug("%s seq rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", 
705✔
158
            GET_TASKID(pTaskInfo), pDataInfo->seqId, pExchangeInfo, currSeqId);
159
        taosMemoryFreeClear(pDataInfo->pRsp);
705✔
160
        continue;
705✔
161
      }
162

163
      break;
6,916,754✔
164
    }
165

166
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
6,916,754✔
167
    if (!pSource) {
6,916,992✔
168
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
169
      pTaskInfo->code = terrno;
×
170
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
171
    }
172

173
    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
6,916,992✔
UNCOV
174
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
×
175
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
176
             tstrerror(pDataInfo->code));
UNCOV
177
      pTaskInfo->code = pDataInfo->code;
×
UNCOV
178
      T_LONG_JMP(pTaskInfo->env, code);
×
179
    }
180

181
    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
6,916,816✔
182
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
6,917,196✔
183

184
    if (pRsp->numOfRows == 0) {
6,916,992✔
185
      qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
3,771,570✔
186
             " execId:%d idx %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", pDataInfo,
187
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
188
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);
189

190
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
3,771,570✔
191
      if (isVstbScan(pDataInfo) || isVstbTagScan(pDataInfo)) {
3,771,366✔
192
        pExchangeInfo->current = -1;
96,637✔
193
      } else {
194
        pExchangeInfo->current += 1;
3,674,729✔
195
      }
196
      taosMemoryFreeClear(pDataInfo->pRsp);
3,771,366✔
197
      continue;
3,771,570✔
198
    }
199

200
    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
3,145,626✔
201
    TAOS_CHECK_EXIT(code);
3,145,626✔
202

203
    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
3,145,626✔
204
    if (pRsp->completed == 1) {
3,145,626✔
205
      qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
1,667,426✔
206
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%d", pDataInfo,
207
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
208
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
209
             pExchangeInfo->current + 1, totalSources);
210

211
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
1,667,426✔
212
      if (isVstbScan(pDataInfo)) {
1,667,426✔
213
        pExchangeInfo->current = -1;
×
214
        taosMemoryFreeClear(pDataInfo->pRsp);
×
215
        continue;
×
216
      }
217
    } else {
218
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d idx:%d numOfRows:%" PRId64
1,478,200✔
219
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
220
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
221
             pExchangeInfo->current, pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
222
    }
223

224
    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
3,145,626✔
225
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;
3,145,626✔
226

227
    pExchangeInfo->current++;
3,145,626✔
228

229
    taosMemoryFreeClear(pDataInfo->pRsp);
3,145,626✔
230
    return;
3,145,626✔
231
  }
232

233
_exit:
×
234

235
  if (code) {
×
236
    pTaskInfo->code = code;
×
237
    qError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
×
238
  }
239
}
240

241

242
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
177,136,587✔
243
                                           SExecTaskInfo* pTaskInfo) {
244
  int32_t code = 0;
177,136,587✔
245
  int32_t lino = 0;
177,136,587✔
246
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
177,136,587✔
247
  int32_t completed = 0;
177,136,261✔
248
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
177,136,261✔
249
  if (code != TSDB_CODE_SUCCESS) {
177,136,587✔
250
    pTaskInfo->code = code;
×
251
    T_LONG_JMP(pTaskInfo->env, code);
×
252
  }
253
  if (completed == totalSources) {
177,136,587✔
254
    setAllSourcesCompleted(pOperator);
55,589,067✔
255
    return;
55,590,159✔
256
  }
257

258
  SSourceDataInfo* pDataInfo = NULL;
121,547,520✔
259

260
  while (1) {
16,037,790✔
261
    qDebug("prepare wait for ready, %p, %s", pExchangeInfo, GET_TASKID(pTaskInfo));
137,585,310✔
262
    code = exchangeWait(pOperator, pExchangeInfo);
137,585,310✔
263

264
    if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
137,585,310✔
265
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
1,010✔
266
    }
267

268
    for (int32_t i = 0; i < totalSources; ++i) {
198,100,334✔
269
      pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
198,100,334✔
270
      QUERY_CHECK_NULL(pDataInfo, code, lino, _exit, terrno);
198,100,008✔
271
      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
198,100,008✔
272
        continue;
40,719,735✔
273
      }
274

275
      if (pDataInfo->status != EX_SOURCE_DATA_READY) {
157,380,599✔
276
        continue;
19,796,299✔
277
      }
278

279
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
137,584,300✔
280
      if (pDataInfo->seqId != currSeqId) {
137,584,300✔
281
        qDebug("concurrent rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pDataInfo->seqId, pExchangeInfo, currSeqId);
×
282
        taosMemoryFreeClear(pDataInfo->pRsp);
×
283
        break;
×
284
      }
285

286
      if (pDataInfo->code != TSDB_CODE_SUCCESS) {
137,584,300✔
287
        code = pDataInfo->code;
594✔
288
        TAOS_CHECK_EXIT(code);
594✔
289
      }
290

291
      tmemory_barrier();
137,583,706✔
292
      SRetrieveTableRsp*     pRsp = pDataInfo->pRsp;
137,583,706✔
293
      SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
137,583,706✔
294
      QUERY_CHECK_NULL(pSource, code, lino, _exit, terrno);
137,583,706✔
295

296
      // todo
297
      SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
137,583,706✔
298
      if (pRsp->numOfRows == 0) {
137,583,706✔
299
        if (NULL != pDataInfo->pSrcUidList && !isVstbScan(pDataInfo)) {
36,737,222✔
300
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
×
301
          code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
×
302
          if (code != TSDB_CODE_SUCCESS) {
×
303
            taosMemoryFreeClear(pDataInfo->pRsp);
×
304
            TAOS_CHECK_EXIT(code);
×
305
          }
306
        } else {
307
          pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
36,737,222✔
308
          qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
36,737,222✔
309
                 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", try next %d/%" PRIzu, pDataInfo,
310
                 GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
311
                 pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows, i + 1, totalSources);
312
          taosMemoryFreeClear(pDataInfo->pRsp);
36,737,222✔
313
        }
314
        break;
36,737,222✔
315
      }
316

317
      TAOS_CHECK_EXIT(doExtractResultBlocks(pExchangeInfo, pDataInfo));
100,846,484✔
318

319
      SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
100,846,484✔
320
      updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pDataInfo->startTime, pOperator);
100,846,484✔
321
      pDataInfo->totalRows += pRetrieveRsp->numOfRows;
100,846,484✔
322

323
      if (pRsp->completed == 1) {
100,846,484✔
324
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
94,416,027✔
325
        qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
94,416,490✔
326
               " execId:%d index:%d completed, blocks:%d, numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64
327
               ", totalRows:%" PRIu64 ", total:%.2f Kb, try next %d/%" PRIzu, pDataInfo,
328
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
329
               pRsp->numOfBlocks, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows,
330
               pLoadInfo->totalSize / 1024.0, i + 1, totalSources);
331
      } else {
332
        qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
6,429,994✔
333
               " execId:%d blocks:%d, numOfRows:%" PRId64 ", totalRows:%" PRIu64 ", total:%.2f Kb",
334
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
335
               pRsp->numOfBlocks, pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
336
      }
337

338
      taosMemoryFreeClear(pDataInfo->pRsp);
100,846,484✔
339

340
      if ((pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED || NULL != pDataInfo->pSrcUidList) && !isVstbScan(pDataInfo) && !isVstbTagScan(pDataInfo)) {
100,846,484✔
341
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
6,429,994✔
342
        code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
6,429,994✔
343
        if (code != TSDB_CODE_SUCCESS) {
6,429,994✔
344
          taosMemoryFreeClear(pDataInfo->pRsp);
×
345
          TAOS_CHECK_EXIT(code);
×
346
        }
347
      }
348
      
349
      return;
100,846,562✔
350
    }  // end loop
351

352
    int32_t complete1 = 0;
36,737,222✔
353
    code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &complete1);
36,736,889✔
354
    if (code != TSDB_CODE_SUCCESS) {
36,737,222✔
355
      pTaskInfo->code = code;
×
356
      T_LONG_JMP(pTaskInfo->env, code);
×
357
    }
358
    if (complete1 == totalSources) {
36,737,222✔
359
      qDebug("all sources are completed, %s", GET_TASKID(pTaskInfo));
20,699,432✔
360
      return;
20,699,432✔
361
    }
362
  }
363

364
_exit:
594✔
365

366
  if (code) {
594✔
367
    pTaskInfo->code = code;
594✔
368
    qError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
594✔
369
  }
370
}
371

372
/*
 * Produce the next data block for the exchange operator.
 *
 * A block already sitting in pResultBlockList is returned first. Otherwise one
 * of the three loaders (sequential, stream, concurrent) is run and the first
 * block it produced is returned. Every returned block is also appended to
 * pRecycledBlocks.
 *
 * @return the next block, or NULL when the operator is done or the loader
 *         produced nothing. Failures long-jump to pTaskInfo->env with
 *         pTaskInfo->code set.
 */
static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
  SExchangeInfo*       pExchangeInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
  size_t               totalSources = taosArrayGetSize(pExchangeInfo->pSources);
  int32_t              code = TSDB_CODE_SUCCESS;

  if (OP_EXEC_DONE == pOperator->status) {
    qDebug("%s all %" PRIzu " source(s) are exhausted, total rows:%" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
           GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize,
           pLoadInfo->totalElapsed / 1000.0);
    return NULL;
  }

  // Serve a previously buffered block before touching the network.
  if (taosArrayGetSize(pExchangeInfo->pResultBlockList) > 0) {
    SSDataBlock* pBuffered = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
    taosArrayRemove(pExchangeInfo->pResultBlockList, 0);

    if (pBuffered != NULL) {
      if (taosArrayPush(pExchangeInfo->pRecycledBlocks, &pBuffered) == NULL) {
        code = terrno;
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        pTaskInfo->code = code;
        T_LONG_JMP(pTaskInfo->env, code);
      }
      return pBuffered;
    }
  }

  // Nothing buffered: pull from the sources with the configured strategy.
  if (pExchangeInfo->seqLoadData) {
    code = seqLoadRemoteData(pOperator);
    if (TSDB_CODE_SUCCESS != code) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      pTaskInfo->code = code;
      T_LONG_JMP(pTaskInfo->env, code);
    }
  } else if (IS_STREAM_MODE(pTaskInfo)) {
    streamConcurrentlyLoadRemoteData(pOperator, pExchangeInfo, pTaskInfo);
  } else {
    concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
  }

  // The stream / concurrent loaders report some failures only via the task code.
  if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(pTaskInfo->code));
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
  }

  if (0 == taosArrayGetSize(pExchangeInfo->pResultBlockList)) {
    qDebug("empty resultBlockList");
    return NULL;
  }

  SSDataBlock* pLoaded = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
  taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
  if (taosArrayPush(pExchangeInfo->pRecycledBlocks, &pLoaded) == NULL) {
    code = terrno;
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  qDebug("block with rows:%" PRId64 " loaded", pLoaded->info.rows);
  return pLoaded;
}
440

441
/*
 * "Get next" entry point of the exchange operator.
 *
 * Opens the operator, then keeps loading blocks until one survives the
 * operator's filter and the limit/offset handling.
 *
 * @param pOperator  the exchange operator
 * @param ppRes      out: the block to return, or NULL when nothing is left
 * @return TSDB_CODE_SUCCESS on the normal paths; open/filter failures
 *         long-jump to pTaskInfo->env after recording the code.
 */
static int32_t loadRemoteDataNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SExchangeInfo* pExchangeInfo = pOperator->info;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  qDebug("%s start to load from exchange %p", pTaskInfo->id.str, pExchangeInfo);

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);

  if (OP_EXEC_DONE == pOperator->status) {
    *ppRes = NULL;
    return code;
  }

  for (;;) {
    SSDataBlock* pBlock = doLoadRemoteDataImpl(pOperator);
    if (NULL == pBlock) {
      *ppRes = NULL;
      return code;
    }

    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    if (0 == blockDataGetNumOfRows(pBlock)) {
      qDebug("rows 0 block got, continue next load");
      continue;
    }

    SLimitInfo* pLimitInfo = &pExchangeInfo->limitInfo;
    if (!hasLimitOffsetInfo(pLimitInfo)) {
      *ppRes = pBlock;
      qDebug("block with rows %" PRId64 " returned in exechange", pBlock->info.rows);
      return code;
    }

    int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock, false);
    if (PROJECT_RETRIEVE_CONTINUE == status) {
      qDebug("limit retrieve continue");
      continue;
    }

    if (PROJECT_RETRIEVE_DONE == status) {
      if (0 == pBlock->info.rows) {
        setOperatorCompleted(pOperator);
        *ppRes = NULL;
      } else {
        *ppRes = pBlock;
      }
      return code;
    }
    // any other status: load the next block
  }

_end:

  if (TSDB_CODE_SUCCESS == code) {
    qDebug("empty block returned in exchange");
  } else {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  *ppRes = NULL;
  return code;
}
508

509
/*
 * Allocate pInfo->pSourceDataInfo and, for non-dynamic operators, fill it
 * with one NOT_READY entry per source.
 *
 * For dynamic operators (pInfo->dynamicOp) only the empty array is created.
 * Otherwise a private copy of `id` is stored in pInfo->pTaskId and every entry
 * borrows that pointer as its taskId.
 *
 * @param numOfSources  number of downstream sources
 * @param pInfo         exchange state to initialise
 * @param id            task id string (copied)
 * @return TSDB_CODE_SUCCESS, or the terrno of the failing allocation. On
 *         failure any array left in pInfo is still owned by pInfo, so the
 *         caller's normal teardown releases it exactly once.
 */
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
  pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
  if (pInfo->pSourceDataInfo == NULL) {
    return terrno;
  }

  if (pInfo->dynamicOp) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t len = (int32_t)strlen(id) + 1;
  pInfo->pTaskId = taosMemoryCalloc(1, len);
  if (!pInfo->pTaskId) {
    return terrno;
  }
  tstrncpy(pInfo->pTaskId, id, len);

  for (int32_t i = 0; i < numOfSources; ++i) {
    SSourceDataInfo dataInfo = {0};
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
    dataInfo.taskId = pInfo->pTaskId;
    dataInfo.index = i;

    SSourceDataInfo* pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
    if (pDs == NULL) {
      // Capture the error before cleanup has a chance to overwrite terrno.
      int32_t code = terrno;
      taosArrayDestroyEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
      // Clear the pointer: the operator teardown destroys pSourceDataInfo
      // again, which would otherwise be a double free.
      pInfo->pSourceDataInfo = NULL;
      return code;
    }
    qDebug("init source data info %d, pDs:%p, status:%d", i, pDs, pDs->status);
  }

  return TSDB_CODE_SUCCESS;
}
540

541
/*
 * Build the per-operator containers of an exchange operator from its plan node.
 *
 * Creates pFetchRpcHandles (one reserved int64_t slot per source), copies every
 * SDownstreamSourceNode of pExNode->pSrcEndPoints into pSources, builds the
 * nodeId -> SExchangeSrcIndex hash for dynamic operators, initialises the limit
 * info, registers pInfo in exchangeObjRefPool (stored in pInfo->self) and
 * finally delegates to initDataSource().
 *
 * @param pExNode  exchange physical plan node
 * @param pInfo    exchange state to initialise
 * @param id       task id string, used for logging and copied by initDataSource()
 * @return TSDB_CODE_SUCCESS or an error code; partially built members stay in
 *         pInfo for the caller to release.
 */
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo, const char* id) {
  size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);

  if (numOfSources == 0) {
    qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
    return TSDB_CODE_INVALID_PARA;
  }
  pInfo->pFetchRpcHandles = taosArrayInit(numOfSources, sizeof(int64_t));
  if (!pInfo->pFetchRpcHandles) {
    return terrno;
  }
  void* ret = taosArrayReserve(pInfo->pFetchRpcHandles, numOfSources);
  if (!ret) {
    return terrno;
  }

  pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
  if (pInfo->pSources == NULL) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
    return terrno;
  }

  // Only dynamic operators look sources up by node id.
  if (pExNode->node.dynamicOp) {
    pInfo->pHashSources = tSimpleHashInit(numOfSources * 2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
    if (NULL == pInfo->pHashSources) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return terrno;
    }
  }

  for (int32_t i = 0; i < (int32_t)numOfSources; ++i) {
    SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
    if (!pNode) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return terrno;
    }
    void* tmp = taosArrayPush(pInfo->pSources, pNode);
    if (!tmp) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return terrno;
    }

    // Skip the hash entirely when it was not created, instead of calling the
    // hash API with a NULL table and discarding its error.
    if (pInfo->pHashSources != NULL) {
      SExchangeSrcIndex idx = {.srcIdx = i, .inUseIdx = -1};
      int32_t           code =
          tSimpleHashPut(pInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        return code;
      }
    }
  }

  initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);
  int64_t refId = taosAddRef(exchangeObjRefPool, pInfo);
  if (refId < 0) {
    int32_t code = terrno;
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    return code;
  } else {
    pInfo->self = refId;
  }

  return initDataSource((int32_t)numOfSources, pInfo, id);
}
603

604
/*
 * Return an exchange operator to its not-opened state so it can be run again.
 *
 * Bumps the exchange sequence id, clears the load statistics and the per-source
 * state, drops buffered/recycled blocks, resets the hash entries' inUseIdx and
 * re-initialises the limit info from the plan node.
 *
 * @return always 0
 */
int32_t resetExchangeOperState(SOperatorInfo* pOper) {
  SExchangeInfo*      pInfo = pOper->info;
  SExchangePhysiNode* pPhynode = (SExchangePhysiNode*)pOper->pPhyNode;

  qDebug("%s reset exchange op:%p info:%p", pOper->pTaskInfo->id.str, pOper, pInfo);

  // New sequence id: the loaders discard responses whose seqId does not match.
  (void)atomic_add_fetch_64(&pInfo->seqId, 1);
  pOper->status = OP_NOT_OPENED;
  pInfo->current = 0;

  pInfo->loadInfo.totalSize = 0;
  pInfo->loadInfo.totalRows = 0;
  pInfo->loadInfo.totalElapsed = 0;

  int32_t numOfEntries = (int32_t)taosArrayGetSize(pInfo->pSourceDataInfo);
  for (int32_t idx = 0; idx < numOfEntries; ++idx) {
    SSourceDataInfo* pEntry = taosArrayGet(pInfo->pSourceDataInfo, idx);

    taosWLockLatch(&pEntry->lock);
    taosMemoryFreeClear(pEntry->decompBuf);
    taosMemoryFreeClear(pEntry->pRsp);

    pEntry->totalRows = 0;
    pEntry->code = 0;
    pEntry->status = EX_SOURCE_DATA_NOT_READY;
    pEntry->fetchSent = false;
    taosWUnLockLatch(&pEntry->lock);
  }

  // Dynamic operators start again with an empty source list.
  if (pInfo->dynamicOp) {
    taosArrayClearEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
  }

  taosArrayClearEx(pInfo->pResultBlockList, freeBlock);
  taosArrayClearEx(pInfo->pRecycledBlocks, freeBlock);

  blockDataCleanup(pInfo->pDummyBlock);

  int32_t iter = 0;
  for (void* pEntry = tSimpleHashIterate(pInfo->pHashSources, NULL, &iter); pEntry != NULL;
       pEntry = tSimpleHashIterate(pInfo->pHashSources, pEntry, &iter)) {
    ((SExchangeSrcIndex*)pEntry)->inUseIdx = -1;
  }

  pInfo->limitInfo = (SLimitInfo){0};
  initLimitInfo(pPhynode->node.pLimit, pPhynode->node.pSlimit, &pInfo->limitInfo);

  return 0;
}
649

650
int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo,
85,020,328✔
651
                                   SOperatorInfo** pOptrInfo) {
652
  QRY_PARAM_CHECK(pOptrInfo);
85,020,328✔
653

654
  int32_t        code = 0;
85,020,328✔
655
  int32_t        lino = 0;
85,020,328✔
656
  SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
85,020,328✔
657
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
85,018,203✔
658
  if (pInfo == NULL || pOperator == NULL) {
85,017,295✔
659
    code = terrno;
×
660
    goto _error;
×
661
  }
662

663
  pOperator->pPhyNode = pExNode;
85,017,295✔
664
  pInfo->dynamicOp = pExNode->node.dynamicOp;
85,017,295✔
665
  code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo));
85,018,743✔
666
  QUERY_CHECK_CODE(code, lino, _error);
85,020,097✔
667

668
  code = tsem_init(&pInfo->ready, 0, 0);
85,020,097✔
669
  QUERY_CHECK_CODE(code, lino, _error);
85,020,894✔
670

671
  pInfo->pDummyBlock = createDataBlockFromDescNode(pExNode->node.pOutputDataBlockDesc);
85,020,894✔
672
  QUERY_CHECK_NULL(pInfo->pDummyBlock, code, lino, _error, terrno);
85,021,594✔
673

674
  pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES);
85,019,670✔
675
  QUERY_CHECK_NULL(pInfo->pResultBlockList, code, lino, _error, terrno);
85,019,996✔
676
  pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES);
85,020,329✔
677
  QUERY_CHECK_NULL(pInfo->pRecycledBlocks, code, lino, _error, terrno);
85,019,995✔
678

679
  SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self};
85,020,798✔
680
  code = qAppendTaskStopInfo(pTaskInfo, &stopInfo);
85,021,124✔
681
  QUERY_CHECK_CODE(code, lino, _error);
85,021,268✔
682

683
  pInfo->seqLoadData = pExNode->seqRecvData;
85,021,268✔
684
  pInfo->dynTbname = pExNode->dynTbname;
85,020,799✔
685
  if (pInfo->dynTbname) {
85,019,968✔
686
    pInfo->seqLoadData = true;
9,184✔
687
  }
688
  pInfo->pTransporter = pTransporter;
85,021,131✔
689

690
  setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo,
85,019,974✔
691
                  pTaskInfo);
692
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
85,018,846✔
693

694
  code = filterInitFromNode((SNode*)pExNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
85,019,834✔
695
                            pTaskInfo->pStreamRuntimeInfo);
85,020,437✔
696
  QUERY_CHECK_CODE(code, lino, _error);
85,020,472✔
697
  qTrace("%s exchange op:%p", __func__, pOperator);
85,020,472✔
698
  pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteDataNext, NULL, destroyExchangeOperatorInfo,
85,020,472✔
699
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
700
  setOperatorResetStateFn(pOperator, resetExchangeOperState);
85,021,125✔
701
  *pOptrInfo = pOperator;
85,019,025✔
702
  return TSDB_CODE_SUCCESS;
85,018,562✔
703

704
_error:
×
705
  if (code != TSDB_CODE_SUCCESS) {
×
706
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
707
    pTaskInfo->code = code;
×
708
  }
709
  if (pInfo != NULL) {
×
710
    doDestroyExchangeOperatorInfo(pInfo);
×
711
  }
712

713
  if (pOperator != NULL) {
×
714
    pOperator->info = NULL;
×
715
    destroyOperator(pOperator);
×
716
  }
717
  return code;
×
718
}
719

720
void destroyExchangeOperatorInfo(void* param) {
85,020,656✔
721
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
85,020,656✔
722
  int32_t        code = taosRemoveRef(exchangeObjRefPool, pExInfo->self);
85,020,656✔
723
  if (code != TSDB_CODE_SUCCESS) {
85,021,594✔
724
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
725
  }
726
}
85,021,594✔
727

728
void freeBlock(void* pParam) {
215,907,028✔
729
  SSDataBlock* pBlock = *(SSDataBlock**)pParam;
215,907,028✔
730
  blockDataDestroy(pBlock);
215,907,232✔
731
}
215,907,442✔
732

733
void freeSourceDataInfo(void* p) {
133,652,043✔
734
  SSourceDataInfo* pInfo = (SSourceDataInfo*)p;
133,652,043✔
735
  taosMemoryFreeClear(pInfo->decompBuf);
133,652,043✔
736
  taosMemoryFreeClear(pInfo->pRsp);
133,652,043✔
737

738
  pInfo->decompBufSize = 0;
133,652,043✔
739
}
133,651,574✔
740

741
void doDestroyExchangeOperatorInfo(void* param) {
85,021,125✔
742
  if (param == NULL) {
85,021,125✔
743
    return;
×
744
  }
745
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
85,021,125✔
746
  if (pExInfo->pFetchRpcHandles) {
85,021,125✔
747
    for (int32_t i = 0; i < pExInfo->pFetchRpcHandles->size; ++i) {
222,765,722✔
748
      int64_t* pRpcHandle = taosArrayGet(pExInfo->pFetchRpcHandles, i);
137,744,128✔
749
      if (*pRpcHandle > 0) {
137,745,066✔
750
        SDownstreamSourceNode* pSource = taosArrayGet(pExInfo->pSources, i);
5,769,553✔
751
        (void)asyncFreeConnById(pExInfo->pTransporter, *pRpcHandle);
5,769,553✔
752
      }
753
    }
754
    taosArrayDestroy(pExInfo->pFetchRpcHandles);
85,021,594✔
755
  }
756

757
  taosArrayDestroy(pExInfo->pSources);
85,020,656✔
758
  taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo);
85,020,799✔
759

760
  taosArrayDestroyEx(pExInfo->pResultBlockList, freeBlock);
85,021,594✔
761
  taosArrayDestroyEx(pExInfo->pRecycledBlocks, freeBlock);
85,020,312✔
762

763
  blockDataDestroy(pExInfo->pDummyBlock);
85,020,656✔
764
  tSimpleHashCleanup(pExInfo->pHashSources);
85,020,672✔
765

766
  int32_t code = tsem_destroy(&pExInfo->ready);
85,020,673✔
767
  if (code != TSDB_CODE_SUCCESS) {
85,021,122✔
768
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
769
  }
770
  taosMemoryFreeClear(pExInfo->pTaskId);
85,021,122✔
771

772
  taosMemoryFreeClear(param);
85,019,735✔
773
}
774

775
/*
 * Response callback for a fetch request issued by the exchange operator.
 *
 * The callback takes ownership of pMsg->pEpSet and pMsg->pData: the ep-set is
 * always freed, and the payload is either handed to the matching
 * SSourceDataInfo (success) or freed (any failure / stale response).
 *
 * Flow:
 *   1. acquire the exchange operator from exchangeObjRefPool; if it is gone
 *      the response is dropped;
 *   2. drop responses whose seqId no longer matches the operator's seqId;
 *   3. release the rpc handle recorded for this source;
 *   4. under the source's write latch, either store the response (converting
 *      the header fields with htobe64/htonl) or record the converted error
 *      code, then mark the source EX_SOURCE_DATA_READY;
 *   5. post the operator's "ready" semaphore and release the reference.
 *
 * The reference acquired in step 1 is released on every path after it.
 *
 * @param param  SFetchRspHandleWrapper identifying operator, source and seqId
 *               (not freed here)
 * @param pMsg   response buffer
 * @param code   transport/remote status of the fetch
 * @return TSDB_CODE_SUCCESS for dropped responses; terrno if the source entry
 *         cannot be found; otherwise the tsem_post result or, if that
 *         succeeded, the taosReleaseRef result
 */
int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
  SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;

  taosMemoryFreeClear(pMsg->pEpSet);
  SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
  if (pExchangeInfo == NULL) {
    qWarn("failed to acquire exchange operator, since it may have been released, %p", pExchangeInfo);
    taosMemoryFree(pMsg->pData);
    return TSDB_CODE_SUCCESS;
  }

  int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
  if (pWrapper->seqId != currSeqId) {
    qDebug("rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pWrapper->seqId, pExchangeInfo, currSeqId);
    taosMemoryFree(pMsg->pData);
    code = taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    }
    return TSDB_CODE_SUCCESS;
  }

  int32_t index = pWrapper->sourceIndex;

  qDebug("%s exchange %p %dth source got rsp, code:%d, rsp:%p", pExchangeInfo->pTaskId, pExchangeInfo, index, code, pMsg->pData);

  int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, index);
  if (pRpcHandle != NULL) {
    int32_t ret = asyncFreeConnById(pExchangeInfo->pTransporter, *pRpcHandle);
    if (ret != 0) {
      qDebug("failed to free rpc handle, code:%s, %p", tstrerror(ret), pExchangeInfo);
    }
    *pRpcHandle = -1;
  }

  SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);
  if (!pSourceDataInfo) {
    // capture terrno first, then give back the payload and the reference
    // acquired above instead of leaking them
    code = terrno;
    taosMemoryFree(pMsg->pData);
    int32_t relCode = taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
    if (relCode != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(relCode));
    }
    return code;
  }

  if (0 == code && NULL == pMsg->pData) {
    qError("invalid rsp msg, msgType:%d, len:%d", pMsg->msgType, pMsg->len);
    code = TSDB_CODE_QRY_INVALID_MSG;
  }

  taosWLockLatch(&pSourceDataInfo->lock);
  if (code == TSDB_CODE_SUCCESS) {
    pSourceDataInfo->seqId = pWrapper->seqId;
    pSourceDataInfo->pRsp = pMsg->pData;

    // header fields arrive in network byte order
    SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
    pRsp->numOfRows = htobe64(pRsp->numOfRows);
    pRsp->compLen = htonl(pRsp->compLen);
    pRsp->payloadLen = htonl(pRsp->payloadLen);
    pRsp->numOfCols = htonl(pRsp->numOfCols);
    pRsp->useconds = htobe64(pRsp->useconds);
    pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);

    qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%" PRId64 ", %p", pSourceDataInfo->taskId, index,
           pRsp->numOfBlocks, pRsp->numOfRows, pExchangeInfo);
  } else {
    taosMemoryFree(pMsg->pData);
    pSourceDataInfo->code = rpcCvtErrCode(code);
    if (pSourceDataInfo->code != code) {
      qError("%s fetch rsp received, index:%d, error:%s, cvted error: %s, %p", pSourceDataInfo->taskId, index,
             tstrerror(code), tstrerror(pSourceDataInfo->code), pExchangeInfo);
    } else {
      qError("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code),
             pExchangeInfo);
    }
  }

  // publish the payload/error before the status flips to READY
  tmemory_barrier();
  pSourceDataInfo->status = EX_SOURCE_DATA_READY;
  taosWUnLockLatch(&pSourceDataInfo->lock);

  code = tsem_post(&pExchangeInfo->ready);
  if (code != TSDB_CODE_SUCCESS) {
    qError("failed to invoke post when fetch rsp is ready, code:%s, %p", tstrerror(code), pExchangeInfo);
  }

  // always drop the reference, even when the post failed
  int32_t relCode = taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
  if (relCode != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(relCode));
  }

  return (code != TSDB_CODE_SUCCESS) ? code : relCode;
}
863

864
/*
 * Build the operator parameter sent to a downstream table scan for the
 * DYN_TYPE_STB_JOIN case.
 *
 * @param ppRes      [out] newly allocated SOperatorParam on success, NULL on failure
 * @param pUidList   table uid list; duplicated, the caller keeps its copy
 * @param srcOpType  operator type stored in the result
 * @param tableSeq   stored in the scan parameter
 * @return TSDB_CODE_SUCCESS, or the terrno of the failed allocation
 */
int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, bool tableSeq) {
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }

  // zero-filled so that fields not assigned below (e.g. groupid) are not garbage
  STableScanOperatorParam* pScan = taosMemoryCalloc(1, sizeof(STableScanOperatorParam));
  if (NULL == pScan) {
    int32_t code = terrno;  // read before the cleanup calls below
    taosMemoryFreeClear(*ppRes);
    return code;
  }

  pScan->pUidList = taosArrayDup(pUidList, NULL);
  if (NULL == pScan->pUidList) {
    int32_t code = terrno;  // read before the cleanup calls below
    taosMemoryFree(pScan);
    taosMemoryFreeClear(*ppRes);
    return code;
  }
  pScan->type = DYN_TYPE_STB_JOIN;
  pScan->tableSeq = tableSeq;
  pScan->pOrgTbInfo = NULL;
  pScan->pBatchTbInfo = NULL;
  pScan->pTagList = NULL;
  pScan->isNewParam = false;
  // inverted (empty) window
  pScan->window.skey = INT64_MAX;
  pScan->window.ekey = INT64_MIN;

  (*ppRes)->opType = srcOpType;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->value = pScan;
  (*ppRes)->pChildren = NULL;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;
}
899

900
/*
 * Build the operator parameter for a DYN_TYPE_VSTB_SINGLE_SCAN table scan.
 *
 * @param ppRes       [out] newly allocated SOperatorParam on success, NULL on failure
 * @param pUidList    optional table uid list; duplicated when non-NULL
 * @param srcOpType   operator type stored in the result
 * @param pMap        optional origin-table info; deep-copied (incl. colMap) when non-NULL
 * @param tableSeq    stored in the scan parameter
 * @param window      time window copied into the scan parameter (must be non-NULL)
 * @param isNewParam  stored in the scan parameter
 * @return TSDB_CODE_SUCCESS, or the terrno of the failed allocation
 */
int32_t buildTableScanOperatorParamEx(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, SOrgTbInfo *pMap, bool tableSeq, STimeWindow *window, bool isNewParam) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  STableScanOperatorParam* pScan = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);

  // zero-filled: the _return cleanup inspects pUidList/pOrgTbInfo and must
  // never see indeterminate pointers when an early step fails
  pScan = taosMemoryCalloc(1, sizeof(STableScanOperatorParam));
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);

  if (pUidList) {
    pScan->pUidList = taosArrayDup(pUidList, NULL);
    QUERY_CHECK_NULL(pScan->pUidList, code, lino, _return, terrno);
  } else {
    pScan->pUidList = NULL;
  }

  if (pMap) {
    pScan->pOrgTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
    QUERY_CHECK_NULL(pScan->pOrgTbInfo, code, lino, _return, terrno);

    pScan->pOrgTbInfo->vgId = pMap->vgId;
    tstrncpy(pScan->pOrgTbInfo->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);

    // assigned (possibly NULL) before the check, so cleanup can rely on it
    pScan->pOrgTbInfo->colMap = taosArrayDup(pMap->colMap, NULL);
    QUERY_CHECK_NULL(pScan->pOrgTbInfo->colMap, code, lino, _return, terrno);
  } else {
    pScan->pOrgTbInfo = NULL;
  }
  pScan->pTagList = NULL;
  pScan->pBatchTbInfo = NULL;

  pScan->type = DYN_TYPE_VSTB_SINGLE_SCAN;
  pScan->tableSeq = tableSeq;
  pScan->window.skey = window->skey;
  pScan->window.ekey = window->ekey;
  pScan->isNewParam = isNewParam;
  (*ppRes)->opType = srcOpType;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->value = pScan;
  (*ppRes)->pChildren = NULL;
  (*ppRes)->reUse = false;

  return code;

_return:
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
  taosMemoryFreeClear(*ppRes);
  if (pScan) {
    taosArrayDestroy(pScan->pUidList);
    if (pScan->pOrgTbInfo) {
      taosArrayDestroy(pScan->pOrgTbInfo->colMap);
      taosMemoryFreeClear(pScan->pOrgTbInfo);
    }
    taosMemoryFree(pScan);
  }
  return code;
}
959

960
/*
 * Build the operator parameter for a DYN_TYPE_VSTB_BATCH_SCAN table scan.
 *
 * Every input container is deep-copied; the caller keeps ownership of its
 * own arguments. On failure everything allocated here is released and
 * *ppRes is set to NULL.
 *
 * @param ppRes       [out] newly allocated SOperatorParam on success, NULL on failure
 * @param groupid     stored in the scan parameter
 * @param pUidList    optional table uid list; duplicated when non-NULL
 * @param srcOpType   operator type stored in the result
 * @param pBatchMap   optional array of SOrgTbInfo; each entry and its colMap is copied
 * @param pTagList    optional array of STagVal; var-length payloads are copied
 * @param tableSeq    stored in the scan parameter
 * @param window      time window copied into the scan parameter (must be non-NULL)
 * @param isNewParam  stored in the scan parameter
 * @return TSDB_CODE_SUCCESS, or the terrno of the failed allocation
 */
int32_t buildTableScanOperatorParamBatchInfo(SOperatorParam** ppRes, uint64_t groupid, SArray* pUidList, int32_t srcOpType, SArray *pBatchMap, SArray *pTagList, bool tableSeq, STimeWindow *window, bool isNewParam) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  STableScanOperatorParam* pScan = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);

  // zero-filled: the _return cleanup walks pUidList/pBatchTbInfo/pTagList and
  // must never see indeterminate pointers when an early step fails
  pScan = taosMemoryCalloc(1, sizeof(STableScanOperatorParam));
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);

  pScan->groupid = groupid;
  if (pUidList) {
    pScan->pUidList = taosArrayDup(pUidList, NULL);
    QUERY_CHECK_NULL(pScan->pUidList, code, lino, _return, terrno);
  } else {
    pScan->pUidList = NULL;
  }
  pScan->pOrgTbInfo = NULL;

  if (pBatchMap) {
    pScan->pBatchTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
    QUERY_CHECK_NULL(pScan->pBatchTbInfo, code, lino, _return, terrno);
    for (int32_t i = 0; i < taosArrayGetSize(pBatchMap); i++) {
      SOrgTbInfo *pSrcInfo = taosArrayGet(pBatchMap, i);
      QUERY_CHECK_NULL(pSrcInfo, code, lino, _return, terrno);

      SOrgTbInfo batchInfo = {0};
      batchInfo.vgId = pSrcInfo->vgId;
      tstrncpy(batchInfo.tbName, pSrcInfo->tbName, TSDB_TABLE_FNAME_LEN);
      batchInfo.colMap = taosArrayDup(pSrcInfo->colMap, NULL);
      QUERY_CHECK_NULL(batchInfo.colMap, code, lino, _return, terrno);

      if (NULL == taosArrayPush(pScan->pBatchTbInfo, &batchInfo)) {
        code = terrno;
        lino = __LINE__;
        // not yet owned by the array, so the cleanup below would miss it
        taosArrayDestroy(batchInfo.colMap);
        goto _return;
      }
    }
  } else {
    pScan->pBatchTbInfo = NULL;
  }

  if (pTagList) {
    pScan->pTagList = taosArrayInit(1, sizeof(STagVal));
    QUERY_CHECK_NULL(pScan->pTagList, code, lino, _return, terrno);

    for (int32_t i = 0; i < taosArrayGetSize(pTagList); ++i) {
      STagVal *pSrcTag = (STagVal*)taosArrayGet(pTagList, i);
      QUERY_CHECK_NULL(pSrcTag, code, lino, _return, terrno);

      STagVal dstTag = {0};
      dstTag.type = pSrcTag->type;
      dstTag.cid = pSrcTag->cid;
      if (IS_VAR_DATA_TYPE(pSrcTag->type)) {
        dstTag.nData = pSrcTag->nData;
        dstTag.pData = taosMemoryMalloc(dstTag.nData);
        QUERY_CHECK_NULL(dstTag.pData, code, lino, _return, terrno);
        memcpy(dstTag.pData, pSrcTag->pData, dstTag.nData);
      } else {
        dstTag.i64 = pSrcTag->i64;
      }

      if (NULL == taosArrayPush(pScan->pTagList, &dstTag)) {
        code = terrno;
        lino = __LINE__;
        // not yet owned by the array, so the cleanup below would miss it
        if (IS_VAR_DATA_TYPE(dstTag.type)) {
          taosMemoryFree(dstTag.pData);
        }
        goto _return;
      }
    }
  } else {
    pScan->pTagList = NULL;
  }

  pScan->type = DYN_TYPE_VSTB_BATCH_SCAN;
  pScan->tableSeq = tableSeq;
  pScan->window.skey = window->skey;
  pScan->window.ekey = window->ekey;
  pScan->isNewParam = isNewParam;
  (*ppRes)->opType = srcOpType;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->value = pScan;
  (*ppRes)->pChildren = NULL;
  (*ppRes)->reUse = false;

  return code;

_return:
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
  taosMemoryFreeClear(*ppRes);
  if (pScan) {
    taosArrayDestroy(pScan->pUidList);
    if (pScan->pBatchTbInfo) {
      // each stored entry owns a duplicated colMap
      for (int32_t i = 0; i < taosArrayGetSize(pScan->pBatchTbInfo); ++i) {
        SOrgTbInfo* pCopied = taosArrayGet(pScan->pBatchTbInfo, i);
        if (pCopied) {
          taosArrayDestroy(pCopied->colMap);
        }
      }
      taosArrayDestroy(pScan->pBatchTbInfo);
    }
    if (pScan->pTagList) {
      // each stored var-length tag owns a copied payload
      for (int32_t i = 0; i < taosArrayGetSize(pScan->pTagList); ++i) {
        STagVal* pCopied = (STagVal*)taosArrayGet(pScan->pTagList, i);
        if (pCopied && IS_VAR_DATA_TYPE(pCopied->type)) {
          taosMemoryFree(pCopied->pData);
        }
      }
      taosArrayDestroy(pScan->pTagList);
    }
    taosMemoryFree(pScan);
  }
  return code;
}
1046

1047
/*
 * Build a QUERY_NODE_PHYSICAL_PLAN_HASH_AGG operator parameter whose single
 * child is a batch table-scan parameter built from the given arguments.
 *
 * On failure everything allocated here is released and *ppRes is set to NULL
 * (the visible caller does not free a partially built result).
 *
 * @param ppRes       [out] newly allocated SOperatorParam on success, NULL on failure
 * @param groupid, pUidList, pBatchMap, pTagList, tableSeq, window, isNewParam
 *                    forwarded to buildTableScanOperatorParamBatchInfo
 * @param srcOpType   unused: the child is always built as a TABLE_SCAN param
 *                    and the result is always tagged HASH_AGG
 * @return TSDB_CODE_SUCCESS, or the first error encountered
 */
int32_t buildAggOperatorParam(SOperatorParam** ppRes, uint64_t groupid, SArray* pUidList, int32_t srcOpType, SArray *pBatchMap, SArray *pTagList, bool tableSeq, STimeWindow *window, bool isNewParam) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  SOperatorParam*          pTableScanParam = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);

  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno);

  code = buildTableScanOperatorParamBatchInfo(&pTableScanParam, groupid, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, pBatchMap, pTagList, tableSeq, window, isNewParam);
  QUERY_CHECK_CODE(code, lino, _return);

  QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pTableScanParam), code, lino, _return, terrno);

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->value = NULL;
  (*ppRes)->reUse = false;

  return code;

_return:
  qError("%s failed at %d, failed to build agg operator msg:%s", __FUNCTION__, lino, tstrerror(code));
  if (pTableScanParam != NULL) {
    // only reachable when the push failed, i.e. the child is not in pChildren.
    // NOTE(review): assumes freeOperatorParam(.., OP_GET_PARAM) releases a
    // table-scan param the same way it does for req.pOpParam elsewhere in
    // this file - confirm.
    freeOperatorParam(pTableScanParam, OP_GET_PARAM);
  }
  if (*ppRes != NULL) {
    // pChildren is either a valid (empty) array or NULL at this point
    taosArrayDestroy((*ppRes)->pChildren);
    taosMemoryFreeClear(*ppRes);
  }
  return code;
}
1071

1072
/*
 * Build the operator parameter for a tag scan: the first uid of pUidList is
 * stored as the scan's vcUid.
 *
 * @param ppRes      [out] newly allocated SOperatorParam on success, NULL on failure
 * @param pUidList   uid list; only element 0 is read. A NULL or empty list is
 *                   rejected with TSDB_CODE_INTERNAL_ERROR instead of being
 *                   dereferenced
 * @param srcOpType  operator type stored in the result
 * @return TSDB_CODE_SUCCESS, the terrno of a failed allocation, or
 *         TSDB_CODE_INTERNAL_ERROR for a missing uid
 */
int32_t buildTagScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  STagScanOperatorParam*   pScan = NULL;
  tb_uid_t*                pFirstUid = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);

  pScan = taosMemoryMalloc(sizeof(STagScanOperatorParam));
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);

  if (pUidList != NULL) {
    pFirstUid = (tb_uid_t*)taosArrayGet(pUidList, 0);
  }
  if (NULL == pFirstUid) {
    // explicit code: terrno may not be set when the list itself is NULL
    code = TSDB_CODE_INTERNAL_ERROR;
    lino = __LINE__;
    goto _return;
  }
  pScan->vcUid = *pFirstUid;

  (*ppRes)->opType = srcOpType;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->value = pScan;
  (*ppRes)->pChildren = NULL;
  (*ppRes)->reUse = false;

  return code;

_return:
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
  taosMemoryFreeClear(*ppRes);
  if (pScan) {
    taosMemoryFree(pScan);
  }
  return code;
}
1099

1100
/*
 * Derive the calculation time range of the trigger parameter currently
 * selected by pRuntimeInfo->curIdx.
 *
 * @param pRuntimeInfo  stream runtime info holding the trigger parameters
 * @param pTimeRange    [out] receives the range
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_INTERNAL_ERROR when an argument is
 *         NULL or no parameter exists at curIdx
 */
static int32_t getCurrentWinCalcTimeRange(SStreamRuntimeFuncInfo* pRuntimeInfo, STimeWindow* pTimeRange) {
  if (NULL == pRuntimeInfo || NULL == pTimeRange) {
    return TSDB_CODE_INTERNAL_ERROR;
  }

  SSTriggerCalcParam* pCalc = taosArrayGet(pRuntimeInfo->pStreamPesudoFuncVals, pRuntimeInfo->curIdx);
  if (NULL == pCalc) {
    return TSDB_CODE_INTERNAL_ERROR;
  }

  if (STREAM_TRIGGER_PERIOD == pRuntimeInfo->triggerType) {
    pTimeRange->skey = pCalc->prevLocalTime;
    pTimeRange->ekey = pCalc->triggerTime;
  } else {
    // STREAM_TRIGGER_SLIDING and every other trigger type use wstart/wend.
    // For sliding triggers it is not possible to tell here whether an interval
    // is present; per the original note wstart/wend share storage (a union)
    // with prevTs/currentTs, so reading them gives the same values.
    pTimeRange->skey = pCalc->wstart;
    pTimeRange->ekey = pCalc->wend;
  }

  return TSDB_CODE_SUCCESS;
}
1129

1130
/*
 * Issue the fetch for one downstream source of an exchange operator.
 *
 * Nothing is sent unless the source is in EX_SOURCE_DATA_NOT_READY; otherwise
 * the source is moved to EX_SOURCE_DATA_STARTED and either
 *   - the task's local fetch function is invoked and its result is fed
 *     straight into loadRemoteDataCallback (pSource->localExec), or
 *   - a SResFetchReq is built (including stream runtime info and the dynamic
 *     operator parameter selected by pDataInfo->type), serialized and sent
 *     asynchronously; the returned transport id is recorded in
 *     pFetchRpcHandles[sourceIndex].
 *
 * The per-source dynamic inputs (pSrcUidList, orgTbInfo, batchOrgTbInfo,
 * tagList) are consumed: they are released once the operator parameter has
 * been built from them.
 *
 * @param pExchangeInfo  exchange operator state
 * @param pTaskInfo      owning task; pTaskInfo->code is set on several failure paths
 * @param sourceIndex    index into pSourceDataInfo / pFetchRpcHandles
 * @return TSDB_CODE_SUCCESS or an error code
 */
int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
  if (!pDataInfo) {
    return terrno;
  }

  if (EX_SOURCE_DATA_NOT_READY != pDataInfo->status) {
    return TSDB_CODE_SUCCESS;
  }

  pDataInfo->status = EX_SOURCE_DATA_STARTED;
  SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
  if (!pSource) {
    return terrno;
  }

  pDataInfo->startTime = taosGetTimestampUs();
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);

  // identifies operator / source / sequence for the response callback
  SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
  QUERY_CHECK_NULL(pWrapper, code, lino, _end, terrno);
  pWrapper->exchangeId = pExchangeInfo->self;
  pWrapper->sourceIndex = sourceIndex;
  pWrapper->seqId = pExchangeInfo->seqId;

  if (pSource->localExec) {
    SDataBuf pBuf = {0};
    // uses the function-level `code` on purpose: a block-local variable here
    // would be invisible at _end and the failure would be reported as success
    code = (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->sId, pTaskInfo->id.queryId,
                                       pSource->clientId, pSource->taskId, 0, pSource->execId, &pBuf.pData,
                                       pTaskInfo->localFetch.explainRes);
    code = loadRemoteDataCallback(pWrapper, &pBuf, code);
    taosMemoryFree(pWrapper);
    QUERY_CHECK_CODE(code, lino, _end);
  } else {
    bool needStreamPesudoFuncVals = true;
    SResFetchReq req = {0};
    req.header.vgId = pSource->addr.nodeId;
    req.sId = pSource->sId;
    req.clientId = pSource->clientId;
    req.taskId = pSource->taskId;
    req.queryId = pTaskInfo->id.queryId;
    req.execId = pSource->execId;
    if (pTaskInfo->pStreamRuntimeInfo) {
      req.dynTbname = pExchangeInfo->dynTbname;
      req.execId = pTaskInfo->pStreamRuntimeInfo->execId;
      req.pStRtFuncInfo = &pTaskInfo->pStreamRuntimeInfo->funcInfo;

      if (pSource->fetchMsgType == TDMT_STREAM_FETCH_FROM_RUNNER) {
        qDebug("%s stream fetch from runner, execId:%d, %p", GET_TASKID(pTaskInfo), req.execId, pTaskInfo->pStreamRuntimeInfo);
      } else if (pSource->fetchMsgType == TDMT_STREAM_FETCH_FROM_CACHE) {
        code = getCurrentWinCalcTimeRange(req.pStRtFuncInfo, &req.pStRtFuncInfo->curWindow);
        if (TSDB_CODE_SUCCESS != code) {
          lino = __LINE__;
          // nothing has taken ownership of the wrapper yet
          taosMemoryFree(pWrapper);
          goto _end;
        }
        needStreamPesudoFuncVals = false;
        qDebug("%s stream fetch from cache, execId:%d, curWinIdx:%d, time range:[%" PRId64 ", %" PRId64 "]",
               GET_TASKID(pTaskInfo), req.execId, req.pStRtFuncInfo->curIdx, req.pStRtFuncInfo->curWindow.skey,
               req.pStRtFuncInfo->curWindow.ekey);
      }
      if (!pDataInfo->fetchSent) {
        // first fetch for this source: flag the request as a reset
        req.reset = pDataInfo->fetchSent = true;
      }
    }

    switch (pDataInfo->type) {
      case EX_SRC_TYPE_VSTB_SCAN: {
        code = buildTableScanOperatorParamEx(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->orgTbInfo, pDataInfo->tableSeq, &pDataInfo->window, pDataInfo->isNewParam);
        // the builder accepts a NULL orgTbInfo, so the release must too
        if (pDataInfo->orgTbInfo) {
          taosArrayDestroy(pDataInfo->orgTbInfo->colMap);
          taosMemoryFreeClear(pDataInfo->orgTbInfo);
        }
        taosArrayDestroy(pDataInfo->pSrcUidList);
        pDataInfo->pSrcUidList = NULL;
        if (TSDB_CODE_SUCCESS != code) {
          pTaskInfo->code = code;
          taosMemoryFree(pWrapper);
          return pTaskInfo->code;
        }
        break;
      }
      case EX_SRC_TYPE_VSTB_WIN_SCAN: {
        if (pDataInfo->pSrcUidList) {
          code = buildTableScanOperatorParamEx(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, NULL, pDataInfo->tableSeq, &pDataInfo->window, false);
          taosArrayDestroy(pDataInfo->pSrcUidList);
          pDataInfo->pSrcUidList = NULL;
          if (TSDB_CODE_SUCCESS != code) {
            pTaskInfo->code = code;
            taosMemoryFree(pWrapper);
            return pTaskInfo->code;
          }
        }
        break;
      }
      case EX_SRC_TYPE_VSTB_TAG_SCAN: {
        code = buildTagScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType);
        taosArrayDestroy(pDataInfo->pSrcUidList);
        pDataInfo->pSrcUidList = NULL;
        if (TSDB_CODE_SUCCESS != code) {
          pTaskInfo->code = code;
          taosMemoryFree(pWrapper);
          return pTaskInfo->code;
        }
        break;
      }
      case EX_SRC_TYPE_VSTB_AGG_SCAN: {
        if (pDataInfo->batchOrgTbInfo) {
          code = buildAggOperatorParam(&req.pOpParam, pDataInfo->groupid, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->batchOrgTbInfo, pDataInfo->tagList, pDataInfo->tableSeq, &pDataInfo->window, pDataInfo->isNewParam);
          if (pDataInfo->batchOrgTbInfo) {
            for (int32_t i = 0; i < taosArrayGetSize(pDataInfo->batchOrgTbInfo); ++i) {
              SOrgTbInfo* pColMap = taosArrayGet(pDataInfo->batchOrgTbInfo, i);
              if (pColMap) {
                taosArrayDestroy(pColMap->colMap);
              }
            }
            taosArrayDestroy(pDataInfo->batchOrgTbInfo);
            pDataInfo->batchOrgTbInfo = NULL;
          }
          if (pDataInfo->tagList) {
            taosArrayDestroyEx(pDataInfo->tagList, destroyTagVal);
            pDataInfo->tagList = NULL;
          }
          if (pDataInfo->pSrcUidList) {
            taosArrayDestroy(pDataInfo->pSrcUidList);
            pDataInfo->pSrcUidList = NULL;
          }

          if (TSDB_CODE_SUCCESS != code) {
            pTaskInfo->code = code;
            taosMemoryFree(pWrapper);
            return pTaskInfo->code;
          }
        }
        break;
      }
      case EX_SRC_TYPE_STB_JOIN_SCAN:
      default: {
        if (pDataInfo->pSrcUidList) {
          code = buildTableScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->tableSeq);
          taosArrayDestroy(pDataInfo->pSrcUidList);
          pDataInfo->pSrcUidList = NULL;
          if (TSDB_CODE_SUCCESS != code) {
            pTaskInfo->code = code;
            taosMemoryFree(pWrapper);
            return pTaskInfo->code;
          }
        }
        break;
      }
    }

    // first pass with a NULL buffer only computes the encoded size
    int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req, needStreamPesudoFuncVals);
    if (msgSize < 0) {
      pTaskInfo->code = msgSize;
      taosMemoryFree(pWrapper);
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
      return pTaskInfo->code;
    }

    void* msg = taosMemoryCalloc(1, msgSize);
    if (NULL == msg) {
      pTaskInfo->code = terrno;
      taosMemoryFree(pWrapper);
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
      return pTaskInfo->code;
    }

    msgSize = tSerializeSResFetchReq(msg, msgSize, &req, needStreamPesudoFuncVals);
    if (msgSize < 0) {
      pTaskInfo->code = msgSize;
      taosMemoryFree(pWrapper);
      taosMemoryFree(msg);
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
      return pTaskInfo->code;
    }

    // the parameter is fully encoded into msg; the in-memory copy is done
    freeOperatorParam(req.pOpParam, OP_GET_PARAM);

    qDebug("%s build fetch msg and send to vgId:%d, ep:%s, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
           ", seqId:%" PRId64 ", execId:%d, %p, %d/%" PRIzu,
           GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->clientId,
           pSource->taskId, pExchangeInfo->seqId, pSource->execId, pExchangeInfo, sourceIndex, totalSources);

    // send the fetch remote task result request
    SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
    if (NULL == pMsgSendInfo) {
      pTaskInfo->code = terrno;  // read before logging/frees can disturb it
      taosMemoryFreeClear(msg);
      taosMemoryFree(pWrapper);
      qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
      return pTaskInfo->code;
    }

    pMsgSendInfo->param = pWrapper;
    pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
    pMsgSendInfo->msgInfo.pData = msg;
    pMsgSendInfo->msgInfo.len = msgSize;
    pMsgSendInfo->msgType = pSource->fetchMsgType;
    pMsgSendInfo->fp = loadRemoteDataCallback;
    pMsgSendInfo->requestId = pTaskInfo->id.queryId;

    int64_t transporterId = 0;
    // NOTE(review): who releases pMsgSendInfo/msg/pWrapper when the send fails
    // is not visible from here - confirm asyncSendMsgToServer does.
    code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
    QUERY_CHECK_CODE(code, lino, _end);

    // remember the connection so it can be released when the response arrives
    // or the operator is destroyed
    int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, sourceIndex);
    if (pRpcHandle != NULL) {
      *pRpcHandle = transporterId;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1342

1343
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs,
108,272,516✔
1344
                          SOperatorInfo* pOperator) {
1345
  pInfo->totalRows += numOfRows;
108,272,516✔
1346
  pInfo->totalSize += dataLen;
108,272,992✔
1347
  pInfo->totalElapsed += (taosGetTimestampUs() - startTs);
108,272,992✔
1348
  pOperator->resultInfo.totalRows += numOfRows;
108,272,529✔
1349
}
108,272,992✔
1350

1351
/*
 * Decode one data block out of a fetch response payload into pRes.
 *
 * Two layouts are handled:
 *   - pColList == NULL: pRes is cleaned and the payload is decoded directly
 *     into it; *pNextStart is filled by the decoder.
 *   - pColList != NULL: the payload starts with an int32 column count and an
 *     array of SSysTableSchema (both byte-swapped in place with htonl/htons),
 *     followed by the block. The block is decoded into a temporary block with
 *     that schema and its columns are then relocated into pRes according to
 *     pColList. pNextStart is not touched on this path.
 *
 * @return TSDB_CODE_SUCCESS or the first error encountered
 */
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SSDataBlock* pTmpBlock = NULL;

  if (NULL == pColList) {  // data from other sources
    blockDataCleanup(pRes);
    return blockDecodeInternal(pRes, pData, (const char**)pNextStart);
  }

  // extract data according to pColList
  char*   pCursor = pData;
  int32_t numOfCols = htonl(*(int32_t*)pCursor);
  pCursor += sizeof(int32_t);

  // todo refactor:extract method
  SSysTableSchema* pSchema = (SSysTableSchema*)pCursor;
  for (int32_t i = 0; i < numOfCols; ++i) {
    pSchema[i].colId = htons(pSchema[i].colId);
    pSchema[i].bytes = htonl(pSchema[i].bytes);
    pCursor += sizeof(SSysTableSchema);
  }

  code = createDataBlock(&pTmpBlock);
  QUERY_CHECK_CODE(code, lino, _end);

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData colInfo = createColumnInfoData(pSchema[i].type, pSchema[i].bytes, pSchema[i].colId);
    code = blockDataAppendColInfo(pTmpBlock, &colInfo);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  code = blockDecodeInternal(pTmpBlock, pCursor, NULL);
  QUERY_CHECK_CODE(code, lino, _end);

  code = blockDataEnsureCapacity(pRes, pTmpBlock->info.rows);
  QUERY_CHECK_CODE(code, lino, _end);

  // data from mnode
  pRes->info.dataLoad = 1;
  pRes->info.rows = pTmpBlock->info.rows;
  pRes->info.scanFlag = MAIN_SCAN;
  code = relocateColumnData(pRes, pColList, pTmpBlock->pDataBlock, false);
  QUERY_CHECK_CODE(code, lino, _end);

  blockDataDestroy(pTmpBlock);
  pTmpBlock = NULL;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    blockDataDestroy(pTmpBlock);
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1411

1412
/*
 * Emit a debug summary of what the exchange operator has loaded so far (number of sources, total rows,
 * total size and elapsed time taken from pExchangeInfo->loadInfo) and flag the operator as completed.
 */
void setAllSourcesCompleted(SOperatorInfo* pOperator) {
  SExecTaskInfo*       pTask = pOperator->pTaskInfo;
  SExchangeInfo*       pInfo = pOperator->info;
  SLoadRemoteDataInfo* pStat = &pInfo->loadInfo;
  size_t               numOfSources = taosArrayGetSize(pInfo->pSources);

  qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 ", %.2f Kb, elapsed:%.2f ms",
         GET_TASKID(pTask), numOfSources, pStat->totalRows, pStat->totalSize / 1024.0, pStat->totalElapsed / 1000.0);

  setOperatorCompleted(pOperator);
}
60,674,674✔
1424

1425
/*
 * Count the entries of pArray (SSourceDataInfo items) whose status is EX_SOURCE_DATA_EXHAUSTED.
 *
 * @param pArray  array of SSourceDataInfo
 * @param pRes    receives the number of exhausted sources; only written when every entry could be read
 * @return TSDB_CODE_SUCCESS, or terrno when an entry cannot be fetched from the array
 */
int32_t getCompletedSources(const SArray* pArray, int32_t* pRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t numOfExhausted = 0;
  size_t  numOfSources = taosArrayGetSize(pArray);

  for (int32_t idx = 0; idx < numOfSources; ++idx) {
    SSourceDataInfo* pSrc = taosArrayGet(pArray, idx);
    QUERY_CHECK_NULL(pSrc, code, lino, _end, terrno);

    if (pSrc->status != EX_SOURCE_DATA_EXHAUSTED) {
      continue;
    }

    qDebug("source %d is completed, info:%p %p", idx, pArray, pSrc);
    numOfExhausted += 1;
  }

  *pRes = numOfExhausted;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1447

1448
/*
 * Issue one fetch request per entry of pExchangeInfo->pSourceDataInfo without waiting for any response.
 *
 * On the first failed request the error is stored in the task and returned immediately. On success the
 * operator status becomes OP_RES_TO_RETURN and the open cost is recorded. If the task has been killed in the
 * meantime, control leaves through T_LONG_JMP with the task code.
 */
int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
  SExchangeInfo* pInfo = pOperator->info;
  SExecTaskInfo* pTask = pOperator->pTaskInfo;

  size_t  numOfSources = taosArrayGetSize(pInfo->pSourceDataInfo);
  int64_t beginUs = taosGetTimestampUs();

  // fire all requests back to back; responses are handled asynchronously
  for (int32_t srcIdx = 0; srcIdx < numOfSources; ++srcIdx) {
    int32_t code = doSendFetchDataRequest(pInfo, pTask, srcIdx);
    if (code == TSDB_CODE_SUCCESS) {
      continue;
    }

    pTask->code = code;
    return code;
  }

  int64_t finishUs = taosGetTimestampUs();
  qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTask),
         numOfSources, (finishUs - beginUs) / 1000.0);

  pOperator->status = OP_RES_TO_RETURN;
  pOperator->cost.openCost = taosGetTimestampUs() - beginUs;

  if (isTaskKilled(pTask)) {
    T_LONG_JMP(pTask->env, pTask->code);
  }

  return TSDB_CODE_SUCCESS;
}
1476

1477
/*
 * Decode every data block carried by pDataInfo->pRsp and append it to pExchangeInfo->pResultBlockList.
 *
 * Each block in the payload is prefixed by two int32 values: the stored (possibly compressed) length and the
 * raw length. When the response is flagged as compressed and the stored length is smaller than the raw
 * length, the block is decompressed into pDataInfo->decompBuf first. Target blocks are taken from
 * pExchangeInfo->pRecycledBlocks when available, otherwise cloned from pExchangeInfo->pDummyBlock.
 *
 * @return TSDB_CODE_SUCCESS, or an error code. When extractDataBlockFromFetchRsp fails, pDataInfo->pRsp is
 *         released here before returning.
 */
int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo) {
  int32_t            code = TSDB_CODE_SUCCESS;
  int32_t            lino = 0;
  SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
  SSDataBlock*       pb = NULL;

  char* pNextStart = pRetrieveRsp->data;
  char* pStart = pNextStart;

  int32_t index = 0;

  if (pRetrieveRsp->compressed) {  // make sure the decompress buffer can hold payloadLen bytes
    if (pDataInfo->decompBuf == NULL) {
      pDataInfo->decompBuf = taosMemoryMalloc(pRetrieveRsp->payloadLen);
      QUERY_CHECK_NULL(pDataInfo->decompBuf, code, lino, _end, terrno);
      pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
    } else if (pDataInfo->decompBufSize < pRetrieveRsp->payloadLen) {
      // keep the old buffer until the reallocation is known to have succeeded
      char* p = taosMemoryRealloc(pDataInfo->decompBuf, pRetrieveRsp->payloadLen);
      QUERY_CHECK_NULL(p, code, lino, _end, terrno);
      pDataInfo->decompBuf = p;
      pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
    }
  }

  while (index++ < pRetrieveRsp->numOfBlocks) {
    pStart = pNextStart;

    if (taosArrayGetSize(pExchangeInfo->pRecycledBlocks) > 0) {
      pb = *(SSDataBlock**)taosArrayPop(pExchangeInfo->pRecycledBlocks);
      blockDataCleanup(pb);
    } else {
      code = createOneDataBlock(pExchangeInfo->pDummyBlock, false, &pb);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    int32_t compLen = *(int32_t*)pStart;
    pStart += sizeof(int32_t);

    int32_t rawLen = *(int32_t*)pStart;
    pStart += sizeof(int32_t);

    // a non-positive stored length would stall or move the cursor backwards, so reject it together with
    // a stored length that exceeds the raw length
    QUERY_CHECK_CONDITION((compLen > 0 && compLen <= rawLen), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

    pNextStart = pStart + compLen;
    if (pRetrieveRsp->compressed && (compLen < rawLen)) {
      // decompBuf holds decompBufSize bytes; refuse a block that would not fit into it
      QUERY_CHECK_CONDITION((rawLen <= pDataInfo->decompBufSize), code, lino, _end,
                            TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

      int32_t t = tsDecompressString(pStart, compLen, 1, pDataInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
      QUERY_CHECK_CONDITION((t == rawLen), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
      pStart = pDataInfo->decompBuf;
    }

    code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart);
    if (code != 0) {
      lino = __LINE__;  // so the failure log below points at this call instead of line 0
      taosMemoryFreeClear(pDataInfo->pRsp);
      goto _end;
    }

    void* tmp = taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    qDebug("%dth block added to resultBlockList, rows:%" PRId64, index, pb->info.rows);
    pb = NULL;  // ownership moved to pResultBlockList
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    // pb is only non-NULL here when it has not been handed over to the result list
    blockDataDestroy(pb);
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1549

1550
/*
 * Load remote data one source at a time, starting from pExchangeInfo->current.
 *
 * For the current source a fetch request is sent and the function blocks in exchangeWait() until a response
 * carrying the current exchange seqId arrives (responses with another seqId are dropped). Then:
 *   - a response with zero rows marks the source exhausted and the loop moves to the next source;
 *   - otherwise the blocks are extracted into the result list, the load statistics are updated and the
 *     function returns.
 * When pExchangeInfo->dynTbname is set, only sources whose nodeId equals the vgId of the tbname partition
 * value are visited.
 *
 * @return TSDB_CODE_SUCCESS when blocks were extracted or all sources are exhausted; the source error code
 *         when the response reports one; the extraction error code otherwise. Failures to look up the
 *         source, to send the request or to wait leave through T_LONG_JMP with the task code.
 */
int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
  SExchangeInfo* pExchangeInfo = pOperator->info;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  int32_t code = 0;
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);
  int64_t startTs = taosGetTimestampUs();

  // vgId == 0 means "no restriction"; otherwise only the source on that vgroup is queried
  int32_t vgId = 0;
  if (pExchangeInfo->dynTbname) {
    SArray* vals = pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPartColVals;
    for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
      SStreamGroupValue* pValue = taosArrayGet(vals, i);
      if (pValue != NULL && pValue->isTbname) {
        vgId = pValue->vgId;
        break;
      }
    }
  }

  while (1) {
    if (pExchangeInfo->current >= totalSources) {
      setAllSourcesCompleted(pOperator);
      return TSDB_CODE_SUCCESS;
    }

    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
    if (!pSource) {
      // report terrno: it is the value stored in the task below, 'code' has not been set for this failure
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      pTaskInfo->code = terrno;
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }

    if (vgId != 0 && pSource->addr.nodeId != vgId){
      pExchangeInfo->current += 1;
      continue;
    }

    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
    if (!pDataInfo) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      pTaskInfo->code = terrno;
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;

    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      pTaskInfo->code = code;
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }

    // wait until a response that belongs to the current exchange sequence shows up
    while (true) {
      code = exchangeWait(pOperator, pExchangeInfo);
      if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
      }

      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
      if (pDataInfo->seqId != currSeqId) {
        qDebug("seq rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pDataInfo->seqId, pExchangeInfo, currSeqId);
        taosMemoryFreeClear(pDataInfo->pRsp);
        continue;
      }

      break;
    }

    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
             tstrerror(pDataInfo->code));
      pOperator->pTaskInfo->code = pDataInfo->code;
      return pOperator->pTaskInfo->code;
    }

    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;

    if (pRsp->numOfRows == 0) {
      qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
             " execId:%d %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", pDataInfo,
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);

      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
      if (isVstbScan(pDataInfo) || isVstbTagScan(pDataInfo)) {
        // jump past the last source so the next iteration reports completion
        pExchangeInfo->current = totalSources;
      } else {
        pExchangeInfo->current += 1;
      }
      taosMemoryFreeClear(pDataInfo->pRsp);
      continue;
    }

    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }

    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
    if (pRsp->completed == 1) {
      qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, pDataInfo,
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
             pExchangeInfo->current + 1, totalSources);

      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
      if (isVstbScan(pDataInfo)) {
        pExchangeInfo->current = totalSources;
      } else {
        pExchangeInfo->current += 1;
      }
    } else {
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
             pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
    }
    if (pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData) {
      // NOTE(review): pDataInfo points into this array and is still used below; this presumably relies on
      // taosArrayClear only resetting the element count without releasing the storage - confirm.
      taosArrayClear(pExchangeInfo->pSourceDataInfo);
    }
    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;

    taosMemoryFreeClear(pDataInfo->pRsp);
    return TSDB_CODE_SUCCESS;
  }

_error:
  pTaskInfo->code = code;
  return code;
}
1685

1686
void clearVtbScanDataInfo(void* pItem) {
101,383✔
1687
  SSourceDataInfo *pInfo = (SSourceDataInfo *)pItem;
101,383✔
1688
  if (pInfo->orgTbInfo) {
101,383✔
1689
    taosArrayDestroy(pInfo->orgTbInfo->colMap);
×
1690
    taosMemoryFreeClear(pInfo->orgTbInfo);
×
1691
  }
1692
  if (pInfo->batchOrgTbInfo) {
101,383✔
1693
    for (int32_t i = 0; i < taosArrayGetSize(pInfo->batchOrgTbInfo); ++i) {
×
1694
      SOrgTbInfo* pColMap = taosArrayGet(pInfo->batchOrgTbInfo, i);
×
1695
      if (pColMap) {
×
1696
        taosArrayDestroy(pColMap->colMap);
×
1697
      }
1698
    }
1699
    taosArrayDestroy(pInfo->batchOrgTbInfo);
×
1700
  }
1701
  if (pInfo->tagList) {
101,383✔
1702
    taosArrayDestroyEx(pInfo->tagList, destroyTagVal);
×
1703
    pInfo->tagList = NULL;
×
1704
  }
1705
  taosArrayDestroy(pInfo->pSrcUidList);
101,383✔
1706
}
101,383✔
1707

1708
/*
 * Replace pDataInfo->tagList with a deep copy of pBasicParam->tagList.
 *
 * Any tag list already attached to pDataInfo is released first (including the buffers of variable-length
 * tags, via destroyTagVal - the same way clearVtbScanDataInfo releases it), so repeated calls on the same
 * pDataInfo do not leak the previous list. When pBasicParam carries no tag list, pDataInfo->tagList ends
 * up NULL.
 *
 * @return TSDB_CODE_SUCCESS, or terrno on allocation/array failure. On failure pDataInfo->tagList may hold
 *         the tags copied so far.
 */
static int32_t loadTagListFromBasicParam(SSourceDataInfo* pDataInfo, SExchangeOperatorBasicParam* pBasicParam) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  STagVal  dstTag = {0};
  bool     needFree = false;

  // drop the previous list completely; merely clearing it would leak it once the pointer is overwritten
  if (pDataInfo->tagList) {
    taosArrayDestroyEx(pDataInfo->tagList, destroyTagVal);
    pDataInfo->tagList = NULL;
  }

  if (pBasicParam->tagList == NULL) {
    return code;
  }

  pDataInfo->tagList = taosArrayInit(1, sizeof(STagVal));
  QUERY_CHECK_NULL(pDataInfo->tagList, code, lino, _return, terrno);

  for (int32_t i = 0; i < taosArrayGetSize(pBasicParam->tagList); ++i) {
    STagVal *pSrcTag = (STagVal*)taosArrayGet(pBasicParam->tagList, i);
    QUERY_CHECK_NULL(pSrcTag, code, lino, _return, terrno);

    dstTag = (STagVal){0};
    dstTag.type = pSrcTag->type;
    dstTag.cid = pSrcTag->cid;
    if (IS_VAR_DATA_TYPE(pSrcTag->type)) {
      // variable-length tags own a private copy of the payload
      dstTag.nData = pSrcTag->nData;
      dstTag.pData = taosMemoryMalloc(dstTag.nData);
      QUERY_CHECK_NULL(dstTag.pData, code, lino, _return, terrno);
      needFree = true;
      memcpy(dstTag.pData, pSrcTag->pData, dstTag.nData);
    } else {
      dstTag.i64 = pSrcTag->i64;
    }

    QUERY_CHECK_NULL(taosArrayPush(pDataInfo->tagList, &dstTag), code, lino, _return, terrno);
    needFree = false;  // the payload now belongs to the list
  }

  return code;

_return:
  // only the tag being built when the failure happened still needs to be released here
  if (needFree) {
    taosMemoryFreeClear(dstTag.pData);
  }
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  return code;
}
1754

1755
/*
 * Replace pDataInfo->batchOrgTbInfo with a deep copy of pBasicParam->batchOrgTbInfo (each entry gets its
 * own duplicate of the colMap array).
 *
 * When pBasicParam carries a batch list, the list previously attached to pDataInfo is released first
 * (entries' colMap arrays and the array itself), so the overwrite does not leak it. When pBasicParam
 * carries no batch list, pDataInfo is left untouched.
 * NOTE(review): the original else branch stored NULL into pBasicParam->batchOrgTbInfo, which is already
 * NULL on that path; resetting pDataInfo->batchOrgTbInfo may have been intended - confirm with callers.
 *
 * @return TSDB_CODE_SUCCESS, or terrno on allocation/array failure. On failure pDataInfo->batchOrgTbInfo
 *         may hold the entries copied so far.
 */
int32_t loadBatchColMapFromBasicParam(SSourceDataInfo* pDataInfo, SExchangeOperatorBasicParam* pBasicParam) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  SOrgTbInfo  dstOrgTbInfo = {0};
  bool        needFree = false;

  if (pBasicParam->batchOrgTbInfo == NULL) {
    return code;
  }

  // release the list that is about to be replaced
  if (pDataInfo->batchOrgTbInfo) {
    for (int32_t i = 0; i < taosArrayGetSize(pDataInfo->batchOrgTbInfo); ++i) {
      SOrgTbInfo* pOld = taosArrayGet(pDataInfo->batchOrgTbInfo, i);
      if (pOld) {
        taosArrayDestroy(pOld->colMap);
      }
    }
    taosArrayDestroy(pDataInfo->batchOrgTbInfo);
    pDataInfo->batchOrgTbInfo = NULL;
  }

  pDataInfo->batchOrgTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
  QUERY_CHECK_NULL(pDataInfo->batchOrgTbInfo, code, lino, _return, terrno);

  for (int32_t i = 0; i < taosArrayGetSize(pBasicParam->batchOrgTbInfo); ++i) {
    SOrgTbInfo* pSrcOrgTbInfo = taosArrayGet(pBasicParam->batchOrgTbInfo, i);
    QUERY_CHECK_NULL(pSrcOrgTbInfo, code, lino, _return, terrno);

    dstOrgTbInfo = (SOrgTbInfo){0};
    dstOrgTbInfo.vgId = pSrcOrgTbInfo->vgId;
    tstrncpy(dstOrgTbInfo.tbName, pSrcOrgTbInfo->tbName, TSDB_TABLE_FNAME_LEN);

    dstOrgTbInfo.colMap = taosArrayDup(pSrcOrgTbInfo->colMap, NULL);
    QUERY_CHECK_NULL(dstOrgTbInfo.colMap, code, lino, _return, terrno);

    needFree = true;
    QUERY_CHECK_NULL(taosArrayPush(pDataInfo->batchOrgTbInfo, &dstOrgTbInfo), code, lino, _return, terrno);
    needFree = false;  // the duplicated colMap now belongs to the list entry
  }

  return code;

_return:
  // only the colMap duplicated for the entry that failed to be pushed is still owned locally
  if (needFree) {
    taosArrayDestroy(dstOrgTbInfo.colMap);
  }
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  return code;
}
1792

1793
/*
 * Register (or re-arm) the source data info described by one basic exchange parameter.
 *
 * The source is looked up in pExchangeInfo->pHashSources by pBasicParam->vgId. If it is unknown and the
 * parameter is flagged isNewDeployed, the new downstream source node is cloned, appended to the physical
 * node's endpoint list and to pExchangeInfo->pSources, and indexed in the hash; otherwise an unknown vgId
 * is an error. Depending on pBasicParam->type:
 *   - EX_SRC_TYPE_VSTB_AGG_SCAN: create a data info on first use of the source, otherwise refresh the one
 *     recorded in pIdx->inUseIdx (tag list, batch column maps, uid list, window, group id ...);
 *   - EX_SRC_TYPE_VSTB_WIN_SCAN / EX_SRC_TYPE_VSTB_TAG_SCAN / EX_SRC_TYPE_VSTB_SCAN: discard all existing data
 *     infos (clearVtbScanDataInfo) and push a single fresh one;
 *   - EX_SRC_TYPE_STB_JOIN_SCAN and everything else: create on first use, otherwise reset status and
 *     replace the uid list.
 *
 * A data info that was built locally but never made it into pExchangeInfo->pSourceDataInfo is released on
 * the error path, so failures do not leak its arrays.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for an unknown source, or the failing call's code
 */
int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasicParam* pBasicParam) {
  int32_t            code = TSDB_CODE_SUCCESS;
  int32_t            lino = 0;
  SExchangeInfo*     pExchangeInfo = pOperator->info;
  SExchangeSrcIndex* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));

  // data info under construction; 'dataInfoPending' is true while its resources are still owned locally
  SSourceDataInfo dataInfo = {0};
  bool            dataInfoPending = false;

  if (NULL == pIdx) {
    if (pBasicParam->isNewDeployed) {
      SDownstreamSourceNode *pNode = NULL;
      code = nodesCloneNode((SNode*)&pBasicParam->newDeployedSrc, (SNode**)&pNode);
      QUERY_CHECK_CODE(code, lino, _return);

      SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pOperator->pPhyNode;
      code = nodesListMakeStrictAppend(&pExchange->pSrcEndPoints, (SNode*)pNode);
      QUERY_CHECK_CODE(code, lino, _return);

      void* tmp = taosArrayPush(pExchangeInfo->pSources, pNode);
      QUERY_CHECK_NULL(tmp, code, lino, _return, terrno);

      SExchangeSrcIndex idx = {.srcIdx = taosArrayGetSize(pExchangeInfo->pSources) - 1, .inUseIdx = -1};
      code = tSimpleHashPut(pExchangeInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
      if (pExchangeInfo->pHashSources) {
        QUERY_CHECK_CODE(code, lino, _return);
      }
      pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
      QUERY_CHECK_NULL(pIdx, code, lino, _return, TSDB_CODE_INVALID_PARA);
    } else {
      qError("No exchange source for vgId: %d", pBasicParam->vgId);
      return TSDB_CODE_INVALID_PARA;
    }
  }

  qDebug("start to add single exchange source");

  switch (pBasicParam->type) {
    case EX_SRC_TYPE_VSTB_AGG_SCAN: {
      if (pIdx->inUseIdx < 0) {
        // first use of this source: build a new data info and remember its slot
        dataInfoPending = true;
        dataInfo.status = EX_SOURCE_DATA_NOT_READY;
        dataInfo.taskId = pExchangeInfo->pTaskId;
        dataInfo.index = pIdx->srcIdx;
        dataInfo.groupid = pBasicParam->groupid;
        dataInfo.window = pBasicParam->window;
        dataInfo.isNewParam = pBasicParam->isNewParam;
        dataInfo.batchOrgTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
        QUERY_CHECK_NULL(dataInfo.batchOrgTbInfo, code, lino, _return, terrno);

        code = loadTagListFromBasicParam(&dataInfo, pBasicParam);
        QUERY_CHECK_CODE(code, lino, _return);

        code = loadBatchColMapFromBasicParam(&dataInfo, pBasicParam);
        QUERY_CHECK_CODE(code, lino, _return);

        dataInfo.orgTbInfo = NULL;

        dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
        QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);

        dataInfo.type = pBasicParam->type;
        dataInfo.srcOpType = pBasicParam->srcOpType;
        dataInfo.tableSeq = pBasicParam->tableSeq;

        QUERY_CHECK_NULL(taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
        dataInfoPending = false;  // the array copy owns the resources now

        pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
      } else {
        // the source already has a data info: refresh it in place
        SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
        QUERY_CHECK_NULL(pDataInfo, code, lino, _return, terrno);

        if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
        }

        pDataInfo->taskId = pExchangeInfo->pTaskId;
        pDataInfo->index = pIdx->srcIdx;
        pDataInfo->window = pBasicParam->window;
        pDataInfo->groupid = pBasicParam->groupid;
        pDataInfo->isNewParam = pBasicParam->isNewParam;

        code = loadTagListFromBasicParam(pDataInfo, pBasicParam);
        QUERY_CHECK_CODE(code, lino, _return);

        code = loadBatchColMapFromBasicParam(pDataInfo, pBasicParam);
        QUERY_CHECK_CODE(code, lino, _return);

        pDataInfo->orgTbInfo = NULL;

        pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
        QUERY_CHECK_NULL(pDataInfo->pSrcUidList, code, lino, _return, terrno);

        pDataInfo->type = pBasicParam->type;
        pDataInfo->srcOpType = pBasicParam->srcOpType;
        pDataInfo->tableSeq = pBasicParam->tableSeq;
      }
      break;
    }
    case EX_SRC_TYPE_VSTB_WIN_SCAN:
    case EX_SRC_TYPE_VSTB_TAG_SCAN: {
      dataInfoPending = true;
      dataInfo.status = EX_SOURCE_DATA_NOT_READY;
      dataInfo.taskId = pExchangeInfo->pTaskId;
      dataInfo.index = pIdx->srcIdx;
      dataInfo.window = pBasicParam->window;
      dataInfo.groupid = 0;
      dataInfo.orgTbInfo = NULL;
      dataInfo.tagList = NULL;

      dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
      QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);

      dataInfo.isNewParam = false;
      dataInfo.type = pBasicParam->type;
      dataInfo.srcOpType = pBasicParam->srcOpType;
      dataInfo.tableSeq = pBasicParam->tableSeq;

      // only one data info is kept for these scan types: drop the previous ones first
      taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
      QUERY_CHECK_NULL(taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
      dataInfoPending = false;
      break;
    }
    case EX_SRC_TYPE_VSTB_SCAN: {
      dataInfoPending = true;
      dataInfo.status = EX_SOURCE_DATA_NOT_READY;
      dataInfo.taskId = pExchangeInfo->pTaskId;
      dataInfo.index = pIdx->srcIdx;
      dataInfo.window = pBasicParam->window;
      dataInfo.groupid = 0;
      dataInfo.isNewParam = pBasicParam->isNewParam;
      dataInfo.tagList = NULL;
      dataInfo.orgTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
      QUERY_CHECK_NULL(dataInfo.orgTbInfo, code, lino, _return, terrno);
      dataInfo.orgTbInfo->vgId = pBasicParam->orgTbInfo->vgId;
      tstrncpy(dataInfo.orgTbInfo->tbName, pBasicParam->orgTbInfo->tbName, TSDB_TABLE_FNAME_LEN);
      dataInfo.orgTbInfo->colMap = taosArrayDup(pBasicParam->orgTbInfo->colMap, NULL);
      QUERY_CHECK_NULL(dataInfo.orgTbInfo->colMap, code, lino, _return, terrno);

      dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
      QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);

      dataInfo.type = pBasicParam->type;
      dataInfo.srcOpType = pBasicParam->srcOpType;
      dataInfo.tableSeq = pBasicParam->tableSeq;

      // only one data info is kept for this scan type: drop the previous ones first
      taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
      QUERY_CHECK_NULL( taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
      dataInfoPending = false;
      break;
    }
    case EX_SRC_TYPE_STB_JOIN_SCAN:
    default: {
      if (pIdx->inUseIdx < 0) {
        dataInfoPending = true;
        dataInfo.status = EX_SOURCE_DATA_NOT_READY;
        dataInfo.taskId = pExchangeInfo->pTaskId;
        dataInfo.index = pIdx->srcIdx;
        dataInfo.groupid = 0;
        dataInfo.tagList = NULL;

        dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
        QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);

        dataInfo.isNewParam = false;
        dataInfo.type = pBasicParam->type;
        dataInfo.srcOpType = pBasicParam->srcOpType;
        dataInfo.tableSeq = pBasicParam->tableSeq;

        QUERY_CHECK_NULL(taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
        dataInfoPending = false;

        pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
      } else {
        SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
        QUERY_CHECK_NULL(pDataInfo, code, lino, _return, terrno);
        if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
        }

        // NOTE(review): the previous tagList/pSrcUidList pointers are overwritten without being released
        // here; presumably they were already consumed by the request path - confirm.
        pDataInfo->tagList = NULL;
        pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
        QUERY_CHECK_NULL(pDataInfo->pSrcUidList, code, lino, _return, terrno);

        pDataInfo->groupid = 0;
        pDataInfo->isNewParam = false;
        pDataInfo->type = pBasicParam->type;
        pDataInfo->srcOpType = pBasicParam->srcOpType;
        pDataInfo->tableSeq = pBasicParam->tableSeq;
      }
      break;
    }
  }

  return code;

_return:
  // a locally built data info that never reached pSourceDataInfo still owns its arrays: release them
  if (dataInfoPending) {
    clearVtbScanDataInfo(&dataInfo);
  }
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  return code;
}
1986

1987
/*
 * Consume pOperator->pOperatorGetParam and register the exchange source(s) it describes.
 *
 * A batch parameter (multiParams set) is a hash of basic parameters, each passed to
 * addSingleExchangeSource(); processing stops at the first failure. A plain parameter carries exactly one
 * basic parameter. In every case the get-param is released and the operator's pointer reset before
 * returning, so the batch path no longer keeps the parameter attached when one of its entries fails.
 *
 * @return TSDB_CODE_SUCCESS or the first error reported by addSingleExchangeSource()
 */
int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
  SExchangeInfo*               pExchangeInfo = pOperator->info;
  int32_t                      code = TSDB_CODE_SUCCESS;
  SExchangeOperatorBasicParam* pBasicParam = NULL;
  SExchangeOperatorParam*      pParam = (SExchangeOperatorParam*)pOperator->pOperatorGetParam->value;
  if (pParam->multiParams) {
    SExchangeOperatorBatchParam* pBatch = (SExchangeOperatorBatchParam*)pOperator->pOperatorGetParam->value;
    int32_t                      iter = 0;
    while (NULL != (pBasicParam = tSimpleHashIterate(pBatch->pBatchs, pBasicParam, &iter))) {
      code = addSingleExchangeSource(pOperator, pBasicParam);
      if (code != TSDB_CODE_SUCCESS) {
        // fall through to the common release below instead of returning with the param still attached
        break;
      }
    }
  } else {
    pBasicParam = &pParam->basic;
    code = addSingleExchangeSource(pOperator, pBasicParam);
  }

  freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
  pOperator->pOperatorGetParam = NULL;

  return code;
}
2011

2012
/*
 * Prepare the exchange operator for loading remote data.
 *
 * Nothing is done when a non-dynamic operator is already opened, or when a dynamic operator has no pending
 * get-param. Otherwise: dynamic operators register the sources described by the get-param; the current
 * source cursor is reset for a not-yet-opened dynamic sequential operator or in stream mode; and, outside
 * stream mode with concurrent loading, fetch requests are sent to all sources. The operator is then marked
 * opened and the open cost recorded.
 *
 * @return always TSDB_CODE_SUCCESS; any failure is stored in the task and raised through T_LONG_JMP
 */
int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
  SExchangeInfo* pInfo = pOperator->info;
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;

  bool alreadyOpenedStatic = OPTR_IS_OPENED(pOperator) && !pInfo->dynamicOp;
  bool dynamicWithoutParam = pInfo->dynamicOp && (NULL == pOperator->pOperatorGetParam);
  if (alreadyOpenedStatic || dynamicWithoutParam) {
    qDebug("skip prepare, opened:%d, dynamicOp:%d, getParam:%p", OPTR_IS_OPENED(pOperator), pInfo->dynamicOp, pOperator->pOperatorGetParam);
    return TSDB_CODE_SUCCESS;
  }

  if (pInfo->dynamicOp) {
    code = addDynamicExchangeSource(pOperator);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  // restart from the first source for a fresh dynamic sequential load, and always in stream mode
  if (((pOperator->status == OP_NOT_OPENED) && (pInfo->dynamicOp && pInfo->seqLoadData)) ||
      IS_STREAM_MODE(pOperator->pTaskInfo)) {
    pInfo->current = 0;
  }

  int64_t beginUs = taosGetTimestampUs();

  if (!IS_STREAM_MODE(pOperator->pTaskInfo) && !pInfo->seqLoadData) {
    code = prepareConcurrentlyLoad(pOperator);
    QUERY_CHECK_CODE(code, lino, _end);
    pInfo->openedTs = taosGetTimestampUs();
  }

  OPTR_SET_OPENED(pOperator);
  pOperator->cost.openCost = (taosGetTimestampUs() - beginUs) / 1000.0;

  qDebug("%s prepare load complete", pOperator->pTaskInfo->id.str);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }
  return TSDB_CODE_SUCCESS;
}
2053

2054
/*
 * Apply group offset (soffset), group limit (slimit) and row limit/offset to one result block.
 *
 * @param pOperator      operator whose status may be set to done when the group limit is reached
 * @param pLimitInfo     limit/offset bookkeeping, updated in place
 * @param pBlock         block to filter; it is cleaned up when its rows have to be discarded
 * @param holdDataInBuf  when true, a block below pOperator->resultInfo.threshold rows (and without
 *                       slimit/soffset info) is reported as PROJECT_RETRIEVE_CONTINUE so more data can
 *                       be accumulated
 * @return PROJECT_RETRIEVE_CONTINUE when the caller should fetch more data, PROJECT_RETRIEVE_DONE when the
 *         block (or the end of the output) should be returned
 */
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
  SExecTaskInfo* pTask = pOperator->pTaskInfo;

  // phase 1: skip whole groups while a group offset remains
  if (pLimitInfo->remainGroupOffset > 0) {
    if (pLimitInfo->currentGroupId == 0) {
      // very first group seen: remember it and drop its data
      pLimitInfo->currentGroupId = pBlock->info.id.groupId;
      blockDataCleanup(pBlock);
      return PROJECT_RETRIEVE_CONTINUE;
    }

    if (pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
      // data of a new group arrived: one more group has been skipped
      pLimitInfo->remainGroupOffset -= 1;

      if (pLimitInfo->remainGroupOffset > 0) {
        // still inside the offset: discard this block as well
        blockDataCleanup(pBlock);
        return PROJECT_RETRIEVE_CONTINUE;
      }
    }

    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
  }

  // phase 2: a block of a different group closes the previous output group
  bool switchedGroup = (pLimitInfo->currentGroupId != 0) && (pLimitInfo->currentGroupId != pBlock->info.id.groupId);
  if (switchedGroup) {
    pLimitInfo->numOfOutputGroups += 1;

    if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
      pOperator->status = OP_EXEC_DONE;
      blockDataCleanup(pBlock);
      return PROJECT_RETRIEVE_DONE;
    }

    // start counting limit/offset afresh for the new group
    resetLimitInfoForNextGroup(pLimitInfo);

    if (pBlock->info.rows > 0) {
      return PROJECT_RETRIEVE_DONE;
    }
  }

  // phase 3: row level limit/offset inside the current group
  pLimitInfo->currentGroupId = pBlock->info.id.groupId;

  bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pTask);
  if (pBlock->info.rows == 0) {
    return PROJECT_RETRIEVE_CONTINUE;
  }

  if (limitReached && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
    setOperatorCompleted(pOperator);
    return PROJECT_RETRIEVE_DONE;
  }

  // With slimit/soffset present, results of several rounds must not be merged into one block because they
  // may belong to different groups; otherwise keep buffering until the threshold is reached.
  bool emitNow =
      (!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || hasSlimitOffsetInfo(pLimitInfo);
  return emitNow ? PROJECT_RETRIEVE_DONE : PROJECT_RETRIEVE_CONTINUE;
}
2119

2120
/*
 * Block on pExchangeInfo->ready until it is posted.
 *
 * When the task has a worker callback, its beforeBlocking hook is invoked before waiting and its
 * afterRecoverFromBlocking hook after the wait returned successfully. Any failure (either hook or the
 * wait itself) is stored in the task code and returned.
 *
 * @return TSDB_CODE_SUCCESS, or the error code that was also written to the task
 */
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  int32_t        ret = TSDB_CODE_SUCCESS;

  if (pTaskInfo->pWorkerCb) {
    ret = pTaskInfo->pWorkerCb->beforeBlocking(pTaskInfo->pWorkerCb->pPool);
    if (ret != TSDB_CODE_SUCCESS) {
      pTaskInfo->code = ret;
      return pTaskInfo->code;
    }
  }

  ret = tsem_wait(&pExchangeInfo->ready);
  if (ret != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(ret));
    pTaskInfo->code = ret;
    return pTaskInfo->code;
  }

  if (!pTaskInfo->pWorkerCb) {
    return TSDB_CODE_SUCCESS;
  }

  ret = pTaskInfo->pWorkerCb->afterRecoverFromBlocking(pTaskInfo->pWorkerCb->pPool);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    return pTaskInfo->code;
  }

  return TSDB_CODE_SUCCESS;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc