• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4754

25 Sep 2025 05:58AM UTC coverage: 57.946% (-1.0%) from 58.977%
#4754

push

travis-ci

web-flow
enh: taos command line support '-uroot' on windows (#33055)

133189 of 293169 branches covered (45.43%)

Branch coverage included in aggregate %.

201677 of 284720 relevant lines covered (70.83%)

5398749.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.22
/source/libs/executor/src/dynqueryctrloperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "nodes.h"
19
#include "operator.h"
20
#include "os.h"
21
#include "plannodes.h"
22
#include "query.h"
23
#include "querynodes.h"
24
#include "querytask.h"
25
#include "tarray.h"
26
#include "tcompare.h"
27
#include "tdatablock.h"
28
#include "thash.h"
29
#include "tmsg.h"
30
#include "trpc.h"
31
#include "ttypes.h"
32
#include "dynqueryctrl.h"
33

34
int64_t gSessionId = 0;
35

36
void freeVgTableList(void* ptr) { 
×
37
  taosArrayDestroy(*(SArray**)ptr); 
×
38
}
×
39

40
/*
 * Release a whole singly linked list of join table batches.
 * For every node the four per-row arrays (left/right vgId and uid) are
 * freed first, then the node itself.
 */
static void destroyStbJoinTableList(SStbJoinTableList* pListHead) {
  for (SStbJoinTableList* pCur = pListHead; NULL != pCur;) {
    taosMemoryFree(pCur->pLeftVg);
    taosMemoryFree(pCur->pLeftUid);
    taosMemoryFree(pCur->pRightVg);
    taosMemoryFree(pCur->pRightUid);

    // Remember the successor before the node that links to it goes away.
    SStbJoinTableList* pFollowing = pCur->pNext;
    taosMemoryFree(pCur);
    pCur = pFollowing;
  }
}
6,279✔
53

54
/*
 * Tear down the super-table join control state: log the execution
 * counters, release the hash containers that belong to the active fetch
 * mode, then free the pending table list.
 */
static void destroyStbJoinDynCtrlInfo(SStbJoinDynCtrlInfo* pStbJoin) {
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;

  qDebug("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64 ", leftCacheNum:%" PRId64 ", rightCacheNum:%" PRId64,
         pStbJoin->execInfo.prevBlkNum, pStbJoin->execInfo.prevBlkRows, pStbJoin->execInfo.postBlkNum,
         pStbJoin->execInfo.postBlkRows, pStbJoin->execInfo.leftCacheNum, pStbJoin->execInfo.rightCacheNum);

  if (!pStbJoin->basic.batchFetch) {
    // Sequential mode: plain caches, no per-entry free callback is installed.
    if (pPrev->leftCache) {
      tSimpleHashCleanup(pPrev->leftCache);
    }
    if (pPrev->rightCache) {
      tSimpleHashCleanup(pPrev->rightCache);
    }
    if (pPrev->onceTable) {
      tSimpleHashCleanup(pPrev->onceTable);
    }
  } else {
    // Batch mode: each hash value is an SArray* slot, released by freeVgTableList.
    if (pPrev->leftHash) {
      tSimpleHashSetFreeFp(pPrev->leftHash, freeVgTableList);
      tSimpleHashCleanup(pPrev->leftHash);
    }
    if (pPrev->rightHash) {
      tSimpleHashSetFreeFp(pPrev->rightHash, freeVgTableList);
      tSimpleHashCleanup(pPrev->rightHash);
    }
  }

  destroyStbJoinTableList(pPrev->pListHead);
}
6,279✔
82
typedef struct {
83
  char*    colName;
84
  char*    colrefName;
85
  tb_uid_t uid;
86
  col_id_t colId;
87
  int32_t  vgId;
88
} SColRefKV;
89

90
void destroyOrgTbInfo(void *info) {
16✔
91
  SOrgTbInfo *pOrgTbInfo = (SOrgTbInfo *)info;
16✔
92
  if (pOrgTbInfo) {
16!
93
    taosArrayDestroy(pOrgTbInfo->colMap);
16✔
94
  }
95
}
16✔
96

97
void destroyColRefKV(void *info) {
32✔
98
  SColRefKV *pColRefKV = (SColRefKV *)info;
32✔
99
  if (pColRefKV) {
32!
100
    taosMemoryFree(pColRefKV->colName);
32!
101
    taosMemoryFree(pColRefKV->colrefName);
32!
102
  }
103
}
32✔
104

105
void destroyColRefArray(void *info) {
8✔
106
  SArray *pColRefArray = *(SArray **)info;
8✔
107
  if (pColRefArray) {
8!
108
    taosArrayDestroyEx(pColRefArray, destroyColRefKV);
8✔
109
  }
110
}
8✔
111

112
void freeUseDbOutput(void* pOutput) {
4✔
113
  SUseDbOutput *pOut = *(SUseDbOutput**)pOutput;
4✔
114
  if (NULL == pOutput) {
4!
115
    return;
×
116
  }
117

118
  if (pOut->dbVgroup) {
4!
119
    freeVgInfo(pOut->dbVgroup);
4✔
120
  }
121
  taosMemFree(pOut);
4✔
122
}
123

124
/*
 * Release everything owned by the virtual-table scan control state.
 * Each member is released only when it is set; the two hash maps that
 * carry owned values get their free callback installed right before the
 * cleanup call.
 */
static void destroyVtbScanDynCtrlInfo(SVtbScanDynCtrlInfo* pVtbScan) {
  // Name strings.
  if (NULL != pVtbScan->dbName) {
    taosMemoryFreeClear(pVtbScan->dbName);
  }
  if (NULL != pVtbScan->stbName) {
    taosMemoryFreeClear(pVtbScan->stbName);
  }

  // Child-table bookkeeping: every list entry is itself an array of SColRefKV.
  if (NULL != pVtbScan->childTableList) {
    taosArrayDestroyEx(pVtbScan->childTableList, destroyColRefArray);
  }
  if (NULL != pVtbScan->childTableMap) {
    taosHashCleanup(pVtbScan->childTableMap);
  }
  if (NULL != pVtbScan->readColList) {
    taosArrayDestroy(pVtbScan->readColList);
  }

  // Maps whose values need a dedicated destructor.
  if (NULL != pVtbScan->dbVgInfoMap) {
    taosHashSetFreeFp(pVtbScan->dbVgInfoMap, freeUseDbOutput);
    taosHashCleanup(pVtbScan->dbVgInfoMap);
  }
  if (NULL != pVtbScan->orgTbVgColMap) {
    taosHashSetFreeFp(pVtbScan->orgTbVgColMap, destroyOrgTbInfo);
    taosHashCleanup(pVtbScan->orgTbVgColMap);
  }

  // Cached response: release its contents first, then the object.
  if (NULL != pVtbScan->pRsp) {
    tFreeSUsedbRsp(pVtbScan->pRsp);
    taosMemoryFreeClear(pVtbScan->pRsp);
  }
}
4✔
153

154
static void destroyDynQueryCtrlOperator(void* param) {
6,283✔
155
  SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param;
6,283✔
156

157
  switch (pDyn->qType) {
6,283!
158
    case DYN_QTYPE_STB_HASH:
6,279✔
159
      destroyStbJoinDynCtrlInfo(&pDyn->stbJoin);
6,279✔
160
      break;
6,279✔
161
    case DYN_QTYPE_VTB_SCAN:
4✔
162
      destroyVtbScanDynCtrlInfo(&pDyn->vtbScan);
4✔
163
      break;
4✔
164
    default:
×
165
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
×
166
      break;
×
167
  }
168

169
  taosMemoryFreeClear(param);
6,283!
170
}
6,283✔
171

172
/*
 * Decide whether the data of one join-side table has to be cached.
 *
 *  - batch fetch: always true;
 *  - right table: true when the current and the next right uid are equal;
 *  - left table:  true when `uid` has an entry in pPrev->leftCache.
 *
 * `uid` is only consulted for the left table.
 */
static FORCE_INLINE bool tableNeedCache(int64_t uid, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, bool rightTable, bool batchFetch) {
  if (batchFetch) {
    return true;
  }

  if (rightTable) {
    return pPost->rightCurrUid == pPost->rightNextUid;
  }

  uint32_t* pRefNum = tSimpleHashGet(pPrev->leftCache, &uid, sizeof(uid));
  return NULL != pRefNum;
}
185

186
/*
 * Load the table pair at the head list's read index into the post-join
 * context and work out the caching flags for both sides.
 *
 * The "next right uid" is the right uid of the following row; when the
 * current node is exhausted the search continues at index 0 of the next
 * node(s), and 0 is stored when no further row exists.
 *
 * In non-batch mode a right table that needs caching and differs from the
 * previously processed right table is registered in pPrev->rightCache.
 *
 * Returns TSDB_CODE_SUCCESS, or the failure reported through QRY_ERR_RET
 * for the rightCache insertion.
 */
static int32_t updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo*          pStbJoin) {
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
  SStbJoinTableList*   pCur = pPrev->pListHead;
  int64_t              lookupIdx = pCur->readIdx + 1;
  int64_t              prevRightUid = pPost->rightCurrUid;
  int64_t              currLeftUid = *(pCur->pLeftUid + pCur->readIdx);
  int64_t              currRightUid = *(pCur->pRightUid + pCur->readIdx);

  pPost->leftCurrUid = currLeftUid;
  pPost->rightCurrUid = currRightUid;

  pPost->leftVgId = *(pCur->pLeftVg + pCur->readIdx);
  pPost->rightVgId = *(pCur->pRightVg + pCur->readIdx);

  // Look ahead for the right uid of the next row, crossing node borders.
  for (;;) {
    if (lookupIdx < pCur->uidNum) {
      pPost->rightNextUid = *(pCur->pRightUid + lookupIdx);
      break;
    }

    pCur = pCur->pNext;
    if (NULL == pCur) {
      pPost->rightNextUid = 0;
      break;
    }

    lookupIdx = 0;
  }

  // The uid argument is ignored for the right table (see tableNeedCache).
  pPost->leftNeedCache = tableNeedCache(currLeftUid, pPrev, pPost, false, pStbJoin->basic.batchFetch);
  pPost->rightNeedCache = tableNeedCache(currRightUid, pPrev, pPost, true, pStbJoin->basic.batchFetch);

  if (!pStbJoin->basic.batchFetch && pPost->rightNeedCache && prevRightUid != pPost->rightCurrUid) {
    QRY_ERR_RET(tSimpleHashPut(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid), NULL, 0));
    pStbJoin->execInfo.rightCacheNum++;
  }

  return TSDB_CODE_SUCCESS;
}
229

230

231
/*
 * Build a "get" operator param addressed to a group-cache operator.
 *
 * ppRes          [out] the new param; NULL on failure.
 * downstreamIdx  downstream index stored in both the param and its payload.
 * vgId, tbUid    table identity stored in the payload.
 * needCache      caching flag stored in the payload.
 * pChild         optional child param. It becomes the single entry of
 *                pChildren on success and is released with
 *                freeOperatorParam() on every failure path.
 *
 * Returns TSDB_CODE_SUCCESS or the terrno captured at the failing call.
 *
 * Fix: the freshly malloc'ed param used to reach freeOperatorParam() on
 * the failure paths with opType/value/reUse still uninitialized. All
 * fields are now set right after the allocation so that cleanup only ever
 * sees defined values (value stays NULL until the payload exists).
 * NOTE(review): freeOperatorParam() is defined elsewhere; this assumes it
 * tolerates a NULL `value` and a NULL `pChildren` - confirm.
 */
static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, bool needCache, SOperatorParam* pChild) {
  int32_t code = TSDB_CODE_SUCCESS;

  SOperatorParam* pParam = taosMemoryMalloc(sizeof(SOperatorParam));
  *ppRes = pParam;
  if (NULL == pParam) {
    code = terrno;
    freeOperatorParam(pChild, OP_GET_PARAM);
    return code;
  }

  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
  pParam->downstreamIdx = downstreamIdx;
  pParam->value = NULL;
  pParam->pChildren = NULL;
  pParam->reUse = false;

  if (pChild) {
    pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
    if (NULL == pParam->pChildren || NULL == taosArrayPush(pParam->pChildren, &pChild)) {
      // The child never made it into pChildren, so it is released separately.
      code = terrno;
      freeOperatorParam(pChild, OP_GET_PARAM);
      freeOperatorParam(pParam, OP_GET_PARAM);
      *ppRes = NULL;
      return code;
    }
  }

  SGcOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcOperatorParam));
  if (NULL == pGc) {
    // pChild (if any) is already stored in pChildren and goes away with the param.
    code = terrno;
    freeOperatorParam(pParam, OP_GET_PARAM);
    *ppRes = NULL;
    return code;
  }

  pGc->sessionId = atomic_add_fetch_64(&gSessionId, 1);
  pGc->downstreamIdx = downstreamIdx;
  pGc->vgId = vgId;
  pGc->tbUid = tbUid;
  pGc->needCache = needCache;

  pParam->value = pGc;

  return TSDB_CODE_SUCCESS;
}
280

281

282
/*
 * Build a "notify" operator param addressed to a group-cache operator.
 *
 * ppRes          [out] the new param; NULL on failure.
 * downstreamIdx  downstream index stored in both the param and its payload.
 * vgId, tbUid    table identity stored in the payload.
 *
 * Returns TSDB_CODE_SUCCESS or the terrno captured at the failing call.
 *
 * Fixes: when the payload allocation failed, the param was handed to
 * freeOperatorParam() with opType/value/reUse uninitialized, and *ppRes
 * kept pointing at the released object. Fields are now initialized up
 * front and *ppRes is reset on failure.
 * NOTE(review): assumes freeOperatorParam() tolerates a NULL `value` - confirm.
 */
static int32_t buildGroupCacheNotifyOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid) {
  int32_t code = TSDB_CODE_SUCCESS;

  SOperatorParam* pParam = taosMemoryMalloc(sizeof(SOperatorParam));
  *ppRes = pParam;
  if (NULL == pParam) {
    return terrno;
  }

  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
  pParam->downstreamIdx = downstreamIdx;
  pParam->value = NULL;
  pParam->pChildren = NULL;
  pParam->reUse = false;

  SGcNotifyOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcNotifyOperatorParam));
  if (NULL == pGc) {
    code = terrno;
    freeOperatorParam(pParam, OP_NOTIFY_PARAM);
    *ppRes = NULL;
    return code;
  }

  pGc->downstreamIdx = downstreamIdx;
  pGc->vgId = vgId;
  pGc->tbUid = tbUid;

  pParam->value = pGc;

  return TSDB_CODE_SUCCESS;
}
308

309

310
/*
 * Build an exchange operator param that requests a sequential table scan
 * of exactly one table.
 *
 * ppRes          [out] the new param; NULL on failure.
 * downstreamIdx  downstream index stored in the param.
 * pVgId          vgroup id of the table (read only).
 * pUid           uid of the table; copied into a one-element uid list.
 *
 * Returns TSDB_CODE_SUCCESS or the terrno captured at the failing call.
 *
 * Fixes:
 *  - every failure after the first allocation leaked *ppRes and left the
 *    caller with a half-built param (only pChildren initialized);
 *  - terrno was read after the cleanup calls instead of right after the
 *    failing call.
 */
static int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, const int32_t* pVgId, int64_t* pUid) {
  int32_t                 code = TSDB_CODE_SUCCESS;
  SExchangeOperatorParam* pExc = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }
  (*ppRes)->pChildren = NULL;

  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  if (NULL == pExc) {
    code = terrno;
    goto _error;
  }

  pExc->multiParams = false;
  pExc->basic.vgId = *pVgId;
  pExc->basic.tableSeq = true;
  pExc->basic.isVtbRefScan = false;
  pExc->basic.isVtbTagScan = false;
  pExc->basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
  pExc->basic.colMap = NULL;
  pExc->basic.uidList = taosArrayInit(1, sizeof(int64_t));
  if (NULL == pExc->basic.uidList) {
    code = terrno;
    goto _error;
  }
  if (NULL == taosArrayPush(pExc->basic.uidList, pUid)) {
    code = terrno;
    goto _error;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;

_error:
  if (pExc) {
    // uidList is always assigned (possibly NULL) before control can get here with pExc set.
    if (pExc->basic.uidList) {
      taosArrayDestroy(pExc->basic.uidList);
    }
    taosMemoryFree(pExc);
  }
  taosMemoryFreeClear(*ppRes);
  return code;
}
347

348
/*
 * Build an exchange operator param that carries one table-scan request
 * per vgroup.
 *
 * ppRes          [out] the new param; NULL on failure.
 * downstreamIdx  downstream index stored in the param.
 * pVg            hash vgId -> SArray* (uid list). For every entry that is
 *                copied into the batch the slot in pVg is reset to NULL,
 *                i.e. the uid list is handed over to the batch param.
 *
 * Returns TSDB_CODE_SUCCESS, the terrno captured at a failing allocation,
 * or the code reported by tSimpleHashPut().
 *
 * Fixes:
 *  - a failing tSimpleHashPut() returned straight away and leaked pExc,
 *    its hash and *ppRes;
 *  - terrno was read after the cleanup calls;
 *  - `basic` is zero-initialized so that no indeterminate bytes are copied
 *    into the hash.
 * NOTE(review): the cleanup relies on the free callback installed on
 * pBatchs (freeExchangeGetBasicOperatorParam) to release the uid lists
 * that were already handed over - confirm against its definition.
 */
static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
  int32_t code = TSDB_CODE_SUCCESS;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }
  (*ppRes)->pChildren = NULL;

  SExchangeOperatorBatchParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorBatchParam));
  if (NULL == pExc) {
    code = terrno;
    taosMemoryFreeClear(*ppRes);
    return code;
  }

  pExc->multiParams = true;
  pExc->pBatchs = tSimpleHashInit(tSimpleHashGetSize(pVg), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  if (NULL == pExc->pBatchs) {
    code = terrno;
    taosMemoryFree(pExc);
    taosMemoryFreeClear(*ppRes);
    return code;
  }
  tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);

  SExchangeOperatorBasicParam basic = {0};
  basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;

  int32_t iter = 0;
  void* p = NULL;
  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
    int32_t* pVgId = tSimpleHashGetKey(p, NULL);
    SArray* pUidList = *(SArray**)p;
    basic.vgId = *pVgId;
    basic.uidList = pUidList;
    basic.colMap = NULL;
    basic.tableSeq = false;
    basic.isVtbRefScan = false;
    basic.isVtbTagScan = false;

    code = tSimpleHashPut(pExc->pBatchs, pVgId, sizeof(*pVgId), &basic, sizeof(basic));
    if (TSDB_CODE_SUCCESS != code) {
      // The failing entry still belongs to pVg; everything stored so far goes away with pBatchs.
      tSimpleHashCleanup(pExc->pBatchs);
      taosMemoryFree(pExc);
      taosMemoryFreeClear(*ppRes);
      QRY_ERR_RET(code);
    }

    qTrace("build downstreamIdx %d batch scan, vgId:%d, uidNum:%" PRId64, downstreamIdx, *pVgId, (int64_t)taosArrayGetSize(pUidList));
    *(SArray**)p = NULL;  // ownership of the uid list moved into pBatchs
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;
}
398

399
/*
 * Build a reusable exchange operator param that requests a tag scan of a
 * single table for virtual-table processing.
 *
 * ppRes          [out] the new param (reUse = true); NULL on failure.
 * downstreamIdx  downstream index stored in the param.
 * vgId           vgroup id of the table.
 * uid            uid of the table; stored in a one-element uid list.
 *
 * Returns TSDB_CODE_SUCCESS or the error code set by QUERY_CHECK_NULL.
 *
 * Fixes:
 *  - the error path freed `basic`, which points into the middle of pExc
 *    (it is &pExc->basic), and then freed pExc as well: an invalid free
 *    followed by a free of the same allocation. Only pExc is freed now;
 *  - the result of taosArrayPush() was ignored.
 */
static int32_t buildExchangeOperatorParamForVTagScan(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, tb_uid_t uid) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  SExchangeOperatorParam*      pExc = NULL;
  SExchangeOperatorBasicParam* basic = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
  (*ppRes)->pChildren = NULL;

  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  QUERY_CHECK_NULL(pExc, code, lino, _return, terrno);

  pExc->multiParams = false;

  basic = &pExc->basic;
  basic->srcOpType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;

  basic->vgId = vgId;
  basic->tableSeq = false;
  basic->isVtbRefScan = false;
  basic->isVtbTagScan = true;
  basic->colMap = NULL;
  basic->uidList = NULL;  // keep the error path away from an indeterminate pointer

  basic->uidList = taosArrayInit(1, sizeof(int64_t));
  QUERY_CHECK_NULL(basic->uidList, code, lino, _return, terrno);
  QUERY_CHECK_NULL(taosArrayPush(basic->uidList, &uid), code, lino, _return, terrno);

  (*ppRes)->pChildren = NULL;

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = true;

  return TSDB_CODE_SUCCESS;

_return:
  qError("failed to build exchange operator param for vscan, code:%d", code);
  taosMemoryFreeClear(*ppRes);
  if (basic) {
    // `basic` aliases &pExc->basic: release what it owns, never the pointer itself.
    if (basic->colMap) {
      taosArrayDestroy(basic->colMap->colMap);
      taosMemoryFreeClear(basic->colMap);
    }
    if (basic->uidList) {
      taosArrayDestroy(basic->uidList);
      basic->uidList = NULL;
    }
  }
  taosMemoryFreeClear(pExc);
  return code;
}
452

453
/*
 * Build a reusable exchange operator param that requests a table scan of
 * the origin table behind a virtual-table reference.
 *
 * ppRes          [out] the new param (reUse = true); NULL on failure.
 * downstreamIdx  downstream index stored in the param.
 * pMap           origin table info; vgId, tbName and a copy of colMap are
 *                stored in the param, pMap itself is not retained.
 *
 * The uid list is created empty here.
 *
 * Returns TSDB_CODE_SUCCESS or the error code set by QUERY_CHECK_NULL.
 *
 * Fixes:
 *  - the error path freed `basic`, which points into the middle of pExc
 *    (it is &pExc->basic), and then freed pExc as well. Only pExc is
 *    freed now;
 *  - basic->uidList was tested in the error path while still
 *    uninitialized when the colMap allocation or the colMap copy failed.
 */
static int32_t buildExchangeOperatorParamForVScan(SOperatorParam** ppRes, int32_t downstreamIdx, SOrgTbInfo* pMap) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  SExchangeOperatorParam*      pExc = NULL;
  SExchangeOperatorBasicParam* basic = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
  (*ppRes)->pChildren = NULL;

  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  QUERY_CHECK_NULL(pExc, code, lino, _return, terrno);

  pExc->multiParams = false;

  basic = &pExc->basic;
  basic->srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;

  basic->vgId = pMap->vgId;
  basic->tableSeq = false;
  basic->isVtbRefScan = true;
  basic->isVtbTagScan = false;
  basic->uidList = NULL;  // must be defined before the first possible jump to _return
  basic->colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
  QUERY_CHECK_NULL(basic->colMap, code, lino, _return, terrno);
  basic->colMap->vgId = pMap->vgId;
  tstrncpy(basic->colMap->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);
  basic->colMap->colMap = taosArrayDup(pMap->colMap, NULL);
  QUERY_CHECK_NULL(basic->colMap->colMap, code, lino, _return, terrno);

  basic->uidList = taosArrayInit(1, sizeof(int64_t));
  QUERY_CHECK_NULL(basic->uidList, code, lino, _return, terrno);

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = true;

  return TSDB_CODE_SUCCESS;

_return:
  qError("failed to build exchange operator param for vscan, code:%d", code);
  taosMemoryFreeClear(*ppRes);
  if (basic) {
    // `basic` aliases &pExc->basic: release what it owns, never the pointer itself.
    if (basic->colMap) {
      taosArrayDestroy(basic->colMap->colMap);
      taosMemoryFreeClear(basic->colMap);
    }
    if (basic->uidList) {
      taosArrayDestroy(basic->uidList);
      basic->uidList = NULL;
    }
  }
  taosMemoryFreeClear(pExc);
  return code;
}
508

509
/*
 * Build a "get" operator param addressed to the merge-join operator, with
 * the two group-cache params as its children.
 *
 * ppRes      [out] the new param; NULL on failure.
 * initParam  stored as `initDownstream` in the payload.
 * ppChild0   [in/out] first child. *ppChild0 is reset to NULL once the
 *            child is stored in pChildren (ownership moved to the param).
 * ppChild1   [in/out] second child, same contract.
 *
 * On failure the param is released with freeOperatorParam(); a child whose
 * pointer was not reset to NULL is still owned by the caller.
 *
 * Returns TSDB_CODE_SUCCESS or the terrno captured at the failing call.
 *
 * Fix: the param used to reach freeOperatorParam() on the failure paths
 * with opType/value/reUse uninitialized. They are now set right after the
 * allocation (value stays NULL until the payload exists).
 * NOTE(review): assumes freeOperatorParam() tolerates a NULL `value` -
 * confirm. `downstreamIdx` is never assigned here, as before.
 */
static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initParam, SOperatorParam** ppChild0, SOperatorParam** ppChild1) {
  int32_t code = TSDB_CODE_SUCCESS;

  SOperatorParam* pParam = taosMemoryMalloc(sizeof(SOperatorParam));
  *ppRes = pParam;
  if (NULL == pParam) {
    code = terrno;
    return code;
  }

  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
  pParam->value = NULL;
  pParam->reUse = false;

  pParam->pChildren = taosArrayInit(2, POINTER_BYTES);
  if (NULL == pParam->pChildren) {
    code = terrno;
    goto _error;
  }

  if (NULL == taosArrayPush(pParam->pChildren, ppChild0)) {
    code = terrno;
    goto _error;
  }
  *ppChild0 = NULL;  // now owned by pParam

  if (NULL == taosArrayPush(pParam->pChildren, ppChild1)) {
    code = terrno;
    goto _error;
  }
  *ppChild1 = NULL;  // now owned by pParam

  SSortMergeJoinOperatorParam* pJoin = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam));
  if (NULL == pJoin) {
    code = terrno;
    goto _error;
  }

  pJoin->initDownstream = initParam;
  pParam->value = pJoin;

  return TSDB_CODE_SUCCESS;

_error:
  freeOperatorParam(pParam, OP_GET_PARAM);
  *ppRes = NULL;
  return code;
}
554

555
/*
 * Build a "notify" operator param addressed to the merge-join operator.
 *
 * ppRes    [out] the new param; NULL on failure.
 * pChild0  optional first child param.
 * pChild1  optional second child param.
 *
 * Each non-NULL child is appended to pChildren. On every failure path the
 * children are released with freeOperatorParam(), either directly or as
 * part of the param they were already stored in.
 *
 * Returns TSDB_CODE_SUCCESS or the terrno captured at the failing call.
 *
 * Fixes:
 *  - the result of taosArrayInit() was checked through `*ppRes` (which is
 *    known to be non-NULL at that point) instead of `(*ppRes)->pChildren`,
 *    so an allocation failure went unnoticed;
 *  - the param reached freeOperatorParam() with opType/value/reUse
 *    uninitialized; they are now set right after the allocation.
 */
static int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) {
  int32_t code = TSDB_CODE_SUCCESS;

  SOperatorParam* pParam = taosMemoryMalloc(sizeof(SOperatorParam));
  *ppRes = pParam;
  if (NULL == pParam) {
    code = terrno;
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    return code;
  }

  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
  pParam->value = NULL;
  pParam->reUse = false;

  pParam->pChildren = taosArrayInit(2, POINTER_BYTES);
  if (NULL == pParam->pChildren) {
    code = terrno;
    taosMemoryFreeClear(*ppRes);
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    return code;
  }

  if (pChild0 && NULL == taosArrayPush(pParam->pChildren, &pChild0)) {
    // Neither child is stored yet: release the param and both children.
    code = terrno;
    freeOperatorParam(pParam, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    *ppRes = NULL;
    return code;
  }

  if (pChild1 && NULL == taosArrayPush(pParam->pChildren, &pChild1)) {
    // pChild0 (if any) is already stored in pChildren and goes away with the param.
    code = terrno;
    freeOperatorParam(pParam, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    *ppRes = NULL;
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
594

595
/*
 * Build a table-scan operator param from a vgroup hash that must contain
 * exactly one vgroup.
 *
 * ppRes          [out] param produced by buildTableScanOperatorParam().
 * downstreamIdx  not used by this function.
 * pVg            hash vgId -> SArray* (uid list). After a successful build
 *                the uid list is destroyed and its slot reset to NULL.
 *
 * Returns TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the hash does not
 * hold exactly one entry, otherwise the result of the build call.
 */
static int32_t buildBatchTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
  int32_t vgNum = tSimpleHashGetSize(pVg);
  if (1 != vgNum) {
    qError("Invalid vgroup num %d to build table scan operator param", vgNum);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  int32_t iter = 0;
  for (void* pEntry = tSimpleHashIterate(pVg, NULL, &iter); NULL != pEntry; pEntry = tSimpleHashIterate(pVg, pEntry, &iter)) {
    int32_t* pVgId = tSimpleHashGetKey(pEntry, NULL);
    (void)pVgId;  // the key is fetched but not needed for a local table scan
    SArray** ppUidList = (SArray**)pEntry;

    int32_t code = buildTableScanOperatorParam(ppRes, *ppUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false);
    if (code) {
      return code;
    }

    taosArrayDestroy(*ppUidList);
    *ppUidList = NULL;
  }

  return TSDB_CODE_SUCCESS;
}
619

620

621
/*
 * Build a table-scan operator param for exactly one table.
 *
 * ppRes          [out] param produced by buildTableScanOperatorParam().
 * downstreamIdx  not used by this function.
 * pVgId          not used by this function.
 * pUid           uid of the table; copied into a temporary one-element list.
 *
 * The temporary uid list is always destroyed before returning.
 *
 * Returns the terrno captured at a failing list operation, otherwise the
 * result of buildTableScanOperatorParam().
 *
 * Fix: the temporary list leaked when taosArrayPush() failed.
 */
static int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) {
  SArray* pUidList = taosArrayInit(1, sizeof(int64_t));
  if (NULL == pUidList) {
    return terrno;
  }

  if (NULL == taosArrayPush(pUidList, pUid)) {
    int32_t pushCode = terrno;  // capture before the cleanup call can touch terrno
    taosArrayDestroy(pUidList);
    return pushCode;
  }

  int32_t code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, true);
  taosArrayDestroy(pUidList);
  if (code) {
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
638

639
/*
 * Assemble the operator-param tree for joining the table pair at the
 * current read index:
 *
 *   merge-join param
 *     +- group-cache param (left)   [+- source param, when one was built]
 *     +- group-cache param (right)  [+- source param, when one was built]
 *
 * In batch mode source params are only built while pPrev->leftHash is
 * still set; after a successful build both hashes are cleaned up and
 * reset, so later calls build no source params. In sequential mode a
 * source param is built for each side on every call.
 *
 * ppParam  [out] the merge-join param on success.
 *
 * Returns TSDB_CODE_SUCCESS or the first error code encountered; on error
 * every param still held locally, and *ppParam, is released.
 */
static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, SOperatorParam** ppParam) {
  SStbJoinTableList* pHead = pPrev->pListHead;
  int64_t            rowIdx = pHead->readIdx;
  int32_t*           leftVg = pHead->pLeftVg + rowIdx;
  int64_t*           leftUid = pHead->pLeftUid + rowIdx;
  int32_t*           rightVg = pHead->pRightVg + rowIdx;
  int64_t*           rightUid = pHead->pRightUid + rowIdx;
  SOperatorParam*    pSrcParam0 = NULL;
  SOperatorParam*    pSrcParam1 = NULL;
  SOperatorParam*    pGcParam0 = NULL;
  SOperatorParam*    pGcParam1 = NULL;
  int32_t            code = TSDB_CODE_SUCCESS;

  qDebug("start %" PRId64 ":%" PRId64 "th stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64,
      rowIdx, pPrev->tableNum, *leftVg, *leftUid, *rightVg, *rightUid);

  QRY_ERR_RET(updatePostJoinCurrTableInfo(&pInfo->stbJoin));

  if (!pInfo->stbJoin.basic.batchFetch) {
    // Sequential mode: one source param per side for this single table pair.
    if (pInfo->stbJoin.basic.srcScan[0]) {
      code = buildSingleTableScanOperatorParam(&pSrcParam0, 0, leftVg, leftUid);
    } else {
      code = buildExchangeOperatorParam(&pSrcParam0, 0, leftVg, leftUid);
    }
    if (TSDB_CODE_SUCCESS == code) {
      if (pInfo->stbJoin.basic.srcScan[1]) {
        code = buildSingleTableScanOperatorParam(&pSrcParam1, 1, rightVg, rightUid);
      } else {
        code = buildExchangeOperatorParam(&pSrcParam1, 1, rightVg, rightUid);
      }
    }
  } else if (pPrev->leftHash) {
    // Batch mode: the hashes are consumed by the first successful build.
    if (pInfo->stbJoin.basic.srcScan[0]) {
      code = buildBatchTableScanOperatorParam(&pSrcParam0, 0, pPrev->leftHash);
    } else {
      code = buildBatchExchangeOperatorParam(&pSrcParam0, 0, pPrev->leftHash);
    }
    if (TSDB_CODE_SUCCESS == code) {
      if (pInfo->stbJoin.basic.srcScan[1]) {
        code = buildBatchTableScanOperatorParam(&pSrcParam1, 1, pPrev->rightHash);
      } else {
        code = buildBatchExchangeOperatorParam(&pSrcParam1, 1, pPrev->rightHash);
      }
    }
    if (TSDB_CODE_SUCCESS == code) {
      tSimpleHashCleanup(pPrev->leftHash);
      tSimpleHashCleanup(pPrev->rightHash);
      pPrev->leftHash = NULL;
      pPrev->rightHash = NULL;
    }
  }

  // A source param exists only when the downstream still has to be initialized.
  bool initParam = (NULL != pSrcParam0);

  if (TSDB_CODE_SUCCESS == code) {
    // The group-cache builder takes over the source param, also on failure.
    code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pPost->leftNeedCache, pSrcParam0);
    pSrcParam0 = NULL;
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pPost->rightNeedCache, pSrcParam1);
    pSrcParam1 = NULL;
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = buildMergeJoinOperatorParam(ppParam, initParam, &pGcParam0, &pGcParam1);
  }

  if (TSDB_CODE_SUCCESS == code) {
    return code;
  }

  // Failure: drop whatever is still owned by this function.
  if (pSrcParam0) {
    freeOperatorParam(pSrcParam0, OP_GET_PARAM);
  }
  if (pSrcParam1) {
    freeOperatorParam(pSrcParam1, OP_GET_PARAM);
  }
  if (pGcParam0) {
    freeOperatorParam(pGcParam0, OP_GET_PARAM);
  }
  if (pGcParam1) {
    freeOperatorParam(pGcParam1, OP_GET_PARAM);
  }
  if (*ppParam) {
    freeOperatorParam(*ppParam, OP_GET_PARAM);
    *ppParam = NULL;
  }

  return code;
}
709

710
/*
 * Start the join for the next table pair: build the operator-param tree,
 * pass it to downstream operator 1 and fetch the first result block.
 *
 * ppRes  [out] first result block, as returned by the downstream.
 *
 * A param-build failure or an invalid result block stores the code in the
 * task info and leaves through T_LONG_JMP. When a valid block arrives the
 * post context is marked as started and the post-join counters are
 * updated.
 * NOTE(review): a non-zero code returned by getNextExtFn is only logged
 * as an empty result here - confirm that this is intended.
 */
static void seqJoinLaunchNewRetrieveImpl(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
  SStbJoinPostJoinCtx*       pPost = &pStbJoin->ctx.post;
  SOperatorParam*            pParam = NULL;

  int32_t code = buildSeqStbJoinOperatorParam(pInfo, pPrev, pPost, &pParam);
  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  qDebug("%s dynamic post task begin", GET_TASKID(pOperator->pTaskInfo));
  code = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam, ppRes);

  if (NULL == *ppRes || 0 != code) {
    qDebug("%s Empty join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
    return;
  }

  code = blockDataCheck(*ppRes);
  if (code) {
    qError("Invalid block data, blockDataCheck failed, error:%s", tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  pPost->isStarted = true;
  pStbJoin->execInfo.postBlkNum++;
  pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows;
  qDebug("%s join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
}
27,841✔
739

740

741
/*
 * Notify downstream operator 1 that the cache of the current left or
 * right table is no longer needed.
 *
 * leftTable  true: use the left vgId/uid and downstream id 0;
 *            false: use the right vgId/uid and downstream id 1.
 *
 * A group-cache notify param is wrapped into a merge-join notify param
 * and passed to optrDefaultNotifyFn().
 *
 * Returns the first failing build code, otherwise the notify result.
 */
static int32_t notifySeqJoinTableCacheEnd(SOperatorInfo* pOperator, SStbJoinPostJoinCtx* pPost, bool leftTable) {
  SOperatorParam* pGcParam = NULL;
  SOperatorParam* pMergeJoinParam = NULL;
  int32_t         downstreamId;
  int32_t         vgId;
  int64_t         uid;

  if (leftTable) {
    downstreamId = 0;
    vgId = pPost->leftVgId;
    uid = pPost->leftCurrUid;
  } else {
    downstreamId = 1;
    vgId = pPost->rightVgId;
    uid = pPost->rightCurrUid;
  }

  qDebug("notify table %" PRIu64 " in vgId %d downstreamId %d cache end", uid, vgId, downstreamId);

  int32_t code = buildGroupCacheNotifyOperatorParam(&pGcParam, downstreamId, vgId, uid);
  if (TSDB_CODE_SUCCESS == code) {
    code = buildMergeJoinNotifyOperatorParam(&pMergeJoinParam, pGcParam, NULL);
  }
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  return optrDefaultNotifyFn(pOperator->pDownstream[1], pMergeJoinParam);
}
761

762
/*
 * Bookkeeping after the join of the current table pair has delivered its
 * last block. The post context is marked as not started; nothing else is
 * done in batch mode.
 *
 * Sequential mode:
 *  - left table flagged for caching: its counter in leftCache is
 *    decremented; when it reaches zero the entry is removed and the
 *    downstream is told that the cache can be dropped;
 *  - right table not flagged for caching: when it still has an entry in
 *    rightCache the entry is removed and the downstream is notified.
 *
 * Returns TSDB_CODE_SUCCESS or the first failure from the hash removal or
 * the notification.
 */
static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCtrlInfo*          pStbJoin) {
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
  int32_t              code = 0;

  pPost->isStarted = false;

  if (pStbJoin->basic.batchFetch) {
    return TSDB_CODE_SUCCESS;
  }

  if (pPost->leftNeedCache) {
    uint32_t* pRemain = tSimpleHashGet(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
    // The counter is unsigned, so "<= 0" after the decrement means "== 0".
    if (NULL != pRemain && 0 == --(*pRemain)) {
      code = tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
      if (code) {
        qError("tSimpleHashRemove leftCurrUid %" PRId64 " from leftCache failed, error:%s", pPost->leftCurrUid, tstrerror(code));
        QRY_ERR_RET(code);
      }
      QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, true));
    }
  }

  if (pPost->rightNeedCache) {
    return TSDB_CODE_SUCCESS;
  }

  void* pCached = tSimpleHashGet(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
  if (NULL != pCached) {
    code = tSimpleHashRemove(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
    if (code) {
      qError("tSimpleHashRemove rightCurrUid %" PRId64 " from rightCache failed, error:%s", pPost->rightCurrUid, tstrerror(code));
      QRY_ERR_RET(code);
    }
    QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, false));
  }

  return TSDB_CODE_SUCCESS;
}
798

799

800
/*
 * Keep pulling from the post-join operator (downstream 1) for the table pair
 * that is already being retrieved.
 *
 * Does nothing when no retrieval is in progress (post.isStarted is false).
 * Otherwise *ppRes receives the next downstream block. When the downstream is
 * exhausted (*ppRes == NULL) the current pair is closed with
 * handleSeqJoinCurrRetrieveEnd() and the read index of the head table list is
 * advanced; when a block is returned the post-block statistics are updated.
 */
static FORCE_INLINE int32_t seqJoinContinueCurrRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SStbJoinPostJoinCtx*       pPostCtx = &pCtrl->stbJoin.ctx.post;
  SStbJoinPrevJoinCtx*       pPrevCtx = &pCtrl->stbJoin.ctx.prev;

  if (!pPostCtx->isStarted) {
    return TSDB_CODE_SUCCESS;
  }

  qDebug("%s dynQueryCtrl continue to retrieve block from post op", GET_TASKID(pOperator->pTaskInfo));

  SSDataBlock* pNextBlk = getNextBlockFromDownstream(pOperator, 1);
  *ppRes = pNextBlk;

  if (NULL != pNextBlk) {
    pCtrl->stbJoin.execInfo.postBlkNum++;
    pCtrl->stbJoin.execInfo.postBlkRows += pNextBlk->info.rows;
    return TSDB_CODE_SUCCESS;
  }

  // downstream drained for this pair: finish it and move on to the next one
  QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, &pCtrl->stbJoin));
  pPrevCtx->pListHead->readIdx++;

  return TSDB_CODE_SUCCESS;
}
822

823
/*
 * Append pVal (valSize bytes) to the array stored in pHash under pKey.
 *
 * The hash maps a key to an SArray* of values. If the key is not present yet,
 * a new array holding pVal is created and inserted. On any failure the newly
 * created array is destroyed and nothing is inserted.
 *
 * Returns TSDB_CODE_SUCCESS, terrno when an array allocation/push fails, or
 * the status returned by tSimpleHashPut().
 */
static FORCE_INLINE int32_t addToJoinVgroupHash(SSHashObj* pHash, void* pKey, int32_t keySize, void* pVal, int32_t valSize) {
  int32_t  code = TSDB_CODE_SUCCESS;
  SArray** ppArray = tSimpleHashGet(pHash, pKey, keySize);

  if (NULL != ppArray) {
    if (NULL == taosArrayPush(*ppArray, pVal)) {
      return terrno;
    }
    return TSDB_CODE_SUCCESS;
  }

  SArray* pArray = taosArrayInit(10, valSize);
  if (NULL == pArray) {
    return terrno;
  }

  if (NULL == taosArrayPush(pArray, pVal)) {
    // capture terrno before the cleanup call gets a chance to touch it
    code = terrno;
    taosArrayDestroy(pArray);
    return code;
  }

  // Propagate the status returned by the hash itself. Returning terrno here
  // could report success (terrno == 0) although the array was just destroyed.
  code = tSimpleHashPut(pHash, pKey, keySize, &pArray, POINTER_BYTES);
  if (TSDB_CODE_SUCCESS != code) {
    taosArrayDestroy(pArray);
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
847

848
/*
 * Count one more occurrence of pKey in pHash (key -> uint32_t counter) and keep
 * pOnceHash in sync with the set of keys that have been seen exactly once.
 *
 *   - unseen key      : counter is created with value 1 and the key is added
 *                       to pOnceHash (with an empty value);
 *   - counter == 0    : left untouched;
 *   - counter == MAX  : reset to 0;
 *   - any other value : incremented; when it was 1 the key is first removed
 *                       from pOnceHash.
 *
 * Returns TSDB_CODE_SUCCESS or the error reported by the hash operation.
 */
static FORCE_INLINE int32_t addToJoinTableHash(SSHashObj* pHash, SSHashObj* pOnceHash, void* pKey, int32_t keySize) {
  int32_t   code = TSDB_CODE_SUCCESS;
  uint32_t* pCounter = tSimpleHashGet(pHash, pKey, keySize);

  if (NULL == pCounter) {
    uint32_t first = 1;
    code = tSimpleHashPut(pHash, pKey, keySize, &first, sizeof(first));
    if (code) {
      return code;
    }
    code = tSimpleHashPut(pOnceHash, pKey, keySize, NULL, 0);
    if (code) {
      return code;
    }
    return TSDB_CODE_SUCCESS;
  }

  if (0 == *pCounter) {
    return TSDB_CODE_SUCCESS;
  }

  if (UINT32_MAX == *pCounter) {
    *pCounter = 0;
    return TSDB_CODE_SUCCESS;
  }

  if (1 == *pCounter) {
    // second occurrence: the key no longer belongs to the "seen once" set
    code = tSimpleHashRemove(pOnceHash, pKey, keySize);
    if (code) {
      qError("tSimpleHashRemove failed in addToJoinTableHash, error:%s", tstrerror(code));
      QRY_ERR_RET(code);
    }
  }
  (*pCounter)++;

  return TSDB_CODE_SUCCESS;
}
884

885

886
/*
 * Release one table-list node together with its four per-row arrays.
 * A NULL node is accepted and ignored. The node is not unlinked from any list;
 * that is the caller's job.
 */
static void freeStbJoinTableList(SStbJoinTableList* pList) {
  if (NULL != pList) {
    taosMemoryFree(pList->pLeftVg);
    taosMemoryFree(pList->pLeftUid);
    taosMemoryFree(pList->pRightVg);
    taosMemoryFree(pList->pRightUid);
    taosMemoryFree(pList);
  }
}
896

897
/*
 * Copy `rows` entries of the four input arrays into a freshly allocated node
 * and append that node to the tail of the list kept in pCtx.
 *
 * The node starts with readIdx = 0 and uidNum = rows. On allocation failure
 * everything allocated so far is released, the list is left unchanged and
 * terrno is returned.
 */
static int32_t appendStbJoinTableList(SStbJoinPrevJoinCtx* pCtx, int64_t rows, int32_t* pLeftVg, int64_t* pLeftUid, int32_t* pRightVg, int64_t* pRightUid) {
  int32_t            code = TSDB_CODE_SUCCESS;
  SStbJoinTableList* pNode = taosMemoryCalloc(1, sizeof(SStbJoinTableList));
  if (NULL == pNode) {
    return terrno;
  }

  pNode->pLeftVg = taosMemoryMalloc(rows * sizeof(*pLeftVg));
  if (NULL == pNode->pLeftVg) {
    goto _nomem;
  }
  pNode->pLeftUid = taosMemoryMalloc(rows * sizeof(*pLeftUid));
  if (NULL == pNode->pLeftUid) {
    goto _nomem;
  }
  pNode->pRightVg = taosMemoryMalloc(rows * sizeof(*pRightVg));
  if (NULL == pNode->pRightVg) {
    goto _nomem;
  }
  pNode->pRightUid = taosMemoryMalloc(rows * sizeof(*pRightUid));
  if (NULL == pNode->pRightUid) {
    goto _nomem;
  }

  TAOS_MEMCPY(pNode->pLeftVg, pLeftVg, rows * sizeof(*pLeftVg));
  TAOS_MEMCPY(pNode->pLeftUid, pLeftUid, rows * sizeof(*pLeftUid));
  TAOS_MEMCPY(pNode->pRightVg, pRightVg, rows * sizeof(*pRightVg));
  TAOS_MEMCPY(pNode->pRightUid, pRightUid, rows * sizeof(*pRightUid));

  pNode->readIdx = 0;
  pNode->uidNum = rows;
  pNode->pNext = NULL;

  if (NULL == pCtx->pListTail) {
    // first node: it is both head and tail
    pCtx->pListHead = pNode;
  } else {
    pCtx->pListTail->pNext = pNode;
  }
  pCtx->pListTail = pNode;

  return TSDB_CODE_SUCCESS;

_nomem:
  // remember terrno first, the release calls below must not be able to change the result
  code = terrno;
  freeStbJoinTableList(pNode);
  return code;
}
947

948
/*
 * Digest one block of (vgId, uid) pairs produced by downstream 0.
 *
 * The vgroup and uid columns of both join sides are located through
 * basic.vgSlot[] / basic.uidSlot[]. Then, per row:
 *   - batch-fetch mode: the left uid is grouped under its vgId in
 *     prev.leftHash and the right uid under its vgId in prev.rightHash;
 *   - otherwise: the left uid is counted in prev.leftCache / prev.onceTable.
 * Finally the raw columns are appended to the table list and prev.tableNum is
 * increased by the number of rows.
 *
 * Does not return an error: any failure is stored in pTaskInfo->code and
 * raised with T_LONG_JMP.
 */
static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SStbJoinDynCtrlInfo*       pJoin = (SStbJoinDynCtrlInfo*)&pCtrl->stbJoin;
  SColumnInfoData*           pVgCol[2] = {NULL, NULL};
  SColumnInfoData*           pUidCol[2] = {NULL, NULL};

  for (int32_t side = 0; side < 2; ++side) {
    pVgCol[side] = taosArrayGet(pBlock->pDataBlock, pJoin->basic.vgSlot[side]);
    if (NULL == pVgCol[side]) {
      QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }
  }
  for (int32_t side = 0; side < 2; ++side) {
    pUidCol[side] = taosArrayGet(pBlock->pDataBlock, pJoin->basic.uidSlot[side]);
    if (NULL == pUidCol[side]) {
      QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }
  }

  for (int32_t row = 0; row < pBlock->info.rows; ++row) {
    int64_t* pLUid = (int64_t*)(pUidCol[0]->pData + pUidCol[0]->info.bytes * row);

    if (pJoin->basic.batchFetch) {
      int32_t* pLVg = (int32_t*)(pVgCol[0]->pData + pVgCol[0]->info.bytes * row);
      int32_t* pRVg = (int32_t*)(pVgCol[1]->pData + pVgCol[1]->info.bytes * row);
      int64_t* pRUid = (int64_t*)(pUidCol[1]->pData + pUidCol[1]->info.bytes * row);

      code = addToJoinVgroupHash(pJoin->ctx.prev.leftHash, pLVg, sizeof(*pLVg), pLUid, sizeof(*pLUid));
      if (TSDB_CODE_SUCCESS != code) {
        goto _return;
      }
      code = addToJoinVgroupHash(pJoin->ctx.prev.rightHash, pRVg, sizeof(*pRVg), pRUid, sizeof(*pRUid));
      if (TSDB_CODE_SUCCESS != code) {
        goto _return;
      }
    } else {
      code = addToJoinTableHash(pJoin->ctx.prev.leftCache, pJoin->ctx.prev.onceTable, pLUid, sizeof(*pLUid));
      if (TSDB_CODE_SUCCESS != code) {
        goto _return;
      }
    }
  }

  // all rows were hashed: keep a private copy of the raw columns for the retrieve phase
  code = appendStbJoinTableList(&pJoin->ctx.prev, pBlock->info.rows, (int32_t*)pVgCol[0]->pData,
                                (int64_t*)pUidCol[0]->pData, (int32_t*)pVgCol[1]->pData, (int64_t*)pUidCol[1]->pData);
  if (TSDB_CODE_SUCCESS == code) {
    pJoin->ctx.prev.tableNum += pBlock->info.rows;
  }

_return:

  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }
}
305✔
1010

1011

1012
/*
 * Prune prev.leftCache after all input blocks were hashed (non batch-fetch
 * mode only; batch-fetch mode returns immediately).
 *
 * If every cached uid was seen exactly once (leftCache and onceTable have the
 * same size) the whole cache is cleared. Otherwise each uid listed in
 * onceTable is removed from leftCache, so only uids referenced more than once
 * remain, and the remaining size is recorded in execInfo.leftCacheNum.
 * A failing removal is logged and the loop continues.
 */
static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SStbJoinDynCtrlInfo*       pJoin = (SStbJoinDynCtrlInfo*)&pCtrl->stbJoin;
  SStbJoinPrevJoinCtx*       pPrevCtx = &pJoin->ctx.prev;

  if (pJoin->basic.batchFetch) {
    return;
  }

  if (tSimpleHashGetSize(pPrevCtx->leftCache) == tSimpleHashGetSize(pPrevCtx->onceTable)) {
    tSimpleHashClear(pPrevCtx->leftCache);
    return;
  }

  int32_t   iter = 0;
  int32_t   code = 0;
  // NOTE(review): the pointer returned by tSimpleHashIterate() is used as the uid key here;
  // onceTable entries are inserted with an empty value, presumably making both coincide - confirm.
  uint64_t* pOnceUid = tSimpleHashIterate(pPrevCtx->onceTable, NULL, &iter);
  while (NULL != pOnceUid) {
    code = tSimpleHashRemove(pPrevCtx->leftCache, pOnceUid, sizeof(*pOnceUid));
    if (code) {
      qError("tSimpleHashRemove failed in postProcessStbJoinTableHash, error:%s", tstrerror(code));
    }
    pOnceUid = tSimpleHashIterate(pPrevCtx->onceTable, pOnceUid, &iter);
  }

  pJoin->execInfo.leftCacheNum = tSimpleHashGetSize(pPrevCtx->leftCache);
  qDebug("more than 1 ref build table num %" PRId64, (int64_t)tSimpleHashGetSize(pPrevCtx->leftCache));
}
1047

1048
/*
 * Build phase of the sequential stable join: drain downstream 0, feed every
 * block to doBuildStbJoinTableHash() while accumulating the block/row
 * statistics, post-process the hash tables and mark the build as done.
 */
static void buildStbJoinTableList(SOperatorInfo* pOperator) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SStbJoinDynCtrlInfo*       pJoin = (SStbJoinDynCtrlInfo*)&pCtrl->stbJoin;
  SSDataBlock*               pInput = NULL;

  while (NULL != (pInput = getNextBlockFromDownstream(pOperator, 0))) {
    pJoin->execInfo.prevBlkNum++;
    pJoin->execInfo.prevBlkRows += pInput->info.rows;

    doBuildStbJoinTableHash(pOperator, pInput);
  }

  postProcessStbJoinTableHash(pOperator);

  pJoin->ctx.prev.joinBuild = true;
}
6,279✔
1068

1069
/*
 * Start retrieving the next table pair(s) until one of them yields a block.
 *
 * Walks the table list from its head: fully consumed nodes
 * (readIdx >= uidNum) are unlinked and freed; for the current pair
 * seqJoinLaunchNewRetrieveImpl() is invoked. If it produces a block the
 * function returns with *ppRes set. Otherwise the pair is closed with
 * handleSeqJoinCurrRetrieveEnd() and the read index advances.
 *
 * When the list is exhausted *ppRes is set to NULL and the operator is marked
 * completed.
 *
 * Returns TSDB_CODE_SUCCESS or the error of handleSeqJoinCurrRetrieveEnd().
 */
static int32_t seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
  SStbJoinTableList*         pNode = pPrev->pListHead;

  while (pNode) {
    if (pNode->readIdx >= pNode->uidNum) {
      pPrev->pListHead = pNode->pNext;
      if (pPrev->pListTail == pNode) {
        // the node being freed was also the tail: do not leave a dangling tail pointer behind
        pPrev->pListTail = NULL;
      }
      freeStbJoinTableList(pNode);
      pNode = pPrev->pListHead;
      continue;
    }

    // NOTE(review): if seqJoinLaunchNewRetrieveImpl() reports a status code, it is not
    // checked here - confirm against its definition.
    seqJoinLaunchNewRetrieveImpl(pOperator, ppRes);
    if (*ppRes) {
      return TSDB_CODE_SUCCESS;
    }

    QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, pStbJoin));
    pPrev->pListHead->readIdx++;
  }

  *ppRes = NULL;
  setOperatorCompleted(pOperator);

  return TSDB_CODE_SUCCESS;
}
1097

1098
/*
 * Make a result block match the output descriptor of the stable join.
 *
 * A NULL block is accepted (nothing to do). Otherwise the block id is set to
 * the descriptor's dataBlockId and, for every slot of the descriptor beyond
 * the columns the block already has, a column of the slot's type is created
 * with capacity for info.rows rows and appended.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the
 * join info / descriptor / a slot is missing, or the error of the column
 * allocation / append. Buffers of a column that could not be appended are
 * released before returning.
 */
static int32_t seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) {
  if (NULL == pBlock) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == pStbJoin || NULL == pStbJoin->pOutputDataBlockDesc) {
    // pBlock is known to be non-NULL here, so report the operands that can actually be NULL
    qError("seqStableJoinComposeRes: pStbJoin or pOutputDataBlockDesc is NULL");
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  pBlock->info.id.blockId = pStbJoin->pOutputDataBlockDesc->dataBlockId;
  if (!pBlock->pDataBlock) return TSDB_CODE_SUCCESS;

  for (int i = (int)pBlock->pDataBlock->size; i < pStbJoin->pOutputDataBlockDesc->pSlots->length; i++) {
    SSlotDescNode* pSlot = (SSlotDescNode*)nodesListGetNode(pStbJoin->pOutputDataBlockDesc->pSlots, i);
    if (pSlot == NULL) {
      qError("seqStableJoinComposeRes: pSlot is NULL, i:%d", i);
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }

    SColumnInfoData colInfo = createColumnInfoData(pSlot->dataType.type, pSlot->dataType.bytes, pSlot->slotId);
    int32_t         code = colInfoDataEnsureCapacity(&colInfo, pBlock->info.rows, true);
    if (code != TSDB_CODE_SUCCESS) {
      // the column never reached the block, so its buffers would be lost otherwise
      colDataDestroy(&colInfo);
      return code;
    }

    code = blockDataAppendColInfo(pBlock, &colInfo);
    if (code != TSDB_CODE_SUCCESS) {
      colDataDestroy(&colInfo);
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
1127

1128
/*
 * Fetch the next result block of the sequential stable join.
 *
 * On the first call the table list is built from downstream 0; if that input
 * carried no rows the operator completes right away. Afterwards the function
 * first continues the retrieval in progress and, if that produced nothing,
 * launches the retrieval of the next table pair.
 *
 * The time spent in the first call is stored in cost.openCost (milliseconds).
 * On success the resulting block (possibly NULL) is passed through
 * seqStableJoinComposeRes() and its status is returned. On failure the error
 * is stored in pTaskInfo->code and raised with T_LONG_JMP.
 */
int32_t seqStableJoin(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SStbJoinDynCtrlInfo*       pJoin = (SStbJoinDynCtrlInfo*)&pCtrl->stbJoin;
  int64_t                    startUs = 0;

  QRY_PARAM_CHECK(pRes);
  if (OP_EXEC_DONE == pOperator->status) {
    return code;
  }

  if (pOperator->cost.openCost == 0) {
    startUs = taosGetTimestampUs();
  }

  if (!pJoin->ctx.prev.joinBuild) {
    buildStbJoinTableList(pOperator);
    if (pJoin->execInfo.prevBlkRows <= 0) {
      // nothing to join at all
      setOperatorCompleted(pOperator);
      goto _return;
    }
  }

  QRY_ERR_JRET(seqJoinContinueCurrRetrieve(pOperator, pRes));
  if (NULL == *pRes) {
    QRY_ERR_JRET(seqJoinLaunchNewRetrieve(pOperator, pRes));
  }

_return:
  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - startUs) / 1000.0;
  }

  if (!code) {
    return seqStableJoinComposeRes(pJoin, *pRes);
  }

  qError("%s failed since %s", __func__, tstrerror(code));
  pOperator->pTaskInfo->code = code;
  T_LONG_JMP(pOperator->pTaskInfo->env, code);
  return code;
}
1172

1173
/*
 * Allocate an operator parameter addressed to the virtual-table-scan operator
 * (downstream index 0) for the table identified by `uid`.
 *
 * On success *ppRes owns the new SOperatorParam: an empty children array, an
 * SVTableScanOperatorParam value with an empty pOpParamArray, reUse = false.
 * On failure everything allocated here is released, *ppRes is NULL and the
 * error code is returned. pInfo is not used.
 */
static int32_t buildVtbScanOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes, uint64_t uid) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SVTableScanOperatorParam* pScanParam = NULL;
  SOperatorParam*           pParam = NULL;

  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno);

  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno);

  pScanParam = taosMemoryMalloc(sizeof(SVTableScanOperatorParam));
  QUERY_CHECK_NULL(pScanParam, code, lino, _return, terrno);
  pScanParam->pOpParamArray = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL(pScanParam->pOpParamArray, code, lino, _return, terrno);
  pScanParam->uid = uid;

  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN;
  pParam->downstreamIdx = 0;
  pParam->value = pScanParam;
  pParam->reUse = false;

  *ppRes = pParam;
  return TSDB_CODE_SUCCESS;

_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pScanParam) {
    taosArrayDestroy(pScanParam->pOpParamArray);
    taosMemoryFreeClear(pScanParam);
  }
  if (pParam) {
    taosArrayDestroy(pParam->pChildren);
    taosMemoryFreeClear(pParam);
  }
  *ppRes = NULL;
  return code;
}
1207

1208
int32_t dynProcessUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
4✔
1209
  int32_t                    lino = 0;
4✔
1210
  SOperatorInfo*             operator=(SOperatorInfo*) param;
4✔
1211
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)operator->info;
4✔
1212

1213
  if (TSDB_CODE_SUCCESS != code) {
4!
1214
    operator->pTaskInfo->code = rpcCvtErrCode(code);
×
1215
    if (operator->pTaskInfo->code != code) {
×
1216
      qError("load systable rsp received, error:%s, cvted error:%s", tstrerror(code),
×
1217
             tstrerror(operator->pTaskInfo->code));
1218
    } else {
1219
      qError("load systable rsp received, error:%s", tstrerror(code));
×
1220
    }
1221
    goto _return;
×
1222
  }
1223

1224
  pScanResInfo->vtbScan.pRsp = taosMemoryMalloc(sizeof(SUseDbRsp));
4!
1225
  QUERY_CHECK_NULL(pScanResInfo->vtbScan.pRsp, code, lino, _return, terrno);
4!
1226

1227
  code = tDeserializeSUseDbRsp(pMsg->pData, (int32_t)pMsg->len, pScanResInfo->vtbScan.pRsp);
4✔
1228
  QUERY_CHECK_CODE(code, lino, _return);
4!
1229

1230
  taosMemoryFreeClear(pMsg->pData);
4!
1231

1232
  code = tsem_post(&pScanResInfo->vtbScan.ready);
4✔
1233
  QUERY_CHECK_CODE(code, lino, _return);
4!
1234

1235
  return code;
4✔
1236
_return:
×
1237
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1238
  return code;
×
1239
}
1240

1241
static int32_t buildDbVgInfoMap(SOperatorInfo* pOperator, SMsgCb* pMsgCb, SName* name, SExecTaskInfo* pTaskInfo, SUseDbOutput* output) {
4✔
1242
  int32_t                    code = TSDB_CODE_SUCCESS;
4✔
1243
  int32_t                    lino = 0;
4✔
1244
  char*                      buf1 = NULL;
4✔
1245
  SUseDbReq*                 pReq = NULL;
4✔
1246
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)pOperator->info;
4✔
1247

1248
  pReq = taosMemoryMalloc(sizeof(SUseDbReq));
4!
1249
  QUERY_CHECK_NULL(pReq, code, lino, _return, terrno);
4!
1250
  code = tNameGetFullDbName(name, pReq->db);
4✔
1251
  QUERY_CHECK_CODE(code, lino, _return);
4!
1252
  int32_t contLen = tSerializeSUseDbReq(NULL, 0, pReq);
4✔
1253
  buf1 = taosMemoryCalloc(1, contLen);
4!
1254
  QUERY_CHECK_NULL(buf1, code, lino, _return, terrno);
4!
1255
  int32_t tempRes = tSerializeSUseDbReq(buf1, contLen, pReq);
4✔
1256
  if (tempRes < 0) {
4!
1257
    QUERY_CHECK_CODE(terrno, lino, _return);
×
1258
  }
1259

1260
  // send the fetch remote task result request
1261
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
4!
1262
  QUERY_CHECK_NULL(pMsgSendInfo, code, lino, _return, terrno);
4!
1263

1264
  pMsgSendInfo->param = pOperator;
4✔
1265
  pMsgSendInfo->msgInfo.pData = buf1;
4✔
1266
  pMsgSendInfo->msgInfo.len = contLen;
4✔
1267
  pMsgSendInfo->msgType = TDMT_MND_GET_DB_INFO;
4✔
1268
  pMsgSendInfo->fp = dynProcessUseDbRsp;
4✔
1269
  pMsgSendInfo->requestId = pTaskInfo->id.queryId;
4✔
1270

1271
  code = asyncSendMsgToServer(pMsgCb->clientRpc, &pScanResInfo->vtbScan.epSet, NULL, pMsgSendInfo);
4✔
1272
  QUERY_CHECK_CODE(code, lino, _return);
4!
1273

1274
  code = tsem_wait(&pScanResInfo->vtbScan.ready);
4✔
1275
  QUERY_CHECK_CODE(code, lino, _return);
4!
1276

1277
  code = queryBuildUseDbOutput(output, pScanResInfo->vtbScan.pRsp);
4✔
1278
  QUERY_CHECK_CODE(code, lino, _return);
4!
1279

1280
_return:
4✔
1281
  if (code) {
4!
1282
     qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1283
     taosMemoryFree(buf1);
×
1284
  }
1285
  taosMemoryFree(pReq);
4!
1286
  tFreeSUsedbRsp(pScanResInfo->vtbScan.pRsp);
4✔
1287
  taosMemoryFreeClear(pScanResInfo->vtbScan.pRsp);
4!
1288
  return code;
4✔
1289
}
1290

1291
int dynVgInfoComp(const void* lp, const void* rp) {
×
1292
  SVgroupInfo* pLeft = (SVgroupInfo*)lp;
×
1293
  SVgroupInfo* pRight = (SVgroupInfo*)rp;
×
1294
  if (pLeft->hashBegin < pRight->hashBegin) {
×
1295
    return -1;
×
1296
  } else if (pLeft->hashBegin > pRight->hashBegin) {
×
1297
    return 1;
×
1298
  }
1299

1300
  return 0;
×
1301
}
1302

1303
/*
 * Lazily materialize dbInfo->vgArray: a copy of all entries of dbInfo->vgHash
 * sorted with sort_func.
 *
 * Nothing is done when dbInfo is NULL, when there is no hash, or when the
 * array already exists. The array is attached to dbInfo only after it has
 * been completely filled and sorted; on failure it is destroyed, so a later
 * call never finds a partial, unsorted array and mistakes it for a valid one.
 *
 * Returns TSDB_CODE_SUCCESS or terrno of the failed allocation.
 */
int32_t dynMakeVgArraySortBy(SDBVgInfo* dbInfo, __compar_fn_t sort_func) {
  if (NULL == dbInfo || NULL == dbInfo->vgHash || NULL != dbInfo->vgArray) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t vgSize = taosHashGetSize(dbInfo->vgHash);
  SArray* pVgArray = taosArrayInit(vgSize, sizeof(SVgroupInfo));
  if (NULL == pVgArray) {
    return terrno;
  }

  void* pIter = taosHashIterate(dbInfo->vgHash, NULL);
  while (pIter) {
    if (NULL == taosArrayPush(pVgArray, pIter)) {
      // capture terrno before the cleanup calls get a chance to change it
      int32_t code = terrno;
      taosHashCancelIterate(dbInfo->vgHash, pIter);
      taosArrayDestroy(pVgArray);
      return code;
    }

    pIter = taosHashIterate(dbInfo->vgHash, pIter);
  }

  taosArraySort(pVgArray, sort_func);
  dbInfo->vgArray = pVgArray;

  return TSDB_CODE_SUCCESS;
}
1330

1331
int32_t dynHashValueComp(void const* lp, void const* rp) {
16✔
1332
  uint32_t*    key = (uint32_t*)lp;
16✔
1333
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
16✔
1334

1335
  if (*key < pVg->hashBegin) {
16!
1336
    return -1;
×
1337
  } else if (*key > pVg->hashEnd) {
16!
1338
    return 1;
×
1339
  }
1340

1341
  return 0;
16✔
1342
}
1343

1344
/*
 * Resolve the vgroup that stores table `tbName` of database `dbFName`.
 *
 * The full table name "<dbFName>.<tbName>" is hashed with the database's hash
 * settings and looked up in the sorted vgroup array of dbInfo (built on demand
 * by dynMakeVgArraySortBy()).
 *
 * On success *vgId receives the vgroup id. Errors:
 *   TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR - dbInfo is NULL;
 *   TSDB_CODE_TSC_DB_NOT_SELECTED         - the database has no vgroup;
 *   TSDB_CODE_CTG_INTERNAL_ERROR          - no vgroup covers the hash value;
 *   or the error of dynMakeVgArraySortBy().
 */
int32_t getVgId(SDBVgInfo* dbInfo, char* dbFName, int32_t* vgId, char *tbName) {
  int32_t code = 0;
  int32_t lino = 0;

  if (NULL == dbInfo) {
    // dynMakeVgArraySortBy() tolerates NULL, but the vgArray access below would not
    qError("%s failed since db vgroup info is NULL", __func__);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  code = dynMakeVgArraySortBy(dbInfo, dynVgInfoComp);
  QUERY_CHECK_CODE(code, lino, _return);

  int32_t vgNum = (int32_t)taosArrayGetSize(dbInfo->vgArray);
  if (vgNum <= 0) {
    qError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum);
    QUERY_CHECK_CODE(code = TSDB_CODE_TSC_DB_NOT_SELECTED, lino, _return);
  }

  SVgroupInfo* vgInfo = NULL;
  char         tbFullName[TSDB_TABLE_FNAME_LEN];
  // snprintf always terminates the buffer; an over-long name is truncated
  (void)snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbFName, tbName);

  uint32_t hashValue = taosGetTbHashVal(tbFullName, (int32_t)strlen(tbFullName), dbInfo->hashMethod,
                                        dbInfo->hashPrefix, dbInfo->hashSuffix);

  vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, dynHashValueComp, TD_EQ);
  if (NULL == vgInfo) {
    qError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName,
           (int32_t)taosArrayGetSize(dbInfo->vgArray));
    return TSDB_CODE_CTG_INTERNAL_ERROR;
  }

  *vgId = vgInfo->vgId;

_return:
  return code;
}
1377

1378
/*
 * Tell whether column `colId` has to be read by the virtual table scan:
 * true when all columns are scanned (scanAllCols) or when the id is listed in
 * readColList, false otherwise.
 */
bool colNeedScan(SOperatorInfo* pOperator, col_id_t colId) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SVtbScanDynCtrlInfo*       pScan = (SVtbScanDynCtrlInfo*)&pCtrl->vtbScan;
  SArray*                    pWanted = pScan->readColList;

  if (pScan->scanAllCols) {
    return true;
  }

  int32_t idx = 0;
  while (idx < taosArrayGetSize(pWanted)) {
    col_id_t* pId = (col_id_t*)taosArrayGet(pWanted, idx);
    if (*pId == colId) {
      return true;
    }
    idx++;
  }

  return false;
}
1392

1393
/*
 * Return the vgroup information of the database `name->dbname`.
 *
 * Results are cached in vtbScan.dbVgInfoMap (db name -> SUseDbOutput*). On a
 * cache miss the information is fetched with buildDbVgInfoMap() and stored in
 * the map. On success *dbVgInfo points to the cached dbVgroup.
 *
 * Returns TSDB_CODE_SUCCESS or the error of the allocation, the fetch or the
 * hash insertion.
 */
int32_t getDbVgInfo(SOperatorInfo* pOperator, SName *name, SDBVgInfo **dbVgInfo) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SMsgCb*                    pMsgCb = pVtbScan->pMsgCb;
  SUseDbOutput*              output = NULL;
  SUseDbOutput**             find = (SUseDbOutput**)taosHashGet(pInfo->vtbScan.dbVgInfoMap, name->dbname, strlen(name->dbname));

  QRY_PARAM_CHECK(dbVgInfo);

  if (find == NULL) {
    // Zero-initialized so that the failure path below never hands indeterminate
    // pointers to freeUseDbOutput().
    output = taosMemoryCalloc(1, sizeof(SUseDbOutput));
    if (NULL == output) {
      code = terrno;
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      return code;
    }
    code = buildDbVgInfoMap(pOperator, pMsgCb, name, pTaskInfo, output);
    QUERY_CHECK_CODE(code, line, _return);
    code = taosHashPut(pInfo->vtbScan.dbVgInfoMap, name->dbname, strlen(name->dbname), &output, POINTER_BYTES);
    QUERY_CHECK_CODE(code, line, _return);
  } else {
    output = *find;
  }

  *dbVgInfo = output->dbVgroup;
  return code;
_return:
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  freeUseDbOutput(output);
  return code;
}
1422

1423
int32_t extractColRefName(const char *colref, char **refDb, char** refTb, char** refCol) {
32✔
1424
  int32_t     code = TSDB_CODE_SUCCESS;
32✔
1425
  int32_t     line = 0;
32✔
1426
  const char *dbname = NULL;
32✔
1427
  const char *tablename = NULL;
32✔
1428
  const char *colname = NULL;
32✔
1429

1430
  const char *first_dot = strchr(colref, '.');
32✔
1431
  QUERY_CHECK_NULL(first_dot, code, line, _return, terrno)
32!
1432

1433
  const char *second_dot = strchr(first_dot + 1, '.');
32✔
1434
  QUERY_CHECK_NULL(second_dot, code, line, _return, terrno)
32!
1435

1436
  size_t db_len = first_dot - colref;
32✔
1437
  size_t table_len = second_dot - first_dot - 1;
32✔
1438
  size_t col_len = strlen(second_dot + 1);
32✔
1439

1440
  *refDb = taosMemoryMalloc(db_len + 1);
32!
1441
  *refTb = taosMemoryMalloc(table_len + 1);
32!
1442
  *refCol = taosMemoryMalloc(col_len + 1);
32!
1443
  QUERY_CHECK_NULL(*refDb, code, line, _return, terrno)
32!
1444
  QUERY_CHECK_NULL(*refTb, code, line, _return, terrno)
32!
1445
  QUERY_CHECK_NULL(*refCol, code, line, _return, terrno)
32!
1446

1447
  tstrncpy(*refDb, colref, db_len + 1);
32✔
1448
  tstrncpy(*refTb, first_dot + 1, table_len + 1);
32✔
1449
  tstrncpy(*refCol, second_dot + 1, col_len + 1);
32✔
1450

1451
  return TSDB_CODE_SUCCESS;
32✔
1452
_return:
×
1453
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
1454
  if (*refDb) {
×
1455
    taosMemoryFree(*refDb);
×
1456
    *refDb = NULL;
×
1457
  }
1458
  if (*refTb) {
×
1459
    taosMemoryFree(*refTb);
×
1460
    *refTb = NULL;
×
1461
  }
1462
  if (*refCol) {
×
1463
    taosMemoryFree(*refCol);
×
1464
    *refCol = NULL;
×
1465
  }
1466
  return code;
×
1467
}
1468

1469
int32_t vtbScan(SOperatorInfo* pOperator, SSDataBlock** pRes) {
16✔
1470
  int32_t                    code = TSDB_CODE_SUCCESS;
16✔
1471
  int32_t                    line = 0;
16✔
1472
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
16✔
1473
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
16✔
1474
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
16✔
1475
  SDBVgInfo*                 dbVgInfo = NULL;
16✔
1476
  bool                       readerInit = false;
16✔
1477
  SArray*                    pColRefArray = NULL;
16✔
1478

1479
  QRY_PARAM_CHECK(pRes);
16!
1480
  if (pOperator->status == OP_EXEC_DONE) {
16!
1481
    return code;
×
1482
  }
1483

1484
  int64_t st = 0;
16✔
1485
  if (pOperator->cost.openCost == 0) {
16✔
1486
    st = taosGetTimestampUs();
4✔
1487
  }
1488

1489
  if (pVtbScan->childTableMap == NULL) {
16✔
1490
    pVtbScan->childTableMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
4✔
1491
    QUERY_CHECK_NULL(pVtbScan->childTableMap, code, line, _return, terrno);
4!
1492

1493

1494
    while (true) {
28✔
1495
      SSDataBlock *pChildInfo = NULL;
32✔
1496
      code = pOperator->pDownstream[1]->fpSet.getNextFn(pOperator->pDownstream[1], &pChildInfo);
32✔
1497
      if (pChildInfo == NULL) {
32✔
1498
        break;
4✔
1499
      }
1500
      SColumnInfoData *pTableNameCol = taosArrayGet(pChildInfo->pDataBlock, 0);
28✔
1501
      SColumnInfoData *pStbNameCol = taosArrayGet(pChildInfo->pDataBlock, 1);
28✔
1502
      SColumnInfoData *pDbNameCol = taosArrayGet(pChildInfo->pDataBlock, 2);
28✔
1503
      SColumnInfoData *pColNameCol = taosArrayGet(pChildInfo->pDataBlock, 3);
28✔
1504
      SColumnInfoData *pUidCol = taosArrayGet(pChildInfo->pDataBlock, 4);
28✔
1505
      SColumnInfoData *pColIdCol = taosArrayGet(pChildInfo->pDataBlock, 5);
28✔
1506
      SColumnInfoData *pRefCol = taosArrayGet(pChildInfo->pDataBlock, 6);
28✔
1507
      SColumnInfoData *pVgIdCol = taosArrayGet(pChildInfo->pDataBlock, 7);
28✔
1508

1509
      for (int32_t i = 0; i < pChildInfo->info.rows; i++) {
292✔
1510
        if (!colDataIsNull_s(pStbNameCol, i)) {
528!
1511
          char* stbrawname = colDataGetData(pStbNameCol, i);
264!
1512
          char* dbrawname = colDataGetData(pDbNameCol, i);
264!
1513

1514
          if (strncmp(varDataVal(stbrawname), pInfo->vtbScan.stbName, varDataLen(stbrawname)) == 0 &&
264✔
1515
              strlen(pInfo->vtbScan.stbName) == varDataLen(stbrawname) &&
216!
1516
              strncmp(varDataVal(dbrawname), pInfo->vtbScan.dbName, varDataLen(dbrawname)) == 0 &&
216✔
1517
              strlen(pInfo->vtbScan.dbName) == varDataLen(dbrawname)) {
32!
1518
            char *ctbName = colDataGetData(pTableNameCol, i);
32!
1519
            SColRefKV kv = {0};
32✔
1520
            if (colDataIsNull_s(pRefCol, i)) {
64!
1521
              kv.colrefName = NULL;
8✔
1522
            } else {
1523
              kv.colrefName = taosMemoryCalloc(varDataTLen(colDataGetData(pRefCol, i)), 1);
24!
1524
              QUERY_CHECK_NULL(kv.colrefName, code, line, _return, terrno);
24!
1525
              memcpy(kv.colrefName, varDataVal(colDataGetData(pRefCol, i)), varDataLen(colDataGetData(pRefCol, i)));
24!
1526
              kv.colrefName[varDataLen(colDataGetData(pRefCol, i))] = 0;
24!
1527
            }
1528

1529
            kv.colName = taosMemoryCalloc(varDataTLen(colDataGetData(pColNameCol, i)), 1);
32!
1530
            QUERY_CHECK_NULL(kv.colName, code, line, _return, terrno);
32!
1531
            memcpy(kv.colName, varDataVal(colDataGetData(pColNameCol, i)), varDataLen(colDataGetData(pColNameCol, i)));
32!
1532
            kv.colName[varDataLen(colDataGetData(pColNameCol, i))] = 0;
32!
1533
            if (!colDataIsNull_s(pUidCol, i)) {
64!
1534
              GET_TYPED_DATA(kv.uid, int64_t, TSDB_DATA_TYPE_BIGINT, colDataGetNumData(pUidCol, i), 0);
32✔
1535
            }
1536
            if (!colDataIsNull_s(pColIdCol, i)) {
64!
1537
              GET_TYPED_DATA(kv.colId, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pColIdCol, i), 0);
24✔
1538
            }
1539
            if (!colDataIsNull_s(pVgIdCol, i)) {
64!
1540
              GET_TYPED_DATA(kv.vgId, int32_t, TSDB_DATA_TYPE_INT, colDataGetNumData(pVgIdCol, i), 0);
32✔
1541
            }
1542
            if (pInfo->vtbScan.dynTbUid != 0 && kv.uid != pInfo->vtbScan.dynTbUid) {
32!
1543
              continue;
×
1544
            }
1545
            if (taosHashGet(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName)) == NULL) {
32✔
1546
              pColRefArray = taosArrayInit(1, sizeof(SColRefKV));
8✔
1547
              QUERY_CHECK_NULL(pColRefArray, code, line, _return, terrno);
8!
1548
              QUERY_CHECK_NULL(taosArrayPush(pColRefArray, &kv), code, line, _return, terrno);
16!
1549
              int32_t tableIdx = (int32_t)taosArrayGetSize(pVtbScan->childTableList);
8✔
1550
              QUERY_CHECK_NULL(taosArrayPush(pVtbScan->childTableList, &pColRefArray), code, line, _return, terrno);
16!
1551
              code = taosHashPut(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName), &tableIdx, sizeof(tableIdx));
8✔
1552
              QUERY_CHECK_CODE(code, line, _return);
8!
1553
            } else {
1554
              int32_t *tableIdx = (int32_t*)taosHashGet(pVtbScan->childTableMap, varDataVal(ctbName), varDataLen(ctbName));
24✔
1555
              QUERY_CHECK_NULL(tableIdx, code, line, _return, terrno);
24!
1556
              pColRefArray = (SArray *)taosArrayGetP(pVtbScan->childTableList, *tableIdx);
24✔
1557
              QUERY_CHECK_NULL(pColRefArray, code, line, _return, terrno);
24!
1558
              QUERY_CHECK_NULL(taosArrayPush(pColRefArray, &kv), code, line, _return, terrno);
48!
1559
            }
1560
          }
1561
        }
1562
      }
1563
    }
1564
  }
1565

1566
  size_t num = taosHashGetSize(pVtbScan->childTableMap);
16✔
1567

1568
  // no child table, return
1569
  if (num == 0) {
16!
1570
    setOperatorCompleted(pOperator);
×
1571
    return code;
×
1572
  }
1573

1574
  pVtbScan->orgTbVgColMap = taosHashInit(num * 64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
16✔
1575
  QUERY_CHECK_NULL(pVtbScan->orgTbVgColMap, code, line, _return, terrno);
16!
1576
  taosHashSetFreeFp(pVtbScan->orgTbVgColMap, destroyOrgTbInfo);
16✔
1577

1578
  while (true) {
1579
    if (pVtbScan->curTableIdx == pVtbScan->lastTableIdx) {
24✔
1580
      code = pOperator->pDownstream[0]->fpSet.getNextFn(pOperator->pDownstream[0], pRes);
8✔
1581
      QUERY_CHECK_CODE(code, line, _return);
8!
1582
    } else {
1583
      taosHashClear(pVtbScan->orgTbVgColMap);
16✔
1584
      SArray* pColMap = (SArray*)taosArrayGetP(pVtbScan->childTableList, pVtbScan->curTableIdx);
16✔
1585
      QUERY_CHECK_NULL(pColMap, code, line, _return, terrno);
16!
1586
      tb_uid_t uid = 0;
16✔
1587
      int32_t  vgId = 0;
16✔
1588
      for (int32_t j = 0; j < taosArrayGetSize(pColMap); j++) {
80✔
1589
        SColRefKV *pKV = (SColRefKV*)taosArrayGet(pColMap, j);
64✔
1590
        uid = pKV->uid;
64✔
1591
        vgId = pKV->vgId;
64✔
1592
        if (pKV->colrefName != NULL && colNeedScan(pOperator, pKV->colId)) {
64✔
1593
          char*   refDbName = NULL;
32✔
1594
          char*   refTbName = NULL;
32✔
1595
          char*   refColName = NULL;
32✔
1596
          SName   name = {0};
32✔
1597
          char    dbFname[TSDB_DB_FNAME_LEN] = {0};
32✔
1598
          char    orgTbFName[TSDB_TABLE_FNAME_LEN] = {0};
32✔
1599

1600
          code = extractColRefName(pKV->colrefName, &refDbName, &refTbName, &refColName);
32✔
1601
          QUERY_CHECK_CODE(code, line, _return);
32!
1602

1603
          toName(pInfo->vtbScan.acctId, refDbName, refTbName, &name);
32✔
1604

1605
          code = getDbVgInfo(pOperator, &name, &dbVgInfo);
32✔
1606
          QUERY_CHECK_CODE(code, line, _return);
32!
1607
          tNameGetFullDbName(&name, dbFname);
32✔
1608
          QUERY_CHECK_CODE(code, line, _return);
32!
1609
          tNameGetFullTableName(&name, orgTbFName);
32✔
1610
          QUERY_CHECK_CODE(code, line, _return);
32!
1611

1612
          void *pVal = taosHashGet(pVtbScan->orgTbVgColMap, orgTbFName, sizeof(orgTbFName));
32✔
1613
          if (!pVal) {
32✔
1614
            SOrgTbInfo map = {0};
16✔
1615
            code = getVgId(dbVgInfo, dbFname, &map.vgId, name.tname);
16✔
1616
            QUERY_CHECK_CODE(code, line, _return);
16!
1617
            tstrncpy(map.tbName, orgTbFName, sizeof(map.tbName));
16✔
1618
            map.colMap = taosArrayInit(10, sizeof(SColIdNameKV));
16✔
1619
            QUERY_CHECK_NULL(map.colMap, code, line, _return, terrno);
16!
1620
            SColIdNameKV colIdNameKV = {0};
16✔
1621
            colIdNameKV.colId = pKV->colId; // TODO(smj) : select ref colid from ins_cols;
16✔
1622
            tstrncpy(colIdNameKV.colName, refColName, sizeof(colIdNameKV.colName));
16✔
1623
            QUERY_CHECK_NULL(taosArrayPush(map.colMap, &colIdNameKV), code, line, _return, terrno);
32!
1624
            code = taosHashPut(pVtbScan->orgTbVgColMap, orgTbFName, sizeof(orgTbFName), &map, sizeof(map));
16✔
1625
            QUERY_CHECK_CODE(code, line, _return);
16!
1626
          } else {
1627
            SOrgTbInfo *tbInfo = (SOrgTbInfo *)pVal;
16✔
1628
            SColIdNameKV colIdNameKV = {0};
16✔
1629
            colIdNameKV.colId = pKV->colId;
16✔
1630
            tstrncpy(colIdNameKV.colName, refColName, sizeof(colIdNameKV.colName));
16✔
1631
            QUERY_CHECK_NULL(taosArrayPush(tbInfo->colMap, &colIdNameKV), code, line, _return, terrno);
32!
1632
          }
1633
          taosMemoryFree(refDbName);
32!
1634
          taosMemoryFree(refTbName);
32!
1635
          taosMemoryFree(refColName);
32!
1636
        }
1637
      }
1638

1639
      pVtbScan->vtbScanParam = NULL;
16✔
1640
      code = buildVtbScanOperatorParam(pInfo, &pVtbScan->vtbScanParam, uid);
16✔
1641
      QUERY_CHECK_CODE(code, line, _return);
16!
1642

1643
      void* pIter = taosHashIterate(pVtbScan->orgTbVgColMap, NULL);
16✔
1644
      while (pIter != NULL) {
32✔
1645
        SOrgTbInfo*      pMap = (SOrgTbInfo*)pIter;
16✔
1646
        SOperatorParam*  pExchangeParam = NULL;
16✔
1647
        code = buildExchangeOperatorParamForVScan(&pExchangeParam, 0, pMap);
16✔
1648
        QUERY_CHECK_CODE(code, line, _return);
16!
1649
        QUERY_CHECK_NULL(taosArrayPush(((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pOpParamArray, &pExchangeParam), code, line, _return, terrno);
32!
1650
        pIter = taosHashIterate(pVtbScan->orgTbVgColMap, pIter);
16✔
1651
      }
1652

1653
      SOperatorParam*  pExchangeParam = NULL;
16✔
1654
      code = buildExchangeOperatorParamForVTagScan(&pExchangeParam, 0, vgId, uid);
16✔
1655
      QUERY_CHECK_CODE(code, line, _return);
16!
1656
      ((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pTagScanOp = pExchangeParam;
16✔
1657

1658
      // reset downstream operator's status
1659
      pOperator->pDownstream[0]->status = OP_NOT_OPENED;
16✔
1660
      code = pOperator->pDownstream[0]->fpSet.getNextExtFn(pOperator->pDownstream[0], pVtbScan->vtbScanParam, pRes);
16✔
1661
      QUERY_CHECK_CODE(code, line, _return);
16!
1662
    }
1663

1664
    if (*pRes) {
24✔
1665
      // has result, still read data from this table.
1666
      pVtbScan->lastTableIdx = pVtbScan->curTableIdx;
8✔
1667
      break;
8✔
1668
    } else {
1669
      // no result, read next table.
1670
      pVtbScan->curTableIdx++;
16✔
1671
      if (pVtbScan->curTableIdx >= taosArrayGetSize(pVtbScan->childTableList)) {
16✔
1672
        setOperatorCompleted(pOperator);
8✔
1673
        break;
8✔
1674
      }
1675
    }
1676
  }
1677

1678
_return:
16✔
1679
  taosHashCleanup(pVtbScan->orgTbVgColMap);
16✔
1680
  pVtbScan->orgTbVgColMap = NULL;
16✔
1681
  if (pOperator->cost.openCost == 0) {
16✔
1682
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
4✔
1683
  }
1684

1685
  if (code) {
16!
1686
    qError("%s failed since %s", __func__, tstrerror(code));
×
1687
    pOperator->pTaskInfo->code = code;
×
1688
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
1689
  }
1690

1691
  return code;
16✔
1692
}
1693

1694
/*
 * Allocate the lookup tables kept in the stable-join "prev" context.
 *
 *   batchFetch == true  : creates leftHash and rightHash
 *                         (hash function for TSDB_DATA_TYPE_INT).
 *   batchFetch == false : creates leftCache, rightCache and onceTable
 *                         (hash function for TSDB_DATA_TYPE_BIGINT).
 *
 * Returns TSDB_CODE_SUCCESS when every table was created, otherwise terrno
 * as soon as one tSimpleHashInit() call yields NULL. Tables created before
 * the failing call stay assigned in pPrev; they are not released here.
 */
int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) {
  if (!batchFetch) {
    // Non-batch mode: two caches plus the "once" table.
    pPrev->leftCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pPrev->leftCache == NULL) {
      return terrno;
    }

    pPrev->rightCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pPrev->rightCache == NULL) {
      return terrno;
    }

    pPrev->onceTable = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pPrev->onceTable == NULL) {
      return terrno;
    }

    return TSDB_CODE_SUCCESS;
  }

  // Batch-fetch mode: one hash per join side.
  pPrev->leftHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  if (pPrev->leftHash == NULL) {
    return terrno;
  }

  pPrev->rightHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  if (pPrev->rightHash == NULL) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
1721

1722
static int32_t initVtbScanInfo(SOperatorInfo* pOperator, SDynQueryCtrlOperatorInfo* pInfo, SMsgCb* pMsgCb,
4✔
1723
                               SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) {
1724
  int32_t      code = TSDB_CODE_SUCCESS;
4✔
1725
  int32_t      line = 0;
4✔
1726

1727
  code = tsem_init(&pInfo->vtbScan.ready, 0, 0);
4✔
1728
  QUERY_CHECK_CODE(code, line, _return);
4!
1729

1730
  pInfo->vtbScan.scanAllCols = pPhyciNode->vtbScan.scanAllCols;
4✔
1731
  pInfo->vtbScan.suid = pPhyciNode->vtbScan.suid;
4✔
1732
  pInfo->vtbScan.epSet = pPhyciNode->vtbScan.mgmtEpSet;
4✔
1733
  pInfo->vtbScan.acctId = pPhyciNode->vtbScan.accountId;
4✔
1734
  pInfo->vtbScan.pMsgCb = pMsgCb;
4✔
1735
  pInfo->vtbScan.curTableIdx = 0;
4✔
1736
  pInfo->vtbScan.lastTableIdx = -1;
4✔
1737
  pInfo->vtbScan.dynTbUid = 0;
4✔
1738
  pInfo->vtbScan.dbName = taosStrdup(pPhyciNode->vtbScan.dbName);
4!
1739
  pInfo->vtbScan.stbName = taosStrdup(pPhyciNode->vtbScan.stbName);
4!
1740
  QUERY_CHECK_NULL(pInfo->vtbScan.dbName, code, line, _return, terrno);
4!
1741
  QUERY_CHECK_NULL(pInfo->vtbScan.stbName, code, line, _return, terrno);
4!
1742

1743
  if (pPhyciNode->dynTbname) {
4!
1744
    SArray* vals = pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamPartColVals;
×
1745
    for (int32_t i = 0; i < taosArrayGetSize(vals); ++i) {
×
1746
      SStreamGroupValue* pValue = taosArrayGet(vals, i);
×
1747
      if (pValue != NULL && pValue->isTbname) {
×
1748
        pInfo->vtbScan.dynTbUid = pValue->uid;
×
1749
        break;
×
1750
      }
1751
    }
1752
  }
1753

1754
  pInfo->vtbScan.readColList = taosArrayInit(LIST_LENGTH(pPhyciNode->vtbScan.pScanCols), sizeof(col_id_t));
4!
1755
  QUERY_CHECK_NULL(pInfo->vtbScan.readColList, code, line, _return, terrno);
4!
1756

1757
  for (int32_t i = 0; i < LIST_LENGTH(pPhyciNode->vtbScan.pScanCols); ++i) {
20!
1758
    SColumnNode* pNode = (SColumnNode*)nodesListGetNode(pPhyciNode->vtbScan.pScanCols, i);
16✔
1759
    QUERY_CHECK_NULL(pNode, code, line, _return, terrno);
16!
1760
    QUERY_CHECK_NULL(taosArrayPush(pInfo->vtbScan.readColList, &pNode->colId), code, line, _return, terrno);
32!
1761
  }
1762

1763
  pInfo->vtbScan.childTableList = taosArrayInit(10, POINTER_BYTES);
4✔
1764
  QUERY_CHECK_NULL(pInfo->vtbScan.childTableList, code, line, _return, terrno);
4!
1765

1766
  pInfo->vtbScan.dbVgInfoMap = taosHashInit(taosArrayGetSize(pInfo->vtbScan.childTableList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
4✔
1767
  QUERY_CHECK_NULL(pInfo->vtbScan.dbVgInfoMap, code, line, _return, terrno);
4!
1768

1769
  return code;
4✔
1770
_return:
×
1771
  // no need to destroy array and hashmap allocated in this function,
1772
  // since the operator's destroy function will take care of it
1773
  qError("%s failed at line %d since %s", __func__, line, tstrerror(code));
×
1774
  return code;
×
1775
}
1776

1777
/*
 * Reset-state callback of the dynamic query control operator: puts the
 * operator back to OP_NOT_OPENED and discards per-execution state so it can
 * be run again.
 *
 *   DYN_QTYPE_STB_HASH : zeroes the exec statistics, releases the stable-join
 *                        control info, re-creates the "prev" lookup tables
 *                        and clears the prev/post join contexts.
 *   DYN_QTYPE_VTB_SCAN : releases orgTbVgColMap and the cached pRsp (if any)
 *                        and rewinds curTableIdx / lastTableIdx.
 *   anything else      : logs an error and leaves the state untouched.
 *
 * Returns 0 (TSDB_CODE_SUCCESS), or the error from initSeqStbJoinTableHash().
 */
static int32_t resetDynQueryCtrlOperState(SOperatorInfo* pOper) {
  SDynQueryCtrlOperatorInfo* pDyn = pOper->info;
  pOper->status = OP_NOT_OPENED;

  switch (pDyn->qType) {
    case DYN_QTYPE_STB_HASH: {
      SStbJoinDynCtrlInfo* pStbJoin = &pDyn->stbJoin;

      pStbJoin->execInfo = (SDynQueryCtrlExecInfo){0};
      destroyStbJoinDynCtrlInfo(pStbJoin);

      // Forget every handle owned by the prev context BEFORE the fallible
      // re-initialisation below. Previously these were only cleared after a
      // successful init, so an allocation failure returned with pointers to
      // already-released objects still stored in the context, which a later
      // operator destroy could release a second time.
      // NOTE(review): assumes destroyStbJoinDynCtrlInfo() releases the lookup
      // tables without clearing the fields; the original code overwrote them
      // unconditionally right after the destroy call.
      pStbJoin->ctx.prev.leftHash = NULL;
      pStbJoin->ctx.prev.rightHash = NULL;
      pStbJoin->ctx.prev.leftCache = NULL;
      pStbJoin->ctx.prev.rightCache = NULL;
      pStbJoin->ctx.prev.onceTable = NULL;
      pStbJoin->ctx.prev.pListHead = NULL;
      pStbJoin->ctx.prev.pListTail = NULL;
      pStbJoin->ctx.prev.joinBuild = false;
      pStbJoin->ctx.prev.tableNum = 0;
      pStbJoin->ctx.post = (SStbJoinPostJoinCtx){0};

      int32_t code = initSeqStbJoinTableHash(&pStbJoin->ctx.prev, pStbJoin->basic.batchFetch);
      if (TSDB_CODE_SUCCESS != code) {
        qError("initSeqStbJoinTableHash failed since %s", tstrerror(code));
        return code;
      }
      break;
    }
    case DYN_QTYPE_VTB_SCAN: {
      SVtbScanDynCtrlInfo* pVtbScan = &pDyn->vtbScan;

      if (pVtbScan->orgTbVgColMap) {
        // Make sure the entries are released through destroyOrgTbInfo.
        taosHashSetFreeFp(pVtbScan->orgTbVgColMap, destroyOrgTbInfo);
        taosHashCleanup(pVtbScan->orgTbVgColMap);
        pVtbScan->orgTbVgColMap = NULL;
      }
      if (pVtbScan->pRsp) {
        tFreeSUsedbRsp(pVtbScan->pRsp);
        taosMemoryFreeClear(pVtbScan->pRsp);
      }

      // Rewind the child-table cursor to the values used at creation time.
      pVtbScan->curTableIdx = 0;
      pVtbScan->lastTableIdx = -1;
      break;
    }
    default:
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
      break;
  }
  return 0;
}
1823

1824
int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
6,283✔
1825
                                       SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo,
1826
                                       SMsgCb* pMsgCb, SOperatorInfo** pOptrInfo) {
1827
  QRY_PARAM_CHECK(pOptrInfo);
6,283!
1828

1829
  int32_t                    code = TSDB_CODE_SUCCESS;
6,283✔
1830
  int32_t                    line = 0;
6,283✔
1831
  __optr_fn_t                nextFp = NULL;
6,283✔
1832
  SOperatorInfo*             pOperator = NULL;
6,283✔
1833
  SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo));
6,283!
1834
  if (pInfo == NULL) {
6,283!
1835
    code = terrno;
×
1836
    goto _error;
×
1837
  }
1838

1839
  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
6,283!
1840
  if (pOperator == NULL) {
6,283!
1841
    code = terrno;
×
1842
    goto _error;
×
1843
  }
1844

1845
  pTaskInfo->dynamicTask = (int8_t)pPhyciNode->node.dynamicOp;
6,283✔
1846

1847
  code = appendDownstream(pOperator, pDownstream, numOfDownstream);
6,283✔
1848
  if (TSDB_CODE_SUCCESS != code) {
6,283!
1849
    goto _error;
×
1850
  }
1851

1852
  setOperatorInfo(pOperator, "DynQueryCtrlOperator", QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, false, OP_NOT_OPENED,
6,283✔
1853
                  pInfo, pTaskInfo);
1854

1855
  pInfo->qType = pPhyciNode->qType;
6,283✔
1856
  switch (pInfo->qType) {
6,283!
1857
    case DYN_QTYPE_STB_HASH:
6,279✔
1858
      TAOS_MEMCPY(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
6,279✔
1859
      pInfo->stbJoin.pOutputDataBlockDesc = pPhyciNode->node.pOutputDataBlockDesc;
6,279✔
1860
      code = initSeqStbJoinTableHash(&pInfo->stbJoin.ctx.prev, pInfo->stbJoin.basic.batchFetch);
6,279✔
1861
      if (TSDB_CODE_SUCCESS != code) {
6,279!
1862
        goto _error;
×
1863
      }
1864
      nextFp = seqStableJoin;
6,279✔
1865
      break;
6,279✔
1866
    case DYN_QTYPE_VTB_SCAN:
4✔
1867
      code = initVtbScanInfo(pOperator, pInfo, pMsgCb, pPhyciNode, pTaskInfo);
4✔
1868
      QUERY_CHECK_CODE(code, line, _error);
4!
1869
      nextFp = vtbScan;
4✔
1870
      break;
4✔
1871
    default:
×
1872
      qError("unsupported dynamic query ctrl type: %d", pInfo->qType);
×
1873
      code = TSDB_CODE_INVALID_PARA;
×
1874
      goto _error;
×
1875
  }
1876

1877
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, nextFp, NULL, destroyDynQueryCtrlOperator, optrDefaultBufFn,
6,283✔
1878
                                         NULL, optrDefaultGetNextExtFn, NULL);
1879

1880
  setOperatorResetStateFn(pOperator, resetDynQueryCtrlOperState);
6,283✔
1881
  *pOptrInfo = pOperator;
6,283✔
1882
  return TSDB_CODE_SUCCESS;
6,283✔
1883

1884
_error:
×
1885
  if (pInfo != NULL) {
×
1886
    destroyDynQueryCtrlOperator(pInfo);
×
1887
  }
1888

1889
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
1890
  pTaskInfo->code = code;
×
1891
  return code;
×
1892
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc