• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5044

06 May 2026 02:35AM UTC coverage: 73.169% (+0.06%) from 73.107%
#5044

push

travis-ci

web-flow
feat: [6659794715] cpu limit (#35153)

244 of 275 new or added lines in 23 files covered. (88.73%)

526 existing lines in 141 files now uncovered.

277745 of 379596 relevant lines covered (73.17%)

133740972.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.47
/source/libs/executor/src/exchangeoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "operator.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "tdatablock.h"
23
#include "thash.h"
24
#include "tmsg.h"
25
#include "tref.h"
26
#include "trpc.h"
27

28
/*
 * Small context record identifying one fetch request.
 * NOTE(review): field meanings below are inferred from the names and from the
 * matching fields of SSourceDataInfo/SExchangeInfo -- confirm against the
 * fetch callback that consumes this struct.
 */
typedef struct SFetchRspHandleWrapper {
  int64_t exchangeId;   // presumably the ref id of the owning exchange operator
  int32_t sourceIndex;  // presumably the index of the source the request was sent to
  int64_t seqId;        // presumably the exchange sequence id at send time
} SFetchRspHandleWrapper;
33

34
typedef struct SSourceDataInfo {
35
  int32_t             index;
36
  int64_t             seqId;
37
  SRWLatch            lock;
38
  SRetrieveTableRsp*  pRsp;
39
  uint64_t            totalRows;
40
  int64_t             startTime;
41
  int32_t             code;
42
  EX_SOURCE_STATUS    status;
43
  const char*         taskId;
44
  SArray*             pSrcUidList;
45
  int32_t             srcOpType;
46
  bool                tableSeq;
47
  char*               decompBuf;
48
  int32_t             decompBufSize;
49
  SOrgTbInfo*         orgTbInfo;
50
  SArray*             batchOrgTbInfo; // SArray<SOrgTbInfo>
51
  SArray*             tagList;
52
  EExchangeSourceType type;
53
  bool                isNewParam;
54
  STimeWindow         window;
55
  uint64_t            groupid;
56
  bool                fetchSent; // need reset
57
  uint64_t            fetchTimes;   // per-source fetch count
58
  int64_t             fetchCostUs;  // per-source total RPC round-trip (us)
59
} SSourceDataInfo;
60

61
static void destroyExchangeOperatorInfo(void* param);
62
static void freeBlock(void* pParam);
63
static void freeSourceDataInfo(void* param);
64
static void setAllSourcesCompleted(SOperatorInfo* pOperator);
65

66
static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code);
67
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex);
68
static int32_t getCompletedSources(const SArray* pArray, int32_t* pRes);
69
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator);
70
static void    storeNotifyInfo(SOperatorInfo* pOperator);
71
static int32_t seqLoadRemoteData(SOperatorInfo* pOperator);
72
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator);
73
static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock,
74
                                 bool holdDataInBuf);
75
static int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo);
76

77
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo);
78

79
/* Reports whether the source's type is EX_SRC_TYPE_VSTB_SCAN. */
static bool isVstbScan(SSourceDataInfo* pDataInfo) {
  return EX_SRC_TYPE_VSTB_SCAN == pDataInfo->type;
}
34,064,534✔
80
/* Reports whether the source's type is EX_SRC_TYPE_VSTB_WIN_SCAN. */
static bool isVstbWinScan(SSourceDataInfo* pDataInfo) {
  return EX_SRC_TYPE_VSTB_WIN_SCAN == pDataInfo->type;
}
×
81
/* Reports whether the source's type is EX_SRC_TYPE_VSTB_AGG_SCAN. */
static bool isVstbAggScan(SSourceDataInfo* pDataInfo) {
  return EX_SRC_TYPE_VSTB_AGG_SCAN == pDataInfo->type;
}
×
82
/* Reports whether the source's type is EX_SRC_TYPE_VSTB_TAG_SCAN. */
static bool isVstbTagScan(SSourceDataInfo* pDataInfo) {
  return EX_SRC_TYPE_VSTB_TAG_SCAN == pDataInfo->type;
}
19,489,439✔
83
/* Reports whether the source's type is EX_SRC_TYPE_STB_JOIN_SCAN. */
static bool isStbJoinScan(SSourceDataInfo* pDataInfo) {
  return EX_SRC_TYPE_STB_JOIN_SCAN == pDataInfo->type;
}
×
84

85

86
static void streamSequenciallyLoadRemoteData(SOperatorInfo* pOperator,
14,377,900✔
87
                                             SExchangeInfo* pExchangeInfo,
88
                                             SExecTaskInfo* pTaskInfo) {
89
  int32_t code = 0;
14,377,900✔
90
  int32_t lino = 0;
14,377,900✔
91
  int64_t startTs = taosGetTimestampUs();  
14,377,900✔
92
  int32_t  totalSources = (int32_t)taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
14,377,900✔
93
  int32_t completed = 0;
14,377,900✔
94
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
14,377,900✔
95
  if (code != TSDB_CODE_SUCCESS) {
14,377,900✔
96
    pTaskInfo->code = code;
×
97
    T_LONG_JMP(pTaskInfo->env, code);
×
98
  }
99
  if (completed == totalSources) {
14,377,900✔
100
    qDebug("%s no load since all sources completed, completed:%d, totalSources:%d", pTaskInfo->id.str, completed, totalSources);
2,541,694✔
101
    setAllSourcesCompleted(pOperator);
2,541,694✔
102
    return;
2,544,156✔
103
  }
104

105
  SSourceDataInfo* pDataInfo = NULL;
11,836,206✔
106
  SStreamRuntimeFuncInfo* pStream = &pTaskInfo->pStreamRuntimeInfo->funcInfo;
11,836,206✔
107

108
  while (1) {
5,841,861✔
109
    if (pExchangeInfo->current < 0) {
17,678,322✔
110
      qDebug("current %d and all sources complted, totalSources:%d", pExchangeInfo->current, totalSources);
210,737✔
111
      setAllSourcesCompleted(pOperator);
210,737✔
112
      return;
210,737✔
113
    }
114
    
115
    if (pExchangeInfo->current >= totalSources) {
17,467,585✔
116
      completed = 0;
8,369,408✔
117
      code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
8,369,408✔
118
      if (code != TSDB_CODE_SUCCESS) {
8,369,408✔
119
        pTaskInfo->code = code;
×
120
        T_LONG_JMP(pTaskInfo->env, code);
×
121
      }
122
      if (completed == totalSources) {
8,369,408✔
123
        qDebug("stop to load since all sources complted, completed:%d, totalSources:%d", completed, totalSources);
4,424,061✔
124
        setAllSourcesCompleted(pOperator);
4,424,061✔
125
        return;
4,424,061✔
126
      }
127
      
128
      pExchangeInfo->current = 0;
3,945,347✔
129
    }
130

131
    qDebug("%s start stream exchange %p idx:%d fetch", GET_TASKID(pTaskInfo), pExchangeInfo, pExchangeInfo->current);
13,043,524✔
132

133
    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
13,043,524✔
134
    if (!pDataInfo) {
13,043,014✔
135
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
136
      pTaskInfo->code = terrno;
×
137
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
138
    }
139

140
    if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
13,043,014✔
141
      pExchangeInfo->current++;
212,035✔
142
      continue;
212,035✔
143
    }
144

145
    if (!IS_STREAM_SINGLE_GRP(pTaskInfo) && pStream->pGroupReadInfos) {
12,831,489✔
146
      SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
×
147
      if (!pDataInfo) {
×
148
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
149
        pTaskInfo->code = terrno;
×
150
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
151
      }
152

153
      if (pSource->fetchMsgType == TDMT_STREAM_FETCH) {
×
154
        SArray** ppNode = tSimpleHashGet(pStream->pGroupReadInfos, &pSource->addr.nodeId, sizeof(pSource->addr.nodeId));
×
155
        if (NULL == ppNode) {
×
156
          pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
×
157
          pExchangeInfo->current++;
×
158
          continue;
255✔
159
        }
160

161
        pStream->curNodeId = pSource->addr.nodeId;
×
162
        pStream->curGrpRead = *ppNode;
×
163
      }
164
    }
165

166
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
12,831,489✔
167

168
    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
12,831,489✔
169
    if (code != TSDB_CODE_SUCCESS) {
12,831,489✔
170
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
171
      pTaskInfo->code = code;
×
172
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
173
    }
174

175
    while (true) {
572✔
176
      recordOpExecBeforeDownstream(pOperator);
12,832,061✔
177
      code = exchangeWait(pOperator, pExchangeInfo);
12,832,061✔
178
      recordOpExecAfterDownstream(pOperator, 0);
12,832,061✔
179

180
      if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
12,832,061✔
181
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
572✔
182
      }
183

184
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
12,831,489✔
185
      if (pDataInfo->seqId != currSeqId) {
12,831,394✔
186
        qDebug("%s seq rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", 
572✔
187
            GET_TASKID(pTaskInfo), pDataInfo->seqId, pExchangeInfo, currSeqId);
188
        taosMemoryFreeClear(pDataInfo->pRsp);
572✔
189
        continue;
572✔
190
      }
191

192
      break;
12,830,822✔
193
    }
194

195
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
12,830,822✔
196
    if (!pSource) {
12,830,917✔
197
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
198
      pTaskInfo->code = terrno;
×
199
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
200
    }
201

202
    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
12,830,917✔
UNCOV
203
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
×
204
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
205
             tstrerror(pDataInfo->code));
UNCOV
206
      pTaskInfo->code = pDataInfo->code;
×
UNCOV
207
      T_LONG_JMP(pTaskInfo->env, code);
×
208
    }
209

210
    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
12,830,631✔
211
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
12,830,917✔
212

213
    if (pRsp->numOfRows == 0) {
12,830,712✔
214
      qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
5,629,826✔
215
             " execId:%d idx %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", pDataInfo,
216
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
217
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);
218

219
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
5,629,826✔
220
      if (isVstbScan(pDataInfo) || isVstbTagScan(pDataInfo)) {
5,629,826✔
221
        pExchangeInfo->current = -1;
210,737✔
222
      } else {
223
        pExchangeInfo->current += 1;
5,419,089✔
224
      }
225
      taosMemoryFreeClear(pDataInfo->pRsp);
5,629,826✔
226
      continue;
5,629,571✔
227
    }
228

229
    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
7,201,091✔
230
    TAOS_CHECK_EXIT(code);
7,201,091✔
231

232
    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
7,201,091✔
233
    if (pRsp->completed == 1) {
7,201,091✔
234
      qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
2,787,814✔
235
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%d", pDataInfo,
236
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
237
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
238
             pExchangeInfo->current + 1, totalSources);
239

240
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
2,787,814✔
241
      if (isVstbScan(pDataInfo)) {
2,787,814✔
242
        pExchangeInfo->current = -1;
×
243
        taosMemoryFreeClear(pDataInfo->pRsp);
×
244
        continue;
×
245
      }
246
    } else {
247
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d idx:%d numOfRows:%" PRId64
4,413,277✔
248
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
249
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
250
             pExchangeInfo->current, pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
251
    }
252

253
    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
7,201,091✔
254
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;
7,201,091✔
255

256
    pExchangeInfo->current++;
7,201,091✔
257

258
    taosMemoryFreeClear(pDataInfo->pRsp);
7,201,091✔
259
    return;
7,201,091✔
260
  }
261

262
_exit:
×
263

264
  if (code) {
×
265
    pTaskInfo->code = code;
×
266
    qError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
×
267
  }
268
}
269

270

271
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
249,950,355✔
272
                                           SExecTaskInfo* pTaskInfo) {
273
  int32_t code = 0;
249,950,355✔
274
  int32_t lino = 0;
249,950,355✔
275
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
249,950,355✔
276
  int32_t completed = 0;
249,950,971✔
277
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
249,950,971✔
278
  if (code != TSDB_CODE_SUCCESS) {
249,951,495✔
279
    pTaskInfo->code = code;
×
280
    T_LONG_JMP(pTaskInfo->env, code);
×
281
  }
282
  if (completed == totalSources) {
249,951,495✔
283
    setAllSourcesCompleted(pOperator);
80,099,276✔
284
    return;
80,100,342✔
285
  }
286

287
  SSourceDataInfo* pDataInfo = NULL;
169,852,219✔
288

289
  while (1) {
25,291,365✔
290
    qDebug("prepare wait for ready, %p, %s", pExchangeInfo, GET_TASKID(pTaskInfo));
195,143,584✔
291
    recordOpExecBeforeDownstream(pOperator);
195,143,584✔
292
    code = exchangeWait(pOperator, pExchangeInfo);
195,143,584✔
293
    recordOpExecAfterDownstream(pOperator, 0);
195,143,990✔
294

295
    if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
195,143,990✔
296
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
2,900✔
297
    }
298

299
    for (int32_t i = 0; i < totalSources; ++i) {
346,333,192✔
300
      pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
346,332,622✔
301
      QUERY_CHECK_NULL(pDataInfo, code, lino, _exit, terrno);
346,332,622✔
302
      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
346,332,622✔
303
        continue;
114,745,201✔
304
      }
305

306
      if (pDataInfo->status != EX_SOURCE_DATA_READY) {
231,587,991✔
307
        continue;
36,446,901✔
308
      }
309

310
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
195,141,090✔
311
      if (pDataInfo->seqId != currSeqId) {
195,140,561✔
312
        qDebug("concurrent rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pDataInfo->seqId, pExchangeInfo, currSeqId);
×
313
        taosMemoryFreeClear(pDataInfo->pRsp);
×
314
        break;
×
315
      }
316

317
      if (pDataInfo->code != TSDB_CODE_SUCCESS) {
195,140,561✔
318
        code = pDataInfo->code;
3,569✔
319
        TAOS_CHECK_EXIT(code);
3,569✔
320
      }
321

322
      tmemory_barrier();
195,137,521✔
323
      SRetrieveTableRsp*     pRsp = pDataInfo->pRsp;
195,137,521✔
324
      SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
195,137,521✔
325
      QUERY_CHECK_NULL(pSource, code, lino, _exit, terrno);
195,137,521✔
326

327
      // todo
328
      SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
195,137,521✔
329
      if (pRsp->numOfRows == 0) {
195,136,992✔
330
        if (NULL != pDataInfo->pSrcUidList && !isVstbScan(pDataInfo)) {
49,849,072✔
331
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
×
332
          code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
×
333
          if (code != TSDB_CODE_SUCCESS) {
×
334
            taosMemoryFreeClear(pDataInfo->pRsp);
×
335
            TAOS_CHECK_EXIT(code);
×
336
          }
337
        } else {
338
          pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
49,849,072✔
339
          qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
49,849,072✔
340
                 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", try next %d/%" PRIzu, pDataInfo,
341
                 GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
342
                 pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows, i + 1, totalSources);
343
          taosMemoryFreeClear(pDataInfo->pRsp);
49,849,072✔
344
        }
345
        break;
49,849,072✔
346
      }
347

348
      TAOS_CHECK_EXIT(doExtractResultBlocks(pExchangeInfo, pDataInfo));
145,287,920✔
349

350
      SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
145,288,449✔
351
      updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pDataInfo->startTime, pOperator);
145,288,449✔
352
      pDataInfo->totalRows += pRetrieveRsp->numOfRows;
145,287,890✔
353

354
      if (pRsp->completed == 1) {
145,288,449✔
355
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
131,312,417✔
356
        qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
131,312,417✔
357
               " execId:%d index:%d completed, blocks:%d, numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64
358
               ", totalRows:%" PRIu64 ", total:%.2f Kb, try next %d/%" PRIzu, pDataInfo,
359
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
360
               pRsp->numOfBlocks, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows,
361
               pLoadInfo->totalSize / 1024.0, i + 1, totalSources);
362
      } else {
363
        qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
13,976,032✔
364
               " execId:%d blocks:%d, numOfRows:%" PRId64 ", totalRows:%" PRIu64 ", total:%.2f Kb",
365
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
366
               pRsp->numOfBlocks, pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
367
      }
368

369
      taosMemoryFreeClear(pDataInfo->pRsp);
145,288,449✔
370

371
      if ((pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED || NULL != pDataInfo->pSrcUidList) && !isVstbScan(pDataInfo) && !isVstbTagScan(pDataInfo)) {
145,287,890✔
372
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
13,976,032✔
373
        code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
13,976,032✔
374
        if (code != TSDB_CODE_SUCCESS) {
13,976,032✔
375
          taosMemoryFreeClear(pDataInfo->pRsp);
×
376
          TAOS_CHECK_EXIT(code);
×
377
        }
378
      }
379
      
380
      return;
145,287,942✔
381
    }  // end loop
382

383
    int32_t complete1 = 0;
49,849,642✔
384
    code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &complete1);
49,849,072✔
385
    if (code != TSDB_CODE_SUCCESS) {
49,849,072✔
386
      pTaskInfo->code = code;
×
387
      T_LONG_JMP(pTaskInfo->env, code);
×
388
    }
389
    if (complete1 == totalSources) {
49,849,072✔
390
      qDebug("all sources are completed, %s", GET_TASKID(pTaskInfo));
24,557,707✔
391
      return;
24,557,707✔
392
    }
393
  }
394

395
_exit:
3,569✔
396

397
  if (code) {
3,569✔
398
    pTaskInfo->code = code;
3,569✔
399
    qError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
3,569✔
400
  }
401
}
402

403
/*
 * Returns the next data block of the exchange operator, or NULL when none is
 * available.
 *
 * A block already buffered in pResultBlockList is returned first. Otherwise one
 * of the three loaders (sequential, stream-sequential, concurrent) is run and
 * the first block it produced is returned. Every returned block is also pushed
 * onto pRecycledBlocks. Errors leave through T_LONG_JMP(pTaskInfo->env, ...).
 */
static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
  int32_t        code = TSDB_CODE_SUCCESS;
  SExchangeInfo* pExchangeInfo = pOperator->info;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);

  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
  if (pOperator->status == OP_EXEC_DONE) {
    qDebug("%s all %" PRIzu " source(s) are exhausted, total rows:%" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
           GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize,
           pLoadInfo->totalElapsed / 1000.0);
    return NULL;
  }

  // we have buffered retrieved datablock, return it directly
  SSDataBlock* pBlock = NULL;
  if (taosArrayGetSize(pExchangeInfo->pResultBlockList) > 0) {
    pBlock = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
    taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
  }

  if (pBlock != NULL) {
    void* pushed = taosArrayPush(pExchangeInfo->pRecycledBlocks, &pBlock);
    if (!pushed) {
      code = terrno;
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      pTaskInfo->code = code;
      T_LONG_JMP(pTaskInfo->env, code);
    }
    return pBlock;
  }

  // nothing buffered: pull from the remote sources with the loader matching the mode
  if (pExchangeInfo->seqLoadData && (IS_NON_STREAM_MODE(pTaskInfo) || IS_STREAM_SINGLE_GRP(pTaskInfo))) {
    code = seqLoadRemoteData(pOperator);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      pTaskInfo->code = code;
      T_LONG_JMP(pTaskInfo->env, code);
    }
  } else if (IS_STREAM_MODE(pOperator->pTaskInfo))   {
    streamSequenciallyLoadRemoteData(pOperator, pExchangeInfo, pTaskInfo);
  } else {
    concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
  }

  // the void loaders report failures through the task code
  if (TSDB_CODE_SUCCESS != pOperator->pTaskInfo->code) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(pOperator->pTaskInfo->code));
    T_LONG_JMP(pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  if (taosArrayGetSize(pExchangeInfo->pResultBlockList) == 0) {
    qDebug("empty resultBlockList");
    return NULL;
  }

  pBlock = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
  taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
  void* pushed = taosArrayPush(pExchangeInfo->pRecycledBlocks, &pBlock);
  if (!pushed) {
    code = terrno;
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  qDebug("block with rows:%" PRId64 " loaded", pBlock->info.rows);
  return pBlock;
}
471

472
/*
 * Produces the next non-empty, filtered block of the exchange operator.
 *
 * Opens the operator through fpSet._openFn, then repeatedly loads a block with
 * doLoadRemoteDataImpl(), applies the operator filter and, when limit/offset
 * info is present, handleLimitOffset(). Blocks that end up empty are skipped.
 *
 * @param pOperator  the exchange operator
 * @param ppRes      out: the block to return, or NULL when there is no more data
 * @return the status code; failures reaching _end long-jump via pTaskInfo->env
 */
static int32_t loadRemoteDataNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SExchangeInfo* pExchangeInfo = pOperator->info;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  qDebug("%s start to load from exchange %p", pTaskInfo->id.str, pExchangeInfo);

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  for (;;) {
    SSDataBlock* pBlock = doLoadRemoteDataImpl(pOperator);
    if (pBlock == NULL) {
      (*ppRes) = NULL;
      return code;
    }

    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    if (blockDataGetNumOfRows(pBlock) == 0) {
      qDebug("rows 0 block got, continue next load");
      continue;
    }

    SLimitInfo* pLimitInfo = &pExchangeInfo->limitInfo;
    if (!hasLimitOffsetInfo(pLimitInfo)) {
      (*ppRes) = pBlock;
      qDebug("block with rows %" PRId64 " returned in exechange", pBlock->info.rows);
      return code;
    }

    int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock, false);
    if (status == PROJECT_RETRIEVE_CONTINUE) {
      qDebug("limit retrieve continue");
      continue;
    }

    if (status == PROJECT_RETRIEVE_DONE) {
      if (pBlock->info.rows == 0) {
        setOperatorCompleted(pOperator);
        (*ppRes) = NULL;
      } else {
        (*ppRes) = pBlock;
      }
      return code;
    }
    // any other status: load the next block
  }

_end:

  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  } else {
    qDebug("empty block returned in exchange");
  }

  (*ppRes) = NULL;
  return code;
}
539

540
/*
 * Allocates pInfo->pSourceDataInfo and, for non-dynamic operators, fills it
 * with one SSourceDataInfo per source (status EX_SOURCE_DATA_NOT_READY, index
 * i, taskId pointing at the private copy of `id` stored in pInfo->pTaskId).
 * Dynamic operators only get the empty array.
 *
 * @param numOfSources  number of downstream sources
 * @param pInfo         exchange operator state to initialise
 * @param id            task id string; copied into pInfo->pTaskId
 * @return TSDB_CODE_SUCCESS, or the terrno of the failed allocation. When a
 *         push fails the array is destroyed and pInfo->pSourceDataInfo is reset
 *         to NULL so no dangling pointer is left behind.
 */
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
  pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
  if (pInfo->pSourceDataInfo == NULL) {
    return terrno;
  }

  if (pInfo->dynamicOp) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t len = strlen(id) + 1;
  pInfo->pTaskId = taosMemoryCalloc(1, len);
  if (!pInfo->pTaskId) {
    return terrno;
  }
  tstrncpy(pInfo->pTaskId, id, len);
  for (int32_t i = 0; i < numOfSources; ++i) {
    SSourceDataInfo dataInfo = {0};
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
    dataInfo.taskId = pInfo->pTaskId;
    dataInfo.index = i;
    SSourceDataInfo* pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
    if (pDs == NULL) {
      // capture the failure reason before the cleanup call can touch terrno
      int32_t code = terrno;
      taosArrayDestroyEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
      // the array is gone; do not leave the stale pointer in the operator state
      pInfo->pSourceDataInfo = NULL;
      return code;
    }
    qDebug("init source data info %d, pDs:%p, status:%d", i, pDs, pDs->status);
  }

  return TSDB_CODE_SUCCESS;
}
571

572
/*
 * Builds the per-source bookkeeping of an exchange operator from its physical
 * plan node: the fetch-handle array, the copy of the source end points in
 * pInfo->pSources, the nodeId -> SExchangeSrcIndex hash (dynamic operators
 * only), the limit info, the ref-pool registration (pInfo->self) and finally
 * the SSourceDataInfo array via initDataSource().
 *
 * @param pExNode  physical exchange node describing the sources
 * @param pInfo    exchange operator state to initialise
 * @param id       task id string, used for logging and passed to initDataSource
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when the node has no
 *         sources, or the error code of the failed step.
 */
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo, const char* id) {
  size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);

  if (numOfSources == 0) {
    qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
    return TSDB_CODE_INVALID_PARA;
  }
  pInfo->pFetchRpcHandles = taosArrayInit(numOfSources, sizeof(int64_t));
  if (!pInfo->pFetchRpcHandles) {
    return terrno;
  }
  void* ret = taosArrayReserve(pInfo->pFetchRpcHandles, numOfSources);
  if (!ret) {
    return terrno;
  }

  pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
  if (pInfo->pSources == NULL) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
    return terrno;
  }

  if (pExNode->node.dynamicOp) {
    pInfo->pHashSources = tSimpleHashInit(numOfSources * 2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
    if (NULL == pInfo->pHashSources) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return terrno;
    }
  }

  // same-signedness bound for the index loop below
  int32_t srcCount = (int32_t)numOfSources;
  for (int32_t i = 0; i < srcCount; ++i) {
    SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
    if (!pNode) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return terrno;
    }
    void* tmp = taosArrayPush(pInfo->pSources, pNode);
    if (!tmp) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return terrno;
    }

    // the lookup hash only exists for dynamic operators; skip the insert otherwise
    if (pInfo->pHashSources) {
      SExchangeSrcIndex idx = {.srcIdx = i, .inUseIdx = -1};
      int32_t           code =
          tSimpleHashPut(pInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        return code;
      }
    }
  }

  initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);
  int64_t refId = taosAddRef(fetchObjRefPool, pInfo);
  if (refId < 0) {
    int32_t code = terrno;
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    return code;
  } else {
    pInfo->self = refId;
  }

  return initDataSource(numOfSources, pInfo, id);
}
634

635
/*
 * Puts an exchange operator back into its not-opened state so it can be run
 * again: bumps the exchange seqId, clears the load statistics, resets every
 * SSourceDataInfo under its write latch, drops buffered and recycled blocks,
 * marks every hashed source as not in use and re-initialises the limit info
 * from the physical plan node.
 *
 * @param pOper  the exchange operator
 * @return always 0
 */
int32_t resetExchangeOperState(SOperatorInfo* pOper) {
  SExchangeInfo*      pExchange = pOper->info;
  SExchangePhysiNode* pPlanNode = (SExchangePhysiNode*)pOper->pPhyNode;

  qDebug("%s reset exchange op:%p info:%p", pOper->pTaskInfo->id.str, pOper, pExchange);

  // a new sequence id lets the loaders recognise responses from before the reset
  (void)atomic_add_fetch_64(&pExchange->seqId, 1);
  pOper->status = OP_NOT_OPENED;
  pExchange->current = 0;
  pExchange->loadInfo.totalElapsed = 0;
  pExchange->loadInfo.totalRows = 0;
  pExchange->loadInfo.totalSize = 0;

  for (int32_t idx = 0; idx < taosArrayGetSize(pExchange->pSourceDataInfo); ++idx) {
    SSourceDataInfo* pSrc = taosArrayGet(pExchange->pSourceDataInfo, idx);

    taosWLockLatch(&pSrc->lock);
    taosMemoryFreeClear(pSrc->decompBuf);
    taosMemoryFreeClear(pSrc->pRsp);

    pSrc->totalRows = 0;
    pSrc->code = 0;
    pSrc->status = EX_SOURCE_DATA_NOT_READY;
    pSrc->fetchSent = false;
    pSrc->fetchTimes = 0;
    pSrc->fetchCostUs = 0;
    taosWUnLockLatch(&pSrc->lock);
  }

  // dynamic operators start over with an empty source list
  if (pExchange->dynamicOp) {
    taosArrayClearEx(pExchange->pSourceDataInfo, freeSourceDataInfo);
  }

  taosArrayClearEx(pExchange->pResultBlockList, freeBlock);
  taosArrayClearEx(pExchange->pRecycledBlocks, freeBlock);

  blockDataCleanup(pExchange->pDummyBlock);

  void*   pEntry = NULL;
  int32_t iter = 0;
  while ((pEntry = tSimpleHashIterate(pExchange->pHashSources, pEntry, &iter))) {
    ((SExchangeSrcIndex*)pEntry)->inUseIdx = -1;
  }

  pExchange->limitInfo = (SLimitInfo){0};
  initLimitInfo(pPlanNode->node.pLimit, pPlanNode->node.pSlimit, &pExchange->limitInfo);

  return 0;
}
682

683
/*
 * Summarize the fetch statistics of every source of an exchange operator.
 *
 * @param pOptr        Operator whose `info` is an SExchangeInfo.
 * @param pOptrExplain Output: newly allocated SExchangeExplainInfo holding the summary.
 * @param len          Output: size in bytes of the returned structure.
 *
 * @return TSDB_CODE_SUCCESS, or terrno when the summary cannot be allocated
 *         (the outputs are left untouched in that case).
 */
static int32_t exchangeGetExplainExecInfo(SOperatorInfo* pOptr,
                                          void** pOptrExplain, uint32_t* len) {
  const SExchangeInfo* pExchange = pOptr->info;
  int32_t              srcCount = (int32_t)taosArrayGetSize(pExchange->pSourceDataInfo);

  SExchangeExplainInfo* pSummary = taosMemoryCalloc(1, sizeof(SExchangeExplainInfo));
  if (NULL == pSummary) {
    return terrno;
  }

  pSummary->numSources = srcCount;
  pSummary->mode = pExchange->seqLoadData ? 1 : 0;

  /* all sources are exhausted, thus no need to lock the sources data info */
  for (int32_t idx = 0; idx < srcCount; ++idx) {
    const SSourceDataInfo* pSource = taosArrayGet(pExchange->pSourceDataInfo, idx);

    /* averages are accumulated as per-source shares */
    pSummary->avgFetchTimes += (double)pSource->fetchTimes / srcCount;
    pSummary->avgFetchRows += (double)pSource->totalRows / srcCount;
    pSummary->avgFetchCost += (double)pSource->fetchCostUs / srcCount;

    /* running maxima */
    pSummary->maxFetchTimes = TMAX(pSummary->maxFetchTimes, pSource->fetchTimes);
    pSummary->maxFetchRows = TMAX(pSummary->maxFetchRows, pSource->totalRows);
    pSummary->maxFetchCost = TMAX(pSummary->maxFetchCost, pSource->fetchCostUs);
  }

  *len = sizeof(SExchangeExplainInfo);
  *pOptrExplain = pSummary;
  return TSDB_CODE_SUCCESS;
}
711

712
int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo,
116,522,554✔
713
                                   SOperatorInfo** pOptrInfo) {
714
  QRY_PARAM_CHECK(pOptrInfo);
116,522,554✔
715

716
  int32_t        code = 0;
116,523,489✔
717
  int32_t        lino = 0;
116,523,489✔
718
  SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
116,523,489✔
719
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
116,519,496✔
720
  if (pInfo == NULL || pOperator == NULL) {
116,520,125✔
721
    code = terrno;
×
722
    goto _error;
×
723
  }
724
  initOperatorCostInfo(pOperator);
116,520,125✔
725

726
  pInfo->isExchange = true;
116,522,025✔
727
  pOperator->pPhyNode = pExNode;
116,522,554✔
728
  pInfo->dynamicOp = pExNode->node.dynamicOp;
116,522,554✔
729
  code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo));
116,521,619✔
730
  QUERY_CHECK_CODE(code, lino, _error);
116,522,677✔
731

732
  code = tsem_init(&pInfo->ready, 0, 0);
116,522,677✔
733
  QUERY_CHECK_CODE(code, lino, _error);
116,519,398✔
734

735
  pInfo->pDummyBlock = createDataBlockFromDescNode(pExNode->node.pOutputDataBlockDesc);
116,519,398✔
736
  QUERY_CHECK_NULL(pInfo->pDummyBlock, code, lino, _error, terrno);
116,522,126✔
737

738
  pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES);
116,521,567✔
739
  QUERY_CHECK_NULL(pInfo->pResultBlockList, code, lino, _error, terrno);
116,521,952✔
740
  pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES);
116,521,567✔
741
  QUERY_CHECK_NULL(pInfo->pRecycledBlocks, code, lino, _error, terrno);
116,518,970✔
742

743
  SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self};
116,521,720✔
744
  code = qAppendTaskStopInfo(pTaskInfo, &stopInfo);
116,522,118✔
745
  QUERY_CHECK_CODE(code, lino, _error);
116,521,567✔
746

747
  pInfo->seqLoadData = pExNode->seqRecvData;
116,521,567✔
748
  pInfo->dynTbname = pExNode->dynTbname;
116,522,524✔
749
  if (pInfo->dynTbname) {
116,519,871✔
750
    pInfo->seqLoadData = true;
34,017✔
751
  }
752
  pInfo->pTransporter = pTransporter;
116,521,008✔
753

754
  setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo,
116,519,626✔
755
                  pTaskInfo);
756
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
116,520,467✔
757

758
  code = filterInitFromNode((SNode*)pExNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
116,523,489✔
759
                            pTaskInfo->pStreamRuntimeInfo);
116,521,709✔
760
  QUERY_CHECK_CODE(code, lino, _error);
116,520,201✔
761
  filterSetExecContext(pOperator->exprSupp.pFilterInfo, pTaskInfo, isTaskKilled);
116,520,201✔
762
  qTrace("%s exchange op:%p", __func__, pOperator);
116,519,510✔
763
  pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteDataNext, NULL,
116,519,510✔
764
                                         destroyExchangeOperatorInfo, optrDefaultBufFn,
765
                                         exchangeGetExplainExecInfo, optrDefaultGetNextExtFn, NULL);
766
  setOperatorResetStateFn(pOperator, resetExchangeOperState);
116,518,679✔
767
  *pOptrInfo = pOperator;
116,521,416✔
768
  return TSDB_CODE_SUCCESS;
116,520,449✔
769

770
_error:
×
771
  if (code != TSDB_CODE_SUCCESS) {
×
772
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
773
    pTaskInfo->code = code;
×
774
  }
775
  if (pInfo != NULL) {
×
776
    doDestroyExchangeOperatorInfo(pInfo);
×
777
  }
778

779
  if (pOperator != NULL) {
×
780
    pOperator->info = NULL;
×
781
    destroyOperator(pOperator);
×
782
  }
783
  return code;
×
784
}
785

786
void destroyExchangeOperatorInfo(void* param) {
116,522,909✔
787
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
116,522,909✔
788
  int32_t        code = taosRemoveRef(fetchObjRefPool, pExInfo->self);
116,522,909✔
789
  if (code != TSDB_CODE_SUCCESS) {
116,522,339✔
790
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
791
  }
792
}
116,522,339✔
793

794
void freeBlock(void* pParam) {
335,729,954✔
795
  SSDataBlock* pBlock = *(SSDataBlock**)pParam;
335,729,954✔
796
  blockDataDestroy(pBlock);
335,729,954✔
797
}
335,731,104✔
798

799
void freeSourceDataInfo(void* p) {
184,271,765✔
800
  SSourceDataInfo* pInfo = (SSourceDataInfo*)p;
184,271,765✔
801
  taosMemoryFreeClear(pInfo->decompBuf);
184,271,765✔
802
  taosMemoryFreeClear(pInfo->pRsp);
184,272,915✔
803

804
  pInfo->decompBufSize = 0;
184,271,185✔
805
}
184,272,915✔
806

807
void doDestroyExchangeOperatorInfo(void* param) {
116,522,909✔
808
  if (param == NULL) {
116,522,909✔
809
    return;
×
810
  }
811
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
116,522,909✔
812
  if (pExInfo->pFetchRpcHandles) {
116,522,909✔
813
    for (int32_t i = 0; i < pExInfo->pFetchRpcHandles->size; ++i) {
314,310,307✔
814
      int64_t* pRpcHandle = taosArrayGet(pExInfo->pFetchRpcHandles, i);
197,786,398✔
815
      if (*pRpcHandle > 0) {
197,787,398✔
816
        SDownstreamSourceNode* pSource = taosArrayGet(pExInfo->pSources, i);
9,913,917✔
817
        (void)asyncFreeConnById(pExInfo->pTransporter, *pRpcHandle);
9,913,917✔
818
      }
819
    }
820
    taosArrayDestroy(pExInfo->pFetchRpcHandles);
116,523,489✔
821
  }
822

823
  taosArrayDestroy(pExInfo->pSources);
116,523,489✔
824
  taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo);
116,522,919✔
825

826
  taosArrayDestroyEx(pExInfo->pResultBlockList, freeBlock);
116,522,339✔
827
  taosArrayDestroyEx(pExInfo->pRecycledBlocks, freeBlock);
116,522,919✔
828

829
  blockDataDestroy(pExInfo->pDummyBlock);
116,522,919✔
830
  tSimpleHashCleanup(pExInfo->pHashSources);
116,522,909✔
831

832
  int32_t code = tsem_destroy(&pExInfo->ready);
116,522,919✔
833
  if (code != TSDB_CODE_SUCCESS) {
116,522,339✔
834
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
835
  }
836
  taosMemoryFreeClear(pExInfo->pTaskId);
116,522,339✔
837

838
  taosMemoryFreeClear(param);
116,522,339✔
839
}
840

841
/*
 * Callback run when the response to a fetch request arrives.
 *
 * The exchange info is looked up through fetchObjRefPool; responses for an
 * exchange that is gone, or that carry a stale sequence id, are dropped.
 * Otherwise the response (or the error code) is stored in the matching
 * SSourceDataInfo, the source is marked EX_SOURCE_DATA_READY and the
 * exchange's `ready` semaphore is posted.
 *
 * Every path that acquired the exchange reference releases it again before
 * returning, and every path that does not hand `pMsg->pData` over to the
 * source data info frees it.
 *
 * @param param SFetchRspHandleWrapper identifying exchange, source index and sequence id.
 * @param pMsg  Response message; `pEpSet` is always freed here.
 * @param code  Result code of the request.
 *
 * @return TSDB_CODE_SUCCESS for handled or dropped responses, otherwise the
 *         error of the failing step.
 */
int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
  SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;

  taosMemoryFreeClear(pMsg->pEpSet);
  SExchangeInfo* pExchangeInfo = taosAcquireRef(fetchObjRefPool, pWrapper->exchangeId);
  if (pExchangeInfo == NULL) {
    qWarn("failed to acquire exchange operator, since it may have been released, %p", pExchangeInfo);
    taosMemoryFree(pMsg->pData);
    return TSDB_CODE_SUCCESS;
  }

  /* drop responses that belong to an earlier run of the operator */
  int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
  if (pWrapper->seqId != currSeqId) {
    qDebug("rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pWrapper->seqId, pExchangeInfo, currSeqId);
    taosMemoryFree(pMsg->pData);
    code = taosReleaseRef(fetchObjRefPool, pWrapper->exchangeId);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    }
    return TSDB_CODE_SUCCESS;
  }

  int32_t          index = pWrapper->sourceIndex;

  qDebug("%s exchange %p %dth source got rsp, code:%d, rsp:%p", pExchangeInfo->pTaskId, pExchangeInfo, index, code, pMsg->pData);

  /* the request is finished: free its connection and invalidate the handle */
  int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, index);
  if (pRpcHandle != NULL) {
    int32_t ret = asyncFreeConnById(pExchangeInfo->pTransporter, *pRpcHandle);
    if (ret != 0) {
      qDebug("failed to free rpc handle, code:%s, %p", tstrerror(ret), pExchangeInfo);
    }
    *pRpcHandle = -1;
  }

  SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);
  if (!pSourceDataInfo) {
    /* capture the lookup error before the cleanup calls can overwrite terrno */
    int32_t getCode = terrno;
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(getCode));

    /* nobody takes ownership of the payload, and the acquired ref must be dropped */
    taosMemoryFree(pMsg->pData);
    int32_t relCode = taosReleaseRef(fetchObjRefPool, pWrapper->exchangeId);
    if (relCode != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(relCode));
    }
    return getCode;
  }

  if (0 == code && NULL == pMsg->pData) {
    qError("invalid rsp msg, msgType:%d, len:%d", pMsg->msgType, pMsg->len);
    code = TSDB_CODE_QRY_INVALID_MSG;
  }

  taosWLockLatch(&pSourceDataInfo->lock);
  if (code == TSDB_CODE_SUCCESS) {
    pSourceDataInfo->fetchCostUs += taosGetTimestampUs() - pSourceDataInfo->startTime;
    pSourceDataInfo->fetchTimes++;

    /* ownership of the payload moves to the source data info */
    pSourceDataInfo->seqId = pWrapper->seqId;
    pSourceDataInfo->pRsp = pMsg->pData;

    /* byte-swap the header fields in place */
    SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
    pRsp->numOfRows = htobe64(pRsp->numOfRows);
    pRsp->compLen = htonl(pRsp->compLen);
    pRsp->payloadLen = htonl(pRsp->payloadLen);
    pRsp->numOfCols = htonl(pRsp->numOfCols);
    pRsp->useconds = htobe64(pRsp->useconds);
    pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);

    qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%" PRId64 ", %p", pSourceDataInfo->taskId, index,
           pRsp->numOfBlocks, pRsp->numOfRows, pExchangeInfo);
  } else {
    taosMemoryFree(pMsg->pData);
    pSourceDataInfo->code = rpcCvtErrCode(code);
    if (pSourceDataInfo->code != code) {
      qError("%s fetch rsp received, index:%d, error:%s, cvted error: %s, %p", pSourceDataInfo->taskId, index,
             tstrerror(code), tstrerror(pSourceDataInfo->code), pExchangeInfo);
    } else {
      qError("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code),
             pExchangeInfo);
    }
  }

  /* publish the payload/code before the status flips to READY */
  tmemory_barrier();
  pSourceDataInfo->status = EX_SOURCE_DATA_READY;
  taosWUnLockLatch(&pSourceDataInfo->lock);

  code = tsem_post(&pExchangeInfo->ready);
  if (code != TSDB_CODE_SUCCESS) {
    qError("failed to invoke post when fetch rsp is ready, code:%s, %p", tstrerror(code), pExchangeInfo);

    /* still drop the acquired ref; the post error is what gets reported */
    int32_t relCode = taosReleaseRef(fetchObjRefPool, pWrapper->exchangeId);
    if (relCode != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(relCode));
    }
    return code;
  }

  code = taosReleaseRef(fetchObjRefPool, pWrapper->exchangeId);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
  }
  return code;
}
932

933
/*
 * Build a table-scan operator param of dyn type DYN_TYPE_STB_JOIN.
 *
 * @param ppRes     Output operator param; set to NULL when building fails.
 * @param pUidList  Table uid list; it is duplicated into the param.
 * @param srcOpType Operator type stored in the param.
 * @param tableSeq  Stored as the param's tableSeq flag.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise terrno of the failing allocation.
 */
int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, bool tableSeq) {
  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }

  /* zero-initialized so that fields not assigned below (e.g. groupid) are never indeterminate */
  STableScanOperatorParam* pScan = taosMemoryCalloc(1, sizeof(STableScanOperatorParam));
  if (NULL == pScan) {
    int32_t allocCode = terrno;  /* read before the cleanup call */
    taosMemoryFreeClear(*ppRes);
    return allocCode;
  }

  pScan->paramType = DYN_TYPE_SCAN_PARAM;
  pScan->pUidList = taosArrayDup(pUidList, NULL);
  if (NULL == pScan->pUidList) {
    int32_t dupCode = terrno;  /* read before the cleanup calls */
    taosMemoryFree(pScan);
    taosMemoryFreeClear(*ppRes);
    return dupCode;
  }
  pScan->dynType = DYN_TYPE_STB_JOIN;
  pScan->tableSeq = tableSeq;
  pScan->pOrgTbInfo = NULL;
  pScan->pBatchTbInfo = NULL;
  pScan->pTagList = NULL;
  pScan->isNewParam = false;
  /* skey = INT64_MAX and ekey = INT64_MIN, exactly as the original sets them */
  pScan->window.skey = INT64_MAX;
  pScan->window.ekey = INT64_MIN;
  pScan->notifyToProcess = false;
  pScan->notifyTs = 0;

  (*ppRes)->opType = srcOpType;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->value = pScan;
  (*ppRes)->pChildren = NULL;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;
}
971

972
/*
 * Build a table-scan operator param with an optional original-table mapping
 * and an explicit time window.
 *
 * @param ppRes      Output operator param; set to NULL when building fails.
 * @param pUidList   Optional table uid list; duplicated when not NULL.
 * @param srcOpType  Operator type stored in the param.
 * @param pMap       Optional original table info; deep-copied (including colMap) when not NULL.
 * @param tableSeq   Stored as the param's tableSeq flag.
 * @param window     Time window copied into the param; must not be NULL.
 * @param isNewParam Stored as the param's isNewParam flag.
 * @param type       Dyn type stored in the param.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise the error of the failing allocation.
 */
int32_t buildTableScanOperatorParamEx(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, SOrgTbInfo *pMap, bool tableSeq, STimeWindow *window, bool isNewParam, ETableScanDynType type) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  STableScanOperatorParam* pScan = NULL;

  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);

  /*
   * Must be zero-initialized: the error path below inspects pUidList and
   * pOrgTbInfo, which may not have been assigned yet when an early step fails.
   */
  pScan = taosMemoryCalloc(1, sizeof(STableScanOperatorParam));
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);

  pScan->paramType = DYN_TYPE_SCAN_PARAM;
  if (pUidList) {
    pScan->pUidList = taosArrayDup(pUidList, NULL);
    QUERY_CHECK_NULL(pScan->pUidList, code, lino, _return, terrno);
  } else {
    pScan->pUidList = NULL;
  }

  if (pMap) {
    pScan->pOrgTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
    QUERY_CHECK_NULL(pScan->pOrgTbInfo, code, lino, _return, terrno);

    pScan->pOrgTbInfo->vgId = pMap->vgId;
    tstrncpy(pScan->pOrgTbInfo->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);

    /* colMap is assigned (possibly NULL) before the check, so the error path may destroy it safely */
    pScan->pOrgTbInfo->colMap = taosArrayDup(pMap->colMap, NULL);
    QUERY_CHECK_NULL(pScan->pOrgTbInfo->colMap, code, lino, _return, terrno);
  } else {
    pScan->pOrgTbInfo = NULL;
  }
  pScan->pTagList = NULL;
  pScan->pBatchTbInfo = NULL;

  pScan->dynType = type;
  pScan->tableSeq = tableSeq;
  pScan->window.skey = window->skey;
  pScan->window.ekey = window->ekey;
  pScan->isNewParam = isNewParam;
  pScan->notifyToProcess = false;
  pScan->notifyTs = 0;
  (*ppRes)->opType = srcOpType;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->value = pScan;
  (*ppRes)->pChildren = NULL;
  (*ppRes)->reUse = false;

  return code;
_return:
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
  taosMemoryFreeClear(*ppRes);
  if (pScan) {
    taosArrayDestroy(pScan->pUidList);
    if (pScan->pOrgTbInfo) {
      taosArrayDestroy(pScan->pOrgTbInfo->colMap);
      taosMemoryFreeClear(pScan->pOrgTbInfo);
    }
    taosMemoryFree(pScan);
  }
  return code;
}
1034

1035
/**
1036
  @brief build the table scan operator param for notify message
1037
*/
1038
int32_t buildTableScanOperatorParamNotify(SOperatorParam** ppRes,
243,418✔
1039
                                          int32_t srcOpType, TSKEY notifyTs) {
1040
  if (srcOpType != 0 && srcOpType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
243,418✔
1041
    qWarn("%s, invalid srcOpType:%d", __func__, srcOpType);
×
1042
    return TSDB_CODE_INVALID_PARA;
×
1043
  }
1044
  int32_t code = TSDB_CODE_SUCCESS;
243,418✔
1045
  int32_t lino = 0;
243,418✔
1046
  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
243,418✔
1047
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
243,418✔
1048

1049
  STableScanOperatorParam* pTsParam =
486,836✔
1050
    taosMemoryCalloc(1, sizeof(STableScanOperatorParam));
243,418✔
1051
  QUERY_CHECK_NULL(pTsParam, code, lino, _return, terrno);
243,418✔
1052

1053
  pTsParam->paramType = NOTIFY_TYPE_SCAN_PARAM;
243,418✔
1054
  pTsParam->notifyToProcess = true;
243,418✔
1055
  pTsParam->notifyTs = notifyTs;
243,418✔
1056

1057
  (*ppRes)->opType = srcOpType != 0 ? srcOpType :
243,418✔
1058
                                      QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
1059
  (*ppRes)->downstreamIdx = 0;
243,418✔
1060
  (*ppRes)->value = pTsParam;
243,418✔
1061
  (*ppRes)->pChildren = NULL;
243,418✔
1062
  /* param is not reusable when it is transferred by message */
1063
  (*ppRes)->reUse = false;
243,418✔
1064

1065
_return:
243,418✔
1066
  if (TSDB_CODE_SUCCESS != code) {
243,418✔
1067
    qError("%s failed at %d, failed to build scan operator msg:%s",
×
1068
           __func__, lino, tstrerror(code));
1069
    taosMemoryFreeClear(*ppRes);
×
1070
    if (pTsParam) {
×
1071
      taosMemoryFree(pTsParam);
×
1072
    }
1073
  }
1074
  return code;
243,418✔
1075
}
1076

1077
/*
 * Build a table-scan operator param of dyn type DYN_TYPE_VSTB_BATCH_SCAN.
 *
 * @param ppRes      Output operator param; set to NULL when building fails.
 * @param groupid    Group id stored in the param.
 * @param pUidList   Optional table uid list; duplicated when not NULL.
 * @param srcOpType  Operator type stored in the param.
 * @param pBatchMap  Optional array of SOrgTbInfo; every entry is deep-copied (including colMap).
 * @param pTagList   Optional array of STagVal; every entry is copied, variable-length
 *                   values get their own buffer.
 * @param tableSeq   Stored as the param's tableSeq flag.
 * @param window     Time window copied into the param; must not be NULL.
 * @param isNewParam Stored as the param's isNewParam flag.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise the error of the failing step.
 *         On failure everything allocated here is released again.
 */
int32_t buildTableScanOperatorParamBatchInfo(SOperatorParam** ppRes, uint64_t groupid, SArray* pUidList, int32_t srcOpType, SArray *pBatchMap, SArray *pTagList, bool tableSeq, STimeWindow *window, bool isNewParam) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  STableScanOperatorParam* pScan = NULL;

  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);

  /*
   * Must be zero-initialized: the error path below inspects pUidList,
   * pBatchTbInfo and pTagList, which may not have been assigned yet.
   */
  pScan = taosMemoryCalloc(1, sizeof(STableScanOperatorParam));
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);

  pScan->paramType = DYN_TYPE_SCAN_PARAM;
  pScan->groupid = groupid;
  if (pUidList) {
    pScan->pUidList = taosArrayDup(pUidList, NULL);
    QUERY_CHECK_NULL(pScan->pUidList, code, lino, _return, terrno);
  } else {
    pScan->pUidList = NULL;
  }
  pScan->pOrgTbInfo = NULL;

  if (pBatchMap) {
    pScan->pBatchTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
    QUERY_CHECK_NULL(pScan->pBatchTbInfo, code, lino, _return, terrno);
    for (int32_t i = 0; i < taosArrayGetSize(pBatchMap); i++) {
      SOrgTbInfo *pSrcInfo = taosArrayGet(pBatchMap, i);
      SOrgTbInfo batchInfo = {0};
      batchInfo.vgId = pSrcInfo->vgId;
      tstrncpy(batchInfo.tbName, pSrcInfo->tbName, TSDB_TABLE_FNAME_LEN);
      batchInfo.colMap = taosArrayDup(pSrcInfo->colMap, NULL);
      QUERY_CHECK_NULL(batchInfo.colMap, code, lino, _return, terrno);
      if (NULL == taosArrayPush(pScan->pBatchTbInfo, &batchInfo)) {
        code = terrno;
        lino = __LINE__;
        /* the copy never made it into the array, so it has to be released here */
        taosArrayDestroy(batchInfo.colMap);
        goto _return;
      }
    }
  } else {
    pScan->pBatchTbInfo = NULL;
  }

  if (pTagList) {
    pScan->pTagList = taosArrayInit(1, sizeof(STagVal));
    QUERY_CHECK_NULL(pScan->pTagList, code, lino, _return, terrno);

    for (int32_t i = 0; i < taosArrayGetSize(pTagList); ++i) {
      STagVal *pSrcTag = (STagVal*)taosArrayGet(pTagList, i);
      STagVal  dstTag = {0};
      dstTag.type = pSrcTag->type;
      dstTag.cid = pSrcTag->cid;
      if (IS_VAR_DATA_TYPE(pSrcTag->type)) {
        dstTag.nData = pSrcTag->nData;
        dstTag.pData = taosMemoryMalloc(dstTag.nData);
        QUERY_CHECK_NULL(dstTag.pData, code, lino, _return, terrno);
        memcpy(dstTag.pData, pSrcTag->pData, dstTag.nData);
      } else {
        dstTag.i64 = pSrcTag->i64;
      }

      if (NULL == taosArrayPush(pScan->pTagList, &dstTag)) {
        code = terrno;
        lino = __LINE__;
        /* the value buffer is not owned by the array yet */
        if (IS_VAR_DATA_TYPE(dstTag.type)) {
          taosMemoryFree(dstTag.pData);
        }
        goto _return;
      }
    }
  } else {
    pScan->pTagList = NULL;
  }

  pScan->dynType = DYN_TYPE_VSTB_BATCH_SCAN;
  pScan->tableSeq = tableSeq;
  pScan->window.skey = window->skey;
  pScan->window.ekey = window->ekey;
  pScan->isNewParam = isNewParam;
  pScan->notifyToProcess = false;
  pScan->notifyTs = 0;
  (*ppRes)->opType = srcOpType;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->value = pScan;
  (*ppRes)->pChildren = NULL;
  (*ppRes)->reUse = false;

  return code;
_return:
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
  taosMemoryFreeClear(*ppRes);
  if (pScan) {
    taosArrayDestroy(pScan->pUidList);
    if (pScan->pBatchTbInfo) {
      /* entries already pushed own a duplicated colMap each */
      for (int32_t i = 0; i < taosArrayGetSize(pScan->pBatchTbInfo); ++i) {
        SOrgTbInfo* pDstInfo = taosArrayGet(pScan->pBatchTbInfo, i);
        if (pDstInfo) {
          taosArrayDestroy(pDstInfo->colMap);
        }
      }
      taosArrayDestroy(pScan->pBatchTbInfo);
    }
    if (pScan->pTagList) {
      /* entries already pushed own their variable-length buffer */
      for (int32_t i = 0; i < taosArrayGetSize(pScan->pTagList); ++i) {
        STagVal* pDstTag = taosArrayGet(pScan->pTagList, i);
        if (pDstTag && IS_VAR_DATA_TYPE(pDstTag->type)) {
          taosMemoryFree(pDstTag->pData);
        }
      }
      taosArrayDestroy(pScan->pTagList);
    }
    taosMemoryFree(pScan);
  }
  return code;
}
1166

1167
/*
1168
 * Build hash-agg operator get-param for dynamic virtual-table aggregation.
1169
 *
1170
 * @param ppRes Output operator param.
1171
 * @param groupid Group id bound to this batch.
1172
 * @param pUidList Source table uid list.
1173
 * @param pBatchMap Batch table metadata map.
1174
 * @param pTagList Optional tag value list.
1175
 * @param tableSeq Whether to keep table-sequential mode.
1176
 * @param window Time window for downstream scan.
1177
 * @param isNewParam Whether downstream should treat this as a new param batch.
1178
 *
1179
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
1180
 */
1181
int32_t buildAggOperatorParam(SOperatorParam** ppRes, uint64_t groupid, SArray* pUidList, SArray* pBatchMap,
1,754,604✔
1182
                              SArray* pTagList, bool tableSeq, STimeWindow* window, bool isNewParam) {
1183
  int32_t                  code = TSDB_CODE_SUCCESS;
1,754,604✔
1184
  int32_t                  lino = 0;
1,754,604✔
1185
  SOperatorParam*          pParam = NULL;
1,754,604✔
1186

1187
  pParam = taosMemoryCalloc(1, sizeof(SOperatorParam));
1,754,604✔
1188
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno);
1,754,604✔
1189

1190
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL;
1,754,604✔
1191
  pParam->downstreamIdx = 0;
1,754,604✔
1192
  pParam->value = NULL;
1,754,604✔
1193
  pParam->pChildren = NULL;
1,754,604✔
1194
  pParam->reUse = false;
1,754,604✔
1195

1196
  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
1,754,604✔
1197
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno);
1,754,604✔
1198

1199
  SOperatorParam* pTableScanParam = NULL;
1,754,604✔
1200
  code = buildTableScanOperatorParamBatchInfo(&pTableScanParam, groupid, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
1,754,604✔
1201
                                              pBatchMap, pTagList, tableSeq, window, isNewParam);
1202
  QUERY_CHECK_CODE(code, lino, _return);
1,754,604✔
1203

1204
  QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pTableScanParam), code, lino, _return, terrno);
3,509,208✔
1205

1206
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
1,754,604✔
1207
  pParam->downstreamIdx = 0;
1,754,604✔
1208
  pParam->value = NULL;
1,754,604✔
1209
  pParam->reUse = false;
1,754,604✔
1210

1211
  *ppRes = pParam;
1,754,604✔
1212
  return code;
1,754,604✔
1213

1214
_return:
×
1215
  freeOperatorParam(pParam, OP_GET_PARAM);
×
1216
  qError("%s failed at %d, failed to build agg scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
1217
  return code;
×
1218
}
1219

1220
/*
1221
 * Build hash-interval operator get-param for dynamic virtual-table interval query.
1222
 *
1223
 * @param ppRes Output operator param.
1224
 * @param groupid Group id bound to this batch.
1225
 * @param pUidList Source table uid list.
1226
 * @param pBatchMap Batch table metadata map.
1227
 * @param pTagList Optional tag value list.
1228
 * @param tableSeq Whether to keep table-sequential mode.
1229
 * @param window Time window for downstream scan.
1230
 * @param isNewParam Whether downstream should treat this as a new param batch.
1231
 *
1232
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
1233
 */
1234
static int32_t buildIntervalOperatorParam(SOperatorParam** ppRes, uint64_t groupid, SArray* pUidList, SArray* pBatchMap,
69,822✔
1235
                                          SArray* pTagList, bool tableSeq, STimeWindow* window, bool isNewParam) {
1236
  int32_t         code = TSDB_CODE_SUCCESS;
69,822✔
1237
  int32_t         lino = 0;
69,822✔
1238
  SOperatorParam* pParam = NULL;
69,822✔
1239

1240
  pParam = taosMemoryCalloc(1, sizeof(SOperatorParam));
69,822✔
1241
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno);
69,822✔
1242

1243
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL;
69,822✔
1244
  pParam->downstreamIdx = 0;
69,822✔
1245
  pParam->value = NULL;
69,822✔
1246
  pParam->reUse = false;
69,822✔
1247
  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
69,822✔
1248
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno);
69,822✔
1249

1250
  SOperatorParam* pTableScanParam = NULL;
69,822✔
1251
  code = buildTableScanOperatorParamBatchInfo(&pTableScanParam, groupid, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
69,822✔
1252
                                              pBatchMap, pTagList, tableSeq, window, isNewParam);
1253
  QUERY_CHECK_CODE(code, lino, _return);
69,822✔
1254

1255
  QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pTableScanParam), code, lino, _return, terrno);
139,644✔
1256

1257
  *ppRes = pParam;
69,822✔
1258
  return code;
69,822✔
1259

1260
_return:
×
1261
  freeOperatorParam(pParam, OP_GET_PARAM);
×
1262
  qError("%s failed at %d, failed to build interval scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
1263
  return code;
×
1264
}
1265

1266
/*
 * Build a tag-scan operator param whose vcUid is the first uid of pUidList.
 *
 * @param ppRes     Output operator param; set to NULL when building fails.
 * @param pUidList  Uid list; its first element is used and must exist.
 * @param srcOpType Operator type stored in the param.
 *
 * @return TSDB_CODE_SUCCESS on success; the allocation error, or the lookup
 *         error (TSDB_CODE_INVALID_PARA if none was reported) when pUidList
 *         has no first element.
 */
int32_t buildTagScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  STagScanOperatorParam*   pScan = NULL;
  tb_uid_t*                pUid = NULL;

  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);

  pScan = taosMemoryMalloc(sizeof(STagScanOperatorParam));
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);

  /* an empty or missing uid list must not be dereferenced */
  pUid = taosArrayGet(pUidList, 0);
  if (NULL == pUid) {
    code = (TSDB_CODE_SUCCESS != terrno) ? terrno : TSDB_CODE_INVALID_PARA;
    lino = __LINE__;
    goto _return;
  }
  pScan->vcUid = *pUid;

  (*ppRes)->opType = srcOpType;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->value = pScan;
  (*ppRes)->pChildren = NULL;
  (*ppRes)->reUse = false;

  return code;
_return:
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
  taosMemoryFreeClear(*ppRes);
  if (pScan) {
    taosMemoryFree(pScan);
  }
  return code;
}
1293

1294
/*
 * Read the calculation time range of the current trigger window
 * (the entry at pRuntimeInfo->curIdx of pStreamPesudoFuncVals).
 *
 * @param pRuntimeInfo Stream runtime info holding trigger type and calc params.
 * @param pTimeRange   Output time range.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_INTERNAL_ERROR when an argument is
 *         NULL or the current calc param cannot be fetched.
 */
static int32_t getCurrentWinCalcTimeRange(SStreamRuntimeFuncInfo* pRuntimeInfo, STimeWindow* pTimeRange) {
  if (NULL == pRuntimeInfo || NULL == pTimeRange) {
    return TSDB_CODE_INTERNAL_ERROR;
  }

  SSTriggerCalcParam* pCalcParam = taosArrayGet(pRuntimeInfo->pStreamPesudoFuncVals, pRuntimeInfo->curIdx);
  if (NULL == pCalcParam) {
    return TSDB_CODE_INTERNAL_ERROR;
  }

  if (STREAM_TRIGGER_PERIOD == pRuntimeInfo->triggerType) {
    pTimeRange->skey = pCalcParam->prevLocalTime;
    pTimeRange->ekey = pCalcParam->triggerTime;
  } else {
    // STREAM_TRIGGER_SLIDING and every other trigger type use wstart/wend.
    // For sliding triggers it is not possible to tell whether an interval exists; per the
    // original note, wstart/wend share their storage (union) with prevTs/currentTs.
    pTimeRange->skey = pCalcParam->wstart;
    pTimeRange->ekey = pCalcParam->wend;
  }

  return TSDB_CODE_SUCCESS;
}
1323

1324
void clearVtbScanDataInfo(void* pItem) {
51,908,156✔
1325
  SSourceDataInfo *pInfo = (SSourceDataInfo *)pItem;
51,908,156✔
1326
  if (pInfo->orgTbInfo) {
51,908,156✔
1327
    taosArrayDestroy(pInfo->orgTbInfo->colMap);
39,046,979✔
1328
    taosMemoryFreeClear(pInfo->orgTbInfo);
39,046,979✔
1329
  }
1330
  if (pInfo->batchOrgTbInfo) {
51,908,156✔
1331
    for (int32_t i = 0; i < taosArrayGetSize(pInfo->batchOrgTbInfo); ++i) {
7,841,608✔
1332
      SOrgTbInfo* pColMap = taosArrayGet(pInfo->batchOrgTbInfo, i);
5,132,126✔
1333
      if (pColMap) {
5,132,126✔
1334
        taosArrayDestroy(pColMap->colMap);
5,132,126✔
1335
      }
1336
    }
1337
    taosArrayDestroy(pInfo->batchOrgTbInfo);
2,709,482✔
1338
    pInfo->batchOrgTbInfo = NULL;
2,709,482✔
1339
  }
1340
  if (pInfo->tagList) {
51,908,156✔
1341
    taosArrayDestroyEx(pInfo->tagList, destroyTagVal);
879,844✔
1342
    pInfo->tagList = NULL;
879,844✔
1343
  }
1344
  if (pInfo->pSrcUidList) {
51,908,156✔
1345
    taosArrayDestroy(pInfo->pSrcUidList);
41,756,461✔
1346
    pInfo->pSrcUidList = NULL;
41,756,461✔
1347
  }
1348
}
51,908,156✔
1349

1350
/**
  @brief send the fetch request for the source at sourceIndex.

  Nothing is sent (and TSDB_CODE_SUCCESS is returned) unless the source is in
  EX_SOURCE_DATA_NOT_READY status. Otherwise the source is marked as
  EX_SOURCE_DATA_STARTED and:
    - for a local-exec source, the local fetch function is invoked and its
      result is passed directly to loadRemoteDataCallback;
    - for any other source, a SResFetchReq (with an optional operator param
      built according to pDataInfo->type) is serialized and sent through
      asyncSendMsgToServer, and the transporter id is stored in
      pFetchRpcHandles[sourceIndex].

  @param pExchangeInfo exchange operator info
  @param pTaskInfo     task info of the exchange operator
  @param sourceIndex   index into pSourceDataInfo / pFetchRpcHandles
  @return TSDB_CODE_SUCCESS or an error code
*/
int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
  int32_t                 code = TSDB_CODE_SUCCESS;
  int32_t                 lino = 0;
  // Must be initialized before the first "goto _end": the cleanup code at
  // _end reads it, and the early NULL checks below jump there before the
  // wrapper is allocated.
  SFetchRspHandleWrapper* pWrapper = NULL;
  SSourceDataInfo*        pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
  QUERY_CHECK_NULL(pDataInfo, code, lino, _end, terrno);

  if (EX_SOURCE_DATA_NOT_READY != pDataInfo->status) {
    return TSDB_CODE_SUCCESS;
  }

  pDataInfo->status = EX_SOURCE_DATA_STARTED;
  SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
  QUERY_CHECK_NULL(pSource, code, lino, _end, terrno);

  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);

  pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
  QUERY_CHECK_NULL(pWrapper, code, lino, _end, terrno);
  pWrapper->exchangeId = pExchangeInfo->self;
  pWrapper->sourceIndex = sourceIndex;
  pWrapper->seqId = pExchangeInfo->seqId;

  if (pSource->localExec) {
    SDataBuf pBuf = {0};
    code =
      (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->sId,
                                  pTaskInfo->id.queryId, pSource->clientId,
                                  pSource->taskId, 0, pSource->execId,
                                  &pBuf.pData,
                                  pTaskInfo->localFetch.explainRes);
    QUERY_CHECK_CODE(code, lino, _end);
    pDataInfo->startTime = taosGetTimestampUs();
    code = loadRemoteDataCallback(pWrapper, &pBuf, code);
    // the wrapper is only needed for the callback invocation above
    taosMemoryFreeClear(pWrapper);
    QUERY_CHECK_CODE(code, lino, _end);
  } else {
    bool needStreamRtInfo = true;
    bool needStreamGrpInfo = false;
    SResFetchReq req = {0};
    req.header.vgId = pSource->addr.nodeId;
    req.sId = pSource->sId;
    req.clientId = pSource->clientId;
    req.taskId = pSource->taskId;
    req.queryId = pTaskInfo->id.queryId;
    req.execId = pSource->execId;
    if (pTaskInfo->pStreamRuntimeInfo) {
      req.dynTbname = pExchangeInfo->dynTbname;
      req.execId = pTaskInfo->pStreamRuntimeInfo->execId;
      req.pStRtFuncInfo = &pTaskInfo->pStreamRuntimeInfo->funcInfo;

      if (pSource->fetchMsgType == TDMT_STREAM_FETCH_FROM_RUNNER) {
        qDebug("%s stream fetch from runner, execId:%d, %p", GET_TASKID(pTaskInfo), req.execId, pTaskInfo->pStreamRuntimeInfo);
      } else if (pSource->fetchMsgType == TDMT_STREAM_FETCH_FROM_CACHE) {
        code = getCurrentWinCalcTimeRange(req.pStRtFuncInfo, &req.pStRtFuncInfo->curWindow);
        QUERY_CHECK_CODE(code, lino, _end);
        needStreamRtInfo = false;
        qDebug("%s stream fetch from cache, execId:%d, curWinIdx:%d, time range:[%" PRId64 ", %" PRId64 "]",
               GET_TASKID(pTaskInfo), req.execId, req.pStRtFuncInfo->curIdx, req.pStRtFuncInfo->curWindow.skey,
               req.pStRtFuncInfo->curWindow.ekey);
      } else {
        needStreamGrpInfo = true;
      }

      // the first fetch sent for this source carries the reset flag
      if (!pDataInfo->fetchSent) {
        req.reset = pDataInfo->fetchSent = true;
      }
    }

    // Build the operator param according to the source type. The inputs
    // stored in pDataInfo are released right after they have been consumed.
    switch (pDataInfo->type) {
      case EX_SRC_TYPE_VSTB_SCAN: {
        code = buildTableScanOperatorParamEx(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->orgTbInfo, pDataInfo->tableSeq, &pDataInfo->window, pDataInfo->isNewParam, DYN_TYPE_VSTB_SINGLE_SCAN);
        clearVtbScanDataInfo(pDataInfo);
        QUERY_CHECK_CODE(code, lino, _end);
        break;
      }
      case EX_SRC_TYPE_VTB_WIN_SCAN: {
        if (pDataInfo->pSrcUidList) {
          code = buildTableScanOperatorParamEx(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, NULL, pDataInfo->tableSeq, &pDataInfo->window, false, DYN_TYPE_VSTB_WIN_SCAN);
          taosArrayDestroy(pDataInfo->pSrcUidList);
          pDataInfo->pSrcUidList = NULL;
          QUERY_CHECK_CODE(code, lino, _end);
        }
        break;
      }
      case EX_SRC_TYPE_VSTB_TAG_SCAN: {
        code = buildTagScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType);
        taosArrayDestroy(pDataInfo->pSrcUidList);
        pDataInfo->pSrcUidList = NULL;
        QUERY_CHECK_CODE(code, lino, _end);
        break;
      }
      case EX_SRC_TYPE_VSTB_WIN_SCAN:
      case EX_SRC_TYPE_VSTB_INTERVAL_SCAN:
      case EX_SRC_TYPE_VSTB_TS_SCAN: {
        if (pDataInfo->batchOrgTbInfo) {
          int32_t srcOpType =
              (pDataInfo->type == EX_SRC_TYPE_VSTB_TS_SCAN)
                  ? QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN
                  : QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
          code = buildTableScanOperatorParamBatchInfo(
              &req.pOpParam, pDataInfo->groupid, pDataInfo->pSrcUidList, srcOpType,
              pDataInfo->batchOrgTbInfo, pDataInfo->tagList, pDataInfo->tableSeq, &pDataInfo->window,
              pDataInfo->isNewParam);
          clearVtbScanDataInfo(pDataInfo);
          QUERY_CHECK_CODE(code, lino, _end);
        }
        break;
      }
      case EX_SRC_TYPE_VSTB_PART_INTERVAL_SCAN: {
        if (pDataInfo->batchOrgTbInfo) {
          code = buildIntervalOperatorParam(&req.pOpParam, pDataInfo->groupid, pDataInfo->pSrcUidList,
                                            pDataInfo->batchOrgTbInfo, pDataInfo->tagList, pDataInfo->tableSeq,
                                            &pDataInfo->window, pDataInfo->isNewParam);
          clearVtbScanDataInfo(pDataInfo);
          QUERY_CHECK_CODE(code, lino, _end);
        }
        break;
      }
      case EX_SRC_TYPE_VSTB_AGG_SCAN: {
        if (pDataInfo->batchOrgTbInfo) {
          code = buildAggOperatorParam(&req.pOpParam, pDataInfo->groupid, pDataInfo->pSrcUidList,
                                       pDataInfo->batchOrgTbInfo, pDataInfo->tagList,
                                       pDataInfo->tableSeq, &pDataInfo->window, pDataInfo->isNewParam);
          clearVtbScanDataInfo(pDataInfo);
          QUERY_CHECK_CODE(code, lino, _end);
        }
        break;
      }
      case EX_SRC_TYPE_STB_JOIN_SCAN:
      default: {
        if (pDataInfo->pSrcUidList) {
          code = buildTableScanOperatorParam(&req.pOpParam,
                                             pDataInfo->pSrcUidList,
                                             pDataInfo->srcOpType,
                                             pDataInfo->tableSeq);
          /* source uid list can be reused in vnode size, so only use once */
          taosArrayDestroy(pDataInfo->pSrcUidList);
          pDataInfo->pSrcUidList = NULL;
          QUERY_CHECK_CODE(code, lino, _end);
        }
        if (pExchangeInfo->notifyToSend) {
          if (NULL == req.pOpParam) {
            code = buildTableScanOperatorParamNotify(&req.pOpParam,
                                                     pDataInfo->srcOpType,
                                                     pExchangeInfo->notifyTs);
            QUERY_CHECK_CODE(code, lino, _end);
          } else {
            /**
              Currently don't support use the same param for multiple times!
            */
            qError("%s, %s failed, currently don't support use the same param "
                   "for multiple times!", GET_TASKID(pTaskInfo), __func__);
            pTaskInfo->code = TSDB_CODE_INVALID_PARA;
            taosMemoryFree(pWrapper);
            // release the already built param, same as the other error returns below
            freeOperatorParam(req.pOpParam, OP_GET_PARAM);
            return pTaskInfo->code;
          }
          pExchangeInfo->notifyToSend = false;
        }
        break;
      }
    }

    // first pass computes the required buffer size only
    int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req, needStreamRtInfo, needStreamGrpInfo);
    if (msgSize < 0) {
      pTaskInfo->code = msgSize;
      taosMemoryFree(pWrapper);
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
      return pTaskInfo->code;
    }

    void* msg = taosMemoryCalloc(1, msgSize);
    if (NULL == msg) {
      pTaskInfo->code = terrno;
      taosMemoryFree(pWrapper);
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
      return pTaskInfo->code;
    }

    msgSize = tSerializeSResFetchReq(msg, msgSize, &req, needStreamRtInfo, needStreamGrpInfo);
    if (msgSize < 0) {
      pTaskInfo->code = msgSize;
      taosMemoryFree(pWrapper);
      taosMemoryFree(msg);
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
      return pTaskInfo->code;
    }

    // the param has been serialized into msg and is not needed any more
    freeOperatorParam(req.pOpParam, OP_GET_PARAM);

    qDebug("%s build fetch msg and send to vgId:%d, ep:%s, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
           ", seqId:%" PRId64 ", execId:%d, %p, %d/%" PRIzu,
           GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->clientId,
           pSource->taskId, pExchangeInfo->seqId, pSource->execId, pExchangeInfo, sourceIndex, totalSources);

    // send the fetch remote task result request
    SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
    if (NULL == pMsgSendInfo) {
      // capture terrno before any other call gets a chance to overwrite it
      pTaskInfo->code = terrno;
      taosMemoryFreeClear(msg);
      taosMemoryFree(pWrapper);
      qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
      return pTaskInfo->code;
    }

    pMsgSendInfo->param = pWrapper;
    pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
    pMsgSendInfo->msgInfo.pData = msg;
    pMsgSendInfo->msgInfo.len = msgSize;
    pMsgSendInfo->msgType = pSource->fetchMsgType;
    pMsgSendInfo->fp = loadRemoteDataCallback;
    pMsgSendInfo->requestId = pTaskInfo->id.queryId;

    int64_t transporterId = 0;
    pDataInfo->startTime = taosGetTimestampUs();
    code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
    // NOTE(review): pWrapper has been handed to pMsgSendInfo (param/paramFreeFp)
    // at this point, yet a failure here still frees it at _end. Confirm that
    // asyncSendMsgToServer does not release pMsgSendInfo->param on failure,
    // otherwise this is a double free.
    QUERY_CHECK_CODE(code, lino, _end);
    int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, sourceIndex);
    *pRpcHandle = transporterId;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    if (pWrapper) {
      taosMemoryFree(pWrapper);
    }
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1580

1581
/**
1582
  @brief record the data loading metrics of the exchange operator, including
1583
  the number of rows, the data length, and the elapsed time of current load operation.
1584
*/
1585
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows,
189,130,338✔
1586
                          int32_t dataLen, int64_t startTs, SOperatorInfo* pOperator) {
1587
  pInfo->totalRows += numOfRows;
189,130,338✔
1588
  pInfo->totalSize += dataLen;
189,130,338✔
1589
  pInfo->totalElapsed += (taosGetTimestampUs() - startTs);
189,129,779✔
1590
}
189,130,338✔
1591

1592
/**
  @brief decode the encoded block data of a fetch response into pRes.

  When pColList is NULL the data is decoded directly into pRes. Otherwise
  the data starts with a column count followed by a SSysTableSchema array
  (converted in place inside pData); it is decoded into a temporary block
  and the columns are relocated into pRes according to pColList.

  @param pRes       result data block
  @param pData      start of the encoded data
  @param pColList   column list used for relocation, NULL to decode directly
  @param pNextStart receives the decode end position in the direct decode path
  @param isVstbScan true to decode pData into pRes before the regular handling
  @return TSDB_CODE_SUCCESS or an error code
*/
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart, bool isVstbScan) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SSDataBlock* pTmpBlock = NULL;

  if (isVstbScan) {
    // NOTE(review): when pColList is NULL as well, the same payload is
    // decoded a second time by the branch below - confirm this is intended.
    blockDataCleanup(pRes);
    code = blockDecodeInternal(pRes, pData, (const char**)pNextStart);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  if (NULL == pColList) {
    // data from other sources
    blockDataCleanup(pRes);
    code = blockDecodeInternal(pRes, pData, (const char**)pNextStart);
    QUERY_CHECK_CODE(code, lino, _end);
  } else {
    // extract data according to pColList
    char* pCursor = pData;

    int32_t numOfCols = htonl(*(int32_t*)pCursor);
    pCursor += sizeof(int32_t);

    // todo refactor:extract method
    // convert the byte order of the schema entries in place and skip them
    SSysTableSchema* pSchema = (SSysTableSchema*)pCursor;
    for (int32_t col = 0; col < numOfCols; ++col) {
      SSysTableSchema* pEntry = (SSysTableSchema*)pCursor;

      pEntry->colId = htons(pEntry->colId);
      pEntry->bytes = htonl(pEntry->bytes);
      pCursor += sizeof(SSysTableSchema);
    }

    pTmpBlock = NULL;
    code = createDataBlock(&pTmpBlock);
    QUERY_CHECK_CODE(code, lino, _end);

    for (int32_t col = 0; col < numOfCols; ++col) {
      SColumnInfoData colInfo = createColumnInfoData(pSchema[col].type, pSchema[col].bytes, pSchema[col].colId);
      code = blockDataAppendColInfo(pTmpBlock, &colInfo);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    code = blockDecodeInternal(pTmpBlock, pCursor, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    code = blockDataEnsureCapacity(pRes, pTmpBlock->info.rows);
    QUERY_CHECK_CODE(code, lino, _end);

    // data from mnode
    pRes->info.dataLoad = 1;
    pRes->info.rows = pTmpBlock->info.rows;
    pRes->info.scanFlag = pTmpBlock->info.scanFlag;
    pRes->info.id.groupId = pTmpBlock->info.id.groupId;
    pRes->info.id.baseGId = pTmpBlock->info.id.baseGId;
    code = relocateColumnData(pRes, pColList, pTmpBlock->pDataBlock, false);
    QUERY_CHECK_CODE(code, lino, _end);

    blockDataDestroy(pTmpBlock);
    pTmpBlock = NULL;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    // the temporary block is only alive here when a step above failed
    blockDataDestroy(pTmpBlock);
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1657

1658
/**
  @brief log the accumulated load statistics of the exchange operator and
  mark the operator as completed.

  @param pOperator the exchange operator
*/
void setAllSourcesCompleted(SOperatorInfo* pOperator) {
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
  SExchangeInfo*       pInfo = pOperator->info;
  SLoadRemoteDataInfo* pStat = &pInfo->loadInfo;
  size_t               numOfSources = taosArrayGetSize(pInfo->pSources);

  qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 ", %.2f Kb, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), numOfSources, pStat->totalRows, pStat->totalSize / 1024.0,
         pStat->totalElapsed / 1000.0);

  setOperatorCompleted(pOperator);
}
98,841,179✔
1670

1671
/**
  @brief count the sources whose status is EX_SOURCE_DATA_EXHAUSTED.

  @param pArray array of SSourceDataInfo
  @param pRes   receives the number of exhausted sources; it is left
                untouched when an error is returned
  @return TSDB_CODE_SUCCESS or an error code
*/
int32_t getCompletedSources(const SArray* pArray, int32_t* pRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  size_t  numOfSources = taosArrayGetSize(pArray);
  int32_t numOfExhausted = 0;

  for (int32_t i = 0; i < numOfSources; ++i) {
    SSourceDataInfo* pSrc = taosArrayGet(pArray, i);
    QUERY_CHECK_NULL(pSrc, code, lino, _end, terrno);
    if (EX_SOURCE_DATA_EXHAUSTED != pSrc->status) {
      continue;
    }

    qDebug("source %d is completed, info:%p %p", i, pArray, pSrc);
    ++numOfExhausted;
  }

  *pRes = numOfExhausted;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1693

1694
/**
  @brief send the fetch requests of all sources one after another without
  waiting for the responses, then switch the operator to OP_RES_TO_RETURN.
  Long-jumps out through pTaskInfo->env when the task has been killed.

  @param pOperator the exchange operator
  @return TSDB_CODE_SUCCESS, or the error code of the first failed send
          (also stored in pTaskInfo->code)
*/
int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SExchangeInfo* pInfo = pOperator->info;

  size_t  numOfSources = taosArrayGetSize(pInfo->pSourceDataInfo);
  int64_t beginUs = taosGetTimestampUs();

  // Asynchronously send all fetch requests to all sources.
  for (int32_t idx = 0; idx < numOfSources; ++idx) {
    int32_t sendCode = doSendFetchDataRequest(pInfo, pTaskInfo, idx);
    if (TSDB_CODE_SUCCESS != sendCode) {
      pTaskInfo->code = sendCode;
      return sendCode;
    }
  }

  int64_t finishUs = taosGetTimestampUs();
  qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTaskInfo),
         numOfSources, (finishUs - beginUs) / 1000.0);

  pOperator->status = OP_RES_TO_RETURN;
  if (isTaskKilled(pTaskInfo)) {
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
  }

  return TSDB_CODE_SUCCESS;
}
1721

1722
/**
1723
  @brief store STEP DONE notification info
1724
*/
1725
void storeNotifyInfo(SOperatorInfo* pOperator) {
4,464,492✔
1726
  SExchangeInfo*  pExchangeInfo = pOperator->info;
4,464,492✔
1727
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
4,464,492✔
1728
  SOperatorParam* pGetParam = pOperator->pOperatorGetParam;
4,464,492✔
1729

1730
  SExchangeOperatorParam* pParam = (SExchangeOperatorParam*)pGetParam->value;
4,464,492✔
1731
  if (!pParam->multiParams) {
4,464,492✔
1732
    SExchangeOperatorBasicParam* pBasic = &pParam->basic;
4,464,492✔
1733
    if (pBasic->paramType != NOTIFY_TYPE_EXCHANGE_PARAM) {
4,464,492✔
1734
      qWarn("%s, %s found invalid exchange operator param type %d",
×
1735
             GET_TASKID(pTaskInfo), __func__, pBasic->paramType);
1736
      return;
×
1737
    }
1738

1739
    pExchangeInfo->notifyToSend = true;
4,464,492✔
1740
    pExchangeInfo->notifyTs = pBasic->notifyTs;
4,464,492✔
1741
  } else {
1742
    qWarn("%s, %s found multi params are not supported for notify msg",
×
1743
           GET_TASKID(pTaskInfo), __func__);
1744
  }
1745
}
1746

1747
/**
  @brief decode every block carried by pDataInfo->pRsp and append the
  resulting data blocks to pExchangeInfo->pResultBlockList.

  Each block in the response is prefixed with two int32_t values, the
  compressed length and the raw length. For a compressed response, a block
  whose compressed length is smaller than its raw length is decompressed
  into pDataInfo->decompBuf before being decoded.

  @param pExchangeInfo exchange operator info
  @param pDataInfo     source data info holding the response
  @return TSDB_CODE_SUCCESS or an error code; pDataInfo->pRsp is released
          when decoding a block fails
*/
int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo) {
  int32_t            code = TSDB_CODE_SUCCESS;
  int32_t            lino = 0;
  SSDataBlock*       pResBlock = NULL;
  SRetrieveTableRsp* pRsp = pDataInfo->pRsp;

  char* pNext = pRsp->data;
  char* pCur = pNext;

  int32_t blockNo = 0;

  if (pRsp->compressed) {
    // make sure the decompress buffer can hold payloadLen bytes
    if (NULL == pDataInfo->decompBuf) {
      pDataInfo->decompBuf = taosMemoryMalloc(pRsp->payloadLen);
      QUERY_CHECK_NULL(pDataInfo->decompBuf, code, lino, _end, terrno);
      pDataInfo->decompBufSize = pRsp->payloadLen;
    } else if (pDataInfo->decompBufSize < pRsp->payloadLen) {
      char* pGrown = taosMemoryRealloc(pDataInfo->decompBuf, pRsp->payloadLen);
      QUERY_CHECK_NULL(pGrown, code, lino, _end, terrno);
      if (pGrown != NULL) {
        pDataInfo->decompBuf = pGrown;
        pDataInfo->decompBufSize = pRsp->payloadLen;
      }
    }
  }

  while (blockNo++ < pRsp->numOfBlocks) {
    pCur = pNext;

    // pick the destination block: a fresh empty one for vstb scan, a
    // recycled one when available, otherwise a copy of the dummy block
    if (EX_SRC_TYPE_VSTB_SCAN == pDataInfo->type) {
      pResBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
      QUERY_CHECK_NULL(pResBlock, code, lino, _end, terrno);
    } else if (taosArrayGetSize(pExchangeInfo->pRecycledBlocks) > 0) {
      pResBlock = *(SSDataBlock**)taosArrayPop(pExchangeInfo->pRecycledBlocks);
      blockDataCleanup(pResBlock);
    } else {
      code = createOneDataBlock(pExchangeInfo->pDummyBlock, false, &pResBlock);
      QUERY_CHECK_NULL(pResBlock, code, lino, _end, code);
    }

    // block header: compressed length followed by raw length
    int32_t compLen = *(int32_t*)pCur;
    pCur += sizeof(int32_t);

    int32_t rawLen = *(int32_t*)pCur;
    pCur += sizeof(int32_t);
    QUERY_CHECK_CONDITION((compLen <= rawLen && compLen != 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

    pNext = pCur + compLen;
    if (pRsp->compressed && (compLen < rawLen)) {
      int32_t decompLen = tsDecompressString(pCur, compLen, 1, pDataInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
      QUERY_CHECK_CONDITION((decompLen == rawLen), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
      pCur = pDataInfo->decompBuf;
    }

    code = extractDataBlockFromFetchRsp(pResBlock, pCur, NULL, &pCur, (pDataInfo->type == EX_SRC_TYPE_VSTB_SCAN));
    if (code != 0) {
      taosMemoryFreeClear(pDataInfo->pRsp);
      goto _end;
    }

    void* pPushed = taosArrayPush(pExchangeInfo->pResultBlockList, &pResBlock);
    QUERY_CHECK_NULL(pPushed, code, lino, _end, terrno);
    qDebug("%dth block added to resultBlockList, rows:%" PRId64, blockNo, pResBlock->info.rows);
    // the block is referenced by the result list now
    pResBlock = NULL;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    // destroy the block that has not been added to the result list
    blockDataDestroy(pResBlock);
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1822

1823
/**
  @brief load data from the sources one by one, starting at
  pExchangeInfo->current.

  For the current source a fetch request is sent and the response is waited
  for. An empty response marks the source as exhausted and moves on to the
  next source (or past the last one for vstb scan / vstb tag scan sources).
  A non-empty response is decoded into pResultBlockList and the function
  returns. When all sources are consumed the operator is set to completed.

  When dynTbname is set, only the source whose vgId matches the vgId of the
  tbname entry in pStreamPartColVals is fetched, the others are skipped.

  Failures to get the source / source data info, to send the request or to
  wait for the response long-jump out through pTaskInfo->env.

  @param pOperator the exchange operator
  @return TSDB_CODE_SUCCESS or an error code (also stored in pTaskInfo->code)
*/
int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
  SExchangeInfo* pExchangeInfo = pOperator->info;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  int32_t code = 0;
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);
  int64_t startTs = taosGetTimestampUs();

  // vgId of the only source to fetch from, 0 means no restriction
  int32_t vgId = 0;
  if (pExchangeInfo->dynTbname) {
    SArray* vals = pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPartColVals;
    for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
      SStreamGroupValue* pValue = taosArrayGet(vals, i);
      if (pValue != NULL && pValue->isTbname) {
        vgId = pValue->vgId;
        break;
      }
    }
  }

  while (1) {
    if (pExchangeInfo->current >= totalSources) {
      setAllSourcesCompleted(pOperator);
      return TSDB_CODE_SUCCESS;
    }

    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
    if (!pSource) {
      // the failure reason is in terrno, code has not been assigned here
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      pTaskInfo->code = terrno;
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }

    if (vgId != 0 && pSource->addr.nodeId != vgId){
      pExchangeInfo->current += 1;
      continue;
    }

    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
    if (!pDataInfo) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      pTaskInfo->code = terrno;
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }
    // doSendFetchDataRequest only sends for sources in NOT_READY status
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;

    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      pTaskInfo->code = code;
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }

    // wait until the response of the current request sequence arrives,
    // responses carrying another seqId are dropped
    while (true) {
      recordOpExecBeforeDownstream(pOperator);
      code = exchangeWait(pOperator, pExchangeInfo);
      recordOpExecAfterDownstream(pOperator, 0);

      if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
      }

      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
      if (pDataInfo->seqId != currSeqId) {
        qDebug("seq rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pDataInfo->seqId, pExchangeInfo, currSeqId);
        taosMemoryFreeClear(pDataInfo->pRsp);
        continue;
      }

      break;
    }

    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
             tstrerror(pDataInfo->code));
      pOperator->pTaskInfo->code = pDataInfo->code;
      return pOperator->pTaskInfo->code;
    }

    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;

    if (pRsp->numOfRows == 0) {
      qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
             " execId:%d %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", pDataInfo,
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);

      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
      if (isVstbScan(pDataInfo) || isVstbTagScan(pDataInfo)) {
        // no further source is visited for these scan types
        pExchangeInfo->current = totalSources;
      } else {
        pExchangeInfo->current += 1;
      }
      taosMemoryFreeClear(pDataInfo->pRsp);
      continue;
    }

    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }

    if (pRsp->completed == 1) {
      qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, pDataInfo,
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
             pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
             pExchangeInfo->current + 1, totalSources);

      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
      if (isVstbScan(pDataInfo)) {
        pExchangeInfo->current = totalSources;
      } else {
        pExchangeInfo->current += 1;
      }
    } else {
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
             pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
    }

    updateLoadRemoteInfo(pLoadInfo, pRsp->numOfRows, pRsp->compLen, startTs, pOperator);
    pDataInfo->totalRows += pRsp->numOfRows;
    taosMemoryFreeClear(pDataInfo->pRsp);

    // pDataInfo is an element of pSourceDataInfo, so the array is cleared
    // only after the last access through pDataInfo
    if (pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData) {
      taosArrayClear(pExchangeInfo->pSourceDataInfo);
    }
    return TSDB_CODE_SUCCESS;
  }

_error:
  pTaskInfo->code = code;
  return code;
}
1961

1962
static int32_t loadTagListFromBasicParam(SSourceDataInfo* pDataInfo, SExchangeOperatorBasicParam* pBasicParam) {
2,709,482✔
1963
  int32_t  code = TSDB_CODE_SUCCESS;
2,709,482✔
1964
  int32_t  lino = 0;
2,709,482✔
1965
  STagVal  dstTag;
2,709,482✔
1966
  bool     needFree = false;
2,709,482✔
1967

1968
  if (pBasicParam->paramType != DYN_TYPE_EXCHANGE_PARAM) {
2,709,482✔
1969
    qError("%s failed since invalid exchange operator param type %d",
×
1970
      __func__, pBasicParam->paramType);
1971
    return TSDB_CODE_INVALID_PARA;
×
1972
  }
1973

1974
  if (pDataInfo->tagList) {
2,709,482✔
1975
    taosArrayClear(pDataInfo->tagList);
×
1976
  }
1977

1978
  if (pBasicParam->tagList) {
2,709,482✔
1979
    pDataInfo->tagList = taosArrayInit(1, sizeof(STagVal));
879,844✔
1980
    QUERY_CHECK_NULL(pDataInfo->tagList, code, lino, _return, terrno);
879,844✔
1981

1982
    for (int32_t i = 0; i < taosArrayGetSize(pBasicParam->tagList); ++i) {
5,451,504✔
1983
      STagVal *pSrcTag = (STagVal*)taosArrayGet(pBasicParam->tagList, i);
4,571,660✔
1984
      QUERY_CHECK_NULL(pSrcTag, code, lino, _return, terrno);
4,571,660✔
1985

1986
      dstTag = (STagVal){0};
4,571,660✔
1987
      dstTag.type = pSrcTag->type;
4,571,660✔
1988
      dstTag.cid = pSrcTag->cid;
4,571,660✔
1989
      if (IS_VAR_DATA_TYPE(pSrcTag->type)) {
4,571,660✔
1990
        dstTag.nData = pSrcTag->nData;
1,984,764✔
1991
        dstTag.pData = taosMemoryMalloc(dstTag.nData);
1,984,764✔
1992
        QUERY_CHECK_NULL(dstTag.pData, code, lino, _return, terrno);
1,984,764✔
1993
        needFree = true;
1,984,764✔
1994
        memcpy(dstTag.pData, pSrcTag->pData, dstTag.nData);
1,984,764✔
1995
      } else {
1996
        dstTag.i64 = pSrcTag->i64;
2,586,896✔
1997
      }
1998

1999
      QUERY_CHECK_NULL(taosArrayPush(pDataInfo->tagList, &dstTag), code, lino, _return, terrno);
9,143,320✔
2000
      needFree = false;
4,571,660✔
2001
    }
2002
  } else {
2003
    pDataInfo->tagList = NULL;
1,829,638✔
2004
  }
2005

2006
  return code;
2,709,482✔
2007
_return:
×
2008
  if (needFree) {
×
2009
    taosMemoryFreeClear(dstTag.pData);
×
2010
  }
2011
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2012
  return code;
×
2013
}
2014

2015
int32_t loadBatchColMapFromBasicParam(SSourceDataInfo* pDataInfo, SExchangeOperatorBasicParam* pBasicParam) {
2,709,482✔
2016
  int32_t     code = TSDB_CODE_SUCCESS;
2,709,482✔
2017
  int32_t     lino = 0;
2,709,482✔
2018
  SOrgTbInfo  dstOrgTbInfo = {0};
2,709,482✔
2019
  bool        needFree = false;
2,709,482✔
2020

2021
  if (pBasicParam->paramType != DYN_TYPE_EXCHANGE_PARAM) {
2,709,482✔
2022
    qError("%s failed since invalid exchange operator param type %d",
×
2023
      __func__, pBasicParam->paramType);
2024
    return TSDB_CODE_INVALID_PARA;
×
2025
  }
2026

2027
  if (pBasicParam->batchOrgTbInfo) {
2,709,482✔
2028
    pDataInfo->batchOrgTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
2,709,482✔
2029
    QUERY_CHECK_NULL(pDataInfo->batchOrgTbInfo, code, lino, _return, terrno);
2,709,482✔
2030

2031
    for (int32_t i = 0; i < taosArrayGetSize(pBasicParam->batchOrgTbInfo); ++i) {
7,841,608✔
2032
      SOrgTbInfo* pSrcOrgTbInfo = taosArrayGet(pBasicParam->batchOrgTbInfo, i);
5,132,126✔
2033
      QUERY_CHECK_NULL(pSrcOrgTbInfo, code, lino, _return, terrno);
5,132,126✔
2034

2035
      dstOrgTbInfo = (SOrgTbInfo){0};
5,132,126✔
2036
      dstOrgTbInfo.vgId = pSrcOrgTbInfo->vgId;
5,132,126✔
2037
      tstrncpy(dstOrgTbInfo.tbName, pSrcOrgTbInfo->tbName, TSDB_TABLE_FNAME_LEN);
5,132,126✔
2038

2039
      dstOrgTbInfo.colMap = taosArrayDup(pSrcOrgTbInfo->colMap, NULL);
5,132,126✔
2040
      QUERY_CHECK_NULL(dstOrgTbInfo.colMap, code, lino, _return, terrno);
5,132,126✔
2041

2042
      needFree = true;
5,132,126✔
2043
      QUERY_CHECK_NULL(taosArrayPush(pDataInfo->batchOrgTbInfo, &dstOrgTbInfo), code, lino, _return, terrno);
10,264,252✔
2044
      needFree = false;
5,132,126✔
2045
    }
2046
  } else {
2047
    pBasicParam->batchOrgTbInfo = NULL;
×
2048
  }
2049

2050
  return code;
2,709,482✔
2051
_return:
×
2052
  if (needFree) {
×
2053
    taosArrayDestroy(dstOrgTbInfo.colMap);
×
2054
  }
2055
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2056
  return code;
×
2057
}
2058

2059
int32_t addSingleExchangeSource(SOperatorInfo* pOperator,
46,814,573✔
2060
                                SExchangeOperatorBasicParam* pBasicParam) {
2061
  if (pBasicParam->paramType != DYN_TYPE_EXCHANGE_PARAM) {
46,814,573✔
2062
    qWarn("%s, %s found invalid exchange operator param type %d",
×
2063
      GET_TASKID(pOperator->pTaskInfo), __func__, pBasicParam->paramType);
2064
    return TSDB_CODE_SUCCESS;
×
2065
  }
2066

2067
  int32_t            code = TSDB_CODE_SUCCESS;
46,814,573✔
2068
  int32_t            lino = 0;
46,814,573✔
2069
  SExchangeInfo*     pExchangeInfo = pOperator->info;
46,814,573✔
2070
  SExchangeSrcIndex* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
46,814,573✔
2071

2072
  if (NULL == pIdx) {
46,814,573✔
2073
    if (pBasicParam->isNewDeployed) {
671,166✔
2074
      SDownstreamSourceNode *pNode = NULL;
2,728✔
2075
      code = nodesCloneNode((SNode*)&pBasicParam->newDeployedSrc, (SNode**)&pNode);
2,728✔
2076
      QUERY_CHECK_CODE(code, lino, _return);
2,728✔
2077

2078
      SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pOperator->pPhyNode;
2,728✔
2079
      code = nodesListMakeStrictAppend(&pExchange->pSrcEndPoints, (SNode*)pNode);
2,728✔
2080
      QUERY_CHECK_CODE(code, lino, _return);
2,728✔
2081

2082
      void* tmp = taosArrayPush(pExchangeInfo->pSources, pNode);
2,728✔
2083
      QUERY_CHECK_NULL(tmp, code, lino, _return, terrno);
2,728✔
2084

2085
      SExchangeSrcIndex idx = {.srcIdx = taosArrayGetSize(pExchangeInfo->pSources) - 1, .inUseIdx = -1};
2,728✔
2086
      code = tSimpleHashPut(pExchangeInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
2,728✔
2087
      if (pExchangeInfo->pHashSources) {
2,728✔
2088
        QUERY_CHECK_CODE(code, lino, _return);
2,728✔
2089
      }
2090
      pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
2,728✔
2091
      QUERY_CHECK_NULL(pIdx, code, lino, _return, TSDB_CODE_INVALID_PARA);
2,728✔
2092
    } else if (pBasicParam->type == EX_SRC_TYPE_VSTB_TS_SCAN || pBasicParam->type == EX_SRC_TYPE_VSTB_PART_INTERVAL_SCAN) {
668,438✔
2093
      // Multi-exchange virtual table paths build each exchange param from the full vg map.
2094
      // If this exchange does not own the current vg source, skip it and let the matching exchange consume it.
2095
      qDebug("addSingleExchangeSource found no existing source for vgId: %d, sourceType:%d, skip it",
668,438✔
2096
             pBasicParam->vgId, pBasicParam->type);
2097
      return TSDB_CODE_SUCCESS;
668,438✔
2098
    } else {
2099
      qError("No exchange source for vgId: %d", pBasicParam->vgId);
×
2100
      return TSDB_CODE_INVALID_PARA;
×
2101
    }
2102
  }
2103

2104
  qDebug("start to add single exchange source");
46,146,135✔
2105

2106
  switch (pBasicParam->type) {
46,146,135✔
2107
    case EX_SRC_TYPE_VSTB_TS_SCAN:
2,709,482✔
2108
    case EX_SRC_TYPE_VSTB_WIN_SCAN:
2109
    case EX_SRC_TYPE_VSTB_INTERVAL_SCAN:
2110
    case EX_SRC_TYPE_VSTB_PART_INTERVAL_SCAN:
2111
    case EX_SRC_TYPE_VSTB_AGG_SCAN: {
2112
      if (pIdx->inUseIdx < 0) {
2,709,482✔
2113
        SSourceDataInfo dataInfo = {0};
1,635,202✔
2114
        dataInfo.status = EX_SOURCE_DATA_NOT_READY;
1,635,202✔
2115
        dataInfo.taskId = pExchangeInfo->pTaskId;
1,635,202✔
2116
        dataInfo.index = pIdx->srcIdx;
1,635,202✔
2117
        dataInfo.groupid = pBasicParam->groupid;
1,635,202✔
2118
        dataInfo.window = pBasicParam->window;
1,635,202✔
2119
        dataInfo.isNewParam = pBasicParam->isNewParam;
1,635,202✔
2120
        code = loadTagListFromBasicParam(&dataInfo, pBasicParam);
1,635,202✔
2121
        QUERY_CHECK_CODE(code, lino, _return);
1,635,202✔
2122

2123
        code = loadBatchColMapFromBasicParam(&dataInfo, pBasicParam);
1,635,202✔
2124
        QUERY_CHECK_CODE(code, lino, _return);
1,635,202✔
2125

2126
        dataInfo.orgTbInfo = NULL;
1,635,202✔
2127

2128
        dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
1,635,202✔
2129
        QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
1,635,202✔
2130

2131
        dataInfo.type = pBasicParam->type;
1,635,202✔
2132
        dataInfo.srcOpType = pBasicParam->srcOpType;
1,635,202✔
2133
        dataInfo.tableSeq = pBasicParam->tableSeq;
1,635,202✔
2134

2135
        QUERY_CHECK_NULL(taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
3,270,404✔
2136

2137
        pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
1,635,202✔
2138
      } else {
2139
        SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
1,074,280✔
2140
        QUERY_CHECK_NULL(pDataInfo, code, lino, _return, terrno);
1,074,280✔
2141

2142
        if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
1,074,280✔
2143
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
1,074,280✔
2144
        }
2145

2146
        pDataInfo->taskId = pExchangeInfo->pTaskId;
1,074,280✔
2147
        pDataInfo->index = pIdx->srcIdx;
1,074,280✔
2148
        pDataInfo->window = pBasicParam->window;
1,074,280✔
2149
        pDataInfo->groupid = pBasicParam->groupid;
1,074,280✔
2150
        pDataInfo->isNewParam = pBasicParam->isNewParam;
1,074,280✔
2151

2152
        code = loadTagListFromBasicParam(pDataInfo, pBasicParam);
1,074,280✔
2153
        QUERY_CHECK_CODE(code, lino, _return);
1,074,280✔
2154

2155
        code = loadBatchColMapFromBasicParam(pDataInfo, pBasicParam);
1,074,280✔
2156
        QUERY_CHECK_CODE(code, lino, _return);
1,074,280✔
2157

2158
        pDataInfo->orgTbInfo = NULL;
1,074,280✔
2159

2160
        pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
1,074,280✔
2161
        QUERY_CHECK_NULL(pDataInfo->pSrcUidList, code, lino, _return, terrno);
1,074,280✔
2162

2163
        pDataInfo->type = pBasicParam->type;
1,074,280✔
2164
        pDataInfo->srcOpType = pBasicParam->srcOpType;
1,074,280✔
2165
        pDataInfo->tableSeq = pBasicParam->tableSeq;
1,074,280✔
2166
      }
2167
      break;
2,709,482✔
2168
    }
2169
    case EX_SRC_TYPE_VTB_WIN_SCAN:
4,105,639✔
2170
    case EX_SRC_TYPE_VSTB_TAG_SCAN: {
2171
      SSourceDataInfo dataInfo = {0};
4,105,639✔
2172
      dataInfo.status = EX_SOURCE_DATA_NOT_READY;
4,105,639✔
2173
      dataInfo.taskId = pExchangeInfo->pTaskId;
4,105,639✔
2174
      dataInfo.index = pIdx->srcIdx;
4,105,639✔
2175
      dataInfo.window = pBasicParam->window;
4,105,639✔
2176
      dataInfo.groupid = 0;
4,105,639✔
2177
      dataInfo.orgTbInfo = NULL;
4,105,639✔
2178
      dataInfo.tagList = NULL;
4,105,639✔
2179

2180
      dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
4,105,639✔
2181
      QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
4,105,639✔
2182

2183
      dataInfo.isNewParam = false;
4,105,639✔
2184
      dataInfo.type = pBasicParam->type;
4,105,639✔
2185
      dataInfo.srcOpType = pBasicParam->srcOpType;
4,105,639✔
2186
      dataInfo.tableSeq = pBasicParam->tableSeq;
4,105,639✔
2187

2188
      taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
4,105,639✔
2189
      QUERY_CHECK_NULL(taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
8,211,278✔
2190
      break;
4,105,639✔
2191
    }
2192
    case EX_SRC_TYPE_VSTB_SCAN: {
39,046,979✔
2193
      SSourceDataInfo dataInfo = {0};
39,046,979✔
2194
      dataInfo.status = EX_SOURCE_DATA_NOT_READY;
39,046,979✔
2195
      dataInfo.taskId = pExchangeInfo->pTaskId;
39,046,979✔
2196
      dataInfo.index = pIdx->srcIdx;
39,046,979✔
2197
      dataInfo.window = pBasicParam->window;
39,046,979✔
2198
      dataInfo.groupid = 0;
39,046,979✔
2199
      dataInfo.isNewParam = pBasicParam->isNewParam;
39,046,979✔
2200
      dataInfo.tagList = NULL;
39,046,979✔
2201
      dataInfo.orgTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
39,046,979✔
2202
      QUERY_CHECK_NULL(dataInfo.orgTbInfo, code, lino, _return, terrno);
39,046,979✔
2203
      dataInfo.orgTbInfo->vgId = pBasicParam->orgTbInfo->vgId;
39,046,979✔
2204
      tstrncpy(dataInfo.orgTbInfo->tbName, pBasicParam->orgTbInfo->tbName, TSDB_TABLE_FNAME_LEN);
39,046,979✔
2205
      dataInfo.orgTbInfo->colMap = taosArrayDup(pBasicParam->orgTbInfo->colMap, NULL);
39,046,979✔
2206
      QUERY_CHECK_NULL(dataInfo.orgTbInfo->colMap, code, lino, _return, terrno);
39,046,979✔
2207

2208
      dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
39,046,979✔
2209
      QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
39,046,979✔
2210

2211
      dataInfo.type = pBasicParam->type;
39,046,979✔
2212
      dataInfo.srcOpType = pBasicParam->srcOpType;
39,046,979✔
2213
      dataInfo.tableSeq = pBasicParam->tableSeq;
39,046,979✔
2214

2215
      taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
39,046,979✔
2216
      QUERY_CHECK_NULL( taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
78,093,958✔
2217
      break;
39,046,979✔
2218
    }
2219
    case EX_SRC_TYPE_STB_JOIN_SCAN:
284,035✔
2220
    default: {
2221
      if (pIdx->inUseIdx < 0) {
284,035✔
2222
        SSourceDataInfo dataInfo = {0};
281,515✔
2223
        dataInfo.status = EX_SOURCE_DATA_NOT_READY;
281,515✔
2224
        dataInfo.taskId = pExchangeInfo->pTaskId;
281,515✔
2225
        dataInfo.index = pIdx->srcIdx;
281,515✔
2226
        dataInfo.groupid = 0;
281,515✔
2227
        dataInfo.tagList = NULL;
281,515✔
2228

2229
        dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
281,515✔
2230
        QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
281,515✔
2231

2232
        dataInfo.isNewParam = false;
281,515✔
2233
        dataInfo.type = pBasicParam->type;
281,515✔
2234
        dataInfo.srcOpType = pBasicParam->srcOpType;
281,515✔
2235
        dataInfo.tableSeq = pBasicParam->tableSeq;
281,515✔
2236

2237
        QUERY_CHECK_NULL(taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
563,030✔
2238

2239
        pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
281,515✔
2240
      } else {
2241
        SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
2,520✔
2242
        QUERY_CHECK_NULL(pDataInfo, code, lino, _return, terrno);
2,520✔
2243
        if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
2,520✔
2244
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
2,520✔
2245
        }
2246

2247
        pDataInfo->tagList = NULL;
2,520✔
2248
        pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
2,520✔
2249
        QUERY_CHECK_NULL(pDataInfo->pSrcUidList, code, lino, _return, terrno);
2,520✔
2250

2251
        pDataInfo->groupid = 0;
2,520✔
2252
        pDataInfo->isNewParam = false;
2,520✔
2253
        pDataInfo->type = pBasicParam->type;
2,520✔
2254
        pDataInfo->srcOpType = pBasicParam->srcOpType;
2,520✔
2255
        pDataInfo->tableSeq = pBasicParam->tableSeq;
2,520✔
2256
      }
2257
      break;
284,035✔
2258
    }
2259
  }
2260

2261
  return code;
46,146,135✔
2262
_return:
×
2263
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2264
  return code;
×
2265
}
2266

2267
/*
 * Apply the operator's pending get-param to the exchange sources, then release it.
 *
 * The param value is either a single SExchangeOperatorBasicParam or, when
 * multiParams is set, a batch whose pBatchs hash holds one basic param per entry;
 * each basic param is handed to addSingleExchangeSource(). Processing of a batch
 * stops at the first failing entry.
 *
 * The get-param is freed and pOperator->pOperatorGetParam reset to NULL on every
 * exit, including a failure inside a batch, so both param shapes behave the same.
 *
 * Returns TSDB_CODE_SUCCESS or the first error from addSingleExchangeSource().
 * Expects pOperator->pOperatorGetParam to be non-NULL on entry.
 */
int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
  int32_t                 code = TSDB_CODE_SUCCESS;
  SExchangeOperatorParam* pParam = (SExchangeOperatorParam*)pOperator->pOperatorGetParam->value;

  if (pParam->multiParams) {
    SExchangeOperatorBatchParam* pBatch = (SExchangeOperatorBatchParam*)pOperator->pOperatorGetParam->value;
    SExchangeOperatorBasicParam* pBasicParam = NULL;
    int32_t                      iter = 0;
    while (NULL != (pBasicParam = tSimpleHashIterate(pBatch->pBatchs, pBasicParam, &iter))) {
      code = addSingleExchangeSource(pOperator, pBasicParam);
      if (code) {
        // stop here, but fall through to the common cleanup instead of returning
        // with the param still attached to the operator
        break;
      }
    }
  } else {
    code = addSingleExchangeSource(pOperator, &pParam->basic);
  }

  freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
  pOperator->pOperatorGetParam = NULL;

  return code;
}
2291

2292
/*
 * Prepare the exchange operator for fetching remote data.
 *
 * Nothing is done (success is returned) when there is no pending get-param and
 * the operator is either dynamic or already opened. Otherwise the function
 *   1. applies the dynamic get-param via addDynamicExchangeSource() (dynamic op only),
 *   2. rewinds pExchangeInfo->current where required,
 *   3. consumes a still-pending get-param (storeNotifyInfo, then free it or clear
 *      its reUse flag),
 *   4. starts the concurrent load unless running in stream mode or loading
 *      sequentially, and
 *   5. marks the operator as opened.
 *
 * Returns TSDB_CODE_SUCCESS. On failure the error is stored in the task info and
 * control leaves through T_LONG_JMP.
 */
int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
  SExchangeInfo* pExchangeInfo = pOperator->info;
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;

  if ((OPTR_IS_OPENED(pOperator) && !pExchangeInfo->dynamicOp &&
       NULL == pOperator->pOperatorGetParam) ||
      (pExchangeInfo->dynamicOp && NULL == pOperator->pOperatorGetParam)) {
    qDebug("%s, skip prepare, opened:%d, dynamicOp:%d, getParam:%p",
      GET_TASKID(pOperator->pTaskInfo), OPTR_IS_OPENED(pOperator),
      pExchangeInfo->dynamicOp, pOperator->pOperatorGetParam);
    return TSDB_CODE_SUCCESS;
  }

  if (pExchangeInfo->dynamicOp) {
    code = addDynamicExchangeSource(pOperator);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  // '&&' binds tighter than '||'; the parentheses only spell out the grouping the
  // compiler already applied: stream mode always rewinds, regardless of the open state.
  // NOTE(review): confirm this grouping is the intended one.
  if ((pOperator->status == OP_NOT_OPENED &&
       (pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData)) ||
      IS_STREAM_MODE(pOperator->pTaskInfo)) {
    pExchangeInfo->current = 0;
  }

  if (NULL != pOperator->pOperatorGetParam) {
    SOperatorParam* pGetParam = pOperator->pOperatorGetParam;
    storeNotifyInfo(pOperator);

    if (!pGetParam->reUse) {
      freeOperatorParam(pGetParam, OP_GET_PARAM);
    } else {
      /**
        The param is referenced by getParam, and it will be freed by
        the parent operator after getting next block.
      */
      pGetParam->reUse = false;
    }
    pOperator->pOperatorGetParam = NULL;
  }

  if (!IS_STREAM_MODE(pOperator->pTaskInfo) && !pExchangeInfo->seqLoadData) {
    code = prepareConcurrentlyLoad(pOperator);
    QUERY_CHECK_CODE(code, lino, _end);
    pExchangeInfo->openedTs = taosGetTimestampUs();
  }

  OPTR_SET_OPENED(pOperator);
  qDebug("%s prepare load complete", pOperator->pTaskInfo->id.str);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }
  return code;
}
2352

2353
/*
 * Apply group offset/limit (soffset/slimit) and row limit/offset to one result block.
 *
 * pOperator     - operator producing the block; may be marked done here.
 * pLimitInfo    - running limit/offset state, updated in place.
 * pBlock        - block to filter; may be emptied or trimmed.
 * holdDataInBuf - when true, a block below the result threshold may be kept for
 *                 accumulation instead of being returned.
 *
 * Returns PROJECT_RETRIEVE_CONTINUE when the caller should fetch more input, or
 * PROJECT_RETRIEVE_DONE when the block (or the end of output) is ready.
 */
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  // Stage 1: groups still to be skipped because of the group offset.
  if (pLimitInfo->remainGroupOffset > 0) {
    if (0 == pLimitInfo->currentGroupId) {
      // very first group seen: remember it and drop its data
      pLimitInfo->currentGroupId = pBlock->info.id.groupId;
      blockDataCleanup(pBlock);
      return PROJECT_RETRIEVE_CONTINUE;
    }

    if (pBlock->info.id.groupId != pLimitInfo->currentGroupId) {
      // data of a new group has arrived
      pLimitInfo->remainGroupOffset -= 1;

      if (pLimitInfo->remainGroupOffset > 0) {
        // this group is skipped as well
        blockDataCleanup(pBlock);
        return PROJECT_RETRIEVE_CONTINUE;
      }
    }

    // record the group the operator is positioned on
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
  }

  // Stage 2: a block of a different group closes the previous group.
  bool switchedGroup =
      (pLimitInfo->currentGroupId != 0) && (pLimitInfo->currentGroupId != pBlock->info.id.groupId);
  if (switchedGroup) {
    pLimitInfo->numOfOutputGroups += 1;

    if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
      // enough groups were produced: finish the operator
      pOperator->status = OP_EXEC_DONE;
      blockDataCleanup(pBlock);
      return PROJECT_RETRIEVE_DONE;
    }

    // start counting from scratch for the new group
    resetLimitInfoForNextGroup(pLimitInfo);

    // rows belonging to the previous group are still in the block
    if (pBlock->info.rows > 0) {
      return PROJECT_RETRIEVE_DONE;
    }
  }

  // Stage 3: the start position is reached; apply the row limit/offset.
  pLimitInfo->currentGroupId = pBlock->info.id.groupId;

  bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pTaskInfo);
  if (0 == pBlock->info.rows) {
    return PROJECT_RETRIEVE_CONTINUE;
  }

  if (limitReached && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
    setOperatorCompleted(pOperator);
    return PROJECT_RETRIEVE_DONE;
  }

  // todo optimize performance
  // With slimit/soffset present, results of several rounds cannot be packed into one
  // block, since they may belong to different groups.
  bool returnNow =
      (!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || hasSlimitOffsetInfo(pLimitInfo);

  // otherwise the block is not full enough: keep accumulating output in the buffer
  return returnNow ? PROJECT_RETRIEVE_DONE : PROJECT_RETRIEVE_CONTINUE;
}
2418

2419
/*
 * Block on pExchangeInfo->ready, bracketing the wait with the task's worker
 * callbacks (beforeBlocking / afterRecoverFromBlocking) when the task has a
 * worker callback set.
 *
 * Returns TSDB_CODE_SUCCESS, or the first failing step's error code, which is
 * also stored in the task info. Later steps are not run after a failure.
 */
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  int32_t        rc = TSDB_CODE_SUCCESS;

  if (NULL != pTaskInfo->pWorkerCb) {
    rc = pTaskInfo->pWorkerCb->beforeBlocking(pTaskInfo->pWorkerCb->pPool);
    if (TSDB_CODE_SUCCESS != rc) {
      goto _failed;
    }
  }

  rc = tsem_wait(&pExchangeInfo->ready);
  if (TSDB_CODE_SUCCESS != rc) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(rc));
    // NOTE(review): afterRecoverFromBlocking is not invoked on this path - confirm intended
    goto _failed;
  }

  // the callback pointer is read again after the wait, as before
  if (NULL != pTaskInfo->pWorkerCb) {
    rc = pTaskInfo->pWorkerCb->afterRecoverFromBlocking(pTaskInfo->pWorkerCb->pPool);
    if (TSDB_CODE_SUCCESS != rc) {
      goto _failed;
    }
  }

  return TSDB_CODE_SUCCESS;

_failed:
  pTaskInfo->code = rc;
  return pTaskInfo->code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc