• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4911

04 Jan 2026 09:05AM UTC coverage: 65.028% (-0.8%) from 65.864%
#4911

push

travis-ci

web-flow
merge: from main to 3.0 branch #34156

1206 of 4524 new or added lines in 22 files covered. (26.66%)

1517 existing lines in 134 files now uncovered.

195276 of 300296 relevant lines covered (65.03%)

116931714.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.35
/source/libs/executor/src/exchangeoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "operator.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "tdatablock.h"
23
#include "thash.h"
24
#include "tmsg.h"
25
#include "tref.h"
26
#include "trpc.h"
27

28
typedef struct SFetchRspHandleWrapper {
29
  int64_t  exchangeId;
30
  int32_t  sourceIndex;
31
  int64_t  seqId;
32
} SFetchRspHandleWrapper;
33

34
typedef struct SSourceDataInfo {
35
  int32_t             index;
36
  int64_t             seqId;
37
  SRWLatch            lock;
38
  SRetrieveTableRsp*  pRsp;
39
  uint64_t            totalRows;
40
  int64_t             startTime;
41
  int32_t             code;
42
  EX_SOURCE_STATUS    status;
43
  const char*         taskId;
44
  SArray*             pSrcUidList;
45
  int32_t             srcOpType;
46
  bool                tableSeq;
47
  char*               decompBuf;
48
  int32_t             decompBufSize;
49
  SOrgTbInfo*         orgTbInfo;
50
  SArray*             batchOrgTbInfo; // SArray<SOrgTbInfo>
51
  SArray*             tagList;
52
  EExchangeSourceType type;
53
  bool                isNewParam;
54
  STimeWindow         window;
55
  uint64_t            groupid;
56
  bool                fetchSent; // need reset
57
} SSourceDataInfo;
58

59
static void destroyExchangeOperatorInfo(void* param);
60
static void freeBlock(void* pParam);
61
static void freeSourceDataInfo(void* param);
62
static void setAllSourcesCompleted(SOperatorInfo* pOperator);
63

64
static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code);
65
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex);
66
static int32_t getCompletedSources(const SArray* pArray, int32_t* pRes);
67
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator);
68
static int32_t seqLoadRemoteData(SOperatorInfo* pOperator);
69
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator);
70
static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock,
71
                                 bool holdDataInBuf);
72
static int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo);
73

74
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo);
75

76
static bool isVstbScan(SSourceDataInfo* pDataInfo) {return pDataInfo->type == EX_SRC_TYPE_VSTB_SCAN; }
13,529,940✔
77
static bool isVstbWinScan(SSourceDataInfo* pDataInfo) { return pDataInfo->type == EX_SRC_TYPE_VSTB_WIN_SCAN; }
×
78
static bool isVstbAggScan(SSourceDataInfo* pDataInfo) { return pDataInfo->type == EX_SRC_TYPE_VSTB_AGG_SCAN; }
×
79
static bool isVstbTagScan(SSourceDataInfo* pDataInfo) { return pDataInfo->type == EX_SRC_TYPE_VSTB_TAG_SCAN; }
10,376,977✔
80
static bool isStbJoinScan(SSourceDataInfo* pDataInfo) { return pDataInfo->type == EX_SRC_TYPE_STB_JOIN_SCAN; }
×
81

82

83
static void streamConcurrentlyLoadRemoteData(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
8,597,442✔
84
                                           SExecTaskInfo* pTaskInfo) {
85
  int32_t code = 0;
8,597,442✔
86
  int32_t lino = 0;
8,597,442✔
87
  int64_t startTs = taosGetTimestampUs();  
8,598,034✔
88
  int32_t  totalSources = (int32_t)taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
8,598,034✔
89
  int32_t completed = 0;
8,597,833✔
90
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
8,597,833✔
91
  if (code != TSDB_CODE_SUCCESS) {
8,597,632✔
92
    pTaskInfo->code = code;
×
93
    T_LONG_JMP(pTaskInfo->env, code);
×
94
  }
95
  if (completed == totalSources) {
8,597,632✔
96
    qDebug("%s no load since all sources completed, completed:%d, totalSources:%d", pTaskInfo->id.str, completed, totalSources);
1,646,988✔
97
    setAllSourcesCompleted(pOperator);
1,646,988✔
98
    return;
1,649,252✔
99
  }
100

101
  SSourceDataInfo* pDataInfo = NULL;
6,950,644✔
102

103
  while (1) {
4,097,724✔
104
    if (pExchangeInfo->current < 0) {
11,048,368✔
105
      qDebug("current %d and all sources complted, totalSources:%d", pExchangeInfo->current, totalSources);
107,070✔
106
      setAllSourcesCompleted(pOperator);
107,070✔
107
      return;
107,070✔
108
    }
109
    
110
    if (pExchangeInfo->current >= totalSources) {
10,941,499✔
111
      completed = 0;
5,019,428✔
112
      code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
5,019,428✔
113
      if (code != TSDB_CODE_SUCCESS) {
5,019,428✔
114
        pTaskInfo->code = code;
×
115
        T_LONG_JMP(pTaskInfo->env, code);
×
116
      }
117
      if (completed == totalSources) {
5,019,428✔
118
        qDebug("stop to load since all sources complted, completed:%d, totalSources:%d", completed, totalSources);
3,545,703✔
119
        setAllSourcesCompleted(pOperator);
3,545,703✔
120
        return;
3,545,703✔
121
      }
122
      
123
      pExchangeInfo->current = 0;
1,473,725✔
124
    }
125

126
    qDebug("%s start stream exchange %p idx:%d fetch", GET_TASKID(pTaskInfo), pExchangeInfo, pExchangeInfo->current);
7,395,796✔
127

128
    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
7,395,796✔
129
    if (!pDataInfo) {
7,395,595✔
130
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
131
      pTaskInfo->code = terrno;
×
132
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
133
    }
134

135
    if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
7,395,595✔
136
      pExchangeInfo->current++;
950✔
137
      continue;
950✔
138
    }
139

140
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
7,394,645✔
141

142
    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
7,395,047✔
143
    if (code != TSDB_CODE_SUCCESS) {
7,395,047✔
144
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
145
      pTaskInfo->code = code;
×
146
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
147
    }
148

149
    while (true) {
729✔
150
      code = exchangeWait(pOperator, pExchangeInfo);
7,395,776✔
151
      if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
7,395,776✔
152
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
729✔
153
      }
154

155
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
7,395,047✔
156
      if (pDataInfo->seqId != currSeqId) {
7,395,047✔
157
        qDebug("%s seq rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", 
729✔
158
            GET_TASKID(pTaskInfo), pDataInfo->seqId, pExchangeInfo, currSeqId);
159
        taosMemoryFreeClear(pDataInfo->pRsp);
729✔
160
        continue;
729✔
161
      }
162

163
      break;
7,394,318✔
164
    }
165

166
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
7,394,318✔
167
    if (!pSource) {
7,394,318✔
168
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
169
      pTaskInfo->code = terrno;
×
170
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
171
    }
172

173
    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
7,394,318✔
174
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
×
175
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
176
             tstrerror(pDataInfo->code));
177
      pTaskInfo->code = pDataInfo->code;
×
178
      T_LONG_JMP(pTaskInfo->env, code);
×
179
    }
180

181
    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
7,394,318✔
182
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
7,394,318✔
183

184
    if (pRsp->numOfRows == 0) {
7,394,318✔
185
      qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
4,096,774✔
186
             " execId:%d idx %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", pDataInfo,
187
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
188
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);
189

190
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
4,096,774✔
191
      if (isVstbScan(pDataInfo) || isVstbTagScan(pDataInfo)) {
4,096,774✔
192
        pExchangeInfo->current = -1;
107,070✔
193
      } else {
194
        pExchangeInfo->current += 1;
3,989,704✔
195
      }
196
      taosMemoryFreeClear(pDataInfo->pRsp);
4,096,774✔
197
      continue;
4,096,774✔
198
    }
199

200
    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
3,297,544✔
201
    TAOS_CHECK_EXIT(code);
3,297,544✔
202

203
    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
3,297,544✔
204
    if (pRsp->completed == 1) {
3,297,544✔
205
      qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
1,762,742✔
206
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%d", pDataInfo,
207
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
208
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
209
             pExchangeInfo->current + 1, totalSources);
210

211
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
1,762,742✔
212
      if (isVstbScan(pDataInfo)) {
1,762,742✔
213
        pExchangeInfo->current = -1;
×
214
        taosMemoryFreeClear(pDataInfo->pRsp);
×
215
        continue;
×
216
      }
217
    } else {
218
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d idx:%d numOfRows:%" PRId64
1,534,802✔
219
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
220
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
221
             pExchangeInfo->current, pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
222
    }
223

224
    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
3,297,777✔
225
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;
3,297,343✔
226

227
    pExchangeInfo->current++;
3,297,544✔
228

229
    taosMemoryFreeClear(pDataInfo->pRsp);
3,297,544✔
230
    return;
3,297,544✔
231
  }
232

233
_exit:
×
234

235
  if (code) {
×
236
    pTaskInfo->code = code;
×
237
    qError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
×
238
  }
239
}
240

241

242
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
184,540,054✔
243
                                           SExecTaskInfo* pTaskInfo) {
244
  int32_t code = 0;
184,540,054✔
245
  int32_t lino = 0;
184,540,054✔
246
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
184,540,054✔
247
  int32_t completed = 0;
184,539,906✔
248
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
184,539,906✔
249
  if (code != TSDB_CODE_SUCCESS) {
184,540,559✔
250
    pTaskInfo->code = code;
×
251
    T_LONG_JMP(pTaskInfo->env, code);
×
252
  }
253
  if (completed == totalSources) {
184,540,559✔
254
    setAllSourcesCompleted(pOperator);
56,827,711✔
255
    return;
56,828,873✔
256
  }
257

258
  SSourceDataInfo* pDataInfo = NULL;
127,712,848✔
259

260
  while (1) {
16,551,167✔
261
    qDebug("prepare wait for ready, %p, %s", pExchangeInfo, GET_TASKID(pTaskInfo));
144,264,015✔
262
    code = exchangeWait(pOperator, pExchangeInfo);
144,264,015✔
263

264
    if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
144,265,007✔
UNCOV
265
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
266
    }
267

268
    for (int32_t i = 0; i < totalSources; ++i) {
223,309,057✔
269
      pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
223,309,392✔
270
      QUERY_CHECK_NULL(pDataInfo, code, lino, _exit, terrno);
223,309,211✔
271
      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
223,309,211✔
272
        continue;
56,153,848✔
273
      }
274

275
      if (pDataInfo->status != EX_SOURCE_DATA_READY) {
167,155,028✔
276
        continue;
22,890,537✔
277
      }
278

279
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
144,264,491✔
280
      if (pDataInfo->seqId != currSeqId) {
144,264,156✔
281
        qDebug("concurrent rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pDataInfo->seqId, pExchangeInfo, currSeqId);
×
282
        taosMemoryFreeClear(pDataInfo->pRsp);
×
283
        break;
×
284
      }
285

286
      if (pDataInfo->code != TSDB_CODE_SUCCESS) {
144,263,821✔
287
        code = pDataInfo->code;
108✔
288
        TAOS_CHECK_EXIT(code);
108✔
289
      }
290

291
      tmemory_barrier();
144,263,713✔
292
      SRetrieveTableRsp*     pRsp = pDataInfo->pRsp;
144,263,713✔
293
      SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
144,264,564✔
294
      QUERY_CHECK_NULL(pSource, code, lino, _exit, terrno);
144,265,234✔
295

296
      // todo
297
      SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
144,265,234✔
298
      if (pRsp->numOfRows == 0) {
144,265,234✔
299
        if (NULL != pDataInfo->pSrcUidList && !isVstbScan(pDataInfo)) {
37,811,795✔
300
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
×
301
          code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
×
302
          if (code != TSDB_CODE_SUCCESS) {
×
303
            taosMemoryFreeClear(pDataInfo->pRsp);
×
304
            TAOS_CHECK_EXIT(code);
×
305
          }
306
        } else {
307
          pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
37,811,795✔
308
          qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
37,811,795✔
309
                 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", try next %d/%" PRIzu, pDataInfo,
310
                 GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
311
                 pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows, i + 1, totalSources);
312
          taosMemoryFreeClear(pDataInfo->pRsp);
37,811,795✔
313
        }
314
        break;
37,811,795✔
315
      }
316

317
      TAOS_CHECK_EXIT(doExtractResultBlocks(pExchangeInfo, pDataInfo));
106,453,439✔
318

319
      SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
106,453,439✔
320
      updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pDataInfo->startTime, pOperator);
106,453,439✔
321
      pDataInfo->totalRows += pRetrieveRsp->numOfRows;
106,453,439✔
322

323
      if (pRsp->completed == 1) {
106,452,934✔
324
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
100,164,404✔
325
        qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
100,164,404✔
326
               " execId:%d index:%d completed, blocks:%d, numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64
327
               ", totalRows:%" PRIu64 ", total:%.2f Kb, try next %d/%" PRIzu, pDataInfo,
328
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
329
               pRsp->numOfBlocks, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows,
330
               pLoadInfo->totalSize / 1024.0, i + 1, totalSources);
331
      } else {
332
        qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
6,289,035✔
333
               " execId:%d blocks:%d, numOfRows:%" PRId64 ", totalRows:%" PRIu64 ", total:%.2f Kb",
334
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
335
               pRsp->numOfBlocks, pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
336
      }
337

338
      taosMemoryFreeClear(pDataInfo->pRsp);
106,453,439✔
339

340
      if ((pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED || NULL != pDataInfo->pSrcUidList) && !isVstbScan(pDataInfo) && !isVstbTagScan(pDataInfo)) {
106,452,934✔
341
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
6,289,035✔
342
        code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
6,289,035✔
343
        if (code != TSDB_CODE_SUCCESS) {
6,289,035✔
344
          taosMemoryFreeClear(pDataInfo->pRsp);
×
345
          TAOS_CHECK_EXIT(code);
×
346
        }
347
      }
348
      
349
      return;
106,453,509✔
350
    }  // end loop
351

352
    int32_t complete1 = 0;
37,811,460✔
353
    code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &complete1);
37,811,795✔
354
    if (code != TSDB_CODE_SUCCESS) {
37,811,795✔
355
      pTaskInfo->code = code;
×
356
      T_LONG_JMP(pTaskInfo->env, code);
×
357
    }
358
    if (complete1 == totalSources) {
37,811,795✔
359
      qDebug("all sources are completed, %s", GET_TASKID(pTaskInfo));
21,260,628✔
360
      return;
21,260,628✔
361
    }
362
  }
363

UNCOV
364
_exit:
×
365

366
  if (code) {
108✔
367
    pTaskInfo->code = code;
108✔
368
    qError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
108✔
369
  }
370
}
371

372
static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
448,213,560✔
373
  int32_t        code = TSDB_CODE_SUCCESS;
448,213,560✔
374
  SExchangeInfo* pExchangeInfo = pOperator->info;
448,213,560✔
375
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
448,214,378✔
376

377
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
448,213,404✔
378

379
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
448,213,430✔
380
  if (pOperator->status == OP_EXEC_DONE) {
448,212,773✔
381
    qDebug("%s all %" PRIzu " source(s) are exhausted, total rows:%" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
×
382
           GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize,
383
           pLoadInfo->totalElapsed / 1000.0);
384
    return NULL;
×
385
  }
386

387
  // we have buffered retrieved datablock, return it directly
388
  SSDataBlock* p = NULL;
448,213,095✔
389
  if (taosArrayGetSize(pExchangeInfo->pResultBlockList) > 0) {
448,212,921✔
390
    p = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
250,734,357✔
391
    taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
250,733,874✔
392
  }
393

394
  if (p != NULL) {
448,211,915✔
395
    void* tmp = taosArrayPush(pExchangeInfo->pRecycledBlocks, &p);
250,734,158✔
396
    if (!tmp) {
250,733,957✔
397
      code = terrno;
×
398
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
399
      pTaskInfo->code = code;
×
400
      T_LONG_JMP(pTaskInfo->env, code);
×
401
    }
402
    return p;
250,733,957✔
403
  } else {
404
    if (pExchangeInfo->seqLoadData) {
197,477,757✔
405
      code = seqLoadRemoteData(pOperator);
4,340,811✔
406
      if (code != TSDB_CODE_SUCCESS) {
4,340,447✔
407
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
609✔
408
        pTaskInfo->code = code;
609✔
409
        T_LONG_JMP(pTaskInfo->env, code);
609✔
410
      }
411
    } else if (IS_STREAM_MODE(pOperator->pTaskInfo))   {
193,137,887✔
412
      streamConcurrentlyLoadRemoteData(pOperator, pExchangeInfo, pTaskInfo);
8,597,833✔
413
    } else {
414
      concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
184,540,228✔
415
    }
416
    if (TSDB_CODE_SUCCESS != pOperator->pTaskInfo->code) {
197,479,029✔
417
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(pOperator->pTaskInfo->code));
108✔
418
      T_LONG_JMP(pTaskInfo->env, pOperator->pTaskInfo->code);
108✔
419
    }
420
    
421
    if (taosArrayGetSize(pExchangeInfo->pResultBlockList) == 0) {
197,478,215✔
422
      qDebug("empty resultBlockList");
84,690,811✔
423
      return NULL;
84,690,970✔
424
    } else {
425
      p = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
112,787,951✔
426
      taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
112,787,245✔
427
      void* tmp = taosArrayPush(pExchangeInfo->pRecycledBlocks, &p);
112,787,951✔
428
      if (!tmp) {
112,787,750✔
429
        code = terrno;
×
430
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
431
        pTaskInfo->code = code;
×
432
        T_LONG_JMP(pTaskInfo->env, code);
×
433
      }
434

435
      qDebug("block with rows:%" PRId64 " loaded", p->info.rows);
112,787,750✔
436
      return p;
112,787,951✔
437
    }
438
}
439
}
440

441
static int32_t loadRemoteDataNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
446,430,848✔
442
  int32_t        code = TSDB_CODE_SUCCESS;
446,430,848✔
443
  int32_t        lino = 0;
446,430,848✔
444
  SExchangeInfo* pExchangeInfo = pOperator->info;
446,430,848✔
445
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
446,434,085✔
446

447
  qDebug("%s start to load from exchange %p", pTaskInfo->id.str, pExchangeInfo);
446,433,827✔
448

449
  code = pOperator->fpSet._openFn(pOperator);
446,437,456✔
450
  QUERY_CHECK_CODE(code, lino, _end);
446,434,239✔
451

452
  if (pOperator->status == OP_EXEC_DONE) {
446,434,239✔
453
    (*ppRes) = NULL;
110,349✔
454
    return code;
110,349✔
455
  }
456

457
  while (1) {
1,889,313✔
458
    SSDataBlock* pBlock = doLoadRemoteDataImpl(pOperator);
448,213,712✔
459
    if (pBlock == NULL) {
448,213,278✔
460
      (*ppRes) = NULL;
84,690,970✔
461
      return code;
84,690,970✔
462
    }
463

464
    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL, NULL);
363,522,308✔
465
    QUERY_CHECK_CODE(code, lino, _end);
363,522,308✔
466

467
    if (blockDataGetNumOfRows(pBlock) == 0) {
363,522,308✔
468
      qDebug("rows 0 block got, continue next load");
968✔
469
      continue;
968✔
470
    }
471

472
    SLimitInfo* pLimitInfo = &pExchangeInfo->limitInfo;
363,521,108✔
473
    if (hasLimitOffsetInfo(pLimitInfo)) {
363,521,108✔
474
      int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock, false);
3,842,034✔
475
      if (status == PROJECT_RETRIEVE_CONTINUE) {
3,842,034✔
476
        qDebug("limit retrieve continue");
1,888,345✔
477
        continue;
1,888,345✔
478
      } else if (status == PROJECT_RETRIEVE_DONE) {
1,953,689✔
479
        if (pBlock->info.rows == 0) {
1,953,689✔
480
          setOperatorCompleted(pOperator);
×
481
          (*ppRes) = NULL;
×
482
          return code;
×
483
        } else {
484
          (*ppRes) = pBlock;
1,953,689✔
485
          return code;
1,953,689✔
486
        }
487
      }
488
    } else {
489
      (*ppRes) = pBlock;
359,678,600✔
490
      qDebug("block with rows %" PRId64 " returned in exechange", pBlock->info.rows);
359,679,306✔
491
      return code;
359,678,600✔
492
    }
493
  }
494

495
_end:
×
496

497
  if (code != TSDB_CODE_SUCCESS) {
×
498
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
499
    pTaskInfo->code = code;
×
500
    T_LONG_JMP(pTaskInfo->env, code);
×
501
  } else {
502
    qDebug("empty block returned in exchange");
×
503
  }
504
  
505
  (*ppRes) = NULL;
×
506
  return code;
×
507
}
508

509
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
86,881,651✔
510
  pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
86,881,651✔
511
  if (pInfo->pSourceDataInfo == NULL) {
86,882,792✔
512
    return terrno;
×
513
  }
514

515
  if (pInfo->dynamicOp) {
86,884,094✔
516
    return TSDB_CODE_SUCCESS;
2,735,563✔
517
  }
518

519
  int32_t len = strlen(id) + 1;
84,146,571✔
520
  pInfo->pTaskId = taosMemoryCalloc(1, len);
84,146,571✔
521
  if (!pInfo->pTaskId) {
84,146,869✔
522
    return terrno;
×
523
  }
524
  tstrncpy(pInfo->pTaskId, id, len);
84,143,986✔
525
  for (int32_t i = 0; i < numOfSources; ++i) {
224,097,152✔
526
    SSourceDataInfo dataInfo = {0};
139,950,283✔
527
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
139,950,283✔
528
    dataInfo.taskId = pInfo->pTaskId;
139,950,283✔
529
    dataInfo.index = i;
139,950,425✔
530
    SSourceDataInfo* pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
139,950,425✔
531
    if (pDs == NULL) {
139,953,234✔
532
      taosArrayDestroyEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
×
533
      return terrno;
×
534
    }
535
    qDebug("init source data info %d, pDs:%p, status:%d", i, pDs, pDs->status);
139,953,234✔
536
  }
537

538
  return TSDB_CODE_SUCCESS;
84,146,869✔
539
}
540

541
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo, const char* id) {
86,882,135✔
542
  size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);
86,882,135✔
543

544
  if (numOfSources == 0) {
86,883,100✔
545
    qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
×
546
    return TSDB_CODE_INVALID_PARA;
×
547
  }
548
  pInfo->pFetchRpcHandles = taosArrayInit(numOfSources, sizeof(int64_t));
86,883,100✔
549
  if (!pInfo->pFetchRpcHandles) {
86,883,585✔
550
    return terrno;
×
551
  }
552
  void* ret = taosArrayReserve(pInfo->pFetchRpcHandles, numOfSources);
86,881,651✔
553
  if (!ret) {
86,882,616✔
554
    return terrno;
×
555
  }
556

557
  pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
86,882,616✔
558
  if (pInfo->pSources == NULL) {
86,882,319✔
559
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
560
    return terrno;
×
561
  }
562

563
  if (pExNode->node.dynamicOp) {
86,883,270✔
564
    pInfo->pHashSources = tSimpleHashInit(numOfSources * 2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
2,735,563✔
565
    if (NULL == pInfo->pHashSources) {
2,735,563✔
566
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
567
      return terrno;
×
568
    }
569
  }
570

571
  for (int32_t i = 0; i < numOfSources; ++i) {
232,334,407✔
572
    SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
145,450,994✔
573
    if (!pNode) {
145,451,493✔
574
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
575
      return terrno;
×
576
    }
577
    void* tmp = taosArrayPush(pInfo->pSources, pNode);
145,451,493✔
578
    if (!tmp) {
145,452,126✔
579
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
580
      return terrno;
×
581
    }
582
    SExchangeSrcIndex idx = {.srcIdx = i, .inUseIdx = -1};
145,452,126✔
583
    int32_t           code =
584
        tSimpleHashPut(pInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
145,451,642✔
585
    if (pInfo->pHashSources && code != TSDB_CODE_SUCCESS) {
145,451,657✔
586
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
587
      return code;
×
588
    }
589
  }
590

591
  initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);
86,883,413✔
592
  int64_t refId = taosAddRef(exchangeObjRefPool, pInfo);
86,883,128✔
593
  if (refId < 0) {
86,879,835✔
594
    int32_t code = terrno;
×
595
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
596
    return code;
×
597
  } else {
598
    pInfo->self = refId;
86,879,835✔
599
  }
600

601
  return initDataSource(numOfSources, pInfo, id);
86,882,134✔
602
}
603

604
int32_t resetExchangeOperState(SOperatorInfo* pOper) {
8,937,201✔
605
  SExchangeInfo* pInfo = pOper->info;
8,937,201✔
606
  SExchangePhysiNode* pPhynode = (SExchangePhysiNode*)pOper->pPhyNode;
8,938,765✔
607

608
  qDebug("%s reset exchange op:%p info:%p", pOper->pTaskInfo->id.str, pOper, pInfo);
8,938,997✔
609

610
  (void)atomic_add_fetch_64(&pInfo->seqId, 1);
8,938,997✔
611
  pOper->status = OP_NOT_OPENED;
8,939,229✔
612
  pInfo->current = 0;
8,939,229✔
613
  pInfo->loadInfo.totalElapsed = 0;
8,939,229✔
614
  pInfo->loadInfo.totalRows = 0;
8,939,229✔
615
  pInfo->loadInfo.totalSize = 0;
8,939,229✔
616
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pSourceDataInfo); ++i) {
25,193,092✔
617
    SSourceDataInfo* pDataInfo = taosArrayGet(pInfo->pSourceDataInfo, i);
16,253,465✔
618
    taosWLockLatch(&pDataInfo->lock);
16,254,062✔
619
    taosMemoryFreeClear(pDataInfo->decompBuf);
16,254,062✔
620
    taosMemoryFreeClear(pDataInfo->pRsp);
16,254,062✔
621

622
    pDataInfo->totalRows = 0;
16,253,863✔
623
    pDataInfo->code = 0;
16,253,863✔
624
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
16,253,863✔
625
    pDataInfo->fetchSent = false;
16,253,863✔
626
    taosWUnLockLatch(&pDataInfo->lock);
16,253,863✔
627
  }
628

629
  if (pInfo->dynamicOp) {
8,939,030✔
630
    taosArrayClearEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
1,707,611✔
631
  } 
632

633
  taosArrayClearEx(pInfo->pResultBlockList, freeBlock);
8,939,229✔
634
  taosArrayClearEx(pInfo->pRecycledBlocks, freeBlock);
8,939,030✔
635

636
  blockDataCleanup(pInfo->pDummyBlock);
8,938,831✔
637

638
  void   *data = NULL;
8,939,030✔
639
  int32_t iter = 0;
8,939,030✔
640
  while ((data = tSimpleHashIterate(pInfo->pHashSources, data, &iter))) {
12,543,356✔
641
    ((SExchangeSrcIndex *)data)->inUseIdx = -1;
3,604,326✔
642
  }
643
  
644
  pInfo->limitInfo = (SLimitInfo){0};
8,938,829✔
645
  initLimitInfo(pPhynode->node.pLimit, pPhynode->node.pSlimit, &pInfo->limitInfo);
8,939,030✔
646

647
  return 0;
8,938,798✔
648
}
649

650
int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo,
86,882,135✔
651
                                   SOperatorInfo** pOptrInfo) {
652
  QRY_PARAM_CHECK(pOptrInfo);
86,882,135✔
653

654
  int32_t        code = 0;
86,882,135✔
655
  int32_t        lino = 0;
86,882,135✔
656
  SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
86,882,135✔
657
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
86,880,976✔
658
  if (pInfo == NULL || pOperator == NULL) {
86,879,541✔
UNCOV
659
    code = terrno;
×
660
    goto _error;
×
661
  }
662

663
  pOperator->pPhyNode = pExNode;
86,879,541✔
664
  pInfo->dynamicOp = pExNode->node.dynamicOp;
86,880,529✔
665
  code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo));
86,881,480✔
666
  QUERY_CHECK_CODE(code, lino, _error);
86,882,432✔
667

668
  code = tsem_init(&pInfo->ready, 0, 0);
86,882,432✔
669
  QUERY_CHECK_CODE(code, lino, _error);
86,881,608✔
670

671
  pInfo->pDummyBlock = createDataBlockFromDescNode(pExNode->node.pOutputDataBlockDesc);
86,881,608✔
672
  QUERY_CHECK_NULL(pInfo->pDummyBlock, code, lino, _error, terrno);
86,883,585✔
673

674
  pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES);
86,882,284✔
675
  QUERY_CHECK_NULL(pInfo->pResultBlockList, code, lino, _error, terrno);
86,882,634✔
676
  pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES);
86,882,125✔
677
  QUERY_CHECK_NULL(pInfo->pRecycledBlocks, code, lino, _error, terrno);
86,883,102✔
678

679
  SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self};
86,884,094✔
680
  code = qAppendTaskStopInfo(pTaskInfo, &stopInfo);
86,882,593✔
681
  QUERY_CHECK_CODE(code, lino, _error);
86,881,654✔
682

683
  pInfo->seqLoadData = pExNode->seqRecvData;
86,881,654✔
684
  pInfo->dynTbname = pExNode->dynTbname;
86,882,662✔
685
  if (pInfo->dynTbname) {
86,882,618✔
686
    pInfo->seqLoadData = true;
9,016✔
687
  }
688
  pInfo->pTransporter = pTransporter;
86,882,110✔
689

690
  setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo,
86,881,654✔
691
                  pTaskInfo);
692
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
86,882,089✔
693

694
  code = filterInitFromNode((SNode*)pExNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
86,883,626✔
695
                            pTaskInfo->pStreamRuntimeInfo);
86,881,654✔
696
  QUERY_CHECK_CODE(code, lino, _error);
86,882,281✔
697
  qTrace("%s exchange op:%p", __func__, pOperator);
86,882,281✔
698
  pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteDataNext, NULL, destroyExchangeOperatorInfo,
86,882,281✔
699
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
700
  setOperatorResetStateFn(pOperator, resetExchangeOperState);
86,880,830✔
701
  *pOptrInfo = pOperator;
86,881,319✔
702
  return TSDB_CODE_SUCCESS;
86,881,291✔
703

704
_error:
×
705
  if (code != TSDB_CODE_SUCCESS) {
×
706
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
707
    pTaskInfo->code = code;
×
708
  }
709
  if (pInfo != NULL) {
×
710
    doDestroyExchangeOperatorInfo(pInfo);
×
711
  }
712

713
  if (pOperator != NULL) {
×
714
    pOperator->info = NULL;
×
715
    destroyOperator(pOperator);
×
716
  }
717
  return code;
×
718
}
719

720
void destroyExchangeOperatorInfo(void* param) {
86,884,094✔
721
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
86,884,094✔
722
  int32_t        code = taosRemoveRef(exchangeObjRefPool, pExInfo->self);
86,884,094✔
723
  if (code != TSDB_CODE_SUCCESS) {
86,883,626✔
724
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
725
  }
726
}
86,883,626✔
727

728
void freeBlock(void* pParam) {
214,512,408✔
729
  SSDataBlock* pBlock = *(SSDataBlock**)pParam;
214,512,408✔
730
  blockDataDestroy(pBlock);
214,512,408✔
731
}
214,511,590✔
732

733
void freeSourceDataInfo(void* p) {
140,490,594✔
734
  SSourceDataInfo* pInfo = (SSourceDataInfo*)p;
140,490,594✔
735
  taosMemoryFreeClear(pInfo->decompBuf);
140,490,594✔
736
  taosMemoryFreeClear(pInfo->pRsp);
140,490,594✔
737

738
  pInfo->decompBufSize = 0;
140,490,594✔
739
}
140,490,594✔
740

741
void doDestroyExchangeOperatorInfo(void* param) {
86,884,094✔
742
  if (param == NULL) {
86,884,094✔
743
    return;
×
744
  }
745
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
86,884,094✔
746
  if (pExInfo->pFetchRpcHandles) {
86,884,094✔
747
    for (int32_t i = 0; i < pExInfo->pFetchRpcHandles->size; ++i) {
232,337,044✔
748
      int64_t* pRpcHandle = taosArrayGet(pExInfo->pFetchRpcHandles, i);
145,452,950✔
749
      if (*pRpcHandle > 0) {
145,452,950✔
750
        SDownstreamSourceNode* pSource = taosArrayGet(pExInfo->pSources, i);
7,955,299✔
751
        (void)asyncFreeConnById(pExInfo->pTransporter, *pRpcHandle);
7,955,299✔
752
      }
753
    }
754
    taosArrayDestroy(pExInfo->pFetchRpcHandles);
86,884,094✔
755
  }
756

757
  taosArrayDestroy(pExInfo->pSources);
86,884,094✔
758
  taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo);
86,884,094✔
759

760
  taosArrayDestroyEx(pExInfo->pResultBlockList, freeBlock);
86,884,094✔
761
  taosArrayDestroyEx(pExInfo->pRecycledBlocks, freeBlock);
86,884,094✔
762

763
  blockDataDestroy(pExInfo->pDummyBlock);
86,883,609✔
764
  tSimpleHashCleanup(pExInfo->pHashSources);
86,884,094✔
765

766
  int32_t code = tsem_destroy(&pExInfo->ready);
86,883,128✔
767
  if (code != TSDB_CODE_SUCCESS) {
86,883,626✔
768
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
769
  }
770
  taosMemoryFreeClear(pExInfo->pTaskId);
86,883,626✔
771

772
  taosMemoryFreeClear(param);
86,882,175✔
773
}
774

775
int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
156,120,062✔
776
  SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;
156,120,062✔
777

778
  taosMemoryFreeClear(pMsg->pEpSet);
156,120,062✔
779
  SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
156,134,940✔
780
  if (pExchangeInfo == NULL) {
156,146,301✔
781
    qWarn("failed to acquire exchange operator, since it may have been released, %p", pExchangeInfo);
5,269✔
782
    taosMemoryFree(pMsg->pData);
5,269✔
783
    return TSDB_CODE_SUCCESS;
5,269✔
784
  }
785

786
  int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
156,141,032✔
787
  if (pWrapper->seqId != currSeqId) {
156,135,662✔
788
    qDebug("rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pWrapper->seqId, pExchangeInfo, currSeqId);
×
789
    taosMemoryFree(pMsg->pData);
×
790
    code = taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
×
791
    if (code != TSDB_CODE_SUCCESS) {
×
792
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
793
    }
794
    return TSDB_CODE_SUCCESS;
×
795
  }
796

797
  int32_t          index = pWrapper->sourceIndex;
156,129,503✔
798

799
  qDebug("%s exchange %p %dth source got rsp, code:%d, rsp:%p", pExchangeInfo->pTaskId, pExchangeInfo, index, code, pMsg->pData);
156,131,663✔
800

801
  int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, index);
156,133,024✔
802
  if (pRpcHandle != NULL) {
156,129,268✔
803
    int32_t ret = asyncFreeConnById(pExchangeInfo->pTransporter, *pRpcHandle);
156,111,910✔
804
    if (ret != 0) {
156,112,762✔
805
      qDebug("failed to free rpc handle, code:%s, %p", tstrerror(ret), pExchangeInfo);
8,992,594✔
806
    }
807
    *pRpcHandle = -1;
156,112,762✔
808
  }
809

810
  SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);
156,128,392✔
811
  if (!pSourceDataInfo) {
156,131,284✔
812
    return terrno;
×
813
  }
814

815
  if (0 == code && NULL == pMsg->pData) {
156,131,284✔
816
    qError("invalid rsp msg, msgType:%d, len:%d", pMsg->msgType, pMsg->len);
×
817
    code = TSDB_CODE_QRY_INVALID_MSG;
×
818
  }
819

820
  taosWLockLatch(&pSourceDataInfo->lock);
156,146,206✔
821
  if (code == TSDB_CODE_SUCCESS) {
156,142,689✔
822
    pSourceDataInfo->seqId = pWrapper->seqId;
156,140,879✔
823
    pSourceDataInfo->pRsp = pMsg->pData;
156,126,653✔
824

825
    SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
156,104,919✔
826
    pRsp->numOfRows = htobe64(pRsp->numOfRows);
156,104,654✔
827
    pRsp->compLen = htonl(pRsp->compLen);
156,114,338✔
828
    pRsp->payloadLen = htonl(pRsp->payloadLen);
156,119,369✔
829
    pRsp->numOfCols = htonl(pRsp->numOfCols);
156,081,969✔
830
    pRsp->useconds = htobe64(pRsp->useconds);
156,121,263✔
831
    pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);
156,058,485✔
832

833
    qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%" PRId64 ", %p", pSourceDataInfo->taskId, index,
156,093,073✔
834
           pRsp->numOfBlocks, pRsp->numOfRows, pExchangeInfo);
835
  } else {
836
    taosMemoryFree(pMsg->pData);
1,810✔
837
    pSourceDataInfo->code = rpcCvtErrCode(code);
1,810✔
838
    if (pSourceDataInfo->code != code) {
1,810✔
839
      qError("%s fetch rsp received, index:%d, error:%s, cvted error: %s, %p", pSourceDataInfo->taskId, index,
×
840
             tstrerror(code), tstrerror(pSourceDataInfo->code), pExchangeInfo);
841
    } else {
842
      qError("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code),
1,810✔
843
             pExchangeInfo);
844
    }
845
  }
846

847
  tmemory_barrier();
156,095,379✔
848
  pSourceDataInfo->status = EX_SOURCE_DATA_READY;
156,095,379✔
849
  taosWUnLockLatch(&pSourceDataInfo->lock);
156,121,931✔
850
  
851
  code = tsem_post(&pExchangeInfo->ready);
156,081,004✔
852
  if (code != TSDB_CODE_SUCCESS) {
156,137,593✔
853
    qError("failed to invoke post when fetch rsp is ready, code:%s, %p", tstrerror(code), pExchangeInfo);
×
854
    return code;
×
855
  }
856

857
  code = taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
156,137,593✔
858
  if (code != TSDB_CODE_SUCCESS) {
156,138,955✔
859
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
860
  }
861
  return code;
156,139,842✔
862
}
863

864
int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, bool tableSeq) {
234,884✔
865
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
234,884✔
866
  if (NULL == *ppRes) {
234,884✔
867
    return terrno;
×
868
  }
869

870
  STableScanOperatorParam* pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
234,884✔
871
  if (NULL == pScan) {
234,884✔
872
    taosMemoryFreeClear(*ppRes);
×
873
    return terrno;
×
874
  }
875

876
  pScan->pUidList = taosArrayDup(pUidList, NULL);
234,884✔
877
  if (NULL == pScan->pUidList) {
234,884✔
878
    taosMemoryFree(pScan);
×
879
    taosMemoryFreeClear(*ppRes);
×
880
    return terrno;
×
881
  }
882
  pScan->type = DYN_TYPE_STB_JOIN;
234,884✔
883
  pScan->tableSeq = tableSeq;
234,884✔
884
  pScan->pOrgTbInfo = NULL;
234,884✔
885
  pScan->pBatchTbInfo = NULL;
234,884✔
886
  pScan->pTagList = NULL;
234,884✔
887
  pScan->isNewParam = false;
234,884✔
888
  pScan->window.skey = INT64_MAX;
234,884✔
889
  pScan->window.ekey = INT64_MIN;
234,884✔
890

891
  (*ppRes)->opType = srcOpType;
234,884✔
892
  (*ppRes)->downstreamIdx = 0;
234,884✔
893
  (*ppRes)->value = pScan;
234,884✔
894
  (*ppRes)->pChildren = NULL;
234,884✔
895
  (*ppRes)->reUse = false;
234,884✔
896

897
  return TSDB_CODE_SUCCESS;
234,884✔
898
}
899

900
int32_t buildTableScanOperatorParamEx(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, SOrgTbInfo *pMap, bool tableSeq, STimeWindow *window, bool isNewParam) {
3,480,816✔
901
  int32_t                  code = TSDB_CODE_SUCCESS;
3,480,816✔
902
  int32_t                  lino = 0;
3,480,816✔
903
  STableScanOperatorParam* pScan = NULL;
3,480,816✔
904

905
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
3,480,816✔
906
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
3,480,816✔
907

908
  pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
3,480,816✔
909
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);
3,480,816✔
910

911
  if (pUidList) {
3,480,816✔
912
    pScan->pUidList = taosArrayDup(pUidList, NULL);
3,480,816✔
913
    QUERY_CHECK_NULL(pScan->pUidList, code, lino, _return, terrno);
3,480,816✔
914
  } else {
915
    pScan->pUidList = NULL;
×
916
  }
917

918
  if (pMap) {
3,480,816✔
919
    pScan->pOrgTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
3,480,816✔
920
    QUERY_CHECK_NULL(pScan->pOrgTbInfo, code, lino, _return, terrno);
3,480,816✔
921

922
    pScan->pOrgTbInfo->vgId = pMap->vgId;
3,480,816✔
923
    tstrncpy(pScan->pOrgTbInfo->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);
3,480,816✔
924

925
    pScan->pOrgTbInfo->colMap = taosArrayDup(pMap->colMap, NULL);
3,480,816✔
926
    QUERY_CHECK_NULL(pScan->pOrgTbInfo->colMap, code, lino, _return, terrno);
3,480,816✔
927
  } else {
928
    pScan->pOrgTbInfo = NULL;
×
929
  }
930
  pScan->pTagList = NULL;
3,480,816✔
931
  pScan->pBatchTbInfo = NULL;
3,480,816✔
932

933

934
  pScan->type = DYN_TYPE_VSTB_SINGLE_SCAN;
3,480,816✔
935
  pScan->tableSeq = tableSeq;
3,480,816✔
936
  pScan->window.skey = window->skey;
3,480,816✔
937
  pScan->window.ekey = window->ekey;
3,480,816✔
938
  pScan->isNewParam = isNewParam;
3,480,816✔
939
  (*ppRes)->opType = srcOpType;
3,480,816✔
940
  (*ppRes)->downstreamIdx = 0;
3,480,816✔
941
  (*ppRes)->value = pScan;
3,480,816✔
942
  (*ppRes)->pChildren = NULL;
3,480,816✔
943
  (*ppRes)->reUse = false;
3,480,816✔
944

945
  return code;
3,480,816✔
946
_return:
×
947
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
948
  taosMemoryFreeClear(*ppRes);
×
949
  if (pScan) {
×
950
    taosArrayDestroy(pScan->pUidList);
×
951
    if (pScan->pOrgTbInfo) {
×
952
      taosArrayDestroy(pScan->pOrgTbInfo->colMap);
×
953
      taosMemoryFreeClear(pScan->pOrgTbInfo);
×
954
    }
955
    taosMemoryFree(pScan);
×
956
  }
957
  return code;
×
958
}
959

UNCOV
960
int32_t buildTableScanOperatorParamBatchInfo(SOperatorParam** ppRes, uint64_t groupid, SArray* pUidList, int32_t srcOpType, SArray *pBatchMap, SArray *pTagList, bool tableSeq, STimeWindow *window, bool isNewParam) {
×
UNCOV
961
  int32_t                  code = TSDB_CODE_SUCCESS;
×
UNCOV
962
  int32_t                  lino = 0;
×
UNCOV
963
  STableScanOperatorParam* pScan = NULL;
×
964

UNCOV
965
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
×
UNCOV
966
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
×
967

UNCOV
968
  pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
×
UNCOV
969
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);
×
970

UNCOV
971
  pScan->groupid = groupid;
×
UNCOV
972
  if (pUidList) {
×
UNCOV
973
    pScan->pUidList = taosArrayDup(pUidList, NULL);
×
UNCOV
974
    QUERY_CHECK_NULL(pScan->pUidList, code, lino, _return, terrno);
×
975
  } else {
976
    pScan->pUidList = NULL;
×
977
  }
UNCOV
978
  pScan->pOrgTbInfo = NULL;
×
979

UNCOV
980
  if (pBatchMap) {
×
UNCOV
981
    pScan->pBatchTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
×
UNCOV
982
    QUERY_CHECK_NULL(pScan->pBatchTbInfo, code, lino, _return, terrno);
×
UNCOV
983
    for (int32_t i = 0; i < taosArrayGetSize(pBatchMap); i++) {
×
UNCOV
984
      SOrgTbInfo *pSrcInfo = taosArrayGet(pBatchMap, i);
×
UNCOV
985
      SOrgTbInfo batchInfo = {0};
×
UNCOV
986
      batchInfo.vgId = pSrcInfo->vgId;
×
UNCOV
987
      tstrncpy(batchInfo.tbName, pSrcInfo->tbName, TSDB_TABLE_FNAME_LEN);
×
UNCOV
988
      batchInfo.colMap = taosArrayDup(pSrcInfo->colMap, NULL);
×
UNCOV
989
      QUERY_CHECK_NULL(batchInfo.colMap, code, lino, _return, terrno);
×
UNCOV
990
      SOrgTbInfo *pDstInfo = taosArrayPush(pScan->pBatchTbInfo, &batchInfo);
×
UNCOV
991
      QUERY_CHECK_NULL(pDstInfo, code, lino, _return, terrno);
×
992
    }
993
  } else {
994
    pScan->pBatchTbInfo = NULL;
×
995
  }
996

UNCOV
997
  if (pTagList) {
×
UNCOV
998
    pScan->pTagList = taosArrayInit(1, sizeof(STagVal));
×
UNCOV
999
    QUERY_CHECK_NULL(pScan->pTagList, code, lino, _return, terrno);
×
1000

UNCOV
1001
    for (int32_t i = 0; i < taosArrayGetSize(pTagList); ++i) {
×
UNCOV
1002
      STagVal *pSrcTag = (STagVal*)taosArrayGet(pTagList, i);
×
UNCOV
1003
      STagVal  dstTag;
×
UNCOV
1004
      dstTag.type = pSrcTag->type;
×
UNCOV
1005
      dstTag.cid = pSrcTag->cid;
×
UNCOV
1006
      if (IS_VAR_DATA_TYPE(pSrcTag->type)) {
×
UNCOV
1007
        dstTag.nData = pSrcTag->nData;
×
UNCOV
1008
        dstTag.pData = taosMemoryMalloc(dstTag.nData);
×
UNCOV
1009
        QUERY_CHECK_NULL(dstTag.pData, code, lino, _return, terrno);
×
UNCOV
1010
        memcpy(dstTag.pData, pSrcTag->pData, dstTag.nData);
×
1011
      } else {
UNCOV
1012
        dstTag.i64 = pSrcTag->i64;
×
1013
      }
1014

UNCOV
1015
      QUERY_CHECK_NULL(taosArrayPush(pScan->pTagList, &dstTag), code, lino, _return, terrno);
×
1016
    }
1017
  } else {
UNCOV
1018
    pScan->pTagList = NULL;
×
1019
  }
1020

1021

UNCOV
1022
  pScan->type = DYN_TYPE_VSTB_BATCH_SCAN;
×
UNCOV
1023
  pScan->tableSeq = tableSeq;
×
UNCOV
1024
  pScan->window.skey = window->skey;
×
UNCOV
1025
  pScan->window.ekey = window->ekey;
×
UNCOV
1026
  pScan->isNewParam = isNewParam;
×
UNCOV
1027
  (*ppRes)->opType = srcOpType;
×
UNCOV
1028
  (*ppRes)->downstreamIdx = 0;
×
UNCOV
1029
  (*ppRes)->value = pScan;
×
UNCOV
1030
  (*ppRes)->pChildren = NULL;
×
UNCOV
1031
  (*ppRes)->reUse = false;
×
1032

UNCOV
1033
  return code;
×
1034
_return:
×
1035
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
1036
  taosMemoryFreeClear(*ppRes);
×
1037
  if (pScan) {
×
1038
    taosArrayDestroy(pScan->pUidList);
×
1039
    if (pScan->pBatchTbInfo) {
×
1040
      taosArrayDestroy(pScan->pBatchTbInfo);
×
1041
    }
1042
    taosMemoryFree(pScan);
×
1043
  }
1044
  return code;
×
1045
}
1046

UNCOV
1047
int32_t buildAggOperatorParam(SOperatorParam** ppRes, uint64_t groupid, SArray* pUidList, int32_t srcOpType, SArray *pBatchMap, SArray *pTagList, bool tableSeq, STimeWindow *window, bool isNewParam) {
×
UNCOV
1048
  int32_t                  code = TSDB_CODE_SUCCESS;
×
UNCOV
1049
  int32_t                  lino = 0;
×
1050

UNCOV
1051
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
×
UNCOV
1052
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
×
1053

UNCOV
1054
  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
×
UNCOV
1055
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno);
×
1056

UNCOV
1057
  SOperatorParam* pTableScanParam = NULL;
×
UNCOV
1058
  code = buildTableScanOperatorParamBatchInfo(&pTableScanParam, groupid, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, pBatchMap, pTagList, tableSeq, window, isNewParam);
×
UNCOV
1059
  QUERY_CHECK_CODE(code, lino, _return);
×
1060

UNCOV
1061
  QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pTableScanParam), code, lino, _return, terrno);
×
1062

UNCOV
1063
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
×
UNCOV
1064
  (*ppRes)->downstreamIdx = 0;
×
UNCOV
1065
  (*ppRes)->value = NULL;
×
UNCOV
1066
  (*ppRes)->reUse = false;
×
1067

UNCOV
1068
_return:
×
UNCOV
1069
  return code;
×
1070
}
1071

1072
int32_t buildTagScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType) {
704,740✔
1073
  int32_t                  code = TSDB_CODE_SUCCESS;
704,740✔
1074
  int32_t                  lino = 0;
704,740✔
1075
  STagScanOperatorParam*   pScan = NULL;
704,740✔
1076

1077
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
704,740✔
1078
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
704,740✔
1079

1080
  pScan = taosMemoryMalloc(sizeof(STagScanOperatorParam));
704,740✔
1081
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);
704,740✔
1082
  pScan->vcUid = *(tb_uid_t*)taosArrayGet(pUidList, 0);
704,740✔
1083

1084
  (*ppRes)->opType = srcOpType;
704,740✔
1085
  (*ppRes)->downstreamIdx = 0;
704,740✔
1086
  (*ppRes)->value = pScan;
704,740✔
1087
  (*ppRes)->pChildren = NULL;
704,740✔
1088
  (*ppRes)->reUse = false;
704,740✔
1089

1090
  return code;
704,740✔
1091
_return:
×
1092
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
1093
  taosMemoryFreeClear(*ppRes);
×
1094
  if (pScan) {
×
1095
    taosMemoryFree(pScan);
×
1096
  }
1097
  return code;
×
1098
}
1099

1100
static int32_t getCurrentWinCalcTimeRange(SStreamRuntimeFuncInfo* pRuntimeInfo, STimeWindow* pTimeRange) {
4,118,057✔
1101
  if (!pRuntimeInfo || !pTimeRange) {
4,118,057✔
1102
    return TSDB_CODE_INTERNAL_ERROR;
×
1103
  }
1104

1105
  SSTriggerCalcParam* pParam = taosArrayGet(pRuntimeInfo->pStreamPesudoFuncVals, pRuntimeInfo->curIdx);
4,118,057✔
1106
  if (!pParam) {
4,118,057✔
1107
    return TSDB_CODE_INTERNAL_ERROR;
×
1108
  }
1109

1110
  switch (pRuntimeInfo->triggerType) {
4,118,057✔
1111
    case STREAM_TRIGGER_SLIDING:
3,198,045✔
1112
      // Unable to distinguish whether there is an interval, all use wstart/wend
1113
      // and the results are equal to those of prevTs/currentTs, using the same address of union.
1114
      pTimeRange->skey = pParam->wstart;  // is equal to wstart
3,198,045✔
1115
      pTimeRange->ekey = pParam->wend;    // is equal to wend
3,198,045✔
1116
      break;
3,198,045✔
1117
    case STREAM_TRIGGER_PERIOD:
192,953✔
1118
      pTimeRange->skey = pParam->prevLocalTime;
192,953✔
1119
      pTimeRange->ekey = pParam->triggerTime;
192,953✔
1120
      break;
192,953✔
1121
    default:
727,059✔
1122
      pTimeRange->skey = pParam->wstart;
727,059✔
1123
      pTimeRange->ekey = pParam->wend;
727,059✔
1124
      break;
727,059✔
1125
  }
1126

1127
  return TSDB_CODE_SUCCESS;
4,118,057✔
1128
}
1129

1130
int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
156,178,989✔
1131
  int32_t          code = TSDB_CODE_SUCCESS;
156,178,989✔
1132
  int32_t          lino = 0;
156,178,989✔
1133
  SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
156,178,989✔
1134
  if (!pDataInfo) {
156,180,431✔
1135
    return terrno;
×
1136
  }
1137

1138
  if (EX_SOURCE_DATA_NOT_READY != pDataInfo->status) {
156,180,431✔
UNCOV
1139
    return TSDB_CODE_SUCCESS;
×
1140
  }
1141

1142
  pDataInfo->status = EX_SOURCE_DATA_STARTED;
156,179,597✔
1143
  SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
156,180,589✔
1144
  if (!pSource) {
156,179,731✔
1145
    return terrno;
×
1146
  }
1147

1148
  pDataInfo->startTime = taosGetTimestampUs();
156,179,218✔
1149
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
156,179,388✔
1150

1151
  SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
156,179,203✔
1152
  QUERY_CHECK_NULL(pWrapper, code, lino, _end, terrno);
156,178,943✔
1153
  pWrapper->exchangeId = pExchangeInfo->self;
156,178,943✔
1154
  pWrapper->sourceIndex = sourceIndex;
156,180,768✔
1155
  pWrapper->seqId = pExchangeInfo->seqId;
156,181,125✔
1156

1157
  if (pSource->localExec) {
156,181,273✔
1158
    SDataBuf pBuf = {0};
×
1159
    int32_t  code = (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->sId, pTaskInfo->id.queryId,
×
1160
                                               pSource->clientId, pSource->taskId, 0, pSource->execId, &pBuf.pData,
1161
                                               pTaskInfo->localFetch.explainRes);
1162
    code = loadRemoteDataCallback(pWrapper, &pBuf, code);
×
1163
    taosMemoryFree(pWrapper);
×
1164
    QUERY_CHECK_CODE(code, lino, _end);
×
1165
  } else {
1166
    bool needStreamPesudoFuncVals = true;
156,179,428✔
1167
    SResFetchReq req = {0};
156,179,428✔
1168
    req.header.vgId = pSource->addr.nodeId;
156,179,061✔
1169
    req.sId = pSource->sId;
156,179,932✔
1170
    req.clientId = pSource->clientId;
156,179,165✔
1171
    req.taskId = pSource->taskId;
156,177,828✔
1172
    req.queryId = pTaskInfo->id.queryId;
156,176,988✔
1173
    req.execId = pSource->execId;
156,179,047✔
1174
    if (pTaskInfo->pStreamRuntimeInfo) {
156,179,302✔
1175
      req.dynTbname = pExchangeInfo->dynTbname;
7,483,357✔
1176
      req.execId = pTaskInfo->pStreamRuntimeInfo->execId;
7,483,357✔
1177
      req.pStRtFuncInfo = &pTaskInfo->pStreamRuntimeInfo->funcInfo;
7,483,156✔
1178

1179
      if (pSource->fetchMsgType == TDMT_STREAM_FETCH_FROM_RUNNER) {
7,483,357✔
1180
        qDebug("%s stream fetch from runner, execId:%d, %p", GET_TASKID(pTaskInfo), req.execId, pTaskInfo->pStreamRuntimeInfo);
21,348✔
1181
      } else if (pSource->fetchMsgType == TDMT_STREAM_FETCH_FROM_CACHE) {
7,461,607✔
1182
        code = getCurrentWinCalcTimeRange(req.pStRtFuncInfo, &req.pStRtFuncInfo->curWindow);
4,118,057✔
1183
        QUERY_CHECK_CODE(code, lino, _end);
4,118,057✔
1184
        needStreamPesudoFuncVals = false;
4,118,057✔
1185
        qDebug("%s stream fetch from cache, execId:%d, curWinIdx:%d, time range:[%" PRId64 ", %" PRId64 "]",
4,118,057✔
1186
               GET_TASKID(pTaskInfo), req.execId, req.pStRtFuncInfo->curIdx, req.pStRtFuncInfo->curWindow.skey,
1187
               req.pStRtFuncInfo->curWindow.ekey);
1188
      }
1189
      if (!pDataInfo->fetchSent) {
7,483,156✔
1190
        req.reset = pDataInfo->fetchSent = true;
6,006,581✔
1191
      }
1192
    }
1193

1194
    switch (pDataInfo->type) {
156,180,118✔
1195
      case EX_SRC_TYPE_VSTB_SCAN: {
3,480,816✔
1196
        code = buildTableScanOperatorParamEx(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->orgTbInfo, pDataInfo->tableSeq, &pDataInfo->window, pDataInfo->isNewParam);
3,480,816✔
1197
        taosArrayDestroy(pDataInfo->orgTbInfo->colMap);
3,480,816✔
1198
        taosMemoryFreeClear(pDataInfo->orgTbInfo);
3,480,816✔
1199
        taosArrayDestroy(pDataInfo->pSrcUidList);
3,480,816✔
1200
        pDataInfo->pSrcUidList = NULL;
3,480,816✔
1201
        if (TSDB_CODE_SUCCESS != code) {
3,480,816✔
1202
          pTaskInfo->code = code;
×
1203
          taosMemoryFree(pWrapper);
×
1204
          return pTaskInfo->code;
×
1205
        }
1206
        break;
3,480,816✔
1207
      }
1208
      case EX_SRC_TYPE_VSTB_WIN_SCAN: {
×
1209
        if (pDataInfo->pSrcUidList) {
×
1210
          code = buildTableScanOperatorParamEx(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, NULL, pDataInfo->tableSeq, &pDataInfo->window, false);
×
1211
          taosArrayDestroy(pDataInfo->pSrcUidList);
×
1212
          pDataInfo->pSrcUidList = NULL;
×
1213
          if (TSDB_CODE_SUCCESS != code) {
×
1214
            pTaskInfo->code = code;
×
1215
            taosMemoryFree(pWrapper);
×
1216
            return pTaskInfo->code;
×
1217
          }
1218
        }
1219
        break;
×
1220
      }
1221
      case EX_SRC_TYPE_VSTB_TAG_SCAN: {
704,740✔
1222
        code = buildTagScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType);
704,740✔
1223
        taosArrayDestroy(pDataInfo->pSrcUidList);
704,740✔
1224
        pDataInfo->pSrcUidList = NULL;
704,740✔
1225
        if (TSDB_CODE_SUCCESS != code) {
704,740✔
1226
          pTaskInfo->code = code;
×
1227
          taosMemoryFree(pWrapper);
×
1228
          return pTaskInfo->code;
×
1229
        }
1230
        break;
704,740✔
1231
      }
UNCOV
1232
      case EX_SRC_TYPE_VSTB_AGG_SCAN: {
×
UNCOV
1233
        if (pDataInfo->batchOrgTbInfo) {
×
UNCOV
1234
          code = buildAggOperatorParam(&req.pOpParam, pDataInfo->groupid, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->batchOrgTbInfo, pDataInfo->tagList, pDataInfo->tableSeq, &pDataInfo->window, pDataInfo->isNewParam);
×
UNCOV
1235
          if (pDataInfo->batchOrgTbInfo) {
×
UNCOV
1236
            for (int32_t i = 0; i < taosArrayGetSize(pDataInfo->batchOrgTbInfo); ++i) {
×
UNCOV
1237
              SOrgTbInfo* pColMap = taosArrayGet(pDataInfo->batchOrgTbInfo, i);
×
UNCOV
1238
              if (pColMap) {
×
UNCOV
1239
                taosArrayDestroy(pColMap->colMap);
×
1240
              }
1241
            }
UNCOV
1242
            taosArrayDestroy(pDataInfo->batchOrgTbInfo);
×
UNCOV
1243
            pDataInfo->batchOrgTbInfo = NULL;
×
1244
          }
UNCOV
1245
          if (pDataInfo->tagList) {
×
UNCOV
1246
            taosArrayDestroyEx(pDataInfo->tagList, destroyTagVal);
×
UNCOV
1247
            pDataInfo->tagList = NULL;
×
1248
          }
UNCOV
1249
          if (pDataInfo->pSrcUidList) {
×
UNCOV
1250
            taosArrayDestroy(pDataInfo->pSrcUidList);
×
UNCOV
1251
            pDataInfo->pSrcUidList = NULL;
×
1252
          }
1253

UNCOV
1254
          if (TSDB_CODE_SUCCESS != code) {
×
1255
            pTaskInfo->code = code;
×
1256
            taosMemoryFree(pWrapper);
×
1257
            return pTaskInfo->code;
×
1258
          }
1259
        }
UNCOV
1260
        break;
×
1261
      }
1262
      case EX_SRC_TYPE_STB_JOIN_SCAN:
151,990,775✔
1263
      default: {
1264
        if (pDataInfo->pSrcUidList) {
151,990,775✔
1265
          code = buildTableScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->tableSeq);
224,011✔
1266
          taosArrayDestroy(pDataInfo->pSrcUidList);
224,011✔
1267
          pDataInfo->pSrcUidList = NULL;
224,011✔
1268
          if (TSDB_CODE_SUCCESS != code) {
224,011✔
1269
            pTaskInfo->code = code;
×
1270
            taosMemoryFree(pWrapper);
×
1271
            return pTaskInfo->code;
×
1272
          }
1273
        }
1274
        break;
151,993,585✔
1275
      }
1276
    }
1277

1278
    int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req, needStreamPesudoFuncVals);
156,179,141✔
1279
    if (msgSize < 0) {
156,175,822✔
1280
      pTaskInfo->code = msgSize;
×
1281
      taosMemoryFree(pWrapper);
×
1282
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
1283
      return pTaskInfo->code;
×
1284
    }
1285

1286
    void* msg = taosMemoryCalloc(1, msgSize);
156,175,822✔
1287
    if (NULL == msg) {
156,175,768✔
1288
      pTaskInfo->code = terrno;
×
1289
      taosMemoryFree(pWrapper);
×
1290
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
1291
      return pTaskInfo->code;
×
1292
    }
1293

1294
    msgSize = tSerializeSResFetchReq(msg, msgSize, &req, needStreamPesudoFuncVals);
156,175,768✔
1295
    if (msgSize < 0) {
156,174,179✔
1296
      pTaskInfo->code = msgSize;
×
1297
      taosMemoryFree(pWrapper);
×
1298
      taosMemoryFree(msg);
×
1299
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
1300
      return pTaskInfo->code;
×
1301
    }
1302

1303
    freeOperatorParam(req.pOpParam, OP_GET_PARAM);
156,174,179✔
1304

1305
    qDebug("%s build fetch msg and send to vgId:%d, ep:%s, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
156,173,536✔
1306
           ", seqId:%" PRId64 ", execId:%d, %p, %d/%" PRIzu,
1307
           GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->clientId,
1308
           pSource->taskId, pExchangeInfo->seqId, pSource->execId, pExchangeInfo, sourceIndex, totalSources);
1309

1310
    // send the fetch remote task result reques
1311
    SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
156,179,222✔
1312
    if (NULL == pMsgSendInfo) {
156,177,822✔
1313
      taosMemoryFreeClear(msg);
×
1314
      taosMemoryFree(pWrapper);
×
1315
      qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
×
1316
      pTaskInfo->code = terrno;
×
1317
      return pTaskInfo->code;
×
1318
    }
1319

1320
    pMsgSendInfo->param = pWrapper;
156,177,822✔
1321
    pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
156,180,563✔
1322
    pMsgSendInfo->msgInfo.pData = msg;
156,179,877✔
1323
    pMsgSendInfo->msgInfo.len = msgSize;
156,178,459✔
1324
    pMsgSendInfo->msgType = pSource->fetchMsgType;
156,178,885✔
1325
    pMsgSendInfo->fp = loadRemoteDataCallback;
156,179,085✔
1326
    pMsgSendInfo->requestId = pTaskInfo->id.queryId;
156,178,947✔
1327

1328
    int64_t transporterId = 0;
156,181,407✔
1329
    void* poolHandle = NULL;
156,180,737✔
1330
    code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
156,180,737✔
1331
    QUERY_CHECK_CODE(code, lino, _end);
156,180,133✔
1332
    int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, sourceIndex);
156,180,133✔
1333
    *pRpcHandle = transporterId;
156,180,758✔
1334
  }
1335

1336
_end:
156,180,281✔
1337
  if (code != TSDB_CODE_SUCCESS) {
156,180,281✔
1338
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1339
  }
1340
  return code;
156,179,437✔
1341
}
1342

1343
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs,
116,827,950✔
1344
                          SOperatorInfo* pOperator) {
1345
  pInfo->totalRows += numOfRows;
116,827,950✔
1346
  pInfo->totalSize += dataLen;
116,827,950✔
1347
  pInfo->totalElapsed += (taosGetTimestampUs() - startTs);
116,827,950✔
1348
  pOperator->resultInfo.totalRows += numOfRows;
116,827,950✔
1349
}
116,827,950✔
1350

1351
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart) {
374,043,312✔
1352
  int32_t      code = TSDB_CODE_SUCCESS;
374,043,312✔
1353
  int32_t      lino = 0;
374,043,312✔
1354
  SSDataBlock* pBlock = NULL;
374,043,312✔
1355
  if (pColList == NULL) {  // data from other sources
374,043,312✔
1356
    blockDataCleanup(pRes);
370,003,715✔
1357
    code = blockDecodeInternal(pRes, pData, (const char**)pNextStart);
370,002,665✔
1358
    if (code) {
370,000,367✔
1359
      return code;
×
1360
    }
1361
  } else {  // extract data according to pColList
1362
    char* pStart = pData;
4,039,597✔
1363

1364
    int32_t numOfCols = htonl(*(int32_t*)pStart);
4,039,597✔
1365
    pStart += sizeof(int32_t);
4,039,999✔
1366

1367
    // todo refactor:extract method
1368
    SSysTableSchema* pSchema = (SSysTableSchema*)pStart;
4,039,999✔
1369
    for (int32_t i = 0; i < numOfCols; ++i) {
56,347,774✔
1370
      SSysTableSchema* p = (SSysTableSchema*)pStart;
52,307,775✔
1371

1372
      p->colId = htons(p->colId);
52,307,775✔
1373
      p->bytes = htonl(p->bytes);
52,307,775✔
1374
      pStart += sizeof(SSysTableSchema);
52,307,775✔
1375
    }
1376

1377
    pBlock = NULL;
4,039,999✔
1378
    code = createDataBlock(&pBlock);
4,039,999✔
1379
    QUERY_CHECK_CODE(code, lino, _end);
4,039,999✔
1380

1381
    for (int32_t i = 0; i < numOfCols; ++i) {
56,347,774✔
1382
      SColumnInfoData idata = createColumnInfoData(pSchema[i].type, pSchema[i].bytes, pSchema[i].colId);
52,307,775✔
1383
      code = blockDataAppendColInfo(pBlock, &idata);
52,307,775✔
1384
      QUERY_CHECK_CODE(code, lino, _end);
52,307,295✔
1385
    }
1386

1387
    code = blockDecodeInternal(pBlock, pStart, NULL);
4,039,999✔
1388
    QUERY_CHECK_CODE(code, lino, _end);
4,039,999✔
1389

1390
    code = blockDataEnsureCapacity(pRes, pBlock->info.rows);
4,039,999✔
1391
    QUERY_CHECK_CODE(code, lino, _end);
4,039,999✔
1392

1393
    // data from mnode
1394
    pRes->info.dataLoad = 1;
4,039,999✔
1395
    pRes->info.rows = pBlock->info.rows;
4,039,999✔
1396
    pRes->info.scanFlag = MAIN_SCAN;
4,039,999✔
1397
    code = relocateColumnData(pRes, pColList, pBlock->pDataBlock, false);
4,039,999✔
1398
    QUERY_CHECK_CODE(code, lino, _end);
4,039,999✔
1399

1400
    blockDataDestroy(pBlock);
4,039,999✔
1401
    pBlock = NULL;
4,039,999✔
1402
  }
1403

1404
_end:
374,039,886✔
1405
  if (code != TSDB_CODE_SUCCESS) {
374,040,768✔
1406
    blockDataDestroy(pBlock);
×
1407
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1408
  }
1409
  return code;
374,040,768✔
1410
}
1411

1412
void setAllSourcesCompleted(SOperatorInfo* pOperator) {
63,430,342✔
1413
  SExchangeInfo* pExchangeInfo = pOperator->info;
63,430,342✔
1414
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
63,430,342✔
1415

1416
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
63,430,342✔
1417
  size_t               totalSources = taosArrayGetSize(pExchangeInfo->pSources);
63,430,342✔
1418
  qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 ", %.2f Kb, elapsed:%.2f ms",
63,430,342✔
1419
         GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
1420
         pLoadInfo->totalElapsed / 1000.0);
1421

1422
  setOperatorCompleted(pOperator);
63,430,342✔
1423
}
63,430,342✔
1424

1425
int32_t getCompletedSources(const SArray* pArray, int32_t* pRes) {
235,968,574✔
1426
  int32_t code = TSDB_CODE_SUCCESS;
235,968,574✔
1427
  int32_t lino = 0;
235,968,574✔
1428
  size_t  total = taosArrayGetSize(pArray);
235,968,574✔
1429

1430
  int32_t completed = 0;
235,968,964✔
1431
  for (int32_t k = 0; k < total; ++k) {
687,033,772✔
1432
    SSourceDataInfo* p = taosArrayGet(pArray, k);
451,066,600✔
1433
    QUERY_CHECK_NULL(p, code, lino, _end, terrno);
451,066,251✔
1434
    if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
451,066,251✔
1435
      qDebug("source %d is completed, info:%p %p", k, pArray, p);
218,319,869✔
1436
      completed += 1;
218,320,350✔
1437
    }
1438
  }
1439

1440
  *pRes = completed;
235,967,172✔
1441
_end:
235,968,931✔
1442
  if (code != TSDB_CODE_SUCCESS) {
235,968,931✔
1443
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1444
  }
1445
  return code;
235,968,636✔
1446
}
1447

1448
int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
82,554,076✔
1449
  SExchangeInfo* pExchangeInfo = pOperator->info;
82,554,076✔
1450
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
82,554,546✔
1451

1452
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
82,553,741✔
1453
  int64_t startTs = taosGetTimestampUs();
82,553,578✔
1454

1455
  // Asynchronously send all fetch requests to all sources.
1456
  for (int32_t i = 0; i < totalSources; ++i) {
220,737,399✔
1457
    int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
138,183,190✔
1458
    if (code != TSDB_CODE_SUCCESS) {
138,183,821✔
UNCOV
1459
      pTaskInfo->code = code;
×
1460
      return code;
×
1461
    }
1462
  }
1463

1464
  int64_t endTs = taosGetTimestampUs();
82,555,551✔
1465
  qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTaskInfo),
82,555,551✔
1466
         totalSources, (endTs - startTs) / 1000.0);
1467

1468
  pOperator->status = OP_RES_TO_RETURN;
82,555,551✔
1469
  pOperator->cost.openCost = taosGetTimestampUs() - startTs;
82,555,042✔
1470
  if (isTaskKilled(pTaskInfo)) {
82,555,403✔
UNCOV
1471
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1472
  }
1473

1474
  return TSDB_CODE_SUCCESS;
82,554,559✔
1475
}
1476

1477
int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo) {
112,787,951✔
1478
  int32_t            code = TSDB_CODE_SUCCESS;
112,787,951✔
1479
  int32_t            lino = 0;
112,787,951✔
1480
  SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
112,787,951✔
1481
  SSDataBlock*       pb = NULL;
112,787,951✔
1482

1483
  char* pNextStart = pRetrieveRsp->data;
112,787,951✔
1484
  char* pStart = pNextStart;
112,787,951✔
1485

1486
  int32_t index = 0;
112,787,951✔
1487

1488
  if (pRetrieveRsp->compressed) {  // decompress the data
112,787,951✔
1489
    if (pDataInfo->decompBuf == NULL) {
×
1490
      pDataInfo->decompBuf = taosMemoryMalloc(pRetrieveRsp->payloadLen);
×
1491
      QUERY_CHECK_NULL(pDataInfo->decompBuf, code, lino, _end, terrno);
×
1492
      pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
×
1493
    } else {
1494
      if (pDataInfo->decompBufSize < pRetrieveRsp->payloadLen) {
×
1495
        char* p = taosMemoryRealloc(pDataInfo->decompBuf, pRetrieveRsp->payloadLen);
×
1496
        QUERY_CHECK_NULL(p, code, lino, _end, terrno);
×
1497
        if (p != NULL) {
×
1498
          pDataInfo->decompBuf = p;
×
1499
          pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
×
1500
        }
1501
      }
1502
    }
1503
  }
1504

1505
  while (index++ < pRetrieveRsp->numOfBlocks) {
482,790,905✔
1506
    pStart = pNextStart;
370,003,715✔
1507

1508
    if (taosArrayGetSize(pExchangeInfo->pRecycledBlocks) > 0) {
370,003,715✔
1509
      pb = *(SSDataBlock**)taosArrayPop(pExchangeInfo->pRecycledBlocks);
155,490,717✔
1510
      blockDataCleanup(pb);
155,490,717✔
1511
    } else {
1512
      code = createOneDataBlock(pExchangeInfo->pDummyBlock, false, &pb);
214,513,197✔
1513
      QUERY_CHECK_NULL(pb, code, lino, _end, code);
214,513,278✔
1514
    }
1515

1516
    int32_t compLen = *(int32_t*)pStart;
370,003,995✔
1517
    pStart += sizeof(int32_t);
370,003,995✔
1518

1519
    int32_t rawLen = *(int32_t*)pStart;
370,003,995✔
1520
    pStart += sizeof(int32_t);
370,003,792✔
1521
    QUERY_CHECK_CONDITION((compLen <= rawLen && compLen != 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
370,003,625✔
1522

1523
    pNextStart = pStart + compLen;
370,003,625✔
1524
    if (pRetrieveRsp->compressed && (compLen < rawLen)) {
370,003,625✔
1525
      int32_t t = tsDecompressString(pStart, compLen, 1, pDataInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
×
1526
      QUERY_CHECK_CONDITION((t == rawLen), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1527
      pStart = pDataInfo->decompBuf;
×
1528
    }
1529

1530
    code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart);
370,003,625✔
1531
    if (code != 0) {
370,000,570✔
1532
      taosMemoryFreeClear(pDataInfo->pRsp);
×
1533
      goto _end;
×
1534
    }
1535

1536
    void* tmp = taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
370,000,570✔
1537
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
370,002,345✔
1538
    qDebug("%dth block added to resultBlockList, rows:%" PRId64, index, pb->info.rows);
370,002,345✔
1539
    pb = NULL;
370,003,914✔
1540
  }
1541

1542
_end:
112,787,951✔
1543
  if (code != TSDB_CODE_SUCCESS) {
112,787,951✔
1544
    blockDataDestroy(pb);
×
1545
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1546
  }
1547
  return code;
112,787,951✔
1548
}
1549

1550
int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
4,340,811✔
1551
  SExchangeInfo* pExchangeInfo = pOperator->info;
4,340,811✔
1552
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
4,340,811✔
1553

1554
  int32_t code = 0;
4,340,811✔
1555
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);
4,340,811✔
1556
  int64_t startTs = taosGetTimestampUs();
4,340,811✔
1557

1558
  int32_t vgId = 0;
4,340,811✔
1559
  if (pExchangeInfo->dynTbname) {
4,340,811✔
1560
    SArray* vals = pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPartColVals;
87,920✔
1561
    for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
87,920✔
1562
      SStreamGroupValue* pValue = taosArrayGet(vals, i);
87,920✔
1563
      if (pValue != NULL && pValue->isTbname) {
87,920✔
1564
        vgId = pValue->vgId;
87,920✔
1565
        break;
87,920✔
1566
      }
1567
    }
1568
  }
1569

1570
  while (1) {
1,342,115✔
1571
    if (pExchangeInfo->current >= totalSources) {
5,682,926✔
1572
      setAllSourcesCompleted(pOperator);
1,302,870✔
1573
      return TSDB_CODE_SUCCESS;
1,302,870✔
1574
    }
1575

1576
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
4,380,056✔
1577
    if (!pSource) {
4,380,056✔
1578
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1579
      pTaskInfo->code = terrno;
×
1580
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1581
    }
1582

1583
    if (vgId != 0 && pSource->addr.nodeId != vgId){
4,380,056✔
1584
      pExchangeInfo->current += 1;
67,704✔
1585
      continue;
67,704✔
1586
    }
1587

1588
    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
4,312,352✔
1589
    if (!pDataInfo) {
4,312,352✔
1590
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1591
      pTaskInfo->code = terrno;
×
1592
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1593
    }
1594
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
4,312,352✔
1595

1596
    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
4,312,352✔
1597
    if (code != TSDB_CODE_SUCCESS) {
4,312,352✔
1598
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1599
      pTaskInfo->code = code;
×
1600
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1601
    }
1602

1603
    while (true) {
364✔
1604
      code = exchangeWait(pOperator, pExchangeInfo);
4,312,716✔
1605
      if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
4,312,716✔
1606
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
364✔
1607
      }
1608

1609
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
4,312,352✔
1610
      if (pDataInfo->seqId != currSeqId) {
4,312,352✔
1611
        qDebug("seq rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pDataInfo->seqId, pExchangeInfo, currSeqId);
364✔
1612
        taosMemoryFreeClear(pDataInfo->pRsp);
364✔
1613
        continue;
364✔
1614
      }
1615

1616
      break;
4,311,988✔
1617
    }
1618

1619
    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
4,311,988✔
1620
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
609✔
1621
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1622
             tstrerror(pDataInfo->code));
1623
      pOperator->pTaskInfo->code = pDataInfo->code;
609✔
1624
      return pOperator->pTaskInfo->code;
609✔
1625
    }
1626

1627
    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
4,311,379✔
1628
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
4,311,379✔
1629

1630
    if (pRsp->numOfRows == 0) {
4,311,379✔
1631
      qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
1,274,411✔
1632
             " execId:%d %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", pDataInfo,
1633
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1634
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);
1635

1636
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
1,274,411✔
1637
      if (isVstbScan(pDataInfo) || isVstbTagScan(pDataInfo)) {
1,274,411✔
1638
        pExchangeInfo->current = totalSources;
1,176,173✔
1639
      } else {
1640
        pExchangeInfo->current += 1;
98,238✔
1641
      }
1642
      taosMemoryFreeClear(pDataInfo->pRsp);
1,274,411✔
1643
      continue;
1,274,411✔
1644
    }
1645

1646
    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
3,036,968✔
1647
    if (code != TSDB_CODE_SUCCESS) {
3,036,968✔
1648
      goto _error;
×
1649
    }
1650

1651
    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
3,036,968✔
1652
    if (pRsp->completed == 1) {
3,036,968✔
1653
      qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
107,181✔
1654
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, pDataInfo,
1655
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1656
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
1657
             pExchangeInfo->current + 1, totalSources);
1658

1659
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
107,181✔
1660
      if (isVstbScan(pDataInfo)) {
107,181✔
1661
        pExchangeInfo->current = totalSources;
×
1662
      } else {
1663
        pExchangeInfo->current += 1;
107,181✔
1664
      }
1665
    } else {
1666
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
2,929,787✔
1667
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
1668
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1669
             pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
1670
    }
1671
    if (pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData) {
3,036,968✔
1672
      taosArrayClear(pExchangeInfo->pSourceDataInfo);
2,843,477✔
1673
    }
1674
    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
3,036,968✔
1675
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;
3,036,968✔
1676

1677
    taosMemoryFreeClear(pDataInfo->pRsp);
3,036,968✔
1678
    return TSDB_CODE_SUCCESS;
3,036,968✔
1679
  }
1680

1681
_error:
×
1682
  pTaskInfo->code = code;
×
1683
  return code;
×
1684
}
1685

1686
void clearVtbScanDataInfo(void* pItem) {
1,028,050✔
1687
  SSourceDataInfo *pInfo = (SSourceDataInfo *)pItem;
1,028,050✔
1688
  if (pInfo->orgTbInfo) {
1,028,050✔
1689
    taosArrayDestroy(pInfo->orgTbInfo->colMap);
×
1690
    taosMemoryFreeClear(pInfo->orgTbInfo);
×
1691
  }
1692
  if (pInfo->batchOrgTbInfo) {
1,028,050✔
1693
    for (int32_t i = 0; i < taosArrayGetSize(pInfo->batchOrgTbInfo); ++i) {
×
1694
      SOrgTbInfo* pColMap = taosArrayGet(pInfo->batchOrgTbInfo, i);
×
1695
      if (pColMap) {
×
1696
        taosArrayDestroy(pColMap->colMap);
×
1697
      }
1698
    }
1699
    taosArrayDestroy(pInfo->batchOrgTbInfo);
×
1700
  }
1701
  if (pInfo->tagList) {
1,028,050✔
1702
    taosArrayDestroyEx(pInfo->tagList, destroyTagVal);
×
1703
    pInfo->tagList = NULL;
×
1704
  }
1705
  taosArrayDestroy(pInfo->pSrcUidList);
1,028,050✔
1706
}
1,028,050✔
1707

UNCOV
1708
static int32_t loadTagListFromBasicParam(SSourceDataInfo* pDataInfo, SExchangeOperatorBasicParam* pBasicParam) {
×
UNCOV
1709
  int32_t  code = TSDB_CODE_SUCCESS;
×
UNCOV
1710
  int32_t  lino = 0;
×
UNCOV
1711
  STagVal  dstTag;
×
UNCOV
1712
  bool     needFree = false;
×
1713

UNCOV
1714
  if (pDataInfo->tagList) {
×
1715
    taosArrayClear(pDataInfo->tagList);
×
1716
  }
1717

UNCOV
1718
  if (pBasicParam->tagList) {
×
UNCOV
1719
    pDataInfo->tagList = taosArrayInit(1, sizeof(STagVal));
×
UNCOV
1720
    QUERY_CHECK_NULL(pDataInfo->tagList, code, lino, _return, terrno);
×
1721

UNCOV
1722
    for (int32_t i = 0; i < taosArrayGetSize(pBasicParam->tagList); ++i) {
×
UNCOV
1723
      STagVal *pSrcTag = (STagVal*)taosArrayGet(pBasicParam->tagList, i);
×
UNCOV
1724
      QUERY_CHECK_NULL(pSrcTag, code, lino, _return, terrno);
×
1725

UNCOV
1726
      dstTag = (STagVal){0};
×
UNCOV
1727
      dstTag.type = pSrcTag->type;
×
UNCOV
1728
      dstTag.cid = pSrcTag->cid;
×
UNCOV
1729
      if (IS_VAR_DATA_TYPE(pSrcTag->type)) {
×
UNCOV
1730
        dstTag.nData = pSrcTag->nData;
×
UNCOV
1731
        dstTag.pData = taosMemoryMalloc(dstTag.nData);
×
UNCOV
1732
        QUERY_CHECK_NULL(dstTag.pData, code, lino, _return, terrno);
×
UNCOV
1733
        needFree = true;
×
UNCOV
1734
        memcpy(dstTag.pData, pSrcTag->pData, dstTag.nData);
×
1735
      } else {
UNCOV
1736
        dstTag.i64 = pSrcTag->i64;
×
1737
      }
1738

UNCOV
1739
      QUERY_CHECK_NULL(taosArrayPush(pDataInfo->tagList, &dstTag), code, lino, _return, terrno);
×
UNCOV
1740
      needFree = false;
×
1741
    }
1742
  } else {
UNCOV
1743
    pDataInfo->tagList = NULL;
×
1744
  }
1745

UNCOV
1746
  return code;
×
1747
_return:
×
1748
  if (needFree) {
×
1749
    taosMemoryFreeClear(dstTag.pData);
×
1750
  }
1751
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1752
  return code;
×
1753
}
1754

UNCOV
1755
int32_t loadBatchColMapFromBasicParam(SSourceDataInfo* pDataInfo, SExchangeOperatorBasicParam* pBasicParam) {
×
UNCOV
1756
  int32_t     code = TSDB_CODE_SUCCESS;
×
UNCOV
1757
  int32_t     lino = 0;
×
UNCOV
1758
  SOrgTbInfo  dstOrgTbInfo = {0};
×
UNCOV
1759
  bool        needFree = false;
×
1760

UNCOV
1761
  if (pBasicParam->batchOrgTbInfo) {
×
UNCOV
1762
    pDataInfo->batchOrgTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
×
UNCOV
1763
    QUERY_CHECK_NULL(pDataInfo->batchOrgTbInfo, code, lino, _return, terrno);
×
1764

UNCOV
1765
    for (int32_t i = 0; i < taosArrayGetSize(pBasicParam->batchOrgTbInfo); ++i) {
×
UNCOV
1766
      SOrgTbInfo* pSrcOrgTbInfo = taosArrayGet(pBasicParam->batchOrgTbInfo, i);
×
UNCOV
1767
      QUERY_CHECK_NULL(pSrcOrgTbInfo, code, lino, _return, terrno);
×
1768

UNCOV
1769
      dstOrgTbInfo = (SOrgTbInfo){0};
×
UNCOV
1770
      dstOrgTbInfo.vgId = pSrcOrgTbInfo->vgId;
×
UNCOV
1771
      tstrncpy(dstOrgTbInfo.tbName, pSrcOrgTbInfo->tbName, TSDB_TABLE_FNAME_LEN);
×
1772

UNCOV
1773
      dstOrgTbInfo.colMap = taosArrayDup(pSrcOrgTbInfo->colMap, NULL);
×
UNCOV
1774
      QUERY_CHECK_NULL(dstOrgTbInfo.colMap, code, lino, _return, terrno);
×
1775

UNCOV
1776
      needFree = true;
×
UNCOV
1777
      QUERY_CHECK_NULL(taosArrayPush(pDataInfo->batchOrgTbInfo, &dstOrgTbInfo), code, lino, _return, terrno);
×
UNCOV
1778
      needFree = false;
×
1779
    }
1780
  } else {
1781
    pBasicParam->batchOrgTbInfo = NULL;
×
1782
  }
1783

UNCOV
1784
  return code;
×
1785
_return:
×
1786
  if (needFree) {
×
1787
    taosArrayDestroy(dstOrgTbInfo.colMap);
×
1788
  }
1789
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1790
  return code;
×
1791
}
1792

1793
int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasicParam* pBasicParam) {
4,409,567✔
1794
  int32_t            code = TSDB_CODE_SUCCESS;
4,409,567✔
1795
  int32_t            lino = 0;
4,409,567✔
1796
  SExchangeInfo*     pExchangeInfo = pOperator->info;
4,409,567✔
1797
  SExchangeSrcIndex* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
4,409,567✔
1798

1799
  if (NULL == pIdx) {
4,409,567✔
1800
    if (pBasicParam->isNewDeployed) {
2,189✔
1801
      SDownstreamSourceNode *pNode = NULL;
2,189✔
1802
      code = nodesCloneNode((SNode*)&pBasicParam->newDeployedSrc, (SNode**)&pNode);
2,189✔
1803
      QUERY_CHECK_CODE(code, lino, _return);
2,189✔
1804

1805
      SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pOperator->pPhyNode;
2,189✔
1806
      code = nodesListMakeStrictAppend(&pExchange->pSrcEndPoints, (SNode*)pNode);
2,189✔
1807
      QUERY_CHECK_CODE(code, lino, _return);
2,189✔
1808

1809
      void* tmp = taosArrayPush(pExchangeInfo->pSources, pNode);
2,189✔
1810
      QUERY_CHECK_NULL(tmp, code, lino, _return, terrno);
2,189✔
1811

1812
      SExchangeSrcIndex idx = {.srcIdx = taosArrayGetSize(pExchangeInfo->pSources) - 1, .inUseIdx = -1};
2,189✔
1813
      code = tSimpleHashPut(pExchangeInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
2,189✔
1814
      if (pExchangeInfo->pHashSources) {
2,189✔
1815
        QUERY_CHECK_CODE(code, lino, _return);
2,189✔
1816
      }
1817
      pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
2,189✔
1818
      QUERY_CHECK_NULL(pIdx, code, lino, _return, TSDB_CODE_INVALID_PARA);
2,189✔
1819
    } else {
1820
      qError("No exchange source for vgId: %d", pBasicParam->vgId);
×
1821
      return TSDB_CODE_INVALID_PARA;
×
1822
    }
1823
  }
1824

1825
  qDebug("start to add single exchange source");
4,409,567✔
1826

1827
  switch (pBasicParam->type) {
4,409,567✔
UNCOV
1828
    case EX_SRC_TYPE_VSTB_AGG_SCAN: {
×
UNCOV
1829
      if (pIdx->inUseIdx < 0) {
×
UNCOV
1830
        SSourceDataInfo dataInfo = {0};
×
UNCOV
1831
        dataInfo.status = EX_SOURCE_DATA_NOT_READY;
×
UNCOV
1832
        dataInfo.taskId = pExchangeInfo->pTaskId;
×
UNCOV
1833
        dataInfo.index = pIdx->srcIdx;
×
UNCOV
1834
        dataInfo.groupid = pBasicParam->groupid;
×
UNCOV
1835
        dataInfo.window = pBasicParam->window;
×
UNCOV
1836
        dataInfo.isNewParam = pBasicParam->isNewParam;
×
UNCOV
1837
        dataInfo.batchOrgTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
×
UNCOV
1838
        QUERY_CHECK_NULL(dataInfo.batchOrgTbInfo, code, lino, _return, terrno);
×
1839

UNCOV
1840
        code = loadTagListFromBasicParam(&dataInfo, pBasicParam);
×
UNCOV
1841
        QUERY_CHECK_CODE(code, lino, _return);
×
1842

UNCOV
1843
        code = loadBatchColMapFromBasicParam(&dataInfo, pBasicParam);
×
UNCOV
1844
        QUERY_CHECK_CODE(code, lino, _return);
×
1845

UNCOV
1846
        dataInfo.orgTbInfo = NULL;
×
1847

UNCOV
1848
        dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
×
UNCOV
1849
        QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
×
1850

UNCOV
1851
        dataInfo.type = pBasicParam->type;
×
UNCOV
1852
        dataInfo.srcOpType = pBasicParam->srcOpType;
×
UNCOV
1853
        dataInfo.tableSeq = pBasicParam->tableSeq;
×
1854

UNCOV
1855
        QUERY_CHECK_NULL(taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
×
1856

UNCOV
1857
        pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
×
1858
      } else {
UNCOV
1859
        SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
×
UNCOV
1860
        QUERY_CHECK_NULL(pDataInfo, code, lino, _return, terrno);
×
1861

UNCOV
1862
        if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
×
UNCOV
1863
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
×
1864
        }
1865

UNCOV
1866
        pDataInfo->taskId = pExchangeInfo->pTaskId;
×
UNCOV
1867
        pDataInfo->index = pIdx->srcIdx;
×
UNCOV
1868
        pDataInfo->window = pBasicParam->window;
×
UNCOV
1869
        pDataInfo->groupid = pBasicParam->groupid;
×
UNCOV
1870
        pDataInfo->isNewParam = pBasicParam->isNewParam;
×
1871

UNCOV
1872
        code = loadTagListFromBasicParam(pDataInfo, pBasicParam);
×
UNCOV
1873
        QUERY_CHECK_CODE(code, lino, _return);
×
1874

UNCOV
1875
        code = loadBatchColMapFromBasicParam(pDataInfo, pBasicParam);
×
UNCOV
1876
        QUERY_CHECK_CODE(code, lino, _return);
×
1877

UNCOV
1878
        pDataInfo->orgTbInfo = NULL;
×
1879

UNCOV
1880
        pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
×
UNCOV
1881
        QUERY_CHECK_NULL(pDataInfo->pSrcUidList, code, lino, _return, terrno);
×
1882

UNCOV
1883
        pDataInfo->type = pBasicParam->type;
×
UNCOV
1884
        pDataInfo->srcOpType = pBasicParam->srcOpType;
×
UNCOV
1885
        pDataInfo->tableSeq = pBasicParam->tableSeq;
×
1886
      }
UNCOV
1887
      break;
×
1888
    }
1889
    case EX_SRC_TYPE_VSTB_WIN_SCAN:
704,740✔
1890
    case EX_SRC_TYPE_VSTB_TAG_SCAN: {
1891
      SSourceDataInfo dataInfo = {0};
704,740✔
1892
      dataInfo.status = EX_SOURCE_DATA_NOT_READY;
704,740✔
1893
      dataInfo.taskId = pExchangeInfo->pTaskId;
704,740✔
1894
      dataInfo.index = pIdx->srcIdx;
704,740✔
1895
      dataInfo.window = pBasicParam->window;
704,740✔
1896
      dataInfo.groupid = 0;
704,740✔
1897
      dataInfo.orgTbInfo = NULL;
704,740✔
1898
      dataInfo.tagList = NULL;
704,740✔
1899

1900
      dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
704,740✔
1901
      QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
704,740✔
1902

1903
      dataInfo.isNewParam = false;
704,740✔
1904
      dataInfo.type = pBasicParam->type;
704,740✔
1905
      dataInfo.srcOpType = pBasicParam->srcOpType;
704,740✔
1906
      dataInfo.tableSeq = pBasicParam->tableSeq;
704,740✔
1907

1908
      taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
704,740✔
1909
      QUERY_CHECK_NULL(taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
1,409,480✔
1910
      break;
704,740✔
1911
    }
1912
    case EX_SRC_TYPE_VSTB_SCAN: {
3,480,816✔
1913
      SSourceDataInfo dataInfo = {0};
3,480,816✔
1914
      dataInfo.status = EX_SOURCE_DATA_NOT_READY;
3,480,816✔
1915
      dataInfo.taskId = pExchangeInfo->pTaskId;
3,480,816✔
1916
      dataInfo.index = pIdx->srcIdx;
3,480,816✔
1917
      dataInfo.window = pBasicParam->window;
3,480,816✔
1918
      dataInfo.groupid = 0;
3,480,816✔
1919
      dataInfo.isNewParam = pBasicParam->isNewParam;
3,480,816✔
1920
      dataInfo.tagList = NULL;
3,480,816✔
1921
      dataInfo.orgTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
3,480,816✔
1922
      QUERY_CHECK_NULL(dataInfo.orgTbInfo, code, lino, _return, terrno);
3,480,816✔
1923
      dataInfo.orgTbInfo->vgId = pBasicParam->orgTbInfo->vgId;
3,480,816✔
1924
      tstrncpy(dataInfo.orgTbInfo->tbName, pBasicParam->orgTbInfo->tbName, TSDB_TABLE_FNAME_LEN);
3,480,816✔
1925
      dataInfo.orgTbInfo->colMap = taosArrayDup(pBasicParam->orgTbInfo->colMap, NULL);
3,480,816✔
1926
      QUERY_CHECK_NULL(dataInfo.orgTbInfo->colMap, code, lino, _return, terrno);
3,480,816✔
1927

1928
      dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
3,480,816✔
1929
      QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
3,480,816✔
1930

1931
      dataInfo.type = pBasicParam->type;
3,480,816✔
1932
      dataInfo.srcOpType = pBasicParam->srcOpType;
3,480,816✔
1933
      dataInfo.tableSeq = pBasicParam->tableSeq;
3,480,816✔
1934

1935
      taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
3,480,816✔
1936
      QUERY_CHECK_NULL( taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
6,961,632✔
1937
      break;
3,480,816✔
1938
    }
1939
    case EX_SRC_TYPE_STB_JOIN_SCAN:
224,011✔
1940
    default: {
1941
      if (pIdx->inUseIdx < 0) {
224,011✔
1942
        SSourceDataInfo dataInfo = {0};
221,881✔
1943
        dataInfo.status = EX_SOURCE_DATA_NOT_READY;
221,881✔
1944
        dataInfo.taskId = pExchangeInfo->pTaskId;
221,881✔
1945
        dataInfo.index = pIdx->srcIdx;
221,881✔
1946
        dataInfo.groupid = 0;
221,881✔
1947
        dataInfo.tagList = NULL;
221,881✔
1948

1949
        dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
221,881✔
1950
        QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
221,881✔
1951

1952
        dataInfo.isNewParam = false;
221,881✔
1953
        dataInfo.type = pBasicParam->type;
221,881✔
1954
        dataInfo.srcOpType = pBasicParam->srcOpType;
221,881✔
1955
        dataInfo.tableSeq = pBasicParam->tableSeq;
221,881✔
1956

1957
        QUERY_CHECK_NULL(taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
443,762✔
1958

1959
        pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
221,881✔
1960
      } else {
1961
        SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
2,130✔
1962
        QUERY_CHECK_NULL(pDataInfo, code, lino, _return, terrno);
2,130✔
1963
        if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
2,130✔
1964
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
2,130✔
1965
        }
1966

1967
        pDataInfo->tagList = NULL;
2,130✔
1968
        pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
2,130✔
1969
        QUERY_CHECK_NULL(pDataInfo->pSrcUidList, code, lino, _return, terrno);
2,130✔
1970

1971
        pDataInfo->groupid = 0;
2,130✔
1972
        pDataInfo->isNewParam = false;
2,130✔
1973
        pDataInfo->type = pBasicParam->type;
2,130✔
1974
        pDataInfo->srcOpType = pBasicParam->srcOpType;
2,130✔
1975
        pDataInfo->tableSeq = pBasicParam->tableSeq;
2,130✔
1976
      }
1977
      break;
224,011✔
1978
    }
1979
  }
1980

1981
  return code;
4,409,567✔
1982
_return:
×
1983
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1984
  return code;
×
1985
}
1986

1987
int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
4,313,639✔
1988
  SExchangeInfo*               pExchangeInfo = pOperator->info;
4,313,639✔
1989
  int32_t                      code = TSDB_CODE_SUCCESS;
4,313,639✔
1990
  SExchangeOperatorBasicParam* pBasicParam = NULL;
4,313,639✔
1991
  SExchangeOperatorParam*      pParam = (SExchangeOperatorParam*)pOperator->pOperatorGetParam->value;
4,313,639✔
1992
  if (pParam->multiParams) {
4,313,639✔
1993
    SExchangeOperatorBatchParam* pBatch = (SExchangeOperatorBatchParam*)pOperator->pOperatorGetParam->value;
123,823✔
1994
    int32_t                      iter = 0;
123,823✔
1995
    while (NULL != (pBasicParam = tSimpleHashIterate(pBatch->pBatchs, pBasicParam, &iter))) {
343,574✔
1996
      code = addSingleExchangeSource(pOperator, pBasicParam);
219,751✔
1997
      if (code) {
219,751✔
1998
        return code;
×
1999
      }
2000
    }
2001
  } else {
2002
    pBasicParam = &pParam->basic;
4,189,816✔
2003
    code = addSingleExchangeSource(pOperator, pBasicParam);
4,189,816✔
2004
  }
2005

2006
  freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
4,313,639✔
2007
  pOperator->pOperatorGetParam = NULL;
4,313,639✔
2008

2009
  return code;
4,313,639✔
2010
}
2011

2012
int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
476,445,886✔
2013
  SExchangeInfo* pExchangeInfo = pOperator->info;
476,445,886✔
2014
  int32_t        code = TSDB_CODE_SUCCESS;
476,447,925✔
2015
  int32_t        lino = 0;
476,447,925✔
2016
  
2017
  if ((OPTR_IS_OPENED(pOperator) && !pExchangeInfo->dynamicOp) ||
476,447,925✔
2018
      (pExchangeInfo->dynamicOp && NULL == pOperator->pOperatorGetParam)) {
92,549,292✔
2019
    qDebug("skip prepare, opened:%d, dynamicOp:%d, getParam:%p", OPTR_IS_OPENED(pOperator), pExchangeInfo->dynamicOp, pOperator->pOperatorGetParam);
384,376,773✔
2020
    return TSDB_CODE_SUCCESS;
384,377,812✔
2021
  }
2022

2023
  if (pExchangeInfo->dynamicOp) {
92,069,461✔
2024
    code = addDynamicExchangeSource(pOperator);
4,313,639✔
2025
    QUERY_CHECK_CODE(code, lino, _end);
4,313,639✔
2026
  }
2027

2028
  if (pOperator->status == OP_NOT_OPENED && (pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData) || IS_STREAM_MODE(pOperator->pTaskInfo)) {
92,068,786✔
2029
    pExchangeInfo->current = 0;
8,967,303✔
2030
  }
2031

2032
  int64_t st = taosGetTimestampUs();
92,066,841✔
2033

2034
  if (!IS_STREAM_MODE(pOperator->pTaskInfo) && !pExchangeInfo->seqLoadData) {
92,066,841✔
2035
    code = prepareConcurrentlyLoad(pOperator);
82,553,410✔
2036
    QUERY_CHECK_CODE(code, lino, _end);
82,554,707✔
2037
    pExchangeInfo->openedTs = taosGetTimestampUs();
82,554,559✔
2038
  }
2039

2040
  OPTR_SET_OPENED(pOperator);
92,070,612✔
2041
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
92,069,543✔
2042

2043
  qDebug("%s prepare load complete", pOperator->pTaskInfo->id.str);
92,069,429✔
2044

2045
_end:
23,975,420✔
2046
  if (code != TSDB_CODE_SUCCESS) {
92,068,250✔
2047
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2048
    pOperator->pTaskInfo->code = code;
×
2049
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
2050
  }
2051
  return TSDB_CODE_SUCCESS;
92,068,250✔
2052
}
2053

2054
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
3,842,034✔
2055
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
3,842,034✔
2056

2057
  if (pLimitInfo->remainGroupOffset > 0) {
3,842,034✔
2058
    if (pLimitInfo->currentGroupId == 0) {  // it is the first group
×
2059
      pLimitInfo->currentGroupId = pBlock->info.id.groupId;
×
2060
      blockDataCleanup(pBlock);
×
2061
      return PROJECT_RETRIEVE_CONTINUE;
×
2062
    } else if (pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
×
2063
      // now it is the data from a new group
2064
      pLimitInfo->remainGroupOffset -= 1;
×
2065

2066
      // ignore data block in current group
2067
      if (pLimitInfo->remainGroupOffset > 0) {
×
2068
        blockDataCleanup(pBlock);
×
2069
        return PROJECT_RETRIEVE_CONTINUE;
×
2070
      }
2071
    }
2072

2073
    // set current group id of the project operator
2074
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
×
2075
  }
2076

2077
  // here check for a new group data, we need to handle the data of the previous group.
2078
  if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
3,842,034✔
2079
    pLimitInfo->numOfOutputGroups += 1;
180,998✔
2080
    if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
180,998✔
2081
      pOperator->status = OP_EXEC_DONE;
×
2082
      blockDataCleanup(pBlock);
×
2083

2084
      return PROJECT_RETRIEVE_DONE;
×
2085
    }
2086

2087
    // reset the value for a new group data
2088
    resetLimitInfoForNextGroup(pLimitInfo);
180,998✔
2089
    // existing rows that belongs to previous group.
2090
    if (pBlock->info.rows > 0) {
180,998✔
2091
      return PROJECT_RETRIEVE_DONE;
180,998✔
2092
    }
2093
  }
2094

2095
  // here we reach the start position, according to the limit/offset requirements.
2096

2097
  // set current group id
2098
  pLimitInfo->currentGroupId = pBlock->info.id.groupId;
3,661,036✔
2099

2100
  bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pTaskInfo);
3,661,036✔
2101
  if (pBlock->info.rows == 0) {
3,661,036✔
2102
    return PROJECT_RETRIEVE_CONTINUE;
1,888,345✔
2103
  } else {
2104
    if (limitReached && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
1,772,691✔
2105
      setOperatorCompleted(pOperator);
×
2106
      return PROJECT_RETRIEVE_DONE;
×
2107
    }
2108
  }
2109

2110
  // todo optimize performance
2111
  // If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the
2112
  // they may not belong to the same group the limit/offset value is not valid in this case.
2113
  if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || hasSlimitOffsetInfo(pLimitInfo)) {
1,772,691✔
2114
    return PROJECT_RETRIEVE_DONE;
1,772,691✔
2115
  } else {  // not full enough, continue to accumulate the output data in the buffer.
2116
    return PROJECT_RETRIEVE_CONTINUE;
×
2117
  }
2118
}
2119

2120
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo) {
155,972,507✔
2121
  SExecTaskInfo* pTask = pOperator->pTaskInfo;
155,972,507✔
2122
  int32_t        code = TSDB_CODE_SUCCESS;
155,973,351✔
2123
  if (pTask->pWorkerCb) {
155,973,351✔
2124
    code = pTask->pWorkerCb->beforeBlocking(pTask->pWorkerCb->pPool);
155,973,325✔
2125
    if (code != TSDB_CODE_SUCCESS) {
155,973,834✔
2126
      pTask->code = code;
×
2127
      return pTask->code;
×
2128
    }
2129
  }
2130

2131
  code = tsem_wait(&pExchangeInfo->ready);
155,973,016✔
2132
  if (code != TSDB_CODE_SUCCESS) {
155,973,834✔
2133
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2134
    pTask->code = code;
×
2135
    return pTask->code;
×
2136
  }
2137

2138
  if (pTask->pWorkerCb) {
155,973,834✔
2139
    code = pTask->pWorkerCb->afterRecoverFromBlocking(pTask->pWorkerCb->pPool);
155,973,834✔
2140
    if (code != TSDB_CODE_SUCCESS) {
155,973,834✔
2141
      pTask->code = code;
×
2142
      return pTask->code;
×
2143
    }
2144
  }
2145
  return TSDB_CODE_SUCCESS;
155,973,834✔
2146
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc