• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4873

04 Dec 2025 01:55AM UTC coverage: 64.558% (-0.1%) from 64.678%
#4873

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

719 of 2219 new or added lines in 36 files covered. (32.4%)

6363 existing lines in 135 files now uncovered.

159381 of 246882 relevant lines covered (64.56%)

108937395.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.72
/source/libs/executor/src/dynqueryctrloperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "nodes.h"
19
#include "operator.h"
20
#include "os.h"
21
#include "plannodes.h"
22
#include "query.h"
23
#include "querynodes.h"
24
#include "querytask.h"
25
#include "tarray.h"
26
#include "tcompare.h"
27
#include "tdatablock.h"
28
#include "thash.h"
29
#include "tmsg.h"
30
#include "trpc.h"
31
#include "ttypes.h"
32
#include "dynqueryctrl.h"
33

34
/* File-global counter; buildGroupCacheOperatorParam() atomically increments it to stamp each group-cache param with a session id. */
int64_t gSessionId = 0;
35

36
void freeVgTableList(void* ptr) { 
×
37
  taosArrayDestroy(*(SArray**)ptr); 
×
38
}
×
39

40
/*
 * Frees every node of a singly linked SStbJoinTableList, including the four
 * per-node arrays (left/right vgroup ids and left/right uids).
 * A NULL head is a no-op.
 */
static void destroyStbJoinTableList(SStbJoinTableList* pListHead) {
  SStbJoinTableList* pCur = pListHead;

  while (NULL != pCur) {
    // Remember the successor before the node itself is released.
    SStbJoinTableList* pFollowing = pCur->pNext;

    taosMemoryFree(pCur->pLeftVg);
    taosMemoryFree(pCur->pLeftUid);
    taosMemoryFree(pCur->pRightVg);
    taosMemoryFree(pCur->pRightUid);
    taosMemoryFree(pCur);

    pCur = pFollowing;
  }
}
500,962✔
53

54
/*
 * Tears down the super-table-join part of a dynamic query control operator.
 * Logs the accumulated execution counters, then releases either the per-table
 * caches (non-batch mode) or the vgroup hashes (batch-fetch mode), and finally
 * the pending table list.
 */
static void destroyStbJoinDynCtrlInfo(SStbJoinDynCtrlInfo* pStbJoin) {
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;

  qDebug("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64 ", leftCacheNum:%" PRId64 ", rightCacheNum:%" PRId64, 
         pStbJoin->execInfo.prevBlkNum, pStbJoin->execInfo.prevBlkRows, pStbJoin->execInfo.postBlkNum, 
         pStbJoin->execInfo.postBlkRows, pStbJoin->execInfo.leftCacheNum, pStbJoin->execInfo.rightCacheNum);

  if (!pStbJoin->basic.batchFetch) {
    // Sequential mode: only the simple caches exist.
    if (NULL != pPrev->leftCache) {
      tSimpleHashCleanup(pPrev->leftCache);
    }
    if (NULL != pPrev->rightCache) {
      tSimpleHashCleanup(pPrev->rightCache);
    }
    if (NULL != pPrev->onceTable) {
      tSimpleHashCleanup(pPrev->onceTable);
    }
  } else {
    // Batch mode: hash values are SArray* slots, so install the array free callback first.
    if (NULL != pPrev->leftHash) {
      tSimpleHashSetFreeFp(pPrev->leftHash, freeVgTableList);
      tSimpleHashCleanup(pPrev->leftHash);
    }
    if (NULL != pPrev->rightHash) {
      tSimpleHashSetFreeFp(pPrev->rightHash, freeVgTableList);
      tSimpleHashCleanup(pPrev->rightHash);
    }
  }

  destroyStbJoinTableList(pPrev->pListHead);
}
500,962✔
82

83
void destroyOrgTbInfo(void *info) {
932,422✔
84
  SOrgTbInfo *pOrgTbInfo = (SOrgTbInfo *)info;
932,422✔
85
  if (pOrgTbInfo) {
932,422✔
86
    taosArrayDestroy(pOrgTbInfo->colMap);
932,422✔
87
  }
88
}
932,422✔
89

90
void destroyColRefInfo(void *info) {
7,988,548✔
91
  SColRefInfo *pColRefInfo = (SColRefInfo *)info;
7,988,548✔
92
  if (pColRefInfo) {
7,988,548✔
93
    taosMemoryFree(pColRefInfo->colName);
7,988,548✔
94
    taosMemoryFree(pColRefInfo->colrefName);
7,988,548✔
95
  }
96
}
7,988,548✔
97

98
void destroyColRefArray(void *info) {
474,687✔
99
  SArray *pColRefArray = *(SArray **)info;
474,687✔
100
  if (pColRefArray) {
474,687✔
101
    taosArrayDestroyEx(pColRefArray, destroyColRefInfo);
474,687✔
102
  }
103
}
474,687✔
104

105
void freeUseDbOutput(void* pOutput) {
153,738✔
106
  SUseDbOutput *pOut = *(SUseDbOutput**)pOutput;
153,738✔
107
  if (NULL == pOutput) {
153,738✔
108
    return;
×
109
  }
110

111
  if (pOut->dbVgroup) {
153,738✔
112
    freeVgInfo(pOut->dbVgroup);
153,738✔
113
  }
114
  taosMemFree(pOut);
153,738✔
115
}
116

117
/*
 * Releases everything owned by the virtual-table-scan control context:
 * name strings, child-table/column-reference arrays, and the various hash maps.
 * Each member is only touched when it is non-NULL.
 */
static void destroyVtbScanDynCtrlInfo(SVtbScanDynCtrlInfo* pVtbScan) {
  /* owned strings */
  if (NULL != pVtbScan->dbName) {
    taosMemoryFreeClear(pVtbScan->dbName);
  }
  if (NULL != pVtbScan->tbName) {
    taosMemoryFreeClear(pVtbScan->tbName);
  }

  /* arrays: childTableList holds SArray* elements, colRefInfo holds SColRefInfo elements */
  if (NULL != pVtbScan->childTableList) {
    taosArrayDestroyEx(pVtbScan->childTableList, destroyColRefArray);
  }
  if (NULL != pVtbScan->colRefInfo) {
    taosArrayDestroyEx(pVtbScan->colRefInfo, destroyColRefInfo);
    pVtbScan->colRefInfo = NULL;
  }
  if (NULL != pVtbScan->childTableMap) {
    taosHashCleanup(pVtbScan->childTableMap);
  }
  if (NULL != pVtbScan->readColList) {
    taosArrayDestroy(pVtbScan->readColList);
  }

  /* hash maps whose values need a dedicated free callback */
  if (NULL != pVtbScan->dbVgInfoMap) {
    taosHashSetFreeFp(pVtbScan->dbVgInfoMap, freeUseDbOutput);
    taosHashCleanup(pVtbScan->dbVgInfoMap);
  }
  if (NULL != pVtbScan->orgTbVgColMap) {
    taosHashSetFreeFp(pVtbScan->orgTbVgColMap, destroyOrgTbInfo);
    taosHashCleanup(pVtbScan->orgTbVgColMap);
  }

  /* cached use-db response */
  if (NULL != pVtbScan->pRsp) {
    tFreeSUsedbRsp(pVtbScan->pRsp);
    taosMemoryFreeClear(pVtbScan->pRsp);
  }

  /* remaining plain hash maps */
  if (NULL != pVtbScan->existOrgTbVg) {
    taosHashCleanup(pVtbScan->existOrgTbVg);
  }
  if (NULL != pVtbScan->curOrgTbVg) {
    taosHashCleanup(pVtbScan->curOrgTbVg);
  }
  if (NULL != pVtbScan->newAddedVgInfo) {
    taosHashCleanup(pVtbScan->newAddedVgInfo);
  }
}
149,338✔
159

NEW
160
void destroyWinArray(void *info) {
×
NEW
161
  SArray *pWinArray = *(SArray **)info;
×
NEW
162
  if (pWinArray) {
×
NEW
163
    taosArrayDestroy(pWinArray);
×
164
  }
NEW
165
}
×
166

NEW
167
/*
 * Releases the virtual-table-window control context: the result block and the
 * array of per-entry window arrays. Members that are NULL are skipped.
 */
static void destroyVtbWindowDynCtrlInfo(SVtbWindowDynCtrlInfo* pVtbWindow) {
  if (NULL != pVtbWindow->pRes) {
    blockDataDestroy(pVtbWindow->pRes);
  }

  if (NULL != pVtbWindow->pWins) {
    // Each element is itself an SArray*, released through destroyWinArray.
    taosArrayDestroyEx(pVtbWindow->pWins, destroyWinArray);
  }
}
×
175

176
static void destroyDynQueryCtrlOperator(void* param) {
650,300✔
177
  SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param;
650,300✔
178

179
  switch (pDyn->qType) {
650,300✔
180
    case DYN_QTYPE_STB_HASH:
500,962✔
181
      destroyStbJoinDynCtrlInfo(&pDyn->stbJoin);
500,962✔
182
      break;
500,962✔
183
    case DYN_QTYPE_VTB_SCAN:
149,338✔
184
      destroyVtbScanDynCtrlInfo(&pDyn->vtbScan);
149,338✔
185
      break;
149,338✔
NEW
186
    case DYN_QTYPE_VTB_WINDOW:
×
NEW
187
      destroyVtbWindowDynCtrlInfo(&pDyn->vtbWindow);
×
NEW
188
      break;
×
189
    default:
×
190
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
×
191
      break;
×
192
  }
193

194
  taosMemoryFreeClear(param);
650,300✔
195
}
650,300✔
196

197
/*
 * Decides whether the data of a table should be cached.
 *  - batch fetch: always true;
 *  - right table: true when the current and the next right uid are equal;
 *  - left table : true when `uid` has an entry in pPrev->leftCache.
 */
static FORCE_INLINE bool tableNeedCache(int64_t uid, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, bool rightTable, bool batchFetch) {
  if (batchFetch) {
    return true;
  }

  if (!rightTable) {
    uint32_t* pHit = tSimpleHashGet(pPrev->leftCache, &uid, sizeof(uid));
    return NULL != pHit;
  }

  return pPost->rightCurrUid == pPost->rightNextUid;
}
210

211
/*
 * Loads the table pair at the head list node's readIdx into the post-join
 * context (current uids and vgroup ids), looks ahead for the next right uid
 * (0 when the list is exhausted), and recomputes the left/right cache flags.
 * In non-batch mode a right table that needs caching and differs from the
 * previous right table is recorded in pPrev->rightCache.
 *
 * Returns TSDB_CODE_SUCCESS, or the error propagated from tSimpleHashPut.
 */
static int32_t updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo*          pStbJoin) {
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
  SStbJoinTableList*   pScan = pPrev->pListHead;
  int64_t*             pCurLeftUid = pScan->pLeftUid + pScan->readIdx;
  int64_t*             pCurRightUid = pScan->pRightUid + pScan->readIdx;
  int64_t              lookAhead = pScan->readIdx + 1;
  const int64_t        formerRightUid = pPost->rightCurrUid;

  pPost->leftCurrUid = *pCurLeftUid;
  pPost->rightCurrUid = *pCurRightUid;
  pPost->leftVgId = *(pScan->pLeftVg + pScan->readIdx);
  pPost->rightVgId = *(pScan->pRightVg + pScan->readIdx);

  // Find the right uid that follows the current one, walking into later nodes if needed.
  for (;;) {
    if (lookAhead < pScan->uidNum) {
      pPost->rightNextUid = *(pScan->pRightUid + lookAhead);
      break;
    }

    pScan = pScan->pNext;
    if (NULL == pScan) {
      pPost->rightNextUid = 0;
      break;
    }

    pCurRightUid = pScan->pRightUid;
    lookAhead = 0;
  }

  pPost->leftNeedCache = tableNeedCache(*pCurLeftUid, pPrev, pPost, false, pStbJoin->basic.batchFetch);
  pPost->rightNeedCache = tableNeedCache(*pCurRightUid, pPrev, pPost, true, pStbJoin->basic.batchFetch);

  if (!pStbJoin->basic.batchFetch && pPost->rightNeedCache && formerRightUid != pPost->rightCurrUid) {
    QRY_ERR_RET(tSimpleHashPut(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid), NULL, 0));
    pStbJoin->execInfo.rightCacheNum++;
  }

  return TSDB_CODE_SUCCESS;
}
254

255

256
/*
 * Builds a GET param for the group-cache operator.
 *
 * ppRes         out: newly allocated param, or NULL on failure
 * downstreamIdx downstream index stored in both the param and its payload
 * vgId, tbUid   table identity stored in the payload
 * needCache     cache flag stored in the payload
 * pChild        optional child param; ownership moves into (*ppRes)->pChildren
 *               on success, and it is freed here on failure
 *
 * Returns TSDB_CODE_SUCCESS or the terrno captured at the failing allocation.
 *
 * Fix: the param comes from taosMemoryMalloc, so its fields are indeterminate.
 * The original handed it to freeOperatorParam() on error paths before
 * value/reUse/opType were assigned. All fields are now initialized right after
 * allocation so every cleanup call sees a well-defined struct.
 * NOTE(review): freeOperatorParam is defined elsewhere; presumably it inspects
 * reUse/pChildren/value - confirm.
 */
static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, bool needCache, SOperatorParam* pChild) {
  int32_t code = TSDB_CODE_SUCCESS;
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    code = terrno;
    freeOperatorParam(pChild, OP_GET_PARAM);
    return code;
  }

  // Put the param into a fully defined state before any error path can free it.
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = NULL;
  (*ppRes)->reUse = false;
  (*ppRes)->pChildren = NULL;

  if (pChild) {
    (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
    if (NULL == (*ppRes)->pChildren) {
      code = terrno;
      freeOperatorParam(pChild, OP_GET_PARAM);
      freeOperatorParam(*ppRes, OP_GET_PARAM);
      *ppRes = NULL;
      return code;
    }
    if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild)) {
      // The child was not stored, so it must be released separately.
      code = terrno;
      freeOperatorParam(pChild, OP_GET_PARAM);
      freeOperatorParam(*ppRes, OP_GET_PARAM);
      *ppRes = NULL;
      return code;
    }
  }

  SGcOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcOperatorParam));
  if (NULL == pGc) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_GET_PARAM);
    *ppRes = NULL;
    return code;
  }

  pGc->sessionId = atomic_add_fetch_64(&gSessionId, 1);
  pGc->downstreamIdx = downstreamIdx;
  pGc->vgId = vgId;
  pGc->tbUid = tbUid;
  pGc->needCache = needCache;

  (*ppRes)->value = pGc;

  return TSDB_CODE_SUCCESS;
}
305

306

307
/*
 * Builds a NOTIFY param for the group-cache operator carrying the downstream
 * index, vgroup id and table uid.
 *
 * ppRes out: newly allocated param, or NULL on failure.
 * Returns TSDB_CODE_SUCCESS or the terrno captured at the failing allocation.
 *
 * Fix: when the payload allocation failed, the original passed a param with
 * uninitialized value/reUse/opType to freeOperatorParam() and left *ppRes
 * pointing at the released memory. Fields are now initialized immediately
 * after allocation and *ppRes is reset to NULL on failure.
 */
static int32_t buildGroupCacheNotifyOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid) {
  int32_t code = TSDB_CODE_SUCCESS;
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }

  // Define every field up front so the error path frees a consistent struct.
  (*ppRes)->pChildren = NULL;
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = NULL;
  (*ppRes)->reUse = false;

  SGcNotifyOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcNotifyOperatorParam));
  if (NULL == pGc) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    *ppRes = NULL;
    return code;
  }

  pGc->downstreamIdx = downstreamIdx;
  pGc->vgId = vgId;
  pGc->tbUid = tbUid;

  (*ppRes)->value = pGc;

  return TSDB_CODE_SUCCESS;
}
333

334
static int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid);
NEW
335
/*
 * Builds a reusable exchange-operator param for an external-window scan over
 * the time range [skey, ekey]. The param gets an empty children array and an
 * empty uid list.
 *
 * ppRes out: newly allocated param, or NULL on failure.
 * Returns TSDB_CODE_SUCCESS or the error code recorded by QUERY_CHECK_NULL.
 *
 * Fix: the payload is allocated with taosMemoryMalloc, and `basic.isNewDeployed`
 * was never assigned, so any later read of that flag saw an indeterminate
 * value. The sibling VScan/VTagScan builders set it explicitly; it is now set
 * to false here as well (no newDeployedSrc is populated by this function).
 *
 * NOTE(review): `downstreamIdx` is accepted but the param is always built with
 * downstreamIdx 0 - confirm this is intentional.
 */
static int32_t buildExchangeOperatorParamForExternalWindow(SOperatorParam** ppRes, int32_t downstreamIdx, int64_t skey, int64_t ekey) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SExchangeOperatorParam*   pExc = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)

  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)

  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  QUERY_CHECK_NULL(pExc, code, lino, _return, terrno)

  pExc->multiParams = false;
  pExc->basic.vgId = 0;
  pExc->basic.tableSeq = true;
  pExc->basic.isVtbRefScan = false;
  pExc->basic.isVtbTagScan = false;
  pExc->basic.isVtbWinScan = true;
  pExc->basic.isNewDeployed = false;
  pExc->basic.isNewParam = true;
  pExc->basic.window.skey = skey;
  pExc->basic.window.ekey = ekey;
  pExc->basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
  pExc->basic.colMap = NULL;
  pExc->basic.uidList = taosArrayInit(1, sizeof(int64_t));
  QUERY_CHECK_NULL(pExc->basic.uidList, code, lino, _return, terrno)

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->reUse = true;
  (*ppRes)->value = pExc;

  return code;
_return:
  qError("failed to build exchange operator param for external window, code:%d, line:%d", code, lino);
  // pExc is only non-NULL once uidList has been assigned, so the read below is defined.
  if (pExc) {
    if (pExc->basic.uidList) {
      taosArrayDestroy(pExc->basic.uidList);
    }
    taosMemoryFreeClear(pExc);
  }
  if (*ppRes) {
    if ((*ppRes)->pChildren) {
      taosArrayDestroy((*ppRes)->pChildren);
    }
    taosMemoryFreeClear(*ppRes);
  }

  return code;
}
386

387
/*
 * Builds a one-shot exchange-operator param that asks vgroup *pVgId for the
 * single table *pUid (table-sequential scan).
 *
 * ppRes out: newly allocated param, or NULL on failure.
 * Returns TSDB_CODE_SUCCESS or the terrno captured at the failing call.
 *
 * Fixes:
 *  - every failure after the first allocation leaked *ppRes and left the
 *    caller holding a pointer to a half-built param; all failures now go
 *    through one cleanup path that releases everything and NULLs *ppRes;
 *  - terrno was read after the free calls; it is now captured first;
 *  - `basic.isNewDeployed` was left indeterminate (malloc'ed memory); it is
 *    set to false, matching the sibling builders in this file.
 * NOTE(review): `basic.isNewParam` is still not assigned here because the
 * intended value cannot be determined from this file section.
 */
static int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, const int32_t* pVgId, int64_t* pUid) {
  int32_t                 code = TSDB_CODE_SUCCESS;
  SExchangeOperatorParam* pExc = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }
  (*ppRes)->pChildren = NULL;

  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  if (NULL == pExc) {
    code = terrno;
    goto _error;
  }

  pExc->multiParams = false;
  pExc->basic.vgId = *pVgId;
  pExc->basic.tableSeq = true;
  pExc->basic.isVtbRefScan = false;
  pExc->basic.isVtbWinScan = false;
  pExc->basic.isVtbTagScan = false;
  pExc->basic.isNewDeployed = false;
  pExc->basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
  pExc->basic.colMap = NULL;
  pExc->basic.uidList = taosArrayInit(1, sizeof(int64_t));
  if (NULL == pExc->basic.uidList) {
    code = terrno;
    goto _error;
  }
  if (NULL == taosArrayPush(pExc->basic.uidList, pUid)) {
    code = terrno;
    goto _error;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;

_error:
  // pExc->basic.uidList is always assigned before any jump taken with pExc != NULL.
  if (pExc) {
    if (pExc->basic.uidList) {
      taosArrayDestroy(pExc->basic.uidList);
    }
    taosMemoryFree(pExc);
  }
  taosMemoryFreeClear(*ppRes);
  return code;
}
425

426
/*
 * Builds a one-shot batch exchange-operator param from a vgId -> SArray*(uid
 * list) hash. Each uid list is moved into the param's pBatchs hash and the
 * source slot in `pVg` is reset to NULL.
 *
 * ppRes out: newly allocated param, or NULL on failure.
 * Returns TSDB_CODE_SUCCESS or an error code.
 *
 * Fixes:
 *  - terrno was read after the free calls on the allocation-failure paths;
 *    it is now captured before any cleanup;
 *  - a tSimpleHashPut failure returned immediately and leaked pExc, pBatchs
 *    and *ppRes; the partially built param is now released first;
 *  - the stack template `basic` was copied into the hash with fields that
 *    were never assigned; it is now zero-initialized.
 * NOTE(review): cleanup relies on tSimpleHashCleanup invoking the free
 * callback installed below for entries already moved into pBatchs - confirm.
 */
static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
  int32_t code = TSDB_CODE_SUCCESS;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }
  (*ppRes)->pChildren = NULL;
  
  SExchangeOperatorBatchParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorBatchParam));
  if (NULL == pExc) {
    code = terrno;
    taosMemoryFreeClear(*ppRes);
    return code;
  }

  pExc->multiParams = true;
  pExc->pBatchs = tSimpleHashInit(tSimpleHashGetSize(pVg), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  if (NULL == pExc->pBatchs) {
    code = terrno;
    taosMemoryFree(pExc);
    taosMemoryFreeClear(*ppRes);
    return code;
  }
  tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);
  
  SExchangeOperatorBasicParam basic = {0};
  basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;

  int32_t iter = 0;
  void* p = NULL;
  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
    int32_t* pVgId = tSimpleHashGetKey(p, NULL);
    SArray* pUidList = *(SArray**)p;
    basic.vgId = *pVgId;
    basic.uidList = pUidList;
    basic.colMap = NULL;
    basic.tableSeq = false;
    basic.isVtbRefScan = false;
    basic.isVtbWinScan = false;
    basic.isVtbTagScan = false;
    
    code = tSimpleHashPut(pExc->pBatchs, pVgId, sizeof(*pVgId), &basic, sizeof(basic));
    if (TSDB_CODE_SUCCESS != code) {
      // The failing uid list is still owned by pVg (its slot was not cleared).
      tSimpleHashCleanup(pExc->pBatchs);
      taosMemoryFree(pExc);
      taosMemoryFreeClear(*ppRes);
      // Route through the file's error-return macro so its bookkeeping is preserved.
      QRY_ERR_RET(code);
    }

    qTrace("build downstreamIdx %d batch scan, vgId:%d, uidNum:%" PRId64, downstreamIdx, *pVgId, (int64_t)taosArrayGetSize(pUidList));
    // Ownership of the uid list has moved into pBatchs.
    *(SArray**)p = NULL;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;
}
477

478
/*
 * Builds a reusable exchange-operator param for a virtual-table tag scan of
 * table `uid` in vgroup `vgId`.
 *
 * ppRes out: newly allocated param, or NULL on failure.
 * Returns TSDB_CODE_SUCCESS or the error code recorded by QUERY_CHECK_NULL.
 *
 * Fix: `basic` is `&pExc->basic`, i.e. a pointer INTO the pExc allocation. The
 * original cleanup called taosMemoryFreeClear(basic) and then freed pExc,
 * which frees a pointer that was never returned by the allocator. The cleanup
 * now releases only what `basic` owns and frees pExc once. The owned pointers
 * are also NULL-initialized right after allocation so cleanup never reads
 * indeterminate memory.
 */
static int32_t buildExchangeOperatorParamForVTagScan(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, tb_uid_t uid) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  SExchangeOperatorParam*      pExc = NULL;
  SExchangeOperatorBasicParam* basic = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
  (*ppRes)->pChildren = NULL;

  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  QUERY_CHECK_NULL(pExc, code, lino, _return, terrno)

  pExc->multiParams = false;

  basic = &pExc->basic;
  basic->colMap = NULL;
  basic->uidList = NULL;
  basic->srcOpType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;

  basic->vgId = vgId;
  basic->tableSeq = false;
  basic->isVtbRefScan = false;
  basic->isVtbTagScan = true;
  basic->isVtbWinScan = false;
  basic->isNewDeployed = false;

  basic->uidList = taosArrayInit(1, sizeof(int64_t));
  QUERY_CHECK_NULL(basic->uidList, code, lino, _return, terrno)
  QUERY_CHECK_NULL(taosArrayPush(basic->uidList, &uid), code, lino, _return, terrno)

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = true;

  return TSDB_CODE_SUCCESS;

_return:
  qError("failed to build exchange operator param for vscan, code:%d", code);
  taosMemoryFreeClear(*ppRes);
  if (basic) {
    if (basic->colMap) {
      taosArrayDestroy(basic->colMap->colMap);
      taosMemoryFreeClear(basic->colMap);
    }
    if (basic->uidList) {
      taosArrayDestroy(basic->uidList);
    }
    // `basic` itself is part of pExc and must not be freed separately.
  }
  taosMemoryFreeClear(pExc);
  return code;
}
533

534
/*
 * Builds a reusable exchange-operator param for a virtual-table reference scan.
 * The param carries a private copy of `pMap` (vgId, table name, duplicated
 * column map) and an empty uid list.
 *
 * ppRes out: newly allocated param, or NULL on failure.
 * Returns TSDB_CODE_SUCCESS or the error code recorded by QUERY_CHECK_NULL.
 *
 * Fixes:
 *  - `basic` is `&pExc->basic`, a pointer into the pExc allocation; the
 *    original cleanup freed it and then freed pExc. Only pExc is freed now.
 *  - when the colMap allocation failed, cleanup tested `basic->uidList` before
 *    it had ever been assigned (malloc'ed memory). The owned pointers are now
 *    NULL-initialized as soon as pExc exists.
 */
static int32_t buildExchangeOperatorParamForVScan(SOperatorParam** ppRes, int32_t downstreamIdx, SOrgTbInfo* pMap) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  SExchangeOperatorParam*      pExc = NULL;
  SExchangeOperatorBasicParam* basic = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
  (*ppRes)->pChildren = NULL;

  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  QUERY_CHECK_NULL(pExc, code, lino, _return, terrno)

  pExc->multiParams = false;

  basic = &pExc->basic;
  basic->colMap = NULL;
  basic->uidList = NULL;
  basic->srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;

  basic->vgId = pMap->vgId;
  basic->tableSeq = false;
  basic->isVtbRefScan = true;
  basic->isVtbWinScan = false;
  basic->isVtbTagScan = false;
  basic->isNewDeployed = false;
  basic->isNewParam = true;
  basic->colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
  QUERY_CHECK_NULL(basic->colMap, code, lino, _return, terrno)
  basic->colMap->vgId = pMap->vgId;
  tstrncpy(basic->colMap->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);
  basic->colMap->colMap = taosArrayDup(pMap->colMap, NULL);
  QUERY_CHECK_NULL(basic->colMap->colMap, code, lino, _return, terrno)

  basic->uidList = taosArrayInit(1, sizeof(int64_t));
  QUERY_CHECK_NULL(basic->uidList, code, lino, _return, terrno)

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = true;

  return TSDB_CODE_SUCCESS;

_return:
  qError("failed to build exchange operator param for vscan, code:%d", code);
  taosMemoryFreeClear(*ppRes);
  if (basic) {
    if (basic->colMap) {
      taosArrayDestroy(basic->colMap->colMap);
      taosMemoryFreeClear(basic->colMap);
    }
    if (basic->uidList) {
      taosArrayDestroy(basic->uidList);
    }
    // `basic` itself is part of pExc and must not be freed separately.
  }
  taosMemoryFreeClear(pExc);
  return code;
}
592

593
/*
 * Variant of the virtual-table reference-scan builder for a newly deployed
 * source: in addition to the copied table info it fills `newDeployedSrc` from
 * `taskId` (used as clientId) and `pTaskAddr` (task id, node id, endpoint set).
 *
 * ppRes out: newly allocated param, or NULL on failure.
 * Returns TSDB_CODE_SUCCESS or the error code recorded by QUERY_CHECK_NULL.
 *
 * Fixes:
 *  - `basic` is `&pExc->basic`, a pointer into the pExc allocation; the
 *    original cleanup freed it and then freed pExc. Only pExc is freed now.
 *  - when the colMap allocation failed, cleanup tested `basic->uidList` before
 *    it had ever been assigned (malloc'ed memory). The owned pointers are now
 *    NULL-initialized as soon as pExc exists.
 */
static int32_t buildExchangeOperatorParamForVScanEx(SOperatorParam** ppRes, int32_t downstreamIdx, SOrgTbInfo* pMap, uint64_t taskId, SStreamTaskAddr* pTaskAddr) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  SExchangeOperatorParam*      pExc = NULL;
  SExchangeOperatorBasicParam* basic = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
  (*ppRes)->pChildren = NULL;

  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  QUERY_CHECK_NULL(pExc, code, lino, _return, terrno)

  pExc->multiParams = false;

  basic = &pExc->basic;
  basic->colMap = NULL;
  basic->uidList = NULL;
  basic->srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;

  basic->vgId = pMap->vgId;
  basic->tableSeq = false;
  basic->isVtbRefScan = true;
  basic->isVtbWinScan = false;
  basic->isVtbTagScan = false;
  basic->isNewDeployed = true;
  basic->isNewParam = true;
  basic->newDeployedSrc.type = QUERY_NODE_DOWNSTREAM_SOURCE;
  basic->newDeployedSrc.clientId = taskId;// current task's taskid
  basic->newDeployedSrc.taskId = pTaskAddr->taskId;
  basic->newDeployedSrc.fetchMsgType = TDMT_STREAM_FETCH;
  basic->newDeployedSrc.localExec = false;
  basic->newDeployedSrc.addr.nodeId = pTaskAddr->nodeId;
  memcpy(&basic->newDeployedSrc.addr.epSet, &pTaskAddr->epset, sizeof(SEpSet));
  basic->colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
  QUERY_CHECK_NULL(basic->colMap, code, lino, _return, terrno)
  basic->colMap->vgId = pMap->vgId;
  tstrncpy(basic->colMap->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);
  basic->colMap->colMap = taosArrayDup(pMap->colMap, NULL);
  QUERY_CHECK_NULL(basic->colMap->colMap, code, lino, _return, terrno)

  basic->uidList = taosArrayInit(1, sizeof(int64_t));
  QUERY_CHECK_NULL(basic->uidList, code, lino, _return, terrno)

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = true;

  return TSDB_CODE_SUCCESS;

_return:
  qError("failed to build exchange operator param for vscan, code:%d", code);
  taosMemoryFreeClear(*ppRes);
  if (basic) {
    if (basic->colMap) {
      taosArrayDestroy(basic->colMap->colMap);
      taosMemoryFreeClear(basic->colMap);
    }
    if (basic->uidList) {
      taosArrayDestroy(basic->uidList);
    }
    // `basic` itself is part of pExc and must not be freed separately.
  }
  taosMemoryFreeClear(pExc);
  return code;
}
658

659
/*
 * Builds a GET param for the merge-join operator with two children.
 *
 * ppRes     out: newly allocated param, or NULL on failure
 * initParam stored as the payload's initDownstream flag
 * ppChild0, ppChild1
 *           child params; each is moved into the new param's children array
 *           and the caller's pointer is set to NULL once it has been stored.
 *           A child that was not yet stored remains owned by the caller.
 *
 * Returns TSDB_CODE_SUCCESS or the terrno captured at the failing call.
 *
 * Fix: the param comes from taosMemoryMalloc, and the original passed it to
 * freeOperatorParam() on error paths while value/reUse/opType were still
 * indeterminate. They are now initialized immediately after allocation.
 * NOTE(review): downstreamIdx is not assigned by this function (unchanged
 * from the original) - confirm no consumer reads it for merge-join params.
 */
static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initParam, SOperatorParam** ppChild0, SOperatorParam** ppChild1) {
  int32_t code = TSDB_CODE_SUCCESS;
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    code = terrno;
    return code;
  }

  // Make the struct safe to hand to freeOperatorParam() from here on.
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
  (*ppRes)->value = NULL;
  (*ppRes)->reUse = false;

  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
  if (NULL == (*ppRes)->pChildren) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_GET_PARAM);
    *ppRes = NULL;
    return code;
  }
  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild0)) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_GET_PARAM);
    *ppRes = NULL;
    return code;
  }
  *ppChild0 = NULL;
  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild1)) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_GET_PARAM);
    *ppRes = NULL;
    return code;
  }
  *ppChild1 = NULL;
  
  SSortMergeJoinOperatorParam* pJoin = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam));
  if (NULL == pJoin) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_GET_PARAM);
    *ppRes = NULL;
    return code;
  }

  pJoin->initDownstream = initParam;
  
  (*ppRes)->value = pJoin;

  return TSDB_CODE_SUCCESS;
}
704

UNCOV
705
/*
 * Builds a NOTIFY param for the merge-join operator. Non-NULL children are
 * appended to the new param's children array; on any failure both children
 * are released here and *ppRes is left NULL.
 *
 * Returns TSDB_CODE_SUCCESS or the terrno captured at the failing call.
 *
 * Fixes:
 *  - the check after taosArrayInit tested `*ppRes` (already known to be
 *    non-NULL) instead of `(*ppRes)->pChildren`, so a failed array allocation
 *    went undetected;
 *  - value/reUse/opType were indeterminate when the param was passed to
 *    freeOperatorParam() on the push-failure paths; they are now initialized
 *    right after allocation.
 */
static int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) {
  int32_t code = TSDB_CODE_SUCCESS;
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    code = terrno;
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    return code;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
  (*ppRes)->value = NULL;
  (*ppRes)->reUse = false;

  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
  if (NULL == (*ppRes)->pChildren) {
    code = terrno;
    taosMemoryFreeClear(*ppRes);
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    return code;
  }
  if (pChild0 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) {
    // Neither child has been stored yet, so both are released explicitly.
    code = terrno;
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    *ppRes = NULL;
    return code;
  }
  if (pChild1 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) {
    // pChild0 (if any) is already in the children array of *ppRes.
    code = terrno;
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    *ppRes = NULL;
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
744

745
/*
 * Builds a table-scan param from a vgId -> SArray*(uid list) hash that must
 * contain exactly one vgroup. After the param is built the uid list is
 * destroyed and its slot in the hash reset to NULL.
 *
 * Returns TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the hash does not hold
 * exactly one entry, otherwise the result of buildTableScanOperatorParam.
 */
static int32_t buildBatchTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
  int32_t vgNum = tSimpleHashGetSize(pVg);
  if (1 != vgNum) {
    qError("Invalid vgroup num %d to build table scan operator param", vgNum);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  int32_t cursor = 0;
  void*   pSlot = tSimpleHashIterate(pVg, NULL, &cursor);
  while (NULL != pSlot) {
    SArray** ppUids = (SArray**)pSlot;
    SArray*  pUids = *ppUids;

    int32_t rc = buildTableScanOperatorParam(ppRes, pUids, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false);
    if (rc) {
      return rc;
    }

    // The list is no longer needed once the param has been built.
    taosArrayDestroy(pUids);
    *ppUids = NULL;

    pSlot = tSimpleHashIterate(pVg, pSlot, &cursor);
  }

  return TSDB_CODE_SUCCESS;
}
768

769

770
/*
 * Builds a table-scan param for the single table *pUid by wrapping the uid in
 * a temporary one-element list. The temporary list is always released before
 * returning.
 *
 * Returns TSDB_CODE_SUCCESS, the terrno of a failed list operation, or the
 * error from buildTableScanOperatorParam.
 *
 * Fix: when taosArrayPush failed the temporary list was leaked; it is now
 * destroyed (after terrno has been captured) on that path as well.
 */
static int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) {
  SArray* pUidList = taosArrayInit(1, sizeof(int64_t));
  if (NULL == pUidList) {
    return terrno;
  }
  if (NULL == taosArrayPush(pUidList, pUid)) {
    int32_t pushCode = terrno;
    taosArrayDestroy(pUidList);
    return pushCode;
  }

  int32_t code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, true);
  taosArrayDestroy(pUidList);
  if (code) {
    return code;
  }
  
  return TSDB_CODE_SUCCESS;
}
787

788
/*
 * Build the operator param chain for the table pair at pPrev->pListHead->readIdx:
 * optional source params (table scan or exchange) -> one group-cache param per side ->
 * the merge-join param returned through *ppParam.
 *
 * In batch-fetch mode source params are only built while pPrev->leftHash is still set;
 * once both sides were built successfully both vgroup hashes are cleaned up and cleared.
 * In non-batch mode source params are built for every pair.
 *
 * On failure every param still referenced by a local pointer, and *ppParam if set, is
 * freed and *ppParam is reset to NULL.
 */
static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, SOperatorParam** ppParam) {
  int64_t         rowIdx = pPrev->pListHead->readIdx;
  int32_t*        leftVg = pPrev->pListHead->pLeftVg + rowIdx;
  int64_t*        leftUid = pPrev->pListHead->pLeftUid + rowIdx;
  int32_t*        rightVg = pPrev->pListHead->pRightVg + rowIdx;
  int64_t*        rightUid = pPrev->pListHead->pRightUid + rowIdx;
  SOperatorParam* pLeftSrc = NULL;
  SOperatorParam* pRightSrc = NULL;
  SOperatorParam* pLeftGc = NULL;
  SOperatorParam* pRightGc = NULL;
  int32_t         code = TSDB_CODE_SUCCESS;

  qDebug("start %" PRId64 ":%" PRId64 "th stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64, 
      rowIdx, pPrev->tableNum, *leftVg, *leftUid, *rightVg, *rightUid);

  QRY_ERR_RET(updatePostJoinCurrTableInfo(&pInfo->stbJoin));

  if (!pInfo->stbJoin.basic.batchFetch) {
    // one source param per side for this single table pair
    if (pInfo->stbJoin.basic.srcScan[0]) {
      code = buildSingleTableScanOperatorParam(&pLeftSrc, 0, leftVg, leftUid);
    } else {
      code = buildExchangeOperatorParam(&pLeftSrc, 0, leftVg, leftUid);
    }
    if (TSDB_CODE_SUCCESS == code) {
      if (pInfo->stbJoin.basic.srcScan[1]) {
        code = buildSingleTableScanOperatorParam(&pRightSrc, 1, rightVg, rightUid);
      } else {
        code = buildExchangeOperatorParam(&pRightSrc, 1, rightVg, rightUid);
      }
    }
  } else if (pPrev->leftHash) {
    // batch mode: source params are built from the vgroup hashes while they still exist
    if (pInfo->stbJoin.basic.srcScan[0]) {
      code = buildBatchTableScanOperatorParam(&pLeftSrc, 0, pPrev->leftHash);
    } else {
      code = buildBatchExchangeOperatorParam(&pLeftSrc, 0, pPrev->leftHash);
    }
    if (TSDB_CODE_SUCCESS == code) {
      if (pInfo->stbJoin.basic.srcScan[1]) {
        code = buildBatchTableScanOperatorParam(&pRightSrc, 1, pPrev->rightHash);
      } else {
        code = buildBatchExchangeOperatorParam(&pRightSrc, 1, pPrev->rightHash);
      }
    }
    if (TSDB_CODE_SUCCESS == code) {
      tSimpleHashCleanup(pPrev->leftHash);
      tSimpleHashCleanup(pPrev->rightHash);
      pPrev->leftHash = NULL;
      pPrev->rightHash = NULL;
    }
  }

  // must be sampled before pLeftSrc is handed over and cleared below
  bool initParam = (NULL != pLeftSrc);

  if (TSDB_CODE_SUCCESS == code) {
    code = buildGroupCacheOperatorParam(&pLeftGc, 0, *leftVg, *leftUid, pPost->leftNeedCache, pLeftSrc);
    // cleared whatever the result, so the failure cleanup below never frees it
    pLeftSrc = NULL;
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = buildGroupCacheOperatorParam(&pRightGc, 1, *rightVg, *rightUid, pPost->rightNeedCache, pRightSrc);
    pRightSrc = NULL;
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = buildMergeJoinOperatorParam(ppParam, initParam, &pLeftGc, &pRightGc);
  }

  if (TSDB_CODE_SUCCESS == code) {
    return code;
  }

  // failure: release whatever is still referenced locally
  if (pLeftSrc) {
    freeOperatorParam(pLeftSrc, OP_GET_PARAM);
  }
  if (pRightSrc) {
    freeOperatorParam(pRightSrc, OP_GET_PARAM);
  }
  if (pLeftGc) {
    freeOperatorParam(pLeftGc, OP_GET_PARAM);
  }
  if (pRightGc) {
    freeOperatorParam(pRightGc, OP_GET_PARAM);
  }
  if (*ppParam) {
    freeOperatorParam(*ppParam, OP_GET_PARAM);
    *ppParam = NULL;
  }

  return code;
}
858

859
/*
 * Start the join for the current table pair: build the operator param, push it to
 * downstream[1] through getNextExtFn and return the first result block in *ppRes
 * (NULL when the pair produced no rows).
 *
 * Any failure (param build, downstream fetch, block validation) is stored in
 * pTaskInfo->code and raised with T_LONG_JMP, so this function only returns on success.
 * A non-zero code from getNextExtFn used to be logged as an "empty" result and dropped;
 * it is now raised like the other failures.
 */
static void seqJoinLaunchNewRetrieveImpl(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
  SStbJoinPostJoinCtx*       pPost = &pStbJoin->ctx.post;
  SOperatorParam*            pParam = NULL;
  int32_t                    code  = buildSeqStbJoinOperatorParam(pInfo, pPrev, pPost, &pParam);
  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  qDebug("%s dynamic post task begin", GET_TASKID(pOperator->pTaskInfo));
  code = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam, ppRes);
  if (TSDB_CODE_SUCCESS != code) {
    // do not mistake a failed fetch for an empty join result
    qError("%s dynamic post task failed, error:%s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  if (NULL == *ppRes) {
    qDebug("%s Empty join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
    return;
  }

  code = blockDataCheck(*ppRes);
  if (code) {
    qError("Invalid block data, blockDataCheck failed, error:%s", tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  // the pair has produced data: later calls continue fetching from the same join
  pPost->isStarted = true;
  pStbJoin->execInfo.postBlkNum++;
  pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows;
  qDebug("%s join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
}
1,592,595✔
888

889

UNCOV
890
/*
 * Notify downstream[1] that the cache of the current left or right table (selected by
 * leftTable) has ended: a group-cache notify param is built, wrapped into a merge-join
 * notify param and delivered with optrDefaultNotifyFn().
 *
 * Returns the first non-success code of the three steps, or the notify result.
 */
static int32_t notifySeqJoinTableCacheEnd(SOperatorInfo* pOperator, SStbJoinPostJoinCtx* pPost, bool leftTable) {
  SOperatorParam* pGcParam = NULL;
  SOperatorParam* pMergeJoinParam = NULL;
  int32_t         downstreamId = 1;
  int32_t         vgId = pPost->rightVgId;
  int64_t         uid = pPost->rightCurrUid;

  if (leftTable) {
    downstreamId = 0;
    vgId = pPost->leftVgId;
    uid = pPost->leftCurrUid;
  }

  qDebug("notify table %" PRIu64 " in vgId %d downstreamId %d cache end", uid, vgId, downstreamId);

  int32_t code = buildGroupCacheNotifyOperatorParam(&pGcParam, downstreamId, vgId, uid);
  if (TSDB_CODE_SUCCESS == code) {
    code = buildMergeJoinNotifyOperatorParam(&pMergeJoinParam, pGcParam, NULL);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = optrDefaultNotifyFn(pOperator->pDownstream[1], pMergeJoinParam);
  }

  return code;
}
910

911
/*
 * Bookkeeping after the join of the current table pair has delivered its last block.
 *
 * The "started" flag is always cleared. Nothing else happens in batch-fetch mode.
 * Otherwise:
 *   - left side, when leftNeedCache is set: the counter stored for the current left uid
 *     in leftCache is decremented; when it reaches zero the entry is removed and the
 *     cache end is notified downstream;
 *   - right side, when rightNeedCache is NOT set: if the current right uid is present in
 *     rightCache it is removed and the cache end is notified downstream.
 */
static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCtrlInfo* pStbJoin) {
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
  int32_t              code = TSDB_CODE_SUCCESS;

  pPost->isStarted = false;

  if (pStbJoin->basic.batchFetch) {
    return TSDB_CODE_SUCCESS;
  }

  if (pPost->leftNeedCache) {
    uint32_t* pRefNum = tSimpleHashGet(pPrev->leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
    if (NULL != pRefNum && 0 == --(*pRefNum)) {
      code = tSimpleHashRemove(pPrev->leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
      if (code) {
        qError("tSimpleHashRemove leftCurrUid %" PRId64 " from leftCache failed, error:%s", pPost->leftCurrUid, tstrerror(code));
        QRY_ERR_RET(code);
      }
      QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, true));
    }
  }

  if (!pPost->rightNeedCache) {
    void* pCached = tSimpleHashGet(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
    if (pCached) {
      code = tSimpleHashRemove(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
      if (code) {
        qError("tSimpleHashRemove rightCurrUid %" PRId64 " from rightCache failed, error:%s", pPost->rightCurrUid, tstrerror(code));
        QRY_ERR_RET(code);
      }
      QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, false));
    }
  }

  return TSDB_CODE_SUCCESS;
}
947

948

949
/*
 * Continue fetching from a join that has already produced data for the current pair.
 *
 * If no join is in progress (isStarted is false) nothing is done and *ppRes is left
 * untouched. Otherwise the next block is pulled from downstream[1]:
 *   - NULL: the pair is exhausted; end-of-pair handling runs and readIdx advances;
 *   - non-NULL: the block is returned and the post-join statistics are updated.
 */
static FORCE_INLINE int32_t seqJoinContinueCurrRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinPostJoinCtx*       pPost = &pInfo->stbJoin.ctx.post;
  SStbJoinPrevJoinCtx*       pPrev = &pInfo->stbJoin.ctx.prev;

  if (!pPost->isStarted) {
    return TSDB_CODE_SUCCESS;
  }

  qDebug("%s dynQueryCtrl continue to retrieve block from post op", GET_TASKID(pOperator->pTaskInfo));

  *ppRes = getNextBlockFromDownstream(pOperator, 1);
  if (NULL != *ppRes) {
    pInfo->stbJoin.execInfo.postBlkNum++;
    pInfo->stbJoin.execInfo.postBlkRows += (*ppRes)->info.rows;
    return TSDB_CODE_SUCCESS;
  }

  QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, &pInfo->stbJoin));
  pPrev->pListHead->readIdx++;

  return TSDB_CODE_SUCCESS;
}
971

972
/*
 * Append pVal to the array stored in pHash under pKey, creating the array (initial
 * capacity 10, element size valSize) and inserting it into the hash when the key is new.
 *
 * Error reporting:
 *   - array failures return terrno, captured before any cleanup call runs;
 *   - a failed tSimpleHashPut() returns the code it reported (previously terrno was
 *     returned instead, which could differ from, or hide, the real error).
 * A freshly created array is destroyed again whenever it could not be stored.
 */
static FORCE_INLINE int32_t addToJoinVgroupHash(SSHashObj* pHash, void* pKey, int32_t keySize, void* pVal, int32_t valSize) {
  SArray** ppArray = tSimpleHashGet(pHash, pKey, keySize);
  if (NULL != ppArray) {
    // key already known: just append
    if (NULL == taosArrayPush(*ppArray, pVal)) {
      return terrno;
    }
    return TSDB_CODE_SUCCESS;
  }

  SArray* pArray = taosArrayInit(10, valSize);
  if (NULL == pArray) {
    return terrno;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  if (NULL == taosArrayPush(pArray, pVal)) {
    code = terrno;
  } else {
    // the hash stores the array pointer itself
    code = tSimpleHashPut(pHash, pKey, keySize, &pArray, POINTER_BYTES);
  }

  if (TSDB_CODE_SUCCESS != code) {
    taosArrayDestroy(pArray);
  }

  return code;
}
996

997
/*
 * Count one more occurrence of pKey in pHash (key -> uint32_t counter) and keep
 * pOnceHash in sync as the set of keys seen exactly once.
 *
 *   - new key: the counter starts at 1 and the key is added to pOnceHash;
 *   - counter == 0: left unchanged;
 *   - counter == UINT32_MAX: reset to 0;
 *   - otherwise: incremented; when it was 1 the key is first removed from pOnceHash.
 *
 * NOTE(review): 0 looks like a "counter overflowed" marker that is never incremented
 * again - confirm with the consumers of pHash.
 */
static FORCE_INLINE int32_t addToJoinTableHash(SSHashObj* pHash, SSHashObj* pOnceHash, void* pKey, int32_t keySize) {
  int32_t   code = TSDB_CODE_SUCCESS;
  uint32_t* pRefNum = tSimpleHashGet(pHash, pKey, keySize);

  if (NULL == pRefNum) {
    uint32_t first = 1;
    code = tSimpleHashPut(pHash, pKey, keySize, &first, sizeof(first));
    if (TSDB_CODE_SUCCESS == code) {
      code = tSimpleHashPut(pOnceHash, pKey, keySize, NULL, 0);
    }
    return code;
  }

  if (0 == *pRefNum) {
    return TSDB_CODE_SUCCESS;
  }

  if (UINT32_MAX == *pRefNum) {
    *pRefNum = 0;
    return TSDB_CODE_SUCCESS;
  }

  if (1 == *pRefNum) {
    // second occurrence: the key no longer belongs to the "seen once" set
    code = tSimpleHashRemove(pOnceHash, pKey, keySize);
    if (code) {
      qError("tSimpleHashRemove failed in addToJoinTableHash, error:%s", tstrerror(code));
      QRY_ERR_RET(code);
    }
  }
  (*pRefNum)++;

  return TSDB_CODE_SUCCESS;
}
1033

1034

1035
/*
 * Release one table-list node together with its four vgId/uid arrays.
 * A NULL node is ignored. The node's pNext link is not followed.
 */
static void freeStbJoinTableList(SStbJoinTableList* pList) {
  if (pList) {
    taosMemoryFree(pList->pLeftVg);
    taosMemoryFree(pList->pLeftUid);
    taosMemoryFree(pList->pRightVg);
    taosMemoryFree(pList->pRightUid);
    taosMemoryFree(pList);
  }
}
1045

1046
/*
 * Copy `rows` (leftVg, leftUid, rightVg, rightUid) tuples into a new list node and append
 * the node to the tail of pCtx's table list (it becomes head and tail when the list is
 * empty).
 *
 * The four input arrays are copied, not referenced. On an allocation failure the partly
 * built node is released and terrno is returned; the list is left unchanged.
 */
static int32_t appendStbJoinTableList(SStbJoinPrevJoinCtx* pCtx, int64_t rows, int32_t* pLeftVg, int64_t* pLeftUid, int32_t* pRightVg, int64_t* pRightUid) {
  int32_t            code = TSDB_CODE_SUCCESS;
  SStbJoinTableList* pNode = taosMemoryCalloc(1, sizeof(SStbJoinTableList));
  if (NULL == pNode) {
    return terrno;
  }

  // the node is zero-initialised, so freeStbJoinTableList() is safe at every stage
  pNode->pLeftVg = taosMemoryMalloc(rows * sizeof(*pLeftVg));
  if (NULL == pNode->pLeftVg) {
    goto _oom;
  }
  pNode->pLeftUid = taosMemoryMalloc(rows * sizeof(*pLeftUid));
  if (NULL == pNode->pLeftUid) {
    goto _oom;
  }
  pNode->pRightVg = taosMemoryMalloc(rows * sizeof(*pRightVg));
  if (NULL == pNode->pRightVg) {
    goto _oom;
  }
  pNode->pRightUid = taosMemoryMalloc(rows * sizeof(*pRightUid));
  if (NULL == pNode->pRightUid) {
    goto _oom;
  }

  TAOS_MEMCPY(pNode->pLeftVg, pLeftVg, rows * sizeof(*pLeftVg));
  TAOS_MEMCPY(pNode->pLeftUid, pLeftUid, rows * sizeof(*pLeftUid));
  TAOS_MEMCPY(pNode->pRightVg, pRightVg, rows * sizeof(*pRightVg));
  TAOS_MEMCPY(pNode->pRightUid, pRightUid, rows * sizeof(*pRightUid));

  pNode->readIdx = 0;
  pNode->uidNum = rows;
  pNode->pNext = NULL;

  if (pCtx->pListTail) {
    pCtx->pListTail->pNext = pNode;
  } else {
    pCtx->pListHead = pNode;
  }
  pCtx->pListTail = pNode;

  return TSDB_CODE_SUCCESS;

_oom:
  code = terrno;
  freeStbJoinTableList(pNode);
  return code;
}
1096

1097
/*
 * Consume one block of (leftVg, leftUid, rightVg, rightUid) rows coming from the
 * upstream table-pair scan.
 *
 *   - batch-fetch mode: every uid is added to the per-vgroup uid lists (leftHash /
 *     rightHash);
 *   - otherwise: every left uid is counted in leftCache / onceTable.
 * When that succeeds the raw columns are copied into a new table-list node and tableNum
 * is increased by the number of rows.
 *
 * Any failure is stored in pTaskInfo->code and raised with T_LONG_JMP.
 */
static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
  SColumnInfoData*           pVg0 = NULL;
  SColumnInfoData*           pVg1 = NULL;
  SColumnInfoData*           pUid0 = NULL;
  SColumnInfoData*           pUid1 = NULL;

  // resolve the four input columns by their configured slot ids
  pVg0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[0]);
  if (NULL == pVg0) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  pVg1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[1]);
  if (NULL == pVg1) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  pUid0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[0]);
  if (NULL == pUid0) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  pUid1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[1]);
  if (NULL == pUid1) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  if (pStbJoin->basic.batchFetch) {
    for (int32_t row = 0; row < pBlock->info.rows; ++row) {
      int32_t* leftVg = (int32_t*)(pVg0->pData + pVg0->info.bytes * row);
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * row);
      int32_t* rightVg = (int32_t*)(pVg1->pData + pVg1->info.bytes * row);
      int64_t* rightUid = (int64_t*)(pUid1->pData + pUid1->info.bytes * row);

      code = addToJoinVgroupHash(pPrev->leftHash, leftVg, sizeof(*leftVg), leftUid, sizeof(*leftUid));
      if (TSDB_CODE_SUCCESS == code) {
        code = addToJoinVgroupHash(pPrev->rightHash, rightVg, sizeof(*rightVg), rightUid, sizeof(*rightUid));
      }
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
    }
  } else {
    for (int32_t row = 0; row < pBlock->info.rows; ++row) {
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * row);

      code = addToJoinTableHash(pPrev->leftCache, pPrev->onceTable, leftUid, sizeof(*leftUid));
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = appendStbJoinTableList(pPrev, pBlock->info.rows, (int32_t*)pVg0->pData, (int64_t*)pUid0->pData, (int32_t*)pVg1->pData, (int64_t*)pUid1->pData);
    if (TSDB_CODE_SUCCESS == code) {
      pPrev->tableNum += pBlock->info.rows;
    }
  }

_return:

  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }
}
80,671✔
1159

1160

1161
/*
 * Finalise leftCache after all table pairs have been collected (non-batch mode only).
 *
 * If leftCache and onceTable have the same size, leftCache is simply cleared. Otherwise
 * every key found in onceTable is removed from leftCache, and the remaining size is
 * recorded in execInfo.leftCacheNum.
 * A failed removal is logged and otherwise ignored.
 */
static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;

  if (pStbJoin->basic.batchFetch) {
    return;
  }

  if (tSimpleHashGetSize(pPrev->leftCache) == tSimpleHashGetSize(pPrev->onceTable)) {
    tSimpleHashClear(pPrev->leftCache);
    return;
  }

  int32_t   cursor = 0;
  uint64_t* pOnceUid = tSimpleHashIterate(pPrev->onceTable, NULL, &cursor);
  for (; NULL != pOnceUid; pOnceUid = tSimpleHashIterate(pPrev->onceTable, pOnceUid, &cursor)) {
    int32_t rc = tSimpleHashRemove(pPrev->leftCache, pOnceUid, sizeof(*pOnceUid));
    if (rc) {
      qError("tSimpleHashRemove failed in postProcessStbJoinTableHash, error:%s", tstrerror(rc));
    }
  }

  pStbJoin->execInfo.leftCacheNum = tSimpleHashGetSize(pPrev->leftCache);
  qDebug("more than 1 ref build table num %" PRId64, (int64_t)tSimpleHashGetSize(pPrev->leftCache));
}
1196

1197
/*
 * Drain downstream[0] completely: every block is counted in the prev-join statistics and
 * handed to doBuildStbJoinTableHash(). Afterwards the collected hashes are post-processed
 * and the build phase is marked as done (joinBuild = true).
 */
static void buildStbJoinTableList(SOperatorInfo* pOperator) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SSDataBlock*               pBlock = NULL;

  while (NULL != (pBlock = getNextBlockFromDownstream(pOperator, 0))) {
    pStbJoin->execInfo.prevBlkNum++;
    pStbJoin->execInfo.prevBlkRows += pBlock->info.rows;

    doBuildStbJoinTableHash(pOperator, pBlock);
  }

  postProcessStbJoinTableHash(pOperator);

  pStbJoin->ctx.prev.joinBuild = true;
}
500,962✔
1217

1218
/*
 * Walk the pending table pairs and launch joins until one of them yields a block.
 *
 * Exhausted list nodes (readIdx >= uidNum) are unlinked from the head and freed. For
 * every remaining pair the join is launched; a non-NULL block is returned immediately,
 * otherwise end-of-pair handling runs and readIdx advances. When no pair is left *ppRes
 * is set to NULL and the operator is marked completed.
 *
 * pListTail is reset together with pListHead once the last node has been freed, so the
 * context never keeps a pointer to released memory.
 */
static int32_t seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
  SStbJoinTableList*         pNode = pPrev->pListHead;

  while (pNode) {
    if (pNode->readIdx >= pNode->uidNum) {
      pPrev->pListHead = pNode->pNext;
      if (NULL == pPrev->pListHead) {
        // the freed node was also the tail
        pPrev->pListTail = NULL;
      }
      freeStbJoinTableList(pNode);
      pNode = pPrev->pListHead;
      continue;
    }

    seqJoinLaunchNewRetrieveImpl(pOperator, ppRes);
    if (*ppRes) {
      return TSDB_CODE_SUCCESS;
    }

    QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, pStbJoin));
    pPrev->pListHead->readIdx++;
  }

  *ppRes = NULL;
  setOperatorCompleted(pOperator);

  return TSDB_CODE_SUCCESS;
}
1246

1247
/*
 * Shape a join result block according to the operator's output block descriptor.
 *
 * A NULL block is accepted and ignored. Otherwise the block id is set from the
 * descriptor and, if the block has a column array, one column is appended for every
 * descriptor slot beyond the columns the block already has. Each appended column is
 * created from the slot's type/bytes/slotId and gets capacity for the block's row count.
 *
 * Fixes over the previous version:
 *   - the error message now names the pointers that are actually checked;
 *   - the buffers of a column that could not be prepared or appended are released
 *     instead of being leaked.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR for a missing
 * descriptor/slot, or the code of the failed column operation.
 */
static int32_t seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) {
  if (NULL == pBlock) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == pStbJoin || NULL == pStbJoin->pOutputDataBlockDesc) {
    qError("seqStableJoinComposeRes: pStbJoin or pOutputDataBlockDesc is NULL");
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  pBlock->info.id.blockId = pStbJoin->pOutputDataBlockDesc->dataBlockId;
  if (!pBlock->pDataBlock) return TSDB_CODE_SUCCESS;

  for (int i = (int)pBlock->pDataBlock->size; i < pStbJoin->pOutputDataBlockDesc->pSlots->length; i++) {
    SSlotDescNode* pSlot = (SSlotDescNode*)nodesListGetNode(pStbJoin->pOutputDataBlockDesc->pSlots, i);
    if (pSlot == NULL) {
      qError("seqStableJoinComposeRes: pSlot is NULL, i:%d", i);
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }

    SColumnInfoData colInfo = createColumnInfoData(pSlot->dataType.type, pSlot->dataType.bytes, pSlot->slotId);
    int32_t code = colInfoDataEnsureCapacity(&colInfo, pBlock->info.rows, true);
    if (code == TSDB_CODE_SUCCESS) {
      // on success the block's column array holds a copy of colInfo and owns its buffers
      code = blockDataAppendColInfo(pBlock, &colInfo);
    }
    if (code != TSDB_CODE_SUCCESS) {
      // colInfo never made it into the block: its buffers are still ours to release
      colDataDestroy(&colInfo);
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
1276

1277
/*
 * getNext entry point of the sequential stable-join control operator.
 *
 * First call: the table-pair list is built from downstream[0]; if it produced no rows the
 * operator completes right away. Afterwards each call first tries to continue the join in
 * progress and, if that yields no block, launches joins for the following pairs.
 *
 * openCost is recorded once, on the first call that leaves this function (measured while
 * openCost is still 0). A failure is stored in pTaskInfo->code and raised with
 * T_LONG_JMP; on success the result block (possibly NULL) is passed through
 * seqStableJoinComposeRes() and that function's code is returned.
 */
int32_t seqStableJoin(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  int64_t                    startUs = 0;

  QRY_PARAM_CHECK(pRes);
  if (OP_EXEC_DONE == pOperator->status) {
    return code;
  }

  if (pOperator->cost.openCost == 0) {
    startUs = taosGetTimestampUs();
  }

  if (!pStbJoin->ctx.prev.joinBuild) {
    buildStbJoinTableList(pOperator);
    if (pStbJoin->execInfo.prevBlkRows <= 0) {
      // nothing to join at all
      setOperatorCompleted(pOperator);
      goto _return;
    }
  }

  QRY_ERR_JRET(seqJoinContinueCurrRetrieve(pOperator, pRes));
  if (NULL == *pRes) {
    QRY_ERR_JRET(seqJoinLaunchNewRetrieve(pOperator, pRes));
  }

_return:
  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - startUs) / 1000.0;
  }

  if (code) {
    qError("%s failed since %s", __func__, tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  } else {
    code = seqStableJoinComposeRes(pStbJoin, *pRes);
  }
  return code;
}
1321

1322
/*
 * Allocate an operator param for a virtual-table scan of table `uid`.
 *
 * The param carries an SVTableScanOperatorParam value (uid, the time window taken from
 * pInfo->vtbScan.window, and an empty pOpParamArray) plus an empty pChildren array.
 *
 * On success *ppRes points to the new param. On failure everything allocated here is
 * released, *ppRes is NULL and the error code is returned.
 */
static int32_t buildVtbScanOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, uint64_t uid) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SOperatorParam*           pParam = NULL;
  SVTableScanOperatorParam* pScanValue = NULL;

  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
  *ppRes = pParam;
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)

  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno)

  pScanValue = taosMemoryMalloc(sizeof(SVTableScanOperatorParam));
  QUERY_CHECK_NULL(pScanValue, code, lino, _return, terrno)
  pScanValue->pOpParamArray = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL(pScanValue->pOpParamArray, code, lino, _return, terrno)
  pScanValue->uid = uid;
  pScanValue->window = pInfo->vtbScan.window;

  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN;
  pParam->downstreamIdx = 0;
  pParam->value = pScanValue;
  pParam->reUse = false;

  return TSDB_CODE_SUCCESS;

_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pScanValue) {
    taosArrayDestroy(pScanValue->pOpParamArray);
    taosMemoryFreeClear(pScanValue);
  }
  if (*ppRes) {
    taosArrayDestroy((*ppRes)->pChildren);
    taosMemoryFreeClear(*ppRes);
  }
  return code;
}
1357

1358
int32_t dynProcessUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
153,738✔
1359
  int32_t                    lino = 0;
153,738✔
1360
  SOperatorInfo*             operator=(SOperatorInfo*) param;
153,738✔
1361
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)operator->info;
153,738✔
1362

1363
  if (TSDB_CODE_SUCCESS != code) {
153,738✔
UNCOV
1364
    operator->pTaskInfo->code = rpcCvtErrCode(code);
×
UNCOV
1365
    if (operator->pTaskInfo->code != code) {
×
UNCOV
1366
      qError("load systable rsp received, error:%s, cvted error:%s", tstrerror(code),
×
1367
             tstrerror(operator->pTaskInfo->code));
1368
    } else {
UNCOV
1369
      qError("load systable rsp received, error:%s", tstrerror(code));
×
1370
    }
UNCOV
1371
    goto _return;
×
1372
  }
1373

1374
  pScanResInfo->vtbScan.pRsp = taosMemoryMalloc(sizeof(SUseDbRsp));
153,738✔
1375
  QUERY_CHECK_NULL(pScanResInfo->vtbScan.pRsp, code, lino, _return, terrno)
153,738✔
1376

1377
  code = tDeserializeSUseDbRsp(pMsg->pData, (int32_t)pMsg->len, pScanResInfo->vtbScan.pRsp);
153,738✔
1378
  QUERY_CHECK_CODE(code, lino, _return);
153,738✔
1379

1380
  taosMemoryFreeClear(pMsg->pData);
153,738✔
1381

1382
  code = tsem_post(&pScanResInfo->vtbScan.ready);
153,738✔
1383
  QUERY_CHECK_CODE(code, lino, _return);
153,738✔
1384

1385
  return code;
153,738✔
UNCOV
1386
_return:
×
UNCOV
1387
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
1388
  return code;
×
1389
}
1390

1391
static int32_t buildDbVgInfoMap(SOperatorInfo* pOperator, SMsgCb* pMsgCb, SName* name, SExecTaskInfo* pTaskInfo, SUseDbOutput* output) {
153,738✔
1392
  int32_t                    code = TSDB_CODE_SUCCESS;
153,738✔
1393
  int32_t                    lino = 0;
153,738✔
1394
  char*                      buf1 = NULL;
153,738✔
1395
  SUseDbReq*                 pReq = NULL;
153,738✔
1396
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)pOperator->info;
153,738✔
1397

1398
  pReq = taosMemoryMalloc(sizeof(SUseDbReq));
153,738✔
1399
  QUERY_CHECK_NULL(pReq, code, lino, _return, terrno)
153,738✔
1400
  code = tNameGetFullDbName(name, pReq->db);
153,738✔
1401
  QUERY_CHECK_CODE(code, lino, _return);
153,738✔
1402
  int32_t contLen = tSerializeSUseDbReq(NULL, 0, pReq);
153,738✔
1403
  buf1 = taosMemoryCalloc(1, contLen);
153,738✔
1404
  QUERY_CHECK_NULL(buf1, code, lino, _return, terrno)
153,738✔
1405
  int32_t tempRes = tSerializeSUseDbReq(buf1, contLen, pReq);
153,738✔
1406
  if (tempRes < 0) {
153,738✔
UNCOV
1407
    QUERY_CHECK_CODE(terrno, lino, _return);
×
1408
  }
1409

1410
  // send the fetch remote task result request
1411
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
153,738✔
1412
  QUERY_CHECK_NULL(pMsgSendInfo, code, lino, _return, terrno)
153,738✔
1413

1414
  pMsgSendInfo->param = pOperator;
153,738✔
1415
  pMsgSendInfo->msgInfo.pData = buf1;
153,738✔
1416
  pMsgSendInfo->msgInfo.len = contLen;
153,738✔
1417
  pMsgSendInfo->msgType = TDMT_MND_GET_DB_INFO;
153,738✔
1418
  pMsgSendInfo->fp = dynProcessUseDbRsp;
153,738✔
1419
  pMsgSendInfo->requestId = pTaskInfo->id.queryId;
153,738✔
1420

1421
  code = asyncSendMsgToServer(pMsgCb->clientRpc, &pScanResInfo->vtbScan.epSet, NULL, pMsgSendInfo);
153,738✔
1422
  QUERY_CHECK_CODE(code, lino, _return);
153,738✔
1423

1424
  code = tsem_wait(&pScanResInfo->vtbScan.ready);
153,738✔
1425
  QUERY_CHECK_CODE(code, lino, _return);
153,738✔
1426

1427
  code = queryBuildUseDbOutput(output, pScanResInfo->vtbScan.pRsp);
153,738✔
1428
  QUERY_CHECK_CODE(code, lino, _return);
153,738✔
1429

1430
_return:
153,738✔
1431
  if (code) {
153,738✔
1432
     qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
1433
     taosMemoryFree(buf1);
×
1434
  }
1435
  taosMemoryFree(pReq);
153,738✔
1436
  tFreeSUsedbRsp(pScanResInfo->vtbScan.pRsp);
153,738✔
1437
  taosMemoryFreeClear(pScanResInfo->vtbScan.pRsp);
153,738✔
1438
  return code;
153,738✔
1439
}
1440

1441
int dynVgInfoComp(const void* lp, const void* rp) {
289,044✔
1442
  SVgroupInfo* pLeft = (SVgroupInfo*)lp;
289,044✔
1443
  SVgroupInfo* pRight = (SVgroupInfo*)rp;
289,044✔
1444
  if (pLeft->hashBegin < pRight->hashBegin) {
289,044✔
1445
    return -1;
289,044✔
UNCOV
1446
  } else if (pLeft->hashBegin > pRight->hashBegin) {
×
UNCOV
1447
    return 1;
×
1448
  }
1449

1450
  return 0;
×
1451
}
1452

1453
/*
 * Lazily build dbInfo->vgArray: a copy of every SVgroupInfo stored in dbInfo->vgHash,
 * sorted with sort_func.
 *
 * Nothing is done (and TSDB_CODE_SUCCESS is returned) when dbInfo is NULL, when there is
 * no vgHash, or when vgArray already exists.
 *
 * The array is assembled locally and only published once it is complete and sorted. If
 * copying an element fails, the hash iteration is cancelled, the partial array is
 * destroyed and the error captured at the point of failure is returned; vgArray stays
 * NULL so that a later call can retry. Previously a partially filled, unsorted array
 * remained attached to dbInfo and was treated as valid by subsequent calls.
 */
int32_t dynMakeVgArraySortBy(SDBVgInfo* dbInfo, __compar_fn_t sort_func) {
  if (NULL == dbInfo) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == dbInfo->vgHash || NULL != dbInfo->vgArray) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t vgSize = taosHashGetSize(dbInfo->vgHash);
  SArray* pVgArray = taosArrayInit(vgSize, sizeof(SVgroupInfo));
  if (NULL == pVgArray) {
    return terrno;
  }

  void* pIter = taosHashIterate(dbInfo->vgHash, NULL);
  while (pIter) {
    if (NULL == taosArrayPush(pVgArray, pIter)) {
      int32_t code = terrno;  // capture before the cleanup calls below
      taosHashCancelIterate(dbInfo->vgHash, pIter);
      taosArrayDestroy(pVgArray);
      return code;
    }

    pIter = taosHashIterate(dbInfo->vgHash, pIter);
  }

  taosArraySort(pVgArray, sort_func);
  dbInfo->vgArray = pVgArray;

  return TSDB_CODE_SUCCESS;
}
1480

1481
int32_t dynHashValueComp(void const* lp, void const* rp) {
1,427,273✔
1482
  uint32_t*    key = (uint32_t*)lp;
1,427,273✔
1483
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
1,427,273✔
1484

1485
  if (*key < pVg->hashBegin) {
1,427,273✔
1486
    return -1;
×
1487
  } else if (*key > pVg->hashEnd) {
1,427,273✔
1488
    return 1;
447,699✔
1489
  }
1490

1491
  return 0;
979,574✔
1492
}
1493

1494
/*
 * Resolve the vgroup id that table `tbName` of database `dbFName` hashes to.
 *
 * The sorted vgroup array of dbInfo is built on demand, the full name "<dbFName>.<tbName>"
 * is hashed with the database's hash settings, and the vgroup whose hash range contains
 * the value is looked up.
 *
 * Returns TSDB_CODE_SUCCESS with *vgId set, or
 *   - TSDB_CODE_CTG_INTERNAL_ERROR for a NULL argument, a full name that does not fit
 *     into TSDB_TABLE_FNAME_LEN, or a hash value that no vgroup range covers;
 *   - TSDB_CODE_TSC_DB_NOT_SELECTED when the database has no vgroups;
 *   - the error of dynMakeVgArraySortBy().
 *
 * Arguments are validated up front because dynMakeVgArraySortBy() accepts a NULL dbInfo
 * and reports success, after which dbInfo would have been dereferenced here.
 */
int32_t getVgId(SDBVgInfo* dbInfo, char* dbFName, int32_t* vgId, char *tbName) {
  int32_t code = 0;
  int32_t lino = 0;

  if (NULL == dbInfo || NULL == dbFName || NULL == vgId || NULL == tbName) {
    qError("%s invalid parameter, dbInfo:%p, dbFName:%p, vgId:%p, tbName:%p", __func__, (void*)dbInfo, (void*)dbFName,
           (void*)vgId, (void*)tbName);
    return TSDB_CODE_CTG_INTERNAL_ERROR;
  }

  code = dynMakeVgArraySortBy(dbInfo, dynVgInfoComp);
  QUERY_CHECK_CODE(code, lino, _return);

  int32_t vgNum = (int32_t)taosArrayGetSize(dbInfo->vgArray);
  if (vgNum <= 0) {
    qError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum);
    QUERY_CHECK_CODE(code = TSDB_CODE_TSC_DB_NOT_SELECTED, lino, _return);
  }

  SVgroupInfo* vgInfo = NULL;
  char         tbFullName[TSDB_TABLE_FNAME_LEN];
  int32_t      nameLen = snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbFName, tbName);
  if (nameLen < 0 || (size_t)nameLen >= sizeof(tbFullName)) {
    // hashing a truncated name would silently select the wrong vgroup
    qError("table full name too long, db:%s, table:%s", dbFName, tbName);
    return TSDB_CODE_CTG_INTERNAL_ERROR;
  }

  uint32_t hashValue = taosGetTbHashVal(tbFullName, nameLen, dbInfo->hashMethod,
                                        dbInfo->hashPrefix, dbInfo->hashSuffix);

  vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, dynHashValueComp, TD_EQ);
  if (NULL == vgInfo) {
    qError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName,
           (int32_t)taosArrayGetSize(dbInfo->vgArray));
    return TSDB_CODE_CTG_INTERNAL_ERROR;
  }

  *vgId = vgInfo->vgId;

_return:
  return code;
}
1527

1528
/*
 * Tell whether column `colId` has to be scanned by the virtual table scan.
 *
 * Returns true when the operator is flagged to scan all columns, or when
 * `colId` appears in the operator's readColList; false otherwise.
 */
bool colNeedScan(SOperatorInfo* pOperator, col_id_t colId) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SVtbScanDynCtrlInfo*       pScan = (SVtbScanDynCtrlInfo*)&pCtrl->vtbScan;

  if (pScan->scanAllCols) {
    return true;
  }

  SArray* pReadCols = pScan->readColList;
  int32_t idx = 0;
  while (idx < taosArrayGetSize(pReadCols)) {
    col_id_t* pId = (col_id_t*)taosArrayGet(pReadCols, idx);
    if (*pId == colId) {
      return true;
    }
    idx++;
  }
  return false;
}
1542

1543
/*
 * Return the vgroup information of the database named by `name`, using the
 * operator's dbVgInfoMap as a cache keyed by name->dbname.
 *
 * On a cache miss a new SUseDbOutput is allocated, filled by
 * buildDbVgInfoMap() and stored (by pointer) in the map.
 *
 * pOperator: dynamic query control operator that owns the cache.
 * name:      table name whose dbname selects the database.
 * dbVgInfo:  out; receives the cached/built SDBVgInfo pointer. The pointer is
 *            taken from the SUseDbOutput stored in the map.
 *
 * Returns TSDB_CODE_SUCCESS, or the error from the allocation,
 * buildDbVgInfoMap() or taosHashPut().
 */
int32_t getDbVgInfo(SOperatorInfo* pOperator, SName *name, SDBVgInfo **dbVgInfo) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SMsgCb*                    pMsgCb = pVtbScan->pMsgCb;
  SUseDbOutput*              output = NULL;
  size_t                     dbNameLen = strlen(name->dbname);
  SUseDbOutput**             find = (SUseDbOutput**)taosHashGet(pVtbScan->dbVgInfoMap, name->dbname, dbNameLen);

  QRY_PARAM_CHECK(dbVgInfo);

  if (find != NULL) {
    // cache hit
    *dbVgInfo = (*find)->dbVgroup;
    return code;
  }

  // Zero-initialised so that the failure path never sees garbage fields.
  output = taosMemoryCalloc(1, sizeof(SUseDbOutput));
  if (output == NULL) {
    // Nothing to release yet; do not hand a NULL pointer to freeUseDbOutput().
    code = terrno;
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    return code;
  }

  code = buildDbVgInfoMap(pOperator, pMsgCb, name, pTaskInfo, output);
  QUERY_CHECK_CODE(code, line, _return);
  code = taosHashPut(pVtbScan->dbVgInfoMap, name->dbname, dbNameLen, &output, POINTER_BYTES);
  QUERY_CHECK_CODE(code, line, _return);

  *dbVgInfo = output->dbVgroup;
  return code;

_return:
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  // NOTE(review): confirm freeUseDbOutput() expects the SUseDbOutput* itself
  // here (and not a pointer to the pointer, as a hash free callback would get).
  freeUseDbOutput(output);
  return code;
}
1572

1573
int32_t extractColRefName(const char *colref, char **refDb, char** refTb, char** refCol) {
2,115,018✔
1574
  int32_t     code = TSDB_CODE_SUCCESS;
2,115,018✔
1575
  int32_t     line = 0;
2,115,018✔
1576

1577
  const char *first_dot = strchr(colref, '.');
2,115,018✔
1578
  QUERY_CHECK_NULL(first_dot, code, line, _return, terrno)
2,115,018✔
1579

1580
  const char *second_dot = strchr(first_dot + 1, '.');
2,115,018✔
1581
  QUERY_CHECK_NULL(second_dot, code, line, _return, terrno)
2,115,018✔
1582

1583
  size_t db_len = first_dot - colref;
2,115,018✔
1584
  size_t table_len = second_dot - first_dot - 1;
2,115,018✔
1585
  size_t col_len = strlen(second_dot + 1);
2,115,018✔
1586

1587
  *refDb = taosMemoryMalloc(db_len + 1);
2,115,018✔
1588
  *refTb = taosMemoryMalloc(table_len + 1);
2,115,018✔
1589
  *refCol = taosMemoryMalloc(col_len + 1);
2,115,018✔
1590
  QUERY_CHECK_NULL(*refDb, code, line, _return, terrno)
2,115,018✔
1591
  QUERY_CHECK_NULL(*refTb, code, line, _return, terrno)
2,115,018✔
1592
  QUERY_CHECK_NULL(*refCol, code, line, _return, terrno)
2,115,018✔
1593

1594
  tstrncpy(*refDb, colref, db_len + 1);
2,115,018✔
1595
  tstrncpy(*refTb, first_dot + 1, table_len + 1);
2,115,018✔
1596
  tstrncpy(*refCol, second_dot + 1, col_len + 1);
2,115,018✔
1597

1598
  return TSDB_CODE_SUCCESS;
2,115,018✔
UNCOV
1599
_return:
×
UNCOV
1600
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
UNCOV
1601
  if (*refDb) {
×
UNCOV
1602
    taosMemoryFree(*refDb);
×
UNCOV
1603
    *refDb = NULL;
×
1604
  }
UNCOV
1605
  if (*refTb) {
×
UNCOV
1606
    taosMemoryFree(*refTb);
×
UNCOV
1607
    *refTb = NULL;
×
1608
  }
UNCOV
1609
  if (*refCol) {
×
UNCOV
1610
    taosMemoryFree(*refCol);
×
UNCOV
1611
    *refCol = NULL;
×
1612
  }
UNCOV
1613
  return code;
×
1614
}
1615

1616
/*
 * Check whether a (database, table) pair read from a data block matches the
 * expected names.
 *
 * dbName, tbName: length-prefixed var-data values (read through
 *                 varDataVal/varDataLen).
 * expectDbName, expectTbName: NUL-terminated names to compare against.
 *
 * Returns true only when both names are equal in content and in length.
 */
bool tableInfoNeedCollect(char *dbName, char *tbName, char *expectDbName, char *expectTbName) {
  if (strncmp(varDataVal(tbName), expectTbName, varDataLen(tbName)) != 0) {
    return false;
  }
  // Equal prefix is not enough: the expected name must not be longer.
  if (strlen(expectTbName) != varDataLen(tbName)) {
    return false;
  }
  if (strncmp(varDataVal(dbName), expectDbName, varDataLen(dbName)) != 0) {
    return false;
  }
  return strlen(expectDbName) == varDataLen(dbName);
}
1625

1626
/*
 * Fill `pInfo` from row `index` of a data block.
 *
 * Columns 3..8 of the block are used (by local variable name: column name,
 * uid, column id, column reference, vgroup id, reference version). All six
 * must exist, although the reference version column is not read here.
 *
 * pInfo->colName is always allocated; pInfo->colrefName is allocated only
 * when the reference column is not NULL for this row, otherwise it is set to
 * NULL. uid/colId/vgId are written only when their column is not NULL, so the
 * caller should zero-initialise `pInfo`.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when a column is missing or an
 * allocation fails. On failure no string allocated by this call is left
 * behind in `pInfo`.
 */
int32_t getColRefInfo(SColRefInfo *pInfo, SArray* pDataBlock, int32_t index) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          line = 0;

  SColumnInfoData *pColNameCol = taosArrayGet(pDataBlock, 3);
  SColumnInfoData *pUidCol = taosArrayGet(pDataBlock, 4);
  SColumnInfoData *pColIdCol = taosArrayGet(pDataBlock, 5);
  SColumnInfoData *pRefCol = taosArrayGet(pDataBlock, 6);
  SColumnInfoData *pVgIdCol = taosArrayGet(pDataBlock, 7);
  SColumnInfoData *pRefVerCol = taosArrayGet(pDataBlock, 8);

  QUERY_CHECK_NULL(pColNameCol, code, line, _return, terrno)
  QUERY_CHECK_NULL(pUidCol, code, line, _return, terrno)
  QUERY_CHECK_NULL(pColIdCol, code, line, _return, terrno)
  QUERY_CHECK_NULL(pRefCol, code, line, _return, terrno)
  QUERY_CHECK_NULL(pVgIdCol, code, line, _return, terrno)
  QUERY_CHECK_NULL(pRefVerCol, code, line, _return, terrno)

  if (colDataIsNull_s(pRefCol, index)) {
    pInfo->colrefName = NULL;
  } else {
    char* pRefRaw = colDataGetData(pRefCol, index);
    // varDataTLen = header + payload, which leaves room for the terminator.
    pInfo->colrefName = taosMemoryCalloc(varDataTLen(pRefRaw), 1);
    QUERY_CHECK_NULL(pInfo->colrefName, code, line, _return, terrno)
    memcpy(pInfo->colrefName, varDataVal(pRefRaw), varDataLen(pRefRaw));
    pInfo->colrefName[varDataLen(pRefRaw)] = 0;
  }

  char* pNameRaw = colDataGetData(pColNameCol, index);
  pInfo->colName = taosMemoryCalloc(varDataTLen(pNameRaw), 1);
  if (pInfo->colName == NULL) {
    code = terrno;
    line = __LINE__;
    // Do not leak the reference name allocated above.
    taosMemoryFree(pInfo->colrefName);
    pInfo->colrefName = NULL;
    goto _return;
  }
  memcpy(pInfo->colName, varDataVal(pNameRaw), varDataLen(pNameRaw));
  pInfo->colName[varDataLen(pNameRaw)] = 0;

  if (!colDataIsNull_s(pUidCol, index)) {
    GET_TYPED_DATA(pInfo->uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, index), 0);
  }
  if (!colDataIsNull_s(pColIdCol, index)) {
    GET_TYPED_DATA(pInfo->colId, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pColIdCol, index), 0);
  }
  if (!colDataIsNull_s(pVgIdCol, index)) {
    GET_TYPED_DATA(pInfo->vgId, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pVgIdCol, index), 0);
  }

_return:
  return code;
}
1671

1672
/*
 * Compare the vgroups referenced by the current scan (curOrgTbVg) with the
 * ones already known (existOrgTbVg) and, for stream tasks, request a
 * redeploy when new vgroups appeared.
 *
 * Does nothing when the task has no stream runtime info. The first time it
 * runs with an empty existOrgTbVg, curOrgTbVg simply becomes existOrgTbVg.
 *
 * When curOrgTbVg holds vgroup ids missing from existOrgTbVg, they are
 * collected into an array (together with any node ids taken from
 * vtableDeployInfo.addedVgInfo) and published through
 * vtableDeployInfo.addVgIds if that slot is empty. The deploy uid and
 * vtableDeployGot flag are then set, curOrgTbVg is cleared, needRedeploy and
 * rversion are recorded, and TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY is
 * returned.
 *
 * Returns TSDB_CODE_SUCCESS when nothing changed, the redeploy code above, or
 * terrno on allocation failure.
 */
int32_t processOrgTbVg(SVtbScanDynCtrlInfo* pVtbScan, SExecTaskInfo* pTaskInfo, int32_t rversion) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SArray*                    tmpArray = NULL;  // owned here until published to addVgIds

  if (pTaskInfo->pStreamRuntimeInfo == NULL) {
    return code;
  }

  if (pVtbScan->existOrgTbVg == NULL) {
    pVtbScan->existOrgTbVg = pVtbScan->curOrgTbVg;
    pVtbScan->curOrgTbVg = NULL;
  }

  if (pVtbScan->curOrgTbVg == NULL) {
    return code;
  }

  // which means rversion has changed
  void* pCurIter = NULL;
  while ((pCurIter = taosHashIterate(pVtbScan->curOrgTbVg, pCurIter))) {
    int32_t* vgId = (int32_t*)taosHashGetKey(pCurIter, NULL);
    if (taosHashGet(pVtbScan->existOrgTbVg, vgId, sizeof(int32_t)) != NULL) {
      continue;
    }
    if (tmpArray == NULL) {
      tmpArray = taosArrayInit(1, sizeof(int32_t));
    }
    if (tmpArray == NULL || taosArrayPush(tmpArray, vgId) == NULL) {
      code = terrno;
      line = __LINE__;
      // Leaving the loop early: release the iterator before bailing out.
      taosHashCancelIterate(pVtbScan->curOrgTbVg, pCurIter);
      goto _return;
    }
  }

  if (tmpArray == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  if (pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addVgIds == NULL) {
    SArray* expiredInfo = atomic_load_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo);
    if (expiredInfo && expiredInfo == atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo, expiredInfo, NULL)) {
      // We detached expiredInfo, so we own it from here on.
      for (int32_t i = 0; i < taosArrayGetSize(expiredInfo); i++) {
        SStreamTaskAddr* vgInfo = (SStreamTaskAddr*)taosArrayGet(expiredInfo, i);
        if (taosArrayPush(tmpArray, &vgInfo->nodeId) == NULL) {
          code = terrno;
          line = __LINE__;
          taosArrayDestroy(expiredInfo);
          goto _return;
        }
      }
      taosArrayDestroy(expiredInfo);
    }
    if (atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addVgIds, NULL, tmpArray) == NULL) {
      // Published: ownership moved to vtableDeployInfo.
      tmpArray = NULL;
    }
  }
  // Still ours when the slot was occupied or the exchange lost; release it.
  taosArrayDestroy(tmpArray);
  tmpArray = NULL;

  atomic_store_64(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.uid, (int64_t)(pVtbScan->isSuperTable ? pVtbScan->suid : pVtbScan->uid));
  (void)atomic_val_compare_exchange_8(pTaskInfo->pStreamRuntimeInfo->vtableDeployGot, 0, 1);
  taosHashClear(pVtbScan->curOrgTbVg);
  pVtbScan->needRedeploy = true;
  pVtbScan->rversion = rversion;
  return TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY;

_return:
  taosArrayDestroy(tmpArray);
  qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  return code;
}
1727

1728
/*
 * Resolve the vgroup id of the original table named in a column reference.
 *
 * colRef: "<db>.<table>.<column>" reference string.
 * vgId:   out; written by getVgId() on success.
 *
 * The reference is split with extractColRefName(), turned into an SName with
 * the operator's account id, and looked up through getDbVgInfo()/getVgId().
 * The temporary name strings are always released before returning.
 *
 * Returns TSDB_CODE_SUCCESS or the first error reported by a helper.
 */
int32_t getVgIdFromColref(SOperatorInfo* pOperator, const char* colRef, int32_t* vgId) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SDBVgInfo*                 pDbVg = NULL;
  SName                      orgName = {0};
  char                       fullDbName[TSDB_DB_FNAME_LEN] = {0};
  char*                      dbPart = NULL;
  char*                      tbPart = NULL;
  char*                      colPart = NULL;
  int32_t                    line = 0;

  int32_t code = extractColRefName(colRef, &dbPart, &tbPart, &colPart);
  QUERY_CHECK_CODE(code, line, _return);

  toName(pCtrl->vtbScan.acctId, dbPart, tbPart, &orgName);

  code = getDbVgInfo(pOperator, &orgName, &pDbVg);
  QUERY_CHECK_CODE(code, line, _return);

  code = tNameGetFullDbName(&orgName, fullDbName);
  QUERY_CHECK_CODE(code, line, _return);

  code = getVgId(pDbVg, fullDbName, vgId, orgName.tname);
  QUERY_CHECK_CODE(code, line, _return);

_return:
  taosMemoryFree(dbPart);
  taosMemoryFree(tbPart);
  taosMemoryFree(colPart);
  return code;
}
1759

1760
/*
 * Read every block from the second downstream operator and group the column
 * reference rows of the scanned virtual super table by child table.
 *
 * For each row whose super table / database name matches the operator's
 * tbName / dbName, a SColRefInfo is built with getColRefInfo() and appended
 * to the per-child-table array stored in childTableList; childTableMap maps
 * the child table name to its index in that list. Rows whose uid differs
 * from a non-zero dynTbUid are skipped.
 *
 * For stream tasks the vgroup of every referenced original table is recorded
 * in curOrgTbVg, and processOrgTbVg() is called at the end; its result
 * (possibly TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY) is returned.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
int32_t buildVirtualSuperTableScanChildTableMap(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SArray*                    pColRefArray = NULL;
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[1];
  SColRefInfo                info = {0};
  bool                       infoPending = false;  // `info` owns strings not yet stored in childTableList

  pVtbScan->childTableMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  QUERY_CHECK_NULL(pVtbScan->childTableMap, code, line, _return, terrno)

  while (true) {
    SSDataBlock *pChildInfo = NULL;
    code = pSystableScanOp->fpSet.getNextFn(pSystableScanOp, &pChildInfo);
    QUERY_CHECK_CODE(code, line, _return);
    if (pChildInfo == NULL) {
      break;
    }
    SColumnInfoData *pTableNameCol = taosArrayGet(pChildInfo->pDataBlock, 0);
    SColumnInfoData *pStbNameCol = taosArrayGet(pChildInfo->pDataBlock, 1);
    SColumnInfoData *pDbNameCol = taosArrayGet(pChildInfo->pDataBlock, 2);

    QUERY_CHECK_NULL(pTableNameCol, code, line, _return, terrno)
    QUERY_CHECK_NULL(pStbNameCol, code, line, _return, terrno)
    QUERY_CHECK_NULL(pDbNameCol, code, line, _return, terrno)

    for (int32_t i = 0; i < pChildInfo->info.rows; i++) {
      if (colDataIsNull_s(pStbNameCol, i)) {
        continue;
      }
      char* stbrawname = colDataGetData(pStbNameCol, i);
      char* dbrawname = colDataGetData(pDbNameCol, i);
      char *ctbName = colDataGetData(pTableNameCol, i);

      if (!tableInfoNeedCollect(dbrawname, stbrawname, pInfo->vtbScan.dbName, pInfo->vtbScan.tbName)) {
        continue;
      }

      memset(&info, 0, sizeof(info));
      code = getColRefInfo(&info, pChildInfo->pDataBlock, i);
      QUERY_CHECK_CODE(code, line, _return);
      infoPending = true;

      if (pInfo->vtbScan.dynTbUid != 0 && info.uid != pInfo->vtbScan.dynTbUid) {
        destroyColRefInfo(&info);
        infoPending = false;
        continue;
      }

      if (pTaskInfo->pStreamRuntimeInfo) {
        if (pVtbScan->curOrgTbVg == NULL) {
          pVtbScan->curOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
          QUERY_CHECK_NULL(pVtbScan->curOrgTbVg, code, line, _return, terrno)
        }

        if (info.colrefName) {
          int32_t vgId;
          code = getVgIdFromColref(pOperator, info.colrefName, &vgId);
          QUERY_CHECK_CODE(code, line, _return);
          code = taosHashPut(pVtbScan->curOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
          QUERY_CHECK_CODE(code, line, _return);
        }
      }

      int32_t* pTableIdx = (int32_t*)taosHashGet(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName));
      if (pTableIdx == NULL) {
        // First row of this child table: create its array and register it.
        pColRefArray = taosArrayInit(1, sizeof(SColRefInfo));
        QUERY_CHECK_NULL(pColRefArray, code, line, _return, terrno)
        int32_t tableIdx = (int32_t)taosArrayGetSize(pVtbScan->childTableList);
        if (taosArrayPush(pColRefArray, &info) == NULL ||
            taosArrayPush(pVtbScan->childTableList, &pColRefArray) == NULL) {
          code = terrno;
          line = __LINE__;
          // The array only holds a shallow copy; the strings are released via `info` below.
          taosArrayDestroy(pColRefArray);
          goto _return;
        }
        infoPending = false;
        code = taosHashPut(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName), &tableIdx, sizeof(tableIdx));
        QUERY_CHECK_CODE(code, line, _return);
      } else {
        pColRefArray = (SArray *)taosArrayGetP(pVtbScan->childTableList, *pTableIdx);
        QUERY_CHECK_NULL(pColRefArray, code, line, _return, terrno)
        QUERY_CHECK_NULL(taosArrayPush(pColRefArray, &info), code, line, _return, terrno)
        infoPending = false;
      }
    }
  }

  code = processOrgTbVg(pVtbScan, pTaskInfo, 1);
  QUERY_CHECK_CODE(code, line, _return);

_return:
  if (infoPending) {
    // An error hit between building `info` and storing it: free its strings.
    destroyColRefInfo(&info);
  }
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
1848

1849
/*
 * Read every block from the second downstream operator and collect the
 * column reference rows of the scanned virtual normal/child table into
 * pInfo->vtbScan.colRefInfo.
 *
 * Rows are selected by table and database name (tableInfoNeedCollect). The
 * last non-NULL value of the reference version column is remembered as
 * `rversion`. When that version differs from the stored one, or no
 * existOrgTbVg set exists yet, the vgroup of each referenced original table
 * is recorded in curOrgTbVg. processOrgTbVg() is called at the end and its
 * result (possibly TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY) is returned.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered, including an
 * error reported by the downstream operator.
 */
int32_t buildVirtualNormalChildTableScanChildTableMap(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[1];
  int32_t                    rversion = 0;
  SColRefInfo                info = {0};
  bool                       infoPending = false;  // `info` owns strings not yet stored in colRefInfo

  pInfo->vtbScan.colRefInfo = taosArrayInit(1, sizeof(SColRefInfo));
  QUERY_CHECK_NULL(pInfo->vtbScan.colRefInfo, code, line, _return, terrno)

  while (true) {
    SSDataBlock *pTableInfo = NULL;
    code = pSystableScanOp->fpSet.getNextFn(pSystableScanOp, &pTableInfo);
    // A downstream failure must not be overwritten by later calls.
    QUERY_CHECK_CODE(code, line, _return);
    if (pTableInfo == NULL) {
      break;
    }

    SColumnInfoData *pTableNameCol = taosArrayGet(pTableInfo->pDataBlock, 0);
    SColumnInfoData *pDbNameCol = taosArrayGet(pTableInfo->pDataBlock, 2);
    SColumnInfoData *pRefVerCol = taosArrayGet(pTableInfo->pDataBlock, 8);

    QUERY_CHECK_NULL(pTableNameCol, code, line, _return, terrno)
    QUERY_CHECK_NULL(pDbNameCol, code, line, _return, terrno)
    QUERY_CHECK_NULL(pRefVerCol, code, line, _return, terrno)

    for (int32_t i = 0; i < pTableInfo->info.rows; i++) {
      if (!colDataIsNull_s(pRefVerCol, i)) {
        GET_TYPED_DATA(rversion, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pRefVerCol, i), 0);
      }

      if (colDataIsNull_s(pTableNameCol, i)) {
        continue;
      }
      char* tbrawname = colDataGetData(pTableNameCol, i);
      char* dbrawname = colDataGetData(pDbNameCol, i);
      QUERY_CHECK_NULL(tbrawname, code, line, _return, terrno)
      QUERY_CHECK_NULL(dbrawname, code, line, _return, terrno)

      if (!tableInfoNeedCollect(dbrawname, tbrawname, pInfo->vtbScan.dbName, pInfo->vtbScan.tbName)) {
        continue;
      }

      memset(&info, 0, sizeof(info));
      code = getColRefInfo(&info, pTableInfo->pDataBlock, i);
      QUERY_CHECK_CODE(code, line, _return);
      infoPending = true;

      if ((rversion != pVtbScan->rversion || pVtbScan->existOrgTbVg == NULL) && info.colrefName) {
        if (pVtbScan->curOrgTbVg == NULL) {
          pVtbScan->curOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
          QUERY_CHECK_NULL(pVtbScan->curOrgTbVg, code, line, _return, terrno)
        }
        int32_t vgId;
        code = getVgIdFromColref(pOperator, info.colrefName, &vgId);
        QUERY_CHECK_CODE(code, line, _return);
        code = taosHashPut(pVtbScan->curOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
        QUERY_CHECK_CODE(code, line, _return);
      }

      QUERY_CHECK_NULL(taosArrayPush(pInfo->vtbScan.colRefInfo, &info), code, line, _return, terrno)
      infoPending = false;
    }
  }
  code = processOrgTbVg(pVtbScan, pTaskInfo, rversion);
  QUERY_CHECK_CODE(code, line, _return);

_return:
  if (infoPending) {
    // An error hit between building `info` and storing it: free its strings.
    destroyColRefInfo(&info);
  }
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
1919

1920
/*
 * Consume the list of newly added vgroup task addresses published in
 * vtableDeployInfo.addedVgInfo, if any.
 *
 * When a list is available and this caller wins the exchange that detaches
 * it, every node id is added to existOrgTbVg and the full address is stored
 * in newAddedVgInfo (created on demand); needRedeploy is then cleared and
 * TSDB_CODE_SUCCESS is returned.
 *
 * When no list is available (or another party detached it first),
 * TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY is returned.
 *
 * NOTE(review): pTaskInfo->pStreamRuntimeInfo is dereferenced unconditionally;
 * presumably callers only use this for stream tasks - confirm.
 */
int32_t virtualTableScanCheckNeedRedeploy(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;

  // pAdded is non-NULL only when we detached the list ourselves and thus own it.
  SArray* pAdded = NULL;
  SArray* pSeen = atomic_load_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo);
  if (pSeen && pSeen == atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo, pSeen, NULL)) {
    pAdded = pSeen;
  }

  if (pAdded == NULL) {
    code = TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY;
    QUERY_CHECK_CODE(code, line, _return);
  }

  for (int32_t i = 0; i < taosArrayGetSize(pAdded); i++) {
    SStreamTaskAddr* pTaskAddr = (SStreamTaskAddr*)taosArrayGet(pAdded, i);
    code = taosHashPut(pVtbScan->existOrgTbVg, &pTaskAddr->nodeId, sizeof(pTaskAddr->nodeId), NULL, 0);
    QUERY_CHECK_CODE(code, line, _return);
    if (pVtbScan->newAddedVgInfo == NULL) {
      pVtbScan->newAddedVgInfo = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
      QUERY_CHECK_NULL(pVtbScan->newAddedVgInfo, code, line, _return, terrno)
    }
    code = taosHashPut(pVtbScan->newAddedVgInfo, &pTaskAddr->nodeId, sizeof(pTaskAddr->nodeId), pTaskAddr, sizeof(SStreamTaskAddr));
    QUERY_CHECK_CODE(code, line, _return);
  }
  pVtbScan->needRedeploy = false;

_return:
  // Only an array we detached is released; a list we merely observed is left alone.
  taosArrayDestroy(pAdded);
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
1955

1956
/*
 * Group the referenced columns of one virtual table by original table.
 *
 * pColRefInfo: array of SColRefInfo for the table being scanned.
 * uid, vgId:   out; overwritten with the uid/vgId of every entry visited, so
 *              they end up holding the values of the last entry.
 *
 * For every entry that has a column reference and whose column needs to be
 * scanned (colNeedScan), the reference is split into db/table/column, and a
 * (colId, original column name) pair is appended to the SOrgTbInfo stored in
 * pVtbScan->orgTbVgColMap under the original table's full name. A missing
 * SOrgTbInfo is created with the vgroup id resolved by getVgId().
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered; temporary
 * strings and a not-yet-stored column array are released on every path.
 */
int32_t virtualTableScanProcessColRefInfo(SOperatorInfo* pOperator, SArray* pColRefInfo, tb_uid_t* uid, int32_t* vgId) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SDBVgInfo*                 dbVgInfo = NULL;
  // Function scope so that the error path can release them.
  char*                      refDbName = NULL;
  char*                      refTbName = NULL;
  char*                      refColName = NULL;

  for (int32_t j = 0; j < taosArrayGetSize(pColRefInfo); j++) {
    SColRefInfo *pKV = (SColRefInfo*)taosArrayGet(pColRefInfo, j);
    *uid = pKV->uid;
    *vgId = pKV->vgId;
    if (pKV->colrefName == NULL || !colNeedScan(pOperator, pKV->colId)) {
      continue;
    }

    SName   name = {0};
    char    dbFname[TSDB_DB_FNAME_LEN] = {0};
    char    orgTbFName[TSDB_TABLE_FNAME_LEN] = {0};

    code = extractColRefName(pKV->colrefName, &refDbName, &refTbName, &refColName);
    QUERY_CHECK_CODE(code, line, _return);

    toName(pInfo->vtbScan.acctId, refDbName, refTbName, &name);

    code = getDbVgInfo(pOperator, &name, &dbVgInfo);
    QUERY_CHECK_CODE(code, line, _return);
    code = tNameGetFullDbName(&name, dbFname);
    QUERY_CHECK_CODE(code, line, _return);
    code = tNameGetFullTableName(&name, orgTbFName);
    QUERY_CHECK_CODE(code, line, _return);

    SColIdNameKV colIdNameKV = {0};
    colIdNameKV.colId = pKV->colId;
    tstrncpy(colIdNameKV.colName, refColName, sizeof(colIdNameKV.colName));

    // The key is the whole zero-padded buffer, not just the string length.
    SOrgTbInfo* tbInfo = (SOrgTbInfo*)taosHashGet(pVtbScan->orgTbVgColMap, orgTbFName, sizeof(orgTbFName));
    if (tbInfo != NULL) {
      QUERY_CHECK_NULL(taosArrayPush(tbInfo->colMap, &colIdNameKV), code, line, _return, terrno)
    } else {
      SOrgTbInfo map = {0};
      code = getVgId(dbVgInfo, dbFname, &map.vgId, name.tname);
      QUERY_CHECK_CODE(code, line, _return);
      tstrncpy(map.tbName, orgTbFName, sizeof(map.tbName));
      map.colMap = taosArrayInit(10, sizeof(SColIdNameKV));
      QUERY_CHECK_NULL(map.colMap, code, line, _return, terrno)
      if (taosArrayPush(map.colMap, &colIdNameKV) == NULL) {
        code = terrno;
        line = __LINE__;
        taosArrayDestroy(map.colMap);
        goto _return;
      }
      code = taosHashPut(pVtbScan->orgTbVgColMap, orgTbFName, sizeof(orgTbFName), &map, sizeof(map));
      if (code != TSDB_CODE_SUCCESS) {
        line = __LINE__;
        // The map never took ownership of the column array.
        taosArrayDestroy(map.colMap);
        goto _return;
      }
    }

    taosMemoryFree(refDbName);
    taosMemoryFree(refTbName);
    taosMemoryFree(refColName);
    refDbName = NULL;
    refTbName = NULL;
    refColName = NULL;
  }

_return:
  // Non-NULL only when an error interrupted an iteration.
  taosMemoryFree(refDbName);
  taosMemoryFree(refTbName);
  taosMemoryFree(refColName);
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
2020

2021
/*
 * Build the operator parameter passed to the downstream virtual table scan
 * for one table.
 *
 * uid:  table uid handed to buildVtbScanOperatorParam() and to the tag scan
 *       exchange parameter.
 * vgId: vgroup id used for the tag scan exchange parameter.
 *
 * pVtbScan->vtbScanParam is rebuilt from scratch. One exchange parameter is
 * appended to its pOpParamArray for every original table in orgTbVgColMap;
 * when the table's vgroup is listed in newAddedVgInfo, the extended builder
 * is used with that address and the entry is removed from newAddedVgInfo.
 * Finally the tag scan exchange parameter is stored in pTagScanOp.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
int32_t virtualTableScanBuildDownStreamOpParam(SOperatorInfo* pOperator, tb_uid_t uid, int32_t vgId) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  void*                      pIter = NULL;

  pVtbScan->vtbScanParam = NULL;
  code = buildVtbScanOperatorParam(pInfo, &pVtbScan->vtbScanParam, uid);
  QUERY_CHECK_CODE(code, line, _return);

  for (pIter = taosHashIterate(pVtbScan->orgTbVgColMap, NULL); pIter != NULL;
       pIter = taosHashIterate(pVtbScan->orgTbVgColMap, pIter)) {
    SOrgTbInfo*      pMap = (SOrgTbInfo*)pIter;
    SOperatorParam*  pOrgTbParam = NULL;
    SStreamTaskAddr* addr = taosHashGet(pVtbScan->newAddedVgInfo, &pMap->vgId, sizeof(pMap->vgId));
    if (addr != NULL) {
      code = buildExchangeOperatorParamForVScanEx(&pOrgTbParam, 0, pMap, pTaskInfo->id.taskId, addr);
      QUERY_CHECK_CODE(code, line, _return);
      code = taosHashRemove(pVtbScan->newAddedVgInfo, &pMap->vgId, sizeof(pMap->vgId));
      QUERY_CHECK_CODE(code, line, _return);
    } else {
      code = buildExchangeOperatorParamForVScan(&pOrgTbParam, 0, pMap);
      QUERY_CHECK_CODE(code, line, _return);
    }
    QUERY_CHECK_NULL(taosArrayPush(((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pOpParamArray, &pOrgTbParam), code, line, _return, terrno)
  }

  SOperatorParam*  pTagScanParam = NULL;
  code = buildExchangeOperatorParamForVTagScan(&pTagScanParam, 0, vgId, uid);
  QUERY_CHECK_CODE(code, line, _return);
  ((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pTagScanOp = pTagScanParam;

_return:
  if (pIter != NULL) {
    // Only reachable when an error left the loop early: release the iterator.
    taosHashCancelIterate(pVtbScan->orgTbVgColMap, pIter);
  }
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
2061

2062
/*
 * Fetch the next result block of the virtual table scan.
 *
 * While curTableIdx equals lastTableIdx the downstream scan is simply asked
 * for its next block. Otherwise a new table is started: its column reference
 * list is processed, downstream parameters are built, the downstream
 * operator is reset to OP_NOT_OPENED and queried through getNextExtFn.
 *
 * A non-NULL *pRes ends the call and pins lastTableIdx to the current table.
 * A NULL *pRes advances curTableIdx; the operator is marked completed when
 * the scan is not on a super table or all child tables were visited.
 *
 * orgTbVgColMap is created at the start of the call and destroyed before
 * returning. Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
int32_t virtualTableScanGetNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SVtbScanDynCtrlInfo*       pScan = (SVtbScanDynCtrlInfo*)&pCtrl->vtbScan;
  SOperatorInfo*             pDownOp = pOperator->pDownstream[0];

  pScan->orgTbVgColMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  QUERY_CHECK_NULL(pScan->orgTbVgColMap, code, line, _return, terrno)
  taosHashSetFreeFp(pScan->orgTbVgColMap, destroyOrgTbInfo);

  for (;;) {
    if (pScan->curTableIdx != pScan->lastTableIdx) {
      // Switching to another table: rebuild the per-table state first.
      taosHashClear(pScan->orgTbVgColMap);

      SArray* pRefs = pScan->isSuperTable
                          ? (SArray*)taosArrayGetP(pScan->childTableList, pScan->curTableIdx)
                          : pCtrl->vtbScan.colRefInfo;
      QUERY_CHECK_NULL(pRefs, code, line, _return, terrno)

      tb_uid_t tbUid = 0;
      int32_t  tbVgId = 0;
      code = virtualTableScanProcessColRefInfo(pOperator, pRefs, &tbUid, &tbVgId);
      QUERY_CHECK_CODE(code, line, _return);

      code = virtualTableScanBuildDownStreamOpParam(pOperator, tbUid, tbVgId);
      QUERY_CHECK_CODE(code, line, _return);

      // reset downstream operator's status
      pDownOp->status = OP_NOT_OPENED;
      code = pDownOp->fpSet.getNextExtFn(pDownOp, pScan->vtbScanParam, pRes);
      QUERY_CHECK_CODE(code, line, _return);
    } else {
      code = pDownOp->fpSet.getNextFn(pDownOp, pRes);
      QUERY_CHECK_CODE(code, line, _return);
    }

    if (*pRes) {
      // has result, still read data from this table.
      pScan->lastTableIdx = pScan->curTableIdx;
      break;
    }

    // no result, read next table.
    pScan->curTableIdx++;
    if (!pScan->isSuperTable || pScan->curTableIdx >= taosArrayGetSize(pScan->childTableList)) {
      setOperatorCompleted(pOperator);
      break;
    }
  }

_return:
  taosHashCleanup(pScan->orgTbVgColMap);
  pScan->orgTbVgColMap = NULL;
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
2128

2129
/*
 * Open callback of the virtual-table-scan flavour of the dynamic query
 * control operator.
 *
 * Does nothing when the operator is already marked as opened. Otherwise it
 * builds the child-table map (super-table or normal/child-table variant,
 * selected by pVtbScan->isSuperTable) and marks the operator as opened.
 * The elapsed time is stored in cost.openCost while that field is still 0.
 *
 * On failure the error is logged, stored in the task info and control leaves
 * through T_LONG_JMP; the trailing return only covers the success path.
 */
int32_t vtbScanOpen(SOperatorInfo* pOperator) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SVtbScanDynCtrlInfo*       pScan = (SVtbScanDynCtrlInfo*)&pCtrl->vtbScan;
  int64_t                    startUs = 0;
  int32_t                    lino = 0;
  int32_t                    code = TSDB_CODE_SUCCESS;

  if (OPTR_IS_OPENED(pOperator)) {
    return code;
  }

  // only time the open phase while no cost has been recorded yet
  if (pOperator->cost.openCost == 0) {
    startUs = taosGetTimestampUs();
  }

  code = pScan->isSuperTable ? buildVirtualSuperTableScanChildTableMap(pOperator)
                             : buildVirtualNormalChildTableScanChildTableMap(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);

  OPTR_SET_OPENED(pOperator);

_end:
  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - startUs) / 1000.0;
  }
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), lino);
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }
  return code;
}
2165

2166
/*
 * getNext callback of the virtual-table-scan dynamic query control operator.
 *
 * - Without a pending get-param: returns immediately (with *pRes cleared by
 *   QRY_PARAM_CHECK) when the operator is already done, otherwise resets the
 *   scan window to the empty range [INT64_MAX, INT64_MIN].
 * - With a pending get-param: re-arms a finished operator, rewinds the table
 *   cursor, takes the window from the param and clears the param pointer.
 *
 * Afterwards it optionally runs the redeploy check, opens the operator and
 * pulls the next block via virtualTableScanGetNext. A super table with an
 * empty child-table list completes the operator right away.
 *
 * Errors are logged, stored in the task info and raised through T_LONG_JMP.
 */
int32_t vtbScanNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SVtbScanDynCtrlInfo*       pScan = (SVtbScanDynCtrlInfo*)&pCtrl->vtbScan;
  int32_t                    lino = 0;
  int32_t                    code = TSDB_CODE_SUCCESS;

  QRY_PARAM_CHECK(pRes);

  if (pOperator->pOperatorGetParam == NULL) {
    if (pOperator->status == OP_EXEC_DONE) {
      return code;
    }
    pScan->window.skey = INT64_MAX;
    pScan->window.ekey = INT64_MIN;
  } else {
    SDynQueryCtrlOperatorParam* pDynParam = (SDynQueryCtrlOperatorParam*)pOperator->pOperatorGetParam->value;

    if (pOperator->status == OP_EXEC_DONE) {
      pOperator->status = OP_OPENED;
    }
    pScan->curTableIdx = 0;
    pScan->lastTableIdx = -1;
    pScan->window = pDynParam->window;
    pOperator->pOperatorGetParam = NULL;
  }

  if (pScan->needRedeploy) {
    code = virtualTableScanCheckNeedRedeploy(pOperator);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _end);

  // a virtual super table without any child table has nothing to scan
  if (pScan->isSuperTable && taosArrayGetSize(pScan->childTableList) == 0) {
    setOperatorCompleted(pOperator);
    return code;
  }

  code = virtualTableScanGetNext(pOperator, pRes);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), lino);
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }
  return code;
}
2215

2216
/*
 * Create the lookup tables used by the sequential super-table join context.
 *
 * batchFetch == true : leftHash / rightHash, hashed with the INT hash function.
 * batchFetch == false: leftCache / rightCache / onceTable, hashed with the
 *                      BIGINT hash function.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno as soon as one table cannot be created.
 * Tables created before the failing one stay attached to pPrev.
 */
int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) {
  if (!batchFetch) {
    pPrev->leftCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pPrev->leftCache == NULL) {
      return terrno;
    }

    pPrev->rightCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pPrev->rightCache == NULL) {
      return terrno;
    }

    pPrev->onceTable = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pPrev->onceTable == NULL) {
      return terrno;
    }

    return TSDB_CODE_SUCCESS;
  }

  pPrev->leftHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  if (pPrev->leftHash == NULL) {
    return terrno;
  }

  pPrev->rightHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  if (pPrev->rightHash == NULL) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
2243

2244
static int32_t initVtbScanInfo(SDynQueryCtrlOperatorInfo* pInfo, SMsgCb* pMsgCb,
149,338✔
2245
                               SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) {
2246
  int32_t      code = TSDB_CODE_SUCCESS;
149,338✔
2247
  int32_t      line = 0;
149,338✔
2248

2249
  code = tsem_init(&pInfo->vtbScan.ready, 0, 0);
149,338✔
2250
  QUERY_CHECK_CODE(code, line, _return);
149,338✔
2251

2252
  pInfo->vtbScan.scanAllCols = pPhyciNode->vtbScan.scanAllCols;
149,338✔
2253
  pInfo->vtbScan.isSuperTable = pPhyciNode->vtbScan.isSuperTable;
149,338✔
2254
  pInfo->vtbScan.rversion = pPhyciNode->vtbScan.rversion;
149,338✔
2255
  pInfo->vtbScan.uid = pPhyciNode->vtbScan.uid;
149,338✔
2256
  pInfo->vtbScan.suid = pPhyciNode->vtbScan.suid;
149,338✔
2257
  pInfo->vtbScan.epSet = pPhyciNode->vtbScan.mgmtEpSet;
149,338✔
2258
  pInfo->vtbScan.acctId = pPhyciNode->vtbScan.accountId;
149,338✔
2259
  pInfo->vtbScan.needRedeploy = false;
149,338✔
2260
  pInfo->vtbScan.pMsgCb = pMsgCb;
149,338✔
2261
  pInfo->vtbScan.curTableIdx = 0;
149,338✔
2262
  pInfo->vtbScan.lastTableIdx = -1;
149,338✔
2263
  pInfo->vtbScan.dynTbUid = 0;
149,338✔
2264
  pInfo->vtbScan.dbName = taosStrdup(pPhyciNode->vtbScan.dbName);
149,338✔
2265
  pInfo->vtbScan.tbName = taosStrdup(pPhyciNode->vtbScan.tbName);
149,338✔
2266
  QUERY_CHECK_NULL(pInfo->vtbScan.dbName, code, line, _return, terrno)
149,338✔
2267
  QUERY_CHECK_NULL(pInfo->vtbScan.tbName, code, line, _return, terrno)
149,338✔
2268
  pInfo->vtbScan.existOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
149,338✔
2269
  QUERY_CHECK_NULL(pInfo->vtbScan.existOrgTbVg, code, line, _return, terrno)
149,338✔
2270
  for (int32_t i = 0; i < LIST_LENGTH(pPhyciNode->vtbScan.pOrgVgIds); ++i) {
415,781✔
2271
    SValueNode* valueNode = (SValueNode*)nodesListGetNode(pPhyciNode->vtbScan.pOrgVgIds, i);
266,443✔
2272
    int32_t vgId = (int32_t)valueNode->datum.i;
266,443✔
2273
    code = taosHashPut(pInfo->vtbScan.existOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
266,443✔
2274
    QUERY_CHECK_CODE(code, line, _return);
266,443✔
2275
  }
2276

2277
  if (pPhyciNode->dynTbname) {
149,338✔
UNCOV
2278
    SArray* vals = pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPartColVals;
×
UNCOV
2279
    for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
×
UNCOV
2280
      SStreamGroupValue* pValue = taosArrayGet(vals, i);
×
UNCOV
2281
      if (pValue != NULL && pValue->isTbname) {
×
UNCOV
2282
        pInfo->vtbScan.dynTbUid = pValue->uid;
×
UNCOV
2283
        break;
×
2284
      }
2285
    }
2286
  }
2287

2288
  pInfo->vtbScan.readColList = taosArrayInit(LIST_LENGTH(pPhyciNode->vtbScan.pScanCols), sizeof(col_id_t));
149,338✔
2289
  QUERY_CHECK_NULL(pInfo->vtbScan.readColList, code, line, _return, terrno)
149,338✔
2290

2291
  for (int32_t i = 0; i < LIST_LENGTH(pPhyciNode->vtbScan.pScanCols); ++i) {
1,112,300✔
2292
    SColumnNode* pNode = (SColumnNode*)nodesListGetNode(pPhyciNode->vtbScan.pScanCols, i);
962,962✔
2293
    QUERY_CHECK_NULL(pNode, code, line, _return, terrno)
962,962✔
2294
    QUERY_CHECK_NULL(taosArrayPush(pInfo->vtbScan.readColList, &pNode->colId), code, line, _return, terrno)
1,925,924✔
2295
  }
2296

2297
  pInfo->vtbScan.childTableList = taosArrayInit(10, POINTER_BYTES);
149,338✔
2298
  QUERY_CHECK_NULL(pInfo->vtbScan.childTableList, code, line, _return, terrno)
149,338✔
2299

2300
  pInfo->vtbScan.dbVgInfoMap = taosHashInit(taosArrayGetSize(pInfo->vtbScan.childTableList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
149,338✔
2301
  QUERY_CHECK_NULL(pInfo->vtbScan.dbVgInfoMap, code, line, _return, terrno)
149,338✔
2302

2303
  return code;
149,338✔
UNCOV
2304
_return:
×
2305
  // no need to destroy array and hashmap allocated in this function,
2306
  // since the operator's destroy function will take care of it
2307
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
UNCOV
2308
  return code;
×
2309
}
2310

NEW
2311
static int32_t initVtbWindowInfo(SDynQueryCtrlOperatorInfo* pInfo, SDynQueryCtrlPhysiNode* pPhyciNode,
×
2312
                                 SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) {
NEW
2313
  int32_t              code = TSDB_CODE_SUCCESS;
×
NEW
2314
  int32_t              line = 0;
×
NEW
2315
  SDataBlockDescNode*  pDescNode = pPhyciNode->node.pOutputDataBlockDesc;
×
2316

NEW
2317
  pInfo->vtbWindow.wstartSlotId = pPhyciNode->vtbWindow.wstartSlotId;
×
NEW
2318
  pInfo->vtbWindow.wendSlotId = pPhyciNode->vtbWindow.wendSlotId;
×
NEW
2319
  pInfo->vtbWindow.wdurationSlotId = pPhyciNode->vtbWindow.wdurationSlotId;
×
NEW
2320
  pInfo->vtbWindow.pTargets = pPhyciNode->vtbWindow.pTargets;
×
NEW
2321
  pInfo->vtbWindow.isVstb = pPhyciNode->vtbWindow.isVstb;
×
NEW
2322
  pInfo->vtbWindow.singleWinMode = pPhyciNode->vtbWindow.singleWinMode;
×
NEW
2323
  pInfo->vtbWindow.extendOption = pPhyciNode->vtbWindow.extendOption;
×
2324

NEW
2325
  pInfo->vtbWindow.pRes = createDataBlockFromDescNode(pDescNode);
×
NEW
2326
  QUERY_CHECK_NULL(pInfo->vtbWindow.pRes, code, line, _return, terrno)
×
2327

NEW
2328
  pInfo->vtbWindow.pWins = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
×
NEW
2329
  QUERY_CHECK_NULL(pInfo->vtbWindow.pWins, code, line, _return, terrno)
×
2330

NEW
2331
  pInfo->vtbWindow.outputWstartSlotId = -1;
×
NEW
2332
  pInfo->vtbWindow.outputWendSlotId = -1;
×
NEW
2333
  pInfo->vtbWindow.outputWdurationSlotId = -1;
×
NEW
2334
  pInfo->vtbWindow.curWinBatchIdx = 0;
×
2335

NEW
2336
  initResultSizeInfo(&pOperator->resultInfo, 1);
×
NEW
2337
  code = blockDataEnsureCapacity(pInfo->vtbWindow.pRes, pOperator->resultInfo.capacity);
×
NEW
2338
  QUERY_CHECK_CODE(code, line, _return);
×
2339

NEW
2340
  return code;
×
NEW
2341
_return:
×
NEW
2342
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
NEW
2343
  return code;
×
2344
}
2345

NEW
2346
/*
 * Expose the raw timestamp array of column `slotId` of `pBlock`.
 *
 * When the block has no column array or its data is not loaded, *ppTsCols is
 * left untouched and success is returned. Otherwise *ppTsCols points at the
 * column's data buffer; if the first timestamp is non-zero while the block's
 * window is still {0, 0}, the block's time window is refreshed from that
 * column.
 */
static int32_t extractTsCol(SSDataBlock* pBlock, int32_t slotId, TSKEY** ppTsCols) {
  int32_t line = 0;
  int32_t code = TSDB_CODE_SUCCESS;

  // nothing to extract from an unloaded / column-less block
  if (pBlock->pDataBlock == NULL || !pBlock->info.dataLoad) {
    return code;
  }

  SColumnInfoData* pTsColumn = taosArrayGet(pBlock->pDataBlock, slotId);
  QUERY_CHECK_NULL(pTsColumn, code, line, _end, terrno)

  *ppTsCols = (int64_t*)pTsColumn->pData;

  if ((*ppTsCols)[0] != 0) {
    if (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0) {
      code = blockDataUpdateTsWindow(pBlock, slotId);
      QUERY_CHECK_CODE(code, line, _end);
    }
  }

  return code;

_end:
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  return code;
}
2367

NEW
2368
/*
 * Build the operator param handed to one external-window downstream operator.
 *
 * The param carries a private copy of `pWins` (SExtWinTimeWindow elements)
 * and a single child param, built by
 * buildExchangeOperatorParamForExternalWindow, that spans the start key of
 * the first window to the end key of the last window.
 *
 * pInfo  - not used by this function.
 * ppRes  - out: the new param on success, NULL on failure.
 * pWins  - windows of the current batch; must hold at least one window.
 * idx    - stored as the param's downstreamIdx.
 *
 * Returns TSDB_CODE_SUCCESS or an error code. On failure everything this
 * function allocated itself is released.
 */
static int32_t buildExternalWindowOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins, int32_t idx) {
  int32_t                       code = TSDB_CODE_SUCCESS;
  int32_t                       lino = 0;
  SExternalWindowOperatorParam* pExtWinOp = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
  // The cleanup path inspects pChildren, and the allocation is not shown to be
  // zero-filled, so give it a defined value before any step can fail.
  (*ppRes)->pChildren = NULL;

  pExtWinOp = taosMemoryMalloc(sizeof(SExternalWindowOperatorParam));
  QUERY_CHECK_NULL(pExtWinOp, code, lino, _return, terrno)

  pExtWinOp->ExtWins = taosArrayDup(pWins, NULL);
  QUERY_CHECK_NULL(pExtWinOp->ExtWins, code, lino, _return, terrno)

  SExtWinTimeWindow *firstWin = (SExtWinTimeWindow *)taosArrayGet(pWins, 0);
  SExtWinTimeWindow *lastWin = (SExtWinTimeWindow *)taosArrayGet(pWins, taosArrayGetSize(pWins) - 1);
  // an empty window list yields NULL here; fail instead of dereferencing it
  QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
  QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)

  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)

  SOperatorParam* pExchangeOperator = NULL;
  code = buildExchangeOperatorParamForExternalWindow(&pExchangeOperator, 0, firstWin->tw.skey, lastWin->tw.ekey);
  QUERY_CHECK_CODE(code, lino, _return);
  // NOTE(review): if this push fails, pExchangeOperator is not released here; its
  // layout is defined outside this function - confirm the proper destructor.
  QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pExchangeOperator), code, lino, _return, terrno)

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW;
  (*ppRes)->downstreamIdx = idx;
  (*ppRes)->value = pExtWinOp;
  (*ppRes)->reUse = false;

  return code;
_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pExtWinOp) {
    if (pExtWinOp->ExtWins) {
      taosArrayDestroy(pExtWinOp->ExtWins);
    }
    taosMemoryFree(pExtWinOp);
  }
  if (*ppRes) {
    if ((*ppRes)->pChildren) {
      taosArrayDestroy((*ppRes)->pChildren);
    }
    taosMemoryFree(*ppRes);
    *ppRes = NULL;
  }
  return code;
}
2416

NEW
2417
/*
 * Release one child param produced by buildExternalWindowOperatorParam:
 * its window copy, its value struct, its child-array container and the
 * param itself.
 * NOTE(review): params stored inside pChildParam->pChildren are not released
 * here because their layout is defined elsewhere - confirm the proper destructor.
 */
static void destroyMergeChildExtWinParam(SOperatorParam* pChildParam) {
  if (pChildParam == NULL) {
    return;
  }

  SExternalWindowOperatorParam* pExtWinOp = (SExternalWindowOperatorParam*)pChildParam->value;
  if (pExtWinOp) {
    if (pExtWinOp->ExtWins) {
      taosArrayDestroy(pExtWinOp->ExtWins);
    }
    taosMemoryFree(pExtWinOp);
  }
  if (pChildParam->pChildren) {
    taosArrayDestroy(pChildParam->pChildren);
  }
  taosMemoryFree(pChildParam);
}

/*
 * Build the operator param for the merge operator that sits on top of
 * `numOfDownstream` external-window operators.
 *
 * One external-window child param is created per downstream (index i), each
 * carrying its own copy of `pWins`. The merge value records `numOfWins`.
 *
 * ppRes - out: the new param on success, NULL on failure.
 *
 * Returns TSDB_CODE_SUCCESS or an error code; on failure the merge param and
 * every child param created so far are released.
 */
static int32_t buildMergeOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins,
                                       int32_t numOfDownstream, int32_t numOfWins) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SMergeOperatorParam*      pMergeOp = NULL;
  SOperatorParam*           pPendingChild = NULL;  // built but not yet owned by pChildren

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)

  (*ppRes)->pChildren = taosArrayInit(numOfDownstream, POINTER_BYTES);
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)

  pMergeOp = taosMemoryMalloc(sizeof(SMergeOperatorParam));
  QUERY_CHECK_NULL(pMergeOp, code, lino, _return, terrno)

  pMergeOp->winNum = numOfWins;

  for (int32_t i = 0; i < numOfDownstream; i++) {
    code = buildExternalWindowOperatorParam(pInfo, &pPendingChild, pWins, i);
    QUERY_CHECK_CODE(code, lino, _return);
    QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pPendingChild), code, lino, _return, terrno)
    pPendingChild = NULL;  // ownership moved into pChildren
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->value = pMergeOp;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;
_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pMergeOp) {
    taosMemoryFree(pMergeOp);
  }
  // a child that was built but could not be pushed is still owned here
  destroyMergeChildExtWinParam(pPendingChild);
  if (*ppRes) {
    if ((*ppRes)->pChildren) {
      for (int32_t i = 0; i < taosArrayGetSize((*ppRes)->pChildren); i++) {
        // pChildren stores SOperatorParam pointers, so fetch the stored pointer
        // (taosArrayGetP), not the address of the slot inside the array buffer
        SOperatorParam* pChildParam = (SOperatorParam*)taosArrayGetP((*ppRes)->pChildren, i);
        destroyMergeChildExtWinParam(pChildParam);
      }
      taosArrayDestroy((*ppRes)->pChildren);
    }
    taosMemoryFree(*ppRes);
    *ppRes = NULL;
  }
  return code;
}
2474

NEW
2475
/*
 * Open callback of the virtual-table-window flavour of the dynamic query
 * control operator.
 *
 * Drains downstream 0, which delivers one row per window (start / end
 * timestamp columns), and converts the rows into batches of
 * SExtWinTimeWindow stored in vtbWindow.pWins:
 *   - singleWinMode: one batch per row;
 *   - otherwise    : one batch per downstream block.
 * Each window's ekey is stored as (end timestamp + 1).
 *
 * While the output slot ids are still unresolved (-1) they are looked up in
 * pTargets against the block's id. After draining, the extend option may
 * widen the very first window's skey / very last window's ekey to
 * INT64_MIN / INT64_MAX. When the downstream produced no window at all,
 * that boundary step is skipped and the operator simply opens with an empty
 * window list.
 *
 * On failure the error is logged, stored in the task info and raised through
 * T_LONG_JMP.
 */
int32_t vtbWindowOpen(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    lino = 0;
  SDynQueryCtrlOperatorInfo* pDynInfo = pOperator->info;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SVtbWindowDynCtrlInfo*     pInfo = &pDynInfo->vtbWindow;
  int64_t                    st = 0;
  SArray*                    pWin = NULL;  // batch under construction; owned here until pushed into pWins

  if (OPTR_IS_OPENED(pOperator)) {
    return code;
  }

  if (pOperator->cost.openCost == 0) {
    st = taosGetTimestampUs();
  }

  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      break;
    }

    // resolve, once, which target positions receive _wstart / _wend / _wduration
    if (pInfo->outputWendSlotId == -1 && pInfo->outputWstartSlotId == -1 && pInfo->outputWdurationSlotId == -1) {
      for (int32_t i = 0; i < LIST_LENGTH(pInfo->pTargets); ++i) {
        STargetNode* pNode = (STargetNode*)nodesListGetNode(pInfo->pTargets, i);
        if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode->pExpr)->dataBlockId == pBlock->info.id.blockId) {
          if (((SColumnNode*)pNode->pExpr)->slotId == pDynInfo->vtbWindow.wstartSlotId) {
            pInfo->outputWstartSlotId = i;
          } else if (((SColumnNode*)pNode->pExpr)->slotId == pDynInfo->vtbWindow.wendSlotId) {
            pInfo->outputWendSlotId = i;
          } else if (((SColumnNode*)pNode->pExpr)->slotId == pDynInfo->vtbWindow.wdurationSlotId) {
            pInfo->outputWdurationSlotId = i;
          }
        }
      }
    }

    TSKEY* wstartCol = NULL;
    TSKEY* wendCol = NULL;

    // NOTE(review): extractTsCol leaves these NULL for a block whose data is not
    // loaded; the loops below assume loaded data - confirm downstream guarantees it.
    code = extractTsCol(pBlock, pDynInfo->vtbWindow.wstartSlotId, &wstartCol);
    QUERY_CHECK_CODE(code, lino, _return);
    code = extractTsCol(pBlock, pDynInfo->vtbWindow.wendSlotId, &wendCol);
    QUERY_CHECK_CODE(code, lino, _return);

    if (pDynInfo->vtbWindow.singleWinMode) {
      for (int32_t i = 0; i < pBlock->info.rows; i++) {
        // each batch holds exactly one window, so capacity for one is enough
        pWin = taosArrayInit(1, sizeof(SExtWinTimeWindow));
        QUERY_CHECK_NULL(pWin, code, lino, _return, terrno)

        QUERY_CHECK_NULL(taosArrayReserve(pWin, 1), code, lino, _return, terrno);

        SExtWinTimeWindow* pWindow = taosArrayGet(pWin, 0);
        QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
        pWindow->tw.skey = wstartCol[i];
        pWindow->tw.ekey = wendCol[i] + 1;
        pWindow->winOutIdx = -1;

        QUERY_CHECK_NULL(taosArrayPush(pDynInfo->vtbWindow.pWins, &pWin), code, lino, _return, terrno);
        pWin = NULL;  // now owned by pWins
      }
    } else {
      pWin = taosArrayInit(pBlock->info.rows, sizeof(SExtWinTimeWindow));
      QUERY_CHECK_NULL(pWin, code, lino, _return, terrno)

      QUERY_CHECK_NULL(taosArrayReserve(pWin, pBlock->info.rows), code, lino, _return, terrno);

      for (int32_t i = 0; i < pBlock->info.rows; i++) {
        SExtWinTimeWindow* pWindow = taosArrayGet(pWin, i);
        QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
        pWindow->tw.skey = wstartCol[i];
        pWindow->tw.ekey = wendCol[i] + 1;
        pWindow->winOutIdx = -1;
      }

      QUERY_CHECK_NULL(taosArrayPush(pDynInfo->vtbWindow.pWins, &pWin), code, lino, _return, terrno);
      pWin = NULL;  // now owned by pWins
    }
  }

  // handle first window's start key and last window's end key;
  // with no window at all there is nothing to adjust and nothing to fail on
  if (taosArrayGetSize(pDynInfo->vtbWindow.pWins) > 0) {
    SArray* firstBatch = (SArray*)taosArrayGetP(pDynInfo->vtbWindow.pWins, 0);
    SArray* lastBatch = (SArray*)taosArrayGetP(pDynInfo->vtbWindow.pWins, taosArrayGetSize(pDynInfo->vtbWindow.pWins) - 1);

    QUERY_CHECK_NULL(firstBatch, code, lino, _return, terrno)
    QUERY_CHECK_NULL(lastBatch, code, lino, _return, terrno)

    SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(firstBatch, 0);
    SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(lastBatch);

    QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
    QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)

    if (pInfo->extendOption == STATE_WIN_EXTEND_OPTION_BACKWARD) {
      lastWin->tw.ekey = INT64_MAX;
    }
    if (pInfo->extendOption == STATE_WIN_EXTEND_OPTION_FORWARD) {
      firstWin->tw.skey = INT64_MIN;
    }
  }

  OPTR_SET_OPENED(pOperator);

  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
  }

_return:
  // a batch that never made it into pWins must be released before leaving
  if (pWin != NULL) {
    taosArrayDestroy(pWin);
    pWin = NULL;
  }
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
2587

NEW
2588
/*
 * Build an operator param addressed to a dynamic-query-control operator that
 * tells it which time range [skey, ekey] to work on.
 *
 * ppRes         - out: the new param on success, NULL on failure.
 * downstreamIdx - accepted but not used; the param's downstreamIdx is always 0.
 *
 * The param gets an empty child array and a SDynQueryCtrlOperatorParam value
 * holding the window. Returns TSDB_CODE_SUCCESS or an error code; on failure
 * all partial allocations are released.
 */
static int32_t buildDynQueryCtrlOperatorParamForExternalWindow(SOperatorParam** ppRes, int32_t downstreamIdx, int64_t skey, int64_t ekey) {
  SDynQueryCtrlOperatorParam* pWindowValue = NULL;
  SOperatorParam*             pParam = NULL;
  int32_t                     line = 0;
  int32_t                     code = TSDB_CODE_SUCCESS;

  // the caller only ever observes NULL or a fully built param
  *ppRes = NULL;

  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(pParam, code, line, _fail, terrno)

  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL(pParam->pChildren, code, line, _fail, terrno)

  pWindowValue = taosMemoryMalloc(sizeof(SDynQueryCtrlOperatorParam));
  QUERY_CHECK_NULL(pWindowValue, code, line, _fail, terrno);

  pWindowValue->window.skey = skey;
  pWindowValue->window.ekey = ekey;

  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL;
  pParam->downstreamIdx = 0;
  pParam->reUse = false;
  pParam->value = pWindowValue;

  *ppRes = pParam;
  return code;

_fail:
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  if (pWindowValue) {
    taosMemoryFree(pWindowValue);
  }
  if (pParam) {
    if (pParam->pChildren) {
      taosArrayDestroy(pParam->pChildren);
    }
    taosMemoryFree(pParam);
  }
  return code;
}
2625

NEW
2626
/*
 * Variant of the external-window param builder whose single child param is
 * addressed to a dynamic-query-control operator (built by
 * buildDynQueryCtrlOperatorParamForExternalWindow) instead of an exchange
 * operator. The child spans the start key of the first window to the end key
 * of the last window.
 *
 * pInfo  - not used by this function.
 * ppRes  - out: the new param on success, NULL on failure.
 * pWins  - windows of the current batch; must hold at least one window.
 * idx    - stored as the param's downstreamIdx.
 *
 * Returns TSDB_CODE_SUCCESS or an error code; on failure every allocation
 * made so far, including a child that was built but not yet attached, is
 * released.
 */
static int32_t buildExternalWindowOperatorParamEx(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins, int32_t idx) {
  int32_t                       code = TSDB_CODE_SUCCESS;
  int32_t                       lino = 0;
  SExternalWindowOperatorParam* pExtWinOp = NULL;
  SOperatorParam*               pDynQueryCtrlParam = NULL;  // owned here until pushed into pChildren

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
  // The cleanup path inspects pChildren, and the allocation is not shown to be
  // zero-filled, so give it a defined value before any step can fail.
  (*ppRes)->pChildren = NULL;

  pExtWinOp = taosMemoryMalloc(sizeof(SExternalWindowOperatorParam));
  QUERY_CHECK_NULL(pExtWinOp, code, lino, _return, terrno)

  pExtWinOp->ExtWins = taosArrayDup(pWins, NULL);
  QUERY_CHECK_NULL(pExtWinOp->ExtWins, code, lino, _return, terrno)

  SExtWinTimeWindow *firstWin = (SExtWinTimeWindow *)taosArrayGet(pWins, 0);
  SExtWinTimeWindow *lastWin = (SExtWinTimeWindow *)taosArrayGet(pWins, taosArrayGetSize(pWins) - 1);
  // an empty window list yields NULL here; fail instead of dereferencing it
  QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
  QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)

  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)

  code = buildDynQueryCtrlOperatorParamForExternalWindow(&pDynQueryCtrlParam, 0, firstWin->tw.skey, lastWin->tw.ekey);
  QUERY_CHECK_CODE(code, lino, _return);
  QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pDynQueryCtrlParam), code, lino, _return, terrno)
  pDynQueryCtrlParam = NULL;  // ownership moved into pChildren

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW;
  (*ppRes)->downstreamIdx = idx;
  (*ppRes)->value = pExtWinOp;
  (*ppRes)->reUse = false;

  return code;
_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pExtWinOp) {
    if (pExtWinOp->ExtWins) {
      taosArrayDestroy(pExtWinOp->ExtWins);
    }
    taosMemoryFree(pExtWinOp);
  }
  // The push is the last step that can fail, so on every failure path the
  // child array is still empty and a built child can only be the pending one.
  if (pDynQueryCtrlParam) {
    if (pDynQueryCtrlParam->value) {
      taosMemoryFree(pDynQueryCtrlParam->value);
    }
    if (pDynQueryCtrlParam->pChildren) {
      taosArrayDestroy(pDynQueryCtrlParam->pChildren);
    }
    taosMemoryFree(pDynQueryCtrlParam);
    pDynQueryCtrlParam = NULL;
  }
  if (*ppRes) {
    if ((*ppRes)->pChildren) {
      taosArrayDestroy((*ppRes)->pChildren);
    }
    taosMemoryFree(*ppRes);
    *ppRes = NULL;
  }
  return code;
}
2684

NEW
2685
/*
 * getNext callback of the virtual-table-window dynamic query control operator.
 *
 * Each call handles one window batch (vtbWindow.pWins[curWinBatchIdx]):
 *   1. opens the operator if needed (collects the window batches);
 *   2. returns *ppRes = NULL once every batch has been consumed;
 *   3. builds the param for downstream 1 - an external-window param when
 *      isVstb is set, otherwise a merge param - and pulls one block from it
 *      through getNextExtFn;
 *   4. copies the target columns of that block into the result block and,
 *      for the first / last batch, replaces the first window's skey / last
 *      window's ekey with the block's actual window (ekey stored as +1);
 *   5. fills the resolved _wstart / _wend / _wduration output columns from
 *      the batch's windows and reports one row per window.
 *
 * On failure the error is logged, stored in the task info and raised through
 * T_LONG_JMP.
 */
int32_t vtbWindowNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    lino = 0;
  SDynQueryCtrlOperatorInfo* pDynInfo = pOperator->info;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  int32_t                    numOfWins = 0;
  SOperatorInfo*             pDownOp = NULL;
  SOperatorParam*            pDownParam = NULL;
  SSDataBlock*               pDownBlock = NULL;
  SVtbWindowDynCtrlInfo*     pInfo = &pDynInfo->vtbWindow;
  SSDataBlock*               pRes = pInfo->pRes;

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _return);

  if (pInfo->curWinBatchIdx >= taosArrayGetSize(pInfo->pWins)) {
    *ppRes = NULL;
    return code;
  }

  SArray* pWinArray = (SArray*)taosArrayGetP(pInfo->pWins, pInfo->curWinBatchIdx);
  QUERY_CHECK_NULL(pWinArray, code, lino, _return, terrno)

  numOfWins = (int32_t)taosArrayGetSize(pWinArray);

  // both flavours read from downstream 1; only the param they need differs
  pDownOp = pOperator->pDownstream[1];
  if (pInfo->isVstb) {
    code = buildExternalWindowOperatorParamEx(pDynInfo, &pDownParam, pWinArray, pDownOp->numOfDownstream);
  } else {
    code = buildMergeOperatorParam(pDynInfo, &pDownParam, pWinArray, pDownOp->numOfDownstream, numOfWins);
  }
  QUERY_CHECK_CODE(code, lino, _return);

  code = pDownOp->fpSet.getNextExtFn(pDownOp, pDownParam, &pDownBlock);
  QUERY_CHECK_CODE(code, lino, _return);

  blockDataCleanup(pRes);
  code = blockDataEnsureCapacity(pRes, numOfWins);
  QUERY_CHECK_CODE(code, lino, _return);

  if (pDownBlock) {
    code = copyColumnsValue(pInfo->pTargets, pDownBlock->info.id.blockId, pRes, pDownBlock, numOfWins);
    QUERY_CHECK_CODE(code, lino, _return);

    if (pInfo->curWinBatchIdx == 0) {
      // first batch, get _wstart from the downstream block
      SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(taosArrayGetP(pInfo->pWins, 0), 0);
      QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)

      firstWin->tw.skey = pDownBlock->info.window.skey;
    }
    if (pInfo->curWinBatchIdx == taosArrayGetSize(pInfo->pWins) - 1) {
      // last batch, get _wend from the downstream block
      SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(taosArrayGetP(pInfo->pWins, taosArrayGetSize(pInfo->pWins) - 1));
      QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)

      // window end keys are kept as (end timestamp + 1)
      lastWin->tw.ekey = pDownBlock->info.window.ekey + 1;
    }
  }

  if (pInfo->outputWstartSlotId != -1) {
    SColumnInfoData* pWstartCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWstartSlotId);
    QUERY_CHECK_NULL(pWstartCol, code, lino, _return, terrno)

    for (int32_t i = 0; i < numOfWins; i++) {
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
      code = colDataSetVal(pWstartCol, i, (const char*)&pWindow->tw.skey, false);
      QUERY_CHECK_CODE(code, lino, _return);
    }
  }
  if (pInfo->outputWendSlotId != -1) {
    SColumnInfoData* pWendCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWendSlotId);
    QUERY_CHECK_NULL(pWendCol, code, lino, _return, terrno)

    for (int32_t i = 0; i < numOfWins; i++) {
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
      // undo the +1 applied when the window was stored
      TSKEY ekey = pWindow->tw.ekey - 1;
      code = colDataSetVal(pWendCol, i, (const char*)&ekey, false);
      QUERY_CHECK_CODE(code, lino, _return);
    }
  }
  if (pInfo->outputWdurationSlotId != -1) {
    SColumnInfoData* pWdurationCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWdurationSlotId);
    QUERY_CHECK_NULL(pWdurationCol, code, lino, _return, terrno)

    for (int32_t i = 0; i < numOfWins; i++) {
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
      int64_t duration = pWindow->tw.ekey - 1 - pWindow->tw.skey;
      code = colDataSetVal(pWdurationCol, i, (const char*)&duration, false);
      QUERY_CHECK_CODE(code, lino, _return);
    }
  }

  pRes->info.rows = numOfWins;
  *ppRes = pRes;
  pInfo->curWinBatchIdx++;

  return code;

_return:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
2829

2830
/*
 * Reset-state callback of the dynamic query control operator.
 *
 * Marks the operator as not opened and then releases / re-initialises the
 * per-query-type runtime state stored in its SDynQueryCtrlOperatorInfo.
 *
 * @param pOper  operator whose `info` is a SDynQueryCtrlOperatorInfo
 * @return 0 on success; for DYN_QTYPE_STB_HASH the error code returned by
 *         initSeqStbJoinTableHash() when re-creating the table hash fails.
 *         An unsupported query type is only logged and still yields 0.
 */
static int32_t resetDynQueryCtrlOperState(SOperatorInfo* pOper) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOper->info;

  pOper->status = OP_NOT_OPENED;

  if (DYN_QTYPE_STB_HASH == pCtrl->qType) {
    SStbJoinDynCtrlInfo* pJoin = &pCtrl->stbJoin;

    /* Drop execution statistics and the old join state, then rebuild the table hash. */
    pJoin->execInfo = (SDynQueryCtrlExecInfo){0};
    destroyStbJoinDynCtrlInfo(pJoin);

    int32_t rc = initSeqStbJoinTableHash(&pJoin->ctx.prev, pJoin->basic.batchFetch);
    if (rc != TSDB_CODE_SUCCESS) {
      qError("initSeqStbJoinTableHash failed since %s", tstrerror(rc));
      return rc;
    }

    /* Start again with an empty table list and a zeroed post-join context. */
    pJoin->ctx.prev.pListHead = NULL;
    pJoin->ctx.prev.pListTail = NULL;
    pJoin->ctx.prev.joinBuild = false;
    pJoin->ctx.prev.tableNum = 0;
    pJoin->ctx.post = (SStbJoinPostJoinCtx){0};
  } else if (DYN_QTYPE_VTB_SCAN == pCtrl->qType) {
    SVtbScanDynCtrlInfo* pScan = &pCtrl->vtbScan;

    if (NULL != pScan->orgTbVgColMap) {
      /* Install the entry destructor before cleaning the hash up. */
      taosHashSetFreeFp(pScan->orgTbVgColMap, destroyOrgTbInfo);
      taosHashCleanup(pScan->orgTbVgColMap);
      pScan->orgTbVgColMap = NULL;
    }

    if (NULL != pScan->pRsp) {
      tFreeSUsedbRsp(pScan->pRsp);
      taosMemoryFreeClear(pScan->pRsp);
    }

    if (NULL != pScan->colRefInfo) {
      taosArrayDestroyEx(pScan->colRefInfo, destroyColRefInfo);
      pScan->colRefInfo = NULL;
    }

    if (NULL != pScan->childTableMap) {
      taosHashCleanup(pScan->childTableMap);
      pScan->childTableMap = NULL;
    }

    /* The child table list itself is kept; only its elements are cleared. */
    if (NULL != pScan->childTableList) {
      taosArrayClearEx(pScan->childTableList, destroyColRefArray);
    }

    pScan->curTableIdx = 0;
    pScan->lastTableIdx = -1;
  } else if (DYN_QTYPE_VTB_WINDOW == pCtrl->qType) {
    SVtbWindowDynCtrlInfo* pWin = &pCtrl->vtbWindow;

    if (NULL != pWin->pRes) {
      blockDataDestroy(pWin->pRes);
      pWin->pRes = NULL;
    }

    if (NULL != pWin->pWins) {
      taosArrayDestroyEx(pWin->pWins, destroyWinArray);
      pWin->pWins = NULL;
    }

    /* NOTE(review): the output slot ids are reset to -1 here; presumably they are
     * re-derived when the operator is opened again -- confirm against the open path. */
    pWin->outputWdurationSlotId = -1;
    pWin->outputWendSlotId = -1;
    pWin->outputWstartSlotId = -1;
    pWin->curWinBatchIdx = 0;
  } else {
    /* Unknown type: report it, but the reset itself is not treated as a failure. */
    qError("unsupported dynamic query ctrl type: %d", pCtrl->qType);
  }

  return 0;
}
2902

2903
int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
650,300✔
2904
                                       SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo,
2905
                                       SMsgCb* pMsgCb, SOperatorInfo** pOptrInfo) {
2906
  QRY_PARAM_CHECK(pOptrInfo);
650,300✔
2907

2908
  int32_t                    code = TSDB_CODE_SUCCESS;
650,300✔
2909
  int32_t                    line = 0;
650,300✔
2910
  __optr_fn_t                nextFp = NULL;
650,300✔
2911
  __optr_open_fn_t           openFp = NULL;
650,300✔
2912
  SOperatorInfo*             pOperator = NULL;
650,300✔
2913
  SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo));
650,300✔
2914
  QUERY_CHECK_NULL(pInfo, code, line, _error, terrno)
650,300✔
2915

2916
  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
650,300✔
2917
  QUERY_CHECK_NULL(pOperator, code, line, _error, terrno)
650,300✔
2918

2919
  pTaskInfo->dynamicTask = (int8_t)pPhyciNode->node.dynamicOp;
650,300✔
2920

2921
  code = appendDownstream(pOperator, pDownstream, numOfDownstream);
650,300✔
2922
  QUERY_CHECK_CODE(code, line, _error);
650,300✔
2923

2924
  setOperatorInfo(pOperator, "DynQueryCtrlOperator", QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, false, OP_NOT_OPENED,
650,300✔
2925
                  pInfo, pTaskInfo);
2926

2927
  pInfo->qType = pPhyciNode->qType;
650,300✔
2928
  switch (pInfo->qType) {
650,300✔
2929
    case DYN_QTYPE_STB_HASH:
500,962✔
2930
      TAOS_MEMCPY(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
500,962✔
2931
      pInfo->stbJoin.pOutputDataBlockDesc = pPhyciNode->node.pOutputDataBlockDesc;
500,962✔
2932
      code = initSeqStbJoinTableHash(&pInfo->stbJoin.ctx.prev, pInfo->stbJoin.basic.batchFetch);
500,962✔
2933
      QUERY_CHECK_CODE(code, line, _error);
500,962✔
2934
      nextFp = seqStableJoin;
500,962✔
2935
      openFp = optrDummyOpenFn;
500,962✔
2936
      break;
500,962✔
2937
    case DYN_QTYPE_VTB_SCAN:
149,338✔
2938
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
149,338✔
2939
      QUERY_CHECK_CODE(code, line, _error);
149,338✔
2940
      nextFp = vtbScanNext;
149,338✔
2941
      openFp = vtbScanOpen;
149,338✔
2942
      break;
149,338✔
NEW
2943
    case DYN_QTYPE_VTB_WINDOW:
×
NEW
2944
      code = initVtbWindowInfo(pInfo, pPhyciNode, pTaskInfo, pOperator);
×
NEW
2945
      QUERY_CHECK_CODE(code, line, _error);
×
NEW
2946
      nextFp = vtbWindowNext;
×
NEW
2947
      openFp = vtbWindowOpen;
×
UNCOV
2948
      break;
×
UNCOV
2949
    default:
×
2950
      qError("unsupported dynamic query ctrl type: %d", pInfo->qType);
×
2951
      code = TSDB_CODE_INVALID_PARA;
×
UNCOV
2952
      goto _error;
×
2953
  }
2954

2955
  pOperator->fpSet = createOperatorFpSet(openFp, nextFp, NULL, destroyDynQueryCtrlOperator, optrDefaultBufFn,
650,300✔
2956
                                         NULL, optrDefaultGetNextExtFn, NULL);
2957

2958
  setOperatorResetStateFn(pOperator, resetDynQueryCtrlOperState);
650,300✔
2959
  *pOptrInfo = pOperator;
650,300✔
2960
  return TSDB_CODE_SUCCESS;
650,300✔
2961

2962
_error:
×
2963
  if (pInfo != NULL) {
×
2964
    destroyDynQueryCtrlOperator(pInfo);
×
2965
  }
UNCOV
2966
  qError("failed to create dyn query ctrl operator, %s code:%s, line:%d", __func__, tstrerror(code), line);
×
UNCOV
2967
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
2968
  pTaskInfo->code = code;
×
2969
  return code;
×
2970
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc