• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3579

12 Jan 2025 03:09AM UTC coverage: 62.976% (-0.2%) from 63.183%
#3579

push

travis-ci

web-flow
Merge pull request #29551 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

139324 of 284527 branches covered (48.97%)

Branch coverage included in aggregate %.

34 of 50 new or added lines in 4 files covered. (68.0%)

1114 existing lines in 141 files now uncovered.

217258 of 281694 relevant lines covered (77.13%)

9262344.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.64
/source/libs/executor/src/dynqueryctrloperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executorInt.h"
17
#include "filter.h"
18
#include "function.h"
19
#include "nodes.h"
20
#include "operator.h"
21
#include "os.h"
22
#include "plannodes.h"
23
#include "query.h"
24
#include "querynodes.h"
25
#include "querytask.h"
26
#include "tarray.h"
27
#include "tcompare.h"
28
#include "tdatablock.h"
29
#include "thash.h"
30
#include "tmsg.h"
31
#include "ttypes.h"
32
#include "dynqueryctrl.h"
33

34
int64_t gSessionId = 0;
35

36
void freeVgTableList(void* ptr) { 
×
37
  taosArrayDestroy(*(SArray**)ptr); 
×
38
}
×
39

40
static void destroyStbJoinTableList(SStbJoinTableList* pListHead) {
11,035✔
41
  SStbJoinTableList* pNext = NULL;
11,035✔
42
  
43
  while (pListHead) {
11,036✔
44
    taosMemoryFree(pListHead->pLeftVg);
1!
45
    taosMemoryFree(pListHead->pLeftUid);
1!
46
    taosMemoryFree(pListHead->pRightVg);
1!
47
    taosMemoryFree(pListHead->pRightUid);
1!
48
    pNext = pListHead->pNext;
1✔
49
    taosMemoryFree(pListHead);
1!
50
    pListHead = pNext;
1✔
51
  }
52
}
11,035✔
53

54
static void destroyStbJoinDynCtrlInfo(SStbJoinDynCtrlInfo* pStbJoin) {
11,035✔
55
  qDebug("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64 ", leftCacheNum:%" PRId64 ", rightCacheNum:%" PRId64, 
11,035✔
56
         pStbJoin->execInfo.prevBlkNum, pStbJoin->execInfo.prevBlkRows, pStbJoin->execInfo.postBlkNum, 
57
         pStbJoin->execInfo.postBlkRows, pStbJoin->execInfo.leftCacheNum, pStbJoin->execInfo.rightCacheNum);
58

59
  if (pStbJoin->basic.batchFetch) {
11,035✔
60
    if (pStbJoin->ctx.prev.leftHash) {
11,023✔
61
      tSimpleHashSetFreeFp(pStbJoin->ctx.prev.leftHash, freeVgTableList);
10,720✔
62
      tSimpleHashCleanup(pStbJoin->ctx.prev.leftHash);
10,720✔
63
    }
64
    if (pStbJoin->ctx.prev.rightHash) {
11,023✔
65
      tSimpleHashSetFreeFp(pStbJoin->ctx.prev.rightHash, freeVgTableList);
10,720✔
66
      tSimpleHashCleanup(pStbJoin->ctx.prev.rightHash);
10,720✔
67
    }
68
  } else {
69
    if (pStbJoin->ctx.prev.leftCache) {
12!
70
      tSimpleHashCleanup(pStbJoin->ctx.prev.leftCache);
12✔
71
    }
72
    if (pStbJoin->ctx.prev.rightCache) {
12!
73
      tSimpleHashCleanup(pStbJoin->ctx.prev.rightCache);
12✔
74
    }
75
    if (pStbJoin->ctx.prev.onceTable) {
12!
76
      tSimpleHashCleanup(pStbJoin->ctx.prev.onceTable);
12✔
77
    }
78
  }
79

80
  destroyStbJoinTableList(pStbJoin->ctx.prev.pListHead);
11,035✔
81
}
11,035✔
82

83
static void destroyDynQueryCtrlOperator(void* param) {
11,035✔
84
  SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param;
11,035✔
85

86
  switch (pDyn->qType) {
11,035!
87
    case DYN_QTYPE_STB_HASH:
11,035✔
88
      destroyStbJoinDynCtrlInfo(&pDyn->stbJoin);
11,035✔
89
      break;
11,035✔
90
    default:
×
91
      qError("unsupported dynamic query ctrl type: %d", pDyn->qType);
×
92
      break;
×
93
  }
94

95
  taosMemoryFreeClear(param);
11,035!
96
}
11,035✔
97

98
static FORCE_INLINE bool tableNeedCache(int64_t uid, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, bool rightTable, bool batchFetch) {
99
  if (batchFetch) {
42,082✔
100
    return true;
42,034✔
101
  }
102
  
103
  if (rightTable) {
48!
104
    return pPost->rightCurrUid == pPost->rightNextUid;
24✔
105
  }
106

107
  uint32_t* num = tSimpleHashGet(pPrev->leftCache, &uid, sizeof(uid));
24✔
108

109
  return (NULL == num) ? false : true;
24✔
110
}
111

112
static int32_t updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo*          pStbJoin) {
21,041✔
113
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
21,041✔
114
  SStbJoinPostJoinCtx*       pPost = &pStbJoin->ctx.post;
21,041✔
115
  SStbJoinTableList*         pNode = pPrev->pListHead;
21,041✔
116
  int32_t*                   leftVgId = pNode->pLeftVg + pNode->readIdx;
21,041✔
117
  int32_t*                   rightVgId = pNode->pRightVg + pNode->readIdx;
21,041✔
118
  int64_t*                   leftUid = pNode->pLeftUid + pNode->readIdx;
21,041✔
119
  int64_t*                   rightUid = pNode->pRightUid + pNode->readIdx;
21,041✔
120
  int64_t                    readIdx = pNode->readIdx + 1;
21,041✔
121
  int64_t                    rightPrevUid = pPost->rightCurrUid;
21,041✔
122

123
  pPost->leftCurrUid = *leftUid;
21,041✔
124
  pPost->rightCurrUid = *rightUid;
21,041✔
125

126
  pPost->leftVgId = *leftVgId;
21,041✔
127
  pPost->rightVgId = *rightVgId;
21,041✔
128

129
  while (true) {
130
    if (readIdx < pNode->uidNum) {
21,041✔
131
      pPost->rightNextUid = *(pNode->pRightUid + readIdx);
20,727✔
132
      break;
20,727✔
133
    }
134
    
135
    pNode = pNode->pNext;
314✔
136
    if (NULL == pNode) {
314!
137
      pPost->rightNextUid = 0;
314✔
138
      break;
314✔
139
    }
140
    
141
    rightUid = pNode->pRightUid;
×
142
    readIdx = 0;
×
143
  }
144

145
  pPost->leftNeedCache = tableNeedCache(*leftUid, pPrev, pPost, false, pStbJoin->basic.batchFetch);
21,041✔
146
  pPost->rightNeedCache = tableNeedCache(*rightUid, pPrev, pPost, true, pStbJoin->basic.batchFetch);
21,041✔
147

148
  if (!pStbJoin->basic.batchFetch && pPost->rightNeedCache && rightPrevUid != pPost->rightCurrUid) {
21,041!
149
    QRY_ERR_RET(tSimpleHashPut(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid), NULL, 0));
×
150
    pStbJoin->execInfo.rightCacheNum++;
×
151
  }  
152

153
  return TSDB_CODE_SUCCESS;
21,041✔
154
}
155

156

157
static int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, bool needCache, SOperatorParam* pChild) {
42,082✔
158
  int32_t code = TSDB_CODE_SUCCESS;
42,082✔
159
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
42,082!
160
  if (NULL == *ppRes) {
42,082!
161
    code = terrno;
×
162
    freeOperatorParam(pChild, OP_GET_PARAM);
×
163
    return code;
×
164
  }
165
  if (pChild) {
42,082✔
166
    (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES);
654✔
167
    if (NULL == (*ppRes)->pChildren) {
654!
168
      code = terrno;
×
169
      freeOperatorParam(pChild, OP_GET_PARAM);
×
170
      freeOperatorParam(*ppRes, OP_GET_PARAM);
×
171
      *ppRes = NULL;
×
172
      return code;
×
173
    }
174
    if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild)) {
1,308!
175
      code = terrno;
×
176
      freeOperatorParam(pChild, OP_GET_PARAM);
×
177
      freeOperatorParam(*ppRes, OP_GET_PARAM);
×
178
      *ppRes = NULL;
×
179
      return code;
×
180
    }
181
  } else {
182
    (*ppRes)->pChildren = NULL;
41,428✔
183
  }
184

185
  SGcOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcOperatorParam));
42,082!
186
  if (NULL == pGc) {
42,082!
187
    code = terrno;
×
188
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
189
    *ppRes = NULL;
×
190
    return code;
×
191
  }
192

193
  pGc->sessionId = atomic_add_fetch_64(&gSessionId, 1);
42,082✔
194
  pGc->downstreamIdx = downstreamIdx;
42,082✔
195
  pGc->vgId = vgId;
42,082✔
196
  pGc->tbUid = tbUid;
42,082✔
197
  pGc->needCache = needCache;
42,082✔
198

199
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
42,082✔
200
  (*ppRes)->downstreamIdx = downstreamIdx;
42,082✔
201
  (*ppRes)->value = pGc;
42,082✔
202

203
  return TSDB_CODE_SUCCESS;
42,082✔
204
}
205

206

207
static int32_t buildGroupCacheNotifyOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid) {
×
208
  int32_t code = TSDB_CODE_SUCCESS;
×
209
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
×
210
  if (NULL == *ppRes) {
×
211
    return terrno;
×
212
  }
213
  (*ppRes)->pChildren = NULL;
×
214

215
  SGcNotifyOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcNotifyOperatorParam));
×
216
  if (NULL == pGc) {
×
217
    code = terrno;
×
218
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
×
219
    return code;
×
220
  }
221

222
  pGc->downstreamIdx = downstreamIdx;
×
223
  pGc->vgId = vgId;
×
224
  pGc->tbUid = tbUid;
×
225

226
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE;
×
227
  (*ppRes)->downstreamIdx = downstreamIdx;
×
228
  (*ppRes)->value = pGc;
×
229

230
  return TSDB_CODE_SUCCESS;
×
231
}
232

233

234
static int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) {
48✔
235
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
48!
236
  if (NULL == *ppRes) {
48!
237
    return terrno;
×
238
  }
239
  (*ppRes)->pChildren = NULL;
48✔
240
  
241
  SExchangeOperatorParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam));
48!
242
  if (NULL == pExc) {
48!
243
    return terrno;
×
244
  }
245

246
  pExc->multiParams = false;
48✔
247
  pExc->basic.vgId = *pVgId;
48✔
248
  pExc->basic.tableSeq = true;
48✔
249
  pExc->basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
48✔
250
  pExc->basic.uidList = taosArrayInit(1, sizeof(int64_t));
48✔
251
  if (NULL == pExc->basic.uidList) {
48!
252
    taosMemoryFree(pExc);
×
253
    return terrno;
×
254
  }
255
  if (NULL == taosArrayPush(pExc->basic.uidList, pUid)) {
96!
256
    taosArrayDestroy(pExc->basic.uidList);
×
257
    taosMemoryFree(pExc);
×
258
    return terrno;
×
259
  }
260

261
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
48✔
262
  (*ppRes)->downstreamIdx = downstreamIdx;
48✔
263
  (*ppRes)->value = pExc;
48✔
264
  
265
  return TSDB_CODE_SUCCESS;
48✔
266
}
267

268

269
static int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
557✔
270
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
557!
271
  if (NULL == *ppRes) {
557!
272
    return terrno;
×
273
  }
274
  (*ppRes)->pChildren = NULL;
557✔
275
  
276
  SExchangeOperatorBatchParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorBatchParam));
557!
277
  if (NULL == pExc) {
557!
278
    taosMemoryFreeClear(*ppRes);
×
279
    return terrno;
×
280
  }
281

282
  pExc->multiParams = true;
557✔
283
  pExc->pBatchs = tSimpleHashInit(tSimpleHashGetSize(pVg), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
557✔
284
  if (NULL == pExc->pBatchs) {
557!
285
    taosMemoryFree(pExc);
×
286
    taosMemoryFreeClear(*ppRes);
×
287
    return terrno;
×
288
  }
289
  tSimpleHashSetFreeFp(pExc->pBatchs, freeExchangeGetBasicOperatorParam);
557✔
290
  
291
  SExchangeOperatorBasicParam basic;
292
  basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
557✔
293

294
  int32_t iter = 0;
557✔
295
  void* p = NULL;
557✔
296
  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
1,483✔
297
    int32_t* pVgId = tSimpleHashGetKey(p, NULL);
926✔
298
    SArray* pUidList = *(SArray**)p;
926✔
299
    basic.vgId = *pVgId;
926✔
300
    basic.uidList = pUidList;
926✔
301
    basic.tableSeq = false;
926✔
302
    
303
    QRY_ERR_RET(tSimpleHashPut(pExc->pBatchs, pVgId, sizeof(*pVgId), &basic, sizeof(basic)));   
926!
304

305
    qTrace("build downstreamIdx %d batch scan, vgId:%d, uidNum:%" PRId64, downstreamIdx, *pVgId, (int64_t)taosArrayGetSize(pUidList));
926!
306
    *(SArray**)p = NULL;
926✔
307
  }
308

309
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
557✔
310
  (*ppRes)->downstreamIdx = downstreamIdx;
557✔
311
  (*ppRes)->value = pExc;
557✔
312
  
313
  return TSDB_CODE_SUCCESS;
557✔
314
}
315

316

317
static int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, bool initParam, SOperatorParam** ppChild0, SOperatorParam** ppChild1) {
21,041✔
318
  int32_t code = TSDB_CODE_SUCCESS;
21,041✔
319
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
21,041!
320
  if (NULL == *ppRes) {
21,041!
321
    code = terrno;
×
322
    return code;
×
323
  }
324
  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
21,041✔
325
  if (NULL == (*ppRes)->pChildren) {
21,041!
326
    code = terrno;
×
327
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
328
    *ppRes = NULL;
×
329
    return code;
×
330
  }
331
  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild0)) {
42,082!
332
    code = terrno;
×
333
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
334
    *ppRes = NULL;
×
335
    return code;
×
336
  }
337
  *ppChild0 = NULL;
21,041✔
338
  if (NULL == taosArrayPush((*ppRes)->pChildren, ppChild1)) {
42,082!
339
    code = terrno;
×
340
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
341
    *ppRes = NULL;
×
342
    return code;
×
343
  }
344
  *ppChild1 = NULL;
21,041✔
345
  
346
  SSortMergeJoinOperatorParam* pJoin = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam));
21,041!
347
  if (NULL == pJoin) {
21,041!
348
    code = terrno;
×
349
    freeOperatorParam(*ppRes, OP_GET_PARAM);
×
350
    *ppRes = NULL;
×
351
    return code;
×
352
  }
353

354
  pJoin->initDownstream = initParam;
21,041✔
355
  
356
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
21,041✔
357
  (*ppRes)->value = pJoin;
21,041✔
358

359
  return TSDB_CODE_SUCCESS;
21,041✔
360
}
361

362

363
static int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) {
×
364
  int32_t code = TSDB_CODE_SUCCESS;
×
365
  *ppRes = taosMemoryMalloc(sizeof(SOperatorParam));
×
366
  if (NULL == *ppRes) {
×
367
    code = terrno;
×
368
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
×
369
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
370
    return code;
×
371
  }
372
  (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES);
×
373
  if (NULL == *ppRes) {
×
374
    code = terrno;
×
375
    taosMemoryFreeClear(*ppRes);
×
376
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
×
377
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
378
    return code;
×
379
  }
380
  if (pChild0 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) {
×
381
    code = terrno;
×
382
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
×
383
    freeOperatorParam(pChild0, OP_NOTIFY_PARAM);
×
384
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
385
    *ppRes = NULL;
×
386
    return code;
×
387
  }
388
  if (pChild1 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) {
×
389
    code = terrno;
×
390
    freeOperatorParam(*ppRes, OP_NOTIFY_PARAM);
×
391
    freeOperatorParam(pChild1, OP_NOTIFY_PARAM);
×
392
    *ppRes = NULL;
×
393
    return code;
×
394
  }
395
  
396
  (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
×
397
  (*ppRes)->value = NULL;
×
398

399
  return TSDB_CODE_SUCCESS;
×
400
}
401

402

403

404
static int32_t buildBatchTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) {
49✔
405
  int32_t code = TSDB_CODE_SUCCESS;
49✔
406
  int32_t vgNum = tSimpleHashGetSize(pVg);
49✔
407
  if (vgNum <= 0 || vgNum > 1) {
49!
408
    qError("Invalid vgroup num %d to build table scan operator param", vgNum);
×
409
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
410
  }
411

412
  int32_t iter = 0;
49✔
413
  void* p = NULL;
49✔
414
  while (NULL != (p = tSimpleHashIterate(pVg, p, &iter))) {
98✔
415
    int32_t* pVgId = tSimpleHashGetKey(p, NULL);
49✔
416
    SArray* pUidList = *(SArray**)p;
49✔
417

418
    code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false);
49✔
419
    if (code) {
49!
420
      return code;
×
421
    }
422
    taosArrayDestroy(pUidList);
49✔
423
    *(SArray**)p = NULL;
49✔
424
  }
425
  
426
  return TSDB_CODE_SUCCESS;
49✔
427
}
428

429

430
static int32_t buildSingleTableScanOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) {
×
431
  SArray* pUidList = taosArrayInit(1, sizeof(int64_t));
×
432
  if (NULL == pUidList) {
×
433
    return terrno;
×
434
  }
435
  if (NULL == taosArrayPush(pUidList, pUid)) {
×
436
    return terrno;
×
437
  }
438

439
  int32_t code = buildTableScanOperatorParam(ppRes, pUidList, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, true);
×
440
  taosArrayDestroy(pUidList);
×
441
  if (code) {
×
442
    return code;
×
443
  }
444
  
445
  return TSDB_CODE_SUCCESS;
×
446
}
447

448
static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, SOperatorParam** ppParam) {
21,041✔
449
  int64_t                     rowIdx = pPrev->pListHead->readIdx;
21,041✔
450
  SOperatorParam*             pSrcParam0 = NULL;
21,041✔
451
  SOperatorParam*             pSrcParam1 = NULL;
21,041✔
452
  SOperatorParam*             pGcParam0 = NULL;
21,041✔
453
  SOperatorParam*             pGcParam1 = NULL;  
21,041✔
454
  int32_t*                    leftVg = pPrev->pListHead->pLeftVg + rowIdx;
21,041✔
455
  int64_t*                    leftUid = pPrev->pListHead->pLeftUid + rowIdx;
21,041✔
456
  int32_t*                    rightVg = pPrev->pListHead->pRightVg + rowIdx;
21,041✔
457
  int64_t*                    rightUid = pPrev->pListHead->pRightUid + rowIdx;
21,041✔
458
  int32_t                     code = TSDB_CODE_SUCCESS;
21,041✔
459

460
  qDebug("start %" PRId64 ":%" PRId64 "th stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64, 
21,041✔
461
      rowIdx, pPrev->tableNum, *leftVg, *leftUid, *rightVg, *rightUid);
462

463
  QRY_ERR_RET(updatePostJoinCurrTableInfo(&pInfo->stbJoin));
21,041!
464
  
465
  if (pInfo->stbJoin.basic.batchFetch) {
21,041✔
466
    if (pPrev->leftHash) {
21,017✔
467
      code = pInfo->stbJoin.basic.srcScan[0] ? buildBatchTableScanOperatorParam(&pSrcParam0, 0, pPrev->leftHash) : buildBatchExchangeOperatorParam(&pSrcParam0, 0, pPrev->leftHash);
303✔
468
      if (TSDB_CODE_SUCCESS == code) {
303!
469
        code = pInfo->stbJoin.basic.srcScan[1] ? buildBatchTableScanOperatorParam(&pSrcParam1, 1, pPrev->rightHash) : buildBatchExchangeOperatorParam(&pSrcParam1, 1, pPrev->rightHash);
303✔
470
      }
471
      if (TSDB_CODE_SUCCESS == code) {
303!
472
        tSimpleHashCleanup(pPrev->leftHash);
303✔
473
        tSimpleHashCleanup(pPrev->rightHash);
303✔
474
        pPrev->leftHash = NULL;
303✔
475
        pPrev->rightHash = NULL;
303✔
476
      }
477
    }
478
  } else {
479
    code = pInfo->stbJoin.basic.srcScan[0] ? buildSingleTableScanOperatorParam(&pSrcParam0, 0, leftVg, leftUid) : buildExchangeOperatorParam(&pSrcParam0, 0, leftVg, leftUid);
24!
480
    if (TSDB_CODE_SUCCESS == code) {
24!
481
      code = pInfo->stbJoin.basic.srcScan[1] ? buildSingleTableScanOperatorParam(&pSrcParam1, 1, rightVg, rightUid) : buildExchangeOperatorParam(&pSrcParam1, 1, rightVg, rightUid);
24!
482
    }
483
  }
484

485
  bool initParam = pSrcParam0 ? true : false;
21,041✔
486
  if (TSDB_CODE_SUCCESS == code) {
21,041!
487
    code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pPost->leftNeedCache, pSrcParam0);
21,041✔
488
    pSrcParam0 = NULL;
21,041✔
489
  }
490
  if (TSDB_CODE_SUCCESS == code) {
21,041!
491
    code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pPost->rightNeedCache, pSrcParam1);
21,041✔
492
    pSrcParam1 = NULL;
21,041✔
493
  }
494
  if (TSDB_CODE_SUCCESS == code) {
21,041!
495
    code = buildMergeJoinOperatorParam(ppParam, initParam, &pGcParam0, &pGcParam1);
21,041✔
496
  }
497
  if (TSDB_CODE_SUCCESS != code) {
21,041!
498
    if (pSrcParam0) {
×
499
      freeOperatorParam(pSrcParam0, OP_GET_PARAM);
×
500
    }
501
    if (pSrcParam1) {
×
502
      freeOperatorParam(pSrcParam1, OP_GET_PARAM);
×
503
    }
504
    if (pGcParam0) {
×
505
      freeOperatorParam(pGcParam0, OP_GET_PARAM);
×
506
    }
507
    if (pGcParam1) {
×
508
      freeOperatorParam(pGcParam1, OP_GET_PARAM);
×
509
    }
510
    if (*ppParam) {
×
511
      freeOperatorParam(*ppParam, OP_GET_PARAM);
×
512
      *ppParam = NULL;
×
513
    }
514
  }
515
  
516
  return code;
21,041✔
517
}
518

519

520
static void seqJoinLaunchNewRetrieveImpl(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
21,041✔
521
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
21,041✔
522
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
21,041✔
523
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
21,041✔
524
  SStbJoinPostJoinCtx*       pPost = &pStbJoin->ctx.post;
21,041✔
525
  SOperatorParam*            pParam = NULL;
21,041✔
526
  int32_t                    code  = buildSeqStbJoinOperatorParam(pInfo, pPrev, pPost, &pParam);
21,041✔
527
  if (TSDB_CODE_SUCCESS != code) {
21,041!
528
    pOperator->pTaskInfo->code = code;
×
529
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
×
530
  }
531

532
  qDebug("%s dynamic post task begin", GET_TASKID(pOperator->pTaskInfo));
21,041✔
533
  code = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam, ppRes);
21,041✔
534
  if (*ppRes && (code == 0)) {
21,041!
535
    code = blockDataCheck(*ppRes);
734✔
536
    if (code) {
734!
537
      qError("Invalid block data, blockDataCheck failed, error:%s", tstrerror(code));
×
538
      pOperator->pTaskInfo->code = code;
×
539
      T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
×
540
    }
541
    pPost->isStarted = true;
734✔
542
    pStbJoin->execInfo.postBlkNum++;
734✔
543
    pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows;
734✔
544
    qDebug("%s join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
734✔
545
  } else {
546
    qDebug("%s Empty join res block retrieved", GET_TASKID(pOperator->pTaskInfo));
20,307✔
547
  }
548
}
21,041✔
549

550

551
static int32_t notifySeqJoinTableCacheEnd(SOperatorInfo* pOperator, SStbJoinPostJoinCtx* pPost, bool leftTable) {
×
552
  SOperatorParam* pGcParam = NULL;
×
553
  SOperatorParam* pMergeJoinParam = NULL;
×
554
  int32_t         downstreamId = leftTable ? 0 : 1;
×
555
  int32_t         vgId = leftTable ? pPost->leftVgId : pPost->rightVgId;
×
556
  int64_t         uid = leftTable ? pPost->leftCurrUid : pPost->rightCurrUid;
×
557

558
  qDebug("notify table %" PRIu64 " in vgId %d downstreamId %d cache end", uid, vgId, downstreamId);
×
559

560
  int32_t code = buildGroupCacheNotifyOperatorParam(&pGcParam, downstreamId, vgId, uid);
×
561
  if (TSDB_CODE_SUCCESS != code) {
×
562
    return code;
×
563
  }
564
  code = buildMergeJoinNotifyOperatorParam(&pMergeJoinParam, pGcParam, NULL);
×
565
  if (TSDB_CODE_SUCCESS != code) {
×
566
    return code;
×
567
  }
568

569
  return optrDefaultNotifyFn(pOperator->pDownstream[1], pMergeJoinParam);
×
570
}
571

572
static int32_t handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCtrlInfo*          pStbJoin) {
21,040✔
573
  SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post;
21,040✔
574
  int32_t code = 0;
21,040✔
575
  
576
  pPost->isStarted = false;
21,040✔
577
  
578
  if (pStbJoin->basic.batchFetch) {
21,040✔
579
    return TSDB_CODE_SUCCESS;
21,016✔
580
  }
581
  
582
  if (pPost->leftNeedCache) {
24!
583
    uint32_t* num = tSimpleHashGet(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
×
584
    if (num && --(*num) <= 0) {
×
585
      code = tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid));
×
586
      if (code) {
×
587
        qError("tSimpleHashRemove leftCurrUid %" PRId64 " from leftCache failed, error:%s", pPost->leftCurrUid, tstrerror(code));
×
588
        QRY_ERR_RET(code);
×
589
      }
590
      QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, true));
×
591
    }
592
  }
593
  
594
  if (!pPost->rightNeedCache) {
24!
595
    void* v = tSimpleHashGet(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
24✔
596
    if (NULL != v) {
24!
597
      code = tSimpleHashRemove(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid));
×
598
      if (code) {
×
599
        qError("tSimpleHashRemove rightCurrUid %" PRId64 " from rightCache failed, error:%s", pPost->rightCurrUid, tstrerror(code));
×
600
        QRY_ERR_RET(code);
×
601
      }
602
      QRY_ERR_RET(notifySeqJoinTableCacheEnd(pOperator, pPost, false));
×
603
    }
604
  }
605

606
  return TSDB_CODE_SUCCESS;
24✔
607
}
608

609

610
static FORCE_INLINE int32_t seqJoinContinueCurrRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
611
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
1,048✔
612
  SStbJoinPostJoinCtx*       pPost = &pInfo->stbJoin.ctx.post;
1,048✔
613
  SStbJoinPrevJoinCtx*       pPrev = &pInfo->stbJoin.ctx.prev;
1,048✔
614

615
  if (!pPost->isStarted) {
1,048✔
616
    return TSDB_CODE_SUCCESS;
315✔
617
  }
618
  
619
  qDebug("%s dynQueryCtrl continue to retrieve block from post op", GET_TASKID(pOperator->pTaskInfo));
733✔
620
  
621
  *ppRes = getNextBlockFromDownstream(pOperator, 1);
733✔
622
  if (NULL == *ppRes) {
733!
623
    QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, &pInfo->stbJoin));
733!
624
    pPrev->pListHead->readIdx++;
733✔
625
  } else {
626
    pInfo->stbJoin.execInfo.postBlkNum++;
×
627
    pInfo->stbJoin.execInfo.postBlkRows += (*ppRes)->info.rows;
×
628
  }
629

630
  return TSDB_CODE_SUCCESS;
733✔
631
}
632

633
static FORCE_INLINE int32_t addToJoinVgroupHash(SSHashObj* pHash, void* pKey, int32_t keySize, void* pVal, int32_t valSize) {
634
  SArray** ppArray = tSimpleHashGet(pHash, pKey, keySize);
42,042✔
635
  if (NULL == ppArray) {
42,042✔
636
    SArray* pArray = taosArrayInit(10, valSize);
975✔
637
    if (NULL == pArray) {
975!
638
      return terrno;
×
639
    }
640
    if (NULL == taosArrayPush(pArray, pVal)) {
1,950!
641
      taosArrayDestroy(pArray);
×
642
      return terrno;
×
643
    }
644
    if (tSimpleHashPut(pHash, pKey, keySize, &pArray, POINTER_BYTES)) {
975!
645
      taosArrayDestroy(pArray);      
×
646
      return terrno;
×
647
    }
648
    return TSDB_CODE_SUCCESS;
975✔
649
  }
650

651
  if (NULL == taosArrayPush(*ppArray, pVal)) {
82,134!
652
    return terrno;
×
653
  }
654
  
655
  return TSDB_CODE_SUCCESS;
41,067✔
656
}
657

658
static FORCE_INLINE int32_t addToJoinTableHash(SSHashObj* pHash, SSHashObj* pOnceHash, void* pKey, int32_t keySize) {
659
  int32_t code = TSDB_CODE_SUCCESS;
24✔
660
  uint32_t* pNum = tSimpleHashGet(pHash, pKey, keySize);
24✔
661
  if (NULL == pNum) {
24!
662
    uint32_t n = 1;
24✔
663
    code = tSimpleHashPut(pHash, pKey, keySize, &n, sizeof(n));
24✔
664
    if (code) {
24!
665
      return code;
×
666
    }
667
    code = tSimpleHashPut(pOnceHash, pKey, keySize, NULL, 0);
24✔
668
    if (code) {
24!
669
      return code;
×
670
    }
671
    return TSDB_CODE_SUCCESS;
24✔
672
  }
673

674
  switch (*pNum) {
×
675
    case 0:
×
676
      break;
×
677
    case UINT32_MAX:
×
678
      *pNum = 0;
×
679
      break;
×
680
    default:
×
681
      if (1 == (*pNum)) {
×
682
        code = tSimpleHashRemove(pOnceHash, pKey, keySize);
×
683
        if (code) {
×
684
          qError("tSimpleHashRemove failed in addToJoinTableHash, error:%s", tstrerror(code));
×
685
          QRY_ERR_RET(code);
×
686
        }
687
      }
688
      (*pNum)++;
×
689
      break;
×
690
  }
691
  
692
  return TSDB_CODE_SUCCESS;
×
693
}
694

695

696
static void freeStbJoinTableList(SStbJoinTableList* pList) {
314✔
697
  if (NULL == pList) {
314!
698
    return;
×
699
  }
700
  taosMemoryFree(pList->pLeftVg);
314!
701
  taosMemoryFree(pList->pLeftUid);
314!
702
  taosMemoryFree(pList->pRightVg);
314!
703
  taosMemoryFree(pList->pRightUid);
314!
704
  taosMemoryFree(pList);
314!
705
}
706

707
static int32_t appendStbJoinTableList(SStbJoinPrevJoinCtx* pCtx, int64_t rows, int32_t* pLeftVg, int64_t* pLeftUid, int32_t* pRightVg, int64_t* pRightUid) {
315✔
708
  int32_t code = TSDB_CODE_SUCCESS;
315✔
709
  SStbJoinTableList* pNew = taosMemoryCalloc(1, sizeof(SStbJoinTableList));
315!
710
  if (NULL == pNew) {
315!
711
    return terrno;
×
712
  }
713
  pNew->pLeftVg = taosMemoryMalloc(rows * sizeof(*pLeftVg));
315!
714
  if (NULL == pNew->pLeftVg) {
315!
715
    code = terrno;
×
716
    freeStbJoinTableList(pNew);
×
717
    return code;
×
718
  }
719
  pNew->pLeftUid = taosMemoryMalloc(rows * sizeof(*pLeftUid));
315!
720
  if (NULL == pNew->pLeftUid) {
315!
721
    code = terrno;
×
722
    freeStbJoinTableList(pNew);
×
723
    return code;
×
724
  }
725
  pNew->pRightVg = taosMemoryMalloc(rows * sizeof(*pRightVg));
315!
726
  if (NULL == pNew->pRightVg) {
315!
727
    code = terrno;
×
728
    freeStbJoinTableList(pNew);
×
729
    return code;
×
730
  }
731
  pNew->pRightUid = taosMemoryMalloc(rows * sizeof(*pRightUid));
315!
732
  if (NULL == pNew->pRightUid) {
315!
733
    code = terrno;
×
734
    freeStbJoinTableList(pNew);
×
735
    return code;
×
736
  }
737

738
  TAOS_MEMCPY(pNew->pLeftVg, pLeftVg, rows * sizeof(*pLeftVg));
315✔
739
  TAOS_MEMCPY(pNew->pLeftUid, pLeftUid, rows * sizeof(*pLeftUid));
315✔
740
  TAOS_MEMCPY(pNew->pRightVg, pRightVg, rows * sizeof(*pRightVg));
315✔
741
  TAOS_MEMCPY(pNew->pRightUid, pRightUid, rows * sizeof(*pRightUid));
315✔
742

743
  pNew->readIdx = 0;
315✔
744
  pNew->uidNum = rows;
315✔
745
  pNew->pNext = NULL;
315✔
746
  
747
  if (pCtx->pListTail) {
315!
748
    pCtx->pListTail->pNext = pNew;
×
749
    pCtx->pListTail = pNew;
×
750
  } else {
751
    pCtx->pListHead = pNew;
315✔
752
    pCtx->pListTail= pNew;
315✔
753
  }
754

755
  return TSDB_CODE_SUCCESS;
315✔
756
}
757

758
static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
315✔
759
  int32_t                    code = TSDB_CODE_SUCCESS;
315✔
760
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
315✔
761
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
315✔
762
  SColumnInfoData*           pVg0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[0]);
315✔
763
  if (NULL == pVg0) {
315!
764
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
765
  }
766
  SColumnInfoData*           pVg1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[1]);
315✔
767
  if (NULL == pVg1) {
315!
768
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
769
  }
770
  SColumnInfoData*           pUid0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[0]);
315✔
771
  if (NULL == pUid0) {
315!
772
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
773
  }
774
  SColumnInfoData*           pUid1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[1]);
315✔
775
  if (NULL == pUid1) {
315!
776
    QRY_ERR_JRET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
777
  }
778

779
  if (pStbJoin->basic.batchFetch) {
315✔
780
    for (int32_t i = 0; i < pBlock->info.rows; ++i) {
21,324✔
781
      int32_t* leftVg = (int32_t*)(pVg0->pData + pVg0->info.bytes * i);
21,021✔
782
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i);
21,021✔
783
      int32_t* rightVg = (int32_t*)(pVg1->pData + pVg1->info.bytes * i);
21,021✔
784
      int64_t* rightUid = (int64_t*)(pUid1->pData + pUid1->info.bytes * i);
21,021✔
785

786
      code = addToJoinVgroupHash(pStbJoin->ctx.prev.leftHash, leftVg, sizeof(*leftVg), leftUid, sizeof(*leftUid));
21,021✔
787
      if (TSDB_CODE_SUCCESS != code) {
21,021!
788
        break;
×
789
      }
790
      code = addToJoinVgroupHash(pStbJoin->ctx.prev.rightHash, rightVg, sizeof(*rightVg), rightUid, sizeof(*rightUid));
21,021✔
791
      if (TSDB_CODE_SUCCESS != code) {
21,021!
792
        break;
×
793
      }
794
    }
795
  } else {
796
    for (int32_t i = 0; i < pBlock->info.rows; ++i) {
36✔
797
      int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i);
24✔
798
    
799
      code = addToJoinTableHash(pStbJoin->ctx.prev.leftCache, pStbJoin->ctx.prev.onceTable, leftUid, sizeof(*leftUid));
24✔
800
      if (TSDB_CODE_SUCCESS != code) {
24!
801
        break;
×
802
      }
803
    }
804
  }
805

806
  if (TSDB_CODE_SUCCESS == code) {
315!
807
    code = appendStbJoinTableList(&pStbJoin->ctx.prev, pBlock->info.rows, (int32_t*)pVg0->pData, (int64_t*)pUid0->pData, (int32_t*)pVg1->pData, (int64_t*)pUid1->pData);
315✔
808
    if (TSDB_CODE_SUCCESS == code) {
315!
809
      pStbJoin->ctx.prev.tableNum += pBlock->info.rows;
315✔
810
    }
811
  }
812

813
_return:
×
814

815
  if (TSDB_CODE_SUCCESS != code) {
315!
816
    pOperator->pTaskInfo->code = code;
×
817
    T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
×
818
  }
819
}
315✔
820

821

822
static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) {
11,035✔
823
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
11,035✔
824
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
11,035✔
825

826
  if (pStbJoin->basic.batchFetch) {
11,035✔
827
    return;
11,035✔
828
  }
829

830
  if (tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache) == tSimpleHashGetSize(pStbJoin->ctx.prev.onceTable)) {
12!
831
    tSimpleHashClear(pStbJoin->ctx.prev.leftCache);
12✔
832
    return;
12✔
833
  }
834

835
  uint64_t* pUid = NULL;
×
836
  int32_t iter = 0;
×
837
  int32_t code = 0;
×
838
  while (NULL != (pUid = tSimpleHashIterate(pStbJoin->ctx.prev.onceTable, pUid, &iter))) {
×
839
    code = tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, pUid, sizeof(*pUid));
×
840
    if (code) {
×
841
      qError("tSimpleHashRemove failed in postProcessStbJoinTableHash, error:%s", tstrerror(code));
×
842
    }
843
  }
844

845
  pStbJoin->execInfo.leftCacheNum = tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache);
×
846
  qDebug("more than 1 ref build table num %" PRId64, (int64_t)tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache));
×
847

848
/*
849
  // debug only
850
  iter = 0;
851
  uint32_t* num = NULL;
852
  while (NULL != (num = tSimpleHashIterate(pStbJoin->ctx.prev.leftCache, num, &iter))) {
853
    A S S E R T(*num > 1);
854
  }
855
*/  
856
}
857

858
static void buildStbJoinTableList(SOperatorInfo* pOperator) {
11,035✔
859
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
11,035✔
860
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
11,035✔
861

862
  while (true) {
315✔
863
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
11,350✔
864
    if (NULL == pBlock) {
11,350✔
865
      break;
11,035✔
866
    }
867

868
    pStbJoin->execInfo.prevBlkNum++;
315✔
869
    pStbJoin->execInfo.prevBlkRows += pBlock->info.rows;
315✔
870
    
871
    doBuildStbJoinTableHash(pOperator, pBlock);
315✔
872
  }
873

874
  postProcessStbJoinTableHash(pOperator);
11,035✔
875

876
  pStbJoin->ctx.prev.joinBuild = true;
11,035✔
877
}
11,035✔
878

879
static int32_t seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
1,048✔
880
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
1,048✔
881
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
1,048✔
882
  SStbJoinPrevJoinCtx*       pPrev = &pStbJoin->ctx.prev;
1,048✔
883
  SStbJoinTableList*         pNode = pPrev->pListHead;
1,048✔
884

885
  while (pNode) {
21,669✔
886
    if (pNode->readIdx >= pNode->uidNum) {
21,355✔
887
      pPrev->pListHead = pNode->pNext;
314✔
888
      freeStbJoinTableList(pNode);
314✔
889
      pNode = pPrev->pListHead;
314✔
890
      continue;
314✔
891
    }
892
    
893
    seqJoinLaunchNewRetrieveImpl(pOperator, ppRes);
21,041✔
894
    if (*ppRes) {
21,041✔
895
      return TSDB_CODE_SUCCESS;
734✔
896
    }
897

898
    QRY_ERR_RET(handleSeqJoinCurrRetrieveEnd(pOperator, pStbJoin));
20,307!
899
    pPrev->pListHead->readIdx++;
20,307✔
900
  }
901

902
  *ppRes = NULL;
314✔
903
  setOperatorCompleted(pOperator);
314✔
904

905
  return TSDB_CODE_SUCCESS;
314✔
906
}
907

908
static int32_t seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) {
11,768✔
909
  if (pBlock) {
11,768✔
910
    if (pStbJoin && pStbJoin->pOutputDataBlockDesc) {
734!
911
      pBlock->info.id.blockId = pStbJoin->pOutputDataBlockDesc->dataBlockId;
734✔
912
      if (!pBlock->pDataBlock) return TSDB_CODE_SUCCESS;
734!
913

914
      for (int i = pBlock->pDataBlock->size; i < pStbJoin->pOutputDataBlockDesc->pSlots->length; i++) {
752✔
915
        SSlotDescNode* pSlot = (SSlotDescNode*)nodesListGetNode(pStbJoin->pOutputDataBlockDesc->pSlots, i);
18✔
916
        if (pSlot == NULL) {
18!
NEW
917
          qError("seqStableJoinComposeRes: pSlot is NULL, i:%d", i);
×
NEW
918
          return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
919
        }
920
        SColumnInfoData colInfo = createColumnInfoData(pSlot->dataType.type, pSlot->dataType.bytes, pSlot->slotId);
18✔
921
        colInfoDataEnsureCapacity(&colInfo, pBlock->info.rows, true);
18✔
922
        int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
18✔
923
        if (code != TSDB_CODE_SUCCESS) {
18!
NEW
924
          return code;
×
925
        }
926
      }
927
    } else {
NEW
928
      qError("seqStableJoinComposeRes: pBlock or pStbJoin is NULL");
×
NEW
929
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
930
    }
931
  }
932
  return TSDB_CODE_SUCCESS;
11,768✔
933
}
934

935
int32_t seqStableJoin(SOperatorInfo* pOperator, SSDataBlock** pRes) {
12,060✔
936
  int32_t                    code = TSDB_CODE_SUCCESS;
12,060✔
937
  SDynQueryCtrlOperatorInfo* pInfo = pOperator->info;
12,060✔
938
  SStbJoinDynCtrlInfo*       pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin;
12,060✔
939

940
  QRY_PARAM_CHECK(pRes);
12,060!
941
  if (pOperator->status == OP_EXEC_DONE) {
12,060✔
942
    return code;
292✔
943
  }
944

945
  int64_t st = 0;
11,768✔
946
  if (pOperator->cost.openCost == 0) {
11,768✔
947
    st = taosGetTimestampUs();
11,035✔
948
  }
949

950
  if (!pStbJoin->ctx.prev.joinBuild) {
11,768✔
951
    buildStbJoinTableList(pOperator);
11,035✔
952
    if (pStbJoin->execInfo.prevBlkRows <= 0) {
11,035✔
953
      setOperatorCompleted(pOperator);
10,720✔
954
      goto _return;
10,720✔
955
    }
956
  }
957

958
  QRY_ERR_JRET(seqJoinContinueCurrRetrieve(pOperator, pRes));
1,048!
959
  if (*pRes) {
1,048!
960
    goto _return;
×
961
  }
962

963
  QRY_ERR_JRET(seqJoinLaunchNewRetrieve(pOperator, pRes));
1,048!
964

965
_return:
1,048✔
966
  if (pOperator->cost.openCost == 0) {
11,768✔
967
    pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
11,035✔
968
  }
969

970
  if (code) {
11,768!
971
    qError("%s failed since %s", __func__, tstrerror(code));
×
972
    pOperator->pTaskInfo->code = code;
×
973
    T_LONG_JMP(pOperator->pTaskInfo->env, code);
×
974
  } else {
975
    code = seqStableJoinComposeRes(pStbJoin, *pRes);
11,768✔
976
  }
977
  return code;
11,768✔
978
}
979

980
int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) {
11,035✔
981
  if (batchFetch) {
11,035✔
982
    pPrev->leftHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
11,023✔
983
    if (NULL == pPrev->leftHash) {
11,023!
984
      return terrno;
×
985
    }
986
    pPrev->rightHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
11,023✔
987
    if (NULL == pPrev->rightHash) {
11,023!
988
      return terrno;
×
989
    }
990
  } else {
991
    pPrev->leftCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
12✔
992
    if (NULL == pPrev->leftCache) {
12!
993
      return terrno;
×
994
    }
995
    pPrev->rightCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
12✔
996
    if (NULL == pPrev->rightCache) {
12!
997
      return terrno;
×
998
    }
999
    pPrev->onceTable = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
12✔
1000
    if (NULL == pPrev->onceTable) {
12!
1001
      return terrno;
×
1002
    }
1003
  }
1004

1005
  return TSDB_CODE_SUCCESS;
11,035✔
1006
}
1007

1008
int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
11,035✔
1009
                                       SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo,
1010
                                       SOperatorInfo** pOptrInfo) {
1011
  QRY_PARAM_CHECK(pOptrInfo);
11,035!
1012

1013
  int32_t                    code = TSDB_CODE_SUCCESS;
11,035✔
1014
  __optr_fn_t                nextFp = NULL;
11,035✔
1015
  SOperatorInfo*             pOperator = NULL;
11,035✔
1016
  SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo));
11,035!
1017
  if (pInfo == NULL) {
11,035!
1018
    code = terrno;
×
1019
    goto _error;
×
1020
  }
1021

1022
  pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
11,035!
1023
  if (pOperator == NULL) {
11,035!
1024
    code = terrno;
×
1025
    goto _error;
×
1026
  }
1027

1028
  pTaskInfo->dynamicTask = pPhyciNode->node.dynamicOp;
11,035✔
1029

1030
  code = appendDownstream(pOperator, pDownstream, numOfDownstream);
11,035✔
1031
  if (TSDB_CODE_SUCCESS != code) {
11,035!
1032
    goto _error;
×
1033
  }
1034

1035
  pInfo->qType = pPhyciNode->qType;
11,035✔
1036
  switch (pInfo->qType) {
11,035!
1037
    case DYN_QTYPE_STB_HASH:
11,035✔
1038
      TAOS_MEMCPY(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
11,035✔
1039
      pInfo->stbJoin.pOutputDataBlockDesc = pPhyciNode->node.pOutputDataBlockDesc;
11,035✔
1040
      code = initSeqStbJoinTableHash(&pInfo->stbJoin.ctx.prev, pInfo->stbJoin.basic.batchFetch);
11,035✔
1041
      if (TSDB_CODE_SUCCESS != code) {
11,035!
1042
        goto _error;
×
1043
      }
1044
      nextFp = seqStableJoin;
11,035✔
1045
      break;
11,035✔
1046
    default:
×
1047
      qError("unsupported dynamic query ctrl type: %d", pInfo->qType);
×
1048
      code = TSDB_CODE_INVALID_PARA;
×
1049
      goto _error;
×
1050
  }
1051

1052
  setOperatorInfo(pOperator, "DynQueryCtrlOperator", QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, false, OP_NOT_OPENED,
11,035✔
1053
                  pInfo, pTaskInfo);
1054

1055
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, nextFp, NULL, destroyDynQueryCtrlOperator, optrDefaultBufFn,
11,035✔
1056
                                         NULL, optrDefaultGetNextExtFn, NULL);
1057

1058
  *pOptrInfo = pOperator;
11,035✔
1059
  return TSDB_CODE_SUCCESS;
11,035✔
1060

1061
_error:
×
1062
  if (pInfo != NULL) {
×
1063
    destroyDynQueryCtrlOperator(pInfo);
×
1064
  }
1065

1066
  destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
×
1067
  pTaskInfo->code = code;
×
1068
  return code;
×
1069
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc