• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5006

29 Mar 2026 04:32AM UTC coverage: 72.274% (+0.1%) from 72.152%
#5006

push

travis-ci

web-flow
refactor: do some internal refactor for TDgpt. (#34955)

253711 of 351039 relevant lines covered (72.27%)

131490495.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.39
/source/libs/executor/src/exchangeoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "operator.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "tdatablock.h"
23
#include "thash.h"
24
#include "tmsg.h"
25
#include "tref.h"
26
#include "trpc.h"
27

28
/*
 * Small record carrying three identifiers for a fetch response.
 * NOTE(review): the meaning of each field is inferred from its name only;
 * the code that fills and consumes this struct is not visible here — confirm.
 */
typedef struct SFetchRspHandleWrapper {
  int64_t exchangeId;   // presumably the ref id of the owning exchange — TODO confirm
  int32_t sourceIndex;  // presumably the slot of the source being fetched — TODO confirm
  int64_t seqId;        // presumably the exchange sequence id at request time — TODO confirm
} SFetchRspHandleWrapper;
33

34
typedef struct SSourceDataInfo {
35
  int32_t             index;
36
  int64_t             seqId;
37
  SRWLatch            lock;
38
  SRetrieveTableRsp*  pRsp;
39
  uint64_t            totalRows;
40
  int64_t             startTime;
41
  int32_t             code;
42
  EX_SOURCE_STATUS    status;
43
  const char*         taskId;
44
  SArray*             pSrcUidList;
45
  int32_t             srcOpType;
46
  bool                tableSeq;
47
  char*               decompBuf;
48
  int32_t             decompBufSize;
49
  SOrgTbInfo*         orgTbInfo;
50
  SArray*             batchOrgTbInfo; // SArray<SOrgTbInfo>
51
  SArray*             tagList;
52
  EExchangeSourceType type;
53
  bool                isNewParam;
54
  STimeWindow         window;
55
  uint64_t            groupid;
56
  bool                fetchSent; // need reset
57
  uint64_t            fetchTimes;   // per-source fetch count
58
  int64_t             fetchCostUs;  // per-source total RPC round-trip (us)
59
} SSourceDataInfo;
60

61
static void destroyExchangeOperatorInfo(void* param);
62
static void freeBlock(void* pParam);
63
static void freeSourceDataInfo(void* param);
64
static void setAllSourcesCompleted(SOperatorInfo* pOperator);
65

66
static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code);
67
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex);
68
static int32_t getCompletedSources(const SArray* pArray, int32_t* pRes);
69
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator);
70
static void    storeNotifyInfo(SOperatorInfo* pOperator);
71
static int32_t seqLoadRemoteData(SOperatorInfo* pOperator);
72
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator);
73
static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock,
74
                                 bool holdDataInBuf);
75
static int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo);
76

77
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo);
78

79
/* Source-type predicates: each one tests SSourceDataInfo::type against a single enum value. */

/* True when the source type is EX_SRC_TYPE_VSTB_SCAN. */
static bool isVstbScan(SSourceDataInfo* pDataInfo) {
  return EX_SRC_TYPE_VSTB_SCAN == pDataInfo->type;
}

/* True when the source type is EX_SRC_TYPE_VSTB_WIN_SCAN. */
static bool isVstbWinScan(SSourceDataInfo* pDataInfo) {
  return EX_SRC_TYPE_VSTB_WIN_SCAN == pDataInfo->type;
}

/* True when the source type is EX_SRC_TYPE_VSTB_AGG_SCAN. */
static bool isVstbAggScan(SSourceDataInfo* pDataInfo) {
  return EX_SRC_TYPE_VSTB_AGG_SCAN == pDataInfo->type;
}

/* True when the source type is EX_SRC_TYPE_VSTB_TAG_SCAN. */
static bool isVstbTagScan(SSourceDataInfo* pDataInfo) {
  return EX_SRC_TYPE_VSTB_TAG_SCAN == pDataInfo->type;
}

/* True when the source type is EX_SRC_TYPE_STB_JOIN_SCAN. */
static bool isStbJoinScan(SSourceDataInfo* pDataInfo) {
  return EX_SRC_TYPE_STB_JOIN_SCAN == pDataInfo->type;
}
84

85

86
static void streamSequenciallyLoadRemoteData(SOperatorInfo* pOperator,
15,446,089✔
87
                                             SExchangeInfo* pExchangeInfo,
88
                                             SExecTaskInfo* pTaskInfo) {
89
  int32_t code = 0;
15,446,089✔
90
  int32_t lino = 0;
15,446,089✔
91
  int64_t startTs = taosGetTimestampUs();  
15,446,092✔
92
  int32_t  totalSources = (int32_t)taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
15,446,092✔
93
  int32_t completed = 0;
15,446,089✔
94
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
15,446,328✔
95
  if (code != TSDB_CODE_SUCCESS) {
15,445,336✔
96
    pTaskInfo->code = code;
×
97
    T_LONG_JMP(pTaskInfo->env, code);
×
98
  }
99
  if (completed == totalSources) {
15,445,336✔
100
    qDebug("%s no load since all sources completed, completed:%d, totalSources:%d", pTaskInfo->id.str, completed, totalSources);
2,262,452✔
101
    setAllSourcesCompleted(pOperator);
2,262,452✔
102
    return;
2,265,284✔
103
  }
104

105
  SSourceDataInfo* pDataInfo = NULL;
13,182,884✔
106

107
  while (1) {
5,723,837✔
108
    if (pExchangeInfo->current < 0) {
18,906,721✔
109
      qDebug("current %d and all sources complted, totalSources:%d", pExchangeInfo->current, totalSources);
135,966✔
110
      setAllSourcesCompleted(pOperator);
135,966✔
111
      return;
135,966✔
112
    }
113
    
114
    if (pExchangeInfo->current >= totalSources) {
18,771,253✔
115
      completed = 0;
9,671,554✔
116
      code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
9,671,554✔
117
      if (code != TSDB_CODE_SUCCESS) {
9,671,554✔
118
        pTaskInfo->code = code;
×
119
        T_LONG_JMP(pTaskInfo->env, code);
×
120
      }
121
      if (completed == totalSources) {
9,671,554✔
122
        qDebug("stop to load since all sources complted, completed:%d, totalSources:%d", completed, totalSources);
4,167,261✔
123
        setAllSourcesCompleted(pOperator);
4,167,261✔
124
        return;
4,167,261✔
125
      }
126
      
127
      pExchangeInfo->current = 0;
5,504,293✔
128
    }
129

130
    qDebug("%s start stream exchange %p idx:%d fetch", GET_TASKID(pTaskInfo), pExchangeInfo, pExchangeInfo->current);
14,604,247✔
131

132
    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
14,604,247✔
133
    if (!pDataInfo) {
14,603,996✔
134
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
135
      pTaskInfo->code = terrno;
×
136
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
137
    }
138

139
    if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
14,603,996✔
140
      pExchangeInfo->current++;
648,640✔
141
      continue;
648,640✔
142
    }
143

144
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
13,955,607✔
145

146
    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
13,955,607✔
147
    if (code != TSDB_CODE_SUCCESS) {
13,955,602✔
148
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
149
      pTaskInfo->code = code;
×
150
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
151
    }
152

153
    while (true) {
767✔
154
      recordOpExecBeforeDownstream(pOperator);
13,956,369✔
155
      code = exchangeWait(pOperator, pExchangeInfo);
13,956,374✔
156
      recordOpExecAfterDownstream(pOperator, 0);
13,956,613✔
157

158
      if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
13,956,613✔
159
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
767✔
160
      }
161

162
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
13,955,846✔
163
      if (pDataInfo->seqId != currSeqId) {
13,955,846✔
164
        qDebug("%s seq rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", 
767✔
165
            GET_TASKID(pTaskInfo), pDataInfo->seqId, pExchangeInfo, currSeqId);
166
        taosMemoryFreeClear(pDataInfo->pRsp);
767✔
167
        continue;
767✔
168
      }
169

170
      break;
13,955,079✔
171
    }
172

173
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
13,955,079✔
174
    if (!pSource) {
13,955,079✔
175
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
176
      pTaskInfo->code = terrno;
×
177
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
178
    }
179

180
    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
13,955,079✔
181
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
×
182
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
183
             tstrerror(pDataInfo->code));
184
      pTaskInfo->code = pDataInfo->code;
×
185
      T_LONG_JMP(pTaskInfo->env, code);
×
186
    }
187

188
    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
13,955,079✔
189
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
13,955,079✔
190

191
    if (pRsp->numOfRows == 0) {
13,954,806✔
192
      qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
5,075,197✔
193
             " execId:%d idx %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", pDataInfo,
194
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
195
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);
196

197
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
5,075,197✔
198
      if (isVstbScan(pDataInfo) || isVstbTagScan(pDataInfo)) {
5,075,197✔
199
        pExchangeInfo->current = -1;
135,966✔
200
      } else {
201
        pExchangeInfo->current += 1;
4,939,231✔
202
      }
203
      taosMemoryFreeClear(pDataInfo->pRsp);
5,075,197✔
204
      continue;
5,075,197✔
205
    }
206

207
    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
8,879,882✔
208
    TAOS_CHECK_EXIT(code);
8,879,882✔
209

210
    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
8,879,882✔
211
    if (pRsp->completed == 1) {
8,879,882✔
212
      qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
2,432,416✔
213
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%d", pDataInfo,
214
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
215
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
216
             pExchangeInfo->current + 1, totalSources);
217

218
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
2,432,416✔
219
      if (isVstbScan(pDataInfo)) {
2,432,416✔
220
        pExchangeInfo->current = -1;
×
221
        taosMemoryFreeClear(pDataInfo->pRsp);
×
222
        continue;
×
223
      }
224
    } else {
225
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d idx:%d numOfRows:%" PRId64
6,447,466✔
226
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
227
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
228
             pExchangeInfo->current, pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
229
    }
230

231
    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
8,879,882✔
232
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;
8,879,882✔
233

234
    pExchangeInfo->current++;
8,879,882✔
235

236
    taosMemoryFreeClear(pDataInfo->pRsp);
8,879,882✔
237
    return;
8,879,882✔
238
  }
239

240
_exit:
×
241

242
  if (code) {
×
243
    pTaskInfo->code = code;
×
244
    qError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
×
245
  }
246
}
247

248

249
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
239,644,760✔
250
                                           SExecTaskInfo* pTaskInfo) {
251
  int32_t code = 0;
239,644,760✔
252
  int32_t lino = 0;
239,644,760✔
253
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
239,644,760✔
254
  int32_t completed = 0;
239,644,911✔
255
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
239,644,760✔
256
  if (code != TSDB_CODE_SUCCESS) {
239,644,760✔
257
    pTaskInfo->code = code;
×
258
    T_LONG_JMP(pTaskInfo->env, code);
×
259
  }
260
  if (completed == totalSources) {
239,644,760✔
261
    setAllSourcesCompleted(pOperator);
76,131,916✔
262
    return;
76,133,078✔
263
  }
264

265
  SSourceDataInfo* pDataInfo = NULL;
163,512,844✔
266

267
  while (1) {
18,849,887✔
268
    qDebug("prepare wait for ready, %p, %s", pExchangeInfo, GET_TASKID(pTaskInfo));
182,362,731✔
269
    recordOpExecBeforeDownstream(pOperator);
182,362,731✔
270
    code = exchangeWait(pOperator, pExchangeInfo);
182,362,731✔
271
    recordOpExecAfterDownstream(pOperator, 0);
182,363,261✔
272

273
    if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
182,363,261✔
274
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
1,156✔
275
    }
276

277
    for (int32_t i = 0; i < totalSources; ++i) {
299,570,714✔
278
      pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
299,568,626✔
279
      QUERY_CHECK_NULL(pDataInfo, code, lino, _exit, terrno);
299,570,714✔
280
      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
299,570,714✔
281
        continue;
86,696,206✔
282
      }
283

284
      if (pDataInfo->status != EX_SOURCE_DATA_READY) {
212,874,508✔
285
        continue;
30,512,909✔
286
      }
287

288
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
182,361,599✔
289
      if (pDataInfo->seqId != currSeqId) {
182,361,599✔
290
        qDebug("concurrent rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pDataInfo->seqId, pExchangeInfo, currSeqId);
×
291
        taosMemoryFreeClear(pDataInfo->pRsp);
×
292
        break;
×
293
      }
294

295
      if (pDataInfo->code != TSDB_CODE_SUCCESS) {
182,361,599✔
296
        code = pDataInfo->code;
339✔
297
        TAOS_CHECK_EXIT(code);
339✔
298
      }
299

300
      tmemory_barrier();
182,361,260✔
301
      SRetrieveTableRsp*     pRsp = pDataInfo->pRsp;
182,361,260✔
302
      SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
182,361,260✔
303
      QUERY_CHECK_NULL(pSource, code, lino, _exit, terrno);
182,361,260✔
304

305
      // todo
306
      SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
182,361,260✔
307
      if (pRsp->numOfRows == 0) {
182,361,260✔
308
        if (NULL != pDataInfo->pSrcUidList && !isVstbScan(pDataInfo)) {
41,297,790✔
309
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
×
310
          code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
×
311
          if (code != TSDB_CODE_SUCCESS) {
×
312
            taosMemoryFreeClear(pDataInfo->pRsp);
×
313
            TAOS_CHECK_EXIT(code);
×
314
          }
315
        } else {
316
          pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
41,297,790✔
317
          qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
41,297,790✔
318
                 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", try next %d/%" PRIzu, pDataInfo,
319
                 GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
320
                 pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows, i + 1, totalSources);
321
          taosMemoryFreeClear(pDataInfo->pRsp);
41,297,790✔
322
        }
323
        break;
41,297,790✔
324
      }
325

326
      TAOS_CHECK_EXIT(doExtractResultBlocks(pExchangeInfo, pDataInfo));
141,063,470✔
327

328
      SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
141,063,470✔
329
      updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pDataInfo->startTime, pOperator);
141,063,470✔
330
      pDataInfo->totalRows += pRetrieveRsp->numOfRows;
141,063,470✔
331

332
      if (pRsp->completed == 1) {
141,063,470✔
333
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
128,288,848✔
334
        qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
128,288,310✔
335
               " execId:%d index:%d completed, blocks:%d, numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64
336
               ", totalRows:%" PRIu64 ", total:%.2f Kb, try next %d/%" PRIzu, pDataInfo,
337
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
338
               pRsp->numOfBlocks, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows,
339
               pLoadInfo->totalSize / 1024.0, i + 1, totalSources);
340
      } else {
341
        qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
12,774,622✔
342
               " execId:%d blocks:%d, numOfRows:%" PRId64 ", totalRows:%" PRIu64 ", total:%.2f Kb",
343
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
344
               pRsp->numOfBlocks, pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
345
      }
346

347
      taosMemoryFreeClear(pDataInfo->pRsp);
141,062,932✔
348

349
      if ((pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED || NULL != pDataInfo->pSrcUidList) && !isVstbScan(pDataInfo) && !isVstbTagScan(pDataInfo)) {
141,063,470✔
350
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
12,774,622✔
351
        code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
12,774,622✔
352
        if (code != TSDB_CODE_SUCCESS) {
12,774,622✔
353
          taosMemoryFreeClear(pDataInfo->pRsp);
×
354
          TAOS_CHECK_EXIT(code);
×
355
        }
356
      }
357
      
358
      return;
141,063,540✔
359
    }  // end loop
360

361
    int32_t complete1 = 0;
41,299,878✔
362
    code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &complete1);
41,297,790✔
363
    if (code != TSDB_CODE_SUCCESS) {
41,297,790✔
364
      pTaskInfo->code = code;
×
365
      T_LONG_JMP(pTaskInfo->env, code);
×
366
    }
367
    if (complete1 == totalSources) {
41,297,790✔
368
      qDebug("all sources are completed, %s", GET_TASKID(pTaskInfo));
22,447,903✔
369
      return;
22,447,903✔
370
    }
371
  }
372

373
_exit:
339✔
374

375
  if (code) {
339✔
376
    pTaskInfo->code = code;
339✔
377
    qError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
339✔
378
  }
379
}
380

381
/**
 * Returns the next data block of the exchange operator.
 *
 * A block already waiting in pResultBlockList is handed out first. Otherwise
 * one of the three loaders (sequential, stream, concurrent) is run and the
 * first block it produced is returned. Every returned block is also appended
 * to pRecycledBlocks.
 *
 * @return the block, or NULL when the operator is done or nothing was loaded.
 *         Failures long-jump to pTaskInfo->env after setting pTaskInfo->code.
 */
static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
  SExchangeInfo*       pExchangeInfo = pOperator->info;
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
  int32_t              code = TSDB_CODE_SUCCESS;
  size_t               totalSources = taosArrayGetSize(pExchangeInfo->pSources);

  if (pOperator->status == OP_EXEC_DONE) {
    qDebug("%s all %" PRIzu " source(s) are exhausted, total rows:%" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
           GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize,
           pLoadInfo->totalElapsed / 1000.0);
    return NULL;
  }

  // fast path: a previously retrieved block is still buffered
  SSDataBlock* pBuffered = NULL;
  if (taosArrayGetSize(pExchangeInfo->pResultBlockList) > 0) {
    pBuffered = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
    taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
  }

  if (pBuffered != NULL) {
    if (taosArrayPush(pExchangeInfo->pRecycledBlocks, &pBuffered) == NULL) {
      code = terrno;
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      pTaskInfo->code = code;
      T_LONG_JMP(pTaskInfo->env, code);
    }
    return pBuffered;
  }

  // slow path: pick the loader that matches the operator mode
  if (pExchangeInfo->seqLoadData) {
    code = seqLoadRemoteData(pOperator);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      pTaskInfo->code = code;
      T_LONG_JMP(pTaskInfo->env, code);
    }
  } else if (IS_STREAM_MODE(pOperator->pTaskInfo)) {
    streamSequenciallyLoadRemoteData(pOperator, pExchangeInfo, pTaskInfo);
  } else {
    concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
  }

  // the void loaders report failure through the task code
  if (pOperator->pTaskInfo->code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(pOperator->pTaskInfo->code));
    T_LONG_JMP(pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  if (taosArrayGetSize(pExchangeInfo->pResultBlockList) == 0) {
    qDebug("empty resultBlockList");
    return NULL;
  }

  SSDataBlock* pLoaded = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
  taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
  if (taosArrayPush(pExchangeInfo->pRecycledBlocks, &pLoaded) == NULL) {
    code = terrno;
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  qDebug("block with rows:%" PRId64 " loaded", pLoaded->info.rows);
  return pLoaded;
}
449

450
/**
 * Produces the next non-empty block of the exchange operator.
 *
 * Opens the operator through its _openFn, then keeps loading blocks, running
 * the operator filter over each one, skipping blocks left with zero rows and
 * applying limit/offset handling when it is configured.
 *
 * @param pOperator exchange operator
 * @param ppRes     out: the block to return, or NULL when no more data is available
 * @return the status code; on failure the function records the code in the
 *         task and long-jumps to pTaskInfo->env
 */
static int32_t loadRemoteDataNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SExchangeInfo* pExchangeInfo = pOperator->info;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;

  qDebug("%s start to load from exchange %p", pTaskInfo->id.str, pExchangeInfo);

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  for (;;) {
    SSDataBlock* pBlock = doLoadRemoteDataImpl(pOperator);
    if (pBlock == NULL) {
      (*ppRes) = NULL;
      return code;
    }

    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    // the filter removed every row: fetch another block
    if (blockDataGetNumOfRows(pBlock) == 0) {
      qDebug("rows 0 block got, continue next load");
      continue;
    }

    SLimitInfo* pLimitInfo = &pExchangeInfo->limitInfo;
    if (!hasLimitOffsetInfo(pLimitInfo)) {
      (*ppRes) = pBlock;
      qDebug("block with rows %" PRId64 " returned in exechange", pBlock->info.rows);
      return code;
    }

    int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock, false);
    if (status == PROJECT_RETRIEVE_CONTINUE) {
      qDebug("limit retrieve continue");
      continue;
    }

    if (status == PROJECT_RETRIEVE_DONE) {
      if (pBlock->info.rows == 0) {
        setOperatorCompleted(pOperator);
        (*ppRes) = NULL;
      } else {
        (*ppRes) = pBlock;
      }
      return code;
    }
    // any other status: go around and load the next block
  }

_end:

  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  } else {
    qDebug("empty block returned in exchange");
  }

  (*ppRes) = NULL;
  return code;
}
517

518
/**
 * Creates pInfo->pSourceDataInfo and, for non-dynamic operators, fills it with
 * one SSourceDataInfo per source (status EX_SOURCE_DATA_NOT_READY, index i,
 * taskId pointing at a private copy of `id` kept in pInfo->pTaskId).
 *
 * For dynamic operators (pInfo->dynamicOp) only the empty array is created.
 *
 * @param numOfSources number of entries to create
 * @param pInfo        exchange state to initialise
 * @param id           task id string; copied, not retained
 * @return TSDB_CODE_SUCCESS, or the terrno of the failed allocation
 */
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
  pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
  if (pInfo->pSourceDataInfo == NULL) {
    return terrno;
  }

  if (pInfo->dynamicOp) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t len = strlen(id) + 1;
  pInfo->pTaskId = taosMemoryCalloc(1, len);
  if (!pInfo->pTaskId) {
    return terrno;
  }
  tstrncpy(pInfo->pTaskId, id, len);

  for (int32_t i = 0; i < numOfSources; ++i) {
    SSourceDataInfo dataInfo = {0};
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
    dataInfo.taskId = pInfo->pTaskId;
    dataInfo.index = i;

    SSourceDataInfo* pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
    if (pDs == NULL) {
      // capture the error before the cleanup call gets a chance to change terrno
      int32_t code = terrno;
      taosArrayDestroyEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
      // do not leave a dangling pointer behind: the operator teardown
      // presumably releases this member as well — clearing it rules out a
      // second free of the same array
      pInfo->pSourceDataInfo = NULL;
      return code;
    }
    qDebug("init source data info %d, pDs:%p, status:%d", i, pDs, pDs->status);
  }

  return TSDB_CODE_SUCCESS;
}
549

550
/**
 * Initialises the exchange state from its physical plan node.
 *
 * Creates the RPC handle array and the source array, copies every source end
 * point of the plan node into pInfo->pSources, sets up limit/offset
 * information, registers pInfo in fetchObjRefPool (the reference id is stored
 * in pInfo->self) and finally builds the per-source data info via
 * initDataSource().
 *
 * For dynamic operators (pExNode->node.dynamicOp) a hash keyed by the source
 * node id is created as well, mapping to {srcIdx = i, inUseIdx = -1}.
 *
 * @param pExNode exchange physical plan node
 * @param pInfo   exchange state to initialise
 * @param id      task id string used for logging and passed to initDataSource()
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when the node has no
 *         sources, or the code of the step that failed
 */
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo, const char* id) {
  size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);

  if (numOfSources == 0) {
    qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
    return TSDB_CODE_INVALID_PARA;
  }
  pInfo->pFetchRpcHandles = taosArrayInit(numOfSources, sizeof(int64_t));
  if (!pInfo->pFetchRpcHandles) {
    return terrno;
  }
  void* ret = taosArrayReserve(pInfo->pFetchRpcHandles, numOfSources);
  if (!ret) {
    return terrno;
  }

  pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
  if (pInfo->pSources == NULL) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
    return terrno;
  }

  if (pExNode->node.dynamicOp) {
    pInfo->pHashSources = tSimpleHashInit(numOfSources * 2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
    if (NULL == pInfo->pHashSources) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return terrno;
    }
  }

  for (int32_t i = 0; i < numOfSources; ++i) {
    SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
    if (!pNode) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return terrno;
    }
    void* tmp = taosArrayPush(pInfo->pSources, pNode);
    if (!tmp) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      return terrno;
    }

    // the hash only exists for dynamic operators; skip the insert otherwise
    // instead of calling into the hash with a NULL table and ignoring the result
    if (pInfo->pHashSources != NULL) {
      SExchangeSrcIndex idx = {.srcIdx = i, .inUseIdx = -1};
      int32_t           code =
          tSimpleHashPut(pInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        return code;
      }
    }
  }

  initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);
  int64_t refId = taosAddRef(fetchObjRefPool, pInfo);
  if (refId < 0) {
    int32_t code = terrno;
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    return code;
  } else {
    pInfo->self = refId;
  }

  return initDataSource(numOfSources, pInfo, id);
}
612

613
/**
 * Puts an exchange operator back into its not-opened state so it can be run
 * again.
 *
 * The exchange sequence id is incremented first, the cursor and the load
 * statistics are zeroed, every per-source entry is cleared under its write
 * latch, buffered and recycled blocks are released, all hash entries get
 * inUseIdx = -1 and the limit info is rebuilt from the plan node.
 *
 * @return always 0
 */
int32_t resetExchangeOperState(SOperatorInfo* pOper) {
  SExchangeInfo*      pInfo = pOper->info;
  SExchangePhysiNode* pPhynode = (SExchangePhysiNode*)pOper->pPhyNode;

  qDebug("%s reset exchange op:%p info:%p", pOper->pTaskInfo->id.str, pOper, pInfo);

  // bump the sequence id so the loaders discard responses of earlier requests
  (void)atomic_add_fetch_64(&pInfo->seqId, 1);

  pOper->status = OP_NOT_OPENED;
  pInfo->current = 0;
  pInfo->loadInfo.totalElapsed = 0;
  pInfo->loadInfo.totalRows = 0;
  pInfo->loadInfo.totalSize = 0;

  size_t numOfEntries = taosArrayGetSize(pInfo->pSourceDataInfo);
  for (int32_t idx = 0; idx < numOfEntries; ++idx) {
    SSourceDataInfo* pEntry = taosArrayGet(pInfo->pSourceDataInfo, idx);

    taosWLockLatch(&pEntry->lock);
    taosMemoryFreeClear(pEntry->decompBuf);
    taosMemoryFreeClear(pEntry->pRsp);

    pEntry->totalRows = 0;
    pEntry->code = 0;
    pEntry->status = EX_SOURCE_DATA_NOT_READY;
    pEntry->fetchSent = false;
    pEntry->fetchTimes = 0;
    pEntry->fetchCostUs = 0;
    taosWUnLockLatch(&pEntry->lock);
  }

  // dynamic operators drop their source entries altogether
  if (pInfo->dynamicOp) {
    taosArrayClearEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
  }

  taosArrayClearEx(pInfo->pResultBlockList, freeBlock);
  taosArrayClearEx(pInfo->pRecycledBlocks, freeBlock);

  blockDataCleanup(pInfo->pDummyBlock);

  int32_t iter = 0;
  void*   pItem = NULL;
  while ((pItem = tSimpleHashIterate(pInfo->pHashSources, pItem, &iter))) {
    ((SExchangeSrcIndex*)pItem)->inUseIdx = -1;
  }

  pInfo->limitInfo = (SLimitInfo){0};
  initLimitInfo(pPhynode->node.pLimit, pPhynode->node.pSlimit, &pInfo->limitInfo);

  return 0;
}
660

661
/*
 * Collect EXPLAIN statistics for an exchange operator.
 *
 * Allocates a zeroed SExchangeExplainInfo, fills it with the load mode, the
 * number of sources and the per-source average/maximum of fetch count, fetched
 * rows and fetch cost (microseconds), and hands it to the caller.
 *
 * @param pOptr        Exchange operator; its info must be an SExchangeInfo.
 * @param pOptrExplain Output: the newly allocated explain info.
 * @param len          Output: size in bytes of the explain info.
 *
 * @return TSDB_CODE_SUCCESS, or terrno when the allocation fails (outputs are
 *         left untouched in that case).
 */
static int32_t exchangeGetExplainExecInfo(SOperatorInfo* pOptr,
                                          void** pOptrExplain, uint32_t* len) {
  const SExchangeInfo* pExchange = pOptr->info;
  const int32_t        srcCount = (int32_t)taosArrayGetSize(pExchange->pSourceDataInfo);

  SExchangeExplainInfo* pExplain = taosMemoryCalloc(1, sizeof(SExchangeExplainInfo));
  if (NULL == pExplain) {
    return terrno;
  }

  pExplain->numSources = srcCount;
  pExplain->mode = pExchange->seqLoadData ? 1 : 0;

  /* all sources are exhausted, thus no need to lock the sources data info */
  for (int32_t idx = 0; idx < srcCount; ++idx) {
    const SSourceDataInfo* pSource = taosArrayGet(pExchange->pSourceDataInfo, idx);

    /* running averages: each source contributes value / srcCount */
    pExplain->avgFetchTimes += (double)pSource->fetchTimes / srcCount;
    pExplain->avgFetchRows += (double)pSource->totalRows / srcCount;
    pExplain->avgFetchCost += (double)pSource->fetchCostUs / srcCount;

    /* running maxima */
    pExplain->maxFetchTimes = TMAX(pExplain->maxFetchTimes, pSource->fetchTimes);
    pExplain->maxFetchRows = TMAX(pExplain->maxFetchRows, pSource->totalRows);
    pExplain->maxFetchCost = TMAX(pExplain->maxFetchCost, pSource->fetchCostUs);
  }

  *len = sizeof(SExchangeExplainInfo);
  *pOptrExplain = pExplain;
  return TSDB_CODE_SUCCESS;
}
689

690
int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo,
111,365,471✔
691
                                   SOperatorInfo** pOptrInfo) {
692
  QRY_PARAM_CHECK(pOptrInfo);
111,365,471✔
693

694
  int32_t        code = 0;
111,366,539✔
695
  int32_t        lino = 0;
111,366,539✔
696
  SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
111,366,539✔
697
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
111,365,180✔
698
  if (pInfo == NULL || pOperator == NULL) {
111,364,318✔
699
    code = terrno;
×
700
    goto _error;
×
701
  }
702
  initOperatorCostInfo(pOperator);
111,364,318✔
703

704
  pInfo->isExchange = true;
111,365,448✔
705
  pOperator->pPhyNode = pExNode;
111,365,448✔
706
  pInfo->dynamicOp = pExNode->node.dynamicOp;
111,366,033✔
707
  code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo));
111,365,448✔
708
  QUERY_CHECK_CODE(code, lino, _error);
111,367,077✔
709

710
  code = tsem_init(&pInfo->ready, 0, 0);
111,367,077✔
711
  QUERY_CHECK_CODE(code, lino, _error);
111,364,751✔
712

713
  pInfo->pDummyBlock = createDataBlockFromDescNode(pExNode->node.pOutputDataBlockDesc);
111,364,751✔
714
  QUERY_CHECK_NULL(pInfo->pDummyBlock, code, lino, _error, terrno);
111,367,077✔
715

716
  pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES);
111,367,077✔
717
  QUERY_CHECK_NULL(pInfo->pResultBlockList, code, lino, _error, terrno);
111,365,243✔
718
  pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES);
111,366,682✔
719
  QUERY_CHECK_NULL(pInfo->pRecycledBlocks, code, lino, _error, terrno);
111,366,152✔
720

721
  SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self};
111,366,153✔
722
  code = qAppendTaskStopInfo(pTaskInfo, &stopInfo);
111,365,093✔
723
  QUERY_CHECK_CODE(code, lino, _error);
111,365,986✔
724

725
  pInfo->seqLoadData = pExNode->seqRecvData;
111,365,986✔
726
  pInfo->dynTbname = pExNode->dynTbname;
111,366,515✔
727
  if (pInfo->dynTbname) {
111,365,607✔
728
    pInfo->seqLoadData = true;
32,150✔
729
  }
730
  pInfo->pTransporter = pTransporter;
111,365,986✔
731

732
  setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo,
111,365,456✔
733
                  pTaskInfo);
734
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
111,365,227✔
735

736
  code = filterInitFromNode((SNode*)pExNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
111,367,077✔
737
                            pTaskInfo->pStreamRuntimeInfo);
111,364,698✔
738
  QUERY_CHECK_CODE(code, lino, _error);
111,364,857✔
739
  qTrace("%s exchange op:%p", __func__, pOperator);
111,364,857✔
740
  pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteDataNext, NULL,
111,364,857✔
741
                                         destroyExchangeOperatorInfo, optrDefaultBufFn,
742
                                         exchangeGetExplainExecInfo, optrDefaultGetNextExtFn, NULL);
743
  setOperatorResetStateFn(pOperator, resetExchangeOperState);
111,365,053✔
744
  *pOptrInfo = pOperator;
111,365,203✔
745
  return TSDB_CODE_SUCCESS;
111,365,203✔
746

747
_error:
×
748
  if (code != TSDB_CODE_SUCCESS) {
×
749
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
750
    pTaskInfo->code = code;
×
751
  }
752
  if (pInfo != NULL) {
×
753
    doDestroyExchangeOperatorInfo(pInfo);
×
754
  }
755

756
  if (pOperator != NULL) {
×
757
    pOperator->info = NULL;
×
758
    destroyOperator(pOperator);
×
759
  }
760
  return code;
×
761
}
762

763
void destroyExchangeOperatorInfo(void* param) {
111,365,871✔
764
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
111,365,871✔
765
  int32_t        code = taosRemoveRef(fetchObjRefPool, pExInfo->self);
111,365,871✔
766
  if (code != TSDB_CODE_SUCCESS) {
111,367,077✔
767
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
768
  }
769
}
111,367,077✔
770

771
void freeBlock(void* pParam) {
299,967,665✔
772
  SSDataBlock* pBlock = *(SSDataBlock**)pParam;
299,967,665✔
773
  blockDataDestroy(pBlock);
299,967,159✔
774
}
299,967,904✔
775

776
void freeSourceDataInfo(void* p) {
172,440,482✔
777
  SSourceDataInfo* pInfo = (SSourceDataInfo*)p;
172,440,482✔
778
  taosMemoryFreeClear(pInfo->decompBuf);
172,440,482✔
779
  taosMemoryFreeClear(pInfo->pRsp);
172,440,378✔
780

781
  pInfo->decompBufSize = 0;
172,440,884✔
782
}
172,440,884✔
783

784
void doDestroyExchangeOperatorInfo(void* param) {
111,366,675✔
785
  if (param == NULL) {
111,366,675✔
786
    return;
×
787
  }
788
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
111,366,675✔
789
  if (pExInfo->pFetchRpcHandles) {
111,366,675✔
790
    for (int32_t i = 0; i < pExInfo->pFetchRpcHandles->size; ++i) {
292,375,038✔
791
      int64_t* pRpcHandle = taosArrayGet(pExInfo->pFetchRpcHandles, i);
181,008,065✔
792
      if (*pRpcHandle > 0) {
181,007,559✔
793
        SDownstreamSourceNode* pSource = taosArrayGet(pExInfo->pSources, i);
12,961,193✔
794
        (void)asyncFreeConnById(pExInfo->pTransporter, *pRpcHandle);
12,961,193✔
795
      }
796
    }
797
    taosArrayDestroy(pExInfo->pFetchRpcHandles);
111,367,077✔
798
  }
799

800
  taosArrayDestroy(pExInfo->pSources);
111,366,169✔
801
  taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo);
111,367,077✔
802

803
  taosArrayDestroyEx(pExInfo->pResultBlockList, freeBlock);
111,365,365✔
804
  taosArrayDestroyEx(pExInfo->pRecycledBlocks, freeBlock);
111,365,372✔
805

806
  blockDataDestroy(pExInfo->pDummyBlock);
111,367,077✔
807
  tSimpleHashCleanup(pExInfo->pHashSources);
111,367,077✔
808

809
  int32_t code = tsem_destroy(&pExInfo->ready);
111,366,675✔
810
  if (code != TSDB_CODE_SUCCESS) {
111,366,137✔
811
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
812
  }
813
  taosMemoryFreeClear(pExInfo->pTaskId);
111,366,137✔
814

815
  taosMemoryFreeClear(param);
111,367,102✔
816
}
817

818
/*
 * RPC callback invoked when a fetch response (or failure) for one exchange
 * source arrives.
 *
 * The exchange operator is looked up through fetchObjRefPool; if it is gone,
 * or the response belongs to an older sequence id, the response is dropped.
 * Otherwise the response (or the converted error code) is stored in the
 * matching SSourceDataInfo under its write latch, the source is marked
 * EX_SOURCE_DATA_READY and the operator's ready semaphore is posted.
 *
 * The response buffer pMsg->pData is either stored in the source data info
 * or freed here on every path. The reference acquired from fetchObjRefPool is
 * released on every path after a successful acquire.
 *
 * @param param SFetchRspHandleWrapper identifying exchange, source and seqId.
 * @param pMsg  Response message; pEpSet is freed here, pData is consumed.
 * @param code  Result code of the RPC.
 *
 * @return TSDB_CODE_SUCCESS for dropped responses; terrno when the source
 *         data info cannot be found; the tsem_post() error if posting fails;
 *         otherwise the result of releasing the reference.
 */
int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
  SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;

  taosMemoryFreeClear(pMsg->pEpSet);
  SExchangeInfo* pExchangeInfo = taosAcquireRef(fetchObjRefPool, pWrapper->exchangeId);
  if (pExchangeInfo == NULL) {
    qWarn("failed to acquire exchange operator, since it may have been released, %p", pExchangeInfo);
    taosMemoryFree(pMsg->pData);
    return TSDB_CODE_SUCCESS;
  }

  /* the operator was reset since this request was sent: the response is stale */
  int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
  if (pWrapper->seqId != currSeqId) {
    qDebug("rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pWrapper->seqId, pExchangeInfo, currSeqId);
    taosMemoryFree(pMsg->pData);
    code = taosReleaseRef(fetchObjRefPool, pWrapper->exchangeId);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    }
    return TSDB_CODE_SUCCESS;
  }

  int32_t index = pWrapper->sourceIndex;

  qDebug("%s exchange %p %dth source got rsp, code:%d, rsp:%p", pExchangeInfo->pTaskId, pExchangeInfo, index, code, pMsg->pData);

  int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, index);
  if (pRpcHandle != NULL) {
    int32_t ret = asyncFreeConnById(pExchangeInfo->pTransporter, *pRpcHandle);
    if (ret != 0) {
      qDebug("failed to free rpc handle, code:%s, %p", tstrerror(ret), pExchangeInfo);
    }
    *pRpcHandle = -1;
  }

  SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);
  if (!pSourceDataInfo) {
    /* save the lookup error first: the cleanup calls below may overwrite terrno */
    int32_t lookupCode = terrno;

    /* nobody will consume the response, and the ref must not stay acquired */
    taosMemoryFree(pMsg->pData);
    int32_t releaseCode = taosReleaseRef(fetchObjRefPool, pWrapper->exchangeId);
    if (releaseCode != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(releaseCode));
    }
    return lookupCode;
  }

  if (0 == code && NULL == pMsg->pData) {
    qError("invalid rsp msg, msgType:%d, len:%d", pMsg->msgType, pMsg->len);
    code = TSDB_CODE_QRY_INVALID_MSG;
  }

  taosWLockLatch(&pSourceDataInfo->lock);
  if (code == TSDB_CODE_SUCCESS) {
    pSourceDataInfo->fetchCostUs += taosGetTimestampUs() - pSourceDataInfo->startTime;
    pSourceDataInfo->fetchTimes++;

    pSourceDataInfo->seqId = pWrapper->seqId;
    pSourceDataInfo->pRsp = pMsg->pData;  /* ownership moves to the source data info */

    /* byte-swap the header fields in place (presumably network -> host order) */
    SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
    pRsp->numOfRows = htobe64(pRsp->numOfRows);
    pRsp->compLen = htonl(pRsp->compLen);
    pRsp->payloadLen = htonl(pRsp->payloadLen);
    pRsp->numOfCols = htonl(pRsp->numOfCols);
    pRsp->useconds = htobe64(pRsp->useconds);
    pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);

    qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%" PRId64 ", %p", pSourceDataInfo->taskId, index,
           pRsp->numOfBlocks, pRsp->numOfRows, pExchangeInfo);
  } else {
    taosMemoryFree(pMsg->pData);
    pSourceDataInfo->code = rpcCvtErrCode(code);
    if (pSourceDataInfo->code != code) {
      qError("%s fetch rsp received, index:%d, error:%s, cvted error: %s, %p", pSourceDataInfo->taskId, index,
             tstrerror(code), tstrerror(pSourceDataInfo->code), pExchangeInfo);
    } else {
      qError("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code),
             pExchangeInfo);
    }
  }

  /* make the stored response/code visible before the status flips to READY */
  tmemory_barrier();
  pSourceDataInfo->status = EX_SOURCE_DATA_READY;
  taosWUnLockLatch(&pSourceDataInfo->lock);

  code = tsem_post(&pExchangeInfo->ready);
  if (code != TSDB_CODE_SUCCESS) {
    qError("failed to invoke post when fetch rsp is ready, code:%s, %p", tstrerror(code), pExchangeInfo);
  }

  /* release the reference whether or not the post succeeded */
  int32_t releaseCode = taosReleaseRef(fetchObjRefPool, pWrapper->exchangeId);
  if (releaseCode != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(releaseCode));
  }

  /* a post failure takes precedence, matching the previous return value */
  return (code != TSDB_CODE_SUCCESS) ? code : releaseCode;
}
909

910
/*
 * Build a table-scan operator param of dynamic type DYN_TYPE_STB_JOIN.
 *
 * @param ppRes     Output: the new operator param; set to NULL on failure.
 * @param pUidList  Table uid list; it is duplicated into the param. A failed
 *                  duplication (NULL result) is reported as an error.
 * @param srcOpType Operator type stored in the param.
 * @param tableSeq  Stored in the scan param's tableSeq field.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise terrno of the failed step.
 */
int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, bool tableSeq) {
  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }

  /* zero-allocate so that fields not assigned below (e.g. groupid) are defined */
  STableScanOperatorParam* pScan = taosMemoryCalloc(1, sizeof(STableScanOperatorParam));
  if (NULL == pScan) {
    int32_t code = terrno;  /* capture before cleanup can touch terrno */
    taosMemoryFreeClear(*ppRes);
    return code;
  }

  pScan->paramType = DYN_TYPE_SCAN_PARAM;
  pScan->pUidList = taosArrayDup(pUidList, NULL);
  if (NULL == pScan->pUidList) {
    int32_t code = terrno;  /* capture before cleanup can touch terrno */
    taosMemoryFree(pScan);
    taosMemoryFreeClear(*ppRes);
    return code;
  }
  pScan->dynType = DYN_TYPE_STB_JOIN;
  pScan->tableSeq = tableSeq;
  pScan->pOrgTbInfo = NULL;
  pScan->pBatchTbInfo = NULL;
  pScan->pTagList = NULL;
  pScan->isNewParam = false;
  /* skey > ekey: an inverted window; presumably means "no window" - confirm with the scan operator */
  pScan->window.skey = INT64_MAX;
  pScan->window.ekey = INT64_MIN;
  pScan->notifyToProcess = false;
  pScan->notifyTs = 0;

  (*ppRes)->opType = srcOpType;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->value = pScan;
  (*ppRes)->pChildren = NULL;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;
}
948

949
/*
 * Build a table-scan operator param with an optional origin-table mapping.
 *
 * @param ppRes      Output: the new operator param; set to NULL on failure.
 * @param pUidList   Optional table uid list; duplicated when not NULL.
 * @param srcOpType  Operator type stored in the param.
 * @param pMap       Optional origin table info; vgId, tbName and colMap are
 *                   copied when not NULL.
 * @param tableSeq   Stored in the scan param's tableSeq field.
 * @param window     Time window copied into the scan param; must not be NULL.
 * @param isNewParam Stored in the scan param's isNewParam field.
 * @param type       Dynamic scan type stored in the scan param.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise the error code of the
 *         failed step; everything allocated here is released on failure.
 */
int32_t buildTableScanOperatorParamEx(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, SOrgTbInfo *pMap, bool tableSeq, STimeWindow *window, bool isNewParam, ETableScanDynType type) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  STableScanOperatorParam* pScan = NULL;

  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);

  /*
   * Zero-allocate: the cleanup path inspects pScan->pUidList and
   * pScan->pOrgTbInfo, which must be NULL until they are assigned below.
   */
  pScan = taosMemoryCalloc(1, sizeof(STableScanOperatorParam));
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);

  pScan->paramType = DYN_TYPE_SCAN_PARAM;
  if (pUidList) {
    pScan->pUidList = taosArrayDup(pUidList, NULL);
    QUERY_CHECK_NULL(pScan->pUidList, code, lino, _return, terrno);
  } else {
    pScan->pUidList = NULL;
  }

  if (pMap) {
    pScan->pOrgTbInfo = taosMemoryCalloc(1, sizeof(SOrgTbInfo));
    QUERY_CHECK_NULL(pScan->pOrgTbInfo, code, lino, _return, terrno);

    pScan->pOrgTbInfo->vgId = pMap->vgId;
    tstrncpy(pScan->pOrgTbInfo->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);

    pScan->pOrgTbInfo->colMap = taosArrayDup(pMap->colMap, NULL);
    QUERY_CHECK_NULL(pScan->pOrgTbInfo->colMap, code, lino, _return, terrno);
  } else {
    pScan->pOrgTbInfo = NULL;
  }
  pScan->pTagList = NULL;
  pScan->pBatchTbInfo = NULL;

  pScan->dynType = type;
  pScan->tableSeq = tableSeq;
  pScan->window.skey = window->skey;
  pScan->window.ekey = window->ekey;
  pScan->isNewParam = isNewParam;
  pScan->notifyToProcess = false;
  pScan->notifyTs = 0;
  (*ppRes)->opType = srcOpType;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->value = pScan;
  (*ppRes)->pChildren = NULL;
  (*ppRes)->reUse = false;

  return code;
_return:
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
  taosMemoryFreeClear(*ppRes);
  if (pScan) {
    taosArrayDestroy(pScan->pUidList);
    if (pScan->pOrgTbInfo) {
      taosArrayDestroy(pScan->pOrgTbInfo->colMap);
      taosMemoryFreeClear(pScan->pOrgTbInfo);
    }
    taosMemoryFree(pScan);
  }
  return code;
}
1011

1012
/**
1013
  @brief build the table scan operator param for notify message
1014
*/
1015
int32_t buildTableScanOperatorParamNotify(SOperatorParam** ppRes,
232,030✔
1016
                                          int32_t srcOpType, TSKEY notifyTs) {
1017
  if (srcOpType != 0 && srcOpType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
232,030✔
1018
    qWarn("%s, invalid srcOpType:%d", __func__, srcOpType);
×
1019
    return TSDB_CODE_INVALID_PARA;
×
1020
  }
1021
  int32_t code = TSDB_CODE_SUCCESS;
232,030✔
1022
  int32_t lino = 0;
232,030✔
1023
  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
232,030✔
1024
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
232,030✔
1025

1026
  STableScanOperatorParam* pTsParam =
464,060✔
1027
    taosMemoryCalloc(1, sizeof(STableScanOperatorParam));
232,030✔
1028
  QUERY_CHECK_NULL(pTsParam, code, lino, _return, terrno);
232,030✔
1029

1030
  pTsParam->paramType = NOTIFY_TYPE_SCAN_PARAM;
232,030✔
1031
  pTsParam->notifyToProcess = true;
232,030✔
1032
  pTsParam->notifyTs = notifyTs;
232,030✔
1033

1034
  (*ppRes)->opType = srcOpType != 0 ? srcOpType :
232,030✔
1035
                                      QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
1036
  (*ppRes)->downstreamIdx = 0;
232,030✔
1037
  (*ppRes)->value = pTsParam;
232,030✔
1038
  (*ppRes)->pChildren = NULL;
232,030✔
1039
  /* param is not reusable when it is transferred by message */
1040
  (*ppRes)->reUse = false;
232,030✔
1041

1042
_return:
232,030✔
1043
  if (TSDB_CODE_SUCCESS != code) {
232,030✔
1044
    qError("%s failed at %d, failed to build scan operator msg:%s",
×
1045
           __func__, lino, tstrerror(code));
1046
    taosMemoryFreeClear(*ppRes);
×
1047
    if (pTsParam) {
×
1048
      taosMemoryFree(pTsParam);
×
1049
    }
1050
  }
1051
  return code;
232,030✔
1052
}
1053

1054
/*
 * Build a table-scan operator param of dynamic type DYN_TYPE_VSTB_BATCH_SCAN.
 *
 * @param ppRes      Output: the new operator param; set to NULL on failure.
 * @param groupid    Group id stored in the scan param.
 * @param pUidList   Optional table uid list; duplicated when not NULL.
 * @param srcOpType  Operator type stored in the param.
 * @param pBatchMap  Optional array of SOrgTbInfo; every entry (including its
 *                   colMap) is deep-copied into the scan param.
 * @param pTagList   Optional array of STagVal; every entry is copied, with
 *                   variable-length payloads duplicated.
 * @param tableSeq   Stored in the scan param's tableSeq field.
 * @param window     Time window copied into the scan param; must not be NULL.
 * @param isNewParam Stored in the scan param's isNewParam field.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise the error code of the
 *         failed step; everything allocated here is released on failure.
 */
int32_t buildTableScanOperatorParamBatchInfo(SOperatorParam** ppRes, uint64_t groupid, SArray* pUidList, int32_t srcOpType, SArray *pBatchMap, SArray *pTagList, bool tableSeq, STimeWindow *window, bool isNewParam) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  STableScanOperatorParam* pScan = NULL;
  /* copies that exist but are not yet owned by one of pScan's arrays */
  SArray*                  pPendingColMap = NULL;
  void*                    pPendingTagData = NULL;

  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);

  /*
   * Zero-allocate: the cleanup path inspects pUidList, pBatchTbInfo and
   * pTagList, which must be NULL until they are assigned below.
   */
  pScan = taosMemoryCalloc(1, sizeof(STableScanOperatorParam));
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);

  pScan->paramType = DYN_TYPE_SCAN_PARAM;
  pScan->groupid = groupid;
  if (pUidList) {
    pScan->pUidList = taosArrayDup(pUidList, NULL);
    QUERY_CHECK_NULL(pScan->pUidList, code, lino, _return, terrno);
  } else {
    pScan->pUidList = NULL;
  }
  pScan->pOrgTbInfo = NULL;

  if (pBatchMap) {
    pScan->pBatchTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
    QUERY_CHECK_NULL(pScan->pBatchTbInfo, code, lino, _return, terrno);
    for (int32_t i = 0; i < taosArrayGetSize(pBatchMap); i++) {
      SOrgTbInfo *pSrcInfo = taosArrayGet(pBatchMap, i);
      SOrgTbInfo batchInfo = {0};
      batchInfo.vgId = pSrcInfo->vgId;
      tstrncpy(batchInfo.tbName, pSrcInfo->tbName, TSDB_TABLE_FNAME_LEN);
      batchInfo.colMap = taosArrayDup(pSrcInfo->colMap, NULL);
      QUERY_CHECK_NULL(batchInfo.colMap, code, lino, _return, terrno);

      /* until the push succeeds nobody else owns the duplicated colMap */
      pPendingColMap = batchInfo.colMap;
      SOrgTbInfo *pDstInfo = taosArrayPush(pScan->pBatchTbInfo, &batchInfo);
      QUERY_CHECK_NULL(pDstInfo, code, lino, _return, terrno);
      pPendingColMap = NULL;
    }
  } else {
    pScan->pBatchTbInfo = NULL;
  }

  if (pTagList) {
    pScan->pTagList = taosArrayInit(1, sizeof(STagVal));
    QUERY_CHECK_NULL(pScan->pTagList, code, lino, _return, terrno);

    for (int32_t i = 0; i < taosArrayGetSize(pTagList); ++i) {
      STagVal *pSrcTag = (STagVal*)taosArrayGet(pTagList, i);
      STagVal  dstTag = {0};
      dstTag.type = pSrcTag->type;
      dstTag.cid = pSrcTag->cid;
      if (IS_VAR_DATA_TYPE(pSrcTag->type)) {
        dstTag.nData = pSrcTag->nData;
        dstTag.pData = taosMemoryMalloc(dstTag.nData);
        QUERY_CHECK_NULL(dstTag.pData, code, lino, _return, terrno);
        /* until the push succeeds nobody else owns the payload copy */
        pPendingTagData = dstTag.pData;
        memcpy(dstTag.pData, pSrcTag->pData, dstTag.nData);
      } else {
        dstTag.i64 = pSrcTag->i64;
      }

      QUERY_CHECK_NULL(taosArrayPush(pScan->pTagList, &dstTag), code, lino, _return, terrno);
      pPendingTagData = NULL;
    }
  } else {
    pScan->pTagList = NULL;
  }

  pScan->dynType = DYN_TYPE_VSTB_BATCH_SCAN;
  pScan->tableSeq = tableSeq;
  pScan->window.skey = window->skey;
  pScan->window.ekey = window->ekey;
  pScan->isNewParam = isNewParam;
  pScan->notifyToProcess = false;
  pScan->notifyTs = 0;
  (*ppRes)->opType = srcOpType;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->value = pScan;
  (*ppRes)->pChildren = NULL;
  (*ppRes)->reUse = false;

  return code;
_return:
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
  taosMemoryFreeClear(*ppRes);
  /* copies that never made it into one of the arrays */
  taosArrayDestroy(pPendingColMap);
  taosMemoryFree(pPendingTagData);
  if (pScan) {
    taosArrayDestroy(pScan->pUidList);
    if (pScan->pBatchTbInfo) {
      /* the array stores SOrgTbInfo by value; each entry owns its colMap */
      for (int32_t i = 0; i < taosArrayGetSize(pScan->pBatchTbInfo); ++i) {
        SOrgTbInfo* pPushed = taosArrayGet(pScan->pBatchTbInfo, i);
        if (pPushed) {
          taosArrayDestroy(pPushed->colMap);
        }
      }
      taosArrayDestroy(pScan->pBatchTbInfo);
    }
    if (pScan->pTagList) {
      /* only variable-length tags carry a heap payload */
      for (int32_t i = 0; i < taosArrayGetSize(pScan->pTagList); ++i) {
        STagVal* pPushedTag = (STagVal*)taosArrayGet(pScan->pTagList, i);
        if (pPushedTag && IS_VAR_DATA_TYPE(pPushedTag->type)) {
          taosMemoryFree(pPushedTag->pData);
        }
      }
      taosArrayDestroy(pScan->pTagList);
    }
    taosMemoryFree(pScan);
  }
  return code;
}
1143

1144
/*
1145
 * Build hash-agg operator get-param for dynamic virtual-table aggregation.
1146
 *
1147
 * @param ppRes Output operator param.
1148
 * @param groupid Group id bound to this batch.
1149
 * @param pUidList Source table uid list.
1150
 * @param pBatchMap Batch table metadata map.
1151
 * @param pTagList Optional tag value list.
1152
 * @param tableSeq Whether to keep table-sequential mode.
1153
 * @param window Time window for downstream scan.
1154
 * @param isNewParam Whether downstream should treat this as a new param batch.
1155
 *
1156
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
1157
 */
1158
int32_t buildAggOperatorParam(SOperatorParam** ppRes, uint64_t groupid, SArray* pUidList, SArray* pBatchMap,
1,617,716✔
1159
                              SArray* pTagList, bool tableSeq, STimeWindow* window, bool isNewParam) {
1160
  int32_t                  code = TSDB_CODE_SUCCESS;
1,617,716✔
1161
  int32_t                  lino = 0;
1,617,716✔
1162
  SOperatorParam*          pParam = NULL;
1,617,716✔
1163

1164
  pParam = taosMemoryCalloc(1, sizeof(SOperatorParam));
1,617,716✔
1165
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno);
1,617,716✔
1166

1167
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL;
1,617,716✔
1168
  pParam->downstreamIdx = 0;
1,617,716✔
1169
  pParam->value = NULL;
1,617,716✔
1170
  pParam->pChildren = NULL;
1,617,716✔
1171
  pParam->reUse = false;
1,617,716✔
1172

1173
  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
1,617,716✔
1174
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno);
1,617,716✔
1175

1176
  SOperatorParam* pTableScanParam = NULL;
1,617,716✔
1177
  code = buildTableScanOperatorParamBatchInfo(&pTableScanParam, groupid, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
1,617,716✔
1178
                                              pBatchMap, pTagList, tableSeq, window, isNewParam);
1179
  QUERY_CHECK_CODE(code, lino, _return);
1,617,716✔
1180

1181
  QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pTableScanParam), code, lino, _return, terrno);
3,235,432✔
1182

1183
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
1,617,716✔
1184
  pParam->downstreamIdx = 0;
1,617,716✔
1185
  pParam->value = NULL;
1,617,716✔
1186
  pParam->reUse = false;
1,617,716✔
1187

1188
  *ppRes = pParam;
1,617,716✔
1189
  return code;
1,617,716✔
1190

1191
_return:
×
1192
  freeOperatorParam(pParam, OP_GET_PARAM);
×
1193
  qError("%s failed at %d, failed to build agg scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
1194
  return code;
×
1195
}
1196

1197
/*
1198
 * Build hash-interval operator get-param for dynamic virtual-table interval query.
1199
 *
1200
 * @param ppRes Output operator param.
1201
 * @param groupid Group id bound to this batch.
1202
 * @param pUidList Source table uid list.
1203
 * @param pBatchMap Batch table metadata map.
1204
 * @param pTagList Optional tag value list.
1205
 * @param tableSeq Whether to keep table-sequential mode.
1206
 * @param window Time window for downstream scan.
1207
 * @param isNewParam Whether downstream should treat this as a new param batch.
1208
 *
1209
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
1210
 */
1211
static int32_t buildIntervalOperatorParam(SOperatorParam** ppRes, uint64_t groupid, SArray* pUidList, SArray* pBatchMap,
51,030✔
1212
                                          SArray* pTagList, bool tableSeq, STimeWindow* window, bool isNewParam) {
1213
  int32_t         code = TSDB_CODE_SUCCESS;
51,030✔
1214
  int32_t         lino = 0;
51,030✔
1215
  SOperatorParam* pParam = NULL;
51,030✔
1216

1217
  pParam = taosMemoryCalloc(1, sizeof(SOperatorParam));
51,030✔
1218
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno);
51,030✔
1219

1220
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL;
51,030✔
1221
  pParam->downstreamIdx = 0;
51,030✔
1222
  pParam->value = NULL;
51,030✔
1223
  pParam->reUse = false;
51,030✔
1224
  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
51,030✔
1225
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno);
51,030✔
1226

1227
  SOperatorParam* pTableScanParam = NULL;
51,030✔
1228
  code = buildTableScanOperatorParamBatchInfo(&pTableScanParam, groupid, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
51,030✔
1229
                                              pBatchMap, pTagList, tableSeq, window, isNewParam);
1230
  QUERY_CHECK_CODE(code, lino, _return);
51,030✔
1231

1232
  QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pTableScanParam), code, lino, _return, terrno);
102,060✔
1233

1234
  *ppRes = pParam;
51,030✔
1235
  return code;
51,030✔
1236

1237
_return:
×
1238
  freeOperatorParam(pParam, OP_GET_PARAM);
×
1239
  qError("%s failed at %d, failed to build interval scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
1240
  return code;
×
1241
}
1242

1243
/*
 * Build a tag-scan operator param for the first uid in pUidList.
 *
 * @param ppRes     Output: the new operator param; set to NULL on failure.
 * @param pUidList  Uid list; its first element becomes the param's vcUid.
 *                  A list without a first element is rejected.
 * @param srcOpType Operator type stored in the param.
 *
 * @return TSDB_CODE_SUCCESS on success, otherwise the error code of the
 *         failed step; everything allocated here is released on failure.
 */
int32_t buildTagScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  STagScanOperatorParam*   pScan = NULL;
  tb_uid_t*                pUid = NULL;

  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);

  pScan = taosMemoryMalloc(sizeof(STagScanOperatorParam));
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);

  /* taosArrayGet() yields NULL when there is no element 0; never dereference that */
  pUid = (tb_uid_t*)taosArrayGet(pUidList, 0);
  if (NULL == pUid) {
    /* never report success here, even if the lookup left terrno unset */
    code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_INVALID_PARA;
    lino = __LINE__;
    goto _return;
  }
  pScan->vcUid = *pUid;

  (*ppRes)->opType = srcOpType;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->value = pScan;
  (*ppRes)->pChildren = NULL;
  (*ppRes)->reUse = false;

  return code;
_return:
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
  taosMemoryFreeClear(*ppRes);
  if (pScan) {
    taosMemoryFree(pScan);
  }
  return code;
}
1270

1271
/*
 * Fetch the calculation time range of the current trigger window.
 *
 * Looks up the calc param at pRuntimeInfo->curIdx and copies its bounds into
 * pTimeRange: [prevLocalTime, triggerTime] for period triggers, and
 * [wstart, wend] for every other trigger type.
 *
 * @param pRuntimeInfo Stream runtime info holding the trigger calc params.
 * @param pTimeRange   Output time range.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_INTERNAL_ERROR when an argument is
 *         NULL or no calc param exists at the current index.
 */
static int32_t getCurrentWinCalcTimeRange(SStreamRuntimeFuncInfo* pRuntimeInfo, STimeWindow* pTimeRange) {
  if (NULL == pRuntimeInfo || NULL == pTimeRange) {
    return TSDB_CODE_INTERNAL_ERROR;
  }

  SSTriggerCalcParam* pCalc = taosArrayGet(pRuntimeInfo->pStreamPesudoFuncVals, pRuntimeInfo->curIdx);
  if (NULL == pCalc) {
    return TSDB_CODE_INTERNAL_ERROR;
  }

  if (STREAM_TRIGGER_PERIOD == pRuntimeInfo->triggerType) {
    pTimeRange->skey = pCalc->prevLocalTime;
    pTimeRange->ekey = pCalc->triggerTime;
  } else {
    // Sliding triggers and all remaining trigger types share this path.
    // Per the original note for sliding: it cannot be told whether there is
    // an interval, so wstart/wend are always used; they equal prevTs/currentTs
    // because those share the same union storage.
    pTimeRange->skey = pCalc->wstart;
    pTimeRange->ekey = pCalc->wend;
  }

  return TSDB_CODE_SUCCESS;
}
1300

1301
void clearVtbScanDataInfo(void* pItem) {
29,534,446✔
1302
  SSourceDataInfo *pInfo = (SSourceDataInfo *)pItem;
29,534,446✔
1303
  if (pInfo->orgTbInfo) {
29,534,446✔
1304
    taosArrayDestroy(pInfo->orgTbInfo->colMap);
22,085,599✔
1305
    taosMemoryFreeClear(pInfo->orgTbInfo);
22,085,599✔
1306
  }
1307
  if (pInfo->batchOrgTbInfo) {
29,534,209✔
1308
    for (int32_t i = 0; i < taosArrayGetSize(pInfo->batchOrgTbInfo); ++i) {
6,531,179✔
1309
      SOrgTbInfo* pColMap = taosArrayGet(pInfo->batchOrgTbInfo, i);
4,318,587✔
1310
      if (pColMap) {
4,318,587✔
1311
        taosArrayDestroy(pColMap->colMap);
4,318,587✔
1312
      }
1313
    }
1314
    taosArrayDestroy(pInfo->batchOrgTbInfo);
2,212,592✔
1315
    pInfo->batchOrgTbInfo = NULL;
2,212,592✔
1316
  }
1317
  if (pInfo->tagList) {
29,534,446✔
1318
    taosArrayDestroyEx(pInfo->tagList, destroyTagVal);
804,528✔
1319
    pInfo->tagList = NULL;
804,528✔
1320
  }
1321
  if (pInfo->pSrcUidList) {
29,534,446✔
1322
    taosArrayDestroy(pInfo->pSrcUidList);
24,298,191✔
1323
    pInfo->pSrcUidList = NULL;
24,297,954✔
1324
  }
1325
}
29,534,209✔
1326

1327
/**
  @brief Build and send one fetch request for the source at sourceIndex.

  Nothing is sent when the source is not in EX_SOURCE_DATA_NOT_READY state.
  For a local-exec source the fetch function of the task is invoked directly
  and the response callback is called synchronously; otherwise a SResFetchReq
  is serialized and sent asynchronously, with loadRemoteDataCallback as the
  response handler.

  The dynamic scan parameters stored in the source data info (uid list,
  org-table info, tag list, ...) are consumed by this call: they are turned
  into req.pOpParam and then released.

  @param pExchangeInfo exchange operator info
  @param pTaskInfo     owning task
  @param sourceIndex   index into pSourceDataInfo / pFetchRpcHandles

  @return TSDB_CODE_SUCCESS or an error code
*/
int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
  int32_t                 code = TSDB_CODE_SUCCESS;
  int32_t                 lino = 0;
  // Must be initialized before the first jump to _end: the cleanup code there
  // inspects pWrapper, and the early NULL checks below jump before it is
  // allocated.
  SFetchRspHandleWrapper* pWrapper = NULL;
  SSourceDataInfo*        pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
  QUERY_CHECK_NULL(pDataInfo, code, lino, _end, terrno);

  if (EX_SOURCE_DATA_NOT_READY != pDataInfo->status) {
    return TSDB_CODE_SUCCESS;
  }

  pDataInfo->status = EX_SOURCE_DATA_STARTED;
  SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
  QUERY_CHECK_NULL(pSource, code, lino, _end, terrno);

  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);

  pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
  QUERY_CHECK_NULL(pWrapper, code, lino, _end, terrno);
  pWrapper->exchangeId = pExchangeInfo->self;
  pWrapper->sourceIndex = sourceIndex;
  pWrapper->seqId = pExchangeInfo->seqId;

  if (pSource->localExec) {
    SDataBuf pBuf = {0};
    code =
      (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->sId,
                                  pTaskInfo->id.queryId, pSource->clientId,
                                  pSource->taskId, 0, pSource->execId,
                                  &pBuf.pData,
                                  pTaskInfo->localFetch.explainRes);
    QUERY_CHECK_CODE(code, lino, _end);
    pDataInfo->startTime = taosGetTimestampUs();
    code = loadRemoteDataCallback(pWrapper, &pBuf, code);
    // the wrapper is only borrowed by the synchronous callback
    taosMemoryFreeClear(pWrapper);
    QUERY_CHECK_CODE(code, lino, _end);
  } else {
    bool needStreamPesudoFuncVals = true;
    SResFetchReq req = {0};
    req.header.vgId = pSource->addr.nodeId;
    req.sId = pSource->sId;
    req.clientId = pSource->clientId;
    req.taskId = pSource->taskId;
    req.queryId = pTaskInfo->id.queryId;
    req.execId = pSource->execId;
    if (pTaskInfo->pStreamRuntimeInfo) {
      req.dynTbname = pExchangeInfo->dynTbname;
      req.execId = pTaskInfo->pStreamRuntimeInfo->execId;
      req.pStRtFuncInfo = &pTaskInfo->pStreamRuntimeInfo->funcInfo;

      if (pSource->fetchMsgType == TDMT_STREAM_FETCH_FROM_RUNNER) {
        qDebug("%s stream fetch from runner, execId:%d, %p", GET_TASKID(pTaskInfo), req.execId, pTaskInfo->pStreamRuntimeInfo);
      } else if (pSource->fetchMsgType == TDMT_STREAM_FETCH_FROM_CACHE) {
        code = getCurrentWinCalcTimeRange(req.pStRtFuncInfo, &req.pStRtFuncInfo->curWindow);
        QUERY_CHECK_CODE(code, lino, _end);
        needStreamPesudoFuncVals = false;
        qDebug("%s stream fetch from cache, execId:%d, curWinIdx:%d, time range:[%" PRId64 ", %" PRId64 "]",
               GET_TASKID(pTaskInfo), req.execId, req.pStRtFuncInfo->curIdx, req.pStRtFuncInfo->curWindow.skey,
               req.pStRtFuncInfo->curWindow.ekey);
      }
      // the first request of a stream source carries the reset flag
      if (!pDataInfo->fetchSent) {
        req.reset = pDataInfo->fetchSent = true;
      }
    }

    switch (pDataInfo->type) {
      case EX_SRC_TYPE_VSTB_SCAN: {
        code = buildTableScanOperatorParamEx(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->orgTbInfo, pDataInfo->tableSeq, &pDataInfo->window, pDataInfo->isNewParam, DYN_TYPE_VSTB_SINGLE_SCAN);
        clearVtbScanDataInfo(pDataInfo);
        QUERY_CHECK_CODE(code, lino, _end);
        break;
      }
      case EX_SRC_TYPE_VTB_WIN_SCAN: {
        if (pDataInfo->pSrcUidList) {
          code = buildTableScanOperatorParamEx(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, NULL, pDataInfo->tableSeq, &pDataInfo->window, false, DYN_TYPE_VSTB_WIN_SCAN);
          taosArrayDestroy(pDataInfo->pSrcUidList);
          pDataInfo->pSrcUidList = NULL;
          QUERY_CHECK_CODE(code, lino, _end);
        }
        break;
      }
      case EX_SRC_TYPE_VSTB_TAG_SCAN: {
        code = buildTagScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType);
        taosArrayDestroy(pDataInfo->pSrcUidList);
        pDataInfo->pSrcUidList = NULL;
        QUERY_CHECK_CODE(code, lino, _end);
        break;
      }
      case EX_SRC_TYPE_VSTB_WIN_SCAN:
      case EX_SRC_TYPE_VSTB_INTERVAL_SCAN:
      case EX_SRC_TYPE_VSTB_TS_SCAN: {
        if (pDataInfo->batchOrgTbInfo) {
          int32_t srcOpType =
              (pDataInfo->type == EX_SRC_TYPE_VSTB_TS_SCAN)
                  ? QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN
                  : QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
          code = buildTableScanOperatorParamBatchInfo(
              &req.pOpParam, pDataInfo->groupid, pDataInfo->pSrcUidList, srcOpType,
              pDataInfo->batchOrgTbInfo, pDataInfo->tagList, pDataInfo->tableSeq, &pDataInfo->window,
              pDataInfo->isNewParam);
          clearVtbScanDataInfo(pDataInfo);
          QUERY_CHECK_CODE(code, lino, _end);
        }
        break;
      }
      case EX_SRC_TYPE_VSTB_PART_INTERVAL_SCAN: {
        if (pDataInfo->batchOrgTbInfo) {
          code = buildIntervalOperatorParam(&req.pOpParam, pDataInfo->groupid, pDataInfo->pSrcUidList,
                                            pDataInfo->batchOrgTbInfo, pDataInfo->tagList, pDataInfo->tableSeq,
                                            &pDataInfo->window, pDataInfo->isNewParam);
          clearVtbScanDataInfo(pDataInfo);
          QUERY_CHECK_CODE(code, lino, _end);
        }
        break;
      }
      case EX_SRC_TYPE_VSTB_AGG_SCAN: {
        if (pDataInfo->batchOrgTbInfo) {
          code = buildAggOperatorParam(&req.pOpParam, pDataInfo->groupid, pDataInfo->pSrcUidList,
                                       pDataInfo->batchOrgTbInfo, pDataInfo->tagList,
                                       pDataInfo->tableSeq, &pDataInfo->window, pDataInfo->isNewParam);
          clearVtbScanDataInfo(pDataInfo);
          QUERY_CHECK_CODE(code, lino, _end);
        }
        break;
      }
      case EX_SRC_TYPE_STB_JOIN_SCAN:
      default: {
        if (pDataInfo->pSrcUidList) {
          code = buildTableScanOperatorParam(&req.pOpParam,
                                             pDataInfo->pSrcUidList,
                                             pDataInfo->srcOpType,
                                             pDataInfo->tableSeq);
          /* source uid list can be reused on the vnode side, so only use once */
          taosArrayDestroy(pDataInfo->pSrcUidList);
          pDataInfo->pSrcUidList = NULL;
          QUERY_CHECK_CODE(code, lino, _end);
        }
        if (pExchangeInfo->notifyToSend) {
          if (NULL == req.pOpParam) {
            code = buildTableScanOperatorParamNotify(&req.pOpParam,
                                                     pDataInfo->srcOpType,
                                                     pExchangeInfo->notifyTs);
            QUERY_CHECK_CODE(code, lino, _end);
          } else {
            /**
              Currently don't support use the same param for multiple times!
            */
            qError("%s, %s failed, currently don't support use the same param "
                   "for multiple times!", GET_TASKID(pTaskInfo), __func__);
            pTaskInfo->code = TSDB_CODE_INVALID_PARA;
            taosMemoryFree(pWrapper);
            // req.pOpParam was built above and is not sent: release it like
            // every other early return of this branch does
            freeOperatorParam(req.pOpParam, OP_GET_PARAM);
            return pTaskInfo->code;
          }
          pExchangeInfo->notifyToSend = false;
        }
        break;
      }
    }

    // first pass computes the serialized size, second pass fills the buffer
    int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req, needStreamPesudoFuncVals);
    if (msgSize < 0) {
      pTaskInfo->code = msgSize;
      taosMemoryFree(pWrapper);
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
      return pTaskInfo->code;
    }

    void* msg = taosMemoryCalloc(1, msgSize);
    if (NULL == msg) {
      pTaskInfo->code = terrno;
      taosMemoryFree(pWrapper);
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
      return pTaskInfo->code;
    }

    msgSize = tSerializeSResFetchReq(msg, msgSize, &req, needStreamPesudoFuncVals);
    if (msgSize < 0) {
      pTaskInfo->code = msgSize;
      taosMemoryFree(pWrapper);
      taosMemoryFree(msg);
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
      return pTaskInfo->code;
    }

    // the parameter is fully serialized into msg and no longer needed
    freeOperatorParam(req.pOpParam, OP_GET_PARAM);

    qDebug("%s build fetch msg and send to vgId:%d, ep:%s, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
           ", seqId:%" PRId64 ", execId:%d, %p, %d/%" PRIzu,
           GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->clientId,
           pSource->taskId, pExchangeInfo->seqId, pSource->execId, pExchangeInfo, sourceIndex, totalSources);

    // send the fetch remote task result request
    SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
    if (NULL == pMsgSendInfo) {
      taosMemoryFreeClear(msg);
      taosMemoryFree(pWrapper);
      qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
      pTaskInfo->code = terrno;
      return pTaskInfo->code;
    }

    pMsgSendInfo->param = pWrapper;
    pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
    pMsgSendInfo->msgInfo.pData = msg;
    pMsgSendInfo->msgInfo.len = msgSize;
    pMsgSendInfo->msgType = pSource->fetchMsgType;
    pMsgSendInfo->fp = loadRemoteDataCallback;
    pMsgSendInfo->requestId = pTaskInfo->id.queryId;

    int64_t transporterId = 0;
    pDataInfo->startTime = taosGetTimestampUs();
    // NOTE(review): pWrapper is now also referenced by pMsgSendInfo->param.
    // If asyncSendMsgToServer releases pMsgSendInfo (and its param) on
    // failure, the cleanup at _end frees pWrapper a second time - confirm the
    // ownership contract of asyncSendMsgToServer.
    code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
    QUERY_CHECK_CODE(code, lino, _end);
    int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, sourceIndex);
    *pRpcHandle = transporterId;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    if (pWrapper) {
      taosMemoryFree(pWrapper);
    }
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1553

1554
/**
1555
  @brief record the data loading metrics of the exchange operator, including
1556
  the number of rows, the data length, and the elapsed time of current load operation.
1557
*/
1558
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows,
173,887,777✔
1559
                          int32_t dataLen, int64_t startTs, SOperatorInfo* pOperator) {
1560
  pInfo->totalRows += numOfRows;
173,887,777✔
1561
  pInfo->totalSize += dataLen;
173,887,249✔
1562
  pInfo->totalElapsed += (taosGetTimestampUs() - startTs);
173,887,249✔
1563
}
173,887,777✔
1564

1565
/**
  @brief Decode one data block of a fetch response into pRes.

  - When isVstbScan is set or pColList is NULL, the payload is decoded
    directly into pRes (after pRes has been cleaned up) and *pNextStart
    receives the position reported by the decoder.
  - When pColList is not NULL, the payload starts with a column count and a
    SSysTableSchema array in network byte order; the data is decoded into a
    temporary block and the columns are relocated into pRes according to
    pColList. The schema header inside pData is converted to host byte order
    in place, so the same buffer must not be passed twice through this path.

  @param pRes       destination block
  @param pData      start of the encoded block
  @param pColList   column list used for relocation, or NULL
  @param pNextStart output for the direct decode path
  @param isVstbScan true when the block belongs to a virtual super table scan

  @return TSDB_CODE_SUCCESS or an error code
*/
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart, bool isVstbScan) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SSDataBlock* pBlock = NULL;

  // Decode straight into pRes exactly once. The two conditions used to be
  // handled by two separate branches, which decoded the same payload twice
  // whenever isVstbScan was set and pColList was NULL.
  if (isVstbScan || pColList == NULL) {
    blockDataCleanup(pRes);
    code = blockDecodeInternal(pRes, pData, (const char**)pNextStart);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  if (pColList != NULL) {  // extract data according to pColList
    char* pStart = pData;

    int32_t numOfCols = htonl(*(int32_t*)pStart);
    pStart += sizeof(int32_t);

    // todo refactor:extract method
    // convert the schema header to host byte order in place
    SSysTableSchema* pSchema = (SSysTableSchema*)pStart;
    for (int32_t i = 0; i < numOfCols; ++i) {
      SSysTableSchema* p = (SSysTableSchema*)pStart;

      p->colId = htons(p->colId);
      p->bytes = htonl(p->bytes);
      pStart += sizeof(SSysTableSchema);
    }

    code = createDataBlock(&pBlock);
    QUERY_CHECK_CODE(code, lino, _end);

    for (int32_t i = 0; i < numOfCols; ++i) {
      SColumnInfoData idata = createColumnInfoData(pSchema[i].type, pSchema[i].bytes, pSchema[i].colId);
      code = blockDataAppendColInfo(pBlock, &idata);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    code = blockDecodeInternal(pBlock, pStart, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    code = blockDataEnsureCapacity(pRes, pBlock->info.rows);
    QUERY_CHECK_CODE(code, lino, _end);

    // data from mnode
    pRes->info.dataLoad = 1;
    pRes->info.rows = pBlock->info.rows;
    pRes->info.scanFlag = MAIN_SCAN;
    code = relocateColumnData(pRes, pColList, pBlock->pDataBlock, false);
    QUERY_CHECK_CODE(code, lino, _end);

    blockDataDestroy(pBlock);
    pBlock = NULL;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    // pBlock is only non-NULL when the pColList path failed half way
    blockDataDestroy(pBlock);
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1628

1629
/**
  @brief Log the accumulated load statistics of the exchange operator and
  mark the operator as completed.

  @param pOperator exchange operator
*/
void setAllSourcesCompleted(SOperatorInfo* pOperator) {
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
  SExchangeInfo*       pExchangeInfo = pOperator->info;
  SLoadRemoteDataInfo* pStat = &pExchangeInfo->loadInfo;
  size_t               numOfSources = taosArrayGetSize(pExchangeInfo->pSources);

  qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 ", %.2f Kb, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), numOfSources, pStat->totalRows, pStat->totalSize / 1024.0,
         pStat->totalElapsed / 1000.0);

  setOperatorCompleted(pOperator);
}
89,251,396✔
1641

1642
int32_t getCompletedSources(const SArray* pArray, int32_t* pRes) {
306,058,153✔
1643
  int32_t code = TSDB_CODE_SUCCESS;
306,058,153✔
1644
  int32_t lino = 0;
306,058,153✔
1645
  size_t  total = taosArrayGetSize(pArray);
306,058,153✔
1646

1647
  int32_t completed = 0;
306,059,538✔
1648
  for (int32_t k = 0; k < total; ++k) {
920,814,805✔
1649
    SSourceDataInfo* p = taosArrayGet(pArray, k);
614,755,333✔
1650
    QUERY_CHECK_NULL(p, code, lino, _end, terrno);
614,755,627✔
1651
    if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
614,755,627✔
1652
      qDebug("source %d is completed, info:%p %p", k, pArray, p);
291,507,109✔
1653
      completed += 1;
291,507,617✔
1654
    }
1655
  }
1656

1657
  *pRes = completed;
306,059,472✔
1658
_end:
306,059,719✔
1659
  if (code != TSDB_CODE_SUCCESS) {
306,059,719✔
1660
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1661
  }
1662
  return code;
306,060,485✔
1663
}
1664

1665
/**
  @brief Send the fetch request to every source of the exchange operator
  without waiting for the responses.

  On the first failing request the error is stored in the task and returned.
  After all requests are sent the operator status becomes OP_RES_TO_RETURN;
  if the task has been killed in the meantime the function does a long jump
  to the task environment instead of returning.

  @param pOperator exchange operator

  @return TSDB_CODE_SUCCESS or the error of the failed request
*/
int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SExchangeInfo* pExchangeInfo = pOperator->info;

  size_t  numOfSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
  int64_t beginUs = taosGetTimestampUs();

  // Asynchronously send all fetch requests to all sources.
  int32_t srcIdx = 0;
  while (srcIdx < numOfSources) {
    int32_t sendCode = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, srcIdx);
    if (sendCode != TSDB_CODE_SUCCESS) {
      pTaskInfo->code = sendCode;
      return sendCode;
    }
    ++srcIdx;
  }

  int64_t finishUs = taosGetTimestampUs();
  qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTaskInfo),
         numOfSources, (finishUs - beginUs) / 1000.0);

  pOperator->status = OP_RES_TO_RETURN;
  if (isTaskKilled(pTaskInfo)) {
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
  }

  return TSDB_CODE_SUCCESS;
}
1692

1693
/**
1694
  @brief store STEP DONE notification info
1695
*/
1696
void storeNotifyInfo(SOperatorInfo* pOperator) {
4,194,020✔
1697
  SExchangeInfo*  pExchangeInfo = pOperator->info;
4,194,020✔
1698
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
4,194,020✔
1699
  SOperatorParam* pGetParam = pOperator->pOperatorGetParam;
4,194,020✔
1700

1701
  SExchangeOperatorParam* pParam = (SExchangeOperatorParam*)pGetParam->value;
4,194,020✔
1702
  if (!pParam->multiParams) {
4,194,020✔
1703
    SExchangeOperatorBasicParam* pBasic = &pParam->basic;
4,194,020✔
1704
    if (pBasic->paramType != NOTIFY_TYPE_EXCHANGE_PARAM) {
4,194,020✔
1705
      qWarn("%s, %s found invalid exchange operator param type %d",
×
1706
             GET_TASKID(pTaskInfo), __func__, pBasic->paramType);
1707
      return;
×
1708
    }
1709

1710
    pExchangeInfo->notifyToSend = true;
4,194,020✔
1711
    pExchangeInfo->notifyTs = pBasic->notifyTs;
4,194,020✔
1712
  } else {
1713
    qWarn("%s, %s found multi params are not supported for notify msg",
×
1714
           GET_TASKID(pTaskInfo), __func__);
1715
  }
1716
}
1717

1718
/**
  @brief Split the fetch response stored in pDataInfo->pRsp into data blocks
  and append them to pExchangeInfo->pResultBlockList.

  Every block in the response is prefixed by two int32_t values: the
  compressed length and the raw length. When the response is flagged as
  compressed and a block is really smaller than its raw length, the block is
  decompressed into pDataInfo->decompBuf first.

  The destination block is a freshly zero-allocated block for
  EX_SRC_TYPE_VSTB_SCAN sources, a recycled block when one is available, or
  otherwise a copy of the layout of pDummyBlock.

  @param pExchangeInfo exchange operator info
  @param pDataInfo     source data info holding the response

  @return TSDB_CODE_SUCCESS or an error code; when decoding a block fails
          pDataInfo->pRsp is released
*/
int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo) {
  int32_t            code = TSDB_CODE_SUCCESS;
  int32_t            lino = 0;
  SSDataBlock*       pResBlock = NULL;
  SRetrieveTableRsp* pFetchRsp = pDataInfo->pRsp;
  char*              pCursor = pFetchRsp->data;  // start of the next encoded block
  const bool         isVstb = (pDataInfo->type == EX_SRC_TYPE_VSTB_SCAN);

  if (pFetchRsp->compressed) {
    // make sure the decompress buffer can hold payloadLen bytes
    if (pDataInfo->decompBuf == NULL) {
      pDataInfo->decompBuf = taosMemoryMalloc(pFetchRsp->payloadLen);
      QUERY_CHECK_NULL(pDataInfo->decompBuf, code, lino, _end, terrno);
      pDataInfo->decompBufSize = pFetchRsp->payloadLen;
    } else if (pDataInfo->decompBufSize < pFetchRsp->payloadLen) {
      char* pGrown = taosMemoryRealloc(pDataInfo->decompBuf, pFetchRsp->payloadLen);
      QUERY_CHECK_NULL(pGrown, code, lino, _end, terrno);
      pDataInfo->decompBuf = pGrown;
      pDataInfo->decompBufSize = pFetchRsp->payloadLen;
    }
  }

  // blockNo is 1-based, it is only used for logging
  for (int32_t blockNo = 1; blockNo <= pFetchRsp->numOfBlocks; ++blockNo) {
    char* pPayload = pCursor;

    if (isVstb) {
      pResBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
      QUERY_CHECK_NULL(pResBlock, code, lino, _end, terrno);
    } else if (taosArrayGetSize(pExchangeInfo->pRecycledBlocks) > 0) {
      pResBlock = *(SSDataBlock**)taosArrayPop(pExchangeInfo->pRecycledBlocks);
      blockDataCleanup(pResBlock);
    } else {
      code = createOneDataBlock(pExchangeInfo->pDummyBlock, false, &pResBlock);
      QUERY_CHECK_NULL(pResBlock, code, lino, _end, code);
    }

    // block header: compressed length followed by raw length
    int32_t compLen = *(int32_t*)pPayload;
    pPayload += sizeof(int32_t);

    int32_t rawLen = *(int32_t*)pPayload;
    pPayload += sizeof(int32_t);
    QUERY_CHECK_CONDITION((compLen <= rawLen && compLen != 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

    pCursor = pPayload + compLen;
    if (pFetchRsp->compressed && (compLen < rawLen)) {
      int32_t t = tsDecompressString(pPayload, compLen, 1, pDataInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
      QUERY_CHECK_CONDITION((t == rawLen), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
      pPayload = pDataInfo->decompBuf;
    }

    code = extractDataBlockFromFetchRsp(pResBlock, pPayload, NULL, &pPayload, isVstb);
    if (code != 0) {
      taosMemoryFreeClear(pDataInfo->pRsp);
      goto _end;
    }

    void* tmp = taosArrayPush(pExchangeInfo->pResultBlockList, &pResBlock);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    qDebug("%dth block added to resultBlockList, rows:%" PRId64, blockNo, pResBlock->info.rows);
    // the result list holds the block pointer now, it must not be destroyed at _end
    pResBlock = NULL;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    blockDataDestroy(pResBlock);
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1793

1794
/**
  @brief Load data from the sources one after another, starting at
  pExchangeInfo->current.

  For the current source a fetch request is sent and the function waits for
  the matching response (responses whose seqId differs from the current
  exchange seqId are dropped). An empty response marks the source as
  exhausted and the next source is tried; a non-empty response is split into
  result blocks and the function returns.

  When dynTbname is set, only the source whose vgId equals the vgId of the
  tbname partition value is queried, the others are skipped.

  Fatal conditions (missing source/data info, failed send, failed wait or a
  killed task) do not return: they long-jump to the task environment.

  @param pOperator exchange operator

  @return TSDB_CODE_SUCCESS, or the error code of the source / block
          extraction, which is also stored in the task
*/
int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
  SExchangeInfo* pExchangeInfo = pOperator->info;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  int32_t code = 0;
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);
  int64_t startTs = taosGetTimestampUs();

  // vgId of the tbname partition value, 0 means "no restriction"
  int32_t vgId = 0;
  if (pExchangeInfo->dynTbname) {
    SArray* vals = pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPartColVals;
    for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
      SStreamGroupValue* pValue = taosArrayGet(vals, i);
      if (pValue != NULL && pValue->isTbname) {
        vgId = pValue->vgId;
        break;
      }
    }
  }

  while (1) {
    if (pExchangeInfo->current >= totalSources) {
      setAllSourcesCompleted(pOperator);
      return TSDB_CODE_SUCCESS;
    }

    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
    if (!pSource) {
      // report terrno: code does not describe this failure
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      pTaskInfo->code = terrno;
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }

    if (vgId != 0 && pSource->addr.nodeId != vgId){
      pExchangeInfo->current += 1;
      continue;
    }

    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
    if (!pDataInfo) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      pTaskInfo->code = terrno;
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }
    // doSendFetchDataRequest only sends for sources in NOT_READY state
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;

    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      pTaskInfo->code = code;
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }

    // wait until a response that belongs to the current seqId arrives
    while (true) {
      recordOpExecBeforeDownstream(pOperator);
      code = exchangeWait(pOperator, pExchangeInfo);
      recordOpExecAfterDownstream(pOperator, 0);

      if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
      }

      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
      if (pDataInfo->seqId != currSeqId) {
        qDebug("seq rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pDataInfo->seqId, pExchangeInfo, currSeqId);
        taosMemoryFreeClear(pDataInfo->pRsp);
        continue;
      }

      break;
    }

    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
             tstrerror(pDataInfo->code));
      pOperator->pTaskInfo->code = pDataInfo->code;
      return pOperator->pTaskInfo->code;
    }

    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;

    if (pRsp->numOfRows == 0) {
      qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
             " execId:%d %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", pDataInfo,
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);

      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
      // an empty virtual super table (tag) scan finishes all sources at once
      if (isVstbScan(pDataInfo) || isVstbTagScan(pDataInfo)) {
        pExchangeInfo->current = totalSources;
      } else {
        pExchangeInfo->current += 1;
      }
      taosMemoryFreeClear(pDataInfo->pRsp);
      continue;
    }

    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }

    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
    if (pRsp->completed == 1) {
      qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, pDataInfo,
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
             pExchangeInfo->current + 1, totalSources);

      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
      if (isVstbScan(pDataInfo)) {
        pExchangeInfo->current = totalSources;
      } else {
        pExchangeInfo->current += 1;
      }
    } else {
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
             pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
    }

    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;
    taosMemoryFreeClear(pDataInfo->pRsp);

    // pDataInfo points into pSourceDataInfo, so the array is cleared only
    // after the last access to the element.
    if (pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData) {
      taosArrayClear(pExchangeInfo->pSourceDataInfo);
    }
    return TSDB_CODE_SUCCESS;
  }

_error:
  pTaskInfo->code = code;
  return code;
}
1932

1933
/**
 * Deep-copies the tag values of an exchange basic param into a source data info.
 *
 * A tag list already attached to pDataInfo is released first (including the
 * var-length payloads this function allocated for it), so the pointer is never
 * overwritten while it still owns memory.
 *
 * @param pDataInfo    destination; tagList is replaced, or set to NULL when the
 *                     param carries no tag list
 * @param pBasicParam  source; paramType must be DYN_TYPE_EXCHANGE_PARAM
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for a wrong param type, or
 *         the terrno of the allocation / array call that failed. On failure the
 *         tags copied so far stay in pDataInfo->tagList.
 */
static int32_t loadTagListFromBasicParam(SSourceDataInfo* pDataInfo, SExchangeOperatorBasicParam* pBasicParam) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  STagVal  dstTag = {0};
  bool     needFree = false;

  if (pBasicParam->paramType != DYN_TYPE_EXCHANGE_PARAM) {
    qError("%s failed since invalid exchange operator param type %d",
      __func__, pBasicParam->paramType);
    return TSDB_CODE_INVALID_PARA;
  }

  if (pDataInfo->tagList) {
    // The previous list is replaced below; free its var-length payloads and the
    // array itself instead of merely clearing it and dropping the pointer.
    for (int32_t i = 0; i < taosArrayGetSize(pDataInfo->tagList); ++i) {
      STagVal* pOldTag = (STagVal*)taosArrayGet(pDataInfo->tagList, i);
      if (pOldTag != NULL && IS_VAR_DATA_TYPE(pOldTag->type)) {
        taosMemoryFreeClear(pOldTag->pData);
      }
    }
    taosArrayDestroy(pDataInfo->tagList);
    pDataInfo->tagList = NULL;
  }

  if (pBasicParam->tagList) {
    pDataInfo->tagList = taosArrayInit(1, sizeof(STagVal));
    QUERY_CHECK_NULL(pDataInfo->tagList, code, lino, _return, terrno);

    for (int32_t i = 0; i < taosArrayGetSize(pBasicParam->tagList); ++i) {
      STagVal *pSrcTag = (STagVal*)taosArrayGet(pBasicParam->tagList, i);
      QUERY_CHECK_NULL(pSrcTag, code, lino, _return, terrno);

      dstTag = (STagVal){0};
      dstTag.type = pSrcTag->type;
      dstTag.cid = pSrcTag->cid;
      if (IS_VAR_DATA_TYPE(pSrcTag->type)) {
        // Var-length tags own a heap buffer, so copy the bytes rather than the pointer.
        dstTag.nData = pSrcTag->nData;
        dstTag.pData = taosMemoryMalloc(dstTag.nData);
        QUERY_CHECK_NULL(dstTag.pData, code, lino, _return, terrno);
        needFree = true;
        memcpy(dstTag.pData, pSrcTag->pData, dstTag.nData);
      } else {
        dstTag.i64 = pSrcTag->i64;
      }

      QUERY_CHECK_NULL(taosArrayPush(pDataInfo->tagList, &dstTag), code, lino, _return, terrno);
      // Ownership of dstTag.pData moved into the array.
      needFree = false;
    }
  } else {
    pDataInfo->tagList = NULL;
  }

  return code;
_return:
  // Only the tag that was being built (and not yet pushed) is released here.
  if (needFree) {
    taosMemoryFreeClear(dstTag.pData);
  }
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  return code;
}
1985

1986
/**
 * Deep-copies the batch of original-table descriptors (vgId, table name and
 * column map) from an exchange basic param into a source data info.
 *
 * @param pDataInfo    destination; batchOrgTbInfo is replaced, or reset to NULL
 *                     when the param carries no batch info
 * @param pBasicParam  source; paramType must be DYN_TYPE_EXCHANGE_PARAM
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for a wrong param type, or
 *         the terrno of the allocation / array call that failed. On failure the
 *         entries copied so far stay in pDataInfo->batchOrgTbInfo.
 */
int32_t loadBatchColMapFromBasicParam(SSourceDataInfo* pDataInfo, SExchangeOperatorBasicParam* pBasicParam) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  SOrgTbInfo  dstOrgTbInfo = {0};
  bool        needFree = false;

  if (pBasicParam->paramType != DYN_TYPE_EXCHANGE_PARAM) {
    qError("%s failed since invalid exchange operator param type %d",
      __func__, pBasicParam->paramType);
    return TSDB_CODE_INVALID_PARA;
  }

  if (pBasicParam->batchOrgTbInfo) {
    pDataInfo->batchOrgTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
    QUERY_CHECK_NULL(pDataInfo->batchOrgTbInfo, code, lino, _return, terrno);

    for (int32_t i = 0; i < taosArrayGetSize(pBasicParam->batchOrgTbInfo); ++i) {
      SOrgTbInfo* pSrcOrgTbInfo = taosArrayGet(pBasicParam->batchOrgTbInfo, i);
      QUERY_CHECK_NULL(pSrcOrgTbInfo, code, lino, _return, terrno);

      dstOrgTbInfo = (SOrgTbInfo){0};
      dstOrgTbInfo.vgId = pSrcOrgTbInfo->vgId;
      tstrncpy(dstOrgTbInfo.tbName, pSrcOrgTbInfo->tbName, TSDB_TABLE_FNAME_LEN);

      dstOrgTbInfo.colMap = taosArrayDup(pSrcOrgTbInfo->colMap, NULL);
      QUERY_CHECK_NULL(dstOrgTbInfo.colMap, code, lino, _return, terrno);

      // colMap is owned by this function until the push below succeeds.
      needFree = true;
      QUERY_CHECK_NULL(taosArrayPush(pDataInfo->batchOrgTbInfo, &dstOrgTbInfo), code, lino, _return, terrno);
      needFree = false;
    }
  } else {
    // Reset the destination. The source field is already NULL in this branch,
    // so assigning to pBasicParam here had no effect.
    pDataInfo->batchOrgTbInfo = NULL;
  }

  return code;
_return:
  if (needFree) {
    taosArrayDestroy(dstOrgTbInfo.colMap);
  }
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  return code;
}
2029

2030
int32_t addSingleExchangeSource(SOperatorInfo* pOperator,
28,483,329✔
2031
                                SExchangeOperatorBasicParam* pBasicParam) {
2032
  if (pBasicParam->paramType != DYN_TYPE_EXCHANGE_PARAM) {
28,483,329✔
2033
    qWarn("%s, %s found invalid exchange operator param type %d",
×
2034
      GET_TASKID(pOperator->pTaskInfo), __func__, pBasicParam->paramType);
2035
    return TSDB_CODE_SUCCESS;
×
2036
  }
2037

2038
  int32_t            code = TSDB_CODE_SUCCESS;
28,483,329✔
2039
  int32_t            lino = 0;
28,483,329✔
2040
  SExchangeInfo*     pExchangeInfo = pOperator->info;
28,483,329✔
2041
  SExchangeSrcIndex* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
28,483,329✔
2042

2043
  if (NULL == pIdx) {
28,483,329✔
2044
    if (pBasicParam->isNewDeployed) {
121,644✔
2045
      SDownstreamSourceNode *pNode = NULL;
2,574✔
2046
      code = nodesCloneNode((SNode*)&pBasicParam->newDeployedSrc, (SNode**)&pNode);
2,574✔
2047
      QUERY_CHECK_CODE(code, lino, _return);
2,574✔
2048

2049
      SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pOperator->pPhyNode;
2,574✔
2050
      code = nodesListMakeStrictAppend(&pExchange->pSrcEndPoints, (SNode*)pNode);
2,574✔
2051
      QUERY_CHECK_CODE(code, lino, _return);
2,574✔
2052

2053
      void* tmp = taosArrayPush(pExchangeInfo->pSources, pNode);
2,574✔
2054
      QUERY_CHECK_NULL(tmp, code, lino, _return, terrno);
2,574✔
2055

2056
      SExchangeSrcIndex idx = {.srcIdx = taosArrayGetSize(pExchangeInfo->pSources) - 1, .inUseIdx = -1};
2,574✔
2057
      code = tSimpleHashPut(pExchangeInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
2,574✔
2058
      if (pExchangeInfo->pHashSources) {
2,574✔
2059
        QUERY_CHECK_CODE(code, lino, _return);
2,574✔
2060
      }
2061
      pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
2,574✔
2062
      QUERY_CHECK_NULL(pIdx, code, lino, _return, TSDB_CODE_INVALID_PARA);
2,574✔
2063
    } else if (pBasicParam->type == EX_SRC_TYPE_VSTB_TS_SCAN || pBasicParam->type == EX_SRC_TYPE_VSTB_PART_INTERVAL_SCAN) {
119,070✔
2064
      // Multi-exchange virtual table paths build each exchange param from the full vg map.
2065
      // If this exchange does not own the current vg source, skip it and let the matching exchange consume it.
2066
      qDebug("addSingleExchangeSource found no existing source for vgId: %d, sourceType:%d, skip it",
119,070✔
2067
             pBasicParam->vgId, pBasicParam->type);
2068
      return TSDB_CODE_SUCCESS;
119,070✔
2069
    } else {
2070
      qError("No exchange source for vgId: %d", pBasicParam->vgId);
×
2071
      return TSDB_CODE_INVALID_PARA;
×
2072
    }
2073
  }
2074

2075
  qDebug("start to add single exchange source");
28,364,259✔
2076

2077
  switch (pBasicParam->type) {
28,364,259✔
2078
    case EX_SRC_TYPE_VSTB_TS_SCAN:
2,212,592✔
2079
    case EX_SRC_TYPE_VSTB_WIN_SCAN:
2080
    case EX_SRC_TYPE_VSTB_INTERVAL_SCAN:
2081
    case EX_SRC_TYPE_VSTB_PART_INTERVAL_SCAN:
2082
    case EX_SRC_TYPE_VSTB_AGG_SCAN: {
2083
      if (pIdx->inUseIdx < 0) {
2,212,592✔
2084
        SSourceDataInfo dataInfo = {0};
1,253,904✔
2085
        dataInfo.status = EX_SOURCE_DATA_NOT_READY;
1,253,904✔
2086
        dataInfo.taskId = pExchangeInfo->pTaskId;
1,253,904✔
2087
        dataInfo.index = pIdx->srcIdx;
1,253,904✔
2088
        dataInfo.groupid = pBasicParam->groupid;
1,253,904✔
2089
        dataInfo.window = pBasicParam->window;
1,253,904✔
2090
        dataInfo.isNewParam = pBasicParam->isNewParam;
1,253,904✔
2091
        code = loadTagListFromBasicParam(&dataInfo, pBasicParam);
1,253,904✔
2092
        QUERY_CHECK_CODE(code, lino, _return);
1,253,904✔
2093

2094
        code = loadBatchColMapFromBasicParam(&dataInfo, pBasicParam);
1,253,904✔
2095
        QUERY_CHECK_CODE(code, lino, _return);
1,253,904✔
2096

2097
        dataInfo.orgTbInfo = NULL;
1,253,904✔
2098

2099
        dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
1,253,904✔
2100
        QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
1,253,904✔
2101

2102
        dataInfo.type = pBasicParam->type;
1,253,904✔
2103
        dataInfo.srcOpType = pBasicParam->srcOpType;
1,253,904✔
2104
        dataInfo.tableSeq = pBasicParam->tableSeq;
1,253,904✔
2105

2106
        QUERY_CHECK_NULL(taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
2,507,808✔
2107

2108
        pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
1,253,904✔
2109
      } else {
2110
        SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
958,688✔
2111
        QUERY_CHECK_NULL(pDataInfo, code, lino, _return, terrno);
958,688✔
2112

2113
        if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
958,688✔
2114
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
958,688✔
2115
        }
2116

2117
        pDataInfo->taskId = pExchangeInfo->pTaskId;
958,688✔
2118
        pDataInfo->index = pIdx->srcIdx;
958,688✔
2119
        pDataInfo->window = pBasicParam->window;
958,688✔
2120
        pDataInfo->groupid = pBasicParam->groupid;
958,688✔
2121
        pDataInfo->isNewParam = pBasicParam->isNewParam;
958,688✔
2122

2123
        code = loadTagListFromBasicParam(pDataInfo, pBasicParam);
958,688✔
2124
        QUERY_CHECK_CODE(code, lino, _return);
958,688✔
2125

2126
        code = loadBatchColMapFromBasicParam(pDataInfo, pBasicParam);
958,688✔
2127
        QUERY_CHECK_CODE(code, lino, _return);
958,688✔
2128

2129
        pDataInfo->orgTbInfo = NULL;
958,688✔
2130

2131
        pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
958,688✔
2132
        QUERY_CHECK_NULL(pDataInfo->pSrcUidList, code, lino, _return, terrno);
958,688✔
2133

2134
        pDataInfo->type = pBasicParam->type;
958,688✔
2135
        pDataInfo->srcOpType = pBasicParam->srcOpType;
958,688✔
2136
        pDataInfo->tableSeq = pBasicParam->tableSeq;
958,688✔
2137
      }
2138
      break;
2,212,592✔
2139
    }
2140
    case EX_SRC_TYPE_VTB_WIN_SCAN:
3,807,266✔
2141
    case EX_SRC_TYPE_VSTB_TAG_SCAN: {
2142
      SSourceDataInfo dataInfo = {0};
3,807,266✔
2143
      dataInfo.status = EX_SOURCE_DATA_NOT_READY;
3,807,266✔
2144
      dataInfo.taskId = pExchangeInfo->pTaskId;
3,807,266✔
2145
      dataInfo.index = pIdx->srcIdx;
3,807,266✔
2146
      dataInfo.window = pBasicParam->window;
3,807,266✔
2147
      dataInfo.groupid = 0;
3,807,266✔
2148
      dataInfo.orgTbInfo = NULL;
3,807,266✔
2149
      dataInfo.tagList = NULL;
3,807,266✔
2150

2151
      dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
3,807,266✔
2152
      QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
3,807,266✔
2153

2154
      dataInfo.isNewParam = false;
3,807,266✔
2155
      dataInfo.type = pBasicParam->type;
3,807,266✔
2156
      dataInfo.srcOpType = pBasicParam->srcOpType;
3,807,266✔
2157
      dataInfo.tableSeq = pBasicParam->tableSeq;
3,807,266✔
2158

2159
      taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
3,807,266✔
2160
      QUERY_CHECK_NULL(taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
7,614,532✔
2161
      break;
3,807,266✔
2162
    }
2163
    case EX_SRC_TYPE_VSTB_SCAN: {
22,085,599✔
2164
      SSourceDataInfo dataInfo = {0};
22,085,599✔
2165
      dataInfo.status = EX_SOURCE_DATA_NOT_READY;
22,085,599✔
2166
      dataInfo.taskId = pExchangeInfo->pTaskId;
22,085,599✔
2167
      dataInfo.index = pIdx->srcIdx;
22,085,599✔
2168
      dataInfo.window = pBasicParam->window;
22,085,599✔
2169
      dataInfo.groupid = 0;
22,085,599✔
2170
      dataInfo.isNewParam = pBasicParam->isNewParam;
22,085,599✔
2171
      dataInfo.tagList = NULL;
22,085,599✔
2172
      dataInfo.orgTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
22,085,599✔
2173
      QUERY_CHECK_NULL(dataInfo.orgTbInfo, code, lino, _return, terrno);
22,085,599✔
2174
      dataInfo.orgTbInfo->vgId = pBasicParam->orgTbInfo->vgId;
22,085,599✔
2175
      tstrncpy(dataInfo.orgTbInfo->tbName, pBasicParam->orgTbInfo->tbName, TSDB_TABLE_FNAME_LEN);
22,085,599✔
2176
      dataInfo.orgTbInfo->colMap = taosArrayDup(pBasicParam->orgTbInfo->colMap, NULL);
22,085,599✔
2177
      QUERY_CHECK_NULL(dataInfo.orgTbInfo->colMap, code, lino, _return, terrno);
22,085,599✔
2178

2179
      dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
22,085,599✔
2180
      QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
22,085,599✔
2181

2182
      dataInfo.type = pBasicParam->type;
22,085,599✔
2183
      dataInfo.srcOpType = pBasicParam->srcOpType;
22,085,599✔
2184
      dataInfo.tableSeq = pBasicParam->tableSeq;
22,085,599✔
2185

2186
      taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
22,085,599✔
2187
      QUERY_CHECK_NULL( taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
44,171,198✔
2188
      break;
22,085,599✔
2189
    }
2190
    case EX_SRC_TYPE_STB_JOIN_SCAN:
258,802✔
2191
    default: {
2192
      if (pIdx->inUseIdx < 0) {
258,802✔
2193
        SSourceDataInfo dataInfo = {0};
256,432✔
2194
        dataInfo.status = EX_SOURCE_DATA_NOT_READY;
256,432✔
2195
        dataInfo.taskId = pExchangeInfo->pTaskId;
256,432✔
2196
        dataInfo.index = pIdx->srcIdx;
256,432✔
2197
        dataInfo.groupid = 0;
256,432✔
2198
        dataInfo.tagList = NULL;
256,432✔
2199

2200
        dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
256,432✔
2201
        QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
256,432✔
2202

2203
        dataInfo.isNewParam = false;
256,432✔
2204
        dataInfo.type = pBasicParam->type;
256,432✔
2205
        dataInfo.srcOpType = pBasicParam->srcOpType;
256,432✔
2206
        dataInfo.tableSeq = pBasicParam->tableSeq;
256,432✔
2207

2208
        QUERY_CHECK_NULL(taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
512,864✔
2209

2210
        pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
256,432✔
2211
      } else {
2212
        SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
2,370✔
2213
        QUERY_CHECK_NULL(pDataInfo, code, lino, _return, terrno);
2,370✔
2214
        if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
2,370✔
2215
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
2,370✔
2216
        }
2217

2218
        pDataInfo->tagList = NULL;
2,370✔
2219
        pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
2,370✔
2220
        QUERY_CHECK_NULL(pDataInfo->pSrcUidList, code, lino, _return, terrno);
2,370✔
2221

2222
        pDataInfo->groupid = 0;
2,370✔
2223
        pDataInfo->isNewParam = false;
2,370✔
2224
        pDataInfo->type = pBasicParam->type;
2,370✔
2225
        pDataInfo->srcOpType = pBasicParam->srcOpType;
2,370✔
2226
        pDataInfo->tableSeq = pBasicParam->tableSeq;
2,370✔
2227
      }
2228
      break;
258,802✔
2229
    }
2230
  }
2231

2232
  return code;
28,364,259✔
2233
_return:
×
2234
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2235
  return code;
×
2236
}
2237

2238
/**
 * Consumes the operator's pending get-param and registers every source it
 * describes through addSingleExchangeSource.
 *
 * The param value is either a single SExchangeOperatorParam or, when
 * multiParams is set, a SExchangeOperatorBatchParam whose pBatchs hash holds
 * one basic param per entry. The get-param is released and detached from the
 * operator on every path, including when a batch entry fails, so both modes
 * clean up the same way.
 *
 * @param pOperator  exchange operator; pOperatorGetParam must be non-NULL
 * @return TSDB_CODE_SUCCESS, or the first error from addSingleExchangeSource
 */
int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  SExchangeOperatorBasicParam* pBasicParam = NULL;
  SExchangeOperatorParam*      pParam = (SExchangeOperatorParam*)pOperator->pOperatorGetParam->value;
  if (pParam->multiParams) {
    SExchangeOperatorBatchParam* pBatch = (SExchangeOperatorBatchParam*)pOperator->pOperatorGetParam->value;
    int32_t                      iter = 0;
    while (NULL != (pBasicParam = tSimpleHashIterate(pBatch->pBatchs, pBasicParam, &iter))) {
      code = addSingleExchangeSource(pOperator, pBasicParam);
      if (code != TSDB_CODE_SUCCESS) {
        // Stop at the first failure but still fall through to the release below.
        break;
      }
    }
  } else {
    pBasicParam = &pParam->basic;
    code = addSingleExchangeSource(pOperator, pBasicParam);
  }

  freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
  pOperator->pOperatorGetParam = NULL;

  return code;
}
2262

2263
/**
 * Prepares the exchange operator for fetching remote data.
 *
 * Nothing is done when there is no pending get-param and the operator is
 * either dynamic or already opened. Otherwise the function registers dynamic
 * sources, optionally rewinds the current source index, hands over or releases
 * the get-param, starts the concurrent load when the task is neither in stream
 * mode nor loading sequentially, and marks the operator opened.
 *
 * @param pOperator  exchange operator; pOperator->info is its SExchangeInfo
 * @return TSDB_CODE_SUCCESS. On failure the code is stored in the task and
 *         T_LONG_JMP is invoked with it; the trailing return is only reached if
 *         that macro returns.
 */
int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
  SExchangeInfo* pExchangeInfo = pOperator->info;
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;

  if ((OPTR_IS_OPENED(pOperator) && !pExchangeInfo->dynamicOp &&
       NULL == pOperator->pOperatorGetParam) ||
      (pExchangeInfo->dynamicOp && NULL == pOperator->pOperatorGetParam)) {
    qDebug("%s, skip prepare, opened:%d, dynamicOp:%d, getParam:%p",
      GET_TASKID(pOperator->pTaskInfo), OPTR_IS_OPENED(pOperator),
      pExchangeInfo->dynamicOp, pOperator->pOperatorGetParam);
    return TSDB_CODE_SUCCESS;
  }

  if (pExchangeInfo->dynamicOp) {
    // Consumes and clears pOperator->pOperatorGetParam.
    code = addDynamicExchangeSource(pOperator);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  // The parentheses spell out the grouping the compiler already applied
  // ('&&' binds tighter than '||'); stream mode rewinds unconditionally.
  if ((pOperator->status == OP_NOT_OPENED &&
       (pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData)) ||
      IS_STREAM_MODE(pOperator->pTaskInfo)) {
    pExchangeInfo->current = 0;
  }

  if (NULL != pOperator->pOperatorGetParam) {
    SOperatorParam* pGetParam = pOperator->pOperatorGetParam;
    storeNotifyInfo(pOperator);

    if (!pGetParam->reUse) {
      freeOperatorParam(pGetParam, OP_GET_PARAM);
    } else {
      /**
        The param is referenced by getParam, and it will be freed by
        the parent operator after getting next block.
      */
      pGetParam->reUse = false;
    }
    pOperator->pOperatorGetParam = NULL;
  }

  if (!IS_STREAM_MODE(pOperator->pTaskInfo) && !pExchangeInfo->seqLoadData) {
    code = prepareConcurrentlyLoad(pOperator);
    QUERY_CHECK_CODE(code, lino, _end);
    pExchangeInfo->openedTs = taosGetTimestampUs();
  }

  OPTR_SET_OPENED(pOperator);
  qDebug("%s prepare load complete", pOperator->pTaskInfo->id.str);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }
  return code;
}
2323

2324
/**
 * Applies group offset (SOFFSET), group limit (SLIMIT) and row limit/offset to
 * a data block.
 *
 * While pLimitInfo->remainGroupOffset is positive, whole groups are discarded.
 * Every block of the group currently being skipped is dropped, and the
 * remaining offset is decremented once per group change, because
 * currentGroupId always follows the group of the last block seen.
 *
 * @param pOperator      operator producing the block; may be marked done here
 * @param pLimitInfo     running limit/offset state, updated in place
 * @param pBlock         block to trim; may be emptied
 * @param holdDataInBuf  when true, a block below resultInfo.threshold is kept
 *                       for further accumulation unless slimit/soffset is used
 * @return PROJECT_RETRIEVE_CONTINUE when the caller should fetch more data,
 *         PROJECT_RETRIEVE_DONE when the block (or the end of output) is ready
 */
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  if (pLimitInfo->remainGroupOffset > 0) {
    // First group ever seen, or another block of the group that is being
    // skipped: discard it without consuming an offset slot.
    if (pLimitInfo->currentGroupId == 0 || pLimitInfo->currentGroupId == pBlock->info.id.groupId) {
      pLimitInfo->currentGroupId = pBlock->info.id.groupId;
      blockDataCleanup(pBlock);
      return PROJECT_RETRIEVE_CONTINUE;
    }

    // now it is the data from a new group: the previous group is fully skipped
    pLimitInfo->remainGroupOffset -= 1;
    // track the new group right away so its later blocks are not counted again
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;

    // ignore data block in current group
    if (pLimitInfo->remainGroupOffset > 0) {
      blockDataCleanup(pBlock);
      return PROJECT_RETRIEVE_CONTINUE;
    }
  }

  // here check for a new group data, we need to handle the data of the previous group.
  if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
    pLimitInfo->numOfOutputGroups += 1;
    if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
      pOperator->status = OP_EXEC_DONE;
      blockDataCleanup(pBlock);

      return PROJECT_RETRIEVE_DONE;
    }

    // reset the value for a new group data
    resetLimitInfoForNextGroup(pLimitInfo);
    // existing rows that belongs to previous group.
    if (pBlock->info.rows > 0) {
      return PROJECT_RETRIEVE_DONE;
    }
  }

  // here we reach the start position, according to the limit/offset requirements.

  // set current group id
  pLimitInfo->currentGroupId = pBlock->info.id.groupId;

  bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pTaskInfo);
  if (pBlock->info.rows == 0) {
    return PROJECT_RETRIEVE_CONTINUE;
  } else {
    if (limitReached && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
      setOperatorCompleted(pOperator);
      return PROJECT_RETRIEVE_DONE;
    }
  }

  // todo optimize performance
  // If slimit/soffset values exist, multi-round results can not be packed into one block, since
  // they may not belong to the same group and the limit/offset value is not valid in this case.
  if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || hasSlimitOffsetInfo(pLimitInfo)) {
    return PROJECT_RETRIEVE_DONE;
  } else {  // not full enough, continue to accumulate the output data in the buffer.
    return PROJECT_RETRIEVE_CONTINUE;
  }
}
2389

2390
/**
 * Blocks on the exchange's `ready` semaphore.
 *
 * When the task has a worker callback, beforeBlocking is invoked before the
 * wait and afterRecoverFromBlocking after it, both with the callback's pPool.
 *
 * @param pOperator      operator whose task supplies the worker callbacks
 * @param pExchangeInfo  exchange state holding the `ready` semaphore
 * @return TSDB_CODE_SUCCESS, or the failing step's code, which is also stored
 *         in the task's code field
 */
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo) {
  SExecTaskInfo* pTask = pOperator->pTaskInfo;
  int32_t        rc = TSDB_CODE_SUCCESS;

  if (pTask->pWorkerCb) {
    rc = pTask->pWorkerCb->beforeBlocking(pTask->pWorkerCb->pPool);
    if (TSDB_CODE_SUCCESS != rc) {
      goto _failed;
    }
  }

  rc = tsem_wait(&pExchangeInfo->ready);
  if (TSDB_CODE_SUCCESS != rc) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(rc));
    goto _failed;
  }

  if (pTask->pWorkerCb) {
    rc = pTask->pWorkerCb->afterRecoverFromBlocking(pTask->pWorkerCb->pPool);
    if (TSDB_CODE_SUCCESS != rc) {
      goto _failed;
    }
  }

  return TSDB_CODE_SUCCESS;

_failed:
  // Every failing step records its code on the task before reporting it.
  pTask->code = rc;
  return pTask->code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc