• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4806

17 Oct 2025 09:36AM UTC coverage: 61.094% (-0.2%) from 61.259%
#4806

push

travis-ci

web-flow
Merge da5cd734f into 21184b20f

155401 of 324487 branches covered (47.89%)

Branch coverage included in aggregate %.

72 of 84 new or added lines in 6 files covered. (85.71%)

778 existing lines in 110 files now uncovered.

207559 of 269610 relevant lines covered (76.98%)

127253104.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.5
/source/libs/executor/src/exchangeoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "operator.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "tdatablock.h"
23
#include "thash.h"
24
#include "tmsg.h"
25
#include "tref.h"
26
#include "trpc.h"
27

28
typedef struct SFetchRspHandleWrapper {
29
  uint32_t exchangeId;
30
  int32_t  sourceIndex;
31
  int64_t  seqId;
32
} SFetchRspHandleWrapper;
33

34
typedef struct SSourceDataInfo {
35
  int32_t            index;
36
  int64_t            seqId;
37
  SRWLatch           lock;
38
  SRetrieveTableRsp* pRsp;
39
  uint64_t           totalRows;
40
  int64_t            startTime;
41
  int32_t            code;
42
  EX_SOURCE_STATUS   status;
43
  const char*        taskId;
44
  SArray*            pSrcUidList;
45
  int32_t            srcOpType;
46
  bool               tableSeq;
47
  char*              decompBuf;
48
  int32_t            decompBufSize;
49
  SOrgTbInfo*        colMap;
50
  bool               isVtbRefScan;
51
  bool               isVtbTagScan;
52
  STimeWindow        window;
53
  bool               fetchSent; // need reset
54
} SSourceDataInfo;
55

56
static void destroyExchangeOperatorInfo(void* param);
57
static void freeBlock(void* pParam);
58
static void freeSourceDataInfo(void* param);
59
static void setAllSourcesCompleted(SOperatorInfo* pOperator);
60

61
static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code);
62
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex);
63
static int32_t getCompletedSources(const SArray* pArray, int32_t* pRes);
64
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator);
65
static int32_t seqLoadRemoteData(SOperatorInfo* pOperator);
66
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator);
67
static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock,
68
                                 bool holdDataInBuf);
69
static int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo);
70

71
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo);
72

73

74
static void streamConcurrentlyLoadRemoteData(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
20,641,205✔
75
                                           SExecTaskInfo* pTaskInfo) {
76
  int32_t code = 0;
20,641,205✔
77
  int32_t lino = 0;
20,649,149✔
78
  int64_t startTs = taosGetTimestampUs();  
20,641,205✔
79
  int32_t  totalSources = (int32_t)taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
20,641,205✔
80
  int32_t completed = 0;
20,640,740✔
81
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
20,640,740✔
82
  if (code != TSDB_CODE_SUCCESS) {
20,640,740!
83
    pTaskInfo->code = code;
×
84
    T_LONG_JMP(pTaskInfo->env, code);
×
85
  }
86
  if (completed == totalSources) {
20,640,740✔
87
    qDebug("%s no load since all sources completed, completed:%d, totalSources:%d", pTaskInfo->id.str, completed, totalSources);
5,105,017✔
88
    setAllSourcesCompleted(pOperator);
5,105,017✔
89
    return;
5,109,381✔
90
  }
91

92
  SSourceDataInfo* pDataInfo = NULL;
15,535,723✔
93

94
  while (1) {
8,764,702✔
95
    if (pExchangeInfo->current < 0) {
24,300,425✔
96
      qDebug("current %d and all sources complted, totalSources:%d", pExchangeInfo->current, totalSources);
204,258✔
97
      setAllSourcesCompleted(pOperator);
204,258✔
98
      return;
204,258✔
99
    }
100
    
101
    if (pExchangeInfo->current >= totalSources) {
24,096,632✔
102
      completed = 0;
9,754,725✔
103
      code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
9,754,725✔
104
      if (code != TSDB_CODE_SUCCESS) {
9,754,172!
105
        pTaskInfo->code = code;
×
106
        T_LONG_JMP(pTaskInfo->env, code);
×
107
      }
108
      if (completed == totalSources) {
9,754,172✔
109
        qDebug("stop to load since all sources complted, completed:%d, totalSources:%d", completed, totalSources);
7,730,874✔
110
        setAllSourcesCompleted(pOperator);
7,730,874✔
111
        return;
7,730,321✔
112
      }
113
      
114
      pExchangeInfo->current = 0;
2,023,298✔
115
    }
116

117
    qDebug("%s start stream exchange %p idx:%d fetch", GET_TASKID(pTaskInfo), pExchangeInfo, pExchangeInfo->current);
16,365,205✔
118

119
    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
16,365,670✔
120
    if (!pDataInfo) {
16,364,656!
121
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
122
      pTaskInfo->code = terrno;
×
123
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
124
    }
125

126
    if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
16,364,656✔
127
      pExchangeInfo->current++;
2,440✔
128
      continue;
3,016✔
129
    }
130

131
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
16,362,216✔
132

133
    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
16,362,216✔
134
    if (code != TSDB_CODE_SUCCESS) {
16,363,318!
135
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
136
      pTaskInfo->code = code;
×
137
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
138
    }
139

140
    while (true) {
7,733✔
141
      code = exchangeWait(pOperator, pExchangeInfo);
16,371,051✔
142
      if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
16,370,510!
143
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
7,733!
144
      }
145

146
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
16,362,777✔
147
      if (pDataInfo->seqId != currSeqId) {
16,363,318✔
148
        qDebug("%s seq rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", 
7,733✔
149
            GET_TASKID(pTaskInfo), pDataInfo->seqId, pExchangeInfo, currSeqId);
150
        taosMemoryFreeClear(pDataInfo->pRsp);
7,733!
151
        continue;
7,733✔
152
      }
153

154
      break;
16,355,585✔
155
    }
156

157
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
16,355,585✔
158
    if (!pSource) {
16,355,044!
159
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
160
      pTaskInfo->code = terrno;
×
161
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
162
    }
163

164
    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
16,355,044✔
165
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
3,846!
166
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
167
             tstrerror(pDataInfo->code));
168
      pTaskInfo->code = pDataInfo->code;
3,846✔
169
      T_LONG_JMP(pTaskInfo->env, code);
3,846!
170
    }
171

172
    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
16,351,198✔
173
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
16,351,198✔
174

175
    if (pRsp->numOfRows == 0) {
16,351,198✔
176
      qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
8,762,262✔
177
             " execId:%d idx %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", pDataInfo,
178
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
179
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);
180

181
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
8,762,262✔
182
      if (pDataInfo->isVtbRefScan || pDataInfo->isVtbTagScan) {
8,762,262!
183
        pExchangeInfo->current = -1;
204,258✔
184
      } else {
185
        pExchangeInfo->current += 1;
8,558,004✔
186
      }
187
      taosMemoryFreeClear(pDataInfo->pRsp);
8,762,262!
188
      continue;
8,761,686✔
189
    }
190

191
    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
7,589,477✔
192
    TAOS_CHECK_EXIT(code);
7,589,477!
193

194
    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
7,589,477✔
195
    if (pRsp->completed == 1) {
7,589,477✔
196
      qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
5,372,978✔
197
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%d", pDataInfo,
198
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
199
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
200
             pExchangeInfo->current + 1, totalSources);
201

202
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
5,372,978✔
203
      if (pDataInfo->isVtbRefScan) {
5,372,978!
204
        pExchangeInfo->current = -1;
×
205
        taosMemoryFreeClear(pDataInfo->pRsp);
×
206
        continue;
×
207
      }
208
    } else {
209
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d idx:%d numOfRows:%" PRId64
2,216,499✔
210
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
211
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
212
             pExchangeInfo->current, pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
213
    }
214

215
    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
7,589,477✔
216
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;
7,589,477✔
217

218
    pExchangeInfo->current++;
7,589,477✔
219

220
    taosMemoryFreeClear(pDataInfo->pRsp);
7,589,477!
221
    return;
7,589,477✔
222
  }
223

224
_exit:
×
225

226
  if (code) {
×
227
    pTaskInfo->code = code;
×
228
    qError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
×
229
  }
230
}
231

232

233
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
147,344,407✔
234
                                           SExecTaskInfo* pTaskInfo) {
235
  int32_t code = 0;
147,344,407✔
236
  int32_t lino = 0;
147,344,407✔
237
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
147,344,407✔
238
  int32_t completed = 0;
147,344,294✔
239
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
147,344,294✔
240
  if (code != TSDB_CODE_SUCCESS) {
147,344,122!
241
    pTaskInfo->code = code;
×
242
    T_LONG_JMP(pTaskInfo->env, code);
×
243
  }
244
  if (completed == totalSources) {
147,344,122✔
245
    setAllSourcesCompleted(pOperator);
45,658,184✔
246
    return;
45,658,552✔
247
  }
248

249
  SSourceDataInfo* pDataInfo = NULL;
101,685,938✔
250

251
  while (1) {
11,521,008✔
252
    qDebug("prepare wait for ready, %p, %s", pExchangeInfo, GET_TASKID(pTaskInfo));
113,206,946✔
253
    code = exchangeWait(pOperator, pExchangeInfo);
113,206,946✔
254

255
    if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
113,206,968!
UNCOV
256
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
257
    }
258

259
    for (int32_t i = 0; i < totalSources; ++i) {
168,436,107✔
260
      pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
168,435,832✔
261
      QUERY_CHECK_NULL(pDataInfo, code, lino, _exit, terrno);
168,435,601!
262
      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
168,435,601✔
263
        continue;
39,568,741✔
264
      }
265

266
      if (pDataInfo->status != EX_SOURCE_DATA_READY) {
128,867,366✔
267
        continue;
15,659,850✔
268
      }
269

270
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
113,207,516✔
271
      if (pDataInfo->seqId != currSeqId) {
113,207,243!
272
        qDebug("concurrent rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pDataInfo->seqId, pExchangeInfo, currSeqId);
×
273
        taosMemoryFreeClear(pDataInfo->pRsp);
×
274
        break;
×
275
      }
276

277
      if (pDataInfo->code != TSDB_CODE_SUCCESS) {
113,207,516!
UNCOV
278
        code = pDataInfo->code;
×
UNCOV
279
        TAOS_CHECK_EXIT(code);
×
280
      }
281

282
      tmemory_barrier();
113,207,516✔
283
      SRetrieveTableRsp*     pRsp = pDataInfo->pRsp;
113,207,516✔
284
      SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
113,207,516✔
285
      QUERY_CHECK_NULL(pSource, code, lino, _exit, terrno);
113,207,243!
286

287
      // todo
288
      SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
113,207,243✔
289
      if (pRsp->numOfRows == 0) {
113,207,243✔
290
        if (NULL != pDataInfo->pSrcUidList && (!pDataInfo->isVtbRefScan)) {
27,828,014!
291
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
×
292
          code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
×
293
          if (code != TSDB_CODE_SUCCESS) {
×
294
            taosMemoryFreeClear(pDataInfo->pRsp);
×
295
            TAOS_CHECK_EXIT(code);
×
296
          }
297
        } else {
298
          pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
27,828,014✔
299
          qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
27,828,287✔
300
                 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", try next %d/%" PRIzu, pDataInfo,
301
                 GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
302
                 pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows, i + 1, totalSources);
303
          taosMemoryFreeClear(pDataInfo->pRsp);
27,828,287!
304
        }
305
        break;
27,828,287✔
306
      }
307

308
      TAOS_CHECK_EXIT(doExtractResultBlocks(pExchangeInfo, pDataInfo));
85,379,229!
309

310
      SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
85,379,229✔
311
      updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pDataInfo->startTime, pOperator);
85,379,229✔
312
      pDataInfo->totalRows += pRetrieveRsp->numOfRows;
85,379,016✔
313

314
      if (pRsp->completed == 1) {
85,379,229✔
315
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
72,508,888✔
316
        qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
72,508,888✔
317
               " execId:%d index:%d completed, blocks:%d, numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64
318
               ", totalRows:%" PRIu64 ", total:%.2f Kb, try next %d/%" PRIzu, pDataInfo,
319
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
320
               pRsp->numOfBlocks, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows,
321
               pLoadInfo->totalSize / 1024.0, i + 1, totalSources);
322
      } else {
323
        qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
12,870,066!
324
               " execId:%d blocks:%d, numOfRows:%" PRId64 ", totalRows:%" PRIu64 ", total:%.2f Kb",
325
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
326
               pRsp->numOfBlocks, pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
327
      }
328

329
      taosMemoryFreeClear(pDataInfo->pRsp);
85,379,167!
330

331
      if ((pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED || NULL != pDataInfo->pSrcUidList) && !pDataInfo->isVtbRefScan && !pDataInfo->isVtbTagScan) {
85,379,229!
332
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
12,870,066✔
333
        code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
12,870,066✔
334
        if (code != TSDB_CODE_SUCCESS) {
12,870,066!
335
          taosMemoryFreeClear(pDataInfo->pRsp);
×
336
          TAOS_CHECK_EXIT(code);
×
337
        }
338
      }
339
      
340
      return;
85,379,229✔
341
    }  // end loop
342

343
    int32_t complete1 = 0;
27,828,562✔
344
    code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &complete1);
27,828,287✔
345
    if (code != TSDB_CODE_SUCCESS) {
27,828,014!
346
      pTaskInfo->code = code;
×
347
      T_LONG_JMP(pTaskInfo->env, code);
×
348
    }
349
    if (complete1 == totalSources) {
27,828,014✔
350
      qDebug("all sources are completed, %s", GET_TASKID(pTaskInfo));
16,307,006✔
351
      return;
16,307,279✔
352
    }
353
  }
354

355
_exit:
273✔
356

UNCOV
357
  if (code) {
×
UNCOV
358
    pTaskInfo->code = code;
×
UNCOV
359
    qError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
×
360
  }
361
}
362

363
static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
427,242,512✔
364
  int32_t        code = TSDB_CODE_SUCCESS;
427,242,512✔
365
  SExchangeInfo* pExchangeInfo = pOperator->info;
427,242,512✔
366
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
427,242,797✔
367

368
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
427,242,244✔
369

370
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
427,241,703✔
371
  if (pOperator->status == OP_EXEC_DONE) {
427,241,703!
372
    qDebug("%s all %" PRIzu " source(s) are exhausted, total rows:%" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
×
373
           GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize,
374
           pLoadInfo->totalElapsed / 1000.0);
375
    return NULL;
×
376
  }
377

378
  // we have buffered retrieved datablock, return it directly
379
  SSDataBlock* p = NULL;
427,242,237✔
380
  if (taosArrayGetSize(pExchangeInfo->pResultBlockList) > 0) {
427,242,512✔
381
    p = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
226,630,837✔
382
    taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
226,630,837✔
383
  }
384

385
  if (p != NULL) {
427,242,323✔
386
    void* tmp = taosArrayPush(pExchangeInfo->pRecycledBlocks, &p);
226,630,837✔
387
    if (!tmp) {
226,629,214!
388
      code = terrno;
×
389
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
390
      pTaskInfo->code = code;
×
391
      T_LONG_JMP(pTaskInfo->env, code);
×
392
    }
393
    return p;
226,629,214✔
394
  } else {
395
    if (pExchangeInfo->seqLoadData) {
200,611,486!
396
      code = seqLoadRemoteData(pOperator);
32,626,262✔
397
      if (code != TSDB_CODE_SUCCESS) {
32,625,635!
398
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
399
        pTaskInfo->code = code;
×
400
        T_LONG_JMP(pTaskInfo->env, code);
×
401
      }
402
    } else if (IS_STREAM_MODE(pOperator->pTaskInfo))   {
167,984,594✔
403
      streamConcurrentlyLoadRemoteData(pOperator, pExchangeInfo, pTaskInfo);
20,641,205✔
404
    } else {
405
      concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
147,344,009✔
406
    }
407
    if (TSDB_CODE_SUCCESS != pOperator->pTaskInfo->code) {
200,598,847!
UNCOV
408
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(pOperator->pTaskInfo->code));
×
UNCOV
409
      T_LONG_JMP(pTaskInfo->env, pOperator->pTaskInfo->code);
×
410
    }
411
    
412
    if (taosArrayGetSize(pExchangeInfo->pResultBlockList) == 0) {
200,599,680✔
413
      qDebug("empty resultBlockList");
82,584,689✔
414
      return NULL;
82,584,689✔
415
    } else {
416
      p = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
118,015,264✔
417
      taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
118,014,989✔
418
      void* tmp = taosArrayPush(pExchangeInfo->pRecycledBlocks, &p);
118,015,065✔
419
      if (!tmp) {
118,015,065!
420
        code = terrno;
×
421
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
422
        pTaskInfo->code = code;
×
423
        T_LONG_JMP(pTaskInfo->env, code);
×
424
      }
425

426
      qDebug("block with rows:%" PRId64 " loaded", p->info.rows);
118,015,065✔
427
      return p;
118,015,065✔
428
    }
429
}
430
}
431

432
static int32_t loadRemoteDataNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
426,041,391✔
433
  int32_t        code = TSDB_CODE_SUCCESS;
426,041,391✔
434
  int32_t        lino = 0;
426,041,391✔
435
  SExchangeInfo* pExchangeInfo = pOperator->info;
426,041,391✔
436
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
426,042,806✔
437

438
  qDebug("%s start to load from exchange %p", pTaskInfo->id.str, pExchangeInfo);
426,041,496✔
439

440
  code = pOperator->fpSet._openFn(pOperator);
426,044,057✔
441
  QUERY_CHECK_CODE(code, lino, _end);
426,045,707!
442

443
  if (pOperator->status == OP_EXEC_DONE) {
426,045,707✔
444
    (*ppRes) = NULL;
55,293✔
445
    return code;
55,293✔
446
  }
447

448
  while (1) {
1,250,715✔
449
    SSDataBlock* pBlock = doLoadRemoteDataImpl(pOperator);
427,241,682✔
450
    if (pBlock == NULL) {
427,229,509✔
451
      (*ppRes) = NULL;
82,584,689✔
452
      return code;
82,584,689✔
453
    }
454

455
    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL, NULL);
344,644,820✔
456
    QUERY_CHECK_CODE(code, lino, _end);
344,645,019!
457

458
    if (blockDataGetNumOfRows(pBlock) == 0) {
344,645,019✔
459
      qDebug("rows 0 block got, continue next load");
1,424!
460
      continue;
1,424✔
461
    }
462

463
    SLimitInfo* pLimitInfo = &pExchangeInfo->limitInfo;
344,644,677✔
464
    if (hasLimitOffsetInfo(pLimitInfo)) {
344,644,677✔
465
      int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock, false);
2,646,259✔
466
      if (status == PROJECT_RETRIEVE_CONTINUE) {
2,646,259✔
467
        qDebug("limit retrieve continue");
1,249,291✔
468
        continue;
1,249,291✔
469
      } else if (status == PROJECT_RETRIEVE_DONE) {
1,396,968!
470
        if (pBlock->info.rows == 0) {
1,396,968!
471
          setOperatorCompleted(pOperator);
×
472
          (*ppRes) = NULL;
×
473
          return code;
×
474
        } else {
475
          (*ppRes) = pBlock;
1,396,968✔
476
          return code;
1,396,968✔
477
        }
478
      }
479
    } else {
480
      (*ppRes) = pBlock;
341,997,135✔
481
      qDebug("block with rows %" PRId64 " returned in exechange", pBlock->info.rows);
341,996,083✔
482
      return code;
341,997,876✔
483
    }
484
  }
485

486
_end:
×
487

488
  if (code != TSDB_CODE_SUCCESS) {
×
489
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
490
    pTaskInfo->code = code;
×
491
    T_LONG_JMP(pTaskInfo->env, code);
×
492
  } else {
493
    qDebug("empty block returned in exchange");
×
494
  }
495
  
496
  (*ppRes) = NULL;
×
497
  return code;
×
498
}
499

500
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
70,894,262✔
501
  pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
70,894,262✔
502
  if (pInfo->pSourceDataInfo == NULL) {
70,892,779!
503
    return terrno;
×
504
  }
505

506
  if (pInfo->dynamicOp) {
70,894,077!
507
    return TSDB_CODE_SUCCESS;
2,734,949✔
508
  }
509

510
  int32_t len = strlen(id) + 1;
68,158,251!
511
  pInfo->pTaskId = taosMemoryCalloc(1, len);
68,158,251✔
512
  if (!pInfo->pTaskId) {
68,161,325!
513
    return terrno;
×
514
  }
515
  tstrncpy(pInfo->pTaskId, id, len);
68,157,444!
516
  for (int32_t i = 0; i < numOfSources; ++i) {
171,332,728✔
517
    SSourceDataInfo dataInfo = {0};
103,169,473✔
518
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
103,170,033✔
519
    dataInfo.taskId = pInfo->pTaskId;
103,170,033✔
520
    dataInfo.index = i;
103,172,470✔
521
    SSourceDataInfo* pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
103,172,470✔
522
    if (pDs == NULL) {
103,174,544!
523
      taosArrayDestroyEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
×
524
      return terrno;
×
525
    }
526
    qDebug("init source data info %d, pDs:%p, status:%d", i, pDs, pDs->status);
103,174,544✔
527
  }
528

529
  return TSDB_CODE_SUCCESS;
68,163,255✔
530
}
531

532
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo, const char* id) {
70,896,448✔
533
  size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);
70,896,448!
534

535
  if (numOfSources == 0) {
70,894,830!
536
    qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
×
537
    return TSDB_CODE_INVALID_PARA;
×
538
  }
539
  pInfo->pFetchRpcHandles = taosArrayInit(numOfSources, sizeof(int64_t));
70,894,830✔
540
  if (!pInfo->pFetchRpcHandles) {
70,895,543!
541
    return terrno;
×
542
  }
543
  void* ret = taosArrayReserve(pInfo->pFetchRpcHandles, numOfSources);
70,895,083✔
544
  if (!ret) {
70,894,601!
545
    return terrno;
×
546
  }
547

548
  pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
70,894,601✔
549
  if (pInfo->pSources == NULL) {
70,896,794!
550
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
551
    return terrno;
×
552
  }
553

554
  if (pExNode->node.dynamicOp) {
70,895,657!
555
    pInfo->pHashSources = tSimpleHashInit(numOfSources * 2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
2,734,949✔
556
    if (NULL == pInfo->pHashSources) {
2,734,949!
557
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
558
      return terrno;
×
559
    }
560
  }
561

562
  for (int32_t i = 0; i < numOfSources; ++i) {
181,910,939✔
563
    SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
111,014,083✔
564
    if (!pNode) {
111,012,504!
565
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
566
      return terrno;
×
567
    }
568
    void* tmp = taosArrayPush(pInfo->pSources, pNode);
111,012,504✔
569
    if (!tmp) {
111,016,533!
570
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
571
      return terrno;
×
572
    }
573
    SExchangeSrcIndex idx = {.srcIdx = i, .inUseIdx = -1};
111,016,533✔
574
    int32_t           code =
575
        tSimpleHashPut(pInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
111,015,891✔
576
    if (pInfo->pHashSources && code != TSDB_CODE_SUCCESS) {
111,014,947!
577
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
578
      return code;
×
579
    }
580
  }
581

582
  initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);
70,896,856✔
583
  int64_t refId = taosAddRef(exchangeObjRefPool, pInfo);
70,895,513✔
584
  if (refId < 0) {
70,895,087!
585
    int32_t code = terrno;
×
586
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
587
    return code;
×
588
  } else {
589
    pInfo->self = refId;
70,895,087✔
590
  }
591

592
  return initDataSource(numOfSources, pInfo, id);
70,895,387✔
593
}
594

595
int32_t resetExchangeOperState(SOperatorInfo* pOper) {
33,318,916✔
596
  SExchangeInfo* pInfo = pOper->info;
33,318,916✔
597
  SExchangePhysiNode* pPhynode = (SExchangePhysiNode*)pOper->pPhyNode;
33,326,028✔
598

599
  qDebug("%s reset exchange op:%p info:%p", pOper->pTaskInfo->id.str, pOper, pInfo);
33,328,749✔
600

601
  (void)atomic_add_fetch_64(&pInfo->seqId, 1);
33,333,697✔
602
  pOper->status = OP_NOT_OPENED;
33,332,599✔
603
  pInfo->current = 0;
33,332,599✔
604
  pInfo->loadInfo.totalElapsed = 0;
33,332,599✔
605
  pInfo->loadInfo.totalRows = 0;
33,332,599✔
606
  pInfo->loadInfo.totalSize = 0;
33,333,148✔
607
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pSourceDataInfo); ++i) {
113,337,186✔
608
    SSourceDataInfo* pDataInfo = taosArrayGet(pInfo->pSourceDataInfo, i);
80,001,325✔
609
    taosWLockLatch(&pDataInfo->lock);
80,001,874✔
610
    taosMemoryFreeClear(pDataInfo->decompBuf);
80,004,579!
611
    taosMemoryFreeClear(pDataInfo->pRsp);
80,004,038!
612

613
    pDataInfo->totalRows = 0;
80,004,579✔
614
    pDataInfo->code = 0;
80,004,579✔
615
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
80,004,579✔
616
    pDataInfo->fetchSent = false;
80,004,030✔
617
    taosWUnLockLatch(&pDataInfo->lock);
80,002,948✔
618
  }
619

620
  if (pInfo->dynamicOp) {
33,333,148!
621
    taosArrayClearEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
10,156,192✔
622
  } 
623

624
  taosArrayClearEx(pInfo->pResultBlockList, freeBlock);
33,333,148✔
625
  taosArrayClearEx(pInfo->pRecycledBlocks, freeBlock);
33,332,050✔
626

627
  blockDataCleanup(pInfo->pDummyBlock);
33,332,058✔
628

629
  void   *data = NULL;
33,330,972✔
630
  int32_t iter = 0;
33,330,972✔
631
  while ((data = tSimpleHashIterate(pInfo->pHashSources, data, &iter))) {
47,854,669✔
632
    ((SExchangeSrcIndex *)data)->inUseIdx = -1;
14,523,144✔
633
  }
634
  
635
  pInfo->limitInfo = (SLimitInfo){0};
33,328,757✔
636
  initLimitInfo(pPhynode->node.pLimit, pPhynode->node.pSlimit, &pInfo->limitInfo);
33,328,757✔
637

638
  return 0;
33,331,517✔
639
}
640

641
int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo,
70,896,933✔
642
                                   SOperatorInfo** pOptrInfo) {
643
  QRY_PARAM_CHECK(pOptrInfo);
70,896,933!
644

645
  int32_t        code = 0;
70,897,557✔
646
  int32_t        lino = 0;
70,897,557✔
647
  SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
70,897,557✔
648
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
70,894,155✔
649
  if (pInfo == NULL || pOperator == NULL) {
70,892,686!
UNCOV
650
    code = terrno;
×
651
    goto _error;
×
652
  }
653

654
  pOperator->pPhyNode = pExNode;
70,893,227✔
655
  pInfo->dynamicOp = pExNode->node.dynamicOp;
70,893,227!
656
  code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo));
70,894,707✔
657
  QUERY_CHECK_CODE(code, lino, _error);
70,898,204!
658

659
  code = tsem_init(&pInfo->ready, 0, 0);
70,898,204✔
660
  QUERY_CHECK_CODE(code, lino, _error);
70,897,993!
661

662
  pInfo->pDummyBlock = createDataBlockFromDescNode(pExNode->node.pOutputDataBlockDesc);
70,897,993✔
663
  QUERY_CHECK_NULL(pInfo->pDummyBlock, code, lino, _error, terrno);
70,898,477!
664

665
  pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES);
70,897,480✔
666
  QUERY_CHECK_NULL(pInfo->pResultBlockList, code, lino, _error, terrno);
70,897,919!
667
  pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES);
70,897,976✔
668
  QUERY_CHECK_NULL(pInfo->pRecycledBlocks, code, lino, _error, terrno);
70,898,204!
669

670
  SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self};
70,897,919✔
671
  code = qAppendTaskStopInfo(pTaskInfo, &stopInfo);
70,897,706✔
672
  QUERY_CHECK_CODE(code, lino, _error);
70,897,523!
673

674
  pInfo->seqLoadData = pExNode->seqRecvData;
70,897,523!
675
  pInfo->dynTbname = pExNode->dynTbname;
70,897,523!
676
  if (pInfo->dynTbname) {
70,896,845!
677
    pInfo->seqLoadData = true;
16,563✔
678
  }
679
  pInfo->pTransporter = pTransporter;
70,896,617✔
680

681
  setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo,
70,897,073✔
682
                  pTaskInfo);
683
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
70,895,277✔
684

685
  code = filterInitFromNode((SNode*)pExNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
70,897,793✔
686
                            pTaskInfo->pStreamRuntimeInfo);
70,895,794✔
687
  QUERY_CHECK_CODE(code, lino, _error);
70,897,150!
688
  qTrace("%s exchange op:%p", __func__, pOperator);
70,897,150!
689
  pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteDataNext, NULL, destroyExchangeOperatorInfo,
70,897,150✔
690
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
691
  setOperatorResetStateFn(pOperator, resetExchangeOperState);
70,894,677✔
692
  *pOptrInfo = pOperator;
70,896,736✔
693
  return TSDB_CODE_SUCCESS;
70,896,280✔
694

695
_error:
×
696
  if (code != TSDB_CODE_SUCCESS) {
×
697
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
698
    pTaskInfo->code = code;
×
699
  }
700
  if (pInfo != NULL) {
×
701
    doDestroyExchangeOperatorInfo(pInfo);
×
702
  }
703

704
  if (pOperator != NULL) {
×
705
    pOperator->info = NULL;
×
706
    destroyOperator(pOperator);
×
707
  }
708
  return code;
×
709
}
710

711
void destroyExchangeOperatorInfo(void* param) {
70,898,204✔
712
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
70,898,204✔
713
  int32_t        code = taosRemoveRef(exchangeObjRefPool, pExInfo->self);
70,898,204✔
714
  if (code != TSDB_CODE_SUCCESS) {
70,897,041!
715
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
716
  }
717
}
70,897,041✔
718

719
void freeBlock(void* pParam) {
180,532,190✔
720
  SSDataBlock* pBlock = *(SSDataBlock**)pParam;
180,532,190✔
721
  blockDataDestroy(pBlock);
180,532,765✔
722
}
180,534,039✔
723

724
void freeSourceDataInfo(void* p) {
104,428,698✔
725
  SSourceDataInfo* pInfo = (SSourceDataInfo*)p;
104,428,698✔
726
  taosMemoryFreeClear(pInfo->decompBuf);
104,428,698!
727
  taosMemoryFreeClear(pInfo->pRsp);
104,429,811!
728

729
  pInfo->decompBufSize = 0;
104,426,943✔
730
}
104,428,156✔
731

732
void doDestroyExchangeOperatorInfo(void* param) {
70,898,204✔
733
  if (param == NULL) {
70,898,204!
734
    return;
×
735
  }
736
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
70,898,204✔
737
  if (pExInfo->pFetchRpcHandles) {
70,898,204!
738
    for (int32_t i = 0; i < pExInfo->pFetchRpcHandles->size; ++i) {
181,917,121✔
739
      int64_t* pRpcHandle = taosArrayGet(pExInfo->pFetchRpcHandles, i);
111,018,375✔
740
      if (*pRpcHandle > 0) {
111,017,532✔
741
        SDownstreamSourceNode* pSource = taosArrayGet(pExInfo->pSources, i);
5,683,189✔
742
        (void)asyncFreeConnById(pExInfo->pTransporter, *pRpcHandle);
5,683,189✔
743
      }
744
    }
745
    taosArrayDestroy(pExInfo->pFetchRpcHandles);
70,897,662✔
746
  }
747

748
  taosArrayDestroy(pExInfo->pSources);
70,897,618✔
749
  taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo);
70,898,264✔
750

751
  taosArrayDestroyEx(pExInfo->pResultBlockList, freeBlock);
70,897,668✔
752
  taosArrayDestroyEx(pExInfo->pRecycledBlocks, freeBlock);
70,897,668✔
753

754
  blockDataDestroy(pExInfo->pDummyBlock);
70,898,264✔
755
  tSimpleHashCleanup(pExInfo->pHashSources);
70,896,833✔
756

757
  int32_t code = tsem_destroy(&pExInfo->ready);
70,897,126✔
758
  if (code != TSDB_CODE_SUCCESS) {
70,897,881!
759
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
760
  }
761
  taosMemoryFreeClear(pExInfo->pTaskId);
70,897,881✔
762

763
  taosMemoryFreeClear(param);
70,895,175!
764
}
765

766
int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
164,153,677✔
767
  SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;
164,153,677✔
768

769
  taosMemoryFreeClear(pMsg->pEpSet);
164,153,677!
770
  SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
164,157,794✔
771
  if (pExchangeInfo == NULL) {
164,158,266✔
772
    qWarn("failed to acquire exchange operator, since it may have been released, %p", pExchangeInfo);
31,313!
773
    taosMemoryFree(pMsg->pData);
31,313!
774
    return TSDB_CODE_SUCCESS;
31,313✔
775
  }
776

777
  int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
164,126,953✔
778
  if (pWrapper->seqId != currSeqId) {
164,123,543!
779
    qDebug("rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pWrapper->seqId, pExchangeInfo, currSeqId);
×
780
    taosMemoryFree(pMsg->pData);
×
781
    code = taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
×
782
    if (code != TSDB_CODE_SUCCESS) {
×
783
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
784
    }
785
    return TSDB_CODE_SUCCESS;
×
786
  }
787

788
  int32_t          index = pWrapper->sourceIndex;
164,115,496✔
789

790
  qDebug("%s exchange %p %dth source got rsp, code:%d, rsp:%p", pExchangeInfo->pTaskId, pExchangeInfo, index, code, pMsg->pData);
164,122,973✔
791

792
  int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, index);
164,127,511✔
793
  if (pRpcHandle != NULL) {
164,126,225✔
794
    int32_t ret = asyncFreeConnById(pExchangeInfo->pTransporter, *pRpcHandle);
164,123,205✔
795
    if (ret != 0) {
164,129,854✔
796
      qDebug("failed to free rpc handle, code:%s, %p", tstrerror(ret), pExchangeInfo);
7,547,698✔
797
    }
798
    *pRpcHandle = -1;
164,129,854✔
799
  }
800

801
  SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);
164,126,406✔
802
  if (!pSourceDataInfo) {
164,129,632!
803
    return terrno;
×
804
  }
805

806
  if (0 == code && NULL == pMsg->pData) {
164,129,632!
807
    qError("invalid rsp msg, msgType:%d, len:%d", pMsg->msgType, pMsg->len);
×
808
    code = TSDB_CODE_QRY_INVALID_MSG;
×
809
  }
810

811
  taosWLockLatch(&pSourceDataInfo->lock);
164,132,986✔
812
  if (code == TSDB_CODE_SUCCESS) {
164,124,853✔
813
    pSourceDataInfo->seqId = pWrapper->seqId;
164,112,647✔
814
    pSourceDataInfo->pRsp = pMsg->pData;
164,110,092✔
815

816
    SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
164,101,812✔
817
    pRsp->numOfRows = htobe64(pRsp->numOfRows);
164,094,449✔
818
    pRsp->compLen = htonl(pRsp->compLen);
164,108,195✔
819
    pRsp->payloadLen = htonl(pRsp->payloadLen);
164,109,266✔
820
    pRsp->numOfCols = htonl(pRsp->numOfCols);
164,089,336✔
821
    pRsp->useconds = htobe64(pRsp->useconds);
164,086,153✔
822
    pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);
164,071,428✔
823

824
    qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%" PRId64 ", %p", pSourceDataInfo->taskId, index,
164,068,225✔
825
           pRsp->numOfBlocks, pRsp->numOfRows, pExchangeInfo);
826
  } else {
827
    taosMemoryFree(pMsg->pData);
12,206!
828
    pSourceDataInfo->code = rpcCvtErrCode(code);
12,206✔
829
    if (pSourceDataInfo->code != code) {
12,206!
830
      qError("%s fetch rsp received, index:%d, error:%s, cvted error: %s, %p", pSourceDataInfo->taskId, index,
×
831
             tstrerror(code), tstrerror(pSourceDataInfo->code), pExchangeInfo);
832
    } else {
833
      qError("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code),
12,206!
834
             pExchangeInfo);
835
    }
836
  }
837

838
  tmemory_barrier();
164,084,213✔
839
  pSourceDataInfo->status = EX_SOURCE_DATA_READY;
164,084,213✔
840
  taosWUnLockLatch(&pSourceDataInfo->lock);
164,105,848✔
841
  
842
  code = tsem_post(&pExchangeInfo->ready);
164,101,556✔
843
  if (code != TSDB_CODE_SUCCESS) {
164,119,621!
844
    qError("failed to invoke post when fetch rsp is ready, code:%s, %p", tstrerror(code), pExchangeInfo);
×
845
    return code;
×
846
  }
847

848
  code = taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
164,119,621✔
849
  if (code != TSDB_CODE_SUCCESS) {
164,129,682!
850
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
851
  }
852
  return code;
164,129,457✔
853
}
854

855
int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, bool tableSeq) {
359,410✔
856
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
359,410!
857
  if (NULL == *ppRes) {
359,410!
858
    return terrno;
×
859
  }
860

861
  STableScanOperatorParam* pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
359,410!
862
  if (NULL == pScan) {
359,410!
863
    taosMemoryFreeClear(*ppRes);
×
864
    return terrno;
×
865
  }
866

867
  pScan->pUidList = taosArrayDup(pUidList, NULL);
359,410✔
868
  if (NULL == pScan->pUidList) {
359,410!
869
    taosMemoryFree(pScan);
×
870
    taosMemoryFreeClear(*ppRes);
×
871
    return terrno;
×
872
  }
873
  pScan->tableSeq = tableSeq;
359,410✔
874
  pScan->pOrgTbInfo = NULL;
359,410✔
875
  pScan->window.skey = INT64_MAX;
359,410✔
876
  pScan->window.ekey = INT64_MIN;
359,410✔
877

878
  (*ppRes)->opType = srcOpType;
359,410✔
879
  (*ppRes)->downstreamIdx = 0;
359,410✔
880
  (*ppRes)->value = pScan;
359,410✔
881
  (*ppRes)->pChildren = NULL;
359,410✔
882
  (*ppRes)->reUse = false;
359,410✔
883

884
  return TSDB_CODE_SUCCESS;
359,410✔
885
}
886

887
int32_t buildTableScanOperatorParamEx(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, SOrgTbInfo *pMap, bool tableSeq, STimeWindow *window) {
30,400,622✔
888
  int32_t                  code = TSDB_CODE_SUCCESS;
30,400,622✔
889
  int32_t                  lino = 0;
30,400,622✔
890
  STableScanOperatorParam* pScan = NULL;
30,400,622✔
891

892
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
30,400,622!
893
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
30,400,622!
894

895
  pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
30,400,622!
896
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);
30,400,622!
897

898
  pScan->pUidList = taosArrayDup(pUidList, NULL);
30,400,622✔
899
  QUERY_CHECK_NULL(pScan->pUidList, code, lino, _return, terrno);
30,400,622!
900

901
  pScan->pOrgTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
30,400,622!
902
  QUERY_CHECK_NULL(pScan->pOrgTbInfo, code, lino, _return, terrno);
30,400,622!
903

904
  pScan->pOrgTbInfo->vgId = pMap->vgId;
30,400,622✔
905
  tstrncpy(pScan->pOrgTbInfo->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);
30,400,622!
906

907
  pScan->pOrgTbInfo->colMap = taosArrayDup(pMap->colMap, NULL);
30,400,622✔
908
  QUERY_CHECK_NULL(pScan->pOrgTbInfo->colMap, code, lino, _return, terrno);
30,400,622!
909

910
  pScan->tableSeq = tableSeq;
30,400,622✔
911
  pScan->window.skey = window->skey;
30,400,622✔
912
  pScan->window.ekey = window->ekey;
30,400,622✔
913

914
  (*ppRes)->opType = srcOpType;
30,400,622✔
915
  (*ppRes)->downstreamIdx = 0;
30,400,622✔
916
  (*ppRes)->value = pScan;
30,400,622✔
917
  (*ppRes)->pChildren = NULL;
30,400,622✔
918
  (*ppRes)->reUse = false;
30,400,622✔
919

920
  return code;
30,400,622✔
921
_return:
×
922
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
923
  taosMemoryFreeClear(*ppRes);
×
924
  if (pScan) {
×
925
    taosArrayDestroy(pScan->pUidList);
×
926
    if (pScan->pOrgTbInfo) {
×
927
      taosArrayDestroy(pScan->pOrgTbInfo->colMap);
×
928
      taosMemoryFreeClear(pScan->pOrgTbInfo);
×
929
    }
930
    taosMemoryFree(pScan);
×
931
  }
932
  return code;
×
933
}
934

935
int32_t buildTagScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType) {
2,254,035✔
936
  int32_t                  code = TSDB_CODE_SUCCESS;
2,254,035✔
937
  int32_t                  lino = 0;
2,254,035✔
938
  STagScanOperatorParam*   pScan = NULL;
2,254,035✔
939

940
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
2,254,035!
941
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
2,254,035!
942

943
  pScan = taosMemoryMalloc(sizeof(STagScanOperatorParam));
2,254,035!
944
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);
2,254,035!
945
  pScan->vcUid = *(tb_uid_t*)taosArrayGet(pUidList, 0);
2,254,035✔
946

947
  (*ppRes)->opType = srcOpType;
2,254,035✔
948
  (*ppRes)->downstreamIdx = 0;
2,254,035✔
949
  (*ppRes)->value = pScan;
2,254,035✔
950
  (*ppRes)->pChildren = NULL;
2,254,035✔
951
  (*ppRes)->reUse = false;
2,254,035✔
952

953
  return code;
2,254,035✔
954
_return:
×
955
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
956
  taosMemoryFreeClear(*ppRes);
×
957
  if (pScan) {
×
958
    taosMemoryFree(pScan);
×
959
  }
960
  return code;
×
961
}
962

963
static int32_t getCurrentWinCalcTimeRange(SStreamRuntimeFuncInfo* pRuntimeInfo, STimeWindow* pTimeRange) {
6,591,959✔
964
  if (!pRuntimeInfo || !pTimeRange) {
6,591,959!
965
    return TSDB_CODE_INTERNAL_ERROR;
×
966
  }
967

968
  SSTriggerCalcParam* pParam = taosArrayGet(pRuntimeInfo->pStreamPesudoFuncVals, pRuntimeInfo->curIdx);
6,591,959✔
969
  if (!pParam) {
6,592,508!
970
    return TSDB_CODE_INTERNAL_ERROR;
×
971
  }
972

973
  switch (pRuntimeInfo->triggerType) {
6,592,508✔
974
    case STREAM_TRIGGER_SLIDING:
5,550,709✔
975
      // Unable to distinguish whether there is an interval, all use wstart/wend
976
      // and the results are equal to those of prevTs/currentTs, using the same address of union.
977
      pTimeRange->skey = pParam->wstart;  // is equal to wstart
5,550,709✔
978
      pTimeRange->ekey = pParam->wend;    // is equal to wend
5,550,709✔
979
      break;
5,550,709✔
980
    case STREAM_TRIGGER_PERIOD:
77,910✔
981
      pTimeRange->skey = pParam->prevLocalTime;
77,910✔
982
      pTimeRange->ekey = pParam->triggerTime;
77,910✔
983
      break;
77,910✔
984
    default:
963,889✔
985
      pTimeRange->skey = pParam->wstart;
963,889✔
986
      pTimeRange->ekey = pParam->wend;
963,889✔
987
      break;
963,889✔
988
  }
989

990
  return TSDB_CODE_SUCCESS;
6,592,508✔
991
}
992

993
int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
164,353,238✔
994
  int32_t          code = TSDB_CODE_SUCCESS;
164,353,238✔
995
  int32_t          lino = 0;
164,353,238✔
996
  SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
164,353,238✔
997
  if (!pDataInfo) {
164,349,995!
998
    return terrno;
×
999
  }
1000

1001
  if (EX_SOURCE_DATA_NOT_READY != pDataInfo->status) {
164,349,995!
1002
    return TSDB_CODE_SUCCESS;
×
1003
  }
1004

1005
  pDataInfo->status = EX_SOURCE_DATA_STARTED;
164,351,982✔
1006
  SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
164,351,982✔
1007
  if (!pSource) {
164,351,319!
1008
    return terrno;
×
1009
  }
1010

1011
  pDataInfo->startTime = taosGetTimestampUs();
164,352,662✔
1012
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
164,352,575✔
1013

1014
  SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
164,351,421✔
1015
  QUERY_CHECK_NULL(pWrapper, code, lino, _end, terrno);
164,352,700!
1016
  pWrapper->exchangeId = pExchangeInfo->self;
164,352,700✔
1017
  pWrapper->sourceIndex = sourceIndex;
164,353,258✔
1018
  pWrapper->seqId = pExchangeInfo->seqId;
164,352,261✔
1019

1020
  if (pSource->localExec) {
164,352,055!
UNCOV
1021
    SDataBuf pBuf = {0};
×
UNCOV
1022
    int32_t  code = (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->sId, pTaskInfo->id.queryId,
×
1023
                                               pSource->clientId, pSource->taskId, 0, pSource->execId, &pBuf.pData,
1024
                                               pTaskInfo->localFetch.explainRes);
UNCOV
1025
    code = loadRemoteDataCallback(pWrapper, &pBuf, code);
×
UNCOV
1026
    taosMemoryFree(pWrapper);
×
UNCOV
1027
    QUERY_CHECK_CODE(code, lino, _end);
×
1028
  } else {
1029
    bool needStreamPesudoFuncVals = true;
164,351,679✔
1030
    SResFetchReq req = {0};
164,351,679✔
1031
    req.header.vgId = pSource->addr.nodeId;
164,352,301✔
1032
    req.sId = pSource->sId;
164,351,220✔
1033
    req.clientId = pSource->clientId;
164,351,938✔
1034
    req.taskId = pSource->taskId;
164,351,878✔
1035
    req.queryId = pTaskInfo->id.queryId;
164,351,715✔
1036
    req.execId = pSource->execId;
164,351,665✔
1037
    if (pTaskInfo->pStreamRuntimeInfo) {
164,351,694✔
1038
      req.dynTbname = pExchangeInfo->dynTbname;
16,465,581!
1039
      req.execId = pTaskInfo->pStreamRuntimeInfo->execId;
16,465,581✔
1040
      req.pStRtFuncInfo = &pTaskInfo->pStreamRuntimeInfo->funcInfo;
16,465,581✔
1041

1042
      if (pSource->fetchMsgType == TDMT_STREAM_FETCH_FROM_RUNNER) {
16,465,581✔
1043
        qDebug("%s stream fetch from runner, execId:%d, %p", GET_TASKID(pTaskInfo), req.execId, pTaskInfo->pStreamRuntimeInfo);
46,466!
1044
      } else if (pSource->fetchMsgType == TDMT_STREAM_FETCH_FROM_CACHE) {
16,419,115✔
1045
        code = getCurrentWinCalcTimeRange(req.pStRtFuncInfo, &req.pStRtFuncInfo->curWindow);
6,592,508✔
1046
        QUERY_CHECK_CODE(code, lino, _end);
6,592,508!
1047
        needStreamPesudoFuncVals = false;
6,592,508✔
1048
        qDebug("%s stream fetch from cache, execId:%d, curWinIdx:%d, time range:[%" PRId64 ", %" PRId64 "]",
6,592,508✔
1049
               GET_TASKID(pTaskInfo), req.execId, req.pStRtFuncInfo->curIdx, req.pStRtFuncInfo->curWindow.skey,
1050
               req.pStRtFuncInfo->curWindow.ekey);
1051
      }
1052
      if (!pDataInfo->fetchSent) {
16,465,116✔
1053
        req.reset = pDataInfo->fetchSent = true;
14,432,420!
1054
      }
1055
    }
1056
    if (pDataInfo->isVtbRefScan) {
164,347,680!
1057
      code = buildTableScanOperatorParamEx(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->colMap, pDataInfo->tableSeq, &pDataInfo->window);
30,400,622!
1058
      taosArrayDestroy(pDataInfo->colMap->colMap);
30,400,622✔
1059
      taosMemoryFreeClear(pDataInfo->colMap);
30,400,622!
1060
      taosArrayDestroy(pDataInfo->pSrcUidList);
30,400,622✔
1061
      pDataInfo->pSrcUidList = NULL;
30,400,622✔
1062
      if (TSDB_CODE_SUCCESS != code) {
30,400,622!
1063
        pTaskInfo->code = code;
×
1064
        taosMemoryFree(pWrapper);
×
1065
        return pTaskInfo->code;
×
1066
      }
1067
    } else if (pDataInfo->isVtbTagScan) {
133,949,781!
1068
      code = buildTagScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType);
2,254,035✔
1069
      taosArrayDestroy(pDataInfo->pSrcUidList);
2,254,035✔
1070
      pDataInfo->pSrcUidList = NULL;
2,254,035✔
1071
      if (TSDB_CODE_SUCCESS != code) {
2,254,035!
1072
        pTaskInfo->code = code;
×
1073
        taosMemoryFree(pWrapper);
×
1074
        return pTaskInfo->code;
×
1075
      }
1076
    } else {
1077
      if (pDataInfo->pSrcUidList) {
131,695,554✔
1078
        code = buildTableScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->tableSeq);
343,996!
1079
        taosArrayDestroy(pDataInfo->pSrcUidList);
343,996✔
1080
        pDataInfo->pSrcUidList = NULL;
343,996✔
1081
        if (TSDB_CODE_SUCCESS != code) {
343,996!
1082
          pTaskInfo->code = code;
×
1083
          taosMemoryFree(pWrapper);
×
1084
          return pTaskInfo->code;
×
1085
        }
1086
      }
1087
    }
1088

1089
    int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req, needStreamPesudoFuncVals);
164,351,669✔
1090
    if (msgSize < 0) {
164,339,347!
1091
      pTaskInfo->code = msgSize;
×
1092
      taosMemoryFree(pWrapper);
×
1093
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
1094
      return pTaskInfo->code;
×
1095
    }
1096

1097
    void* msg = taosMemoryCalloc(1, msgSize);
164,339,347✔
1098
    if (NULL == msg) {
164,343,806!
1099
      pTaskInfo->code = terrno;
×
1100
      taosMemoryFree(pWrapper);
×
1101
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
1102
      return pTaskInfo->code;
×
1103
    }
1104

1105
    msgSize = tSerializeSResFetchReq(msg, msgSize, &req, needStreamPesudoFuncVals);
164,343,806✔
1106
    if (msgSize < 0) {
164,340,479!
1107
      pTaskInfo->code = msgSize;
×
1108
      taosMemoryFree(pWrapper);
×
1109
      taosMemoryFree(msg);
×
1110
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
1111
      return pTaskInfo->code;
×
1112
    }
1113

1114
    freeOperatorParam(req.pOpParam, OP_GET_PARAM);
164,340,479✔
1115

1116
    qDebug("%s build fetch msg and send to vgId:%d, ep:%s, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
164,343,988✔
1117
           ", seqId:%" PRId64 ", execId:%d, %p, %d/%" PRIzu,
1118
           GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->clientId,
1119
           pSource->taskId, pExchangeInfo->seqId, pSource->execId, pExchangeInfo, sourceIndex, totalSources);
1120

1121
    // send the fetch remote task result reques
1122
    SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
164,353,211✔
1123
    if (NULL == pMsgSendInfo) {
164,351,407!
1124
      taosMemoryFreeClear(msg);
×
1125
      taosMemoryFree(pWrapper);
×
1126
      qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
×
1127
      pTaskInfo->code = terrno;
×
1128
      return pTaskInfo->code;
×
1129
    }
1130

1131
    pMsgSendInfo->param = pWrapper;
164,351,407✔
1132
    pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
164,352,156✔
1133
    pMsgSendInfo->msgInfo.pData = msg;
164,352,439✔
1134
    pMsgSendInfo->msgInfo.len = msgSize;
164,352,316✔
1135
    pMsgSendInfo->msgType = pSource->fetchMsgType;
164,351,955✔
1136
    pMsgSendInfo->fp = loadRemoteDataCallback;
164,352,501✔
1137
    pMsgSendInfo->requestId = pTaskInfo->id.queryId;
164,351,953✔
1138

1139
    int64_t transporterId = 0;
164,351,955✔
1140
    void* poolHandle = NULL;
164,352,511✔
1141
    code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
164,352,511✔
1142
    QUERY_CHECK_CODE(code, lino, _end);
164,353,523!
1143
    int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, sourceIndex);
164,353,523✔
1144
    *pRpcHandle = transporterId;
164,354,007✔
1145
  }
1146

1147
_end:
164,354,007✔
1148
  if (code != TSDB_CODE_SUCCESS) {
164,354,007!
1149
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1150
  }
1151
  return code;
164,353,523✔
1152
}
1153

1154
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs,
123,046,203✔
1155
                          SOperatorInfo* pOperator) {
1156
  pInfo->totalRows += numOfRows;
123,046,203✔
1157
  pInfo->totalSize += dataLen;
123,046,203✔
1158
  pInfo->totalElapsed += (taosGetTimestampUs() - startTs);
123,046,203✔
1159
  pOperator->resultInfo.totalRows += numOfRows;
123,045,990✔
1160
}
123,045,990✔
1161

1162
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart) {
361,271,824✔
1163
  int32_t      code = TSDB_CODE_SUCCESS;
361,271,824✔
1164
  int32_t      lino = 0;
361,271,824✔
1165
  SSDataBlock* pBlock = NULL;
361,271,824✔
1166
  if (pColList == NULL) {  // data from other sources
361,271,824✔
1167
    blockDataCleanup(pRes);
356,240,885✔
1168
    code = blockDecodeInternal(pRes, pData, (const char**)pNextStart);
356,238,721✔
1169
    if (code) {
356,239,021!
1170
      return code;
×
1171
    }
1172
  } else {  // extract data according to pColList
1173
    char* pStart = pData;
5,030,939✔
1174

1175
    int32_t numOfCols = htonl(*(int32_t*)pStart);
5,030,939✔
1176
    pStart += sizeof(int32_t);
5,030,939✔
1177

1178
    // todo refactor:extract method
1179
    SSysTableSchema* pSchema = (SSysTableSchema*)pStart;
5,030,939✔
1180
    for (int32_t i = 0; i < numOfCols; ++i) {
72,088,271✔
1181
      SSysTableSchema* p = (SSysTableSchema*)pStart;
67,057,332✔
1182

1183
      p->colId = htons(p->colId);
67,057,332✔
1184
      p->bytes = htonl(p->bytes);
67,057,332✔
1185
      pStart += sizeof(SSysTableSchema);
67,057,332✔
1186
    }
1187

1188
    pBlock = NULL;
5,030,939✔
1189
    code = createDataBlock(&pBlock);
5,030,939✔
1190
    QUERY_CHECK_CODE(code, lino, _end);
5,030,939!
1191

1192
    for (int32_t i = 0; i < numOfCols; ++i) {
72,087,978✔
1193
      SColumnInfoData idata = createColumnInfoData(pSchema[i].type, pSchema[i].bytes, pSchema[i].colId);
67,057,039✔
1194
      code = blockDataAppendColInfo(pBlock, &idata);
67,057,039✔
1195
      QUERY_CHECK_CODE(code, lino, _end);
67,057,039!
1196
    }
1197

1198
    code = blockDecodeInternal(pBlock, pStart, NULL);
5,030,939✔
1199
    QUERY_CHECK_CODE(code, lino, _end);
5,030,939!
1200

1201
    code = blockDataEnsureCapacity(pRes, pBlock->info.rows);
5,030,939✔
1202
    QUERY_CHECK_CODE(code, lino, _end);
5,030,939!
1203

1204
    // data from mnode
1205
    pRes->info.dataLoad = 1;
5,030,939✔
1206
    pRes->info.rows = pBlock->info.rows;
5,030,939✔
1207
    pRes->info.scanFlag = MAIN_SCAN;
5,030,939✔
1208
    code = relocateColumnData(pRes, pColList, pBlock->pDataBlock, false);
5,030,939✔
1209
    QUERY_CHECK_CODE(code, lino, _end);
5,030,939!
1210

1211
    blockDataDestroy(pBlock);
5,030,939✔
1212
    pBlock = NULL;
5,030,939✔
1213
  }
1214

1215
_end:
361,269,960✔
1216
  if (code != TSDB_CODE_SUCCESS) {
361,269,206!
1217
    blockDataDestroy(pBlock);
×
1218
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1219
  }
1220
  return code;
361,269,206✔
1221
}
1222

1223
void setAllSourcesCompleted(SOperatorInfo* pOperator) {
66,277,410✔
1224
  SExchangeInfo* pExchangeInfo = pOperator->info;
66,277,410✔
1225
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
66,277,410✔
1226

1227
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
66,277,410✔
1228
  size_t               totalSources = taosArrayGetSize(pExchangeInfo->pSources);
66,277,410✔
1229
  qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 ", %.2f Kb, elapsed:%.2f ms",
66,276,302✔
1230
         GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
1231
         pLoadInfo->totalElapsed / 1000.0);
1232

1233
  setOperatorCompleted(pOperator);
66,276,302✔
1234
}
66,276,304✔
1235

1236
int32_t getCompletedSources(const SArray* pArray, int32_t* pRes) {
205,568,396✔
1237
  int32_t code = TSDB_CODE_SUCCESS;
205,568,396✔
1238
  int32_t lino = 0;
205,568,396✔
1239
  size_t  total = taosArrayGetSize(pArray);
205,568,396✔
1240

1241
  int32_t completed = 0;
205,568,064✔
1242
  for (int32_t k = 0; k < total; ++k) {
559,854,740✔
1243
    SSourceDataInfo* p = taosArrayGet(pArray, k);
354,287,777✔
1244
    QUERY_CHECK_NULL(p, code, lino, _end, terrno);
354,287,494!
1245
    if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
354,287,494✔
1246
      qDebug("source %d is completed, info:%p %p", k, pArray, p);
166,639,473✔
1247
      completed += 1;
166,639,200✔
1248
    }
1249
  }
1250

1251
  *pRes = completed;
205,566,963✔
1252
_end:
205,568,071✔
1253
  if (code != TSDB_CODE_SUCCESS) {
205,568,071!
1254
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1255
  }
1256
  return code;
205,568,425✔
1257
}
1258

1259
int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
67,638,282✔
1260
  SExchangeInfo* pExchangeInfo = pOperator->info;
67,638,282✔
1261
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
67,638,368✔
1262

1263
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
67,635,744✔
1264
  int64_t startTs = taosGetTimestampUs();
67,635,643✔
1265

1266
  // Asynchronously send all fetch requests to all sources.
1267
  for (int32_t i = 0; i < totalSources; ++i) {
170,162,805✔
1268
    int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
102,524,301✔
1269
    if (code != TSDB_CODE_SUCCESS) {
102,527,361✔
1270
      pTaskInfo->code = code;
199✔
1271
      return code;
×
1272
    }
1273
  }
1274

1275
  int64_t endTs = taosGetTimestampUs();
67,638,965✔
1276
  qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTaskInfo),
67,638,965✔
1277
         totalSources, (endTs - startTs) / 1000.0);
1278

1279
  pOperator->status = OP_RES_TO_RETURN;
67,638,965✔
1280
  pOperator->cost.openCost = taosGetTimestampUs() - startTs;
67,638,766✔
1281
  if (isTaskKilled(pTaskInfo)) {
67,638,481!
1282
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1283
  }
1284

1285
  return TSDB_CODE_SUCCESS;
67,638,208✔
1286
}
1287

1288
int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo) {
118,015,264✔
1289
  int32_t            code = TSDB_CODE_SUCCESS;
118,015,264✔
1290
  int32_t            lino = 0;
118,015,264✔
1291
  SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
118,015,264✔
1292
  SSDataBlock*       pb = NULL;
118,015,264✔
1293

1294
  char* pNextStart = pRetrieveRsp->data;
118,015,264✔
1295
  char* pStart = pNextStart;
118,014,723✔
1296

1297
  int32_t index = 0;
118,014,723✔
1298

1299
  if (pRetrieveRsp->compressed) {  // decompress the data
118,014,723!
1300
    if (pDataInfo->decompBuf == NULL) {
×
1301
      pDataInfo->decompBuf = taosMemoryMalloc(pRetrieveRsp->payloadLen);
×
1302
      QUERY_CHECK_NULL(pDataInfo->decompBuf, code, lino, _end, terrno);
×
1303
      pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
×
1304
    } else {
1305
      if (pDataInfo->decompBufSize < pRetrieveRsp->payloadLen) {
×
1306
        char* p = taosMemoryRealloc(pDataInfo->decompBuf, pRetrieveRsp->payloadLen);
×
1307
        QUERY_CHECK_NULL(p, code, lino, _end, terrno);
×
1308
        if (p != NULL) {
×
1309
          pDataInfo->decompBuf = p;
×
1310
          pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
×
1311
        }
1312
      }
1313
    }
1314
  }
1315

1316
  while (index++ < pRetrieveRsp->numOfBlocks) {
474,257,231✔
1317
    pStart = pNextStart;
356,240,610✔
1318

1319
    if (taosArrayGetSize(pExchangeInfo->pRecycledBlocks) > 0) {
356,240,610✔
1320
      pb = *(SSDataBlock**)taosArrayPop(pExchangeInfo->pRecycledBlocks);
175,704,517✔
1321
      blockDataCleanup(pb);
175,704,517✔
1322
    } else {
1323
      code = createOneDataBlock(pExchangeInfo->pDummyBlock, false, &pb);
180,535,117✔
1324
      QUERY_CHECK_NULL(pb, code, lino, _end, code);
180,533,760!
1325
    }
1326

1327
    int32_t compLen = *(int32_t*)pStart;
356,238,987✔
1328
    pStart += sizeof(int32_t);
356,239,528✔
1329

1330
    int32_t rawLen = *(int32_t*)pStart;
356,238,987✔
1331
    pStart += sizeof(int32_t);
356,238,987✔
1332
    QUERY_CHECK_CONDITION((compLen <= rawLen && compLen != 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
356,240,344!
1333

1334
    pNextStart = pStart + compLen;
356,240,344✔
1335
    if (pRetrieveRsp->compressed && (compLen < rawLen)) {
356,239,803!
1336
      int32_t t = tsDecompressString(pStart, compLen, 1, pDataInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
×
1337
      QUERY_CHECK_CONDITION((t == rawLen), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1338
      pStart = pDataInfo->decompBuf;
×
1339
    }
1340

1341
    code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart);
356,239,803✔
1342
    if (code != 0) {
356,238,267!
1343
      taosMemoryFreeClear(pDataInfo->pRsp);
×
1344
      goto _end;
×
1345
    }
1346

1347
    void* tmp = taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
356,238,267✔
1348
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
356,236,390!
1349
    qDebug("%dth block added to resultBlockList, rows:%" PRId64, index, pb->info.rows);
356,236,390✔
1350
    pb = NULL;
356,241,967✔
1351
  }
1352

1353
_end:
118,015,264✔
1354
  if (code != TSDB_CODE_SUCCESS) {
118,015,264!
1355
    blockDataDestroy(pb);
×
1356
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1357
  }
1358
  return code;
118,015,264✔
1359
}
1360

1361
int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
32,626,262✔
1362
  SExchangeInfo* pExchangeInfo = pOperator->info;
32,626,262✔
1363
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
32,626,262✔
1364

1365
  int32_t code = 0;
32,626,262✔
1366
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);
32,626,262✔
1367
  int64_t startTs = taosGetTimestampUs();
32,626,262✔
1368

1369
  int32_t vgId = 0;
32,626,262✔
1370
  if (pExchangeInfo->dynTbname) {
32,626,262!
1371
    SArray* vals = pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPartColVals;
77,703✔
1372
    for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
77,703!
1373
      SStreamGroupValue* pValue = taosArrayGet(vals, i);
77,703✔
1374
      if (pValue != NULL && pValue->isTbname) {
77,703!
1375
        vgId = pValue->vgId;
77,703✔
1376
        break;
77,703✔
1377
      }
1378
    }
1379
  }
1380

1381
  while (1) {
7,591,706✔
1382
    if (pExchangeInfo->current >= totalSources) {
40,217,968✔
1383
      setAllSourcesCompleted(pOperator);
7,579,077✔
1384
      return TSDB_CODE_SUCCESS;
7,579,077✔
1385
    }
1386

1387
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
32,638,891✔
1388
    if (!pSource) {
32,638,891!
1389
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1390
      pTaskInfo->code = terrno;
×
1391
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1392
    }
1393

1394
    if (vgId != 0 && pSource->addr.nodeId != vgId){
32,638,891✔
1395
      pExchangeInfo->current += 1;
46,113✔
1396
      continue;
46,113✔
1397
    }
1398

1399
    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
32,592,778✔
1400
    if (!pDataInfo) {
32,592,778!
1401
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1402
      pTaskInfo->code = terrno;
×
1403
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1404
    }
1405
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
32,592,778✔
1406

1407
    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
32,592,778✔
1408
    if (code != TSDB_CODE_SUCCESS) {
32,592,778!
1409
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1410
      pTaskInfo->code = code;
×
1411
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1412
    }
1413

1414
    while (true) {
627✔
1415
      code = exchangeWait(pOperator, pExchangeInfo);
32,593,405✔
1416
      if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
32,593,405!
1417
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
627!
1418
      }
1419

1420
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
32,592,778✔
1421
      if (pDataInfo->seqId != currSeqId) {
32,592,778✔
1422
        qDebug("seq rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pDataInfo->seqId, pExchangeInfo, currSeqId);
627!
1423
        taosMemoryFreeClear(pDataInfo->pRsp);
627!
1424
        continue;
627✔
1425
      }
1426

1427
      break;
32,592,151✔
1428
    }
1429

1430
    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
32,592,151!
1431
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
×
1432
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1433
             tstrerror(pDataInfo->code));
1434
      pOperator->pTaskInfo->code = pDataInfo->code;
×
1435
      return pOperator->pTaskInfo->code;
×
1436
    }
1437

1438
    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
32,592,151✔
1439
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
32,592,151✔
1440

1441
    if (pRsp->numOfRows == 0) {
32,592,151✔
1442
      qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
7,545,593✔
1443
             " execId:%d %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", pDataInfo,
1444
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1445
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);
1446

1447
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
7,545,593✔
1448
      if (pDataInfo->isVtbRefScan || pDataInfo->isVtbTagScan) {
7,545,593!
1449
        pExchangeInfo->current = totalSources;
7,489,536✔
1450
      } else {
1451
        pExchangeInfo->current += 1;
56,057✔
1452
      }
1453
      taosMemoryFreeClear(pDataInfo->pRsp);
7,545,593!
1454
      continue;
7,545,593✔
1455
    }
1456

1457
    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
25,046,558✔
1458
    if (code != TSDB_CODE_SUCCESS) {
25,046,558!
1459
      goto _error;
×
1460
    }
1461

1462
    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
25,046,558✔
1463
    if (pRsp->completed == 1) {
25,046,558✔
1464
      qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
108,536!
1465
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, pDataInfo,
1466
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1467
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
1468
             pExchangeInfo->current + 1, totalSources);
1469

1470
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
108,536✔
1471
      if (pDataInfo->isVtbRefScan) {
108,536!
1472
        pExchangeInfo->current = totalSources;
×
1473
      } else {
1474
        pExchangeInfo->current += 1;
108,536✔
1475
      }
1476
    } else {
1477
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
24,938,022✔
1478
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
1479
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1480
             pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
1481
    }
1482
    if (pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData) {
25,046,558!
1483
      taosArrayClear(pExchangeInfo->pSourceDataInfo);
24,776,511✔
1484
    }
1485
    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
25,046,558✔
1486
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;
25,046,558✔
1487

1488
    taosMemoryFreeClear(pDataInfo->pRsp);
25,046,558!
1489
    return TSDB_CODE_SUCCESS;
25,046,558✔
1490
  }
1491

1492
_error:
×
1493
  pTaskInfo->code = code;
×
1494
  return code;
×
1495
}
1496

1497
void clearVtbScanDataInfo(void* pItem) {
6,967,970✔
1498
  SSourceDataInfo *pInfo = (SSourceDataInfo *)pItem;
6,967,970✔
1499
  if (pInfo->colMap) {
6,967,970!
1500
    taosArrayDestroy(pInfo->colMap->colMap);
×
1501
    taosMemoryFreeClear(pInfo->colMap);
×
1502
  }
1503
  taosArrayDestroy(pInfo->pSrcUidList);
6,967,970✔
1504
}
6,967,970✔
1505

1506
int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasicParam* pBasicParam) {
32,998,653✔
1507
  SExchangeInfo*     pExchangeInfo = pOperator->info;
32,998,653✔
1508
  SExchangeSrcIndex* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
32,998,653✔
1509
  if (NULL == pIdx) {
32,998,653✔
1510
    if (pBasicParam->isNewDeployed) {
10,279!
1511
      SDownstreamSourceNode *pNode = NULL;
10,279✔
1512
      int32_t code = nodesCloneNode((SNode*)&pBasicParam->newDeployedSrc, (SNode**)&pNode);
10,279✔
1513
      if (code != TSDB_CODE_SUCCESS) {
10,279!
1514
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1515
        return code;
×
1516
      }
1517

1518
      void* tmp = taosArrayPush(pExchangeInfo->pSources, pNode);
10,279✔
1519
      if (!tmp) {
10,279!
1520
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1521
        return terrno;
×
1522
      }
1523
      SExchangeSrcIndex idx = {.srcIdx = taosArrayGetSize(pExchangeInfo->pSources) - 1, .inUseIdx = -1};
10,279✔
1524
      code =
1525
          tSimpleHashPut(pExchangeInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
10,279✔
1526
      if (pExchangeInfo->pHashSources && code != TSDB_CODE_SUCCESS) {
10,279!
1527
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1528
        return code;
×
1529
      }
1530
      pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
10,279✔
1531
      if (pIdx == NULL) {
10,279!
1532
        qError("No exchange source for vgId: %d", pBasicParam->vgId);
×
1533
        return TSDB_CODE_INVALID_PARA;
×
1534
      }
1535
    } else {
1536
      qError("No exchange source for vgId: %d", pBasicParam->vgId);
×
1537
      return TSDB_CODE_INVALID_PARA;
×
1538
    }
1539
  }
1540

1541
  qDebug("start to add single exchange source");
32,998,653✔
1542

1543
  if (pBasicParam->isVtbRefScan) {
32,998,653!
1544
    SSourceDataInfo dataInfo = {0};
30,400,622✔
1545
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
30,400,622✔
1546
    dataInfo.taskId = pExchangeInfo->pTaskId;
30,400,622✔
1547
    dataInfo.index = pIdx->srcIdx;
30,400,622✔
1548
    dataInfo.window = pBasicParam->window;
30,400,622✔
1549
    dataInfo.colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
30,400,622!
1550
    dataInfo.colMap->vgId = pBasicParam->colMap->vgId;
30,400,622✔
1551
    tstrncpy(dataInfo.colMap->tbName, pBasicParam->colMap->tbName, TSDB_TABLE_FNAME_LEN);
30,400,622!
1552
    dataInfo.colMap->colMap = taosArrayDup(pBasicParam->colMap->colMap, NULL);
30,400,622✔
1553

1554
    dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
30,400,622✔
1555
    if (dataInfo.pSrcUidList == NULL) {
30,400,622!
1556
      return terrno;
×
1557
    }
1558

1559
    dataInfo.isVtbRefScan = pBasicParam->isVtbRefScan;
30,400,622!
1560
    dataInfo.isVtbTagScan = pBasicParam->isVtbTagScan;
30,400,622!
1561
    dataInfo.srcOpType = pBasicParam->srcOpType;
30,400,622✔
1562
    dataInfo.tableSeq = pBasicParam->tableSeq;
30,400,622!
1563

1564
    taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
30,400,622✔
1565
    void* tmp = taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo);
30,400,622✔
1566
    if (!tmp) {
30,400,622!
1567
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1568
      return terrno;
×
1569
    }
1570
  } else if (pBasicParam->isVtbTagScan) {
2,598,031!
1571
    SSourceDataInfo dataInfo = {0};
2,254,035✔
1572
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
2,254,035✔
1573
    dataInfo.taskId = pExchangeInfo->pTaskId;
2,254,035✔
1574
    dataInfo.index = pIdx->srcIdx;
2,254,035✔
1575
    dataInfo.window = pBasicParam->window;
2,254,035✔
1576
    dataInfo.colMap = NULL;
2,254,035✔
1577

1578
    dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
2,254,035✔
1579
    if (dataInfo.pSrcUidList == NULL) {
2,254,035!
1580
      return terrno;
×
1581
    }
1582

1583
    dataInfo.isVtbRefScan = pBasicParam->isVtbRefScan;
2,254,035!
1584
    dataInfo.isVtbTagScan = pBasicParam->isVtbTagScan;
2,254,035!
1585
    dataInfo.srcOpType = pBasicParam->srcOpType;
2,254,035✔
1586
    dataInfo.tableSeq = pBasicParam->tableSeq;
2,254,035!
1587

1588
    taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
2,254,035✔
1589
    void* tmp = taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo);
2,254,035✔
1590
    if (!tmp) {
2,254,035!
1591
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1592
      return terrno;
×
1593
    }
1594
  } else {
1595
    if (pIdx->inUseIdx < 0) {
343,996✔
1596
      SSourceDataInfo dataInfo = {0};
342,130✔
1597
      dataInfo.status = EX_SOURCE_DATA_NOT_READY;
342,130✔
1598
      dataInfo.taskId = pExchangeInfo->pTaskId;
342,130✔
1599
      dataInfo.index = pIdx->srcIdx;
342,130✔
1600
      if (pBasicParam->isVtbRefScan) {
342,130!
1601
        dataInfo.window = pBasicParam->window;
×
1602
        dataInfo.colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
×
1603
        dataInfo.colMap->vgId = pBasicParam->colMap->vgId;
×
1604
        tstrncpy(dataInfo.colMap->tbName, pBasicParam->colMap->tbName, TSDB_TABLE_FNAME_LEN);
×
1605
        dataInfo.colMap->colMap = taosArrayDup(pBasicParam->colMap->colMap, NULL);
×
1606
      }
1607

1608
      dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
342,130✔
1609
      if (dataInfo.pSrcUidList == NULL) {
342,130!
1610
        return terrno;
×
1611
      }
1612

1613
      dataInfo.isVtbRefScan = pBasicParam->isVtbRefScan;
342,130!
1614
      dataInfo.isVtbTagScan = pBasicParam->isVtbTagScan;
342,130!
1615
      dataInfo.srcOpType = pBasicParam->srcOpType;
342,130✔
1616
      dataInfo.tableSeq = pBasicParam->tableSeq;
342,130!
1617

1618
      void* tmp = taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo);
342,130✔
1619
      if (!tmp) {
342,130!
1620
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1621
        return terrno;
×
1622
      }
1623
      pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
342,130✔
1624
    } else {
1625
      SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
1,866✔
1626
      if (!pDataInfo) {
1,866!
1627
        return terrno;
×
1628
      }
1629
      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
1,866!
1630
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
1,866✔
1631
      }
1632

1633
      if (pBasicParam->isVtbRefScan) {
1,866!
1634
        pDataInfo->window = pBasicParam->window;
×
1635
        if (!pDataInfo->colMap) {
×
1636
          pDataInfo->colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
×
1637
        }
1638
        pDataInfo->colMap->vgId = pBasicParam->colMap->vgId;
×
1639
        tstrncpy(pDataInfo->colMap->tbName, pBasicParam->colMap->tbName, TSDB_TABLE_FNAME_LEN);
×
1640
        pDataInfo->colMap->colMap = taosArrayDup(pBasicParam->colMap->colMap, NULL);
×
1641
      }
1642

1643
      pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
1,866✔
1644
      if (pDataInfo->pSrcUidList == NULL) {
1,866!
1645
        return terrno;
×
1646
      }
1647

1648
      pDataInfo->isVtbRefScan = pBasicParam->isVtbRefScan;
1,866!
1649
      pDataInfo->isVtbTagScan = pBasicParam->isVtbTagScan;
1,866!
1650
      pDataInfo->srcOpType = pBasicParam->srcOpType;
1,866✔
1651
      pDataInfo->tableSeq = pBasicParam->tableSeq;
1,866!
1652
    }
1653
  }
1654

1655
  return TSDB_CODE_SUCCESS;
32,998,653✔
1656
}
1657

1658
int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
32,846,719✔
1659
  SExchangeInfo*               pExchangeInfo = pOperator->info;
32,846,719✔
1660
  int32_t                      code = TSDB_CODE_SUCCESS;
32,846,719✔
1661
  SExchangeOperatorBasicParam* pBasicParam = NULL;
32,846,719✔
1662
  SExchangeOperatorParam*      pParam = (SExchangeOperatorParam*)pOperator->pOperatorGetParam->value;
32,846,719✔
1663
  if (pParam->multiParams) {
32,846,719!
1664
    SExchangeOperatorBatchParam* pBatch = (SExchangeOperatorBatchParam*)pOperator->pOperatorGetParam->value;
188,330✔
1665
    int32_t                      iter = 0;
188,330✔
1666
    while (NULL != (pBasicParam = tSimpleHashIterate(pBatch->pBatchs, pBasicParam, &iter))) {
528,594✔
1667
      code = addSingleExchangeSource(pOperator, pBasicParam);
340,264✔
1668
      if (code) {
340,264!
1669
        return code;
×
1670
      }
1671
    }
1672
  } else {
1673
    pBasicParam = &pParam->basic;
32,658,389✔
1674
    code = addSingleExchangeSource(pOperator, pBasicParam);
32,658,389✔
1675
  }
1676

1677
  freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
32,846,719✔
1678
  pOperator->pOperatorGetParam = NULL;
32,846,719✔
1679

1680
  return code;
32,846,719✔
1681
}
1682

1683
int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
458,873,481✔
1684
  SExchangeInfo* pExchangeInfo = pOperator->info;
458,873,481✔
1685
  int32_t        code = TSDB_CODE_SUCCESS;
458,874,495✔
1686
  int32_t        lino = 0;
458,874,495✔
1687
  
1688
  if ((OPTR_IS_OPENED(pOperator) && !pExchangeInfo->dynamicOp) ||
458,874,495!
1689
      (pExchangeInfo->dynamicOp && NULL == pOperator->pOperatorGetParam)) {
113,827,728!
1690
    qDebug("skip prepare, opened:%d, dynamicOp:%d, getParam:%p", OPTR_IS_OPENED(pOperator), pExchangeInfo->dynamicOp, pOperator->pOperatorGetParam);
345,629,018!
1691
    return TSDB_CODE_SUCCESS;
345,629,030✔
1692
  }
1693

1694
  if (pExchangeInfo->dynamicOp) {
113,244,122!
1695
    code = addDynamicExchangeSource(pOperator);
32,846,719✔
1696
    QUERY_CHECK_CODE(code, lino, _end);
32,846,719!
1697
  }
1698

1699
  if (pOperator->status == OP_NOT_OPENED && (pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData) || IS_STREAM_MODE(pOperator->pTaskInfo)) {
113,244,513!
1700
    pExchangeInfo->current = 0;
43,872,342✔
1701
  }
1702

1703
  int64_t st = taosGetTimestampUs();
113,244,341✔
1704

1705
  if (!IS_STREAM_MODE(pOperator->pTaskInfo) && !pExchangeInfo->seqLoadData) {
113,244,341✔
1706
    code = prepareConcurrentlyLoad(pOperator);
67,636,654✔
1707
    QUERY_CHECK_CODE(code, lino, _end);
67,638,208!
1708
    pExchangeInfo->openedTs = taosGetTimestampUs();
67,638,208✔
1709
  }
1710

1711
  OPTR_SET_OPENED(pOperator);
113,247,386✔
1712
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
113,244,949✔
1713

1714
  qDebug("%s prepare load complete", pOperator->pTaskInfo->id.str);
113,245,232✔
1715

1716
_end:
6,376,390✔
1717
  if (code != TSDB_CODE_SUCCESS) {
113,244,389!
1718
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1719
    pOperator->pTaskInfo->code = code;
×
1720
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
1721
  }
1722
  return TSDB_CODE_SUCCESS;
113,244,389✔
1723
}
1724

1725
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
2,646,259✔
1726
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
2,646,259✔
1727

1728
  if (pLimitInfo->remainGroupOffset > 0) {
2,646,259!
1729
    if (pLimitInfo->currentGroupId == 0) {  // it is the first group
×
1730
      pLimitInfo->currentGroupId = pBlock->info.id.groupId;
×
1731
      blockDataCleanup(pBlock);
×
1732
      return PROJECT_RETRIEVE_CONTINUE;
×
1733
    } else if (pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
×
1734
      // now it is the data from a new group
1735
      pLimitInfo->remainGroupOffset -= 1;
×
1736

1737
      // ignore data block in current group
1738
      if (pLimitInfo->remainGroupOffset > 0) {
×
1739
        blockDataCleanup(pBlock);
×
1740
        return PROJECT_RETRIEVE_CONTINUE;
×
1741
      }
1742
    }
1743

1744
    // set current group id of the project operator
1745
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
×
1746
  }
1747

1748
  // here check for a new group data, we need to handle the data of the previous group.
1749
  if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
2,646,259✔
1750
    pLimitInfo->numOfOutputGroups += 1;
110,539✔
1751
    if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
110,539!
1752
      pOperator->status = OP_EXEC_DONE;
×
1753
      blockDataCleanup(pBlock);
×
1754

1755
      return PROJECT_RETRIEVE_DONE;
×
1756
    }
1757

1758
    // reset the value for a new group data
1759
    resetLimitInfoForNextGroup(pLimitInfo);
110,539✔
1760
    // existing rows that belongs to previous group.
1761
    if (pBlock->info.rows > 0) {
110,539!
1762
      return PROJECT_RETRIEVE_DONE;
110,539✔
1763
    }
1764
  }
1765

1766
  // here we reach the start position, according to the limit/offset requirements.
1767

1768
  // set current group id
1769
  pLimitInfo->currentGroupId = pBlock->info.id.groupId;
2,535,720✔
1770

1771
  bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pTaskInfo);
2,535,720✔
1772
  if (pBlock->info.rows == 0) {
2,535,720✔
1773
    return PROJECT_RETRIEVE_CONTINUE;
1,249,291✔
1774
  } else {
1775
    if (limitReached && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
1,286,429!
1776
      setOperatorCompleted(pOperator);
×
1777
      return PROJECT_RETRIEVE_DONE;
×
1778
    }
1779
  }
1780

1781
  // todo optimize performance
1782
  // If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the
1783
  // they may not belong to the same group the limit/offset value is not valid in this case.
1784
  if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || hasSlimitOffsetInfo(pLimitInfo)) {
1,286,429!
1785
    return PROJECT_RETRIEVE_DONE;
1,286,429✔
1786
  } else {  // not full enough, continue to accumulate the output data in the buffer.
1787
    return PROJECT_RETRIEVE_CONTINUE;
×
1788
  }
1789
}
1790

1791
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo) {
162,171,687✔
1792
  SExecTaskInfo* pTask = pOperator->pTaskInfo;
162,171,687✔
1793
  int32_t        code = TSDB_CODE_SUCCESS;
162,171,488✔
1794
  if (pTask->pWorkerCb) {
162,171,488✔
1795
    code = pTask->pWorkerCb->beforeBlocking(pTask->pWorkerCb->pPool);
162,171,299✔
1796
    if (code != TSDB_CODE_SUCCESS) {
162,171,972!
1797
      pTask->code = code;
×
1798
      return pTask->code;
×
1799
    }
1800
  }
1801

1802
  code = tsem_wait(&pExchangeInfo->ready);
162,172,085✔
1803
  if (code != TSDB_CODE_SUCCESS) {
162,171,697!
1804
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1805
    pTask->code = code;
×
1806
    return pTask->code;
×
1807
  }
1808

1809
  if (pTask->pWorkerCb) {
162,171,697!
1810
    code = pTask->pWorkerCb->afterRecoverFromBlocking(pTask->pWorkerCb->pPool);
162,171,972✔
1811
    if (code != TSDB_CODE_SUCCESS) {
162,171,974!
1812
      pTask->code = code;
×
1813
      return pTask->code;
×
1814
    }
1815
  }
1816
  return TSDB_CODE_SUCCESS;
162,171,699✔
1817
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc