• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4872

04 Dec 2025 01:55AM UTC coverage: 64.678% (+0.02%) from 64.654%
#4872

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

880 of 2219 new or added lines in 36 files covered. (39.66%)

6146 existing lines in 122 files now uncovered.

159679 of 246882 relevant lines covered (64.68%)

110947965.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.72
/source/libs/executor/src/dynqueryctrloperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "nodes.h"
19
#include "operator.h"
20
#include "os.h"
21
#include "plannodes.h"
22
#include "query.h"
23
#include "querynodes.h"
24
#include "querytask.h"
25
#include "tarray.h"
26
#include "tcompare.h"
27
#include "tdatablock.h"
28
#include "thash.h"
29
#include "tmsg.h"
30
#include "trpc.h"
31
#include "ttypes.h"
32
#include "dynqueryctrl.h"
33

34
int64_t gSessionId = 0;
35

36
void freeVgTableList(void* ptr) { 
×
37
  taosArrayDestroy(*(SArray**)ptr); 
×
38
}
×
39

40
/*
 * Frees every node of a singly linked SStbJoinTableList chain, including the
 * four per-node vgId/uid arrays. A NULL head is a no-op.
 */
static void destroyStbJoinTableList(SStbJoinTableList* pListHead) {
  SStbJoinTableList* pCur = pListHead;

  while (NULL != pCur) {
    // Grab the successor first: the node itself is released below.
    SStbJoinTableList* pFollowing = pCur->pNext;

    taosMemoryFree(pCur->pLeftVg);
    taosMemoryFree(pCur->pLeftUid);
    taosMemoryFree(pCur->pRightVg);
    taosMemoryFree(pCur->pRightUid);
    taosMemoryFree(pCur);

    pCur = pFollowing;
  }
}
505,939✔
53

54
/*
 * Tears down the super-table join control state: logs the execution counters,
 * cleans up the hash tables that belong to the active fetch mode, and finally
 * frees the pending table-pair list.
 */
static void destroyStbJoinDynCtrlInfo(SStbJoinDynCtrlInfo* pStbJoin) {
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;

  qDebug("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64 ", leftCacheNum:%" PRId64 ", rightCacheNum:%" PRId64,
         pStbJoin->execInfo.prevBlkNum, pStbJoin->execInfo.prevBlkRows, pStbJoin->execInfo.postBlkNum,
         pStbJoin->execInfo.postBlkRows, pStbJoin->execInfo.leftCacheNum, pStbJoin->execInfo.rightCacheNum);

  if (!pStbJoin->basic.batchFetch) {
    // Sequential mode: only the uid caches are cleaned up here.
    if (pPrev->leftCache) {
      tSimpleHashCleanup(pPrev->leftCache);
    }
    if (pPrev->rightCache) {
      tSimpleHashCleanup(pPrev->rightCache);
    }
    if (pPrev->onceTable) {
      tSimpleHashCleanup(pPrev->onceTable);
    }
  } else {
    // Batch mode: the per-vgroup hashes hold SArray* values, so install the
    // array-destroying callback before cleaning each hash up.
    if (pPrev->leftHash) {
      tSimpleHashSetFreeFp(pPrev->leftHash, freeVgTableList);
      tSimpleHashCleanup(pPrev->leftHash);
    }
    if (pPrev->rightHash) {
      tSimpleHashSetFreeFp(pPrev->rightHash, freeVgTableList);
      tSimpleHashCleanup(pPrev->rightHash);
    }
  }

  destroyStbJoinTableList(pPrev->pListHead);
}
505,939✔
82

83
void destroyOrgTbInfo(void *info) {
1,236,360✔
84
  SOrgTbInfo *pOrgTbInfo = (SOrgTbInfo *)info;
1,236,360✔
85
  if (pOrgTbInfo) {
1,236,360✔
86
    taosArrayDestroy(pOrgTbInfo->colMap);
1,236,360✔
87
  }
88
}
1,236,360✔
89

90
void destroyColRefInfo(void *info) {
10,751,993✔
91
  SColRefInfo *pColRefInfo = (SColRefInfo *)info;
10,751,993✔
92
  if (pColRefInfo) {
10,751,993✔
93
    taosMemoryFree(pColRefInfo->colName);
10,751,993✔
94
    taosMemoryFree(pColRefInfo->colrefName);
10,751,993✔
95
  }
96
}
10,751,993✔
97

98
void destroyColRefArray(void *info) {
650,057✔
99
  SArray *pColRefArray = *(SArray **)info;
650,057✔
100
  if (pColRefArray) {
650,057✔
101
    taosArrayDestroyEx(pColRefArray, destroyColRefInfo);
650,057✔
102
  }
103
}
650,057✔
104

105
void freeUseDbOutput(void* pOutput) {
240,383✔
106
  SUseDbOutput *pOut = *(SUseDbOutput**)pOutput;
240,383✔
107
  if (NULL == pOutput) {
240,383✔
108
    return;
×
109
  }
110

111
  if (pOut->dbVgroup) {
240,383✔
112
    freeVgInfo(pOut->dbVgroup);
240,383✔
113
  }
114
  taosMemFree(pOut);
240,383✔
115
}
116

117
/*
 * Releases everything owned by a virtual-table scan control context.
 * Each member is released only when it is non-NULL; the release order matches
 * the member order used throughout this function.
 */
static void destroyVtbScanDynCtrlInfo(SVtbScanDynCtrlInfo* pVtbScan) {
  // Name strings.
  if (NULL != pVtbScan->dbName) {
    taosMemoryFreeClear(pVtbScan->dbName);
  }
  if (NULL != pVtbScan->tbName) {
    taosMemoryFreeClear(pVtbScan->tbName);
  }

  // Column reference containers.
  if (NULL != pVtbScan->childTableList) {
    taosArrayDestroyEx(pVtbScan->childTableList, destroyColRefArray);
  }
  if (NULL != pVtbScan->colRefInfo) {
    taosArrayDestroyEx(pVtbScan->colRefInfo, destroyColRefInfo);
    pVtbScan->colRefInfo = NULL;
  }
  if (NULL != pVtbScan->childTableMap) {
    taosHashCleanup(pVtbScan->childTableMap);
  }
  if (NULL != pVtbScan->readColList) {
    taosArrayDestroy(pVtbScan->readColList);
  }

  // Hashes whose values need a dedicated free callback.
  if (NULL != pVtbScan->dbVgInfoMap) {
    taosHashSetFreeFp(pVtbScan->dbVgInfoMap, freeUseDbOutput);
    taosHashCleanup(pVtbScan->dbVgInfoMap);
  }
  if (NULL != pVtbScan->orgTbVgColMap) {
    taosHashSetFreeFp(pVtbScan->orgTbVgColMap, destroyOrgTbInfo);
    taosHashCleanup(pVtbScan->orgTbVgColMap);
  }

  // Cached use-db response.
  if (NULL != pVtbScan->pRsp) {
    tFreeSUsedbRsp(pVtbScan->pRsp);
    taosMemoryFreeClear(pVtbScan->pRsp);
  }

  // Remaining plain hashes.
  if (NULL != pVtbScan->existOrgTbVg) {
    taosHashCleanup(pVtbScan->existOrgTbVg);
  }
  if (NULL != pVtbScan->curOrgTbVg) {
    taosHashCleanup(pVtbScan->curOrgTbVg);
  }
  if (NULL != pVtbScan->newAddedVgInfo) {
    taosHashCleanup(pVtbScan->newAddedVgInfo);
  }
}
251,160✔
159

NEW
160
void destroyWinArray(void *info) {
×
NEW
161
  SArray *pWinArray = *(SArray **)info;
×
NEW
162
  if (pWinArray) {
×
NEW
163
    taosArrayDestroy(pWinArray);
×
164
  }
NEW
165
}
×
166

NEW
167
/*
 * Releases the result block and the array of window arrays owned by a
 * virtual-table window control context. NULL members are skipped.
 */
static void destroyVtbWindowDynCtrlInfo(SVtbWindowDynCtrlInfo* pVtbWindow) {
  if (NULL != pVtbWindow->pRes) {
    blockDataDestroy(pVtbWindow->pRes);
  }

  if (NULL != pVtbWindow->pWins) {
    // Each element is itself an SArray*, destroyed through destroyWinArray.
    taosArrayDestroyEx(pVtbWindow->pWins, destroyWinArray);
  }
}
×
175

176
static void destroyDynQueryCtrlOperator(void* param) {
757,099✔
177
  SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param;
757,099✔
178

179
  switch (pDyn->qType) {
757,099✔
180
    case DYN_QTYPE_STB_HASH:
505,939✔
181
      destroyStbJoinDynCtrlInfo(&pDyn->stbJoin);
505,939✔
182
      break;
505,939✔
183
    case DYN_QTYPE_VTB_SCAN:
251,160✔
184
      destroyVtbScanDynCtrlInfo(&pDyn->vtbScan);
251,160✔
185
      break;
251,160✔
NEW
186
    case DYN_QTYPE_VTB_WINDOW:
×
NEW
187
      destroyVtbWindowDynCtrlInfo(&pDyn->vtbWindow);
×
NEW
188
      break;
×
189
    default:
×
190
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
×
191
      break;
×
192
  }
193

194
  taosMemoryFreeClear(param);
757,099✔
195
}
757,099✔
196

197
/*
 * Decides whether the data of a join-side table has to be cached.
 *
 *  - batch fetch: always true;
 *  - right table: true when the current and the next right uid are equal;
 *  - left table:  true when `uid` is present in pPrev->leftCache.
 *
 * `uid` is only consulted for the left-table lookup.
 */
static FORCE_INLINE bool tableNeedCache(int64_t uid, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, bool rightTable, bool batchFetch) {
  if (batchFetch) {
    return true;
  }

  if (rightTable) {
    return pPost->rightCurrUid == pPost->rightNextUid;
  }

  return NULL != tSimpleHashGet(pPrev->leftCache, &uid, sizeof(uid));
}
210

211
/*
 * Loads the table pair at the head node's read position into the post-join
 * context and works out the cache decisions for both sides.
 *
 * Steps:
 *  1. copy current left/right uid and vgId from the head node at readIdx;
 *  2. look ahead for the next right uid (following pNext when the current
 *     node is exhausted); 0 is stored when there is no further entry;
 *  3. compute leftNeedCache / rightNeedCache via tableNeedCache();
 *  4. in non-batch mode, record a right uid that needs caching and differs
 *     from the previous right uid in pPrev->rightCache.
 *
 * Fix: the original retargeted its `rightUid` pointer to the NEXT node's
 * array while looking ahead and later dereferenced it when calling
 * tableNeedCache(). That read the wrong node's first element (or an
 * empty/NULL array). The current right uid is now passed instead; the
 * right-table path of tableNeedCache() does not consult the uid, so results
 * are unchanged.
 *
 * Returns TSDB_CODE_SUCCESS, or the error produced by tSimpleHashPut.
 */
static int32_t updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo* pStbJoin) {
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
  SStbJoinTableList*   pCur = pPrev->pListHead;
  const int64_t        curIdx = pCur->readIdx;
  const int64_t        leftUid = pCur->pLeftUid[curIdx];
  const int64_t        rightPrevUid = pPost->rightCurrUid;

  pPost->leftCurrUid = leftUid;
  pPost->rightCurrUid = pCur->pRightUid[curIdx];

  pPost->leftVgId = pCur->pLeftVg[curIdx];
  pPost->rightVgId = pCur->pRightVg[curIdx];

  // Look ahead for the next right uid, starting just after the current entry
  // and restarting at index 0 of each following node.
  for (int64_t nextIdx = curIdx + 1;; nextIdx = 0) {
    if (nextIdx < pCur->uidNum) {
      pPost->rightNextUid = pCur->pRightUid[nextIdx];
      break;
    }

    pCur = pCur->pNext;
    if (NULL == pCur) {
      pPost->rightNextUid = 0;
      break;
    }
  }

  pPost->leftNeedCache = tableNeedCache(leftUid, pPrev, pPost, false, pStbJoin->basic.batchFetch);
  pPost->rightNeedCache = tableNeedCache(pPost->rightCurrUid, pPrev, pPost, true, pStbJoin->basic.batchFetch);

  if (!pStbJoin->basic.batchFetch && pPost->rightNeedCache && rightPrevUid != pPost->rightCurrUid) {
    QRY_ERR_RET(tSimpleHashPut(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid), NULL, 0));
    pStbJoin->execInfo.rightCacheNum++;
  }

  return TSDB_CODE_SUCCESS;
}
254

255

256
/*
 * Builds a GET parameter for the group-cache operator.
 *
 * @param ppRes         receives the new SOperatorParam; set to NULL on every
 *                      failure after the initial allocation
 * @param downstreamIdx downstream index stored in both the param and payload
 * @param vgId          vgroup id stored in the payload
 * @param tbUid         table uid stored in the payload
 * @param needCache     cache flag stored in the payload
 * @param pChild        optional child param; ownership passes to this
 *                      function (attached to *ppRes on success, freed on
 *                      failure)
 * @return TSDB_CODE_SUCCESS or the terrno captured at the failing call.
 *
 * Fix: every field of the freshly allocated param is initialised right away,
 * so freeOperatorParam() on the error paths never receives a struct with
 * indeterminate opType/value/reUse (the original set them only at the end).
 */
static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, bool needCache, SOperatorParam* pChild) {
  int32_t code = TSDB_CODE_SUCCESS;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    code = terrno;
    freeOperatorParam(pChild, OP_GET_PARAM);
    return code;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = NULL;
  (*ppRes)->reUse = false;
  (*ppRes)->pChildren = NULL;

  if (pChild) {
    (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
    if (NULL == (*ppRes)->pChildren || NULL == taosArrayPush((*ppRes)->pChildren, &pChild)) {
      code = terrno;
      // The child was not attached, so it is released separately.
      freeOperatorParam(pChild, OP_GET_PARAM);
      freeOperatorParam(*ppRes, OP_GET_PARAM);
      *ppRes = NULL;
      return code;
    }
  }

  SGcOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcOperatorParam));
  if (NULL == pGc) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_GET_PARAM);
    *ppRes = NULL;
    return code;
  }

  pGc->sessionId = atomic_add_fetch_64(&gSessionId, 1);
  pGc->downstreamIdx = downstreamIdx;
  pGc->vgId = vgId;
  pGc->tbUid = tbUid;
  pGc->needCache = needCache;

  (*ppRes)->value = pGc;

  return TSDB_CODE_SUCCESS;
}
305

306

307
/*
 * Builds a NOTIFY parameter for the group-cache operator.
 *
 * @param ppRes         receives the new SOperatorParam; NULL on failure
 * @param downstreamIdx downstream index stored in both the param and payload
 * @param vgId          vgroup id stored in the payload
 * @param tbUid         table uid stored in the payload
 * @return TSDB_CODE_SUCCESS or the terrno captured at the failing allocation.
 *
 * Fixes:
 *  - the param fields are initialised before the payload allocation, so
 *    freeOperatorParam() on the error path never sees indeterminate
 *    opType/value/reUse;
 *  - *ppRes is reset to NULL after being freed, so the caller is not left
 *    with a dangling pointer (matching buildGroupCacheOperatorParam).
 */
static int32_t buildGroupCacheNotifyOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid) {
  int32_t code = TSDB_CODE_SUCCESS;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }

  (*ppRes)->pChildren = NULL;
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = NULL;
  (*ppRes)->reUse = false;

  SGcNotifyOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcNotifyOperatorParam));
  if (NULL == pGc) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    *ppRes = NULL;
    return code;
  }

  pGc->downstreamIdx = downstreamIdx;
  pGc->vgId = vgId;
  pGc->tbUid = tbUid;

  (*ppRes)->value = pGc;

  return TSDB_CODE_SUCCESS;
}
333

334
static int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid);
NEW
335
/*
 * Builds an exchange operator parameter for a virtual-table window scan over
 * the time range [skey, ekey].
 *
 * @param ppRes         receives the new SOperatorParam; NULL on failure
 * @param downstreamIdx NOTE(review): not used; the param's downstreamIdx is
 *                      hard-coded to 0 below — confirm this is intentional
 * @param skey          window start key stored in basic.window
 * @param ekey          window end key stored in basic.window
 * @return TSDB_CODE_SUCCESS or the terrno captured at the failing allocation.
 *
 * The param is created with an empty children array and an empty uid list.
 *
 * Fix: basic.isNewDeployed is now set explicitly. The payload comes from
 * taosMemoryMalloc and the original never assigned this flag, although the
 * sibling VScan builders do, leaving it indeterminate.
 */
static int32_t buildExchangeOperatorParamForExternalWindow(SOperatorParam** ppRes, int32_t downstreamIdx, int64_t skey, int64_t ekey) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SExchangeOperatorParam*   pExc = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)

  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)

  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  QUERY_CHECK_NULL(pExc, code, lino, _return, terrno)

  pExc->multiParams = false;
  pExc->basic.vgId = 0;
  pExc->basic.tableSeq = true;
  pExc->basic.isVtbRefScan = false;
  pExc->basic.isVtbTagScan = false;
  pExc->basic.isVtbWinScan = true;
  pExc->basic.isNewDeployed = false;
  pExc->basic.isNewParam = true;
  pExc->basic.window.skey = skey;
  pExc->basic.window.ekey = ekey;
  pExc->basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
  pExc->basic.colMap = NULL;
  pExc->basic.uidList = taosArrayInit(1, sizeof(int64_t));
  QUERY_CHECK_NULL(pExc->basic.uidList, code, lino, _return, terrno)

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->reUse = true;
  (*ppRes)->value = pExc;

  return code;

_return:
  qError("failed to build exchange operator param for external window, code:%d, line:%d", code, lino);
  if (pExc) {
    if (pExc->basic.uidList) {
      taosArrayDestroy(pExc->basic.uidList);
    }
    taosMemoryFreeClear(pExc);
  }
  if (*ppRes) {
    if ((*ppRes)->pChildren) {
      taosArrayDestroy((*ppRes)->pChildren);
    }
    taosMemoryFreeClear(*ppRes);
  }

  return code;
}
386

387
/*
 * Builds an exchange operator parameter that fetches a single table
 * (*pUid) from vgroup *pVgId, in table-sequential mode.
 *
 * @param ppRes         receives the new SOperatorParam; NULL on failure
 * @param downstreamIdx downstream index stored in the param
 * @param pVgId         vgroup id to read from
 * @param pUid          table uid pushed into the payload's uid list
 * @return TSDB_CODE_SUCCESS or the terrno captured at the failing call.
 *
 * Fixes:
 *  - *ppRes is released (and reset to NULL) on every failure path; the
 *    original leaked it when the payload or the uid list failed;
 *  - terrno is captured before any cleanup call can overwrite it;
 *  - the payload is zeroed after allocation so flags this builder does not
 *    set explicitly (e.g. isNewParam / isNewDeployed, which sibling builders
 *    assign) are deterministic instead of indeterminate malloc contents.
 */
static int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, const int32_t* pVgId, int64_t* pUid) {
  int32_t                 code = TSDB_CODE_SUCCESS;
  SExchangeOperatorParam* pExc = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }
  (*ppRes)->pChildren = NULL;

  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  if (NULL == pExc) {
    code = terrno;
    goto _error;
  }
  memset(pExc, 0, sizeof(*pExc));

  pExc->multiParams = false;
  pExc->basic.vgId = *pVgId;
  pExc->basic.tableSeq = true;
  pExc->basic.isVtbRefScan = false;
  pExc->basic.isVtbWinScan = false;
  pExc->basic.isVtbTagScan = false;
  pExc->basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
  pExc->basic.colMap = NULL;

  pExc->basic.uidList = taosArrayInit(1, sizeof(int64_t));
  if (NULL == pExc->basic.uidList) {
    code = terrno;
    goto _error;
  }
  if (NULL == taosArrayPush(pExc->basic.uidList, pUid)) {
    code = terrno;
    goto _error;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;

_error:
  if (pExc) {
    if (pExc->basic.uidList) {
      taosArrayDestroy(pExc->basic.uidList);
    }
    taosMemoryFree(pExc);
  }
  taosMemoryFreeClear(*ppRes);
  return code;
}
425

426
/*
 * Builds a multi-param exchange operator parameter from a vgId -> SArray*
 * (uid list) hash. One basic param is created per vgroup.
 *
 * Ownership: each uid list that is successfully stored in the batch hash is
 * moved out of `pVg` (its slot is set to NULL); it is later released through
 * the free callback registered on the batch hash.
 *
 * @param ppRes         receives the new SOperatorParam; NULL on failure
 * @param downstreamIdx downstream index stored in the param
 * @param pVg           source hash; slots are cleared as lists are moved
 * @return TSDB_CODE_SUCCESS, the terrno captured at a failing allocation, or
 *         the error returned by tSimpleHashPut.
 *
 * Fixes:
 *  - a tSimpleHashPut failure inside the loop used to return immediately and
 *    leak the payload, the batch hash and *ppRes; they are now released;
 *  - terrno is captured before cleanup calls can overwrite it;
 *  - the stack template `basic` is zero-initialised, so fields this builder
 *    does not assign are not copied into the hash with indeterminate values.
 */
static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
  int32_t code = TSDB_CODE_SUCCESS;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }
  (*ppRes)->pChildren = NULL;

  SExchangeOperatorBatchParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorBatchParam));
  if (NULL == pExc) {
    code = terrno;
    taosMemoryFreeClear(*ppRes);
    return code;
  }

  pExc->multiParams = true;
  pExc->pBatchs = tSimpleHashInit(tSimpleHashGetSize(pVg), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  if (NULL == pExc->pBatchs) {
    code = terrno;
    taosMemoryFree(pExc);
    taosMemoryFreeClear(*ppRes);
    return code;
  }
  tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);

  SExchangeOperatorBasicParam basic = {0};
  basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;

  int32_t iter = 0;
  void*   p = NULL;
  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
    int32_t* pVgId = tSimpleHashGetKey(p, NULL);
    SArray*  pUidList = *(SArray**)p;

    basic.vgId = *pVgId;
    basic.uidList = pUidList;
    basic.colMap = NULL;
    basic.tableSeq = false;
    basic.isVtbRefScan = false;
    basic.isVtbWinScan = false;
    basic.isVtbTagScan = false;

    code = tSimpleHashPut(pExc->pBatchs, pVgId, sizeof(*pVgId), &basic, sizeof(basic));
    if (TSDB_CODE_SUCCESS != code) {
      // Lists already moved into pBatchs are released by its free callback;
      // the list of the failing entry still belongs to pVg.
      tSimpleHashCleanup(pExc->pBatchs);
      taosMemoryFree(pExc);
      taosMemoryFreeClear(*ppRes);
      return code;
    }

    qTrace("build downstreamIdx %d batch scan, vgId:%d, uidNum:%" PRId64, downstreamIdx, *pVgId, (int64_t)taosArrayGetSize(pUidList));
    *(SArray**)p = NULL;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;
}
477

478
/*
 * Builds a reusable exchange operator parameter for a virtual-table tag scan
 * of table `uid` in vgroup `vgId`.
 *
 * @param ppRes         receives the new SOperatorParam; NULL on failure
 * @param downstreamIdx downstream index stored in the param
 * @param vgId          vgroup id stored in the payload
 * @param uid           table uid pushed into the payload's uid list
 * @return TSDB_CODE_SUCCESS or the terrno captured at the failing call.
 *
 * Fixes:
 *  - the error path used to call taosMemoryFreeClear(basic) although `basic`
 *    points INTO pExc (it is &pExc->basic), then freed pExc again: an
 *    invalid free followed by a double free. Only pExc is freed now;
 *  - the payload is zeroed after allocation, so members this builder does
 *    not assign (e.g. isNewParam) are deterministic and the cleanup code
 *    never inspects indeterminate pointers.
 */
static int32_t buildExchangeOperatorParamForVTagScan(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, tb_uid_t uid) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  SExchangeOperatorParam*      pExc = NULL;
  SExchangeOperatorBasicParam* basic = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
  (*ppRes)->pChildren = NULL;

  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  QUERY_CHECK_NULL(pExc, code, lino, _return, terrno)
  memset(pExc, 0, sizeof(*pExc));

  pExc->multiParams = false;

  basic = &pExc->basic;
  basic->srcOpType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;

  basic->vgId = vgId;
  basic->tableSeq = false;
  basic->isVtbRefScan = false;
  basic->isVtbTagScan = true;
  basic->isVtbWinScan = false;
  basic->isNewDeployed = false;
  basic->colMap = NULL;

  basic->uidList = taosArrayInit(1, sizeof(int64_t));
  QUERY_CHECK_NULL(basic->uidList, code, lino, _return, terrno)
  QUERY_CHECK_NULL(taosArrayPush(basic->uidList, &uid), code, lino, _return, terrno)

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = true;

  return TSDB_CODE_SUCCESS;

_return:
  qError("failed to build exchange operator param for vscan, code:%d", code);
  taosMemoryFreeClear(*ppRes);
  if (pExc) {
    // `basic` aliases pExc->basic: release what it owns, never `basic` itself.
    if (pExc->basic.colMap) {
      taosArrayDestroy(pExc->basic.colMap->colMap);
      taosMemoryFreeClear(pExc->basic.colMap);
    }
    if (pExc->basic.uidList) {
      taosArrayDestroy(pExc->basic.uidList);
    }
    taosMemoryFreeClear(pExc);
  }
  return code;
}
533

534
/*
 * Builds a reusable exchange operator parameter for a virtual-table reference
 * scan of the origin table described by `pMap`.
 *
 * The payload gets its own SOrgTbInfo (vgId, table name, duplicated column
 * map) and an empty uid list.
 *
 * @param ppRes         receives the new SOperatorParam; NULL on failure
 * @param downstreamIdx downstream index stored in the param
 * @param pMap          origin table info to copy from
 * @return TSDB_CODE_SUCCESS or the terrno captured at the failing call.
 *
 * Fixes:
 *  - the error path used to call taosMemoryFreeClear(basic) although `basic`
 *    points INTO pExc (it is &pExc->basic), then freed pExc again: an
 *    invalid free followed by a double free. Only pExc is freed now;
 *  - the payload is zeroed after allocation. Previously a failure while
 *    creating colMap reached the cleanup code with basic->uidList still
 *    holding indeterminate malloc contents, which was then destroyed.
 */
static int32_t buildExchangeOperatorParamForVScan(SOperatorParam** ppRes, int32_t downstreamIdx, SOrgTbInfo* pMap) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  SExchangeOperatorParam*      pExc = NULL;
  SExchangeOperatorBasicParam* basic = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
  (*ppRes)->pChildren = NULL;

  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  QUERY_CHECK_NULL(pExc, code, lino, _return, terrno)
  memset(pExc, 0, sizeof(*pExc));

  pExc->multiParams = false;

  basic = &pExc->basic;
  basic->srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;

  basic->vgId = pMap->vgId;
  basic->tableSeq = false;
  basic->isVtbRefScan = true;
  basic->isVtbWinScan = false;
  basic->isVtbTagScan = false;
  basic->isNewDeployed = false;
  basic->isNewParam = true;
  basic->colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
  QUERY_CHECK_NULL(basic->colMap, code, lino, _return, terrno)
  basic->colMap->vgId = pMap->vgId;
  tstrncpy(basic->colMap->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);
  basic->colMap->colMap = taosArrayDup(pMap->colMap, NULL);
  QUERY_CHECK_NULL(basic->colMap->colMap, code, lino, _return, terrno)

  basic->uidList = taosArrayInit(1, sizeof(int64_t));
  QUERY_CHECK_NULL(basic->uidList, code, lino, _return, terrno)

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = true;

  return TSDB_CODE_SUCCESS;

_return:
  qError("failed to build exchange operator param for vscan, code:%d", code);
  taosMemoryFreeClear(*ppRes);
  if (pExc) {
    // `basic` aliases pExc->basic: release what it owns, never `basic` itself.
    if (pExc->basic.colMap) {
      taosArrayDestroy(pExc->basic.colMap->colMap);
      taosMemoryFreeClear(pExc->basic.colMap);
    }
    if (pExc->basic.uidList) {
      taosArrayDestroy(pExc->basic.uidList);
    }
    taosMemoryFreeClear(pExc);
  }
  return code;
}
592

593
/*
 * Variant of the virtual-table reference scan builder that also describes a
 * newly deployed downstream source (stream fetch) taken from `pTaskAddr`.
 *
 * @param ppRes         receives the new SOperatorParam; NULL on failure
 * @param downstreamIdx downstream index stored in the param
 * @param pMap          origin table info to copy from
 * @param taskId        id of the current task, stored as the source clientId
 * @param pTaskAddr     address (taskId, nodeId, epset) of the deployed task
 * @return TSDB_CODE_SUCCESS or the terrno captured at the failing call.
 *
 * Fixes:
 *  - the error path used to call taosMemoryFreeClear(basic) although `basic`
 *    points INTO pExc (it is &pExc->basic), then freed pExc again: an
 *    invalid free followed by a double free. Only pExc is freed now;
 *  - the payload is zeroed after allocation. Previously a failure while
 *    creating colMap reached the cleanup code with basic->uidList still
 *    holding indeterminate malloc contents, which was then destroyed.
 */
static int32_t buildExchangeOperatorParamForVScanEx(SOperatorParam** ppRes, int32_t downstreamIdx, SOrgTbInfo* pMap, uint64_t taskId, SStreamTaskAddr* pTaskAddr) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  SExchangeOperatorParam*      pExc = NULL;
  SExchangeOperatorBasicParam* basic = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
  (*ppRes)->pChildren = NULL;

  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  QUERY_CHECK_NULL(pExc, code, lino, _return, terrno)
  memset(pExc, 0, sizeof(*pExc));

  pExc->multiParams = false;

  basic = &pExc->basic;
  basic->srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;

  basic->vgId = pMap->vgId;
  basic->tableSeq = false;
  basic->isVtbRefScan = true;
  basic->isVtbWinScan = false;
  basic->isVtbTagScan = false;
  basic->isNewDeployed = true;
  basic->isNewParam = true;
  basic->newDeployedSrc.type = QUERY_NODE_DOWNSTREAM_SOURCE;
  basic->newDeployedSrc.clientId = taskId;  // current task's taskid
  basic->newDeployedSrc.taskId = pTaskAddr->taskId;
  basic->newDeployedSrc.fetchMsgType = TDMT_STREAM_FETCH;
  basic->newDeployedSrc.localExec = false;
  basic->newDeployedSrc.addr.nodeId = pTaskAddr->nodeId;
  memcpy(&basic->newDeployedSrc.addr.epSet, &pTaskAddr->epset, sizeof(SEpSet));
  basic->colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
  QUERY_CHECK_NULL(basic->colMap, code, lino, _return, terrno)
  basic->colMap->vgId = pMap->vgId;
  tstrncpy(basic->colMap->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);
  basic->colMap->colMap = taosArrayDup(pMap->colMap, NULL);
  QUERY_CHECK_NULL(basic->colMap->colMap, code, lino, _return, terrno)

  basic->uidList = taosArrayInit(1, sizeof(int64_t));
  QUERY_CHECK_NULL(basic->uidList, code, lino, _return, terrno)

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = true;

  return TSDB_CODE_SUCCESS;

_return:
  qError("failed to build exchange operator param for vscan, code:%d", code);
  taosMemoryFreeClear(*ppRes);
  if (pExc) {
    // `basic` aliases pExc->basic: release what it owns, never `basic` itself.
    if (pExc->basic.colMap) {
      taosArrayDestroy(pExc->basic.colMap->colMap);
      taosMemoryFreeClear(pExc->basic.colMap);
    }
    if (pExc->basic.uidList) {
      taosArrayDestroy(pExc->basic.uidList);
    }
    taosMemoryFreeClear(pExc);
  }
  return code;
}
658

659
/*
 * Builds a GET parameter for the merge-join operator with two child params.
 *
 * Ownership: each child is moved into the new param once it has been pushed,
 * and the caller's pointer (*ppChild0 / *ppChild1) is then set to NULL. A
 * child that was not pushed stays with the caller.
 *
 * @param ppRes     receives the new SOperatorParam; NULL on every failure
 *                  after the initial allocation
 * @param initParam stored as the payload's initDownstream flag
 * @param ppChild0  first child param (moved on success)
 * @param ppChild1  second child param (moved on success)
 * @return TSDB_CODE_SUCCESS or the terrno captured at the failing call.
 *
 * Fix: opType/value/reUse are initialised right after allocation, so
 * freeOperatorParam() on the error paths never receives a struct with
 * indeterminate fields (the original set them only at the very end).
 *
 * NOTE(review): downstreamIdx is not assigned by this builder, unlike the
 * sibling builders — confirm the merge-join consumer does not read it.
 */
static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initParam, SOperatorParam** ppChild0, SOperatorParam** ppChild1) {
  int32_t code = TSDB_CODE_SUCCESS;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    code = terrno;
    return code;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
  (*ppRes)->value = NULL;
  (*ppRes)->reUse = false;

  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
  if (NULL == (*ppRes)->pChildren) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_GET_PARAM);
    *ppRes = NULL;
    return code;
  }

  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild0)) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_GET_PARAM);
    *ppRes = NULL;
    return code;
  }
  *ppChild0 = NULL;

  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild1)) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_GET_PARAM);
    *ppRes = NULL;
    return code;
  }
  *ppChild1 = NULL;

  SSortMergeJoinOperatorParam* pJoin = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam));
  if (NULL == pJoin) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_GET_PARAM);
    *ppRes = NULL;
    return code;
  }

  pJoin->initDownstream = initParam;

  (*ppRes)->value = pJoin;

  return TSDB_CODE_SUCCESS;
}
704

UNCOV
705
/*
 * Builds a NOTIFY parameter for the merge-join operator, attaching whichever
 * of the two optional child params is non-NULL.
 *
 * Ownership: both children pass to this function. They are attached to the
 * new param on success and freed on failure.
 *
 * @param ppRes   receives the new SOperatorParam; NULL on failure
 * @param pChild0 optional first child param
 * @param pChild1 optional second child param
 * @return TSDB_CODE_SUCCESS or the terrno captured at the failing call.
 *
 * Fixes:
 *  - the check after taosArrayInit() tested `*ppRes` (already known to be
 *    non-NULL) instead of `(*ppRes)->pChildren`, so a failed array
 *    allocation went unnoticed and the NULL array was pushed to;
 *  - opType/value/reUse are initialised right after allocation, so
 *    freeOperatorParam() on the error paths never receives a struct with
 *    indeterminate fields.
 */
static int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) {
  int32_t code = TSDB_CODE_SUCCESS;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    code = terrno;
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    return code;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
  (*ppRes)->value = NULL;
  (*ppRes)->reUse = false;

  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
  if (NULL == (*ppRes)->pChildren) {
    code = terrno;
    taosMemoryFreeClear(*ppRes);
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    return code;
  }

  if (pChild0 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) {
    code = terrno;
    // Neither child is attached yet, so both are released separately.
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    *ppRes = NULL;
    return code;
  }

  if (pChild1 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) {
    code = terrno;
    // pChild0 (if any) is already attached to *ppRes; only pChild1 is loose.
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    *ppRes = NULL;
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
744

745
/*
 * Builds a table-scan operator parameter from a vgId -> SArray* (uid list)
 * hash that must contain exactly one vgroup.
 *
 * After the param has been built, the uid list is destroyed and its hash slot
 * is cleared. `downstreamIdx` is accepted but not used by this builder.
 *
 * Returns TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the hash does not hold
 * exactly one entry, otherwise the result of buildTableScanOperatorParam.
 */
static int32_t buildBatchTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
  int32_t vgNum = tSimpleHashGetSize(pVg);
  if (1 != vgNum) {
    qError("Invalid vgroup num %d to build table scan operator param", vgNum);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  int32_t iter = 0;
  for (void* pSlot = tSimpleHashIterate(pVg, NULL, &iter); NULL != pSlot; pSlot = tSimpleHashIterate(pVg, pSlot, &iter)) {
    SArray** ppUidList = (SArray**)pSlot;

    int32_t code = buildTableScanOperatorParam(ppRes, *ppUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false);
    if (code) {
      return code;
    }

    taosArrayDestroy(*ppUidList);
    *ppUidList = NULL;
  }

  return TSDB_CODE_SUCCESS;
}
768

769

770
/*
 * Builds a table-scan operator parameter for the single table *pUid.
 *
 * A temporary one-element uid list is created, handed to
 * buildTableScanOperatorParam, and destroyed afterwards. `downstreamIdx` and
 * `pVgId` are accepted but not used by this builder.
 *
 * @return TSDB_CODE_SUCCESS, the terrno captured at a failing array call, or
 *         the error returned by buildTableScanOperatorParam.
 *
 * Fix: the temporary uid list is now destroyed when taosArrayPush fails;
 * the original returned without releasing it.
 */
static int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) {
  SArray* pUidList = taosArrayInit(1, sizeof(int64_t));
  if (NULL == pUidList) {
    return terrno;
  }

  if (NULL == taosArrayPush(pUidList, pUid)) {
    // Capture terrno first: the destroy call below could overwrite it.
    int32_t pushCode = terrno;
    taosArrayDestroy(pUidList);
    return pushCode;
  }

  int32_t code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, true);
  taosArrayDestroy(pUidList);
  if (code) {
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
787

788
/*
 * Builds the operator param tree for the table pair at the current read
 * index of the head table list: optional source params (table scan or
 * exchange) -> one group cache param per side -> merge join param stored in
 * *ppParam.
 *
 * In batch fetch mode the source params are only built while the vgroup
 * hashes still exist; the hashes are released once both sides were built.
 * On failure every param still held by this function is freed and *ppParam
 * is reset to NULL.
 */
static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, SOperatorParam** ppParam) {
  SStbJoinTableList* pList = pPrev->pListHead;
  int64_t            rowIdx = pList->readIdx;
  int32_t*           leftVg = pList->pLeftVg + rowIdx;
  int64_t*           leftUid = pList->pLeftUid + rowIdx;
  int32_t*           rightVg = pList->pRightVg + rowIdx;
  int64_t*           rightUid = pList->pRightUid + rowIdx;
  SOperatorParam*    pSrcParam0 = NULL;
  SOperatorParam*    pSrcParam1 = NULL;
  SOperatorParam*    pGcParam0 = NULL;
  SOperatorParam*    pGcParam1 = NULL;
  int32_t            code = TSDB_CODE_SUCCESS;

  qDebug("start %" PRId64 ":%" PRId64 "th stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64, 
      rowIdx, pPrev->tableNum, *leftVg, *leftUid, *rightVg, *rightUid);

  QRY_ERR_RET(updatePostJoinCurrTableInfo(&pInfo->stbJoin));

  if (!pInfo->stbJoin.basic.batchFetch) {
    // one table per side: build a scan or exchange param for each of them
    if (pInfo->stbJoin.basic.srcScan[0]) {
      code = buildSingleTableScanOperatorParam(&pSrcParam0, 0, leftVg, leftUid);
    } else {
      code = buildExchangeOperatorParam(&pSrcParam0, 0, leftVg, leftUid);
    }
    if (TSDB_CODE_SUCCESS == code) {
      if (pInfo->stbJoin.basic.srcScan[1]) {
        code = buildSingleTableScanOperatorParam(&pSrcParam1, 1, rightVg, rightUid);
      } else {
        code = buildExchangeOperatorParam(&pSrcParam1, 1, rightVg, rightUid);
      }
    }
  } else if (pPrev->leftHash) {
    // batch mode: source params are built from the vgroup hashes while they exist
    if (pInfo->stbJoin.basic.srcScan[0]) {
      code = buildBatchTableScanOperatorParam(&pSrcParam0, 0, pPrev->leftHash);
    } else {
      code = buildBatchExchangeOperatorParam(&pSrcParam0, 0, pPrev->leftHash);
    }
    if (TSDB_CODE_SUCCESS == code) {
      if (pInfo->stbJoin.basic.srcScan[1]) {
        code = buildBatchTableScanOperatorParam(&pSrcParam1, 1, pPrev->rightHash);
      } else {
        code = buildBatchExchangeOperatorParam(&pSrcParam1, 1, pPrev->rightHash);
      }
    }
    if (TSDB_CODE_SUCCESS == code) {
      tSimpleHashCleanup(pPrev->leftHash);
      tSimpleHashCleanup(pPrev->rightHash);
      pPrev->leftHash = NULL;
      pPrev->rightHash = NULL;
    }
  }

  bool initParam = (NULL != pSrcParam0);

  if (TSDB_CODE_SUCCESS == code) {
    code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pPost->leftNeedCache, pSrcParam0);
    // the source param is no longer tracked here once it was passed on
    pSrcParam0 = NULL;
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pPost->rightNeedCache, pSrcParam1);
    pSrcParam1 = NULL;
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = buildMergeJoinOperatorParam(ppParam, initParam, &pGcParam0, &pGcParam1);
  }

  if (TSDB_CODE_SUCCESS == code) {
    return code;
  }

  // failure: release whatever is still held, in build order
  SOperatorParam* pending[] = {pSrcParam0, pSrcParam1, pGcParam0, pGcParam1};
  for (size_t i = 0; i < sizeof(pending) / sizeof(pending[0]); ++i) {
    if (pending[i]) {
      freeOperatorParam(pending[i], OP_GET_PARAM);
    }
  }
  if (*ppParam) {
    freeOperatorParam(*ppParam, OP_GET_PARAM);
    *ppParam = NULL;
  }

  return code;
}
858

859
/*
 * Starts the post join for the table pair at the current read index: builds
 * the operator param tree and fetches the first result block from
 * downstream[1] through getNextExtFn.
 *
 * Any failure (param build, downstream fetch, block validation) is stored in
 * pTaskInfo->code and raised through T_LONG_JMP. The previous implementation
 * treated a non-zero code from getNextExtFn together with a NULL block as an
 * "empty result" and silently dropped the error.
 *
 * On return *ppRes is the first result block, or NULL when the pair produced
 * no rows.
 */
static void seqJoinLaunchNewRetrieveImpl(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
  SStbJoinPostJoinCtx*       pPost = &pStbJoin->ctx.post;
  SOperatorParam*            pParam = NULL;
  int32_t                    code  = buildSeqStbJoinOperatorParam(pInfo, pPrev, pPost, &pParam);
  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  qDebug("%s dynamic post task begin", GET_TASKID(pOperator->pTaskInfo));
  code = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam, ppRes);
  if (TSDB_CODE_SUCCESS != code) {
    // do not mistake a failed fetch for an empty result
    qError("%s dynamic post task failed, error:%s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  if (NULL == *ppRes) {
    qDebug("%s Empty join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
    return;
  }

  code = blockDataCheck(*ppRes);
  if (code) {
    qError("Invalid block data, blockDataCheck failed, error:%s", tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  pPost->isStarted = true;
  pStbJoin->execInfo.postBlkNum++;
  pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows;
  qDebug("%s join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
}
1,602,664✔
888

889

UNCOV
890
/*
 * Notifies downstream[1] that the cache for the current left or right table
 * has ended. A group cache notify param is built for the selected side,
 * wrapped into a merge join notify param and delivered with
 * optrDefaultNotifyFn().
 */
static int32_t notifySeqJoinTableCacheEnd(SOperatorInfo* pOperator, SStbJoinPostJoinCtx* pPost, bool leftTable) {
  SOperatorParam* pGcParam = NULL;
  SOperatorParam* pMergeJoinParam = NULL;
  int32_t         downstreamId = 1;
  int32_t         vgId = pPost->rightVgId;
  int64_t         uid = pPost->rightCurrUid;

  if (leftTable) {
    downstreamId = 0;
    vgId = pPost->leftVgId;
    uid = pPost->leftCurrUid;
  }

  qDebug("notify table %" PRIu64 " in vgId %d downstreamId %d cache end", uid, vgId, downstreamId);

  int32_t code = buildGroupCacheNotifyOperatorParam(&pGcParam, downstreamId, vgId, uid);
  if (TSDB_CODE_SUCCESS == code) {
    code = buildMergeJoinNotifyOperatorParam(&pMergeJoinParam, pGcParam, NULL);
  }
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  return optrDefaultNotifyFn(pOperator->pDownstream[1], pMergeJoinParam);
}
910

911
/*
 * Bookkeeping run when the post join of the current table pair is exhausted.
 *
 * Clears the "started" flag. In batch fetch mode nothing else is done.
 * Otherwise:
 *  - left side, when it needs caching: the reference counter stored in
 *    leftCache is decremented; when it reaches zero the entry is removed and
 *    a cache-end notification is sent for the left table;
 *  - right side, when it does NOT need caching: if the table has an entry in
 *    rightCache the entry is removed and a cache-end notification is sent
 *    for the right table.
 */
static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCtrlInfo*          pStbJoin) {
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
  int32_t              code = 0;

  pPost->isStarted = false;

  if (pStbJoin->basic.batchFetch) {
    return TSDB_CODE_SUCCESS;
  }

  if (pPost->leftNeedCache) {
    uint32_t* pRefNum = tSimpleHashGet(pPrev->leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
    if (NULL != pRefNum) {
      --(*pRefNum);
      if (0 == *pRefNum) {
        code = tSimpleHashRemove(pPrev->leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
        if (code) {
          qError("tSimpleHashRemove leftCurrUid %" PRId64 " from leftCache failed, error:%s", pPost->leftCurrUid, tstrerror(code));
          QRY_ERR_RET(code);
        }
        QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, true));
      }
    }
  }

  if (pPost->rightNeedCache) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL != tSimpleHashGet(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid))) {
    code = tSimpleHashRemove(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
    if (code) {
      qError("tSimpleHashRemove rightCurrUid %" PRId64 " from rightCache failed, error:%s", pPost->rightCurrUid, tstrerror(code));
      QRY_ERR_RET(code);
    }
    QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, false));
  }

  return TSDB_CODE_SUCCESS;
}
947

948

949
/*
 * Continues reading result blocks of an already started post join.
 *
 * Does nothing when no post join is in progress. Otherwise the next block is
 * fetched from downstream[1]: a non-NULL block updates the post statistics,
 * a NULL block ends the current pair (end-of-retrieve handling, then the
 * read index of the head table list is advanced).
 */
static FORCE_INLINE int32_t seqJoinContinueCurrRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinPostJoinCtx*       pPost = &pInfo->stbJoin.ctx.post;
  SStbJoinPrevJoinCtx*       pPrev = &pInfo->stbJoin.ctx.prev;

  if (pPost->isStarted) {
    qDebug("%s dynQueryCtrl continue to retrieve block from post op", GET_TASKID(pOperator->pTaskInfo));

    *ppRes = getNextBlockFromDownstream(pOperator, 1);
    if (NULL != *ppRes) {
      pInfo->stbJoin.execInfo.postBlkNum++;
      pInfo->stbJoin.execInfo.postBlkRows += (*ppRes)->info.rows;
      return TSDB_CODE_SUCCESS;
    }

    // current pair is exhausted: finish it and move on to the next row
    QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, &pInfo->stbJoin));
    pPrev->pListHead->readIdx++;
  }

  return TSDB_CODE_SUCCESS;
}
971

972
/*
 * Appends pVal to the array stored under pKey in pHash, creating the array
 * (initial capacity 10, element size valSize) on first use of the key.
 *
 * Error reporting:
 *  - allocation failures return terrno, captured BEFORE any cleanup call;
 *  - a failed tSimpleHashPut() returns the code reported by that call
 *    instead of whatever happens to be in terrno, so a failed insert can no
 *    longer be reported as success after the new array was destroyed.
 */
static FORCE_INLINE int32_t addToJoinVgroupHash(SSHashObj* pHash, void* pKey, int32_t keySize, void* pVal, int32_t valSize) {
  SArray** ppArray = tSimpleHashGet(pHash, pKey, keySize);
  if (NULL != ppArray) {
    // key already known: just append to its list
    if (NULL == taosArrayPush(*ppArray, pVal)) {
      return terrno;
    }
    return TSDB_CODE_SUCCESS;
  }

  SArray* pArray = taosArrayInit(10, valSize);
  if (NULL == pArray) {
    return terrno;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  if (NULL == taosArrayPush(pArray, pVal)) {
    code = terrno;
  } else {
    // the hash stores the array pointer itself
    code = tSimpleHashPut(pHash, pKey, keySize, &pArray, POINTER_BYTES);
  }

  if (TSDB_CODE_SUCCESS != code) {
    // the array never made it into the hash, so it is still ours to free
    taosArrayDestroy(pArray);
  }

  return code;
}
996

997
/*
 * Tracks how often a table key has been seen.
 *
 * First occurrence: the key is stored in pHash with counter 1 and also
 * recorded in pOnceHash. Later occurrences update the counter in pHash:
 *  - counter 0 is left untouched;
 *  - counter UINT32_MAX is reset to 0;
 *  - any other value is incremented; when it was 1 the key is first removed
 *    from pOnceHash because it is no longer seen only once.
 */
static FORCE_INLINE int32_t addToJoinTableHash(SSHashObj* pHash, SSHashObj* pOnceHash, void* pKey, int32_t keySize) {
  int32_t   code = TSDB_CODE_SUCCESS;
  uint32_t* pNum = tSimpleHashGet(pHash, pKey, keySize);

  if (NULL == pNum) {
    uint32_t first = 1;
    code = tSimpleHashPut(pHash, pKey, keySize, &first, sizeof(first));
    if (TSDB_CODE_SUCCESS == code) {
      code = tSimpleHashPut(pOnceHash, pKey, keySize, NULL, 0);
    }
    if (code) {
      return code;
    }
    return TSDB_CODE_SUCCESS;
  }

  if (0 == *pNum) {
    return TSDB_CODE_SUCCESS;
  }

  if (UINT32_MAX == *pNum) {
    *pNum = 0;
    return TSDB_CODE_SUCCESS;
  }

  if (1 == *pNum) {
    code = tSimpleHashRemove(pOnceHash, pKey, keySize);
    if (code) {
      qError("tSimpleHashRemove failed in addToJoinTableHash, error:%s", tstrerror(code));
      QRY_ERR_RET(code);
    }
  }
  (*pNum)++;

  return TSDB_CODE_SUCCESS;
}
1033

1034

1035
/*
 * Releases one table list node together with its four vgId/uid arrays.
 * A NULL node is ignored.
 */
static void freeStbJoinTableList(SStbJoinTableList* pList) {
  if (pList) {
    taosMemoryFree(pList->pLeftVg);
    taosMemoryFree(pList->pLeftUid);
    taosMemoryFree(pList->pRightVg);
    taosMemoryFree(pList->pRightUid);
    taosMemoryFree(pList);
  }
}
1045

1046
/*
 * Copies `rows` entries of the four vgId/uid columns into a newly allocated
 * table list node and appends the node to the tail of the list kept in pCtx
 * (the node becomes both head and tail when the list has no tail).
 *
 * Returns terrno when an allocation fails; the partially built node is
 * released in that case.
 */
static int32_t appendStbJoinTableList(SStbJoinPrevJoinCtx* pCtx, int64_t rows, int32_t* pLeftVg, int64_t* pLeftUid, int32_t* pRightVg, int64_t* pRightUid) {
  int32_t            code = TSDB_CODE_SUCCESS;
  SStbJoinTableList* pNode = taosMemoryCalloc(1, sizeof(SStbJoinTableList));
  if (NULL == pNode) {
    return terrno;
  }

  pNode->pLeftVg = taosMemoryMalloc(rows * sizeof(*pLeftVg));
  if (NULL == pNode->pLeftVg) {
    goto _fail;
  }
  pNode->pLeftUid = taosMemoryMalloc(rows * sizeof(*pLeftUid));
  if (NULL == pNode->pLeftUid) {
    goto _fail;
  }
  pNode->pRightVg = taosMemoryMalloc(rows * sizeof(*pRightVg));
  if (NULL == pNode->pRightVg) {
    goto _fail;
  }
  pNode->pRightUid = taosMemoryMalloc(rows * sizeof(*pRightUid));
  if (NULL == pNode->pRightUid) {
    goto _fail;
  }

  TAOS_MEMCPY(pNode->pLeftVg, pLeftVg, rows * sizeof(*pLeftVg));
  TAOS_MEMCPY(pNode->pLeftUid, pLeftUid, rows * sizeof(*pLeftUid));
  TAOS_MEMCPY(pNode->pRightVg, pRightVg, rows * sizeof(*pRightVg));
  TAOS_MEMCPY(pNode->pRightUid, pRightUid, rows * sizeof(*pRightUid));

  pNode->readIdx = 0;
  pNode->uidNum = rows;
  pNode->pNext = NULL;

  if (pCtx->pListTail) {
    pCtx->pListTail->pNext = pNode;
  } else {
    pCtx->pListHead = pNode;
  }
  pCtx->pListTail = pNode;

  return TSDB_CODE_SUCCESS;

_fail:
  // read the allocation error before the cleanup below runs
  code = terrno;
  freeStbJoinTableList(pNode);
  return code;
}
1096

1097
/*
 * Consumes one block of (vgId, uid) pairs produced by downstream[0].
 *
 * The vgId/uid columns of both join sides are located through the slot ids
 * in basic.vgSlot / basic.uidSlot. In batch fetch mode every row is added to
 * the per-vgroup uid hashes of both sides; otherwise only the left uid is
 * counted in leftCache/onceTable. Finally the raw columns are appended to
 * the table list and tableNum is increased by the row count.
 *
 * Any failure is stored in pTaskInfo->code and raised through T_LONG_JMP.
 */
static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
  SColumnInfoData*           pVg0 = NULL;
  SColumnInfoData*           pVg1 = NULL;
  SColumnInfoData*           pUid0 = NULL;
  SColumnInfoData*           pUid1 = NULL;

  pVg0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[0]);
  if (NULL == pVg0) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  pVg1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[1]);
  if (NULL == pVg1) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  pUid0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[0]);
  if (NULL == pUid0) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  pUid1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[1]);
  if (NULL == pUid1) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  if (!pStbJoin->basic.batchFetch) {
    for (int32_t i = 0; i < pBlock->info.rows; ++i) {
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i);

      code = addToJoinTableHash(pPrev->leftCache, pPrev->onceTable, leftUid, sizeof(*leftUid));
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
    }
  } else {
    for (int32_t i = 0; i < pBlock->info.rows; ++i) {
      int32_t* leftVg = (int32_t*)(pVg0->pData + pVg0->info.bytes * i);
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i);
      int32_t* rightVg = (int32_t*)(pVg1->pData + pVg1->info.bytes * i);
      int64_t* rightUid = (int64_t*)(pUid1->pData + pUid1->info.bytes * i);

      code = addToJoinVgroupHash(pPrev->leftHash, leftVg, sizeof(*leftVg), leftUid, sizeof(*leftUid));
      if (TSDB_CODE_SUCCESS == code) {
        code = addToJoinVgroupHash(pPrev->rightHash, rightVg, sizeof(*rightVg), rightUid, sizeof(*rightUid));
      }
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = appendStbJoinTableList(pPrev, pBlock->info.rows, (int32_t*)pVg0->pData, (int64_t*)pUid0->pData, (int32_t*)pVg1->pData, (int64_t*)pUid1->pData);
    if (TSDB_CODE_SUCCESS == code) {
      pPrev->tableNum += pBlock->info.rows;
    }
  }

_return:

  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }
}
85,659✔
1159

1160

1161
/*
 * Post-processes the left table counters once all input blocks were read
 * (non batch mode only).
 *
 * When leftCache and onceTable have the same size, leftCache is cleared.
 * Otherwise every key of onceTable is removed from leftCache and the number
 * of remaining entries is recorded in execInfo.leftCacheNum. Removal
 * failures are logged and otherwise ignored.
 */
static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;

  if (pStbJoin->basic.batchFetch) {
    return;
  }

  if (tSimpleHashGetSize(pPrev->leftCache) == tSimpleHashGetSize(pPrev->onceTable)) {
    tSimpleHashClear(pPrev->leftCache);
    return;
  }

  int32_t iter = 0;
  for (uint64_t* pUid = tSimpleHashIterate(pPrev->onceTable, NULL, &iter); NULL != pUid;
       pUid = tSimpleHashIterate(pPrev->onceTable, pUid, &iter)) {
    int32_t code = tSimpleHashRemove(pPrev->leftCache, pUid, sizeof(*pUid));
    if (code) {
      qError("tSimpleHashRemove failed in postProcessStbJoinTableHash, error:%s", tstrerror(code));
    }
  }

  pStbJoin->execInfo.leftCacheNum = tSimpleHashGetSize(pPrev->leftCache);
  qDebug("more than 1 ref build table num %" PRId64, (int64_t)tSimpleHashGetSize(pPrev->leftCache));

  // NOTE(review): a disabled debug check used to follow here, asserting that
  // every counter left in leftCache is greater than 1 - confirm before re-enabling.
}
1196

1197
/*
 * Drains downstream[0], feeding every block into doBuildStbJoinTableHash()
 * while updating the prev-side block/row statistics, then post-processes the
 * table hashes and marks the join build phase as done.
 */
static void buildStbJoinTableList(SOperatorInfo* pOperator) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SSDataBlock*               pBlock = NULL;

  while (NULL != (pBlock = getNextBlockFromDownstream(pOperator, 0))) {
    pStbJoin->execInfo.prevBlkNum++;
    pStbJoin->execInfo.prevBlkRows += pBlock->info.rows;

    doBuildStbJoinTableHash(pOperator, pBlock);
  }

  postProcessStbJoinTableHash(pOperator);

  pStbJoin->ctx.prev.joinBuild = true;
}
505,939✔
1217

1218
/*
 * Launches post joins for the remaining table pairs until one of them
 * yields a result block.
 *
 * Fully consumed list nodes are unlinked from the head and freed. When the
 * last node is freed the tail pointer is reset as well, so the context never
 * keeps a pointer to released memory (previously pListTail was left
 * dangling). For each pair without a result the end-of-retrieve handling
 * runs and the read index advances.
 *
 * Returns TSDB_CODE_SUCCESS with *ppRes set to the first result block, or
 * with *ppRes == NULL and the operator marked completed when every pair has
 * been processed; an error code when end-of-retrieve handling fails.
 */
static int32_t seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
  SStbJoinTableList*         pNode = pPrev->pListHead;

  while (pNode) {
    if (pNode->readIdx >= pNode->uidNum) {
      pPrev->pListHead = pNode->pNext;
      if (NULL == pPrev->pListHead) {
        // the freed node was also the tail
        pPrev->pListTail = NULL;
      }
      freeStbJoinTableList(pNode);
      pNode = pPrev->pListHead;
      continue;
    }

    seqJoinLaunchNewRetrieveImpl(pOperator, ppRes);
    if (*ppRes) {
      return TSDB_CODE_SUCCESS;
    }

    QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, pStbJoin));
    pPrev->pListHead->readIdx++;
  }

  *ppRes = NULL;
  setOperatorCompleted(pOperator);

  return TSDB_CODE_SUCCESS;
}
1246

1247
/*
 * Finalizes a join result block before it is returned to the caller.
 *
 * A NULL block is accepted and left alone. Otherwise the block id is set to
 * the id of the output data block descriptor, and for every output slot
 * beyond the columns already present in the block a new column is created,
 * sized for the block's row count and appended.
 *
 * Changes over the previous implementation:
 *  - the buffers of a newly created column are released when sizing or
 *    appending it fails (they used to leak);
 *  - the error message for a missing descriptor names the pointers that are
 *    actually checked (pBlock is known to be non-NULL at that point).
 */
static int32_t seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) {
  if (NULL == pBlock) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == pStbJoin || NULL == pStbJoin->pOutputDataBlockDesc) {
    qError("seqStableJoinComposeRes: pStbJoin or pOutputDataBlockDesc is NULL");
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  pBlock->info.id.blockId = pStbJoin->pOutputDataBlockDesc->dataBlockId;
  if (!pBlock->pDataBlock) {
    return TSDB_CODE_SUCCESS;
  }

  for (int i = (int)pBlock->pDataBlock->size; i < pStbJoin->pOutputDataBlockDesc->pSlots->length; i++) {
    SSlotDescNode* pSlot = (SSlotDescNode*)nodesListGetNode(pStbJoin->pOutputDataBlockDesc->pSlots, i);
    if (pSlot == NULL) {
      qError("seqStableJoinComposeRes: pSlot is NULL, i:%d", i);
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }

    SColumnInfoData colInfo = createColumnInfoData(pSlot->dataType.type, pSlot->dataType.bytes, pSlot->slotId);
    int32_t code = colInfoDataEnsureCapacity(&colInfo, pBlock->info.rows, true);
    if (code == TSDB_CODE_SUCCESS) {
      code = blockDataAppendColInfo(pBlock, &colInfo);
    }
    if (code != TSDB_CODE_SUCCESS) {
      // the column was not appended to the block, so its buffers are still ours
      colDataDestroy(&colInfo);
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
1276

1277
/*
 * Entry point of the sequential super-table join.
 *
 * On the first call the table list is built from downstream[0]; when that
 * input produced no rows the operator completes immediately. Afterwards an
 * already running post join is continued, and when it yields nothing a new
 * one is launched for the next table pair. The open cost is recorded while
 * cost.openCost is still zero.
 *
 * Errors are stored in pTaskInfo->code and raised through T_LONG_JMP; on
 * success the result block (possibly NULL) is passed through
 * seqStableJoinComposeRes() and its code is returned.
 */
int32_t seqStableJoin(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;

  QRY_PARAM_CHECK(pRes);
  if (OP_EXEC_DONE == pOperator->status) {
    return code;
  }

  int64_t st = (pOperator->cost.openCost == 0) ? taosGetTimestampUs() : 0;

  if (!pStbJoin->ctx.prev.joinBuild) {
    buildStbJoinTableList(pOperator);
    if (pStbJoin->execInfo.prevBlkRows <= 0) {
      // nothing to join
      setOperatorCompleted(pOperator);
      goto _return;
    }
  }

  QRY_ERR_JRET(seqJoinContinueCurrRetrieve(pOperator, pRes));
  if (NULL == *pRes) {
    QRY_ERR_JRET(seqJoinLaunchNewRetrieve(pOperator, pRes));
  }

_return:
  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
  }

  if (TSDB_CODE_SUCCESS == code) {
    return seqStableJoinComposeRes(pStbJoin, *pRes);
  }

  qError("%s failed since %s", __func__, tstrerror(code));
  pOperator->pTaskInfo->code = code;
  T_LONG_JMP(pOperator->pTaskInfo->env, code);
  return code;
}
1321

1322
/*
 * Allocates an operator param for the virtual table scan operator.
 *
 * The param gets an empty children array and a SVTableScanOperatorParam
 * value carrying the table uid, the scan window taken from pInfo and an
 * empty operator param array. On failure everything allocated here is
 * released and *ppRes is left NULL.
 */
static int32_t buildVtbScanOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, uint64_t uid) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SVTableScanOperatorParam* pVScan = NULL;
  SOperatorParam*           pParam = NULL;

  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
  *ppRes = pParam;
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno);

  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno);

  pVScan = taosMemoryMalloc(sizeof(SVTableScanOperatorParam));
  QUERY_CHECK_NULL(pVScan, code, lino, _return, terrno);

  pVScan->pOpParamArray = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL(pVScan->pOpParamArray, code, lino, _return, terrno);

  pVScan->uid = uid;
  pVScan->window = pInfo->vtbScan.window;

  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN;
  pParam->downstreamIdx = 0;
  pParam->value = pVScan;
  pParam->reUse = false;

  return TSDB_CODE_SUCCESS;

_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pVScan) {
    taosArrayDestroy(pVScan->pOpParamArray);
    taosMemoryFreeClear(pVScan);
  }
  if (pParam) {
    taosArrayDestroy(pParam->pChildren);
    taosMemoryFreeClear(pParam);
  }
  *ppRes = NULL;
  return code;
}
1357

1358
int32_t dynProcessUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
240,383✔
1359
  int32_t                    lino = 0;
240,383✔
1360
  SOperatorInfo*             operator=(SOperatorInfo*) param;
240,383✔
1361
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)operator->info;
240,383✔
1362

1363
  if (TSDB_CODE_SUCCESS != code) {
240,383✔
UNCOV
1364
    operator->pTaskInfo->code = rpcCvtErrCode(code);
×
UNCOV
1365
    if (operator->pTaskInfo->code != code) {
×
UNCOV
1366
      qError("load systable rsp received, error:%s, cvted error:%s", tstrerror(code),
×
1367
             tstrerror(operator->pTaskInfo->code));
1368
    } else {
UNCOV
1369
      qError("load systable rsp received, error:%s", tstrerror(code));
×
1370
    }
UNCOV
1371
    goto _return;
×
1372
  }
1373

1374
  pScanResInfo->vtbScan.pRsp = taosMemoryMalloc(sizeof(SUseDbRsp));
240,383✔
1375
  QUERY_CHECK_NULL(pScanResInfo->vtbScan.pRsp, code, lino, _return, terrno)
240,383✔
1376

1377
  code = tDeserializeSUseDbRsp(pMsg->pData, (int32_t)pMsg->len, pScanResInfo->vtbScan.pRsp);
240,383✔
1378
  QUERY_CHECK_CODE(code, lino, _return);
240,383✔
1379

1380
  taosMemoryFreeClear(pMsg->pData);
240,383✔
1381

1382
  code = tsem_post(&pScanResInfo->vtbScan.ready);
240,383✔
1383
  QUERY_CHECK_CODE(code, lino, _return);
240,383✔
1384

1385
  return code;
240,383✔
UNCOV
1386
_return:
×
UNCOV
1387
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
1388
  return code;
×
1389
}
1390

1391
static int32_t buildDbVgInfoMap(SOperatorInfo* pOperator, SMsgCb* pMsgCb, SName* name, SExecTaskInfo* pTaskInfo, SUseDbOutput* output) {
240,383✔
1392
  int32_t                    code = TSDB_CODE_SUCCESS;
240,383✔
1393
  int32_t                    lino = 0;
240,383✔
1394
  char*                      buf1 = NULL;
240,383✔
1395
  SUseDbReq*                 pReq = NULL;
240,383✔
1396
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)pOperator->info;
240,383✔
1397

1398
  pReq = taosMemoryMalloc(sizeof(SUseDbReq));
240,383✔
1399
  QUERY_CHECK_NULL(pReq, code, lino, _return, terrno)
240,383✔
1400
  code = tNameGetFullDbName(name, pReq->db);
240,383✔
1401
  QUERY_CHECK_CODE(code, lino, _return);
240,383✔
1402
  int32_t contLen = tSerializeSUseDbReq(NULL, 0, pReq);
240,383✔
1403
  buf1 = taosMemoryCalloc(1, contLen);
240,383✔
1404
  QUERY_CHECK_NULL(buf1, code, lino, _return, terrno)
240,383✔
1405
  int32_t tempRes = tSerializeSUseDbReq(buf1, contLen, pReq);
240,383✔
1406
  if (tempRes < 0) {
240,383✔
UNCOV
1407
    QUERY_CHECK_CODE(terrno, lino, _return);
×
1408
  }
1409

1410
  // send the fetch remote task result request
1411
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
240,383✔
1412
  QUERY_CHECK_NULL(pMsgSendInfo, code, lino, _return, terrno)
240,383✔
1413

1414
  pMsgSendInfo->param = pOperator;
240,383✔
1415
  pMsgSendInfo->msgInfo.pData = buf1;
240,383✔
1416
  pMsgSendInfo->msgInfo.len = contLen;
240,383✔
1417
  pMsgSendInfo->msgType = TDMT_MND_GET_DB_INFO;
240,383✔
1418
  pMsgSendInfo->fp = dynProcessUseDbRsp;
240,383✔
1419
  pMsgSendInfo->requestId = pTaskInfo->id.queryId;
240,383✔
1420

1421
  code = asyncSendMsgToServer(pMsgCb->clientRpc, &pScanResInfo->vtbScan.epSet, NULL, pMsgSendInfo);
240,383✔
1422
  QUERY_CHECK_CODE(code, lino, _return);
240,383✔
1423

1424
  code = tsem_wait(&pScanResInfo->vtbScan.ready);
240,383✔
1425
  QUERY_CHECK_CODE(code, lino, _return);
240,383✔
1426

1427
  code = queryBuildUseDbOutput(output, pScanResInfo->vtbScan.pRsp);
240,383✔
1428
  QUERY_CHECK_CODE(code, lino, _return);
240,383✔
1429

1430
_return:
240,383✔
1431
  if (code) {
240,383✔
1432
     qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
1433
     taosMemoryFree(buf1);
×
1434
  }
1435
  taosMemoryFree(pReq);
240,383✔
1436
  tFreeSUsedbRsp(pScanResInfo->vtbScan.pRsp);
240,383✔
1437
  taosMemoryFreeClear(pScanResInfo->vtbScan.pRsp);
240,383✔
1438
  return code;
240,383✔
1439
}
1440

1441
int dynVgInfoComp(const void* lp, const void* rp) {
463,236✔
1442
  SVgroupInfo* pLeft = (SVgroupInfo*)lp;
463,236✔
1443
  SVgroupInfo* pRight = (SVgroupInfo*)rp;
463,236✔
1444
  if (pLeft->hashBegin < pRight->hashBegin) {
463,236✔
1445
    return -1;
463,236✔
UNCOV
1446
  } else if (pLeft->hashBegin > pRight->hashBegin) {
×
UNCOV
1447
    return 1;
×
1448
  }
1449

1450
  return 0;
×
1451
}
1452

1453
/*
 * Lazily materializes dbInfo->vgArray: when the vgroup hash exists and the
 * array has not been built yet, every SVgroupInfo of the hash is copied into
 * a new array which is then sorted with sort_func.
 *
 * A NULL dbInfo, a missing hash or an already built array is a no-op.
 *
 * When copying an element fails the partially filled array is destroyed and
 * vgArray is reset to NULL. Previously the incomplete, unsorted array was
 * left in place, so later calls treated it as fully built.
 *
 * Returns TSDB_CODE_SUCCESS or terrno of the failed allocation.
 */
int32_t dynMakeVgArraySortBy(SDBVgInfo* dbInfo, __compar_fn_t sort_func) {
  if (NULL == dbInfo) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == dbInfo->vgHash || NULL != dbInfo->vgArray) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t vgSize = taosHashGetSize(dbInfo->vgHash);
  dbInfo->vgArray = taosArrayInit(vgSize, sizeof(SVgroupInfo));
  if (NULL == dbInfo->vgArray) {
    return terrno;
  }

  void* pIter = taosHashIterate(dbInfo->vgHash, NULL);
  while (pIter) {
    if (NULL == taosArrayPush(dbInfo->vgArray, pIter)) {
      // capture the error before the cleanup calls run
      int32_t code = terrno;
      taosHashCancelIterate(dbInfo->vgHash, pIter);
      taosArrayDestroy(dbInfo->vgArray);
      dbInfo->vgArray = NULL;
      return code;
    }

    pIter = taosHashIterate(dbInfo->vgHash, pIter);
  }

  taosArraySort(dbInfo->vgArray, sort_func);

  return TSDB_CODE_SUCCESS;
}
1480

1481
int32_t dynHashValueComp(void const* lp, void const* rp) {
1,916,024✔
1482
  uint32_t*    key = (uint32_t*)lp;
1,916,024✔
1483
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
1,916,024✔
1484

1485
  if (*key < pVg->hashBegin) {
1,916,024✔
1486
    return -1;
×
1487
  } else if (*key > pVg->hashEnd) {
1,915,623✔
1488
    return 1;
632,578✔
1489
  }
1490

1491
  return 0;
1,283,446✔
1492
}
1493

1494
/*
 * Resolves the vgroup id of table `tbName` in database `dbFName`.
 *
 * The sorted vgroup array of dbInfo is built on demand, the full table name
 * "<dbFName>.<tbName>" is hashed with the database's hash settings and the
 * vgroup whose hash range contains the value is looked up.
 *
 * Changes over the previous implementation:
 *  - NULL arguments are rejected up front (a NULL dbInfo used to pass
 *    dynMakeVgArraySortBy() and was then dereferenced);
 *  - the "no vgroup" error is assigned to `code` before the check macro is
 *    used, instead of passing an assignment expression as macro argument,
 *    whose result depends on how the macro parenthesizes it.
 *
 * Returns TSDB_CODE_SUCCESS with *vgId set, or an error code.
 */
int32_t getVgId(SDBVgInfo* dbInfo, char* dbFName, int32_t* vgId, char *tbName) {
  int32_t code = 0;
  int32_t lino = 0;

  if (NULL == dbInfo || NULL == dbFName || NULL == vgId || NULL == tbName) {
    qError("%s invalid NULL parameter", __func__);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  code = dynMakeVgArraySortBy(dbInfo, dynVgInfoComp);
  QUERY_CHECK_CODE(code, lino, _return);

  int32_t vgNum = (int32_t)taosArrayGetSize(dbInfo->vgArray);
  if (vgNum <= 0) {
    qError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum);
    code = TSDB_CODE_TSC_DB_NOT_SELECTED;
    QUERY_CHECK_CODE(code, lino, _return);
  }

  SVgroupInfo* vgInfo = NULL;
  char         tbFullName[TSDB_TABLE_FNAME_LEN];
  // snprintf always NUL-terminates; an over-long name is truncated to the buffer size
  (void)snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbFName, tbName);

  uint32_t hashValue = taosGetTbHashVal(tbFullName, (int32_t)strlen(tbFullName), dbInfo->hashMethod,
                                        dbInfo->hashPrefix, dbInfo->hashSuffix);

  vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, dynHashValueComp, TD_EQ);
  if (NULL == vgInfo) {
    qError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName,
           (int32_t)taosArrayGetSize(dbInfo->vgArray));
    return TSDB_CODE_CTG_INTERNAL_ERROR;
  }

  *vgId = vgInfo->vgId;

_return:
  if (TSDB_CODE_SUCCESS != code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1527

1528
/*
 * Tell whether column `colId` has to be read by the virtual table scan.
 *
 * Returns true when the scan is flagged to read all columns, or when `colId`
 * is one of the ids stored in the operator's readColList; false otherwise.
 */
bool colNeedScan(SOperatorInfo* pOperator, col_id_t colId) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SVtbScanDynCtrlInfo*       pScan = (SVtbScanDynCtrlInfo*)&pCtrl->vtbScan;
  SArray*                    pReadCols = pScan->readColList;

  if (pScan->scanAllCols) {
    return true;
  }

  int32_t idx = 0;
  while (idx < taosArrayGetSize(pReadCols)) {
    col_id_t* pCandidate = (col_id_t*)taosArrayGet(pReadCols, idx);
    if (*pCandidate == colId) {
      return true;
    }
    ++idx;
  }

  return false;
}
1542

1543
/*
 * Return the vgroup information of the database named in `name`.
 *
 * Results are cached per database name in vtbScan.dbVgInfoMap. On a cache miss
 * a new SUseDbOutput is allocated, filled by buildDbVgInfoMap() and stored in
 * the map (the map stores the pointer).
 *
 * @param pOperator dynamic query control operator
 * @param name      table name whose `dbname` selects the database
 * @param dbVgInfo  [out] receives the database's vgroup information
 * @return TSDB_CODE_SUCCESS or the error code of the failing step.
 */
int32_t getDbVgInfo(SOperatorInfo* pOperator, SName *name, SDBVgInfo **dbVgInfo) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SMsgCb*                    pMsgCb = pVtbScan->pMsgCb;
  SUseDbOutput*              output = NULL;

  QRY_PARAM_CHECK(dbVgInfo);

  size_t         dbNameLen = strlen(name->dbname);
  SUseDbOutput** find = (SUseDbOutput**)taosHashGet(pInfo->vtbScan.dbVgInfoMap, name->dbname, dbNameLen);
  if (find != NULL) {
    // cache hit
    *dbVgInfo = (*find)->dbVgroup;
    return code;
  }

  // Zero-initialised so the cleanup below never sees indeterminate fields if
  // buildDbVgInfoMap() fails before filling the structure.
  output = taosMemoryCalloc(1, sizeof(SUseDbOutput));
  if (output == NULL) {
    // Nothing to release yet; do not hand a NULL pointer to freeUseDbOutput().
    code = terrno;
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    return code;
  }

  code = buildDbVgInfoMap(pOperator, pMsgCb, name, pTaskInfo, output);
  QUERY_CHECK_CODE(code, line, _return);
  code = taosHashPut(pInfo->vtbScan.dbVgInfoMap, name->dbname, dbNameLen, &output, POINTER_BYTES);
  QUERY_CHECK_CODE(code, line, _return);

  *dbVgInfo = output->dbVgroup;
  return code;

_return:
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  freeUseDbOutput(output);
  return code;
}
1572

1573
int32_t extractColRefName(const char *colref, char **refDb, char** refTb, char** refCol) {
2,835,110✔
1574
  int32_t     code = TSDB_CODE_SUCCESS;
2,835,110✔
1575
  int32_t     line = 0;
2,835,110✔
1576

1577
  const char *first_dot = strchr(colref, '.');
2,835,110✔
1578
  QUERY_CHECK_NULL(first_dot, code, line, _return, terrno)
2,835,110✔
1579

1580
  const char *second_dot = strchr(first_dot + 1, '.');
2,835,110✔
1581
  QUERY_CHECK_NULL(second_dot, code, line, _return, terrno)
2,835,110✔
1582

1583
  size_t db_len = first_dot - colref;
2,835,110✔
1584
  size_t table_len = second_dot - first_dot - 1;
2,835,110✔
1585
  size_t col_len = strlen(second_dot + 1);
2,835,110✔
1586

1587
  *refDb = taosMemoryMalloc(db_len + 1);
2,835,110✔
1588
  *refTb = taosMemoryMalloc(table_len + 1);
2,835,110✔
1589
  *refCol = taosMemoryMalloc(col_len + 1);
2,835,110✔
1590
  QUERY_CHECK_NULL(*refDb, code, line, _return, terrno)
2,835,110✔
1591
  QUERY_CHECK_NULL(*refTb, code, line, _return, terrno)
2,835,110✔
1592
  QUERY_CHECK_NULL(*refCol, code, line, _return, terrno)
2,835,110✔
1593

1594
  tstrncpy(*refDb, colref, db_len + 1);
2,835,110✔
1595
  tstrncpy(*refTb, first_dot + 1, table_len + 1);
2,835,110✔
1596
  tstrncpy(*refCol, second_dot + 1, col_len + 1);
2,835,110✔
1597

1598
  return TSDB_CODE_SUCCESS;
2,835,110✔
UNCOV
1599
_return:
×
UNCOV
1600
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
UNCOV
1601
  if (*refDb) {
×
UNCOV
1602
    taosMemoryFree(*refDb);
×
UNCOV
1603
    *refDb = NULL;
×
1604
  }
UNCOV
1605
  if (*refTb) {
×
UNCOV
1606
    taosMemoryFree(*refTb);
×
UNCOV
1607
    *refTb = NULL;
×
1608
  }
UNCOV
1609
  if (*refCol) {
×
UNCOV
1610
    taosMemoryFree(*refCol);
×
UNCOV
1611
    *refCol = NULL;
×
1612
  }
UNCOV
1613
  return code;
×
1614
}
1615

1616
/*
 * Check whether a (database, table) pair read from a data block matches the
 * expected names exactly.
 *
 * `dbName` and `tbName` are accessed through the varDataVal/varDataLen macros;
 * `expectDbName` and `expectTbName` are NUL-terminated C strings. A name
 * matches when its bytes equal the expected string and both have the same
 * length. The table name is tested first, so the database arguments are only
 * touched when the table name already matches.
 */
bool tableInfoNeedCollect(char *dbName, char *tbName, char *expectDbName, char *expectTbName) {
  if (strncmp(varDataVal(tbName), expectTbName, varDataLen(tbName)) != 0) {
    return false;
  }
  if (strlen(expectTbName) != varDataLen(tbName)) {
    return false;
  }
  if (strncmp(varDataVal(dbName), expectDbName, varDataLen(dbName)) != 0) {
    return false;
  }
  if (strlen(expectDbName) != varDataLen(dbName)) {
    return false;
  }
  return true;
}
1625

1626
/*
 * Fill `pInfo` from row `index` of a system-table data block.
 *
 * Columns are addressed by position: 3 = column name, 4 = uid, 5 = column id,
 * 6 = column reference, 7 = vgroup id, 8 = reference version (only checked for
 * presence here). colrefName is set to NULL when the reference column is NULL
 * in this row; uid / colId / vgId are only written when their cell is not NULL.
 *
 * On success pInfo->colName (and pInfo->colrefName when present) are newly
 * allocated NUL-terminated strings owned by the caller. On failure no string
 * allocated by this call is left behind and the string fields of `pInfo` are
 * not modified.
 *
 * @return TSDB_CODE_SUCCESS or terrno of the failing lookup/allocation.
 */
int32_t getColRefInfo(SColRefInfo *pInfo, SArray* pDataBlock, int32_t index) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          line = 0;
  char*            colrefName = NULL;  // built locally, handed to pInfo only on success
  char*            colName = NULL;

  SColumnInfoData *pColNameCol = taosArrayGet(pDataBlock, 3);
  SColumnInfoData *pUidCol = taosArrayGet(pDataBlock, 4);
  SColumnInfoData *pColIdCol = taosArrayGet(pDataBlock, 5);
  SColumnInfoData *pRefCol = taosArrayGet(pDataBlock, 6);
  SColumnInfoData *pVgIdCol = taosArrayGet(pDataBlock, 7);
  SColumnInfoData *pRefVerCol = taosArrayGet(pDataBlock, 8);

  QUERY_CHECK_NULL(pColNameCol, code, line, _return, terrno)
  QUERY_CHECK_NULL(pUidCol, code, line, _return, terrno)
  QUERY_CHECK_NULL(pColIdCol, code, line, _return, terrno)
  QUERY_CHECK_NULL(pRefCol, code, line, _return, terrno)
  QUERY_CHECK_NULL(pVgIdCol, code, line, _return, terrno)
  QUERY_CHECK_NULL(pRefVerCol, code, line, _return, terrno)

  if (!colDataIsNull_s(pRefCol, index)) {
    char* pRefData = colDataGetData(pRefCol, index);
    colrefName = taosMemoryCalloc(varDataTLen(pRefData), 1);
    QUERY_CHECK_NULL(colrefName, code, line, _return, terrno)
    memcpy(colrefName, varDataVal(pRefData), varDataLen(pRefData));
    colrefName[varDataLen(pRefData)] = 0;
  }

  char* pNameData = colDataGetData(pColNameCol, index);
  colName = taosMemoryCalloc(varDataTLen(pNameData), 1);
  QUERY_CHECK_NULL(colName, code, line, _return, terrno)
  memcpy(colName, varDataVal(pNameData), varDataLen(pNameData));
  colName[varDataLen(pNameData)] = 0;

  // Both strings exist now; transfer ownership to the caller's structure.
  pInfo->colrefName = colrefName;
  pInfo->colName = colName;
  colrefName = NULL;
  colName = NULL;

  if (!colDataIsNull_s(pUidCol, index)) {
    GET_TYPED_DATA(pInfo->uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, index), 0);
  }
  if (!colDataIsNull_s(pColIdCol, index)) {
    GET_TYPED_DATA(pInfo->colId, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pColIdCol, index), 0);
  }
  if (!colDataIsNull_s(pVgIdCol, index)) {
    GET_TYPED_DATA(pInfo->vgId, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pVgIdCol, index), 0);
  }

_return:
  // Non-NULL only on the failure path: release the partially built strings
  // (previously colrefName leaked when the colName allocation failed).
  taosMemoryFree(colrefName);
  taosMemoryFree(colName);
  return code;
}
1671

1672
/*
 * Compare the vgroups referenced by the current scan (curOrgTbVg) with the
 * vgroups already known (existOrgTbVg) and request a stream redeploy when new
 * vgroups appeared.
 *
 * Does nothing outside of a stream task (pStreamRuntimeInfo == NULL). On the
 * first call curOrgTbVg simply becomes existOrgTbVg. Otherwise every vgroup id
 * present in curOrgTbVg but missing from existOrgTbVg is collected; if there
 * are any, the list is published through vtableDeployInfo.addVgIds (together
 * with node ids taken from a pending addedVgInfo list), the redeploy flags are
 * set, curOrgTbVg is cleared and `rversion` is recorded.
 *
 * @return TSDB_CODE_SUCCESS when nothing changed,
 *         TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY when new vgroups were found,
 *         or terrno of a failed allocation.
 */
int32_t processOrgTbVg(SVtbScanDynCtrlInfo* pVtbScan, SExecTaskInfo* pTaskInfo, int32_t rversion) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  void*                      pCurIter = NULL;
  SArray*                    tmpArray = NULL;     // new vgroup ids, owned here until published
  SArray*                    expiredInfo = NULL;  // addedVgInfo list once detached from the shared slot

  if (pTaskInfo->pStreamRuntimeInfo == NULL) {
    return code;
  }

  if (pVtbScan->existOrgTbVg == NULL) {
    pVtbScan->existOrgTbVg = pVtbScan->curOrgTbVg;
    pVtbScan->curOrgTbVg = NULL;
  }

  if (pVtbScan->curOrgTbVg == NULL) {
    return code;
  }

  // which means rversion has changed
  while ((pCurIter = taosHashIterate(pVtbScan->curOrgTbVg, pCurIter))) {
    int32_t* vgId = (int32_t*)taosHashGetKey(pCurIter, NULL);
    if (taosHashGet(pVtbScan->existOrgTbVg, vgId, sizeof(int32_t)) != NULL) {
      continue;
    }
    if (tmpArray == NULL) {
      tmpArray = taosArrayInit(1, sizeof(int32_t));
      QUERY_CHECK_NULL(tmpArray, code, line, _return, terrno)
    }
    QUERY_CHECK_NULL(taosArrayPush(tmpArray, vgId), code, line, _return, terrno)
  }

  if (tmpArray == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  if (pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addVgIds == NULL) {
    SArray* pending = atomic_load_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo);
    if (pending && pending == atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo, pending, NULL)) {
      // The exchange succeeded, so this thread now owns the detached list.
      expiredInfo = pending;
      for (int32_t i = 0; i < taosArrayGetSize(expiredInfo); i++) {
        SStreamTaskAddr* vgInfo = (SStreamTaskAddr*)taosArrayGet(expiredInfo, i);
        QUERY_CHECK_NULL(taosArrayPush(tmpArray, &vgInfo->nodeId), code, line, _return, terrno)
      }
      taosArrayDestroy(expiredInfo);
      expiredInfo = NULL;
    }
    if (atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addVgIds, NULL, tmpArray)) {
      // Slot was filled concurrently; our list was not published.
      taosArrayDestroy(tmpArray);
    }
  } else {
    // A list is already published; ours is not handed to anyone, so release it
    // (it used to leak on this path).
    taosArrayDestroy(tmpArray);
  }
  tmpArray = NULL;

  atomic_store_64(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.uid, (int64_t)(pVtbScan->isSuperTable ? pVtbScan->suid : pVtbScan->uid));
  (void)atomic_val_compare_exchange_8(pTaskInfo->pStreamRuntimeInfo->vtableDeployGot, 0, 1);
  taosHashClear(pVtbScan->curOrgTbVg);
  pVtbScan->needRedeploy = true;
  pVtbScan->rversion = rversion;
  return TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY;

_return:
  if (pCurIter != NULL) {
    // The iteration was left before it reached the end; release the iterator.
    taosHashCancelIterate(pVtbScan->curOrgTbVg, pCurIter);
  }
  taosArrayDestroy(tmpArray);
  taosArrayDestroy(expiredInfo);
  qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  return code;
}
1727

1728
/*
 * Resolve the vgroup id of the table addressed by a "db.table.column"
 * reference.
 *
 * Steps: split the reference, build the SName, fetch the database's vgroup
 * information, build the full database name and finally map the table to its
 * vgroup. The first failing step determines the returned code; the strings
 * produced by the split are always released.
 *
 * @param pOperator dynamic query control operator
 * @param colRef    column reference string
 * @param vgId      [out] receives the vgroup id on success
 */
int32_t getVgIdFromColref(SOperatorInfo* pOperator, const char* colRef, int32_t* vgId) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SDBVgInfo*                 pDbVg = NULL;
  SName                      tbName = {0};
  char                       fullDbName[TSDB_DB_FNAME_LEN] = {0};
  char*                      dbPart = NULL;
  char*                      tbPart = NULL;
  char*                      colPart = NULL;

  int32_t code = extractColRefName(colRef, &dbPart, &tbPart, &colPart);
  if (code == TSDB_CODE_SUCCESS) {
    toName(pCtrl->vtbScan.acctId, dbPart, tbPart, &tbName);
    code = getDbVgInfo(pOperator, &tbName, &pDbVg);
  }
  if (code == TSDB_CODE_SUCCESS) {
    code = tNameGetFullDbName(&tbName, fullDbName);
  }
  if (code == TSDB_CODE_SUCCESS) {
    code = getVgId(pDbVg, fullDbName, vgId, tbName.tname);
  }

  taosMemoryFree(dbPart);
  taosMemoryFree(tbPart);
  taosMemoryFree(colPart);
  return code;
}
1759

1760
/*
 * Read all rows from the system-table scan (downstream[1]) and group the
 * column-reference records of the scanned virtual super table by child table.
 *
 * For every row whose super-table/database names match vtbScan.tbName/dbName a
 * SColRefInfo is built. Records of one child table are appended to the same
 * array in childTableList; childTableMap maps the child table name to the
 * index of that array. In a stream task the vgroup of every referenced source
 * table is additionally recorded in curOrgTbVg, and processOrgTbVg() decides
 * at the end whether a redeploy is required.
 *
 * A record that could not be stored is released before returning, as is a
 * freshly created list that never reached childTableList.
 *
 * @return TSDB_CODE_SUCCESS or the code of the failing step (including
 *         TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY from processOrgTbVg()).
 */
int32_t buildVirtualSuperTableScanChildTableMap(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[1];
  SColRefInfo                info = {0};
  bool                       infoPending = false;  // `info` holds strings not yet owned by a stored list
  SArray*                    pNewList = NULL;      // list created for a new child table, not yet stored

  pVtbScan->childTableMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  QUERY_CHECK_NULL(pVtbScan->childTableMap, code, line, _return, terrno)

  while (true) {
    SSDataBlock *pChildInfo = NULL;
    code = pSystableScanOp->fpSet.getNextFn(pSystableScanOp, &pChildInfo);
    QUERY_CHECK_CODE(code, line, _return);
    if (pChildInfo == NULL) {
      break;
    }
    SColumnInfoData *pTableNameCol = taosArrayGet(pChildInfo->pDataBlock, 0);
    SColumnInfoData *pStbNameCol = taosArrayGet(pChildInfo->pDataBlock, 1);
    SColumnInfoData *pDbNameCol = taosArrayGet(pChildInfo->pDataBlock, 2);

    QUERY_CHECK_NULL(pTableNameCol, code, line, _return, terrno)
    QUERY_CHECK_NULL(pStbNameCol, code, line, _return, terrno)
    QUERY_CHECK_NULL(pDbNameCol, code, line, _return, terrno)

    for (int32_t i = 0; i < pChildInfo->info.rows; i++) {
      if (colDataIsNull_s(pStbNameCol, i)) {
        continue;
      }
      char* stbrawname = colDataGetData(pStbNameCol, i);
      char* dbrawname = colDataGetData(pDbNameCol, i);
      char *ctbName = colDataGetData(pTableNameCol, i);

      if (!tableInfoNeedCollect(dbrawname, stbrawname, pInfo->vtbScan.dbName, pInfo->vtbScan.tbName)) {
        continue;
      }

      info = (SColRefInfo){0};
      infoPending = true;
      code = getColRefInfo(&info, pChildInfo->pDataBlock, i);
      QUERY_CHECK_CODE(code, line, _return);

      if (pInfo->vtbScan.dynTbUid != 0 && info.uid != pInfo->vtbScan.dynTbUid) {
        destroyColRefInfo(&info);
        infoPending = false;
        continue;
      }

      if (pTaskInfo->pStreamRuntimeInfo) {
        if (pVtbScan->curOrgTbVg == NULL) {
          pVtbScan->curOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
          QUERY_CHECK_NULL(pVtbScan->curOrgTbVg, code, line, _return, terrno)
        }

        if (info.colrefName) {
          int32_t vgId = 0;
          code = getVgIdFromColref(pOperator, info.colrefName, &vgId);
          QUERY_CHECK_CODE(code, line, _return);
          code = taosHashPut(pVtbScan->curOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
          QUERY_CHECK_CODE(code, line, _return);
        }
      }

      // One lookup decides between "first record of this child table" and "append".
      int32_t* pTableIdx = (int32_t*)taosHashGet(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName));
      if (pTableIdx == NULL) {
        pNewList = taosArrayInit(1, sizeof(SColRefInfo));
        QUERY_CHECK_NULL(pNewList, code, line, _return, terrno)
        QUERY_CHECK_NULL(taosArrayPush(pNewList, &info), code, line, _return, terrno)
        int32_t tableIdx = (int32_t)taosArrayGetSize(pVtbScan->childTableList);
        QUERY_CHECK_NULL(taosArrayPush(pVtbScan->childTableList, &pNewList), code, line, _return, terrno)
        // childTableList now references the list, and through it the record.
        pNewList = NULL;
        infoPending = false;
        code = taosHashPut(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName), &tableIdx, sizeof(tableIdx));
        QUERY_CHECK_CODE(code, line, _return);
      } else {
        SArray* pColRefArray = (SArray *)taosArrayGetP(pVtbScan->childTableList, *pTableIdx);
        QUERY_CHECK_NULL(pColRefArray, code, line, _return, terrno)
        QUERY_CHECK_NULL(taosArrayPush(pColRefArray, &info), code, line, _return, terrno)
        infoPending = false;
      }
    }
  }

  code = processOrgTbVg(pVtbScan, pTaskInfo, 1);
  QUERY_CHECK_CODE(code, line, _return);

_return:
  if (infoPending) {
    // NOTE(review): relies on destroyColRefInfo() accepting NULL string fields,
    // as it is already called on records whose colrefName may be NULL — confirm.
    destroyColRefInfo(&info);
  }
  // Only the array storage is released here; the strings of the record copied
  // into it are the ones freed through `info` above.
  taosArrayDestroy(pNewList);
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
1848

1849
/*
 * Read all rows from the system-table scan (downstream[1]) and collect the
 * column-reference records of the scanned virtual normal/child table into
 * vtbScan.colRefInfo.
 *
 * The reference version of the rows is tracked in `rversion`. When it differs
 * from the version stored in the scan info (or no vgroup set exists yet), the
 * vgroup of every referenced source table is recorded in curOrgTbVg.
 * processOrgTbVg() finally decides whether a redeploy is required.
 *
 * A failure of the downstream read is reported to the caller, and a record
 * that could not be stored is released before returning.
 *
 * @return TSDB_CODE_SUCCESS or the code of the failing step (including
 *         TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY from processOrgTbVg()).
 */
int32_t buildVirtualNormalChildTableScanChildTableMap(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[1];
  int32_t                    rversion = 0;
  SColRefInfo                info = {0};
  bool                       infoPending = false;  // `info` holds strings not yet stored in colRefInfo

  pInfo->vtbScan.colRefInfo = taosArrayInit(1, sizeof(SColRefInfo));
  QUERY_CHECK_NULL(pInfo->vtbScan.colRefInfo, code, line, _return, terrno)

  while (true) {
    SSDataBlock *pTableInfo = NULL;
    code = pSystableScanOp->fpSet.getNextFn(pSystableScanOp, &pTableInfo);
    // Without this check a failed read looked like end-of-data and its code
    // was overwritten by the processOrgTbVg() call below.
    QUERY_CHECK_CODE(code, line, _return);
    if (pTableInfo == NULL) {
      break;
    }

    SColumnInfoData *pTableNameCol = taosArrayGet(pTableInfo->pDataBlock, 0);
    SColumnInfoData *pDbNameCol = taosArrayGet(pTableInfo->pDataBlock, 2);
    SColumnInfoData *pRefVerCol = taosArrayGet(pTableInfo->pDataBlock, 8);

    QUERY_CHECK_NULL(pTableNameCol, code, line, _return, terrno)
    QUERY_CHECK_NULL(pDbNameCol, code, line, _return, terrno)
    QUERY_CHECK_NULL(pRefVerCol, code, line, _return, terrno)

    for (int32_t i = 0; i < pTableInfo->info.rows; i++) {
      if (!colDataIsNull_s(pRefVerCol, i)) {
        GET_TYPED_DATA(rversion, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pRefVerCol, i), 0);
      }

      if (colDataIsNull_s(pTableNameCol, i)) {
        continue;
      }
      char* tbrawname = colDataGetData(pTableNameCol, i);
      char* dbrawname = colDataGetData(pDbNameCol, i);
      QUERY_CHECK_NULL(tbrawname, code, line, _return, terrno)
      QUERY_CHECK_NULL(dbrawname, code, line, _return, terrno)

      if (!tableInfoNeedCollect(dbrawname, tbrawname, pInfo->vtbScan.dbName, pInfo->vtbScan.tbName)) {
        continue;
      }

      info = (SColRefInfo){0};
      infoPending = true;
      code = getColRefInfo(&info, pTableInfo->pDataBlock, i);
      QUERY_CHECK_CODE(code, line, _return);

      if ((rversion != pVtbScan->rversion || pVtbScan->existOrgTbVg == NULL) && info.colrefName) {
        if (pVtbScan->curOrgTbVg == NULL) {
          pVtbScan->curOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
          QUERY_CHECK_NULL(pVtbScan->curOrgTbVg, code, line, _return, terrno)
        }
        int32_t vgId = 0;
        code = getVgIdFromColref(pOperator, info.colrefName, &vgId);
        QUERY_CHECK_CODE(code, line, _return);
        code = taosHashPut(pVtbScan->curOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
        QUERY_CHECK_CODE(code, line, _return);
      }

      QUERY_CHECK_NULL(taosArrayPush(pInfo->vtbScan.colRefInfo, &info), code, line, _return, terrno)
      infoPending = false;  // the stored copy now references the strings
    }
  }
  code = processOrgTbVg(pVtbScan, pTaskInfo, rversion);
  QUERY_CHECK_CODE(code, line, _return);

_return:
  if (infoPending) {
    // NOTE(review): relies on destroyColRefInfo() accepting NULL string fields — confirm.
    destroyColRefInfo(&info);
  }
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
1919

1920
/*
 * Consume the list of newly added vgroups (vtableDeployInfo.addedVgInfo) of a
 * stream task, if one is available.
 *
 * When the list can be detached from the shared slot, every node id in it is
 * added to existOrgTbVg and its address record is stored in newAddedVgInfo;
 * needRedeploy is cleared and TSDB_CODE_SUCCESS is returned. When no list is
 * available, or another party detached it first,
 * TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY is returned.
 *
 * The list is destroyed only when this call detached it. If the exchange is
 * lost, the pointer read earlier belongs to someone else and is left alone.
 */
int32_t virtualTableScanCheckNeedRedeploy(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SArray*                    ownedList = NULL;  // set only after a successful detach

  SArray *snapshot = atomic_load_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo);
  if (snapshot && snapshot == atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo, snapshot, NULL)) {
    ownedList = snapshot;
    for (int32_t i = 0; i < taosArrayGetSize(ownedList); i++) {
      SStreamTaskAddr* pTaskAddr = (SStreamTaskAddr*)taosArrayGet(ownedList, i);
      code = taosHashPut(pVtbScan->existOrgTbVg, &pTaskAddr->nodeId, sizeof(pTaskAddr->nodeId), NULL, 0);
      QUERY_CHECK_CODE(code, line, _return);
      if (pVtbScan->newAddedVgInfo == NULL) {
        pVtbScan->newAddedVgInfo = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
        QUERY_CHECK_NULL(pVtbScan->newAddedVgInfo, code, line, _return, terrno)
      }
      code = taosHashPut(pVtbScan->newAddedVgInfo, &pTaskAddr->nodeId, sizeof(pTaskAddr->nodeId), pTaskAddr, sizeof(SStreamTaskAddr));
      QUERY_CHECK_CODE(code, line, _return);
    }
    pVtbScan->needRedeploy = false;
  } else {
    code = TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY;
    QUERY_CHECK_CODE(code, line, _return);
  }

_return:
  taosArrayDestroy(ownedList);
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
1955

1956
/*
 * Walk the column-reference records of one virtual table and register every
 * referenced source column that has to be scanned in orgTbVgColMap.
 *
 * orgTbVgColMap is keyed by the full source table name (the whole
 * TSDB_TABLE_FNAME_LEN buffer is used as key) and holds one SOrgTbInfo per
 * source table: its vgroup id, its name and the list of (column id, source
 * column name) pairs. `*uid` and `*vgId` are overwritten with the values of
 * each visited record, so they end up holding those of the last record.
 *
 * The strings produced while splitting a reference and a column list that was
 * not yet stored in the map are released on every exit path.
 *
 * @return TSDB_CODE_SUCCESS or the code of the failing step.
 */
int32_t virtualTableScanProcessColRefInfo(SOperatorInfo* pOperator, SArray* pColRefInfo, tb_uid_t* uid, int32_t* vgId) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SDBVgInfo*                 dbVgInfo = NULL;
  char*                      refDbName = NULL;
  char*                      refTbName = NULL;
  char*                      refColName = NULL;
  SArray*                    pNewColMap = NULL;  // column list not yet owned by orgTbVgColMap

  for (int32_t j = 0; j < taosArrayGetSize(pColRefInfo); j++) {
    SColRefInfo *pKV = (SColRefInfo*)taosArrayGet(pColRefInfo, j);
    *uid = pKV->uid;
    *vgId = pKV->vgId;
    if (pKV->colrefName == NULL || !colNeedScan(pOperator, pKV->colId)) {
      continue;
    }

    SName        name = {0};
    char         dbFname[TSDB_DB_FNAME_LEN] = {0};
    char         orgTbFName[TSDB_TABLE_FNAME_LEN] = {0};
    SColIdNameKV colIdNameKV = {0};

    code = extractColRefName(pKV->colrefName, &refDbName, &refTbName, &refColName);
    QUERY_CHECK_CODE(code, line, _return);

    toName(pInfo->vtbScan.acctId, refDbName, refTbName, &name);

    code = getDbVgInfo(pOperator, &name, &dbVgInfo);
    QUERY_CHECK_CODE(code, line, _return);
    code = tNameGetFullDbName(&name, dbFname);
    QUERY_CHECK_CODE(code, line, _return);
    code = tNameGetFullTableName(&name, orgTbFName);
    QUERY_CHECK_CODE(code, line, _return);

    colIdNameKV.colId = pKV->colId;
    tstrncpy(colIdNameKV.colName, refColName, sizeof(colIdNameKV.colName));

    SOrgTbInfo *tbInfo = (SOrgTbInfo *)taosHashGet(pVtbScan->orgTbVgColMap, orgTbFName, sizeof(orgTbFName));
    if (tbInfo == NULL) {
      // first column of this source table
      SOrgTbInfo map = {0};
      code = getVgId(dbVgInfo, dbFname, &map.vgId, name.tname);
      QUERY_CHECK_CODE(code, line, _return);
      tstrncpy(map.tbName, orgTbFName, sizeof(map.tbName));
      pNewColMap = taosArrayInit(10, sizeof(SColIdNameKV));
      QUERY_CHECK_NULL(pNewColMap, code, line, _return, terrno)
      map.colMap = pNewColMap;
      QUERY_CHECK_NULL(taosArrayPush(map.colMap, &colIdNameKV), code, line, _return, terrno)
      code = taosHashPut(pVtbScan->orgTbVgColMap, orgTbFName, sizeof(orgTbFName), &map, sizeof(map));
      QUERY_CHECK_CODE(code, line, _return);
      pNewColMap = NULL;  // the map entry references the list now
    } else {
      QUERY_CHECK_NULL(taosArrayPush(tbInfo->colMap, &colIdNameKV), code, line, _return, terrno)
    }

    taosMemoryFree(refDbName);
    taosMemoryFree(refTbName);
    taosMemoryFree(refColName);
    refDbName = NULL;
    refTbName = NULL;
    refColName = NULL;
  }

_return:
  // Non-NULL only when an error left the loop in the middle of a record.
  taosMemoryFree(refDbName);
  taosMemoryFree(refTbName);
  taosMemoryFree(refColName);
  taosArrayDestroy(pNewColMap);
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
2020

2021
/*
 * Build the operator parameter passed to the downstream virtual table scan for
 * one virtual table.
 *
 * A fresh vtbScanParam is created for `uid`. For every source table in
 * orgTbVgColMap one exchange parameter is appended to its pOpParamArray; when
 * the source table's vgroup has an entry in newAddedVgInfo the parameter is
 * built with that address and the entry is removed from newAddedVgInfo.
 * Finally the tag-scan exchange parameter for (`vgId`, `uid`) is attached.
 *
 * If an error leaves the map iteration early, the iterator is cancelled before
 * returning.
 *
 * NOTE(review): an exchange parameter that was built but could not be pushed
 * into pOpParamArray is not released here; the matching destructor is not
 * visible in this part of the file — confirm and add.
 *
 * @return TSDB_CODE_SUCCESS or the code of the failing step.
 */
int32_t virtualTableScanBuildDownStreamOpParam(SOperatorInfo* pOperator, tb_uid_t uid, int32_t vgId) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  void*                      pIter = NULL;

  pVtbScan->vtbScanParam = NULL;
  code = buildVtbScanOperatorParam(pInfo, &pVtbScan->vtbScanParam, uid);
  QUERY_CHECK_CODE(code, line, _return);

  pIter = taosHashIterate(pVtbScan->orgTbVgColMap, NULL);
  while (pIter != NULL) {
    SOrgTbInfo*      pMap = (SOrgTbInfo*)pIter;
    SOperatorParam*  pExchangeParam = NULL;
    SStreamTaskAddr* addr = taosHashGet(pVtbScan->newAddedVgInfo, &pMap->vgId, sizeof(pMap->vgId));
    if (addr != NULL) {
      code = buildExchangeOperatorParamForVScanEx(&pExchangeParam, 0, pMap, pTaskInfo->id.taskId, addr);
      QUERY_CHECK_CODE(code, line, _return);
      code = taosHashRemove(pVtbScan->newAddedVgInfo, &pMap->vgId, sizeof(pMap->vgId));
      QUERY_CHECK_CODE(code, line, _return);
    } else {
      code = buildExchangeOperatorParamForVScan(&pExchangeParam, 0, pMap);
      QUERY_CHECK_CODE(code, line, _return);
    }
    QUERY_CHECK_NULL(taosArrayPush(((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pOpParamArray, &pExchangeParam), code, line, _return, terrno)
    pIter = taosHashIterate(pVtbScan->orgTbVgColMap, pIter);
  }

  SOperatorParam*  pTagExchangeParam = NULL;
  code = buildExchangeOperatorParamForVTagScan(&pTagExchangeParam, 0, vgId, uid);
  QUERY_CHECK_CODE(code, line, _return);
  ((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pTagScanOp = pTagExchangeParam;

_return:
  if (pIter != NULL) {
    // Still non-NULL only when the loop above was left through an error.
    taosHashCancelIterate(pVtbScan->orgTbVgColMap, pIter);
  }
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
2061

2062
/*
 * Produce the next result block of the virtual table scan.
 *
 * While curTableIdx equals lastTableIdx the downstream scan (downstream[0]) is
 * simply asked for its next block. Otherwise a new table is started: the
 * source-table map is rebuilt from that table's column references, the
 * downstream parameter is built, the downstream operator is marked as not
 * opened and queried through getNextExtFn.
 *
 * A non-NULL block ends the call and pins lastTableIdx to the current table.
 * A NULL block advances curTableIdx; the operator is marked completed when the
 * scan is not a super-table scan or all child tables have been visited.
 *
 * orgTbVgColMap only lives for the duration of one call.
 */
int32_t virtualTableScanGetNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SVtbScanDynCtrlInfo*       pScan = (SVtbScanDynCtrlInfo*)&pCtrl->vtbScan;
  SOperatorInfo*             pDownstream = pOperator->pDownstream[0];

  pScan->orgTbVgColMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  QUERY_CHECK_NULL(pScan->orgTbVgColMap, code, line, _return, terrno)
  taosHashSetFreeFp(pScan->orgTbVgColMap, destroyOrgTbInfo);

  for (;;) {
    if (pScan->curTableIdx != pScan->lastTableIdx) {
      // switch to a new table: rebuild the per-table state first
      taosHashClear(pScan->orgTbVgColMap);

      SArray* pTableRefs = pScan->isSuperTable
                               ? (SArray*)taosArrayGetP(pScan->childTableList, pScan->curTableIdx)
                               : pCtrl->vtbScan.colRefInfo;
      QUERY_CHECK_NULL(pTableRefs, code, line, _return, terrno)

      tb_uid_t tableUid = 0;
      int32_t  tableVgId = 0;
      code = virtualTableScanProcessColRefInfo(pOperator, pTableRefs, &tableUid, &tableVgId);
      QUERY_CHECK_CODE(code, line, _return);

      code = virtualTableScanBuildDownStreamOpParam(pOperator, tableUid, tableVgId);
      QUERY_CHECK_CODE(code, line, _return);

      // reset downstream operator's status
      pDownstream->status = OP_NOT_OPENED;
      code = pDownstream->fpSet.getNextExtFn(pDownstream, pScan->vtbScanParam, pRes);
      QUERY_CHECK_CODE(code, line, _return);
    } else {
      // keep reading the table that produced the previous block
      code = pDownstream->fpSet.getNextFn(pDownstream, pRes);
      QUERY_CHECK_CODE(code, line, _return);
    }

    if (*pRes) {
      // has result, still read data from this table.
      pScan->lastTableIdx = pScan->curTableIdx;
      break;
    }

    // no result, read next table.
    pScan->curTableIdx++;
    if (!pScan->isSuperTable || pScan->curTableIdx >= taosArrayGetSize(pScan->childTableList)) {
      setOperatorCompleted(pOperator);
      break;
    }
  }

_return:
  taosHashCleanup(pScan->orgTbVgColMap);
  pScan->orgTbVgColMap = NULL;
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
2128

2129
/*
 * Open the virtual-table scan controller.
 *
 * Returns at once when the operator is already opened. Otherwise builds the
 * child-table map (super-table flavour or normal/child-table flavour, chosen by
 * pVtbScan->isSuperTable) and marks the operator opened.
 *
 * The elapsed time is stored in cost.openCost (milliseconds) only while that
 * field is still 0. On failure the code is logged, stored in the task info and
 * control leaves through T_LONG_JMP.
 */
int32_t vtbScanOpen(SOperatorInfo* pOperator) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SVtbScanDynCtrlInfo*       pScan = (SVtbScanDynCtrlInfo*)&pCtrl->vtbScan;
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  int64_t                    beginUs = 0;

  if (OPTR_IS_OPENED(pOperator)) {
    return code;
  }

  if (0 == pOperator->cost.openCost) {
    beginUs = taosGetTimestampUs();
  }

  // pick the map builder that matches the kind of virtual table being scanned
  code = pScan->isSuperTable ? buildVirtualSuperTableScanChildTableMap(pOperator)
                             : buildVirtualNormalChildTableScanChildTableMap(pOperator);
  QUERY_CHECK_CODE(code, line, _return);

  OPTR_SET_OPENED(pOperator);

_return:
  if (0 == pOperator->cost.openCost) {
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - beginUs) / 1000.0;
  }
  if (TSDB_CODE_SUCCESS != code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }
  return code;
}
2165

2166
/*
 * Produce the next data block of a virtual-table scan.
 *
 * pOperator  dyn-query-ctrl operator running in vtb-scan mode
 * pRes       out: next block; QRY_PARAM_CHECK validates/resets it first
 *
 * When a get-param is attached, the scan is rewound (table cursor reset, window
 * taken from the param) and the param pointer is cleared; a finished operator is
 * put back to OP_OPENED. Without a param, a finished operator returns right away
 * and the window is set to [INT64_MAX, INT64_MIN].
 *
 * Any non-zero code reaching _return is logged, stored in the task info and
 * raised through T_LONG_JMP.
 */
int32_t vtbScanNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pCtrl->vtbScan;
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;

  QRY_PARAM_CHECK(pRes);

  if (NULL == pOperator->pOperatorGetParam) {
    if (OP_EXEC_DONE == pOperator->status) {
      return code;
    }
    pVtbScan->window.skey = INT64_MAX;
    pVtbScan->window.ekey = INT64_MIN;
  } else {
    if (OP_EXEC_DONE == pOperator->status) {
      pOperator->status = OP_OPENED;
    }
    // restart from the first table using the window carried by the param
    pVtbScan->curTableIdx = 0;
    pVtbScan->lastTableIdx = -1;
    pVtbScan->window = ((SDynQueryCtrlOperatorParam*)(pOperator->pOperatorGetParam)->value)->window;
    pOperator->pOperatorGetParam = NULL;
  }

  if (pVtbScan->needRedeploy) {
    code = virtualTableScanCheckNeedRedeploy(pOperator);
    QUERY_CHECK_CODE(code, line, _return);
  }

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, line, _return);

  // a super table without any child table has nothing to scan
  if (pVtbScan->isSuperTable && 0 == taosArrayGetSize(pVtbScan->childTableList)) {
    setOperatorCompleted(pOperator);
    return code;
  }

  code = virtualTableScanGetNext(pOperator, pRes);
  QUERY_CHECK_CODE(code, line, _return);

  return code;

_return:
  if (TSDB_CODE_SUCCESS != code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }
  return code;
}
2215

2216
/*
 * Create the hash tables used by the sequential super-table join context.
 *
 * batchFetch == true : leftHash and rightHash, hashed with the INT hash function
 * batchFetch == false: leftCache, rightCache and onceTable, hashed with the
 *                      BIGINT hash function
 *
 * Returns TSDB_CODE_SUCCESS, or terrno as soon as one table cannot be created.
 * Tables created before the failing one are left in pPrev.
 */
int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) {
  if (!batchFetch) {
    pPrev->leftCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pPrev->leftCache == NULL) {
      return terrno;
    }

    pPrev->rightCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pPrev->rightCache == NULL) {
      return terrno;
    }

    pPrev->onceTable = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pPrev->onceTable == NULL) {
      return terrno;
    }

    return TSDB_CODE_SUCCESS;
  }

  pPrev->leftHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  if (pPrev->leftHash == NULL) {
    return terrno;
  }

  pPrev->rightHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  if (pPrev->rightHash == NULL) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
2243

2244
static int32_t initVtbScanInfo(SDynQueryCtrlOperatorInfo* pInfo, SMsgCb* pMsgCb,
251,160✔
2245
                               SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) {
2246
  int32_t      code = TSDB_CODE_SUCCESS;
251,160✔
2247
  int32_t      line = 0;
251,160✔
2248

2249
  code = tsem_init(&pInfo->vtbScan.ready, 0, 0);
251,160✔
2250
  QUERY_CHECK_CODE(code, line, _return);
251,160✔
2251

2252
  pInfo->vtbScan.scanAllCols = pPhyciNode->vtbScan.scanAllCols;
251,160✔
2253
  pInfo->vtbScan.isSuperTable = pPhyciNode->vtbScan.isSuperTable;
251,160✔
2254
  pInfo->vtbScan.rversion = pPhyciNode->vtbScan.rversion;
251,160✔
2255
  pInfo->vtbScan.uid = pPhyciNode->vtbScan.uid;
251,160✔
2256
  pInfo->vtbScan.suid = pPhyciNode->vtbScan.suid;
251,160✔
2257
  pInfo->vtbScan.epSet = pPhyciNode->vtbScan.mgmtEpSet;
251,160✔
2258
  pInfo->vtbScan.acctId = pPhyciNode->vtbScan.accountId;
251,160✔
2259
  pInfo->vtbScan.needRedeploy = false;
251,160✔
2260
  pInfo->vtbScan.pMsgCb = pMsgCb;
251,160✔
2261
  pInfo->vtbScan.curTableIdx = 0;
251,160✔
2262
  pInfo->vtbScan.lastTableIdx = -1;
251,160✔
2263
  pInfo->vtbScan.dynTbUid = 0;
251,160✔
2264
  pInfo->vtbScan.dbName = taosStrdup(pPhyciNode->vtbScan.dbName);
251,160✔
2265
  pInfo->vtbScan.tbName = taosStrdup(pPhyciNode->vtbScan.tbName);
251,160✔
2266
  QUERY_CHECK_NULL(pInfo->vtbScan.dbName, code, line, _return, terrno)
251,160✔
2267
  QUERY_CHECK_NULL(pInfo->vtbScan.tbName, code, line, _return, terrno)
251,160✔
2268
  pInfo->vtbScan.existOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
251,160✔
2269
  QUERY_CHECK_NULL(pInfo->vtbScan.existOrgTbVg, code, line, _return, terrno)
251,160✔
2270
  for (int32_t i = 0; i < LIST_LENGTH(pPhyciNode->vtbScan.pOrgVgIds); ++i) {
708,692✔
2271
    SValueNode* valueNode = (SValueNode*)nodesListGetNode(pPhyciNode->vtbScan.pOrgVgIds, i);
457,532✔
2272
    int32_t vgId = (int32_t)valueNode->datum.i;
457,532✔
2273
    code = taosHashPut(pInfo->vtbScan.existOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
457,532✔
2274
    QUERY_CHECK_CODE(code, line, _return);
457,532✔
2275
  }
2276

2277
  if (pPhyciNode->dynTbname) {
251,160✔
UNCOV
2278
    SArray* vals = pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPartColVals;
×
UNCOV
2279
    for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
×
UNCOV
2280
      SStreamGroupValue* pValue = taosArrayGet(vals, i);
×
UNCOV
2281
      if (pValue != NULL && pValue->isTbname) {
×
UNCOV
2282
        pInfo->vtbScan.dynTbUid = pValue->uid;
×
UNCOV
2283
        break;
×
2284
      }
2285
    }
2286
  }
2287

2288
  pInfo->vtbScan.readColList = taosArrayInit(LIST_LENGTH(pPhyciNode->vtbScan.pScanCols), sizeof(col_id_t));
251,160✔
2289
  QUERY_CHECK_NULL(pInfo->vtbScan.readColList, code, line, _return, terrno)
251,160✔
2290

2291
  for (int32_t i = 0; i < LIST_LENGTH(pPhyciNode->vtbScan.pScanCols); ++i) {
1,791,516✔
2292
    SColumnNode* pNode = (SColumnNode*)nodesListGetNode(pPhyciNode->vtbScan.pScanCols, i);
1,540,356✔
2293
    QUERY_CHECK_NULL(pNode, code, line, _return, terrno)
1,540,356✔
2294
    QUERY_CHECK_NULL(taosArrayPush(pInfo->vtbScan.readColList, &pNode->colId), code, line, _return, terrno)
3,080,712✔
2295
  }
2296

2297
  pInfo->vtbScan.childTableList = taosArrayInit(10, POINTER_BYTES);
251,160✔
2298
  QUERY_CHECK_NULL(pInfo->vtbScan.childTableList, code, line, _return, terrno)
251,160✔
2299

2300
  pInfo->vtbScan.dbVgInfoMap = taosHashInit(taosArrayGetSize(pInfo->vtbScan.childTableList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
251,160✔
2301
  QUERY_CHECK_NULL(pInfo->vtbScan.dbVgInfoMap, code, line, _return, terrno)
251,160✔
2302

2303
  return code;
251,160✔
UNCOV
2304
_return:
×
2305
  // no need to destroy array and hashmap allocated in this function,
2306
  // since the operator's destroy function will take care of it
2307
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
UNCOV
2308
  return code;
×
2309
}
2310

NEW
2311
static int32_t initVtbWindowInfo(SDynQueryCtrlOperatorInfo* pInfo, SDynQueryCtrlPhysiNode* pPhyciNode,
×
2312
                                 SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) {
NEW
2313
  int32_t              code = TSDB_CODE_SUCCESS;
×
NEW
2314
  int32_t              line = 0;
×
NEW
2315
  SDataBlockDescNode*  pDescNode = pPhyciNode->node.pOutputDataBlockDesc;
×
2316

NEW
2317
  pInfo->vtbWindow.wstartSlotId = pPhyciNode->vtbWindow.wstartSlotId;
×
NEW
2318
  pInfo->vtbWindow.wendSlotId = pPhyciNode->vtbWindow.wendSlotId;
×
NEW
2319
  pInfo->vtbWindow.wdurationSlotId = pPhyciNode->vtbWindow.wdurationSlotId;
×
NEW
2320
  pInfo->vtbWindow.pTargets = pPhyciNode->vtbWindow.pTargets;
×
NEW
2321
  pInfo->vtbWindow.isVstb = pPhyciNode->vtbWindow.isVstb;
×
NEW
2322
  pInfo->vtbWindow.singleWinMode = pPhyciNode->vtbWindow.singleWinMode;
×
NEW
2323
  pInfo->vtbWindow.extendOption = pPhyciNode->vtbWindow.extendOption;
×
2324

NEW
2325
  pInfo->vtbWindow.pRes = createDataBlockFromDescNode(pDescNode);
×
NEW
2326
  QUERY_CHECK_NULL(pInfo->vtbWindow.pRes, code, line, _return, terrno)
×
2327

NEW
2328
  pInfo->vtbWindow.pWins = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
×
NEW
2329
  QUERY_CHECK_NULL(pInfo->vtbWindow.pWins, code, line, _return, terrno)
×
2330

NEW
2331
  pInfo->vtbWindow.outputWstartSlotId = -1;
×
NEW
2332
  pInfo->vtbWindow.outputWendSlotId = -1;
×
NEW
2333
  pInfo->vtbWindow.outputWdurationSlotId = -1;
×
NEW
2334
  pInfo->vtbWindow.curWinBatchIdx = 0;
×
2335

NEW
2336
  initResultSizeInfo(&pOperator->resultInfo, 1);
×
NEW
2337
  code = blockDataEnsureCapacity(pInfo->vtbWindow.pRes, pOperator->resultInfo.capacity);
×
NEW
2338
  QUERY_CHECK_CODE(code, line, _return);
×
2339

NEW
2340
  return code;
×
NEW
2341
_return:
×
NEW
2342
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
NEW
2343
  return code;
×
2344
}
2345

NEW
2346
/*
 * Expose the raw timestamp buffer of column `slotId` of a loaded block.
 *
 * pBlock    block to read from
 * slotId    index of the timestamp column inside pBlock->pDataBlock
 * ppTsCols  out: points at the column's data buffer; left untouched when the block
 *           has no column array or its data is not loaded
 *
 * When the first timestamp is non-zero and the block's window is still [0, 0],
 * the block window is refreshed via blockDataUpdateTsWindow().
 *
 * Returns TSDB_CODE_SUCCESS, terrno when the column cannot be fetched, or the
 * error reported by blockDataUpdateTsWindow().
 */
static int32_t extractTsCol(SSDataBlock* pBlock, int32_t slotId, TSKEY** ppTsCols) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (pBlock->pDataBlock == NULL || !pBlock->info.dataLoad) {
    return code;
  }

  SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, slotId);
  QUERY_CHECK_NULL(pColDataInfo, code, lino, _return, terrno)

  *ppTsCols = (int64_t*)pColDataInfo->pData;

  // Do not peek at element 0 unless there is a buffer and at least one row in it.
  bool hasFirstTs = (*ppTsCols != NULL) && (pBlock->info.rows > 0);
  bool windowUnset = (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0);

  if (hasFirstTs && (*ppTsCols)[0] != 0 && windowUnset) {
    code = blockDataUpdateTsWindow(pBlock, slotId);
    QUERY_CHECK_CODE(code, lino, _return);
  }

  return code;

_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  return code;
}
2367

NEW
2368
/*
 * Build the operator param for one external-window downstream.
 *
 * pInfo  controller info (not used by this function)
 * ppRes  out: newly allocated param; set to NULL on failure
 * pWins  windows of the current batch; must contain at least one window
 * idx    stored as the param's downstreamIdx
 *
 * The param carries a duplicate of pWins as its value and one child: an exchange
 * param covering [first window skey, last window ekey].
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered; on error everything
 * allocated directly by this function is released.
 */
static int32_t buildExternalWindowOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins, int32_t idx) {
  int32_t                       code = TSDB_CODE_SUCCESS;
  int32_t                       lino = 0;
  SExternalWindowOperatorParam* pExtWinOp = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
  // The struct comes from malloc, so pChildren is indeterminate until assigned.
  // The cleanup path below inspects it, hence it must be NULL before any goto.
  (*ppRes)->pChildren = NULL;

  pExtWinOp = taosMemoryMalloc(sizeof(SExternalWindowOperatorParam));
  QUERY_CHECK_NULL(pExtWinOp, code, lino, _return, terrno)

  pExtWinOp->ExtWins = taosArrayDup(pWins, NULL);
  QUERY_CHECK_NULL(pExtWinOp->ExtWins, code, lino, _return, terrno)

  // An empty window list yields NULL here; bail out instead of dereferencing it.
  SExtWinTimeWindow *firstWin = (SExtWinTimeWindow *)taosArrayGet(pWins, 0);
  QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
  SExtWinTimeWindow *lastWin = (SExtWinTimeWindow *)taosArrayGet(pWins, taosArrayGetSize(pWins) - 1);
  QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)

  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)

  SOperatorParam* pExchangeOperator = NULL;
  code = buildExchangeOperatorParamForExternalWindow(&pExchangeOperator, 0, firstWin->tw.skey, lastWin->tw.ekey);
  QUERY_CHECK_CODE(code, lino, _return);
  // NOTE(review): if this push fails, pExchangeOperator is not released here; its
  // layout is defined by buildExchangeOperatorParamForExternalWindow - confirm which
  // helper should free it.
  QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pExchangeOperator), code, lino, _return, terrno)

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW;
  (*ppRes)->downstreamIdx = idx;
  (*ppRes)->value = pExtWinOp;
  (*ppRes)->reUse = false;

  return code;

_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pExtWinOp) {
    if (pExtWinOp->ExtWins) {
      taosArrayDestroy(pExtWinOp->ExtWins);
    }
    taosMemoryFree(pExtWinOp);
  }
  if (*ppRes) {
    if ((*ppRes)->pChildren) {
      taosArrayDestroy((*ppRes)->pChildren);
    }
    taosMemoryFree(*ppRes);
    *ppRes = NULL;
  }
  return code;
}
2416

NEW
2417
/*
 * Release an external-window param produced by buildExternalWindowOperatorParam():
 * its duplicated window list, its value struct, its children array and the param.
 * NOTE(review): the params stored inside pChildren are not released here because
 * their layout is not visible from this function - confirm the proper deep-free helper.
 */
static void freeExtWinOperatorParamShallow(SOperatorParam* pParam) {
  if (pParam == NULL) {
    return;
  }

  SExternalWindowOperatorParam* pExtWinOp = (SExternalWindowOperatorParam*)pParam->value;
  if (pExtWinOp) {
    if (pExtWinOp->ExtWins) {
      taosArrayDestroy(pExtWinOp->ExtWins);
    }
    taosMemoryFree(pExtWinOp);
  }
  if (pParam->pChildren) {
    taosArrayDestroy(pParam->pChildren);
  }
  taosMemoryFree(pParam);
}

/*
 * Build the operator param for the merge operator sitting above the external-window
 * operators.
 *
 * pInfo            controller info, forwarded to the child builder
 * ppRes            out: newly allocated merge param; set to NULL on failure
 * pWins            windows of the current batch, forwarded to every child
 * numOfDownstream  number of external-window children to create (idx 0..n-1)
 * numOfWins        stored in the merge param as winNum
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered; on error the children
 * built so far are released (see freeExtWinOperatorParamShallow).
 */
static int32_t buildMergeOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins,
                                       int32_t numOfDownstream, int32_t numOfWins) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SMergeOperatorParam*      pMergeOp = NULL;
  SOperatorParam*           pPendingChild = NULL;  // built but not yet stored in pChildren

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)

  (*ppRes)->pChildren = taosArrayInit(numOfDownstream, POINTER_BYTES);
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)

  pMergeOp = taosMemoryMalloc(sizeof(SMergeOperatorParam));
  QUERY_CHECK_NULL(pMergeOp, code, lino, _return, terrno)

  pMergeOp->winNum = numOfWins;

  for (int32_t i = 0; i < numOfDownstream; i++) {
    code = buildExternalWindowOperatorParam(pInfo, &pPendingChild, pWins, i);
    QUERY_CHECK_CODE(code, lino, _return);
    QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pPendingChild), code, lino, _return, terrno)
    pPendingChild = NULL;  // now owned by pChildren
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->value = pMergeOp;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;

_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  // a child whose push failed is in no array, so it has to be released separately
  freeExtWinOperatorParamShallow(pPendingChild);
  if (pMergeOp) {
    taosMemoryFree(pMergeOp);
  }
  if (*ppRes) {
    if ((*ppRes)->pChildren) {
      for (int32_t i = 0; i < taosArrayGetSize((*ppRes)->pChildren); i++) {
        // The array stores pointers: taosArrayGetP yields the stored param, whereas
        // taosArrayGet would yield the address of the slot inside the array buffer.
        freeExtWinOperatorParamShallow((SOperatorParam*)taosArrayGetP((*ppRes)->pChildren, i));
      }
      taosArrayDestroy((*ppRes)->pChildren);
    }
    taosMemoryFree(*ppRes);
    *ppRes = NULL;
  }
  return code;
}
2474

NEW
2475
/*
 * Open the vtb-window controller: drain downstream 0 and turn every row into an
 * external time window [wstart, wend + 1).
 *
 * In single-window mode each row becomes its own batch; otherwise every
 * downstream block becomes one batch. Batches are appended to pInfo->pWins.
 * On the first block the output slot ids of _wstart/_wend/_wduration are
 * resolved from pInfo->pTargets.
 *
 * After draining, the very first window's skey is set to INT64_MIN for the FORWARD
 * extend option and the very last window's ekey to INT64_MAX for the BACKWARD one.
 * When downstream produced no window at all, the operator is simply opened with an
 * empty batch list.
 *
 * Returns immediately when already opened. On failure the code is logged, stored
 * in the task info and raised through T_LONG_JMP.
 */
int32_t vtbWindowOpen(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    lino = 0;
  SDynQueryCtrlOperatorInfo* pDynInfo = pOperator->info;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SVtbWindowDynCtrlInfo*     pInfo = &pDynInfo->vtbWindow;
  int64_t                    st = 0;
  SArray*                    pWin = NULL;  // batch being filled; owned here until pushed into pInfo->pWins

  if (OPTR_IS_OPENED(pOperator)) {
    return code;
  }

  if (pOperator->cost.openCost == 0) {
    st = taosGetTimestampUs();
  }

  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      break;
    }

    // resolve, once, which output columns carry _wstart / _wend / _wduration
    if (pInfo->outputWendSlotId == -1 && pInfo->outputWstartSlotId == -1 && pInfo->outputWdurationSlotId == -1) {
      for (int32_t i = 0; i < LIST_LENGTH(pInfo->pTargets); ++i) {
        STargetNode* pNode = (STargetNode*)nodesListGetNode(pInfo->pTargets, i);
        if (nodeType(pNode->pExpr) != QUERY_NODE_COLUMN ||
            ((SColumnNode*)pNode->pExpr)->dataBlockId != pBlock->info.id.blockId) {
          continue;
        }

        int16_t srcSlotId = ((SColumnNode*)pNode->pExpr)->slotId;
        if (srcSlotId == pInfo->wstartSlotId) {
          pInfo->outputWstartSlotId = i;
        } else if (srcSlotId == pInfo->wendSlotId) {
          pInfo->outputWendSlotId = i;
        } else if (srcSlotId == pInfo->wdurationSlotId) {
          pInfo->outputWdurationSlotId = i;
        }
      }
    }

    TSKEY* wstartCol = NULL;
    TSKEY* wendCol = NULL;

    // NOTE(review): extractTsCol leaves the pointers NULL when the block is not
    // loaded; the loops below assume both columns are available - confirm upstream
    // always delivers loaded blocks.
    code = extractTsCol(pBlock, pInfo->wstartSlotId, &wstartCol);
    QUERY_CHECK_CODE(code, lino, _return);
    code = extractTsCol(pBlock, pInfo->wendSlotId, &wendCol);
    QUERY_CHECK_CODE(code, lino, _return);

    if (pInfo->singleWinMode) {
      for (int32_t i = 0; i < pBlock->info.rows; i++) {
        // one window per batch, so there is no point in sizing it for the whole block
        pWin = taosArrayInit(1, sizeof(SExtWinTimeWindow));
        QUERY_CHECK_NULL(pWin, code, lino, _return, terrno)

        QUERY_CHECK_NULL(taosArrayReserve(pWin, 1), code, lino, _return, terrno);

        SExtWinTimeWindow* pWindow = taosArrayGet(pWin, 0);
        QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
        pWindow->tw.skey = wstartCol[i];
        pWindow->tw.ekey = wendCol[i] + 1;
        pWindow->winOutIdx = -1;

        QUERY_CHECK_NULL(taosArrayPush(pInfo->pWins, &pWin), code, lino, _return, terrno);
        pWin = NULL;  // now owned by pInfo->pWins
      }
    } else {
      pWin = taosArrayInit(pBlock->info.rows, sizeof(SExtWinTimeWindow));
      QUERY_CHECK_NULL(pWin, code, lino, _return, terrno)

      QUERY_CHECK_NULL(taosArrayReserve(pWin, pBlock->info.rows), code, lino, _return, terrno);

      for (int32_t i = 0; i < pBlock->info.rows; i++) {
        SExtWinTimeWindow* pWindow = taosArrayGet(pWin, i);
        QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
        pWindow->tw.skey = wstartCol[i];
        pWindow->tw.ekey = wendCol[i] + 1;
        pWindow->winOutIdx = -1;
      }

      QUERY_CHECK_NULL(taosArrayPush(pInfo->pWins, &pWin), code, lino, _return, terrno);
      pWin = NULL;  // now owned by pInfo->pWins
    }
  }

  // handle first window's start key and last window's end key;
  // nothing to adjust when downstream produced no window at all
  if (taosArrayGetSize(pInfo->pWins) > 0) {
    SArray* firstBatch = (SArray*)taosArrayGetP(pInfo->pWins, 0);
    SArray* lastBatch = (SArray*)taosArrayGetP(pInfo->pWins, taosArrayGetSize(pInfo->pWins) - 1);

    QUERY_CHECK_NULL(firstBatch, code, lino, _return, terrno)
    QUERY_CHECK_NULL(lastBatch, code, lino, _return, terrno)

    SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(firstBatch, 0);
    SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(lastBatch);

    QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
    QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)

    if (pInfo->extendOption == STATE_WIN_EXTEND_OPTION_BACKWARD) {
      lastWin->tw.ekey = INT64_MAX;
    }
    if (pInfo->extendOption == STATE_WIN_EXTEND_OPTION_FORWARD) {
      firstWin->tw.skey = INT64_MIN;
    }
  }

  OPTR_SET_OPENED(pOperator);

  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
  }

_return:
  if (code != TSDB_CODE_SUCCESS) {
    // release a batch that was allocated but never handed over to pInfo->pWins
    if (pWin) {
      taosArrayDestroy(pWin);
    }
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
2587

NEW
2588
/*
 * Build the operator param handed to a dyn-query-ctrl operator below an
 * external-window operator.
 *
 * ppRes          out: newly allocated param; set to NULL on failure
 * downstreamIdx  stored as the param's downstreamIdx
 * skey, ekey     time range stored in the param's window
 *
 * The param owns an empty children array and a SDynQueryCtrlOperatorParam value.
 * Returns TSDB_CODE_SUCCESS or terrno of the failing allocation.
 */
static int32_t buildDynQueryCtrlOperatorParamForExternalWindow(SOperatorParam** ppRes, int32_t downstreamIdx, int64_t skey, int64_t ekey) {
  int32_t                     code = TSDB_CODE_SUCCESS;
  int32_t                     lino = 0;
  SDynQueryCtrlOperatorParam* pDyn = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)

  // assigned before any further goto so the cleanup path never reads an
  // indeterminate pointer
  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)

  pDyn = taosMemoryMalloc(sizeof(SDynQueryCtrlOperatorParam));
  QUERY_CHECK_NULL(pDyn, code, lino, _return, terrno);

  pDyn->window.skey = skey;
  pDyn->window.ekey = ekey;

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL;
  // honour the caller's index instead of a hard-coded 0
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->reUse = false;
  (*ppRes)->value = pDyn;

  return code;

_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pDyn) {
    taosMemoryFree(pDyn);
  }
  if (*ppRes) {
    if ((*ppRes)->pChildren) {
      taosArrayDestroy((*ppRes)->pChildren);
    }
    taosMemoryFree(*ppRes);
    *ppRes = NULL;
  }
  return code;
}
2625

NEW
2626
/*
 * Build the operator param for an external-window operator whose downstream is a
 * dyn-query-ctrl operator.
 *
 * pInfo  controller info (not used by this function)
 * ppRes  out: newly allocated param; set to NULL on failure
 * pWins  windows of the current batch; must contain at least one window
 * idx    stored as the param's downstreamIdx
 *
 * The param carries a duplicate of pWins as its value and one child: a
 * dyn-query-ctrl param covering [first window skey, last window ekey].
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered; on error everything
 * allocated here, including a child that could not be stored, is released.
 */
static int32_t buildExternalWindowOperatorParamEx(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins, int32_t idx) {
  int32_t                       code = TSDB_CODE_SUCCESS;
  int32_t                       lino = 0;
  SExternalWindowOperatorParam* pExtWinOp = NULL;
  SOperatorParam*               pDynQueryCtrlParam = NULL;  // owned here until stored in pChildren

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
  // The struct comes from malloc, so pChildren is indeterminate until assigned.
  // The cleanup path below inspects it, hence it must be NULL before any goto.
  (*ppRes)->pChildren = NULL;

  pExtWinOp = taosMemoryMalloc(sizeof(SExternalWindowOperatorParam));
  QUERY_CHECK_NULL(pExtWinOp, code, lino, _return, terrno)

  pExtWinOp->ExtWins = taosArrayDup(pWins, NULL);
  QUERY_CHECK_NULL(pExtWinOp->ExtWins, code, lino, _return, terrno)

  // An empty window list yields NULL here; bail out instead of dereferencing it.
  SExtWinTimeWindow *firstWin = (SExtWinTimeWindow *)taosArrayGet(pWins, 0);
  QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
  SExtWinTimeWindow *lastWin = (SExtWinTimeWindow *)taosArrayGet(pWins, taosArrayGetSize(pWins) - 1);
  QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)

  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)

  code = buildDynQueryCtrlOperatorParamForExternalWindow(&pDynQueryCtrlParam, 0, firstWin->tw.skey, lastWin->tw.ekey);
  QUERY_CHECK_CODE(code, lino, _return);
  QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pDynQueryCtrlParam), code, lino, _return, terrno)
  pDynQueryCtrlParam = NULL;  // now owned by pChildren; nothing below can fail

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW;
  (*ppRes)->downstreamIdx = idx;
  (*ppRes)->value = pExtWinOp;
  (*ppRes)->reUse = false;

  return code;

_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  // A child that was built but could not be pushed belongs to no array yet.
  // It consists of a value struct, an (empty) children array and the param itself.
  if (pDynQueryCtrlParam) {
    taosMemoryFree(pDynQueryCtrlParam->value);
    if (pDynQueryCtrlParam->pChildren) {
      taosArrayDestroy(pDynQueryCtrlParam->pChildren);
    }
    taosMemoryFree(pDynQueryCtrlParam);
  }
  if (pExtWinOp) {
    if (pExtWinOp->ExtWins) {
      taosArrayDestroy(pExtWinOp->ExtWins);
    }
    taosMemoryFree(pExtWinOp);
  }
  if (*ppRes) {
    // pChildren can only be empty here: the push is the last step that may fail
    if ((*ppRes)->pChildren) {
      taosArrayDestroy((*ppRes)->pChildren);
    }
    taosMemoryFree(*ppRes);
    *ppRes = NULL;
  }
  return code;
}
2684

NEW
2685
/*
 * Produce the result block for the next batch of external windows.
 *
 * pOperator  dyn-query-ctrl operator running in vtb-window mode
 * ppRes      out: pInfo->pRes filled with one row per window of the batch, or
 *            NULL once every batch has been consumed
 *
 * For each batch a param is built for downstream 1 (external-window param when
 * isVstb is set, merge param otherwise), the downstream is asked for its block,
 * and the target columns are copied into the result block. If the downstream
 * returned a block, its time window replaces the skey of the very first window
 * (first batch) and the ekey of the very last window (last batch). Finally the
 * resolved _wstart / _wend / _wduration output columns are written from the
 * batch's windows (ekey is stored exclusive, hence the "- 1").
 *
 * On failure the code is logged, stored in the task info and raised through
 * T_LONG_JMP.
 */
int32_t vtbWindowNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    lino = 0;
  SDynQueryCtrlOperatorInfo* pDynInfo = pOperator->info;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SVtbWindowDynCtrlInfo*     pInfo = &pDynInfo->vtbWindow;
  SSDataBlock*               pRes = pInfo->pRes;
  int32_t                    numOfWins = 0;

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _return);

  if (pInfo->curWinBatchIdx >= taosArrayGetSize(pInfo->pWins)) {
    *ppRes = NULL;
    return code;
  }

  SArray* pWinArray = (SArray*)taosArrayGetP(pInfo->pWins, pInfo->curWinBatchIdx);
  QUERY_CHECK_NULL(pWinArray, code, lino, _return, terrno)

  numOfWins = (int32_t)taosArrayGetSize(pWinArray);

  // Both modes drive downstream 1; they differ only in the param that is built.
  SOperatorInfo*  pDownOp = pOperator->pDownstream[1];
  SOperatorParam* pDownParam = NULL;
  SSDataBlock*    pDownBlock = NULL;

  if (pInfo->isVstb) {
    code = buildExternalWindowOperatorParamEx(pDynInfo, &pDownParam, pWinArray, pDownOp->numOfDownstream);
  } else {
    code = buildMergeOperatorParam(pDynInfo, &pDownParam, pWinArray, pDownOp->numOfDownstream, numOfWins);
  }
  QUERY_CHECK_CODE(code, lino, _return);

  code = pDownOp->fpSet.getNextExtFn(pDownOp, pDownParam, &pDownBlock);
  QUERY_CHECK_CODE(code, lino, _return);

  blockDataCleanup(pRes);
  code = blockDataEnsureCapacity(pRes, numOfWins);
  QUERY_CHECK_CODE(code, lino, _return);

  if (pDownBlock) {
    code = copyColumnsValue(pInfo->pTargets, pDownBlock->info.id.blockId, pRes, pDownBlock, numOfWins);
    QUERY_CHECK_CODE(code, lino, _return);

    if (pInfo->curWinBatchIdx == 0) {
      // first batch, get _wstart from the downstream block
      SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(taosArrayGetP(pInfo->pWins, 0), 0);
      QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)

      firstWin->tw.skey = pDownBlock->info.window.skey;
    }
    if (pInfo->curWinBatchIdx == taosArrayGetSize(pInfo->pWins) - 1) {
      // last batch, get _wend from the downstream block
      SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(taosArrayGetP(pInfo->pWins, taosArrayGetSize(pInfo->pWins) - 1));
      QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)

      lastWin->tw.ekey = pDownBlock->info.window.ekey + 1;
    }
  }

  if (pInfo->outputWstartSlotId != -1) {
    SColumnInfoData* pWstartCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWstartSlotId);
    QUERY_CHECK_NULL(pWstartCol, code, lino, _return, terrno)

    for (int32_t i = 0; i < numOfWins; i++) {
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
      code = colDataSetVal(pWstartCol, i, (const char*)&pWindow->tw.skey, false);
      QUERY_CHECK_CODE(code, lino, _return);
    }
  }
  if (pInfo->outputWendSlotId != -1) {
    SColumnInfoData* pWendCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWendSlotId);
    QUERY_CHECK_NULL(pWendCol, code, lino, _return, terrno)

    for (int32_t i = 0; i < numOfWins; i++) {
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
      TSKEY ekey = pWindow->tw.ekey - 1;
      code = colDataSetVal(pWendCol, i, (const char*)&ekey, false);
      QUERY_CHECK_CODE(code, lino, _return);
    }
  }
  if (pInfo->outputWdurationSlotId != -1) {
    SColumnInfoData* pWdurationCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWdurationSlotId);
    QUERY_CHECK_NULL(pWdurationCol, code, lino, _return, terrno)

    for (int32_t i = 0; i < numOfWins; i++) {
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
      int64_t duration = pWindow->tw.ekey - 1 - pWindow->tw.skey;
      code = colDataSetVal(pWdurationCol, i, (const char*)&duration, false);
      QUERY_CHECK_CODE(code, lino, _return);
    }
  }

  pRes->info.rows = numOfWins;
  *ppRes = pRes;
  pInfo->curWinBatchIdx++;

  return code;

_return:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
2829

2830
/*
 * Reset a dynamic query control operator so it can be opened again.
 *
 * The operator status is set back to OP_NOT_OPENED and the per-run state of the
 * branch selected by pDyn->qType is cleared:
 *   - DYN_QTYPE_STB_HASH:   execInfo is zeroed, destroyStbJoinDynCtrlInfo() and
 *                           initSeqStbJoinTableHash() are called, and the prev/post
 *                           join contexts are cleared.
 *   - DYN_QTYPE_VTB_SCAN:   the org-table map, the cached response, the column
 *                           reference array and the child-table map are released;
 *                           the child-table list is emptied and the table cursors
 *                           are rewound.
 *   - DYN_QTYPE_VTB_WINDOW: the result block and the window array are released and
 *                           the output slot ids / batch index are reset.
 *
 * @param pOper  operator whose `info` member is an SDynQueryCtrlOperatorInfo.
 * @return 0 on success (an unsupported qType is only logged and also yields 0),
 *         or the error code returned by initSeqStbJoinTableHash().
 */
static int32_t resetDynQueryCtrlOperState(SOperatorInfo* pOper) {
  SDynQueryCtrlOperatorInfo* pDyn = pOper->info;
  pOper->status = OP_NOT_OPENED;

  switch (pDyn->qType) {
    case DYN_QTYPE_STB_HASH: {
      SStbJoinDynCtrlInfo* pStbJoin = &pDyn->stbJoin;

      pStbJoin->execInfo = (SDynQueryCtrlExecInfo){0};
      destroyStbJoinDynCtrlInfo(pStbJoin);

      int32_t code = initSeqStbJoinTableHash(&pStbJoin->ctx.prev, pStbJoin->basic.batchFetch);

      // Clear the table-list bookkeeping BEFORE looking at `code`, so that a failed
      // re-initialisation does not return with the old list pointers still in place.
      // NOTE(review): this assumes destroyStbJoinDynCtrlInfo() has already released
      // the list behind ctx.prev.pListHead (the success path always nulled these
      // fields right after it) -- confirm against that function.
      pStbJoin->ctx.prev.pListHead = NULL;
      pStbJoin->ctx.prev.joinBuild = false;
      pStbJoin->ctx.prev.pListTail = NULL;
      pStbJoin->ctx.prev.tableNum = 0;

      if (TSDB_CODE_SUCCESS != code) {
        qError("initSeqStbJoinTableHash failed since %s", tstrerror(code));
        return code;
      }

      pStbJoin->ctx.post = (SStbJoinPostJoinCtx){0};
      break;
    }
    case DYN_QTYPE_VTB_SCAN: {
      SVtbScanDynCtrlInfo* pVtbScan = &pDyn->vtbScan;

      if (pVtbScan->orgTbVgColMap) {
        // Install the per-entry free callback before cleaning up the hash.
        taosHashSetFreeFp(pVtbScan->orgTbVgColMap, destroyOrgTbInfo);
        taosHashCleanup(pVtbScan->orgTbVgColMap);
        pVtbScan->orgTbVgColMap = NULL;
      }
      if (pVtbScan->pRsp) {
        tFreeSUsedbRsp(pVtbScan->pRsp);
        taosMemoryFreeClear(pVtbScan->pRsp);
      }
      if (pVtbScan->colRefInfo) {
        taosArrayDestroyEx(pVtbScan->colRefInfo, destroyColRefInfo);
        pVtbScan->colRefInfo = NULL;
      }
      if (pVtbScan->childTableMap) {
        taosHashCleanup(pVtbScan->childTableMap);
        pVtbScan->childTableMap = NULL;
      }
      if (pVtbScan->childTableList) {
        // Only cleared, not destroyed: the array pointer itself is kept.
        taosArrayClearEx(pVtbScan->childTableList, destroyColRefArray);
      }
      pVtbScan->curTableIdx = 0;
      pVtbScan->lastTableIdx = -1;
      break;
    }
    case DYN_QTYPE_VTB_WINDOW: {
      SVtbWindowDynCtrlInfo* pVtbWindow = &pDyn->vtbWindow;
      if (pVtbWindow->pRes) {
        blockDataDestroy(pVtbWindow->pRes);
        pVtbWindow->pRes = NULL;
      }
      if (pVtbWindow->pWins) {
        taosArrayDestroyEx(pVtbWindow->pWins, destroyWinArray);
        pVtbWindow->pWins = NULL;
      }
      // -1 is the value the window output code compares against to skip a column.
      pVtbWindow->outputWdurationSlotId = -1;
      pVtbWindow->outputWendSlotId = -1;
      pVtbWindow->outputWstartSlotId = -1;
      pVtbWindow->curWinBatchIdx = 0;
      break;
    }
    default:
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
      break;
  }
  return 0;
}
2902

2903
int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
757,099✔
2904
                                       SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo,
2905
                                       SMsgCb* pMsgCb, SOperatorInfo** pOptrInfo) {
2906
  QRY_PARAM_CHECK(pOptrInfo);
757,099✔
2907

2908
  int32_t                    code = TSDB_CODE_SUCCESS;
757,099✔
2909
  int32_t                    line = 0;
757,099✔
2910
  __optr_fn_t                nextFp = NULL;
757,099✔
2911
  __optr_open_fn_t           openFp = NULL;
757,099✔
2912
  SOperatorInfo*             pOperator = NULL;
757,099✔
2913
  SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo));
757,099✔
2914
  QUERY_CHECK_NULL(pInfo, code, line, _error, terrno)
757,099✔
2915

2916
  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
757,099✔
2917
  QUERY_CHECK_NULL(pOperator, code, line, _error, terrno)
757,099✔
2918

2919
  pTaskInfo->dynamicTask = (int8_t)pPhyciNode->node.dynamicOp;
757,099✔
2920

2921
  code = appendDownstream(pOperator, pDownstream, numOfDownstream);
757,099✔
2922
  QUERY_CHECK_CODE(code, line, _error);
757,099✔
2923

2924
  setOperatorInfo(pOperator, "DynQueryCtrlOperator", QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, false, OP_NOT_OPENED,
757,099✔
2925
                  pInfo, pTaskInfo);
2926

2927
  pInfo->qType = pPhyciNode->qType;
757,099✔
2928
  switch (pInfo->qType) {
757,099✔
2929
    case DYN_QTYPE_STB_HASH:
505,939✔
2930
      TAOS_MEMCPY(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
505,939✔
2931
      pInfo->stbJoin.pOutputDataBlockDesc = pPhyciNode->node.pOutputDataBlockDesc;
505,939✔
2932
      code = initSeqStbJoinTableHash(&pInfo->stbJoin.ctx.prev, pInfo->stbJoin.basic.batchFetch);
505,939✔
2933
      QUERY_CHECK_CODE(code, line, _error);
505,939✔
2934
      nextFp = seqStableJoin;
505,939✔
2935
      openFp = optrDummyOpenFn;
505,939✔
2936
      break;
505,939✔
2937
    case DYN_QTYPE_VTB_SCAN:
251,160✔
2938
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
251,160✔
2939
      QUERY_CHECK_CODE(code, line, _error);
251,160✔
2940
      nextFp = vtbScanNext;
251,160✔
2941
      openFp = vtbScanOpen;
251,160✔
2942
      break;
251,160✔
NEW
2943
    case DYN_QTYPE_VTB_WINDOW:
×
NEW
2944
      code = initVtbWindowInfo(pInfo, pPhyciNode, pTaskInfo, pOperator);
×
NEW
2945
      QUERY_CHECK_CODE(code, line, _error);
×
NEW
2946
      nextFp = vtbWindowNext;
×
NEW
2947
      openFp = vtbWindowOpen;
×
UNCOV
2948
      break;
×
UNCOV
2949
    default:
×
2950
      qError("unsupported dynamic query ctrl type: %d", pInfo->qType);
×
2951
      code = TSDB_CODE_INVALID_PARA;
×
UNCOV
2952
      goto _error;
×
2953
  }
2954

2955
  pOperator->fpSet = createOperatorFpSet(openFp, nextFp, NULL, destroyDynQueryCtrlOperator, optrDefaultBufFn,
757,099✔
2956
                                         NULL, optrDefaultGetNextExtFn, NULL);
2957

2958
  setOperatorResetStateFn(pOperator, resetDynQueryCtrlOperState);
757,099✔
2959
  *pOptrInfo = pOperator;
757,099✔
2960
  return TSDB_CODE_SUCCESS;
757,099✔
2961

2962
_error:
×
2963
  if (pInfo != NULL) {
×
2964
    destroyDynQueryCtrlOperator(pInfo);
×
2965
  }
UNCOV
2966
  qError("failed to create dyn query ctrl operator, %s code:%s, line:%d", __func__, tstrerror(code), line);
×
UNCOV
2967
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
2968
  pTaskInfo->code = code;
×
2969
  return code;
×
2970
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc