• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4513

17 Jul 2025 02:02AM UTC coverage: 31.359% (-31.1%) from 62.446%
#4513

push

travis-ci

web-flow
Merge pull request #31914 from taosdata/fix/3.0/compare-ans-failed

fix:Convert line endings from LF to CRLF for ans file

68541 of 301034 branches covered (22.77%)

Branch coverage included in aggregate %.

117356 of 291771 relevant lines covered (40.22%)

602262.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

21.05
/source/libs/executor/src/dynqueryctrloperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "nodes.h"
19
#include "operator.h"
20
#include "os.h"
21
#include "plannodes.h"
22
#include "query.h"
23
#include "querynodes.h"
24
#include "querytask.h"
25
#include "tarray.h"
26
#include "tcompare.h"
27
#include "tdatablock.h"
28
#include "thash.h"
29
#include "tmsg.h"
30
#include "trpc.h"
31
#include "ttypes.h"
32
#include "dynqueryctrl.h"
33

34
int64_t gSessionId = 0;
35

36
void freeVgTableList(void* ptr) { 
×
37
  taosArrayDestroy(*(SArray**)ptr); 
×
38
}
×
39

40
static void destroyStbJoinTableList(SStbJoinTableList* pListHead) {
103✔
41
  SStbJoinTableList* pNext = NULL;
103✔
42
  
43
  while (pListHead) {
103!
44
    taosMemoryFree(pListHead->pLeftVg);
×
45
    taosMemoryFree(pListHead->pLeftUid);
×
46
    taosMemoryFree(pListHead->pRightVg);
×
47
    taosMemoryFree(pListHead->pRightUid);
×
48
    pNext = pListHead->pNext;
×
49
    taosMemoryFree(pListHead);
×
50
    pListHead = pNext;
×
51
  }
52
}
103✔
53

54
static void destroyStbJoinDynCtrlInfo(SStbJoinDynCtrlInfo* pStbJoin) {
103✔
55
  qDebug("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64 ", leftCacheNum:%" PRId64 ", rightCacheNum:%" PRId64, 
103!
56
         pStbJoin->execInfo.prevBlkNum, pStbJoin->execInfo.prevBlkRows, pStbJoin->execInfo.postBlkNum, 
57
         pStbJoin->execInfo.postBlkRows, pStbJoin->execInfo.leftCacheNum, pStbJoin->execInfo.rightCacheNum);
58

59
  if (pStbJoin->basic.batchFetch) {
103!
60
    if (pStbJoin->ctx.prev.leftHash) {
103!
61
      tSimpleHashSetFreeFp(pStbJoin->ctx.prev.leftHash, freeVgTableList);
×
62
      tSimpleHashCleanup(pStbJoin->ctx.prev.leftHash);
×
63
    }
64
    if (pStbJoin->ctx.prev.rightHash) {
103!
65
      tSimpleHashSetFreeFp(pStbJoin->ctx.prev.rightHash, freeVgTableList);
×
66
      tSimpleHashCleanup(pStbJoin->ctx.prev.rightHash);
×
67
    }
68
  } else {
69
    if (pStbJoin->ctx.prev.leftCache) {
×
70
      tSimpleHashCleanup(pStbJoin->ctx.prev.leftCache);
×
71
    }
72
    if (pStbJoin->ctx.prev.rightCache) {
×
73
      tSimpleHashCleanup(pStbJoin->ctx.prev.rightCache);
×
74
    }
75
    if (pStbJoin->ctx.prev.onceTable) {
×
76
      tSimpleHashCleanup(pStbJoin->ctx.prev.onceTable);
×
77
    }
78
  }
79

80
  destroyStbJoinTableList(pStbJoin->ctx.prev.pListHead);
103✔
81
}
103✔
82
typedef struct {
83
  char*    colName;
84
  char*    colrefName;
85
  tb_uid_t uid;
86
  col_id_t colId;
87
  int32_t  vgId;
88
} SColRefKV;
89

90
void destroyOrgTbInfo(void *info) {
×
91
  SOrgTbInfo *pOrgTbInfo = (SOrgTbInfo *)info;
×
92
  if (pOrgTbInfo) {
×
93
    taosArrayDestroy(pOrgTbInfo->colMap);
×
94
  }
95
}
×
96

97
void destroyColRefKV(void *info) {
×
98
  SColRefKV *pColRefKV = (SColRefKV *)info;
×
99
  if (pColRefKV) {
×
100
    taosMemoryFree(pColRefKV->colName);
×
101
    taosMemoryFree(pColRefKV->colrefName);
×
102
  }
103
}
×
104

105
void destroyColRefArray(void *info) {
×
106
  SArray *pColRefArray = *(SArray **)info;
×
107
  if (pColRefArray) {
×
108
    taosArrayDestroyEx(pColRefArray, destroyColRefKV);
×
109
  }
110
}
×
111

112
void freeUseDbOutput(void* pOutput) {
×
113
  SUseDbOutput *pOut = *(SUseDbOutput**)pOutput;
×
114
  if (NULL == pOutput) {
×
115
    return;
×
116
  }
117

118
  if (pOut->dbVgroup) {
×
119
    freeVgInfo(pOut->dbVgroup);
×
120
  }
121
  taosMemFree(pOut);
×
122
}
123

124
static void destroyVtbScanDynCtrlInfo(SVtbScanDynCtrlInfo* pVtbScan) {
×
125
  if (pVtbScan->dbName) {
×
126
    taosMemoryFreeClear(pVtbScan->dbName);
×
127
  }
128
  if (pVtbScan->stbName) {
×
129
    taosMemoryFreeClear(pVtbScan->stbName);
×
130
  }
131
  if (pVtbScan->childTableList) {
×
132
    taosArrayDestroyEx(pVtbScan->childTableList, destroyColRefArray);
×
133
  }
134
  if (pVtbScan->childTableMap) {
×
135
    taosHashCleanup(pVtbScan->childTableMap);
×
136
  }
137
  if (pVtbScan->readColList) {
×
138
    taosArrayDestroy(pVtbScan->readColList);
×
139
  }
140
  if (pVtbScan->dbVgInfoMap) {
×
141
    taosHashSetFreeFp(pVtbScan->dbVgInfoMap, freeUseDbOutput);
×
142
    taosHashCleanup(pVtbScan->dbVgInfoMap);
×
143
  }
144
  if (pVtbScan->orgTbVgColMap) {
×
145
    taosHashSetFreeFp(pVtbScan->orgTbVgColMap, destroyOrgTbInfo);
×
146
    taosHashCleanup(pVtbScan->orgTbVgColMap);
×
147
  }
148
  if (pVtbScan->pRsp) {
×
149
    tFreeSUsedbRsp(pVtbScan->pRsp);
×
150
    taosMemoryFreeClear(pVtbScan->pRsp);
×
151
  }
152
}
×
153

154
static void destroyDynQueryCtrlOperator(void* param) {
103✔
155
  SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param;
103✔
156

157
  switch (pDyn->qType) {
103!
158
    case DYN_QTYPE_STB_HASH:
103✔
159
      destroyStbJoinDynCtrlInfo(&pDyn->stbJoin);
103✔
160
      break;
103✔
161
    case DYN_QTYPE_VTB_SCAN:
×
162
      destroyVtbScanDynCtrlInfo(&pDyn->vtbScan);
×
163
      break;
×
164
    default:
×
165
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
×
166
      break;
×
167
  }
168

169
  taosMemoryFreeClear(param);
103!
170
}
103✔
171

172
static FORCE_INLINE bool tableNeedCache(int64_t uid, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, bool rightTable, bool batchFetch) {
173
  if (batchFetch) {
508✔
174
    return true;
508✔
175
  }
176
  
177
  if (rightTable) {
×
178
    return pPost->rightCurrUid == pPost->rightNextUid;
×
179
  }
180

181
  uint32_t* num = tSimpleHashGet(pPrev->leftCache, &uid, sizeof(uid));
×
182

183
  return (NULL == num) ? false : true;
×
184
}
185

186
static int32_t updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo*          pStbJoin) {
254✔
187
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
254✔
188
  SStbJoinPostJoinCtx*       pPost = &pStbJoin->ctx.post;
254✔
189
  SStbJoinTableList*         pNode = pPrev->pListHead;
254✔
190
  int32_t*                   leftVgId = pNode->pLeftVg + pNode->readIdx;
254✔
191
  int32_t*                   rightVgId = pNode->pRightVg + pNode->readIdx;
254✔
192
  int64_t*                   leftUid = pNode->pLeftUid + pNode->readIdx;
254✔
193
  int64_t*                   rightUid = pNode->pRightUid + pNode->readIdx;
254✔
194
  int64_t                    readIdx = pNode->readIdx + 1;
254✔
195
  int64_t                    rightPrevUid = pPost->rightCurrUid;
254✔
196

197
  pPost->leftCurrUid = *leftUid;
254✔
198
  pPost->rightCurrUid = *rightUid;
254✔
199

200
  pPost->leftVgId = *leftVgId;
254✔
201
  pPost->rightVgId = *rightVgId;
254✔
202

203
  while (true) {
204
    if (readIdx < pNode->uidNum) {
254✔
205
      pPost->rightNextUid = *(pNode->pRightUid + readIdx);
151✔
206
      break;
151✔
207
    }
208
    
209
    pNode = pNode->pNext;
103✔
210
    if (NULL == pNode) {
103!
211
      pPost->rightNextUid = 0;
103✔
212
      break;
103✔
213
    }
214
    
215
    rightUid = pNode->pRightUid;
×
216
    readIdx = 0;
×
217
  }
218

219
  pPost->leftNeedCache = tableNeedCache(*leftUid, pPrev, pPost, false, pStbJoin->basic.batchFetch);
254!
220
  pPost->rightNeedCache = tableNeedCache(*rightUid, pPrev, pPost, true, pStbJoin->basic.batchFetch);
254!
221

222
  if (!pStbJoin->basic.batchFetch && pPost->rightNeedCache && rightPrevUid != pPost->rightCurrUid) {
254!
223
    QRY_ERR_RET(tSimpleHashPut(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid), NULL, 0));
×
224
    pStbJoin->execInfo.rightCacheNum++;
×
225
  }  
226

227
  return TSDB_CODE_SUCCESS;
254✔
228
}
229

230

231
static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, bool needCache, SOperatorParam* pChild) {
508✔
232
  int32_t code = TSDB_CODE_SUCCESS;
508✔
233
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
508!
234
  if (NULL == *ppRes) {
508!
235
    code = terrno;
×
236
    freeOperatorParam(pChild, OP_GET_PARAM);
×
237
    return code;
×
238
  }
239
  if (pChild) {
508✔
240
    (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
206✔
241
    if (NULL == (*ppRes)->pChildren) {
206!
242
      code = terrno;
×
243
      freeOperatorParam(pChild, OP_GET_PARAM);
×
244
      freeOperatorParam(*ppRes, OP_GET_PARAM);
×
245
      *ppRes = NULL;
×
246
      return code;
×
247
    }
248
    if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild)) {
412!
249
      code = terrno;
×
250
      freeOperatorParam(pChild, OP_GET_PARAM);
×
251
      freeOperatorParam(*ppRes, OP_GET_PARAM);
×
252
      *ppRes = NULL;
×
253
      return code;
×
254
    }
255
  } else {
256
    (*ppRes)->pChildren = NULL;
302✔
257
  }
258

259
  SGcOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcOperatorParam));
508!
260
  if (NULL == pGc) {
508!
261
    code = terrno;
×
262
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
263
    *ppRes = NULL;
×
264
    return code;
×
265
  }
266

267
  pGc->sessionId = atomic_add_fetch_64(&gSessionId, 1);
508✔
268
  pGc->downstreamIdx = downstreamIdx;
508✔
269
  pGc->vgId = vgId;
508✔
270
  pGc->tbUid = tbUid;
508✔
271
  pGc->needCache = needCache;
508✔
272

273
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
508✔
274
  (*ppRes)->downstreamIdx = downstreamIdx;
508✔
275
  (*ppRes)->value = pGc;
508✔
276
  (*ppRes)->reUse = false;
508✔
277

278
  return TSDB_CODE_SUCCESS;
508✔
279
}
280

281

282
static int32_t buildGroupCacheNotifyOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid) {
×
283
  int32_t code = TSDB_CODE_SUCCESS;
×
284
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
×
285
  if (NULL == *ppRes) {
×
286
    return terrno;
×
287
  }
288
  (*ppRes)->pChildren = NULL;
×
289

290
  SGcNotifyOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcNotifyOperatorParam));
×
291
  if (NULL == pGc) {
×
292
    code = terrno;
×
293
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
×
294
    return code;
×
295
  }
296

297
  pGc->downstreamIdx = downstreamIdx;
×
298
  pGc->vgId = vgId;
×
299
  pGc->tbUid = tbUid;
×
300

301
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
×
302
  (*ppRes)->downstreamIdx = downstreamIdx;
×
303
  (*ppRes)->value = pGc;
×
304
  (*ppRes)->reUse = false;
×
305

306
  return TSDB_CODE_SUCCESS;
×
307
}
308

309

310
static int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, const int32_t* pVgId, int64_t* pUid) {
×
311
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
×
312
  if (NULL == *ppRes) {
×
313
    return terrno;
×
314
  }
315
  (*ppRes)->pChildren = NULL;
×
316
  
317
  SExchangeOperatorParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
×
318
  if (NULL == pExc) {
×
319
    return terrno;
×
320
  }
321

322
  pExc->multiParams = false;
×
323
  pExc->basic.vgId = *pVgId;
×
324
  pExc->basic.tableSeq = true;
×
325
  pExc->basic.isVtbRefScan = false;
×
326
  pExc->basic.isVtbTagScan = false;
×
327
  pExc->basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
×
328
  pExc->basic.colMap = NULL;
×
329
  pExc->basic.uidList = taosArrayInit(1, sizeof(int64_t));
×
330
  if (NULL == pExc->basic.uidList) {
×
331
    taosMemoryFree(pExc);
×
332
    return terrno;
×
333
  }
334
  if (NULL == taosArrayPush(pExc->basic.uidList, pUid)) {
×
335
    taosArrayDestroy(pExc->basic.uidList);
×
336
    taosMemoryFree(pExc);
×
337
    return terrno;
×
338
  }
339

340
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
×
341
  (*ppRes)->downstreamIdx = downstreamIdx;
×
342
  (*ppRes)->value = pExc;
×
343
  (*ppRes)->reUse = false;
×
344

345
  return TSDB_CODE_SUCCESS;
×
346
}
347

348
static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
184✔
349
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
184!
350
  if (NULL == *ppRes) {
184!
351
    return terrno;
×
352
  }
353
  (*ppRes)->pChildren = NULL;
184✔
354
  
355
  SExchangeOperatorBatchParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorBatchParam));
184!
356
  if (NULL == pExc) {
184!
357
    taosMemoryFreeClear(*ppRes);
×
358
    return terrno;
×
359
  }
360

361
  pExc->multiParams = true;
184✔
362
  pExc->pBatchs = tSimpleHashInit(tSimpleHashGetSize(pVg), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
184✔
363
  if (NULL == pExc->pBatchs) {
184!
364
    taosMemoryFree(pExc);
×
365
    taosMemoryFreeClear(*ppRes);
×
366
    return terrno;
×
367
  }
368
  tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);
184✔
369
  
370
  SExchangeOperatorBasicParam basic;
371
  basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
184✔
372

373
  int32_t iter = 0;
184✔
374
  void* p = NULL;
184✔
375
  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
526✔
376
    int32_t* pVgId = tSimpleHashGetKey(p, NULL);
342✔
377
    SArray* pUidList = *(SArray**)p;
342✔
378
    basic.vgId = *pVgId;
342✔
379
    basic.uidList = pUidList;
342✔
380
    basic.colMap = NULL;
342✔
381
    basic.tableSeq = false;
342✔
382
    basic.isVtbRefScan = false;
342✔
383
    basic.isVtbTagScan = false;
342✔
384
    
385
    QRY_ERR_RET(tSimpleHashPut(pExc->pBatchs, pVgId, sizeof(*pVgId), &basic, sizeof(basic)));   
342!
386

387
    qTrace("build downstreamIdx %d batch scan, vgId:%d, uidNum:%" PRId64, downstreamIdx, *pVgId, (int64_t)taosArrayGetSize(pUidList));
342!
388
    *(SArray**)p = NULL;
342✔
389
  }
390

391
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
184✔
392
  (*ppRes)->downstreamIdx = downstreamIdx;
184✔
393
  (*ppRes)->value = pExc;
184✔
394
  (*ppRes)->reUse = false;
184✔
395

396
  return TSDB_CODE_SUCCESS;
184✔
397
}
398

399
static int32_t buildExchangeOperatorParamForVTagScan(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, tb_uid_t uid) {
×
400
  int32_t                      code = TSDB_CODE_SUCCESS;
×
401
  int32_t                      lino = 0;
×
402
  SExchangeOperatorParam*      pExc = NULL;
×
403
  SExchangeOperatorBasicParam* basic = NULL;
×
404

405
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
×
406
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
×
407
  (*ppRes)->pChildren = NULL;
×
408

409
  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
×
410
  QUERY_CHECK_NULL(pExc, code, lino, _return, terrno);
×
411

412
  pExc->multiParams = false;
×
413

414
  basic = &pExc->basic;
×
415
  basic->srcOpType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
×
416

417
  basic->vgId = vgId;
×
418
  basic->tableSeq = false;
×
419
  basic->isVtbRefScan = false;
×
420
  basic->isVtbTagScan = true;
×
421
  basic->colMap = NULL;
×
422

423
  basic->uidList = taosArrayInit(1, sizeof(int64_t));
×
424
  QUERY_CHECK_NULL(basic->uidList, code, lino, _return, terrno);
×
425
  taosArrayPush(basic->uidList, &uid);
×
426

427
  (*ppRes)->pChildren = NULL;
×
428

429
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
×
430
  (*ppRes)->downstreamIdx = downstreamIdx;
×
431
  (*ppRes)->value = pExc;
×
432
  (*ppRes)->reUse = true;
×
433

434
  return TSDB_CODE_SUCCESS;
×
435

436
_return:
×
437
  qError("failed to build exchange operator param for vscan, code:%d", code);
×
438
  taosMemoryFreeClear(*ppRes);
×
439
  if (basic) {
×
440
    if (basic->colMap) {
×
441
      taosArrayDestroy(basic->colMap->colMap);
×
442
      taosMemoryFreeClear(basic->colMap);
×
443
    }
444
    if (basic->uidList) {
×
445
      taosArrayDestroy(basic->uidList);
×
446
    }
447
    taosMemoryFreeClear(basic);
×
448
  }
449
  taosMemoryFreeClear(pExc);
×
450
  return code;
×
451
}
452

453
static int32_t buildExchangeOperatorParamForVScan(SOperatorParam** ppRes, int32_t downstreamIdx, SOrgTbInfo* pMap) {
×
454
  int32_t                      code = TSDB_CODE_SUCCESS;
×
455
  int32_t                      lino = 0;
×
456
  SExchangeOperatorParam*      pExc = NULL;
×
457
  SExchangeOperatorBasicParam* basic = NULL;
×
458

459
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
×
460
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
×
461
  (*ppRes)->pChildren = NULL;
×
462

463
  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
×
464
  QUERY_CHECK_NULL(pExc, code, lino, _return, terrno);
×
465

466
  pExc->multiParams = false;
×
467

468
  basic = &pExc->basic;
×
469
  basic->srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
×
470

471
  basic->vgId = pMap->vgId;
×
472
  basic->tableSeq = false;
×
473
  basic->isVtbRefScan = true;
×
474
  basic->isVtbTagScan = false;
×
475
  basic->colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
×
476
  QUERY_CHECK_NULL(basic->colMap, code, lino, _return, terrno);
×
477
  basic->colMap->vgId = pMap->vgId;
×
478
  tstrncpy(basic->colMap->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);
×
479
  basic->colMap->colMap = taosArrayDup(pMap->colMap, NULL);
×
480
  QUERY_CHECK_NULL(basic->colMap->colMap, code, lino, _return, terrno);
×
481

482
  basic->uidList = taosArrayInit(1, sizeof(int64_t));
×
483
  QUERY_CHECK_NULL(basic->uidList, code, lino, _return, terrno);
×
484

485
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
×
486
  (*ppRes)->downstreamIdx = downstreamIdx;
×
487
  (*ppRes)->value = pExc;
×
488
  (*ppRes)->reUse = true;
×
489

490
  return TSDB_CODE_SUCCESS;
×
491

492
_return:
×
493
  qError("failed to build exchange operator param for vscan, code:%d", code);
×
494
  taosMemoryFreeClear(*ppRes);
×
495
  if (basic) {
×
496
    if (basic->colMap) {
×
497
      taosArrayDestroy(basic->colMap->colMap);
×
498
      taosMemoryFreeClear(basic->colMap);
×
499
    }
500
    if (basic->uidList) {
×
501
      taosArrayDestroy(basic->uidList);
×
502
    }
503
    taosMemoryFreeClear(basic);
×
504
  }
505
  taosMemoryFreeClear(pExc);
×
506
  return code;
×
507
}
508

509
static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initParam, SOperatorParam** ppChild0, SOperatorParam** ppChild1) {
254✔
510
  int32_t code = TSDB_CODE_SUCCESS;
254✔
511
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
254!
512
  if (NULL == *ppRes) {
254!
513
    code = terrno;
×
514
    return code;
×
515
  }
516
  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
254✔
517
  if (NULL == (*ppRes)->pChildren) {
254!
518
    code = terrno;
×
519
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
520
    *ppRes = NULL;
×
521
    return code;
×
522
  }
523
  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild0)) {
508!
524
    code = terrno;
×
525
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
526
    *ppRes = NULL;
×
527
    return code;
×
528
  }
529
  *ppChild0 = NULL;
254✔
530
  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild1)) {
508!
531
    code = terrno;
×
532
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
533
    *ppRes = NULL;
×
534
    return code;
×
535
  }
536
  *ppChild1 = NULL;
254✔
537
  
538
  SSortMergeJoinOperatorParam* pJoin = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam));
254!
539
  if (NULL == pJoin) {
254!
540
    code = terrno;
×
541
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
542
    *ppRes = NULL;
×
543
    return code;
×
544
  }
545

546
  pJoin->initDownstream = initParam;
254✔
547
  
548
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
254✔
549
  (*ppRes)->value = pJoin;
254✔
550
  (*ppRes)->reUse = false;
254✔
551

552
  return TSDB_CODE_SUCCESS;
254✔
553
}
554

555
static int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) {
×
556
  int32_t code = TSDB_CODE_SUCCESS;
×
557
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
×
558
  if (NULL == *ppRes) {
×
559
    code = terrno;
×
560
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
×
561
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
562
    return code;
×
563
  }
564
  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
×
565
  if (NULL == *ppRes) {
×
566
    code = terrno;
×
567
    taosMemoryFreeClear(*ppRes);
×
568
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
×
569
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
570
    return code;
×
571
  }
572
  if (pChild0 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) {
×
573
    code = terrno;
×
574
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
×
575
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
×
576
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
577
    *ppRes = NULL;
×
578
    return code;
×
579
  }
580
  if (pChild1 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) {
×
581
    code = terrno;
×
582
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
×
583
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
584
    *ppRes = NULL;
×
585
    return code;
×
586
  }
587
  
588
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
×
589
  (*ppRes)->value = NULL;
×
590
  (*ppRes)->reUse = false;
×
591

592
  return TSDB_CODE_SUCCESS;
×
593
}
594

595
static int32_t buildBatchTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
22✔
596
  int32_t code = TSDB_CODE_SUCCESS;
22✔
597
  int32_t vgNum = tSimpleHashGetSize(pVg);
22✔
598
  if (vgNum <= 0 || vgNum > 1) {
22!
599
    qError("Invalid vgroup num %d to build table scan operator param", vgNum);
×
600
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
601
  }
602

603
  int32_t iter = 0;
22✔
604
  void* p = NULL;
22✔
605
  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
44✔
606
    int32_t* pVgId = tSimpleHashGetKey(p, NULL);
22✔
607
    SArray* pUidList = *(SArray**)p;
22✔
608

609
    code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false);
22✔
610
    if (code) {
22!
611
      return code;
×
612
    }
613
    taosArrayDestroy(pUidList);
22✔
614
    *(SArray**)p = NULL;
22✔
615
  }
616
  
617
  return TSDB_CODE_SUCCESS;
22✔
618
}
619

620

621
static int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) {
×
622
  SArray* pUidList = taosArrayInit(1, sizeof(int64_t));
×
623
  if (NULL == pUidList) {
×
624
    return terrno;
×
625
  }
626
  if (NULL == taosArrayPush(pUidList, pUid)) {
×
627
    return terrno;
×
628
  }
629

630
  int32_t code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, true);
×
631
  taosArrayDestroy(pUidList);
×
632
  if (code) {
×
633
    return code;
×
634
  }
635
  
636
  return TSDB_CODE_SUCCESS;
×
637
}
638

639
static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, SOperatorParam** ppParam) {
254✔
640
  int64_t                     rowIdx = pPrev->pListHead->readIdx;
254✔
641
  SOperatorParam*             pSrcParam0 = NULL;
254✔
642
  SOperatorParam*             pSrcParam1 = NULL;
254✔
643
  SOperatorParam*             pGcParam0 = NULL;
254✔
644
  SOperatorParam*             pGcParam1 = NULL;  
254✔
645
  int32_t*                    leftVg = pPrev->pListHead->pLeftVg + rowIdx;
254✔
646
  int64_t*                    leftUid = pPrev->pListHead->pLeftUid + rowIdx;
254✔
647
  int32_t*                    rightVg = pPrev->pListHead->pRightVg + rowIdx;
254✔
648
  int64_t*                    rightUid = pPrev->pListHead->pRightUid + rowIdx;
254✔
649
  int32_t                     code = TSDB_CODE_SUCCESS;
254✔
650

651
  qDebug("start %" PRId64 ":%" PRId64 "th stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64, 
254!
652
      rowIdx, pPrev->tableNum, *leftVg, *leftUid, *rightVg, *rightUid);
653

654
  QRY_ERR_RET(updatePostJoinCurrTableInfo(&pInfo->stbJoin));
254!
655
  
656
  if (pInfo->stbJoin.basic.batchFetch) {
254!
657
    if (pPrev->leftHash) {
254✔
658
      code = pInfo->stbJoin.basic.srcScan[0] ? buildBatchTableScanOperatorParam(&pSrcParam0, 0, pPrev->leftHash) : buildBatchExchangeOperatorParam(&pSrcParam0, 0, pPrev->leftHash);
103✔
659
      if (TSDB_CODE_SUCCESS == code) {
103!
660
        code = pInfo->stbJoin.basic.srcScan[1] ? buildBatchTableScanOperatorParam(&pSrcParam1, 1, pPrev->rightHash) : buildBatchExchangeOperatorParam(&pSrcParam1, 1, pPrev->rightHash);
103✔
661
      }
662
      if (TSDB_CODE_SUCCESS == code) {
103!
663
        tSimpleHashCleanup(pPrev->leftHash);
103✔
664
        tSimpleHashCleanup(pPrev->rightHash);
103✔
665
        pPrev->leftHash = NULL;
103✔
666
        pPrev->rightHash = NULL;
103✔
667
      }
668
    }
669
  } else {
670
    code = pInfo->stbJoin.basic.srcScan[0] ? buildSingleTableScanOperatorParam(&pSrcParam0, 0, leftVg, leftUid) : buildExchangeOperatorParam(&pSrcParam0, 0, leftVg, leftUid);
×
671
    if (TSDB_CODE_SUCCESS == code) {
×
672
      code = pInfo->stbJoin.basic.srcScan[1] ? buildSingleTableScanOperatorParam(&pSrcParam1, 1, rightVg, rightUid) : buildExchangeOperatorParam(&pSrcParam1, 1, rightVg, rightUid);
×
673
    }
674
  }
675

676
  bool initParam = pSrcParam0 ? true : false;
254✔
677
  if (TSDB_CODE_SUCCESS == code) {
254!
678
    code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pPost->leftNeedCache, pSrcParam0);
254✔
679
    pSrcParam0 = NULL;
254✔
680
  }
681
  if (TSDB_CODE_SUCCESS == code) {
254!
682
    code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pPost->rightNeedCache, pSrcParam1);
254✔
683
    pSrcParam1 = NULL;
254✔
684
  }
685
  if (TSDB_CODE_SUCCESS == code) {
254!
686
    code = buildMergeJoinOperatorParam(ppParam, initParam, &pGcParam0, &pGcParam1);
254✔
687
  }
688
  if (TSDB_CODE_SUCCESS != code) {
254!
689
    if (pSrcParam0) {
×
690
      freeOperatorParam(pSrcParam0, OP_GET_PARAM);
×
691
    }
692
    if (pSrcParam1) {
×
693
      freeOperatorParam(pSrcParam1, OP_GET_PARAM);
×
694
    }
695
    if (pGcParam0) {
×
696
      freeOperatorParam(pGcParam0, OP_GET_PARAM);
×
697
    }
698
    if (pGcParam1) {
×
699
      freeOperatorParam(pGcParam1, OP_GET_PARAM);
×
700
    }
701
    if (*ppParam) {
×
702
      freeOperatorParam(*ppParam, OP_GET_PARAM);
×
703
      *ppParam = NULL;
×
704
    }
705
  }
706
  
707
  return code;
254✔
708
}
709

710
static void seqJoinLaunchNewRetrieveImpl(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
254✔
711
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
254✔
712
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
254✔
713
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
254✔
714
  SStbJoinPostJoinCtx*       pPost = &pStbJoin->ctx.post;
254✔
715
  SOperatorParam*            pParam = NULL;
254✔
716
  int32_t                    code  = buildSeqStbJoinOperatorParam(pInfo, pPrev, pPost, &pParam);
254✔
717
  if (TSDB_CODE_SUCCESS != code) {
254!
718
    pOperator->pTaskInfo->code = code;
×
719
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
×
720
  }
721

722
  qDebug("%s dynamic post task begin", GET_TASKID(pOperator->pTaskInfo));
254!
723
  code = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam, ppRes);
254✔
724
  if (*ppRes && (code == 0)) {
254!
725
    code = blockDataCheck(*ppRes);
222✔
726
    if (code) {
222!
727
      qError("Invalid block data, blockDataCheck failed, error:%s", tstrerror(code));
×
728
      pOperator->pTaskInfo->code = code;
×
729
      T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
×
730
    }
731
    pPost->isStarted = true;
222✔
732
    pStbJoin->execInfo.postBlkNum++;
222✔
733
    pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows;
222✔
734
    qDebug("%s join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
222!
735
  } else {
736
    qDebug("%s Empty join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
32!
737
  }
738
}
254✔
739

740

741
static int32_t notifySeqJoinTableCacheEnd(SOperatorInfo* pOperator, SStbJoinPostJoinCtx* pPost, bool leftTable) {
×
742
  SOperatorParam* pGcParam = NULL;
×
743
  SOperatorParam* pMergeJoinParam = NULL;
×
744
  int32_t         downstreamId = leftTable ? 0 : 1;
×
745
  int32_t         vgId = leftTable ? pPost->leftVgId : pPost->rightVgId;
×
746
  int64_t         uid = leftTable ? pPost->leftCurrUid : pPost->rightCurrUid;
×
747

748
  qDebug("notify table %" PRIu64 " in vgId %d downstreamId %d cache end", uid, vgId, downstreamId);
×
749

750
  int32_t code = buildGroupCacheNotifyOperatorParam(&pGcParam, downstreamId, vgId, uid);
×
751
  if (TSDB_CODE_SUCCESS != code) {
×
752
    return code;
×
753
  }
754
  code = buildMergeJoinNotifyOperatorParam(&pMergeJoinParam, pGcParam, NULL);
×
755
  if (TSDB_CODE_SUCCESS != code) {
×
756
    return code;
×
757
  }
758

759
  return optrDefaultNotifyFn(pOperator->pDownstream[1], pMergeJoinParam);
×
760
}
761

762
static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCtrlInfo*          pStbJoin) {
254✔
763
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
254✔
764
  int32_t code = 0;
254✔
765
  
766
  pPost->isStarted = false;
254✔
767
  
768
  if (pStbJoin->basic.batchFetch) {
254!
769
    return TSDB_CODE_SUCCESS;
254✔
770
  }
771
  
772
  if (pPost->leftNeedCache) {
×
773
    uint32_t* num = tSimpleHashGet(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
×
774
    if (num && --(*num) <= 0) {
×
775
      code = tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
×
776
      if (code) {
×
777
        qError("tSimpleHashRemove leftCurrUid %" PRId64 " from leftCache failed, error:%s", pPost->leftCurrUid, tstrerror(code));
×
778
        QRY_ERR_RET(code);
×
779
      }
780
      QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, true));
×
781
    }
782
  }
783
  
784
  if (!pPost->rightNeedCache) {
×
785
    void* v = tSimpleHashGet(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
×
786
    if (NULL != v) {
×
787
      code = tSimpleHashRemove(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
×
788
      if (code) {
×
789
        qError("tSimpleHashRemove rightCurrUid %" PRId64 " from rightCache failed, error:%s", pPost->rightCurrUid, tstrerror(code));
×
790
        QRY_ERR_RET(code);
×
791
      }
792
      QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, false));
×
793
    }
794
  }
795

796
  return TSDB_CODE_SUCCESS;
×
797
}
798

799

800
static FORCE_INLINE int32_t seqJoinContinueCurrRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
801
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
325✔
802
  SStbJoinPostJoinCtx*       pPost = &pInfo->stbJoin.ctx.post;
325✔
803
  SStbJoinPrevJoinCtx*       pPrev = &pInfo->stbJoin.ctx.prev;
325✔
804

805
  if (!pPost->isStarted) {
325✔
806
    return TSDB_CODE_SUCCESS;
103✔
807
  }
808
  
809
  qDebug("%s dynQueryCtrl continue to retrieve block from post op", GET_TASKID(pOperator->pTaskInfo));
222!
810
  
811
  *ppRes = getNextBlockFromDownstream(pOperator, 1);
222✔
812
  if (NULL == *ppRes) {
222!
813
    QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, &pInfo->stbJoin));
222!
814
    pPrev->pListHead->readIdx++;
222✔
815
  } else {
816
    pInfo->stbJoin.execInfo.postBlkNum++;
×
817
    pInfo->stbJoin.execInfo.postBlkRows += (*ppRes)->info.rows;
×
818
  }
819

820
  return TSDB_CODE_SUCCESS;
222✔
821
}
822

823
static FORCE_INLINE int32_t addToJoinVgroupHash(SSHashObj* pHash, void* pKey, int32_t keySize, void* pVal, int32_t valSize) {
824
  SArray** ppArray = tSimpleHashGet(pHash, pKey, keySize);
508✔
825
  if (NULL == ppArray) {
508✔
826
    SArray* pArray = taosArrayInit(10, valSize);
364✔
827
    if (NULL == pArray) {
364!
828
      return terrno;
×
829
    }
830
    if (NULL == taosArrayPush(pArray, pVal)) {
728!
831
      taosArrayDestroy(pArray);
×
832
      return terrno;
×
833
    }
834
    if (tSimpleHashPut(pHash, pKey, keySize, &pArray, POINTER_BYTES)) {
364!
835
      taosArrayDestroy(pArray);      
×
836
      return terrno;
×
837
    }
838
    return TSDB_CODE_SUCCESS;
364✔
839
  }
840

841
  if (NULL == taosArrayPush(*ppArray, pVal)) {
288!
842
    return terrno;
×
843
  }
844
  
845
  return TSDB_CODE_SUCCESS;
144✔
846
}
847

848
static FORCE_INLINE int32_t addToJoinTableHash(SSHashObj* pHash, SSHashObj* pOnceHash, void* pKey, int32_t keySize) {
849
  int32_t code = TSDB_CODE_SUCCESS;
×
850
  uint32_t* pNum = tSimpleHashGet(pHash, pKey, keySize);
×
851
  if (NULL == pNum) {
×
852
    uint32_t n = 1;
×
853
    code = tSimpleHashPut(pHash, pKey, keySize, &n, sizeof(n));
×
854
    if (code) {
×
855
      return code;
×
856
    }
857
    code = tSimpleHashPut(pOnceHash, pKey, keySize, NULL, 0);
×
858
    if (code) {
×
859
      return code;
×
860
    }
861
    return TSDB_CODE_SUCCESS;
×
862
  }
863

864
  switch (*pNum) {
×
865
    case 0:
×
866
      break;
×
867
    case UINT32_MAX:
×
868
      *pNum = 0;
×
869
      break;
×
870
    default:
×
871
      if (1 == (*pNum)) {
×
872
        code = tSimpleHashRemove(pOnceHash, pKey, keySize);
×
873
        if (code) {
×
874
          qError("tSimpleHashRemove failed in addToJoinTableHash, error:%s", tstrerror(code));
×
875
          QRY_ERR_RET(code);
×
876
        }
877
      }
878
      (*pNum)++;
×
879
      break;
×
880
  }
881
  
882
  return TSDB_CODE_SUCCESS;
×
883
}
884

885

886
static void freeStbJoinTableList(SStbJoinTableList* pList) {
103✔
887
  if (NULL == pList) {
103!
888
    return;
×
889
  }
890
  taosMemoryFree(pList->pLeftVg);
103!
891
  taosMemoryFree(pList->pLeftUid);
103!
892
  taosMemoryFree(pList->pRightVg);
103!
893
  taosMemoryFree(pList->pRightUid);
103!
894
  taosMemoryFree(pList);
103!
895
}
896

897
static int32_t appendStbJoinTableList(SStbJoinPrevJoinCtx* pCtx, int64_t rows, int32_t* pLeftVg, int64_t* pLeftUid, int32_t* pRightVg, int64_t* pRightUid) {
103✔
898
  int32_t code = TSDB_CODE_SUCCESS;
103✔
899
  SStbJoinTableList* pNew = taosMemoryCalloc(1, sizeof(SStbJoinTableList));
103!
900
  if (NULL == pNew) {
103!
901
    return terrno;
×
902
  }
903
  pNew->pLeftVg = taosMemoryMalloc(rows * sizeof(*pLeftVg));
103!
904
  if (NULL == pNew->pLeftVg) {
103!
905
    code = terrno;
×
906
    freeStbJoinTableList(pNew);
×
907
    return code;
×
908
  }
909
  pNew->pLeftUid = taosMemoryMalloc(rows * sizeof(*pLeftUid));
103!
910
  if (NULL == pNew->pLeftUid) {
103!
911
    code = terrno;
×
912
    freeStbJoinTableList(pNew);
×
913
    return code;
×
914
  }
915
  pNew->pRightVg = taosMemoryMalloc(rows * sizeof(*pRightVg));
103!
916
  if (NULL == pNew->pRightVg) {
103!
917
    code = terrno;
×
918
    freeStbJoinTableList(pNew);
×
919
    return code;
×
920
  }
921
  pNew->pRightUid = taosMemoryMalloc(rows * sizeof(*pRightUid));
103!
922
  if (NULL == pNew->pRightUid) {
103!
923
    code = terrno;
×
924
    freeStbJoinTableList(pNew);
×
925
    return code;
×
926
  }
927

928
  TAOS_MEMCPY(pNew->pLeftVg, pLeftVg, rows * sizeof(*pLeftVg));
103✔
929
  TAOS_MEMCPY(pNew->pLeftUid, pLeftUid, rows * sizeof(*pLeftUid));
103✔
930
  TAOS_MEMCPY(pNew->pRightVg, pRightVg, rows * sizeof(*pRightVg));
103✔
931
  TAOS_MEMCPY(pNew->pRightUid, pRightUid, rows * sizeof(*pRightUid));
103✔
932

933
  pNew->readIdx = 0;
103✔
934
  pNew->uidNum = rows;
103✔
935
  pNew->pNext = NULL;
103✔
936
  
937
  if (pCtx->pListTail) {
103!
938
    pCtx->pListTail->pNext = pNew;
×
939
    pCtx->pListTail = pNew;
×
940
  } else {
941
    pCtx->pListHead = pNew;
103✔
942
    pCtx->pListTail= pNew;
103✔
943
  }
944

945
  return TSDB_CODE_SUCCESS;
103✔
946
}
947

948
static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
103✔
949
  int32_t                    code = TSDB_CODE_SUCCESS;
103✔
950
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
103✔
951
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
103✔
952
  SColumnInfoData*           pVg0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[0]);
103✔
953
  if (NULL == pVg0) {
103!
954
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
955
  }
956
  SColumnInfoData*           pVg1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[1]);
103✔
957
  if (NULL == pVg1) {
103!
958
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
959
  }
960
  SColumnInfoData*           pUid0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[0]);
103✔
961
  if (NULL == pUid0) {
103!
962
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
963
  }
964
  SColumnInfoData*           pUid1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[1]);
103✔
965
  if (NULL == pUid1) {
103!
966
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
967
  }
968

969
  if (pStbJoin->basic.batchFetch) {
103!
970
    for (int32_t i = 0; i < pBlock->info.rows; ++i) {
357✔
971
      int32_t* leftVg = (int32_t*)(pVg0->pData + pVg0->info.bytes * i);
254✔
972
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i);
254✔
973
      int32_t* rightVg = (int32_t*)(pVg1->pData + pVg1->info.bytes * i);
254✔
974
      int64_t* rightUid = (int64_t*)(pUid1->pData + pUid1->info.bytes * i);
254✔
975

976
      code = addToJoinVgroupHash(pStbJoin->ctx.prev.leftHash, leftVg, sizeof(*leftVg), leftUid, sizeof(*leftUid));
254✔
977
      if (TSDB_CODE_SUCCESS != code) {
254!
978
        break;
×
979
      }
980
      code = addToJoinVgroupHash(pStbJoin->ctx.prev.rightHash, rightVg, sizeof(*rightVg), rightUid, sizeof(*rightUid));
254✔
981
      if (TSDB_CODE_SUCCESS != code) {
254!
982
        break;
×
983
      }
984
    }
985
  } else {
986
    for (int32_t i = 0; i < pBlock->info.rows; ++i) {
×
987
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i);
×
988
    
989
      code = addToJoinTableHash(pStbJoin->ctx.prev.leftCache, pStbJoin->ctx.prev.onceTable, leftUid, sizeof(*leftUid));
×
990
      if (TSDB_CODE_SUCCESS != code) {
×
991
        break;
×
992
      }
993
    }
994
  }
995

996
  if (TSDB_CODE_SUCCESS == code) {
103!
997
    code = appendStbJoinTableList(&pStbJoin->ctx.prev, pBlock->info.rows, (int32_t*)pVg0->pData, (int64_t*)pUid0->pData, (int32_t*)pVg1->pData, (int64_t*)pUid1->pData);
103✔
998
    if (TSDB_CODE_SUCCESS == code) {
103!
999
      pStbJoin->ctx.prev.tableNum += pBlock->info.rows;
103✔
1000
    }
1001
  }
1002

1003
_return:
×
1004

1005
  if (TSDB_CODE_SUCCESS != code) {
103!
1006
    pOperator->pTaskInfo->code = code;
×
1007
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
×
1008
  }
1009
}
103✔
1010

1011

1012
static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) {
103✔
1013
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
103✔
1014
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
103✔
1015

1016
  if (pStbJoin->basic.batchFetch) {
103!
1017
    return;
103✔
1018
  }
1019

1020
  if (tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache) == tSimpleHashGetSize(pStbJoin->ctx.prev.onceTable)) {
×
1021
    tSimpleHashClear(pStbJoin->ctx.prev.leftCache);
×
1022
    return;
×
1023
  }
1024

1025
  uint64_t* pUid = NULL;
×
1026
  int32_t iter = 0;
×
1027
  int32_t code = 0;
×
1028
  while (NULL != (pUid = tSimpleHashIterate(pStbJoin->ctx.prev.onceTable, pUid, &iter))) {
×
1029
    code = tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, pUid, sizeof(*pUid));
×
1030
    if (code) {
×
1031
      qError("tSimpleHashRemove failed in postProcessStbJoinTableHash, error:%s", tstrerror(code));
×
1032
    }
1033
  }
1034

1035
  pStbJoin->execInfo.leftCacheNum = tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache);
×
1036
  qDebug("more than 1 ref build table num %" PRId64, (int64_t)tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache));
×
1037

1038
/*
1039
  // debug only
1040
  iter = 0;
1041
  uint32_t* num = NULL;
1042
  while (NULL != (num = tSimpleHashIterate(pStbJoin->ctx.prev.leftCache, num, &iter))) {
1043
    A S S E R T(*num > 1);
1044
  }
1045
*/  
1046
}
1047

1048
static void buildStbJoinTableList(SOperatorInfo* pOperator) {
103✔
1049
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
103✔
1050
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
103✔
1051

1052
  while (true) {
103✔
1053
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
206✔
1054
    if (NULL == pBlock) {
206✔
1055
      break;
103✔
1056
    }
1057

1058
    pStbJoin->execInfo.prevBlkNum++;
103✔
1059
    pStbJoin->execInfo.prevBlkRows += pBlock->info.rows;
103✔
1060
    
1061
    doBuildStbJoinTableHash(pOperator, pBlock);
103✔
1062
  }
1063

1064
  postProcessStbJoinTableHash(pOperator);
103✔
1065

1066
  pStbJoin->ctx.prev.joinBuild = true;
103✔
1067
}
103✔
1068

1069
static int32_t seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
325✔
1070
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
325✔
1071
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
325✔
1072
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
325✔
1073
  SStbJoinTableList*         pNode = pPrev->pListHead;
325✔
1074

1075
  while (pNode) {
460✔
1076
    if (pNode->readIdx >= pNode->uidNum) {
357✔
1077
      pPrev->pListHead = pNode->pNext;
103✔
1078
      freeStbJoinTableList(pNode);
103✔
1079
      pNode = pPrev->pListHead;
103✔
1080
      continue;
103✔
1081
    }
1082
    
1083
    seqJoinLaunchNewRetrieveImpl(pOperator, ppRes);
254✔
1084
    if (*ppRes) {
254✔
1085
      return TSDB_CODE_SUCCESS;
222✔
1086
    }
1087

1088
    QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, pStbJoin));
32!
1089
    pPrev->pListHead->readIdx++;
32✔
1090
  }
1091

1092
  *ppRes = NULL;
103✔
1093
  setOperatorCompleted(pOperator);
103✔
1094

1095
  return TSDB_CODE_SUCCESS;
103✔
1096
}
1097

1098
static int32_t seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) {
325✔
1099
  if (pBlock) {
325✔
1100
    if (pStbJoin && pStbJoin->pOutputDataBlockDesc) {
222!
1101
      pBlock->info.id.blockId = pStbJoin->pOutputDataBlockDesc->dataBlockId;
222✔
1102
      if (!pBlock->pDataBlock) return TSDB_CODE_SUCCESS;
222!
1103

1104
      for (int i = (int)pBlock->pDataBlock->size; i < pStbJoin->pOutputDataBlockDesc->pSlots->length; i++) {
222!
1105
        SSlotDescNode* pSlot = (SSlotDescNode*)nodesListGetNode(pStbJoin->pOutputDataBlockDesc->pSlots, i);
×
1106
        if (pSlot == NULL) {
×
1107
          qError("seqStableJoinComposeRes: pSlot is NULL, i:%d", i);
×
1108
          return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1109
        }
1110
        SColumnInfoData colInfo = createColumnInfoData(pSlot->dataType.type, pSlot->dataType.bytes, pSlot->slotId);
×
1111
        int32_t code = colInfoDataEnsureCapacity(&colInfo, pBlock->info.rows, true);
×
1112
        if (code != TSDB_CODE_SUCCESS) {
×
1113
          return code;
×
1114
        }
1115
        code = blockDataAppendColInfo(pBlock, &colInfo);
×
1116
        if (code != TSDB_CODE_SUCCESS) {
×
1117
          return code;
×
1118
        }
1119
      }
1120
    } else {
1121
      qError("seqStableJoinComposeRes: pBlock or pStbJoin is NULL");
×
1122
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1123
    }
1124
  }
1125
  return TSDB_CODE_SUCCESS;
325✔
1126
}
1127

1128
int32_t seqStableJoin(SOperatorInfo* pOperator, SSDataBlock** pRes) {
329✔
1129
  int32_t                    code = TSDB_CODE_SUCCESS;
329✔
1130
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
329✔
1131
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
329✔
1132

1133
  QRY_PARAM_CHECK(pRes);
329!
1134
  if (pOperator->status == OP_EXEC_DONE) {
329✔
1135
    return code;
4✔
1136
  }
1137

1138
  int64_t st = 0;
325✔
1139
  if (pOperator->cost.openCost == 0) {
325✔
1140
    st = taosGetTimestampUs();
103✔
1141
  }
1142

1143
  if (!pStbJoin->ctx.prev.joinBuild) {
325✔
1144
    buildStbJoinTableList(pOperator);
103✔
1145
    if (pStbJoin->execInfo.prevBlkRows <= 0) {
103!
1146
      setOperatorCompleted(pOperator);
×
1147
      goto _return;
×
1148
    }
1149
  }
1150

1151
  QRY_ERR_JRET(seqJoinContinueCurrRetrieve(pOperator, pRes));
325!
1152
  if (*pRes) {
325!
1153
    goto _return;
×
1154
  }
1155

1156
  QRY_ERR_JRET(seqJoinLaunchNewRetrieve(pOperator, pRes));
325!
1157

1158
_return:
325✔
1159
  if (pOperator->cost.openCost == 0) {
325✔
1160
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
103✔
1161
  }
1162

1163
  if (code) {
325!
1164
    qError("%s failed since %s", __func__, tstrerror(code));
×
1165
    pOperator->pTaskInfo->code = code;
×
1166
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
1167
  } else {
1168
    code = seqStableJoinComposeRes(pStbJoin, *pRes);
325✔
1169
  }
1170
  return code;
325✔
1171
}
1172

1173
static int32_t buildVtbScanOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, uint64_t uid) {
×
1174
  int32_t                   code = TSDB_CODE_SUCCESS;
×
1175
  int32_t                   lino = 0;
×
1176
  SVTableScanOperatorParam* pVScan = NULL;
×
1177
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
×
1178
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
×
1179

1180
  (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
×
1181
  QUERY_CHECK_NULL((*ppRes)->pChildren, code, lino, _return, terrno);
×
1182

1183
  pVScan = taosMemoryMalloc(sizeof(SVTableScanOperatorParam));
×
1184
  QUERY_CHECK_NULL(pVScan, code, lino, _return, terrno);
×
1185
  pVScan->pOpParamArray = taosArrayInit(1, POINTER_BYTES);
×
1186
  QUERY_CHECK_NULL(pVScan->pOpParamArray, code, lino, _return, terrno);
×
1187
  pVScan->uid = uid;
×
1188

1189
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN;
×
1190
  (*ppRes)->downstreamIdx = 0;
×
1191
  (*ppRes)->value = pVScan;
×
1192
  (*ppRes)->reUse = false;
×
1193

1194
  return TSDB_CODE_SUCCESS;
×
1195
_return:
×
1196
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1197
  if (pVScan) {
×
1198
    taosArrayDestroy(pVScan->pOpParamArray);
×
1199
    taosMemoryFreeClear(pVScan);
×
1200
  }
1201
  if (*ppRes) {
×
1202
    taosArrayDestroy((*ppRes)->pChildren);
×
1203
    taosMemoryFreeClear(*ppRes);
×
1204
  }
1205
  return code;
×
1206
}
1207

1208
int32_t dynProcessUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
×
1209
  int32_t                    lino = 0;
×
1210
  SOperatorInfo*             operator=(SOperatorInfo*) param;
×
1211
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)operator->info;
×
1212

1213
  if (TSDB_CODE_SUCCESS != code) {
×
1214
    operator->pTaskInfo->code = rpcCvtErrCode(code);
×
1215
    if (operator->pTaskInfo->code != code) {
×
1216
      qError("load systable rsp received, error:%s, cvted error:%s", tstrerror(code),
×
1217
             tstrerror(operator->pTaskInfo->code));
1218
    } else {
1219
      qError("load systable rsp received, error:%s", tstrerror(code));
×
1220
    }
1221
    goto _return;
×
1222
  }
1223

1224
  pScanResInfo->vtbScan.pRsp = taosMemoryMalloc(sizeof(SUseDbRsp));
×
1225
  QUERY_CHECK_NULL(pScanResInfo->vtbScan.pRsp, code, lino, _return, terrno);
×
1226

1227
  code = tDeserializeSUseDbRsp(pMsg->pData, (int32_t)pMsg->len, pScanResInfo->vtbScan.pRsp);
×
1228
  QUERY_CHECK_CODE(code, lino, _return);
×
1229

1230
  taosMemoryFreeClear(pMsg->pData);
×
1231

1232
  code = tsem_post(&pScanResInfo->vtbScan.ready);
×
1233
  QUERY_CHECK_CODE(code, lino, _return);
×
1234

1235
  return code;
×
1236
_return:
×
1237
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1238
  return code;
×
1239
}
1240

1241
static int32_t buildDbVgInfoMap(SOperatorInfo* pOperator, SMsgCb* pMsgCb, SName* name, SExecTaskInfo* pTaskInfo, SUseDbOutput* output) {
×
1242
  int32_t                    code = TSDB_CODE_SUCCESS;
×
1243
  int32_t                    lino = 0;
×
1244
  char*                      buf1 = NULL;
×
1245
  SUseDbReq*                 pReq = NULL;
×
1246
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)pOperator->info;
×
1247

1248
  pReq = taosMemoryMalloc(sizeof(SUseDbReq));
×
1249
  QUERY_CHECK_NULL(pReq, code, lino, _return, terrno);
×
1250
  code = tNameGetFullDbName(name, pReq->db);
×
1251
  QUERY_CHECK_CODE(code, lino, _return);
×
1252
  int32_t contLen = tSerializeSUseDbReq(NULL, 0, pReq);
×
1253
  buf1 = taosMemoryCalloc(1, contLen);
×
1254
  QUERY_CHECK_NULL(buf1, code, lino, _return, terrno);
×
1255
  int32_t tempRes = tSerializeSUseDbReq(buf1, contLen, pReq);
×
1256
  if (tempRes < 0) {
×
1257
    QUERY_CHECK_CODE(terrno, lino, _return);
×
1258
  }
1259

1260
  // send the fetch remote task result request
1261
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
×
1262
  QUERY_CHECK_NULL(pMsgSendInfo, code, lino, _return, terrno);
×
1263

1264
  pMsgSendInfo->param = pOperator;
×
1265
  pMsgSendInfo->msgInfo.pData = buf1;
×
1266
  pMsgSendInfo->msgInfo.len = contLen;
×
1267
  pMsgSendInfo->msgType = TDMT_MND_GET_DB_INFO;
×
1268
  pMsgSendInfo->fp = dynProcessUseDbRsp;
×
1269
  pMsgSendInfo->requestId = pTaskInfo->id.queryId;
×
1270

1271
  code = asyncSendMsgToServer(pMsgCb->clientRpc, &pScanResInfo->vtbScan.epSet, NULL, pMsgSendInfo);
×
1272
  QUERY_CHECK_CODE(code, lino, _return);
×
1273

1274
  code = tsem_wait(&pScanResInfo->vtbScan.ready);
×
1275
  QUERY_CHECK_CODE(code, lino, _return);
×
1276

1277
  code = queryBuildUseDbOutput(output, pScanResInfo->vtbScan.pRsp);
×
1278
  QUERY_CHECK_CODE(code, lino, _return);
×
1279

1280
_return:
×
1281
  if (code) {
×
1282
     qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1283
     taosMemoryFree(buf1);
×
1284
  }
1285
  taosMemoryFree(pReq);
×
1286
  tFreeSUsedbRsp(pScanResInfo->vtbScan.pRsp);
×
1287
  taosMemoryFreeClear(pScanResInfo->vtbScan.pRsp);
×
1288
  return code;
×
1289
}
1290

1291
int dynVgInfoComp(const void* lp, const void* rp) {
×
1292
  SVgroupInfo* pLeft = (SVgroupInfo*)lp;
×
1293
  SVgroupInfo* pRight = (SVgroupInfo*)rp;
×
1294
  if (pLeft->hashBegin < pRight->hashBegin) {
×
1295
    return -1;
×
1296
  } else if (pLeft->hashBegin > pRight->hashBegin) {
×
1297
    return 1;
×
1298
  }
1299

1300
  return 0;
×
1301
}
1302

1303
int32_t dynMakeVgArraySortBy(SDBVgInfo* dbInfo, __compar_fn_t sort_func) {
×
1304
  if (NULL == dbInfo) {
×
1305
    return TSDB_CODE_SUCCESS;
×
1306
  }
1307

1308
  if (dbInfo->vgHash && NULL == dbInfo->vgArray) {
×
1309
    int32_t vgSize = taosHashGetSize(dbInfo->vgHash);
×
1310
    dbInfo->vgArray = taosArrayInit(vgSize, sizeof(SVgroupInfo));
×
1311
    if (NULL == dbInfo->vgArray) {
×
1312
      return terrno;
×
1313
    }
1314

1315
    void* pIter = taosHashIterate(dbInfo->vgHash, NULL);
×
1316
    while (pIter) {
×
1317
      if (NULL == taosArrayPush(dbInfo->vgArray, pIter)) {
×
1318
        taosHashCancelIterate(dbInfo->vgHash, pIter);
×
1319
        return terrno;
×
1320
      }
1321

1322
      pIter = taosHashIterate(dbInfo->vgHash, pIter);
×
1323
    }
1324

1325
    taosArraySort(dbInfo->vgArray, sort_func);
×
1326
  }
1327

1328
  return TSDB_CODE_SUCCESS;
×
1329
}
1330

1331
int32_t dynHashValueComp(void const* lp, void const* rp) {
×
1332
  uint32_t*    key = (uint32_t*)lp;
×
1333
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
×
1334

1335
  if (*key < pVg->hashBegin) {
×
1336
    return -1;
×
1337
  } else if (*key > pVg->hashEnd) {
×
1338
    return 1;
×
1339
  }
1340

1341
  return 0;
×
1342
}
1343

1344
int32_t getVgId(SDBVgInfo* dbInfo, char* dbFName, int32_t* vgId, char *tbName) {
×
1345
  int32_t code = 0;
×
1346
  int32_t lino = 0;
×
1347
  code = dynMakeVgArraySortBy(dbInfo, dynVgInfoComp);
×
1348
  QUERY_CHECK_CODE(code, lino, _return);
×
1349

1350
  int32_t vgNum = (int32_t)taosArrayGetSize(dbInfo->vgArray);
×
1351
  if (vgNum <= 0) {
×
1352
    qError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum);
×
1353
    QUERY_CHECK_CODE(code = TSDB_CODE_TSC_DB_NOT_SELECTED, lino, _return);
×
1354
  }
1355

1356
  SVgroupInfo* vgInfo = NULL;
×
1357
  char         tbFullName[TSDB_TABLE_FNAME_LEN];
1358
  (void)snprintf(tbFullName, sizeof(tbFullName), "%s.", dbFName);
×
1359
  int32_t offset = (int32_t)strlen(tbFullName);
×
1360

1361
  (void)snprintf(tbFullName + offset, sizeof(tbFullName) - offset, "%s", tbName);
×
1362
  uint32_t hashValue = taosGetTbHashVal(tbFullName, (int32_t)strlen(tbFullName), dbInfo->hashMethod,
×
1363
                                        dbInfo->hashPrefix, dbInfo->hashSuffix);
×
1364

1365
  vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, dynHashValueComp, TD_EQ);
×
1366
  if (NULL == vgInfo) {
×
1367
    qError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName,
×
1368
           (int32_t)taosArrayGetSize(dbInfo->vgArray));
1369
    return TSDB_CODE_CTG_INTERNAL_ERROR;
×
1370
  }
1371

1372
  *vgId = vgInfo->vgId;
×
1373

1374
_return:
×
1375
  return code;
×
1376
}
1377

1378
bool colNeedScan(SOperatorInfo* pOperator, col_id_t colId) {
×
1379
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
×
1380
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
×
1381
  SArray *                   pColList = pVtbScan->readColList;
×
1382
  if (pVtbScan->scanAllCols) {
×
1383
    return true;
×
1384
  }
1385
  for (int32_t i = 0; i < taosArrayGetSize(pColList); i++) {
×
1386
    if (colId == *(col_id_t*)taosArrayGet(pColList, i)) {
×
1387
      return true;
×
1388
    }
1389
  }
1390
  return false;
×
1391
}
1392

1393
int32_t getDbVgInfo(SOperatorInfo* pOperator, SName *name, SDBVgInfo **dbVgInfo) {
×
1394
  int32_t                    code = TSDB_CODE_SUCCESS;
×
1395
  int32_t                    line = 0;
×
1396
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
×
1397
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
×
1398
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
×
1399
  SMsgCb*                    pMsgCb = pVtbScan->pMsgCb;
×
1400
  SUseDbOutput*              output = NULL;
×
1401
  SUseDbOutput**             find = (SUseDbOutput**)taosHashGet(pInfo->vtbScan.dbVgInfoMap, name->dbname, strlen(name->dbname));
×
1402

1403
  QRY_PARAM_CHECK(dbVgInfo);
×
1404

1405
  if (find == NULL) {
×
1406
    output = taosMemoryMalloc(sizeof(SUseDbOutput));
×
1407
    code = buildDbVgInfoMap(pOperator, pMsgCb, name, pTaskInfo, output);
×
1408
    QUERY_CHECK_CODE(code, line, _return);
×
1409
    code = taosHashPut(pInfo->vtbScan.dbVgInfoMap, name->dbname, strlen(name->dbname), &output, POINTER_BYTES);
×
1410
    QUERY_CHECK_CODE(code, line, _return);
×
1411
  } else {
1412
    output = *find;
×
1413
  }
1414

1415
  *dbVgInfo = output->dbVgroup;
×
1416
  return code;
×
1417
_return:
×
1418
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
1419
  freeUseDbOutput(output);
×
1420
  return code;
×
1421
}
1422

1423
int32_t extractColRefName(const char *colref, char **refDb, char** refTb, char** refCol) {
×
1424
  int32_t     code = TSDB_CODE_SUCCESS;
×
1425
  int32_t     line = 0;
×
1426
  const char *dbname = NULL;
×
1427
  const char *tablename = NULL;
×
1428
  const char *colname = NULL;
×
1429

1430
  const char *first_dot = strchr(colref, '.');
×
1431
  QUERY_CHECK_NULL(first_dot, code, line, _return, terrno)
×
1432

1433
  const char *second_dot = strchr(first_dot + 1, '.');
×
1434
  QUERY_CHECK_NULL(second_dot, code, line, _return, terrno)
×
1435

1436
  size_t db_len = first_dot - colref;
×
1437
  size_t table_len = second_dot - first_dot - 1;
×
1438
  size_t col_len = strlen(second_dot + 1);
×
1439

1440
  *refDb = taosMemoryMalloc(db_len + 1);
×
1441
  *refTb = taosMemoryMalloc(table_len + 1);
×
1442
  *refCol = taosMemoryMalloc(col_len + 1);
×
1443
  QUERY_CHECK_NULL(*refDb, code, line, _return, terrno)
×
1444
  QUERY_CHECK_NULL(*refTb, code, line, _return, terrno)
×
1445
  QUERY_CHECK_NULL(*refCol, code, line, _return, terrno)
×
1446

1447
  tstrncpy(*refDb, colref, db_len + 1);
×
1448
  tstrncpy(*refTb, first_dot + 1, table_len + 1);
×
1449
  tstrncpy(*refCol, second_dot + 1, col_len + 1);
×
1450

1451
  return TSDB_CODE_SUCCESS;
×
1452
_return:
×
1453
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
1454
  if (*refDb) {
×
1455
    taosMemoryFree(*refDb);
×
1456
    *refDb = NULL;
×
1457
  }
1458
  if (*refTb) {
×
1459
    taosMemoryFree(*refTb);
×
1460
    *refTb = NULL;
×
1461
  }
1462
  if (*refCol) {
×
1463
    taosMemoryFree(*refCol);
×
1464
    *refCol = NULL;
×
1465
  }
1466
  return code;
×
1467
}
1468

1469
int32_t vtbScan(SOperatorInfo* pOperator, SSDataBlock** pRes) {
×
1470
  int32_t                    code = TSDB_CODE_SUCCESS;
×
1471
  int32_t                    line = 0;
×
1472
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
×
1473
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
×
1474
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
×
1475
  SDBVgInfo*                 dbVgInfo = NULL;
×
1476
  bool                       readerInit = false;
×
1477
  SArray*                    pColRefArray = NULL;
×
1478

1479
  QRY_PARAM_CHECK(pRes);
×
1480
  if (pOperator->status == OP_EXEC_DONE) {
×
1481
    return code;
×
1482
  }
1483

1484
  int64_t st = 0;
×
1485
  if (pOperator->cost.openCost == 0) {
×
1486
    st = taosGetTimestampUs();
×
1487
  }
1488

1489
  if (pVtbScan->childTableMap == NULL) {
×
1490
    pVtbScan->childTableMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
×
1491
    QUERY_CHECK_NULL(pVtbScan->childTableMap, code, line, _return, terrno);
×
1492

1493

1494
    while (true) {
×
1495
      SSDataBlock *pChildInfo = NULL;
×
1496
      code = pOperator->pDownstream[1]->fpSet.getNextFn(pOperator->pDownstream[1], &pChildInfo);
×
1497
      if (pChildInfo == NULL) {
×
1498
        break;
×
1499
      }
1500
      SColumnInfoData *pTableNameCol = taosArrayGet(pChildInfo->pDataBlock, 0);
×
1501
      SColumnInfoData *pStbNameCol = taosArrayGet(pChildInfo->pDataBlock, 1);
×
1502
      SColumnInfoData *pColNameCol = taosArrayGet(pChildInfo->pDataBlock, 2);
×
1503
      SColumnInfoData *pUidCol = taosArrayGet(pChildInfo->pDataBlock, 3);
×
1504
      SColumnInfoData *pColIdCol = taosArrayGet(pChildInfo->pDataBlock, 4);
×
1505
      SColumnInfoData *pRefCol = taosArrayGet(pChildInfo->pDataBlock, 5);
×
1506
      SColumnInfoData *pVgIdCol = taosArrayGet(pChildInfo->pDataBlock, 6);
×
1507

1508
      for (int32_t i = 0; i < pChildInfo->info.rows; i++) {
×
1509
        if (!colDataIsNull_s(pStbNameCol, i)) {
×
1510
          char* rawname = colDataGetData(pStbNameCol, i);
×
1511
          char *stbname = varDataVal(rawname);
×
1512
          if (strncmp(varDataVal(rawname), pInfo->vtbScan.stbName, varDataLen(rawname)) == 0 && strlen(pInfo->vtbScan.stbName) == varDataLen(rawname)) {
×
1513

1514
            char *ctbName = colDataGetData(pTableNameCol, i);
×
1515
            SColRefKV kv = {0};
×
1516
            if (colDataIsNull_s(pRefCol, i)) {
×
1517
              kv.colrefName = NULL;
×
1518
            } else {
1519
              kv.colrefName = taosMemoryCalloc(varDataTLen(colDataGetData(pRefCol, i)), 1);
×
1520
              QUERY_CHECK_NULL(kv.colrefName, code, line, _return, terrno);
×
1521
              memcpy(kv.colrefName, varDataVal(colDataGetData(pRefCol, i)), varDataLen(colDataGetData(pRefCol, i)));
×
1522
              kv.colrefName[varDataLen(colDataGetData(pRefCol, i))] = 0;
×
1523
            }
1524

1525
            kv.colName = taosMemoryCalloc(varDataTLen(colDataGetData(pColNameCol, i)), 1);
×
1526
            QUERY_CHECK_NULL(kv.colName, code, line, _return, terrno);
×
1527
            memcpy(kv.colName, varDataVal(colDataGetData(pColNameCol, i)), varDataLen(colDataGetData(pColNameCol, i)));
×
1528
            kv.colName[varDataLen(colDataGetData(pColNameCol, i))] = 0;
×
1529
            if (!colDataIsNull_s(pUidCol, i)) {
×
1530
              GET_TYPED_DATA(kv.uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, i), 0);
×
1531
            }
1532
            if (!colDataIsNull_s(pColIdCol, i)) {
×
1533
              GET_TYPED_DATA(kv.colId, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pColIdCol, i), 0);
×
1534
            }
1535
            if (!colDataIsNull_s(pVgIdCol, i)) {
×
1536
              GET_TYPED_DATA(kv.vgId, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pVgIdCol, i), 0);
×
1537
            }
1538
            if (taosHashGet(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName)) == NULL) {
×
1539
              pColRefArray = taosArrayInit(1, sizeof(SColRefKV));
×
1540
              QUERY_CHECK_NULL(pColRefArray, code, line, _return, terrno);
×
1541
              QUERY_CHECK_NULL(taosArrayPush(pColRefArray, &kv), code, line, _return, terrno);
×
1542
              int32_t tableIdx = (int32_t)taosArrayGetSize(pVtbScan->childTableList);
×
1543
              QUERY_CHECK_NULL(taosArrayPush(pVtbScan->childTableList, &pColRefArray), code, line, _return, terrno);
×
1544
              code = taosHashPut(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName), &tableIdx, sizeof(tableIdx));
×
1545
              QUERY_CHECK_CODE(code, line, _return);
×
1546
            } else {
1547
              int32_t *tableIdx = (int32_t*)taosHashGet(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName));
×
1548
              QUERY_CHECK_NULL(tableIdx, code, line, _return, terrno);
×
1549
              pColRefArray = (SArray *)taosArrayGetP(pVtbScan->childTableList, *tableIdx);
×
1550
              QUERY_CHECK_NULL(pColRefArray, code, line, _return, terrno);
×
1551
              QUERY_CHECK_NULL(taosArrayPush(pColRefArray, &kv), code, line, _return, terrno);
×
1552
            }
1553
          }
1554
        }
1555
      }
1556
    }
1557
  }
1558

1559
  size_t num = taosHashGetSize(pVtbScan->childTableMap);
×
1560

1561
  // no child table, return
1562
  if (num == 0) {
×
1563
    setOperatorCompleted(pOperator);
×
1564
    return code;
×
1565
  }
1566

1567
  pVtbScan->orgTbVgColMap = taosHashInit(num * 64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
×
1568
  QUERY_CHECK_NULL(pVtbScan->orgTbVgColMap, code, line, _return, terrno);
×
1569
  taosHashSetFreeFp(pVtbScan->orgTbVgColMap, destroyOrgTbInfo);
×
1570

1571
  while (true) {
1572
    if (pVtbScan->curTableIdx == pVtbScan->lastTableIdx) {
×
1573
      code = pOperator->pDownstream[0]->fpSet.getNextFn(pOperator->pDownstream[0], pRes);
×
1574
      QUERY_CHECK_CODE(code, line, _return);
×
1575
    } else {
1576
      taosHashClear(pVtbScan->orgTbVgColMap);
×
1577
      SArray* pColMap = (SArray*)taosArrayGetP(pVtbScan->childTableList, pVtbScan->curTableIdx);
×
1578
      QUERY_CHECK_NULL(pColMap, code, line, _return, terrno);
×
1579
      tb_uid_t uid = 0;
×
1580
      int32_t  vgId = 0;
×
1581
      for (int32_t j = 0; j < taosArrayGetSize(pColMap); j++) {
×
1582
        SColRefKV *pKV = (SColRefKV*)taosArrayGet(pColMap, j);
×
1583
        uid = pKV->uid;
×
1584
        vgId = pKV->vgId;
×
1585
        if (pKV->colrefName != NULL && colNeedScan(pOperator, pKV->colId)) {
×
1586
          char*   refDbName = NULL;
×
1587
          char*   refTbName = NULL;
×
1588
          char*   refColName = NULL;
×
1589
          SName   name = {0};
×
1590
          char    dbFname[TSDB_DB_FNAME_LEN] = {0};
×
1591
          char    orgTbFName[TSDB_TABLE_FNAME_LEN] = {0};
×
1592

1593
          code = extractColRefName(pKV->colrefName, &refDbName, &refTbName, &refColName);
×
1594
          QUERY_CHECK_CODE(code, line, _return);
×
1595

1596
          toName(pInfo->vtbScan.acctId, refDbName, refTbName, &name);
×
1597

1598
          code = getDbVgInfo(pOperator, &name, &dbVgInfo);
×
1599
          QUERY_CHECK_CODE(code, line, _return);
×
1600
          tNameGetFullDbName(&name, dbFname);
×
1601
          QUERY_CHECK_CODE(code, line, _return);
×
1602
          tNameGetFullTableName(&name, orgTbFName);
×
1603
          QUERY_CHECK_CODE(code, line, _return);
×
1604

1605
          void *pVal = taosHashGet(pVtbScan->orgTbVgColMap, orgTbFName, sizeof(orgTbFName));
×
1606
          if (!pVal) {
×
1607
            SOrgTbInfo map = {0};
×
1608
            code = getVgId(dbVgInfo, dbFname, &map.vgId, name.tname);
×
1609
            QUERY_CHECK_CODE(code, line, _return);
×
1610
            tstrncpy(map.tbName, orgTbFName, sizeof(map.tbName));
×
1611
            map.colMap = taosArrayInit(10, sizeof(SColIdNameKV));
×
1612
            QUERY_CHECK_NULL(map.colMap, code, line, _return, terrno);
×
1613
            SColIdNameKV colIdNameKV = {0};
×
1614
            colIdNameKV.colId = pKV->colId; // TODO(smj) : select ref colid from ins_cols;
×
1615
            tstrncpy(colIdNameKV.colName, refColName, sizeof(colIdNameKV.colName));
×
1616
            QUERY_CHECK_NULL(taosArrayPush(map.colMap, &colIdNameKV), code, line, _return, terrno);
×
1617
            code = taosHashPut(pVtbScan->orgTbVgColMap, orgTbFName, sizeof(orgTbFName), &map, sizeof(map));
×
1618
            QUERY_CHECK_CODE(code, line, _return);
×
1619
          } else {
1620
            SOrgTbInfo *tbInfo = (SOrgTbInfo *)pVal;
×
1621
            SColIdNameKV colIdNameKV = {0};
×
1622
            colIdNameKV.colId = pKV->colId;
×
1623
            tstrncpy(colIdNameKV.colName, refColName, sizeof(colIdNameKV.colName));
×
1624
            QUERY_CHECK_NULL(taosArrayPush(tbInfo->colMap, &colIdNameKV), code, line, _return, terrno);
×
1625
          }
1626
          taosMemoryFree(refDbName);
×
1627
          taosMemoryFree(refTbName);
×
1628
          taosMemoryFree(refColName);
×
1629
        }
1630
      }
1631

1632
      pVtbScan->vtbScanParam = NULL;
×
1633
      code = buildVtbScanOperatorParam(pInfo, &pVtbScan->vtbScanParam, uid);
×
1634
      QUERY_CHECK_CODE(code, line, _return);
×
1635

1636
      void* pIter = taosHashIterate(pVtbScan->orgTbVgColMap, NULL);
×
1637
      while (pIter != NULL) {
×
1638
        SOrgTbInfo*      pMap = (SOrgTbInfo*)pIter;
×
1639
        SOperatorParam*  pExchangeParam = NULL;
×
1640
        code = buildExchangeOperatorParamForVScan(&pExchangeParam, 0, pMap);
×
1641
        QUERY_CHECK_CODE(code, line, _return);
×
1642
        QUERY_CHECK_NULL(taosArrayPush(((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pOpParamArray, &pExchangeParam), code, line, _return, terrno);
×
1643
        pIter = taosHashIterate(pVtbScan->orgTbVgColMap, pIter);
×
1644
      }
1645

1646
      SOperatorParam*  pExchangeParam = NULL;
×
1647
      code = buildExchangeOperatorParamForVTagScan(&pExchangeParam, 0, vgId, uid);
×
1648
      QUERY_CHECK_CODE(code, line, _return);
×
1649
      ((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pTagScanOp = pExchangeParam;
×
1650

1651
      // reset downstream operator's status
1652
      pOperator->pDownstream[0]->status = OP_NOT_OPENED;
×
1653
      code = pOperator->pDownstream[0]->fpSet.getNextExtFn(pOperator->pDownstream[0], pVtbScan->vtbScanParam, pRes);
×
1654
      QUERY_CHECK_CODE(code, line, _return);
×
1655
    }
1656

1657
    if (*pRes) {
×
1658
      // has result, still read data from this table.
1659
      pVtbScan->lastTableIdx = pVtbScan->curTableIdx;
×
1660
      break;
×
1661
    } else {
1662
      // no result, read next table.
1663
      pVtbScan->curTableIdx++;
×
1664
      if (pVtbScan->curTableIdx >= taosArrayGetSize(pVtbScan->childTableList)) {
×
1665
        setOperatorCompleted(pOperator);
×
1666
        break;
×
1667
      }
1668
    }
1669
  }
1670

1671
_return:
×
1672
  taosHashCleanup(pVtbScan->orgTbVgColMap);
×
1673
  pVtbScan->orgTbVgColMap = NULL;
×
1674
  if (pOperator->cost.openCost == 0) {
×
1675
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
×
1676
  }
1677

1678
  if (code) {
×
1679
    qError("%s failed since %s", __func__, tstrerror(code));
×
1680
    pOperator->pTaskInfo->code = code;
×
1681
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
1682
  }
1683

1684
  return code;
×
1685
}
1686

1687
int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) {
103✔
1688
  if (batchFetch) {
103!
1689
    pPrev->leftHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
103✔
1690
    if (NULL == pPrev->leftHash) {
103!
1691
      return terrno;
×
1692
    }
1693
    pPrev->rightHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
103✔
1694
    if (NULL == pPrev->rightHash) {
103!
1695
      return terrno;
×
1696
    }
1697
  } else {
1698
    pPrev->leftCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
×
1699
    if (NULL == pPrev->leftCache) {
×
1700
      return terrno;
×
1701
    }
1702
    pPrev->rightCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
×
1703
    if (NULL == pPrev->rightCache) {
×
1704
      return terrno;
×
1705
    }
1706
    pPrev->onceTable = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
×
1707
    if (NULL == pPrev->onceTable) {
×
1708
      return terrno;
×
1709
    }
1710
  }
1711

1712
  return TSDB_CODE_SUCCESS;
103✔
1713
}
1714

1715
static int32_t initVtbScanInfo(SOperatorInfo* pOperator, SDynQueryCtrlOperatorInfo* pInfo, SMsgCb* pMsgCb,
×
1716
                               SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) {
1717
  int32_t      code = TSDB_CODE_SUCCESS;
×
1718
  int32_t      line = 0;
×
1719

1720
  code = tsem_init(&pInfo->vtbScan.ready, 0, 0);
×
1721
  QUERY_CHECK_CODE(code, line, _return);
×
1722

1723
  pInfo->vtbScan.scanAllCols = pPhyciNode->vtbScan.scanAllCols;
×
1724
  pInfo->vtbScan.suid = pPhyciNode->vtbScan.suid;
×
1725
  pInfo->vtbScan.epSet = pPhyciNode->vtbScan.mgmtEpSet;
×
1726
  pInfo->vtbScan.acctId = pPhyciNode->vtbScan.accountId;
×
1727
  pInfo->vtbScan.pMsgCb = pMsgCb;
×
1728
  pInfo->vtbScan.curTableIdx = 0;
×
1729
  pInfo->vtbScan.lastTableIdx = -1;
×
1730
  pInfo->vtbScan.dbName = taosStrdup(pPhyciNode->vtbScan.dbName);
×
1731
  pInfo->vtbScan.stbName = taosStrdup(pPhyciNode->vtbScan.stbName);
×
1732
  QUERY_CHECK_NULL(pInfo->vtbScan.dbName, code, line, _return, terrno);
×
1733
  QUERY_CHECK_NULL(pInfo->vtbScan.stbName, code, line, _return, terrno);
×
1734

1735
  pInfo->vtbScan.readColList = taosArrayInit(LIST_LENGTH(pPhyciNode->vtbScan.pScanCols), sizeof(col_id_t));
×
1736
  QUERY_CHECK_NULL(pInfo->vtbScan.readColList, code, line, _return, terrno);
×
1737

1738
  for (int32_t i = 0; i < LIST_LENGTH(pPhyciNode->vtbScan.pScanCols); ++i) {
×
1739
    SColumnNode* pNode = (SColumnNode*)nodesListGetNode(pPhyciNode->vtbScan.pScanCols, i);
×
1740
    QUERY_CHECK_NULL(pNode, code, line, _return, terrno);
×
1741
    QUERY_CHECK_NULL(taosArrayPush(pInfo->vtbScan.readColList, &pNode->colId), code, line, _return, terrno);
×
1742
  }
1743

1744
  pInfo->vtbScan.childTableList = taosArrayInit(10, POINTER_BYTES);
×
1745
  QUERY_CHECK_NULL(pInfo->vtbScan.childTableList, code, line, _return, terrno);
×
1746

1747
  pInfo->vtbScan.dbVgInfoMap = taosHashInit(taosArrayGetSize(pInfo->vtbScan.childTableList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
×
1748
  QUERY_CHECK_NULL(pInfo->vtbScan.dbVgInfoMap, code, line, _return, terrno);
×
1749

1750
  return code;
×
1751
_return:
×
1752
  // no need to destroy array and hashmap allocated in this function,
1753
  // since the operator's destroy function will take care of it
1754
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
1755
  return code;
×
1756
}
1757

1758
static int32_t resetDynQueryCtrlOperState(SOperatorInfo* pOper) {
×
1759
  SDynQueryCtrlOperatorInfo* pDyn = pOper->info;
×
1760
  pOper->status = OP_NOT_OPENED;
×
1761

1762
  switch (pDyn->qType) {
×
1763
    case DYN_QTYPE_STB_HASH:{
×
1764
      pDyn->stbJoin.execInfo = (SDynQueryCtrlExecInfo){0};
×
1765
      SStbJoinDynCtrlInfo* pStbJoin = &pDyn->stbJoin;
×
1766
      if (pStbJoin->basic.batchFetch) {
×
1767
        if (pStbJoin->ctx.prev.leftHash) {
×
1768
          tSimpleHashSetFreeFp(pStbJoin->ctx.prev.leftHash, freeVgTableList);
×
1769
          tSimpleHashClear(pStbJoin->ctx.prev.leftHash);
×
1770
        }
1771
        if (pStbJoin->ctx.prev.rightHash) {
×
1772
          tSimpleHashSetFreeFp(pStbJoin->ctx.prev.rightHash, freeVgTableList);
×
1773
          tSimpleHashClear(pStbJoin->ctx.prev.rightHash);
×
1774
        }
1775
      } else {
1776
        if (pStbJoin->ctx.prev.leftCache) {
×
1777
          tSimpleHashClear(pStbJoin->ctx.prev.leftCache);
×
1778
        }
1779
        if (pStbJoin->ctx.prev.rightCache) {
×
1780
          tSimpleHashClear(pStbJoin->ctx.prev.rightCache);
×
1781
        }
1782
        if (pStbJoin->ctx.prev.onceTable) {
×
1783
          tSimpleHashClear(pStbJoin->ctx.prev.onceTable);
×
1784
        }
1785
      }
1786
      destroyStbJoinTableList(pStbJoin->ctx.prev.pListHead);
×
1787
      pStbJoin->ctx.prev.pListHead = NULL;
×
1788
      pStbJoin->ctx.prev.joinBuild = false;
×
1789
      pStbJoin->ctx.prev.pListTail = NULL;
×
1790
      pStbJoin->ctx.prev.tableNum = 0;
×
1791

1792
      pStbJoin->ctx.post = (SStbJoinPostJoinCtx){0};
×
1793
      break; 
×
1794
    }
1795
    case DYN_QTYPE_VTB_SCAN: {
×
1796
      SVtbScanDynCtrlInfo* pVtbScan = &pDyn->vtbScan;
×
1797
      
1798
      if (pVtbScan->orgTbVgColMap) {
×
1799
        taosHashSetFreeFp(pVtbScan->orgTbVgColMap, destroyOrgTbInfo);
×
1800
        taosHashCleanup(pVtbScan->orgTbVgColMap);
×
1801
        pVtbScan->orgTbVgColMap = NULL;
×
1802
      }
1803
      if (pVtbScan->pRsp) {
×
1804
        tFreeSUsedbRsp(pVtbScan->pRsp);
×
1805
        taosMemoryFreeClear(pVtbScan->pRsp);
×
1806
      }
1807

1808
      pVtbScan->curTableIdx = 0;
×
1809
      pVtbScan->lastTableIdx = -1;
×
1810
      break;
×
1811
    }
1812
    default:
×
1813
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
×
1814
      break;
×
1815
  }
1816
  return 0;
×
1817
}
1818

1819
int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
103✔
1820
                                       SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo,
1821
                                       SMsgCb* pMsgCb, SOperatorInfo** pOptrInfo) {
1822
  QRY_PARAM_CHECK(pOptrInfo);
103!
1823

1824
  int32_t                    code = TSDB_CODE_SUCCESS;
103✔
1825
  int32_t                    line = 0;
103✔
1826
  __optr_fn_t                nextFp = NULL;
103✔
1827
  SOperatorInfo*             pOperator = NULL;
103✔
1828
  SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo));
103!
1829
  if (pInfo == NULL) {
103!
1830
    code = terrno;
×
1831
    goto _error;
×
1832
  }
1833

1834
  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
103!
1835
  if (pOperator == NULL) {
103!
1836
    code = terrno;
×
1837
    goto _error;
×
1838
  }
1839

1840
  pTaskInfo->dynamicTask = (int8_t)pPhyciNode->node.dynamicOp;
103✔
1841

1842
  code = appendDownstream(pOperator, pDownstream, numOfDownstream);
103✔
1843
  if (TSDB_CODE_SUCCESS != code) {
103!
1844
    goto _error;
×
1845
  }
1846

1847
  setOperatorInfo(pOperator, "DynQueryCtrlOperator", QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, false, OP_NOT_OPENED,
103✔
1848
                  pInfo, pTaskInfo);
1849

1850
  pInfo->qType = pPhyciNode->qType;
103✔
1851
  switch (pInfo->qType) {
103!
1852
    case DYN_QTYPE_STB_HASH:
103✔
1853
      TAOS_MEMCPY(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
103✔
1854
      pInfo->stbJoin.pOutputDataBlockDesc = pPhyciNode->node.pOutputDataBlockDesc;
103✔
1855
      code = initSeqStbJoinTableHash(&pInfo->stbJoin.ctx.prev, pInfo->stbJoin.basic.batchFetch);
103✔
1856
      if (TSDB_CODE_SUCCESS != code) {
103!
1857
        goto _error;
×
1858
      }
1859
      nextFp = seqStableJoin;
103✔
1860
      break;
103✔
1861
    case DYN_QTYPE_VTB_SCAN:
×
1862
      code = initVtbScanInfo(pOperator, pInfo, pMsgCb, pPhyciNode, pTaskInfo);
×
1863
      QUERY_CHECK_CODE(code, line, _error);
×
1864
      nextFp = vtbScan;
×
1865
      break;
×
1866
    default:
×
1867
      qError("unsupported dynamic query ctrl type: %d", pInfo->qType);
×
1868
      code = TSDB_CODE_INVALID_PARA;
×
1869
      goto _error;
×
1870
  }
1871

1872
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, nextFp, NULL, destroyDynQueryCtrlOperator, optrDefaultBufFn,
103✔
1873
                                         NULL, optrDefaultGetNextExtFn, NULL);
1874

1875
  setOperatorResetStateFn(pOperator, resetDynQueryCtrlOperState);
103✔
1876
  *pOptrInfo = pOperator;
103✔
1877
  return TSDB_CODE_SUCCESS;
103✔
1878

1879
_error:
×
1880
  if (pInfo != NULL) {
×
1881
    destroyDynQueryCtrlOperator(pInfo);
×
1882
  }
1883

1884
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
1885
  pTaskInfo->code = code;
×
1886
  return code;
×
1887
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc