• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4894

22 Dec 2025 09:33AM UTC coverage: 65.72% (+0.2%) from 65.57%
#4894

push

travis-ci

web-flow
Update README.md (#34007)

* Update README.md

* docs: update table of contents and improve installation instructions in README

* docs: adjust words

---------

Signed-off-by: WANG Xu <feici02@outlook.com>
Co-authored-by: WANG Xu <feici02@outlook.com>

184394 of 280577 relevant lines covered (65.72%)

111859687.16 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.38
/source/libs/executor/src/exchangeoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "operator.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "tdatablock.h"
23
#include "thash.h"
24
#include "tmsg.h"
25
#include "tref.h"
26
#include "trpc.h"
27

28
typedef struct SFetchRspHandleWrapper {
29
  int64_t  exchangeId;
30
  int32_t  sourceIndex;
31
  int64_t  seqId;
32
} SFetchRspHandleWrapper;
33

34
typedef struct SSourceDataInfo {
35
  int32_t            index;
36
  int64_t            seqId;
37
  SRWLatch           lock;
38
  SRetrieveTableRsp* pRsp;
39
  uint64_t           totalRows;
40
  int64_t            startTime;
41
  int32_t            code;
42
  EX_SOURCE_STATUS   status;
43
  const char*        taskId;
44
  SArray*            pSrcUidList;
45
  int32_t            srcOpType;
46
  bool               tableSeq;
47
  char*              decompBuf;
48
  int32_t            decompBufSize;
49
  SOrgTbInfo*        colMap;
50
  bool               isVtbRefScan;
51
  bool               isVtbTagScan;
52
  bool               isVtbWinScan;
53
  bool               isNewParam;
54
  STimeWindow        window;
55
  bool               fetchSent; // need reset
56
} SSourceDataInfo;
57

58
static void destroyExchangeOperatorInfo(void* param);
59
static void freeBlock(void* pParam);
60
static void freeSourceDataInfo(void* param);
61
static void setAllSourcesCompleted(SOperatorInfo* pOperator);
62

63
static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code);
64
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex);
65
static int32_t getCompletedSources(const SArray* pArray, int32_t* pRes);
66
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator);
67
static int32_t seqLoadRemoteData(SOperatorInfo* pOperator);
68
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator);
69
static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock,
70
                                 bool holdDataInBuf);
71
static int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo);
72

73
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo);
74

75

76
static void streamConcurrentlyLoadRemoteData(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
14,918,786✔
77
                                           SExecTaskInfo* pTaskInfo) {
78
  int32_t code = 0;
14,918,786✔
79
  int32_t lino = 0;
14,918,786✔
80
  int64_t startTs = taosGetTimestampUs();  
14,918,377✔
81
  int32_t  totalSources = (int32_t)taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
14,918,377✔
82
  int32_t completed = 0;
14,918,377✔
83
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
14,918,377✔
84
  if (code != TSDB_CODE_SUCCESS) {
14,918,740✔
85
    pTaskInfo->code = code;
×
86
    T_LONG_JMP(pTaskInfo->env, code);
×
87
  }
88
  if (completed == totalSources) {
14,918,740✔
89
    qDebug("%s no load since all sources completed, completed:%d, totalSources:%d", pTaskInfo->id.str, completed, totalSources);
3,090,309✔
90
    setAllSourcesCompleted(pOperator);
3,090,309✔
91
    return;
3,094,195✔
92
  }
93

94
  SSourceDataInfo* pDataInfo = NULL;
11,828,431✔
95

96
  while (1) {
6,925,284✔
97
    if (pExchangeInfo->current < 0) {
18,753,715✔
98
      qDebug("current %d and all sources complted, totalSources:%d", pExchangeInfo->current, totalSources);
134,691✔
99
      setAllSourcesCompleted(pOperator);
134,691✔
100
      return;
134,691✔
101
    }
102
    
103
    if (pExchangeInfo->current >= totalSources) {
18,619,433✔
104
      completed = 0;
8,364,643✔
105
      code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
8,364,643✔
106
      if (code != TSDB_CODE_SUCCESS) {
8,364,233✔
107
        pTaskInfo->code = code;
×
108
        T_LONG_JMP(pTaskInfo->env, code);
×
109
      }
110
      if (completed == totalSources) {
8,364,233✔
111
        qDebug("stop to load since all sources complted, completed:%d, totalSources:%d", completed, totalSources);
6,217,815✔
112
        setAllSourcesCompleted(pOperator);
6,217,815✔
113
        return;
6,217,815✔
114
      }
115
      
116
      pExchangeInfo->current = 0;
2,146,418✔
117
    }
118

119
    qDebug("%s start stream exchange %p idx:%d fetch", GET_TASKID(pTaskInfo), pExchangeInfo, pExchangeInfo->current);
12,401,208✔
120

121
    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
12,401,208✔
122
    if (!pDataInfo) {
12,401,208✔
123
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
124
      pTaskInfo->code = terrno;
×
125
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
126
    }
127

128
    if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
12,401,208✔
129
      pExchangeInfo->current++;
2,000✔
130
      continue;
2,000✔
131
    }
132

133
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
12,399,208✔
134

135
    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
12,399,208✔
136
    if (code != TSDB_CODE_SUCCESS) {
12,399,618✔
137
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
138
      pTaskInfo->code = code;
×
139
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
140
    }
141

142
    while (true) {
3,086✔
143
      code = exchangeWait(pOperator, pExchangeInfo);
12,402,704✔
144
      if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
12,400,536✔
145
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
1,361✔
146
      }
147

148
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
12,399,176✔
149
      if (pDataInfo->seqId != currSeqId) {
12,399,176✔
150
        qDebug("%s seq rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", 
3,086✔
151
            GET_TASKID(pTaskInfo), pDataInfo->seqId, pExchangeInfo, currSeqId);
152
        taosMemoryFreeClear(pDataInfo->pRsp);
3,086✔
153
        continue;
3,086✔
154
      }
155

156
      break;
12,395,681✔
157
    }
158

159
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
12,395,681✔
160
    if (!pSource) {
12,396,090✔
161
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
162
      pTaskInfo->code = terrno;
×
163
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
164
    }
165

166
    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
12,396,090✔
167
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
×
168
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
169
             tstrerror(pDataInfo->code));
170
      pTaskInfo->code = pDataInfo->code;
×
171
      T_LONG_JMP(pTaskInfo->env, code);
×
172
    }
173

174
    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
12,396,090✔
175
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
12,396,090✔
176

177
    if (pRsp->numOfRows == 0) {
12,396,090✔
178
      qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
6,923,284✔
179
             " execId:%d idx %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", pDataInfo,
180
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
181
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);
182

183
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
6,923,284✔
184
      if (pDataInfo->isVtbRefScan || pDataInfo->isVtbTagScan) {
6,923,284✔
185
        pExchangeInfo->current = -1;
134,691✔
186
      } else {
187
        pExchangeInfo->current += 1;
6,788,593✔
188
      }
189
      taosMemoryFreeClear(pDataInfo->pRsp);
6,923,284✔
190
      continue;
6,923,284✔
191
    }
192

193
    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
5,472,806✔
194
    TAOS_CHECK_EXIT(code);
5,472,806✔
195

196
    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
5,472,806✔
197
    if (pRsp->completed == 1) {
5,472,806✔
198
      qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
3,242,655✔
199
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%d", pDataInfo,
200
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
201
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
202
             pExchangeInfo->current + 1, totalSources);
203

204
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
3,242,285✔
205
      if (pDataInfo->isVtbRefScan) {
3,242,285✔
206
        pExchangeInfo->current = -1;
×
207
        taosMemoryFreeClear(pDataInfo->pRsp);
×
208
        continue;
×
209
      }
210
    } else {
211
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d idx:%d numOfRows:%" PRId64
2,230,151✔
212
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
213
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
214
             pExchangeInfo->current, pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
215
    }
216

217
    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
5,472,436✔
218
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;
5,472,806✔
219

220
    pExchangeInfo->current++;
5,472,806✔
221

222
    taosMemoryFreeClear(pDataInfo->pRsp);
5,472,806✔
223
    return;
5,472,806✔
224
  }
225

226
_exit:
×
227

228
  if (code) {
×
229
    pTaskInfo->code = code;
×
230
    qError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
×
231
  }
232
}
233

234

235
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
135,843,286✔
236
                                           SExecTaskInfo* pTaskInfo) {
237
  int32_t code = 0;
135,843,286✔
238
  int32_t lino = 0;
135,843,286✔
239
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
135,843,286✔
240
  int32_t completed = 0;
135,843,286✔
241
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
135,843,286✔
242
  if (code != TSDB_CODE_SUCCESS) {
135,843,286✔
243
    pTaskInfo->code = code;
×
244
    T_LONG_JMP(pTaskInfo->env, code);
×
245
  }
246
  if (completed == totalSources) {
135,843,286✔
247
    setAllSourcesCompleted(pOperator);
43,324,848✔
248
    return;
43,327,844✔
249
  }
250

251
  SSourceDataInfo* pDataInfo = NULL;
92,518,438✔
252

253
  while (1) {
10,971,714✔
254
    qDebug("prepare wait for ready, %p, %s", pExchangeInfo, GET_TASKID(pTaskInfo));
103,490,152✔
255
    code = exchangeWait(pOperator, pExchangeInfo);
103,490,152✔
256

257
    if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
103,490,152✔
258
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
1,353✔
259
    }
260

261
    for (int32_t i = 0; i < totalSources; ++i) {
146,216,048✔
262
      pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
146,216,002✔
263
      QUERY_CHECK_NULL(pDataInfo, code, lino, _exit, terrno);
146,216,203✔
264
      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
146,216,203✔
265
        continue;
30,679,773✔
266
      }
267

268
      if (pDataInfo->status != EX_SOURCE_DATA_READY) {
115,536,430✔
269
        continue;
12,047,476✔
270
      }
271

272
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
103,488,754✔
273
      if (pDataInfo->seqId != currSeqId) {
103,488,154✔
274
        qDebug("concurrent rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pDataInfo->seqId, pExchangeInfo, currSeqId);
×
275
        taosMemoryFreeClear(pDataInfo->pRsp);
×
276
        break;
×
277
      }
278

279
      if (pDataInfo->code != TSDB_CODE_SUCCESS) {
103,488,753✔
280
        code = pDataInfo->code;
1,914✔
281
        TAOS_CHECK_EXIT(code);
1,914✔
282
      }
283

284
      tmemory_barrier();
103,487,040✔
285
      SRetrieveTableRsp*     pRsp = pDataInfo->pRsp;
103,487,040✔
286
      SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
103,486,839✔
287
      QUERY_CHECK_NULL(pSource, code, lino, _exit, terrno);
103,486,441✔
288

289
      // todo
290
      SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
103,486,441✔
291
      if (pRsp->numOfRows == 0) {
103,486,839✔
292
        if (NULL != pDataInfo->pSrcUidList && (!pDataInfo->isVtbRefScan)) {
26,348,400✔
293
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
×
294
          code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
×
295
          if (code != TSDB_CODE_SUCCESS) {
×
296
            taosMemoryFreeClear(pDataInfo->pRsp);
×
297
            TAOS_CHECK_EXIT(code);
×
298
          }
299
        } else {
300
          pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
26,348,400✔
301
          qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
26,348,400✔
302
                 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", try next %d/%" PRIzu, pDataInfo,
303
                 GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
304
                 pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows, i + 1, totalSources);
305
          taosMemoryFreeClear(pDataInfo->pRsp);
26,348,400✔
306
        }
307
        break;
26,348,400✔
308
      }
309

310
      TAOS_CHECK_EXIT(doExtractResultBlocks(pExchangeInfo, pDataInfo));
77,137,840✔
311

312
      SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
77,138,640✔
313
      updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pDataInfo->startTime, pOperator);
77,138,640✔
314
      pDataInfo->totalRows += pRetrieveRsp->numOfRows;
77,138,640✔
315

316
      if (pRsp->completed == 1) {
77,138,640✔
317
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
70,831,636✔
318
        qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
70,831,636✔
319
               " execId:%d index:%d completed, blocks:%d, numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64
320
               ", totalRows:%" PRIu64 ", total:%.2f Kb, try next %d/%" PRIzu, pDataInfo,
321
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
322
               pRsp->numOfBlocks, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows,
323
               pLoadInfo->totalSize / 1024.0, i + 1, totalSources);
324
      } else {
325
        qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
6,307,004✔
326
               " execId:%d blocks:%d, numOfRows:%" PRId64 ", totalRows:%" PRIu64 ", total:%.2f Kb",
327
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
328
               pRsp->numOfBlocks, pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
329
      }
330

331
      taosMemoryFreeClear(pDataInfo->pRsp);
77,138,795✔
332

333
      if ((pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED || NULL != pDataInfo->pSrcUidList) && !pDataInfo->isVtbRefScan && !pDataInfo->isVtbTagScan) {
77,137,442✔
334
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
6,307,004✔
335
        code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
6,307,004✔
336
        if (code != TSDB_CODE_SUCCESS) {
6,307,004✔
337
          taosMemoryFreeClear(pDataInfo->pRsp);
×
338
          TAOS_CHECK_EXIT(code);
×
339
        }
340
      }
341
      
342
      return;
77,138,776✔
343
    }  // end loop
344

345
    int32_t complete1 = 0;
26,348,446✔
346
    code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &complete1);
26,348,400✔
347
    if (code != TSDB_CODE_SUCCESS) {
26,348,400✔
348
      pTaskInfo->code = code;
×
349
      T_LONG_JMP(pTaskInfo->env, code);
×
350
    }
351
    if (complete1 == totalSources) {
26,348,400✔
352
      qDebug("all sources are completed, %s", GET_TASKID(pTaskInfo));
15,376,686✔
353
      return;
15,376,686✔
354
    }
355
  }
356

357
_exit:
1,914✔
358

359
  if (code) {
1,914✔
360
    pTaskInfo->code = code;
1,914✔
361
    qError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
1,914✔
362
  }
363
}
364

365
static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
380,848,481✔
366
  int32_t        code = TSDB_CODE_SUCCESS;
380,848,481✔
367
  SExchangeInfo* pExchangeInfo = pOperator->info;
380,848,481✔
368
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
380,848,481✔
369

370
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
380,848,072✔
371

372
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
380,848,683✔
373
  if (pOperator->status == OP_EXEC_DONE) {
380,848,683✔
374
    qDebug("%s all %" PRIzu " source(s) are exhausted, total rows:%" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
×
375
           GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize,
376
           pLoadInfo->totalElapsed / 1000.0);
377
    return NULL;
×
378
  }
379

380
  // we have buffered retrieved datablock, return it directly
381
  SSDataBlock* p = NULL;
380,848,891✔
382
  if (taosArrayGetSize(pExchangeInfo->pResultBlockList) > 0) {
380,848,683✔
383
    p = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
222,378,023✔
384
    taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
222,378,023✔
385
  }
386

387
  if (p != NULL) {
380,848,482✔
388
    void* tmp = taosArrayPush(pExchangeInfo->pRecycledBlocks, &p);
222,377,822✔
389
    if (!tmp) {
222,378,231✔
390
      code = terrno;
×
391
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
392
      pTaskInfo->code = code;
×
393
      T_LONG_JMP(pTaskInfo->env, code);
×
394
    }
395
    return p;
222,378,231✔
396
  } else {
397
    if (pExchangeInfo->seqLoadData) {
158,470,660✔
398
      code = seqLoadRemoteData(pOperator);
7,708,433✔
399
      if (code != TSDB_CODE_SUCCESS) {
7,708,433✔
400
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
692✔
401
        pTaskInfo->code = code;
692✔
402
        T_LONG_JMP(pTaskInfo->env, code);
692✔
403
      }
404
    } else if (IS_STREAM_MODE(pOperator->pTaskInfo))   {
150,762,435✔
405
      streamConcurrentlyLoadRemoteData(pOperator, pExchangeInfo, pTaskInfo);
14,919,149✔
406
    } else {
407
      concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
135,843,286✔
408
    }
409
    if (TSDB_CODE_SUCCESS != pOperator->pTaskInfo->code) {
158,465,450✔
410
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(pOperator->pTaskInfo->code));
1,914✔
411
      T_LONG_JMP(pTaskInfo->env, pOperator->pTaskInfo->code);
1,914✔
412
    }
413
    
414
    if (taosArrayGetSize(pExchangeInfo->pResultBlockList) == 0) {
158,463,536✔
415
      qDebug("empty resultBlockList");
70,247,736✔
416
      return NULL;
70,247,736✔
417
    } else {
418
      p = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
88,215,800✔
419
      taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
88,214,735✔
420
      void* tmp = taosArrayPush(pExchangeInfo->pRecycledBlocks, &p);
88,214,735✔
421
      if (!tmp) {
88,215,201✔
422
        code = terrno;
×
423
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
424
        pTaskInfo->code = code;
×
425
        T_LONG_JMP(pTaskInfo->env, code);
×
426
      }
427

428
      qDebug("block with rows:%" PRId64 " loaded", p->info.rows);
88,215,201✔
429
      return p;
88,215,800✔
430
    }
431
}
432
}
433

434
static int32_t loadRemoteDataNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
379,689,423✔
435
  int32_t        code = TSDB_CODE_SUCCESS;
379,689,423✔
436
  int32_t        lino = 0;
379,689,423✔
437
  SExchangeInfo* pExchangeInfo = pOperator->info;
379,689,423✔
438
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
379,690,395✔
439

440
  qDebug("%s start to load from exchange %p", pTaskInfo->id.str, pExchangeInfo);
379,690,033✔
441

442
  code = pOperator->fpSet._openFn(pOperator);
379,691,233✔
443
  QUERY_CHECK_CODE(code, lino, _end);
379,689,732✔
444

445
  if (pOperator->status == OP_EXEC_DONE) {
379,689,732✔
446
    (*ppRes) = NULL;
135,644✔
447
    return code;
135,644✔
448
  }
449

450
  while (1) {
1,293,530✔
451
    SSDataBlock* pBlock = doLoadRemoteDataImpl(pOperator);
380,847,618✔
452
    if (pBlock == NULL) {
380,841,330✔
453
      (*ppRes) = NULL;
70,247,736✔
454
      return code;
70,247,736✔
455
    }
456

457
    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL, NULL);
310,593,594✔
458
    QUERY_CHECK_CODE(code, lino, _end);
310,593,623✔
459

460
    if (blockDataGetNumOfRows(pBlock) == 0) {
310,593,623✔
461
      qDebug("rows 0 block got, continue next load");
1,168✔
462
      continue;
1,168✔
463
    }
464

465
    SLimitInfo* pLimitInfo = &pExchangeInfo->limitInfo;
310,592,863✔
466
    if (hasLimitOffsetInfo(pLimitInfo)) {
310,592,863✔
467
      int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock, false);
2,511,843✔
468
      if (status == PROJECT_RETRIEVE_CONTINUE) {
2,511,843✔
469
        qDebug("limit retrieve continue");
1,292,362✔
470
        continue;
1,292,362✔
471
      } else if (status == PROJECT_RETRIEVE_DONE) {
1,219,481✔
472
        if (pBlock->info.rows == 0) {
1,219,481✔
473
          setOperatorCompleted(pOperator);
×
474
          (*ppRes) = NULL;
×
475
          return code;
×
476
        } else {
477
          (*ppRes) = pBlock;
1,219,481✔
478
          return code;
1,219,481✔
479
        }
480
      }
481
    } else {
482
      (*ppRes) = pBlock;
308,080,819✔
483
      qDebug("block with rows %" PRId64 " returned in exechange", pBlock->info.rows);
308,080,411✔
484
      return code;
308,080,819✔
485
    }
486
  }
487

488
_end:
×
489

490
  if (code != TSDB_CODE_SUCCESS) {
×
491
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
492
    pTaskInfo->code = code;
×
493
    T_LONG_JMP(pTaskInfo->env, code);
×
494
  } else {
495
    qDebug("empty block returned in exchange");
×
496
  }
497
  
498
  (*ppRes) = NULL;
×
499
  return code;
×
500
}
501

502
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
65,363,074✔
503
  pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
65,363,074✔
504
  if (pInfo->pSourceDataInfo == NULL) {
65,363,453✔
505
    return terrno;
×
506
  }
507

508
  if (pInfo->dynamicOp) {
65,363,654✔
509
    return TSDB_CODE_SUCCESS;
1,930,182✔
510
  }
511

512
  int32_t len = strlen(id) + 1;
63,433,472✔
513
  pInfo->pTaskId = taosMemoryCalloc(1, len);
63,433,472✔
514
  if (!pInfo->pTaskId) {
63,433,343✔
515
    return terrno;
×
516
  }
517
  tstrncpy(pInfo->pTaskId, id, len);
63,432,127✔
518
  for (int32_t i = 0; i < numOfSources; ++i) {
163,023,484✔
519
    SSourceDataInfo dataInfo = {0};
99,590,389✔
520
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
99,589,935✔
521
    dataInfo.taskId = pInfo->pTaskId;
99,589,935✔
522
    dataInfo.index = i;
99,591,957✔
523
    SSourceDataInfo* pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
99,591,957✔
524
    if (pDs == NULL) {
99,592,033✔
525
      taosArrayDestroyEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
×
526
      return terrno;
×
527
    }
528
    qDebug("init source data info %d, pDs:%p, status:%d", i, pDs, pDs->status);
99,592,033✔
529
  }
530

531
  return TSDB_CODE_SUCCESS;
63,433,095✔
532
}
533

534
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo, const char* id) {
65,363,540✔
535
  size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);
65,363,540✔
536

537
  if (numOfSources == 0) {
65,362,475✔
538
    qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
×
539
    return TSDB_CODE_INVALID_PARA;
×
540
  }
541
  pInfo->pFetchRpcHandles = taosArrayInit(numOfSources, sizeof(int64_t));
65,362,475✔
542
  if (!pInfo->pFetchRpcHandles) {
65,363,090✔
543
    return terrno;
×
544
  }
545
  void* ret = taosArrayReserve(pInfo->pFetchRpcHandles, numOfSources);
65,363,839✔
546
  if (!ret) {
65,362,415✔
547
    return terrno;
×
548
  }
549

550
  pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
65,362,415✔
551
  if (pInfo->pSources == NULL) {
65,362,711✔
552
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
553
    return terrno;
×
554
  }
555

556
  if (pExNode->node.dynamicOp) {
65,362,575✔
557
    pInfo->pHashSources = tSimpleHashInit(numOfSources * 2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
1,930,182✔
558
    if (NULL == pInfo->pHashSources) {
1,930,182✔
559
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
560
      return terrno;
×
561
    }
562
  }
563

564
  for (int32_t i = 0; i < numOfSources; ++i) {
168,853,437✔
565
    SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
103,490,185✔
566
    if (!pNode) {
103,489,119✔
567
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
568
      return terrno;
×
569
    }
570
    void* tmp = taosArrayPush(pInfo->pSources, pNode);
103,489,119✔
571
    if (!tmp) {
103,489,718✔
572
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
573
      return terrno;
×
574
    }
575
    SExchangeSrcIndex idx = {.srcIdx = i, .inUseIdx = -1};
103,489,718✔
576
    int32_t           code =
577
        tSimpleHashPut(pInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
103,489,984✔
578
    if (pInfo->pHashSources && code != TSDB_CODE_SUCCESS) {
103,488,665✔
579
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
580
      return code;
×
581
    }
582
  }
583

584
  initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);
65,363,252✔
585
  int64_t refId = taosAddRef(exchangeObjRefPool, pInfo);
65,362,490✔
586
  if (refId < 0) {
65,362,153✔
587
    int32_t code = terrno;
×
588
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
589
    return code;
×
590
  } else {
591
    pInfo->self = refId;
65,362,153✔
592
  }
593

594
  return initDataSource(numOfSources, pInfo, id);
65,362,153✔
595
}
596

597
int32_t resetExchangeOperState(SOperatorInfo* pOper) {
12,420,700✔
598
  SExchangeInfo* pInfo = pOper->info;
12,420,700✔
599
  SExchangePhysiNode* pPhynode = (SExchangePhysiNode*)pOper->pPhyNode;
12,421,580✔
600

601
  qDebug("%s reset exchange op:%p info:%p", pOper->pTaskInfo->id.str, pOper, pInfo);
12,422,019✔
602

603
  (void)atomic_add_fetch_64(&pInfo->seqId, 1);
12,423,341✔
604
  pOper->status = OP_NOT_OPENED;
12,422,459✔
605
  pInfo->current = 0;
12,422,459✔
606
  pInfo->loadInfo.totalElapsed = 0;
12,422,459✔
607
  pInfo->loadInfo.totalRows = 0;
12,422,459✔
608
  pInfo->loadInfo.totalSize = 0;
12,422,459✔
609
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pSourceDataInfo); ++i) {
30,605,249✔
610
    SSourceDataInfo* pDataInfo = taosArrayGet(pInfo->pSourceDataInfo, i);
18,182,790✔
611
    taosWLockLatch(&pDataInfo->lock);
18,182,382✔
612
    taosMemoryFreeClear(pDataInfo->decompBuf);
18,182,790✔
613
    taosMemoryFreeClear(pDataInfo->pRsp);
18,182,790✔
614

615
    pDataInfo->totalRows = 0;
18,182,380✔
616
    pDataInfo->code = 0;
18,182,380✔
617
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
18,182,380✔
618
    pDataInfo->fetchSent = false;
18,182,380✔
619
    taosWUnLockLatch(&pDataInfo->lock);
18,182,790✔
620
  }
621

622
  if (pInfo->dynamicOp) {
12,422,459✔
623
    taosArrayClearEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
1,327,307✔
624
  } 
625

626
  taosArrayClearEx(pInfo->pResultBlockList, freeBlock);
12,422,459✔
627
  taosArrayClearEx(pInfo->pRecycledBlocks, freeBlock);
12,422,459✔
628

629
  blockDataCleanup(pInfo->pDummyBlock);
12,422,459✔
630

631
  void   *data = NULL;
12,422,017✔
632
  int32_t iter = 0;
12,422,017✔
633
  while ((data = tSimpleHashIterate(pInfo->pHashSources, data, &iter))) {
14,941,080✔
634
    ((SExchangeSrcIndex *)data)->inUseIdx = -1;
2,519,063✔
635
  }
636
  
637
  pInfo->limitInfo = (SLimitInfo){0};
12,422,049✔
638
  initLimitInfo(pPhynode->node.pLimit, pPhynode->node.pSlimit, &pInfo->limitInfo);
12,422,049✔
639

640
  return 0;
12,422,459✔
641
}
642

643
int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo,
65,363,528✔
644
                                   SOperatorInfo** pOptrInfo) {
645
  QRY_PARAM_CHECK(pOptrInfo);
65,363,528✔
646

647
  int32_t        code = 0;
65,363,528✔
648
  int32_t        lino = 0;
65,363,528✔
649
  SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
65,363,528✔
650
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
65,363,195✔
651
  if (pInfo == NULL || pOperator == NULL) {
65,362,192✔
652
    code = terrno;
×
653
    goto _error;
×
654
  }
655

656
  pOperator->pPhyNode = pExNode;
65,362,192✔
657
  pInfo->dynamicOp = pExNode->node.dynamicOp;
65,362,192✔
658
  code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo));
65,361,888✔
659
  QUERY_CHECK_CODE(code, lino, _error);
65,363,277✔
660

661
  code = tsem_init(&pInfo->ready, 0, 0);
65,363,277✔
662
  QUERY_CHECK_CODE(code, lino, _error);
65,363,395✔
663

664
  pInfo->pDummyBlock = createDataBlockFromDescNode(pExNode->node.pOutputDataBlockDesc);
65,363,395✔
665
  QUERY_CHECK_NULL(pInfo->pDummyBlock, code, lino, _error, terrno);
65,363,395✔
666

667
  pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES);
65,363,793✔
668
  QUERY_CHECK_NULL(pInfo->pResultBlockList, code, lino, _error, terrno);
65,363,076✔
669
  pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES);
65,363,076✔
670
  QUERY_CHECK_NULL(pInfo->pRecycledBlocks, code, lino, _error, terrno);
65,363,474✔
671

672
  SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self};
65,363,474✔
673
  code = qAppendTaskStopInfo(pTaskInfo, &stopInfo);
65,363,474✔
674
  QUERY_CHECK_CODE(code, lino, _error);
65,363,994✔
675

676
  pInfo->seqLoadData = pExNode->seqRecvData;
65,363,994✔
677
  pInfo->dynTbname = pExNode->dynTbname;
65,363,793✔
678
  if (pInfo->dynTbname) {
65,363,643✔
679
    pInfo->seqLoadData = true;
14,106✔
680
  }
681
  pInfo->pTransporter = pTransporter;
65,363,025✔
682

683
  setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo,
65,363,370✔
684
                  pTaskInfo);
685
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
65,362,717✔
686

687
  code = filterInitFromNode((SNode*)pExNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
65,363,518✔
688
                            pTaskInfo->pStreamRuntimeInfo);
65,362,458✔
689
  QUERY_CHECK_CODE(code, lino, _error);
65,363,083✔
690
  qTrace("%s exchange op:%p", __func__, pOperator);
65,363,083✔
691
  pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteDataNext, NULL, destroyExchangeOperatorInfo,
65,363,083✔
692
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
693
  setOperatorResetStateFn(pOperator, resetExchangeOperState);
65,363,284✔
694
  *pOptrInfo = pOperator;
65,363,675✔
695
  return TSDB_CODE_SUCCESS;
65,363,375✔
696

697
_error:
×
698
  if (code != TSDB_CODE_SUCCESS) {
×
699
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
700
    pTaskInfo->code = code;
×
701
  }
702
  if (pInfo != NULL) {
×
703
    doDestroyExchangeOperatorInfo(pInfo);
×
704
  }
705

706
  if (pOperator != NULL) {
×
707
    pOperator->info = NULL;
×
708
    destroyOperator(pOperator);
×
709
  }
710
  return code;
×
711
}
712

713
void destroyExchangeOperatorInfo(void* param) {
65,363,994✔
714
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
65,363,994✔
715
  int32_t        code = taosRemoveRef(exchangeObjRefPool, pExInfo->self);
65,363,994✔
716
  if (code != TSDB_CODE_SUCCESS) {
65,363,994✔
717
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
718
  }
719
}
65,363,994✔
720

721
void freeBlock(void* pParam) {
157,523,817✔
722
  SSDataBlock* pBlock = *(SSDataBlock**)pParam;
157,523,817✔
723
  blockDataDestroy(pBlock);
157,523,817✔
724
}
157,523,817✔
725

726
void freeSourceDataInfo(void* p) {
100,400,190✔
727
  SSourceDataInfo* pInfo = (SSourceDataInfo*)p;
100,400,190✔
728
  taosMemoryFreeClear(pInfo->decompBuf);
100,400,190✔
729
  taosMemoryFreeClear(pInfo->pRsp);
100,400,644✔
730

731
  pInfo->decompBufSize = 0;
100,399,523✔
732
}
100,399,724✔
733

734
void doDestroyExchangeOperatorInfo(void* param) {
65,363,994✔
735
  if (param == NULL) {
65,363,994✔
736
    return;
×
737
  }
738
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
65,363,994✔
739
  if (pExInfo->pFetchRpcHandles) {
65,363,994✔
740
    for (int32_t i = 0; i < pExInfo->pFetchRpcHandles->size; ++i) {
168,854,027✔
741
      int64_t* pRpcHandle = taosArrayGet(pExInfo->pFetchRpcHandles, i);
103,490,437✔
742
      if (*pRpcHandle > 0) {
103,490,437✔
743
        SDownstreamSourceNode* pSource = taosArrayGet(pExInfo->pSources, i);
4,632,076✔
744
        (void)asyncFreeConnById(pExInfo->pTransporter, *pRpcHandle);
4,632,076✔
745
      }
746
    }
747
    taosArrayDestroy(pExInfo->pFetchRpcHandles);
65,363,792✔
748
  }
749

750
  taosArrayDestroy(pExInfo->pSources);
65,363,637✔
751
  taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo);
65,363,793✔
752

753
  taosArrayDestroyEx(pExInfo->pResultBlockList, freeBlock);
65,363,419✔
754
  taosArrayDestroyEx(pExInfo->pRecycledBlocks, freeBlock);
65,363,338✔
755

756
  blockDataDestroy(pExInfo->pDummyBlock);
65,363,994✔
757
  tSimpleHashCleanup(pExInfo->pHashSources);
65,363,994✔
758

759
  int32_t code = tsem_destroy(&pExInfo->ready);
65,363,994✔
760
  if (code != TSDB_CODE_SUCCESS) {
65,362,931✔
761
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
762
  }
763
  taosMemoryFreeClear(pExInfo->pTaskId);
65,362,931✔
764

765
  taosMemoryFreeClear(param);
65,363,540✔
766
}
767

768
int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
123,895,687✔
769
  SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;
123,895,687✔
770

771
  taosMemoryFreeClear(pMsg->pEpSet);
123,895,687✔
772
  SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
123,915,831✔
773
  if (pExchangeInfo == NULL) {
123,927,254✔
774
    qWarn("failed to acquire exchange operator, since it may have been released, %p", pExchangeInfo);
3,999✔
775
    taosMemoryFree(pMsg->pData);
3,999✔
776
    return TSDB_CODE_SUCCESS;
3,999✔
777
  }
778

779
  int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
123,923,255✔
780
  if (pWrapper->seqId != currSeqId) {
123,923,355✔
781
    qDebug("rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pWrapper->seqId, pExchangeInfo, currSeqId);
×
782
    taosMemoryFree(pMsg->pData);
×
783
    code = taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
×
784
    if (code != TSDB_CODE_SUCCESS) {
×
785
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
786
    }
787
    return TSDB_CODE_SUCCESS;
×
788
  }
789

790
  int32_t          index = pWrapper->sourceIndex;
123,901,400✔
791

792
  qDebug("%s exchange %p %dth source got rsp, code:%d, rsp:%p", pExchangeInfo->pTaskId, pExchangeInfo, index, code, pMsg->pData);
123,903,849✔
793

794
  int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, index);
123,905,326✔
795
  if (pRpcHandle != NULL) {
123,912,286✔
796
    int32_t ret = asyncFreeConnById(pExchangeInfo->pTransporter, *pRpcHandle);
123,897,609✔
797
    if (ret != 0) {
123,909,527✔
798
      qDebug("failed to free rpc handle, code:%s, %p", tstrerror(ret), pExchangeInfo);
5,278,133✔
799
    }
800
    *pRpcHandle = -1;
123,909,527✔
801
  }
802

803
  SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);
123,907,443✔
804
  if (!pSourceDataInfo) {
123,914,498✔
805
    return terrno;
×
806
  }
807

808
  if (0 == code && NULL == pMsg->pData) {
123,914,498✔
809
    qError("invalid rsp msg, msgType:%d, len:%d", pMsg->msgType, pMsg->len);
×
810
    code = TSDB_CODE_QRY_INVALID_MSG;
×
811
  }
812

813
  taosWLockLatch(&pSourceDataInfo->lock);
123,919,332✔
814
  if (code == TSDB_CODE_SUCCESS) {
123,924,368✔
815
    pSourceDataInfo->seqId = pWrapper->seqId;
123,917,719✔
816
    pSourceDataInfo->pRsp = pMsg->pData;
123,895,851✔
817

818
    SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
123,882,352✔
819
    pRsp->numOfRows = htobe64(pRsp->numOfRows);
123,876,047✔
820
    pRsp->compLen = htonl(pRsp->compLen);
123,897,526✔
821
    pRsp->payloadLen = htonl(pRsp->payloadLen);
123,897,487✔
822
    pRsp->numOfCols = htonl(pRsp->numOfCols);
123,869,398✔
823
    pRsp->useconds = htobe64(pRsp->useconds);
123,872,443✔
824
    pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);
123,864,297✔
825

826
    qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%" PRId64 ", %p", pSourceDataInfo->taskId, index,
123,864,402✔
827
           pRsp->numOfBlocks, pRsp->numOfRows, pExchangeInfo);
828
  } else {
829
    taosMemoryFree(pMsg->pData);
6,649✔
830
    pSourceDataInfo->code = rpcCvtErrCode(code);
6,649✔
831
    if (pSourceDataInfo->code != code) {
6,649✔
832
      qError("%s fetch rsp received, index:%d, error:%s, cvted error: %s, %p", pSourceDataInfo->taskId, index,
×
833
             tstrerror(code), tstrerror(pSourceDataInfo->code), pExchangeInfo);
834
    } else {
835
      qError("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code),
6,649✔
836
             pExchangeInfo);
837
    }
838
  }
839

840
  tmemory_barrier();
123,871,504✔
841
  pSourceDataInfo->status = EX_SOURCE_DATA_READY;
123,871,504✔
842
  taosWUnLockLatch(&pSourceDataInfo->lock);
123,879,017✔
843
  
844
  code = tsem_post(&pExchangeInfo->ready);
123,888,546✔
845
  if (code != TSDB_CODE_SUCCESS) {
123,915,934✔
846
    qError("failed to invoke post when fetch rsp is ready, code:%s, %p", tstrerror(code), pExchangeInfo);
×
847
    return code;
×
848
  }
849

850
  code = taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
123,915,934✔
851
  if (code != TSDB_CODE_SUCCESS) {
123,924,935✔
852
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
853
  }
854
  return code;
123,924,935✔
855
}
856

857
int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, bool tableSeq) {
293,446✔
858
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
293,446✔
859
  if (NULL == *ppRes) {
293,446✔
860
    return terrno;
×
861
  }
862

863
  STableScanOperatorParam* pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
293,446✔
864
  if (NULL == pScan) {
293,446✔
865
    taosMemoryFreeClear(*ppRes);
×
866
    return terrno;
×
867
  }
868

869
  pScan->pUidList = taosArrayDup(pUidList, NULL);
293,446✔
870
  if (NULL == pScan->pUidList) {
293,446✔
871
    taosMemoryFree(pScan);
×
872
    taosMemoryFreeClear(*ppRes);
×
873
    return terrno;
×
874
  }
875
  pScan->tableSeq = tableSeq;
293,446✔
876
  pScan->pOrgTbInfo = NULL;
293,446✔
877
  pScan->isNewParam = false;
293,446✔
878
  pScan->window.skey = INT64_MAX;
293,446✔
879
  pScan->window.ekey = INT64_MIN;
293,446✔
880

881
  (*ppRes)->opType = srcOpType;
293,446✔
882
  (*ppRes)->downstreamIdx = 0;
293,446✔
883
  (*ppRes)->value = pScan;
293,446✔
884
  (*ppRes)->pChildren = NULL;
293,446✔
885
  (*ppRes)->reUse = false;
293,446✔
886

887
  return TSDB_CODE_SUCCESS;
293,446✔
888
}
889

890
int32_t buildTableScanOperatorParamEx(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, SOrgTbInfo *pMap, bool tableSeq, STimeWindow *window, bool isNewParam) {
6,703,764✔
891
  int32_t                  code = TSDB_CODE_SUCCESS;
6,703,764✔
892
  int32_t                  lino = 0;
6,703,764✔
893
  STableScanOperatorParam* pScan = NULL;
6,703,764✔
894

895
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
6,703,764✔
896
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
6,703,764✔
897

898
  pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
6,703,764✔
899
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);
6,703,764✔
900

901
  if (pUidList) {
6,703,764✔
902
    pScan->pUidList = taosArrayDup(pUidList, NULL);
6,703,764✔
903
    QUERY_CHECK_NULL(pScan->pUidList, code, lino, _return, terrno);
6,703,764✔
904
  } else {
905
    pScan->pUidList = NULL;
×
906
  }
907

908
  if (pMap) {
6,703,764✔
909
    pScan->pOrgTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
6,703,764✔
910
    QUERY_CHECK_NULL(pScan->pOrgTbInfo, code, lino, _return, terrno);
6,703,764✔
911

912
    pScan->pOrgTbInfo->vgId = pMap->vgId;
6,703,764✔
913
    tstrncpy(pScan->pOrgTbInfo->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);
6,703,764✔
914

915
    pScan->pOrgTbInfo->colMap = taosArrayDup(pMap->colMap, NULL);
6,703,764✔
916
    QUERY_CHECK_NULL(pScan->pOrgTbInfo->colMap, code, lino, _return, terrno);
6,703,764✔
917
  } else {
918
    pScan->pOrgTbInfo = NULL;
×
919
  }
920

921

922
  pScan->tableSeq = tableSeq;
6,703,764✔
923
  pScan->window.skey = window->skey;
6,703,764✔
924
  pScan->window.ekey = window->ekey;
6,703,764✔
925
  pScan->isNewParam = isNewParam;
6,703,764✔
926
  (*ppRes)->opType = srcOpType;
6,703,764✔
927
  (*ppRes)->downstreamIdx = 0;
6,703,764✔
928
  (*ppRes)->value = pScan;
6,703,764✔
929
  (*ppRes)->pChildren = NULL;
6,703,764✔
930
  (*ppRes)->reUse = false;
6,703,764✔
931

932
  return code;
6,703,764✔
933
_return:
×
934
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
935
  taosMemoryFreeClear(*ppRes);
×
936
  if (pScan) {
×
937
    taosArrayDestroy(pScan->pUidList);
×
938
    if (pScan->pOrgTbInfo) {
×
939
      taosArrayDestroy(pScan->pOrgTbInfo->colMap);
×
940
      taosMemoryFreeClear(pScan->pOrgTbInfo);
×
941
    }
942
    taosMemoryFree(pScan);
×
943
  }
944
  return code;
×
945
}
946

947
int32_t buildTagScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType) {
944,638✔
948
  int32_t                  code = TSDB_CODE_SUCCESS;
944,638✔
949
  int32_t                  lino = 0;
944,638✔
950
  STagScanOperatorParam*   pScan = NULL;
944,638✔
951

952
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
944,638✔
953
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
944,638✔
954

955
  pScan = taosMemoryMalloc(sizeof(STagScanOperatorParam));
944,638✔
956
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);
944,638✔
957
  pScan->vcUid = *(tb_uid_t*)taosArrayGet(pUidList, 0);
944,638✔
958

959
  (*ppRes)->opType = srcOpType;
944,638✔
960
  (*ppRes)->downstreamIdx = 0;
944,638✔
961
  (*ppRes)->value = pScan;
944,638✔
962
  (*ppRes)->pChildren = NULL;
944,638✔
963
  (*ppRes)->reUse = false;
944,638✔
964

965
  return code;
944,638✔
966
_return:
×
967
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
968
  taosMemoryFreeClear(*ppRes);
×
969
  if (pScan) {
×
970
    taosMemoryFree(pScan);
×
971
  }
972
  return code;
×
973
}
974

975
static int32_t getCurrentWinCalcTimeRange(SStreamRuntimeFuncInfo* pRuntimeInfo, STimeWindow* pTimeRange) {
6,350,884✔
976
  if (!pRuntimeInfo || !pTimeRange) {
6,350,884✔
977
    return TSDB_CODE_INTERNAL_ERROR;
×
978
  }
979

980
  SSTriggerCalcParam* pParam = taosArrayGet(pRuntimeInfo->pStreamPesudoFuncVals, pRuntimeInfo->curIdx);
6,350,884✔
981
  if (!pParam) {
6,350,884✔
982
    return TSDB_CODE_INTERNAL_ERROR;
×
983
  }
984

985
  switch (pRuntimeInfo->triggerType) {
6,350,884✔
986
    case STREAM_TRIGGER_SLIDING:
5,135,486✔
987
      // Unable to distinguish whether there is an interval, all use wstart/wend
988
      // and the results are equal to those of prevTs/currentTs, using the same address of union.
989
      pTimeRange->skey = pParam->wstart;  // is equal to wstart
5,135,486✔
990
      pTimeRange->ekey = pParam->wend;    // is equal to wend
5,135,486✔
991
      break;
5,135,486✔
992
    case STREAM_TRIGGER_PERIOD:
236,002✔
993
      pTimeRange->skey = pParam->prevLocalTime;
236,002✔
994
      pTimeRange->ekey = pParam->triggerTime;
236,002✔
995
      break;
236,002✔
996
    default:
979,396✔
997
      pTimeRange->skey = pParam->wstart;
979,396✔
998
      pTimeRange->ekey = pParam->wend;
979,396✔
999
      break;
979,396✔
1000
  }
1001

1002
  return TSDB_CODE_SUCCESS;
6,350,884✔
1003
}
1004

1005
int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
123,942,648✔
1006
  int32_t          code = TSDB_CODE_SUCCESS;
123,942,648✔
1007
  int32_t          lino = 0;
123,942,648✔
1008
  SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
123,942,648✔
1009
  if (!pDataInfo) {
123,942,636✔
1010
    return terrno;
×
1011
  }
1012

1013
  if (EX_SOURCE_DATA_NOT_READY != pDataInfo->status) {
123,942,636✔
1014
    return TSDB_CODE_SUCCESS;
×
1015
  }
1016

1017
  pDataInfo->status = EX_SOURCE_DATA_STARTED;
123,942,565✔
1018
  SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
123,942,491✔
1019
  if (!pSource) {
123,941,652✔
1020
    return terrno;
×
1021
  }
1022

1023
  pDataInfo->startTime = taosGetTimestampUs();
123,940,901✔
1024
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
123,941,174✔
1025

1026
  SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
123,941,702✔
1027
  QUERY_CHECK_NULL(pWrapper, code, lino, _end, terrno);
123,942,397✔
1028
  pWrapper->exchangeId = pExchangeInfo->self;
123,942,397✔
1029
  pWrapper->sourceIndex = sourceIndex;
123,942,452✔
1030
  pWrapper->seqId = pExchangeInfo->seqId;
123,942,452✔
1031

1032
  if (pSource->localExec) {
123,941,177✔
1033
    SDataBuf pBuf = {0};
×
1034
    int32_t  code = (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->sId, pTaskInfo->id.queryId,
×
1035
                                               pSource->clientId, pSource->taskId, 0, pSource->execId, &pBuf.pData,
1036
                                               pTaskInfo->localFetch.explainRes);
1037
    code = loadRemoteDataCallback(pWrapper, &pBuf, code);
×
1038
    taosMemoryFree(pWrapper);
×
1039
    QUERY_CHECK_CODE(code, lino, _end);
×
1040
  } else {
1041
    bool needStreamPesudoFuncVals = true;
123,941,014✔
1042
    SResFetchReq req = {0};
123,941,014✔
1043
    req.header.vgId = pSource->addr.nodeId;
123,941,859✔
1044
    req.sId = pSource->sId;
123,941,284✔
1045
    req.clientId = pSource->clientId;
123,942,111✔
1046
    req.taskId = pSource->taskId;
123,941,859✔
1047
    req.queryId = pTaskInfo->id.queryId;
123,940,219✔
1048
    req.execId = pSource->execId;
123,941,536✔
1049
    if (pTaskInfo->pStreamRuntimeInfo) {
123,942,163✔
1050
      req.dynTbname = pExchangeInfo->dynTbname;
12,486,108✔
1051
      req.execId = pTaskInfo->pStreamRuntimeInfo->execId;
12,486,108✔
1052
      req.pStRtFuncInfo = &pTaskInfo->pStreamRuntimeInfo->funcInfo;
12,486,108✔
1053

1054
      if (pSource->fetchMsgType == TDMT_STREAM_FETCH_FROM_RUNNER) {
12,486,108✔
1055
        qDebug("%s stream fetch from runner, execId:%d, %p", GET_TASKID(pTaskInfo), req.execId, pTaskInfo->pStreamRuntimeInfo);
39,420✔
1056
      } else if (pSource->fetchMsgType == TDMT_STREAM_FETCH_FROM_CACHE) {
12,446,688✔
1057
        code = getCurrentWinCalcTimeRange(req.pStRtFuncInfo, &req.pStRtFuncInfo->curWindow);
6,350,884✔
1058
        QUERY_CHECK_CODE(code, lino, _end);
6,350,884✔
1059
        needStreamPesudoFuncVals = false;
6,350,884✔
1060
        qDebug("%s stream fetch from cache, execId:%d, curWinIdx:%d, time range:[%" PRId64 ", %" PRId64 "]",
6,350,884✔
1061
               GET_TASKID(pTaskInfo), req.execId, req.pStRtFuncInfo->curIdx, req.pStRtFuncInfo->curWindow.skey,
1062
               req.pStRtFuncInfo->curWindow.ekey);
1063
      }
1064
      if (!pDataInfo->fetchSent) {
12,486,108✔
1065
        req.reset = pDataInfo->fetchSent = true;
10,333,280✔
1066
      }
1067
    }
1068

1069
    if (pDataInfo->isVtbWinScan) {
123,941,341✔
1070
      if (pDataInfo->pSrcUidList) {
×
1071
        code = buildTableScanOperatorParamEx(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, NULL, pDataInfo->tableSeq, &pDataInfo->window, false);
×
1072
        taosArrayDestroy(pDataInfo->pSrcUidList);
×
1073
        pDataInfo->pSrcUidList = NULL;
×
1074
        if (TSDB_CODE_SUCCESS != code) {
×
1075
          pTaskInfo->code = code;
×
1076
          taosMemoryFree(pWrapper);
×
1077
          return pTaskInfo->code;
×
1078
        }
1079
      }
1080
    } else if (pDataInfo->isVtbRefScan) {
123,942,313✔
1081
      code = buildTableScanOperatorParamEx(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->colMap, pDataInfo->tableSeq, &pDataInfo->window, pDataInfo->isNewParam);
6,703,764✔
1082
      taosArrayDestroy(pDataInfo->colMap->colMap);
6,703,764✔
1083
      taosMemoryFreeClear(pDataInfo->colMap);
6,703,764✔
1084
      taosArrayDestroy(pDataInfo->pSrcUidList);
6,703,764✔
1085
      pDataInfo->pSrcUidList = NULL;
6,703,764✔
1086
      if (TSDB_CODE_SUCCESS != code) {
6,703,764✔
1087
        pTaskInfo->code = code;
×
1088
        taosMemoryFree(pWrapper);
×
1089
        return pTaskInfo->code;
×
1090
      }
1091
    } else if (pDataInfo->isVtbTagScan) {
117,237,439✔
1092
      code = buildTagScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType);
944,638✔
1093
      taosArrayDestroy(pDataInfo->pSrcUidList);
944,638✔
1094
      pDataInfo->pSrcUidList = NULL;
944,638✔
1095
      if (TSDB_CODE_SUCCESS != code) {
944,638✔
1096
        pTaskInfo->code = code;
×
1097
        taosMemoryFree(pWrapper);
×
1098
        return pTaskInfo->code;
×
1099
      }
1100
    } else {
1101
      if (pDataInfo->pSrcUidList) {
116,292,727✔
1102
        code = buildTableScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->tableSeq);
279,105✔
1103
        taosArrayDestroy(pDataInfo->pSrcUidList);
279,105✔
1104
        pDataInfo->pSrcUidList = NULL;
279,105✔
1105
        if (TSDB_CODE_SUCCESS != code) {
279,105✔
1106
          pTaskInfo->code = code;
×
1107
          taosMemoryFree(pWrapper);
×
1108
          return pTaskInfo->code;
×
1109
        }
1110
      }
1111
    }
1112

1113
    int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req, needStreamPesudoFuncVals);
123,941,904✔
1114
    if (msgSize < 0) {
123,939,443✔
1115
      pTaskInfo->code = msgSize;
×
1116
      taosMemoryFree(pWrapper);
×
1117
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
1118
      return pTaskInfo->code;
×
1119
    }
1120

1121
    void* msg = taosMemoryCalloc(1, msgSize);
123,939,443✔
1122
    if (NULL == msg) {
123,939,993✔
1123
      pTaskInfo->code = terrno;
×
1124
      taosMemoryFree(pWrapper);
×
1125
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
1126
      return pTaskInfo->code;
×
1127
    }
1128

1129
    msgSize = tSerializeSResFetchReq(msg, msgSize, &req, needStreamPesudoFuncVals);
123,939,993✔
1130
    if (msgSize < 0) {
123,937,749✔
1131
      pTaskInfo->code = msgSize;
×
1132
      taosMemoryFree(pWrapper);
×
1133
      taosMemoryFree(msg);
×
1134
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
1135
      return pTaskInfo->code;
×
1136
    }
1137

1138
    freeOperatorParam(req.pOpParam, OP_GET_PARAM);
123,937,749✔
1139

1140
    qDebug("%s build fetch msg and send to vgId:%d, ep:%s, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
123,937,771✔
1141
           ", seqId:%" PRId64 ", execId:%d, %p, %d/%" PRIzu,
1142
           GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->clientId,
1143
           pSource->taskId, pExchangeInfo->seqId, pSource->execId, pExchangeInfo, sourceIndex, totalSources);
1144

1145
    // send the fetch remote task result reques
1146
    SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
123,942,129✔
1147
    if (NULL == pMsgSendInfo) {
123,941,838✔
1148
      taosMemoryFreeClear(msg);
×
1149
      taosMemoryFree(pWrapper);
×
1150
      qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
×
1151
      pTaskInfo->code = terrno;
×
1152
      return pTaskInfo->code;
×
1153
    }
1154

1155
    pMsgSendInfo->param = pWrapper;
123,941,838✔
1156
    pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
123,942,039✔
1157
    pMsgSendInfo->msgInfo.pData = msg;
123,942,681✔
1158
    pMsgSendInfo->msgInfo.len = msgSize;
123,942,123✔
1159
    pMsgSendInfo->msgType = pSource->fetchMsgType;
123,942,278✔
1160
    pMsgSendInfo->fp = loadRemoteDataCallback;
123,942,271✔
1161
    pMsgSendInfo->requestId = pTaskInfo->id.queryId;
123,942,479✔
1162

1163
    int64_t transporterId = 0;
123,942,889✔
1164
    void* poolHandle = NULL;
123,941,838✔
1165
    code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
123,941,838✔
1166
    QUERY_CHECK_CODE(code, lino, _end);
123,943,090✔
1167
    int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, sourceIndex);
123,943,090✔
1168
    *pRpcHandle = transporterId;
123,942,636✔
1169
  }
1170

1171
_end:
123,943,090✔
1172
  if (code != TSDB_CODE_SUCCESS) {
123,943,090✔
1173
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1174
  }
1175
  return code;
123,943,090✔
1176
}
1177

1178
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs,
93,291,507✔
1179
                          SOperatorInfo* pOperator) {
1180
  pInfo->totalRows += numOfRows;
93,291,507✔
1181
  pInfo->totalSize += dataLen;
93,291,507✔
1182
  pInfo->totalElapsed += (taosGetTimestampUs() - startTs);
93,290,908✔
1183
  pOperator->resultInfo.totalRows += numOfRows;
93,290,908✔
1184
}
93,291,507✔
1185

1186
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart) {
321,766,415✔
1187
  int32_t      code = TSDB_CODE_SUCCESS;
321,766,415✔
1188
  int32_t      lino = 0;
321,766,415✔
1189
  SSDataBlock* pBlock = NULL;
321,766,415✔
1190
  if (pColList == NULL) {  // data from other sources
321,766,415✔
1191
    blockDataCleanup(pRes);
316,690,708✔
1192
    code = blockDecodeInternal(pRes, pData, (const char**)pNextStart);
316,690,712✔
1193
    if (code) {
316,688,470✔
1194
      return code;
×
1195
    }
1196
  } else {  // extract data according to pColList
1197
    char* pStart = pData;
5,075,707✔
1198

1199
    int32_t numOfCols = htonl(*(int32_t*)pStart);
5,075,707✔
1200
    pStart += sizeof(int32_t);
5,075,707✔
1201

1202
    // todo refactor:extract method
1203
    SSysTableSchema* pSchema = (SSysTableSchema*)pStart;
5,075,707✔
1204
    for (int32_t i = 0; i < numOfCols; ++i) {
72,121,507✔
1205
      SSysTableSchema* p = (SSysTableSchema*)pStart;
67,045,800✔
1206

1207
      p->colId = htons(p->colId);
67,045,800✔
1208
      p->bytes = htonl(p->bytes);
67,045,800✔
1209
      pStart += sizeof(SSysTableSchema);
67,045,800✔
1210
    }
1211

1212
    pBlock = NULL;
5,075,707✔
1213
    code = createDataBlock(&pBlock);
5,075,707✔
1214
    QUERY_CHECK_CODE(code, lino, _end);
5,075,707✔
1215

1216
    for (int32_t i = 0; i < numOfCols; ++i) {
72,121,507✔
1217
      SColumnInfoData idata = createColumnInfoData(pSchema[i].type, pSchema[i].bytes, pSchema[i].colId);
67,045,800✔
1218
      code = blockDataAppendColInfo(pBlock, &idata);
67,045,800✔
1219
      QUERY_CHECK_CODE(code, lino, _end);
67,045,800✔
1220
    }
1221

1222
    code = blockDecodeInternal(pBlock, pStart, NULL);
5,075,707✔
1223
    QUERY_CHECK_CODE(code, lino, _end);
5,075,707✔
1224

1225
    code = blockDataEnsureCapacity(pRes, pBlock->info.rows);
5,075,707✔
1226
    QUERY_CHECK_CODE(code, lino, _end);
5,075,707✔
1227

1228
    // data from mnode
1229
    pRes->info.dataLoad = 1;
5,075,707✔
1230
    pRes->info.rows = pBlock->info.rows;
5,075,707✔
1231
    pRes->info.scanFlag = MAIN_SCAN;
5,075,707✔
1232
    code = relocateColumnData(pRes, pColList, pBlock->pDataBlock, false);
5,075,707✔
1233
    QUERY_CHECK_CODE(code, lino, _end);
5,075,707✔
1234

1235
    blockDataDestroy(pBlock);
5,075,707✔
1236
    pBlock = NULL;
5,075,707✔
1237
  }
1238

1239
_end:
321,764,177✔
1240
  if (code != TSDB_CODE_SUCCESS) {
321,764,583✔
1241
    blockDataDestroy(pBlock);
×
1242
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1243
  }
1244
  return code;
321,764,583✔
1245
}
1246

1247
void setAllSourcesCompleted(SOperatorInfo* pOperator) {
54,871,050✔
1248
  SExchangeInfo* pExchangeInfo = pOperator->info;
54,871,050✔
1249
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
54,871,050✔
1250

1251
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
54,871,050✔
1252
  size_t               totalSources = taosArrayGetSize(pExchangeInfo->pSources);
54,871,050✔
1253
  qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 ", %.2f Kb, elapsed:%.2f ms",
54,871,050✔
1254
         GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
1255
         pLoadInfo->totalElapsed / 1000.0);
1256

1257
  setOperatorCompleted(pOperator);
54,871,050✔
1258
}
54,871,050✔
1259

1260
int32_t getCompletedSources(const SArray* pArray, int32_t* pRes) {
185,473,340✔
1261
  int32_t code = TSDB_CODE_SUCCESS;
185,473,340✔
1262
  int32_t lino = 0;
185,473,340✔
1263
  size_t  total = taosArrayGetSize(pArray);
185,473,340✔
1264

1265
  int32_t completed = 0;
185,474,659✔
1266
  for (int32_t k = 0; k < total; ++k) {
494,568,523✔
1267
    SSourceDataInfo* p = taosArrayGet(pArray, k);
309,094,504✔
1268
    QUERY_CHECK_NULL(p, code, lino, _end, terrno);
309,094,504✔
1269
    if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
309,094,504✔
1270
      qDebug("source %d is completed, info:%p %p", k, pArray, p);
147,280,472✔
1271
      completed += 1;
147,280,271✔
1272
    }
1273
  }
1274

1275
  *pRes = completed;
185,474,019✔
1276
_end:
185,474,019✔
1277
  if (code != TSDB_CODE_SUCCESS) {
185,474,019✔
1278
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1279
  }
1280
  return code;
185,472,867✔
1281
}
1282

1283
int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
61,572,714✔
1284
  SExchangeInfo* pExchangeInfo = pOperator->info;
61,572,714✔
1285
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
61,572,906✔
1286

1287
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
61,572,008✔
1288
  int64_t startTs = taosGetTimestampUs();
61,571,076✔
1289

1290
  // Asynchronously send all fetch requests to all sources.
1291
  for (int32_t i = 0; i < totalSources; ++i) {
159,132,114✔
1292
    int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
97,559,864✔
1293
    if (code != TSDB_CODE_SUCCESS) {
97,561,038✔
1294
      pTaskInfo->code = code;
×
1295
      return code;
×
1296
    }
1297
  }
1298

1299
  int64_t endTs = taosGetTimestampUs();
61,573,108✔
1300
  qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTaskInfo),
61,573,108✔
1301
         totalSources, (endTs - startTs) / 1000.0);
1302

1303
  pOperator->status = OP_RES_TO_RETURN;
61,573,108✔
1304
  pOperator->cost.openCost = taosGetTimestampUs() - startTs;
61,573,108✔
1305
  if (isTaskKilled(pTaskInfo)) {
61,573,108✔
1306
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1307
  }
1308

1309
  return TSDB_CODE_SUCCESS;
61,573,108✔
1310
}
1311

1312
int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo) {
88,215,145✔
1313
  int32_t            code = TSDB_CODE_SUCCESS;
88,215,145✔
1314
  int32_t            lino = 0;
88,215,145✔
1315
  SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
88,215,145✔
1316
  SSDataBlock*       pb = NULL;
88,215,800✔
1317

1318
  char* pNextStart = pRetrieveRsp->data;
88,215,800✔
1319
  char* pStart = pNextStart;
88,215,599✔
1320

1321
  int32_t index = 0;
88,215,800✔
1322

1323
  if (pRetrieveRsp->compressed) {  // decompress the data
88,215,800✔
1324
    if (pDataInfo->decompBuf == NULL) {
×
1325
      pDataInfo->decompBuf = taosMemoryMalloc(pRetrieveRsp->payloadLen);
×
1326
      QUERY_CHECK_NULL(pDataInfo->decompBuf, code, lino, _end, terrno);
×
1327
      pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
×
1328
    } else {
1329
      if (pDataInfo->decompBufSize < pRetrieveRsp->payloadLen) {
×
1330
        char* p = taosMemoryRealloc(pDataInfo->decompBuf, pRetrieveRsp->payloadLen);
×
1331
        QUERY_CHECK_NULL(p, code, lino, _end, terrno);
×
1332
        if (p != NULL) {
×
1333
          pDataInfo->decompBuf = p;
×
1334
          pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
×
1335
        }
1336
      }
1337
    }
1338
  }
1339

1340
  while (index++ < pRetrieveRsp->numOfBlocks) {
404,905,925✔
1341
    pStart = pNextStart;
316,691,569✔
1342

1343
    if (taosArrayGetSize(pExchangeInfo->pRecycledBlocks) > 0) {
316,691,569✔
1344
      pb = *(SSDataBlock**)taosArrayPop(pExchangeInfo->pRecycledBlocks);
159,167,906✔
1345
      blockDataCleanup(pb);
159,167,906✔
1346
    } else {
1347
      code = createOneDataBlock(pExchangeInfo->pDummyBlock, false, &pb);
157,523,409✔
1348
      QUERY_CHECK_NULL(pb, code, lino, _end, code);
157,523,819✔
1349
    }
1350

1351
    int32_t compLen = *(int32_t*)pStart;
316,691,725✔
1352
    pStart += sizeof(int32_t);
316,691,725✔
1353

1354
    int32_t rawLen = *(int32_t*)pStart;
316,691,725✔
1355
    pStart += sizeof(int32_t);
316,691,524✔
1356
    QUERY_CHECK_CONDITION((compLen <= rawLen && compLen != 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
316,691,524✔
1357

1358
    pNextStart = pStart + compLen;
316,691,524✔
1359
    if (pRetrieveRsp->compressed && (compLen < rawLen)) {
316,691,317✔
1360
      int32_t t = tsDecompressString(pStart, compLen, 1, pDataInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
×
1361
      QUERY_CHECK_CONDITION((t == rawLen), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1362
      pStart = pDataInfo->decompBuf;
×
1363
    }
1364

1365
    code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart);
316,691,116✔
1366
    if (code != 0) {
316,688,674✔
1367
      taosMemoryFreeClear(pDataInfo->pRsp);
×
1368
      goto _end;
×
1369
    }
1370

1371
    void* tmp = taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
316,688,674✔
1372
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
316,690,907✔
1373
    qDebug("%dth block added to resultBlockList, rows:%" PRId64, index, pb->info.rows);
316,690,907✔
1374
    pb = NULL;
316,691,724✔
1375
  }
1376

1377
_end:
88,215,800✔
1378
  if (code != TSDB_CODE_SUCCESS) {
88,215,800✔
1379
    blockDataDestroy(pb);
×
1380
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1381
  }
1382
  return code;
88,215,800✔
1383
}
1384

1385
int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
7,708,433✔
1386
  SExchangeInfo* pExchangeInfo = pOperator->info;
7,708,433✔
1387
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
7,708,433✔
1388

1389
  int32_t code = 0;
7,708,433✔
1390
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);
7,708,433✔
1391
  int64_t startTs = taosGetTimestampUs();
7,708,433✔
1392

1393
  int32_t vgId = 0;
7,708,433✔
1394
  if (pExchangeInfo->dynTbname) {
7,708,433✔
1395
    SArray* vals = pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPartColVals;
78,987✔
1396
    for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
78,987✔
1397
      SStreamGroupValue* pValue = taosArrayGet(vals, i);
78,987✔
1398
      if (pValue != NULL && pValue->isTbname) {
78,987✔
1399
        vgId = pValue->vgId;
78,987✔
1400
        break;
78,987✔
1401
      }
1402
    }
1403
  }
1404

1405
  while (1) {
2,114,514✔
1406
    if (pExchangeInfo->current >= totalSources) {
9,822,947✔
1407
      setAllSourcesCompleted(pOperator);
2,103,387✔
1408
      return TSDB_CODE_SUCCESS;
2,103,387✔
1409
    }
1410

1411
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
7,719,560✔
1412
    if (!pSource) {
7,719,560✔
1413
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1414
      pTaskInfo->code = terrno;
×
1415
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1416
    }
1417

1418
    if (vgId != 0 && pSource->addr.nodeId != vgId){
7,719,560✔
1419
      pExchangeInfo->current += 1;
44,130✔
1420
      continue;
44,130✔
1421
    }
1422

1423
    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
7,675,430✔
1424
    if (!pDataInfo) {
7,675,430✔
1425
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1426
      pTaskInfo->code = terrno;
×
1427
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1428
    }
1429
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
7,675,430✔
1430

1431
    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
7,675,430✔
1432
    if (code != TSDB_CODE_SUCCESS) {
7,675,430✔
1433
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1434
      pTaskInfo->code = code;
×
1435
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1436
    }
1437

1438
    while (true) {
×
1439
      code = exchangeWait(pOperator, pExchangeInfo);
7,675,430✔
1440
      if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
7,675,430✔
1441
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1442
      }
1443

1444
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
7,675,430✔
1445
      if (pDataInfo->seqId != currSeqId) {
7,675,430✔
1446
        qDebug("seq rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pDataInfo->seqId, pExchangeInfo, currSeqId);
×
1447
        taosMemoryFreeClear(pDataInfo->pRsp);
×
1448
        continue;
×
1449
      }
1450

1451
      break;
7,675,430✔
1452
    }
1453

1454
    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
7,675,430✔
1455
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
692✔
1456
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1457
             tstrerror(pDataInfo->code));
1458
      pOperator->pTaskInfo->code = pDataInfo->code;
692✔
1459
      return pOperator->pTaskInfo->code;
692✔
1460
    }
1461

1462
    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
7,674,738✔
1463
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
7,674,738✔
1464

1465
    if (pRsp->numOfRows == 0) {
7,674,738✔
1466
      qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
2,070,384✔
1467
             " execId:%d %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", pDataInfo,
1468
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1469
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);
1470

1471
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
2,070,384✔
1472
      if (pDataInfo->isVtbRefScan || pDataInfo->isVtbTagScan) {
2,070,384✔
1473
        pExchangeInfo->current = totalSources;
2,027,968✔
1474
      } else {
1475
        pExchangeInfo->current += 1;
42,416✔
1476
      }
1477
      taosMemoryFreeClear(pDataInfo->pRsp);
2,070,384✔
1478
      continue;
2,070,384✔
1479
    }
1480

1481
    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
5,604,354✔
1482
    if (code != TSDB_CODE_SUCCESS) {
5,604,354✔
1483
      goto _error;
×
1484
    }
1485

1486
    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
5,604,354✔
1487
    if (pRsp->completed == 1) {
5,604,354✔
1488
      qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
90,378✔
1489
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, pDataInfo,
1490
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1491
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
1492
             pExchangeInfo->current + 1, totalSources);
1493

1494
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
90,378✔
1495
      if (pDataInfo->isVtbRefScan) {
90,378✔
1496
        pExchangeInfo->current = totalSources;
×
1497
      } else {
1498
        pExchangeInfo->current += 1;
90,378✔
1499
      }
1500
    } else {
1501
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
5,513,976✔
1502
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
1503
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1504
             pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
1505
    }
1506
    if (pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData) {
5,604,354✔
1507
      taosArrayClear(pExchangeInfo->pSourceDataInfo);
5,407,728✔
1508
    }
1509
    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
5,604,354✔
1510
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;
5,604,354✔
1511

1512
    taosMemoryFreeClear(pDataInfo->pRsp);
5,604,354✔
1513
    return TSDB_CODE_SUCCESS;
5,604,354✔
1514
  }
1515

1516
_error:
×
1517
  pTaskInfo->code = code;
×
1518
  return code;
×
1519
}
1520

1521
void clearVtbScanDataInfo(void* pItem) {
1,711,079✔
1522
  SSourceDataInfo *pInfo = (SSourceDataInfo *)pItem;
1,711,079✔
1523
  if (pInfo->colMap) {
1,711,079✔
1524
    taosArrayDestroy(pInfo->colMap->colMap);
×
1525
    taosMemoryFreeClear(pInfo->colMap);
×
1526
  }
1527
  taosArrayDestroy(pInfo->pSrcUidList);
1,711,079✔
1528
}
1,711,079✔
1529

1530
int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasicParam* pBasicParam) {
7,927,507✔
1531
  SExchangeInfo*     pExchangeInfo = pOperator->info;
7,927,507✔
1532
  SExchangeSrcIndex* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
7,927,507✔
1533
  if (NULL == pIdx) {
7,927,507✔
1534
    if (pBasicParam->isNewDeployed) {
4,488✔
1535
      SDownstreamSourceNode *pNode = NULL;
4,488✔
1536
      int32_t code = nodesCloneNode((SNode*)&pBasicParam->newDeployedSrc, (SNode**)&pNode);
4,488✔
1537
      if (code != TSDB_CODE_SUCCESS) {
4,488✔
1538
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1539
        return code;
×
1540
      }
1541

1542
      SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pOperator->pPhyNode;
4,488✔
1543
      code = nodesListMakeStrictAppend(&pExchange->pSrcEndPoints, (SNode*)pNode);
4,488✔
1544
      if (code != TSDB_CODE_SUCCESS) {
4,488✔
1545
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1546
        return code;
×
1547
      }
1548
      void* tmp = taosArrayPush(pExchangeInfo->pSources, pNode);
4,488✔
1549
      if (!tmp) {
4,488✔
1550
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1551
        return terrno;
×
1552
      }
1553
      SExchangeSrcIndex idx = {.srcIdx = taosArrayGetSize(pExchangeInfo->pSources) - 1, .inUseIdx = -1};
4,488✔
1554
      code =
1555
          tSimpleHashPut(pExchangeInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
4,488✔
1556
      if (pExchangeInfo->pHashSources && code != TSDB_CODE_SUCCESS) {
4,488✔
1557
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1558
        return code;
×
1559
      }
1560
      pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
4,488✔
1561
      if (pIdx == NULL) {
4,488✔
1562
        qError("No exchange source for vgId: %d", pBasicParam->vgId);
×
1563
        return TSDB_CODE_INVALID_PARA;
×
1564
      }
1565
    } else {
1566
      qError("No exchange source for vgId: %d", pBasicParam->vgId);
×
1567
      return TSDB_CODE_INVALID_PARA;
×
1568
    }
1569
  }
1570

1571
  qDebug("start to add single exchange source");
7,927,507✔
1572

1573
  if (pBasicParam->isVtbWinScan) {
7,927,507✔
1574
    SSourceDataInfo dataInfo = {0};
×
1575
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
×
1576
    dataInfo.taskId = pExchangeInfo->pTaskId;
×
1577
    dataInfo.index = pIdx->srcIdx;
×
1578
    dataInfo.window = pBasicParam->window;
×
1579
    dataInfo.colMap = NULL;
×
1580

1581
    dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
×
1582
    if (dataInfo.pSrcUidList == NULL) {
×
1583
      return terrno;
×
1584
    }
1585
    dataInfo.isNewParam = false;
×
1586
    dataInfo.isVtbRefScan = false;
×
1587
    dataInfo.isVtbTagScan = false;
×
1588
    dataInfo.isVtbWinScan = true;
×
1589
    dataInfo.srcOpType = pBasicParam->srcOpType;
×
1590
    dataInfo.tableSeq = pBasicParam->tableSeq;
×
1591

1592
    taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
×
1593
    void* tmp = taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo);
×
1594
    if (!tmp) {
×
1595
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1596
      return terrno;
×
1597
    }
1598
  } else if (pBasicParam->isVtbRefScan) {
7,927,507✔
1599
    SSourceDataInfo dataInfo = {0};
6,703,764✔
1600
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
6,703,764✔
1601
    dataInfo.taskId = pExchangeInfo->pTaskId;
6,703,764✔
1602
    dataInfo.index = pIdx->srcIdx;
6,703,764✔
1603
    dataInfo.window = pBasicParam->window;
6,703,764✔
1604
    dataInfo.isNewParam = pBasicParam->isNewParam;
6,703,764✔
1605
    dataInfo.colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
6,703,764✔
1606
    dataInfo.colMap->vgId = pBasicParam->colMap->vgId;
6,703,764✔
1607
    tstrncpy(dataInfo.colMap->tbName, pBasicParam->colMap->tbName, TSDB_TABLE_FNAME_LEN);
6,703,764✔
1608
    dataInfo.colMap->colMap = taosArrayDup(pBasicParam->colMap->colMap, NULL);
6,703,764✔
1609

1610
    dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
6,703,764✔
1611
    if (dataInfo.pSrcUidList == NULL) {
6,703,764✔
1612
      return terrno;
×
1613
    }
1614

1615
    dataInfo.isVtbRefScan = true;
6,703,764✔
1616
    dataInfo.isVtbTagScan = false;
6,703,764✔
1617
    dataInfo.isVtbWinScan = false;
6,703,764✔
1618
    dataInfo.srcOpType = pBasicParam->srcOpType;
6,703,764✔
1619
    dataInfo.tableSeq = pBasicParam->tableSeq;
6,703,764✔
1620

1621
    taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
6,703,764✔
1622
    void* tmp = taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo);
6,703,764✔
1623
    if (!tmp) {
6,703,764✔
1624
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1625
      return terrno;
×
1626
    }
1627
  } else if (pBasicParam->isVtbTagScan) {
1,223,743✔
1628
    SSourceDataInfo dataInfo = {0};
944,638✔
1629
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
944,638✔
1630
    dataInfo.taskId = pExchangeInfo->pTaskId;
944,638✔
1631
    dataInfo.index = pIdx->srcIdx;
944,638✔
1632
    dataInfo.window = pBasicParam->window;
944,638✔
1633
    dataInfo.colMap = NULL;
944,638✔
1634

1635
    dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
944,638✔
1636
    if (dataInfo.pSrcUidList == NULL) {
944,638✔
1637
      return terrno;
×
1638
    }
1639

1640
    dataInfo.isNewParam = false;
944,638✔
1641
    dataInfo.isVtbRefScan = false;
944,638✔
1642
    dataInfo.isVtbTagScan = true;
944,638✔
1643
    dataInfo.isVtbWinScan = false;
944,638✔
1644
    dataInfo.srcOpType = pBasicParam->srcOpType;
944,638✔
1645
    dataInfo.tableSeq = pBasicParam->tableSeq;
944,638✔
1646

1647
    taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
944,638✔
1648
    void* tmp = taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo);
944,638✔
1649
    if (!tmp) {
944,638✔
1650
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1651
      return terrno;
×
1652
    }
1653
  } else {
1654
    if (pIdx->inUseIdx < 0) {
279,105✔
1655
      SSourceDataInfo dataInfo = {0};
277,761✔
1656
      dataInfo.status = EX_SOURCE_DATA_NOT_READY;
277,761✔
1657
      dataInfo.taskId = pExchangeInfo->pTaskId;
277,761✔
1658
      dataInfo.index = pIdx->srcIdx;
277,761✔
1659

1660
      dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
277,761✔
1661
      if (dataInfo.pSrcUidList == NULL) {
277,761✔
1662
        return terrno;
×
1663
      }
1664

1665
      dataInfo.isNewParam = false;
277,761✔
1666
      dataInfo.isVtbRefScan = false;
277,761✔
1667
      dataInfo.isVtbTagScan = false;
277,761✔
1668
      dataInfo.isVtbWinScan = false;
277,761✔
1669
      dataInfo.srcOpType = pBasicParam->srcOpType;
277,761✔
1670
      dataInfo.tableSeq = pBasicParam->tableSeq;
277,761✔
1671

1672
      void* tmp = taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo);
277,761✔
1673
      if (!tmp) {
277,761✔
1674
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1675
        return terrno;
×
1676
      }
1677
      pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
277,761✔
1678
    } else {
1679
      SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
1,344✔
1680
      if (!pDataInfo) {
1,344✔
1681
        return terrno;
×
1682
      }
1683
      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
1,344✔
1684
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
1,344✔
1685
      }
1686

1687
      pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
1,344✔
1688
      if (pDataInfo->pSrcUidList == NULL) {
1,344✔
1689
        return terrno;
×
1690
      }
1691

1692
      pDataInfo->isNewParam = false;
1,344✔
1693
      pDataInfo->isVtbRefScan = false;
1,344✔
1694
      pDataInfo->isVtbTagScan = false;
1,344✔
1695
      pDataInfo->isVtbWinScan = false;
1,344✔
1696
      pDataInfo->srcOpType = pBasicParam->srcOpType;
1,344✔
1697
      pDataInfo->tableSeq = pBasicParam->tableSeq;
1,344✔
1698
    }
1699
  }
1700

1701
  return TSDB_CODE_SUCCESS;
7,927,507✔
1702
}
1703

1704
int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
7,808,649✔
1705
  SExchangeInfo*               pExchangeInfo = pOperator->info;
7,808,649✔
1706
  int32_t                      code = TSDB_CODE_SUCCESS;
7,808,649✔
1707
  SExchangeOperatorBasicParam* pBasicParam = NULL;
7,808,649✔
1708
  SExchangeOperatorParam*      pParam = (SExchangeOperatorParam*)pOperator->pOperatorGetParam->value;
7,808,649✔
1709
  if (pParam->multiParams) {
7,808,649✔
1710
    SExchangeOperatorBatchParam* pBatch = (SExchangeOperatorBatchParam*)pOperator->pOperatorGetParam->value;
157,559✔
1711
    int32_t                      iter = 0;
157,559✔
1712
    while (NULL != (pBasicParam = tSimpleHashIterate(pBatch->pBatchs, pBasicParam, &iter))) {
433,976✔
1713
      code = addSingleExchangeSource(pOperator, pBasicParam);
276,417✔
1714
      if (code) {
276,417✔
1715
        return code;
×
1716
      }
1717
    }
1718
  } else {
1719
    pBasicParam = &pParam->basic;
7,651,090✔
1720
    code = addSingleExchangeSource(pOperator, pBasicParam);
7,651,090✔
1721
  }
1722

1723
  freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
7,808,649✔
1724
  pOperator->pOperatorGetParam = NULL;
7,808,649✔
1725

1726
  return code;
7,808,649✔
1727
}
1728

1729
int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
401,889,143✔
1730
  SExchangeInfo* pExchangeInfo = pOperator->info;
401,889,143✔
1731
  int32_t        code = TSDB_CODE_SUCCESS;
401,889,276✔
1732
  int32_t        lino = 0;
401,889,276✔
1733
  
1734
  if ((OPTR_IS_OPENED(pOperator) && !pExchangeInfo->dynamicOp) ||
401,889,276✔
1735
      (pExchangeInfo->dynamicOp && NULL == pOperator->pOperatorGetParam)) {
79,064,484✔
1736
    qDebug("skip prepare, opened:%d, dynamicOp:%d, getParam:%p", OPTR_IS_OPENED(pOperator), pExchangeInfo->dynamicOp, pOperator->pOperatorGetParam);
323,269,240✔
1737
    return TSDB_CODE_SUCCESS;
323,269,236✔
1738
  }
1739

1740
  if (pExchangeInfo->dynamicOp) {
78,618,710✔
1741
    code = addDynamicExchangeSource(pOperator);
7,808,649✔
1742
    QUERY_CHECK_CODE(code, lino, _end);
7,808,649✔
1743
  }
1744

1745
  if (pOperator->status == OP_NOT_OPENED && (pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData) || IS_STREAM_MODE(pOperator->pTaskInfo)) {
78,619,377✔
1746
    pExchangeInfo->current = 0;
16,449,943✔
1747
  }
1748

1749
  int64_t st = taosGetTimestampUs();
78,619,073✔
1750

1751
  if (!IS_STREAM_MODE(pOperator->pTaskInfo) && !pExchangeInfo->seqLoadData) {
78,619,073✔
1752
    code = prepareConcurrentlyLoad(pOperator);
61,571,732✔
1753
    QUERY_CHECK_CODE(code, lino, _end);
61,573,108✔
1754
    pExchangeInfo->openedTs = taosGetTimestampUs();
61,573,108✔
1755
  }
1756

1757
  OPTR_SET_OPENED(pOperator);
78,621,693✔
1758
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
78,619,800✔
1759

1760
  qDebug("%s prepare load complete", pOperator->pTaskInfo->id.str);
78,620,002✔
1761

1762
_end:
24,029,538✔
1763
  if (code != TSDB_CODE_SUCCESS) {
78,620,002✔
1764
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1765
    pOperator->pTaskInfo->code = code;
×
1766
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
1767
  }
1768
  return TSDB_CODE_SUCCESS;
78,620,002✔
1769
}
1770

1771
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
2,511,843✔
1772
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
2,511,843✔
1773

1774
  if (pLimitInfo->remainGroupOffset > 0) {
2,511,843✔
1775
    if (pLimitInfo->currentGroupId == 0) {  // it is the first group
×
1776
      pLimitInfo->currentGroupId = pBlock->info.id.groupId;
×
1777
      blockDataCleanup(pBlock);
×
1778
      return PROJECT_RETRIEVE_CONTINUE;
×
1779
    } else if (pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
×
1780
      // now it is the data from a new group
1781
      pLimitInfo->remainGroupOffset -= 1;
×
1782

1783
      // ignore data block in current group
1784
      if (pLimitInfo->remainGroupOffset > 0) {
×
1785
        blockDataCleanup(pBlock);
×
1786
        return PROJECT_RETRIEVE_CONTINUE;
×
1787
      }
1788
    }
1789

1790
    // set current group id of the project operator
1791
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
×
1792
  }
1793

1794
  // here check for a new group data, we need to handle the data of the previous group.
1795
  if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
2,511,843✔
1796
    pLimitInfo->numOfOutputGroups += 1;
104,829✔
1797
    if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
104,829✔
1798
      pOperator->status = OP_EXEC_DONE;
×
1799
      blockDataCleanup(pBlock);
×
1800

1801
      return PROJECT_RETRIEVE_DONE;
×
1802
    }
1803

1804
    // reset the value for a new group data
1805
    resetLimitInfoForNextGroup(pLimitInfo);
104,829✔
1806
    // existing rows that belongs to previous group.
1807
    if (pBlock->info.rows > 0) {
104,829✔
1808
      return PROJECT_RETRIEVE_DONE;
104,829✔
1809
    }
1810
  }
1811

1812
  // here we reach the start position, according to the limit/offset requirements.
1813

1814
  // set current group id
1815
  pLimitInfo->currentGroupId = pBlock->info.id.groupId;
2,407,014✔
1816

1817
  bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pTaskInfo);
2,407,014✔
1818
  if (pBlock->info.rows == 0) {
2,407,014✔
1819
    return PROJECT_RETRIEVE_CONTINUE;
1,292,362✔
1820
  } else {
1821
    if (limitReached && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
1,114,652✔
1822
      setOperatorCompleted(pOperator);
×
1823
      return PROJECT_RETRIEVE_DONE;
×
1824
    }
1825
  }
1826

1827
  // todo optimize performance
1828
  // If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the
1829
  // they may not belong to the same group the limit/offset value is not valid in this case.
1830
  if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || hasSlimitOffsetInfo(pLimitInfo)) {
1,114,652✔
1831
    return PROJECT_RETRIEVE_DONE;
1,114,652✔
1832
  } else {  // not full enough, continue to accumulate the output data in the buffer.
1833
    return PROJECT_RETRIEVE_CONTINUE;
×
1834
  }
1835
}
1836

1837
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo) {
123,568,286✔
1838
  SExecTaskInfo* pTask = pOperator->pTaskInfo;
123,568,286✔
1839
  int32_t        code = TSDB_CODE_SUCCESS;
123,568,286✔
1840
  if (pTask->pWorkerCb) {
123,568,286✔
1841
    code = pTask->pWorkerCb->beforeBlocking(pTask->pWorkerCb->pPool);
123,568,286✔
1842
    if (code != TSDB_CODE_SUCCESS) {
123,568,286✔
1843
      pTask->code = code;
×
1844
      return pTask->code;
×
1845
    }
1846
  }
1847

1848
  code = tsem_wait(&pExchangeInfo->ready);
123,568,286✔
1849
  if (code != TSDB_CODE_SUCCESS) {
123,568,286✔
1850
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1851
    pTask->code = code;
×
1852
    return pTask->code;
×
1853
  }
1854

1855
  if (pTask->pWorkerCb) {
123,568,286✔
1856
    code = pTask->pWorkerCb->afterRecoverFromBlocking(pTask->pWorkerCb->pPool);
123,567,478✔
1857
    if (code != TSDB_CODE_SUCCESS) {
123,567,478✔
1858
      pTask->code = code;
×
1859
      return pTask->code;
×
1860
    }
1861
  }
1862
  return TSDB_CODE_SUCCESS;
123,568,286✔
1863
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc