• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4887

16 Dec 2025 08:27AM UTC coverage: 65.289% (-0.003%) from 65.292%
#4887

push

travis-ci

web-flow
feat[TS-7233]: audit (#33850)

377 of 536 new or added lines in 28 files covered. (70.34%)

1025 existing lines in 111 files now uncovered.

178977 of 274129 relevant lines covered (65.29%)

102580217.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.13
/source/libs/executor/src/dynqueryctrloperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "nodes.h"
19
#include "operator.h"
20
#include "os.h"
21
#include "plannodes.h"
22
#include "query.h"
23
#include "querynodes.h"
24
#include "querytask.h"
25
#include "tarray.h"
26
#include "tcompare.h"
27
#include "tdatablock.h"
28
#include "thash.h"
29
#include "tmsg.h"
30
#include "trpc.h"
31
#include "ttypes.h"
32
#include "dynqueryctrl.h"
33

34
int64_t gSessionId = 0;
35

36
void freeVgTableList(void* ptr) { 
×
37
  taosArrayDestroy(*(SArray**)ptr); 
×
38
}
×
39

40
/*
 * Walk the singly linked table list starting at `pListHead` and release every
 * node together with its four per-node arrays (left/right vgId and uid).
 * A NULL head is a no-op.
 */
static void destroyStbJoinTableList(SStbJoinTableList* pListHead) {
  SStbJoinTableList* pCur = pListHead;

  while (NULL != pCur) {
    // Remember the successor before the node itself is released.
    SStbJoinTableList* pFollowing = pCur->pNext;

    taosMemoryFree(pCur->pLeftVg);
    taosMemoryFree(pCur->pLeftUid);
    taosMemoryFree(pCur->pRightVg);
    taosMemoryFree(pCur->pRightUid);
    taosMemoryFree(pCur);

    pCur = pFollowing;
  }
}
496,191✔
53

54
/*
 * Tear down the super-table join control state embedded in the operator:
 *   1. log the execution counters,
 *   2. clean up the hash tables that belong to the active fetch mode
 *      (vgroup hashes for batch fetch, cache/once tables otherwise),
 *   3. free the pending table list.
 */
static void destroyStbJoinDynCtrlInfo(SStbJoinDynCtrlInfo* pStbJoin) {
  SStbJoinPrevJoinCtx* pPrevCtx = &pStbJoin->ctx.prev;

  qDebug("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64 ", leftCacheNum:%" PRId64 ", rightCacheNum:%" PRId64, 
         pStbJoin->execInfo.prevBlkNum, pStbJoin->execInfo.prevBlkRows, pStbJoin->execInfo.postBlkNum, 
         pStbJoin->execInfo.postBlkRows, pStbJoin->execInfo.leftCacheNum, pStbJoin->execInfo.rightCacheNum);

  if (!pStbJoin->basic.batchFetch) {
    // Sequential fetch: only the uid caches and the once-table set exist.
    if (NULL != pPrevCtx->leftCache) {
      tSimpleHashCleanup(pPrevCtx->leftCache);
    }
    if (NULL != pPrevCtx->rightCache) {
      tSimpleHashCleanup(pPrevCtx->rightCache);
    }
    if (NULL != pPrevCtx->onceTable) {
      tSimpleHashCleanup(pPrevCtx->onceTable);
    }
  } else {
    // Batch fetch: the hash values are SArray* slots, so install the
    // array-destroying callback before the cleanup.
    if (NULL != pPrevCtx->leftHash) {
      tSimpleHashSetFreeFp(pPrevCtx->leftHash, freeVgTableList);
      tSimpleHashCleanup(pPrevCtx->leftHash);
    }
    if (NULL != pPrevCtx->rightHash) {
      tSimpleHashSetFreeFp(pPrevCtx->rightHash, freeVgTableList);
      tSimpleHashCleanup(pPrevCtx->rightHash);
    }
  }

  destroyStbJoinTableList(pPrevCtx->pListHead);
}
496,191✔
82

83
void destroyOrgTbInfo(void *info) {
176,522✔
84
  SOrgTbInfo *pOrgTbInfo = (SOrgTbInfo *)info;
176,522✔
85
  if (pOrgTbInfo) {
176,522✔
86
    taosArrayDestroy(pOrgTbInfo->colMap);
176,522✔
87
  }
88
}
176,522✔
89

90
void destroyColRefInfo(void *info) {
33,903,966✔
91
  SColRefInfo *pColRefInfo = (SColRefInfo *)info;
33,903,966✔
92
  if (pColRefInfo) {
33,903,966✔
93
    taosMemoryFree(pColRefInfo->colName);
33,903,966✔
94
    taosMemoryFree(pColRefInfo->colrefName);
33,903,966✔
95
  }
96
}
33,903,966✔
97

98
void destroyColRefArray(void *info) {
41,532✔
99
  SArray *pColRefArray = *(SArray **)info;
41,532✔
100
  if (pColRefArray) {
41,532✔
101
    taosArrayDestroyEx(pColRefArray, destroyColRefInfo);
41,532✔
102
  }
103
}
41,532✔
104

105
void freeUseDbOutput(void* pOutput) {
49,623✔
106
  SUseDbOutput *pOut = *(SUseDbOutput**)pOutput;
49,623✔
107
  if (NULL == pOutput) {
49,623✔
108
    return;
×
109
  }
110

111
  if (pOut->dbVgroup) {
49,623✔
112
    freeVgInfo(pOut->dbVgroup);
49,623✔
113
  }
114
  taosMemFree(pOut);
49,623✔
115
}
116

117
/*
 * Release everything owned by the virtual-table scan control state.
 * Each member is optional and is only released when it is set. The
 * release order is kept as-is because the relationship between the
 * individual containers is not visible from this function.
 */
static void destroyVtbScanDynCtrlInfo(SVtbScanDynCtrlInfo* pVtbScan) {
  /* names */
  if (NULL != pVtbScan->dbName) {
    taosMemoryFreeClear(pVtbScan->dbName);
  }
  if (NULL != pVtbScan->tbName) {
    taosMemoryFreeClear(pVtbScan->tbName);
  }

  /* column-reference containers */
  if (NULL != pVtbScan->childTableList) {
    // elements are arrays of SColRefInfo, see destroyColRefArray()
    taosArrayDestroyEx(pVtbScan->childTableList, destroyColRefArray);
  }
  if (NULL != pVtbScan->colRefInfo) {
    taosArrayDestroyEx(pVtbScan->colRefInfo, destroyColRefInfo);
    pVtbScan->colRefInfo = NULL;
  }
  if (NULL != pVtbScan->childTableMap) {
    taosHashCleanup(pVtbScan->childTableMap);
  }
  if (NULL != pVtbScan->readColList) {
    taosArrayDestroy(pVtbScan->readColList);
  }

  /* hashes that need a value destructor */
  if (NULL != pVtbScan->dbVgInfoMap) {
    taosHashSetFreeFp(pVtbScan->dbVgInfoMap, freeUseDbOutput);
    taosHashCleanup(pVtbScan->dbVgInfoMap);
  }
  if (NULL != pVtbScan->orgTbVgColMap) {
    taosHashSetFreeFp(pVtbScan->orgTbVgColMap, destroyOrgTbInfo);
    taosHashCleanup(pVtbScan->orgTbVgColMap);
  }

  /* cached use-db response */
  if (NULL != pVtbScan->pRsp) {
    tFreeSUsedbRsp(pVtbScan->pRsp);
    taosMemoryFreeClear(pVtbScan->pRsp);
  }

  /* vgroup bookkeeping hashes */
  if (NULL != pVtbScan->existOrgTbVg) {
    taosHashCleanup(pVtbScan->existOrgTbVg);
  }
  if (NULL != pVtbScan->curOrgTbVg) {
    taosHashCleanup(pVtbScan->curOrgTbVg);
  }
  if (NULL != pVtbScan->newAddedVgInfo) {
    taosHashCleanup(pVtbScan->newAddedVgInfo);
  }
}
45,201✔
159

160
void destroyWinArray(void *info) {
×
161
  SArray *pWinArray = *(SArray **)info;
×
162
  if (pWinArray) {
×
163
    taosArrayDestroy(pWinArray);
×
164
  }
165
}
×
166

167
/*
 * Release the virtual-table window control state: the result block and the
 * array of per-entry window arrays, each only when present.
 */
static void destroyVtbWindowDynCtrlInfo(SVtbWindowDynCtrlInfo* pVtbWindow) {
  if (NULL != pVtbWindow->pRes) {
    blockDataDestroy(pVtbWindow->pRes);
  }

  if (NULL != pVtbWindow->pWins) {
    // every element is itself an SArray*, see destroyWinArray()
    taosArrayDestroyEx(pVtbWindow->pWins, destroyWinArray);
  }
}
×
175

176
static void destroyDynQueryCtrlOperator(void* param) {
540,075✔
177
  SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param;
540,075✔
178

179
  switch (pDyn->qType) {
540,075✔
180
    case DYN_QTYPE_STB_HASH:
494,874✔
181
      destroyStbJoinDynCtrlInfo(&pDyn->stbJoin);
494,874✔
182
      break;
494,874✔
183
    case DYN_QTYPE_VTB_SCAN:
45,201✔
184
      destroyVtbScanDynCtrlInfo(&pDyn->vtbScan);
45,201✔
185
      break;
45,201✔
186
    case DYN_QTYPE_VTB_WINDOW:
×
187
      destroyVtbWindowDynCtrlInfo(&pDyn->vtbWindow);
×
188
      break;
×
189
    default:
×
190
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
×
191
      break;
×
192
  }
193

194
  taosMemoryFreeClear(param);
540,075✔
195
}
540,075✔
196

197
/*
 * Decide whether the data of a table should be cached.
 *   - batch fetch: always true;
 *   - right table: true when the current and the next right uid are equal;
 *   - left table:  true when `uid` is present in pPrev->leftCache.
 * `uid` is only consulted for the left-table lookup.
 */
static FORCE_INLINE bool tableNeedCache(int64_t uid, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, bool rightTable, bool batchFetch) {
  if (batchFetch) {
    return true;
  }

  if (rightTable) {
    return pPost->rightCurrUid == pPost->rightNextUid;
  }

  uint32_t* pHit = tSimpleHashGet(pPrev->leftCache, &uid, sizeof(uid));
  return NULL != pHit;
}
210

211
/*
 * Load the table pair at the head node's readIdx into the post-join context
 * (current uids and vgIds), look ahead for the next right uid (following the
 * list into later nodes; 0 when there is none), and derive the left/right
 * "need cache" flags.
 *
 * In non-batch mode, when the right table needs caching and its uid differs
 * from the previous right uid, the uid is recorded in pPrev->rightCache and
 * the rightCacheNum counter is bumped.
 *
 * Returns TSDB_CODE_SUCCESS, or the error propagated by QRY_ERR_RET from
 * tSimpleHashPut().
 */
static int32_t updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo*          pStbJoin) {
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
  SStbJoinTableList*   pScan = pPrev->pListHead;
  int64_t*             pCurLeftUid = pScan->pLeftUid + pScan->readIdx;
  int64_t*             pCurRightUid = pScan->pRightUid + pScan->readIdx;
  int64_t              lookAheadIdx = pScan->readIdx + 1;
  int64_t              lastRightUid = pPost->rightCurrUid;  // captured before it is overwritten below

  pPost->leftCurrUid = *pCurLeftUid;
  pPost->rightCurrUid = *pCurRightUid;
  pPost->leftVgId = *(pScan->pLeftVg + pScan->readIdx);
  pPost->rightVgId = *(pScan->pRightVg + pScan->readIdx);

  // Find the right uid that follows the current one, crossing node boundaries.
  for (;;) {
    if (lookAheadIdx < pScan->uidNum) {
      pPost->rightNextUid = *(pScan->pRightUid + lookAheadIdx);
      break;
    }

    pScan = pScan->pNext;
    if (NULL == pScan) {
      pPost->rightNextUid = 0;
      break;
    }

    // Kept from the original: the right-uid cursor is re-pointed at the new
    // node; its value is only passed to tableNeedCache(), which ignores it
    // for the right table.
    pCurRightUid = pScan->pRightUid;
    lookAheadIdx = 0;
  }

  pPost->leftNeedCache = tableNeedCache(*pCurLeftUid, pPrev, pPost, false, pStbJoin->basic.batchFetch);
  pPost->rightNeedCache = tableNeedCache(*pCurRightUid, pPrev, pPost, true, pStbJoin->basic.batchFetch);

  if (pStbJoin->basic.batchFetch || !pPost->rightNeedCache || lastRightUid == pPost->rightCurrUid) {
    return TSDB_CODE_SUCCESS;
  }

  QRY_ERR_RET(tSimpleHashPut(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid), NULL, 0));
  pStbJoin->execInfo.rightCacheNum++;

  return TSDB_CODE_SUCCESS;
}
254

255

256
/*
 * Build a GROUP_CACHE get-param.
 *
 * ppRes         out: the new param, or NULL on failure
 * downstreamIdx stored in both the param and its SGcOperatorParam value
 * vgId/tbUid    target vgroup / table
 * needCache     forwarded to the group cache value
 * pChild        optional child param; when given it becomes the single entry
 *               of pChildren. On failures that happen before it is attached,
 *               it is released here.
 *
 * Every call draws a new session id from the global gSessionId counter.
 * Returns TSDB_CODE_SUCCESS or the terrno of the failed allocation.
 */
static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, bool needCache, SOperatorParam* pChild) {
  int32_t         code = TSDB_CODE_SUCCESS;
  SOperatorParam* pParam = taosMemoryMalloc(sizeof(SOperatorParam));

  *ppRes = pParam;
  if (NULL == pParam) {
    code = terrno;
    freeOperatorParam(pChild, OP_GET_PARAM);
    return code;
  }

  pParam->pChildren = NULL;
  if (NULL != pChild) {
    pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
    // Both the array creation and the push share the same failure handling.
    if (NULL == pParam->pChildren || NULL == taosArrayPush(pParam->pChildren, &pChild)) {
      code = terrno;
      freeOperatorParam(pChild, OP_GET_PARAM);
      freeOperatorParam(pParam, OP_GET_PARAM);
      *ppRes = NULL;
      return code;
    }
  }

  SGcOperatorParam* pGcValue = taosMemoryMalloc(sizeof(SGcOperatorParam));
  if (NULL == pGcValue) {
    code = terrno;
    freeOperatorParam(pParam, OP_GET_PARAM);
    *ppRes = NULL;
    return code;
  }

  pGcValue->sessionId = atomic_add_fetch_64(&gSessionId, 1);
  pGcValue->downstreamIdx = downstreamIdx;
  pGcValue->vgId = vgId;
  pGcValue->tbUid = tbUid;
  pGcValue->needCache = needCache;

  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
  pParam->downstreamIdx = downstreamIdx;
  pParam->value = pGcValue;
  pParam->reUse = false;

  return TSDB_CODE_SUCCESS;
}
305

306

307
/*
 * Build a GROUP_CACHE notify-param for table `tbUid` in vgroup `vgId`.
 *
 * ppRes         out: the new param, or NULL on failure
 * downstreamIdx stored in both the param and its SGcNotifyOperatorParam value
 *
 * Returns TSDB_CODE_SUCCESS or the terrno of the failed allocation.
 *
 * On failure of the second allocation the already released param is no
 * longer left behind in *ppRes (it used to dangle); this matches the other
 * param builders in this file, which reset *ppRes to NULL after freeing.
 */
static int32_t buildGroupCacheNotifyOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid) {
  int32_t code = TSDB_CODE_SUCCESS;
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }
  (*ppRes)->pChildren = NULL;

  SGcNotifyOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcNotifyOperatorParam));
  if (NULL == pGc) {
    code = terrno;  // read before any cleanup call can touch it
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    *ppRes = NULL;  // do not hand a freed pointer back to the caller
    return code;
  }

  pGc->downstreamIdx = downstreamIdx;
  pGc->vgId = vgId;
  pGc->tbUid = tbUid;

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pGc;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;
}
333

334
static int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid);
335
/*
 * Build a reusable EXCHANGE param for a virtual-table window scan over the
 * time range [skey, ekey].
 *
 * ppRes out: the new param (with an empty pChildren array and an empty
 *            uidList), or NULL on failure.
 *
 * The `downstreamIdx` argument is not used: the param is always tagged with
 * downstream index 0.
 * NOTE(review): basic.isNewDeployed is not assigned here while other builders
 * in this file set it explicitly - confirm whether consumers read it.
 *
 * Returns TSDB_CODE_SUCCESS, or the terrno captured by QUERY_CHECK_NULL.
 */
static int32_t buildExchangeOperatorParamForExternalWindow(SOperatorParam** ppRes, int32_t downstreamIdx, int64_t skey, int64_t ekey) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  SExchangeOperatorParam*      pExc = NULL;
  SExchangeOperatorBasicParam* pBasic = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)

  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)

  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  QUERY_CHECK_NULL(pExc, code, lino, _return, terrno)

  pExc->multiParams = false;

  pBasic = &pExc->basic;
  pBasic->vgId = 0;
  pBasic->tableSeq = true;
  pBasic->isVtbRefScan = false;
  pBasic->isVtbTagScan = false;
  pBasic->isVtbWinScan = true;
  pBasic->isNewParam = true;
  pBasic->window.skey = skey;
  pBasic->window.ekey = ekey;
  pBasic->srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
  pBasic->colMap = NULL;
  pBasic->uidList = taosArrayInit(1, sizeof(int64_t));
  QUERY_CHECK_NULL(pBasic->uidList, code, lino, _return, terrno)

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->reUse = true;
  (*ppRes)->value = pExc;

  return code;

_return:
  qError("failed to build exchange operator param for external window, code:%d, line:%d", code, lino);

  // Undo in reverse order of construction: exchange value first, then the param.
  if (NULL != pExc) {
    if (NULL != pExc->basic.uidList) {
      taosArrayDestroy(pExc->basic.uidList);
    }
    taosMemoryFreeClear(pExc);
  }
  if (NULL != *ppRes) {
    if (NULL != (*ppRes)->pChildren) {
      taosArrayDestroy((*ppRes)->pChildren);
    }
    taosMemoryFreeClear(*ppRes);
  }

  return code;
}
386

387
/*
 * Build a single-table EXCHANGE get-param.
 *
 * ppRes         out: the new param, or NULL on failure
 * downstreamIdx stored in the param
 * pVgId         vgroup id of the table (read, not retained)
 * pUid          table uid; copied into a one-element uidList
 *
 * Returns TSDB_CODE_SUCCESS or the terrno of the failed step.
 *
 * Fixes over the previous version:
 *  - the SOperatorParam allocated first is released (and *ppRes reset to
 *    NULL) on every later failure; it used to leak;
 *  - terrno is captured before the cleanup calls instead of being read after
 *    them.
 *
 * NOTE(review): basic.isNewParam / basic.isNewDeployed are not assigned here
 * although other builders in this file set them - confirm whether consumers
 * of this param read those flags.
 */
static int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, const int32_t* pVgId, int64_t* pUid) {
  int32_t                 code = TSDB_CODE_SUCCESS;
  SExchangeOperatorParam* pExc = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }
  (*ppRes)->pChildren = NULL;

  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  if (NULL == pExc) {
    code = terrno;
    goto _error;
  }

  pExc->multiParams = false;
  pExc->basic.vgId = *pVgId;
  pExc->basic.tableSeq = true;
  pExc->basic.isVtbRefScan = false;
  pExc->basic.isVtbWinScan = false;
  pExc->basic.isVtbTagScan = false;
  pExc->basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
  pExc->basic.colMap = NULL;
  pExc->basic.uidList = taosArrayInit(1, sizeof(int64_t));
  if (NULL == pExc->basic.uidList) {
    code = terrno;
    goto _error;
  }
  if (NULL == taosArrayPush(pExc->basic.uidList, pUid)) {
    code = terrno;
    goto _error;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;

_error:
  if (NULL != pExc) {
    // uidList is either NULL (init failed) or a valid array at this point
    if (NULL != pExc->basic.uidList) {
      taosArrayDestroy(pExc->basic.uidList);
    }
    taosMemoryFree(pExc);
  }
  taosMemoryFreeClear(*ppRes);
  return code;
}
425

426
/*
 * Build a multi-vgroup (batch) EXCHANGE get-param from `pVg`, a hash of
 * vgId -> SArray* uid list.
 *
 * For every vgroup one SExchangeOperatorBasicParam is stored in pBatchs. The
 * uid list pointer is moved into that entry and the slot in `pVg` is set to
 * NULL afterwards, so the list is referenced from one place only.
 *
 * ppRes out: the new param, or NULL on failure.
 * Returns TSDB_CODE_SUCCESS, the terrno of a failed allocation, or the error
 * code of tSimpleHashPut().
 *
 * Fixes over the previous version:
 *  - `basic` is zero-initialised, so fields this function does not set are no
 *    longer copied into the hash with indeterminate values;
 *  - a tSimpleHashPut() failure releases pBatchs (through its registered
 *    free callback), pExc and *ppRes instead of leaking them;
 *  - terrno is captured before the cleanup calls.
 */
static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
  int32_t code = TSDB_CODE_SUCCESS;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }
  (*ppRes)->pChildren = NULL;
  
  SExchangeOperatorBatchParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorBatchParam));
  if (NULL == pExc) {
    code = terrno;
    taosMemoryFreeClear(*ppRes);
    return code;
  }

  pExc->multiParams = true;
  pExc->pBatchs = tSimpleHashInit(tSimpleHashGetSize(pVg), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  if (NULL == pExc->pBatchs) {
    code = terrno;
    taosMemoryFree(pExc);
    taosMemoryFreeClear(*ppRes);
    return code;
  }
  tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);
  
  SExchangeOperatorBasicParam basic = {0};
  basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;

  int32_t iter = 0;
  void* p = NULL;
  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
    int32_t* pVgId = tSimpleHashGetKey(p, NULL);
    SArray* pUidList = *(SArray**)p;
    basic.vgId = *pVgId;
    basic.uidList = pUidList;
    basic.colMap = NULL;
    basic.tableSeq = false;
    basic.isVtbRefScan = false;
    basic.isVtbWinScan = false;
    basic.isVtbTagScan = false;
    
    code = tSimpleHashPut(pExc->pBatchs, pVgId, sizeof(*pVgId), &basic, sizeof(basic));
    if (TSDB_CODE_SUCCESS != code) {
      // Entries stored so far are released by the free callback registered on
      // pBatchs; the current uid list was not moved and stays with pVg.
      tSimpleHashCleanup(pExc->pBatchs);
      taosMemoryFree(pExc);
      taosMemoryFreeClear(*ppRes);
      return code;
    }

    qTrace("build downstreamIdx %d batch scan, vgId:%d, uidNum:%" PRId64, downstreamIdx, *pVgId, (int64_t)taosArrayGetSize(pUidList));
    *(SArray**)p = NULL;  // the uid list now belongs to the pBatchs entry
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;
}
477

478
/*
 * Build a reusable EXCHANGE param that drives a TAG_SCAN for the single
 * table `uid` in vgroup `vgId` (virtual-table tag scan).
 *
 * ppRes out: the new param, or NULL on failure.
 * Returns TSDB_CODE_SUCCESS, or the terrno captured by QUERY_CHECK_NULL.
 *
 * Fix over the previous version: the error path no longer frees `basic`.
 * `basic` points INTO the pExc allocation (&pExc->basic), so freeing it
 * handed the allocator a pointer it never returned and was followed by a
 * second free of the same block through pExc. Only pExc is freed now.
 *
 * NOTE(review): basic->isNewParam is not assigned here while the VScan
 * builders set it - confirm whether consumers read it for tag scans.
 */
static int32_t buildExchangeOperatorParamForVTagScan(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, tb_uid_t uid) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  SExchangeOperatorParam*      pExc = NULL;
  SExchangeOperatorBasicParam* basic = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
  (*ppRes)->pChildren = NULL;

  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  QUERY_CHECK_NULL(pExc, code, lino, _return, terrno)

  pExc->multiParams = false;

  basic = &pExc->basic;
  basic->srcOpType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;

  basic->vgId = vgId;
  basic->tableSeq = false;
  basic->isVtbRefScan = false;
  basic->isVtbTagScan = true;
  basic->isVtbWinScan = false;
  basic->isNewDeployed = false;
  basic->colMap = NULL;

  basic->uidList = taosArrayInit(1, sizeof(int64_t));
  QUERY_CHECK_NULL(basic->uidList, code, lino, _return, terrno)
  QUERY_CHECK_NULL(taosArrayPush(basic->uidList, &uid), code, lino, _return, terrno)

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = true;

  return TSDB_CODE_SUCCESS;

_return:
  qError("failed to build exchange operator param for vscan, code:%d", code);
  taosMemoryFreeClear(*ppRes);
  if (basic) {
    // basic is only set once pExc exists; colMap/uidList are initialised
    // before any jump that can reach this point with basic != NULL.
    if (basic->colMap) {
      taosArrayDestroy(basic->colMap->colMap);
      taosMemoryFreeClear(basic->colMap);
    }
    if (basic->uidList) {
      taosArrayDestroy(basic->uidList);
    }
    // basic itself is part of pExc and must not be freed separately
  }
  taosMemoryFreeClear(pExc);
  return code;
}
533

534
/*
 * Build a reusable EXCHANGE param that drives a TABLE_SCAN of one origin
 * table of a virtual table. The origin table description `pMap` (vgId, table
 * name, column map) is deep-copied into the param; `pMap` is not retained.
 * The uidList is created empty.
 *
 * ppRes out: the new param, or NULL on failure.
 * Returns TSDB_CODE_SUCCESS, or the terrno captured by QUERY_CHECK_NULL.
 *
 * Fixes over the previous version:
 *  - basic->uidList is set to NULL right after pExc is allocated. The error
 *    path inspects it, and it used to be read uninitialised when the colMap
 *    allocation or the column-map duplication failed;
 *  - the error path no longer frees `basic`, which points INTO the pExc
 *    allocation (&pExc->basic); only pExc is freed.
 */
static int32_t buildExchangeOperatorParamForVScan(SOperatorParam** ppRes, int32_t downstreamIdx, SOrgTbInfo* pMap) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  SExchangeOperatorParam*      pExc = NULL;
  SExchangeOperatorBasicParam* basic = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
  (*ppRes)->pChildren = NULL;

  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  QUERY_CHECK_NULL(pExc, code, lino, _return, terrno)

  pExc->multiParams = false;

  basic = &pExc->basic;
  basic->srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;

  basic->vgId = pMap->vgId;
  basic->tableSeq = false;
  basic->isVtbRefScan = true;
  basic->isVtbWinScan = false;
  basic->isVtbTagScan = false;
  basic->isNewDeployed = false;
  basic->isNewParam = true;
  basic->uidList = NULL;  // must be defined before the first jump to _return
  basic->colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
  QUERY_CHECK_NULL(basic->colMap, code, lino, _return, terrno)
  basic->colMap->vgId = pMap->vgId;
  tstrncpy(basic->colMap->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);
  basic->colMap->colMap = taosArrayDup(pMap->colMap, NULL);
  QUERY_CHECK_NULL(basic->colMap->colMap, code, lino, _return, terrno)

  basic->uidList = taosArrayInit(1, sizeof(int64_t));
  QUERY_CHECK_NULL(basic->uidList, code, lino, _return, terrno)

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = true;

  return TSDB_CODE_SUCCESS;

_return:
  qError("failed to build exchange operator param for vscan, code:%d", code);
  taosMemoryFreeClear(*ppRes);
  if (basic) {
    if (basic->colMap) {
      taosArrayDestroy(basic->colMap->colMap);
      taosMemoryFreeClear(basic->colMap);
    }
    if (basic->uidList) {
      taosArrayDestroy(basic->uidList);
    }
    // basic itself is part of pExc and must not be freed separately
  }
  taosMemoryFreeClear(pExc);
  return code;
}
592

593
/*
 * Variant of buildExchangeOperatorParamForVScan() for a newly deployed
 * source: in addition to the deep copy of `pMap`, the param carries a
 * downstream-source descriptor built from `taskId` (stored as clientId) and
 * `pTaskAddr` (task id, node id, endpoint set), with isNewDeployed set.
 *
 * ppRes out: the new param, or NULL on failure.
 * Returns TSDB_CODE_SUCCESS, or the terrno captured by QUERY_CHECK_NULL.
 *
 * Fixes over the previous version:
 *  - basic->uidList is set to NULL right after pExc is allocated. The error
 *    path inspects it, and it used to be read uninitialised when the colMap
 *    allocation or the column-map duplication failed;
 *  - the error path no longer frees `basic`, which points INTO the pExc
 *    allocation (&pExc->basic); only pExc is freed.
 */
static int32_t buildExchangeOperatorParamForVScanEx(SOperatorParam** ppRes, int32_t downstreamIdx, SOrgTbInfo* pMap, uint64_t taskId, SStreamTaskAddr* pTaskAddr) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  SExchangeOperatorParam*      pExc = NULL;
  SExchangeOperatorBasicParam* basic = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
  (*ppRes)->pChildren = NULL;

  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  QUERY_CHECK_NULL(pExc, code, lino, _return, terrno)

  pExc->multiParams = false;

  basic = &pExc->basic;
  basic->srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;

  basic->vgId = pMap->vgId;
  basic->tableSeq = false;
  basic->isVtbRefScan = true;
  basic->isVtbWinScan = false;
  basic->isVtbTagScan = false;
  basic->isNewDeployed = true;
  basic->isNewParam = true;
  basic->newDeployedSrc.type = QUERY_NODE_DOWNSTREAM_SOURCE;
  basic->newDeployedSrc.clientId = taskId;// current task's taskid
  basic->newDeployedSrc.taskId = pTaskAddr->taskId;
  basic->newDeployedSrc.fetchMsgType = TDMT_STREAM_FETCH;
  basic->newDeployedSrc.localExec = false;
  basic->newDeployedSrc.addr.nodeId = pTaskAddr->nodeId;
  memcpy(&basic->newDeployedSrc.addr.epSet, &pTaskAddr->epset, sizeof(SEpSet));
  basic->uidList = NULL;  // must be defined before the first jump to _return
  basic->colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
  QUERY_CHECK_NULL(basic->colMap, code, lino, _return, terrno)
  basic->colMap->vgId = pMap->vgId;
  tstrncpy(basic->colMap->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);
  basic->colMap->colMap = taosArrayDup(pMap->colMap, NULL);
  QUERY_CHECK_NULL(basic->colMap->colMap, code, lino, _return, terrno)

  basic->uidList = taosArrayInit(1, sizeof(int64_t));
  QUERY_CHECK_NULL(basic->uidList, code, lino, _return, terrno)

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = true;

  return TSDB_CODE_SUCCESS;

_return:
  qError("failed to build exchange operator param for vscan, code:%d", code);
  taosMemoryFreeClear(*ppRes);
  if (basic) {
    if (basic->colMap) {
      taosArrayDestroy(basic->colMap->colMap);
      taosMemoryFreeClear(basic->colMap);
    }
    if (basic->uidList) {
      taosArrayDestroy(basic->uidList);
    }
    // basic itself is part of pExc and must not be freed separately
  }
  taosMemoryFreeClear(pExc);
  return code;
}
658

659
/*
 * Build a MERGE_JOIN get-param with two children.
 *
 * ppRes     out: the new param, or NULL on failure
 * initParam stored as initDownstream in the join value
 * ppChild0/ppChild1
 *           in/out: child params. Each one is cleared (*ppChildN = NULL) as
 *           soon as it has been pushed into pChildren; a child that was not
 *           pushed yet is left untouched for the caller.
 *
 * Returns TSDB_CODE_SUCCESS or the terrno of the failed step.
 */
static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initParam, SOperatorParam** ppChild0, SOperatorParam** ppChild1) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  SSortMergeJoinOperatorParam* pJoinValue = NULL;
  SOperatorParam*              pParam = taosMemoryMalloc(sizeof(SOperatorParam));

  *ppRes = pParam;
  if (NULL == pParam) {
    return terrno;
  }

  pParam->pChildren = taosArrayInit(2, POINTER_BYTES);
  if (NULL == pParam->pChildren) {
    goto _error;
  }

  if (NULL == taosArrayPush(pParam->pChildren, ppChild0)) {
    goto _error;
  }
  *ppChild0 = NULL;

  if (NULL == taosArrayPush(pParam->pChildren, ppChild1)) {
    goto _error;
  }
  *ppChild1 = NULL;

  pJoinValue = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam));
  if (NULL == pJoinValue) {
    goto _error;
  }
  pJoinValue->initDownstream = initParam;

  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
  pParam->value = pJoinValue;
  pParam->reUse = false;

  return TSDB_CODE_SUCCESS;

_error:
  // Shared failure exit: grab terrno first, then drop the partial param.
  code = terrno;
  freeOperatorParam(pParam, OP_GET_PARAM);
  *ppRes = NULL;
  return code;
}
704

705
/*
 * Build a MERGE_JOIN notify-param whose children are the non-NULL ones of
 * pChild0 / pChild1.
 *
 * ppRes out: the new param, or NULL on failure.
 * On failure the children that were not yet attached are released here;
 * attached children are handed to freeOperatorParam() together with the
 * param.
 *
 * Returns TSDB_CODE_SUCCESS or the terrno of the failed step.
 *
 * Fix over the previous version: the result of taosArrayInit() is now
 * checked on (*ppRes)->pChildren. The old check re-tested *ppRes, which is
 * known to be non-NULL at that point, so a failed array allocation went
 * unnoticed and the NULL array was passed to taosArrayPush().
 */
static int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) {
  int32_t code = TSDB_CODE_SUCCESS;
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    code = terrno;
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    return code;
  }
  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
  if (NULL == (*ppRes)->pChildren) {
    code = terrno;
    taosMemoryFreeClear(*ppRes);
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    return code;
  }
  if (pChild0 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    *ppRes = NULL;
    return code;
  }
  if (pChild1 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) {
    code = terrno;
    // pChild0 (if any) is already attached to *ppRes at this point
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    *ppRes = NULL;
    return code;
  }
  
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
  (*ppRes)->value = NULL;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;
}
744

745
/*
 * Build a TABLE_SCAN param from `pVg`, a hash of vgId -> SArray* uid list.
 * Exactly one vgroup entry is accepted; any other count is reported as
 * TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR.
 *
 * After the param has been built, the uid list is destroyed and its slot in
 * `pVg` is cleared. `downstreamIdx` is not used by this function.
 *
 * Returns TSDB_CODE_SUCCESS or the error from buildTableScanOperatorParam().
 */
static int32_t buildBatchTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
  int32_t vgNum = tSimpleHashGetSize(pVg);
  if (1 != vgNum) {
    qError("Invalid vgroup num %d to build table scan operator param", vgNum);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  int32_t cursor = 0;
  void*   pSlot = tSimpleHashIterate(pVg, NULL, &cursor);
  while (NULL != pSlot) {
    SArray** ppUids = (SArray**)pSlot;
    SArray*  pUids = *ppUids;

    int32_t code = buildTableScanOperatorParam(ppRes, pUids, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false);
    if (code) {
      // the uid list stays in pVg on failure
      return code;
    }

    taosArrayDestroy(pUids);
    *ppUids = NULL;

    pSlot = tSimpleHashIterate(pVg, pSlot, &cursor);
  }

  return TSDB_CODE_SUCCESS;
}
768

769

770
/*
 * Build a TABLE_SCAN param for the single table `*pUid`.
 * A temporary one-element uid list is created, handed to
 * buildTableScanOperatorParam() and destroyed again before returning.
 * `downstreamIdx` and `pVgId` are not used by this function.
 *
 * Returns TSDB_CODE_SUCCESS, the terrno of a failed list operation, or the
 * error from buildTableScanOperatorParam().
 *
 * Fix over the previous version: the temporary list is also destroyed when
 * taosArrayPush() fails (it used to leak on that path); terrno is captured
 * before the destroy call.
 */
static int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) {
  int32_t code = TSDB_CODE_SUCCESS;
  SArray* pUidList = taosArrayInit(1, sizeof(int64_t));
  if (NULL == pUidList) {
    return terrno;
  }
  if (NULL == taosArrayPush(pUidList, pUid)) {
    code = terrno;
    taosArrayDestroy(pUidList);
    return code;
  }

  code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, true);
  taosArrayDestroy(pUidList);
  if (code) {
    return code;
  }
  
  return TSDB_CODE_SUCCESS;
}
787

788
/*
 * Build the merge join operator param for the table pair at the current read
 * position (pPrev->pListHead->readIdx) of the head table list.
 *
 * Steps:
 *   1. refresh the post-join current table info via updatePostJoinCurrTableInfo();
 *   2. build the source params for both sides: in batch-fetch mode only while
 *      pPrev->leftHash is still set (both hashes are cleaned up and reset to NULL
 *      after both source params were built); otherwise one param per table;
 *   3. wrap each source param into a group cache param;
 *   4. combine both group cache params into the merge join param (*ppParam).
 *
 * On failure every param still referenced by a local pointer (and *ppParam) is freed.
 */
static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, SOperatorParam** ppParam) {
  SStbJoinTableList* pCurrList = pPrev->pListHead;
  int64_t            rowIdx = pCurrList->readIdx;
  int32_t*           leftVg = pCurrList->pLeftVg + rowIdx;
  int64_t*           leftUid = pCurrList->pLeftUid + rowIdx;
  int32_t*           rightVg = pCurrList->pRightVg + rowIdx;
  int64_t*           rightUid = pCurrList->pRightUid + rowIdx;
  SOperatorParam*    pSrcParam0 = NULL;
  SOperatorParam*    pSrcParam1 = NULL;
  SOperatorParam*    pGcParam0 = NULL;
  SOperatorParam*    pGcParam1 = NULL;
  int32_t            code = TSDB_CODE_SUCCESS;

  qDebug("start %" PRId64 ":%" PRId64 "th stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64, 
      rowIdx, pPrev->tableNum, *leftVg, *leftUid, *rightVg, *rightUid);

  QRY_ERR_RET(updatePostJoinCurrTableInfo(&pInfo->stbJoin));

  if (pInfo->stbJoin.basic.batchFetch) {
    if (pPrev->leftHash) {
      if (pInfo->stbJoin.basic.srcScan[0]) {
        code = buildBatchTableScanOperatorParam(&pSrcParam0, 0, pPrev->leftHash);
      } else {
        code = buildBatchExchangeOperatorParam(&pSrcParam0, 0, pPrev->leftHash);
      }

      if (TSDB_CODE_SUCCESS == code) {
        if (pInfo->stbJoin.basic.srcScan[1]) {
          code = buildBatchTableScanOperatorParam(&pSrcParam1, 1, pPrev->rightHash);
        } else {
          code = buildBatchExchangeOperatorParam(&pSrcParam1, 1, pPrev->rightHash);
        }
      }

      if (TSDB_CODE_SUCCESS == code) {
        // both batch params are built, the vgroup hashes are no longer needed
        tSimpleHashCleanup(pPrev->leftHash);
        tSimpleHashCleanup(pPrev->rightHash);
        pPrev->leftHash = NULL;
        pPrev->rightHash = NULL;
      }
    }
  } else {
    if (pInfo->stbJoin.basic.srcScan[0]) {
      code = buildSingleTableScanOperatorParam(&pSrcParam0, 0, leftVg, leftUid);
    } else {
      code = buildExchangeOperatorParam(&pSrcParam0, 0, leftVg, leftUid);
    }

    if (TSDB_CODE_SUCCESS == code) {
      if (pInfo->stbJoin.basic.srcScan[1]) {
        code = buildSingleTableScanOperatorParam(&pSrcParam1, 1, rightVg, rightUid);
      } else {
        code = buildExchangeOperatorParam(&pSrcParam1, 1, rightVg, rightUid);
      }
    }
  }

  // the merge join param is flagged as "init" only when a left source param exists
  bool initParam = (NULL != pSrcParam0);

  if (TSDB_CODE_SUCCESS == code) {
    code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pPost->leftNeedCache, pSrcParam0);
    // the local pointer is dropped so the error path below does not free it
    pSrcParam0 = NULL;
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pPost->rightNeedCache, pSrcParam1);
    pSrcParam1 = NULL;
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = buildMergeJoinOperatorParam(ppParam, initParam, &pGcParam0, &pGcParam1);
  }

  if (TSDB_CODE_SUCCESS == code) {
    return code;
  }

  // failure: release whatever is still referenced from here
  if (pSrcParam0) {
    freeOperatorParam(pSrcParam0, OP_GET_PARAM);
  }
  if (pSrcParam1) {
    freeOperatorParam(pSrcParam1, OP_GET_PARAM);
  }
  if (pGcParam0) {
    freeOperatorParam(pGcParam0, OP_GET_PARAM);
  }
  if (pGcParam1) {
    freeOperatorParam(pGcParam1, OP_GET_PARAM);
  }
  if (*ppParam) {
    freeOperatorParam(*ppParam, OP_GET_PARAM);
    *ppParam = NULL;
  }

  return code;
}
858

859
/*
 * Start the post join for the current table pair and fetch its first result block.
 *
 * The operator param is built with buildSeqStbJoinOperatorParam() and passed to
 * downstream[1] through getNextExtFn(). A returned block is validated with
 * blockDataCheck() before the post-join state and statistics are updated.
 *
 * Any failure (param build, downstream call, block check) is stored in
 * pTaskInfo->code and raised through T_LONG_JMP. Previously a non-zero code
 * returned by the downstream call was silently treated as "no block", so the
 * caller moved on to the next table pair and the error was lost.
 *
 * On return *ppRes is either a validated block or NULL (no rows for this pair).
 */
static void seqJoinLaunchNewRetrieveImpl(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
  SStbJoinPostJoinCtx*       pPost = &pStbJoin->ctx.post;
  SOperatorParam*            pParam = NULL;
  int32_t                    code  = buildSeqStbJoinOperatorParam(pInfo, pPrev, pPost, &pParam);
  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  qDebug("%s dynamic post task begin", GET_TASKID(pOperator->pTaskInfo));
  code = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam, ppRes);
  if (TSDB_CODE_SUCCESS != code) {
    // do not let a downstream failure look like an empty result
    qError("%s dynamic post task failed, error:%s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  if (NULL == *ppRes) {
    qDebug("%s Empty join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
    return;
  }

  code = blockDataCheck(*ppRes);
  if (code) {
    qError("Invalid block data, blockDataCheck failed, error:%s", tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  pPost->isStarted = true;
  pStbJoin->execInfo.postBlkNum++;
  pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows;
  qDebug("%s join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
}
1,606,944✔
888

889

890
/*
 * Notify downstream[1] that the cache of the current left or right table has ended.
 *
 * A group cache notify param is built for the selected side (downstream id 0
 * for the left table, 1 for the right table), wrapped into a merge join notify
 * param and delivered with optrDefaultNotifyFn().
 *
 * Returns the first non-success code of the build steps, otherwise the result
 * of optrDefaultNotifyFn().
 */
static int32_t notifySeqJoinTableCacheEnd(SOperatorInfo* pOperator, SStbJoinPostJoinCtx* pPost, bool leftTable) {
  SOperatorParam* pGcParam = NULL;
  SOperatorParam* pMergeJoinParam = NULL;
  int32_t         downstreamId;
  int32_t         vgId;
  int64_t         uid;

  if (leftTable) {
    downstreamId = 0;
    vgId = pPost->leftVgId;
    uid = pPost->leftCurrUid;
  } else {
    downstreamId = 1;
    vgId = pPost->rightVgId;
    uid = pPost->rightCurrUid;
  }

  qDebug("notify table %" PRIu64 " in vgId %d downstreamId %d cache end", uid, vgId, downstreamId);

  int32_t code = buildGroupCacheNotifyOperatorParam(&pGcParam, downstreamId, vgId, uid);
  if (TSDB_CODE_SUCCESS == code) {
    code = buildMergeJoinNotifyOperatorParam(&pMergeJoinParam, pGcParam, NULL);
  }
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  return optrDefaultNotifyFn(pOperator->pDownstream[1], pMergeJoinParam);
}
910

911
/*
 * Finish the retrieval of the current table pair.
 *
 * The post join is marked as not started. In batch-fetch mode nothing else is
 * done. Otherwise:
 *   - left side (only when leftNeedCache): the counter stored in leftCache for
 *     the current left uid is decremented; when it reaches zero the entry is
 *     removed and a cache-end notification is sent for the left table;
 *   - right side (only when rightNeedCache is false): if the current right uid
 *     is present in rightCache it is removed and a cache-end notification is
 *     sent for the right table.
 *     NOTE(review): the right side is gated on !rightNeedCache while the left
 *     side is gated on leftNeedCache - confirm the asymmetry is intended.
 */
static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCtrlInfo* pStbJoin) {
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;

  pPost->isStarted = false;

  if (pStbJoin->basic.batchFetch) {
    return TSDB_CODE_SUCCESS;
  }

  if (pPost->leftNeedCache) {
    uint32_t* pRefNum = tSimpleHashGet(pPrev->leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
    if (NULL != pRefNum) {
      --(*pRefNum);
      if (0 == *pRefNum) {
        int32_t code = tSimpleHashRemove(pPrev->leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
        if (code) {
          qError("tSimpleHashRemove leftCurrUid %" PRId64 " from leftCache failed, error:%s", pPost->leftCurrUid, tstrerror(code));
          QRY_ERR_RET(code);
        }
        QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, true));
      }
    }
  }

  if (!pPost->rightNeedCache) {
    void* pCached = tSimpleHashGet(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
    if (NULL != pCached) {
      int32_t code = tSimpleHashRemove(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
      if (code) {
        qError("tSimpleHashRemove rightCurrUid %" PRId64 " from rightCache failed, error:%s", pPost->rightCurrUid, tstrerror(code));
        QRY_ERR_RET(code);
      }
      QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, false));
    }
  }

  return TSDB_CODE_SUCCESS;
}
947

948

949
/*
 * Continue reading result blocks of a post join that has already been started.
 *
 * Does nothing when no post join is in progress. Otherwise the next block is
 * pulled from downstream 1:
 *   - a block was returned: the post-join block/row statistics are updated;
 *   - no block: the current table pair is finished via
 *     handleSeqJoinCurrRetrieveEnd() and the read index of the head table list
 *     is advanced.
 */
static FORCE_INLINE int32_t seqJoinContinueCurrRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinPostJoinCtx*       pPost = &pInfo->stbJoin.ctx.post;
  SStbJoinPrevJoinCtx*       pPrev = &pInfo->stbJoin.ctx.prev;

  if (pPost->isStarted) {
    qDebug("%s dynQueryCtrl continue to retrieve block from post op", GET_TASKID(pOperator->pTaskInfo));

    *ppRes = getNextBlockFromDownstream(pOperator, 1);
    if (NULL != *ppRes) {
      pInfo->stbJoin.execInfo.postBlkNum++;
      pInfo->stbJoin.execInfo.postBlkRows += (*ppRes)->info.rows;
    } else {
      QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, &pInfo->stbJoin));
      pPrev->pListHead->readIdx++;
    }
  }

  return TSDB_CODE_SUCCESS;
}
971

972
/*
 * Append pVal to the array stored in pHash under pKey, creating the array
 * (initial capacity 10, element size valSize) when the key is not present yet.
 *
 * Error reporting: the code returned by tSimpleHashPut() is propagated as-is
 * (as addToJoinTableHash() does), and terrno of a failed taosArrayPush() is
 * captured before the new array is destroyed. Previously terrno was read
 * after the cleanup call and the return value of tSimpleHashPut() was dropped,
 * so a failed insert could be reported as success.
 *
 * Returns TSDB_CODE_SUCCESS or the error code of the failed step.
 */
static FORCE_INLINE int32_t addToJoinVgroupHash(SSHashObj* pHash, void* pKey, int32_t keySize, void* pVal, int32_t valSize) {
  SArray** ppArray = tSimpleHashGet(pHash, pKey, keySize);
  if (NULL != ppArray) {
    // key already known: just append to the existing array
    if (NULL == taosArrayPush(*ppArray, pVal)) {
      return terrno;
    }
    return TSDB_CODE_SUCCESS;
  }

  SArray* pArray = taosArrayInit(10, valSize);
  if (NULL == pArray) {
    return terrno;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  if (NULL == taosArrayPush(pArray, pVal)) {
    code = terrno;
  } else {
    // the hash stores the array pointer itself
    code = tSimpleHashPut(pHash, pKey, keySize, &pArray, POINTER_BYTES);
  }

  if (TSDB_CODE_SUCCESS != code) {
    // the array never made it into the hash, so it is still owned here
    taosArrayDestroy(pArray);
  }

  return code;
}
996

997
/*
 * Count one more occurrence of pKey in pHash and keep pOnceHash in sync.
 *
 * - key not in pHash yet: it is inserted with counter 1 and also added to
 *   pOnceHash (with an empty value);
 * - counter is 0: nothing changes;
 * - counter is UINT32_MAX: the counter is reset to 0;
 * - otherwise: the counter is incremented; when it was 1 the key is first
 *   removed from pOnceHash.
 */
static FORCE_INLINE int32_t addToJoinTableHash(SSHashObj* pHash, SSHashObj* pOnceHash, void* pKey, int32_t keySize) {
  int32_t   code = TSDB_CODE_SUCCESS;
  uint32_t* pNum = tSimpleHashGet(pHash, pKey, keySize);

  if (NULL == pNum) {
    uint32_t firstRef = 1;
    code = tSimpleHashPut(pHash, pKey, keySize, &firstRef, sizeof(firstRef));
    if (TSDB_CODE_SUCCESS == code) {
      code = tSimpleHashPut(pOnceHash, pKey, keySize, NULL, 0);
    }
    return code;
  }

  if (0 == *pNum) {
    return TSDB_CODE_SUCCESS;
  }

  if (UINT32_MAX == *pNum) {
    *pNum = 0;
    return TSDB_CODE_SUCCESS;
  }

  if (1 == *pNum) {
    // second occurrence: the key no longer belongs to the "seen once" set
    code = tSimpleHashRemove(pOnceHash, pKey, keySize);
    if (code) {
      qError("tSimpleHashRemove failed in addToJoinTableHash, error:%s", tstrerror(code));
      QRY_ERR_RET(code);
    }
  }
  (*pNum)++;

  return TSDB_CODE_SUCCESS;
}
1033

1034

1035
/*
 * Release one table list node: its four vg/uid buffers and the node itself.
 * A NULL node is accepted and ignored. The pNext link is not followed.
 */
static void freeStbJoinTableList(SStbJoinTableList* pList) {
  if (NULL != pList) {
    taosMemoryFree(pList->pLeftVg);
    taosMemoryFree(pList->pLeftUid);
    taosMemoryFree(pList->pRightVg);
    taosMemoryFree(pList->pRightUid);
    taosMemoryFree(pList);
  }
}
1045

1046
/*
 * Append a new table list node holding copies of `rows` entries of the given
 * left/right vgId and uid arrays to the tail of the list kept in pCtx.
 *
 * The node starts with readIdx 0 and uidNum set to rows. When the list is
 * empty the node becomes both head and tail.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno of the failed allocation (the partly
 * built node is released in that case).
 */
static int32_t appendStbJoinTableList(SStbJoinPrevJoinCtx* pCtx, int64_t rows, int32_t* pLeftVg, int64_t* pLeftUid, int32_t* pRightVg, int64_t* pRightUid) {
  int32_t            code = TSDB_CODE_SUCCESS;
  SStbJoinTableList* pNew = taosMemoryCalloc(1, sizeof(SStbJoinTableList));
  if (NULL == pNew) {
    return terrno;
  }

  pNew->pLeftVg = taosMemoryMalloc(rows * sizeof(*pLeftVg));
  if (NULL == pNew->pLeftVg) {
    goto _failed;
  }
  pNew->pLeftUid = taosMemoryMalloc(rows * sizeof(*pLeftUid));
  if (NULL == pNew->pLeftUid) {
    goto _failed;
  }
  pNew->pRightVg = taosMemoryMalloc(rows * sizeof(*pRightVg));
  if (NULL == pNew->pRightVg) {
    goto _failed;
  }
  pNew->pRightUid = taosMemoryMalloc(rows * sizeof(*pRightUid));
  if (NULL == pNew->pRightUid) {
    goto _failed;
  }

  TAOS_MEMCPY(pNew->pLeftVg, pLeftVg, rows * sizeof(*pLeftVg));
  TAOS_MEMCPY(pNew->pLeftUid, pLeftUid, rows * sizeof(*pLeftUid));
  TAOS_MEMCPY(pNew->pRightVg, pRightVg, rows * sizeof(*pRightVg));
  TAOS_MEMCPY(pNew->pRightUid, pRightUid, rows * sizeof(*pRightUid));

  pNew->readIdx = 0;
  pNew->uidNum = rows;
  pNew->pNext = NULL;

  if (pCtx->pListTail) {
    pCtx->pListTail->pNext = pNew;
  } else {
    pCtx->pListHead = pNew;
  }
  pCtx->pListTail = pNew;

  return TSDB_CODE_SUCCESS;

_failed:
  // terrno is read right after the failed allocation, before any cleanup
  code = terrno;
  freeStbJoinTableList(pNew);
  return code;
}
1096

1097
/*
 * Consume one block of (left vgId, left uid, right vgId, right uid) rows.
 *
 * The four columns are looked up through basic.vgSlot[] / basic.uidSlot[].
 *   - batch-fetch mode: every row is added to the left and right vgroup hashes
 *     (vgId -> uid array);
 *   - otherwise: every left uid is counted in leftCache / onceTable.
 * Afterwards the column data is copied into a new table list node and the
 * table counter is increased by the number of rows.
 *
 * Any failure is stored in pTaskInfo->code and raised through T_LONG_JMP.
 */
static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
  SColumnInfoData*           pVg0 = NULL;
  SColumnInfoData*           pVg1 = NULL;
  SColumnInfoData*           pUid0 = NULL;
  SColumnInfoData*           pUid1 = NULL;

  pVg0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[0]);
  if (NULL == pVg0) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  pVg1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[1]);
  if (NULL == pVg1) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  pUid0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[0]);
  if (NULL == pUid0) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  pUid1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[1]);
  if (NULL == pUid1) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  if (pStbJoin->basic.batchFetch) {
    for (int32_t row = 0; row < pBlock->info.rows; ++row) {
      int32_t* leftVg = (int32_t*)(pVg0->pData + pVg0->info.bytes * row);
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * row);
      int32_t* rightVg = (int32_t*)(pVg1->pData + pVg1->info.bytes * row);
      int64_t* rightUid = (int64_t*)(pUid1->pData + pUid1->info.bytes * row);

      code = addToJoinVgroupHash(pPrev->leftHash, leftVg, sizeof(*leftVg), leftUid, sizeof(*leftUid));
      if (TSDB_CODE_SUCCESS == code) {
        code = addToJoinVgroupHash(pPrev->rightHash, rightVg, sizeof(*rightVg), rightUid, sizeof(*rightUid));
      }
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
    }
  } else {
    for (int32_t row = 0; row < pBlock->info.rows; ++row) {
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * row);

      code = addToJoinTableHash(pPrev->leftCache, pPrev->onceTable, leftUid, sizeof(*leftUid));
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = appendStbJoinTableList(pPrev, pBlock->info.rows, (int32_t*)pVg0->pData, (int64_t*)pUid0->pData, (int32_t*)pVg1->pData, (int64_t*)pUid1->pData);
    if (TSDB_CODE_SUCCESS == code) {
      pPrev->tableNum += pBlock->info.rows;
    }
  }

_return:

  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }
}
81,928✔
1159

1160

1161
/*
 * Post-process the left table cache after all input blocks were consumed.
 *
 * Nothing is done in batch-fetch mode. If leftCache and onceTable have the
 * same size, leftCache is cleared completely. Otherwise every entry iterated
 * from onceTable is removed from leftCache (a failed removal is only logged),
 * and the remaining size is recorded in execInfo.leftCacheNum.
 */
static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;

  if (pStbJoin->basic.batchFetch) {
    return;
  }

  SSHashObj* pLeftCache = pStbJoin->ctx.prev.leftCache;
  SSHashObj* pOnceTable = pStbJoin->ctx.prev.onceTable;

  if (tSimpleHashGetSize(pLeftCache) == tSimpleHashGetSize(pOnceTable)) {
    tSimpleHashClear(pLeftCache);
    return;
  }

  int32_t iter = 0;
  for (uint64_t* pUid = tSimpleHashIterate(pOnceTable, NULL, &iter); NULL != pUid;
       pUid = tSimpleHashIterate(pOnceTable, pUid, &iter)) {
    int32_t code = tSimpleHashRemove(pLeftCache, pUid, sizeof(*pUid));
    if (code) {
      qError("tSimpleHashRemove failed in postProcessStbJoinTableHash, error:%s", tstrerror(code));
    }
  }

  pStbJoin->execInfo.leftCacheNum = tSimpleHashGetSize(pLeftCache);
  qDebug("more than 1 ref build table num %" PRId64, (int64_t)tSimpleHashGetSize(pLeftCache));
}
1196

1197
/*
 * Drain downstream 0 and build the join table list from every returned block.
 *
 * Each block updates the prev-join block/row statistics and is passed to
 * doBuildStbJoinTableHash(). When the downstream is exhausted the table hash
 * is post-processed and the build phase is marked as done (joinBuild = true).
 */
static void buildStbJoinTableList(SOperatorInfo* pOperator) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SSDataBlock*               pBlock = NULL;

  while (NULL != (pBlock = getNextBlockFromDownstream(pOperator, 0))) {
    pStbJoin->execInfo.prevBlkNum++;
    pStbJoin->execInfo.prevBlkRows += pBlock->info.rows;

    doBuildStbJoinTableHash(pOperator, pBlock);
  }

  postProcessStbJoinTableHash(pOperator);

  pStbJoin->ctx.prev.joinBuild = true;
}
495,752✔
1217

1218
/*
 * Launch post joins for the remaining table pairs until one yields a block.
 *
 * Walks the table list from its head:
 *   - a node whose entries are all consumed (readIdx >= uidNum) is unlinked
 *     from the head and freed;
 *   - otherwise a new post join is launched for the current entry. If it
 *     returns a block, that block is handed back immediately. If not, the
 *     current retrieval is finished and the read index is advanced.
 * When the list is exhausted *ppRes is set to NULL and the operator is marked
 * as completed.
 */
static int32_t seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
  SStbJoinTableList*         pCurr = pPrev->pListHead;

  while (NULL != pCurr) {
    if (pCurr->readIdx < pCurr->uidNum) {
      seqJoinLaunchNewRetrieveImpl(pOperator, ppRes);
      if (*ppRes) {
        return TSDB_CODE_SUCCESS;
      }

      QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, pStbJoin));
      pPrev->pListHead->readIdx++;
    } else {
      // node fully consumed: drop it and move on to the next one
      pPrev->pListHead = pCurr->pNext;
      freeStbJoinTableList(pCurr);
      pCurr = pPrev->pListHead;
    }
  }

  *ppRes = NULL;
  setOperatorCompleted(pOperator);

  return TSDB_CODE_SUCCESS;
}
1246

1247
/*
 * Adapt a join result block to the output data block description.
 *
 * A NULL block is accepted and left alone. Otherwise the block id is set to
 * the id of the output description, and for every output slot beyond the
 * columns already present in the block a new column is created (capacity:
 * pBlock->info.rows) and appended.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when
 * pStbJoin, its output description or a slot node is missing, or the code of
 * the failed column allocation/append. The buffers of a column that could not
 * be appended are released (they were leaked before).
 */
static int32_t seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) {
  if (NULL == pBlock) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == pStbJoin || NULL == pStbJoin->pOutputDataBlockDesc) {
    // pBlock is known to be non-NULL here, so name the objects that are actually missing
    qError("seqStableJoinComposeRes: pStbJoin or pOutputDataBlockDesc is NULL");
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  pBlock->info.id.blockId = pStbJoin->pOutputDataBlockDesc->dataBlockId;
  if (!pBlock->pDataBlock) return TSDB_CODE_SUCCESS;

  for (int i = (int)pBlock->pDataBlock->size; i < pStbJoin->pOutputDataBlockDesc->pSlots->length; i++) {
    SSlotDescNode* pSlot = (SSlotDescNode*)nodesListGetNode(pStbJoin->pOutputDataBlockDesc->pSlots, i);
    if (pSlot == NULL) {
      qError("seqStableJoinComposeRes: pSlot is NULL, i:%d", i);
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }

    SColumnInfoData colInfo = createColumnInfoData(pSlot->dataType.type, pSlot->dataType.bytes, pSlot->slotId);
    int32_t         code = colInfoDataEnsureCapacity(&colInfo, pBlock->info.rows, true);
    if (code == TSDB_CODE_SUCCESS) {
      code = blockDataAppendColInfo(pBlock, &colInfo);
    }
    if (code != TSDB_CODE_SUCCESS) {
      // the column was not taken over by the block, free what was allocated for it
      colDataDestroy(&colInfo);
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
1276

1277
/*
 * Produce the next result block of the sequential super table join.
 *
 * On the first call the table list is built from downstream 0; if that
 * produced no rows the operator is completed right away. Afterwards the
 * function first tries to continue the post join in progress and, if that
 * gives no block, launches post joins for the following table pairs.
 *
 * The open cost is recorded while cost.openCost is still 0. A failure is
 * logged, stored in pTaskInfo->code and raised through T_LONG_JMP. On success
 * the result block is adapted with seqStableJoinComposeRes(), whose code is
 * returned.
 */
int32_t seqStableJoin(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;

  QRY_PARAM_CHECK(pRes);
  if (OP_EXEC_DONE == pOperator->status) {
    return code;
  }

  int64_t st = 0;
  if (pOperator->cost.openCost == 0) {
    st = taosGetTimestampUs();
  }

  if (!pStbJoin->ctx.prev.joinBuild) {
    buildStbJoinTableList(pOperator);
    if (pStbJoin->execInfo.prevBlkRows <= 0) {
      // nothing to join at all
      setOperatorCompleted(pOperator);
      goto _return;
    }
  }

  QRY_ERR_JRET(seqJoinContinueCurrRetrieve(pOperator, pRes));
  if (NULL == *pRes) {
    QRY_ERR_JRET(seqJoinLaunchNewRetrieve(pOperator, pRes));
  }

_return:
  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
  }

  if (TSDB_CODE_SUCCESS == code) {
    return seqStableJoinComposeRes(pStbJoin, *pRes);
  }

  qError("%s failed since %s", __func__, tstrerror(code));
  pOperator->pTaskInfo->code = code;
  T_LONG_JMP(pOperator->pTaskInfo->env, code);
  return code;
}
1321

1322
/*
 * Allocate an operator param for the virtual table scan operator.
 *
 * The param gets an empty children array and a SVTableScanOperatorParam value
 * carrying the given uid, the time window of pInfo->vtbScan and an empty
 * operator param array. On success *ppRes points to the new param. On failure
 * everything allocated here is released and *ppRes is NULL.
 */
static int32_t buildVtbScanOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, uint64_t uid) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SVTableScanOperatorParam* pVScan = NULL;
  SOperatorParam*           pParam = NULL;

  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)

  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno)

  pVScan = taosMemoryMalloc(sizeof(SVTableScanOperatorParam));
  QUERY_CHECK_NULL(pVScan, code, lino, _return, terrno)
  pVScan->pOpParamArray = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL(pVScan->pOpParamArray, code, lino, _return, terrno)
  pVScan->uid = uid;
  pVScan->window = pInfo->vtbScan.window;

  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN;
  pParam->downstreamIdx = 0;
  pParam->value = pVScan;
  pParam->reUse = false;

  *ppRes = pParam;
  return TSDB_CODE_SUCCESS;

_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (NULL != pVScan) {
    taosArrayDestroy(pVScan->pOpParamArray);
    taosMemoryFree(pVScan);
  }
  if (NULL != pParam) {
    taosArrayDestroy(pParam->pChildren);
    taosMemoryFree(pParam);
  }
  *ppRes = NULL;
  return code;
}
1357

1358
/*
 * Response callback of the use-db request sent by buildDbVgInfoMap().
 *
 * param is the SOperatorInfo that issued the request. When the request
 * failed, the converted error code is stored in pTaskInfo->code. Otherwise
 * the response is deserialized into vtbScan.pRsp, the message payload is
 * freed and the vtbScan.ready semaphore is posted.
 *
 * NOTE(review): on every failure path this function returns without posting
 * vtbScan.ready and without freeing pMsg->pData. A caller blocked in
 * tsem_wait() on that semaphore would not be woken up - confirm whether the
 * failure paths should post as well.
 */
int32_t dynProcessUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
  int32_t                    lino = 0;
  SOperatorInfo*             pOperator = (SOperatorInfo*)param;
  SDynQueryCtrlOperatorInfo* pCtrlInfo = (SDynQueryCtrlOperatorInfo*)pOperator->info;

  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = rpcCvtErrCode(code);
    if (pOperator->pTaskInfo->code == code) {
      qError("load systable rsp received, error:%s", tstrerror(code));
    } else {
      qError("load systable rsp received, error:%s, cvted error:%s", tstrerror(code),
             tstrerror(pOperator->pTaskInfo->code));
    }
    goto _return;
  }

  pCtrlInfo->vtbScan.pRsp = taosMemoryMalloc(sizeof(SUseDbRsp));
  QUERY_CHECK_NULL(pCtrlInfo->vtbScan.pRsp, code, lino, _return, terrno)

  code = tDeserializeSUseDbRsp(pMsg->pData, (int32_t)pMsg->len, pCtrlInfo->vtbScan.pRsp);
  QUERY_CHECK_CODE(code, lino, _return);

  taosMemoryFreeClear(pMsg->pData);

  // wake up the waiter now that pRsp is filled
  code = tsem_post(&pCtrlInfo->vtbScan.ready);
  QUERY_CHECK_CODE(code, lino, _return);

  return code;

_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  return code;
}
1390

1391
/*
 * Fetch the vgroup information of the database of `name` and convert it into
 * `output`.
 *
 * A use-db request (TDMT_MND_GET_DB_INFO) is serialized and sent
 * asynchronously to vtbScan.epSet with dynProcessUseDbRsp() as the response
 * callback. The function then blocks on the vtbScan.ready semaphore and
 * builds `output` from vtbScan.pRsp, which is released before returning.
 *
 * Changes compared to the previous version:
 *   - the request struct is zero-initialized, so the fields that are not set
 *     here are serialized with a defined value instead of heap garbage;
 *   - the result of the size-probing tSerializeSUseDbReq() call is checked;
 *   - a failed serialization now sets `code` (QUERY_CHECK_CODE only jumps and
 *     does not assign), so the function no longer returns success without
 *     having sent the request.
 *
 * NOTE(review): buf1 is also freed when a step after asyncSendMsgToServer()
 * fails - confirm that the send layer does not own msgInfo.pData by then.
 */
static int32_t buildDbVgInfoMap(SOperatorInfo* pOperator, SMsgCb* pMsgCb, SName* name, SExecTaskInfo* pTaskInfo, SUseDbOutput* output) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    lino = 0;
  char*                      buf1 = NULL;
  SUseDbReq*                 pReq = NULL;
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)pOperator->info;

  pReq = taosMemoryCalloc(1, sizeof(SUseDbReq));
  QUERY_CHECK_NULL(pReq, code, lino, _return, terrno)
  code = tNameGetFullDbName(name, pReq->db);
  QUERY_CHECK_CODE(code, lino, _return);

  // first pass only computes the encoded length
  int32_t contLen = tSerializeSUseDbReq(NULL, 0, pReq);
  if (contLen < 0) {
    code = terrno ? terrno : TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    QUERY_CHECK_CODE(code, lino, _return);
  }
  buf1 = taosMemoryCalloc(1, contLen);
  QUERY_CHECK_NULL(buf1, code, lino, _return, terrno)
  int32_t tempRes = tSerializeSUseDbReq(buf1, contLen, pReq);
  if (tempRes < 0) {
    code = terrno ? terrno : TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    QUERY_CHECK_CODE(code, lino, _return);
  }

  // send the use-db request, the response is handled by dynProcessUseDbRsp
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  QUERY_CHECK_NULL(pMsgSendInfo, code, lino, _return, terrno)

  pMsgSendInfo->param = pOperator;
  pMsgSendInfo->msgInfo.pData = buf1;
  pMsgSendInfo->msgInfo.len = contLen;
  pMsgSendInfo->msgType = TDMT_MND_GET_DB_INFO;
  pMsgSendInfo->fp = dynProcessUseDbRsp;
  pMsgSendInfo->requestId = pTaskInfo->id.queryId;

  code = asyncSendMsgToServer(pMsgCb->clientRpc, &pScanResInfo->vtbScan.epSet, NULL, pMsgSendInfo);
  QUERY_CHECK_CODE(code, lino, _return);

  // wait until the response callback has posted the semaphore
  code = tsem_wait(&pScanResInfo->vtbScan.ready);
  QUERY_CHECK_CODE(code, lino, _return);

  code = queryBuildUseDbOutput(output, pScanResInfo->vtbScan.pRsp);
  QUERY_CHECK_CODE(code, lino, _return);

_return:
  if (code) {
     qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
     taosMemoryFree(buf1);
  }
  taosMemoryFree(pReq);
  tFreeSUsedbRsp(pScanResInfo->vtbScan.pRsp);
  taosMemoryFreeClear(pScanResInfo->vtbScan.pRsp);
  return code;
}
1440

1441
int dynVgInfoComp(const void* lp, const void* rp) {
80,762✔
1442
  SVgroupInfo* pLeft = (SVgroupInfo*)lp;
80,762✔
1443
  SVgroupInfo* pRight = (SVgroupInfo*)rp;
80,762✔
1444
  if (pLeft->hashBegin < pRight->hashBegin) {
80,762✔
1445
    return -1;
80,762✔
1446
  } else if (pLeft->hashBegin > pRight->hashBegin) {
×
1447
    return 1;
×
1448
  }
1449

1450
  return 0;
×
1451
}
1452

1453
/*
 * Make sure dbInfo->vgArray exists: when the vgroup hash is set and the array
 * is not, every entry of the hash is copied into a new array which is then
 * sorted with sort_func.
 *
 * A NULL dbInfo, a missing hash or an already existing array are all treated
 * as "nothing to do" and reported as success. Returns terrno when the array
 * cannot be allocated or extended.
 */
int32_t dynMakeVgArraySortBy(SDBVgInfo* dbInfo, __compar_fn_t sort_func) {
  if (NULL == dbInfo || NULL == dbInfo->vgHash || NULL != dbInfo->vgArray) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t vgSize = taosHashGetSize(dbInfo->vgHash);
  dbInfo->vgArray = taosArrayInit(vgSize, sizeof(SVgroupInfo));
  if (NULL == dbInfo->vgArray) {
    return terrno;
  }

  for (void* pIter = taosHashIterate(dbInfo->vgHash, NULL); NULL != pIter;
       pIter = taosHashIterate(dbInfo->vgHash, pIter)) {
    if (NULL == taosArrayPush(dbInfo->vgArray, pIter)) {
      // leaving the loop early, so the iteration has to be cancelled explicitly
      taosHashCancelIterate(dbInfo->vgHash, pIter);
      return terrno;
    }
  }

  taosArraySort(dbInfo->vgArray, sort_func);

  return TSDB_CODE_SUCCESS;
}
1480

1481
int32_t dynHashValueComp(void const* lp, void const* rp) {
254,457✔
1482
  uint32_t*    key = (uint32_t*)lp;
254,457✔
1483
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
254,457✔
1484

1485
  if (*key < pVg->hashBegin) {
254,457✔
1486
    return -1;
×
1487
  } else if (*key > pVg->hashEnd) {
254,457✔
1488
    return 1;
30,259✔
1489
  }
1490

1491
  return 0;
224,198✔
1492
}
1493

1494
/*
 * Resolve the vgroup id that owns table `tbName` of database `dbFName`.
 *
 * The sorted vgroup array of dbInfo is created on demand, the hash value of
 * "<dbFName>.<tbName>" is computed with the hash settings of dbInfo, and the
 * vgroup whose hash range contains that value is looked up.
 *
 * Returns:
 *   TSDB_CODE_SUCCESS                     - *vgId has been set
 *   TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR - dbInfo or vgId is NULL
 *   TSDB_CODE_TSC_DB_NOT_SELECTED         - dbInfo holds no vgroup
 *   TSDB_CODE_CTG_INTERNAL_ERROR          - no vgroup range covers the hash value
 *   or the code returned by dynMakeVgArraySortBy().
 *
 * dynMakeVgArraySortBy() reports success for a NULL dbInfo, so the NULL case
 * has to be rejected here before dbInfo is dereferenced.
 */
int32_t getVgId(SDBVgInfo* dbInfo, char* dbFName, int32_t* vgId, char *tbName) {
  int32_t code = 0;
  int32_t lino = 0;

  if (NULL == dbInfo || NULL == vgId) {
    qError("invalid input to get vgId, dbInfo:%p, vgId:%p", (void*)dbInfo, (void*)vgId);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  code = dynMakeVgArraySortBy(dbInfo, dynVgInfoComp);
  QUERY_CHECK_CODE(code, lino, _return);

  int32_t vgNum = (int32_t)taosArrayGetSize(dbInfo->vgArray);
  if (vgNum <= 0) {
    qError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum);
    QUERY_CHECK_CODE(code = TSDB_CODE_TSC_DB_NOT_SELECTED, lino, _return);
  }

  SVgroupInfo* vgInfo = NULL;
  char         tbFullName[TSDB_TABLE_FNAME_LEN];
  // snprintf truncates to the buffer size and always NUL-terminates
  (void)snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbFName, tbName);

  uint32_t hashValue = taosGetTbHashVal(tbFullName, (int32_t)strlen(tbFullName), dbInfo->hashMethod,
                                        dbInfo->hashPrefix, dbInfo->hashSuffix);

  vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, dynHashValueComp, TD_EQ);
  if (NULL == vgInfo) {
    qError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName,
           (int32_t)taosArrayGetSize(dbInfo->vgArray));
    return TSDB_CODE_CTG_INTERNAL_ERROR;
  }

  *vgId = vgInfo->vgId;

_return:
  return code;
}
1527

1528
/*
 * Tell whether column `colId` has to be scanned by this operator.
 *
 * Returns true when the vtable scan is flagged scanAllCols, or when colId
 * appears in readColList; false otherwise.
 */
bool colNeedScan(SOperatorInfo* pOperator, col_id_t colId) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SVtbScanDynCtrlInfo*       pScan = (SVtbScanDynCtrlInfo*)&pCtrl->vtbScan;
  SArray*                    pReadCols = pScan->readColList;

  if (pScan->scanAllCols) {
    return true;
  }

  int32_t idx = 0;
  while (idx < taosArrayGetSize(pReadCols)) {
    col_id_t* pId = (col_id_t*)taosArrayGet(pReadCols, idx);
    if (*pId == colId) {
      return true;
    }
    idx++;
  }

  return false;
}
1542

1543
/*
 * Fetch the vgroup information of the database named by `name`.
 *
 * Results are cached per database name in vtbScan.dbVgInfoMap. On a cache
 * miss a new SUseDbOutput is filled by buildDbVgInfoMap() and stored in the
 * map.
 *
 * dbVgInfo receives the dbVgroup pointer of the cached entry; it is set to
 * NULL first (QRY_PARAM_CHECK) and stays NULL on failure.
 *
 * Returns TSDB_CODE_SUCCESS on success, otherwise the allocation, build or
 * hash insertion error.
 */
int32_t getDbVgInfo(SOperatorInfo* pOperator, SName *name, SDBVgInfo **dbVgInfo) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SMsgCb*                    pMsgCb = pVtbScan->pMsgCb;
  SUseDbOutput*              output = NULL;

  QRY_PARAM_CHECK(dbVgInfo);

  SUseDbOutput** find = (SUseDbOutput**)taosHashGet(pVtbScan->dbVgInfoMap, name->dbname, strlen(name->dbname));
  if (find != NULL) {
    *dbVgInfo = (*find)->dbVgroup;
    return code;
  }

  // Zero-initialise so the cleanup below never sees garbage members when
  // buildDbVgInfoMap() fails before filling the structure.
  output = taosMemoryCalloc(1, sizeof(SUseDbOutput));
  QUERY_CHECK_NULL(output, code, line, _return, terrno)

  code = buildDbVgInfoMap(pOperator, pMsgCb, name, pTaskInfo, output);
  QUERY_CHECK_CODE(code, line, _return);

  code = taosHashPut(pVtbScan->dbVgInfoMap, name->dbname, strlen(name->dbname), &output, POINTER_BYTES);
  QUERY_CHECK_CODE(code, line, _return);

  *dbVgInfo = output->dbVgroup;
  return code;

_return:
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  if (output != NULL) {
    // NOTE(review): confirm freeUseDbOutput() expects the object pointer itself
    // (as passed here) and not a pointer to a stored hash value.
    freeUseDbOutput(output);
  }
  return code;
}
1572

1573
int32_t extractColRefName(const char *colref, char **refDb, char** refTb, char** refCol) {
469,364✔
1574
  int32_t     code = TSDB_CODE_SUCCESS;
469,364✔
1575
  int32_t     line = 0;
469,364✔
1576

1577
  const char *first_dot = strchr(colref, '.');
469,364✔
1578
  QUERY_CHECK_NULL(first_dot, code, line, _return, terrno)
469,364✔
1579

1580
  const char *second_dot = strchr(first_dot + 1, '.');
469,364✔
1581
  QUERY_CHECK_NULL(second_dot, code, line, _return, terrno)
469,364✔
1582

1583
  size_t db_len = first_dot - colref;
469,364✔
1584
  size_t table_len = second_dot - first_dot - 1;
469,364✔
1585
  size_t col_len = strlen(second_dot + 1);
469,364✔
1586

1587
  *refDb = taosMemoryMalloc(db_len + 1);
469,364✔
1588
  *refTb = taosMemoryMalloc(table_len + 1);
469,364✔
1589
  *refCol = taosMemoryMalloc(col_len + 1);
469,364✔
1590
  QUERY_CHECK_NULL(*refDb, code, line, _return, terrno)
469,364✔
1591
  QUERY_CHECK_NULL(*refTb, code, line, _return, terrno)
469,364✔
1592
  QUERY_CHECK_NULL(*refCol, code, line, _return, terrno)
469,364✔
1593

1594
  tstrncpy(*refDb, colref, db_len + 1);
469,364✔
1595
  tstrncpy(*refTb, first_dot + 1, table_len + 1);
469,364✔
1596
  tstrncpy(*refCol, second_dot + 1, col_len + 1);
469,364✔
1597

1598
  return TSDB_CODE_SUCCESS;
469,364✔
1599
_return:
×
1600
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
1601
  if (*refDb) {
×
1602
    taosMemoryFree(*refDb);
×
1603
    *refDb = NULL;
×
1604
  }
1605
  if (*refTb) {
×
1606
    taosMemoryFree(*refTb);
×
1607
    *refTb = NULL;
×
1608
  }
1609
  if (*refCol) {
×
1610
    taosMemoryFree(*refCol);
×
1611
    *refCol = NULL;
×
1612
  }
1613
  return code;
×
1614
}
1615

1616
/*
 * Check whether a (db, table) pair read from a data block is the expected one.
 *
 * dbName and tbName are length-prefixed values accessed through
 * varDataVal()/varDataLen(); expectDbName and expectTbName are NUL-terminated
 * strings. Returns true only when both names match in content and length.
 */
bool tableInfoNeedCollect(char *dbName, char *tbName, char *expectDbName, char *expectTbName) {
  bool tbMatched = strncmp(varDataVal(tbName), expectTbName, varDataLen(tbName)) == 0 &&
                   strlen(expectTbName) == varDataLen(tbName);
  if (!tbMatched) {
    return false;
  }

  bool dbMatched = strncmp(varDataVal(dbName), expectDbName, varDataLen(dbName)) == 0 &&
                   strlen(expectDbName) == varDataLen(dbName);
  return dbMatched;
}
1625

1626
/*
 * Fill `pInfo` from row `index` of a column-reference data block.
 *
 * Columns 3..8 of pDataBlock are read as: column name, uid, column id,
 * referenced column, vgroup id and reference version. All six must exist
 * (the version column is only checked for presence here).
 *
 * pInfo->colName and, when the reference column is not NULL,
 * pInfo->colrefName are newly allocated NUL-terminated copies. uid, colId
 * and vgId are written only when their cell is not NULL.
 *
 * Returns TSDB_CODE_SUCCESS, or the error taken from terrno. On failure any
 * string allocated by this call is released and its field reset to NULL, so
 * the caller has nothing to clean up.
 */
int32_t getColRefInfo(SColRefInfo *pInfo, SArray* pDataBlock, int32_t index) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          line = 0;
  bool             refNameAllocated = false;
  bool             colNameAllocated = false;

  SColumnInfoData *pColNameCol = taosArrayGet(pDataBlock, 3);
  SColumnInfoData *pUidCol = taosArrayGet(pDataBlock, 4);
  SColumnInfoData *pColIdCol = taosArrayGet(pDataBlock, 5);
  SColumnInfoData *pRefCol = taosArrayGet(pDataBlock, 6);
  SColumnInfoData *pVgIdCol = taosArrayGet(pDataBlock, 7);
  SColumnInfoData *pRefVerCol = taosArrayGet(pDataBlock, 8);

  QUERY_CHECK_NULL(pColNameCol, code, line, _return, terrno)
  QUERY_CHECK_NULL(pUidCol, code, line, _return, terrno)
  QUERY_CHECK_NULL(pColIdCol, code, line, _return, terrno)
  QUERY_CHECK_NULL(pRefCol, code, line, _return, terrno)
  QUERY_CHECK_NULL(pVgIdCol, code, line, _return, terrno)
  QUERY_CHECK_NULL(pRefVerCol, code, line, _return, terrno)

  if (colDataIsNull_s(pRefCol, index)) {
    pInfo->colrefName = NULL;
  } else {
    char *pRefData = colDataGetData(pRefCol, index);
    // varDataTLen() covers the length header too, so the buffer has room for the terminator.
    pInfo->colrefName = taosMemoryCalloc(varDataTLen(pRefData), 1);
    QUERY_CHECK_NULL(pInfo->colrefName, code, line, _return, terrno)
    refNameAllocated = true;
    memcpy(pInfo->colrefName, varDataVal(pRefData), varDataLen(pRefData));
    pInfo->colrefName[varDataLen(pRefData)] = 0;
  }

  char *pNameData = colDataGetData(pColNameCol, index);
  pInfo->colName = taosMemoryCalloc(varDataTLen(pNameData), 1);
  QUERY_CHECK_NULL(pInfo->colName, code, line, _return, terrno)
  colNameAllocated = true;
  memcpy(pInfo->colName, varDataVal(pNameData), varDataLen(pNameData));
  pInfo->colName[varDataLen(pNameData)] = 0;

  if (!colDataIsNull_s(pUidCol, index)) {
    GET_TYPED_DATA(pInfo->uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, index), 0);
  }
  if (!colDataIsNull_s(pColIdCol, index)) {
    GET_TYPED_DATA(pInfo->colId, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pColIdCol, index), 0);
  }
  if (!colDataIsNull_s(pVgIdCol, index)) {
    GET_TYPED_DATA(pInfo->vgId, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pVgIdCol, index), 0);
  }

_return:
  if (code != TSDB_CODE_SUCCESS) {
    // Release only what this call allocated; fields the caller passed in are left alone.
    if (refNameAllocated) {
      taosMemoryFree(pInfo->colrefName);
      pInfo->colrefName = NULL;
    }
    if (colNameAllocated) {
      taosMemoryFree(pInfo->colName);
      pInfo->colName = NULL;
    }
  }
  return code;
}
1671

1672
/*
 * Compare the vgroups referenced by the current scan (curOrgTbVg) with the
 * ones already known (existOrgTbVg) and request a redeploy when new vgroups
 * showed up.
 *
 * Nothing is done when the task has no stream runtime info. On the first
 * call (existOrgTbVg == NULL) the current set simply becomes the known set.
 *
 * When curOrgTbVg contains vgroup ids missing from existOrgTbVg, those ids
 * (plus the node ids of any pending addedVgInfo entries) are published through
 * vtableDeployInfo.addVgIds, the deploy uid and vtableDeployGot flag are
 * updated, curOrgTbVg is cleared, needRedeploy is set and `rversion` is
 * recorded.
 *
 * Returns TSDB_CODE_SUCCESS when nothing has to be redeployed,
 * TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY when new vgroups were found, or an
 * allocation error taken from terrno.
 */
int32_t processOrgTbVg(SVtbScanDynCtrlInfo* pVtbScan, SExecTaskInfo* pTaskInfo, int32_t rversion) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t line = 0;
  void*   pCurIter = NULL;     // live iterator over curOrgTbVg while the loop runs
  SArray* tmpArray = NULL;     // new vgroup ids; owned here until published or destroyed
  SArray* expiredInfo = NULL;  // addedVgInfo list detached by the CAS below; owned here

  if (pTaskInfo->pStreamRuntimeInfo == NULL) {
    return code;
  }

  if (pVtbScan->existOrgTbVg == NULL) {
    pVtbScan->existOrgTbVg = pVtbScan->curOrgTbVg;
    pVtbScan->curOrgTbVg = NULL;
  }

  if (pVtbScan->curOrgTbVg == NULL) {
    return code;
  }

  // which means rversion has changed
  while ((pCurIter = taosHashIterate(pVtbScan->curOrgTbVg, pCurIter))) {
    int32_t* vgId = (int32_t*)taosHashGetKey(pCurIter, NULL);
    if (taosHashGet(pVtbScan->existOrgTbVg, vgId, sizeof(int32_t)) != NULL) {
      continue;
    }
    if (tmpArray == NULL) {
      tmpArray = taosArrayInit(1, sizeof(int32_t));
      QUERY_CHECK_NULL(tmpArray, code, line, _return, terrno)
    }
    QUERY_CHECK_NULL(taosArrayPush(tmpArray, vgId), code, line, _return, terrno)
  }

  if (tmpArray == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  if (pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addVgIds == NULL) {
    SArray* pending = atomic_load_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo);
    if (pending && pending == atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo, pending, NULL)) {
      // The exchange succeeded, so the detached list now belongs to this function.
      expiredInfo = pending;
      for (int32_t i = 0; i < taosArrayGetSize(expiredInfo); i++) {
        SStreamTaskAddr* vgInfo = (SStreamTaskAddr*)taosArrayGet(expiredInfo, i);
        QUERY_CHECK_NULL(taosArrayPush(tmpArray, &vgInfo->nodeId), code, line, _return, terrno)
      }
      taosArrayDestroy(expiredInfo);
      expiredInfo = NULL;
    }
    if (atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addVgIds, NULL, tmpArray)) {
      // A list was published in the meantime; ours was not taken.
      taosArrayDestroy(tmpArray);
    }
    tmpArray = NULL;
  } else {
    // A list is already pending; the ids collected here are not published, so release them.
    taosArrayDestroy(tmpArray);
    tmpArray = NULL;
  }

  atomic_store_64(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.uid, (int64_t)(pVtbScan->isSuperTable ? pVtbScan->suid : pVtbScan->uid));
  (void)atomic_val_compare_exchange_8(pTaskInfo->pStreamRuntimeInfo->vtableDeployGot, 0, 1);
  taosHashClear(pVtbScan->curOrgTbVg);
  pVtbScan->needRedeploy = true;
  pVtbScan->rversion = rversion;
  return TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY;

_return:
  qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  if (pCurIter != NULL) {
    // Leaving a taosHashIterate() loop early requires releasing the iterator.
    taosHashCancelIterate(pVtbScan->curOrgTbVg, pCurIter);
  }
  taosArrayDestroy(tmpArray);
  taosArrayDestroy(expiredInfo);
  return code;
}
1727

1728
/*
 * Determine the vgroup id of the table named in a "db.table.column" reference.
 *
 * The reference is split with extractColRefName(), the database vgroup info
 * is obtained through getDbVgInfo() and the id is resolved by getVgId().
 * The first failing step's error code is returned; the temporary name
 * strings are always released.
 */
int32_t getVgIdFromColref(SOperatorInfo* pOperator, const char* colRef, int32_t* vgId) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SDBVgInfo*                 pDbVgInfo = NULL;
  SName                      tbName = {0};
  char                       fullDbName[TSDB_DB_FNAME_LEN] = {0};
  char*                      pDbPart = NULL;
  char*                      pTbPart = NULL;
  char*                      pColPart = NULL;

  int32_t code = extractColRefName(colRef, &pDbPart, &pTbPart, &pColPart);

  if (code == TSDB_CODE_SUCCESS) {
    toName(pCtrl->vtbScan.acctId, pDbPart, pTbPart, &tbName);
    code = getDbVgInfo(pOperator, &tbName, &pDbVgInfo);
  }
  if (code == TSDB_CODE_SUCCESS) {
    code = tNameGetFullDbName(&tbName, fullDbName);
  }
  if (code == TSDB_CODE_SUCCESS) {
    code = getVgId(pDbVgInfo, fullDbName, vgId, tbName.tname);
  }

  taosMemoryFree(pDbPart);
  taosMemoryFree(pTbPart);
  taosMemoryFree(pColPart);
  return code;
}
1759

1760
/*
 * Collect the column references of every child table of the virtual super
 * table being scanned.
 *
 * Blocks are pulled from the second downstream operator until it returns
 * NULL. Block columns 0, 1 and 2 are read as child table name, super table
 * name and database name. Rows whose super table / database match
 * vtbScan.tbName / vtbScan.dbName are turned into SColRefInfo entries and
 * appended to a per-child array in childTableList; childTableMap maps the
 * child table name to its index in that list.
 *
 * For stream tasks the vgroup of every referenced table is also recorded in
 * curOrgTbVg, and processOrgTbVg() is run at the end.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered (which includes
 * whatever processOrgTbVg() reports).
 */
int32_t buildVirtualSuperTableScanChildTableMap(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[1];
  SColRefInfo                info = {0};
  bool                       infoOwned = false;       // true while `info` holds strings no container owns yet
  SArray*                    pNewColRefArray = NULL;  // array created for a new child, not yet in childTableList

  pVtbScan->childTableMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  QUERY_CHECK_NULL(pVtbScan->childTableMap, code, line, _return, terrno)

  while (true) {
    SSDataBlock *pChildInfo = NULL;
    code = pSystableScanOp->fpSet.getNextFn(pSystableScanOp, &pChildInfo);
    QUERY_CHECK_CODE(code, line, _return);
    if (pChildInfo == NULL) {
      break;
    }

    SColumnInfoData *pTableNameCol = taosArrayGet(pChildInfo->pDataBlock, 0);
    SColumnInfoData *pStbNameCol = taosArrayGet(pChildInfo->pDataBlock, 1);
    SColumnInfoData *pDbNameCol = taosArrayGet(pChildInfo->pDataBlock, 2);

    QUERY_CHECK_NULL(pTableNameCol, code, line, _return, terrno)
    QUERY_CHECK_NULL(pStbNameCol, code, line, _return, terrno)
    QUERY_CHECK_NULL(pDbNameCol, code, line, _return, terrno)

    for (int32_t i = 0; i < pChildInfo->info.rows; i++) {
      if (colDataIsNull_s(pStbNameCol, i)) {
        continue;
      }

      char* stbrawname = colDataGetData(pStbNameCol, i);
      char* dbrawname = colDataGetData(pDbNameCol, i);
      char* ctbName = colDataGetData(pTableNameCol, i);

      if (!tableInfoNeedCollect(dbrawname, stbrawname, pInfo->vtbScan.dbName, pInfo->vtbScan.tbName)) {
        continue;
      }

      info = (SColRefInfo){0};
      code = getColRefInfo(&info, pChildInfo->pDataBlock, i);
      QUERY_CHECK_CODE(code, line, _return);
      infoOwned = true;

      if (pInfo->vtbScan.dynTbUid != 0 && info.uid != pInfo->vtbScan.dynTbUid) {
        qTrace("dynQueryCtrl tb uid filter, info uid:%" PRIu64 ", dyn tb uid:%" PRIu64, info.uid,
               pInfo->vtbScan.dynTbUid);

        destroyColRefInfo(&info);
        infoOwned = false;
        continue;
      }

      if (pTaskInfo->pStreamRuntimeInfo) {
        if (pVtbScan->curOrgTbVg == NULL) {
          pVtbScan->curOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
          QUERY_CHECK_NULL(pVtbScan->curOrgTbVg, code, line, _return, terrno)
        }

        if (info.colrefName) {
          int32_t vgId;
          code = getVgIdFromColref(pOperator, info.colrefName, &vgId);
          QUERY_CHECK_CODE(code, line, _return);
          code = taosHashPut(pVtbScan->curOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
          QUERY_CHECK_CODE(code, line, _return);
        }
      }

      int32_t* pTableIdx = (int32_t*)taosHashGet(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName));
      if (pTableIdx == NULL) {
        // First row of this child table: create its array and register it.
        pNewColRefArray = taosArrayInit(1, sizeof(SColRefInfo));
        QUERY_CHECK_NULL(pNewColRefArray, code, line, _return, terrno)
        QUERY_CHECK_NULL(taosArrayPush(pNewColRefArray, &info), code, line, _return, terrno)
        int32_t tableIdx = (int32_t)taosArrayGetSize(pVtbScan->childTableList);
        QUERY_CHECK_NULL(taosArrayPush(pVtbScan->childTableList, &pNewColRefArray), code, line, _return, terrno)
        // childTableList now holds the array, and the array holds the copy of info.
        pNewColRefArray = NULL;
        infoOwned = false;
        code = taosHashPut(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName), &tableIdx, sizeof(tableIdx));
        QUERY_CHECK_CODE(code, line, _return);
      } else {
        SArray* pColRefArray = (SArray *)taosArrayGetP(pVtbScan->childTableList, *pTableIdx);
        QUERY_CHECK_NULL(pColRefArray, code, line, _return, terrno)
        QUERY_CHECK_NULL(taosArrayPush(pColRefArray, &info), code, line, _return, terrno)
        infoOwned = false;
      }
    }
  }

  code = processOrgTbVg(pVtbScan, pTaskInfo, 1);
  QUERY_CHECK_CODE(code, line, _return);

_return:
  // Release whatever an error path left without an owner.
  if (infoOwned) {
    destroyColRefInfo(&info);
  }
  taosArrayDestroy(pNewColRefArray);
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
1851

1852
/*
 * Collect the column references of the single virtual normal/child table
 * being scanned into vtbScan.colRefInfo.
 *
 * Blocks are pulled from the second downstream operator until it returns
 * NULL. Block columns 0, 2 and 8 are read as table name, database name and
 * reference version. Rows matching vtbScan.tbName / vtbScan.dbName are turned
 * into SColRefInfo entries.
 *
 * When the reference version differs from the recorded one, or no known
 * vgroup set exists yet, the vgroup of every referenced table is recorded in
 * curOrgTbVg. processOrgTbVg() is run at the end with the last version read.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered (which includes
 * whatever processOrgTbVg() reports).
 */
int32_t buildVirtualNormalChildTableScanChildTableMap(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[1];
  int32_t                    rversion = 0;
  SColRefInfo                info = {0};
  bool                       infoOwned = false;  // true while `info` holds strings colRefInfo does not own yet

  pInfo->vtbScan.colRefInfo = taosArrayInit(1, sizeof(SColRefInfo));
  QUERY_CHECK_NULL(pInfo->vtbScan.colRefInfo, code, line, _return, terrno)

  while (true) {
    SSDataBlock *pTableInfo = NULL;
    code = pSystableScanOp->fpSet.getNextFn(pSystableScanOp, &pTableInfo);
    // A downstream failure must not be overwritten by the later processOrgTbVg() result.
    QUERY_CHECK_CODE(code, line, _return);
    if (pTableInfo == NULL) {
      break;
    }

    SColumnInfoData *pTableNameCol = taosArrayGet(pTableInfo->pDataBlock, 0);
    SColumnInfoData *pDbNameCol = taosArrayGet(pTableInfo->pDataBlock, 2);
    SColumnInfoData *pRefVerCol = taosArrayGet(pTableInfo->pDataBlock, 8);

    QUERY_CHECK_NULL(pTableNameCol, code, line, _return, terrno)
    QUERY_CHECK_NULL(pDbNameCol, code, line, _return, terrno)
    QUERY_CHECK_NULL(pRefVerCol, code, line, _return, terrno)

    for (int32_t i = 0; i < pTableInfo->info.rows; i++) {
      if (!colDataIsNull_s(pRefVerCol, i)) {
        GET_TYPED_DATA(rversion, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pRefVerCol, i), 0);
      }

      if (colDataIsNull_s(pTableNameCol, i)) {
        continue;
      }

      char* tbrawname = colDataGetData(pTableNameCol, i);
      char* dbrawname = colDataGetData(pDbNameCol, i);
      QUERY_CHECK_NULL(tbrawname, code, line, _return, terrno)
      QUERY_CHECK_NULL(dbrawname, code, line, _return, terrno)

      if (!tableInfoNeedCollect(dbrawname, tbrawname, pInfo->vtbScan.dbName, pInfo->vtbScan.tbName)) {
        continue;
      }

      info = (SColRefInfo){0};
      code = getColRefInfo(&info, pTableInfo->pDataBlock, i);
      QUERY_CHECK_CODE(code, line, _return);
      infoOwned = true;

      if ((rversion != pVtbScan->rversion || pVtbScan->existOrgTbVg == NULL) && info.colrefName) {
        if (pVtbScan->curOrgTbVg == NULL) {
          pVtbScan->curOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
          QUERY_CHECK_NULL(pVtbScan->curOrgTbVg, code, line, _return, terrno)
        }
        int32_t vgId;
        code = getVgIdFromColref(pOperator, info.colrefName, &vgId);
        QUERY_CHECK_CODE(code, line, _return);
        code = taosHashPut(pVtbScan->curOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
        QUERY_CHECK_CODE(code, line, _return);
      }

      QUERY_CHECK_NULL(taosArrayPush(pInfo->vtbScan.colRefInfo, &info), code, line, _return, terrno)
      // colRefInfo now holds the copy that references the strings.
      infoOwned = false;
    }
  }

  code = processOrgTbVg(pVtbScan, pTaskInfo, rversion);
  QUERY_CHECK_CODE(code, line, _return);

_return:
  if (infoOwned) {
    destroyColRefInfo(&info);
  }
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
1922

1923
/*
 * Consume the list of newly added vgroups (vtableDeployInfo.addedVgInfo), if
 * one has been published.
 *
 * When a list is present and this call detaches it with a compare-exchange,
 * every entry's nodeId is added to existOrgTbVg and the entry itself is
 * stored in newAddedVgInfo (created on demand); needRedeploy is then cleared
 * and the detached list is destroyed.
 *
 * Returns TSDB_CODE_SUCCESS when a list was consumed,
 * TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY when no list could be taken, or a
 * hash/allocation error.
 */
int32_t virtualTableScanCheckNeedRedeploy(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;

  // Only set once the compare-exchange succeeded, i.e. once this function owns the list.
  SArray *tmpArray = NULL;
  SArray *pPublished = atomic_load_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo);

  if (pPublished && pPublished == atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo, pPublished, NULL)) {
    tmpArray = pPublished;
    for (int32_t i = 0; i < taosArrayGetSize(tmpArray); i++) {
      SStreamTaskAddr* pTaskAddr = (SStreamTaskAddr*)taosArrayGet(tmpArray, i);
      code = taosHashPut(pVtbScan->existOrgTbVg, &pTaskAddr->nodeId, sizeof(pTaskAddr->nodeId), NULL, 0);
      QUERY_CHECK_CODE(code, line, _return);
      if (pVtbScan->newAddedVgInfo == NULL) {
        pVtbScan->newAddedVgInfo = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
        QUERY_CHECK_NULL(pVtbScan->newAddedVgInfo, code, line, _return, terrno)
      }
      code = taosHashPut(pVtbScan->newAddedVgInfo, &pTaskAddr->nodeId, sizeof(pTaskAddr->nodeId), pTaskAddr, sizeof(SStreamTaskAddr));
      QUERY_CHECK_CODE(code, line, _return);
    }
    pVtbScan->needRedeploy = false;
  } else {
    // Either nothing is published or the exchange was lost; in the latter case
    // the loaded pointer is not ours and must not be freed here.
    code = TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY;
    QUERY_CHECK_CODE(code, line, _return);
  }

_return:
  taosArrayDestroy(tmpArray);
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
1958

1959
/*
 * Group the referenced columns of one virtual table by original table.
 *
 * For every entry of pColRefInfo, *uid and *vgId are overwritten with the
 * entry's uid and vgId (so they end up holding the last entry's values).
 * Entries that carry a reference and whose column must be scanned
 * (colNeedScan) are split into db/table/column; an SOrgTbInfo keyed by the
 * full original table name is created in orgTbVgColMap on first sight, and
 * the (colId, column name) pair is appended to its colMap.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered. Temporary name
 * strings and a not-yet-stored colMap are released on every path.
 */
int32_t virtualTableScanProcessColRefInfo(SOperatorInfo* pOperator, SArray* pColRefInfo, tb_uid_t* uid, int32_t* vgId) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SDBVgInfo*                 dbVgInfo = NULL;
  char*                      refDbName = NULL;
  char*                      refTbName = NULL;
  char*                      refColName = NULL;
  SArray*                    pNewColMap = NULL;  // colMap created for a new entry, not yet stored in the hash

  for (int32_t j = 0; j < taosArrayGetSize(pColRefInfo); j++) {
    SColRefInfo *pKV = (SColRefInfo*)taosArrayGet(pColRefInfo, j);
    *uid = pKV->uid;
    *vgId = pKV->vgId;
    if (pKV->colrefName == NULL || !colNeedScan(pOperator, pKV->colId)) {
      continue;
    }

    SName   name = {0};
    char    dbFname[TSDB_DB_FNAME_LEN] = {0};
    // Zero-filled because the whole buffer, not just the string, is used as the hash key.
    char    orgTbFName[TSDB_TABLE_FNAME_LEN] = {0};

    code = extractColRefName(pKV->colrefName, &refDbName, &refTbName, &refColName);
    QUERY_CHECK_CODE(code, line, _return);

    toName(pInfo->vtbScan.acctId, refDbName, refTbName, &name);

    code = getDbVgInfo(pOperator, &name, &dbVgInfo);
    QUERY_CHECK_CODE(code, line, _return);
    code = tNameGetFullDbName(&name, dbFname);
    QUERY_CHECK_CODE(code, line, _return);
    code = tNameGetFullTableName(&name, orgTbFName);
    QUERY_CHECK_CODE(code, line, _return);

    SColIdNameKV colIdNameKV = {0};
    colIdNameKV.colId = pKV->colId;
    tstrncpy(colIdNameKV.colName, refColName, sizeof(colIdNameKV.colName));

    void *pVal = taosHashGet(pVtbScan->orgTbVgColMap, orgTbFName, sizeof(orgTbFName));
    if (!pVal) {
      SOrgTbInfo map = {0};
      code = getVgId(dbVgInfo, dbFname, &map.vgId, name.tname);
      QUERY_CHECK_CODE(code, line, _return);
      tstrncpy(map.tbName, orgTbFName, sizeof(map.tbName));
      pNewColMap = taosArrayInit(10, sizeof(SColIdNameKV));
      QUERY_CHECK_NULL(pNewColMap, code, line, _return, terrno)
      map.colMap = pNewColMap;
      QUERY_CHECK_NULL(taosArrayPush(map.colMap, &colIdNameKV), code, line, _return, terrno)
      code = taosHashPut(pVtbScan->orgTbVgColMap, orgTbFName, sizeof(orgTbFName), &map, sizeof(map));
      QUERY_CHECK_CODE(code, line, _return);
      // The stored SOrgTbInfo copy now references the array.
      pNewColMap = NULL;
    } else {
      SOrgTbInfo *tbInfo = (SOrgTbInfo *)pVal;
      QUERY_CHECK_NULL(taosArrayPush(tbInfo->colMap, &colIdNameKV), code, line, _return, terrno)
    }

    taosMemoryFree(refDbName);
    taosMemoryFree(refTbName);
    taosMemoryFree(refColName);
    refDbName = NULL;
    refTbName = NULL;
    refColName = NULL;
  }

_return:
  // Only non-NULL when an error interrupted an iteration.
  taosMemoryFree(refDbName);
  taosMemoryFree(refTbName);
  taosMemoryFree(refColName);
  taosArrayDestroy(pNewColMap);
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
2023

2024
/*
 * Build the operator parameter handed to the downstream virtual table scan.
 *
 * A fresh vtbScanParam is created for `uid`. For every original table in
 * orgTbVgColMap one exchange parameter is appended to its pOpParamArray; when
 * the table's vgroup is listed in newAddedVgInfo the parameter is built with
 * that task address and the entry is removed from newAddedVgInfo. Finally a
 * tag-scan exchange parameter for (vgId, uid) is stored in pTagScanOp.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 */
int32_t virtualTableScanBuildDownStreamOpParam(SOperatorInfo* pOperator, tb_uid_t uid, int32_t vgId) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  void*                      pIter = NULL;  // live iterator over orgTbVgColMap while the loop runs

  pVtbScan->vtbScanParam = NULL;
  code = buildVtbScanOperatorParam(pInfo, &pVtbScan->vtbScanParam, uid);
  QUERY_CHECK_CODE(code, line, _return);

  pIter = taosHashIterate(pVtbScan->orgTbVgColMap, NULL);
  while (pIter != NULL) {
    SOrgTbInfo*      pMap = (SOrgTbInfo*)pIter;
    SOperatorParam*  pExchangeParam = NULL;
    SStreamTaskAddr* addr = taosHashGet(pVtbScan->newAddedVgInfo, &pMap->vgId, sizeof(pMap->vgId));
    if (addr != NULL) {
      code = buildExchangeOperatorParamForVScanEx(&pExchangeParam, 0, pMap, pTaskInfo->id.taskId, addr);
      QUERY_CHECK_CODE(code, line, _return);
      code = taosHashRemove(pVtbScan->newAddedVgInfo, &pMap->vgId, sizeof(pMap->vgId));
      QUERY_CHECK_CODE(code, line, _return);
    } else {
      code = buildExchangeOperatorParamForVScan(&pExchangeParam, 0, pMap);
      QUERY_CHECK_CODE(code, line, _return);
    }
    // NOTE(review): pExchangeParam is not released here if the hash removal or this push fails.
    QUERY_CHECK_NULL(taosArrayPush(((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pOpParamArray, &pExchangeParam), code, line, _return, terrno)
    pIter = taosHashIterate(pVtbScan->orgTbVgColMap, pIter);
  }

  SOperatorParam*  pTagScanParam = NULL;
  code = buildExchangeOperatorParamForVTagScan(&pTagScanParam, 0, vgId, uid);
  QUERY_CHECK_CODE(code, line, _return);
  ((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pTagScanOp = pTagScanParam;

_return:
  if (pIter != NULL) {
    // The loop was left through an error; release the iterator it still holds.
    taosHashCancelIterate(pVtbScan->orgTbVgColMap, pIter);
  }
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
2064

2065
/*
 * Produce the next result block of the virtual table scan.
 *
 * A temporary orgTbVgColMap is created for the duration of the call. While
 * the current table is the one that produced the previous block
 * (curTableIdx == lastTableIdx) the downstream operator is simply asked for
 * more data. Otherwise the column references of the current table are
 * processed, a new downstream parameter is built and the downstream operator
 * is restarted with it.
 *
 * When a block is returned, lastTableIdx is set to curTableIdx. When the
 * current table is exhausted, curTableIdx advances; the operator is marked
 * completed for a non-super table, or once all child tables were visited.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered; *pRes holds the
 * block, if any.
 */
int32_t virtualTableScanGetNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SVtbScanDynCtrlInfo*       pScan = (SVtbScanDynCtrlInfo*)&pCtrl->vtbScan;
  SOperatorInfo*             pDownstream = pOperator->pDownstream[0];

  pScan->orgTbVgColMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  QUERY_CHECK_NULL(pScan->orgTbVgColMap, code, line, _return, terrno)
  taosHashSetFreeFp(pScan->orgTbVgColMap, destroyOrgTbInfo);

  for (;;) {
    if (pScan->curTableIdx == pScan->lastTableIdx) {
      // Same table as last time: keep draining the downstream operator.
      code = pDownstream->fpSet.getNextFn(pDownstream, pRes);
      QUERY_CHECK_CODE(code, line, _return);
    } else {
      taosHashClear(pScan->orgTbVgColMap);

      SArray* pRefs = pScan->isSuperTable ? (SArray*)taosArrayGetP(pScan->childTableList, pScan->curTableIdx)
                                          : pCtrl->vtbScan.colRefInfo;
      QUERY_CHECK_NULL(pRefs, code, line, _return, terrno)

      tb_uid_t tbUid = 0;
      int32_t  tbVgId = 0;
      code = virtualTableScanProcessColRefInfo(pOperator, pRefs, &tbUid, &tbVgId);
      QUERY_CHECK_CODE(code, line, _return);

      code = virtualTableScanBuildDownStreamOpParam(pOperator, tbUid, tbVgId);
      QUERY_CHECK_CODE(code, line, _return);

      // reset downstream operator's status
      pDownstream->status = OP_NOT_OPENED;
      code = pDownstream->fpSet.getNextExtFn(pDownstream, pScan->vtbScanParam, pRes);
      QUERY_CHECK_CODE(code, line, _return);
    }

    if (*pRes) {
      // has result, still read data from this table.
      pScan->lastTableIdx = pScan->curTableIdx;
      break;
    }

    // no result, read next table.
    pScan->curTableIdx++;
    if (!pScan->isSuperTable || pScan->curTableIdx >= taosArrayGetSize(pScan->childTableList)) {
      setOperatorCompleted(pOperator);
      break;
    }
  }

_return:
  taosHashCleanup(pScan->orgTbVgColMap);
  pScan->orgTbVgColMap = NULL;
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
2131

2132
/*
 * Open the virtual table scan controller.
 *
 * Does nothing when the operator is already opened. Otherwise the child
 * table / column reference map is built (super table or normal/child table
 * variant) and the operator is marked opened on success. The open cost is
 * recorded in milliseconds when it has not been recorded yet.
 *
 * On failure the error is logged, stored in the task info and the function
 * leaves through T_LONG_JMP.
 */
int32_t vtbScanOpen(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  int64_t                    startUs = 0;
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SVtbScanDynCtrlInfo*       pScan = (SVtbScanDynCtrlInfo*)&pCtrl->vtbScan;

  if (OPTR_IS_OPENED(pOperator)) {
    return code;
  }

  if (pOperator->cost.openCost == 0) {
    startUs = taosGetTimestampUs();
  }

  code = pScan->isSuperTable ? buildVirtualSuperTableScanChildTableMap(pOperator)
                             : buildVirtualNormalChildTableScanChildTableMap(pOperator);
  if (code == TSDB_CODE_SUCCESS) {
    OPTR_SET_OPENED(pOperator);
  } else {
    line = __LINE__;
  }

  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - startUs) / 1000.0;
  }

  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }
  return code;
}
2168

2169
/*
 * Get-next callback for the virtual-table-scan mode.
 *
 * - Without a pending get-param: returns immediately if the operator is already
 *   done, otherwise resets the scan window to (INT64_MAX, INT64_MIN).
 * - With a pending get-param: revives a finished operator (status -> OP_OPENED),
 *   rewinds the table cursor, copies the window from the param and clears the
 *   pending-param pointer.
 *
 * Then runs the redeploy check (when needRedeploy is set), opens the operator,
 * and fetches the next block through virtualTableScanGetNext(). A super table
 * with an empty child-table list completes the operator without output.
 *
 * On failure the error is logged, stored in pTaskInfo->code, and control is
 * handed to T_LONG_JMP.
 */
int32_t vtbScanNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pCtrl->vtbScan;

  QRY_PARAM_CHECK(pRes);

  if (NULL == pOperator->pOperatorGetParam) {
    if (OP_EXEC_DONE == pOperator->status) {
      return code;
    }
    pVtbScan->window.skey = INT64_MAX;
    pVtbScan->window.ekey = INT64_MIN;
  } else {
    SDynQueryCtrlOperatorParam* pDynParam = (SDynQueryCtrlOperatorParam*)pOperator->pOperatorGetParam->value;

    if (OP_EXEC_DONE == pOperator->status) {
      pOperator->status = OP_OPENED;
    }
    pVtbScan->curTableIdx = 0;
    pVtbScan->lastTableIdx = -1;
    pVtbScan->window = pDynParam->window;
    pOperator->pOperatorGetParam = NULL;
  }

  if (pVtbScan->needRedeploy) {
    code = virtualTableScanCheckNeedRedeploy(pOperator);
    QUERY_CHECK_CODE(code, line, _return);
  }

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, line, _return);

  bool noChildTables = pVtbScan->isSuperTable && (0 == taosArrayGetSize(pVtbScan->childTableList));
  if (noChildTables) {
    setOperatorCompleted(pOperator);
    return code;
  }

  code = virtualTableScanGetNext(pOperator, pRes);
  QUERY_CHECK_CODE(code, line, _return);

  return code;

_return:
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }
  return code;
}
2218

2219
/*
 * Creates the lookup tables of a super-table join "prev" context.
 *
 * batchFetch == true : leftHash and rightHash, hashed as TSDB_DATA_TYPE_INT.
 * batchFetch == false: leftCache, rightCache and onceTable, hashed as
 *                      TSDB_DATA_TYPE_BIGINT.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno as soon as one table cannot be created
 * (tables created before the failure are left in pPrev).
 */
int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) {
  if (!batchFetch) {
    pPrev->leftCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pPrev->leftCache == NULL) {
      return terrno;
    }

    pPrev->rightCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pPrev->rightCache == NULL) {
      return terrno;
    }

    pPrev->onceTable = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pPrev->onceTable == NULL) {
      return terrno;
    }

    return TSDB_CODE_SUCCESS;
  }

  pPrev->leftHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  if (pPrev->leftHash == NULL) {
    return terrno;
  }

  pPrev->rightHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  if (pPrev->rightHash == NULL) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
2246

2247
/*
 * Refreshes pVtbScan->dynTbUid from the stream runtime info.
 *
 * Walks funcInfo.pStreamPartColVals and takes the uid of the first entry that
 * is flagged isTbname and whose uid differs from the current dynTbUid.
 * Does nothing when pStreamRuntimeInfo is NULL or no such entry exists.
 */
static void updateDynTbUidIfNeeded(SVtbScanDynCtrlInfo* pVtbScan, SStreamRuntimeInfo* pStreamRuntimeInfo) {
  if (NULL == pStreamRuntimeInfo) {
    return;
  }

  SArray* pPartVals = pStreamRuntimeInfo->funcInfo.pStreamPartColVals;
  for (int32_t idx = 0; idx < taosArrayGetSize(pPartVals); ++idx) {
    SStreamGroupValue* pEntry = taosArrayGet(pPartVals, idx);
    if (NULL == pEntry || !pEntry->isTbname || pEntry->uid == pVtbScan->dynTbUid) {
      continue;
    }

    qTrace("dynQueryCtrl dyn tb uid:%" PRIu64 " reset to:%" PRIu64, pVtbScan->dynTbUid, pEntry->uid);

    // first differing tbname entry wins; stop scanning
    pVtbScan->dynTbUid = pEntry->uid;
    return;
  }
}
2263

2264
static int32_t initVtbScanInfo(SDynQueryCtrlOperatorInfo* pInfo, SMsgCb* pMsgCb,
45,201✔
2265
                               SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) {
2266
  int32_t      code = TSDB_CODE_SUCCESS;
45,201✔
2267
  int32_t      line = 0;
45,201✔
2268

2269
  code = tsem_init(&pInfo->vtbScan.ready, 0, 0);
45,201✔
2270
  QUERY_CHECK_CODE(code, line, _return);
45,201✔
2271

2272
  pInfo->vtbScan.scanAllCols = pPhyciNode->vtbScan.scanAllCols;
45,201✔
2273
  pInfo->vtbScan.isSuperTable = pPhyciNode->vtbScan.isSuperTable;
45,201✔
2274
  pInfo->vtbScan.rversion = pPhyciNode->vtbScan.rversion;
45,201✔
2275
  pInfo->vtbScan.uid = pPhyciNode->vtbScan.uid;
45,201✔
2276
  pInfo->vtbScan.suid = pPhyciNode->vtbScan.suid;
45,201✔
2277
  pInfo->vtbScan.epSet = pPhyciNode->vtbScan.mgmtEpSet;
45,201✔
2278
  pInfo->vtbScan.acctId = pPhyciNode->vtbScan.accountId;
45,201✔
2279
  pInfo->vtbScan.needRedeploy = false;
45,201✔
2280
  pInfo->vtbScan.pMsgCb = pMsgCb;
45,201✔
2281
  pInfo->vtbScan.curTableIdx = 0;
45,201✔
2282
  pInfo->vtbScan.lastTableIdx = -1;
45,201✔
2283
  pInfo->vtbScan.dynTbUid = 0;
45,201✔
2284
  pInfo->vtbScan.dbName = taosStrdup(pPhyciNode->vtbScan.dbName);
45,201✔
2285
  pInfo->vtbScan.tbName = taosStrdup(pPhyciNode->vtbScan.tbName);
45,201✔
2286
  QUERY_CHECK_NULL(pInfo->vtbScan.dbName, code, line, _return, terrno)
45,201✔
2287
  QUERY_CHECK_NULL(pInfo->vtbScan.tbName, code, line, _return, terrno)
45,201✔
2288
  pInfo->vtbScan.existOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
45,201✔
2289
  QUERY_CHECK_NULL(pInfo->vtbScan.existOrgTbVg, code, line, _return, terrno)
45,201✔
2290
  for (int32_t i = 0; i < LIST_LENGTH(pPhyciNode->vtbScan.pOrgVgIds); ++i) {
99,451✔
2291
    SValueNode* valueNode = (SValueNode*)nodesListGetNode(pPhyciNode->vtbScan.pOrgVgIds, i);
54,250✔
2292
    int32_t vgId = (int32_t)valueNode->datum.i;
54,250✔
2293
    code = taosHashPut(pInfo->vtbScan.existOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
54,250✔
2294
    QUERY_CHECK_CODE(code, line, _return);
54,250✔
2295
  }
2296

2297
  if (pPhyciNode->dynTbname && pTaskInfo) {
45,201✔
2298
    updateDynTbUidIfNeeded(&pInfo->vtbScan, pTaskInfo->pStreamRuntimeInfo);
×
2299
  }
2300

2301
  pInfo->vtbScan.readColList = taosArrayInit(LIST_LENGTH(pPhyciNode->vtbScan.pScanCols), sizeof(col_id_t));
45,201✔
2302
  QUERY_CHECK_NULL(pInfo->vtbScan.readColList, code, line, _return, terrno)
45,201✔
2303

2304
  for (int32_t i = 0; i < LIST_LENGTH(pPhyciNode->vtbScan.pScanCols); ++i) {
284,588✔
2305
    SColumnNode* pNode = (SColumnNode*)nodesListGetNode(pPhyciNode->vtbScan.pScanCols, i);
239,387✔
2306
    QUERY_CHECK_NULL(pNode, code, line, _return, terrno)
239,387✔
2307
    QUERY_CHECK_NULL(taosArrayPush(pInfo->vtbScan.readColList, &pNode->colId), code, line, _return, terrno)
478,774✔
2308
  }
2309

2310
  pInfo->vtbScan.childTableList = taosArrayInit(10, POINTER_BYTES);
45,201✔
2311
  QUERY_CHECK_NULL(pInfo->vtbScan.childTableList, code, line, _return, terrno)
45,201✔
2312

2313
  pInfo->vtbScan.dbVgInfoMap = taosHashInit(taosArrayGetSize(pInfo->vtbScan.childTableList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
45,201✔
2314
  QUERY_CHECK_NULL(pInfo->vtbScan.dbVgInfoMap, code, line, _return, terrno)
45,201✔
2315

2316
  return code;
45,201✔
2317
_return:
×
2318
  // no need to destroy array and hashmap allocated in this function,
2319
  // since the operator's destroy function will take care of it
2320
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
2321
  return code;
×
2322
}
2323

2324
static int32_t initVtbWindowInfo(SDynQueryCtrlOperatorInfo* pInfo, SDynQueryCtrlPhysiNode* pPhyciNode,
×
2325
                                 SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) {
2326
  int32_t              code = TSDB_CODE_SUCCESS;
×
2327
  int32_t              line = 0;
×
2328
  SDataBlockDescNode*  pDescNode = pPhyciNode->node.pOutputDataBlockDesc;
×
2329

2330
  pInfo->vtbWindow.wstartSlotId = pPhyciNode->vtbWindow.wstartSlotId;
×
2331
  pInfo->vtbWindow.wendSlotId = pPhyciNode->vtbWindow.wendSlotId;
×
2332
  pInfo->vtbWindow.wdurationSlotId = pPhyciNode->vtbWindow.wdurationSlotId;
×
2333
  pInfo->vtbWindow.pTargets = pPhyciNode->vtbWindow.pTargets;
×
2334
  pInfo->vtbWindow.isVstb = pPhyciNode->vtbWindow.isVstb;
×
2335
  pInfo->vtbWindow.singleWinMode = pPhyciNode->vtbWindow.singleWinMode;
×
2336
  pInfo->vtbWindow.extendOption = pPhyciNode->vtbWindow.extendOption;
×
2337

2338
  pInfo->vtbWindow.pRes = createDataBlockFromDescNode(pDescNode);
×
2339
  QUERY_CHECK_NULL(pInfo->vtbWindow.pRes, code, line, _return, terrno)
×
2340

2341
  pInfo->vtbWindow.pWins = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
×
2342
  QUERY_CHECK_NULL(pInfo->vtbWindow.pWins, code, line, _return, terrno)
×
2343

2344
  pInfo->vtbWindow.outputWstartSlotId = -1;
×
2345
  pInfo->vtbWindow.outputWendSlotId = -1;
×
2346
  pInfo->vtbWindow.outputWdurationSlotId = -1;
×
2347
  pInfo->vtbWindow.curWinBatchIdx = 0;
×
2348

2349
  initResultSizeInfo(&pOperator->resultInfo, 1);
×
2350
  code = blockDataEnsureCapacity(pInfo->vtbWindow.pRes, pOperator->resultInfo.capacity);
×
2351
  QUERY_CHECK_CODE(code, line, _return);
×
2352

2353
  return code;
×
2354
_return:
×
2355
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
2356
  return code;
×
2357
}
2358

2359
/*
 * Exposes the raw timestamp column of a block.
 *
 * When the block has columns and its data is loaded, *ppTsCols is pointed at
 * the pData buffer of column `slotId`; otherwise *ppTsCols is left untouched.
 * If the block window is still (0, 0) while the first timestamp is non-zero,
 * the window is recomputed with blockDataUpdateTsWindow().
 *
 * Returns TSDB_CODE_SUCCESS, terrno when the slot cannot be fetched, or the
 * error of blockDataUpdateTsWindow().
 */
static int32_t extractTsCol(SSDataBlock* pBlock, int32_t slotId, TSKEY** ppTsCols) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (pBlock->pDataBlock != NULL && pBlock->info.dataLoad) {
    SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, slotId);
    QUERY_CHECK_NULL(pColDataInfo, code, lino, _return, terrno)

    *ppTsCols = (int64_t*)pColDataInfo->pData;

    // only peek at the first timestamp when the column really holds one
    bool hasFirstTs = (*ppTsCols != NULL) && (pBlock->info.rows > 0);
    if (hasFirstTs && (*ppTsCols)[0] != 0 && (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0)) {
      code = blockDataUpdateTsWindow(pBlock, slotId);
      QUERY_CHECK_CODE(code, lino, _return);
    }
  }

  return code;
_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  return code;
}
2380

2381
/*
 * Builds the operator param handed to one external-window downstream.
 *
 * The param carries a private copy of the window list (pWins) and a single
 * child: an exchange param covering [first window skey, last window ekey].
 *
 * @param pInfo  dyn-query-ctrl operator info (not used by this function)
 * @param ppRes  out: the new param, or NULL on failure
 * @param pWins  array of SExtWinTimeWindow; must not be empty
 * @param idx    stored as downstreamIdx of the new param
 * @return TSDB_CODE_SUCCESS or an error code; on error everything allocated
 *         here is released and *ppRes is NULL
 */
static int32_t buildExternalWindowOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins, int32_t idx) {
  int32_t                       code = TSDB_CODE_SUCCESS;
  int32_t                       lino = 0;
  SExternalWindowOperatorParam* pExtWinOp = NULL;
  SExtWinTimeWindow*            firstWin = NULL;
  SExtWinTimeWindow*            lastWin = NULL;
  SOperatorParam*               pExchangeOperator = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
  // the block is not zeroed; the cleanup path inspects pChildren, so it must
  // hold a defined value before any later check can jump to _return
  (*ppRes)->pChildren = NULL;

  pExtWinOp = taosMemoryMalloc(sizeof(SExternalWindowOperatorParam));
  QUERY_CHECK_NULL(pExtWinOp, code, lino, _return, terrno)

  pExtWinOp->ExtWins = taosArrayDup(pWins, NULL);
  QUERY_CHECK_NULL(pExtWinOp->ExtWins, code, lino, _return, terrno)

  // an empty window list yields NULL here instead of a usable range
  firstWin = (SExtWinTimeWindow *)taosArrayGet(pWins, 0);
  QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
  lastWin = (SExtWinTimeWindow *)taosArrayGet(pWins, taosArrayGetSize(pWins) - 1);
  QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)

  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)

  code = buildExchangeOperatorParamForExternalWindow(&pExchangeOperator, 0, firstWin->tw.skey, lastWin->tw.ekey);
  QUERY_CHECK_CODE(code, lino, _return);
  // NOTE(review): if this push fails, pExchangeOperator is not released here;
  // its internals are built elsewhere — confirm the proper destructor and add it.
  QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pExchangeOperator), code, lino, _return, terrno)

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW;
  (*ppRes)->downstreamIdx = idx;
  (*ppRes)->value = pExtWinOp;
  (*ppRes)->reUse = false;

  return code;
_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pExtWinOp) {
    if (pExtWinOp->ExtWins) {
      taosArrayDestroy(pExtWinOp->ExtWins);
    }
    taosMemoryFree(pExtWinOp);
  }
  if (*ppRes) {
    if ((*ppRes)->pChildren) {
      taosArrayDestroy((*ppRes)->pChildren);
    }
    taosMemoryFree(*ppRes);
    *ppRes = NULL;
  }
  return code;
}
2429

2430
/*
 * Releases one external-window child param created for the merge param:
 * its window copy, its value struct, its pChildren array and the param itself.
 * NOTE(review): params stored *inside* pChildren (the exchange param) are not
 * released here — confirm the proper destructor for them.
 */
static void destroyMergeChildExtWinParam(SOperatorParam* pChildParam) {
  if (NULL == pChildParam) {
    return;
  }

  SExternalWindowOperatorParam* pExtWinOp = (SExternalWindowOperatorParam*)pChildParam->value;
  if (pExtWinOp) {
    if (pExtWinOp->ExtWins) {
      taosArrayDestroy(pExtWinOp->ExtWins);
    }
    taosMemoryFree(pExtWinOp);
  }
  if (pChildParam->pChildren) {
    taosArrayDestroy(pChildParam->pChildren);
  }
  taosMemoryFree(pChildParam);
}

/*
 * Builds the operator param for the merge downstream: one external-window
 * child param per merge input, each covering the same window batch.
 *
 * @param pInfo            dyn-query-ctrl operator info (forwarded to the child builder)
 * @param ppRes            out: the new merge param, or NULL on failure
 * @param pWins            array of SExtWinTimeWindow for the current batch
 * @param numOfDownstream  number of child params to create
 * @param numOfWins        stored as winNum of the merge param
 * @return TSDB_CODE_SUCCESS or an error code; on error everything allocated
 *         here is released and *ppRes is NULL
 */
static int32_t buildMergeOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins,
                                       int32_t numOfDownstream, int32_t numOfWins) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SMergeOperatorParam*      pMergeOp = NULL;
  SOperatorParam*           pPendingChild = NULL;  // built, but not yet owned by pChildren

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)

  (*ppRes)->pChildren = taosArrayInit(numOfDownstream, POINTER_BYTES);
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)

  pMergeOp = taosMemoryMalloc(sizeof(SMergeOperatorParam));
  QUERY_CHECK_NULL(pMergeOp, code, lino, _return, terrno)

  pMergeOp->winNum = numOfWins;

  for (int32_t i = 0; i < numOfDownstream; i++) {
    code = buildExternalWindowOperatorParam(pInfo, &pPendingChild, pWins, i);
    QUERY_CHECK_CODE(code, lino, _return);
    QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pPendingChild), code, lino, _return, terrno)
    pPendingChild = NULL;  // ownership moved into pChildren
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->value = pMergeOp;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;
_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  // child that was built but never made it into the array
  destroyMergeChildExtWinParam(pPendingChild);
  if (pMergeOp) {
    taosMemoryFree(pMergeOp);
  }
  if (*ppRes) {
    if ((*ppRes)->pChildren) {
      for (int32_t i = 0; i < taosArrayGetSize((*ppRes)->pChildren); i++) {
        // the array stores pointers, so fetch the stored pointer itself
        // (taosArrayGet would return the address of the slot)
        destroyMergeChildExtWinParam((SOperatorParam*)taosArrayGetP((*ppRes)->pChildren, i));
      }
      taosArrayDestroy((*ppRes)->pChildren);
    }
    taosMemoryFree(*ppRes);
    *ppRes = NULL;
  }
  return code;
}
2487

2488
/*
 * Open callback for the virtual-table-window mode.
 *
 * Drains downstream 0 and converts every returned block into window batches
 * stored in pInfo->pWins (array of SArray* of SExtWinTimeWindow):
 *   - singleWinMode: one batch per row, holding exactly one window;
 *   - otherwise    : one batch per block, holding one window per row.
 * Each window is [wstart, wend + 1) taken from the wstart/wend slots. While
 * scanning the first block, the output positions of the wstart/wend/wduration
 * targets are resolved. Finally the first window's start / last window's end
 * are widened according to extendOption.
 *
 * When the downstream produces no block at all, the window list stays empty
 * and the operator is simply marked opened.
 *
 * On failure the error is logged, stored in pTaskInfo->code, and control is
 * handed to T_LONG_JMP.
 */
int32_t vtbWindowOpen(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    lino = 0;
  SDynQueryCtrlOperatorInfo* pDynInfo = pOperator->info;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SVtbWindowDynCtrlInfo*     pInfo = &pDynInfo->vtbWindow;
  int64_t                    st = 0;
  SArray*                    pWin = NULL;  // batch under construction, not yet owned by pInfo->pWins

  if (OPTR_IS_OPENED(pOperator)) {
    return code;
  }

  if (pOperator->cost.openCost == 0) {
    st = taosGetTimestampUs();
  }

  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      break;
    }

    // resolve where _wstart/_wend/_wduration land in the output, once
    if (pInfo->outputWendSlotId == -1 && pInfo->outputWstartSlotId == -1 && pInfo->outputWdurationSlotId == -1) {
      for (int32_t i = 0; i < LIST_LENGTH(pInfo->pTargets); ++i) {
        STargetNode* pNode = (STargetNode*)nodesListGetNode(pInfo->pTargets, i);
        if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode->pExpr)->dataBlockId == pBlock->info.id.blockId) {
          if (((SColumnNode*)pNode->pExpr)->slotId == pInfo->wstartSlotId) {
            pInfo->outputWstartSlotId = i;
          } else if (((SColumnNode*)pNode->pExpr)->slotId == pInfo->wendSlotId) {
            pInfo->outputWendSlotId = i;
          } else if (((SColumnNode*)pNode->pExpr)->slotId == pInfo->wdurationSlotId) {
            pInfo->outputWdurationSlotId = i;
          }
        }
      }
    }

    TSKEY* wstartCol = NULL;
    TSKEY* wendCol = NULL;

    // NOTE(review): extractTsCol leaves the pointer untouched for blocks without
    // loaded data; the loops below assume both columns were populated — confirm.
    code = extractTsCol(pBlock, pInfo->wstartSlotId, &wstartCol);
    QUERY_CHECK_CODE(code, lino, _return);
    code = extractTsCol(pBlock, pInfo->wendSlotId, &wendCol);
    QUERY_CHECK_CODE(code, lino, _return);

    if (pInfo->singleWinMode) {
      for (int32_t i = 0; i < pBlock->info.rows; i++) {
        // each batch holds a single window, so capacity 1 is enough
        pWin = taosArrayInit(1, sizeof(SExtWinTimeWindow));
        QUERY_CHECK_NULL(pWin, code, lino, _return, terrno)

        QUERY_CHECK_NULL(taosArrayReserve(pWin, 1), code, lino, _return, terrno);

        SExtWinTimeWindow* pWindow = taosArrayGet(pWin, 0);
        QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
        pWindow->tw.skey = wstartCol[i];
        pWindow->tw.ekey = wendCol[i] + 1;
        pWindow->winOutIdx = -1;

        QUERY_CHECK_NULL(taosArrayPush(pInfo->pWins, &pWin), code, lino, _return, terrno);
        pWin = NULL;  // ownership moved into pInfo->pWins
      }
    } else {
      pWin = taosArrayInit(pBlock->info.rows, sizeof(SExtWinTimeWindow));
      QUERY_CHECK_NULL(pWin, code, lino, _return, terrno)

      QUERY_CHECK_NULL(taosArrayReserve(pWin, pBlock->info.rows), code, lino, _return, terrno);

      for (int32_t i = 0; i < pBlock->info.rows; i++) {
        SExtWinTimeWindow* pWindow = taosArrayGet(pWin, i);
        QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
        pWindow->tw.skey = wstartCol[i];
        pWindow->tw.ekey = wendCol[i] + 1;
        pWindow->winOutIdx = -1;
      }

      QUERY_CHECK_NULL(taosArrayPush(pInfo->pWins, &pWin), code, lino, _return, terrno);
      pWin = NULL;  // ownership moved into pInfo->pWins
    }
  }

  // nothing to widen when the downstream produced no windows at all
  if (taosArrayGetSize(pInfo->pWins) > 0) {
    // handle first window's start key and last window's end key
    SArray* firstBatch = (SArray*)taosArrayGetP(pInfo->pWins, 0);
    SArray* lastBatch = (SArray*)taosArrayGetP(pInfo->pWins, taosArrayGetSize(pInfo->pWins) - 1);

    QUERY_CHECK_NULL(firstBatch, code, lino, _return, terrno)
    QUERY_CHECK_NULL(lastBatch, code, lino, _return, terrno)

    SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(firstBatch, 0);
    SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(lastBatch);

    QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
    QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)

    if (pInfo->extendOption == STATE_WIN_EXTEND_OPTION_BACKWARD) {
      lastWin->tw.ekey = INT64_MAX;
    }
    if (pInfo->extendOption == STATE_WIN_EXTEND_OPTION_FORWARD) {
      firstWin->tw.skey = INT64_MIN;
    }
  }

  OPTR_SET_OPENED(pOperator);

  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
  }

_return:
  if (code != TSDB_CODE_SUCCESS) {
    // release a batch that was allocated but never stored; the jump below
    // would otherwise leak it
    if (pWin) {
      taosArrayDestroy(pWin);
    }
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
2600

2601
/*
 * Builds a dyn-query-ctrl operator param that carries a time window.
 *
 * @param ppRes          out: the new param, or NULL on failure
 * @param downstreamIdx  currently unused; the param's downstreamIdx is always 0
 * @param skey           window start stored in the param
 * @param ekey           window end stored in the param
 * @return TSDB_CODE_SUCCESS, or terrno of the failed allocation (everything
 *         allocated here is released in that case)
 */
static int32_t buildDynQueryCtrlOperatorParamForExternalWindow(SOperatorParam** ppRes, int32_t downstreamIdx, int64_t skey, int64_t ekey) {
  int32_t                     code = TSDB_CODE_SUCCESS;
  int32_t                     lino = 0;
  SDynQueryCtrlOperatorParam* pCtrlParam = NULL;
  SOperatorParam*             pParam = NULL;

  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
  *ppRes = pParam;
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno)

  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno)

  pCtrlParam = taosMemoryMalloc(sizeof(SDynQueryCtrlOperatorParam));
  QUERY_CHECK_NULL(pCtrlParam, code, lino, _return, terrno);

  pCtrlParam->window.ekey = ekey;
  pCtrlParam->window.skey = skey;

  pParam->value = pCtrlParam;
  pParam->reUse = false;
  pParam->downstreamIdx = 0;
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL;

  return code;

_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pCtrlParam != NULL) {
    taosMemoryFree(pCtrlParam);
  }
  if (pParam != NULL) {
    if (pParam->pChildren != NULL) {
      taosArrayDestroy(pParam->pChildren);
    }
    taosMemoryFree(pParam);
    *ppRes = NULL;
  }
  return code;
}
2638

2639
/*
 * Releases a dyn-query-ctrl child param created for an external-window param:
 * its value struct, its pChildren array and the param itself.
 */
static void destroyExtWinDynCtrlChildParam(SOperatorParam* pChildParam) {
  if (NULL == pChildParam) {
    return;
  }

  if (pChildParam->value) {
    taosMemoryFree(pChildParam->value);
  }
  if (pChildParam->pChildren) {
    taosArrayDestroy(pChildParam->pChildren);
  }
  taosMemoryFree(pChildParam);
}

/*
 * Builds the operator param handed to an external-window downstream whose
 * child is a dyn-query-ctrl operator.
 *
 * The param carries a private copy of the window list (pWins) and a single
 * child: a dyn-query-ctrl param covering [first window skey, last window ekey].
 *
 * @param pInfo  dyn-query-ctrl operator info (not used by this function)
 * @param ppRes  out: the new param, or NULL on failure
 * @param pWins  array of SExtWinTimeWindow; must not be empty
 * @param idx    stored as downstreamIdx of the new param
 * @return TSDB_CODE_SUCCESS or an error code; on error everything allocated
 *         here is released and *ppRes is NULL
 */
static int32_t buildExternalWindowOperatorParamEx(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins, int32_t idx) {
  int32_t                       code = TSDB_CODE_SUCCESS;
  int32_t                       lino = 0;
  SExternalWindowOperatorParam* pExtWinOp = NULL;
  SExtWinTimeWindow*            firstWin = NULL;
  SExtWinTimeWindow*            lastWin = NULL;
  SOperatorParam*               pDynQueryCtrlParam = NULL;  // built, but not yet owned by pChildren

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
  // the block is not zeroed; the cleanup path inspects pChildren, so it must
  // hold a defined value before any later check can jump to _return
  (*ppRes)->pChildren = NULL;

  pExtWinOp = taosMemoryMalloc(sizeof(SExternalWindowOperatorParam));
  QUERY_CHECK_NULL(pExtWinOp, code, lino, _return, terrno)

  pExtWinOp->ExtWins = taosArrayDup(pWins, NULL);
  QUERY_CHECK_NULL(pExtWinOp->ExtWins, code, lino, _return, terrno)

  // an empty window list yields NULL here instead of a usable range
  firstWin = (SExtWinTimeWindow *)taosArrayGet(pWins, 0);
  QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
  lastWin = (SExtWinTimeWindow *)taosArrayGet(pWins, taosArrayGetSize(pWins) - 1);
  QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)

  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)

  code = buildDynQueryCtrlOperatorParamForExternalWindow(&pDynQueryCtrlParam, 0, firstWin->tw.skey, lastWin->tw.ekey);
  QUERY_CHECK_CODE(code, lino, _return);
  QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pDynQueryCtrlParam), code, lino, _return, terrno)
  pDynQueryCtrlParam = NULL;  // ownership moved into pChildren

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW;
  (*ppRes)->downstreamIdx = idx;
  (*ppRes)->value = pExtWinOp;
  (*ppRes)->reUse = false;

  return code;
_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  // child that was built but never made it into the array
  destroyExtWinDynCtrlChildParam(pDynQueryCtrlParam);
  if (pExtWinOp) {
    if (pExtWinOp->ExtWins) {
      taosArrayDestroy(pExtWinOp->ExtWins);
    }
    taosMemoryFree(pExtWinOp);
  }
  if (*ppRes) {
    if ((*ppRes)->pChildren) {
      for (int32_t i = 0; i < taosArrayGetSize((*ppRes)->pChildren); i++) {
        // the array stores pointers, so fetch the stored pointer itself
        // (taosArrayGet would return the address of the slot)
        destroyExtWinDynCtrlChildParam((SOperatorParam*)taosArrayGetP((*ppRes)->pChildren, i));
      }
      taosArrayDestroy((*ppRes)->pChildren);
    }
    taosMemoryFree(*ppRes);
    *ppRes = NULL;
  }
  return code;
}
2697

2698
/*
 * Get-next callback for the virtual-table-window mode.
 *
 * Opens the operator if needed, then processes the window batch at
 * curWinBatchIdx: builds the param for downstream 1 (external-window param
 * when isVstb, merge param otherwise), fetches one block from it, copies the
 * target columns into the result block, and fills the _wstart/_wend/_wduration
 * output columns from the batch's windows. For the first batch the first
 * window's start, and for the last batch the last window's end, are taken
 * from the fetched block's time window.
 *
 * *ppRes is set to NULL once all batches are consumed, otherwise to the
 * operator's result block holding one row per window of the batch.
 *
 * On failure the error is logged, stored in pTaskInfo->code, and control is
 * handed to T_LONG_JMP.
 */
int32_t vtbWindowNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    lino = 0;
  SDynQueryCtrlOperatorInfo* pDynInfo = pOperator->info;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SVtbWindowDynCtrlInfo*     pInfo = &pDynInfo->vtbWindow;
  SSDataBlock*               pRes = pInfo->pRes;
  SOperatorInfo*             pDownOp = NULL;
  SOperatorParam*            pDownParam = NULL;
  SSDataBlock*               pDownBlock = NULL;
  int32_t                    numOfWins = 0;

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, lino, _return);

  if (pInfo->curWinBatchIdx >= taosArrayGetSize(pInfo->pWins)) {
    *ppRes = NULL;
    return code;
  }

  SArray* pWinArray = (SArray*)taosArrayGetP(pInfo->pWins, pInfo->curWinBatchIdx);
  QUERY_CHECK_NULL(pWinArray, code, lino, _return, terrno)

  numOfWins = (int32_t)taosArrayGetSize(pWinArray);

  // both modes read from downstream 1; only the param builder differs
  pDownOp = pOperator->pDownstream[1];
  if (pInfo->isVstb) {
    code = buildExternalWindowOperatorParamEx(pDynInfo, &pDownParam, pWinArray, pDownOp->numOfDownstream);
  } else {
    code = buildMergeOperatorParam(pDynInfo, &pDownParam, pWinArray, pDownOp->numOfDownstream, numOfWins);
  }
  QUERY_CHECK_CODE(code, lino, _return);

  code = pDownOp->fpSet.getNextExtFn(pDownOp, pDownParam, &pDownBlock);
  QUERY_CHECK_CODE(code, lino, _return);

  blockDataCleanup(pRes);
  code = blockDataEnsureCapacity(pRes, numOfWins);
  QUERY_CHECK_CODE(code, lino, _return);

  if (pDownBlock) {
    code = copyColumnsValue(pInfo->pTargets, pDownBlock->info.id.blockId, pRes, pDownBlock, numOfWins);
    QUERY_CHECK_CODE(code, lino, _return);

    if (pInfo->curWinBatchIdx == 0) {
      // first batch (== pWinArray), get _wstart from the downstream block
      SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(pWinArray, 0);
      QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)

      firstWin->tw.skey = pDownBlock->info.window.skey;
    }
    if (pInfo->curWinBatchIdx == taosArrayGetSize(pInfo->pWins) - 1) {
      // last batch (== pWinArray), get _wend from the downstream block
      SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(pWinArray);
      QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)

      // windows are stored with an exclusive end, hence the +1
      lastWin->tw.ekey = pDownBlock->info.window.ekey + 1;
    }
  }

  if (pInfo->outputWstartSlotId != -1) {
    SColumnInfoData* pWstartCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWstartSlotId);
    QUERY_CHECK_NULL(pWstartCol, code, lino, _return, terrno)

    for (int32_t i = 0; i < numOfWins; i++) {
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
      code = colDataSetVal(pWstartCol, i, (const char*)&pWindow->tw.skey, false);
      QUERY_CHECK_CODE(code, lino, _return);
    }
  }
  if (pInfo->outputWendSlotId != -1) {
    SColumnInfoData* pWendCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWendSlotId);
    QUERY_CHECK_NULL(pWendCol, code, lino, _return, terrno)

    for (int32_t i = 0; i < numOfWins; i++) {
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
      // stored end is exclusive; the output column is the inclusive end
      TSKEY ekey = pWindow->tw.ekey - 1;
      code = colDataSetVal(pWendCol, i, (const char*)&ekey, false);
      QUERY_CHECK_CODE(code, lino, _return);
    }
  }
  if (pInfo->outputWdurationSlotId != -1) {
    SColumnInfoData* pWdurationCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWdurationSlotId);
    QUERY_CHECK_NULL(pWdurationCol, code, lino, _return, terrno)

    for (int32_t i = 0; i < numOfWins; i++) {
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
      int64_t duration = pWindow->tw.ekey - 1 - pWindow->tw.skey;
      code = colDataSetVal(pWdurationCol, i, (const char*)&duration, false);
      QUERY_CHECK_CODE(code, lino, _return);
    }
  }

  pRes->info.rows = numOfWins;
  *ppRes = pRes;
  pInfo->curWinBatchIdx++;

  return code;

_return:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
2842

2843
/**
 * Reset the runtime state of a dynamic-query-control operator so that it can
 * be opened again.
 *
 * The operator is marked OP_NOT_OPENED and the state that belongs to the
 * operator's query type (pDyn->qType) is released or rewound:
 *   - DYN_QTYPE_STB_HASH:   exec counters are zeroed, the stable-join control
 *                           info is destroyed and its table hashes re-created,
 *                           and the prev/post join contexts are cleared.
 *   - DYN_QTYPE_VTB_SCAN:   cached maps, response and column-ref info are
 *                           released and the table cursors are rewound.
 *   - DYN_QTYPE_VTB_WINDOW: the result block and window array are dropped and
 *                           the output slot ids / batch index are reset.
 *
 * @param pOper operator whose `info` is an SDynQueryCtrlOperatorInfo.
 * @return 0 on success, or the error code returned by
 *         initSeqStbJoinTableHash(). An unsupported qType is only logged and
 *         still yields 0.
 */
static int32_t resetDynQueryCtrlOperState(SOperatorInfo* pOper) {
  SDynQueryCtrlOperatorInfo*    pDyn = pOper->info;
  SDynQueryCtrlPhysiNode const* pPhyciNode = pOper->pPhyNode;
  SExecTaskInfo*                pTaskInfo = pOper->pTaskInfo;

  pOper->status = OP_NOT_OPENED;

  switch (pDyn->qType) {
    case DYN_QTYPE_STB_HASH: {
      SStbJoinDynCtrlInfo* pStbJoin = &pDyn->stbJoin;

      pStbJoin->execInfo = (SDynQueryCtrlExecInfo){0};
      destroyStbJoinDynCtrlInfo(pStbJoin);

      int32_t code = initSeqStbJoinTableHash(&pStbJoin->ctx.prev, pStbJoin->basic.batchFetch);

      // Clear the list bookkeeping whether or not the re-init succeeded.
      // Previously this only happened on success, so a failed re-init returned
      // with pListHead/pListTail still referring to the list handed to
      // destroyStbJoinDynCtrlInfo() above, and a later teardown could release
      // it a second time.
      // NOTE(review): assumes destroyStbJoinDynCtrlInfo() frees that list - confirm.
      pStbJoin->ctx.prev.pListHead = NULL;
      pStbJoin->ctx.prev.joinBuild = false;
      pStbJoin->ctx.prev.pListTail = NULL;
      pStbJoin->ctx.prev.tableNum = 0;

      if (TSDB_CODE_SUCCESS != code) {
        qError("initSeqStbJoinTableHash failed since %s", tstrerror(code));
        return code;
      }

      pStbJoin->ctx.post = (SStbJoinPostJoinCtx){0};
      break;
    }
    case DYN_QTYPE_VTB_SCAN: {
      SVtbScanDynCtrlInfo* pVtbScan = &pDyn->vtbScan;

      if (pVtbScan->orgTbVgColMap) {
        taosHashSetFreeFp(pVtbScan->orgTbVgColMap, destroyOrgTbInfo);
        taosHashCleanup(pVtbScan->orgTbVgColMap);
        pVtbScan->orgTbVgColMap = NULL;
      }
      if (pVtbScan->pRsp) {
        tFreeSUsedbRsp(pVtbScan->pRsp);
        taosMemoryFreeClear(pVtbScan->pRsp);
      }
      if (pVtbScan->colRefInfo) {
        taosArrayDestroyEx(pVtbScan->colRefInfo, destroyColRefInfo);
        pVtbScan->colRefInfo = NULL;
      }
      if (pVtbScan->childTableMap) {
        taosHashCleanup(pVtbScan->childTableMap);
        pVtbScan->childTableMap = NULL;
      }
      if (pVtbScan->childTableList) {
        // Cleared rather than destroyed: the pointer is kept for the next run.
        taosArrayClearEx(pVtbScan->childTableList, destroyColRefArray);
      }
      if (pPhyciNode->dynTbname && pTaskInfo) {
        updateDynTbUidIfNeeded(pVtbScan, pTaskInfo->pStreamRuntimeInfo);
      }
      pVtbScan->curTableIdx = 0;
      pVtbScan->lastTableIdx = -1;
      break;
    }
    case DYN_QTYPE_VTB_WINDOW: {
      SVtbWindowDynCtrlInfo* pVtbWindow = &pDyn->vtbWindow;

      if (pVtbWindow->pRes) {
        blockDataDestroy(pVtbWindow->pRes);
        pVtbWindow->pRes = NULL;
      }
      if (pVtbWindow->pWins) {
        taosArrayDestroyEx(pVtbWindow->pWins, destroyWinArray);
        pVtbWindow->pWins = NULL;
      }
      pVtbWindow->outputWdurationSlotId = -1;
      pVtbWindow->outputWendSlotId = -1;
      pVtbWindow->outputWstartSlotId = -1;
      pVtbWindow->curWinBatchIdx = 0;
      break;
    }
    default:
      // NOTE(review): an unknown type is logged but reported as success, unlike
      // createDynQueryCtrlOperatorInfo() which fails with TSDB_CODE_INVALID_PARA.
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
      break;
  }
  return 0;
}
2921

2922
int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
540,075✔
2923
                                       SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo,
2924
                                       SMsgCb* pMsgCb, SOperatorInfo** pOptrInfo) {
2925
  QRY_PARAM_CHECK(pOptrInfo);
540,075✔
2926

2927
  int32_t                    code = TSDB_CODE_SUCCESS;
540,075✔
2928
  int32_t                    line = 0;
540,075✔
2929
  __optr_fn_t                nextFp = NULL;
540,075✔
2930
  __optr_open_fn_t           openFp = NULL;
540,075✔
2931
  SOperatorInfo*             pOperator = NULL;
540,075✔
2932
  SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo));
540,075✔
2933
  QUERY_CHECK_NULL(pInfo, code, line, _error, terrno)
540,075✔
2934

2935
  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
540,075✔
2936
  QUERY_CHECK_NULL(pOperator, code, line, _error, terrno)
540,075✔
2937

2938
  pOperator->pPhyNode = pPhyciNode;
540,075✔
2939
  pTaskInfo->dynamicTask = (int8_t)pPhyciNode->node.dynamicOp;
540,075✔
2940

2941
  code = appendDownstream(pOperator, pDownstream, numOfDownstream);
540,075✔
2942
  QUERY_CHECK_CODE(code, line, _error);
540,075✔
2943

2944
  setOperatorInfo(pOperator, "DynQueryCtrlOperator", QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, false, OP_NOT_OPENED,
540,075✔
2945
                  pInfo, pTaskInfo);
2946

2947
  pInfo->qType = pPhyciNode->qType;
540,075✔
2948
  switch (pInfo->qType) {
540,075✔
2949
    case DYN_QTYPE_STB_HASH:
494,874✔
2950
      TAOS_MEMCPY(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
494,874✔
2951
      pInfo->stbJoin.pOutputDataBlockDesc = pPhyciNode->node.pOutputDataBlockDesc;
494,874✔
2952
      code = initSeqStbJoinTableHash(&pInfo->stbJoin.ctx.prev, pInfo->stbJoin.basic.batchFetch);
494,874✔
2953
      QUERY_CHECK_CODE(code, line, _error);
494,874✔
2954
      nextFp = seqStableJoin;
494,874✔
2955
      openFp = optrDummyOpenFn;
494,874✔
2956
      break;
494,874✔
2957
    case DYN_QTYPE_VTB_SCAN:
45,201✔
2958
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
45,201✔
2959
      QUERY_CHECK_CODE(code, line, _error);
45,201✔
2960
      nextFp = vtbScanNext;
45,201✔
2961
      openFp = vtbScanOpen;
45,201✔
2962
      break;
45,201✔
2963
    case DYN_QTYPE_VTB_WINDOW:
×
2964
      code = initVtbWindowInfo(pInfo, pPhyciNode, pTaskInfo, pOperator);
×
2965
      QUERY_CHECK_CODE(code, line, _error);
×
2966
      nextFp = vtbWindowNext;
×
2967
      openFp = vtbWindowOpen;
×
2968
      break;
×
2969
    default:
×
2970
      qError("unsupported dynamic query ctrl type: %d", pInfo->qType);
×
2971
      code = TSDB_CODE_INVALID_PARA;
×
2972
      goto _error;
×
2973
  }
2974

2975
  pOperator->fpSet = createOperatorFpSet(openFp, nextFp, NULL, destroyDynQueryCtrlOperator, optrDefaultBufFn,
540,075✔
2976
                                         NULL, optrDefaultGetNextExtFn, NULL);
2977

2978
  setOperatorResetStateFn(pOperator, resetDynQueryCtrlOperState);
540,075✔
2979
  *pOptrInfo = pOperator;
540,075✔
2980
  return TSDB_CODE_SUCCESS;
540,075✔
2981

2982
_error:
×
2983
  if (pInfo != NULL) {
×
2984
    destroyDynQueryCtrlOperator(pInfo);
×
2985
  }
2986
  qError("failed to create dyn query ctrl operator, %s code:%s, line:%d", __func__, tstrerror(code), line);
×
2987
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
2988
  pTaskInfo->code = code;
×
2989
  return code;
×
2990
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc