• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4965

09 Feb 2026 01:16AM UTC coverage: 66.884% (-0.02%) from 66.908%
#4965

push

travis-ci

web-flow
docs: add support for recording STMT to CSV files (#34276)

* docs: add support for recording STMT to CSV files

* docs: update version for STMT recording feature in CSV files

205798 of 307696 relevant lines covered (66.88%)

126382200.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.24
/source/libs/executor/src/exchangeoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "operator.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "tdatablock.h"
23
#include "thash.h"
24
#include "tmsg.h"
25
#include "tref.h"
26
#include "trpc.h"
27

28
/*
 * Small value bundle identifying one fetch request.
 * NOTE(review): presumably passed as the `param` of loadRemoteDataCallback;
 * the code that fills and reads it is not visible here - confirm.
 */
typedef struct SFetchRspHandleWrapper {
  int64_t exchangeId;   // NOTE(review): looks like the exchange ref id (SExchangeInfo.self) - confirm
  int32_t sourceIndex;  // NOTE(review): presumably the index of the source the request was sent to
  int64_t seqId;        // NOTE(review): presumably the exchange seqId at send time
} SFetchRspHandleWrapper;
33

34
typedef struct SSourceDataInfo {
35
  int32_t             index;
36
  int64_t             seqId;
37
  SRWLatch            lock;
38
  SRetrieveTableRsp*  pRsp;
39
  uint64_t            totalRows;
40
  int64_t             startTime;
41
  int32_t             code;
42
  EX_SOURCE_STATUS    status;
43
  const char*         taskId;
44
  SArray*             pSrcUidList;
45
  int32_t             srcOpType;
46
  bool                tableSeq;
47
  char*               decompBuf;
48
  int32_t             decompBufSize;
49
  SOrgTbInfo*         orgTbInfo;
50
  SArray*             batchOrgTbInfo; // SArray<SOrgTbInfo>
51
  SArray*             tagList;
52
  EExchangeSourceType type;
53
  bool                isNewParam;
54
  STimeWindow         window;
55
  uint64_t            groupid;
56
  bool                fetchSent; // need reset
57
} SSourceDataInfo;
58

59
static void destroyExchangeOperatorInfo(void* param);
60
static void freeBlock(void* pParam);
61
static void freeSourceDataInfo(void* param);
62
static void setAllSourcesCompleted(SOperatorInfo* pOperator);
63

64
static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code);
65
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex);
66
static int32_t getCompletedSources(const SArray* pArray, int32_t* pRes);
67
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator);
68
static void    storeNotifyInfo(SOperatorInfo* pOperator);
69
static int32_t seqLoadRemoteData(SOperatorInfo* pOperator);
70
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator);
71
static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock,
72
                                 bool holdDataInBuf);
73
static int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo);
74

75
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo);
76

77
/*
 * Source-type predicates. Each one reports whether pDataInfo->type equals
 * the EExchangeSourceType value named in the function.
 */
static bool isVstbScan(SSourceDataInfo* pDataInfo) {
  return EX_SRC_TYPE_VSTB_SCAN == pDataInfo->type;
}

static bool isVstbWinScan(SSourceDataInfo* pDataInfo) {
  return EX_SRC_TYPE_VSTB_WIN_SCAN == pDataInfo->type;
}

static bool isVstbAggScan(SSourceDataInfo* pDataInfo) {
  return EX_SRC_TYPE_VSTB_AGG_SCAN == pDataInfo->type;
}

static bool isVstbTagScan(SSourceDataInfo* pDataInfo) {
  return EX_SRC_TYPE_VSTB_TAG_SCAN == pDataInfo->type;
}

static bool isStbJoinScan(SSourceDataInfo* pDataInfo) {
  return EX_SRC_TYPE_STB_JOIN_SCAN == pDataInfo->type;
}
×
82

83

84
static void streamSequenciallyLoadRemoteData(SOperatorInfo* pOperator,
9,825,236✔
85
                                             SExchangeInfo* pExchangeInfo,
86
                                             SExecTaskInfo* pTaskInfo) {
87
  int32_t code = 0;
9,825,236✔
88
  int32_t lino = 0;
9,825,236✔
89
  int64_t startTs = taosGetTimestampUs();  
9,826,389✔
90
  int32_t  totalSources = (int32_t)taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
9,826,389✔
91
  int32_t completed = 0;
9,826,161✔
92
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
9,826,161✔
93
  if (code != TSDB_CODE_SUCCESS) {
9,826,389✔
94
    pTaskInfo->code = code;
×
95
    T_LONG_JMP(pTaskInfo->env, code);
×
96
  }
97
  if (completed == totalSources) {
9,826,389✔
98
    qDebug("%s no load since all sources completed, completed:%d, totalSources:%d", pTaskInfo->id.str, completed, totalSources);
1,869,247✔
99
    setAllSourcesCompleted(pOperator);
1,869,247✔
100
    return;
1,871,233✔
101
  }
102

103
  SSourceDataInfo* pDataInfo = NULL;
7,957,142✔
104

105
  while (1) {
4,643,436✔
106
    if (pExchangeInfo->current < 0) {
12,600,578✔
107
      qDebug("current %d and all sources complted, totalSources:%d", pExchangeInfo->current, totalSources);
130,208✔
108
      setAllSourcesCompleted(pOperator);
130,208✔
109
      return;
130,208✔
110
    }
111
    
112
    if (pExchangeInfo->current >= totalSources) {
12,469,917✔
113
      completed = 0;
5,680,949✔
114
      code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
5,680,949✔
115
      if (code != TSDB_CODE_SUCCESS) {
5,680,949✔
116
        pTaskInfo->code = code;
×
117
        T_LONG_JMP(pTaskInfo->env, code);
×
118
      }
119
      if (completed == totalSources) {
5,680,949✔
120
        qDebug("stop to load since all sources complted, completed:%d, totalSources:%d", completed, totalSources);
3,973,779✔
121
        setAllSourcesCompleted(pOperator);
3,973,779✔
122
        return;
3,973,779✔
123
      }
124
      
125
      pExchangeInfo->current = 0;
1,707,170✔
126
    }
127

128
    qDebug("%s start stream exchange %p idx:%d fetch", GET_TASKID(pTaskInfo), pExchangeInfo, pExchangeInfo->current);
8,496,591✔
129

130
    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
8,496,591✔
131
    if (!pDataInfo) {
8,496,366✔
132
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
133
      pTaskInfo->code = terrno;
×
134
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
135
    }
136

137
    if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
8,496,366✔
138
      pExchangeInfo->current++;
1,030✔
139
      continue;
1,030✔
140
    }
141

142
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
8,495,336✔
143

144
    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
8,495,336✔
145
    if (code != TSDB_CODE_SUCCESS) {
8,495,561✔
146
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
147
      pTaskInfo->code = code;
×
148
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
149
    }
150

151
    while (true) {
748✔
152
      code = exchangeWait(pOperator, pExchangeInfo);
8,496,309✔
153
      if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
8,495,038✔
154
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
155
      }
156

157
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
8,495,561✔
158
      if (pDataInfo->seqId != currSeqId) {
8,495,561✔
159
        qDebug("%s seq rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", 
748✔
160
            GET_TASKID(pTaskInfo), pDataInfo->seqId, pExchangeInfo, currSeqId);
161
        taosMemoryFreeClear(pDataInfo->pRsp);
748✔
162
        continue;
748✔
163
      }
164

165
      break;
8,494,065✔
166
    }
167

168
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
8,494,065✔
169
    if (!pSource) {
8,494,813✔
170
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
171
      pTaskInfo->code = terrno;
×
172
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
173
    }
174

175
    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
8,494,813✔
176
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
×
177
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
178
             tstrerror(pDataInfo->code));
179
      pTaskInfo->code = pDataInfo->code;
×
180
      T_LONG_JMP(pTaskInfo->env, code);
×
181
    }
182

183
    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
8,494,813✔
184
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
8,494,813✔
185

186
    if (pRsp->numOfRows == 0) {
8,494,813✔
187
      qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
4,642,406✔
188
             " execId:%d idx %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", pDataInfo,
189
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
190
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);
191

192
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
4,642,406✔
193
      if (isVstbScan(pDataInfo) || isVstbTagScan(pDataInfo)) {
4,642,406✔
194
        pExchangeInfo->current = -1;
130,208✔
195
      } else {
196
        pExchangeInfo->current += 1;
4,512,198✔
197
      }
198
      taosMemoryFreeClear(pDataInfo->pRsp);
4,642,406✔
199
      continue;
4,642,406✔
200
    }
201

202
    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
3,852,162✔
203
    TAOS_CHECK_EXIT(code);
3,852,407✔
204

205
    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
3,852,407✔
206
    if (pRsp->completed == 1) {
3,852,407✔
207
      qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
2,009,083✔
208
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%d", pDataInfo,
209
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
210
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
211
             pExchangeInfo->current + 1, totalSources);
212

213
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
2,009,083✔
214
      if (isVstbScan(pDataInfo)) {
2,009,083✔
215
        pExchangeInfo->current = -1;
×
216
        taosMemoryFreeClear(pDataInfo->pRsp);
×
217
        continue;
×
218
      }
219
    } else {
220
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d idx:%d numOfRows:%" PRId64
1,843,324✔
221
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
222
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
223
             pExchangeInfo->current, pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
224
    }
225

226
    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
3,852,407✔
227
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;
3,852,407✔
228

229
    pExchangeInfo->current++;
3,852,407✔
230

231
    taosMemoryFreeClear(pDataInfo->pRsp);
3,852,407✔
232
    return;
3,852,407✔
233
  }
234

235
_exit:
×
236

237
  if (code) {
×
238
    pTaskInfo->code = code;
×
239
    qError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
×
240
  }
241
}
242

243

244
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
214,014,529✔
245
                                           SExecTaskInfo* pTaskInfo) {
246
  int32_t code = 0;
214,014,529✔
247
  int32_t lino = 0;
214,014,529✔
248
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
214,014,529✔
249
  int32_t completed = 0;
214,010,668✔
250
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
214,012,597✔
251
  if (code != TSDB_CODE_SUCCESS) {
214,014,045✔
252
    pTaskInfo->code = code;
×
253
    T_LONG_JMP(pTaskInfo->env, code);
×
254
  }
255
  if (completed == totalSources) {
214,014,045✔
256
    setAllSourcesCompleted(pOperator);
70,806,883✔
257
    return;
70,808,001✔
258
  }
259

260
  SSourceDataInfo* pDataInfo = NULL;
143,207,162✔
261

262
  while (1) {
16,866,649✔
263
    qDebug("prepare wait for ready, %p, %s", pExchangeInfo, GET_TASKID(pTaskInfo));
160,073,811✔
264
    code = exchangeWait(pOperator, pExchangeInfo);
160,073,811✔
265

266
    if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
160,074,517✔
267
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
1,692✔
268
    }
269

270
    for (int32_t i = 0; i < totalSources; ++i) {
238,340,223✔
271
      pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
238,339,245✔
272
      QUERY_CHECK_NULL(pDataInfo, code, lino, _exit, terrno);
238,339,713✔
273
      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
238,339,713✔
274
        continue;
53,554,757✔
275
      }
276

277
      if (pDataInfo->status != EX_SOURCE_DATA_READY) {
184,785,950✔
278
        continue;
24,712,533✔
279
      }
280

281
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
160,073,417✔
282
      if (pDataInfo->seqId != currSeqId) {
160,072,550✔
283
        qDebug("concurrent rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pDataInfo->seqId, pExchangeInfo, currSeqId);
×
284
        taosMemoryFreeClear(pDataInfo->pRsp);
×
285
        break;
×
286
      }
287

288
      if (pDataInfo->code != TSDB_CODE_SUCCESS) {
160,072,933✔
289
        code = pDataInfo->code;
954✔
290
        TAOS_CHECK_EXIT(code);
954✔
291
      }
292

293
      tmemory_barrier();
160,071,979✔
294
      SRetrieveTableRsp*     pRsp = pDataInfo->pRsp;
160,071,979✔
295
      SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
160,071,979✔
296
      QUERY_CHECK_NULL(pSource, code, lino, _exit, terrno);
160,071,626✔
297

298
      // todo
299
      SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
160,071,626✔
300
      if (pRsp->numOfRows == 0) {
160,071,596✔
301
        if (NULL != pDataInfo->pSrcUidList && !isVstbScan(pDataInfo)) {
36,454,593✔
302
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
×
303
          code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
×
304
          if (code != TSDB_CODE_SUCCESS) {
×
305
            taosMemoryFreeClear(pDataInfo->pRsp);
×
306
            TAOS_CHECK_EXIT(code);
×
307
          }
308
        } else {
309
          pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
36,454,593✔
310
          qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
36,454,240✔
311
                 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", try next %d/%" PRIzu, pDataInfo,
312
                 GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
313
                 pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows, i + 1, totalSources);
314
          taosMemoryFreeClear(pDataInfo->pRsp);
36,454,240✔
315
        }
316
        break;
36,454,240✔
317
      }
318

319
      TAOS_CHECK_EXIT(doExtractResultBlocks(pExchangeInfo, pDataInfo));
123,617,870✔
320

321
      SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
123,617,517✔
322
      updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pDataInfo->startTime, pOperator);
123,617,870✔
323
      pDataInfo->totalRows += pRetrieveRsp->numOfRows;
123,617,870✔
324

325
      if (pRsp->completed == 1) {
123,617,359✔
326
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
112,417,426✔
327
        qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
112,416,562✔
328
               " execId:%d index:%d completed, blocks:%d, numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64
329
               ", totalRows:%" PRIu64 ", total:%.2f Kb, try next %d/%" PRIzu, pDataInfo,
330
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
331
               pRsp->numOfBlocks, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows,
332
               pLoadInfo->totalSize / 1024.0, i + 1, totalSources);
333
      } else {
334
        qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
11,200,444✔
335
               " execId:%d blocks:%d, numOfRows:%" PRId64 ", totalRows:%" PRIu64 ", total:%.2f Kb",
336
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
337
               pRsp->numOfBlocks, pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
338
      }
339

340
      taosMemoryFreeClear(pDataInfo->pRsp);
123,617,006✔
341

342
      if ((pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED || NULL != pDataInfo->pSrcUidList) && !isVstbScan(pDataInfo) && !isVstbTagScan(pDataInfo)) {
123,617,164✔
343
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
11,200,444✔
344
        code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
11,200,444✔
345
        if (code != TSDB_CODE_SUCCESS) {
11,200,444✔
346
          taosMemoryFreeClear(pDataInfo->pRsp);
×
347
          TAOS_CHECK_EXIT(code);
×
348
        }
349
      }
350
      
351
      return;
123,617,621✔
352
    }  // end loop
353

354
    int32_t complete1 = 0;
36,455,218✔
355
    code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &complete1);
36,454,240✔
356
    if (code != TSDB_CODE_SUCCESS) {
36,454,240✔
357
      pTaskInfo->code = code;
×
358
      T_LONG_JMP(pTaskInfo->env, code);
×
359
    }
360
    if (complete1 == totalSources) {
36,454,240✔
361
      qDebug("all sources are completed, %s", GET_TASKID(pTaskInfo));
19,587,591✔
362
      return;
19,587,238✔
363
    }
364
  }
365

366
_exit:
954✔
367

368
  if (code) {
954✔
369
    pTaskInfo->code = code;
954✔
370
    qError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
954✔
371
  }
372
}
373

374
/*
 * Produce the next data block of the exchange operator.
 *
 * A block already waiting in pResultBlockList is handed out first. Otherwise
 * one of the three loaders (sequential, stream, concurrent) is run and the
 * first block it produced, if any, is handed out. Every block returned is
 * also appended to pRecycledBlocks.
 *
 * @return the block, or NULL when the operator is done or nothing was loaded.
 *         Errors do not return: they long-jump through pTaskInfo->env.
 */
static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
  int32_t        code = TSDB_CODE_SUCCESS;
  SExchangeInfo* pExchangeInfo = pOperator->info;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  size_t               totalSources = taosArrayGetSize(pExchangeInfo->pSources);
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;

  if (OP_EXEC_DONE == pOperator->status) {
    qDebug("%s all %" PRIzu " source(s) are exhausted, total rows:%" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
           GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize,
           pLoadInfo->totalElapsed / 1000.0);
    return NULL;
  }

  // we have buffered retrieved datablock, return it directly
  SSDataBlock* pBuffered = NULL;
  if (taosArrayGetSize(pExchangeInfo->pResultBlockList) > 0) {
    pBuffered = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
    taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
  }

  if (pBuffered != NULL) {
    void* pushed = taosArrayPush(pExchangeInfo->pRecycledBlocks, &pBuffered);
    if (pushed == NULL) {
      code = terrno;
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      pTaskInfo->code = code;
      T_LONG_JMP(pTaskInfo->env, code);
    }
    return pBuffered;
  }

  // nothing buffered: pull more data with the loader matching the operator mode
  if (pExchangeInfo->seqLoadData) {
    code = seqLoadRemoteData(pOperator);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      pTaskInfo->code = code;
      T_LONG_JMP(pTaskInfo->env, code);
    }
  } else if (IS_STREAM_MODE(pOperator->pTaskInfo)) {
    streamSequenciallyLoadRemoteData(pOperator, pExchangeInfo, pTaskInfo);
  } else {
    concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
  }

  // the void loaders report failure through the task's code field
  if (TSDB_CODE_SUCCESS != pOperator->pTaskInfo->code) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(pOperator->pTaskInfo->code));
    T_LONG_JMP(pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  if (taosArrayGetSize(pExchangeInfo->pResultBlockList) == 0) {
    qDebug("empty resultBlockList");
    return NULL;
  }

  SSDataBlock* pLoaded = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
  taosArrayRemove(pExchangeInfo->pResultBlockList, 0);

  void* pushed = taosArrayPush(pExchangeInfo->pRecycledBlocks, &pLoaded);
  if (pushed == NULL) {
    code = terrno;
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  qDebug("block with rows:%" PRId64 " loaded", pLoaded->info.rows);
  return pLoaded;
}
442

443
/*
 * "Get next" entry of the exchange operator.
 *
 * Opens the operator through fpSet._openFn, then keeps pulling blocks from
 * doLoadRemoteDataImpl, running each through doFilter and, when limit/offset
 * is configured, through handleLimitOffset, until a block with rows can be
 * returned or no more data is available.
 *
 * @param pOperator  exchange operator
 * @param ppRes      out: next block, or NULL when there is none
 * @return TSDB_CODE_SUCCESS on the normal paths; a failing open or filter
 *         step is logged, stored in pTaskInfo->code and raised via T_LONG_JMP.
 */
static int32_t loadRemoteDataNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SExchangeInfo* pExchangeInfo = pOperator->info;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  qDebug("%s start to load from exchange %p", pTaskInfo->id.str, pExchangeInfo);

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);

  if (OP_EXEC_DONE == pOperator->status) {
    (*ppRes) = NULL;
    return code;
  }

  for (;;) {
    SSDataBlock* pBlock = doLoadRemoteDataImpl(pOperator);
    if (NULL == pBlock) {
      (*ppRes) = NULL;
      return code;
    }

    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    // the filter removed every row: fetch another block
    if (blockDataGetNumOfRows(pBlock) == 0) {
      qDebug("rows 0 block got, continue next load");
      continue;
    }

    SLimitInfo* pLimitInfo = &pExchangeInfo->limitInfo;
    if (!hasLimitOffsetInfo(pLimitInfo)) {
      (*ppRes) = pBlock;
      qDebug("block with rows %" PRId64 " returned in exechange", pBlock->info.rows);
      return code;
    }

    int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock, false);
    if (status == PROJECT_RETRIEVE_CONTINUE) {
      qDebug("limit retrieve continue");
      continue;
    }

    if (status == PROJECT_RETRIEVE_DONE) {
      if (pBlock->info.rows == 0) {
        setOperatorCompleted(pOperator);
        (*ppRes) = NULL;
      } else {
        (*ppRes) = pBlock;
      }
      return code;
    }
    // any other status: go round again, exactly as the nested form did
  }

_end:

  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  } else {
    qDebug("empty block returned in exchange");
  }

  (*ppRes) = NULL;
  return code;
}
510

511
/*
 * Allocate pInfo->pSourceDataInfo and, for non-dynamic operators, fill it
 * with one zero-initialised SSourceDataInfo per source (status NOT_READY,
 * index = position, taskId pointing at a private copy of `id`).
 *
 * Dynamic operators (pInfo->dynamicOp) only get the empty array.
 *
 * @param numOfSources  number of downstream sources
 * @param pInfo         exchange state to initialise
 * @param id            task id string, copied into pInfo->pTaskId
 * @return TSDB_CODE_SUCCESS, or the terrno of the failing allocation/push.
 *
 * On a push failure the array is destroyed here AND the field is reset to
 * NULL: the caller's error path runs doDestroyExchangeOperatorInfo(), which
 * destroys pSourceDataInfo again, so leaving the stale pointer behind would
 * free the array twice.
 */
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
  pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
  if (pInfo->pSourceDataInfo == NULL) {
    return terrno;
  }

  if (pInfo->dynamicOp) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t len = strlen(id) + 1;
  pInfo->pTaskId = taosMemoryCalloc(1, len);
  if (!pInfo->pTaskId) {
    return terrno;
  }
  tstrncpy(pInfo->pTaskId, id, len);

  for (int32_t i = 0; i < numOfSources; ++i) {
    SSourceDataInfo dataInfo = {0};
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
    dataInfo.taskId = pInfo->pTaskId;
    dataInfo.index = i;

    SSourceDataInfo* pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
    if (pDs == NULL) {
      // read terrno before the cleanup call gets a chance to overwrite it
      int32_t code = terrno;
      taosArrayDestroyEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
      pInfo->pSourceDataInfo = NULL;  // prevent a second destroy by the owner
      return code;
    }
    qDebug("init source data info %d, pDs:%p, status:%d", i, pDs, pDs->status);
  }

  return TSDB_CODE_SUCCESS;
}
542

543
/*
 * Build the per-operator containers of an exchange operator from its plan
 * node: the RPC handle array, the copy of the downstream source nodes, the
 * nodeId -> source-index hash (dynamic operators only), the limit info, the
 * ref-pool entry (pInfo->self) and finally the per-source data info.
 *
 * @param pExNode  physical plan node listing the source end points
 * @param pInfo    exchange state to fill; not released here on failure
 * @param id       task id string, used for logging and by initDataSource
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when the node has no
 *         sources, or the error code of the step that failed.
 */
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo, const char* id) {
  int32_t code = TSDB_CODE_SUCCESS;
  size_t  numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);

  if (numOfSources == 0) {
    qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
    return TSDB_CODE_INVALID_PARA;
  }

  pInfo->pFetchRpcHandles = taosArrayInit(numOfSources, sizeof(int64_t));
  if (!pInfo->pFetchRpcHandles) {
    return terrno;
  }
  void* ret = taosArrayReserve(pInfo->pFetchRpcHandles, numOfSources);
  if (!ret) {
    return terrno;
  }

  pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
  if (pInfo->pSources == NULL) {
    // capture terrno once so the value logged is the value returned
    code = terrno;
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    return code;
  }

  // the lookup hash is only built for dynamic operators
  if (pExNode->node.dynamicOp) {
    pInfo->pHashSources = tSimpleHashInit(numOfSources * 2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
    if (NULL == pInfo->pHashSources) {
      code = terrno;
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      return code;
    }
  }

  for (int32_t i = 0; i < (int32_t)numOfSources; ++i) {
    SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
    if (!pNode) {
      code = terrno;
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      return code;
    }
    void* tmp = taosArrayPush(pInfo->pSources, pNode);
    if (!tmp) {
      code = terrno;
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      return code;
    }

    // Without a hash there is nothing to register. The previous code still
    // called tSimpleHashPut with a NULL table for every source and then
    // discarded the result.
    if (pInfo->pHashSources != NULL) {
      SExchangeSrcIndex idx = {.srcIdx = i, .inUseIdx = -1};
      code = tSimpleHashPut(pInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        return code;
      }
    }
  }

  initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);

  int64_t refId = taosAddRef(exchangeObjRefPool, pInfo);
  if (refId < 0) {
    code = terrno;
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    return code;
  }
  pInfo->self = refId;

  return initDataSource(numOfSources, pInfo, id);
}
605

606
/*
 * Put an exchange operator back into its not-yet-opened state.
 *
 * Bumps the exchange seqId (responses tagged with the previous value are
 * ignored by the loaders), clears the load statistics and every per-source
 * entry, drops buffered and recycled blocks, resets the inUseIdx of every
 * hashed source to -1 and rebuilds the limit info from the plan node.
 *
 * @return always 0.
 */
int32_t resetExchangeOperState(SOperatorInfo* pOper) {
  SExchangePhysiNode* pPhynode = (SExchangePhysiNode*)pOper->pPhyNode;
  SExchangeInfo*      pInfo = pOper->info;

  qDebug("%s reset exchange op:%p info:%p", pOper->pTaskInfo->id.str, pOper, pInfo);

  (void)atomic_add_fetch_64(&pInfo->seqId, 1);
  pOper->status = OP_NOT_OPENED;
  pInfo->current = 0;

  pInfo->loadInfo.totalElapsed = 0;
  pInfo->loadInfo.totalRows = 0;
  pInfo->loadInfo.totalSize = 0;

  // each entry is cleared under its own write latch
  for (int32_t idx = 0; idx < taosArrayGetSize(pInfo->pSourceDataInfo); ++idx) {
    SSourceDataInfo* pSrc = taosArrayGet(pInfo->pSourceDataInfo, idx);

    taosWLockLatch(&pSrc->lock);
    taosMemoryFreeClear(pSrc->decompBuf);
    taosMemoryFreeClear(pSrc->pRsp);
    pSrc->totalRows = 0;
    pSrc->code = 0;
    pSrc->status = EX_SOURCE_DATA_NOT_READY;
    pSrc->fetchSent = false;
    taosWUnLockLatch(&pSrc->lock);
  }

  // dynamic operators drop the entries altogether
  if (pInfo->dynamicOp) {
    taosArrayClearEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
  }

  taosArrayClearEx(pInfo->pResultBlockList, freeBlock);
  taosArrayClearEx(pInfo->pRecycledBlocks, freeBlock);

  blockDataCleanup(pInfo->pDummyBlock);

  int32_t iterPos = 0;
  for (void* pEntry = tSimpleHashIterate(pInfo->pHashSources, NULL, &iterPos); pEntry != NULL;
       pEntry = tSimpleHashIterate(pInfo->pHashSources, pEntry, &iterPos)) {
    ((SExchangeSrcIndex*)pEntry)->inUseIdx = -1;
  }

  pInfo->limitInfo = (SLimitInfo){0};
  initLimitInfo(pPhynode->node.pLimit, pPhynode->node.pSlimit, &pInfo->limitInfo);

  return 0;
}
651

652
int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo,
99,838,153✔
653
                                   SOperatorInfo** pOptrInfo) {
654
  QRY_PARAM_CHECK(pOptrInfo);
99,838,153✔
655

656
  int32_t        code = 0;
99,839,144✔
657
  int32_t        lino = 0;
99,839,144✔
658
  SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
99,839,144✔
659
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
99,836,663✔
660
  if (pInfo == NULL || pOperator == NULL) {
99,833,631✔
661
    code = terrno;
×
662
    goto _error;
×
663
  }
664

665
  pOperator->pPhyNode = pExNode;
99,833,631✔
666
  pInfo->dynamicOp = pExNode->node.dynamicOp;
99,834,142✔
667
  code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo));
99,834,496✔
668
  QUERY_CHECK_CODE(code, lino, _error);
99,838,829✔
669

670
  code = tsem_init(&pInfo->ready, 0, 0);
99,838,829✔
671
  QUERY_CHECK_CODE(code, lino, _error);
99,838,949✔
672

673
  pInfo->pDummyBlock = createDataBlockFromDescNode(pExNode->node.pOutputDataBlockDesc);
99,838,949✔
674
  QUERY_CHECK_NULL(pInfo->pDummyBlock, code, lino, _error, terrno);
99,839,507✔
675

676
  pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES);
99,837,761✔
677
  QUERY_CHECK_NULL(pInfo->pResultBlockList, code, lino, _error, terrno);
99,838,432✔
678
  pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES);
99,838,785✔
679
  QUERY_CHECK_NULL(pInfo->pRecycledBlocks, code, lino, _error, terrno);
99,838,801✔
680

681
  SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self};
99,839,313✔
682
  code = qAppendTaskStopInfo(pTaskInfo, &stopInfo);
99,839,666✔
683
  QUERY_CHECK_CODE(code, lino, _error);
99,838,119✔
684

685
  pInfo->seqLoadData = pExNode->seqRecvData;
99,838,119✔
686
  pInfo->dynTbname = pExNode->dynTbname;
99,838,631✔
687
  if (pInfo->dynTbname) {
99,839,143✔
688
    pInfo->seqLoadData = true;
12,020✔
689
  }
690
  pInfo->pTransporter = pTransporter;
99,838,818✔
691

692
  setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo,
99,837,607✔
693
                  pTaskInfo);
694
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
99,837,761✔
695

696
  code = filterInitFromNode((SNode*)pExNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
99,837,261✔
697
                            pTaskInfo->pStreamRuntimeInfo);
99,838,421✔
698
  QUERY_CHECK_CODE(code, lino, _error);
99,836,886✔
699
  qTrace("%s exchange op:%p", __func__, pOperator);
99,836,886✔
700
  pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteDataNext, NULL, destroyExchangeOperatorInfo,
99,836,886✔
701
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
702
  setOperatorResetStateFn(pOperator, resetExchangeOperState);
99,835,719✔
703
  *pOptrInfo = pOperator;
99,837,266✔
704
  return TSDB_CODE_SUCCESS;
99,836,754✔
705

706
_error:
×
707
  if (code != TSDB_CODE_SUCCESS) {
×
708
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
709
    pTaskInfo->code = code;
×
710
  }
711
  if (pInfo != NULL) {
×
712
    doDestroyExchangeOperatorInfo(pInfo);
×
713
  }
714

715
  if (pOperator != NULL) {
×
716
    pOperator->info = NULL;
×
717
    destroyOperator(pOperator);
×
718
  }
719
  return code;
×
720
}
721

722
void destroyExchangeOperatorInfo(void* param) {
99,838,325✔
723
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
99,838,325✔
724
  int32_t        code = taosRemoveRef(exchangeObjRefPool, pExInfo->self);
99,838,325✔
725
  if (code != TSDB_CODE_SUCCESS) {
99,837,237✔
726
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
727
  }
728
}
99,837,237✔
729

730
void freeBlock(void* pParam) {
276,685,633✔
731
  SSDataBlock* pBlock = *(SSDataBlock**)pParam;
276,685,633✔
732
  blockDataDestroy(pBlock);
276,686,816✔
733
}
276,687,232✔
734

735
void freeSourceDataInfo(void* p) {
149,779,047✔
736
  SSourceDataInfo* pInfo = (SSourceDataInfo*)p;
149,779,047✔
737
  taosMemoryFreeClear(pInfo->decompBuf);
149,779,047✔
738
  taosMemoryFreeClear(pInfo->pRsp);
149,780,869✔
739

740
  pInfo->decompBufSize = 0;
149,780,404✔
741
}
149,781,362✔
742

743
void doDestroyExchangeOperatorInfo(void* param) {
99,838,325✔
744
  if (param == NULL) {
99,838,325✔
745
    return;
×
746
  }
747
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
99,838,325✔
748
  if (pExInfo->pFetchRpcHandles) {
99,838,325✔
749
    for (int32_t i = 0; i < pExInfo->pFetchRpcHandles->size; ++i) {
257,977,160✔
750
      int64_t* pRpcHandle = taosArrayGet(pExInfo->pFetchRpcHandles, i);
158,138,099✔
751
      if (*pRpcHandle > 0) {
158,138,452✔
752
        SDownstreamSourceNode* pSource = taosArrayGet(pExInfo->pSources, i);
6,776,030✔
753
        (void)asyncFreeConnById(pExInfo->pTransporter, *pRpcHandle);
6,776,030✔
754
      }
755
    }
756
    taosArrayDestroy(pExInfo->pFetchRpcHandles);
99,839,061✔
757
  }
758

759
  taosArrayDestroy(pExInfo->pSources);
99,839,540✔
760
  taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo);
99,837,666✔
761

762
  taosArrayDestroyEx(pExInfo->pResultBlockList, freeBlock);
99,836,306✔
763
  taosArrayDestroyEx(pExInfo->pRecycledBlocks, freeBlock);
99,836,823✔
764

765
  blockDataDestroy(pExInfo->pDummyBlock);
99,839,144✔
766
  tSimpleHashCleanup(pExInfo->pHashSources);
99,837,758✔
767

768
  int32_t code = tsem_destroy(&pExInfo->ready);
99,836,914✔
769
  if (code != TSDB_CODE_SUCCESS) {
99,837,226✔
770
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
771
  }
772
  taosMemoryFreeClear(pExInfo->pTaskId);
99,837,226✔
773

774
  taosMemoryFreeClear(param);
99,834,358✔
775
}
776

777
/*
 * Response callback for a fetch request sent on behalf of an exchange operator.
 *
 * param: SFetchRspHandleWrapper* carrying the exchange ref id, the source index and the sequence
 *        id the request was issued with.
 * pMsg:  response buffer. pMsg->pEpSet is always freed here. pMsg->pData is either stored as the
 *        source's pRsp (successful response) or freed (every other path).
 * code:  result code of the fetch.
 *
 * The exchange object is acquired from the reference pool for the duration of the callback and
 * released again on every path after a successful acquire. Responses whose sequence id no longer
 * matches the exchange's current one are dropped. Otherwise the source's slot is updated under its
 * write latch, marked EX_SOURCE_DATA_READY, and the exchange's `ready` semaphore is posted.
 *
 * Returns TSDB_CODE_SUCCESS when the response is dropped; terrno when the source slot cannot be
 * found; the tsem_post() error if posting failed; otherwise the result of releasing the reference.
 */
int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
  SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;

  taosMemoryFreeClear(pMsg->pEpSet);
  SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
  if (pExchangeInfo == NULL) {
    qWarn("failed to acquire exchange operator, since it may have been released, %p", pExchangeInfo);
    taosMemoryFree(pMsg->pData);
    return TSDB_CODE_SUCCESS;
  }

  int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
  if (pWrapper->seqId != currSeqId) {
    qDebug("rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pWrapper->seqId, pExchangeInfo, currSeqId);
    taosMemoryFree(pMsg->pData);
    code = taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    }
    return TSDB_CODE_SUCCESS;
  }

  int32_t index = pWrapper->sourceIndex;

  qDebug("%s exchange %p %dth source got rsp, code:%d, rsp:%p", pExchangeInfo->pTaskId, pExchangeInfo, index, code, pMsg->pData);

  int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, index);
  if (pRpcHandle != NULL) {
    int32_t ret = asyncFreeConnById(pExchangeInfo->pTransporter, *pRpcHandle);
    if (ret != 0) {
      qDebug("failed to free rpc handle, code:%s, %p", tstrerror(ret), pExchangeInfo);
    }
    *pRpcHandle = -1;
  }

  SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);
  if (!pSourceDataInfo) {
    // capture the lookup error first: the cleanup calls below may overwrite terrno
    int32_t lookupCode = terrno;
    taosMemoryFree(pMsg->pData);
    int32_t releaseCode = taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
    if (releaseCode != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(releaseCode));
    }
    return lookupCode;
  }

  if (0 == code && NULL == pMsg->pData) {
    qError("invalid rsp msg, msgType:%d, len:%d", pMsg->msgType, pMsg->len);
    code = TSDB_CODE_QRY_INVALID_MSG;
  }

  taosWLockLatch(&pSourceDataInfo->lock);
  if (code == TSDB_CODE_SUCCESS) {
    pSourceDataInfo->seqId = pWrapper->seqId;
    pSourceDataInfo->pRsp = pMsg->pData;

    // the header fields are byte-swapped in place before the response is published
    SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
    pRsp->numOfRows = htobe64(pRsp->numOfRows);
    pRsp->compLen = htonl(pRsp->compLen);
    pRsp->payloadLen = htonl(pRsp->payloadLen);
    pRsp->numOfCols = htonl(pRsp->numOfCols);
    pRsp->useconds = htobe64(pRsp->useconds);
    pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);

    qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%" PRId64 ", %p", pSourceDataInfo->taskId, index,
           pRsp->numOfBlocks, pRsp->numOfRows, pExchangeInfo);
  } else {
    taosMemoryFree(pMsg->pData);
    pSourceDataInfo->code = rpcCvtErrCode(code);
    if (pSourceDataInfo->code != code) {
      qError("%s fetch rsp received, index:%d, error:%s, cvted error: %s, %p", pSourceDataInfo->taskId, index,
             tstrerror(code), tstrerror(pSourceDataInfo->code), pExchangeInfo);
    } else {
      qError("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code),
             pExchangeInfo);
    }
  }

  // make the slot contents visible before the status flips to READY
  tmemory_barrier();
  pSourceDataInfo->status = EX_SOURCE_DATA_READY;
  taosWUnLockLatch(&pSourceDataInfo->lock);

  int32_t postCode = tsem_post(&pExchangeInfo->ready);
  if (postCode != TSDB_CODE_SUCCESS) {
    qError("failed to invoke post when fetch rsp is ready, code:%s, %p", tstrerror(postCode), pExchangeInfo);
  }

  // the reference is dropped even when the post failed, otherwise the exchange object would stay pinned
  code = taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
  }
  return (postCode != TSDB_CODE_SUCCESS) ? postCode : code;
}
865

866
/*
 * Builds a table-scan operator param of dynamic type DYN_TYPE_STB_JOIN.
 *
 * ppRes:     out; receives the new SOperatorParam, or NULL on failure.
 * pUidList:  uid list to scan; it is duplicated, the caller keeps its own copy.
 * srcOpType: stored as the param's opType.
 * tableSeq:  stored in the scan param.
 *
 * Both structures are zero-initialised so that fields not assigned here (e.g. groupid) have a
 * defined value. Returns TSDB_CODE_SUCCESS, or the terrno of the failed allocation/duplication.
 */
int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, bool tableSeq) {
  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }

  STableScanOperatorParam* pScan = taosMemoryCalloc(1, sizeof(STableScanOperatorParam));
  if (NULL == pScan) {
    int32_t code = terrno;  // read before cleanup, which may touch terrno
    taosMemoryFreeClear(*ppRes);
    return code;
  }

  pScan->paramType = DYN_TYPE_SCAN_PARAM;
  pScan->pUidList = taosArrayDup(pUidList, NULL);
  if (NULL == pScan->pUidList) {
    int32_t code = terrno;
    taosMemoryFree(pScan);
    taosMemoryFreeClear(*ppRes);
    return code;
  }
  pScan->dynType = DYN_TYPE_STB_JOIN;
  pScan->tableSeq = tableSeq;
  pScan->pOrgTbInfo = NULL;
  pScan->pBatchTbInfo = NULL;
  pScan->pTagList = NULL;
  pScan->isNewParam = false;
  // inverted range: skey > ekey
  pScan->window.skey = INT64_MAX;
  pScan->window.ekey = INT64_MIN;
  pScan->notifyToProcess = false;
  pScan->notifyTs = 0;

  (*ppRes)->opType = srcOpType;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->value = pScan;
  (*ppRes)->pChildren = NULL;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;
}
904

905
/*
 * Builds a table-scan operator param with an optional original-table mapping.
 *
 * ppRes:      out; receives the new SOperatorParam, or NULL on failure.
 * pUidList:   optional uid list; duplicated when non-NULL.
 * srcOpType:  stored as the param's opType.
 * pMap:       optional original-table info; vgId, table name and column map are copied.
 * tableSeq, isNewParam, type: stored in the scan param (type becomes dynType).
 * window:     time window copied into the scan param; must not be NULL.
 *
 * The scan param is zero-initialised so the error path never inspects indeterminate pointers.
 * Returns TSDB_CODE_SUCCESS or the code of the step that failed; on failure everything allocated
 * here is released.
 */
int32_t buildTableScanOperatorParamEx(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, SOrgTbInfo *pMap, bool tableSeq, STimeWindow *window, bool isNewParam, ETableScanDynType type) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  STableScanOperatorParam* pScan = NULL;

  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);

  pScan = taosMemoryCalloc(1, sizeof(STableScanOperatorParam));
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);

  pScan->paramType = DYN_TYPE_SCAN_PARAM;
  if (pUidList) {
    pScan->pUidList = taosArrayDup(pUidList, NULL);
    QUERY_CHECK_NULL(pScan->pUidList, code, lino, _return, terrno);
  } else {
    pScan->pUidList = NULL;
  }

  if (pMap) {
    pScan->pOrgTbInfo = taosMemoryCalloc(1, sizeof(SOrgTbInfo));
    QUERY_CHECK_NULL(pScan->pOrgTbInfo, code, lino, _return, terrno);

    pScan->pOrgTbInfo->vgId = pMap->vgId;
    tstrncpy(pScan->pOrgTbInfo->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);

    pScan->pOrgTbInfo->colMap = taosArrayDup(pMap->colMap, NULL);
    QUERY_CHECK_NULL(pScan->pOrgTbInfo->colMap, code, lino, _return, terrno);
  } else {
    pScan->pOrgTbInfo = NULL;
  }
  pScan->pTagList = NULL;
  pScan->pBatchTbInfo = NULL;

  pScan->dynType = type;
  pScan->tableSeq = tableSeq;
  pScan->window.skey = window->skey;
  pScan->window.ekey = window->ekey;
  pScan->isNewParam = isNewParam;
  pScan->notifyToProcess = false;
  pScan->notifyTs = 0;
  (*ppRes)->opType = srcOpType;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->value = pScan;
  (*ppRes)->pChildren = NULL;
  (*ppRes)->reUse = false;

  return code;

_return:
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
  taosMemoryFreeClear(*ppRes);
  if (pScan) {
    taosArrayDestroy(pScan->pUidList);
    if (pScan->pOrgTbInfo) {
      taosArrayDestroy(pScan->pOrgTbInfo->colMap);
      taosMemoryFreeClear(pScan->pOrgTbInfo);
    }
    taosMemoryFree(pScan);
  }
  return code;
}
967

968
/**
969
  @brief build the table scan operator param for notify message
970
*/
971
int32_t buildTableScanOperatorParamNotify(SOperatorParam** ppRes,
218,948✔
972
                                          int32_t srcOpType, TSKEY notifyTs) {
973
  if (srcOpType != 0 && srcOpType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
218,948✔
974
    qWarn("%s, invalid srcOpType:%d", __func__, srcOpType);
×
975
    return TSDB_CODE_INVALID_PARA;
×
976
  }
977
  int32_t code = TSDB_CODE_SUCCESS;
218,948✔
978
  int32_t lino = 0;
218,948✔
979
  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
218,948✔
980
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
218,948✔
981

982
  STableScanOperatorParam* pTsParam =
437,896✔
983
    taosMemoryCalloc(1, sizeof(STableScanOperatorParam));
218,948✔
984
  QUERY_CHECK_NULL(pTsParam, code, lino, _return, terrno);
218,948✔
985

986
  pTsParam->paramType = NOTIFY_TYPE_SCAN_PARAM;
218,948✔
987
  pTsParam->notifyToProcess = true;
218,948✔
988
  pTsParam->notifyTs = notifyTs;
218,948✔
989

990
  (*ppRes)->opType = srcOpType != 0 ? srcOpType :
218,948✔
991
                                      QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
992
  (*ppRes)->downstreamIdx = 0;
218,948✔
993
  (*ppRes)->value = pTsParam;
218,948✔
994
  (*ppRes)->pChildren = NULL;
218,948✔
995
  /* param is not reusable when it is transferred by message */
996
  (*ppRes)->reUse = false;
218,948✔
997

998
_return:
218,948✔
999
  if (TSDB_CODE_SUCCESS != code) {
218,948✔
1000
    qError("%s failed at %d, failed to build scan operator msg:%s",
×
1001
           __func__, lino, tstrerror(code));
1002
    taosMemoryFreeClear(*ppRes);
×
1003
    if (pTsParam) {
×
1004
      taosMemoryFree(pTsParam);
×
1005
    }
1006
  }
1007
  return code;
218,948✔
1008
}
1009

1010
/*
 * Builds a table-scan operator param of dynamic type DYN_TYPE_VSTB_BATCH_SCAN.
 *
 * ppRes:      out; receives the new SOperatorParam, or NULL on failure.
 * groupid:    stored in the scan param.
 * pUidList:   optional uid list; duplicated when non-NULL.
 * srcOpType:  stored as the param's opType.
 * pBatchMap:  optional array of SOrgTbInfo; every entry is copied, including its column map.
 * pTagList:   optional array of STagVal; every entry is copied, variable-length payloads included.
 * tableSeq, isNewParam: stored in the scan param.
 * window:     time window copied into the scan param; must not be NULL.
 *
 * The scan param is zero-initialised, and on failure every copy made so far (uid list, per-table
 * column maps, tag payloads) is released together with the param itself.
 * Returns TSDB_CODE_SUCCESS or the code of the step that failed.
 */
int32_t buildTableScanOperatorParamBatchInfo(SOperatorParam** ppRes, uint64_t groupid, SArray* pUidList, int32_t srcOpType, SArray *pBatchMap, SArray *pTagList, bool tableSeq, STimeWindow *window, bool isNewParam) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  STableScanOperatorParam* pScan = NULL;

  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);

  pScan = taosMemoryCalloc(1, sizeof(STableScanOperatorParam));
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);

  pScan->paramType = DYN_TYPE_SCAN_PARAM;
  pScan->groupid = groupid;
  if (pUidList) {
    pScan->pUidList = taosArrayDup(pUidList, NULL);
    QUERY_CHECK_NULL(pScan->pUidList, code, lino, _return, terrno);
  } else {
    pScan->pUidList = NULL;
  }
  pScan->pOrgTbInfo = NULL;

  if (pBatchMap) {
    pScan->pBatchTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
    QUERY_CHECK_NULL(pScan->pBatchTbInfo, code, lino, _return, terrno);

    int32_t numOfTables = (int32_t)taosArrayGetSize(pBatchMap);
    for (int32_t i = 0; i < numOfTables; i++) {
      SOrgTbInfo *pSrcInfo = taosArrayGet(pBatchMap, i);
      SOrgTbInfo batchInfo = {0};
      batchInfo.vgId = pSrcInfo->vgId;
      tstrncpy(batchInfo.tbName, pSrcInfo->tbName, TSDB_TABLE_FNAME_LEN);
      batchInfo.colMap = taosArrayDup(pSrcInfo->colMap, NULL);
      QUERY_CHECK_NULL(batchInfo.colMap, code, lino, _return, terrno);

      if (NULL == taosArrayPush(pScan->pBatchTbInfo, &batchInfo)) {
        code = terrno;
        lino = __LINE__;
        // not yet owned by the array, so the cleanup below would miss it
        taosArrayDestroy(batchInfo.colMap);
        goto _return;
      }
    }
  } else {
    pScan->pBatchTbInfo = NULL;
  }

  if (pTagList) {
    pScan->pTagList = taosArrayInit(1, sizeof(STagVal));
    QUERY_CHECK_NULL(pScan->pTagList, code, lino, _return, terrno);

    int32_t numOfTags = (int32_t)taosArrayGetSize(pTagList);
    for (int32_t i = 0; i < numOfTags; ++i) {
      STagVal *pSrcTag = (STagVal*)taosArrayGet(pTagList, i);
      STagVal  dstTag = {0};
      dstTag.type = pSrcTag->type;
      dstTag.cid = pSrcTag->cid;
      if (IS_VAR_DATA_TYPE(pSrcTag->type)) {
        dstTag.nData = pSrcTag->nData;
        dstTag.pData = taosMemoryMalloc(dstTag.nData);
        QUERY_CHECK_NULL(dstTag.pData, code, lino, _return, terrno);
        memcpy(dstTag.pData, pSrcTag->pData, dstTag.nData);
      } else {
        dstTag.i64 = pSrcTag->i64;
      }

      if (NULL == taosArrayPush(pScan->pTagList, &dstTag)) {
        code = terrno;
        lino = __LINE__;
        // the payload copy is not owned by the array yet
        if (IS_VAR_DATA_TYPE(dstTag.type)) {
          taosMemoryFree(dstTag.pData);
        }
        goto _return;
      }
    }
  } else {
    pScan->pTagList = NULL;
  }

  pScan->dynType = DYN_TYPE_VSTB_BATCH_SCAN;
  pScan->tableSeq = tableSeq;
  pScan->window.skey = window->skey;
  pScan->window.ekey = window->ekey;
  pScan->isNewParam = isNewParam;
  pScan->notifyToProcess = false;
  pScan->notifyTs = 0;
  (*ppRes)->opType = srcOpType;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->value = pScan;
  (*ppRes)->pChildren = NULL;
  (*ppRes)->reUse = false;

  return code;

_return:
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
  taosMemoryFreeClear(*ppRes);
  if (pScan) {
    taosArrayDestroy(pScan->pUidList);
    if (pScan->pBatchTbInfo) {
      // each stored entry owns a duplicated column map
      int32_t numOfCopied = (int32_t)taosArrayGetSize(pScan->pBatchTbInfo);
      for (int32_t i = 0; i < numOfCopied; ++i) {
        SOrgTbInfo* pCopied = taosArrayGet(pScan->pBatchTbInfo, i);
        if (pCopied) {
          taosArrayDestroy(pCopied->colMap);
        }
      }
      taosArrayDestroy(pScan->pBatchTbInfo);
    }
    if (pScan->pTagList) {
      taosArrayDestroyEx(pScan->pTagList, destroyTagVal);
    }
    taosMemoryFree(pScan);
  }
  return code;
}
1099

1100
/*
 * Builds the operator param sent to a virtual-super-table batch source.
 *
 * type selects the shape of the result:
 *   EX_SRC_TYPE_VSTB_AGG_SCAN - a QUERY_NODE_PHYSICAL_PLAN_HASH_AGG param with one child, the
 *                               batch table-scan param;
 *   EX_SRC_TYPE_VSTB_WIN_SCAN - the batch table-scan param itself;
 *   anything else             - TSDB_CODE_INVALID_PARA.
 * The remaining arguments are forwarded to buildTableScanOperatorParamBatchInfo() (srcOpType is
 * not used; the scan param is always built for QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN).
 *
 * ppRes is written only on success. On failure everything built so far is released.
 */
int32_t buildAggOperatorParam(SOperatorParam** ppRes, uint64_t groupid, SArray* pUidList, int32_t srcOpType, SArray *pBatchMap, SArray *pTagList, bool tableSeq, STimeWindow *window, bool isNewParam, EExchangeSourceType type) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  SOperatorParam*          pParam = NULL;

  switch (type) {
    case EX_SRC_TYPE_VSTB_AGG_SCAN: {
      pParam = taosMemoryCalloc(1, sizeof(SOperatorParam));
      QUERY_CHECK_NULL(pParam, code, lino, _return, terrno);

      // fill in the header first so the error path hands a fully defined param to freeOperatorParam()
      pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
      pParam->downstreamIdx = 0;
      pParam->value = NULL;
      pParam->reUse = false;

      pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
      QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno);

      SOperatorParam* pTableScanParam = NULL;
      code = buildTableScanOperatorParamBatchInfo(&pTableScanParam, groupid, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, pBatchMap, pTagList, tableSeq, window, isNewParam);
      QUERY_CHECK_CODE(code, lino, _return);

      if (NULL == taosArrayPush(pParam->pChildren, &pTableScanParam)) {
        code = terrno;
        lino = __LINE__;
        // the child never made it into pChildren, so release it separately
        freeOperatorParam(pTableScanParam, OP_GET_PARAM);
        goto _return;
      }

      break;
    }
    case EX_SRC_TYPE_VSTB_WIN_SCAN: {
      code = buildTableScanOperatorParamBatchInfo(&pParam, groupid, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, pBatchMap, pTagList, tableSeq, window, isNewParam);
      QUERY_CHECK_CODE(code, lino, _return);
      break;
    }
    default: {
      code = TSDB_CODE_INVALID_PARA;
      qError("%s failed at %d, invalid exchange source type:%d", __FUNCTION__, lino, type);
      goto _return;
    }
  }

  *ppRes = pParam;
  return code;

_return:
  freeOperatorParam(pParam, OP_GET_PARAM);
  qError("%s failed at %d, failed to build agg scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
  return code;
}
1146

1147
/*
 * Builds a tag-scan operator param whose vcUid is the first entry of pUidList.
 *
 * ppRes:     out; receives the new SOperatorParam, or NULL on failure.
 * pUidList:  array of tb_uid_t; only element 0 is read.
 * srcOpType: stored as the param's opType.
 *
 * Returns TSDB_CODE_SUCCESS, the terrno of a failed allocation, or an error when pUidList has no
 * first element (terrno if set, otherwise TSDB_CODE_INVALID_PARA).
 */
int32_t buildTagScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  STagScanOperatorParam*   pScan = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);

  pScan = taosMemoryMalloc(sizeof(STagScanOperatorParam));
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);

  tb_uid_t* pFirstUid = (tb_uid_t*)taosArrayGet(pUidList, 0);
  if (NULL == pFirstUid) {
    // never report success after the output has been torn down
    code = (TSDB_CODE_SUCCESS != terrno) ? terrno : TSDB_CODE_INVALID_PARA;
    lino = __LINE__;
    goto _return;
  }
  pScan->vcUid = *pFirstUid;

  (*ppRes)->opType = srcOpType;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->value = pScan;
  (*ppRes)->pChildren = NULL;
  (*ppRes)->reUse = false;

  return code;

_return:
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
  taosMemoryFreeClear(*ppRes);
  if (pScan) {
    taosMemoryFree(pScan);
  }
  return code;
}
1174

1175
/*
 * Fills pTimeRange with the calculation range of the trigger parameter at pRuntimeInfo->curIdx.
 *
 * Period triggers use [prevLocalTime, triggerTime]; every other trigger type, sliding included,
 * uses [wstart, wend]. Returns TSDB_CODE_INTERNAL_ERROR when an argument is NULL or no parameter
 * exists at curIdx.
 */
static int32_t getCurrentWinCalcTimeRange(SStreamRuntimeFuncInfo* pRuntimeInfo, STimeWindow* pTimeRange) {
  if (NULL == pRuntimeInfo || NULL == pTimeRange) {
    return TSDB_CODE_INTERNAL_ERROR;
  }

  SSTriggerCalcParam* pCalc = taosArrayGet(pRuntimeInfo->pStreamPesudoFuncVals, pRuntimeInfo->curIdx);
  if (NULL == pCalc) {
    return TSDB_CODE_INTERNAL_ERROR;
  }

  if (STREAM_TRIGGER_PERIOD == pRuntimeInfo->triggerType) {
    pTimeRange->skey = pCalc->prevLocalTime;
    pTimeRange->ekey = pCalc->triggerTime;
  } else {
    // Sliding triggers cannot tell whether an interval is present, so they take wstart/wend like
    // all remaining types; per the original note these alias prevTs/currentTs through a union.
    pTimeRange->skey = pCalc->wstart;
    pTimeRange->ekey = pCalc->wend;
  }

  return TSDB_CODE_SUCCESS;
}
1204

1205
/*
 * Issues the fetch request for the sourceIndex-th downstream source of an exchange operator.
 *
 * Nothing is sent unless the source is in EX_SOURCE_DATA_NOT_READY; it is then moved to
 * EX_SOURCE_DATA_STARTED. For a local source the task's localFetch callback is invoked and its
 * result is fed straight into loadRemoteDataCallback(). For a remote source an SResFetchReq is
 * built (with an optional operator param chosen by the source type), serialized and sent through
 * asyncSendMsgToServer(); the returned transport id is recorded in pFetchRpcHandles.
 *
 * The per-source dynamic inputs (uid list, original-table info, batch info, tag list) are consumed
 * by this call: they are destroyed and reset once the operator param has been built.
 *
 * Returns TSDB_CODE_SUCCESS or an error code. For failures that happen while the request is being
 * prepared the code is also stored in pTaskInfo->code and every temporary (wrapper, message
 * buffer, operator param) is released.
 */
int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SResFetchReq     req = {0};
  void*            msg = NULL;
  SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
  if (!pDataInfo) {
    return terrno;
  }

  if (EX_SOURCE_DATA_NOT_READY != pDataInfo->status) {
    return TSDB_CODE_SUCCESS;
  }

  pDataInfo->status = EX_SOURCE_DATA_STARTED;
  SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
  if (!pSource) {
    return terrno;
  }

  pDataInfo->startTime = taosGetTimestampUs();
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);

  SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
  QUERY_CHECK_NULL(pWrapper, code, lino, _end, terrno);
  pWrapper->exchangeId = pExchangeInfo->self;
  pWrapper->sourceIndex = sourceIndex;
  pWrapper->seqId = pExchangeInfo->seqId;

  if (pSource->localExec) {
    SDataBuf pBuf = {0};
    // assign to the function-level `code`; a shadowing local here would hide failures from _end
    code = (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->sId,
                                       pTaskInfo->id.queryId, pSource->clientId,
                                       pSource->taskId, 0, pSource->execId,
                                       &pBuf.pData,
                                       pTaskInfo->localFetch.explainRes);
    code = loadRemoteDataCallback(pWrapper, &pBuf, code);
    taosMemoryFree(pWrapper);
    QUERY_CHECK_CODE(code, lino, _end);
  } else {
    bool needStreamPesudoFuncVals = true;
    req.header.vgId = pSource->addr.nodeId;
    req.sId = pSource->sId;
    req.clientId = pSource->clientId;
    req.taskId = pSource->taskId;
    req.queryId = pTaskInfo->id.queryId;
    req.execId = pSource->execId;
    if (pTaskInfo->pStreamRuntimeInfo) {
      req.dynTbname = pExchangeInfo->dynTbname;
      req.execId = pTaskInfo->pStreamRuntimeInfo->execId;
      req.pStRtFuncInfo = &pTaskInfo->pStreamRuntimeInfo->funcInfo;

      if (pSource->fetchMsgType == TDMT_STREAM_FETCH_FROM_RUNNER) {
        qDebug("%s stream fetch from runner, execId:%d, %p", GET_TASKID(pTaskInfo), req.execId, pTaskInfo->pStreamRuntimeInfo);
      } else if (pSource->fetchMsgType == TDMT_STREAM_FETCH_FROM_CACHE) {
        code = getCurrentWinCalcTimeRange(req.pStRtFuncInfo, &req.pStRtFuncInfo->curWindow);
        if (TSDB_CODE_SUCCESS != code) {
          // nothing has taken ownership of the wrapper yet
          taosMemoryFree(pWrapper);
          lino = __LINE__;
          goto _end;
        }
        needStreamPesudoFuncVals = false;
        qDebug("%s stream fetch from cache, execId:%d, curWinIdx:%d, time range:[%" PRId64 ", %" PRId64 "]",
               GET_TASKID(pTaskInfo), req.execId, req.pStRtFuncInfo->curIdx, req.pStRtFuncInfo->curWindow.skey,
               req.pStRtFuncInfo->curWindow.ekey);
      }
      if (!pDataInfo->fetchSent) {
        req.reset = pDataInfo->fetchSent = true;
      }
    }

    switch (pDataInfo->type) {
      case EX_SRC_TYPE_VSTB_SCAN: {
        code = buildTableScanOperatorParamEx(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->orgTbInfo, pDataInfo->tableSeq, &pDataInfo->window, pDataInfo->isNewParam, DYN_TYPE_VSTB_SINGLE_SCAN);
        if (pDataInfo->orgTbInfo) {
          taosArrayDestroy(pDataInfo->orgTbInfo->colMap);
          taosMemoryFreeClear(pDataInfo->orgTbInfo);
        }
        taosArrayDestroy(pDataInfo->pSrcUidList);
        pDataInfo->pSrcUidList = NULL;
        if (TSDB_CODE_SUCCESS != code) {
          goto _abort;
        }
        break;
      }
      case EX_SRC_TYPE_VTB_WIN_SCAN: {
        if (pDataInfo->pSrcUidList) {
          code = buildTableScanOperatorParamEx(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, NULL, pDataInfo->tableSeq, &pDataInfo->window, false, DYN_TYPE_VSTB_WIN_SCAN);
          taosArrayDestroy(pDataInfo->pSrcUidList);
          pDataInfo->pSrcUidList = NULL;
          if (TSDB_CODE_SUCCESS != code) {
            goto _abort;
          }
        }
        break;
      }
      case EX_SRC_TYPE_VSTB_TAG_SCAN: {
        code = buildTagScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType);
        taosArrayDestroy(pDataInfo->pSrcUidList);
        pDataInfo->pSrcUidList = NULL;
        if (TSDB_CODE_SUCCESS != code) {
          goto _abort;
        }
        break;
      }
      case EX_SRC_TYPE_VSTB_WIN_SCAN:
      case EX_SRC_TYPE_VSTB_AGG_SCAN: {
        if (pDataInfo->batchOrgTbInfo) {
          code = buildAggOperatorParam(&req.pOpParam, pDataInfo->groupid, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->batchOrgTbInfo, pDataInfo->tagList, pDataInfo->tableSeq, &pDataInfo->window, pDataInfo->isNewParam, pDataInfo->type);

          // the inputs were copied by the builder; release the originals whether or not it succeeded
          int32_t numOfTables = (int32_t)taosArrayGetSize(pDataInfo->batchOrgTbInfo);
          for (int32_t i = 0; i < numOfTables; ++i) {
            SOrgTbInfo* pColMap = taosArrayGet(pDataInfo->batchOrgTbInfo, i);
            if (pColMap) {
              taosArrayDestroy(pColMap->colMap);
            }
          }
          taosArrayDestroy(pDataInfo->batchOrgTbInfo);
          pDataInfo->batchOrgTbInfo = NULL;

          if (pDataInfo->tagList) {
            taosArrayDestroyEx(pDataInfo->tagList, destroyTagVal);
            pDataInfo->tagList = NULL;
          }
          if (pDataInfo->pSrcUidList) {
            taosArrayDestroy(pDataInfo->pSrcUidList);
            pDataInfo->pSrcUidList = NULL;
          }

          if (TSDB_CODE_SUCCESS != code) {
            goto _abort;
          }
        }
        break;
      }
      case EX_SRC_TYPE_STB_JOIN_SCAN:
      default: {
        if (pDataInfo->pSrcUidList) {
          code = buildTableScanOperatorParam(&req.pOpParam,
                                             pDataInfo->pSrcUidList,
                                             pDataInfo->srcOpType,
                                             pDataInfo->tableSeq);
          /* source uid list can be reused on the vnode side, so only use it once */
          taosArrayDestroy(pDataInfo->pSrcUidList);
          pDataInfo->pSrcUidList = NULL;
          if (TSDB_CODE_SUCCESS != code) {
            goto _abort;
          }
        }
        if (pExchangeInfo->notifyToSend) {
          if (NULL == req.pOpParam) {
            code = buildTableScanOperatorParamNotify(&req.pOpParam,
                                                     pDataInfo->srcOpType,
                                                     pExchangeInfo->notifyTs);
            if (TSDB_CODE_SUCCESS != code) {
              goto _abort;
            }
          } else {
            /**
              Currently don't support using the same param multiple times!
            */
            qError("%s, %s failed, currently don't support use the same param "
                   "for multiple times!", GET_TASKID(pTaskInfo), __func__);
            code = TSDB_CODE_INVALID_PARA;
            goto _abort;
          }
          pExchangeInfo->notifyToSend = false;
        }
        break;
      }
    }

    // first pass computes the encoded size, second pass writes into the buffer
    int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req, needStreamPesudoFuncVals);
    if (msgSize < 0) {
      code = msgSize;
      goto _abort;
    }

    msg = taosMemoryCalloc(1, msgSize);
    if (NULL == msg) {
      code = terrno;
      goto _abort;
    }

    msgSize = tSerializeSResFetchReq(msg, msgSize, &req, needStreamPesudoFuncVals);
    if (msgSize < 0) {
      code = msgSize;
      goto _abort;
    }

    freeOperatorParam(req.pOpParam, OP_GET_PARAM);
    req.pOpParam = NULL;  // already released; keeps _abort from freeing it twice

    qDebug("%s build fetch msg and send to vgId:%d, ep:%s, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
           ", seqId:%" PRId64 ", execId:%d, %p, %d/%" PRIzu,
           GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->clientId,
           pSource->taskId, pExchangeInfo->seqId, pSource->execId, pExchangeInfo, sourceIndex, totalSources);

    // send the fetch remote task result request
    SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
    if (NULL == pMsgSendInfo) {
      code = terrno;  // read before logging, which may overwrite terrno
      qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
      goto _abort;
    }

    pMsgSendInfo->param = pWrapper;
    pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
    pMsgSendInfo->msgInfo.pData = msg;
    pMsgSendInfo->msgInfo.len = msgSize;
    pMsgSendInfo->msgType = pSource->fetchMsgType;
    pMsgSendInfo->fp = loadRemoteDataCallback;
    pMsgSendInfo->requestId = pTaskInfo->id.queryId;

    int64_t transporterId = 0;
    code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
    QUERY_CHECK_CODE(code, lino, _end);

    int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, sourceIndex);
    if (pRpcHandle != NULL) {
      *pRpcHandle = transporterId;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;

_abort:
  // failure while preparing the request: the temporaries are still owned by this function
  taosMemoryFree(msg);
  taosMemoryFree(pWrapper);
  freeOperatorParam(req.pOpParam, OP_GET_PARAM);
  pTaskInfo->code = code;
  return pTaskInfo->code;
}
1447

1448
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs,
152,573,140✔
1449
                          SOperatorInfo* pOperator) {
1450
  pInfo->totalRows += numOfRows;
152,573,140✔
1451
  pInfo->totalSize += dataLen;
152,573,493✔
1452
  pInfo->totalElapsed += (taosGetTimestampUs() - startTs);
152,573,140✔
1453
  pOperator->resultInfo.totalRows += numOfRows;
152,573,140✔
1454
}
152,573,493✔
1455

1456
/**
 * Decode one serialized data block of a fetch response into pRes.
 *
 * - isVstbScan: pRes is reset and decoded from pData.
 * - pColList == NULL: pRes is reset and decoded from pData; pNextStart is passed
 *   through to blockDecodeInternal().
 * - pColList != NULL: the payload starts with a column count followed by that many
 *   SSysTableSchema entries (converted in place with htonl/htons), then the block
 *   body. The body is decoded into a temporary block whose columns are relocated
 *   into pRes according to pColList. pNextStart is not used in this branch.
 *
 * NOTE(review): when isVstbScan is true and pColList is NULL, both of the first two
 * steps run, so the same payload is decoded twice - confirm this is intentional.
 *
 * @return TSDB_CODE_SUCCESS, or the first error code reported by a callee.
 */
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart, bool isVstbScan) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SSDataBlock* pTmpBlock = NULL;

  if (isVstbScan) {
    blockDataCleanup(pRes);
    code = blockDecodeInternal(pRes, pData, (const char**)pNextStart);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  if (NULL == pColList) {
    // data from other sources: decode straight into the result block
    blockDataCleanup(pRes);
    code = blockDecodeInternal(pRes, pData, (const char**)pNextStart);
    QUERY_CHECK_CODE(code, lino, _end);
  } else {
    // extract data according to pColList
    char* pCursor = pData;

    int32_t numOfCols = htonl(*(int32_t*)pCursor);
    pCursor += sizeof(int32_t);

    // todo refactor:extract method
    // The schema entries are converted in place and skipped over.
    SSysTableSchema* pSchema = (SSysTableSchema*)pCursor;
    for (int32_t col = 0; col < numOfCols; ++col) {
      SSysTableSchema* pEntry = &pSchema[col];

      pEntry->colId = htons(pEntry->colId);
      pEntry->bytes = htonl(pEntry->bytes);
      pCursor += sizeof(SSysTableSchema);
    }

    pTmpBlock = NULL;
    code = createDataBlock(&pTmpBlock);
    QUERY_CHECK_CODE(code, lino, _end);

    for (int32_t col = 0; col < numOfCols; ++col) {
      SColumnInfoData colInfo = createColumnInfoData(pSchema[col].type, pSchema[col].bytes, pSchema[col].colId);
      code = blockDataAppendColInfo(pTmpBlock, &colInfo);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    code = blockDecodeInternal(pTmpBlock, pCursor, NULL);
    QUERY_CHECK_CODE(code, lino, _end);

    code = blockDataEnsureCapacity(pRes, pTmpBlock->info.rows);
    QUERY_CHECK_CODE(code, lino, _end);

    // data from mnode
    pRes->info.dataLoad = 1;
    pRes->info.rows = pTmpBlock->info.rows;
    pRes->info.scanFlag = MAIN_SCAN;
    code = relocateColumnData(pRes, pColList, pTmpBlock->pDataBlock, false);
    QUERY_CHECK_CODE(code, lino, _end);

    blockDataDestroy(pTmpBlock);
    pTmpBlock = NULL;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    // the temporary block is still owned here when a step above failed
    blockDataDestroy(pTmpBlock);
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1519

1520
/**
 * Log the accumulated load statistics of the exchange operator and mark the
 * operator as completed via setOperatorCompleted().
 */
void setAllSourcesCompleted(SOperatorInfo* pOperator) {
  SExecTaskInfo*       pTaskInfo = pOperator->pTaskInfo;
  SExchangeInfo*       pExchangeInfo = pOperator->info;
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;

  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
  double sizeInKb = pLoadInfo->totalSize / 1024.0;
  double elapsedInMs = pLoadInfo->totalElapsed / 1000.0;

  qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 ", %.2f Kb, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, sizeInKb, elapsedInMs);

  setOperatorCompleted(pOperator);
}
83,630,569✔
1532

1533
/**
 * Count the entries of pArray (SSourceDataInfo items) whose status is
 * EX_SOURCE_DATA_EXHAUSTED.
 *
 * @param pArray  array of SSourceDataInfo
 * @param pRes    receives the count; only written when every entry could be read
 * @return TSDB_CODE_SUCCESS, or terrno when an entry cannot be fetched
 */
int32_t getCompletedSources(const SArray* pArray, int32_t* pRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t numExhausted = 0;
  size_t  numOfItems = taosArrayGetSize(pArray);

  for (int32_t k = 0; k < numOfItems; ++k) {
    SSourceDataInfo* pItem = taosArrayGet(pArray, k);
    QUERY_CHECK_NULL(pItem, code, lino, _end, terrno);

    if (pItem->status != EX_SOURCE_DATA_EXHAUSTED) {
      continue;
    }

    qDebug("source %d is completed, info:%p %p", k, pArray, pItem);
    numExhausted += 1;
  }

  *pRes = numExhausted;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1555

1556
/**
 * Send a fetch request to every entry of pSourceDataInfo without waiting for the
 * responses, then switch the operator to OP_RES_TO_RETURN and record the open cost.
 *
 * On the first send failure the error is stored in pTaskInfo->code and returned.
 * If the task has been killed once all requests are out, control leaves through
 * T_LONG_JMP with pTaskInfo->code.
 */
int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
  SExchangeInfo* pExchangeInfo = pOperator->info;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
  int64_t startTs = taosGetTimestampUs();

  // Asynchronously send all fetch requests to all sources.
  for (int32_t srcIdx = 0; srcIdx < totalSources; ++srcIdx) {
    int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, srcIdx);
    if (TSDB_CODE_SUCCESS == code) {
      continue;
    }

    pTaskInfo->code = code;
    return code;
  }

  int64_t endTs = taosGetTimestampUs();
  qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTaskInfo),
         totalSources, (endTs - startTs) / 1000.0);

  pOperator->status = OP_RES_TO_RETURN;
  pOperator->cost.openCost = taosGetTimestampUs() - startTs;

  if (isTaskKilled(pTaskInfo)) {
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
  }

  return TSDB_CODE_SUCCESS;
}
1584

1585
/**
1586
  @brief store STEP DONE notification info
1587
*/
1588
void storeNotifyInfo(SOperatorInfo* pOperator) {
4,006,284✔
1589
  SExchangeInfo*  pExchangeInfo = pOperator->info;
4,006,284✔
1590
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
4,006,284✔
1591
  SOperatorParam* pGetParam = pOperator->pOperatorGetParam;
4,006,284✔
1592

1593
  SExchangeOperatorParam* pParam = (SExchangeOperatorParam*)pGetParam->value;
4,006,284✔
1594
  if (!pParam->multiParams) {
4,006,284✔
1595
    SExchangeOperatorBasicParam* pBasic = &pParam->basic;
4,006,284✔
1596
    if (pBasic->paramType != NOTIFY_TYPE_EXCHANGE_PARAM) {
4,006,284✔
1597
      qWarn("%s, %s found invalid exchange operator param type %d",
×
1598
             GET_TASKID(pTaskInfo), __func__, pBasic->paramType);
1599
      return;
×
1600
    }
1601

1602
    pExchangeInfo->notifyToSend = true;
4,006,284✔
1603
    pExchangeInfo->notifyTs = pBasic->notifyTs;
4,006,284✔
1604
  } else {
1605
    qWarn("%s, %s found multi params are not supported for notify msg",
×
1606
           GET_TASKID(pTaskInfo), __func__);
1607
  }
1608
}
1609

1610
/**
 * Split the payload of pDataInfo->pRsp into data blocks and append them to
 * pExchangeInfo->pResultBlockList.
 *
 * The payload holds pRsp->numOfBlocks records, each laid out as
 *   int32 compLen | int32 rawLen | compLen bytes of block data.
 * When the response is flagged as compressed and compLen < rawLen, the record is
 * first decompressed into pDataInfo->decompBuf (grown here to payloadLen bytes).
 *
 * Target blocks come from, in order of preference: a zeroed SSDataBlock for
 * EX_SRC_TYPE_VSTB_SCAN sources, a block popped from pRecycledBlocks, or a copy of
 * pDummyBlock.
 *
 * @return TSDB_CODE_SUCCESS or an error code. When decoding a block fails,
 *         pDataInfo->pRsp is released before returning.
 */
int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo) {
  int32_t            code = TSDB_CODE_SUCCESS;
  int32_t            lino = 0;
  SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
  SSDataBlock*       pb = NULL;

  char* pNextStart = pRetrieveRsp->data;
  char* pStart = pNextStart;

  int32_t index = 0;

  if (pRetrieveRsp->compressed) {  // make sure the decompress buffer is large enough
    if (pDataInfo->decompBuf == NULL) {
      pDataInfo->decompBuf = taosMemoryMalloc(pRetrieveRsp->payloadLen);
      QUERY_CHECK_NULL(pDataInfo->decompBuf, code, lino, _end, terrno);
      pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
    } else if (pDataInfo->decompBufSize < pRetrieveRsp->payloadLen) {
      // keep the old buffer reachable until the realloc is known to have succeeded
      char* p = taosMemoryRealloc(pDataInfo->decompBuf, pRetrieveRsp->payloadLen);
      QUERY_CHECK_NULL(p, code, lino, _end, terrno);
      pDataInfo->decompBuf = p;
      pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
    }
  }

  while (index++ < pRetrieveRsp->numOfBlocks) {
    pStart = pNextStart;

    if (pDataInfo->type == EX_SRC_TYPE_VSTB_SCAN) {
      pb = taosMemoryCalloc(1, sizeof(SSDataBlock));
      QUERY_CHECK_NULL(pb, code, lino, _end, terrno);
    } else if (taosArrayGetSize(pExchangeInfo->pRecycledBlocks) > 0) {
      pb = *(SSDataBlock**)taosArrayPop(pExchangeInfo->pRecycledBlocks);
      blockDataCleanup(pb);
    } else {
      code = createOneDataBlock(pExchangeInfo->pDummyBlock, false, &pb);
      QUERY_CHECK_NULL(pb, code, lino, _end, code);
    }

    int32_t compLen = *(int32_t*)pStart;
    pStart += sizeof(int32_t);

    int32_t rawLen = *(int32_t*)pStart;
    pStart += sizeof(int32_t);

    // A negative compLen would move pNextStart backwards, so require a strictly
    // positive length that does not exceed the raw length.
    QUERY_CHECK_CONDITION((compLen > 0 && compLen <= rawLen), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

    pNextStart = pStart + compLen;
    if (pRetrieveRsp->compressed && (compLen < rawLen)) {
      int32_t t = tsDecompressString(pStart, compLen, 1, pDataInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
      QUERY_CHECK_CONDITION((t == rawLen), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
      pStart = pDataInfo->decompBuf;
    }

    code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart, (pDataInfo->type == EX_SRC_TYPE_VSTB_SCAN));
    if (code != 0) {
      lino = __LINE__;  // so the error log below points at the failing step
      taosMemoryFreeClear(pDataInfo->pRsp);
      goto _end;
    }

    void* tmp = taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    qDebug("%dth block added to resultBlockList, rows:%" PRId64, index, pb->info.rows);
    pb = NULL;  // ownership moved to pResultBlockList
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    // pb is non-NULL only if it has not been handed over to the result list yet
    blockDataDestroy(pb);
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1685

1686
/**
 * Fetch data from the downstream sources one at a time, starting at
 * pExchangeInfo->current.
 *
 * For the current source a fetch request is sent and the function waits for the
 * matching response (responses whose seqId differs from the exchange's current
 * seqId are dropped). An empty response marks the source exhausted and moves on to
 * the next one; a non-empty response is split into result blocks and the function
 * returns. When dynTbname is set, only the source whose nodeId equals the vgId of
 * the tbname partition value is queried.
 *
 * @return TSDB_CODE_SUCCESS when blocks were loaded or all sources are completed,
 *         otherwise an error code (also stored in pTaskInfo->code). Failures while
 *         preparing/sending the request, waiting, or a killed task leave through
 *         T_LONG_JMP instead of returning.
 */
int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
  SExchangeInfo* pExchangeInfo = pOperator->info;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  int32_t code = 0;
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);
  int64_t startTs = taosGetTimestampUs();

  // vgId of the tbname partition value; 0 means "do not filter sources"
  int32_t vgId = 0;
  if (pExchangeInfo->dynTbname) {
    SArray* vals = pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPartColVals;
    for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
      SStreamGroupValue* pValue = taosArrayGet(vals, i);
      if (pValue != NULL && pValue->isTbname) {
        vgId = pValue->vgId;
        break;
      }
    }
  }

  while (1) {
    if (pExchangeInfo->current >= totalSources) {
      setAllSourcesCompleted(pOperator);
      return TSDB_CODE_SUCCESS;
    }

    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
    if (!pSource) {
      // the failure reason is in terrno; `code` has not been assigned for this step
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      pTaskInfo->code = terrno;
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }

    if (vgId != 0 && pSource->addr.nodeId != vgId){
      pExchangeInfo->current += 1;
      continue;
    }

    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
    if (!pDataInfo) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      pTaskInfo->code = terrno;
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;

    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      pTaskInfo->code = code;
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
    }

    // wait until a response carrying the current seqId arrives
    while (true) {
      code = exchangeWait(pOperator, pExchangeInfo);
      if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
        // NOTE(review): jumps with pTaskInfo->code, not with the exchangeWait() return
        // value - confirm the callee records its failure in pTaskInfo->code.
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
      }

      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
      if (pDataInfo->seqId != currSeqId) {
        qDebug("seq rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pDataInfo->seqId, pExchangeInfo, currSeqId);
        taosMemoryFreeClear(pDataInfo->pRsp);
        continue;
      }

      break;
    }

    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
             tstrerror(pDataInfo->code));
      pOperator->pTaskInfo->code = pDataInfo->code;
      return pOperator->pTaskInfo->code;
    }

    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;

    if (pRsp->numOfRows == 0) {
      qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
             " execId:%d %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", pDataInfo,
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);

      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
      if (isVstbScan(pDataInfo) || isVstbTagScan(pDataInfo)) {
        // skip every remaining source
        pExchangeInfo->current = totalSources;
      } else {
        pExchangeInfo->current += 1;
      }
      taosMemoryFreeClear(pDataInfo->pRsp);
      continue;
    }

    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }

    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
    if (pRsp->completed == 1) {
      qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, pDataInfo,
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
             pExchangeInfo->current + 1, totalSources);

      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
      if (isVstbScan(pDataInfo)) {
        pExchangeInfo->current = totalSources;
      } else {
        pExchangeInfo->current += 1;
      }
    } else {
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
             pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
    }

    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;

    taosMemoryFreeClear(pDataInfo->pRsp);

    // pDataInfo points into pSourceDataInfo, so the entries are dropped only after
    // the last access through it.
    if (pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData) {
      taosArrayClear(pExchangeInfo->pSourceDataInfo);
    }
    return TSDB_CODE_SUCCESS;
  }

_error:
  pTaskInfo->code = code;
  return code;
}
1821

1822
void clearVtbScanDataInfo(void* pItem) {
5,715,968✔
1823
  SSourceDataInfo *pInfo = (SSourceDataInfo *)pItem;
5,715,968✔
1824
  if (pInfo->orgTbInfo) {
5,715,968✔
1825
    taosArrayDestroy(pInfo->orgTbInfo->colMap);
×
1826
    taosMemoryFreeClear(pInfo->orgTbInfo);
×
1827
  }
1828
  if (pInfo->batchOrgTbInfo) {
5,715,968✔
1829
    for (int32_t i = 0; i < taosArrayGetSize(pInfo->batchOrgTbInfo); ++i) {
×
1830
      SOrgTbInfo* pColMap = taosArrayGet(pInfo->batchOrgTbInfo, i);
×
1831
      if (pColMap) {
×
1832
        taosArrayDestroy(pColMap->colMap);
×
1833
      }
1834
    }
1835
    taosArrayDestroy(pInfo->batchOrgTbInfo);
×
1836
  }
1837
  if (pInfo->tagList) {
5,715,968✔
1838
    taosArrayDestroyEx(pInfo->tagList, destroyTagVal);
×
1839
    pInfo->tagList = NULL;
×
1840
  }
1841
  taosArrayDestroy(pInfo->pSrcUidList);
5,715,968✔
1842
}
5,715,968✔
1843

1844
/**
 * Replace pDataInfo->tagList with a deep copy of pBasicParam->tagList.
 *
 * Variable-length tag values get their own heap copy of the payload; other types
 * are copied through the i64 member. When the parameter carries no tag list,
 * pDataInfo->tagList ends up NULL.
 *
 * A tag list already attached to pDataInfo is released (including the payloads of
 * its elements, via destroyTagVal) before the new one is built, so repeated calls
 * on the same pDataInfo do not leak the previous list.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when the parameter is not a
 *         DYN_TYPE_EXCHANGE_PARAM, or terrno of the failing allocation/array call.
 *         On failure the tags copied so far stay in pDataInfo->tagList.
 */
static int32_t loadTagListFromBasicParam(SSourceDataInfo* pDataInfo, SExchangeOperatorBasicParam* pBasicParam) {
  int32_t  code = TSDB_CODE_SUCCESS;
  int32_t  lino = 0;
  STagVal  dstTag = {0};
  bool     needFree = false;  // true while dstTag.pData is allocated but not yet owned by the list

  if (pBasicParam->paramType != DYN_TYPE_EXCHANGE_PARAM) {
    qError("%s failed since invalid exchange operator param type %d",
      __func__, pBasicParam->paramType);
    return TSDB_CODE_INVALID_PARA;
  }

  if (pDataInfo->tagList) {
    // the pointer is overwritten below, so the old list has to be released here
    taosArrayDestroyEx(pDataInfo->tagList, destroyTagVal);
    pDataInfo->tagList = NULL;
  }

  if (pBasicParam->tagList) {
    pDataInfo->tagList = taosArrayInit(1, sizeof(STagVal));
    QUERY_CHECK_NULL(pDataInfo->tagList, code, lino, _return, terrno);

    for (int32_t i = 0; i < taosArrayGetSize(pBasicParam->tagList); ++i) {
      STagVal *pSrcTag = (STagVal*)taosArrayGet(pBasicParam->tagList, i);
      QUERY_CHECK_NULL(pSrcTag, code, lino, _return, terrno);

      dstTag = (STagVal){0};
      dstTag.type = pSrcTag->type;
      dstTag.cid = pSrcTag->cid;
      if (IS_VAR_DATA_TYPE(pSrcTag->type)) {
        dstTag.nData = pSrcTag->nData;
        dstTag.pData = taosMemoryMalloc(dstTag.nData);
        QUERY_CHECK_NULL(dstTag.pData, code, lino, _return, terrno);
        needFree = true;
        memcpy(dstTag.pData, pSrcTag->pData, dstTag.nData);
      } else {
        dstTag.i64 = pSrcTag->i64;
      }

      QUERY_CHECK_NULL(taosArrayPush(pDataInfo->tagList, &dstTag), code, lino, _return, terrno);
      needFree = false;  // the list now owns dstTag.pData
    }
  } else {
    pDataInfo->tagList = NULL;
  }

  return code;
_return:
  if (needFree) {
    taosMemoryFreeClear(dstTag.pData);
  }
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  return code;
}
1896

1897
/**
 * Set pDataInfo->batchOrgTbInfo to a deep copy of pBasicParam->batchOrgTbInfo.
 *
 * Each SOrgTbInfo entry is copied with its vgId, table name and a duplicate of its
 * colMap array. When the parameter carries no batch info, pDataInfo->batchOrgTbInfo
 * is set to NULL.
 *
 * NOTE(review): a batchOrgTbInfo already attached to pDataInfo is overwritten
 * without being released here - confirm the caller has released it beforehand.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when the parameter is not a
 *         DYN_TYPE_EXCHANGE_PARAM, or terrno of the failing array call. On failure
 *         the entries copied so far stay in pDataInfo->batchOrgTbInfo.
 */
int32_t loadBatchColMapFromBasicParam(SSourceDataInfo* pDataInfo, SExchangeOperatorBasicParam* pBasicParam) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  SOrgTbInfo  dstOrgTbInfo = {0};
  bool        needFree = false;  // true while dstOrgTbInfo.colMap is not yet owned by the array

  if (pBasicParam->paramType != DYN_TYPE_EXCHANGE_PARAM) {
    qError("%s failed since invalid exchange operator param type %d",
      __func__, pBasicParam->paramType);
    return TSDB_CODE_INVALID_PARA;
  }

  if (pBasicParam->batchOrgTbInfo) {
    pDataInfo->batchOrgTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
    QUERY_CHECK_NULL(pDataInfo->batchOrgTbInfo, code, lino, _return, terrno);

    for (int32_t i = 0; i < taosArrayGetSize(pBasicParam->batchOrgTbInfo); ++i) {
      SOrgTbInfo* pSrcOrgTbInfo = taosArrayGet(pBasicParam->batchOrgTbInfo, i);
      QUERY_CHECK_NULL(pSrcOrgTbInfo, code, lino, _return, terrno);

      dstOrgTbInfo = (SOrgTbInfo){0};
      dstOrgTbInfo.vgId = pSrcOrgTbInfo->vgId;
      tstrncpy(dstOrgTbInfo.tbName, pSrcOrgTbInfo->tbName, TSDB_TABLE_FNAME_LEN);

      dstOrgTbInfo.colMap = taosArrayDup(pSrcOrgTbInfo->colMap, NULL);
      QUERY_CHECK_NULL(dstOrgTbInfo.colMap, code, lino, _return, terrno);

      needFree = true;
      QUERY_CHECK_NULL(taosArrayPush(pDataInfo->batchOrgTbInfo, &dstOrgTbInfo), code, lino, _return, terrno);
      needFree = false;  // the array now owns dstOrgTbInfo.colMap
    }
  } else {
    // The destination is what has to be reset here; the parameter's field is
    // already NULL in this branch.
    pDataInfo->batchOrgTbInfo = NULL;
  }

  return code;
_return:
  if (needFree) {
    taosArrayDestroy(dstOrgTbInfo.colMap);
  }
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  return code;
}
1940

1941
int32_t addSingleExchangeSource(SOperatorInfo* pOperator,
32,169,380✔
1942
                                SExchangeOperatorBasicParam* pBasicParam) {
1943
  if (pBasicParam->paramType != DYN_TYPE_EXCHANGE_PARAM) {
32,169,380✔
1944
    qWarn("%s, %s found invalid exchange operator param type %d",
×
1945
      GET_TASKID(pOperator->pTaskInfo), __func__, pBasicParam->paramType);
1946
    return TSDB_CODE_SUCCESS;
×
1947
  }
1948

1949
  int32_t            code = TSDB_CODE_SUCCESS;
32,169,380✔
1950
  int32_t            lino = 0;
32,169,380✔
1951
  SExchangeInfo*     pExchangeInfo = pOperator->info;
32,169,380✔
1952
  SExchangeSrcIndex* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
32,169,380✔
1953

1954
  if (NULL == pIdx) {
32,169,380✔
1955
    if (pBasicParam->isNewDeployed) {
2,453✔
1956
      SDownstreamSourceNode *pNode = NULL;
2,453✔
1957
      code = nodesCloneNode((SNode*)&pBasicParam->newDeployedSrc, (SNode**)&pNode);
2,453✔
1958
      QUERY_CHECK_CODE(code, lino, _return);
2,453✔
1959

1960
      SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pOperator->pPhyNode;
2,453✔
1961
      code = nodesListMakeStrictAppend(&pExchange->pSrcEndPoints, (SNode*)pNode);
2,453✔
1962
      QUERY_CHECK_CODE(code, lino, _return);
2,453✔
1963

1964
      void* tmp = taosArrayPush(pExchangeInfo->pSources, pNode);
2,453✔
1965
      QUERY_CHECK_NULL(tmp, code, lino, _return, terrno);
2,453✔
1966

1967
      SExchangeSrcIndex idx = {.srcIdx = taosArrayGetSize(pExchangeInfo->pSources) - 1, .inUseIdx = -1};
2,453✔
1968
      code = tSimpleHashPut(pExchangeInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
2,453✔
1969
      if (pExchangeInfo->pHashSources) {
2,453✔
1970
        QUERY_CHECK_CODE(code, lino, _return);
2,453✔
1971
      }
1972
      pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
2,453✔
1973
      QUERY_CHECK_NULL(pIdx, code, lino, _return, TSDB_CODE_INVALID_PARA);
2,453✔
1974
    } else {
1975
      qError("No exchange source for vgId: %d", pBasicParam->vgId);
×
1976
      return TSDB_CODE_INVALID_PARA;
×
1977
    }
1978
  }
1979

1980
  qDebug("start to add single exchange source");
32,169,380✔
1981

1982
  switch (pBasicParam->type) {
32,169,380✔
1983
    case EX_SRC_TYPE_VSTB_WIN_SCAN:
4,086,898✔
1984
    case EX_SRC_TYPE_VSTB_AGG_SCAN: {
1985
      if (pIdx->inUseIdx < 0) {
4,086,898✔
1986
        SSourceDataInfo dataInfo = {0};
1,971,398✔
1987
        dataInfo.status = EX_SOURCE_DATA_NOT_READY;
1,971,398✔
1988
        dataInfo.taskId = pExchangeInfo->pTaskId;
1,971,398✔
1989
        dataInfo.index = pIdx->srcIdx;
1,971,398✔
1990
        dataInfo.groupid = pBasicParam->groupid;
1,971,398✔
1991
        dataInfo.window = pBasicParam->window;
1,971,398✔
1992
        dataInfo.isNewParam = pBasicParam->isNewParam;
1,971,398✔
1993
        code = loadTagListFromBasicParam(&dataInfo, pBasicParam);
1,971,398✔
1994
        QUERY_CHECK_CODE(code, lino, _return);
1,971,398✔
1995

1996
        code = loadBatchColMapFromBasicParam(&dataInfo, pBasicParam);
1,971,398✔
1997
        QUERY_CHECK_CODE(code, lino, _return);
1,971,398✔
1998

1999
        dataInfo.orgTbInfo = NULL;
1,971,398✔
2000

2001
        dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
1,971,398✔
2002
        QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
1,971,398✔
2003

2004
        dataInfo.type = pBasicParam->type;
1,971,398✔
2005
        dataInfo.srcOpType = pBasicParam->srcOpType;
1,971,398✔
2006
        dataInfo.tableSeq = pBasicParam->tableSeq;
1,971,398✔
2007

2008
        QUERY_CHECK_NULL(taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
3,942,796✔
2009

2010
        pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
1,971,398✔
2011
      } else {
2012
        SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
2,115,500✔
2013
        QUERY_CHECK_NULL(pDataInfo, code, lino, _return, terrno);
2,115,500✔
2014

2015
        if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
2,115,500✔
2016
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
2,115,500✔
2017
        }
2018

2019
        pDataInfo->taskId = pExchangeInfo->pTaskId;
2,115,500✔
2020
        pDataInfo->index = pIdx->srcIdx;
2,115,500✔
2021
        pDataInfo->window = pBasicParam->window;
2,115,500✔
2022
        pDataInfo->groupid = pBasicParam->groupid;
2,115,500✔
2023
        pDataInfo->isNewParam = pBasicParam->isNewParam;
2,115,500✔
2024

2025
        code = loadTagListFromBasicParam(pDataInfo, pBasicParam);
2,115,500✔
2026
        QUERY_CHECK_CODE(code, lino, _return);
2,115,500✔
2027

2028
        code = loadBatchColMapFromBasicParam(pDataInfo, pBasicParam);
2,115,500✔
2029
        QUERY_CHECK_CODE(code, lino, _return);
2,115,500✔
2030

2031
        pDataInfo->orgTbInfo = NULL;
2,115,500✔
2032

2033
        pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
2,115,500✔
2034
        QUERY_CHECK_NULL(pDataInfo->pSrcUidList, code, lino, _return, terrno);
2,115,500✔
2035

2036
        pDataInfo->type = pBasicParam->type;
2,115,500✔
2037
        pDataInfo->srcOpType = pBasicParam->srcOpType;
2,115,500✔
2038
        pDataInfo->tableSeq = pBasicParam->tableSeq;
2,115,500✔
2039
      }
2040
      break;
4,086,898✔
2041
    }
2042
    case EX_SRC_TYPE_VTB_WIN_SCAN:
4,169,939✔
2043
    case EX_SRC_TYPE_VSTB_TAG_SCAN: {
2044
      SSourceDataInfo dataInfo = {0};
4,169,939✔
2045
      dataInfo.status = EX_SOURCE_DATA_NOT_READY;
4,169,939✔
2046
      dataInfo.taskId = pExchangeInfo->pTaskId;
4,169,939✔
2047
      dataInfo.index = pIdx->srcIdx;
4,169,939✔
2048
      dataInfo.window = pBasicParam->window;
4,169,939✔
2049
      dataInfo.groupid = 0;
4,169,939✔
2050
      dataInfo.orgTbInfo = NULL;
4,169,939✔
2051
      dataInfo.tagList = NULL;
4,169,939✔
2052

2053
      dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
4,169,939✔
2054
      QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
4,169,939✔
2055

2056
      dataInfo.isNewParam = false;
4,169,939✔
2057
      dataInfo.type = pBasicParam->type;
4,169,939✔
2058
      dataInfo.srcOpType = pBasicParam->srcOpType;
4,169,939✔
2059
      dataInfo.tableSeq = pBasicParam->tableSeq;
4,169,939✔
2060

2061
      taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
4,169,939✔
2062
      QUERY_CHECK_NULL(taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
8,339,878✔
2063
      break;
4,169,939✔
2064
    }
2065
    case EX_SRC_TYPE_VSTB_SCAN: {
23,671,468✔
2066
      SSourceDataInfo dataInfo = {0};
23,671,468✔
2067
      dataInfo.status = EX_SOURCE_DATA_NOT_READY;
23,671,468✔
2068
      dataInfo.taskId = pExchangeInfo->pTaskId;
23,671,468✔
2069
      dataInfo.index = pIdx->srcIdx;
23,671,468✔
2070
      dataInfo.window = pBasicParam->window;
23,671,468✔
2071
      dataInfo.groupid = 0;
23,671,468✔
2072
      dataInfo.isNewParam = pBasicParam->isNewParam;
23,671,468✔
2073
      dataInfo.tagList = NULL;
23,671,468✔
2074
      dataInfo.orgTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
23,671,468✔
2075
      QUERY_CHECK_NULL(dataInfo.orgTbInfo, code, lino, _return, terrno);
23,671,468✔
2076
      dataInfo.orgTbInfo->vgId = pBasicParam->orgTbInfo->vgId;
23,671,468✔
2077
      tstrncpy(dataInfo.orgTbInfo->tbName, pBasicParam->orgTbInfo->tbName, TSDB_TABLE_FNAME_LEN);
23,671,468✔
2078
      dataInfo.orgTbInfo->colMap = taosArrayDup(pBasicParam->orgTbInfo->colMap, NULL);
23,671,468✔
2079
      QUERY_CHECK_NULL(dataInfo.orgTbInfo->colMap, code, lino, _return, terrno);
23,671,468✔
2080

2081
      dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
23,671,468✔
2082
      QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
23,671,468✔
2083

2084
      dataInfo.type = pBasicParam->type;
23,671,468✔
2085
      dataInfo.srcOpType = pBasicParam->srcOpType;
23,671,468✔
2086
      dataInfo.tableSeq = pBasicParam->tableSeq;
23,671,468✔
2087

2088
      taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
23,671,468✔
2089
      QUERY_CHECK_NULL( taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
47,342,936✔
2090
      break;
23,671,468✔
2091
    }
2092
    case EX_SRC_TYPE_STB_JOIN_SCAN:
241,075✔
2093
    default: {
2094
      if (pIdx->inUseIdx < 0) {
241,075✔
2095
        SSourceDataInfo dataInfo = {0};
238,801✔
2096
        dataInfo.status = EX_SOURCE_DATA_NOT_READY;
238,801✔
2097
        dataInfo.taskId = pExchangeInfo->pTaskId;
238,801✔
2098
        dataInfo.index = pIdx->srcIdx;
238,801✔
2099
        dataInfo.groupid = 0;
238,801✔
2100
        dataInfo.tagList = NULL;
238,801✔
2101

2102
        dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
238,801✔
2103
        QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
238,801✔
2104

2105
        dataInfo.isNewParam = false;
238,801✔
2106
        dataInfo.type = pBasicParam->type;
238,801✔
2107
        dataInfo.srcOpType = pBasicParam->srcOpType;
238,801✔
2108
        dataInfo.tableSeq = pBasicParam->tableSeq;
238,801✔
2109

2110
        QUERY_CHECK_NULL(taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
477,602✔
2111

2112
        pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
238,801✔
2113
      } else {
2114
        SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
2,274✔
2115
        QUERY_CHECK_NULL(pDataInfo, code, lino, _return, terrno);
2,274✔
2116
        if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
2,274✔
2117
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
2,274✔
2118
        }
2119

2120
        pDataInfo->tagList = NULL;
2,274✔
2121
        pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
2,274✔
2122
        QUERY_CHECK_NULL(pDataInfo->pSrcUidList, code, lino, _return, terrno);
2,274✔
2123

2124
        pDataInfo->groupid = 0;
2,274✔
2125
        pDataInfo->isNewParam = false;
2,274✔
2126
        pDataInfo->type = pBasicParam->type;
2,274✔
2127
        pDataInfo->srcOpType = pBasicParam->srcOpType;
2,274✔
2128
        pDataInfo->tableSeq = pBasicParam->tableSeq;
2,274✔
2129
      }
2130
      break;
241,075✔
2131
    }
2132
  }
2133

2134
  return code;
32,169,380✔
2135
_return:
×
2136
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2137
  return code;
×
2138
}
2139

2140
/**
 * Consume the dynamic "get" parameter attached to the operator and register
 * the exchange source(s) it describes.
 *
 * The parameter value is either a single basic param, or - when multiParams is
 * set - a batch whose pBatchs hash is walked entry by entry. Each entry is
 * handed to addSingleExchangeSource(); the walk stops at the first failure.
 *
 * The parameter is released with freeOperatorParam() and detached from the
 * operator before returning, on success and on failure alike, so the batch
 * path and the single-param path share the same cleanup.
 *
 * @param pOperator exchange operator; pOperator->pOperatorGetParam is
 *                  dereferenced unconditionally and must be non-NULL.
 * @return TSDB_CODE_SUCCESS, or the first non-zero code returned by
 *         addSingleExchangeSource().
 */
int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  SExchangeOperatorBasicParam* pBasicParam = NULL;
  SExchangeOperatorParam*      pParam = (SExchangeOperatorParam*)pOperator->pOperatorGetParam->value;

  if (pParam->multiParams) {
    // The same value pointer is reinterpreted as a batch param in this mode.
    SExchangeOperatorBatchParam* pBatch = (SExchangeOperatorBatchParam*)pOperator->pOperatorGetParam->value;
    int32_t                      iter = 0;
    while (NULL != (pBasicParam = tSimpleHashIterate(pBatch->pBatchs, pBasicParam, &iter))) {
      code = addSingleExchangeSource(pOperator, pBasicParam);
      if (code) {
        // Stop iterating, but still fall through to the common cleanup below
        // instead of returning with the param still attached to the operator.
        break;
      }
    }
  } else {
    pBasicParam = &pParam->basic;
    code = addSingleExchangeSource(pOperator, pBasicParam);
  }

  freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
  pOperator->pOperatorGetParam = NULL;

  return code;
}
2164

2165
/**
 * Prepare the exchange operator for fetching remote data.
 *
 * Steps, in order:
 *   1. Return early when no get-param is attached and the operator is either
 *      dynamic or already opened.
 *   2. For a dynamic operator, register the sources described by the get-param
 *      through addDynamicExchangeSource().
 *   3. Reset the current source cursor to 0 when the operator is not opened
 *      yet and is a dynamic, sequentially loading exchange - or whenever the
 *      task runs in stream mode.
 *   4. If a get-param is still attached, call storeNotifyInfo() and detach it.
 *   5. Outside stream mode and without sequential loading, start the
 *      concurrent load and record openedTs.
 *   6. Mark the operator opened and record the open cost in milliseconds.
 *
 * @param pOperator exchange operator whose info is an SExchangeInfo.
 * @return TSDB_CODE_SUCCESS. On failure the code is stored in the task info
 *         and T_LONG_JMP is invoked with the task's env.
 */
int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
  int32_t        lino = 0;
  int32_t        code = TSDB_CODE_SUCCESS;
  int64_t        startUs = 0;
  SExchangeInfo* pInfo = pOperator->info;

  // Nothing to prepare: no param is pending and the operator is dynamic or already open.
  if (NULL == pOperator->pOperatorGetParam && (pInfo->dynamicOp || OPTR_IS_OPENED(pOperator))) {
    qDebug("%s, skip prepare, opened:%d, dynamicOp:%d, getParam:%p",
      GET_TASKID(pOperator->pTaskInfo), OPTR_IS_OPENED(pOperator),
      pInfo->dynamicOp, pOperator->pOperatorGetParam);
    return TSDB_CODE_SUCCESS;
  }

  if (pInfo->dynamicOp) {
    code = addDynamicExchangeSource(pOperator);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  // Grouping is spelled out to match C precedence: (notOpened && dynamic && seqLoad) || streamMode.
  if (((pOperator->status == OP_NOT_OPENED) && pInfo->dynamicOp && pInfo->seqLoadData) ||
      IS_STREAM_MODE(pOperator->pTaskInfo)) {
    pInfo->current = 0;
  }

  if (pOperator->pOperatorGetParam != NULL) {
    storeNotifyInfo(pOperator);
    /**
      The param is still referenced by getParam; the parent operator frees it
      after fetching the next block, so it is only detached here.
    */
    pOperator->pOperatorGetParam->reUse = false;
    pOperator->pOperatorGetParam = NULL;
  }

  startUs = taosGetTimestampUs();

  if (!IS_STREAM_MODE(pOperator->pTaskInfo) && !pInfo->seqLoadData) {
    code = prepareConcurrentlyLoad(pOperator);
    QUERY_CHECK_CODE(code, lino, _end);
    pInfo->openedTs = taosGetTimestampUs();
  }

  OPTR_SET_OPENED(pOperator);
  pOperator->cost.openCost = (taosGetTimestampUs() - startUs) / 1000.0;

  qDebug("%s prepare load complete", pOperator->pTaskInfo->id.str);

_end:
  if (TSDB_CODE_SUCCESS != code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }
  return code;
}
2221

2222
/**
 * Apply group-level (slimit/soffset) and row-level (limit/offset) constraints
 * to one result block and tell the caller whether to emit it.
 *
 * @param pOperator     operator owning the result; its status may be set to
 *                      done when the group limit is reached.
 * @param pLimitInfo    running limit/offset state, updated in place.
 * @param pBlock        block to filter; may be cleaned or trimmed.
 * @param holdDataInBuf when true, a block below the result threshold is kept
 *                      for further accumulation unless slimit/soffset is set.
 * @return PROJECT_RETRIEVE_CONTINUE when the caller should fetch more input,
 *         PROJECT_RETRIEVE_DONE when the block (possibly empty) is final for
 *         this round.
 */
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  // Phase 1: groups that are still covered by the group offset are dropped.
  if (pLimitInfo->remainGroupOffset > 0) {
    bool dropBlock = false;

    if (0 == pLimitInfo->currentGroupId) {
      // very first group seen so far
      pLimitInfo->currentGroupId = pBlock->info.id.groupId;
      dropBlock = true;
    } else if (pBlock->info.id.groupId != pLimitInfo->currentGroupId) {
      // data of a new group arrived: one more group has been skipped
      pLimitInfo->remainGroupOffset -= 1;
      dropBlock = (pLimitInfo->remainGroupOffset > 0);
    }

    if (dropBlock) {
      blockDataCleanup(pBlock);
      return PROJECT_RETRIEVE_CONTINUE;
    }

    // remember the group this block belongs to
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
  }

  // Phase 2: a group switch closes the previous group.
  if (pLimitInfo->currentGroupId != 0 && pBlock->info.id.groupId != pLimitInfo->currentGroupId) {
    pLimitInfo->numOfOutputGroups += 1;

    bool groupLimitHit = (pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups);
    if (groupLimitHit) {
      pOperator->status = OP_EXEC_DONE;
      blockDataCleanup(pBlock);

      return PROJECT_RETRIEVE_DONE;
    }

    // start counting afresh for the new group
    resetLimitInfoForNextGroup(pLimitInfo);

    // rows are present in the block at the group switch: hand it out now
    if (pBlock->info.rows > 0) {
      return PROJECT_RETRIEVE_DONE;
    }
  }

  // Phase 3: the start position is reached; apply the row limit/offset.
  pLimitInfo->currentGroupId = pBlock->info.id.groupId;

  bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pTaskInfo);
  if (0 == pBlock->info.rows) {
    return PROJECT_RETRIEVE_CONTINUE;
  }

  if (limitReached && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
    setOperatorCompleted(pOperator);
    return PROJECT_RETRIEVE_DONE;
  }

  // todo optimize performance
  // With slimit/soffset present, results of several rounds cannot be packed
  // together: they may belong to different groups, which would invalidate the
  // limit/offset bookkeeping.
  bool emitNow = (!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) ||
                 hasSlimitOffsetInfo(pLimitInfo);

  // otherwise the buffer is not full enough yet; keep accumulating output
  return emitNow ? PROJECT_RETRIEVE_DONE : PROJECT_RETRIEVE_CONTINUE;
}
2287

2288
/**
 * Block on the exchange operator's `ready` semaphore.
 *
 * When the task has worker callbacks, beforeBlocking() is invoked ahead of the
 * wait and afterRecoverFromBlocking() once the wait has succeeded. Any failing
 * step stores its code in the task info and ends the function immediately.
 *
 * NOTE(review): when tsem_wait() fails, afterRecoverFromBlocking() is not
 * called although beforeBlocking() was - confirm whether the callbacks need
 * to be paired.
 *
 * @param pOperator     operator whose task info carries the worker callbacks.
 * @param pExchangeInfo exchange state owning the `ready` semaphore.
 * @return TSDB_CODE_SUCCESS, or the code of the first failing step.
 */
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  int32_t        rc = TSDB_CODE_SUCCESS;

  if (NULL != pTaskInfo->pWorkerCb) {
    rc = pTaskInfo->pWorkerCb->beforeBlocking(pTaskInfo->pWorkerCb->pPool);
    if (TSDB_CODE_SUCCESS != rc) {
      pTaskInfo->code = rc;
      return pTaskInfo->code;
    }
  }

  rc = tsem_wait(&pExchangeInfo->ready);
  if (TSDB_CODE_SUCCESS != rc) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(rc));
    pTaskInfo->code = rc;
    return pTaskInfo->code;
  }

  if (NULL != pTaskInfo->pWorkerCb) {
    rc = pTaskInfo->pWorkerCb->afterRecoverFromBlocking(pTaskInfo->pWorkerCb->pPool);
    if (TSDB_CODE_SUCCESS != rc) {
      pTaskInfo->code = rc;
      return pTaskInfo->code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc