• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

36.09
/source/libs/executor/src/dynqueryctrloperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "nodes.h"
19
#include "operator.h"
20
#include "os.h"
21
#include "plannodes.h"
22
#include "query.h"
23
#include "querynodes.h"
24
#include "querytask.h"
25
#include "tarray.h"
26
#include "tcompare.h"
27
#include "tdatablock.h"
28
#include "thash.h"
29
#include "tmsg.h"
30
#include "trpc.h"
31
#include "ttypes.h"
32
#include "dynqueryctrl.h"
33

34
// Process-wide counter used to stamp group-cache operator params with a session id;
// it is only advanced through atomic_add_fetch_64() in buildGroupCacheOperatorParam().
int64_t gSessionId = 0;
35

36
void freeVgTableList(void* ptr) { 
×
37
  taosArrayDestroy(*(SArray**)ptr); 
×
38
}
×
39

40
// Releases every node of the singly linked stb-join table list starting at `pListHead`,
// including the four per-node vgId/uid arrays. A NULL head is a no-op.
static void destroyStbJoinTableList(SStbJoinTableList* pListHead) {
  SStbJoinTableList* pCurr = pListHead;

  while (NULL != pCurr) {
    // Remember the successor before the node itself is released.
    SStbJoinTableList* pFollowing = pCurr->pNext;

    taosMemoryFree(pCurr->pLeftVg);
    taosMemoryFree(pCurr->pLeftUid);
    taosMemoryFree(pCurr->pRightVg);
    taosMemoryFree(pCurr->pRightUid);
    taosMemoryFree(pCurr);

    pCurr = pFollowing;
  }
}
11,035✔
53

54
// Tears down the stb-join state of a dynamic query control operator: logs the execution
// counters, releases the hash tables that belong to the active fetch mode, then frees the
// pending table list.
static void destroyStbJoinDynCtrlInfo(SStbJoinDynCtrlInfo* pStbJoin) {
  SStbJoinPrevJoinCtx* pPrevCtx = &pStbJoin->ctx.prev;

  qDebug("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64 ", leftCacheNum:%" PRId64 ", rightCacheNum:%" PRId64, 
         pStbJoin->execInfo.prevBlkNum, pStbJoin->execInfo.prevBlkRows, pStbJoin->execInfo.postBlkNum, 
         pStbJoin->execInfo.postBlkRows, pStbJoin->execInfo.leftCacheNum, pStbJoin->execInfo.rightCacheNum);

  if (!pStbJoin->basic.batchFetch) {
    // Sequential mode: only the per-table cache bookkeeping tables are cleaned up.
    if (pPrevCtx->leftCache) {
      tSimpleHashCleanup(pPrevCtx->leftCache);
    }
    if (pPrevCtx->rightCache) {
      tSimpleHashCleanup(pPrevCtx->rightCache);
    }
    if (pPrevCtx->onceTable) {
      tSimpleHashCleanup(pPrevCtx->onceTable);
    }
  } else {
    // Batch mode: the vgroup hashes hold SArray* values, so install the free callback
    // before cleaning each table up.
    if (pPrevCtx->leftHash) {
      tSimpleHashSetFreeFp(pPrevCtx->leftHash, freeVgTableList);
      tSimpleHashCleanup(pPrevCtx->leftHash);
    }
    if (pPrevCtx->rightHash) {
      tSimpleHashSetFreeFp(pPrevCtx->rightHash, freeVgTableList);
      tSimpleHashCleanup(pPrevCtx->rightHash);
    }
  }

  destroyStbJoinTableList(pPrevCtx->pListHead);
}
11,035✔
82

UNCOV
83
void freeUseDbOutput(void* pOutput) {
×
UNCOV
84
  SUseDbOutput *pOut = *(SUseDbOutput**)pOutput;
×
UNCOV
85
  if (NULL == pOutput) {
×
UNCOV
86
    return;
×
87
  }
88

UNCOV
89
  if (pOut->dbVgroup) {
×
90
    freeVgInfo(pOut->dbVgroup);
×
91
  }
92
  taosMemFree(pOut);
×
93
}
94

UNCOV
95
// Releases the resources referenced by the virtual-table-scan control info:
// the child table list, the read column list, the db vgroup-info map and the
// cached use-db response. Members that are NULL are skipped.
static void destroyVtbScanDynCtrlInfo(SVtbScanDynCtrlInfo* pVtbScan) {
  if (NULL != pVtbScan->childTableList) {
    taosArrayDestroy(pVtbScan->childTableList);
  }

  if (NULL != pVtbScan->readColList) {
    taosArrayDestroy(pVtbScan->readColList);
  }

  if (NULL != pVtbScan->dbVgInfoMap) {
    // Values of the map are released through freeUseDbOutput during cleanup.
    taosHashSetFreeFp(pVtbScan->dbVgInfoMap, freeUseDbOutput);
    taosHashCleanup(pVtbScan->dbVgInfoMap);
  }

  if (NULL != pVtbScan->pRsp) {
    tFreeSUsedbRsp(pVtbScan->pRsp);
    taosMemoryFreeClear(pVtbScan->pRsp);
  }
}
×
111

112
static void destroyDynQueryCtrlOperator(void* param) {
11,035✔
113
  SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param;
11,035✔
114

115
  switch (pDyn->qType) {
11,035!
116
    case DYN_QTYPE_STB_HASH:
11,035✔
117
      destroyStbJoinDynCtrlInfo(&pDyn->stbJoin);
11,035✔
118
      break;
11,035✔
UNCOV
119
    case DYN_QTYPE_VTB_SCAN:
×
UNCOV
120
      destroyVtbScanDynCtrlInfo(&pDyn->vtbScan);
×
UNCOV
121
      break;
×
UNCOV
122
    default:
×
UNCOV
123
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
×
UNCOV
124
      break;
×
125
  }
126

127
  taosMemoryFreeClear(param);
11,035!
128
}
11,035✔
129

130
// Decides whether the data of a table must be cached.
//  - batch fetch: always true;
//  - right table: true when the current right uid equals the next right uid;
//  - left table : true when `uid` has an entry in pPrev->leftCache.
static FORCE_INLINE bool tableNeedCache(int64_t uid, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, bool rightTable, bool batchFetch) {
  if (batchFetch) {
    return true;
  }

  if (rightTable) {
    return pPost->rightCurrUid == pPost->rightNextUid;
  }

  uint32_t* pRefNum = tSimpleHashGet(pPrev->leftCache, &uid, sizeof(uid));
  return NULL != pRefNum;
}
143

144
// Loads the table pair at the head node's read cursor into the post-join context,
// determines the right-table uid that follows it (0 when the list is exhausted) and
// refreshes the left/right cache flags. In non-batch mode a right table that needs
// caching and differs from the previous right table is recorded in rightCache.
// Returns TSDB_CODE_SUCCESS, or leaves through QRY_ERR_RET when tSimpleHashPut fails.
static int32_t updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo* pStbJoin) {
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
  SStbJoinTableList*   pCursor = pPrev->pListHead;
  int64_t*             pLeftUid = pCursor->pLeftUid + pCursor->readIdx;
  int64_t*             pRightUid = pCursor->pRightUid + pCursor->readIdx;
  int64_t              lastRightUid = pPost->rightCurrUid;  // captured before it is overwritten below
  int64_t              nextIdx = pCursor->readIdx + 1;

  pPost->leftVgId = *(pCursor->pLeftVg + pCursor->readIdx);
  pPost->rightVgId = *(pCursor->pRightVg + pCursor->readIdx);
  pPost->leftCurrUid = *pLeftUid;
  pPost->rightCurrUid = *pRightUid;

  // Look ahead for the next right uid, moving on to following list nodes when the
  // current node has no further entries.
  for (;;) {
    if (nextIdx < pCursor->uidNum) {
      pPost->rightNextUid = pCursor->pRightUid[nextIdx];
      break;
    }

    pCursor = pCursor->pNext;
    if (NULL == pCursor) {
      pPost->rightNextUid = 0;
      break;
    }

    pRightUid = pCursor->pRightUid;
    nextIdx = 0;
  }

  pPost->leftNeedCache = tableNeedCache(*pLeftUid, pPrev, pPost, false, pStbJoin->basic.batchFetch);
  pPost->rightNeedCache = tableNeedCache(*pRightUid, pPrev, pPost, true, pStbJoin->basic.batchFetch);

  if (pStbJoin->basic.batchFetch || !pPost->rightNeedCache || lastRightUid == pPost->rightCurrUid) {
    return TSDB_CODE_SUCCESS;
  }

  QRY_ERR_RET(tSimpleHashPut(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid), NULL, 0));
  pStbJoin->execInfo.rightCacheNum++;

  return TSDB_CODE_SUCCESS;
}
187

188

189
// Builds a GET param for the group-cache operator.
//   ppRes         - receives the new param; set to NULL on every failure after allocation.
//   downstreamIdx - downstream index stored in both the param and its value.
//   vgId/tbUid    - identify the table whose data is requested.
//   needCache     - stored in the value for the group-cache operator.
//   pChild        - optional child param. Ownership is taken: it is attached to
//                   pChildren on success and freed on every failure path.
// Returns TSDB_CODE_SUCCESS or the terrno captured at the failing allocation.
static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, bool needCache, SOperatorParam* pChild) {
  int32_t code = TSDB_CODE_SUCCESS;
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    code = terrno;
    freeOperatorParam(pChild, OP_GET_PARAM);
    return code;
  }

  // Fully initialise the param before anything else can fail, so that the error paths
  // below never hand a partially initialised object to freeOperatorParam().
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = NULL;
  (*ppRes)->reUse = false;
  (*ppRes)->pChildren = NULL;

  if (pChild) {
    (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
    if (NULL == (*ppRes)->pChildren) {
      code = terrno;
      freeOperatorParam(pChild, OP_GET_PARAM);
      freeOperatorParam(*ppRes, OP_GET_PARAM);
      *ppRes = NULL;
      return code;
    }
    if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild)) {
      // The child was not attached, so it has to be released separately.
      code = terrno;
      freeOperatorParam(pChild, OP_GET_PARAM);
      freeOperatorParam(*ppRes, OP_GET_PARAM);
      *ppRes = NULL;
      return code;
    }
  }

  SGcOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcOperatorParam));
  if (NULL == pGc) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_GET_PARAM);
    *ppRes = NULL;
    return code;
  }

  pGc->sessionId = atomic_add_fetch_64(&gSessionId, 1);
  pGc->downstreamIdx = downstreamIdx;
  pGc->vgId = vgId;
  pGc->tbUid = tbUid;
  pGc->needCache = needCache;

  (*ppRes)->value = pGc;

  return TSDB_CODE_SUCCESS;
}
238

239

UNCOV
240
// Builds a NOTIFY param for the group-cache operator that identifies one table
// (vgId/tbUid) on the given downstream.
//   ppRes - receives the new param; NULL on failure.
// Returns TSDB_CODE_SUCCESS or the terrno captured at the failing allocation.
static int32_t buildGroupCacheNotifyOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid) {
  int32_t code = TSDB_CODE_SUCCESS;
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }

  // Initialise every field up front so the error path below frees a well-formed param.
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = NULL;
  (*ppRes)->reUse = false;
  (*ppRes)->pChildren = NULL;

  SGcNotifyOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcNotifyOperatorParam));
  if (NULL == pGc) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    *ppRes = NULL;  // do not leave the caller with a dangling pointer
    return code;
  }

  pGc->downstreamIdx = downstreamIdx;
  pGc->vgId = vgId;
  pGc->tbUid = tbUid;

  (*ppRes)->value = pGc;

  return TSDB_CODE_SUCCESS;
}
266

267

268
// Builds a single-table GET param for the exchange operator.
//   ppRes         - receives the new param; NULL on failure.
//   downstreamIdx - downstream index stored in the param.
//   pVgId         - vgroup id of the table (read once).
//   pUid          - uid of the table; copied into a one-element uid list.
// Returns TSDB_CODE_SUCCESS or the terrno captured at the failing allocation.
// On failure everything allocated here is released, so the caller never receives a
// half-built param.
static int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, const int32_t* pVgId, int64_t* pUid) {
  int32_t                 code = TSDB_CODE_SUCCESS;
  SExchangeOperatorParam* pExc = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }
  (*ppRes)->pChildren = NULL;

  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  if (NULL == pExc) {
    code = terrno;
    goto _error;
  }

  pExc->multiParams = false;
  pExc->basic.vgId = *pVgId;
  pExc->basic.tableSeq = true;
  pExc->basic.isVtbRefScan = false;
  pExc->basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
  pExc->basic.colMap = NULL;
  pExc->basic.uidList = taosArrayInit(1, sizeof(int64_t));
  if (NULL == pExc->basic.uidList) {
    code = terrno;
    goto _error;
  }
  if (NULL == taosArrayPush(pExc->basic.uidList, pUid)) {
    // Capture the error before the cleanup calls get a chance to touch terrno.
    code = terrno;
    taosArrayDestroy(pExc->basic.uidList);
    goto _error;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;

_error:
  taosMemoryFreeClear(pExc);
  taosMemoryFreeClear(*ppRes);
  return code;
}
304

305
// Builds a batch GET param for the exchange operator from a vgId -> SArray*(uid list) hash.
//   ppRes         - receives the new param; NULL on failure.
//   downstreamIdx - downstream index stored in the param.
//   pVg           - source hash. For each entry copied into the batch, the uid list is
//                   handed over to the batch and the slot in pVg is reset to NULL.
// Returns TSDB_CODE_SUCCESS or an error code. On failure the batch built so far
// (including uid lists already handed over) and the param itself are released.
static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
  int32_t code = TSDB_CODE_SUCCESS;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }
  (*ppRes)->pChildren = NULL;
  
  SExchangeOperatorBatchParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorBatchParam));
  if (NULL == pExc) {
    code = terrno;  // read before any free call
    taosMemoryFreeClear(*ppRes);
    return code;
  }

  pExc->multiParams = true;
  pExc->pBatchs = tSimpleHashInit(tSimpleHashGetSize(pVg), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  if (NULL == pExc->pBatchs) {
    code = terrno;
    taosMemoryFree(pExc);
    taosMemoryFreeClear(*ppRes);
    return code;
  }
  tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);
  
  // Zero-initialised so that no indeterminate bytes are copied into the batch hash.
  SExchangeOperatorBasicParam basic = {0};
  basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;

  int32_t iter = 0;
  void* p = NULL;
  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
    int32_t* pVgId = tSimpleHashGetKey(p, NULL);
    SArray* pUidList = *(SArray**)p;
    basic.vgId = *pVgId;
    basic.uidList = pUidList;
    basic.colMap = NULL;
    basic.tableSeq = false;
    basic.isVtbRefScan = false;
    
    code = tSimpleHashPut(pExc->pBatchs, pVgId, sizeof(*pVgId), &basic, sizeof(basic));
    if (code) {
      // The failing entry is still owned by pVg; entries already moved are released
      // together with the batch hash through its free callback.
      tSimpleHashCleanup(pExc->pBatchs);
      taosMemoryFree(pExc);
      taosMemoryFreeClear(*ppRes);
      QRY_ERR_RET(code);
    }

    qTrace("build downstreamIdx %d batch scan, vgId:%d, uidNum:%" PRId64, downstreamIdx, *pVgId, (int64_t)taosArrayGetSize(pUidList));
    *(SArray**)p = NULL;  // ownership of the uid list moved into the batch
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = false;

  return TSDB_CODE_SUCCESS;
}
354

UNCOV
355
// Builds a reusable GET param for the exchange operator used by a virtual-table
// reference scan of one origin table.
//   ppRes         - receives the new param; NULL on failure.
//   downstreamIdx - downstream index stored in the param.
//   pMap          - origin table info (vgId, table name, column map); it is copied,
//                   the caller keeps ownership of pMap.
// The uid list is created empty. Returns TSDB_CODE_SUCCESS or the error code set by
// QUERY_CHECK_NULL; on failure everything allocated here is released.
static int32_t buildExchangeOperatorParamForVScan(SOperatorParam** ppRes, int32_t downstreamIdx, SOrgTbInfo* pMap) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      lino = 0;
  SExchangeOperatorParam*      pExc = NULL;
  SExchangeOperatorBasicParam* basic = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(*ppRes, code, lino, _return, terrno);
  (*ppRes)->pChildren = NULL;

  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  QUERY_CHECK_NULL(pExc, code, lino, _return, terrno);

  pExc->multiParams = false;

  basic = &pExc->basic;
  // The memory comes from malloc: clear the owned pointers first so that the cleanup
  // code below never inspects indeterminate values.
  basic->colMap = NULL;
  basic->uidList = NULL;
  basic->srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;

  basic->vgId = pMap->vgId;
  basic->tableSeq = false;
  basic->isVtbRefScan = true;
  basic->colMap = taosMemoryMalloc(sizeof(SOrgTbInfo));
  QUERY_CHECK_NULL(basic->colMap, code, lino, _return, terrno);
  basic->colMap->vgId = pMap->vgId;
  tstrncpy(basic->colMap->tbName, pMap->tbName, TSDB_TABLE_FNAME_LEN);
  basic->colMap->colMap = taosArrayDup(pMap->colMap, NULL);
  QUERY_CHECK_NULL(basic->colMap->colMap, code, lino, _return, terrno);

  basic->uidList = taosArrayInit(1, sizeof(int64_t));
  QUERY_CHECK_NULL(basic->uidList, code, lino, _return, terrno);

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;
  (*ppRes)->reUse = true;

  return TSDB_CODE_SUCCESS;

_return:
  qError("failed to build exchange operator param for vscan, code:%d", code);
  taosMemoryFreeClear(*ppRes);
  if (basic) {
    if (basic->colMap) {
      taosArrayDestroy(basic->colMap->colMap);
      taosMemoryFreeClear(basic->colMap);
    }
    if (basic->uidList) {
      taosArrayDestroy(basic->uidList);
      basic->uidList = NULL;
    }
    // `basic` points into pExc and must not be freed on its own; it goes away with pExc.
  }
  taosMemoryFreeClear(pExc);
  return code;
}
409

410
// Builds a GET param for the merge-join operator with two child params.
//   ppRes     - receives the new param; NULL on failure.
//   initParam - stored as initDownstream in the join value.
//   ppChild0/ppChild1 - child params. Each one is moved into pChildren and the caller's
//               pointer is reset to NULL once the move succeeded; children that were not
//               moved remain owned by the caller.
// Returns TSDB_CODE_SUCCESS or the terrno captured at the failing allocation.
static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initParam, SOperatorParam** ppChild0, SOperatorParam** ppChild1) {
  int32_t code = TSDB_CODE_SUCCESS;
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    code = terrno;
    return code;
  }

  // Initialise the header fields before anything else can fail, so the error paths
  // below pass a well-formed param to freeOperatorParam().
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
  (*ppRes)->value = NULL;
  (*ppRes)->reUse = false;

  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
  if (NULL == (*ppRes)->pChildren) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_GET_PARAM);
    *ppRes = NULL;
    return code;
  }
  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild0)) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_GET_PARAM);
    *ppRes = NULL;
    return code;
  }
  *ppChild0 = NULL;  // now owned by *ppRes
  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild1)) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_GET_PARAM);
    *ppRes = NULL;
    return code;
  }
  *ppChild1 = NULL;  // now owned by *ppRes
  
  SSortMergeJoinOperatorParam* pJoin = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam));
  if (NULL == pJoin) {
    code = terrno;
    freeOperatorParam(*ppRes, OP_GET_PARAM);
    *ppRes = NULL;
    return code;
  }

  pJoin->initDownstream = initParam;
  
  (*ppRes)->value = pJoin;

  return TSDB_CODE_SUCCESS;
}
455

UNCOV
456
// Builds a NOTIFY param for the merge-join operator carrying up to two child params.
//   ppRes   - receives the new param; NULL on failure.
//   pChild0/pChild1 - optional children (NULL entries are skipped). Ownership is taken:
//             they are attached on success and freed on every failure path.
// Returns TSDB_CODE_SUCCESS or the terrno captured at the failing allocation.
static int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) {
  int32_t code = TSDB_CODE_SUCCESS;
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    code = terrno;
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    return code;
  }

  // Initialise the header fields first so error paths free a well-formed param.
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
  (*ppRes)->value = NULL;
  (*ppRes)->reUse = false;

  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
  // Check the array that was just created, not the param that was already validated.
  if (NULL == (*ppRes)->pChildren) {
    code = terrno;
    taosMemoryFreeClear(*ppRes);
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    return code;
  }
  if (pChild0 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) {
    // Neither child is attached yet, so both are released separately.
    code = terrno;
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    *ppRes = NULL;
    return code;
  }
  if (pChild1 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) {
    // pChild0 (if any) is already attached to *ppRes; only pChild1 is released separately.
    code = terrno;
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
    *ppRes = NULL;
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
495

496
// Builds a table-scan GET param from a vgId -> SArray*(uid list) hash that must hold
// exactly one vgroup. After the param is built the uid list is destroyed and its slot
// in `pVg` is reset to NULL. `downstreamIdx` is not used here.
// Returns TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR for any other vgroup count, or the
// error reported by buildTableScanOperatorParam.
static int32_t buildBatchTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
  int32_t vgNum = tSimpleHashGetSize(pVg);
  if (1 != vgNum) {
    qError("Invalid vgroup num %d to build table scan operator param", vgNum);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  int32_t iter = 0;
  for (void* pSlot = tSimpleHashIterate(pVg, NULL, &iter); NULL != pSlot; pSlot = tSimpleHashIterate(pVg, pSlot, &iter)) {
    int32_t* pVgId = tSimpleHashGetKey(pSlot, NULL);
    SArray*  pTbUids = *(SArray**)pSlot;

    int32_t code = buildTableScanOperatorParam(ppRes, pTbUids, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false);
    if (code) {
      return code;
    }

    taosArrayDestroy(pTbUids);
    *(SArray**)pSlot = NULL;
  }

  return TSDB_CODE_SUCCESS;
}
520

521

UNCOV
522
// Builds a table-scan GET param for a single table uid.
//   ppRes - receives the param built by buildTableScanOperatorParam.
//   pUid  - uid of the table; copied into a temporary one-element list.
// `downstreamIdx` and `pVgId` are not used here.
// The temporary uid list is always destroyed before returning.
// Returns TSDB_CODE_SUCCESS, the terrno of a failed allocation, or the error reported
// by buildTableScanOperatorParam.
static int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) {
  SArray* pUidList = taosArrayInit(1, sizeof(int64_t));
  if (NULL == pUidList) {
    return terrno;
  }
  if (NULL == taosArrayPush(pUidList, pUid)) {
    // Capture the error first, then release the list that would otherwise leak.
    int32_t pushCode = terrno;
    taosArrayDestroy(pUidList);
    return pushCode;
  }

  int32_t code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, true);
  taosArrayDestroy(pUidList);
  if (code) {
    return code;
  }
  
  return TSDB_CODE_SUCCESS;
}
539

540
// Builds the complete param tree for the next left/right table pair of a sequential
// stb join: source params (table scan or exchange, batch or single) -> group-cache
// params -> merge-join param returned through `ppParam`.
// In batch mode the source params are only built while the vgroup hashes still exist;
// the hashes are released once both source params were built.
// On failure every intermediate param still held here is freed and *ppParam is reset.
static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, SOperatorParam** ppParam) {
  SStbJoinTableList* pHead = pPrev->pListHead;
  int64_t            idx = pHead->readIdx;
  int32_t*           pLeftVg = pHead->pLeftVg + idx;
  int64_t*           pLeftUid = pHead->pLeftUid + idx;
  int32_t*           pRightVg = pHead->pRightVg + idx;
  int64_t*           pRightUid = pHead->pRightUid + idx;
  SOperatorParam*    pLeftSrc = NULL;
  SOperatorParam*    pRightSrc = NULL;
  SOperatorParam*    pLeftGc = NULL;
  SOperatorParam*    pRightGc = NULL;
  int32_t            code = TSDB_CODE_SUCCESS;

  qDebug("start %" PRId64 ":%" PRId64 "th stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64, 
      idx, pPrev->tableNum, *pLeftVg, *pLeftUid, *pRightVg, *pRightUid);

  QRY_ERR_RET(updatePostJoinCurrTableInfo(&pInfo->stbJoin));

  if (!pInfo->stbJoin.basic.batchFetch) {
    // Single-table mode: one source param per side for the current pair.
    if (pInfo->stbJoin.basic.srcScan[0]) {
      code = buildSingleTableScanOperatorParam(&pLeftSrc, 0, pLeftVg, pLeftUid);
    } else {
      code = buildExchangeOperatorParam(&pLeftSrc, 0, pLeftVg, pLeftUid);
    }
    if (TSDB_CODE_SUCCESS == code) {
      if (pInfo->stbJoin.basic.srcScan[1]) {
        code = buildSingleTableScanOperatorParam(&pRightSrc, 1, pRightVg, pRightUid);
      } else {
        code = buildExchangeOperatorParam(&pRightSrc, 1, pRightVg, pRightUid);
      }
    }
  } else if (pPrev->leftHash) {
    // Batch mode: source params cover all tables collected in the vgroup hashes.
    if (pInfo->stbJoin.basic.srcScan[0]) {
      code = buildBatchTableScanOperatorParam(&pLeftSrc, 0, pPrev->leftHash);
    } else {
      code = buildBatchExchangeOperatorParam(&pLeftSrc, 0, pPrev->leftHash);
    }
    if (TSDB_CODE_SUCCESS == code) {
      if (pInfo->stbJoin.basic.srcScan[1]) {
        code = buildBatchTableScanOperatorParam(&pRightSrc, 1, pPrev->rightHash);
      } else {
        code = buildBatchExchangeOperatorParam(&pRightSrc, 1, pPrev->rightHash);
      }
    }
    if (TSDB_CODE_SUCCESS == code) {
      tSimpleHashCleanup(pPrev->leftHash);
      tSimpleHashCleanup(pPrev->rightHash);
      pPrev->leftHash = NULL;
      pPrev->rightHash = NULL;
    }
  }

  bool initParam = (NULL != pLeftSrc);

  if (TSDB_CODE_SUCCESS == code) {
    // buildGroupCacheOperatorParam consumes the source param on success and on failure.
    code = buildGroupCacheOperatorParam(&pLeftGc, 0, *pLeftVg, *pLeftUid, pPost->leftNeedCache, pLeftSrc);
    pLeftSrc = NULL;
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = buildGroupCacheOperatorParam(&pRightGc, 1, *pRightVg, *pRightUid, pPost->rightNeedCache, pRightSrc);
    pRightSrc = NULL;
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = buildMergeJoinOperatorParam(ppParam, initParam, &pLeftGc, &pRightGc);
  }

  if (TSDB_CODE_SUCCESS == code) {
    return code;
  }

  // Failure: release whatever is still owned by this function.
  if (pLeftSrc) {
    freeOperatorParam(pLeftSrc, OP_GET_PARAM);
  }
  if (pRightSrc) {
    freeOperatorParam(pRightSrc, OP_GET_PARAM);
  }
  if (pLeftGc) {
    freeOperatorParam(pLeftGc, OP_GET_PARAM);
  }
  if (pRightGc) {
    freeOperatorParam(pRightGc, OP_GET_PARAM);
  }
  if (*ppParam) {
    freeOperatorParam(*ppParam, OP_GET_PARAM);
    *ppParam = NULL;
  }

  return code;
}
610

611
// Starts the post-join retrieval for the next table pair: builds the param tree and
// asks downstream[1] for a block through getNextExtFn.
// A failure to build the param, or an invalid result block, is stored in the task
// info and raised with T_LONG_JMP. When no block comes back (or getNextExtFn reports
// a non-zero code) only a debug message is written.
static void seqJoinLaunchNewRetrieveImpl(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pCtrl->stbJoin;
  SOperatorParam*            pJoinParam = NULL;

  int32_t code = buildSeqStbJoinOperatorParam(pCtrl, &pStbJoin->ctx.prev, &pStbJoin->ctx.post, &pJoinParam);
  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  qDebug("%s dynamic post task begin", GET_TASKID(pOperator->pTaskInfo));
  code = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pJoinParam, ppRes);
  if (NULL == *ppRes || code != 0) {
    qDebug("%s Empty join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
    return;
  }

  code = blockDataCheck(*ppRes);
  if (code) {
    qError("Invalid block data, blockDataCheck failed, error:%s", tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  // A block was produced: the current pair stays active until it is drained.
  pStbJoin->ctx.post.isStarted = true;
  pStbJoin->execInfo.postBlkNum++;
  pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows;
  qDebug("%s join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
}
21,041✔
640

641

642
// Notifies downstream[1] that the current left or right table is finished with its cache.
// Builds a group-cache NOTIFY param for the table, wraps it in a merge-join NOTIFY param
// and sends it through optrDefaultNotifyFn.
// Returns the first error from param construction, otherwise the notify call's result.
static int32_t notifySeqJoinTableCacheEnd(SOperatorInfo* pOperator, SStbJoinPostJoinCtx* pPost, bool leftTable) {
  SOperatorParam* pCacheNotify = NULL;
  SOperatorParam* pJoinNotify = NULL;
  int32_t         downstreamId = 1;
  int32_t         vgId = pPost->rightVgId;
  int64_t         uid = pPost->rightCurrUid;

  if (leftTable) {
    downstreamId = 0;
    vgId = pPost->leftVgId;
    uid = pPost->leftCurrUid;
  }

  qDebug("notify table %" PRIu64 " in vgId %d downstreamId %d cache end", uid, vgId, downstreamId);

  int32_t code = buildGroupCacheNotifyOperatorParam(&pCacheNotify, downstreamId, vgId, uid);
  if (TSDB_CODE_SUCCESS == code) {
    code = buildMergeJoinNotifyOperatorParam(&pJoinNotify, pCacheNotify, NULL);
  }
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  return optrDefaultNotifyFn(pOperator->pDownstream[1], pJoinNotify);
}
662

663
// Bookkeeping after the current table pair has been fully retrieved.
// Always clears isStarted. In batch mode nothing else happens. Otherwise:
//  - left table with caching: decrement its counter in leftCache; once the counter is
//    used up the entry is removed and a cache-end notification is sent;
//  - right table without caching: if it still has an entry in rightCache, remove it and
//    send a cache-end notification.
// Returns TSDB_CODE_SUCCESS or the first error raised through QRY_ERR_RET.
static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCtrlInfo* pStbJoin) {
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
  int32_t              code = 0;

  pPost->isStarted = false;

  if (pStbJoin->basic.batchFetch) {
    return TSDB_CODE_SUCCESS;
  }

  if (pPost->leftNeedCache) {
    uint32_t* pRemain = tSimpleHashGet(pPrev->leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
    if (pRemain && --(*pRemain) <= 0) {
      code = tSimpleHashRemove(pPrev->leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
      if (code) {
        qError("tSimpleHashRemove leftCurrUid %" PRId64 " from leftCache failed, error:%s", pPost->leftCurrUid, tstrerror(code));
        QRY_ERR_RET(code);
      }
      QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, true));
    }
  }

  if (!pPost->rightNeedCache) {
    void* pCached = tSimpleHashGet(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
    if (NULL != pCached) {
      code = tSimpleHashRemove(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
      if (code) {
        qError("tSimpleHashRemove rightCurrUid %" PRId64 " from rightCache failed, error:%s", pPost->rightCurrUid, tstrerror(code));
        QRY_ERR_RET(code);
      }
      QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, false));
    }
  }

  return TSDB_CODE_SUCCESS;
}
699

700

701
// Continues draining the post-join operator (downstream 1) for the table pair that is
// currently in progress. Does nothing when no pair has been started.
// When downstream is exhausted, the end-of-pair bookkeeping runs and the head node's
// read cursor advances; otherwise the block counters are updated and *ppRes holds the block.
static FORCE_INLINE int32_t seqJoinContinueCurrRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SStbJoinPostJoinCtx*       pPost = &pCtrl->stbJoin.ctx.post;
  SStbJoinPrevJoinCtx*       pPrev = &pCtrl->stbJoin.ctx.prev;

  if (!pPost->isStarted) {
    return TSDB_CODE_SUCCESS;
  }

  qDebug("%s dynQueryCtrl continue to retrieve block from post op", GET_TASKID(pOperator->pTaskInfo));

  *ppRes = getNextBlockFromDownstream(pOperator, 1);
  if (NULL != *ppRes) {
    pCtrl->stbJoin.execInfo.postBlkNum++;
    pCtrl->stbJoin.execInfo.postBlkRows += (*ppRes)->info.rows;
    return TSDB_CODE_SUCCESS;
  }

  QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, &pCtrl->stbJoin));
  pPrev->pListHead->readIdx++;

  return TSDB_CODE_SUCCESS;
}
723

724
/*
 * Append pVal to the array stored in pHash under pKey, creating the array on
 * first use.
 *
 * pHash   - hash whose values are SArray* pointers (stored with POINTER_BYTES)
 * pKey    - key bytes, keySize long
 * pVal    - element to append; valSize is used as the element size when a new
 *           array has to be created
 *
 * Returns TSDB_CODE_SUCCESS, terrno when the array cannot be created or grown,
 * or the code returned by tSimpleHashPut() when the new array cannot be
 * inserted.
 */
static FORCE_INLINE int32_t addToJoinVgroupHash(SSHashObj* pHash, void* pKey, int32_t keySize, void* pVal, int32_t valSize) {
  SArray** ppArray = tSimpleHashGet(pHash, pKey, keySize);
  if (NULL != ppArray) {
    // Key already known: just append to the existing array.
    if (NULL == taosArrayPush(*ppArray, pVal)) {
      return terrno;
    }
    return TSDB_CODE_SUCCESS;
  }

  SArray* pArray = taosArrayInit(10, valSize);
  if (NULL == pArray) {
    return terrno;
  }

  if (NULL == taosArrayPush(pArray, pVal)) {
    // Capture terrno before the cleanup call has a chance to touch it.
    int32_t code = terrno;
    taosArrayDestroy(pArray);
    return code;
  }

  // Report the code tSimpleHashPut() itself returned. Relying on terrno here
  // could hand back a stale value (or success) after the array was destroyed,
  // which would silently drop pVal.
  int32_t code = tSimpleHashPut(pHash, pKey, keySize, &pArray, POINTER_BYTES);
  if (TSDB_CODE_SUCCESS != code) {
    taosArrayDestroy(pArray);
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
748

749
/*
 * Count one more occurrence of pKey in pHash and keep pOnceHash in sync.
 *
 * - First occurrence: the counter is stored as 1 and the key is also recorded
 *   in pOnceHash (with an empty value).
 * - Counter currently 0: left untouched.
 * - Counter currently UINT32_MAX: reset to 0.
 * - Any other value: the counter is incremented; when it was exactly 1 the key
 *   is first removed from pOnceHash.
 *
 * Returns TSDB_CODE_SUCCESS or the code of the failing hash operation.
 */
static FORCE_INLINE int32_t addToJoinTableHash(SSHashObj* pHash, SSHashObj* pOnceHash, void* pKey, int32_t keySize) {
  int32_t   code = TSDB_CODE_SUCCESS;
  uint32_t* pCount = tSimpleHashGet(pHash, pKey, keySize);

  if (NULL == pCount) {
    uint32_t first = 1;
    code = tSimpleHashPut(pHash, pKey, keySize, &first, sizeof(first));
    if (TSDB_CODE_SUCCESS == code) {
      code = tSimpleHashPut(pOnceHash, pKey, keySize, NULL, 0);
    }
    return code ? code : TSDB_CODE_SUCCESS;
  }

  if (0 == *pCount) {
    return TSDB_CODE_SUCCESS;
  }

  if (UINT32_MAX == *pCount) {
    *pCount = 0;
    return TSDB_CODE_SUCCESS;
  }

  if (1 == *pCount) {
    // Second occurrence: the key no longer belongs to the "seen once" set.
    code = tSimpleHashRemove(pOnceHash, pKey, keySize);
    if (code) {
      qError("tSimpleHashRemove failed in addToJoinTableHash, error:%s", tstrerror(code));
      QRY_ERR_RET(code);
    }
  }
  (*pCount)++;

  return TSDB_CODE_SUCCESS;
}
785

786

787
/*
 * Release one table-list node: its four id arrays and the node itself.
 * A NULL node is ignored.
 */
static void freeStbJoinTableList(SStbJoinTableList* pList) {
  if (NULL != pList) {
    taosMemoryFree(pList->pLeftVg);
    taosMemoryFree(pList->pLeftUid);
    taosMemoryFree(pList->pRightVg);
    taosMemoryFree(pList->pRightUid);
    taosMemoryFree(pList);
  }
}
797

798
/*
 * Copy `rows` entries of the four id arrays into a freshly allocated list node
 * and append that node to the tail of pCtx's table list.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when any allocation fails (in which
 * case nothing is linked and all partial allocations are released).
 */
static int32_t appendStbJoinTableList(SStbJoinPrevJoinCtx* pCtx, int64_t rows, int32_t* pLeftVg, int64_t* pLeftUid, int32_t* pRightVg, int64_t* pRightUid) {
  SStbJoinTableList* pNode = taosMemoryCalloc(1, sizeof(SStbJoinTableList));
  if (NULL == pNode) {
    return terrno;
  }

  // Allocate the four arrays in order; `||` stops at the first failure so that
  // terrno still belongs to the allocation that failed.
  if (NULL == (pNode->pLeftVg = taosMemoryMalloc(rows * sizeof(*pLeftVg))) ||
      NULL == (pNode->pLeftUid = taosMemoryMalloc(rows * sizeof(*pLeftUid))) ||
      NULL == (pNode->pRightVg = taosMemoryMalloc(rows * sizeof(*pRightVg))) ||
      NULL == (pNode->pRightUid = taosMemoryMalloc(rows * sizeof(*pRightUid)))) {
    int32_t code = terrno;
    freeStbJoinTableList(pNode);
    return code;
  }

  TAOS_MEMCPY(pNode->pLeftVg, pLeftVg, rows * sizeof(*pLeftVg));
  TAOS_MEMCPY(pNode->pLeftUid, pLeftUid, rows * sizeof(*pLeftUid));
  TAOS_MEMCPY(pNode->pRightVg, pRightVg, rows * sizeof(*pRightVg));
  TAOS_MEMCPY(pNode->pRightUid, pRightUid, rows * sizeof(*pRightUid));

  pNode->readIdx = 0;
  pNode->uidNum = rows;
  pNode->pNext = NULL;

  // Link at the tail; an empty list gets the node as its head as well.
  if (NULL == pCtx->pListTail) {
    pCtx->pListHead = pNode;
  } else {
    pCtx->pListTail->pNext = pNode;
  }
  pCtx->pListTail = pNode;

  return TSDB_CODE_SUCCESS;
}
848

849
/*
 * Consume one block produced by the first downstream operator.
 *
 * The block is expected to carry four columns, located through
 * basic.vgSlot[0..1] and basic.uidSlot[0..1]. In batch-fetch mode every row
 * contributes its (vgId -> uid) pairs to the left and right vgroup hashes;
 * otherwise each left uid is counted in leftCache/onceTable. On success the
 * raw column data is also appended to the table list and tableNum is bumped
 * by the number of rows.
 *
 * Any failure is stored in the task info and raised with T_LONG_JMP.
 */
static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SStbJoinDynCtrlInfo*       pJoin = (SStbJoinDynCtrlInfo*)&pCtrl->stbJoin;
  SColumnInfoData*           pLeftVgCol = NULL;
  SColumnInfoData*           pRightVgCol = NULL;
  SColumnInfoData*           pLeftUidCol = NULL;
  SColumnInfoData*           pRightUidCol = NULL;

  pLeftVgCol = taosArrayGet(pBlock->pDataBlock, pJoin->basic.vgSlot[0]);
  if (NULL == pLeftVgCol) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  pRightVgCol = taosArrayGet(pBlock->pDataBlock, pJoin->basic.vgSlot[1]);
  if (NULL == pRightVgCol) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  pLeftUidCol = taosArrayGet(pBlock->pDataBlock, pJoin->basic.uidSlot[0]);
  if (NULL == pLeftUidCol) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  pRightUidCol = taosArrayGet(pBlock->pDataBlock, pJoin->basic.uidSlot[1]);
  if (NULL == pRightUidCol) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  if (!pJoin->basic.batchFetch) {
    // Per-table mode: only the left uid of each row is counted.
    for (int32_t row = 0; TSDB_CODE_SUCCESS == code && row < pBlock->info.rows; ++row) {
      int64_t* pLUid = (int64_t*)(pLeftUidCol->pData + pLeftUidCol->info.bytes * row);

      code = addToJoinTableHash(pJoin->ctx.prev.leftCache, pJoin->ctx.prev.onceTable, pLUid, sizeof(*pLUid));
    }
  } else {
    // Batch mode: group uids by vgroup for both sides.
    for (int32_t row = 0; TSDB_CODE_SUCCESS == code && row < pBlock->info.rows; ++row) {
      int32_t* pLVg = (int32_t*)(pLeftVgCol->pData + pLeftVgCol->info.bytes * row);
      int64_t* pLUid = (int64_t*)(pLeftUidCol->pData + pLeftUidCol->info.bytes * row);
      int32_t* pRVg = (int32_t*)(pRightVgCol->pData + pRightVgCol->info.bytes * row);
      int64_t* pRUid = (int64_t*)(pRightUidCol->pData + pRightUidCol->info.bytes * row);

      code = addToJoinVgroupHash(pJoin->ctx.prev.leftHash, pLVg, sizeof(*pLVg), pLUid, sizeof(*pLUid));
      if (TSDB_CODE_SUCCESS == code) {
        code = addToJoinVgroupHash(pJoin->ctx.prev.rightHash, pRVg, sizeof(*pRVg), pRUid, sizeof(*pRUid));
      }
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = appendStbJoinTableList(&pJoin->ctx.prev, pBlock->info.rows, (int32_t*)pLeftVgCol->pData,
                                  (int64_t*)pLeftUidCol->pData, (int32_t*)pRightVgCol->pData,
                                  (int64_t*)pRightUidCol->pData);
    if (TSDB_CODE_SUCCESS == code) {
      pJoin->ctx.prev.tableNum += pBlock->info.rows;
    }
  }

_return:

  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }
}
315✔
911

912

913
/*
 * Finalize leftCache once all blocks have been consumed (per-table mode only;
 * batch-fetch mode returns immediately).
 *
 * When leftCache and onceTable have the same size, leftCache is cleared.
 * Otherwise every entry found while iterating onceTable is removed from
 * leftCache, and the remaining size is recorded in execInfo.leftCacheNum.
 * A failed removal is only logged.
 */
static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SStbJoinDynCtrlInfo*       pJoin = (SStbJoinDynCtrlInfo*)&pCtrl->stbJoin;

  if (pJoin->basic.batchFetch) {
    return;
  }

  SSHashObj* pLeftCache = pJoin->ctx.prev.leftCache;
  SSHashObj* pOnce = pJoin->ctx.prev.onceTable;

  if (tSimpleHashGetSize(pLeftCache) == tSimpleHashGetSize(pOnce)) {
    tSimpleHashClear(pLeftCache);
    return;
  }

  int32_t   cursor = 0;
  uint64_t* pEntry = tSimpleHashIterate(pOnce, NULL, &cursor);
  while (NULL != pEntry) {
    // The pointer handed back by the iterator is used directly as the key to
    // remove. NOTE(review): onceTable entries are stored with an empty value;
    // presumably that is why the iterator result can double as the key - confirm.
    int32_t rc = tSimpleHashRemove(pLeftCache, pEntry, sizeof(*pEntry));
    if (rc) {
      qError("tSimpleHashRemove failed in postProcessStbJoinTableHash, error:%s", tstrerror(rc));
    }
    pEntry = tSimpleHashIterate(pOnce, pEntry, &cursor);
  }

  pJoin->execInfo.leftCacheNum = tSimpleHashGetSize(pLeftCache);
  qDebug("more than 1 ref build table num %" PRId64, (int64_t)tSimpleHashGetSize(pLeftCache));
}
948

949
/*
 * Drain the downstream operator at index 0, feeding every block into
 * doBuildStbJoinTableHash() and accumulating block/row statistics, then run
 * the post-processing step and mark the build phase as done.
 */
static void buildStbJoinTableList(SOperatorInfo* pOperator) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SStbJoinDynCtrlInfo*       pJoin = (SStbJoinDynCtrlInfo*)&pCtrl->stbJoin;

  for (SSDataBlock* pBlk = getNextBlockFromDownstream(pOperator, 0); NULL != pBlk;
       pBlk = getNextBlockFromDownstream(pOperator, 0)) {
    pJoin->execInfo.prevBlkNum++;
    pJoin->execInfo.prevBlkRows += pBlk->info.rows;

    doBuildStbJoinTableHash(pOperator, pBlk);
  }

  postProcessStbJoinTableHash(pOperator);

  pJoin->ctx.prev.joinBuild = true;
}
11,035✔
969

970
/*
 * Walk the pending table list and launch retrieves until one of them yields a
 * block.
 *
 * List nodes whose readIdx has reached uidNum are unlinked from the head and
 * freed. For the remaining head entry a new retrieve is launched; if it
 * produces a block that block is returned through *ppRes, otherwise the
 * retrieve is closed and the head's read index advances. When the list runs
 * dry, *ppRes is set to NULL and the operator is marked completed.
 *
 * Returns TSDB_CODE_SUCCESS, or the error propagated by QRY_ERR_RET.
 */
static int32_t seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SStbJoinDynCtrlInfo*       pJoin = (SStbJoinDynCtrlInfo*)&pCtrl->stbJoin;
  SStbJoinPrevJoinCtx*       pPrevCtx = &pJoin->ctx.prev;

  for (SStbJoinTableList* pCur = pPrevCtx->pListHead; NULL != pCur;) {
    if (pCur->readIdx < pCur->uidNum) {
      seqJoinLaunchNewRetrieveImpl(pOperator, ppRes);
      if (*ppRes) {
        return TSDB_CODE_SUCCESS;
      }

      // Nothing came back for this entry: finish it and step to the next one.
      QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, pJoin));
      pPrevCtx->pListHead->readIdx++;
    } else {
      // Node fully consumed: drop it and continue with its successor.
      pPrevCtx->pListHead = pCur->pNext;
      freeStbJoinTableList(pCur);
      pCur = pPrevCtx->pListHead;
    }
  }

  *ppRes = NULL;
  setOperatorCompleted(pOperator);

  return TSDB_CODE_SUCCESS;
}
998

999
/*
 * Shape a result block so that it matches the operator's output descriptor.
 *
 * A NULL pBlock is accepted and left alone. Otherwise the block id is taken
 * from pOutputDataBlockDesc, and for every output slot beyond the columns the
 * block already has, a new column (type/bytes/slotId from the slot descriptor,
 * capacity for pBlock->info.rows rows) is appended.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the
 * join info, its output descriptor or a slot is missing, or the code reported
 * by colInfoDataEnsureCapacity()/blockDataAppendColInfo().
 */
static int32_t seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) {
  if (NULL == pBlock) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == pStbJoin || NULL == pStbJoin->pOutputDataBlockDesc) {
    // pBlock is known to be non-NULL here, so name the values actually checked.
    qError("seqStableJoinComposeRes: pStbJoin or pOutputDataBlockDesc is NULL");
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  pBlock->info.id.blockId = pStbJoin->pOutputDataBlockDesc->dataBlockId;
  if (!pBlock->pDataBlock) {
    return TSDB_CODE_SUCCESS;
  }

  for (int i = pBlock->pDataBlock->size; i < pStbJoin->pOutputDataBlockDesc->pSlots->length; i++) {
    SSlotDescNode* pSlot = (SSlotDescNode*)nodesListGetNode(pStbJoin->pOutputDataBlockDesc->pSlots, i);
    if (pSlot == NULL) {
      qError("seqStableJoinComposeRes: pSlot is NULL, i:%d", i);
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }

    SColumnInfoData colInfo = createColumnInfoData(pSlot->dataType.type, pSlot->dataType.bytes, pSlot->slotId);
    int32_t         code = colInfoDataEnsureCapacity(&colInfo, pBlock->info.rows, true);
    if (code == TSDB_CODE_SUCCESS) {
      code = blockDataAppendColInfo(pBlock, &colInfo);
    }
    if (code != TSDB_CODE_SUCCESS) {
      // The column was not handed over to the block, so whatever buffers the
      // local colInfo acquired must be released here.
      colDataDestroy(&colInfo);
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
1028

1029
/*
 * Produce the next result block of the sequential super-table join.
 *
 * On the first call the table list is built from downstream 0; if that phase
 * delivered no rows the operator completes immediately. Otherwise the current
 * retrieve is continued and, if it yields nothing, a new one is launched.
 * The open cost is recorded while cost.openCost is still 0. On success the
 * block (possibly NULL) is passed through seqStableJoinComposeRes(); on
 * failure the code is stored in the task info and raised with T_LONG_JMP.
 */
int32_t seqStableJoin(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SStbJoinDynCtrlInfo*       pJoin = (SStbJoinDynCtrlInfo*)&pCtrl->stbJoin;
  int64_t                    startUs = 0;

  QRY_PARAM_CHECK(pRes);
  if (OP_EXEC_DONE == pOperator->status) {
    return code;
  }

  if (0 == pOperator->cost.openCost) {
    startUs = taosGetTimestampUs();
  }

  if (!pJoin->ctx.prev.joinBuild) {
    buildStbJoinTableList(pOperator);
    if (pJoin->execInfo.prevBlkRows <= 0) {
      // The build side is empty, so there is nothing to join.
      setOperatorCompleted(pOperator);
      goto _return;
    }
  }

  QRY_ERR_JRET(seqJoinContinueCurrRetrieve(pOperator, pRes));
  if (NULL == *pRes) {
    QRY_ERR_JRET(seqJoinLaunchNewRetrieve(pOperator, pRes));
  }

_return:
  if (0 == pOperator->cost.openCost) {
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - startUs) / 1000.0;
  }

  if (TSDB_CODE_SUCCESS == code) {
    return seqStableJoinComposeRes(pJoin, *pRes);
  }

  qError("%s failed since %s", __func__, tstrerror(code));
  pOperator->pTaskInfo->code = code;
  T_LONG_JMP(pOperator->pTaskInfo->env, code);
  return code;
}
1073

UNCOV
1074
/*
 * Allocate an operator parameter for a virtual table scan.
 *
 * On success *ppRes points to a new SOperatorParam with an empty children
 * array, opType QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN, downstreamIdx 0,
 * reUse false, and `value` set to a new SVTableScanOperatorParam whose
 * pOpParamArray is an empty array. The uid of that value is not set here.
 *
 * On failure everything allocated so far is released, *ppRes is NULL and the
 * error code is returned. pInfo is not used by this function.
 */
static int32_t buildVtbScanOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SOperatorParam** ppRes) {
  int32_t                   code = TSDB_CODE_SUCCESS;
  int32_t                   lino = 0;
  SOperatorParam*           pParam = NULL;
  SVTableScanOperatorParam* pScanParam = NULL;

  *ppRes = NULL;

  pParam = taosMemoryMalloc(sizeof(SOperatorParam));
  QUERY_CHECK_NULL(pParam, code, lino, _return, terrno);

  pParam->pChildren = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL(pParam->pChildren, code, lino, _return, terrno);

  pScanParam = taosMemoryMalloc(sizeof(SVTableScanOperatorParam));
  QUERY_CHECK_NULL(pScanParam, code, lino, _return, terrno);

  pScanParam->pOpParamArray = taosArrayInit(1, POINTER_BYTES);
  QUERY_CHECK_NULL(pScanParam->pOpParamArray, code, lino, _return, terrno);

  pParam->opType = QUERY_NODE_PHYSICAL_PLAN_VIRTUAL_TABLE_SCAN;
  pParam->downstreamIdx = 0;
  pParam->value = pScanParam;
  pParam->reUse = false;

  *ppRes = pParam;
  return TSDB_CODE_SUCCESS;

_return:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pScanParam) {
    taosArrayDestroy(pScanParam->pOpParamArray);
    taosMemoryFreeClear(pScanParam);
  }
  if (pParam) {
    taosArrayDestroy(pParam->pChildren);
    taosMemoryFreeClear(pParam);
  }
  return code;
}
1107

UNCOV
1108
/*
 * Response callback for the TDMT_MND_GET_DB_INFO request sent by
 * buildDbVgInfoMap().
 *
 * param - the SOperatorInfo that issued the request
 * pMsg  - response buffer; pMsg->pData is released here on every path
 * code  - transport/server result for the request
 *
 * On success the payload is deserialized into vtbScan.pRsp and the
 * vtbScan.ready semaphore is posted. On an error response the converted code
 * is stored in the task info.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 *
 * NOTE(review): none of the failure paths posts vtbScan.ready, so a thread
 * blocked in tsem_wait() on that semaphore is not woken when this callback
 * fails. Posting here without also teaching the waiter to check for a missing
 * response would be unsafe, so the behavior is left unchanged - needs a
 * coordinated fix.
 */
int32_t dynProcessUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
  int32_t                    lino = 0;
  SOperatorInfo*             pOperator = (SOperatorInfo*)param;
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)pOperator->info;

  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = rpcCvtErrCode(code);
    if (pOperator->pTaskInfo->code != code) {
      qError("load systable rsp received, error:%s, cvted error:%s", tstrerror(code),
             tstrerror(pOperator->pTaskInfo->code));
    } else {
      qError("load systable rsp received, error:%s", tstrerror(code));
    }
    lino = __LINE__;
    goto _return;
  }

  pScanResInfo->vtbScan.pRsp = taosMemoryMalloc(sizeof(SUseDbRsp));
  QUERY_CHECK_NULL(pScanResInfo->vtbScan.pRsp, code, lino, _return, terrno);

  // Store the result in `code` before checking it, so that a failure is
  // actually reported to the caller instead of being returned as success.
  code = tDeserializeSUseDbRsp(pMsg->pData, (int32_t)pMsg->len, pScanResInfo->vtbScan.pRsp);
  QUERY_CHECK_CODE(code, lino, _return);

  taosMemoryFreeClear(pMsg->pData);

  code = tsem_post(&pScanResInfo->vtbScan.ready);
  QUERY_CHECK_CODE(code, lino, _return);

  return TSDB_CODE_SUCCESS;

_return:
  // The success path frees the payload; do the same on failure so it is not leaked.
  if (pMsg) {
    taosMemoryFreeClear(pMsg->pData);
  }
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  return code;
}
1138

UNCOV
1139
/*
 * Fetch the vgroup layout of one database and convert it into `output`.
 *
 * A use-db request for `name` is serialized and sent asynchronously with
 * dynProcessUseDbRsp() as the callback; this function then blocks on
 * vtbScan.ready and converts the stored response with queryBuildUseDbOutput().
 * The stored response is released before returning.
 *
 * pOperator - operator whose info holds the vtbScan state (epSet, ready, pRsp)
 * pHandle   - read handle providing the RPC client
 * name      - database name
 * pTaskInfo - task info; its query id is used as the request id
 * output    - receives the converted vgroup information
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered.
 *
 * NOTE(review): the wait below only returns once the callback posts the
 * semaphore; the callback does not post on its failure paths - confirm how
 * that case is supposed to be unblocked.
 */
static int32_t buildDbVgInfoMap(SOperatorInfo* pOperator, SReadHandle* pHandle, SName* name, SExecTaskInfo* pTaskInfo, SUseDbOutput* output) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    lino = 0;
  int32_t                    contLen = 0;
  char*                      buf1 = NULL;
  SUseDbReq*                 pReq = NULL;
  SMsgSendInfo*              pMsgSendInfo = NULL;
  SDynQueryCtrlOperatorInfo* pScanResInfo = (SDynQueryCtrlOperatorInfo*)pOperator->info;

  // Zero-initialized so that fields other than `db` are not serialized from
  // indeterminate memory.
  pReq = taosMemoryCalloc(1, sizeof(SUseDbReq));
  QUERY_CHECK_NULL(pReq, code, lino, _return, terrno);

  // Results are stored in `code` before each check so that failures reach the
  // caller and the cleanup below.
  code = tNameGetFullDbName(name, pReq->db);
  QUERY_CHECK_CODE(code, lino, _return);

  contLen = tSerializeSUseDbReq(NULL, 0, pReq);
  if (contLen < 0) {
    code = (TSDB_CODE_SUCCESS != terrno) ? terrno : TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    QUERY_CHECK_CODE(code, lino, _return);
  }

  buf1 = taosMemoryCalloc(1, contLen);
  QUERY_CHECK_NULL(buf1, code, lino, _return, terrno);

  if (tSerializeSUseDbReq(buf1, contLen, pReq) < 0) {
    code = (TSDB_CODE_SUCCESS != terrno) ? terrno : TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    QUERY_CHECK_CODE(code, lino, _return);
  }

  // send the fetch remote task result request
  pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  QUERY_CHECK_NULL(pMsgSendInfo, code, lino, _return, terrno);

  pMsgSendInfo->param = pOperator;
  pMsgSendInfo->msgInfo.pData = buf1;
  pMsgSendInfo->msgInfo.len = contLen;
  pMsgSendInfo->msgType = TDMT_MND_GET_DB_INFO;
  pMsgSendInfo->fp = dynProcessUseDbRsp;
  pMsgSendInfo->requestId = pTaskInfo->id.queryId;

  // The buffer now belongs to the send info; forget the local pointer so the
  // error cleanup below cannot free it a second time.
  buf1 = NULL;

  code = asyncSendMsgToServer(pHandle->pMsgCb->clientRpc, &pScanResInfo->vtbScan.epSet, NULL, pMsgSendInfo);
  QUERY_CHECK_CODE(code, lino, _return);

  code = tsem_wait(&pScanResInfo->vtbScan.ready);
  QUERY_CHECK_CODE(code, lino, _return);

  code = queryBuildUseDbOutput(output, pScanResInfo->vtbScan.pRsp);
  QUERY_CHECK_CODE(code, lino, _return);

_return:
  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    taosMemoryFree(buf1);
  }
  taosMemoryFree(pReq);
  // pRsp is only set once a response has been received; skip the release when
  // the function failed before that point.
  if (pScanResInfo->vtbScan.pRsp) {
    tFreeSUsedbRsp(pScanResInfo->vtbScan.pRsp);
    taosMemoryFreeClear(pScanResInfo->vtbScan.pRsp);
  }
  return code;
}
1184

UNCOV
1185
int dynVgInfoComp(const void* lp, const void* rp) {
×
UNCOV
1186
  SVgroupInfo* pLeft = (SVgroupInfo*)lp;
×
UNCOV
1187
  SVgroupInfo* pRight = (SVgroupInfo*)rp;
×
UNCOV
1188
  if (pLeft->hashBegin < pRight->hashBegin) {
×
UNCOV
1189
    return -1;
×
UNCOV
1190
  } else if (pLeft->hashBegin > pRight->hashBegin) {
×
UNCOV
1191
    return 1;
×
1192
  }
1193

UNCOV
1194
  return 0;
×
1195
}
1196

UNCOV
1197
/*
 * Make sure dbInfo->vgArray exists: when the vgroup hash is present but the
 * array is not, copy every hash entry into a new array and sort it with
 * sort_func.
 *
 * A NULL dbInfo, a missing vgHash, or an already built vgArray is a no-op.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the array cannot be created or
 * filled. On failure dbInfo->vgArray is left NULL.
 */
int32_t dynMakeVgArraySortBy(SDBVgInfo* dbInfo, __compar_fn_t sort_func) {
  if (NULL == dbInfo) {
    return TSDB_CODE_SUCCESS;
  }

  if (dbInfo->vgHash && NULL == dbInfo->vgArray) {
    int32_t vgSize = taosHashGetSize(dbInfo->vgHash);
    dbInfo->vgArray = taosArrayInit(vgSize, sizeof(SVgroupInfo));
    if (NULL == dbInfo->vgArray) {
      return terrno;
    }

    void* pIter = taosHashIterate(dbInfo->vgHash, NULL);
    while (pIter) {
      if (NULL == taosArrayPush(dbInfo->vgArray, pIter)) {
        // Save the error first; the cleanup calls below may touch terrno.
        int32_t code = terrno;
        taosHashCancelIterate(dbInfo->vgHash, pIter);
        // Drop the half-built array. Leaving it attached would make the next
        // call skip the rebuild and work on an incomplete, unsorted array.
        taosArrayDestroy(dbInfo->vgArray);
        dbInfo->vgArray = NULL;
        return code;
      }

      pIter = taosHashIterate(dbInfo->vgHash, pIter);
    }

    taosArraySort(dbInfo->vgArray, sort_func);
  }

  return TSDB_CODE_SUCCESS;
}
1224

UNCOV
1225
int32_t dynHashValueComp(void const* lp, void const* rp) {
×
UNCOV
1226
  uint32_t*    key = (uint32_t*)lp;
×
UNCOV
1227
  SVgroupInfo* pVg = (SVgroupInfo*)rp;
×
1228

UNCOV
1229
  if (*key < pVg->hashBegin) {
×
UNCOV
1230
    return -1;
×
UNCOV
1231
  } else if (*key > pVg->hashEnd) {
×
UNCOV
1232
    return 1;
×
1233
  }
1234

UNCOV
1235
  return 0;
×
1236
}
1237

UNCOV
1238
/*
 * Resolve the vgroup id that owns table `tbName` of database `dbFName`.
 *
 * The sorted vgroup array of dbInfo is built on demand, the hash value of
 * "<dbFName>.<tbName>" is computed with the database's hash settings, and the
 * vgroup whose hash range contains that value is looked up.
 *
 * dbInfo  - vgroup information of the database
 * dbFName - full database name
 * vgId    - receives the vgroup id; written only on success
 * tbName  - table name without database prefix
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR for a NULL
 * dbInfo, the error of dynMakeVgArraySortBy(), TSDB_CODE_TSC_DB_NOT_SELECTED
 * when the database has no vgroups, or TSDB_CODE_CTG_INTERNAL_ERROR when no
 * hash range matches.
 */
int32_t getVgId(SDBVgInfo* dbInfo, char* dbFName, int32_t* vgId, char *tbName) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  // dynMakeVgArraySortBy() accepts NULL, but the lookups below do not.
  QUERY_CHECK_NULL(dbInfo, code, lino, _return, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

  // Results are stored in `code` before each check so that a failure is
  // returned to the caller rather than reported as success.
  code = dynMakeVgArraySortBy(dbInfo, dynVgInfoComp);
  QUERY_CHECK_CODE(code, lino, _return);

  int32_t vgNum = (int32_t)taosArrayGetSize(dbInfo->vgArray);
  if (vgNum <= 0) {
    qError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum);
    code = TSDB_CODE_TSC_DB_NOT_SELECTED;
    QUERY_CHECK_CODE(code, lino, _return);
  }

  char tbFullName[TSDB_TABLE_FNAME_LEN];
  (void)snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbFName, tbName);

  uint32_t hashValue = taosGetTbHashVal(tbFullName, (int32_t)strlen(tbFullName), dbInfo->hashMethod,
                                        dbInfo->hashPrefix, dbInfo->hashSuffix);

  SVgroupInfo* vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, dynHashValueComp, TD_EQ);
  if (NULL == vgInfo) {
    qError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName,
           (int32_t)taosArrayGetSize(dbInfo->vgArray));
    code = TSDB_CODE_CTG_INTERNAL_ERROR;
    QUERY_CHECK_CODE(code, lino, _return);
  }

  *vgId = vgInfo->vgId;

_return:
  if (code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1270

UNCOV
1271
/*
 * Tell whether column `colId` has to be scanned: always true when scanAllCols
 * is set, otherwise true only if the id appears in readColList.
 */
bool colNeedScan(SOperatorInfo* pOperator, col_id_t colId) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SVtbScanDynCtrlInfo*       pScan = (SVtbScanDynCtrlInfo*)&pCtrl->vtbScan;

  if (pScan->scanAllCols) {
    return true;
  }

  SArray* pWanted = pScan->readColList;
  bool    found = false;
  for (int32_t idx = 0; !found && idx < taosArrayGetSize(pWanted); idx++) {
    found = (*(col_id_t*)taosArrayGet(pWanted, idx) == colId);
  }
  return found;
}
1285

UNCOV
1286
void destroyOrgTbInfo(void *info) {
×
UNCOV
1287
  SOrgTbInfo *pOrgTbInfo = (SOrgTbInfo *)info;
×
UNCOV
1288
  if (pOrgTbInfo) {
×
UNCOV
1289
    taosArrayDestroy(pOrgTbInfo->colMap);
×
1290
  }
UNCOV
1291
}
×
1292

UNCOV
1293
/*
 * Produce the next block of a virtual super-table scan.
 *
 * Child tables in childTableList are processed one at a time, starting at
 * readTableIdx. When the current table is the one that produced the previous
 * block (readTableIdx == lastTableIdx) the downstream operator is simply asked
 * for more data. Otherwise the table's column references are read through the
 * meta reader, every referenced original table is resolved to a vgroup
 * (fetching and caching per-database vgroup info in dbVgInfoMap as needed),
 * and the downstream operator is re-opened with a freshly built parameter
 * carrying one exchange parameter per original table.
 *
 * The loop stops at the first non-NULL block, or marks the operator completed
 * once all child tables are exhausted. Errors are stored in the task info and
 * raised with T_LONG_JMP.
 *
 * NOTE(review): orgTbVgColMap is created once per call and is not reset
 * between child tables, so entries collected for a table that yielded no data
 * are still present when the next table's parameters are built - confirm
 * whether that is intended.
 * NOTE(review): on failure, an already built pExchangeParam / vtbScanParam is
 * not released here; the matching destructor is not visible in this part of
 * the file.
 */
int32_t vtbScan(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  int32_t                    line = 0;
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SVtbScanDynCtrlInfo*       pVtbScan = (SVtbScanDynCtrlInfo*)&pInfo->vtbScan;
  SExecTaskInfo*             pTaskInfo = pOperator->pTaskInfo;
  SReadHandle*               pHandle = &pVtbScan->readHandle;
  SMetaReader                mr = {0};
  bool                       readerOpen = false;  // true between initReader and clearReader
  SHashObj*                  orgTbVgColMap = NULL;
  void*                      pIter = NULL;  // active iterator over orgTbVgColMap, if any

  QRY_PARAM_CHECK(pRes);
  if (pOperator->status == OP_EXEC_DONE) {
    return code;
  }

  int64_t st = 0;
  if (pOperator->cost.openCost == 0) {
    st = taosGetTimestampUs();
  }

  size_t num = taosArrayGetSize(pVtbScan->childTableList);

  if (num == 0) {
    setOperatorCompleted(pOperator);
    return code;
  }

  // TODO(smj) : proper hash size
  orgTbVgColMap = taosHashInit(num * 64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  QUERY_CHECK_NULL(orgTbVgColMap, code, line, _return, terrno);
  taosHashSetFreeFp(orgTbVgColMap, destroyOrgTbInfo);

  // Throughout this function results are stored in `code` before being
  // checked, so that a failure reaches the error handling at _return instead
  // of being reported as success.
  while (true) {
    if (pVtbScan->readTableIdx == pVtbScan->lastTableIdx) {
      code = pOperator->pDownstream[0]->fpSet.getNextFn(pOperator->pDownstream[0], pRes);
      QUERY_CHECK_CODE(code, line, _return);
    } else {
      uint64_t* id = taosArrayGet(pVtbScan->childTableList, pVtbScan->readTableIdx);
      QUERY_CHECK_NULL(id, code, line, _return, terrno);

      pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, META_READER_LOCK, &pHandle->api.metaFn);
      readerOpen = true;
      code = pHandle->api.metaReaderFn.getTableEntryByUid(&mr, *id);
      QUERY_CHECK_CODE(code, line, _return);

      for (int32_t j = 0; j < mr.me.colRef.nCols; j++) {
        if (!mr.me.colRef.pColRef[j].hasRef || !colNeedScan(pOperator, mr.me.colRef.pColRef[j].id)) {
          continue;
        }

        SName name = {0};
        toName(pInfo->vtbScan.acctId, mr.me.colRef.pColRef[j].refDbName, "", &name);

        // Look up (or fetch and cache) the vgroup layout of the referenced database.
        SUseDbOutput*  output = NULL;
        SUseDbOutput** find = (SUseDbOutput**)taosHashGet(pInfo->vtbScan.dbVgInfoMap, name.dbname, strlen(name.dbname));
        if (find == NULL) {
          output = taosMemoryCalloc(1, sizeof(SUseDbOutput));
          QUERY_CHECK_NULL(output, code, line, _return, terrno);

          code = buildDbVgInfoMap(pOperator, pHandle, &name, pTaskInfo, output);
          if (TSDB_CODE_SUCCESS == code) {
            code = taosHashPut(pInfo->vtbScan.dbVgInfoMap, name.dbname, strlen(name.dbname), &output, sizeof(output));
          }
          if (TSDB_CODE_SUCCESS != code) {
            // Not stored in the map, so nobody else will release it.
            // NOTE(review): anything queryBuildUseDbOutput() attached to
            // `output` is not released here - its destructor is not visible.
            taosMemoryFree(output);
            QUERY_CHECK_CODE(code, line, _return);
          }
        } else {
          output = *find;
        }
        QUERY_CHECK_NULL(output->dbVgroup, code, line, _return, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

        int32_t vgId = 0;
        char    dbFname[TSDB_DB_FNAME_LEN] = {0};
        code = tNameGetFullDbName(&name, dbFname);
        QUERY_CHECK_CODE(code, line, _return);
        code = getVgId(output->dbVgroup, dbFname, &vgId, mr.me.colRef.pColRef[j].refTableName);
        QUERY_CHECK_CODE(code, line, _return);

        // The whole zero-padded buffer is used as the hash key below.
        char orgTbFName[TSDB_TABLE_FNAME_LEN] = {0};
        TAOS_STRNCAT(orgTbFName, mr.me.colRef.pColRef[j].refDbName, TSDB_DB_NAME_LEN);
        TAOS_STRNCAT(orgTbFName, ".", 2);
        TAOS_STRNCAT(orgTbFName, mr.me.colRef.pColRef[j].refTableName, TSDB_TABLE_NAME_LEN);

        SColIdNameKV colIdNameKV = {0};
        colIdNameKV.colId = mr.me.colRef.pColRef[j].id;
        tstrncpy(colIdNameKV.colName, mr.me.colRef.pColRef[j].refColName, sizeof(colIdNameKV.colName));

        SOrgTbInfo* pKnown = (SOrgTbInfo*)taosHashGet(orgTbVgColMap, orgTbFName, sizeof(orgTbFName));
        if (pKnown) {
          QUERY_CHECK_NULL(taosArrayPush(pKnown->colMap, &colIdNameKV), code, line, _return, terrno);
        } else {
          SOrgTbInfo map = {0};
          map.vgId = vgId;
          tstrncpy(map.tbName, orgTbFName, sizeof(map.tbName));
          map.colMap = taosArrayInit(10, sizeof(SColIdNameKV));
          QUERY_CHECK_NULL(map.colMap, code, line, _return, terrno);

          if (NULL == taosArrayPush(map.colMap, &colIdNameKV)) {
            code = terrno;
          } else {
            code = taosHashPut(orgTbVgColMap, orgTbFName, sizeof(orgTbFName), &map, sizeof(map));
          }
          if (TSDB_CODE_SUCCESS != code) {
            // The entry never made it into the map; release its array here.
            taosArrayDestroy(map.colMap);
            QUERY_CHECK_CODE(code, line, _return);
          }
        }
      }

      pVtbScan->vtbScanParam = NULL;
      code = buildVtbScanOperatorParam(pInfo, &pVtbScan->vtbScanParam);
      QUERY_CHECK_CODE(code, line, _return);
      ((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->uid = *id;

      pIter = taosHashIterate(orgTbVgColMap, NULL);
      while (pIter != NULL) {
        SOrgTbInfo*     pMap = (SOrgTbInfo*)pIter;
        SOperatorParam* pExchangeParam = NULL;
        code = buildExchangeOperatorParamForVScan(&pExchangeParam, 0, pMap);
        QUERY_CHECK_CODE(code, line, _return);
        QUERY_CHECK_NULL(taosArrayPush(((SVTableScanOperatorParam*)pVtbScan->vtbScanParam->value)->pOpParamArray, &pExchangeParam), code, line, _return, terrno);
        pIter = taosHashIterate(orgTbVgColMap, pIter);
      }

      pHandle->api.metaReaderFn.clearReader(&mr);
      readerOpen = false;

      pOperator->pDownstream[0]->status = OP_NOT_OPENED;
      code = pOperator->pDownstream[0]->fpSet.getNextExtFn(pOperator->pDownstream[0], pVtbScan->vtbScanParam, pRes);
      QUERY_CHECK_CODE(code, line, _return);
    }

    if (*pRes) {
      pVtbScan->lastTableIdx = pVtbScan->readTableIdx;
      break;
    } else {
      pVtbScan->readTableIdx++;
      if (pVtbScan->readTableIdx >= taosArrayGetSize(pVtbScan->childTableList)) {
        setOperatorCompleted(pOperator);
        break;
      }
    }
  }

_return:
  // Release whatever an early exit left open before raising the error.
  if (pIter) {
    taosHashCancelIterate(orgTbVgColMap, pIter);
  }
  if (readerOpen) {
    pHandle->api.metaReaderFn.clearReader(&mr);
  }
  taosHashCleanup(orgTbVgColMap);
  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (double)(taosGetTimestampUs() - st) / 1000.0;
  }

  if (code) {
    qError("%s failed since %s", __func__, tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
  }

  return code;
}
1421

1422
/*
 * Create the hash tables the build phase needs.
 *
 * Batch-fetch mode uses leftHash and rightHash (INT keys); otherwise
 * leftCache, rightCache and onceTable (BIGINT keys) are created.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno as soon as one table cannot be created.
 */
int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) {
  if (!batchFetch) {
    pPrev->leftCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (NULL == pPrev->leftCache) {
      return terrno;
    }

    pPrev->rightCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (NULL == pPrev->rightCache) {
      return terrno;
    }

    pPrev->onceTable = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (NULL == pPrev->onceTable) {
      return terrno;
    }

    return TSDB_CODE_SUCCESS;
  }

  pPrev->leftHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  if (NULL == pPrev->leftHash) {
    return terrno;
  }

  pPrev->rightHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  if (NULL == pPrev->rightHash) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
1449

UNCOV
1450
static int32_t initVtbScanInfo(SOperatorInfo* pOperator, SDynQueryCtrlOperatorInfo* pInfo, SReadHandle* pHandle,
×
1451
                               SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) {
UNCOV
1452
  int32_t      code = TSDB_CODE_SUCCESS;
×
UNCOV
1453
  int32_t      line = 0;
×
1454

UNCOV
1455
  QUERY_CHECK_CODE(tsem_init(&pInfo->vtbScan.ready, 0, 0), line, _return);
×
1456

UNCOV
1457
  pInfo->vtbScan.scanAllCols = pPhyciNode->vtbScan.scanAllCols;
×
UNCOV
1458
  pInfo->vtbScan.suid = pPhyciNode->vtbScan.suid;
×
UNCOV
1459
  pInfo->vtbScan.epSet = pPhyciNode->vtbScan.mgmtEpSet;
×
UNCOV
1460
  pInfo->vtbScan.acctId = pPhyciNode->vtbScan.accountId;
×
UNCOV
1461
  pInfo->vtbScan.readHandle = *pHandle;
×
UNCOV
1462
  pInfo->vtbScan.readTableIdx = 0;
×
UNCOV
1463
  pInfo->vtbScan.lastTableIdx = -1;
×
1464

UNCOV
1465
  pInfo->vtbScan.readColList = taosArrayInit(LIST_LENGTH(pPhyciNode->vtbScan.pScanCols), sizeof(col_id_t));
×
UNCOV
1466
  QUERY_CHECK_NULL(pInfo->vtbScan.readColList, code, line, _return, terrno);
×
1467

UNCOV
1468
  for (int32_t i = 0; i < LIST_LENGTH(pPhyciNode->vtbScan.pScanCols); ++i) {
×
UNCOV
1469
    SColumnNode* pNode = (SColumnNode*)nodesListGetNode(pPhyciNode->vtbScan.pScanCols, i);
×
UNCOV
1470
    QUERY_CHECK_NULL(pNode, code, line, _return, terrno);
×
UNCOV
1471
    QUERY_CHECK_NULL(taosArrayPush(pInfo->vtbScan.readColList, &pNode->colId), code, line, _return, terrno);
×
1472
  }
1473

UNCOV
1474
  pInfo->vtbScan.childTableList = taosArrayInit(10, sizeof(uint64_t));
×
UNCOV
1475
  QUERY_CHECK_CODE(pHandle->api.metaFn.getChildTableList(pHandle->vnode, pInfo->vtbScan.suid, pInfo->vtbScan.childTableList), line, _return);
×
1476

UNCOV
1477
  pInfo->vtbScan.dbVgInfoMap = taosHashInit(taosArrayGetSize(pInfo->vtbScan.childTableList), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
×
UNCOV
1478
  QUERY_CHECK_NULL(pInfo->vtbScan.dbVgInfoMap, code, line, _return, terrno);
×
1479

UNCOV
1480
_return:
×
UNCOV
1481
  return code;
×
1482
}
1483

1484
int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
11,035✔
1485
                                       SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo,
1486
                                       SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
1487
  QRY_PARAM_CHECK(pOptrInfo);
11,035!
1488

1489
  int32_t                    code = TSDB_CODE_SUCCESS;
11,035✔
1490
  __optr_fn_t                nextFp = NULL;
11,035✔
1491
  SOperatorInfo*             pOperator = NULL;
11,035✔
1492
  SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo));
11,035!
1493
  if (pInfo == NULL) {
11,035!
UNCOV
1494
    code = terrno;
×
UNCOV
1495
    goto _error;
×
1496
  }
1497

1498
  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
11,035!
1499
  if (pOperator == NULL) {
11,035!
UNCOV
1500
    code = terrno;
×
UNCOV
1501
    goto _error;
×
1502
  }
1503

1504
  pTaskInfo->dynamicTask = (int8_t)pPhyciNode->node.dynamicOp;
11,035✔
1505

1506
  code = appendDownstream(pOperator, pDownstream, numOfDownstream);
11,035✔
1507
  if (TSDB_CODE_SUCCESS != code) {
11,035!
UNCOV
1508
    goto _error;
×
1509
  }
1510

1511
  setOperatorInfo(pOperator, "DynQueryCtrlOperator", QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, false, OP_NOT_OPENED,
11,035✔
1512
                  pInfo, pTaskInfo);
1513

1514
  pInfo->qType = pPhyciNode->qType;
11,035✔
1515
  switch (pInfo->qType) {
11,035!
1516
    case DYN_QTYPE_STB_HASH:
11,035✔
1517
      TAOS_MEMCPY(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
11,035✔
1518
      pInfo->stbJoin.pOutputDataBlockDesc = pPhyciNode->node.pOutputDataBlockDesc;
11,035✔
1519
      code = initSeqStbJoinTableHash(&pInfo->stbJoin.ctx.prev, pInfo->stbJoin.basic.batchFetch);
11,035✔
1520
      if (TSDB_CODE_SUCCESS != code) {
11,035!
UNCOV
1521
        goto _error;
×
1522
      }
1523
      nextFp = seqStableJoin;
11,035✔
1524
      break;
11,035✔
UNCOV
1525
    case DYN_QTYPE_VTB_SCAN:
×
UNCOV
1526
      QUERY_CHECK_CODE(initVtbScanInfo(pOperator, pInfo, pHandle, pPhyciNode, pTaskInfo), code, _error);
×
UNCOV
1527
      nextFp = vtbScan;
×
UNCOV
1528
      break;
×
UNCOV
1529
    default:
×
UNCOV
1530
      qError("unsupported dynamic query ctrl type: %d", pInfo->qType);
×
UNCOV
1531
      code = TSDB_CODE_INVALID_PARA;
×
UNCOV
1532
      goto _error;
×
1533
  }
1534

1535
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, nextFp, NULL, destroyDynQueryCtrlOperator, optrDefaultBufFn,
11,035✔
1536
                                         NULL, optrDefaultGetNextExtFn, NULL);
1537

1538
  *pOptrInfo = pOperator;
11,035✔
1539
  return TSDB_CODE_SUCCESS;
11,035✔
1540

UNCOV
1541
_error:
×
UNCOV
1542
  if (pInfo != NULL) {
×
UNCOV
1543
    destroyDynQueryCtrlOperator(pInfo);
×
1544
  }
1545

UNCOV
1546
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
UNCOV
1547
  pTaskInfo->code = code;
×
UNCOV
1548
  return code;
×
1549
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc