• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4933

20 Jan 2026 10:44AM UTC coverage: 66.671% (+0.03%) from 66.646%
#4933

push

travis-ci

web-flow
merge: from main to 3.0 #34340

73 of 178 new or added lines in 9 files covered. (41.01%)

1199 existing lines in 124 files now uncovered.

203121 of 304663 relevant lines covered (66.67%)

132228377.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.55
/source/libs/executor/src/exchangeoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "operator.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "tdatablock.h"
23
#include "thash.h"
24
#include "tmsg.h"
25
#include "tref.h"
26
#include "trpc.h"
27

28
typedef struct SFetchRspHandleWrapper {
29
  int64_t  exchangeId;
30
  int32_t  sourceIndex;
31
  int64_t  seqId;
32
} SFetchRspHandleWrapper;
33

34
typedef struct SSourceDataInfo {
35
  int32_t             index;
36
  int64_t             seqId;
37
  SRWLatch            lock;
38
  SRetrieveTableRsp*  pRsp;
39
  uint64_t            totalRows;
40
  int64_t             startTime;
41
  int32_t             code;
42
  EX_SOURCE_STATUS    status;
43
  const char*         taskId;
44
  SArray*             pSrcUidList;
45
  int32_t             srcOpType;
46
  bool                tableSeq;
47
  char*               decompBuf;
48
  int32_t             decompBufSize;
49
  SOrgTbInfo*         orgTbInfo;
50
  SArray*             batchOrgTbInfo; // SArray<SOrgTbInfo>
51
  SArray*             tagList;
52
  EExchangeSourceType type;
53
  bool                isNewParam;
54
  STimeWindow         window;
55
  uint64_t            groupid;
56
  bool                fetchSent; // need reset
57
} SSourceDataInfo;
58

59
static void destroyExchangeOperatorInfo(void* param);
60
static void freeBlock(void* pParam);
61
static void freeSourceDataInfo(void* param);
62
static void setAllSourcesCompleted(SOperatorInfo* pOperator);
63

64
static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code);
65
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex);
66
static int32_t getCompletedSources(const SArray* pArray, int32_t* pRes);
67
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator);
68
static void    storeNotifyInfo(SOperatorInfo* pOperator);
69
static int32_t seqLoadRemoteData(SOperatorInfo* pOperator);
70
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator);
71
static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock,
72
                                 bool holdDataInBuf);
73
static int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo);
74

75
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo);
76

77
static bool isVstbScan(SSourceDataInfo* pDataInfo) {return pDataInfo->type == EX_SRC_TYPE_VSTB_SCAN; }
25,366,206✔
78
static bool isVstbWinScan(SSourceDataInfo* pDataInfo) { return pDataInfo->type == EX_SRC_TYPE_VSTB_WIN_SCAN; }
×
79
static bool isVstbAggScan(SSourceDataInfo* pDataInfo) { return pDataInfo->type == EX_SRC_TYPE_VSTB_AGG_SCAN; }
×
80
static bool isVstbTagScan(SSourceDataInfo* pDataInfo) { return pDataInfo->type == EX_SRC_TYPE_VSTB_TAG_SCAN; }
16,580,272✔
81
static bool isStbJoinScan(SSourceDataInfo* pDataInfo) { return pDataInfo->type == EX_SRC_TYPE_STB_JOIN_SCAN; }
×
82

83

84
static void streamSequenciallyLoadRemoteData(SOperatorInfo* pOperator,
9,631,064✔
85
                                             SExchangeInfo* pExchangeInfo,
86
                                             SExecTaskInfo* pTaskInfo) {
87
  int32_t code = 0;
9,631,064✔
88
  int32_t lino = 0;
9,631,064✔
89
  int64_t startTs = taosGetTimestampUs();  
9,631,276✔
90
  int32_t  totalSources = (int32_t)taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
9,631,276✔
91
  int32_t completed = 0;
9,631,276✔
92
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
9,631,276✔
93
  if (code != TSDB_CODE_SUCCESS) {
9,631,064✔
94
    pTaskInfo->code = code;
×
95
    T_LONG_JMP(pTaskInfo->env, code);
×
96
  }
97
  if (completed == totalSources) {
9,631,064✔
98
    qDebug("%s no load since all sources completed, completed:%d, totalSources:%d", pTaskInfo->id.str, completed, totalSources);
1,740,174✔
99
    setAllSourcesCompleted(pOperator);
1,740,174✔
100
    return;
1,742,154✔
101
  }
102

103
  SSourceDataInfo* pDataInfo = NULL;
7,890,890✔
104

105
  while (1) {
4,468,975✔
106
    if (pExchangeInfo->current < 0) {
12,359,865✔
107
      qDebug("current %d and all sources complted, totalSources:%d", pExchangeInfo->current, totalSources);
104,779✔
108
      setAllSourcesCompleted(pOperator);
104,779✔
109
      return;
104,779✔
110
    }
111
    
112
    if (pExchangeInfo->current >= totalSources) {
12,255,059✔
113
      completed = 0;
5,811,896✔
114
      code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
5,811,896✔
115
      if (code != TSDB_CODE_SUCCESS) {
5,811,896✔
116
        pTaskInfo->code = code;
×
117
        T_LONG_JMP(pTaskInfo->env, code);
×
118
      }
119
      if (completed == totalSources) {
5,811,896✔
120
        qDebug("stop to load since all sources complted, completed:%d, totalSources:%d", completed, totalSources);
3,927,461✔
121
        setAllSourcesCompleted(pOperator);
3,927,461✔
122
        return;
3,927,461✔
123
      }
124
      
125
      pExchangeInfo->current = 0;
1,884,435✔
126
    }
127

128
    qDebug("%s start stream exchange %p idx:%d fetch", GET_TASKID(pTaskInfo), pExchangeInfo, pExchangeInfo->current);
8,327,625✔
129

130
    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
8,327,625✔
131
    if (!pDataInfo) {
8,327,837✔
132
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
133
      pTaskInfo->code = terrno;
×
134
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
135
    }
136

137
    if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
8,327,837✔
138
      pExchangeInfo->current++;
980✔
139
      continue;
980✔
140
    }
141

142
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
8,326,857✔
143

144
    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
8,326,857✔
145
    if (code != TSDB_CODE_SUCCESS) {
8,326,857✔
146
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
147
      pTaskInfo->code = code;
×
148
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
149
    }
150

UNCOV
151
    while (true) {
×
152
      code = exchangeWait(pOperator, pExchangeInfo);
8,326,857✔
153
      if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
8,326,644✔
154
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
701✔
155
      }
156

157
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
8,326,156✔
158
      if (pDataInfo->seqId != currSeqId) {
8,326,644✔
UNCOV
159
        qDebug("%s seq rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", 
×
160
            GET_TASKID(pTaskInfo), pDataInfo->seqId, pExchangeInfo, currSeqId);
UNCOV
161
        taosMemoryFreeClear(pDataInfo->pRsp);
×
UNCOV
162
        continue;
×
163
      }
164

165
      break;
8,326,857✔
166
    }
167

168
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
8,326,857✔
169
    if (!pSource) {
8,326,400✔
170
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
171
      pTaskInfo->code = terrno;
×
172
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
173
    }
174

175
    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
8,326,400✔
176
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
×
177
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
178
             tstrerror(pDataInfo->code));
179
      pTaskInfo->code = pDataInfo->code;
×
180
      T_LONG_JMP(pTaskInfo->env, code);
×
181
    }
182

183
    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
8,326,400✔
184
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
8,326,644✔
185

186
    if (pRsp->numOfRows == 0) {
8,326,644✔
187
      qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
4,467,995✔
188
             " execId:%d idx %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", pDataInfo,
189
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
190
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);
191

192
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
4,467,995✔
193
      if (isVstbScan(pDataInfo) || isVstbTagScan(pDataInfo)) {
4,467,995✔
194
        pExchangeInfo->current = -1;
104,779✔
195
      } else {
196
        pExchangeInfo->current += 1;
4,363,216✔
197
      }
198
      taosMemoryFreeClear(pDataInfo->pRsp);
4,467,995✔
199
      continue;
4,467,995✔
200
    }
201

202
    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
3,858,862✔
203
    TAOS_CHECK_EXIT(code);
3,858,862✔
204

205
    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
3,858,862✔
206
    if (pRsp->completed == 1) {
3,858,862✔
207
      qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
1,854,096✔
208
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%d", pDataInfo,
209
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
210
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
211
             pExchangeInfo->current + 1, totalSources);
212

213
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
1,854,096✔
214
      if (isVstbScan(pDataInfo)) {
1,854,096✔
215
        pExchangeInfo->current = -1;
×
216
        taosMemoryFreeClear(pDataInfo->pRsp);
×
217
        continue;
×
218
      }
219
    } else {
220
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d idx:%d numOfRows:%" PRId64
2,004,766✔
221
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
222
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
223
             pExchangeInfo->current, pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
224
    }
225

226
    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
3,858,862✔
227
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;
3,858,862✔
228

229
    pExchangeInfo->current++;
3,858,862✔
230

231
    taosMemoryFreeClear(pDataInfo->pRsp);
3,858,862✔
232
    return;
3,858,862✔
233
  }
234

235
_exit:
×
236

237
  if (code) {
×
238
    pTaskInfo->code = code;
×
239
    qError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
×
240
  }
241
}
242

243

244
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
296,960,933✔
245
                                           SExecTaskInfo* pTaskInfo) {
246
  int32_t code = 0;
296,960,933✔
247
  int32_t lino = 0;
296,960,933✔
248
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
296,960,933✔
249
  int32_t completed = 0;
296,960,028✔
250
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
296,960,028✔
251
  if (code != TSDB_CODE_SUCCESS) {
296,960,468✔
252
    pTaskInfo->code = code;
×
253
    T_LONG_JMP(pTaskInfo->env, code);
×
254
  }
255
  if (completed == totalSources) {
296,960,468✔
256
    setAllSourcesCompleted(pOperator);
97,904,045✔
257
    return;
97,904,750✔
258
  }
259

260
  SSourceDataInfo* pDataInfo = NULL;
199,056,423✔
261

262
  while (1) {
22,734,637✔
263
    qDebug("prepare wait for ready, %p, %s", pExchangeInfo, GET_TASKID(pTaskInfo));
221,791,060✔
264
    code = exchangeWait(pOperator, pExchangeInfo);
221,791,060✔
265

266
    if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
221,793,268✔
267
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
36,735✔
268
    }
269

270
    for (int32_t i = 0; i < totalSources; ++i) {
368,300,499✔
271
      pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
368,299,104✔
272
      QUERY_CHECK_NULL(pDataInfo, code, lino, _exit, terrno);
368,300,499✔
273
      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
368,300,499✔
274
        continue;
110,051,021✔
275
      }
276

277
      if (pDataInfo->status != EX_SOURCE_DATA_READY) {
258,249,993✔
278
        continue;
36,493,460✔
279
      }
280

281
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
221,757,463✔
282
      if (pDataInfo->seqId != currSeqId) {
221,756,998✔
283
        qDebug("concurrent rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pDataInfo->seqId, pExchangeInfo, currSeqId);
×
284
        taosMemoryFreeClear(pDataInfo->pRsp);
×
285
        break;
×
286
      }
287

288
      if (pDataInfo->code != TSDB_CODE_SUCCESS) {
221,756,068✔
289
        code = pDataInfo->code;
6,831✔
290
        TAOS_CHECK_EXIT(code);
6,831✔
291
      }
292

293
      tmemory_barrier();
221,749,237✔
294
      SRetrieveTableRsp*     pRsp = pDataInfo->pRsp;
221,749,237✔
295
      SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
221,750,167✔
296
      QUERY_CHECK_NULL(pSource, code, lino, _exit, terrno);
221,749,237✔
297

298
      // todo
299
      SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
221,749,237✔
300
      if (pRsp->numOfRows == 0) {
221,749,702✔
301
        if (NULL != pDataInfo->pSrcUidList && !isVstbScan(pDataInfo)) {
46,650,251✔
302
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
×
303
          code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
×
304
          if (code != TSDB_CODE_SUCCESS) {
×
305
            taosMemoryFreeClear(pDataInfo->pRsp);
×
306
            TAOS_CHECK_EXIT(code);
×
307
          }
308
        } else {
309
          pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
46,650,251✔
310
          qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
46,650,251✔
311
                 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", try next %d/%" PRIzu, pDataInfo,
312
                 GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
313
                 pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows, i + 1, totalSources);
314
          taosMemoryFreeClear(pDataInfo->pRsp);
46,650,251✔
315
        }
316
        break;
46,650,251✔
317
      }
318

319
      TAOS_CHECK_EXIT(doExtractResultBlocks(pExchangeInfo, pDataInfo));
175,099,451✔
320

321
      SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
175,099,366✔
322
      updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pDataInfo->startTime, pOperator);
175,099,401✔
323
      pDataInfo->totalRows += pRetrieveRsp->numOfRows;
175,100,381✔
324

325
      if (pRsp->completed == 1) {
175,098,491✔
326
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
162,962,691✔
327
        qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
162,962,691✔
328
               " execId:%d index:%d completed, blocks:%d, numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64
329
               ", totalRows:%" PRIu64 ", total:%.2f Kb, try next %d/%" PRIzu, pDataInfo,
330
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
331
               pRsp->numOfBlocks, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows,
332
               pLoadInfo->totalSize / 1024.0, i + 1, totalSources);
333
      } else {
334
        qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
12,137,175✔
335
               " execId:%d blocks:%d, numOfRows:%" PRId64 ", totalRows:%" PRIu64 ", total:%.2f Kb",
336
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
337
               pRsp->numOfBlocks, pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
338
      }
339

340
      taosMemoryFreeClear(pDataInfo->pRsp);
175,100,342✔
341

342
      if ((pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED || NULL != pDataInfo->pSrcUidList) && !isVstbScan(pDataInfo) && !isVstbTagScan(pDataInfo)) {
175,098,936✔
343
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
12,137,175✔
344
        code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
12,137,175✔
345
        if (code != TSDB_CODE_SUCCESS) {
12,137,175✔
346
          taosMemoryFreeClear(pDataInfo->pRsp);
×
347
          TAOS_CHECK_EXIT(code);
×
348
        }
349
      }
350
      
351
      return;
175,099,490✔
352
    }  // end loop
353

354
    int32_t complete1 = 0;
46,651,646✔
355
    code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &complete1);
46,650,251✔
356
    if (code != TSDB_CODE_SUCCESS) {
46,650,251✔
357
      pTaskInfo->code = code;
×
358
      T_LONG_JMP(pTaskInfo->env, code);
×
359
    }
360
    if (complete1 == totalSources) {
46,650,251✔
361
      qDebug("all sources are completed, %s", GET_TASKID(pTaskInfo));
23,915,614✔
362
      return;
23,915,114✔
363
    }
364
  }
365

366
_exit:
6,366✔
367

368
  if (code) {
6,831✔
369
    pTaskInfo->code = code;
6,831✔
370
    qError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
6,831✔
371
  }
372
}
373

374
static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
593,562,527✔
375
  int32_t        code = TSDB_CODE_SUCCESS;
593,562,527✔
376
  SExchangeInfo* pExchangeInfo = pOperator->info;
593,562,527✔
377
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
593,563,805✔
378

379
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
593,561,942✔
380

381
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
593,559,773✔
382
  if (pOperator->status == OP_EXEC_DONE) {
593,559,721✔
383
    qDebug("%s all %" PRIzu " source(s) are exhausted, total rows:%" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
×
384
           GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize,
385
           pLoadInfo->totalElapsed / 1000.0);
386
    return NULL;
×
387
  }
388

389
  // we have buffered retrieved datablock, return it directly
390
  SSDataBlock* p = NULL;
593,559,923✔
391
  if (taosArrayGetSize(pExchangeInfo->pResultBlockList) > 0) {
593,561,473✔
392
    p = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
259,782,592✔
393
    taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
259,782,799✔
394
  }
395

396
  if (p != NULL) {
593,561,313✔
397
    void* tmp = taosArrayPush(pExchangeInfo->pRecycledBlocks, &p);
259,782,592✔
398
    if (!tmp) {
259,782,799✔
399
      code = terrno;
×
400
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
401
      pTaskInfo->code = code;
×
402
      T_LONG_JMP(pTaskInfo->env, code);
×
403
    }
404
    return p;
259,782,799✔
405
  } else {
406
    if (pExchangeInfo->seqLoadData) {
333,778,721✔
407
      code = seqLoadRemoteData(pOperator);
27,187,402✔
408
      if (code != TSDB_CODE_SUCCESS) {
27,187,402✔
409
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
621✔
410
        pTaskInfo->code = code;
621✔
411
        T_LONG_JMP(pTaskInfo->env, code);
621✔
412
      }
413
    } else if (IS_STREAM_MODE(pOperator->pTaskInfo))   {
306,591,266✔
414
      streamSequenciallyLoadRemoteData(pOperator, pExchangeInfo, pTaskInfo);
9,631,064✔
415
    } else {
416
      concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
296,959,447✔
417
    }
418
    if (TSDB_CODE_SUCCESS != pOperator->pTaskInfo->code) {
333,742,922✔
419
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(pOperator->pTaskInfo->code));
8,226✔
420
      T_LONG_JMP(pTaskInfo->env, pOperator->pTaskInfo->code);
8,226✔
421
    }
422
    
423
    if (taosArrayGetSize(pExchangeInfo->pResultBlockList) == 0) {
333,733,049✔
424
      qDebug("empty resultBlockList");
134,419,117✔
425
      return NULL;
134,419,117✔
426
    } else {
427
      p = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
199,315,842✔
428
      taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
199,316,357✔
429
      void* tmp = taosArrayPush(pExchangeInfo->pRecycledBlocks, &p);
199,317,237✔
430
      if (!tmp) {
199,317,024✔
431
        code = terrno;
×
432
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
433
        pTaskInfo->code = code;
×
434
        T_LONG_JMP(pTaskInfo->env, code);
×
435
      }
436

437
      qDebug("block with rows:%" PRId64 " loaded", p->info.rows);
199,317,024✔
438
      return p;
199,317,024✔
439
    }
440
  }
441
}
442

443
static int32_t loadRemoteDataNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
591,737,090✔
444
  int32_t        code = TSDB_CODE_SUCCESS;
591,737,090✔
445
  int32_t        lino = 0;
591,737,090✔
446
  SExchangeInfo* pExchangeInfo = pOperator->info;
591,737,090✔
447
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
591,740,727✔
448

449
  qDebug("%s start to load from exchange %p", pTaskInfo->id.str, pExchangeInfo);
591,739,118✔
450

451
  code = pOperator->fpSet._openFn(pOperator);
591,740,908✔
452
  QUERY_CHECK_CODE(code, lino, _end);
591,739,616✔
453

454
  if (pOperator->status == OP_EXEC_DONE) {
591,739,616✔
455
    (*ppRes) = NULL;
81,418✔
456
    return code;
81,418✔
457
  }
458

459
  while (1) {
1,903,515✔
460
    SSDataBlock* pBlock = doLoadRemoteDataImpl(pOperator);
593,562,643✔
461
    if (pBlock == NULL) {
593,517,006✔
462
      (*ppRes) = NULL;
134,418,389✔
463
      return code;
134,418,389✔
464
    }
465

466
    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL, NULL);
459,098,617✔
467
    QUERY_CHECK_CODE(code, lino, _end);
459,097,535✔
468

469
    if (blockDataGetNumOfRows(pBlock) == 0) {
459,097,535✔
470
      qDebug("rows 0 block got, continue next load");
996✔
471
      continue;
996✔
472
    }
473

474
    SLimitInfo* pLimitInfo = &pExchangeInfo->limitInfo;
459,097,948✔
475
    if (hasLimitOffsetInfo(pLimitInfo)) {
459,097,533✔
476
      int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock, false);
3,937,997✔
477
      if (status == PROJECT_RETRIEVE_CONTINUE) {
3,937,997✔
478
        qDebug("limit retrieve continue");
1,902,519✔
479
        continue;
1,902,519✔
480
      } else if (status == PROJECT_RETRIEVE_DONE) {
2,035,478✔
481
        if (pBlock->info.rows == 0) {
2,035,478✔
482
          setOperatorCompleted(pOperator);
×
483
          (*ppRes) = NULL;
×
484
          return code;
×
485
        } else {
486
          (*ppRes) = pBlock;
2,035,478✔
487
          return code;
2,035,478✔
488
        }
489
      }
490
    } else {
491
      (*ppRes) = pBlock;
455,157,368✔
492
      qDebug("block with rows %" PRId64 " returned in exechange", pBlock->info.rows);
455,159,228✔
493
      return code;
455,156,645✔
494
    }
495
  }
496

497
_end:
×
498

499
  if (code != TSDB_CODE_SUCCESS) {
×
500
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
501
    pTaskInfo->code = code;
×
502
    T_LONG_JMP(pTaskInfo->env, code);
×
503
  } else {
504
    qDebug("empty block returned in exchange");
×
505
  }
506
  
507
  (*ppRes) = NULL;
×
508
  return code;
×
509
}
510

511
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
132,006,080✔
512
  pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
132,006,080✔
513
  if (pInfo->pSourceDataInfo == NULL) {
132,004,781✔
514
    return terrno;
×
515
  }
516

517
  if (pInfo->dynamicOp) {
132,005,117✔
518
    return TSDB_CODE_SUCCESS;
6,103,087✔
519
  }
520

521
  int32_t len = strlen(id) + 1;
125,903,505✔
522
  pInfo->pTaskId = taosMemoryCalloc(1, len);
125,903,505✔
523
  if (!pInfo->pTaskId) {
125,903,099✔
524
    return terrno;
×
525
  }
526
  tstrncpy(pInfo->pTaskId, id, len);
125,898,878✔
527
  for (int32_t i = 0; i < numOfSources; ++i) {
333,799,396✔
528
    SSourceDataInfo dataInfo = {0};
207,893,994✔
529
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
207,893,028✔
530
    dataInfo.taskId = pInfo->pTaskId;
207,893,028✔
531
    dataInfo.index = i;
207,895,282✔
532
    SSourceDataInfo* pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
207,895,282✔
533
    if (pDs == NULL) {
207,895,832✔
534
      taosArrayDestroyEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
×
535
      return terrno;
×
536
    }
537
    qDebug("init source data info %d, pDs:%p, status:%d", i, pDs, pDs->status);
207,895,832✔
538
  }
539

540
  return TSDB_CODE_SUCCESS;
125,905,402✔
541
}
542

543
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo, const char* id) {
132,007,548✔
544
  size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);
132,007,548✔
545

546
  if (numOfSources == 0) {
132,003,283✔
547
    qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
×
548
    return TSDB_CODE_INVALID_PARA;
×
549
  }
550
  pInfo->pFetchRpcHandles = taosArrayInit(numOfSources, sizeof(int64_t));
132,003,283✔
551
  if (!pInfo->pFetchRpcHandles) {
132,007,036✔
552
    return terrno;
×
553
  }
554
  void* ret = taosArrayReserve(pInfo->pFetchRpcHandles, numOfSources);
132,007,546✔
555
  if (!ret) {
132,004,657✔
556
    return terrno;
×
557
  }
558

559
  pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
132,004,657✔
560
  if (pInfo->pSources == NULL) {
132,007,083✔
561
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
562
    return terrno;
×
563
  }
564

565
  if (pExNode->node.dynamicOp) {
132,006,118✔
566
    pInfo->pHashSources = tSimpleHashInit(numOfSources * 2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
6,103,087✔
567
    if (NULL == pInfo->pHashSources) {
6,103,087✔
568
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
569
      return terrno;
×
570
    }
571
  }
572

573
  for (int32_t i = 0; i < numOfSources; ++i) {
352,632,277✔
574
    SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
220,626,208✔
575
    if (!pNode) {
220,619,619✔
576
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
577
      return terrno;
×
578
    }
579
    void* tmp = taosArrayPush(pInfo->pSources, pNode);
220,619,619✔
580
    if (!tmp) {
220,629,354✔
581
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
582
      return terrno;
×
583
    }
584
    SExchangeSrcIndex idx = {.srcIdx = i, .inUseIdx = -1};
220,629,354✔
585
    int32_t           code =
586
        tSimpleHashPut(pInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
220,629,336✔
587
    if (pInfo->pHashSources && code != TSDB_CODE_SUCCESS) {
220,624,603✔
588
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
589
      return code;
×
590
    }
591
  }
592

593
  initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);
132,006,069✔
594
  int64_t refId = taosAddRef(exchangeObjRefPool, pInfo);
132,006,487✔
595
  if (refId < 0) {
132,000,568✔
596
    int32_t code = terrno;
×
597
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
598
    return code;
×
599
  } else {
600
    pInfo->self = refId;
132,000,568✔
601
  }
602

603
  return initDataSource(numOfSources, pInfo, id);
132,002,966✔
604
}
605

606
int32_t resetExchangeOperState(SOperatorInfo* pOper) {
8,972,943✔
607
  SExchangeInfo* pInfo = pOper->info;
8,972,943✔
608
  SExchangePhysiNode* pPhynode = (SExchangePhysiNode*)pOper->pPhyNode;
8,974,000✔
609

610
  qDebug("%s reset exchange op:%p info:%p", pOper->pTaskInfo->id.str, pOper, pInfo);
8,973,578✔
611

612
  (void)atomic_add_fetch_64(&pInfo->seqId, 1);
8,974,065✔
613
  pOper->status = OP_NOT_OPENED;
8,974,485✔
614
  pInfo->current = 0;
8,974,698✔
615
  pInfo->loadInfo.totalElapsed = 0;
8,974,698✔
616
  pInfo->loadInfo.totalRows = 0;
8,974,698✔
617
  pInfo->loadInfo.totalSize = 0;
8,974,485✔
618
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pSourceDataInfo); ++i) {
24,957,463✔
619
    SSourceDataInfo* pDataInfo = taosArrayGet(pInfo->pSourceDataInfo, i);
15,982,976✔
620
    taosWLockLatch(&pDataInfo->lock);
15,982,552✔
621
    taosMemoryFreeClear(pDataInfo->decompBuf);
15,982,554✔
622
    taosMemoryFreeClear(pDataInfo->pRsp);
15,982,765✔
623

624
    pDataInfo->totalRows = 0;
15,982,554✔
625
    pDataInfo->code = 0;
15,982,554✔
626
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
15,982,554✔
627
    pDataInfo->fetchSent = false;
15,982,765✔
628
    taosWUnLockLatch(&pDataInfo->lock);
15,982,765✔
629
  }
630

631
  if (pInfo->dynamicOp) {
8,974,698✔
632
    taosArrayClearEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
1,503,044✔
633
  } 
634

635
  taosArrayClearEx(pInfo->pResultBlockList, freeBlock);
8,974,698✔
636
  taosArrayClearEx(pInfo->pRecycledBlocks, freeBlock);
8,974,272✔
637

638
  blockDataCleanup(pInfo->pDummyBlock);
8,974,698✔
639

640
  void   *data = NULL;
8,974,485✔
641
  int32_t iter = 0;
8,974,485✔
642
  while ((data = tSimpleHashIterate(pInfo->pHashSources, data, &iter))) {
12,071,198✔
643
    ((SExchangeSrcIndex *)data)->inUseIdx = -1;
3,096,713✔
644
  }
645
  
646
  pInfo->limitInfo = (SLimitInfo){0};
8,974,485✔
647
  initLimitInfo(pPhynode->node.pLimit, pPhynode->node.pSlimit, &pInfo->limitInfo);
8,974,485✔
648

649
  return 0;
8,974,698✔
650
}
651

652
int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo,
132,007,548✔
653
                                   SOperatorInfo** pOptrInfo) {
654
  QRY_PARAM_CHECK(pOptrInfo);
132,007,548✔
655

656
  int32_t        code = 0;
132,009,419✔
657
  int32_t        lino = 0;
132,009,419✔
658
  SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
132,009,419✔
659
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
132,003,781✔
660
  if (pInfo == NULL || pOperator == NULL) {
132,002,868✔
661
    code = terrno;
×
662
    goto _error;
×
663
  }
664

665
  pOperator->pPhyNode = pExNode;
132,002,868✔
666
  pInfo->dynamicOp = pExNode->node.dynamicOp;
132,002,868✔
667
  code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo));
132,005,212✔
668
  QUERY_CHECK_CODE(code, lino, _error);
132,008,024✔
669

670
  code = tsem_init(&pInfo->ready, 0, 0);
132,008,024✔
671
  QUERY_CHECK_CODE(code, lino, _error);
132,005,933✔
672

673
  pInfo->pDummyBlock = createDataBlockFromDescNode(pExNode->node.pOutputDataBlockDesc);
132,005,933✔
674
  QUERY_CHECK_NULL(pInfo->pDummyBlock, code, lino, _error, terrno);
132,007,524✔
675

676
  pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES);
132,007,524✔
677
  QUERY_CHECK_NULL(pInfo->pResultBlockList, code, lino, _error, terrno);
132,007,176✔
678
  pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES);
132,008,489✔
679
  QUERY_CHECK_NULL(pInfo->pRecycledBlocks, code, lino, _error, terrno);
132,005,696✔
680

681
  SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self};
132,007,676✔
682
  code = qAppendTaskStopInfo(pTaskInfo, &stopInfo);
132,007,211✔
683
  QUERY_CHECK_CODE(code, lino, _error);
132,007,548✔
684

685
  pInfo->seqLoadData = pExNode->seqRecvData;
132,007,548✔
686
  pInfo->dynTbname = pExNode->dynTbname;
132,007,559✔
687
  if (pInfo->dynTbname) {
132,008,013✔
688
    pInfo->seqLoadData = true;
10,596✔
689
  }
690
  pInfo->pTransporter = pTransporter;
132,007,548✔
691

692
  setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo,
132,006,583✔
693
                  pTaskInfo);
694
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
132,006,357✔
695

696
  code = filterInitFromNode((SNode*)pExNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
132,006,583✔
697
                            pTaskInfo->pStreamRuntimeInfo);
132,006,118✔
698
  QUERY_CHECK_CODE(code, lino, _error);
132,003,456✔
699
  qTrace("%s exchange op:%p", __func__, pOperator);
132,003,456✔
700
  pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteDataNext, NULL, destroyExchangeOperatorInfo,
132,003,456✔
701
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
702
  setOperatorResetStateFn(pOperator, resetExchangeOperState);
132,003,091✔
703
  *pOptrInfo = pOperator;
132,004,750✔
704
  return TSDB_CODE_SUCCESS;
132,005,223✔
705

706
_error:
×
707
  if (code != TSDB_CODE_SUCCESS) {
×
708
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
709
    pTaskInfo->code = code;
×
710
  }
711
  if (pInfo != NULL) {
×
712
    doDestroyExchangeOperatorInfo(pInfo);
×
713
  }
714

715
  if (pOperator != NULL) {
×
716
    pOperator->info = NULL;
×
717
    destroyOperator(pOperator);
×
718
  }
719
  return code;
×
720
}
721

722
void destroyExchangeOperatorInfo(void* param) {
132,007,475✔
723
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
132,007,475✔
724
  int32_t        code = taosRemoveRef(exchangeObjRefPool, pExInfo->self);
132,007,475✔
725
  if (code != TSDB_CODE_SUCCESS) {
132,008,919✔
726
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
727
  }
728
}
132,008,919✔
729

730
void freeBlock(void* pParam) {
253,601,315✔
731
  SSDataBlock* pBlock = *(SSDataBlock**)pParam;
253,601,315✔
732
  blockDataDestroy(pBlock);
253,601,993✔
733
}
253,601,747✔
734

735
void freeSourceDataInfo(void* p) {
212,224,057✔
736
  SSourceDataInfo* pInfo = (SSourceDataInfo*)p;
212,224,057✔
737
  taosMemoryFreeClear(pInfo->decompBuf);
212,224,057✔
738
  taosMemoryFreeClear(pInfo->pRsp);
212,227,712✔
739

740
  pInfo->decompBufSize = 0;
212,225,537✔
741
}
212,225,536✔
742

743
void doDestroyExchangeOperatorInfo(void* param) {
132,007,127✔
744
  if (param == NULL) {
132,007,127✔
745
    return;
×
746
  }
747
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
132,007,127✔
748
  if (pExInfo->pFetchRpcHandles) {
132,007,127✔
749
    for (int32_t i = 0; i < pExInfo->pFetchRpcHandles->size; ++i) {
352,637,421✔
750
      int64_t* pRpcHandle = taosArrayGet(pExInfo->pFetchRpcHandles, i);
220,631,753✔
751
      if (*pRpcHandle > 0) {
220,631,831✔
752
        SDownstreamSourceNode* pSource = taosArrayGet(pExInfo->pSources, i);
8,148,366✔
753
        (void)asyncFreeConnById(pExInfo->pTransporter, *pRpcHandle);
8,148,366✔
754
      }
755
    }
756
    taosArrayDestroy(pExInfo->pFetchRpcHandles);
132,007,979✔
757
  }
758

759
  taosArrayDestroy(pExInfo->pSources);
132,009,172✔
760
  taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo);
132,008,571✔
761

762
  taosArrayDestroyEx(pExInfo->pResultBlockList, freeBlock);
132,005,186✔
763
  taosArrayDestroyEx(pExInfo->pRecycledBlocks, freeBlock);
132,006,584✔
764

765
  blockDataDestroy(pExInfo->pDummyBlock);
132,004,911✔
766
  tSimpleHashCleanup(pExInfo->pHashSources);
132,007,474✔
767

768
  int32_t code = tsem_destroy(&pExInfo->ready);
132,007,002✔
769
  if (code != TSDB_CODE_SUCCESS) {
132,005,209✔
770
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
771
  }
772
  taosMemoryFreeClear(pExInfo->pTaskId);
132,005,209✔
773

774
  taosMemoryFreeClear(param);
131,998,745✔
775
}
776

777
int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
258,147,293✔
778
  SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;
258,147,293✔
779

780
  taosMemoryFreeClear(pMsg->pEpSet);
258,147,293✔
781
  SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
258,227,177✔
782
  if (pExchangeInfo == NULL) {
258,236,381✔
783
    qWarn("failed to acquire exchange operator, since it may have been released, %p", pExchangeInfo);
46,181✔
784
    taosMemoryFree(pMsg->pData);
46,181✔
785
    return TSDB_CODE_SUCCESS;
46,181✔
786
  }
787

788
  int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
258,190,200✔
789
  if (pWrapper->seqId != currSeqId) {
258,195,676✔
790
    qDebug("rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pWrapper->seqId, pExchangeInfo, currSeqId);
×
791
    taosMemoryFree(pMsg->pData);
×
792
    code = taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
×
793
    if (code != TSDB_CODE_SUCCESS) {
×
794
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
795
    }
796
    return TSDB_CODE_SUCCESS;
×
797
  }
798

799
  int32_t          index = pWrapper->sourceIndex;
258,115,340✔
800

801
  qDebug("%s exchange %p %dth source got rsp, code:%d, rsp:%p", pExchangeInfo->pTaskId, pExchangeInfo, index, code, pMsg->pData);
258,151,633✔
802

803
  int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, index);
258,153,926✔
804
  if (pRpcHandle != NULL) {
258,166,030✔
805
    int32_t ret = asyncFreeConnById(pExchangeInfo->pTransporter, *pRpcHandle);
258,092,308✔
806
    if (ret != 0) {
258,144,809✔
807
      qDebug("failed to free rpc handle, code:%s, %p", tstrerror(ret), pExchangeInfo);
10,306,392✔
808
    }
809
    *pRpcHandle = -1;
258,144,809✔
810
  }
811

812
  SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);
258,142,561✔
813
  if (!pSourceDataInfo) {
258,178,802✔
814
    return terrno;
×
815
  }
816

817
  if (0 == code && NULL == pMsg->pData) {
258,178,802✔
818
    qError("invalid rsp msg, msgType:%d, len:%d", pMsg->msgType, pMsg->len);
×
819
    code = TSDB_CODE_QRY_INVALID_MSG;
×
820
  }
821

822
  taosWLockLatch(&pSourceDataInfo->lock);
258,244,128✔
823
  if (code == TSDB_CODE_SUCCESS) {
258,156,114✔
824
    pSourceDataInfo->seqId = pWrapper->seqId;
258,140,187✔
825
    pSourceDataInfo->pRsp = pMsg->pData;
258,144,918✔
826

827
    SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
258,039,899✔
828
    pRsp->numOfRows = htobe64(pRsp->numOfRows);
258,044,367✔
829
    pRsp->compLen = htonl(pRsp->compLen);
258,105,651✔
830
    pRsp->payloadLen = htonl(pRsp->payloadLen);
258,112,056✔
831
    pRsp->numOfCols = htonl(pRsp->numOfCols);
257,993,889✔
832
    pRsp->useconds = htobe64(pRsp->useconds);
258,105,951✔
833
    pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);
257,926,111✔
834

835
    qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%" PRId64 ", %p", pSourceDataInfo->taskId, index,
258,015,405✔
836
           pRsp->numOfBlocks, pRsp->numOfRows, pExchangeInfo);
837
  } else {
838
    taosMemoryFree(pMsg->pData);
15,927✔
839
    pSourceDataInfo->code = rpcCvtErrCode(code);
15,927✔
840
    if (pSourceDataInfo->code != code) {
15,927✔
841
      qError("%s fetch rsp received, index:%d, error:%s, cvted error: %s, %p", pSourceDataInfo->taskId, index,
×
842
             tstrerror(code), tstrerror(pSourceDataInfo->code), pExchangeInfo);
843
    } else {
844
      qError("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code),
15,927✔
845
             pExchangeInfo);
846
    }
847
  }
848

849
  tmemory_barrier();
258,032,322✔
850
  pSourceDataInfo->status = EX_SOURCE_DATA_READY;
258,032,322✔
851
  taosWUnLockLatch(&pSourceDataInfo->lock);
258,125,947✔
852
  
853
  code = tsem_post(&pExchangeInfo->ready);
258,041,039✔
854
  if (code != TSDB_CODE_SUCCESS) {
258,202,876✔
855
    qError("failed to invoke post when fetch rsp is ready, code:%s, %p", tstrerror(code), pExchangeInfo);
×
856
    return code;
×
857
  }
858

859
  code = taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
258,202,876✔
860
  if (code != TSDB_CODE_SUCCESS) {
258,210,134✔
861
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
862
  }
863
  return code;
258,210,630✔
864
}
865

866
int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, bool tableSeq) {
242,659✔
867
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
242,659✔
868
  if (NULL == *ppRes) {
242,659✔
869
    return terrno;
×
870
  }
871

872
  STableScanOperatorParam* pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
242,659✔
873
  if (NULL == pScan) {
242,659✔
874
    taosMemoryFreeClear(*ppRes);
×
875
    return terrno;
×
876
  }
877

878
  pScan->paramType = DYN_TYPE_SCAN_PARAM;
242,659✔
879
  pScan->pUidList = taosArrayDup(pUidList, NULL);
242,659✔
880
  if (NULL == pScan->pUidList) {
242,659✔
881
    taosMemoryFree(pScan);
×
882
    taosMemoryFreeClear(*ppRes);
×
883
    return terrno;
×
884
  }
885
  pScan->dynType = DYN_TYPE_STB_JOIN;
242,659✔
886
  pScan->tableSeq = tableSeq;
242,659✔
887
  pScan->pOrgTbInfo = NULL;
242,659✔
888
  pScan->pBatchTbInfo = NULL;
242,659✔
889
  pScan->pTagList = NULL;
242,659✔
890
  pScan->isNewParam = false;
242,659✔
891
  pScan->window.skey = INT64_MAX;
242,659✔
892
  pScan->window.ekey = INT64_MIN;
242,659✔
893
  pScan->notifyToProcess = false;
242,659✔
894
  pScan->notifyTs = 0;
242,659✔
895

896
  (*ppRes)->opType = srcOpType;
242,659✔
897
  (*ppRes)->downstreamIdx = 0;
242,659✔
898
  (*ppRes)->value = pScan;
242,659✔
899
  (*ppRes)->pChildren = NULL;
242,659✔
900
  (*ppRes)->reUse = false;
242,659✔
901

902
  return TSDB_CODE_SUCCESS;
242,659✔
903
}
904

905
int32_t buildTableScanOperatorParamEx(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, SOrgTbInfo *pMap, bool tableSeq, STimeWindow *window, bool isNewParam, ETableScanDynType type) {
24,421,160✔
906
  int32_t                  code = TSDB_CODE_SUCCESS;
24,421,160✔
907
  int32_t                  lino = 0;
24,421,160✔
908
  STableScanOperatorParam* pScan = NULL;
24,421,160✔
909

910
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
24,421,160✔
911
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
24,421,373✔
912

913
  pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
24,421,160✔
914
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);
24,421,373✔
915

916
  pScan->paramType = DYN_TYPE_SCAN_PARAM;
24,421,373✔
917
  if (pUidList) {
24,421,373✔
918
    pScan->pUidList = taosArrayDup(pUidList, NULL);
24,421,373✔
919
    QUERY_CHECK_NULL(pScan->pUidList, code, lino, _return, terrno);
24,421,162✔
920
  } else {
921
    pScan->pUidList = NULL;
×
922
  }
923

924
  if (pMap) {
24,421,162✔
925
    pScan->pOrgTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
23,566,702✔
926
    QUERY_CHECK_NULL(pScan->pOrgTbInfo, code, lino, _return, terrno);
23,566,489✔
927

928
    pScan->pOrgTbInfo->vgId = pMap->vgId;
23,566,702✔
929
    tstrncpy(pScan->pOrgTbInfo->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);
23,566,702✔
930

931
    pScan->pOrgTbInfo->colMap = taosArrayDup(pMap->colMap, NULL);
23,566,913✔
932
    QUERY_CHECK_NULL(pScan->pOrgTbInfo->colMap, code, lino, _return, terrno);
23,566,700✔
933
  } else {
934
    pScan->pOrgTbInfo = NULL;
854,460✔
935
  }
936
  pScan->pTagList = NULL;
24,421,373✔
937
  pScan->pBatchTbInfo = NULL;
24,421,160✔
938

939

940
  pScan->dynType = type;
24,421,373✔
941
  pScan->tableSeq = tableSeq;
24,421,160✔
942
  pScan->window.skey = window->skey;
24,421,373✔
943
  pScan->window.ekey = window->ekey;
24,421,373✔
944
  pScan->isNewParam = isNewParam;
24,421,373✔
945
  pScan->notifyToProcess = false;
24,421,160✔
946
  pScan->notifyTs = 0;
24,421,373✔
947
  (*ppRes)->opType = srcOpType;
24,421,373✔
948
  (*ppRes)->downstreamIdx = 0;
24,421,373✔
949
  (*ppRes)->value = pScan;
24,421,160✔
950
  (*ppRes)->pChildren = NULL;
24,421,160✔
951
  (*ppRes)->reUse = false;
24,421,373✔
952

953
  return code;
24,421,373✔
954
_return:
×
955
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
956
  taosMemoryFreeClear(*ppRes);
×
957
  if (pScan) {
×
958
    taosArrayDestroy(pScan->pUidList);
×
959
    if (pScan->pOrgTbInfo) {
×
960
      taosArrayDestroy(pScan->pOrgTbInfo->colMap);
×
961
      taosMemoryFreeClear(pScan->pOrgTbInfo);
×
962
    }
963
    taosMemoryFree(pScan);
×
964
  }
965
  return code;
×
966
}
967

968
/**
969
  @brief build the table scan operator param for notify message
970
*/
971
int32_t buildTableScanOperatorParamNotify(SOperatorParam** ppRes,
219,432✔
972
                                          int32_t srcOpType, TSKEY notifyTs) {
973
  if (srcOpType != 0 && srcOpType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
219,432✔
974
    qWarn("%s, invalid srcOpType:%d", __func__, srcOpType);
×
975
    return TSDB_CODE_INVALID_PARA;
×
976
  }
977
  int32_t code = TSDB_CODE_SUCCESS;
219,432✔
978
  int32_t lino = 0;
219,432✔
979
  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
219,432✔
980
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
219,432✔
981

982
  STableScanOperatorParam* pTsParam =
438,864✔
983
    taosMemoryCalloc(1, sizeof(STableScanOperatorParam));
219,432✔
984
  QUERY_CHECK_NULL(pTsParam, code, lino, _return, terrno);
219,432✔
985

986
  pTsParam->paramType = NOTIFY_TYPE_SCAN_PARAM;
219,432✔
987
  pTsParam->notifyToProcess = true;
219,432✔
988
  pTsParam->notifyTs = notifyTs;
219,432✔
989

990
  (*ppRes)->opType = srcOpType != 0 ? srcOpType :
219,432✔
991
                                      QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
992
  (*ppRes)->downstreamIdx = 0;
219,432✔
993
  (*ppRes)->value = pTsParam;
219,432✔
994
  (*ppRes)->pChildren = NULL;
219,432✔
995
  /* param is not reusable when it is transferred by message */
996
  (*ppRes)->reUse = false;
219,432✔
997

998
_return:
219,432✔
999
  if (TSDB_CODE_SUCCESS != code) {
219,432✔
1000
    qError("%s failed at %d, failed to build scan operator msg:%s",
×
1001
           __func__, lino, tstrerror(code));
1002
    taosMemoryFreeClear(*ppRes);
×
1003
    if (pTsParam) {
×
1004
      taosMemoryFree(pTsParam);
×
1005
    }
1006
  }
1007
  return code;
219,432✔
1008
}
1009

1010
int32_t buildTableScanOperatorParamBatchInfo(SOperatorParam** ppRes, uint64_t groupid, SArray* pUidList, int32_t srcOpType, SArray *pBatchMap, SArray *pTagList, bool tableSeq, STimeWindow *window, bool isNewParam) {
4,262,052✔
1011
  int32_t                  code = TSDB_CODE_SUCCESS;
4,262,052✔
1012
  int32_t                  lino = 0;
4,262,052✔
1013
  STableScanOperatorParam* pScan = NULL;
4,262,052✔
1014

1015
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
4,262,052✔
1016
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
4,262,052✔
1017

1018
  pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
4,262,052✔
1019
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);
4,262,052✔
1020

1021
  pScan->paramType = DYN_TYPE_SCAN_PARAM;
4,262,052✔
1022
  pScan->groupid = groupid;
4,262,052✔
1023
  if (pUidList) {
4,262,052✔
1024
    pScan->pUidList = taosArrayDup(pUidList, NULL);
4,262,052✔
1025
    QUERY_CHECK_NULL(pScan->pUidList, code, lino, _return, terrno);
4,262,052✔
1026
  } else {
1027
    pScan->pUidList = NULL;
×
1028
  }
1029
  pScan->pOrgTbInfo = NULL;
4,262,052✔
1030

1031
  if (pBatchMap) {
4,262,052✔
1032
    pScan->pBatchTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
4,262,052✔
1033
    QUERY_CHECK_NULL(pScan->pBatchTbInfo, code, lino, _return, terrno);
4,262,052✔
1034
    for (int32_t i = 0; i < taosArrayGetSize(pBatchMap); i++) {
11,310,368✔
1035
      SOrgTbInfo *pSrcInfo = taosArrayGet(pBatchMap, i);
7,048,316✔
1036
      SOrgTbInfo batchInfo = {0};
7,048,316✔
1037
      batchInfo.vgId = pSrcInfo->vgId;
7,048,316✔
1038
      tstrncpy(batchInfo.tbName, pSrcInfo->tbName, TSDB_TABLE_FNAME_LEN);
7,048,316✔
1039
      batchInfo.colMap = taosArrayDup(pSrcInfo->colMap, NULL);
7,048,316✔
1040
      QUERY_CHECK_NULL(batchInfo.colMap, code, lino, _return, terrno);
7,048,316✔
1041
      SOrgTbInfo *pDstInfo = taosArrayPush(pScan->pBatchTbInfo, &batchInfo);
7,048,316✔
1042
      QUERY_CHECK_NULL(pDstInfo, code, lino, _return, terrno);
7,048,316✔
1043
    }
1044
  } else {
1045
    pScan->pBatchTbInfo = NULL;
×
1046
  }
1047

1048
  if (pTagList) {
4,262,052✔
1049
    pScan->pTagList = taosArrayInit(1, sizeof(STagVal));
1,973,744✔
1050
    QUERY_CHECK_NULL(pScan->pTagList, code, lino, _return, terrno);
1,973,744✔
1051

1052
    for (int32_t i = 0; i < taosArrayGetSize(pTagList); ++i) {
13,346,068✔
1053
      STagVal *pSrcTag = (STagVal*)taosArrayGet(pTagList, i);
11,372,324✔
1054
      STagVal  dstTag;
11,372,324✔
1055
      dstTag.type = pSrcTag->type;
11,372,324✔
1056
      dstTag.cid = pSrcTag->cid;
11,372,324✔
1057
      if (IS_VAR_DATA_TYPE(pSrcTag->type)) {
11,372,324✔
1058
        dstTag.nData = pSrcTag->nData;
4,988,760✔
1059
        dstTag.pData = taosMemoryMalloc(dstTag.nData);
4,988,760✔
1060
        QUERY_CHECK_NULL(dstTag.pData, code, lino, _return, terrno);
4,988,760✔
1061
        memcpy(dstTag.pData, pSrcTag->pData, dstTag.nData);
4,988,760✔
1062
      } else {
1063
        dstTag.i64 = pSrcTag->i64;
6,383,564✔
1064
      }
1065

1066
      QUERY_CHECK_NULL(taosArrayPush(pScan->pTagList, &dstTag), code, lino, _return, terrno);
22,744,648✔
1067
    }
1068
  } else {
1069
    pScan->pTagList = NULL;
2,288,308✔
1070
  }
1071

1072

1073
  pScan->dynType = DYN_TYPE_VSTB_BATCH_SCAN;
4,262,052✔
1074
  pScan->tableSeq = tableSeq;
4,262,052✔
1075
  pScan->window.skey = window->skey;
4,262,052✔
1076
  pScan->window.ekey = window->ekey;
4,262,052✔
1077
  pScan->isNewParam = isNewParam;
4,262,052✔
1078
  pScan->notifyToProcess = false;
4,262,052✔
1079
  pScan->notifyTs = 0;
4,262,052✔
1080
  (*ppRes)->opType = srcOpType;
4,262,052✔
1081
  (*ppRes)->downstreamIdx = 0;
4,262,052✔
1082
  (*ppRes)->value = pScan;
4,262,052✔
1083
  (*ppRes)->pChildren = NULL;
4,262,052✔
1084
  (*ppRes)->reUse = false;
4,262,052✔
1085

1086
  return code;
4,262,052✔
1087
_return:
×
1088
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
1089
  taosMemoryFreeClear(*ppRes);
×
1090
  if (pScan) {
×
1091
    taosArrayDestroy(pScan->pUidList);
×
1092
    if (pScan->pBatchTbInfo) {
×
1093
      taosArrayDestroy(pScan->pBatchTbInfo);
×
1094
    }
1095
    taosMemoryFree(pScan);
×
1096
  }
1097
  return code;
×
1098
}
1099

1100
int32_t buildAggOperatorParam(SOperatorParam** ppRes, uint64_t groupid, SArray* pUidList, int32_t srcOpType, SArray *pBatchMap, SArray *pTagList, bool tableSeq, STimeWindow *window, bool isNewParam, EExchangeSourceType type) {
4,262,052✔
1101
  int32_t                  code = TSDB_CODE_SUCCESS;
4,262,052✔
1102
  int32_t                  lino = 0;
4,262,052✔
1103
  SOperatorParam*          pParam = NULL;
4,262,052✔
1104

1105
  switch (type) {
4,262,052✔
1106
    case EX_SRC_TYPE_VSTB_AGG_SCAN: {
4,060,708✔
1107
      pParam = taosMemoryMalloc(sizeof(SOperatorParam));
4,060,708✔
1108
      QUERY_CHECK_NULL(pParam, code, lino, _return, terrno);
4,060,708✔
1109

1110
      pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
4,060,708✔
1111
      QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno);
4,060,708✔
1112

1113
      SOperatorParam* pTableScanParam = NULL;
4,060,708✔
1114
      code = buildTableScanOperatorParamBatchInfo(&pTableScanParam, groupid, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, pBatchMap, pTagList, tableSeq, window, isNewParam);
4,060,708✔
1115
      QUERY_CHECK_CODE(code, lino, _return);
4,060,708✔
1116

1117
      QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pTableScanParam), code, lino, _return, terrno);
8,121,416✔
1118

1119
      pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
4,060,708✔
1120
      pParam->downstreamIdx = 0;
4,060,708✔
1121
      pParam->value = NULL;
4,060,708✔
1122
      pParam->reUse = false;
4,060,708✔
1123

1124
      break;
4,060,708✔
1125
    }
1126
    case EX_SRC_TYPE_VSTB_WIN_SCAN: {
201,344✔
1127
      code = buildTableScanOperatorParamBatchInfo(&pParam, groupid, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, pBatchMap, pTagList, tableSeq, window, isNewParam);
201,344✔
1128
      QUERY_CHECK_CODE(code, lino, _return);
201,344✔
1129
      break;
201,344✔
1130
    }
1131
    default: {
×
1132
      code = TSDB_CODE_INVALID_PARA;
×
1133
      qError("%s failed at %d, invalid exchange source type:%d", __FUNCTION__, lino, type);
×
1134
      goto _return;
×
1135
    }
1136
  }
1137

1138
  *ppRes = pParam;
4,262,052✔
1139
  return code;
4,262,052✔
1140

1141
_return:
×
1142
  freeOperatorParam(pParam, OP_GET_PARAM);
×
1143
  qError("%s failed at %d, failed to build agg scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
1144
  return code;
×
1145
}
1146

1147
int32_t buildTagScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType) {
3,469,065✔
1148
  int32_t                  code = TSDB_CODE_SUCCESS;
3,469,065✔
1149
  int32_t                  lino = 0;
3,469,065✔
1150
  STagScanOperatorParam*   pScan = NULL;
3,469,065✔
1151

1152
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
3,469,065✔
1153
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
3,469,065✔
1154

1155
  pScan = taosMemoryMalloc(sizeof(STagScanOperatorParam));
3,469,065✔
1156
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);
3,469,065✔
1157
  pScan->vcUid = *(tb_uid_t*)taosArrayGet(pUidList, 0);
3,469,065✔
1158

1159
  (*ppRes)->opType = srcOpType;
3,469,065✔
1160
  (*ppRes)->downstreamIdx = 0;
3,469,065✔
1161
  (*ppRes)->value = pScan;
3,469,065✔
1162
  (*ppRes)->pChildren = NULL;
3,469,065✔
1163
  (*ppRes)->reUse = false;
3,469,065✔
1164

1165
  return code;
3,469,065✔
1166
_return:
×
1167
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
1168
  taosMemoryFreeClear(*ppRes);
×
1169
  if (pScan) {
×
1170
    taosMemoryFree(pScan);
×
1171
  }
1172
  return code;
×
1173
}
1174

1175
static int32_t getCurrentWinCalcTimeRange(SStreamRuntimeFuncInfo* pRuntimeInfo, STimeWindow* pTimeRange) {
4,842,294✔
1176
  if (!pRuntimeInfo || !pTimeRange) {
4,842,294✔
1177
    return TSDB_CODE_INTERNAL_ERROR;
×
1178
  }
1179

1180
  SSTriggerCalcParam* pParam = taosArrayGet(pRuntimeInfo->pStreamPesudoFuncVals, pRuntimeInfo->curIdx);
4,842,294✔
1181
  if (!pParam) {
4,842,081✔
1182
    return TSDB_CODE_INTERNAL_ERROR;
×
1183
  }
1184

1185
  switch (pRuntimeInfo->triggerType) {
4,842,081✔
1186
    case STREAM_TRIGGER_SLIDING:
3,892,576✔
1187
      // Unable to distinguish whether there is an interval, all use wstart/wend
1188
      // and the results are equal to those of prevTs/currentTs, using the same address of union.
1189
      pTimeRange->skey = pParam->wstart;  // is equal to wstart
3,892,576✔
1190
      pTimeRange->ekey = pParam->wend;    // is equal to wend
3,892,576✔
1191
      break;
3,892,576✔
1192
    case STREAM_TRIGGER_PERIOD:
171,560✔
1193
      pTimeRange->skey = pParam->prevLocalTime;
171,560✔
1194
      pTimeRange->ekey = pParam->triggerTime;
171,560✔
1195
      break;
171,560✔
1196
    default:
778,158✔
1197
      pTimeRange->skey = pParam->wstart;
778,158✔
1198
      pTimeRange->ekey = pParam->wend;
778,158✔
1199
      break;
778,158✔
1200
  }
1201

1202
  return TSDB_CODE_SUCCESS;
4,842,294✔
1203
}
1204

1205
int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
268,220,757✔
1206
  int32_t          code = TSDB_CODE_SUCCESS;
268,220,757✔
1207
  int32_t          lino = 0;
268,220,757✔
1208
  SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
268,220,757✔
1209
  if (!pDataInfo) {
268,215,764✔
1210
    return terrno;
×
1211
  }
1212

1213
  if (EX_SOURCE_DATA_NOT_READY != pDataInfo->status) {
268,215,764✔
1214
    return TSDB_CODE_SUCCESS;
9,799,993✔
1215
  }
1216

1217
  pDataInfo->status = EX_SOURCE_DATA_STARTED;
258,421,565✔
1218
  SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
258,421,429✔
1219
  if (!pSource) {
258,419,448✔
1220
    return terrno;
×
1221
  }
1222

1223
  pDataInfo->startTime = taosGetTimestampUs();
258,420,877✔
1224
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
258,418,862✔
1225

1226
  SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
258,420,474✔
1227
  QUERY_CHECK_NULL(pWrapper, code, lino, _end, terrno);
258,414,753✔
1228
  pWrapper->exchangeId = pExchangeInfo->self;
258,414,753✔
1229
  pWrapper->sourceIndex = sourceIndex;
258,417,257✔
1230
  pWrapper->seqId = pExchangeInfo->seqId;
258,416,293✔
1231

1232
  if (pSource->localExec) {
258,420,284✔
1233
    SDataBuf pBuf = {0};
×
1234
    int32_t  code =
1235
      (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->sId,
×
1236
                                  pTaskInfo->id.queryId, pSource->clientId,
1237
                                  pSource->taskId, 0, pSource->execId,
1238
                                  &pBuf.pData,
1239
                                  pTaskInfo->localFetch.explainRes);
1240
    code = loadRemoteDataCallback(pWrapper, &pBuf, code);
×
1241
    taosMemoryFree(pWrapper);
×
1242
    QUERY_CHECK_CODE(code, lino, _end);
×
1243
  } else {
1244
    bool needStreamPesudoFuncVals = true;
258,411,141✔
1245
    SResFetchReq req = {0};
258,411,141✔
1246
    req.header.vgId = pSource->addr.nodeId;
258,419,684✔
1247
    req.sId = pSource->sId;
258,419,678✔
1248
    req.clientId = pSource->clientId;
258,419,757✔
1249
    req.taskId = pSource->taskId;
258,415,482✔
1250
    req.queryId = pTaskInfo->id.queryId;
258,416,077✔
1251
    req.execId = pSource->execId;
258,421,538✔
1252
    if (pTaskInfo->pStreamRuntimeInfo) {
258,421,660✔
1253
      req.dynTbname = pExchangeInfo->dynTbname;
8,399,271✔
1254
      req.execId = pTaskInfo->pStreamRuntimeInfo->execId;
8,399,237✔
1255
      req.pStRtFuncInfo = &pTaskInfo->pStreamRuntimeInfo->funcInfo;
8,399,663✔
1256

1257
      if (pSource->fetchMsgType == TDMT_STREAM_FETCH_FROM_RUNNER) {
8,399,450✔
1258
        qDebug("%s stream fetch from runner, execId:%d, %p", GET_TASKID(pTaskInfo), req.execId, pTaskInfo->pStreamRuntimeInfo);
27,683✔
1259
      } else if (pSource->fetchMsgType == TDMT_STREAM_FETCH_FROM_CACHE) {
8,371,980✔
1260
        code = getCurrentWinCalcTimeRange(req.pStRtFuncInfo, &req.pStRtFuncInfo->curWindow);
4,842,294✔
1261
        QUERY_CHECK_CODE(code, lino, _end);
4,842,294✔
1262
        needStreamPesudoFuncVals = false;
4,842,294✔
1263
        qDebug("%s stream fetch from cache, execId:%d, curWinIdx:%d, time range:[%" PRId64 ", %" PRId64 "]",
4,842,294✔
1264
               GET_TASKID(pTaskInfo), req.execId, req.pStRtFuncInfo->curIdx, req.pStRtFuncInfo->curWindow.skey,
1265
               req.pStRtFuncInfo->curWindow.ekey);
1266
      }
1267
      if (!pDataInfo->fetchSent) {
8,399,663✔
1268
        req.reset = pDataInfo->fetchSent = true;
6,447,806✔
1269
      }
1270
    }
1271

1272
    switch (pDataInfo->type) {
258,418,575✔
1273
      case EX_SRC_TYPE_VSTB_SCAN: {
23,566,700✔
1274
        code = buildTableScanOperatorParamEx(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->orgTbInfo, pDataInfo->tableSeq, &pDataInfo->window, pDataInfo->isNewParam, DYN_TYPE_VSTB_SINGLE_SCAN);
23,566,700✔
1275
        taosArrayDestroy(pDataInfo->orgTbInfo->colMap);
23,566,913✔
1276
        taosMemoryFreeClear(pDataInfo->orgTbInfo);
23,566,913✔
1277
        taosArrayDestroy(pDataInfo->pSrcUidList);
23,566,913✔
1278
        pDataInfo->pSrcUidList = NULL;
23,566,913✔
1279
        if (TSDB_CODE_SUCCESS != code) {
23,566,913✔
1280
          pTaskInfo->code = code;
×
1281
          taosMemoryFree(pWrapper);
×
1282
          return pTaskInfo->code;
×
1283
        }
1284
        break;
23,566,913✔
1285
      }
1286
      case EX_SRC_TYPE_VTB_WIN_SCAN: {
1,809,875✔
1287
        if (pDataInfo->pSrcUidList) {
1,809,875✔
1288
          code = buildTableScanOperatorParamEx(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, NULL, pDataInfo->tableSeq, &pDataInfo->window, false, DYN_TYPE_VSTB_WIN_SCAN);
854,460✔
1289
          taosArrayDestroy(pDataInfo->pSrcUidList);
854,460✔
1290
          pDataInfo->pSrcUidList = NULL;
854,460✔
1291
          if (TSDB_CODE_SUCCESS != code) {
854,460✔
1292
            pTaskInfo->code = code;
×
1293
            taosMemoryFree(pWrapper);
×
1294
            return pTaskInfo->code;
×
1295
          }
1296
        }
1297
        break;
1,809,875✔
1298
      }
1299
      case EX_SRC_TYPE_VSTB_TAG_SCAN: {
3,469,065✔
1300
        code = buildTagScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType);
3,469,065✔
1301
        taosArrayDestroy(pDataInfo->pSrcUidList);
3,469,065✔
1302
        pDataInfo->pSrcUidList = NULL;
3,469,065✔
1303
        if (TSDB_CODE_SUCCESS != code) {
3,469,065✔
1304
          pTaskInfo->code = code;
×
1305
          taosMemoryFree(pWrapper);
×
1306
          return pTaskInfo->code;
×
1307
        }
1308
        break;
3,469,065✔
1309
      }
1310
      case EX_SRC_TYPE_VSTB_WIN_SCAN:
4,538,335✔
1311
      case EX_SRC_TYPE_VSTB_AGG_SCAN: {
1312
        if (pDataInfo->batchOrgTbInfo) {
4,538,335✔
1313
          code = buildAggOperatorParam(&req.pOpParam, pDataInfo->groupid, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->batchOrgTbInfo, pDataInfo->tagList, pDataInfo->tableSeq, &pDataInfo->window, pDataInfo->isNewParam, pDataInfo->type);
4,262,052✔
1314
          if (pDataInfo->batchOrgTbInfo) {
4,262,052✔
1315
            for (int32_t i = 0; i < taosArrayGetSize(pDataInfo->batchOrgTbInfo); ++i) {
11,310,368✔
1316
              SOrgTbInfo* pColMap = taosArrayGet(pDataInfo->batchOrgTbInfo, i);
7,048,316✔
1317
              if (pColMap) {
7,048,316✔
1318
                taosArrayDestroy(pColMap->colMap);
7,048,316✔
1319
              }
1320
            }
1321
            taosArrayDestroy(pDataInfo->batchOrgTbInfo);
4,262,052✔
1322
            pDataInfo->batchOrgTbInfo = NULL;
4,262,052✔
1323
          }
1324
          if (pDataInfo->tagList) {
4,262,052✔
1325
            taosArrayDestroyEx(pDataInfo->tagList, destroyTagVal);
1,973,744✔
1326
            pDataInfo->tagList = NULL;
1,973,744✔
1327
          }
1328
          if (pDataInfo->pSrcUidList) {
4,262,052✔
1329
            taosArrayDestroy(pDataInfo->pSrcUidList);
4,262,052✔
1330
            pDataInfo->pSrcUidList = NULL;
4,262,052✔
1331
          }
1332

1333
          if (TSDB_CODE_SUCCESS != code) {
4,262,052✔
1334
            pTaskInfo->code = code;
×
1335
            taosMemoryFree(pWrapper);
×
1336
            return pTaskInfo->code;
×
1337
          }
1338
        }
1339
        break;
4,538,335✔
1340
      }
1341
      case EX_SRC_TYPE_STB_JOIN_SCAN:
225,025,819✔
1342
      default: {
1343
        if (pDataInfo->pSrcUidList) {
225,025,819✔
1344
          code = buildTableScanOperatorParam(&req.pOpParam,
231,401✔
1345
                                             pDataInfo->pSrcUidList,
1346
                                             pDataInfo->srcOpType,
1347
                                             pDataInfo->tableSeq);
231,401✔
1348
          /* source uid list can be reused in vnode size, so only use once */
1349
          taosArrayDestroy(pDataInfo->pSrcUidList);
231,401✔
1350
          pDataInfo->pSrcUidList = NULL;
231,401✔
1351
          if (TSDB_CODE_SUCCESS != code) {
231,401✔
1352
            pTaskInfo->code = code;
×
1353
            taosMemoryFree(pWrapper);
×
1354
            return pTaskInfo->code;
×
1355
          }
1356
        }
1357
        if (pExchangeInfo->notifyToSend) {
225,033,078✔
1358
          if (NULL == req.pOpParam) {
219,432✔
1359
            code = buildTableScanOperatorParamNotify(&req.pOpParam,
219,432✔
1360
                                                     pDataInfo->srcOpType,
1361
                                                     pExchangeInfo->notifyTs);
1362
            if (TSDB_CODE_SUCCESS != code) {
219,432✔
1363
              pTaskInfo->code = code;
×
1364
              taosMemoryFree(pWrapper);
×
1365
              return pTaskInfo->code;
×
1366
            }
1367
          } else {
1368
            /**
1369
              Currently don't support use the same param for multiple times!
1370
            */
1371
            qError("%s, %s failed, currently don't support use the same param "
×
1372
                   "for multiple times!", GET_TASKID(pTaskInfo), __func__);
1373
            pTaskInfo->code = TSDB_CODE_INVALID_PARA;
×
1374
            taosMemoryFree(pWrapper);
×
1375
            return pTaskInfo->code;
×
1376
          }
1377
          pExchangeInfo->notifyToSend = false;
219,432✔
1378
        }
1379
        break;
225,035,115✔
1380
      }
1381
    }
1382

1383
    int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req, needStreamPesudoFuncVals);
258,419,303✔
1384
    if (msgSize < 0) {
258,413,903✔
1385
      pTaskInfo->code = msgSize;
×
1386
      taosMemoryFree(pWrapper);
×
1387
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
1388
      return pTaskInfo->code;
×
1389
    }
1390

1391
    void* msg = taosMemoryCalloc(1, msgSize);
258,413,903✔
1392
    if (NULL == msg) {
258,407,000✔
1393
      pTaskInfo->code = terrno;
×
1394
      taosMemoryFree(pWrapper);
×
1395
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
1396
      return pTaskInfo->code;
×
1397
    }
1398

1399
    msgSize = tSerializeSResFetchReq(msg, msgSize, &req, needStreamPesudoFuncVals);
258,407,000✔
1400
    if (msgSize < 0) {
258,415,376✔
1401
      pTaskInfo->code = msgSize;
×
1402
      taosMemoryFree(pWrapper);
×
1403
      taosMemoryFree(msg);
×
1404
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
1405
      return pTaskInfo->code;
×
1406
    }
1407

1408
    freeOperatorParam(req.pOpParam, OP_GET_PARAM);
258,415,376✔
1409

1410
    qDebug("%s build fetch msg and send to vgId:%d, ep:%s, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
258,417,638✔
1411
           ", seqId:%" PRId64 ", execId:%d, %p, %d/%" PRIzu,
1412
           GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->clientId,
1413
           pSource->taskId, pExchangeInfo->seqId, pSource->execId, pExchangeInfo, sourceIndex, totalSources);
1414

1415
    // send the fetch remote task result reques
1416
    SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
258,419,936✔
1417
    if (NULL == pMsgSendInfo) {
258,412,929✔
1418
      taosMemoryFreeClear(msg);
×
1419
      taosMemoryFree(pWrapper);
×
1420
      qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
×
1421
      pTaskInfo->code = terrno;
×
1422
      return pTaskInfo->code;
×
1423
    }
1424

1425
    pMsgSendInfo->param = pWrapper;
258,412,929✔
1426
    pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
258,417,703✔
1427
    pMsgSendInfo->msgInfo.pData = msg;
258,418,516✔
1428
    pMsgSendInfo->msgInfo.len = msgSize;
258,418,072✔
1429
    pMsgSendInfo->msgType = pSource->fetchMsgType;
258,413,563✔
1430
    pMsgSendInfo->fp = loadRemoteDataCallback;
258,416,754✔
1431
    pMsgSendInfo->requestId = pTaskInfo->id.queryId;
258,417,380✔
1432

1433
    int64_t transporterId = 0;
258,415,926✔
1434
    void* poolHandle = NULL;
258,421,583✔
1435
    code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
258,421,583✔
1436
    QUERY_CHECK_CODE(code, lino, _end);
258,420,708✔
1437
    int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, sourceIndex);
258,420,708✔
1438
    *pRpcHandle = transporterId;
258,424,004✔
1439
  }
1440

1441
_end:
258,424,469✔
1442
  if (code != TSDB_CODE_SUCCESS) {
258,424,469✔
1443
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1444
  }
1445
  return code;
258,423,185✔
1446
}
1447

1448
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs,
203,613,777✔
1449
                          SOperatorInfo* pOperator) {
1450
  pInfo->totalRows += numOfRows;
203,613,777✔
1451
  pInfo->totalSize += dataLen;
203,613,301✔
1452
  pInfo->totalElapsed += (taosGetTimestampUs() - startTs);
203,614,505✔
1453
  pOperator->resultInfo.totalRows += numOfRows;
203,613,312✔
1454
}
203,614,505✔
1455

1456
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart) {
470,244,847✔
1457
  int32_t      code = TSDB_CODE_SUCCESS;
470,244,847✔
1458
  int32_t      lino = 0;
470,244,847✔
1459
  SSDataBlock* pBlock = NULL;
470,244,847✔
1460
  if (pColList == NULL) {  // data from other sources
470,245,060✔
1461
    blockDataCleanup(pRes);
465,949,237✔
1462
    code = blockDecodeInternal(pRes, pData, (const char**)pNextStart);
465,948,856✔
1463
    if (code) {
465,946,806✔
1464
      return code;
×
1465
    }
1466
  } else {  // extract data according to pColList
1467
    char* pStart = pData;
4,295,823✔
1468

1469
    int32_t numOfCols = htonl(*(int32_t*)pStart);
4,295,823✔
1470
    pStart += sizeof(int32_t);
4,295,823✔
1471

1472
    // todo refactor:extract method
1473
    SSysTableSchema* pSchema = (SSysTableSchema*)pStart;
4,295,823✔
1474
    for (int32_t i = 0; i < numOfCols; ++i) {
60,477,052✔
1475
      SSysTableSchema* p = (SSysTableSchema*)pStart;
56,181,229✔
1476

1477
      p->colId = htons(p->colId);
56,181,229✔
1478
      p->bytes = htonl(p->bytes);
56,181,229✔
1479
      pStart += sizeof(SSysTableSchema);
56,181,229✔
1480
    }
1481

1482
    pBlock = NULL;
4,295,823✔
1483
    code = createDataBlock(&pBlock);
4,295,823✔
1484
    QUERY_CHECK_CODE(code, lino, _end);
4,295,823✔
1485

1486
    for (int32_t i = 0; i < numOfCols; ++i) {
60,477,052✔
1487
      SColumnInfoData idata = createColumnInfoData(pSchema[i].type, pSchema[i].bytes, pSchema[i].colId);
56,181,229✔
1488
      code = blockDataAppendColInfo(pBlock, &idata);
56,181,229✔
1489
      QUERY_CHECK_CODE(code, lino, _end);
56,181,229✔
1490
    }
1491

1492
    code = blockDecodeInternal(pBlock, pStart, NULL);
4,295,823✔
1493
    QUERY_CHECK_CODE(code, lino, _end);
4,295,823✔
1494

1495
    code = blockDataEnsureCapacity(pRes, pBlock->info.rows);
4,295,823✔
1496
    QUERY_CHECK_CODE(code, lino, _end);
4,295,823✔
1497

1498
    // data from mnode
1499
    pRes->info.dataLoad = 1;
4,295,823✔
1500
    pRes->info.rows = pBlock->info.rows;
4,295,823✔
1501
    pRes->info.scanFlag = MAIN_SCAN;
4,295,823✔
1502
    code = relocateColumnData(pRes, pColList, pBlock->pDataBlock, false);
4,295,823✔
1503
    QUERY_CHECK_CODE(code, lino, _end);
4,295,823✔
1504

1505
    blockDataDestroy(pBlock);
4,295,823✔
1506
    pBlock = NULL;
4,295,823✔
1507
  }
1508

1509
_end:
470,242,629✔
1510
  if (code != TSDB_CODE_SUCCESS) {
470,241,951✔
1511
    blockDataDestroy(pBlock);
×
1512
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1513
  }
1514
  return code;
470,241,060✔
1515
}
1516

1517
void setAllSourcesCompleted(SOperatorInfo* pOperator) {
110,503,801✔
1518
  SExchangeInfo* pExchangeInfo = pOperator->info;
110,503,801✔
1519
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
110,503,801✔
1520

1521
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
110,503,588✔
1522
  size_t               totalSources = taosArrayGetSize(pExchangeInfo->pSources);
110,503,588✔
1523
  qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 ", %.2f Kb, elapsed:%.2f ms",
110,503,240✔
1524
         GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
1525
         pLoadInfo->totalElapsed / 1000.0);
1526

1527
  setOperatorCompleted(pOperator);
110,503,240✔
1528
}
110,503,240✔
1529

1530
int32_t getCompletedSources(const SArray* pArray, int32_t* pRes) {
359,053,618✔
1531
  int32_t code = TSDB_CODE_SUCCESS;
359,053,618✔
1532
  int32_t lino = 0;
359,053,618✔
1533
  size_t  total = taosArrayGetSize(pArray);
359,053,618✔
1534

1535
  int32_t completed = 0;
359,052,391✔
1536
  for (int32_t k = 0; k < total; ++k) {
1,100,219,358✔
1537
    SSourceDataInfo* p = taosArrayGet(pArray, k);
741,166,972✔
1538
    QUERY_CHECK_NULL(p, code, lino, _end, terrno);
741,166,737✔
1539
    if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
741,166,737✔
1540
      qDebug("source %d is completed, info:%p %p", k, pArray, p);
363,574,565✔
1541
      completed += 1;
363,575,064✔
1542
    }
1543
  }
1544

1545
  *pRes = completed;
359,052,386✔
1546
_end:
359,051,527✔
1547
  if (code != TSDB_CODE_SUCCESS) {
359,051,527✔
1548
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1549
  }
1550
  return code;
359,053,771✔
1551
}
1552

1553
int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
132,583,869✔
1554
  SExchangeInfo* pExchangeInfo = pOperator->info;
132,583,869✔
1555
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
132,583,942✔
1556

1557
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
132,584,592✔
1558
  int64_t startTs = taosGetTimestampUs();
132,580,951✔
1559

1560
  // Asynchronously send all fetch requests to all sources.
1561
  for (int32_t i = 0; i < totalSources; ++i) {
353,183,969✔
1562
    int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
220,602,205✔
1563
    if (code != TSDB_CODE_SUCCESS) {
220,603,018✔
1564
      pTaskInfo->code = code;
×
1565
      return code;
×
1566
    }
1567
  }
1568

1569
  int64_t endTs = taosGetTimestampUs();
132,586,799✔
1570
  qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTaskInfo),
132,586,799✔
1571
         totalSources, (endTs - startTs) / 1000.0);
1572

1573
  pOperator->status = OP_RES_TO_RETURN;
132,587,154✔
1574
  pOperator->cost.openCost = taosGetTimestampUs() - startTs;
132,587,154✔
1575
  if (isTaskKilled(pTaskInfo)) {
132,586,800✔
1576
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
465✔
1577
  }
1578

1579
  return TSDB_CODE_SUCCESS;
132,585,377✔
1580
}
1581

1582
/**
1583
  @brief store STEP DONE notification info
1584
*/
1585
void storeNotifyInfo(SOperatorInfo* pOperator) {
4,110,748✔
1586
  SExchangeInfo*  pExchangeInfo = pOperator->info;
4,110,748✔
1587
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
4,110,748✔
1588
  SOperatorParam* pGetParam = pOperator->pOperatorGetParam;
4,110,748✔
1589

1590
  SExchangeOperatorParam* pParam = (SExchangeOperatorParam*)pGetParam->value;
4,110,748✔
1591
  if (!pParam->multiParams) {
4,110,748✔
1592
    SExchangeOperatorBasicParam* pBasic = &pParam->basic;
4,110,748✔
1593
    if (pBasic->paramType != NOTIFY_TYPE_EXCHANGE_PARAM) {
4,110,748✔
1594
      qWarn("%s, %s found invalid exchange operator param type %d",
×
1595
             GET_TASKID(pTaskInfo), __func__, pBasic->paramType);
1596
      return;
×
1597
    }
1598

1599
    pExchangeInfo->notifyToSend = true;
4,110,748✔
1600
    pExchangeInfo->notifyTs = pBasic->notifyTs;
4,110,748✔
1601
  } else {
1602
    qWarn("%s, %s found multi params are not supported for notify msg",
×
1603
           GET_TASKID(pTaskInfo), __func__);
1604
  }
1605
}
1606

1607
int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo) {
199,316,539✔
1608
  int32_t            code = TSDB_CODE_SUCCESS;
199,316,539✔
1609
  int32_t            lino = 0;
199,316,539✔
1610
  SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
199,316,539✔
1611
  SSDataBlock*       pb = NULL;
199,318,469✔
1612

1613
  char* pNextStart = pRetrieveRsp->data;
199,317,539✔
1614
  char* pStart = pNextStart;
199,317,074✔
1615

1616
  int32_t index = 0;
199,318,004✔
1617

1618
  if (pRetrieveRsp->compressed) {  // decompress the data
199,318,004✔
1619
    if (pDataInfo->decompBuf == NULL) {
×
1620
      pDataInfo->decompBuf = taosMemoryMalloc(pRetrieveRsp->payloadLen);
×
1621
      QUERY_CHECK_NULL(pDataInfo->decompBuf, code, lino, _end, terrno);
×
1622
      pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
×
1623
    } else {
1624
      if (pDataInfo->decompBufSize < pRetrieveRsp->payloadLen) {
×
1625
        char* p = taosMemoryRealloc(pDataInfo->decompBuf, pRetrieveRsp->payloadLen);
×
1626
        QUERY_CHECK_NULL(p, code, lino, _end, terrno);
×
1627
        if (p != NULL) {
×
1628
          pDataInfo->decompBuf = p;
×
1629
          pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
×
1630
        }
1631
      }
1632
    }
1633
  }
1634

1635
  while (index++ < pRetrieveRsp->numOfBlocks) {
665,267,655✔
1636
    pStart = pNextStart;
465,948,547✔
1637

1638
    if (taosArrayGetSize(pExchangeInfo->pRecycledBlocks) > 0) {
465,948,547✔
1639
      pb = *(SSDataBlock**)taosArrayPop(pExchangeInfo->pRecycledBlocks);
212,344,827✔
1640
      blockDataCleanup(pb);
212,344,827✔
1641
    } else {
1642
      code = createOneDataBlock(pExchangeInfo->pDummyBlock, false, &pb);
253,604,364✔
1643
      QUERY_CHECK_NULL(pb, code, lino, _end, code);
253,604,242✔
1644
    }
1645

1646
    int32_t compLen = *(int32_t*)pStart;
465,948,604✔
1647
    pStart += sizeof(int32_t);
465,949,030✔
1648

1649
    int32_t rawLen = *(int32_t*)pStart;
465,949,954✔
1650
    pStart += sizeof(int32_t);
465,949,024✔
1651
    QUERY_CHECK_CONDITION((compLen <= rawLen && compLen != 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
465,949,024✔
1652

1653
    pNextStart = pStart + compLen;
465,949,024✔
1654
    if (pRetrieveRsp->compressed && (compLen < rawLen)) {
465,949,489✔
1655
      int32_t t = tsDecompressString(pStart, compLen, 1, pDataInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
×
1656
      QUERY_CHECK_CONDITION((t == rawLen), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1657
      pStart = pDataInfo->decompBuf;
×
1658
    }
1659

1660
    code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart);
465,948,815✔
1661
    if (code != 0) {
465,944,985✔
1662
      taosMemoryFreeClear(pDataInfo->pRsp);
×
1663
      goto _end;
×
1664
    }
1665

1666
    void* tmp = taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
465,944,985✔
1667
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
465,949,065✔
1668
    qDebug("%dth block added to resultBlockList, rows:%" PRId64, index, pb->info.rows);
465,949,065✔
1669
    pb = NULL;
465,950,116✔
1670
  }
1671

1672
_end:
199,318,217✔
1673
  if (code != TSDB_CODE_SUCCESS) {
199,318,217✔
1674
    blockDataDestroy(pb);
×
1675
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1676
  }
1677
  return code;
199,318,217✔
1678
}
1679

1680
int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
27,187,402✔
1681
  SExchangeInfo* pExchangeInfo = pOperator->info;
27,187,402✔
1682
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
27,187,402✔
1683

1684
  int32_t code = 0;
27,187,402✔
1685
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);
27,187,402✔
1686
  int64_t startTs = taosGetTimestampUs();
27,187,402✔
1687

1688
  int32_t vgId = 0;
27,187,402✔
1689
  if (pExchangeInfo->dynTbname) {
27,187,402✔
1690
    SArray* vals = pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPartColVals;
72,732✔
1691
    for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
72,732✔
1692
      SStreamGroupValue* pValue = taosArrayGet(vals, i);
72,732✔
1693
      if (pValue != NULL && pValue->isTbname) {
72,732✔
1694
        vgId = pValue->vgId;
72,732✔
1695
        break;
72,732✔
1696
      }
1697
    }
1698
  }
1699

1700
  while (1) {
6,847,800✔
1701
    if (pExchangeInfo->current >= totalSources) {
34,035,202✔
1702
      setAllSourcesCompleted(pOperator);
6,827,342✔
1703
      return TSDB_CODE_SUCCESS;
6,827,342✔
1704
    }
1705

1706
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
27,207,860✔
1707
    if (!pSource) {
27,207,860✔
1708
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1709
      pTaskInfo->code = terrno;
×
1710
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1711
    }
1712

1713
    if (vgId != 0 && pSource->addr.nodeId != vgId){
27,207,860✔
1714
      pExchangeInfo->current += 1;
52,662✔
1715
      continue;
52,662✔
1716
    }
1717

1718
    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
27,155,198✔
1719
    if (!pDataInfo) {
27,155,198✔
1720
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1721
      pTaskInfo->code = terrno;
×
1722
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1723
    }
1724
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
27,155,198✔
1725

1726
    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
27,155,198✔
1727
    if (code != TSDB_CODE_SUCCESS) {
27,155,198✔
1728
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1729
      pTaskInfo->code = code;
×
1730
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1731
    }
1732

UNCOV
1733
    while (true) {
×
1734
      code = exchangeWait(pOperator, pExchangeInfo);
27,155,198✔
1735
      if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
27,155,198✔
UNCOV
1736
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1737
      }
1738

1739
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
27,155,198✔
1740
      if (pDataInfo->seqId != currSeqId) {
27,155,198✔
UNCOV
1741
        qDebug("seq rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pDataInfo->seqId, pExchangeInfo, currSeqId);
×
UNCOV
1742
        taosMemoryFreeClear(pDataInfo->pRsp);
×
UNCOV
1743
        continue;
×
1744
      }
1745

1746
      break;
27,155,198✔
1747
    }
1748

1749
    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
27,155,198✔
1750
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
621✔
1751
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1752
             tstrerror(pDataInfo->code));
1753
      pOperator->pTaskInfo->code = pDataInfo->code;
621✔
1754
      return pOperator->pTaskInfo->code;
621✔
1755
    }
1756

1757
    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
27,154,577✔
1758
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
27,154,577✔
1759

1760
    if (pRsp->numOfRows == 0) {
27,154,577✔
1761
      qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
6,795,138✔
1762
             " execId:%d %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", pDataInfo,
1763
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1764
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);
1765

1766
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
6,795,138✔
1767
      if (isVstbScan(pDataInfo) || isVstbTagScan(pDataInfo)) {
6,795,138✔
1768
        pExchangeInfo->current = totalSources;
6,715,257✔
1769
      } else {
1770
        pExchangeInfo->current += 1;
79,881✔
1771
      }
1772
      taosMemoryFreeClear(pDataInfo->pRsp);
6,795,138✔
1773
      continue;
6,795,138✔
1774
    }
1775

1776
    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
20,359,439✔
1777
    if (code != TSDB_CODE_SUCCESS) {
20,359,439✔
1778
      goto _error;
×
1779
    }
1780

1781
    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
20,359,439✔
1782
    if (pRsp->completed == 1) {
20,359,439✔
1783
      qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
112,015✔
1784
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, pDataInfo,
1785
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1786
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
1787
             pExchangeInfo->current + 1, totalSources);
1788

1789
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
112,015✔
1790
      if (isVstbScan(pDataInfo)) {
112,015✔
1791
        pExchangeInfo->current = totalSources;
×
1792
      } else {
1793
        pExchangeInfo->current += 1;
112,015✔
1794
      }
1795
    } else {
1796
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
20,247,424✔
1797
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
1798
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1799
             pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
1800
    }
1801
    if (pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData) {
20,359,439✔
1802
      taosArrayClear(pExchangeInfo->pSourceDataInfo);
20,162,412✔
1803
    }
1804
    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
20,359,439✔
1805
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;
20,359,439✔
1806

1807
    taosMemoryFreeClear(pDataInfo->pRsp);
20,359,439✔
1808
    return TSDB_CODE_SUCCESS;
20,359,439✔
1809
  }
1810

1811
_error:
×
1812
  pTaskInfo->code = code;
×
1813
  return code;
×
1814
}
1815

1816
void clearVtbScanDataInfo(void* pItem) {
5,694,674✔
1817
  SSourceDataInfo *pInfo = (SSourceDataInfo *)pItem;
5,694,674✔
1818
  if (pInfo->orgTbInfo) {
5,694,674✔
1819
    taosArrayDestroy(pInfo->orgTbInfo->colMap);
×
1820
    taosMemoryFreeClear(pInfo->orgTbInfo);
×
1821
  }
1822
  if (pInfo->batchOrgTbInfo) {
5,694,674✔
1823
    for (int32_t i = 0; i < taosArrayGetSize(pInfo->batchOrgTbInfo); ++i) {
×
1824
      SOrgTbInfo* pColMap = taosArrayGet(pInfo->batchOrgTbInfo, i);
×
1825
      if (pColMap) {
×
1826
        taosArrayDestroy(pColMap->colMap);
×
1827
      }
1828
    }
1829
    taosArrayDestroy(pInfo->batchOrgTbInfo);
×
1830
  }
1831
  if (pInfo->tagList) {
5,694,674✔
1832
    taosArrayDestroyEx(pInfo->tagList, destroyTagVal);
×
1833
    pInfo->tagList = NULL;
×
1834
  }
1835
  taosArrayDestroy(pInfo->pSrcUidList);
5,694,674✔
1836
}
5,694,674✔
1837

1838
static int32_t loadTagListFromBasicParam(SSourceDataInfo* pDataInfo, SExchangeOperatorBasicParam* pBasicParam) {
4,262,052✔
1839
  int32_t  code = TSDB_CODE_SUCCESS;
4,262,052✔
1840
  int32_t  lino = 0;
4,262,052✔
1841
  STagVal  dstTag;
4,262,052✔
1842
  bool     needFree = false;
4,262,052✔
1843

1844
  if (pBasicParam->paramType != DYN_TYPE_EXCHANGE_PARAM) {
4,262,052✔
1845
    qError("%s failed since invalid exchange operator param type %d",
×
1846
      __func__, pBasicParam->paramType);
1847
    return TSDB_CODE_INVALID_PARA;
×
1848
  }
1849

1850
  if (pDataInfo->tagList) {
4,262,052✔
1851
    taosArrayClear(pDataInfo->tagList);
×
1852
  }
1853

1854
  if (pBasicParam->tagList) {
4,262,052✔
1855
    pDataInfo->tagList = taosArrayInit(1, sizeof(STagVal));
1,973,744✔
1856
    QUERY_CHECK_NULL(pDataInfo->tagList, code, lino, _return, terrno);
1,973,744✔
1857

1858
    for (int32_t i = 0; i < taosArrayGetSize(pBasicParam->tagList); ++i) {
13,346,068✔
1859
      STagVal *pSrcTag = (STagVal*)taosArrayGet(pBasicParam->tagList, i);
11,372,324✔
1860
      QUERY_CHECK_NULL(pSrcTag, code, lino, _return, terrno);
11,372,324✔
1861

1862
      dstTag = (STagVal){0};
11,372,324✔
1863
      dstTag.type = pSrcTag->type;
11,372,324✔
1864
      dstTag.cid = pSrcTag->cid;
11,372,324✔
1865
      if (IS_VAR_DATA_TYPE(pSrcTag->type)) {
11,372,324✔
1866
        dstTag.nData = pSrcTag->nData;
4,988,760✔
1867
        dstTag.pData = taosMemoryMalloc(dstTag.nData);
4,988,760✔
1868
        QUERY_CHECK_NULL(dstTag.pData, code, lino, _return, terrno);
4,988,760✔
1869
        needFree = true;
4,988,760✔
1870
        memcpy(dstTag.pData, pSrcTag->pData, dstTag.nData);
4,988,760✔
1871
      } else {
1872
        dstTag.i64 = pSrcTag->i64;
6,383,564✔
1873
      }
1874

1875
      QUERY_CHECK_NULL(taosArrayPush(pDataInfo->tagList, &dstTag), code, lino, _return, terrno);
22,744,648✔
1876
      needFree = false;
11,372,324✔
1877
    }
1878
  } else {
1879
    pDataInfo->tagList = NULL;
2,288,308✔
1880
  }
1881

1882
  return code;
4,262,052✔
1883
_return:
×
1884
  if (needFree) {
×
1885
    taosMemoryFreeClear(dstTag.pData);
×
1886
  }
1887
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1888
  return code;
×
1889
}
1890

1891
int32_t loadBatchColMapFromBasicParam(SSourceDataInfo* pDataInfo, SExchangeOperatorBasicParam* pBasicParam) {
4,262,052✔
1892
  int32_t     code = TSDB_CODE_SUCCESS;
4,262,052✔
1893
  int32_t     lino = 0;
4,262,052✔
1894
  SOrgTbInfo  dstOrgTbInfo = {0};
4,262,052✔
1895
  bool        needFree = false;
4,262,052✔
1896

1897
  if (pBasicParam->paramType != DYN_TYPE_EXCHANGE_PARAM) {
4,262,052✔
1898
    qError("%s failed since invalid exchange operator param type %d",
×
1899
      __func__, pBasicParam->paramType);
1900
    return TSDB_CODE_INVALID_PARA;
×
1901
  }
1902

1903
  if (pBasicParam->batchOrgTbInfo) {
4,262,052✔
1904
    pDataInfo->batchOrgTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
4,262,052✔
1905
    QUERY_CHECK_NULL(pDataInfo->batchOrgTbInfo, code, lino, _return, terrno);
4,262,052✔
1906

1907
    for (int32_t i = 0; i < taosArrayGetSize(pBasicParam->batchOrgTbInfo); ++i) {
11,310,368✔
1908
      SOrgTbInfo* pSrcOrgTbInfo = taosArrayGet(pBasicParam->batchOrgTbInfo, i);
7,048,316✔
1909
      QUERY_CHECK_NULL(pSrcOrgTbInfo, code, lino, _return, terrno);
7,048,316✔
1910

1911
      dstOrgTbInfo = (SOrgTbInfo){0};
7,048,316✔
1912
      dstOrgTbInfo.vgId = pSrcOrgTbInfo->vgId;
7,048,316✔
1913
      tstrncpy(dstOrgTbInfo.tbName, pSrcOrgTbInfo->tbName, TSDB_TABLE_FNAME_LEN);
7,048,316✔
1914

1915
      dstOrgTbInfo.colMap = taosArrayDup(pSrcOrgTbInfo->colMap, NULL);
7,048,316✔
1916
      QUERY_CHECK_NULL(dstOrgTbInfo.colMap, code, lino, _return, terrno);
7,048,316✔
1917

1918
      needFree = true;
7,048,316✔
1919
      QUERY_CHECK_NULL(taosArrayPush(pDataInfo->batchOrgTbInfo, &dstOrgTbInfo), code, lino, _return, terrno);
14,096,632✔
1920
      needFree = false;
7,048,316✔
1921
    }
1922
  } else {
1923
    pBasicParam->batchOrgTbInfo = NULL;
×
1924
  }
1925

1926
  return code;
4,262,052✔
1927
_return:
×
1928
  if (needFree) {
×
1929
    taosArrayDestroy(dstOrgTbInfo.colMap);
×
1930
  }
1931
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1932
  return code;
×
1933
}
1934

1935
int32_t addSingleExchangeSource(SOperatorInfo* pOperator,
32,383,891✔
1936
                                SExchangeOperatorBasicParam* pBasicParam) {
1937
  if (pBasicParam->paramType != DYN_TYPE_EXCHANGE_PARAM) {
32,383,891✔
1938
    qWarn("%s, %s found invalid exchange operator param type %d",
×
1939
      GET_TASKID(pOperator->pTaskInfo), __func__, pBasicParam->paramType);
1940
    return TSDB_CODE_SUCCESS;
×
1941
  }
1942

1943
  int32_t            code = TSDB_CODE_SUCCESS;
32,383,891✔
1944
  int32_t            lino = 0;
32,383,891✔
1945
  SExchangeInfo*     pExchangeInfo = pOperator->info;
32,383,891✔
1946
  SExchangeSrcIndex* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
32,383,891✔
1947

1948
  if (NULL == pIdx) {
32,383,891✔
1949
    if (pBasicParam->isNewDeployed) {
2,321✔
1950
      SDownstreamSourceNode *pNode = NULL;
2,321✔
1951
      code = nodesCloneNode((SNode*)&pBasicParam->newDeployedSrc, (SNode**)&pNode);
2,321✔
1952
      QUERY_CHECK_CODE(code, lino, _return);
2,321✔
1953

1954
      SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pOperator->pPhyNode;
2,321✔
1955
      code = nodesListMakeStrictAppend(&pExchange->pSrcEndPoints, (SNode*)pNode);
2,321✔
1956
      QUERY_CHECK_CODE(code, lino, _return);
2,321✔
1957

1958
      void* tmp = taosArrayPush(pExchangeInfo->pSources, pNode);
2,321✔
1959
      QUERY_CHECK_NULL(tmp, code, lino, _return, terrno);
2,321✔
1960

1961
      SExchangeSrcIndex idx = {.srcIdx = taosArrayGetSize(pExchangeInfo->pSources) - 1, .inUseIdx = -1};
2,321✔
1962
      code = tSimpleHashPut(pExchangeInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
2,321✔
1963
      if (pExchangeInfo->pHashSources) {
2,321✔
1964
        QUERY_CHECK_CODE(code, lino, _return);
2,321✔
1965
      }
1966
      pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
2,321✔
1967
      QUERY_CHECK_NULL(pIdx, code, lino, _return, TSDB_CODE_INVALID_PARA);
2,321✔
1968
    } else {
1969
      qError("No exchange source for vgId: %d", pBasicParam->vgId);
×
1970
      return TSDB_CODE_INVALID_PARA;
×
1971
    }
1972
  }
1973

1974
  qDebug("start to add single exchange source");
32,383,891✔
1975

1976
  switch (pBasicParam->type) {
32,383,891✔
1977
    case EX_SRC_TYPE_VSTB_WIN_SCAN:
4,262,052✔
1978
    case EX_SRC_TYPE_VSTB_AGG_SCAN: {
1979
      if (pIdx->inUseIdx < 0) {
4,262,052✔
1980
        SSourceDataInfo dataInfo = {0};
2,066,128✔
1981
        dataInfo.status = EX_SOURCE_DATA_NOT_READY;
2,066,128✔
1982
        dataInfo.taskId = pExchangeInfo->pTaskId;
2,066,128✔
1983
        dataInfo.index = pIdx->srcIdx;
2,066,128✔
1984
        dataInfo.groupid = pBasicParam->groupid;
2,066,128✔
1985
        dataInfo.window = pBasicParam->window;
2,066,128✔
1986
        dataInfo.isNewParam = pBasicParam->isNewParam;
2,066,128✔
1987
        code = loadTagListFromBasicParam(&dataInfo, pBasicParam);
2,066,128✔
1988
        QUERY_CHECK_CODE(code, lino, _return);
2,066,128✔
1989

1990
        code = loadBatchColMapFromBasicParam(&dataInfo, pBasicParam);
2,066,128✔
1991
        QUERY_CHECK_CODE(code, lino, _return);
2,066,128✔
1992

1993
        dataInfo.orgTbInfo = NULL;
2,066,128✔
1994

1995
        dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
2,066,128✔
1996
        QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
2,066,128✔
1997

1998
        dataInfo.type = pBasicParam->type;
2,066,128✔
1999
        dataInfo.srcOpType = pBasicParam->srcOpType;
2,066,128✔
2000
        dataInfo.tableSeq = pBasicParam->tableSeq;
2,066,128✔
2001

2002
        QUERY_CHECK_NULL(taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
4,132,256✔
2003

2004
        pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
2,066,128✔
2005
      } else {
2006
        SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
2,195,924✔
2007
        QUERY_CHECK_NULL(pDataInfo, code, lino, _return, terrno);
2,195,924✔
2008

2009
        if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
2,195,924✔
2010
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
2,195,924✔
2011
        }
2012

2013
        pDataInfo->taskId = pExchangeInfo->pTaskId;
2,195,924✔
2014
        pDataInfo->index = pIdx->srcIdx;
2,195,924✔
2015
        pDataInfo->window = pBasicParam->window;
2,195,924✔
2016
        pDataInfo->groupid = pBasicParam->groupid;
2,195,924✔
2017
        pDataInfo->isNewParam = pBasicParam->isNewParam;
2,195,924✔
2018

2019
        code = loadTagListFromBasicParam(pDataInfo, pBasicParam);
2,195,924✔
2020
        QUERY_CHECK_CODE(code, lino, _return);
2,195,924✔
2021

2022
        code = loadBatchColMapFromBasicParam(pDataInfo, pBasicParam);
2,195,924✔
2023
        QUERY_CHECK_CODE(code, lino, _return);
2,195,924✔
2024

2025
        pDataInfo->orgTbInfo = NULL;
2,195,924✔
2026

2027
        pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
2,195,924✔
2028
        QUERY_CHECK_NULL(pDataInfo->pSrcUidList, code, lino, _return, terrno);
2,195,924✔
2029

2030
        pDataInfo->type = pBasicParam->type;
2,195,924✔
2031
        pDataInfo->srcOpType = pBasicParam->srcOpType;
2,195,924✔
2032
        pDataInfo->tableSeq = pBasicParam->tableSeq;
2,195,924✔
2033
      }
2034
      break;
4,262,052✔
2035
    }
2036
    case EX_SRC_TYPE_VTB_WIN_SCAN:
4,323,525✔
2037
    case EX_SRC_TYPE_VSTB_TAG_SCAN: {
2038
      SSourceDataInfo dataInfo = {0};
4,323,525✔
2039
      dataInfo.status = EX_SOURCE_DATA_NOT_READY;
4,323,525✔
2040
      dataInfo.taskId = pExchangeInfo->pTaskId;
4,323,525✔
2041
      dataInfo.index = pIdx->srcIdx;
4,323,525✔
2042
      dataInfo.window = pBasicParam->window;
4,323,525✔
2043
      dataInfo.groupid = 0;
4,323,525✔
2044
      dataInfo.orgTbInfo = NULL;
4,323,525✔
2045
      dataInfo.tagList = NULL;
4,323,525✔
2046

2047
      dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
4,323,525✔
2048
      QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
4,323,525✔
2049

2050
      dataInfo.isNewParam = false;
4,323,525✔
2051
      dataInfo.type = pBasicParam->type;
4,323,525✔
2052
      dataInfo.srcOpType = pBasicParam->srcOpType;
4,323,525✔
2053
      dataInfo.tableSeq = pBasicParam->tableSeq;
4,323,525✔
2054

2055
      taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
4,323,525✔
2056
      QUERY_CHECK_NULL(taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
8,647,050✔
2057
      break;
4,323,525✔
2058
    }
2059
    case EX_SRC_TYPE_VSTB_SCAN: {
23,566,913✔
2060
      SSourceDataInfo dataInfo = {0};
23,566,913✔
2061
      dataInfo.status = EX_SOURCE_DATA_NOT_READY;
23,566,913✔
2062
      dataInfo.taskId = pExchangeInfo->pTaskId;
23,566,913✔
2063
      dataInfo.index = pIdx->srcIdx;
23,566,700✔
2064
      dataInfo.window = pBasicParam->window;
23,566,913✔
2065
      dataInfo.groupid = 0;
23,566,913✔
2066
      dataInfo.isNewParam = pBasicParam->isNewParam;
23,566,913✔
2067
      dataInfo.tagList = NULL;
23,566,700✔
2068
      dataInfo.orgTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
23,566,700✔
2069
      QUERY_CHECK_NULL(dataInfo.orgTbInfo, code, lino, _return, terrno);
23,566,913✔
2070
      dataInfo.orgTbInfo->vgId = pBasicParam->orgTbInfo->vgId;
23,566,913✔
2071
      tstrncpy(dataInfo.orgTbInfo->tbName, pBasicParam->orgTbInfo->tbName, TSDB_TABLE_FNAME_LEN);
23,566,913✔
2072
      dataInfo.orgTbInfo->colMap = taosArrayDup(pBasicParam->orgTbInfo->colMap, NULL);
23,566,700✔
2073
      QUERY_CHECK_NULL(dataInfo.orgTbInfo->colMap, code, lino, _return, terrno);
23,566,913✔
2074

2075
      dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
23,566,700✔
2076
      QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
23,566,913✔
2077

2078
      dataInfo.type = pBasicParam->type;
23,566,913✔
2079
      dataInfo.srcOpType = pBasicParam->srcOpType;
23,566,913✔
2080
      dataInfo.tableSeq = pBasicParam->tableSeq;
23,566,913✔
2081

2082
      taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
23,566,913✔
2083
      QUERY_CHECK_NULL( taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
47,133,826✔
2084
      break;
23,566,913✔
2085
    }
2086
    case EX_SRC_TYPE_STB_JOIN_SCAN:
231,401✔
2087
    default: {
2088
      if (pIdx->inUseIdx < 0) {
231,401✔
2089
        SSourceDataInfo dataInfo = {0};
229,205✔
2090
        dataInfo.status = EX_SOURCE_DATA_NOT_READY;
229,205✔
2091
        dataInfo.taskId = pExchangeInfo->pTaskId;
229,205✔
2092
        dataInfo.index = pIdx->srcIdx;
229,205✔
2093
        dataInfo.groupid = 0;
229,205✔
2094
        dataInfo.tagList = NULL;
229,205✔
2095

2096
        dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
229,205✔
2097
        QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
229,205✔
2098

2099
        dataInfo.isNewParam = false;
229,205✔
2100
        dataInfo.type = pBasicParam->type;
229,205✔
2101
        dataInfo.srcOpType = pBasicParam->srcOpType;
229,205✔
2102
        dataInfo.tableSeq = pBasicParam->tableSeq;
229,205✔
2103

2104
        QUERY_CHECK_NULL(taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
458,410✔
2105

2106
        pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
229,205✔
2107
      } else {
2108
        SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
2,196✔
2109
        QUERY_CHECK_NULL(pDataInfo, code, lino, _return, terrno);
2,196✔
2110
        if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
2,196✔
2111
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
2,196✔
2112
        }
2113

2114
        pDataInfo->tagList = NULL;
2,196✔
2115
        pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
2,196✔
2116
        QUERY_CHECK_NULL(pDataInfo->pSrcUidList, code, lino, _return, terrno);
2,196✔
2117

2118
        pDataInfo->groupid = 0;
2,196✔
2119
        pDataInfo->isNewParam = false;
2,196✔
2120
        pDataInfo->type = pBasicParam->type;
2,196✔
2121
        pDataInfo->srcOpType = pBasicParam->srcOpType;
2,196✔
2122
        pDataInfo->tableSeq = pBasicParam->tableSeq;
2,196✔
2123
      }
2124
      break;
231,401✔
2125
    }
2126
  }
2127

2128
  return code;
32,383,891✔
2129
_return:
×
2130
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2131
  return code;
×
2132
}
2133

2134
int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
31,907,739✔
2135
  SExchangeInfo*               pExchangeInfo = pOperator->info;
31,907,739✔
2136
  int32_t                      code = TSDB_CODE_SUCCESS;
31,907,526✔
2137
  SExchangeOperatorBasicParam* pBasicParam = NULL;
31,907,526✔
2138
  SExchangeOperatorParam*      pParam = (SExchangeOperatorParam*)pOperator->pOperatorGetParam->value;
31,907,526✔
2139
  if (pParam->multiParams) {
31,907,526✔
2140
    SExchangeOperatorBatchParam* pBatch = (SExchangeOperatorBatchParam*)pOperator->pOperatorGetParam->value;
4,012,909✔
2141
    int32_t                      iter = 0;
4,012,909✔
2142
    while (NULL != (pBasicParam = tSimpleHashIterate(pBatch->pBatchs, pBasicParam, &iter))) {
8,501,970✔
2143
      code = addSingleExchangeSource(pOperator, pBasicParam);
4,489,061✔
2144
      if (code) {
4,489,061✔
2145
        return code;
×
2146
      }
2147
    }
2148
  } else {
2149
    pBasicParam = &pParam->basic;
27,894,617✔
2150
    code = addSingleExchangeSource(pOperator, pBasicParam);
27,894,830✔
2151
  }
2152

2153
  freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
31,907,526✔
2154
  pOperator->pOperatorGetParam = NULL;
31,907,739✔
2155

2156
  return code;
31,907,739✔
2157
}
2158

2159
int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
638,044,879✔
2160
  SExchangeInfo* pExchangeInfo = pOperator->info;
638,044,879✔
2161
  int32_t        code = TSDB_CODE_SUCCESS;
638,047,976✔
2162
  int32_t        lino = 0;
638,047,976✔
2163
  
2164
  if ((OPTR_IS_OPENED(pOperator) && !pExchangeInfo->dynamicOp &&
638,047,976✔
2165
       NULL == pOperator->pOperatorGetParam) ||
470,154,523✔
2166
      (pExchangeInfo->dynamicOp && NULL == pOperator->pOperatorGetParam)) {
171,994,747✔
2167
    qDebug("%s, skip prepare, opened:%d, dynamicOp:%d, getParam:%p",
472,483,678✔
2168
      GET_TASKID(pOperator->pTaskInfo), OPTR_IS_OPENED(pOperator),
2169
      pExchangeInfo->dynamicOp, pOperator->pOperatorGetParam);
2170
    return TSDB_CODE_SUCCESS;
472,484,259✔
2171
  }
2172

2173
  if (pExchangeInfo->dynamicOp) {
165,559,933✔
2174
    code = addDynamicExchangeSource(pOperator);
31,907,739✔
2175
    QUERY_CHECK_CODE(code, lino, _end);
31,907,739✔
2176
  }
2177

2178
  if (pOperator->status == OP_NOT_OPENED &&
165,560,640✔
2179
      (pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData) ||
155,774,925✔
2180
      IS_STREAM_MODE(pOperator->pTaskInfo)) {
141,237,671✔
2181
    pExchangeInfo->current = 0;
30,354,692✔
2182
  }
2183

2184
  if (NULL != pOperator->pOperatorGetParam) {
165,560,495✔
2185
    storeNotifyInfo(pOperator);
4,110,748✔
2186
    /**
2187
      The param is referenced by getParam, and it will be freed by
2188
      the parent operator after getting next block.
2189
    */
2190
    pOperator->pOperatorGetParam->reUse = false;
4,110,748✔
2191
    pOperator->pOperatorGetParam = NULL;
4,110,748✔
2192
  }
2193

2194
  int64_t st = taosGetTimestampUs();
165,558,530✔
2195

2196
  if (!IS_STREAM_MODE(pOperator->pTaskInfo) && !pExchangeInfo->seqLoadData) {
165,558,530✔
2197
    code = prepareConcurrentlyLoad(pOperator);
132,580,536✔
2198
    QUERY_CHECK_CODE(code, lino, _end);
132,585,377✔
2199
    pExchangeInfo->openedTs = taosGetTimestampUs();
132,585,377✔
2200
  }
2201

2202
  OPTR_SET_OPENED(pOperator);
165,563,142✔
2203
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
165,562,272✔
2204

2205
  qDebug("%s prepare load complete", pOperator->pTaskInfo->id.str);
165,560,870✔
2206

2207
_end:
85,304,968✔
2208
  if (code != TSDB_CODE_SUCCESS) {
165,560,858✔
2209
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2210
    pOperator->pTaskInfo->code = code;
×
2211
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
2212
  }
2213
  return code;
165,560,858✔
2214
}
2215

2216
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
3,937,997✔
2217
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
3,937,997✔
2218

2219
  if (pLimitInfo->remainGroupOffset > 0) {
3,937,997✔
2220
    if (pLimitInfo->currentGroupId == 0) {  // it is the first group
×
2221
      pLimitInfo->currentGroupId = pBlock->info.id.groupId;
×
2222
      blockDataCleanup(pBlock);
×
2223
      return PROJECT_RETRIEVE_CONTINUE;
×
2224
    } else if (pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
×
2225
      // now it is the data from a new group
2226
      pLimitInfo->remainGroupOffset -= 1;
×
2227

2228
      // ignore data block in current group
2229
      if (pLimitInfo->remainGroupOffset > 0) {
×
2230
        blockDataCleanup(pBlock);
×
2231
        return PROJECT_RETRIEVE_CONTINUE;
×
2232
      }
2233
    }
2234

2235
    // set current group id of the project operator
2236
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
×
2237
  }
2238

2239
  // here check for a new group data, we need to handle the data of the previous group.
2240
  if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
3,937,997✔
2241
    pLimitInfo->numOfOutputGroups += 1;
160,732✔
2242
    if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
160,732✔
2243
      pOperator->status = OP_EXEC_DONE;
×
2244
      blockDataCleanup(pBlock);
×
2245

2246
      return PROJECT_RETRIEVE_DONE;
×
2247
    }
2248

2249
    // reset the value for a new group data
2250
    resetLimitInfoForNextGroup(pLimitInfo);
160,732✔
2251
    // existing rows that belongs to previous group.
2252
    if (pBlock->info.rows > 0) {
160,732✔
2253
      return PROJECT_RETRIEVE_DONE;
160,732✔
2254
    }
2255
  }
2256

2257
  // here we reach the start position, according to the limit/offset requirements.
2258

2259
  // set current group id
2260
  pLimitInfo->currentGroupId = pBlock->info.id.groupId;
3,777,265✔
2261

2262
  bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pTaskInfo);
3,777,265✔
2263
  if (pBlock->info.rows == 0) {
3,777,265✔
2264
    return PROJECT_RETRIEVE_CONTINUE;
1,902,519✔
2265
  } else {
2266
    if (limitReached && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
1,874,746✔
2267
      setOperatorCompleted(pOperator);
×
2268
      return PROJECT_RETRIEVE_DONE;
×
2269
    }
2270
  }
2271

2272
  // todo optimize performance
2273
  // If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the
2274
  // they may not belong to the same group the limit/offset value is not valid in this case.
2275
  if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || hasSlimitOffsetInfo(pLimitInfo)) {
1,874,746✔
2276
    return PROJECT_RETRIEVE_DONE;
1,874,746✔
2277
  } else {  // not full enough, continue to accumulate the output data in the buffer.
2278
    return PROJECT_RETRIEVE_CONTINUE;
×
2279
  }
2280
}
2281

2282
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo) {
257,273,115✔
2283
  SExecTaskInfo* pTask = pOperator->pTaskInfo;
257,273,115✔
2284
  int32_t        code = TSDB_CODE_SUCCESS;
257,274,858✔
2285
  if (pTask->pWorkerCb) {
257,274,858✔
2286
    code = pTask->pWorkerCb->beforeBlocking(pTask->pWorkerCb->pPool);
257,274,393✔
2287
    if (code != TSDB_CODE_SUCCESS) {
257,275,323✔
2288
      pTask->code = code;
×
2289
      return pTask->code;
×
2290
    }
2291
  }
2292

2293
  code = tsem_wait(&pExchangeInfo->ready);
257,276,253✔
2294
  if (code != TSDB_CODE_SUCCESS) {
257,273,715✔
2295
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2296
    pTask->code = code;
×
2297
    return pTask->code;
×
2298
  }
2299

2300
  if (pTask->pWorkerCb) {
257,273,715✔
2301
    code = pTask->pWorkerCb->afterRecoverFromBlocking(pTask->pWorkerCb->pPool);
257,275,323✔
2302
    if (code != TSDB_CODE_SUCCESS) {
257,276,001✔
2303
      pTask->code = code;
×
2304
      return pTask->code;
×
2305
    }
2306
  }
2307
  return TSDB_CODE_SUCCESS;
257,275,323✔
2308
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc