• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4881

14 Dec 2025 03:48AM UTC coverage: 60.617% (+0.5%) from 60.092%
#4881

push

travis-ci

web-flow
test: update coverage workflow time (#33918)

156854 of 258761 relevant lines covered (60.62%)

75258957.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.03
/source/libs/executor/src/dynqueryctrloperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "nodes.h"
19
#include "operator.h"
20
#include "os.h"
21
#include "plannodes.h"
22
#include "query.h"
23
#include "querynodes.h"
24
#include "querytask.h"
25
#include "tarray.h"
26
#include "tcompare.h"
27
#include "tdatablock.h"
28
#include "thash.h"
29
#include "tmsg.h"
30
#include "trpc.h"
31
#include "ttypes.h"
32
#include "dynqueryctrl.h"
33

34
int64_t gSessionId = 0;
35

36
void freeVgTableList(void* ptr) { 
×
37
  taosArrayDestroy(*(SArray**)ptr); 
×
38
}
×
39

40
/*
 * Release every node of a singly linked SStbJoinTableList, together with
 * the four per-node arrays (left/right vgroup ids and left/right uids).
 * A NULL head is a no-op.
 */
static void destroyStbJoinTableList(SStbJoinTableList* pListHead) {
  SStbJoinTableList* pCur = pListHead;

  while (NULL != pCur) {
    // Remember the successor before the node itself is released.
    SStbJoinTableList* pFollowing = pCur->pNext;

    taosMemoryFree(pCur->pLeftVg);
    taosMemoryFree(pCur->pLeftUid);
    taosMemoryFree(pCur->pRightVg);
    taosMemoryFree(pCur->pRightUid);
    taosMemoryFree(pCur);

    pCur = pFollowing;
  }
}
216,981✔
53

54
/*
 * Tear down the super-table join control state: log the execution
 * counters, release the hash tables that belong to the active fetch mode
 * (batch vs. non-batch) and finally free the pending table list.
 */
static void destroyStbJoinDynCtrlInfo(SStbJoinDynCtrlInfo* pStbJoin) {
  qDebug("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64 ", leftCacheNum:%" PRId64 ", rightCacheNum:%" PRId64, 
         pStbJoin->execInfo.prevBlkNum, pStbJoin->execInfo.prevBlkRows, pStbJoin->execInfo.postBlkNum, 
         pStbJoin->execInfo.postBlkRows, pStbJoin->execInfo.leftCacheNum, pStbJoin->execInfo.rightCacheNum);

  if (!pStbJoin->basic.batchFetch) {
    // Non-batch mode: only the cache hashes are released here.
    if (NULL != pStbJoin->ctx.prev.leftCache) {
      tSimpleHashCleanup(pStbJoin->ctx.prev.leftCache);
    }
    if (NULL != pStbJoin->ctx.prev.rightCache) {
      tSimpleHashCleanup(pStbJoin->ctx.prev.rightCache);
    }
    if (NULL != pStbJoin->ctx.prev.onceTable) {
      tSimpleHashCleanup(pStbJoin->ctx.prev.onceTable);
    }
  } else {
    // Batch mode: install freeVgTableList as the value free callback
    // before each hash is cleaned up.
    if (NULL != pStbJoin->ctx.prev.leftHash) {
      tSimpleHashSetFreeFp(pStbJoin->ctx.prev.leftHash, freeVgTableList);
      tSimpleHashCleanup(pStbJoin->ctx.prev.leftHash);
    }
    if (NULL != pStbJoin->ctx.prev.rightHash) {
      tSimpleHashSetFreeFp(pStbJoin->ctx.prev.rightHash, freeVgTableList);
      tSimpleHashCleanup(pStbJoin->ctx.prev.rightHash);
    }
  }

  destroyStbJoinTableList(pStbJoin->ctx.prev.pListHead);
}
216,981✔
82

83
void destroyOrgTbInfo(void *info) {
444,892✔
84
  SOrgTbInfo *pOrgTbInfo = (SOrgTbInfo *)info;
444,892✔
85
  if (pOrgTbInfo) {
444,892✔
86
    taosArrayDestroy(pOrgTbInfo->colMap);
444,892✔
87
  }
88
}
444,892✔
89

90
void destroyColRefInfo(void *info) {
3,895,476✔
91
  SColRefInfo *pColRefInfo = (SColRefInfo *)info;
3,895,476✔
92
  if (pColRefInfo) {
3,895,476✔
93
    taosMemoryFree(pColRefInfo->colName);
3,895,476✔
94
    taosMemoryFree(pColRefInfo->colrefName);
3,895,476✔
95
  }
96
}
3,895,476✔
97

98
void destroyColRefArray(void *info) {
246,786✔
99
  SArray *pColRefArray = *(SArray **)info;
246,786✔
100
  if (pColRefArray) {
246,786✔
101
    taosArrayDestroyEx(pColRefArray, destroyColRefInfo);
246,786✔
102
  }
103
}
246,786✔
104

105
void freeUseDbOutput(void* pOutput) {
102,285✔
106
  SUseDbOutput *pOut = *(SUseDbOutput**)pOutput;
102,285✔
107
  if (NULL == pOutput) {
102,285✔
108
    return;
×
109
  }
110

111
  if (pOut->dbVgroup) {
102,285✔
112
    freeVgInfo(pOut->dbVgroup);
102,285✔
113
  }
114
  taosMemFree(pOut);
102,285✔
115
}
116

117
/*
 * Release everything owned by the virtual-table scan control state.
 * Each member is released only when it is non-NULL; hashes that hold
 * heap values get their free callback installed right before cleanup.
 */
static void destroyVtbScanDynCtrlInfo(SVtbScanDynCtrlInfo* pVtbScan) {
  /* plain strings */
  if (NULL != pVtbScan->dbName) {
    taosMemoryFreeClear(pVtbScan->dbName);
  }
  if (NULL != pVtbScan->tbName) {
    taosMemoryFreeClear(pVtbScan->tbName);
  }

  /* arrays */
  if (NULL != pVtbScan->childTableList) {
    taosArrayDestroyEx(pVtbScan->childTableList, destroyColRefArray);
  }
  if (NULL != pVtbScan->colRefInfo) {
    taosArrayDestroyEx(pVtbScan->colRefInfo, destroyColRefInfo);
    pVtbScan->colRefInfo = NULL;
  }

  if (NULL != pVtbScan->childTableMap) {
    taosHashCleanup(pVtbScan->childTableMap);
  }

  if (NULL != pVtbScan->readColList) {
    taosArrayDestroy(pVtbScan->readColList);
  }

  /* hashes whose values need a dedicated destructor */
  if (NULL != pVtbScan->dbVgInfoMap) {
    taosHashSetFreeFp(pVtbScan->dbVgInfoMap, freeUseDbOutput);
    taosHashCleanup(pVtbScan->dbVgInfoMap);
  }
  if (NULL != pVtbScan->orgTbVgColMap) {
    taosHashSetFreeFp(pVtbScan->orgTbVgColMap, destroyOrgTbInfo);
    taosHashCleanup(pVtbScan->orgTbVgColMap);
  }

  /* use-db response: release its contents, then the struct itself */
  if (NULL != pVtbScan->pRsp) {
    tFreeSUsedbRsp(pVtbScan->pRsp);
    taosMemoryFreeClear(pVtbScan->pRsp);
  }

  /* remaining hashes are cleaned up without a value destructor */
  if (NULL != pVtbScan->existOrgTbVg) {
    taosHashCleanup(pVtbScan->existOrgTbVg);
  }
  if (NULL != pVtbScan->curOrgTbVg) {
    taosHashCleanup(pVtbScan->curOrgTbVg);
  }
  if (NULL != pVtbScan->newAddedVgInfo) {
    taosHashCleanup(pVtbScan->newAddedVgInfo);
  }
}
121,773✔
159

160
void destroyWinArray(void *info) {
×
161
  SArray *pWinArray = *(SArray **)info;
×
162
  if (pWinArray) {
×
163
    taosArrayDestroy(pWinArray);
×
164
  }
165
}
×
166

167
/*
 * Release the result block and the window arrays owned by the
 * virtual-table window control state. Members that are NULL are skipped.
 */
static void destroyVtbWindowDynCtrlInfo(SVtbWindowDynCtrlInfo* pVtbWindow) {
  if (NULL != pVtbWindow->pRes) {
    blockDataDestroy(pVtbWindow->pRes);
  }

  if (NULL != pVtbWindow->pWins) {
    // Every element is itself an array; destroyWinArray releases it.
    taosArrayDestroyEx(pVtbWindow->pWins, destroyWinArray);
  }
}
×
175

176
static void destroyDynQueryCtrlOperator(void* param) {
338,754✔
177
  SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param;
338,754✔
178

179
  switch (pDyn->qType) {
338,754✔
180
    case DYN_QTYPE_STB_HASH:
216,981✔
181
      destroyStbJoinDynCtrlInfo(&pDyn->stbJoin);
216,981✔
182
      break;
216,981✔
183
    case DYN_QTYPE_VTB_SCAN:
121,773✔
184
      destroyVtbScanDynCtrlInfo(&pDyn->vtbScan);
121,773✔
185
      break;
121,773✔
186
    case DYN_QTYPE_VTB_WINDOW:
×
187
      destroyVtbWindowDynCtrlInfo(&pDyn->vtbWindow);
×
188
      break;
×
189
    default:
×
190
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
×
191
      break;
×
192
  }
193

194
  taosMemoryFreeClear(param);
338,754✔
195
}
338,754✔
196

197
/*
 * Decide whether the data of a table should be cached.
 *
 *  - batch fetch: always true;
 *  - right table: true when the current and the next right uid are equal;
 *  - left table:  true when `uid` is present in pPrev->leftCache.
 *
 * `uid` is only consulted for the left-table lookup.
 */
static FORCE_INLINE bool tableNeedCache(int64_t uid, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, bool rightTable, bool batchFetch) {
  if (batchFetch) {
    return true;
  }

  if (rightTable) {
    return pPost->rightCurrUid == pPost->rightNextUid;
  }

  uint32_t* pHit = tSimpleHashGet(pPrev->leftCache, &uid, sizeof(uid));
  return NULL != pHit;
}
210

211
/*
 * Load the table pair at the head node's read index into the post-join
 * context and work out which right uid follows it.
 *
 * Writes leftCurrUid/rightCurrUid, leftVgId/rightVgId, rightNextUid and
 * both needCache flags. In non-batch mode, when the right table needs
 * caching and differs from the previous right table, its uid is recorded
 * in pPrev->rightCache and the rightCacheNum counter is bumped.
 *
 * Returns TSDB_CODE_SUCCESS, or whatever QRY_ERR_RET propagates from
 * tSimpleHashPut.
 */
static int32_t updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo*          pStbJoin) {
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
  SStbJoinTableList*   pCur = pPrev->pListHead;
  int64_t*             pLeftUid = pCur->pLeftUid + pCur->readIdx;
  int64_t*             pRightUid = pCur->pRightUid + pCur->readIdx;
  int64_t              nextIdx = pCur->readIdx + 1;
  int64_t              prevRightUid = pPost->rightCurrUid;

  pPost->leftCurrUid = *pLeftUid;
  pPost->rightCurrUid = *pRightUid;
  pPost->leftVgId = *(pCur->pLeftVg + pCur->readIdx);
  pPost->rightVgId = *(pCur->pRightVg + pCur->readIdx);

  // Look ahead for the next right uid, moving on to following list nodes
  // once the current one is exhausted; 0 means there is no next entry.
  for (;;) {
    if (nextIdx < pCur->uidNum) {
      pPost->rightNextUid = *(pCur->pRightUid + nextIdx);
      break;
    }

    pCur = pCur->pNext;
    if (NULL == pCur) {
      pPost->rightNextUid = 0;
      break;
    }

    // As in the original, the right uid cursor is re-pointed at the start
    // of the node that was just entered.
    pRightUid = pCur->pRightUid;
    nextIdx = 0;
  }

  pPost->leftNeedCache = tableNeedCache(*pLeftUid, pPrev, pPost, false, pStbJoin->basic.batchFetch);
  pPost->rightNeedCache = tableNeedCache(*pRightUid, pPrev, pPost, true, pStbJoin->basic.batchFetch);

  if (!pStbJoin->basic.batchFetch && pPost->rightNeedCache && prevRightUid != pPost->rightCurrUid) {
    QRY_ERR_RET(tSimpleHashPut(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid), NULL, 0));
    pStbJoin->execInfo.rightCacheNum++;
  }

  return TSDB_CODE_SUCCESS;
}
254

255

256
/*
 * Build a GET param for the group-cache operator.
 *
 * On success *ppRes owns a new SOperatorParam whose value is an
 * SGcOperatorParam and whose only child (if any) is `pChild`.
 * On failure *ppRes is NULL, `pChild` has been released and the error
 * code captured from terrno is returned.
 *
 * The scalar fields of *ppRes are assigned immediately after allocation
 * so that freeOperatorParam never receives a struct with indeterminate
 * value/reUse/opType on the failure paths (the memory comes from
 * taosMemoryMalloc and is not zeroed).
 */
static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, bool needCache, SOperatorParam* pChild) {
  int32_t code = TSDB_CODE_SUCCESS;
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    code = terrno;
    freeOperatorParam(pChild, OP_GET_PARAM);
    return code;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = NULL;
  (*ppRes)->reUse = false;
  (*ppRes)->pChildren = NULL;

  if (pChild) {
    (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
    if (NULL == (*ppRes)->pChildren) {
      code = terrno;
      freeOperatorParam(pChild, OP_GET_PARAM);
      freeOperatorParam(*ppRes, OP_GET_PARAM);
      *ppRes = NULL;
      return code;
    }
    if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild)) {
      code = terrno;
      // pChild was not stored in pChildren, so it is released separately.
      freeOperatorParam(pChild, OP_GET_PARAM);
      freeOperatorParam(*ppRes, OP_GET_PARAM);
      *ppRes = NULL;
      return code;
    }
  }

  SGcOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcOperatorParam));
  if (NULL == pGc) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_GET_PARAM);
    *ppRes = NULL;
    return code;
  }

  // Each param gets a fresh session id from the process-wide counter.
  pGc->sessionId = atomic_add_fetch_64(&gSessionId, 1);
  pGc->downstreamIdx = downstreamIdx;
  pGc->vgId = vgId;
  pGc->tbUid = tbUid;
  pGc->needCache = needCache;

  (*ppRes)->value = pGc;

  return TSDB_CODE_SUCCESS;
}
305

306

307
/*
 * Build a NOTIFY param for the group-cache operator.
 *
 * On success *ppRes owns a new SOperatorParam (no children) whose value
 * is an SGcNotifyOperatorParam. On failure *ppRes is NULL and the error
 * code captured from terrno is returned.
 *
 * Compared with the original: the fields of *ppRes are assigned before
 * the struct can reach freeOperatorParam (it is malloc'ed, not zeroed),
 * and *ppRes is reset to NULL after being released so the caller is not
 * left holding a dangling pointer.
 */
static int32_t buildGroupCacheNotifyOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid) {
  int32_t code = TSDB_CODE_SUCCESS;
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = NULL;
  (*ppRes)->reUse = false;
  (*ppRes)->pChildren = NULL;

  SGcNotifyOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcNotifyOperatorParam));
  if (NULL == pGc) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    *ppRes = NULL;
    return code;
  }

  pGc->downstreamIdx = downstreamIdx;
  pGc->vgId = vgId;
  pGc->tbUid = tbUid;

  (*ppRes)->value = pGc;

  return TSDB_CODE_SUCCESS;
}
333

334
static int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid);
335
/*
 * Build a reusable exchange-operator param describing a virtual-table
 * window scan over [skey, ekey].
 *
 * On success *ppRes owns the param (with an empty children array and an
 * empty uid list). On failure everything allocated so far is released,
 * the failure is logged and the error code is returned.
 *
 * NOTE(review): the resulting param always carries downstreamIdx 0; the
 * `downstreamIdx` argument is not used. Confirm this is intended.
 * NOTE(review): basic.isNewDeployed is not assigned here although the
 * struct comes from taosMemoryMalloc — confirm consumers do not read it.
 */
static int32_t buildExchangeOperatorParamForExternalWindow(SOperatorParam** ppRes, int32_t downstreamIdx, int64_t skey, int64_t ekey) {
  int32_t                 code = TSDB_CODE_SUCCESS;
  int32_t                 lino = 0;
  SExchangeOperatorParam* pExc = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)

  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)

  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  QUERY_CHECK_NULL(pExc, code, lino, _return, terrno)

  pExc->multiParams = false;

  /* scan identity */
  pExc->basic.vgId = 0;
  pExc->basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
  pExc->basic.tableSeq = true;
  pExc->basic.colMap = NULL;

  /* scan flavour flags */
  pExc->basic.isVtbRefScan = false;
  pExc->basic.isVtbTagScan = false;
  pExc->basic.isVtbWinScan = true;
  pExc->basic.isNewParam = true;

  /* time window to scan */
  pExc->basic.window.skey = skey;
  pExc->basic.window.ekey = ekey;

  pExc->basic.uidList = taosArrayInit(1, sizeof(int64_t));
  QUERY_CHECK_NULL(pExc->basic.uidList, code, lino, _return, terrno)

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->reUse = true;
  (*ppRes)->value = pExc;

  return code;

_return:
  qError("failed to build exchange operator param for external window, code:%d, line:%d", code, lino);

  if (NULL != pExc) {
    if (NULL != pExc->basic.uidList) {
      taosArrayDestroy(pExc->basic.uidList);
    }
    taosMemoryFreeClear(pExc);
  }

  if (NULL != *ppRes) {
    if (NULL != (*ppRes)->pChildren) {
      taosArrayDestroy((*ppRes)->pChildren);
    }
    taosMemoryFreeClear(*ppRes);
  }

  return code;
}
386

387
/*
 * Build a single-table exchange-operator param for table `*pUid` in
 * vgroup `*pVgId`.
 *
 * On success *ppRes owns the param and its SExchangeOperatorParam value.
 * On failure every allocation made here is released, *ppRes is NULL and
 * the error code captured from terrno is returned.
 *
 * Compared with the original: the failure paths used to leave the
 * already allocated *ppRes behind (leak + dangling out-pointer), and
 * terrno was read only after free calls that may have run in between.
 *
 * NOTE(review): basic.isNewDeployed / basic.isNewParam are not assigned
 * here although the struct comes from taosMemoryMalloc — confirm the
 * consumers do not read them for this kind of param.
 */
static int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, const int32_t* pVgId, int64_t* pUid) {
  int32_t                 code = TSDB_CODE_SUCCESS;
  SExchangeOperatorParam* pExc = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }
  (*ppRes)->pChildren = NULL;

  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  if (NULL == pExc) {
    code = terrno;
    goto _return;
  }

  pExc->multiParams = false;
  pExc->basic.vgId = *pVgId;
  pExc->basic.tableSeq = true;
  pExc->basic.isVtbRefScan = false;
  pExc->basic.isVtbWinScan = false;
  pExc->basic.isVtbTagScan = false;
  pExc->basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
  pExc->basic.colMap = NULL;
  pExc->basic.uidList = taosArrayInit(1, sizeof(int64_t));
  if (NULL == pExc->basic.uidList) {
    code = terrno;
    goto _return;
  }
  if (NULL == taosArrayPush(pExc->basic.uidList, pUid)) {
    code = terrno;
    goto _return;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;

_return:
  // pExc is only non-NULL here after basic.uidList has been assigned.
  if (NULL != pExc) {
    if (NULL != pExc->basic.uidList) {
      taosArrayDestroy(pExc->basic.uidList);
    }
    taosMemoryFree(pExc);
  }
  taosMemoryFreeClear(*ppRes);
  return code;
}
425

426
/*
 * Build a batch exchange-operator param from `pVg`, a hash that maps a
 * vgroup id to an SArray* of table uids.
 *
 * For each vgroup one SExchangeOperatorBasicParam is stored in
 * pExc->pBatchs; the uid list pointer is handed over to that entry and
 * the slot in `pVg` is set to NULL.
 *
 * On success *ppRes owns the param. On failure everything allocated here
 * is released, *ppRes is NULL and the error code is returned.
 *
 * Compared with the original:
 *  - a failing tSimpleHashPut inside the loop used to return straight
 *    away, leaking pExc, pExc->pBatchs and *ppRes;
 *  - `basic` was copied into the hash with several members never
 *    assigned; it is now zero-initialised;
 *  - terrno is captured before any free call runs.
 */
static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
  int32_t code = TSDB_CODE_SUCCESS;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }
  (*ppRes)->pChildren = NULL;
  
  SExchangeOperatorBatchParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorBatchParam));
  if (NULL == pExc) {
    code = terrno;
    taosMemoryFreeClear(*ppRes);
    return code;
  }

  pExc->multiParams = true;
  pExc->pBatchs = tSimpleHashInit(tSimpleHashGetSize(pVg), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  if (NULL == pExc->pBatchs) {
    code = terrno;
    taosMemoryFree(pExc);
    taosMemoryFreeClear(*ppRes);
    return code;
  }
  tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);
  
  SExchangeOperatorBasicParam basic = {0};
  basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;

  int32_t iter = 0;
  void* p = NULL;
  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
    int32_t* pVgId = tSimpleHashGetKey(p, NULL);
    SArray* pUidList = *(SArray**)p;
    basic.vgId = *pVgId;
    basic.uidList = pUidList;
    basic.colMap = NULL;
    basic.tableSeq = false;
    basic.isVtbRefScan = false;
    basic.isVtbWinScan = false;
    basic.isVtbTagScan = false;
    
    code = tSimpleHashPut(pExc->pBatchs, pVgId, sizeof(*pVgId), &basic, sizeof(basic));
    if (TSDB_CODE_SUCCESS != code) {
      // Entries inserted by earlier iterations already took over their uid
      // lists (the pVg slots were NULLed); cleaning up pBatchs releases them
      // through the free callback installed above. The uid list of the
      // failing entry is still referenced by pVg and is left untouched.
      tSimpleHashCleanup(pExc->pBatchs);
      taosMemoryFree(pExc);
      taosMemoryFreeClear(*ppRes);
      QRY_ERR_RET(code);
      return code;
    }

    qTrace("build downstreamIdx %d batch scan, vgId:%d, uidNum:%" PRId64, downstreamIdx, *pVgId, (int64_t)taosArrayGetSize(pUidList));
    *(SArray**)p = NULL;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;
}
477

478
/*
 * Build a reusable exchange-operator param for a virtual-table tag scan
 * of table `uid` in vgroup `vgId`.
 *
 * On success *ppRes owns the param and its SExchangeOperatorParam value.
 * On failure all allocations made here are released, the failure is
 * logged and the error code is returned.
 *
 * Compared with the original: the error path no longer passes `basic` to
 * the allocator. `basic` is `&pExc->basic`, i.e. a pointer into the pExc
 * allocation, so freeing it and then freeing pExc was an invalid/double
 * free. The storage is released exactly once, through pExc.
 */
static int32_t buildExchangeOperatorParamForVTagScan(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, tb_uid_t uid) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  SExchangeOperatorParam*      pExc = NULL;
  SExchangeOperatorBasicParam* basic = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
  (*ppRes)->pChildren = NULL;

  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  QUERY_CHECK_NULL(pExc, code, lino, _return, terrno)

  pExc->multiParams = false;

  basic = &pExc->basic;
  basic->srcOpType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;

  basic->vgId = vgId;
  basic->tableSeq = false;
  basic->isVtbRefScan = false;
  basic->isVtbTagScan = true;
  basic->isVtbWinScan = false;
  basic->isNewDeployed = false;
  basic->colMap = NULL;

  basic->uidList = taosArrayInit(1, sizeof(int64_t));
  QUERY_CHECK_NULL(basic->uidList, code, lino, _return, terrno)
  QUERY_CHECK_NULL(taosArrayPush(basic->uidList, &uid), code, lino, _return, terrno)

  (*ppRes)->pChildren = NULL;

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = true;

  return TSDB_CODE_SUCCESS;

_return:
  qError("failed to build exchange operator param for vscan, code:%d", code);
  taosMemoryFreeClear(*ppRes);
  if (basic) {
    if (basic->colMap) {
      taosArrayDestroy(basic->colMap->colMap);
      taosMemoryFreeClear(basic->colMap);
    }
    if (basic->uidList) {
      taosArrayDestroy(basic->uidList);
    }
    // `basic` lives inside *pExc and must not be freed on its own.
  }
  taosMemoryFreeClear(pExc);
  return code;
}
533

534
/*
 * Build a reusable exchange-operator param for a virtual-table reference
 * scan of the origin table described by `pMap`.
 *
 * The param gets its own copy of `pMap` (vgId, table name and a duplicate
 * of the column-map array) and an empty uid list.
 *
 * On success *ppRes owns the param. On failure all allocations made here
 * are released, the failure is logged and the error code is returned.
 *
 * Compared with the original:
 *  - `basic` is `&pExc->basic`, a pointer into the pExc allocation; the
 *    error path no longer frees it separately (invalid/double free);
 *  - basic->uidList is set to NULL before the first step that can fail
 *    after `basic` is assigned, so the error path never inspects an
 *    indeterminate pointer when the colMap allocation or duplication
 *    fails.
 */
static int32_t buildExchangeOperatorParamForVScan(SOperatorParam** ppRes, int32_t downstreamIdx, SOrgTbInfo* pMap) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  SExchangeOperatorParam*      pExc = NULL;
  SExchangeOperatorBasicParam* basic = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
  (*ppRes)->pChildren = NULL;

  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  QUERY_CHECK_NULL(pExc, code, lino, _return, terrno)

  pExc->multiParams = false;

  basic = &pExc->basic;
  basic->srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;

  basic->vgId = pMap->vgId;
  basic->tableSeq = false;
  basic->isVtbRefScan = true;
  basic->isVtbWinScan = false;
  basic->isVtbTagScan = false;
  basic->isNewDeployed = false;
  basic->isNewParam = true;
  basic->uidList = NULL;  // keep the error path well-defined
  basic->colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
  QUERY_CHECK_NULL(basic->colMap, code, lino, _return, terrno)
  basic->colMap->vgId = pMap->vgId;
  tstrncpy(basic->colMap->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);
  basic->colMap->colMap = taosArrayDup(pMap->colMap, NULL);
  QUERY_CHECK_NULL(basic->colMap->colMap, code, lino, _return, terrno)

  basic->uidList = taosArrayInit(1, sizeof(int64_t));
  QUERY_CHECK_NULL(basic->uidList, code, lino, _return, terrno)

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = true;

  return TSDB_CODE_SUCCESS;

_return:
  qError("failed to build exchange operator param for vscan, code:%d", code);
  taosMemoryFreeClear(*ppRes);
  if (basic) {
    if (basic->colMap) {
      taosArrayDestroy(basic->colMap->colMap);
      taosMemoryFreeClear(basic->colMap);
    }
    if (basic->uidList) {
      taosArrayDestroy(basic->uidList);
    }
    // `basic` lives inside *pExc and must not be freed on its own.
  }
  taosMemoryFreeClear(pExc);
  return code;
}
592

593
/*
 * Variant of the virtual-table reference-scan param builder for a newly
 * deployed source: besides the copy of `pMap`, the param records the
 * downstream source (client task id `taskId`, remote task id, node id and
 * endpoint set taken from `pTaskAddr`) and marks isNewDeployed.
 *
 * On success *ppRes owns the param. On failure all allocations made here
 * are released, the failure is logged and the error code is returned.
 *
 * Compared with the original:
 *  - `basic` is `&pExc->basic`, a pointer into the pExc allocation; the
 *    error path no longer frees it separately (invalid/double free);
 *  - basic->uidList is set to NULL before the first step that can fail
 *    after `basic` is assigned, so the error path never inspects an
 *    indeterminate pointer.
 */
static int32_t buildExchangeOperatorParamForVScanEx(SOperatorParam** ppRes, int32_t downstreamIdx, SOrgTbInfo* pMap, uint64_t taskId, SStreamTaskAddr* pTaskAddr) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  SExchangeOperatorParam*      pExc = NULL;
  SExchangeOperatorBasicParam* basic = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
  (*ppRes)->pChildren = NULL;

  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  QUERY_CHECK_NULL(pExc, code, lino, _return, terrno)

  pExc->multiParams = false;

  basic = &pExc->basic;
  basic->srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;

  basic->vgId = pMap->vgId;
  basic->tableSeq = false;
  basic->isVtbRefScan = true;
  basic->isVtbWinScan = false;
  basic->isVtbTagScan = false;
  basic->isNewDeployed = true;
  basic->isNewParam = true;
  basic->uidList = NULL;  // keep the error path well-defined
  basic->newDeployedSrc.type = QUERY_NODE_DOWNSTREAM_SOURCE;
  basic->newDeployedSrc.clientId = taskId;// current task's taskid
  basic->newDeployedSrc.taskId = pTaskAddr->taskId;
  basic->newDeployedSrc.fetchMsgType = TDMT_STREAM_FETCH;
  basic->newDeployedSrc.localExec = false;
  basic->newDeployedSrc.addr.nodeId = pTaskAddr->nodeId;
  memcpy(&basic->newDeployedSrc.addr.epSet, &pTaskAddr->epset, sizeof(SEpSet));
  basic->colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
  QUERY_CHECK_NULL(basic->colMap, code, lino, _return, terrno)
  basic->colMap->vgId = pMap->vgId;
  tstrncpy(basic->colMap->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);
  basic->colMap->colMap = taosArrayDup(pMap->colMap, NULL);
  QUERY_CHECK_NULL(basic->colMap->colMap, code, lino, _return, terrno)

  basic->uidList = taosArrayInit(1, sizeof(int64_t));
  QUERY_CHECK_NULL(basic->uidList, code, lino, _return, terrno)

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = true;

  return TSDB_CODE_SUCCESS;

_return:
  qError("failed to build exchange operator param for vscan, code:%d", code);
  taosMemoryFreeClear(*ppRes);
  if (basic) {
    if (basic->colMap) {
      taosArrayDestroy(basic->colMap->colMap);
      taosMemoryFreeClear(basic->colMap);
    }
    if (basic->uidList) {
      taosArrayDestroy(basic->uidList);
    }
    // `basic` lives inside *pExc and must not be freed on its own.
  }
  taosMemoryFreeClear(pExc);
  return code;
}
658

659
/*
 * Build a GET param for the merge-join operator with two children.
 *
 * Ownership of *ppChild0 / *ppChild1 moves into the new param as each one
 * is pushed; the corresponding caller pointer is set to NULL at that
 * point. A child that has not been pushed yet when a failure occurs stays
 * with the caller.
 *
 * On failure *ppRes is NULL and the error code captured from terrno is
 * returned.
 *
 * The scalar fields of *ppRes are assigned immediately after allocation
 * so that freeOperatorParam never receives a struct with indeterminate
 * value/reUse/opType on the failure paths (the memory comes from
 * taosMemoryMalloc and is not zeroed).
 */
static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initParam, SOperatorParam** ppChild0, SOperatorParam** ppChild1) {
  int32_t code = TSDB_CODE_SUCCESS;
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    code = terrno;
    return code;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
  (*ppRes)->value = NULL;
  (*ppRes)->reUse = false;

  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
  if (NULL == (*ppRes)->pChildren) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_GET_PARAM);
    *ppRes = NULL;
    return code;
  }
  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild0)) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_GET_PARAM);
    *ppRes = NULL;
    return code;
  }
  *ppChild0 = NULL;
  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild1)) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_GET_PARAM);
    *ppRes = NULL;
    return code;
  }
  *ppChild1 = NULL;
  
  SSortMergeJoinOperatorParam* pJoin = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam));
  if (NULL == pJoin) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_GET_PARAM);
    *ppRes = NULL;
    return code;
  }

  pJoin->initDownstream = initParam;
  
  (*ppRes)->value = pJoin;

  return TSDB_CODE_SUCCESS;
}
704

705
/*
 * Build a NOTIFY param for the merge-join operator. `pChild0` and
 * `pChild1` are optional; each non-NULL one is appended as a child.
 *
 * The function consumes both children: on every failure path the
 * children that were not stored in the param are released here.
 * On failure *ppRes is NULL and the error code captured from terrno is
 * returned.
 *
 * Compared with the original:
 *  - after taosArrayInit the code tested `*ppRes` (already known to be
 *    non-NULL) instead of `(*ppRes)->pChildren`, so an allocation failure
 *    of the children array went undetected and the NULL array was used by
 *    the following pushes;
 *  - value/reUse/opType are assigned right after allocation so that
 *    freeOperatorParam never sees indeterminate fields.
 */
static int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) {
  int32_t code = TSDB_CODE_SUCCESS;
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    code = terrno;
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    return code;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
  (*ppRes)->value = NULL;
  (*ppRes)->reUse = false;

  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
  if (NULL == (*ppRes)->pChildren) {
    code = terrno;
    taosMemoryFreeClear(*ppRes);
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    return code;
  }
  if (pChild0 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) {
    code = terrno;
    // Neither child is stored in the param yet; release both separately.
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    *ppRes = NULL;
    return code;
  }
  if (pChild1 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) {
    code = terrno;
    // pChild0 (if any) is already stored in the param's children array.
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    *ppRes = NULL;
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
744

745
/*
 * Build a table-scan operator param from `pVg`, a hash that maps a vgroup
 * id to an SArray* of table uids. Exactly one vgroup is accepted; any
 * other count is logged and rejected with
 * TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR.
 *
 * After the param has been built, the uid list is destroyed and its slot
 * in `pVg` is set to NULL. If building fails, the error code is returned
 * and the list stays in `pVg`.
 */
static int32_t buildBatchTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
  int32_t vgNum = tSimpleHashGetSize(pVg);
  if (1 != vgNum) {
    qError("Invalid vgroup num %d to build table scan operator param", vgNum);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  int32_t iter = 0;
  void*   pEntry = tSimpleHashIterate(pVg, NULL, &iter);
  while (NULL != pEntry) {
    SArray** ppUidList = (SArray**)pEntry;

    int32_t code = buildTableScanOperatorParam(ppRes, *ppUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false);
    if (code) {
      return code;
    }

    taosArrayDestroy(*ppUidList);
    *ppUidList = NULL;

    pEntry = tSimpleHashIterate(pVg, pEntry, &iter);
  }

  return TSDB_CODE_SUCCESS;
}
768

769

770
/*
 * Build a table-scan operator param for the single table `*pUid`.
 *
 * A temporary one-element uid list is created, handed to
 * buildTableScanOperatorParam and destroyed afterwards regardless of the
 * outcome. `downstreamIdx` and `pVgId` are not used by this function.
 *
 * Returns TSDB_CODE_SUCCESS, the error code captured from terrno when the
 * temporary list cannot be set up, or the code returned by
 * buildTableScanOperatorParam.
 *
 * Compared with the original: the temporary list is now also destroyed
 * when taosArrayPush fails (it used to leak), and terrno is captured
 * before the cleanup call.
 */
static int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) {
  SArray* pUidList = taosArrayInit(1, sizeof(int64_t));
  if (NULL == pUidList) {
    return terrno;
  }
  if (NULL == taosArrayPush(pUidList, pUid)) {
    int32_t pushCode = terrno;
    taosArrayDestroy(pUidList);
    return pushCode;
  }

  int32_t code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, true);
  taosArrayDestroy(pUidList);
  if (code) {
    return code;
  }
  
  return TSDB_CODE_SUCCESS;
}
787

788
/*
 * Build the merge-join operator param for the table pair found at the current
 * read index of the head table list.
 *
 * Steps:
 *   1. refresh the post-join "current table" info;
 *   2. build the two source params (table scan or exchange, batch or single);
 *   3. wrap each source param in a group-cache param;
 *   4. combine both group-cache params into the merge-join param (*ppParam).
 *
 * In batch mode the source params are only built while the vgroup hashes still
 * exist; after both were built successfully the hashes are released.
 * On failure every param still referenced by a local is freed and *ppParam is
 * cleared.
 */
static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, SOperatorParam** ppParam) {
  SStbJoinTableList* pList = pPrev->pListHead;
  int64_t            rowIdx = pList->readIdx;
  int32_t*           leftVg = pList->pLeftVg + rowIdx;
  int64_t*           leftUid = pList->pLeftUid + rowIdx;
  int32_t*           rightVg = pList->pRightVg + rowIdx;
  int64_t*           rightUid = pList->pRightUid + rowIdx;
  SOperatorParam*    pLeftSrc = NULL;
  SOperatorParam*    pRightSrc = NULL;
  SOperatorParam*    pLeftGc = NULL;
  SOperatorParam*    pRightGc = NULL;
  int32_t            code = TSDB_CODE_SUCCESS;

  qDebug("start %" PRId64 ":%" PRId64 "th stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64, 
      rowIdx, pPrev->tableNum, *leftVg, *leftUid, *rightVg, *rightUid);

  QRY_ERR_RET(updatePostJoinCurrTableInfo(&pInfo->stbJoin));

  if (!pInfo->stbJoin.basic.batchFetch) {
    // One table pair at a time.
    if (pInfo->stbJoin.basic.srcScan[0]) {
      code = buildSingleTableScanOperatorParam(&pLeftSrc, 0, leftVg, leftUid);
    } else {
      code = buildExchangeOperatorParam(&pLeftSrc, 0, leftVg, leftUid);
    }
    if (TSDB_CODE_SUCCESS == code) {
      if (pInfo->stbJoin.basic.srcScan[1]) {
        code = buildSingleTableScanOperatorParam(&pRightSrc, 1, rightVg, rightUid);
      } else {
        code = buildExchangeOperatorParam(&pRightSrc, 1, rightVg, rightUid);
      }
    }
  } else if (pPrev->leftHash) {
    // Batch mode: the whole vgroup hash is turned into source params once.
    if (pInfo->stbJoin.basic.srcScan[0]) {
      code = buildBatchTableScanOperatorParam(&pLeftSrc, 0, pPrev->leftHash);
    } else {
      code = buildBatchExchangeOperatorParam(&pLeftSrc, 0, pPrev->leftHash);
    }
    if (TSDB_CODE_SUCCESS == code) {
      if (pInfo->stbJoin.basic.srcScan[1]) {
        code = buildBatchTableScanOperatorParam(&pRightSrc, 1, pPrev->rightHash);
      } else {
        code = buildBatchExchangeOperatorParam(&pRightSrc, 1, pPrev->rightHash);
      }
    }
    if (TSDB_CODE_SUCCESS == code) {
      tSimpleHashCleanup(pPrev->leftHash);
      tSimpleHashCleanup(pPrev->rightHash);
      pPrev->leftHash = NULL;
      pPrev->rightHash = NULL;
    }
  }

  // The merge join is told whether source params accompany this request.
  bool initParam = (NULL != pLeftSrc);

  if (TSDB_CODE_SUCCESS == code) {
    code = buildGroupCacheOperatorParam(&pLeftGc, 0, *leftVg, *leftUid, pPost->leftNeedCache, pLeftSrc);
    pLeftSrc = NULL;  // not referenced locally once handed to the group-cache builder
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = buildGroupCacheOperatorParam(&pRightGc, 1, *rightVg, *rightUid, pPost->rightNeedCache, pRightSrc);
    pRightSrc = NULL;
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = buildMergeJoinOperatorParam(ppParam, initParam, &pLeftGc, &pRightGc);
  }

  if (TSDB_CODE_SUCCESS == code) {
    return code;
  }

  // Failure: release whatever is still referenced here.
  if (pLeftSrc) {
    freeOperatorParam(pLeftSrc, OP_GET_PARAM);
  }
  if (pRightSrc) {
    freeOperatorParam(pRightSrc, OP_GET_PARAM);
  }
  if (pLeftGc) {
    freeOperatorParam(pLeftGc, OP_GET_PARAM);
  }
  if (pRightGc) {
    freeOperatorParam(pRightGc, OP_GET_PARAM);
  }
  if (*ppParam) {
    freeOperatorParam(*ppParam, OP_GET_PARAM);
    *ppParam = NULL;
  }

  return code;
}
858

859
/*
 * Start the join for the table pair at the current read index and fetch the
 * first result block from downstream[1] into *ppRes.
 *
 * Any failure (param build, downstream fetch, block validation) is recorded in
 * the task info and raised through T_LONG_JMP; the function only returns
 * normally on success. When a block is returned, the post-join context is
 * marked as started and the post block statistics are updated.
 */
static void seqJoinLaunchNewRetrieveImpl(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
  SStbJoinPostJoinCtx*       pPost = &pStbJoin->ctx.post;
  SOperatorParam*            pParam = NULL;
  int32_t                    code  = buildSeqStbJoinOperatorParam(pInfo, pPrev, pPost, &pParam);
  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  qDebug("%s dynamic post task begin", GET_TASKID(pOperator->pTaskInfo));
  code = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam, ppRes);
  if (TSDB_CODE_SUCCESS != code) {
    // A downstream failure used to be treated as "empty result" and dropped;
    // propagate it the same way as the other errors in this function.
    qError("%s dynamic post task failed, error:%s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  if (NULL == *ppRes) {
    qDebug("%s Empty join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
    return;
  }

  code = blockDataCheck(*ppRes);
  if (code) {
    qError("Invalid block data, blockDataCheck failed, error:%s", tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  pPost->isStarted = true;
  pStbJoin->execInfo.postBlkNum++;
  pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows;
  qDebug("%s join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
}
773,983✔
888

889

890
/*
 * Notify downstream[1] that the cache of the current left or right table
 * (selected by leftTable) is finished.
 *
 * A group-cache notify param is built for the table, wrapped in a merge-join
 * notify param and delivered through optrDefaultNotifyFn(). The first failing
 * build step aborts the notification and its code is returned.
 */
static int32_t notifySeqJoinTableCacheEnd(SOperatorInfo* pOperator, SStbJoinPostJoinCtx* pPost, bool leftTable) {
  SOperatorParam* pCacheParam = NULL;
  SOperatorParam* pJoinParam = NULL;
  int32_t         downstreamId = 1;
  int32_t         vgId = pPost->rightVgId;
  int64_t         uid = pPost->rightCurrUid;

  if (leftTable) {
    downstreamId = 0;
    vgId = pPost->leftVgId;
    uid = pPost->leftCurrUid;
  }

  qDebug("notify table %" PRIu64 " in vgId %d downstreamId %d cache end", uid, vgId, downstreamId);

  int32_t code = buildGroupCacheNotifyOperatorParam(&pCacheParam, downstreamId, vgId, uid);
  if (TSDB_CODE_SUCCESS == code) {
    code = buildMergeJoinNotifyOperatorParam(&pJoinParam, pCacheParam, NULL);
  }
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  return optrDefaultNotifyFn(pOperator->pDownstream[1], pJoinParam);
}
910

911
/*
 * Bookkeeping performed when the join of the current table pair has produced
 * its last block.
 *
 * The post-join context is always marked as not started. In non-batch mode the
 * table caches are updated as well:
 *   - left table (when it needs caching): its reference counter in leftCache
 *     is decremented; when it reaches zero the entry is removed and downstream
 *     is notified that the cache has ended;
 *   - right table (when it does NOT need caching): if it is still present in
 *     rightCache the entry is removed and downstream is notified.
 */
static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCtrlInfo* pStbJoin) {
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
  int32_t              code = 0;

  pPost->isStarted = false;

  if (pStbJoin->basic.batchFetch) {
    return TSDB_CODE_SUCCESS;
  }

  if (pPost->leftNeedCache) {
    uint32_t* pRefNum = tSimpleHashGet(pPrev->leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
    if (NULL != pRefNum) {
      --(*pRefNum);
      if (0 == *pRefNum) {
        code = tSimpleHashRemove(pPrev->leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
        if (code) {
          qError("tSimpleHashRemove leftCurrUid %" PRId64 " from leftCache failed, error:%s", pPost->leftCurrUid, tstrerror(code));
          QRY_ERR_RET(code);
        }
        QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, true));
      }
    }
  }

  if (pPost->rightNeedCache) {
    return TSDB_CODE_SUCCESS;
  }

  void* pCached = tSimpleHashGet(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
  if (NULL == pCached) {
    return TSDB_CODE_SUCCESS;
  }

  code = tSimpleHashRemove(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
  if (code) {
    qError("tSimpleHashRemove rightCurrUid %" PRId64 " from rightCache failed, error:%s", pPost->rightCurrUid, tstrerror(code));
    QRY_ERR_RET(code);
  }
  QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, false));

  return TSDB_CODE_SUCCESS;
}
947

948

949
/*
 * Continue reading result blocks of the join that is already in progress.
 *
 * Nothing happens when no join has been started. Otherwise the next block is
 * pulled from downstream[1]:
 *   - a block: the post block statistics are updated;
 *   - NULL: the current pair is finished, the end-of-retrieve bookkeeping runs
 *     and the head list's read index advances to the next pair.
 */
static FORCE_INLINE int32_t seqJoinContinueCurrRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = &pInfo->stbJoin;

  if (!pStbJoin->ctx.post.isStarted) {
    return TSDB_CODE_SUCCESS;
  }

  qDebug("%s dynQueryCtrl continue to retrieve block from post op", GET_TASKID(pOperator->pTaskInfo));

  *ppRes = getNextBlockFromDownstream(pOperator, 1);
  if (NULL != *ppRes) {
    pStbJoin->execInfo.postBlkNum++;
    pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows;
    return TSDB_CODE_SUCCESS;
  }

  QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, pStbJoin));
  pStbJoin->ctx.prev.pListHead->readIdx++;

  return TSDB_CODE_SUCCESS;
}
971

972
/*
 * Append pVal to the array stored under pKey in pHash, creating the array
 * (element size valSize, initial capacity 10) when the key is not present yet.
 *
 * The hash stores SArray* values. On any failure while creating a new entry the
 * freshly created array is destroyed, so nothing is left behind.
 *
 * Returns TSDB_CODE_SUCCESS or the error that caused the failure.
 */
static FORCE_INLINE int32_t addToJoinVgroupHash(SSHashObj* pHash, void* pKey, int32_t keySize, void* pVal, int32_t valSize) {
  SArray** ppArray = tSimpleHashGet(pHash, pKey, keySize);
  if (NULL != ppArray) {
    if (NULL == taosArrayPush(*ppArray, pVal)) {
      return terrno;
    }
    return TSDB_CODE_SUCCESS;
  }

  SArray* pArray = taosArrayInit(10, valSize);
  if (NULL == pArray) {
    return terrno;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  if (NULL == taosArrayPush(pArray, pVal)) {
    // Read the error before running cleanup so it cannot be disturbed.
    code = terrno;
  } else {
    // Report the hash's own return code, as addToJoinTableHash() does.
    code = tSimpleHashPut(pHash, pKey, keySize, &pArray, POINTER_BYTES);
  }

  if (TSDB_CODE_SUCCESS != code) {
    taosArrayDestroy(pArray);
  }

  return code;
}
996

997
/*
 * Count one more occurrence of pKey in pHash (key -> uint32_t counter).
 *
 * pOnceHash tracks the keys seen exactly once: a key is added to it on first
 * sight and removed again when its counter goes from 1 to 2.
 *
 * Counter values 0 and UINT32_MAX are special: 0 is left untouched and
 * UINT32_MAX is reset to 0; every other value is incremented.
 */
static FORCE_INLINE int32_t addToJoinTableHash(SSHashObj* pHash, SSHashObj* pOnceHash, void* pKey, int32_t keySize) {
  int32_t   code = TSDB_CODE_SUCCESS;
  uint32_t* pCount = tSimpleHashGet(pHash, pKey, keySize);

  if (NULL == pCount) {
    // First occurrence: start counting and remember the key as "seen once".
    uint32_t first = 1;
    code = tSimpleHashPut(pHash, pKey, keySize, &first, sizeof(first));
    if (code) {
      return code;
    }
    code = tSimpleHashPut(pOnceHash, pKey, keySize, NULL, 0);
    if (code) {
      return code;
    }
    return TSDB_CODE_SUCCESS;
  }

  if (0 == *pCount) {
    return TSDB_CODE_SUCCESS;
  }

  if (UINT32_MAX == *pCount) {
    *pCount = 0;
    return TSDB_CODE_SUCCESS;
  }

  if (1 == *pCount) {
    // Second occurrence: the key no longer belongs to the "seen once" set.
    code = tSimpleHashRemove(pOnceHash, pKey, keySize);
    if (code) {
      qError("tSimpleHashRemove failed in addToJoinTableHash, error:%s", tstrerror(code));
      QRY_ERR_RET(code);
    }
  }
  (*pCount)++;

  return TSDB_CODE_SUCCESS;
}
1033

1034

1035
/*
 * Release one table-list node: its four vgId/uid arrays and the node itself.
 * A NULL node is ignored. The node's pNext link is not followed.
 */
static void freeStbJoinTableList(SStbJoinTableList* pList) {
  if (NULL != pList) {
    taosMemoryFree(pList->pLeftVg);
    taosMemoryFree(pList->pLeftUid);
    taosMemoryFree(pList->pRightVg);
    taosMemoryFree(pList->pRightUid);
    taosMemoryFree(pList);
  }
}
1045

1046
/*
 * Append a new node holding copies of `rows` (vgId, uid) pairs for the left
 * and right tables to the tail of the context's table list.
 *
 * The input arrays are copied, so the caller keeps ownership of them. If any
 * allocation fails, the partially built node is released and terrno is
 * returned; the list is left unchanged in that case.
 */
static int32_t appendStbJoinTableList(SStbJoinPrevJoinCtx* pCtx, int64_t rows, int32_t* pLeftVg, int64_t* pLeftUid, int32_t* pRightVg, int64_t* pRightUid) {
  int32_t            code = TSDB_CODE_SUCCESS;
  SStbJoinTableList* pNode = taosMemoryCalloc(1, sizeof(SStbJoinTableList));
  if (NULL == pNode) {
    return terrno;
  }

  pNode->pLeftVg = taosMemoryMalloc(rows * sizeof(*pLeftVg));
  if (NULL == pNode->pLeftVg) {
    goto _alloc_failed;
  }
  pNode->pLeftUid = taosMemoryMalloc(rows * sizeof(*pLeftUid));
  if (NULL == pNode->pLeftUid) {
    goto _alloc_failed;
  }
  pNode->pRightVg = taosMemoryMalloc(rows * sizeof(*pRightVg));
  if (NULL == pNode->pRightVg) {
    goto _alloc_failed;
  }
  pNode->pRightUid = taosMemoryMalloc(rows * sizeof(*pRightUid));
  if (NULL == pNode->pRightUid) {
    goto _alloc_failed;
  }

  TAOS_MEMCPY(pNode->pLeftVg, pLeftVg, rows * sizeof(*pLeftVg));
  TAOS_MEMCPY(pNode->pLeftUid, pLeftUid, rows * sizeof(*pLeftUid));
  TAOS_MEMCPY(pNode->pRightVg, pRightVg, rows * sizeof(*pRightVg));
  TAOS_MEMCPY(pNode->pRightUid, pRightUid, rows * sizeof(*pRightUid));

  pNode->readIdx = 0;
  pNode->uidNum = rows;
  pNode->pNext = NULL;

  // Link at the tail; an empty list gets the node as both head and tail.
  if (NULL == pCtx->pListTail) {
    pCtx->pListHead = pNode;
  } else {
    pCtx->pListTail->pNext = pNode;
  }
  pCtx->pListTail = pNode;

  return TSDB_CODE_SUCCESS;

_alloc_failed:
  // The node was zero-initialised, so arrays not yet allocated are NULL.
  code = terrno;
  freeStbJoinTableList(pNode);
  return code;
}
1096

1097
/*
 * Digest one block of (vgId, uid) pairs coming from the first downstream.
 *
 * The vgId/uid columns of both sides are located through the configured slot
 * ids. Then, depending on the fetch mode:
 *   - batch fetch: every left/right uid is grouped by its vgId in
 *     leftHash/rightHash;
 *   - otherwise: every left uid is counted in leftCache (with onceTable
 *     tracking uids seen once).
 * Finally the raw column data is appended to the table list and the table
 * counter is increased by the number of rows.
 *
 * Any failure is stored in the task info and raised through T_LONG_JMP.
 */
static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;

  SColumnInfoData* pLeftVgCol = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[0]);
  if (NULL == pLeftVgCol) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  SColumnInfoData* pRightVgCol = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[1]);
  if (NULL == pRightVgCol) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  SColumnInfoData* pLeftUidCol = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[0]);
  if (NULL == pLeftUidCol) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  SColumnInfoData* pRightUidCol = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[1]);
  if (NULL == pRightUidCol) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  if (!pStbJoin->basic.batchFetch) {
    for (int32_t row = 0; row < pBlock->info.rows; ++row) {
      int64_t* leftUid = (int64_t*)(pLeftUidCol->pData + pLeftUidCol->info.bytes * row);

      code = addToJoinTableHash(pPrev->leftCache, pPrev->onceTable, leftUid, sizeof(*leftUid));
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
    }
  } else {
    for (int32_t row = 0; row < pBlock->info.rows; ++row) {
      int32_t* leftVg = (int32_t*)(pLeftVgCol->pData + pLeftVgCol->info.bytes * row);
      int64_t* leftUid = (int64_t*)(pLeftUidCol->pData + pLeftUidCol->info.bytes * row);
      int32_t* rightVg = (int32_t*)(pRightVgCol->pData + pRightVgCol->info.bytes * row);
      int64_t* rightUid = (int64_t*)(pRightUidCol->pData + pRightUidCol->info.bytes * row);

      code = addToJoinVgroupHash(pPrev->leftHash, leftVg, sizeof(*leftVg), leftUid, sizeof(*leftUid));
      if (TSDB_CODE_SUCCESS == code) {
        code = addToJoinVgroupHash(pPrev->rightHash, rightVg, sizeof(*rightVg), rightUid, sizeof(*rightUid));
      }
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = appendStbJoinTableList(pPrev, pBlock->info.rows, (int32_t*)pLeftVgCol->pData, (int64_t*)pLeftUidCol->pData,
                                  (int32_t*)pRightVgCol->pData, (int64_t*)pRightUidCol->pData);
    if (TSDB_CODE_SUCCESS == code) {
      pPrev->tableNum += pBlock->info.rows;
    }
  }

_return:

  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }
}
42,254✔
1159

1160

1161
/*
 * Prune leftCache after all table pairs have been collected (non-batch mode
 * only; batch mode returns immediately).
 *
 * If leftCache and onceTable have the same size, leftCache is cleared.
 * Otherwise every entry of onceTable is removed from leftCache, and the
 * remaining size is recorded in execInfo.leftCacheNum.
 *
 * Removal failures are logged and otherwise ignored.
 */
static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;

  if (pStbJoin->basic.batchFetch) {
    return;
  }

  if (tSimpleHashGetSize(pPrev->leftCache) == tSimpleHashGetSize(pPrev->onceTable)) {
    tSimpleHashClear(pPrev->leftCache);
    return;
  }

  // NOTE(review): the iterator result is used directly as the uid key when
  // removing from leftCache - confirm against the simple-hash entry layout.
  int32_t   iter = 0;
  uint64_t* pUid = tSimpleHashIterate(pPrev->onceTable, NULL, &iter);
  for (; NULL != pUid; pUid = tSimpleHashIterate(pPrev->onceTable, pUid, &iter)) {
    int32_t code = tSimpleHashRemove(pPrev->leftCache, pUid, sizeof(*pUid));
    if (code) {
      qError("tSimpleHashRemove failed in postProcessStbJoinTableHash, error:%s", tstrerror(code));
    }
  }

  pStbJoin->execInfo.leftCacheNum = tSimpleHashGetSize(pPrev->leftCache);
  qDebug("more than 1 ref build table num %" PRId64, (int64_t)tSimpleHashGetSize(pPrev->leftCache));
}
1196

1197
/*
 * Drain the first downstream (index 0) and collect every table pair it yields.
 *
 * Each block updates the "prev" block statistics and is passed to
 * doBuildStbJoinTableHash(). When the downstream is exhausted the collected
 * hashes are post-processed and the build phase is marked as done.
 */
static void buildStbJoinTableList(SOperatorInfo* pOperator) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SSDataBlock*               pBlock = NULL;

  while (NULL != (pBlock = getNextBlockFromDownstream(pOperator, 0))) {
    pStbJoin->execInfo.prevBlkNum++;
    pStbJoin->execInfo.prevBlkRows += pBlock->info.rows;

    doBuildStbJoinTableHash(pOperator, pBlock);
  }

  postProcessStbJoinTableHash(pOperator);

  pStbJoin->ctx.prev.joinBuild = true;
}
216,981✔
1217

1218
/*
 * Launch joins for the remaining table pairs until one of them yields a block.
 *
 * The head node of the table list is consumed pair by pair; a fully consumed
 * node is unlinked and freed. For every pair the join is started:
 *   - if it returns a block, that block is handed back through *ppRes;
 *   - otherwise the end-of-retrieve bookkeeping runs and the next pair is tried.
 * When the list is exhausted, *ppRes is set to NULL and the operator is marked
 * completed.
 */
static int32_t seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
  SStbJoinTableList*         pNode = pPrev->pListHead;

  while (pNode) {
    if (pNode->readIdx >= pNode->uidNum) {
      pPrev->pListHead = pNode->pNext;
      if (NULL == pPrev->pListHead) {
        // The freed node was also the tail; do not leave a dangling pointer
        // that a later append would write through.
        pPrev->pListTail = NULL;
      }
      freeStbJoinTableList(pNode);
      pNode = pPrev->pListHead;
      continue;
    }

    seqJoinLaunchNewRetrieveImpl(pOperator, ppRes);
    if (*ppRes) {
      return TSDB_CODE_SUCCESS;
    }

    QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, pStbJoin));
    pPrev->pListHead->readIdx++;
  }

  *ppRes = NULL;
  setOperatorCompleted(pOperator);

  return TSDB_CODE_SUCCESS;
}
1246

1247
/*
 * Adjust a join result block to the operator's output block descriptor.
 *
 * A NULL block is accepted and left alone. Otherwise the block id is set to
 * the descriptor's data block id and, for every output slot beyond the columns
 * already present in the block, a column of the slot's type with capacity for
 * the block's rows is appended.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the
 * join info, its descriptor or a slot is missing, or the error of the failing
 * column operation.
 */
static int32_t seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) {
  if (NULL == pBlock) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == pStbJoin || NULL == pStbJoin->pOutputDataBlockDesc) {
    // pBlock is known to be non-NULL here; name the objects that are missing.
    qError("seqStableJoinComposeRes: pStbJoin or pOutputDataBlockDesc is NULL");
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  pBlock->info.id.blockId = pStbJoin->pOutputDataBlockDesc->dataBlockId;
  if (!pBlock->pDataBlock) {
    return TSDB_CODE_SUCCESS;
  }

  for (int i = (int)pBlock->pDataBlock->size; i < pStbJoin->pOutputDataBlockDesc->pSlots->length; i++) {
    SSlotDescNode* pSlot = (SSlotDescNode*)nodesListGetNode(pStbJoin->pOutputDataBlockDesc->pSlots, i);
    if (pSlot == NULL) {
      qError("seqStableJoinComposeRes: pSlot is NULL, i:%d", i);
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }

    SColumnInfoData colInfo = createColumnInfoData(pSlot->dataType.type, pSlot->dataType.bytes, pSlot->slotId);
    int32_t         code = colInfoDataEnsureCapacity(&colInfo, pBlock->info.rows, true);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
    code = blockDataAppendColInfo(pBlock, &colInfo);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
1276

1277
/*
 * Produce the next result block of the sequential super-table join.
 *
 * On the first call the table pairs are collected from downstream[0]; if that
 * yields no rows the operator completes immediately. Afterwards the join in
 * progress (if any) is continued, and when it has no more data a join for the
 * next table pair is launched.
 *
 * The time spent in the first call is recorded as the operator's open cost.
 * Errors are stored in the task info and raised through T_LONG_JMP; on success
 * the block (possibly NULL) is adjusted to the output descriptor and returned
 * through *pRes.
 */
int32_t seqStableJoin(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;

  QRY_PARAM_CHECK(pRes);
  if (OP_EXEC_DONE == pOperator->status) {
    return code;
  }

  // Only measured while the open cost has not been recorded yet.
  int64_t startUs = 0;
  if (0 == pOperator->cost.openCost) {
    startUs = taosGetTimestampUs();
  }

  if (!pStbJoin->ctx.prev.joinBuild) {
    buildStbJoinTableList(pOperator);
    if (pStbJoin->execInfo.prevBlkRows <= 0) {
      setOperatorCompleted(pOperator);
      goto _return;
    }
  }

  QRY_ERR_JRET(seqJoinContinueCurrRetrieve(pOperator, pRes));
  if (NULL != *pRes) {
    goto _return;
  }

  QRY_ERR_JRET(seqJoinLaunchNewRetrieve(pOperator, pRes));

_return:
  if (0 == pOperator->cost.openCost) {
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - startUs) / 1000.0;
  }

  if (!code) {
    code = seqStableJoinComposeRes(pStbJoin, *pRes);
  } else {
    qError("%s failed since %s", __func__, tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }

  return code;
}
1321

1322
/*
 * Allocate and fill an operator param for a virtual table scan of table `uid`.
 *
 * The param gets an empty children array and a SVTableScanOperatorParam value
 * carrying the uid, the scan window taken from pInfo->vtbScan and an empty
 * op-param array. On failure everything allocated here is released and *ppRes
 * ends up NULL.
 */
static int32_t buildVtbScanOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, uint64_t uid) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SVTableScanOperatorParam* pScanParam = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);

  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno);

  pScanParam = taosMemoryMalloc(sizeof(SVTableScanOperatorParam));
  QUERY_CHECK_NULL(pScanParam, code, lino, _return, terrno);

  pScanParam->pOpParamArray = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL(pScanParam->pOpParamArray, code, lino, _return, terrno);

  pScanParam->window = pInfo->vtbScan.window;
  pScanParam->uid = uid;

  (*ppRes)->value = pScanParam;
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;

_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (NULL != pScanParam) {
    taosArrayDestroy(pScanParam->pOpParamArray);
    taosMemoryFreeClear(pScanParam);
  }
  if (NULL != *ppRes) {
    taosArrayDestroy((*ppRes)->pChildren);
    taosMemoryFreeClear(*ppRes);
  }
  return code;
}
1357

1358
/*
 * Response callback for the "use db" request sent by buildDbVgInfoMap().
 *
 * param is the operator that issued the request. On success the response is
 * deserialized into vtbScan.pRsp, the message payload is freed and the
 * vtbScan.ready semaphore is posted. An RPC failure is converted with
 * rpcCvtErrCode() and stored in the task info.
 *
 * The message payload is released on the failure paths as well (it used to
 * leak there).
 *
 * NOTE(review): on the failure paths the ready semaphore is not posted, so a
 * thread blocked in tsem_wait() on it is not woken up by this callback.
 * Posting here is only safe once the waiter checks for failure before using
 * vtbScan.pRsp - to be fixed together with the waiter.
 */
int32_t dynProcessUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
  int32_t                    lino = 0;
  SOperatorInfo*             pOperator = (SOperatorInfo*)param;
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)pOperator->info;

  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = rpcCvtErrCode(code);
    if (pOperator->pTaskInfo->code != code) {
      qError("load systable rsp received, error:%s, cvted error:%s", tstrerror(code),
             tstrerror(pOperator->pTaskInfo->code));
    } else {
      qError("load systable rsp received, error:%s", tstrerror(code));
    }
    goto _return;
  }

  pScanResInfo->vtbScan.pRsp = taosMemoryMalloc(sizeof(SUseDbRsp));
  QUERY_CHECK_NULL(pScanResInfo->vtbScan.pRsp, code, lino, _return, terrno)

  code = tDeserializeSUseDbRsp(pMsg->pData, (int32_t)pMsg->len, pScanResInfo->vtbScan.pRsp);
  QUERY_CHECK_CODE(code, lino, _return);

  taosMemoryFreeClear(pMsg->pData);

  code = tsem_post(&pScanResInfo->vtbScan.ready);
  QUERY_CHECK_CODE(code, lino, _return);

  return code;

_return:
  // This callback releases the payload on success, so it must do so on
  // failure too. FreeClear makes a second release harmless.
  if (NULL != pMsg) {
    taosMemoryFreeClear(pMsg->pData);
  }
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  return code;
}
1390

1391
/*
 * Fetch the vgroup information of the database named by `name` and convert it
 * into `output`.
 *
 * A "use db" request is serialized and sent asynchronously with
 * dynProcessUseDbRsp() as the response callback; this function then blocks on
 * vtbScan.ready until the callback posts it, and builds the output from
 * vtbScan.pRsp. The request struct and the response are released before
 * returning.
 *
 * Fixes relative to the previous version:
 *   - the request struct is zero-initialised, so the fields that are not set
 *     here are serialized as zero instead of uninitialised memory;
 *   - a serialization failure now sets `code` (QUERY_CHECK_CODE only tests its
 *     argument, so the old path jumped to cleanup while still reporting success);
 *   - the serialized buffer is not freed here once it has been attached to the
 *     send info, which avoids a double free on the later error paths;
 *   - the response is only released when it exists.
 */
static int32_t buildDbVgInfoMap(SOperatorInfo* pOperator, SMsgCb* pMsgCb, SName* name, SExecTaskInfo* pTaskInfo, SUseDbOutput* output) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    lino = 0;
  char*                      buf1 = NULL;
  SUseDbReq*                 pReq = NULL;
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)pOperator->info;

  pReq = taosMemoryCalloc(1, sizeof(SUseDbReq));
  QUERY_CHECK_NULL(pReq, code, lino, _return, terrno)
  code = tNameGetFullDbName(name, pReq->db);
  QUERY_CHECK_CODE(code, lino, _return);

  int32_t contLen = tSerializeSUseDbReq(NULL, 0, pReq);
  if (contLen < 0) {
    code = (TSDB_CODE_SUCCESS != terrno) ? terrno : TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    lino = __LINE__;
    goto _return;
  }
  buf1 = taosMemoryCalloc(1, contLen);
  QUERY_CHECK_NULL(buf1, code, lino, _return, terrno)
  int32_t tempRes = tSerializeSUseDbReq(buf1, contLen, pReq);
  if (tempRes < 0) {
    code = (TSDB_CODE_SUCCESS != terrno) ? terrno : TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    lino = __LINE__;
    goto _return;
  }

  // send the fetch remote task result request
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  QUERY_CHECK_NULL(pMsgSendInfo, code, lino, _return, terrno)

  pMsgSendInfo->param = pOperator;
  pMsgSendInfo->msgInfo.pData = buf1;
  pMsgSendInfo->msgInfo.len = contLen;
  pMsgSendInfo->msgType = TDMT_MND_GET_DB_INFO;
  pMsgSendInfo->fp = dynProcessUseDbRsp;
  pMsgSendInfo->requestId = pTaskInfo->id.queryId;

  // The buffer now belongs to the send info, which is handed to the transport
  // below (this function never frees the send info itself); drop the local
  // reference so the error path does not free it a second time.
  // NOTE(review): assumes asyncSendMsgToServer() releases the send info and
  // its payload on failure - confirm.
  buf1 = NULL;

  code = asyncSendMsgToServer(pMsgCb->clientRpc, &pScanResInfo->vtbScan.epSet, NULL, pMsgSendInfo);
  QUERY_CHECK_CODE(code, lino, _return);

  code = tsem_wait(&pScanResInfo->vtbScan.ready);
  QUERY_CHECK_CODE(code, lino, _return);

  code = queryBuildUseDbOutput(output, pScanResInfo->vtbScan.pRsp);
  QUERY_CHECK_CODE(code, lino, _return);

_return:
  if (code) {
     qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
     taosMemoryFree(buf1);
  }
  taosMemoryFree(pReq);
  if (NULL != pScanResInfo->vtbScan.pRsp) {
    tFreeSUsedbRsp(pScanResInfo->vtbScan.pRsp);
    taosMemoryFreeClear(pScanResInfo->vtbScan.pRsp);
  }
  return code;
}
1440

1441
int dynVgInfoComp(const void* lp, const void* rp) {
204,570✔
1442
  SVgroupInfo* pLeft = (SVgroupInfo*)lp;
204,570✔
1443
  SVgroupInfo* pRight = (SVgroupInfo*)rp;
204,570✔
1444
  if (pLeft->hashBegin < pRight->hashBegin) {
204,570✔
1445
    return -1;
204,570✔
1446
  } else if (pLeft->hashBegin > pRight->hashBegin) {
×
1447
    return 1;
×
1448
  }
1449

1450
  return 0;
×
1451
}
1452

1453
/*
 * Make sure dbInfo->vgArray exists: a copy of every entry of dbInfo->vgHash,
 * sorted with sort_func.
 *
 * Nothing is done when dbInfo is NULL, when there is no vgHash, or when the
 * array has already been built.
 *
 * If copying an entry fails, the partially filled array is destroyed and
 * vgArray is reset to NULL. Previously the partial, unsorted array stayed in
 * place, so later calls took it for a complete one.
 *
 * Returns TSDB_CODE_SUCCESS or terrno of the failing allocation.
 */
int32_t dynMakeVgArraySortBy(SDBVgInfo* dbInfo, __compar_fn_t sort_func) {
  if (NULL == dbInfo) {
    return TSDB_CODE_SUCCESS;
  }

  if (dbInfo->vgHash && NULL == dbInfo->vgArray) {
    int32_t vgSize = taosHashGetSize(dbInfo->vgHash);
    dbInfo->vgArray = taosArrayInit(vgSize, sizeof(SVgroupInfo));
    if (NULL == dbInfo->vgArray) {
      return terrno;
    }

    void* pIter = taosHashIterate(dbInfo->vgHash, NULL);
    while (pIter) {
      if (NULL == taosArrayPush(dbInfo->vgArray, pIter)) {
        int32_t code = terrno;  // capture before cleanup
        taosHashCancelIterate(dbInfo->vgHash, pIter);
        taosArrayDestroy(dbInfo->vgArray);
        dbInfo->vgArray = NULL;
        return code;
      }

      pIter = taosHashIterate(dbInfo->vgHash, pIter);
    }

    taosArraySort(dbInfo->vgArray, sort_func);
  }

  return TSDB_CODE_SUCCESS;
}
1480

1481
int32_t dynHashValueComp(void const* lp, void const* rp) {
702,296✔
1482
  uint32_t*    key = (uint32_t*)lp;
702,296✔
1483
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
702,296✔
1484

1485
  if (*key < pVg->hashBegin) {
702,296✔
1486
    return -1;
×
1487
  } else if (*key > pVg->hashEnd) {
702,296✔
1488
    return 1;
257,404✔
1489
  }
1490

1491
  return 0;
444,892✔
1492
}
1493

1494
/*
 * Resolve the vgroup id that table `tbName` of database `dbFName` hashes to.
 *
 * The sorted vgroup array of dbInfo is built on demand, the hash value of
 * "<dbFName>.<tbName>" is computed with the database's hash settings, and the
 * vgroup whose hash range contains that value is looked up.
 *
 * Returns TSDB_CODE_SUCCESS with *vgId set, or:
 *   - TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when dbInfo is NULL (previously a
 *     NULL dbInfo was dereferenced);
 *   - the error of dynMakeVgArraySortBy();
 *   - TSDB_CODE_TSC_DB_NOT_SELECTED when the database has no vgroups;
 *   - TSDB_CODE_CTG_INTERNAL_ERROR when no hash range matches.
 */
int32_t getVgId(SDBVgInfo* dbInfo, char* dbFName, int32_t* vgId, char *tbName) {
  int32_t code = 0;
  int32_t lino = 0;

  if (NULL == dbInfo) {
    qError("db vgroup info is NULL, can not get vgId");
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  code = dynMakeVgArraySortBy(dbInfo, dynVgInfoComp);
  QUERY_CHECK_CODE(code, lino, _return);

  int32_t vgNum = (int32_t)taosArrayGetSize(dbInfo->vgArray);
  if (vgNum <= 0) {
    qError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum);
    QUERY_CHECK_CODE(code = TSDB_CODE_TSC_DB_NOT_SELECTED, lino, _return);
  }

  // "<db>.<table>", truncated to the buffer size if necessary.
  char tbFullName[TSDB_TABLE_FNAME_LEN];
  (void)snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbFName, tbName);

  uint32_t hashValue = taosGetTbHashVal(tbFullName, (int32_t)strlen(tbFullName), dbInfo->hashMethod,
                                        dbInfo->hashPrefix, dbInfo->hashSuffix);

  SVgroupInfo* vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, dynHashValueComp, TD_EQ);
  if (NULL == vgInfo) {
    qError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName,
           (int32_t)taosArrayGetSize(dbInfo->vgArray));
    return TSDB_CODE_CTG_INTERNAL_ERROR;
  }

  *vgId = vgInfo->vgId;

_return:
  return code;
}
1527

1528
/*
 * Tell whether column `colId` has to be scanned by the virtual table scan.
 *
 * Returns true when the operator scans all columns or when `colId` appears
 * in the operator's readColList; false otherwise.
 */
bool colNeedScan(SOperatorInfo* pOperator, col_id_t colId) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SVtbScanDynCtrlInfo*       pScan = (SVtbScanDynCtrlInfo*)&pCtrl->vtbScan;
  SArray*                    pReadCols = pScan->readColList;

  if (pScan->scanAllCols) {
    return true;
  }

  int32_t idx = 0;
  while (idx < taosArrayGetSize(pReadCols)) {
    col_id_t* pCandidate = (col_id_t*)taosArrayGet(pReadCols, idx);
    if (*pCandidate == colId) {
      return true;
    }
    idx++;
  }

  return false;
}
1542

1543
/*
 * Return the vgroup information of the database named by `name`.
 *
 * The result is looked up in the operator's dbVgInfoMap (keyed by
 * name->dbname). On a miss a new SUseDbOutput is allocated, filled through
 * buildDbVgInfoMap() and stored in the map before being returned.
 *
 * pOperator: dynamic query control operator owning the cache.
 * name:      table name whose dbname selects the database.
 * dbVgInfo:  out parameter receiving the cached SDBVgInfo pointer.
 *
 * Returns TSDB_CODE_SUCCESS on success, otherwise the failing step's error
 * code (terrno when the allocation fails).
 */
int32_t getDbVgInfo(SOperatorInfo* pOperator, SName *name, SDBVgInfo **dbVgInfo) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SMsgCb*                    pMsgCb = pVtbScan->pMsgCb;
  SUseDbOutput*              output = NULL;
  SUseDbOutput**             find = (SUseDbOutput**)taosHashGet(pInfo->vtbScan.dbVgInfoMap, name->dbname, strlen(name->dbname));

  QRY_PARAM_CHECK(dbVgInfo);

  if (find == NULL) {
    output = taosMemoryMalloc(sizeof(SUseDbOutput));
    if (output == NULL) {
      // Nothing has been built yet, so report the failure directly instead of
      // handing a NULL pointer to buildDbVgInfoMap() / freeUseDbOutput().
      code = terrno;
      line = __LINE__;
      qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
      return code;
    }
    code = buildDbVgInfoMap(pOperator, pMsgCb, name, pTaskInfo, output);
    QUERY_CHECK_CODE(code, line, _return);
    code = taosHashPut(pInfo->vtbScan.dbVgInfoMap, name->dbname, strlen(name->dbname), &output, POINTER_BYTES);
    QUERY_CHECK_CODE(code, line, _return);
  } else {
    output = *find;
  }

  *dbVgInfo = output->dbVgroup;
  return code;

_return:
  // Only reached on the cache-miss path, where `output` was allocated here
  // and has not been stored in the map.
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  freeUseDbOutput(output);
  return code;
}
1572

1573
int32_t extractColRefName(const char *colref, char **refDb, char** refTb, char** refCol) {
1,103,014✔
1574
  int32_t     code = TSDB_CODE_SUCCESS;
1,103,014✔
1575
  int32_t     line = 0;
1,103,014✔
1576

1577
  const char *first_dot = strchr(colref, '.');
1,103,014✔
1578
  QUERY_CHECK_NULL(first_dot, code, line, _return, terrno)
1,103,014✔
1579

1580
  const char *second_dot = strchr(first_dot + 1, '.');
1,103,014✔
1581
  QUERY_CHECK_NULL(second_dot, code, line, _return, terrno)
1,103,014✔
1582

1583
  size_t db_len = first_dot - colref;
1,103,014✔
1584
  size_t table_len = second_dot - first_dot - 1;
1,103,014✔
1585
  size_t col_len = strlen(second_dot + 1);
1,103,014✔
1586

1587
  *refDb = taosMemoryMalloc(db_len + 1);
1,103,014✔
1588
  *refTb = taosMemoryMalloc(table_len + 1);
1,103,014✔
1589
  *refCol = taosMemoryMalloc(col_len + 1);
1,103,014✔
1590
  QUERY_CHECK_NULL(*refDb, code, line, _return, terrno)
1,103,014✔
1591
  QUERY_CHECK_NULL(*refTb, code, line, _return, terrno)
1,103,014✔
1592
  QUERY_CHECK_NULL(*refCol, code, line, _return, terrno)
1,103,014✔
1593

1594
  tstrncpy(*refDb, colref, db_len + 1);
1,103,014✔
1595
  tstrncpy(*refTb, first_dot + 1, table_len + 1);
1,103,014✔
1596
  tstrncpy(*refCol, second_dot + 1, col_len + 1);
1,103,014✔
1597

1598
  return TSDB_CODE_SUCCESS;
1,103,014✔
1599
_return:
×
1600
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
1601
  if (*refDb) {
×
1602
    taosMemoryFree(*refDb);
×
1603
    *refDb = NULL;
×
1604
  }
1605
  if (*refTb) {
×
1606
    taosMemoryFree(*refTb);
×
1607
    *refTb = NULL;
×
1608
  }
1609
  if (*refCol) {
×
1610
    taosMemoryFree(*refCol);
×
1611
    *refCol = NULL;
×
1612
  }
1613
  return code;
×
1614
}
1615

1616
/*
 * Check whether a (db, table) pair read from a data block is the expected one.
 *
 * dbName / tbName are accessed through varDataVal()/varDataLen();
 * expectDbName / expectTbName are NUL-terminated C strings.
 *
 * Returns true only when both names have the same length and content as the
 * expected ones.
 */
bool tableInfoNeedCollect(char *dbName, char *tbName, char *expectDbName, char *expectTbName) {
  bool tbMatches = strncmp(varDataVal(tbName), expectTbName, varDataLen(tbName)) == 0 &&
                   strlen(expectTbName) == varDataLen(tbName);
  if (!tbMatches) {
    return false;
  }

  bool dbMatches = strncmp(varDataVal(dbName), expectDbName, varDataLen(dbName)) == 0 &&
                   strlen(expectDbName) == varDataLen(dbName);
  return dbMatches;
}
1625

1626
/*
 * Fill `pInfo` from row `index` of a column-reference data block.
 *
 * Columns read from pDataBlock: 3 = column name, 4 = uid, 5 = column id,
 * 6 = column reference, 7 = vgroup id. Column 8 is only required to exist.
 *
 * pInfo->colName always receives a heap copy of the column name;
 * pInfo->colrefName receives a heap copy of the reference or NULL when the
 * reference cell is NULL. uid / colId / vgId are written only when their
 * cells are not NULL, so callers should zero-initialise `pInfo`.
 *
 * Returns TSDB_CODE_SUCCESS or terrno. When it fails after allocating
 * colrefName, that string is released and the field reset to NULL, because
 * the callers in this file jump to their error label without destroying
 * `pInfo`.
 */
int32_t getColRefInfo(SColRefInfo *pInfo, SArray* pDataBlock, int32_t index) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          line = 0;
  char*            pAllocRefName = NULL;  // colrefName allocated by this call, if any

  SColumnInfoData *pColNameCol = taosArrayGet(pDataBlock, 3);
  SColumnInfoData *pUidCol = taosArrayGet(pDataBlock, 4);
  SColumnInfoData *pColIdCol = taosArrayGet(pDataBlock, 5);
  SColumnInfoData *pRefCol = taosArrayGet(pDataBlock, 6);
  SColumnInfoData *pVgIdCol = taosArrayGet(pDataBlock, 7);
  SColumnInfoData *pRefVerCol = taosArrayGet(pDataBlock, 8);

  QUERY_CHECK_NULL(pColNameCol, code, line, _return, terrno);
  QUERY_CHECK_NULL(pUidCol, code, line, _return, terrno);
  QUERY_CHECK_NULL(pColIdCol, code, line, _return, terrno);
  QUERY_CHECK_NULL(pRefCol, code, line, _return, terrno);
  QUERY_CHECK_NULL(pVgIdCol, code, line, _return, terrno);
  QUERY_CHECK_NULL(pRefVerCol, code, line, _return, terrno);

  if (colDataIsNull_s(pRefCol, index)) {
    pInfo->colrefName = NULL;
  } else {
    char* pRefData = colDataGetData(pRefCol, index);
    // varDataTLen() bytes are enough for the payload plus the terminator.
    pAllocRefName = taosMemoryCalloc(varDataTLen(pRefData), 1);
    pInfo->colrefName = pAllocRefName;
    QUERY_CHECK_NULL(pInfo->colrefName, code, line, _return, terrno);
    memcpy(pInfo->colrefName, varDataVal(pRefData), varDataLen(pRefData));
    pInfo->colrefName[varDataLen(pRefData)] = 0;
  }

  char* pNameData = colDataGetData(pColNameCol, index);
  pInfo->colName = taosMemoryCalloc(varDataTLen(pNameData), 1);
  QUERY_CHECK_NULL(pInfo->colName, code, line, _return, terrno);
  memcpy(pInfo->colName, varDataVal(pNameData), varDataLen(pNameData));
  pInfo->colName[varDataLen(pNameData)] = 0;

  if (!colDataIsNull_s(pUidCol, index)) {
    GET_TYPED_DATA(pInfo->uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, index), 0);
  }
  if (!colDataIsNull_s(pColIdCol, index)) {
    GET_TYPED_DATA(pInfo->colId, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pColIdCol, index), 0);
  }
  if (!colDataIsNull_s(pVgIdCol, index)) {
    GET_TYPED_DATA(pInfo->vgId, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pVgIdCol, index), 0);
  }

  return code;

_return:
  // Release only what this call allocated; other fields are left untouched.
  if (pAllocRefName != NULL) {
    taosMemoryFree(pAllocRefName);
    pInfo->colrefName = NULL;
  }
  return code;
}
1671

1672
/*
 * Compare the vgroups referenced by the current scan (curOrgTbVg) with the
 * already known ones (existOrgTbVg) and request a redeploy when new vgroups
 * appeared.
 *
 * Does nothing when the task has no stream runtime info. On the first call
 * (existOrgTbVg == NULL) curOrgTbVg simply becomes existOrgTbVg. Otherwise
 * the vgroup ids missing from existOrgTbVg are collected; if there are any,
 * they are published through vtableDeployInfo.addVgIds (together with the
 * node ids of a pending addedVgInfo list), the deploy uid is stored,
 * curOrgTbVg is cleared, needRedeploy / rversion are updated and
 * TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY is returned.
 *
 * Returns TSDB_CODE_SUCCESS when nothing has to be redeployed, or terrno on
 * allocation failure.
 */
int32_t processOrgTbVg(SVtbScanDynCtrlInfo* pVtbScan, SExecTaskInfo* pTaskInfo, int32_t rversion) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  void*                      pCurIter = NULL;
  SArray*                    tmpArray = NULL;     // new vgroup ids, owned here until published
  SArray*                    expiredInfo = NULL;  // addedVgInfo list taken over by this call

  if (pTaskInfo->pStreamRuntimeInfo == NULL) {
    return code;
  }

  if (pVtbScan->existOrgTbVg == NULL) {
    pVtbScan->existOrgTbVg = pVtbScan->curOrgTbVg;
    pVtbScan->curOrgTbVg = NULL;
  }

  if (pVtbScan->curOrgTbVg == NULL) {
    return code;
  }

  // which means rversion has changed
  while ((pCurIter = taosHashIterate(pVtbScan->curOrgTbVg, pCurIter))) {
    int32_t* vgId = (int32_t*)taosHashGetKey(pCurIter, NULL);
    if (taosHashGet(pVtbScan->existOrgTbVg, vgId, sizeof(int32_t)) == NULL) {
      if (tmpArray == NULL) {
        tmpArray = taosArrayInit(1, sizeof(int32_t));
        QUERY_CHECK_NULL(tmpArray, code, line, _return, terrno);
      }
      QUERY_CHECK_NULL(taosArrayPush(tmpArray, vgId), code, line, _return, terrno);
    }
  }

  if (tmpArray == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  if (pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addVgIds == NULL) {
    SArray* snapshot = atomic_load_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo);
    if (snapshot && snapshot == atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo, snapshot, NULL)) {
      // The exchange succeeded, so this call now owns the list.
      expiredInfo = snapshot;
      for (int32_t i = 0; i < taosArrayGetSize(expiredInfo); i++) {
        SStreamTaskAddr* vgInfo = (SStreamTaskAddr*)taosArrayGet(expiredInfo, i);
        QUERY_CHECK_NULL(taosArrayPush(tmpArray, &vgInfo->nodeId), code, line, _return, terrno);
      }
      taosArrayDestroy(expiredInfo);
      expiredInfo = NULL;
    }
    if (atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addVgIds, NULL, tmpArray)) {
      // Somebody else published a list first; ours is not needed.
      taosArrayDestroy(tmpArray);
    }
  } else {
    // A list is already published; release ours instead of leaking it.
    taosArrayDestroy(tmpArray);
  }
  tmpArray = NULL;

  atomic_store_64(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.uid, (int64_t)(pVtbScan->isSuperTable ? pVtbScan->suid : pVtbScan->uid));
  (void)atomic_val_compare_exchange_8(pTaskInfo->pStreamRuntimeInfo->vtableDeployGot, 0, 1);
  taosHashClear(pVtbScan->curOrgTbVg);
  pVtbScan->needRedeploy = true;
  pVtbScan->rversion = rversion;
  return TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY;

_return:
  if (pCurIter != NULL) {
    // The iteration was left early; release the iterator.
    taosHashCancelIterate(pVtbScan->curOrgTbVg, pCurIter);
  }
  taosArrayDestroy(expiredInfo);
  taosArrayDestroy(tmpArray);
  qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  return code;
}
1727

1728
/*
 * Resolve the vgroup id of the table named in a "<db>.<table>.<column>"
 * column reference.
 *
 * The reference is split with extractColRefName(), the database vgroup info
 * is obtained through getDbVgInfo() and the id is computed by getVgId().
 *
 * Returns TSDB_CODE_SUCCESS with *vgId set, or the first failing step's code.
 */
int32_t getVgIdFromColref(SOperatorInfo* pOperator, const char* colRef, int32_t* vgId) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SDBVgInfo*                 pDbVg = NULL;
  SName                      tbFullName = {0};
  char                       dbFullName[TSDB_DB_FNAME_LEN] = {0};
  char*                      dbPart = NULL;
  char*                      tbPart = NULL;
  char*                      colPart = NULL;

  code = extractColRefName(colRef, &dbPart, &tbPart, &colPart);
  QUERY_CHECK_CODE(code, line, _return);

  toName(pCtrl->vtbScan.acctId, dbPart, tbPart, &tbFullName);

  code = getDbVgInfo(pOperator, &tbFullName, &pDbVg);
  QUERY_CHECK_CODE(code, line, _return);

  code = tNameGetFullDbName(&tbFullName, dbFullName);
  QUERY_CHECK_CODE(code, line, _return);

  code = getVgId(pDbVg, dbFullName, vgId, tbFullName.tname);
  QUERY_CHECK_CODE(code, line, _return);

_return:
  // The name parts are released on both the success and the failure path.
  taosMemoryFree(dbPart);
  taosMemoryFree(tbPart);
  taosMemoryFree(colPart);
  return code;
}
1759

1760
/*
 * Read all rows from the system-table scan (pDownstream[1]) and group the
 * column references of the virtual super table by child table.
 *
 * For every row whose db / super table name matches vtbScan.dbName /
 * vtbScan.tbName an SColRefInfo is built and appended to the child table's
 * array in childTableList; childTableMap maps the child table name to its
 * index in that list. Rows are skipped when dynTbUid is set and differs from
 * the row's uid. When the task has stream runtime info, the vgroup id of
 * every referenced table is recorded in curOrgTbVg and processOrgTbVg()
 * decides whether a redeploy is needed.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered. An SColRefInfo
 * that has not been handed over to a list yet is destroyed on failure.
 */
int32_t buildVirtualSuperTableScanChildTableMap(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[1];
  SColRefInfo                info = {0};
  bool                       infoPending = false;  // info holds strings not yet owned by a list
  SArray*                    pNewArray = NULL;     // created but not yet stored in childTableList

  pVtbScan->childTableMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  QUERY_CHECK_NULL(pVtbScan->childTableMap, code, line, _return, terrno);

  while (true) {
    SSDataBlock *pChildInfo = NULL;
    code = pSystableScanOp->fpSet.getNextFn(pSystableScanOp, &pChildInfo);
    QUERY_CHECK_CODE(code, line, _return);
    if (pChildInfo == NULL) {
      break;
    }
    SColumnInfoData *pTableNameCol = taosArrayGet(pChildInfo->pDataBlock, 0);
    SColumnInfoData *pStbNameCol = taosArrayGet(pChildInfo->pDataBlock, 1);
    SColumnInfoData *pDbNameCol = taosArrayGet(pChildInfo->pDataBlock, 2);

    QUERY_CHECK_NULL(pTableNameCol, code, line, _return, terrno);
    QUERY_CHECK_NULL(pStbNameCol, code, line, _return, terrno);
    QUERY_CHECK_NULL(pDbNameCol, code, line, _return, terrno);

    for (int32_t i = 0; i < pChildInfo->info.rows; i++) {
      if (colDataIsNull_s(pStbNameCol, i)) {
        continue;
      }

      char* stbrawname = colDataGetData(pStbNameCol, i);
      char* dbrawname = colDataGetData(pDbNameCol, i);
      char* ctbName = colDataGetData(pTableNameCol, i);

      if (!tableInfoNeedCollect(dbrawname, stbrawname, pInfo->vtbScan.dbName, pInfo->vtbScan.tbName)) {
        continue;
      }

      memset(&info, 0, sizeof(info));
      infoPending = true;
      code = getColRefInfo(&info, pChildInfo->pDataBlock, i);
      QUERY_CHECK_CODE(code, line, _return);

      if (pInfo->vtbScan.dynTbUid != 0 && info.uid != pInfo->vtbScan.dynTbUid) {
        qTrace("dynQueryCtrl tb uid filter, info uid:%" PRIu64 ", dyn tb uid:%" PRIu64, info.uid,
               pInfo->vtbScan.dynTbUid);

        destroyColRefInfo(&info);
        infoPending = false;
        continue;
      }

      if (pTaskInfo->pStreamRuntimeInfo) {
        if (pVtbScan->curOrgTbVg == NULL) {
          pVtbScan->curOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
          QUERY_CHECK_NULL(pVtbScan->curOrgTbVg, code, line, _return, terrno);
        }

        if (info.colrefName) {
          int32_t vgId = 0;
          code = getVgIdFromColref(pOperator, info.colrefName, &vgId);
          QUERY_CHECK_CODE(code, line, _return);
          code = taosHashPut(pVtbScan->curOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
          QUERY_CHECK_CODE(code, line, _return);
        }
      }

      // One lookup decides between "first column of a new child table" and
      // "another column of a known child table".
      SArray*  pColRefArray = NULL;
      int32_t* pTableIdx = (int32_t*)taosHashGet(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName));
      if (pTableIdx == NULL) {
        pNewArray = taosArrayInit(1, sizeof(SColRefInfo));
        QUERY_CHECK_NULL(pNewArray, code, line, _return, terrno);
        int32_t tableIdx = (int32_t)taosArrayGetSize(pVtbScan->childTableList);
        QUERY_CHECK_NULL(taosArrayPush(pVtbScan->childTableList, &pNewArray), code, line, _return, terrno);
        pColRefArray = pNewArray;
        pNewArray = NULL;  // now reachable through childTableList
        code = taosHashPut(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName), &tableIdx, sizeof(tableIdx));
        QUERY_CHECK_CODE(code, line, _return);
      } else {
        pColRefArray = (SArray *)taosArrayGetP(pVtbScan->childTableList, *pTableIdx);
        QUERY_CHECK_NULL(pColRefArray, code, line, _return, terrno);
      }

      QUERY_CHECK_NULL(taosArrayPush(pColRefArray, &info), code, line, _return, terrno);
      infoPending = false;  // the array now holds the strings of info
    }
  }

  code = processOrgTbVg(pVtbScan, pTaskInfo, 1);
  QUERY_CHECK_CODE(code, line, _return);

_return:
  if (infoPending) {
    destroyColRefInfo(&info);
  }
  taosArrayDestroy(pNewArray);
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
1851

1852
/*
 * Read all rows from the system-table scan (pDownstream[1]) and collect the
 * column references of one virtual normal / child table into
 * vtbScan.colRefInfo.
 *
 * Rows are selected by comparing their db / table name with vtbScan.dbName /
 * vtbScan.tbName. The last non-NULL value of column 8 is kept as `rversion`.
 * When rversion differs from vtbScan.rversion (or existOrgTbVg is not built
 * yet), the vgroup id of each referenced table is recorded in curOrgTbVg;
 * processOrgTbVg() then decides whether a redeploy is needed.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered, including an
 * error reported by the downstream scan.
 */
int32_t buildVirtualNormalChildTableScanChildTableMap(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SOperatorInfo*             pSystableScanOp = pOperator->pDownstream[1];
  int32_t                    rversion = 0;
  SColRefInfo                info = {0};
  bool                       infoPending = false;  // info holds strings not yet owned by colRefInfo

  pInfo->vtbScan.colRefInfo = taosArrayInit(1, sizeof(SColRefInfo));
  QUERY_CHECK_NULL(pInfo->vtbScan.colRefInfo, code, line, _return, terrno);

  while (true) {
    SSDataBlock *pTableInfo = NULL;
    code = pSystableScanOp->fpSet.getNextFn(pSystableScanOp, &pTableInfo);
    // Without this check a downstream failure would be overwritten by the
    // processOrgTbVg() result below.
    QUERY_CHECK_CODE(code, line, _return);
    if (pTableInfo == NULL) {
      break;
    }

    SColumnInfoData *pTableNameCol = taosArrayGet(pTableInfo->pDataBlock, 0);
    SColumnInfoData *pDbNameCol = taosArrayGet(pTableInfo->pDataBlock, 2);
    SColumnInfoData *pRefVerCol = taosArrayGet(pTableInfo->pDataBlock, 8);

    QUERY_CHECK_NULL(pTableNameCol, code, line, _return, terrno);
    QUERY_CHECK_NULL(pDbNameCol, code, line, _return, terrno);
    QUERY_CHECK_NULL(pRefVerCol, code, line, _return, terrno);

    for (int32_t i = 0; i < pTableInfo->info.rows; i++) {
      if (!colDataIsNull_s(pRefVerCol, i)) {
        GET_TYPED_DATA(rversion, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pRefVerCol, i), 0);
      }

      if (colDataIsNull_s(pTableNameCol, i)) {
        continue;
      }

      char* tbrawname = colDataGetData(pTableNameCol, i);
      char* dbrawname = colDataGetData(pDbNameCol, i);
      QUERY_CHECK_NULL(tbrawname, code, line, _return, terrno);
      QUERY_CHECK_NULL(dbrawname, code, line, _return, terrno);

      if (!tableInfoNeedCollect(dbrawname, tbrawname, pInfo->vtbScan.dbName, pInfo->vtbScan.tbName)) {
        continue;
      }

      memset(&info, 0, sizeof(info));
      infoPending = true;
      code = getColRefInfo(&info, pTableInfo->pDataBlock, i);
      QUERY_CHECK_CODE(code, line, _return);

      if ((rversion != pVtbScan->rversion || pVtbScan->existOrgTbVg == NULL) && info.colrefName) {
        if (pVtbScan->curOrgTbVg == NULL) {
          pVtbScan->curOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
          QUERY_CHECK_NULL(pVtbScan->curOrgTbVg, code, line, _return, terrno);
        }
        int32_t vgId = 0;
        code = getVgIdFromColref(pOperator, info.colrefName, &vgId);
        QUERY_CHECK_CODE(code, line, _return);
        code = taosHashPut(pVtbScan->curOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
        QUERY_CHECK_CODE(code, line, _return);
      }

      QUERY_CHECK_NULL(taosArrayPush(pInfo->vtbScan.colRefInfo, &info), code, line, _return, terrno);
      infoPending = false;  // colRefInfo now holds the strings of info
    }
  }

  code = processOrgTbVg(pVtbScan, pTaskInfo, rversion);
  QUERY_CHECK_CODE(code, line, _return);

_return:
  if (infoPending) {
    destroyColRefInfo(&info);
  }
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
1922

1923
/*
 * Consume the list of newly deployed vgroups (vtableDeployInfo.addedVgInfo)
 * after a redeploy was requested.
 *
 * When a list is available and this call wins the compare-exchange that
 * detaches it, every entry's nodeId is added to existOrgTbVg and the full
 * SStreamTaskAddr is stored in newAddedVgInfo (created on demand);
 * needRedeploy is then cleared. Otherwise the redeploy is still pending and
 * TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY is returned.
 *
 * The detached list is destroyed before returning. A list this call failed
 * to detach is not touched, since it does not belong to this call.
 */
int32_t virtualTableScanCheckNeedRedeploy(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SArray*                    pOwned = NULL;  // set only after the list was successfully detached

  SArray* pSnapshot = atomic_load_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo);
  if (pSnapshot && pSnapshot == atomic_val_compare_exchange_ptr(&pTaskInfo->pStreamRuntimeInfo->vtableDeployInfo.addedVgInfo, pSnapshot, NULL)) {
    pOwned = pSnapshot;
    for (int32_t i = 0; i < taosArrayGetSize(pOwned); i++) {
      SStreamTaskAddr* pTaskAddr = (SStreamTaskAddr*)taosArrayGet(pOwned, i);
      code = taosHashPut(pVtbScan->existOrgTbVg, &pTaskAddr->nodeId, sizeof(pTaskAddr->nodeId), NULL, 0);
      QUERY_CHECK_CODE(code, line, _return);
      if (pVtbScan->newAddedVgInfo == NULL) {
        pVtbScan->newAddedVgInfo = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
        QUERY_CHECK_NULL(pVtbScan->newAddedVgInfo, code, line, _return, terrno);
      }
      code = taosHashPut(pVtbScan->newAddedVgInfo, &pTaskAddr->nodeId, sizeof(pTaskAddr->nodeId), pTaskAddr, sizeof(SStreamTaskAddr));
      QUERY_CHECK_CODE(code, line, _return);
    }
    pVtbScan->needRedeploy = false;
  } else {
    code = TSDB_CODE_STREAM_VTABLE_NEED_REDEPLOY;
    QUERY_CHECK_CODE(code, line, _return);
  }

_return:
  taosArrayDestroy(pOwned);
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
1958

1959
/*
 * Translate the column references of one virtual table into the per-origin-
 * table column maps stored in pVtbScan->orgTbVgColMap.
 *
 * pColRefInfo: array of SColRefInfo of the table being scanned.
 * uid, vgId:   receive the uid / vgId of the last array element.
 *
 * For every entry that has a reference and whose column must be scanned
 * (colNeedScan), the reference is split into db / table / column, the origin
 * table's full name is built and a {colId, colName} pair is appended to the
 * SOrgTbInfo stored under that name. A missing SOrgTbInfo is created with the
 * vgroup id obtained from getVgId().
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered; the temporary
 * name strings and a column map not yet stored in the hash are released on
 * every exit path.
 */
int32_t virtualTableScanProcessColRefInfo(SOperatorInfo* pOperator, SArray* pColRefInfo, tb_uid_t* uid, int32_t* vgId) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SDBVgInfo*                 dbVgInfo = NULL;
  char*                      refDbName = NULL;
  char*                      refTbName = NULL;
  char*                      refColName = NULL;
  SArray*                    pPendingColMap = NULL;  // created but not yet stored in orgTbVgColMap

  for (int32_t j = 0; j < taosArrayGetSize(pColRefInfo); j++) {
    SColRefInfo *pKV = (SColRefInfo*)taosArrayGet(pColRefInfo, j);
    *uid = pKV->uid;
    *vgId = pKV->vgId;
    if (pKV->colrefName == NULL || !colNeedScan(pOperator, pKV->colId)) {
      continue;
    }

    SName        name = {0};
    char         dbFname[TSDB_DB_FNAME_LEN] = {0};
    char         orgTbFName[TSDB_TABLE_FNAME_LEN] = {0};
    SColIdNameKV colIdNameKV = {0};

    code = extractColRefName(pKV->colrefName, &refDbName, &refTbName, &refColName);
    QUERY_CHECK_CODE(code, line, _return);

    toName(pInfo->vtbScan.acctId, refDbName, refTbName, &name);

    code = getDbVgInfo(pOperator, &name, &dbVgInfo);
    QUERY_CHECK_CODE(code, line, _return);
    code = tNameGetFullDbName(&name, dbFname);
    QUERY_CHECK_CODE(code, line, _return);
    code = tNameGetFullTableName(&name, orgTbFName);
    QUERY_CHECK_CODE(code, line, _return);

    colIdNameKV.colId = pKV->colId;
    tstrncpy(colIdNameKV.colName, refColName, sizeof(colIdNameKV.colName));

    // The whole zero-initialised buffer is the hash key.
    SOrgTbInfo *tbInfo = (SOrgTbInfo *)taosHashGet(pVtbScan->orgTbVgColMap, orgTbFName, sizeof(orgTbFName));
    if (tbInfo == NULL) {
      SOrgTbInfo map = {0};
      code = getVgId(dbVgInfo, dbFname, &map.vgId, name.tname);
      QUERY_CHECK_CODE(code, line, _return);
      tstrncpy(map.tbName, orgTbFName, sizeof(map.tbName));
      map.colMap = taosArrayInit(10, sizeof(SColIdNameKV));
      QUERY_CHECK_NULL(map.colMap, code, line, _return, terrno);
      pPendingColMap = map.colMap;
      QUERY_CHECK_NULL(taosArrayPush(map.colMap, &colIdNameKV), code, line, _return, terrno);
      code = taosHashPut(pVtbScan->orgTbVgColMap, orgTbFName, sizeof(orgTbFName), &map, sizeof(map));
      QUERY_CHECK_CODE(code, line, _return);
      pPendingColMap = NULL;  // the hash entry now refers to the column map
    } else {
      QUERY_CHECK_NULL(taosArrayPush(tbInfo->colMap, &colIdNameKV), code, line, _return, terrno);
    }

    taosMemoryFree(refDbName);
    refDbName = NULL;
    taosMemoryFree(refTbName);
    refTbName = NULL;
    taosMemoryFree(refColName);
    refColName = NULL;
  }

_return:
  // All three are NULL after a completed iteration; after a failure they
  // hold whatever the interrupted iteration had allocated.
  taosMemoryFree(refDbName);
  taosMemoryFree(refTbName);
  taosMemoryFree(refColName);
  taosArrayDestroy(pPendingColMap);
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
2023

2024
/*
 * Build the operator parameter (pVtbScan->vtbScanParam) that drives the
 * downstream virtual table scan for one table.
 *
 * One exchange parameter is appended to pOpParamArray for every origin table
 * in orgTbVgColMap. When the table's vgroup is listed in newAddedVgInfo the
 * parameter is built with that task address and the entry is removed from
 * newAddedVgInfo; otherwise the plain variant is used. Finally the tag-scan
 * exchange parameter for (vgId, uid) is stored in pTagScanOp.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered. An interrupted
 * hash iteration is cancelled before returning.
 */
int32_t virtualTableScanBuildDownStreamOpParam(SOperatorInfo* pOperator, tb_uid_t uid, int32_t vgId) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  void*                      pIter = NULL;
  SOperatorParam*            pTagExchangeParam = NULL;

  pVtbScan->vtbScanParam = NULL;
  code = buildVtbScanOperatorParam(pInfo, &pVtbScan->vtbScanParam, uid);
  QUERY_CHECK_CODE(code, line, _return);

  pIter = taosHashIterate(pVtbScan->orgTbVgColMap, NULL);
  while (pIter != NULL) {
    SOrgTbInfo*      pMap = (SOrgTbInfo*)pIter;
    SOperatorParam*  pExchangeParam = NULL;
    SStreamTaskAddr* addr = taosHashGet(pVtbScan->newAddedVgInfo, &pMap->vgId, sizeof(pMap->vgId));
    if (addr != NULL) {
      code = buildExchangeOperatorParamForVScanEx(&pExchangeParam, 0, pMap, pTaskInfo->id.taskId, addr);
      QUERY_CHECK_CODE(code, line, _return);
      code = taosHashRemove(pVtbScan->newAddedVgInfo, &pMap->vgId, sizeof(pMap->vgId));
      QUERY_CHECK_CODE(code, line, _return);
    } else {
      code = buildExchangeOperatorParamForVScan(&pExchangeParam, 0, pMap);
      QUERY_CHECK_CODE(code, line, _return);
    }
    // NOTE(review): pExchangeParam is not released when a later step of this
    // iteration fails; the matching free routine is not visible here.
    QUERY_CHECK_NULL(taosArrayPush(((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pOpParamArray, &pExchangeParam), code, line, _return, terrno);
    pIter = taosHashIterate(pVtbScan->orgTbVgColMap, pIter);
  }

  code = buildExchangeOperatorParamForVTagScan(&pTagExchangeParam, 0, vgId, uid);
  QUERY_CHECK_CODE(code, line, _return);
  ((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pTagScanOp = pTagExchangeParam;

_return:
  if (pIter != NULL) {
    // Only non-NULL when the loop above was left through an error.
    taosHashCancelIterate(pVtbScan->orgTbVgColMap, pIter);
  }
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
2064

2065
/*
 * Produce the next result block of the dynamic virtual table scan.
 *
 * While curTableIdx equals lastTableIdx the downstream operator
 * (pDownstream[0]) is simply asked for its next block. Otherwise a new table
 * is started: its column references are processed, fresh downstream
 * parameters are built and the downstream operator is restarted with them.
 * When a table yields no block the next table is tried; the operator is
 * marked completed once all child tables are consumed, or after the single
 * table when this is not a super table scan.
 *
 * orgTbVgColMap is created at entry and cleaned up before returning.
 * *pRes holds the produced block, if any.
 */
int32_t virtualTableScanGetNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SVtbScanDynCtrlInfo*       pScan = (SVtbScanDynCtrlInfo*)&pCtrl->vtbScan;
  SOperatorInfo*             pDownScan = pOperator->pDownstream[0];

  pScan->orgTbVgColMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  QUERY_CHECK_NULL(pScan->orgTbVgColMap, code, line, _return, terrno)
  taosHashSetFreeFp(pScan->orgTbVgColMap, destroyOrgTbInfo);

  for (;;) {
    if (pScan->curTableIdx != pScan->lastTableIdx) {
      // A new table starts: rebuild the origin-table map and the parameters.
      taosHashClear(pScan->orgTbVgColMap);

      SArray* pRefs = NULL;
      if (pScan->isSuperTable) {
        pRefs = (SArray*)taosArrayGetP(pScan->childTableList, pScan->curTableIdx);
      } else {
        pRefs = pCtrl->vtbScan.colRefInfo;
      }
      QUERY_CHECK_NULL(pRefs, code, line, _return, terrno)

      tb_uid_t tbUid = 0;
      int32_t  tbVgId = 0;
      code = virtualTableScanProcessColRefInfo(pOperator, pRefs, &tbUid, &tbVgId);
      QUERY_CHECK_CODE(code, line, _return);

      code = virtualTableScanBuildDownStreamOpParam(pOperator, tbUid, tbVgId);
      QUERY_CHECK_CODE(code, line, _return);

      // reset downstream operator's status
      pDownScan->status = OP_NOT_OPENED;
      code = pDownScan->fpSet.getNextExtFn(pDownScan, pScan->vtbScanParam, pRes);
      QUERY_CHECK_CODE(code, line, _return);
    } else {
      code = pDownScan->fpSet.getNextFn(pDownScan, pRes);
      QUERY_CHECK_CODE(code, line, _return);
    }

    if (*pRes) {
      // has result, still read data from this table.
      pScan->lastTableIdx = pScan->curTableIdx;
      break;
    }

    // no result, read next table.
    pScan->curTableIdx++;
    if (!pScan->isSuperTable || pScan->curTableIdx >= taosArrayGetSize(pScan->childTableList)) {
      setOperatorCompleted(pOperator);
      break;
    }
  }

_return:
  taosHashCleanup(pScan->orgTbVgColMap);
  pScan->orgTbVgColMap = NULL;
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
  }
  return code;
}
2131

2132
/*
 * Open the dynamic virtual table scan operator.
 *
 * Returns immediately when the operator is already opened. Otherwise the
 * column-reference information is built (super table or normal/child table
 * variant) and the operator is marked opened. The elapsed time is stored in
 * cost.openCost (milliseconds) if it has not been recorded yet.
 *
 * On failure the error is logged, stored in the task info and T_LONG_JMP is
 * invoked with the task's env.
 */
int32_t vtbScanOpen(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  int64_t                    startUs = 0;
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SVtbScanDynCtrlInfo*       pScan = (SVtbScanDynCtrlInfo*)&pCtrl->vtbScan;

  if (OPTR_IS_OPENED(pOperator)) {
    return code;
  }

  if (pOperator->cost.openCost == 0) {
    startUs = taosGetTimestampUs();
  }

  if (!pScan->isSuperTable) {
    code = buildVirtualNormalChildTableScanChildTableMap(pOperator);
    QUERY_CHECK_CODE(code, line, _return);
  } else {
    code = buildVirtualSuperTableScanChildTableMap(pOperator);
    QUERY_CHECK_CODE(code, line, _return);
  }

  OPTR_SET_OPENED(pOperator);

_return:
  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - startUs) / 1000.0;
  }
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }
  return code;
}
2168

2169
/*
 * getNext entry point of the dynamic virtual table scan operator.
 *
 * Without a pending get-param a finished operator returns immediately and
 * the scan window is set to [INT64_MAX, INT64_MIN]. With a get-param the
 * operator is re-armed: a finished status becomes OP_OPENED, the table
 * cursor is reset, the window is taken from the parameter and the parameter
 * pointer is cleared.
 *
 * Afterwards a pending redeploy is checked, the operator is opened through
 * fpSet._openFn and the next block is fetched with virtualTableScanGetNext().
 * A super table scan with an empty childTableList completes immediately.
 *
 * On failure the error is logged, stored in the task info and T_LONG_JMP is
 * invoked with the task's env.
 */
int32_t vtbScanNext(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SVtbScanDynCtrlInfo*       pScan = (SVtbScanDynCtrlInfo*)&pCtrl->vtbScan;

  QRY_PARAM_CHECK(pRes);

  if (!pOperator->pOperatorGetParam) {
    if (pOperator->status == OP_EXEC_DONE) {
      return code;
    }
    pScan->window.skey = INT64_MAX;
    pScan->window.ekey = INT64_MIN;
  } else {
    if (pOperator->status == OP_EXEC_DONE) {
      pOperator->status = OP_OPENED;
    }
    pScan->curTableIdx = 0;
    pScan->lastTableIdx = -1;
    pScan->window = ((SDynQueryCtrlOperatorParam *)(pOperator->pOperatorGetParam)->value)->window;
    pOperator->pOperatorGetParam = NULL;
  }

  if (pScan->needRedeploy) {
    code = virtualTableScanCheckNeedRedeploy(pOperator);
    QUERY_CHECK_CODE(code, line, _return);
  }

  code = pOperator->fpSet._openFn(pOperator);
  QUERY_CHECK_CODE(code, line, _return);

  if (pScan->isSuperTable && taosArrayGetSize(pScan->childTableList) == 0) {
    setOperatorCompleted(pOperator);
    return code;
  }

  code = virtualTableScanGetNext(pOperator, pRes);
  QUERY_CHECK_CODE(code, line, _return);

  return code;

_return:
  if (code) {
    qError("%s failed since %s, line %d", __func__, tstrerror(code), line);
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }
  return code;
}
2218

2219
/*
 * Create the hash tables used by the sequential super-table join context.
 *
 * batchFetch == true:  leftHash and rightHash are created (INT hash function).
 * batchFetch == false: leftCache, rightCache and onceTable are created
 *                      (BIGINT hash function).
 *
 * Returns TSDB_CODE_SUCCESS, or terrno as soon as one creation fails.
 */
int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) {
  if (!batchFetch) {
    pPrev->leftCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pPrev->leftCache == NULL) {
      return terrno;
    }

    pPrev->rightCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pPrev->rightCache == NULL) {
      return terrno;
    }

    pPrev->onceTable = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pPrev->onceTable == NULL) {
      return terrno;
    }

    return TSDB_CODE_SUCCESS;
  }

  pPrev->leftHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  if (pPrev->leftHash == NULL) {
    return terrno;
  }

  pPrev->rightHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  if (pPrev->rightHash == NULL) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
2246

2247
// Scans the stream partition column values and, at the first entry that is flagged
// isTbname and whose uid differs from pVtbScan->dynTbUid, stores that uid into
// pVtbScan->dynTbUid and stops. Does nothing when pStreamRuntimeInfo is NULL.
static void updateDynTbUidIfNeeded(SVtbScanDynCtrlInfo* pVtbScan, SStreamRuntimeInfo* pStreamRuntimeInfo) {
  if (NULL == pStreamRuntimeInfo) {
    return;
  }

  SArray* pPartVals = pStreamRuntimeInfo->funcInfo.pStreamPartColVals;
  for (int32_t idx = 0; idx < taosArrayGetSize(pPartVals); ++idx) {
    SStreamGroupValue* pGrpVal = taosArrayGet(pPartVals, idx);

    // skip entries that are missing, are not table names, or already match the current uid
    if (pGrpVal == NULL || !pGrpVal->isTbname || pGrpVal->uid == pVtbScan->dynTbUid) {
      continue;
    }

    qTrace("dynQueryCtrl dyn tb uid:%" PRIu64 " reset to:%" PRIu64, pVtbScan->dynTbUid, pGrpVal->uid);

    pVtbScan->dynTbUid = pGrpVal->uid;
    break;
  }
}
2263

2264
static int32_t initVtbScanInfo(SDynQueryCtrlOperatorInfo* pInfo, SMsgCb* pMsgCb,
121,773✔
2265
                               SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) {
2266
  int32_t      code = TSDB_CODE_SUCCESS;
121,773✔
2267
  int32_t      line = 0;
121,773✔
2268

2269
  code = tsem_init(&pInfo->vtbScan.ready, 0, 0);
121,773✔
2270
  QUERY_CHECK_CODE(code, line, _return);
121,773✔
2271

2272
  pInfo->vtbScan.scanAllCols = pPhyciNode->vtbScan.scanAllCols;
121,773✔
2273
  pInfo->vtbScan.isSuperTable = pPhyciNode->vtbScan.isSuperTable;
121,773✔
2274
  pInfo->vtbScan.rversion = pPhyciNode->vtbScan.rversion;
121,773✔
2275
  pInfo->vtbScan.uid = pPhyciNode->vtbScan.uid;
121,773✔
2276
  pInfo->vtbScan.suid = pPhyciNode->vtbScan.suid;
121,773✔
2277
  pInfo->vtbScan.epSet = pPhyciNode->vtbScan.mgmtEpSet;
121,773✔
2278
  pInfo->vtbScan.acctId = pPhyciNode->vtbScan.accountId;
121,773✔
2279
  pInfo->vtbScan.needRedeploy = false;
121,773✔
2280
  pInfo->vtbScan.pMsgCb = pMsgCb;
121,773✔
2281
  pInfo->vtbScan.curTableIdx = 0;
121,773✔
2282
  pInfo->vtbScan.lastTableIdx = -1;
121,773✔
2283
  pInfo->vtbScan.dynTbUid = 0;
121,773✔
2284
  pInfo->vtbScan.dbName = taosStrdup(pPhyciNode->vtbScan.dbName);
121,773✔
2285
  pInfo->vtbScan.tbName = taosStrdup(pPhyciNode->vtbScan.tbName);
121,773✔
2286
  QUERY_CHECK_NULL(pInfo->vtbScan.dbName, code, line, _return, terrno)
121,773✔
2287
  QUERY_CHECK_NULL(pInfo->vtbScan.tbName, code, line, _return, terrno)
121,773✔
2288
  pInfo->vtbScan.existOrgTbVg = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
121,773✔
2289
  QUERY_CHECK_NULL(pInfo->vtbScan.existOrgTbVg, code, line, _return, terrno)
121,773✔
2290
  for (int32_t i = 0; i < LIST_LENGTH(pPhyciNode->vtbScan.pOrgVgIds); ++i) {
365,319✔
2291
    SValueNode* valueNode = (SValueNode*)nodesListGetNode(pPhyciNode->vtbScan.pOrgVgIds, i);
243,546✔
2292
    int32_t vgId = (int32_t)valueNode->datum.i;
243,546✔
2293
    code = taosHashPut(pInfo->vtbScan.existOrgTbVg, &vgId, sizeof(vgId), NULL, 0);
243,546✔
2294
    QUERY_CHECK_CODE(code, line, _return);
243,546✔
2295
  }
2296

2297
  if (pPhyciNode->dynTbname && pTaskInfo) {
121,773✔
2298
    updateDynTbUidIfNeeded(&pInfo->vtbScan, pTaskInfo->pStreamRuntimeInfo);
×
2299
  }
2300

2301
  pInfo->vtbScan.readColList = taosArrayInit(LIST_LENGTH(pPhyciNode->vtbScan.pScanCols), sizeof(col_id_t));
121,773✔
2302
  QUERY_CHECK_NULL(pInfo->vtbScan.readColList, code, line, _return, terrno)
121,773✔
2303

2304
  for (int32_t i = 0; i < LIST_LENGTH(pPhyciNode->vtbScan.pScanCols); ++i) {
907,401✔
2305
    SColumnNode* pNode = (SColumnNode*)nodesListGetNode(pPhyciNode->vtbScan.pScanCols, i);
785,628✔
2306
    QUERY_CHECK_NULL(pNode, code, line, _return, terrno)
785,628✔
2307
    QUERY_CHECK_NULL(taosArrayPush(pInfo->vtbScan.readColList, &pNode->colId), code, line, _return, terrno)
1,571,256✔
2308
  }
2309

2310
  pInfo->vtbScan.childTableList = taosArrayInit(10, POINTER_BYTES);
121,773✔
2311
  QUERY_CHECK_NULL(pInfo->vtbScan.childTableList, code, line, _return, terrno)
121,773✔
2312

2313
  pInfo->vtbScan.dbVgInfoMap = taosHashInit(taosArrayGetSize(pInfo->vtbScan.childTableList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
121,773✔
2314
  QUERY_CHECK_NULL(pInfo->vtbScan.dbVgInfoMap, code, line, _return, terrno)
121,773✔
2315

2316
  return code;
121,773✔
2317
_return:
×
2318
  // no need to destroy array and hashmap allocated in this function,
2319
  // since the operator's destroy function will take care of it
2320
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
2321
  return code;
×
2322
}
2323

2324
static int32_t initVtbWindowInfo(SDynQueryCtrlOperatorInfo* pInfo, SDynQueryCtrlPhysiNode* pPhyciNode,
×
2325
                                 SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) {
2326
  int32_t              code = TSDB_CODE_SUCCESS;
×
2327
  int32_t              line = 0;
×
2328
  SDataBlockDescNode*  pDescNode = pPhyciNode->node.pOutputDataBlockDesc;
×
2329

2330
  pInfo->vtbWindow.wstartSlotId = pPhyciNode->vtbWindow.wstartSlotId;
×
2331
  pInfo->vtbWindow.wendSlotId = pPhyciNode->vtbWindow.wendSlotId;
×
2332
  pInfo->vtbWindow.wdurationSlotId = pPhyciNode->vtbWindow.wdurationSlotId;
×
2333
  pInfo->vtbWindow.pTargets = pPhyciNode->vtbWindow.pTargets;
×
2334
  pInfo->vtbWindow.isVstb = pPhyciNode->vtbWindow.isVstb;
×
2335
  pInfo->vtbWindow.singleWinMode = pPhyciNode->vtbWindow.singleWinMode;
×
2336
  pInfo->vtbWindow.extendOption = pPhyciNode->vtbWindow.extendOption;
×
2337

2338
  pInfo->vtbWindow.pRes = createDataBlockFromDescNode(pDescNode);
×
2339
  QUERY_CHECK_NULL(pInfo->vtbWindow.pRes, code, line, _return, terrno)
×
2340

2341
  pInfo->vtbWindow.pWins = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
×
2342
  QUERY_CHECK_NULL(pInfo->vtbWindow.pWins, code, line, _return, terrno)
×
2343

2344
  pInfo->vtbWindow.outputWstartSlotId = -1;
×
2345
  pInfo->vtbWindow.outputWendSlotId = -1;
×
2346
  pInfo->vtbWindow.outputWdurationSlotId = -1;
×
2347
  pInfo->vtbWindow.curWinBatchIdx = 0;
×
2348

2349
  initResultSizeInfo(&pOperator->resultInfo, 1);
×
2350
  code = blockDataEnsureCapacity(pInfo->vtbWindow.pRes, pOperator->resultInfo.capacity);
×
2351
  QUERY_CHECK_CODE(code, line, _return);
×
2352

2353
  return code;
×
2354
_return:
×
2355
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
2356
  return code;
×
2357
}
2358

2359
// Exposes the raw timestamp buffer of column `slotId` of pBlock through *ppTsCols.
//   - *ppTsCols is left untouched when the block has no column array or its data is not loaded.
//   - When the first timestamp is non-zero and the block window is still {0, 0}, the block's
//     ts window is refreshed via blockDataUpdateTsWindow().
// Returns TSDB_CODE_SUCCESS, terrno when the column slot cannot be fetched, or the error
// from blockDataUpdateTsWindow().
static int32_t extractTsCol(SSDataBlock* pBlock, int32_t slotId, TSKEY** ppTsCols) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (pBlock->pDataBlock == NULL || !pBlock->info.dataLoad) {
    return code;
  }

  SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, slotId);
  QUERY_CHECK_NULL(pColDataInfo, code, lino, _return, terrno)

  *ppTsCols = (int64_t*)pColDataInfo->pData;

  // Peeking at element 0 is only valid when the buffer exists and holds at least one row.
  if (*ppTsCols == NULL || pBlock->info.rows <= 0) {
    return code;
  }

  if ((*ppTsCols)[0] != 0 && (pBlock->info.window.skey == 0 && pBlock->info.window.ekey == 0)) {
    code = blockDataUpdateTsWindow(pBlock, slotId);
    QUERY_CHECK_CODE(code, lino, _return);
  }

  return code;
_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  return code;
}
2380

2381
// Builds an operator param of type QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW.
//   pInfo : not used by this function.
//   ppRes : receives the new param on success; set to NULL on failure.
//   pWins : array of SExtWinTimeWindow; duplicated into the param's value. The first
//           window's skey and the last window's ekey bound the child exchange param.
//   idx   : stored as the param's downstreamIdx.
// The param gets exactly one child, produced by buildExchangeOperatorParamForExternalWindow().
// Returns TSDB_CODE_SUCCESS or the first error code encountered.
static int32_t buildExternalWindowOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins, int32_t idx) {
  int32_t                       code = TSDB_CODE_SUCCESS;
  int32_t                       lino = 0;
  SExternalWindowOperatorParam* pExtWinOp = NULL;
  SOperatorParam*               pExchangeOperator = NULL;
  SExtWinTimeWindow*            firstWin = NULL;
  SExtWinTimeWindow*            lastWin = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
  // malloc'ed memory is uninitialised: the cleanup path below inspects pChildren, so give
  // the pointers a defined value before anything else can fail.
  (*ppRes)->pChildren = NULL;
  (*ppRes)->value = NULL;

  pExtWinOp = taosMemoryMalloc(sizeof(SExternalWindowOperatorParam));
  QUERY_CHECK_NULL(pExtWinOp, code, lino, _return, terrno)

  pExtWinOp->ExtWins = taosArrayDup(pWins, NULL);
  QUERY_CHECK_NULL(pExtWinOp->ExtWins, code, lino, _return, terrno)

  // an empty window list has no first/last element; fail instead of dereferencing NULL
  firstWin = (SExtWinTimeWindow *)taosArrayGet(pWins, 0);
  QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
  lastWin = (SExtWinTimeWindow *)taosArrayGet(pWins, taosArrayGetSize(pWins) - 1);
  QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)

  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)

  code = buildExchangeOperatorParamForExternalWindow(&pExchangeOperator, 0, firstWin->tw.skey, lastWin->tw.ekey);
  QUERY_CHECK_CODE(code, lino, _return);
  // NOTE(review): if this push fails, pExchangeOperator is not released on the cleanup path;
  // its layout is defined by buildExchangeOperatorParamForExternalWindow() — confirm the
  // proper destructor and call it here.
  QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pExchangeOperator), code, lino, _return, terrno)

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW;
  (*ppRes)->downstreamIdx = idx;
  (*ppRes)->value = pExtWinOp;
  (*ppRes)->reUse = false;

  return code;
_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pExtWinOp) {
    if (pExtWinOp->ExtWins) {
      taosArrayDestroy(pExtWinOp->ExtWins);
    }
    taosMemoryFree(pExtWinOp);
  }
  if (*ppRes) {
    if ((*ppRes)->pChildren) {
      taosArrayDestroy((*ppRes)->pChildren);
    }
    taosMemoryFree(*ppRes);
    *ppRes = NULL;
  }
  return code;
}
2429

2430
// Releases one child param produced by buildExternalWindowOperatorParam(): the duplicated
// window array, the SExternalWindowOperatorParam value, the child's own pChildren array and
// the param itself. NULL is accepted.
// NOTE(review): the param stored inside the child's pChildren array is not released here;
// confirm the proper destructor for it.
static void destroyExtWinChildParamShallow(SOperatorParam* pChildParam) {
  if (pChildParam == NULL) {
    return;
  }

  SExternalWindowOperatorParam* pExtWinOp = (SExternalWindowOperatorParam*)pChildParam->value;
  if (pExtWinOp) {
    if (pExtWinOp->ExtWins) {
      taosArrayDestroy(pExtWinOp->ExtWins);
    }
    taosMemoryFree(pExtWinOp);
  }
  if (pChildParam->pChildren) {
    taosArrayDestroy(pChildParam->pChildren);
  }
  taosMemoryFree(pChildParam);
}

// Builds an operator param of type QUERY_NODE_PHYSICAL_PLAN_MERGE with one
// external-window child param per downstream.
//   ppRes           : receives the new param on success; set to NULL on failure.
//   pWins           : window array handed to every child param.
//   numOfDownstream : number of child params to create (child i gets downstreamIdx i).
//   numOfWins       : stored in the merge value's winNum.
// Returns TSDB_CODE_SUCCESS or the first error code encountered.
static int32_t buildMergeOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins,
                                       int32_t numOfDownstream, int32_t numOfWins) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SMergeOperatorParam*      pMergeOp = NULL;
  SOperatorParam*           pPendingChild = NULL;  // built but not yet stored in pChildren

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)

  (*ppRes)->pChildren = taosArrayInit(numOfDownstream, POINTER_BYTES);
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)

  pMergeOp = taosMemoryMalloc(sizeof(SMergeOperatorParam));
  QUERY_CHECK_NULL(pMergeOp, code, lino, _return, terrno)

  pMergeOp->winNum = numOfWins;

  for (int32_t i = 0; i < numOfDownstream; i++) {
    code = buildExternalWindowOperatorParam(pInfo, &pPendingChild, pWins, i);
    QUERY_CHECK_CODE(code, lino, _return);
    QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pPendingChild), code, lino, _return, terrno)
    pPendingChild = NULL;  // now reachable through pChildren
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE;
  (*ppRes)->downstreamIdx = 0;
  (*ppRes)->value = pMergeOp;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;
_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pMergeOp) {
    taosMemoryFree(pMergeOp);
  }
  // a child whose push failed is in no array, so it has to be released separately
  destroyExtWinChildParamShallow(pPendingChild);
  if (*ppRes) {
    if ((*ppRes)->pChildren) {
      for (int32_t i = 0; i < taosArrayGetSize((*ppRes)->pChildren); i++) {
        // pChildren stores pointers: taosArrayGetP yields the stored SOperatorParam*,
        // whereas taosArrayGet would yield the address of the slot inside the array.
        SOperatorParam* pChildParam = (SOperatorParam*)taosArrayGetP((*ppRes)->pChildren, i);
        destroyExtWinChildParamShallow(pChildParam);
      }
      taosArrayDestroy((*ppRes)->pChildren);
    }
    taosMemoryFree(*ppRes);
    *ppRes = NULL;
  }
  return code;
}
2487

2488
// Open function of the virtual-table window controller.
// Drains downstream 0 and converts every returned block into window batches stored in
// vtbWindow.pWins (an array of SArray* of SExtWinTimeWindow):
//   - singleWinMode: one batch per row, each holding a single window;
//   - otherwise    : one batch per block, holding one window per row.
// Each window is [wstart, wend + 1) with winOutIdx = -1. On the first block the positions
// of the wstart / wend / wduration columns inside pTargets are resolved. Finally the first
// window's skey or the last window's ekey is widened according to extendOption.
// Returns TSDB_CODE_SUCCESS; on error the code is stored in the task and T_LONG_JMP is taken.
int32_t vtbWindowOpen(SOperatorInfo* pOperator) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    lino = 0;
  SDynQueryCtrlOperatorInfo* pDynInfo = pOperator->info;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SVtbWindowDynCtrlInfo*     pInfo = &pDynInfo->vtbWindow;
  int64_t                    st = 0;
  SArray*                    pWin = NULL;  // batch under construction; owned here until pushed into pWins

  if (OPTR_IS_OPENED(pOperator)) {
    return code;
  }

  if (pOperator->cost.openCost == 0) {
    st = taosGetTimestampUs();
  }

  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      break;
    }

    // resolve, once, which target index carries each of the window pseudo columns
    if (pInfo->outputWendSlotId == -1 && pInfo->outputWstartSlotId == -1 && pInfo->outputWdurationSlotId == -1) {
      for (int32_t i = 0; i < LIST_LENGTH(pInfo->pTargets); ++i) {
        STargetNode* pNode = (STargetNode*)nodesListGetNode(pInfo->pTargets, i);
        if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode->pExpr)->dataBlockId == pBlock->info.id.blockId) {
          SColumnNode* pCol = (SColumnNode*)pNode->pExpr;
          if (pCol->slotId == pInfo->wstartSlotId) {
            pInfo->outputWstartSlotId = i;
          } else if (pCol->slotId == pInfo->wendSlotId) {
            pInfo->outputWendSlotId = i;
          } else if (pCol->slotId == pInfo->wdurationSlotId) {
            pInfo->outputWdurationSlotId = i;
          }
        }
      }
    }

    TSKEY* wstartCol = NULL;
    TSKEY* wendCol = NULL;

    code = extractTsCol(pBlock, pInfo->wstartSlotId, &wstartCol);
    QUERY_CHECK_CODE(code, lino, _return);
    code = extractTsCol(pBlock, pInfo->wendSlotId, &wendCol);
    QUERY_CHECK_CODE(code, lino, _return);

    // extractTsCol() leaves the output untouched for blocks without loaded data; indexing
    // a NULL column below would crash, so report an error instead.
    if (pBlock->info.rows > 0 && (wstartCol == NULL || wendCol == NULL)) {
      code = TSDB_CODE_INVALID_PARA;
      lino = __LINE__;
      goto _return;
    }

    if (pInfo->singleWinMode) {
      for (int32_t i = 0; i < pBlock->info.rows; i++) {
        // each batch holds exactly one window, so capacity 1 is sufficient
        pWin = taosArrayInit(1, sizeof(SExtWinTimeWindow));
        QUERY_CHECK_NULL(pWin, code, lino, _return, terrno)

        QUERY_CHECK_NULL(taosArrayReserve(pWin, 1), code, lino, _return, terrno);

        SExtWinTimeWindow* pWindow = taosArrayGet(pWin, 0);
        QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
        pWindow->tw.skey = wstartCol[i];
        pWindow->tw.ekey = wendCol[i] + 1;
        pWindow->winOutIdx = -1;

        QUERY_CHECK_NULL(taosArrayPush(pInfo->pWins, &pWin), code, lino, _return, terrno);
        pWin = NULL;  // ownership moved to pWins
      }
    } else {
      pWin = taosArrayInit(pBlock->info.rows, sizeof(SExtWinTimeWindow));
      QUERY_CHECK_NULL(pWin, code, lino, _return, terrno)

      QUERY_CHECK_NULL(taosArrayReserve(pWin, pBlock->info.rows), code, lino, _return, terrno);

      for (int32_t i = 0; i < pBlock->info.rows; i++) {
        SExtWinTimeWindow* pWindow = taosArrayGet(pWin, i);
        QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
        pWindow->tw.skey = wstartCol[i];
        pWindow->tw.ekey = wendCol[i] + 1;
        pWindow->winOutIdx = -1;
      }

      QUERY_CHECK_NULL(taosArrayPush(pInfo->pWins, &pWin), code, lino, _return, terrno);
      pWin = NULL;  // ownership moved to pWins
    }
  }

  // handle first window's start key and last window's end key
  // NOTE(review): when downstream produced no block at all, pWins is empty and the lookups
  // below fail, so the open reports an error rather than an empty result — confirm intended.
  SArray* firstBatch = (SArray*)taosArrayGetP(pInfo->pWins, 0);
  SArray* lastBatch = (SArray*)taosArrayGetP(pInfo->pWins, taosArrayGetSize(pInfo->pWins) - 1);

  QUERY_CHECK_NULL(firstBatch, code, lino, _return, terrno)
  QUERY_CHECK_NULL(lastBatch, code, lino, _return, terrno)

  SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(firstBatch, 0);
  SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(lastBatch);

  QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
  QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)

  if (pInfo->extendOption == STATE_WIN_EXTEND_OPTION_BACKWARD) {
    lastWin->tw.ekey = INT64_MAX;
  }
  if (pInfo->extendOption == STATE_WIN_EXTEND_OPTION_FORWARD) {
    firstWin->tw.skey = INT64_MIN;
  }

  OPTR_SET_OPENED(pOperator);

  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
  }

_return:
  // non-NULL only when a batch was created but never handed over to pWins
  if (pWin != NULL) {
    taosArrayDestroy(pWin);
  }
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return code;
}
2600

2601
// Builds an operator param of type QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL whose value is a
// SDynQueryCtrlOperatorParam carrying the time window [skey, ekey]. The param gets an empty
// pChildren array (capacity 1) and downstreamIdx 0.
//   ppRes : receives the new param on success; set to NULL on failure.
//   downstreamIdx : NOTE(review): not referenced by the body — the stored downstreamIdx is
//                   always 0; confirm whether the argument was meant to be used.
// Returns TSDB_CODE_SUCCESS, or terrno of the allocation that failed.
static int32_t buildDynQueryCtrlOperatorParamForExternalWindow(SOperatorParam** ppRes, int32_t downstreamIdx, int64_t skey, int64_t ekey) {
  int32_t                     code = TSDB_CODE_SUCCESS;
  int32_t                     lino = 0;
  SOperatorParam*             pParam = NULL;
  SDynQueryCtrlOperatorParam* pCtrlParam = NULL;

  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
  *ppRes = pParam;
  if (pParam == NULL) {
    code = terrno;
    lino = __LINE__;
    goto _return;
  }

  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
  if (pParam->pChildren == NULL) {
    code = terrno;
    lino = __LINE__;
    goto _return;
  }

  pCtrlParam = taosMemoryMalloc(sizeof(SDynQueryCtrlOperatorParam));
  if (pCtrlParam == NULL) {
    code = terrno;
    lino = __LINE__;
    goto _return;
  }

  pCtrlParam->window.skey = skey;
  pCtrlParam->window.ekey = ekey;

  pParam->value = pCtrlParam;
  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL;
  pParam->downstreamIdx = 0;
  pParam->reUse = false;

  return code;

_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pCtrlParam != NULL) {
    taosMemoryFree(pCtrlParam);
  }
  if (*ppRes != NULL) {
    if ((*ppRes)->pChildren != NULL) {
      taosArrayDestroy((*ppRes)->pChildren);
    }
    taosMemoryFree(*ppRes);
    *ppRes = NULL;
  }
  return code;
}
2638

2639
// Builds an operator param of type QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW whose single
// child is a dyn-query-ctrl param spanning [first window skey, last window ekey].
//   pInfo : not used by this function.
//   ppRes : receives the new param on success; set to NULL on failure.
//   pWins : array of SExtWinTimeWindow; duplicated into the param's value.
//   idx   : stored as the param's downstreamIdx.
// Returns TSDB_CODE_SUCCESS or the first error code encountered.
static int32_t buildExternalWindowOperatorParamEx(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, SArray* pWins, int32_t idx) {
  int32_t                       code = TSDB_CODE_SUCCESS;
  int32_t                       lino = 0;
  SExternalWindowOperatorParam* pExtWinOp = NULL;
  SOperatorParam*               pDynQueryCtrlParam = NULL;  // built child, not yet stored in pChildren
  SExtWinTimeWindow*            firstWin = NULL;
  SExtWinTimeWindow*            lastWin = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno)
  // malloc'ed memory is uninitialised: the cleanup path below inspects pChildren, so give
  // the pointers a defined value before anything else can fail.
  (*ppRes)->pChildren = NULL;
  (*ppRes)->value = NULL;

  pExtWinOp = taosMemoryMalloc(sizeof(SExternalWindowOperatorParam));
  QUERY_CHECK_NULL(pExtWinOp, code, lino, _return, terrno)

  pExtWinOp->ExtWins = taosArrayDup(pWins, NULL);
  QUERY_CHECK_NULL(pExtWinOp->ExtWins, code, lino, _return, terrno)

  // an empty window list has no first/last element; fail instead of dereferencing NULL
  firstWin = (SExtWinTimeWindow *)taosArrayGet(pWins, 0);
  QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
  lastWin = (SExtWinTimeWindow *)taosArrayGet(pWins, taosArrayGetSize(pWins) - 1);
  QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)

  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno)

  code = buildDynQueryCtrlOperatorParamForExternalWindow(&pDynQueryCtrlParam, 0, firstWin->tw.skey, lastWin->tw.ekey);
  QUERY_CHECK_CODE(code, lino, _return);
  QUERY_CHECK_NULL(taosArrayPush((*ppRes)->pChildren, &pDynQueryCtrlParam), code, lino, _return, terrno)
  pDynQueryCtrlParam = NULL;  // now reachable through pChildren

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXTERNAL_WINDOW;
  (*ppRes)->downstreamIdx = idx;
  (*ppRes)->value = pExtWinOp;
  (*ppRes)->reUse = false;

  return code;
_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  // A child that was built but could not be pushed is in no array; release its value,
  // its (empty) pChildren array and the param itself.
  if (pDynQueryCtrlParam) {
    if (pDynQueryCtrlParam->value) {
      taosMemoryFree(pDynQueryCtrlParam->value);
    }
    if (pDynQueryCtrlParam->pChildren) {
      taosArrayDestroy(pDynQueryCtrlParam->pChildren);
    }
    taosMemoryFree(pDynQueryCtrlParam);
  }
  if (pExtWinOp) {
    if (pExtWinOp->ExtWins) {
      taosArrayDestroy(pExtWinOp->ExtWins);
    }
    taosMemoryFree(pExtWinOp);
  }
  if (*ppRes) {
    // Every failure happens before or at the push above, so pChildren holds no element
    // here and only the array itself has to be released.
    if ((*ppRes)->pChildren) {
      taosArrayDestroy((*ppRes)->pChildren);
    }
    taosMemoryFree(*ppRes);
    *ppRes = NULL;
  }
  return code;
}
2697

2698
int32_t vtbWindowNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
×
2699
  int32_t                    code = TSDB_CODE_SUCCESS;
×
2700
  int32_t                    lino = 0;
×
2701
  SDynQueryCtrlOperatorInfo* pDynInfo = pOperator->info;
×
2702
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
×
2703
  int64_t                    st = taosGetTimestampUs();
×
2704
  int32_t                    numOfWins = 0;
×
2705
  SOperatorInfo*             mergeOp = NULL;
×
2706
  SOperatorInfo*             extWinOp = NULL;
×
2707
  SOperatorParam*            pMergeParam = NULL;
×
2708
  SOperatorParam*            pExtWinParam = NULL;
×
2709
  SVtbWindowDynCtrlInfo*     pInfo = &pDynInfo->vtbWindow;
×
2710
  SSDataBlock*               pRes = pInfo->pRes;
×
2711

2712
  code = pOperator->fpSet._openFn(pOperator);
×
2713
  QUERY_CHECK_CODE(code, lino, _return);
×
2714

2715
  if (pInfo->curWinBatchIdx >= taosArrayGetSize(pInfo->pWins)) {
×
2716
    *ppRes = NULL;
×
2717
    return code;
×
2718
  }
2719

2720
  SArray* pWinArray = (SArray*)taosArrayGetP(pInfo->pWins, pInfo->curWinBatchIdx);
×
2721
  QUERY_CHECK_NULL(pWinArray, code, lino, _return, terrno)
×
2722

2723
  numOfWins = (int32_t)taosArrayGetSize(pWinArray);
×
2724

2725
  if (pInfo->isVstb) {
×
2726
    extWinOp = pOperator->pDownstream[1];
×
2727
    code = buildExternalWindowOperatorParamEx(pDynInfo, &pExtWinParam, pWinArray, extWinOp->numOfDownstream);
×
2728
    QUERY_CHECK_CODE(code, lino, _return);
×
2729

2730
    SSDataBlock* pExtWinBlock = NULL;
×
2731
    code = extWinOp->fpSet.getNextExtFn(extWinOp, pExtWinParam, &pExtWinBlock);
×
2732
    QUERY_CHECK_CODE(code, lino, _return);
×
2733

2734
    blockDataCleanup(pRes);
×
2735
    code = blockDataEnsureCapacity(pRes, numOfWins);
×
2736
    QUERY_CHECK_CODE(code, lino, _return);
×
2737

2738
    if (pExtWinBlock) {
×
2739
      code = copyColumnsValue(pInfo->pTargets, pExtWinBlock->info.id.blockId, pRes, pExtWinBlock, numOfWins);
×
2740
      QUERY_CHECK_CODE(code, lino, _return);
×
2741

2742
      if (pInfo->curWinBatchIdx == 0) {
×
2743
        // first batch, get _wstart from pMergedBlock
2744
        SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(taosArrayGetP(pInfo->pWins, 0), 0);
×
2745
        QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
×
2746

2747
        firstWin->tw.skey = pExtWinBlock->info.window.skey;
×
2748
      }
2749
      if (pInfo->curWinBatchIdx == taosArrayGetSize(pInfo->pWins) - 1) {
×
2750
        // last batch, get _wend from pMergedBlock
2751
        SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(taosArrayGetP(pInfo->pWins, taosArrayGetSize(pInfo->pWins) - 1));
×
2752
        QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)
×
2753

2754
        lastWin->tw.ekey = pExtWinBlock->info.window.ekey + 1;
×
2755
      }
2756
    }
2757
  } else {
2758
    mergeOp = pOperator->pDownstream[1];
×
2759
    code = buildMergeOperatorParam(pDynInfo, &pMergeParam, pWinArray, mergeOp->numOfDownstream, numOfWins);
×
2760
    QUERY_CHECK_CODE(code, lino, _return);
×
2761

2762
    SSDataBlock* pMergedBlock = NULL;
×
2763
    code = mergeOp->fpSet.getNextExtFn(mergeOp, pMergeParam, &pMergedBlock);
×
2764
    QUERY_CHECK_CODE(code, lino, _return);
×
2765

2766
    blockDataCleanup(pRes);
×
2767
    code = blockDataEnsureCapacity(pRes, numOfWins);
×
2768
    QUERY_CHECK_CODE(code, lino, _return);
×
2769

2770
    if (pMergedBlock) {
×
2771
      code = copyColumnsValue(pInfo->pTargets, pMergedBlock->info.id.blockId, pRes, pMergedBlock, numOfWins);
×
2772
      QUERY_CHECK_CODE(code, lino, _return);
×
2773

2774
      if (pInfo->curWinBatchIdx == 0) {
×
2775
        // first batch, get _wstart from pMergedBlock
2776
        SExtWinTimeWindow* firstWin = (SExtWinTimeWindow*)taosArrayGet(taosArrayGetP(pInfo->pWins, 0), 0);
×
2777
        QUERY_CHECK_NULL(firstWin, code, lino, _return, terrno)
×
2778

2779
        firstWin->tw.skey = pMergedBlock->info.window.skey;
×
2780
      }
2781
      if (pInfo->curWinBatchIdx == taosArrayGetSize(pInfo->pWins) - 1) {
×
2782
        // last batch, get _wend from pMergedBlock
2783
        SExtWinTimeWindow* lastWin = (SExtWinTimeWindow*)taosArrayGetLast(taosArrayGetP(pInfo->pWins, taosArrayGetSize(pInfo->pWins) - 1));
×
2784
        QUERY_CHECK_NULL(lastWin, code, lino, _return, terrno)
×
2785

2786
        lastWin->tw.ekey = pMergedBlock->info.window.ekey + 1;
×
2787
      }
2788
    }
2789
  }
2790

2791

2792
  if (pInfo->outputWstartSlotId != -1) {
×
2793
    SColumnInfoData* pWstartCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWstartSlotId);
×
2794
    QUERY_CHECK_NULL(pWstartCol, code, lino, _return, terrno)
×
2795

2796
    for (int32_t i = 0; i < numOfWins; i++) {
×
2797
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
×
2798
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
×
2799
      code = colDataSetVal(pWstartCol, i, (const char*)&pWindow->tw.skey, false);
×
2800
      QUERY_CHECK_CODE(code, lino, _return);
×
2801
    }
2802
  }
2803
  if (pInfo->outputWendSlotId != -1) {
×
2804
    SColumnInfoData* pWendCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWendSlotId);
×
2805
    QUERY_CHECK_NULL(pWendCol, code, lino, _return, terrno)
×
2806

2807
    for (int32_t i = 0; i < numOfWins; i++) {
×
2808
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
×
2809
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
×
2810
      TSKEY ekey = pWindow->tw.ekey - 1;
×
2811
      code = colDataSetVal(pWendCol, i, (const char*)&ekey, false);
×
2812
      QUERY_CHECK_CODE(code, lino, _return);
×
2813
    }
2814
  }
2815
  if (pInfo->outputWdurationSlotId != -1) {
×
2816
    SColumnInfoData* pWdurationCol = taosArrayGet(pRes->pDataBlock, pInfo->outputWdurationSlotId);
×
2817
    QUERY_CHECK_NULL(pWdurationCol, code, lino, _return, terrno)
×
2818

2819
    for (int32_t i = 0; i < numOfWins; i++) {
×
2820
      SExtWinTimeWindow* pWindow = (SExtWinTimeWindow*)taosArrayGet(pWinArray, i);
×
2821
      QUERY_CHECK_NULL(pWindow, code, lino, _return, terrno)
×
2822
      int64_t duration = pWindow->tw.ekey - 1 - pWindow->tw.skey;
×
2823
      code = colDataSetVal(pWdurationCol, i, (const char*)&duration, false);
×
2824
      QUERY_CHECK_CODE(code, lino, _return);
×
2825
    }
2826
  }
2827

2828
  pRes->info.rows = numOfWins;
×
2829
  *ppRes = pRes;
×
2830
  pInfo->curWinBatchIdx++;
×
2831

2832
  return code;
×
2833

2834
_return:
×
2835
  if (code != TSDB_CODE_SUCCESS) {
×
2836
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
2837
    pTaskInfo->code = code;
×
2838
    T_LONG_JMP(pTaskInfo->env, code);
×
2839
  }
2840
  return code;
×
2841
}
2842

2843
/**
 * Reset a dynamic-query-control operator so that it can be executed again.
 *
 * The operator is marked OP_NOT_OPENED and the per-execution state that belongs
 * to its query type (pDyn->qType) is released or re-initialised:
 *  - DYN_QTYPE_STB_HASH:   execution info is zeroed, the join state is passed to
 *                          destroyStbJoinDynCtrlInfo(), the list/context fields
 *                          are cleared and the table hashes are re-created with
 *                          initSeqStbJoinTableHash().
 *  - DYN_QTYPE_VTB_SCAN:   cached maps, the response, column reference info and
 *                          the child table list are released or cleared, and the
 *                          table cursors are rewound.
 *  - DYN_QTYPE_VTB_WINDOW: the result block and window array are destroyed and
 *                          the output slot ids / batch index are reset.
 *
 * @param pOper  operator to reset; pOper->info is used as SDynQueryCtrlOperatorInfo.
 * @return TSDB_CODE_SUCCESS on success, the error code reported by
 *         initSeqStbJoinTableHash(), or TSDB_CODE_INVALID_PARA when the operator
 *         carries a query type this function does not handle.
 */
static int32_t resetDynQueryCtrlOperState(SOperatorInfo* pOper) {
  SDynQueryCtrlOperatorInfo*    pDyn = pOper->info;
  SDynQueryCtrlPhysiNode const* pPhyciNode = pOper->pPhyNode;
  SExecTaskInfo*                pTaskInfo = pOper->pTaskInfo;

  pOper->status = OP_NOT_OPENED;

  switch (pDyn->qType) {
    case DYN_QTYPE_STB_HASH: {
      SStbJoinDynCtrlInfo* pStbJoin = &pDyn->stbJoin;

      pStbJoin->execInfo = (SDynQueryCtrlExecInfo){0};
      destroyStbJoinDynCtrlInfo(pStbJoin);

      // Clear the list pointers and counters BEFORE the fallible re-init below,
      // so an early return never leaves stale pointers behind for a later
      // destroy of this operator.
      // NOTE(review): destroyStbJoinDynCtrlInfo() is defined outside this chunk;
      // presumably it frees the list reachable from ctx.prev.pListHead - confirm.
      pStbJoin->ctx.prev.pListHead = NULL;
      pStbJoin->ctx.prev.pListTail = NULL;
      pStbJoin->ctx.prev.joinBuild = false;
      pStbJoin->ctx.prev.tableNum = 0;
      pStbJoin->ctx.post = (SStbJoinPostJoinCtx){0};

      int32_t code = initSeqStbJoinTableHash(&pStbJoin->ctx.prev, pStbJoin->basic.batchFetch);
      if (TSDB_CODE_SUCCESS != code) {
        qError("initSeqStbJoinTableHash failed since %s", tstrerror(code));
        return code;
      }
      break;
    }
    case DYN_QTYPE_VTB_SCAN: {
      SVtbScanDynCtrlInfo* pVtbScan = &pDyn->vtbScan;

      if (pVtbScan->orgTbVgColMap) {
        // Install the entry destructor before tearing the map down.
        taosHashSetFreeFp(pVtbScan->orgTbVgColMap, destroyOrgTbInfo);
        taosHashCleanup(pVtbScan->orgTbVgColMap);
        pVtbScan->orgTbVgColMap = NULL;
      }
      if (pVtbScan->pRsp) {
        tFreeSUsedbRsp(pVtbScan->pRsp);
        taosMemoryFreeClear(pVtbScan->pRsp);
      }
      if (pVtbScan->colRefInfo) {
        taosArrayDestroyEx(pVtbScan->colRefInfo, destroyColRefInfo);
        pVtbScan->colRefInfo = NULL;
      }
      if (pVtbScan->childTableMap) {
        taosHashCleanup(pVtbScan->childTableMap);
        pVtbScan->childTableMap = NULL;
      }
      if (pVtbScan->childTableList) {
        // The list itself is kept; only its elements are released.
        taosArrayClearEx(pVtbScan->childTableList, destroyColRefArray);
      }
      if (pPhyciNode->dynTbname && pTaskInfo) {
        updateDynTbUidIfNeeded(pVtbScan, pTaskInfo->pStreamRuntimeInfo);
      }
      pVtbScan->curTableIdx = 0;
      pVtbScan->lastTableIdx = -1;
      break;
    }
    case DYN_QTYPE_VTB_WINDOW: {
      SVtbWindowDynCtrlInfo* pVtbWindow = &pDyn->vtbWindow;
      if (pVtbWindow->pRes) {
        blockDataDestroy(pVtbWindow->pRes);
        pVtbWindow->pRes = NULL;
      }
      if (pVtbWindow->pWins) {
        taosArrayDestroyEx(pVtbWindow->pWins, destroyWinArray);
        pVtbWindow->pWins = NULL;
      }
      pVtbWindow->outputWdurationSlotId = -1;
      pVtbWindow->outputWendSlotId = -1;
      pVtbWindow->outputWstartSlotId = -1;
      pVtbWindow->curWinBatchIdx = 0;
      break;
    }
    default:
      // Report the failure to the caller instead of logging and claiming
      // success; createDynQueryCtrlOperatorInfo() rejects the same condition
      // with the same code.
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
      return TSDB_CODE_INVALID_PARA;
  }

  return TSDB_CODE_SUCCESS;
}
2921

2922
int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
338,754✔
2923
                                       SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo,
2924
                                       SMsgCb* pMsgCb, SOperatorInfo** pOptrInfo) {
2925
  QRY_PARAM_CHECK(pOptrInfo);
338,754✔
2926

2927
  int32_t                    code = TSDB_CODE_SUCCESS;
338,754✔
2928
  int32_t                    line = 0;
338,754✔
2929
  __optr_fn_t                nextFp = NULL;
338,754✔
2930
  __optr_open_fn_t           openFp = NULL;
338,754✔
2931
  SOperatorInfo*             pOperator = NULL;
338,754✔
2932
  SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo));
338,754✔
2933
  QUERY_CHECK_NULL(pInfo, code, line, _error, terrno)
338,754✔
2934

2935
  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
338,754✔
2936
  QUERY_CHECK_NULL(pOperator, code, line, _error, terrno)
338,754✔
2937

2938
  pOperator->pPhyNode = pPhyciNode;
338,754✔
2939
  pTaskInfo->dynamicTask = (int8_t)pPhyciNode->node.dynamicOp;
338,754✔
2940

2941
  code = appendDownstream(pOperator, pDownstream, numOfDownstream);
338,754✔
2942
  QUERY_CHECK_CODE(code, line, _error);
338,754✔
2943

2944
  setOperatorInfo(pOperator, "DynQueryCtrlOperator", QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, false, OP_NOT_OPENED,
338,754✔
2945
                  pInfo, pTaskInfo);
2946

2947
  pInfo->qType = pPhyciNode->qType;
338,754✔
2948
  switch (pInfo->qType) {
338,754✔
2949
    case DYN_QTYPE_STB_HASH:
216,981✔
2950
      TAOS_MEMCPY(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
216,981✔
2951
      pInfo->stbJoin.pOutputDataBlockDesc = pPhyciNode->node.pOutputDataBlockDesc;
216,981✔
2952
      code = initSeqStbJoinTableHash(&pInfo->stbJoin.ctx.prev, pInfo->stbJoin.basic.batchFetch);
216,981✔
2953
      QUERY_CHECK_CODE(code, line, _error);
216,981✔
2954
      nextFp = seqStableJoin;
216,981✔
2955
      openFp = optrDummyOpenFn;
216,981✔
2956
      break;
216,981✔
2957
    case DYN_QTYPE_VTB_SCAN:
121,773✔
2958
      code = initVtbScanInfo(pInfo, pMsgCb, pPhyciNode, pTaskInfo);
121,773✔
2959
      QUERY_CHECK_CODE(code, line, _error);
121,773✔
2960
      nextFp = vtbScanNext;
121,773✔
2961
      openFp = vtbScanOpen;
121,773✔
2962
      break;
121,773✔
2963
    case DYN_QTYPE_VTB_WINDOW:
×
2964
      code = initVtbWindowInfo(pInfo, pPhyciNode, pTaskInfo, pOperator);
×
2965
      QUERY_CHECK_CODE(code, line, _error);
×
2966
      nextFp = vtbWindowNext;
×
2967
      openFp = vtbWindowOpen;
×
2968
      break;
×
2969
    default:
×
2970
      qError("unsupported dynamic query ctrl type: %d", pInfo->qType);
×
2971
      code = TSDB_CODE_INVALID_PARA;
×
2972
      goto _error;
×
2973
  }
2974

2975
  pOperator->fpSet = createOperatorFpSet(openFp, nextFp, NULL, destroyDynQueryCtrlOperator, optrDefaultBufFn,
338,754✔
2976
                                         NULL, optrDefaultGetNextExtFn, NULL);
2977

2978
  setOperatorResetStateFn(pOperator, resetDynQueryCtrlOperState);
338,754✔
2979
  *pOptrInfo = pOperator;
338,754✔
2980
  return TSDB_CODE_SUCCESS;
338,754✔
2981

2982
_error:
×
2983
  if (pInfo != NULL) {
×
2984
    destroyDynQueryCtrlOperator(pInfo);
×
2985
  }
2986
  qError("failed to create dyn query ctrl operator, %s code:%s, line:%d", __func__, tstrerror(code), line);
×
2987
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
2988
  pTaskInfo->code = code;
×
2989
  return code;
×
2990
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc