• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5016

03 Apr 2026 03:59PM UTC coverage: 72.299% (+0.01%) from 72.289%
#5016

push

travis-ci

web-flow
merge: from main to 3.0 branch #35067

4055 of 5985 new or added lines in 68 files covered. (67.75%)

13126 existing lines in 156 files now uncovered.

257424 of 356056 relevant lines covered (72.3%)

133108577.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.17
/source/libs/executor/src/exchangeoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "operator.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "tdatablock.h"
23
#include "thash.h"
24
#include "tmsg.h"
25
#include "tref.h"
26
#include "trpc.h"
27

28
typedef struct SFetchRspHandleWrapper {
29
  int64_t  exchangeId;
30
  int32_t  sourceIndex;
31
  int64_t  seqId;
32
} SFetchRspHandleWrapper;
33

34
typedef struct SSourceDataInfo {
35
  int32_t             index;
36
  int64_t             seqId;
37
  SRWLatch            lock;
38
  SRetrieveTableRsp*  pRsp;
39
  uint64_t            totalRows;
40
  int64_t             startTime;
41
  int32_t             code;
42
  EX_SOURCE_STATUS    status;
43
  const char*         taskId;
44
  SArray*             pSrcUidList;
45
  int32_t             srcOpType;
46
  bool                tableSeq;
47
  char*               decompBuf;
48
  int32_t             decompBufSize;
49
  SOrgTbInfo*         orgTbInfo;
50
  SArray*             batchOrgTbInfo; // SArray<SOrgTbInfo>
51
  SArray*             tagList;
52
  EExchangeSourceType type;
53
  bool                isNewParam;
54
  STimeWindow         window;
55
  uint64_t            groupid;
56
  bool                fetchSent; // need reset
57
  uint64_t            fetchTimes;   // per-source fetch count
58
  int64_t             fetchCostUs;  // per-source total RPC round-trip (us)
59
} SSourceDataInfo;
60

61
static void destroyExchangeOperatorInfo(void* param);
62
static void freeBlock(void* pParam);
63
static void freeSourceDataInfo(void* param);
64
static void setAllSourcesCompleted(SOperatorInfo* pOperator);
65

66
static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code);
67
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex);
68
static int32_t getCompletedSources(const SArray* pArray, int32_t* pRes);
69
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator);
70
static void    storeNotifyInfo(SOperatorInfo* pOperator);
71
static int32_t seqLoadRemoteData(SOperatorInfo* pOperator);
72
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator);
73
static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock,
74
                                 bool holdDataInBuf);
75
static int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo);
76

77
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo);
78

79
static bool isVstbScan(SSourceDataInfo* pDataInfo) {return pDataInfo->type == EX_SRC_TYPE_VSTB_SCAN; }
35,248,136✔
80
static bool isVstbWinScan(SSourceDataInfo* pDataInfo) { return pDataInfo->type == EX_SRC_TYPE_VSTB_WIN_SCAN; }
×
81
static bool isVstbAggScan(SSourceDataInfo* pDataInfo) { return pDataInfo->type == EX_SRC_TYPE_VSTB_AGG_SCAN; }
×
82
static bool isVstbTagScan(SSourceDataInfo* pDataInfo) { return pDataInfo->type == EX_SRC_TYPE_VSTB_TAG_SCAN; }
18,572,094✔
83
static bool isStbJoinScan(SSourceDataInfo* pDataInfo) { return pDataInfo->type == EX_SRC_TYPE_STB_JOIN_SCAN; }
×
84

85

86
static void streamSequenciallyLoadRemoteData(SOperatorInfo* pOperator,
16,919,628✔
87
                                             SExchangeInfo* pExchangeInfo,
88
                                             SExecTaskInfo* pTaskInfo) {
89
  int32_t code = 0;
16,919,628✔
90
  int32_t lino = 0;
16,919,628✔
91
  int64_t startTs = taosGetTimestampUs();  
16,921,023✔
92
  int32_t  totalSources = (int32_t)taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
16,921,023✔
93
  int32_t completed = 0;
16,920,791✔
94
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
16,921,023✔
95
  if (code != TSDB_CODE_SUCCESS) {
16,920,089✔
96
    pTaskInfo->code = code;
×
97
    T_LONG_JMP(pTaskInfo->env, code);
×
98
  }
99
  if (completed == totalSources) {
16,920,089✔
100
    qDebug("%s no load since all sources completed, completed:%d, totalSources:%d", pTaskInfo->id.str, completed, totalSources);
2,865,017✔
101
    setAllSourcesCompleted(pOperator);
2,865,017✔
102
    return;
3,514,206✔
103
  }
104

105
  SSourceDataInfo* pDataInfo = NULL;
14,055,072✔
106
  SStreamRuntimeFuncInfo* pStream = &pTaskInfo->pStreamRuntimeInfo->funcInfo;
14,055,072✔
107

108
  while (1) {
5,864,819✔
109
    if (pExchangeInfo->current < 0) {
19,920,379✔
110
      qDebug("current %d and all sources complted, totalSources:%d", pExchangeInfo->current, totalSources);
145,171✔
111
      setAllSourcesCompleted(pOperator);
145,171✔
112
      return;
145,171✔
113
    }
114
    
115
    if (pExchangeInfo->current >= totalSources) {
19,775,196✔
116
      completed = 0;
9,924,160✔
117
      code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
9,924,160✔
118
      if (code != TSDB_CODE_SUCCESS) {
9,924,160✔
UNCOV
119
        pTaskInfo->code = code;
×
120
        T_LONG_JMP(pTaskInfo->env, code);
×
121
      }
122
      if (completed == totalSources) {
9,924,160✔
123
        qDebug("stop to load since all sources complted, completed:%d, totalSources:%d", completed, totalSources);
4,277,701✔
124
        setAllSourcesCompleted(pOperator);
4,277,701✔
125
        return;
4,277,701✔
126
      }
127
      
128
      pExchangeInfo->current = 0;
5,646,459✔
129
    }
130

131
    qDebug("%s start stream exchange %p idx:%d fetch", GET_TASKID(pTaskInfo), pExchangeInfo, pExchangeInfo->current);
15,497,495✔
132

133
    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
15,497,744✔
134
    if (!pDataInfo) {
15,497,603✔
UNCOV
135
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
136
      pTaskInfo->code = terrno;
×
137
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
138
    }
139

140
    if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
15,497,603✔
141
      pExchangeInfo->current++;
633,545✔
142
      continue;
633,545✔
143
    }
144

145
    if (!IS_STREAM_SINGLE_GRP(pTaskInfo) && pStream->pGroupReadInfos) {
14,864,290✔
NEW
146
      SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
×
NEW
147
      if (!pDataInfo) {
×
NEW
148
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
NEW
149
        pTaskInfo->code = terrno;
×
NEW
150
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
151
      }
152

NEW
153
      if (pSource->fetchMsgType == TDMT_STREAM_FETCH) {
×
NEW
154
        SArray** ppNode = tSimpleHashGet(pStream->pGroupReadInfos, &pSource->addr.nodeId, sizeof(pSource->addr.nodeId));
×
NEW
155
        if (NULL == ppNode) {
×
NEW
156
          pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
×
NEW
157
          pExchangeInfo->current++;
×
NEW
158
          continue;
×
159
        }
160

NEW
161
        pStream->curNodeId = pSource->addr.nodeId;
×
NEW
162
        pStream->curGrpRead = *ppNode;
×
163
      }
164
    }
165

166
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
14,863,510✔
167

168
    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
14,864,663✔
169
    if (code != TSDB_CODE_SUCCESS) {
14,864,431✔
170
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
171
      pTaskInfo->code = code;
×
172
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
173
    }
174

175
    while (true) {
1,794✔
176
      recordOpExecBeforeDownstream(pOperator);
14,866,225✔
177
      code = exchangeWait(pOperator, pExchangeInfo);
14,866,225✔
178
      recordOpExecAfterDownstream(pOperator, 0);
14,866,457✔
179

180
      if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
14,866,457✔
181
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
1,886✔
182
      }
183

184
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
14,864,571✔
185
      if (pDataInfo->seqId != currSeqId) {
14,864,430✔
186
        qDebug("%s seq rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", 
1,794✔
187
            GET_TASKID(pTaskInfo), pDataInfo->seqId, pExchangeInfo, currSeqId);
188
        taosMemoryFreeClear(pDataInfo->pRsp);
1,794✔
189
        continue;
1,794✔
190
      }
191

192
      break;
14,862,777✔
193
    }
194

195
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
14,862,777✔
196
    if (!pSource) {
14,862,777✔
197
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
23✔
198
      pTaskInfo->code = terrno;
23✔
UNCOV
199
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
200
    }
201

202
    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
14,862,754✔
203
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
464✔
204
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
205
             tstrerror(pDataInfo->code));
206
      pTaskInfo->code = pDataInfo->code;
464✔
207
      T_LONG_JMP(pTaskInfo->env, code);
464✔
208
    }
209

210
    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
14,862,117✔
211
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
14,862,117✔
212

213
    if (pRsp->numOfRows == 0) {
14,862,290✔
214
      qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
5,231,274✔
215
             " execId:%d idx %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", pDataInfo,
216
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
217
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);
218

219
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
5,231,274✔
220
      if (isVstbScan(pDataInfo) || isVstbTagScan(pDataInfo)) {
5,231,274✔
221
        pExchangeInfo->current = -1;
145,171✔
222
      } else {
223
        pExchangeInfo->current += 1;
5,086,103✔
224
      }
225
      taosMemoryFreeClear(pDataInfo->pRsp);
5,231,274✔
226
      continue;
5,231,274✔
227
    }
228

229
    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
9,631,016✔
230
    TAOS_CHECK_EXIT(code);
9,631,039✔
231

232
    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
9,631,039✔
233
    if (pRsp->completed == 1) {
9,631,039✔
234
      qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
3,043,961✔
235
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%d", pDataInfo,
236
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
237
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
238
             pExchangeInfo->current + 1, totalSources);
239

240
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
3,043,961✔
241
      if (isVstbScan(pDataInfo)) {
3,043,961✔
UNCOV
242
        pExchangeInfo->current = -1;
×
UNCOV
243
        taosMemoryFreeClear(pDataInfo->pRsp);
×
UNCOV
244
        continue;
×
245
      }
246
    } else {
247
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d idx:%d numOfRows:%" PRId64
6,587,078✔
248
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
249
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
250
             pExchangeInfo->current, pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
251
    }
252

253
    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
9,631,039✔
254
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;
9,631,039✔
255

256
    pExchangeInfo->current++;
9,631,039✔
257

258
    taosMemoryFreeClear(pDataInfo->pRsp);
9,631,039✔
259
    return;
9,631,039✔
260
  }
261

UNCOV
262
_exit:
×
263

264
  if (code) {
×
265
    pTaskInfo->code = code;
×
266
    qError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
×
267
  }
268
}
269

270

271
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
247,835,146✔
272
                                           SExecTaskInfo* pTaskInfo) {
273
  int32_t code = 0;
247,835,146✔
274
  int32_t lino = 0;
247,835,146✔
275
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
247,835,146✔
276
  int32_t completed = 0;
247,833,619✔
277
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
247,834,003✔
278
  if (code != TSDB_CODE_SUCCESS) {
247,831,620✔
UNCOV
279
    pTaskInfo->code = code;
×
UNCOV
280
    T_LONG_JMP(pTaskInfo->env, code);
×
281
  }
282
  if (completed == totalSources) {
247,831,620✔
283
    setAllSourcesCompleted(pOperator);
83,457,395✔
284
    return;
83,458,448✔
285
  }
286

287
  SSourceDataInfo* pDataInfo = NULL;
164,374,225✔
288

289
  while (1) {
28,714,559✔
290
    qDebug("prepare wait for ready, %p, %s", pExchangeInfo, GET_TASKID(pTaskInfo));
193,088,784✔
291
    recordOpExecBeforeDownstream(pOperator);
193,090,874✔
292
    code = exchangeWait(pOperator, pExchangeInfo);
193,093,127✔
293
    recordOpExecAfterDownstream(pOperator, 0);
193,094,425✔
294

295
    if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
193,093,088✔
296
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
3,291✔
297
    }
298

299
    for (int32_t i = 0; i < totalSources; ++i) {
343,527,930✔
300
      pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
343,527,930✔
301
      QUERY_CHECK_NULL(pDataInfo, code, lino, _exit, terrno);
343,528,458✔
302
      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
343,528,458✔
303
        continue;
120,976,645✔
304
      }
305

306
      if (pDataInfo->status != EX_SOURCE_DATA_READY) {
222,552,360✔
307
        continue;
29,460,151✔
308
      }
309

310
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
193,092,209✔
311
      if (pDataInfo->seqId != currSeqId) {
193,091,825✔
UNCOV
312
        qDebug("concurrent rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pDataInfo->seqId, pExchangeInfo, currSeqId);
×
UNCOV
313
        taosMemoryFreeClear(pDataInfo->pRsp);
×
UNCOV
314
        break;
×
315
      }
316

317
      if (pDataInfo->code != TSDB_CODE_SUCCESS) {
193,091,134✔
318
        code = pDataInfo->code;
3,640✔
319
        TAOS_CHECK_EXIT(code);
3,640✔
320
      }
321

322
      tmemory_barrier();
193,088,041✔
323
      SRetrieveTableRsp*     pRsp = pDataInfo->pRsp;
193,088,041✔
324
      SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
193,088,185✔
325
      QUERY_CHECK_NULL(pSource, code, lino, _exit, terrno);
193,088,569✔
326

327
      // todo
328
      SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
193,088,569✔
329
      if (pRsp->numOfRows == 0) {
193,088,022✔
330
        if (NULL != pDataInfo->pSrcUidList && !isVstbScan(pDataInfo)) {
49,620,214✔
UNCOV
331
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
×
UNCOV
332
          code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
×
UNCOV
333
          if (code != TSDB_CODE_SUCCESS) {
×
334
            taosMemoryFreeClear(pDataInfo->pRsp);
×
335
            TAOS_CHECK_EXIT(code);
×
336
          }
337
        } else {
338
          pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
49,620,214✔
339
          qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
49,620,744✔
340
                 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", try next %d/%" PRIzu, pDataInfo,
341
                 GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
342
                 pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows, i + 1, totalSources);
343
          taosMemoryFreeClear(pDataInfo->pRsp);
49,620,744✔
344
        }
345
        break;
49,620,744✔
346
      }
347

348
      TAOS_CHECK_EXIT(doExtractResultBlocks(pExchangeInfo, pDataInfo));
143,467,825✔
349

350
      SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
143,466,763✔
351
      updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pDataInfo->startTime, pOperator);
143,467,317✔
352
      pDataInfo->totalRows += pRetrieveRsp->numOfRows;
143,466,211✔
353

354
      if (pRsp->completed == 1) {
143,465,317✔
355
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
130,088,125✔
356
        qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
130,087,572✔
357
               " execId:%d index:%d completed, blocks:%d, numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64
358
               ", totalRows:%" PRIu64 ", total:%.2f Kb, try next %d/%" PRIzu, pDataInfo,
359
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
360
               pRsp->numOfBlocks, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows,
361
               pLoadInfo->totalSize / 1024.0, i + 1, totalSources);
362
      } else {
363
        qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
13,379,170✔
364
               " execId:%d blocks:%d, numOfRows:%" PRId64 ", totalRows:%" PRIu64 ", total:%.2f Kb",
365
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
366
               pRsp->numOfBlocks, pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
367
      }
368

369
      taosMemoryFreeClear(pDataInfo->pRsp);
143,466,742✔
370

371
      if ((pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED || NULL != pDataInfo->pSrcUidList) && !isVstbScan(pDataInfo) && !isVstbTagScan(pDataInfo)) {
143,468,885✔
372
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
13,379,170✔
373
        code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
13,379,170✔
374
        if (code != TSDB_CODE_SUCCESS) {
13,379,170✔
UNCOV
375
          taosMemoryFreeClear(pDataInfo->pRsp);
×
UNCOV
376
          TAOS_CHECK_EXIT(code);
×
377
        }
378
      }
379
      
380
      return;
143,466,420✔
381
    }  // end loop
382

383
    int32_t complete1 = 0;
49,620,744✔
384
    code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &complete1);
49,620,744✔
385
    if (code != TSDB_CODE_SUCCESS) {
49,620,360✔
UNCOV
386
      pTaskInfo->code = code;
×
UNCOV
387
      T_LONG_JMP(pTaskInfo->env, code);
×
388
    }
389
    if (complete1 == totalSources) {
49,620,360✔
390
      qDebug("all sources are completed, %s", GET_TASKID(pTaskInfo));
20,905,801✔
391
      return;
20,905,801✔
392
    }
393
  }
394

395
_exit:
3,110✔
396

397
  if (code) {
3,640✔
398
    pTaskInfo->code = code;
3,640✔
399
    qError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
3,640✔
400
  }
401
}
402

403
static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
653,195,025✔
404
  int32_t        code = TSDB_CODE_SUCCESS;
653,195,025✔
405
  SExchangeInfo* pExchangeInfo = pOperator->info;
653,195,025✔
406
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
653,195,296✔
407

408
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
653,195,436✔
409

410
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
653,193,200✔
411
  if (pOperator->status == OP_EXEC_DONE) {
653,193,200✔
UNCOV
412
    qDebug("%s all %" PRIzu " source(s) are exhausted, total rows:%" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
×
413
           GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize,
414
           pLoadInfo->totalElapsed / 1000.0);
UNCOV
415
    return NULL;
×
416
  }
417

418
  // we have buffered retrieved datablock, return it directly
419
  SSDataBlock* p = NULL;
653,192,614✔
420
  if (taosArrayGetSize(pExchangeInfo->pResultBlockList) > 0) {
653,192,752✔
421
    p = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
337,324,917✔
422
    taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
337,323,533✔
423
  }
424

425
  if (p != NULL) {
653,197,278✔
426
    void* tmp = taosArrayPush(pExchangeInfo->pRecycledBlocks, &p);
337,328,385✔
427
    if (!tmp) {
337,326,488✔
UNCOV
428
      code = terrno;
×
UNCOV
429
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
UNCOV
430
      pTaskInfo->code = code;
×
UNCOV
431
      T_LONG_JMP(pTaskInfo->env, code);
×
432
    }
433
    return p;
337,326,488✔
434
  } else {
435
    if (pExchangeInfo->seqLoadData && (IS_NON_STREAM_MODE(pTaskInfo) || IS_STREAM_SINGLE_GRP(pTaskInfo))) {
315,868,893✔
436
      code = seqLoadRemoteData(pOperator);
51,114,313✔
437
      if (code != TSDB_CODE_SUCCESS) {
51,113,910✔
438
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
1,291✔
439
        pTaskInfo->code = code;
1,291✔
440
        T_LONG_JMP(pTaskInfo->env, code);
1,291✔
441
      }
442
    } else if (IS_STREAM_MODE(pOperator->pTaskInfo))   {
264,754,888✔
443
      streamSequenciallyLoadRemoteData(pOperator, pExchangeInfo, pTaskInfo);
16,920,582✔
444
    } else {
445
      concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
247,832,330✔
446
    }
447
    if (TSDB_CODE_SUCCESS != pOperator->pTaskInfo->code) {
315,865,212✔
448
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(pOperator->pTaskInfo->code));
4,309✔
449
      T_LONG_JMP(pTaskInfo->env, pOperator->pTaskInfo->code);
4,309✔
450
    }
451
    
452
    if (taosArrayGetSize(pExchangeInfo->pResultBlockList) == 0) {
315,860,087✔
453
      qDebug("empty resultBlockList");
125,145,559✔
454
      return NULL;
125,145,559✔
455
    } else {
456
      p = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
190,715,550✔
457
      taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
190,714,893✔
458
      void* tmp = taosArrayPush(pExchangeInfo->pRecycledBlocks, &p);
190,715,036✔
459
      if (!tmp) {
190,715,977✔
UNCOV
460
        code = terrno;
×
UNCOV
461
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
UNCOV
462
        pTaskInfo->code = code;
×
UNCOV
463
        T_LONG_JMP(pTaskInfo->env, code);
×
464
      }
465

466
      qDebug("block with rows:%" PRId64 " loaded", p->info.rows);
190,715,977✔
467
      return p;
190,715,458✔
468
    }
469
  }
470
}
471

472
static int32_t loadRemoteDataNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
651,270,374✔
473
  int32_t        code = TSDB_CODE_SUCCESS;
651,270,374✔
474
  int32_t        lino = 0;
651,270,374✔
475
  SExchangeInfo* pExchangeInfo = pOperator->info;
651,270,374✔
476
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
651,280,340✔
477

478
  qDebug("%s start to load from exchange %p", pTaskInfo->id.str, pExchangeInfo);
651,269,347✔
479

480
  code = pOperator->fpSet._openFn(pOperator);
651,286,511✔
481
  QUERY_CHECK_CODE(code, lino, _end);
651,289,061✔
482

483
  if (pOperator->status == OP_EXEC_DONE) {
651,289,061✔
484
    (*ppRes) = NULL;
149,520✔
485
    return code;
149,520✔
486
  }
487

488
  while (1) {
2,056,246✔
489
    SSDataBlock* pBlock = doLoadRemoteDataImpl(pOperator);
653,196,555✔
490
    if (pBlock == NULL) {
653,187,982✔
491
      (*ppRes) = NULL;
125,145,559✔
492
      return code;
125,145,559✔
493
    }
494

495
    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL, NULL);
528,042,423✔
496
    QUERY_CHECK_CODE(code, lino, _end);
528,040,188✔
497

498
    if (blockDataGetNumOfRows(pBlock) == 0) {
528,040,188✔
499
      qDebug("rows 0 block got, continue next load");
3,372✔
500
      continue;
3,372✔
501
    }
502

503
    SLimitInfo* pLimitInfo = &pExchangeInfo->limitInfo;
528,037,584✔
504
    if (hasLimitOffsetInfo(pLimitInfo)) {
528,038,906✔
505
      int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock, false);
4,241,816✔
506
      if (status == PROJECT_RETRIEVE_CONTINUE) {
4,241,816✔
507
        qDebug("limit retrieve continue");
2,052,874✔
508
        continue;
2,052,874✔
509
      } else if (status == PROJECT_RETRIEVE_DONE) {
2,188,942✔
510
        if (pBlock->info.rows == 0) {
2,188,942✔
UNCOV
511
          setOperatorCompleted(pOperator);
×
UNCOV
512
          (*ppRes) = NULL;
×
UNCOV
513
          return code;
×
514
        } else {
515
          (*ppRes) = pBlock;
2,188,942✔
516
          return code;
2,188,942✔
517
        }
518
      }
519
    } else {
520
      (*ppRes) = pBlock;
523,796,120✔
521
      qDebug("block with rows %" PRId64 " returned in exechange", pBlock->info.rows);
523,796,730✔
522
      return code;
523,798,368✔
523
    }
524
  }
525

UNCOV
526
_end:
×
527

UNCOV
528
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
529
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
530
    pTaskInfo->code = code;
×
UNCOV
531
    T_LONG_JMP(pTaskInfo->env, code);
×
532
  } else {
533
    qDebug("empty block returned in exchange");
×
534
  }
535
  
UNCOV
536
  (*ppRes) = NULL;
×
UNCOV
537
  return code;
×
538
}
539

540
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
111,884,209✔
541
  pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
111,884,209✔
542
  if (pInfo->pSourceDataInfo == NULL) {
111,895,558✔
UNCOV
543
    return terrno;
×
544
  }
545

546
  if (pInfo->dynamicOp) {
111,890,319✔
547
    return TSDB_CODE_SUCCESS;
7,733,491✔
548
  }
549

550
  int32_t len = strlen(id) + 1;
104,160,555✔
551
  pInfo->pTaskId = taosMemoryCalloc(1, len);
104,160,555✔
552
  if (!pInfo->pTaskId) {
104,152,718✔
553
    return terrno;
×
554
  }
555
  tstrncpy(pInfo->pTaskId, id, len);
104,146,379✔
556
  for (int32_t i = 0; i < numOfSources; ++i) {
277,016,560✔
557
    SSourceDataInfo dataInfo = {0};
172,849,342✔
558
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
172,844,607✔
559
    dataInfo.taskId = pInfo->pTaskId;
172,844,607✔
560
    dataInfo.index = i;
172,848,385✔
561
    SSourceDataInfo* pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
172,848,385✔
562
    if (pDs == NULL) {
172,868,700✔
UNCOV
563
      taosArrayDestroyEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
×
UNCOV
564
      return terrno;
×
565
    }
566
    qDebug("init source data info %d, pDs:%p, status:%d", i, pDs, pDs->status);
172,868,700✔
567
  }
568

569
  return TSDB_CODE_SUCCESS;
104,167,218✔
570
}
571

572
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo, const char* id) {
111,893,239✔
573
  size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);
111,893,239✔
574

575
  if (numOfSources == 0) {
111,888,660✔
UNCOV
576
    qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
×
UNCOV
577
    return TSDB_CODE_INVALID_PARA;
×
578
  }
579
  pInfo->pFetchRpcHandles = taosArrayInit(numOfSources, sizeof(int64_t));
111,888,660✔
580
  if (!pInfo->pFetchRpcHandles) {
111,897,159✔
UNCOV
581
    return terrno;
×
582
  }
583
  void* ret = taosArrayReserve(pInfo->pFetchRpcHandles, numOfSources);
111,889,367✔
584
  if (!ret) {
111,888,783✔
585
    return terrno;
×
586
  }
587

588
  pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
111,888,783✔
589
  if (pInfo->pSources == NULL) {
111,887,936✔
UNCOV
590
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
UNCOV
591
    return terrno;
×
592
  }
593

594
  if (pExNode->node.dynamicOp) {
111,889,538✔
595
    pInfo->pHashSources = tSimpleHashInit(numOfSources * 2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
7,733,491✔
596
    if (NULL == pInfo->pHashSources) {
7,733,522✔
UNCOV
597
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
598
      return terrno;
×
599
    }
600
  }
601

602
  for (int32_t i = 0; i < numOfSources; ++i) {
306,434,794✔
603
    SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
194,541,276✔
604
    if (!pNode) {
194,534,984✔
UNCOV
605
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
UNCOV
606
      return terrno;
×
607
    }
608
    void* tmp = taosArrayPush(pInfo->pSources, pNode);
194,534,984✔
609
    if (!tmp) {
194,547,555✔
UNCOV
610
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
UNCOV
611
      return terrno;
×
612
    }
613
    SExchangeSrcIndex idx = {.srcIdx = i, .inUseIdx = -1};
194,547,555✔
614
    int32_t           code =
615
        tSimpleHashPut(pInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
194,545,264✔
616
    if (pInfo->pHashSources && code != TSDB_CODE_SUCCESS) {
194,539,111✔
UNCOV
617
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
UNCOV
618
      return code;
×
619
    }
620
  }
621

622
  initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);
111,893,518✔
623
  int64_t refId = taosAddRef(fetchObjRefPool, pInfo);
111,888,648✔
624
  if (refId < 0) {
111,880,276✔
UNCOV
625
    int32_t code = terrno;
×
UNCOV
626
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
627
    return code;
×
628
  } else {
629
    pInfo->self = refId;
111,880,276✔
630
  }
631

632
  return initDataSource(numOfSources, pInfo, id);
111,881,879✔
633
}
634

635
int32_t resetExchangeOperState(SOperatorInfo* pOper) {
12,594,582✔
636
  SExchangeInfo* pInfo = pOper->info;
12,594,582✔
637
  SExchangePhysiNode* pPhynode = (SExchangePhysiNode*)pOper->pPhyNode;
12,595,223✔
638

639
  qDebug("%s reset exchange op:%p info:%p", pOper->pTaskInfo->id.str, pOper, pInfo);
12,595,675✔
640

641
  (void)atomic_add_fetch_64(&pInfo->seqId, 1);
12,595,970✔
642
  pOper->status = OP_NOT_OPENED;
12,595,947✔
643
  pInfo->current = 0;
12,595,947✔
644
  pInfo->loadInfo.totalElapsed = 0;
12,595,947✔
645
  pInfo->loadInfo.totalRows = 0;
12,595,947✔
646
  pInfo->loadInfo.totalSize = 0;
12,595,947✔
647
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pSourceDataInfo); ++i) {
35,621,464✔
648
    SSourceDataInfo* pDataInfo = taosArrayGet(pInfo->pSourceDataInfo, i);
23,025,975✔
649
    taosWLockLatch(&pDataInfo->lock);
23,025,739✔
650
    taosMemoryFreeClear(pDataInfo->decompBuf);
23,026,879✔
651
    taosMemoryFreeClear(pDataInfo->pRsp);
23,026,879✔
652

653
    pDataInfo->totalRows = 0;
23,026,653✔
654
    pDataInfo->code = 0;
23,026,653✔
655
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
23,026,653✔
656
    pDataInfo->fetchSent = false;
23,026,653✔
657
    pDataInfo->fetchTimes = 0;
23,026,421✔
658
    pDataInfo->fetchCostUs = 0;
23,024,833✔
659
    taosWUnLockLatch(&pDataInfo->lock);
23,024,839✔
660
  }
661

662
  if (pInfo->dynamicOp) {
12,595,744✔
663
    taosArrayClearEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
2,228,695✔
664
  } 
665

666
  taosArrayClearEx(pInfo->pResultBlockList, freeBlock);
12,595,744✔
667
  taosArrayClearEx(pInfo->pRecycledBlocks, freeBlock);
12,594,812✔
668

669
  blockDataCleanup(pInfo->pDummyBlock);
12,595,280✔
670

671
  void   *data = NULL;
12,594,586✔
672
  int32_t iter = 0;
12,594,586✔
673
  while ((data = tSimpleHashIterate(pInfo->pHashSources, data, &iter))) {
16,355,939✔
674
    ((SExchangeSrcIndex *)data)->inUseIdx = -1;
3,760,885✔
675
  }
676
  
677
  pInfo->limitInfo = (SLimitInfo){0};
12,594,795✔
678
  initLimitInfo(pPhynode->node.pLimit, pPhynode->node.pSlimit, &pInfo->limitInfo);
12,595,027✔
679

680
  return 0;
12,595,282✔
681
}
682

683
static int32_t exchangeGetExplainExecInfo(SOperatorInfo* pOptr,
1,685,149✔
684
                                          void** pOptrExplain, uint32_t* len) {
685
  const SExchangeInfo* pExchangeInfo = pOptr->info;
1,685,149✔
686
  int32_t numSources = (int32_t)taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
1,685,550✔
687

688
  SExchangeExplainInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeExplainInfo));
1,685,550✔
689
  if (!pInfo) {
1,684,743✔
UNCOV
690
    return terrno;
×
691
  }
692

693
  pInfo->mode = pExchangeInfo->seqLoadData ? 1 : 0;
1,684,743✔
694
  pInfo->numSources = numSources;
1,684,743✔
695

696
  /* all sources are exhausted, thus no need to lock the sources data info */
697
  for (int32_t i = 0; i < numSources; ++i) {
3,686,290✔
698
    const SSourceDataInfo* pSrc = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
2,001,146✔
699
    pInfo->avgFetchTimes += (double)pSrc->fetchTimes / numSources;
2,001,953✔
700
    pInfo->avgFetchRows += (double)pSrc->totalRows / numSources;
2,002,354✔
701
    pInfo->avgFetchCost += (double)pSrc->fetchCostUs / numSources;
2,002,354✔
702
    pInfo->maxFetchTimes = TMAX(pInfo->maxFetchTimes, pSrc->fetchTimes);
2,001,953✔
703
    pInfo->maxFetchRows = TMAX(pInfo->maxFetchRows, pSrc->totalRows);
2,001,547✔
704
    pInfo->maxFetchCost = TMAX(pInfo->maxFetchCost, pSrc->fetchCostUs);
2,001,953✔
705
  }
706

707
  *pOptrExplain = pInfo;
1,685,144✔
708
  *len = sizeof(SExchangeExplainInfo);
1,685,149✔
709
  return TSDB_CODE_SUCCESS;
1,685,550✔
710
}
711

712
int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo,
111,895,294✔
713
                                   SOperatorInfo** pOptrInfo) {
714
  QRY_PARAM_CHECK(pOptrInfo);
111,895,294✔
715

716
  int32_t        code = 0;
111,896,704✔
717
  int32_t        lino = 0;
111,896,704✔
718
  SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
111,896,704✔
719
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
111,884,843✔
720
  if (pInfo == NULL || pOperator == NULL) {
111,885,870✔
721
    code = terrno;
526✔
UNCOV
722
    goto _error;
×
723
  }
724
  initOperatorCostInfo(pOperator);
111,885,344✔
725

726
  pInfo->isExchange = true;
111,889,662✔
727
  pOperator->pPhyNode = pExNode;
111,894,902✔
728
  pInfo->dynamicOp = pExNode->node.dynamicOp;
111,896,497✔
729
  code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo));
111,890,890✔
730
  QUERY_CHECK_CODE(code, lino, _error);
111,900,733✔
731

732
  code = tsem_init(&pInfo->ready, 0, 0);
111,900,733✔
733
  QUERY_CHECK_CODE(code, lino, _error);
111,901,263✔
734

735
  pInfo->pDummyBlock = createDataBlockFromDescNode(pExNode->node.pOutputDataBlockDesc);
111,901,263✔
736
  QUERY_CHECK_NULL(pInfo->pDummyBlock, code, lino, _error, terrno);
111,902,163✔
737

738
  pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES);
111,900,267✔
739
  QUERY_CHECK_NULL(pInfo->pResultBlockList, code, lino, _error, terrno);
111,897,076✔
740
  pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES);
111,897,617✔
741
  QUERY_CHECK_NULL(pInfo->pRecycledBlocks, code, lino, _error, terrno);
111,898,204✔
742

743
  SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self};
111,899,174✔
744
  code = qAppendTaskStopInfo(pTaskInfo, &stopInfo);
111,897,680✔
745
  QUERY_CHECK_CODE(code, lino, _error);
111,900,122✔
746

747
  pInfo->seqLoadData = pExNode->seqRecvData;
111,900,122✔
748
  pInfo->dynTbname = pExNode->dynTbname;
111,898,654✔
749
  if (pInfo->dynTbname) {
111,897,576✔
750
    pInfo->seqLoadData = true;
33,710✔
751
  }
752
  pInfo->pTransporter = pTransporter;
111,894,310✔
753

754
  setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo,
111,893,836✔
755
                  pTaskInfo);
756
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
111,891,941✔
757

758
  code = filterInitFromNode((SNode*)pExNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
111,894,123✔
759
                            pTaskInfo->pStreamRuntimeInfo);
111,893,121✔
760
  QUERY_CHECK_CODE(code, lino, _error);
111,893,304✔
761
  qTrace("%s exchange op:%p", __func__, pOperator);
111,893,304✔
762
  pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteDataNext, NULL,
111,893,710✔
763
                                         destroyExchangeOperatorInfo, optrDefaultBufFn,
764
                                         exchangeGetExplainExecInfo, optrDefaultGetNextExtFn, NULL);
765
  setOperatorResetStateFn(pOperator, resetExchangeOperState);
111,888,646✔
766
  *pOptrInfo = pOperator;
111,894,134✔
767
  return TSDB_CODE_SUCCESS;
111,895,047✔
768

UNCOV
769
_error:
×
UNCOV
770
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
771
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
772
    pTaskInfo->code = code;
×
773
  }
UNCOV
774
  if (pInfo != NULL) {
×
UNCOV
775
    doDestroyExchangeOperatorInfo(pInfo);
×
776
  }
777

UNCOV
778
  if (pOperator != NULL) {
×
UNCOV
779
    pOperator->info = NULL;
×
UNCOV
780
    destroyOperator(pOperator);
×
781
  }
UNCOV
782
  return code;
×
783
}
784

785
void destroyExchangeOperatorInfo(void* param) {
111,900,334✔
786
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
111,900,334✔
787
  int32_t        code = taosRemoveRef(fetchObjRefPool, pExInfo->self);
111,900,334✔
788
  if (code != TSDB_CODE_SUCCESS) {
111,900,670✔
UNCOV
789
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
790
  }
791
}
111,900,670✔
792

793
void freeBlock(void* pParam) {
318,918,584✔
794
  SSDataBlock* pBlock = *(SSDataBlock**)pParam;
318,918,584✔
795
  blockDataDestroy(pBlock);
318,918,584✔
796
}
318,917,755✔
797

798
void freeSourceDataInfo(void* p) {
179,697,124✔
799
  SSourceDataInfo* pInfo = (SSourceDataInfo*)p;
179,697,124✔
800
  taosMemoryFreeClear(pInfo->decompBuf);
179,697,124✔
801
  taosMemoryFreeClear(pInfo->pRsp);
179,699,395✔
802

803
  pInfo->decompBufSize = 0;
179,699,447✔
804
}
179,696,886✔
805

806
void doDestroyExchangeOperatorInfo(void* param) {
111,899,552✔
807
  if (param == NULL) {
111,899,552✔
UNCOV
808
    return;
×
809
  }
810
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
111,899,552✔
811
  if (pExInfo->pFetchRpcHandles) {
111,899,552✔
812
    for (int32_t i = 0; i < pExInfo->pFetchRpcHandles->size; ++i) {
306,456,951✔
813
      int64_t* pRpcHandle = taosArrayGet(pExInfo->pFetchRpcHandles, i);
194,555,720✔
814
      if (*pRpcHandle > 0) {
194,556,186✔
815
        SDownstreamSourceNode* pSource = taosArrayGet(pExInfo->pSources, i);
1,179,808✔
816
        (void)asyncFreeConnById(pExInfo->pTransporter, *pRpcHandle);
1,179,808✔
817
      }
818
    }
819
    taosArrayDestroy(pExInfo->pFetchRpcHandles);
111,901,550✔
820
  }
821

822
  taosArrayDestroy(pExInfo->pSources);
111,899,915✔
823
  taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo);
111,899,022✔
824

825
  taosArrayDestroyEx(pExInfo->pResultBlockList, freeBlock);
111,897,025✔
826
  taosArrayDestroyEx(pExInfo->pRecycledBlocks, freeBlock);
111,894,165✔
827

828
  blockDataDestroy(pExInfo->pDummyBlock);
111,897,373✔
829
  tSimpleHashCleanup(pExInfo->pHashSources);
111,898,615✔
830

831
  int32_t code = tsem_destroy(&pExInfo->ready);
111,898,486✔
832
  if (code != TSDB_CODE_SUCCESS) {
111,897,263✔
UNCOV
833
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
834
  }
835
  taosMemoryFreeClear(pExInfo->pTaskId);
111,897,263✔
836

837
  taosMemoryFreeClear(param);
111,893,539✔
838
}
839

840
int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
259,825,402✔
841
  SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;
259,825,402✔
842

843
  taosMemoryFreeClear(pMsg->pEpSet);
259,825,402✔
844
  SExchangeInfo* pExchangeInfo = taosAcquireRef(fetchObjRefPool, pWrapper->exchangeId);
259,884,419✔
845
  if (pExchangeInfo == NULL) {
259,892,704✔
846
    qWarn("failed to acquire exchange operator, since it may have been released, %p", pExchangeInfo);
29,856✔
847
    taosMemoryFree(pMsg->pData);
29,856✔
848
    return TSDB_CODE_SUCCESS;
29,856✔
849
  }
850

851
  int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
259,862,848✔
852
  if (pWrapper->seqId != currSeqId) {
259,875,614✔
UNCOV
853
    qDebug("rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pWrapper->seqId, pExchangeInfo, currSeqId);
×
UNCOV
854
    taosMemoryFree(pMsg->pData);
×
855
    code = taosReleaseRef(fetchObjRefPool, pWrapper->exchangeId);
×
UNCOV
856
    if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
857
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
858
    }
UNCOV
859
    return TSDB_CODE_SUCCESS;
×
860
  }
861

862
  int32_t          index = pWrapper->sourceIndex;
259,863,877✔
863

864
  qDebug("%s exchange %p %dth source got rsp, code:%d, rsp:%p", pExchangeInfo->pTaskId, pExchangeInfo, index, code, pMsg->pData);
259,857,608✔
865

866
  int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, index);
259,870,273✔
867
  if (pRpcHandle != NULL) {
259,827,642✔
868
    int32_t ret = asyncFreeConnById(pExchangeInfo->pTransporter, *pRpcHandle);
259,838,401✔
869
    if (ret != 0) {
259,809,191✔
870
      qDebug("failed to free rpc handle, code:%s, %p", tstrerror(ret), pExchangeInfo);
1,531,992✔
871
    }
872
    *pRpcHandle = -1;
259,809,191✔
873
  }
874

875
  SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);
259,745,746✔
876
  if (!pSourceDataInfo) {
259,845,009✔
877
    return terrno;
×
878
  }
879

880
  if (0 == code && NULL == pMsg->pData) {
259,845,009✔
881
    qError("invalid rsp msg, msgType:%d, len:%d", pMsg->msgType, pMsg->len);
×
UNCOV
882
    code = TSDB_CODE_QRY_INVALID_MSG;
×
883
  }
884

885
  taosWLockLatch(&pSourceDataInfo->lock);
259,846,091✔
886
  if (code == TSDB_CODE_SUCCESS) {
259,882,426✔
887
    pSourceDataInfo->fetchCostUs += taosGetTimestampUs() - pSourceDataInfo->startTime;
259,849,280✔
888
    pSourceDataInfo->fetchTimes++;
259,794,919✔
889

890
    pSourceDataInfo->seqId = pWrapper->seqId;
259,681,928✔
891
    pSourceDataInfo->pRsp = pMsg->pData;
259,754,204✔
892

893
    SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
259,670,568✔
894
    pRsp->numOfRows = htobe64(pRsp->numOfRows);
259,721,736✔
895
    pRsp->compLen = htonl(pRsp->compLen);
259,784,546✔
896
    pRsp->payloadLen = htonl(pRsp->payloadLen);
259,772,219✔
897
    pRsp->numOfCols = htonl(pRsp->numOfCols);
259,672,587✔
898
    pRsp->useconds = htobe64(pRsp->useconds);
259,666,038✔
899
    pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);
259,530,698✔
900

901
    qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%" PRId64 ", %p", pSourceDataInfo->taskId, index,
259,592,076✔
902
           pRsp->numOfBlocks, pRsp->numOfRows, pExchangeInfo);
903
  } else {
904
    taosMemoryFree(pMsg->pData);
8,254✔
905
    pSourceDataInfo->code = rpcCvtErrCode(code);
8,254✔
906
    if (pSourceDataInfo->code != code) {
8,254✔
UNCOV
907
      qError("%s fetch rsp received, index:%d, error:%s, cvted error: %s, %p", pSourceDataInfo->taskId, index,
×
908
             tstrerror(code), tstrerror(pSourceDataInfo->code), pExchangeInfo);
909
    } else {
910
      qError("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code),
8,254✔
911
             pExchangeInfo);
912
    }
913
  }
914

915
  tmemory_barrier();
259,638,054✔
916
  pSourceDataInfo->status = EX_SOURCE_DATA_READY;
259,638,054✔
917
  taosWUnLockLatch(&pSourceDataInfo->lock);
259,668,118✔
918
  
919
  code = tsem_post(&pExchangeInfo->ready);
259,700,362✔
920
  if (code != TSDB_CODE_SUCCESS) {
259,808,491✔
UNCOV
921
    qError("failed to invoke post when fetch rsp is ready, code:%s, %p", tstrerror(code), pExchangeInfo);
×
UNCOV
922
    return code;
×
923
  }
924

925
  code = taosReleaseRef(fetchObjRefPool, pWrapper->exchangeId);
259,808,491✔
926
  if (code != TSDB_CODE_SUCCESS) {
259,895,376✔
UNCOV
927
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
928
  }
929
  return code;
259,881,435✔
930
}
931

932
int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, bool tableSeq) {
313,154✔
933
  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
313,154✔
934
  if (NULL == *ppRes) {
313,154✔
UNCOV
935
    return terrno;
×
936
  }
937

938
  STableScanOperatorParam* pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
313,154✔
939
  if (NULL == pScan) {
313,154✔
UNCOV
940
    taosMemoryFreeClear(*ppRes);
×
UNCOV
941
    return terrno;
×
942
  }
943

944
  pScan->paramType = DYN_TYPE_SCAN_PARAM;
313,154✔
945
  pScan->pUidList = taosArrayDup(pUidList, NULL);
313,154✔
946
  if (NULL == pScan->pUidList) {
313,154✔
UNCOV
947
    taosMemoryFree(pScan);
×
UNCOV
948
    taosMemoryFreeClear(*ppRes);
×
949
    return terrno;
×
950
  }
951
  pScan->dynType = DYN_TYPE_STB_JOIN;
313,154✔
952
  pScan->tableSeq = tableSeq;
313,154✔
953
  pScan->pOrgTbInfo = NULL;
313,154✔
954
  pScan->pBatchTbInfo = NULL;
313,154✔
955
  pScan->pTagList = NULL;
313,154✔
956
  pScan->isNewParam = false;
313,154✔
957
  pScan->window.skey = INT64_MAX;
313,154✔
958
  pScan->window.ekey = INT64_MIN;
313,154✔
959
  pScan->notifyToProcess = false;
313,154✔
960
  pScan->notifyTs = 0;
313,154✔
961

962
  (*ppRes)->opType = srcOpType;
313,154✔
963
  (*ppRes)->downstreamIdx = 0;
313,154✔
964
  (*ppRes)->value = pScan;
313,154✔
965
  (*ppRes)->pChildren = NULL;
313,154✔
966
  (*ppRes)->reUse = false;
313,154✔
967

968
  return TSDB_CODE_SUCCESS;
313,154✔
969
}
970

971
int32_t buildTableScanOperatorParamEx(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, SOrgTbInfo *pMap, bool tableSeq, STimeWindow *window, bool isNewParam, ETableScanDynType type) {
46,675,195✔
972
  int32_t                  code = TSDB_CODE_SUCCESS;
46,675,195✔
973
  int32_t                  lino = 0;
46,675,195✔
974
  STableScanOperatorParam* pScan = NULL;
46,675,195✔
975

976
  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
46,675,195✔
977
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
46,675,195✔
978

979
  pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
46,675,195✔
980
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);
46,675,195✔
981

982
  pScan->paramType = DYN_TYPE_SCAN_PARAM;
46,675,195✔
983
  if (pUidList) {
46,675,195✔
984
    pScan->pUidList = taosArrayDup(pUidList, NULL);
46,675,195✔
985
    QUERY_CHECK_NULL(pScan->pUidList, code, lino, _return, terrno);
46,675,195✔
986
  } else {
UNCOV
987
    pScan->pUidList = NULL;
×
988
  }
989

990
  if (pMap) {
46,675,195✔
991
    pScan->pOrgTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
45,915,271✔
992
    QUERY_CHECK_NULL(pScan->pOrgTbInfo, code, lino, _return, terrno);
45,915,271✔
993

994
    pScan->pOrgTbInfo->vgId = pMap->vgId;
45,915,271✔
995
    tstrncpy(pScan->pOrgTbInfo->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);
45,915,271✔
996

997
    pScan->pOrgTbInfo->colMap = taosArrayDup(pMap->colMap, NULL);
45,915,271✔
998
    QUERY_CHECK_NULL(pScan->pOrgTbInfo->colMap, code, lino, _return, terrno);
45,915,271✔
999
  } else {
1000
    pScan->pOrgTbInfo = NULL;
759,924✔
1001
  }
1002
  pScan->pTagList = NULL;
46,675,195✔
1003
  pScan->pBatchTbInfo = NULL;
46,675,195✔
1004

1005

1006
  pScan->dynType = type;
46,675,195✔
1007
  pScan->tableSeq = tableSeq;
46,675,195✔
1008
  pScan->window.skey = window->skey;
46,675,195✔
1009
  pScan->window.ekey = window->ekey;
46,675,195✔
1010
  pScan->isNewParam = isNewParam;
46,675,195✔
1011
  pScan->notifyToProcess = false;
46,675,195✔
1012
  pScan->notifyTs = 0;
46,675,195✔
1013
  (*ppRes)->opType = srcOpType;
46,675,195✔
1014
  (*ppRes)->downstreamIdx = 0;
46,675,195✔
1015
  (*ppRes)->value = pScan;
46,675,195✔
1016
  (*ppRes)->pChildren = NULL;
46,675,195✔
1017
  (*ppRes)->reUse = false;
46,675,195✔
1018

1019
  return code;
46,675,195✔
UNCOV
1020
_return:
×
UNCOV
1021
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
UNCOV
1022
  taosMemoryFreeClear(*ppRes);
×
UNCOV
1023
  if (pScan) {
×
UNCOV
1024
    taosArrayDestroy(pScan->pUidList);
×
UNCOV
1025
    if (pScan->pOrgTbInfo) {
×
UNCOV
1026
      taosArrayDestroy(pScan->pOrgTbInfo->colMap);
×
UNCOV
1027
      taosMemoryFreeClear(pScan->pOrgTbInfo);
×
1028
    }
UNCOV
1029
    taosMemoryFree(pScan);
×
1030
  }
UNCOV
1031
  return code;
×
1032
}
1033

1034
/**
1035
  @brief build the table scan operator param for notify message
1036
*/
1037
int32_t buildTableScanOperatorParamNotify(SOperatorParam** ppRes,
235,812✔
1038
                                          int32_t srcOpType, TSKEY notifyTs) {
1039
  if (srcOpType != 0 && srcOpType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
235,812✔
UNCOV
1040
    qWarn("%s, invalid srcOpType:%d", __func__, srcOpType);
×
UNCOV
1041
    return TSDB_CODE_INVALID_PARA;
×
1042
  }
1043
  int32_t code = TSDB_CODE_SUCCESS;
235,812✔
1044
  int32_t lino = 0;
235,812✔
1045
  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
235,812✔
1046
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
235,812✔
1047

1048
  STableScanOperatorParam* pTsParam =
471,624✔
1049
    taosMemoryCalloc(1, sizeof(STableScanOperatorParam));
235,812✔
1050
  QUERY_CHECK_NULL(pTsParam, code, lino, _return, terrno);
235,812✔
1051

1052
  pTsParam->paramType = NOTIFY_TYPE_SCAN_PARAM;
235,812✔
1053
  pTsParam->notifyToProcess = true;
235,812✔
1054
  pTsParam->notifyTs = notifyTs;
235,812✔
1055

1056
  (*ppRes)->opType = srcOpType != 0 ? srcOpType :
235,812✔
1057
                                      QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
1058
  (*ppRes)->downstreamIdx = 0;
235,812✔
1059
  (*ppRes)->value = pTsParam;
235,812✔
1060
  (*ppRes)->pChildren = NULL;
235,812✔
1061
  /* param is not reusable when it is transferred by message */
1062
  (*ppRes)->reUse = false;
235,812✔
1063

1064
_return:
235,812✔
1065
  if (TSDB_CODE_SUCCESS != code) {
235,812✔
UNCOV
1066
    qError("%s failed at %d, failed to build scan operator msg:%s",
×
1067
           __func__, lino, tstrerror(code));
UNCOV
1068
    taosMemoryFreeClear(*ppRes);
×
UNCOV
1069
    if (pTsParam) {
×
UNCOV
1070
      taosMemoryFree(pTsParam);
×
1071
    }
1072
  }
1073
  return code;
235,812✔
1074
}
1075

1076
int32_t buildTableScanOperatorParamBatchInfo(SOperatorParam** ppRes, uint64_t groupid, SArray* pUidList, int32_t srcOpType, SArray *pBatchMap, SArray *pTagList, bool tableSeq, STimeWindow *window, bool isNewParam) {
8,334,749✔
1077
  int32_t                  code = TSDB_CODE_SUCCESS;
8,334,749✔
1078
  int32_t                  lino = 0;
8,334,749✔
1079
  STableScanOperatorParam* pScan = NULL;
8,334,749✔
1080

1081
  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
8,334,749✔
1082
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
8,334,749✔
1083

1084
  pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
8,334,749✔
1085
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);
8,334,749✔
1086

1087
  pScan->paramType = DYN_TYPE_SCAN_PARAM;
8,334,749✔
1088
  pScan->groupid = groupid;
8,334,749✔
1089
  if (pUidList) {
8,334,749✔
1090
    pScan->pUidList = taosArrayDup(pUidList, NULL);
8,334,749✔
1091
    QUERY_CHECK_NULL(pScan->pUidList, code, lino, _return, terrno);
8,334,749✔
1092
  } else {
UNCOV
1093
    pScan->pUidList = NULL;
×
1094
  }
1095
  pScan->pOrgTbInfo = NULL;
8,334,749✔
1096

1097
  if (pBatchMap) {
8,334,749✔
1098
    pScan->pBatchTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
8,334,749✔
1099
    QUERY_CHECK_NULL(pScan->pBatchTbInfo, code, lino, _return, terrno);
8,334,749✔
1100
    for (int32_t i = 0; i < taosArrayGetSize(pBatchMap); i++) {
22,035,703✔
1101
      SOrgTbInfo *pSrcInfo = taosArrayGet(pBatchMap, i);
13,700,954✔
1102
      SOrgTbInfo batchInfo = {0};
13,700,954✔
1103
      batchInfo.vgId = pSrcInfo->vgId;
13,700,954✔
1104
      tstrncpy(batchInfo.tbName, pSrcInfo->tbName, TSDB_TABLE_FNAME_LEN);
13,700,954✔
1105
      batchInfo.colMap = taosArrayDup(pSrcInfo->colMap, NULL);
13,700,954✔
1106
      QUERY_CHECK_NULL(batchInfo.colMap, code, lino, _return, terrno);
13,700,954✔
1107
      SOrgTbInfo *pDstInfo = taosArrayPush(pScan->pBatchTbInfo, &batchInfo);
13,700,954✔
1108
      QUERY_CHECK_NULL(pDstInfo, code, lino, _return, terrno);
13,700,954✔
1109
    }
1110
  } else {
UNCOV
1111
    pScan->pBatchTbInfo = NULL;
×
1112
  }
1113

1114
  if (pTagList) {
8,334,749✔
1115
    pScan->pTagList = taosArrayInit(1, sizeof(STagVal));
3,633,222✔
1116
    QUERY_CHECK_NULL(pScan->pTagList, code, lino, _return, terrno);
3,633,222✔
1117

1118
    for (int32_t i = 0; i < taosArrayGetSize(pTagList); ++i) {
24,209,676✔
1119
      STagVal *pSrcTag = (STagVal*)taosArrayGet(pTagList, i);
20,576,454✔
1120
      STagVal  dstTag;
20,576,454✔
1121
      dstTag.type = pSrcTag->type;
20,576,454✔
1122
      dstTag.cid = pSrcTag->cid;
20,576,454✔
1123
      if (IS_VAR_DATA_TYPE(pSrcTag->type)) {
20,576,454✔
1124
        dstTag.nData = pSrcTag->nData;
9,025,146✔
1125
        dstTag.pData = taosMemoryMalloc(dstTag.nData);
9,025,146✔
1126
        QUERY_CHECK_NULL(dstTag.pData, code, lino, _return, terrno);
9,025,146✔
1127
        memcpy(dstTag.pData, pSrcTag->pData, dstTag.nData);
9,025,146✔
1128
      } else {
1129
        dstTag.i64 = pSrcTag->i64;
11,551,308✔
1130
      }
1131

1132
      QUERY_CHECK_NULL(taosArrayPush(pScan->pTagList, &dstTag), code, lino, _return, terrno);
41,152,908✔
1133
    }
1134
  } else {
1135
    pScan->pTagList = NULL;
4,701,527✔
1136
  }
1137

1138

1139
  pScan->dynType = DYN_TYPE_VSTB_BATCH_SCAN;
8,334,749✔
1140
  pScan->tableSeq = tableSeq;
8,334,749✔
1141
  pScan->window.skey = window->skey;
8,334,749✔
1142
  pScan->window.ekey = window->ekey;
8,334,749✔
1143
  pScan->isNewParam = isNewParam;
8,334,749✔
1144
  pScan->notifyToProcess = false;
8,334,749✔
1145
  pScan->notifyTs = 0;
8,334,749✔
1146
  (*ppRes)->opType = srcOpType;
8,334,749✔
1147
  (*ppRes)->downstreamIdx = 0;
8,334,749✔
1148
  (*ppRes)->value = pScan;
8,334,749✔
1149
  (*ppRes)->pChildren = NULL;
8,334,749✔
1150
  (*ppRes)->reUse = false;
8,334,749✔
1151

1152
  return code;
8,334,749✔
UNCOV
1153
_return:
×
UNCOV
1154
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
UNCOV
1155
  taosMemoryFreeClear(*ppRes);
×
UNCOV
1156
  if (pScan) {
×
UNCOV
1157
    taosArrayDestroy(pScan->pUidList);
×
UNCOV
1158
    if (pScan->pBatchTbInfo) {
×
UNCOV
1159
      taosArrayDestroy(pScan->pBatchTbInfo);
×
1160
    }
UNCOV
1161
    taosMemoryFree(pScan);
×
1162
  }
UNCOV
1163
  return code;
×
1164
}
1165

1166
/*
1167
 * Build hash-agg operator get-param for dynamic virtual-table aggregation.
1168
 *
1169
 * @param ppRes Output operator param.
1170
 * @param groupid Group id bound to this batch.
1171
 * @param pUidList Source table uid list.
1172
 * @param pBatchMap Batch table metadata map.
1173
 * @param pTagList Optional tag value list.
1174
 * @param tableSeq Whether to keep table-sequential mode.
1175
 * @param window Time window for downstream scan.
1176
 * @param isNewParam Whether downstream should treat this as a new param batch.
1177
 *
1178
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
1179
 */
1180
int32_t buildAggOperatorParam(SOperatorParam** ppRes, uint64_t groupid, SArray* pUidList, SArray* pBatchMap,
7,466,791✔
1181
                              SArray* pTagList, bool tableSeq, STimeWindow* window, bool isNewParam) {
1182
  int32_t                  code = TSDB_CODE_SUCCESS;
7,466,791✔
1183
  int32_t                  lino = 0;
7,466,791✔
1184
  SOperatorParam*          pParam = NULL;
7,466,791✔
1185

1186
  pParam = taosMemoryCalloc(1, sizeof(SOperatorParam));
7,466,791✔
1187
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno);
7,466,791✔
1188

1189
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL;
7,466,791✔
1190
  pParam->downstreamIdx = 0;
7,466,791✔
1191
  pParam->value = NULL;
7,466,791✔
1192
  pParam->pChildren = NULL;
7,466,791✔
1193
  pParam->reUse = false;
7,466,791✔
1194

1195
  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
7,466,791✔
1196
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno);
7,466,791✔
1197

1198
  SOperatorParam* pTableScanParam = NULL;
7,466,791✔
1199
  code = buildTableScanOperatorParamBatchInfo(&pTableScanParam, groupid, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
7,466,791✔
1200
                                              pBatchMap, pTagList, tableSeq, window, isNewParam);
1201
  QUERY_CHECK_CODE(code, lino, _return);
7,466,791✔
1202

1203
  QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pTableScanParam), code, lino, _return, terrno);
14,933,582✔
1204

1205
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
7,466,791✔
1206
  pParam->downstreamIdx = 0;
7,466,791✔
1207
  pParam->value = NULL;
7,466,791✔
1208
  pParam->reUse = false;
7,466,791✔
1209

1210
  *ppRes = pParam;
7,466,791✔
1211
  return code;
7,466,791✔
1212

UNCOV
1213
_return:
×
UNCOV
1214
  freeOperatorParam(pParam, OP_GET_PARAM);
×
UNCOV
1215
  qError("%s failed at %d, failed to build agg scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
UNCOV
1216
  return code;
×
1217
}
1218

1219
/*
1220
 * Build hash-interval operator get-param for dynamic virtual-table interval query.
1221
 *
1222
 * @param ppRes Output operator param.
1223
 * @param groupid Group id bound to this batch.
1224
 * @param pUidList Source table uid list.
1225
 * @param pBatchMap Batch table metadata map.
1226
 * @param pTagList Optional tag value list.
1227
 * @param tableSeq Whether to keep table-sequential mode.
1228
 * @param window Time window for downstream scan.
1229
 * @param isNewParam Whether downstream should treat this as a new param batch.
1230
 *
1231
 * @return TSDB_CODE_SUCCESS on success, otherwise error code.
1232
 */
1233
static int32_t buildIntervalOperatorParam(SOperatorParam** ppRes, uint64_t groupid, SArray* pUidList, SArray* pBatchMap,
68,324✔
1234
                                          SArray* pTagList, bool tableSeq, STimeWindow* window, bool isNewParam) {
1235
  int32_t         code = TSDB_CODE_SUCCESS;
68,324✔
1236
  int32_t         lino = 0;
68,324✔
1237
  SOperatorParam* pParam = NULL;
68,324✔
1238

1239
  pParam = taosMemoryCalloc(1, sizeof(SOperatorParam));
68,324✔
1240
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno);
68,324✔
1241

1242
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL;
68,324✔
1243
  pParam->downstreamIdx = 0;
68,324✔
1244
  pParam->value = NULL;
68,324✔
1245
  pParam->reUse = false;
68,324✔
1246
  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
68,324✔
1247
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno);
68,324✔
1248

1249
  SOperatorParam* pTableScanParam = NULL;
68,324✔
1250
  code = buildTableScanOperatorParamBatchInfo(&pTableScanParam, groupid, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
68,324✔
1251
                                              pBatchMap, pTagList, tableSeq, window, isNewParam);
1252
  QUERY_CHECK_CODE(code, lino, _return);
68,324✔
1253

1254
  QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pTableScanParam), code, lino, _return, terrno);
136,648✔
1255

1256
  *ppRes = pParam;
68,324✔
1257
  return code;
68,324✔
1258

UNCOV
1259
_return:
×
UNCOV
1260
  freeOperatorParam(pParam, OP_GET_PARAM);
×
UNCOV
1261
  qError("%s failed at %d, failed to build interval scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
UNCOV
1262
  return code;
×
1263
}
1264

1265
int32_t buildTagScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType) {
4,887,212✔
1266
  int32_t                  code = TSDB_CODE_SUCCESS;
4,887,212✔
1267
  int32_t                  lino = 0;
4,887,212✔
1268
  STagScanOperatorParam*   pScan = NULL;
4,887,212✔
1269

1270
  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
4,887,212✔
1271
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
4,887,212✔
1272

1273
  pScan = taosMemoryMalloc(sizeof(STagScanOperatorParam));
4,887,212✔
1274
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);
4,887,212✔
1275
  pScan->vcUid = *(tb_uid_t*)taosArrayGet(pUidList, 0);
4,887,212✔
1276

1277
  (*ppRes)->opType = srcOpType;
4,887,212✔
1278
  (*ppRes)->downstreamIdx = 0;
4,887,212✔
1279
  (*ppRes)->value = pScan;
4,887,212✔
1280
  (*ppRes)->pChildren = NULL;
4,887,212✔
1281
  (*ppRes)->reUse = false;
4,887,212✔
1282

1283
  return code;
4,887,212✔
1284
_return:
×
UNCOV
1285
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
UNCOV
1286
  taosMemoryFreeClear(*ppRes);
×
UNCOV
1287
  if (pScan) {
×
UNCOV
1288
    taosMemoryFree(pScan);
×
1289
  }
UNCOV
1290
  return code;
×
1291
}
1292

1293
static int32_t getCurrentWinCalcTimeRange(SStreamRuntimeFuncInfo* pRuntimeInfo, STimeWindow* pTimeRange) {
4,976,735✔
1294
  if (!pRuntimeInfo || !pTimeRange) {
4,976,735✔
UNCOV
1295
    return TSDB_CODE_INTERNAL_ERROR;
×
1296
  }
1297

1298
  SSTriggerCalcParam* pParam = taosArrayGet(pRuntimeInfo->pStreamPesudoFuncVals, pRuntimeInfo->curIdx);
4,976,735✔
1299
  if (!pParam) {
4,976,735✔
UNCOV
1300
    return TSDB_CODE_INTERNAL_ERROR;
×
1301
  }
1302

1303
  switch (pRuntimeInfo->triggerType) {
4,976,735✔
1304
    case STREAM_TRIGGER_SLIDING:
3,808,276✔
1305
      // Unable to distinguish whether there is an interval, all use wstart/wend
1306
      // and the results are equal to those of prevTs/currentTs, using the same address of union.
1307
      pTimeRange->skey = pParam->wstart;  // is equal to wstart
3,808,276✔
1308
      pTimeRange->ekey = pParam->wend;    // is equal to wend
3,808,276✔
1309
      break;
3,808,276✔
1310
    case STREAM_TRIGGER_PERIOD:
273,535✔
1311
      pTimeRange->skey = pParam->prevLocalTime;
273,535✔
1312
      pTimeRange->ekey = pParam->triggerTime;
273,535✔
1313
      break;
273,535✔
1314
    default:
894,924✔
1315
      pTimeRange->skey = pParam->wstart;
894,924✔
1316
      pTimeRange->ekey = pParam->wend;
895,156✔
1317
      break;
894,924✔
1318
  }
1319

1320
  return TSDB_CODE_SUCCESS;
4,976,735✔
1321
}
1322

1323
void clearVtbScanDataInfo(void* pItem) {
66,186,811✔
1324
  SSourceDataInfo *pInfo = (SSourceDataInfo *)pItem;
66,186,811✔
1325
  if (pInfo->orgTbInfo) {
66,186,811✔
1326
    taosArrayDestroy(pInfo->orgTbInfo->colMap);
45,915,271✔
1327
    taosMemoryFreeClear(pInfo->orgTbInfo);
45,915,271✔
1328
  }
1329
  if (pInfo->batchOrgTbInfo) {
66,186,811✔
1330
    for (int32_t i = 0; i < taosArrayGetSize(pInfo->batchOrgTbInfo); ++i) {
22,035,703✔
1331
      SOrgTbInfo* pColMap = taosArrayGet(pInfo->batchOrgTbInfo, i);
13,700,954✔
1332
      if (pColMap) {
13,700,954✔
1333
        taosArrayDestroy(pColMap->colMap);
13,700,954✔
1334
      }
1335
    }
1336
    taosArrayDestroy(pInfo->batchOrgTbInfo);
8,334,749✔
1337
    pInfo->batchOrgTbInfo = NULL;
8,334,749✔
1338
  }
1339
  if (pInfo->tagList) {
66,186,811✔
1340
    taosArrayDestroyEx(pInfo->tagList, destroyTagVal);
3,633,222✔
1341
    pInfo->tagList = NULL;
3,633,222✔
1342
  }
1343
  if (pInfo->pSrcUidList) {
66,186,811✔
1344
    taosArrayDestroy(pInfo->pSrcUidList);
54,250,020✔
1345
    pInfo->pSrcUidList = NULL;
54,250,020✔
1346
  }
1347
}
66,186,811✔
1348

1349
int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
275,735,225✔
1350
  int32_t          code = TSDB_CODE_SUCCESS;
275,735,225✔
1351
  int32_t          lino = 0;
275,735,225✔
1352
  SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
275,735,225✔
1353
  QUERY_CHECK_NULL(pDataInfo, code, lino, _end, terrno);
275,728,193✔
1354

1355
  if (EX_SOURCE_DATA_NOT_READY != pDataInfo->status) {
275,728,193✔
1356
    return TSDB_CODE_SUCCESS;
15,708,612✔
1357
  }
1358

1359
  pDataInfo->status = EX_SOURCE_DATA_STARTED;
260,024,208✔
1360
  SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
260,025,251✔
1361
  QUERY_CHECK_NULL(pSource, code, lino, _end, terrno);
260,018,758✔
1362

1363
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
260,018,758✔
1364

1365
  SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
260,021,459✔
1366
  QUERY_CHECK_NULL(pWrapper, code, lino, _end, terrno);
260,021,324✔
1367
  pWrapper->exchangeId = pExchangeInfo->self;
260,021,324✔
1368
  pWrapper->sourceIndex = sourceIndex;
260,015,750✔
1369
  pWrapper->seqId = pExchangeInfo->seqId;
260,024,257✔
1370

1371
  if (pSource->localExec) {
260,019,156✔
UNCOV
1372
    SDataBuf pBuf = {0};
×
1373
    code =
UNCOV
1374
      (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->sId,
×
1375
                                  pTaskInfo->id.queryId, pSource->clientId,
1376
                                  pSource->taskId, 0, pSource->execId,
1377
                                  &pBuf.pData,
1378
                                  pTaskInfo->localFetch.explainRes);
UNCOV
1379
    QUERY_CHECK_CODE(code, lino, _end);
×
UNCOV
1380
    pDataInfo->startTime = taosGetTimestampUs();
×
UNCOV
1381
    code = loadRemoteDataCallback(pWrapper, &pBuf, code);
×
UNCOV
1382
    taosMemoryFreeClear(pWrapper);
×
UNCOV
1383
    QUERY_CHECK_CODE(code, lino, _end);
×
1384
  } else {
1385
    bool needStreamRtInfo = true;
260,020,491✔
1386
    bool needStreamGrpInfo = false;
260,020,491✔
1387
    SResFetchReq req = {0};
260,020,491✔
1388
    req.header.vgId = pSource->addr.nodeId;
260,020,484✔
1389
    req.sId = pSource->sId;
260,020,837✔
1390
    req.clientId = pSource->clientId;
260,018,424✔
1391
    req.taskId = pSource->taskId;
260,013,797✔
1392
    req.queryId = pTaskInfo->id.queryId;
260,013,300✔
1393
    req.execId = pSource->execId;
260,017,758✔
1394
    if (pTaskInfo->pStreamRuntimeInfo) {
260,015,314✔
1395
      req.dynTbname = pExchangeInfo->dynTbname;
15,301,982✔
1396
      req.execId = pTaskInfo->pStreamRuntimeInfo->execId;
15,302,214✔
1397
      req.pStRtFuncInfo = &pTaskInfo->pStreamRuntimeInfo->funcInfo;
15,301,982✔
1398

1399
      if (pSource->fetchMsgType == TDMT_STREAM_FETCH_FROM_RUNNER) {
15,301,982✔
1400
        qDebug("%s stream fetch from runner, execId:%d, %p", GET_TASKID(pTaskInfo), req.execId, pTaskInfo->pStreamRuntimeInfo);
1,680,422✔
1401
      } else if (pSource->fetchMsgType == TDMT_STREAM_FETCH_FROM_CACHE) {
13,621,121✔
1402
        code = getCurrentWinCalcTimeRange(req.pStRtFuncInfo, &req.pStRtFuncInfo->curWindow);
4,976,735✔
1403
        QUERY_CHECK_CODE(code, lino, _end);
4,976,735✔
1404
        needStreamRtInfo = false;
4,976,735✔
1405
        qDebug("%s stream fetch from cache, execId:%d, curWinIdx:%d, time range:[%" PRId64 ", %" PRId64 "]",
4,976,735✔
1406
               GET_TASKID(pTaskInfo), req.execId, req.pStRtFuncInfo->curIdx, req.pStRtFuncInfo->curWindow.skey,
1407
               req.pStRtFuncInfo->curWindow.ekey);
1408
      } else {
1409
        needStreamGrpInfo = true;
8,643,899✔
1410
      }
1411
      
1412
      if (!pDataInfo->fetchSent) {
15,301,056✔
1413
        req.reset = pDataInfo->fetchSent = true;
8,739,233✔
1414
      }
1415
    }
1416

1417
    switch (pDataInfo->type) {
260,010,014✔
1418
      case EX_SRC_TYPE_VSTB_SCAN: {
45,915,271✔
1419
        code = buildTableScanOperatorParamEx(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->orgTbInfo, pDataInfo->tableSeq, &pDataInfo->window, pDataInfo->isNewParam, DYN_TYPE_VSTB_SINGLE_SCAN);
45,915,271✔
1420
        clearVtbScanDataInfo(pDataInfo);
45,915,271✔
1421
        QUERY_CHECK_CODE(code, lino, _end);
45,915,271✔
1422
        break;
45,915,271✔
1423
      }
1424
      case EX_SRC_TYPE_VTB_WIN_SCAN: {
1,612,644✔
1425
        if (pDataInfo->pSrcUidList) {
1,612,644✔
1426
          code = buildTableScanOperatorParamEx(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, NULL, pDataInfo->tableSeq, &pDataInfo->window, false, DYN_TYPE_VSTB_WIN_SCAN);
759,924✔
1427
          taosArrayDestroy(pDataInfo->pSrcUidList);
759,924✔
1428
          pDataInfo->pSrcUidList = NULL;
759,924✔
1429
          QUERY_CHECK_CODE(code, lino, _end);
759,924✔
1430
        }
1431
        break;
1,612,644✔
1432
      }
1433
      case EX_SRC_TYPE_VSTB_TAG_SCAN: {
4,887,212✔
1434
        code = buildTagScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType);
4,887,212✔
1435
        taosArrayDestroy(pDataInfo->pSrcUidList);
4,887,212✔
1436
        pDataInfo->pSrcUidList = NULL;
4,887,212✔
1437
        QUERY_CHECK_CODE(code, lino, _end);
4,887,212✔
1438
        break;
4,887,212✔
1439
      }
1440
      case EX_SRC_TYPE_VSTB_WIN_SCAN:
1,880,760✔
1441
      case EX_SRC_TYPE_VSTB_INTERVAL_SCAN:
1442
      case EX_SRC_TYPE_VSTB_TS_SCAN: {
1443
        if (pDataInfo->batchOrgTbInfo) {
1,880,760✔
1444
          int32_t srcOpType =
799,634✔
1445
              (pDataInfo->type == EX_SRC_TYPE_VSTB_TS_SCAN)
799,634✔
1446
                  ? QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN
1447
                  : QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
799,634✔
1448
          code = buildTableScanOperatorParamBatchInfo(
1,599,268✔
1449
              &req.pOpParam, pDataInfo->groupid, pDataInfo->pSrcUidList, srcOpType,
1450
              pDataInfo->batchOrgTbInfo, pDataInfo->tagList, pDataInfo->tableSeq, &pDataInfo->window,
799,634✔
1451
              pDataInfo->isNewParam);
799,634✔
1452
          clearVtbScanDataInfo(pDataInfo);
799,634✔
1453
          QUERY_CHECK_CODE(code, lino, _end);
799,634✔
1454
        }
1455
        break;
1,880,760✔
1456
      }
1457
      case EX_SRC_TYPE_VSTB_PART_INTERVAL_SCAN: {
68,324✔
1458
        if (pDataInfo->batchOrgTbInfo) {
68,324✔
1459
          code = buildIntervalOperatorParam(&req.pOpParam, pDataInfo->groupid, pDataInfo->pSrcUidList,
136,648✔
1460
                                            pDataInfo->batchOrgTbInfo, pDataInfo->tagList, pDataInfo->tableSeq,
68,324✔
1461
                                            &pDataInfo->window, pDataInfo->isNewParam);
68,324✔
1462
          clearVtbScanDataInfo(pDataInfo);
68,324✔
1463
          QUERY_CHECK_CODE(code, lino, _end);
68,324✔
1464
        }
1465
        break;
68,324✔
1466
      }
1467
      case EX_SRC_TYPE_VSTB_AGG_SCAN: {
7,466,791✔
1468
        if (pDataInfo->batchOrgTbInfo) {
7,466,791✔
1469
          code = buildAggOperatorParam(&req.pOpParam, pDataInfo->groupid, pDataInfo->pSrcUidList,
14,933,582✔
1470
                                       pDataInfo->batchOrgTbInfo, pDataInfo->tagList,
1471
                                       pDataInfo->tableSeq, &pDataInfo->window, pDataInfo->isNewParam);
14,933,582✔
1472
          clearVtbScanDataInfo(pDataInfo);
7,466,791✔
1473
          QUERY_CHECK_CODE(code, lino, _end);
7,466,791✔
1474
        }
1475
        break;
7,466,791✔
1476
      }
1477
      case EX_SRC_TYPE_STB_JOIN_SCAN:
198,186,234✔
1478
      default: {
1479
        if (pDataInfo->pSrcUidList) {
198,186,234✔
1480
          code = buildTableScanOperatorParam(&req.pOpParam,
300,749✔
1481
                                             pDataInfo->pSrcUidList,
1482
                                             pDataInfo->srcOpType,
1483
                                             pDataInfo->tableSeq);
300,749✔
1484
          /* source uid list can be reused in vnode size, so only use once */
1485
          taosArrayDestroy(pDataInfo->pSrcUidList);
300,749✔
1486
          pDataInfo->pSrcUidList = NULL;
300,749✔
1487
          QUERY_CHECK_CODE(code, lino, _end);
300,749✔
1488
        }
1489
        if (pExchangeInfo->notifyToSend) {
198,194,313✔
1490
          if (NULL == req.pOpParam) {
235,812✔
1491
            code = buildTableScanOperatorParamNotify(&req.pOpParam,
235,812✔
1492
                                                     pDataInfo->srcOpType,
1493
                                                     pExchangeInfo->notifyTs);
1494
            QUERY_CHECK_CODE(code, lino, _end);
235,812✔
1495
          } else {
1496
            /**
1497
              Currently don't support use the same param for multiple times!
1498
            */
UNCOV
1499
            qError("%s, %s failed, currently don't support use the same param "
×
1500
                   "for multiple times!", GET_TASKID(pTaskInfo), __func__);
UNCOV
1501
            pTaskInfo->code = TSDB_CODE_INVALID_PARA;
×
UNCOV
1502
            taosMemoryFree(pWrapper);
×
UNCOV
1503
            return pTaskInfo->code;
×
1504
          }
1505
          pExchangeInfo->notifyToSend = false;
235,812✔
1506
        }
1507
        break;
198,192,766✔
1508
      }
1509
    }
1510

1511
    int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req, needStreamRtInfo, needStreamGrpInfo);
260,023,768✔
1512
    if (msgSize < 0) {
259,998,525✔
UNCOV
1513
      pTaskInfo->code = msgSize;
×
UNCOV
1514
      taosMemoryFree(pWrapper);
×
UNCOV
1515
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
UNCOV
1516
      return pTaskInfo->code;
×
1517
    }
1518

1519
    void* msg = taosMemoryCalloc(1, msgSize);
259,998,525✔
1520
    if (NULL == msg) {
259,989,800✔
UNCOV
1521
      pTaskInfo->code = terrno;
×
UNCOV
1522
      taosMemoryFree(pWrapper);
×
UNCOV
1523
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
UNCOV
1524
      return pTaskInfo->code;
×
1525
    }
1526

1527
    msgSize = tSerializeSResFetchReq(msg, msgSize, &req, needStreamRtInfo, needStreamGrpInfo);
259,989,800✔
1528
    if (msgSize < 0) {
260,008,610✔
1529
      pTaskInfo->code = msgSize;
×
UNCOV
1530
      taosMemoryFree(pWrapper);
×
UNCOV
1531
      taosMemoryFree(msg);
×
UNCOV
1532
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
UNCOV
1533
      return pTaskInfo->code;
×
1534
    }
1535

1536
    freeOperatorParam(req.pOpParam, OP_GET_PARAM);
260,008,610✔
1537

1538
    qDebug("%s build fetch msg and send to vgId:%d, ep:%s, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
260,009,025✔
1539
           ", seqId:%" PRId64 ", execId:%d, %p, %d/%" PRIzu,
1540
           GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->clientId,
1541
           pSource->taskId, pExchangeInfo->seqId, pSource->execId, pExchangeInfo, sourceIndex, totalSources);
1542

1543
    // send the fetch remote task result reques
1544
    SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
260,028,354✔
1545
    if (NULL == pMsgSendInfo) {
260,021,659✔
UNCOV
1546
      taosMemoryFreeClear(msg);
×
1547
      taosMemoryFree(pWrapper);
×
1548
      qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
×
1549
      pTaskInfo->code = terrno;
×
1550
      return pTaskInfo->code;
×
1551
    }
1552

1553
    pMsgSendInfo->param = pWrapper;
260,021,659✔
1554
    pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
260,026,131✔
1555
    pMsgSendInfo->msgInfo.pData = msg;
260,023,971✔
1556
    pMsgSendInfo->msgInfo.len = msgSize;
260,022,527✔
1557
    pMsgSendInfo->msgType = pSource->fetchMsgType;
260,024,885✔
1558
    pMsgSendInfo->fp = loadRemoteDataCallback;
260,024,663✔
1559
    pMsgSendInfo->requestId = pTaskInfo->id.queryId;
260,022,958✔
1560

1561
    int64_t transporterId = 0;
260,021,962✔
1562
    void* poolHandle = NULL;
260,021,035✔
1563
    pDataInfo->startTime = taosGetTimestampUs();
260,026,197✔
1564
    code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
260,020,831✔
1565
    QUERY_CHECK_CODE(code, lino, _end);
260,028,521✔
1566
    int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, sourceIndex);
260,028,521✔
1567
    *pRpcHandle = transporterId;
260,030,158✔
1568
  }
1569

1570
_end:
260,030,034✔
1571
  if (code != TSDB_CODE_SUCCESS) {
260,030,034✔
1572
    if (pWrapper) {
×
1573
      taosMemoryFree(pWrapper);
×
1574
    }
1575
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1576
  }
1577
  return code;
260,029,673✔
1578
}
1579

1580
/**
1581
  @brief record the data loading metrics of the exchange operator, including
1582
  the number of rows, the data length, and the elapsed time of current load operation.
1583
*/
1584
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows,
195,819,352✔
1585
                          int32_t dataLen, int64_t startTs, SOperatorInfo* pOperator) {
1586
  pInfo->totalRows += numOfRows;
195,819,352✔
1587
  pInfo->totalSize += dataLen;
195,818,822✔
1588
  pInfo->totalElapsed += (taosGetTimestampUs() - startTs);
195,820,026✔
1589
}
195,819,352✔
1590

1591
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart, bool isVstbScan) {
541,066,704✔
1592
  int32_t      code = TSDB_CODE_SUCCESS;
541,066,704✔
1593
  int32_t      lino = 0;
541,066,704✔
1594
  SSDataBlock* pBlock = NULL;
541,066,704✔
1595
  if (isVstbScan) {
541,067,898✔
1596
    blockDataCleanup(pRes);
32,470,884✔
1597
    code = blockDecodeInternal(pRes, pData, (const char**)pNextStart);
32,470,884✔
1598
    QUERY_CHECK_CODE(code, lino, _end);
32,469,268✔
1599
  }
1600
  if (pColList == NULL) {  // data from other sources
541,066,282✔
1601
    blockDataCleanup(pRes);
535,963,298✔
1602
    code = blockDecodeInternal(pRes, pData, (const char**)pNextStart);
535,960,211✔
1603
    QUERY_CHECK_CODE(code, lino, _end);
535,961,633✔
1604
  } else {  // extract data according to pColList
1605
    char* pStart = pData;
5,102,984✔
1606

1607
    int32_t numOfCols = htonl(*(int32_t*)pStart);
5,102,984✔
1608
    pStart += sizeof(int32_t);
5,102,984✔
1609

1610
    // todo refactor:extract method
1611
    SSysTableSchema* pSchema = (SSysTableSchema*)pStart;
5,102,984✔
1612
    for (int32_t i = 0; i < numOfCols; ++i) {
71,935,990✔
1613
      SSysTableSchema* p = (SSysTableSchema*)pStart;
66,833,006✔
1614

1615
      p->colId = htons(p->colId);
66,833,006✔
1616
      p->bytes = htonl(p->bytes);
66,833,006✔
1617
      pStart += sizeof(SSysTableSchema);
66,833,006✔
1618
    }
1619

1620
    pBlock = NULL;
5,102,984✔
1621
    code = createDataBlock(&pBlock);
5,102,984✔
1622
    QUERY_CHECK_CODE(code, lino, _end);
5,102,984✔
1623

1624
    for (int32_t i = 0; i < numOfCols; ++i) {
71,935,459✔
1625
      SColumnInfoData idata = createColumnInfoData(pSchema[i].type, pSchema[i].bytes, pSchema[i].colId);
66,832,088✔
1626
      code = blockDataAppendColInfo(pBlock, &idata);
66,832,190✔
1627
      QUERY_CHECK_CODE(code, lino, _end);
66,832,475✔
1628
    }
1629

1630
    code = blockDecodeInternal(pBlock, pStart, NULL);
5,103,371✔
1631
    QUERY_CHECK_CODE(code, lino, _end);
5,102,840✔
1632

1633
    code = blockDataEnsureCapacity(pRes, pBlock->info.rows);
5,102,840✔
1634
    QUERY_CHECK_CODE(code, lino, _end);
5,102,840✔
1635

1636
    // data from mnode
1637
    pRes->info.dataLoad = 1;
5,102,840✔
1638
    pRes->info.rows = pBlock->info.rows;
5,102,984✔
1639
    pRes->info.scanFlag = pBlock->info.scanFlag;
5,102,840✔
1640
    pRes->info.id.groupId = pBlock->info.id.groupId;
5,102,840✔
1641
    pRes->info.id.baseGId = pBlock->info.id.baseGId;
5,102,840✔
1642
    code = relocateColumnData(pRes, pColList, pBlock->pDataBlock, false);
5,102,840✔
1643
    QUERY_CHECK_CODE(code, lino, _end);
5,102,984✔
1644

1645
    blockDataDestroy(pBlock);
5,102,984✔
1646
    pBlock = NULL;
5,102,984✔
1647
  }
1648

1649
_end:
541,064,617✔
1650
  if (code != TSDB_CODE_SUCCESS) {
541,064,250✔
UNCOV
1651
    blockDataDestroy(pBlock);
×
UNCOV
1652
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1653
  }
1654
  return code;
541,064,250✔
1655
}
1656

1657
void setAllSourcesCompleted(SOperatorInfo* pOperator) {
104,239,702✔
1658
  SExchangeInfo* pExchangeInfo = pOperator->info;
104,239,702✔
1659
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
104,239,702✔
1660

1661
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
104,239,702✔
1662
  size_t               totalSources = taosArrayGetSize(pExchangeInfo->pSources);
104,239,702✔
1663
  qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 ", %.2f Kb, elapsed:%.2f ms",
104,239,257✔
1664
         GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
1665
         pLoadInfo->totalElapsed / 1000.0);
1666

1667
  setOperatorCompleted(pOperator);
104,239,257✔
1668
}
104,239,025✔
1669

1670
int32_t getCompletedSources(const SArray* pArray, int32_t* pRes) {
324,298,893✔
1671
  int32_t code = TSDB_CODE_SUCCESS;
324,298,893✔
1672
  int32_t lino = 0;
324,298,893✔
1673
  size_t  total = taosArrayGetSize(pArray);
324,298,893✔
1674

1675
  int32_t completed = 0;
324,298,710✔
1676
  for (int32_t k = 0; k < total; ++k) {
1,038,108,525✔
1677
    SSourceDataInfo* p = taosArrayGet(pArray, k);
713,813,574✔
1678
    QUERY_CHECK_NULL(p, code, lino, _end, terrno);
713,812,676✔
1679
    if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
713,812,676✔
1680
      qDebug("source %d is completed, info:%p %p", k, pArray, p);
348,020,738✔
1681
      completed += 1;
348,020,502✔
1682
    }
1683
  }
1684

1685
  *pRes = completed;
324,294,951✔
1686
_end:
324,296,208✔
1687
  if (code != TSDB_CODE_SUCCESS) {
324,296,208✔
UNCOV
1688
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1689
  }
1690
  return code;
324,296,002✔
1691
}
1692

1693
int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
115,774,304✔
1694
  SExchangeInfo* pExchangeInfo = pOperator->info;
115,774,304✔
1695
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
115,774,313✔
1696

1697
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
115,772,434✔
1698
  int64_t startTs = taosGetTimestampUs();
115,763,342✔
1699

1700
  // Asynchronously send all fetch requests to all sources.
1701
  for (int32_t i = 0; i < totalSources; ++i) {
312,234,215✔
1702
    int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
196,457,733✔
1703
    if (code != TSDB_CODE_SUCCESS) {
196,470,873✔
UNCOV
1704
      pTaskInfo->code = code;
×
UNCOV
1705
      return code;
×
1706
    }
1707
  }
1708

1709
  int64_t endTs = taosGetTimestampUs();
115,776,020✔
1710
  qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTaskInfo),
115,776,020✔
1711
         totalSources, (endTs - startTs) / 1000.0);
1712

1713
  pOperator->status = OP_RES_TO_RETURN;
115,776,020✔
1714
  if (isTaskKilled(pTaskInfo)) {
115,775,252✔
UNCOV
1715
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1716
  }
1717

1718
  return TSDB_CODE_SUCCESS;
115,775,636✔
1719
}
1720

1721
/**
1722
  @brief store STEP DONE notification info
1723
*/
1724
void storeNotifyInfo(SOperatorInfo* pOperator) {
4,274,544✔
1725
  SExchangeInfo*  pExchangeInfo = pOperator->info;
4,274,544✔
1726
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
4,274,544✔
1727
  SOperatorParam* pGetParam = pOperator->pOperatorGetParam;
4,274,544✔
1728

1729
  SExchangeOperatorParam* pParam = (SExchangeOperatorParam*)pGetParam->value;
4,274,544✔
1730
  if (!pParam->multiParams) {
4,274,544✔
1731
    SExchangeOperatorBasicParam* pBasic = &pParam->basic;
4,274,544✔
1732
    if (pBasic->paramType != NOTIFY_TYPE_EXCHANGE_PARAM) {
4,274,544✔
1733
      qWarn("%s, %s found invalid exchange operator param type %d",
×
1734
             GET_TASKID(pTaskInfo), __func__, pBasic->paramType);
UNCOV
1735
      return;
×
1736
    }
1737

1738
    pExchangeInfo->notifyToSend = true;
4,274,544✔
1739
    pExchangeInfo->notifyTs = pBasic->notifyTs;
4,274,544✔
1740
  } else {
UNCOV
1741
    qWarn("%s, %s found multi params are not supported for notify msg",
×
1742
           GET_TASKID(pTaskInfo), __func__);
1743
  }
1744
}
1745

1746
int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo) {
190,716,869✔
1747
  int32_t            code = TSDB_CODE_SUCCESS;
190,716,869✔
1748
  int32_t            lino = 0;
190,716,869✔
1749
  SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
190,716,869✔
1750
  SSDataBlock*       pb = NULL;
190,717,042✔
1751

1752
  char* pNextStart = pRetrieveRsp->data;
190,716,658✔
1753
  char* pStart = pNextStart;
190,716,147✔
1754

1755
  int32_t index = 0;
190,716,147✔
1756

1757
  if (pRetrieveRsp->compressed) {  // decompress the data
190,716,147✔
UNCOV
1758
    if (pDataInfo->decompBuf == NULL) {
×
UNCOV
1759
      pDataInfo->decompBuf = taosMemoryMalloc(pRetrieveRsp->payloadLen);
×
UNCOV
1760
      QUERY_CHECK_NULL(pDataInfo->decompBuf, code, lino, _end, terrno);
×
1761
      pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
×
1762
    } else {
1763
      if (pDataInfo->decompBufSize < pRetrieveRsp->payloadLen) {
×
UNCOV
1764
        char* p = taosMemoryRealloc(pDataInfo->decompBuf, pRetrieveRsp->payloadLen);
×
UNCOV
1765
        QUERY_CHECK_NULL(p, code, lino, _end, terrno);
×
UNCOV
1766
        if (p != NULL) {
×
UNCOV
1767
          pDataInfo->decompBuf = p;
×
UNCOV
1768
          pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
×
1769
        }
1770
      }
1771
    }
1772
  }
1773

1774
  while (index++ < pRetrieveRsp->numOfBlocks) {
726,684,268✔
1775
    pStart = pNextStart;
535,966,642✔
1776

1777
    if (pDataInfo->type == EX_SRC_TYPE_VSTB_SCAN) {
535,966,642✔
1778
      pb = taosMemoryCalloc(1, sizeof(SSDataBlock));
32,470,884✔
1779
      QUERY_CHECK_NULL(pb, code, lino, _end, terrno);
32,470,884✔
1780
    } else if (taosArrayGetSize(pExchangeInfo->pRecycledBlocks) > 0) {
503,495,850✔
1781
      pb = *(SSDataBlock**)taosArrayPop(pExchangeInfo->pRecycledBlocks);
217,045,999✔
1782
      blockDataCleanup(pb);
217,045,999✔
1783
    } else {
1784
      code = createOneDataBlock(pExchangeInfo->pDummyBlock, false, &pb);
286,450,454✔
1785
      QUERY_CHECK_NULL(pb, code, lino, _end, code);
286,449,501✔
1786
    }
1787

1788
    int32_t compLen = *(int32_t*)pStart;
535,966,407✔
1789
    pStart += sizeof(int32_t);
535,966,113✔
1790

1791
    int32_t rawLen = *(int32_t*)pStart;
535,966,113✔
1792
    pStart += sizeof(int32_t);
535,966,660✔
1793
    QUERY_CHECK_CONDITION((compLen <= rawLen && compLen != 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
535,965,950✔
1794

1795
    pNextStart = pStart + compLen;
535,965,950✔
1796
    if (pRetrieveRsp->compressed && (compLen < rawLen)) {
535,966,161✔
UNCOV
1797
      int32_t t = tsDecompressString(pStart, compLen, 1, pDataInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
×
UNCOV
1798
      QUERY_CHECK_CONDITION((t == rawLen), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
UNCOV
1799
      pStart = pDataInfo->decompBuf;
×
1800
    }
1801

1802
    code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart, (pDataInfo->type == EX_SRC_TYPE_VSTB_SCAN));
535,965,791✔
1803
    if (code != 0) {
535,957,043✔
UNCOV
1804
      taosMemoryFreeClear(pDataInfo->pRsp);
×
UNCOV
1805
      goto _end;
×
1806
    }
1807

1808
    void* tmp = taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
535,957,043✔
1809
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
535,961,888✔
1810
    qDebug("%dth block added to resultBlockList, rows:%" PRId64, index, pb->info.rows);
535,961,888✔
1811
    pb = NULL;
535,967,226✔
1812
  }
1813

1814
_end:
190,716,534✔
1815
  if (code != TSDB_CODE_SUCCESS) {
190,716,534✔
UNCOV
1816
    blockDataDestroy(pb);
×
UNCOV
1817
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1818
  }
1819
  return code;
190,716,534✔
1820
}
1821

1822
int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
51,114,313✔
1823
  SExchangeInfo* pExchangeInfo = pOperator->info;
51,114,313✔
1824
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
51,114,313✔
1825

1826
  int32_t code = 0;
51,114,313✔
1827
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);
51,114,313✔
1828
  int64_t startTs = taosGetTimestampUs();
51,114,313✔
1829

1830
  int32_t vgId = 0;
51,114,313✔
1831
  if (pExchangeInfo->dynTbname) {
51,114,313✔
1832
    SArray* vals = pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPartColVals;
261,471✔
1833
    for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
261,471✔
1834
      SStreamGroupValue* pValue = taosArrayGet(vals, i);
261,471✔
1835
      if (pValue != NULL && pValue->isTbname) {
261,471✔
1836
        vgId = pValue->vgId;
261,471✔
1837
        break;
261,471✔
1838
      }
1839
    }
1840
  }
1841

1842
  while (1) {
13,627,499✔
1843
    if (pExchangeInfo->current >= totalSources) {
64,741,812✔
1844
      setAllSourcesCompleted(pOperator);
13,494,441✔
1845
      return TSDB_CODE_SUCCESS;
13,494,441✔
1846
    }
1847

1848
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
51,247,371✔
1849
    if (!pSource) {
51,247,371✔
UNCOV
1850
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
UNCOV
1851
      pTaskInfo->code = terrno;
×
UNCOV
1852
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1853
    }
1854

1855
    if (vgId != 0 && pSource->addr.nodeId != vgId){
51,247,371✔
1856
      pExchangeInfo->current += 1;
222,753✔
1857
      continue;
222,753✔
1858
    }
1859

1860
    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
51,024,618✔
1861
    if (!pDataInfo) {
51,024,618✔
UNCOV
1862
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
UNCOV
1863
      pTaskInfo->code = terrno;
×
UNCOV
1864
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1865
    }
1866
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
51,024,618✔
1867

1868
    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
51,024,618✔
1869
    if (code != TSDB_CODE_SUCCESS) {
51,024,618✔
UNCOV
1870
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
UNCOV
1871
      pTaskInfo->code = code;
×
UNCOV
1872
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1873
    }
1874

1875
    while (true) {
403✔
1876
      recordOpExecBeforeDownstream(pOperator);
51,025,021✔
1877
      code = exchangeWait(pOperator, pExchangeInfo);
51,025,021✔
1878
      recordOpExecAfterDownstream(pOperator, 0);
51,025,021✔
1879

1880
      if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
51,025,021✔
1881
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
403✔
1882
      }
1883

1884
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
51,024,618✔
1885
      if (pDataInfo->seqId != currSeqId) {
51,024,618✔
1886
        qDebug("seq rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pDataInfo->seqId, pExchangeInfo, currSeqId);
403✔
1887
        taosMemoryFreeClear(pDataInfo->pRsp);
403✔
1888
        continue;
403✔
1889
      }
1890

1891
      break;
51,024,215✔
1892
    }
1893

1894
    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
51,024,215✔
1895
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
1,291✔
1896
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1897
             tstrerror(pDataInfo->code));
1898
      pOperator->pTaskInfo->code = pDataInfo->code;
1,291✔
1899
      return pOperator->pTaskInfo->code;
1,291✔
1900
    }
1901

1902
    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
51,022,924✔
1903
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
51,022,924✔
1904

1905
    if (pRsp->numOfRows == 0) {
51,022,924✔
1906
      qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
13,404,746✔
1907
             " execId:%d %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", pDataInfo,
1908
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1909
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);
1910

1911
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
13,404,746✔
1912
      if (isVstbScan(pDataInfo) || isVstbTagScan(pDataInfo)) {
13,404,746✔
1913
        pExchangeInfo->current = totalSources;
13,297,925✔
1914
      } else {
1915
        pExchangeInfo->current += 1;
106,821✔
1916
      }
1917
      taosMemoryFreeClear(pDataInfo->pRsp);
13,404,746✔
1918
      continue;
13,404,746✔
1919
    }
1920

1921
    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
37,618,178✔
1922
    if (code != TSDB_CODE_SUCCESS) {
37,618,178✔
UNCOV
1923
      goto _error;
×
1924
    }
1925

1926
    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
37,618,178✔
1927
    if (pRsp->completed == 1) {
37,618,178✔
1928
      qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
188,985✔
1929
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, pDataInfo,
1930
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1931
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
1932
             pExchangeInfo->current + 1, totalSources);
1933

1934
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
188,985✔
1935
      if (isVstbScan(pDataInfo)) {
188,985✔
UNCOV
1936
        pExchangeInfo->current = totalSources;
×
1937
      } else {
1938
        pExchangeInfo->current += 1;
188,985✔
1939
      }
1940
    } else {
1941
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
37,429,193✔
1942
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
1943
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1944
             pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
1945
    }
1946
    if (pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData) {
37,618,178✔
1947
      taosArrayClear(pExchangeInfo->pSourceDataInfo);
37,283,125✔
1948
    }
1949
    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
37,618,178✔
1950
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;
37,618,178✔
1951

1952
    taosMemoryFreeClear(pDataInfo->pRsp);
37,618,178✔
1953
    return TSDB_CODE_SUCCESS;
37,618,178✔
1954
  }
1955

UNCOV
1956
_error:
×
UNCOV
1957
  pTaskInfo->code = code;
×
UNCOV
1958
  return code;
×
1959
}
1960

1961
static int32_t loadTagListFromBasicParam(SSourceDataInfo* pDataInfo, SExchangeOperatorBasicParam* pBasicParam) {
8,334,749✔
1962
  int32_t  code = TSDB_CODE_SUCCESS;
8,334,749✔
1963
  int32_t  lino = 0;
8,334,749✔
1964
  STagVal  dstTag;
8,334,749✔
1965
  bool     needFree = false;
8,334,749✔
1966

1967
  if (pBasicParam->paramType != DYN_TYPE_EXCHANGE_PARAM) {
8,334,749✔
UNCOV
1968
    qError("%s failed since invalid exchange operator param type %d",
×
1969
      __func__, pBasicParam->paramType);
UNCOV
1970
    return TSDB_CODE_INVALID_PARA;
×
1971
  }
1972

1973
  if (pDataInfo->tagList) {
8,334,749✔
UNCOV
1974
    taosArrayClear(pDataInfo->tagList);
×
1975
  }
1976

1977
  if (pBasicParam->tagList) {
8,334,749✔
1978
    pDataInfo->tagList = taosArrayInit(1, sizeof(STagVal));
3,633,222✔
1979
    QUERY_CHECK_NULL(pDataInfo->tagList, code, lino, _return, terrno);
3,633,222✔
1980

1981
    for (int32_t i = 0; i < taosArrayGetSize(pBasicParam->tagList); ++i) {
24,209,676✔
1982
      STagVal *pSrcTag = (STagVal*)taosArrayGet(pBasicParam->tagList, i);
20,576,454✔
1983
      QUERY_CHECK_NULL(pSrcTag, code, lino, _return, terrno);
20,576,454✔
1984

1985
      dstTag = (STagVal){0};
20,576,454✔
1986
      dstTag.type = pSrcTag->type;
20,576,454✔
1987
      dstTag.cid = pSrcTag->cid;
20,576,454✔
1988
      if (IS_VAR_DATA_TYPE(pSrcTag->type)) {
20,576,454✔
1989
        dstTag.nData = pSrcTag->nData;
9,025,146✔
1990
        dstTag.pData = taosMemoryMalloc(dstTag.nData);
9,025,146✔
1991
        QUERY_CHECK_NULL(dstTag.pData, code, lino, _return, terrno);
9,025,146✔
1992
        needFree = true;
9,025,146✔
1993
        memcpy(dstTag.pData, pSrcTag->pData, dstTag.nData);
9,025,146✔
1994
      } else {
1995
        dstTag.i64 = pSrcTag->i64;
11,551,308✔
1996
      }
1997

1998
      QUERY_CHECK_NULL(taosArrayPush(pDataInfo->tagList, &dstTag), code, lino, _return, terrno);
41,152,908✔
1999
      needFree = false;
20,576,454✔
2000
    }
2001
  } else {
2002
    pDataInfo->tagList = NULL;
4,701,527✔
2003
  }
2004

2005
  return code;
8,334,749✔
UNCOV
2006
_return:
×
UNCOV
2007
  if (needFree) {
×
UNCOV
2008
    taosMemoryFreeClear(dstTag.pData);
×
2009
  }
UNCOV
2010
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
2011
  return code;
×
2012
}
2013

2014
int32_t loadBatchColMapFromBasicParam(SSourceDataInfo* pDataInfo, SExchangeOperatorBasicParam* pBasicParam) {
8,334,749✔
2015
  int32_t     code = TSDB_CODE_SUCCESS;
8,334,749✔
2016
  int32_t     lino = 0;
8,334,749✔
2017
  SOrgTbInfo  dstOrgTbInfo = {0};
8,334,749✔
2018
  bool        needFree = false;
8,334,749✔
2019

2020
  if (pBasicParam->paramType != DYN_TYPE_EXCHANGE_PARAM) {
8,334,749✔
UNCOV
2021
    qError("%s failed since invalid exchange operator param type %d",
×
2022
      __func__, pBasicParam->paramType);
UNCOV
2023
    return TSDB_CODE_INVALID_PARA;
×
2024
  }
2025

2026
  if (pBasicParam->batchOrgTbInfo) {
8,334,749✔
2027
    pDataInfo->batchOrgTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
8,334,749✔
2028
    QUERY_CHECK_NULL(pDataInfo->batchOrgTbInfo, code, lino, _return, terrno);
8,334,749✔
2029

2030
    for (int32_t i = 0; i < taosArrayGetSize(pBasicParam->batchOrgTbInfo); ++i) {
22,035,703✔
2031
      SOrgTbInfo* pSrcOrgTbInfo = taosArrayGet(pBasicParam->batchOrgTbInfo, i);
13,700,954✔
2032
      QUERY_CHECK_NULL(pSrcOrgTbInfo, code, lino, _return, terrno);
13,700,954✔
2033

2034
      dstOrgTbInfo = (SOrgTbInfo){0};
13,700,954✔
2035
      dstOrgTbInfo.vgId = pSrcOrgTbInfo->vgId;
13,700,954✔
2036
      tstrncpy(dstOrgTbInfo.tbName, pSrcOrgTbInfo->tbName, TSDB_TABLE_FNAME_LEN);
13,700,954✔
2037

2038
      dstOrgTbInfo.colMap = taosArrayDup(pSrcOrgTbInfo->colMap, NULL);
13,700,954✔
2039
      QUERY_CHECK_NULL(dstOrgTbInfo.colMap, code, lino, _return, terrno);
13,700,954✔
2040

2041
      needFree = true;
13,700,954✔
2042
      QUERY_CHECK_NULL(taosArrayPush(pDataInfo->batchOrgTbInfo, &dstOrgTbInfo), code, lino, _return, terrno);
27,401,908✔
2043
      needFree = false;
13,700,954✔
2044
    }
2045
  } else {
UNCOV
2046
    pBasicParam->batchOrgTbInfo = NULL;
×
2047
  }
2048

2049
  return code;
8,334,749✔
UNCOV
2050
_return:
×
2051
  if (needFree) {
×
UNCOV
2052
    taosArrayDestroy(dstOrgTbInfo.colMap);
×
2053
  }
UNCOV
2054
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
2055
  return code;
×
2056
}
2057

2058
int32_t addSingleExchangeSource(SOperatorInfo* pOperator,
60,860,341✔
2059
                                SExchangeOperatorBasicParam* pBasicParam) {
2060
  if (pBasicParam->paramType != DYN_TYPE_EXCHANGE_PARAM) {
60,860,341✔
UNCOV
2061
    qWarn("%s, %s found invalid exchange operator param type %d",
×
2062
      GET_TASKID(pOperator->pTaskInfo), __func__, pBasicParam->paramType);
UNCOV
2063
    return TSDB_CODE_SUCCESS;
×
2064
  }
2065

2066
  int32_t            code = TSDB_CODE_SUCCESS;
60,860,341✔
2067
  int32_t            lino = 0;
60,860,341✔
2068
  SExchangeInfo*     pExchangeInfo = pOperator->info;
60,860,341✔
2069
  SExchangeSrcIndex* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
60,860,341✔
2070

2071
  if (NULL == pIdx) {
60,860,341✔
2072
    if (pBasicParam->isNewDeployed) {
664,922✔
2073
      SDownstreamSourceNode *pNode = NULL;
2,486✔
2074
      code = nodesCloneNode((SNode*)&pBasicParam->newDeployedSrc, (SNode**)&pNode);
2,486✔
2075
      QUERY_CHECK_CODE(code, lino, _return);
2,486✔
2076

2077
      SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pOperator->pPhyNode;
2,486✔
2078
      code = nodesListMakeStrictAppend(&pExchange->pSrcEndPoints, (SNode*)pNode);
2,486✔
2079
      QUERY_CHECK_CODE(code, lino, _return);
2,486✔
2080

2081
      void* tmp = taosArrayPush(pExchangeInfo->pSources, pNode);
2,486✔
2082
      QUERY_CHECK_NULL(tmp, code, lino, _return, terrno);
2,486✔
2083

2084
      SExchangeSrcIndex idx = {.srcIdx = taosArrayGetSize(pExchangeInfo->pSources) - 1, .inUseIdx = -1};
2,486✔
2085
      code = tSimpleHashPut(pExchangeInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
2,486✔
2086
      if (pExchangeInfo->pHashSources) {
2,486✔
2087
        QUERY_CHECK_CODE(code, lino, _return);
2,486✔
2088
      }
2089
      pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
2,486✔
2090
      QUERY_CHECK_NULL(pIdx, code, lino, _return, TSDB_CODE_INVALID_PARA);
2,486✔
2091
    } else if (pBasicParam->type == EX_SRC_TYPE_VSTB_TS_SCAN || pBasicParam->type == EX_SRC_TYPE_VSTB_PART_INTERVAL_SCAN) {
662,436✔
2092
      // Multi-exchange virtual table paths build each exchange param from the full vg map.
2093
      // If this exchange does not own the current vg source, skip it and let the matching exchange consume it.
2094
      qDebug("addSingleExchangeSource found no existing source for vgId: %d, sourceType:%d, skip it",
662,436✔
2095
             pBasicParam->vgId, pBasicParam->type);
2096
      return TSDB_CODE_SUCCESS;
662,436✔
2097
    } else {
UNCOV
2098
      qError("No exchange source for vgId: %d", pBasicParam->vgId);
×
UNCOV
2099
      return TSDB_CODE_INVALID_PARA;
×
2100
    }
2101
  }
2102

2103
  qDebug("start to add single exchange source");
60,197,905✔
2104

2105
  switch (pBasicParam->type) {
60,197,905✔
2106
    case EX_SRC_TYPE_VSTB_TS_SCAN:
8,334,749✔
2107
    case EX_SRC_TYPE_VSTB_WIN_SCAN:
2108
    case EX_SRC_TYPE_VSTB_INTERVAL_SCAN:
2109
    case EX_SRC_TYPE_VSTB_PART_INTERVAL_SCAN:
2110
    case EX_SRC_TYPE_VSTB_AGG_SCAN: {
2111
      if (pIdx->inUseIdx < 0) {
8,334,749✔
2112
        SSourceDataInfo dataInfo = {0};
4,183,726✔
2113
        dataInfo.status = EX_SOURCE_DATA_NOT_READY;
4,183,726✔
2114
        dataInfo.taskId = pExchangeInfo->pTaskId;
4,183,726✔
2115
        dataInfo.index = pIdx->srcIdx;
4,183,726✔
2116
        dataInfo.groupid = pBasicParam->groupid;
4,183,726✔
2117
        dataInfo.window = pBasicParam->window;
4,183,726✔
2118
        dataInfo.isNewParam = pBasicParam->isNewParam;
4,183,726✔
2119
        code = loadTagListFromBasicParam(&dataInfo, pBasicParam);
4,183,726✔
2120
        QUERY_CHECK_CODE(code, lino, _return);
4,183,726✔
2121

2122
        code = loadBatchColMapFromBasicParam(&dataInfo, pBasicParam);
4,183,726✔
2123
        QUERY_CHECK_CODE(code, lino, _return);
4,183,726✔
2124

2125
        dataInfo.orgTbInfo = NULL;
4,183,726✔
2126

2127
        dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
4,183,726✔
2128
        QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
4,183,726✔
2129

2130
        dataInfo.type = pBasicParam->type;
4,183,726✔
2131
        dataInfo.srcOpType = pBasicParam->srcOpType;
4,183,726✔
2132
        dataInfo.tableSeq = pBasicParam->tableSeq;
4,183,726✔
2133

2134
        QUERY_CHECK_NULL(taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
8,367,452✔
2135

2136
        pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
4,183,726✔
2137
      } else {
2138
        SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
4,151,023✔
2139
        QUERY_CHECK_NULL(pDataInfo, code, lino, _return, terrno);
4,151,023✔
2140

2141
        if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
4,151,023✔
2142
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
4,151,023✔
2143
        }
2144

2145
        pDataInfo->taskId = pExchangeInfo->pTaskId;
4,151,023✔
2146
        pDataInfo->index = pIdx->srcIdx;
4,151,023✔
2147
        pDataInfo->window = pBasicParam->window;
4,151,023✔
2148
        pDataInfo->groupid = pBasicParam->groupid;
4,151,023✔
2149
        pDataInfo->isNewParam = pBasicParam->isNewParam;
4,151,023✔
2150

2151
        code = loadTagListFromBasicParam(pDataInfo, pBasicParam);
4,151,023✔
2152
        QUERY_CHECK_CODE(code, lino, _return);
4,151,023✔
2153

2154
        code = loadBatchColMapFromBasicParam(pDataInfo, pBasicParam);
4,151,023✔
2155
        QUERY_CHECK_CODE(code, lino, _return);
4,151,023✔
2156

2157
        pDataInfo->orgTbInfo = NULL;
4,151,023✔
2158

2159
        pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
4,151,023✔
2160
        QUERY_CHECK_NULL(pDataInfo->pSrcUidList, code, lino, _return, terrno);
4,151,023✔
2161

2162
        pDataInfo->type = pBasicParam->type;
4,151,023✔
2163
        pDataInfo->srcOpType = pBasicParam->srcOpType;
4,151,023✔
2164
        pDataInfo->tableSeq = pBasicParam->tableSeq;
4,151,023✔
2165
      }
2166
      break;
8,334,749✔
2167
    }
2168
    case EX_SRC_TYPE_VTB_WIN_SCAN:
5,647,136✔
2169
    case EX_SRC_TYPE_VSTB_TAG_SCAN: {
2170
      SSourceDataInfo dataInfo = {0};
5,647,136✔
2171
      dataInfo.status = EX_SOURCE_DATA_NOT_READY;
5,647,136✔
2172
      dataInfo.taskId = pExchangeInfo->pTaskId;
5,647,136✔
2173
      dataInfo.index = pIdx->srcIdx;
5,647,136✔
2174
      dataInfo.window = pBasicParam->window;
5,647,136✔
2175
      dataInfo.groupid = 0;
5,647,136✔
2176
      dataInfo.orgTbInfo = NULL;
5,647,136✔
2177
      dataInfo.tagList = NULL;
5,647,136✔
2178

2179
      dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
5,647,136✔
2180
      QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
5,647,136✔
2181

2182
      dataInfo.isNewParam = false;
5,647,136✔
2183
      dataInfo.type = pBasicParam->type;
5,647,136✔
2184
      dataInfo.srcOpType = pBasicParam->srcOpType;
5,647,136✔
2185
      dataInfo.tableSeq = pBasicParam->tableSeq;
5,647,136✔
2186

2187
      taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
5,647,136✔
2188
      QUERY_CHECK_NULL(taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
11,294,272✔
2189
      break;
5,647,136✔
2190
    }
2191
    case EX_SRC_TYPE_VSTB_SCAN: {
45,915,271✔
2192
      SSourceDataInfo dataInfo = {0};
45,915,271✔
2193
      dataInfo.status = EX_SOURCE_DATA_NOT_READY;
45,915,271✔
2194
      dataInfo.taskId = pExchangeInfo->pTaskId;
45,915,271✔
2195
      dataInfo.index = pIdx->srcIdx;
45,915,271✔
2196
      dataInfo.window = pBasicParam->window;
45,915,271✔
2197
      dataInfo.groupid = 0;
45,915,271✔
2198
      dataInfo.isNewParam = pBasicParam->isNewParam;
45,915,271✔
2199
      dataInfo.tagList = NULL;
45,915,271✔
2200
      dataInfo.orgTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
45,915,271✔
2201
      QUERY_CHECK_NULL(dataInfo.orgTbInfo, code, lino, _return, terrno);
45,915,271✔
2202
      dataInfo.orgTbInfo->vgId = pBasicParam->orgTbInfo->vgId;
45,915,271✔
2203
      tstrncpy(dataInfo.orgTbInfo->tbName, pBasicParam->orgTbInfo->tbName, TSDB_TABLE_FNAME_LEN);
45,915,271✔
2204
      dataInfo.orgTbInfo->colMap = taosArrayDup(pBasicParam->orgTbInfo->colMap, NULL);
45,915,271✔
2205
      QUERY_CHECK_NULL(dataInfo.orgTbInfo->colMap, code, lino, _return, terrno);
45,915,271✔
2206

2207
      dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
45,915,271✔
2208
      QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
45,915,271✔
2209

2210
      dataInfo.type = pBasicParam->type;
45,915,271✔
2211
      dataInfo.srcOpType = pBasicParam->srcOpType;
45,915,271✔
2212
      dataInfo.tableSeq = pBasicParam->tableSeq;
45,915,271✔
2213

2214
      taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
45,915,271✔
2215
      QUERY_CHECK_NULL( taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
91,830,542✔
2216
      break;
45,915,271✔
2217
    }
2218
    case EX_SRC_TYPE_STB_JOIN_SCAN:
300,749✔
2219
    default: {
2220
      if (pIdx->inUseIdx < 0) {
300,749✔
2221
        SSourceDataInfo dataInfo = {0};
298,397✔
2222
        dataInfo.status = EX_SOURCE_DATA_NOT_READY;
298,397✔
2223
        dataInfo.taskId = pExchangeInfo->pTaskId;
298,397✔
2224
        dataInfo.index = pIdx->srcIdx;
298,397✔
2225
        dataInfo.groupid = 0;
298,397✔
2226
        dataInfo.tagList = NULL;
298,397✔
2227

2228
        dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
298,397✔
2229
        QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
298,397✔
2230

2231
        dataInfo.isNewParam = false;
298,397✔
2232
        dataInfo.type = pBasicParam->type;
298,397✔
2233
        dataInfo.srcOpType = pBasicParam->srcOpType;
298,397✔
2234
        dataInfo.tableSeq = pBasicParam->tableSeq;
298,397✔
2235

2236
        QUERY_CHECK_NULL(taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
596,794✔
2237

2238
        pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
298,397✔
2239
      } else {
2240
        SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
2,352✔
2241
        QUERY_CHECK_NULL(pDataInfo, code, lino, _return, terrno);
2,352✔
2242
        if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
2,352✔
2243
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
2,352✔
2244
        }
2245

2246
        pDataInfo->tagList = NULL;
2,352✔
2247
        pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
2,352✔
2248
        QUERY_CHECK_NULL(pDataInfo->pSrcUidList, code, lino, _return, terrno);
2,352✔
2249

2250
        pDataInfo->groupid = 0;
2,352✔
2251
        pDataInfo->isNewParam = false;
2,352✔
2252
        pDataInfo->type = pBasicParam->type;
2,352✔
2253
        pDataInfo->srcOpType = pBasicParam->srcOpType;
2,352✔
2254
        pDataInfo->tableSeq = pBasicParam->tableSeq;
2,352✔
2255
      }
2256
      break;
300,749✔
2257
    }
2258
  }
2259

2260
  return code;
60,197,905✔
UNCOV
2261
_return:
×
UNCOV
2262
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
2263
  return code;
×
2264
}
2265

2266
int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
59,281,954✔
2267
  SExchangeInfo*               pExchangeInfo = pOperator->info;
59,281,954✔
2268
  int32_t                      code = TSDB_CODE_SUCCESS;
59,281,954✔
2269
  SExchangeOperatorBasicParam* pBasicParam = NULL;
59,281,954✔
2270
  SExchangeOperatorParam*      pParam = (SExchangeOperatorParam*)pOperator->pOperatorGetParam->value;
59,281,954✔
2271
  if (pParam->multiParams) {
59,281,954✔
2272
    SExchangeOperatorBatchParam* pBatch = (SExchangeOperatorBatchParam*)pOperator->pOperatorGetParam->value;
7,714,843✔
2273
    int32_t                      iter = 0;
7,714,843✔
2274
    while (NULL != (pBasicParam = tSimpleHashIterate(pBatch->pBatchs, pBasicParam, &iter))) {
17,008,073✔
2275
      code = addSingleExchangeSource(pOperator, pBasicParam);
9,293,230✔
2276
      if (code) {
9,293,230✔
UNCOV
2277
        return code;
×
2278
      }
2279
    }
2280
  } else {
2281
    pBasicParam = &pParam->basic;
51,567,111✔
2282
    code = addSingleExchangeSource(pOperator, pBasicParam);
51,567,111✔
2283
  }
2284

2285
  freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
59,281,954✔
2286
  pOperator->pOperatorGetParam = NULL;
59,281,954✔
2287

2288
  return code;
59,281,954✔
2289
}
2290

2291
int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
695,034,171✔
2292
  SExchangeInfo* pExchangeInfo = pOperator->info;
695,034,171✔
2293
  int32_t        code = TSDB_CODE_SUCCESS;
695,035,737✔
2294
  int32_t        lino = 0;
695,035,737✔
2295
  
2296
  if ((OPTR_IS_OPENED(pOperator) && !pExchangeInfo->dynamicOp &&
695,035,737✔
2297
       NULL == pOperator->pOperatorGetParam) ||
512,481,649✔
2298
      (pExchangeInfo->dynamicOp && NULL == pOperator->pOperatorGetParam)) {
186,824,824✔
2299
    qDebug("%s, skip prepare, opened:%d, dynamicOp:%d, getParam:%p",
520,939,046✔
2300
      GET_TASKID(pOperator->pTaskInfo), OPTR_IS_OPENED(pOperator),
2301
      pExchangeInfo->dynamicOp, pOperator->pOperatorGetParam);
2302
    return TSDB_CODE_SUCCESS;
520,939,941✔
2303
  }
2304

2305
  if (pExchangeInfo->dynamicOp) {
174,094,205✔
2306
    code = addDynamicExchangeSource(pOperator);
59,281,954✔
2307
    QUERY_CHECK_CODE(code, lino, _end);
59,281,954✔
2308
  }
2309

2310
  if (pOperator->status == OP_NOT_OPENED &&
174,093,188✔
2311
      (pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData) ||
160,250,554✔
2312
      IS_STREAM_MODE(pOperator->pTaskInfo)) {
127,237,212✔
2313
    pExchangeInfo->current = 0;
54,519,693✔
2314
  }
2315

2316
  if (NULL != pOperator->pOperatorGetParam) {
174,084,384✔
2317
    SOperatorParam* pGetParam = pOperator->pOperatorGetParam;
4,274,544✔
2318
    storeNotifyInfo(pOperator);
4,274,544✔
2319

2320
    if (!pGetParam->reUse) {
4,274,544✔
UNCOV
2321
      freeOperatorParam(pGetParam, OP_GET_PARAM);
×
2322
    } else {
2323
      /**
2324
        The param is referenced by getParam, and it will be freed by
2325
        the parent operator after getting next block.
2326
      */
2327
      pGetParam->reUse = false;
4,274,544✔
2328
    }
2329
    pOperator->pOperatorGetParam = NULL;
4,274,544✔
2330
  }
2331

2332
  int64_t st = taosGetTimestampUs();
174,084,544✔
2333

2334
  if (!IS_STREAM_MODE(pOperator->pTaskInfo) && !pExchangeInfo->seqLoadData) {
174,084,544✔
2335
    code = prepareConcurrentlyLoad(pOperator);
115,766,571✔
2336
    QUERY_CHECK_CODE(code, lino, _end);
115,773,192✔
2337
    pExchangeInfo->openedTs = taosGetTimestampUs();
115,774,855✔
2338
  }
2339

2340
  OPTR_SET_OPENED(pOperator);
174,098,634✔
2341
  qDebug("%s prepare load complete", pOperator->pTaskInfo->id.str);
174,093,167✔
2342

2343
_end:
79,191,175✔
2344
  if (code != TSDB_CODE_SUCCESS) {
174,092,162✔
UNCOV
2345
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
2346
    pOperator->pTaskInfo->code = code;
×
UNCOV
2347
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
2348
  }
2349
  return code;
174,092,162✔
2350
}
2351

2352
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
4,241,816✔
2353
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
4,241,816✔
2354

2355
  if (pLimitInfo->remainGroupOffset > 0) {
4,241,816✔
UNCOV
2356
    if (pLimitInfo->currentGroupId == 0) {  // it is the first group
×
UNCOV
2357
      pLimitInfo->currentGroupId = pBlock->info.id.groupId;
×
UNCOV
2358
      blockDataCleanup(pBlock);
×
UNCOV
2359
      return PROJECT_RETRIEVE_CONTINUE;
×
UNCOV
2360
    } else if (pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
×
2361
      // now it is the data from a new group
UNCOV
2362
      pLimitInfo->remainGroupOffset -= 1;
×
2363

2364
      // ignore data block in current group
UNCOV
2365
      if (pLimitInfo->remainGroupOffset > 0) {
×
UNCOV
2366
        blockDataCleanup(pBlock);
×
UNCOV
2367
        return PROJECT_RETRIEVE_CONTINUE;
×
2368
      }
2369
    }
2370

2371
    // set current group id of the project operator
UNCOV
2372
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
×
2373
  }
2374

2375
  // here check for a new group data, we need to handle the data of the previous group.
2376
  if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
4,241,816✔
2377
    pLimitInfo->numOfOutputGroups += 1;
207,487✔
2378
    if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
207,487✔
UNCOV
2379
      pOperator->status = OP_EXEC_DONE;
×
UNCOV
2380
      blockDataCleanup(pBlock);
×
2381

UNCOV
2382
      return PROJECT_RETRIEVE_DONE;
×
2383
    }
2384

2385
    // reset the value for a new group data
2386
    resetLimitInfoForNextGroup(pLimitInfo);
207,487✔
2387
    // existing rows that belongs to previous group.
2388
    if (pBlock->info.rows > 0) {
207,487✔
2389
      return PROJECT_RETRIEVE_DONE;
207,487✔
2390
    }
2391
  }
2392

2393
  // here we reach the start position, according to the limit/offset requirements.
2394

2395
  // set current group id
2396
  pLimitInfo->currentGroupId = pBlock->info.id.groupId;
4,034,329✔
2397

2398
  bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pTaskInfo);
4,034,329✔
2399
  if (pBlock->info.rows == 0) {
4,034,329✔
2400
    return PROJECT_RETRIEVE_CONTINUE;
2,052,874✔
2401
  } else {
2402
    if (limitReached && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
1,981,455✔
UNCOV
2403
      setOperatorCompleted(pOperator);
×
UNCOV
2404
      return PROJECT_RETRIEVE_DONE;
×
2405
    }
2406
  }
2407

2408
  // todo optimize performance
2409
  // If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the
2410
  // they may not belong to the same group the limit/offset value is not valid in this case.
2411
  if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || hasSlimitOffsetInfo(pLimitInfo)) {
1,981,455✔
2412
    return PROJECT_RETRIEVE_DONE;
1,981,455✔
2413
  } else {  // not full enough, continue to accumulate the output data in the buffer.
UNCOV
2414
    return PROJECT_RETRIEVE_CONTINUE;
×
2415
  }
2416
}
2417

2418
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo) {
258,983,451✔
2419
  SExecTaskInfo* pTask = pOperator->pTaskInfo;
258,983,451✔
2420
  int32_t        code = TSDB_CODE_SUCCESS;
258,984,365✔
2421
  if (pTask->pWorkerCb) {
258,984,365✔
2422
    code = pTask->pWorkerCb->beforeBlocking(pTask->pWorkerCb->pPool);
258,983,292✔
2423
    if (code != TSDB_CODE_SUCCESS) {
258,985,903✔
UNCOV
2424
      pTask->code = code;
×
UNCOV
2425
      return pTask->code;
×
2426
    }
2427
  }
2428

2429
  code = tsem_wait(&pExchangeInfo->ready);
258,985,140✔
2430
  if (code != TSDB_CODE_SUCCESS) {
258,985,375✔
2431
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
23✔
2432
    pTask->code = code;
×
UNCOV
2433
    return pTask->code;
×
2434
  }
2435

2436
  if (pTask->pWorkerCb) {
258,985,352✔
2437
    code = pTask->pWorkerCb->afterRecoverFromBlocking(pTask->pWorkerCb->pPool);
258,985,100✔
2438
    if (code != TSDB_CODE_SUCCESS) {
258,985,651✔
UNCOV
2439
      pTask->code = code;
×
UNCOV
2440
      return pTask->code;
×
2441
    }
2442
  }
2443
  return TSDB_CODE_SUCCESS;
258,985,903✔
2444
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc