• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5011

03 Apr 2026 03:59PM UTC coverage: 72.3% (+0.008%) from 72.292%
#5011

push

travis-ci

web-flow
merge: from main to 3.0 branch #35067

4053 of 5985 new or added lines in 68 files covered. (67.72%)

732 existing lines in 143 files now uncovered.

257430 of 356056 relevant lines covered (72.3%)

131834103.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.97
/source/libs/executor/src/exchangeoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "operator.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "tdatablock.h"
23
#include "thash.h"
24
#include "tmsg.h"
25
#include "tref.h"
26
#include "trpc.h"
27

28
typedef struct SFetchRspHandleWrapper {
29
  int64_t  exchangeId;
30
  int32_t  sourceIndex;
31
  int64_t  seqId;
32
} SFetchRspHandleWrapper;
33

34
typedef struct SSourceDataInfo {
35
  int32_t             index;
36
  int64_t             seqId;
37
  SRWLatch            lock;
38
  SRetrieveTableRsp*  pRsp;
39
  uint64_t            totalRows;
40
  int64_t             startTime;
41
  int32_t             code;
42
  EX_SOURCE_STATUS    status;
43
  const char*         taskId;
44
  SArray*             pSrcUidList;
45
  int32_t             srcOpType;
46
  bool                tableSeq;
47
  char*               decompBuf;
48
  int32_t             decompBufSize;
49
  SOrgTbInfo*         orgTbInfo;
50
  SArray*             batchOrgTbInfo; // SArray<SOrgTbInfo>
51
  SArray*             tagList;
52
  EExchangeSourceType type;
53
  bool                isNewParam;
54
  STimeWindow         window;
55
  uint64_t            groupid;
56
  bool                fetchSent; // need reset
57
  uint64_t            fetchTimes;   // per-source fetch count
58
  int64_t             fetchCostUs;  // per-source total RPC round-trip (us)
59
} SSourceDataInfo;
60

61
static void destroyExchangeOperatorInfo(void* param);
62
static void freeBlock(void* pParam);
63
static void freeSourceDataInfo(void* param);
64
static void setAllSourcesCompleted(SOperatorInfo* pOperator);
65

66
static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code);
67
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex);
68
static int32_t getCompletedSources(const SArray* pArray, int32_t* pRes);
69
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator);
70
static void    storeNotifyInfo(SOperatorInfo* pOperator);
71
static int32_t seqLoadRemoteData(SOperatorInfo* pOperator);
72
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator);
73
static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock,
74
                                 bool holdDataInBuf);
75
static int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo);
76

77
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo);
78

79
static bool isVstbScan(SSourceDataInfo* pDataInfo) {return pDataInfo->type == EX_SRC_TYPE_VSTB_SCAN; }
33,314,507✔
80
static bool isVstbWinScan(SSourceDataInfo* pDataInfo) { return pDataInfo->type == EX_SRC_TYPE_VSTB_WIN_SCAN; }
×
81
static bool isVstbAggScan(SSourceDataInfo* pDataInfo) { return pDataInfo->type == EX_SRC_TYPE_VSTB_AGG_SCAN; }
×
82
static bool isVstbTagScan(SSourceDataInfo* pDataInfo) { return pDataInfo->type == EX_SRC_TYPE_VSTB_TAG_SCAN; }
18,680,846✔
83
static bool isStbJoinScan(SSourceDataInfo* pDataInfo) { return pDataInfo->type == EX_SRC_TYPE_STB_JOIN_SCAN; }
×
84

85

86
static void streamSequenciallyLoadRemoteData(SOperatorInfo* pOperator,
15,806,055✔
87
                                             SExchangeInfo* pExchangeInfo,
88
                                             SExecTaskInfo* pTaskInfo) {
89
  int32_t code = 0;
15,806,055✔
90
  int32_t lino = 0;
15,806,055✔
91
  int64_t startTs = taosGetTimestampUs();  
15,806,532✔
92
  int32_t  totalSources = (int32_t)taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
15,806,532✔
93
  int32_t completed = 0;
15,806,293✔
94
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
15,806,293✔
95
  if (code != TSDB_CODE_SUCCESS) {
15,806,293✔
96
    pTaskInfo->code = code;
×
97
    T_LONG_JMP(pTaskInfo->env, code);
×
98
  }
99
  if (completed == totalSources) {
15,806,293✔
100
    qDebug("%s no load since all sources completed, completed:%d, totalSources:%d", pTaskInfo->id.str, completed, totalSources);
2,496,760✔
101
    setAllSourcesCompleted(pOperator);
2,496,760✔
102
    return;
2,668,446✔
103
  }
104

105
  SSourceDataInfo* pDataInfo = NULL;
13,309,533✔
106
  SStreamRuntimeFuncInfo* pStream = &pTaskInfo->pStreamRuntimeInfo->funcInfo;
13,309,533✔
107

108
  while (1) {
5,657,011✔
109
    if (pExchangeInfo->current < 0) {
18,966,305✔
110
      qDebug("current %d and all sources complted, totalSources:%d", pExchangeInfo->current, totalSources);
140,376✔
111
      setAllSourcesCompleted(pOperator);
140,376✔
112
      return;
140,376✔
113
    }
114
    
115
    if (pExchangeInfo->current >= totalSources) {
18,825,969✔
116
      completed = 0;
9,560,485✔
117
      code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
9,560,485✔
118
      if (code != TSDB_CODE_SUCCESS) {
9,560,724✔
119
        pTaskInfo->code = code;
×
120
        T_LONG_JMP(pTaskInfo->env, code);
×
121
      }
122
      if (completed == totalSources) {
9,560,724✔
123
        qDebug("stop to load since all sources complted, completed:%d, totalSources:%d", completed, totalSources);
4,091,410✔
124
        setAllSourcesCompleted(pOperator);
4,091,410✔
125
        return;
4,091,410✔
126
      }
127
      
128
      pExchangeInfo->current = 0;
5,469,314✔
129
    }
130

131
    qDebug("%s start stream exchange %p idx:%d fetch", GET_TASKID(pTaskInfo), pExchangeInfo, pExchangeInfo->current);
14,734,798✔
132

133
    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
14,735,036✔
134
    if (!pDataInfo) {
14,735,036✔
135
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
136
      pTaskInfo->code = terrno;
×
137
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
138
    }
139

140
    if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
14,735,036✔
141
      pExchangeInfo->current++;
638,590✔
142
      continue;
638,590✔
143
    }
144

145
    if (!IS_STREAM_SINGLE_GRP(pTaskInfo) && pStream->pGroupReadInfos) {
14,096,446✔
NEW
146
      SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
×
NEW
147
      if (!pDataInfo) {
×
NEW
148
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
NEW
149
        pTaskInfo->code = terrno;
×
NEW
150
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
151
      }
152

NEW
153
      if (pSource->fetchMsgType == TDMT_STREAM_FETCH) {
×
NEW
154
        SArray** ppNode = tSimpleHashGet(pStream->pGroupReadInfos, &pSource->addr.nodeId, sizeof(pSource->addr.nodeId));
×
NEW
155
        if (NULL == ppNode) {
×
NEW
156
          pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
×
NEW
157
          pExchangeInfo->current++;
×
NEW
158
          continue;
×
159
        }
160

NEW
161
        pStream->curNodeId = pSource->addr.nodeId;
×
NEW
162
        pStream->curGrpRead = *ppNode;
×
163
      }
164
    }
165

166
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
14,096,446✔
167

168
    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
14,096,446✔
169
    if (code != TSDB_CODE_SUCCESS) {
14,096,685✔
170
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
171
      pTaskInfo->code = code;
×
172
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
173
    }
174

175
    while (true) {
961✔
176
      recordOpExecBeforeDownstream(pOperator);
14,097,646✔
177
      code = exchangeWait(pOperator, pExchangeInfo);
14,097,646✔
178
      recordOpExecAfterDownstream(pOperator, 0);
14,097,616✔
179

180
      if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
14,097,383✔
181
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
758✔
182
      }
183

184
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
14,096,625✔
185
      if (pDataInfo->seqId != currSeqId) {
14,096,362✔
186
        qDebug("%s seq rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", 
961✔
187
            GET_TASKID(pTaskInfo), pDataInfo->seqId, pExchangeInfo, currSeqId);
188
        taosMemoryFreeClear(pDataInfo->pRsp);
961✔
189
        continue;
961✔
190
      }
191

192
      break;
14,095,135✔
193
    }
194

195
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
14,095,135✔
196
    if (!pSource) {
14,095,411✔
197
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
198
      pTaskInfo->code = terrno;
×
199
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
200
    }
201

202
    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
14,095,411✔
203
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
239✔
204
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
205
             tstrerror(pDataInfo->code));
206
      pTaskInfo->code = pDataInfo->code;
239✔
207
      T_LONG_JMP(pTaskInfo->env, code);
239✔
208
    }
209

210
    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
14,095,172✔
211
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
14,095,172✔
212

213
    if (pRsp->numOfRows == 0) {
14,095,172✔
214
      qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
5,018,699✔
215
             " execId:%d idx %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", pDataInfo,
216
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
217
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);
218

219
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
5,018,699✔
220
      if (isVstbScan(pDataInfo) || isVstbTagScan(pDataInfo)) {
5,018,699✔
221
        pExchangeInfo->current = -1;
140,376✔
222
      } else {
223
        pExchangeInfo->current += 1;
4,878,323✔
224
      }
225
      taosMemoryFreeClear(pDataInfo->pRsp);
5,018,699✔
226
      continue;
5,018,421✔
227
    }
228

229
    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
9,076,463✔
230
    TAOS_CHECK_EXIT(code);
9,076,726✔
231

232
    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
9,076,726✔
233
    if (pRsp->completed == 1) {
9,076,726✔
234
      qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
2,669,893✔
235
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%d", pDataInfo,
236
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
237
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
238
             pExchangeInfo->current + 1, totalSources);
239

240
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
2,669,863✔
241
      if (isVstbScan(pDataInfo)) {
2,669,863✔
242
        pExchangeInfo->current = -1;
×
243
        taosMemoryFreeClear(pDataInfo->pRsp);
×
244
        continue;
×
245
      }
246
    } else {
247
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d idx:%d numOfRows:%" PRId64
6,406,833✔
248
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
249
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
250
             pExchangeInfo->current, pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
251
    }
252

253
    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
9,076,726✔
254
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;
9,076,726✔
255

256
    pExchangeInfo->current++;
9,076,726✔
257

258
    taosMemoryFreeClear(pDataInfo->pRsp);
9,076,726✔
259
    return;
9,076,726✔
260
  }
261

262
_exit:
×
263

264
  if (code) {
×
265
    pTaskInfo->code = code;
×
266
    qError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
×
267
  }
268
}
269

270

271
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
241,200,133✔
272
                                           SExecTaskInfo* pTaskInfo) {
273
  int32_t code = 0;
241,200,133✔
274
  int32_t lino = 0;
241,200,133✔
275
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
241,200,133✔
276
  int32_t completed = 0;
241,199,883✔
277
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
241,199,224✔
278
  if (code != TSDB_CODE_SUCCESS) {
241,200,518✔
279
    pTaskInfo->code = code;
×
280
    T_LONG_JMP(pTaskInfo->env, code);
×
281
  }
282
  if (completed == totalSources) {
241,200,518✔
283
    setAllSourcesCompleted(pOperator);
76,573,077✔
284
    return;
76,574,267✔
285
  }
286

287
  SSourceDataInfo* pDataInfo = NULL;
164,627,441✔
288

289
  while (1) {
26,370,830✔
290
    qDebug("prepare wait for ready, %p, %s", pExchangeInfo, GET_TASKID(pTaskInfo));
190,998,271✔
291
    recordOpExecBeforeDownstream(pOperator);
190,998,271✔
292
    code = exchangeWait(pOperator, pExchangeInfo);
190,998,271✔
293
    recordOpExecAfterDownstream(pOperator, 0);
190,998,765✔
294

295
    if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
190,999,308✔
296
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
1,137✔
297
    }
298

299
    for (int32_t i = 0; i < totalSources; ++i) {
349,309,726✔
300
      pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
349,309,183✔
301
      QUERY_CHECK_NULL(pDataInfo, code, lino, _exit, terrno);
349,309,183✔
302
      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
349,309,183✔
303
        continue;
121,408,403✔
304
      }
305

306
      if (pDataInfo->status != EX_SOURCE_DATA_READY) {
227,901,323✔
307
        continue;
36,903,695✔
308
      }
309

310
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
190,997,628✔
311
      if (pDataInfo->seqId != currSeqId) {
190,997,628✔
312
        qDebug("concurrent rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pDataInfo->seqId, pExchangeInfo, currSeqId);
×
313
        taosMemoryFreeClear(pDataInfo->pRsp);
×
314
        break;
×
315
      }
316

317
      if (pDataInfo->code != TSDB_CODE_SUCCESS) {
190,996,542✔
318
        code = pDataInfo->code;
3,324✔
319
        TAOS_CHECK_EXIT(code);
3,324✔
320
      }
321

322
      tmemory_barrier();
190,993,761✔
323
      SRetrieveTableRsp*     pRsp = pDataInfo->pRsp;
190,993,761✔
324
      SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
190,992,675✔
325
      QUERY_CHECK_NULL(pSource, code, lino, _exit, terrno);
190,993,231✔
326

327
      // todo
328
      SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
190,993,231✔
329
      if (pRsp->numOfRows == 0) {
190,993,774✔
330
        if (NULL != pDataInfo->pSrcUidList && !isVstbScan(pDataInfo)) {
49,732,939✔
331
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
×
332
          code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
×
333
          if (code != TSDB_CODE_SUCCESS) {
×
334
            taosMemoryFreeClear(pDataInfo->pRsp);
×
335
            TAOS_CHECK_EXIT(code);
×
336
          }
337
        } else {
338
          pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
49,732,939✔
339
          qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
49,732,939✔
340
                 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", try next %d/%" PRIzu, pDataInfo,
341
                 GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
342
                 pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows, i + 1, totalSources);
343
          taosMemoryFreeClear(pDataInfo->pRsp);
49,732,939✔
344
        }
345
        break;
49,732,939✔
346
      }
347

348
      TAOS_CHECK_EXIT(doExtractResultBlocks(pExchangeInfo, pDataInfo));
141,261,365✔
349

350
      SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
141,261,365✔
351
      updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pDataInfo->startTime, pOperator);
141,260,805✔
352
      pDataInfo->totalRows += pRetrieveRsp->numOfRows;
141,261,365✔
353

354
      if (pRsp->completed == 1) {
141,261,365✔
355
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
127,562,138✔
356
        qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
127,562,138✔
357
               " execId:%d index:%d completed, blocks:%d, numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64
358
               ", totalRows:%" PRIu64 ", total:%.2f Kb, try next %d/%" PRIzu, pDataInfo,
359
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
360
               pRsp->numOfBlocks, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows,
361
               pLoadInfo->totalSize / 1024.0, i + 1, totalSources);
362
      } else {
363
        qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
13,699,227✔
364
               " execId:%d blocks:%d, numOfRows:%" PRId64 ", totalRows:%" PRIu64 ", total:%.2f Kb",
365
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
366
               pRsp->numOfBlocks, pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
367
      }
368

369
      taosMemoryFreeClear(pDataInfo->pRsp);
141,261,365✔
370

371
      if ((pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED || NULL != pDataInfo->pSrcUidList) && !isVstbScan(pDataInfo) && !isVstbTagScan(pDataInfo)) {
141,261,365✔
372
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
13,699,227✔
373
        code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
13,699,227✔
374
        if (code != TSDB_CODE_SUCCESS) {
13,699,227✔
375
          taosMemoryFreeClear(pDataInfo->pRsp);
×
376
          TAOS_CHECK_EXIT(code);
×
377
        }
378
      }
379
      
380
      return;
141,260,525✔
381
    }  // end loop
382

383
    int32_t complete1 = 0;
49,733,482✔
384
    code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &complete1);
49,732,939✔
385
    if (code != TSDB_CODE_SUCCESS) {
49,732,939✔
386
      pTaskInfo->code = code;
×
387
      T_LONG_JMP(pTaskInfo->env, code);
×
388
    }
389
    if (complete1 == totalSources) {
49,732,939✔
390
      qDebug("all sources are completed, %s", GET_TASKID(pTaskInfo));
23,362,109✔
391
      return;
23,362,109✔
392
    }
393
  }
394

395
_exit:
3,324✔
396

397
  if (code) {
3,324✔
398
    pTaskInfo->code = code;
3,324✔
399
    qError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
3,324✔
400
  }
401
}
402

403
static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
666,773,680✔
404
  int32_t        code = TSDB_CODE_SUCCESS;
666,773,680✔
405
  SExchangeInfo* pExchangeInfo = pOperator->info;
666,773,680✔
406
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
666,774,186✔
407

408
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
666,773,680✔
409

410
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
666,773,015✔
411
  if (pOperator->status == OP_EXEC_DONE) {
666,773,400✔
412
    qDebug("%s all %" PRIzu " source(s) are exhausted, total rows:%" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
×
413
           GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize,
414
           pLoadInfo->totalElapsed / 1000.0);
415
    return NULL;
×
416
  }
417

418
  // we have buffered retrieved datablock, return it directly
419
  SSDataBlock* p = NULL;
666,773,650✔
420
  if (taosArrayGetSize(pExchangeInfo->pResultBlockList) > 0) {
666,773,272✔
421
    p = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
366,045,816✔
422
    taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
366,045,816✔
423
  }
424

425
  if (p != NULL) {
666,772,393✔
426
    void* tmp = taosArrayPush(pExchangeInfo->pRecycledBlocks, &p);
366,045,816✔
427
    if (!tmp) {
366,045,816✔
428
      code = terrno;
×
429
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
430
      pTaskInfo->code = code;
×
431
      T_LONG_JMP(pTaskInfo->env, code);
×
432
    }
433
    return p;
366,045,816✔
434
  } else {
435
    if (pExchangeInfo->seqLoadData && (IS_NON_STREAM_MODE(pTaskInfo) || IS_STREAM_SINGLE_GRP(pTaskInfo))) {
300,726,577✔
436
      code = seqLoadRemoteData(pOperator);
43,721,053✔
437
      if (code != TSDB_CODE_SUCCESS) {
43,719,823✔
438
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
1,299✔
439
        pTaskInfo->code = code;
1,299✔
440
        T_LONG_JMP(pTaskInfo->env, code);
1,299✔
441
      }
442
    } else if (IS_STREAM_MODE(pOperator->pTaskInfo))   {
257,006,280✔
443
      streamSequenciallyLoadRemoteData(pOperator, pExchangeInfo, pTaskInfo);
15,806,054✔
444
    } else {
445
      concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
241,200,518✔
446
    }
447
    if (TSDB_CODE_SUCCESS != pOperator->pTaskInfo->code) {
300,723,671✔
448
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(pOperator->pTaskInfo->code));
3,354✔
449
      T_LONG_JMP(pTaskInfo->env, pOperator->pTaskInfo->code);
3,354✔
450
    }
451
    
452
    if (taosArrayGetSize(pExchangeInfo->pResultBlockList) == 0) {
300,719,939✔
453
      qDebug("empty resultBlockList");
118,495,390✔
454
      return NULL;
118,495,390✔
455
    } else {
456
      p = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
182,223,989✔
457
      taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
182,224,549✔
458
      void* tmp = taosArrayPush(pExchangeInfo->pRecycledBlocks, &p);
182,224,927✔
459
      if (!tmp) {
182,224,897✔
460
        code = terrno;
×
461
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
462
        pTaskInfo->code = code;
×
463
        T_LONG_JMP(pTaskInfo->env, code);
×
464
      }
465

466
      qDebug("block with rows:%" PRId64 " loaded", p->info.rows);
182,224,897✔
467
      return p;
182,224,927✔
468
    }
469
  }
470
}
471

472
static int32_t loadRemoteDataNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
664,782,409✔
473
  int32_t        code = TSDB_CODE_SUCCESS;
664,782,409✔
474
  int32_t        lino = 0;
664,782,409✔
475
  SExchangeInfo* pExchangeInfo = pOperator->info;
664,782,409✔
476
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
664,783,191✔
477

478
  qDebug("%s start to load from exchange %p", pTaskInfo->id.str, pExchangeInfo);
664,783,964✔
479

480
  code = pOperator->fpSet._openFn(pOperator);
664,785,897✔
481
  QUERY_CHECK_CODE(code, lino, _end);
664,784,944✔
482

483
  if (pOperator->status == OP_EXEC_DONE) {
664,784,944✔
484
    (*ppRes) = NULL;
148,234✔
485
    return code;
148,234✔
486
  }
487

488
  while (1) {
2,136,560✔
489
    SSDataBlock* pBlock = doLoadRemoteDataImpl(pOperator);
666,774,425✔
490
    if (pBlock == NULL) {
666,766,133✔
491
      (*ppRes) = NULL;
118,495,390✔
492
      return code;
118,495,390✔
493
    }
494

495
    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL, NULL);
548,270,743✔
496
    QUERY_CHECK_CODE(code, lino, _end);
548,270,743✔
497

498
    if (blockDataGetNumOfRows(pBlock) == 0) {
548,270,743✔
499
      qDebug("rows 0 block got, continue next load");
3,390✔
500
      continue;
3,390✔
501
    }
502

503
    SLimitInfo* pLimitInfo = &pExchangeInfo->limitInfo;
548,266,793✔
504
    if (hasLimitOffsetInfo(pLimitInfo)) {
548,267,353✔
505
      int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock, false);
4,230,910✔
506
      if (status == PROJECT_RETRIEVE_CONTINUE) {
4,230,910✔
507
        qDebug("limit retrieve continue");
2,133,170✔
508
        continue;
2,133,170✔
509
      } else if (status == PROJECT_RETRIEVE_DONE) {
2,097,740✔
510
        if (pBlock->info.rows == 0) {
2,097,740✔
511
          setOperatorCompleted(pOperator);
×
512
          (*ppRes) = NULL;
×
513
          return code;
×
514
        } else {
515
          (*ppRes) = pBlock;
2,097,740✔
516
          return code;
2,097,740✔
517
        }
518
      }
519
    } else {
520
      (*ppRes) = pBlock;
544,036,065✔
521
      qDebug("block with rows %" PRId64 " returned in exechange", pBlock->info.rows);
544,036,065✔
522
      return code;
544,035,270✔
523
    }
524
  }
525

526
_end:
×
527

528
  if (code != TSDB_CODE_SUCCESS) {
×
529
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
530
    pTaskInfo->code = code;
×
531
    T_LONG_JMP(pTaskInfo->env, code);
×
532
  } else {
533
    qDebug("empty block returned in exchange");
×
534
  }
535
  
536
  (*ppRes) = NULL;
×
537
  return code;
×
538
}
539

540
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
112,930,245✔
541
  pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
112,930,245✔
542
  if (pInfo->pSourceDataInfo == NULL) {
112,928,920✔
543
    return terrno;
×
544
  }
545

546
  if (pInfo->dynamicOp) {
112,928,672✔
547
    return TSDB_CODE_SUCCESS;
6,830,477✔
548
  }
549

550
  int32_t len = strlen(id) + 1;
106,097,810✔
551
  pInfo->pTaskId = taosMemoryCalloc(1, len);
106,097,810✔
552
  if (!pInfo->pTaskId) {
106,097,182✔
553
    return terrno;
×
554
  }
555
  tstrncpy(pInfo->pTaskId, id, len);
106,098,251✔
556
  for (int32_t i = 0; i < numOfSources; ++i) {
281,836,059✔
557
    SSourceDataInfo dataInfo = {0};
175,735,491✔
558
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
175,733,518✔
559
    dataInfo.taskId = pInfo->pTaskId;
175,733,518✔
560
    dataInfo.index = i;
175,734,335✔
561
    SSourceDataInfo* pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
175,734,335✔
562
    if (pDs == NULL) {
175,734,418✔
563
      taosArrayDestroyEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
×
564
      return terrno;
×
565
    }
566
    qDebug("init source data info %d, pDs:%p, status:%d", i, pDs, pDs->status);
175,734,418✔
567
  }
568

569
  return TSDB_CODE_SUCCESS;
106,100,568✔
570
}
571

572
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo, const char* id) {
112,929,919✔
573
  size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);
112,929,919✔
574

575
  if (numOfSources == 0) {
112,928,633✔
576
    qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
×
577
    return TSDB_CODE_INVALID_PARA;
×
578
  }
579
  pInfo->pFetchRpcHandles = taosArrayInit(numOfSources, sizeof(int64_t));
112,928,633✔
580
  if (!pInfo->pFetchRpcHandles) {
112,930,645✔
581
    return terrno;
×
582
  }
583
  void* ret = taosArrayReserve(pInfo->pFetchRpcHandles, numOfSources);
112,929,385✔
584
  if (!ret) {
112,929,715✔
585
    return terrno;
×
586
  }
587

588
  pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
112,929,715✔
589
  if (pInfo->pSources == NULL) {
112,930,136✔
590
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
591
    return terrno;
×
592
  }
593

594
  if (pExNode->node.dynamicOp) {
112,930,667✔
595
    pInfo->pHashSources = tSimpleHashInit(numOfSources * 2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
6,830,477✔
596
    if (NULL == pInfo->pHashSources) {
6,830,477✔
597
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
598
      return terrno;
×
599
    }
600
  }
601

602
  for (int32_t i = 0; i < numOfSources; ++i) {
307,742,067✔
603
    SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
194,813,252✔
604
    if (!pNode) {
194,810,513✔
605
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
606
      return terrno;
×
607
    }
608
    void* tmp = taosArrayPush(pInfo->pSources, pNode);
194,810,513✔
609
    if (!tmp) {
194,813,630✔
610
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
611
      return terrno;
×
612
    }
613
    SExchangeSrcIndex idx = {.srcIdx = i, .inUseIdx = -1};
194,813,630✔
614
    int32_t           code =
615
        tSimpleHashPut(pInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
194,814,161✔
616
    if (pInfo->pHashSources && code != TSDB_CODE_SUCCESS) {
194,811,213✔
617
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
618
      return code;
×
619
    }
620
  }
621

622
  initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);
112,928,815✔
623
  int64_t refId = taosAddRef(fetchObjRefPool, pInfo);
112,930,667✔
624
  if (refId < 0) {
112,928,294✔
625
    int32_t code = terrno;
×
626
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
627
    return code;
×
628
  } else {
629
    pInfo->self = refId;
112,928,294✔
630
  }
631

632
  return initDataSource(numOfSources, pInfo, id);
112,928,294✔
633
}
634

635
int32_t resetExchangeOperState(SOperatorInfo* pOper) {
11,633,041✔
636
  SExchangeInfo* pInfo = pOper->info;
11,633,041✔
637
  SExchangePhysiNode* pPhynode = (SExchangePhysiNode*)pOper->pPhyNode;
11,634,739✔
638

639
  qDebug("%s reset exchange op:%p info:%p", pOper->pTaskInfo->id.str, pOper, pInfo);
11,634,687✔
640

641
  (void)atomic_add_fetch_64(&pInfo->seqId, 1);
11,636,672✔
642
  pOper->status = OP_NOT_OPENED;
11,635,639✔
643
  pInfo->current = 0;
11,635,639✔
644
  pInfo->loadInfo.totalElapsed = 0;
11,635,639✔
645
  pInfo->loadInfo.totalRows = 0;
11,635,639✔
646
  pInfo->loadInfo.totalSize = 0;
11,635,639✔
647
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pSourceDataInfo); ++i) {
33,599,705✔
648
    SSourceDataInfo* pDataInfo = taosArrayGet(pInfo->pSourceDataInfo, i);
21,964,036✔
649
    taosWLockLatch(&pDataInfo->lock);
21,964,036✔
650
    taosMemoryFreeClear(pDataInfo->decompBuf);
21,964,304✔
651
    taosMemoryFreeClear(pDataInfo->pRsp);
21,964,304✔
652

653
    pDataInfo->totalRows = 0;
21,964,304✔
654
    pDataInfo->code = 0;
21,964,304✔
655
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
21,964,304✔
656
    pDataInfo->fetchSent = false;
21,964,304✔
657
    pDataInfo->fetchTimes = 0;
21,964,304✔
658
    pDataInfo->fetchCostUs = 0;
21,964,304✔
659
    taosWUnLockLatch(&pDataInfo->lock);
21,964,066✔
660
  }
661

662
  if (pInfo->dynamicOp) {
11,635,639✔
663
    taosArrayClearEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
2,246,338✔
664
  } 
665

666
  taosArrayClearEx(pInfo->pResultBlockList, freeBlock);
11,635,639✔
667
  taosArrayClearEx(pInfo->pRecycledBlocks, freeBlock);
11,635,639✔
668

669
  blockDataCleanup(pInfo->pDummyBlock);
11,635,639✔
670

671
  void   *data = NULL;
11,635,639✔
672
  int32_t iter = 0;
11,635,639✔
673
  while ((data = tSimpleHashIterate(pInfo->pHashSources, data, &iter))) {
16,375,269✔
674
    ((SExchangeSrcIndex *)data)->inUseIdx = -1;
4,739,630✔
675
  }
676
  
677
  pInfo->limitInfo = (SLimitInfo){0};
11,635,639✔
678
  initLimitInfo(pPhynode->node.pLimit, pPhynode->node.pSlimit, &pInfo->limitInfo);
11,635,639✔
679

680
  return 0;
11,635,639✔
681
}
682

683
static int32_t exchangeGetExplainExecInfo(SOperatorInfo* pOptr,
1,687,917✔
684
                                          void** pOptrExplain, uint32_t* len) {
685
  const SExchangeInfo* pExchangeInfo = pOptr->info;
1,687,917✔
686
  int32_t numSources = (int32_t)taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
1,687,917✔
687

688
  SExchangeExplainInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeExplainInfo));
1,687,917✔
689
  if (!pInfo) {
1,687,917✔
690
    return terrno;
×
691
  }
692

693
  pInfo->mode = pExchangeInfo->seqLoadData ? 1 : 0;
1,687,917✔
694
  pInfo->numSources = numSources;
1,687,917✔
695

696
  /* all sources are exhausted, thus no need to lock the sources data info */
697
  for (int32_t i = 0; i < numSources; ++i) {
3,694,045✔
698
    const SSourceDataInfo* pSrc = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
2,006,128✔
699
    pInfo->avgFetchTimes += (double)pSrc->fetchTimes / numSources;
2,006,128✔
700
    pInfo->avgFetchRows += (double)pSrc->totalRows / numSources;
2,006,128✔
701
    pInfo->avgFetchCost += (double)pSrc->fetchCostUs / numSources;
2,006,128✔
702
    pInfo->maxFetchTimes = TMAX(pInfo->maxFetchTimes, pSrc->fetchTimes);
2,006,128✔
703
    pInfo->maxFetchRows = TMAX(pInfo->maxFetchRows, pSrc->totalRows);
2,006,128✔
704
    pInfo->maxFetchCost = TMAX(pInfo->maxFetchCost, pSrc->fetchCostUs);
2,006,128✔
705
  }
706

707
  *pOptrExplain = pInfo;
1,687,917✔
708
  *len = sizeof(SExchangeExplainInfo);
1,687,917✔
709
  return TSDB_CODE_SUCCESS;
1,687,917✔
710
}
711

712
int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo,
112,929,741✔
713
                                   SOperatorInfo** pOptrInfo) {
714
  QRY_PARAM_CHECK(pOptrInfo);
112,929,741✔
715

716
  int32_t        code = 0;
112,930,245✔
717
  int32_t        lino = 0;
112,930,245✔
718
  SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
112,930,245✔
719
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
112,927,610✔
720
  if (pInfo == NULL || pOperator == NULL) {
112,927,524✔
UNCOV
721
    code = terrno;
×
722
    goto _error;
×
723
  }
724
  initOperatorCostInfo(pOperator);
112,928,055✔
725

726
  pInfo->isExchange = true;
112,929,919✔
727
  pOperator->pPhyNode = pExNode;
112,931,045✔
728
  pInfo->dynamicOp = pExNode->node.dynamicOp;
112,930,541✔
729
  code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo));
112,929,919✔
730
  QUERY_CHECK_CODE(code, lino, _error);
112,930,660✔
731

732
  code = tsem_init(&pInfo->ready, 0, 0);
112,930,660✔
733
  QUERY_CHECK_CODE(code, lino, _error);
112,930,667✔
734

735
  pInfo->pDummyBlock = createDataBlockFromDescNode(pExNode->node.pOutputDataBlockDesc);
112,930,667✔
736
  QUERY_CHECK_NULL(pInfo->pDummyBlock, code, lino, _error, terrno);
112,930,502✔
737

738
  pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES);
112,930,502✔
739
  QUERY_CHECK_NULL(pInfo->pResultBlockList, code, lino, _error, terrno);
112,930,136✔
740
  pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES);
112,929,571✔
741
  QUERY_CHECK_NULL(pInfo->pRecycledBlocks, code, lino, _error, terrno);
112,930,921✔
742

743
  SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self};
112,930,921✔
744
  code = qAppendTaskStopInfo(pTaskInfo, &stopInfo);
112,930,143✔
745
  QUERY_CHECK_CODE(code, lino, _error);
112,930,502✔
746

747
  pInfo->seqLoadData = pExNode->seqRecvData;
112,930,502✔
748
  pInfo->dynTbname = pExNode->dynTbname;
112,930,502✔
749
  if (pInfo->dynTbname) {
112,929,702✔
750
    pInfo->seqLoadData = true;
34,140✔
751
  }
752
  pInfo->pTransporter = pTransporter;
112,929,702✔
753

754
  setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo,
112,928,015✔
755
                  pTaskInfo);
756
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
112,929,722✔
757

758
  code = filterInitFromNode((SNode*)pExNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
112,930,638✔
759
                            pTaskInfo->pStreamRuntimeInfo);
112,928,801✔
760
  QUERY_CHECK_CODE(code, lino, _error);
112,930,124✔
761
  qTrace("%s exchange op:%p", __func__, pOperator);
112,930,124✔
762
  pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteDataNext, NULL,
112,930,524✔
763
                                         destroyExchangeOperatorInfo, optrDefaultBufFn,
764
                                         exchangeGetExplainExecInfo, optrDefaultGetNextExtFn, NULL);
765
  setOperatorResetStateFn(pOperator, resetExchangeOperState);
112,930,667✔
766
  *pOptrInfo = pOperator;
112,929,971✔
767
  return TSDB_CODE_SUCCESS;
112,929,971✔
768

769
_error:
×
770
  if (code != TSDB_CODE_SUCCESS) {
×
771
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
772
    pTaskInfo->code = code;
×
773
  }
774
  if (pInfo != NULL) {
×
775
    doDestroyExchangeOperatorInfo(pInfo);
×
776
  }
777

778
  if (pOperator != NULL) {
×
779
    pOperator->info = NULL;
×
780
    destroyOperator(pOperator);
×
781
  }
782
  return code;
×
783
}
784

785
void destroyExchangeOperatorInfo(void* param) {
112,931,045✔
786
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
112,931,045✔
787
  int32_t        code = taosRemoveRef(fetchObjRefPool, pExInfo->self);
112,931,045✔
788
  if (code != TSDB_CODE_SUCCESS) {
112,929,984✔
789
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
790
  }
791
}
112,929,984✔
792

793
void freeBlock(void* pParam) {
321,792,542✔
794
  SSDataBlock* pBlock = *(SSDataBlock**)pParam;
321,792,542✔
795
  blockDataDestroy(pBlock);
321,792,542✔
796
}
321,791,773✔
797

798
void freeSourceDataInfo(void* p) {
180,191,746✔
799
  SSourceDataInfo* pInfo = (SSourceDataInfo*)p;
180,191,746✔
800
  taosMemoryFreeClear(pInfo->decompBuf);
180,191,746✔
801
  taosMemoryFreeClear(pInfo->pRsp);
180,191,746✔
802

803
  pInfo->decompBufSize = 0;
180,191,746✔
804
}
180,191,746✔
805

806
void doDestroyExchangeOperatorInfo(void* param) {
112,931,045✔
807
  if (param == NULL) {
112,931,045✔
808
    return;
×
809
  }
810
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
112,931,045✔
811
  if (pExInfo->pFetchRpcHandles) {
112,931,045✔
812
    for (int32_t i = 0; i < pExInfo->pFetchRpcHandles->size; ++i) {
307,741,785✔
813
      int64_t* pRpcHandle = taosArrayGet(pExInfo->pFetchRpcHandles, i);
194,813,919✔
814
      if (*pRpcHandle > 0) {
194,813,919✔
815
        SDownstreamSourceNode* pSource = taosArrayGet(pExInfo->pSources, i);
10,044,521✔
816
        (void)asyncFreeConnById(pExInfo->pTransporter, *pRpcHandle);
10,044,521✔
817
      }
818
    }
819
    taosArrayDestroy(pExInfo->pFetchRpcHandles);
112,929,378✔
820
  }
821

822
  taosArrayDestroy(pExInfo->pSources);
112,932,055✔
823
  taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo);
112,929,254✔
824

825
  taosArrayDestroyEx(pExInfo->pResultBlockList, freeBlock);
112,930,289✔
826
  taosArrayDestroyEx(pExInfo->pRecycledBlocks, freeBlock);
112,930,514✔
827

828
  blockDataDestroy(pExInfo->pDummyBlock);
112,930,667✔
829
  tSimpleHashCleanup(pExInfo->pHashSources);
112,931,045✔
830

831
  int32_t code = tsem_destroy(&pExInfo->ready);
112,929,759✔
832
  if (code != TSDB_CODE_SUCCESS) {
112,930,515✔
833
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
834
  }
835
  taosMemoryFreeClear(pExInfo->pTaskId);
112,930,515✔
836

837
  taosMemoryFreeClear(param);
112,930,858✔
838
}
839

840
int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
249,543,026✔
841
  SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;
249,543,026✔
842

843
  taosMemoryFreeClear(pMsg->pEpSet);
249,543,026✔
844
  SExchangeInfo* pExchangeInfo = taosAcquireRef(fetchObjRefPool, pWrapper->exchangeId);
249,580,924✔
845
  if (pExchangeInfo == NULL) {
249,580,613✔
846
    qWarn("failed to acquire exchange operator, since it may have been released, %p", pExchangeInfo);
19,328✔
847
    taosMemoryFree(pMsg->pData);
19,328✔
848
    return TSDB_CODE_SUCCESS;
19,328✔
849
  }
850

851
  int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
249,561,285✔
852
  if (pWrapper->seqId != currSeqId) {
249,571,315✔
853
    qDebug("rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pWrapper->seqId, pExchangeInfo, currSeqId);
×
854
    taosMemoryFree(pMsg->pData);
×
855
    code = taosReleaseRef(fetchObjRefPool, pWrapper->exchangeId);
×
856
    if (code != TSDB_CODE_SUCCESS) {
×
857
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
858
    }
859
    return TSDB_CODE_SUCCESS;
×
860
  }
861

862
  int32_t          index = pWrapper->sourceIndex;
249,566,438✔
863

864
  qDebug("%s exchange %p %dth source got rsp, code:%d, rsp:%p", pExchangeInfo->pTaskId, pExchangeInfo, index, code, pMsg->pData);
249,555,179✔
865

866
  int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, index);
249,563,432✔
867
  if (pRpcHandle != NULL) {
249,565,790✔
868
    int32_t ret = asyncFreeConnById(pExchangeInfo->pTransporter, *pRpcHandle);
249,569,881✔
869
    if (ret != 0) {
249,538,842✔
870
      qDebug("failed to free rpc handle, code:%s, %p", tstrerror(ret), pExchangeInfo);
12,203,754✔
871
    }
872
    *pRpcHandle = -1;
249,538,842✔
873
  }
874

875
  SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);
249,506,612✔
876
  if (!pSourceDataInfo) {
249,564,406✔
877
    return terrno;
×
878
  }
879

880
  if (0 == code && NULL == pMsg->pData) {
249,564,406✔
881
    qError("invalid rsp msg, msgType:%d, len:%d", pMsg->msgType, pMsg->len);
×
882
    code = TSDB_CODE_QRY_INVALID_MSG;
×
883
  }
884

885
  taosWLockLatch(&pSourceDataInfo->lock);
249,561,613✔
886
  if (code == TSDB_CODE_SUCCESS) {
249,583,411✔
887
    pSourceDataInfo->fetchCostUs += taosGetTimestampUs() - pSourceDataInfo->startTime;
249,555,908✔
888
    pSourceDataInfo->fetchTimes++;
249,534,202✔
889

890
    pSourceDataInfo->seqId = pWrapper->seqId;
249,508,801✔
891
    pSourceDataInfo->pRsp = pMsg->pData;
249,527,526✔
892

893
    SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
249,501,364✔
894
    pRsp->numOfRows = htobe64(pRsp->numOfRows);
249,548,103✔
895
    pRsp->compLen = htonl(pRsp->compLen);
249,534,100✔
896
    pRsp->payloadLen = htonl(pRsp->payloadLen);
249,530,329✔
897
    pRsp->numOfCols = htonl(pRsp->numOfCols);
249,482,734✔
898
    pRsp->useconds = htobe64(pRsp->useconds);
249,505,087✔
899
    pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);
249,442,489✔
900

901
    qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%" PRId64 ", %p", pSourceDataInfo->taskId, index,
249,478,952✔
902
           pRsp->numOfBlocks, pRsp->numOfRows, pExchangeInfo);
903
  } else {
904
    taosMemoryFree(pMsg->pData);
8,173✔
905
    pSourceDataInfo->code = rpcCvtErrCode(code);
8,173✔
906
    if (pSourceDataInfo->code != code) {
8,173✔
907
      qError("%s fetch rsp received, index:%d, error:%s, cvted error: %s, %p", pSourceDataInfo->taskId, index,
×
908
             tstrerror(code), tstrerror(pSourceDataInfo->code), pExchangeInfo);
909
    } else {
910
      qError("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code),
8,173✔
911
             pExchangeInfo);
912
    }
913
  }
914

915
  tmemory_barrier();
249,502,977✔
916
  pSourceDataInfo->status = EX_SOURCE_DATA_READY;
249,502,977✔
917
  taosWUnLockLatch(&pSourceDataInfo->lock);
249,530,175✔
918
  
919
  code = tsem_post(&pExchangeInfo->ready);
249,532,107✔
920
  if (code != TSDB_CODE_SUCCESS) {
249,551,600✔
921
    qError("failed to invoke post when fetch rsp is ready, code:%s, %p", tstrerror(code), pExchangeInfo);
×
922
    return code;
×
923
  }
924

925
  code = taosReleaseRef(fetchObjRefPool, pWrapper->exchangeId);
249,551,600✔
926
  if (code != TSDB_CODE_SUCCESS) {
249,587,886✔
927
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
928
  }
929
  return code;
249,579,083✔
930
}
931

932
int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, bool tableSeq) {
272,535✔
933
  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
272,535✔
934
  if (NULL == *ppRes) {
272,535✔
935
    return terrno;
×
936
  }
937

938
  STableScanOperatorParam* pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
272,535✔
939
  if (NULL == pScan) {
272,535✔
940
    taosMemoryFreeClear(*ppRes);
×
941
    return terrno;
×
942
  }
943

944
  pScan->paramType = DYN_TYPE_SCAN_PARAM;
272,535✔
945
  pScan->pUidList = taosArrayDup(pUidList, NULL);
272,535✔
946
  if (NULL == pScan->pUidList) {
272,535✔
947
    taosMemoryFree(pScan);
×
948
    taosMemoryFreeClear(*ppRes);
×
949
    return terrno;
×
950
  }
951
  pScan->dynType = DYN_TYPE_STB_JOIN;
272,535✔
952
  pScan->tableSeq = tableSeq;
272,535✔
953
  pScan->pOrgTbInfo = NULL;
272,535✔
954
  pScan->pBatchTbInfo = NULL;
272,535✔
955
  pScan->pTagList = NULL;
272,535✔
956
  pScan->isNewParam = false;
272,535✔
957
  pScan->window.skey = INT64_MAX;
272,535✔
958
  pScan->window.ekey = INT64_MIN;
272,535✔
959
  pScan->notifyToProcess = false;
272,535✔
960
  pScan->notifyTs = 0;
272,535✔
961

962
  (*ppRes)->opType = srcOpType;
272,535✔
963
  (*ppRes)->downstreamIdx = 0;
272,535✔
964
  (*ppRes)->value = pScan;
272,535✔
965
  (*ppRes)->pChildren = NULL;
272,535✔
966
  (*ppRes)->reUse = false;
272,535✔
967

968
  return TSDB_CODE_SUCCESS;
272,535✔
969
}
970

971
int32_t buildTableScanOperatorParamEx(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, SOrgTbInfo *pMap, bool tableSeq, STimeWindow *window, bool isNewParam, ETableScanDynType type) {
41,021,560✔
972
  int32_t                  code = TSDB_CODE_SUCCESS;
41,021,560✔
973
  int32_t                  lino = 0;
41,021,560✔
974
  STableScanOperatorParam* pScan = NULL;
41,021,560✔
975

976
  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
41,021,560✔
977
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
41,021,560✔
978

979
  pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
41,021,560✔
980
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);
41,021,560✔
981

982
  pScan->paramType = DYN_TYPE_SCAN_PARAM;
41,021,560✔
983
  if (pUidList) {
41,021,560✔
984
    pScan->pUidList = taosArrayDup(pUidList, NULL);
41,021,560✔
985
    QUERY_CHECK_NULL(pScan->pUidList, code, lino, _return, terrno);
41,021,560✔
986
  } else {
987
    pScan->pUidList = NULL;
×
988
  }
989

990
  if (pMap) {
41,021,560✔
991
    pScan->pOrgTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
40,268,302✔
992
    QUERY_CHECK_NULL(pScan->pOrgTbInfo, code, lino, _return, terrno);
40,268,302✔
993

994
    pScan->pOrgTbInfo->vgId = pMap->vgId;
40,268,302✔
995
    tstrncpy(pScan->pOrgTbInfo->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);
40,268,302✔
996

997
    pScan->pOrgTbInfo->colMap = taosArrayDup(pMap->colMap, NULL);
40,268,302✔
998
    QUERY_CHECK_NULL(pScan->pOrgTbInfo->colMap, code, lino, _return, terrno);
40,268,302✔
999
  } else {
1000
    pScan->pOrgTbInfo = NULL;
753,258✔
1001
  }
1002
  pScan->pTagList = NULL;
41,021,560✔
1003
  pScan->pBatchTbInfo = NULL;
41,021,560✔
1004

1005

1006
  pScan->dynType = type;
41,021,560✔
1007
  pScan->tableSeq = tableSeq;
41,021,560✔
1008
  pScan->window.skey = window->skey;
41,021,560✔
1009
  pScan->window.ekey = window->ekey;
41,021,560✔
1010
  pScan->isNewParam = isNewParam;
41,021,560✔
1011
  pScan->notifyToProcess = false;
41,021,560✔
1012
  pScan->notifyTs = 0;
41,021,560✔
1013
  (*ppRes)->opType = srcOpType;
41,021,560✔
1014
  (*ppRes)->downstreamIdx = 0;
41,021,560✔
1015
  (*ppRes)->value = pScan;
41,021,560✔
1016
  (*ppRes)->pChildren = NULL;
41,021,560✔
1017
  (*ppRes)->reUse = false;
41,021,560✔
1018

1019
  return code;
41,021,560✔
1020
_return:
×
1021
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
1022
  taosMemoryFreeClear(*ppRes);
×
1023
  if (pScan) {
×
1024
    taosArrayDestroy(pScan->pUidList);
×
1025
    if (pScan->pOrgTbInfo) {
×
1026
      taosArrayDestroy(pScan->pOrgTbInfo->colMap);
×
1027
      taosMemoryFreeClear(pScan->pOrgTbInfo);
×
1028
    }
1029
    taosMemoryFree(pScan);
×
1030
  }
1031
  return code;
×
1032
}
1033

1034
/**
1035
  @brief build the table scan operator param for notify message
1036
*/
1037
int32_t buildTableScanOperatorParamNotify(SOperatorParam** ppRes,
226,476✔
1038
                                          int32_t srcOpType, TSKEY notifyTs) {
1039
  if (srcOpType != 0 && srcOpType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
226,476✔
1040
    qWarn("%s, invalid srcOpType:%d", __func__, srcOpType);
×
1041
    return TSDB_CODE_INVALID_PARA;
×
1042
  }
1043
  int32_t code = TSDB_CODE_SUCCESS;
226,476✔
1044
  int32_t lino = 0;
226,476✔
1045
  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
226,476✔
1046
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
226,476✔
1047

1048
  STableScanOperatorParam* pTsParam =
452,952✔
1049
    taosMemoryCalloc(1, sizeof(STableScanOperatorParam));
226,476✔
1050
  QUERY_CHECK_NULL(pTsParam, code, lino, _return, terrno);
226,476✔
1051

1052
  pTsParam->paramType = NOTIFY_TYPE_SCAN_PARAM;
226,476✔
1053
  pTsParam->notifyToProcess = true;
226,476✔
1054
  pTsParam->notifyTs = notifyTs;
226,476✔
1055

1056
  (*ppRes)->opType = srcOpType != 0 ? srcOpType :
226,476✔
1057
                                      QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
1058
  (*ppRes)->downstreamIdx = 0;
226,476✔
1059
  (*ppRes)->value = pTsParam;
226,476✔
1060
  (*ppRes)->pChildren = NULL;
226,476✔
1061
  /* param is not reusable when it is transferred by message */
1062
  (*ppRes)->reUse = false;
226,476✔
1063

1064
_return:
226,476✔
1065
  if (TSDB_CODE_SUCCESS != code) {
226,476✔
1066
    qError("%s failed at %d, failed to build scan operator msg:%s",
×
1067
           __func__, lino, tstrerror(code));
1068
    taosMemoryFreeClear(*ppRes);
×
1069
    if (pTsParam) {
×
1070
      taosMemoryFree(pTsParam);
×
1071
    }
1072
  }
1073
  return code;
226,476✔
1074
}
1075

1076
int32_t buildTableScanOperatorParamBatchInfo(SOperatorParam** ppRes, uint64_t groupid, SArray* pUidList, int32_t srcOpType, SArray *pBatchMap, SArray *pTagList, bool tableSeq, STimeWindow *window, bool isNewParam) {
3,025,749✔
1077
  int32_t                  code = TSDB_CODE_SUCCESS;
3,025,749✔
1078
  int32_t                  lino = 0;
3,025,749✔
1079
  STableScanOperatorParam* pScan = NULL;
3,025,749✔
1080

1081
  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
3,025,749✔
1082
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
3,025,749✔
1083

1084
  pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
3,025,749✔
1085
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);
3,025,749✔
1086

1087
  pScan->paramType = DYN_TYPE_SCAN_PARAM;
3,025,749✔
1088
  pScan->groupid = groupid;
3,025,749✔
1089
  if (pUidList) {
3,025,749✔
1090
    pScan->pUidList = taosArrayDup(pUidList, NULL);
3,025,749✔
1091
    QUERY_CHECK_NULL(pScan->pUidList, code, lino, _return, terrno);
3,025,749✔
1092
  } else {
1093
    pScan->pUidList = NULL;
×
1094
  }
1095
  pScan->pOrgTbInfo = NULL;
3,025,749✔
1096

1097
  if (pBatchMap) {
3,025,749✔
1098
    pScan->pBatchTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
3,025,749✔
1099
    QUERY_CHECK_NULL(pScan->pBatchTbInfo, code, lino, _return, terrno);
3,025,749✔
1100
    for (int32_t i = 0; i < taosArrayGetSize(pBatchMap); i++) {
8,898,951✔
1101
      SOrgTbInfo *pSrcInfo = taosArrayGet(pBatchMap, i);
5,873,202✔
1102
      SOrgTbInfo batchInfo = {0};
5,873,202✔
1103
      batchInfo.vgId = pSrcInfo->vgId;
5,873,202✔
1104
      tstrncpy(batchInfo.tbName, pSrcInfo->tbName, TSDB_TABLE_FNAME_LEN);
5,873,202✔
1105
      batchInfo.colMap = taosArrayDup(pSrcInfo->colMap, NULL);
5,873,202✔
1106
      QUERY_CHECK_NULL(batchInfo.colMap, code, lino, _return, terrno);
5,873,202✔
1107
      SOrgTbInfo *pDstInfo = taosArrayPush(pScan->pBatchTbInfo, &batchInfo);
5,873,202✔
1108
      QUERY_CHECK_NULL(pDstInfo, code, lino, _return, terrno);
5,873,202✔
1109
    }
1110
  } else {
1111
    pScan->pBatchTbInfo = NULL;
×
1112
  }
1113

1114
  if (pTagList) {
3,025,749✔
1115
    pScan->pTagList = taosArrayInit(1, sizeof(STagVal));
843,062✔
1116
    QUERY_CHECK_NULL(pScan->pTagList, code, lino, _return, terrno);
843,062✔
1117

1118
    for (int32_t i = 0; i < taosArrayGetSize(pTagList); ++i) {
5,221,032✔
1119
      STagVal *pSrcTag = (STagVal*)taosArrayGet(pTagList, i);
4,377,970✔
1120
      STagVal  dstTag;
4,377,970✔
1121
      dstTag.type = pSrcTag->type;
4,377,970✔
1122
      dstTag.cid = pSrcTag->cid;
4,377,970✔
1123
      if (IS_VAR_DATA_TYPE(pSrcTag->type)) {
4,377,970✔
1124
        dstTag.nData = pSrcTag->nData;
1,900,410✔
1125
        dstTag.pData = taosMemoryMalloc(dstTag.nData);
1,900,410✔
1126
        QUERY_CHECK_NULL(dstTag.pData, code, lino, _return, terrno);
1,900,410✔
1127
        memcpy(dstTag.pData, pSrcTag->pData, dstTag.nData);
1,900,410✔
1128
      } else {
1129
        dstTag.i64 = pSrcTag->i64;
2,477,560✔
1130
      }
1131

1132
      QUERY_CHECK_NULL(taosArrayPush(pScan->pTagList, &dstTag), code, lino, _return, terrno);
8,755,940✔
1133
    }
1134
  } else {
1135
    pScan->pTagList = NULL;
2,182,687✔
1136
  }
1137

1138

1139
  pScan->dynType = DYN_TYPE_VSTB_BATCH_SCAN;
3,025,749✔
1140
  pScan->tableSeq = tableSeq;
3,025,749✔
1141
  pScan->window.skey = window->skey;
3,025,749✔
1142
  pScan->window.ekey = window->ekey;
3,025,749✔
1143
  pScan->isNewParam = isNewParam;
3,025,749✔
1144
  pScan->notifyToProcess = false;
3,025,749✔
1145
  pScan->notifyTs = 0;
3,025,749✔
1146
  (*ppRes)->opType = srcOpType;
3,025,749✔
1147
  (*ppRes)->downstreamIdx = 0;
3,025,749✔
1148
  (*ppRes)->value = pScan;
3,025,749✔
1149
  (*ppRes)->pChildren = NULL;
3,025,749✔
1150
  (*ppRes)->reUse = false;
3,025,749✔
1151

1152
  return code;
3,025,749✔
1153
_return:
×
1154
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
1155
  taosMemoryFreeClear(*ppRes);
×
1156
  if (pScan) {
×
1157
    taosArrayDestroy(pScan->pUidList);
×
1158
    if (pScan->pBatchTbInfo) {
×
1159
      taosArrayDestroy(pScan->pBatchTbInfo);
×
1160
    }
1161
    taosMemoryFree(pScan);
×
1162
  }
1163
  return code;
×
1164
}
1165

1166
/*
1167
 * Build hash-agg operator get-param for dynamic virtual-table aggregation.
1168
 *
1169
 * @param ppRes Output operator param.
1170
 * @param groupid Group id bound to this batch.
1171
 * @param pUidList Source table uid list.
1172
 * @param pBatchMap Batch table metadata map.
1173
 * @param pTagList Optional tag value list.
1174
 * @param tableSeq Whether to keep table-sequential mode.
1175
 * @param window Time window for downstream scan.
1176
 * @param isNewParam Whether downstream should treat this as a new param batch.
1177
 *
1178
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
1179
 */
1180
int32_t buildAggOperatorParam(SOperatorParam** ppRes, uint64_t groupid, SArray* pUidList, SArray* pBatchMap,
1,680,909✔
1181
                              SArray* pTagList, bool tableSeq, STimeWindow* window, bool isNewParam) {
1182
  int32_t                  code = TSDB_CODE_SUCCESS;
1,680,909✔
1183
  int32_t                  lino = 0;
1,680,909✔
1184
  SOperatorParam*          pParam = NULL;
1,680,909✔
1185

1186
  pParam = taosMemoryCalloc(1, sizeof(SOperatorParam));
1,680,909✔
1187
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno);
1,680,909✔
1188

1189
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL;
1,680,909✔
1190
  pParam->downstreamIdx = 0;
1,680,909✔
1191
  pParam->value = NULL;
1,680,909✔
1192
  pParam->pChildren = NULL;
1,680,909✔
1193
  pParam->reUse = false;
1,680,909✔
1194

1195
  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
1,680,909✔
1196
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno);
1,680,909✔
1197

1198
  SOperatorParam* pTableScanParam = NULL;
1,680,909✔
1199
  code = buildTableScanOperatorParamBatchInfo(&pTableScanParam, groupid, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
1,680,909✔
1200
                                              pBatchMap, pTagList, tableSeq, window, isNewParam);
1201
  QUERY_CHECK_CODE(code, lino, _return);
1,680,909✔
1202

1203
  QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pTableScanParam), code, lino, _return, terrno);
3,361,818✔
1204

1205
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
1,680,909✔
1206
  pParam->downstreamIdx = 0;
1,680,909✔
1207
  pParam->value = NULL;
1,680,909✔
1208
  pParam->reUse = false;
1,680,909✔
1209

1210
  *ppRes = pParam;
1,680,909✔
1211
  return code;
1,680,909✔
1212

1213
_return:
×
1214
  freeOperatorParam(pParam, OP_GET_PARAM);
×
1215
  qError("%s failed at %d, failed to build agg scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
1216
  return code;
×
1217
}
1218

1219
/*
1220
 * Build hash-interval operator get-param for dynamic virtual-table interval query.
1221
 *
1222
 * @param ppRes Output operator param.
1223
 * @param groupid Group id bound to this batch.
1224
 * @param pUidList Source table uid list.
1225
 * @param pBatchMap Batch table metadata map.
1226
 * @param pTagList Optional tag value list.
1227
 * @param tableSeq Whether to keep table-sequential mode.
1228
 * @param window Time window for downstream scan.
1229
 * @param isNewParam Whether downstream should treat this as a new param batch.
1230
 *
1231
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
1232
 */
1233
static int32_t buildIntervalOperatorParam(SOperatorParam** ppRes, uint64_t groupid, SArray* pUidList, SArray* pBatchMap,
105,176✔
1234
                                          SArray* pTagList, bool tableSeq, STimeWindow* window, bool isNewParam) {
1235
  int32_t         code = TSDB_CODE_SUCCESS;
105,176✔
1236
  int32_t         lino = 0;
105,176✔
1237
  SOperatorParam* pParam = NULL;
105,176✔
1238

1239
  pParam = taosMemoryCalloc(1, sizeof(SOperatorParam));
105,176✔
1240
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno);
105,176✔
1241

1242
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL;
105,176✔
1243
  pParam->downstreamIdx = 0;
105,176✔
1244
  pParam->value = NULL;
105,176✔
1245
  pParam->reUse = false;
105,176✔
1246
  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
105,176✔
1247
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno);
105,176✔
1248

1249
  SOperatorParam* pTableScanParam = NULL;
105,176✔
1250
  code = buildTableScanOperatorParamBatchInfo(&pTableScanParam, groupid, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
105,176✔
1251
                                              pBatchMap, pTagList, tableSeq, window, isNewParam);
1252
  QUERY_CHECK_CODE(code, lino, _return);
105,176✔
1253

1254
  QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pTableScanParam), code, lino, _return, terrno);
210,352✔
1255

1256
  *ppRes = pParam;
105,176✔
1257
  return code;
105,176✔
1258

1259
_return:
×
1260
  freeOperatorParam(pParam, OP_GET_PARAM);
×
1261
  qError("%s failed at %d, failed to build interval scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
1262
  return code;
×
1263
}
1264

1265
int32_t buildTagScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType) {
3,135,278✔
1266
  int32_t                  code = TSDB_CODE_SUCCESS;
3,135,278✔
1267
  int32_t                  lino = 0;
3,135,278✔
1268
  STagScanOperatorParam*   pScan = NULL;
3,135,278✔
1269

1270
  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
3,135,278✔
1271
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
3,135,278✔
1272

1273
  pScan = taosMemoryMalloc(sizeof(STagScanOperatorParam));
3,135,278✔
1274
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);
3,135,278✔
1275
  pScan->vcUid = *(tb_uid_t*)taosArrayGet(pUidList, 0);
3,135,278✔
1276

1277
  (*ppRes)->opType = srcOpType;
3,135,278✔
1278
  (*ppRes)->downstreamIdx = 0;
3,135,278✔
1279
  (*ppRes)->value = pScan;
3,135,278✔
1280
  (*ppRes)->pChildren = NULL;
3,135,278✔
1281
  (*ppRes)->reUse = false;
3,135,278✔
1282

1283
  return code;
3,135,278✔
1284
_return:
×
1285
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
1286
  taosMemoryFreeClear(*ppRes);
×
1287
  if (pScan) {
×
1288
    taosMemoryFree(pScan);
×
1289
  }
1290
  return code;
×
1291
}
1292

1293
static int32_t getCurrentWinCalcTimeRange(SStreamRuntimeFuncInfo* pRuntimeInfo, STimeWindow* pTimeRange) {
4,583,943✔
1294
  if (!pRuntimeInfo || !pTimeRange) {
4,583,943✔
1295
    return TSDB_CODE_INTERNAL_ERROR;
1✔
1296
  }
1297

1298
  SSTriggerCalcParam* pParam = taosArrayGet(pRuntimeInfo->pStreamPesudoFuncVals, pRuntimeInfo->curIdx);
4,583,942✔
1299
  if (!pParam) {
4,583,703✔
1300
    return TSDB_CODE_INTERNAL_ERROR;
×
1301
  }
1302

1303
  switch (pRuntimeInfo->triggerType) {
4,583,703✔
1304
    case STREAM_TRIGGER_SLIDING:
3,390,954✔
1305
      // Unable to distinguish whether there is an interval, all use wstart/wend
1306
      // and the results are equal to those of prevTs/currentTs, using the same address of union.
1307
      pTimeRange->skey = pParam->wstart;  // is equal to wstart
3,390,954✔
1308
      pTimeRange->ekey = pParam->wend;    // is equal to wend
3,390,954✔
1309
      break;
3,390,954✔
1310
    case STREAM_TRIGGER_PERIOD:
274,073✔
1311
      pTimeRange->skey = pParam->prevLocalTime;
274,073✔
1312
      pTimeRange->ekey = pParam->triggerTime;
274,073✔
1313
      break;
274,073✔
1314
    default:
918,915✔
1315
      pTimeRange->skey = pParam->wstart;
918,915✔
1316
      pTimeRange->ekey = pParam->wend;
919,155✔
1317
      break;
918,916✔
1318
  }
1319

1320
  return TSDB_CODE_SUCCESS;
4,583,943✔
1321
}
1322

1323
void clearVtbScanDataInfo(void* pItem) {
53,636,127✔
1324
  SSourceDataInfo *pInfo = (SSourceDataInfo *)pItem;
53,636,127✔
1325
  if (pInfo->orgTbInfo) {
53,636,127✔
1326
    taosArrayDestroy(pInfo->orgTbInfo->colMap);
40,268,302✔
1327
    taosMemoryFreeClear(pInfo->orgTbInfo);
40,268,302✔
1328
  }
1329
  if (pInfo->batchOrgTbInfo) {
53,636,127✔
1330
    for (int32_t i = 0; i < taosArrayGetSize(pInfo->batchOrgTbInfo); ++i) {
8,898,951✔
1331
      SOrgTbInfo* pColMap = taosArrayGet(pInfo->batchOrgTbInfo, i);
5,873,202✔
1332
      if (pColMap) {
5,873,202✔
1333
        taosArrayDestroy(pColMap->colMap);
5,873,202✔
1334
      }
1335
    }
1336
    taosArrayDestroy(pInfo->batchOrgTbInfo);
3,025,749✔
1337
    pInfo->batchOrgTbInfo = NULL;
3,025,749✔
1338
  }
1339
  if (pInfo->tagList) {
53,636,127✔
1340
    taosArrayDestroyEx(pInfo->tagList, destroyTagVal);
843,062✔
1341
    pInfo->tagList = NULL;
843,062✔
1342
  }
1343
  if (pInfo->pSrcUidList) {
53,636,127✔
1344
    taosArrayDestroy(pInfo->pSrcUidList);
43,294,051✔
1345
    pInfo->pSrcUidList = NULL;
43,294,051✔
1346
  }
1347
}
53,636,127✔
1348

1349
int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
256,578,250✔
1350
  int32_t          code = TSDB_CODE_SUCCESS;
256,578,250✔
1351
  int32_t          lino = 0;
256,578,250✔
1352
  SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
256,578,250✔
1353
  QUERY_CHECK_NULL(pDataInfo, code, lino, _end, terrno);
256,575,350✔
1354

1355
  if (EX_SOURCE_DATA_NOT_READY != pDataInfo->status) {
256,575,350✔
1356
    return TSDB_CODE_SUCCESS;
6,829,613✔
1357
  }
1358

1359
  pDataInfo->status = EX_SOURCE_DATA_STARTED;
249,749,988✔
1360
  SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
249,750,612✔
1361
  QUERY_CHECK_NULL(pSource, code, lino, _end, terrno);
249,748,172✔
1362

1363
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
249,748,172✔
1364

1365
  SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
249,747,396✔
1366
  QUERY_CHECK_NULL(pWrapper, code, lino, _end, terrno);
249,745,465✔
1367
  pWrapper->exchangeId = pExchangeInfo->self;
249,745,465✔
1368
  pWrapper->sourceIndex = sourceIndex;
249,747,911✔
1369
  pWrapper->seqId = pExchangeInfo->seqId;
249,749,390✔
1370

1371
  if (pSource->localExec) {
249,744,433✔
1372
    SDataBuf pBuf = {0};
×
1373
    code =
1374
      (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->sId,
×
1375
                                  pTaskInfo->id.queryId, pSource->clientId,
1376
                                  pSource->taskId, 0, pSource->execId,
1377
                                  &pBuf.pData,
1378
                                  pTaskInfo->localFetch.explainRes);
1379
    QUERY_CHECK_CODE(code, lino, _end);
×
1380
    pDataInfo->startTime = taosGetTimestampUs();
×
1381
    code = loadRemoteDataCallback(pWrapper, &pBuf, code);
×
1382
    taosMemoryFreeClear(pWrapper);
×
1383
    QUERY_CHECK_CODE(code, lino, _end);
×
1384
  } else {
1385
    bool needStreamRtInfo = true;
249,749,591✔
1386
    bool needStreamGrpInfo = false;
249,749,591✔
1387
    SResFetchReq req = {0};
249,749,591✔
1388
    req.header.vgId = pSource->addr.nodeId;
249,749,592✔
1389
    req.sId = pSource->sId;
249,747,175✔
1390
    req.clientId = pSource->clientId;
249,745,846✔
1391
    req.taskId = pSource->taskId;
249,747,695✔
1392
    req.queryId = pTaskInfo->id.queryId;
249,746,622✔
1393
    req.execId = pSource->execId;
249,743,731✔
1394
    if (pTaskInfo->pStreamRuntimeInfo) {
249,746,380✔
1395
      req.dynTbname = pExchangeInfo->dynTbname;
14,536,077✔
1396
      req.execId = pTaskInfo->pStreamRuntimeInfo->execId;
14,536,077✔
1397
      req.pStRtFuncInfo = &pTaskInfo->pStreamRuntimeInfo->funcInfo;
14,535,064✔
1398

1399
      if (pSource->fetchMsgType == TDMT_STREAM_FETCH_FROM_RUNNER) {
14,536,077✔
1400
        qDebug("%s stream fetch from runner, execId:%d, %p", GET_TASKID(pTaskInfo), req.execId, pTaskInfo->pStreamRuntimeInfo);
1,677,875✔
1401
      } else if (pSource->fetchMsgType == TDMT_STREAM_FETCH_FROM_CACHE) {
12,857,074✔
1402
        code = getCurrentWinCalcTimeRange(req.pStRtFuncInfo, &req.pStRtFuncInfo->curWindow);
4,584,182✔
1403
        QUERY_CHECK_CODE(code, lino, _end);
4,583,943✔
1404
        needStreamRtInfo = false;
4,583,943✔
1405
        qDebug("%s stream fetch from cache, execId:%d, curWinIdx:%d, time range:[%" PRId64 ", %" PRId64 "]",
4,583,943✔
1406
               GET_TASKID(pTaskInfo), req.execId, req.pStRtFuncInfo->curIdx, req.pStRtFuncInfo->curWindow.skey,
1407
               req.pStRtFuncInfo->curWindow.ekey);
1408
      } else {
1409
        needStreamGrpInfo = true;
8,274,259✔
1410
      }
1411
      
1412
      if (!pDataInfo->fetchSent) {
14,536,330✔
1413
        req.reset = pDataInfo->fetchSent = true;
8,145,600✔
1414
      }
1415
    }
1416

1417
    switch (pDataInfo->type) {
249,743,777✔
1418
      case EX_SRC_TYPE_VSTB_SCAN: {
40,268,302✔
1419
        code = buildTableScanOperatorParamEx(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->orgTbInfo, pDataInfo->tableSeq, &pDataInfo->window, pDataInfo->isNewParam, DYN_TYPE_VSTB_SINGLE_SCAN);
40,268,302✔
1420
        clearVtbScanDataInfo(pDataInfo);
40,268,302✔
1421
        QUERY_CHECK_CODE(code, lino, _end);
40,268,302✔
1422
        break;
40,268,302✔
1423
      }
1424
      case EX_SRC_TYPE_VTB_WIN_SCAN: {
1,596,631✔
1425
        if (pDataInfo->pSrcUidList) {
1,596,631✔
1426
          code = buildTableScanOperatorParamEx(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, NULL, pDataInfo->tableSeq, &pDataInfo->window, false, DYN_TYPE_VSTB_WIN_SCAN);
753,258✔
1427
          taosArrayDestroy(pDataInfo->pSrcUidList);
753,258✔
1428
          pDataInfo->pSrcUidList = NULL;
753,258✔
1429
          QUERY_CHECK_CODE(code, lino, _end);
753,258✔
1430
        }
1431
        break;
1,596,631✔
1432
      }
1433
      case EX_SRC_TYPE_VSTB_TAG_SCAN: {
3,135,278✔
1434
        code = buildTagScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType);
3,135,278✔
1435
        taosArrayDestroy(pDataInfo->pSrcUidList);
3,135,278✔
1436
        pDataInfo->pSrcUidList = NULL;
3,135,278✔
1437
        QUERY_CHECK_CODE(code, lino, _end);
3,135,278✔
1438
        break;
3,135,278✔
1439
      }
1440
      case EX_SRC_TYPE_VSTB_WIN_SCAN:
2,792,045✔
1441
      case EX_SRC_TYPE_VSTB_INTERVAL_SCAN:
1442
      case EX_SRC_TYPE_VSTB_TS_SCAN: {
1443
        if (pDataInfo->batchOrgTbInfo) {
2,792,045✔
1444
          int32_t srcOpType =
1,239,664✔
1445
              (pDataInfo->type == EX_SRC_TYPE_VSTB_TS_SCAN)
1,239,664✔
1446
                  ? QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN
1447
                  : QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
1,239,664✔
1448
          code = buildTableScanOperatorParamBatchInfo(
2,479,328✔
1449
              &req.pOpParam, pDataInfo->groupid, pDataInfo->pSrcUidList, srcOpType,
1450
              pDataInfo->batchOrgTbInfo, pDataInfo->tagList, pDataInfo->tableSeq, &pDataInfo->window,
1,239,664✔
1451
              pDataInfo->isNewParam);
1,239,664✔
1452
          clearVtbScanDataInfo(pDataInfo);
1,239,664✔
1453
          QUERY_CHECK_CODE(code, lino, _end);
1,239,664✔
1454
        }
1455
        break;
2,792,045✔
1456
      }
1457
      case EX_SRC_TYPE_VSTB_PART_INTERVAL_SCAN: {
105,176✔
1458
        if (pDataInfo->batchOrgTbInfo) {
105,176✔
1459
          code = buildIntervalOperatorParam(&req.pOpParam, pDataInfo->groupid, pDataInfo->pSrcUidList,
210,352✔
1460
                                            pDataInfo->batchOrgTbInfo, pDataInfo->tagList, pDataInfo->tableSeq,
105,176✔
1461
                                            &pDataInfo->window, pDataInfo->isNewParam);
105,176✔
1462
          clearVtbScanDataInfo(pDataInfo);
105,176✔
1463
          QUERY_CHECK_CODE(code, lino, _end);
105,176✔
1464
        }
1465
        break;
105,176✔
1466
      }
1467
      case EX_SRC_TYPE_VSTB_AGG_SCAN: {
1,680,909✔
1468
        if (pDataInfo->batchOrgTbInfo) {
1,680,909✔
1469
          code = buildAggOperatorParam(&req.pOpParam, pDataInfo->groupid, pDataInfo->pSrcUidList,
3,361,818✔
1470
                                       pDataInfo->batchOrgTbInfo, pDataInfo->tagList,
1471
                                       pDataInfo->tableSeq, &pDataInfo->window, pDataInfo->isNewParam);
3,361,818✔
1472
          clearVtbScanDataInfo(pDataInfo);
1,680,909✔
1473
          QUERY_CHECK_CODE(code, lino, _end);
1,680,909✔
1474
        }
1475
        break;
1,680,909✔
1476
      }
1477
      case EX_SRC_TYPE_STB_JOIN_SCAN:
200,163,456✔
1478
      default: {
1479
        if (pDataInfo->pSrcUidList) {
200,163,456✔
1480
          code = buildTableScanOperatorParam(&req.pOpParam,
260,094✔
1481
                                             pDataInfo->pSrcUidList,
1482
                                             pDataInfo->srcOpType,
1483
                                             pDataInfo->tableSeq);
260,094✔
1484
          /* source uid list can be reused in vnode size, so only use once */
1485
          taosArrayDestroy(pDataInfo->pSrcUidList);
260,094✔
1486
          pDataInfo->pSrcUidList = NULL;
260,094✔
1487
          QUERY_CHECK_CODE(code, lino, _end);
260,094✔
1488
        }
1489
        if (pExchangeInfo->notifyToSend) {
200,171,273✔
1490
          if (NULL == req.pOpParam) {
226,476✔
1491
            code = buildTableScanOperatorParamNotify(&req.pOpParam,
226,476✔
1492
                                                     pDataInfo->srcOpType,
1493
                                                     pExchangeInfo->notifyTs);
1494
            QUERY_CHECK_CODE(code, lino, _end);
226,476✔
1495
          } else {
1496
            /**
1497
              Currently don't support use the same param for multiple times!
1498
            */
1499
            qError("%s, %s failed, currently don't support use the same param "
×
1500
                   "for multiple times!", GET_TASKID(pTaskInfo), __func__);
1501
            pTaskInfo->code = TSDB_CODE_INVALID_PARA;
×
1502
            taosMemoryFree(pWrapper);
×
1503
            return pTaskInfo->code;
×
1504
          }
1505
          pExchangeInfo->notifyToSend = false;
226,476✔
1506
        }
1507
        break;
200,170,043✔
1508
      }
1509
    }
1510

1511
    int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req, needStreamRtInfo, needStreamGrpInfo);
249,748,384✔
1512
    if (msgSize < 0) {
249,741,958✔
1513
      pTaskInfo->code = msgSize;
×
1514
      taosMemoryFree(pWrapper);
×
1515
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
1516
      return pTaskInfo->code;
×
1517
    }
1518

1519
    void* msg = taosMemoryCalloc(1, msgSize);
249,741,958✔
1520
    if (NULL == msg) {
249,740,670✔
1521
      pTaskInfo->code = terrno;
×
1522
      taosMemoryFree(pWrapper);
×
1523
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
1524
      return pTaskInfo->code;
×
1525
    }
1526

1527
    msgSize = tSerializeSResFetchReq(msg, msgSize, &req, needStreamRtInfo, needStreamGrpInfo);
249,740,670✔
1528
    if (msgSize < 0) {
249,744,113✔
1529
      pTaskInfo->code = msgSize;
×
1530
      taosMemoryFree(pWrapper);
×
1531
      taosMemoryFree(msg);
×
1532
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
1533
      return pTaskInfo->code;
×
1534
    }
1535

1536
    freeOperatorParam(req.pOpParam, OP_GET_PARAM);
249,744,113✔
1537

1538
    qDebug("%s build fetch msg and send to vgId:%d, ep:%s, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
249,743,628✔
1539
           ", seqId:%" PRId64 ", execId:%d, %p, %d/%" PRIzu,
1540
           GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->clientId,
1541
           pSource->taskId, pExchangeInfo->seqId, pSource->execId, pExchangeInfo, sourceIndex, totalSources);
1542

1543
    // send the fetch remote task result reques
1544
    SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
249,749,960✔
1545
    if (NULL == pMsgSendInfo) {
249,746,432✔
1546
      taosMemoryFreeClear(msg);
×
1547
      taosMemoryFree(pWrapper);
×
1548
      qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
×
1549
      pTaskInfo->code = terrno;
×
1550
      return pTaskInfo->code;
×
1551
    }
1552

1553
    pMsgSendInfo->param = pWrapper;
249,746,432✔
1554
    pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
249,749,434✔
1555
    pMsgSendInfo->msgInfo.pData = msg;
249,751,027✔
1556
    pMsgSendInfo->msgInfo.len = msgSize;
249,750,111✔
1557
    pMsgSendInfo->msgType = pSource->fetchMsgType;
249,750,510✔
1558
    pMsgSendInfo->fp = loadRemoteDataCallback;
249,751,013✔
1559
    pMsgSendInfo->requestId = pTaskInfo->id.queryId;
249,750,242✔
1560

1561
    int64_t transporterId = 0;
249,746,656✔
1562
    void* poolHandle = NULL;
249,746,571✔
1563
    pDataInfo->startTime = taosGetTimestampUs();
249,751,419✔
1564
    code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
249,749,739✔
1565
    QUERY_CHECK_CODE(code, lino, _end);
249,751,557✔
1566
    int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, sourceIndex);
249,751,557✔
1567
    *pRpcHandle = transporterId;
249,751,026✔
1568
  }
1569

1570
_end:
249,751,557✔
1571
  if (code != TSDB_CODE_SUCCESS) {
249,751,557✔
1572
    if (pWrapper) {
×
1573
      taosMemoryFree(pWrapper);
×
1574
    }
1575
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1576
  }
1577
  return code;
249,751,026✔
1578
}
1579

1580
/**
1581
  @brief record the data loading metrics of the exchange operator, including
1582
  the number of rows, the data length, and the elapsed time of current load operation.
1583
*/
1584
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows,
187,553,807✔
1585
                          int32_t dataLen, int64_t startTs, SOperatorInfo* pOperator) {
1586
  pInfo->totalRows += numOfRows;
187,553,807✔
1587
  pInfo->totalSize += dataLen;
187,553,807✔
1588
  pInfo->totalElapsed += (taosGetTimestampUs() - startTs);
187,553,807✔
1589
}
187,553,807✔
1590

1591
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart, bool isVstbScan) {
560,826,988✔
1592
  int32_t      code = TSDB_CODE_SUCCESS;
560,826,988✔
1593
  int32_t      lino = 0;
560,826,988✔
1594
  SSDataBlock* pBlock = NULL;
560,826,988✔
1595
  if (isVstbScan) {
560,827,678✔
1596
    blockDataCleanup(pRes);
28,490,723✔
1597
    code = blockDecodeInternal(pRes, pData, (const char**)pNextStart);
28,490,723✔
1598
    QUERY_CHECK_CODE(code, lino, _end);
28,488,691✔
1599
  }
1600
  if (pColList == NULL) {  // data from other sources
560,825,646✔
1601
    blockDataCleanup(pRes);
555,496,766✔
1602
    code = blockDecodeInternal(pRes, pData, (const char**)pNextStart);
555,498,395✔
1603
    QUERY_CHECK_CODE(code, lino, _end);
555,498,860✔
1604
  } else {  // extract data according to pColList
1605
    char* pStart = pData;
5,328,880✔
1606

1607
    int32_t numOfCols = htonl(*(int32_t*)pStart);
5,328,880✔
1608
    pStart += sizeof(int32_t);
5,328,880✔
1609

1610
    // todo refactor:extract method
1611
    SSysTableSchema* pSchema = (SSysTableSchema*)pStart;
5,328,880✔
1612
    for (int32_t i = 0; i < numOfCols; ++i) {
74,424,800✔
1613
      SSysTableSchema* p = (SSysTableSchema*)pStart;
69,095,920✔
1614

1615
      p->colId = htons(p->colId);
69,095,920✔
1616
      p->bytes = htonl(p->bytes);
69,095,920✔
1617
      pStart += sizeof(SSysTableSchema);
69,095,920✔
1618
    }
1619

1620
    pBlock = NULL;
5,328,880✔
1621
    code = createDataBlock(&pBlock);
5,328,880✔
1622
    QUERY_CHECK_CODE(code, lino, _end);
5,328,880✔
1623

1624
    for (int32_t i = 0; i < numOfCols; ++i) {
74,424,800✔
1625
      SColumnInfoData idata = createColumnInfoData(pSchema[i].type, pSchema[i].bytes, pSchema[i].colId);
69,095,920✔
1626
      code = blockDataAppendColInfo(pBlock, &idata);
69,095,920✔
1627
      QUERY_CHECK_CODE(code, lino, _end);
69,095,920✔
1628
    }
1629

1630
    code = blockDecodeInternal(pBlock, pStart, NULL);
5,328,880✔
1631
    QUERY_CHECK_CODE(code, lino, _end);
5,328,880✔
1632

1633
    code = blockDataEnsureCapacity(pRes, pBlock->info.rows);
5,328,880✔
1634
    QUERY_CHECK_CODE(code, lino, _end);
5,328,880✔
1635

1636
    // data from mnode
1637
    pRes->info.dataLoad = 1;
5,328,880✔
1638
    pRes->info.rows = pBlock->info.rows;
5,328,880✔
1639
    pRes->info.scanFlag = pBlock->info.scanFlag;
5,328,880✔
1640
    pRes->info.id.groupId = pBlock->info.id.groupId;
5,328,880✔
1641
    pRes->info.id.baseGId = pBlock->info.id.baseGId;
5,328,880✔
1642
    code = relocateColumnData(pRes, pColList, pBlock->pDataBlock, false);
5,328,880✔
1643
    QUERY_CHECK_CODE(code, lino, _end);
5,328,880✔
1644

1645
    blockDataDestroy(pBlock);
5,328,880✔
1646
    pBlock = NULL;
5,328,880✔
1647
  }
1648

1649
_end:
560,827,740✔
1650
  if (code != TSDB_CODE_SUCCESS) {
560,827,050✔
1651
    blockDataDestroy(pBlock);
×
1652
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1653
  }
1654
  return code;
560,827,050✔
1655
}
1656

1657
void setAllSourcesCompleted(SOperatorInfo* pOperator) {
95,133,311✔
1658
  SExchangeInfo* pExchangeInfo = pOperator->info;
95,133,311✔
1659
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
95,133,311✔
1660

1661
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
95,133,311✔
1662
  size_t               totalSources = taosArrayGetSize(pExchangeInfo->pSources);
95,133,311✔
1663
  qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 ", %.2f Kb, elapsed:%.2f ms",
95,133,311✔
1664
         GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
1665
         pLoadInfo->totalElapsed / 1000.0);
1666

1667
  setOperatorCompleted(pOperator);
95,133,311✔
1668
}
95,133,311✔
1669

1670
int32_t getCompletedSources(const SArray* pArray, int32_t* pRes) {
316,298,304✔
1671
  int32_t code = TSDB_CODE_SUCCESS;
316,298,304✔
1672
  int32_t lino = 0;
316,298,304✔
1673
  size_t  total = taosArrayGetSize(pArray);
316,298,304✔
1674

1675
  int32_t completed = 0;
316,300,381✔
1676
  for (int32_t k = 0; k < total; ++k) {
1,030,995,782✔
1677
    SSourceDataInfo* p = taosArrayGet(pArray, k);
714,696,688✔
1678
    QUERY_CHECK_NULL(p, code, lino, _end, terrno);
714,696,362✔
1679
    if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
714,696,362✔
1680
      qDebug("source %d is completed, info:%p %p", k, pArray, p);
341,142,817✔
1681
      completed += 1;
341,143,382✔
1682
    }
1683
  }
1684

1685
  *pRes = completed;
316,299,094✔
1686
_end:
316,298,070✔
1687
  if (code != TSDB_CODE_SUCCESS) {
316,298,070✔
1688
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1689
  }
1690
  return code;
316,298,223✔
1691
}
1692

1693
int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
112,505,611✔
1694
  SExchangeInfo* pExchangeInfo = pOperator->info;
112,505,611✔
1695
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
112,505,056✔
1696

1697
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
112,505,611✔
1698
  int64_t startTs = taosGetTimestampUs();
112,500,097✔
1699

1700
  // Asynchronously send all fetch requests to all sources.
1701
  for (int32_t i = 0; i < totalSources; ++i) {
297,656,121✔
1702
    int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
185,154,456✔
1703
    if (code != TSDB_CODE_SUCCESS) {
185,155,646✔
1704
      pTaskInfo->code = code;
×
1705
      return code;
×
1706
    }
1707
  }
1708

1709
  int64_t endTs = taosGetTimestampUs();
112,507,048✔
1710
  qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTaskInfo),
112,507,048✔
1711
         totalSources, (endTs - startTs) / 1000.0);
1712

1713
  pOperator->status = OP_RES_TO_RETURN;
112,507,048✔
1714
  if (isTaskKilled(pTaskInfo)) {
112,507,048✔
1715
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1716
  }
1717

1718
  return TSDB_CODE_SUCCESS;
112,507,048✔
1719
}
1720

1721
/**
1722
  @brief store STEP DONE notification info
1723
*/
1724
void storeNotifyInfo(SOperatorInfo* pOperator) {
4,197,244✔
1725
  SExchangeInfo*  pExchangeInfo = pOperator->info;
4,197,244✔
1726
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
4,197,244✔
1727
  SOperatorParam* pGetParam = pOperator->pOperatorGetParam;
4,197,244✔
1728

1729
  SExchangeOperatorParam* pParam = (SExchangeOperatorParam*)pGetParam->value;
4,197,244✔
1730
  if (!pParam->multiParams) {
4,197,244✔
1731
    SExchangeOperatorBasicParam* pBasic = &pParam->basic;
4,197,244✔
1732
    if (pBasic->paramType != NOTIFY_TYPE_EXCHANGE_PARAM) {
4,197,244✔
1733
      qWarn("%s, %s found invalid exchange operator param type %d",
×
1734
             GET_TASKID(pTaskInfo), __func__, pBasic->paramType);
1735
      return;
×
1736
    }
1737

1738
    pExchangeInfo->notifyToSend = true;
4,197,244✔
1739
    pExchangeInfo->notifyTs = pBasic->notifyTs;
4,197,244✔
1740
  } else {
1741
    qWarn("%s, %s found multi params are not supported for notify msg",
×
1742
           GET_TASKID(pTaskInfo), __func__);
1743
  }
1744
}
1745

1746
int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo) {
182,224,131✔
1747
  int32_t            code = TSDB_CODE_SUCCESS;
182,224,131✔
1748
  int32_t            lino = 0;
182,224,131✔
1749
  SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
182,224,131✔
1750
  SSDataBlock*       pb = NULL;
182,224,927✔
1751

1752
  char* pNextStart = pRetrieveRsp->data;
182,224,384✔
1753
  char* pStart = pNextStart;
182,224,121✔
1754

1755
  int32_t index = 0;
182,224,121✔
1756

1757
  if (pRetrieveRsp->compressed) {  // decompress the data
182,224,121✔
1758
    if (pDataInfo->decompBuf == NULL) {
×
1759
      pDataInfo->decompBuf = taosMemoryMalloc(pRetrieveRsp->payloadLen);
×
1760
      QUERY_CHECK_NULL(pDataInfo->decompBuf, code, lino, _end, terrno);
×
1761
      pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
×
1762
    } else {
1763
      if (pDataInfo->decompBufSize < pRetrieveRsp->payloadLen) {
×
1764
        char* p = taosMemoryRealloc(pDataInfo->decompBuf, pRetrieveRsp->payloadLen);
×
1765
        QUERY_CHECK_NULL(p, code, lino, _end, terrno);
×
1766
        if (p != NULL) {
×
1767
          pDataInfo->decompBuf = p;
×
1768
          pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
×
1769
        }
1770
      }
1771
    }
1772
  }
1773

1774
  while (index++ < pRetrieveRsp->numOfBlocks) {
737,724,325✔
1775
    pStart = pNextStart;
555,499,537✔
1776

1777
    if (pDataInfo->type == EX_SRC_TYPE_VSTB_SCAN) {
555,499,537✔
1778
      pb = taosMemoryCalloc(1, sizeof(SSDataBlock));
28,490,723✔
1779
      QUERY_CHECK_NULL(pb, code, lino, _end, terrno);
28,490,723✔
1780
    } else if (taosArrayGetSize(pExchangeInfo->pRecycledBlocks) > 0) {
527,008,201✔
1781
      pb = *(SSDataBlock**)taosArrayPop(pExchangeInfo->pRecycledBlocks);
233,707,093✔
1782
      blockDataCleanup(pb);
233,706,840✔
1783
    } else {
1784
      code = createOneDataBlock(pExchangeInfo->pDummyBlock, false, &pb);
293,301,718✔
1785
      QUERY_CHECK_NULL(pb, code, lino, _end, code);
293,301,878✔
1786
    }
1787

1788
    int32_t compLen = *(int32_t*)pStart;
555,499,151✔
1789
    pStart += sizeof(int32_t);
555,498,673✔
1790

1791
    int32_t rawLen = *(int32_t*)pStart;
555,499,402✔
1792
    pStart += sizeof(int32_t);
555,499,402✔
1793
    QUERY_CHECK_CONDITION((compLen <= rawLen && compLen != 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
555,498,965✔
1794

1795
    pNextStart = pStart + compLen;
555,498,965✔
1796
    if (pRetrieveRsp->compressed && (compLen < rawLen)) {
555,499,151✔
1797
      int32_t t = tsDecompressString(pStart, compLen, 1, pDataInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
×
1798
      QUERY_CHECK_CONDITION((t == rawLen), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1799
      pStart = pDataInfo->decompBuf;
×
1800
    }
1801

1802
    code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart, (pDataInfo->type == EX_SRC_TYPE_VSTB_SCAN));
555,499,694✔
1803
    if (code != 0) {
555,497,388✔
1804
      taosMemoryFreeClear(pDataInfo->pRsp);
×
1805
      goto _end;
×
1806
    }
1807

1808
    void* tmp = taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
555,497,388✔
1809
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
555,500,138✔
1810
    qDebug("%dth block added to resultBlockList, rows:%" PRId64, index, pb->info.rows);
555,500,138✔
1811
    pb = NULL;
555,500,737✔
1812
  }
1813

1814
_end:
182,224,897✔
1815
  if (code != TSDB_CODE_SUCCESS) {
182,224,897✔
1816
    blockDataDestroy(pb);
×
1817
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1818
  }
1819
  return code;
182,224,927✔
1820
}
1821

1822
int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
43,721,053✔
1823
  SExchangeInfo* pExchangeInfo = pOperator->info;
43,721,053✔
1824
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
43,721,053✔
1825

1826
  int32_t code = 0;
43,721,053✔
1827
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);
43,721,053✔
1828
  int64_t startTs = taosGetTimestampUs();
43,721,053✔
1829

1830
  int32_t vgId = 0;
43,721,053✔
1831
  if (pExchangeInfo->dynTbname) {
43,721,053✔
1832
    SArray* vals = pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPartColVals;
264,040✔
1833
    for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
264,040✔
1834
      SStreamGroupValue* pValue = taosArrayGet(vals, i);
264,040✔
1835
      if (pValue != NULL && pValue->isTbname) {
264,040✔
1836
        vgId = pValue->vgId;
264,040✔
1837
        break;
264,040✔
1838
      }
1839
    }
1840
  }
1841

1842
  while (1) {
11,962,859✔
1843
    if (pExchangeInfo->current >= totalSources) {
55,683,912✔
1844
      setAllSourcesCompleted(pOperator);
11,831,688✔
1845
      return TSDB_CODE_SUCCESS;
11,831,688✔
1846
    }
1847

1848
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
43,852,224✔
1849
    if (!pSource) {
43,852,224✔
1850
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1851
      pTaskInfo->code = terrno;
×
1852
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1853
    }
1854

1855
    if (vgId != 0 && pSource->addr.nodeId != vgId){
43,852,224✔
1856
      pExchangeInfo->current += 1;
223,659✔
1857
      continue;
223,659✔
1858
    }
1859

1860
    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
43,628,565✔
1861
    if (!pDataInfo) {
43,628,565✔
1862
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1863
      pTaskInfo->code = terrno;
×
1864
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1865
    }
1866
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
43,628,565✔
1867

1868
    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
43,628,565✔
1869
    if (code != TSDB_CODE_SUCCESS) {
43,628,565✔
1870
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1871
      pTaskInfo->code = code;
×
1872
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1873
    }
1874

1875
    while (true) {
1,230✔
1876
      recordOpExecBeforeDownstream(pOperator);
43,629,795✔
1877
      code = exchangeWait(pOperator, pExchangeInfo);
43,629,795✔
1878
      recordOpExecAfterDownstream(pOperator, 0);
43,629,795✔
1879

1880
      if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
43,629,795✔
1881
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
1,230✔
1882
      }
1883

1884
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
43,628,565✔
1885
      if (pDataInfo->seqId != currSeqId) {
43,628,565✔
1886
        qDebug("seq rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pDataInfo->seqId, pExchangeInfo, currSeqId);
1,230✔
1887
        taosMemoryFreeClear(pDataInfo->pRsp);
1,230✔
1888
        continue;
1,230✔
1889
      }
1890

1891
      break;
43,627,335✔
1892
    }
1893

1894
    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
43,627,335✔
1895
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
1,299✔
1896
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1897
             tstrerror(pDataInfo->code));
1898
      pOperator->pTaskInfo->code = pDataInfo->code;
1,299✔
1899
      return pOperator->pTaskInfo->code;
1,299✔
1900
    }
1901

1902
    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
43,625,797✔
1903
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
43,626,036✔
1904

1905
    if (pRsp->numOfRows == 0) {
43,626,036✔
1906
      qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
11,739,200✔
1907
             " execId:%d %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", pDataInfo,
1908
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1909
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);
1910

1911
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
11,739,200✔
1912
      if (isVstbScan(pDataInfo) || isVstbTagScan(pDataInfo)) {
11,739,200✔
1913
        pExchangeInfo->current = totalSources;
11,635,904✔
1914
      } else {
1915
        pExchangeInfo->current += 1;
103,296✔
1916
      }
1917
      taosMemoryFreeClear(pDataInfo->pRsp);
11,739,200✔
1918
      continue;
11,739,200✔
1919
    }
1920

1921
    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
31,886,836✔
1922
    if (code != TSDB_CODE_SUCCESS) {
31,886,836✔
1923
      goto _error;
×
1924
    }
1925

1926
    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
31,886,836✔
1927
    if (pRsp->completed == 1) {
31,886,836✔
1928
      qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
187,488✔
1929
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, pDataInfo,
1930
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1931
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
1932
             pExchangeInfo->current + 1, totalSources);
1933

1934
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
187,488✔
1935
      if (isVstbScan(pDataInfo)) {
187,488✔
1936
        pExchangeInfo->current = totalSources;
×
1937
      } else {
1938
        pExchangeInfo->current += 1;
187,488✔
1939
      }
1940
    } else {
1941
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
31,699,348✔
1942
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
1943
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1944
             pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
1945
    }
1946
    if (pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData) {
31,886,836✔
1947
      taosArrayClear(pExchangeInfo->pSourceDataInfo);
31,559,060✔
1948
    }
1949
    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
31,886,836✔
1950
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;
31,886,836✔
1951

1952
    taosMemoryFreeClear(pDataInfo->pRsp);
31,886,836✔
1953
    return TSDB_CODE_SUCCESS;
31,886,836✔
1954
  }
1955

1956
_error:
×
1957
  pTaskInfo->code = code;
×
1958
  return code;
×
1959
}
1960

1961
static int32_t loadTagListFromBasicParam(SSourceDataInfo* pDataInfo, SExchangeOperatorBasicParam* pBasicParam) {
3,025,749✔
1962
  int32_t  code = TSDB_CODE_SUCCESS;
3,025,749✔
1963
  int32_t  lino = 0;
3,025,749✔
1964
  STagVal  dstTag;
3,025,749✔
1965
  bool     needFree = false;
3,025,749✔
1966

1967
  if (pBasicParam->paramType != DYN_TYPE_EXCHANGE_PARAM) {
3,025,749✔
1968
    qError("%s failed since invalid exchange operator param type %d",
×
1969
      __func__, pBasicParam->paramType);
1970
    return TSDB_CODE_INVALID_PARA;
×
1971
  }
1972

1973
  if (pDataInfo->tagList) {
3,025,749✔
1974
    taosArrayClear(pDataInfo->tagList);
×
1975
  }
1976

1977
  if (pBasicParam->tagList) {
3,025,749✔
1978
    pDataInfo->tagList = taosArrayInit(1, sizeof(STagVal));
843,062✔
1979
    QUERY_CHECK_NULL(pDataInfo->tagList, code, lino, _return, terrno);
843,062✔
1980

1981
    for (int32_t i = 0; i < taosArrayGetSize(pBasicParam->tagList); ++i) {
5,221,032✔
1982
      STagVal *pSrcTag = (STagVal*)taosArrayGet(pBasicParam->tagList, i);
4,377,970✔
1983
      QUERY_CHECK_NULL(pSrcTag, code, lino, _return, terrno);
4,377,970✔
1984

1985
      dstTag = (STagVal){0};
4,377,970✔
1986
      dstTag.type = pSrcTag->type;
4,377,970✔
1987
      dstTag.cid = pSrcTag->cid;
4,377,970✔
1988
      if (IS_VAR_DATA_TYPE(pSrcTag->type)) {
4,377,970✔
1989
        dstTag.nData = pSrcTag->nData;
1,900,410✔
1990
        dstTag.pData = taosMemoryMalloc(dstTag.nData);
1,900,410✔
1991
        QUERY_CHECK_NULL(dstTag.pData, code, lino, _return, terrno);
1,900,410✔
1992
        needFree = true;
1,900,410✔
1993
        memcpy(dstTag.pData, pSrcTag->pData, dstTag.nData);
1,900,410✔
1994
      } else {
1995
        dstTag.i64 = pSrcTag->i64;
2,477,560✔
1996
      }
1997

1998
      QUERY_CHECK_NULL(taosArrayPush(pDataInfo->tagList, &dstTag), code, lino, _return, terrno);
8,755,940✔
1999
      needFree = false;
4,377,970✔
2000
    }
2001
  } else {
2002
    pDataInfo->tagList = NULL;
2,182,687✔
2003
  }
2004

2005
  return code;
3,025,749✔
2006
_return:
×
2007
  if (needFree) {
×
2008
    taosMemoryFreeClear(dstTag.pData);
×
2009
  }
2010
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2011
  return code;
×
2012
}
2013

2014
int32_t loadBatchColMapFromBasicParam(SSourceDataInfo* pDataInfo, SExchangeOperatorBasicParam* pBasicParam) {
3,025,749✔
2015
  int32_t     code = TSDB_CODE_SUCCESS;
3,025,749✔
2016
  int32_t     lino = 0;
3,025,749✔
2017
  SOrgTbInfo  dstOrgTbInfo = {0};
3,025,749✔
2018
  bool        needFree = false;
3,025,749✔
2019

2020
  if (pBasicParam->paramType != DYN_TYPE_EXCHANGE_PARAM) {
3,025,749✔
2021
    qError("%s failed since invalid exchange operator param type %d",
×
2022
      __func__, pBasicParam->paramType);
2023
    return TSDB_CODE_INVALID_PARA;
×
2024
  }
2025

2026
  if (pBasicParam->batchOrgTbInfo) {
3,025,749✔
2027
    pDataInfo->batchOrgTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
3,025,749✔
2028
    QUERY_CHECK_NULL(pDataInfo->batchOrgTbInfo, code, lino, _return, terrno);
3,025,749✔
2029

2030
    for (int32_t i = 0; i < taosArrayGetSize(pBasicParam->batchOrgTbInfo); ++i) {
8,898,951✔
2031
      SOrgTbInfo* pSrcOrgTbInfo = taosArrayGet(pBasicParam->batchOrgTbInfo, i);
5,873,202✔
2032
      QUERY_CHECK_NULL(pSrcOrgTbInfo, code, lino, _return, terrno);
5,873,202✔
2033

2034
      dstOrgTbInfo = (SOrgTbInfo){0};
5,873,202✔
2035
      dstOrgTbInfo.vgId = pSrcOrgTbInfo->vgId;
5,873,202✔
2036
      tstrncpy(dstOrgTbInfo.tbName, pSrcOrgTbInfo->tbName, TSDB_TABLE_FNAME_LEN);
5,873,202✔
2037

2038
      dstOrgTbInfo.colMap = taosArrayDup(pSrcOrgTbInfo->colMap, NULL);
5,873,202✔
2039
      QUERY_CHECK_NULL(dstOrgTbInfo.colMap, code, lino, _return, terrno);
5,873,202✔
2040

2041
      needFree = true;
5,873,202✔
2042
      QUERY_CHECK_NULL(taosArrayPush(pDataInfo->batchOrgTbInfo, &dstOrgTbInfo), code, lino, _return, terrno);
11,746,404✔
2043
      needFree = false;
5,873,202✔
2044
    }
2045
  } else {
2046
    pBasicParam->batchOrgTbInfo = NULL;
×
2047
  }
2048

2049
  return code;
3,025,749✔
2050
_return:
×
2051
  if (needFree) {
×
2052
    taosArrayDestroy(dstOrgTbInfo.colMap);
×
2053
  }
2054
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2055
  return code;
×
2056
}
2057

2058
int32_t addSingleExchangeSource(SOperatorInfo* pOperator,
48,623,352✔
2059
                                SExchangeOperatorBasicParam* pBasicParam) {
2060
  if (pBasicParam->paramType != DYN_TYPE_EXCHANGE_PARAM) {
48,623,352✔
2061
    qWarn("%s, %s found invalid exchange operator param type %d",
×
2062
      GET_TASKID(pOperator->pTaskInfo), __func__, pBasicParam->paramType);
2063
    return TSDB_CODE_SUCCESS;
×
2064
  }
2065

2066
  int32_t            code = TSDB_CODE_SUCCESS;
48,623,352✔
2067
  int32_t            lino = 0;
48,623,352✔
2068
  SExchangeInfo*     pExchangeInfo = pOperator->info;
48,623,352✔
2069
  SExchangeSrcIndex* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
48,623,352✔
2070

2071
  if (NULL == pIdx) {
48,623,352✔
2072
    if (pBasicParam->isNewDeployed) {
1,183,289✔
2073
      SDownstreamSourceNode *pNode = NULL;
2,618✔
2074
      code = nodesCloneNode((SNode*)&pBasicParam->newDeployedSrc, (SNode**)&pNode);
2,618✔
2075
      QUERY_CHECK_CODE(code, lino, _return);
2,618✔
2076

2077
      SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pOperator->pPhyNode;
2,618✔
2078
      code = nodesListMakeStrictAppend(&pExchange->pSrcEndPoints, (SNode*)pNode);
2,618✔
2079
      QUERY_CHECK_CODE(code, lino, _return);
2,618✔
2080

2081
      void* tmp = taosArrayPush(pExchangeInfo->pSources, pNode);
2,618✔
2082
      QUERY_CHECK_NULL(tmp, code, lino, _return, terrno);
2,618✔
2083

2084
      SExchangeSrcIndex idx = {.srcIdx = taosArrayGetSize(pExchangeInfo->pSources) - 1, .inUseIdx = -1};
2,618✔
2085
      code = tSimpleHashPut(pExchangeInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
2,618✔
2086
      if (pExchangeInfo->pHashSources) {
2,618✔
2087
        QUERY_CHECK_CODE(code, lino, _return);
2,618✔
2088
      }
2089
      pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
2,618✔
2090
      QUERY_CHECK_NULL(pIdx, code, lino, _return, TSDB_CODE_INVALID_PARA);
2,618✔
2091
    } else if (pBasicParam->type == EX_SRC_TYPE_VSTB_TS_SCAN || pBasicParam->type == EX_SRC_TYPE_VSTB_PART_INTERVAL_SCAN) {
1,180,671✔
2092
      // Multi-exchange virtual table paths build each exchange param from the full vg map.
2093
      // If this exchange does not own the current vg source, skip it and let the matching exchange consume it.
2094
      qDebug("addSingleExchangeSource found no existing source for vgId: %d, sourceType:%d, skip it",
1,180,671✔
2095
             pBasicParam->vgId, pBasicParam->type);
2096
      return TSDB_CODE_SUCCESS;
1,180,671✔
2097
    } else {
2098
      qError("No exchange source for vgId: %d", pBasicParam->vgId);
×
2099
      return TSDB_CODE_INVALID_PARA;
×
2100
    }
2101
  }
2102

2103
  qDebug("start to add single exchange source");
47,442,681✔
2104

2105
  switch (pBasicParam->type) {
47,442,681✔
2106
    case EX_SRC_TYPE_VSTB_TS_SCAN:
3,025,749✔
2107
    case EX_SRC_TYPE_VSTB_WIN_SCAN:
2108
    case EX_SRC_TYPE_VSTB_INTERVAL_SCAN:
2109
    case EX_SRC_TYPE_VSTB_PART_INTERVAL_SCAN:
2110
    case EX_SRC_TYPE_VSTB_AGG_SCAN: {
2111
      if (pIdx->inUseIdx < 0) {
3,025,749✔
2112
        SSourceDataInfo dataInfo = {0};
1,943,344✔
2113
        dataInfo.status = EX_SOURCE_DATA_NOT_READY;
1,943,344✔
2114
        dataInfo.taskId = pExchangeInfo->pTaskId;
1,943,344✔
2115
        dataInfo.index = pIdx->srcIdx;
1,943,344✔
2116
        dataInfo.groupid = pBasicParam->groupid;
1,943,344✔
2117
        dataInfo.window = pBasicParam->window;
1,943,344✔
2118
        dataInfo.isNewParam = pBasicParam->isNewParam;
1,943,344✔
2119
        code = loadTagListFromBasicParam(&dataInfo, pBasicParam);
1,943,344✔
2120
        QUERY_CHECK_CODE(code, lino, _return);
1,943,344✔
2121

2122
        code = loadBatchColMapFromBasicParam(&dataInfo, pBasicParam);
1,943,344✔
2123
        QUERY_CHECK_CODE(code, lino, _return);
1,943,344✔
2124

2125
        dataInfo.orgTbInfo = NULL;
1,943,344✔
2126

2127
        dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
1,943,344✔
2128
        QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
1,943,344✔
2129

2130
        dataInfo.type = pBasicParam->type;
1,943,344✔
2131
        dataInfo.srcOpType = pBasicParam->srcOpType;
1,943,344✔
2132
        dataInfo.tableSeq = pBasicParam->tableSeq;
1,943,344✔
2133

2134
        QUERY_CHECK_NULL(taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
3,886,688✔
2135

2136
        pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
1,943,344✔
2137
      } else {
2138
        SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
1,082,405✔
2139
        QUERY_CHECK_NULL(pDataInfo, code, lino, _return, terrno);
1,082,405✔
2140

2141
        if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
1,082,405✔
2142
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
1,082,405✔
2143
        }
2144

2145
        pDataInfo->taskId = pExchangeInfo->pTaskId;
1,082,405✔
2146
        pDataInfo->index = pIdx->srcIdx;
1,082,405✔
2147
        pDataInfo->window = pBasicParam->window;
1,082,405✔
2148
        pDataInfo->groupid = pBasicParam->groupid;
1,082,405✔
2149
        pDataInfo->isNewParam = pBasicParam->isNewParam;
1,082,405✔
2150

2151
        code = loadTagListFromBasicParam(pDataInfo, pBasicParam);
1,082,405✔
2152
        QUERY_CHECK_CODE(code, lino, _return);
1,082,405✔
2153

2154
        code = loadBatchColMapFromBasicParam(pDataInfo, pBasicParam);
1,082,405✔
2155
        QUERY_CHECK_CODE(code, lino, _return);
1,082,405✔
2156

2157
        pDataInfo->orgTbInfo = NULL;
1,082,405✔
2158

2159
        pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
1,082,405✔
2160
        QUERY_CHECK_NULL(pDataInfo->pSrcUidList, code, lino, _return, terrno);
1,082,405✔
2161

2162
        pDataInfo->type = pBasicParam->type;
1,082,405✔
2163
        pDataInfo->srcOpType = pBasicParam->srcOpType;
1,082,405✔
2164
        pDataInfo->tableSeq = pBasicParam->tableSeq;
1,082,405✔
2165
      }
2166
      break;
3,025,749✔
2167
    }
2168
    case EX_SRC_TYPE_VTB_WIN_SCAN:
3,888,536✔
2169
    case EX_SRC_TYPE_VSTB_TAG_SCAN: {
2170
      SSourceDataInfo dataInfo = {0};
3,888,536✔
2171
      dataInfo.status = EX_SOURCE_DATA_NOT_READY;
3,888,536✔
2172
      dataInfo.taskId = pExchangeInfo->pTaskId;
3,888,536✔
2173
      dataInfo.index = pIdx->srcIdx;
3,888,536✔
2174
      dataInfo.window = pBasicParam->window;
3,888,536✔
2175
      dataInfo.groupid = 0;
3,888,536✔
2176
      dataInfo.orgTbInfo = NULL;
3,888,536✔
2177
      dataInfo.tagList = NULL;
3,888,536✔
2178

2179
      dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
3,888,536✔
2180
      QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
3,888,536✔
2181

2182
      dataInfo.isNewParam = false;
3,888,536✔
2183
      dataInfo.type = pBasicParam->type;
3,888,536✔
2184
      dataInfo.srcOpType = pBasicParam->srcOpType;
3,888,536✔
2185
      dataInfo.tableSeq = pBasicParam->tableSeq;
3,888,536✔
2186

2187
      taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
3,888,536✔
2188
      QUERY_CHECK_NULL(taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
7,777,072✔
2189
      break;
3,888,536✔
2190
    }
2191
    case EX_SRC_TYPE_VSTB_SCAN: {
40,268,302✔
2192
      SSourceDataInfo dataInfo = {0};
40,268,302✔
2193
      dataInfo.status = EX_SOURCE_DATA_NOT_READY;
40,268,302✔
2194
      dataInfo.taskId = pExchangeInfo->pTaskId;
40,268,302✔
2195
      dataInfo.index = pIdx->srcIdx;
40,268,302✔
2196
      dataInfo.window = pBasicParam->window;
40,268,302✔
2197
      dataInfo.groupid = 0;
40,268,302✔
2198
      dataInfo.isNewParam = pBasicParam->isNewParam;
40,268,302✔
2199
      dataInfo.tagList = NULL;
40,268,302✔
2200
      dataInfo.orgTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
40,268,302✔
2201
      QUERY_CHECK_NULL(dataInfo.orgTbInfo, code, lino, _return, terrno);
40,268,302✔
2202
      dataInfo.orgTbInfo->vgId = pBasicParam->orgTbInfo->vgId;
40,268,302✔
2203
      tstrncpy(dataInfo.orgTbInfo->tbName, pBasicParam->orgTbInfo->tbName, TSDB_TABLE_FNAME_LEN);
40,268,302✔
2204
      dataInfo.orgTbInfo->colMap = taosArrayDup(pBasicParam->orgTbInfo->colMap, NULL);
40,268,302✔
2205
      QUERY_CHECK_NULL(dataInfo.orgTbInfo->colMap, code, lino, _return, terrno);
40,268,302✔
2206

2207
      dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
40,268,302✔
2208
      QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
40,268,302✔
2209

2210
      dataInfo.type = pBasicParam->type;
40,268,302✔
2211
      dataInfo.srcOpType = pBasicParam->srcOpType;
40,268,302✔
2212
      dataInfo.tableSeq = pBasicParam->tableSeq;
40,268,302✔
2213

2214
      taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
40,268,302✔
2215
      QUERY_CHECK_NULL( taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
80,536,604✔
2216
      break;
40,268,302✔
2217
    }
2218
    case EX_SRC_TYPE_STB_JOIN_SCAN:
260,094✔
2219
    default: {
2220
      if (pIdx->inUseIdx < 0) {
260,094✔
2221
        SSourceDataInfo dataInfo = {0};
257,718✔
2222
        dataInfo.status = EX_SOURCE_DATA_NOT_READY;
257,718✔
2223
        dataInfo.taskId = pExchangeInfo->pTaskId;
257,718✔
2224
        dataInfo.index = pIdx->srcIdx;
257,718✔
2225
        dataInfo.groupid = 0;
257,718✔
2226
        dataInfo.tagList = NULL;
257,718✔
2227

2228
        dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
257,718✔
2229
        QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
257,718✔
2230

2231
        dataInfo.isNewParam = false;
257,718✔
2232
        dataInfo.type = pBasicParam->type;
257,718✔
2233
        dataInfo.srcOpType = pBasicParam->srcOpType;
257,718✔
2234
        dataInfo.tableSeq = pBasicParam->tableSeq;
257,718✔
2235

2236
        QUERY_CHECK_NULL(taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
515,436✔
2237

2238
        pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
257,718✔
2239
      } else {
2240
        SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
2,376✔
2241
        QUERY_CHECK_NULL(pDataInfo, code, lino, _return, terrno);
2,376✔
2242
        if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
2,376✔
2243
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
2,376✔
2244
        }
2245

2246
        pDataInfo->tagList = NULL;
2,376✔
2247
        pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
2,376✔
2248
        QUERY_CHECK_NULL(pDataInfo->pSrcUidList, code, lino, _return, terrno);
2,376✔
2249

2250
        pDataInfo->groupid = 0;
2,376✔
2251
        pDataInfo->isNewParam = false;
2,376✔
2252
        pDataInfo->type = pBasicParam->type;
2,376✔
2253
        pDataInfo->srcOpType = pBasicParam->srcOpType;
2,376✔
2254
        pDataInfo->tableSeq = pBasicParam->tableSeq;
2,376✔
2255
      }
2256
      break;
260,094✔
2257
    }
2258
  }
2259

2260
  return code;
47,442,681✔
2261
_return:
×
2262
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2263
  return code;
×
2264
}
2265

2266
int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
46,726,849✔
2267
  SExchangeInfo*               pExchangeInfo = pOperator->info;
46,726,849✔
2268
  int32_t                      code = TSDB_CODE_SUCCESS;
46,726,849✔
2269
  SExchangeOperatorBasicParam* pBasicParam = NULL;
46,726,849✔
2270
  SExchangeOperatorParam*      pParam = (SExchangeOperatorParam*)pOperator->pOperatorGetParam->value;
46,726,849✔
2271
  if (pParam->multiParams) {
46,726,849✔
2272
    SExchangeOperatorBatchParam* pBatch = (SExchangeOperatorBatchParam*)pOperator->pOperatorGetParam->value;
2,565,259✔
2273
    int32_t                      iter = 0;
2,565,259✔
2274
    while (NULL != (pBasicParam = tSimpleHashIterate(pBatch->pBatchs, pBasicParam, &iter))) {
7,027,021✔
2275
      code = addSingleExchangeSource(pOperator, pBasicParam);
4,461,762✔
2276
      if (code) {
4,461,762✔
2277
        return code;
×
2278
      }
2279
    }
2280
  } else {
2281
    pBasicParam = &pParam->basic;
44,161,590✔
2282
    code = addSingleExchangeSource(pOperator, pBasicParam);
44,161,590✔
2283
  }
2284

2285
  freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
46,726,849✔
2286
  pOperator->pOperatorGetParam = NULL;
46,726,849✔
2287

2288
  return code;
46,726,849✔
2289
}
2290

2291
int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
711,959,937✔
2292
  SExchangeInfo* pExchangeInfo = pOperator->info;
711,959,937✔
2293
  int32_t        code = TSDB_CODE_SUCCESS;
711,959,847✔
2294
  int32_t        lino = 0;
711,959,847✔
2295
  
2296
  if ((OPTR_IS_OPENED(pOperator) && !pExchangeInfo->dynamicOp &&
711,959,847✔
2297
       NULL == pOperator->pOperatorGetParam) ||
543,906,718✔
2298
      (pExchangeInfo->dynamicOp && NULL == pOperator->pOperatorGetParam)) {
172,251,068✔
2299
    qDebug("%s, skip prepare, opened:%d, dynamicOp:%d, getParam:%p",
549,091,342✔
2300
      GET_TASKID(pOperator->pTaskInfo), OPTR_IS_OPENED(pOperator),
2301
      pExchangeInfo->dynamicOp, pOperator->pOperatorGetParam);
2302
    return TSDB_CODE_SUCCESS;
549,092,039✔
2303
  }
2304

2305
  if (pExchangeInfo->dynamicOp) {
162,868,523✔
2306
    code = addDynamicExchangeSource(pOperator);
46,726,849✔
2307
    QUERY_CHECK_CODE(code, lino, _end);
46,726,849✔
2308
  }
2309

2310
  if (pOperator->status == OP_NOT_OPENED &&
162,868,226✔
2311
      (pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData) ||
155,155,044✔
2312
      IS_STREAM_MODE(pOperator->pTaskInfo)) {
121,799,712✔
2313
    pExchangeInfo->current = 0;
48,160,232✔
2314
  }
2315

2316
  if (NULL != pOperator->pOperatorGetParam) {
162,867,131✔
2317
    SOperatorParam* pGetParam = pOperator->pOperatorGetParam;
4,197,244✔
2318
    storeNotifyInfo(pOperator);
4,197,244✔
2319

2320
    if (!pGetParam->reUse) {
4,197,244✔
2321
      freeOperatorParam(pGetParam, OP_GET_PARAM);
×
2322
    } else {
2323
      /**
2324
        The param is referenced by getParam, and it will be freed by
2325
        the parent operator after getting next block.
2326
      */
2327
      pGetParam->reUse = false;
4,197,244✔
2328
    }
2329
    pOperator->pOperatorGetParam = NULL;
4,197,244✔
2330
  }
2331

2332
  int64_t st = taosGetTimestampUs();
162,868,571✔
2333

2334
  if (!IS_STREAM_MODE(pOperator->pTaskInfo) && !pExchangeInfo->seqLoadData) {
162,868,571✔
2335
    code = prepareConcurrentlyLoad(pOperator);
112,504,311✔
2336
    QUERY_CHECK_CODE(code, lino, _end);
112,506,483✔
2337
    pExchangeInfo->openedTs = taosGetTimestampUs();
112,507,048✔
2338
  }
2339

2340
  OPTR_SET_OPENED(pOperator);
162,874,864✔
2341
  qDebug("%s prepare load complete", pOperator->pTaskInfo->id.str);
162,869,502✔
2342

2343
_end:
64,814,920✔
2344
  if (code != TSDB_CODE_SUCCESS) {
162,869,263✔
2345
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2346
    pOperator->pTaskInfo->code = code;
×
2347
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
2348
  }
2349
  return code;
162,869,263✔
2350
}
2351

2352
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
4,230,910✔
2353
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
4,230,910✔
2354

2355
  if (pLimitInfo->remainGroupOffset > 0) {
4,230,910✔
2356
    if (pLimitInfo->currentGroupId == 0) {  // it is the first group
×
2357
      pLimitInfo->currentGroupId = pBlock->info.id.groupId;
×
2358
      blockDataCleanup(pBlock);
×
2359
      return PROJECT_RETRIEVE_CONTINUE;
×
2360
    } else if (pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
×
2361
      // now it is the data from a new group
2362
      pLimitInfo->remainGroupOffset -= 1;
×
2363

2364
      // ignore data block in current group
2365
      if (pLimitInfo->remainGroupOffset > 0) {
×
2366
        blockDataCleanup(pBlock);
×
2367
        return PROJECT_RETRIEVE_CONTINUE;
×
2368
      }
2369
    }
2370

2371
    // set current group id of the project operator
2372
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
×
2373
  }
2374

2375
  // here check for a new group data, we need to handle the data of the previous group.
2376
  if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
4,230,910✔
2377
    pLimitInfo->numOfOutputGroups += 1;
159,678✔
2378
    if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
159,678✔
2379
      pOperator->status = OP_EXEC_DONE;
×
2380
      blockDataCleanup(pBlock);
×
2381

2382
      return PROJECT_RETRIEVE_DONE;
×
2383
    }
2384

2385
    // reset the value for a new group data
2386
    resetLimitInfoForNextGroup(pLimitInfo);
159,678✔
2387
    // existing rows that belongs to previous group.
2388
    if (pBlock->info.rows > 0) {
159,678✔
2389
      return PROJECT_RETRIEVE_DONE;
159,678✔
2390
    }
2391
  }
2392

2393
  // here we reach the start position, according to the limit/offset requirements.
2394

2395
  // set current group id
2396
  pLimitInfo->currentGroupId = pBlock->info.id.groupId;
4,071,232✔
2397

2398
  bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pTaskInfo);
4,071,232✔
2399
  if (pBlock->info.rows == 0) {
4,071,232✔
2400
    return PROJECT_RETRIEVE_CONTINUE;
2,133,170✔
2401
  } else {
2402
    if (limitReached && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
1,938,062✔
2403
      setOperatorCompleted(pOperator);
×
2404
      return PROJECT_RETRIEVE_DONE;
×
2405
    }
2406
  }
2407

2408
  // todo optimize performance
2409
  // If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the
2410
  // they may not belong to the same group the limit/offset value is not valid in this case.
2411
  if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || hasSlimitOffsetInfo(pLimitInfo)) {
1,938,062✔
2412
    return PROJECT_RETRIEVE_DONE;
1,938,062✔
2413
  } else {  // not full enough, continue to accumulate the output data in the buffer.
2414
    return PROJECT_RETRIEVE_CONTINUE;
×
2415
  }
2416
}
2417

2418
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo) {
248,725,712✔
2419
  SExecTaskInfo* pTask = pOperator->pTaskInfo;
248,725,712✔
2420
  int32_t        code = TSDB_CODE_SUCCESS;
248,725,687✔
2421
  if (pTask->pWorkerCb) {
248,725,687✔
2422
    code = pTask->pWorkerCb->beforeBlocking(pTask->pWorkerCb->pPool);
248,724,571✔
2423
    if (code != TSDB_CODE_SUCCESS) {
248,726,749✔
2424
      pTask->code = code;
×
2425
      return pTask->code;
×
2426
    }
2427
  }
2428

2429
  code = tsem_wait(&pExchangeInfo->ready);
248,726,588✔
2430
  if (code != TSDB_CODE_SUCCESS) {
248,726,305✔
2431
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2432
    pTask->code = code;
×
2433
    return pTask->code;
×
2434
  }
2435

2436
  if (pTask->pWorkerCb) {
248,726,305✔
2437
    code = pTask->pWorkerCb->afterRecoverFromBlocking(pTask->pWorkerCb->pPool);
248,725,953✔
2438
    if (code != TSDB_CODE_SUCCESS) {
248,726,903✔
2439
      pTask->code = code;
×
2440
      return pTask->code;
×
2441
    }
2442
  }
2443
  return TSDB_CODE_SUCCESS;
248,726,749✔
2444
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc