• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4917

07 Jan 2026 03:52PM UTC coverage: 65.42% (+0.02%) from 65.402%
#4917

push

travis-ci

web-flow
merge: from main to 3.0 branch #34204

31 of 34 new or added lines in 2 files covered. (91.18%)

819 existing lines in 129 files now uncovered.

202679 of 309814 relevant lines covered (65.42%)

116724351.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.64
/source/libs/executor/src/exchangeoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "operator.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "tdatablock.h"
23
#include "thash.h"
24
#include "tmsg.h"
25
#include "tref.h"
26
#include "trpc.h"
27

28
typedef struct SFetchRspHandleWrapper {
29
  int64_t  exchangeId;
30
  int32_t  sourceIndex;
31
  int64_t  seqId;
32
} SFetchRspHandleWrapper;
33

34
typedef struct SSourceDataInfo {
35
  int32_t             index;
36
  int64_t             seqId;
37
  SRWLatch            lock;
38
  SRetrieveTableRsp*  pRsp;
39
  uint64_t            totalRows;
40
  int64_t             startTime;
41
  int32_t             code;
42
  EX_SOURCE_STATUS    status;
43
  const char*         taskId;
44
  SArray*             pSrcUidList;
45
  int32_t             srcOpType;
46
  bool                tableSeq;
47
  char*               decompBuf;
48
  int32_t             decompBufSize;
49
  SOrgTbInfo*         orgTbInfo;
50
  SArray*             batchOrgTbInfo; // SArray<SOrgTbInfo>
51
  SArray*             tagList;
52
  EExchangeSourceType type;
53
  bool                isNewParam;
54
  STimeWindow         window;
55
  uint64_t            groupid;
56
  bool                fetchSent; // need reset
57
} SSourceDataInfo;
58

59
static void destroyExchangeOperatorInfo(void* param);
60
static void freeBlock(void* pParam);
61
static void freeSourceDataInfo(void* param);
62
static void setAllSourcesCompleted(SOperatorInfo* pOperator);
63

64
static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code);
65
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex);
66
static int32_t getCompletedSources(const SArray* pArray, int32_t* pRes);
67
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator);
68
static int32_t seqLoadRemoteData(SOperatorInfo* pOperator);
69
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator);
70
static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock,
71
                                 bool holdDataInBuf);
72
static int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo);
73

74
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo);
75

76
static bool isVstbScan(SSourceDataInfo* pDataInfo) {return pDataInfo->type == EX_SRC_TYPE_VSTB_SCAN; }
12,817,136✔
77
static bool isVstbWinScan(SSourceDataInfo* pDataInfo) { return pDataInfo->type == EX_SRC_TYPE_VSTB_WIN_SCAN; }
×
78
static bool isVstbAggScan(SSourceDataInfo* pDataInfo) { return pDataInfo->type == EX_SRC_TYPE_VSTB_AGG_SCAN; }
×
79
static bool isVstbTagScan(SSourceDataInfo* pDataInfo) { return pDataInfo->type == EX_SRC_TYPE_VSTB_TAG_SCAN; }
10,504,526✔
80
static bool isStbJoinScan(SSourceDataInfo* pDataInfo) { return pDataInfo->type == EX_SRC_TYPE_STB_JOIN_SCAN; }
×
81

82

83
static void streamConcurrentlyLoadRemoteData(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
8,664,047✔
84
                                           SExecTaskInfo* pTaskInfo) {
85
  int32_t code = 0;
8,664,047✔
86
  int32_t lino = 0;
8,664,047✔
87
  int64_t startTs = taosGetTimestampUs();  
8,664,251✔
88
  int32_t  totalSources = (int32_t)taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
8,664,251✔
89
  int32_t completed = 0;
8,664,047✔
90
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
8,664,251✔
91
  if (code != TSDB_CODE_SUCCESS) {
8,664,047✔
92
    pTaskInfo->code = code;
×
93
    T_LONG_JMP(pTaskInfo->env, code);
×
94
  }
95
  if (completed == totalSources) {
8,664,047✔
96
    qDebug("%s no load since all sources completed, completed:%d, totalSources:%d", pTaskInfo->id.str, completed, totalSources);
1,628,610✔
97
    setAllSourcesCompleted(pOperator);
1,628,610✔
98
    return;
1,630,561✔
99
  }
100

101
  SSourceDataInfo* pDataInfo = NULL;
7,035,437✔
102

103
  while (1) {
4,173,370✔
104
    if (pExchangeInfo->current < 0) {
11,208,807✔
105
      qDebug("current %d and all sources complted, totalSources:%d", pExchangeInfo->current, totalSources);
113,236✔
106
      setAllSourcesCompleted(pOperator);
113,236✔
107
      return;
113,236✔
108
    }
109
    
110
    if (pExchangeInfo->current >= totalSources) {
11,095,571✔
111
      completed = 0;
5,106,280✔
112
      code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
5,106,280✔
113
      if (code != TSDB_CODE_SUCCESS) {
5,106,076✔
114
        pTaskInfo->code = code;
×
115
        T_LONG_JMP(pTaskInfo->env, code);
×
116
      }
117
      if (completed == totalSources) {
5,106,076✔
118
        qDebug("stop to load since all sources complted, completed:%d, totalSources:%d", completed, totalSources);
3,590,258✔
119
        setAllSourcesCompleted(pOperator);
3,590,258✔
120
        return;
3,590,258✔
121
      }
122
      
123
      pExchangeInfo->current = 0;
1,515,818✔
124
    }
125

126
    qDebug("%s start stream exchange %p idx:%d fetch", GET_TASKID(pTaskInfo), pExchangeInfo, pExchangeInfo->current);
7,505,077✔
127

128
    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
7,505,077✔
129
    if (!pDataInfo) {
7,505,313✔
130
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
131
      pTaskInfo->code = terrno;
×
132
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
133
    }
134

135
    if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
7,505,313✔
136
      pExchangeInfo->current++;
960✔
137
      continue;
960✔
138
    }
139

140
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
7,504,353✔
141

142
    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
7,504,557✔
143
    if (code != TSDB_CODE_SUCCESS) {
7,504,349✔
144
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
145
      pTaskInfo->code = code;
×
146
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
147
    }
148

149
    while (true) {
1,203✔
150
      code = exchangeWait(pOperator, pExchangeInfo);
7,505,552✔
151
      if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
7,505,760✔
152
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
1,203✔
153
      }
154

155
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
7,504,557✔
156
      if (pDataInfo->seqId != currSeqId) {
7,504,557✔
157
        qDebug("%s seq rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", 
1,203✔
158
            GET_TASKID(pTaskInfo), pDataInfo->seqId, pExchangeInfo, currSeqId);
159
        taosMemoryFreeClear(pDataInfo->pRsp);
1,203✔
160
        continue;
1,203✔
161
      }
162

163
      break;
7,503,354✔
164
    }
165

166
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
7,503,354✔
167
    if (!pSource) {
7,503,354✔
168
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
169
      pTaskInfo->code = terrno;
×
170
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
171
    }
172

173
    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
7,503,354✔
UNCOV
174
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
×
175
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
176
             tstrerror(pDataInfo->code));
UNCOV
177
      pTaskInfo->code = pDataInfo->code;
×
UNCOV
178
      T_LONG_JMP(pTaskInfo->env, code);
×
179
    }
180

181
    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
7,502,623✔
182
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
7,503,354✔
183

184
    if (pRsp->numOfRows == 0) {
7,503,354✔
185
      qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
4,172,410✔
186
             " execId:%d idx %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", pDataInfo,
187
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
188
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);
189

190
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
4,172,410✔
191
      if (isVstbScan(pDataInfo) || isVstbTagScan(pDataInfo)) {
4,172,410✔
192
        pExchangeInfo->current = -1;
113,236✔
193
      } else {
194
        pExchangeInfo->current += 1;
4,059,174✔
195
      }
196
      taosMemoryFreeClear(pDataInfo->pRsp);
4,172,410✔
197
      continue;
4,172,410✔
198
    }
199

200
    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
3,330,944✔
201
    TAOS_CHECK_EXIT(code);
3,330,944✔
202

203
    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
3,330,944✔
204
    if (pRsp->completed == 1) {
3,330,944✔
205
      qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
1,750,707✔
206
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%d", pDataInfo,
207
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
208
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
209
             pExchangeInfo->current + 1, totalSources);
210

211
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
1,750,707✔
212
      if (isVstbScan(pDataInfo)) {
1,750,707✔
213
        pExchangeInfo->current = -1;
×
214
        taosMemoryFreeClear(pDataInfo->pRsp);
×
215
        continue;
×
216
      }
217
    } else {
218
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d idx:%d numOfRows:%" PRId64
1,580,237✔
219
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
220
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
221
             pExchangeInfo->current, pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
222
    }
223

224
    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
3,330,944✔
225
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;
3,330,944✔
226

227
    pExchangeInfo->current++;
3,330,944✔
228

229
    taosMemoryFreeClear(pDataInfo->pRsp);
3,330,944✔
230
    return;
3,330,944✔
231
  }
232

233
_exit:
×
234

235
  if (code) {
×
236
    pTaskInfo->code = code;
×
237
    qError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
×
238
  }
239
}
240

241

242
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
219,926,910✔
243
                                           SExecTaskInfo* pTaskInfo) {
244
  int32_t code = 0;
219,926,910✔
245
  int32_t lino = 0;
219,926,910✔
246
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
219,926,910✔
247
  int32_t completed = 0;
219,925,202✔
248
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
219,925,202✔
249
  if (code != TSDB_CODE_SUCCESS) {
219,927,478✔
250
    pTaskInfo->code = code;
×
251
    T_LONG_JMP(pTaskInfo->env, code);
×
252
  }
253
  if (completed == totalSources) {
219,927,478✔
254
    setAllSourcesCompleted(pOperator);
70,645,095✔
255
    return;
70,645,942✔
256
  }
257

258
  SSourceDataInfo* pDataInfo = NULL;
149,282,383✔
259

260
  while (1) {
17,118,206✔
261
    qDebug("prepare wait for ready, %p, %s", pExchangeInfo, GET_TASKID(pTaskInfo));
166,400,589✔
262
    code = exchangeWait(pOperator, pExchangeInfo);
166,400,589✔
263

264
    if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
166,402,472✔
265
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
11,018✔
266
    }
267

268
    for (int32_t i = 0; i < totalSources; ++i) {
243,716,111✔
269
      pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
243,715,343✔
270
      QUERY_CHECK_NULL(pDataInfo, code, lino, _exit, terrno);
243,715,346✔
271
      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
243,715,346✔
272
        continue;
54,048,992✔
273
      }
274

275
      if (pDataInfo->status != EX_SOURCE_DATA_READY) {
189,667,403✔
276
        continue;
23,275,381✔
277
      }
278

279
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
166,392,022✔
280
      if (pDataInfo->seqId != currSeqId) {
166,391,738✔
281
        qDebug("concurrent rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pDataInfo->seqId, pExchangeInfo, currSeqId);
×
282
        taosMemoryFreeClear(pDataInfo->pRsp);
×
283
        break;
×
284
      }
285

286
      if (pDataInfo->code != TSDB_CODE_SUCCESS) {
166,391,170✔
287
        code = pDataInfo->code;
2,254✔
288
        TAOS_CHECK_EXIT(code);
2,254✔
289
      }
290

291
      tmemory_barrier();
166,390,052✔
292
      SRetrieveTableRsp*     pRsp = pDataInfo->pRsp;
166,390,052✔
293
      SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
166,389,768✔
294
      QUERY_CHECK_NULL(pSource, code, lino, _exit, terrno);
166,388,632✔
295

296
      // todo
297
      SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
166,388,632✔
298
      if (pRsp->numOfRows == 0) {
166,389,768✔
299
        if (NULL != pDataInfo->pSrcUidList && !isVstbScan(pDataInfo)) {
38,208,977✔
300
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
×
301
          code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
×
302
          if (code != TSDB_CODE_SUCCESS) {
×
303
            taosMemoryFreeClear(pDataInfo->pRsp);
×
304
            TAOS_CHECK_EXIT(code);
×
305
          }
306
        } else {
307
          pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
38,208,977✔
308
          qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
38,208,977✔
309
                 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", try next %d/%" PRIzu, pDataInfo,
310
                 GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
311
                 pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows, i + 1, totalSources);
312
          taosMemoryFreeClear(pDataInfo->pRsp);
38,208,977✔
313
        }
314
        break;
38,208,977✔
315
      }
316

317
      TAOS_CHECK_EXIT(doExtractResultBlocks(pExchangeInfo, pDataInfo));
128,179,371✔
318

319
      SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
128,180,449✔
320
      updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pDataInfo->startTime, pOperator);
128,180,165✔
321
      pDataInfo->totalRows += pRetrieveRsp->numOfRows;
128,179,881✔
322

323
      if (pRsp->completed == 1) {
128,177,893✔
324
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
121,814,999✔
325
        qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
121,815,804✔
326
               " execId:%d index:%d completed, blocks:%d, numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64
327
               ", totalRows:%" PRIu64 ", total:%.2f Kb, try next %d/%" PRIzu, pDataInfo,
328
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
329
               pRsp->numOfBlocks, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows,
330
               pLoadInfo->totalSize / 1024.0, i + 1, totalSources);
331
      } else {
332
        qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
6,364,987✔
333
               " execId:%d blocks:%d, numOfRows:%" PRId64 ", totalRows:%" PRIu64 ", total:%.2f Kb",
334
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
335
               pRsp->numOfBlocks, pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
336
      }
337

338
      taosMemoryFreeClear(pDataInfo->pRsp);
128,181,254✔
339

340
      if ((pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED || NULL != pDataInfo->pSrcUidList) && !isVstbScan(pDataInfo) && !isVstbTagScan(pDataInfo)) {
128,181,075✔
341
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
6,364,987✔
342
        code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
6,364,987✔
343
        if (code != TSDB_CODE_SUCCESS) {
6,364,987✔
344
          taosMemoryFreeClear(pDataInfo->pRsp);
×
345
          TAOS_CHECK_EXIT(code);
×
346
        }
347
      }
348
      
349
      return;
128,179,430✔
350
    }  // end loop
351

352
    int32_t complete1 = 0;
38,209,745✔
353
    code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &complete1);
38,208,977✔
354
    if (code != TSDB_CODE_SUCCESS) {
38,208,015✔
355
      pTaskInfo->code = code;
×
356
      T_LONG_JMP(pTaskInfo->env, code);
×
357
    }
358
    if (complete1 == totalSources) {
38,208,015✔
359
      qDebug("all sources are completed, %s", GET_TASKID(pTaskInfo));
21,090,771✔
360
      return;
21,090,771✔
361
    }
362
  }
363

364
_exit:
2,254✔
365

366
  if (code) {
2,254✔
367
    pTaskInfo->code = code;
2,254✔
368
    qError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
2,254✔
369
  }
370
}
371

372
static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
477,368,187✔
373
  int32_t        code = TSDB_CODE_SUCCESS;
477,368,187✔
374
  SExchangeInfo* pExchangeInfo = pOperator->info;
477,368,187✔
375
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
477,370,090✔
376

377
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
477,368,386✔
378

379
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
477,365,748✔
380
  if (pOperator->status == OP_EXEC_DONE) {
477,367,818✔
381
    qDebug("%s all %" PRIzu " source(s) are exhausted, total rows:%" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
×
382
           GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize,
383
           pLoadInfo->totalElapsed / 1000.0);
384
    return NULL;
×
385
  }
386

387
  // we have buffered retrieved datablock, return it directly
388
  SSDataBlock* p = NULL;
477,366,557✔
389
  if (taosArrayGetSize(pExchangeInfo->pResultBlockList) > 0) {
477,367,255✔
390
    p = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
247,077,718✔
391
    taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
247,077,381✔
392
  }
393

394
  if (p != NULL) {
477,367,285✔
395
    void* tmp = taosArrayPush(pExchangeInfo->pRecycledBlocks, &p);
247,077,464✔
396
    if (!tmp) {
247,077,478✔
397
      code = terrno;
×
398
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
399
      pTaskInfo->code = code;
×
400
      T_LONG_JMP(pTaskInfo->env, code);
×
401
    }
402
    return p;
247,077,478✔
403
  } else {
404
    if (pExchangeInfo->seqLoadData) {
230,289,821✔
405
      code = seqLoadRemoteData(pOperator);
1,698,476✔
406
      if (code != TSDB_CODE_SUCCESS) {
1,697,381✔
407
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
611✔
408
        pTaskInfo->code = code;
611✔
409
        T_LONG_JMP(pTaskInfo->env, code);
611✔
410
      }
411
    } else if (IS_STREAM_MODE(pOperator->pTaskInfo))   {
228,590,410✔
412
      streamConcurrentlyLoadRemoteData(pOperator, pExchangeInfo, pTaskInfo);
8,664,251✔
413
    } else {
414
      concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
219,925,775✔
415
    }
416
    if (TSDB_CODE_SUCCESS != pOperator->pTaskInfo->code) {
230,277,069✔
417
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(pOperator->pTaskInfo->code));
2,822✔
418
      T_LONG_JMP(pTaskInfo->env, pOperator->pTaskInfo->code);
2,822✔
419
    }
420
    
421
    if (taosArrayGetSize(pExchangeInfo->pResultBlockList) == 0) {
230,270,737✔
422
      qDebug("empty resultBlockList");
97,517,192✔
423
      return NULL;
97,517,476✔
424
    } else {
425
      p = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
132,757,011✔
426
      taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
132,755,023✔
427
      void* tmp = taosArrayPush(pExchangeInfo->pRecycledBlocks, &p);
132,757,011✔
428
      if (!tmp) {
132,758,715✔
429
        code = terrno;
×
430
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
431
        pTaskInfo->code = code;
×
432
        T_LONG_JMP(pTaskInfo->env, code);
×
433
      }
434

435
      qDebug("block with rows:%" PRId64 " loaded", p->info.rows);
132,758,715✔
436
      return p;
132,757,295✔
437
    }
438
}
439
}
440

441
static int32_t loadRemoteDataNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
475,710,841✔
442
  int32_t        code = TSDB_CODE_SUCCESS;
475,710,841✔
443
  int32_t        lino = 0;
475,710,841✔
444
  SExchangeInfo* pExchangeInfo = pOperator->info;
475,710,841✔
445
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
475,713,449✔
446

447
  qDebug("%s start to load from exchange %p", pTaskInfo->id.str, pExchangeInfo);
475,710,731✔
448

449
  code = pOperator->fpSet._openFn(pOperator);
475,711,216✔
450
  QUERY_CHECK_CODE(code, lino, _end);
475,708,894✔
451

452
  if (pOperator->status == OP_EXEC_DONE) {
475,708,894✔
453
    (*ppRes) = NULL;
107,673✔
454
    return code;
107,673✔
455
  }
456

457
  while (1) {
1,763,336✔
458
    SSDataBlock* pBlock = doLoadRemoteDataImpl(pOperator);
477,365,351✔
459
    if (pBlock == NULL) {
477,350,829✔
460
      (*ppRes) = NULL;
97,517,476✔
461
      return code;
97,517,476✔
462
    }
463

464
    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL, NULL);
379,833,353✔
465
    QUERY_CHECK_CODE(code, lino, _end);
379,832,895✔
466

467
    if (blockDataGetNumOfRows(pBlock) == 0) {
379,832,895✔
468
      qDebug("rows 0 block got, continue next load");
968✔
469
      continue;
968✔
470
    }
471

472
    SLimitInfo* pLimitInfo = &pExchangeInfo->limitInfo;
379,833,761✔
473
    if (hasLimitOffsetInfo(pLimitInfo)) {
379,831,831✔
474
      int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock, false);
3,463,855✔
475
      if (status == PROJECT_RETRIEVE_CONTINUE) {
3,463,855✔
476
        qDebug("limit retrieve continue");
1,762,368✔
477
        continue;
1,762,368✔
478
      } else if (status == PROJECT_RETRIEVE_DONE) {
1,701,487✔
479
        if (pBlock->info.rows == 0) {
1,701,487✔
480
          setOperatorCompleted(pOperator);
×
481
          (*ppRes) = NULL;
×
482
          return code;
×
483
        } else {
484
          (*ppRes) = pBlock;
1,701,487✔
485
          return code;
1,701,487✔
486
        }
487
      }
488
    } else {
489
      (*ppRes) = pBlock;
376,366,997✔
490
      qDebug("block with rows %" PRId64 " returned in exechange", pBlock->info.rows);
376,366,702✔
491
      return code;
376,366,671✔
492
    }
493
  }
494

495
_end:
×
496

497
  if (code != TSDB_CODE_SUCCESS) {
×
498
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
499
    pTaskInfo->code = code;
×
500
    T_LONG_JMP(pTaskInfo->env, code);
×
501
  } else {
502
    qDebug("empty block returned in exchange");
×
503
  }
504
  
505
  (*ppRes) = NULL;
×
506
  return code;
×
507
}
508

509
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
102,886,317✔
510
  pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
102,886,317✔
511
  if (pInfo->pSourceDataInfo == NULL) {
102,884,634✔
512
    return terrno;
×
513
  }
514

515
  if (pInfo->dynamicOp) {
102,885,009✔
516
    return TSDB_CODE_SUCCESS;
2,377,850✔
517
  }
518

519
  int32_t len = strlen(id) + 1;
100,506,355✔
520
  pInfo->pTaskId = taosMemoryCalloc(1, len);
100,506,355✔
521
  if (!pInfo->pTaskId) {
100,509,063✔
522
    return terrno;
×
523
  }
524
  tstrncpy(pInfo->pTaskId, id, len);
100,504,336✔
525
  for (int32_t i = 0; i < numOfSources; ++i) {
262,798,389✔
526
    SSourceDataInfo dataInfo = {0};
162,287,003✔
527
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
162,283,479✔
528
    dataInfo.taskId = pInfo->pTaskId;
162,283,479✔
529
    dataInfo.index = i;
162,289,585✔
530
    SSourceDataInfo* pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
162,289,585✔
531
    if (pDs == NULL) {
162,288,088✔
532
      taosArrayDestroyEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
×
533
      return terrno;
×
534
    }
535
    qDebug("init source data info %d, pDs:%p, status:%d", i, pDs, pDs->status);
162,288,088✔
536
  }
537

538
  return TSDB_CODE_SUCCESS;
100,511,386✔
539
}
540

541
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo, const char* id) {
102,889,452✔
542
  size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);
102,889,452✔
543

544
  if (numOfSources == 0) {
102,878,180✔
545
    qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
×
546
    return TSDB_CODE_INVALID_PARA;
×
547
  }
548
  pInfo->pFetchRpcHandles = taosArrayInit(numOfSources, sizeof(int64_t));
102,878,180✔
549
  if (!pInfo->pFetchRpcHandles) {
102,885,881✔
550
    return terrno;
×
551
  }
552
  void* ret = taosArrayReserve(pInfo->pFetchRpcHandles, numOfSources);
102,888,216✔
553
  if (!ret) {
102,888,162✔
554
    return terrno;
×
555
  }
556

557
  pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
102,888,162✔
558
  if (pInfo->pSources == NULL) {
102,884,558✔
559
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
560
    return terrno;
×
561
  }
562

563
  if (pExNode->node.dynamicOp) {
102,883,455✔
564
    pInfo->pHashSources = tSimpleHashInit(numOfSources * 2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
2,377,850✔
565
    if (NULL == pInfo->pHashSources) {
2,377,850✔
566
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
567
      return terrno;
×
568
    }
569
  }
570

571
  for (int32_t i = 0; i < numOfSources; ++i) {
269,928,928✔
572
    SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
167,041,116✔
573
    if (!pNode) {
167,035,269✔
574
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
575
      return terrno;
×
576
    }
577
    void* tmp = taosArrayPush(pInfo->pSources, pNode);
167,035,269✔
578
    if (!tmp) {
167,045,344✔
579
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
580
      return terrno;
×
581
    }
582
    SExchangeSrcIndex idx = {.srcIdx = i, .inUseIdx = -1};
167,045,344✔
583
    int32_t           code =
584
        tSimpleHashPut(pInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
167,046,178✔
585
    if (pInfo->pHashSources && code != TSDB_CODE_SUCCESS) {
167,037,079✔
586
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
587
      return code;
×
588
    }
589
  }
590

591
  initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);
102,887,812✔
592
  int64_t refId = taosAddRef(exchangeObjRefPool, pInfo);
102,881,691✔
593
  if (refId < 0) {
102,881,494✔
594
    int32_t code = terrno;
×
595
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
596
    return code;
×
597
  } else {
598
    pInfo->self = refId;
102,881,494✔
599
  }
600

601
  return initDataSource(numOfSources, pInfo, id);
102,882,187✔
602
}
603

604
int32_t resetExchangeOperState(SOperatorInfo* pOper) {
8,159,184✔
605
  SExchangeInfo* pInfo = pOper->info;
8,159,184✔
606
  SExchangePhysiNode* pPhynode = (SExchangePhysiNode*)pOper->pPhyNode;
8,160,217✔
607

608
  qDebug("%s reset exchange op:%p info:%p", pOper->pTaskInfo->id.str, pOper, pInfo);
8,159,184✔
609

610
  (void)atomic_add_fetch_64(&pInfo->seqId, 1);
8,160,218✔
611
  pOper->status = OP_NOT_OPENED;
8,160,217✔
612
  pInfo->current = 0;
8,160,217✔
613
  pInfo->loadInfo.totalElapsed = 0;
8,160,217✔
614
  pInfo->loadInfo.totalRows = 0;
8,160,217✔
615
  pInfo->loadInfo.totalSize = 0;
8,160,217✔
616
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pSourceDataInfo); ++i) {
22,326,712✔
617
    SSourceDataInfo* pDataInfo = taosArrayGet(pInfo->pSourceDataInfo, i);
14,166,495✔
618
    taosWLockLatch(&pDataInfo->lock);
14,166,495✔
619
    taosMemoryFreeClear(pDataInfo->decompBuf);
14,166,495✔
620
    taosMemoryFreeClear(pDataInfo->pRsp);
14,166,495✔
621

622
    pDataInfo->totalRows = 0;
14,166,495✔
623
    pDataInfo->code = 0;
14,166,495✔
624
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
14,166,495✔
625
    pDataInfo->fetchSent = false;
14,166,495✔
626
    taosWUnLockLatch(&pDataInfo->lock);
14,166,495✔
627
  }
628

629
  if (pInfo->dynamicOp) {
8,160,217✔
630
    taosArrayClearEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
1,318,130✔
631
  } 
632

633
  taosArrayClearEx(pInfo->pResultBlockList, freeBlock);
8,160,217✔
634
  taosArrayClearEx(pInfo->pRecycledBlocks, freeBlock);
8,160,217✔
635

636
  blockDataCleanup(pInfo->pDummyBlock);
8,160,217✔
637

638
  void   *data = NULL;
8,160,217✔
639
  int32_t iter = 0;
8,160,217✔
640
  while ((data = tSimpleHashIterate(pInfo->pHashSources, data, &iter))) {
10,720,391✔
641
    ((SExchangeSrcIndex *)data)->inUseIdx = -1;
2,560,574✔
642
  }
643
  
644
  pInfo->limitInfo = (SLimitInfo){0};
8,159,817✔
645
  initLimitInfo(pPhynode->node.pLimit, pPhynode->node.pSlimit, &pInfo->limitInfo);
8,159,817✔
646

647
  return 0;
8,160,017✔
648
}
649

650
int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo,
102,888,884✔
651
                                   SOperatorInfo** pOptrInfo) {
652
  QRY_PARAM_CHECK(pOptrInfo);
102,888,884✔
653

654
  int32_t        code = 0;
102,890,588✔
655
  int32_t        lino = 0;
102,890,588✔
656
  SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
102,890,588✔
657
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
102,876,950✔
658
  if (pInfo == NULL || pOperator == NULL) {
102,876,073✔
659
    code = terrno;
×
660
    goto _error;
×
661
  }
662

663
  pOperator->pPhyNode = pExNode;
102,876,641✔
664
  pInfo->dynamicOp = pExNode->node.dynamicOp;
102,876,925✔
665
  code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo));
102,885,459✔
666
  QUERY_CHECK_CODE(code, lino, _error);
102,887,927✔
667

668
  code = tsem_init(&pInfo->ready, 0, 0);
102,887,927✔
669
  QUERY_CHECK_CODE(code, lino, _error);
102,883,001✔
670

671
  pInfo->pDummyBlock = createDataBlockFromDescNode(pExNode->node.pOutputDataBlockDesc);
102,883,001✔
672
  QUERY_CHECK_NULL(pInfo->pDummyBlock, code, lino, _error, terrno);
102,886,165✔
673

674
  pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES);
102,882,762✔
675
  QUERY_CHECK_NULL(pInfo->pResultBlockList, code, lino, _error, terrno);
102,881,422✔
676
  pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES);
102,884,235✔
677
  QUERY_CHECK_NULL(pInfo->pRecycledBlocks, code, lino, _error, terrno);
102,884,257✔
678

679
  SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self};
102,886,640✔
680
  code = qAppendTaskStopInfo(pTaskInfo, &stopInfo);
102,885,788✔
681
  QUERY_CHECK_CODE(code, lino, _error);
102,889,578✔
682

683
  pInfo->seqLoadData = pExNode->seqRecvData;
102,889,578✔
684
  pInfo->dynTbname = pExNode->dynTbname;
102,888,495✔
685
  if (pInfo->dynTbname) {
102,886,117✔
686
    pInfo->seqLoadData = true;
9,498✔
687
  }
688
  pInfo->pTransporter = pTransporter;
102,887,306✔
689

690
  setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo,
102,887,590✔
691
                  pTaskInfo);
692
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
102,881,568✔
693

694
  code = filterInitFromNode((SNode*)pExNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
102,885,544✔
695
                            pTaskInfo->pStreamRuntimeInfo);
102,886,401✔
696
  QUERY_CHECK_CODE(code, lino, _error);
102,884,931✔
697
  qTrace("%s exchange op:%p", __func__, pOperator);
102,884,931✔
698
  pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteDataNext, NULL, destroyExchangeOperatorInfo,
102,884,931✔
699
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
700
  setOperatorResetStateFn(pOperator, resetExchangeOperState);
102,883,261✔
701
  *pOptrInfo = pOperator;
102,881,664✔
702
  return TSDB_CODE_SUCCESS;
102,882,747✔
703

704
_error:
×
705
  if (code != TSDB_CODE_SUCCESS) {
×
706
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
707
    pTaskInfo->code = code;
×
708
  }
709
  if (pInfo != NULL) {
×
710
    doDestroyExchangeOperatorInfo(pInfo);
×
711
  }
712

713
  if (pOperator != NULL) {
×
714
    pOperator->info = NULL;
×
715
    destroyOperator(pOperator);
×
716
  }
717
  return code;
×
718
}
719

720
void destroyExchangeOperatorInfo(void* param) {
102,891,845✔
721
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
102,891,845✔
722
  int32_t        code = taosRemoveRef(exchangeObjRefPool, pExInfo->self);
102,891,845✔
723
  if (code != TSDB_CODE_SUCCESS) {
102,890,594✔
724
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
725
  }
726
}
102,890,594✔
727

728
void freeBlock(void* pParam) {
225,489,401✔
729
  SSDataBlock* pBlock = *(SSDataBlock**)pParam;
225,489,401✔
730
  blockDataDestroy(pBlock);
225,489,685✔
731
}
225,491,695✔
732

733
void freeSourceDataInfo(void* p) {
162,653,713✔
734
  SSourceDataInfo* pInfo = (SSourceDataInfo*)p;
162,653,713✔
735
  taosMemoryFreeClear(pInfo->decompBuf);
162,653,713✔
736
  taosMemoryFreeClear(pInfo->pRsp);
162,653,429✔
737

738
  pInfo->decompBufSize = 0;
162,654,180✔
739
}
162,653,612✔
740

741
void doDestroyExchangeOperatorInfo(void* param) {
102,891,845✔
742
  if (param == NULL) {
102,891,845✔
743
    return;
×
744
  }
745
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
102,891,845✔
746
  if (pExInfo->pFetchRpcHandles) {
102,891,845✔
747
    for (int32_t i = 0; i < pExInfo->pFetchRpcHandles->size; ++i) {
269,943,215✔
748
      int64_t* pRpcHandle = taosArrayGet(pExInfo->pFetchRpcHandles, i);
167,051,654✔
749
      if (*pRpcHandle > 0) {
167,051,938✔
750
        SDownstreamSourceNode* pSource = taosArrayGet(pExInfo->pSources, i);
8,752,677✔
751
        (void)asyncFreeConnById(pExInfo->pTransporter, *pRpcHandle);
8,752,677✔
752
      }
753
    }
754
    taosArrayDestroy(pExInfo->pFetchRpcHandles);
102,890,530✔
755
  }
756

757
  taosArrayDestroy(pExInfo->pSources);
102,891,098✔
758
  taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo);
102,891,561✔
759

760
  taosArrayDestroyEx(pExInfo->pResultBlockList, freeBlock);
102,891,277✔
761
  taosArrayDestroyEx(pExInfo->pRecycledBlocks, freeBlock);
102,890,509✔
762

763
  blockDataDestroy(pExInfo->pDummyBlock);
102,892,413✔
764
  tSimpleHashCleanup(pExInfo->pHashSources);
102,891,077✔
765

766
  int32_t code = tsem_destroy(&pExInfo->ready);
102,891,361✔
767
  if (code != TSDB_CODE_SUCCESS) {
102,891,361✔
768
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
769
  }
770
  taosMemoryFreeClear(pExInfo->pTaskId);
102,891,361✔
771

772
  taosMemoryFreeClear(param);
102,888,321✔
773
}
774

775
int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
175,680,463✔
776
  SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;
175,680,463✔
777

778
  taosMemoryFreeClear(pMsg->pEpSet);
175,680,463✔
779
  SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
175,731,562✔
780
  if (pExchangeInfo == NULL) {
175,748,610✔
781
    qWarn("failed to acquire exchange operator, since it may have been released, %p", pExchangeInfo);
8,289✔
782
    taosMemoryFree(pMsg->pData);
8,289✔
783
    return TSDB_CODE_SUCCESS;
8,289✔
784
  }
785

786
  int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
175,740,321✔
787
  if (pWrapper->seqId != currSeqId) {
175,760,175✔
788
    qDebug("rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pWrapper->seqId, pExchangeInfo, currSeqId);
×
789
    taosMemoryFree(pMsg->pData);
×
790
    code = taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
×
791
    if (code != TSDB_CODE_SUCCESS) {
×
792
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
793
    }
794
    return TSDB_CODE_SUCCESS;
×
795
  }
796

797
  int32_t          index = pWrapper->sourceIndex;
175,751,740✔
798

799
  qDebug("%s exchange %p %dth source got rsp, code:%d, rsp:%p", pExchangeInfo->pTaskId, pExchangeInfo, index, code, pMsg->pData);
175,729,758✔
800

801
  int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, index);
175,732,256✔
802
  if (pRpcHandle != NULL) {
175,726,693✔
803
    int32_t ret = asyncFreeConnById(pExchangeInfo->pTransporter, *pRpcHandle);
175,683,171✔
804
    if (ret != 0) {
175,675,293✔
805
      qDebug("failed to free rpc handle, code:%s, %p", tstrerror(ret), pExchangeInfo);
9,628,866✔
806
    }
807
    *pRpcHandle = -1;
175,675,293✔
808
  }
809

810
  SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);
175,707,145✔
811
  if (!pSourceDataInfo) {
175,739,022✔
812
    return terrno;
×
813
  }
814

815
  if (0 == code && NULL == pMsg->pData) {
175,739,022✔
816
    qError("invalid rsp msg, msgType:%d, len:%d", pMsg->msgType, pMsg->len);
×
817
    code = TSDB_CODE_QRY_INVALID_MSG;
×
818
  }
819

820
  taosWLockLatch(&pSourceDataInfo->lock);
175,779,408✔
821
  if (code == TSDB_CODE_SUCCESS) {
175,766,292✔
822
    pSourceDataInfo->seqId = pWrapper->seqId;
175,758,857✔
823
    pSourceDataInfo->pRsp = pMsg->pData;
175,743,121✔
824

825
    SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
175,587,881✔
826
    pRsp->numOfRows = htobe64(pRsp->numOfRows);
175,656,901✔
827
    pRsp->compLen = htonl(pRsp->compLen);
175,686,830✔
828
    pRsp->payloadLen = htonl(pRsp->payloadLen);
175,704,628✔
829
    pRsp->numOfCols = htonl(pRsp->numOfCols);
175,550,848✔
830
    pRsp->useconds = htobe64(pRsp->useconds);
175,685,894✔
831
    pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);
175,467,677✔
832

833
    qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%" PRId64 ", %p", pSourceDataInfo->taskId, index,
175,606,258✔
834
           pRsp->numOfBlocks, pRsp->numOfRows, pExchangeInfo);
835
  } else {
836
    taosMemoryFree(pMsg->pData);
7,435✔
837
    pSourceDataInfo->code = rpcCvtErrCode(code);
7,435✔
838
    if (pSourceDataInfo->code != code) {
7,435✔
839
      qError("%s fetch rsp received, index:%d, error:%s, cvted error: %s, %p", pSourceDataInfo->taskId, index,
×
840
             tstrerror(code), tstrerror(pSourceDataInfo->code), pExchangeInfo);
841
    } else {
842
      qError("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code),
7,435✔
843
             pExchangeInfo);
844
    }
845
  }
846

847
  tmemory_barrier();
175,613,498✔
848
  pSourceDataInfo->status = EX_SOURCE_DATA_READY;
175,613,498✔
849
  taosWUnLockLatch(&pSourceDataInfo->lock);
175,698,144✔
850
  
851
  code = tsem_post(&pExchangeInfo->ready);
175,560,483✔
852
  if (code != TSDB_CODE_SUCCESS) {
175,757,670✔
853
    qError("failed to invoke post when fetch rsp is ready, code:%s, %p", tstrerror(code), pExchangeInfo);
×
854
    return code;
×
855
  }
856

857
  code = taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
175,757,670✔
858
  if (code != TSDB_CODE_SUCCESS) {
175,765,217✔
859
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
860
  }
861
  return code;
175,766,360✔
862
}
863

864
int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, bool tableSeq) {
175,546✔
865
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
175,546✔
866
  if (NULL == *ppRes) {
175,546✔
867
    return terrno;
×
868
  }
869

870
  STableScanOperatorParam* pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
175,546✔
871
  if (NULL == pScan) {
175,546✔
872
    taosMemoryFreeClear(*ppRes);
×
873
    return terrno;
×
874
  }
875

876
  pScan->pUidList = taosArrayDup(pUidList, NULL);
175,546✔
877
  if (NULL == pScan->pUidList) {
175,546✔
878
    taosMemoryFree(pScan);
×
879
    taosMemoryFreeClear(*ppRes);
×
880
    return terrno;
×
881
  }
882
  pScan->type = DYN_TYPE_STB_JOIN;
175,546✔
883
  pScan->tableSeq = tableSeq;
175,546✔
884
  pScan->pOrgTbInfo = NULL;
175,546✔
885
  pScan->pBatchTbInfo = NULL;
175,546✔
886
  pScan->pTagList = NULL;
175,546✔
887
  pScan->isNewParam = false;
175,546✔
888
  pScan->window.skey = INT64_MAX;
175,546✔
889
  pScan->window.ekey = INT64_MIN;
175,546✔
890

891
  (*ppRes)->opType = srcOpType;
175,546✔
892
  (*ppRes)->downstreamIdx = 0;
175,546✔
893
  (*ppRes)->value = pScan;
175,546✔
894
  (*ppRes)->pChildren = NULL;
175,546✔
895
  (*ppRes)->reUse = false;
175,546✔
896

897
  return TSDB_CODE_SUCCESS;
175,546✔
898
}
899

900
int32_t buildTableScanOperatorParamEx(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, SOrgTbInfo *pMap, bool tableSeq, STimeWindow *window, bool isNewParam) {
1,490,250✔
901
  int32_t                  code = TSDB_CODE_SUCCESS;
1,490,250✔
902
  int32_t                  lino = 0;
1,490,250✔
903
  STableScanOperatorParam* pScan = NULL;
1,490,250✔
904

905
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
1,490,250✔
906
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
1,490,250✔
907

908
  pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
1,490,250✔
909
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);
1,490,250✔
910

911
  if (pUidList) {
1,490,250✔
912
    pScan->pUidList = taosArrayDup(pUidList, NULL);
1,490,250✔
913
    QUERY_CHECK_NULL(pScan->pUidList, code, lino, _return, terrno);
1,490,250✔
914
  } else {
915
    pScan->pUidList = NULL;
×
916
  }
917

918
  if (pMap) {
1,490,250✔
919
    pScan->pOrgTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
1,490,250✔
920
    QUERY_CHECK_NULL(pScan->pOrgTbInfo, code, lino, _return, terrno);
1,490,250✔
921

922
    pScan->pOrgTbInfo->vgId = pMap->vgId;
1,490,250✔
923
    tstrncpy(pScan->pOrgTbInfo->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);
1,490,250✔
924

925
    pScan->pOrgTbInfo->colMap = taosArrayDup(pMap->colMap, NULL);
1,490,250✔
926
    QUERY_CHECK_NULL(pScan->pOrgTbInfo->colMap, code, lino, _return, terrno);
1,490,250✔
927
  } else {
928
    pScan->pOrgTbInfo = NULL;
×
929
  }
930
  pScan->pTagList = NULL;
1,490,250✔
931
  pScan->pBatchTbInfo = NULL;
1,490,250✔
932

933

934
  pScan->type = DYN_TYPE_VSTB_SINGLE_SCAN;
1,490,250✔
935
  pScan->tableSeq = tableSeq;
1,490,250✔
936
  pScan->window.skey = window->skey;
1,490,250✔
937
  pScan->window.ekey = window->ekey;
1,490,250✔
938
  pScan->isNewParam = isNewParam;
1,490,250✔
939
  (*ppRes)->opType = srcOpType;
1,490,250✔
940
  (*ppRes)->downstreamIdx = 0;
1,490,250✔
941
  (*ppRes)->value = pScan;
1,490,250✔
942
  (*ppRes)->pChildren = NULL;
1,490,250✔
943
  (*ppRes)->reUse = false;
1,490,250✔
944

945
  return code;
1,490,250✔
946
_return:
×
947
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
948
  taosMemoryFreeClear(*ppRes);
×
949
  if (pScan) {
×
950
    taosArrayDestroy(pScan->pUidList);
×
951
    if (pScan->pOrgTbInfo) {
×
952
      taosArrayDestroy(pScan->pOrgTbInfo->colMap);
×
953
      taosMemoryFreeClear(pScan->pOrgTbInfo);
×
954
    }
955
    taosMemoryFree(pScan);
×
956
  }
957
  return code;
×
958
}
959

960
int32_t buildTableScanOperatorParamBatchInfo(SOperatorParam** ppRes, uint64_t groupid, SArray* pUidList, int32_t srcOpType, SArray *pBatchMap, SArray *pTagList, bool tableSeq, STimeWindow *window, bool isNewParam) {
×
961
  int32_t                  code = TSDB_CODE_SUCCESS;
×
962
  int32_t                  lino = 0;
×
963
  STableScanOperatorParam* pScan = NULL;
×
964

965
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
×
966
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
×
967

968
  pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
×
969
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);
×
970

971
  pScan->groupid = groupid;
×
972
  if (pUidList) {
×
973
    pScan->pUidList = taosArrayDup(pUidList, NULL);
×
974
    QUERY_CHECK_NULL(pScan->pUidList, code, lino, _return, terrno);
×
975
  } else {
976
    pScan->pUidList = NULL;
×
977
  }
978
  pScan->pOrgTbInfo = NULL;
×
979

980
  if (pBatchMap) {
×
981
    pScan->pBatchTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
×
982
    QUERY_CHECK_NULL(pScan->pBatchTbInfo, code, lino, _return, terrno);
×
983
    for (int32_t i = 0; i < taosArrayGetSize(pBatchMap); i++) {
×
984
      SOrgTbInfo *pSrcInfo = taosArrayGet(pBatchMap, i);
×
985
      SOrgTbInfo batchInfo = {0};
×
986
      batchInfo.vgId = pSrcInfo->vgId;
×
987
      tstrncpy(batchInfo.tbName, pSrcInfo->tbName, TSDB_TABLE_FNAME_LEN);
×
988
      batchInfo.colMap = taosArrayDup(pSrcInfo->colMap, NULL);
×
989
      QUERY_CHECK_NULL(batchInfo.colMap, code, lino, _return, terrno);
×
990
      SOrgTbInfo *pDstInfo = taosArrayPush(pScan->pBatchTbInfo, &batchInfo);
×
991
      QUERY_CHECK_NULL(pDstInfo, code, lino, _return, terrno);
×
992
    }
993
  } else {
994
    pScan->pBatchTbInfo = NULL;
×
995
  }
996

997
  if (pTagList) {
×
998
    pScan->pTagList = taosArrayInit(1, sizeof(STagVal));
×
999
    QUERY_CHECK_NULL(pScan->pTagList, code, lino, _return, terrno);
×
1000

1001
    for (int32_t i = 0; i < taosArrayGetSize(pTagList); ++i) {
×
1002
      STagVal *pSrcTag = (STagVal*)taosArrayGet(pTagList, i);
×
1003
      STagVal  dstTag;
×
1004
      dstTag.type = pSrcTag->type;
×
1005
      dstTag.cid = pSrcTag->cid;
×
1006
      if (IS_VAR_DATA_TYPE(pSrcTag->type)) {
×
1007
        dstTag.nData = pSrcTag->nData;
×
1008
        dstTag.pData = taosMemoryMalloc(dstTag.nData);
×
1009
        QUERY_CHECK_NULL(dstTag.pData, code, lino, _return, terrno);
×
1010
        memcpy(dstTag.pData, pSrcTag->pData, dstTag.nData);
×
1011
      } else {
1012
        dstTag.i64 = pSrcTag->i64;
×
1013
      }
1014

1015
      QUERY_CHECK_NULL(taosArrayPush(pScan->pTagList, &dstTag), code, lino, _return, terrno);
×
1016
    }
1017
  } else {
1018
    pScan->pTagList = NULL;
×
1019
  }
1020

1021

1022
  pScan->type = DYN_TYPE_VSTB_BATCH_SCAN;
×
1023
  pScan->tableSeq = tableSeq;
×
1024
  pScan->window.skey = window->skey;
×
1025
  pScan->window.ekey = window->ekey;
×
1026
  pScan->isNewParam = isNewParam;
×
1027
  (*ppRes)->opType = srcOpType;
×
1028
  (*ppRes)->downstreamIdx = 0;
×
1029
  (*ppRes)->value = pScan;
×
1030
  (*ppRes)->pChildren = NULL;
×
1031
  (*ppRes)->reUse = false;
×
1032

1033
  return code;
×
1034
_return:
×
1035
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
1036
  taosMemoryFreeClear(*ppRes);
×
1037
  if (pScan) {
×
1038
    taosArrayDestroy(pScan->pUidList);
×
1039
    if (pScan->pBatchTbInfo) {
×
1040
      taosArrayDestroy(pScan->pBatchTbInfo);
×
1041
    }
1042
    taosMemoryFree(pScan);
×
1043
  }
1044
  return code;
×
1045
}
1046

1047
int32_t buildAggOperatorParam(SOperatorParam** ppRes, uint64_t groupid, SArray* pUidList, int32_t srcOpType, SArray *pBatchMap, SArray *pTagList, bool tableSeq, STimeWindow *window, bool isNewParam) {
×
1048
  int32_t                  code = TSDB_CODE_SUCCESS;
×
1049
  int32_t                  lino = 0;
×
1050

1051
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
×
1052
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
×
1053

1054
  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
×
1055
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno);
×
1056

1057
  SOperatorParam* pTableScanParam = NULL;
×
1058
  code = buildTableScanOperatorParamBatchInfo(&pTableScanParam, groupid, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, pBatchMap, pTagList, tableSeq, window, isNewParam);
×
1059
  QUERY_CHECK_CODE(code, lino, _return);
×
1060

1061
  QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pTableScanParam), code, lino, _return, terrno);
×
1062

1063
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
×
1064
  (*ppRes)->downstreamIdx = 0;
×
1065
  (*ppRes)->value = NULL;
×
1066
  (*ppRes)->reUse = false;
×
1067

1068
_return:
×
1069
  return code;
×
1070
}
1071

1072
int32_t buildTagScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType) {
78,725✔
1073
  int32_t                  code = TSDB_CODE_SUCCESS;
78,725✔
1074
  int32_t                  lino = 0;
78,725✔
1075
  STagScanOperatorParam*   pScan = NULL;
78,725✔
1076

1077
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
78,725✔
1078
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
78,725✔
1079

1080
  pScan = taosMemoryMalloc(sizeof(STagScanOperatorParam));
78,725✔
1081
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);
78,725✔
1082
  pScan->vcUid = *(tb_uid_t*)taosArrayGet(pUidList, 0);
78,725✔
1083

1084
  (*ppRes)->opType = srcOpType;
78,725✔
1085
  (*ppRes)->downstreamIdx = 0;
78,725✔
1086
  (*ppRes)->value = pScan;
78,725✔
1087
  (*ppRes)->pChildren = NULL;
78,725✔
1088
  (*ppRes)->reUse = false;
78,725✔
1089

1090
  return code;
78,725✔
1091
_return:
×
1092
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
1093
  taosMemoryFreeClear(*ppRes);
×
1094
  if (pScan) {
×
1095
    taosMemoryFree(pScan);
×
1096
  }
1097
  return code;
×
1098
}
1099

1100
static int32_t getCurrentWinCalcTimeRange(SStreamRuntimeFuncInfo* pRuntimeInfo, STimeWindow* pTimeRange) {
4,204,353✔
1101
  if (!pRuntimeInfo || !pTimeRange) {
4,204,353✔
1102
    return TSDB_CODE_INTERNAL_ERROR;
×
1103
  }
1104

1105
  SSTriggerCalcParam* pParam = taosArrayGet(pRuntimeInfo->pStreamPesudoFuncVals, pRuntimeInfo->curIdx);
4,204,353✔
1106
  if (!pParam) {
4,204,353✔
1107
    return TSDB_CODE_INTERNAL_ERROR;
×
1108
  }
1109

1110
  switch (pRuntimeInfo->triggerType) {
4,204,353✔
1111
    case STREAM_TRIGGER_SLIDING:
3,295,194✔
1112
      // Unable to distinguish whether there is an interval, all use wstart/wend
1113
      // and the results are equal to those of prevTs/currentTs, using the same address of union.
1114
      pTimeRange->skey = pParam->wstart;  // is equal to wstart
3,295,194✔
1115
      pTimeRange->ekey = pParam->wend;    // is equal to wend
3,295,194✔
1116
      break;
3,295,194✔
1117
    case STREAM_TRIGGER_PERIOD:
192,760✔
1118
      pTimeRange->skey = pParam->prevLocalTime;
192,760✔
1119
      pTimeRange->ekey = pParam->triggerTime;
192,760✔
1120
      break;
192,760✔
1121
    default:
716,399✔
1122
      pTimeRange->skey = pParam->wstart;
716,399✔
1123
      pTimeRange->ekey = pParam->wend;
716,399✔
1124
      break;
716,399✔
1125
  }
1126

1127
  return TSDB_CODE_SUCCESS;
4,204,353✔
1128
}
1129

1130
int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
175,796,963✔
1131
  int32_t          code = TSDB_CODE_SUCCESS;
175,796,963✔
1132
  int32_t          lino = 0;
175,796,963✔
1133
  SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
175,796,963✔
1134
  if (!pDataInfo) {
175,786,797✔
1135
    return terrno;
×
1136
  }
1137

1138
  if (EX_SOURCE_DATA_NOT_READY != pDataInfo->status) {
175,786,797✔
1139
    return TSDB_CODE_SUCCESS;
×
1140
  }
1141

1142
  pDataInfo->status = EX_SOURCE_DATA_STARTED;
175,795,742✔
1143
  SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
175,796,079✔
1144
  if (!pSource) {
175,790,874✔
1145
    return terrno;
×
1146
  }
1147

1148
  pDataInfo->startTime = taosGetTimestampUs();
175,792,676✔
1149
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
175,790,773✔
1150

1151
  SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
175,793,812✔
1152
  QUERY_CHECK_NULL(pWrapper, code, lino, _end, terrno);
175,788,895✔
1153
  pWrapper->exchangeId = pExchangeInfo->self;
175,788,895✔
1154
  pWrapper->sourceIndex = sourceIndex;
175,790,892✔
1155
  pWrapper->seqId = pExchangeInfo->seqId;
175,787,185✔
1156

1157
  if (pSource->localExec) {
175,795,019✔
1158
    SDataBuf pBuf = {0};
×
1159
    int32_t  code = (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->sId, pTaskInfo->id.queryId,
×
1160
                                               pSource->clientId, pSource->taskId, 0, pSource->execId, &pBuf.pData,
1161
                                               pTaskInfo->localFetch.explainRes);
1162
    code = loadRemoteDataCallback(pWrapper, &pBuf, code);
×
1163
    taosMemoryFree(pWrapper);
×
1164
    QUERY_CHECK_CODE(code, lino, _end);
×
1165
  } else {
1166
    bool needStreamPesudoFuncVals = true;
175,782,788✔
1167
    SResFetchReq req = {0};
175,782,788✔
1168
    req.header.vgId = pSource->addr.nodeId;
175,795,762✔
1169
    req.sId = pSource->sId;
175,798,218✔
1170
    req.clientId = pSource->clientId;
175,787,916✔
1171
    req.taskId = pSource->taskId;
175,788,235✔
1172
    req.queryId = pTaskInfo->id.queryId;
175,787,382✔
1173
    req.execId = pSource->execId;
175,796,948✔
1174
    if (pTaskInfo->pStreamRuntimeInfo) {
175,795,516✔
1175
      req.dynTbname = pExchangeInfo->dynTbname;
7,573,389✔
1176
      req.execId = pTaskInfo->pStreamRuntimeInfo->execId;
7,574,120✔
1177
      req.pStRtFuncInfo = &pTaskInfo->pStreamRuntimeInfo->funcInfo;
7,574,485✔
1178

1179
      if (pSource->fetchMsgType == TDMT_STREAM_FETCH_FROM_RUNNER) {
7,574,120✔
1180
        qDebug("%s stream fetch from runner, execId:%d, %p", GET_TASKID(pTaskInfo), req.execId, pTaskInfo->pStreamRuntimeInfo);
21,520✔
1181
      } else if (pSource->fetchMsgType == TDMT_STREAM_FETCH_FROM_CACHE) {
7,552,965✔
1182
        code = getCurrentWinCalcTimeRange(req.pStRtFuncInfo, &req.pStRtFuncInfo->curWindow);
4,204,353✔
1183
        QUERY_CHECK_CODE(code, lino, _end);
4,204,353✔
1184
        needStreamPesudoFuncVals = false;
4,204,353✔
1185
        qDebug("%s stream fetch from cache, execId:%d, curWinIdx:%d, time range:[%" PRId64 ", %" PRId64 "]",
4,204,353✔
1186
               GET_TASKID(pTaskInfo), req.execId, req.pStRtFuncInfo->curIdx, req.pStRtFuncInfo->curWindow.skey,
1187
               req.pStRtFuncInfo->curWindow.ekey);
1188
      }
1189
      if (!pDataInfo->fetchSent) {
7,574,485✔
1190
        req.reset = pDataInfo->fetchSent = true;
6,055,583✔
1191
      }
1192
    }
1193

1194
    switch (pDataInfo->type) {
175,785,075✔
1195
      case EX_SRC_TYPE_VSTB_SCAN: {
1,490,250✔
1196
        code = buildTableScanOperatorParamEx(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->orgTbInfo, pDataInfo->tableSeq, &pDataInfo->window, pDataInfo->isNewParam);
1,490,250✔
1197
        taosArrayDestroy(pDataInfo->orgTbInfo->colMap);
1,490,250✔
1198
        taosMemoryFreeClear(pDataInfo->orgTbInfo);
1,490,250✔
1199
        taosArrayDestroy(pDataInfo->pSrcUidList);
1,490,250✔
1200
        pDataInfo->pSrcUidList = NULL;
1,490,250✔
1201
        if (TSDB_CODE_SUCCESS != code) {
1,490,250✔
1202
          pTaskInfo->code = code;
×
1203
          taosMemoryFree(pWrapper);
×
1204
          return pTaskInfo->code;
×
1205
        }
1206
        break;
1,490,250✔
1207
      }
1208
      case EX_SRC_TYPE_VSTB_WIN_SCAN: {
×
1209
        if (pDataInfo->pSrcUidList) {
×
1210
          code = buildTableScanOperatorParamEx(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, NULL, pDataInfo->tableSeq, &pDataInfo->window, false);
×
1211
          taosArrayDestroy(pDataInfo->pSrcUidList);
×
1212
          pDataInfo->pSrcUidList = NULL;
×
1213
          if (TSDB_CODE_SUCCESS != code) {
×
1214
            pTaskInfo->code = code;
×
1215
            taosMemoryFree(pWrapper);
×
1216
            return pTaskInfo->code;
×
1217
          }
1218
        }
1219
        break;
×
1220
      }
1221
      case EX_SRC_TYPE_VSTB_TAG_SCAN: {
78,725✔
1222
        code = buildTagScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType);
78,725✔
1223
        taosArrayDestroy(pDataInfo->pSrcUidList);
78,725✔
1224
        pDataInfo->pSrcUidList = NULL;
78,725✔
1225
        if (TSDB_CODE_SUCCESS != code) {
78,725✔
1226
          pTaskInfo->code = code;
×
1227
          taosMemoryFree(pWrapper);
×
1228
          return pTaskInfo->code;
×
1229
        }
1230
        break;
78,725✔
1231
      }
1232
      case EX_SRC_TYPE_VSTB_AGG_SCAN: {
×
1233
        if (pDataInfo->batchOrgTbInfo) {
×
1234
          code = buildAggOperatorParam(&req.pOpParam, pDataInfo->groupid, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->batchOrgTbInfo, pDataInfo->tagList, pDataInfo->tableSeq, &pDataInfo->window, pDataInfo->isNewParam);
×
1235
          if (pDataInfo->batchOrgTbInfo) {
×
1236
            for (int32_t i = 0; i < taosArrayGetSize(pDataInfo->batchOrgTbInfo); ++i) {
×
1237
              SOrgTbInfo* pColMap = taosArrayGet(pDataInfo->batchOrgTbInfo, i);
×
1238
              if (pColMap) {
×
1239
                taosArrayDestroy(pColMap->colMap);
×
1240
              }
1241
            }
1242
            taosArrayDestroy(pDataInfo->batchOrgTbInfo);
×
1243
            pDataInfo->batchOrgTbInfo = NULL;
×
1244
          }
1245
          if (pDataInfo->tagList) {
×
1246
            taosArrayDestroyEx(pDataInfo->tagList, destroyTagVal);
×
1247
            pDataInfo->tagList = NULL;
×
1248
          }
1249
          if (pDataInfo->pSrcUidList) {
×
1250
            taosArrayDestroy(pDataInfo->pSrcUidList);
×
1251
            pDataInfo->pSrcUidList = NULL;
×
1252
          }
1253

1254
          if (TSDB_CODE_SUCCESS != code) {
×
1255
            pTaskInfo->code = code;
×
1256
            taosMemoryFree(pWrapper);
×
1257
            return pTaskInfo->code;
×
1258
          }
1259
        }
1260
        break;
×
1261
      }
1262
      case EX_SRC_TYPE_STB_JOIN_SCAN:
174,216,279✔
1263
      default: {
1264
        if (pDataInfo->pSrcUidList) {
174,216,279✔
1265
          code = buildTableScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->tableSeq);
168,413✔
1266
          taosArrayDestroy(pDataInfo->pSrcUidList);
168,413✔
1267
          pDataInfo->pSrcUidList = NULL;
168,413✔
1268
          if (TSDB_CODE_SUCCESS != code) {
170,229✔
1269
            pTaskInfo->code = code;
×
1270
            taosMemoryFree(pWrapper);
×
1271
            return pTaskInfo->code;
×
1272
          }
1273
        }
1274
        break;
174,226,384✔
1275
      }
1276
    }
1277

1278
    int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req, needStreamPesudoFuncVals);
175,795,359✔
1279
    if (msgSize < 0) {
175,789,466✔
1280
      pTaskInfo->code = msgSize;
×
1281
      taosMemoryFree(pWrapper);
×
1282
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
1283
      return pTaskInfo->code;
×
1284
    }
1285

1286
    void* msg = taosMemoryCalloc(1, msgSize);
175,789,466✔
1287
    if (NULL == msg) {
175,783,294✔
1288
      pTaskInfo->code = terrno;
×
1289
      taosMemoryFree(pWrapper);
×
1290
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
1291
      return pTaskInfo->code;
×
1292
    }
1293

1294
    msgSize = tSerializeSResFetchReq(msg, msgSize, &req, needStreamPesudoFuncVals);
175,783,294✔
1295
    if (msgSize < 0) {
175,792,590✔
1296
      pTaskInfo->code = msgSize;
×
1297
      taosMemoryFree(pWrapper);
×
1298
      taosMemoryFree(msg);
×
1299
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
1300
      return pTaskInfo->code;
×
1301
    }
1302

1303
    freeOperatorParam(req.pOpParam, OP_GET_PARAM);
175,792,590✔
1304

1305
    qDebug("%s build fetch msg and send to vgId:%d, ep:%s, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
175,794,653✔
1306
           ", seqId:%" PRId64 ", execId:%d, %p, %d/%" PRIzu,
1307
           GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->clientId,
1308
           pSource->taskId, pExchangeInfo->seqId, pSource->execId, pExchangeInfo, sourceIndex, totalSources);
1309

1310
    // send the fetch remote task result reques
1311
    SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
175,797,167✔
1312
    if (NULL == pMsgSendInfo) {
175,783,599✔
1313
      taosMemoryFreeClear(msg);
×
1314
      taosMemoryFree(pWrapper);
×
1315
      qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
×
1316
      pTaskInfo->code = terrno;
×
1317
      return pTaskInfo->code;
×
1318
    }
1319

1320
    pMsgSendInfo->param = pWrapper;
175,783,599✔
1321
    pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
175,795,041✔
1322
    pMsgSendInfo->msgInfo.pData = msg;
175,794,091✔
1323
    pMsgSendInfo->msgInfo.len = msgSize;
175,790,834✔
1324
    pMsgSendInfo->msgType = pSource->fetchMsgType;
175,797,451✔
1325
    pMsgSendInfo->fp = loadRemoteDataCallback;
175,786,996✔
1326
    pMsgSendInfo->requestId = pTaskInfo->id.queryId;
175,788,618✔
1327

1328
    int64_t transporterId = 0;
175,798,786✔
1329
    void* poolHandle = NULL;
175,797,419✔
1330
    code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
175,797,419✔
1331
    QUERY_CHECK_CODE(code, lino, _end);
175,799,123✔
1332
    int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, sourceIndex);
175,799,123✔
1333
    *pRpcHandle = transporterId;
175,798,407✔
1334
  }
1335

1336
_end:
175,799,405✔
1337
  if (code != TSDB_CODE_SUCCESS) {
175,799,405✔
1338
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1339
  }
1340
  return code;
175,799,490✔
1341
}
1342

1343
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs,
137,038,052✔
1344
                          SOperatorInfo* pOperator) {
1345
  pInfo->totalRows += numOfRows;
137,038,052✔
1346
  pInfo->totalSize += dataLen;
137,037,200✔
1347
  pInfo->totalElapsed += (taosGetTimestampUs() - startTs);
137,036,411✔
1348
  pOperator->resultInfo.totalRows += numOfRows;
137,036,348✔
1349
}
137,037,426✔
1350

1351
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart) {
390,666,427✔
1352
  int32_t      code = TSDB_CODE_SUCCESS;
390,666,427✔
1353
  int32_t      lino = 0;
390,666,427✔
1354
  SSDataBlock* pBlock = NULL;
390,666,427✔
1355
  if (pColList == NULL) {  // data from other sources
390,666,910✔
1356
    blockDataCleanup(pRes);
386,385,611✔
1357
    code = blockDecodeInternal(pRes, pData, (const char**)pNextStart);
386,384,031✔
1358
    if (code) {
386,384,539✔
1359
      return code;
×
1360
    }
1361
  } else {  // extract data according to pColList
1362
    char* pStart = pData;
4,281,299✔
1363

1364
    int32_t numOfCols = htonl(*(int32_t*)pStart);
4,281,299✔
1365
    pStart += sizeof(int32_t);
4,279,963✔
1366

1367
    // todo refactor:extract method
1368
    SSysTableSchema* pSchema = (SSysTableSchema*)pStart;
4,279,963✔
1369
    for (int32_t i = 0; i < numOfCols; ++i) {
59,345,081✔
1370
      SSysTableSchema* p = (SSysTableSchema*)pStart;
55,065,118✔
1371

1372
      p->colId = htons(p->colId);
55,065,118✔
1373
      p->bytes = htonl(p->bytes);
55,065,118✔
1374
      pStart += sizeof(SSysTableSchema);
55,065,118✔
1375
    }
1376

1377
    pBlock = NULL;
4,279,963✔
1378
    code = createDataBlock(&pBlock);
4,279,963✔
1379
    QUERY_CHECK_CODE(code, lino, _end);
4,279,963✔
1380

1381
    for (int32_t i = 0; i < numOfCols; ++i) {
59,345,081✔
1382
      SColumnInfoData idata = createColumnInfoData(pSchema[i].type, pSchema[i].bytes, pSchema[i].colId);
55,065,118✔
1383
      code = blockDataAppendColInfo(pBlock, &idata);
55,065,118✔
1384
      QUERY_CHECK_CODE(code, lino, _end);
55,065,118✔
1385
    }
1386

1387
    code = blockDecodeInternal(pBlock, pStart, NULL);
4,279,963✔
1388
    QUERY_CHECK_CODE(code, lino, _end);
4,279,963✔
1389

1390
    code = blockDataEnsureCapacity(pRes, pBlock->info.rows);
4,279,963✔
1391
    QUERY_CHECK_CODE(code, lino, _end);
4,279,963✔
1392

1393
    // data from mnode
1394
    pRes->info.dataLoad = 1;
4,279,963✔
1395
    pRes->info.rows = pBlock->info.rows;
4,279,963✔
1396
    pRes->info.scanFlag = MAIN_SCAN;
4,279,963✔
1397
    code = relocateColumnData(pRes, pColList, pBlock->pDataBlock, false);
4,279,963✔
1398
    QUERY_CHECK_CODE(code, lino, _end);
4,279,963✔
1399

1400
    blockDataDestroy(pBlock);
4,279,963✔
1401
    pBlock = NULL;
4,279,963✔
1402
  }
1403

1404
_end:
390,664,502✔
1405
  if (code != TSDB_CODE_SUCCESS) {
390,665,772✔
1406
    blockDataDestroy(pBlock);
×
1407
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1408
  }
1409
  return code;
390,665,975✔
1410
}
1411

1412
void setAllSourcesCompleted(SOperatorInfo* pOperator) {
76,426,989✔
1413
  SExchangeInfo* pExchangeInfo = pOperator->info;
76,426,989✔
1414
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
76,426,989✔
1415

1416
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
76,426,705✔
1417
  size_t               totalSources = taosArrayGetSize(pExchangeInfo->pSources);
76,426,989✔
1418
  qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 ", %.2f Kb, elapsed:%.2f ms",
76,426,705✔
1419
         GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
1420
         pLoadInfo->totalElapsed / 1000.0);
1421

1422
  setOperatorCompleted(pOperator);
76,426,705✔
1423
}
76,426,705✔
1424

1425
int32_t getCompletedSources(const SArray* pArray, int32_t* pRes) {
271,905,485✔
1426
  int32_t code = TSDB_CODE_SUCCESS;
271,905,485✔
1427
  int32_t lino = 0;
271,905,485✔
1428
  size_t  total = taosArrayGetSize(pArray);
271,905,485✔
1429

1430
  int32_t completed = 0;
271,906,928✔
1431
  for (int32_t k = 0; k < total; ++k) {
761,636,431✔
1432
    SSourceDataInfo* p = taosArrayGet(pArray, k);
489,733,091✔
1433
    QUERY_CHECK_NULL(p, code, lino, _end, terrno);
489,733,946✔
1434
    if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
489,733,946✔
1435
      qDebug("source %d is completed, info:%p %p", k, pArray, p);
236,320,531✔
1436
      completed += 1;
236,320,294✔
1437
    }
1438
  }
1439

1440
  *pRes = completed;
271,903,340✔
1441
_end:
271,904,043✔
1442
  if (code != TSDB_CODE_SUCCESS) {
271,904,043✔
1443
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1444
  }
1445
  return code;
271,905,132✔
1446
}
1447

1448
int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
98,703,533✔
1449
  SExchangeInfo* pExchangeInfo = pOperator->info;
98,703,533✔
1450
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
98,703,302✔
1451

1452
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
98,703,501✔
1453
  int64_t startTs = taosGetTimestampUs();
98,694,928✔
1454

1455
  // Asynchronously send all fetch requests to all sources.
1456
  for (int32_t i = 0; i < totalSources; ++i) {
258,952,159✔
1457
    int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
160,253,319✔
1458
    if (code != TSDB_CODE_SUCCESS) {
160,257,799✔
1459
      pTaskInfo->code = code;
568✔
1460
      return code;
×
1461
    }
1462
  }
1463

1464
  int64_t endTs = taosGetTimestampUs();
98,706,909✔
1465
  qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTaskInfo),
98,706,909✔
1466
         totalSources, (endTs - startTs) / 1000.0);
1467

1468
  pOperator->status = OP_RES_TO_RETURN;
98,706,909✔
1469
  pOperator->cost.openCost = taosGetTimestampUs() - startTs;
98,707,392✔
1470
  if (isTaskKilled(pTaskInfo)) {
98,706,909✔
1471
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
2,272✔
1472
  }
1473

1474
  return TSDB_CODE_SUCCESS;
98,704,170✔
1475
}
1476

1477
int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo) {
132,757,874✔
1478
  int32_t            code = TSDB_CODE_SUCCESS;
132,757,874✔
1479
  int32_t            lino = 0;
132,757,874✔
1480
  SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
132,757,874✔
1481
  SSDataBlock*       pb = NULL;
132,758,431✔
1482

1483
  char* pNextStart = pRetrieveRsp->data;
132,758,147✔
1484
  char* pStart = pNextStart;
132,758,715✔
1485

1486
  int32_t index = 0;
132,758,715✔
1487

1488
  if (pRetrieveRsp->compressed) {  // decompress the data
132,758,715✔
1489
    if (pDataInfo->decompBuf == NULL) {
×
1490
      pDataInfo->decompBuf = taosMemoryMalloc(pRetrieveRsp->payloadLen);
×
1491
      QUERY_CHECK_NULL(pDataInfo->decompBuf, code, lino, _end, terrno);
×
1492
      pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
×
1493
    } else {
1494
      if (pDataInfo->decompBufSize < pRetrieveRsp->payloadLen) {
×
1495
        char* p = taosMemoryRealloc(pDataInfo->decompBuf, pRetrieveRsp->payloadLen);
×
1496
        QUERY_CHECK_NULL(p, code, lino, _end, terrno);
×
1497
        if (p != NULL) {
×
1498
          pDataInfo->decompBuf = p;
×
1499
          pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
×
1500
        }
1501
      }
1502
    }
1503
  }
1504

1505
  while (index++ < pRetrieveRsp->numOfBlocks) {
519,143,571✔
1506
    pStart = pNextStart;
386,386,741✔
1507

1508
    if (taosArrayGetSize(pExchangeInfo->pRecycledBlocks) > 0) {
386,386,741✔
1509
      pb = *(SSDataBlock**)taosArrayPop(pExchangeInfo->pRecycledBlocks);
160,895,311✔
1510
      blockDataCleanup(pb);
160,895,311✔
1511
    } else {
1512
      code = createOneDataBlock(pExchangeInfo->pDummyBlock, false, &pb);
225,491,830✔
1513
      QUERY_CHECK_NULL(pb, code, lino, _end, code);
225,490,619✔
1514
    }
1515

1516
    int32_t compLen = *(int32_t*)pStart;
386,385,930✔
1517
    pStart += sizeof(int32_t);
386,386,498✔
1518

1519
    int32_t rawLen = *(int32_t*)pStart;
386,386,296✔
1520
    pStart += sizeof(int32_t);
386,386,296✔
1521
    QUERY_CHECK_CONDITION((compLen <= rawLen && compLen != 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
386,386,580✔
1522

1523
    pNextStart = pStart + compLen;
386,386,580✔
1524
    if (pRetrieveRsp->compressed && (compLen < rawLen)) {
386,384,592✔
1525
      int32_t t = tsDecompressString(pStart, compLen, 1, pDataInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
×
1526
      QUERY_CHECK_CONDITION((t == rawLen), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
1527
      pStart = pDataInfo->decompBuf;
×
1528
    }
1529

1530
    code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart);
386,384,057✔
1531
    if (code != 0) {
386,385,670✔
1532
      taosMemoryFreeClear(pDataInfo->pRsp);
×
1533
      goto _end;
×
1534
    }
1535

1536
    void* tmp = taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
386,385,670✔
1537
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
386,384,480✔
1538
    qDebug("%dth block added to resultBlockList, rows:%" PRId64, index, pb->info.rows);
386,384,480✔
1539
    pb = NULL;
386,386,143✔
1540
  }
1541

1542
_end:
132,757,863✔
1543
  if (code != TSDB_CODE_SUCCESS) {
132,757,863✔
1544
    blockDataDestroy(pb);
×
1545
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1546
  }
1547
  return code;
132,757,863✔
1548
}
1549

1550
int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
1,698,476✔
1551
  SExchangeInfo* pExchangeInfo = pOperator->info;
1,698,476✔
1552
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
1,698,476✔
1553

1554
  int32_t code = 0;
1,698,476✔
1555
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);
1,698,476✔
1556
  int64_t startTs = taosGetTimestampUs();
1,698,476✔
1557

1558
  int32_t vgId = 0;
1,698,476✔
1559
  if (pExchangeInfo->dynTbname) {
1,698,476✔
1560
    SArray* vals = pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPartColVals;
69,420✔
1561
    for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
69,420✔
1562
      SStreamGroupValue* pValue = taosArrayGet(vals, i);
69,420✔
1563
      if (pValue != NULL && pValue->isTbname) {
69,420✔
1564
        vgId = pValue->vgId;
69,420✔
1565
        break;
69,420✔
1566
      }
1567
    }
1568
  }
1569

1570
  while (1) {
471,777✔
1571
    if (pExchangeInfo->current >= totalSources) {
2,170,253✔
1572
      setAllSourcesCompleted(pOperator);
449,790✔
1573
      return TSDB_CODE_SUCCESS;
449,790✔
1574
    }
1575

1576
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
1,720,463✔
1577
    if (!pSource) {
1,720,463✔
1578
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1579
      pTaskInfo->code = terrno;
×
1580
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1581
    }
1582

1583
    if (vgId != 0 && pSource->addr.nodeId != vgId){
1,720,463✔
1584
      pExchangeInfo->current += 1;
49,020✔
1585
      continue;
49,020✔
1586
    }
1587

1588
    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
1,671,443✔
1589
    if (!pDataInfo) {
1,671,443✔
1590
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1591
      pTaskInfo->code = terrno;
×
1592
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1593
    }
1594
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
1,671,443✔
1595

1596
    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
1,671,443✔
1597
    if (code != TSDB_CODE_SUCCESS) {
1,671,443✔
1598
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1599
      pTaskInfo->code = code;
×
1600
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1601
    }
1602

1603
    while (true) {
1,095✔
1604
      code = exchangeWait(pOperator, pExchangeInfo);
1,672,538✔
1605
      if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
1,672,538✔
1606
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
1,095✔
1607
      }
1608

1609
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
1,671,443✔
1610
      if (pDataInfo->seqId != currSeqId) {
1,671,443✔
1611
        qDebug("seq rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pDataInfo->seqId, pExchangeInfo, currSeqId);
1,095✔
1612
        taosMemoryFreeClear(pDataInfo->pRsp);
1,095✔
1613
        continue;
1,095✔
1614
      }
1615

1616
      break;
1,670,348✔
1617
    }
1618

1619
    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
1,670,348✔
1620
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
611✔
1621
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1622
             tstrerror(pDataInfo->code));
1623
      pOperator->pTaskInfo->code = pDataInfo->code;
611✔
1624
      return pOperator->pTaskInfo->code;
611✔
1625
    }
1626

1627
    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
1,669,737✔
1628
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
1,669,737✔
1629

1630
    if (pRsp->numOfRows == 0) {
1,669,737✔
1631
      qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
422,757✔
1632
             " execId:%d %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", pDataInfo,
1633
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1634
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);
1635

1636
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
422,757✔
1637
      if (isVstbScan(pDataInfo) || isVstbTagScan(pDataInfo)) {
422,757✔
1638
        pExchangeInfo->current = totalSources;
342,392✔
1639
      } else {
1640
        pExchangeInfo->current += 1;
80,365✔
1641
      }
1642
      taosMemoryFreeClear(pDataInfo->pRsp);
422,757✔
1643
      continue;
422,757✔
1644
    }
1645

1646
    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
1,246,980✔
1647
    if (code != TSDB_CODE_SUCCESS) {
1,246,980✔
1648
      goto _error;
×
1649
    }
1650

1651
    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
1,246,980✔
1652
    if (pRsp->completed == 1) {
1,246,980✔
1653
      qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
106,275✔
1654
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, pDataInfo,
1655
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1656
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
1657
             pExchangeInfo->current + 1, totalSources);
1658

1659
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
106,275✔
1660
      if (isVstbScan(pDataInfo)) {
106,275✔
1661
        pExchangeInfo->current = totalSources;
×
1662
      } else {
1663
        pExchangeInfo->current += 1;
106,275✔
1664
      }
1665
    } else {
1666
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
1,140,705✔
1667
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
1668
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1669
             pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
1670
    }
1671
    if (pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData) {
1,246,980✔
1672
      taosArrayClear(pExchangeInfo->pSourceDataInfo);
1,051,401✔
1673
    }
1674
    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
1,246,980✔
1675
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;
1,246,980✔
1676

1677
    taosMemoryFreeClear(pDataInfo->pRsp);
1,246,980✔
1678
    return TSDB_CODE_SUCCESS;
1,246,980✔
1679
  }
1680

1681
_error:
×
1682
  pTaskInfo->code = code;
×
1683
  return code;
×
1684
}
1685

1686
void clearVtbScanDataInfo(void* pItem) {
328,483✔
1687
  SSourceDataInfo *pInfo = (SSourceDataInfo *)pItem;
328,483✔
1688
  if (pInfo->orgTbInfo) {
328,483✔
1689
    taosArrayDestroy(pInfo->orgTbInfo->colMap);
×
1690
    taosMemoryFreeClear(pInfo->orgTbInfo);
×
1691
  }
1692
  if (pInfo->batchOrgTbInfo) {
328,483✔
1693
    for (int32_t i = 0; i < taosArrayGetSize(pInfo->batchOrgTbInfo); ++i) {
×
1694
      SOrgTbInfo* pColMap = taosArrayGet(pInfo->batchOrgTbInfo, i);
×
1695
      if (pColMap) {
×
1696
        taosArrayDestroy(pColMap->colMap);
×
1697
      }
1698
    }
1699
    taosArrayDestroy(pInfo->batchOrgTbInfo);
×
1700
  }
1701
  if (pInfo->tagList) {
328,483✔
1702
    taosArrayDestroyEx(pInfo->tagList, destroyTagVal);
×
1703
    pInfo->tagList = NULL;
×
1704
  }
1705
  taosArrayDestroy(pInfo->pSrcUidList);
328,483✔
1706
}
328,483✔
1707

1708
static int32_t loadTagListFromBasicParam(SSourceDataInfo* pDataInfo, SExchangeOperatorBasicParam* pBasicParam) {
×
1709
  int32_t  code = TSDB_CODE_SUCCESS;
×
1710
  int32_t  lino = 0;
×
1711
  STagVal  dstTag;
×
1712
  bool     needFree = false;
×
1713

1714
  if (pDataInfo->tagList) {
×
1715
    taosArrayClear(pDataInfo->tagList);
×
1716
  }
1717

1718
  if (pBasicParam->tagList) {
×
1719
    pDataInfo->tagList = taosArrayInit(1, sizeof(STagVal));
×
1720
    QUERY_CHECK_NULL(pDataInfo->tagList, code, lino, _return, terrno);
×
1721

1722
    for (int32_t i = 0; i < taosArrayGetSize(pBasicParam->tagList); ++i) {
×
1723
      STagVal *pSrcTag = (STagVal*)taosArrayGet(pBasicParam->tagList, i);
×
1724
      QUERY_CHECK_NULL(pSrcTag, code, lino, _return, terrno);
×
1725

1726
      dstTag = (STagVal){0};
×
1727
      dstTag.type = pSrcTag->type;
×
1728
      dstTag.cid = pSrcTag->cid;
×
1729
      if (IS_VAR_DATA_TYPE(pSrcTag->type)) {
×
1730
        dstTag.nData = pSrcTag->nData;
×
1731
        dstTag.pData = taosMemoryMalloc(dstTag.nData);
×
1732
        QUERY_CHECK_NULL(dstTag.pData, code, lino, _return, terrno);
×
1733
        needFree = true;
×
1734
        memcpy(dstTag.pData, pSrcTag->pData, dstTag.nData);
×
1735
      } else {
1736
        dstTag.i64 = pSrcTag->i64;
×
1737
      }
1738

1739
      QUERY_CHECK_NULL(taosArrayPush(pDataInfo->tagList, &dstTag), code, lino, _return, terrno);
×
1740
      needFree = false;
×
1741
    }
1742
  } else {
1743
    pDataInfo->tagList = NULL;
×
1744
  }
1745

1746
  return code;
×
1747
_return:
×
1748
  if (needFree) {
×
1749
    taosMemoryFreeClear(dstTag.pData);
×
1750
  }
1751
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1752
  return code;
×
1753
}
1754

1755
int32_t loadBatchColMapFromBasicParam(SSourceDataInfo* pDataInfo, SExchangeOperatorBasicParam* pBasicParam) {
×
1756
  int32_t     code = TSDB_CODE_SUCCESS;
×
1757
  int32_t     lino = 0;
×
1758
  SOrgTbInfo  dstOrgTbInfo = {0};
×
1759
  bool        needFree = false;
×
1760

1761
  if (pBasicParam->batchOrgTbInfo) {
×
1762
    pDataInfo->batchOrgTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
×
1763
    QUERY_CHECK_NULL(pDataInfo->batchOrgTbInfo, code, lino, _return, terrno);
×
1764

1765
    for (int32_t i = 0; i < taosArrayGetSize(pBasicParam->batchOrgTbInfo); ++i) {
×
1766
      SOrgTbInfo* pSrcOrgTbInfo = taosArrayGet(pBasicParam->batchOrgTbInfo, i);
×
1767
      QUERY_CHECK_NULL(pSrcOrgTbInfo, code, lino, _return, terrno);
×
1768

1769
      dstOrgTbInfo = (SOrgTbInfo){0};
×
1770
      dstOrgTbInfo.vgId = pSrcOrgTbInfo->vgId;
×
1771
      tstrncpy(dstOrgTbInfo.tbName, pSrcOrgTbInfo->tbName, TSDB_TABLE_FNAME_LEN);
×
1772

1773
      dstOrgTbInfo.colMap = taosArrayDup(pSrcOrgTbInfo->colMap, NULL);
×
1774
      QUERY_CHECK_NULL(dstOrgTbInfo.colMap, code, lino, _return, terrno);
×
1775

1776
      needFree = true;
×
1777
      QUERY_CHECK_NULL(taosArrayPush(pDataInfo->batchOrgTbInfo, &dstOrgTbInfo), code, lino, _return, terrno);
×
1778
      needFree = false;
×
1779
    }
1780
  } else {
1781
    pBasicParam->batchOrgTbInfo = NULL;
×
1782
  }
1783

1784
  return code;
×
1785
_return:
×
1786
  if (needFree) {
×
1787
    taosArrayDestroy(dstOrgTbInfo.colMap);
×
1788
  }
1789
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1790
  return code;
×
1791
}
1792

1793
int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasicParam* pBasicParam) {
1,737,388✔
1794
  int32_t            code = TSDB_CODE_SUCCESS;
1,737,388✔
1795
  int32_t            lino = 0;
1,737,388✔
1796
  SExchangeInfo*     pExchangeInfo = pOperator->info;
1,737,388✔
1797
  SExchangeSrcIndex* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
1,737,388✔
1798

1799
  if (NULL == pIdx) {
1,737,388✔
1800
    if (pBasicParam->isNewDeployed) {
2,200✔
1801
      SDownstreamSourceNode *pNode = NULL;
2,200✔
1802
      code = nodesCloneNode((SNode*)&pBasicParam->newDeployedSrc, (SNode**)&pNode);
2,200✔
1803
      QUERY_CHECK_CODE(code, lino, _return);
2,200✔
1804

1805
      SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pOperator->pPhyNode;
2,200✔
1806
      code = nodesListMakeStrictAppend(&pExchange->pSrcEndPoints, (SNode*)pNode);
2,200✔
1807
      QUERY_CHECK_CODE(code, lino, _return);
2,200✔
1808

1809
      void* tmp = taosArrayPush(pExchangeInfo->pSources, pNode);
2,200✔
1810
      QUERY_CHECK_NULL(tmp, code, lino, _return, terrno);
2,200✔
1811

1812
      SExchangeSrcIndex idx = {.srcIdx = taosArrayGetSize(pExchangeInfo->pSources) - 1, .inUseIdx = -1};
2,200✔
1813
      code = tSimpleHashPut(pExchangeInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
2,200✔
1814
      if (pExchangeInfo->pHashSources) {
2,200✔
1815
        QUERY_CHECK_CODE(code, lino, _return);
2,200✔
1816
      }
1817
      pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
2,200✔
1818
      QUERY_CHECK_NULL(pIdx, code, lino, _return, TSDB_CODE_INVALID_PARA);
2,200✔
1819
    } else {
1820
      qError("No exchange source for vgId: %d", pBasicParam->vgId);
×
1821
      return TSDB_CODE_INVALID_PARA;
×
1822
    }
1823
  }
1824

1825
  qDebug("start to add single exchange source");
1,737,388✔
1826

1827
  switch (pBasicParam->type) {
1,737,388✔
1828
    case EX_SRC_TYPE_VSTB_AGG_SCAN: {
×
1829
      if (pIdx->inUseIdx < 0) {
×
1830
        SSourceDataInfo dataInfo = {0};
×
1831
        dataInfo.status = EX_SOURCE_DATA_NOT_READY;
×
1832
        dataInfo.taskId = pExchangeInfo->pTaskId;
×
1833
        dataInfo.index = pIdx->srcIdx;
×
1834
        dataInfo.groupid = pBasicParam->groupid;
×
1835
        dataInfo.window = pBasicParam->window;
×
1836
        dataInfo.isNewParam = pBasicParam->isNewParam;
×
1837
        dataInfo.batchOrgTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
×
1838
        QUERY_CHECK_NULL(dataInfo.batchOrgTbInfo, code, lino, _return, terrno);
×
1839

1840
        code = loadTagListFromBasicParam(&dataInfo, pBasicParam);
×
1841
        QUERY_CHECK_CODE(code, lino, _return);
×
1842

1843
        code = loadBatchColMapFromBasicParam(&dataInfo, pBasicParam);
×
1844
        QUERY_CHECK_CODE(code, lino, _return);
×
1845

1846
        dataInfo.orgTbInfo = NULL;
×
1847

1848
        dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
×
1849
        QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
×
1850

1851
        dataInfo.type = pBasicParam->type;
×
1852
        dataInfo.srcOpType = pBasicParam->srcOpType;
×
1853
        dataInfo.tableSeq = pBasicParam->tableSeq;
×
1854

1855
        QUERY_CHECK_NULL(taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
×
1856

1857
        pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
×
1858
      } else {
1859
        SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
×
1860
        QUERY_CHECK_NULL(pDataInfo, code, lino, _return, terrno);
×
1861

1862
        if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
×
1863
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
×
1864
        }
1865

1866
        pDataInfo->taskId = pExchangeInfo->pTaskId;
×
1867
        pDataInfo->index = pIdx->srcIdx;
×
1868
        pDataInfo->window = pBasicParam->window;
×
1869
        pDataInfo->groupid = pBasicParam->groupid;
×
1870
        pDataInfo->isNewParam = pBasicParam->isNewParam;
×
1871

1872
        code = loadTagListFromBasicParam(pDataInfo, pBasicParam);
×
1873
        QUERY_CHECK_CODE(code, lino, _return);
×
1874

1875
        code = loadBatchColMapFromBasicParam(pDataInfo, pBasicParam);
×
1876
        QUERY_CHECK_CODE(code, lino, _return);
×
1877

1878
        pDataInfo->orgTbInfo = NULL;
×
1879

1880
        pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
×
1881
        QUERY_CHECK_NULL(pDataInfo->pSrcUidList, code, lino, _return, terrno);
×
1882

1883
        pDataInfo->type = pBasicParam->type;
×
1884
        pDataInfo->srcOpType = pBasicParam->srcOpType;
×
1885
        pDataInfo->tableSeq = pBasicParam->tableSeq;
×
1886
      }
1887
      break;
×
1888
    }
1889
    case EX_SRC_TYPE_VSTB_WIN_SCAN:
78,725✔
1890
    case EX_SRC_TYPE_VSTB_TAG_SCAN: {
1891
      SSourceDataInfo dataInfo = {0};
78,725✔
1892
      dataInfo.status = EX_SOURCE_DATA_NOT_READY;
78,725✔
1893
      dataInfo.taskId = pExchangeInfo->pTaskId;
78,725✔
1894
      dataInfo.index = pIdx->srcIdx;
78,725✔
1895
      dataInfo.window = pBasicParam->window;
78,725✔
1896
      dataInfo.groupid = 0;
78,725✔
1897
      dataInfo.orgTbInfo = NULL;
78,725✔
1898
      dataInfo.tagList = NULL;
78,725✔
1899

1900
      dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
78,725✔
1901
      QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
78,725✔
1902

1903
      dataInfo.isNewParam = false;
78,725✔
1904
      dataInfo.type = pBasicParam->type;
78,725✔
1905
      dataInfo.srcOpType = pBasicParam->srcOpType;
78,725✔
1906
      dataInfo.tableSeq = pBasicParam->tableSeq;
78,725✔
1907

1908
      taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
78,725✔
1909
      QUERY_CHECK_NULL(taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
157,450✔
1910
      break;
78,725✔
1911
    }
1912
    case EX_SRC_TYPE_VSTB_SCAN: {
1,490,250✔
1913
      SSourceDataInfo dataInfo = {0};
1,490,250✔
1914
      dataInfo.status = EX_SOURCE_DATA_NOT_READY;
1,490,250✔
1915
      dataInfo.taskId = pExchangeInfo->pTaskId;
1,490,250✔
1916
      dataInfo.index = pIdx->srcIdx;
1,490,250✔
1917
      dataInfo.window = pBasicParam->window;
1,490,250✔
1918
      dataInfo.groupid = 0;
1,490,250✔
1919
      dataInfo.isNewParam = pBasicParam->isNewParam;
1,490,250✔
1920
      dataInfo.tagList = NULL;
1,490,250✔
1921
      dataInfo.orgTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
1,490,250✔
1922
      QUERY_CHECK_NULL(dataInfo.orgTbInfo, code, lino, _return, terrno);
1,490,250✔
1923
      dataInfo.orgTbInfo->vgId = pBasicParam->orgTbInfo->vgId;
1,490,250✔
1924
      tstrncpy(dataInfo.orgTbInfo->tbName, pBasicParam->orgTbInfo->tbName, TSDB_TABLE_FNAME_LEN);
1,490,250✔
1925
      dataInfo.orgTbInfo->colMap = taosArrayDup(pBasicParam->orgTbInfo->colMap, NULL);
1,490,250✔
1926
      QUERY_CHECK_NULL(dataInfo.orgTbInfo->colMap, code, lino, _return, terrno);
1,490,250✔
1927

1928
      dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
1,490,250✔
1929
      QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
1,490,250✔
1930

1931
      dataInfo.type = pBasicParam->type;
1,490,250✔
1932
      dataInfo.srcOpType = pBasicParam->srcOpType;
1,490,250✔
1933
      dataInfo.tableSeq = pBasicParam->tableSeq;
1,490,250✔
1934

1935
      taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
1,490,250✔
1936
      QUERY_CHECK_NULL( taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
2,980,500✔
1937
      break;
1,490,250✔
1938
    }
1939
    case EX_SRC_TYPE_STB_JOIN_SCAN:
168,413✔
1940
    default: {
1941
      if (pIdx->inUseIdx < 0) {
168,413✔
1942
        SSourceDataInfo dataInfo = {0};
166,283✔
1943
        dataInfo.status = EX_SOURCE_DATA_NOT_READY;
166,283✔
1944
        dataInfo.taskId = pExchangeInfo->pTaskId;
166,283✔
1945
        dataInfo.index = pIdx->srcIdx;
166,283✔
1946
        dataInfo.groupid = 0;
166,283✔
1947
        dataInfo.tagList = NULL;
166,283✔
1948

1949
        dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
166,283✔
1950
        QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
166,283✔
1951

1952
        dataInfo.isNewParam = false;
166,283✔
1953
        dataInfo.type = pBasicParam->type;
166,283✔
1954
        dataInfo.srcOpType = pBasicParam->srcOpType;
166,283✔
1955
        dataInfo.tableSeq = pBasicParam->tableSeq;
166,283✔
1956

1957
        QUERY_CHECK_NULL(taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
332,566✔
1958

1959
        pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
166,283✔
1960
      } else {
1961
        SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
2,130✔
1962
        QUERY_CHECK_NULL(pDataInfo, code, lino, _return, terrno);
2,130✔
1963
        if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
2,130✔
1964
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
2,130✔
1965
        }
1966

1967
        pDataInfo->tagList = NULL;
2,130✔
1968
        pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
2,130✔
1969
        QUERY_CHECK_NULL(pDataInfo->pSrcUidList, code, lino, _return, terrno);
2,130✔
1970

1971
        pDataInfo->groupid = 0;
2,130✔
1972
        pDataInfo->isNewParam = false;
2,130✔
1973
        pDataInfo->type = pBasicParam->type;
2,130✔
1974
        pDataInfo->srcOpType = pBasicParam->srcOpType;
2,130✔
1975
        pDataInfo->tableSeq = pBasicParam->tableSeq;
2,130✔
1976
      }
1977
      break;
168,413✔
1978
    }
1979
  }
1980

1981
  return code;
1,737,388✔
1982
_return:
×
1983
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1984
  return code;
×
1985
}
1986

1987
int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
1,668,024✔
1988
  SExchangeInfo*               pExchangeInfo = pOperator->info;
1,668,024✔
1989
  int32_t                      code = TSDB_CODE_SUCCESS;
1,668,024✔
1990
  SExchangeOperatorBasicParam* pBasicParam = NULL;
1,668,024✔
1991
  SExchangeOperatorParam*      pParam = (SExchangeOperatorParam*)pOperator->pOperatorGetParam->value;
1,668,024✔
1992
  if (pParam->multiParams) {
1,668,024✔
1993
    SExchangeOperatorBatchParam* pBatch = (SExchangeOperatorBatchParam*)pOperator->pOperatorGetParam->value;
94,789✔
1994
    int32_t                      iter = 0;
94,789✔
1995
    while (NULL != (pBasicParam = tSimpleHashIterate(pBatch->pBatchs, pBasicParam, &iter))) {
258,942✔
1996
      code = addSingleExchangeSource(pOperator, pBasicParam);
164,153✔
1997
      if (code) {
164,153✔
1998
        return code;
×
1999
      }
2000
    }
2001
  } else {
2002
    pBasicParam = &pParam->basic;
1,573,235✔
2003
    code = addSingleExchangeSource(pOperator, pBasicParam);
1,573,235✔
2004
  }
2005

2006
  freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
1,668,024✔
2007
  pOperator->pOperatorGetParam = NULL;
1,668,024✔
2008

2009
  return code;
1,668,024✔
2010
}
2011

2012
int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
509,670,665✔
2013
  SExchangeInfo* pExchangeInfo = pOperator->info;
509,670,665✔
2014
  int32_t        code = TSDB_CODE_SUCCESS;
509,674,834✔
2015
  int32_t        lino = 0;
509,674,834✔
2016
  
2017
  if ((OPTR_IS_OPENED(pOperator) && !pExchangeInfo->dynamicOp) ||
509,674,834✔
2018
      (pExchangeInfo->dynamicOp && NULL == pOperator->pOperatorGetParam)) {
106,031,535✔
2019
    qDebug("skip prepare, opened:%d, dynamicOp:%d, getParam:%p", OPTR_IS_OPENED(pOperator), pExchangeInfo->dynamicOp, pOperator->pOperatorGetParam);
404,062,454✔
2020
    return TSDB_CODE_SUCCESS;
404,062,538✔
2021
  }
2022

2023
  if (pExchangeInfo->dynamicOp) {
105,612,683✔
2024
    code = addDynamicExchangeSource(pOperator);
1,668,024✔
2025
    QUERY_CHECK_CODE(code, lino, _end);
1,668,024✔
2026
  }
2027

2028
  if (pOperator->status == OP_NOT_OPENED && (pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData) || IS_STREAM_MODE(pOperator->pTaskInfo)) {
105,609,022✔
2029
    pExchangeInfo->current = 0;
6,825,031✔
2030
  }
2031

2032
  int64_t st = taosGetTimestampUs();
105,603,943✔
2033

2034
  if (!IS_STREAM_MODE(pOperator->pTaskInfo) && !pExchangeInfo->seqLoadData) {
105,603,943✔
2035
    code = prepareConcurrentlyLoad(pOperator);
98,695,350✔
2036
    QUERY_CHECK_CODE(code, lino, _end);
98,704,170✔
2037
    pExchangeInfo->openedTs = taosGetTimestampUs();
98,704,170✔
2038
  }
2039

2040
  OPTR_SET_OPENED(pOperator);
105,621,609✔
2041
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
105,609,822✔
2042

2043
  qDebug("%s prepare load complete", pOperator->pTaskInfo->id.str);
105,609,838✔
2044

2045
_end:
35,796,470✔
2046
  if (code != TSDB_CODE_SUCCESS) {
105,609,769✔
2047
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2048
    pOperator->pTaskInfo->code = code;
×
2049
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
2050
  }
2051
  return TSDB_CODE_SUCCESS;
105,609,769✔
2052
}
2053

2054
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
3,463,855✔
2055
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
3,463,855✔
2056

2057
  if (pLimitInfo->remainGroupOffset > 0) {
3,463,855✔
2058
    if (pLimitInfo->currentGroupId == 0) {  // it is the first group
×
2059
      pLimitInfo->currentGroupId = pBlock->info.id.groupId;
×
2060
      blockDataCleanup(pBlock);
×
2061
      return PROJECT_RETRIEVE_CONTINUE;
×
2062
    } else if (pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
×
2063
      // now it is the data from a new group
2064
      pLimitInfo->remainGroupOffset -= 1;
×
2065

2066
      // ignore data block in current group
2067
      if (pLimitInfo->remainGroupOffset > 0) {
×
2068
        blockDataCleanup(pBlock);
×
2069
        return PROJECT_RETRIEVE_CONTINUE;
×
2070
      }
2071
    }
2072

2073
    // set current group id of the project operator
2074
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
×
2075
  }
2076

2077
  // here check for a new group data, we need to handle the data of the previous group.
2078
  if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
3,463,855✔
2079
    pLimitInfo->numOfOutputGroups += 1;
208,820✔
2080
    if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
208,820✔
2081
      pOperator->status = OP_EXEC_DONE;
×
2082
      blockDataCleanup(pBlock);
×
2083

2084
      return PROJECT_RETRIEVE_DONE;
×
2085
    }
2086

2087
    // reset the value for a new group data
2088
    resetLimitInfoForNextGroup(pLimitInfo);
208,820✔
2089
    // existing rows that belongs to previous group.
2090
    if (pBlock->info.rows > 0) {
208,820✔
2091
      return PROJECT_RETRIEVE_DONE;
208,820✔
2092
    }
2093
  }
2094

2095
  // here we reach the start position, according to the limit/offset requirements.
2096

2097
  // set current group id
2098
  pLimitInfo->currentGroupId = pBlock->info.id.groupId;
3,255,035✔
2099

2100
  bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pTaskInfo);
3,255,035✔
2101
  if (pBlock->info.rows == 0) {
3,255,035✔
2102
    return PROJECT_RETRIEVE_CONTINUE;
1,762,368✔
2103
  } else {
2104
    if (limitReached && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
1,492,667✔
2105
      setOperatorCompleted(pOperator);
×
2106
      return PROJECT_RETRIEVE_DONE;
×
2107
    }
2108
  }
2109

2110
  // todo optimize performance
2111
  // If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the
2112
  // they may not belong to the same group the limit/offset value is not valid in this case.
2113
  if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || hasSlimitOffsetInfo(pLimitInfo)) {
1,492,667✔
2114
    return PROJECT_RETRIEVE_DONE;
1,492,667✔
2115
  } else {  // not full enough, continue to accumulate the output data in the buffer.
2116
    return PROJECT_RETRIEVE_CONTINUE;
×
2117
  }
2118
}
2119

2120
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo) {
175,579,355✔
2121
  SExecTaskInfo* pTask = pOperator->pTaskInfo;
175,579,355✔
2122
  int32_t        code = TSDB_CODE_SUCCESS;
175,579,639✔
2123
  if (pTask->pWorkerCb) {
175,579,639✔
2124
    code = pTask->pWorkerCb->beforeBlocking(pTask->pWorkerCb->pPool);
175,581,054✔
2125
    if (code != TSDB_CODE_SUCCESS) {
175,581,054✔
2126
      pTask->code = code;
×
2127
      return pTask->code;
×
2128
    }
2129
  }
2130

2131
  code = tsem_wait(&pExchangeInfo->ready);
175,579,639✔
2132
  if (code != TSDB_CODE_SUCCESS) {
175,578,703✔
2133
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2134
    pTask->code = code;
×
2135
    return pTask->code;
×
2136
  }
2137

2138
  if (pTask->pWorkerCb) {
175,578,703✔
2139
    code = pTask->pWorkerCb->afterRecoverFromBlocking(pTask->pWorkerCb->pPool);
175,580,770✔
2140
    if (code != TSDB_CODE_SUCCESS) {
175,581,391✔
2141
      pTask->code = code;
×
2142
      return pTask->code;
×
2143
    }
2144
  }
2145
  return TSDB_CODE_SUCCESS;
175,580,770✔
2146
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc