• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4887

16 Dec 2025 08:27AM UTC coverage: 65.289% (-0.003%) from 65.292%
#4887

push

travis-ci

web-flow
feat[TS-7233]: audit (#33850)

377 of 536 new or added lines in 28 files covered. (70.34%)

1025 existing lines in 111 files now uncovered.

178977 of 274129 relevant lines covered (65.29%)

102580217.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.96
/source/libs/executor/src/exchangeoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "operator.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "tdatablock.h"
23
#include "thash.h"
24
#include "tmsg.h"
25
#include "tref.h"
26
#include "trpc.h"
27

28
typedef struct SFetchRspHandleWrapper {
29
  int64_t  exchangeId;
30
  int32_t  sourceIndex;
31
  int64_t  seqId;
32
} SFetchRspHandleWrapper;
33

34
typedef struct SSourceDataInfo {
35
  int32_t            index;
36
  int64_t            seqId;
37
  SRWLatch           lock;
38
  SRetrieveTableRsp* pRsp;
39
  uint64_t           totalRows;
40
  int64_t            startTime;
41
  int32_t            code;
42
  EX_SOURCE_STATUS   status;
43
  const char*        taskId;
44
  SArray*            pSrcUidList;
45
  int32_t            srcOpType;
46
  bool               tableSeq;
47
  char*              decompBuf;
48
  int32_t            decompBufSize;
49
  SOrgTbInfo*        colMap;
50
  bool               isVtbRefScan;
51
  bool               isVtbTagScan;
52
  bool               isVtbWinScan;
53
  bool               isNewParam;
54
  STimeWindow        window;
55
  bool               fetchSent; // need reset
56
} SSourceDataInfo;
57

58
static void destroyExchangeOperatorInfo(void* param);
59
static void freeBlock(void* pParam);
60
static void freeSourceDataInfo(void* param);
61
static void setAllSourcesCompleted(SOperatorInfo* pOperator);
62

63
static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code);
64
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex);
65
static int32_t getCompletedSources(const SArray* pArray, int32_t* pRes);
66
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator);
67
static int32_t seqLoadRemoteData(SOperatorInfo* pOperator);
68
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator);
69
static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock,
70
                                 bool holdDataInBuf);
71
static int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo);
72

73
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo);
74

75

76
static void streamConcurrentlyLoadRemoteData(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
14,974,124✔
77
                                           SExecTaskInfo* pTaskInfo) {
78
  int32_t code = 0;
14,974,124✔
79
  int32_t lino = 0;
14,974,124✔
80
  int64_t startTs = taosGetTimestampUs();  
14,975,331✔
81
  int32_t  totalSources = (int32_t)taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
14,975,331✔
82
  int32_t completed = 0;
14,975,331✔
83
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
14,975,331✔
84
  if (code != TSDB_CODE_SUCCESS) {
14,974,129✔
85
    pTaskInfo->code = code;
×
86
    T_LONG_JMP(pTaskInfo->env, code);
×
87
  }
88
  if (completed == totalSources) {
14,974,129✔
89
    qDebug("%s no load since all sources completed, completed:%d, totalSources:%d", pTaskInfo->id.str, completed, totalSources);
3,243,610✔
90
    setAllSourcesCompleted(pOperator);
3,243,610✔
91
    return;
3,246,275✔
92
  }
93

94
  SSourceDataInfo* pDataInfo = NULL;
11,730,519✔
95

96
  while (1) {
6,705,118✔
97
    if (pExchangeInfo->current < 0) {
18,435,637✔
98
      qDebug("current %d and all sources complted, totalSources:%d", pExchangeInfo->current, totalSources);
106,844✔
99
      setAllSourcesCompleted(pOperator);
106,844✔
100
      return;
106,844✔
101
    }
102
    
103
    if (pExchangeInfo->current >= totalSources) {
18,328,788✔
104
      completed = 0;
8,185,394✔
105
      code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
8,185,394✔
106
      if (code != TSDB_CODE_SUCCESS) {
8,185,812✔
107
        pTaskInfo->code = code;
×
108
        T_LONG_JMP(pTaskInfo->env, code);
×
109
      }
110
      if (completed == totalSources) {
8,185,812✔
111
        qDebug("stop to load since all sources complted, completed:%d, totalSources:%d", completed, totalSources);
6,137,851✔
112
        setAllSourcesCompleted(pOperator);
6,137,851✔
113
        return;
6,137,851✔
114
      }
115
      
116
      pExchangeInfo->current = 0;
2,047,961✔
117
    }
118

119
    qDebug("%s start stream exchange %p idx:%d fetch", GET_TASKID(pTaskInfo), pExchangeInfo, pExchangeInfo->current);
12,190,953✔
120

121
    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
12,191,355✔
122
    if (!pDataInfo) {
12,192,160✔
123
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
124
      pTaskInfo->code = terrno;
×
125
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
126
    }
127

128
    if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
12,192,160✔
129
      pExchangeInfo->current++;
1,890✔
130
      continue;
1,512✔
131
    }
132

133
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
12,190,677✔
134

135
    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
12,190,270✔
136
    if (code != TSDB_CODE_SUCCESS) {
12,190,677✔
137
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
138
      pTaskInfo->code = code;
×
139
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
140
    }
141

UNCOV
142
    while (true) {
×
143
      code = exchangeWait(pOperator, pExchangeInfo);
12,190,677✔
144
      if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
12,190,677✔
145
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
440✔
146
      }
147

148
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
12,190,237✔
149
      if (pDataInfo->seqId != currSeqId) {
12,190,677✔
UNCOV
150
        qDebug("%s seq rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", 
×
151
            GET_TASKID(pTaskInfo), pDataInfo->seqId, pExchangeInfo, currSeqId);
UNCOV
152
        taosMemoryFreeClear(pDataInfo->pRsp);
×
UNCOV
153
        continue;
×
154
      }
155

156
      break;
12,190,237✔
157
    }
158

159
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
12,190,237✔
160
    if (!pSource) {
12,190,237✔
161
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
162
      pTaskInfo->code = terrno;
×
163
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
164
    }
165

166
    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
12,190,237✔
167
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
×
168
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
169
             tstrerror(pDataInfo->code));
170
      pTaskInfo->code = pDataInfo->code;
×
171
      T_LONG_JMP(pTaskInfo->env, code);
×
172
    }
173

174
    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
12,189,046✔
175
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
12,190,677✔
176

177
    if (pRsp->numOfRows == 0) {
12,190,677✔
178
      qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
6,703,228✔
179
             " execId:%d idx %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", pDataInfo,
180
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
181
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);
182

183
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
6,703,228✔
184
      if (pDataInfo->isVtbRefScan || pDataInfo->isVtbTagScan) {
6,703,646✔
185
        pExchangeInfo->current = -1;
106,844✔
186
      } else {
187
        pExchangeInfo->current += 1;
6,596,802✔
188
      }
189
      taosMemoryFreeClear(pDataInfo->pRsp);
6,703,646✔
190
      continue;
6,703,646✔
191
    }
192

193
    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
5,487,031✔
194
    TAOS_CHECK_EXIT(code);
5,487,031✔
195

196
    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
5,487,031✔
197
    if (pRsp->completed == 1) {
5,487,031✔
198
      qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
3,368,298✔
199
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%d", pDataInfo,
200
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
201
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
202
             pExchangeInfo->current + 1, totalSources);
203

204
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
3,368,298✔
205
      if (pDataInfo->isVtbRefScan) {
3,368,298✔
206
        pExchangeInfo->current = -1;
×
207
        taosMemoryFreeClear(pDataInfo->pRsp);
×
208
        continue;
×
209
      }
210
    } else {
211
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d idx:%d numOfRows:%" PRId64
2,118,733✔
212
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
213
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
214
             pExchangeInfo->current, pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
215
    }
216

217
    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
5,487,031✔
218
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;
5,487,031✔
219

220
    pExchangeInfo->current++;
5,487,031✔
221

222
    taosMemoryFreeClear(pDataInfo->pRsp);
5,487,031✔
223
    return;
5,487,031✔
224
  }
225

226
_exit:
×
227

228
  if (code) {
×
229
    pTaskInfo->code = code;
×
230
    qError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
×
231
  }
232
}
233

234

235
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
106,130,219✔
236
                                           SExecTaskInfo* pTaskInfo) {
237
  int32_t code = 0;
106,130,219✔
238
  int32_t lino = 0;
106,130,219✔
239
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
106,130,219✔
240
  int32_t completed = 0;
106,130,613✔
241
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
106,130,613✔
242
  if (code != TSDB_CODE_SUCCESS) {
106,130,420✔
243
    pTaskInfo->code = code;
×
244
    T_LONG_JMP(pTaskInfo->env, code);
×
245
  }
246
  if (completed == totalSources) {
106,130,420✔
247
    setAllSourcesCompleted(pOperator);
33,239,053✔
248
    return;
33,242,061✔
249
  }
250

251
  SSourceDataInfo* pDataInfo = NULL;
72,891,367✔
252

253
  while (1) {
6,917,963✔
254
    qDebug("prepare wait for ready, %p, %s", pExchangeInfo, GET_TASKID(pTaskInfo));
79,809,330✔
255
    code = exchangeWait(pOperator, pExchangeInfo);
79,809,330✔
256

257
    if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
79,809,523✔
258
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
124✔
259
    }
260

261
    for (int32_t i = 0; i < totalSources; ++i) {
115,076,658✔
262
      pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
115,076,301✔
263
      QUERY_CHECK_NULL(pDataInfo, code, lino, _exit, terrno);
115,076,782✔
264
      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
115,076,782✔
265
        continue;
24,964,886✔
266
      }
267

268
      if (pDataInfo->status != EX_SOURCE_DATA_READY) {
90,111,896✔
269
        continue;
10,302,373✔
270
      }
271

272
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
79,809,523✔
273
      if (pDataInfo->seqId != currSeqId) {
79,809,523✔
274
        qDebug("concurrent rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pDataInfo->seqId, pExchangeInfo, currSeqId);
×
275
        taosMemoryFreeClear(pDataInfo->pRsp);
×
276
        break;
×
277
      }
278

279
      if (pDataInfo->code != TSDB_CODE_SUCCESS) {
79,809,273✔
280
        code = pDataInfo->code;
632✔
281
        TAOS_CHECK_EXIT(code);
632✔
282
      }
283

284
      tmemory_barrier();
79,808,891✔
285
      SRetrieveTableRsp*     pRsp = pDataInfo->pRsp;
79,808,891✔
286
      SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
79,808,891✔
287
      QUERY_CHECK_NULL(pSource, code, lino, _exit, terrno);
79,808,891✔
288

289
      // todo
290
      SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
79,808,891✔
291
      if (pRsp->numOfRows == 0) {
79,808,758✔
292
        if (NULL != pDataInfo->pSrcUidList && (!pDataInfo->isVtbRefScan)) {
18,706,652✔
293
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
×
294
          code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
×
295
          if (code != TSDB_CODE_SUCCESS) {
×
296
            taosMemoryFreeClear(pDataInfo->pRsp);
×
297
            TAOS_CHECK_EXIT(code);
×
298
          }
299
        } else {
300
          pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
18,706,652✔
301
          qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
18,706,652✔
302
                 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", try next %d/%" PRIzu, pDataInfo,
303
                 GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
304
                 pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows, i + 1, totalSources);
305
          taosMemoryFreeClear(pDataInfo->pRsp);
18,706,652✔
306
        }
307
        break;
18,706,652✔
308
      }
309

310
      TAOS_CHECK_EXIT(doExtractResultBlocks(pExchangeInfo, pDataInfo));
61,102,239✔
311

312
      SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
61,102,239✔
313
      updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pDataInfo->startTime, pOperator);
61,102,239✔
314
      pDataInfo->totalRows += pRetrieveRsp->numOfRows;
61,102,239✔
315

316
      if (pRsp->completed == 1) {
61,102,239✔
317
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
55,023,162✔
318
        qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
55,023,162✔
319
               " execId:%d index:%d completed, blocks:%d, numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64
320
               ", totalRows:%" PRIu64 ", total:%.2f Kb, try next %d/%" PRIzu, pDataInfo,
321
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
322
               pRsp->numOfBlocks, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows,
323
               pLoadInfo->totalSize / 1024.0, i + 1, totalSources);
324
      } else {
325
        qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
6,079,077✔
326
               " execId:%d blocks:%d, numOfRows:%" PRId64 ", totalRows:%" PRIu64 ", total:%.2f Kb",
327
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
328
               pRsp->numOfBlocks, pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
329
      }
330

331
      taosMemoryFreeClear(pDataInfo->pRsp);
61,102,239✔
332

333
      if ((pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED || NULL != pDataInfo->pSrcUidList) && !pDataInfo->isVtbRefScan && !pDataInfo->isVtbTagScan) {
61,102,239✔
334
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
6,079,077✔
335
        code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
6,079,077✔
336
        if (code != TSDB_CODE_SUCCESS) {
6,079,077✔
337
          taosMemoryFreeClear(pDataInfo->pRsp);
×
338
          TAOS_CHECK_EXIT(code);
×
339
        }
340
      }
341
      
342
      return;
61,102,559✔
343
    }  // end loop
344

345
    int32_t complete1 = 0;
18,707,009✔
346
    code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &complete1);
18,706,652✔
347
    if (code != TSDB_CODE_SUCCESS) {
18,706,652✔
348
      pTaskInfo->code = code;
×
349
      T_LONG_JMP(pTaskInfo->env, code);
×
350
    }
351
    if (complete1 == totalSources) {
18,706,652✔
352
      qDebug("all sources are completed, %s", GET_TASKID(pTaskInfo));
11,788,689✔
353
      return;
11,788,689✔
354
    }
355
  }
356

357
_exit:
508✔
358

359
  if (code) {
632✔
360
    pTaskInfo->code = code;
632✔
361
    qError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
632✔
362
  }
363
}
364

365
static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
352,446,193✔
366
  int32_t        code = TSDB_CODE_SUCCESS;
352,446,193✔
367
  SExchangeInfo* pExchangeInfo = pOperator->info;
352,446,193✔
368
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
352,446,386✔
369

370
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
352,445,590✔
371

372
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
352,444,777✔
373
  if (pOperator->status == OP_EXEC_DONE) {
352,444,777✔
374
    qDebug("%s all %" PRIzu " source(s) are exhausted, total rows:%" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
×
375
           GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize,
376
           pLoadInfo->totalElapsed / 1000.0);
377
    return NULL;
×
378
  }
379

380
  // we have buffered retrieved datablock, return it directly
381
  SSDataBlock* p = NULL;
352,444,179✔
382
  if (taosArrayGetSize(pExchangeInfo->pResultBlockList) > 0) {
352,444,179✔
383
    p = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
230,838,683✔
384
    taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
230,838,683✔
385
  }
386

387
  if (p != NULL) {
352,444,782✔
388
    void* tmp = taosArrayPush(pExchangeInfo->pRecycledBlocks, &p);
230,839,457✔
389
    if (!tmp) {
230,838,683✔
390
      code = terrno;
×
391
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
392
      pTaskInfo->code = code;
×
393
      T_LONG_JMP(pTaskInfo->env, code);
×
394
    }
395
    return p;
230,838,683✔
396
  } else {
397
    if (pExchangeInfo->seqLoadData) {
121,605,325✔
398
      code = seqLoadRemoteData(pOperator);
500,971✔
399
      if (code != TSDB_CODE_SUCCESS) {
500,505✔
400
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
401
        pTaskInfo->code = code;
×
402
        T_LONG_JMP(pTaskInfo->env, code);
×
403
      }
404
    } else if (IS_STREAM_MODE(pOperator->pTaskInfo))   {
121,102,912✔
405
      streamConcurrentlyLoadRemoteData(pOperator, pExchangeInfo, pTaskInfo);
14,973,286✔
406
    } else {
407
      concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
106,130,613✔
408
    }
409
    if (TSDB_CODE_SUCCESS != pOperator->pTaskInfo->code) {
121,605,602✔
410
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(pOperator->pTaskInfo->code));
632✔
411
      T_LONG_JMP(pTaskInfo->env, pOperator->pTaskInfo->code);
632✔
412
    }
413
    
414
    if (taosArrayGetSize(pExchangeInfo->pResultBlockList) == 0) {
121,606,227✔
415
      qDebug("empty resultBlockList");
54,653,656✔
416
      return NULL;
54,653,656✔
417
    } else {
418
      p = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
66,952,164✔
419
      taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
66,952,164✔
420
      void* tmp = taosArrayPush(pExchangeInfo->pRecycledBlocks, &p);
66,952,164✔
421
      if (!tmp) {
66,952,164✔
422
        code = terrno;
×
423
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
424
        pTaskInfo->code = code;
×
425
        T_LONG_JMP(pTaskInfo->env, code);
×
426
      }
427

428
      qDebug("block with rows:%" PRId64 " loaded", p->info.rows);
66,952,164✔
429
      return p;
66,951,149✔
430
    }
431
}
432
}
433

434
static int32_t loadRemoteDataNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
351,372,839✔
435
  int32_t        code = TSDB_CODE_SUCCESS;
351,372,839✔
436
  int32_t        lino = 0;
351,372,839✔
437
  SExchangeInfo* pExchangeInfo = pOperator->info;
351,372,839✔
438
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
351,373,315✔
439

440
  qDebug("%s start to load from exchange %p", pTaskInfo->id.str, pExchangeInfo);
351,373,298✔
441

442
  code = pOperator->fpSet._openFn(pOperator);
351,374,528✔
443
  QUERY_CHECK_CODE(code, lino, _end);
351,372,313✔
444

445
  if (pOperator->status == OP_EXEC_DONE) {
351,372,313✔
446
    (*ppRes) = NULL;
95,448✔
447
    return code;
95,448✔
448
  }
449

450
  while (1) {
1,167,447✔
451
    SSDataBlock* pBlock = doLoadRemoteDataImpl(pOperator);
352,445,373✔
452
    if (pBlock == NULL) {
352,443,696✔
453
      (*ppRes) = NULL;
54,653,656✔
454
      return code;
54,653,656✔
455
    }
456

457
    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL, NULL);
297,790,040✔
458
    QUERY_CHECK_CODE(code, lino, _end);
297,791,414✔
459

460
    if (blockDataGetNumOfRows(pBlock) == 0) {
297,791,414✔
461
      qDebug("rows 0 block got, continue next load");
1,156✔
462
      continue;
1,156✔
463
    }
464

465
    SLimitInfo* pLimitInfo = &pExchangeInfo->limitInfo;
297,789,880✔
466
    if (hasLimitOffsetInfo(pLimitInfo)) {
297,790,069✔
467
      int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock, false);
2,189,543✔
468
      if (status == PROJECT_RETRIEVE_CONTINUE) {
2,189,543✔
469
        qDebug("limit retrieve continue");
1,166,291✔
470
        continue;
1,166,291✔
471
      } else if (status == PROJECT_RETRIEVE_DONE) {
1,023,252✔
472
        if (pBlock->info.rows == 0) {
1,023,252✔
473
          setOperatorCompleted(pOperator);
×
474
          (*ppRes) = NULL;
×
475
          return code;
×
476
        } else {
477
          (*ppRes) = pBlock;
1,023,252✔
478
          return code;
1,023,252✔
479
        }
480
      }
481
    } else {
482
      (*ppRes) = pBlock;
295,601,111✔
483
      qDebug("block with rows %" PRId64 " returned in exechange", pBlock->info.rows);
295,601,111✔
484
      return code;
295,600,706✔
485
    }
486
  }
487

488
_end:
×
489

490
  if (code != TSDB_CODE_SUCCESS) {
×
491
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
492
    pTaskInfo->code = code;
×
493
    T_LONG_JMP(pTaskInfo->env, code);
×
494
  } else {
495
    qDebug("empty block returned in exchange");
×
496
  }
497
  
498
  (*ppRes) = NULL;
×
499
  return code;
×
500
}
501

502
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
49,588,212✔
503
  pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
49,588,212✔
504
  if (pInfo->pSourceDataInfo == NULL) {
49,588,157✔
505
    return terrno;
×
506
  }
507

508
  if (pInfo->dynamicOp) {
49,587,893✔
509
    return TSDB_CODE_SUCCESS;
1,024,634✔
510
  }
511

512
  int32_t len = strlen(id) + 1;
48,564,353✔
513
  pInfo->pTaskId = taosMemoryCalloc(1, len);
48,564,353✔
514
  if (!pInfo->pTaskId) {
48,564,589✔
515
    return terrno;
×
516
  }
517
  tstrncpy(pInfo->pTaskId, id, len);
48,563,555✔
518
  for (int32_t i = 0; i < numOfSources; ++i) {
123,062,047✔
519
    SSourceDataInfo dataInfo = {0};
74,497,123✔
520
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
74,496,376✔
521
    dataInfo.taskId = pInfo->pTaskId;
74,496,376✔
522
    dataInfo.index = i;
74,497,085✔
523
    SSourceDataInfo* pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
74,497,085✔
524
    if (pDs == NULL) {
74,498,102✔
525
      taosArrayDestroyEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
×
526
      return terrno;
×
527
    }
528
    qDebug("init source data info %d, pDs:%p, status:%d", i, pDs, pDs->status);
74,498,102✔
529
  }
530

531
  return TSDB_CODE_SUCCESS;
48,564,924✔
532
}
533

534
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo, const char* id) {
49,588,016✔
535
  size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);
49,588,016✔
536

537
  if (numOfSources == 0) {
49,588,024✔
538
    qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
×
539
    return TSDB_CODE_INVALID_PARA;
×
540
  }
541
  pInfo->pFetchRpcHandles = taosArrayInit(numOfSources, sizeof(int64_t));
49,588,024✔
542
  if (!pInfo->pFetchRpcHandles) {
49,588,833✔
543
    return terrno;
×
544
  }
545
  void* ret = taosArrayReserve(pInfo->pFetchRpcHandles, numOfSources);
49,588,066✔
546
  if (!ret) {
49,587,778✔
547
    return terrno;
×
548
  }
549

550
  pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
49,587,778✔
551
  if (pInfo->pSources == NULL) {
49,588,280✔
552
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
553
    return terrno;
×
554
  }
555

556
  if (pExNode->node.dynamicOp) {
49,588,649✔
557
    pInfo->pHashSources = tSimpleHashInit(numOfSources * 2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
1,024,634✔
558
    if (NULL == pInfo->pHashSources) {
1,024,634✔
559
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
560
      return terrno;
×
561
    }
562
  }
563

564
  for (int32_t i = 0; i < numOfSources; ++i) {
126,175,783✔
565
    SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
76,586,173✔
566
    if (!pNode) {
76,586,075✔
567
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
568
      return terrno;
×
569
    }
570
    void* tmp = taosArrayPush(pInfo->pSources, pNode);
76,586,075✔
571
    if (!tmp) {
76,587,276✔
572
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
573
      return terrno;
×
574
    }
575
    SExchangeSrcIndex idx = {.srcIdx = i, .inUseIdx = -1};
76,587,276✔
576
    int32_t           code =
577
        tSimpleHashPut(pInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
76,586,563✔
578
    if (pInfo->pHashSources && code != TSDB_CODE_SUCCESS) {
76,586,928✔
579
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
580
      return code;
×
581
    }
582
  }
583

584
  initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);
49,589,610✔
585
  int64_t refId = taosAddRef(exchangeObjRefPool, pInfo);
49,587,459✔
586
  if (refId < 0) {
49,587,309✔
587
    int32_t code = terrno;
×
588
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
589
    return code;
×
590
  } else {
591
    pInfo->self = refId;
49,587,309✔
592
  }
593

594
  return initDataSource(numOfSources, pInfo, id);
49,587,309✔
595
}
596

597
int32_t resetExchangeOperState(SOperatorInfo* pOper) {
13,790,789✔
598
  SExchangeInfo* pInfo = pOper->info;
13,790,789✔
599
  SExchangePhysiNode* pPhynode = (SExchangePhysiNode*)pOper->pPhyNode;
13,791,991✔
600

601
  qDebug("%s reset exchange op:%p info:%p", pOper->pTaskInfo->id.str, pOper, pInfo);
13,791,222✔
602

603
  (void)atomic_add_fetch_64(&pInfo->seqId, 1);
13,792,797✔
604
  pOper->status = OP_NOT_OPENED;
13,792,804✔
605
  pInfo->current = 0;
13,792,804✔
606
  pInfo->loadInfo.totalElapsed = 0;
13,792,804✔
607
  pInfo->loadInfo.totalRows = 0;
13,792,804✔
608
  pInfo->loadInfo.totalSize = 0;
13,792,804✔
609
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pSourceDataInfo); ++i) {
37,382,074✔
610
    SSourceDataInfo* pDataInfo = taosArrayGet(pInfo->pSourceDataInfo, i);
23,589,678✔
611
    taosWLockLatch(&pDataInfo->lock);
23,589,678✔
612
    taosMemoryFreeClear(pDataInfo->decompBuf);
23,589,678✔
613
    taosMemoryFreeClear(pDataInfo->pRsp);
23,589,678✔
614

615
    pDataInfo->totalRows = 0;
23,589,678✔
616
    pDataInfo->code = 0;
23,589,678✔
617
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
23,589,678✔
618
    pDataInfo->fetchSent = false;
23,589,678✔
619
    taosWUnLockLatch(&pDataInfo->lock);
23,589,678✔
620
  }
621

622
  if (pInfo->dynamicOp) {
13,792,804✔
623
    taosArrayClearEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
2,014,285✔
624
  } 
625

626
  taosArrayClearEx(pInfo->pResultBlockList, freeBlock);
13,792,804✔
627
  taosArrayClearEx(pInfo->pRecycledBlocks, freeBlock);
13,792,394✔
628

629
  blockDataCleanup(pInfo->pDummyBlock);
13,791,563✔
630

631
  void   *data = NULL;
13,791,993✔
632
  int32_t iter = 0;
13,791,993✔
633
  while ((data = tSimpleHashIterate(pInfo->pHashSources, data, &iter))) {
17,798,878✔
634
    ((SExchangeSrcIndex *)data)->inUseIdx = -1;
4,006,885✔
635
  }
636
  
637
  pInfo->limitInfo = (SLimitInfo){0};
13,791,973✔
638
  initLimitInfo(pPhynode->node.pLimit, pPhynode->node.pSlimit, &pInfo->limitInfo);
13,791,973✔
639

640
  return 0;
13,792,403✔
641
}
642

643
int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo,
49,588,005✔
644
                                   SOperatorInfo** pOptrInfo) {
645
  QRY_PARAM_CHECK(pOptrInfo);
49,588,005✔
646

647
  int32_t        code = 0;
49,588,525✔
648
  int32_t        lino = 0;
49,588,525✔
649
  SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
49,588,525✔
650
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
49,588,188✔
651
  if (pInfo == NULL || pOperator == NULL) {
49,586,897✔
652
    code = terrno;
×
653
    goto _error;
×
654
  }
655

656
  pOperator->pPhyNode = pExNode;
49,586,897✔
657
  pInfo->dynamicOp = pExNode->node.dynamicOp;
49,587,269✔
658
  code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo));
49,587,654✔
659
  QUERY_CHECK_CODE(code, lino, _error);
49,589,558✔
660

661
  code = tsem_init(&pInfo->ready, 0, 0);
49,589,558✔
662
  QUERY_CHECK_CODE(code, lino, _error);
49,589,558✔
663

664
  pInfo->pDummyBlock = createDataBlockFromDescNode(pExNode->node.pOutputDataBlockDesc);
49,589,558✔
665
  QUERY_CHECK_NULL(pInfo->pDummyBlock, code, lino, _error, terrno);
49,589,751✔
666

667
  pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES);
49,589,618✔
668
  QUERY_CHECK_NULL(pInfo->pResultBlockList, code, lino, _error, terrno);
49,589,425✔
669
  pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES);
49,589,287✔
670
  QUERY_CHECK_NULL(pInfo->pRecycledBlocks, code, lino, _error, terrno);
49,589,175✔
671

672
  SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self};
49,589,154✔
673
  code = qAppendTaskStopInfo(pTaskInfo, &stopInfo);
49,589,425✔
674
  QUERY_CHECK_CODE(code, lino, _error);
49,589,021✔
675

676
  pInfo->seqLoadData = pExNode->seqRecvData;
49,589,021✔
677
  pInfo->dynTbname = pExNode->dynTbname;
49,588,964✔
678
  if (pInfo->dynTbname) {
49,588,393✔
679
    pInfo->seqLoadData = true;
13,662✔
680
  }
681
  pInfo->pTransporter = pTransporter;
49,588,719✔
682

683
  setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo,
49,588,888✔
684
                  pTaskInfo);
685
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
49,589,063✔
686

687
  code = filterInitFromNode((SNode*)pExNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
49,589,180✔
688
                            pTaskInfo->pStreamRuntimeInfo);
49,588,797✔
689
  QUERY_CHECK_CODE(code, lino, _error);
49,589,102✔
690
  qTrace("%s exchange op:%p", __func__, pOperator);
49,589,102✔
691
  pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteDataNext, NULL, destroyExchangeOperatorInfo,
49,589,102✔
692
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
693
  setOperatorResetStateFn(pOperator, resetExchangeOperState);
49,588,388✔
694
  *pOptrInfo = pOperator;
49,588,312✔
695
  return TSDB_CODE_SUCCESS;
49,588,588✔
696

697
_error:
×
698
  if (code != TSDB_CODE_SUCCESS) {
×
699
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
700
    pTaskInfo->code = code;
×
701
  }
702
  if (pInfo != NULL) {
×
703
    doDestroyExchangeOperatorInfo(pInfo);
×
704
  }
705

706
  if (pOperator != NULL) {
×
707
    pOperator->info = NULL;
×
708
    destroyOperator(pOperator);
×
709
  }
710
  return code;
×
711
}
712

713
void destroyExchangeOperatorInfo(void* param) {
49,589,751✔
714
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
49,589,751✔
715
  int32_t        code = taosRemoveRef(exchangeObjRefPool, pExInfo->self);
49,589,751✔
716
  if (code != TSDB_CODE_SUCCESS) {
49,589,309✔
717
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
718
  }
719
}
49,589,309✔
720

721
void freeBlock(void* pParam) {
161,073,143✔
722
  SSDataBlock* pBlock = *(SSDataBlock**)pParam;
161,073,143✔
723
  blockDataDestroy(pBlock);
161,073,143✔
724
}
161,072,525✔
725

726
void freeSourceDataInfo(void* p) {
74,882,650✔
727
  SSourceDataInfo* pInfo = (SSourceDataInfo*)p;
74,882,650✔
728
  taosMemoryFreeClear(pInfo->decompBuf);
74,882,650✔
729
  taosMemoryFreeClear(pInfo->pRsp);
74,882,650✔
730

731
  pInfo->decompBufSize = 0;
74,882,650✔
732
}
74,882,650✔
733

734
void doDestroyExchangeOperatorInfo(void* param) {
49,589,751✔
735
  if (param == NULL) {
49,589,751✔
736
    return;
×
737
  }
738
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
49,589,751✔
739
  if (pExInfo->pFetchRpcHandles) {
49,589,751✔
740
    for (int32_t i = 0; i < pExInfo->pFetchRpcHandles->size; ++i) {
126,177,410✔
741
      int64_t* pRpcHandle = taosArrayGet(pExInfo->pFetchRpcHandles, i);
76,588,349✔
742
      if (*pRpcHandle > 0) {
76,587,783✔
743
        SDownstreamSourceNode* pSource = taosArrayGet(pExInfo->pSources, i);
4,614,917✔
744
        (void)asyncFreeConnById(pExInfo->pTransporter, *pRpcHandle);
4,614,917✔
745
      }
746
    }
747
    taosArrayDestroy(pExInfo->pFetchRpcHandles);
49,588,996✔
748
  }
749

750
  taosArrayDestroy(pExInfo->pSources);
49,589,875✔
751
  taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo);
49,589,751✔
752

753
  taosArrayDestroyEx(pExInfo->pResultBlockList, freeBlock);
49,589,309✔
754
  taosArrayDestroyEx(pExInfo->pRecycledBlocks, freeBlock);
49,589,562✔
755

756
  blockDataDestroy(pExInfo->pDummyBlock);
49,589,627✔
757
  tSimpleHashCleanup(pExInfo->pHashSources);
49,589,751✔
758

759
  int32_t code = tsem_destroy(&pExInfo->ready);
49,589,751✔
760
  if (code != TSDB_CODE_SUCCESS) {
49,589,751✔
761
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
762
  }
763
  taosMemoryFreeClear(pExInfo->pTaskId);
49,589,751✔
764

765
  taosMemoryFreeClear(param);
49,588,984✔
766
}
767

768
int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
92,682,863✔
769
  SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;
92,682,863✔
770

771
  taosMemoryFreeClear(pMsg->pEpSet);
92,682,863✔
772
  SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
92,687,191✔
773
  if (pExchangeInfo == NULL) {
92,686,488✔
774
    qWarn("failed to acquire exchange operator, since it may have been released, %p", pExchangeInfo);
3,829✔
775
    taosMemoryFree(pMsg->pData);
3,829✔
776
    return TSDB_CODE_SUCCESS;
3,829✔
777
  }
778

779
  int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
92,682,659✔
780
  if (pWrapper->seqId != currSeqId) {
92,684,241✔
781
    qDebug("rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pWrapper->seqId, pExchangeInfo, currSeqId);
×
782
    taosMemoryFree(pMsg->pData);
×
783
    code = taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
×
784
    if (code != TSDB_CODE_SUCCESS) {
×
785
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
786
    }
787
    return TSDB_CODE_SUCCESS;
×
788
  }
789

790
  int32_t          index = pWrapper->sourceIndex;
92,680,569✔
791

792
  qDebug("%s exchange %p %dth source got rsp, code:%d, rsp:%p", pExchangeInfo->pTaskId, pExchangeInfo, index, code, pMsg->pData);
92,679,975✔
793

794
  int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, index);
92,682,050✔
795
  if (pRpcHandle != NULL) {
92,677,037✔
796
    int32_t ret = asyncFreeConnById(pExchangeInfo->pTransporter, *pRpcHandle);
92,681,440✔
797
    if (ret != 0) {
92,679,192✔
798
      qDebug("failed to free rpc handle, code:%s, %p", tstrerror(ret), pExchangeInfo);
5,426,075✔
799
    }
800
    *pRpcHandle = -1;
92,679,192✔
801
  }
802

803
  SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);
92,675,182✔
804
  if (!pSourceDataInfo) {
92,681,422✔
805
    return terrno;
×
806
  }
807

808
  if (0 == code && NULL == pMsg->pData) {
92,681,422✔
809
    qError("invalid rsp msg, msgType:%d, len:%d", pMsg->msgType, pMsg->len);
×
810
    code = TSDB_CODE_QRY_INVALID_MSG;
×
811
  }
812

813
  taosWLockLatch(&pSourceDataInfo->lock);
92,682,030✔
814
  if (code == TSDB_CODE_SUCCESS) {
92,685,147✔
815
    pSourceDataInfo->seqId = pWrapper->seqId;
92,684,049✔
816
    pSourceDataInfo->pRsp = pMsg->pData;
92,681,434✔
817

818
    SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
92,678,992✔
819
    pRsp->numOfRows = htobe64(pRsp->numOfRows);
92,676,516✔
820
    pRsp->compLen = htonl(pRsp->compLen);
92,680,287✔
821
    pRsp->payloadLen = htonl(pRsp->payloadLen);
92,672,665✔
822
    pRsp->numOfCols = htonl(pRsp->numOfCols);
92,665,657✔
823
    pRsp->useconds = htobe64(pRsp->useconds);
92,659,962✔
824
    pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);
92,665,901✔
825

826
    qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%" PRId64 ", %p", pSourceDataInfo->taskId, index,
92,666,542✔
827
           pRsp->numOfBlocks, pRsp->numOfRows, pExchangeInfo);
828
  } else {
829
    taosMemoryFree(pMsg->pData);
1,098✔
830
    pSourceDataInfo->code = rpcCvtErrCode(code);
1,098✔
831
    if (pSourceDataInfo->code != code) {
1,098✔
832
      qError("%s fetch rsp received, index:%d, error:%s, cvted error: %s, %p", pSourceDataInfo->taskId, index,
×
833
             tstrerror(code), tstrerror(pSourceDataInfo->code), pExchangeInfo);
834
    } else {
835
      qError("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code),
1,098✔
836
             pExchangeInfo);
837
    }
838
  }
839

840
  tmemory_barrier();
92,668,289✔
841
  pSourceDataInfo->status = EX_SOURCE_DATA_READY;
92,668,289✔
842
  taosWUnLockLatch(&pSourceDataInfo->lock);
92,672,298✔
843
  
844
  code = tsem_post(&pExchangeInfo->ready);
92,672,061✔
845
  if (code != TSDB_CODE_SUCCESS) {
92,679,006✔
846
    qError("failed to invoke post when fetch rsp is ready, code:%s, %p", tstrerror(code), pExchangeInfo);
×
847
    return code;
×
848
  }
849

850
  code = taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
92,679,006✔
851
  if (code != TSDB_CODE_SUCCESS) {
92,685,726✔
852
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
853
  }
854
  return code;
92,680,985✔
855
}
856

857
int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, bool tableSeq) {
281,428✔
858
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
281,428✔
859
  if (NULL == *ppRes) {
281,428✔
860
    return terrno;
×
861
  }
862

863
  STableScanOperatorParam* pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
281,428✔
864
  if (NULL == pScan) {
281,428✔
865
    taosMemoryFreeClear(*ppRes);
×
866
    return terrno;
×
867
  }
868

869
  pScan->pUidList = taosArrayDup(pUidList, NULL);
281,428✔
870
  if (NULL == pScan->pUidList) {
281,428✔
871
    taosMemoryFree(pScan);
×
872
    taosMemoryFreeClear(*ppRes);
×
873
    return terrno;
×
874
  }
875
  pScan->tableSeq = tableSeq;
281,428✔
876
  pScan->pOrgTbInfo = NULL;
281,428✔
877
  pScan->isNewParam = false;
281,428✔
878
  pScan->window.skey = INT64_MAX;
281,428✔
879
  pScan->window.ekey = INT64_MIN;
281,428✔
880

881
  (*ppRes)->opType = srcOpType;
281,428✔
882
  (*ppRes)->downstreamIdx = 0;
281,428✔
883
  (*ppRes)->value = pScan;
281,428✔
884
  (*ppRes)->pChildren = NULL;
281,428✔
885
  (*ppRes)->reUse = false;
281,428✔
886

887
  return TSDB_CODE_SUCCESS;
281,428✔
888
}
889

890
int32_t buildTableScanOperatorParamEx(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, SOrgTbInfo *pMap, bool tableSeq, STimeWindow *window, bool isNewParam) {
384,964✔
891
  int32_t                  code = TSDB_CODE_SUCCESS;
384,964✔
892
  int32_t                  lino = 0;
384,964✔
893
  STableScanOperatorParam* pScan = NULL;
384,964✔
894

895
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
384,964✔
896
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
384,964✔
897

898
  pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
384,964✔
899
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);
384,964✔
900

901
  if (pUidList) {
384,964✔
902
    pScan->pUidList = taosArrayDup(pUidList, NULL);
384,964✔
903
    QUERY_CHECK_NULL(pScan->pUidList, code, lino, _return, terrno);
384,964✔
904
  } else {
905
    pScan->pUidList = NULL;
×
906
  }
907

908
  if (pMap) {
384,964✔
909
    pScan->pOrgTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
384,964✔
910
    QUERY_CHECK_NULL(pScan->pOrgTbInfo, code, lino, _return, terrno);
384,964✔
911

912
    pScan->pOrgTbInfo->vgId = pMap->vgId;
384,964✔
913
    tstrncpy(pScan->pOrgTbInfo->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);
384,964✔
914

915
    pScan->pOrgTbInfo->colMap = taosArrayDup(pMap->colMap, NULL);
384,964✔
916
    QUERY_CHECK_NULL(pScan->pOrgTbInfo->colMap, code, lino, _return, terrno);
384,964✔
917
  } else {
918
    pScan->pOrgTbInfo = NULL;
×
919
  }
920

921

922
  pScan->tableSeq = tableSeq;
384,964✔
923
  pScan->window.skey = window->skey;
384,964✔
924
  pScan->window.ekey = window->ekey;
384,964✔
925
  pScan->isNewParam = isNewParam;
384,964✔
926
  (*ppRes)->opType = srcOpType;
384,964✔
927
  (*ppRes)->downstreamIdx = 0;
384,964✔
928
  (*ppRes)->value = pScan;
384,964✔
929
  (*ppRes)->pChildren = NULL;
384,964✔
930
  (*ppRes)->reUse = false;
384,964✔
931

932
  return code;
384,964✔
933
_return:
×
934
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
935
  taosMemoryFreeClear(*ppRes);
×
936
  if (pScan) {
×
937
    taosArrayDestroy(pScan->pUidList);
×
938
    if (pScan->pOrgTbInfo) {
×
939
      taosArrayDestroy(pScan->pOrgTbInfo->colMap);
×
940
      taosMemoryFreeClear(pScan->pOrgTbInfo);
×
941
    }
942
    taosMemoryFree(pScan);
×
943
  }
944
  return code;
×
945
}
946

947
int32_t buildTagScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType) {
30,773✔
948
  int32_t                  code = TSDB_CODE_SUCCESS;
30,773✔
949
  int32_t                  lino = 0;
30,773✔
950
  STagScanOperatorParam*   pScan = NULL;
30,773✔
951

952
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
30,773✔
953
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
30,773✔
954

955
  pScan = taosMemoryMalloc(sizeof(STagScanOperatorParam));
30,773✔
956
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);
30,773✔
957
  pScan->vcUid = *(tb_uid_t*)taosArrayGet(pUidList, 0);
30,773✔
958

959
  (*ppRes)->opType = srcOpType;
30,773✔
960
  (*ppRes)->downstreamIdx = 0;
30,773✔
961
  (*ppRes)->value = pScan;
30,773✔
962
  (*ppRes)->pChildren = NULL;
30,773✔
963
  (*ppRes)->reUse = false;
30,773✔
964

965
  return code;
30,773✔
966
_return:
×
967
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
968
  taosMemoryFreeClear(*ppRes);
×
969
  if (pScan) {
×
970
    taosMemoryFree(pScan);
×
971
  }
972
  return code;
×
973
}
974

975
static int32_t getCurrentWinCalcTimeRange(SStreamRuntimeFuncInfo* pRuntimeInfo, STimeWindow* pTimeRange) {
6,090,927✔
976
  if (!pRuntimeInfo || !pTimeRange) {
6,090,927✔
977
    return TSDB_CODE_INTERNAL_ERROR;
×
978
  }
979

980
  SSTriggerCalcParam* pParam = taosArrayGet(pRuntimeInfo->pStreamPesudoFuncVals, pRuntimeInfo->curIdx);
6,090,927✔
981
  if (!pParam) {
6,090,139✔
982
    return TSDB_CODE_INTERNAL_ERROR;
×
983
  }
984

985
  switch (pRuntimeInfo->triggerType) {
6,090,139✔
986
    case STREAM_TRIGGER_SLIDING:
4,987,054✔
987
      // Unable to distinguish whether there is an interval, all use wstart/wend
988
      // and the results are equal to those of prevTs/currentTs, using the same address of union.
989
      pTimeRange->skey = pParam->wstart;  // is equal to wstart
4,987,054✔
990
      pTimeRange->ekey = pParam->wend;    // is equal to wend
4,987,054✔
991
      break;
4,987,054✔
992
    case STREAM_TRIGGER_PERIOD:
168,178✔
993
      pTimeRange->skey = pParam->prevLocalTime;
168,178✔
994
      pTimeRange->ekey = pParam->triggerTime;
168,178✔
995
      break;
168,178✔
996
    default:
935,288✔
997
      pTimeRange->skey = pParam->wstart;
935,288✔
998
      pTimeRange->ekey = pParam->wend;
935,719✔
999
      break;
936,126✔
1000
  }
1001

1002
  return TSDB_CODE_SUCCESS;
6,091,358✔
1003
}
1004

1005
int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
92,704,470✔
1006
  int32_t          code = TSDB_CODE_SUCCESS;
92,704,470✔
1007
  int32_t          lino = 0;
92,704,470✔
1008
  SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
92,704,470✔
1009
  if (!pDataInfo) {
92,702,306✔
1010
    return terrno;
×
1011
  }
1012

1013
  if (EX_SOURCE_DATA_NOT_READY != pDataInfo->status) {
92,702,306✔
1014
    return TSDB_CODE_SUCCESS;
×
1015
  }
1016

1017
  pDataInfo->status = EX_SOURCE_DATA_STARTED;
92,702,626✔
1018
  SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
92,702,851✔
1019
  if (!pSource) {
92,701,528✔
1020
    return terrno;
×
1021
  }
1022

1023
  pDataInfo->startTime = taosGetTimestampUs();
92,702,897✔
1024
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
92,702,104✔
1025

1026
  SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
92,702,430✔
1027
  QUERY_CHECK_NULL(pWrapper, code, lino, _end, terrno);
92,703,488✔
1028
  pWrapper->exchangeId = pExchangeInfo->self;
92,703,488✔
1029
  pWrapper->sourceIndex = sourceIndex;
92,703,718✔
1030
  pWrapper->seqId = pExchangeInfo->seqId;
92,704,337✔
1031

1032
  if (pSource->localExec) {
92,704,028✔
1033
    SDataBuf pBuf = {0};
×
1034
    int32_t  code = (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->sId, pTaskInfo->id.queryId,
×
1035
                                               pSource->clientId, pSource->taskId, 0, pSource->execId, &pBuf.pData,
1036
                                               pTaskInfo->localFetch.explainRes);
1037
    code = loadRemoteDataCallback(pWrapper, &pBuf, code);
×
1038
    taosMemoryFree(pWrapper);
×
1039
    QUERY_CHECK_CODE(code, lino, _end);
×
1040
  } else {
1041
    bool needStreamPesudoFuncVals = true;
92,702,810✔
1042
    SResFetchReq req = {0};
92,702,810✔
1043
    req.header.vgId = pSource->addr.nodeId;
92,701,439✔
1044
    req.sId = pSource->sId;
92,702,966✔
1045
    req.clientId = pSource->clientId;
92,702,801✔
1046
    req.taskId = pSource->taskId;
92,703,904✔
1047
    req.queryId = pTaskInfo->id.queryId;
92,703,097✔
1048
    req.execId = pSource->execId;
92,703,724✔
1049
    if (pTaskInfo->pStreamRuntimeInfo) {
92,703,583✔
1050
      req.dynTbname = pExchangeInfo->dynTbname;
12,270,553✔
1051
      req.execId = pTaskInfo->pStreamRuntimeInfo->execId;
12,270,553✔
1052
      req.pStRtFuncInfo = &pTaskInfo->pStreamRuntimeInfo->funcInfo;
12,270,122✔
1053

1054
      if (pSource->fetchMsgType == TDMT_STREAM_FETCH_FROM_RUNNER) {
12,268,898✔
1055
        qDebug("%s stream fetch from runner, execId:%d, %p", GET_TASKID(pTaskInfo), req.execId, pTaskInfo->pStreamRuntimeInfo);
37,692✔
1056
      } else if (pSource->fetchMsgType == TDMT_STREAM_FETCH_FROM_CACHE) {
12,230,399✔
1057
        code = getCurrentWinCalcTimeRange(req.pStRtFuncInfo, &req.pStRtFuncInfo->curWindow);
6,090,951✔
1058
        QUERY_CHECK_CODE(code, lino, _end);
6,090,570✔
1059
        needStreamPesudoFuncVals = false;
6,090,570✔
1060
        qDebug("%s stream fetch from cache, execId:%d, curWinIdx:%d, time range:[%" PRId64 ", %" PRId64 "]",
6,090,570✔
1061
               GET_TASKID(pTaskInfo), req.execId, req.pStRtFuncInfo->curIdx, req.pStRtFuncInfo->curWindow.skey,
1062
               req.pStRtFuncInfo->curWindow.ekey);
1063
      }
1064
      if (!pDataInfo->fetchSent) {
12,268,924✔
1065
        req.reset = pDataInfo->fetchSent = true;
10,215,296✔
1066
      }
1067
    }
1068

1069
    if (pDataInfo->isVtbWinScan) {
92,702,354✔
1070
      if (pDataInfo->pSrcUidList) {
×
1071
        code = buildTableScanOperatorParamEx(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, NULL, pDataInfo->tableSeq, &pDataInfo->window, false);
×
1072
        taosArrayDestroy(pDataInfo->pSrcUidList);
×
1073
        pDataInfo->pSrcUidList = NULL;
×
1074
        if (TSDB_CODE_SUCCESS != code) {
×
1075
          pTaskInfo->code = code;
×
1076
          taosMemoryFree(pWrapper);
×
1077
          return pTaskInfo->code;
×
1078
        }
1079
      }
1080
    } else if (pDataInfo->isVtbRefScan) {
92,702,303✔
1081
      code = buildTableScanOperatorParamEx(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->colMap, pDataInfo->tableSeq, &pDataInfo->window, pDataInfo->isNewParam);
384,964✔
1082
      taosArrayDestroy(pDataInfo->colMap->colMap);
384,964✔
1083
      taosMemoryFreeClear(pDataInfo->colMap);
384,964✔
1084
      taosArrayDestroy(pDataInfo->pSrcUidList);
384,964✔
1085
      pDataInfo->pSrcUidList = NULL;
384,964✔
1086
      if (TSDB_CODE_SUCCESS != code) {
384,964✔
1087
        pTaskInfo->code = code;
×
1088
        taosMemoryFree(pWrapper);
×
1089
        return pTaskInfo->code;
×
1090
      }
1091
    } else if (pDataInfo->isVtbTagScan) {
92,318,640✔
1092
      code = buildTagScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType);
30,773✔
1093
      taosArrayDestroy(pDataInfo->pSrcUidList);
30,773✔
1094
      pDataInfo->pSrcUidList = NULL;
30,773✔
1095
      if (TSDB_CODE_SUCCESS != code) {
30,773✔
1096
        pTaskInfo->code = code;
×
1097
        taosMemoryFree(pWrapper);
×
1098
        return pTaskInfo->code;
×
1099
      }
1100
    } else {
1101
      if (pDataInfo->pSrcUidList) {
92,287,089✔
1102
        code = buildTableScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->tableSeq);
267,414✔
1103
        taosArrayDestroy(pDataInfo->pSrcUidList);
267,414✔
1104
        pDataInfo->pSrcUidList = NULL;
267,414✔
1105
        if (TSDB_CODE_SUCCESS != code) {
267,414✔
1106
          pTaskInfo->code = code;
×
1107
          taosMemoryFree(pWrapper);
×
1108
          return pTaskInfo->code;
×
1109
        }
1110
      }
1111
    }
1112

1113
    int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req, needStreamPesudoFuncVals);
92,702,086✔
1114
    if (msgSize < 0) {
92,696,841✔
1115
      pTaskInfo->code = msgSize;
×
1116
      taosMemoryFree(pWrapper);
×
1117
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
1118
      return pTaskInfo->code;
×
1119
    }
1120

1121
    void* msg = taosMemoryCalloc(1, msgSize);
92,696,841✔
1122
    if (NULL == msg) {
92,698,556✔
1123
      pTaskInfo->code = terrno;
×
1124
      taosMemoryFree(pWrapper);
×
1125
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
1126
      return pTaskInfo->code;
×
1127
    }
1128

1129
    msgSize = tSerializeSResFetchReq(msg, msgSize, &req, needStreamPesudoFuncVals);
92,698,556✔
1130
    if (msgSize < 0) {
92,698,635✔
1131
      pTaskInfo->code = msgSize;
×
1132
      taosMemoryFree(pWrapper);
×
1133
      taosMemoryFree(msg);
×
1134
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
1135
      return pTaskInfo->code;
×
1136
    }
1137

1138
    freeOperatorParam(req.pOpParam, OP_GET_PARAM);
92,698,635✔
1139

1140
    qDebug("%s build fetch msg and send to vgId:%d, ep:%s, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
92,699,240✔
1141
           ", seqId:%" PRId64 ", execId:%d, %p, %d/%" PRIzu,
1142
           GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->clientId,
1143
           pSource->taskId, pExchangeInfo->seqId, pSource->execId, pExchangeInfo, sourceIndex, totalSources);
1144

1145
    // send the fetch remote task result reques
1146
    SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
92,704,470✔
1147
    if (NULL == pMsgSendInfo) {
92,703,972✔
1148
      taosMemoryFreeClear(msg);
×
1149
      taosMemoryFree(pWrapper);
×
1150
      qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
×
1151
      pTaskInfo->code = terrno;
×
1152
      return pTaskInfo->code;
×
1153
    }
1154

1155
    pMsgSendInfo->param = pWrapper;
92,703,972✔
1156
    pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
92,704,165✔
1157
    pMsgSendInfo->msgInfo.pData = msg;
92,704,165✔
1158
    pMsgSendInfo->msgInfo.len = msgSize;
92,704,470✔
1159
    pMsgSendInfo->msgType = pSource->fetchMsgType;
92,703,951✔
1160
    pMsgSendInfo->fp = loadRemoteDataCallback;
92,703,955✔
1161
    pMsgSendInfo->requestId = pTaskInfo->id.queryId;
92,703,762✔
1162

1163
    int64_t transporterId = 0;
92,703,646✔
1164
    void* poolHandle = NULL;
92,703,569✔
1165
    code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
92,703,569✔
1166
    QUERY_CHECK_CODE(code, lino, _end);
92,704,663✔
1167
    int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, sourceIndex);
92,704,663✔
1168
    *pRpcHandle = transporterId;
92,704,663✔
1169
  }
1170

1171
_end:
92,704,663✔
1172
  if (code != TSDB_CODE_SUCCESS) {
92,704,663✔
1173
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1174
  }
1175
  return code;
92,704,663✔
1176
}
1177

1178
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs,
71,990,446✔
1179
                          SOperatorInfo* pOperator) {
1180
  pInfo->totalRows += numOfRows;
71,990,446✔
1181
  pInfo->totalSize += dataLen;
71,990,446✔
1182
  pInfo->totalElapsed += (taosGetTimestampUs() - startTs);
71,990,446✔
1183
  pOperator->resultInfo.totalRows += numOfRows;
71,990,446✔
1184
}
71,990,446✔
1185

1186
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart) {
308,556,066✔
1187
  int32_t      code = TSDB_CODE_SUCCESS;
308,556,066✔
1188
  int32_t      lino = 0;
308,556,066✔
1189
  SSDataBlock* pBlock = NULL;
308,556,066✔
1190
  if (pColList == NULL) {  // data from other sources
308,556,508✔
1191
    blockDataCleanup(pRes);
303,518,628✔
1192
    code = blockDecodeInternal(pRes, pData, (const char**)pNextStart);
303,516,838✔
1193
    if (code) {
303,516,577✔
1194
      return code;
×
1195
    }
1196
  } else {  // extract data according to pColList
1197
    char* pStart = pData;
5,037,880✔
1198

1199
    int32_t numOfCols = htonl(*(int32_t*)pStart);
5,037,880✔
1200
    pStart += sizeof(int32_t);
5,038,282✔
1201

1202
    // todo refactor:extract method
1203
    SSysTableSchema* pSchema = (SSysTableSchema*)pStart;
5,038,282✔
1204
    for (int32_t i = 0; i < numOfCols; ++i) {
71,662,676✔
1205
      SSysTableSchema* p = (SSysTableSchema*)pStart;
66,624,394✔
1206

1207
      p->colId = htons(p->colId);
66,624,394✔
1208
      p->bytes = htonl(p->bytes);
66,624,394✔
1209
      pStart += sizeof(SSysTableSchema);
66,624,394✔
1210
    }
1211

1212
    pBlock = NULL;
5,038,282✔
1213
    code = createDataBlock(&pBlock);
5,038,282✔
1214
    QUERY_CHECK_CODE(code, lino, _end);
5,038,282✔
1215

1216
    for (int32_t i = 0; i < numOfCols; ++i) {
71,662,676✔
1217
      SColumnInfoData idata = createColumnInfoData(pSchema[i].type, pSchema[i].bytes, pSchema[i].colId);
66,624,394✔
1218
      code = blockDataAppendColInfo(pBlock, &idata);
66,624,394✔
1219
      QUERY_CHECK_CODE(code, lino, _end);
66,624,394✔
1220
    }
1221

1222
    code = blockDecodeInternal(pBlock, pStart, NULL);
5,038,282✔
1223
    QUERY_CHECK_CODE(code, lino, _end);
5,038,282✔
1224

1225
    code = blockDataEnsureCapacity(pRes, pBlock->info.rows);
5,038,282✔
1226
    QUERY_CHECK_CODE(code, lino, _end);
5,038,282✔
1227

1228
    // data from mnode
1229
    pRes->info.dataLoad = 1;
5,038,282✔
1230
    pRes->info.rows = pBlock->info.rows;
5,038,282✔
1231
    pRes->info.scanFlag = MAIN_SCAN;
5,038,282✔
1232
    code = relocateColumnData(pRes, pColList, pBlock->pDataBlock, false);
5,038,282✔
1233
    QUERY_CHECK_CODE(code, lino, _end);
5,038,282✔
1234

1235
    blockDataDestroy(pBlock);
5,038,282✔
1236
    pBlock = NULL;
5,038,282✔
1237
  }
1238

1239
_end:
308,554,859✔
1240
  if (code != TSDB_CODE_SUCCESS) {
308,554,177✔
1241
    blockDataDestroy(pBlock);
×
1242
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1243
  }
1244
  return code;
308,553,988✔
1245
}
1246

1247
void setAllSourcesCompleted(SOperatorInfo* pOperator) {
42,864,969✔
1248
  SExchangeInfo* pExchangeInfo = pOperator->info;
42,864,969✔
1249
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
42,865,374✔
1250

1251
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
42,865,374✔
1252
  size_t               totalSources = taosArrayGetSize(pExchangeInfo->pSources);
42,865,374✔
1253
  qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 ", %.2f Kb, elapsed:%.2f ms",
42,864,964✔
1254
         GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
1255
         pLoadInfo->totalElapsed / 1000.0);
1256

1257
  setOperatorCompleted(pOperator);
42,864,964✔
1258
}
42,864,557✔
1259

1260
int32_t getCompletedSources(const SArray* pArray, int32_t* pRes) {
147,998,220✔
1261
  int32_t code = TSDB_CODE_SUCCESS;
147,998,220✔
1262
  int32_t lino = 0;
147,998,220✔
1263
  size_t  total = taosArrayGetSize(pArray);
147,998,220✔
1264

1265
  int32_t completed = 0;
147,998,625✔
1266
  for (int32_t k = 0; k < total; ++k) {
394,077,320✔
1267
    SSourceDataInfo* p = taosArrayGet(pArray, k);
246,079,544✔
1268
    QUERY_CHECK_NULL(p, code, lino, _end, terrno);
246,079,337✔
1269
    if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
246,079,337✔
1270
      qDebug("source %d is completed, info:%p %p", k, pArray, p);
115,907,475✔
1271
      completed += 1;
115,907,475✔
1272
    }
1273
  }
1274

1275
  *pRes = completed;
147,997,776✔
1276
_end:
147,997,166✔
1277
  if (code != TSDB_CODE_SUCCESS) {
147,997,166✔
1278
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1279
  }
1280
  return code;
147,996,761✔
1281
}
1282

1283
int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
48,169,817✔
1284
  SExchangeInfo* pExchangeInfo = pOperator->info;
48,169,817✔
1285
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
48,169,705✔
1286

1287
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
48,170,259✔
1288
  int64_t startTs = taosGetTimestampUs();
48,169,649✔
1289

1290
  // Asynchronously send all fetch requests to all sources.
1291
  for (int32_t i = 0; i < totalSources; ++i) {
122,138,649✔
1292
    int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
73,968,390✔
1293
    if (code != TSDB_CODE_SUCCESS) {
73,969,000✔
UNCOV
1294
      pTaskInfo->code = code;
×
1295
      return code;
×
1296
    }
1297
  }
1298

1299
  int64_t endTs = taosGetTimestampUs();
48,170,259✔
1300
  qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTaskInfo),
48,170,259✔
1301
         totalSources, (endTs - startTs) / 1000.0);
1302

1303
  pOperator->status = OP_RES_TO_RETURN;
48,170,259✔
1304
  pOperator->cost.openCost = taosGetTimestampUs() - startTs;
48,170,259✔
1305
  if (isTaskKilled(pTaskInfo)) {
48,170,259✔
1306
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1307
  }
1308

1309
  return TSDB_CODE_SUCCESS;
48,170,259✔
1310
}
1311

1312
int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo) {
66,952,164✔
1313
  int32_t            code = TSDB_CODE_SUCCESS;
66,952,164✔
1314
  int32_t            lino = 0;
66,952,164✔
1315
  SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
66,952,164✔
1316
  SSDataBlock*       pb = NULL;
66,952,164✔
1317

1318
  char* pNextStart = pRetrieveRsp->data;
66,952,164✔
1319
  char* pStart = pNextStart;
66,952,164✔
1320

1321
  int32_t index = 0;
66,952,164✔
1322

1323
  if (pRetrieveRsp->compressed) {  // decompress the data
66,952,164✔
1324
    if (pDataInfo->decompBuf == NULL) {
×
1325
      pDataInfo->decompBuf = taosMemoryMalloc(pRetrieveRsp->payloadLen);
×
1326
      QUERY_CHECK_NULL(pDataInfo->decompBuf, code, lino, _end, terrno);
×
1327
      pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
×
1328
    } else {
1329
      if (pDataInfo->decompBufSize < pRetrieveRsp->payloadLen) {
×
1330
        char* p = taosMemoryRealloc(pDataInfo->decompBuf, pRetrieveRsp->payloadLen);
×
1331
        QUERY_CHECK_NULL(p, code, lino, _end, terrno);
×
1332
        if (p != NULL) {
×
1333
          pDataInfo->decompBuf = p;
×
1334
          pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
×
1335
        }
1336
      }
1337
    }
1338
  }
1339

1340
  while (index++ < pRetrieveRsp->numOfBlocks) {
370,471,733✔
1341
    pStart = pNextStart;
303,519,065✔
1342

1343
    if (taosArrayGetSize(pExchangeInfo->pRecycledBlocks) > 0) {
303,519,065✔
1344
      pb = *(SSDataBlock**)taosArrayPop(pExchangeInfo->pRecycledBlocks);
142,446,456✔
1345
      blockDataCleanup(pb);
142,446,456✔
1346
    } else {
1347
      code = createOneDataBlock(pExchangeInfo->pDummyBlock, false, &pb);
161,073,163✔
1348
      QUERY_CHECK_NULL(pb, code, lino, _end, code);
161,071,569✔
1349
    }
1350

1351
    int32_t compLen = *(int32_t*)pStart;
303,518,025✔
1352
    pStart += sizeof(int32_t);
303,517,583✔
1353

1354
    int32_t rawLen = *(int32_t*)pStart;
303,518,834✔
1355
    pStart += sizeof(int32_t);
303,518,834✔
1356
    QUERY_CHECK_CONDITION((compLen <= rawLen && compLen != 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
303,518,834✔
1357

1358
    pNextStart = pStart + compLen;
303,518,834✔
1359
    if (pRetrieveRsp->compressed && (compLen < rawLen)) {
303,518,834✔
1360
      int32_t t = tsDecompressString(pStart, compLen, 1, pDataInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
×
1361
      QUERY_CHECK_CONDITION((t == rawLen), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1362
      pStart = pDataInfo->decompBuf;
×
1363
    }
1364

1365
    code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart);
303,518,834✔
1366
    if (code != 0) {
303,515,706✔
1367
      taosMemoryFreeClear(pDataInfo->pRsp);
×
1368
      goto _end;
×
1369
    }
1370

1371
    void* tmp = taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
303,515,706✔
1372
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
303,516,702✔
1373
    qDebug("%dth block added to resultBlockList, rows:%" PRId64, index, pb->info.rows);
303,516,702✔
1374
    pb = NULL;
303,519,826✔
1375
  }
1376

1377
_end:
66,952,164✔
1378
  if (code != TSDB_CODE_SUCCESS) {
66,952,164✔
1379
    blockDataDestroy(pb);
×
1380
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1381
  }
1382
  return code;
66,952,164✔
1383
}
1384

1385
int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
500,598✔
1386
  SExchangeInfo* pExchangeInfo = pOperator->info;
500,598✔
1387
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
500,971✔
1388

1389
  int32_t code = 0;
500,971✔
1390
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);
500,971✔
1391
  int64_t startTs = taosGetTimestampUs();
500,971✔
1392

1393
  int32_t vgId = 0;
500,971✔
1394
  if (pExchangeInfo->dynTbname) {
500,971✔
1395
    SArray* vals = pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPartColVals;
71,726✔
1396
    for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
71,726✔
1397
      SStreamGroupValue* pValue = taosArrayGet(vals, i);
71,726✔
1398
      if (pValue != NULL && pValue->isTbname) {
71,726✔
1399
        vgId = pValue->vgId;
71,726✔
1400
        break;
71,726✔
1401
      }
1402
    }
1403
  }
1404

1405
  while (1) {
140,601✔
1406
    if (pExchangeInfo->current >= totalSources) {
641,572✔
1407
      setAllSourcesCompleted(pOperator);
137,611✔
1408
      return TSDB_CODE_SUCCESS;
137,611✔
1409
    }
1410

1411
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
503,961✔
1412
    if (!pSource) {
503,588✔
1413
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1414
      pTaskInfo->code = terrno;
×
1415
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1416
    }
1417

1418
    if (vgId != 0 && pSource->addr.nodeId != vgId){
503,588✔
1419
      pExchangeInfo->current += 1;
38,052✔
1420
      continue;
38,052✔
1421
    }
1422

1423
    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
465,536✔
1424
    if (!pDataInfo) {
465,536✔
1425
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1426
      pTaskInfo->code = terrno;
×
1427
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1428
    }
1429
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
465,536✔
1430

1431
    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
465,536✔
1432
    if (code != TSDB_CODE_SUCCESS) {
465,909✔
1433
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1434
      pTaskInfo->code = code;
×
1435
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1436
    }
1437

1438
    while (true) {
466✔
1439
      code = exchangeWait(pOperator, pExchangeInfo);
466,375✔
1440
      if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
466,375✔
1441
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
466✔
1442
      }
1443

1444
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
465,909✔
1445
      if (pDataInfo->seqId != currSeqId) {
465,909✔
1446
        qDebug("seq rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pDataInfo->seqId, pExchangeInfo, currSeqId);
466✔
1447
        taosMemoryFreeClear(pDataInfo->pRsp);
466✔
1448
        continue;
466✔
1449
      }
1450

1451
      break;
465,443✔
1452
    }
1453

1454
    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
465,443✔
1455
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
×
1456
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1457
             tstrerror(pDataInfo->code));
1458
      pOperator->pTaskInfo->code = pDataInfo->code;
×
1459
      return pOperator->pTaskInfo->code;
×
1460
    }
1461

1462
    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
465,443✔
1463
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
465,443✔
1464

1465
    if (pRsp->numOfRows == 0) {
465,443✔
1466
      qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
102,549✔
1467
             " execId:%d %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", pDataInfo,
1468
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1469
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);
1470

1471
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
102,549✔
1472
      if (pDataInfo->isVtbRefScan || pDataInfo->isVtbTagScan) {
102,549✔
1473
        pExchangeInfo->current = totalSources;
69,383✔
1474
      } else {
1475
        pExchangeInfo->current += 1;
33,166✔
1476
      }
1477
      taosMemoryFreeClear(pDataInfo->pRsp);
102,549✔
1478
      continue;
102,549✔
1479
    }
1480

1481
    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
362,894✔
1482
    if (code != TSDB_CODE_SUCCESS) {
362,894✔
1483
      goto _error;
×
1484
    }
1485

1486
    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
362,894✔
1487
    if (pRsp->completed == 1) {
362,894✔
1488
      qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
86,556✔
1489
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, pDataInfo,
1490
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1491
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
1492
             pExchangeInfo->current + 1, totalSources);
1493

1494
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
86,556✔
1495
      if (pDataInfo->isVtbRefScan) {
86,556✔
1496
        pExchangeInfo->current = totalSources;
×
1497
      } else {
1498
        pExchangeInfo->current += 1;
86,556✔
1499
      }
1500
    } else {
1501
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
276,338✔
1502
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
1503
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1504
             pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
1505
    }
1506
    if (pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData) {
362,894✔
1507
      taosArrayClear(pExchangeInfo->pSourceDataInfo);
174,408✔
1508
    }
1509
    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
362,894✔
1510
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;
362,894✔
1511

1512
    taosMemoryFreeClear(pDataInfo->pRsp);
362,894✔
1513
    return TSDB_CODE_SUCCESS;
362,894✔
1514
  }
1515

1516
_error:
×
1517
  pTaskInfo->code = code;
×
1518
  return code;
×
1519
}
1520

1521
void clearVtbScanDataInfo(void* pItem) {
124,219✔
1522
  SSourceDataInfo *pInfo = (SSourceDataInfo *)pItem;
124,219✔
1523
  if (pInfo->colMap) {
124,219✔
1524
    taosArrayDestroy(pInfo->colMap->colMap);
×
1525
    taosMemoryFreeClear(pInfo->colMap);
×
1526
  }
1527
  taosArrayDestroy(pInfo->pSrcUidList);
124,219✔
1528
}
124,219✔
1529

1530
int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasicParam* pBasicParam) {
683,151✔
1531
  SExchangeInfo*     pExchangeInfo = pOperator->info;
683,151✔
1532
  SExchangeSrcIndex* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
683,151✔
1533
  if (NULL == pIdx) {
683,151✔
1534
    if (pBasicParam->isNewDeployed) {
4,411✔
1535
      SDownstreamSourceNode *pNode = NULL;
4,411✔
1536
      int32_t code = nodesCloneNode((SNode*)&pBasicParam->newDeployedSrc, (SNode**)&pNode);
4,411✔
1537
      if (code != TSDB_CODE_SUCCESS) {
4,411✔
1538
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1539
        return code;
×
1540
      }
1541

1542
      SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pOperator->pPhyNode;
4,411✔
1543
      code = nodesListMakeStrictAppend(&pExchange->pSrcEndPoints, (SNode*)pNode);
4,411✔
1544
      if (code != TSDB_CODE_SUCCESS) {
4,411✔
1545
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1546
        return code;
×
1547
      }
1548
      void* tmp = taosArrayPush(pExchangeInfo->pSources, pNode);
4,411✔
1549
      if (!tmp) {
4,411✔
1550
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1551
        return terrno;
×
1552
      }
1553
      SExchangeSrcIndex idx = {.srcIdx = taosArrayGetSize(pExchangeInfo->pSources) - 1, .inUseIdx = -1};
4,411✔
1554
      code =
1555
          tSimpleHashPut(pExchangeInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
4,411✔
1556
      if (pExchangeInfo->pHashSources && code != TSDB_CODE_SUCCESS) {
4,411✔
1557
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1558
        return code;
×
1559
      }
1560
      pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
4,411✔
1561
      if (pIdx == NULL) {
4,411✔
1562
        qError("No exchange source for vgId: %d", pBasicParam->vgId);
×
1563
        return TSDB_CODE_INVALID_PARA;
×
1564
      }
1565
    } else {
1566
      qError("No exchange source for vgId: %d", pBasicParam->vgId);
×
1567
      return TSDB_CODE_INVALID_PARA;
×
1568
    }
1569
  }
1570

1571
  qDebug("start to add single exchange source");
683,151✔
1572

1573
  if (pBasicParam->isVtbWinScan) {
683,151✔
1574
    SSourceDataInfo dataInfo = {0};
×
1575
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
×
1576
    dataInfo.taskId = pExchangeInfo->pTaskId;
×
1577
    dataInfo.index = pIdx->srcIdx;
×
1578
    dataInfo.window = pBasicParam->window;
×
1579
    dataInfo.colMap = NULL;
×
1580

1581
    dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
×
1582
    if (dataInfo.pSrcUidList == NULL) {
×
1583
      return terrno;
×
1584
    }
1585
    dataInfo.isNewParam = false;
×
1586
    dataInfo.isVtbRefScan = false;
×
1587
    dataInfo.isVtbTagScan = false;
×
1588
    dataInfo.isVtbWinScan = true;
×
1589
    dataInfo.srcOpType = pBasicParam->srcOpType;
×
1590
    dataInfo.tableSeq = pBasicParam->tableSeq;
×
1591

1592
    taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
×
1593
    void* tmp = taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo);
×
1594
    if (!tmp) {
×
1595
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1596
      return terrno;
×
1597
    }
1598
  } else if (pBasicParam->isVtbRefScan) {
683,151✔
1599
    SSourceDataInfo dataInfo = {0};
384,964✔
1600
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
384,964✔
1601
    dataInfo.taskId = pExchangeInfo->pTaskId;
384,964✔
1602
    dataInfo.index = pIdx->srcIdx;
384,964✔
1603
    dataInfo.window = pBasicParam->window;
384,964✔
1604
    dataInfo.isNewParam = pBasicParam->isNewParam;
384,964✔
1605
    dataInfo.colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
384,964✔
1606
    dataInfo.colMap->vgId = pBasicParam->colMap->vgId;
384,964✔
1607
    tstrncpy(dataInfo.colMap->tbName, pBasicParam->colMap->tbName, TSDB_TABLE_FNAME_LEN);
384,964✔
1608
    dataInfo.colMap->colMap = taosArrayDup(pBasicParam->colMap->colMap, NULL);
384,964✔
1609

1610
    dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
384,964✔
1611
    if (dataInfo.pSrcUidList == NULL) {
384,964✔
1612
      return terrno;
×
1613
    }
1614

1615
    dataInfo.isVtbRefScan = true;
384,964✔
1616
    dataInfo.isVtbTagScan = false;
384,964✔
1617
    dataInfo.isVtbWinScan = false;
384,964✔
1618
    dataInfo.srcOpType = pBasicParam->srcOpType;
384,964✔
1619
    dataInfo.tableSeq = pBasicParam->tableSeq;
384,964✔
1620

1621
    taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
384,964✔
1622
    void* tmp = taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo);
384,964✔
1623
    if (!tmp) {
384,964✔
1624
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1625
      return terrno;
×
1626
    }
1627
  } else if (pBasicParam->isVtbTagScan) {
298,187✔
1628
    SSourceDataInfo dataInfo = {0};
30,773✔
1629
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
30,773✔
1630
    dataInfo.taskId = pExchangeInfo->pTaskId;
30,773✔
1631
    dataInfo.index = pIdx->srcIdx;
30,773✔
1632
    dataInfo.window = pBasicParam->window;
30,773✔
1633
    dataInfo.colMap = NULL;
30,773✔
1634

1635
    dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
30,773✔
1636
    if (dataInfo.pSrcUidList == NULL) {
30,773✔
1637
      return terrno;
×
1638
    }
1639

1640
    dataInfo.isNewParam = false;
30,773✔
1641
    dataInfo.isVtbRefScan = false;
30,773✔
1642
    dataInfo.isVtbTagScan = true;
30,773✔
1643
    dataInfo.isVtbWinScan = false;
30,773✔
1644
    dataInfo.srcOpType = pBasicParam->srcOpType;
30,773✔
1645
    dataInfo.tableSeq = pBasicParam->tableSeq;
30,773✔
1646

1647
    taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
30,773✔
1648
    void* tmp = taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo);
30,773✔
1649
    if (!tmp) {
30,773✔
1650
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1651
      return terrno;
×
1652
    }
1653
  } else {
1654
    if (pIdx->inUseIdx < 0) {
267,414✔
1655
      SSourceDataInfo dataInfo = {0};
266,070✔
1656
      dataInfo.status = EX_SOURCE_DATA_NOT_READY;
266,070✔
1657
      dataInfo.taskId = pExchangeInfo->pTaskId;
266,070✔
1658
      dataInfo.index = pIdx->srcIdx;
266,070✔
1659

1660
      dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
266,070✔
1661
      if (dataInfo.pSrcUidList == NULL) {
266,070✔
1662
        return terrno;
×
1663
      }
1664

1665
      dataInfo.isNewParam = false;
266,070✔
1666
      dataInfo.isVtbRefScan = false;
266,070✔
1667
      dataInfo.isVtbTagScan = false;
266,070✔
1668
      dataInfo.isVtbWinScan = false;
266,070✔
1669
      dataInfo.srcOpType = pBasicParam->srcOpType;
266,070✔
1670
      dataInfo.tableSeq = pBasicParam->tableSeq;
266,070✔
1671

1672
      void* tmp = taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo);
266,070✔
1673
      if (!tmp) {
266,070✔
1674
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1675
        return terrno;
×
1676
      }
1677
      pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
266,070✔
1678
    } else {
1679
      SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
1,344✔
1680
      if (!pDataInfo) {
1,344✔
1681
        return terrno;
×
1682
      }
1683
      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
1,344✔
1684
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
1,344✔
1685
      }
1686

1687
      pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
1,344✔
1688
      if (pDataInfo->pSrcUidList == NULL) {
1,344✔
1689
        return terrno;
×
1690
      }
1691

1692
      pDataInfo->isNewParam = false;
1,344✔
1693
      pDataInfo->isVtbRefScan = false;
1,344✔
1694
      pDataInfo->isVtbTagScan = false;
1,344✔
1695
      pDataInfo->isVtbWinScan = false;
1,344✔
1696
      pDataInfo->srcOpType = pBasicParam->srcOpType;
1,344✔
1697
      pDataInfo->tableSeq = pBasicParam->tableSeq;
1,344✔
1698
    }
1699
  }
1700

1701
  return TSDB_CODE_SUCCESS;
683,151✔
1702
}
1703

1704
int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
566,659✔
1705
  SExchangeInfo*               pExchangeInfo = pOperator->info;
566,659✔
1706
  int32_t                      code = TSDB_CODE_SUCCESS;
566,659✔
1707
  SExchangeOperatorBasicParam* pBasicParam = NULL;
566,659✔
1708
  SExchangeOperatorParam*      pParam = (SExchangeOperatorParam*)pOperator->pOperatorGetParam->value;
566,659✔
1709
  if (pParam->multiParams) {
566,659✔
1710
    SExchangeOperatorBatchParam* pBatch = (SExchangeOperatorBatchParam*)pOperator->pOperatorGetParam->value;
148,234✔
1711
    int32_t                      iter = 0;
148,234✔
1712
    while (NULL != (pBasicParam = tSimpleHashIterate(pBatch->pBatchs, pBasicParam, &iter))) {
412,960✔
1713
      code = addSingleExchangeSource(pOperator, pBasicParam);
264,726✔
1714
      if (code) {
264,726✔
1715
        return code;
×
1716
      }
1717
    }
1718
  } else {
1719
    pBasicParam = &pParam->basic;
418,425✔
1720
    code = addSingleExchangeSource(pOperator, pBasicParam);
418,425✔
1721
  }
1722

1723
  freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
566,659✔
1724
  pOperator->pOperatorGetParam = NULL;
566,659✔
1725

1726
  return code;
566,659✔
1727
}
1728

1729
int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
372,840,291✔
1730
  SExchangeInfo* pExchangeInfo = pOperator->info;
372,840,291✔
1731
  int32_t        code = TSDB_CODE_SUCCESS;
372,841,782✔
1732
  int32_t        lino = 0;
372,841,782✔
1733
  
1734
  if ((OPTR_IS_OPENED(pOperator) && !pExchangeInfo->dynamicOp) ||
372,841,782✔
1735
      (pExchangeInfo->dynamicOp && NULL == pOperator->pOperatorGetParam)) {
58,486,243✔
1736
    qDebug("skip prepare, opened:%d, dynamicOp:%d, getParam:%p", OPTR_IS_OPENED(pOperator), pExchangeInfo->dynamicOp, pOperator->pOperatorGetParam);
314,795,000✔
1737
    return TSDB_CODE_SUCCESS;
314,795,376✔
1738
  }
1739

1740
  if (pExchangeInfo->dynamicOp) {
58,045,797✔
1741
    code = addDynamicExchangeSource(pOperator);
566,659✔
1742
    QUERY_CHECK_CODE(code, lino, _end);
566,659✔
1743
  }
1744

1745
  if (pOperator->status == OP_NOT_OPENED && (pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData) || IS_STREAM_MODE(pOperator->pTaskInfo)) {
58,046,589✔
1746
    pExchangeInfo->current = 0;
9,820,932✔
1747
  }
1748

1749
  int64_t st = taosGetTimestampUs();
58,045,479✔
1750

1751
  if (!IS_STREAM_MODE(pOperator->pTaskInfo) && !pExchangeInfo->seqLoadData) {
58,045,479✔
1752
    code = prepareConcurrentlyLoad(pOperator);
48,169,954✔
1753
    QUERY_CHECK_CODE(code, lino, _end);
48,170,259✔
1754
    pExchangeInfo->openedTs = taosGetTimestampUs();
48,170,259✔
1755
  }
1756

1757
  OPTR_SET_OPENED(pOperator);
58,046,589✔
1758
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
58,045,404✔
1759

1760
  qDebug("%s prepare load complete", pOperator->pTaskInfo->id.str);
58,046,179✔
1761

1762
_end:
4,394,912✔
1763
  if (code != TSDB_CODE_SUCCESS) {
58,045,772✔
1764
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1765
    pOperator->pTaskInfo->code = code;
×
1766
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
1767
  }
1768
  return TSDB_CODE_SUCCESS;
58,045,772✔
1769
}
1770

1771
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
2,189,543✔
1772
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
2,189,543✔
1773

1774
  if (pLimitInfo->remainGroupOffset > 0) {
2,189,543✔
1775
    if (pLimitInfo->currentGroupId == 0) {  // it is the first group
×
1776
      pLimitInfo->currentGroupId = pBlock->info.id.groupId;
×
1777
      blockDataCleanup(pBlock);
×
1778
      return PROJECT_RETRIEVE_CONTINUE;
×
1779
    } else if (pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
×
1780
      // now it is the data from a new group
1781
      pLimitInfo->remainGroupOffset -= 1;
×
1782

1783
      // ignore data block in current group
1784
      if (pLimitInfo->remainGroupOffset > 0) {
×
1785
        blockDataCleanup(pBlock);
×
1786
        return PROJECT_RETRIEVE_CONTINUE;
×
1787
      }
1788
    }
1789

1790
    // set current group id of the project operator
1791
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
×
1792
  }
1793

1794
  // here check for a new group data, we need to handle the data of the previous group.
1795
  if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
2,189,543✔
1796
    pLimitInfo->numOfOutputGroups += 1;
80,033✔
1797
    if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
80,033✔
1798
      pOperator->status = OP_EXEC_DONE;
×
1799
      blockDataCleanup(pBlock);
×
1800

1801
      return PROJECT_RETRIEVE_DONE;
×
1802
    }
1803

1804
    // reset the value for a new group data
1805
    resetLimitInfoForNextGroup(pLimitInfo);
80,033✔
1806
    // existing rows that belongs to previous group.
1807
    if (pBlock->info.rows > 0) {
80,033✔
1808
      return PROJECT_RETRIEVE_DONE;
80,033✔
1809
    }
1810
  }
1811

1812
  // here we reach the start position, according to the limit/offset requirements.
1813

1814
  // set current group id
1815
  pLimitInfo->currentGroupId = pBlock->info.id.groupId;
2,109,510✔
1816

1817
  bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pTaskInfo);
2,109,510✔
1818
  if (pBlock->info.rows == 0) {
2,109,510✔
1819
    return PROJECT_RETRIEVE_CONTINUE;
1,166,291✔
1820
  } else {
1821
    if (limitReached && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
943,219✔
1822
      setOperatorCompleted(pOperator);
×
1823
      return PROJECT_RETRIEVE_DONE;
×
1824
    }
1825
  }
1826

1827
  // todo optimize performance
1828
  // If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the
1829
  // they may not belong to the same group the limit/offset value is not valid in this case.
1830
  if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || hasSlimitOffsetInfo(pLimitInfo)) {
943,219✔
1831
    return PROJECT_RETRIEVE_DONE;
943,219✔
1832
  } else {  // not full enough, continue to accumulate the output data in the buffer.
1833
    return PROJECT_RETRIEVE_CONTINUE;
×
1834
  }
1835
}
1836

1837
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo) {
92,465,802✔
1838
  SExecTaskInfo* pTask = pOperator->pTaskInfo;
92,465,802✔
1839
  int32_t        code = TSDB_CODE_SUCCESS;
92,466,575✔
1840
  if (pTask->pWorkerCb) {
92,466,575✔
1841
    code = pTask->pWorkerCb->beforeBlocking(pTask->pWorkerCb->pPool);
92,466,382✔
1842
    if (code != TSDB_CODE_SUCCESS) {
92,466,575✔
1843
      pTask->code = code;
×
1844
      return pTask->code;
×
1845
    }
1846
  }
1847

1848
  code = tsem_wait(&pExchangeInfo->ready);
92,466,361✔
1849
  if (code != TSDB_CODE_SUCCESS) {
92,465,415✔
1850
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1851
    pTask->code = code;
×
1852
    return pTask->code;
×
1853
  }
1854

1855
  if (pTask->pWorkerCb) {
92,465,415✔
1856
    code = pTask->pWorkerCb->afterRecoverFromBlocking(pTask->pWorkerCb->pPool);
92,466,167✔
1857
    if (code != TSDB_CODE_SUCCESS) {
92,466,575✔
1858
      pTask->code = code;
×
1859
      return pTask->code;
×
1860
    }
1861
  }
1862
  return TSDB_CODE_SUCCESS;
92,466,575✔
1863
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc