• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4976

06 Mar 2026 09:48AM UTC coverage: 68.446% (+0.08%) from 68.37%
#4976

push

travis-ci

web-flow
feat(TDgpt): support multiple input data columns for anomaly detection. (#34606)

0 of 93 new or added lines in 9 files covered. (0.0%)

5718 existing lines in 144 files now uncovered.

211146 of 308486 relevant lines covered (68.45%)

136170362.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.26
/source/libs/executor/src/exchangeoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "operator.h"
20
#include "query.h"
21
#include "querytask.h"
22
#include "tdatablock.h"
23
#include "thash.h"
24
#include "tmsg.h"
25
#include "tref.h"
26
#include "trpc.h"
27

28
typedef struct SFetchRspHandleWrapper {
29
  int64_t  exchangeId;
30
  int32_t  sourceIndex;
31
  int64_t  seqId;
32
} SFetchRspHandleWrapper;
33

34
typedef struct SSourceDataInfo {
35
  int32_t             index;
36
  int64_t             seqId;
37
  SRWLatch            lock;
38
  SRetrieveTableRsp*  pRsp;
39
  uint64_t            totalRows;
40
  int64_t             startTime;
41
  int32_t             code;
42
  EX_SOURCE_STATUS    status;
43
  const char*         taskId;
44
  SArray*             pSrcUidList;
45
  int32_t             srcOpType;
46
  bool                tableSeq;
47
  char*               decompBuf;
48
  int32_t             decompBufSize;
49
  SOrgTbInfo*         orgTbInfo;
50
  SArray*             batchOrgTbInfo; // SArray<SOrgTbInfo>
51
  SArray*             tagList;
52
  EExchangeSourceType type;
53
  bool                isNewParam;
54
  STimeWindow         window;
55
  uint64_t            groupid;
56
  bool                fetchSent; // need reset
57
} SSourceDataInfo;
58

59
static void destroyExchangeOperatorInfo(void* param);
60
static void freeBlock(void* pParam);
61
static void freeSourceDataInfo(void* param);
62
static void setAllSourcesCompleted(SOperatorInfo* pOperator);
63

64
static int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code);
65
static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex);
66
static int32_t getCompletedSources(const SArray* pArray, int32_t* pRes);
67
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator);
68
static void    storeNotifyInfo(SOperatorInfo* pOperator);
69
static int32_t seqLoadRemoteData(SOperatorInfo* pOperator);
70
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator);
71
static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock,
72
                                 bool holdDataInBuf);
73
static int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo);
74

75
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo);
76

77
static bool isVstbScan(SSourceDataInfo* pDataInfo) {return pDataInfo->type == EX_SRC_TYPE_VSTB_SCAN; }
29,741,949✔
78
static bool isVstbWinScan(SSourceDataInfo* pDataInfo) { return pDataInfo->type == EX_SRC_TYPE_VSTB_WIN_SCAN; }
×
79
static bool isVstbAggScan(SSourceDataInfo* pDataInfo) { return pDataInfo->type == EX_SRC_TYPE_VSTB_AGG_SCAN; }
×
80
static bool isVstbTagScan(SSourceDataInfo* pDataInfo) { return pDataInfo->type == EX_SRC_TYPE_VSTB_TAG_SCAN; }
19,261,490✔
81
static bool isStbJoinScan(SSourceDataInfo* pDataInfo) { return pDataInfo->type == EX_SRC_TYPE_STB_JOIN_SCAN; }
×
82

83

84
static void streamSequenciallyLoadRemoteData(SOperatorInfo* pOperator,
9,711,404✔
85
                                             SExchangeInfo* pExchangeInfo,
86
                                             SExecTaskInfo* pTaskInfo) {
87
  int32_t code = 0;
9,711,404✔
88
  int32_t lino = 0;
9,711,404✔
89
  int64_t startTs = taosGetTimestampUs();  
9,711,852✔
90
  int32_t  totalSources = (int32_t)taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
9,711,852✔
91
  int32_t completed = 0;
9,712,076✔
92
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
9,712,076✔
93
  if (code != TSDB_CODE_SUCCESS) {
9,711,852✔
94
    pTaskInfo->code = code;
×
95
    T_LONG_JMP(pTaskInfo->env, code);
×
96
  }
97
  if (completed == totalSources) {
9,711,852✔
98
    qDebug("%s no load since all sources completed, completed:%d, totalSources:%d", pTaskInfo->id.str, completed, totalSources);
1,828,304✔
99
    setAllSourcesCompleted(pOperator);
1,828,304✔
100
    return;
1,830,772✔
101
  }
102

103
  SSourceDataInfo* pDataInfo = NULL;
7,883,548✔
104

105
  while (1) {
4,547,457✔
106
    if (pExchangeInfo->current < 0) {
12,431,005✔
107
      qDebug("current %d and all sources complted, totalSources:%d", pExchangeInfo->current, totalSources);
120,057✔
108
      setAllSourcesCompleted(pOperator);
120,057✔
109
      return;
120,057✔
110
    }
111
    
112
    if (pExchangeInfo->current >= totalSources) {
12,310,724✔
113
      completed = 0;
5,669,919✔
114
      code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
5,669,919✔
115
      if (code != TSDB_CODE_SUCCESS) {
5,669,695✔
116
        pTaskInfo->code = code;
×
117
        T_LONG_JMP(pTaskInfo->env, code);
×
118
      }
119
      if (completed == totalSources) {
5,669,695✔
120
        qDebug("stop to load since all sources complted, completed:%d, totalSources:%d", completed, totalSources);
3,918,358✔
121
        setAllSourcesCompleted(pOperator);
3,918,358✔
122
        return;
3,918,358✔
123
      }
124
      
125
      pExchangeInfo->current = 0;
1,751,337✔
126
    }
127

128
    qDebug("%s start stream exchange %p idx:%d fetch", GET_TASKID(pTaskInfo), pExchangeInfo, pExchangeInfo->current);
8,392,142✔
129

130
    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
8,392,142✔
131
    if (!pDataInfo) {
8,392,366✔
132
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
133
      pTaskInfo->code = terrno;
×
134
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
135
    }
136

137
    if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
8,392,366✔
138
      pExchangeInfo->current++;
995✔
139
      continue;
995✔
140
    }
141

142
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
8,390,923✔
143

144
    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
8,390,923✔
145
    if (code != TSDB_CODE_SUCCESS) {
8,391,560✔
146
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
147
      pTaskInfo->code = code;
×
148
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
149
    }
150

151
    while (true) {
1,472✔
152
      code = exchangeWait(pOperator, pExchangeInfo);
8,393,032✔
153
      if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
8,392,299✔
154
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
480✔
155
      }
156

157
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
8,391,819✔
158
      if (pDataInfo->seqId != currSeqId) {
8,391,819✔
159
        qDebug("%s seq rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", 
1,472✔
160
            GET_TASKID(pTaskInfo), pDataInfo->seqId, pExchangeInfo, currSeqId);
161
        taosMemoryFreeClear(pDataInfo->pRsp);
1,472✔
162
        continue;
1,472✔
163
      }
164

165
      break;
8,390,347✔
166
    }
167

168
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
8,390,347✔
169
    if (!pSource) {
8,390,347✔
170
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
171
      pTaskInfo->code = terrno;
×
172
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
173
    }
174

175
    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
8,390,347✔
UNCOV
176
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
×
177
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
178
             tstrerror(pDataInfo->code));
UNCOV
179
      pTaskInfo->code = pDataInfo->code;
×
UNCOV
180
      T_LONG_JMP(pTaskInfo->env, code);
×
181
    }
182

183
    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
8,390,347✔
184
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
8,390,347✔
185

186
    if (pRsp->numOfRows == 0) {
8,390,347✔
187
      qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
4,546,462✔
188
             " execId:%d idx %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", pDataInfo,
189
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
190
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);
191

192
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
4,546,462✔
193
      if (isVstbScan(pDataInfo) || isVstbTagScan(pDataInfo)) {
4,546,462✔
194
        pExchangeInfo->current = -1;
120,057✔
195
      } else {
196
        pExchangeInfo->current += 1;
4,426,405✔
197
      }
198
      taosMemoryFreeClear(pDataInfo->pRsp);
4,546,462✔
199
      continue;
4,546,462✔
200
    }
201

202
    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
3,843,885✔
203
    TAOS_CHECK_EXIT(code);
3,843,661✔
204

205
    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
3,843,661✔
206
    if (pRsp->completed == 1) {
3,843,885✔
207
      qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
1,961,534✔
208
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%d", pDataInfo,
209
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
210
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
211
             pExchangeInfo->current + 1, totalSources);
212

213
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
1,961,534✔
214
      if (isVstbScan(pDataInfo)) {
1,961,534✔
215
        pExchangeInfo->current = -1;
×
216
        taosMemoryFreeClear(pDataInfo->pRsp);
×
217
        continue;
×
218
      }
219
    } else {
220
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d idx:%d numOfRows:%" PRId64
1,882,351✔
221
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
222
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
223
             pExchangeInfo->current, pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
224
    }
225

226
    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
3,843,885✔
227
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;
3,843,885✔
228

229
    pExchangeInfo->current++;
3,843,885✔
230

231
    taosMemoryFreeClear(pDataInfo->pRsp);
3,843,885✔
232
    return;
3,843,661✔
233
  }
234

235
_exit:
×
236

237
  if (code) {
×
238
    pTaskInfo->code = code;
×
239
    qError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
×
240
  }
241
}
242

243

244
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
244,042,364✔
245
                                           SExecTaskInfo* pTaskInfo) {
246
  int32_t code = 0;
244,042,364✔
247
  int32_t lino = 0;
244,042,364✔
248
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
244,042,364✔
249
  int32_t completed = 0;
244,039,061✔
250
  code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &completed);
244,039,061✔
251
  if (code != TSDB_CODE_SUCCESS) {
244,042,727✔
252
    pTaskInfo->code = code;
×
253
    T_LONG_JMP(pTaskInfo->env, code);
×
254
  }
255
  if (completed == totalSources) {
244,042,727✔
256
    setAllSourcesCompleted(pOperator);
79,815,142✔
257
    return;
79,815,794✔
258
  }
259

260
  SSourceDataInfo* pDataInfo = NULL;
164,227,585✔
261

262
  while (1) {
19,456,372✔
263
    qDebug("prepare wait for ready, %p, %s", pExchangeInfo, GET_TASKID(pTaskInfo));
183,683,957✔
264
    code = exchangeWait(pOperator, pExchangeInfo);
183,684,464✔
265

266
    if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
183,683,727✔
267
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
1,474✔
268
    }
269

270
    for (int32_t i = 0; i < totalSources; ++i) {
303,034,125✔
271
      pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
303,036,117✔
272
      QUERY_CHECK_NULL(pDataInfo, code, lino, _exit, terrno);
303,035,761✔
273
      if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
303,035,761✔
274
        continue;
87,386,043✔
275
      }
276

277
      if (pDataInfo->status != EX_SOURCE_DATA_READY) {
215,650,074✔
278
        continue;
31,965,802✔
279
      }
280

281
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
183,684,272✔
282
      if (pDataInfo->seqId != currSeqId) {
183,684,272✔
283
        qDebug("concurrent rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pDataInfo->seqId, pExchangeInfo, currSeqId);
×
284
        taosMemoryFreeClear(pDataInfo->pRsp);
×
285
        break;
×
286
      }
287

288
      if (pDataInfo->code != TSDB_CODE_SUCCESS) {
183,684,272✔
289
        code = pDataInfo->code;
735✔
290
        TAOS_CHECK_EXIT(code);
735✔
291
      }
292

293
      tmemory_barrier();
183,683,537✔
294
      SRetrieveTableRsp*     pRsp = pDataInfo->pRsp;
183,683,537✔
295
      SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
183,683,153✔
296
      QUERY_CHECK_NULL(pSource, code, lino, _exit, terrno);
183,683,537✔
297

298
      // todo
299
      SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
183,683,537✔
300
      if (pRsp->numOfRows == 0) {
183,683,537✔
301
        if (NULL != pDataInfo->pSrcUidList && !isVstbScan(pDataInfo)) {
40,604,275✔
302
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
×
303
          code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
×
304
          if (code != TSDB_CODE_SUCCESS) {
×
305
            taosMemoryFreeClear(pDataInfo->pRsp);
×
306
            TAOS_CHECK_EXIT(code);
×
307
          }
308
        } else {
309
          pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
40,604,275✔
310
          qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
40,604,275✔
311
                 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", try next %d/%" PRIzu, pDataInfo,
312
                 GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
313
                 pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows, i + 1, totalSources);
314
          taosMemoryFreeClear(pDataInfo->pRsp);
40,604,275✔
315
        }
316
        break;
40,604,275✔
317
      }
318

319
      TAOS_CHECK_EXIT(doExtractResultBlocks(pExchangeInfo, pDataInfo));
143,079,262✔
320

321
      SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
143,078,899✔
322
      updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pDataInfo->startTime, pOperator);
143,079,262✔
323
      pDataInfo->totalRows += pRetrieveRsp->numOfRows;
143,078,899✔
324

325
      if (pRsp->completed == 1) {
143,079,262✔
326
        pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
128,329,710✔
327
        qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
128,329,347✔
328
               " execId:%d index:%d completed, blocks:%d, numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64
329
               ", totalRows:%" PRIu64 ", total:%.2f Kb, try next %d/%" PRIzu, pDataInfo,
330
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId, i,
331
               pRsp->numOfBlocks, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows,
332
               pLoadInfo->totalSize / 1024.0, i + 1, totalSources);
333
      } else {
334
        qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
14,749,552✔
335
               " execId:%d blocks:%d, numOfRows:%" PRId64 ", totalRows:%" PRIu64 ", total:%.2f Kb",
336
               GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
337
               pRsp->numOfBlocks, pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
338
      }
339

340
      taosMemoryFreeClear(pDataInfo->pRsp);
143,078,899✔
341

342
      if ((pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED || NULL != pDataInfo->pSrcUidList) && !isVstbScan(pDataInfo) && !isVstbTagScan(pDataInfo)) {
143,079,262✔
343
        pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
14,749,552✔
344
        code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
14,749,552✔
345
        if (code != TSDB_CODE_SUCCESS) {
14,749,552✔
346
          taosMemoryFreeClear(pDataInfo->pRsp);
×
347
          TAOS_CHECK_EXIT(code);
×
348
        }
349
      }
350
      
351
      return;
143,078,817✔
352
    }  // end loop
353

354
    int32_t complete1 = 0;
40,602,283✔
355
    code = getCompletedSources(pExchangeInfo->pSourceDataInfo, &complete1);
40,604,275✔
356
    if (code != TSDB_CODE_SUCCESS) {
40,604,275✔
357
      pTaskInfo->code = code;
×
358
      T_LONG_JMP(pTaskInfo->env, code);
×
359
    }
360
    if (complete1 == totalSources) {
40,604,275✔
361
      qDebug("all sources are completed, %s", GET_TASKID(pTaskInfo));
21,147,903✔
362
      return;
21,147,903✔
363
    }
364
  }
365

366
_exit:
735✔
367

368
  if (code) {
735✔
369
    pTaskInfo->code = code;
735✔
370
    qError("%s failed at line %d since %s", __FUNCTION__, lino, tstrerror(code));
735✔
371
  }
372
}
373

374
static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
645,226,733✔
375
  int32_t        code = TSDB_CODE_SUCCESS;
645,226,733✔
376
  SExchangeInfo* pExchangeInfo = pOperator->info;
645,226,733✔
377
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
645,227,320✔
378

379
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
645,225,579✔
380

381
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
645,225,202✔
382
  if (pOperator->status == OP_EXEC_DONE) {
645,226,563✔
383
    qDebug("%s all %" PRIzu " source(s) are exhausted, total rows:%" PRIu64 " bytes:%" PRIu64 ", elapsed:%.2f ms",
×
384
           GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize,
385
           pLoadInfo->totalElapsed / 1000.0);
386
    return NULL;
×
387
  }
388

389
  // we have buffered retrieved datablock, return it directly
390
  SSDataBlock* p = NULL;
645,225,364✔
391
  if (taosArrayGetSize(pExchangeInfo->pResultBlockList) > 0) {
645,226,888✔
392
    p = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
358,442,160✔
393
    taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
358,442,160✔
394
  }
395

396
  if (p != NULL) {
645,225,879✔
397
    void* tmp = taosArrayPush(pExchangeInfo->pRecycledBlocks, &p);
358,441,712✔
398
    if (!tmp) {
358,441,941✔
399
      code = terrno;
×
400
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
401
      pTaskInfo->code = code;
×
402
      T_LONG_JMP(pTaskInfo->env, code);
×
403
    }
404
    return p;
358,441,941✔
405
  } else {
406
    if (pExchangeInfo->seqLoadData) {
286,784,167✔
407
      code = seqLoadRemoteData(pOperator);
33,030,581✔
408
      if (code != TSDB_CODE_SUCCESS) {
33,030,198✔
409
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
642✔
410
        pTaskInfo->code = code;
642✔
411
        T_LONG_JMP(pTaskInfo->env, code);
642✔
412
      }
413
    } else if (IS_STREAM_MODE(pOperator->pTaskInfo))   {
253,753,942✔
414
      streamSequenciallyLoadRemoteData(pOperator, pExchangeInfo, pTaskInfo);
9,711,852✔
415
    } else {
416
      concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
244,041,659✔
417
    }
418
    if (TSDB_CODE_SUCCESS != pOperator->pTaskInfo->code) {
286,782,374✔
419
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(pOperator->pTaskInfo->code));
1,270✔
420
      T_LONG_JMP(pTaskInfo->env, pOperator->pTaskInfo->code);
1,270✔
421
    }
422
    
423
    if (taosArrayGetSize(pExchangeInfo->pResultBlockList) == 0) {
286,781,844✔
424
      qDebug("empty resultBlockList");
115,226,119✔
425
      return NULL;
115,226,119✔
426
    } else {
427
      p = taosArrayGetP(pExchangeInfo->pResultBlockList, 0);
171,555,813✔
428
      taosArrayRemove(pExchangeInfo->pResultBlockList, 0);
171,555,813✔
429
      void* tmp = taosArrayPush(pExchangeInfo->pRecycledBlocks, &p);
171,555,813✔
430
      if (!tmp) {
171,555,813✔
431
        code = terrno;
×
432
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
433
        pTaskInfo->code = code;
×
434
        T_LONG_JMP(pTaskInfo->env, code);
×
435
      }
436

437
      qDebug("block with rows:%" PRId64 " loaded", p->info.rows);
171,555,813✔
438
      return p;
171,555,813✔
439
    }
440
  }
441
}
442

443
static int32_t loadRemoteDataNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
643,403,049✔
444
  int32_t        code = TSDB_CODE_SUCCESS;
643,403,049✔
445
  int32_t        lino = 0;
643,403,049✔
446
  SExchangeInfo* pExchangeInfo = pOperator->info;
643,403,049✔
447
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
643,404,889✔
448

449
  qDebug("%s start to load from exchange %p", pTaskInfo->id.str, pExchangeInfo);
643,403,829✔
450

451
  code = pOperator->fpSet._openFn(pOperator);
643,406,932✔
452
  QUERY_CHECK_CODE(code, lino, _end);
643,403,284✔
453

454
  if (pOperator->status == OP_EXEC_DONE) {
643,403,284✔
455
    (*ppRes) = NULL;
131,468✔
456
    return code;
131,468✔
457
  }
458

459
  while (1) {
1,952,959✔
460
    SSDataBlock* pBlock = doLoadRemoteDataImpl(pOperator);
645,225,625✔
461
    if (pBlock == NULL) {
645,223,425✔
462
      (*ppRes) = NULL;
115,226,119✔
463
      return code;
115,226,119✔
464
    }
465

466
    code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL, NULL);
529,997,306✔
467
    QUERY_CHECK_CODE(code, lino, _end);
529,997,167✔
468

469
    if (blockDataGetNumOfRows(pBlock) == 0) {
529,997,167✔
470
      qDebug("rows 0 block got, continue next load");
3,246✔
471
      continue;
3,246✔
472
    }
473

474
    SLimitInfo* pLimitInfo = &pExchangeInfo->limitInfo;
529,993,836✔
475
    if (hasLimitOffsetInfo(pLimitInfo)) {
529,993,697✔
476
      int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock, false);
3,955,698✔
477
      if (status == PROJECT_RETRIEVE_CONTINUE) {
3,955,698✔
478
        qDebug("limit retrieve continue");
1,949,713✔
479
        continue;
1,949,713✔
480
      } else if (status == PROJECT_RETRIEVE_DONE) {
2,005,985✔
481
        if (pBlock->info.rows == 0) {
2,005,985✔
482
          setOperatorCompleted(pOperator);
×
483
          (*ppRes) = NULL;
×
484
          return code;
×
485
        } else {
486
          (*ppRes) = pBlock;
2,005,985✔
487
          return code;
2,005,985✔
488
        }
489
      }
490
    } else {
491
      (*ppRes) = pBlock;
526,037,999✔
492
      qDebug("block with rows %" PRId64 " returned in exechange", pBlock->info.rows);
526,038,362✔
493
      return code;
526,038,442✔
494
    }
495
  }
496

497
_end:
×
498

499
  if (code != TSDB_CODE_SUCCESS) {
×
500
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
501
    pTaskInfo->code = code;
×
502
    T_LONG_JMP(pTaskInfo->env, code);
×
503
  } else {
504
    qDebug("empty block returned in exchange");
×
505
  }
506
  
507
  (*ppRes) = NULL;
×
508
  return code;
×
509
}
510

511
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const char* id) {
109,893,827✔
512
  pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
109,893,827✔
513
  if (pInfo->pSourceDataInfo == NULL) {
109,894,058✔
514
    return terrno;
×
515
  }
516

517
  if (pInfo->dynamicOp) {
109,895,075✔
518
    return TSDB_CODE_SUCCESS;
7,438,366✔
519
  }
520

521
  int32_t len = strlen(id) + 1;
102,456,159✔
522
  pInfo->pTaskId = taosMemoryCalloc(1, len);
102,456,159✔
523
  if (!pInfo->pTaskId) {
102,458,583✔
524
    return terrno;
×
525
  }
526
  tstrncpy(pInfo->pTaskId, id, len);
102,451,205✔
527
  for (int32_t i = 0; i < numOfSources; ++i) {
266,590,943✔
528
    SSourceDataInfo dataInfo = {0};
164,131,319✔
529
    dataInfo.status = EX_SOURCE_DATA_NOT_READY;
164,131,319✔
530
    dataInfo.taskId = pInfo->pTaskId;
164,131,319✔
531
    dataInfo.index = i;
164,130,430✔
532
    SSourceDataInfo* pDs = taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
164,130,430✔
533
    if (pDs == NULL) {
164,133,994✔
534
      taosArrayDestroyEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
×
535
      return terrno;
×
536
    }
537
    qDebug("init source data info %d, pDs:%p, status:%d", i, pDs, pDs->status);
164,133,994✔
538
  }
539

540
  return TSDB_CODE_SUCCESS;
102,459,624✔
541
}
542

543
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo, const char* id) {
109,895,282✔
544
  size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);
109,895,282✔
545

546
  if (numOfSources == 0) {
109,895,573✔
547
    qError("%s invalid number: %d of sources in exchange operator", id, (int32_t)numOfSources);
×
548
    return TSDB_CODE_INVALID_PARA;
×
549
  }
550
  pInfo->pFetchRpcHandles = taosArrayInit(numOfSources, sizeof(int64_t));
109,895,573✔
551
  if (!pInfo->pFetchRpcHandles) {
109,895,592✔
552
    return terrno;
×
553
  }
554
  void* ret = taosArrayReserve(pInfo->pFetchRpcHandles, numOfSources);
109,893,465✔
555
  if (!ret) {
109,894,901✔
556
    return terrno;
×
557
  }
558

559
  pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
109,894,901✔
560
  if (pInfo->pSources == NULL) {
109,897,424✔
561
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
562
    return terrno;
×
563
  }
564

565
  if (pExNode->node.dynamicOp) {
109,897,928✔
566
    pInfo->pHashSources = tSimpleHashInit(numOfSources * 2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
7,438,366✔
567
    if (NULL == pInfo->pHashSources) {
7,438,366✔
568
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
569
      return terrno;
×
570
    }
571
  }
572

573
  for (int32_t i = 0; i < numOfSources; ++i) {
289,304,031✔
574
    SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
179,409,984✔
575
    if (!pNode) {
179,409,287✔
576
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
577
      return terrno;
×
578
    }
579
    void* tmp = taosArrayPush(pInfo->pSources, pNode);
179,409,287✔
580
    if (!tmp) {
179,412,063✔
581
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
582
      return terrno;
×
583
    }
584
    SExchangeSrcIndex idx = {.srcIdx = i, .inUseIdx = -1};
179,412,063✔
585
    int32_t           code =
586
        tSimpleHashPut(pInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
179,411,532✔
587
    if (pInfo->pHashSources && code != TSDB_CODE_SUCCESS) {
179,410,707✔
588
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
589
      return code;
×
590
    }
591
  }
592

593
  initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo);
109,894,047✔
594
  int64_t refId = taosAddRef(fetchObjRefPool, pInfo);
109,895,916✔
595
  if (refId < 0) {
109,891,944✔
596
    int32_t code = terrno;
×
597
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
598
    return code;
×
599
  } else {
600
    pInfo->self = refId;
109,891,944✔
601
  }
602

603
  return initDataSource(numOfSources, pInfo, id);
109,894,512✔
604
}
605

606
int32_t resetExchangeOperState(SOperatorInfo* pOper) {
9,432,246✔
607
  SExchangeInfo* pInfo = pOper->info;
9,432,246✔
608
  SExchangePhysiNode* pPhynode = (SExchangePhysiNode*)pOper->pPhyNode;
9,433,366✔
609

610
  qDebug("%s reset exchange op:%p info:%p", pOper->pTaskInfo->id.str, pOper, pInfo);
9,433,117✔
611

612
  (void)atomic_add_fetch_64(&pInfo->seqId, 1);
9,434,536✔
613
  pOper->status = OP_NOT_OPENED;
9,433,871✔
614
  pInfo->current = 0;
9,433,871✔
615
  pInfo->loadInfo.totalElapsed = 0;
9,433,871✔
616
  pInfo->loadInfo.totalRows = 0;
9,433,647✔
617
  pInfo->loadInfo.totalSize = 0;
9,433,426✔
618
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pSourceDataInfo); ++i) {
26,418,579✔
619
    SSourceDataInfo* pDataInfo = taosArrayGet(pInfo->pSourceDataInfo, i);
16,984,487✔
620
    taosWLockLatch(&pDataInfo->lock);
16,984,711✔
621
    taosMemoryFreeClear(pDataInfo->decompBuf);
16,985,374✔
622
    taosMemoryFreeClear(pDataInfo->pRsp);
16,985,374✔
623

624
    pDataInfo->totalRows = 0;
16,985,150✔
625
    pDataInfo->code = 0;
16,985,150✔
626
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
16,985,374✔
627
    pDataInfo->fetchSent = false;
16,985,374✔
628
    taosWUnLockLatch(&pDataInfo->lock);
16,985,374✔
629
  }
630

631
  if (pInfo->dynamicOp) {
9,432,399✔
632
    taosArrayClearEx(pInfo->pSourceDataInfo, freeSourceDataInfo);
1,684,325✔
633
  } 
634

635
  taosArrayClearEx(pInfo->pResultBlockList, freeBlock);
9,433,871✔
636
  taosArrayClearEx(pInfo->pRecycledBlocks, freeBlock);
9,433,871✔
637

638
  blockDataCleanup(pInfo->pDummyBlock);
9,433,871✔
639

640
  void   *data = NULL;
9,433,871✔
641
  int32_t iter = 0;
9,433,871✔
642
  while ((data = tSimpleHashIterate(pInfo->pHashSources, data, &iter))) {
12,979,460✔
643
    ((SExchangeSrcIndex *)data)->inUseIdx = -1;
3,545,589✔
644
  }
645
  
646
  pInfo->limitInfo = (SLimitInfo){0};
9,433,426✔
647
  initLimitInfo(pPhynode->node.pLimit, pPhynode->node.pSlimit, &pInfo->limitInfo);
9,433,426✔
648

649
  return 0;
9,433,426✔
650
}
651

652
int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo,
109,895,294✔
653
                                   SOperatorInfo** pOptrInfo) {
654
  QRY_PARAM_CHECK(pOptrInfo);
109,895,294✔
655

656
  int32_t        code = 0;
109,895,294✔
657
  int32_t        lino = 0;
109,895,294✔
658
  SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
109,895,294✔
659
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
109,893,371✔
660
  if (pInfo == NULL || pOperator == NULL) {
109,891,470✔
661
    code = terrno;
×
662
    goto _error;
×
663
  }
664

665
  pInfo->isExchange = true;
109,891,470✔
666
  pOperator->pPhyNode = pExNode;
109,891,470✔
667
  pInfo->dynamicOp = pExNode->node.dynamicOp;
109,891,470✔
668
  code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo));
109,894,039✔
669
  QUERY_CHECK_CODE(code, lino, _error);
109,897,136✔
670

671
  code = tsem_init(&pInfo->ready, 0, 0);
109,897,136✔
672
  QUERY_CHECK_CODE(code, lino, _error);
109,897,492✔
673

674
  pInfo->pDummyBlock = createDataBlockFromDescNode(pExNode->node.pOutputDataBlockDesc);
109,897,492✔
675
  QUERY_CHECK_NULL(pInfo->pDummyBlock, code, lino, _error, terrno);
109,898,608✔
676

677
  pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES);
109,897,896✔
678
  QUERY_CHECK_NULL(pInfo->pResultBlockList, code, lino, _error, terrno);
109,897,125✔
679
  pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES);
109,898,466✔
680
  QUERY_CHECK_NULL(pInfo->pRecycledBlocks, code, lino, _error, terrno);
109,897,213✔
681

682
  SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self};
109,898,093✔
683
  code = qAppendTaskStopInfo(pTaskInfo, &stopInfo);
109,898,217✔
684
  QUERY_CHECK_CODE(code, lino, _error);
109,897,586✔
685

686
  pInfo->seqLoadData = pExNode->seqRecvData;
109,897,586✔
687
  pInfo->dynTbname = pExNode->dynTbname;
109,897,391✔
688
  if (pInfo->dynTbname) {
109,898,110✔
689
    pInfo->seqLoadData = true;
12,424✔
690
  }
691
  pInfo->pTransporter = pTransporter;
109,898,110✔
692

693
  setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo,
109,897,586✔
694
                  pTaskInfo);
695
  pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
109,896,090✔
696

697
  code = filterInitFromNode((SNode*)pExNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
109,896,592✔
698
                            pTaskInfo->pStreamRuntimeInfo);
109,897,458✔
699
  QUERY_CHECK_CODE(code, lino, _error);
109,895,839✔
700
  qTrace("%s exchange op:%p", __func__, pOperator);
109,895,839✔
701
  pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteDataNext, NULL, destroyExchangeOperatorInfo,
109,895,839✔
702
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
703
  setOperatorResetStateFn(pOperator, resetExchangeOperState);
109,895,352✔
704
  *pOptrInfo = pOperator;
109,895,850✔
705
  return TSDB_CODE_SUCCESS;
109,895,476✔
706

707
_error:
×
708
  if (code != TSDB_CODE_SUCCESS) {
×
709
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
710
    pTaskInfo->code = code;
×
711
  }
712
  if (pInfo != NULL) {
×
UNCOV
713
    doDestroyExchangeOperatorInfo(pInfo);
×
714
  }
715

716
  if (pOperator != NULL) {
×
717
    pOperator->info = NULL;
×
UNCOV
718
    destroyOperator(pOperator);
×
719
  }
UNCOV
720
  return code;
×
721
}
722

723
void destroyExchangeOperatorInfo(void* param) {
109,897,479✔
724
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
109,897,479✔
725
  int32_t        code = taosRemoveRef(fetchObjRefPool, pExInfo->self);
109,897,479✔
726
  if (code != TSDB_CODE_SUCCESS) {
109,896,883✔
UNCOV
727
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
728
  }
729
}
109,896,883✔
730

731
void freeBlock(void* pParam) {
311,714,247✔
732
  SSDataBlock* pBlock = *(SSDataBlock**)pParam;
311,714,247✔
733
  blockDataDestroy(pBlock);
311,714,734✔
734
}
311,715,243✔
735

736
void freeSourceDataInfo(void* p) {
169,877,225✔
737
  SSourceDataInfo* pInfo = (SSourceDataInfo*)p;
169,877,225✔
738
  taosMemoryFreeClear(pInfo->decompBuf);
169,877,225✔
739
  taosMemoryFreeClear(pInfo->pRsp);
169,877,725✔
740

741
  pInfo->decompBufSize = 0;
169,877,225✔
742
}
169,877,712✔
743

744
void doDestroyExchangeOperatorInfo(void* param) {
109,897,479✔
745
  if (param == NULL) {
109,897,479✔
UNCOV
746
    return;
×
747
  }
748
  SExchangeInfo* pExInfo = (SExchangeInfo*)param;
109,897,479✔
749
  if (pExInfo->pFetchRpcHandles) {
109,897,479✔
750
    for (int32_t i = 0; i < pExInfo->pFetchRpcHandles->size; ++i) {
289,309,297✔
751
      int64_t* pRpcHandle = taosArrayGet(pExInfo->pFetchRpcHandles, i);
179,414,798✔
752
      if (*pRpcHandle > 0) {
179,415,154✔
753
        SDownstreamSourceNode* pSource = taosArrayGet(pExInfo->pSources, i);
7,594,546✔
754
        (void)asyncFreeConnById(pExInfo->pTransporter, *pRpcHandle);
7,594,546✔
755
      }
756
    }
757
    taosArrayDestroy(pExInfo->pFetchRpcHandles);
109,897,966✔
758
  }
759

760
  taosArrayDestroy(pExInfo->pSources);
109,899,209✔
761
  taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo);
109,898,098✔
762

763
  taosArrayDestroyEx(pExInfo->pResultBlockList, freeBlock);
109,896,355✔
764
  taosArrayDestroyEx(pExInfo->pRecycledBlocks, freeBlock);
109,896,881✔
765

766
  blockDataDestroy(pExInfo->pDummyBlock);
109,897,581✔
767
  tSimpleHashCleanup(pExInfo->pHashSources);
109,898,482✔
768

769
  int32_t code = tsem_destroy(&pExInfo->ready);
109,897,960✔
770
  if (code != TSDB_CODE_SUCCESS) {
109,897,979✔
UNCOV
771
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
772
  }
773
  taosMemoryFreeClear(pExInfo->pTaskId);
109,897,979✔
774

775
  taosMemoryFreeClear(param);
109,896,871✔
776
}
777

778
int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
226,321,809✔
779
  SFetchRspHandleWrapper* pWrapper = (SFetchRspHandleWrapper*)param;
226,321,809✔
780

781
  taosMemoryFreeClear(pMsg->pEpSet);
226,321,809✔
782
  SExchangeInfo* pExchangeInfo = taosAcquireRef(fetchObjRefPool, pWrapper->exchangeId);
226,348,706✔
783
  if (pExchangeInfo == NULL) {
226,354,465✔
784
    qWarn("failed to acquire exchange operator, since it may have been released, %p", pExchangeInfo);
34,444✔
785
    taosMemoryFree(pMsg->pData);
34,444✔
786
    return TSDB_CODE_SUCCESS;
34,444✔
787
  }
788

789
  int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
226,320,021✔
790
  if (pWrapper->seqId != currSeqId) {
226,317,460✔
791
    qDebug("rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pWrapper->seqId, pExchangeInfo, currSeqId);
×
792
    taosMemoryFree(pMsg->pData);
×
793
    code = taosReleaseRef(fetchObjRefPool, pWrapper->exchangeId);
×
794
    if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
795
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
796
    }
UNCOV
797
    return TSDB_CODE_SUCCESS;
×
798
  }
799

800
  int32_t          index = pWrapper->sourceIndex;
226,292,582✔
801

802
  qDebug("%s exchange %p %dth source got rsp, code:%d, rsp:%p", pExchangeInfo->pTaskId, pExchangeInfo, index, code, pMsg->pData);
226,302,974✔
803

804
  int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, index);
226,316,538✔
805
  if (pRpcHandle != NULL) {
226,297,088✔
806
    int32_t ret = asyncFreeConnById(pExchangeInfo->pTransporter, *pRpcHandle);
226,308,555✔
807
    if (ret != 0) {
226,293,979✔
808
      qDebug("failed to free rpc handle, code:%s, %p", tstrerror(ret), pExchangeInfo);
9,872,299✔
809
    }
810
    *pRpcHandle = -1;
226,293,979✔
811
  }
812

813
  SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);
226,253,154✔
814
  if (!pSourceDataInfo) {
226,309,336✔
UNCOV
815
    return terrno;
×
816
  }
817

818
  if (0 == code && NULL == pMsg->pData) {
226,309,336✔
819
    qError("invalid rsp msg, msgType:%d, len:%d", pMsg->msgType, pMsg->len);
×
UNCOV
820
    code = TSDB_CODE_QRY_INVALID_MSG;
×
821
  }
822

823
  taosWLockLatch(&pSourceDataInfo->lock);
226,341,101✔
824
  if (code == TSDB_CODE_SUCCESS) {
226,304,784✔
825
    pSourceDataInfo->seqId = pWrapper->seqId;
226,300,272✔
826
    pSourceDataInfo->pRsp = pMsg->pData;
226,281,513✔
827

828
    SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
226,260,982✔
829
    pRsp->numOfRows = htobe64(pRsp->numOfRows);
226,284,059✔
830
    pRsp->compLen = htonl(pRsp->compLen);
226,285,953✔
831
    pRsp->payloadLen = htonl(pRsp->payloadLen);
226,276,699✔
832
    pRsp->numOfCols = htonl(pRsp->numOfCols);
226,251,329✔
833
    pRsp->useconds = htobe64(pRsp->useconds);
226,256,280✔
834
    pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);
226,186,732✔
835

836
    qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%" PRId64 ", %p", pSourceDataInfo->taskId, index,
226,283,047✔
837
           pRsp->numOfBlocks, pRsp->numOfRows, pExchangeInfo);
838
  } else {
839
    taosMemoryFree(pMsg->pData);
4,512✔
840
    pSourceDataInfo->code = rpcCvtErrCode(code);
4,512✔
841
    if (pSourceDataInfo->code != code) {
4,512✔
UNCOV
842
      qError("%s fetch rsp received, index:%d, error:%s, cvted error: %s, %p", pSourceDataInfo->taskId, index,
×
843
             tstrerror(code), tstrerror(pSourceDataInfo->code), pExchangeInfo);
844
    } else {
845
      qError("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code),
4,512✔
846
             pExchangeInfo);
847
    }
848
  }
849

850
  tmemory_barrier();
226,297,346✔
851
  pSourceDataInfo->status = EX_SOURCE_DATA_READY;
226,297,346✔
852
  taosWUnLockLatch(&pSourceDataInfo->lock);
226,292,265✔
853
  
854
  code = tsem_post(&pExchangeInfo->ready);
226,266,717✔
855
  if (code != TSDB_CODE_SUCCESS) {
226,314,813✔
856
    qError("failed to invoke post when fetch rsp is ready, code:%s, %p", tstrerror(code), pExchangeInfo);
×
UNCOV
857
    return code;
×
858
  }
859

860
  code = taosReleaseRef(fetchObjRefPool, pWrapper->exchangeId);
226,314,813✔
861
  if (code != TSDB_CODE_SUCCESS) {
226,332,409✔
UNCOV
862
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
863
  }
864
  return code;
226,328,694✔
865
}
866

867
int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, bool tableSeq) {
252,961✔
868
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
252,961✔
869
  if (NULL == *ppRes) {
252,961✔
UNCOV
870
    return terrno;
×
871
  }
872

873
  STableScanOperatorParam* pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
252,961✔
874
  if (NULL == pScan) {
252,961✔
875
    taosMemoryFreeClear(*ppRes);
×
UNCOV
876
    return terrno;
×
877
  }
878

879
  pScan->paramType = DYN_TYPE_SCAN_PARAM;
252,961✔
880
  pScan->pUidList = taosArrayDup(pUidList, NULL);
252,961✔
881
  if (NULL == pScan->pUidList) {
252,961✔
882
    taosMemoryFree(pScan);
×
883
    taosMemoryFreeClear(*ppRes);
×
UNCOV
884
    return terrno;
×
885
  }
886
  pScan->dynType = DYN_TYPE_STB_JOIN;
252,961✔
887
  pScan->tableSeq = tableSeq;
252,961✔
888
  pScan->pOrgTbInfo = NULL;
252,961✔
889
  pScan->pBatchTbInfo = NULL;
252,961✔
890
  pScan->pTagList = NULL;
252,961✔
891
  pScan->isNewParam = false;
252,961✔
892
  pScan->window.skey = INT64_MAX;
252,961✔
893
  pScan->window.ekey = INT64_MIN;
252,961✔
894
  pScan->notifyToProcess = false;
252,961✔
895
  pScan->notifyTs = 0;
252,961✔
896

897
  (*ppRes)->opType = srcOpType;
252,961✔
898
  (*ppRes)->downstreamIdx = 0;
252,961✔
899
  (*ppRes)->value = pScan;
252,961✔
900
  (*ppRes)->pChildren = NULL;
252,961✔
901
  (*ppRes)->reUse = false;
252,961✔
902

903
  return TSDB_CODE_SUCCESS;
252,961✔
904
}
905

906
int32_t buildTableScanOperatorParamEx(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, SOrgTbInfo *pMap, bool tableSeq, STimeWindow *window, bool isNewParam, ETableScanDynType type) {
29,883,117✔
907
  int32_t                  code = TSDB_CODE_SUCCESS;
29,883,117✔
908
  int32_t                  lino = 0;
29,883,117✔
909
  STableScanOperatorParam* pScan = NULL;
29,883,117✔
910

911
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
29,883,117✔
912
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
29,883,117✔
913

914
  pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
29,883,117✔
915
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);
29,883,117✔
916

917
  pScan->paramType = DYN_TYPE_SCAN_PARAM;
29,883,117✔
918
  if (pUidList) {
29,883,117✔
919
    pScan->pUidList = taosArrayDup(pUidList, NULL);
29,883,117✔
920
    QUERY_CHECK_NULL(pScan->pUidList, code, lino, _return, terrno);
29,883,117✔
921
  } else {
UNCOV
922
    pScan->pUidList = NULL;
×
923
  }
924

925
  if (pMap) {
29,883,117✔
926
    pScan->pOrgTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
28,521,738✔
927
    QUERY_CHECK_NULL(pScan->pOrgTbInfo, code, lino, _return, terrno);
28,521,738✔
928

929
    pScan->pOrgTbInfo->vgId = pMap->vgId;
28,521,738✔
930
    tstrncpy(pScan->pOrgTbInfo->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);
28,521,738✔
931

932
    pScan->pOrgTbInfo->colMap = taosArrayDup(pMap->colMap, NULL);
28,521,738✔
933
    QUERY_CHECK_NULL(pScan->pOrgTbInfo->colMap, code, lino, _return, terrno);
28,521,738✔
934
  } else {
935
    pScan->pOrgTbInfo = NULL;
1,361,379✔
936
  }
937
  pScan->pTagList = NULL;
29,883,117✔
938
  pScan->pBatchTbInfo = NULL;
29,883,117✔
939

940

941
  pScan->dynType = type;
29,883,117✔
942
  pScan->tableSeq = tableSeq;
29,883,117✔
943
  pScan->window.skey = window->skey;
29,883,117✔
944
  pScan->window.ekey = window->ekey;
29,883,117✔
945
  pScan->isNewParam = isNewParam;
29,883,117✔
946
  pScan->notifyToProcess = false;
29,883,117✔
947
  pScan->notifyTs = 0;
29,883,117✔
948
  (*ppRes)->opType = srcOpType;
29,883,117✔
949
  (*ppRes)->downstreamIdx = 0;
29,883,117✔
950
  (*ppRes)->value = pScan;
29,883,117✔
951
  (*ppRes)->pChildren = NULL;
29,883,117✔
952
  (*ppRes)->reUse = false;
29,883,117✔
953

954
  return code;
29,883,117✔
955
_return:
×
956
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
957
  taosMemoryFreeClear(*ppRes);
×
958
  if (pScan) {
×
959
    taosArrayDestroy(pScan->pUidList);
×
960
    if (pScan->pOrgTbInfo) {
×
961
      taosArrayDestroy(pScan->pOrgTbInfo->colMap);
×
UNCOV
962
      taosMemoryFreeClear(pScan->pOrgTbInfo);
×
963
    }
UNCOV
964
    taosMemoryFree(pScan);
×
965
  }
UNCOV
966
  return code;
×
967
}
968

969
/**
970
  @brief build the table scan operator param for notify message
971
*/
972
int32_t buildTableScanOperatorParamNotify(SOperatorParam** ppRes,
221,246✔
973
                                          int32_t srcOpType, TSKEY notifyTs) {
974
  if (srcOpType != 0 && srcOpType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
221,246✔
975
    qWarn("%s, invalid srcOpType:%d", __func__, srcOpType);
×
UNCOV
976
    return TSDB_CODE_INVALID_PARA;
×
977
  }
978
  int32_t code = TSDB_CODE_SUCCESS;
221,246✔
979
  int32_t lino = 0;
221,246✔
980
  *ppRes = taosMemoryCalloc(1, sizeof(SOperatorParam));
221,246✔
981
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
221,246✔
982

983
  STableScanOperatorParam* pTsParam =
442,492✔
984
    taosMemoryCalloc(1, sizeof(STableScanOperatorParam));
221,246✔
985
  QUERY_CHECK_NULL(pTsParam, code, lino, _return, terrno);
221,246✔
986

987
  pTsParam->paramType = NOTIFY_TYPE_SCAN_PARAM;
221,246✔
988
  pTsParam->notifyToProcess = true;
221,246✔
989
  pTsParam->notifyTs = notifyTs;
221,246✔
990

991
  (*ppRes)->opType = srcOpType != 0 ? srcOpType :
221,246✔
992
                                      QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
993
  (*ppRes)->downstreamIdx = 0;
221,246✔
994
  (*ppRes)->value = pTsParam;
221,246✔
995
  (*ppRes)->pChildren = NULL;
221,246✔
996
  /* param is not reusable when it is transferred by message */
997
  (*ppRes)->reUse = false;
221,246✔
998

999
_return:
221,246✔
1000
  if (TSDB_CODE_SUCCESS != code) {
221,246✔
UNCOV
1001
    qError("%s failed at %d, failed to build scan operator msg:%s",
×
1002
           __func__, lino, tstrerror(code));
1003
    taosMemoryFreeClear(*ppRes);
×
1004
    if (pTsParam) {
×
UNCOV
1005
      taosMemoryFree(pTsParam);
×
1006
    }
1007
  }
1008
  return code;
221,246✔
1009
}
1010

1011
int32_t buildTableScanOperatorParamBatchInfo(SOperatorParam** ppRes, uint64_t groupid, SArray* pUidList, int32_t srcOpType, SArray *pBatchMap, SArray *pTagList, bool tableSeq, STimeWindow *window, bool isNewParam) {
5,738,820✔
1012
  int32_t                  code = TSDB_CODE_SUCCESS;
5,738,820✔
1013
  int32_t                  lino = 0;
5,738,820✔
1014
  STableScanOperatorParam* pScan = NULL;
5,738,820✔
1015

1016
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
5,738,820✔
1017
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
5,738,820✔
1018

1019
  pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam));
5,738,820✔
1020
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);
5,738,820✔
1021

1022
  pScan->paramType = DYN_TYPE_SCAN_PARAM;
5,738,820✔
1023
  pScan->groupid = groupid;
5,738,820✔
1024
  if (pUidList) {
5,738,820✔
1025
    pScan->pUidList = taosArrayDup(pUidList, NULL);
5,738,820✔
1026
    QUERY_CHECK_NULL(pScan->pUidList, code, lino, _return, terrno);
5,738,820✔
1027
  } else {
UNCOV
1028
    pScan->pUidList = NULL;
×
1029
  }
1030
  pScan->pOrgTbInfo = NULL;
5,738,820✔
1031

1032
  if (pBatchMap) {
5,738,820✔
1033
    pScan->pBatchTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
5,738,820✔
1034
    QUERY_CHECK_NULL(pScan->pBatchTbInfo, code, lino, _return, terrno);
5,738,820✔
1035
    for (int32_t i = 0; i < taosArrayGetSize(pBatchMap); i++) {
15,102,596✔
1036
      SOrgTbInfo *pSrcInfo = taosArrayGet(pBatchMap, i);
9,363,776✔
1037
      SOrgTbInfo batchInfo = {0};
9,363,776✔
1038
      batchInfo.vgId = pSrcInfo->vgId;
9,363,776✔
1039
      tstrncpy(batchInfo.tbName, pSrcInfo->tbName, TSDB_TABLE_FNAME_LEN);
9,363,776✔
1040
      batchInfo.colMap = taosArrayDup(pSrcInfo->colMap, NULL);
9,363,776✔
1041
      QUERY_CHECK_NULL(batchInfo.colMap, code, lino, _return, terrno);
9,363,776✔
1042
      SOrgTbInfo *pDstInfo = taosArrayPush(pScan->pBatchTbInfo, &batchInfo);
9,363,776✔
1043
      QUERY_CHECK_NULL(pDstInfo, code, lino, _return, terrno);
9,363,776✔
1044
    }
1045
  } else {
UNCOV
1046
    pScan->pBatchTbInfo = NULL;
×
1047
  }
1048

1049
  if (pTagList) {
5,738,820✔
1050
    pScan->pTagList = taosArrayInit(1, sizeof(STagVal));
2,718,408✔
1051
    QUERY_CHECK_NULL(pScan->pTagList, code, lino, _return, terrno);
2,718,408✔
1052

1053
    for (int32_t i = 0; i < taosArrayGetSize(pTagList); ++i) {
18,133,752✔
1054
      STagVal *pSrcTag = (STagVal*)taosArrayGet(pTagList, i);
15,415,344✔
1055
      STagVal  dstTag;
15,415,344✔
1056
      dstTag.type = pSrcTag->type;
15,415,344✔
1057
      dstTag.cid = pSrcTag->cid;
15,415,344✔
1058
      if (IS_VAR_DATA_TYPE(pSrcTag->type)) {
15,415,344✔
1059
        dstTag.nData = pSrcTag->nData;
6,762,192✔
1060
        dstTag.pData = taosMemoryMalloc(dstTag.nData);
6,762,192✔
1061
        QUERY_CHECK_NULL(dstTag.pData, code, lino, _return, terrno);
6,762,192✔
1062
        memcpy(dstTag.pData, pSrcTag->pData, dstTag.nData);
6,762,192✔
1063
      } else {
1064
        dstTag.i64 = pSrcTag->i64;
8,653,152✔
1065
      }
1066

1067
      QUERY_CHECK_NULL(taosArrayPush(pScan->pTagList, &dstTag), code, lino, _return, terrno);
30,830,688✔
1068
    }
1069
  } else {
1070
    pScan->pTagList = NULL;
3,020,412✔
1071
  }
1072

1073

1074
  pScan->dynType = DYN_TYPE_VSTB_BATCH_SCAN;
5,738,820✔
1075
  pScan->tableSeq = tableSeq;
5,738,820✔
1076
  pScan->window.skey = window->skey;
5,738,820✔
1077
  pScan->window.ekey = window->ekey;
5,738,820✔
1078
  pScan->isNewParam = isNewParam;
5,738,820✔
1079
  pScan->notifyToProcess = false;
5,738,820✔
1080
  pScan->notifyTs = 0;
5,738,820✔
1081
  (*ppRes)->opType = srcOpType;
5,738,820✔
1082
  (*ppRes)->downstreamIdx = 0;
5,738,820✔
1083
  (*ppRes)->value = pScan;
5,738,820✔
1084
  (*ppRes)->pChildren = NULL;
5,738,820✔
1085
  (*ppRes)->reUse = false;
5,738,820✔
1086

1087
  return code;
5,738,820✔
1088
_return:
×
1089
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
1090
  taosMemoryFreeClear(*ppRes);
×
1091
  if (pScan) {
×
1092
    taosArrayDestroy(pScan->pUidList);
×
1093
    if (pScan->pBatchTbInfo) {
×
UNCOV
1094
      taosArrayDestroy(pScan->pBatchTbInfo);
×
1095
    }
UNCOV
1096
    taosMemoryFree(pScan);
×
1097
  }
UNCOV
1098
  return code;
×
1099
}
1100

1101
int32_t buildAggOperatorParam(SOperatorParam** ppRes, uint64_t groupid, SArray* pUidList, int32_t srcOpType, SArray *pBatchMap, SArray *pTagList, bool tableSeq, STimeWindow *window, bool isNewParam, EExchangeSourceType type) {
5,738,820✔
1102
  int32_t                  code = TSDB_CODE_SUCCESS;
5,738,820✔
1103
  int32_t                  lino = 0;
5,738,820✔
1104
  SOperatorParam*          pParam = NULL;
5,738,820✔
1105

1106
  switch (type) {
5,738,820✔
1107
    case EX_SRC_TYPE_VSTB_AGG_SCAN: {
5,532,548✔
1108
      pParam = taosMemoryMalloc(sizeof(SOperatorParam));
5,532,548✔
1109
      QUERY_CHECK_NULL(pParam, code, lino, _return, terrno);
5,532,548✔
1110

1111
      pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
5,532,548✔
1112
      QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno);
5,532,548✔
1113

1114
      SOperatorParam* pTableScanParam = NULL;
5,532,548✔
1115
      code = buildTableScanOperatorParamBatchInfo(&pTableScanParam, groupid, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, pBatchMap, pTagList, tableSeq, window, isNewParam);
5,532,548✔
1116
      QUERY_CHECK_CODE(code, lino, _return);
5,532,548✔
1117

1118
      QUERY_CHECK_NULL(taosArrayPush(pParam->pChildren, &pTableScanParam), code, lino, _return, terrno);
11,065,096✔
1119

1120
      pParam->opType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
5,532,548✔
1121
      pParam->downstreamIdx = 0;
5,532,548✔
1122
      pParam->value = NULL;
5,532,548✔
1123
      pParam->reUse = false;
5,532,548✔
1124

1125
      break;
5,532,548✔
1126
    }
1127
    case EX_SRC_TYPE_VSTB_WIN_SCAN: {
206,272✔
1128
      code = buildTableScanOperatorParamBatchInfo(&pParam, groupid, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, pBatchMap, pTagList, tableSeq, window, isNewParam);
206,272✔
1129
      QUERY_CHECK_CODE(code, lino, _return);
206,272✔
1130
      break;
206,272✔
1131
    }
1132
    default: {
×
1133
      code = TSDB_CODE_INVALID_PARA;
×
1134
      qError("%s failed at %d, invalid exchange source type:%d", __FUNCTION__, lino, type);
×
UNCOV
1135
      goto _return;
×
1136
    }
1137
  }
1138

1139
  *ppRes = pParam;
5,738,820✔
1140
  return code;
5,738,820✔
1141

1142
_return:
×
1143
  freeOperatorParam(pParam, OP_GET_PARAM);
×
1144
  qError("%s failed at %d, failed to build agg scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
UNCOV
1145
  return code;
×
1146
}
1147

1148
int32_t buildTagScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType) {
4,362,046✔
1149
  int32_t                  code = TSDB_CODE_SUCCESS;
4,362,046✔
1150
  int32_t                  lino = 0;
4,362,046✔
1151
  STagScanOperatorParam*   pScan = NULL;
4,362,046✔
1152

1153
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
4,362,046✔
1154
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
4,362,046✔
1155

1156
  pScan = taosMemoryMalloc(sizeof(STagScanOperatorParam));
4,362,046✔
1157
  QUERY_CHECK_NULL(pScan, code, lino, _return, terrno);
4,362,046✔
1158
  pScan->vcUid = *(tb_uid_t*)taosArrayGet(pUidList, 0);
4,362,046✔
1159

1160
  (*ppRes)->opType = srcOpType;
4,362,046✔
1161
  (*ppRes)->downstreamIdx = 0;
4,362,046✔
1162
  (*ppRes)->value = pScan;
4,362,046✔
1163
  (*ppRes)->pChildren = NULL;
4,362,046✔
1164
  (*ppRes)->reUse = false;
4,362,046✔
1165

1166
  return code;
4,362,046✔
1167
_return:
×
1168
  qError("%s failed at %d, failed to build scan operator msg:%s", __FUNCTION__, lino, tstrerror(code));
×
1169
  taosMemoryFreeClear(*ppRes);
×
1170
  if (pScan) {
×
UNCOV
1171
    taosMemoryFree(pScan);
×
1172
  }
UNCOV
1173
  return code;
×
1174
}
1175

1176
static int32_t getCurrentWinCalcTimeRange(SStreamRuntimeFuncInfo* pRuntimeInfo, STimeWindow* pTimeRange) {
4,506,123✔
1177
  if (!pRuntimeInfo || !pTimeRange) {
4,506,123✔
UNCOV
1178
    return TSDB_CODE_INTERNAL_ERROR;
×
1179
  }
1180

1181
  SSTriggerCalcParam* pParam = taosArrayGet(pRuntimeInfo->pStreamPesudoFuncVals, pRuntimeInfo->curIdx);
4,506,347✔
1182
  if (!pParam) {
4,506,347✔
UNCOV
1183
    return TSDB_CODE_INTERNAL_ERROR;
×
1184
  }
1185

1186
  switch (pRuntimeInfo->triggerType) {
4,506,347✔
1187
    case STREAM_TRIGGER_SLIDING:
3,393,967✔
1188
      // Unable to distinguish whether there is an interval, all use wstart/wend
1189
      // and the results are equal to those of prevTs/currentTs, using the same address of union.
1190
      pTimeRange->skey = pParam->wstart;  // is equal to wstart
3,393,967✔
1191
      pTimeRange->ekey = pParam->wend;    // is equal to wend
3,393,967✔
1192
      break;
3,393,967✔
1193
    case STREAM_TRIGGER_PERIOD:
235,854✔
1194
      pTimeRange->skey = pParam->prevLocalTime;
235,854✔
1195
      pTimeRange->ekey = pParam->triggerTime;
235,854✔
1196
      break;
235,854✔
1197
    default:
876,526✔
1198
      pTimeRange->skey = pParam->wstart;
876,526✔
1199
      pTimeRange->ekey = pParam->wend;
876,526✔
1200
      break;
876,526✔
1201
  }
1202

1203
  return TSDB_CODE_SUCCESS;
4,506,347✔
1204
}
1205

1206
int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
238,725,107✔
1207
  int32_t          code = TSDB_CODE_SUCCESS;
238,725,107✔
1208
  int32_t          lino = 0;
238,725,107✔
1209
  SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
238,725,107✔
1210
  if (!pDataInfo) {
238,725,331✔
UNCOV
1211
    return terrno;
×
1212
  }
1213

1214
  if (EX_SOURCE_DATA_NOT_READY != pDataInfo->status) {
238,725,331✔
1215
    return TSDB_CODE_SUCCESS;
12,214,599✔
1216
  }
1217

1218
  pDataInfo->status = EX_SOURCE_DATA_STARTED;
226,511,700✔
1219
  SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index);
226,511,344✔
1220
  if (!pSource) {
226,509,212✔
UNCOV
1221
    return terrno;
×
1222
  }
1223

1224
  pDataInfo->startTime = taosGetTimestampUs();
226,509,681✔
1225
  size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
226,510,549✔
1226

1227
  SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
226,509,347✔
1228
  QUERY_CHECK_NULL(pWrapper, code, lino, _end, terrno);
226,511,089✔
1229
  pWrapper->exchangeId = pExchangeInfo->self;
226,511,089✔
1230
  pWrapper->sourceIndex = sourceIndex;
226,515,385✔
1231
  pWrapper->seqId = pExchangeInfo->seqId;
226,512,147✔
1232

1233
  if (pSource->localExec) {
226,508,743✔
UNCOV
1234
    SDataBuf pBuf = {0};
×
1235
    int32_t  code =
UNCOV
1236
      (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->sId,
×
1237
                                  pTaskInfo->id.queryId, pSource->clientId,
1238
                                  pSource->taskId, 0, pSource->execId,
1239
                                  &pBuf.pData,
1240
                                  pTaskInfo->localFetch.explainRes);
1241
    code = loadRemoteDataCallback(pWrapper, &pBuf, code);
×
1242
    taosMemoryFree(pWrapper);
×
UNCOV
1243
    QUERY_CHECK_CODE(code, lino, _end);
×
1244
  } else {
1245
    bool needStreamPesudoFuncVals = true;
226,507,523✔
1246
    SResFetchReq req = {0};
226,507,523✔
1247
    req.header.vgId = pSource->addr.nodeId;
226,509,842✔
1248
    req.sId = pSource->sId;
226,510,040✔
1249
    req.clientId = pSource->clientId;
226,510,750✔
1250
    req.taskId = pSource->taskId;
226,508,929✔
1251
    req.queryId = pTaskInfo->id.queryId;
226,507,249✔
1252
    req.execId = pSource->execId;
226,507,538✔
1253
    if (pTaskInfo->pStreamRuntimeInfo) {
226,505,296✔
1254
      req.dynTbname = pExchangeInfo->dynTbname;
8,470,525✔
1255
      req.execId = pTaskInfo->pStreamRuntimeInfo->execId;
8,470,525✔
1256
      req.pStRtFuncInfo = &pTaskInfo->pStreamRuntimeInfo->funcInfo;
8,470,301✔
1257

1258
      if (pSource->fetchMsgType == TDMT_STREAM_FETCH_FROM_RUNNER) {
8,470,525✔
1259
        qDebug("%s stream fetch from runner, execId:%d, %p", GET_TASKID(pTaskInfo), req.execId, pTaskInfo->pStreamRuntimeInfo);
135,103✔
1260
      } else if (pSource->fetchMsgType == TDMT_STREAM_FETCH_FROM_CACHE) {
8,334,974✔
1261
        code = getCurrentWinCalcTimeRange(req.pStRtFuncInfo, &req.pStRtFuncInfo->curWindow);
4,506,347✔
1262
        QUERY_CHECK_CODE(code, lino, _end);
4,506,347✔
1263
        needStreamPesudoFuncVals = false;
4,506,347✔
1264
        qDebug("%s stream fetch from cache, execId:%d, curWinIdx:%d, time range:[%" PRId64 ", %" PRId64 "]",
4,506,347✔
1265
               GET_TASKID(pTaskInfo), req.execId, req.pStRtFuncInfo->curIdx, req.pStRtFuncInfo->curWindow.skey,
1266
               req.pStRtFuncInfo->curWindow.ekey);
1267
      }
1268
      if (!pDataInfo->fetchSent) {
8,470,525✔
1269
        req.reset = pDataInfo->fetchSent = true;
6,650,311✔
1270
      }
1271
    }
1272

1273
    switch (pDataInfo->type) {
226,512,653✔
1274
      case EX_SRC_TYPE_VSTB_SCAN: {
28,521,738✔
1275
        code = buildTableScanOperatorParamEx(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->orgTbInfo, pDataInfo->tableSeq, &pDataInfo->window, pDataInfo->isNewParam, DYN_TYPE_VSTB_SINGLE_SCAN);
28,521,738✔
1276
        taosArrayDestroy(pDataInfo->orgTbInfo->colMap);
28,521,738✔
1277
        taosMemoryFreeClear(pDataInfo->orgTbInfo);
28,521,738✔
1278
        taosArrayDestroy(pDataInfo->pSrcUidList);
28,521,738✔
1279
        pDataInfo->pSrcUidList = NULL;
28,521,738✔
1280
        if (TSDB_CODE_SUCCESS != code) {
28,521,738✔
1281
          pTaskInfo->code = code;
×
1282
          taosMemoryFree(pWrapper);
×
UNCOV
1283
          return pTaskInfo->code;
×
1284
        }
1285
        break;
28,521,738✔
1286
      }
1287
      case EX_SRC_TYPE_VTB_WIN_SCAN: {
2,887,286✔
1288
        if (pDataInfo->pSrcUidList) {
2,887,286✔
1289
          code = buildTableScanOperatorParamEx(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, NULL, pDataInfo->tableSeq, &pDataInfo->window, false, DYN_TYPE_VSTB_WIN_SCAN);
1,361,379✔
1290
          taosArrayDestroy(pDataInfo->pSrcUidList);
1,361,379✔
1291
          pDataInfo->pSrcUidList = NULL;
1,361,379✔
1292
          if (TSDB_CODE_SUCCESS != code) {
1,361,379✔
1293
            pTaskInfo->code = code;
×
1294
            taosMemoryFree(pWrapper);
×
UNCOV
1295
            return pTaskInfo->code;
×
1296
          }
1297
        }
1298
        break;
2,887,286✔
1299
      }
1300
      case EX_SRC_TYPE_VSTB_TAG_SCAN: {
4,362,046✔
1301
        code = buildTagScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType);
4,362,046✔
1302
        taosArrayDestroy(pDataInfo->pSrcUidList);
4,362,046✔
1303
        pDataInfo->pSrcUidList = NULL;
4,362,046✔
1304
        if (TSDB_CODE_SUCCESS != code) {
4,362,046✔
1305
          pTaskInfo->code = code;
×
1306
          taosMemoryFree(pWrapper);
×
UNCOV
1307
          return pTaskInfo->code;
×
1308
        }
1309
        break;
4,362,046✔
1310
      }
1311
      case EX_SRC_TYPE_VSTB_WIN_SCAN:
6,018,344✔
1312
      case EX_SRC_TYPE_VSTB_AGG_SCAN: {
1313
        if (pDataInfo->batchOrgTbInfo) {
6,018,344✔
1314
          code = buildAggOperatorParam(&req.pOpParam, pDataInfo->groupid, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->batchOrgTbInfo, pDataInfo->tagList, pDataInfo->tableSeq, &pDataInfo->window, pDataInfo->isNewParam, pDataInfo->type);
5,738,820✔
1315
          if (pDataInfo->batchOrgTbInfo) {
5,738,820✔
1316
            for (int32_t i = 0; i < taosArrayGetSize(pDataInfo->batchOrgTbInfo); ++i) {
15,102,596✔
1317
              SOrgTbInfo* pColMap = taosArrayGet(pDataInfo->batchOrgTbInfo, i);
9,363,776✔
1318
              if (pColMap) {
9,363,776✔
1319
                taosArrayDestroy(pColMap->colMap);
9,363,776✔
1320
              }
1321
            }
1322
            taosArrayDestroy(pDataInfo->batchOrgTbInfo);
5,738,820✔
1323
            pDataInfo->batchOrgTbInfo = NULL;
5,738,820✔
1324
          }
1325
          if (pDataInfo->tagList) {
5,738,820✔
1326
            taosArrayDestroyEx(pDataInfo->tagList, destroyTagVal);
2,718,408✔
1327
            pDataInfo->tagList = NULL;
2,718,408✔
1328
          }
1329
          if (pDataInfo->pSrcUidList) {
5,738,820✔
1330
            taosArrayDestroy(pDataInfo->pSrcUidList);
5,738,820✔
1331
            pDataInfo->pSrcUidList = NULL;
5,738,820✔
1332
          }
1333

1334
          if (TSDB_CODE_SUCCESS != code) {
5,738,820✔
1335
            pTaskInfo->code = code;
×
1336
            taosMemoryFree(pWrapper);
×
UNCOV
1337
            return pTaskInfo->code;
×
1338
          }
1339
        }
1340
        break;
6,018,344✔
1341
      }
1342
      case EX_SRC_TYPE_STB_JOIN_SCAN:
184,717,562✔
1343
      default: {
1344
        if (pDataInfo->pSrcUidList) {
184,717,562✔
1345
          code = buildTableScanOperatorParam(&req.pOpParam,
241,199✔
1346
                                             pDataInfo->pSrcUidList,
1347
                                             pDataInfo->srcOpType,
1348
                                             pDataInfo->tableSeq);
241,199✔
1349
          /* source uid list can be reused in vnode size, so only use once */
1350
          taosArrayDestroy(pDataInfo->pSrcUidList);
241,199✔
1351
          pDataInfo->pSrcUidList = NULL;
241,199✔
1352
          if (TSDB_CODE_SUCCESS != code) {
241,199✔
1353
            pTaskInfo->code = code;
×
1354
            taosMemoryFree(pWrapper);
×
UNCOV
1355
            return pTaskInfo->code;
×
1356
          }
1357
        }
1358
        if (pExchangeInfo->notifyToSend) {
184,723,470✔
1359
          if (NULL == req.pOpParam) {
221,246✔
1360
            code = buildTableScanOperatorParamNotify(&req.pOpParam,
221,246✔
1361
                                                     pDataInfo->srcOpType,
1362
                                                     pExchangeInfo->notifyTs);
1363
            if (TSDB_CODE_SUCCESS != code) {
221,246✔
1364
              pTaskInfo->code = code;
×
1365
              taosMemoryFree(pWrapper);
×
UNCOV
1366
              return pTaskInfo->code;
×
1367
            }
1368
          } else {
1369
            /**
1370
              Currently don't support use the same param for multiple times!
1371
            */
UNCOV
1372
            qError("%s, %s failed, currently don't support use the same param "
×
1373
                   "for multiple times!", GET_TASKID(pTaskInfo), __func__);
1374
            pTaskInfo->code = TSDB_CODE_INVALID_PARA;
×
1375
            taosMemoryFree(pWrapper);
×
UNCOV
1376
            return pTaskInfo->code;
×
1377
          }
1378
          pExchangeInfo->notifyToSend = false;
221,246✔
1379
        }
1380
        break;
184,721,628✔
1381
      }
1382
    }
1383

1384
    int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req, needStreamPesudoFuncVals);
226,511,042✔
1385
    if (msgSize < 0) {
226,506,995✔
1386
      pTaskInfo->code = msgSize;
×
1387
      taosMemoryFree(pWrapper);
×
1388
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
UNCOV
1389
      return pTaskInfo->code;
×
1390
    }
1391

1392
    void* msg = taosMemoryCalloc(1, msgSize);
226,506,995✔
1393
    if (NULL == msg) {
226,505,422✔
1394
      pTaskInfo->code = terrno;
×
1395
      taosMemoryFree(pWrapper);
×
1396
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
UNCOV
1397
      return pTaskInfo->code;
×
1398
    }
1399

1400
    msgSize = tSerializeSResFetchReq(msg, msgSize, &req, needStreamPesudoFuncVals);
226,505,422✔
1401
    if (msgSize < 0) {
226,507,166✔
1402
      pTaskInfo->code = msgSize;
×
1403
      taosMemoryFree(pWrapper);
×
1404
      taosMemoryFree(msg);
×
1405
      freeOperatorParam(req.pOpParam, OP_GET_PARAM);
×
UNCOV
1406
      return pTaskInfo->code;
×
1407
    }
1408

1409
    freeOperatorParam(req.pOpParam, OP_GET_PARAM);
226,507,166✔
1410

1411
    qDebug("%s build fetch msg and send to vgId:%d, ep:%s, clientId:0x%" PRIx64 " taskId:0x%" PRIx64
226,507,955✔
1412
           ", seqId:%" PRId64 ", execId:%d, %p, %d/%" PRIzu,
1413
           GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->clientId,
1414
           pSource->taskId, pExchangeInfo->seqId, pSource->execId, pExchangeInfo, sourceIndex, totalSources);
1415

1416
    // send the fetch remote task result reques
1417
    SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
226,511,891✔
1418
    if (NULL == pMsgSendInfo) {
226,508,242✔
1419
      taosMemoryFreeClear(msg);
×
1420
      taosMemoryFree(pWrapper);
×
1421
      qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
×
1422
      pTaskInfo->code = terrno;
×
UNCOV
1423
      return pTaskInfo->code;
×
1424
    }
1425

1426
    pMsgSendInfo->param = pWrapper;
226,508,242✔
1427
    pMsgSendInfo->paramFreeFp = taosAutoMemoryFree;
226,512,976✔
1428
    pMsgSendInfo->msgInfo.pData = msg;
226,512,962✔
1429
    pMsgSendInfo->msgInfo.len = msgSize;
226,514,666✔
1430
    pMsgSendInfo->msgType = pSource->fetchMsgType;
226,510,534✔
1431
    pMsgSendInfo->fp = loadRemoteDataCallback;
226,514,168✔
1432
    pMsgSendInfo->requestId = pTaskInfo->id.queryId;
226,514,168✔
1433

1434
    int64_t transporterId = 0;
226,511,853✔
1435
    void* poolHandle = NULL;
226,509,403✔
1436
    code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
226,509,403✔
1437
    QUERY_CHECK_CODE(code, lino, _end);
226,515,029✔
1438
    int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, sourceIndex);
226,515,029✔
1439
    *pRpcHandle = transporterId;
226,515,385✔
1440
  }
1441

1442
_end:
226,515,029✔
1443
  if (code != TSDB_CODE_SUCCESS) {
226,515,029✔
UNCOV
1444
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1445
  }
1446
  return code;
226,514,542✔
1447
}
1448

1449
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs,
176,241,182✔
1450
                          SOperatorInfo* pOperator) {
1451
  pInfo->totalRows += numOfRows;
176,241,182✔
1452
  pInfo->totalSize += dataLen;
176,241,182✔
1453
  pInfo->totalElapsed += (taosGetTimestampUs() - startTs);
176,240,819✔
1454
  pOperator->resultInfo.totalRows += numOfRows;
176,241,182✔
1455
}
176,241,182✔
1456

1457
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart, bool isVstbScan) {
542,674,687✔
1458
  int32_t      code = TSDB_CODE_SUCCESS;
542,674,687✔
1459
  int32_t      lino = 0;
542,674,687✔
1460
  SSDataBlock* pBlock = NULL;
542,674,687✔
1461
  if (isVstbScan) {
542,675,554✔
1462
    blockDataCleanup(pRes);
20,122,637✔
1463
    code = blockDecodeInternal(pRes, pData, (const char**)pNextStart);
20,122,637✔
1464
    QUERY_CHECK_CODE(code, lino, _end);
20,123,155✔
1465
  }
1466
  if (pColList == NULL) {  // data from other sources
542,676,072✔
1467
    blockDataCleanup(pRes);
537,990,703✔
1468
    code = blockDecodeInternal(pRes, pData, (const char**)pNextStart);
537,989,482✔
1469
    QUERY_CHECK_CODE(code, lino, _end);
537,991,196✔
1470
  } else {  // extract data according to pColList
1471
    char* pStart = pData;
4,685,369✔
1472

1473
    int32_t numOfCols = htonl(*(int32_t*)pStart);
4,685,369✔
1474
    pStart += sizeof(int32_t);
4,685,369✔
1475

1476
    // todo refactor:extract method
1477
    SSysTableSchema* pSchema = (SSysTableSchema*)pStart;
4,685,369✔
1478
    for (int32_t i = 0; i < numOfCols; ++i) {
66,197,162✔
1479
      SSysTableSchema* p = (SSysTableSchema*)pStart;
61,511,793✔
1480

1481
      p->colId = htons(p->colId);
61,511,793✔
1482
      p->bytes = htonl(p->bytes);
61,511,793✔
1483
      pStart += sizeof(SSysTableSchema);
61,511,793✔
1484
    }
1485

1486
    pBlock = NULL;
4,685,369✔
1487
    code = createDataBlock(&pBlock);
4,685,369✔
1488
    QUERY_CHECK_CODE(code, lino, _end);
4,685,369✔
1489

1490
    for (int32_t i = 0; i < numOfCols; ++i) {
66,196,792✔
1491
      SColumnInfoData idata = createColumnInfoData(pSchema[i].type, pSchema[i].bytes, pSchema[i].colId);
61,511,423✔
1492
      code = blockDataAppendColInfo(pBlock, &idata);
61,511,423✔
1493
      QUERY_CHECK_CODE(code, lino, _end);
61,511,423✔
1494
    }
1495

1496
    code = blockDecodeInternal(pBlock, pStart, NULL);
4,685,369✔
1497
    QUERY_CHECK_CODE(code, lino, _end);
4,685,369✔
1498

1499
    code = blockDataEnsureCapacity(pRes, pBlock->info.rows);
4,685,369✔
1500
    QUERY_CHECK_CODE(code, lino, _end);
4,685,369✔
1501

1502
    // data from mnode
1503
    pRes->info.dataLoad = 1;
4,685,369✔
1504
    pRes->info.rows = pBlock->info.rows;
4,685,369✔
1505
    pRes->info.scanFlag = MAIN_SCAN;
4,685,369✔
1506
    code = relocateColumnData(pRes, pColList, pBlock->pDataBlock, false);
4,685,369✔
1507
    QUERY_CHECK_CODE(code, lino, _end);
4,684,999✔
1508

1509
    blockDataDestroy(pBlock);
4,684,999✔
1510
    pBlock = NULL;
4,684,870✔
1511
  }
1512

1513
_end:
542,676,066✔
1514
  if (code != TSDB_CODE_SUCCESS) {
542,675,618✔
1515
    blockDataDestroy(pBlock);
×
UNCOV
1516
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1517
  }
1518
  return code;
542,675,618✔
1519
}
1520

1521
void setAllSourcesCompleted(SOperatorInfo* pOperator) {
94,078,751✔
1522
  SExchangeInfo* pExchangeInfo = pOperator->info;
94,078,751✔
1523
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
94,079,249✔
1524

1525
  SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
94,078,751✔
1526
  size_t               totalSources = taosArrayGetSize(pExchangeInfo->pSources);
94,079,026✔
1527
  qDebug("%s all %" PRIzu " sources are exhausted, total rows: %" PRIu64 ", %.2f Kb, elapsed:%.2f ms",
94,079,026✔
1528
         GET_TASKID(pTaskInfo), totalSources, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0,
1529
         pLoadInfo->totalElapsed / 1000.0);
1530

1531
  setOperatorCompleted(pOperator);
94,079,026✔
1532
}
94,078,395✔
1533

1534
int32_t getCompletedSources(const SArray* pArray, int32_t* pRes) {
300,026,498✔
1535
  int32_t code = TSDB_CODE_SUCCESS;
300,026,498✔
1536
  int32_t lino = 0;
300,026,498✔
1537
  size_t  total = taosArrayGetSize(pArray);
300,026,498✔
1538

1539
  int32_t completed = 0;
300,028,545✔
1540
  for (int32_t k = 0; k < total; ++k) {
910,244,032✔
1541
    SSourceDataInfo* p = taosArrayGet(pArray, k);
610,214,800✔
1542
    QUERY_CHECK_NULL(p, code, lino, _end, terrno);
610,216,323✔
1543
    if (p->status == EX_SOURCE_DATA_EXHAUSTED) {
610,216,323✔
1544
      qDebug("source %d is completed, info:%p %p", k, pArray, p);
298,658,540✔
1545
      completed += 1;
298,658,540✔
1546
    }
1547
  }
1548

1549
  *pRes = completed;
300,029,232✔
1550
_end:
300,028,684✔
1551
  if (code != TSDB_CODE_SUCCESS) {
300,028,684✔
UNCOV
1552
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1553
  }
1554
  return code;
300,028,760✔
1555
}
1556

1557
int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
112,364,536✔
1558
  SExchangeInfo* pExchangeInfo = pOperator->info;
112,364,536✔
1559
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
112,365,390✔
1560

1561
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo);
112,364,049✔
1562
  int64_t startTs = taosGetTimestampUs();
112,362,802✔
1563

1564
  // Asynchronously send all fetch requests to all sources.
1565
  for (int32_t i = 0; i < totalSources; ++i) {
294,952,946✔
1566
    int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
182,587,099✔
1567
    if (code != TSDB_CODE_SUCCESS) {
182,590,144✔
1568
      pTaskInfo->code = code;
×
UNCOV
1569
      return code;
×
1570
    }
1571
  }
1572

1573
  int64_t endTs = taosGetTimestampUs();
112,367,450✔
1574
  qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%.2fms", GET_TASKID(pTaskInfo),
112,367,450✔
1575
         totalSources, (endTs - startTs) / 1000.0);
1576

1577
  pOperator->status = OP_RES_TO_RETURN;
112,367,450✔
1578
  pOperator->cost.openCost = taosGetTimestampUs() - startTs;
112,366,120✔
1579
  if (isTaskKilled(pTaskInfo)) {
112,367,450✔
UNCOV
1580
    T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1581
  }
1582

1583
  return TSDB_CODE_SUCCESS;
112,367,450✔
1584
}
1585

1586
/**
1587
  @brief store STEP DONE notification info
1588
*/
1589
void storeNotifyInfo(SOperatorInfo* pOperator) {
3,999,954✔
1590
  SExchangeInfo*  pExchangeInfo = pOperator->info;
3,999,954✔
1591
  SExecTaskInfo*  pTaskInfo = pOperator->pTaskInfo;
3,999,954✔
1592
  SOperatorParam* pGetParam = pOperator->pOperatorGetParam;
3,999,954✔
1593

1594
  SExchangeOperatorParam* pParam = (SExchangeOperatorParam*)pGetParam->value;
3,999,954✔
1595
  if (!pParam->multiParams) {
3,999,954✔
1596
    SExchangeOperatorBasicParam* pBasic = &pParam->basic;
3,999,954✔
1597
    if (pBasic->paramType != NOTIFY_TYPE_EXCHANGE_PARAM) {
3,999,954✔
UNCOV
1598
      qWarn("%s, %s found invalid exchange operator param type %d",
×
1599
             GET_TASKID(pTaskInfo), __func__, pBasic->paramType);
UNCOV
1600
      return;
×
1601
    }
1602

1603
    pExchangeInfo->notifyToSend = true;
3,999,954✔
1604
    pExchangeInfo->notifyTs = pBasic->notifyTs;
3,999,954✔
1605
  } else {
UNCOV
1606
    qWarn("%s, %s found multi params are not supported for notify msg",
×
1607
           GET_TASKID(pTaskInfo), __func__);
1608
  }
1609
}
1610

1611
int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo) {
171,555,331✔
1612
  int32_t            code = TSDB_CODE_SUCCESS;
171,555,331✔
1613
  int32_t            lino = 0;
171,555,331✔
1614
  SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
171,555,331✔
1615
  SSDataBlock*       pb = NULL;
171,555,331✔
1616

1617
  char* pNextStart = pRetrieveRsp->data;
171,555,331✔
1618
  char* pStart = pNextStart;
171,555,813✔
1619

1620
  int32_t index = 0;
171,555,813✔
1621

1622
  if (pRetrieveRsp->compressed) {  // decompress the data
171,555,813✔
1623
    if (pDataInfo->decompBuf == NULL) {
×
1624
      pDataInfo->decompBuf = taosMemoryMalloc(pRetrieveRsp->payloadLen);
×
1625
      QUERY_CHECK_NULL(pDataInfo->decompBuf, code, lino, _end, terrno);
×
UNCOV
1626
      pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
×
1627
    } else {
1628
      if (pDataInfo->decompBufSize < pRetrieveRsp->payloadLen) {
×
1629
        char* p = taosMemoryRealloc(pDataInfo->decompBuf, pRetrieveRsp->payloadLen);
×
1630
        QUERY_CHECK_NULL(p, code, lino, _end, terrno);
×
1631
        if (p != NULL) {
×
1632
          pDataInfo->decompBuf = p;
×
UNCOV
1633
          pDataInfo->decompBufSize = pRetrieveRsp->payloadLen;
×
1634
        }
1635
      }
1636
    }
1637
  }
1638

1639
  while (index++ < pRetrieveRsp->numOfBlocks) {
709,546,828✔
1640
    pStart = pNextStart;
537,991,471✔
1641

1642
    if (pDataInfo->type == EX_SRC_TYPE_VSTB_SCAN) {
537,991,471✔
1643
      pb = taosMemoryCalloc(1, sizeof(SSDataBlock));
20,122,637✔
1644
      QUERY_CHECK_NULL(pb, code, lino, _end, terrno);
20,122,637✔
1645
    } else if (taosArrayGetSize(pExchangeInfo->pRecycledBlocks) > 0) {
517,868,487✔
1646
      pb = *(SSDataBlock**)taosArrayPop(pExchangeInfo->pRecycledBlocks);
226,275,963✔
1647
      blockDataCleanup(pb);
226,276,187✔
1648
    } else {
1649
      code = createOneDataBlock(pExchangeInfo->pDummyBlock, false, &pb);
291,591,068✔
1650
      QUERY_CHECK_NULL(pb, code, lino, _end, code);
291,592,820✔
1651
    }
1652

1653
    int32_t compLen = *(int32_t*)pStart;
537,991,196✔
1654
    pStart += sizeof(int32_t);
537,990,250✔
1655

1656
    int32_t rawLen = *(int32_t*)pStart;
537,990,678✔
1657
    pStart += sizeof(int32_t);
537,991,196✔
1658
    QUERY_CHECK_CONDITION((compLen <= rawLen && compLen != 0), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
537,991,420✔
1659

1660
    pNextStart = pStart + compLen;
537,991,420✔
1661
    if (pRetrieveRsp->compressed && (compLen < rawLen)) {
537,990,250✔
1662
      int32_t t = tsDecompressString(pStart, compLen, 1, pDataInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
×
1663
      QUERY_CHECK_CONDITION((t == rawLen), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
UNCOV
1664
      pStart = pDataInfo->decompBuf;
×
1665
    }
1666

1667
    code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart, (pDataInfo->type == EX_SRC_TYPE_VSTB_SCAN));
537,990,902✔
1668
    if (code != 0) {
537,990,160✔
1669
      taosMemoryFreeClear(pDataInfo->pRsp);
×
UNCOV
1670
      goto _end;
×
1671
    }
1672

1673
    void* tmp = taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
537,990,160✔
1674
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
537,990,635✔
1675
    qDebug("%dth block added to resultBlockList, rows:%" PRId64, index, pb->info.rows);
537,990,635✔
1676
    pb = NULL;
537,991,497✔
1677
  }
1678

1679
_end:
171,555,589✔
1680
  if (code != TSDB_CODE_SUCCESS) {
171,555,589✔
1681
    blockDataDestroy(pb);
×
UNCOV
1682
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1683
  }
1684
  return code;
171,555,589✔
1685
}
1686

1687
int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
33,030,581✔
1688
  SExchangeInfo* pExchangeInfo = pOperator->info;
33,030,581✔
1689
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
33,030,581✔
1690

1691
  int32_t code = 0;
33,030,581✔
1692
  size_t  totalSources = taosArrayGetSize(pExchangeInfo->pSources);
33,030,581✔
1693
  int64_t startTs = taosGetTimestampUs();
33,030,581✔
1694

1695
  int32_t vgId = 0;
33,030,581✔
1696
  if (pExchangeInfo->dynTbname) {
33,030,581✔
1697
    SArray* vals = pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPartColVals;
80,552✔
1698
    for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
80,552✔
1699
      SStreamGroupValue* pValue = taosArrayGet(vals, i);
80,552✔
1700
      if (pValue != NULL && pValue->isTbname) {
80,552✔
1701
        vgId = pValue->vgId;
80,552✔
1702
        break;
80,552✔
1703
      }
1704
    }
1705
  }
1706

1707
  while (1) {
8,420,995✔
1708
    if (pExchangeInfo->current >= totalSources) {
41,451,576✔
1709
      setAllSourcesCompleted(pOperator);
8,396,890✔
1710
      return TSDB_CODE_SUCCESS;
8,396,890✔
1711
    }
1712

1713
    SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
33,054,686✔
1714
    if (!pSource) {
33,054,686✔
1715
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1716
      pTaskInfo->code = terrno;
×
UNCOV
1717
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1718
    }
1719

1720
    if (vgId != 0 && pSource->addr.nodeId != vgId){
33,054,686✔
1721
      pExchangeInfo->current += 1;
57,060✔
1722
      continue;
57,060✔
1723
    }
1724

1725
    SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
32,997,626✔
1726
    if (!pDataInfo) {
32,997,626✔
1727
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
1728
      pTaskInfo->code = terrno;
×
UNCOV
1729
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1730
    }
1731
    pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
32,997,626✔
1732

1733
    code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
32,997,626✔
1734
    if (code != TSDB_CODE_SUCCESS) {
32,997,626✔
1735
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1736
      pTaskInfo->code = code;
×
UNCOV
1737
      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
×
1738
    }
1739

1740
    while (true) {
383✔
1741
      code = exchangeWait(pOperator, pExchangeInfo);
32,998,009✔
1742
      if (code != TSDB_CODE_SUCCESS || isTaskKilled(pTaskInfo)) {
32,998,009✔
1743
        T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
383✔
1744
      }
1745

1746
      int64_t currSeqId = atomic_load_64(&pExchangeInfo->seqId);
32,997,626✔
1747
      if (pDataInfo->seqId != currSeqId) {
32,997,626✔
1748
        qDebug("seq rsp reqId %" PRId64 " mismatch with exchange %p curr seqId %" PRId64 ", ignore it", pDataInfo->seqId, pExchangeInfo, currSeqId);
383✔
1749
        taosMemoryFreeClear(pDataInfo->pRsp);
383✔
1750
        continue;
383✔
1751
      }
1752

1753
      break;
32,997,243✔
1754
    }
1755

1756
    if (pDataInfo->code != TSDB_CODE_SUCCESS) {
32,997,243✔
1757
      qError("%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64 " execId:%d error happens, code:%s",
642✔
1758
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1759
             tstrerror(pDataInfo->code));
1760
      pOperator->pTaskInfo->code = pDataInfo->code;
642✔
1761
      return pOperator->pTaskInfo->code;
642✔
1762
    }
1763

1764
    SRetrieveTableRsp*   pRsp = pDataInfo->pRsp;
32,996,601✔
1765
    SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
32,996,601✔
1766

1767
    if (pRsp->numOfRows == 0) {
32,996,601✔
1768
      qDebug("exhausted %p,%s vgId:%d, clientId:0x%" PRIx64 " taskID:0x%" PRIx64
8,363,935✔
1769
             " execId:%d %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", pDataInfo,
1770
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1771
             pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);
1772

1773
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
8,363,935✔
1774
      if (isVstbScan(pDataInfo) || isVstbTagScan(pDataInfo)) {
8,363,935✔
1775
        pExchangeInfo->current = totalSources;
8,278,402✔
1776
      } else {
1777
        pExchangeInfo->current += 1;
85,533✔
1778
      }
1779
      taosMemoryFreeClear(pDataInfo->pRsp);
8,363,935✔
1780
      continue;
8,363,935✔
1781
    }
1782

1783
    code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
24,632,666✔
1784
    if (code != TSDB_CODE_SUCCESS) {
24,632,666✔
UNCOV
1785
      goto _error;
×
1786
    }
1787

1788
    SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
24,632,666✔
1789
    if (pRsp->completed == 1) {
24,632,666✔
1790
      qDebug("exhausted %p,%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
120,466✔
1791
             ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, pDataInfo,
1792
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1793
             pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize,
1794
             pExchangeInfo->current + 1, totalSources);
1795

1796
      pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
120,466✔
1797
      if (isVstbScan(pDataInfo)) {
120,466✔
UNCOV
1798
        pExchangeInfo->current = totalSources;
×
1799
      } else {
1800
        pExchangeInfo->current += 1;
120,466✔
1801
      }
1802
    } else {
1803
      qDebug("%s fetch msg rsp from vgId:%d, clientId:0x%" PRIx64 " taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
24,512,200✔
1804
             ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
1805
             GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->clientId, pSource->taskId, pSource->execId,
1806
             pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize);
1807
    }
1808
    if (pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData) {
24,632,666✔
1809
      taosArrayClear(pExchangeInfo->pSourceDataInfo);
24,422,770✔
1810
    }
1811
    updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator);
24,632,666✔
1812
    pDataInfo->totalRows += pRetrieveRsp->numOfRows;
24,632,666✔
1813

1814
    taosMemoryFreeClear(pDataInfo->pRsp);
24,632,666✔
1815
    return TSDB_CODE_SUCCESS;
24,632,666✔
1816
  }
1817

1818
_error:
×
1819
  pTaskInfo->code = code;
×
UNCOV
1820
  return code;
×
1821
}
1822

1823
void clearVtbScanDataInfo(void* pItem) {
7,012,221✔
1824
  SSourceDataInfo *pInfo = (SSourceDataInfo *)pItem;
7,012,221✔
1825
  if (pInfo->orgTbInfo) {
7,012,221✔
1826
    taosArrayDestroy(pInfo->orgTbInfo->colMap);
×
UNCOV
1827
    taosMemoryFreeClear(pInfo->orgTbInfo);
×
1828
  }
1829
  if (pInfo->batchOrgTbInfo) {
7,012,221✔
1830
    for (int32_t i = 0; i < taosArrayGetSize(pInfo->batchOrgTbInfo); ++i) {
×
1831
      SOrgTbInfo* pColMap = taosArrayGet(pInfo->batchOrgTbInfo, i);
×
1832
      if (pColMap) {
×
UNCOV
1833
        taosArrayDestroy(pColMap->colMap);
×
1834
      }
1835
    }
UNCOV
1836
    taosArrayDestroy(pInfo->batchOrgTbInfo);
×
1837
  }
1838
  if (pInfo->tagList) {
7,012,221✔
1839
    taosArrayDestroyEx(pInfo->tagList, destroyTagVal);
×
UNCOV
1840
    pInfo->tagList = NULL;
×
1841
  }
1842
  taosArrayDestroy(pInfo->pSrcUidList);
7,012,221✔
1843
}
7,012,221✔
1844

1845
static int32_t loadTagListFromBasicParam(SSourceDataInfo* pDataInfo, SExchangeOperatorBasicParam* pBasicParam) {
5,738,820✔
1846
  int32_t  code = TSDB_CODE_SUCCESS;
5,738,820✔
1847
  int32_t  lino = 0;
5,738,820✔
1848
  STagVal  dstTag;
5,738,820✔
1849
  bool     needFree = false;
5,738,820✔
1850

1851
  if (pBasicParam->paramType != DYN_TYPE_EXCHANGE_PARAM) {
5,738,820✔
UNCOV
1852
    qError("%s failed since invalid exchange operator param type %d",
×
1853
      __func__, pBasicParam->paramType);
UNCOV
1854
    return TSDB_CODE_INVALID_PARA;
×
1855
  }
1856

1857
  if (pDataInfo->tagList) {
5,738,820✔
UNCOV
1858
    taosArrayClear(pDataInfo->tagList);
×
1859
  }
1860

1861
  if (pBasicParam->tagList) {
5,738,820✔
1862
    pDataInfo->tagList = taosArrayInit(1, sizeof(STagVal));
2,718,408✔
1863
    QUERY_CHECK_NULL(pDataInfo->tagList, code, lino, _return, terrno);
2,718,408✔
1864

1865
    for (int32_t i = 0; i < taosArrayGetSize(pBasicParam->tagList); ++i) {
18,133,752✔
1866
      STagVal *pSrcTag = (STagVal*)taosArrayGet(pBasicParam->tagList, i);
15,415,344✔
1867
      QUERY_CHECK_NULL(pSrcTag, code, lino, _return, terrno);
15,415,344✔
1868

1869
      dstTag = (STagVal){0};
15,415,344✔
1870
      dstTag.type = pSrcTag->type;
15,415,344✔
1871
      dstTag.cid = pSrcTag->cid;
15,415,344✔
1872
      if (IS_VAR_DATA_TYPE(pSrcTag->type)) {
15,415,344✔
1873
        dstTag.nData = pSrcTag->nData;
6,762,192✔
1874
        dstTag.pData = taosMemoryMalloc(dstTag.nData);
6,762,192✔
1875
        QUERY_CHECK_NULL(dstTag.pData, code, lino, _return, terrno);
6,762,192✔
1876
        needFree = true;
6,762,192✔
1877
        memcpy(dstTag.pData, pSrcTag->pData, dstTag.nData);
6,762,192✔
1878
      } else {
1879
        dstTag.i64 = pSrcTag->i64;
8,653,152✔
1880
      }
1881

1882
      QUERY_CHECK_NULL(taosArrayPush(pDataInfo->tagList, &dstTag), code, lino, _return, terrno);
30,830,688✔
1883
      needFree = false;
15,415,344✔
1884
    }
1885
  } else {
1886
    pDataInfo->tagList = NULL;
3,020,412✔
1887
  }
1888

1889
  return code;
5,738,820✔
1890
_return:
×
1891
  if (needFree) {
×
UNCOV
1892
    taosMemoryFreeClear(dstTag.pData);
×
1893
  }
1894
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
1895
  return code;
×
1896
}
1897

1898
int32_t loadBatchColMapFromBasicParam(SSourceDataInfo* pDataInfo, SExchangeOperatorBasicParam* pBasicParam) {
5,738,820✔
1899
  int32_t     code = TSDB_CODE_SUCCESS;
5,738,820✔
1900
  int32_t     lino = 0;
5,738,820✔
1901
  SOrgTbInfo  dstOrgTbInfo = {0};
5,738,820✔
1902
  bool        needFree = false;
5,738,820✔
1903

1904
  if (pBasicParam->paramType != DYN_TYPE_EXCHANGE_PARAM) {
5,738,820✔
UNCOV
1905
    qError("%s failed since invalid exchange operator param type %d",
×
1906
      __func__, pBasicParam->paramType);
UNCOV
1907
    return TSDB_CODE_INVALID_PARA;
×
1908
  }
1909

1910
  if (pBasicParam->batchOrgTbInfo) {
5,738,820✔
1911
    pDataInfo->batchOrgTbInfo = taosArrayInit(1, sizeof(SOrgTbInfo));
5,738,820✔
1912
    QUERY_CHECK_NULL(pDataInfo->batchOrgTbInfo, code, lino, _return, terrno);
5,738,820✔
1913

1914
    for (int32_t i = 0; i < taosArrayGetSize(pBasicParam->batchOrgTbInfo); ++i) {
15,102,596✔
1915
      SOrgTbInfo* pSrcOrgTbInfo = taosArrayGet(pBasicParam->batchOrgTbInfo, i);
9,363,776✔
1916
      QUERY_CHECK_NULL(pSrcOrgTbInfo, code, lino, _return, terrno);
9,363,776✔
1917

1918
      dstOrgTbInfo = (SOrgTbInfo){0};
9,363,776✔
1919
      dstOrgTbInfo.vgId = pSrcOrgTbInfo->vgId;
9,363,776✔
1920
      tstrncpy(dstOrgTbInfo.tbName, pSrcOrgTbInfo->tbName, TSDB_TABLE_FNAME_LEN);
9,363,776✔
1921

1922
      dstOrgTbInfo.colMap = taosArrayDup(pSrcOrgTbInfo->colMap, NULL);
9,363,776✔
1923
      QUERY_CHECK_NULL(dstOrgTbInfo.colMap, code, lino, _return, terrno);
9,363,776✔
1924

1925
      needFree = true;
9,363,776✔
1926
      QUERY_CHECK_NULL(taosArrayPush(pDataInfo->batchOrgTbInfo, &dstOrgTbInfo), code, lino, _return, terrno);
18,727,552✔
1927
      needFree = false;
9,363,776✔
1928
    }
1929
  } else {
UNCOV
1930
    pBasicParam->batchOrgTbInfo = NULL;
×
1931
  }
1932

1933
  return code;
5,738,820✔
1934
_return:
×
1935
  if (needFree) {
×
UNCOV
1936
    taosArrayDestroy(dstOrgTbInfo.colMap);
×
1937
  }
1938
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
1939
  return code;
×
1940
}
1941

1942
int32_t addSingleExchangeSource(SOperatorInfo* pOperator,
40,225,182✔
1943
                                SExchangeOperatorBasicParam* pBasicParam) {
1944
  if (pBasicParam->paramType != DYN_TYPE_EXCHANGE_PARAM) {
40,225,182✔
UNCOV
1945
    qWarn("%s, %s found invalid exchange operator param type %d",
×
1946
      GET_TASKID(pOperator->pTaskInfo), __func__, pBasicParam->paramType);
UNCOV
1947
    return TSDB_CODE_SUCCESS;
×
1948
  }
1949

1950
  int32_t            code = TSDB_CODE_SUCCESS;
40,225,182✔
1951
  int32_t            lino = 0;
40,225,182✔
1952
  SExchangeInfo*     pExchangeInfo = pOperator->info;
40,225,182✔
1953
  SExchangeSrcIndex* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
40,225,182✔
1954

1955
  if (NULL == pIdx) {
40,225,182✔
1956
    if (pBasicParam->isNewDeployed) {
2,431✔
1957
      SDownstreamSourceNode *pNode = NULL;
2,431✔
1958
      code = nodesCloneNode((SNode*)&pBasicParam->newDeployedSrc, (SNode**)&pNode);
2,431✔
1959
      QUERY_CHECK_CODE(code, lino, _return);
2,431✔
1960

1961
      SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pOperator->pPhyNode;
2,431✔
1962
      code = nodesListMakeStrictAppend(&pExchange->pSrcEndPoints, (SNode*)pNode);
2,431✔
1963
      QUERY_CHECK_CODE(code, lino, _return);
2,431✔
1964

1965
      void* tmp = taosArrayPush(pExchangeInfo->pSources, pNode);
2,431✔
1966
      QUERY_CHECK_NULL(tmp, code, lino, _return, terrno);
2,431✔
1967

1968
      SExchangeSrcIndex idx = {.srcIdx = taosArrayGetSize(pExchangeInfo->pSources) - 1, .inUseIdx = -1};
2,431✔
1969
      code = tSimpleHashPut(pExchangeInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx));
2,431✔
1970
      if (pExchangeInfo->pHashSources) {
2,431✔
1971
        QUERY_CHECK_CODE(code, lino, _return);
2,431✔
1972
      }
1973
      pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
2,431✔
1974
      QUERY_CHECK_NULL(pIdx, code, lino, _return, TSDB_CODE_INVALID_PARA);
2,431✔
1975
    } else {
1976
      qError("No exchange source for vgId: %d", pBasicParam->vgId);
×
UNCOV
1977
      return TSDB_CODE_INVALID_PARA;
×
1978
    }
1979
  }
1980

1981
  qDebug("start to add single exchange source");
40,225,182✔
1982

1983
  switch (pBasicParam->type) {
40,225,182✔
1984
    case EX_SRC_TYPE_VSTB_WIN_SCAN:
5,738,820✔
1985
    case EX_SRC_TYPE_VSTB_AGG_SCAN: {
1986
      if (pIdx->inUseIdx < 0) {
5,738,820✔
1987
        SSourceDataInfo dataInfo = {0};
2,692,510✔
1988
        dataInfo.status = EX_SOURCE_DATA_NOT_READY;
2,692,510✔
1989
        dataInfo.taskId = pExchangeInfo->pTaskId;
2,692,510✔
1990
        dataInfo.index = pIdx->srcIdx;
2,692,510✔
1991
        dataInfo.groupid = pBasicParam->groupid;
2,692,510✔
1992
        dataInfo.window = pBasicParam->window;
2,692,510✔
1993
        dataInfo.isNewParam = pBasicParam->isNewParam;
2,692,510✔
1994
        code = loadTagListFromBasicParam(&dataInfo, pBasicParam);
2,692,510✔
1995
        QUERY_CHECK_CODE(code, lino, _return);
2,692,510✔
1996

1997
        code = loadBatchColMapFromBasicParam(&dataInfo, pBasicParam);
2,692,510✔
1998
        QUERY_CHECK_CODE(code, lino, _return);
2,692,510✔
1999

2000
        dataInfo.orgTbInfo = NULL;
2,692,510✔
2001

2002
        dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
2,692,510✔
2003
        QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
2,692,510✔
2004

2005
        dataInfo.type = pBasicParam->type;
2,692,510✔
2006
        dataInfo.srcOpType = pBasicParam->srcOpType;
2,692,510✔
2007
        dataInfo.tableSeq = pBasicParam->tableSeq;
2,692,510✔
2008

2009
        QUERY_CHECK_NULL(taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
5,385,020✔
2010

2011
        pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
2,692,510✔
2012
      } else {
2013
        SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
3,046,310✔
2014
        QUERY_CHECK_NULL(pDataInfo, code, lino, _return, terrno);
3,046,310✔
2015

2016
        if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
3,046,310✔
2017
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
3,046,310✔
2018
        }
2019

2020
        pDataInfo->taskId = pExchangeInfo->pTaskId;
3,046,310✔
2021
        pDataInfo->index = pIdx->srcIdx;
3,046,310✔
2022
        pDataInfo->window = pBasicParam->window;
3,046,310✔
2023
        pDataInfo->groupid = pBasicParam->groupid;
3,046,310✔
2024
        pDataInfo->isNewParam = pBasicParam->isNewParam;
3,046,310✔
2025

2026
        code = loadTagListFromBasicParam(pDataInfo, pBasicParam);
3,046,310✔
2027
        QUERY_CHECK_CODE(code, lino, _return);
3,046,310✔
2028

2029
        code = loadBatchColMapFromBasicParam(pDataInfo, pBasicParam);
3,046,310✔
2030
        QUERY_CHECK_CODE(code, lino, _return);
3,046,310✔
2031

2032
        pDataInfo->orgTbInfo = NULL;
3,046,310✔
2033

2034
        pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
3,046,310✔
2035
        QUERY_CHECK_NULL(pDataInfo->pSrcUidList, code, lino, _return, terrno);
3,046,310✔
2036

2037
        pDataInfo->type = pBasicParam->type;
3,046,310✔
2038
        pDataInfo->srcOpType = pBasicParam->srcOpType;
3,046,310✔
2039
        pDataInfo->tableSeq = pBasicParam->tableSeq;
3,046,310✔
2040
      }
2041
      break;
5,738,820✔
2042
    }
2043
    case EX_SRC_TYPE_VTB_WIN_SCAN:
5,723,425✔
2044
    case EX_SRC_TYPE_VSTB_TAG_SCAN: {
2045
      SSourceDataInfo dataInfo = {0};
5,723,425✔
2046
      dataInfo.status = EX_SOURCE_DATA_NOT_READY;
5,723,425✔
2047
      dataInfo.taskId = pExchangeInfo->pTaskId;
5,723,425✔
2048
      dataInfo.index = pIdx->srcIdx;
5,723,425✔
2049
      dataInfo.window = pBasicParam->window;
5,723,425✔
2050
      dataInfo.groupid = 0;
5,723,425✔
2051
      dataInfo.orgTbInfo = NULL;
5,723,425✔
2052
      dataInfo.tagList = NULL;
5,723,425✔
2053

2054
      dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
5,723,425✔
2055
      QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
5,723,425✔
2056

2057
      dataInfo.isNewParam = false;
5,723,425✔
2058
      dataInfo.type = pBasicParam->type;
5,723,425✔
2059
      dataInfo.srcOpType = pBasicParam->srcOpType;
5,723,425✔
2060
      dataInfo.tableSeq = pBasicParam->tableSeq;
5,723,425✔
2061

2062
      taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
5,723,425✔
2063
      QUERY_CHECK_NULL(taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
11,446,850✔
2064
      break;
5,723,425✔
2065
    }
2066
    case EX_SRC_TYPE_VSTB_SCAN: {
28,521,738✔
2067
      SSourceDataInfo dataInfo = {0};
28,521,738✔
2068
      dataInfo.status = EX_SOURCE_DATA_NOT_READY;
28,521,738✔
2069
      dataInfo.taskId = pExchangeInfo->pTaskId;
28,521,738✔
2070
      dataInfo.index = pIdx->srcIdx;
28,521,738✔
2071
      dataInfo.window = pBasicParam->window;
28,521,738✔
2072
      dataInfo.groupid = 0;
28,521,738✔
2073
      dataInfo.isNewParam = pBasicParam->isNewParam;
28,521,738✔
2074
      dataInfo.tagList = NULL;
28,521,738✔
2075
      dataInfo.orgTbInfo = taosMemoryMalloc(sizeof(SOrgTbInfo));
28,521,738✔
2076
      QUERY_CHECK_NULL(dataInfo.orgTbInfo, code, lino, _return, terrno);
28,521,738✔
2077
      dataInfo.orgTbInfo->vgId = pBasicParam->orgTbInfo->vgId;
28,521,738✔
2078
      tstrncpy(dataInfo.orgTbInfo->tbName, pBasicParam->orgTbInfo->tbName, TSDB_TABLE_FNAME_LEN);
28,521,738✔
2079
      dataInfo.orgTbInfo->colMap = taosArrayDup(pBasicParam->orgTbInfo->colMap, NULL);
28,521,738✔
2080
      QUERY_CHECK_NULL(dataInfo.orgTbInfo->colMap, code, lino, _return, terrno);
28,521,738✔
2081

2082
      dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
28,521,738✔
2083
      QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
28,521,738✔
2084

2085
      dataInfo.type = pBasicParam->type;
28,521,738✔
2086
      dataInfo.srcOpType = pBasicParam->srcOpType;
28,521,738✔
2087
      dataInfo.tableSeq = pBasicParam->tableSeq;
28,521,738✔
2088

2089
      taosArrayClearEx(pExchangeInfo->pSourceDataInfo, clearVtbScanDataInfo);
28,521,738✔
2090
      QUERY_CHECK_NULL( taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
57,043,476✔
2091
      break;
28,521,738✔
2092
    }
2093
    case EX_SRC_TYPE_STB_JOIN_SCAN:
241,199✔
2094
    default: {
2095
      if (pIdx->inUseIdx < 0) {
241,199✔
2096
        SSourceDataInfo dataInfo = {0};
238,931✔
2097
        dataInfo.status = EX_SOURCE_DATA_NOT_READY;
238,931✔
2098
        dataInfo.taskId = pExchangeInfo->pTaskId;
238,931✔
2099
        dataInfo.index = pIdx->srcIdx;
238,931✔
2100
        dataInfo.groupid = 0;
238,931✔
2101
        dataInfo.tagList = NULL;
238,931✔
2102

2103
        dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
238,931✔
2104
        QUERY_CHECK_NULL(dataInfo.pSrcUidList, code, lino, _return, terrno);
238,931✔
2105

2106
        dataInfo.isNewParam = false;
238,931✔
2107
        dataInfo.type = pBasicParam->type;
238,931✔
2108
        dataInfo.srcOpType = pBasicParam->srcOpType;
238,931✔
2109
        dataInfo.tableSeq = pBasicParam->tableSeq;
238,931✔
2110

2111
        QUERY_CHECK_NULL(taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo), code, lino, _return, terrno);
477,862✔
2112

2113
        pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
238,931✔
2114
      } else {
2115
        SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx);
2,268✔
2116
        QUERY_CHECK_NULL(pDataInfo, code, lino, _return, terrno);
2,268✔
2117
        if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
2,268✔
2118
          pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
2,268✔
2119
        }
2120

2121
        pDataInfo->tagList = NULL;
2,268✔
2122
        pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
2,268✔
2123
        QUERY_CHECK_NULL(pDataInfo->pSrcUidList, code, lino, _return, terrno);
2,268✔
2124

2125
        pDataInfo->groupid = 0;
2,268✔
2126
        pDataInfo->isNewParam = false;
2,268✔
2127
        pDataInfo->type = pBasicParam->type;
2,268✔
2128
        pDataInfo->srcOpType = pBasicParam->srcOpType;
2,268✔
2129
        pDataInfo->tableSeq = pBasicParam->tableSeq;
2,268✔
2130
      }
2131
      break;
241,199✔
2132
    }
2133
  }
2134

2135
  return code;
40,225,182✔
2136
_return:
×
2137
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
2138
  return code;
×
2139
}
2140

2141
int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
39,728,596✔
2142
  SExchangeInfo*               pExchangeInfo = pOperator->info;
39,728,596✔
2143
  int32_t                      code = TSDB_CODE_SUCCESS;
39,728,596✔
2144
  SExchangeOperatorBasicParam* pBasicParam = NULL;
39,728,596✔
2145
  SExchangeOperatorParam*      pParam = (SExchangeOperatorParam*)pOperator->pOperatorGetParam->value;
39,728,596✔
2146
  if (pParam->multiParams) {
39,728,596✔
2147
    SExchangeOperatorBatchParam* pBatch = (SExchangeOperatorBatchParam*)pOperator->pOperatorGetParam->value;
5,478,897✔
2148
    int32_t                      iter = 0;
5,478,897✔
2149
    while (NULL != (pBasicParam = tSimpleHashIterate(pBatch->pBatchs, pBasicParam, &iter))) {
11,454,380✔
2150
      code = addSingleExchangeSource(pOperator, pBasicParam);
5,975,483✔
2151
      if (code) {
5,975,483✔
UNCOV
2152
        return code;
×
2153
      }
2154
    }
2155
  } else {
2156
    pBasicParam = &pParam->basic;
34,249,699✔
2157
    code = addSingleExchangeSource(pOperator, pBasicParam);
34,249,699✔
2158
  }
2159

2160
  freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
39,728,596✔
2161
  pOperator->pOperatorGetParam = NULL;
39,728,596✔
2162

2163
  return code;
39,728,596✔
2164
}
2165

2166
int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
688,348,063✔
2167
  SExchangeInfo* pExchangeInfo = pOperator->info;
688,348,063✔
2168
  int32_t        code = TSDB_CODE_SUCCESS;
688,350,620✔
2169
  int32_t        lino = 0;
688,350,620✔
2170
  
2171
  if ((OPTR_IS_OPENED(pOperator) && !pExchangeInfo->dynamicOp &&
688,350,620✔
2172
       NULL == pOperator->pOperatorGetParam) ||
532,052,117✔
2173
      (pExchangeInfo->dynamicOp && NULL == pOperator->pOperatorGetParam)) {
160,296,087✔
2174
    qDebug("%s, skip prepare, opened:%d, dynamicOp:%d, getParam:%p",
537,069,478✔
2175
      GET_TASKID(pOperator->pTaskInfo), OPTR_IS_OPENED(pOperator),
2176
      pExchangeInfo->dynamicOp, pOperator->pOperatorGetParam);
2177
    return TSDB_CODE_SUCCESS;
537,068,991✔
2178
  }
2179

2180
  if (pExchangeInfo->dynamicOp) {
151,278,567✔
2181
    code = addDynamicExchangeSource(pOperator);
39,728,596✔
2182
    QUERY_CHECK_CODE(code, lino, _end);
39,728,596✔
2183
  }
2184

2185
  if (pOperator->status == OP_NOT_OPENED &&
151,278,791✔
2186
      (pExchangeInfo->dynamicOp && pExchangeInfo->seqLoadData) ||
139,727,353✔
2187
      IS_STREAM_MODE(pOperator->pTaskInfo)) {
121,752,782✔
2188
    pExchangeInfo->current = 0;
35,668,967✔
2189
  }
2190

2191
  if (NULL != pOperator->pOperatorGetParam) {
151,277,181✔
2192
    storeNotifyInfo(pOperator);
3,999,954✔
2193
    /**
2194
      The param is referenced by getParam, and it will be freed by
2195
      the parent operator after getting next block.
2196
    */
2197
    pOperator->pOperatorGetParam->reUse = false;
3,999,954✔
2198
    pOperator->pOperatorGetParam = NULL;
3,999,954✔
2199
  }
2200

2201
  int64_t st = taosGetTimestampUs();
151,279,764✔
2202

2203
  if (!IS_STREAM_MODE(pOperator->pTaskInfo) && !pExchangeInfo->seqLoadData) {
151,279,764✔
2204
    code = prepareConcurrentlyLoad(pOperator);
112,363,300✔
2205
    QUERY_CHECK_CODE(code, lino, _end);
112,367,450✔
2206
    pExchangeInfo->openedTs = taosGetTimestampUs();
112,367,450✔
2207
  }
2208

2209
  OPTR_SET_OPENED(pOperator);
151,281,597✔
2210
  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
151,281,929✔
2211

2212
  qDebug("%s prepare load complete", pOperator->pTaskInfo->id.str);
151,281,218✔
2213

2214
_end:
62,211,126✔
2215
  if (code != TSDB_CODE_SUCCESS) {
151,280,770✔
2216
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2217
    pOperator->pTaskInfo->code = code;
×
UNCOV
2218
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
2219
  }
2220
  return code;
151,280,770✔
2221
}
2222

2223
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
3,955,698✔
2224
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
3,955,698✔
2225

2226
  if (pLimitInfo->remainGroupOffset > 0) {
3,955,698✔
2227
    if (pLimitInfo->currentGroupId == 0) {  // it is the first group
×
2228
      pLimitInfo->currentGroupId = pBlock->info.id.groupId;
×
2229
      blockDataCleanup(pBlock);
×
2230
      return PROJECT_RETRIEVE_CONTINUE;
×
UNCOV
2231
    } else if (pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
×
2232
      // now it is the data from a new group
UNCOV
2233
      pLimitInfo->remainGroupOffset -= 1;
×
2234

2235
      // ignore data block in current group
2236
      if (pLimitInfo->remainGroupOffset > 0) {
×
2237
        blockDataCleanup(pBlock);
×
UNCOV
2238
        return PROJECT_RETRIEVE_CONTINUE;
×
2239
      }
2240
    }
2241

2242
    // set current group id of the project operator
UNCOV
2243
    pLimitInfo->currentGroupId = pBlock->info.id.groupId;
×
2244
  }
2245

2246
  // here check for a new group data, we need to handle the data of the previous group.
2247
  if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.id.groupId) {
3,955,698✔
2248
    pLimitInfo->numOfOutputGroups += 1;
173,074✔
2249
    if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
173,074✔
2250
      pOperator->status = OP_EXEC_DONE;
×
UNCOV
2251
      blockDataCleanup(pBlock);
×
2252

UNCOV
2253
      return PROJECT_RETRIEVE_DONE;
×
2254
    }
2255

2256
    // reset the value for a new group data
2257
    resetLimitInfoForNextGroup(pLimitInfo);
173,074✔
2258
    // existing rows that belongs to previous group.
2259
    if (pBlock->info.rows > 0) {
173,074✔
2260
      return PROJECT_RETRIEVE_DONE;
173,074✔
2261
    }
2262
  }
2263

2264
  // here we reach the start position, according to the limit/offset requirements.
2265

2266
  // set current group id
2267
  pLimitInfo->currentGroupId = pBlock->info.id.groupId;
3,782,624✔
2268

2269
  bool limitReached = applyLimitOffset(pLimitInfo, pBlock, pTaskInfo);
3,782,624✔
2270
  if (pBlock->info.rows == 0) {
3,782,624✔
2271
    return PROJECT_RETRIEVE_CONTINUE;
1,949,713✔
2272
  } else {
2273
    if (limitReached && (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
1,832,911✔
2274
      setOperatorCompleted(pOperator);
×
UNCOV
2275
      return PROJECT_RETRIEVE_DONE;
×
2276
    }
2277
  }
2278

2279
  // todo optimize performance
2280
  // If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the
2281
  // they may not belong to the same group the limit/offset value is not valid in this case.
2282
  if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || hasSlimitOffsetInfo(pLimitInfo)) {
1,832,911✔
2283
    return PROJECT_RETRIEVE_DONE;
1,832,911✔
2284
  } else {  // not full enough, continue to accumulate the output data in the buffer.
UNCOV
2285
    return PROJECT_RETRIEVE_CONTINUE;
×
2286
  }
2287
}
2288

2289
static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo) {
225,075,033✔
2290
  SExecTaskInfo* pTask = pOperator->pTaskInfo;
225,075,033✔
2291
  int32_t        code = TSDB_CODE_SUCCESS;
225,075,744✔
2292
  if (pTask->pWorkerCb) {
225,075,744✔
2293
    code = pTask->pWorkerCb->beforeBlocking(pTask->pWorkerCb->pPool);
225,073,335✔
2294
    if (code != TSDB_CODE_SUCCESS) {
225,075,751✔
2295
      pTask->code = code;
×
UNCOV
2296
      return pTask->code;
×
2297
    }
2298
  }
2299

2300
  code = tsem_wait(&pExchangeInfo->ready);
225,077,673✔
2301
  if (code != TSDB_CODE_SUCCESS) {
225,075,608✔
2302
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
2303
    pTask->code = code;
×
UNCOV
2304
    return pTask->code;
×
2305
  }
2306

2307
  if (pTask->pWorkerCb) {
225,075,608✔
2308
    code = pTask->pWorkerCb->afterRecoverFromBlocking(pTask->pWorkerCb->pPool);
225,076,107✔
2309
    if (code != TSDB_CODE_SUCCESS) {
225,076,107✔
2310
      pTask->code = code;
×
UNCOV
2311
      return pTask->code;
×
2312
    }
2313
  }
2314
  return TSDB_CODE_SUCCESS;
225,076,107✔
2315
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc