• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5052

13 May 2026 12:00PM UTC coverage: 73.338% (-0.02%) from 73.358%
#5052

push

travis-ci

web-flow
feat: taosdump support stream backup/restore (#35326)

139 of 170 new or added lines in 3 files covered. (81.76%)

761 existing lines in 163 files now uncovered.

281469 of 383795 relevant lines covered (73.34%)

134502812.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.73
/source/libs/executor/src/exchangeoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "operator.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "tdatablock.h"
23
#include "thash.h"
24
#include "tmsg.h"
25
#include "tref.h"
26
#include "trpc.h"
27

28
/*
 * Small record identifying one fetch request: an exchange id, a source index
 * and a sequence id.
 * NOTE(review): presumably this is the context handed to the fetch response
 * callback (loadRemoteDataCallback); its use is not visible in this part of
 * the file - confirm against the request/callback code.
 */
typedef struct SFetchRspHandleWrapper {
29
  int64_t  exchangeId;
30
  int32_t  sourceIndex;
31
  int64_t  seqId;
32
} SFetchRspHandleWrapper;
33

34
/*
 * Per-source fetch state of an exchange operator. The exchange info keeps an
 * array of these (pSourceDataInfo); for non-dynamic operators initDataSource()
 * creates one entry per downstream source.
 */
typedef struct SSourceDataInfo {
35
  int32_t             index;  // position of the source; also used to look up the matching entry in pSources
36
  int64_t             seqId;  // compared with the exchange's seqId; responses with a different value are ignored
37
  SRWLatch            lock;   // write-locked by resetExchangeOperState() while the entry is reset
38
  SRetrieveTableRsp*  pRsp;   // last fetch response; released with taosMemoryFreeClear() once consumed
39
  uint64_t            totalRows;  // rows received from this source so far
40
  int64_t             startTime;
41
  int32_t             code;   // result code of the fetch; a non-success value aborts the load
42
  EX_SOURCE_STATUS    status;
43
  const char*         taskId;  // set by initDataSource() to the exchange's pTaskId copy
44
  SArray*             pSrcUidList;
45
  int32_t             srcOpType;
46
  bool                tableSeq;
47
  char*               decompBuf;  // freed by resetExchangeOperState()
48
  int32_t             decompBufSize;
49
  SOrgTbInfo*         orgTbInfo;
50
  SArray*             batchOrgTbInfo; // SArray<SOrgTbInfo>
51
  SArray*             tagList;
52
  EExchangeSourceType type;  // tested by the isVstb*Scan()/isStbJoinScan() predicates
53
  bool                isNewParam;
54
  STimeWindow         window;
55
  uint64_t            groupid;
56
  bool                fetchSent; // need reset
57
  uint64_t            fetchTimes;   // per-source fetch count
58
  int64_t             fetchCostUs;  // per-source total RPC round-trip (us)
59
} SSourceDataInfo;
60

61
static void destroyExchangeOperatorInfo(void* param);
62
static void freeBlock(void* pParam);
63
static void freeSourceDataInfo(void* param);
64
static void setAllSourcesCompleted(SOperatorInfo* pOperator);
65

66
static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code);
67
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex);
68
static int32_t getCompletedSources(const SArray* pArray, int32_t* pRes);
69
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator);
70
static void    storeNotifyInfo(SOperatorInfo* pOperator);
71
static int32_t seqLoadRemoteData(SOperatorInfo* pOperator);
72
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator);
73
static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock,
74
                                 bool holdDataInBuf);
75
static int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo);
76

77
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo);
78

79
/*
 * Source-type predicates: each returns true when pDataInfo->type equals the
 * EX_SRC_TYPE_* constant named in the function.
 */
static bool isVstbScan(SSourceDataInfo* pDataInfo) {return pDataInfo->type == EX_SRC_TYPE_VSTB_SCAN; }
36,478,217✔
80
static bool isVstbWinScan(SSourceDataInfo* pDataInfo) { return pDataInfo->type == EX_SRC_TYPE_VSTB_WIN_SCAN; }
×
81
static bool isVstbAggScan(SSourceDataInfo* pDataInfo) { return pDataInfo->type == EX_SRC_TYPE_VSTB_AGG_SCAN; }
×
82
static bool isVstbTagScan(SSourceDataInfo* pDataInfo) { return pDataInfo->type == EX_SRC_TYPE_VSTB_TAG_SCAN; }
21,190,283✔
83
static bool isStbJoinScan(SSourceDataInfo* pDataInfo) { return pDataInfo->type == EX_SRC_TYPE_STB_JOIN_SCAN; }
×
84

85

86
static void streamSequenciallyLoadRemoteData(SOperatorInfo* pOperator,
17,690,280✔
87
                                             SExchangeInfo* pExchangeInfo,
88
                                             SExecTaskInfo* pTaskInfo) {
89
  int32_t code = 0;
17,690,280✔
90
  int32_t lino = 0;
17,690,280✔
91
  int64_t startTs = taosGetTimestampUs();  
17,690,852✔
92
  int32_t  totalSources = (int32_t)taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
17,690,852✔
93
  int32_t completed = 0;
17,691,128✔
94
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
17,691,128✔
95
  if (code != TSDB_CODE_SUCCESS) {
17,691,128✔
96
    pTaskInfo->code = code;
×
97
    T_LONG_JMP(pTaskInfo->env, code);
×
98
  }
99
  if (completed == totalSources) {
17,691,128✔
100
    qDebug("%s no load since all sources completed, completed:%d, totalSources:%d", pTaskInfo->id.str, completed, totalSources);
2,614,393✔
101
    setAllSourcesCompleted(pOperator);
2,614,393✔
102
    return;
2,617,427✔
103
  }
104

105
  SSourceDataInfo* pDataInfo = NULL;
15,076,735✔
106
  SStreamRuntimeFuncInfo* pStream = &pTaskInfo->pStreamRuntimeInfo->funcInfo;
15,076,735✔
107

108
  while (1) {
7,166,415✔
109
    if (pExchangeInfo->current < 0) {
22,243,426✔
110
      qDebug("current %d and all sources complted, totalSources:%d", pExchangeInfo->current, totalSources);
235,385✔
111
      setAllSourcesCompleted(pOperator);
235,385✔
112
      return;
235,385✔
113
    }
114
    
115
    if (pExchangeInfo->current >= totalSources) {
22,007,187✔
116
      completed = 0;
10,930,190✔
117
      code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
10,930,190✔
118
      if (code != TSDB_CODE_SUCCESS) {
10,929,914✔
119
        pTaskInfo->code = code;
×
120
        T_LONG_JMP(pTaskInfo->env, code);
×
121
      }
122
      if (completed == totalSources) {
10,929,914✔
123
        qDebug("stop to load since all sources complted, completed:%d, totalSources:%d", completed, totalSources);
5,053,430✔
124
        setAllSourcesCompleted(pOperator);
5,053,430✔
125
        return;
5,053,430✔
126
      }
127
      
128
      pExchangeInfo->current = 0;
5,876,484✔
129
    }
130

131
    qDebug("%s start stream exchange %p idx:%d fetch", GET_TASKID(pTaskInfo), pExchangeInfo, pExchangeInfo->current);
16,954,325✔
132

133
    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
16,954,611✔
134
    if (!pDataInfo) {
16,953,767✔
135
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
136
      pTaskInfo->code = terrno;
×
137
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
138
    }
139

140
    if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
16,953,767✔
141
      pExchangeInfo->current++;
700,415✔
142
      continue;
700,415✔
143
    }
144

145
    if (!IS_STREAM_SINGLE_GRP(pTaskInfo) && pStream->pGroupReadInfos) {
16,253,644✔
146
      SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
×
147
      if (!pDataInfo) {
×
148
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
149
        pTaskInfo->code = terrno;
×
150
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
151
      }
152

153
      if (pSource->fetchMsgType == TDMT_STREAM_FETCH) {
×
154
        SArray** ppNode = tSimpleHashGet(pStream->pGroupReadInfos, &pSource->addr.nodeId, sizeof(pSource->addr.nodeId));
×
155
        if (NULL == ppNode) {
×
156
          pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
×
157
          pExchangeInfo->current++;
×
158
          continue;
×
159
        }
160

161
        pStream->curNodeId = pSource->addr.nodeId;
×
162
        pStream->curGrpRead = *ppNode;
×
163
      }
164
    }
165

166
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
16,253,920✔
167

168
    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
16,253,920✔
169
    if (code != TSDB_CODE_SUCCESS) {
16,254,196✔
170
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
171
      pTaskInfo->code = code;
×
172
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
173
    }
174

175
    while (true) {
1,654✔
176
      recordOpExecBeforeDownstream(pOperator);
16,255,850✔
177
      code = exchangeWait(pOperator, pExchangeInfo);
16,255,850✔
178
      recordOpExecAfterDownstream(pOperator, 0);
16,255,850✔
179

180
      if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
16,255,850✔
181
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
1,654✔
182
      }
183

184
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
16,254,196✔
185
      if (pDataInfo->seqId != currSeqId) {
16,254,196✔
186
        qDebug("%s seq rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", 
1,654✔
187
            GET_TASKID(pTaskInfo), pDataInfo->seqId, pExchangeInfo, currSeqId);
188
        taosMemoryFreeClear(pDataInfo->pRsp);
1,654✔
189
        continue;
1,654✔
190
      }
191

192
      break;
16,251,690✔
193
    }
194

195
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
16,251,690✔
196
    if (!pSource) {
16,252,542✔
197
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
198
      pTaskInfo->code = terrno;
×
199
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
200
    }
201

202
    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
16,252,542✔
203
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
×
204
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
205
             tstrerror(pDataInfo->code));
206
      pTaskInfo->code = pDataInfo->code;
×
207
      T_LONG_JMP(pTaskInfo->env, code);
×
208
    }
209

210
    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
16,251,399✔
211
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
16,252,542✔
212

213
    if (pRsp->numOfRows == 0) {
16,252,235✔
214
      qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
6,466,000✔
215
             " execId:%d idx %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", pDataInfo,
216
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
217
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);
218

219
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
6,466,000✔
220
      if (isVstbScan(pDataInfo) || isVstbTagScan(pDataInfo)) {
6,466,000✔
221
        pExchangeInfo->current = -1;
235,385✔
222
      } else {
223
        pExchangeInfo->current += 1;
6,230,336✔
224
      }
225
      taosMemoryFreeClear(pDataInfo->pRsp);
6,465,721✔
226
      continue;
6,466,000✔
227
    }
228

229
    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
9,786,542✔
230
    TAOS_CHECK_EXIT(code);
9,786,266✔
231

232
    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
9,786,266✔
233
    if (pRsp->completed == 1) {
9,786,266✔
234
      qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
2,888,928✔
235
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%d", pDataInfo,
236
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
237
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
238
             pExchangeInfo->current + 1, totalSources);
239

240
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
2,888,928✔
241
      if (isVstbScan(pDataInfo)) {
2,888,928✔
242
        pExchangeInfo->current = -1;
×
243
        taosMemoryFreeClear(pDataInfo->pRsp);
×
244
        continue;
×
245
      }
246
    } else {
247
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d idx:%d numOfRows:%" PRId64
6,897,614✔
248
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
249
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
250
             pExchangeInfo->current, pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
251
    }
252

253
    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
9,786,542✔
254
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;
9,786,542✔
255

256
    pExchangeInfo->current++;
9,786,542✔
257

258
    taosMemoryFreeClear(pDataInfo->pRsp);
9,786,542✔
259
    return;
9,786,542✔
260
  }
261

262
_exit:
×
263

264
  if (code) {
×
265
    pTaskInfo->code = code;
×
266
    qError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
×
267
  }
268
}
269

270

271
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
258,928,450✔
272
                                           SExecTaskInfo* pTaskInfo) {
273
  int32_t code = 0;
258,928,450✔
274
  int32_t lino = 0;
258,928,450✔
275
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
258,928,450✔
276
  int32_t completed = 0;
258,928,718✔
277
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
258,927,950✔
278
  if (code != TSDB_CODE_SUCCESS) {
258,929,809✔
279
    pTaskInfo->code = code;
×
280
    T_LONG_JMP(pTaskInfo->env, code);
×
281
  }
282
  if (completed == totalSources) {
258,929,809✔
283
    setAllSourcesCompleted(pOperator);
82,921,200✔
284
    return;
82,921,884✔
285
  }
286

287
  SSourceDataInfo* pDataInfo = NULL;
176,008,609✔
288

289
  while (1) {
26,515,749✔
290
    qDebug("prepare wait for ready, %p, %s", pExchangeInfo, GET_TASKID(pTaskInfo));
202,524,358✔
291
    recordOpExecBeforeDownstream(pOperator);
202,524,358✔
292
    code = exchangeWait(pOperator, pExchangeInfo);
202,521,921✔
293
    recordOpExecAfterDownstream(pOperator, 0);
202,524,752✔
294

295
    if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
202,525,363✔
296
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
666✔
297
    }
298

299
    for (int32_t i = 0; i < totalSources; ++i) {
360,035,065✔
300
      pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
360,035,065✔
301
      QUERY_CHECK_NULL(pDataInfo, code, lino, _exit, terrno);
360,035,120✔
302
      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
360,035,120✔
303
        continue;
120,480,009✔
304
      }
305

306
      if (pDataInfo->status != EX_SOURCE_DATA_READY) {
239,555,223✔
307
        continue;
37,031,526✔
308
      }
309

310
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
202,523,697✔
311
      if (pDataInfo->seqId != currSeqId) {
202,523,697✔
312
        qDebug("concurrent rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pDataInfo->seqId, pExchangeInfo, currSeqId);
×
313
        taosMemoryFreeClear(pDataInfo->pRsp);
×
314
        break;
×
315
      }
316

317
      if (pDataInfo->code != TSDB_CODE_SUCCESS) {
202,522,682✔
318
        code = pDataInfo->code;
3,927✔
319
        TAOS_CHECK_EXIT(code);
3,927✔
320
      }
321

322
      tmemory_barrier();
202,519,603✔
323
      SRetrieveTableRsp*     pRsp = pDataInfo->pRsp;
202,519,603✔
324
      SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
202,520,214✔
325
      QUERY_CHECK_NULL(pSource, code, lino, _exit, terrno);
202,519,793✔
326

327
      // todo
328
      SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
202,519,793✔
329
      if (pRsp->numOfRows == 0) {
202,519,366✔
330
        if (NULL != pDataInfo->pSrcUidList && !isVstbScan(pDataInfo)) {
52,158,311✔
331
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
×
332
          code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
×
333
          if (code != TSDB_CODE_SUCCESS) {
×
334
            taosMemoryFreeClear(pDataInfo->pRsp);
×
335
            TAOS_CHECK_EXIT(code);
×
336
          }
337
        } else {
338
          pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
52,158,311✔
339
          qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
52,158,311✔
340
                 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", try next %d/%" PRIzu, pDataInfo,
341
                 GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
342
                 pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows, i + 1, totalSources);
343
          taosMemoryFreeClear(pDataInfo->pRsp);
52,158,311✔
344
        }
345
        break;
52,158,311✔
346
      }
347

348
      TAOS_CHECK_EXIT(doExtractResultBlocks(pExchangeInfo, pDataInfo));
150,361,903✔
349

350
      SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
150,361,292✔
351
      updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pDataInfo->startTime, pOperator);
150,361,292✔
352
      pDataInfo->totalRows += pRetrieveRsp->numOfRows;
150,360,875✔
353

354
      if (pRsp->completed == 1) {
150,361,903✔
355
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
135,484,782✔
356
        qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
135,485,226✔
357
               " execId:%d index:%d completed, blocks:%d, numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64
358
               ", totalRows:%" PRIu64 ", total:%.2f Kb, try next %d/%" PRIzu, pDataInfo,
359
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
360
               pRsp->numOfBlocks, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows,
361
               pLoadInfo->totalSize / 1024.0, i + 1, totalSources);
362
      } else {
363
        qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
14,876,677✔
364
               " execId:%d blocks:%d, numOfRows:%" PRId64 ", totalRows:%" PRIu64 ", total:%.2f Kb",
365
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
366
               pRsp->numOfBlocks, pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
367
      }
368

369
      taosMemoryFreeClear(pDataInfo->pRsp);
150,362,347✔
370

371
      if ((pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED || NULL != pDataInfo->pSrcUidList) && !isVstbScan(pDataInfo) && !isVstbTagScan(pDataInfo)) {
150,361,292✔
372
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
14,876,677✔
373
        code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
14,876,677✔
374
        if (code != TSDB_CODE_SUCCESS) {
14,876,677✔
375
          taosMemoryFreeClear(pDataInfo->pRsp);
×
376
          TAOS_CHECK_EXIT(code);
×
377
        }
378
      }
379
      
380
      return;
150,361,994✔
381
    }  // end loop
382

383
    int32_t complete1 = 0;
52,158,311✔
384
    code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &complete1);
52,157,727✔
385
    if (code != TSDB_CODE_SUCCESS) {
52,158,311✔
386
      pTaskInfo->code = code;
×
387
      T_LONG_JMP(pTaskInfo->env, code);
×
388
    }
389
    if (complete1 == totalSources) {
52,158,311✔
390
      qDebug("all sources are completed, %s", GET_TASKID(pTaskInfo));
25,642,562✔
391
      return;
25,641,978✔
392
    }
393
  }
394

395
_exit:
3,927✔
396

397
  if (code) {
3,927✔
398
    pTaskInfo->code = code;
3,927✔
399
    qError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
3,927✔
400
  }
401
}
402

403
/**
 * Return the next result block of the exchange operator, or NULL when there is
 * none.
 *
 * A block already buffered in pResultBlockList is handed out first. Otherwise
 * one load round is run with the strategy selected by seqLoadData and the task
 * mode (sequential, stream-sequential or concurrent), and the first block
 * produced by that round, if any, is returned. Every returned block is also
 * appended to pRecycledBlocks.
 *
 * Failures store the code in pTaskInfo->code and long-jump to pTaskInfo->env.
 */
static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
  int32_t        code = TSDB_CODE_SUCCESS;
  SExchangeInfo* pExchangeInfo = pOperator->info;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  size_t               totalSources = taosArrayGetSize(pExchangeInfo->pSources);
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;

  if (pOperator->status == OP_EXEC_DONE) {
    qDebug("%s all %" PRIzu " source(s) are exhausted, total rows:%" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
           GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize,
           pLoadInfo->totalElapsed / 1000.0);
    return NULL;
  }

  // we have buffered retrieved datablock, return it directly
  SSDataBlock* pBlock = NULL;
  if (taosArrayGetSize(pExchangeInfo->pResultBlockList) > 0) {
    pBlock = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
    taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
  }

  if (pBlock != NULL) {
    void* pPushed = taosArrayPush(pExchangeInfo->pRecycledBlocks, &pBlock);
    if (!pPushed) {
      code = terrno;
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      pTaskInfo->code = code;
      T_LONG_JMP(pTaskInfo->env, code);
    }
    return pBlock;
  }

  // nothing buffered: run one load round with the matching strategy
  if (pExchangeInfo->seqLoadData && (IS_NON_STREAM_MODE(pTaskInfo) || IS_STREAM_SINGLE_GRP(pTaskInfo))) {
    code = seqLoadRemoteData(pOperator);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      pTaskInfo->code = code;
      T_LONG_JMP(pTaskInfo->env, code);
    }
  } else if (IS_STREAM_MODE(pOperator->pTaskInfo)) {
    streamSequenciallyLoadRemoteData(pOperator, pExchangeInfo, pTaskInfo);
  } else {
    concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
  }

  // the void loaders report failures through the task's code
  if (TSDB_CODE_SUCCESS != pOperator->pTaskInfo->code) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(pOperator->pTaskInfo->code));
    T_LONG_JMP(pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  if (taosArrayGetSize(pExchangeInfo->pResultBlockList) == 0) {
    qDebug("empty resultBlockList");
    return NULL;
  }

  pBlock = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
  taosArrayRemove(pExchangeInfo->pResultBlockList, 0);

  void* pPushed = taosArrayPush(pExchangeInfo->pRecycledBlocks, &pBlock);
  if (!pPushed) {
    code = terrno;
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  qDebug("block with rows:%" PRId64 " loaded", pBlock->info.rows);
  return pBlock;
}
471

472
/**
 * Produce the next non-empty, filtered block of the exchange operator.
 *
 * The operator is opened through its _openFn first. Blocks are then pulled
 * with doLoadRemoteDataImpl(), filtered with the operator's filter info and,
 * when limit/offset information exists, passed through handleLimitOffset().
 * Blocks that end up empty, or for which the limit handling asks to continue,
 * are skipped.
 *
 * @param pOperator the exchange operator
 * @param ppRes     receives the block, or NULL when no more data is available
 * @return the result code; on an open/filter failure the code is stored in
 *         pTaskInfo->code and the function long-jumps to pTaskInfo->env
 */
static int32_t loadRemoteDataNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SExchangeInfo* pExchangeInfo = pOperator->info;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  qDebug("%s start to load from exchange %p", pTaskInfo->id.str, pExchangeInfo);

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  for (;;) {
    SSDataBlock* pBlock = doLoadRemoteDataImpl(pOperator);
    if (pBlock == NULL) {
      (*ppRes) = NULL;
      return code;
    }

    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    if (blockDataGetNumOfRows(pBlock) == 0) {
      qDebug("rows 0 block got, continue next load");
      continue;
    }

    SLimitInfo* pLimitInfo = &pExchangeInfo->limitInfo;
    if (!hasLimitOffsetInfo(pLimitInfo)) {
      // no limit/offset: hand the block out as is
      (*ppRes) = pBlock;
      qDebug("block with rows %" PRId64 " returned in exechange", pBlock->info.rows);
      return code;
    }

    int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock, false);
    if (status == PROJECT_RETRIEVE_CONTINUE) {
      qDebug("limit retrieve continue");
      continue;
    }

    if (status == PROJECT_RETRIEVE_DONE) {
      if (pBlock->info.rows == 0) {
        setOperatorCompleted(pOperator);
        (*ppRes) = NULL;
      } else {
        (*ppRes) = pBlock;
      }
      return code;
    }
    // any other status falls through to the next load round
  }

_end:

  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  } else {
    qDebug("empty block returned in exchange");
  }

  (*ppRes) = NULL;
  return code;
}
539

540
/**
 * Create the per-source data info array of an exchange operator.
 *
 * For dynamic operators (pInfo->dynamicOp) only the empty array is created.
 * Otherwise `id` is copied into pInfo->pTaskId and one SSourceDataInfo in
 * EX_SOURCE_DATA_NOT_READY state is appended per source, each pointing at that
 * task id copy and carrying its own index.
 *
 * @param numOfSources number of downstream sources
 * @param pInfo        exchange info to initialise
 * @param id           task id string
 * @return TSDB_CODE_SUCCESS, or the terrno of the failed allocation. If an
 *         append fails the array is destroyed and pInfo->pSourceDataInfo is
 *         reset to NULL; pInfo->pTaskId stays allocated in that case.
 */
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
  pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
  if (pInfo->pSourceDataInfo == NULL) {
    return terrno;
  }

  if (pInfo->dynamicOp) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t len = strlen(id) + 1;
  pInfo->pTaskId = taosMemoryCalloc(1, len);
  if (!pInfo->pTaskId) {
    return terrno;
  }
  tstrncpy(pInfo->pTaskId, id, len);

  for (int32_t i = 0; i < numOfSources; ++i) {
    SSourceDataInfo dataInfo = {0};
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
    dataInfo.taskId = pInfo->pTaskId;
    dataInfo.index = i;

    SSourceDataInfo* pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
    if (pDs == NULL) {
      // read terrno before the cleanup call, and do not leave a dangling
      // pointer behind: a later teardown of pInfo must not free the array again
      int32_t code = terrno;
      taosArrayDestroyEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
      pInfo->pSourceDataInfo = NULL;
      return code;
    }
    qDebug("init source data info %d, pDs:%p, status:%d", i, pDs, pDs->status);
  }

  return TSDB_CODE_SUCCESS;
}
571

572
/**
 * Initialise an exchange operator's state from its physical plan node.
 *
 * Creates the fetch RPC handle array (reserved to one slot per source), copies
 * every downstream source node of pExNode->pSrcEndPoints into pInfo->pSources,
 * and, for dynamic operators, builds pInfo->pHashSources mapping a source's
 * node id to its index in pSources. It then sets up the limit info, registers
 * pInfo in fetchObjRefPool (the reference id is stored in pInfo->self) and
 * finally creates the per-source data info via initDataSource().
 *
 * @param pExNode exchange physical plan node
 * @param pInfo   exchange info to initialise
 * @param id      task id string, used for logging and passed to initDataSource()
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when the node has no
 *         sources, or the code of the step that failed. Nothing allocated
 *         here is released on failure.
 */
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo, const char* id) {
  size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);

  if (numOfSources == 0) {
    qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
    return TSDB_CODE_INVALID_PARA;
  }
  pInfo->pFetchRpcHandles = taosArrayInit(numOfSources, sizeof(int64_t));
  if (!pInfo->pFetchRpcHandles) {
    return terrno;
  }
  void* ret = taosArrayReserve(pInfo->pFetchRpcHandles, numOfSources);
  if (!ret) {
    return terrno;
  }

  pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
  if (pInfo->pSources == NULL) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
    return terrno;
  }

  if (pExNode->node.dynamicOp) {
    pInfo->pHashSources = tSimpleHashInit(numOfSources * 2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
    if (NULL == pInfo->pHashSources) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return terrno;
    }
  }

  for (int32_t i = 0; i < numOfSources; ++i) {
    SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
    if (!pNode) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return terrno;
    }
    void* tmp = taosArrayPush(pInfo->pSources, pNode);
    if (!tmp) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return terrno;
    }

    // the node-id index only exists for dynamic operators; skip the insert
    // otherwise instead of calling into the hash with a NULL table
    if (pInfo->pHashSources) {
      SExchangeSrcIndex idx = {.srcIdx = i, .inUseIdx = -1};
      int32_t           code =
          tSimpleHashPut(pInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        return code;
      }
    }
  }

  initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);
  int64_t refId = taosAddRef(fetchObjRefPool, pInfo);
  if (refId < 0) {
    int32_t code = terrno;
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    return code;
  } else {
    pInfo->self = refId;
  }

  return initDataSource(numOfSources, pInfo, id);
}
634

635
/**
 * Put an exchange operator back into its not-opened state so it can run again.
 *
 * The exchange's seqId is incremented first (the load functions ignore
 * responses carrying a different seqId), then the cursor, the load statistics,
 * every per-source entry (under its write latch), the buffered and recycled
 * blocks, the dummy block, the in-use marks of the source hash and the limit
 * info are reset. For dynamic operators the per-source array is emptied.
 *
 * @param pOper the exchange operator
 * @return always 0
 */
int32_t resetExchangeOperState(SOperatorInfo* pOper) {
  SExchangeInfo*      pInfo = pOper->info;
  SExchangePhysiNode* pPhynode = (SExchangePhysiNode*)pOper->pPhyNode;

  qDebug("%s reset exchange op:%p info:%p", pOper->pTaskInfo->id.str, pOper, pInfo);

  (void)atomic_add_fetch_64(&pInfo->seqId, 1);
  pOper->status = OP_NOT_OPENED;
  pInfo->current = 0;

  // load statistics start over
  pInfo->loadInfo.totalElapsed = 0;
  pInfo->loadInfo.totalRows = 0;
  pInfo->loadInfo.totalSize = 0;

  for (int32_t srcIdx = 0; srcIdx < taosArrayGetSize(pInfo->pSourceDataInfo); ++srcIdx) {
    SSourceDataInfo* pSrc = taosArrayGet(pInfo->pSourceDataInfo, srcIdx);

    taosWLockLatch(&pSrc->lock);
    taosMemoryFreeClear(pSrc->decompBuf);
    taosMemoryFreeClear(pSrc->pRsp);

    pSrc->totalRows = 0;
    pSrc->code = 0;
    pSrc->status = EX_SOURCE_DATA_NOT_READY;
    pSrc->fetchSent = false;
    pSrc->fetchTimes = 0;
    pSrc->fetchCostUs = 0;
    taosWUnLockLatch(&pSrc->lock);
  }

  if (pInfo->dynamicOp) {
    taosArrayClearEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
  }

  taosArrayClearEx(pInfo->pResultBlockList, freeBlock);
  taosArrayClearEx(pInfo->pRecycledBlocks, freeBlock);

  blockDataCleanup(pInfo->pDummyBlock);

  // mark every hashed source as not in use
  int32_t iter = 0;
  void*   pEntry = tSimpleHashIterate(pInfo->pHashSources, NULL, &iter);
  while (pEntry) {
    ((SExchangeSrcIndex*)pEntry)->inUseIdx = -1;
    pEntry = tSimpleHashIterate(pInfo->pHashSources, pEntry, &iter);
  }

  pInfo->limitInfo = (SLimitInfo){0};
  initLimitInfo(pPhynode->node.pLimit, pPhynode->node.pSlimit, &pInfo->limitInfo);

  return 0;
}
682

683
/*
 * Collect per-source fetch statistics of an exchange operator for EXPLAIN output.
 *
 * @param pOptr        exchange operator; its info is read as SExchangeInfo
 * @param pOptrExplain receives a newly allocated SExchangeExplainInfo
 * @param len          receives sizeof(SExchangeExplainInfo)
 *
 * @return TSDB_CODE_SUCCESS, or terrno when the result buffer cannot be allocated
 */
static int32_t exchangeGetExplainExecInfo(SOperatorInfo* pOptr,
                                          void** pOptrExplain, uint32_t* len) {
  const SExchangeInfo* pExchange = pOptr->info;
  int32_t              numSources = (int32_t)taosArrayGetSize(pExchange->pSourceDataInfo);

  SExchangeExplainInfo* pExplain = taosMemoryCalloc(1, sizeof(SExchangeExplainInfo));
  if (NULL == pExplain) {
    return terrno;
  }

  pExplain->numSources = numSources;
  pExplain->mode = pExchange->seqLoadData ? 1 : 0;

  /* all sources are exhausted, thus no need to lock the sources data info */
  for (int32_t srcIdx = 0; srcIdx < numSources; ++srcIdx) {
    const SSourceDataInfo* pSrc = taosArrayGet(pExchange->pSourceDataInfo, srcIdx);

    /* averages are accumulated as per-source shares so no separate pass is needed */
    pExplain->avgFetchTimes += (double)pSrc->fetchTimes / numSources;
    pExplain->avgFetchRows += (double)pSrc->totalRows / numSources;
    pExplain->avgFetchCost += (double)pSrc->fetchCostUs / numSources;

    pExplain->maxFetchTimes = TMAX(pExplain->maxFetchTimes, pSrc->fetchTimes);
    pExplain->maxFetchRows = TMAX(pExplain->maxFetchRows, pSrc->totalRows);
    pExplain->maxFetchCost = TMAX(pExplain->maxFetchCost, pSrc->fetchCostUs);
  }

  *len = sizeof(SExchangeExplainInfo);
  *pOptrExplain = pExplain;
  return TSDB_CODE_SUCCESS;
}
711

712
int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo,
120,792,172✔
713
                                   SOperatorInfo** pOptrInfo) {
714
  QRY_PARAM_CHECK(pOptrInfo);
120,792,172✔
715

716
  int32_t        code = 0;
120,794,156✔
717
  int32_t        lino = 0;
120,794,156✔
718
  SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
120,794,156✔
719
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
120,788,940✔
720
  if (pInfo == NULL || pOperator == NULL) {
120,789,854✔
UNCOV
721
    code = terrno;
×
722
    goto _error;
×
723
  }
724
  initOperatorCostInfo(pOperator);
120,790,275✔
725

726
  pInfo->isExchange = true;
120,792,567✔
727
  pOperator->pPhyNode = pExNode;
120,792,140✔
728
  pInfo->dynamicOp = pExNode->node.dynamicOp;
120,793,308✔
729
  code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo));
120,791,294✔
730
  QUERY_CHECK_CODE(code, lino, _error);
120,793,471✔
731

732
  code = tsem_init(&pInfo->ready, 0, 0);
120,793,471✔
733
  QUERY_CHECK_CODE(code, lino, _error);
120,794,313✔
734

735
  pInfo->pDummyBlock = createDataBlockFromDescNode(pExNode->node.pOutputDataBlockDesc);
120,794,313✔
736
  QUERY_CHECK_NULL(pInfo->pDummyBlock, code, lino, _error, terrno);
120,794,313✔
737

738
  pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES);
120,792,494✔
739
  QUERY_CHECK_NULL(pInfo->pResultBlockList, code, lino, _error, terrno);
120,791,163✔
740
  pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES);
120,791,719✔
741
  QUERY_CHECK_NULL(pInfo->pRecycledBlocks, code, lino, _error, terrno);
120,793,729✔
742

743
  SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self};
120,794,313✔
744
  code = qAppendTaskStopInfo(pTaskInfo, &stopInfo);
120,793,886✔
745
  QUERY_CHECK_CODE(code, lino, _error);
120,792,804✔
746

747
  pInfo->seqLoadData = pExNode->seqRecvData;
120,792,804✔
748
  pInfo->dynTbname = pExNode->dynTbname;
120,791,799✔
749
  if (pInfo->dynTbname) {
120,792,813✔
750
    pInfo->seqLoadData = true;
40,345✔
751
  }
752
  pInfo->pTransporter = pTransporter;
120,790,241✔
753

754
  setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo,
120,791,853✔
755
                  pTaskInfo);
756
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
120,789,634✔
757

758
  code = filterInitFromNode((SNode*)pExNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
120,791,455✔
759
                            pTaskInfo->pStreamRuntimeInfo);
120,792,276✔
760
  QUERY_CHECK_CODE(code, lino, _error);
120,790,957✔
761
  filterSetExecContext(pOperator->exprSupp.pFilterInfo, pTaskInfo, isTaskKilled);
120,790,957✔
762
  qTrace("%s exchange op:%p", __func__, pOperator);
120,791,051✔
763
  pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteDataNext, NULL,
120,791,495✔
764
                                         destroyExchangeOperatorInfo, optrDefaultBufFn,
765
                                         exchangeGetExplainExecInfo, optrDefaultGetNextExtFn, NULL);
766
  setOperatorResetStateFn(pOperator, resetExchangeOperState);
120,789,488✔
767
  *pOptrInfo = pOperator;
120,788,906✔
768
  return TSDB_CODE_SUCCESS;
120,788,383✔
769

770
_error:
×
771
  if (code != TSDB_CODE_SUCCESS) {
×
772
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
773
    pTaskInfo->code = code;
×
774
  }
775
  if (pInfo != NULL) {
×
776
    doDestroyExchangeOperatorInfo(pInfo);
×
777
  }
778

779
  if (pOperator != NULL) {
×
780
    pOperator->info = NULL;
×
781
    destroyOperator(pOperator);
×
782
  }
783
  return code;
×
784
}
785

786
void destroyExchangeOperatorInfo(void* param) {
120,792,961✔
787
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
120,792,961✔
788
  int32_t        code = taosRemoveRef(fetchObjRefPool, pExInfo->self);
120,792,961✔
789
  if (code != TSDB_CODE_SUCCESS) {
120,794,740✔
790
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
791
  }
792
}
120,794,740✔
793

794
void freeBlock(void* pParam) {
341,625,391✔
795
  SSDataBlock* pBlock = *(SSDataBlock**)pParam;
341,625,391✔
796
  blockDataDestroy(pBlock);
341,625,972✔
797
}
341,626,844✔
798

799
void freeSourceDataInfo(void* p) {
191,165,085✔
800
  SSourceDataInfo* pInfo = (SSourceDataInfo*)p;
191,165,085✔
801
  taosMemoryFreeClear(pInfo->decompBuf);
191,165,085✔
802
  taosMemoryFreeClear(pInfo->pRsp);
191,167,457✔
803

804
  pInfo->decompBufSize = 0;
191,167,310✔
805
}
191,167,282✔
806

807
void doDestroyExchangeOperatorInfo(void* param) {
120,792,961✔
808
  if (param == NULL) {
120,792,961✔
809
    return;
×
810
  }
811
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
120,792,961✔
812
  if (pExInfo->pFetchRpcHandles) {
120,792,961✔
813
    for (int32_t i = 0; i < pExInfo->pFetchRpcHandles->size; ++i) {
326,150,238✔
814
      int64_t* pRpcHandle = taosArrayGet(pExInfo->pFetchRpcHandles, i);
205,356,693✔
815
      if (*pRpcHandle > 0) {
205,357,698✔
816
        SDownstreamSourceNode* pSource = taosArrayGet(pExInfo->pSources, i);
9,739,042✔
817
        (void)asyncFreeConnById(pExInfo->pTransporter, *pRpcHandle);
9,739,042✔
818
      }
819
    }
820
    taosArrayDestroy(pExInfo->pFetchRpcHandles);
120,793,735✔
821
  }
822

823
  taosArrayDestroy(pExInfo->pSources);
120,793,400✔
824
  taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo);
120,793,044✔
825

826
  taosArrayDestroyEx(pExInfo->pResultBlockList, freeBlock);
120,791,431✔
827
  taosArrayDestroyEx(pExInfo->pRecycledBlocks, freeBlock);
120,793,573✔
828

829
  blockDataDestroy(pExInfo->pDummyBlock);
120,792,976✔
830
  tSimpleHashCleanup(pExInfo->pHashSources);
120,794,740✔
831

832
  int32_t code = tsem_destroy(&pExInfo->ready);
120,793,151✔
833
  if (code != TSDB_CODE_SUCCESS) {
120,793,545✔
834
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
835
  }
836
  taosMemoryFreeClear(pExInfo->pTaskId);
120,793,545✔
837

838
  taosMemoryFreeClear(param);
120,791,944✔
839
}
840

841
/*
 * RPC callback invoked when a fetch response (or error) arrives for one source.
 *
 * @param param SFetchRspHandleWrapper identifying the exchange, the source index
 *              and the sequence id the request was sent with. It is not freed here.
 * @param pMsg  response buffer. pMsg->pEpSet is always freed. pMsg->pData is either
 *              handed over to the source data info (success) or freed (all other paths).
 * @param code  transport/result code of the fetch
 *
 * @return TSDB_CODE_SUCCESS when the response is stale or the exchange is gone;
 *         otherwise the result of posting the ready semaphore / releasing the ref,
 *         or terrno when the source data info cannot be located.
 *
 * Every path that acquired the exchange reference releases it before returning.
 */
int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
  SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;

  taosMemoryFreeClear(pMsg->pEpSet);
  SExchangeInfo* pExchangeInfo = taosAcquireRef(fetchObjRefPool, pWrapper->exchangeId);
  if (pExchangeInfo == NULL) {
    qWarn("failed to acquire exchange operator, since it may have been released, %p", pExchangeInfo);
    taosMemoryFree(pMsg->pData);
    return TSDB_CODE_SUCCESS;
  }

  /* a response whose seqId differs from the operator's current one is stale: drop it */
  int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
  if (pWrapper->seqId != currSeqId) {
    qDebug("rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pWrapper->seqId, pExchangeInfo, currSeqId);
    taosMemoryFree(pMsg->pData);
    code = taosReleaseRef(fetchObjRefPool, pWrapper->exchangeId);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    }
    return TSDB_CODE_SUCCESS;
  }

  int32_t index = pWrapper->sourceIndex;

  qDebug("%s exchange %p %dth source got rsp, code:%d, rsp:%p", pExchangeInfo->pTaskId, pExchangeInfo, index, code, pMsg->pData);

  /* the request is finished: free its connection and mark the handle as unused */
  int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, index);
  if (pRpcHandle != NULL) {
    int32_t ret = asyncFreeConnById(pExchangeInfo->pTransporter, *pRpcHandle);
    if (ret != 0) {
      qDebug("failed to free rpc handle, code:%s, %p", tstrerror(ret), pExchangeInfo);
    }
    *pRpcHandle = -1;
  }

  SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);
  if (!pSourceDataInfo) {
    /* capture terrno first: the cleanup calls below may overwrite it */
    int32_t getCode = terrno;
    /* nobody takes ownership of the payload on this path, so free it here */
    taosMemoryFree(pMsg->pData);
    int32_t relCode = taosReleaseRef(fetchObjRefPool, pWrapper->exchangeId);
    if (relCode != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(relCode));
    }
    return getCode;
  }

  if (0 == code && NULL == pMsg->pData) {
    qError("invalid rsp msg, msgType:%d, len:%d", pMsg->msgType, pMsg->len);
    code = TSDB_CODE_QRY_INVALID_MSG;
  }

  taosWLockLatch(&pSourceDataInfo->lock);
  if (code == TSDB_CODE_SUCCESS) {
    pSourceDataInfo->fetchCostUs += taosGetTimestampUs() - pSourceDataInfo->startTime;
    pSourceDataInfo->fetchTimes++;

    pSourceDataInfo->seqId = pWrapper->seqId;
    pSourceDataInfo->pRsp = pMsg->pData;

    /* convert the response header fields in place (htobe64 / htonl) */
    SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
    pRsp->numOfRows = htobe64(pRsp->numOfRows);
    pRsp->compLen = htonl(pRsp->compLen);
    pRsp->payloadLen = htonl(pRsp->payloadLen);
    pRsp->numOfCols = htonl(pRsp->numOfCols);
    pRsp->useconds = htobe64(pRsp->useconds);
    pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);

    qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%" PRId64 ", %p", pSourceDataInfo->taskId, index,
           pRsp->numOfBlocks, pRsp->numOfRows, pExchangeInfo);
  } else {
    taosMemoryFree(pMsg->pData);
    pSourceDataInfo->code = rpcCvtErrCode(code);
    if (pSourceDataInfo->code != code) {
      qError("%s fetch rsp received, index:%d, error:%s, cvted error: %s, %p", pSourceDataInfo->taskId, index,
             tstrerror(code), tstrerror(pSourceDataInfo->code), pExchangeInfo);
    } else {
      qError("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code),
             pExchangeInfo);
    }
  }

  /* publish the payload/code before flipping the status to READY */
  tmemory_barrier();
  pSourceDataInfo->status = EX_SOURCE_DATA_READY;
  taosWUnLockLatch(&pSourceDataInfo->lock);

  code = tsem_post(&pExchangeInfo->ready);
  if (code != TSDB_CODE_SUCCESS) {
    qError("failed to invoke post when fetch rsp is ready, code:%s, %p", tstrerror(code), pExchangeInfo);
    /* still drop the reference taken above, otherwise the exchange info can never be freed */
    int32_t relCode = taosReleaseRef(fetchObjRefPool, pWrapper->exchangeId);
    if (relCode != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(relCode));
    }
    return code;
  }

  code = taosReleaseRef(fetchObjRefPool, pWrapper->exchangeId);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
  }
  return code;
}
932

933
/*
 * Build a table-scan operator param of dynamic type DYN_TYPE_STB_JOIN.
 *
 * @param ppRes     receives the new SOperatorParam; set to NULL on failure
 * @param pUidList  uid list, duplicated into the param
 * @param srcOpType operator type stored in the param
 * @param tableSeq  stored as the scan param's tableSeq flag
 *
 * @return TSDB_CODE_SUCCESS, or terrno of the failing allocation
 */
int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, bool tableSeq) {
  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }

  /* zero-initialised so that fields not assigned below (e.g. groupid) are never indeterminate */
  STableScanOperatorParam* pScan = taosMemoryCalloc(1, sizeof(STableScanOperatorParam));
  if (NULL == pScan) {
    int32_t code = terrno; /* read before the cleanup call can touch it */
    taosMemoryFreeClear(*ppRes);
    return code;
  }

  pScan->paramType = DYN_TYPE_SCAN_PARAM;
  pScan->pUidList = taosArrayDup(pUidList, NULL);
  if (NULL == pScan->pUidList) {
    int32_t code = terrno; /* read before the cleanup calls can touch it */
    taosMemoryFree(pScan);
    taosMemoryFreeClear(*ppRes);
    return code;
  }
  pScan->dynType = DYN_TYPE_STB_JOIN;
  pScan->tableSeq = tableSeq;
  pScan->pOrgTbInfo = NULL;
  pScan->pBatchTbInfo = NULL;
  pScan->pTagList = NULL;
  pScan->isNewParam = false;
  /* skey > ekey: an inverted window */
  pScan->window.skey = INT64_MAX;
  pScan->window.ekey = INT64_MIN;
  pScan->notifyToProcess = false;
  pScan->notifyTs = 0;

  (*ppRes)->opType = srcOpType;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->value = pScan;
  (*ppRes)->pChildren = NULL;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;
}
971

972
/*
 * Build a table-scan operator param with an optional origin-table mapping.
 *
 * @param ppRes      receives the new SOperatorParam; set to NULL on failure
 * @param pUidList   optional uid list, duplicated into the param
 * @param srcOpType  operator type stored in the param
 * @param pMap       optional origin table info (vgId, tbName, colMap), deep-copied
 * @param tableSeq   stored as the scan param's tableSeq flag
 * @param window     time window copied into the param; must not be NULL
 * @param isNewParam stored as the scan param's isNewParam flag
 * @param type       dynamic scan type stored in the param
 *
 * @return TSDB_CODE_SUCCESS, or the error code of the failing allocation
 */
int32_t buildTableScanOperatorParamEx(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, SOrgTbInfo *pMap, bool tableSeq, STimeWindow *window, bool isNewParam, ETableScanDynType type) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  STableScanOperatorParam* pScan = NULL;

  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);

  /*
   * Zero-initialised: the cleanup path below inspects pUidList/pOrgTbInfo, which
   * must be NULL (not garbage) when an early step fails before they are assigned.
   */
  pScan = taosMemoryCalloc(1, sizeof(STableScanOperatorParam));
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);

  pScan->paramType = DYN_TYPE_SCAN_PARAM;
  if (pUidList) {
    pScan->pUidList = taosArrayDup(pUidList, NULL);
    QUERY_CHECK_NULL(pScan->pUidList, code, lino, _return, terrno);
  } else {
    pScan->pUidList = NULL;
  }

  if (pMap) {
    pScan->pOrgTbInfo = taosMemoryCalloc(1, sizeof(SOrgTbInfo));
    QUERY_CHECK_NULL(pScan->pOrgTbInfo, code, lino, _return, terrno);

    pScan->pOrgTbInfo->vgId = pMap->vgId;
    tstrncpy(pScan->pOrgTbInfo->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);

    pScan->pOrgTbInfo->colMap = taosArrayDup(pMap->colMap, NULL);
    QUERY_CHECK_NULL(pScan->pOrgTbInfo->colMap, code, lino, _return, terrno);
  } else {
    pScan->pOrgTbInfo = NULL;
  }
  pScan->pTagList = NULL;
  pScan->pBatchTbInfo = NULL;

  pScan->dynType = type;
  pScan->tableSeq = tableSeq;
  pScan->window.skey = window->skey;
  pScan->window.ekey = window->ekey;
  pScan->isNewParam = isNewParam;
  pScan->notifyToProcess = false;
  pScan->notifyTs = 0;
  (*ppRes)->opType = srcOpType;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->value = pScan;
  (*ppRes)->pChildren = NULL;
  (*ppRes)->reUse = false;

  return code;
_return:
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
  taosMemoryFreeClear(*ppRes);
  if (pScan) {
    taosArrayDestroy(pScan->pUidList);
    if (pScan->pOrgTbInfo) {
      taosArrayDestroy(pScan->pOrgTbInfo->colMap);
      taosMemoryFreeClear(pScan->pOrgTbInfo);
    }
    taosMemoryFree(pScan);
  }
  return code;
}
1034

1035
/**
1036
  @brief build the table scan operator param for notify message
1037
*/
1038
int32_t buildTableScanOperatorParamNotify(SOperatorParam** ppRes,
256,578✔
1039
                                          int32_t srcOpType, TSKEY notifyTs) {
1040
  if (srcOpType != 0 && srcOpType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
256,578✔
1041
    qWarn("%s, invalid srcOpType:%d", __func__, srcOpType);
×
1042
    return TSDB_CODE_INVALID_PARA;
×
1043
  }
1044
  int32_t code = TSDB_CODE_SUCCESS;
256,578✔
1045
  int32_t lino = 0;
256,578✔
1046
  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
256,578✔
1047
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
256,578✔
1048

1049
  STableScanOperatorParam* pTsParam =
513,156✔
1050
    taosMemoryCalloc(1, sizeof(STableScanOperatorParam));
256,578✔
1051
  QUERY_CHECK_NULL(pTsParam, code, lino, _return, terrno);
256,578✔
1052

1053
  pTsParam->paramType = NOTIFY_TYPE_SCAN_PARAM;
256,578✔
1054
  pTsParam->notifyToProcess = true;
256,578✔
1055
  pTsParam->notifyTs = notifyTs;
256,578✔
1056

1057
  (*ppRes)->opType = srcOpType != 0 ? srcOpType :
256,578✔
1058
                                      QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
1059
  (*ppRes)->downstreamIdx = 0;
256,578✔
1060
  (*ppRes)->value = pTsParam;
256,578✔
1061
  (*ppRes)->pChildren = NULL;
256,578✔
1062
  /* param is not reusable when it is transferred by message */
1063
  (*ppRes)->reUse = false;
256,578✔
1064

1065
_return:
256,578✔
1066
  if (TSDB_CODE_SUCCESS != code) {
256,578✔
1067
    qError("%s failed at %d, failed to build scan operator msg:%s",
×
1068
           __func__, lino, tstrerror(code));
1069
    taosMemoryFreeClear(*ppRes);
×
1070
    if (pTsParam) {
×
1071
      taosMemoryFree(pTsParam);
×
1072
    }
1073
  }
1074
  return code;
256,578✔
1075
}
1076

1077
/*
 * Build a table-scan operator param of dynamic type DYN_TYPE_VSTB_BATCH_SCAN.
 *
 * @param ppRes      receives the new SOperatorParam; set to NULL on failure
 * @param groupid    group id stored in the param
 * @param pUidList   optional uid list, duplicated into the param
 * @param srcOpType  operator type stored in the param
 * @param pBatchMap  optional array of SOrgTbInfo, deep-copied (including each colMap)
 * @param pTagList   optional array of STagVal, deep-copied (var-length payloads are duplicated)
 * @param tableSeq   stored as the scan param's tableSeq flag
 * @param window     time window copied into the param; must not be NULL
 * @param isNewParam stored as the scan param's isNewParam flag
 *
 * @return TSDB_CODE_SUCCESS, or the error code of the failing step. On failure
 *         everything allocated so far, including partially filled lists, is released.
 */
int32_t buildTableScanOperatorParamBatchInfo(SOperatorParam** ppRes, uint64_t groupid, SArray* pUidList, int32_t srcOpType, SArray *pBatchMap, SArray *pTagList, bool tableSeq, STimeWindow *window, bool isNewParam) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  STableScanOperatorParam* pScan = NULL;

  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);

  /*
   * Zero-initialised: the cleanup path inspects pUidList/pBatchTbInfo/pTagList,
   * which must be NULL (not garbage) when an early step fails.
   */
  pScan = taosMemoryCalloc(1, sizeof(STableScanOperatorParam));
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);

  pScan->paramType = DYN_TYPE_SCAN_PARAM;
  pScan->groupid = groupid;
  if (pUidList) {
    pScan->pUidList = taosArrayDup(pUidList, NULL);
    QUERY_CHECK_NULL(pScan->pUidList, code, lino, _return, terrno);
  } else {
    pScan->pUidList = NULL;
  }
  pScan->pOrgTbInfo = NULL;

  if (pBatchMap) {
    pScan->pBatchTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
    QUERY_CHECK_NULL(pScan->pBatchTbInfo, code, lino, _return, terrno);
    for (int32_t i = 0; i < taosArrayGetSize(pBatchMap); i++) {
      SOrgTbInfo *pSrcInfo = taosArrayGet(pBatchMap, i);
      SOrgTbInfo batchInfo = {0};
      batchInfo.vgId = pSrcInfo->vgId;
      tstrncpy(batchInfo.tbName, pSrcInfo->tbName, TSDB_TABLE_FNAME_LEN);
      batchInfo.colMap = taosArrayDup(pSrcInfo->colMap, NULL);
      QUERY_CHECK_NULL(batchInfo.colMap, code, lino, _return, terrno);
      if (NULL == taosArrayPush(pScan->pBatchTbInfo, &batchInfo)) {
        code = terrno;
        lino = __LINE__;
        /* not yet owned by the array, so the cleanup path cannot see it */
        taosArrayDestroy(batchInfo.colMap);
        goto _return;
      }
    }
  } else {
    pScan->pBatchTbInfo = NULL;
  }

  if (pTagList) {
    pScan->pTagList = taosArrayInit(1, sizeof(STagVal));
    QUERY_CHECK_NULL(pScan->pTagList, code, lino, _return, terrno);

    for (int32_t i = 0; i < taosArrayGetSize(pTagList); ++i) {
      STagVal *pSrcTag = (STagVal*)taosArrayGet(pTagList, i);
      STagVal  dstTag = {0};
      dstTag.type = pSrcTag->type;
      dstTag.cid = pSrcTag->cid;
      if (IS_VAR_DATA_TYPE(pSrcTag->type)) {
        dstTag.nData = pSrcTag->nData;
        dstTag.pData = taosMemoryMalloc(dstTag.nData);
        QUERY_CHECK_NULL(dstTag.pData, code, lino, _return, terrno);
        memcpy(dstTag.pData, pSrcTag->pData, dstTag.nData);
      } else {
        dstTag.i64 = pSrcTag->i64;
      }

      if (NULL == taosArrayPush(pScan->pTagList, &dstTag)) {
        code = terrno;
        lino = __LINE__;
        /* the payload copy is not owned by the array yet */
        if (IS_VAR_DATA_TYPE(dstTag.type)) {
          taosMemoryFree(dstTag.pData);
        }
        goto _return;
      }
    }
  } else {
    pScan->pTagList = NULL;
  }

  pScan->dynType = DYN_TYPE_VSTB_BATCH_SCAN;
  pScan->tableSeq = tableSeq;
  pScan->window.skey = window->skey;
  pScan->window.ekey = window->ekey;
  pScan->isNewParam = isNewParam;
  pScan->notifyToProcess = false;
  pScan->notifyTs = 0;
  (*ppRes)->opType = srcOpType;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->value = pScan;
  (*ppRes)->pChildren = NULL;
  (*ppRes)->reUse = false;

  return code;
_return:
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
  taosMemoryFreeClear(*ppRes);
  if (pScan) {
    taosArrayDestroy(pScan->pUidList);
    if (pScan->pBatchTbInfo) {
      /* each pushed entry owns its own colMap copy */
      for (int32_t i = 0; i < taosArrayGetSize(pScan->pBatchTbInfo); ++i) {
        SOrgTbInfo* pPushed = taosArrayGet(pScan->pBatchTbInfo, i);
        if (pPushed) {
          taosArrayDestroy(pPushed->colMap);
        }
      }
      taosArrayDestroy(pScan->pBatchTbInfo);
    }
    if (pScan->pTagList) {
      /* var-length tags own a heap payload */
      for (int32_t i = 0; i < taosArrayGetSize(pScan->pTagList); ++i) {
        STagVal* pPushed = (STagVal*)taosArrayGet(pScan->pTagList, i);
        if (pPushed && IS_VAR_DATA_TYPE(pPushed->type)) {
          taosMemoryFree(pPushed->pData);
        }
      }
      taosArrayDestroy(pScan->pTagList);
    }
    taosMemoryFree(pScan);
  }
  return code;
}
1166

1167
/*
1168
 * Build hash-agg operator get-param for dynamic virtual-table aggregation.
1169
 *
1170
 * @param ppRes Output operator param.
1171
 * @param groupid Group id bound to this batch.
1172
 * @param pUidList Source table uid list.
1173
 * @param pBatchMap Batch table metadata map.
1174
 * @param pTagList Optional tag value list.
1175
 * @param tableSeq Whether to keep table-sequential mode.
1176
 * @param window Time window for downstream scan.
1177
 * @param isNewParam Whether downstream should treat this as a new param batch.
1178
 *
1179
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
1180
 */
1181
int32_t buildAggOperatorParam(SOperatorParam** ppRes, uint64_t groupid, SArray* pUidList, SArray* pBatchMap,
1,829,220✔
1182
                              SArray* pTagList, bool tableSeq, STimeWindow* window, bool isNewParam) {
1183
  int32_t                  code = TSDB_CODE_SUCCESS;
1,829,220✔
1184
  int32_t                  lino = 0;
1,829,220✔
1185
  SOperatorParam*          pParam = NULL;
1,829,220✔
1186

1187
  pParam = taosMemoryCalloc(1, sizeof(SOperatorParam));
1,829,220✔
1188
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno);
1,829,220✔
1189

1190
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL;
1,829,220✔
1191
  pParam->downstreamIdx = 0;
1,829,220✔
1192
  pParam->value = NULL;
1,829,220✔
1193
  pParam->pChildren = NULL;
1,829,220✔
1194
  pParam->reUse = false;
1,829,220✔
1195

1196
  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
1,829,220✔
1197
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno);
1,829,220✔
1198

1199
  SOperatorParam* pTableScanParam = NULL;
1,829,220✔
1200
  code = buildTableScanOperatorParamBatchInfo(&pTableScanParam, groupid, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
1,829,220✔
1201
                                              pBatchMap, pTagList, tableSeq, window, isNewParam);
1202
  QUERY_CHECK_CODE(code, lino, _return);
1,829,220✔
1203

1204
  QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pTableScanParam), code, lino, _return, terrno);
3,658,440✔
1205

1206
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
1,829,220✔
1207
  pParam->downstreamIdx = 0;
1,829,220✔
1208
  pParam->value = NULL;
1,829,220✔
1209
  pParam->reUse = false;
1,829,220✔
1210

1211
  *ppRes = pParam;
1,829,220✔
1212
  return code;
1,829,220✔
1213

1214
_return:
×
1215
  freeOperatorParam(pParam, OP_GET_PARAM);
×
1216
  qError("%s failed at %d, failed to build agg scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
1217
  return code;
×
1218
}
1219

1220
/*
1221
 * Build hash-interval operator get-param for dynamic virtual-table interval query.
1222
 *
1223
 * @param ppRes Output operator param.
1224
 * @param groupid Group id bound to this batch.
1225
 * @param pUidList Source table uid list.
1226
 * @param pBatchMap Batch table metadata map.
1227
 * @param pTagList Optional tag value list.
1228
 * @param tableSeq Whether to keep table-sequential mode.
1229
 * @param window Time window for downstream scan.
1230
 * @param isNewParam Whether downstream should treat this as a new param batch.
1231
 *
1232
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
1233
 */
1234
static int32_t buildIntervalOperatorParam(SOperatorParam** ppRes, uint64_t groupid, SArray* pUidList, SArray* pBatchMap,
73,422✔
1235
                                          SArray* pTagList, bool tableSeq, STimeWindow* window, bool isNewParam) {
1236
  int32_t         code = TSDB_CODE_SUCCESS;
73,422✔
1237
  int32_t         lino = 0;
73,422✔
1238
  SOperatorParam* pParam = NULL;
73,422✔
1239

1240
  pParam = taosMemoryCalloc(1, sizeof(SOperatorParam));
73,422✔
1241
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno);
73,422✔
1242

1243
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL;
73,422✔
1244
  pParam->downstreamIdx = 0;
73,422✔
1245
  pParam->value = NULL;
73,422✔
1246
  pParam->reUse = false;
73,422✔
1247
  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
73,422✔
1248
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno);
73,422✔
1249

1250
  SOperatorParam* pTableScanParam = NULL;
73,422✔
1251
  code = buildTableScanOperatorParamBatchInfo(&pTableScanParam, groupid, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
73,422✔
1252
                                              pBatchMap, pTagList, tableSeq, window, isNewParam);
1253
  QUERY_CHECK_CODE(code, lino, _return);
73,422✔
1254

1255
  QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pTableScanParam), code, lino, _return, terrno);
146,844✔
1256

1257
  *ppRes = pParam;
73,422✔
1258
  return code;
73,422✔
1259

1260
_return:
×
1261
  freeOperatorParam(pParam, OP_GET_PARAM);
×
1262
  qError("%s failed at %d, failed to build interval scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
1263
  return code;
×
1264
}
1265

1266
/*
 * Build a tag-scan operator param carrying the first uid of pUidList.
 *
 * @param ppRes     receives the new SOperatorParam; set to NULL on failure
 * @param pUidList  uid list; its first element is stored as vcUid
 * @param srcOpType operator type stored in the param
 *
 * @return TSDB_CODE_SUCCESS, the error code of the failing allocation, or an
 *         error when pUidList has no first element
 */
int32_t buildTagScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  STagScanOperatorParam*   pScan = NULL;

  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);

  pScan = taosMemoryMalloc(sizeof(STagScanOperatorParam));
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);

  /* an empty or missing list yields NULL here; fail instead of dereferencing it */
  tb_uid_t* pFirstUid = (tb_uid_t*)taosArrayGet(pUidList, 0);
  if (NULL == pFirstUid) {
    code = (TSDB_CODE_SUCCESS != terrno) ? terrno : TSDB_CODE_INVALID_PARA;
    lino = __LINE__;
    goto _return;
  }
  pScan->vcUid = *pFirstUid;

  (*ppRes)->opType = srcOpType;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->value = pScan;
  (*ppRes)->pChildren = NULL;
  (*ppRes)->reUse = false;

  return code;
_return:
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
  taosMemoryFreeClear(*ppRes);
  if (pScan) {
    taosMemoryFree(pScan);
  }
  return code;
}
1293

1294
/*
 * Fill pTimeRange with the calculation time range of the current trigger param
 * (the entry at pRuntimeInfo->curIdx of pStreamPesudoFuncVals).
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_INTERNAL_ERROR when an argument is NULL
 *         or the current param cannot be fetched
 */
static int32_t getCurrentWinCalcTimeRange(SStreamRuntimeFuncInfo* pRuntimeInfo, STimeWindow* pTimeRange) {
  if (NULL == pRuntimeInfo || NULL == pTimeRange) {
    return TSDB_CODE_INTERNAL_ERROR;
  }

  SSTriggerCalcParam* pCalcParam = taosArrayGet(pRuntimeInfo->pStreamPesudoFuncVals, pRuntimeInfo->curIdx);
  if (NULL == pCalcParam) {
    return TSDB_CODE_INTERNAL_ERROR;
  }

  if (STREAM_TRIGGER_PERIOD == pRuntimeInfo->triggerType) {
    pTimeRange->skey = pCalcParam->prevLocalTime;
    pTimeRange->ekey = pCalcParam->triggerTime;
  } else {
    // Sliding triggers and every other trigger type use wstart/wend. For sliding,
    // it is not possible to distinguish whether there is an interval; the results
    // are equal to those of prevTs/currentTs, using the same address of union.
    pTimeRange->skey = pCalcParam->wstart;
    pTimeRange->ekey = pCalcParam->wend;
  }

  return TSDB_CODE_SUCCESS;
}
1323

1324
void clearVtbScanDataInfo(void* pItem) {
54,467,872✔
1325
  SSourceDataInfo *pInfo = (SSourceDataInfo *)pItem;
54,467,872✔
1326
  if (pInfo->orgTbInfo) {
54,467,872✔
1327
    taosArrayDestroy(pInfo->orgTbInfo->colMap);
40,981,148✔
1328
    taosMemoryFreeClear(pInfo->orgTbInfo);
40,981,148✔
1329
  }
1330
  if (pInfo->batchOrgTbInfo) {
54,467,872✔
1331
    for (int32_t i = 0; i < taosArrayGetSize(pInfo->batchOrgTbInfo); ++i) {
8,204,514✔
1332
      SOrgTbInfo* pColMap = taosArrayGet(pInfo->batchOrgTbInfo, i);
5,371,248✔
1333
      if (pColMap) {
5,371,248✔
1334
        taosArrayDestroy(pColMap->colMap);
5,371,248✔
1335
      }
1336
    }
1337
    taosArrayDestroy(pInfo->batchOrgTbInfo);
2,833,266✔
1338
    pInfo->batchOrgTbInfo = NULL;
2,833,266✔
1339
  }
1340
  if (pInfo->tagList) {
54,467,872✔
1341
    taosArrayDestroyEx(pInfo->tagList, destroyTagVal);
917,132✔
1342
    pInfo->tagList = NULL;
917,132✔
1343
  }
1344
  if (pInfo->pSrcUidList) {
54,467,872✔
1345
    taosArrayDestroy(pInfo->pSrcUidList);
43,814,414✔
1346
    pInfo->pSrcUidList = NULL;
43,814,414✔
1347
  }
1348
}
54,467,872✔
1349

1350
/**
 * Issue one fetch request for the source at `sourceIndex`.
 *
 * Nothing is sent (and TSDB_CODE_SUCCESS is returned) when the source status
 * is not EX_SOURCE_DATA_NOT_READY. Otherwise the status becomes
 * EX_SOURCE_DATA_STARTED and:
 *  - for a local-exec source, the data is fetched through
 *    pTaskInfo->localFetch.fp and handed to loadRemoteDataCallback directly;
 *  - for any other source, an SResFetchReq (optionally carrying an operator
 *    param built from the per-source dynamic info) is serialized and sent with
 *    asyncSendMsgToServer; the returned transporter id is stored in
 *    pExchangeInfo->pFetchRpcHandles[sourceIndex].
 *
 * @param pExchangeInfo exchange operator state
 * @param pTaskInfo     owning task; pTaskInfo->code is set on the direct
 *                      early-return error paths
 * @param sourceIndex   index into pSourceDataInfo / pFetchRpcHandles
 * @return TSDB_CODE_SUCCESS or an error code
 */
int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  // Must be initialized before the first `goto _end`: the cleanup code below
  // inspects it, and jumping past an initializer leaves the value indeterminate.
  SFetchRspHandleWrapper* pWrapper = NULL;

  SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
  QUERY_CHECK_NULL(pDataInfo, code, lino, _end, terrno);

  if (EX_SOURCE_DATA_NOT_READY != pDataInfo->status) {
    return TSDB_CODE_SUCCESS;
  }

  pDataInfo->status = EX_SOURCE_DATA_STARTED;
  SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
  QUERY_CHECK_NULL(pSource, code, lino, _end, terrno);

  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);

  // the wrapper identifies this request when the response callback runs
  pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
  QUERY_CHECK_NULL(pWrapper, code, lino, _end, terrno);
  pWrapper->exchangeId = pExchangeInfo->self;
  pWrapper->sourceIndex = sourceIndex;
  pWrapper->seqId = pExchangeInfo->seqId;

  if (pSource->localExec) {
    SDataBuf pBuf = {0};
    code =
      (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->sId,
                                  pTaskInfo->id.queryId, pSource->clientId,
                                  pSource->taskId, 0, pSource->execId,
                                  &pBuf.pData,
                                  pTaskInfo->localFetch.explainRes);
    QUERY_CHECK_CODE(code, lino, _end);
    pDataInfo->startTime = taosGetTimestampUs();
    code = loadRemoteDataCallback(pWrapper, &pBuf, code);
    // the callback ran synchronously, so the wrapper is released here
    taosMemoryFreeClear(pWrapper);
    QUERY_CHECK_CODE(code, lino, _end);
  } else {
    bool needStreamRtInfo = true;
    bool needStreamGrpInfo = false;
    SResFetchReq req = {0};
    req.header.vgId = pSource->addr.nodeId;
    req.sId = pSource->sId;
    req.clientId = pSource->clientId;
    req.taskId = pSource->taskId;
    req.queryId = pTaskInfo->id.queryId;
    req.execId = pSource->execId;
    if (pTaskInfo->pStreamRuntimeInfo) {
      req.dynTbname = pExchangeInfo->dynTbname;
      req.execId = pTaskInfo->pStreamRuntimeInfo->execId;
      req.pStRtFuncInfo = &pTaskInfo->pStreamRuntimeInfo->funcInfo;

      if (pSource->fetchMsgType == TDMT_STREAM_FETCH_FROM_RUNNER) {
        qDebug("%s stream fetch from runner, execId:%d, %p", GET_TASKID(pTaskInfo), req.execId, pTaskInfo->pStreamRuntimeInfo);
      } else if (pSource->fetchMsgType == TDMT_STREAM_FETCH_FROM_CACHE) {
        code = getCurrentWinCalcTimeRange(req.pStRtFuncInfo, &req.pStRtFuncInfo->curWindow);
        QUERY_CHECK_CODE(code, lino, _end);
        needStreamRtInfo = false;
        qDebug("%s stream fetch from cache, execId:%d, curWinIdx:%d, time range:[%" PRId64 ", %" PRId64 "]",
               GET_TASKID(pTaskInfo), req.execId, req.pStRtFuncInfo->curIdx, req.pStRtFuncInfo->curWindow.skey,
               req.pStRtFuncInfo->curWindow.ekey);
      } else {
        needStreamGrpInfo = true;
      }

      // only the first fetch sent for this source carries the reset flag
      if (!pDataInfo->fetchSent) {
        req.reset = pDataInfo->fetchSent = true;
      }
    }

    // Build the optional operator param. In every case the dynamic info stored
    // on pDataInfo is released right after the build call, whether or not the
    // build succeeded.
    switch (pDataInfo->type) {
      case EX_SRC_TYPE_VSTB_SCAN: {
        code = buildTableScanOperatorParamEx(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->orgTbInfo, pDataInfo->tableSeq, &pDataInfo->window, pDataInfo->isNewParam, DYN_TYPE_VSTB_SINGLE_SCAN);
        clearVtbScanDataInfo(pDataInfo);
        QUERY_CHECK_CODE(code, lino, _end);
        break;
      }
      case EX_SRC_TYPE_VTB_WIN_SCAN: {
        if (pDataInfo->pSrcUidList) {
          code = buildTableScanOperatorParamEx(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, NULL, pDataInfo->tableSeq, &pDataInfo->window, false, DYN_TYPE_VSTB_WIN_SCAN);
          taosArrayDestroy(pDataInfo->pSrcUidList);
          pDataInfo->pSrcUidList = NULL;
          QUERY_CHECK_CODE(code, lino, _end);
        }
        break;
      }
      case EX_SRC_TYPE_VSTB_TAG_SCAN: {
        code = buildTagScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType);
        taosArrayDestroy(pDataInfo->pSrcUidList);
        pDataInfo->pSrcUidList = NULL;
        QUERY_CHECK_CODE(code, lino, _end);
        break;
      }
      case EX_SRC_TYPE_VSTB_WIN_SCAN:
      case EX_SRC_TYPE_VSTB_INTERVAL_SCAN:
      case EX_SRC_TYPE_VSTB_TS_SCAN: {
        if (pDataInfo->batchOrgTbInfo) {
          int32_t srcOpType =
              (pDataInfo->type == EX_SRC_TYPE_VSTB_TS_SCAN)
                  ? QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN
                  : QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
          code = buildTableScanOperatorParamBatchInfo(
              &req.pOpParam, pDataInfo->groupid, pDataInfo->pSrcUidList, srcOpType,
              pDataInfo->batchOrgTbInfo, pDataInfo->tagList, pDataInfo->tableSeq, &pDataInfo->window,
              pDataInfo->isNewParam);
          clearVtbScanDataInfo(pDataInfo);
          QUERY_CHECK_CODE(code, lino, _end);
        }
        break;
      }
      case EX_SRC_TYPE_VSTB_PART_INTERVAL_SCAN: {
        if (pDataInfo->batchOrgTbInfo) {
          code = buildIntervalOperatorParam(&req.pOpParam, pDataInfo->groupid, pDataInfo->pSrcUidList,
                                            pDataInfo->batchOrgTbInfo, pDataInfo->tagList, pDataInfo->tableSeq,
                                            &pDataInfo->window, pDataInfo->isNewParam);
          clearVtbScanDataInfo(pDataInfo);
          QUERY_CHECK_CODE(code, lino, _end);
        }
        break;
      }
      case EX_SRC_TYPE_VSTB_AGG_SCAN: {
        if (pDataInfo->batchOrgTbInfo) {
          code = buildAggOperatorParam(&req.pOpParam, pDataInfo->groupid, pDataInfo->pSrcUidList,
                                       pDataInfo->batchOrgTbInfo, pDataInfo->tagList,
                                       pDataInfo->tableSeq, &pDataInfo->window, pDataInfo->isNewParam);
          clearVtbScanDataInfo(pDataInfo);
          QUERY_CHECK_CODE(code, lino, _end);
        }
        break;
      }
      case EX_SRC_TYPE_STB_JOIN_SCAN:
      default: {
        if (pDataInfo->pSrcUidList) {
          code = buildTableScanOperatorParam(&req.pOpParam,
                                             pDataInfo->pSrcUidList,
                                             pDataInfo->srcOpType,
                                             pDataInfo->tableSeq);
          /* the source uid list is consumed by this single request, so it is
             released as soon as the param has been built */
          taosArrayDestroy(pDataInfo->pSrcUidList);
          pDataInfo->pSrcUidList = NULL;
          QUERY_CHECK_CODE(code, lino, _end);
        }
        if (pExchangeInfo->notifyToSend) {
          if (NULL == req.pOpParam) {
            code = buildTableScanOperatorParamNotify(&req.pOpParam,
                                                     pDataInfo->srcOpType,
                                                     pExchangeInfo->notifyTs);
            QUERY_CHECK_CODE(code, lino, _end);
          } else {
            /**
              Carrying a notify param together with another operator param in
              the same request is currently not supported.
            */
            qError("%s, %s failed, currently don't support use the same param "
                   "for multiple times!", GET_TASKID(pTaskInfo), __func__);
            pTaskInfo->code = TSDB_CODE_INVALID_PARA;
            taosMemoryFree(pWrapper);
            // release the already-built param, like every other early return
            freeOperatorParam(req.pOpParam, OP_GET_PARAM);
            return pTaskInfo->code;
          }
          pExchangeInfo->notifyToSend = false;
        }
        break;
      }
    }

    // first pass computes the serialized size, second pass fills the buffer
    int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req, needStreamRtInfo, needStreamGrpInfo);
    if (msgSize < 0) {
      pTaskInfo->code = msgSize;
      taosMemoryFree(pWrapper);
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
      return pTaskInfo->code;
    }

    void* msg = taosMemoryCalloc(1, msgSize);
    if (NULL == msg) {
      pTaskInfo->code = terrno;
      taosMemoryFree(pWrapper);
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
      return pTaskInfo->code;
    }

    msgSize = tSerializeSResFetchReq(msg, msgSize, &req, needStreamRtInfo, needStreamGrpInfo);
    if (msgSize < 0) {
      pTaskInfo->code = msgSize;
      taosMemoryFree(pWrapper);
      taosMemoryFree(msg);
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
      return pTaskInfo->code;
    }

    // the param has been serialized into msg and is no longer needed
    freeOperatorParam(req.pOpParam, OP_GET_PARAM);

    qDebug("%s build fetch msg and send to vgId:%d, ep:%s, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
           ", seqId:%" PRId64 ", execId:%d, %p, %d/%" PRIzu,
           GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->clientId,
           pSource->taskId, pExchangeInfo->seqId, pSource->execId, pExchangeInfo, sourceIndex, totalSources);

    // send the fetch remote task result request
    SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
    if (NULL == pMsgSendInfo) {
      taosMemoryFreeClear(msg);
      taosMemoryFree(pWrapper);
      qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
      pTaskInfo->code = terrno;
      return pTaskInfo->code;
    }

    pMsgSendInfo->param = pWrapper;
    pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
    pMsgSendInfo->msgInfo.pData = msg;
    pMsgSendInfo->msgInfo.len = msgSize;
    pMsgSendInfo->msgType = pSource->fetchMsgType;
    pMsgSendInfo->fp = loadRemoteDataCallback;
    pMsgSendInfo->requestId = pTaskInfo->id.queryId;

    int64_t transporterId = 0;
    pDataInfo->startTime = taosGetTimestampUs();
    code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
    // NOTE(review): on failure `_end` frees pWrapper, which is also registered
    // as pMsgSendInfo->param with a free callback. If asyncSendMsgToServer
    // releases pMsgSendInfo on its own error path this is a double free --
    // confirm against its implementation.
    QUERY_CHECK_CODE(code, lino, _end);
    int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, sourceIndex);
    *pRpcHandle = transporterId;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    if (pWrapper) {
      taosMemoryFree(pWrapper);
    }
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1580

1581
/**
1582
  @brief record the data loading metrics of the exchange operator, including
1583
  the number of rows, the data length, and the elapsed time of current load operation.
1584
*/
1585
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows,
198,751,043✔
1586
                          int32_t dataLen, int64_t startTs, SOperatorInfo* pOperator) {
1587
  pInfo->totalRows += numOfRows;
198,751,043✔
1588
  pInfo->totalSize += dataLen;
198,750,459✔
1589
  pInfo->totalElapsed += (taosGetTimestampUs() - startTs);
198,751,346✔
1590
}
198,751,346✔
1591

1592
/**
 * Decode one serialized data block from a fetch response into `pRes`.
 *
 * - When `isVstbScan` is set, `pRes` is cleaned and decoded from `pData` first.
 * - When `pColList` is NULL, `pRes` is cleaned and decoded directly from
 *   `pData`, and `*pNextStart` is updated by the decoder.
 * - Otherwise `pData` starts with a column count followed by that many
 *   SSysTableSchema entries (converted in place with htons/htonl) and then the
 *   block payload. The payload is decoded into a temporary block and its
 *   columns are relocated into `pRes` according to `pColList`.
 *
 * @return TSDB_CODE_SUCCESS or the first error code encountered
 */
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart, bool isVstbScan) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SSDataBlock* pTmpBlock = NULL;

  if (isVstbScan) {
    blockDataCleanup(pRes);
    code = blockDecodeInternal(pRes, pData, (const char**)pNextStart);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  if (NULL == pColList) {
    // data from other sources
    blockDataCleanup(pRes);
    code = blockDecodeInternal(pRes, pData, (const char**)pNextStart);
    QUERY_CHECK_CODE(code, lino, _end);
  } else {
    // extract data according to pColList
    char* pCursor = pData;

    int32_t numOfCols = htonl(*(int32_t*)pCursor);
    pCursor += sizeof(int32_t);

    // todo refactor:extract method
    SSysTableSchema* pSchema = (SSysTableSchema*)pCursor;
    for (int32_t colIdx = 0; colIdx < numOfCols; ++colIdx) {
      SSysTableSchema* pEntry = (SSysTableSchema*)pCursor;

      // byte-order conversion is done in place on the response buffer
      pEntry->colId = htons(pEntry->colId);
      pEntry->bytes = htonl(pEntry->bytes);
      pCursor += sizeof(SSysTableSchema);
    }

    pTmpBlock = NULL;
    code = createDataBlock(&pTmpBlock);
    QUERY_CHECK_CODE(code, lino, _end);

    for (int32_t colIdx = 0; colIdx < numOfCols; ++colIdx) {
      SColumnInfoData colInfo =
          createColumnInfoData(pSchema[colIdx].type, pSchema[colIdx].bytes, pSchema[colIdx].colId);
      code = blockDataAppendColInfo(pTmpBlock, &colInfo);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    code = blockDecodeInternal(pTmpBlock, pCursor, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    code = blockDataEnsureCapacity(pRes, pTmpBlock->info.rows);
    QUERY_CHECK_CODE(code, lino, _end);

    // data from mnode
    pRes->info.dataLoad = 1;
    pRes->info.rows = pTmpBlock->info.rows;
    pRes->info.scanFlag = pTmpBlock->info.scanFlag;
    pRes->info.id.groupId = pTmpBlock->info.id.groupId;
    pRes->info.id.baseGId = pTmpBlock->info.id.baseGId;
    code = relocateColumnData(pRes, pColList, pTmpBlock->pDataBlock, false);
    QUERY_CHECK_CODE(code, lino, _end);

    blockDataDestroy(pTmpBlock);
    pTmpBlock = NULL;
  }

_end:
  if (TSDB_CODE_SUCCESS != code) {
    // the temporary block is still owned here when a step above failed
    blockDataDestroy(pTmpBlock);
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1657

1658
/**
 * Log the accumulated load statistics of the exchange operator and mark the
 * operator as completed via setOperatorCompleted().
 */
void setAllSourcesCompleted(SOperatorInfo* pOperator) {
  SExchangeInfo* pExchangeInfo = pOperator->info;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  SLoadRemoteDataInfo* pStat = &pExchangeInfo->loadInfo;
  size_t               numOfSources = taosArrayGetSize(pExchangeInfo->pSources);
  qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 ", %.2f Kb, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), numOfSources, pStat->totalRows, pStat->totalSize / 1024.0,
         pStat->totalElapsed / 1000.0);

  setOperatorCompleted(pOperator);
}
102,959,185✔
1670

1671
int32_t getCompletedSources(const SArray* pArray, int32_t* pRes) {
339,708,272✔
1672
  int32_t code = TSDB_CODE_SUCCESS;
339,708,272✔
1673
  int32_t lino = 0;
339,708,272✔
1674
  size_t  total = taosArrayGetSize(pArray);
339,708,272✔
1675

1676
  int32_t completed = 0;
339,709,757✔
1677
  for (int32_t k = 0; k < total; ++k) {
1,080,835,648✔
1678
    SSourceDataInfo* p = taosArrayGet(pArray, k);
741,124,784✔
1679
    QUERY_CHECK_NULL(p, code, lino, _end, terrno);
741,126,909✔
1680
    if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
741,126,909✔
1681
      qDebug("source %d is completed, info:%p %p", k, pArray, p);
354,180,053✔
1682
      completed += 1;
354,180,053✔
1683
    }
1684
  }
1685

1686
  *pRes = completed;
339,710,864✔
1687
_end:
339,708,607✔
1688
  if (code != TSDB_CODE_SUCCESS) {
339,708,607✔
1689
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1690
  }
1691
  return code;
339,709,028✔
1692
}
1693

1694
/**
 * Send a fetch request to every source of the exchange operator, in index
 * order, then switch the operator to OP_RES_TO_RETURN.
 *
 * The first failing request aborts the loop: its code is stored in
 * pTaskInfo->code and returned. If the task is found killed after all
 * requests were sent, control leaves through T_LONG_JMP.
 */
int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
  SExchangeInfo* pExchangeInfo = pOperator->info;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  size_t  numOfSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
  int64_t beginUs = taosGetTimestampUs();

  // Asynchronously send all fetch requests to all sources.
  for (int32_t srcIdx = 0; srcIdx < numOfSources; ++srcIdx) {
    int32_t sendCode = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, srcIdx);
    if (TSDB_CODE_SUCCESS != sendCode) {
      pTaskInfo->code = sendCode;
      return sendCode;
    }
  }

  int64_t finishUs = taosGetTimestampUs();
  qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTaskInfo),
         numOfSources, (finishUs - beginUs) / 1000.0);

  pOperator->status = OP_RES_TO_RETURN;
  if (isTaskKilled(pTaskInfo)) {
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
  }

  return TSDB_CODE_SUCCESS;
}
1721

1722
/**
1723
  @brief store STEP DONE notification info
1724
*/
1725
void storeNotifyInfo(SOperatorInfo* pOperator) {
4,667,760✔
1726
  SExchangeInfo*  pExchangeInfo = pOperator->info;
4,667,760✔
1727
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
4,667,760✔
1728
  SOperatorParam* pGetParam = pOperator->pOperatorGetParam;
4,667,760✔
1729

1730
  SExchangeOperatorParam* pParam = (SExchangeOperatorParam*)pGetParam->value;
4,667,760✔
1731
  if (!pParam->multiParams) {
4,667,760✔
1732
    SExchangeOperatorBasicParam* pBasic = &pParam->basic;
4,667,760✔
1733
    if (pBasic->paramType != NOTIFY_TYPE_EXCHANGE_PARAM) {
4,667,760✔
1734
      qWarn("%s, %s found invalid exchange operator param type %d",
×
1735
             GET_TASKID(pTaskInfo), __func__, pBasic->paramType);
1736
      return;
×
1737
    }
1738

1739
    pExchangeInfo->notifyToSend = true;
4,667,760✔
1740
    pExchangeInfo->notifyTs = pBasic->notifyTs;
4,667,760✔
1741
  } else {
1742
    qWarn("%s, %s found multi params are not supported for notify msg",
×
1743
           GET_TASKID(pTaskInfo), __func__);
1744
  }
1745
}
1746

1747
/**
 * Split the response held in pDataInfo->pRsp into data blocks and append each
 * one to pExchangeInfo->pResultBlockList.
 *
 * Each block in the payload is prefixed by two int32 values (compressed
 * length, raw length). When the response is flagged compressed and a block's
 * compressed length is smaller than its raw length, the block is first
 * decompressed into pDataInfo->decompBuf, which is allocated or grown to
 * payloadLen on demand.
 *
 * Target blocks are freshly zero-allocated for EX_SRC_TYPE_VSTB_SCAN sources,
 * otherwise taken from pRecycledBlocks when available, otherwise created from
 * pDummyBlock.
 *
 * @return TSDB_CODE_SUCCESS or an error code; when decoding a block fails,
 *         pDataInfo->pRsp is released before returning
 */
int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo) {
  int32_t            code = TSDB_CODE_SUCCESS;
  int32_t            lino = 0;
  SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
  SSDataBlock*       pCurBlock = NULL;

  char* pNextStart = pRetrieveRsp->data;
  char* pStart = pNextStart;

  if (pRetrieveRsp->compressed) {
    // make sure the decompression buffer can hold payloadLen bytes
    if (NULL == pDataInfo->decompBuf) {
      pDataInfo->decompBuf = taosMemoryMalloc(pRetrieveRsp->payloadLen);
      QUERY_CHECK_NULL(pDataInfo->decompBuf, code, lino, _end, terrno);
      pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
    } else if (pDataInfo->decompBufSize < pRetrieveRsp->payloadLen) {
      char* pGrown = taosMemoryRealloc(pDataInfo->decompBuf, pRetrieveRsp->payloadLen);
      QUERY_CHECK_NULL(pGrown, code, lino, _end, terrno);
      pDataInfo->decompBuf = pGrown;
      pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
    }
  }

  // blockNo is 1-based, matching the number printed in the debug log
  for (int32_t blockNo = 1; blockNo <= pRetrieveRsp->numOfBlocks; ++blockNo) {
    pStart = pNextStart;

    if (EX_SRC_TYPE_VSTB_SCAN == pDataInfo->type) {
      pCurBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
      QUERY_CHECK_NULL(pCurBlock, code, lino, _end, terrno);
    } else if (taosArrayGetSize(pExchangeInfo->pRecycledBlocks) > 0) {
      pCurBlock = *(SSDataBlock**)taosArrayPop(pExchangeInfo->pRecycledBlocks);
      blockDataCleanup(pCurBlock);
    } else {
      code = createOneDataBlock(pExchangeInfo->pDummyBlock, false, &pCurBlock);
      QUERY_CHECK_NULL(pCurBlock, code, lino, _end, code);
    }

    // per-block header: compressed length followed by raw length
    int32_t compLen = *(int32_t*)pStart;
    pStart += sizeof(int32_t);

    int32_t rawLen = *(int32_t*)pStart;
    pStart += sizeof(int32_t);
    QUERY_CHECK_CONDITION((compLen <= rawLen && compLen != 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

    pNextStart = pStart + compLen;
    if (pRetrieveRsp->compressed && (compLen < rawLen)) {
      int32_t decompLen =
          tsDecompressString(pStart, compLen, 1, pDataInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
      QUERY_CHECK_CONDITION((decompLen == rawLen), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
      pStart = pDataInfo->decompBuf;
    }

    code = extractDataBlockFromFetchRsp(pCurBlock, pStart, NULL, &pStart,
                                        (pDataInfo->type == EX_SRC_TYPE_VSTB_SCAN));
    if (code != 0) {
      taosMemoryFreeClear(pDataInfo->pRsp);
      goto _end;
    }

    void* pPushed = taosArrayPush(pExchangeInfo->pResultBlockList, &pCurBlock);
    QUERY_CHECK_NULL(pPushed, code, lino, _end, terrno);
    qDebug("%dth block added to resultBlockList, rows:%" PRId64, blockNo, pCurBlock->info.rows);
    // the block now belongs to the result list
    pCurBlock = NULL;
  }

_end:
  if (TSDB_CODE_SUCCESS != code) {
    // a block that never reached the result list is still owned here
    blockDataDestroy(pCurBlock);
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1822

1823
/**
 * Load data from the sources one at a time, starting at
 * pExchangeInfo->current.
 *
 * For the current source a fetch request is sent and the function blocks in
 * exchangeWait() until a response whose seqId matches the exchange's current
 * seqId arrives (stale responses are dropped). Then:
 *  - an empty response marks the source exhausted and the loop moves on;
 *  - a non-empty response is split into result blocks, the load statistics
 *    are updated and the function returns TSDB_CODE_SUCCESS.
 * When no source is left the operator is marked completed.
 *
 * When dynTbname is set, only sources whose vgId matches the tbname entry of
 * the stream partition column values are queried.
 *
 * Array lookup failures, a failed send, a failed wait, or a killed task leave
 * through T_LONG_JMP. A source-side error or a block extraction error is
 * stored in pTaskInfo->code and returned.
 */
int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
  SExchangeInfo* pExchangeInfo = pOperator->info;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  int32_t code = 0;
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);
  int64_t startTs = taosGetTimestampUs();

  // vgId stays 0 (meaning "no filter") unless a tbname partition value is found
  int32_t vgId = 0;
  if (pExchangeInfo->dynTbname) {
    SArray* vals = pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPartColVals;
    for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
      SStreamGroupValue* pValue = taosArrayGet(vals, i);
      if (pValue != NULL && pValue->isTbname) {
        vgId = pValue->vgId;
        break;
      }
    }
  }

  while (1) {
    if (pExchangeInfo->current >= totalSources) {
      setAllSourcesCompleted(pOperator);
      return TSDB_CODE_SUCCESS;
    }

    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
    if (!pSource) {
      // the failure reason is in terrno; `code` has not been set for this lookup
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      pTaskInfo->code = terrno;
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }

    // skip sources that live on a different vnode than the requested table
    if (vgId != 0 && pSource->addr.nodeId != vgId){
      pExchangeInfo->current += 1;
      continue;
    }

    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
    if (!pDataInfo) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      pTaskInfo->code = terrno;
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }
    // doSendFetchDataRequest only sends for sources in the NOT_READY state
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;

    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      pTaskInfo->code = code;
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }

    while (true) {
      recordOpExecBeforeDownstream(pOperator);
      code = exchangeWait(pOperator, pExchangeInfo);
      recordOpExecAfterDownstream(pOperator, 0);

      // NOTE(review): the jump uses pTaskInfo->code, not the code returned by
      // exchangeWait -- confirm exchangeWait records its failure on the task.
      if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
      }

      // a response carrying another seqId belongs to an earlier request: drop it and wait again
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
      if (pDataInfo->seqId != currSeqId) {
        qDebug("seq rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pDataInfo->seqId, pExchangeInfo, currSeqId);
        taosMemoryFreeClear(pDataInfo->pRsp);
        continue;
      }

      break;
    }

    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
             tstrerror(pDataInfo->code));
      pOperator->pTaskInfo->code = pDataInfo->code;
      return pOperator->pTaskInfo->code;
    }

    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;

    if (pRsp->numOfRows == 0) {
      qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
             " execId:%d %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", pDataInfo,
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);

      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
      // virtual super table (tag) scans stop at the first empty response
      // instead of advancing to the next source
      if (isVstbScan(pDataInfo) || isVstbTagScan(pDataInfo)) {
        pExchangeInfo->current = totalSources;
      } else {
        pExchangeInfo->current += 1;
      }
      taosMemoryFreeClear(pDataInfo->pRsp);
      continue;
    }

    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }

    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
    if (pRsp->completed == 1) {
      qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, pDataInfo,
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
             pExchangeInfo->current + 1, totalSources);

      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
      if (isVstbScan(pDataInfo)) {
        pExchangeInfo->current = totalSources;
      } else {
        pExchangeInfo->current += 1;
      }
    } else {
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
             pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
    }
    if (pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData) {
      // NOTE(review): pDataInfo still points into this array and is used
      // below -- presumably taosArrayClear keeps the storage; confirm.
      taosArrayClear(pExchangeInfo->pSourceDataInfo);
    }
    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;

    taosMemoryFreeClear(pDataInfo->pRsp);
    return TSDB_CODE_SUCCESS;
  }

_error:
  pTaskInfo->code = code;
  return code;
}
1961

1962
static int32_t loadTagListFromBasicParam(SSourceDataInfo* pDataInfo, SExchangeOperatorBasicParam* pBasicParam) {
2,833,266✔
1963
  int32_t  code = TSDB_CODE_SUCCESS;
2,833,266✔
1964
  int32_t  lino = 0;
2,833,266✔
1965
  STagVal  dstTag;
2,833,266✔
1966
  bool     needFree = false;
2,833,266✔
1967

1968
  if (pBasicParam->paramType != DYN_TYPE_EXCHANGE_PARAM) {
2,833,266✔
1969
    qError("%s failed since invalid exchange operator param type %d",
×
1970
      __func__, pBasicParam->paramType);
1971
    return TSDB_CODE_INVALID_PARA;
×
1972
  }
1973

1974
  if (pDataInfo->tagList) {
2,833,266✔
1975
    taosArrayClear(pDataInfo->tagList);
×
1976
  }
1977

1978
  if (pBasicParam->tagList) {
2,833,266✔
1979
    pDataInfo->tagList = taosArrayInit(1, sizeof(STagVal));
917,132✔
1980
    QUERY_CHECK_NULL(pDataInfo->tagList, code, lino, _return, terrno);
917,132✔
1981

1982
    for (int32_t i = 0; i < taosArrayGetSize(pBasicParam->tagList); ++i) {
5,681,520✔
1983
      STagVal *pSrcTag = (STagVal*)taosArrayGet(pBasicParam->tagList, i);
4,764,388✔
1984
      QUERY_CHECK_NULL(pSrcTag, code, lino, _return, terrno);
4,764,388✔
1985

1986
      dstTag = (STagVal){0};
4,764,388✔
1987
      dstTag.type = pSrcTag->type;
4,764,388✔
1988
      dstTag.cid = pSrcTag->cid;
4,764,388✔
1989
      if (IS_VAR_DATA_TYPE(pSrcTag->type)) {
4,764,388✔
1990
        dstTag.nData = pSrcTag->nData;
2,068,332✔
1991
        dstTag.pData = taosMemoryMalloc(dstTag.nData);
2,068,332✔
1992
        QUERY_CHECK_NULL(dstTag.pData, code, lino, _return, terrno);
2,068,332✔
1993
        needFree = true;
2,068,332✔
1994
        memcpy(dstTag.pData, pSrcTag->pData, dstTag.nData);
2,068,332✔
1995
      } else {
1996
        dstTag.i64 = pSrcTag->i64;
2,696,056✔
1997
      }
1998

1999
      QUERY_CHECK_NULL(taosArrayPush(pDataInfo->tagList, &dstTag), code, lino, _return, terrno);
9,528,776✔
2000
      needFree = false;
4,764,388✔
2001
    }
2002
  } else {
2003
    pDataInfo->tagList = NULL;
1,916,134✔
2004
  }
2005

2006
  return code;
2,833,266✔
2007
_return:
×
2008
  if (needFree) {
×
2009
    taosMemoryFreeClear(dstTag.pData);
×
2010
  }
2011
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2012
  return code;
×
2013
}
2014

2015
/*
 * Deep-copy the batch original-table info (vgId, table name, column map) of an
 * exchange basic param into pDataInfo->batchOrgTbInfo.
 *
 * When the param carries no batch info, pDataInfo->batchOrgTbInfo is set to
 * NULL.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when the param is not a
 * DYN_TYPE_EXCHANGE_PARAM, or the failing allocation/array error. On failure
 * the partially built list (including the column maps already pushed) is
 * released and pDataInfo->batchOrgTbInfo is reset to NULL.
 *
 * NOTE(review): a list already attached to pDataInfo on entry is overwritten
 * without being released, as before - confirm the owner has detached it.
 */
int32_t loadBatchColMapFromBasicParam(SSourceDataInfo* pDataInfo, SExchangeOperatorBasicParam* pBasicParam) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  SOrgTbInfo  dstOrgTbInfo = {0};
  bool        needFree = false;

  if (pBasicParam->paramType != DYN_TYPE_EXCHANGE_PARAM) {
    qError("%s failed since invalid exchange operator param type %d",
      __func__, pBasicParam->paramType);
    return TSDB_CODE_INVALID_PARA;
  }

  if (NULL == pBasicParam->batchOrgTbInfo) {
    // The original reset pBasicParam->batchOrgTbInfo here, which is already
    // NULL in this branch; the destination field is the one that must not
    // keep a stale pointer (same as tagList in loadTagListFromBasicParam).
    pDataInfo->batchOrgTbInfo = NULL;
    return code;
  }

  pDataInfo->batchOrgTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
  QUERY_CHECK_NULL(pDataInfo->batchOrgTbInfo, code, lino, _return, terrno);

  for (int32_t i = 0; i < taosArrayGetSize(pBasicParam->batchOrgTbInfo); ++i) {
    SOrgTbInfo* pSrcOrgTbInfo = taosArrayGet(pBasicParam->batchOrgTbInfo, i);
    QUERY_CHECK_NULL(pSrcOrgTbInfo, code, lino, _return, terrno);

    dstOrgTbInfo = (SOrgTbInfo){0};
    dstOrgTbInfo.vgId = pSrcOrgTbInfo->vgId;
    tstrncpy(dstOrgTbInfo.tbName, pSrcOrgTbInfo->tbName, TSDB_TABLE_FNAME_LEN);

    dstOrgTbInfo.colMap = taosArrayDup(pSrcOrgTbInfo->colMap, NULL);
    QUERY_CHECK_NULL(dstOrgTbInfo.colMap, code, lino, _return, terrno);

    // dstOrgTbInfo owns colMap until the push below succeeds.
    needFree = true;
    QUERY_CHECK_NULL(taosArrayPush(pDataInfo->batchOrgTbInfo, &dstOrgTbInfo), code, lino, _return, terrno);
    needFree = false;
  }

  return code;

_return:
  if (needFree) {
    taosArrayDestroy(dstOrgTbInfo.colMap);
  }
  // Release the entries copied so far; the list was created by this call.
  if (pDataInfo->batchOrgTbInfo != NULL) {
    for (int32_t i = 0; i < taosArrayGetSize(pDataInfo->batchOrgTbInfo); ++i) {
      SOrgTbInfo* pCopied = taosArrayGet(pDataInfo->batchOrgTbInfo, i);
      if (pCopied != NULL && pCopied->colMap != NULL) {
        taosArrayDestroy(pCopied->colMap);
        pCopied->colMap = NULL;
      }
    }
    taosArrayDestroy(pDataInfo->batchOrgTbInfo);
    pDataInfo->batchOrgTbInfo = NULL;
  }
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  return code;
}
2058

2059
int32_t addSingleExchangeSource(SOperatorInfo* pOperator,
49,100,386✔
2060
                                SExchangeOperatorBasicParam* pBasicParam) {
2061
  if (pBasicParam->paramType != DYN_TYPE_EXCHANGE_PARAM) {
49,100,386✔
2062
    qWarn("%s, %s found invalid exchange operator param type %d",
×
2063
      GET_TASKID(pOperator->pTaskInfo), __func__, pBasicParam->paramType);
2064
    return TSDB_CODE_SUCCESS;
×
2065
  }
2066

2067
  int32_t            code = TSDB_CODE_SUCCESS;
49,100,386✔
2068
  int32_t            lino = 0;
49,100,386✔
2069
  SExchangeInfo*     pExchangeInfo = pOperator->info;
49,100,386✔
2070
  SExchangeSrcIndex* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
49,100,386✔
2071

2072
  if (NULL == pIdx) {
49,100,386✔
2073
    if (pBasicParam->isNewDeployed) {
706,583✔
2074
      SDownstreamSourceNode *pNode = NULL;
2,959✔
2075
      code = nodesCloneNode((SNode*)&pBasicParam->newDeployedSrc, (SNode**)&pNode);
2,959✔
2076
      QUERY_CHECK_CODE(code, lino, _return);
2,959✔
2077

2078
      SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pOperator->pPhyNode;
2,959✔
2079
      code = nodesListMakeStrictAppend(&pExchange->pSrcEndPoints, (SNode*)pNode);
2,959✔
2080
      QUERY_CHECK_CODE(code, lino, _return);
2,959✔
2081

2082
      void* tmp = taosArrayPush(pExchangeInfo->pSources, pNode);
2,959✔
2083
      QUERY_CHECK_NULL(tmp, code, lino, _return, terrno);
2,959✔
2084

2085
      SExchangeSrcIndex idx = {.srcIdx = taosArrayGetSize(pExchangeInfo->pSources) - 1, .inUseIdx = -1};
2,959✔
2086
      code = tSimpleHashPut(pExchangeInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
2,959✔
2087
      if (pExchangeInfo->pHashSources) {
2,959✔
2088
        QUERY_CHECK_CODE(code, lino, _return);
2,959✔
2089
      }
2090
      pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
2,959✔
2091
      QUERY_CHECK_NULL(pIdx, code, lino, _return, TSDB_CODE_INVALID_PARA);
2,959✔
2092
    } else if (pBasicParam->type == EX_SRC_TYPE_VSTB_TS_SCAN || pBasicParam->type == EX_SRC_TYPE_VSTB_PART_INTERVAL_SCAN) {
703,624✔
2093
      // Multi-exchange virtual table paths build each exchange param from the full vg map.
2094
      // If this exchange does not own the current vg source, skip it and let the matching exchange consume it.
2095
      qDebug("addSingleExchangeSource found no existing source for vgId: %d, sourceType:%d, skip it",
703,624✔
2096
             pBasicParam->vgId, pBasicParam->type);
2097
      return TSDB_CODE_SUCCESS;
703,624✔
2098
    } else {
2099
      qError("No exchange source for vgId: %d", pBasicParam->vgId);
×
2100
      return TSDB_CODE_INVALID_PARA;
×
2101
    }
2102
  }
2103

2104
  qDebug("start to add single exchange source");
48,396,762✔
2105

2106
  switch (pBasicParam->type) {
48,396,762✔
2107
    case EX_SRC_TYPE_VSTB_TS_SCAN:
2,833,266✔
2108
    case EX_SRC_TYPE_VSTB_WIN_SCAN:
2109
    case EX_SRC_TYPE_VSTB_INTERVAL_SCAN:
2110
    case EX_SRC_TYPE_VSTB_PART_INTERVAL_SCAN:
2111
    case EX_SRC_TYPE_VSTB_AGG_SCAN: {
2112
      if (pIdx->inUseIdx < 0) {
2,833,266✔
2113
        SSourceDataInfo dataInfo = {0};
1,712,746✔
2114
        dataInfo.status = EX_SOURCE_DATA_NOT_READY;
1,712,746✔
2115
        dataInfo.taskId = pExchangeInfo->pTaskId;
1,712,746✔
2116
        dataInfo.index = pIdx->srcIdx;
1,712,746✔
2117
        dataInfo.groupid = pBasicParam->groupid;
1,712,746✔
2118
        dataInfo.window = pBasicParam->window;
1,712,746✔
2119
        dataInfo.isNewParam = pBasicParam->isNewParam;
1,712,746✔
2120
        code = loadTagListFromBasicParam(&dataInfo, pBasicParam);
1,712,746✔
2121
        QUERY_CHECK_CODE(code, lino, _return);
1,712,746✔
2122

2123
        code = loadBatchColMapFromBasicParam(&dataInfo, pBasicParam);
1,712,746✔
2124
        QUERY_CHECK_CODE(code, lino, _return);
1,712,746✔
2125

2126
        dataInfo.orgTbInfo = NULL;
1,712,746✔
2127

2128
        dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
1,712,746✔
2129
        QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
1,712,746✔
2130

2131
        dataInfo.type = pBasicParam->type;
1,712,746✔
2132
        dataInfo.srcOpType = pBasicParam->srcOpType;
1,712,746✔
2133
        dataInfo.tableSeq = pBasicParam->tableSeq;
1,712,746✔
2134

2135
        QUERY_CHECK_NULL(taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
3,425,492✔
2136

2137
        pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
1,712,746✔
2138
      } else {
2139
        SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
1,120,520✔
2140
        QUERY_CHECK_NULL(pDataInfo, code, lino, _return, terrno);
1,120,520✔
2141

2142
        if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
1,120,520✔
2143
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
1,120,520✔
2144
        }
2145

2146
        pDataInfo->taskId = pExchangeInfo->pTaskId;
1,120,520✔
2147
        pDataInfo->index = pIdx->srcIdx;
1,120,520✔
2148
        pDataInfo->window = pBasicParam->window;
1,120,520✔
2149
        pDataInfo->groupid = pBasicParam->groupid;
1,120,520✔
2150
        pDataInfo->isNewParam = pBasicParam->isNewParam;
1,120,520✔
2151

2152
        code = loadTagListFromBasicParam(pDataInfo, pBasicParam);
1,120,520✔
2153
        QUERY_CHECK_CODE(code, lino, _return);
1,120,520✔
2154

2155
        code = loadBatchColMapFromBasicParam(pDataInfo, pBasicParam);
1,120,520✔
2156
        QUERY_CHECK_CODE(code, lino, _return);
1,120,520✔
2157

2158
        pDataInfo->orgTbInfo = NULL;
1,120,520✔
2159

2160
        pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
1,120,520✔
2161
        QUERY_CHECK_NULL(pDataInfo->pSrcUidList, code, lino, _return, terrno);
1,120,520✔
2162

2163
        pDataInfo->type = pBasicParam->type;
1,120,520✔
2164
        pDataInfo->srcOpType = pBasicParam->srcOpType;
1,120,520✔
2165
        pDataInfo->tableSeq = pBasicParam->tableSeq;
1,120,520✔
2166
      }
2167
      break;
2,833,266✔
2168
    }
2169
    case EX_SRC_TYPE_VTB_WIN_SCAN:
4,299,100✔
2170
    case EX_SRC_TYPE_VSTB_TAG_SCAN: {
2171
      SSourceDataInfo dataInfo = {0};
4,299,100✔
2172
      dataInfo.status = EX_SOURCE_DATA_NOT_READY;
4,299,100✔
2173
      dataInfo.taskId = pExchangeInfo->pTaskId;
4,299,100✔
2174
      dataInfo.index = pIdx->srcIdx;
4,299,100✔
2175
      dataInfo.window = pBasicParam->window;
4,299,100✔
2176
      dataInfo.groupid = 0;
4,299,100✔
2177
      dataInfo.orgTbInfo = NULL;
4,299,100✔
2178
      dataInfo.tagList = NULL;
4,299,100✔
2179

2180
      dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
4,299,100✔
2181
      QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
4,299,100✔
2182

2183
      dataInfo.isNewParam = false;
4,299,100✔
2184
      dataInfo.type = pBasicParam->type;
4,299,100✔
2185
      dataInfo.srcOpType = pBasicParam->srcOpType;
4,299,100✔
2186
      dataInfo.tableSeq = pBasicParam->tableSeq;
4,299,100✔
2187

2188
      taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
4,299,100✔
2189
      QUERY_CHECK_NULL(taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
8,598,200✔
2190
      break;
4,299,100✔
2191
    }
2192
    case EX_SRC_TYPE_VSTB_SCAN: {
40,981,148✔
2193
      SSourceDataInfo dataInfo = {0};
40,981,148✔
2194
      dataInfo.status = EX_SOURCE_DATA_NOT_READY;
40,981,148✔
2195
      dataInfo.taskId = pExchangeInfo->pTaskId;
40,981,148✔
2196
      dataInfo.index = pIdx->srcIdx;
40,981,148✔
2197
      dataInfo.window = pBasicParam->window;
40,981,148✔
2198
      dataInfo.groupid = 0;
40,981,148✔
2199
      dataInfo.isNewParam = pBasicParam->isNewParam;
40,981,148✔
2200
      dataInfo.tagList = NULL;
40,981,148✔
2201
      dataInfo.orgTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
40,981,148✔
2202
      QUERY_CHECK_NULL(dataInfo.orgTbInfo, code, lino, _return, terrno);
40,981,148✔
2203
      dataInfo.orgTbInfo->vgId = pBasicParam->orgTbInfo->vgId;
40,981,148✔
2204
      tstrncpy(dataInfo.orgTbInfo->tbName, pBasicParam->orgTbInfo->tbName, TSDB_TABLE_FNAME_LEN);
40,981,148✔
2205
      dataInfo.orgTbInfo->colMap = taosArrayDup(pBasicParam->orgTbInfo->colMap, NULL);
40,981,148✔
2206
      QUERY_CHECK_NULL(dataInfo.orgTbInfo->colMap, code, lino, _return, terrno);
40,981,148✔
2207

2208
      dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
40,981,148✔
2209
      QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
40,981,148✔
2210

2211
      dataInfo.type = pBasicParam->type;
40,981,148✔
2212
      dataInfo.srcOpType = pBasicParam->srcOpType;
40,981,148✔
2213
      dataInfo.tableSeq = pBasicParam->tableSeq;
40,981,148✔
2214

2215
      taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
40,981,148✔
2216
      QUERY_CHECK_NULL( taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
81,962,296✔
2217
      break;
40,981,148✔
2218
    }
2219
    case EX_SRC_TYPE_STB_JOIN_SCAN:
283,248✔
2220
    default: {
2221
      if (pIdx->inUseIdx < 0) {
283,248✔
2222
        SSourceDataInfo dataInfo = {0};
280,596✔
2223
        dataInfo.status = EX_SOURCE_DATA_NOT_READY;
280,596✔
2224
        dataInfo.taskId = pExchangeInfo->pTaskId;
280,596✔
2225
        dataInfo.index = pIdx->srcIdx;
280,596✔
2226
        dataInfo.groupid = 0;
280,596✔
2227
        dataInfo.tagList = NULL;
280,596✔
2228

2229
        dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
280,596✔
2230
        QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
280,596✔
2231

2232
        dataInfo.isNewParam = false;
280,596✔
2233
        dataInfo.type = pBasicParam->type;
280,596✔
2234
        dataInfo.srcOpType = pBasicParam->srcOpType;
280,596✔
2235
        dataInfo.tableSeq = pBasicParam->tableSeq;
280,596✔
2236

2237
        QUERY_CHECK_NULL(taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
561,192✔
2238

2239
        pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
280,596✔
2240
      } else {
2241
        SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
2,652✔
2242
        QUERY_CHECK_NULL(pDataInfo, code, lino, _return, terrno);
2,652✔
2243
        if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
2,652✔
2244
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
2,652✔
2245
        }
2246

2247
        pDataInfo->tagList = NULL;
2,652✔
2248
        pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
2,652✔
2249
        QUERY_CHECK_NULL(pDataInfo->pSrcUidList, code, lino, _return, terrno);
2,652✔
2250

2251
        pDataInfo->groupid = 0;
2,652✔
2252
        pDataInfo->isNewParam = false;
2,652✔
2253
        pDataInfo->type = pBasicParam->type;
2,652✔
2254
        pDataInfo->srcOpType = pBasicParam->srcOpType;
2,652✔
2255
        pDataInfo->tableSeq = pBasicParam->tableSeq;
2,652✔
2256
      }
2257
      break;
283,248✔
2258
    }
2259
  }
2260

2261
  return code;
48,396,762✔
2262
_return:
×
2263
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2264
  return code;
×
2265
}
2266

2267
/*
 * Consume the operator's pending get-param and register the exchange
 * source(s) it describes.
 *
 * A param flagged multiParams is treated as a SExchangeOperatorBatchParam and
 * every entry of its pBatchs hash is added in turn, stopping at the first
 * failure; otherwise the single embedded basic param is added.
 *
 * The get-param is released and pOperator->pOperatorGetParam is cleared on
 * every exit. The batch path used to return early on error and leave the
 * param attached, unlike the single-param path.
 *
 * Returns TSDB_CODE_SUCCESS or the first error from addSingleExchangeSource().
 */
int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
  SExchangeInfo*               pExchangeInfo = pOperator->info;
  int32_t                      code = TSDB_CODE_SUCCESS;
  SExchangeOperatorBasicParam* pBasicParam = NULL;
  SExchangeOperatorParam*      pParam = (SExchangeOperatorParam*)pOperator->pOperatorGetParam->value;
  if (pParam->multiParams) {
    SExchangeOperatorBatchParam* pBatch = (SExchangeOperatorBatchParam*)pOperator->pOperatorGetParam->value;
    int32_t                      iter = 0;
    while (NULL != (pBasicParam = tSimpleHashIterate(pBatch->pBatchs, pBasicParam, &iter))) {
      code = addSingleExchangeSource(pOperator, pBasicParam);
      if (code) {
        // Stop at the first failure, but still release the param below.
        break;
      }
    }
  } else {
    pBasicParam = &pParam->basic;
    code = addSingleExchangeSource(pOperator, pBasicParam);
  }

  freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
  pOperator->pOperatorGetParam = NULL;

  return code;
}
2291

2292
/*
 * Prepare the exchange operator for loading data from its remote sources.
 *
 * Nothing is done when there is no pending get-param and the operator is
 * either dynamic or already opened. Otherwise:
 *   - a dynamic operator registers the sources described by the get-param;
 *   - the current source index is reset to 0 for a not-yet-opened dynamic
 *     sequential-load operator, and always in stream mode;
 *   - a get-param that is still attached is passed to storeNotifyInfo() and
 *     detached (released here unless it is flagged reUse);
 *   - outside stream mode, non-sequential loading starts the concurrent load
 *     and records openedTs;
 *   - the operator is marked opened.
 *
 * On failure the error is logged, stored in the task and raised through
 * T_LONG_JMP; the trailing return mirrors the original control flow.
 */
int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
  SExchangeInfo* pExchangeInfo = pOperator->info;
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;

  if ((OPTR_IS_OPENED(pOperator) && !pExchangeInfo->dynamicOp &&
       NULL == pOperator->pOperatorGetParam) ||
      (pExchangeInfo->dynamicOp && NULL == pOperator->pOperatorGetParam)) {
    qDebug("%s, skip prepare, opened:%d, dynamicOp:%d, getParam:%p",
      GET_TASKID(pOperator->pTaskInfo), OPTR_IS_OPENED(pOperator),
      pExchangeInfo->dynamicOp, pOperator->pOperatorGetParam);
    return TSDB_CODE_SUCCESS;
  }

  if (pExchangeInfo->dynamicOp) {
    code = addDynamicExchangeSource(pOperator);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  // Explicit grouping of the original `a && (b && c) || d`: '&&' binds
  // tighter than '||', so stream mode resets the cursor unconditionally.
  // NOTE(review): confirm this grouping is the intended one.
  if ((pOperator->status == OP_NOT_OPENED &&
       (pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData)) ||
      IS_STREAM_MODE(pOperator->pTaskInfo)) {
    pExchangeInfo->current = 0;
  }

  if (NULL != pOperator->pOperatorGetParam) {
    SOperatorParam* pGetParam = pOperator->pOperatorGetParam;
    storeNotifyInfo(pOperator);

    if (!pGetParam->reUse) {
      freeOperatorParam(pGetParam, OP_GET_PARAM);
    } else {
      /**
        The param is referenced by getParam, and it will be freed by
        the parent operator after getting next block.
      */
      pGetParam->reUse = false;
    }
    pOperator->pOperatorGetParam = NULL;
  }

  if (!IS_STREAM_MODE(pOperator->pTaskInfo) && !pExchangeInfo->seqLoadData) {
    code = prepareConcurrentlyLoad(pOperator);
    QUERY_CHECK_CODE(code, lino, _end);
    pExchangeInfo->openedTs = taosGetTimestampUs();
  }

  OPTR_SET_OPENED(pOperator);
  qDebug("%s prepare load complete", pOperator->pTaskInfo->id.str);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }
  return code;
}
2352

2353
/*
 * Apply group offset (soffset), group limit (slimit) and row limit/offset to
 * one data block.
 *
 * Returns PROJECT_RETRIEVE_CONTINUE when the caller should keep pulling data
 * (block skipped, emptied, or not yet full while holdDataInBuf is set), and
 * PROJECT_RETRIEVE_DONE when the block should be handed out or the operator
 * has finished.
 */
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
  SExecTaskInfo* pTask = pOperator->pTaskInfo;

  // Phase 1: skip whole groups while a group offset is still pending.
  if (pLimitInfo->remainGroupOffset > 0) {
    if (0 == pLimitInfo->currentGroupId) {
      // very first group seen: remember it and drop its data
      pLimitInfo->currentGroupId = pBlock->info.id.groupId;
      blockDataCleanup(pBlock);
      return PROJECT_RETRIEVE_CONTINUE;
    }

    if (pBlock->info.id.groupId != pLimitInfo->currentGroupId) {
      // a new group starts: one more group has been skipped
      pLimitInfo->remainGroupOffset -= 1;

      if (pLimitInfo->remainGroupOffset > 0) {
        // still inside the offset, discard this block as well
        blockDataCleanup(pBlock);
        return PROJECT_RETRIEVE_CONTINUE;
      }
    }

    // record the group this block belongs to
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
  }

  // Phase 2: a group switch closes the previous group.
  if (0 != pLimitInfo->currentGroupId && pBlock->info.id.groupId != pLimitInfo->currentGroupId) {
    pLimitInfo->numOfOutputGroups += 1;

    if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
      // group limit reached: nothing more to output
      pOperator->status = OP_EXEC_DONE;
      blockDataCleanup(pBlock);
      return PROJECT_RETRIEVE_DONE;
    }

    // start counting afresh for the next group
    resetLimitInfoForNextGroup(pLimitInfo);

    // rows of the previous group are still in the block
    if (pBlock->info.rows > 0) {
      return PROJECT_RETRIEVE_DONE;
    }
  }

  // Phase 3: row-level limit/offset inside the current group.
  pLimitInfo->currentGroupId = pBlock->info.id.groupId;

  bool reachedLimit = applyLimitOffset(pLimitInfo, pBlock, pTask);
  if (0 == pBlock->info.rows) {
    return PROJECT_RETRIEVE_CONTINUE;
  }

  if (reachedLimit && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
    setOperatorCompleted(pOperator);
    return PROJECT_RETRIEVE_DONE;
  }

  // todo optimize performance
  // With slimit/soffset present, results of several rounds cannot be packed
  // together because they may belong to different groups.
  if (holdDataInBuf && (pBlock->info.rows < pOperator->resultInfo.threshold) && !hasSlimitOffsetInfo(pLimitInfo)) {
    // not full enough yet, keep accumulating output in the buffer
    return PROJECT_RETRIEVE_CONTINUE;
  }

  return PROJECT_RETRIEVE_DONE;
}
2418

2419
/*
 * Block on the exchange's `ready` semaphore.
 *
 * When the task has a worker callback, beforeBlocking() is invoked before the
 * wait and afterRecoverFromBlocking() after it, both with the callback's pool.
 * The first failing step stores its code in the task and returns it;
 * otherwise TSDB_CODE_SUCCESS is returned.
 *
 * NOTE(review): afterRecoverFromBlocking() is not called when the wait itself
 * fails - confirm that is intended.
 */
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  int32_t        rc = TSDB_CODE_SUCCESS;

  if (NULL != pTaskInfo->pWorkerCb) {
    rc = pTaskInfo->pWorkerCb->beforeBlocking(pTaskInfo->pWorkerCb->pPool);
    if (TSDB_CODE_SUCCESS != rc) {
      goto _failed;
    }
  }

  rc = tsem_wait(&pExchangeInfo->ready);
  if (TSDB_CODE_SUCCESS != rc) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(rc));
    goto _failed;
  }

  if (NULL != pTaskInfo->pWorkerCb) {
    rc = pTaskInfo->pWorkerCb->afterRecoverFromBlocking(pTaskInfo->pWorkerCb->pPool);
    if (TSDB_CODE_SUCCESS != rc) {
      goto _failed;
    }
  }

  return TSDB_CODE_SUCCESS;

_failed:
  // record the failure on the task and report it to the caller
  pTaskInfo->code = rc;
  return pTaskInfo->code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc