• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3621

22 Feb 2025 11:44AM UTC coverage: 2.037% (-61.5%) from 63.573%
#3621

push

travis-ci

web-flow
Merge pull request #29874 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

4357 of 287032 branches covered (1.52%)

Branch coverage included in aggregate %.

0 of 174 new or added lines in 18 files covered. (0.0%)

213359 existing lines in 469 files now uncovered.

7260 of 283369 relevant lines covered (2.56%)

23737.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/libs/executor/src/dynqueryctrloperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "nodes.h"
20
#include "operator.h"
21
#include "os.h"
22
#include "plannodes.h"
23
#include "query.h"
24
#include "querynodes.h"
25
#include "querytask.h"
26
#include "tarray.h"
27
#include "tcompare.h"
28
#include "tdatablock.h"
29
#include "thash.h"
30
#include "tmsg.h"
31
#include "ttypes.h"
32
#include "dynqueryctrl.h"
33

34
int64_t gSessionId = 0;
35

36
void freeVgTableList(void* ptr) { 
×
37
  taosArrayDestroy(*(SArray**)ptr); 
×
38
}
×
39

UNCOV
40
/*
 * Releases every node of a singly linked SStbJoinTableList chain, including
 * the four per-node vgId/uid arrays. A NULL head is a no-op.
 */
static void destroyStbJoinTableList(SStbJoinTableList* pListHead) {
  for (SStbJoinTableList* pCur = pListHead; NULL != pCur;) {
    // Remember the successor before the node itself is released.
    SStbJoinTableList* pFollowing = pCur->pNext;

    taosMemoryFree(pCur->pLeftVg);
    taosMemoryFree(pCur->pLeftUid);
    taosMemoryFree(pCur->pRightVg);
    taosMemoryFree(pCur->pRightUid);
    taosMemoryFree(pCur);

    pCur = pFollowing;
  }
}
×
53

UNCOV
54
/*
 * Logs the execution counters of a super-table join controller and releases
 * the containers held by its "prev" context: the per-vgroup hashes in batch
 * fetch mode, the uid caches otherwise, and finally the pending table list.
 */
static void destroyStbJoinDynCtrlInfo(SStbJoinDynCtrlInfo* pStbJoin) {
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;

  qDebug("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64 ", leftCacheNum:%" PRId64 ", rightCacheNum:%" PRId64, 
         pStbJoin->execInfo.prevBlkNum, pStbJoin->execInfo.prevBlkRows, pStbJoin->execInfo.postBlkNum, 
         pStbJoin->execInfo.postBlkRows, pStbJoin->execInfo.leftCacheNum, pStbJoin->execInfo.rightCacheNum);

  if (!pStbJoin->basic.batchFetch) {
    // Sequential mode: only the uid caches exist.
    if (NULL != pPrev->leftCache) {
      tSimpleHashCleanup(pPrev->leftCache);
    }
    if (NULL != pPrev->rightCache) {
      tSimpleHashCleanup(pPrev->rightCache);
    }
    if (NULL != pPrev->onceTable) {
      tSimpleHashCleanup(pPrev->onceTable);
    }
  } else {
    // Batch mode: hash values are SArray pointers, so install the array free callback first.
    if (NULL != pPrev->leftHash) {
      tSimpleHashSetFreeFp(pPrev->leftHash, freeVgTableList);
      tSimpleHashCleanup(pPrev->leftHash);
    }
    if (NULL != pPrev->rightHash) {
      tSimpleHashSetFreeFp(pPrev->rightHash, freeVgTableList);
      tSimpleHashCleanup(pPrev->rightHash);
    }
  }

  destroyStbJoinTableList(pPrev->pListHead);
}
×
82

UNCOV
83
static void destroyDynQueryCtrlOperator(void* param) {
×
UNCOV
84
  SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param;
×
85

UNCOV
86
  switch (pDyn->qType) {
×
UNCOV
87
    case DYN_QTYPE_STB_HASH:
×
UNCOV
88
      destroyStbJoinDynCtrlInfo(&pDyn->stbJoin);
×
UNCOV
89
      break;
×
90
    default:
×
91
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
×
92
      break;
×
93
  }
94

UNCOV
95
  taosMemoryFreeClear(param);
×
UNCOV
96
}
×
97

98
/*
 * Decides whether the data of a table should be cached.
 *  - batch fetch mode: always true;
 *  - right table: true when the current and the next right uid are the same;
 *  - left table: true when uid has an entry in pPrev->leftCache.
 */
static FORCE_INLINE bool tableNeedCache(int64_t uid, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, bool rightTable, bool batchFetch) {
  if (batchFetch) {
    return true;
  }

  if (rightTable) {
    return pPost->rightCurrUid == pPost->rightNextUid;
  }

  return NULL != tSimpleHashGet(pPrev->leftCache, &uid, sizeof(uid));
}
111

UNCOV
112
/*
 * Loads the table pair at the head list node's read position into the post
 * join context, looks ahead for the next right uid (0 when the list is
 * exhausted) and refreshes the left/right need-cache flags.
 *
 * In sequential mode a right table that needs caching and differs from the
 * previous right table is recorded in pPrev->rightCache.
 *
 * Returns TSDB_CODE_SUCCESS, or the error propagated from tSimpleHashPut.
 */
static int32_t updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo* pStbJoin) {
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
  SStbJoinTableList*   pNode = pPrev->pListHead;
  int64_t              currIdx = pNode->readIdx;
  int64_t*             leftUid = pNode->pLeftUid + currIdx;
  int64_t*             rightUid = pNode->pRightUid + currIdx;
  int64_t              lastRightUid = pPost->rightCurrUid;

  pPost->leftCurrUid = *leftUid;
  pPost->rightCurrUid = *rightUid;
  pPost->leftVgId = pNode->pLeftVg[currIdx];
  pPost->rightVgId = pNode->pRightVg[currIdx];

  // Look ahead: skip to following nodes until one holds an entry at nextIdx.
  int64_t nextIdx = currIdx + 1;
  while (nextIdx >= pNode->uidNum) {
    pNode = pNode->pNext;
    if (NULL == pNode) {
      break;
    }
    rightUid = pNode->pRightUid;
    nextIdx = 0;
  }
  pPost->rightNextUid = (NULL != pNode) ? pNode->pRightUid[nextIdx] : 0;

  pPost->leftNeedCache = tableNeedCache(*leftUid, pPrev, pPost, false, pStbJoin->basic.batchFetch);
  pPost->rightNeedCache = tableNeedCache(*rightUid, pPrev, pPost, true, pStbJoin->basic.batchFetch);

  bool newRightTable = (lastRightUid != pPost->rightCurrUid);
  if (!pStbJoin->basic.batchFetch && pPost->rightNeedCache && newRightTable) {
    QRY_ERR_RET(tSimpleHashPut(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid), NULL, 0));
    pStbJoin->execInfo.rightCacheNum++;
  }

  return TSDB_CODE_SUCCESS;
}
155

156

UNCOV
157
/*
 * Builds a GET parameter for a group cache operator.
 *
 * ppRes         - receives the new parameter; set to NULL on failure.
 * downstreamIdx - downstream index stored in both the parameter and its value.
 * vgId, tbUid   - target vgroup and table uid.
 * needCache     - whether the group cache should keep the table's data.
 * pChild        - optional child parameter; ownership is always taken: on
 *                 success it becomes the only child of *ppRes, on failure it
 *                 is released here with freeOperatorParam().
 *
 * Returns TSDB_CODE_SUCCESS or the terrno of the failing allocation.
 */
static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, bool needCache, SOperatorParam* pChild) {
  int32_t           code = TSDB_CODE_SUCCESS;
  SGcOperatorParam* pGc = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    code = terrno;
    goto _error;
  }
  (*ppRes)->pChildren = NULL;

  if (pChild) {
    (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
    if (NULL == (*ppRes)->pChildren) {
      code = terrno;
      goto _error;
    }
    if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild)) {
      code = terrno;
      goto _error;
    }
  }

  pGc = taosMemoryMalloc(sizeof(SGcOperatorParam));
  if (NULL == pGc) {
    code = terrno;
    goto _error;
  }

  pGc->sessionId = atomic_add_fetch_64(&gSessionId, 1);
  pGc->downstreamIdx = downstreamIdx;
  pGc->vgId = vgId;
  pGc->tbUid = tbUid;
  pGc->needCache = needCache;

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pGc;

  return TSDB_CODE_SUCCESS;

_error:
  // *ppRes is only partially initialized here (opType/value are unset), so it
  // must not be handed to freeOperatorParam(); release exactly what is owned.
  freeOperatorParam(pChild, OP_GET_PARAM);
  if (NULL != *ppRes) {
    if (NULL != (*ppRes)->pChildren) {
      taosArrayDestroy((*ppRes)->pChildren);
    }
    taosMemoryFreeClear(*ppRes);
  }
  return code;
}
205

206

207
/*
 * Builds a NOTIFY parameter for a group cache operator, identifying the table
 * (vgId, tbUid) on the given downstream.
 *
 * ppRes receives the new parameter and is set to NULL on failure.
 * Returns TSDB_CODE_SUCCESS or the terrno of the failing allocation.
 */
static int32_t buildGroupCacheNotifyOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid) {
  int32_t code = TSDB_CODE_SUCCESS;
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }
  (*ppRes)->pChildren = NULL;

  SGcNotifyOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcNotifyOperatorParam));
  if (NULL == pGc) {
    code = terrno;
    // The parameter owns nothing yet and its opType/value are still unset, so
    // free the bare allocation instead of calling freeOperatorParam(); this
    // also resets *ppRes so the caller is not left with a dangling pointer.
    taosMemoryFreeClear(*ppRes);
    return code;
  }

  pGc->downstreamIdx = downstreamIdx;
  pGc->vgId = vgId;
  pGc->tbUid = tbUid;

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pGc;

  return TSDB_CODE_SUCCESS;
}
232

233

UNCOV
234
/*
 * Builds a GET parameter for an exchange operator that fetches one table
 * (*pUid in vgroup *pVgId) from a table scan source.
 *
 * ppRes receives the new parameter; on failure everything allocated here is
 * released and *ppRes is set to NULL.
 * Returns TSDB_CODE_SUCCESS or the terrno of the failing call.
 */
static int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) {
  int32_t                 code = TSDB_CODE_SUCCESS;
  SExchangeOperatorParam* pExc = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }
  (*ppRes)->pChildren = NULL;

  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
  if (NULL == pExc) {
    code = terrno;
    goto _error;
  }

  pExc->multiParams = false;
  pExc->basic.vgId = *pVgId;
  pExc->basic.tableSeq = true;
  pExc->basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
  pExc->basic.uidList = taosArrayInit(1, sizeof(int64_t));
  if (NULL == pExc->basic.uidList) {
    code = terrno;
    goto _error;
  }
  if (NULL == taosArrayPush(pExc->basic.uidList, pUid)) {
    // Capture terrno before any cleanup call has a chance to modify it.
    code = terrno;
    taosArrayDestroy(pExc->basic.uidList);
    goto _error;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;

  return TSDB_CODE_SUCCESS;

_error:
  // Do not leave a half-built parameter behind: callers treat a non-NULL
  // *ppRes as a complete object.
  taosMemoryFree(pExc);
  taosMemoryFreeClear(*ppRes);
  return code;
}
267

268

UNCOV
269
/*
 * Builds a GET parameter for an exchange operator in batch mode: one
 * SExchangeOperatorBasicParam per vgroup found in pVg.
 *
 * pVg maps vgId -> SArray* of uids. Each uid array is moved into the batch
 * parameter, and the slot in pVg is set to NULL once the move succeeded.
 *
 * ppRes receives the new parameter; on failure everything built here is
 * released (including arrays already moved into the batch hash) and *ppRes is
 * set to NULL.
 * Returns TSDB_CODE_SUCCESS or the error of the failing call.
 */
static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  SExchangeOperatorBatchParam* pExc = NULL;
  SExchangeOperatorBasicParam  basic = {0};  // zeroed so no unset member is copied into the hash
  int32_t                      iter = 0;
  void*                        p = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }
  (*ppRes)->pChildren = NULL;

  pExc = taosMemoryMalloc(sizeof(SExchangeOperatorBatchParam));
  if (NULL == pExc) {
    code = terrno;
    goto _error;
  }

  pExc->multiParams = true;
  pExc->pBatchs = tSimpleHashInit(tSimpleHashGetSize(pVg), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  if (NULL == pExc->pBatchs) {
    code = terrno;
    goto _error;
  }
  tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);

  basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;

  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
    int32_t* pVgId = tSimpleHashGetKey(p, NULL);
    SArray* pUidList = *(SArray**)p;
    basic.vgId = *pVgId;
    basic.uidList = pUidList;
    basic.tableSeq = false;

    code = tSimpleHashPut(pExc->pBatchs, pVgId, sizeof(*pVgId), &basic, sizeof(basic));
    if (TSDB_CODE_SUCCESS != code) {
      // The current uid list still belongs to pVg; lists moved earlier are
      // released together with pBatchs below.
      goto _error;
    }

    qTrace("build downstreamIdx %d batch scan, vgId:%d, uidNum:%" PRId64, downstreamIdx, *pVgId, (int64_t)taosArrayGetSize(pUidList));
    *(SArray**)p = NULL;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
  (*ppRes)->downstreamIdx = downstreamIdx;
  (*ppRes)->value = pExc;

  return TSDB_CODE_SUCCESS;

_error:
  if (NULL != pExc) {
    if (NULL != pExc->pBatchs) {
      tSimpleHashCleanup(pExc->pBatchs);
    }
    taosMemoryFree(pExc);
  }
  taosMemoryFreeClear(*ppRes);
  return code;
}
315

316

UNCOV
317
/*
 * Builds a GET parameter for a merge join operator with two children.
 *
 * ppRes      - receives the new parameter; NULL on failure.
 * initParam  - stored as initDownstream in the join parameter value.
 * ppChild0/1 - child parameters. On success both are moved into *ppRes and
 *              the caller's pointers are reset to NULL. On failure they are
 *              left untouched and remain owned by the caller.
 *
 * Returns TSDB_CODE_SUCCESS or the terrno of the failing allocation.
 */
static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initParam, SOperatorParam** ppChild0, SOperatorParam** ppChild1) {
  int32_t                      code = TSDB_CODE_SUCCESS;
  SSortMergeJoinOperatorParam* pJoin = NULL;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    return terrno;
  }
  (*ppRes)->pChildren = NULL;

  // Allocate everything that can fail before taking over the children, so a
  // failure never has to free a half-initialized parameter tree.
  pJoin = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam));
  if (NULL == pJoin) {
    code = terrno;
    goto _error;
  }
  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
  if (NULL == (*ppRes)->pChildren) {
    code = terrno;
    goto _error;
  }
  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild0)) {
    code = terrno;
    goto _error;
  }
  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild1)) {
    code = terrno;
    goto _error;
  }

  // Nothing can fail from here on: ownership of both children moves to *ppRes.
  *ppChild0 = NULL;
  *ppChild1 = NULL;

  pJoin->initDownstream = initParam;

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
  (*ppRes)->value = pJoin;

  return TSDB_CODE_SUCCESS;

_error:
  taosMemoryFree(pJoin);
  if (NULL != (*ppRes)->pChildren) {
    // Only the pointer array is released; the children stay with the caller.
    taosArrayDestroy((*ppRes)->pChildren);
  }
  taosMemoryFreeClear(*ppRes);
  return code;
}
361

362

363
/*
 * Builds a NOTIFY parameter for a merge join operator.
 *
 * ppRes     - receives the new parameter; NULL on failure.
 * pChild0/1 - optional child notify parameters (either may be NULL).
 *             Ownership is always taken: on success the non-NULL children are
 *             attached to *ppRes, on failure both are released here.
 *
 * Returns TSDB_CODE_SUCCESS or the terrno of the failing allocation.
 */
static int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) {
  int32_t code = TSDB_CODE_SUCCESS;

  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
  if (NULL == *ppRes) {
    code = terrno;
    goto _error;
  }
  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
  // Check the array that was just created (the previous code re-tested *ppRes
  // here and therefore never noticed a failed taosArrayInit).
  if (NULL == (*ppRes)->pChildren) {
    code = terrno;
    goto _error;
  }
  if (pChild0 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) {
    code = terrno;
    goto _error;
  }
  if (pChild1 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) {
    code = terrno;
    goto _error;
  }

  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
  (*ppRes)->value = NULL;

  return TSDB_CODE_SUCCESS;

_error:
  // *ppRes has no opType/value yet, so it is dismantled by hand: the children
  // are freed exactly once here and only the pointer array is destroyed.
  freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
  freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
  if (NULL != *ppRes) {
    if (NULL != (*ppRes)->pChildren) {
      taosArrayDestroy((*ppRes)->pChildren);
    }
    taosMemoryFreeClear(*ppRes);
  }
  return code;
}
401

402

403

UNCOV
404
/*
 * Builds a table scan GET parameter from a vgId -> SArray* (uid list) hash.
 * The hash must contain exactly one vgroup; otherwise an internal error is
 * returned. After the parameter is built the uid list is destroyed and its
 * slot in pVg is cleared. downstreamIdx is not used by this function.
 */
static int32_t buildBatchTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
  int32_t vgNum = tSimpleHashGetSize(pVg);
  if (1 != vgNum) {
    qError("Invalid vgroup num %d to build table scan operator param", vgNum);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  int32_t iter = 0;
  for (void* pEntry = tSimpleHashIterate(pVg, NULL, &iter); NULL != pEntry; pEntry = tSimpleHashIterate(pVg, pEntry, &iter)) {
    int32_t* pVgId = tSimpleHashGetKey(pEntry, NULL);
    SArray** ppUidList = (SArray**)pEntry;

    int32_t code = buildTableScanOperatorParam(ppRes, *ppUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false);
    if (code) {
      return code;
    }

    taosArrayDestroy(*ppUidList);
    *ppUidList = NULL;
  }

  return TSDB_CODE_SUCCESS;
}
428

429

430
/*
 * Builds a table scan GET parameter for a single table uid (*pUid).
 * A temporary one-element uid list is created for the call to
 * buildTableScanOperatorParam() and destroyed afterwards on every path.
 * downstreamIdx and pVgId are not used by this function.
 *
 * Returns TSDB_CODE_SUCCESS or the error of the failing call.
 */
static int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) {
  SArray* pUidList = taosArrayInit(1, sizeof(int64_t));
  if (NULL == pUidList) {
    return terrno;
  }
  if (NULL == taosArrayPush(pUidList, pUid)) {
    // Save terrno first, then release the list (it used to leak here).
    int32_t pushCode = terrno;
    taosArrayDestroy(pUidList);
    return pushCode;
  }

  int32_t code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, true);
  taosArrayDestroy(pUidList);
  if (code) {
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
447

UNCOV
448
/*
 * Builds the full parameter tree for one step of the sequential super-table
 * join: merge join -> two group caches -> (optional) source params.
 *
 * The table pair is taken from the head list node at its read index. In batch
 * fetch mode the source params are built from the left/right vgroup hashes
 * (only while leftHash is still present) and the hashes are released once both
 * params exist; otherwise one source param per table is built.
 *
 * On failure every parameter still held locally is released and *ppParam is
 * cleared if it had been set.
 */
static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, SOperatorParam** ppParam) {
  SStbJoinTableList* pHead = pPrev->pListHead;
  int64_t            rowIdx = pHead->readIdx;
  int32_t*           leftVg = pHead->pLeftVg + rowIdx;
  int64_t*           leftUid = pHead->pLeftUid + rowIdx;
  int32_t*           rightVg = pHead->pRightVg + rowIdx;
  int64_t*           rightUid = pHead->pRightUid + rowIdx;
  SOperatorParam*    pSrcParam0 = NULL;
  SOperatorParam*    pSrcParam1 = NULL;
  SOperatorParam*    pGcParam0 = NULL;
  SOperatorParam*    pGcParam1 = NULL;
  int32_t            code = TSDB_CODE_SUCCESS;

  qDebug("start %" PRId64 ":%" PRId64 "th stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64, 
      rowIdx, pPrev->tableNum, *leftVg, *leftUid, *rightVg, *rightUid);

  QRY_ERR_RET(updatePostJoinCurrTableInfo(&pInfo->stbJoin));

  if (!pInfo->stbJoin.basic.batchFetch) {
    // Sequential mode: one source parameter per table of the current pair.
    if (pInfo->stbJoin.basic.srcScan[0]) {
      code = buildSingleTableScanOperatorParam(&pSrcParam0, 0, leftVg, leftUid);
    } else {
      code = buildExchangeOperatorParam(&pSrcParam0, 0, leftVg, leftUid);
    }
    if (TSDB_CODE_SUCCESS == code) {
      if (pInfo->stbJoin.basic.srcScan[1]) {
        code = buildSingleTableScanOperatorParam(&pSrcParam1, 1, rightVg, rightUid);
      } else {
        code = buildExchangeOperatorParam(&pSrcParam1, 1, rightVg, rightUid);
      }
    }
  } else if (pPrev->leftHash) {
    // Batch mode: source parameters are built from the per-vgroup hashes.
    if (pInfo->stbJoin.basic.srcScan[0]) {
      code = buildBatchTableScanOperatorParam(&pSrcParam0, 0, pPrev->leftHash);
    } else {
      code = buildBatchExchangeOperatorParam(&pSrcParam0, 0, pPrev->leftHash);
    }
    if (TSDB_CODE_SUCCESS == code) {
      if (pInfo->stbJoin.basic.srcScan[1]) {
        code = buildBatchTableScanOperatorParam(&pSrcParam1, 1, pPrev->rightHash);
      } else {
        code = buildBatchExchangeOperatorParam(&pSrcParam1, 1, pPrev->rightHash);
      }
    }
    if (TSDB_CODE_SUCCESS == code) {
      tSimpleHashCleanup(pPrev->leftHash);
      tSimpleHashCleanup(pPrev->rightHash);
      pPrev->leftHash = NULL;
      pPrev->rightHash = NULL;
    }
  }

  bool initParam = (NULL != pSrcParam0);

  if (TSDB_CODE_SUCCESS == code) {
    code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pPost->leftNeedCache, pSrcParam0);
    // The group cache builder takes over the source param on success and failure alike.
    pSrcParam0 = NULL;
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pPost->rightNeedCache, pSrcParam1);
    pSrcParam1 = NULL;
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = buildMergeJoinOperatorParam(ppParam, initParam, &pGcParam0, &pGcParam1);
  }

  if (TSDB_CODE_SUCCESS == code) {
    return code;
  }

  // Failure: release whatever is still owned by this function.
  if (pSrcParam0) {
    freeOperatorParam(pSrcParam0, OP_GET_PARAM);
  }
  if (pSrcParam1) {
    freeOperatorParam(pSrcParam1, OP_GET_PARAM);
  }
  if (pGcParam0) {
    freeOperatorParam(pGcParam0, OP_GET_PARAM);
  }
  if (pGcParam1) {
    freeOperatorParam(pGcParam1, OP_GET_PARAM);
  }
  if (*ppParam) {
    freeOperatorParam(*ppParam, OP_GET_PARAM);
    *ppParam = NULL;
  }

  return code;
}
518

519

UNCOV
520
/*
 * Starts the post join for the current table pair: builds the parameter tree
 * and asks downstream[1] for its next block with it.
 *
 * A parameter build failure or a block failing blockDataCheck() is stored in
 * the task's code and raised through T_LONG_JMP. A returned block marks the
 * post join as started and updates the post block/row counters.
 */
static void seqJoinLaunchNewRetrieveImpl(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
  SStbJoinPostJoinCtx*       pPost = &pStbJoin->ctx.post;
  SOperatorParam*            pParam = NULL;

  int32_t code = buildSeqStbJoinOperatorParam(pInfo, pPrev, pPost, &pParam);
  if (TSDB_CODE_SUCCESS != code) {
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  qDebug("%s dynamic post task begin", GET_TASKID(pOperator->pTaskInfo));
  SOperatorInfo* pPostOp = pOperator->pDownstream[1];
  code = pPostOp->fpSet.getNextExtFn(pPostOp, pParam, ppRes);

  if (NULL == *ppRes || 0 != code) {
    qDebug("%s Empty join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
    return;
  }

  code = blockDataCheck(*ppRes);
  if (code) {
    qError("Invalid block data, blockDataCheck failed, error:%s", tstrerror(code));
    pOperator->pTaskInfo->code = code;
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  }

  pPost->isStarted = true;
  pStbJoin->execInfo.postBlkNum++;
  pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows;
  qDebug("%s join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
}
×
549

550

551
/*
 * Sends a "cache end" notification for the current left or right table of the
 * post join context to downstream[1]. The notification is a merge join notify
 * parameter wrapping a single group cache notify parameter.
 */
static int32_t notifySeqJoinTableCacheEnd(SOperatorInfo* pOperator, SStbJoinPostJoinCtx* pPost, bool leftTable) {
  SOperatorParam* pGcParam = NULL;
  SOperatorParam* pMergeJoinParam = NULL;
  int32_t         downstreamId;
  int32_t         vgId;
  int64_t         uid;

  if (leftTable) {
    downstreamId = 0;
    vgId = pPost->leftVgId;
    uid = pPost->leftCurrUid;
  } else {
    downstreamId = 1;
    vgId = pPost->rightVgId;
    uid = pPost->rightCurrUid;
  }

  qDebug("notify table %" PRIu64 " in vgId %d downstreamId %d cache end", uid, vgId, downstreamId);

  int32_t code = buildGroupCacheNotifyOperatorParam(&pGcParam, downstreamId, vgId, uid);
  if (TSDB_CODE_SUCCESS == code) {
    code = buildMergeJoinNotifyOperatorParam(&pMergeJoinParam, pGcParam, NULL);
  }
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  return optrDefaultNotifyFn(pOperator->pDownstream[1], pMergeJoinParam);
}
571

UNCOV
572
/*
 * Bookkeeping after the post join of the current table pair has finished.
 * Marks the post join as not started; in batch fetch mode nothing else is done.
 *
 * Sequential mode:
 *  - left table: if it needed caching, its counter in leftCache is decremented;
 *    when it reaches zero the entry is removed and a cache-end notification
 *    is sent.
 *  - right table: if it does not need caching but is still present in
 *    rightCache, the entry is removed and a cache-end notification is sent.
 */
static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCtrlInfo* pStbJoin) {
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
  SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev;
  int32_t              code = 0;

  pPost->isStarted = false;

  if (pStbJoin->basic.batchFetch) {
    return TSDB_CODE_SUCCESS;
  }

  if (pPost->leftNeedCache) {
    uint32_t* pRemain = tSimpleHashGet(pPrev->leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
    if (NULL != pRemain) {
      --(*pRemain);
      if (0 == *pRemain) {
        code = tSimpleHashRemove(pPrev->leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
        if (code) {
          qError("tSimpleHashRemove leftCurrUid %" PRId64 " from leftCache failed, error:%s", pPost->leftCurrUid, tstrerror(code));
          QRY_ERR_RET(code);
        }
        QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, true));
      }
    }
  }

  if (pPost->rightNeedCache) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == tSimpleHashGet(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid))) {
    return TSDB_CODE_SUCCESS;
  }

  code = tSimpleHashRemove(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
  if (code) {
    qError("tSimpleHashRemove rightCurrUid %" PRId64 " from rightCache failed, error:%s", pPost->rightCurrUid, tstrerror(code));
    QRY_ERR_RET(code);
  }
  QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, false));

  return TSDB_CODE_SUCCESS;
}
608

609

610
/*
 * Continues an already started post join by pulling the next block from
 * downstream 1. Does nothing when no post join is in progress.
 *
 * When no block is returned, the current table pair is finished: end-of-pair
 * bookkeeping runs and the head list node's read index advances. Otherwise the
 * post block/row counters are updated.
 */
static FORCE_INLINE int32_t seqJoinContinueCurrRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = &pInfo->stbJoin;

  if (!pStbJoin->ctx.post.isStarted) {
    return TSDB_CODE_SUCCESS;
  }

  qDebug("%s dynQueryCtrl continue to retrieve block from post op", GET_TASKID(pOperator->pTaskInfo));

  *ppRes = getNextBlockFromDownstream(pOperator, 1);
  if (NULL != *ppRes) {
    pStbJoin->execInfo.postBlkNum++;
    pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows;
    return TSDB_CODE_SUCCESS;
  }

  QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, pStbJoin));
  pStbJoin->ctx.prev.pListHead->readIdx++;

  return TSDB_CODE_SUCCESS;
}
632

633
/*
 * Appends pVal (valSize bytes) to the array stored under pKey in pHash.
 * When the key has no entry yet, a new array holding pVal is created and
 * inserted; the hash stores the SArray pointer as its value.
 *
 * Returns TSDB_CODE_SUCCESS, the terrno of a failed array operation, or the
 * error code returned by tSimpleHashPut.
 */
static FORCE_INLINE int32_t addToJoinVgroupHash(SSHashObj* pHash, void* pKey, int32_t keySize, void* pVal, int32_t valSize) {
  int32_t  code = TSDB_CODE_SUCCESS;
  SArray** ppArray = tSimpleHashGet(pHash, pKey, keySize);

  if (NULL != ppArray) {
    if (NULL == taosArrayPush(*ppArray, pVal)) {
      return terrno;
    }
    return TSDB_CODE_SUCCESS;
  }

  SArray* pArray = taosArrayInit(10, valSize);
  if (NULL == pArray) {
    return terrno;
  }
  if (NULL == taosArrayPush(pArray, pVal)) {
    // Read terrno before the cleanup call can touch it.
    code = terrno;
    taosArrayDestroy(pArray);
    return code;
  }

  // Propagate the hash's own error code; returning terrno here could report
  // success (or a stale error) for a failed insertion.
  code = tSimpleHashPut(pHash, pKey, keySize, &pArray, POINTER_BYTES);
  if (TSDB_CODE_SUCCESS != code) {
    taosArrayDestroy(pArray);
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
657

658
/*
 * Counts one more occurrence of the table key in pHash (uint32_t counter per
 * key) and keeps pOnceHash in sync with the keys seen exactly once.
 *
 *  - first occurrence: counter 1 is stored and the key is added to pOnceHash;
 *  - counter 0: left unchanged;
 *  - counter UINT32_MAX: reset to 0;
 *  - otherwise: incremented; when going from 1 to 2 the key is removed from
 *    pOnceHash first.
 */
static FORCE_INLINE int32_t addToJoinTableHash(SSHashObj* pHash, SSHashObj* pOnceHash, void* pKey, int32_t keySize) {
  int32_t   code = TSDB_CODE_SUCCESS;
  uint32_t* pNum = tSimpleHashGet(pHash, pKey, keySize);

  if (NULL == pNum) {
    uint32_t first = 1;
    code = tSimpleHashPut(pHash, pKey, keySize, &first, sizeof(first));
    if (code) {
      return code;
    }
    code = tSimpleHashPut(pOnceHash, pKey, keySize, NULL, 0);
    if (code) {
      return code;
    }
    return TSDB_CODE_SUCCESS;
  }

  if (0 == *pNum) {
    return TSDB_CODE_SUCCESS;
  }

  if (UINT32_MAX == *pNum) {
    *pNum = 0;
    return TSDB_CODE_SUCCESS;
  }

  if (1 == *pNum) {
    // Second occurrence: the table is no longer a "seen once" table.
    code = tSimpleHashRemove(pOnceHash, pKey, keySize);
    if (code) {
      qError("tSimpleHashRemove failed in addToJoinTableHash, error:%s", tstrerror(code));
      QRY_ERR_RET(code);
    }
  }
  (*pNum)++;

  return TSDB_CODE_SUCCESS;
}
694

695

UNCOV
696
/*
 * Frees a single SStbJoinTableList node together with its four vgId/uid
 * arrays. Following nodes (pNext) are not touched. NULL is a no-op.
 */
static void freeStbJoinTableList(SStbJoinTableList* pList) {
  if (NULL != pList) {
    taosMemoryFree(pList->pLeftVg);
    taosMemoryFree(pList->pLeftUid);
    taosMemoryFree(pList->pRightVg);
    taosMemoryFree(pList->pRightUid);
    taosMemoryFree(pList);
  }
}
706

UNCOV
707
/*
 * Appends a new node holding copies of `rows` (leftVg, leftUid, rightVg,
 * rightUid) tuples to the tail of the table list in pCtx. The first node
 * appended becomes both head and tail.
 *
 * Returns TSDB_CODE_SUCCESS or the terrno of the failing allocation, in which
 * case the partially built node is released and the list is left unchanged.
 */
static int32_t appendStbJoinTableList(SStbJoinPrevJoinCtx* pCtx, int64_t rows, int32_t* pLeftVg, int64_t* pLeftUid, int32_t* pRightVg, int64_t* pRightUid) {
  int32_t code = TSDB_CODE_SUCCESS;

  SStbJoinTableList* pNew = taosMemoryCalloc(1, sizeof(SStbJoinTableList));
  if (NULL == pNew) {
    return terrno;
  }

  // The arrays are allocated one after another; evaluation stops at the first
  // failure, and members not yet allocated are still NULL from the calloc.
  if (NULL == (pNew->pLeftVg = taosMemoryMalloc(rows * sizeof(*pLeftVg))) ||
      NULL == (pNew->pLeftUid = taosMemoryMalloc(rows * sizeof(*pLeftUid))) ||
      NULL == (pNew->pRightVg = taosMemoryMalloc(rows * sizeof(*pRightVg))) ||
      NULL == (pNew->pRightUid = taosMemoryMalloc(rows * sizeof(*pRightUid)))) {
    code = terrno;
    freeStbJoinTableList(pNew);
    return code;
  }

  TAOS_MEMCPY(pNew->pLeftVg, pLeftVg, rows * sizeof(*pLeftVg));
  TAOS_MEMCPY(pNew->pLeftUid, pLeftUid, rows * sizeof(*pLeftUid));
  TAOS_MEMCPY(pNew->pRightVg, pRightVg, rows * sizeof(*pRightVg));
  TAOS_MEMCPY(pNew->pRightUid, pRightUid, rows * sizeof(*pRightUid));

  pNew->readIdx = 0;
  pNew->uidNum = rows;
  pNew->pNext = NULL;

  if (NULL == pCtx->pListTail) {
    pCtx->pListHead = pNew;
  } else {
    pCtx->pListTail->pNext = pNew;
  }
  pCtx->pListTail = pNew;

  return TSDB_CODE_SUCCESS;
}
757

UNCOV
758
/*
 * Consume one block produced by the first downstream operator.
 *
 * The block is expected to carry four columns, located through
 * basic.vgSlot[0..1] and basic.uidSlot[0..1]. Every row is registered in the
 * per-side vgroup hashes (batch fetch mode) or in the left table cache /
 * once-table hash (non-batch mode). Afterwards the raw column data is appended
 * to the pending table list and ctx.prev.tableNum is advanced by the row count.
 *
 * Does not return an error code: on any failure the code is stored in the task
 * info and control leaves through T_LONG_JMP.
 */
static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pCtrl->stbJoin;

  SColumnInfoData* pLeftVgCol = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[0]);
  if (NULL == pLeftVgCol) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  SColumnInfoData* pRightVgCol = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[1]);
  if (NULL == pRightVgCol) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  SColumnInfoData* pLeftUidCol = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[0]);
  if (NULL == pLeftUidCol) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  SColumnInfoData* pRightUidCol = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[1]);
  if (NULL == pRightUidCol) {
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  if (!pStbJoin->basic.batchFetch) {
    // Non-batch mode: only the left uid of each row is tracked.
    for (int32_t row = 0; row < pBlock->info.rows; ++row) {
      int64_t* pLUid = (int64_t*)(pLeftUidCol->pData + pLeftUidCol->info.bytes * row);

      code = addToJoinTableHash(pStbJoin->ctx.prev.leftCache, pStbJoin->ctx.prev.onceTable, pLUid, sizeof(*pLUid));
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
    }
  } else {
    // Batch mode: group uids by vgroup id, separately for each join side.
    for (int32_t row = 0; row < pBlock->info.rows; ++row) {
      int32_t* pLVg = (int32_t*)(pLeftVgCol->pData + pLeftVgCol->info.bytes * row);
      int64_t* pLUid = (int64_t*)(pLeftUidCol->pData + pLeftUidCol->info.bytes * row);
      int32_t* pRVg = (int32_t*)(pRightVgCol->pData + pRightVgCol->info.bytes * row);
      int64_t* pRUid = (int64_t*)(pRightUidCol->pData + pRightUidCol->info.bytes * row);

      code = addToJoinVgroupHash(pStbJoin->ctx.prev.leftHash, pLVg, sizeof(*pLVg), pLUid, sizeof(*pLUid));
      if (TSDB_CODE_SUCCESS == code) {
        code = addToJoinVgroupHash(pStbJoin->ctx.prev.rightHash, pRVg, sizeof(*pRVg), pRUid, sizeof(*pRUid));
      }
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
    }
  }

  if (TSDB_CODE_SUCCESS != code) {
    goto _return;
  }

  code = appendStbJoinTableList(&pStbJoin->ctx.prev, pBlock->info.rows, (int32_t*)pLeftVgCol->pData,
                                (int64_t*)pLeftUidCol->pData, (int32_t*)pRightVgCol->pData,
                                (int64_t*)pRightUidCol->pData);
  if (TSDB_CODE_SUCCESS == code) {
    pStbJoin->ctx.prev.tableNum += pBlock->info.rows;
  }

_return:

  if (TSDB_CODE_SUCCESS == code) {
    return;
  }

  // Failure: publish the code on the task and unwind.
  pOperator->pTaskInfo->code = code;
  T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
}
×
820

821

UNCOV
822
/*
 * Finish the left-table cache after all input blocks have been consumed.
 *
 * Nothing is done in batch fetch mode. Otherwise:
 *  - if leftCache and onceTable hold the same number of entries, leftCache is
 *    cleared entirely;
 *  - else every key found in onceTable is removed from leftCache, and the
 *    remaining leftCache size is recorded in execInfo.leftCacheNum.
 *
 * A failed removal is only logged; processing continues with the next key.
 */
static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pCtrl->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;

  if (pStbJoin->basic.batchFetch) {
    return;
  }

  if (tSimpleHashGetSize(pPrev->leftCache) == tSimpleHashGetSize(pPrev->onceTable)) {
    tSimpleHashClear(pPrev->leftCache);
    return;
  }

  int32_t iter = 0;
  for (uint64_t* pUid = tSimpleHashIterate(pPrev->onceTable, NULL, &iter); NULL != pUid;
       pUid = tSimpleHashIterate(pPrev->onceTable, pUid, &iter)) {
    int32_t code = tSimpleHashRemove(pPrev->leftCache, pUid, sizeof(*pUid));
    if (code) {
      qError("tSimpleHashRemove failed in postProcessStbJoinTableHash, error:%s", tstrerror(code));
    }
  }

  pStbJoin->execInfo.leftCacheNum = tSimpleHashGetSize(pPrev->leftCache);
  qDebug("more than 1 ref build table num %" PRId64, (int64_t)tSimpleHashGetSize(pPrev->leftCache));
}
857

UNCOV
858
/*
 * Drain downstream operator 0 and build the join table bookkeeping from it.
 *
 * Each returned block bumps the execInfo block/row counters and is handed to
 * doBuildStbJoinTableHash(). Once the downstream is exhausted the hashes are
 * post-processed and ctx.prev.joinBuild is set so the build runs only once.
 */
static void buildStbJoinTableList(SOperatorInfo* pOperator) {
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pCtrl->stbJoin;
  SSDataBlock*               pBlock = NULL;

  while (NULL != (pBlock = getNextBlockFromDownstream(pOperator, 0))) {
    pStbJoin->execInfo.prevBlkNum++;
    pStbJoin->execInfo.prevBlkRows += pBlock->info.rows;

    doBuildStbJoinTableHash(pOperator, pBlock);
  }

  postProcessStbJoinTableHash(pOperator);

  pStbJoin->ctx.prev.joinBuild = true;
}
×
878

UNCOV
879
/*
 * Walk the pending table list and launch retrievals until one yields a block.
 *
 * Fully consumed nodes (readIdx >= uidNum) are unlinked from the head of the
 * list and freed. For the current head entry a new retrieval is launched; if
 * it produces a block, that block is returned through *ppRes. Otherwise the
 * current retrieval is finalized and the head's read index advances.
 *
 * When the list is exhausted, *ppRes is set to NULL and the operator is marked
 * completed.
 *
 * Returns TSDB_CODE_SUCCESS, or the error reported by
 * handleSeqJoinCurrRetrieveEnd().
 */
static int32_t seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
  SStbJoinTableList*         pNode = pPrev->pListHead;

  while (pNode) {
    if (pNode->readIdx >= pNode->uidNum) {
      pPrev->pListHead = pNode->pNext;
      if (NULL == pPrev->pListHead) {
        // The node being freed was also the tail: do not leave pListTail
        // pointing at released memory (appendStbJoinTableList dereferences it).
        pPrev->pListTail = NULL;
      }
      freeStbJoinTableList(pNode);
      pNode = pPrev->pListHead;
      continue;
    }

    // NOTE(review): any status reported by seqJoinLaunchNewRetrieveImpl is not
    // inspected here; its signature is outside this view - confirm whether a
    // failure code should be propagated.
    seqJoinLaunchNewRetrieveImpl(pOperator, ppRes);
    if (*ppRes) {
      return TSDB_CODE_SUCCESS;
    }

    QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, pStbJoin));
    pPrev->pListHead->readIdx++;
  }

  *ppRes = NULL;
  setOperatorCompleted(pOperator);

  return TSDB_CODE_SUCCESS;
}
907

UNCOV
908
/*
 * Shape a result block so it matches the operator's output block descriptor.
 *
 * A NULL pBlock is accepted and treated as "nothing to do". Otherwise the
 * block id is set from the descriptor, and for every descriptor slot beyond the
 * columns the block already has, a new column of the slot's type is created,
 * sized for pBlock->info.rows and appended to the block.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when
 * pStbJoin, its output descriptor or a slot node is missing, or the error
 * returned while sizing/appending a column.
 */
static int32_t seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) {
  if (NULL == pBlock) {
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == pStbJoin || NULL == pStbJoin->pOutputDataBlockDesc) {
    // pBlock is known to be non-NULL here, so name the pointers actually checked.
    qError("seqStableJoinComposeRes: pStbJoin or pOutputDataBlockDesc is NULL");
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  pBlock->info.id.blockId = pStbJoin->pOutputDataBlockDesc->dataBlockId;
  if (!pBlock->pDataBlock) {
    return TSDB_CODE_SUCCESS;
  }

  for (int i = pBlock->pDataBlock->size; i < pStbJoin->pOutputDataBlockDesc->pSlots->length; i++) {
    SSlotDescNode* pSlot = (SSlotDescNode*)nodesListGetNode(pStbJoin->pOutputDataBlockDesc->pSlots, i);
    if (pSlot == NULL) {
      qError("seqStableJoinComposeRes: pSlot is NULL, i:%d", i);
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }

    SColumnInfoData colInfo = createColumnInfoData(pSlot->dataType.type, pSlot->dataType.bytes, pSlot->slotId);

    // Do not append a column whose buffers could not be allocated.
    // NOTE(review): on either failure below, buffers already attached to
    // colInfo are not released here - confirm whether a column destroy helper
    // should be called before returning.
    int32_t code = colInfoDataEnsureCapacity(&colInfo, pBlock->info.rows, true);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    code = blockDataAppendColInfo(pBlock, &colInfo);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
934

UNCOV
935
/*
 * Produce the next result block of the sequential super-table join.
 *
 * On the first call the table list is built from downstream 0; if that yields
 * no rows the operator is completed right away. Otherwise the current
 * retrieval is continued and, when it has nothing more to return, a new
 * retrieval is launched. A successful result is passed through
 * seqStableJoinComposeRes() before being handed back.
 *
 * The elapsed time of the call is stored in cost.openCost while that field is
 * still zero.
 *
 * Returns TSDB_CODE_SUCCESS or the compose error. Retrieval failures are
 * logged, stored in the task info and raised through T_LONG_JMP.
 */
int32_t seqStableJoin(SOperatorInfo* pOperator, SSDataBlock** pRes) {
  int32_t                    code = TSDB_CODE_SUCCESS;
  SDynQueryCtrlOperatorInfo* pCtrl = pOperator->info;
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pCtrl->stbJoin;

  QRY_PARAM_CHECK(pRes);
  if (OP_EXEC_DONE == pOperator->status) {
    return code;
  }

  int64_t startUs = 0;
  if (pOperator->cost.openCost == 0) {
    startUs = taosGetTimestampUs();
  }

  if (!pStbJoin->ctx.prev.joinBuild) {
    buildStbJoinTableList(pOperator);
    if (pStbJoin->execInfo.prevBlkRows <= 0) {
      // Nothing to join against.
      setOperatorCompleted(pOperator);
      goto _return;
    }
  }

  QRY_ERR_JRET(seqJoinContinueCurrRetrieve(pOperator, pRes));
  if (NULL == *pRes) {
    QRY_ERR_JRET(seqJoinLaunchNewRetrieve(pOperator, pRes));
  }

_return:
  if (pOperator->cost.openCost == 0) {
    pOperator->cost.openCost = (taosGetTimestampUs() - startUs) / 1000.0;
  }

  if (TSDB_CODE_SUCCESS == code) {
    return seqStableJoinComposeRes(pStbJoin, *pRes);
  }

  qError("%s failed since %s", __func__, tstrerror(code));
  pOperator->pTaskInfo->code = code;
  T_LONG_JMP(pOperator->pTaskInfo->env, code);
  return code;
}
979

UNCOV
980
/*
 * Create the hash tables the join build phase needs.
 *
 * Batch fetch mode uses leftHash/rightHash (hash function for
 * TSDB_DATA_TYPE_INT); otherwise leftCache, rightCache and onceTable are
 * created (hash function for TSDB_DATA_TYPE_BIGINT). All tables start with a
 * capacity hint of 20.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno as soon as one table cannot be created.
 * Tables created before the failure stay attached to pPrev.
 */
int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) {
  if (batchFetch) {
    if (NULL == (pPrev->leftHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)))) {
      return terrno;
    }
    if (NULL == (pPrev->rightHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)))) {
      return terrno;
    }
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == (pPrev->leftCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)))) {
    return terrno;
  }
  if (NULL == (pPrev->rightCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)))) {
    return terrno;
  }
  if (NULL == (pPrev->onceTable = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)))) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
1007

UNCOV
1008
int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
×
1009
                                       SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo,
1010
                                       SOperatorInfo** pOptrInfo) {
UNCOV
1011
  QRY_PARAM_CHECK(pOptrInfo);
×
1012

UNCOV
1013
  int32_t                    code = TSDB_CODE_SUCCESS;
×
UNCOV
1014
  __optr_fn_t                nextFp = NULL;
×
UNCOV
1015
  SOperatorInfo*             pOperator = NULL;
×
UNCOV
1016
  SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo));
×
UNCOV
1017
  if (pInfo == NULL) {
×
1018
    code = terrno;
×
1019
    goto _error;
×
1020
  }
1021

UNCOV
1022
  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
×
UNCOV
1023
  if (pOperator == NULL) {
×
1024
    code = terrno;
×
1025
    goto _error;
×
1026
  }
1027

UNCOV
1028
  pTaskInfo->dynamicTask = pPhyciNode->node.dynamicOp;
×
1029

UNCOV
1030
  code = appendDownstream(pOperator, pDownstream, numOfDownstream);
×
UNCOV
1031
  if (TSDB_CODE_SUCCESS != code) {
×
1032
    goto _error;
×
1033
  }
1034

UNCOV
1035
  pInfo->qType = pPhyciNode->qType;
×
UNCOV
1036
  switch (pInfo->qType) {
×
UNCOV
1037
    case DYN_QTYPE_STB_HASH:
×
UNCOV
1038
      TAOS_MEMCPY(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
×
UNCOV
1039
      pInfo->stbJoin.pOutputDataBlockDesc = pPhyciNode->node.pOutputDataBlockDesc;
×
UNCOV
1040
      code = initSeqStbJoinTableHash(&pInfo->stbJoin.ctx.prev, pInfo->stbJoin.basic.batchFetch);
×
UNCOV
1041
      if (TSDB_CODE_SUCCESS != code) {
×
1042
        goto _error;
×
1043
      }
UNCOV
1044
      nextFp = seqStableJoin;
×
UNCOV
1045
      break;
×
1046
    default:
×
1047
      qError("unsupported dynamic query ctrl type: %d", pInfo->qType);
×
1048
      code = TSDB_CODE_INVALID_PARA;
×
1049
      goto _error;
×
1050
  }
1051

UNCOV
1052
  setOperatorInfo(pOperator, "DynQueryCtrlOperator", QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, false, OP_NOT_OPENED,
×
1053
                  pInfo, pTaskInfo);
1054

UNCOV
1055
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, nextFp, NULL, destroyDynQueryCtrlOperator, optrDefaultBufFn,
×
1056
                                         NULL, optrDefaultGetNextExtFn, NULL);
1057

UNCOV
1058
  *pOptrInfo = pOperator;
×
UNCOV
1059
  return TSDB_CODE_SUCCESS;
×
1060

1061
_error:
×
1062
  if (pInfo != NULL) {
×
1063
    destroyDynQueryCtrlOperator(pInfo);
×
1064
  }
1065

1066
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
1067
  pTaskInfo->code = code;
×
1068
  return code;
×
1069
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc