• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4961

09 Feb 2026 01:16AM UTC coverage: 66.798% (-0.08%) from 66.88%
#4961

push

travis-ci

web-flow
docs: add support for recording STMT to CSV files (#34276)

* docs: add support for recording STMT to CSV files

* docs: update version for STMT recording feature in CSV files

205534 of 307696 relevant lines covered (66.8%)

127069311.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.45
/source/libs/executor/src/exchangeoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "operator.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "tdatablock.h"
23
#include "thash.h"
24
#include "tmsg.h"
25
#include "tref.h"
26
#include "trpc.h"
27

28
typedef struct SFetchRspHandleWrapper {
29
  int64_t  exchangeId;
30
  int32_t  sourceIndex;
31
  int64_t  seqId;
32
} SFetchRspHandleWrapper;
33

34
typedef struct SSourceDataInfo {
35
  int32_t             index;
36
  int64_t             seqId;
37
  SRWLatch            lock;
38
  SRetrieveTableRsp*  pRsp;
39
  uint64_t            totalRows;
40
  int64_t             startTime;
41
  int32_t             code;
42
  EX_SOURCE_STATUS    status;
43
  const char*         taskId;
44
  SArray*             pSrcUidList;
45
  int32_t             srcOpType;
46
  bool                tableSeq;
47
  char*               decompBuf;
48
  int32_t             decompBufSize;
49
  SOrgTbInfo*         orgTbInfo;
50
  SArray*             batchOrgTbInfo; // SArray<SOrgTbInfo>
51
  SArray*             tagList;
52
  EExchangeSourceType type;
53
  bool                isNewParam;
54
  STimeWindow         window;
55
  uint64_t            groupid;
56
  bool                fetchSent; // need reset
57
} SSourceDataInfo;
58

59
static void destroyExchangeOperatorInfo(void* param);
60
static void freeBlock(void* pParam);
61
static void freeSourceDataInfo(void* param);
62
static void setAllSourcesCompleted(SOperatorInfo* pOperator);
63

64
static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code);
65
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex);
66
static int32_t getCompletedSources(const SArray* pArray, int32_t* pRes);
67
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator);
68
static void    storeNotifyInfo(SOperatorInfo* pOperator);
69
static int32_t seqLoadRemoteData(SOperatorInfo* pOperator);
70
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator);
71
static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock,
72
                                 bool holdDataInBuf);
73
static int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo);
74

75
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo);
76

77
static bool isVstbScan(SSourceDataInfo* pDataInfo) {return pDataInfo->type == EX_SRC_TYPE_VSTB_SCAN; }
24,649,258✔
78
static bool isVstbWinScan(SSourceDataInfo* pDataInfo) { return pDataInfo->type == EX_SRC_TYPE_VSTB_WIN_SCAN; }
×
79
static bool isVstbAggScan(SSourceDataInfo* pDataInfo) { return pDataInfo->type == EX_SRC_TYPE_VSTB_AGG_SCAN; }
×
80
static bool isVstbTagScan(SSourceDataInfo* pDataInfo) { return pDataInfo->type == EX_SRC_TYPE_VSTB_TAG_SCAN; }
15,436,341✔
81
static bool isStbJoinScan(SSourceDataInfo* pDataInfo) { return pDataInfo->type == EX_SRC_TYPE_STB_JOIN_SCAN; }
×
82

83

84
static void streamSequenciallyLoadRemoteData(SOperatorInfo* pOperator,
9,359,445✔
85
                                             SExchangeInfo* pExchangeInfo,
86
                                             SExecTaskInfo* pTaskInfo) {
87
  int32_t code = 0;
9,359,445✔
88
  int32_t lino = 0;
9,359,445✔
89
  int64_t startTs = taosGetTimestampUs();  
9,360,544✔
90
  int32_t  totalSources = (int32_t)taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
9,360,544✔
91
  int32_t completed = 0;
9,360,988✔
92
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
9,360,988✔
93
  if (code != TSDB_CODE_SUCCESS) {
9,360,543✔
94
    pTaskInfo->code = code;
×
95
    T_LONG_JMP(pTaskInfo->env, code);
×
96
  }
97
  if (completed == totalSources) {
9,360,543✔
98
    qDebug("%s no load since all sources completed, completed:%d, totalSources:%d", pTaskInfo->id.str, completed, totalSources);
1,811,614✔
99
    setAllSourcesCompleted(pOperator);
1,811,614✔
100
    return;
1,813,922✔
101
  }
102

103
  SSourceDataInfo* pDataInfo = NULL;
7,548,929✔
104

105
  while (1) {
4,130,062✔
106
    if (pExchangeInfo->current < 0) {
11,678,991✔
107
      qDebug("current %d and all sources complted, totalSources:%d", pExchangeInfo->current, totalSources);
88,467✔
108
      setAllSourcesCompleted(pOperator);
88,467✔
109
      return;
88,467✔
110
    }
111
    
112
    if (pExchangeInfo->current >= totalSources) {
11,590,749✔
113
      completed = 0;
5,428,801✔
114
      code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
5,428,801✔
115
      if (code != TSDB_CODE_SUCCESS) {
5,428,577✔
116
        pTaskInfo->code = code;
×
117
        T_LONG_JMP(pTaskInfo->env, code);
×
118
      }
119
      if (completed == totalSources) {
5,428,577✔
120
        qDebug("stop to load since all sources complted, completed:%d, totalSources:%d", completed, totalSources);
3,669,392✔
121
        setAllSourcesCompleted(pOperator);
3,669,392✔
122
        return;
3,669,392✔
123
      }
124
      
125
      pExchangeInfo->current = 0;
1,759,185✔
126
    }
127

128
    qDebug("%s start stream exchange %p idx:%d fetch", GET_TASKID(pTaskInfo), pExchangeInfo, pExchangeInfo->current);
7,920,462✔
129

130
    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
7,920,462✔
131
    if (!pDataInfo) {
7,920,939✔
132
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
133
      pTaskInfo->code = terrno;
×
134
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
135
    }
136

137
    if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
7,920,939✔
138
      pExchangeInfo->current++;
975✔
139
      continue;
975✔
140
    }
141

142
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
7,919,495✔
143

144
    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
7,919,711✔
145
    if (code != TSDB_CODE_SUCCESS) {
7,920,822✔
146
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
147
      pTaskInfo->code = code;
×
148
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
149
    }
150

151
    while (true) {
732✔
152
      code = exchangeWait(pOperator, pExchangeInfo);
7,921,554✔
153
      if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
7,920,602✔
154
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
220✔
155
      }
156

157
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
7,920,602✔
158
      if (pDataInfo->seqId != currSeqId) {
7,920,822✔
159
        qDebug("%s seq rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", 
732✔
160
            GET_TASKID(pTaskInfo), pDataInfo->seqId, pExchangeInfo, currSeqId);
161
        taosMemoryFreeClear(pDataInfo->pRsp);
732✔
162
        continue;
732✔
163
      }
164

165
      break;
7,919,870✔
166
    }
167

168
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
7,919,870✔
169
    if (!pSource) {
7,919,870✔
170
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
171
      pTaskInfo->code = terrno;
×
172
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
173
    }
174

175
    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
7,919,870✔
176
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
×
177
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
178
             tstrerror(pDataInfo->code));
179
      pTaskInfo->code = pDataInfo->code;
×
180
      T_LONG_JMP(pTaskInfo->env, code);
×
181
    }
182

183
    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
7,919,870✔
184
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
7,920,090✔
185

186
    if (pRsp->numOfRows == 0) {
7,920,090✔
187
      qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
4,129,087✔
188
             " execId:%d idx %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", pDataInfo,
189
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
190
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);
191

192
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
4,129,087✔
193
      if (isVstbScan(pDataInfo) || isVstbTagScan(pDataInfo)) {
4,129,087✔
194
        pExchangeInfo->current = -1;
88,467✔
195
      } else {
196
        pExchangeInfo->current += 1;
4,040,620✔
197
      }
198
      taosMemoryFreeClear(pDataInfo->pRsp);
4,129,087✔
199
      continue;
4,129,087✔
200
    }
201

202
    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
3,791,003✔
203
    TAOS_CHECK_EXIT(code);
3,791,003✔
204

205
    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
3,791,003✔
206
    if (pRsp->completed == 1) {
3,790,782✔
207
      qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
1,909,514✔
208
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%d", pDataInfo,
209
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
210
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
211
             pExchangeInfo->current + 1, totalSources);
212

213
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
1,909,514✔
214
      if (isVstbScan(pDataInfo)) {
1,909,514✔
215
        pExchangeInfo->current = -1;
×
216
        taosMemoryFreeClear(pDataInfo->pRsp);
×
217
        continue;
×
218
      }
219
    } else {
220
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d idx:%d numOfRows:%" PRId64
1,881,489✔
221
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
222
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
223
             pExchangeInfo->current, pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
224
    }
225

226
    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
3,791,003✔
227
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;
3,791,003✔
228

229
    pExchangeInfo->current++;
3,791,003✔
230

231
    taosMemoryFreeClear(pDataInfo->pRsp);
3,791,003✔
232
    return;
3,791,003✔
233
  }
234

235
_exit:
×
236

237
  if (code) {
×
238
    pTaskInfo->code = code;
×
239
    qError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
×
240
  }
241
}
242

243

244
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
231,591,257✔
245
                                           SExecTaskInfo* pTaskInfo) {
246
  int32_t code = 0;
231,591,257✔
247
  int32_t lino = 0;
231,591,257✔
248
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
231,591,257✔
249
  int32_t completed = 0;
231,589,311✔
250
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
231,590,544✔
251
  if (code != TSDB_CODE_SUCCESS) {
231,586,434✔
252
    pTaskInfo->code = code;
×
253
    T_LONG_JMP(pTaskInfo->env, code);
×
254
  }
255
  if (completed == totalSources) {
231,586,434✔
256
    setAllSourcesCompleted(pOperator);
76,482,034✔
257
    return;
76,483,018✔
258
  }
259

260
  SSourceDataInfo* pDataInfo = NULL;
155,104,400✔
261

262
  while (1) {
24,169,245✔
263
    qDebug("prepare wait for ready, %p, %s", pExchangeInfo, GET_TASKID(pTaskInfo));
179,273,645✔
264
    code = exchangeWait(pOperator, pExchangeInfo);
179,275,513✔
265

266
    if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
179,282,549✔
267
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
2,076✔
268
    }
269

270
    for (int32_t i = 0; i < totalSources; ++i) {
333,130,875✔
271
      pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
333,128,840✔
272
      QUERY_CHECK_NULL(pDataInfo, code, lino, _exit, terrno);
333,128,939✔
273
      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
333,128,939✔
274
        continue;
132,386,103✔
275
      }
276

277
      if (pDataInfo->status != EX_SOURCE_DATA_READY) {
200,741,837✔
278
        continue;
21,464,299✔
279
      }
280

281
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
179,279,949✔
282
      if (pDataInfo->seqId != currSeqId) {
179,279,464✔
283
        qDebug("concurrent rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pDataInfo->seqId, pExchangeInfo, currSeqId);
×
284
        taosMemoryFreeClear(pDataInfo->pRsp);
×
285
        break;
×
286
      }
287

288
      if (pDataInfo->code != TSDB_CODE_SUCCESS) {
179,278,577✔
289
        code = pDataInfo->code;
1,563✔
290
        TAOS_CHECK_EXIT(code);
1,563✔
291
      }
292

293
      tmemory_barrier();
179,278,420✔
294
      SRetrieveTableRsp*     pRsp = pDataInfo->pRsp;
179,278,420✔
295
      SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
179,278,530✔
296
      QUERY_CHECK_NULL(pSource, code, lino, _exit, terrno);
179,277,523✔
297

298
      // todo
299
      SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
179,277,523✔
300
      if (pRsp->numOfRows == 0) {
179,277,010✔
301
        if (NULL != pDataInfo->pSrcUidList && !isVstbScan(pDataInfo)) {
43,888,859✔
302
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
×
303
          code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
×
304
          if (code != TSDB_CODE_SUCCESS) {
×
305
            taosMemoryFreeClear(pDataInfo->pRsp);
×
306
            TAOS_CHECK_EXIT(code);
×
307
          }
308
        } else {
309
          pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
43,888,859✔
310
          qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
43,888,346✔
311
                 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", try next %d/%" PRIzu, pDataInfo,
312
                 GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
313
                 pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows, i + 1, totalSources);
314
          taosMemoryFreeClear(pDataInfo->pRsp);
43,888,859✔
315
        }
316
        break;
43,888,859✔
317
      }
318

319
      TAOS_CHECK_EXIT(doExtractResultBlocks(pExchangeInfo, pDataInfo));
135,389,021✔
320

321
      SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
135,390,051✔
322
      updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pDataInfo->startTime, pOperator);
135,389,555✔
323
      pDataInfo->totalRows += pRetrieveRsp->numOfRows;
135,389,042✔
324

325
      if (pRsp->completed == 1) {
135,388,195✔
326
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
124,067,814✔
327
        qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
124,067,814✔
328
               " execId:%d index:%d completed, blocks:%d, numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64
329
               ", totalRows:%" PRIu64 ", total:%.2f Kb, try next %d/%" PRIzu, pDataInfo,
330
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
331
               pRsp->numOfBlocks, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows,
332
               pLoadInfo->totalSize / 1024.0, i + 1, totalSources);
333
      } else {
334
        qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
11,321,385✔
335
               " execId:%d blocks:%d, numOfRows:%" PRId64 ", totalRows:%" PRIu64 ", total:%.2f Kb",
336
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
337
               pRsp->numOfBlocks, pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
338
      }
339

340
      taosMemoryFreeClear(pDataInfo->pRsp);
135,389,199✔
341

342
      if ((pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED || NULL != pDataInfo->pSrcUidList) && !isVstbScan(pDataInfo) && !isVstbTagScan(pDataInfo)) {
135,389,298✔
343
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
11,321,385✔
344
        code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
11,321,385✔
345
        if (code != TSDB_CODE_SUCCESS) {
11,321,385✔
346
          taosMemoryFreeClear(pDataInfo->pRsp);
×
347
          TAOS_CHECK_EXIT(code);
×
348
        }
349
      }
350
      
351
      return;
135,389,580✔
352
    }  // end loop
353

354
    int32_t complete1 = 0;
43,890,894✔
355
    code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &complete1);
43,888,859✔
356
    if (code != TSDB_CODE_SUCCESS) {
43,888,859✔
357
      pTaskInfo->code = code;
×
358
      T_LONG_JMP(pTaskInfo->env, code);
×
359
    }
360
    if (complete1 == totalSources) {
43,888,859✔
361
      qDebug("all sources are completed, %s", GET_TASKID(pTaskInfo));
19,719,614✔
362
      return;
19,719,118✔
363
    }
364
  }
365

366
_exit:
1,563✔
367

368
  if (code) {
1,563✔
369
    pTaskInfo->code = code;
1,563✔
370
    qError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
1,563✔
371
  }
372
}
373

374
static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
552,604,495✔
375
  int32_t        code = TSDB_CODE_SUCCESS;
552,604,495✔
376
  SExchangeInfo* pExchangeInfo = pOperator->info;
552,604,495✔
377
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
552,608,955✔
378

379
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
552,606,698✔
380

381
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
552,606,001✔
382
  if (pOperator->status == OP_EXEC_DONE) {
552,607,429✔
383
    qDebug("%s all %" PRIzu " source(s) are exhausted, total rows:%" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
×
384
           GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize,
385
           pLoadInfo->totalElapsed / 1000.0);
386
    return NULL;
×
387
  }
388

389
  // we have buffered retrieved datablock, return it directly
390
  SSDataBlock* p = NULL;
552,604,348✔
391
  if (taosArrayGetSize(pExchangeInfo->pResultBlockList) > 0) {
552,601,519✔
392
    p = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
282,885,529✔
393
    taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
282,883,244✔
394
  }
395

396
  if (p != NULL) {
552,608,168✔
397
    void* tmp = taosArrayPush(pExchangeInfo->pRecycledBlocks, &p);
282,891,098✔
398
    if (!tmp) {
282,886,727✔
399
      code = terrno;
×
400
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
401
      pTaskInfo->code = code;
×
402
      T_LONG_JMP(pTaskInfo->env, code);
×
403
    }
404
    return p;
282,886,727✔
405
  } else {
406
    if (pExchangeInfo->seqLoadData) {
269,717,070✔
407
      code = seqLoadRemoteData(pOperator);
28,767,895✔
408
      if (code != TSDB_CODE_SUCCESS) {
28,767,137✔
409
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
626✔
410
        pTaskInfo->code = code;
626✔
411
        T_LONG_JMP(pTaskInfo->env, code);
626✔
412
      }
413
    } else if (IS_STREAM_MODE(pOperator->pTaskInfo))   {
240,948,243✔
414
      streamSequenciallyLoadRemoteData(pOperator, pExchangeInfo, pTaskInfo);
9,360,988✔
415
    } else {
416
      concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
231,589,507✔
417
    }
418
    if (TSDB_CODE_SUCCESS != pOperator->pTaskInfo->code) {
269,719,730✔
419
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(pOperator->pTaskInfo->code));
1,563✔
420
      T_LONG_JMP(pTaskInfo->env, pOperator->pTaskInfo->code);
1,563✔
421
    }
422
    
423
    if (taosArrayGetSize(pExchangeInfo->pResultBlockList) == 0) {
269,714,803✔
424
      qDebug("empty resultBlockList");
108,978,355✔
425
      return NULL;
108,978,355✔
426
    } else {
427
      p = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
160,737,991✔
428
      taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
160,736,520✔
429
      void* tmp = taosArrayPush(pExchangeInfo->pRecycledBlocks, &p);
160,737,613✔
430
      if (!tmp) {
160,737,317✔
431
        code = terrno;
×
432
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
433
        pTaskInfo->code = code;
×
434
        T_LONG_JMP(pTaskInfo->env, code);
×
435
      }
436

437
      qDebug("block with rows:%" PRId64 " loaded", p->info.rows);
160,737,317✔
438
      return p;
160,738,910✔
439
    }
440
  }
441
}
442

443
static int32_t loadRemoteDataNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
551,023,689✔
444
  int32_t        code = TSDB_CODE_SUCCESS;
551,023,689✔
445
  int32_t        lino = 0;
551,023,689✔
446
  SExchangeInfo* pExchangeInfo = pOperator->info;
551,023,689✔
447
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
551,036,361✔
448

449
  qDebug("%s start to load from exchange %p", pTaskInfo->id.str, pExchangeInfo);
551,029,426✔
450

451
  code = pOperator->fpSet._openFn(pOperator);
551,052,486✔
452
  QUERY_CHECK_CODE(code, lino, _end);
551,052,520✔
453

454
  if (pOperator->status == OP_EXEC_DONE) {
551,052,520✔
455
    (*ppRes) = NULL;
83,405✔
456
    return code;
83,405✔
457
  }
458

459
  while (1) {
1,637,551✔
460
    SSDataBlock* pBlock = doLoadRemoteDataImpl(pOperator);
552,609,321✔
461
    if (pBlock == NULL) {
552,602,265✔
462
      (*ppRes) = NULL;
108,978,355✔
463
      return code;
108,978,355✔
464
    }
465

466
    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL, NULL);
443,623,910✔
467
    QUERY_CHECK_CODE(code, lino, _end);
443,626,121✔
468

469
    if (blockDataGetNumOfRows(pBlock) == 0) {
443,626,121✔
470
      qDebug("rows 0 block got, continue next load");
998✔
471
      continue;
998✔
472
    }
473

474
    SLimitInfo* pLimitInfo = &pExchangeInfo->limitInfo;
443,621,868✔
475
    if (hasLimitOffsetInfo(pLimitInfo)) {
443,622,004✔
476
      int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock, false);
3,479,753✔
477
      if (status == PROJECT_RETRIEVE_CONTINUE) {
3,479,753✔
478
        qDebug("limit retrieve continue");
1,636,553✔
479
        continue;
1,636,553✔
480
      } else if (status == PROJECT_RETRIEVE_DONE) {
1,843,200✔
481
        if (pBlock->info.rows == 0) {
1,843,200✔
482
          setOperatorCompleted(pOperator);
×
483
          (*ppRes) = NULL;
×
484
          return code;
×
485
        } else {
486
          (*ppRes) = pBlock;
1,843,200✔
487
          return code;
1,843,200✔
488
        }
489
      }
490
    } else {
491
      (*ppRes) = pBlock;
440,141,301✔
492
      qDebug("block with rows %" PRId64 " returned in exechange", pBlock->info.rows);
440,143,246✔
493
      return code;
440,144,399✔
494
    }
495
  }
496

497
_end:
×
498

499
  if (code != TSDB_CODE_SUCCESS) {
×
500
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
501
    pTaskInfo->code = code;
×
502
    T_LONG_JMP(pTaskInfo->env, code);
×
503
  } else {
504
    qDebug("empty block returned in exchange");
×
505
  }
506
  
507
  (*ppRes) = NULL;
×
508
  return code;
×
509
}
510

511
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
104,209,411✔
512
  pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
104,209,411✔
513
  if (pInfo->pSourceDataInfo == NULL) {
104,207,881✔
514
    return terrno;
×
515
  }
516

517
  if (pInfo->dynamicOp) {
104,210,199✔
518
    return TSDB_CODE_SUCCESS;
6,193,959✔
519
  }
520

521
  int32_t len = strlen(id) + 1;
98,020,270✔
522
  pInfo->pTaskId = taosMemoryCalloc(1, len);
98,020,270✔
523
  if (!pInfo->pTaskId) {
98,026,879✔
524
    return terrno;
×
525
  }
526
  tstrncpy(pInfo->pTaskId, id, len);
98,011,404✔
527
  for (int32_t i = 0; i < numOfSources; ++i) {
261,559,848✔
528
    SSourceDataInfo dataInfo = {0};
163,519,109✔
529
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
163,508,690✔
530
    dataInfo.taskId = pInfo->pTaskId;
163,508,690✔
531
    dataInfo.index = i;
163,519,415✔
532
    SSourceDataInfo* pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
163,519,415✔
533
    if (pDs == NULL) {
163,540,437✔
534
      taosArrayDestroyEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
×
535
      return terrno;
×
536
    }
537
    qDebug("init source data info %d, pDs:%p, status:%d", i, pDs, pDs->status);
163,540,437✔
538
  }
539

540
  return TSDB_CODE_SUCCESS;
98,040,739✔
541
}
542

543
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo, const char* id) {
104,222,100✔
544
  size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);
104,222,100✔
545

546
  if (numOfSources == 0) {
104,208,868✔
547
    qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
×
548
    return TSDB_CODE_INVALID_PARA;
×
549
  }
550
  pInfo->pFetchRpcHandles = taosArrayInit(numOfSources, sizeof(int64_t));
104,208,868✔
551
  if (!pInfo->pFetchRpcHandles) {
104,224,392✔
552
    return terrno;
×
553
  }
554
  void* ret = taosArrayReserve(pInfo->pFetchRpcHandles, numOfSources);
104,218,193✔
555
  if (!ret) {
104,215,403✔
556
    return terrno;
×
557
  }
558

559
  pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
104,215,403✔
560
  if (pInfo->pSources == NULL) {
104,215,342✔
561
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
562
    return terrno;
×
563
  }
564

565
  if (pExNode->node.dynamicOp) {
104,218,395✔
566
    pInfo->pHashSources = tSimpleHashInit(numOfSources * 2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
6,193,959✔
567
    if (NULL == pInfo->pHashSources) {
6,194,176✔
568
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
569
      return terrno;
×
570
    }
571
  }
572

573
  for (int32_t i = 0; i < numOfSources; ++i) {
281,380,412✔
574
    SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
177,150,309✔
575
    if (!pNode) {
177,137,837✔
576
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
577
      return terrno;
×
578
    }
579
    void* tmp = taosArrayPush(pInfo->pSources, pNode);
177,137,837✔
580
    if (!tmp) {
177,168,232✔
581
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
582
      return terrno;
×
583
    }
584
    SExchangeSrcIndex idx = {.srcIdx = i, .inUseIdx = -1};
177,168,232✔
585
    int32_t           code =
586
        tSimpleHashPut(pInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
177,160,691✔
587
    if (pInfo->pHashSources && code != TSDB_CODE_SUCCESS) {
177,147,503✔
588
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
589
      return code;
×
590
    }
591
  }
592

593
  initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);
104,230,103✔
594
  int64_t refId = taosAddRef(exchangeObjRefPool, pInfo);
104,211,272✔
595
  if (refId < 0) {
104,207,596✔
596
    int32_t code = terrno;
×
597
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
598
    return code;
×
599
  } else {
600
    pInfo->self = refId;
104,207,596✔
601
  }
602

603
  return initDataSource(numOfSources, pInfo, id);
104,208,129✔
604
}
605

606
int32_t resetExchangeOperState(SOperatorInfo* pOper) {
8,593,359✔
607
  SExchangeInfo* pInfo = pOper->info;
8,593,359✔
608
  SExchangePhysiNode* pPhynode = (SExchangePhysiNode*)pOper->pPhyNode;
8,594,015✔
609

610
  qDebug("%s reset exchange op:%p info:%p", pOper->pTaskInfo->id.str, pOper, pInfo);
8,594,015✔
611

612
  (void)atomic_add_fetch_64(&pInfo->seqId, 1);
8,594,232✔
613
  pOper->status = OP_NOT_OPENED;
8,594,232✔
614
  pInfo->current = 0;
8,594,232✔
615
  pInfo->loadInfo.totalElapsed = 0;
8,594,232✔
616
  pInfo->loadInfo.totalRows = 0;
8,593,791✔
617
  pInfo->loadInfo.totalSize = 0;
8,594,012✔
618
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pSourceDataInfo); ++i) {
23,646,488✔
619
    SSourceDataInfo* pDataInfo = taosArrayGet(pInfo->pSourceDataInfo, i);
15,052,043✔
620
    taosWLockLatch(&pDataInfo->lock);
15,051,382✔
621
    taosMemoryFreeClear(pDataInfo->decompBuf);
15,052,697✔
622
    taosMemoryFreeClear(pDataInfo->pRsp);
15,052,697✔
623

624
    pDataInfo->totalRows = 0;
15,052,697✔
625
    pDataInfo->code = 0;
15,052,475✔
626
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
15,052,697✔
627
    pDataInfo->fetchSent = false;
15,052,697✔
628
    taosWUnLockLatch(&pDataInfo->lock);
15,052,477✔
629
  }
630

631
  if (pInfo->dynamicOp) {
8,594,232✔
632
    taosArrayClearEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
1,407,394✔
633
  } 
634

635
  taosArrayClearEx(pInfo->pResultBlockList, freeBlock);
8,594,232✔
636
  taosArrayClearEx(pInfo->pRecycledBlocks, freeBlock);
8,594,012✔
637

638
  blockDataCleanup(pInfo->pDummyBlock);
8,594,232✔
639

640
  void   *data = NULL;
8,594,015✔
641
  int32_t iter = 0;
8,594,015✔
642
  while ((data = tSimpleHashIterate(pInfo->pHashSources, data, &iter))) {
11,185,986✔
643
    ((SExchangeSrcIndex *)data)->inUseIdx = -1;
2,591,971✔
644
  }
645
  
646
  pInfo->limitInfo = (SLimitInfo){0};
8,593,541✔
647
  initLimitInfo(pPhynode->node.pLimit, pPhynode->node.pSlimit, &pInfo->limitInfo);
8,594,015✔
648

649
  return 0;
8,593,791✔
650
}
651

652
int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo,
104,223,976✔
653
                                   SOperatorInfo** pOptrInfo) {
654
  QRY_PARAM_CHECK(pOptrInfo);
104,223,976✔
655

656
  int32_t        code = 0;
104,228,460✔
657
  int32_t        lino = 0;
104,228,460✔
658
  SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
104,228,460✔
659
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
104,207,561✔
660
  if (pInfo == NULL || pOperator == NULL) {
104,206,719✔
661
    code = terrno;
×
662
    goto _error;
×
663
  }
664

665
  pOperator->pPhyNode = pExNode;
104,207,106✔
666
  pInfo->dynamicOp = pExNode->node.dynamicOp;
104,210,374✔
667
  code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo));
104,210,400✔
668
  QUERY_CHECK_CODE(code, lino, _error);
104,231,710✔
669

670
  code = tsem_init(&pInfo->ready, 0, 0);
104,231,710✔
671
  QUERY_CHECK_CODE(code, lino, _error);
104,232,288✔
672

673
  pInfo->pDummyBlock = createDataBlockFromDescNode(pExNode->node.pOutputDataBlockDesc);
104,232,288✔
674
  QUERY_CHECK_NULL(pInfo->pDummyBlock, code, lino, _error, terrno);
104,231,710✔
675

676
  pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES);
104,221,683✔
677
  QUERY_CHECK_NULL(pInfo->pResultBlockList, code, lino, _error, terrno);
104,227,130✔
678
  pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES);
104,229,542✔
679
  QUERY_CHECK_NULL(pInfo->pRecycledBlocks, code, lino, _error, terrno);
104,230,186✔
680

681
  SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self};
104,230,945✔
682
  code = qAppendTaskStopInfo(pTaskInfo, &stopInfo);
104,227,655✔
683
  QUERY_CHECK_CODE(code, lino, _error);
104,227,261✔
684

685
  pInfo->seqLoadData = pExNode->seqRecvData;
104,227,261✔
686
  pInfo->dynTbname = pExNode->dynTbname;
104,227,643✔
687
  if (pInfo->dynTbname) {
104,220,440✔
688
    pInfo->seqLoadData = true;
11,932✔
689
  }
690
  pInfo->pTransporter = pTransporter;
104,217,544✔
691

692
  setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo,
104,222,933✔
693
                  pTaskInfo);
694
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
104,213,939✔
695

696
  code = filterInitFromNode((SNode*)pExNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
104,219,029✔
697
                            pTaskInfo->pStreamRuntimeInfo);
104,221,618✔
698
  QUERY_CHECK_CODE(code, lino, _error);
104,213,451✔
699
  qTrace("%s exchange op:%p", __func__, pOperator);
104,213,451✔
700
  pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteDataNext, NULL, destroyExchangeOperatorInfo,
104,214,204✔
701
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
702
  setOperatorResetStateFn(pOperator, resetExchangeOperState);
104,215,685✔
703
  *pOptrInfo = pOperator;
104,222,027✔
704
  return TSDB_CODE_SUCCESS;
104,222,521✔
705

706
_error:
×
707
  if (code != TSDB_CODE_SUCCESS) {
×
708
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
709
    pTaskInfo->code = code;
×
710
  }
711
  if (pInfo != NULL) {
×
712
    doDestroyExchangeOperatorInfo(pInfo);
×
713
  }
714

715
  if (pOperator != NULL) {
×
716
    pOperator->info = NULL;
×
717
    destroyOperator(pOperator);
×
718
  }
719
  return code;
×
720
}
721

722
void destroyExchangeOperatorInfo(void* param) {
104,235,337✔
723
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
104,235,337✔
724
  int32_t        code = taosRemoveRef(exchangeObjRefPool, pExInfo->self);
104,235,337✔
725
  if (code != TSDB_CODE_SUCCESS) {
104,231,955✔
726
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
727
  }
728
}
104,231,955✔
729

730
void freeBlock(void* pParam) {
272,037,230✔
731
  SSDataBlock* pBlock = *(SSDataBlock**)pParam;
272,037,230✔
732
  blockDataDestroy(pBlock);
272,037,230✔
733
}
272,039,576✔
734

735
void freeSourceDataInfo(void* p) {
168,256,250✔
736
  SSourceDataInfo* pInfo = (SSourceDataInfo*)p;
168,256,250✔
737
  taosMemoryFreeClear(pInfo->decompBuf);
168,256,250✔
738
  taosMemoryFreeClear(pInfo->pRsp);
168,261,634✔
739

740
  pInfo->decompBufSize = 0;
168,256,729✔
741
}
168,255,248✔
742

743
void doDestroyExchangeOperatorInfo(void* param) {
104,233,933✔
744
  if (param == NULL) {
104,233,933✔
745
    return;
×
746
  }
747
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
104,233,933✔
748
  if (pExInfo->pFetchRpcHandles) {
104,233,933✔
749
    for (int32_t i = 0; i < pExInfo->pFetchRpcHandles->size; ++i) {
281,406,059✔
750
      int64_t* pRpcHandle = taosArrayGet(pExInfo->pFetchRpcHandles, i);
177,176,660✔
751
      if (*pRpcHandle > 0) {
177,175,668✔
752
        SDownstreamSourceNode* pSource = taosArrayGet(pExInfo->pSources, i);
902,285✔
753
        (void)asyncFreeConnById(pExInfo->pTransporter, *pRpcHandle);
902,285✔
754
      }
755
    }
756
    taosArrayDestroy(pExInfo->pFetchRpcHandles);
104,235,573✔
757
  }
758

759
  taosArrayDestroy(pExInfo->pSources);
104,232,616✔
760
  taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo);
104,230,908✔
761

762
  taosArrayDestroyEx(pExInfo->pResultBlockList, freeBlock);
104,225,581✔
763
  taosArrayDestroyEx(pExInfo->pRecycledBlocks, freeBlock);
104,226,764✔
764

765
  blockDataDestroy(pExInfo->pDummyBlock);
104,226,201✔
766
  tSimpleHashCleanup(pExInfo->pHashSources);
104,228,632✔
767

768
  int32_t code = tsem_destroy(&pExInfo->ready);
104,228,027✔
769
  if (code != TSDB_CODE_SUCCESS) {
104,227,158✔
770
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
771
  }
772
  taosMemoryFreeClear(pExInfo->pTaskId);
104,227,158✔
773

774
  taosMemoryFreeClear(param);
104,215,187✔
775
}
776

777
int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
216,639,155✔
778
  SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;
216,639,155✔
779

780
  taosMemoryFreeClear(pMsg->pEpSet);
216,639,155✔
781
  SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
216,678,061✔
782
  if (pExchangeInfo == NULL) {
216,678,018✔
783
    qWarn("failed to acquire exchange operator, since it may have been released, %p", pExchangeInfo);
16,936✔
784
    taosMemoryFree(pMsg->pData);
16,936✔
785
    return TSDB_CODE_SUCCESS;
16,936✔
786
  }
787

788
  int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
216,661,082✔
789
  if (pWrapper->seqId != currSeqId) {
216,671,885✔
790
    qDebug("rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pWrapper->seqId, pExchangeInfo, currSeqId);
×
791
    taosMemoryFree(pMsg->pData);
×
792
    code = taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
×
793
    if (code != TSDB_CODE_SUCCESS) {
×
794
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
795
    }
796
    return TSDB_CODE_SUCCESS;
×
797
  }
798

799
  int32_t          index = pWrapper->sourceIndex;
216,626,143✔
800

801
  qDebug("%s exchange %p %dth source got rsp, code:%d, rsp:%p", pExchangeInfo->pTaskId, pExchangeInfo, index, code, pMsg->pData);
216,634,129✔
802

803
  int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, index);
216,648,497✔
804
  if (pRpcHandle != NULL) {
216,647,871✔
805
    int32_t ret = asyncFreeConnById(pExchangeInfo->pTransporter, *pRpcHandle);
216,652,265✔
806
    if (ret != 0) {
216,601,014✔
807
      qDebug("failed to free rpc handle, code:%s, %p", tstrerror(ret), pExchangeInfo);
1,012,779✔
808
    }
809
    *pRpcHandle = -1;
216,601,014✔
810
  }
811

812
  SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);
216,599,309✔
813
  if (!pSourceDataInfo) {
216,631,362✔
814
    return terrno;
×
815
  }
816

817
  if (0 == code && NULL == pMsg->pData) {
216,631,362✔
818
    qError("invalid rsp msg, msgType:%d, len:%d", pMsg->msgType, pMsg->len);
×
819
    code = TSDB_CODE_QRY_INVALID_MSG;
×
820
  }
821

822
  taosWLockLatch(&pSourceDataInfo->lock);
216,629,736✔
823
  if (code == TSDB_CODE_SUCCESS) {
216,684,399✔
824
    pSourceDataInfo->seqId = pWrapper->seqId;
216,678,539✔
825
    pSourceDataInfo->pRsp = pMsg->pData;
216,622,087✔
826

827
    SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
216,632,787✔
828
    pRsp->numOfRows = htobe64(pRsp->numOfRows);
216,582,723✔
829
    pRsp->compLen = htonl(pRsp->compLen);
216,581,624✔
830
    pRsp->payloadLen = htonl(pRsp->payloadLen);
216,563,918✔
831
    pRsp->numOfCols = htonl(pRsp->numOfCols);
216,505,662✔
832
    pRsp->useconds = htobe64(pRsp->useconds);
216,465,855✔
833
    pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);
216,417,862✔
834

835
    qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%" PRId64 ", %p", pSourceDataInfo->taskId, index,
216,533,975✔
836
           pRsp->numOfBlocks, pRsp->numOfRows, pExchangeInfo);
837
  } else {
838
    taosMemoryFree(pMsg->pData);
5,860✔
839
    pSourceDataInfo->code = rpcCvtErrCode(code);
5,860✔
840
    if (pSourceDataInfo->code != code) {
5,860✔
841
      qError("%s fetch rsp received, index:%d, error:%s, cvted error: %s, %p", pSourceDataInfo->taskId, index,
×
842
             tstrerror(code), tstrerror(pSourceDataInfo->code), pExchangeInfo);
843
    } else {
844
      qError("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code),
5,860✔
845
             pExchangeInfo);
846
    }
847
  }
848

849
  tmemory_barrier();
216,564,054✔
850
  pSourceDataInfo->status = EX_SOURCE_DATA_READY;
216,564,054✔
851
  taosWUnLockLatch(&pSourceDataInfo->lock);
216,521,580✔
852
  
853
  code = tsem_post(&pExchangeInfo->ready);
216,536,992✔
854
  if (code != TSDB_CODE_SUCCESS) {
216,645,375✔
855
    qError("failed to invoke post when fetch rsp is ready, code:%s, %p", tstrerror(code), pExchangeInfo);
×
856
    return code;
×
857
  }
858

859
  code = taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
216,645,375✔
860
  if (code != TSDB_CODE_SUCCESS) {
216,685,025✔
861
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
862
  }
863
  return code;
216,674,578✔
864
}
865

866
int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, bool tableSeq) {
249,752✔
867
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
249,752✔
868
  if (NULL == *ppRes) {
249,752✔
869
    return terrno;
×
870
  }
871

872
  STableScanOperatorParam* pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
249,752✔
873
  if (NULL == pScan) {
249,752✔
874
    taosMemoryFreeClear(*ppRes);
×
875
    return terrno;
×
876
  }
877

878
  pScan->paramType = DYN_TYPE_SCAN_PARAM;
249,752✔
879
  pScan->pUidList = taosArrayDup(pUidList, NULL);
249,752✔
880
  if (NULL == pScan->pUidList) {
249,752✔
881
    taosMemoryFree(pScan);
×
882
    taosMemoryFreeClear(*ppRes);
×
883
    return terrno;
×
884
  }
885
  pScan->dynType = DYN_TYPE_STB_JOIN;
249,752✔
886
  pScan->tableSeq = tableSeq;
249,752✔
887
  pScan->pOrgTbInfo = NULL;
249,752✔
888
  pScan->pBatchTbInfo = NULL;
249,752✔
889
  pScan->pTagList = NULL;
249,752✔
890
  pScan->isNewParam = false;
249,752✔
891
  pScan->window.skey = INT64_MAX;
249,752✔
892
  pScan->window.ekey = INT64_MIN;
249,752✔
893
  pScan->notifyToProcess = false;
249,752✔
894
  pScan->notifyTs = 0;
249,752✔
895

896
  (*ppRes)->opType = srcOpType;
249,752✔
897
  (*ppRes)->downstreamIdx = 0;
249,752✔
898
  (*ppRes)->value = pScan;
249,752✔
899
  (*ppRes)->pChildren = NULL;
249,752✔
900
  (*ppRes)->reUse = false;
249,752✔
901

902
  return TSDB_CODE_SUCCESS;
249,752✔
903
}
904

905
int32_t buildTableScanOperatorParamEx(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, SOrgTbInfo *pMap, bool tableSeq, STimeWindow *window, bool isNewParam, ETableScanDynType type) {
25,592,531✔
906
  int32_t                  code = TSDB_CODE_SUCCESS;
25,592,531✔
907
  int32_t                  lino = 0;
25,592,531✔
908
  STableScanOperatorParam* pScan = NULL;
25,592,531✔
909

910
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
25,592,531✔
911
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
25,592,752✔
912

913
  pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
25,592,752✔
914
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);
25,592,752✔
915

916
  pScan->paramType = DYN_TYPE_SCAN_PARAM;
25,592,752✔
917
  if (pUidList) {
25,592,752✔
918
    pScan->pUidList = taosArrayDup(pUidList, NULL);
25,592,752✔
919
    QUERY_CHECK_NULL(pScan->pUidList, code, lino, _return, terrno);
25,592,752✔
920
  } else {
921
    pScan->pUidList = NULL;
×
922
  }
923

924
  if (pMap) {
25,592,752✔
925
    pScan->pOrgTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
24,886,762✔
926
    QUERY_CHECK_NULL(pScan->pOrgTbInfo, code, lino, _return, terrno);
24,886,762✔
927

928
    pScan->pOrgTbInfo->vgId = pMap->vgId;
24,886,762✔
929
    tstrncpy(pScan->pOrgTbInfo->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);
24,886,762✔
930

931
    pScan->pOrgTbInfo->colMap = taosArrayDup(pMap->colMap, NULL);
24,886,762✔
932
    QUERY_CHECK_NULL(pScan->pOrgTbInfo->colMap, code, lino, _return, terrno);
24,886,762✔
933
  } else {
934
    pScan->pOrgTbInfo = NULL;
705,990✔
935
  }
936
  pScan->pTagList = NULL;
25,592,752✔
937
  pScan->pBatchTbInfo = NULL;
25,592,752✔
938

939

940
  pScan->dynType = type;
25,592,752✔
941
  pScan->tableSeq = tableSeq;
25,592,752✔
942
  pScan->window.skey = window->skey;
25,592,752✔
943
  pScan->window.ekey = window->ekey;
25,592,752✔
944
  pScan->isNewParam = isNewParam;
25,592,752✔
945
  pScan->notifyToProcess = false;
25,592,752✔
946
  pScan->notifyTs = 0;
25,592,752✔
947
  (*ppRes)->opType = srcOpType;
25,592,752✔
948
  (*ppRes)->downstreamIdx = 0;
25,592,752✔
949
  (*ppRes)->value = pScan;
25,592,752✔
950
  (*ppRes)->pChildren = NULL;
25,592,752✔
951
  (*ppRes)->reUse = false;
25,592,752✔
952

953
  return code;
25,592,752✔
954
_return:
×
955
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
956
  taosMemoryFreeClear(*ppRes);
×
957
  if (pScan) {
×
958
    taosArrayDestroy(pScan->pUidList);
×
959
    if (pScan->pOrgTbInfo) {
×
960
      taosArrayDestroy(pScan->pOrgTbInfo->colMap);
×
961
      taosMemoryFreeClear(pScan->pOrgTbInfo);
×
962
    }
963
    taosMemoryFree(pScan);
×
964
  }
965
  return code;
×
966
}
967

968
/**
969
  @brief build the table scan operator param for notify message
970
*/
971
int32_t buildTableScanOperatorParamNotify(SOperatorParam** ppRes,
218,481✔
972
                                          int32_t srcOpType, TSKEY notifyTs) {
973
  if (srcOpType != 0 && srcOpType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
218,481✔
974
    qWarn("%s, invalid srcOpType:%d", __func__, srcOpType);
×
975
    return TSDB_CODE_INVALID_PARA;
×
976
  }
977
  int32_t code = TSDB_CODE_SUCCESS;
218,481✔
978
  int32_t lino = 0;
218,481✔
979
  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
218,481✔
980
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
218,481✔
981

982
  STableScanOperatorParam* pTsParam =
436,962✔
983
    taosMemoryCalloc(1, sizeof(STableScanOperatorParam));
218,481✔
984
  QUERY_CHECK_NULL(pTsParam, code, lino, _return, terrno);
218,481✔
985

986
  pTsParam->paramType = NOTIFY_TYPE_SCAN_PARAM;
218,481✔
987
  pTsParam->notifyToProcess = true;
218,481✔
988
  pTsParam->notifyTs = notifyTs;
218,481✔
989

990
  (*ppRes)->opType = srcOpType != 0 ? srcOpType :
218,481✔
991
                                      QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
992
  (*ppRes)->downstreamIdx = 0;
218,481✔
993
  (*ppRes)->value = pTsParam;
218,481✔
994
  (*ppRes)->pChildren = NULL;
218,481✔
995
  /* param is not reusable when it is transferred by message */
996
  (*ppRes)->reUse = false;
218,481✔
997

998
_return:
218,481✔
999
  if (TSDB_CODE_SUCCESS != code) {
218,481✔
1000
    qError("%s failed at %d, failed to build scan operator msg:%s",
×
1001
           __func__, lino, tstrerror(code));
1002
    taosMemoryFreeClear(*ppRes);
×
1003
    if (pTsParam) {
×
1004
      taosMemoryFree(pTsParam);
×
1005
    }
1006
  }
1007
  return code;
218,481✔
1008
}
1009

1010
int32_t buildTableScanOperatorParamBatchInfo(SOperatorParam** ppRes, uint64_t groupid, SArray* pUidList, int32_t srcOpType, SArray *pBatchMap, SArray *pTagList, bool tableSeq, STimeWindow *window, bool isNewParam) {
5,208,670✔
1011
  int32_t                  code = TSDB_CODE_SUCCESS;
5,208,670✔
1012
  int32_t                  lino = 0;
5,208,670✔
1013
  STableScanOperatorParam* pScan = NULL;
5,208,670✔
1014

1015
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
5,208,670✔
1016
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
5,208,670✔
1017

1018
  pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
5,208,670✔
1019
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);
5,208,670✔
1020

1021
  pScan->paramType = DYN_TYPE_SCAN_PARAM;
5,208,670✔
1022
  pScan->groupid = groupid;
5,208,670✔
1023
  if (pUidList) {
5,208,670✔
1024
    pScan->pUidList = taosArrayDup(pUidList, NULL);
5,208,670✔
1025
    QUERY_CHECK_NULL(pScan->pUidList, code, lino, _return, terrno);
5,208,670✔
1026
  } else {
1027
    pScan->pUidList = NULL;
×
1028
  }
1029
  pScan->pOrgTbInfo = NULL;
5,208,670✔
1030

1031
  if (pBatchMap) {
5,208,670✔
1032
    pScan->pBatchTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
5,208,670✔
1033
    QUERY_CHECK_NULL(pScan->pBatchTbInfo, code, lino, _return, terrno);
5,208,670✔
1034
    for (int32_t i = 0; i < taosArrayGetSize(pBatchMap); i++) {
13,483,116✔
1035
      SOrgTbInfo *pSrcInfo = taosArrayGet(pBatchMap, i);
8,274,446✔
1036
      SOrgTbInfo batchInfo = {0};
8,274,446✔
1037
      batchInfo.vgId = pSrcInfo->vgId;
8,274,446✔
1038
      tstrncpy(batchInfo.tbName, pSrcInfo->tbName, TSDB_TABLE_FNAME_LEN);
8,274,446✔
1039
      batchInfo.colMap = taosArrayDup(pSrcInfo->colMap, NULL);
8,274,446✔
1040
      QUERY_CHECK_NULL(batchInfo.colMap, code, lino, _return, terrno);
8,274,446✔
1041
      SOrgTbInfo *pDstInfo = taosArrayPush(pScan->pBatchTbInfo, &batchInfo);
8,274,446✔
1042
      QUERY_CHECK_NULL(pDstInfo, code, lino, _return, terrno);
8,274,446✔
1043
    }
1044
  } else {
1045
    pScan->pBatchTbInfo = NULL;
×
1046
  }
1047

1048
  if (pTagList) {
5,208,670✔
1049
    pScan->pTagList = taosArrayInit(1, sizeof(STagVal));
2,416,256✔
1050
    QUERY_CHECK_NULL(pScan->pTagList, code, lino, _return, terrno);
2,416,256✔
1051

1052
    for (int32_t i = 0; i < taosArrayGetSize(pTagList); ++i) {
16,353,364✔
1053
      STagVal *pSrcTag = (STagVal*)taosArrayGet(pTagList, i);
13,937,108✔
1054
      STagVal  dstTag;
13,937,108✔
1055
      dstTag.type = pSrcTag->type;
13,937,108✔
1056
      dstTag.cid = pSrcTag->cid;
13,937,108✔
1057
      if (IS_VAR_DATA_TYPE(pSrcTag->type)) {
13,937,108✔
1058
        dstTag.nData = pSrcTag->nData;
6,115,608✔
1059
        dstTag.pData = taosMemoryMalloc(dstTag.nData);
6,115,608✔
1060
        QUERY_CHECK_NULL(dstTag.pData, code, lino, _return, terrno);
6,115,608✔
1061
        memcpy(dstTag.pData, pSrcTag->pData, dstTag.nData);
6,115,608✔
1062
      } else {
1063
        dstTag.i64 = pSrcTag->i64;
7,821,500✔
1064
      }
1065

1066
      QUERY_CHECK_NULL(taosArrayPush(pScan->pTagList, &dstTag), code, lino, _return, terrno);
27,874,216✔
1067
    }
1068
  } else {
1069
    pScan->pTagList = NULL;
2,792,414✔
1070
  }
1071

1072

1073
  pScan->dynType = DYN_TYPE_VSTB_BATCH_SCAN;
5,208,670✔
1074
  pScan->tableSeq = tableSeq;
5,208,670✔
1075
  pScan->window.skey = window->skey;
5,208,670✔
1076
  pScan->window.ekey = window->ekey;
5,208,670✔
1077
  pScan->isNewParam = isNewParam;
5,208,670✔
1078
  pScan->notifyToProcess = false;
5,208,670✔
1079
  pScan->notifyTs = 0;
5,208,670✔
1080
  (*ppRes)->opType = srcOpType;
5,208,670✔
1081
  (*ppRes)->downstreamIdx = 0;
5,208,670✔
1082
  (*ppRes)->value = pScan;
5,208,670✔
1083
  (*ppRes)->pChildren = NULL;
5,208,670✔
1084
  (*ppRes)->reUse = false;
5,208,670✔
1085

1086
  return code;
5,208,670✔
1087
_return:
×
1088
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
1089
  taosMemoryFreeClear(*ppRes);
×
1090
  if (pScan) {
×
1091
    taosArrayDestroy(pScan->pUidList);
×
1092
    if (pScan->pBatchTbInfo) {
×
1093
      taosArrayDestroy(pScan->pBatchTbInfo);
×
1094
    }
1095
    taosMemoryFree(pScan);
×
1096
  }
1097
  return code;
×
1098
}
1099

1100
int32_t buildAggOperatorParam(SOperatorParam** ppRes, uint64_t groupid, SArray* pUidList, int32_t srcOpType, SArray *pBatchMap, SArray *pTagList, bool tableSeq, STimeWindow *window, bool isNewParam, EExchangeSourceType type) {
5,208,670✔
1101
  int32_t                  code = TSDB_CODE_SUCCESS;
5,208,670✔
1102
  int32_t                  lino = 0;
5,208,670✔
1103
  SOperatorParam*          pParam = NULL;
5,208,670✔
1104

1105
  switch (type) {
5,208,670✔
1106
    case EX_SRC_TYPE_VSTB_AGG_SCAN: {
5,004,158✔
1107
      pParam = taosMemoryMalloc(sizeof(SOperatorParam));
5,004,158✔
1108
      QUERY_CHECK_NULL(pParam, code, lino, _return, terrno);
5,004,158✔
1109

1110
      pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
5,004,158✔
1111
      QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno);
5,004,158✔
1112

1113
      SOperatorParam* pTableScanParam = NULL;
5,004,158✔
1114
      code = buildTableScanOperatorParamBatchInfo(&pTableScanParam, groupid, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, pBatchMap, pTagList, tableSeq, window, isNewParam);
5,004,158✔
1115
      QUERY_CHECK_CODE(code, lino, _return);
5,004,158✔
1116

1117
      QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pTableScanParam), code, lino, _return, terrno);
10,008,316✔
1118

1119
      pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
5,004,158✔
1120
      pParam->downstreamIdx = 0;
5,004,158✔
1121
      pParam->value = NULL;
5,004,158✔
1122
      pParam->reUse = false;
5,004,158✔
1123

1124
      break;
5,004,158✔
1125
    }
1126
    case EX_SRC_TYPE_VSTB_WIN_SCAN: {
204,512✔
1127
      code = buildTableScanOperatorParamBatchInfo(&pParam, groupid, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, pBatchMap, pTagList, tableSeq, window, isNewParam);
204,512✔
1128
      QUERY_CHECK_CODE(code, lino, _return);
204,512✔
1129
      break;
204,512✔
1130
    }
1131
    default: {
×
1132
      code = TSDB_CODE_INVALID_PARA;
×
1133
      qError("%s failed at %d, invalid exchange source type:%d", __FUNCTION__, lino, type);
×
1134
      goto _return;
×
1135
    }
1136
  }
1137

1138
  *ppRes = pParam;
5,208,670✔
1139
  return code;
5,208,670✔
1140

1141
_return:
×
1142
  freeOperatorParam(pParam, OP_GET_PARAM);
×
1143
  qError("%s failed at %d, failed to build agg scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
1144
  return code;
×
1145
}
1146

1147
int32_t buildTagScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType) {
3,716,359✔
1148
  int32_t                  code = TSDB_CODE_SUCCESS;
3,716,359✔
1149
  int32_t                  lino = 0;
3,716,359✔
1150
  STagScanOperatorParam*   pScan = NULL;
3,716,359✔
1151

1152
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
3,716,359✔
1153
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
3,716,359✔
1154

1155
  pScan = taosMemoryMalloc(sizeof(STagScanOperatorParam));
3,716,359✔
1156
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);
3,716,359✔
1157
  pScan->vcUid = *(tb_uid_t*)taosArrayGet(pUidList, 0);
3,716,359✔
1158

1159
  (*ppRes)->opType = srcOpType;
3,716,359✔
1160
  (*ppRes)->downstreamIdx = 0;
3,716,359✔
1161
  (*ppRes)->value = pScan;
3,716,359✔
1162
  (*ppRes)->pChildren = NULL;
3,716,359✔
1163
  (*ppRes)->reUse = false;
3,716,359✔
1164

1165
  return code;
3,716,359✔
1166
_return:
×
1167
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
1168
  taosMemoryFreeClear(*ppRes);
×
1169
  if (pScan) {
×
1170
    taosMemoryFree(pScan);
×
1171
  }
1172
  return code;
×
1173
}
1174

1175
static int32_t getCurrentWinCalcTimeRange(SStreamRuntimeFuncInfo* pRuntimeInfo, STimeWindow* pTimeRange) {
4,535,799✔
1176
  if (!pRuntimeInfo || !pTimeRange) {
4,535,799✔
1177
    return TSDB_CODE_INTERNAL_ERROR;
×
1178
  }
1179

1180
  SSTriggerCalcParam* pParam = taosArrayGet(pRuntimeInfo->pStreamPesudoFuncVals, pRuntimeInfo->curIdx);
4,535,799✔
1181
  if (!pParam) {
4,535,817✔
1182
    return TSDB_CODE_INTERNAL_ERROR;
×
1183
  }
1184

1185
  switch (pRuntimeInfo->triggerType) {
4,535,817✔
1186
    case STREAM_TRIGGER_SLIDING:
3,522,236✔
1187
      // Unable to distinguish whether there is an interval, all use wstart/wend
1188
      // and the results are equal to those of prevTs/currentTs, using the same address of union.
1189
      pTimeRange->skey = pParam->wstart;  // is equal to wstart
3,522,236✔
1190
      pTimeRange->ekey = pParam->wend;    // is equal to wend
3,522,236✔
1191
      break;
3,522,236✔
1192
    case STREAM_TRIGGER_PERIOD:
146,349✔
1193
      pTimeRange->skey = pParam->prevLocalTime;
146,349✔
1194
      pTimeRange->ekey = pParam->triggerTime;
146,349✔
1195
      break;
146,349✔
1196
    default:
867,012✔
1197
      pTimeRange->skey = pParam->wstart;
867,012✔
1198
      pTimeRange->ekey = pParam->wend;
867,452✔
1199
      break;
867,011✔
1200
  }
1201

1202
  return TSDB_CODE_SUCCESS;
4,535,596✔
1203
}
1204

1205
int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
227,660,117✔
1206
  int32_t          code = TSDB_CODE_SUCCESS;
227,660,117✔
1207
  int32_t          lino = 0;
227,660,117✔
1208
  SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
227,660,117✔
1209
  if (!pDataInfo) {
227,659,172✔
1210
    return terrno;
×
1211
  }
1212

1213
  if (EX_SOURCE_DATA_NOT_READY != pDataInfo->status) {
227,659,172✔
1214
    return TSDB_CODE_SUCCESS;
10,884,709✔
1215
  }
1216

1217
  pDataInfo->status = EX_SOURCE_DATA_STARTED;
216,777,387✔
1218
  SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
216,776,423✔
1219
  if (!pSource) {
216,771,084✔
1220
    return terrno;
×
1221
  }
1222

1223
  pDataInfo->startTime = taosGetTimestampUs();
216,768,576✔
1224
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
216,771,329✔
1225

1226
  SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
216,770,509✔
1227
  QUERY_CHECK_NULL(pWrapper, code, lino, _end, terrno);
216,767,982✔
1228
  pWrapper->exchangeId = pExchangeInfo->self;
216,767,982✔
1229
  pWrapper->sourceIndex = sourceIndex;
216,770,673✔
1230
  pWrapper->seqId = pExchangeInfo->seqId;
216,773,705✔
1231

1232
  if (pSource->localExec) {
216,770,889✔
1233
    SDataBuf pBuf = {0};
×
1234
    int32_t  code =
1235
      (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->sId,
×
1236
                                  pTaskInfo->id.queryId, pSource->clientId,
1237
                                  pSource->taskId, 0, pSource->execId,
1238
                                  &pBuf.pData,
1239
                                  pTaskInfo->localFetch.explainRes);
1240
    code = loadRemoteDataCallback(pWrapper, &pBuf, code);
×
1241
    taosMemoryFree(pWrapper);
×
1242
    QUERY_CHECK_CODE(code, lino, _end);
×
1243
  } else {
1244
    bool needStreamPesudoFuncVals = true;
216,764,432✔
1245
    SResFetchReq req = {0};
216,764,432✔
1246
    req.header.vgId = pSource->addr.nodeId;
216,774,872✔
1247
    req.sId = pSource->sId;
216,770,314✔
1248
    req.clientId = pSource->clientId;
216,767,265✔
1249
    req.taskId = pSource->taskId;
216,773,066✔
1250
    req.queryId = pTaskInfo->id.queryId;
216,765,986✔
1251
    req.execId = pSource->execId;
216,765,719✔
1252
    if (pTaskInfo->pStreamRuntimeInfo) {
216,767,254✔
1253
      req.dynTbname = pExchangeInfo->dynTbname;
7,988,113✔
1254
      req.execId = pTaskInfo->pStreamRuntimeInfo->execId;
7,988,113✔
1255
      req.pStRtFuncInfo = &pTaskInfo->pStreamRuntimeInfo->funcInfo;
7,988,138✔
1256

1257
      if (pSource->fetchMsgType == TDMT_STREAM_FETCH_FROM_RUNNER) {
7,988,553✔
1258
        qDebug("%s stream fetch from runner, execId:%d, %p", GET_TASKID(pTaskInfo), req.execId, pTaskInfo->pStreamRuntimeInfo);
33,637✔
1259
      } else if (pSource->fetchMsgType == TDMT_STREAM_FETCH_FROM_CACHE) {
7,954,238✔
1260
        code = getCurrentWinCalcTimeRange(req.pStRtFuncInfo, &req.pStRtFuncInfo->curWindow);
4,535,359✔
1261
        QUERY_CHECK_CODE(code, lino, _end);
4,535,596✔
1262
        needStreamPesudoFuncVals = false;
4,535,596✔
1263
        qDebug("%s stream fetch from cache, execId:%d, curWinIdx:%d, time range:[%" PRId64 ", %" PRId64 "]",
4,535,596✔
1264
               GET_TASKID(pTaskInfo), req.execId, req.pStRtFuncInfo->curIdx, req.pStRtFuncInfo->curWindow.skey,
1265
               req.pStRtFuncInfo->curWindow.ekey);
1266
      }
1267
      if (!pDataInfo->fetchSent) {
7,987,664✔
1268
        req.reset = pDataInfo->fetchSent = true;
6,160,392✔
1269
      }
1270
    }
1271

1272
    switch (pDataInfo->type) {
216,764,978✔
1273
      case EX_SRC_TYPE_VSTB_SCAN: {
24,886,762✔
1274
        code = buildTableScanOperatorParamEx(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->orgTbInfo, pDataInfo->tableSeq, &pDataInfo->window, pDataInfo->isNewParam, DYN_TYPE_VSTB_SINGLE_SCAN);
24,886,762✔
1275
        taosArrayDestroy(pDataInfo->orgTbInfo->colMap);
24,886,762✔
1276
        taosMemoryFreeClear(pDataInfo->orgTbInfo);
24,886,762✔
1277
        taosArrayDestroy(pDataInfo->pSrcUidList);
24,886,762✔
1278
        pDataInfo->pSrcUidList = NULL;
24,886,762✔
1279
        if (TSDB_CODE_SUCCESS != code) {
24,886,762✔
1280
          pTaskInfo->code = code;
×
1281
          taosMemoryFree(pWrapper);
×
1282
          return pTaskInfo->code;
×
1283
        }
1284
        break;
24,886,762✔
1285
      }
1286
      case EX_SRC_TYPE_VTB_WIN_SCAN: {
1,497,609✔
1287
        if (pDataInfo->pSrcUidList) {
1,497,609✔
1288
          code = buildTableScanOperatorParamEx(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, NULL, pDataInfo->tableSeq, &pDataInfo->window, false, DYN_TYPE_VSTB_WIN_SCAN);
705,990✔
1289
          taosArrayDestroy(pDataInfo->pSrcUidList);
705,990✔
1290
          pDataInfo->pSrcUidList = NULL;
705,990✔
1291
          if (TSDB_CODE_SUCCESS != code) {
705,990✔
1292
            pTaskInfo->code = code;
×
1293
            taosMemoryFree(pWrapper);
×
1294
            return pTaskInfo->code;
×
1295
          }
1296
        }
1297
        break;
1,497,609✔
1298
      }
1299
      case EX_SRC_TYPE_VSTB_TAG_SCAN: {
3,716,359✔
1300
        code = buildTagScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType);
3,716,359✔
1301
        taosArrayDestroy(pDataInfo->pSrcUidList);
3,716,359✔
1302
        pDataInfo->pSrcUidList = NULL;
3,716,359✔
1303
        if (TSDB_CODE_SUCCESS != code) {
3,716,359✔
1304
          pTaskInfo->code = code;
×
1305
          taosMemoryFree(pWrapper);
×
1306
          return pTaskInfo->code;
×
1307
        }
1308
        break;
3,716,359✔
1309
      }
1310
      case EX_SRC_TYPE_VSTB_WIN_SCAN:
5,485,810✔
1311
      case EX_SRC_TYPE_VSTB_AGG_SCAN: {
1312
        if (pDataInfo->batchOrgTbInfo) {
5,485,810✔
1313
          code = buildAggOperatorParam(&req.pOpParam, pDataInfo->groupid, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->batchOrgTbInfo, pDataInfo->tagList, pDataInfo->tableSeq, &pDataInfo->window, pDataInfo->isNewParam, pDataInfo->type);
5,208,670✔
1314
          if (pDataInfo->batchOrgTbInfo) {
5,208,670✔
1315
            for (int32_t i = 0; i < taosArrayGetSize(pDataInfo->batchOrgTbInfo); ++i) {
13,483,116✔
1316
              SOrgTbInfo* pColMap = taosArrayGet(pDataInfo->batchOrgTbInfo, i);
8,274,446✔
1317
              if (pColMap) {
8,274,446✔
1318
                taosArrayDestroy(pColMap->colMap);
8,274,446✔
1319
              }
1320
            }
1321
            taosArrayDestroy(pDataInfo->batchOrgTbInfo);
5,208,670✔
1322
            pDataInfo->batchOrgTbInfo = NULL;
5,208,670✔
1323
          }
1324
          if (pDataInfo->tagList) {
5,208,670✔
1325
            taosArrayDestroyEx(pDataInfo->tagList, destroyTagVal);
2,416,256✔
1326
            pDataInfo->tagList = NULL;
2,416,256✔
1327
          }
1328
          if (pDataInfo->pSrcUidList) {
5,208,670✔
1329
            taosArrayDestroy(pDataInfo->pSrcUidList);
5,208,670✔
1330
            pDataInfo->pSrcUidList = NULL;
5,208,670✔
1331
          }
1332

1333
          if (TSDB_CODE_SUCCESS != code) {
5,208,670✔
1334
            pTaskInfo->code = code;
×
1335
            taosMemoryFree(pWrapper);
×
1336
            return pTaskInfo->code;
×
1337
          }
1338
        }
1339
        break;
5,485,810✔
1340
      }
1341
      case EX_SRC_TYPE_STB_JOIN_SCAN:
181,176,560✔
1342
      default: {
1343
        if (pDataInfo->pSrcUidList) {
181,176,560✔
1344
          code = buildTableScanOperatorParam(&req.pOpParam,
238,130✔
1345
                                             pDataInfo->pSrcUidList,
1346
                                             pDataInfo->srcOpType,
1347
                                             pDataInfo->tableSeq);
238,130✔
1348
          /* source uid list can be reused in vnode size, so only use once */
1349
          taosArrayDestroy(pDataInfo->pSrcUidList);
238,130✔
1350
          pDataInfo->pSrcUidList = NULL;
238,130✔
1351
          if (TSDB_CODE_SUCCESS != code) {
238,130✔
1352
            pTaskInfo->code = code;
×
1353
            taosMemoryFree(pWrapper);
×
1354
            return pTaskInfo->code;
×
1355
          }
1356
        }
1357
        if (pExchangeInfo->notifyToSend) {
181,186,755✔
1358
          if (NULL == req.pOpParam) {
218,481✔
1359
            code = buildTableScanOperatorParamNotify(&req.pOpParam,
218,481✔
1360
                                                     pDataInfo->srcOpType,
1361
                                                     pExchangeInfo->notifyTs);
1362
            if (TSDB_CODE_SUCCESS != code) {
218,481✔
1363
              pTaskInfo->code = code;
×
1364
              taosMemoryFree(pWrapper);
×
1365
              return pTaskInfo->code;
×
1366
            }
1367
          } else {
1368
            /**
1369
              Currently don't support use the same param for multiple times!
1370
            */
1371
            qError("%s, %s failed, currently don't support use the same param "
×
1372
                   "for multiple times!", GET_TASKID(pTaskInfo), __func__);
1373
            pTaskInfo->code = TSDB_CODE_INVALID_PARA;
×
1374
            taosMemoryFree(pWrapper);
×
1375
            return pTaskInfo->code;
×
1376
          }
1377
          pExchangeInfo->notifyToSend = false;
218,481✔
1378
        }
1379
        break;
181,182,385✔
1380
      }
1381
    }
1382

1383
    int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req, needStreamPesudoFuncVals);
216,768,925✔
1384
    if (msgSize < 0) {
216,748,053✔
1385
      pTaskInfo->code = msgSize;
×
1386
      taosMemoryFree(pWrapper);
×
1387
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
1388
      return pTaskInfo->code;
×
1389
    }
1390

1391
    void* msg = taosMemoryCalloc(1, msgSize);
216,748,053✔
1392
    if (NULL == msg) {
216,733,104✔
1393
      pTaskInfo->code = terrno;
×
1394
      taosMemoryFree(pWrapper);
×
1395
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
1396
      return pTaskInfo->code;
×
1397
    }
1398

1399
    msgSize = tSerializeSResFetchReq(msg, msgSize, &req, needStreamPesudoFuncVals);
216,733,104✔
1400
    if (msgSize < 0) {
216,741,763✔
1401
      pTaskInfo->code = msgSize;
×
1402
      taosMemoryFree(pWrapper);
×
1403
      taosMemoryFree(msg);
×
1404
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
1405
      return pTaskInfo->code;
×
1406
    }
1407

1408
    freeOperatorParam(req.pOpParam, OP_GET_PARAM);
216,741,763✔
1409

1410
    qDebug("%s build fetch msg and send to vgId:%d, ep:%s, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
216,744,627✔
1411
           ", seqId:%" PRId64 ", execId:%d, %p, %d/%" PRIzu,
1412
           GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->clientId,
1413
           pSource->taskId, pExchangeInfo->seqId, pSource->execId, pExchangeInfo, sourceIndex, totalSources);
1414

1415
    // send the fetch remote task result reques
1416
    SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
216,775,416✔
1417
    if (NULL == pMsgSendInfo) {
216,769,873✔
1418
      taosMemoryFreeClear(msg);
×
1419
      taosMemoryFree(pWrapper);
×
1420
      qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
×
1421
      pTaskInfo->code = terrno;
×
1422
      return pTaskInfo->code;
×
1423
    }
1424

1425
    pMsgSendInfo->param = pWrapper;
216,769,873✔
1426
    pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
216,774,169✔
1427
    pMsgSendInfo->msgInfo.pData = msg;
216,775,971✔
1428
    pMsgSendInfo->msgInfo.len = msgSize;
216,773,783✔
1429
    pMsgSendInfo->msgType = pSource->fetchMsgType;
216,770,388✔
1430
    pMsgSendInfo->fp = loadRemoteDataCallback;
216,774,677✔
1431
    pMsgSendInfo->requestId = pTaskInfo->id.queryId;
216,772,072✔
1432

1433
    int64_t transporterId = 0;
216,776,618✔
1434
    void* poolHandle = NULL;
216,775,737✔
1435
    code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
216,775,737✔
1436
    QUERY_CHECK_CODE(code, lino, _end);
216,780,177✔
1437
    int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, sourceIndex);
216,780,177✔
1438
    *pRpcHandle = transporterId;
216,780,867✔
1439
  }
1440

1441
_end:
216,780,355✔
1442
  if (code != TSDB_CODE_SUCCESS) {
216,780,355✔
1443
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1444
  }
1445
  return code;
216,780,717✔
1446
}
1447

1448
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs,
165,290,836✔
1449
                          SOperatorInfo* pOperator) {
1450
  pInfo->totalRows += numOfRows;
165,290,836✔
1451
  pInfo->totalSize += dataLen;
165,289,989✔
1452
  pInfo->totalElapsed += (taosGetTimestampUs() - startTs);
165,290,726✔
1453
  pOperator->resultInfo.totalRows += numOfRows;
165,289,301✔
1454
}
165,290,726✔
1455

1456
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart, bool isVstbScan) {
455,374,985✔
1457
  int32_t      code = TSDB_CODE_SUCCESS;
455,374,985✔
1458
  int32_t      lino = 0;
455,374,985✔
1459
  SSDataBlock* pBlock = NULL;
455,374,985✔
1460
  if (isVstbScan) {
455,376,857✔
1461
    blockDataCleanup(pRes);
17,696,292✔
1462
    code = blockDecodeInternal(pRes, pData, (const char**)pNextStart);
17,696,292✔
1463
    QUERY_CHECK_CODE(code, lino, _end);
17,696,070✔
1464
  }
1465
  if (pColList == NULL) {  // data from other sources
455,376,635✔
1466
    blockDataCleanup(pRes);
450,825,750✔
1467
    code = blockDecodeInternal(pRes, pData, (const char**)pNextStart);
450,823,894✔
1468
    QUERY_CHECK_CODE(code, lino, _end);
450,826,734✔
1469
  } else {  // extract data according to pColList
1470
    char* pStart = pData;
4,550,885✔
1471

1472
    int32_t numOfCols = htonl(*(int32_t*)pStart);
4,550,885✔
1473
    pStart += sizeof(int32_t);
4,550,885✔
1474

1475
    // todo refactor:extract method
1476
    SSysTableSchema* pSchema = (SSysTableSchema*)pStart;
4,550,885✔
1477
    for (int32_t i = 0; i < numOfCols; ++i) {
64,194,472✔
1478
      SSysTableSchema* p = (SSysTableSchema*)pStart;
59,643,587✔
1479

1480
      p->colId = htons(p->colId);
59,643,587✔
1481
      p->bytes = htonl(p->bytes);
59,643,587✔
1482
      pStart += sizeof(SSysTableSchema);
59,643,587✔
1483
    }
1484

1485
    pBlock = NULL;
4,550,885✔
1486
    code = createDataBlock(&pBlock);
4,550,885✔
1487
    QUERY_CHECK_CODE(code, lino, _end);
4,550,885✔
1488

1489
    for (int32_t i = 0; i < numOfCols; ++i) {
64,194,472✔
1490
      SColumnInfoData idata = createColumnInfoData(pSchema[i].type, pSchema[i].bytes, pSchema[i].colId);
59,643,587✔
1491
      code = blockDataAppendColInfo(pBlock, &idata);
59,643,587✔
1492
      QUERY_CHECK_CODE(code, lino, _end);
59,643,587✔
1493
    }
1494

1495
    code = blockDecodeInternal(pBlock, pStart, NULL);
4,550,885✔
1496
    QUERY_CHECK_CODE(code, lino, _end);
4,550,885✔
1497

1498
    code = blockDataEnsureCapacity(pRes, pBlock->info.rows);
4,550,885✔
1499
    QUERY_CHECK_CODE(code, lino, _end);
4,550,885✔
1500

1501
    // data from mnode
1502
    pRes->info.dataLoad = 1;
4,550,885✔
1503
    pRes->info.rows = pBlock->info.rows;
4,550,885✔
1504
    pRes->info.scanFlag = MAIN_SCAN;
4,550,885✔
1505
    code = relocateColumnData(pRes, pColList, pBlock->pDataBlock, false);
4,550,885✔
1506
    QUERY_CHECK_CODE(code, lino, _end);
4,550,885✔
1507

1508
    blockDataDestroy(pBlock);
4,550,885✔
1509
    pBlock = NULL;
4,550,885✔
1510
  }
1511

1512
_end:
455,377,619✔
1513
  if (code != TSDB_CODE_SUCCESS) {
455,377,368✔
1514
    blockDataDestroy(pBlock);
×
1515
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1516
  }
1517
  return code;
455,377,368✔
1518
}
1519

1520
void setAllSourcesCompleted(SOperatorInfo* pOperator) {
89,258,741✔
1521
  SExchangeInfo* pExchangeInfo = pOperator->info;
89,258,741✔
1522
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
89,258,741✔
1523

1524
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
89,258,741✔
1525
  size_t               totalSources = taosArrayGetSize(pExchangeInfo->pSources);
89,258,741✔
1526
  qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 ", %.2f Kb, elapsed:%.2f ms",
89,258,520✔
1527
         GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
1528
         pLoadInfo->totalElapsed / 1000.0);
1529

1530
  setOperatorCompleted(pOperator);
89,258,520✔
1531
}
89,258,741✔
1532

1533
int32_t getCompletedSources(const SArray* pArray, int32_t* pRes) {
290,267,195✔
1534
  int32_t code = TSDB_CODE_SUCCESS;
290,267,195✔
1535
  int32_t lino = 0;
290,267,195✔
1536
  size_t  total = taosArrayGetSize(pArray);
290,267,195✔
1537

1538
  int32_t completed = 0;
290,266,595✔
1539
  for (int32_t k = 0; k < total; ++k) {
961,894,939✔
1540
    SSourceDataInfo* p = taosArrayGet(pArray, k);
671,632,340✔
1541
    QUERY_CHECK_NULL(p, code, lino, _end, terrno);
671,629,523✔
1542
    if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
671,629,523✔
1543
      qDebug("source %d is completed, info:%p %p", k, pArray, p);
330,386,726✔
1544
      completed += 1;
330,388,967✔
1545
    }
1546
  }
1547

1548
  *pRes = completed;
290,262,599✔
1549
_end:
290,267,516✔
1550
  if (code != TSDB_CODE_SUCCESS) {
290,267,516✔
1551
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1552
  }
1553
  return code;
290,269,635✔
1554
}
1555

1556
int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
106,710,923✔
1557
  SExchangeInfo* pExchangeInfo = pOperator->info;
106,710,923✔
1558
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
106,713,780✔
1559

1560
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
106,709,501✔
1561
  int64_t startTs = taosGetTimestampUs();
106,706,636✔
1562

1563
  // Asynchronously send all fetch requests to all sources.
1564
  for (int32_t i = 0; i < totalSources; ++i) {
286,392,831✔
1565
    int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
179,674,762✔
1566
    if (code != TSDB_CODE_SUCCESS) {
179,686,853✔
1567
      pTaskInfo->code = code;
658✔
1568
      return code;
×
1569
    }
1570
  }
1571

1572
  int64_t endTs = taosGetTimestampUs();
106,714,528✔
1573
  qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTaskInfo),
106,714,528✔
1574
         totalSources, (endTs - startTs) / 1000.0);
1575

1576
  pOperator->status = OP_RES_TO_RETURN;
106,715,039✔
1577
  pOperator->cost.openCost = taosGetTimestampUs() - startTs;
106,715,781✔
1578
  if (isTaskKilled(pTaskInfo)) {
106,715,262✔
1579
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1580
  }
1581

1582
  return TSDB_CODE_SUCCESS;
106,714,543✔
1583
}
1584

1585
/**
1586
  @brief store STEP DONE notification info
1587
*/
1588
void storeNotifyInfo(SOperatorInfo* pOperator) {
4,003,842✔
1589
  SExchangeInfo*  pExchangeInfo = pOperator->info;
4,003,842✔
1590
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
4,003,842✔
1591
  SOperatorParam* pGetParam = pOperator->pOperatorGetParam;
4,003,842✔
1592

1593
  SExchangeOperatorParam* pParam = (SExchangeOperatorParam*)pGetParam->value;
4,003,842✔
1594
  if (!pParam->multiParams) {
4,003,842✔
1595
    SExchangeOperatorBasicParam* pBasic = &pParam->basic;
4,003,842✔
1596
    if (pBasic->paramType != NOTIFY_TYPE_EXCHANGE_PARAM) {
4,003,842✔
1597
      qWarn("%s, %s found invalid exchange operator param type %d",
×
1598
             GET_TASKID(pTaskInfo), __func__, pBasic->paramType);
1599
      return;
×
1600
    }
1601

1602
    pExchangeInfo->notifyToSend = true;
4,003,842✔
1603
    pExchangeInfo->notifyTs = pBasic->notifyTs;
4,003,842✔
1604
  } else {
1605
    qWarn("%s, %s found multi params are not supported for notify msg",
×
1606
           GET_TASKID(pTaskInfo), __func__);
1607
  }
1608
}
1609

1610
int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo) {
160,739,275✔
1611
  int32_t            code = TSDB_CODE_SUCCESS;
160,739,275✔
1612
  int32_t            lino = 0;
160,739,275✔
1613
  SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
160,739,275✔
1614
  SSDataBlock*       pb = NULL;
160,739,820✔
1615

1616
  char* pNextStart = pRetrieveRsp->data;
160,739,309✔
1617
  char* pStart = pNextStart;
160,738,732✔
1618

1619
  int32_t index = 0;
160,738,732✔
1620

1621
  if (pRetrieveRsp->compressed) {  // decompress the data
160,738,732✔
1622
    if (pDataInfo->decompBuf == NULL) {
×
1623
      pDataInfo->decompBuf = taosMemoryMalloc(pRetrieveRsp->payloadLen);
×
1624
      QUERY_CHECK_NULL(pDataInfo->decompBuf, code, lino, _end, terrno);
×
1625
      pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
×
1626
    } else {
1627
      if (pDataInfo->decompBufSize < pRetrieveRsp->payloadLen) {
×
1628
        char* p = taosMemoryRealloc(pDataInfo->decompBuf, pRetrieveRsp->payloadLen);
×
1629
        QUERY_CHECK_NULL(p, code, lino, _end, terrno);
×
1630
        if (p != NULL) {
×
1631
          pDataInfo->decompBuf = p;
×
1632
          pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
×
1633
        }
1634
      }
1635
    }
1636
  }
1637

1638
  while (index++ < pRetrieveRsp->numOfBlocks) {
611,565,023✔
1639
    pStart = pNextStart;
450,824,497✔
1640

1641
    if (pDataInfo->type == EX_SRC_TYPE_VSTB_SCAN) {
450,824,497✔
1642
      pb = taosMemoryCalloc(1, sizeof(SSDataBlock));
17,696,292✔
1643
      QUERY_CHECK_NULL(pb, code, lino, _end, terrno);
17,696,292✔
1644
    } else if (taosArrayGetSize(pExchangeInfo->pRecycledBlocks) > 0) {
433,128,118✔
1645
      pb = *(SSDataBlock**)taosArrayPop(pExchangeInfo->pRecycledBlocks);
178,772,416✔
1646
      blockDataCleanup(pb);
178,772,416✔
1647
    } else {
1648
      code = createOneDataBlock(pExchangeInfo->pDummyBlock, false, &pb);
254,355,414✔
1649
      QUERY_CHECK_NULL(pb, code, lino, _end, code);
254,355,563✔
1650
    }
1651

1652
    int32_t compLen = *(int32_t*)pStart;
450,824,568✔
1653
    pStart += sizeof(int32_t);
450,825,504✔
1654

1655
    int32_t rawLen = *(int32_t*)pStart;
450,826,130✔
1656
    pStart += sizeof(int32_t);
450,826,130✔
1657
    QUERY_CHECK_CONDITION((compLen <= rawLen && compLen != 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
450,825,773✔
1658

1659
    pNextStart = pStart + compLen;
450,825,773✔
1660
    if (pRetrieveRsp->compressed && (compLen < rawLen)) {
450,825,610✔
1661
      int32_t t = tsDecompressString(pStart, compLen, 1, pDataInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
×
1662
      QUERY_CHECK_CONDITION((t == rawLen), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1663
      pStart = pDataInfo->decompBuf;
×
1664
    }
1665

1666
    code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart, (pDataInfo->type == EX_SRC_TYPE_VSTB_SCAN));
450,825,967✔
1667
    if (code != 0) {
450,820,631✔
1668
      taosMemoryFreeClear(pDataInfo->pRsp);
×
1669
      goto _end;
×
1670
    }
1671

1672
    void* tmp = taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
450,820,631✔
1673
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
450,820,951✔
1674
    qDebug("%dth block added to resultBlockList, rows:%" PRId64, index, pb->info.rows);
450,820,951✔
1675
    pb = NULL;
450,825,560✔
1676
  }
1677

1678
_end:
160,739,812✔
1679
  if (code != TSDB_CODE_SUCCESS) {
160,739,812✔
1680
    blockDataDestroy(pb);
×
1681
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1682
  }
1683
  return code;
160,739,812✔
1684
}
1685

1686
int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
28,767,895✔
1687
  SExchangeInfo* pExchangeInfo = pOperator->info;
28,767,895✔
1688
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
28,767,895✔
1689

1690
  int32_t code = 0;
28,767,674✔
1691
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);
28,767,674✔
1692
  int64_t startTs = taosGetTimestampUs();
28,767,895✔
1693

1694
  int32_t vgId = 0;
28,767,895✔
1695
  if (pExchangeInfo->dynTbname) {
28,767,895✔
1696
    SArray* vals = pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPartColVals;
68,724✔
1697
    for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
68,724✔
1698
      SStreamGroupValue* pValue = taosArrayGet(vals, i);
68,724✔
1699
      if (pValue != NULL && pValue->isTbname) {
68,724✔
1700
        vgId = pValue->vgId;
68,724✔
1701
        break;
68,724✔
1702
      }
1703
    }
1704
  }
1705

1706
  while (1) {
7,222,823✔
1707
    if (pExchangeInfo->current >= totalSources) {
35,990,718✔
1708
      setAllSourcesCompleted(pOperator);
7,207,234✔
1709
      return TSDB_CODE_SUCCESS;
7,207,234✔
1710
    }
1711

1712
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
28,783,484✔
1713
    if (!pSource) {
28,783,484✔
1714
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1715
      pTaskInfo->code = terrno;
×
1716
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1717
    }
1718

1719
    if (vgId != 0 && pSource->addr.nodeId != vgId){
28,783,484✔
1720
      pExchangeInfo->current += 1;
47,110✔
1721
      continue;
47,110✔
1722
    }
1723

1724
    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
28,736,374✔
1725
    if (!pDataInfo) {
28,736,374✔
1726
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1727
      pTaskInfo->code = terrno;
×
1728
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1729
    }
1730
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
28,736,374✔
1731

1732
    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
28,736,374✔
1733
    if (code != TSDB_CODE_SUCCESS) {
28,736,374✔
1734
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1735
      pTaskInfo->code = code;
×
1736
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1737
    }
1738

1739
    while (true) {
758✔
1740
      code = exchangeWait(pOperator, pExchangeInfo);
28,737,132✔
1741
      if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
28,737,132✔
1742
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
758✔
1743
      }
1744

1745
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
28,736,374✔
1746
      if (pDataInfo->seqId != currSeqId) {
28,736,374✔
1747
        qDebug("seq rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pDataInfo->seqId, pExchangeInfo, currSeqId);
758✔
1748
        taosMemoryFreeClear(pDataInfo->pRsp);
758✔
1749
        continue;
758✔
1750
      }
1751

1752
      break;
28,735,616✔
1753
    }
1754

1755
    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
28,735,616✔
1756
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
626✔
1757
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1758
             tstrerror(pDataInfo->code));
1759
      pOperator->pTaskInfo->code = pDataInfo->code;
626✔
1760
      return pOperator->pTaskInfo->code;
626✔
1761
    }
1762

1763
    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
28,734,990✔
1764
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
28,734,990✔
1765

1766
    if (pRsp->numOfRows == 0) {
28,734,990✔
1767
      qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
7,175,713✔
1768
             " execId:%d %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", pDataInfo,
1769
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1770
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);
1771

1772
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
7,175,713✔
1773
      if (isVstbScan(pDataInfo) || isVstbTagScan(pDataInfo)) {
7,175,713✔
1774
        pExchangeInfo->current = totalSources;
7,101,377✔
1775
      } else {
1776
        pExchangeInfo->current += 1;
74,336✔
1777
      }
1778
      taosMemoryFreeClear(pDataInfo->pRsp);
7,175,713✔
1779
      continue;
7,175,713✔
1780
    }
1781

1782
    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
21,559,277✔
1783
    if (code != TSDB_CODE_SUCCESS) {
21,559,277✔
1784
      goto _error;
×
1785
    }
1786

1787
    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
21,559,277✔
1788
    if (pRsp->completed == 1) {
21,559,277✔
1789
      qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
113,559✔
1790
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, pDataInfo,
1791
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1792
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
1793
             pExchangeInfo->current + 1, totalSources);
1794

1795
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
113,559✔
1796
      if (isVstbScan(pDataInfo)) {
113,559✔
1797
        pExchangeInfo->current = totalSources;
×
1798
      } else {
1799
        pExchangeInfo->current += 1;
113,559✔
1800
      }
1801
    } else {
1802
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
21,445,718✔
1803
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
1804
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1805
             pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
1806
    }
1807
    if (pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData) {
21,559,277✔
1808
      taosArrayClear(pExchangeInfo->pSourceDataInfo);
21,359,320✔
1809
    }
1810
    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
21,559,277✔
1811
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;
21,559,277✔
1812

1813
    taosMemoryFreeClear(pDataInfo->pRsp);
21,559,277✔
1814
    return TSDB_CODE_SUCCESS;
21,559,277✔
1815
  }
1816

1817
_error:
×
1818
  pTaskInfo->code = code;
×
1819
  return code;
×
1820
}
1821

1822
void clearVtbScanDataInfo(void* pItem) {
6,032,671✔
1823
  SSourceDataInfo *pInfo = (SSourceDataInfo *)pItem;
6,032,671✔
1824
  if (pInfo->orgTbInfo) {
6,032,671✔
1825
    taosArrayDestroy(pInfo->orgTbInfo->colMap);
×
1826
    taosMemoryFreeClear(pInfo->orgTbInfo);
×
1827
  }
1828
  if (pInfo->batchOrgTbInfo) {
6,032,671✔
1829
    for (int32_t i = 0; i < taosArrayGetSize(pInfo->batchOrgTbInfo); ++i) {
×
1830
      SOrgTbInfo* pColMap = taosArrayGet(pInfo->batchOrgTbInfo, i);
×
1831
      if (pColMap) {
×
1832
        taosArrayDestroy(pColMap->colMap);
×
1833
      }
1834
    }
1835
    taosArrayDestroy(pInfo->batchOrgTbInfo);
×
1836
  }
1837
  if (pInfo->tagList) {
6,032,671✔
1838
    taosArrayDestroyEx(pInfo->tagList, destroyTagVal);
×
1839
    pInfo->tagList = NULL;
×
1840
  }
1841
  taosArrayDestroy(pInfo->pSrcUidList);
6,032,671✔
1842
}
6,032,671✔
1843

1844
static int32_t loadTagListFromBasicParam(SSourceDataInfo* pDataInfo, SExchangeOperatorBasicParam* pBasicParam) {
5,208,670✔
1845
  int32_t  code = TSDB_CODE_SUCCESS;
5,208,670✔
1846
  int32_t  lino = 0;
5,208,670✔
1847
  STagVal  dstTag;
5,208,670✔
1848
  bool     needFree = false;
5,208,670✔
1849

1850
  if (pBasicParam->paramType != DYN_TYPE_EXCHANGE_PARAM) {
5,208,670✔
1851
    qError("%s failed since invalid exchange operator param type %d",
×
1852
      __func__, pBasicParam->paramType);
1853
    return TSDB_CODE_INVALID_PARA;
×
1854
  }
1855

1856
  if (pDataInfo->tagList) {
5,208,670✔
1857
    taosArrayClear(pDataInfo->tagList);
×
1858
  }
1859

1860
  if (pBasicParam->tagList) {
5,208,670✔
1861
    pDataInfo->tagList = taosArrayInit(1, sizeof(STagVal));
2,416,256✔
1862
    QUERY_CHECK_NULL(pDataInfo->tagList, code, lino, _return, terrno);
2,416,256✔
1863

1864
    for (int32_t i = 0; i < taosArrayGetSize(pBasicParam->tagList); ++i) {
16,353,364✔
1865
      STagVal *pSrcTag = (STagVal*)taosArrayGet(pBasicParam->tagList, i);
13,937,108✔
1866
      QUERY_CHECK_NULL(pSrcTag, code, lino, _return, terrno);
13,937,108✔
1867

1868
      dstTag = (STagVal){0};
13,937,108✔
1869
      dstTag.type = pSrcTag->type;
13,937,108✔
1870
      dstTag.cid = pSrcTag->cid;
13,937,108✔
1871
      if (IS_VAR_DATA_TYPE(pSrcTag->type)) {
13,937,108✔
1872
        dstTag.nData = pSrcTag->nData;
6,115,608✔
1873
        dstTag.pData = taosMemoryMalloc(dstTag.nData);
6,115,608✔
1874
        QUERY_CHECK_NULL(dstTag.pData, code, lino, _return, terrno);
6,115,608✔
1875
        needFree = true;
6,115,608✔
1876
        memcpy(dstTag.pData, pSrcTag->pData, dstTag.nData);
6,115,608✔
1877
      } else {
1878
        dstTag.i64 = pSrcTag->i64;
7,821,500✔
1879
      }
1880

1881
      QUERY_CHECK_NULL(taosArrayPush(pDataInfo->tagList, &dstTag), code, lino, _return, terrno);
27,874,216✔
1882
      needFree = false;
13,937,108✔
1883
    }
1884
  } else {
1885
    pDataInfo->tagList = NULL;
2,792,414✔
1886
  }
1887

1888
  return code;
5,208,670✔
1889
_return:
×
1890
  if (needFree) {
×
1891
    taosMemoryFreeClear(dstTag.pData);
×
1892
  }
1893
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1894
  return code;
×
1895
}
1896

1897
int32_t loadBatchColMapFromBasicParam(SSourceDataInfo* pDataInfo, SExchangeOperatorBasicParam* pBasicParam) {
5,208,670✔
1898
  int32_t     code = TSDB_CODE_SUCCESS;
5,208,670✔
1899
  int32_t     lino = 0;
5,208,670✔
1900
  SOrgTbInfo  dstOrgTbInfo = {0};
5,208,670✔
1901
  bool        needFree = false;
5,208,670✔
1902

1903
  if (pBasicParam->paramType != DYN_TYPE_EXCHANGE_PARAM) {
5,208,670✔
1904
    qError("%s failed since invalid exchange operator param type %d",
×
1905
      __func__, pBasicParam->paramType);
1906
    return TSDB_CODE_INVALID_PARA;
×
1907
  }
1908

1909
  if (pBasicParam->batchOrgTbInfo) {
5,208,670✔
1910
    pDataInfo->batchOrgTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
5,208,670✔
1911
    QUERY_CHECK_NULL(pDataInfo->batchOrgTbInfo, code, lino, _return, terrno);
5,208,670✔
1912

1913
    for (int32_t i = 0; i < taosArrayGetSize(pBasicParam->batchOrgTbInfo); ++i) {
13,483,116✔
1914
      SOrgTbInfo* pSrcOrgTbInfo = taosArrayGet(pBasicParam->batchOrgTbInfo, i);
8,274,446✔
1915
      QUERY_CHECK_NULL(pSrcOrgTbInfo, code, lino, _return, terrno);
8,274,446✔
1916

1917
      dstOrgTbInfo = (SOrgTbInfo){0};
8,274,446✔
1918
      dstOrgTbInfo.vgId = pSrcOrgTbInfo->vgId;
8,274,446✔
1919
      tstrncpy(dstOrgTbInfo.tbName, pSrcOrgTbInfo->tbName, TSDB_TABLE_FNAME_LEN);
8,274,446✔
1920

1921
      dstOrgTbInfo.colMap = taosArrayDup(pSrcOrgTbInfo->colMap, NULL);
8,274,446✔
1922
      QUERY_CHECK_NULL(dstOrgTbInfo.colMap, code, lino, _return, terrno);
8,274,446✔
1923

1924
      needFree = true;
8,274,446✔
1925
      QUERY_CHECK_NULL(taosArrayPush(pDataInfo->batchOrgTbInfo, &dstOrgTbInfo), code, lino, _return, terrno);
16,548,892✔
1926
      needFree = false;
8,274,446✔
1927
    }
1928
  } else {
1929
    pBasicParam->batchOrgTbInfo = NULL;
×
1930
  }
1931

1932
  return code;
5,208,670✔
1933
_return:
×
1934
  if (needFree) {
×
1935
    taosArrayDestroy(dstOrgTbInfo.colMap);
×
1936
  }
1937
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1938
  return code;
×
1939
}
1940

1941
int32_t addSingleExchangeSource(SOperatorInfo* pOperator,
34,755,911✔
1942
                                SExchangeOperatorBasicParam* pBasicParam) {
1943
  if (pBasicParam->paramType != DYN_TYPE_EXCHANGE_PARAM) {
34,755,911✔
1944
    qWarn("%s, %s found invalid exchange operator param type %d",
×
1945
      GET_TASKID(pOperator->pTaskInfo), __func__, pBasicParam->paramType);
1946
    return TSDB_CODE_SUCCESS;
×
1947
  }
1948

1949
  int32_t            code = TSDB_CODE_SUCCESS;
34,755,911✔
1950
  int32_t            lino = 0;
34,755,911✔
1951
  SExchangeInfo*     pExchangeInfo = pOperator->info;
34,755,911✔
1952
  SExchangeSrcIndex* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
34,755,911✔
1953

1954
  if (NULL == pIdx) {
34,755,911✔
1955
    if (pBasicParam->isNewDeployed) {
2,387✔
1956
      SDownstreamSourceNode *pNode = NULL;
2,387✔
1957
      code = nodesCloneNode((SNode*)&pBasicParam->newDeployedSrc, (SNode**)&pNode);
2,387✔
1958
      QUERY_CHECK_CODE(code, lino, _return);
2,387✔
1959

1960
      SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pOperator->pPhyNode;
2,387✔
1961
      code = nodesListMakeStrictAppend(&pExchange->pSrcEndPoints, (SNode*)pNode);
2,387✔
1962
      QUERY_CHECK_CODE(code, lino, _return);
2,387✔
1963

1964
      void* tmp = taosArrayPush(pExchangeInfo->pSources, pNode);
2,387✔
1965
      QUERY_CHECK_NULL(tmp, code, lino, _return, terrno);
2,387✔
1966

1967
      SExchangeSrcIndex idx = {.srcIdx = taosArrayGetSize(pExchangeInfo->pSources) - 1, .inUseIdx = -1};
2,387✔
1968
      code = tSimpleHashPut(pExchangeInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
2,387✔
1969
      if (pExchangeInfo->pHashSources) {
2,387✔
1970
        QUERY_CHECK_CODE(code, lino, _return);
2,387✔
1971
      }
1972
      pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
2,387✔
1973
      QUERY_CHECK_NULL(pIdx, code, lino, _return, TSDB_CODE_INVALID_PARA);
2,387✔
1974
    } else {
1975
      qError("No exchange source for vgId: %d", pBasicParam->vgId);
×
1976
      return TSDB_CODE_INVALID_PARA;
×
1977
    }
1978
  }
1979

1980
  qDebug("start to add single exchange source");
34,755,911✔
1981

1982
  switch (pBasicParam->type) {
34,755,911✔
1983
    case EX_SRC_TYPE_VSTB_WIN_SCAN:
5,208,670✔
1984
    case EX_SRC_TYPE_VSTB_AGG_SCAN: {
1985
      if (pIdx->inUseIdx < 0) {
5,208,670✔
1986
        SSourceDataInfo dataInfo = {0};
2,555,588✔
1987
        dataInfo.status = EX_SOURCE_DATA_NOT_READY;
2,555,588✔
1988
        dataInfo.taskId = pExchangeInfo->pTaskId;
2,555,588✔
1989
        dataInfo.index = pIdx->srcIdx;
2,555,588✔
1990
        dataInfo.groupid = pBasicParam->groupid;
2,555,588✔
1991
        dataInfo.window = pBasicParam->window;
2,555,588✔
1992
        dataInfo.isNewParam = pBasicParam->isNewParam;
2,555,588✔
1993
        code = loadTagListFromBasicParam(&dataInfo, pBasicParam);
2,555,588✔
1994
        QUERY_CHECK_CODE(code, lino, _return);
2,555,588✔
1995

1996
        code = loadBatchColMapFromBasicParam(&dataInfo, pBasicParam);
2,555,588✔
1997
        QUERY_CHECK_CODE(code, lino, _return);
2,555,588✔
1998

1999
        dataInfo.orgTbInfo = NULL;
2,555,588✔
2000

2001
        dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
2,555,588✔
2002
        QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
2,555,588✔
2003

2004
        dataInfo.type = pBasicParam->type;
2,555,588✔
2005
        dataInfo.srcOpType = pBasicParam->srcOpType;
2,555,588✔
2006
        dataInfo.tableSeq = pBasicParam->tableSeq;
2,555,588✔
2007

2008
        QUERY_CHECK_NULL(taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
5,111,176✔
2009

2010
        pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
2,555,588✔
2011
      } else {
2012
        SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
2,653,082✔
2013
        QUERY_CHECK_NULL(pDataInfo, code, lino, _return, terrno);
2,653,082✔
2014

2015
        if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
2,653,082✔
2016
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
2,653,082✔
2017
        }
2018

2019
        pDataInfo->taskId = pExchangeInfo->pTaskId;
2,653,082✔
2020
        pDataInfo->index = pIdx->srcIdx;
2,653,082✔
2021
        pDataInfo->window = pBasicParam->window;
2,653,082✔
2022
        pDataInfo->groupid = pBasicParam->groupid;
2,653,082✔
2023
        pDataInfo->isNewParam = pBasicParam->isNewParam;
2,653,082✔
2024

2025
        code = loadTagListFromBasicParam(pDataInfo, pBasicParam);
2,653,082✔
2026
        QUERY_CHECK_CODE(code, lino, _return);
2,653,082✔
2027

2028
        code = loadBatchColMapFromBasicParam(pDataInfo, pBasicParam);
2,653,082✔
2029
        QUERY_CHECK_CODE(code, lino, _return);
2,653,082✔
2030

2031
        pDataInfo->orgTbInfo = NULL;
2,653,082✔
2032

2033
        pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
2,653,082✔
2034
        QUERY_CHECK_NULL(pDataInfo->pSrcUidList, code, lino, _return, terrno);
2,653,082✔
2035

2036
        pDataInfo->type = pBasicParam->type;
2,653,082✔
2037
        pDataInfo->srcOpType = pBasicParam->srcOpType;
2,653,082✔
2038
        pDataInfo->tableSeq = pBasicParam->tableSeq;
2,653,082✔
2039
      }
2040
      break;
5,208,670✔
2041
    }
2042
    case EX_SRC_TYPE_VTB_WIN_SCAN:
4,422,349✔
2043
    case EX_SRC_TYPE_VSTB_TAG_SCAN: {
2044
      SSourceDataInfo dataInfo = {0};
4,422,349✔
2045
      dataInfo.status = EX_SOURCE_DATA_NOT_READY;
4,422,349✔
2046
      dataInfo.taskId = pExchangeInfo->pTaskId;
4,422,349✔
2047
      dataInfo.index = pIdx->srcIdx;
4,422,349✔
2048
      dataInfo.window = pBasicParam->window;
4,422,349✔
2049
      dataInfo.groupid = 0;
4,422,349✔
2050
      dataInfo.orgTbInfo = NULL;
4,422,349✔
2051
      dataInfo.tagList = NULL;
4,422,349✔
2052

2053
      dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
4,422,349✔
2054
      QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
4,422,349✔
2055

2056
      dataInfo.isNewParam = false;
4,422,349✔
2057
      dataInfo.type = pBasicParam->type;
4,422,349✔
2058
      dataInfo.srcOpType = pBasicParam->srcOpType;
4,422,349✔
2059
      dataInfo.tableSeq = pBasicParam->tableSeq;
4,422,349✔
2060

2061
      taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
4,422,349✔
2062
      QUERY_CHECK_NULL(taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
8,844,698✔
2063
      break;
4,422,349✔
2064
    }
2065
    case EX_SRC_TYPE_VSTB_SCAN: {
24,886,762✔
2066
      SSourceDataInfo dataInfo = {0};
24,886,762✔
2067
      dataInfo.status = EX_SOURCE_DATA_NOT_READY;
24,886,762✔
2068
      dataInfo.taskId = pExchangeInfo->pTaskId;
24,886,762✔
2069
      dataInfo.index = pIdx->srcIdx;
24,886,762✔
2070
      dataInfo.window = pBasicParam->window;
24,886,762✔
2071
      dataInfo.groupid = 0;
24,886,762✔
2072
      dataInfo.isNewParam = pBasicParam->isNewParam;
24,886,762✔
2073
      dataInfo.tagList = NULL;
24,886,762✔
2074
      dataInfo.orgTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
24,886,762✔
2075
      QUERY_CHECK_NULL(dataInfo.orgTbInfo, code, lino, _return, terrno);
24,886,762✔
2076
      dataInfo.orgTbInfo->vgId = pBasicParam->orgTbInfo->vgId;
24,886,762✔
2077
      tstrncpy(dataInfo.orgTbInfo->tbName, pBasicParam->orgTbInfo->tbName, TSDB_TABLE_FNAME_LEN);
24,886,762✔
2078
      dataInfo.orgTbInfo->colMap = taosArrayDup(pBasicParam->orgTbInfo->colMap, NULL);
24,886,762✔
2079
      QUERY_CHECK_NULL(dataInfo.orgTbInfo->colMap, code, lino, _return, terrno);
24,886,762✔
2080

2081
      dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
24,886,541✔
2082
      QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
24,886,762✔
2083

2084
      dataInfo.type = pBasicParam->type;
24,886,762✔
2085
      dataInfo.srcOpType = pBasicParam->srcOpType;
24,886,762✔
2086
      dataInfo.tableSeq = pBasicParam->tableSeq;
24,886,762✔
2087

2088
      taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
24,886,762✔
2089
      QUERY_CHECK_NULL( taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
49,773,524✔
2090
      break;
24,886,762✔
2091
    }
2092
    case EX_SRC_TYPE_STB_JOIN_SCAN:
238,130✔
2093
    default: {
2094
      if (pIdx->inUseIdx < 0) {
238,130✔
2095
        SSourceDataInfo dataInfo = {0};
235,904✔
2096
        dataInfo.status = EX_SOURCE_DATA_NOT_READY;
235,904✔
2097
        dataInfo.taskId = pExchangeInfo->pTaskId;
235,904✔
2098
        dataInfo.index = pIdx->srcIdx;
235,904✔
2099
        dataInfo.groupid = 0;
235,904✔
2100
        dataInfo.tagList = NULL;
235,904✔
2101

2102
        dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
235,904✔
2103
        QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
235,904✔
2104

2105
        dataInfo.isNewParam = false;
235,904✔
2106
        dataInfo.type = pBasicParam->type;
235,904✔
2107
        dataInfo.srcOpType = pBasicParam->srcOpType;
235,904✔
2108
        dataInfo.tableSeq = pBasicParam->tableSeq;
235,904✔
2109

2110
        QUERY_CHECK_NULL(taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
471,808✔
2111

2112
        pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
235,904✔
2113
      } else {
2114
        SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
2,226✔
2115
        QUERY_CHECK_NULL(pDataInfo, code, lino, _return, terrno);
2,226✔
2116
        if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
2,226✔
2117
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
2,226✔
2118
        }
2119

2120
        pDataInfo->tagList = NULL;
2,226✔
2121
        pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
2,226✔
2122
        QUERY_CHECK_NULL(pDataInfo->pSrcUidList, code, lino, _return, terrno);
2,226✔
2123

2124
        pDataInfo->groupid = 0;
2,226✔
2125
        pDataInfo->isNewParam = false;
2,226✔
2126
        pDataInfo->type = pBasicParam->type;
2,226✔
2127
        pDataInfo->srcOpType = pBasicParam->srcOpType;
2,226✔
2128
        pDataInfo->tableSeq = pBasicParam->tableSeq;
2,226✔
2129
      }
2130
      break;
238,130✔
2131
    }
2132
  }
2133

2134
  return code;
34,755,690✔
2135
_return:
221✔
2136
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
221✔
2137
  return code;
×
2138
}
2139

2140
int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
34,058,940✔
2141
  SExchangeInfo*               pExchangeInfo = pOperator->info;
34,058,940✔
2142
  int32_t                      code = TSDB_CODE_SUCCESS;
34,058,940✔
2143
  SExchangeOperatorBasicParam* pBasicParam = NULL;
34,058,940✔
2144
  SExchangeOperatorParam*      pParam = (SExchangeOperatorParam*)pOperator->pOperatorGetParam->value;
34,058,940✔
2145
  if (pParam->multiParams) {
34,058,940✔
2146
    SExchangeOperatorBatchParam* pBatch = (SExchangeOperatorBatchParam*)pOperator->pOperatorGetParam->value;
4,745,377✔
2147
    int32_t                      iter = 0;
4,745,377✔
2148
    while (NULL != (pBasicParam = tSimpleHashIterate(pBatch->pBatchs, pBasicParam, &iter))) {
10,187,725✔
2149
      code = addSingleExchangeSource(pOperator, pBasicParam);
5,442,348✔
2150
      if (code) {
5,442,348✔
2151
        return code;
×
2152
      }
2153
    }
2154
  } else {
2155
    pBasicParam = &pParam->basic;
29,313,563✔
2156
    code = addSingleExchangeSource(pOperator, pBasicParam);
29,313,563✔
2157
  }
2158

2159
  freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
34,058,940✔
2160
  pOperator->pOperatorGetParam = NULL;
34,058,940✔
2161

2162
  return code;
34,058,940✔
2163
}
2164

2165
int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
593,369,275✔
2166
  SExchangeInfo* pExchangeInfo = pOperator->info;
593,369,275✔
2167
  int32_t        code = TSDB_CODE_SUCCESS;
593,377,048✔
2168
  int32_t        lino = 0;
593,377,048✔
2169
  
2170
  if ((OPTR_IS_OPENED(pOperator) && !pExchangeInfo->dynamicOp &&
593,377,048✔
2171
       NULL == pOperator->pOperatorGetParam) ||
449,336,783✔
2172
      (pExchangeInfo->dynamicOp && NULL == pOperator->pOperatorGetParam)) {
148,043,663✔
2173
    qDebug("%s, skip prepare, opened:%d, dynamicOp:%d, getParam:%p",
452,305,731✔
2174
      GET_TASKID(pOperator->pTaskInfo), OPTR_IS_OPENED(pOperator),
2175
      pExchangeInfo->dynamicOp, pOperator->pOperatorGetParam);
2176
    return TSDB_CODE_SUCCESS;
452,303,648✔
2177
  }
2178

2179
  if (pExchangeInfo->dynamicOp) {
141,066,760✔
2180
    code = addDynamicExchangeSource(pOperator);
34,058,940✔
2181
    QUERY_CHECK_CODE(code, lino, _end);
34,058,940✔
2182
  }
2183

2184
  if (pOperator->status == OP_NOT_OPENED &&
141,066,096✔
2185
      (pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData) ||
130,584,335✔
2186
      IS_STREAM_MODE(pOperator->pTaskInfo)) {
115,363,554✔
2187
    pExchangeInfo->current = 0;
31,525,591✔
2188
  }
2189

2190
  if (NULL != pOperator->pOperatorGetParam) {
141,063,031✔
2191
    storeNotifyInfo(pOperator);
4,003,842✔
2192
    /**
2193
      The param is referenced by getParam, and it will be freed by
2194
      the parent operator after getting next block.
2195
    */
2196
    pOperator->pOperatorGetParam->reUse = false;
4,003,842✔
2197
    pOperator->pOperatorGetParam = NULL;
4,003,842✔
2198
  }
2199

2200
  int64_t st = taosGetTimestampUs();
141,066,569✔
2201

2202
  if (!IS_STREAM_MODE(pOperator->pTaskInfo) && !pExchangeInfo->seqLoadData) {
141,066,569✔
2203
    code = prepareConcurrentlyLoad(pOperator);
106,708,430✔
2204
    QUERY_CHECK_CODE(code, lino, _end);
106,715,039✔
2205
    pExchangeInfo->openedTs = taosGetTimestampUs();
106,714,152✔
2206
  }
2207

2208
  OPTR_SET_OPENED(pOperator);
141,074,167✔
2209
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
141,070,911✔
2210

2211
  qDebug("%s prepare load complete", pOperator->pTaskInfo->id.str);
141,068,952✔
2212

2213
_end:
51,953,770✔
2214
  if (code != TSDB_CODE_SUCCESS) {
141,068,505✔
2215
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2216
    pOperator->pTaskInfo->code = code;
×
2217
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
2218
  }
2219
  return code;
141,068,505✔
2220
}
2221

2222
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
3,479,753✔
2223
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
3,479,753✔
2224

2225
  if (pLimitInfo->remainGroupOffset > 0) {
3,479,753✔
2226
    if (pLimitInfo->currentGroupId == 0) {  // it is the first group
×
2227
      pLimitInfo->currentGroupId = pBlock->info.id.groupId;
×
2228
      blockDataCleanup(pBlock);
×
2229
      return PROJECT_RETRIEVE_CONTINUE;
×
2230
    } else if (pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
×
2231
      // now it is the data from a new group
2232
      pLimitInfo->remainGroupOffset -= 1;
×
2233

2234
      // ignore data block in current group
2235
      if (pLimitInfo->remainGroupOffset > 0) {
×
2236
        blockDataCleanup(pBlock);
×
2237
        return PROJECT_RETRIEVE_CONTINUE;
×
2238
      }
2239
    }
2240

2241
    // set current group id of the project operator
2242
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
×
2243
  }
2244

2245
  // here check for a new group data, we need to handle the data of the previous group.
2246
  if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
3,479,753✔
2247
    pLimitInfo->numOfOutputGroups += 1;
183,164✔
2248
    if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
183,164✔
2249
      pOperator->status = OP_EXEC_DONE;
×
2250
      blockDataCleanup(pBlock);
×
2251

2252
      return PROJECT_RETRIEVE_DONE;
×
2253
    }
2254

2255
    // reset the value for a new group data
2256
    resetLimitInfoForNextGroup(pLimitInfo);
183,164✔
2257
    // existing rows that belongs to previous group.
2258
    if (pBlock->info.rows > 0) {
183,164✔
2259
      return PROJECT_RETRIEVE_DONE;
183,164✔
2260
    }
2261
  }
2262

2263
  // here we reach the start position, according to the limit/offset requirements.
2264

2265
  // set current group id
2266
  pLimitInfo->currentGroupId = pBlock->info.id.groupId;
3,296,589✔
2267

2268
  bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pTaskInfo);
3,296,589✔
2269
  if (pBlock->info.rows == 0) {
3,296,589✔
2270
    return PROJECT_RETRIEVE_CONTINUE;
1,636,553✔
2271
  } else {
2272
    if (limitReached && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
1,660,036✔
2273
      setOperatorCompleted(pOperator);
×
2274
      return PROJECT_RETRIEVE_DONE;
×
2275
    }
2276
  }
2277

2278
  // todo optimize performance
2279
  // If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the
2280
  // they may not belong to the same group the limit/offset value is not valid in this case.
2281
  if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || hasSlimitOffsetInfo(pLimitInfo)) {
1,660,036✔
2282
    return PROJECT_RETRIEVE_DONE;
1,660,036✔
2283
  } else {  // not full enough, continue to accumulate the output data in the buffer.
2284
    return PROJECT_RETRIEVE_CONTINUE;
×
2285
  }
2286
}
2287

2288
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo) {
215,937,128✔
2289
  SExecTaskInfo* pTask = pOperator->pTaskInfo;
215,937,128✔
2290
  int32_t        code = TSDB_CODE_SUCCESS;
215,939,217✔
2291
  if (pTask->pWorkerCb) {
215,939,217✔
2292
    code = pTask->pWorkerCb->beforeBlocking(pTask->pWorkerCb->pPool);
215,938,315✔
2293
    if (code != TSDB_CODE_SUCCESS) {
215,940,873✔
2294
      pTask->code = code;
×
2295
      return pTask->code;
×
2296
    }
2297
  }
2298

2299
  code = tsem_wait(&pExchangeInfo->ready);
215,940,349✔
2300
  if (code != TSDB_CODE_SUCCESS) {
215,939,808✔
2301
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2302
    pTask->code = code;
×
2303
    return pTask->code;
×
2304
  }
2305

2306
  if (pTask->pWorkerCb) {
215,939,808✔
2307
    code = pTask->pWorkerCb->afterRecoverFromBlocking(pTask->pWorkerCb->pPool);
215,940,745✔
2308
    if (code != TSDB_CODE_SUCCESS) {
215,940,722✔
2309
      pTask->code = code;
×
2310
      return pTask->code;
×
2311
    }
2312
  }
2313
  return TSDB_CODE_SUCCESS;
215,940,722✔
2314
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc